Untangling Java concurrency

Java 101: Java concurrency without the pain, Part 1

Get started with the Java Concurrency Utilities

1 2 3 4 Page 3
Page 3 of 4

The java.util.concurrent.CountDownLatch class implements a countdown latch. Its CountDownLatch(int count) constructor initializes the countdown latch to the specified count. A thread invokes the void await() method to wait until the count has reached zero (or the thread has been interrupted). Subsequent calls to await() for a zero count return immediately. A thread calls void countDown() to decrement the count.

Working with countdown latches

Countdown latches are useful for decomposing a problem into smaller pieces and giving a piece to a separate thread, as follows:

  1. A main thread creates a countdown latch with a count of 1 that's used as a "starting gate" to start a group of worker threads simultaneously.
  2. Each worker thread waits on the latch and the main thread decrements this latch to let all worker threads proceed.
  3. The main thread waits on another countdown latch initialized to the number of worker threads.
  4. When a worker thread completes, it decrements this count. After the count reaches zero (meaning that all worker threads have finished), the main thread proceeds and gathers the results.

Listing 6 demonstrates this scenario.

Listing 6. CountDownLatchDemo.java

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo
{
   final static int N = 3;

   public static void main(String[] args) throws InterruptedException
   {
      CountDownLatch startSignal = new CountDownLatch(1);
      CountDownLatch doneSignal = new CountDownLatch(N);
      for (int i = 0; i < N; ++i) // create and start threads
         new Thread(new Worker(startSignal, doneSignal)).start();
      System.out.println("about to let threads proceed");
      startSignal.countDown(); // let all threads proceed
      System.out.println("doing work");
      System.out.println("waiting for threads to finish");
      doneSignal.await(); // wait for all threads to finish
      System.out.println("main thread terminating");
   }
}

class Worker implements Runnable
{
   private final static int N = 5;

   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;

   Worker(CountDownLatch startSignal, CountDownLatch doneSignal)
   {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
   }

   @Override
   public void run()
   {
      try
      {
         String name = Thread.currentThread().getName();
         startSignal.await();
         for (int i = 0; i < N; i++)
         {
            System.out.printf("thread %s is working%n", name);
            try
            {
               Thread.sleep((int)(Math.random()*300));
            }
            catch (InterruptedException ie)
            {
            }
         }
         System.out.printf("thread %s finishing%n", name);
         doneSignal.countDown();
      }
      catch (InterruptedException ie)
      {
         System.out.println("interrupted");
      }
   }
}

Listing 6 presents CountDownLatchDemo and Worker classes. CountDownLatchDemo's main() method creates a startSignal countdown latch initialized to 1 and a doneSignal countdown latch initialized to 3, the number of worker threads.

main() proceeds to create three worker threads described by Worker and then start these threads. After outputting a message, main() executes startSignal.countDown() to tell the worker threads that they can proceed.

After outputting a few more messages, main() executes doneSignal.await() to wait until all worker threads have finished.

Worker's constructor saves these latches, and its run() method performs some work. Before performing this work, the thread executes startSignal.await() to block until the main thread allows it to proceed (by executing startSignal.countDown()).

The worker then enters a loop to simulate doing some work by alternately outputting messages and sleeping for random amounts of time. It then executes doneSignal.countDown() to decrement the doneSignal countdown latch so that the main thread will eventually wake up.

Compile and run this application. You should see output similar to the following:

about to let threads proceed
doing work
waiting for threads to finish
thread Thread-1 is working
thread Thread-0 is working
thread Thread-2 is working
thread Thread-1 is working
thread Thread-0 is working
thread Thread-0 is working
thread Thread-1 is working
thread Thread-2 is working
thread Thread-1 is working
thread Thread-2 is working
thread Thread-0 is working
thread Thread-0 is working
thread Thread-1 is working
thread Thread-2 is working
thread Thread-1 finishing
thread Thread-0 finishing
thread Thread-2 is working
thread Thread-2 finishing
main thread terminating

Exchangers

An exchanger (also known as a rendezvous) is a thread-synchronization construct that lets a pair of threads exchange data items. An exchanger is similar to a cyclic barrier whose count is set to 2 but also supports exchange of data when both threads reach the barrier.

The java.util.concurrent.Exchanger<V> class implements an exchanger. This class provides an Exchanger() constructor for initializing an exchanger that describes an exchange point and a pair of exchange() methods for performing an exchange.

For example, V exchange(V x) throws InterruptedException waits for another thread to arrive at the exchange point (unless the current thread is interrupted) and then transfers the given object to it, receiving its object in return.

Working with exchangers

Exchanger's Javadoc states that this synchronizer may be useful in genetic algorithms and pipeline designs, where one thread fills a buffer and the other thread empties the buffer. When both threads meet at the exchange point, they swap their buffers. Listing 7 demonstrates.

Listing 7. ExchangerDemo.java

import java.util.ArrayList;
import java.util.List;

import java.util.concurrent.Exchanger;

public class ExchangerDemo
{
   static Exchanger<DataBuffer> exchanger = new
Exchanger<DataBuffer>();
   static DataBuffer initialEmptyBuffer = new DataBuffer();
   static DataBuffer initialFullBuffer = new DataBuffer("ITEM");

