Replaying event streams with Broadway


Event sourcing gives you access to some powerful mechanisms. One of them is the ability to revisit events that happened in the past. By revisiting those events, you can discover new patterns in your data and in the usage of your system.

For example: at Qandidate.com we have companies that register for an account, but not every company that registers actually starts using the system. We have a report about active companies, where the definition of active is determined by combining multiple events. At some point we might change that definition. Replaying lets us revisit those events and create new reports based on the new definition of active.

It also allows for error correction. Whenever you find a bug in one of your read models, you can simply fix the bug and recreate your read models by replaying the events.

In this post we will go through the process of replaying events.

How does it actually work?

Before we look at replaying, let's look at the event flow in a normal application using Broadway, starting from the point where an Aggregate records an event.

When an Aggregate records an event, it is first kept as an uncommitted event. When you then save the Aggregate Root, all uncommitted events are retrieved and stored in the event store. Once the events are saved, they are published on the event bus.

Everything that is interested in events can subscribe to the event bus, to receive the DomainMessages.

In one of our previous posts we talked about projecting your event stream. A projector implements the EventListenerInterface, which means it can listen to the published events. That way you can build your read models, based on the events your Aggregate Root generates.

[Figure: event-loop]
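To make the flow above concrete, here is a minimal sketch of it in plain PHP. None of these classes are Broadway's own: `InMemoryEventStore`, `Bus`, `Listener`, `Issue`, `IssueListProjector` and `saveIssue()` are hypothetical stand-ins I made up for illustration, and messages are plain arrays instead of DomainMessages.

```php
<?php
// Illustrative stand-ins only: none of these classes are Broadway's own.

interface Listener
{
    public function handle(array $message): void;
}

final class InMemoryEventStore
{
    private $streams = [];

    public function append(string $aggregateId, array $messages): void
    {
        foreach ($messages as $message) {
            $this->streams[$aggregateId][] = $message;
        }
    }

    public function load(string $aggregateId): array
    {
        return $this->streams[$aggregateId] ?? [];
    }
}

final class Bus
{
    private $listeners = [];

    public function subscribe(Listener $listener): void
    {
        $this->listeners[] = $listener;
    }

    public function publish(array $messages): void
    {
        foreach ($messages as $message) {
            foreach ($this->listeners as $listener) {
                $listener->handle($message);
            }
        }
    }
}

final class Issue
{
    private $id;
    private $uncommitted = [];

    public function __construct(string $id)
    {
        $this->id = $id;
    }

    public function id(): string
    {
        return $this->id;
    }

    public function resolve(): void
    {
        // Recording only buffers the event; nothing is stored or published yet.
        $this->uncommitted[] = ['id' => $this->id, 'type' => 'IssueResolved'];
    }

    public function releaseUncommittedEvents(): array
    {
        $events = $this->uncommitted;
        $this->uncommitted = [];

        return $events;
    }
}

final class IssueListProjector implements Listener
{
    public $resolvedIssueIds = [];

    public function handle(array $message): void
    {
        if ($message['type'] === 'IssueResolved') {
            $this->resolvedIssueIds[] = $message['id'];
        }
    }
}

// Saving: take the uncommitted events, store them, then publish them.
function saveIssue(Issue $issue, InMemoryEventStore $store, Bus $bus): void
{
    $events = $issue->releaseUncommittedEvents();
    $store->append($issue->id(), $events);
    $bus->publish($events);
}

$store     = new InMemoryEventStore();
$bus       = new Bus();
$projector = new IssueListProjector();
$bus->subscribe($projector);

$issue = new Issue('issue-1');
$issue->resolve();
saveIssue($issue, $store, $bus);
```

The point to take away is the order: record, store, publish. Replaying skips the first two steps and only repeats the last one with events that are already in the store.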

What do we need?

Replaying sounds simple: retrieve the events and pass them to the event bus. But there are some things you have to keep in mind:

Which events are we going to replay

Often you don't want to replay all events. You may only need the events of a specific Aggregate, or only the events that were recorded in a specific time frame. So you need a way to query for exactly those events.
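As a rough sketch of what such a query could look like, the helper below builds a SQL statement and its parameters from optional criteria. `buildReplayQuery()` is a hypothetical function, not part of Broadway. It assumes an `events` table with `uuid` and `recorded_on` columns, an auto-increment `id` column to order by, and `recorded_on` values that sort chronologically when compared as strings; check all of that against your own event store.

```php
<?php
/**
 * Builds a SELECT for a subset of the stored events (hypothetical helper).
 *
 * Assumed columns: uuid (aggregate id), recorded_on (sortable timestamp
 * string) and id (auto-increment insertion order).
 *
 * @return array [string $sql, array $params]
 */
function buildReplayQuery(?string $aggregateId = null, ?string $from = null, ?string $until = null): array
{
    $where  = [];
    $params = [];

    if ($aggregateId !== null) {
        $where[]        = 'uuid = :uuid';
        $params['uuid'] = $aggregateId;
    }
    if ($from !== null) {
        $where[]        = 'recorded_on >= :from';
        $params['from'] = $from;
    }
    if ($until !== null) {
        $where[]         = 'recorded_on < :until';
        $params['until'] = $until;
    }

    $sql = 'SELECT * FROM events';
    if ($where !== []) {
        $sql .= ' WHERE ' . implode(' AND ', $where);
    }
    // Always replay in the order the events were stored.
    $sql .= ' ORDER BY id ASC';

    return [$sql, $params];
}

// Everything that happened to one Aggregate since the start of 2014.
list($sql, $params) = buildReplayQuery('issue-1', '2014-01-01T00:00:00.000000+00:00');
```

The resulting `$sql` and `$params` can then be handed to whatever you use to talk to your database, for example a DBAL connection.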

Who is listening

When replaying, you should keep in mind that you might not want to have all listeners subscribing to the events. Let's take a project management tool as an example. We will assume that when somebody changes the state of a ticket to done, you publish the event IssueResolved.

You probably have projectors that update all related read models (issue list, scrum board, etc.). Besides that, you also have a Processor that sends an email to the participants. If you add a new read model (for a dashboard, for example), you want to visit all previous events so the new dashboard is up to date, but you don't want to send those emails again.
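The difference between the two sets of listeners can be sketched like this. `TinyEventBus` is a hypothetical, stripped-down bus that I use only for this illustration (it is not Broadway's SimpleEventBus), the listeners are plain closures, and events are plain arrays.

```php
<?php
// A hypothetical, minimal bus used only for this illustration.
final class TinyEventBus
{
    private $listeners = [];

    public function subscribe(callable $listener): void
    {
        $this->listeners[] = $listener;
    }

    public function publish(array $events): void
    {
        foreach ($events as $event) {
            foreach ($this->listeners as $listener) {
                $listener($event);
            }
        }
    }
}

$sentEmails     = [];
$resolvedIssues = [];

// A side effect we must not repeat during a replay.
$emailProcessor = function (array $event) use (&$sentEmails): void {
    if ($event['type'] === 'IssueResolved') {
        $sentEmails[] = 'Issue ' . $event['issueId'] . ' was resolved';
    }
};

// The new read model we want to fill from history.
$dashboardProjector = function (array $event) use (&$resolvedIssues): void {
    if ($event['type'] === 'IssueResolved') {
        $resolvedIssues[] = $event['issueId'];
    }
};

// The live bus has every listener attached.
$liveBus = new TinyEventBus();
$liveBus->subscribe($emailProcessor);
$liveBus->subscribe($dashboardProjector);

// The replay bus only gets the projector that needs to catch up.
$replayBus = new TinyEventBus();
$replayBus->subscribe($dashboardProjector);

$history = [
    ['type' => 'IssueResolved', 'issueId' => 'issue-1'],
    ['type' => 'IssueResolved', 'issueId' => 'issue-2'],
];

// Historic events go to the replay bus, never to the live bus.
$replayBus->publish($history);
```

After the replay the dashboard knows about both issues, while the email processor was never called because it is not subscribed to the replay bus.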

Solution: Pass the events to a separate event bus

At the time of writing, Broadway doesn't have a complete solution for replaying. The main reasons are the complications mentioned above. One of the things we are missing is a generic way to find and retrieve events based on some criteria. So for now you need to write something yourself for the event store you are using.

Once you have the events, you publish them to the projectors of your choice.

In the simplest form it could look something like the following. Note that I didn't include all of the bootstrapping code, like setting up the event store with the DBAL connection.

<?php
require __DIR__ . '/bootstrap.php';

$eventBus = new Broadway\EventHandling\SimpleEventBus();

$events = [];
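// Replay in the order the events were stored. The id column is assumed to be the
// auto-increment key of the events table; adjust this to your own schema.
foreach ($connection->fetchAll('SELECT * FROM events ORDER BY id ASC') as $event) {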
    $events[] = new Broadway\Domain\DomainMessage(
        $event['uuid'],
        $event['playhead'],
        $metadataSerializer->deserialize(json_decode($event['metadata'], true)),
        $payloadSerializer->deserialize(json_decode($event['payload'], true)),
        Broadway\Domain\DateTime::fromString($event['recorded_on'])
    );
}

$eventStream = new Broadway\Domain\DomainEventStream($events);
$projector   = new Acme\IssueDashboardProjector(); // extends Broadway\ReadModel\Projector

$eventBus->subscribe($projector);
$eventBus->publish($eventStream);

It is important to notice that I use a separate EventBus here. This must not be the event bus that your Processors are listening on.

The above example can be used to create a new read model from your existing event stream.

But what about my existing read models?

If this looks too good to be true, you might be right. It works for a very specific use case, but often you have more to worry about. If you need to replay because your read models changed structure, you will have to replace the current read models.

There are multiple ways to achieve this. The easiest is to remove the old read models and recreate them. If that is not an option, for example because you can't afford your read models to be gone for a few minutes, there are still several ways to do it. You could, for example, project the new data into a different table/index/collection (depending on your storage) and rename it once the projection has finished.
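The project-then-rename idea could look roughly like the sketch below. `ReadModelStorage` is a hypothetical in-memory class standing in for your real storage, and the collection names `issues` and `issues_rebuild` are made up. In a real system the `rename()` step would be whatever your storage offers for this, such as renaming a table or switching an index alias.

```php
<?php
// Hypothetical in-memory storage standing in for tables, indexes or collections.
final class ReadModelStorage
{
    private $collections = [];

    public function save(string $collection, string $id, array $row): void
    {
        $this->collections[$collection][$id] = $row;
    }

    public function all(string $collection): array
    {
        return $this->collections[$collection] ?? [];
    }

    public function rename(string $from, string $to): void
    {
        // Replaces whatever was stored under $to in one step.
        $this->collections[$to] = $this->collections[$from] ?? [];
        unset($this->collections[$from]);
    }
}

$storage = new ReadModelStorage();

// The current read model in its old structure; it keeps serving reads for now.
$storage->save('issues', 'issue-1', ['state' => 'done']);

$history = [
    ['type' => 'IssueResolved', 'issueId' => 'issue-1'],
    ['type' => 'IssueResolved', 'issueId' => 'issue-2'],
];

// Replay into a separate collection, using the new structure.
foreach ($history as $event) {
    if ($event['type'] === 'IssueResolved') {
        $storage->save('issues_rebuild', $event['issueId'], ['resolved' => true]);
    }
}

// Only once the replay has finished do we swap the new collection in.
$storage->rename('issues_rebuild', 'issues');
```

One thing this sketch ignores: events that are recorded while the replay is running also have to end up in the new collection before you swap.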

Crushing the dream

I'm sorry if you came here hoping to see a drop-in solution to do this. As with a lot of the more abstract ideas, there is not a single solution here. You will have to decide which solution would fit best in your application.