Programming Java threads in the real world, Part 4

Condition variables and counting semaphores -- filling in a few chinks in Java's threading model

1 2 3 Page 2
Page 2 of 3
Listing 1: Using wait() and notify() for a condition variable
01 | import java.awt.*;
02 | import java.awt.event.*;
03 | 
04 | public class Input_source extends Frame
05 | {
06 |     int[]   text_has_been_entered = new int[1]; // The condition variable
07 | 
08 |     TextField input = new TextField();
09 | 
10 |     public Input_source()
11 |     {
12 |         input.addActionListener
13 |         (   new ActionListener()
14 |             {   public void actionPerformed( ActionEvent e )
15 |                 {   synchronized( text_has_been_entered )
16 |                     {   text_has_been_entered.notify();  // set the condition true
17 |                     }
18 |                 }
19 |             }
20 |         );
21 | 
22 |         add(input);
23 |         pack();
24 |         show();
25 |     }
26 | 
27 |     /** A blocking function that works like readLine(), but gets its
28 |      *  text from the current window's text area. The function doesn't
29 |      *  return until somebody types a line of text, whereupon it returns
30 |      *  the line. Returns null if the user types an empty line.
31 |      */
32 | 
33 |     synchronized String read_line() throws InterruptedException
34 |     {   synchronized( text_has_been_entered )
35 |         {   text_has_been_entered.wait(); // wait for the condition to become true
36 |         }
37 |         
38 |         String entered = input.getText();
39 |         input.setText("");
40 |         return (entered.length() == 0) ? null : entered;
41 | 
42 |     }
43 | 
44 | 
45 |     static public void main( String[] args ) throws Exception
46 |     {   Input_source source = new Input_source();
47 |         String input;
48 | 
49 |         while( (input = source.read_line()) != null )
50 |             System.out.println("Got: " + input );
51 | 
52 |         System.exit(0);         // kill the AWT Thread on exit
53 |     }
54 | }

But Wellington is coming

A subtle problem occurs when using Java's built-in condition variable, however. What if you call read_line() just after the value has changed rather than before? You'll just wait until the value is changed again, missing the first value entirely. The problem is that the built-in condition variable doesn't really have a notion of state associated with it. What we really need, to solve the current problem, is a true condition variable -- one that blocks only if we try to wait() when the condition is false, and that doesn't block at all if the condition is true.

Listing 2 shows a simple implementation of such a beast. There's not much to it -- the vast majority of the file is comments. The main thing is the _is_true flag declared at the top of the class. The _is_true flag stores the state of the condition variable. You can set the condition to true or false by calling set_true() or set_false(). You can test the current state without blocking by calling is_true() or is_false(). You can block, waiting for the condition to become true by calling wait_for_true(), which doesn't block at all if the condition happens to be true when you call it.

