All posts by Konrad Garus

Environment-specific modules, services and components in Angular

Sometimes your Angular application needs to be a little bit different depending on the environment. Maybe your local development build has some API services stubbed out. Or you have two production destinations with slightly different behavior. Or you have some other build-time configuration or feature toggles.

The differences can be on any level: services, components, suites of ngrx/effects, or entire modules.

The dependency injection and module mechanism in Angular is pretty basic, and it does not seem to offer much to answer such use cases. If you want to use ahead-of-time (AOT) compilation (and you should!), you can’t just put random functions in the module definition. I found a solution for doing it one service at a time, but it’s pretty ugly and not flexible enough. The problem doesn’t seem at all uncommon to me, though, so I wanted to describe a nice little trick to work around it.

Sample app

Let’s study this on a sample app. To keep things interesting, let’s make a realistic simulator of various organization hierarchies.

We’ll need a component to tell us who rules the “organization”, backed by a service that provides that information.

Now, let’s say we have two environments:

  • Playground, where little Steve would really like to own the place… except that he can only do that when the local bully is not around, and that’s only when he’s eating. To determine the ruler we need to ask the Bully’s mother about his status. So it goes.
  • Wild West, where the rules are much simpler in comparison: It’s the sheriff who runs the show, period.

In other words, each environment needs its own implementation of the same service, deciding who the ruler is in its own way.

So, how to achieve that?

Solution

The solution is actually pretty straightforward. All it takes is a little abuse (?) of Angular CLI environments. Long story short, this mechanism provides a different environment file to the compiler, selected by a compile-time flag.

If you check the documentation or pretty much any existing examples, the environments are typically used to provide different configuration: just a single object with a bunch of properties. You might be tempted to use it in some functions in the module definition, but that’s probably not going to work with AOT. Oops.

However, at the end of the day it’s just a simple file substitution. You can put whatever you like in the file, and as long as it compiles everything is going to be OK. Classes. Exports from other files. Anything.

In our case, the AppModule can import the RulersModule from the environment. It doesn’t care much what the module actually contains.

The environments would export it from the relevant “package”. They could have the classes inline, but I prefer to keep them in separate files closer to the application.

Each environment then has its own RulersModule, providing the service implementation appropriate for that environment.

The final trick that makes all of this work is that there is an abstract class/interface for the RulersService, and AppComponent (or other services, if we had them) only depends on that. Actually, this pattern has been around for a while: it’s the good old programming to interfaces.

The same goes for the RulersModule: in a sense it’s abstract too. AppModule doesn’t know or care which concrete module it gets, as long as a symbol with that name exists at compile time.

This sample demonstrates it on the module level, but you can use the same trick for any TypeScript code, be it a component, a service, etc.

I am not sure if such use of environments is ingenious, or insane and abusive. I have not seen this anywhere. However, it does solve a real problem that does not seem to have a solution in the Angular toolbox (and it’s not particularly toxic).

Sample app on GitHub

Check out this sample app at my GitHub repository.

Credits

The idea comes from Kyle Cordes. I only made it work.

classycle-gradle-plugin update: works with Gradle 2.14.1 and Java 7

A while ago I released classycle-gradle-plugin, enabling easy integration of Classycle into Gradle builds. Classycle is a really great tool for static code analysis, especially checking for unwanted dependencies, cycles, etc.

I’ve just released a new version of the plugin. Since 1.2, it should work without problems with Gradle 2.14.1 (tested with 3.2.1 as well) and Java 7. Happy hacking, and thanks a ton to GitHub contributors!

Rapid Development with Hibernate in CQRS Read Models

In this post I’m going to share a few tricks for using Hibernate tooling in CQRS read models for rapid development.

Why Hibernate?

Hibernate is extremely popular. It’s also deceptively easy on the outside and fairly complex on the inside. It makes it very easy to get started without in-depth understanding, to misuse it, and to discover problems when it’s already too late. For all these reasons, it’s rather infamous these days.

However, it is still a solid and mature piece of technology: battle-tested, robust, well-documented and with solutions to many common problems out of the box. It can make you *very* productive, even more so if you include the tooling and libraries around it. Finally, it is safe as long as you know what you’re doing.

Automatic Schema Generation

Keeping the SQL schema in sync with Java class definitions is a bit of a struggle. In the best case it’s a tedious and time-consuming activity, with numerous opportunities for mistakes.

Hibernate comes with a schema generator (hbm2ddl), but in its “native” form it is of limited use in production: it can only validate the schema, attempt an update, or export it when the SessionFactory is created. Fortunately, the same utility is available for custom programmatic use.

We went one step further and integrated it with CQRS projections. Here’s how it works:

  • When the projection process thread starts, validate whether the DB schema matches the Java class definitions.
  • If it does not, drop the schema and re-export it (using hbm2ddl), then restart the projection so that it reprocesses the event store from the very beginning.
  • If it does match, just continue updating the model from the current state.

Thanks to this, we almost never type SQL table definitions by hand, which makes development a lot faster. It’s similar to working with hbm2ddl.auto = create-drop, except that using it in a view model does not actually lose data (which is safe in the event store). It’s also smart enough to only recreate the schema if it has actually changed – unlike the create-drop strategy.

Preserving data and avoiding needless restarts not only improves the development cycle. It may also make this approach usable in production, at least under certain conditions (see below).

There is one caveat: not all changes in the schema make the Hibernate validation fail. One example is changing a field’s length – as long as it’s varchar or text, validation passes regardless of the limit. Another undetected change is nullability.

These issues can be solved by restarting the projection by hand (see below). Another possibility is having a dummy entity that doesn’t store data, but is modified to trigger the automatic restart. It could have a single field called schemaVersion, with a @Column(name = "v_4") annotation that the developer updates every time the schema changes.
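
A minimal sketch of such a marker entity could look like this (the entity and column names are made up for illustration):

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;

@Entity
public class SchemaVersionMarker {
    @Id
    private long id;

    // Rename the column (v_4 -> v_5, ...) whenever the schema changes in a way
    // the validator cannot detect; the mismatch forces a drop and re-export.
    @Column(name = "v_4")
    private int schemaVersion;
}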

Implementation

Here’s how it can be implemented:

public class HibernateSchemaExporter {
    // Logging via slf4j; stream(...) used below is assumed to be a small helper
    // turning Hibernate's Iterator-returning mapping accessors into Java 8 streams.
    private static final org.slf4j.Logger log =
            org.slf4j.LoggerFactory.getLogger(HibernateSchemaExporter.class);

    private final EntityManager entityManager;

    public HibernateSchemaExporter(EntityManager entityManager) {
        this.entityManager = entityManager;
    }

    public void validateAndExportIfNeeded(List<Class> entityClasses) {
        Configuration config = getConfiguration(entityClasses);
        if (!isSchemaValid(config)) {
            export(config);
        }
    }

    private Configuration getConfiguration(List<Class> entityClasses) {
        SessionFactoryImplementor sessionFactory = (SessionFactoryImplementor) getSessionFactory();
        Configuration cfg = new Configuration();
        cfg.setProperty("hibernate.dialect", sessionFactory.getDialect().toString());

        // Do this when using a custom naming strategy, e.g. with Spring Boot:
        
        Object namingStrategy = sessionFactory.getProperties().get("hibernate.ejb.naming_strategy");
        if (namingStrategy instanceof NamingStrategy) {
            cfg.setNamingStrategy((NamingStrategy) namingStrategy);
        } else if (namingStrategy instanceof String) {
            try {
                log.debug("Instantiating naming strategy: " + namingStrategy);
                cfg.setNamingStrategy((NamingStrategy) Class.forName((String) namingStrategy).newInstance());
            } catch (ReflectiveOperationException ex) {
                log.warn("Problem setting naming strategy", ex);
            }
        } else {
            log.warn("Using default naming strategy");
        }
        entityClasses.forEach(cfg::addAnnotatedClass);
        return cfg;
    }

    private boolean isSchemaValid(Configuration cfg) {
        try {
            new SchemaValidator(getServiceRegistry(), cfg).validate();
            return true;
        } catch (HibernateException e) {
            // Yay, exception-driven flow!
            return false;
        }
    }

    private void export(Configuration cfg) {
        new SchemaExport(getServiceRegistry(), cfg).create(false, true);
        clearCaches(cfg);
    }

    private ServiceRegistry getServiceRegistry() {
        return getSessionFactory().getSessionFactoryOptions().getServiceRegistry();
    }

    private void clearCaches(Configuration cfg) {
        SessionFactory sf = entityManager.unwrap(Session.class).getSessionFactory();
        Cache cache = sf.getCache();
        stream(cfg.getClassMappings()).forEach(pc -> {
            if (pc instanceof RootClass) {
                cache.evictEntityRegion(((RootClass) pc).getCacheRegionName());
            }
        });
        stream(cfg.getCollectionMappings()).forEach(coll -> {
            cache.evictCollectionRegion(((Collection) coll).getCacheRegionName());
        });
    }

    private SessionFactory getSessionFactory() {
        return entityManager.unwrap(Session.class).getSessionFactory();
    }
}

The API looks pretty dated and cumbersome. There does not seem to be a way to extract a Configuration from the existing SessionFactory – it’s only used to create the factory and then thrown away, so we have to recreate it from scratch. The above is all we needed to make it work well with Spring Boot and the L2 cache.

Restarting Projections

We’ve also implemented a way to perform such a reinitialization manually, exposed as a button in the admin console. It comes in handy when something about the projection changes but does not involve modifying the schema. For example, if a value is calculated/formatted differently, but it’s still a text field, this mechanism can be used to manually have the history reprocessed. Another use case is fixing a bug.

Production Use?

We’ve been using this mechanism with great success during development. It let us freely modify the schema by only changing the Java classes and never worrying about table definitions. Thanks to the combination with CQRS, we could even maintain long-running demo or pilot customer instances. Data has always been safe in the event store. We could develop the read model schema incrementally and have the changes automatically deployed to a running instance, without data loss or manually writing SQL migration scripts.

Obviously this approach has its limits. Reprocessing the entire event store at a random point in time is only feasible on very small instances or if the events can be processed fast enough.

Otherwise the migration might be handled with an SQL migration script, but that approach has its own limits: it’s often risky and difficult, and it may be slow. Most importantly, if the changes are bigger and involve data that was not previously included in the read model (but is available in the events), an SQL script simply is not an option.

A much better solution is to point the projection (with new code) to a new database. Let it reprocess the event log. When it catches up, test the view model, redirect traffic and discard the old instance. The presented solution works perfectly with this approach as well.

This post also appeared on the Oasis Digital blog.

Persistence in CQRS Read Models

One of the biggest benefits of CQRS is the ability to implement multiple read models. Business rules and the domain model are safe, clean and isolated over in the write model. They are not getting in the way of view models, which can selectively pick the pieces they are interested in, freely reshape them, and do everything in a way that calls for a different kind of elegance and clarity than the domain model. The read models are all about query performance and convenience.

Put simply, CQRS is a practical implementation of what Pat Helland described in his paper on immutability: The truth is the log. The database is a cache of a subset of the log. Let’s have a look at some consequences of this approach.

Persistent Models in Relational Databases

Perhaps the most obvious way to implement a read model is in a traditional SQL database. The technology has been around for decades, is really mature and battle-tested, and everyone is familiar with it.

However, in the CQRS world we can do things that would be problematic in a typical application database schema. Since we optimize for read convenience and performance, the data is very often denormalized. It can happen in a number of ways:

  • There may be fields combining data from some other fields (e.g. a single text field with human-friendly street address).
  • The same data may be present in more than one place, more than one table. For example it may make sense to have the human-readable street address in a single column, but at the same time keep state and city in separate columns (or a separate table), as in the sketch after this list.
  • Sometimes it also makes sense to keep multiple revisions or change history of an entity, not just the final version.
  • Another good example of denormalized data is analytics (like OLAP cubes).
  • The data does not have to be relational. You don’t need to map fields in a Java object to columns, or map a graph of objects to a number of tables. You can just serialize the whole thing (to JSON, XML, using native Java serialization etc.) and put it in a blob field.
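
As a rough illustration of the first two points, a denormalized read-model entity might look like this (the entity and column names are invented for the example):

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;

@Entity
public class CustomerAddressView {
    @Id
    private long customerId;

    // Precomputed, human-friendly address used directly on screens
    @Column(name = "display_address")
    private String displayAddress;    // e.g. "123 Main St, Springfield, IL"

    // The same data kept separately for filtering and grouping
    private String street;
    private String city;
    private String state;
}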

Why Denormalize?

You may be wondering why one would be using a denormalized schema in the first place. The answer is: to do more computation ahead of time, while processing events and while no human is waiting for the answer. It means less computation at the moment a human is waiting for the answer.

The reason for this is that human time is expensive and getting more so, while computing power and storage is already extremely cheap and getting even cheaper. It is worth doing potentially a lot of computation in advance, to save a human a little bit of time.

How Much Denormalization?

The degree of denormalization depends mainly on performance and query complexity. A fully normalized schema has its benefits, but also many drawbacks. Numerous joins, calculations and filters quickly become tricky to write and maintain. They can also become a performance nightmare, for example with joins between massive tables. Even if you’re not joining many thousands of rows, nontrivial calculations can keep the users waiting.

Denormalization can be used to prepare the answers for queries. If a query needs data that would normally live in several large tables, they can be combined once (in asynchronous projection), and then looked up in constant or logarithmic time when the users need it. It may even be possible to go to the extreme and precalculate responses to all common queries, eliminating the need for higher-level caching. In this case the view model is the cache.

It’s necessary to look for balance here, though. Overly aggressive denormalization can lead to poor maintainability related to code duplication, as well as increase the sheer volume of data (in terms of bytes).

Other Persistent Solutions

If the data doesn’t have to be relational, or if it can be denormalized, it may be a good idea to put it in a different kind of database. There is a wide range of NoSQL options to choose from, with the most obvious candidates being document and key-value stores.

We don’t have to stop there, though: if the data could benefit from a graph database, there are no obstacles. Another great example of a view model is a search index like Lucene.

Such stores often have their downsides. They may be trading off consistency for availability and performance. They may be very specialized or limited to particular models (graphs, documents, key-values etc.). It makes them challenging or even inapplicable as the primary persistence mechanism in a typical non-CQRS read/write model. However, they may be perfectly acceptable in a CQRS view model, and the advantages make the whole thing even more powerful.

In-Memory

Another idea we have been considering is in-memory models. Writing to and reading from disk is slow, and if the data fits in RAM, why not just keep it in memory, in ordinary data structures in your language of choice?

There are some challenges:

  • If the event store is large enough, reading and consuming it may take a relatively long time and lots of resources. The limit may be farther away than it first seems, but it certainly is something that has to be carefully thought through.
  • It needs to be transactional. It’s unacceptable for data to change while a query is reading it. You also may need to roll back, and that is far from trivial. It’s much easier in languages that support transactional memory or persistent data structures (like Clojure), and you would probably need a library with such functionality elsewhere.

These challenges could be solved by using persistent, transactional storage, roughly as sketched after this list:

  • When consuming domain events, update an in-memory model. Don’t touch the disk.
  • Every now and then (e.g. every 1000 events or every minute) take a snapshot of that model and write it to some persistent storage.
  • Let queries read from that persistent snapshot, possibly caching it in memory.
  • After restarting the application or an error, continue consuming the events from the latest snapshot.
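
A minimal sketch of that loop, assuming a few made-up abstractions (none of the types below come from the post itself):

import java.util.List;

interface Event { long getId(); }
interface Model { void apply(Event e); }
interface EventStore { List<Event> getEventsAfter(long eventId, int limit); }
interface SnapshotStore { Snapshot load(); void save(Snapshot s); }

class Snapshot {
    final Model model;
    final long lastEventId;
    Snapshot(Model model, long lastEventId) {
        this.model = model;
        this.lastEventId = lastEventId;
    }
}

public class InMemoryProjection {
    private static final int SNAPSHOT_EVERY = 1000;
    private final EventStore eventStore;
    private final SnapshotStore snapshotStore;

    public InMemoryProjection(EventStore eventStore, SnapshotStore snapshotStore) {
        this.eventStore = eventStore;
        this.snapshotStore = snapshotStore;
    }

    public void run() {
        Snapshot snapshot = snapshotStore.load();   // resume from the latest savepoint
        Model model = snapshot.model;
        long lastEventId = snapshot.lastEventId;
        long sinceSnapshot = 0;

        while (true) {                              // real code would sleep or wait for notifications
            for (Event event : eventStore.getEventsAfter(lastEventId, 100)) {
                model.apply(event);                 // update the in-memory model, no disk IO
                lastEventId = event.getId();
                if (++sinceSnapshot >= SNAPSHOT_EVERY) {
                    // every N events (or every minute), persist a snapshot to resume from later
                    snapshotStore.save(new Snapshot(model, lastEventId));
                    sinceSnapshot = 0;
                }
            }
        }
    }
}

Queries would read from the latest persistent snapshot (possibly cached in memory), so disk IO never slows down the writer.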

It’s getting close to persistent projections, but there are important differences. In this case persistence is only used for isolation and as a way to resume from the savepoint after a restart. Disk IO can happen asynchronously or less frequently, without slowing down the writer and queries.

Data Retention

Most queries are only interested in relatively recent data. Some may need a year or two, others may only be interested in the last week. With the source data safe on the domain side, the read models are free to keep as little as they need. It can have a huge positive impact on their performance and storage requirements.

It’s also possible to have a number of models with identical schema but different data retention. Use the smaller data set as much as possible for best responsiveness. But still have the ability to fall back to a bigger data set for the occasional query about the faraway past, where longer response time is acceptable.

This approach can be combined with different granularity: Keep all the details for the last few weeks or months, and aggregate or narrow down for the longer time period.

Wrapping Up

NoSQL stores, analytics, search indexes, caches etc. are all very popular and useful tools, and very often they are used in a way resembling CQRS without acknowledging it. Whether they’re populated with triggers, messaging, polling or ETL, the end result is a new, specialized, read-only view on the data.

However, the more mature and the bigger the project, the harder it is to introduce such things. It may become prohibitively expensive, with missed opportunities eventually leading to many problems down the road.

It’s much, much easier if you have CQRS from the beginning. The domain model is kept safe and clean elsewhere, as is the ultimate source of data (like an event store). The data is easily available for consumption (especially with event sourcing). All it takes to spin off a view model is plugging in another consumer of the domain events.

The view models are very good candidates for innovation, too. It’s really easy to try various kinds of databases and programming languages, as well as different ways of solving problems with the same tools.

This post also appeared on the Oasis Digital blog.

Writing an Event-Sourced CQRS Read Model

Discussions about event sourcing and CQRS seem to usually focus on the overall system architecture or various flavors of domain-driven design in the CQRS context. However, the read models are often neglected, even though there are some interesting considerations on this side as well. In this post we’re going to present a sample implementation of populating a view model by consuming an event stream.

Overview

The idea of a read model is really simple. You take the event log, apply (replay) all the events on an initially empty data model using appropriate functions, and you get the populated model. The code could look like:

List<Event> events = getEvents();
Model model = Model.empty();
for (Event event : events) {
    apply(model, event);
}

We can make this even shorter with functional programming:

Model m = reduce(getEvents(),
                 Model.empty(),
                 (m, e) -> apply(m, e));

That is the essence. Note that it is just the abstract outline, and a realistic implementation is likely to differ, including buffering, batching (or streaming), persistence etc.

Applying Events

The actual Java code to apply the events may look similar to the below:

EventProcessingResult processEvents() {
    if (getState().isRunning()) {
        int batchSize = getEventsPerIteration();
        List<Event> events = eventStore.getEventsForAllStreams(getLastEventId(),
                                                               batchSize);
        if (events.isEmpty()) {
            return NO_EVENTS_TO_PROCESS;
        } else {
            return processEvents(events);
        }
    } else {
        return NOT_RUNNING;
    }
}

EventProcessingResult processEvents(List<Event> events) {
    try {
        for (Event event : events) {
            dispatchEvent(event);
        }
        return SUCCESS;
    } catch (RuntimeException e) {
        return FAILURE;
    }
}

All in all it’s really simple and straightforward. It is possible to enhance it with hooks before and after processing individual events and the entire batch. Such hooks (one possible shape is sketched after this list) could be used to:

  • implement transactions,
  • plug in monitoring,
  • implement error handling,
  • calculate the batch size depending on speed,
  • perform arbitrary operations, e.g. setting something up or recalculating once per batch.
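
Purely as an illustration, such hooks could take a shape like this (this is not the actual API from the project):

import java.util.List;

interface ProjectionHooks {
    // e.g. open and commit a transaction around the whole batch, record metrics,
    // or adjust the next batch size based on observed throughput
    default void beforeBatch(List<Event> batch) {}
    default void afterBatch(List<Event> batch, EventProcessingResult result) {}
    default void beforeEvent(Event event) {}
    default void afterEvent(Event event) {}
}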

The last interesting piece is the dispatchEvent method. Aside from walking the type hierarchy, error handling and making it all optional, it boils down to:

void dispatchEvent(Event e) throws ReflectiveOperationException {
    // Simplified: look up a public handler for the concrete event type and invoke it
    Method handler = projector.getClass().getMethod("on", e.getClass());
    handler.invoke(projector, e);
}

In other words, for each event type (like OrderCreated), we look for a public method called on that takes a single argument of matching type, on a projector object.

All of the above is part of an engine, a piece of infrastructure backing many view models. All it actually takes to implement a projection is providing the projector, with handlers for the interesting event types. All other events will simply be ignored.

It could look like this:

public class OrderProjector {
    @Inject
    private OrderDao orders;

    public void on(OrderCreated e) {
        orders.save(new Order(e.getOrderNumber()));
    }

    public void on(OrderApproved e) {
        Order o = orders.find(e.getOrderNumber());
        o.setApproved(true);
    }
}

Projection Thread

Let’s discuss multi-threading for a moment. Shared mutable state immediately brings numerous problems and should be avoided as much as possible. One of the ways to deal with it is not having concurrency in the first place, e.g. by limiting writes to a single thread. In most cases a single-threaded writer combined with ACID transactions is more than enough to keep up with the write load. (The read/query load can be heavy and use many threads – all of the details here are only about the writes.)

The thread is responsible for applying the events to the read model, all the way from querying the event store to updating the view model database. Normally it just loads batches of events from the store and applies them. It continues as long as there are more events to process, and goes to sleep after it’s caught up. It wakes up after a certain amount of time or when notified about new events by the event store.

We also have some control over this thread’s life cycle. For example, we have a way to programmatically pause and resume each projection’s thread, even exposed in an admin GUI.

Push or Pull?

With a database-backed event store, it’s very easy to query repeatedly for new events. This is the pull model. Unfortunately, it also means that you may end up polling too often and generating needless load, or polling too infrequently and thus possibly taking longer to propagate changes to the view model.

That’s why in addition to polling the event store it’s a good idea to introduce notifications that wake up the read models as soon as new events are saved. This effectively becomes a push model with minimal delays and load. We found JGroups to be a very good tool for the job – it supports multiple protocols and is very easy to set up, involving much less hassle than a full-blown message queue.

The notifications may or may not contain actual events.

In the latter (and simpler) design, they only spread the information that a new event has been saved, along with its sequential ID (so that all projections can estimate how much behind they are). When awakened, the executor can continue along its normal path, starting with querying the event store.

Why? Because handling events coming from a single source is easier, but more importantly because a DB-backed event store trivially guarantees ordering and has no issues with lost or duplicate messages. Querying the database is very fast, given that we’re reading a single table sequentially by primary key, and most of the time the data is in RAM cache anyway. The bottleneck is in the projection thread updating its read model database.
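
A rough sketch of that wake-up mechanism (the class and method names are invented here, and the JGroups plumbing is left out):

// The event store (or a notification receiver) calls newEventSaved(id) after a commit;
// the projection thread calls awaitNewEvents(...) once it has caught up.
public class ProjectionWakeUp {
    private long lastKnownEventId;

    public synchronized void newEventSaved(long eventId) {
        lastKnownEventId = Math.max(lastKnownEventId, eventId);
        notifyAll();                     // wake up the sleeping projection thread
    }

    public synchronized long awaitNewEvents(long lastProcessedId, long timeoutMillis)
            throws InterruptedException {
        if (lastKnownEventId <= lastProcessedId) {
            wait(timeoutMillis);         // the timeout preserves the periodic polling fallback
        }
        return lastKnownEventId;         // lets the projection estimate how far behind it is
    }
}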

However, there are no obstacles to putting event data in the notifications (except for maybe size or network traffic considerations). It would likely decrease the load on the event store and save some round trips to the database. The projector would need to maintain a buffer and fall back to querying the event store when needed. Or the system could use a more reliable message queue.

Restarting Projections

Aside from pause/resume, the admin GUI mentioned earlier offers one more action: restart. Innocuous as it looks, it’s a really nice and powerful feature.

Since the view model is completely derived from the event log, at any time it can be thrown away and recreated from the beginning (or from some initial state/old enough snapshot). Data is safe in the event log, the ultimate source of truth.

It’s useful when anything about the view changes: a field or a table is added, a bug is fixed, something is calculated differently. When that happens, it’s often easier (or required) to just start from the beginning, rather than, for example, implement a massive SQL migration script.

It’s even possible to go as far as fully automating it, so that when the system starts up and detects that the DB schema does not match the corresponding Java model, it automatically recreates the schema and reprocesses the event log. It’s like running with the Hibernate create-drop policy, except that it doesn’t lose data.

Performance

The solution may appear quite limited with regards to performance.

One point that could raise an eyebrow is the single-threaded writer. In reality a single thread is usually fast enough to easily keep up with the load. Concurrency is not only more difficult to implement and maintain, but it also introduces contention. Reads (queries) can be heavily multi-threaded and easy to scale out.

We also gain a lot by having multiple read models, for example separating analytics from administration and “transactional” data. Each model is single-threaded (for writing), but the multiple models consume events in parallel. Finally, the solution could be modified to use sharding or some kind of fork-join processing.

Another interesting point is restarting projections from scratch.

A good solution is something like kappa architecture:

  • Keep the outdated projection up and running and answering all the queries.
  • Start a new projection, e.g. to another database. Just let it process the events, don’t point any traffic to it.
  • When the new projection catches up, redirect traffic and shut down the old one.

On a very small instance, especially for development, it may even be possible to do a restart online, on the same instance. It depends on answers to the following questions: How long does it take to reprocess all events? Is it acceptable for this projection to be stale for 30 minutes? Can we deploy at night or weekend, when nobody is using the system anyway? Do we have to replay all the history?

Another factor to consider here is persistence. If it’s too much of a bottleneck and cannot be further optimized, consider using in-memory view models.

Summing Up

In essence, that’s all it takes to implement a read model consuming an event store. It gains much simplicity thanks to a linear event store and processing everything in a single thread. So much so that in the end it’s really just a loop, implementing the reduction shown at the beginning.

In future posts I am going to dig deeper into practical concerns of implementing projections.

This post also appeared on the Oasis Digital blog.

Achieving Consistency in CQRS with Linear Event Store

In a recent project involving an event-sourced CQRS system, we decided to do some things that seem somewhat unusual compared to the solutions most often talked about. However, they let us achieve some nice properties that would be hard (if at all possible) to get otherwise.

Event Store as Regular Table

We decided to implement the event store as a regular table in an RDBMS. We used PostgreSQL, but there is little that is PostgreSQL-specific here. We know this database is very reliable, powerful and simply mature. On top of that, single-node ACID transactions provide some really nice benefits.

The table ended up with the following fields:

  • event_id (int) – primary key coming from a global sequence
  • stream_id (UUID) – ID of an event stream, typically a DDD aggregate
  • seq_no (int) – sequence number in history of a particular stream
  • transaction_time (timestamp) – transaction start time, the same for all events committed in one transaction
  • correlation_id (UUID)
  • payload (JSON)

Not all of them are mandatory for an event store, but there is one important and uncommon difference: event_id – a globally, sequentially increasing number. We’ll get to that in a moment.
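
For illustration, a JPA-style mapping of such a table might look roughly like this (the post does not show the actual mapping, so the class and generator names are assumptions):

import java.util.Date;
import java.util.UUID;
import javax.persistence.*;

@Entity
@Table(name = "events")
public class StoredEvent {
    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "event_id_seq")
    @SequenceGenerator(name = "event_id_seq", sequenceName = "event_id_seq")
    @Column(name = "event_id")
    private long eventId;               // global, sequentially increasing

    @Column(name = "stream_id")
    private UUID streamId;              // typically a DDD aggregate

    @Column(name = "seq_no")
    private int seqNo;                  // position within that stream's history

    @Temporal(TemporalType.TIMESTAMP)
    @Column(name = "transaction_time")
    private Date transactionTime;       // same for all events committed in one transaction

    @Column(name = "correlation_id")
    private UUID correlationId;

    @Column(name = "payload")
    private String payload;             // JSON
}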

Can You Do It?

If you go for an event store in a regular DB table, getting a global event ID like this is extremely cheap. Databases are really efficient at generating, storing and indexing such columns. The only actual problem is whether you can afford to use a DB table for it in the first place.

The system we’ve been building does not face the wide web. It’s intended for internal use in companies, with hundreds or thousands of users. This is relatively low scale, something that Postgres will have no issue serving.

All in all, it’s not something I’d recommend if you were building the next Amazon. But chances are you aren’t, and so you may be able to afford the luxury of using simpler technology.

Benefits of Global, Sequential Event ID

Now that we have this peculiar event ID, what can we do with it?

Let’s have a look at the read interface of our event store:

public interface EventStoreReader {
    List<Event> getEventsForStream(UUID streamId, long afterSequence, int limit);
    List<Event> getEventsForAllStreams(long afterEventId, int limit);
    Optional<Long> getLastEventId();
}

The first method is pretty obvious and something you can find everywhere. We only use it to restore a single stream (aggregate) from the event store for handling a new command.

The other two are using the event ID, returning a batch of events after a particular event, and ID of the last event. They are the base of our read models (projections).

Read models are implemented by polling (with hints) the event store. They remember the ID of the last processed event. Every once in a while (or when awoken by a notification from the event store), they read the next batch of events from the store and process them in sequence, in a single thread.

This kind of linear, single-threaded processing is probably as simple as it can get, but it obviously has limited scalability. If you get 600 events per minute, it means on average you cannot be slower than 100 ms per event, no matter what. In reality you also need to consider overhead and leave some headroom, so it needs to be faster than that.

It can be addressed with sharding or parallelizing writes in the read model, but for the moment we did not find it necessary. Having multiple independent, specialized models running in parallel certainly helps with that.

Comparing the last-processed event ID for a projection to the current global maximum, you can immediately tell how far behind the projection is. It’s the logical equivalent of queue size.

The global sequence can also be used to mitigate the downsides of eventual consistency (or staleness).

Executing a command could return the ID of the last written event. Then a query can use this ID, requesting: “I’m fine waiting 5 seconds, but don’t give me the result if your data is older than this ID”. Most of the time it’s a matter of mere milliseconds. For that price, when a user makes a change, she immediately sees the results. And it’s the actual data coming from the server, not a simulation achieved by duplicating domain logic in the user interface!
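
A hedged sketch of what that waiting could look like on the query side (the projection field and method names are invented for the example; only the general idea comes from the post):

import java.util.function.Supplier;

// Block the query until the projection has processed at least minEventId,
// or give up after the timeout and serve possibly stale data.
// 'projection' is assumed to expose the ID of the last event it has applied.
public <T> T queryNotOlderThan(long minEventId, long timeoutMillis, Supplier<T> query)
        throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (projection.getLastProcessedEventId() < minEventId
            && System.currentTimeMillis() < deadline) {
        Thread.sleep(10);    // or wait on a condition signalled by the projection thread
    }
    return query.get();
}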

It’s also useful on the domain side. We have some application and domain services that query some domain-specific projections (e.g. for unique checks). If you know that the last event in the event store is X, you can just wait until the projection catches up to that point before making further progress on the command. That’s all it takes to address many problems typically solved with a saga.

Last but not least, since all events are ordered, the projection is always consistent. It may be behind by a few seconds, or a few days, but it’s never inconsistent. It’s simply impossible to run into issues like having one stream processed until Monday, but another until Thursday. If something happened before a particular event occurred, the same order is always maintained in the view model.

It makes the code and the state of the system a lot easier to write, maintain, and reason about.

Scalability and Complexity Revisited

There is a tendency to use complex, high-scalability technology regardless of the actual customer requirements and realistic scale. Such tools have their place, but they’re not obvious winners, no golden hammers solving all problems. Moreover, they are really expensive, if you consider the complexity of development and operations, and their limits.

Sometimes a simpler tool will solve the problem well. Not only do you save on development and ops, but also gain access to some really powerful tools that are impossible at high scale. Including global counters, linearizability and ACID transactions.

Our example shows a system that was complex enough to warrant event sourcing with CQRS, but sufficiently low scale to allow use of a linear event store, even with linear projections, all in plain Postgres databases.

There are many good reasons to choose boring technology. If you innovate (and you should), be careful with why you actually do it, and don’t innovate in all areas at the same time.

This post also appeared on the Oasis Digital blog.

UPDATE: Discussion

There is a very interesting discussion of the post at Reddit.

Much attention goes to the use of an SQL database. It does slow things down, it has so many features you won’t use with an append-only log, but it also has all you need. The limits themselves are way high, at least in the thousands (or tens of thousands) of writes per second. We know very well how these DBs behave, everyone knows how to use one, and you can find an admin right around the corner.

Regarding linear, single-threaded writes alone, they can scale even further. Single-threaded writes align well with hardware limitations, and at the same time are capable of processing millions of transactions per second (though not necessarily with an SQL database, yet). A good related example is the LMAX architecture. In any case, there is a good chance you don’t need multithreading or distributed systems.

Introduction to Event Sourcing and Command-Query Responsibility Segregation

The concepts of event sourcing (ES) and command-query responsibility segregation (CQRS) have been around for quite a while. They are getting more and more attention in the Java community, though they seem to have been much more popular over on the .NET side.

I have spent the last few months implementing a system with such an architecture at Oasis Digital. While working on it, we aimed for flexibility and the ability to make the right trade-offs and choose our own tools. Some interesting solutions came up in the process, things that would probably be impossible with existing frameworks like Axon.

I am going to write more about some of these pieces in the future. Before we get there, let’s start with a brief introduction to event sourcing and command-query responsibility segregation.

CQS

The main idea of command-query separation (CQS) is that all operations are either:

  • commands, changing state of the system,
  • queries, getting some information from the system.

Either one or the other, never both. For example, if a command changes anything in the system, it should not be used to read its state. Mutation-free read access should always be possible. Asking a system to change something in order to read a value seems plain wrong, and queries that inadvertently change state are very confusing, surprising, and lead to bugs at best.

These principles can be applied at all levels. Uncle Bob in “Clean Code” calls it one of the main principles of good function design. The pattern is also applicable to system design.
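
On the method level the distinction can be as simple as this (a made-up example, not taken from the book or the post):

import java.math.BigDecimal;

interface Account {
    // Command: changes state, returns nothing
    void deposit(BigDecimal amount);

    // Query: returns information, changes nothing
    BigDecimal getBalance();
}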

CQRS in System Architecture

Command-query responsibility segregation (CQRS) is a pattern in system architecture inspired by CQS. It divides the system into two distinct parts, separating the components used for writing (executing commands) from those used for querying. In such a system we can find two kinds of requests corresponding to these two models.

Firstly, there are commands – ordering the system to do something or change its state. A piece of business logic updates the domain model (or rejects the command) and lets the client know that the change has been accepted (or not).

The business logic is often implemented using domain-driven design, but it may just as well be transaction script or any other applicable technique. Actually, we ended up using a mix of these two in one area of the system.

Whenever anything changes, the domain model somehow notifies everyone – for example by publishing domain events. These events are received by view models, which update their own representation of the state, separate from the domain model.

This leads us to the second kind of requests: queries, getting information from the system without changing its state. These only use the view models, not the domain.

What is especially powerful about this pattern is the separation of concerns.

The domain (write) side is all about the business. It does not care about queries, all the different ways to show this information to the users, performance, optimal storage for these purposes and all that.

On the other hand, the query (read) side is all about the read access. Its main purpose is making queries fast and convenient.

While the domain is only implemented once, there can be multiple view models over the same data. They often use different databases. They can even be using different technologies – all kinds of NoSQL, normalized and denormalized SQL, in-memory representations, Lucene indexes, OLAP cubes etc.

The read models are also free to use different languages, if that makes anything easier. There are no obstacles to implementing the domain model in Scala while doing view models in Clojure or even SQL stored procedures.

Overall, the view models are very good (and safe) candidates for innovation.

Unlike the domain model, code quality in view models does not have to be perfect. They’re all about reshaping data and moving it around to make reads convenient. Some shortcuts and dirty tricks may be acceptable, as long as they don’t make the whole thing unmaintainable.

Read models are often denormalized, prepared to answer concrete questions in an optimal way. It can even be as extreme as having a precomputed set of answers to every query that the system will handle, stored in some trivial key-value way.

We often call the view models “projections” – because they “project” the domain events to a particular model, keeping only as much information as is necessary and in optimal shape for queries they serve.

Note that all the domain logic is only implemented in the domain (write) model. It’s done once and in the right place. Even if a value is directly derived (calculated), as soon as it has a business meaning or is calculated with some business logic, it belongs in the domain model.

Event Sourcing

Another pattern commonly used with CQRS is event sourcing.

In short, it means that all data in the system is stored in the form of events, in an event log. An event is a piece of information stating that something has occurred (user created, name changed, shipping address added, order submitted, order delivered). They’re always in the past tense, saying that something has happened.

The events never change. You can never delete or update them. If something has happened, it’s happened. If it was a mistake, it can be corrected with a complementary action generating a new “inverse” event, but there’s no going back and saying it has not happened.

Combining ES and CQRS

Event sourcing and command-query responsibility segregation perfectly fit each other. It’s a powerful synergy effect: each of them becomes more powerful thanks to the combination.

When a command comes in, the domain model calculates the new state of the system and possibly emits some new events (which are the only way the changes are persisted). When another command comes in for the same logical area, the domain model is restored from the past events, and responds to the new command by generating some new events. The events represent concrete changes that have business meaning. Technically, in a sense, they are “deltas” (or “diffs”) of the system state.

The view models simply handle these events, picking up only those they are interested in, and updating their state to support future queries.
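
As a hedged sketch of that cycle on the write side (all names below are invented; this is not code from the project):

import java.util.Collections;
import java.util.List;

// An event-sourced aggregate: rebuilt from past events, emits new events for changes.
class CustomerAccount {
    private boolean goldCustomer;

    // Rehydrate the aggregate by replaying its history
    static CustomerAccount fromEvents(List<Object> history) {
        CustomerAccount account = new CustomerAccount();
        history.forEach(account::apply);
        return account;
    }

    // Handle a command: validate against the current state, return the resulting events
    List<Object> makeGold() {
        if (goldCustomer) {
            return Collections.emptyList();          // already gold, nothing to record
        }
        return Collections.singletonList(new CustomerMadeGold());
    }

    private void apply(Object event) {
        if (event instanceof CustomerMadeGold) {
            goldCustomer = true;
        }
    }

    static class CustomerMadeGold {}                 // a "delta" of the system state
}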

Benefits

There are many benefits to using this approach, let me just point out a few of them.

CQRS naturally leads to task-based UIs. Every action represents some very concrete business event. There can be a huge difference between changing someone’s name, changing their shipping address, or making them a gold customer. If the end user wants to change a name, they get a small form that has exactly what is needed for that operation. If they make a customer gold – that’s another action in another form.

Contrast this with the traditional CRUD, spreadsheet-like systems. They have no such operations as “change name” or “change customer status to gold”. All the users can do is “change user”. Implementing logic that changes something when a particular field changes suddenly becomes harder. Validation is a nightmare. Auditing, and simply seeing when something happened and what the users did, is impossible without adding more layers of complexity on top.

It is harder for the users, too – the use cases have to be implemented in their minds, knowing what to change when to achieve a desired effect.

Related to the above, and a reason why CQRS is often used with domain-driven design, is that the shape of the system naturally maps to business contexts and use cases. Commands correspond to concrete user intents, and queries are designed to answer concrete questions. Once again, it’s the exact opposite of a glorified spreadsheet-like DB frontend.

It’s applicable on a higher level, too: different areas of the business (bounded contexts in the DDD lingo) can be implemented as separate models. For example, a warehousing context can represent a book in a completely different way than a sales context. One may be interested in its size, weight and number in stock. The other – in author, genre, cover image, publisher description etc.

Event sourcing also makes reporting a lot easier. By definition it never loses information. Maybe yesterday you did not need to know how often users add and remove items from the shopping cart, and only cared about the orders they eventually submitted. But today the business wants to trace this information, so maybe they can discover items that clients wanted but changed their minds about. It may be worthwhile to tempt them with these items in future ads.

Answering such a change in a “traditional” system would be a nontrivial effort. With ES+CQRS, chances are that all it will take is just another straightforward projection of the data that is already there, one that can immediately answer questions about the past!

Finally, another obvious benefit is performance. Since the view models are separate, they can have very different schemas. Avoid joins, keep data denormalized, answer many questions in linear or even constant time. Read-only access is much easier to scale, as is the write side, which doesn’t care about queries anymore.

Costs

ES+CQRS is not without cost, and is not the best approach to all systems. More about this in a future post.

More Resources

Like I said in the beginning, these ideas have been around for quite a few years. There is a huge number of resources available online, in books and at conferences. This post has merely scratched the surface and is only meant as (yet another) humble introduction to the topic.

Here are a few links to the masters:

This post also appeared on the Oasis Digital blog.

Validating Class/Package Dependencies with Classycle

Classycle is a very nice analyzer and dependency checker for class and package dependencies.

It lets you define package groups (components, layers) and express unwanted dependencies such as cycles, or dependencies between particular packages. For example you can specify that you want no package cycles and no dependencies from com.foo.domain.* on com.foo.api.*. All using a very human-friendly, concise format.

Then you kick off the analyzer (it comes with an Ant task and a standalone command line tool) and it produces a report with violations.

There are a number of other tools out there: JDepend, Sonar, JArchitect and so on. So why Classycle?

  • It’s free (BSD license).
  • It’s fast.
  • It’s powerful and expressive. The rules take just a few lines of easily readable text.
  • It integrates with build tools very well. We have it running as part of the build script, for every build. It’s really just another automated test. Thanks to that the project structure is probably the cleanest one I’ve worked with so far.

Gradle Plugin

Thanks to having an Ant task, Classycle is very easy to integrate with Gradle, with one caveat: the official build is not in Maven Central, and the only build that is there does not include the Ant task.

Gradle itself uses Classycle via a script plugin, buried somewhere in project structure. They published Classycle on their own repository, but it’s the older version that doesn’t support Java 8.

Inspired by that, we wrote our own plugin and made it available for everyone with minimum effort. It’s available on Gradle Plugin Portal and on GitHub.

In order to use it, all you need is:

  • Add the plugin to your project:
    plugins { id "pl.squirrel.classycle" version "1.1" }
    
  • Create a Classycle definition file for each source set you want covered, in src/test/resources/classycle-${sourceSet.name}.txt:

    show allResults
    
    {package} = com.example
    check absenceOfPackageCycles > 1 in ${package}.*
    
  • Congratulations, that’s all it takes to integrate Classycle with your Gradle build! Now you have the following tasks:
    # For each source set that has the dependency definition file:
    classycleMain, classycleTest, ... 
    
    # Analyze all source sets in one hit:
    classycle
    
    # Also part of the check task:
    check
    

See Plugin Portal and GitHub for more information. Happy validating!

Walking Recursive Data Structures Using Java 8 Streams

The Streams API is a real gem in Java 8, and I keep finding more or less unexpected uses for them. I recently wrote about using them as ForkJoinPool facade. Here’s another interesting example: Walking recursive data structures.

Without much ado, have a look at the code:

class Tree {
    private int value;
    private List<Tree> children = new LinkedList<>();

    public Tree(int value, List<Tree> children) {
        super();
        this.value = value;
        this.children.addAll(children);
    }

    public Tree(int value, Tree... children) {
        this(value, asList(children));
    }

    public int getValue() {
        return value;
    }

    public List<Tree> getChildren() {
        return Collections.unmodifiableList(children);
    }

    public Stream<Tree> flattened() {
        return Stream.concat(
                Stream.of(this),
                children.stream().flatMap(Tree::flattened));
    }
}

It’s pretty boring, except for the flattened() method at the end.

Let’s say we want to be able to find elements matching some criteria in the tree, or find a particular element. One typical way to do it is a recursive function – but that has some complexity and is likely to need a mutable argument (e.g. a set where you can append matching elements). Another approach is iteration with a stack or a queue. They work fine, but take a few lines of code and aren’t so easy to generalize.
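
For comparison, an iterative search with an explicit stack might look roughly like this (a sketch, not code from the original post):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntPredicate;

static List<Integer> findValues(Tree root, IntPredicate criteria) {
    List<Integer> result = new ArrayList<>();
    Deque<Tree> stack = new ArrayDeque<>();
    stack.push(root);
    while (!stack.isEmpty()) {
        Tree node = stack.pop();
        if (criteria.test(node.getValue())) {
            result.add(node.getValue());
        }
        node.getChildren().forEach(stack::push);
    }
    return result;
}

It works, but compared to a one-liner on a flattened stream it is noticeably more ceremony for every new kind of query.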

Here’s what we can do with this flattened function:

// Get all values in the tree:
t.flattened().map(Tree::getValue).collect(toList());

// Get even values:
t.flattened().map(Tree::getValue).filter(v -> v % 2 == 0).collect(toList());

// Sum of even values:
t.flattened().map(Tree::getValue).filter(v -> v % 2 == 0).reduce((a, b) -> a + b);

// Does it contain 13?
t.flattened().anyMatch(n -> n.getValue() == 13);

I think this solution is pretty slick and versatile. One line of code (here split into 3 lines for readability on the blog) is enough to flatten the tree to a straightforward stream that can be searched, filtered and whatnot.

It’s not perfect though: It is not lazy and flattened is called for each and every node in the tree every time. It probably could be improved using a Supplier. Anyway, it doesn’t matter for typical, reasonably small trees, especially in a business application on a very tall stack of libraries. But for very large trees, very frequent execution and tight time constraints the overhead might cause some trouble.

Java 8 Streams API as Friendly ForkJoinPool Facade

One of the features I love most about Java 8 is the streams API. It finally eliminates pretty much all loops from the code and lets you write code that is so much more expressive and focused.

Today I realized it can be used for something else: As a nice front-end for the ForkJoinPool.

Problem: Executors Boilerplate

Let’s say we want to run a number of tasks in parallel. Nothing fancy, let’s say each of them just prints out the name of the executing thread (so we can see it run in parallel). We want to resume execution after they’re all done.

If you want to run a bunch of tasks in parallel using an ExecutorService, you probably need to do something like the following:

ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
    executor.submit(() -> System.out.println(Thread.currentThread()));
}
executor.shutdown();
try {
    executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
    // TODO handle...
}

Now, that is a lot of code! But we can do better.

Solution: Stream API

In the end I came up with this utility:

void doInParallelNTimes(int times, Runnable op) {
    IntStream.range(0, times).parallel().forEach(i -> op.run());
}

Reusable and all. Call it like:

doInParallelNTimes(5, () -> System.out.println(Thread.currentThread()));

Done.

This one prints out the following. Note that it’s actually using the main thread as well – since it’s held hostage anyway and cannot resume until execution finishes.

Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]

Another Example: Parallel Computation

Here’s another example. Instead of doing the same thing N times, we can use the stream API to process a number of different tasks in parallel. We can create (“seed”) a stream with any collection or set of values, have a function executed on them in parallel, and finally aggregate the results (collect to a collection, reduce to a single value etc.)

Let’s see how we could calculate the sum of the first 44 Fibonacci numbers:

public class Tester {
    public static void main(String[] args) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        IntStream.range(1, 45).parallel().map(Tester::fib).sum();
        System.out.println("Parallel took " + stopwatch.elapsed(MILLISECONDS) + " ms");

        stopwatch.reset();
        stopwatch.start();
        IntStream.range(1, 45).map(Tester::fib).sum();
        System.out.println("Sequential took " + stopwatch.elapsed(MILLISECONDS) + " ms");
    }

    private static int fib(int n) {
        if (n == 1 || n == 2) {
            return 1;
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

Prints out:

Parallel took 3078 ms
Sequential took 7327 ms

It achieves a lot in a single line of code. First it creates a stream with descriptions of all the tasks that we want to run in parallel. Then it calls a function on all of them in parallel. Finally it returns the sum of all these results.

It’s not all that contrived. I can easily imagine creating a stream with arbitrary values (including rich Java objects) and executing a nontrivial operation on each of them. No matter the details, orchestrating all that would still look the same.

When to do it?

I think this solution is pretty good for all the cases when you know the load upfront, and you want to fork execution to multiple threads and resume after they’re all done. I needed this for some test code, but it would probably work well in many other fork/join or divide-and-conquer scenarios.

Obviously it does not work if you want to run something in the background and resume execution without waiting, or if you want to have a background executor running over a long period of time.