Tag Archives: Java

Walking Recursive Data Structures Using Java 8 Streams

The Streams API is a real gem in Java 8, and I keep finding more or less unexpected uses for them. I recently wrote about using them as ForkJoinPool facade. Here’s another interesting example: Walking recursive data structures.

Without much ado, have a look at the code:

class Tree {
    private int value;
    private List<Tree> children = new LinkedList<>();

    public Tree(int value, List<Tree> children) {
        super();
        this.value = value;
        this.children.addAll(children);
    }

    public Tree(int value, Tree... children) {
        this(value, asList(children));
    }

    public int getValue() {
        return value;
    }

    public List<Tree> getChildren() {
        return Collections.unmodifiableList(children);
    }

    public Stream<Tree> flattened() {
        return Stream.concat(
                Stream.of(this),
                children.stream().flatMap(Tree::flattened));
    }
}

It’s pretty boring, except for the few highlighted lines.

Let’s say we want to be able to find elements matching some criteria in the tree or find particular element. One typical way to do it is a recursive function – but that has some complexity and is likely to need a mutable argument (e.g. a set where you can append matching elements). Another approach is iteration with a stack or a queue. They work fine, but take a few lines of code and aren’t so easy to generalize.

Here’s what we can do with this flattened function:

// Get all values in the tree:
t.flattened().map(Tree::getValue).collect(toList());

// Get even values:
t.flattened().map(Tree::getValue).filter(v -> v % 2 == 0).collect(toList());

// Sum of even values:
t.flattened().map(Tree::getValue).filter(v -> v % 2 == 0).reduce((a, b) -> a + b);

// Does it contain 13?
t.flattened().anyMatch(t -> t.getValue() == 13);

I think this solution is pretty slick and versatile. One line of code (here split to 3 for readability on blog) is enough to flatten the tree to a straightforward stream that can be searched, filtered and whatnot.

It’s not perfect though: It is not lazy and flattened is called for each and every node in the tree every time. It probably could be improved using a Supplier. Anyway, it doesn’t matter for typical, reasonably small trees, especially in a business application on a very tall stack of libraries. But for very large trees, very frequent execution and tight time constraints the overhead might cause some trouble.

Java 8 Streams API as Friendly ForkJoinPool Facade

One of features I love the most about Java 8 is the streams API. It finally eliminates pretty much all loops from the code and lets you write code that is so much more expressive and focused.

Today I realized it can be used for something else: As a nice front-end for the ForkJoinPool.

Problem: Executors Boilerplate

Let’s say we want to run a number of tasks in parallel. Nothing fancy, let’s say each of them just prints out the name of the executing thread (so we can see it run in parallel). We want to resume execution after they’re all done.

If you want to run a bunch of tasks in parallel using an ExecutorService, you probably need to do something like the following:

ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
    executor.submit(() -> System.out.println(Thread.currentThread()));
}
executor.shutdown();
try {
    executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
    // TODO handle...
}

Now, that is a lot of code! But we can do better.

Solution: Stream API

In the end I came up with this utility:

void doInParallelNTimes(int times, Runnable op) {
    IntStream.range(0, times).parallel().forEach(i -> op.run());
}

Reusable and all. Call it like:

doInParallelNTimes(5, () -> System.out.println(Thread.currentThread()));

Done.

This one prints out the following. Note that it’s actually using the main thread as well – since it’s held hostage anyway and cannot resume until execution finishes.

Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]

Another Example: Parallel Computation

Here’s another example. Instead of doing the same thing N times, we can use the stream API to process a number of different tasks in parallel. We can create (“seed”) a stream with any collection or set of values, have a function executed on them in parallel, and finally aggregate the results (collect to a collection, reduce to a single value etc.)

Let’s see how we could calculate a sum of the first 45 Fibonacci numbers:

public class Tester {
    public static void main(String[] args) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        IntStream.range(1, 45).parallel().map(Tester::fib).sum();
        System.out.println("Parallel took " + stopwatch.elapsed(MILLISECONDS) + " ms");

