Programming Java threads in the real world, Part 8

Threads in an object-oriented world, thread pools, implementing socket 'accept' loops

1 2 3 4 5 Page 3
Page 3 of 5
033  
034  
035  
036  
037  
038  
039  
040  
041  
042  
043  
044  
045  
    public Thread_pool(int initial_thread_count, int maximum_thread_count)
    {   super( "Thread_pool" + group_number++ );
        maximum_size = (maximum_thread_count > 0)
                            ? maximum_thread_count: Integer.MAX_VALUE;
        pool_size    = Math.min(initial_thread_count, maximum_size);
        for(int i = pool_size; --i >= 0 ;)
            new Pooled_thread().start();
    }
    /**********************************************
     |    Create a dynamic Thread pool as if you had used Thread_pool(0, true);
     **/
046  
047  
048  
049  
050  
051  
052  
    public Thread_pool()
    {   
        super( "Thread_pool" + group_number++ );
        this.maximum_size = 0;
    }
    /**********************************************
     |    

Execute the run() method of the Runnable object on a thread

in the pool. A new thread is created if the pool is empty and

the number of threads in the pool is not at the maximum.

@throws Thread_pool.Closed if you try to execute an action on a pool to which a close()

request has been sent.

     */
053  
054  
055  
056  
057  
058  
059  
060  
061  
062  
063  
064  
065  
066  
067  
068  
069  
070  
071  
072  
073  
074  
075  
076  
077  
078  
079  
080  
081  
082  
083  
    public synchronized void execute( Runnable action ) throws Closed
    {   
        // You must synchronize on the pool because the Pooled_thread's
        // run method is asynchronously dequeueing elements. If we
        // didn't synchronize, it would be possible for the is_empty
        // query to return false, and then have a Pooled_thread sneak
        // in and put a thread on the queue (by doing a blocking dequeue).
        // In this scenario, the number of threads in the pool could
        // exceed the maximum specified in the constructor. The 
        // test for pool_size < maximum_size is outside the synchronized
        // block because I didn't want to incur the overhead of
        // synchronization unnecessarily. This means that I could
        // occasionally create an extra thread unnecessarily, but
        // the pool size will never exceed the maximum.
        if( has_closed )
            throw new Closed();
        if( pool_size < maximum_size )
            synchronized( pool )
            {   if( pool.is_empty() )
                {   ++pool_size;
                    new Pooled_thread().start(); // Add thread to pool
                }
            }
        pool.enqueue( action );          // Attach action to it.
    }
    /**********************************************
     |    

Objects of class Thread_pool.Closed are thrown if you try to

execute an action on a closed Thread_pool.

     */
084  
085  
086  
087  
088  
089  
090  
    public class Closed extends RuntimeException
    {   Closed()
        {   super("Tried to execute operation on a closed Thread_pool");
        }
    }
    /**********************************************
     |    

Kill all the threads waiting in the thread pool, and arrange

for all threads that came out of the pool, but which are working,

to die natural deaths when they're finished with whatever they're

doing. Actions that have been passed to execute() but which

have not been assigned to a thread for execution are discarded.

No further operations are permitted on a closed pool, though

closing a closed pool is a harmless no-op.

     */
091  
092  
093  
094  
095  
096  
097  
098  
099  
100  
101  
102  
103  
104  
105  
106  
107  
108  
109  
110  
111  
112  
113  
114  
115  
116  
117  
118  
119  
120  
121  
122  
123  
124  
125  
126  
127  
128  
129  
130  
131  
132  
    public synchronized void close()
    {   has_closed = true;
        pool.close();                   // release all waiting threads
    }
    /* ============================================================== */
    public static class Test
    {   
        private static Thread_pool pool = new Thread_pool( 10, 10 );
        public static void main( String[] args )
        {
            Test test_bed = new Test();
            test_bed.fire( "hello" );
            test_bed.fire( "world" );
            // Give the threads a chance to start before closing the pool
            try
            {   Thread.currentThread().sleep(1500);
            } catch(InterruptedException e){}
            pool.close();
        }
        private void fire( final String id )
        {   pool.execute
            (   new Runnable()
                {   public void run()
                    {   System.out.println("Starting " + id );
                        try{    Thread.currentThread().sleep(500); }
                        catch(InterruptedException e){}
                        System.out.println("Stopping " + id );
                    }
                }
            );
        }
    }
}

Putting the pool to work

Now let's return to the earlier flush() example and add a Thread_pool. I've done that in Listing 4. The changes are minimal (and are all boldfaced). Rather than creating a Thread and starting it up, I create a Runnable object and pass it to the pools execute method. Note that since the operation is defined by the Runnable object, not by the pooled thread itself, a single thread pool can be shared by many asynchronous methods that would pass in different Runnable command objects.

