Monthly Archives: September 2015

Persistence in CQRS Read Models

One of the biggest benefits of CQRS is the ability to implement multiple read models. Business rules and the domain model are safe, clean and isolated over in the write model. They do not get in the way of the view models. The view models can selectively pick the pieces they are interested in, freely reshape them, and do everything in a way that calls for a different kind of elegance and clarity than the domain model. The read models are all about query performance and convenience.

Put simply, CQRS is a practical implementation of what Pat Helland described in his paper on immutability ("Immutability Changes Everything"): "The truth is the log. The database is a cache of a subset of the log." Let’s have a look at some consequences of this approach.

Persistent Models in Relational Databases

Perhaps the most obvious way to implement a read model is in a traditional SQL database. The technology has been around for decades, is really mature and battle-tested, and everyone is familiar with it.

However, in the CQRS world we can do things that would be problematic in a typical application database schema. Since we optimize for read convenience and performance, the data is very often denormalized. It can happen in a number of ways:

  • There may be fields combining data from some other fields (e.g. a single text field with human-friendly street address).
  • The same data may be present in more than one place or more than one table. For example, it may make sense to have a human-readable street address in a single column, but at the same time keep state and city in separate columns (or tables).
  • Sometimes it also makes sense to keep multiple revisions or change history of an entity, not just the final version.
  • Another good example of denormalized data is analytics (like OLAP cubes).
  • The data does not have to be relational. You don’t need to map fields in a Java object to columns, or map a graph of objects to a number of tables. You can just serialize the whole thing (to JSON, XML, using native Java serialization etc.) and put it in a blob field.
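The first two points can be illustrated with a small sketch. A projection computes a display-ready address once, while processing the event. It also keeps city and state in their own fields so that queries can filter on them. The `AddressChanged` event and the `CustomerAddressRow` type are hypothetical names chosen for the example. They do not come from the original system.

```java
import java.util.List;

// Hypothetical domain event carrying the normalized address parts
record AddressChanged(String customerId, String street, String city, String state, String zip) {}

// Hypothetical read-model row: the same data is stored twice, once pre-formatted
// for display and once split into columns that queries can filter on
record CustomerAddressRow(String customerId, String displayAddress, String city, String state) {

    // Runs once, while processing the event, instead of on every query
    static CustomerAddressRow from(AddressChanged e) {
        String display = String.join(", ", List.of(e.street(), e.city(), e.state() + " " + e.zip()));
        return new CustomerAddressRow(e.customerId(), display, e.city(), e.state());
    }
}
```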

Why Denormalize?

You may be wondering why one would be using a denormalized schema in the first place. The answer is: to do more computation ahead of time, while processing events and while no human is waiting for the answer. It means less computation at the moment a human is waiting for the answer.

The reason for this is that human time is expensive and getting more so, while computing power and storage are already extremely cheap and getting even cheaper. It is worth doing potentially a lot of computation in advance to save a human a little bit of time.

How Much Denormalization?

The degree of denormalization depends mainly on performance and query complexity. A fully normalized schema has its benefits, but also many drawbacks. Numerous joins, calculations and filters quickly become tricky to write and maintain. They can also become a performance nightmare, for example with joins between massive tables. Even if you’re not joining many thousands of rows, nontrivial calculations can keep the users waiting.

Denormalization can be used to prepare the answers to queries ahead of time. Suppose a query needs data that would normally live in several large tables. That data can be combined once (in an asynchronous projection) and then looked up in constant or logarithmic time when the users need it. It may even be possible to go to the extreme and precalculate responses to all common queries, which eliminates the need for higher-level caching. In this case the view model is the cache.
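Here is a minimal in-memory sketch of preparing an answer ahead of time, assuming a hypothetical `OrderPlaced` event. The projection keeps a running total per customer. Answering "how much has this customer spent?" then takes one hash lookup instead of a join and an aggregation over all orders. A real read model would more likely keep these totals in a table keyed by customer ID.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical event emitted by the write model
record OrderPlaced(String orderId, String customerId, BigDecimal amount) {}

// Read model that answers "total spent per customer" ahead of time
class CustomerTotalsProjection {
    private final Map<String, BigDecimal> totals = new HashMap<>();

    // Runs while processing events, when no user is waiting
    void on(OrderPlaced e) {
        totals.merge(e.customerId(), e.amount(), BigDecimal::add);
    }

    // Query side: a single expected-O(1) lookup, no joins or aggregation
    BigDecimal totalFor(String customerId) {
        return totals.getOrDefault(customerId, BigDecimal.ZERO);
    }
}
```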

It’s necessary to look for balance here, though. Overly aggressive denormalization can lead to poor maintainability related to code duplication, as well as increase the sheer volume of data (in terms of bytes).

Other Persistent Solutions

If the data doesn’t have to be relational, or if it can be denormalized, it may be a good idea to put it in a different kind of database. There is a wide range of NoSQL options to choose from, with the most obvious candidates being document and key-value stores.

We don’t have to stop there, though: if the data could benefit from a graph database, there are no obstacles. Another great example of a view model is a search index such as Lucene.

Such stores often have their downsides. They may be trading off consistency for availability and performance. They may be very specialized or limited to particular models (graphs, documents, key-values etc.). It makes them challenging or even inapplicable as the primary persistence mechanism in a typical non-CQRS read/write model. However, they may be perfectly acceptable in a CQRS view model, and the advantages make the whole thing even more powerful.

In-Memory

Another idea we have been considering is in-memory models. Writing to and reading from disk is slow, and if the data fits in RAM, why not just keep it in memory, in ordinary data structures in your language of choice?

There are some challenges:

  • If the event store is large enough, reading and consuming it may take a relatively long time and lots of resources. The limit may be farther away than it first seems, but it certainly is something that has to be carefully thought through.
  • It needs to be transactional. It’s unacceptable for data to change while a query is reading it. You may also need to roll back, and that is far from trivial. It’s much easier in languages that support transactional memory or persistent data structures (like Clojure). Elsewhere you would probably need a library that provides such functionality.

These challenges could be solved by using persistent, transactional storage:

  • When consuming domain events, update an in-memory model. Don’t touch the disk.
  • Every now and then (e.g. every 1000 events or every minute) take a snapshot of that model and write it to some persistent storage.
  • Let queries read from that persistent snapshot, possibly caching it in memory.
  • After restarting the application or an error, continue consuming the events from the latest snapshot.
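The steps above can be sketched in Java, assuming an immutable model so that a query never observes a half-applied event. The names `SnapshottingProjection`, `SnapshotStore` and `snapshotEvery` are hypothetical. The persistent storage is left as an interface that could be backed by a file or a database. In this variant, queries read the latest published snapshot from memory, which is the caching option mentioned in the steps.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

// Immutable snapshot: the model plus the ID of the last event folded into it
record Snapshot<M>(M model, long lastEventId) {}

// Hypothetical persistence hook; a real one would write to disk or a database
interface SnapshotStore<M> {
    void save(Snapshot<M> snapshot);
    Optional<Snapshot<M>> loadLatest();
}

class SnapshottingProjection<M, E> {
    private final BiFunction<M, E, M> apply;  // pure: returns a new model, never mutates
    private final SnapshotStore<M> store;
    private final int snapshotEvery;
    private final AtomicReference<Snapshot<M>> published;
    private M current;                        // touched only by the single writer thread
    private long lastEventId;
    private int sinceSnapshot = 0;

    SnapshottingProjection(M empty, BiFunction<M, E, M> apply, SnapshotStore<M> store, int snapshotEvery) {
        this.apply = apply;
        this.store = store;
        this.snapshotEvery = snapshotEvery;
        // After a restart, resume from the latest snapshot instead of from scratch
        Snapshot<M> start = store.loadLatest().orElse(new Snapshot<>(empty, 0L));
        this.current = start.model();
        this.lastEventId = start.lastEventId();
        this.published = new AtomicReference<>(start);
    }

    // Writer side: update memory only; persistent storage is touched every N events
    void handle(long eventId, E event) {
        current = apply.apply(current, event);
        lastEventId = eventId;
        if (++sinceSnapshot >= snapshotEvery) {
            Snapshot<M> s = new Snapshot<>(current, lastEventId);
            store.save(s);
            published.set(s); // queries switch to the new snapshot atomically
            sinceSnapshot = 0;
        }
    }

    // Reader side: always a consistent, fully applied snapshot
    Snapshot<M> query() {
        return published.get();
    }

    // Where event consumption should continue from
    long resumeAfter() {
        return lastEventId;
    }
}
```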

It’s getting close to persistent projections, but there are important differences. In this case persistence is only used for isolation and a way to resume from the savepoint after restart. Disk IO can happen asynchronously or less frequently, without slowing down the writer and queries.

Data Retention

Most queries are only interested in relatively recent data. Some may need a year or two, while others may only be interested in the last week. With the source data safe on the domain side, the read models are free to keep as little as they need. This can have a huge positive impact on their performance and storage requirements.

It’s also possible to have a number of models with an identical schema but different data retention. Use the smallest data set as much as possible for the best responsiveness. Keep the ability to fall back to a bigger data set for the occasional query about the distant past, where a longer response time is acceptable.

This approach can be combined with different granularity: Keep all the details for the last few weeks or months, and aggregate or narrow down for the longer time period.
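Here is one way to combine retention with coarser granularity. The sketch assumes a hypothetical `Sale` event with a date and an amount. Recent events are kept individually. A periodic `compact` step folds anything older than the cutoff into per-day totals. The class name and the window length are illustrative choices only.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event: one sale on a given day
record Sale(LocalDate date, long amountCents) {}

class SalesHistoryModel {
    private final int detailDays;                                     // keep full detail this long
    private final List<Sale> recent = new ArrayList<>();              // fine-grained, short retention
    private final TreeMap<LocalDate, Long> dailyTotals = new TreeMap<>(); // coarse, long retention

    SalesHistoryModel(int detailDays) {
        this.detailDays = detailDays;
    }

    void on(Sale sale) {
        recent.add(sale);
    }

    // Run periodically (e.g. nightly): aggregate details older than the window
    void compact(LocalDate today) {
        LocalDate cutoff = today.minusDays(detailDays);
        for (Iterator<Sale> it = recent.iterator(); it.hasNext(); ) {
            Sale s = it.next();
            if (s.date().isBefore(cutoff)) {
                dailyTotals.merge(s.date(), s.amountCents(), Long::sum);
                it.remove();
            }
        }
    }

    List<Sale> recentSales() {
        return List.copyOf(recent);
    }

    Map<LocalDate, Long> dailyTotals() {
        return new TreeMap<>(dailyTotals); // defensive copy, still ordered by date
    }
}
```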

Wrapping Up

NoSQL stores, analytics, search indexes, caches etc. are all very popular and useful tools, and very often they are used in a way resembling CQRS without acknowledging it. Whether they’re populated with triggers, messaging, polling or ETL, the end result is a new, specialized, read-only view on the data.

However, the more mature and the bigger the project, the harder it is to introduce such things. It may become prohibitively expensive, with missed opportunities eventually leading to many problems down the road.

It’s much, much easier if you have CQRS from the beginning. The domain model is kept safe and clean elsewhere, as is the ultimate source of data (like the event store). The data is easily available for consumption (especially with event sourcing). All it takes to spin off a view model is to plug in another consumer of the domain events.

The view models are very good candidates for innovation, too. It’s really easy to try various kinds of databases and programming languages, as well as different ways of solving problems with the same tools.

This post also appeared on the Oasis Digital blog.

Writing an Event-Sourced CQRS Read Model

Discussions about event sourcing and CQRS usually seem to focus on the overall system architecture or on various flavors of domain-driven design in a CQRS context. The read models are often neglected, even though there are some interesting considerations on this side as well. In this post we’re going to present a sample implementation of populating a view model by consuming an event stream.

Overview

The idea of a read model is really simple. You take the event log, apply (replay) all the events on an initially empty data model using appropriate functions, and you get the populated model. The code could look like:

List<Event> events = getEvents();
Model model = Model.empty();
for (Event event : events) {
    apply(model, event);
}

We can make this even shorter with functional programming:

Model model = reduce(getEvents(),
                     Model.empty(),
                     (m, e) -> apply(m, e)); // here apply returns the updated model

That is the essence. Note that it is just an abstract outline. A realistic implementation is likely to differ and to include buffering, batching (or streaming), persistence etc.
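Here is one self-contained way to run the outline. It assumes an immutable model and an `apply` function that returns the updated model instead of mutating it. `Model`, `Event` and the event-counting logic are placeholders, not the post’s actual types.

```java
import java.util.List;
import java.util.function.BiFunction;

// Placeholder event and model types, purely for illustration
record Event(String type) {}

record Model(int eventCount) {
    static Model empty() {
        return new Model(0);
    }
}

class Projections {
    // Pure transition function: old model + event -> new model
    static Model apply(Model m, Event e) {
        return new Model(m.eventCount() + 1);
    }

    // Left fold over the event log, strictly in order
    static <M, E> M reduce(List<E> events, M initial, BiFunction<M, E, M> f) {
        M acc = initial;
        for (E e : events) {
            acc = f.apply(acc, e);
        }
        return acc;
    }
}
```

Usage: `Model model = Projections.reduce(events, Model.empty(), Projections::apply);`. `Stream.reduce` could also be used. However, its three-argument form requires a combiner for parallel streams, which adds little for an inherently sequential fold. That is why an explicit loop is used here.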

Applying Events

The actual Java code to apply the events may look similar to the following:

EventProcessingResult processEvents() {
    if (getState().isRunning()) {
        int batchSize = getEventsPerIteration();
        List<Event> events = eventStore.getEventsForAllStreams(getLastEventId(),
                                                               batchSize);
        if (events.isEmpty()) {
            return NO_EVENTS_TO_PROCESS;
        } else {
            return processEvents(events);
        }
    } else {
        return NOT_RUNNING;
    }
}

EventProcessingResult processEvents(List<Event> events) {
    try {
        for (Event event : events) {
            dispatchEvent(event);
        }
        return SUCCESS;
    } catch (RuntimeException e) {
        return FAILURE;
    }
}

All in all it’s really simple and straightforward. It is possible to enhance it with hooks before and after processing individual events and the entire batch. Such hooks could be used to:

  • implement transactions,
  • plug in monitoring,
  • implement error handling,
  • calculate the batch size depending on speed,
  • perform arbitrary operations, e.g. setting something up or recalculating once per batch.
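Here is a minimal sketch of such hooks. It assumes a hypothetical `ProjectionHooks` interface whose methods are no-ops by default, and the engine calls them around each event and each batch. The timing hook is just one example of plugging in monitoring, and it could feed a batch-size calculation. Transactions could be started in `beforeBatch` and committed or rolled back in `afterBatch` in the same way.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical extension points; every method is optional (no-op by default)
interface ProjectionHooks<E> {
    default void beforeBatch(List<E> batch) {}
    default void beforeEvent(E event) {}
    default void afterEvent(E event) {}
    default void afterBatch(List<E> batch, boolean success) {}
}

class HookedProcessor<E> {
    private final Consumer<E> dispatcher;
    private final ProjectionHooks<E> hooks;

    HookedProcessor(Consumer<E> dispatcher, ProjectionHooks<E> hooks) {
        this.dispatcher = dispatcher;
        this.hooks = hooks;
    }

    // Returns true on success, false if any event handler threw
    boolean process(List<E> batch) {
        hooks.beforeBatch(batch);            // e.g. begin a transaction
        try {
            for (E event : batch) {
                hooks.beforeEvent(event);
                dispatcher.accept(event);
                hooks.afterEvent(event);
            }
            hooks.afterBatch(batch, true);   // e.g. commit, record metrics
            return true;
        } catch (RuntimeException e) {
            hooks.afterBatch(batch, false);  // e.g. roll back, raise an alert
            return false;
        }
    }
}

// Example hook: measures how long each batch takes
class TimingHooks<E> implements ProjectionHooks<E> {
    private long startNanos;
    long lastBatchNanos;

    @Override
    public void beforeBatch(List<E> batch) {
        startNanos = System.nanoTime();
    }

    @Override
    public void afterBatch(List<E> batch, boolean success) {
        lastBatchNanos = System.nanoTime() - startNanos;
    }
}
```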

The last interesting piece is the dispatchEvent method. Aside from walking the type hierarchy, error handling and making it all optional, it boils down to:

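void dispatchEvent(Event e) {
    try {
        Method handler = projector.getClass().getMethod("on", e.getClass());
        handler.invoke(projector, e);
    } catch (ReflectiveOperationException ex) {
        throw new IllegalStateException(ex); // reported as FAILURE by processEvents
    }
}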

In other words, for each event type (like OrderCreated), we look on a projector object for a public method named "on" that takes a single argument of the matching type.

All of the above is part of an engine, a piece of infrastructure backing many view models. To implement a projection, all you need to do is provide the projector, with handlers for the event types it is interested in. All other events are simply ignored.

It could look like this:

public class OrderProjector {
    @Inject
    private OrderDao orders;

    public void on(OrderCreated e) {
        orders.save(new Order(e.getOrderNumber()));
    }

    public void on(OrderApproved e) {
        Order o = orders.find(e.getOrderNumber());
        o.setApproved(true);
    }
}
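The simplified `dispatchEvent` leaves out two things: walking the type hierarchy and ignoring events that have no handler. Here is one possible sketch of both using plain `java.lang.reflect`. It is an illustration under stated assumptions, not the engine’s actual code. It looks up handlers once per event class and caches them, and it only walks superclasses, not interfaces. The event hierarchy and `AuditProjector` at the bottom are hypothetical and exist only for the usage example.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class ReflectiveDispatcher {
    private final Object projector;
    // Handler lookup happens once per event class, not once per event
    private final Map<Class<?>, Optional<Method>> cache = new ConcurrentHashMap<>();

    ReflectiveDispatcher(Object projector) {
        this.projector = projector;
    }

    void dispatch(Object event) {
        Optional<Method> handler = cache.computeIfAbsent(event.getClass(), this::findHandler);
        if (handler.isEmpty()) {
            return; // no handler: the projector is not interested in this event type
        }
        try {
            handler.get().invoke(projector, event);
        } catch (InvocationTargetException e) {
            // The handler itself threw; unwrap so callers see the real cause
            throw new IllegalStateException("Handler failed for " + event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    // Walk up the superclass chain: on(OrderCreated), then on(OrderEvent), ...
    private Optional<Method> findHandler(Class<?> type) {
        for (Class<?> t = type; t != null; t = t.getSuperclass()) {
            try {
                return Optional.of(projector.getClass().getMethod("on", t));
            } catch (NoSuchMethodException ignored) {
                // no exact match for this type; try the superclass
            }
        }
        return Optional.empty();
    }
}

// Illustrative (hypothetical) event hierarchy and projector
abstract class OrderEvent {}
class OrderCreated extends OrderEvent {}
class OrderCancelled extends OrderEvent {}
class PaymentReceived {}

class AuditProjector {
    final List<String> log = new ArrayList<>();

    public void on(OrderCreated e) { log.add("created"); }

    public void on(OrderEvent e) { log.add("other order event"); }
}
```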

Projection Thread

Let’s discuss multi-threading for a moment. Shared mutable state immediately brings numerous problems and should be avoided as much as possible. One of the ways to deal with it is not having concurrency in the first place, e.g. by limiting writes to a single thread. In most cases a single-threaded writer combined with ACID transactions is more than enough to keep up with the write load. (The read/query load can be heavy and use many threads – all of the details here are only about the writes.)

The thread is responsible for applying the events to the read model, all the way from querying the event store to updating the view model database. Normally it just loads batches of events from the store and applies them. It continues as long as there are more events to process, and goes to sleep after it’s caught up. It wakes up after a certain amount of time or when notified about new events by the event store.

We also have some control over this thread’s life cycle. For example, we have a way to programmatically pause and resume each projection’s thread, even exposed in an admin GUI.
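Here is a sketch of such a thread. `BatchSource` is a hypothetical stand-in that loads the next batch from the event store, applies it to the read model, and returns how many events it handled. The loop drains batches until it has caught up. It then sleeps until a timeout elapses or a notification arrives. A `Semaphore` serves as the wake-up signal so that a notification sent while the thread is busy is not lost. `pause` and `resume` only flip a flag that is checked between batches.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical: processes the next batch and returns how many events it applied
interface BatchSource {
    int processNextBatch();
}

class ProjectionThread implements Runnable {
    private final BatchSource source;
    private final long idleTimeoutMillis;
    private final Semaphore wakeUp = new Semaphore(0);
    private volatile boolean paused = false;
    private volatile boolean stopped = false;

    ProjectionThread(BatchSource source, long idleTimeoutMillis) {
        this.source = source;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Called e.g. by the listener for event store notifications
    void notifyNewEvents() { wakeUp.release(); }

    void pause()  { paused = true; }
    void resume() { paused = false; notifyNewEvents(); }
    void stop()   { stopped = true; notifyNewEvents(); }

    // Process batches until there is nothing left (or paused); returns events applied
    int catchUp() {
        int total = 0;
        int n;
        while (!paused && !stopped && (n = source.processNextBatch()) > 0) {
            total += n;
        }
        return total;
    }

    @Override
    public void run() {
        try {
            while (!stopped) {
                catchUp();
                // Sleep until notified or until the timeout, whichever comes first
                wakeUp.tryAcquire(idleTimeoutMillis, TimeUnit.MILLISECONDS);
                wakeUp.drainPermits(); // coalesce several notifications into one pass
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```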

Push or Pull?

With a database-backed event store, it’s very easy to query repeatedly for new events. This is the pull model. Unfortunately, it also means that you may end up polling too often and generating needless load, or polling too infrequently and thus possibly taking longer to propagate changes to the view model.

That’s why in addition to polling the event store it’s a good idea to introduce notifications that wake up the read models as soon as new events are saved. This effectively becomes a push model with minimal delays and load. We found JGroups to be a very good tool for the job – it supports multiple protocols and is very easy to set up, involving much less hassle than a full-blown message queue.

The notifications may or may not contain actual events.

In the latter (and simpler) design, they only spread the information that a new event has been saved, along with its sequential ID (so that all projections can estimate how far behind they are). When awakened, the executor continues along its normal path, starting with querying the event store.

Why? Because handling events coming from a single source is easier, but more importantly because a DB-backed event store trivially guarantees ordering and has no issues with lost or duplicate messages. Querying the database is very fast, given that we’re reading a single table sequentially by primary key, and most of the time the data is in RAM cache anyway. The bottleneck is in the projection thread updating its read model database.

However, there are no obstacles to putting event data in the notifications (except for maybe size or network traffic considerations). It would likely decrease the load on the event store and save some round trips to database. The projector would need to maintain a buffer and fall back to querying the event store when needed. Or the system could use a more reliable message queue.
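If notifications did carry event data, the projector would have to cope with lost, duplicated and reordered messages. Here is one possible shape, assuming notifications carry `(eventId, payload)` pairs. Out-of-order events are buffered and applied strictly in ID order. When a gap remains, the projector falls back to the event store. `StoredEvent`, `StoreQuery` and `BufferingConsumer` are hypothetical names, and `StoreQuery` stands in for `getEventsForAllStreams`. Database sequences can leave holes, for example after rolled-back transactions, so a gap may be genuine. The store fallback also covers that case, because it simply returns whatever events exist.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical event shape: global ID plus payload
record StoredEvent(long id, String payload) {}

// Hypothetical stand-in for EventStoreReader.getEventsForAllStreams
interface StoreQuery {
    List<StoredEvent> eventsAfter(long afterEventId, int limit);
}

class BufferingConsumer {
    private final TreeMap<Long, StoredEvent> buffer = new TreeMap<>();
    private final StoreQuery store;
    private final Consumer<StoredEvent> apply;
    private long lastEventId;

    BufferingConsumer(long lastEventId, StoreQuery store, Consumer<StoredEvent> apply) {
        this.lastEventId = lastEventId;
        this.store = store;
        this.apply = apply;
    }

    // Called for each pushed notification; old or duplicate events are dropped
    void onNotification(StoredEvent e) {
        if (e.id() > lastEventId) {
            buffer.put(e.id(), e);
        }
        drainBuffer();
    }

    // Called when a gap persists (e.g. after a timeout): ask the store instead
    void fillGapFromStore() {
        for (StoredEvent e : store.eventsAfter(lastEventId, 1000)) {
            if (e.id() > lastEventId) {
                applyOne(e);
            }
        }
        buffer.headMap(lastEventId, true).clear(); // discard what the store already covered
        drainBuffer();
    }

    long lastEventId() {
        return lastEventId;
    }

    // Apply buffered events only while they continue the sequence without a gap
    private void drainBuffer() {
        while (!buffer.isEmpty() && buffer.firstKey() == lastEventId + 1) {
            applyOne(buffer.pollFirstEntry().getValue());
        }
    }

    private void applyOne(StoredEvent e) {
        apply.accept(e);
        lastEventId = e.id();
    }
}
```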

Restarting Projections

Aside from pause/resume, the admin GUI offers one more action: restart. Innocuous as it looks, it’s a really nice and powerful feature.

Since the view model is completely derived from the event log, at any time it can be thrown away and recreated from the beginning (or from some initial state/old enough snapshot). Data is safe in the event log, the ultimate source of truth.

It’s useful when anything about the view changes: a field or a table is added, a bug is fixed, something is calculated differently. When that happens, it’s often easier (or even required) to just start from the beginning, rather than, for example, implement a massive SQL migration script.

It’s even possible to go as far as fully automating it. When the system starts up and detects that the DB schema does not match the corresponding Java model, it can automatically recreate the schema and reprocess the event log. It’s like running with the Hibernate create-drop policy, except that it doesn’t lose data.
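Here is one way to sketch the automatic variant. It assumes the projection can describe its expected schema as DDL text and that the read-model database records the schema version it was built with. The version in this sketch is a SHA-256 fingerprint of the DDL. A mismatch, or a missing version, triggers drop, recreate and replay from the first event. `ReadModelStore` and `SchemaGuard` are hypothetical names. A real system might derive the schema from the Java model instead of hard-coded DDL.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.Optional;

// Hypothetical operations the read-model database would need to expose
interface ReadModelStore {
    Optional<String> storedSchemaVersion();
    void dropAndCreate(String ddl, String schemaVersion);
    void resetPosition(); // the next batch starts from the first event
}

class SchemaGuard {
    // Fingerprint of the schema the current code expects
    static String fingerprint(String ddl) {
        try {
            byte[] hash = MessageDigest.getInstance("SHA-256")
                    .digest(ddl.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java platform must support SHA-256
        }
    }

    // Returns true if the model was rebuilt and must replay the whole event log
    static boolean ensureSchema(ReadModelStore store, String expectedDdl) {
        String expected = fingerprint(expectedDdl);
        if (store.storedSchemaVersion().filter(expected::equals).isPresent()) {
            return false; // schema matches: resume from the last processed event
        }
        store.dropAndCreate(expectedDdl, expected);
        store.resetPosition();
        return true;
    }
}
```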

Performance

The solution may appear quite limited with regards to performance.

One point that could raise an eyebrow is the single-threaded writer. In reality a single thread is usually fast enough to easily keep up with the load. Concurrency is not only more difficult to implement and maintain, but it also introduces contention. Reads (queries) can be heavily multi-threaded and easy to scale out.

We also gain a lot by having multiple read models, for example separating analytics from administration and “transactional” data. Each model is single-threaded (for writing), but the multiple models consume events in parallel. Finally, the solution could be modified to use sharding or some kind of fork-join processing.

Another interesting point is restarting projections from scratch.

A good solution is something like the kappa architecture:

  • Keep the outdated projection up and running and answering all the queries.
  • Start a new projection, e.g. to another database. Just let it process the events, don’t point any traffic to it.
  • When the new projection catches up, redirect traffic and shut down the old one.
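The traffic switch in the last step can be as small as swapping an atomic reference. This sketch assumes that each projection instance exposes its last processed event ID through a hypothetical `ReadModel` interface. Queries always go to whichever instance is currently active. The replacement is promoted only once it has reached the given head of the event log. It keeps consuming events after the switch, so any events appended in the meantime are not lost.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical: any read model that knows how far it has processed the log
interface ReadModel {
    long lastProcessedEventId();
    void shutdown();
}

class ReadModelRouter<M extends ReadModel> {
    private final AtomicReference<M> active;

    ReadModelRouter(M initial) {
        this.active = new AtomicReference<>(initial);
    }

    // All queries go through here, so they keep hitting the old model until the switch
    M current() {
        return active.get();
    }

    // Promote the rebuilt model once it has caught up with the event store head
    boolean promoteIfCaughtUp(M candidate, long headEventId) {
        if (candidate.lastProcessedEventId() < headEventId) {
            return false; // still replaying; keep serving from the old model
        }
        M old = active.getAndSet(candidate);
        old.shutdown();
        return true;
    }
}
```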

On a very small instance, especially for development, it may even be possible to do a restart online, on the same instance. It depends on the answers to the following questions: How long does it take to reprocess all events? Is it acceptable for this projection to be stale for 30 minutes? Can we deploy at night or on a weekend, when nobody is using the system anyway? Do we have to replay all the history?

Another factor to consider here is persistence. If it’s too much of a bottleneck and cannot be further optimized, consider using in-memory view models.

Summing Up

In essence, that’s all it takes to implement a read model consuming an event store. It gains much simplicity thanks to a linear event store and processing everything in a single thread. So much that in the end it’s really just a loop, implementing the reduction shown in the beginning.

In future posts I am going to dig deeper into practical concerns of implementing projections.

This post also appeared on the Oasis Digital blog.

Achieving Consistency in CQRS with Linear Event Store

In a recent project involving an event-sourced CQRS system, we decided to do some things that seem somewhat unusual compared to solutions mostly talked about. However, they let us achieve some nice properties that would be hard (if possible at all) otherwise.

Event Store as Regular Table

We decided to implement the event store as a regular table in an RDBMS. We used PostgreSQL, but there is little PostgreSQL-specific here. We know this database to be very reliable, powerful and mature. On top of that, single-node ACID transactions provide some really nice benefits.

The table ended up with the following fields:

  • event_id (int) – primary key coming from a global sequence
  • stream_id (UUID) – ID of an event stream, typically a DDD aggregate
  • seq_no (int) – sequence number in history of a particular stream
  • transaction_time (timestamp) – transaction start time, the same for all events committed in one transaction
  • correlation_id (UUID)
  • payload (JSON)

Not all of them are mandatory for an event store, but there is one important and uncommon difference: event_id – globally, sequentially increasing number. We’ll get to that in a moment.

Can You Do It?

If you go for an event store in a regular DB table, getting a global event ID like this is extremely cheap. Databases are really efficient at generating, storing and indexing such columns. The only real question is whether you can afford to use a DB table for it in the first place.

The system we’ve been building does not face the wide web. It’s intended for internal use in companies, with hundreds or thousands of users. This is relatively low scale, something that Postgres will have no issue serving.

All in all, it’s not something I’d recommend if you were building the next Amazon. But chances are you aren’t, and so you may be able to afford the luxury of using simpler technology.

Benefits of Global, Sequential Event ID

Now that we have this peculiar event ID, what can we do with it?

Let’s have a look at the read interface of our event store:

public interface EventStoreReader {
    List<Event> getEventsForStream(UUID streamId, long afterSequence, int limit);
    List<Event> getEventsForAllStreams(long afterEventId, int limit);
    Optional<Long> getLastEventId();
}

The first method is pretty obvious and something you can find everywhere. We only use it to restore a single stream (aggregate) from the event store for handling a new command.

The other two use the event ID. One returns a batch of events after a particular event, and the other returns the ID of the last event. They are the foundation of our read models (projections).
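Here is a minimal in-memory implementation of the `EventStoreReader` interface above, which makes the contract of the two global-ID methods concrete and could perhaps serve in tests. The `Event` record and the `append` method are assumptions made for the example. In the system described here, the IDs come from a database sequence, not from a counter in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

// Assumed event shape, mirroring the event_id, stream_id and seq_no columns
record Event(long eventId, UUID streamId, long seqNo, String payload) {}

interface EventStoreReader {
    List<Event> getEventsForStream(UUID streamId, long afterSequence, int limit);
    List<Event> getEventsForAllStreams(long afterEventId, int limit);
    Optional<Long> getLastEventId();
}

class InMemoryEventStore implements EventStoreReader {
    private final List<Event> log = new ArrayList<>(); // ordered by eventId

    // Hypothetical write method: assigns the next global ID and per-stream sequence number
    synchronized Event append(UUID streamId, String payload) {
        long seqNo = log.stream().filter(e -> e.streamId().equals(streamId)).count() + 1;
        Event e = new Event(log.size() + 1L, streamId, seqNo, payload);
        log.add(e);
        return e;
    }

    @Override
    public synchronized List<Event> getEventsForStream(UUID streamId, long afterSequence, int limit) {
        return log.stream()
                .filter(e -> e.streamId().equals(streamId) && e.seqNo() > afterSequence)
                .limit(limit)
                .toList();
    }

    @Override
    public synchronized List<Event> getEventsForAllStreams(long afterEventId, int limit) {
        return log.stream().filter(e -> e.eventId() > afterEventId).limit(limit).toList();
    }

    @Override
    public synchronized Optional<Long> getLastEventId() {
        return log.isEmpty() ? Optional.empty() : Optional.of(log.get(log.size() - 1).eventId());
    }
}
```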

Read models are implemented by polling (with hints) the event store. They remember the ID of the last processed event. Every once in a while (or when awoken by a notification from the event store), they read the next batch of events from the store and process them in sequence, in a single thread.

This kind of linear, single-threaded processing is probably as simple as it can get, but it obviously has limited scalability. If you get 600 events per minute, it means on average you cannot be slower than 100 ms per event, no matter what. In reality you also need to consider overhead and leave some headroom, so it needs to be faster than that.

It can be addressed with sharding or parallelizing writes in the read model, but for the moment we did not find it necessary. Having multiple independent, specialized models running in parallel certainly helps with that.

By comparing the last-processed event ID of a projection to the current global maximum, you can immediately tell how far behind the projection is. It’s the logical equivalent of queue size.

The global sequence can also be used to mitigate the downsides of eventual consistency (or staleness).

Executing a command could return the ID of the last written event. Then a query can use this ID, requesting: “I’m fine waiting 5 seconds, but don’t give me the result if your data is older than this ID”. Most of the time it’s a matter of mere milliseconds. For that price, when a user makes a change, she immediately sees the results. And it’s the actual data coming from the server, not simulation achieved by duplicating domain logic in the user interface!
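Here is a sketch of how the "don't give me data older than this ID" query could be served. The projection thread publishes its progress, and query threads block with a timeout until that progress reaches the ID returned by the command. The `ProjectionProgress` class and its methods are hypothetical. The `lag` method is the queue-size-like comparison described earlier. A domain service that must wait for a projection to catch up to event X could call `awaitAtLeast` in the same way.

```java
import java.time.Duration;

class ProjectionProgress {
    private long lastProcessedEventId;

    // Called by the single projection thread after each committed batch
    synchronized void markProcessed(long eventId) {
        lastProcessedEventId = eventId;
        notifyAll(); // wake up any queries waiting for this point
    }

    // How far behind the event store head the projection is (like a queue size)
    synchronized long lag(long headEventId) {
        return Math.max(0, headEventId - lastProcessedEventId);
    }

    // Blocks until the projection has processed eventId or the timeout elapses.
    // Returns false if the data would still be older than requested.
    synchronized boolean awaitAtLeast(long eventId, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        try {
            while (lastProcessedEventId < eventId) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return false;
                }
                // Waits at most the remaining time; the loop re-checks after spurious wakeups
                wait(remainingNanos / 1_000_000, (int) (remainingNanos % 1_000_000));
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```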

It’s also useful on the domain side. We have some application and domain services that query some domain-specific projections (e.g. for unique checks). If you know that the last event in the event store is X, you can just wait until the projection catches up to that point before making further progress on the command. That’s all it takes to address many problems typically solved with a saga.

Last but not least, since all events are ordered, the projection is always consistent. It may be behind by a few seconds, or a few days, but it’s never inconsistent. It’s simply impossible to run into issues like having one stream processed until Monday, but another until Thursday. If something had happened before a particular event occurred, the same order is always maintained in the view model.

It makes the code a lot easier to write and maintain, and the state of the system a lot easier to reason about.

Scalability and Complexity Revisited

There is a tendency to use complex, high-scalability technology regardless of the actual customer requirements and realistic scale. Such tools have their place, but they’re not obvious winners, no golden hammers solving all problems. Moreover, they are really expensive, if you consider the complexity of development and operations, and their limits.

Sometimes a simpler tool will solve the problem well. Not only do you save on development and ops, but you also gain access to some really powerful tools that are impossible at high scale, including global counters, linearizability and ACID transactions.

Our example shows a system that was complex enough to warrant event sourcing with CQRS, but sufficiently low scale to allow use of a linear event store, even with linear projections, all in plain Postgres databases.

There are many good reasons to choose boring technology. If you innovate (and you should), be careful with why you actually do it, and don’t innovate in all areas at the same time.

This post also appeared on the Oasis Digital blog.

UPDATE: Discussion

There is a very interesting discussion of the post at Reddit.

Much attention goes to the use of an SQL database. It does slow things down, and it has many features you won’t use with an append-only log, but it also has everything you need. The limits themselves are high, at least in the thousands (or tens of thousands) of writes per second. We know very well how these databases behave, everyone knows how to use one, and you can find an admin right around the corner.

Regarding linear, single-threaded writes alone, they can scale even further. Single-threaded writes align well with hardware limitations, and at the same time are capable of processing millions of transactions per second (though not necessarily with an SQL database, yet). A good related example is the LMAX architecture. In any case, there is a good chance you don’t need multithreading or distributed systems.