Movatterモバイル変換


[0]ホーム

URL:


Skip to content
DEV Community
Log in Create account

DEV Community

Cover image for Ordering Event Bus Events with RxJS and concatMap
Dean Radcliffe
Dean Radcliffe

Posted on

     

Ordering Event Bus Events with RxJS and concatMap

An Event Bus can help our app by providing a single source of truth for the relative timing of events. Iftrigger('B') followstrigger('A') , then we we expect listeners will receive events inA, B order. Any other order would certainly lead to bugs.

Strangely enough, with a simple implementation of an Event Bus in RxJS (a Subject, and some Observers thattap in a function), one can accidentally introduce a bug in which listeners don't hear events in the same order! Both the buses Luis Aviles madehere and the one I built inthis post suffer from this bug. In this post we will useconcatMap to fix that, learning the "Return The Work" principle of Observables.

Your Thoughts?

The library that implements this bus fully today is calledomnibus-rxjs. I'm thinking of putting it up as@rxfx/bus, and extracting out a family of libraries under that namespace. Which name do you like? Leave a comment below.


Trigger and Listen

Inpart 1 we constructed an Event Bus that let us:

  1. Callbus.trigger to trigger an event to the bus
  2. Usebus.listen to register a function to be run on matching events.

On this bus, what happens if a handler function run bybus.listen contains a call tobus.trigger? Could this confuse the event ordering?

bus.listen((item)=>item==="hello-dave",()=>{bus.trigger("hello-HAL")});
Enter fullscreen modeExit fullscreen mode

On the surface there's nothing suspicious here. And to review our bus code at this point, it's nothing more than this:

classBus<T>{privateevents:Subject<T>;constructor(){this.events=newSubject();}listen(matcher:Predicate<T>,handler:(item:T)=>void):Subscription{returnthis.events.asObservable().pipe(filter(matcher),tap(handler)).subscribe();}trigger(item:T){this.events.next(item);}}
Enter fullscreen modeExit fullscreen mode

So, assuming 3 events are triggered, how could 2 listeners disagree on what order the events arrived?

// Listener 1 heard: ["hello-world", "hello-Dave", "hello-HAL"]// Listener 2 heard: ["hello-world", "hello-HAL", "hello-Dave"]
Enter fullscreen modeExit fullscreen mode

The Stack is The Culprit

JavaScript is a stack-based language, where each synchronous function call begins and ends inside its parents'. How is this relevant? Allow me to illustrate..

Suppose there are 3 listeners on a bus carrying strings. First we create a logger called L1. Then our 'Dave listener' which will re-trigger. Then another logger called L2 will be attached. If we fire an event that re-triggers, like"hello-dave", this shows the sequence of calls to the listeners that results in the problem:

trigger('hello-Dave')-L1listener-Davelistener-trigger('hello-HAL')-L1listener-L2listener-L2listener
Enter fullscreen modeExit fullscreen mode

At runtime, thetrigger('hello-HAL') from inside the "Dave" listener started firing each matching listener sequentially. But L2 hadn't yet processedhello-Dave, so L2 seeshello-HAL even beforehello-Dave. This is what we want to prevent. Our answer will be to not begin the re-triggering immediately, but to "Return the Work"

Observables - Units of Work

Of all the definitions of Observables out there, the one I like is that an Observable represents the potential for some work to be done, or resource to be used. Like the command-line stringls -l encodes thepotential of a memory-occupying, data-emitting process. So wherever you did work immediately before, you can now do it in an Observable. Like React VDOM is for DOM elements, Observables are to effects.

Now, if we have a stream of these work Observables, we can strictly serialize them with the RxJS operatorconcatMap.

The Fix

If you suspected some kind of queueing was the solution you're right. But if you didn't know aboutconcatMap, you may have imagined building some data structure to hold the queue. But theconcatMap operator actually does this internally already. Let's give our bus an RxJSSubject for the triggerings we want to serialize.

class Bus<T> {  private events: Subject<T>;+  private triggerings: Subject<Observable<void>>;  constructor() {    this.events = new Subject();+    this.triggerings = new Subject();+    this.triggerings.pipe(+      concatMap((t) => t)+    ).subscribe();  }}
Enter fullscreen modeExit fullscreen mode

When the constructor is run, thetriggeringsSubject begins listening for work items—Observables—which it passes toconcatMap to execute only serially. Now, we change the implementation oftrigger to push one of these work items to that Subject:

  trigger(item: T) {-    this.events.next(item);+    this.triggerings.next(+      new Observable((notify) => {+        this.events.next(item);+        notify.complete();+      })+    );  }
Enter fullscreen modeExit fullscreen mode

The Observable's work is to run each listener for the item completely and synchronously withevents.next(item), and only then callnotify.complete().

And voila!concatMap can serialize the calls now! We no longer violate causality, and our logs show each listener agrees on the same event ordering.

// Listener 1 heard: ["hello-world", "hello-Dave", "hello-HAL"]// Listener 2 heard: ["hello-world", "hello-Dave", "hello-HAL"]
Enter fullscreen modeExit fullscreen mode

And we can see this is because triggerings—the execution of those Observables—always finishes notifying listeners for one event before processing the next one.

trigger('hello-dave')-L1listener-Davelistener(queueshelloHAL)-L2listenertrigger('hello-HAL')-L1listener-L2listener
Enter fullscreen modeExit fullscreen mode

Decoupling For The Win

Where we're going with this bus is to make it usable for a Pub-Sub implementation. This bug we just fixed was a kind of coupling, where "Dave listener" was interfering with L2's event order. This is normal when making sync calls in a stack-based language, but we can bring order back to it again with RxJS.

Now that we've decoupled this runtime behavior of listeners from each other, it's time to look at another important way listeners should be decoupled: errors. And that will be the topic of the next post.

Links

Top comments(0)

Subscribe
pic
Create template

Templates let you quickly answer FAQs or store snippets for re-use.

Dismiss

Are you sure you want to hide this comment? It will become hidden in your post, but will still be visible via the comment'spermalink.

For further actions, you may consider blocking this person and/orreporting abuse

  • Location
    Chicago, USA
  • Joined

More fromDean Radcliffe

DEV Community

We're a place where coders share, stay up-to-date and grow their careers.

Log in Create account

[8]ページ先頭

©2009-2025 Movatter.jp