        stopwatch.reset();
        stopwatch.start();
        IntStream.range(1, 45).map(Tester::fib).sum();
        System.out.println("Sequential took " + stopwatch.elapsed(MILLISECONDS) + " ms");
    }

    private static int fib(int n) {
        if (n == 1 || n == 2) {
            return 1;
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

Prints out:

Parallel took 3078 ms
Sequential took 7327 ms

It achieves a lot in a single line of code. First it creates a stream with descriptions of all the tasks that we want to run in parallel. Then it calls a function on all of them in parallel. Finally it returns the sum of all these results.

It’s not all that contrived. I can easily imagine creating a stream with arbitrary values (including rich Java objects) and executing a nontrivial operation on them. It doesn’t matter, orchestrating all that would still look the same.

When to do it?

I think this solution is pretty good for all the cases when you know the load upfront, and you want to fork execution to multiple threads and resume after they’re all done. I needed this for some test code, but it would probably work well in many other fork/join or divide-and-conquer scenarios.

Obviously it does not work if you want to run something in background and resume execution or if you want to have a background executor running over a long period of time.

Careful With Native SQL in Hibernate

I really like Hibernate, but I also don’t know a tool that would be nearly as powerful and deceptive at the same time. I could write a book on surprises in production and cargo cult programming related to Hibernate alone. It’s more of an issue with the users than with the tool, but let’s not get too ranty.

So, here’s a recent example.

Problem

We need a background job that lists all files in a directory and inserts an entry for each of them to a table.

Naive Solution

The job used to be written in Bash and there is some direct SQL reading from the table. So, blinders on and let’s write some direct SQL!

for (String fileName : folder.list()) {
    SQLQuery sql = session.getDelegate().createSQLQuery(
        "insert into dir_contents values (?)");
    sql.setString(0, fileName);
    sql.executeUpdate();
}

Does it work? Sure it does.

Now, what happens if there are 10,000 files in the folder? What if you also have a not so elegant domain model, with way too many entity classes, thousands of instances and two levels of cache all in one context?

All of a sudden this trivial job takes 10 minutes to execute, all that time keeping 2 or 3 CPUs busy at 100%.

What, for just a bunch of inserts?

Easy Fix

The problem is that it’s Hibernate. It’s not just a dumb JDBC wrapper, but it has a lot more going on. It’s trying to keep caches and session state up to date. If you run a bare SQL update, it has no idea what table(s) you are updating, what it depends on and how it affects everything, so just in case it pretty much flushes everything.

If you do this 10,000 times in such a crowded environment, it adds up.

Here’s one way to fix it – rather than running 10,000 updates with flushes, execute everything in one block and flush once.

session.doWork(new Work() {
    public void execute(Connection connection) throws SQLException {
        PreparedStatement ps = connection
                .prepareStatement("insert into dir_contents values (?)");
        for (String fileName : folder.list()) {
            ps.setString(1, fileName);
            ps.executeUpdate();
        }
    }
});

Other Solutions

Surprise, surprise:

  • Do use Hibernate. Create a real entity to represent DirContents and just use it like everything else. Then Hibernate knows what caches to flush when, how to batch updates and so on.
  • Don’t use Hibernate. Use plain old JDBC, MyBatis, or whatever else suits your stack or is there already.

Takeaway

Native SQL has its place, even if this example is not the best use case. Anyway, the point is: If you are using native SQL with Hibernate, mind the session state and caches.

“RESTful Java with JAX-RS 2.0, 2nd Edition” by Bill Burke; O’Reilly Media

RESTful Java with JAX-RS 2.0

REST is all the rage now (not without a reason), and in the Java world the standard API for that is JAX-RS (under the JEE umbrella). “RESTful Java with JAX-RS 2.0″ is the second edition of Bill Burke’s book on the JAX-RS API. Bill Burke is the creator of RESTEasy and a member of the committee that designed JAX-RS.

The book is divided into two parts, over a dozen short chapters in each. The first part includes a very nice introduction to REST, has a great systematic reference over the API and finally a few words on integration with various frameworks, security, caching etc. The second chapter is basically a workbook – there is downloadable code with a few examples for each chapter, and these chapters basically are a detailed walk through.

The author is careful not to get ahead of himself and starts quite slow, introducing the more advanced, automated and magical features step by step. As a result, the book is a great introduction for complete newcomers. But it doesn’t stop there – it discusses all the more advanced features as well (later), with the same depth and clarity. That makes it a great reference and a cookbook that you’re likely to get back to as use the API in your work.

Everything is well thought out and executed. It’s a very easy read, with each chapter stating the problem that it’s trying to solve, following up with presentation of the relevant part of the API and a number of practical examples. If for some reason you need more, you’re free to explore the workbook or complete running code.

Highly recommended.

Note on edition: I read it on Kindle, no issues at all.

Version-Based Optimistic Concurrency Control in JPA/Hibernate

This article is an introduction to version-based optimistic concurrency control in Hibernate and JPA. The concept is fairly old and much has been written on it, but anyway I have seen it reinvented, misunderstood and misused. I’m writing it just to spread knowledge and hopefully spark interest in the subject of concurrency control and locking.

Use Cases

Let’s say we have a system used by multiple users, where each entity can be modified by more than one user. We want to prevent situations where two persons load some information, make some decision based on what they see, and update the state at the same time. We don’t want to lose changes made by the user who first clicked “save” by overwriting them in the following transaction.

It can also happen in server environment – multiple transactions can modify a shared entity, and we want to prevent scenarios like this:

  1. Transaction 1 loads data
  2. Transaction 2 updates that data and commits
  3. Using state loaded in step 1 (which is no longer current), transaction 1 performs some calculations and update the state

In some ways it’s comparable to non-repeatable reads.

Solution: Versioning

Hibernate and JPA implement the concept of version-based concurrency control for this reason. Here’s how it works.

You can mark a simple property with @Version or <version> (numeric or timestamp). It’s going to be a special column in database. Our mapping can look like:

@Entity
@Table(name = "orders")
public class Order {
	@Id
	private long id;

	@Version
	private int version;

	private String description;

	private String status;

	// ... mutators
}

When such an entity is persisted, the version property is set to a starting value.

Whenever it’s updated, Hibernate executes query like:

update orders
set description=?, status=?, version=? 
where id=? and version=?

Note that in the last line, the WHERE clause now includes version. This value is always set to the “old” value, so that it only will update a row if it has the expected version.

Let’s say two users load an order at version 1 and take a while looking at it in the GUI.

Anne decides to approve the order and executes such action. Status is updated in database, everything works as expected. Versions passed to update statement look like:

update orders
set description=?, status=?, version=2
where id=? and version=1

As you can see, while persisting that update the persistence layer increments the version counter to 2.

In her GUI, Betty still has the old version (number 1). When she decides to perform an update on the order, the statement looks like:

update orders
set description=?, status=?, version=2
where id=? and version=1

At this point, after Anne’s update, the row’s version in database is 2. So this second update affects 0 rows (nothing matches the WHERE clause). Hibernate detects that and an org.hibernate.StaleObjectStateException (wrapped in a javax.persistence.OptimisticLockException).

As a result, the second user cannot perform any updates unless he refreshes the view. For proper user experience we need some clean exception handling, but I’ll leave that out.

Configuration

There is little to customize here. The @Version property can be a number or a timestamp. Number is artificial, but typically occupies fewer bytes in memory and database. Timestamp is larger, but it always is updated to “current timestamp”, so you can actually use it to determine when the entity was updated.

Why?

So why would we use it?

  • It provides a convenient and automated way to maintain consistency in scenarios like those described above. It means that each action can only be performed once, and it guarantees that the user or server process saw up-to-date state while making a business decision.
  • It takes very little work to set up.
  • Thanks to its optimistic nature, it’s fast. There is no locking anywhere, only one more field added to the same queries.
  • In a way it guarantees repeatable reads even with read committed transaction isolation level. It would end with an exception, but at least it’s not possible to create inconsistent state.
  • It works well with very long conversations, including those that span multiple transactions.
  • It’s perfectly consistent in all possible scenarios and race conditions on ACID databases. The updates must be sequential, an update involves a row lock and the “second” one will always affect 0 rows and fail.

Demo

To demonstrate this, I created a very simple web application. It wires together Spring and Hibernate (behind JPA API), but it would work in other settings as well: Pure Hibernate (no JPA), JPA with different implementation, non-webapp, non-Spring etc.

The application keeps one Order with schema similar to above and shows it in a web form where you can update description and status. To experiment with concurrency control, open the page in two tabs, do different modifications and save. Try the same thing without @Version.

It uses an embedded database, so it needs minimal setup (only a web container) and only takes a restart to start with a fresh database.

It’s pretty simplistic – accesses EntityManager in a @Transactional @Controller and backs the form directly with JPA-mapped entity. May not be the best way to do things for less trivial projects, but at least it gathers all code in one place and is very easy to grasp.

Full source code as Eclipse project can be found at my GitHub repository.

Spring: @EnableWebMvc and JSR-303

I’ve been happily using XML-free Spring with Web MVC, right until the moment when I wanted to plug in JSR-303 validation.

Failure

I imported validation-api and hibernate-validator to my project. I annotated code for my command:

public class SpendingCommand {
@Size(min=3)
private String category;
// ...
}

… and controller:

@Controller
public class SpendingEditionController {

@RequestMapping(value = "/spending_insert", method = RequestMethod.POST)
public String addSpending(@Valid SpendingCommand spending, 
		BindingResult result, ModelMap model) {
	return "my_view";
}

// ...
}

I plugged it in to form:

#springBind("command.$field")
<label for="$field" class="control-label">${label}:</label>
<div class="controls">
	<input type="text" name="${status.expression}" value="$!{status.value}" />
	$!{status.errorMessage}
</div>

… and nothing happened.

I looked for errors in BindingResult in my controller, and nothing was there. Clearly validation was not working at all.

Almost There: @Valid Working

I read a ton of tutorials, and they did not mention any specific black magic. After a long while of doc reading, random trying and debugging, I found this StackOverflow answer. Skaffman said that <mvc:annotation-driven /> was “rather pointless” so “don’t bother”. Luckily I read comments to that answer as well and discovered that this is actually crucial for all the new goodies in Spring Web MVC, including conversions and validation.

I added annotation equivalent of mvc:annotation-driven to my view configuration:

@Configuration
@ComponentScan(basePackages = "pl.squirrel.money.web")
@EnableWebMvc
public class ViewConfig

When I tested my code again, I did see errors in BindingResult in my controller, so finally validation was working. Unfortunately, the web page still did not show the message. Do you know why?

Bindings and Naming Conventions

It took me even longer to figure this one out. I even began to suspect my custom view for Velocity Tools & Tiles.

Finally in debug I noticed I had my command bound twice in page context: as command and as spendingCommand. I had two bindings for BindingResult as well, but with two different instances! One was org.springframework.validation.BindingResult.command, with zero errors, and another was org.springframework.validation.BindingResult.spendingCommand, containing all errors as expected.

In a word, mess. To clean this up, I had to explicitly name my command like this:

@RequestMapping(value = "/spending_insert", method = RequestMethod.POST)
public String addSpending(@ModelAttribute("command") @Valid SpendingCommand spending,
		BindingResult result, ModelMap model) {
	return "my_view";
}

Now I only have one instance of everything, and everything is working as expected. And they lived happily ever after.

Quirks

In the end, I find it interesting (in a bad sense) that it works like this. I think it’s a bug that the same command is bound under two different names, but it’s quite the opposite for BindingResult.

To test it, I attempted to edit this SpendingCommand in controller by overwriting value of a field. At this point I knew what would happen: My web page showed overwritten value in form (because Spring was still able to match the command with different name), but no validation errors (because there are two different instances of BindingResult.

Spring & Velocity Tools (No XML)

A few months ago I wrote about integrating Spring, Velocity and Tiles. I discovered that one bit was missing from there: Velocity Tools. Two hours of yak shaving, frantic googling and source reading later, I figured out how to add support for Velocity Tools to such project with no XML configuration. Here’s how.

For starters, let’s say I want to use some tools in my Velocity and Tiles pages. Let’s add the LinkTool.

template.vm:

<html>
	<head><title>#tiles_insertAttribute({"name":"title"})#end</title></head>
	<body>
		#tiles_insertAttribute({"name":"body"})#end
		<p>Spring macros work in tiles template, too: #springUrl("/myUrl")</p>
		<p>Do Velocity tools work in template? $link.contextPath</p>
	</body>
</html>

body.vm:

<p>Here's a demonstration that Spring macros work with Tiles: #springUrl("/myUrl")</p>
<p>Do Velocity tools work in Tile? $link.contextPath</p>

When I render the code from previous post, I get this:

Here's a demonstration that Spring macros work with Tiles: /SpringVelocityTiles/myUrl

Do Velocity tools work in Tile? $link.contextPath

Spring macros work in tiles template, too: /SpringVelocityTiles/myUrl

Do Velocity tools work in template? $link.contextPath

Not good.

After some googling, I found a similar question on StackOverflow. It had two helpful answers – one from serg, delegating to this blog post, and another from Scott.

None of them worked out of the box, though. I’m tired of XML configs, and apparently it’s too easy to get weird exceptions related to some Struts tools. No wonder I get them, I don’t use Struts and don’t want any of its tools!

Apparently the issue is that Spring support for Velocity Tools is rubbish. One way out is to write your own ViewResolver or View, and that’s what I did in the end.

For starters, I’ll configure my ViewResolver to use a new view class:

@Bean
public ViewResolver viewResolver() {
	VelocityViewResolver resolver = new VelocityViewResolver();
	resolver.setViewClass(MyVelocityToolboxView.class);
	resolver.setSuffix(".vm");
	return resolver;
}

MyVelocityToolboxView is below. This time I’m pasting it with imports to avoid ambiguity on names like Context or VelocityView.

package pl.squirrel.svt;

import java.util.Map;
import java.util.Set;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.velocity.context.Context;
import org.apache.velocity.tools.Scope;
import org.apache.velocity.tools.ToolboxFactory;
import org.apache.velocity.tools.config.ConfigurationUtils;
import org.apache.velocity.tools.view.ViewToolContext;
import org.springframework.web.servlet.view.velocity.VelocityView;

public class MyVelocityToolboxView extends VelocityView {
	@Override
	protected Context createVelocityContext(Map<String, Object> model,
			HttpServletRequest request, HttpServletResponse response) {
		ViewToolContext context = new ViewToolContext(getVelocityEngine(),
				request, response, getServletContext());
		
		ToolboxFactory factory = new ToolboxFactory();
		factory.configure(ConfigurationUtils.getVelocityView());
		
		for (String scope : Scope.values()) {
			context.addToolbox(factory.createToolbox(scope));
		}

		if (model != null) {
			for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) model
					.entrySet()) {
				context.put(entry.getKey(), entry.getValue());
			}
		}
		return context;
	}
}

It’s important that we only use ConfigurationUtils.getVelocityView() – it includes generic tools and view tools, but not Struts tools.

That’s it, now we have a project which uses Tiles for high-level templating, Velocity for individual pages and details, with (hopefully) full support for Spring macros and Velocity tools in all areas. Even if you don’t like Tiles, it may still serve as a good example of how to integrate Spring and Velocity Tools.

I pushed updated code for the demo application to my GitHub repository.

In the irregular habit of posting a sermon on Friday… Less than a week ago I saw an excellent presentation on using Clojure for Spring views. Compared to all this mess and yak shaving here, that Clojure solution is infinitely simpler, more elegant and more powerful at the same time. Too bad it does not have the market share of Spring & Velocity yet.

Testing Spring & Hibernate Without XML

I’m very keen on the improvements in Spring 3 that eventually let you move away from XML into plain Java configuration with proper support from IDE and compiler. It doesn’t change the fact that Spring is a huge suite and it sometimes finding the thing you need can take a while.

XML-free unit tests around Hibernate are one such thing. I knew it was possible, but it took me more than 5 minutes to find all the pieces, so here I am writing it down.

I am going to initialize all my beans in a @Configuration class like this:

@Configuration
@EnableTransactionManagement
public class TestRepositoryConfig {
	@Bean
	public DataSource dataSource() {
		return new EmbeddedDatabaseBuilder().setType(EmbeddedDatabaseType.H2)
				.setName("Nuts").build();
	}

	@Bean
	public LocalSessionFactoryBean sessionFactoryBean() {
		LocalSessionFactoryBean result = new LocalSessionFactoryBean();
		result.setDataSource(dataSource());
		result.setPackagesToScan(new String[] { "pl.squirrel.testnoxml.entity" });

		Properties properties = new Properties();
		properties.setProperty("hibernate.hbm2ddl.auto", "create-drop");
		result.setHibernateProperties(properties);
		return result;
	}

	@Bean
	public SessionFactory sessionFactory() {
		return sessionFactoryBean().getObject();
	}

	@Bean
	public HibernateTransactionManager transactionManager() {
		HibernateTransactionManager man = new HibernateTransactionManager();
		man.setSessionFactory(sessionFactory());
		return man;
	}

	@Bean
	public OrderRepository orderRepo() {
		return new OrderRepository();
	}
}

… and my test can look like this:

@RunWith(SpringJUnit4ClassRunner.class)
@TransactionConfiguration(defaultRollback = true)
@ContextConfiguration(classes = { TestRepositoryConfig.class })
@Transactional
public class OrderRepositoryTest {
	@Autowired
	private OrderRepository repo;

	@Autowired
	private SessionFactory sessionFactory;

	@Test
	public void testPersistOrderWithItems() {
		Session s = sessionFactory.getCurrentSession();

		Product chestnut = new Product("Chestnut", "2.50");
		s.save(chestnut);
		Product hazelnut = new Product("Hazelnut", "5.59");
		s.save(hazelnut);

		Order order = new Order();
		order.addLine(chestnut, 20);
		order.addLine(hazelnut, 150);

		repo.saveOrder(order);
		s.flush();

		Order persistent = (Order) s.createCriteria(Order.class).uniqueResult();
		Assert.assertNotSame(0, persistent.getId());
		Assert.assertEquals(new OrderLine(chestnut, 20), persistent
				.getOrderLines().get(0));
		Assert.assertEquals(new OrderLine(hazelnut, 150), persistent
				.getOrderLines().get(1));
	}
}

There are a few details worth noting here, though:

  1. I marked the test @Transactional, so that I can access Session directly. In this scenario, @EnableTransactionManagement on @Configuration seems to have no effect as the test is wrapped in transaction anyway.
  2. If the test is not marked as @Transactional (sensible when it only uses @Transactional components), the transaction seems to always be committed regardless of @TransactionConfiguration settings.
  3. If the test is marked as @Transactional, @TransactionConfiguration seems to be applied by default. Even if it’s omitted the transaction will be rolled back at the end of the test, and if you want it committed you need @TransactionConfiguration(defaultRollback=false).
  4. This probably goes without saying, but the @Configuration for tests is probably different from production. Here it uses embedded H2 database, for real application I would use a test database on the same engine as production.

That’s it, just those two Java classes. No XML or twisted depedencies. Take a look at my github repository for complete code.

IO vs. NIO – Interruptions, Timeouts and Buffers

Let’s imagine a system that sometimes needs to copy a file to a few locations, but in a way where responsiveness is critical. In other words, if for some reason a file system is overloaded and we are unable to write our file in less than a second, it should give up.

ExecutorService is a very convenient tool for the job. You can easily use it for executing several tasks in parallel (each writing to a different file system). Yuo also can tell it to give up after some timeout, and it will interrupt them for you. Perfect, just what we need.

The scaffolding looks like this:

void testCopy() throws Exception {
	ThreadPoolExecutor exec = (ThreadPoolExecutor) Executors
			.newCachedThreadPool();
	final long start = System.currentTimeMillis();
	Callable<Object> task = new Callable<Object>() {
		@Override
		public Object call() throws Exception {
			try {
				copy("a.bin", "b.bin");
			} catch (Exception e) {
				e.printStackTrace();
			}
			System.out.println("Call really finished after: "
					+ (System.currentTimeMillis() - start));
			return null;
		}
	};
	Collection<Callable<Object>> taskWrapper = Arrays.asList(task);
	List<Future<Object>> futures = exec.invokeAll(taskWrapper, 50,
			TimeUnit.MILLISECONDS);
	System.out.println("invokeAll finished after: "
			+ (System.currentTimeMillis() - start));
	System.out.println("Future.isCancelled? "
			+ futures.get(0).isCancelled());
	Thread.sleep(20);
	System.out.println("Threads still active: " + exec.getActiveCount());
}

To simulate response to timeouts on a healthy system with low load, I use a 100 MB file and very short timeout. The task always times out, there is no way my system can copy 100 MB in 50 ms.

I expect the following results:

  1. invokeAll finished after about 50 ms.
  2. Future.isCancelled? is true.
  3. Active thread count is 0. The sleep is there to eliminate some edge cases. Long story short, it gives the copy function some time to detect the interruption.
  4. Call really finishes after about 50 ms. This is very important, I definitely do not want the IO operations to continue after the task is cancelled. Under higher load that would breed way too many threads stuck in bogus IO.

Just in case, those tests were run on the 1.6 JVM from Oracle on 64-bit Windows 7.

Solution 1: Stream Copy

The first attempt is probably the straightforward – use a loop with a buffer and classic IO, like this:

private void copy(String in, String out) throws Exception {
	FileInputStream fin = new FileInputStream(in);
	FileOutputStream fout = new FileOutputStream(out);

	byte[] buf = new byte[4096];
	int read;
	while ((read = fin.read(buf)) > -1) {
		fout.write(buf, 0, read);
	}

	fin.close();
	fout.close();
}

That’s what all popular stream copying libraries do, including IOUtils from Apache Commons and ByteStreams from Guava.

It also fails miserably:

invokeAll finished after: 53
Future.isCancelled? true
Threads still active: 1
Call really finished after: 338

The reason is fairly obvious: there is no check for thread interrupted status in the loop or anywhere, so the thread continues normally.

Solution 2: Stream Copy with Check for Interruption

Let’s fix that! One way to do it is:

while ((read = fin.read(buf)) > -1) {
	fout.write(buf, 0, read);
	if (Thread.interrupted()) {
		throw new IOException("Thread interrupted, cancelling");
	}
}

Now that works as expected, printing:

invokeAll finished after: 52
java.io.IOException: Thread interrupted, cancelling
	at TransferTest.copyInterruptingStream(TransferTest.java:75)
	at TransferTest.access$0(TransferTest.java:66)
	at TransferTest$1.call(TransferTest.java:25)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)Future.isCancelled? true
	at java.lang.Thread.run(Thread.java:662)

