Untangling Java concurrency

Java 101: Java concurrency without the pain, Part 1

Get started with the Java Concurrency Utilities

Page 2 of 4

Listing 3. ReadWebPage.java

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;

import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

import java.util.ArrayList;
import java.util.List;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;

public class ReadWebPage
{
   public static void main(final String[] args)
   {
      if (args.length != 1)
      {
         System.err.println("usage: java ReadWebPage url");
         return;
      }
      ExecutorService executor = Executors.newSingleThreadExecutor();
      Callable<List<String>> callable;
      callable = new Callable<List<String>>()
                 {
                    @Override
                    public List<String> call()
                       throws IOException, MalformedURLException
                    {
                       List<String> lines = new ArrayList<>();
                       URL url = new URL(args[0]);
                       HttpURLConnection con;
                       con = (HttpURLConnection) url.openConnection();
                       InputStreamReader isr;
                       isr = new InputStreamReader(con.getInputStream());
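                       // try-with-resources closes the reader (and the
                       // underlying stream) even when readLine() throws
                       try (BufferedReader br = new BufferedReader(isr))
                       {
                          String line;
                          while ((line = br.readLine()) != null)
                             lines.add(line);
                       }
                       return lines;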
                    }
                 };
      Future<List<String>> future = executor.submit(callable);
      try
      {
         List<String> lines = future.get(5, TimeUnit.SECONDS);
         for (String line: lines)
            System.out.println(line);
      }
      catch (ExecutionException ee)
      {
         System.err.println("Callable threw exception: "+ee.getMessage());
      }
      catch (InterruptedException | TimeoutException eite)
      {
         System.err.println("URL not responding");
      }
      executor.shutdown();
   }
}

Listing 3's main() method first verifies that a single (URL-based) command-line argument has been specified. It then creates a single-thread executor and a callable that tries to open a connection to this URL, read its contents line by line, and save these lines in a list, which it returns.

The callable is subsequently submitted to the executor and a future representing the list of strings is returned. main() invokes the future's V get(long timeout, TimeUnit unit) method to obtain this list.

get() throws TimeoutException when the callable doesn't finish within five seconds. It throws ExecutionException when the callable throws an exception (for instance, the callable will throw java.net.MalformedURLException when the URL is invalid).

Regardless of whether an exception is thrown or not, the executor must be shut down before the application exits. If the executor isn't shut down, the application won't exit because the non-daemon thread-pool threads are still executing.
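
The exception wrapping and the shutdown rule described above can be sketched as follows. This is a minimal illustration rather than part of Listing 3: the class name FutureGetSketch and its describe() helper are hypothetical. The second callable fails on purpose; get() reports that failure as an ExecutionException whose getCause() is the original exception, and the finally block shuts the executor down on every path.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureGetSketch
{
   // Hypothetical helper: runs the callable on a single-thread executor and
   // returns a short description of the outcome.
   public static String describe(Callable<String> callable)
   {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      try
      {
         Future<String> future = executor.submit(callable);
         return "result: "+future.get(5, TimeUnit.SECONDS);
      }
      catch (ExecutionException ee)
      {
         // getCause() is the exception that was thrown inside call()
         return "callable threw "+ee.getCause().getClass().getSimpleName();
      }
      catch (InterruptedException | TimeoutException eite)
      {
         return "no result within timeout";
      }
      finally
      {
         executor.shutdown(); // runs whether or not an exception was thrown
      }
   }

   public static void main(String[] args)
   {
      System.out.println(describe(() -> "ok"));
      System.out.println(describe(() -> { throw new IOException("boom"); }));
   }
}
```

Placing shutdown() in a finally block is one way to guarantee that the executor's non-daemon thread doesn't keep the application alive after a failure.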

Synchronizers

Synchronizers are high-level constructs that coordinate and control thread execution. The Java Concurrency Utilities framework provides classes that implement semaphore, cyclic barrier, countdown latch, exchanger, and phaser synchronizers. I'll introduce each of these synchronizer types and then show you how they'd work in a concurrent Java application.

Semaphores

A semaphore is a thread-synchronization construct for controlling thread access to a common resource. It's often implemented as a protected variable whose value is decremented by an acquire operation and incremented by a release operation.

The acquire operation either returns control to the invoking thread immediately or causes that thread to block when the semaphore's current value is zero. The release operation increases the current value, which lets a blocked thread resume.

Semaphores whose current values can be incremented past 1 are known as counting semaphores, whereas semaphores whose current values can be only 0 or 1 are known as binary semaphores or mutexes. In either case, the current value cannot be negative.

The java.util.concurrent.Semaphore class conceptualizes a semaphore as an object maintaining a set of permits. This class provides Semaphore(int permits) and Semaphore(int permits, boolean fair) constructors for specifying the number of permits.

Each call to Semaphore's void acquire() method takes one of the available permits or blocks the calling thread until one becomes available. Each call to Semaphore's void release() method returns a permit, potentially releasing a blocked acquiring thread.
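
The permit bookkeeping can be traced in a single thread. The following is a minimal sketch, assuming a two-permit semaphore; the PermitSketch class and its trace() helper are hypothetical. It also uses two Semaphore methods not discussed in the text, availablePermits() and the non-blocking tryAcquire(), so that the sketch can probe an exhausted semaphore without blocking.

```java
import java.util.concurrent.Semaphore;

public class PermitSketch
{
   // Hypothetical helper: returns the values observed at each step.
   public static int[] trace()
   {
      Semaphore semaphore = new Semaphore(2); // two permits
      int[] observed = new int[4];
      try
      {
         observed[0] = semaphore.availablePermits(); // both permits free
         semaphore.acquire();                        // takes one permit
         semaphore.acquire();                        // takes the other
         observed[1] = semaphore.availablePermits(); // none left
         // acquire() would block here, so probe with tryAcquire() instead
         observed[2] = semaphore.tryAcquire() ? 1 : 0;
         semaphore.release();                        // hands one permit back
         observed[3] = semaphore.availablePermits();
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
      return observed;
   }

   public static void main(String[] args)
   {
      for (int value: trace())
         System.out.println(value); // prints 2, 0, 0, 1 on separate lines
   }
}
```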

Working with semaphores

Semaphores are often used to restrict the number of threads that can access a resource. Listing 4 demonstrates this capability by using a semaphore to control access to a pool of string items -- the source code is based on Semaphore's Javadoc example code.

Listing 4. SemaphoreDemo.java

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo
{
   public static void main(String[] args)
   {
      final Pool pool = new Pool();
      Runnable r = new Runnable()
                   {
                      @Override
                      public void run()
                      {
                         String name = Thread.currentThread().getName();
                         try
                         {
                            while (true)
                            {
                               String item;
                               System.out.printf("%s acquiring %s%n", name,
                                                 item = pool.getItem());
                               Thread.sleep(200+(int)(Math.random()*100));
                               System.out.printf("%s putting back %s%n",
                                                 name, item);
                               pool.putItem(item);
                            }
                         }
                         catch (InterruptedException ie)
                         {
                            System.out.printf("%s interrupted%n", name);
                         }
                      }
                   };
      ExecutorService[] executors =
         new ExecutorService[Pool.MAX_AVAILABLE+1];
      for (int i = 0; i < executors.length; i++)
      {
         executors[i] = Executors.newSingleThreadExecutor();
         executors[i].execute(r);
      }
   }
}

final class Pool
{
   public static final int MAX_AVAILABLE = 10;

   private Semaphore available = new Semaphore(MAX_AVAILABLE, true);
   private String[] items;
   private boolean[] used = new boolean[MAX_AVAILABLE];

   Pool()
   {
      items = new String[MAX_AVAILABLE];
      for (int i = 0; i < items.length; i++)
         items[i] = "ITEM"+i;
   }

   String getItem() throws InterruptedException
   {
      available.acquire();
      return getNextAvailableItem();
   }

   void putItem(String item)
   {
      if (markAsUnused(item))
         available.release();
   }

   private synchronized String getNextAvailableItem()
   {
      for (int i = 0; i < MAX_AVAILABLE; ++i)
      {
         if (!used[i])
         {
            used[i] = true;
            return items[i];
         }
      }
      return null; // not reached
   }

   private synchronized boolean markAsUnused(String item)
   {
      for (int i = 0; i < MAX_AVAILABLE; ++i)
      {
         if (item == items[i])
         {
            if (used[i])
            {
               used[i] = false;
               return true;
            }
            else
               return false;
         }
      }
      return false;
   }
}

Listing 4 presents SemaphoreDemo and Pool classes. SemaphoreDemo drives the application by creating executors and having them execute a runnable that repeatedly acquires string item resources from a pool (implemented by Pool) and then returns them.

Pool provides String getItem() and void putItem(String item) methods for obtaining and returning resources. Before obtaining an item in getItem(), a thread must acquire a permit from the semaphore, guaranteeing that an item is available for use. When the thread has finished with the item, it calls putItem(String), which returns the item to the pool and then returns a permit to the semaphore, which lets another thread acquire that item.

No synchronization lock is held when acquire() is called because that would prevent an item from being returned to the pool. However, String getNextAvailableItem() and boolean markAsUnused(String item) are synchronized to maintain pool consistency. (The semaphore encapsulates the synchronization needed to restrict access to the pool separately from the synchronization needed to maintain pool consistency.)

Compile Listing 4 (javac SemaphoreDemo.java) and run this application (java SemaphoreDemo). The first lines of output from one run are shown below:

pool-1-thread-1 acquiring ITEM0
pool-9-thread-1 acquiring ITEM9
pool-7-thread-1 acquiring ITEM8
pool-5-thread-1 acquiring ITEM7
pool-3-thread-1 acquiring ITEM6
pool-10-thread-1 acquiring ITEM5
pool-8-thread-1 acquiring ITEM4
pool-6-thread-1 acquiring ITEM3
pool-4-thread-1 acquiring ITEM2
pool-2-thread-1 acquiring ITEM1
pool-5-thread-1 putting back ITEM7
pool-11-thread-1 acquiring ITEM7
pool-9-thread-1 putting back ITEM9
pool-5-thread-1 acquiring ITEM9
pool-7-thread-1 putting back ITEM8
pool-9-thread-1 acquiring ITEM8
pool-3-thread-1 putting back ITEM6
pool-7-thread-1 acquiring ITEM6

In the above output, eleven threads compete for ten resources. Thread pool-11-thread-1 is forced to wait when it attempts to acquire a resource. It resumes with the ITEM7 resource when thread pool-5-thread-1 returns this resource to the pool.
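
The limit that the semaphore enforces can also be checked programmatically. The sketch below is an illustration under stated assumptions, not part of Listing 4: the BoundSketch class and its maxHolders() helper are hypothetical, and a 20-millisecond sleep stands in for work done while holding a permit. It records the largest number of threads that hold a permit at the same moment, which can never exceed the number of permits.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundSketch
{
   // Hypothetical helper: runs `threads` tasks against `permits` permits and
   // returns the largest number of tasks seen holding a permit at once.
   public static int maxHolders(int permits, int threads)
   {
      final Semaphore semaphore = new Semaphore(permits, true);
      final AtomicInteger holders = new AtomicInteger();
      final AtomicInteger max = new AtomicInteger();
      ExecutorService executor = Executors.newFixedThreadPool(threads);
      for (int i = 0; i < threads; i++)
         executor.execute(() ->
         {
            try
            {
               semaphore.acquire();
               try
               {
                  int now = holders.incrementAndGet();
                  max.accumulateAndGet(now, Math::max); // record the peak
                  Thread.sleep(20); // simulated work while holding a permit
               }
               finally
               {
                  holders.decrementAndGet();
                  semaphore.release();
               }
            }
            catch (InterruptedException ie)
            {
               Thread.currentThread().interrupt();
            }
         });
      executor.shutdown();
      try
      {
         executor.awaitTermination(10, TimeUnit.SECONDS);
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
      }
      return max.get();
   }

   public static void main(String[] args)
   {
      // eleven threads, ten permits, as in Listing 4
      System.out.println(maxHolders(10, 11) <= 10); // prints true
   }
}
```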

Cyclic barriers

A cyclic barrier is a thread-synchronization construct that lets a set of threads wait for each other to reach a common barrier point. The barrier is called cyclic because it can be re-used after the waiting threads are released.

A cyclic barrier is implemented by the java.util.concurrent.CyclicBarrier class. This class provides the following constructors:

  • CyclicBarrier(int nthreads, Runnable barrierAction) causes a maximum of nthreads-1 threads to wait at the barrier. When one more thread arrives, it executes the nonnull barrierAction and then all threads proceed. This action is useful for updating shared state before any of the threads continue.
  • CyclicBarrier(int nthreads) is similar to the previous constructor except that no runnable is executed when the barrier is tripped.

Either constructor throws java.lang.IllegalArgumentException when the value passed to nthreads is less than 1.

CyclicBarrier declares an int await() method that causes the calling thread to wait unless it is the final thread to arrive. The final thread trips the barrier and, if a nonnull Runnable was passed to barrierAction, executes that runnable before the other threads continue.

await() throws InterruptedException when the thread that invoked this method is interrupted while waiting. This method throws BrokenBarrierException when another thread was interrupted or timed out while the invoking thread was waiting, the barrier was reset, the barrier was broken when await() was called, or the barrier action (when present) failed because an exception was thrown from the runnable's run() method.
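
The behavior of await() and the barrier action can be sketched as follows. This is a minimal illustration; the AwaitSketch class and its sumOfArrivalIndexes() helper are hypothetical. Each call to await() returns the caller's arrival index (nthreads-1 for the first thread to arrive, 0 for the last), and the sketch also verifies that the barrier action has run before any thread continues past the barrier.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitSketch
{
   // Hypothetical helper: returns the sum of the arrival indexes returned by
   // await(), or -1 if anything unexpected happened (for example, a thread
   // continued before the barrier action ran).
   public static int sumOfArrivalIndexes(int nthreads)
   {
      final AtomicBoolean actionRan = new AtomicBoolean();
      final AtomicBoolean failed = new AtomicBoolean();
      final AtomicInteger sum = new AtomicInteger();
      final CyclicBarrier barrier =
         new CyclicBarrier(nthreads, () -> actionRan.set(true));
      ExecutorService executor = Executors.newFixedThreadPool(nthreads);
      for (int i = 0; i < nthreads; i++)
         executor.execute(() ->
         {
            try
            {
               // nthreads-1 for the first thread to arrive, 0 for the last
               int index = barrier.await();
               if (!actionRan.get())
                  failed.set(true); // action must precede continuation
               sum.addAndGet(index);
            }
            catch (BrokenBarrierException | InterruptedException e)
            {
               failed.set(true); // not expected in this sketch
            }
         });
      executor.shutdown();
      try
      {
         if (!executor.awaitTermination(10, TimeUnit.SECONDS))
            return -1;
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
         return -1;
      }
      return failed.get() ? -1 : sum.get();
   }

   public static void main(String[] args)
   {
      System.out.println(sumOfArrivalIndexes(3)); // prints 3 (0+1+2)
   }
}
```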

Working with cyclic barriers

Cyclic barriers can be used to perform lengthy calculations by breaking them into smaller individual tasks (as demonstrated by CyclicBarrier's Javadoc example code). They're also used in multiplayer games that cannot start until the last player has joined, as shown in Listing 5.

Listing 5. CyclicBarrierDemo.java

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo
{
   public static void main(String[] args)
   {
      Runnable action = new Runnable()
                        {
                           @Override
                           public void run()
                           {
                              String name =
                                 Thread.currentThread().getName();
                              System.out.printf("Thread %s "+
                                                "executing barrier action.%n",
                                                name);
                           }
                        };
      final CyclicBarrier barrier = new CyclicBarrier(3, action);
      Runnable task = new Runnable()
                      {
                         @Override
                         public void run()
                         {
                            String name = Thread.currentThread().getName();
                            System.out.printf("%s about to join game...%n",
                                              name);
                            try
                            {
                               barrier.await();
                            }
                            catch (BrokenBarrierException bbe)
                            {
                               System.out.println("barrier is broken");
                               return;
                            }
                            catch (InterruptedException ie)
                            {
                               System.out.println("thread interrupted");
                               return;
                            }
                            System.out.printf("%s has joined game%n", name);
                         }
                      };
      ExecutorService[] executors = new ExecutorService[]
                                    {
                                       Executors.newSingleThreadExecutor(),
                                       Executors.newSingleThreadExecutor(),
                                       Executors.newSingleThreadExecutor()
                                    };
      for (ExecutorService executor: executors)
      {
         executor.execute(task);
         executor.shutdown();
      }
   }
}

The above main() method first creates a barrier action that's run by the last thread to reach the barrier. Next, a cyclic barrier for three parties is created. When the third player arrives, the barrier trips and that player's thread executes the barrier action.

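main() then creates a runnable that outputs various status messages and invokes await(), followed by a three-executor array. Each executor runs this runnable and shuts down after the runnable finishes.

Reusing a CyclicBarrier

A CyclicBarrier instance resets itself each time it trips, so the same instance can be awaited again without further action. Invoke its void reset() method only to force the barrier back to its initial state; any threads waiting at the barrier at that point receive BrokenBarrierException.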

Compile and run this application. You should see output similar to the following:

pool-1-thread-1 about to join game...
pool-3-thread-1 about to join game...
pool-2-thread-1 about to join game...
Thread pool-2-thread-1 executing barrier action.
pool-2-thread-1 has joined game
pool-3-thread-1 has joined game
pool-1-thread-1 has joined game
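
The cyclic nature of the barrier can be sketched by awaiting the same instance over several rounds. This is a minimal illustration and not part of Listing 5: the ReuseSketch class and its trips() helper are hypothetical. The barrier starts a new generation each time it trips, so the sketch makes no explicit reset() call, and the barrier action runs exactly once per round.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReuseSketch
{
   // Hypothetical helper: `players` threads meet at one barrier `rounds`
   // times; returns how often the barrier action ran, or -1 on failure.
   public static int trips(int players, final int rounds)
   {
      final AtomicInteger trips = new AtomicInteger();
      final CyclicBarrier barrier =
         new CyclicBarrier(players, () -> trips.incrementAndGet());
      ExecutorService executor = Executors.newFixedThreadPool(players);
      for (int i = 0; i < players; i++)
         executor.execute(() ->
         {
            try
            {
               for (int round = 0; round < rounds; round++)
                  barrier.await(); // same barrier instance every round
            }
            catch (BrokenBarrierException | InterruptedException e)
            {
               trips.set(-1); // not expected in this sketch
            }
         });
      executor.shutdown();
      try
      {
         if (!executor.awaitTermination(10, TimeUnit.SECONDS))
            return -1;
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
         return -1;
      }
      return trips.get();
   }

   public static void main(String[] args)
   {
      System.out.println(trips(3, 2)); // prints 2
   }
}
```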

Countdown latches

A countdown latch is a thread-synchronization construct that causes one or more threads to wait until a set of operations being performed by other threads finishes. It consists of a count and "cause a thread to wait until the count reaches zero" and "decrement the count" operations.
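
The two operations can be sketched with the java.util.concurrent.CountDownLatch class, whose await() and countDown() methods correspond to "wait until the count reaches zero" and "decrement the count". This is a minimal illustration; the LatchSketch class and its completedBeforeRelease() helper are hypothetical, and the timed await(long, TimeUnit) overload is used only so that the sketch cannot hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchSketch
{
   // Hypothetical helper: starts `workers` tasks, waits on the latch, and
   // returns how many tasks had finished when the wait ended (-1 on failure).
   public static int completedBeforeRelease(int workers)
   {
      final CountDownLatch done = new CountDownLatch(workers);
      final AtomicInteger completed = new AtomicInteger();
      ExecutorService executor = Executors.newFixedThreadPool(workers);
      for (int i = 0; i < workers; i++)
         executor.execute(() ->
         {
            completed.incrementAndGet(); // stands in for the real operation
            done.countDown();            // decrement the count
         });
      try
      {
         // wait until the count reaches zero (give up after 10 seconds)
         if (!done.await(10, TimeUnit.SECONDS))
            return -1;
         return completed.get();
      }
      catch (InterruptedException ie)
      {
         Thread.currentThread().interrupt();
         return -1;
      }
      finally
      {
         executor.shutdown();
      }
   }

   public static void main(String[] args)
   {
      System.out.println(completedBeforeRelease(3)); // prints 3
   }
}
```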
