Projecting your event stream


When you are using a CQRS and Event Sourcing architecture you are separating your writes from your reads. You can fire commands that change state but return nothing and you have queries that return a result but don't change state.

So when you need to query some data, you can replay your eventstream and listen to specific events you are interested in. Although this can work for quite a lot of cases, most of the time this is not very efficient and it doesn't scale very well.

One solution to solve that, is to use projections to create a view that you can query against. Those projections are created with a projector. A projector listens to events and creates a read model from it. With this method it is really easy to create multiple read models at the same time. Because of the nature of your event stream, collecting facts that happened in the past, you can also easily create new read models for events that happened a long time ago.

Let's take another look at the demo application that we released recently. There we already had a projector, the PeopleThatBoughtThisProductAlsoBoughtProjector. That projector is responsible for tracking each bought product and which other products were bought together.

protected function applyBasketCheckedOut(BasketCheckedOut $event)
{
    foreach ($event->getProducts() as $productId => $count) {
        $readModel = $this->getReadModel($productId);
        $products  = $event->getProducts();

        unset($products[$productId]);

        $this->addProducts($readModel, $products);

        $this->repository->save($readModel);
    }
}

private function getReadModel($productId)
{
    $readModel = $this->repository->find($productId);

    if (null === $readModel) {
        $readModel = new PeopleThatBoughtThisProductAlsoBought($productId);
    }

    return $readModel;
}

private function addProducts(PeopleThatBoughtThisProductAlsoBought $readModel, array $products)
{
    foreach ($products as $productId => $count) {
        $readModel->addProduct($productId, $count);
    }
}

This projector listens to the BasketCheckedOut event. In that event we have information about all the products in that Basket. For each product, we are going to retrieve the current PeopleThatBoughtThisProductAlsoBought read model for that product, if it doesn't exists yet we create a new one. In the next step, we loop over all the other products and add the product with the count to the read model, increasing the count when already present.

As a last step we persist the read model to the repository. In this case the elasticsearch repository. A big advantage of elasticsearch is its search possibilities, like querying for products that have never been bought together with other products or finding the top most sold combinations.

Using multiple events for a projection

Above example only used one event, but often you want to combine the data of multiple events to create a read model.

As an example we are going to create a read model for active Baskets. It is a simple read model that stores the quantity of each product in the basket.

class ActiveBasket implements ReadModelInterface, SerializableInterface
{
    private $basketId;
    private $products;

    public function __construct(BasketId $basketId)
    {
        $this->basketId = $basketId;
        $this->products = array();
    }

    /**
     * {@inheritDoc}
     */
    public function getId()
    {
        return $this->basketId;
    }

    public function addProduct($productId)
    {
        if (! isset($this->products[$productId])) {
            $this->products[$productId] = 0;
        }

        $this->products[$productId]++;
    }

    public function removeProduct($productId)
    {
        if (! isset($this->products[$productId]) || 0 === $this->products[$productId]) {
            return;
        }

        $this->products[$productId]--;
    }

    /**
     * {@inheritDoc}
     */
    public function serialize()
    {
        return array(
            'basketId' => $this->basketId,
            'products' => $this->products,
        );
    }

    /**
     * {@inheritDoc}
     */
    public static function deserialize(array $data)
    {
        $basket = new ActiveBasket($data['basketId']);
        $basket->products = $data['products'];

        return $basket;
    }
}

To do this, we have to track 4 events: BasketPickedUp, ProductWasAddedToBasket, ProductWasRemovedFromBasket and BasketCheckedOut. So we create a new projector, the ActiveBasketProjector, where we listen to the events.

When we pick up the basket we simple create a new (empty) read model and save it.

public function applyBasketWasPickedUp(BasketWasPickedUp $event)
{
    $this->repository->save(new ActiveBasket($event->getBasketId()));
}

When a product is added, we load the basket from the repository and add the product to it.

public function applyProductWasAddedToBasket(ProductWasAddedToBasket $event)
{
    $activeBasket = $this->repository->find($event->getBasketId());

    if (null === $activeBasket) {
        return;
    }

    $activeBasket->addProduct($event->getProductId());

    $this->repository->save($activeBasket);
}

When a product is removed, we remove the product from the read model:

public function applyProductWasRemovedFromBasket(ProductWasRemovedFromBasket $event)
{
    $activeBasket = $this->repository->find($event->getBasketId());

    if (null === $activeBasket) {
        return;
    }

    $activeBasket->removeProduct($event->getProductId());

    $this->repository->save($activeBasket);
}

Then finally, on checkout, we remove the read model as the basket is no longer active.

public function applyBasketCheckedOut(BasketCheckedOut $event)
{
    $activeBasket = $this->repository->find($event->getBasketId());

    if (null === $activeBasket) {
        return;
    }

    $this->repository->remove($activeBasket);
}

With above examples, I gave you a quick overview of projections. There is more to tell, like how we test those projectors, but that's for another post. If you have any feedback, we are happy to hear about it! Mention us in #qandidate on Freenode or on Twitter.