
I was recently involved in a discussion at my current client about how to improve a service that aggregates data from several other services for, let’s say, reporting purposes. There are several issues with the current service, but the major one is that fixing its data requires what they call a “take-on”: a process that fetches the data from the underlying services’ databases, transforms it and stores it in the aggregating service’s database. This is a cumbersome, time-consuming manual process that is prone to human error, and it happens on a fairly regular basis because the data in this service often gets corrupted. On top of that, the views built off the back of this data cannot be altered without a take-on, as there is no way to replay the messages to store them differently or to capture more information.

Why do we need a take-on?

Sending data to an aggregator of some sort is a well-known problem in SOA systems, and it has already been solved through “data pumps”, which is exactly what was used in this instance. Data was quite literally being pumped through a queue to the aggregation service, which in turn stored that data in its local data store. This all seems like a very reasonable solution; however, message queues are not ideal for data pumps. I’m a massive fan of message queues in general, but using them in such scenarios has a few shortcomings:

  • messages are gone once consumed, and
  • message ordering is not guaranteed.

Let’s examine these issues in turn. Once we take a message off a queue and process it successfully - in this case, storing the information we require from it in a data store - that message is gone. If we later want to store more information from that message, because we added an extra column somewhere in our data store, we can’t. If the data somehow gets corrupted and we want to fix it, we can’t either. The only option left with queues is to ask the data owner to resend the messages. Resending messages presents a few complications of its own, mainly:

  • Republishing events sends them to all subscribers - unless we create specific topics or send the events via a queue rather than publish them
  • Events would need to be republished every time a new aggregator is added

Secondly, message ordering matters in this case too. Take an e-commerce system with a search service and a reporting service. Each needs to aggregate data from multiple services and store it in its own local data store to offer better reliability and performance. The ordering service successfully processes an order and publishes an OrderProcessed event; the shipping service reacts to it, ships the order and publishes an OrderShipped event. Both events are sent to the aggregators. The search aggregator receives the events in the right order and sets the status of the order to “Shipped”; the reporting service, on the other hand, receives them in the wrong order and, therefore, the final state of its representation of the order is “Processed”. Granted, you should not rely on message ordering when using message queues, and there are ways to deal with this - for example, you could use a timestamp from the events to determine which message to apply last, as sketched below - but they all involve handcoding a solution. This scenario becomes more frequent with retries, as messages are requeued in different orders or, for persistent exceptions, retried after a delay.
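
To make that last point concrete, here is a minimal sketch of what such a handcoded workaround could look like, written in Java. The event shape, the occurredAt timestamp and the in-memory status table are assumptions invented for the example, not part of the actual system.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical last-write-wins handler: an event only overwrites the stored
// status if it carries a later timestamp, so an out-of-order "Processed"
// cannot clobber "Shipped". All names here are illustrative.
public class OrderStatusProjection {

    record OrderStatusEvent(String orderId, String status, Instant occurredAt) {}
    record StoredStatus(String status, Instant occurredAt) {}

    private final Map<String, StoredStatus> statusByOrderId = new ConcurrentHashMap<>();

    public void handle(OrderStatusEvent event) {
        statusByOrderId.merge(
                event.orderId(),
                new StoredStatus(event.status(), event.occurredAt()),
                (current, incoming) ->
                        incoming.occurredAt().isAfter(current.occurredAt()) ? incoming : current);
    }

    public String statusOf(String orderId) {
        StoredStatus stored = statusByOrderId.get(orderId);
        return stored == null ? null : stored.status();
    }
}
```

Even this simple version leaks ordering concerns into every consumer; it is exactly the kind of hand-rolled plumbing the streaming approach below avoids.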

Streaming to the rescue

I posited in that discussion that rather than attempting to shoehorn message queues into this situation, we are better off looking into streams. Streams give us ordering, which is important in this case, and allow us to keep those ordered streams around. That facilitates replayability, meaning we no longer have to run the cumbersome “take-on” process to fix our data; instead, we can simply replay all the events in the stream and re-apply them to rebuild the data store. Additionally, there is no need to send those events to all consumers, or make any changes whatsoever, every time a new aggregator is added: the stream is already available, and it is down to each consumer to manage which events it needs to consume, making the whole thing simpler to maintain. There are a few streaming options; the two below are the ones we could be looking at.
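
As a rough illustration of what replayability buys us, the sketch below rebuilds a local read model by wiping it and running every event in the stream through the same handler used for live consumption. The EventStream and ReadModel interfaces are assumptions made up for this sketch, not the API of any particular product.

```java
import java.util.List;

// Illustrative types: an ordered, re-readable stream of events and a local
// read model we can rebuild from scratch. Both are assumptions for this
// sketch, not the API of Kafka, EventStore or any specific library.
interface EventStream {
    List<Event> readAllFromBeginning();
}

interface ReadModel {
    void clear();            // drop the corrupted or outdated local data
    void apply(Event event); // same handler used for live consumption
}

record Event(String type, String payload) {}

class ReplayingAggregator {

    // Fixing or reshaping the local store is no longer a manual "take-on":
    // wipe the read model and re-apply the ordered stream from the start.
    static void rebuild(EventStream stream, ReadModel readModel) {
        readModel.clear();
        for (Event event : stream.readAllFromBeginning()) {
            readModel.apply(event);
        }
    }
}
```

Adding a new aggregator is then just another consumer running the same kind of rebuild from position zero, with no changes on the publishing side.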

Kafka and Kafka Streams

Kafka and Kafka Streams provide streaming out of the box. They can simply be bolted onto these aggregators and used as a medium for storing and publishing streams of data. Data from these streams can either be stored externally or kept within Kafka Streams, which offers a choice of a RocksDB store or an in-memory hash map. For more information, you can read this article: https://www.confluent.io/blog/event-sourcing-cqrs-stream-processing-apache-kafka-whats-connection/
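
As a rough sketch of how an aggregator could be bolted onto Kafka Streams, the snippet below materialises a topic of order events into a local state store; the topic and store names are made up for illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;

import java.util.Properties;

// Minimal Kafka Streams sketch: consume an "order-events" topic (name is an
// assumption for this example) and keep the latest value per order key in a
// local state store.
public class OrderAggregatorApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Reading the topic as a table keeps the latest event per key in the
        // "orders-store" state store, which the aggregator can query locally.
        builder.table("order-events", Materialized.as("orders-store"));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Shut down cleanly when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

The local store (RocksDB-backed by default) can then serve the aggregator’s queries, or the events can be forwarded to an external database instead; the linked article above discusses those options in more depth.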

AtomPub

An alternative to Kafka is AtomPub, or Atom feeds, which is what GetEventStore uses. Consuming Atom feeds can be a tad confusing but, fortunately, there are libraries to help with that. Additionally, events are immutable and therefore the Atom feed can be safely cached indefinitely, as explained on the GetEventStore website:

AtomPub is a RESTful protocol that can reuse many existing components, for example Reverse Proxies and client’s native HTTP caching. Since events stored in the Event Store are entirely immutable, cache expiration can be infinite.
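
For a flavour of what consuming such a feed involves, here is a bare-bones polling sketch using only the JDK’s HTTP client. The stream URL, the Accept header and the fixed polling interval are assumptions for illustration; a real consumer (or one of the libraries mentioned above) would parse the feed, follow its paging links and checkpoint the last entry it processed.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Bare-bones Atom feed poller. The URL and polling interval are illustrative;
// real consumers would parse the Atom XML, follow its paging links and
// remember the last entry they processed rather than re-reading the page.
public class AtomFeedPoller {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Hypothetical stream URL on a locally running event store.
        URI feedUri = URI.create("http://localhost:2113/streams/orders");

        while (true) {
            HttpRequest request = HttpRequest.newBuilder(feedUri)
                    .header("Accept", "application/atom+xml")
                    .GET()
                    .build();

            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());

            // In a real consumer this is where each new <entry> would be
            // applied to the local read model.
            System.out.println("Fetched feed page, " + response.body().length() + " bytes");

            Thread.sleep(Duration.ofSeconds(5).toMillis());
        }
    }
}
```

Because the events are immutable, already-read feed pages can be cached aggressively, by clients and reverse proxies alike, which is exactly the property the quote above points out.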

But… What About Data Duplication?

There are genuine cases where data duplication is the best solution, and that’s fine as long as you respect the immutability of that data. Take searching, for instance: how else can you provide users with the ability to search for everything in your system in every foreseeable configuration? Say you’re a retailer and would like to let your users not only search your products and filter them in great detail - price range, size, colour and so on - but also get recommendations based on the buying patterns of other customers who viewed similar items. You can try to hit all the underlying services on the spot, aggregate the data on the fly, store it in memory (temporary duplication, I guess) and pray that your customers are benevolent souls who will just grab a cup of coffee and wait patiently every time they want to search for products. And that recommendation feature? Forget about it; tell that clueless product manager that this is not Amazon. Or you could simply let a recommendation service and a search service derive their data from a stream, transform it and store it in a data store optimized for their usage - a graph database for recommendations and ElasticSearch for searching - and everyone lives happily ever after. There is no need for these services to alter the data in any way, as they are built for reads, and thus immutability is preserved. Pat Helland has an excellent talk about how immutability changes everything that delves deeper into this.