
Rapid Development with Hibernate in CQRS Read Models

In this post I’m going to share a few tricks for using Hibernate tooling in CQRS read models for rapid development.

Why Hibernate?

Hibernate is extremely popular. It’s also deceptively easy on the outside and fairly complex on the inside. It makes it very easy to get started without in-depth understanding, misuse it, and only discover the problems when it’s already too late. For all these reasons it’s rather infamous these days.

However, it still is a piece of solid and mature technology: battle-tested, robust, well-documented and with solutions to many common problems in the box. It can make you *very* productive, even more so if you include the tooling and libraries around it. Finally, it is safe as long as you know what you’re doing.

Automatic Schema Generation

Keeping the SQL schema in sync with Java class definitions is a bit of a struggle. In the best case it’s a tedious and time-consuming activity, with numerous opportunities for mistakes.

Hibernate comes with a schema generator (hbm2ddl), but in its “native” form it is of limited use in production: it can only validate the schema, attempt an update, or export it when the SessionFactory is created. Fortunately, the same utility is available for custom programmatic use.

We went one step further and integrated it with CQRS projections. Here’s how it works:

  • When the projection process thread starts, validate whether the DB schema matches the Java class definitions.
  • If it does not, drop the schema and re-export it (using hbm2ddl), then restart the projection so that it reprocesses the event store from the very beginning.
  • If it does match, just continue updating the model from the current state.

Thanks to this, we almost never have to type SQL table definitions by hand, which makes development a lot faster. It’s similar to working with hbm2ddl.auto = create-drop, except that no data is actually lost (it’s safe in the event store). It’s also smart enough to only recreate the schema if it has actually changed, unlike the create-drop strategy.

Preserving data and avoiding needless restarts doesn’t only improve the development cycle. It may also make this usable in production, at least under certain conditions (see below).

There is one caveat: not all changes in the schema make the Hibernate validation fail. One example is changing field length: as long as it’s varchar or text, validation passes regardless of the limit. Another undetected change is nullability.

These issues can be solved by restarting the projection by hand (see below). Another possibility is a dummy entity that doesn’t store data, but is modified to trigger the automatic restart: it could have a single field called schemaVersion, whose @Column(name = "v_4") annotation is updated by the developer every time the schema changes.
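
For illustration, such a dummy entity could look more or less like this (a sketch; the class and column names are made up):

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;

@Entity
public class SchemaVersion {
    @Id
    private int id = 1;

    // Bump the column name (v_5, v_6, ...) on any change that validation
    // would not detect; the mismatch forces a drop and re-export.
    @Column(name = "v_4")
    private int schemaVersion;
}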

Implementation

Here’s how it can be implemented:

import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.persistence.EntityManager;

import org.hibernate.Cache;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.cfg.Configuration;
import org.hibernate.cfg.NamingStrategy;
import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.mapping.Collection;
import org.hibernate.mapping.RootClass;
import org.hibernate.service.ServiceRegistry;
import org.hibernate.tool.hbm2ddl.SchemaExport;
import org.hibernate.tool.hbm2ddl.SchemaValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Assumes the Hibernate 4.x APIs (Configuration, NamingStrategy, hbm2ddl).
public class HibernateSchemaExporter {
    // SLF4J is assumed for the log calls below.
    private static final Logger log = LoggerFactory.getLogger(HibernateSchemaExporter.class);

    private final EntityManager entityManager;

    public HibernateSchemaExporter(EntityManager entityManager) {
        this.entityManager = entityManager;
    }

    public void validateAndExportIfNeeded(List<Class> entityClasses) {
        Configuration config = getConfiguration(entityClasses);
        if (!isSchemaValid(config)) {
            export(config);
        }
    }

    private Configuration getConfiguration(List<Class> entityClasses) {
        SessionFactoryImplementor sessionFactory = (SessionFactoryImplementor) getSessionFactory();
        Configuration cfg = new Configuration();
        cfg.setProperty("hibernate.dialect", sessionFactory.getDialect().toString());

        // Do this when using a custom naming strategy, e.g. with Spring Boot:
        
        Object namingStrategy = sessionFactory.getProperties().get("hibernate.ejb.naming_strategy");
        if (namingStrategy instanceof NamingStrategy) {
            cfg.setNamingStrategy((NamingStrategy) namingStrategy);
        } else if (namingStrategy instanceof String) {
            try {
                log.debug("Instantiating naming strategy: " + namingStrategy);
                cfg.setNamingStrategy((NamingStrategy) Class.forName((String) namingStrategy).newInstance());
            } catch (ReflectiveOperationException ex) {
                log.warn("Problem setting naming strategy", ex);
            }
        } else {
            log.warn("Using default naming strategy");
        }
        entityClasses.forEach(cfg::addAnnotatedClass);
        return cfg;
    }

    private boolean isSchemaValid(Configuration cfg) {
        try {
            new SchemaValidator(getServiceRegistry(), cfg).validate();
            return true;
        } catch (HibernateException e) {
            // Yay, exception-driven flow!
            return false;
        }
    }

    private void export(Configuration cfg) {
        new SchemaExport(getServiceRegistry(), cfg).create(false, true);
        clearCaches(cfg);
    }

    private ServiceRegistry getServiceRegistry() {
        return getSessionFactory().getSessionFactoryOptions().getServiceRegistry();
    }

    private void clearCaches(Configuration cfg) {
        Cache cache = getSessionFactory().getCache();
        stream(cfg.getClassMappings()).forEach(pc -> {
            if (pc instanceof RootClass) {
                cache.evictEntityRegion(((RootClass) pc).getCacheRegionName());
            }
        });
        stream(cfg.getCollectionMappings()).forEach(coll -> {
            cache.evictCollectionRegion(((Collection) coll).getCacheRegionName());
        });
    }

    // Adapts the Iterators returned by Configuration to Streams.
    private static <T> Stream<T> stream(Iterator<T> iterator) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
    }

    private SessionFactory getSessionFactory() {
        return entityManager.unwrap(Session.class).getSessionFactory();
    }
}

The API looks pretty dated and cumbersome. There does not seem to be a way to extract a Configuration from the existing SessionFactory; it’s only used to create the factory and then thrown away, so we have to recreate it from scratch. The above is all we needed to make it work well with Spring Boot and the L2 cache.
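
For completeness, this is roughly how the exporter can be invoked when the projection starts (a sketch; the entity classes are made-up examples):

HibernateSchemaExporter exporter = new HibernateSchemaExporter(entityManager);
exporter.validateAndExportIfNeeded(Arrays.asList(OrderSummary.class, CustomerView.class));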

Restarting Projections

We’ve also implemented a way to perform such a reinitialization manually, exposed as a button in the admin console. It comes in handy when something about the projection changes but the schema does not. For example, if a value is calculated or formatted differently, but it’s still a text field, this mechanism can be used to have the history reprocessed by hand. Another use case is fixing a bug.

Production Use?

We’ve been using this mechanism with great success during development. It let us freely modify the schema by only changing the Java classes, never worrying about table definitions. Thanks to the combination with CQRS, we could even maintain long-running demo or pilot customer instances. Data has always been safe in the event store. We could develop the read model schema incrementally and have the changes automatically deployed to a running instance, without data loss or manually written SQL migration scripts.

Obviously this approach has its limits. Reprocessing the entire event store at a random point in time is only feasible on very small instances, or when the events can be processed fast enough.

Otherwise the migration might be handled with an SQL migration script, but that has limits of its own. It’s often risky and difficult, and it may be slow. Most importantly, if the changes are bigger and involve data that was not previously included in the read model (but is available in the events), an SQL script simply is not an option.

A much better solution is to point the projection (with new code) to a new database. Let it reprocess the event log. When it catches up, test the view model, redirect traffic and discard the old instance. The presented solution works perfectly with this approach as well.

This post also appeared on the Oasis Digital blog.

Validating Class/Package Dependencies with Classycle

Classycle is a very nice analyzer and dependency checker for class and package dependencies.

It lets you define package groups (components, layers) and express unwanted dependencies such as cycles, or dependencies between particular packages. For example you can specify that you want no package cycles and no dependencies from com.foo.domain.* on com.foo.api.*. All using a very human-friendly, concise format.
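
For instance, the two rules just mentioned could be expressed more or less like this (a sketch of such a definition file, using the package names from the example above):

show allResults

{package} = com.foo
[domain] = ${package}.domain.*
[api] = ${package}.api.*

# no package cycles anywhere in the project:
check absenceOfPackageCycles > 1 in ${package}.*
# the domain must not depend on the API:
check [domain] independentOf [api]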

Then you kick off the analyzer (it comes with an Ant task and a standalone command line tool) and it produces a report with violations.

There are a number of other tools out there: JDepend, Sonar, JArchitect and so on. So why Classycle?

  • It’s free (BSD license).
  • It’s fast.
  • It’s powerful and expressive. The rules take just a few lines of easily readable text.
  • It integrates with build tools very well. We have it running as part of the build script, for every build. It’s really just another automated test. Thanks to that the project structure is probably the cleanest one I’ve worked with so far.

Gradle Plugin

Thanks to having an Ant task, Classycle is very easy to integrate with Gradle, with one caveat: the official build is not in Maven Central, and the only build that is there does not include the Ant task.

Gradle itself uses Classycle via a script plugin, buried somewhere in its project structure. They published Classycle on their own repository, but it’s an older version that doesn’t support Java 8.

Inspired by that, we wrote our own plugin so that everyone can use it with minimum effort. It’s available on the Gradle Plugin Portal and on GitHub.

In order to use it, all you need to do is:

  • Add the plugin to your project:
    plugins { id "pl.squirrel.classycle" version "1.1" }
    
  • Create a Classycle definition file for each source set you want covered, in src/test/resources/classycle-${sourceSet.name}.txt:

    show allResults
    
    {package} = com.example
    check absenceOfPackageCycles > 1 in ${package}.*
    
  • Congratulations, that’s all it takes to integrate Classycle with your Gradle build! Now you have the following tasks:
    # For each source set that has the dependency definition file:
    classycleMain, classycleTest, ... 
    
    # Analyze all source sets in one go:
    classycle
    
    # Also part of the check task:
    check
    

See Plugin Portal and GitHub for more information. Happy validating!

Walking Recursive Data Structures Using Java 8 Streams

The Streams API is a real gem in Java 8, and I keep finding more or less unexpected uses for it. I recently wrote about using it as a ForkJoinPool facade. Here’s another interesting example: walking recursive data structures.

Without much ado, have a look at the code:

import static java.util.Arrays.asList;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Stream;

class Tree {
    private int value;
    private List<Tree> children = new LinkedList<>();

    public Tree(int value, List<Tree> children) {
        this.value = value;
        this.children.addAll(children);
    }

    public Tree(int value, Tree... children) {
        this(value, asList(children));
    }

    public int getValue() {
        return value;
    }

    public List<Tree> getChildren() {
        return Collections.unmodifiableList(children);
    }

    public Stream<Tree> flattened() {
        return Stream.concat(
                Stream.of(this),
                children.stream().flatMap(Tree::flattened));
    }
}

It’s pretty boring, except for the flattened() method at the end.

Let’s say we want to find all elements matching some criteria, or locate a particular element in the tree. One typical way to do it is a recursive function, but that adds some complexity and is likely to need a mutable argument (e.g. a collection to which matching elements are appended). Another approach is iteration with a stack or a queue. Both work fine, but they take a few lines of code and aren’t so easy to generalize.
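
For contrast, the classic recursive variant could look something like this (a sketch against the Tree class above; the helper name is made up):

import java.util.List;
import java.util.function.Predicate;

// Recursion with a mutable accumulator: works, but mixes traversal with collection.
static void collectMatching(Tree node, Predicate<Tree> matches, List<Tree> result) {
    if (matches.test(node)) {
        result.add(node);
    }
    for (Tree child : node.getChildren()) {
        collectMatching(child, matches, result);
    }
}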

Here’s what we can do with this flattened function:

// Get all values in the tree:
t.flattened().map(Tree::getValue).collect(toList());

// Get even values:
t.flattened().map(Tree::getValue).filter(v -> v % 2 == 0).collect(toList());

// Sum of even values:
t.flattened().map(Tree::getValue).filter(v -> v % 2 == 0).reduce((a, b) -> a + b);

// Does it contain 13?
t.flattened().anyMatch(n -> n.getValue() == 13);

I think this solution is pretty slick and versatile. One line of code (split here for readability) is enough to flatten the tree to a straightforward stream that can be searched, filtered and whatnot.

It’s not perfect, though: it is not lazy, and flattened is called for each and every node in the tree, every time. It probably could be improved using a Supplier. Anyway, it doesn’t matter for typical, reasonably small trees, especially in a business application on a very tall stack of libraries. But for very large trees, very frequent execution and tight time constraints, the overhead might cause some trouble.
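
If that ever became a problem, one possible alternative (a sketch) is to drive the traversal iteratively with a custom Spliterator, keeping an explicit stack instead of building a stream per node:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Could be added to Tree: a depth-first walk backed by an explicit stack.
// Note that children are visited in reverse order here.
public Stream<Tree> flattenedIteratively() {
    Deque<Tree> stack = new ArrayDeque<>();
    stack.push(this);
    return StreamSupport.stream(
            new Spliterators.AbstractSpliterator<Tree>(Long.MAX_VALUE, Spliterator.ORDERED) {
                @Override
                public boolean tryAdvance(Consumer<? super Tree> action) {
                    if (stack.isEmpty()) {
                        return false;
                    }
                    Tree next = stack.pop();
                    next.getChildren().forEach(stack::push);
                    action.accept(next);
                    return true;
                }
            }, false);
}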

Java 8 Streams API as Friendly ForkJoinPool Facade

One of the features I love most about Java 8 is the Streams API. It finally eliminates pretty much all loops from the code and lets you write code that is so much more expressive and focused.

Today I realized it can be used for something else: As a nice front-end for the ForkJoinPool.

Problem: Executors Boilerplate

Let’s say we want to run a number of tasks in parallel. Nothing fancy, let’s say each of them just prints out the name of the executing thread (so we can see it run in parallel). We want to resume execution after they’re all done.

If you want to run a bunch of tasks in parallel using an ExecutorService, you probably need to do something like the following:

ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
    executor.submit(() -> System.out.println(Thread.currentThread()));
}
executor.shutdown();
try {
    executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
    // TODO handle...
}

Now, that is a lot of code! But we can do better.

Solution: Stream API

In the end I came up with this utility:

void doInParallelNTimes(int times, Runnable op) {
    IntStream.range(0, times).parallel().forEach(i -> op.run());
}

Reusable and all. Call it like:

doInParallelNTimes(5, () -> System.out.println(Thread.currentThread()));

Done.

This one prints out the following. Note that it’s actually using the main thread as well – since it’s held hostage anyway and cannot resume until execution finishes.

Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]

Another Example: Parallel Computation

Here’s another example. Instead of doing the same thing N times, we can use the stream API to process a number of different tasks in parallel. We can create (“seed”) a stream with any collection or set of values, have a function executed on them in parallel, and finally aggregate the results (collect to a collection, reduce to a single value etc.)

Let’s see how we could calculate the sum of the first 44 Fibonacci numbers (note that IntStream.range excludes the upper bound):

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.stream.IntStream;

import com.google.common.base.Stopwatch; // Guava

public class Tester {
    public static void main(String[] args) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        IntStream.range(1, 45).parallel().map(Tester::fib).sum();
        System.out.println("Parallel took " + stopwatch.elapsed(MILLISECONDS) + " ms");

        stopwatch.reset();
        stopwatch.start();
        IntStream.range(1, 45).map(Tester::fib).sum();
        System.out.println("Sequential took " + stopwatch.elapsed(MILLISECONDS) + " ms");
    }

    private static int fib(int n) {
        if (n == 1 || n == 2) {
            return 1;
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

Prints out:

Parallel took 3078 ms
Sequential took 7327 ms

It achieves a lot in a single line of code. First it creates a stream with descriptions of all the tasks that we want to run in parallel. Then it calls a function on all of them in parallel. Finally it returns the sum of all these results.

It’s not all that contrived. I can easily imagine creating a stream of arbitrary values (including rich Java objects) and executing a nontrivial operation on each of them. Regardless, orchestrating it all would still look the same.
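
For example, something along these lines (a self-contained sketch; the task names and the processing step are made up):

import static java.util.Arrays.asList;

import java.util.List;
import java.util.stream.Collectors;

public class ParallelTasks {
    public static void main(String[] args) {
        List<String> tasks = asList("invoice-1", "invoice-2", "invoice-3", "invoice-4");

        // Each element is processed in parallel on the common ForkJoinPool;
        // collect() gathers the results, preserving the encounter order.
        List<String> results = tasks.parallelStream()
                .map(ParallelTasks::process)
                .collect(Collectors.toList());

        System.out.println(results);
    }

    // Stand-in for a nontrivial operation on a rich object.
    private static String process(String task) {
        return task.toUpperCase() + " via " + Thread.currentThread().getName();
    }
}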

When to do it?

I think this solution is pretty good for all the cases when you know the load upfront, and you want to fork execution to multiple threads and resume after they’re all done. I needed this for some test code, but it would probably work well in many other fork/join or divide-and-conquer scenarios.

Obviously it does not work if you want to run something in the background and resume execution, or if you want to have a background executor running over a long period of time.