Listing 2: A condition-variable implementation
001 | package com.holub.asynch;
002 | 
003 | //-------------------------------------------------------
004 | // This code (c) 1998 Allen I. Holub. All rights reserved.
005 | //-------------------------------------------------------
006 | // This code may not be distributed by yourself except in binary form,
007 | // incorporated into a Java .class file. You may use this code freely
008 | // for personal purposes, but you may not incorporate it into any
009 | // commercial product without express permission of Allen I. Holub in writing.
010 | //-------------------------------------------------------
011 | 
012 | /**
013 |  *  This class implements a simple "condition variable." The notion
014 |  *  is that a thread waits for some condition to become true.
015 |  *  If the condition is false, then no wait occurs.
016 |  *
017 |  *  Be very careful of nested-monitor-lockout here:
018 |  * <PRE>
019 |  *   class lockout
020 |  *   {  Condition godot = new Condition(false);
021 |  *   
022 |  *      synchronized void f()
023 |  *      {   
024 |  *          some_code();
025 |  *          godot.wait_for_true();
026 |  *      }
027 |  *   
028 |  *      synchronized void set() // Deadlock if another thread is in f()
029 |  *      {   godot.set_true();
030 |  *      }
031 |  *   }
032 |  * </PRE>
033 |  *  You enter f(), locking the monitor, then block waiting for the
034 |  *  condition to become true. Note that you have not released the
035 |  *  monitor for the "lockout" object. [The only way to set godot true
036 |  *  is to call set(), but you'll block on entry to set() because
037 |  *  the original caller to f() has the monitor containing "lockout"
038 |  *  object.]
039 |  *  <p>Solve the problem by releasing the monitor before waiting:
040 |  * <PRE>
041 |  *   class okay
042 |  *   {  Condition godot = new Condition(false);
043 |  *   
044 |  *      void f()
045 |  *      {   synchronized( this )
046 |  *          {   some_code();
047 |  *          }
048 |  *          godot.wait_for_true();  // Move the wait outside the monitor
049 |  *      }
050 |  *   
051 |  *      synchronized void set()
052 |  *      {   godot.set_true();
053 |  *      }
054 |  *   }
055 |  * </PRE>
056 |  * or by not synchronizing the `set()` method:
057 |  * <PRE>
058 |  *   class okay
059 |  *   {  Condition godot = new Condition(false);
060 |  *   
061 |  *      synchronized void f()
062 |  *      {   some_code();
063 |  *          godot.wait_for_true();
064 |  *      }
065 |  *   
066 |  *      void set()              // Remove the synchronized statement
067 |  *      {   godot.set_true();
068 |  *      }
069 |  *  }
070 |  * </PRE>
071 |  * The normal wait()/notify() mechanism doesn't have this problem since
072 |  * wait() releases the monitor, but you can't always use wait()/notify().
073 |  */
074 | 
075 | 
076 | public class Condition
077 | {
078 |     private boolean _is_true;
079 | 
080 |     /** Create a new condition variable in a known state.
081 |      */
082 |     public Condition( boolean is_true ){ _is_true = is_true; }
083 | 
084 |     /** See if the condition variable is true (without releasing).
085 |      */
086 |     public synchronized boolean is_true()  { return _is_true; }
087 | 
088 |     /** Set the condition to false. Waiting threads are not affected.
089 |      */
090 |     public synchronized void set_false(){ _is_true = false; }
091 | 
092 |     /** Set the condition to true. Waiting threads are not released.
093 |      */
094 |     public synchronized void set_true() { _is_true = true; notifyAll(); }
095 | 
096 |     /** Release all waiting threads without setting the condition true
097 |      */
098 |     public synchronized void release_all(){ notifyAll(); }
099 | 
100 |     /** Release one waiting thread without setting the condition true
101 |      */
102 |     public synchronized void release_one(){ notify(); }
103 | 
104 |     /** Wait for the condition to become true.
105 |      *  @param timeout Timeout in milliseconds
106 |      */
107 |     public synchronized void wait_for_true( long timeout )
108 |                                             throws InterruptedException
109 |     {   if( !_is_true )
110 |             wait( timeout );
111 |     }
112 | 
113 |     /** Wait (potentially forever) for the condition to become true.
114 |      */
115 |     public synchronized void wait_for_true() throws InterruptedException
116 |     {   if( !_is_true )
117 |             wait();
118 |     }
119 | }

Listing 3 below is basically Listing 1 rewritten to use a Condition object. Now, a call to read_line() after the user enters the text works just fine because the condition will be in the true state, and wait_for_true() won't block. Notice that read_line() has to explicitly set the condition back to false after it has read the line.

There are still a few problems that have to be fixed to make this all work in the real world. For example, there's no way for a program to know if the user overtypes a string when nobody fetches the original string before it's overwritten. Input strings should be queued up as they come in and read_line() should return a string from the queue if there is one, blocking only if the queue is empty. Listing 3 serves to illustrate the problem at hand without addressing these other issues.

