Tag Archives: Concurrency

Java 8 Streams API as Friendly ForkJoinPool Facade

One of features I love the most about Java 8 is the streams API. It finally eliminates pretty much all loops from the code and lets you write code that is so much more expressive and focused.

Today I realized it can be used for something else: As a nice front-end for the ForkJoinPool.

Problem: Executors Boilerplate

Let’s say we want to run a number of tasks in parallel. Nothing fancy, let’s say each of them just prints out the name of the executing thread (so we can see it run in parallel). We want to resume execution after they’re all done.

If you want to run a bunch of tasks in parallel using an ExecutorService, you probably need to do something like the following:

ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
    executor.submit(() -> System.out.println(Thread.currentThread()));
}
executor.shutdown();
try {
    executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
    // TODO handle...
}

Now, that is a lot of code! But we can do better.

Solution: Stream API

In the end I came up with this utility:

void doInParallelNTimes(int times, Runnable op) {
    IntStream.range(0, times).parallel().forEach(i -> op.run());
}

Reusable and all. Call it like:

doInParallelNTimes(5, () -> System.out.println(Thread.currentThread()));

Done.

This one prints out the following. Note that it’s actually using the main thread as well – since it’s held hostage anyway and cannot resume until execution finishes.

Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]

Another Example: Parallel Computation

Here’s another example. Instead of doing the same thing N times, we can use the stream API to process a number of different tasks in parallel. We can create (“seed”) a stream with any collection or set of values, have a function executed on them in parallel, and finally aggregate the results (collect to a collection, reduce to a single value etc.)

Let’s see how we could calculate a sum of the first 45 Fibonacci numbers:

public class Tester {
    public static void main(String[] args) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        IntStream.range(1, 45).parallel().map(Tester::fib).sum();
        System.out.println("Parallel took " + stopwatch.elapsed(MILLISECONDS) + " ms");

