Programming Java threads in the real world, Part 8

Threads in an object-oriented world, thread pools, implementing socket 'accept' loops

One of the biggest problems associated with thread use in an object-oriented environment is a conceptual one: though procedural programmers naturally think about the flow of control from function to function as the system works, object-oriented designers focus on the message flow within an individual scenario or use case. The traditional view of threading, however, concerns itself entirely with flow of control. As a consequence, object-oriented designers typically don't think about threads -- at least not until they get down to the very low-level implementation part of the design; rather, they think about two categories of messages: synchronous messages that don't return until they're done doing whatever they do, and asynchronous messages, which initiate some background operation and return immediately. This month's column (along with next month's) will address such issues by showing you how to reconcile these two points of view and implement object-oriented-style threading using Java's essentially procedural implementation of threads.

Java's implementation of threading, in fact, complicates matters by using a misleading metaphor for threads. The usual way to create a thread is to derive a class from Thread, which leads many a novice Java programmer to the erroneous belief that all the methods of the Thread derivative will run on that thread. In fact, a method of a Thread derivative is just like any other method: it runs on a thread only if called directly or indirectly from that thread's run() method. Objects do not run on threads; methods do.
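The distinction is easy to demonstrate. In the following minimal sketch (the class names Worker and MethodsNotObjects are hypothetical), the same method of a Thread derivative reports a different thread depending on who calls it. Called directly, it runs on the caller's thread. Called from run(), it runs on the new thread.

```java
// A minimal sketch: a method of a Thread subclass runs on whichever
// thread calls it. Only code reached from run() executes on the new thread.
class Worker extends Thread {
    private volatile String ranOn;      // written by run() on the new thread

    Worker() { super("worker"); }       // name the thread so it is recognizable

    // An ordinary method: it runs on whatever thread calls it.
    String whoIsCallingMe() {
        return Thread.currentThread().getName();
    }

    @Override
    public void run() {
        ranOn = whoIsCallingMe();       // called from run(), so it runs on "worker"
    }

    String ranOn() { return ranOn; }
}

class MethodsNotObjects {
    // Returns { thread seen by a direct call, thread seen from run() }.
    static String[] demo() {
        Worker w = new Worker();
        String direct = w.whoIsCallingMe();   // runs on the caller's thread
        w.start();
        try {
            w.join();                         // wait for run() to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] { direct, w.ranOn() };
    }
}
```

MethodsNotObjects.demo() returns the calling thread's name (main, if you call it from main()) followed by worker.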

It's sometimes difficult to predict any control flow in an object-oriented system. In a procedural system, one procedure just calls another, beginning at a single starting point. If there are many threads, there are many paths through the code, but each path starts at a known place, and the control flow through a given thread is predictable (though sometimes not easily so). Object-oriented systems are another matter. Object-oriented systems tend to be networks of cooperating objects, communicating with one another via some message-passing system. The system's "main" method may well do nothing but create a bunch of objects, hook them up to each other, and then terminate. Flow of control is very difficult to predict in this system.

Synchronous vs. asynchronous messages

As I mentioned earlier, an object-oriented designer looks at the world in terms of objects and messages. Objects pass messages to each other, and the receipt of some message causes an appropriate message-handler -- a Java method -- to be executed. Most of these messages are synchronous: their handlers don't return until they're finished doing what they do. Other messages are asynchronous: the handler returns immediately, before the requested operation completes. Meanwhile, work is going on in the background to satisfy the original request. A good example of an asynchronous message in Java is Toolkit.getImage(), which initiates the process of fetching an image and then returns immediately, long before the actual image arrives.

The broad categories of messages (synchronous and asynchronous) can themselves be subdivided in various ways. For example, a balking message is one that can't even be initiated. Imagine that you could only open a limited number of database connections at a given moment, and that all the connections were in use. A message that required access to the database could "balk" if it couldn't get the connection. It isn't that it tried to do the operation and failed; rather, it couldn't even initiate the operation to give it a chance to fail.
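Here's one way a balking operation might look, sketched under stated assumptions. ConnectionLimiter is a hypothetical class, and a java.util.concurrent.Semaphore (added in Java 5) stands in for the limited set of database connections. Semaphore.tryAcquire() never blocks, so when no permit is free, tryQuery() returns false without ever starting the query.

```java
import java.util.concurrent.Semaphore;

// A minimal sketch of a balking operation. ConnectionLimiter is a
// hypothetical stand-in for a limited pool of database connections;
// the Semaphore's permits model the connections.
class ConnectionLimiter {
    private final Semaphore permits;

    ConnectionLimiter(int maxConnections) {
        permits = new Semaphore(maxConnections);
    }

    // Balks: returns false at once, without starting the operation, if no
    // connection is free. Otherwise runs the query and frees the permit.
    boolean tryQuery(Runnable query) {
        if (!permits.tryAcquire())   // never blocks
            return false;            // balk: the operation never started
        try {
            query.run();
            return true;
        } finally {
            permits.release();       // give the "connection" back
        }
    }
}
```

With a limit of one connection, a second tryQuery() issued while the first is still running balks instead of waiting.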

Another variant on synchronous messages is a timeout. Rather than balking immediately, the method waits a predetermined amount of time for the resource to become available. If that time expires, the request fails. (The operation probably never started, but if it did start, it certainly didn't complete successfully.) In Java, a read from a socket can time out in this way: if you set a timeout with Socket.setSoTimeout(), a read() that waits longer than that throws a SocketTimeoutException.
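The socket timeout can be seen in a small, self-contained sketch (class and method names are illustrative). It connects to a local ServerSocket that accepts the connection but never writes, sets a read timeout with setSoTimeout(), and reports whether read() gave up with a SocketTimeoutException.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// A minimal sketch of a timed-out read. A local ServerSocket accepts the
// connection but never sends anything, so the client's read() can only
// end by timing out.
class TimedRead {
    // Returns true if the read timed out after about timeoutMillis.
    static boolean readTimesOut(int timeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket silent = server.accept()) {     // peer that never writes
            client.setSoTimeout(timeoutMillis);     // 0 would mean "wait forever"
            InputStream in = client.getInputStream();
            in.read();                              // blocks until data or timeout
            return false;
        } catch (SocketTimeoutException e) {
            return true;                            // time ran out: the request failed
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

readTimesOut(200) returns true after roughly 200 milliseconds. A timeout of 0, the default, means the read waits forever.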

The problem isn't design; it's implementation. Designing asynchronous systems in an object-oriented way isn't particularly difficult. Object-oriented-design notations such as UML (the "Unified Modeling Language") can easily capture notions such as synchronous and asynchronous messages. Implementing these notions in the essentially procedural system mandated by the Java threading model is another matter, however.

The thread-per-method solution

Given an object-oriented design perspective -- a network of objects communicating via messages -- what's the best way to implement an asynchronous message? The most naive way, which is workable in simple situations, is for each asynchronous-message handler to spawn its own thread.

First, let's consider the following synchronous method, which flushes an internal buffer out to a file. (The Reader_writer lock was discussed last month.)

import com.holub.asynch.Reader_writer;
import java.io.*;
class Synchronous_flush
{
    private final OutputStream  out;
    private Reader_writer       lock = new Reader_writer();
    private byte[]              buffer;
    private int                 length;
    public Synchronous_flush( OutputStream out )
    {   this.out = out;
    }
    //...
    synchronized void flush( ) throws IOException
    {   try
        {   lock.request_write();
            out.write( buffer, 0, length );
            length = 0;
        }
        finally
        {   lock.write_accomplished();
        }
    }
}

This blocking version of flush() presents several problems. For one thing, flush() can block indefinitely while waiting to acquire the reader/writer lock. Moreover, if the OutputStream were a socket connection rather than a file, the write operation itself could take a long time to complete. Finally, since flush() is synchronized, the entire object is locked while the flush is in progress, so any thread that tries to call any other synchronized method of Synchronous_flush will block until flush() completes. If the reader/writer lock is never released, this wait could turn into a nested-monitor lockout.

These problems can be solved by making flush() asynchronous; the flush() method should simply initiate the flush operation and then return immediately. Here's an initial (yet, as you'll soon see, not very successful) attempt:

import com.holub.asynch.Reader_writer;
import java.io.*;
class Asynchronous_flush
{
    private OutputStream    out;
    private Reader_writer   lock = new Reader_writer();
    private byte[]          buffer;
    private int             length;
    //...
    synchronized void flush( )
    {   new Thread()
        {   public void run()
            {   try
                {   lock.request_write();
                    out.write( buffer, 0, length );
                    length = 0;
                }
                catch( IOException e )
                {   // ignore it.
                }
                finally
                {   lock.write_accomplished();
                }
            }
        }.start();
    }
}

I've wrapped the former contents of the flush() method inside the run() method of an anonymous inner class that extends Thread. Now flush() does nothing but fire off the thread and return. This simple strategy can work for simple situations, but unfortunately it doesn't work here. Let's analyze the problems one at a time.

The main problem is that the write operation is no longer thread-safe. Simply synchronizing the flush() method locks the object only while we're in the flush() method, which isn't for very long. The actual write() operation is performed on its own thread long after flush() has returned, and the buffer may have been modified several times in the interim (or even worse, may be modified while the write is in progress). A possible solution to the synchronization problem is to make a copy of the buffer while we're synchronized, and then work from the copy when inside the (unsynchronized) auxiliary thread. The only time synchronization is necessary is while we're actually making the copy.

Because it's so easy, it would be nice if we could implement this strategy like this:

    synchronized void flush( )
    {   
        byte[] copy = buffer.clone();
        length = 0;
        new Thread()
        {   public void run()
            {   try
                {   lock.request_write();
                    out.write( copy, 0, length );
                }
                catch( IOException e )
                {   // ignore it.
                }
                finally
                {   lock.write_accomplished();
                }
            }
        }.start();
    }

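But this code doesn't even compile, at least not with the Java 2 compilers this column targets. Remember that the inner-class object -- the anonymous Thread derivative -- exists long after the method returns. The method's local variables, however, are destroyed when flush() returns, so the inner class may use a local variable only if it's declared final (in which case the compiler quietly copies its value into the inner-class object). Since copy isn't final, the compiler rejects the code. (Since Java 8, a local that is never reassigned after initialization, like copy, counts as "effectively final" and may be used without the keyword. The fragment would still be wrong, though: length is reset to 0 before the thread runs, so write() would output nothing.) We can copy local variables into the thread object ourselves, however.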
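For comparison, here's a minimal sketch of the copy strategy as it could be written with Java 8 or later, where a lambda may capture effectively final locals. It's a stand-in under stated assumptions rather than the article's code: CapturingFlush and append() are hypothetical, and synchronizing on the stream replaces the Reader_writer lock. The snapshot and its length are taken while the object is locked, and the background thread uses only those captured locals.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

// Hypothetical class. Synchronizing on the stream stands in for the
// article's Reader_writer lock.
class CapturingFlush {
    private final OutputStream out;
    private byte[] buffer = new byte[64];
    private int length;

    CapturingFlush(OutputStream out) { this.out = out; }

    synchronized void append(byte b) {
        if (length == buffer.length)
            buffer = Arrays.copyOf(buffer, length * 2);   // grow as needed
        buffer[length++] = b;
    }

    // Returns the started thread so a caller can join() it.
    synchronized Thread flush() {
        final byte[] copy  = Arrays.copyOf(buffer, length); // snapshot under lock
        final int    count = length;
        length = 0;
        Thread writer = new Thread(() -> {
            synchronized (out) {                 // serialize writers
                try {
                    out.write(copy, 0, count);   // uses captured locals only
                } catch (IOException e) {
                    e.printStackTrace();         // placeholder error handling
                }
            }
        });
        writer.start();
        return writer;
    }
}
```

Because the thread writes from its own copy, appending to the buffer after flush() returns can't corrupt the bytes being written.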

Listing 1 solves most of these problems by using the copy strategy I just discussed. The strange-looking unnamed block inside the anonymous Thread derivative (the one that copies length and buffer) is an "instance initializer" for the inner class. Think of it syntactically as a static initializer that isn't static -- a sort of metaconstructor. The code in the instance initializer is effectively copied into all constructors, including the compiler-generated "default" constructor, immediately after the call to the superclass constructor and above any code specified in the constructor itself. That is, if you have both an instance initializer and a constructor, the code in the instance initializer executes first. (The one exception to this rule is that the instance initializer is not copied into any constructor that calls another constructor using the this(optional_args) syntax. This way the code in the instance initializer is executed only once.) The syntax is pretty ugly, but there it is.
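A tiny experiment confirms the ordering rules (InitOrder is a hypothetical class written for this purpose). The field initializer and the instance initializer run in textual order before the constructor body, and the delegating constructor doesn't run the initializer a second time.

```java
import java.util.ArrayList;
import java.util.List;

// Records the order in which initialization code runs.
class InitOrder {
    // Field initializers and instance initializers run in textual order,
    // so log already exists when the block below executes.
    final List<String> log = new ArrayList<>();

    {   log.add("initializer");     // instance initializer
    }

    InitOrder() {
        log.add("InitOrder()");
    }

    InitOrder(int ignored) {
        this();                     // the initializer runs via InitOrder(), not here
        log.add("InitOrder(int)");
    }
}
```

new InitOrder(1).log holds [initializer, InitOrder(), InitOrder(int)]: the initializer ran once, through the no-argument constructor.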

An alternative to the solution in Listing 1 would be to encapsulate the code from the instance initializer in a nonstatic method, and then call it when initializing the field:

new Thread()
{   int     length;
    byte[]  copy = init();

    private byte[] init()
    {   length          = Flush_example.this.length;
        byte[] the_copy = new byte[length];
        System.arraycopy(Flush_example.this.buffer, 0, the_copy, 0, length);
        Flush_example.this.length = 0;
        return the_copy;
    }
    //...
}

This isn't much of an improvement over the instance initializer in clarity, and initializing length as a side effect of the init() call is particularly hideous. I've used arraycopy() rather than clone() because only the first length bytes of the buffer need to be copied; clone() would copy the whole array. (Exception handling isn't a concern either way: an array's clone() method is declared without a throws clause, so it never throws CloneNotSupportedException.)

Whatever method we use for initialization, the inner-class object is constructed by the new expression inside flush(), while the outer-class object is locked, so the copy operation is thread-safe. The newly created thread then acquires the writer lock and writes to the file in its own good time, using the copy for this purpose.

Listing 1: Flush_example.java
import com.holub.asynch.Reader_writer;
import java.io.*;
class Flush_example
{
    public interface Flush_error_handler
    {   void error( IOException e );
    }
    private final OutputStream  out;
    private Reader_writer       lock = new Reader_writer();
    private byte[]              buffer;
    private int                 length;
    public Flush_example( OutputStream out )
    {   this.out = out;
    }
    //...
    synchronized void flush( final Flush_error_handler handler )
    {   new Thread()
        {   int     length; 
            byte[]  copy;
            {   length = Flush_example.this.length;
                copy   = new byte[length];
                System.arraycopy(Flush_example.this.buffer, 0, copy, 0, length);
                Flush_example.this.length = 0;
            }
            public void run()
            {   try
                {   lock.request_write();
                    out.write( copy, 0, length );
                }
                catch( IOException e )
                {   handler.error(e);
                }
                finally
                {   lock.write_accomplished();
                }
            }
        }.start();
    }
}

An exceptional problem

The next perplexing issue is what to do with the IOException. Back in the original version of the code, the exception propagated out of the flush() method to whoever called flush(). We can't do that here, because there's nobody to propagate it to: if you trace back through the call stack, you'll end up back in run(), but you didn't call run(); the system did when it fired up the thread. Simply ignoring the write error, as I've been doing, isn't a good strategy for obvious reasons.

One solution to this problem is the Observer pattern discussed in March. (AWT's listeners, for example, are observers.) The method that finally catches the IOException must be a method that calls flush(), either directly or indirectly, so this method could simply register a listener and be notified when something goes wrong. In the current situation, however, a full-blown observer is overkill. After all, there will only be one observer for a given I/O operation.

Take Command

A better solution to the error-notification problem is a different, though related, design pattern: Command. To quote the Gang-of-Four book's section titled "Applicability": "Commands are an object-oriented replacement for callbacks." In a procedural language, for example, a method like flush() could be passed a pointer to a function to call when something went wrong -- a "callback." The basic notion in Command is to encapsulate that callback method into an object.

This strategy is used for the error handler in the final version of our flush() method in Listing 1 (above). The Flush_error_handler interface, declared at the top of Flush_example, defines the method that run() calls when it catches an IOException. The caller of flush() defines what to do on an error as follows:

flush(  new Flush_error_handler()
        {   public void error( IOException e )
            {   // the code that would normally handle the exception goes here.
                System.err.println("Flush error: " + e );
                System.exit(1);
            }
        }
    );

The error() method contains whatever code would normally be found in the exception handler. Note that error() is executed on the thread that detects the error, not on the thread that creates the error handler. Be careful of synchronization problems if error() accesses any fields outside of its own class.
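The point about which thread runs error() is worth checking. In this minimal sketch, FlushErrorHandler mirrors Flush_error_handler, and failingFlush() is a hypothetical operation that always fails on a background thread named flusher. The handler records the name of the thread it runs on.

```java
import java.io.IOException;

// Mirrors the article's Flush_error_handler interface.
interface FlushErrorHandler {
    void error(IOException e);
}

class CallbackThreadDemo {
    // Hypothetical operation that always fails on a background thread.
    static Thread failingFlush(FlushErrorHandler handler) {
        Thread t = new Thread(() -> {
            try {
                throw new IOException("disk full");   // simulated write failure
            } catch (IOException e) {
                handler.error(e);                     // runs on this background thread
            }
        }, "flusher");
        t.start();
        return t;
    }

    // Returns the name of the thread on which error() was called.
    static String threadThatHandledError() {
        final String[] seen = new String[1];
        Thread t = failingFlush(e -> seen[0] = Thread.currentThread().getName());
        try {
            t.join();              // join() also makes seen[0] visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

threadThatHandledError() returns flusher, not the name of the thread that created the handler, so any shared state the handler touches needs the usual synchronization.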

Thread pools and blocking queues

The next problem with the one-thread-per-method approach is that we're creating an awful lot of threads on the fly as the program executes. Creating a thread is not a low-overhead operation. Many system calls are involved. (In Windows NT, for example, there is a 600-machine-cycle penalty imposed every time you enter the kernel.) A better strategy is to precreate a bunch of threads, and have them sitting around waiting for something to do. When it's time to do something, simply wake up one of these existing threads rather than creating one from scratch. There's also no reason why the same thread can't be recycled many times to perform the same (or different) operations. This strategy for thread management is called a thread pool.
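Java 5 and later include ready-made thread pools in java.util.concurrent, which didn't exist when this column was written. For comparison only, this sketch runs many small tasks on a fixed pool created with Executors.newFixedThreadPool() and counts how many distinct threads did the work. FixedPoolDemo is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FixedPoolDemo {
    // Runs taskCount tasks on poolSize precreated threads and returns how
    // many distinct threads executed them.
    static int distinctWorkers(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> workers = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++)
            pool.execute(() -> workers.add(Thread.currentThread().getName()));
        pool.shutdown();            // accept no new tasks; finish queued ones
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workers.size();
    }
}
```

distinctWorkers(3, 100) never returns more than 3: the same few threads are recycled for every task.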

Blocking queues

The first step in building a thread pool is coming up with a realistic implementation of the Notifying_queue discussed back in October. I've done that in Listing 2. The basic notion is that a thread that tries to dequeue from an empty queue will block until an object is put into the queue by a second thread.
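The same blocking behavior is available in java.util.concurrent.LinkedBlockingQueue (Java 5 and later) through take() and put(). In this illustrative sketch (TakeBlocksDemo is a hypothetical name), a consumer thread blocks in take() on an empty queue until the main thread puts an element. The short sleep only makes it likely that the consumer is already waiting; the result is the same either way.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TakeBlocksDemo {
    static String handOff(String message) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // unbounded
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();   // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            Thread.sleep(100);                // let the consumer block first
            queue.put(message);               // wakes the consumer
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }
}
```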

The queue itself is the LinkedList held in the elements field of Listing 2.

The enqueue() operation just adds an Object to the end of the queue. The dequeue() operation removes whatever happens to be at the front. If the queue is empty, the dequeue() method waits (in the wait() call inside its while loop) until enqueue() calls notify() to signal that an element has been added. Note the use of a spin lock: the while loop around wait() retests the condition every time the thread wakes up. When several threads are simultaneously blocked waiting to dequeue something, there's no way to predict which thread will actually get the object, so a thread that wakes up to find the queue empty again simply goes back to waiting. Again, I discussed all this back in October.

Unlike the October example, here it's possible to close a Blocking_queue by calling close(). This method does two things: first, it sets the closed flag, which causes enqueue() to start throwing exceptions when it's called. Next, close() releases all threads that are blocked on the current queue by issuing a notifyAll(); all these threads will then return from the wait() in dequeue(), and since closed will be true, will return from dequeue() with an exception toss.
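The close() protocol can be captured in a short generic sketch. ClosableQueue is a hypothetical class for illustration, not Listing 2. One design choice goes slightly beyond the description above: dequeue() tests the closed flag at the top of its wait loop, so the flag is checked before the first wait() as well as after every wake-up, and a thread that calls dequeue() on a closed, empty queue fails immediately instead of blocking forever. Elements already in the queue can still be drained after close().

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ClosableQueue<T> {
    static class Closed extends RuntimeException {
        Closed() { super("queue is closed"); }
    }

    private final Deque<T> elements = new ArrayDeque<>();
    private boolean closed = false;

    synchronized void enqueue(T element) {
        if (closed) throw new Closed();
        elements.addLast(element);
        notify();                        // wake one waiting dequeuer
    }

    synchronized T dequeue() throws InterruptedException {
        while (elements.isEmpty()) {     // spin: another thread may have taken the element
            if (closed) throw new Closed();
            wait();
        }
        return elements.removeFirst();
    }

    synchronized void close() {
        closed = true;
        notifyAll();                     // release every blocked dequeuer
    }
}
```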

Listing 2: /src/com/holub/asynch/Blocking_queue.java
package com.holub.asynch;
import java.util.*;

/***************************************************
 * This is a thread-safe queue that blocks automatically if you try to
 * dequeue from an empty queue. It's based on a linked list, so it will
 * never fill up. (You'll never block on a queue-full condition because
 * there isn't one.)
 *
 * This class uses the LinkedList class, introduced into the
 * JDK at version 1.2 (aka Java 2). It will not work with earlier releases.
 *
 * (c) 1999, Allen I. Holub.
 *
 * You may not distribute this code except in binary form, incorporated
 * into a java .class file. You may use this code freely for personal
 * purposes, but you may not incorporate it into any commercial product
 * without my express permission in writing.
 *
 * @author Allen I. Holub
 */
public class Blocking_queue
{
    private LinkedList elements = new LinkedList();
    private boolean    closed   = false;

    /***********************************************
     * The Closed exception is thrown if you try to use an explicitly
     * closed queue. See {@link #close}.
     */
    public class Closed extends RuntimeException
    {   private Closed()
        {   super("Tried to access closed Blocking_queue");
        }
    }

    /***********************************************
     * Enqueue an object.
     */
    public synchronized final void enqueue( Object new_element )
                                            throws Blocking_queue.Closed
    {   if( closed )
            throw new Closed();
        elements.addLast( new_element );
        notify();
    }

    /***********************************************
     * Dequeues an element; blocks if the queue is empty (until something
     * is enqueued). Be careful of nested-monitor lockout if you call this
     * function. You must ensure that there's a way to get something into
     * the queue that does not involve calling a synchronized method of
     * whatever class is blocked, waiting to dequeue something.
     *
     * @see #enqueue
     * @return the dequeued object (always)
     */
    public synchronized final Object dequeue() throws InterruptedException,
                                                   Blocking_queue.Closed
    {   try
        {
            while( elements.size() <= 0 )
            {   if( closed )            // don't block forever on a closed,
                    throw new Closed(); // empty queue
                wait();
                if( closed )            // released by close()
                    throw new Closed();
            }
            return elements.removeFirst();
        }
        catch( NoSuchElementException e )   // Shouldn't happen
        {   throw new Error(
                    "Internal error (com.holub.asynch.Blocking_queue)");
        }
    }

    /***********************************************
     * The is_empty() method is inherently unreliable in a multithreaded
     * situation. In code like the following, it's possible for a thread
     * to sneak in after the test but before the dequeue operation and steal
     * the element you thought you were dequeueing.
     *
     *    Blocking_queue queue = new Blocking_queue();
     *    //...
     *    if( !queue.is_empty() )
     *        queue.dequeue();
     *
     * To do the foregoing reliably, you must synchronize on the queue as follows:
     *
     *    Blocking_queue queue = new Blocking_queue();
     *    //...
     *    synchronized( queue )
     *    {   if( !queue.is_empty() )
     *            queue.dequeue();
     *    }
     *
     * The same effect can be achieved if the test/dequeue operation
     * is done inside a synchronized method, and the only way to
     * add or remove queue elements is from other synchronized methods.
     */
    public synchronized final boolean is_empty()
    {   return elements.size() == 0;
    }

    /* Closing a blocking queue causes all threads that are blocked
     * [waiting in dequeue() for items to be enqueued] to be released.
     * The dequeue() call will throw a Blocking_queue.Closed runtime
     * exception instead of returning normally in this case.
     * Once a queue is closed, any attempt to enqueue() an item will
     * also result in a Blocking_queue.Closed exception toss.
     */
    public synchronized void close()
    {   closed = true;
        notifyAll();
    }
}

Pooling threads

Armed with our blocking queue, we can now implement the Thread_pool class (Listing 3). A Thread_pool is an extension of ThreadGroup whose threads execute arbitrary actions. That is, you ask the pool to execute some action for you on one of its threads. The request returns immediately; the action goes on in the background. You create a thread pool like this:

    Thread_pool pool = new Thread_pool(initial_thread_count, maximum_thread_count);

The pool initially has initial_thread_count threads in it, poised with bated breath, ready to leap into action at a moment's notice. If all of the threads in the pool are busy when you ask the pool to do something for you, additional threads are created up to a maximum of maximum_thread_count.

You tell the pool to do something by calling execute():

    pool.execute(   new Runnable()          // print on another thread
                    {   public void run()
                        {   System.out.println("Hello World");
                        }
                    }
                );

Looking at the implementation, the Thread_pool constructor simply creates a bunch of Pooled_thread objects (each of which extends Thread) and starts up the associated threads. Skipping up to the Pooled_thread definition, these threads' run() methods enter their main execution loop, and then block on a Blocking_queue (the pool field declared at the top of the class). The implementation of execute() just passes the Runnable objects to the threads in the pool by enqueueing the Runnable object on the Blocking_queue. The enqueue operation causes one of the waiting Pooled_thread objects to wake up, dequeue the Runnable object, execute its run() method, and then go back to sleep.

This use of the Runnable object is another example of the Command pattern. The Runnable Command object defines a method (run()) that's executed on some thread in the pool.
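The initial/maximum idea also appears in java.util.concurrent.ThreadPoolExecutor (Java 5 and later) as corePoolSize and maximumPoolSize, with one well-known gotcha: the executor adds threads beyond the core size only when its work queue refuses a task, so with an unbounded LinkedBlockingQueue it never grows past the core size. This comparison sketch (GrowingPoolDemo is a hypothetical name) uses a SynchronousQueue, which holds no tasks, so the pool grows as soon as every thread is busy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class GrowingPoolDemo {
    // Submits 'tasks' tasks that all stay busy, and returns the pool size
    // observed while they are still running. tasks must not exceed max.
    static int sizeWhileBusy(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++)
            pool.execute(() -> {
                try { release.await(); }       // keep this thread busy
                catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
        int size = pool.getPoolSize();
        release.countDown();                   // let every task finish
        pool.shutdown();
        return size;
    }
}
```

sizeWhileBusy(2, 4, 4) returns 4. Submitting more tasks than the maximum while all threads are busy would make execute() throw RejectedExecutionException under the default policy.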

Listing 3: /src/com/holub/asynch/Thread_pool.java
package com.holub.asynch;
import  com.holub.asynch.Blocking_queue;

/**
 * A generic implementation of a thread pool. Use it like this:
 *
 *    Thread_pool pool = new Thread_pool();
 *    pool.execute
 *    (   new Runnable()
 *        {   public void run()
 *            {   // execute this function on an existing
 *                // thread from the pool.
 *            }
 *        }
 *    );
 *
 * The size of the thread pool can expand automatically to accommodate
 * requests for execution. That is, if a thread is available in the
 * pool, it's used to execute the Runnable object; otherwise a new
 * thread can be created (and added to the pool) to execute the request.
 * A maximum count can be specified to limit the number of threads in
 * the pool, however. Each thread pool also forms a thread group (all
 * threads in the pool are in the group). In practice this means that
 * the security manager controls whether a thread in the pool can access
 * threads in other groups, but it also gives you an easy mechanism to
 * make the entire group a daemon.
 *
 * (c) 1999, Allen I. Holub.
 *
 * You may not distribute this code except in binary form,
 * incorporated into a java .class file. You may use this code freely
 * for personal purposes, but you may not incorporate it into any
 * commercial product without securing my express permission in writing.
 */
public class Thread_pool extends ThreadGroup
{
    private final Blocking_queue pool         = new Blocking_queue();
    private /*final*/ int        maximum_size;
    private           int        pool_size;
    private volatile  boolean    has_closed   = false; // read by pooled threads

    private static int  group_number  = 0;
    private static int  thread_id     = 0;

    // These counters are shared by all pools, so update them atomically.
    private static synchronized int next_group_number(){ return group_number++; }
    private static synchronized int next_thread_id()   { return thread_id++;    }

    /**********************************************
     * These are the objects that wait to be activated. They are
     * typically blocked on an empty queue. You post a Runnable
     * object to the queue to release a thread, which will execute
     * the run() method from that object. All pooled-thread objects
     * will be members of the thread group that comprises the thread pool.
     */
    private class Pooled_thread extends Thread
    {
        public Pooled_thread()
        {   super( Thread_pool.this, "T" + next_thread_id() );
        }

        public void run()
        {   try
            {   while( !has_closed )
                {   ((Runnable)( pool.dequeue() )).run();
                }
            }
            catch(InterruptedException  e){/* ignore it, stop thread */}
            catch(Blocking_queue.Closed e){/* ignore it, stop thread */}
        }
    }

    /**********************************************
     * Create a thread pool with initial_thread_count threads in it. The
     * pool can expand to contain additional threads if they are needed.
     *
     * @param initial_thread_count The initial thread count. If the initial
     *        count is greater than the maximum, it is silently truncated
     *        to the maximum.
     * @param maximum_thread_count The maximum number of threads that can
     *        be in the pool. A maximum of 0 indicates that the pool will
     *        be permitted to grow indefinitely.
     */
    public Thread_pool(int initial_thread_count, int maximum_thread_count)
    {   super( "Thread_pool" + next_group_number() );
        maximum_size = (maximum_thread_count > 0)
                            ? maximum_thread_count: Integer.MAX_VALUE;
        pool_size    = Math.min(initial_thread_count, maximum_size);
        for(int i = pool_size; --i >= 0 ;)
            new Pooled_thread().start();
    }

    /**********************************************
     * Create a dynamic thread pool, as if you had used Thread_pool(0, 0):
     * it starts with no threads and can grow without limit.
     */
    public Thread_pool()
    {   this( 0, 0 );
    }

    /**********************************************
     * Execute the run() method of the Runnable object on a thread
     * in the pool. A new thread is created if all the threads in the
     * pool appear to be busy and the number of threads in the pool is
     * not at the maximum.
     *
     * @throws Thread_pool.Closed if you try to execute an action on a
     *         pool to which a close() request has been sent.
     */
    public synchronized void execute( Runnable action ) throws Closed
    {
        // The pool grows if it has no threads at all, or if the queue
        // still holds an action that no pooled thread has picked up,
        // which suggests that every existing thread is busy. (This is a
        // heuristic: an action can wait briefly for a busy thread even
        // though the pool could still grow.) You must synchronize on the
        // queue because the Pooled_threads' run() methods are
        // asynchronously dequeueing elements; holding its lock keeps them
        // from changing the queue between the is_empty() test and the
        // enqueue. pool_size changes only in the constructor and here,
        // and execute() is synchronized, so the pool size will never
        // exceed the maximum.
        if( has_closed )
            throw new Closed();

        synchronized( pool )
        {   if( pool_size < maximum_size
                    && (pool_size == 0 || !pool.is_empty()) )
            {   ++pool_size;
                new Pooled_thread().start(); // Add thread to pool
            }
            pool.enqueue( action );          // Attach action to it.
        }
    }

    /**********************************************
     * Objects of class Thread_pool.Closed are thrown if you try to
     * execute an action on a closed Thread_pool.
     */
    public class Closed extends RuntimeException
    {   Closed()
        {   super("Tried to execute operation on a closed Thread_pool");
        }
    }

    /**********************************************
     * Kill all the threads waiting in the thread pool, and arrange
     * for all threads that came out of the pool, but which are working,
     * to die natural deaths when they're finished with whatever they're
     * doing. Actions that have been passed to execute() but which
     * have not been assigned to a thread for execution are discarded.
     * No further operations are permitted on a closed pool, though
     * closing a closed pool is a harmless no-op.
     */
    public synchronized void close()
    {   has_closed = true;
        pool.close();                   // release all waiting threads
    }

    /* ============================================================== */
    public static class Test
    {
        private static Thread_pool pool = new Thread_pool( 10, 10 );

        public static void main( String[] args )
        {
            Test test_bed = new Test();
            test_bed.fire( "hello" );
            test_bed.fire( "world" );

            // Give the threads a chance to start before closing the pool
            try
            {   Thread.sleep(1500);
            } catch(InterruptedException e){}

            pool.close();
        }

        private void fire( final String id )
        {   pool.execute
            (   new Runnable()
                {   public void run()
                    {   System.out.println("Starting " + id );
                        try{    Thread.sleep(500); }
                        catch(InterruptedException e){}
                        System.out.println("Stopping " + id );
                    }
                }
            );
        }
    }
}

Putting the pool to work

Now let's return to the earlier flush() example and add a Thread_pool. I've done that in Listing 4. The changes are minimal: the class now holds a Thread_pool, and rather than creating a Thread and starting it up, flush() creates a Runnable object and passes it to the pool's execute() method. Note that since the operation is defined by the Runnable object, not by the pooled thread itself, a single thread pool can be shared by many asynchronous methods that pass in different Runnable command objects.

Listing 4: Thread_pool_flush.java
import com.holub.asynch.Reader_writer;
import com.holub.asynch.Thread_pool;
import java.io.*;
class Thread_pool_flush
{   Thread_pool pool = new Thread_pool( 3, 10 );
    public interface Flush_error_handler
    {   void error( IOException e );
    }
    private final OutputStream  out;
    private Reader_writer       lock = new Reader_writer();
    private byte[]              buffer;
    private int                 length;
    public Thread_pool_flush( OutputStream out )
    {   this.out = out;
    }
    //...
    synchronized void flush( final Flush_error_handler handler )
    {   pool.execute
        (   new Runnable()
            {   int     length; 
                byte[]  copy;
                {   length = Thread_pool_flush.this.length;
                    copy   = new byte[length];
                    System.arraycopy(Thread_pool_flush.this.buffer, 0,
                                                        copy, 0, length);
                    Thread_pool_flush.this.length = 0;
                }
                public void run()
                {   try
                    {   lock.request_write();
                        out.write( copy, 0, length );
                    }
                    catch( IOException e )
                    {   handler.error(e);
                    }
                    finally
                    {   lock.write_accomplished();
                    }
                }
            }       );
    }
}         
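For comparison, here is a sketch of the same pattern written against java.util.concurrent's ExecutorService (Java 8 or later), a standard-library pool that postdates this article; it is not Thread_pool. The class name Pooled_flush, its write() helper, and the plain monitor used in place of the Reader_writer lock are all assumptions made for illustration. Because the pool is passed in through the constructor, any number of objects can share it, each submitting its own Runnable.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;

// Sketch only: ExecutorService stands in for Thread_pool, and a plain
// monitor (write_lock) stands in for the Reader_writer lock.
class Pooled_flush {
    interface Flush_error_handler { void error(IOException e); }

    private final OutputStream out;
    private final ExecutorService pool;          // supplied by the caller, so it can be shared
    private final Object write_lock = new Object();
    private byte[] buffer = new byte[16];
    private int length;

    Pooled_flush(OutputStream out, ExecutorService pool) {
        this.out = out;
        this.pool = pool;
    }

    // Hypothetical helper: append bytes to the internal buffer, growing it as needed.
    synchronized void write(byte[] data) {
        if (length + data.length > buffer.length)
            buffer = Arrays.copyOf(buffer, Math.max(2 * buffer.length, length + data.length));
        System.arraycopy(data, 0, buffer, length, data.length);
        length += data.length;
    }

    // Copy the buffer on the caller's thread (while holding this object's lock),
    // then let whichever pooled thread is free perform the slow write.
    synchronized void flush(final Flush_error_handler handler) {
        final byte[] copy = Arrays.copyOf(buffer, length);
        length = 0;
        pool.execute(() -> {
            synchronized (write_lock) {
                try { out.write(copy); }
                catch (IOException e) { handler.error(e); }
            }
        });
    }
}
```

With a multithreaded executor, two flushes from the same object can reach the stream in either order; Executors.newSingleThreadExecutor() runs tasks sequentially in submission order.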

Sockets and thread pools

Another good application for thread pools is in socket programming. (If you've never used Java sockets, you should read about them before proceeding. A few articles on this topic are listed in the "Learn more about this topic" section at the end of this article.)

The main difficulty in programming a socket is on the server side, where you need an accept loop like the following one to pick up client connections:

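// Fragment: the enclosing method must be declared to throw IOException.
final ServerSocket main = new ServerSocket( port_number ); // final: used by the anonymous class
while( true )
{   new Thread()
    {   // This initializer runs on the accept-loop thread, during construction.
        Socket client_connection = main.accept();
        public void run()
        {
            // Communicate with the client over the client_connection
            // socket.
        }
    }.start();
}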

The accept loop spends most of its time blocked (in the accept() call), waiting for clients to connect. In each iteration of the loop, the server creates a new anonymous Thread derivative whose purpose is to communicate with the client. In this implementation, the connection with the client is established in the anonymous Thread derivative's implicit constructor. That is, main.accept() is called as part of the implicit construction process; start() is called after accept() returns and the object is fully constructed. Then run() executes (on its own thread), performing any necessary communication with the client.
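The subtle point is which thread runs which code. The following small sketch (the class Initializer_thread_demo and its observe() method are hypothetical) records the thread that runs an anonymous Thread derivative's field initializer and the thread that runs its run() method. The initializer runs on the thread executing new, just as main.accept() does in the loop above; only run() executes on the new thread.

```java
class Initializer_thread_demo {
    // Returns { thread that ran the field initializer, thread that ran run() }.
    static String[] observe() {
        final String[] seen = new String[2];
        Thread worker = new Thread("worker") {
            // A field initializer runs during construction, on the thread that
            // executes "new"; in the accept loop, that is where main.accept() runs.
            private final String constructed_on = Thread.currentThread().getName();

            @Override
            public void run() {
                seen[0] = constructed_on;
                seen[1] = Thread.currentThread().getName();   // the new thread
            }
        };
        worker.start();
        try {
            worker.join();   // join() also makes the worker's writes to seen visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

observe() returns the caller's own thread name followed by "worker".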

The problem with this approach centers around limitations in the socket system itself. Only a limited number of clients can wait to be connected to the server at any given moment (sometimes as few as five). If the clients-waiting-to-be-connected queue fills, then requests for connections will be refused by the server. This means that the accept loop has to be as tight as possible. That's why a thread was created in the foregoing example, so that the server could go right back to waiting once a connection was established. Nonetheless, the foregoing accept loop spends a not-inconsiderable amount of time creating threads and starting them up. (Most of the examples you find in books do the same thing.) It's easily possible for the waiting-for-connection queue to fill while all this thread-creation activity is going on.
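As a point of comparison, the remedy (keep the loop body down to accept() plus a hand-off to threads that already exist) can be sketched with java.util.concurrent's ExecutorService, a standard-library facility added in Java 5, well after this article; the sketch uses Java 8 lambdas. The class Pooled_echo_server and its round_trip() test helper are hypothetical. The backlog of 50 passed to ServerSocket's two-argument constructor is only a request: the implementation may cap or ignore it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class Pooled_echo_server implements Runnable {
    private final ServerSocket main;
    private final ExecutorService pool;

    Pooled_echo_server(int threads) throws IOException {
        main = new ServerSocket(0, 50);           // port 0: any free port; request a 50-entry backlog
        pool = Executors.newFixedThreadPool(threads);
    }

    int port() { return main.getLocalPort(); }

    void kill() throws IOException { main.close(); }   // makes a blocked accept() throw

    @Override
    public void run() {                           // the accept loop
        try {
            while (true) {
                final Socket client = main.accept();
                pool.execute(() -> echo(client)); // hand off, then straight back to accept()
            }
        } catch (IOException e) {
            // SocketException caused by kill(): treated as a normal shutdown here
        } finally {
            pool.shutdown();                      // running echo tasks may still finish
        }
    }

    private static void echo(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             Writer out = new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = in.readLine()) != null) {   // null: the client closed its end
                out.write(line + "\n");
                out.flush();
            }
        } catch (IOException e) {
            // the client vanished; nothing more to do in a sketch
        }
    }

    // Starts a server, sends one line through it, and shuts everything down.
    static String round_trip(String message) {
        try {
            Pooled_echo_server server = new Pooled_echo_server(4);
            Thread acceptor = new Thread(server, "accept-loop");
            acceptor.start();
            try (Socket c = new Socket(InetAddress.getLoopbackAddress(), server.port());
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(c.getInputStream(), StandardCharsets.UTF_8));
                 Writer out = new OutputStreamWriter(c.getOutputStream(), StandardCharsets.UTF_8)) {
                out.write(message + "\n");
                out.flush();
                return in.readLine();
            } finally {
                server.kill();
                acceptor.join();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Pooled_echo_server.round_trip("hello") returns "hello".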

You can considerably shorten the time spent in the body of the accept loop by using a thread pool. I've demonstrated the process with the Socket_server class in Listing 5. To create one of these beasts, you first need to implement the Client_action interface (defined near the top of Listing 5) to define the action to perform once a client has connected to the socket. Here's a Client_action that implements a simple "echo server." (It simply echoes back to the client whatever strings the client sends it.)

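import java.io.*;
import java.net.Socket;
import com.holub.asynch.Socket_server;

// public (and so is its implicit no-arg constructor): Socket_server creates
// instances reflectively from another package.
public class Action implements Socket_server.Client_action
{   public void action( Socket socket )
    {   try
        {
            BufferedReader in =
                new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            OutputStreamWriter out =
                    new OutputStreamWriter(socket.getOutputStream());
            String line;
            while( (line = in.readLine()) != null )  // null: client closed its end
            {   out.write( line, 0, line.length() );
                out.write( "\n", 0, 1             );
                out.flush();
            }
        }
        catch( Exception e )
        {   System.out.println(e.toString());
        }
        finally
        {   try{ socket.close(); } catch( IOException e ){ /*ignore*/ }
        }
    }
}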

(Most of the complexity is in setting up the Unicode-compliant readers and writers needed to talk to the client.)

You set things up for client communication as follows:

Socket_server echo_server = new Socket_server
                            (   port_number, 
                                expected_connection_count,
                                Action.class,
                                new Socket_server.Death()
                                {   public void action( Exception e )
                                    { // performed when server aborts or
                                      // is killed
                                    }
                                }
                            );
echo_server.start();
//...
echo_server.kill();

The first two arguments to the Socket_server's constructor are the port number for the main socket and the number of client connections that you're expecting (used as the initial size of the thread pool). If more than expected_connection_count clients connect at once, threads for the additional clients are still created, but they're created on demand inside the accept loop, which is exactly the overhead the pool is meant to avoid.
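The "start this many threads up front, create more only when needed" policy maps fairly directly onto java.util.concurrent.ThreadPoolExecutor (Java 5 and later). A sketch, in which Prestarted_pool and create() are hypothetical names:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class Prestarted_pool {
    // 'expected' threads are started immediately. When no thread is idle and
    // waiting, the SynchronousQueue refuses to hold the task, so the executor
    // creates an extra thread instead of queueing.
    static ThreadPoolExecutor create(int expected) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                expected,                         // core threads: kept even when idle
                Integer.MAX_VALUE,                // no hard cap on extra threads
                60, TimeUnit.SECONDS,             // idle extra threads exit after 60 seconds
                new SynchronousQueue<Runnable>());
        pool.prestartAllCoreThreads();            // pay the thread-creation cost now
        return pool;
    }
}
```

As with Socket_server, any thread beyond the prestarted ones is created inside execute(), on the calling thread, which in a server would be the accept loop.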

The third argument is the Class object associated with the Client_action derivative. The Socket_server will dynamically create instances of this class on a one-instance-per-connection basis. Initially, it creates expected_connection_count instances, but as with threads, it can create more if it needs to. These client-communication objects are never destroyed. They're cached internally, and when a client connects to the server, the Socket_server pulls a Client_action object out of the cache, causes its action() method to execute on an independent thread, and then puts the object back into the cache. Since a given Client_action object is dedicated to talking to an individual client, it can contain client-specific state information (that is, the class can have fields in it). Since the objects are recycled, however, this state information should be reinitialized at the top of the action() method.
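The recycling described here is an object cache. A minimal sketch, assuming Java 8; the names Recycling_cache and Line_counter are hypothetical, and the factory is a Supplier rather than the Class object and newInstance() call that Socket_server uses:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hands out idle objects and manufactures new ones only when the cache is empty.
class Recycling_cache<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private int created;

    Recycling_cache(Supplier<T> factory, int initial) {
        this.factory = factory;
        for (int i = 0; i < initial; i++)
            idle.addLast(make());
    }

    private T make() { created++; return factory.get(); }

    synchronized T take()               { return idle.isEmpty() ? make() : idle.removeLast(); }
    synchronized void give_back(T item) { idle.addLast(item); }
    synchronized int created_count()    { return created; }
}

// Per-connection state lives in a field, so it must be reset on every use.
class Line_counter {
    private int lines;

    int handle(String... input) {
        lines = 0;                      // reinitialize first: this object may be recycled
        for (String line : input)
            if (!line.isEmpty()) lines++;
        return lines;
    }
}
```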

The final argument to the Socket_server constructor is a Command object -- an instance of class Socket_server.Death whose action() method is called when the Socket_server itself shuts down. If the shutdown occurred because an exception was thrown, the exception object is passed to action() as an argument; null is passed for a normal shutdown.
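That contract (null for a normal shutdown, the exception otherwise) is easiest to honor when the notification happens in exactly one place. A minimal sketch, with the hypothetical names Shutdown_action and Shutdown_demo, that records the cause in a local variable and notifies once, from a finally block:

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in with the same shape as Socket_server.Death.
interface Shutdown_action { void action(Exception e); }

class Shutdown_demo {
    // Runs body on a new thread and notifies on_death exactly once:
    // with null if body returns normally, otherwise with the exception it threw.
    static void run_and_report(final Callable<Void> body, final Shutdown_action on_death) {
        Thread t = new Thread() {
            @Override
            public void run() {
                Exception cause = null;
                try {
                    body.call();
                } catch (Exception e) {
                    cause = e;                  // remember it; don't notify here
                } finally {
                    on_death.action(cause);     // the single notification point
                }
            }
        };
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```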

The Socket_server is implemented in Listing 5. The Client class defines the Runnable object that's passed to the Thread_pool's execute() method. Each Client encapsulates a Client_action. The main accept loop, in run(), either pulls a Client out of an internal cache or makes a new one if the cache is empty. It then blocks, waiting for a client to connect, finishes initializing the Client by setting its socket field to reference the socket just returned from accept(), and then passes the Client object off to the Thread_pool for execution.

I suppose I could have made Client_action implement Runnable and then have used it directly, rather than wrapping it in a Client, but that's actually a more complex solution from the perspective of the user of a Socket_server. For one thing, simply implementing Runnable is not sufficient because there would be no well-defined way to get the socket argument into the object: run() doesn't take arguments. I figured that since I'd need the user to implement an interface that extended Runnable by adding a set_socket() method, I might as well make the user's life as simple as possible and dispense with the extra method entirely. Also, I didn't want to expose to the end user the mechanisms that Socket_server uses for thread management. That's just bad design. As things stand, I can handle clients any way I wish without having to modify any code that uses Socket_server. That wouldn't be true if I forced my users to implement Runnable along with whatever methods would be necessary to get the socket argument passed into run().
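The design choice above comes down to binding an argument to a one-argument action so that something that takes no arguments (run()) can invoke it, while the user only ever writes the one-argument action. A generic sketch of that wrapper, with the hypothetical name Bound_action and Java 8's Consumer standing in for Client_action:

```java
import java.util.function.Consumer;

final class Bound_action<T> implements Runnable {
    private final Consumer<? super T> action;   // the one-argument operation (like Client_action)
    private final T argument;                   // the value run() cannot receive directly

    Bound_action(Consumer<? super T> action, T argument) {
        this.action = action;
        this.argument = argument;
    }

    @Override
    public void run() {
        action.accept(argument);
    }
}
```

In Java 8 and later, a lambda such as `() -> action.accept(argument)` does the same job without a named class.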

Finally, this implementation could be sped up a bit by implementing the thread pool locally rather than using the generic Thread_pool class. This way you'd never have to create a Client object in the accept loop. On the other hand, if you have specified a reasonable expected_connections argument to the Socket_server constructor, Clients won't be created very often, and the current implementation will work just fine, even in the most demanding of scenarios.

Listing 5: /src/com/holub/asynch/Socket_server.java
package com.holub.asynch;
import java.net.*;
import java.util.*;
import java.io.*;
import com.holub.asynch.Thread_pool;
/**
 * A generic server-side socket that efficiently executes client-related
 * actions on their own threads. Use it like this:
 * <pre>
 *  class Action implements Socket_server.Client_action
 *  {   public void action( Socket socket )
 *      {   // perform any action that's needed to talk to
 *          // a single client through the "socket". This
 *          // method executes on its own thread.
 *      }
 *  };
 *  Socket_server echo_server = new Socket_server (
 *                                  port_number,
 *                                  expected_connection_count,
 *                                  Action.class,
 *                                  new Socket_server.Death()
 *                                  {   public void action( Exception e )
 *                                      { // performed when server aborts or
 *                                        // is killed
 *                                      }
 *                                  }
 *                              );
 *  echo_server.start();
 *  //...
 *  echo_server.kill();
 * </pre>
 * The Client_action object encapsulates whatever action
 * must be performed when a client connects to the current server. The
 * action() method runs on its own thread, and is passed
 * the socket that's connected to the client. Client_action
 * objects are manufactured by the Socket_server, and a
 * minimal number of objects are created and recycled as necessary.
 * Consequently, the socket argument should not be cached.
 * Moreover, the Client_action object should reinitialize
 * itself every time the action() method is called. (This
 * is not the same architecture as a servlet. One instance of
 * the Client_action will exist for each connection, so the
 * object can maintain a unique state in its own fields if it wishes.
 * Once the action completes, however, the object might be used again
 * for another connection, so it should reinitialize itself at the top
 * of the action method.)
 *
 * (c) 1999, Allen I. Holub.
 *
 * This code may not be distributed by yourself except in binary form,
 * incorporated into a java .class file. You may use this code freely
 * for personal purposes, but you may not incorporate it into any
 * commercial product without express permission of Allen I. Holub in
 * writing.
 */
public class Socket_server extends Thread
{
    /**
     * Implementations of this interface service individual client connections.
     * The action() method is called every time a client
     * connects. There is a one-to-one mapping of client connections to
     * Client_action objects, but a given Client_action object may be
     * recycled to service many clients in sequence.
     */
    public interface Client_action
    {   void    action( Socket client );
    }
    /**
     * The Socket_server.Death object is passed into the server to
     * specify a shutdown action. Its action() method is called when
     * the server thread shuts down. The argument is null for a normal
     * shutdown, otherwise it's the exception object that caused the shutdown.
     */
    public interface Death
    {   void    action( Exception e );
    }
    /**
     * A wrapper that makes it a bit easier for the user to implement the
     * client-connection operation. Makes all the mechanics of thread
     * manipulation internal.
     */
    private class Client implements Runnable
    {   private final Client_action action; // Instance of the user-supplied class
        public        Socket        socket; // Changes with each use
        Client( Client_action action ){ this.action = action; }
        public void run()
        {   try
            {   action.action(socket);
            }
            finally
            {   synchronized( connections )
                {   connections.addLast( this );    // Put self back into queue to be
                }                                   // used again, even if action() threw
            }
        }
    }
    // All of the following should be final. A compiler bug (that's
    // in all compiler versions up to and including 1.2) won't permit it.
    private /*final*/ ServerSocket  main;
    private /*final*/ Thread_pool   pool;
    private /*final*/ Class         action;
    private /*final*/ Death         shutdown;
    private /*final*/ LinkedList    connections = new LinkedList();
    /**
     * Thrown by the Socket_server constructor if it can't
     * be created for some reason.
     */
    public class Not_created extends RuntimeException
    {   public Not_created( Exception e )
        {   super("Couldn't create socket: " + e.getMessage() );
        }
    }
    /**
     * Create a socket-server thread that creates and maintains a server-side
     * socket, dispatching client actions on individual threads when clients connect.
     *
     * @param port_number The port number to use for the main socket
     * @param expected_connections The number of simultaneous connections
     *        that you expect. More connections are supported, but the threads that
     *        service them may need to be created dynamically when the client connects.
     * @param action The Class object that
     *        represents a Socket_server.Client_action derivative. Objects are
     *        created dynamically by the system to service clients.
     * @param shutdown Command object that encapsulates the
     *        action to take when the server thread shuts down (either by being
     *        passed a kill message, or by some internal error).
     * @throws Not_created If the object can't be created successfully
     *        for one of various reasons (the socket can't be opened, client actions
     *        can't be manufactured, and so on).
     */
    public Socket_server(int port_number, int expected_connections,
                                            Class action, Death shutdown)
                                            throws  Not_created
    {   try
        {
            main            = new ServerSocket(port_number);
            pool            = new Thread_pool (expected_connections, 0);
            this.action     = action;
            this.shutdown   = shutdown;
            for( int i = expected_connections; --i >= 0 ; )
                connections.addLast( 
                        new Client((Client_action)(action.newInstance())) );
            setDaemon( true );      // Don't keep the process alive
        }
        catch( Exception e ){ throw new Not_created(e); }
    }
    /**
     * Implements the accept loop: waits for clients to connect and,
     * when a client does connect, executes the action() method of a
     * (possibly recycled) instance of the Client_action class passed
     * into the constructor on its own thread. If accept() fails
     * catastrophically, the thread shuts down, and the shutdown object
     * passed to the constructor is notified as described in
     * Socket_server.Death.
     */
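    public void run()
    {   Exception cause = null;                 // stays null for a normal shutdown
        try
        {   Client client;
            while( !isInterrupted() )
            {   synchronized( connections )
                {   client = (connections.size() > 0)
                                        ? (Client)(connections.removeLast())
                                        : new Client((Client_action)(action.newInstance()))
                                        ;
                }
                client.socket = main.accept();
                if( isInterrupted() )
                {   client.socket.close();      // don't leak a connection accepted during kill()
                    break;
                }
                pool.execute( client );
            }
        }
        catch(Exception e)
        {   // kill() closes the main socket, which makes accept() throw;
            // only an exception that kill() didn't cause is reported.
            if( !isInterrupted() )
                cause = e;
        }
        finally
        {   pool.close();
            if( shutdown != null )
                shutdown.action(cause);         // notify exactly once
        }
    }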
    /**
     * Shuts down the Socket_server thread gracefully. All associated
     * threads are terminated (but those that are working on serving a client
     * will remain active until the service is complete). The main socket is
     * closed as well.
     */
    public void kill()
    {   try
        {   pool.close();
            interrupt();
            main.close();
        }
        catch( IOException e ){ /*ignore*/ }
    }
    /**
     * A small test class. Creates a socket server that implements an
     * echo server, then opens two connections to it and runs a string
     * through each connection.
     */
    static public class Test
    {
        public static void main( String[] args ) throws Exception
        {
            class Action implements Socket_server.Client_action
            {   public void action( Socket socket )
                {   try
                    {
                        BufferedReader in =
                            new BufferedReader(
                                new InputStreamReader(socket.getInputStream()));
                        OutputStreamWriter out =
                                new OutputStreamWriter(socket.getOutputStream());
                        String line;
                        while( (line = in.readLine()) != null )
                        {   out.write( line, 0, line.length() );
                            out.write( "\n", 0, 1             );
                            out.flush();
                        }
                    }
                    catch( Exception e )
                    {   System.out.println(e.toString());
                    }
                    finally
                    {   try{ socket.close(); } catch( IOException e ){ /*ignore*/ }
                    }
                }
            };
            Socket_server echo_server = new Socket_server( 9999, 10, Action.class, 
                                        new Socket_server.Death()
                                        {   public void action( Exception e )
                                            { System.out.println("goodbye world (" + e + ")");
                                            }
                                        }
                                    );
            echo_server.start();
            connect("Hello\n");
            connect("World\n");
            echo_server.kill();
        }
        private static void connect( String s ) throws Exception
        {   
            Socket          client  = new Socket( "localhost", 9999 );
            BufferedReader  in      = new BufferedReader(
                                        new InputStreamReader(
                                            client.getInputStream() ));
            OutputStreamWriter out  = new OutputStreamWriter(
                                            client.getOutputStream());
            out.write( s, 0, s.length() );
            out.flush();
            s = in.readLine();
            System.out.println( s );
            client.close();
        }
    }
}

The net result of all this work, then, is an efficient, reusable socket server to which you can assign any operation you like, to be performed when clients connect. Though the example is complex, it does serve to illustrate why you might want to use thread pools in the real world.

Conclusion

So that's it for another month. This month I've presented a few ways to implement asynchronous methods in Java, using both the one-thread-per-method approach and a thread pool. The socket server presented this month is a good example of just how useful thread pools can be. Next month, we'll see how the Blocking_queue, used to implement the thread pool, is in itself an essential tool for interthread communication. In fact, in many multithreaded systems the Blocking_queue can be the only object that requires synchronization. Next month I'll also look at another object-oriented architecture for implementing asynchronous methods that does just that: it uses the blocking queue as the sole point of synchronization in the system.

Allen Holub has been working in the computer industry since 1979. He is widely published in magazines (Dr. Dobb's Journal, Programmers Journal, Byte, MSJ, among others). He has seven books to his credit, and is currently working on an eighth that will present the complete sources for a Java compiler written in Java. After eight years as a C++ programmer, Allen abandoned C++ for Java in early 1996. He now looks at C++ as a bad dream, the memory of which is mercifully fading. He's been teaching programming (first C, then C++ and MFC, now object-oriented design and Java) both on his own and for the University of California Berkeley Extension since 1982. Allen offers both public classes and in-house training in Java and object-oriented design topics. He also does object-oriented design consulting and contract Java programming. Get information, and contact Allen, via his Web site http://www.holub.com.

Learn more about this topic

  • Links to all previous articles in this series can be found in the "Articles" section of Allen's Web site. You can find downloadable versions of all the code in the same place. http://www.holub.com
  • "Sockets programming in Java: A tutorial," Qusay H. Mahmoud (JavaWorld, December 1996). Get up to speed on Java's socket support with this tutorial. http://www.javaworld.com/javaworld/jw-12-1996/jw-12-sockets.html
  • "Write your own threaded discussion forum: The communications and server components, Part 2," Michael Shoffner (JavaWorld, March 1997). http://www.javaworld.com/javaworld/jw-03-1997/jw-03-step.html
  • Java Network Programming, Elliotte Rusty Harold (O'Reilly, 1997). http://www1.fatbrain.com/asp/bookinfo/bookinfo.asp?theisbn=1565922271
  • Design Patterns: Elements of Reusable Object-Oriented Software, Erich Gamma, Richard Helm, Ralph Johnson, John Vlissides (Addison-Wesley, 1994). Presents the Command design pattern. This book is essential reading for any object-oriented designer. http://www1.fatbrain.com/asp/bookinfo/bookinfo.asp?theisbn=0201633612