Listing 4: Thread_pool_flush.java
01  
02  
03  
04  
05  
06  
07  
08  
09  
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  
26  
27  
28  
29  
30  
31  
32  
33  
34  
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
import com.holub.asynch.Reader_writer;import com.holub.asynch.Thread_pool;
import java.io.*;
class Thread_pool_flush
{   Thread_pool pool = new Thread_pool( 3, 10 );
    public interface Flush_error_handler
    {   void error( IOException e );
    }
    private final OutputStream  out;
    private Reader_writer       lock = new Reader_writer();
    private byte[]              buffer;
    private int                 length;
    public Thread_pool_flush( OutputStream out )
    {   this.out = out;
    }
    //...
    synchronized void flush( final Flush_error_handler handler )
    {       pool.execute 
        (   new Runnable()
            {   int     length; 
                byte[]  copy;
                {   length = Thread_pool_flush.this.length;
                    copy   = new byte[length];
                    System.arraycopy(Thread_pool_flush.this.buffer, 0,
                                                        copy, 0, length);
                    Thread_pool_flush.this.length = 0;
                }
                public void run()
                {   try
                    {   lock.request_write();
                        out.write( copy, 0, length );
                    }
                    catch( IOException e )
                    {   handler.error(e);
                    }
                    finally
                    {   lock.write_accomplished();
                    }
                }
            }       );
    }
}         

Sockets and thread pools

Another good application for thread pools is in socket programming. (If you've never used Java sockets, you should read about them before proceeding. A few articles on this topic are listed in the Resources section.)

The main difficulty in programming a socket is on the server side, where you need an accept loop like the following one to pick up client connections:

ServerSocket main = new ServerSocket( port_number );
while( true )
{   new Thread()
    {   Socket client_connection = main.accept();
        public void run()
        {
            // Communicate with the client over the client_connection
            // socket.
        }
    }.start();
}

The accept loop spends most of its time blocked (in the accept() call), waiting for clients to connect. In each iteration of the loop, the server creates a new anonymous Thread derivative whose purpose is to communicate with the client. In this implementation, the connection with the client is established in the anonymous Thread derivative's implicit constructor. That is, main.accept() is called as part of the implicit construction process; start() is called after accept() returns and the object is fully constructed. Then run() executes (on its own thread), performing any necessary communication with the client.

The problem with this approach centers around limitations in the socket system itself. Only a limited number of clients can wait to be connected to the server at any given moment (sometimes as few as five). If the clients-waiting-to-be-connected queue fills, then requests for connections will be refused by the server. This means that the accept loop has to be as tight as possible. That's why a thread was created in the foregoing example, so that the server could go right back to waiting once a connection was established. Nonetheless, the foregoing accept loop spends a not-inconsiderable amount of time creating threads and starting them up. (Most of the examples you find in books do the same thing.) It's easily possible for the waiting-for-connection queue to fill while all this thread-creation activity is going on.

You can considerably shorten the time spent in the body of the accept loop by using a thread pool. I've demonstrate the process with the Socket_server class in Listing 5. To create one of these beasts, you first need to implement the Client_action interface (Listing 5, line 11) to define the action to perform once you've been connected to the socket. Here's a thread pool that implements a simple "echo server." (It simply echoes back to the client whatever strings the client sends it.)

class Action implements Socket_server.Client_action
{   public void action( Socket socket )
    {   try
        {
            BufferedReader in =
                new BufferedReader(
                    new InputStreamReader(socket.getInputStream()));
            OutputStreamWriter out =
                    new OutputStreamWriter(socket.getOutputStream());
            String line;
            while( (line = in.readLine()) != null )
            {   out.write( line, 0, line.length() );
                out.write( "\n", 0, 1             );
                out.flush();
            }
            socket.close();
        }
        catch( Exception e )
        {   System.out.println(e.toString());
        }
    }
};

(Most of the complexity is in setting up the Unicode-compliant readers and writers needed to talk to the client.)

You set things up for client communication as follows:

Socket_server echo_server = new Socket_server
                            (   port_number, 
                                expected_connection_count,
                                Action.class,
                                new Socket_server.Death()
                                {   public void action( Exception e )
                                    { // performed when server aborts or
                                      // is killed
                                    }
                                }
                            );
echo_server.start();
//...
echo_server.kill();

The first two arguments to the Socket_server's constructor are the port number for the main socket and the number of client connections that you're expecting (used as the initial size of the thread pool). If more than expected_connection_count clients connect, then the threads for the additional clients will be created, but they are created inside the accept loop.

The third argument is the Class object associated with the Client_action derivative. The Socket_server will dynamically create instances of this class on a one-instance-per-connection basis. Initially, it creates expected_connection_count instances, but as with threads, it can create more if it needs to. These client-communication objects are never destroyed. They're cached internally, and when a client connects to the server, the Socket_server pulls a Client_action object out of the cache, causes its action() method to execute on an independent thread, and then puts the object back into the cache. Since a given Client_action object is dedicated to talking to an individual thread, it can contain client-specific state information (that is, the class can have fields in it). Since the objects are recycled, however, this state information should be reinitialized at the top of the action() method.

The final argument to theSocket_server constructor is a Command object -- an instance of class Socket_server.Death whose action() method is called when the Socket_server itself shuts down. If the shutdown occurred because of an exception toss, the exception object is passed into the action as an argument; null is passed for a normal shutdown.

The Socket_server is implemented in Listing 5. The Client class (line 21) defines the Runnable object that's passed to the Thread_pool's execute() method. Each Client encapsulates a Client_connection. The main accept loop (on line 75) either pulls a Client out of an internal cache or makes a new one if the cache is empty. It then blocks, waiting for a client to connect, finishes initializing the Client by setting its socket field to reference the socket just returned from accept(), and then passes the Client object off to the Thread_pool for execution.

1 2 3 4 5 Page 3
Page 3 of 5