Asynchronous Event Processing with Broadway using RabbitMQ
Broadway processes events by publishing them to an event bus. You can subscribe a Projector to this event bus to update your read models, or subscribe a Processor to it to, for example, send an email. In Broadway this all happens synchronously: dispatching the command, persisting the new events and processing the events (the projectors and processors) all happen in a single request, on a single thread. In this blog post we explain how you can improve your user experience by running the processors asynchronously using RabbitMQ (or any other message queue).
Broadway - the regular flow
Before we get to the juicy part, let's start by visualizing the default usage of Broadway:
The projectors and processors are subscribed to the EventBus. In this setup, when an
event is dispatched, all four subscribers will process the event synchronously,
blocking the response to the user. Since this only happens when writing to the
application (we dispatched an event because our aggregate was updated), this
usually isn't that bad. Most of the requests to our applications are GET
requests, and because we have read models these are really fast.
However, we can make the POST requests faster by doing the processing asynchronously.
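To make the cost of the synchronous flow concrete, here is a minimal in-memory sketch. It is not Broadway's API: SimpleEventBus, update_read_model and send_email are hypothetical names, and the sleep stands in for a slow mail server. It only illustrates that publishing returns after every subscriber has finished.

```python
import time


class SimpleEventBus:
    """Hypothetical in-memory bus: publish() calls each subscriber in turn."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, handler):
        self.subscribers.append(handler)

    def publish(self, event):
        # Synchronous: returns only after every subscriber has finished.
        for handler in self.subscribers:
            handler(event)


handled = []


def update_read_model(event):
    # Stands in for a projector: fast, updates a read model.
    handled.append(("projector", event))


def send_email(event):
    # Stands in for a slow processor.
    time.sleep(0.05)  # simulated mail server latency (assumed value)
    handled.append(("mailer", event))


bus = SimpleEventBus()
bus.subscribe(update_read_model)
bus.subscribe(send_email)

start = time.perf_counter()
bus.publish("UserRegistered")
elapsed = time.perf_counter() - start  # includes the mailer's latency
```

The caller of publish pays for the slowest subscriber, which is exactly the time we would like to move out of the user's request.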
Using RabbitMQ for processing events
Here at Qandidate.com we do this by also passing the persisted event to RabbitMQ. Consider having a processor that sends an email. Instead of subscribing our mailer to the EventBus, we subscribe it to the message queue. And how do we get the message in the queue? By using Broadway of course! We wrote a Processor that receives an event and forwards it to the RabbitMQ Exchange, allowing you to run your other processors in a separate process.
This way you can still run your projectors the way you normally do, and run (some of) your processors asynchronously.
The new flow looks like this:
Notice that we added a new Processor, the RabbitmqProcessor. This processor
receives the events and publishes them to the RabbitMQ exchange. Depending on
how you configured your RabbitMQ exchange, it routes the events to various
queues. Each processor that handles a task (e.g. sending the email) gets its
own queue. We did this to ensure each processor receives all events: you want
to make sure that both Asynchronous Processor 1 and Asynchronous Processor 2
receive the same event.
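The routing described above can be sketched in memory as follows. This is a simplified illustration, assuming a fanout-style exchange with one queue per asynchronous processor. FanoutExchange, ForwardingProcessor and the queue names are hypothetical and do not talk to a real RabbitMQ server.

```python
from collections import deque


class FanoutExchange:
    """Hypothetical stand-in for a fanout-style exchange."""

    def __init__(self):
        self.queues = {}

    def bind(self, queue_name):
        # One queue per asynchronous processor.
        self.queues[queue_name] = deque()
        return self.queues[queue_name]

    def publish(self, message):
        # Every bound queue receives its own copy of the message.
        for queue in self.queues.values():
            queue.append(message)


class ForwardingProcessor:
    """Plays the role of the RabbitmqProcessor: it only forwards events."""

    def __init__(self, exchange):
        self.exchange = exchange

    def handle(self, event):
        self.exchange.publish(event)


exchange = FanoutExchange()
mail_queue = exchange.bind("send_email")
s3_queue = exchange.bind("store_file_on_s3")

processor = ForwardingProcessor(exchange)
processor.handle({"type": "UserRegistered", "user_id": 42})
```

Because each processor has its own queue, a slow or failing mailer does not keep the S3 processor from seeing the same event.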
Of course you do need a way to run these Processors! For this we use videlalvaro/RabbitMqBundle. The Processors are set up as Consumers, and everything is monitored using Supervisor. You can run multiple instances of each processor if you want!
So, when is this approach useful? Well, it really depends on how many processors you have and how long it takes to run these processors. For example, we use this approach for persisting files to S3 and sending emails, but you could also use it for resizing images, injecting data into a slow legacy application, etc.
How about those projectors
In the example above we still run all Projectors during the user's request. It is also possible to run your Projectors asynchronously. This, however, does come at a cost: when the request has been processed, your read models may not have been updated yet (as this happens asynchronously and the user's request doesn't wait for the projectors to finish). If you show your read model to your users, they might see outdated data, which can be very confusing for them. You can take care of this by polling for changes or by adding a Node.js server that uses websockets to push the read model changes to your users. The downside of this approach is that it makes your application a lot more complex.
Another problem with running the projectors in a different process is that your events might end up arriving out of order. If your projector crashes, RabbitMQ will requeue your message, but ordering is not guaranteed. Therefore we decided to only make the Processors run asynchronously. As a result our frontend will remain untouched and the flow remains relatively simple.
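A toy simulation shows how a requeue can reorder events. It assumes two workers consuming from the same queue, which is one of several ways ordering can be lost. The event names and the two-worker setup are made up for illustration, and no real broker is involved.

```python
from collections import deque

queue = deque(["ItemAdded#1", "ItemRenamed#2", "ItemRemoved#3"])
applied = []  # order in which the projector actually applied the events

# Two hypothetical workers each take one message from the shared queue.
in_flight_a = queue.popleft()  # worker A holds event #1
in_flight_b = queue.popleft()  # worker B holds event #2

# Worker A crashes before acknowledging, so its message goes back on the queue.
queue.appendleft(in_flight_a)

# Worker B finishes its message and then keeps consuming.
applied.append(in_flight_b)
while queue:
    applied.append(queue.popleft())
```

Here event #2 is applied before event #1. For a processor that sends an email this rarely matters, but a projector that builds a read model from the event stream would end up in the wrong state.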
tl;dr
In order to reduce the time of POST/PUT calls (and thus improve the user
experience) we offload some of the heavy lifting by using a Processor that
forwards our events to RabbitMQ. For each processor we run a console command
using videlalvaro/RabbitMqBundle that subscribes to the message queue and
processes the messages.
Hope you enjoyed our brief post regarding asynchronous event processing with Broadway!