Call really finished after: 53
Threads still active: 0

Nice, but I find it unsatisfactory. It looks dirty and I’m not particularly happy with having this code around my IO lib. There must be a better way, which brings us to…

Solution 3: NIO with transfer

NIO has this nice feature that it actually respects thread interruptions. If you try to read from or write to a channel after the thread has been interrupted, you get a ClosedByInterruptException.

That’s just what I need. For some reason I also read this answer at StackOverflow, saying:

“Don’t use a buffer if you don’t need to. Why copy to memory if your target is another disk or a NIC? With larger files, the latency incured is non-trivial. (…) Use FileChannel.transferTo() or FileChannel.transferFrom(). The key advantage here is that the JVM uses the OS’s access to DMA (Direct Memory Access), if present. (This is implementation dependent, but modern Sun and IBM versions on general purpose CPUs are good to go.) What happens is the data goes straight to/from disc, to the bus, and then to the destination…by passing any circuit through RAM or the CPU.”

Great, let’s do it!

private void copy(String in, String out) throws Exception {
	FileChannel fin = new FileInputStream(in).getChannel();
	FileChannel fout = new FileOutputStream(out).getChannel();

	fout.transferFrom(fin, 0, new File(in).length());

	fin.close();
	fout.close();
}

Output:

invokeAll finished after: 52
Future.isCancelled? true
Threads still active: 1
java.nio.channels.ClosedByInterruptException
	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
	at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:304)
	at sun.nio.ch.FileChannelImpl.transferFrom(FileChannelImpl.java:587)
	at TransferTest.copyNioTransfer(TransferTest.java:91)
	at TransferTest.access$0(TransferTest.java:87)
	at TransferTest$1.call(TransferTest.java:27)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:662)
