Programming Java threads in the real world, Part 4

Condition variables and counting semaphores -- filling in a few chinks in Java's threading model

Counting semaphores typically have maximum counts as well as initial counts. A semaphore initialized with a count of 0, but with a maximum of 10, is effectively saying that 10 objects can be in the pool, but none of them are available right now. A reverse-sensing semaphore (which I haven't implemented) is also occasionally useful. This one blocks unless the count is 0. It's useful if you need to acquire the entire pool before you can do anything useful, or if you need to do something when the pool becomes empty (such as add extra elements).

Listing 4, below, shows my Counting_semaphore implementation. It implements the Semaphore interface introduced last month, so slots in multiple pools can be acquired safely by using last month's lock-manager class. As was the case with the Condition class, the counting semaphore is built around Java's wait() and notify() methods. The acquire() method waits if enough slots aren't available; the release() method notifies any waiting threads when a slot becomes available. If multiple threads are waiting, they'll have to sort out amongst themselves which actually gets the slot -- all but one will go back to waiting, but you can't reliably predict which one will get the slot.
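As a rough illustration of the wait()/notify() pattern just described, here is a minimal sketch of a pool that hands out objects rather than counting slots. The class ObjectPool and its methods are hypothetical names made up for this example and are not part of the com.holub.asynch package. The point is only the loop around wait(), which is what sends the losing threads back to waiting.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical illustration; not part of com.holub.asynch.
class ObjectPool<T> {
    private final Deque<T> free = new ArrayDeque<>();

    ObjectPool(Iterable<T> initial) {
        for (T item : initial)
            free.push(item);
    }

    // Blocks until an object is free. The while loop matters: notifyAll()
    // wakes every waiter, and all but one find the pool empty again.
    synchronized T acquire() throws InterruptedException {
        while (free.isEmpty())
            wait();
        return free.pop();
    }

    // Returns an object to the pool and wakes any waiting threads.
    synchronized void release(T item) {
        free.push(item);
        notifyAll();
    }

    synchronized int available() { return free.size(); }

    public static void main(String[] args) throws InterruptedException {
        ObjectPool<String> pool = new ObjectPool<>(Arrays.asList("conn-1"));
        String held = pool.acquire();            // pool is now empty

        Thread waiter = new Thread(() -> {
            try {
                String got = pool.acquire();     // blocks until an object is released
                System.out.println("waiter got " + got);
                pool.release(got);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        pool.release(held);                      // wakes the waiter if it is waiting
        waiter.join();
        System.out.println("available: " + pool.available());
    }
}
```

Which of several waiters wins is up to the scheduler, which is why the sketch makes no attempt at fairness.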

I've taken the coward's way out and haven't implemented a version of acquire() that lets you get multiple slots at once. You have to acquire slots one at a time. The problem is that the number of slots you need may become available one at a time, but they may also be grabbed by other threads before the total number you need becomes available. Being forced to acquire slots one at a time actually increases your odds of getting the total number of slots you need over the odds you'd get by waiting until the semaphore's internal count came up to the total. You'll be able to suck the slots up one at a time as they become available. I do have a version of release() that lets you free up multiple slots all at once, however.
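Acquiring several slots one at a time might look something like the sketch below. Since Counting_semaphore can't be compiled on its own here, the sketch uses java.util.concurrent.Semaphore (added to the standard library in Java 5) as a stand-in. The helper acquireOneAtATime() and the class SlotGrabber are hypothetical. What to do with slots you already hold when a later one can't be had is a policy decision; this sketch simply gives them back, which is one reasonable choice among several.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; java.util.concurrent.Semaphore stands in for
// the article's Counting_semaphore.
class SlotGrabber {

    // Tries to take n slots, one per call. If any single slot can't be
    // had within perSlotTimeoutMs, or the thread is interrupted, the
    // slots collected so far go back to the pool.
    static boolean acquireOneAtATime(Semaphore pool, int n, long perSlotTimeoutMs)
            throws InterruptedException {
        int held = 0;
        boolean gotAll = false;
        try {
            while (held < n) {
                if (!pool.tryAcquire(perSlotTimeoutMs, TimeUnit.MILLISECONDS))
                    return false;            // the finally block returns partial holdings
                ++held;
            }
            gotAll = true;
            return true;
        } finally {
            if (!gotAll && held > 0)
                pool.release(held);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Semaphore pool = new Semaphore(3);
        System.out.println(acquireOneAtATime(pool, 2, 50)); // true: 3 slots were free
        System.out.println(acquireOneAtATime(pool, 2, 50)); // false: only 1 slot left
        System.out.println(pool.availablePermits());        // 1: the partial grab was returned
    }
}
```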

Note that the "honor system" is in use in Listing 4. An individual slot doesn't have the concept of ownership associated with it. A thread is on its honor to free up only the slots it has previously acquired. A Counting_semaphore.TooManyReleases object will be thrown if a thread tries to bring the total available-slot count above the maximum, but a thread could still incorrectly release the wrong number of slots without triggering the exception.
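To make the honor system concrete, here is a small sketch of the same bookkeeping in isolation. HonorSystemSlots is a hypothetical stand-in, and it throws IllegalStateException where Listing 4 throws its own exception class. Notice that nothing in release() knows who is calling it.

```java
// Hypothetical stand-in for the bookkeeping in Listing 4; names are illustrative.
class HonorSystemSlots {
    private int count;
    private final int maxCount;

    HonorSystemSlots(int initialCount, int maxCount) {
        this.maxCount = maxCount;
        this.count = Math.min(initialCount, maxCount);
    }

    // Nothing here records *who* is releasing, only how many slots.
    synchronized int release(int increment) {
        if (count + increment > maxCount)
            throw new IllegalStateException("release would exceed maximum");
        count += increment;
        notifyAll();
        return count;
    }

    synchronized int available() { return count; }

    public static void main(String[] args) {
        HonorSystemSlots slots = new HonorSystemSlots(1, 3);

        // This thread never acquired anything, yet the release succeeds.
        slots.release(2);
        System.out.println("after bogus release: " + slots.available());

        try {
            slots.release(1);    // only now does the bookkeeping notice
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The maximum count catches only the grossest errors; a bogus release that stays under the maximum goes through silently.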

Listing 4: A counting semaphore
//-------------------------------------------------------
// This code (c) 1998 Allen I. Holub. All rights reserved.
//-------------------------------------------------------
// This code may not be distributed by yourself except in binary form,
// incorporated into a java .class file. You may use this code freely
// for personal purposes, but you may not incorporate it into any
// commercial product without express permission of Allen I. Holub in writing.
//-------------------------------------------------------

package com.holub.asynch;

import com.holub.tools.Comparable;
import com.holub.asynch.Semaphore;
import com.holub.asynch.Lock_manager;

class Counting_semaphore implements Semaphore
{
    private int         count;
    private final int   max_count;

    /*****************************************************************
     *  Create a counting semaphore with the specified initial and
     *  maximum counts. release(), which increments the count, is not
     *  permitted to increment it past the maximum. If the initial_count
     *  is larger than the max_count, it is silently truncated.
     *
     *  @see #release()
     */
    Counting_semaphore( int initial_count, int max_count )
    {
        this.max_count  = max_count;
        this.count      = (initial_count > max_count)
                                    ? max_count : initial_count ;
    }

    /*****************************************************************
     *  Create a counting semaphore with a maximum count of
     *  Integer.MAX_VALUE
     */
    Counting_semaphore( int initial_count )
    {   this( initial_count, Integer.MAX_VALUE );
    }

    /*****************************************************************
     *  Required override of Semaphore.id(). Don't call this function.
     *  @see Lock_manager
     */

    public        int  id() { return _id; }
    private final int  _id = Lock_manager.new_id();

    /*****************************************************************
     *  Acquire the semaphore, decrementing the count. Block while the
     *  count is zero. Bug: the timeout is restarted every time the
     *  thread wakes up without getting a slot, so the total wait can
     *  exceed the timeout. The method returns only once a slot has
     *  been acquired.
     *
     *  <p>I have deliberately not implemented a variant that allows
     *  acquisition of multiple slots in a single call because it's not
     *  clear what you'd do if all requested slots aren't available.
     *
     *  @throws InterruptedException if interrupted while waiting
     *          for the semaphore.
     */
    public synchronized void acquire(long timeout)
                                            throws InterruptedException
    {   while( count <= 0 )
            this.wait( timeout );
        --count;
    }

    /*****************************************************************
     *  Release the semaphore and increment the count.
     *  This one is the generic release required by the Semaphore
     *  interface, so all it can do is throw an exception if
     *  there's an error.
     *  @throws Counting_semaphore.TooManyReleases (a RuntimeException)
     *      if you try to release a semaphore whose count is already
     *      at the maximum value.
     */
    public synchronized void release(){ release(1); }

    /*****************************************************************
     *  Release "increment" slots in the semaphore all at once.
     *  @param increment The amount to increment the count.
     *      If this value is zero, the current count is returned and
     *      no threads are released.
     *  @throws Counting_semaphore.TooManyReleases (a RuntimeException)
     *      if the current count + increment is greater than the maximum.
     *      The semaphore will not have been modified in this case.
     *  @return the value of the count after the increment is added.
     */
    public synchronized int release( int increment )
    {
        int current_count = count;
        int new_count     = count + increment;

        if( new_count > max_count )
            throw new TooManyReleases();

        count = new_count;
        if( current_count == 0 && count > 0 )
            notifyAll();

        return count;
    }

    /** Thrown if you try to release more than the maximum number
     * of slots.
     */
    public static class TooManyReleases extends RuntimeException
    {   private TooManyReleases()
        {   super("Released semaphore that was at capacity");
        }
    }

    /*****************************************************************
     *  A semaphore-specific release function that returns an error
     *  status if the count would go past the initially specified maximum.
     *  @return true if the semaphore was successfully released, false
     *  otherwise.
     */
    public synchronized boolean notifying_release()
    {
        if( count >= max_count )
            return false;

        if( ++count == 1 )
            notifyAll();
        return true;
    }
}
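
The comment on acquire() in Listing 4 admits that the timeout can be exceeded: every pass through the while loop calls wait() with the full timeout again. One common way to bound the total wait is to compute a deadline once and then wait only for the time remaining. The following is a minimal sketch of that idea, not the code from the com.holub.asynch package. The class TimedSlots is hypothetical, and returning a boolean on timeout is an assumption, since the Semaphore interface from last month's column isn't reproduced here.

```java
// Hypothetical sketch; TimedSlots is not the article's Counting_semaphore.
class TimedSlots {
    private int count;

    TimedSlots(int initialCount) { count = initialCount; }

    // Returns true if a slot was taken, false if timeoutMs elapsed first.
    // Assumes timeoutMs is small enough that the deadline can't overflow.
    synchronized boolean acquire(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (count <= 0) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                return false;        // out of time, so don't wait again
            wait(remaining);         // may wake early; the loop re-checks
        }
        --count;
        return true;
    }

    // Frees one slot, waking waiters when the count leaves zero.
    synchronized void release() {
        if (++count == 1)
            notifyAll();
    }

    public static void main(String[] args) throws InterruptedException {
        TimedSlots slots = new TimedSlots(1);
        System.out.println(slots.acquire(100));  // a slot is free, so this succeeds
        System.out.println(slots.acquire(100));  // none left; gives up after about 100 ms
    }
}
```

The sketch also skips the max_count check and the Lock_manager id to keep the deadline logic visible.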


Listing 4 is pretty useful as it stands, but there's one variation on the counting-semaphore theme that deserves mention. I sometimes use what I call a "reverse-sensing" semaphore. This one blocks until the pool is empty. In the pooled-database-connection example I discussed earlier, a thread that created new database connections might wait on this reverse-sensing semaphore, waking up when it needed to open additional connections. I'm sure you can think of other variations as well.
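A reverse-sensing wait could be sketched roughly as follows. This is only an illustration of the idea under my own assumptions: the class ReverseSensingSlots and its methods take(), add(), and awaitEmpty() are hypothetical names, and the sketch leaves out the ordinary blocking acquire so that the "wait until empty" half stands alone.

```java
// Hypothetical sketch of a "reverse-sensing" counter; names are illustrative.
class ReverseSensingSlots {
    private int count;

    ReverseSensingSlots(int initialCount) { count = initialCount; }

    // Takes one slot if any is left, waking "empty" waiters on reaching zero.
    synchronized boolean take() {
        if (count == 0)
            return false;
        if (--count == 0)
            notifyAll();
        return true;
    }

    // Adds slots back to the pool.
    synchronized void add(int slots) { count += slots; }

    // Blocks until the count is 0, the reverse of a normal acquire.
    synchronized void awaitEmpty() throws InterruptedException {
        while (count != 0)
            wait();
    }

    synchronized int available() { return count; }

    public static void main(String[] args) throws InterruptedException {
        ReverseSensingSlots slots = new ReverseSensingSlots(2);

        Thread refiller = new Thread(() -> {
            try {
                slots.awaitEmpty();      // sleeps while slots remain
                slots.add(2);            // e.g., open more connections
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        refiller.start();

        slots.take();
        slots.take();                    // count hits 0, so the refiller can proceed
        refiller.join();
        System.out.println("available after refill: " + slots.available());
    }
}
```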

Wrapping up

So that's it for this month. We've seen how to use the built-in condition variable to communicate between threads generally (and the AWT event thread in particular). We've also seen that, though you can do a lot of what a condition variable does just by using wait() and notify(), you can't do everything. The Condition class adds the essential ability to not wait when the condition is already true. Java has no counting-semaphore primitive at all, so the Counting_semaphore class can be particularly useful when you need to manage pooled resources.

Next month's column will continue on the current theme, presenting a Timer class that makes it easy to fire events on a regular schedule. The Timer implementation also demonstrates how to write code to suspend, resume, and stop threads without using the (now deprecated) suspend(), resume(), and stop() methods.

About the author

Allen Holub has been working in the computer industry since 1979. He is widely published in magazines (Dr. Dobb's Journal, Programmers Journal, Byte, MSJ, among others). He has seven books to his credit, and is currently working on an eighth that will present the complete sources for a Java compiler written in Java. Allen abandoned C++ for Java in early 1996 and now looks at C++ as a bad dream, the memory of which is mercifully fading. He's been teaching programming (first C, then C++ and MFC, now OO-Design and Java) both on his own and for the University of California Berkeley Extension since 1982. Allen offers both public classes and in-house training in Java and object-oriented design topics. He also does object-oriented design consulting. Get more information and contact Allen via his Web site http://www.holub.com.

Resources
  • All the real code discussed in this article (the stuff in the com.holub.asynch package) is available in the "Goodies" section on my Web site. The version on the Web site should be considered the definitive version -- at least it corrects any bugs I know about. http://www.holub.com