Redux Observable middleware =========================== **Motivation** In mapstore we wanted to realize some complex async operations, like interactions with different servers, soft failing ... Realizing some complex tasks using only the thunk middleware was very hard and required a lot of code. We needed to: - Have interactions between plugins, keeping the isolation between them - Allow simple operation cancellation and recovery - Intercept actions to perform side-effects To do these operations MapStore uses, together with thunk, another middleware called **Redux Observable** In the following section we will have a basic introduction to the basic concepts behind this middleware: - Reactive Programming - Programming with Observables - Observables - RxJS .. note:: Getting into this can be complicated at the beginning. For this reason MapStore provides multiple ways to do asynchronous programming. You can use hooks (newer components of MapStore use them a lot too), or thunks if you prefer to keep all simple. Redux Observable gives a lot of benefits and allows to make code clean and smart when you get confident with it. Reactive Programming -------------------- **Reactive Programming** is a *declarative programming paradigm* that uses asynchronous data *streams*. In this paradigm all the asynchronous events, actions, data from server, callbacks are turned into a *stream*. A **stream** is a flow of events over the time (value, error, complete) .. figure:: img/marble.png :alt: marble diagram :align: center Marble diagram, commonly used to represent streams A *stream* is an entity that emits 3 kind of events - *values* - *errors* - *complete* In the diagram above, called *"Marble diagram"*, you can see these 3 kind of events represented respectively as: - "marble" (O) for *values* - "cross" (X) for *errors* - vertical line ("|") for *complete* events An object that represents this stream is also called "*Observable*". An observable in JavaScript has the subscribe method that looks like this: .. code-block:: javascript MyObservable.subscribe( onValue, // callback for new values, onError, // callback for errors onComplete // callback when complete ) So this object has at least one method, called ``subscribe``, that takes 3 functions as arguments: - The *first* method, will be called every time a new value (O) is emitted. - The *second* method will be called when an error is emitted - The *third* method will be called on complete. .. note:: In fact this object is very similar to a **Promise**, but because it can emit more than one "value", it need also a *complete* event. You can imagine a *Promise* as a special case of an Observable, that emits only one event, and then completes implicitly. RxJs ---- Is a library of of functions to combine, create and filter any of those streams and create the event flows you need. .. figure:: img/observable.jpg :alt: observable :align: center Observable A stream can be used as an input to another one. Even multiple streams can be used as inputs to another stream. - You can merge streams. - You can filter a stream to get another one that has only those events you are interested in. - You can map data values from one stream to another new one. - You have the power! **Example:** Let's imagine we want to create a function that log "click" events only if they are only double and triple clicks in an interval of 350ms (ignoring single clicks): This is the code you need in vanilla JavaScript: .. code-block:: javascript var clicks = 0; var timer; var timeout = 350; // time between each click function log(text) => console.log(text); // click timer document.addEventListener('click', function(e) { clearTimeout(timer); clicks++; var evt = e; timer = setTimeout(function() { if(clicks==2) { log(2); } if(clicks==3) { log(3); } clicks = 0; }, timeout); }); The RxJS version is: .. code-block:: javascript const single$ = Rx.Observable.fromEvent(button, 'click'); // $ is often used to indate that a variable is a stream function log(text) => console.log(text); single$ .bufferWhen(() => single$.debounceTime(350)) .map(list => list.length) .filter(length => length >= 2) .subscribe(totalClicks => { console.log(totalClicks); }); In this way of programming, you get your stream of events, than you can create a chain of operators to manage them. .. figure:: img/function-streams.jpg :alt: functions streams :align: center RxJS Operators -------------- All the operators listed below return a new Observable, so they can be chained each other to produce create the desired stream to subscribe. There are many operators that you can use to use for various use case, you can refer the `official documentation `_ to discover them all, or put them on test with `These interactive diagrams `_. Creating Observables """""""""""""""""""" These are methods of ``Rx.Observable``, and allow to create streams - ``fromEvent`` - Creates an Observable that emits events of a specific type coming from the given event target. (e.g. from mouse click event) .. figure:: img/fromEvent.png :alt: fromEvent operator :align: center ``fromEvent`` operator - ``of( )`` — convert an object or several objects into an Observable that emits that object and completes. .. figure:: img/of.jpg :alt: of operator :align: center ``of`` operator - ``from( )`` — convert an Iterable, a Future, or an Array into an Observable .. figure:: img/from.jpg :alt: from operator :align: center ``from`` operator - ``defer()``: defers the creation of the observable to the given function. ``defer(() => new Promise(....) )`` can be used to converts a promise into an Observable. .. figure:: img/defer.png :alt: defer :align: center defer operator Filter Operators """""""""""""""" This kind of operators allow to filter the events of a stream or completes under certain circumstances, without modifying the values. - ``filter()``: Filters the elements of an observable sequence based on a predicate. .. figure:: img/filter.png :alt: filter operator :align: center ``filter`` operator - ``takeUntil()``: Emits the values emitted by the source Observable until a notifier Observable emits a value. .. figure:: img/take-until.png :alt: takeUntil operator :align: center ``takeUntil`` operator Transformation Operators """""""""""""""""""""""" These kind of operators can be applied to a source stream to modify the values or the whole event sequence. - ``map()``: Similar to the well known ``Array.prototype.map`` function, this operator applies a transforms each value with a given function and emits that transformed values in the output Observable. .. figure:: img/map.png :alt: map :align: center ``map`` operator - ``mergeMap()`` Projects each source value to **an Observable** which is merged in the output Observable. .. figure:: img/mergeMap.jpg :alt: mergeMap :align: center mergeMap operator - ``switchMap()`` Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable .. figure:: img/switchMap-2.png :alt: switchMap :align: center switchMap operator The main difference between ``switchMap`` and and ``mergeMap`` is cancelling effect. On each emission source obserable the inner observable is stopped emitting (the result of the function you supplied) is cancelled and the new observable is subscribed. This works perfectly for scenarios like type-ahead where you are no longer concerned with the response of the previous request when a new input arrives. This also is a safe option in situations where a long lived inner observable could cause memory leaks, for instance if you used mergeMap with an interval and forgot to properly dispose of inner subscriptions. Remember, switchMap maintains only one inner subscription at a time. Be careful though, you probably want to avoid switchMap in scenarios where every request needs to complete, think writes to a database. switchMap could cancel a request if the source emits quickly enough. In these scenarios ``mergeMap`` is the correct option. .. note:: You can remember this by the phrase "switch to a new observable" . Redux Observable ---------------- `Redux Observable `_ is a middleware for redux that allow you to observe the stream of actions coming from the store and dispatch new actions. No need to subscribe, the middleware does it for you. Only thing to do is write an **Epic** **What is an Epic?** It is a function which takes a stream of actions as argument and returns a stream of actions to emit. *Actions in, actions out*. You can think of it of having roughly this type signature: .. code-block:: javascript function (action$: Observable, store: Store): Observable; The actions you emit will be immediately dispatched through the normal store.dispatch(), so under the hood redux-observable effectively does ``epic(actions$, store).subscribe(store.dispatch)`` .. note:: Epics run alongside the normal Redux dispatch channel, after the reducers have already received them--so you cannot "swallow" an incoming action. Actions always run through your reducers before your Epics even receive them. .. figure:: img/redux.gif :alt: redux animation :align: center redux with middleware Examples ======== .. figure:: img/delayedPingPong.png :alt: example of delayed ping pong :align: center delayed Ping Pong .. figure:: img/realWorldExample.png :alt: real world example :align: center real world example *Example from MapStore* The code below implements the search logic of the search input: less then 50 lines of code (included the comments) to provide: - Multiple search on different services - Wait 250ms before to trigger request, to wait the user to finish stop typing (debouncing) - Automatic cancellation and clean up when the user re-start typing, when he clicks on reset button, or when the input loses focus. - Progressive result presentation while services responses arrive - Sorting of results by service priority - Retry on error after 200ms - Error handling .. code-block:: javascript export const searchEpic = action$ => action$.ofType(TEXT_SEARCH_STARTED) .debounceTime(250) .switchMap( action => // create a stream of streams from array Rx.Observable.from( (action.services || [ {type: "nominatim", priority: 5} ]) // Create an stream for each Service .map((service) => { const serviceInstance = API.Utils.getService(service.type); if (!serviceInstance) { const err = new Error("Service Missing"); err.msgId = "search.service_missing"; err.serviceType = service.type; return Rx.Observable.of(err).do((e) => {throw e; }); } return Rx.Observable.defer(() => serviceInstance(action.searchText, service.options) // returning a promise is equivalent // to returning an observable .then( (response = []) => response .map(result => ({ ...result, __SERVICE__: service, __PRIORITY__: service.priority || 0 })) )) .retryWhen(errors => errors.delay(200).scan((count, err) => { if ( count >= 2) { throw err; } return count + 1; }, 0)); }) // map )// from // merge all results from the streams .mergeAll() .scan((oldRes, newRes) => sortBy([...oldRes, ...newRes], ["__PRIORITY__"])) // limit the number of results returned from all services to maxResults .map((results) => searchResultLoaded(results.slice(0, action.maxResults || 15), false)) .startWith(searchTextLoading(true)) .takeUntil(action$.ofType( TEXT_SEARCH_RESULTS_PURGE, TEXT_SEARCH_RESET, TEXT_SEARCH_ITEM_SELECTED)) .concat([searchTextLoading(false)]) .catch(e => { const err = {msgId: "search.generic_error", ...e, message: e.message, stack: e.stack}; return Rx.Observable.from([searchResultError(err), searchTextLoading(false)]); }) ); References ========== - `RxJS API V5 `__ - `redux-observable `__ - `Interactive diagrams of Rx Observables `__