Call really finished after: 146

All I do is a trivial call to transferFrom. It’s very concise, and promises so much support from hardware and OS… But wait a moment, why did it take 146 ms? I mean, 146 milliseconds is much faster than 338 ms in the first test, but I expected it to terminate after around 50 ms.

Let’s repeat the test on a bigger file, something around 1.5 GB:

invokeAll finished after: 9012
Future.isCancelled? true
Threads still active: 1
java.nio.channels.ClosedByInterruptException
	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
	(...)
Call really finished after: 9170

How awful is that? This is probably the worst thing that could happen:

  • The task was not interrupted in a timely manner. 9 seconds is way too long, I expected around 50 millis.
  • invokeAll was blocked for the entire time of the operation – 9 seconds. What the hell?

Solution 4 – NIO with Buffering

It turns out I do need some buffering. Let’s try with this one:

private void copyNioBuffered(String in, String out) throws Exception {
	FileChannel fin = new FileInputStream(in).getChannel();
	FileChannel fout = new FileOutputStream(out).getChannel();

	ByteBuffer buff = ByteBuffer.allocate(4096);
	while (fin.read(buff) != -1 || buff.position() > 0) {
		buff.flip();
		fout.write(buff);
		buff.compact();
	}

	fin.close();
	fout.close();
}

