Untangling Java concurrency

Java 101: Java concurrency without the pain, Part 2

Locking, atomic variables, Fork/Join, and what to expect in Java 8

1 2 3 4 Page 2
Page 2 of 4

Listing 2. CondDemo.java

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CondDemo
{
   public static void main(String[] args)
   {
      Shared s = new Shared();
      new Producer(s).start();
      new Consumer(s).start();
   }
}

class Shared
{
   // Fields c and available are volatile so that writes to them are visible to 
   // the various threads. Fields lock and condition are final so that they're
   // initial values are visible to the various threads. (The Java memory model 
   // promises that, after a final field has been initialized, any thread will 
   // see the same [correct] value.)

   private volatile char c;
   private volatile boolean available;
   private final Lock lock;
   private final Condition condition;

   Shared()
   {
      c = '\u0000';
      available = false;
      lock = new ReentrantLock();
      condition = lock.newCondition();
   }

   Lock getLock()
   {
      return lock;
   }

   char getSharedChar()
   {
      lock.lock();
      try
      {
         while (!available)
            try
            {
               condition.await();
            }
            catch (InterruptedException ie)
            {
               ie.printStackTrace();
            }
         available = false;
         condition.signal();
      }
      finally
      {
         lock.unlock();
         return c;
      }
   }

   void setSharedChar(char c)
   {
      lock.lock();
      try
      {
         while (available)
            try
            {
               condition.await();
            }
            catch (InterruptedException ie)
            {
               ie.printStackTrace();
            }
         this.c = c;
         available = true;
         condition.signal();
      }
      finally
      {
         lock.unlock();
      }
   }
}

class Producer extends Thread
{
   // l is final because it's initialized on the main thread and accessed on the
   // producer thread.

   private final Lock l;

   // s is final because it's initialized on the main thread and accessed on the
   // producer thread.

   private final Shared s;
   
   Producer(Shared s)
   {
      this.s = s;
      l = s.getLock();
   }

   @Override
   public void run()
   {
      for (char ch = 'A'; ch <= 'Z'; ch++)
      {
         l.lock();
         s.setSharedChar(ch);
         System.out.println(ch + " produced by producer.");
         l.unlock();
      }
   }
}
class Consumer extends Thread
{
   // l is final because it's initialized on the main thread and accessed on the
   // consumer thread.

   private final Lock l;

   // s is final because it's initialized on the main thread and accessed on the
   // consumer thread.

   private final Shared s;

   Consumer(Shared s)
   {
      this.s = s;
      l = s.getLock();
   }

   @Override
   public void run()
   {
      char ch;
      do
      {
         l.lock();
         ch = s.getSharedChar();
         System.out.println(ch + " consumed by consumer.");
         l.unlock();
      }
      while (ch != 'Z');
   }
}

Listing 2 presents four classes: CondDemo, Shared, Producer, and Consumer. CondDemo drives the application, Shared encapsulates the logic for setting and getting a shared variable's value, Producer describes the producer thread, and Consumer describes the consumer thread.

The mechanics of CondDemo

CondDemo's main() method instantiates Shared, Producer, and Consumer. It passes the Shared instance to the Producer and Consumer thread instance constructors and starts these threads.

The Producer and Consumer constructors are invoked on the main thread. Because the Shared instance is also accessed on the producer and consumer threads, it's necessary for this instance to be visible to them, especially when these threads run on different cores. Within each of Producer and Consumer, I accomplish this task by declaring s final. I could have declared this field volatile, but volatile suggests that there will be further writes to the field and s isn't supposed to change after being initialized.

Shared's constructor creates a lock (lock = new ReentrantLock();) and an associated condition (condition = lock.newCondition();). This lock is made available to the producer and consumer threads via the Lock getLock() method.

The producer thread always invokes the void setSharedChar(char c) method to generate a new character. This method locks the previously created lock object and then enters a while loop that repeatedly tests variable available -- this variable is true when a produced character hasn't yet been consumed.

As long as available is true, the producer invokes the condition's await() method to wait for available to become false. The consumer will signal the condition to wake up the producer when it has consumed the character. (A loop is used instead of an if statement because spurious wakeups are possible and available might still be true.)

After exiting the loop, the producer records the new character, assigns true to available to indicate that a new character is available for consumption, and signals the condition to wake up a waiting consumer. Lastly, it unlocks the lock and returns from setSharedChar().

Locking controls output order

Why am I locking the get/print and set/print code blocks? Without this locking, you might observe consuming messages before producing messages, even though characters are produced before they're consumed. Locking these blocks prevents this strange output order.