        stopwatch.reset();
        stopwatch.start();
        IntStream.range(1, 45).map(Tester::fib).sum();
        System.out.println("Sequential took " + stopwatch.elapsed(MILLISECONDS) + " ms");
    }

    private static int fib(int n) {
        if (n == 1 || n == 2) {
            return 1;
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

Prints out:

Parallel took 3078 ms
Sequential took 7327 ms

It achieves a lot in a single line of code. First it creates a stream with descriptions of all the tasks that we want to run in parallel. Then it calls a function on all of them in parallel. Finally it returns the sum of all these results.

It’s not all that contrived. I can easily imagine creating a stream with arbitrary values (including rich Java objects) and executing a nontrivial operation on them. It doesn’t matter, orchestrating all that would still look the same.

When to do it?

I think this solution is pretty good for all the cases when you know the load upfront, and you want to fork execution to multiple threads and resume after they’re all done. I needed this for some test code, but it would probably work well in many other fork/join or divide-and-conquer scenarios.

Obviously it does not work if you want to run something in background and resume execution or if you want to have a background executor running over a long period of time.

Systems that Run Forever Self-heal and Scale

I recently saw a great presentation by Joe Armstrong called “Systems that run forever self-heal and scale” . Joe Armstrong is the inventor of Erlang and he does mention Erlang quite a lot, but the principles are very much universal and applicable with other languages and tools.

The talk is well worth watching, but here’s a few quick notes for a busy reader or my future self.

General remarks

  • If you want to run forever, you have to have more than one instance of everything. If anything is unique, then as soon as that service or machine goes down your system goes down. This may be due to unplanned outage or routine software update. Obvious but still pretty hard.

  • There are two ways to design systems: scaling up or scaling down. If you want a system for 1,000 users, you can start with design for 10 users and expand it, or start with 1,000,000 users and scale it down. You will get different design for your 1,000 users depending on where you start.

  • The hardest part is distributing data in a consistent, durable manner. Don’t even try to do it yourself, use known algorithms, libraries and products.

    Data is sacred, pay attention to it. Web services and such frameworks? Whatever, anyone can write those.

  • Distributing computations is much easier. They can be performed anywhere, resumed or retried after a failure etc. There are some more suggestions hints on how to do it.

Six rules of a reliable system

  1. Isolation – when one process crashes, it should not crash others. Naturally leads to better fault-tolerance, scalability, reliability, testability and comprehensibility. It all also means much easier code upgrades.

  2. Concurrency – pretty obvious: you need more than one computer to make a non-stop system, and that automatically means they will operate concurrently and be distributed.

  3. Failure detection – you can’t fix it if you can’t detect it. It has to work across machine and process boundaries because the entire machine and process can’t fail. You can’t heal yourself when you have a heart attack, it has to be external force.

    It implies asynchronous communication and message-driven model.

    Interesting idea: supervision trees. Supervisors on higher levels of the tree, workers in leaves.

  4. Fault identification – when it fails, you also need to know why it failed.

  5. Live code upgrade – obvioius must have for zero downtime. Once you start the system, never stop it.

  6. Stable storage – store things forever in multiple copies, distributed across many machines and places etc.

    With proper stable storage you don’t need backups. Snapshots, yes, but not backups.

Others: Fail fast, fail early, let it crash. Don’t swallow errors, don’t continue unless you really know what you’re doing. Better crash and let the higher level process decide how to deal with illegal state.

Actor model in Erlang

We’re used to two notions of running things concurrently: processes and threads. The difference? Processes are isolated, live in different places in memory and one can’t screw the other. Threads can.

Answer from Erlang: Actors. They are isolated processes, but they’re not the heavy operating system processes. They all live in the Erlang VM, rely on it for scheduling etc. They’re very light and you can easily run thousands of them on a computer.

Conclusion

Much of this is very natural in functional programming. Perhaps that’s what makes functional programming so popular nowadays – that in this paradigm it’s so much easier to write reliable, fault-tolerant scalable, comprehensible systems.

Version-Based Optimistic Concurrency Control in JPA/Hibernate

This article is an introduction to version-based optimistic concurrency control in Hibernate and JPA. The concept is fairly old and much has been written on it, but anyway I have seen it reinvented, misunderstood and misused. I’m writing it just to spread knowledge and hopefully spark interest in the subject of concurrency control and locking.

Use Cases

Let’s say we have a system used by multiple users, where each entity can be modified by more than one user. We want to prevent situations where two persons load some information, make some decision based on what they see, and update the state at the same time. We don’t want to lose changes made by the user who first clicked “save” by overwriting them in the following transaction.

It can also happen in server environment – multiple transactions can modify a shared entity, and we want to prevent scenarios like this:

  1. Transaction 1 loads data
  2. Transaction 2 updates that data and commits
  3. Using state loaded in step 1 (which is no longer current), transaction 1 performs some calculations and update the state

In some ways it’s comparable to non-repeatable reads.

Solution: Versioning

Hibernate and JPA implement the concept of version-based concurrency control for this reason. Here’s how it works.

You can mark a simple property with @Version or <version> (numeric or timestamp). It’s going to be a special column in database. Our mapping can look like:

@Entity
@Table(name = "orders")
public class Order {
	@Id
	private long id;

	@Version
	private int version;

	private String description;

	private String status;

	// ... mutators
}

When such an entity is persisted, the version property is set to a starting value.

Whenever it’s updated, Hibernate executes query like:

update orders
set description=?, status=?, version=? 
where id=? and version=?

Note that in the last line, the WHERE clause now includes version. This value is always set to the “old” value, so that it only will update a row if it has the expected version.

Let’s say two users load an order at version 1 and take a while looking at it in the GUI.

Anne decides to approve the order and executes such action. Status is updated in database, everything works as expected. Versions passed to update statement look like:

update orders
set description=?, status=?, version=2
where id=? and version=1

As you can see, while persisting that update the persistence layer increments the version counter to 2.

In her GUI, Betty still has the old version (number 1). When she decides to perform an update on the order, the statement looks like:

update orders
set description=?, status=?, version=2
where id=? and version=1

At this point, after Anne’s update, the row’s version in database is 2. So this second update affects 0 rows (nothing matches the WHERE clause). Hibernate detects that and an org.hibernate.StaleObjectStateException (wrapped in a javax.persistence.OptimisticLockException).

As a result, the second user cannot perform any updates unless he refreshes the view. For proper user experience we need some clean exception handling, but I’ll leave that out.

Configuration

There is little to customize here. The @Version property can be a number or a timestamp. Number is artificial, but typically occupies fewer bytes in memory and database. Timestamp is larger, but it always is updated to “current timestamp”, so you can actually use it to determine when the entity was updated.

Why?

So why would we use it?

  • It provides a convenient and automated way to maintain consistency in scenarios like those described above. It means that each action can only be performed once, and it guarantees that the user or server process saw up-to-date state while making a business decision.
  • It takes very little work to set up.
  • Thanks to its optimistic nature, it’s fast. There is no locking anywhere, only one more field added to the same queries.
  • In a way it guarantees repeatable reads even with read committed transaction isolation level. It would end with an exception, but at least it’s not possible to create inconsistent state.
  • It works well with very long conversations, including those that span multiple transactions.
  • It’s perfectly consistent in all possible scenarios and race conditions on ACID databases. The updates must be sequential, an update involves a row lock and the “second” one will always affect 0 rows and fail.

Demo

To demonstrate this, I created a very simple web application. It wires together Spring and Hibernate (behind JPA API), but it would work in other settings as well: Pure Hibernate (no JPA), JPA with different implementation, non-webapp, non-Spring etc.

The application keeps one Order with schema similar to above and shows it in a web form where you can update description and status. To experiment with concurrency control, open the page in two tabs, do different modifications and save. Try the same thing without @Version.

It uses an embedded database, so it needs minimal setup (only a web container) and only takes a restart to start with a fresh database.

It’s pretty simplistic – accesses EntityManager in a @Transactional @Controller and backs the form directly with JPA-mapped entity. May not be the best way to do things for less trivial projects, but at least it gathers all code in one place and is very easy to grasp.

Full source code as Eclipse project can be found at my GitHub repository.

IO vs. NIO – Interruptions, Timeouts and Buffers

Let’s imagine a system that sometimes needs to copy a file to a few locations, but in a way where responsiveness is critical. In other words, if for some reason a file system is overloaded and we are unable to write our file in less than a second, it should give up.

ExecutorService is a very convenient tool for the job. You can easily use it for executing several tasks in parallel (each writing to a different file system). Yuo also can tell it to give up after some timeout, and it will interrupt them for you. Perfect, just what we need.

The scaffolding looks like this:

void testCopy() throws Exception {
	ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors
			.newCachedThreadPool();
	final long start = System.currentTimeMillis();
	Callable<Object> task = new Callable<Object>() {
		@Override
		public Object call() throws Exception {
			try {
				copy("a.bin", "b.bin");
			} catch (Exception e) {
				e.printStackTrace();
			}
			System.out.println("Call really finished after: "
					+ (System.currentTimeMillis() - start));
			return null;
		}
	};
	Collection<Callable<Object>> taskWrapper = Arrays.asList(task);
	List<Future<Object>> futures = exec.invokeAll(taskWrapper, 50,
			TimeUnit.MILLISECONDS);
	System.out.println("invokeAll finished after: "
			+ (System.currentTimeMillis() - start));
	System.out.println("Future.isCancelled? "
			+ futures.get(0).isCancelled());
	Thread.sleep(20);
	System.out.println("Threads still active: " + exec.getActiveCount());
}

To simulate response to timeouts on a healthy system with low load, I use a 100 MB file and very short timeout. The task always times out, there is no way my system can copy 100 MB in 50 ms.

I expect the following results:

  1. invokeAll finished after about 50 ms.
  2. Future.isCancelled? is true.
  3. Active thread count is 0. The sleep is there to eliminate some edge cases. Long story short, it gives the copy function some time to detect the interruption.
  4. Call really finishes after about 50 ms. This is very important, I definitely do not want the IO operations to continue after the task is cancelled. Under higher load that would breed way too many threads stuck in bogus IO.

Just in case, those tests were run on the 1.6 JVM from Oracle on 64-bit Windows 7.

Solution 1: Stream Copy

The first attempt is probably the straightforward – use a loop with a buffer and classic IO, like this:

private void copy(String in, String out) throws Exception {
	FileInputStream fin = new FileInputStream(in);
	FileOutputStream fout = new FileOutputStream(out);

	byte[] buf = new byte[4096];
	int read;
	while ((read = fin.read(buf)) > -1) {
		fout.write(buf, 0, read);
	}

	fin.close();
	fout.close();
}

That’s what all popular stream copying libraries do, including IOUtils from Apache Commons and ByteStreams from Guava.

It also fails miserably:

invokeAll finished after: 53
Future.isCancelled? true
Threads still active: 1
Call really finished after: 338

The reason is fairly obvious: there is no check for thread interrupted status in the loop or anywhere, so the thread continues normally.

Solution 2: Stream Copy with Check for Interruption

Let’s fix that! One way to do it is:

while ((read = fin.read(buf)) > -1) {
	fout.write(buf, 0, read);
	if (Thread.interrupted()) {
		throw new IOException("Thread interrupted, cancelling");
	}
}

Now that works as expected, printing:

invokeAll finished after: 52
java.io.IOException: Thread interrupted, cancelling
	at TransferTest.copyInterruptingStream(TransferTest.java:75)
	at TransferTest.access$0(TransferTest.java:66)
	at TransferTest$1.call(TransferTest.java:25)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)Future.isCancelled? true
	at java.lang.Thread.run(Thread.java:662)

Call really finished after: 53
Threads still active: 0

Nice, but I find it unsatisfactory. It looks dirty and I’m not particularly happy with having this code around my IO lib. There must be a better way, which brings us to…

Solution 3: NIO with transfer

NIO has this nice feature that it actually respects thread interruptions. If you try to read from or write to a channel after the thread has been interrupted, you get a ClosedByInterruptException.

That’s just what I need. For some reason I also read this answer at StackOverflow, saying:

“Don’t use a buffer if you don’t need to. Why copy to memory if your target is another disk or a NIC? With larger files, the latency incured is non-trivial. (…) Use FileChannel.transferTo() or FileChannel.transferFrom(). The key advantage here is that the JVM uses the OS’s access to DMA (Direct Memory Access), if present. (This is implementation dependent, but modern Sun and IBM versions on general purpose CPUs are good to go.) What happens is the data goes straight to/from disc, to the bus, and then to the destination…by passing any circuit through RAM or the CPU.”

Great, let’s do it!

private void copy(String in, String out) throws Exception {
	FileChannel fin = new FileInputStream(in).getChannel();
	FileChannel fout = new FileOutputStream(out).getChannel();

	fout.transferFrom(fin, 0, new File(in).length());

	fin.close();
	fout.close();
}

Output:

invokeAll finished after: 52
Future.isCancelled? true
Threads still active: 1
java.nio.channels.ClosedByInterruptException
	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
	at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:304)
	at sun.nio.ch.FileChannelImpl.transferFrom(FileChannelImpl.java:587)
	at TransferTest.copyNioTransfer(TransferTest.java:91)
	at TransferTest.access$0(TransferTest.java:87)
	at TransferTest$1.call(TransferTest.java:27)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:662)