Output:

invokeAll finished after: 52
Future.isCancelled? true
java.nio.channels.ClosedByInterruptException
	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
	at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:203)
	at TransferTest.copyNioBuffered(TransferTest.java:105)
	at TransferTest.access$0(TransferTest.java:98)
	at TransferTest$1.call(TransferTest.java:29)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:662)
Call really finished after: 55
Threads still active: 0

Now that’s exactly what I needed. It respects interruptions by itself, so I don’t need those tedious checks all over my IO utility.

Quirks: Different types of channels

If my IO utility is only used for copying files that it gets by name, like this:

static public void copy(String source, String destination)

… then it’s fairly easy to rewrite the method for NIO.

But what if it’s a more generic signature that operates on streams?

static public void copy(InputStream source, OutputStream destination)

NIO has a little Channels utility with very useful methods like:

public static ReadableByteChannel newChannel(InputStream in)
public static WritableByteChannel newChannel(OutputStream out)

So it almost seems like we could wrap our streams using this helper and benefit from interruptible NIO API. Until we look at the source:

public static WritableByteChannel newChannel(final OutputStream out) {
	if (out == null) {
	    throw new NullPointerException();
	}

	if (out instanceof FileOutputStream &&
		FileOutputStream.class.equals(out.getClass())) {
		return ((FileOutputStream)out).getChannel();
	}

	return new WritableByteChannelImpl(out);
}