The behavior of the consumer thread in the char getSharedChar() method is similar.

The mechanics of the producer and consumer threads are simpler. Each run() method first locks the lock, then sets or gets a character and outputs a message, and unlocks the lock. (I didn't use the try/finally idiom because an exception isn't thrown from this context.)

Compile CondDemo.java and run the application. You should observe the following output:

A produced by producer.
A consumed by consumer.
B produced by producer.
B consumed by consumer.
C produced by producer.
C consumed by consumer.
D produced by producer.
D consumed by consumer.
E produced by producer.
E consumed by consumer.
F produced by producer.
F consumed by consumer.
G produced by producer.
G consumed by consumer.
H produced by producer.
H consumed by consumer.
I produced by producer.
I consumed by consumer.
J produced by producer.
J consumed by consumer.
K produced by producer.
K consumed by consumer.
L produced by producer.
L consumed by consumer.
M produced by producer.
M consumed by consumer.
N produced by producer.
N consumed by consumer.
O produced by producer.
O consumed by consumer.
P produced by producer.
P consumed by consumer.
Q produced by producer.
Q consumed by consumer.
R produced by producer.
R consumed by consumer.
S produced by producer.
S consumed by consumer.
T produced by producer.
T consumed by consumer.
U produced by producer.
U consumed by consumer.
V produced by producer.
V consumed by consumer.
W produced by producer.
W consumed by consumer.
X produced by producer.
X consumed by consumer.
Y produced by producer.
Y consumed by consumer.
Z produced by producer.
Z consumed by consumer.

Read-write locks

You'll occasionally encounter a situation where data structures are read more often than they're modified. The Locking framework has a read-write locking mechanism for these situations that yields both greater concurrency when reading and the safety of exclusive access when writing.

