11 June 2014

Reactive DDD with Akka - lesson 3 (Projections)

Don't miss: ddd-leaven-akka-v2 - runnable sample e-commerce application built from Akka-DDD artifacts described in this series.

Let's continue our adventure with Akka and DDD. So far we have concentrated on the write/command side of the system. We have covered topics like command processing, event storage and sharding, but we haven't touched the other side of the system: the query side. Our goal in this lesson is to implement the infrastructure needed to create the components responsible for feeding the query-side database: projections. For the query side we will choose an SQL database, although there are other technologies worth considering (document or graph databases might be a better choice depending on requirements).

Our first task is to check whether Akka (akka-persistence in its current, still experimental version, to be precise) supports the concept of projections. As it turns out, Akka provides the View trait, which defines an actor capable of receiving events from the journal of a particular processor (aggregate root, AR) instance. This sounds good, but unfortunately the concept of a View is insufficient for building projections, which typically listen to an aggregated stream of events rather than to the events of a particular AR instance. A workaround would be to register a view for a processor that receives events from all ARs of a given type, but in such a scenario events would be persisted twice, and reliable event delivery (more on that later) would have to be ensured between the ARs and the aggregating processor.

What we should require from an "ideal" event store implementation is a DSL for transforming and aggregating event streams.

EventStore should be mentioned here, as it provides an impressive set of methods for handling streams of events (see Event Store as a Read Model), including stream aggregation per category/AR type. You can configure the EventStore Journal for Akka Persistence and experiment with projections (be prepared to write some JavaScript and JSON).

But what if we don't want to depend on any particular journal provider? Well, composing streams of events would be possible if journals were exposed through some functional (composable) interface rather than through View. This is one of the goals the Akka team has already set with the introduction of the akka-stream module. Akka-stream is an implementation of the recently announced standard for asynchronous, composable and non-blocking stream processing on the JVM (http://www.reactive-streams.org/) that models stream publishers and subscribers (as defined by the specification) as Akka actors. It also already provides a simple DSL (PersistentFlow) for creating event stream publishers that are backed by Views. Event streams can be aggregated by merging stream publishers (although aggregation by category is not yet supported). Once akka-stream is released we should definitely make use of it and see how to implement projections as reactive components (subscribers of event streams).

For now we need to find a different solution. The only possibility left is to introduce a third-party broker and forward events to topics or queues configured within the broker. As we will see in the following sections, integrating Akka with an external broker is much simpler than one could imagine. This approach requires enriching the AR with a Publisher component (trait) responsible for sending persisted events to a configured target actor. This is a perfect opportunity to learn more about message delivery in distributed systems.

Reliable event delivery

Sending events generated by an AR to any other actor (be it a broker gateway or the aggregating processor mentioned before) should be straightforward to implement, right? Yes, it is, if you accept that in case of a node crash some messages might be lost (despite the fact that the corresponding commands had been acknowledged to the client!). Akka does not guarantee that a message will be delivered (these are the so-called at-most-once delivery semantics), because reliable delivery does not come for free in distributed systems. To obtain at-least-once delivery semantics (a message is guaranteed to be delivered, but duplicates may occur) an acknowledgment protocol must be used. The sender must store the message before sending it, and the receiver must acknowledge the message once it is received. If the acknowledgment is received, the sender must mark the previously stored message as confirmed. If the acknowledgment is not received within a configured time period, or the sender is restarted (for example as the result of a crash), the sender retries delivery. Because the acknowledgment message might be lost as well, the sender might resend a message that has already been delivered.

Akka-persistence provides at-least-once delivery semantics in its core. The journal plugin API defines methods for storing confirmation entries for messages that have been acknowledged. The acknowledgment and retry mechanism is encapsulated within the Channel component, which must be used to send events from within a processor (AR) to the destination actor. Events must be wrapped in a Persistent envelope so that the receiver, which gets them as ConfirmablePersistent, can acknowledge a message by simply calling its confirm method.
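The acknowledgment protocol described above can be illustrated without any Akka dependency. The following is only a minimal sketch in plain Scala: Envelope, Sender and demo are hypothetical names, the "link" is an ordinary function whose Boolean result stands for "acknowledgment received", and none of this is the akka-persistence Channel API. It shows why a lost acknowledgment leads to a duplicate on the receiving side.

```scala
import scala.collection.mutable

// Hypothetical names throughout; this is NOT the akka-persistence Channel API.
object AtLeastOnceSketch {
  final case class Envelope(deliveryId: Long, payload: String)

  // `send` returns true only if the acknowledgment made it back to the sender.
  final class Sender(send: Envelope => Boolean) {
    private val unconfirmed = mutable.LinkedHashMap.empty[Long, Envelope]
    private var nextId = 0L

    // Store first, then send: after a crash the stored message can still be retried.
    def deliver(payload: String): Unit = {
      nextId += 1
      val env = Envelope(nextId, payload)
      unconfirmed += (env.deliveryId -> env)
      attempt(env)
    }

    // Called on a timer or after a restart in a real system.
    def redeliverUnconfirmed(): Unit = unconfirmed.values.toList.foreach(attempt)

    def pending: Int = unconfirmed.size

    private def attempt(env: Envelope): Unit = {
      // Mark as confirmed only when the acknowledgment arrives.
      if (send(env)) unconfirmed -= env.deliveryId
    }
  }

  // Scenario: the first attempt reaches the receiver, but its acknowledgment is lost.
  def demo(): (List[String], Int) = {
    val received = mutable.ListBuffer.empty[String]
    var attempts = 0
    val link: Envelope => Boolean = { env =>
      attempts += 1
      received += env.payload
      attempts > 1 // the acknowledgment gets through from the second attempt on
    }
    val sender = new Sender(link)
    sender.deliver("ProductAdded")
    sender.redeliverUnconfirmed()
    (received.toList, sender.pending)
  }
}
```

In this scenario the receiver sees the same payload twice while the sender ends up with nothing pending, which is exactly the trade-off of at-least-once delivery: the receiver has to be prepared for duplicates.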

ReliablePublisher

Thanks to the composability of traits we can put all the behavior required by a reliable AR (sender) into a separate component: ReliablePublisher. It extends the abstract EventPublisher, which in turn extends the abstract EventHandler. We mix EventHandler into AggregateRoot, making event handling in the context of an AR as configurable as possible (useful for testing purposes). Please notice that it is possible to stop redelivery when a configured number of retries is exceeded. For that purpose a RedeliverFailureListener is registered on the channel to be notified if redelivery fails. The listener actor throws RedeliveryFailedException, which results in a restart of the parent actor (AR) (to make this work, the supervisorStrategy of the listener actor had to be adjusted). Inside the preRestart method (which ReliablePublisher overrides) we can trigger a compensation action and/or mark the failing event as deleted (to prevent redelivery from continuing after the AR is restarted).
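The layering of traits described above can be sketched in plain Scala. This is only an illustration of the composition idea, not the actual Akka-DDD code: the trait names are borrowed from the text, but their members (handle, publish, raise), the function-typed target and the RecordingHandler test double are assumptions made for the example, and the channel, redelivery and actor machinery is left out entirely.

```scala
import scala.collection.mutable

// Simplified, actor-free stand-ins; member names are assumptions, not the real API.
object PublisherTraitsSketch {
  trait Event
  final case class ProductAdded(name: String) extends Event

  // Hook called by the aggregate root once an event has been persisted.
  trait EventHandler {
    def handle(event: Event): Unit
  }

  // Handling an event means publishing it; how to publish stays abstract.
  trait EventPublisher extends EventHandler {
    def publish(event: Event): Unit
    override def handle(event: Event): Unit = publish(event)
  }

  // Stand-in for the reliable variant: `target` must be supplied at creation time.
  trait ReliablePublisher extends EventPublisher {
    def target: Event => Unit
    override def publish(event: Event): Unit = target(event)
  }

  // Hypothetical test double that only records what was handled.
  trait RecordingHandler extends EventHandler {
    var handled: List[Event] = Nil
    override def handle(event: Event): Unit = { handled = handled :+ event }
  }

  // The AR only knows that *some* EventHandler is mixed in.
  abstract class AggregateRoot { this: EventHandler =>
    def raise(event: Event): Unit = handle(event)
  }

  class Product extends AggregateRoot { this: EventHandler =>
    def add(name: String): Unit = raise(ProductAdded(name))
  }

  def demo(): (List[Event], List[Event]) = {
    val delivered = mutable.ListBuffer.empty[Event]
    val reliable = new Product with ReliablePublisher {
      val target: Event => Unit = e => { delivered += e; () }
    }
    reliable.add("book")

    val recorded = new Product with RecordingHandler
    recorded.add("pen")

    (delivered.toList, recorded.handled)
  }
}
```

Because the aggregate root depends only on the EventHandler self-type, a test can mix in a recording handler while production code mixes in the publishing one, without touching the AR itself.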

To provide a destination for ReliablePublisher, its abstract member target must be defined at creation time (AggregateRootActorFactory should take care of this, as shown in ProductPublishingSpec). Finally, we should verify that ReliablePublisher works as expected and really is reliable. You can find the test in ReliablePublisherSpec.

Message exchange over queue

Now we will configure the infrastructure for transmitting events over a durable queue. We will use Apache Camel to arrange an in-only message exchange between a component writing to the queue (producer) and a projection component reading from the queue (consumer). Thanks to akka-camel, both components (producer and consumer) can be represented as... (surprise! ;-) actors. EventMessageConfirmableProducer is the producer that receives event messages (notice that events are published inside an EventMessage envelope) coming from ReliablePublisher (AR) and forwards them to the configured Camel endpoint, for example a JMS queue or topic (see the transformOutgoingMessage method). Once an event message is accepted by the queue (and persisted, if the queue is durable) the producer acknowledges event reception to the publisher (ReliablePublisher) (see the routeResponse method). Please notice that we chose to unwrap the EventMessage from the ConfirmablePersistent envelope before putting it into the queue (so that the consumer does not have to do the unwrapping itself). The ConfirmablePersistent still needs to be attached to the EventMessage, so we convert it to a meta attribute.
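The unwrapping step can be pictured with a few lines of plain Scala. This is a sketch under stated assumptions, not the code of EventMessageConfirmableProducer: Confirmable is a stand-in for akka-persistence's ConfirmablePersistent, the shape of EventMessage, the metaData map and the "deliveryReceipt" key are hypothetical, and serialization of the message for the queue is ignored.

```scala
// Plain-Scala stand-ins; field names and the meta-attribute key are assumptions.
object EnvelopeSketch {
  // Stand-in for ConfirmablePersistent: a payload plus a way to confirm it.
  final class Confirmable(val payload: Any) {
    private var confirmedFlag = false
    def confirm(): Unit = { confirmedFlag = true }
    def confirmed: Boolean = confirmedFlag
  }

  final case class EventMessage(event: String, metaData: Map[String, Any] = Map.empty) {
    def withMeta(key: String, value: Any): EventMessage =
      copy(metaData = metaData + (key -> value))
  }

  val DeliveryReceipt = "deliveryReceipt" // hypothetical attribute key

  // Roughly the role of transformOutgoingMessage: send the EventMessage itself,
  // keeping the envelope reachable as a meta attribute.
  def unwrap(envelope: Confirmable): EventMessage = envelope.payload match {
    case message: EventMessage => message.withMeta(DeliveryReceipt, envelope)
    case other => throw new IllegalArgumentException(s"unexpected payload: $other")
  }

  // Roughly the role of routeResponse: once the queue has accepted the message,
  // confirm the original envelope so the publisher stops redelivering.
  def onAccepted(message: EventMessage): Unit =
    message.metaData.get(DeliveryReceipt).foreach {
      case envelope: Confirmable => envelope.confirm()
      case _ => ()
    }
}
```

The point of the design is that the consumer receives a bare EventMessage, while the producer can still find the envelope it has to confirm after the queue accepts the message.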

Finally, we can implement the projection as a Consumer actor, that is, an actor that consumes messages from a Camel endpoint. The Projection actor simply applies the provided projection specification (ProjectionSpec) to the received event and finalizes the message exchange by sending either an acknowledgment or a failure. To prevent processing of duplicated events, a concrete implementation of ProjectionSpec must return the sequence number of the last processed event for a given aggregateId (see the currentVersion method).
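The duplicate check based on currentVersion might look roughly like the following. It is a minimal sketch in plain Scala: only the names ProjectionSpec and currentVersion come from the text, while the method signatures, the shape of EventMessage, the Outcome type and the in-memory implementation are assumptions made for illustration.

```scala
import scala.collection.mutable

// Signatures and helper types are assumptions; only the idea comes from the text.
object ProjectionSketch {
  final case class EventMessage(aggregateId: String, sequenceNr: Long, event: String)

  trait ProjectionSpec {
    // Sequence number of the last event applied for this aggregate, if any.
    def currentVersion(aggregateId: String): Option[Long]
    def apply(message: EventMessage): Unit
  }

  sealed trait Outcome
  case object Applied extends Outcome
  case object SkippedDuplicate extends Outcome

  // Apply an event only if it is newer than what the view has already seen.
  def project(spec: ProjectionSpec, message: EventMessage): Outcome =
    if (spec.currentVersion(message.aggregateId).exists(_ >= message.sequenceNr))
      SkippedDuplicate
    else {
      spec.apply(message)
      Applied
    }

  // In-memory stand-in for a database-backed specification.
  final class InMemorySpec extends ProjectionSpec {
    private val versions = mutable.Map.empty[String, Long]
    val applied = mutable.ListBuffer.empty[String]

    def currentVersion(aggregateId: String): Option[Long] = versions.get(aggregateId)

    def apply(message: EventMessage): Unit = {
      applied += message.event
      versions(message.aggregateId) = message.sequenceNr
    }
  }
}
```

With a real database the version and the view update would normally be written in the same transaction, so that a crash between the two cannot make a redelivered event look new.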

Before pulling all the pieces together we need to register a concrete broker as a Camel component. We will use ActiveMQ. Configuration of the ActiveMQ component is straightforward (see the ActiveMQMessaging trait). And of course we need a runner class (EmbeddedActiveMQRunner) that starts the broker as an embedded service.

Implementing projection 

Now we can implement a concrete projection specification. Let's model a ProductCatalog service inside the Sales module (context) that maintains a list of products with prices. Whenever a new Product is added to the inventory (within the Inventory module/context) it should also be added to the product catalog in the Sales module. Thus we need a projection that listens to InventoryQueue and updates the product catalog. We will skip the implementation details of ProductCatalog, as accessing a relational database is not a very interesting topic (we use Slick for that purpose). InventoryProjection just calls the insert method of ProductCatalog, providing Product data copied from the ProductAdded event and an empty price.
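To make the shape of this projection concrete, here is a small sketch with an in-memory catalog in place of the Slick-backed one. The fields of ProductAdded and ProductData, the signature of insert, the find helper and the use of Option for the empty price are all assumptions for the example; the real classes in the sample application may look different.

```scala
import scala.collection.mutable

// In-memory stand-ins; field names and signatures are assumptions.
object InventoryProjectionSketch {
  final case class ProductAdded(productId: String, name: String)
  final case class ProductData(productId: String, name: String)
  final case class CatalogEntry(data: ProductData, price: Option[BigDecimal])

  // Replaces the relational ProductCatalog for the purpose of this sketch.
  final class ProductCatalog {
    private val entries = mutable.LinkedHashMap.empty[String, CatalogEntry]

    def insert(data: ProductData, price: Option[BigDecimal]): Unit = {
      entries(data.productId) = CatalogEntry(data, price)
    }

    def find(productId: String): Option[CatalogEntry] = entries.get(productId)
  }

  // Copies product data from the event and leaves the price empty.
  final class InventoryProjection(catalog: ProductCatalog) {
    def apply(event: ProductAdded): Unit =
      catalog.insert(ProductData(event.productId, event.name), None)
  }
}
```

A product added in the Inventory context thus shows up in the Sales catalog without a price, which the Sales context can set later.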

An integration test is available here: EventsPublishingClusterSpec. From the first node we send two AddProduct commands. The commands are handled by the Product AR within the inventory context, and the generated events are published (reliably) to InventoryQueue, which is available from all nodes in the cluster. On the second node we register InventoryProjection, wait for 1 second (so that the published events have time to reach the queue) and then send a GetProduct query message to ProductFinder to check whether the expected product has been added to the product catalog.

And, surprisingly, the test succeeds ;)!


Last but not least, I added an experimental feature to our application:
If the command sender wants to be notified once the view has been updated, they can request a special delivery receipt!

Please see ProductAcknowledgedPublicationSpec.

     

As homework, you can find out how the feature is implemented and let me know your opinion about the idea :)