private static class WritableByteChannelImpl
	extends AbstractInterruptibleChannel	// Not really interruptible
	implements WritableByteChannel
{
// ... Ignores interrupts completely

Watch out! If your streams are file streams, they will be interruptible. Otherwise you’re out of luck – it’s just a dumb wrapper, more like an adapter for API compatibility. Assumptions kill, always check the source.

“Programming Concurrency on the JVM”

A few years ago when I took concurrency classes pretty much everything I was told was that in java synchronized is key. That’s the way to go, whenever you have multithreading you have to do it this way, period. I also spent quite a while solving many classic and less classic concurrency problems using only this construct, reimplementing more fancy locks using only this construct, preventing deadlocks, starvation and everything.

Later in my career I learned that is not the only way to go, or at least there are those fancy java.util.concurrent classes that take care of some stuff for you. That was nice, but apparently I never took enough time to actually stop and think how those things work, what they solve and why.

The light dawned when I started reading Programming Concurrency on the JVM: Mastering Synchronization, STM, and Actors by Venkat Subramaniam.

The book starts with a brief introduction on why concurrency is important today with its powers and perils. It quickly moves on to a few examples of different problems: IO-intensive task like calculating size of a large directory, and computationally intensive task of calculating prime numbers. Once the ground is set, it introduces three approaches to concurrent programming.

The first way to do it is what I summed up in the first paragraph, and what Venkat calls the “synchronize and suffer” model. Been there, done that, we know how bad it can get. This approach is called shared mutability, where different threads mutate shared state concurrently. It may be tamed (and a few ways to do it are shown in the book), but is a lot harder than it seems.

Another approach is isolated mutability, where each mutable part of state is only accessed by one thread. Usually this is the actor based concurrency model. The third way is pure immutability where there simply is no mutable state. That is typical for functional programming.

In the following chapters the book explores each of those areas in depth. It briefly explains the Java memory model nad shows what options for dealing with shared mutability and coordinating threads exist in core Java. It clearly states why the features from Java 5 are superior to the classic “synchronize and suffer” and describes locks, concurrent collections, executors, atomic references etc. in more detail. This is what most of us typically deal with in our daily Java programming, and the book is a great introduction to those modern (if old, in a way) APIs.

That’s about one third of the book. The rest is devoted to much more interesting, intriguing and powerful tools: software transactional memory and actors.

Sometimes we have to deal with shared mutability, and very often we need to coordinate many threads accessing many variables. The classic synchronization tools don’t have proper support for it: Rolling back changes and preventing one thread from seeing uncommited changes of another is difficult, and most likely they lead to coarse-grained locks which basically lock everything while a thread is mutating something.

We know how relational databases deal with it with their ACID transactional model. Software transactional memory is just that but applied to memory, with proper atomicity, consistency and isolation of transactions. If one thread mutates a transactional reference in transaction, another will not see it until that transaction is committed. There is no need for any explicit locks as the libraries (like Akka or Clojure) monitor what variables you access and mutate in transaction and apply locking automatically. They even can rollback and retry the transaction for you.

Another approach is isolated mutability, a.k.a. actors, best demonstated on Akka. Each actor runs in a single thread and all it can do is receive or pass messages. This is probably closest to the original concept of object-oriented programming (recommended reading by Michael Feathers). You have isolated cells that pass messages to each other, and that’s it. When you have a task to execute, you spawn actors and dispatch it to them as immutable messages. When they’re done, they can call you back by passing another message (if the coordinator is also an actor), or if you’re not that pure you can wait for the result. Either way, eveything is neatly isolated in scope of a single thread.

Lengthy as this summary/review is, it really does not do justice to the book. The book itself is dense with valuable information and practical examples, which are as close to perfection as possible: There are a few recurring problems which are fairly simple and easy to grasp, solved over and over again with different techniques and different languages. There are many examples in Java, Scala, Groovy, Clojure and JRuby, dealing with libraries such as the core Java API, Clojure, Akka, GPars… In a few words, a ton of useful stuff.

Last but not the least, it’s excellently written. If anyone has seen Venkat in real life, this book is all like him – entertaining, but also thought-provoking, challenging and inspiring. It reads like a novel (if not better than some of them) and is very hard to put down until you’re done.

Highly recommended.