Call really finished after: 146

All I do is a trivial call to transferFrom. It’s very concise, and promises so much support from hardware and OS… But wait a moment, why did it take 146 ms? I mean, 146 milliseconds is much faster than 338 ms in the first test, but I expected it to terminate after around 50 ms.

Let’s repeat the test on a bigger file, something around 1.5 GB:

invokeAll finished after: 9012
Future.isCancelled? true
Threads still active: 1
java.nio.channels.ClosedByInterruptException
	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
	(...)
Call really finished after: 9170

How awful is that? This is probably the worst thing that could happen:

  • The task was not interrupted in a timely manner. 9 seconds is way too long, I expected around 50 millis.
  • invokeAll was blocked for the entire time of the operation – 9 seconds. What the hell?

Solution 4 – NIO with Buffering

It turns out I do need some buffering. Let’s try with this one:

private void copyNioBuffered(String in, String out) throws Exception {
	FileChannel fin = new FileInputStream(in).getChannel();
	FileChannel fout = new FileOutputStream(out).getChannel();

	ByteBuffer buff = ByteBuffer.allocate(4096);
	while (fin.read(buff) != -1 || buff.position() > 0) {
		buff.flip();
		fout.write(buff);
		buff.compact();
	}

	fin.close();
	fout.close();
}