   public static void main(String[] args)
   {
      class FillingLoop implements Runnable
      {
         int count = 0;

         @Override
         public void run()
         {
            DataBuffer currentBuffer = initialEmptyBuffer;
            try
            {
               while (true)
               {
                  addToBuffer(currentBuffer);
                  if (currentBuffer.isFull())
                  {
                     System.out.println("filling loop thread wants to
exchange");
                     currentBuffer = exchanger.exchange(currentBuffer);
                     System.out.println("filling loop thread observes an
exchange");
                  }
               }
            }
            catch (InterruptedException ie)
            {
               System.out.println("filling loop thread interrupted");
            }
         }

         void addToBuffer(DataBuffer buffer)
         {
            String item = "NEWITEM"+count++;
            System.out.printf("Adding %s%n", item);
            buffer.add(item);
         }
      }

      class EmptyingLoop implements Runnable
      {
         @Override
         public void run()
         {
            DataBuffer currentBuffer = initialFullBuffer;
            try
            {
               while (true)
               {
                  takeFromBuffer(currentBuffer);
                  if (currentBuffer.isEmpty())
                  {
                     System.out.println("emptying loop thread wants to
exchange");
                     currentBuffer = exchanger.exchange(currentBuffer);
                     System.out.println("emptying loop thread observes an
exchange");
                  }
               }
            }
            catch (InterruptedException ie)
            {
               System.out.println("emptying loop thread interrupted");
            }
         }

         void takeFromBuffer(DataBuffer buffer)
         {
            System.out.printf("taking %s%n", buffer.remove());
         }
      }

      new Thread(new EmptyingLoop()).start();
      new Thread(new FillingLoop()).start();
   }
}

class DataBuffer
{
   private final static int MAX = 10;
   private List<String> items = new ArrayList<>();

   DataBuffer()
   {
   }

   DataBuffer(String prefix)
   {
      for (int i = 0; i < MAX; i++)
      {
         String item = prefix+i;
         System.out.printf("Adding %s%n", item);
         items.add(item);
      }
   }

   void add(String s)
   {
      if (!isFull())
         items.add(s);
   }

   boolean isEmpty()
   {
      return items.size() == 0;
   }

   boolean isFull()
   {
      return items.size() == MAX;
   }

   String remove()
   {
      if (!isEmpty())
         return items.remove(0);
      return null;
   }
}

Listing 7 is based on the example code in Exchanger's Javadoc. One thread fills one buffer with strings while another thread empties another buffer. When the respective buffer is full or empty, these threads meet at an exchange point and swap buffers.

For example, when the filling thread's currentBuffer.isFull() expression is true, it executes currentBuffer = exchanger.exchange(currentBuffer) and waits. The emptying thread continues until currentBuffer.isEmpty() evaluates to true, and also invokes exchange(currentBuffer). At this point, the buffers are swapped and the threads continue.

Compile and run this application. Your initial output should be similar to the following prefix:

Adding ITEM0
Adding ITEM1
Adding ITEM2
Adding ITEM3
Adding ITEM4
Adding ITEM5
Adding ITEM6
Adding ITEM7
Adding ITEM8
Adding ITEM9
taking ITEM0
taking ITEM1
taking ITEM2
taking ITEM3
taking ITEM4
taking ITEM5
taking ITEM6
taking ITEM7
taking ITEM8
Adding NEWITEM0
taking ITEM9
Adding NEWITEM1
emptying loop thread wants to exchange
Adding NEWITEM2
Adding NEWITEM3
Adding NEWITEM4
Adding NEWITEM5
Adding NEWITEM6
Adding NEWITEM7
Adding NEWITEM8
Adding NEWITEM9
filling loop thread wants to exchange
filling loop thread observes an exchange
emptying loop thread observes an exchange
Adding NEWITEM10
Adding NEWITEM11
taking NEWITEM0
taking NEWITEM1
Adding NEWITEM12
taking NEWITEM2
taking NEWITEM3
Adding NEWITEM13
taking NEWITEM4
taking NEWITEM5
Adding NEWITEM14
taking NEWITEM6
taking NEWITEM7
Adding NEWITEM15
taking NEWITEM8
taking NEWITEM9
emptying loop thread wants to exchange
Adding NEWITEM16
Adding NEWITEM17
Adding NEWITEM18
Adding NEWITEM19
filling loop thread wants to exchange
filling loop thread observes an exchange
emptying loop thread observes an exchange
Adding NEWITEM20

Phasers

A phaser is a thread-synchronization construct that's similar to a cyclic barrier in that it lets a group of threads wait on a barrier and then proceed after the last thread arrives. It also offers the equivalent of a barrier action. However, a phaser is more flexible.

Unlike a cyclic barrier, which coordinates a fixed number of threads, a phaser can coordinate a variable number of threads, which can register at any time. To implement this capability, a phaser takes advantage of phases and phase numbers.

A phase is the phaser's current state, and this state is identified by an integer-based phase number. When the last of the registered threads arrives at the phaser barrier, a phaser advances to the next phase and increments its phase number by 1.

The java.util.concurrent.Phaser class implements a phaser. Because this class is thoroughly described in its Javadoc, I'll point out only a few constructors and methods:

  • The Phaser(int threads) constructor creates a phaser that initially coordinates nthreads threads (which have yet to arrive at the phaser barrier) and whose phase number is initially set to 0.
  • The int register() method adds a new unarrived thread to this phaser and returns the phase number to which the arrival applies. This number is known as the arrival phase number.
  • The int arriveAndAwaitAdvance() method records arrival and waits for the phaser to advance (which happens after the other threads have arrived). It returns the phase number to which the arrival applies.
  • The int arriveAndDeregister() method arrives at this phaser and deregisters from it without waiting for others to arrive, reducing the number of threads required to advance in future phases.

Working with phasers

The small application in Listing 8 demonstrates the constructor and methods described above.

1 2 3 4 Page 3
Page 3 of 4