The ReadWriteLock interface maintains a pair of associated locks, one for read-only operations and one for write operations. The read lock may be held simultaneously by multiple reader threads as long as there are no writers. The write lock is exclusive: only a single thread can modify shared data. (The lock that's associated with the synchronized keyword is also exclusive.)

ReadWriteLock declares the following methods:

  • Lock readLock() returns the lock that's used for reading.
  • Lock writeLock() returns the lock that's used for writing.

Working with read-write locks

The ReentrantReadWriteLock class implements ReadWriteLock and describes a read-write lock with similar semantics to a reentrant lock. Like ReentrantLock, ReentrantReadWriteLock declares a pair of constructors:

  • ReentrantReadWriteLock() creates a reentrant read-write lock with default (nonfair) ordering properties.
  • ReentrantReadWriteLock(boolean fair) creates a reentrant read-write lock with the given fairness policy.

ReentrantReadWriteLock implements ReadWriteLock's methods and provides additional methods, including the following trio:

  • int getQueueLength() returns an estimate of the number of threads waiting to acquire either the read or write lock.
  • int getReadHoldCount() returns the number of read holds on this lock by the current thread. A reader thread has a hold on a lock for each lock action that is not matched by an unlock action.
  • boolean hasWaiters(Condition condition) returns true when there are threads waiting on the given condition associated with the write lock.

Listing 3 demonstrates ReentrantReadWriteLock.

LIsting 3. RWLockDemo.java

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RWLockDemo
{
   final static int DELAY = 80;
   final static int NUMITER = 5;

   public static void main(String[] args) 
   {
      final Names names = new Names();

      class NamedThread implements ThreadFactory
      {
         private String name;

         NamedThread(String name)
         {
            this.name = name;
         }

         @Override
         public Thread newThread(Runnable r)
         {
            return new Thread(r, name);
         }
      }

      ExecutorService writer;
      writer = Executors.newSingleThreadExecutor(new NamedThread("writer"));
      Runnable wrunnable = new Runnable()
                           {
                              @Override
                              public void run()
                              {
                                 for (int i = 0; i < NUMITER; i++)
                                 {
                                    names.add(Thread.currentThread().getName(), 
                                              "A" + i);
                                    try
                                    {
                                       Thread.sleep(DELAY);
                                    }
                                    catch (InterruptedException ie)
                                    {
                                    }
                                 }
                              }
                           };
      writer.submit(wrunnable);

      ExecutorService reader1;
      reader1 = Executors.newSingleThreadExecutor(new NamedThread("reader1"));
      ExecutorService reader2;
      reader2 = Executors.newSingleThreadExecutor(new NamedThread("reader2"));
      Runnable rrunnable = new Runnable()
                           {
                              @Override
                              public void run()
                              {
                                 for (int i = 0; i < NUMITER; i++)
                                    names.dump(Thread.currentThread().getName());
                              }
                           };
      reader1.submit(rrunnable);
      reader2.submit(rrunnable);

      reader1.shutdown();
      reader2.shutdown();
      writer.shutdown();
   }
}

class Names 
{
   private final List<String> names;
 
   private final ReentrantReadWriteLock lock;
   private final Lock readLock, writeLock;
 
   Names()
   {
      names = new ArrayList<>();
      lock = new ReentrantReadWriteLock();
      readLock = lock.readLock();
      writeLock = lock.writeLock();
   }
 
   void add(String threadName, String name)
   {
      writeLock.lock();
      try 
      {
         System.out.printf("%s: num waiting threads = %d%n", 
                           threadName, lock.getQueueLength());
         names.add(name);
      } 
      finally 
      {
         writeLock.unlock();
      }
   }

   void dump(String threadName)
   {
      readLock.lock();
      try
      {
         System.out.printf("%s: num waiting threads = %d%n",
                           threadName, lock.getQueueLength());
         Iterator<String> iter = names.iterator();
         while (iter.hasNext())
         {
            System.out.printf("%s: %s%n", threadName, iter.next());
            try
            {
               Thread.sleep((int)(Math.random()*100));
            }
            catch (InterruptedException ie)
            {
            }
         }
      }
      finally
      {
         readLock.unlock();
      }
   }
}

Listing 3 describes an application where a writer thread appends names to a list of names and a pair of reader threads repeatedly dump this list to the standard output.

The mechanics of RWLockDemo

The main() method first instantiates the Names class, which stores the list of names and provides methods for adding names to and dumping the list. It then declares NamedThread.

NamedThread is a local class that is subsequently used in an executor context to provide a name for the executor's thread. It implements the java.util.concurrent.ThreadFactory interface and its Thread newThread(Runnable r) method, which returns a new thread whose name was previously passed to the NamedThread(String name) constructor.

Next, main() invokes java.util.concurrent.Executors's ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) method to create an executor for the writer thread. The name is obtained from the NamedThread instance.

A runnable for the writer thread is then created and submitted to the executor. The runnable repeatedly creates and adds a name to the list of names, and then delays for a short amount of time to give the reader threads a chance to run.

A pair of executors for the reader threads are now created along with a shared runnable for repeatedly dumping the names list. This runnable is submitted to each of the reader executors.

Lastly, main() invokes shutdown() on each executor to initiate an orderly shutdown of the executor as soon as it finishes.

About Names

Names is a simple class that demonstrates read-write locks. It first declares list-of-names, reentrant read-write lock, read-lock, and write-lock fields followed by a constructor that initializes these fields.

The void add(String threadName, String name) method is invoked by the writer thread to add a new name. The threadName argument is used to identify the writer thread (perhaps we might want to add more writer threads) and the name argument identifies the name to be added to the list.

This method first executes writeLock.lock(); to acquire the write lock and then outputs the number of threads waiting to acquire the read (0 to 2) or write (1) lock. After adding the name to the list, it executes writeLock.unlock(); to release the write lock.

The void dump(String threadName) method is similar to add() except for iterating over the list of names, outputting each name, and sleeping for a random amount of time.

Execute javac RWLockDemo.java to compile Listing 3. Then execute java RWLockDemo to run the application. On a Windows 7 platform, I observe something like the following output:

writer: num waiting threads = 0
reader1: num waiting threads = 1
reader2: num waiting threads = 0
reader2: A0
reader1: A0
reader2: num waiting threads = 0
reader2: A0
writer: num waiting threads = 1
reader2: num waiting threads = 1
reader2: A0
reader1: num waiting threads = 0
reader1: A0
reader2: A1
reader1: A1
writer: num waiting threads = 2
reader2: num waiting threads = 1
reader2: A0
reader1: num waiting threads = 0
reader1: A0
reader2: A1
reader1: A1
reader1: A2
reader2: A2
writer: num waiting threads = 2
reader2: num waiting threads = 1
reader2: A0
reader1: num waiting threads = 0
reader1: A0
reader2: A1
reader1: A1
reader2: A2
reader1: A2
reader2: A3
reader1: A3
writer: num waiting threads = 0
reader1: num waiting threads = 0
reader1: A0
reader1: A1
reader1: A2
reader1: A3
reader1: A4

The interleaved output for the reader threads demonstrates that the read lock may be held simultaneously by multiple reader threads.

1 2 3 4 Page 2
Page 2 of 4