Output:

invokeAll finished after: 52
Future.isCancelled? true
java.nio.channels.ClosedByInterruptException
	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:203)
	at TransferTest.copyNioBuffered(TransferTest.java:105)
	at TransferTest.access$0(TransferTest.java:98)
	at TransferTest$1.call(TransferTest.java:29)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:662)
Call really finished after: 55
Threads still active: 0

Now that’s exactly what I needed. It respects interruptions by itself, so I don’t need those tedious checks all over my IO utility.

Quirks: Different types of channels

If my IO utility is only used for copying files that it gets by name, like this:

static public void copy(String source, String destination)

… then it’s fairly easy to rewrite the method for NIO.

But what if it’s a more generic signature that operates on streams?

static public void copy(InputStream source, OutputStream destination)

NIO has a little Channels utility with very useful methods like:

public static ReadableByteChannel newChannel(InputStream in)
public static WritableByteChannel newChannel(OutputStream out)

So it almost seems like we could wrap our streams using this helper and benefit from interruptible NIO API. Until we look at the source:

public static WritableByteChannel newChannel(final OutputStream out) {
	if (out == null) {
	    throw new NullPointerException();
	}

	if (out instanceof FileOutputStream &&
		FileOutputStream.class.equals(out.getClass())) {
		return ((FileOutputStream)out).getChannel();
	}

	return new WritableByteChannelImpl(out);
}

