Achieving Consistency in CQRS with Linear Event Store

In a recent project involving an event-sourced CQRS system, we made some decisions that are somewhat unusual compared to the commonly discussed solutions. However, they let us achieve some nice properties that would be hard (if possible at all) to get otherwise.

Event Store as Regular Table

We decided to implement the event store as a regular table in an RDBMS. We used PostgreSQL, but there is little PostgreSQL-specific here. We know this database is very reliable, powerful and simply mature. On top of that, single-node ACID transactions provide some really nice benefits.

The table ended up with the following fields:

  • event_id (int) – primary key coming from a global sequence
  • stream_id (UUID) – ID of an event stream, typically a DDD aggregate
  • seq_no (int) – sequence number in history of a particular stream
  • transaction_time (timestamp) – transaction start time, the same for all events committed in one transaction
  • correlation_id (UUID)
  • payload (JSON)

Not all of these fields are mandatory for an event store, but one is important and uncommon: event_id, a globally, sequentially increasing number. We’ll get to that in a moment.
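As a concrete sketch, the table could be defined in Postgres roughly like this (the exact types – BIGSERIAL, JSONB – and the per-stream unique constraint are illustrative assumptions, not the project’s actual DDL):

```sql
-- Sketch of the event store table. BIGSERIAL provides the global,
-- sequentially increasing event_id; transaction_timestamp() is constant
-- within a transaction, matching the transaction_time semantics above.
CREATE TABLE events (
    event_id         BIGSERIAL PRIMARY KEY,
    stream_id        UUID      NOT NULL,
    seq_no           INT       NOT NULL,
    transaction_time TIMESTAMP NOT NULL DEFAULT transaction_timestamp(),
    correlation_id   UUID,
    payload          JSONB     NOT NULL,
    UNIQUE (stream_id, seq_no)  -- also doubles as optimistic concurrency control per stream
);
```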

Can You Do It?

If you go for an event store in a regular DB table, getting a global event ID like this is extremely cheap. Databases are really efficient at generating, storing and indexing such columns. The only real question is whether you can afford using a DB table for it in the first place.

The system we’ve been building does not face the wide web. It’s intended for internal use in companies, with hundreds or thousands of users. This is relatively low scale, something that Postgres will have no issue serving.

All in all, it’s not something I’d recommend if you were building the next Amazon. But chances are you aren’t, and so you may be able to afford the luxury of using simpler technology.

Benefits of Global, Sequential Event ID

Now that we have this peculiar event ID, what can we do with it?

Let’s have a look at the read interface of our event store:

public interface EventStoreReader {
    List<Event> getEventsForStream(UUID streamId, long afterSequence, int limit);
    List<Event> getEventsForAllStreams(long afterEventId, int limit);
    Optional<Long> getLastEventId();
}

The first method is pretty obvious and something you can find everywhere. We only use it to restore a single stream (aggregate) from the event store for handling a new command.

The other two use the event ID: one returns a batch of events after a particular event, the other the ID of the last event. They are the basis of our read models (projections).

Read models are implemented by polling (with hints) the event store. They remember the ID of the last processed event. Every once in a while (or when awoken by a notification from the event store), they read the next batch of events from the store and process them in sequence, in a single thread.
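A minimal sketch of that loop, with the event store stood in by an in-memory list of global event IDs (the names and the stand-in are illustrative, not our actual code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Single-threaded projection poller, sketched against an in-memory "event
// store" that is just a sorted list of global event IDs.
class ProjectionPoller {

    // Stand-in for getEventsForAllStreams(afterEventId, limit): the next
    // batch of at most `limit` events with ID greater than `afterEventId`.
    static List<Long> nextBatch(List<Long> store, long afterEventId, int limit) {
        List<Long> batch = new ArrayList<>();
        for (long id : store) {
            if (id > afterEventId && batch.size() < limit) {
                batch.add(id);
            }
        }
        return batch;
    }

    // Process all pending events batch by batch, remembering the ID of the
    // last processed event between iterations. Returns that ID once caught up.
    static long drain(List<Long> store, long lastProcessed, int limit, LongConsumer apply) {
        while (true) {
            List<Long> batch = nextBatch(store, lastProcessed, limit);
            if (batch.isEmpty()) {
                return lastProcessed;  // caught up – go back to sleep until the next poll
            }
            for (long id : batch) {
                apply.accept(id);   // update the read model
                lastProcessed = id; // normally persisted together with the model update
            }
        }
    }
}
```

The last processed event ID would normally be stored in the same transaction as the read-model update, so a restarted projection resumes exactly where it left off.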

This kind of linear, single-threaded processing is probably as simple as it can get, but it obviously has limited scalability. If you get 600 events per minute, it means on average you cannot be slower than 100 ms per event, no matter what. In reality you also need to consider overhead and leave some headroom, so it needs to be faster than that.

It can be addressed with sharding or parallelizing writes in the read model, but for the moment we did not find it necessary. Having multiple independent, specialized models running in parallel certainly helps with that.

By comparing the last-processed event ID of a projection to the current global maximum, you can immediately tell how far behind the projection is. It’s the logical equivalent of queue size.

The global sequence can also be used to mitigate the downsides of eventual consistency (or staleness).

Executing a command could return the ID of the last written event. Then a query can use this ID, requesting: “I’m fine waiting 5 seconds, but don’t give me the result if your data is older than this ID”. Most of the time it’s a matter of mere milliseconds. For that price, when a user makes a change, she immediately sees the results. And it’s the actual data coming from the server, not simulation achieved by duplicating domain logic in the user interface!
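In code, the waiting side of such a query might look like this (a hypothetical helper; a real implementation would wait on a notification from the projection instead of a busy sleep):

```java
import java.util.function.LongSupplier;

// "Don't give me the result if your data is older than this event ID":
// block until the projection's last processed event ID reaches
// targetEventId, or give up after timeoutMs.
class ReadYourWrites {

    static boolean awaitCatchUp(LongSupplier lastProcessedEventId, long targetEventId, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (lastProcessedEventId.getAsLong() < targetEventId) {
            if (System.currentTimeMillis() >= deadline) {
                return false;  // projection too far behind – caller decides what to do
            }
            try {
                Thread.sleep(5);  // placeholder for waiting on a store notification
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

The command handler returns the ID of the last event it wrote; the query then calls a helper like this before reading the projection.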

It’s also useful on the domain side. We have some application and domain services that query some domain-specific projections (e.g. for unique checks). If you know that the last event in the event store is X, you can just wait until the projection catches up to that point before making further progress on the command. That’s all it takes to address many problems typically solved with a saga.

Last but not least, since all events are totally ordered, the projection is always internally consistent. It may be behind by a few seconds, or a few days, but it’s never inconsistent. It’s simply impossible to run into issues like having one stream processed up to Monday, but another up to Thursday. If something happened before a particular event occurred, the same order is always maintained in the view model.

It makes the code and the state of the system a lot easier to write, maintain, and reason about.

Scalability and Complexity Revisited

There is a tendency to use complex, high-scalability technology regardless of the actual customer requirements and realistic scale. Such tools have their place, but they are not obvious winners, no golden hammers solving all problems. Moreover, they are really expensive once you consider the complexity of development and operations, and their limits.

Sometimes a simpler tool will solve the problem well. Not only do you save on development and ops, but you also gain access to some really powerful tools that are impossible at high scale, including global counters, linearizability and ACID transactions.

Our example shows a system that was complex enough to warrant event sourcing with CQRS, but sufficiently low scale to allow use of a linear event store, even with linear projections, all in plain Postgres databases.

There are many good reasons to choose boring technology. If you innovate (and you should), be careful with why you actually do it, and don’t innovate in all areas at the same time.

This post also appeared on the Oasis Digital blog.

UPDATE: Discussion

There is a very interesting discussion of the post at Reddit.

Much attention goes to the use of an SQL database. It does slow things down, and it has many features you won’t use with an append-only log, but it also has everything you need. The limits themselves are quite high, at least in the thousands (or tens of thousands) of writes per second. We know very well how these DBs behave, everyone knows how to use one, and you can find an admin right around the corner.

Regarding linear, single-threaded writes alone, they can scale even further. Single-threaded writes align well with hardware limitations, and at the same time are capable of processing millions of transactions per second (though not necessarily with an SQL database, yet). A good related example is the LMAX architecture. In any case, there is a good chance you don’t need multithreading or distributed systems.

9 thoughts on “Achieving Consistency in CQRS with Linear Event Store”

  1. Hi,
    Great series of posts. I’m wondering how you achieve linearity in an example like this:

    one transaction inserting new events begins (reserving a sequential ID), a second transaction inserting other new events begins (reserving its own sequential ID), and the second transaction finishes first; only then does the first transaction succeed.

    As you can see, the timeline is unsettled. What do you think about this?

    Regards

  2. Dariol – I’m not sure what you mean by “reserving a sequential ID”. Let’s see if this helps though…

    The insert transactions are sequential. If two “upstream” producers want to insert a bunch of events each, first all events from one are inserted, then all from the other. So reading this later, you get a sequence of consistent “chunks”.

    How does the “upstream” do its business operations?

    * If two calculations are completely independent, the order doesn’t matter. They’re just independent.
    * Concurrent modifications of the same aggregate are blocked and forced to run sequentially. This can be achieved with locking (pessimistic or lease-based – we ended up with the latter).
    * If one thing has a strong dependency on another, we used the same locking mechanism in the application service. For example: “I am inserting a new user, and I don’t want any concurrent inserts of other users” (for uniqueness checks).

  3. @Konrad

    Example situation:
    One transaction creates events (event_id = 4, 5, 6) in the table but has not yet committed. A second transaction starts (adding new events with event_id = 7, 8) and commits first. The projections thread reads events in an optimized way (e.g. the READPAST hint in MS SQL Server): 1, 2, 3, 7, 8 (4, 5, 6 are not committed yet) and stores the last processed ID as 8.

    Then the first transaction commits.

    Now we have events from 1 to 8, but 4, 5, 6 were never processed by the projections.

    Can you spot the problem now?

  4. @Dariol – you’re right, this could happen. It may depend on the transaction isolation level and some DB internals. It should be extremely rare (depending on volume and concurrency), but it is possible.

    If it’s unacceptable, in the worst case it can be addressed with locks or single-threaded writer.
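    For instance, in Postgres a single advisory lock taken by every appending transaction would serialize the writers (a sketch, not what we actually deployed):

```sql
BEGIN;
-- Every appending transaction takes the same advisory lock, so inserts
-- (and therefore event_id assignment and commit order) are serialized.
-- The key 42 is an arbitrary, application-wide constant.
SELECT pg_advisory_xact_lock(42);
INSERT INTO events (stream_id, seq_no, payload)
VALUES ('00000000-0000-0000-0000-000000000001', 1, '{}');
COMMIT;  -- the lock is released automatically at transaction end
```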

  5. If this can happen at any point in time, then it is a serious problem. It’s hidden. It’s completely unacceptable, and I’m surprised that nobody writes about it.

    A single writer is one of the solutions that works well, without additional locks dragging your system’s performance down.

  6. Hi Konrad,

    interesting article!

    I see a problem with this: “Executing a command could return the ID of the last written event.”
    It seems this is breaking a CQRS principle: commands should not return values.

    We solve a similar problem with a different approach to address staleness of read models in the clients. We have a “concurrencyVersion” property in our Aggregates that we also put into the events and project into our read models. Now if a client has version 5 of a read model and sends a Command, this version is included, so the Aggregate can reject the Command because the client (user) made their decision on stale data. The client can also poll the read model until it reaches version 6 or higher. This is just guessing the next version, so sometimes it’s not right:

    a) another command for this Aggregate could have been handled first
    b) some commands result in multiple events

    So we’d need to do basically the same as you suggest, return the concurrencyVersion from the last event that was caused by this command.
    Just sacrifice the CQRS principle for this purpose?
    Or do you think it’s ok because it’s not data from a business entity?

  7. Anton, thanks for the comment.

    Like many things in software, CQRS is not a very strict rule book, more like a collection of patterns. Commands are not “fire and forget”. You mention rejection, that’s already a response to the command (and a perfectly valid and common one).

    All these cases about concurrency and consistency are really addressed individually. It’s often recommended that you are more lenient with accepting commands (in favor of throughput, and because everything may be in flux all the time) – but then you have to deal with consistency later. Depending on domain, it may be OK to just accept most commands. Then you can just apply them all (if they’re commutative or otherwise non-conflicting), or keep all “versions” and show to the user, or employ some kind of conflict resolution (automatic or manual).

    We did something like that in a few places.

    In other places we ended up putting “revision number” in the domain model, letting the client see it, and requiring the client to include it in the update command. So the semantics are: “This is what I saw on the screen when I made this update, let me know if anything has changed in the meantime so I can review my changes on top of the new version.”

    The event ID described in the article is a third option. It’s something from the meta level, a global clock for the entire system. Something that lets the client wait for the changes to propagate to the read model, so their query returns data that is guaranteed to have their command applied.

    I think all of these are fine with CQRS. At the end of the day, we have business problems to solve. If it’s critical (or even very convenient down the road) that something is serialized or manually resolved, just do it.
