Redux Observable middleware

Motivation

In mapstore we wanted to realize some complex async operations, like interactions with different servers, soft failing … Realizing some complex tasks using only the thunk middleware was very hard and required a lot of code. We needed to:

  • Have interactions between plugins, keeping the isolation between them

  • Allow simple operation cancellation and recovery

  • Intercept actions to perform side-effects

To do these operations MapStore uses, together with thunk, another middleware called Redux Observable

In the following section we will have a basic introduction to the basic concepts behind this middleware:

  • Reactive Programming - Programming with Observables

  • Observables

  • RxJS

Note

Getting into this can be complicated at the beginning. For this reason MapStore provides multiple ways to do asynchronous programming. You can use hooks (newer components of MapStore use them a lot too), or thunks if you prefer to keep all simple. Redux Observable gives a lot of benefits and allows to make code clean and smart when you get confident with it.

Reactive Programming

Reactive Programming is a declarative programming paradigm that uses asynchronous data streams. In this paradigm all the asynchronous events, actions, data from server, callbacks are turned into a stream.

A stream is a flow of events over the time (value, error, complete)

marble diagram

Marble diagram, commonly used to represent streams

A stream is an entity that emits 3 kind of events

  • values

  • errors

  • complete

In the diagram above, called “Marble diagram”, you can see these 3 kind of events represented respectively as:

  • “marble” (O) for values

  • “cross” (X) for errors

  • vertical line (“|”) for complete events

An object that represents this stream is also called “Observable”.

An observable in JavaScript has the subscribe method that looks like this:

MyObservable.subscribe(
  onValue, // callback for new values,
  onError, // callback for errors
  onComplete // callback when complete
)

So this object has at least one method, called subscribe, that takes 3 functions as arguments:

  • The first method, will be called every time a new value (O) is emitted.

  • The second method will be called when an error is emitted

  • The third method will be called on complete.

Note

In fact this object is very similar to a Promise, but because it can emit more than one “value”, it need also a complete event. You can imagine a Promise as a special case of an Observable, that emits only one event, and then completes implicitly.

RxJs

Is a library of of functions to combine, create and filter any of those streams and create the event flows you need.

observable

Observable

A stream can be used as an input to another one. Even multiple streams can be used as inputs to another stream.

  • You can merge streams.

  • You can filter a stream to get another one that has only those events you are interested in.

  • You can map data values from one stream to another new one.

  • You have the power!

Example:

Let’s imagine we want to create a function that log “click” events only if they are only double and triple clicks in an interval of 350ms (ignoring single clicks):

This is the code you need in vanilla JavaScript:

var clicks = 0;
var timer;
var timeout = 350; // time between each click
function log(text) => console.log(text);
// click timer
document.addEventListener('click', function(e) {
  clearTimeout(timer);
  clicks++;
  var evt = e;
  timer = setTimeout(function() {
    if(clicks==2) {
      log(2);
    }
    if(clicks==3) {
      log(3);
    }
    clicks = 0;
  }, timeout);
});

The RxJS version is:

const single$ = Rx.Observable.fromEvent(button, 'click'); // $ is often used to indate that a variable is a stream
function log(text) => console.log(text);

single$
    .bufferWhen(() => single$.debounceTime(350))
    .map(list => list.length)
    .filter(length => length >= 2)
    .subscribe(totalClicks => {
        console.log(totalClicks);
    });

In this way of programming, you get your stream of events, than you can create a chain of operators to manage them.

functions streams

RxJS Operators

All the operators listed below return a new Observable, so they can be chained each other to produce create the desired stream to subscribe.

There are many operators that you can use to use for various use case, you can refer the official documentation to discover them all, or put them on test with These interactive diagrams.

Creating Observables

These are methods of Rx.Observable, and allow to create streams

  • fromEvent - Creates an Observable that emits events of a specific type coming from the given event target. (e.g. from mouse click event)

fromEvent operator

fromEvent operator

  • of( ) — convert an object or several objects into an Observable that emits that object and completes.

of operator

of operator

  • from( ) — convert an Iterable, a Future, or an Array into an Observable

from operator

from operator

  • defer(): defers the creation of the observable to the given function. defer(() => new Promise(....) ) can be used to converts a promise into an Observable.

defer

defer operator

Filter Operators

This kind of operators allow to filter the events of a stream or completes under certain circumstances, without modifying the values.

  • filter(): Filters the elements of an observable sequence based on a predicate.

filter operator

filter operator

  • takeUntil(): Emits the values emitted by the source Observable until a notifier Observable emits a value.