private static class WritableByteChannelImpl
	extends AbstractInterruptibleChannel	// Not really interruptible
	implements WritableByteChannel
{
// ... Ignores interrupts completely

Watch out! If your streams are file streams, they will be interruptible. Otherwise you’re out of luck – it’s just a dumb wrapper, more like an adapter for API compatibility. Assumptions kill, always check the source.

“Programming Concurrency on the JVM”

A few years ago when I took concurrency classes pretty much everything I was told was that in java synchronized is key. That’s the way to go, whenever you have multithreading you have to do it this way, period. I also spent quite a while solving many classic and less classic concurrency problems using only this construct, reimplementing more fancy locks using only this construct, preventing deadlocks, starvation and everything.

Later in my career I learned that is not the only way to go, or at least there are those fancy java.util.concurrent classes that take care of some stuff for you. That was nice, but apparently I never took enough time to actually stop and think how those things work, what they solve and why.

The light dawned when I started reading Programming Concurrency on the JVM: Mastering Synchronization, STM, and Actors by Venkat Subramaniam.

The book starts with a brief introduction on why concurrency is important today with its powers and perils. It quickly moves on to a few examples of different problems: IO-intensive task like calculating size of a large directory, and computationally intensive task of calculating prime numbers. Once the ground is set, it introduces three approaches to concurrent programming.

The first way to do it is what I summed up in the first paragraph, and what Venkat calls the “synchronize and suffer” model. Been there, done that, we know how bad it can get. This approach is called shared mutability, where different threads mutate shared state concurrently. It may be tamed (and a few ways to do it are shown in the book), but is a lot harder than it seems.

Another approach is isolated mutability, where each mutable part of state is only accessed by one thread. Usually this is the actor based concurrency model. The third way is pure immutability where there simply is no mutable state. That is typical for functional programming.

In the following chapters the book explores each of those areas in depth. It briefly explains the Java memory model nad shows what options for dealing with shared mutability and coordinating threads exist in core Java. It clearly states why the features from Java 5 are superior to the classic “synchronize and suffer” and describes locks, concurrent collections, executors, atomic references etc. in more detail. This is what most of us typically deal with in our daily Java programming, and the book is a great introduction to those modern (if old, in a way) APIs.

That’s about one third of the book. The rest is devoted to much more interesting, intriguing and powerful tools: software transactional memory and actors.

Sometimes we have to deal with shared mutability, and very often we need to coordinate many threads accessing many variables. The classic synchronization tools don’t have proper support for it: Rolling back changes and preventing one thread from seeing uncommited changes of another is difficult, and most likely they lead to coarse-grained locks which basically lock everything while a thread is mutating something.

We know how relational databases deal with it with their ACID transactional model. Software transactional memory is just that but applied to memory, with proper atomicity, consistency and isolation of transactions. If one thread mutates a transactional reference in transaction, another will not see it until that transaction is committed. There is no need for any explicit locks as the libraries (like Akka or Clojure) monitor what variables you access and mutate in transaction and apply locking automatically. They even can rollback and retry the transaction for you.

Another approach is isolated mutability, a.k.a. actors, best demonstated on Akka. Each actor runs in a single thread and all it can do is receive or pass messages. This is probably closest to the original concept of object-oriented programming (recommended reading by Michael Feathers). You have isolated cells that pass messages to each other, and that’s it. When you have a task to execute, you spawn actors and dispatch it to them as immutable messages. When they’re done, they can call you back by passing another message (if the coordinator is also an actor), or if you’re not that pure you can wait for the result. Either way, eveything is neatly isolated in scope of a single thread.

Lengthy as this summary/review is, it really does not do justice to the book. The book itself is dense with valuable information and practical examples, which are as close to perfection as possible: There are a few recurring problems which are fairly simple and easy to grasp, solved over and over again with different techniques and different languages. There are many examples in Java, Scala, Groovy, Clojure and JRuby, dealing with libraries such as the core Java API, Clojure, Akka, GPars… In a few words, a ton of useful stuff.

Last but not the least, it’s excellently written. If anyone has seen Venkat in real life, this book is all like him – entertaining, but also thought-provoking, challenging and inspiring. It reads like a novel (if not better than some of them) and is very hard to put down until you’re done.

Highly recommended.