Listing 3: Using a Condition object
01 | import java.awt.*;
02 | import java.awt.event.*;
03 | import com.holub.asynch.Condition;
04 | 
05 | public class Input_source_fixed extends Frame
06 | {
07 |     Condition   text_has_been_entered = new Condition(false); // Initial condition is false
08 | 
09 |     TextField input = new TextField();
10 | 
11 |     public Input_source_fixed()
12 |     {
13 |         input.addActionListener
14 |         (   new ActionListener()
15 |             {   public void actionPerformed( ActionEvent e )
16 |                 {   text_has_been_entered.set_true();  // set the condition true
17 |                 }
18 |             }
19 |         );
20 | 
21 |         add(input);
22 |         pack();
23 |         show();
24 |     }
25 | 
26 |     /** A blocking function that works like readLine(), but gets its
27 |      *  text from the current window's text area. The function doesn't
28 |      *  return until somebody types a line of text, whereupon it returns
29 |      *  the line. Returns null if the user types an empty line.
30 |      */
31 | 
32 |     synchronized String read_line() throws InterruptedException
33 |     {
34 |         text_has_been_entered.wait_for_true();
35 |         text_has_been_entered.set_false();
36 | 
37 |         String entered = input.getText();
38 |         input.setText("");
39 |         return (entered.length() == 0) ? null : entered;
40 | 
41 |     }
42 | 
43 | 
44 |     static public void main( String[] args ) throws Exception
45 |     {   Input_source_fixed source = new Input_source_fixed();
46 |         String input;
47 | 
48 |         while( (input = source.read_line()) != null )
49 |             System.out.println("Got: " + input );
50 | 
51 |         System.exit(0);         // kill the AWT Thread on exit
52 |     }
53 | }

Counting semaphores

The other semaphore I want to look at this month is the "Djikstra" counting semaphore. This one has no direct analog in Java, so it's among the more useful of the com.holub.asynch classes.

Counting semaphores are used to keep track of the availability of a resource within a pool of limited size. For example, you might have four connections open to a database server that are simply recycled to perform multiple queries. This way, you won't incur the overhead of opening a connection every time you make a query. Threads seeking to make queries should block (should be suspended, waiting), if no connections are available. They should be reactivated (released from wait) when a connection becomes available. A counting semaphore can solve this problem (though other solutions -- such as a thread-safe stack with a pop method that blocks if the stack is empty -- are also possible).

Counting semaphores are initialized with a count -- typically the number of objects available in the pool. Every time you acquire the semaphore, the count is decremented; every time you release the semaphore, it's incremented. On acquisition, if the count (after the decrement) is non-0, nothing happens, and you get your slot in the pool. If the count is 0, however, the acquiring thread blocks until some other thread releases the semaphore, thereby incrementing the count.

Counting semaphores typically have maximum counts as well as initial counts. A semaphore initialized with a count of 0, but with a maximum of 10, is effectively saying that 10 objects can be in the pool, but none of them are available right now. A reverse-sensing semaphore (which I haven't implemented) is also occasionally useful. This one blocks unless the count is 0. It's useful if you need to acquire the entire pool before you can do anything useful, or if you need to do something when the pool becomes empty (such as add extra elements).

Listing 4, below, shows my Counting_semaphore implementation. It implements the Semaphore interface introduced last month, so slots in multiple pools can be acquired safely by using last month's lock-manager class. As was the case with the Condition class, the counting semaphore is built around Java's wait() and notify() methods. The acquire() method waits if enough slots aren't available; the release() method notifies any waiting threads when a slot becomes available. If multiple threads are waiting, they'll have to sort out amongst themselves which actually gets the slot -- all but one will go back to waiting, but you can't reliably predict which one will get the slot.

I've taken the coward's way out and haven't implemented a version of acquire() that lets you get multiple slots at once. You have to acquire slots one at a time. The problem is that the number of slots you need may become available one at a time, but they may also be grabbed by other threads before the total number you need becomes available. Being forced to acquire slots one at a time actually increases your odds of getting the total number of slots you need over the odds you'd get by waiting until the semaphore's internal count came up to the total. You'll be able to suck the slots up one at a time as they become available. I do have a version of release() that lets you free up multiple slots all at once, however.

Note that the "honor system" is in use in Figure 4. An individual slot doesn't have the concept of ownership associated with it. A thread is on its honor to free up only the slots it has previously acquired. A Counting_semaphore.Too_many_releases object will be thrown if a thread tries to bring the total available-slot count above the maximum, but a thread could still incorrectly release the wrong number of slots without triggering the exception toss.

Related:
1 2 3 Page 2
Page 2 of 3