Programming Java threads in the real world, Part 8

Threads in an object-oriented world, thread pools, implementing socket 'accept' loops


One solution to this problem is the Observer pattern discussed in March. (AWT's listeners, for example, are observers.) The method that finally catches the IOException must be one that calls flush(), either directly or indirectly, so it could simply register a listener and be notified when something goes wrong. In the current situation, however, a full-blown observer is overkill. After all, there will be only one observer for any given I/O operation.

Take Command

A better solution to the error-notification problem is a different, though related, design pattern: Command. To quote the Gang-of-Four book's section titled "Applicability": "Commands are an object-oriented replacement for callbacks." In a procedural language, for example, a method like flush() could be passed a pointer to a function to call when something went wrong -- a "callback." The basic notion in Command is to encapsulate that callback method into an object.

This strategy is used for the error handler in the final version of our flush() method in Listing 1 (above). The Flush_error_handler interface (line 6) defines a method to call when an IOException is encountered on line 36. The caller of flush defines what to do on an error as follows:

flush(  new Flush_error_handler()
        {   public void error( IOException e )
            {   // the code that would normally handle the exception goes here.
                System.err.println("Flush error: " + e );
                System.exit(1);
            }
        }
    );

The error() method contains whatever code would normally be found in the exception handler. Note that error() is executed on the thread that detects the error, not on the thread that creates the error handler. Be careful of synchronization problems if error() accesses any fields outside of its own class.
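Since Listing 1 appears earlier in the article rather than on this page, the callback machinery looks roughly like the following sketch. The Flushable_wrapper class and its out field are hypothetical stand-ins for the real class in Listing 1, which hands the actual flushing off to a background thread; the point here is only the Command-style shape of the interface and the handler call.

    import java.io.IOException;
    import java.io.OutputStream;

    // Hypothetical sketch -- not the code from Listing 1.
    interface Flush_error_handler
    {   void error( IOException e );
    }

    class Flushable_wrapper
    {   private final OutputStream out;

        Flushable_wrapper( OutputStream out )
        {   this.out = out;
        }

        // Report failures through the Command object instead of throwing,
        // so the handler's code runs on whichever thread detects the error.
        public void flush( Flush_error_handler handler )
        {   try
            {   out.flush();
            }
            catch( IOException e )
            {   handler.error( e );
            }
        }
    }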

Thread pools and blocking queues

The next problem with the one-thread-per-method approach is that we're creating an awful lot of threads on the fly as the program executes. Creating a thread is not a low-overhead operation: many system calls are involved. (In Windows NT, for example, there is a 600-machine-cycle penalty imposed every time you enter the kernel.) A better strategy is to precreate a bunch of threads and have them sitting around waiting for something to do. When it's time to do something, simply wake up one of these existing threads rather than creating one from scratch. There's also no reason why the same thread can't be recycled many times to perform the same (or different) operations. This strategy for thread management is called a thread pool.

Blocking queues

The first step in building a thread pool is coming up with a realistic implementation of the Notifying_queue discussed back in October. I've done that in Listing 2. The basic notion is that a thread that tries to dequeue from an empty queue will block until an object is put into the queue by a second thread.

The queue itself is the LinkedList, declared on line 8 of Listing 2.

The enqueue() operation just adds an Object to the end of the queue. The dequeue() operation removes whatever happens to be at the front. If the queue is empty, the dequeue() method waits (on line 32) until enqueue() notifies it that an element has been added (line 23). Note the use of a spin lock. When several threads are simultaneously blocked waiting to dequeue something, there's no way to predict which thread will actually get the object. Again, I discussed all this back in October.

Unlike the October example, here it's possible to close a Blocking_queue by calling close() (line 57). This method does two things: first, it sets the closed flag, which causes enqueue() to start throwing exceptions when it's called. Next, close() releases all threads that are blocked on the current queue by issuing a notifyAll(); these threads then return from the wait() on line 32 and, since closed is now true, leave dequeue() with an exception toss.

Listing 2: /src/com/holub/asynch/Blocking_queue.java
001  package com.holub.asynch;
002  
003  import java.util.*;
004  

     /***************************************************
      * This is a thread-safe queue that blocks automatically if you try to
      * dequeue from an empty queue. It's based on a linked list, so it will
      * never fill up. (You'll never block on a queue-full condition because
      * there isn't one.)
      *
      * This class uses the LinkedList class, introduced into the JDK at
      * version 1.2 (aka Java 2). It will not work with earlier releases.
      *
      * (c) 1999, Allen I. Holub. You may not distribute this code except in
      * binary form, incorporated into a java .class file. You may use this
      * code freely for personal purposes, but you may not incorporate it
      * into any commercial product without my express permission in writing.
      *
      * @author Allen I. Holub
      */
005  
006  public class Blocking_queue
007  {
008      private LinkedList elements = new LinkedList();
009      private boolean    closed   = false;
010  

         /***********************************************
          * The Closed exception is thrown if you try to use an explicitly
          * closed queue. See close().
          */
011      public class Closed extends RuntimeException
012      {   private Closed()
013          {   super("Tried to access closed Blocking_queue");
014          }
015      }
016  
017  

         /***********************************************
          * Enqueue an object.
          */
018      public synchronized final void enqueue( Object new_element )
019                                              throws Blocking_queue.Closed
020      {   if( closed )
021              throw new Closed();
022          elements.addLast( new_element );
023          notify();
024      }
025  

         /***********************************************
          * Dequeues an element; blocks if the queue is empty (until something
          * is enqueued). Be careful of nested-monitor lockout if you call this
          * function. You must ensure that there's a way to get something into
          * the queue that does not involve calling a synchronized method of
          * whatever class is blocked, waiting to dequeue something.
          *
          * @see enqueue
          * @return the dequeued object
          */
026  
027      public synchronized final Object dequeue() throws InterruptedException,
028                                                         Blocking_queue.Closed
029      {   try
030          {
031              while( elements.size() <= 0 )
032              {   wait();
033                  if( closed )
034                      throw new Closed();
035              }
036              return elements.removeFirst();
037          }
038          catch( NoSuchElementException e )   // Shouldn't happen
039          {   throw new Error(
040                      "Internal error (com.holub.asynch.Blocking_queue)");
041          }
042      }
043  

         /***********************************************
          * The is_empty() method is inherently unreliable in a multithreaded
          * situation. In code like the following, it's possible for a thread
          * to sneak in after the test but before the dequeue operation and
          * steal the element you thought you were dequeueing:
          *
          *     Blocking_queue queue = new Blocking_queue();
          *     //...
          *     if( !queue.is_empty() )
          *         queue.dequeue();
          *
          * To do the foregoing reliably, you must synchronize on the queue
          * as follows:
          *
          *     Blocking_queue queue = new Blocking_queue();
          *     //...
          *     synchronized( queue )
          *     {   if( !queue.is_empty() )
          *             queue.dequeue();
          *     }
          *
          * The same effect can be achieved if the test/dequeue operation is
          * done inside a synchronized method, and the only way to add or
          * remove queue elements is from other synchronized methods.
          */
044      public synchronized final boolean is_empty()
045      {  return elements.size() == 0;
046      }
047  
048      /* Closing a blocking queue causes all threads that are blocked
049       * [waiting in dequeue() for items to be enqueued] to be released.
050       * The dequeue() call will throw a Blocking_queue.Closed runtime
051       * exception instead of returning normally in this case.
052       * Once a queue is closed, any attempt to enqueue() an item will
053       * also result in a Blocking_queue.Closed exception toss.
054       */
055  
056  
057      public synchronized void close()
058      {  closed = true;
059         notifyAll();
060      }
061  }
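To see the queue's behavior in isolation, here's a small demo of my own (it is not part of the listing): a consumer thread blocks inside dequeue() until the main thread enqueues something, and close() then releases the consumer with a Blocking_queue.Closed exception. The sleep() call is a crude way to make sure the consumer is already waiting inside dequeue() before the queue is closed.

    import com.holub.asynch.Blocking_queue;

    public class Blocking_queue_demo
    {   public static void main( String[] args ) throws InterruptedException
        {
            final Blocking_queue queue = new Blocking_queue();

            Thread consumer = new Thread()
            {   public void run()
                {   try
                    {   while( true )                       // block until something arrives
                            System.out.println( "got: " + queue.dequeue() );
                    }
                    catch( InterruptedException e ){ /* shut down */ }
                    catch( Blocking_queue.Closed e )
                    {   System.out.println( "queue closed" );
                    }
                }
            };
            consumer.start();

            queue.enqueue( "hello" );   // wakes the blocked consumer
            Thread.sleep( 100 );        // let the consumer print and block again
            queue.close();              // releases the consumer with a Closed exception
            consumer.join();
        }
    }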

Pooling threads

Armed with our blocking queue, we can now implement the Thread_pool class (Listing 3). A Thread_pool is an extension of ThreadGroup whose threads execute arbitrary actions. That is, you ask the pool to execute some action for you on one of its threads. The request returns immediately; the action goes on in the background. You create a thread pool like this:

    Thread_pool pool = new Thread_pool(initial_thread_count, maximum_thread_count);

The pool initially has initial_thread_count threads in it, poised with bated breath, ready to leap into action at a moment's notice. If all of the threads in the pool are busy when you ask the pool to do something for you, additional threads are created up to a maximum of maximum_thread_count.

You tell the pool to do something by calling execute():

    pool.execute(   new Runnable()          // print on another thread
                    {   public void run()
                        {   System.out.println("Hello World");
                        }
                    }
                );
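As a rough illustration of the growth behavior described above (this snippet is mine, not from the article's listings): a pool that starts with two threads and may grow to four, given more work than the initial threads can absorb at once. The one-second sleep simply keeps each thread busy long enough to force the pool to expand.

    Thread_pool pool = new Thread_pool( 2, 4 );     // 2 threads now, at most 4

    for( int i = 0; i < 6; ++i )
    {   final int job = i;
        pool.execute
        (   new Runnable()
            {   public void run()
                {   System.out.println( "job " + job + " on "
                                        + Thread.currentThread().getName() );
                    try{ Thread.sleep(1000); }      // pretend to work
                    catch( InterruptedException e ){}
                }
            }
        );
    }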

Looking at the implementation, the Thread_pool constructor (Listing 3, line 34) simply creates a bunch of Pooled_thread objects (each of which extends Thread) and starts up the associated threads. Skipping up to the Pooled_thread definition (line 16), these threads' run() methods enter their main execution loop and block on a Blocking_queue (pool, declared on line 7). The implementation of execute() (line 54) just passes a Runnable object to the threads in the pool by enqueueing it on the Blocking_queue. The enqueue operation causes one of the waiting Pooled_thread objects to wake up, dequeue the Runnable object, execute its run() method, and then go back to sleep.

This use of the Runnable object is another example of the Command pattern. The Runnable Command object defines a method (run()) that's executed on some thread in the pool.
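The execute() method itself doesn't appear until later in Listing 3 (line 54, on a following page). As a rough sketch of the behavior just described -- not the actual implementation -- it does something along these lines, as a method of Thread_pool:

    // Sketch only -- not the code from Listing 3. The heart of execute() is
    // handing the Runnable to the blocking queue; a waiting Pooled_thread
    // dequeues it and calls its run() method. The growth test here is a
    // simplification: per the description above, the real method adds a
    // thread only when no pooled thread is idle.
    public synchronized void execute( Runnable action ) throws Blocking_queue.Closed
    {
        if( maximum_size == 0 || pool_size < maximum_size )   // 0 == unlimited
        {   ++pool_size;
            new Pooled_thread().start();
        }
        pool.enqueue( action );
    }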

Listing 3: /src/com/holub/asynch/Thread_pool.java
001  package com.holub.asynch;
002  
003  import  com.holub.asynch.Blocking_queue;

     /***************************************************
      * A generic implementation of a thread pool. Use it like this:
      *
      *     Thread_pool pool = new Thread_pool();
      *     pool.execute
      *     (   new Runnable()
      *         {   public void run()
      *             {   // execute this function on an existing
      *                 // thread from the pool.
      *             }
      *         }
      *     );
      *
      * The size of the thread pool can expand automatically to accommodate
      * requests for execution. That is, if a thread is available in the
      * pool, it's used to execute the Runnable object; otherwise a new
      * thread can be created (and added to the pool) to execute the
      * request. A maximum count can be specified to limit the number of
      * threads in the pool, however. Each thread pool also forms a thread
      * group (all threads in the pool are in the group). In practice this
      * means that the security manager controls whether a thread in the
      * pool can access threads in other groups, but it also gives you an
      * easy mechanism to make the entire group a daemon.
      *
      * (c) 1999, Allen I. Holub. You may not distribute this code except
      * in binary form, incorporated into a java .class file. You may use
      * this code freely for personal purposes, but you may not incorporate
      * it into any commercial product without securing my express
      * permission in writing.
      */
004  
005  public class Thread_pool extends ThreadGroup
006  {
007      private final Blocking_queue pool         = new Blocking_queue();
008      private /*final*/ int        maximum_size;
009      private           int        pool_size;
010      private           boolean    has_closed   = false;
011  
012      private static int  group_number  = 0;
013      private static int  thread_id     = 0;
014  

         /**********************************************
          * These are the objects that wait to be activated. They are
          * typically blocked on an empty queue. You post a Runnable object
          * to the queue to release a thread, which will execute the run()
          * method from that object. All pooled-thread objects will be
          * members of the thread group that comprises the thread pool.
          */
015  
016      private class Pooled_thread extends Thread
017      {
018          public Pooled_thread()
019          {   super( Thread_pool.this, "T" + thread_id++ );
020          }
021  
022          public void run()
023          {   try
024              {   while( !has_closed )
025                  {   ((Runnable)( pool.dequeue() )).run();
026                  }
027              }
028              catch(InterruptedException  e){/* ignore it, stop thread */}
029              catch(Blocking_queue.Closed e){/* ignore it, stop thread */}
030          }
031      }
032  

         /**********************************************
          * Create a thread pool with initial_thread_count threads in it.
          * The pool can expand to contain additional threads if they are
          * needed.
          *
          * @param initial_thread_count the initial thread count. If the
          *        initial count is greater than the maximum, it is silently
          *        truncated to the maximum.
          * @param maximum_thread_count the maximum number of threads that
          *        can be in the pool. A maximum of 0 indicates that the
          *        pool will be permitted to grow indefinitely.
          */