One of features I love the most about Java 8 is the streams API. It finally eliminates pretty much all loops from the code and lets you write code that is so much more expressive and focused.
Today I realized it can be used for something else: As a nice front-end for the ForkJoinPool
.
Problem: Executors Boilerplate
Let’s say we want to run a number of tasks in parallel. Nothing fancy, let’s say each of them just prints out the name of the executing thread (so we can see it run in parallel). We want to resume execution after they’re all done.
If you want to run a bunch of tasks in parallel using an ExecutorService
, you probably need to do something like the following:
ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { executor.submit(() -> System.out.println(Thread.currentThread())); } executor.shutdown(); try { executor.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException ex) { // TODO handle... }
Now, that is a lot of code! But we can do better.
Solution: Stream API
In the end I came up with this utility:
void doInParallelNTimes(int times, Runnable op) { IntStream.range(0, times).parallel().forEach(i -> op.run()); }
Reusable and all. Call it like:
doInParallelNTimes(5, () -> System.out.println(Thread.currentThread()));
Done.
This one prints out the following. Note that it’s actually using the main thread as well – since it’s held hostage anyway and cannot resume until execution finishes.
Thread[main,5,main] Thread[ForkJoinPool.commonPool-worker-1,5,main] Thread[main,5,main] Thread[ForkJoinPool.commonPool-worker-3,5,main] Thread[ForkJoinPool.commonPool-worker-2,5,main]
Another Example: Parallel Computation
Here’s another example. Instead of doing the same thing N times, we can use the stream API to process a number of different tasks in parallel. We can create (“seed”) a stream with any collection or set of values, have a function executed on them in parallel, and finally aggregate the results (collect to a collection, reduce to a single value etc.)
Let’s see how we could calculate a sum of the first 45 Fibonacci numbers:
public class Tester { public static void main(String[] args) { Stopwatch stopwatch = Stopwatch.createStarted(); IntStream.range(1, 45).parallel().map(Tester::fib).sum(); System.out.println("Parallel took " + stopwatch.elapsed(MILLISECONDS) + " ms"); stopwatch.reset(); stopwatch.start(); IntStream.range(1, 45).map(Tester::fib).sum(); System.out.println("Sequential took " + stopwatch.elapsed(MILLISECONDS) + " ms"); } private static int fib(int n) { if (n == 1 || n == 2) { return 1; } else { return fib(n - 1) + fib(n - 2); } } }
Prints out:
Parallel took 3078 ms Sequential took 7327 ms
It achieves a lot in a single line of code. First it creates a stream with descriptions of all the tasks that we want to run in parallel. Then it calls a function on all of them in parallel. Finally it returns the sum of all these results.
It’s not all that contrived. I can easily imagine creating a stream with arbitrary values (including rich Java objects) and executing a nontrivial operation on them. It doesn’t matter, orchestrating all that would still look the same.
When to do it?
I think this solution is pretty good for all the cases when you know the load upfront, and you want to fork execution to multiple threads and resume after they’re all done. I needed this for some test code, but it would probably work well in many other fork/join or divide-and-conquer scenarios.
Obviously it does not work if you want to run something in background and resume execution or if you want to have a background executor running over a long period of time.
Thank you for this post!
A problem with this solution is the use of the common thread pool.
If you have long running tasks, you might easily block your whole application.
AFAIK the only possibility to run parallel streams on a custom thread pool is by executing the operations within a ForkJoinPool:
ForkJoinPool forkJoinPool = new ForkJoinPool(times);
forkJoinPool.submit(() ->
IntStream.range(0, times).parallel().forEach(i -> op.run());
).get();
Unfortunately, the code is not as neat and compact as before.