takeUntil operator

takeUntil operator

Transformation Operators

These kind of operators can be applied to a source stream to modify the values or the whole event sequence.

  • map(): Similar to the well known Array.prototype.map function, this operator applies a transforms each value with a given function and emits that transformed values in the output Observable.

map

map operator

  • mergeMap() Projects each source value to an Observable which is merged in the output Observable.

mergeMap

mergeMap operator

  • switchMap() Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable

switchMap

switchMap operator

The main difference between switchMap and and mergeMap is cancelling effect. On each emission source obserable the inner observable is stopped emitting (the result of the function you supplied) is cancelled and the new observable is subscribed.

This works perfectly for scenarios like type-ahead where you are no longer concerned with the response of the previous request when a new input arrives. This also is a safe option in situations where a long lived inner observable could cause memory leaks, for instance if you used mergeMap with an interval and forgot to properly dispose of inner subscriptions. Remember, switchMap maintains only one inner subscription at a time.

Be careful though, you probably want to avoid switchMap in scenarios where every request needs to complete, think writes to a database. switchMap could cancel a request if the source emits quickly enough. In these scenarios mergeMap is the correct option.

Note

You can remember this by the phrase “switch to a new observable” .

Redux Observable

Redux Observable is a middleware for redux that allow you to observe the stream of actions coming from the store and dispatch new actions.

No need to subscribe, the middleware does it for you.

Only thing to do is write an Epic

What is an Epic?

It is a function which takes a stream of actions as argument and returns a stream of actions to emit. Actions in, actions out.

You can think of it of having roughly this type signature:

function (action$: Observable<Action>, store: Store): Observable<Action>;

The actions you emit will be immediately dispatched through the normal store.dispatch(), so under the hood redux-observable effectively does epic(actions$, store).subscribe(store.dispatch)

Note

Epics run alongside the normal Redux dispatch channel, after the reducers have already received them–so you cannot “swallow” an incoming action. Actions always run through your reducers before your Epics even receive them.

redux animation

redux with middleware

Examples

example of delayed ping pong

delayed Ping Pong

real world example

real world example

Example from MapStore

The code below implements the search logic of the search input: less then 50 lines of code (included the comments) to provide:

  • Multiple search on different services

  • Wait 250ms before to trigger request, to wait the user to finish stop typing (debouncing)

  • Automatic cancellation and clean up when the user re-start typing, when he clicks on reset button, or when the input loses focus.

  • Progressive result presentation while services responses arrive

  • Sorting of results by service priority

  • Retry on error after 200ms

  • Error handling

export const searchEpic = action$ =>
    action$.ofType(TEXT_SEARCH_STARTED)
        .debounceTime(250)
        .switchMap( action =>
        // create a stream of streams from array
          Rx.Observable.from(
            (action.services || [ {type: "nominatim", priority: 5} ])
            // Create an stream for each Service
                .map((service) => {
                    const serviceInstance = API.Utils.getService(service.type);
                    if (!serviceInstance) {
                      const err = new Error("Service Missing");
                      err.msgId = "search.service_missing";
                      err.serviceType = service.type;
                      return Rx.Observable.of(err).do((e) => {throw e; });
                    }
                    return Rx.Observable.defer(() =>
                        serviceInstance(action.searchText, service.options)
                          // returning a promise is equivalent
                          // to returning an observable
                          .then( (response = []) => response
                            .map(result => ({
                              ...result,
                              __SERVICE__: service,
                              __PRIORITY__: service.priority || 0
                            }))
                          ))
                        .retryWhen(errors => errors.delay(200).scan((count, err) => {
                          if ( count >= 2) {
                              throw err;
                          }
                          return count + 1;
                        }, 0));
                }) // map
            )// from
            // merge all results from the streams
            .mergeAll()
            .scan((oldRes, newRes) => sortBy([...oldRes, ...newRes], ["__PRIORITY__"]))
            // limit the number of results returned from all services to maxResults
            .map((results) => searchResultLoaded(results.slice(0, action.maxResults || 15), false))
            .startWith(searchTextLoading(true))
            .takeUntil(action$.ofType( TEXT_SEARCH_RESULTS_PURGE, TEXT_SEARCH_RESET, TEXT_SEARCH_ITEM_SELECTED))
            .concat([searchTextLoading(false)])
            .catch(e => {
                const err = {msgId: "search.generic_error", ...e, message: e.message, stack: e.stack};
                return Rx.Observable.from([searchResultError(err), searchTextLoading(false)]);
            })
        );

References