Untangling Java concurrency

Java 101: Java concurrency without the pain, Part 2

Locking, atomic variables, Fork/Join, and what to expect in Java 8


Atomic variables

Multithreaded applications that run on multicore processors or multiprocessor systems can achieve good hardware utilization and be highly scalable. They achieve these ends by having their threads spend most of their time performing work rather than waiting for work to arrive, or waiting to acquire locks in order to access shared data structures.

However, Java's traditional synchronization mechanism, which enforces mutual exclusion (the thread holding the lock that guards a set of variables has exclusive access to them) and visibility (changes to the guarded variables become visible to other threads that subsequently acquire the lock), impacts hardware utilization and scalability, as follows:

  • Contended synchronization (multiple threads constantly competing for a lock) is expensive and throughput suffers as a result. A major reason for the expense is the frequent context switching that takes place; a context switch operation can take many processor cycles to complete. In contrast, uncontended synchronization is inexpensive on modern JVMs.
  • When a thread holding a lock is delayed (e.g., because of a scheduling delay), no thread that requires that lock makes any progress, and the hardware isn't utilized as well as it otherwise might be.

You might think that you can use volatile as a synchronization alternative. However, volatile variables only solve the visibility problem; they cannot be used to implement the atomic read-modify-write sequences that are necessary for counters and other entities that require mutual exclusion.
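
For instance, a volatile-based counter is still broken. In the following sketch (the VolatileCounter class is my own illustration, not one of the article's numbered listings), every thread sees the latest written value, but ++value still expands to three separate steps (read, add, write), so two threads can read the same value and both write back value + 1, losing an increment:

public class VolatileCounter
{
   private volatile int value;

   public int getValue()
   {
      return value; // visibility is guaranteed: readers see the latest write
   }

   public int increment()
   {
      // Not atomic: read, add, and write are separate steps, so concurrent
      // calls can interleave and lose increments despite volatile.
      return ++value;
   }
}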

Java 5 introduced a synchronization alternative that offers mutual exclusion combined with the performance of volatile. This atomic variable alternative is based on a microprocessor's compare-and-swap instruction and largely consists of the types in the java.util.concurrent.atomic package.

Understanding compare-and-swap

The compare-and-swap (CAS) instruction is an uninterruptible instruction that reads a memory location, compares the read value with an expected value, and stores a new value in the memory location when the read value matches the expected value. Otherwise, nothing is done. The actual microprocessor instruction may differ somewhat (e.g., it might return true when the CAS succeeds and false otherwise, rather than returning the read value).

Microprocessor CAS instructions

Modern microprocessors offer some kind of CAS instruction. For example, Intel microprocessors offer the cmpxchg family of instructions, whereas PowerPC microprocessors offer load-link (e.g., lwarx) and store-conditional (e.g., stwcx) instructions for the same purpose.

CAS makes it possible to support atomic read-modify-write sequences. You would typically use CAS as follows:

  1. Read value v from address X.
  2. Perform a multistep computation to derive a new value v2.
  3. Use CAS to change the value of X from v to v2. CAS succeeds when X's value hasn't changed while performing these steps.

To see how CAS offers better performance (and scalability) than synchronization, consider a counter that lets you read its current value and increment it. The following class implements such a counter based on synchronized:

Listing 4. Counter.java (version 1)

public class Counter 
{
   private int value;

   public synchronized int getValue() 
   { 
      return value; 
   }

   public synchronized int increment() 
   { 
      return ++value; 
   }
}

High contention for the monitor lock results in excessive context switching that can delay all of the threads and leave you with an application that doesn't scale well.

The CAS alternative requires an implementation of the compare-and-swap instruction. The following class emulates CAS. It uses synchronized instead of the actual hardware instruction to simplify the code:

Listing 5. EmulatedCAS.java

public class EmulatedCAS
{
   private int value;

   public synchronized int getValue()
   { 
      return value; 
   }

   public synchronized int compareAndSwap(int expectedValue, int newValue) 
   {
      // Atomically read the current value, store newValue only when the read
      // value matches expectedValue, and return the read value so the caller
      // can detect whether the swap succeeded.
      int readValue = value;
      if (readValue == expectedValue)
         value = newValue;
      return readValue;
   }
}

Here, value identifies a memory location, which can be retrieved by getValue(). Also, compareAndSwap() implements the CAS algorithm.

The following class uses EmulatedCAS to implement a non-synchronized counter (pretend that EmulatedCAS doesn't require synchronized):

Listing 6. Counter.java (version 2)

public class Counter 
{
   private EmulatedCAS value = new EmulatedCAS();

   public int getValue() 
   {
      return value.getValue();
   }

   public int increment() 
   {
      // CAS retry loop: re-read the counter and retry the swap until
      // compareAndSwap() returns the value we read, which means no other
      // thread changed the counter between the read and the swap.
      int readValue = value.getValue();
      while (value.compareAndSwap(readValue, readValue+1) != readValue)
         readValue = value.getValue();
      return readValue+1;
   }
}

Counter encapsulates an EmulatedCAS instance and declares methods for retrieving and incrementing a counter value with help from this instance. getValue() retrieves the instance's "current counter value" and increment() safely increments the counter value.

increment() repeatedly reads the counter and invokes compareAndSwap() until the swap succeeds, that is, until no other thread has changed the value between the read and the swap. Because no lock is involved, lock contention is avoided along with excessive context switching; performance improves and the code is more scalable.

ReentrantLock and CAS

You previously learned that ReentrantLock offers better performance than synchronized under high thread contention. To boost performance, ReentrantLock's synchronization is managed by a subclass of the abstract java.util.concurrent.locks.AbstractQueuedSynchronizer class. In turn, this class leverages the undocumented sun.misc.Unsafe class and its compareAndSwapInt() CAS method.
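
As a quick refresher, here is what the Listing 4 counter looks like when rewritten around ReentrantLock (a sketch of my own, not one of the article's numbered listings); the lock()/unlock() idiom shown here is the API whose internals rest on that CAS support:

import java.util.concurrent.locks.ReentrantLock;

public class LockedCounter
{
   private final ReentrantLock lock = new ReentrantLock();
   private int value;

   public int getValue()
   {
      lock.lock();
      try
      {
         return value;
      }
      finally
      {
         lock.unlock(); // always release the lock, even when an exception is thrown
      }
   }

   public int increment()
   {
      lock.lock();
      try
      {
         return ++value;
      }
      finally
      {
         lock.unlock();
      }
   }
}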

Exploring the atomic variables package

You don't have to implement compareAndSwap() via the nonportable Java Native Interface. Instead, Java 5 offers this support via java.util.concurrent.atomic: a toolkit of classes used for lock-free, thread-safe programming on single variables.

According to java.util.concurrent.atomic's Javadoc, these classes

extend the notion of volatile values, fields, and array elements to those that also provide an atomic conditional update operation of the form boolean compareAndSet(expectedValue, updateValue). This method (which varies in argument types across different classes) atomically sets a variable to the updateValue if it currently holds the expectedValue, reporting true on success.

This package offers classes for Boolean (AtomicBoolean), integer (AtomicInteger), long integer (AtomicLong) and reference (AtomicReference) types. It also offers array versions of integer, long integer, and reference (AtomicIntegerArray, AtomicLongArray, and AtomicReferenceArray), markable and stamped reference classes for atomically updating a pair of values (AtomicMarkableReference and AtomicStampedReference), and more.
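
For example, AtomicReference provides compareAndSet() for reference types. The following sketch (the LatestValue class and its method names are hypothetical, chosen for illustration) atomically replaces a value only when no other thread has changed it first:

import java.util.concurrent.atomic.AtomicReference;

public class LatestValue
{
   private final AtomicReference<String> value = new AtomicReference<String>("initial");

   public String get()
   {
      return value.get();
   }

   public boolean replace(String expected, String replacement)
   {
      // Succeeds only when the reference still holds 'expected' at the
      // moment of the swap; otherwise another thread won the race.
      return value.compareAndSet(expected, replacement);
   }
}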

Implementing compareAndSet()

Java implements compareAndSet() via the fastest available native construct (e.g., cmpxchg or load-link/store-conditional) or (in the worst case) spin locks.

Consider AtomicInteger, which lets you update an int value atomically. We can use this class to implement the counter shown in Listing 6. Listing 7 presents the equivalent source code.

Listing 7. Counter.java (version 3)

import java.util.concurrent.atomic.AtomicInteger;

public class Counter 
{
   private AtomicInteger value = new AtomicInteger();

   public int getValue() 
   {
      return value.get();
   }

   public int increment() 
   {
      int readValue = value.get();
      while (!value.compareAndSet(readValue, readValue+1))
         readValue = value.get();
      return readValue+1;
   }
}

Listing 7 is very similar to Listing 6 except that it replaces EmulatedCAS with AtomicInteger. Incidentally, you can simplify increment() because AtomicInteger supplies its own int getAndIncrement() method (and similar methods).
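
For example, increment() collapses to a single call when you let AtomicInteger perform the retry loop for you. This variant (a sketch of mine, not one of the article's numbered listings) uses incrementAndGet(), which returns the updated value and so matches Listing 7's semantics:

import java.util.concurrent.atomic.AtomicInteger;

public class Counter 
{
   private AtomicInteger value = new AtomicInteger();

   public int getValue() 
   {
      return value.get();
   }

   public int increment() 
   {
      // incrementAndGet() performs the CAS retry loop internally
      // and returns the updated counter value.
      return value.incrementAndGet();
   }
}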

Fork/Join framework

Computer hardware has evolved significantly since Java's debut in 1995. Back in the day, single-processor systems dominated the computing landscape and Java's synchronization primitives, such as synchronized and volatile, as well as its threading library (the Thread class, for example) were generally adequate.

Multiprocessor systems became cheaper and developers found themselves needing to create Java applications that effectively exploited the hardware parallelism that these systems offered. However, they soon discovered that Java's low-level threading primitives and library were very difficult to use in this context, and the resulting solutions were often riddled with errors.

What is parallelism?

Parallelism is the simultaneous execution of multiple threads/tasks via some combination of multiple processors and processor cores.

The Java Concurrency Utilities framework simplifies the development of these applications; however, the utilities offered by this framework do not scale to thousands of processors or processor cores. In our many-core era, we need a solution for achieving a finer-grained parallelism, or we risk keeping processors idle even when there is lots of work for them to handle.

Professor Doug Lea presented a solution to this problem in his paper introducing the idea for a Java-based fork/join framework. Lea describes a framework that supports "a style of parallel programming in which problems are solved by (recursively) splitting them into subtasks that are solved in parallel." The Fork/Join framework was eventually included in Java 7.

Overview of the Fork/Join framework

The Fork/Join framework is based on a special executor service for running a special kind of task. It consists of the following types that are located in the java.util.concurrent package:

  • ForkJoinPool: an ExecutorService implementation that runs ForkJoinTasks. ForkJoinPool provides task-submission methods, such as void execute(ForkJoinTask<?> task), along with management and monitoring methods, such as int getParallelism() and long getStealCount().
  • ForkJoinTask: an abstract base class for tasks that run within a ForkJoinPool context. ForkJoinTask describes thread-like entities that have a much lighter weight than normal threads. Many tasks and subtasks can be hosted by very few actual threads in a ForkJoinPool instance.
  • ForkJoinWorkerThread: a class that describes a thread managed by a ForkJoinPool instance. ForkJoinWorkerThread is responsible for executing ForkJoinTasks.
  • RecursiveAction: an abstract class that describes a recursive resultless ForkJoinTask.
  • RecursiveTask: an abstract class that describes a recursive result-bearing ForkJoinTask.

The ForkJoinPool executor service is the entry-point for submitting tasks that are typically described by subclasses of RecursiveAction or RecursiveTask. Behind the scenes, the task is divided into smaller tasks that are forked (distributed among different threads for execution) from the pool. A task waits until joined (its subtasks finish so that results can be combined).

ForkJoinPool manages a pool of worker threads, where each worker thread has its own double-ended work queue (deque). When a task forks a new subtask, the thread pushes the subtask onto the head of its deque. When a task tries to join with another task that hasn't finished, the thread pops another task off the head of its deque and executes the task. If the thread's deque is empty, it tries to steal another task from the tail of another thread's deque. This work-stealing behavior maximizes throughput while minimizing contention.

Using the Fork/Join framework

Fork/Join was designed to efficiently execute divide-and-conquer algorithms, which recursively divide problems into sub-problems until they are simple enough to solve directly; for example, a merge sort. The solutions to these sub-problems are combined to provide a solution to the original problem. Each sub-problem can be executed independently on a different processor or core.

Lea's paper presents the following pseudocode to describe the divide-and-conquer behavior:

Result solve(Problem problem) 
{
   if (problem is small)
      directly solve problem
   else 
   {
      split problem into independent parts
      fork new subtasks to solve each part
      join all subtasks
      compose result from subresults
   }
}

The pseudocode presents a solve method that's called with some problem to solve and which returns a Result that contains the problem's solution. If the problem is too small to solve via parallelism, it's solved directly. (The overhead of using parallelism on a small problem exceeds any gained benefit.) Otherwise, the problem is divided into subtasks: each subtask independently focuses on part of the problem.

Operation fork launches a new fork/join subtask that will execute in parallel with other subtasks. Operation join delays the current task until the forked subtask finishes. At some point, the problem will be small enough to be executed sequentially, and its result will be combined along with other subresults to achieve an overall solution that's returned to the caller.
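
To make the pattern concrete, the following sketch (my own example, not Listing 8 and not taken from Lea's paper or the Javadoc) uses RecursiveTask to sum a long array, splitting each segment in half until it falls below a threshold and is summed directly:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long>
{
   private static final int THRESHOLD = 1000; // "small enough to solve directly"

   private final long[] array;
   private final int lo, hi;

   public SumTask(long[] array, int lo, int hi)
   {
      this.array = array;
      this.lo = lo;
      this.hi = hi;
   }

   @Override
   protected Long compute()
   {
      if (hi - lo <= THRESHOLD)
      {
         // Problem is small: solve it directly (sequentially).
         long sum = 0;
         for (int i = lo; i < hi; i++)
            sum += array[i];
         return sum;
      }
      // Split the problem, fork one half, compute the other half in this
      // thread, then join and compose the result from the subresults.
      int mid = (lo + hi) >>> 1;
      SumTask left = new SumTask(array, lo, mid);
      SumTask right = new SumTask(array, mid, hi);
      left.fork();
      long rightSum = right.compute();
      long leftSum = left.join();
      return leftSum + rightSum;
   }

   public static void main(String[] args)
   {
      long[] numbers = new long[1000000];
      for (int i = 0; i < numbers.length; i++)
         numbers[i] = i;
      ForkJoinPool pool = new ForkJoinPool();
      long sum = pool.invoke(new SumTask(numbers, 0, numbers.length));
      System.out.println("Sum = " + sum);
   }
}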

The Javadoc for the RecursiveAction and RecursiveTask classes presents several divide-and-conquer algorithm examples implemented as fork/join tasks. For RecursiveAction the examples sort an array of long integers, increment each element in an array, and sum the squares of each element in an array of doubles. RecursiveTask's solitary example computes a Fibonacci number.

Listing 8 presents an application that demonstrates the sorting example in non-fork/join as well as fork/join contexts. It also presents some timing information to contrast the sorting speeds.
