Programming Java threads in the real world, Part 4

Condition variables and counting semaphores -- filling in a few chinks in Java's threading model

This month's column adds a few more classes to the threading arsenal we started building in last month's Java Toolbox column. This time I'll discuss the following:

  1. Roll-your-own versions of the condition variable, which replaces wait() and notify() in some situations

  2. Dijkstra's "counting" semaphore, which is used to manage pools of resources

(A semaphore is any of several mechanisms used to synchronize and communicate between threads. Think of the "semaphore" as in the flags that two boy scouts use to talk to each other from a distance -- different flag positions represent different letters of the alphabet. Napoleon's army used the vanes of windmills on mountain tops to send semaphore messages great distances very quickly. The mutex discussed last month, since it's a communications mechanism, is also a "semaphore.")

The condition variable can often be simulated using Java alone -- and I'll show you how -- while the counting semaphore can't.

Before I start, though, I'd like to go over a few loose ends from last month and fix a few bugs.

Oops! Could my code have bugs in it?

Before leaping into this month's meat (now there's a metaphor you can sink your teeth into), let's look at a few loose ends that either caught my eye or were brought to my attention after last month's article went live.

During one of my book's "peer reviews," an academic reviewer once took exception to the sentence "if you're anything like me, you'll forget to ... so you should write your code to do it automatically." His comment to me was: "I would never admit that in print." This guy was (and as far as I know, still is) a tenured professor at an Ivy League university, and I suppose his comment was correct in a literal sense: since he never had written any actual code, he never had any bugs to admit. I might as well say it up front, though: my code contains an occasional bug (gasp). Consequently, I expect an "Oops" section or its moral equivalent to become a regular feature of this column. There's nothing like having 100,000 people look over your code for problems to emerge and be highlighted.

Joe Bowbeer pointed out (quite correctly):

Why not advise the use of try/finally to prevent an exception from gunking up the works?

mutex.acquire();
try
{   doit();
}
finally
{   mutex.release();
}

I prefer the [above] form of try/finally because it separates the exceptions that might occur in changing the state (acquire) from the exceptions that might occur when working in the new state (doit).

The other (more intuitive?) form is

try
{   mutex.acquire();
    doit();
}
finally
{   mutex.release();
}

This requires more programming in release() to ensure that mutex is in a consistent state: if release() is called after an exception in acquire(), the mutex may not have been acquired, or [may have been] half-acquired, etc.

I should add that Joe's last point is important in the case of last month's Mutex class. The acquire_without_blocking() method, where the actual acquisition occurs, doesn't throw any exceptions at awkward times. The only exception that can be thrown from acquire(), in fact, is an InterruptedException, thrown if a timeout is specified and the waiting thread is interrupted. This operation does not leave the mutex in an unstable state, however.
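To make the effect of Joe's preferred form concrete, here's a minimal sketch. SimpleMutex is a hypothetical stand-in that only tracks whether it is held; it is not last month's Mutex class, and the guarded() and releasedAfterFailure() helpers are invented for this illustration.

```java
// Hypothetical stand-in for a mutex; NOT the Mutex class from last month's column.
class SimpleMutex {
    private boolean held = false;

    public synchronized void acquire() throws InterruptedException {
        while (held)
            wait();               // block until the current holder releases
        held = true;
    }

    public synchronized void release() {
        held = false;
        notify();                 // wake one waiting thread, if any
    }

    public synchronized boolean isHeld() { return held; }
}

class TryFinallyDemo {
    // Acquires outside the try block, so release() runs only if acquire() succeeded.
    static void guarded(SimpleMutex mutex, Runnable work) throws InterruptedException {
        mutex.acquire();
        try {
            work.run();
        } finally {
            mutex.release();      // runs whether work.run() returns or throws
        }
    }

    // Returns true if the mutex was released even though the work threw.
    static boolean releasedAfterFailure() {
        SimpleMutex mutex = new SimpleMutex();
        try {
            guarded(mutex, () -> { throw new IllegalStateException("doit() failed"); });
        } catch (IllegalStateException expected) {
            // the exception still reaches the caller; only the cleanup is automatic
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !mutex.isHeld();
    }

    public static void main(String[] args) {
        System.out.println("released after failure: " + releasedAfterFailure());
    }
}
```

Because acquire() sits outside the try block, an exception thrown while acquiring never reaches release(), which is the separation Joe describes.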

Be that as it may, while looking at my code to make sure Joe hadn't found a bug, I found a bug myself. (I'll show you the code in a moment, but let's discuss the problems in it first.) The acquire() method was using a standard spin lock while waiting to acquire the mutex, but this strategy is appropriate only when the timeout is infinitely large. I've fixed the problem by making the loop terminate for timeout values less than Long.MAX_VALUE (the value I use for "forever"). It continues to use a spin lock in "forever" cases.

While I was at it, I also decided to have acquire() indicate whether or not it had timed out. The choices here are the usual ones: a return value or an exception toss. I opted for the latter because a timeout typically is an error condition, and I didn't want to clutter up the code with unnecessary tests for false return values.

I modified the definition for the Semaphore interface to incorporate this new exception:

package com.holub.asynch;

interface Semaphore
{
    int  id     ();
    void acquire(long timeout)  throws InterruptedException,
                                       Timed_out;
    void release();

    public static class Timed_out extends java.lang.RuntimeException
    {   Timed_out(){ super("Timed out while waiting to acquire semaphore"); }
    }
}

Note that Semaphore.Timed_out is a RuntimeException, so you don't have to catch it if the timeout is a fatal error (often the case).

The new (and this time, I hope, correct) version of acquire() now looks like this:

public synchronized void acquire( long timeout ) throws InterruptedException
{
    if( timeout == 0 )                      // don't wait at all
    {   acquire_without_blocking();
    }
    else if( timeout == Long.MAX_VALUE )    // wait forever
    {   while( !acquire_without_blocking() )
            this.wait( timeout );
    }
    else                                    // wait limited by timeout
    {   if( !acquire_without_blocking() )
        {   this.wait( timeout );
            if( !acquire_without_blocking() )
                throw new Semaphore.Timed_out();
        }
    }
}
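One thing to keep in mind about the timed branch above: wait(timeout) can return before the timeout has elapsed (for example, when another thread grabs the lock right after a notify). A common way to cope is to track how much time remains and wait again. The following is only a sketch of that idea under stated assumptions: TimedOut, TimedMutex, tryAcquire(), and the demo helper are hypothetical names, not the classes from this column.

```java
// Hypothetical names throughout: TimedOut and TimedMutex are illustrative,
// not the Semaphore.Timed_out and Mutex classes from the column.
class TimedOut extends RuntimeException {
    TimedOut() { super("Timed out while waiting to acquire the mutex"); }
}

class TimedMutex {
    private Thread owner = null;             // null means the mutex is free

    // Takes the mutex if it is free; never blocks. Caller must hold this object's monitor.
    private boolean tryAcquire() {
        if (owner != null)
            return false;
        owner = Thread.currentThread();
        return true;
    }

    public synchronized void acquire(long timeoutMillis) throws InterruptedException {
        if (timeoutMillis == Long.MAX_VALUE) {           // "forever"
            while (!tryAcquire())
                wait();
            return;
        }
        long start = System.nanoTime();
        long remaining = timeoutMillis;
        while (!tryAcquire()) {
            if (remaining <= 0)
                throw new TimedOut();                    // also covers timeoutMillis == 0
            wait(remaining);                             // may return before the time is up
            remaining = timeoutMillis - (System.nanoTime() - start) / 1_000_000L;
        }
    }

    public synchronized void release() {
        owner = null;
        notify();                                        // let one waiter retry
    }
}

class TimedMutexDemo {
    // Acquires once, then tries again from the same thread; the second attempt must time out.
    static boolean secondAcquireTimesOut(long timeoutMillis) {
        TimedMutex mutex = new TimedMutex();
        try {
            mutex.acquire(0);                // free, so this succeeds immediately
            mutex.acquire(timeoutMillis);    // already held (not reentrant), so this waits
            return false;
        } catch (TimedOut e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("second acquire timed out: " + secondAcquireTimesOut(50));
    }
}
```

Note that the sketch never calls wait(0), since a zero argument to wait() means "wait indefinitely" rather than "don't wait."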

Finally, in last month's column, I inadvertently used an outdated version of the Lock_manager's comparator class. (It threw a Not_Comparable exception -- an artifact of my own sort implementation, which I abandoned when Java added an official one.) Anyway, the comparator class should look like this:

private static class Lock_comparator implements Comparator
{   public int compare(Object a, Object b)
    {   return ((Semaphore)a).id() - ((Semaphore)b).id();
    }
    public boolean equals(Object obj)
    {   return obj instanceof Lock_comparator;
    }
}

I've modified the code in the "Goodies" section of my Web site (see Resources) to incorporate all these changes.
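As an aside, comparing by subtraction is safe only while the two ids can't be far enough apart to overflow an int. If that can't be guaranteed, Integer.compare() is a drop-in alternative. Here's a small sketch of an id-ordered sort; the Identified interface and the IdOrderDemo class are hypothetical stand-ins, with only id() mirroring the Semaphore interface.

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical stand-in for the column's Semaphore interface; only id() matters here.
interface Identified {
    int id();
}

class IdOrderDemo {
    // Orders by id using Integer.compare, which cannot overflow the way subtraction can.
    static final Comparator<Identified> BY_ID =
        (a, b) -> Integer.compare(a.id(), b.id());

    // Returns the ids of the given objects after sorting them with BY_ID.
    static int[] sortedIds(Identified... locks) {
        Identified[] copy = locks.clone();
        Arrays.sort(copy, BY_ID);
        int[] ids = new int[copy.length];
        for (int i = 0; i < copy.length; i++)
            ids[i] = copy[i].id();
        return ids;
    }

    public static void main(String[] args) {
        Identified a = () -> 7, b = () -> -3, c = () -> Integer.MAX_VALUE;
        System.out.println(Arrays.toString(sortedIds(a, b, c)));  // prints [-3, 7, 2147483647]
    }
}
```

With subtraction, comparing Integer.MAX_VALUE against -3 would wrap around to a negative number and report the wrong order.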

Now, on to the meat of this month's article.

Condition variables

Forging ahead -- to boldly split infinitives no one has split before:

I've brought up "condition variables" before in the context of wait() and notify(). The central concept is that a thread will wait until some condition becomes true. For example, a thread may need to wait for somebody to push a button before proceeding with an action, or a thread may wait for something to appear in an empty queue (for the queue-not-empty condition to become true).

Using a condition variable to wait for events

The following code illustrates a classic problem that is easily solved with a condition variable: How do I wait for an event to occur without wasting machine cycles in a polling loop? The code sets up a simple TextField and an ActionListener that's notified when the user presses the Enter key:

import java.awt.*;
import java.awt.event.*;

class Some_class extends Frame
{
    TextField input = new TextField();
    String    entered = "";

    public Some_class()
    {   input.addActionListener
        (   new ActionListener()
            {   public void actionPerformed( ActionEvent e )
                {   entered = input.getText();
                }
            }
        );

        add(input);
        pack();
        show();
    }

    String read_line(){ return entered; }
    //...
}

So far, so good, but let's look at the situation in more detail.

When you display the Frame, AWT fires up a thread that monitors events coming in from the operating system, including key-press events. When the Enter key is pressed, for example, the AWT thread gets the key-press event from the OS and, in turn, calls the listener's actionPerformed() method. The "actionPerformed()" messages are coming in asynchronously from the AWT event-loop thread. Put another way: the "actionPerformed()" message is actually running on that AWT thread.

Meanwhile, a user thread (as differentiated from the AWT thread) calls read_line() to find out what the user has typed. The problem is that both the AWT and the user thread can access the entered field simultaneously -- a classic race condition. The second thread could call read_line() while the AWT thread is in the middle of actionPerformed() and end up reading a stale or inconsistent value.

Solve this first problem with synchronization:

import java.awt.*;
import java.awt.event.*;

class Some_class extends Frame
{
    TextField input = new TextField();
    String    entered = "";

    public Some_class()
    {   input.addActionListener
        (   new ActionListener()
            {   public void actionPerformed( ActionEvent e )
                {   synchronized( Some_class.this )
                    {    entered = input.getText();
                    }
                }
            }
        );

        add(input);
        pack();
        show();
    }

    synchronized String read_line(){ return entered; }
    //...
}

Note that the inner-class method has to synchronize explicitly on the outer-class object. Simply synchronizing actionPerformed() doesn't work because you'll be synchronizing on the monitor of the anonymous inner-class object, and the field you want to guard is in the outer-class object.
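If you want to convince yourself of this, Thread.holdsLock() reports whether the current thread owns a given object's monitor. The following GUI-free sketch uses hypothetical names (MonitorDemo, Probe, methodLevel, explicitOuter) purely for illustration.

```java
// Hypothetical sketch: shows which monitor an inner-class method actually holds.
class MonitorDemo {
    interface Probe { boolean outerLocked(); }

    // synchronized method of the anonymous class: locks the inner object, not MonitorDemo.this
    Probe methodLevel = new Probe() {
        public synchronized boolean outerLocked() {
            return Thread.holdsLock(MonitorDemo.this);
        }
    };

    // explicit block on the outer object: locks the monitor that guards the outer fields
    Probe explicitOuter = new Probe() {
        public boolean outerLocked() {
            synchronized (MonitorDemo.this) {
                return Thread.holdsLock(MonitorDemo.this);
            }
        }
    };

    public static void main(String[] args) {
        MonitorDemo demo = new MonitorDemo();
        System.out.println(demo.methodLevel.outerLocked());    // prints false
        System.out.println(demo.explicitOuter.outerLocked());  // prints true
    }
}
```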

Moving on, our user thread needs to know when an entire line has been typed to be sure that read_line() will return a complete line of input, but (and this is the big but), there's no direct communication between the two threads involved in this transaction. The code running on the AWT thread (actionPerformed()) doesn't tell the user thread that an entire-line-has-been-typed event has occurred.

So how does the caller of read_line() know the string has changed? It could sit in a tight polling loop calling read_line() and checking the current return value against the previously returned value, but that's an awful lot of machine cycles wasted on doing nothing useful.

Send in the cavalry

So what's one way for two threads to communicate with each other? (That's a rhetorical question.) Use a semaphore (think Napoleon, flags, mountain tops). To the rescue comes the semaphore known as a condition variable. To rehash from previous months' material: the basic notion of a condition variable is that some thread waits (is suspended) on the condition variable until the condition it represents becomes true. Every Java object has a condition variable associated with it -- in the same way it has the mutex used to guard the monitor. You wait for the condition to become true by calling wait(), and you set the condition to true by calling notify(). (The notify() call doesn't work in quite this way; I'll talk more about this in a moment.) It's easy enough to do a roll-your-own condition variable that solves the current thread-communication problem by using wait() and notify().

Listing 1 demonstrates how to do this. A condition variable called text_has_been_entered is declared up at the top of the class definition. (We're going to wait for the text-has-been-entered condition to become true.) The actionPerformed() method doesn't read the text at all; rather, it simply notifies the condition variable, setting the condition to true. Note that Java requires you to be in the monitor for an object before you can call wait() or notify() on that object, so the synchronized(text_has_been_entered) statement is mandatory. (You must be holding the lock associated with the object, by being in either a synchronized function of that object's class or a standalone synchronized statement whose argument is the object on which you're synchronizing.)

The synchronized(text_has_been_entered) statement on line 15 is mandatory, since entering the synchronized block puts us into the monitor of the object referenced by text_has_been_entered.
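Here's a quick sketch of what happens when that rule is broken: calling notify() (or wait()) on an object whose lock you don't hold throws an IllegalMonitorStateException at run time. The class and method names below are hypothetical.

```java
// Hypothetical demo of the "must hold the lock" rule for wait() and notify().
class MonitorRuleDemo {
    // Returns true if notify() is rejected when the caller does not hold the object's lock.
    static boolean notifyWithoutLockFails() {
        Object condition = new Object();
        try {
            condition.notify();          // not inside synchronized(condition)
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Returns true once notify() has completed normally while holding the object's lock.
    static boolean notifyWithLockSucceeds() {
        Object condition = new Object();
        synchronized (condition) {
            condition.notify();          // legal: we are in condition's monitor
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("without lock fails: " + notifyWithoutLockFails());
        System.out.println("with lock succeeds: " + notifyWithLockSucceeds());
    }
}
```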

Meanwhile, down in read_line() on line 33, the thread that calls read_line() is waiting for the condition to become true. When this happens, the new text value is read and returned. The read_line() method is itself synchronized so that two threads can't attempt to read the same line simultaneously. It's now possible to have a simple loop like the one in main()

while( (input = source.read_line()) != null )
    System.out.println("Got: " + input );

which blocks until a new input line arrives.
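Independently of Listing 1, here's a stripped-down, GUI-free sketch of the same wait-for-an-event idea. It is not the column's code: LineSource, lineEntered(), and the pending field are hypothetical, and a plain thread stands in for the AWT thread. Unlike the description above, this sketch stores the entered line and rechecks it in a loop, so a notification that arrives before the reader starts waiting isn't lost.

```java
// Hypothetical, GUI-free sketch of waiting on a condition; not the column's Listing 1.
class LineSource {
    private final Object textHasBeenEntered = new Object();  // the condition object
    private String pending = null;                            // line not yet consumed

    // Called by the producer (the AWT thread in the article) when a line is complete.
    public void lineEntered(String text) {
        synchronized (textHasBeenEntered) {
            pending = text;
            textHasBeenEntered.notify();     // wake a reader blocked in readLine()
        }
    }

    // Blocks, without polling, until a line is available, then returns it.
    public String readLine() throws InterruptedException {
        synchronized (textHasBeenEntered) {
            while (pending == null)          // recheck: wait() can return early
                textHasBeenEntered.wait();
            String line = pending;
            pending = null;
            return line;
        }
    }
}

class LineSourceDemo {
    // Starts a producer thread, then blocks in readLine() until it delivers a line.
    static String roundTrip(String text) {
        LineSource source = new LineSource();
        Thread producer = new Thread(() -> source.lineEntered(text));
        producer.start();
        try {
            return source.readLine();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("Got: " + roundTrip("hello"));  // prints Got: hello
    }
}
```

The sketch keeps only one pending line, so a second line entered before the first is read would overwrite it; a queue would be the natural next step if that matters.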
