Programming Java threads in the real world, Part 8

Threads in an object-oriented world, thread pools, implementing socket 'accept' loops


I suppose I could have made Client_action implement Runnable and used it directly, rather than wrapping it in a Client, but that's actually a more complex solution from the perspective of the user of a Socket_server. For one thing, simply implementing Runnable is not sufficient, because there would be no well-defined way to get the socket argument into the object: run() doesn't take arguments. Since I'd have needed the user to implement an interface that extended Runnable by adding a set_socket() method, I figured I might as well make the user's life as simple as possible and dispense with the extra method entirely. I also didn't want to expose to the end user the mechanisms that Socket_server uses for thread management; that would just be bad design. The current architecture lets me change the way clients are handled in any way I wish without modifying any code that uses Socket_server. That wouldn't hold if I forced my users to implement Runnable, along with whatever methods would be necessary to get the socket argument passed into run().
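To make the trade-off concrete, here is a minimal sketch of the two shapes side by side. Client_action, Client, and set_socket() come from the discussion above; Runnable_action, WrapperSketch, and demo() are hypothetical names invented for illustration, and the socket is left unconnected so that the sketch needs no network.

```java
import java.net.Socket;

// Hypothetical sketch; only the shape of the two designs matters.
class WrapperSketch
{
    // The rejected design: the user's class is itself the Runnable, so it
    // needs an extra setter to receive the socket before run() is called.
    interface Runnable_action extends Runnable
    {   void set_socket( Socket socket );
    }

    // The chosen design: the user implements one method and never sees
    // Runnable or any other part of the threading machinery.
    interface Client_action
    {   void action( Socket client );
    }

    // The server-side wrapper owns the Runnable and hands the socket over.
    static class Client implements Runnable
    {   private final Client_action action;
        Socket socket;                          // Set by the accept loop
        Client( Client_action action ){ this.action = action; }
        public void run(){ action.action( socket ); }
    }

    // Runs a user action through the wrapper on its own thread and reports
    // whether the action received the socket that the wrapper was given.
    static boolean demo()
    {   final Socket[] seen = new Socket[1];
        Client client = new Client( new Client_action()
        {   public void action( Socket s ){ seen[0] = s; }
        });
        client.socket = new Socket();           // Unconnected: no network needed
        Thread thread = new Thread( client );
        thread.start();
        try
        {   thread.join();                      // join() makes seen[0] visible here
        }
        catch( InterruptedException e )
        {   Thread.currentThread().interrupt();
            return false;
        }
        return seen[0] == client.socket;
    }

    public static void main( String[] args )
    {   System.out.println( demo() );
    }
}
```

Because only Client implements Runnable, the server could later swap threads for some other dispatch mechanism without touching any class that implements Client_action.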

Finally, this implementation could be sped up a bit by implementing the thread pool locally rather than using the generic Thread_pool class. This way you'd never have to create a Client object in the accept loop. On the other hand, if you have specified a reasonable expected_connections argument to the Socket_server constructor, Clients won't be created very often, and the current implementation will work just fine, even in the most demanding of scenarios.
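The effect of the expected_connections estimate can be seen in a small stand-alone sketch of the recycling scheme. RecycleSketch, make(), acquire(), release(), and demo() are hypothetical names, and the empty Client class merely stands in for the real wrapper; no threads or sockets are involved.

```java
import java.util.LinkedList;

// Hypothetical stand-in for the Client recycling done by Socket_server.
class RecycleSketch
{
    static class Client {}                      // Stands in for the real wrapper

    private final LinkedList<Client> connections = new LinkedList<Client>();
    private int created = 0;                    // How many Clients were built

    RecycleSketch( int expected_connections )
    {   for( int i = expected_connections; --i >= 0 ; )
            connections.addLast( make() );
    }

    private Client make(){ ++created; return new Client(); }

    // Reuse an idle Client if there is one; otherwise build a new one.
    Client acquire()
    {   synchronized( connections )
        {   return connections.isEmpty() ? make() : connections.removeLast();
        }
    }

    // Put a Client back into the queue once its connection is finished.
    void release( Client client )
    {   synchronized( connections ){ connections.addLast( client ); }
    }

    // Serves 'rounds' bursts of 'simultaneous' connections and returns the
    // total number of Client objects that had to be created.
    static int demo( int expected, int simultaneous, int rounds )
    {   RecycleSketch server = new RecycleSketch( expected );
        for( int r = 0; r < rounds; ++r )
        {   LinkedList<Client> busy = new LinkedList<Client>();
            for( int i = 0; i < simultaneous; ++i )
                busy.add( server.acquire() );
            for( Client client : busy )
                server.release( client );
        }
        return server.created;
    }

    public static void main( String[] args )
    {   System.out.println( demo( 10, 10, 100 ) );  // prints 10: estimate was right
        System.out.println( demo(  2, 10, 100 ) );  // prints 10: 8 built in first burst
    }
}
```

In both runs, 1,000 simulated connections are serviced by only ten objects. A low estimate costs a handful of allocations during the first burst and nothing afterwards.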

Listing 5: /src/com/holub/asynch/Socket_server.java
package com.holub.asynch;
import java.net.*;
import java.util.*;
import java.io.*;
import com.holub.asynch.Thread_pool;
/**
 * A generic server-side socket that efficiently executes client-related
 * actions on their own threads. Use it like this:
 *
 *    class Action implements Socket_server.Client_action
 *    {   public void action( Socket socket )
 *        {   // perform any action that's needed to talk to
 *            // a single client through the "socket". This
 *            // method executes on its own thread.
 *        }
 *    };
 *    Socket_server echo_server = new Socket_server (
 *                                    port_number,
 *                                    expected_connection_count,
 *                                    Action.class,
 *                                    new Socket_server.Death()
 *                                    {   public void action( Exception e )
 *                                        { // performed when server aborts or
 *                                          // is killed
 *                                        }
 *                                    }
 *                                );
 *    echo_server.start();
 *    //...
 *    echo_server.kill();
 *
 * The Client_action object encapsulates whatever action must be performed
 * when a client connects to the current server. The action() method runs
 * on its own thread, and is passed the socket that's connected to the
 * client. Client_action objects are manufactured by the Socket_server, and
 * a minimal number of objects are created and recycled as necessary.
 * Consequently, the socket argument should not be cached. Moreover, the
 * Client_action object should reinitialize itself every time the action()
 * method is called. (This is not the same architecture as a servlet. One
 * instance of the Client_action will exist for each connection, so the
 * object can maintain a unique state in local fields if it wishes. Once
 * the action completes, however, the object might be used again for
 * another connection, so it should reinitialize itself at the top of the
 * action method.)
 *
 * (c) 1999, Allen I. Holub.
 *
 * This code may not be distributed by yourself except in binary form,
 * incorporated into a java .class file. You may use this code freely
 * for personal purposes, but you may not incorporate it into any
 * commercial product without express permission of Allen I. Holub in
 * writing.
 */
public class Socket_server extends Thread
{
    /**
     * Implementations of this class service individual client connections.
     * The action() method is called every time a client connects. There is
     * a one-to-one mapping of client connections to Client_action objects,
     * but a given Client_action object may be recycled to service many
     * clients in sequence.
     */
    public interface Client_action
    {   void    action( Socket client );
    }
    /**
     * The Socket_server.Death object is passed into the server to
     * specify a shutdown action. Its action() method is called when
     * the server thread shuts down. The argument is null for a normal
     * shutdown, otherwise it's the exception object that caused the shutdown.
     */
    public interface Death
    {   void    action( Exception e );
    }
    /**
     * A wrapper that makes it a bit easier for the user to implement the
     * client-connection operation. Makes all the mechanics of thread
     * manipulation internal.
     */
    private class Client implements Runnable
    {   private final Client_action action; // Clone of user-supplied action 
        public        Socket        socket; // Changes with each use
        Client( Client_action action ){ this.action = action; }
        public void run()
        {   
            action.action(socket);
            synchronized( connections )
            {   connections.addLast( this );    // Put self back into queue to
            }                                   // be used again
        }
    }
    // All of the following should be final. A compiler bug (that's
    // in all compiler versions up to and including 1.2) won't permit it.
    private /*final*/ ServerSocket  main;
    private /*final*/ Thread_pool   pool;
    private /*final*/ Class         action;
    private /*final*/ Death         shutdown;
    private /*final*/ LinkedList    connections = new LinkedList();
    /**
     * Thrown by the Socket_server constructor if it can't
     * be created for some reason.
     */
    public class Not_created extends RuntimeException
    {   public Not_created( Exception e )
        {   super("Couldn't create socket: " + e.getMessage() );
        }
    }
    /**
     * Create a socket-server thread that creates and maintains a server-side
     * socket, dispatching client actions on individual threads when clients
     * connect.
     *
     * @param port_number The port number to use for the main socket
     * @param expected_connections The number of simultaneous connections
     *          that you expect. More connections are supported, but the
     *          threads that service them may need to be created dynamically
     *          when the client connects.
     * @param action The Class object that represents a
     *          Socket_server.Client_action derivative. Objects are
     *          created dynamically by the system to service clients.
     * @param shutdown Command object that encapsulates the action to take
     *          when the server thread shuts down (either by being passed
     *          a kill message, or by some internal error).
     * @throws Not_created If the object can't be created successfully
     *          for one of various reasons (the socket can't be opened,
     *          client actions can't be manufactured, and so on).
     */
    public Socket_server(int port_number, int expected_connections,
                                            Class action, Death shutdown)
                                            throws  Not_created
    {   try
        {
            main            = new ServerSocket(port_number);
            pool            = new Thread_pool (expected_connections, 0);
            this.action     = action;
            this.shutdown   = shutdown;
            for( int i = expected_connections; --i >= 0 ; )
                connections.addLast( 
                        new Client((Client_action)(action.newInstance())) );
            setDaemon( true );      // Don't keep the process alive
        }
        catch( Exception e ){ throw new Not_created(e); }
    }
    /***********************************************
     * Implements the accept loop: waits for clients to connect and, when
     * a client does connect, executes the action() method of an instance
     * of the Client_action class passed into the constructor on its own
     * thread. If the accept() fails catastrophically, the thread shuts
     * down, and the shutdown object passed to the constructor is notified
     * with the exception that caused the failure.
     */
    public void run()
    {   Exception failure = null;       // Stays null on a normal shutdown
        try
        {   Client client;
            while( !isInterrupted() )
            {   synchronized( connections )
                {   client = (connections.size() > 0)
                                        ? (Client)(connections.removeLast())
                                        : new Client((Client_action)(action.newInstance()))
                                        ;
                }
                client.socket = main.accept();
                if( isInterrupted() )
                    break;
                pool.execute( client );
            }
        }
        catch(Exception e)
        {   // kill() interrupts this thread and then closes the main socket,
            // which makes accept() throw. Report only genuine failures.
            if( !isInterrupted() )
                failure = e;
        }
        finally
        {   pool.close();
            if( shutdown != null )
                shutdown.action( failure );     // Notify exactly once
        }
    }
    /***********************************************
     * Shuts down the Socket_server thread gracefully. All associated
     * threads are terminated (but those that are working on serving a
     * client will remain active until the service is complete). The main
     * socket is closed as well.
     */
    public void kill()
    {   try
        {   pool.close();
            interrupt();
            main.close();
        }
        catch( IOException e ){ /*ignore*/ }
    }
    /***********************************************
     * A small test class. Creates a socket server that implements an
     * echo server, then opens two connections to it and runs a string
     * through each connection.
     */
    static public class Test
    {
        public static void main( String[] args ) throws Exception
        {
            class Action implements Socket_server.Client_action
            {   public void action( Socket socket )
                {   try
                    {
                        BufferedReader in =
                            new BufferedReader(
                                new InputStreamReader(socket.getInputStream()));
                        OutputStreamWriter out =
                                new OutputStreamWriter(socket.getOutputStream());
                        String line;
                        while( (line = in.readLine()) != null )
                        {   out.write( line, 0, line.length() );
                            out.write( "\n", 0, 1             );
                            out.flush();
                        }
                        socket.close();
                    }
                    catch( Exception e )
                    {   System.out.println(e.toString());
                    }
                }
            };
            Action act = new Action();
            Socket_server echo_server = new Socket_server( 9999, 10, Action.class, 
                                        new Socket_server.Death()
                                        {   public void action( Exception e )
                                            { System.out.println("goodbye world (" + e + ")");
                                            }
                                        }
                                    );
            echo_server.start();
            connect("Hello\n");
            connect("World\n");
            echo_server.kill();
        }
        private static void connect( String s ) throws Exception
        {   
            Socket          client  = new Socket( "localhost", 9999 );
            BufferedReader  in      = new BufferedReader(
                                        new InputStreamReader(
                                            client.getInputStream() ));
            OutputStreamWriter out  = new OutputStreamWriter(
                                            client.getOutputStream());
            out.write( s, 0, s.length() );
            out.flush();
            s = in.readLine();
            System.out.println( s );
            client.close();
        }
    }
}

The net result of all this work, then, is an efficient, reusable socket server to which you can assign any operation you like, to be performed when the clients connect. Though the example is complex, it does serve to illustrate why you might want to use thread pools in the real world.
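For comparison, the same accept-and-dispatch pattern can be sketched with the standard java.util.concurrent executors, which postdate this article and take the place of Thread_pool here. This is not the Socket_server implementation, just a minimal sketch under that assumption; ExecutorEchoSketch, echo(), and serve() are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: an ExecutorService stands in for Thread_pool.
class ExecutorEchoSketch
{
    // Echo every line read from 'in' back to 'out' until end of input.
    static void echo( BufferedReader in, Writer out ) throws IOException
    {   String line;
        while( (line = in.readLine()) != null )
        {   out.write( line );
            out.write( "\n" );
            out.flush();
        }
    }

    // Accept 'clients' connections, servicing each one on a pooled thread.
    static void serve( ServerSocket main, ExecutorService pool, int clients )
                                                        throws IOException
    {   for( int i = 0; i < clients; ++i )
        {   final Socket socket = main.accept();
            pool.execute( new Runnable()
            {   public void run()
                {   try
                    {   echo( new BufferedReader(
                                new InputStreamReader( socket.getInputStream() )),
                              new OutputStreamWriter( socket.getOutputStream() ));
                    }
                    catch( IOException e ){ System.out.println( e ); }
                    finally
                    {   try{ socket.close(); }
                        catch( IOException e ){ /*ignore*/ }
                    }
                }
            });
        }
    }

    public static void main( String[] args ) throws Exception
    {   ExecutorService pool = Executors.newFixedThreadPool( 10 );
        ServerSocket    main = new ServerSocket( 0 );   // 0: use any free port

        // Connect first; the connection waits in the backlog until accepted.
        Socket client = new Socket( InetAddress.getLoopbackAddress(),
                                    main.getLocalPort() );
        Writer out = new OutputStreamWriter( client.getOutputStream() );
        out.write( "Hello\n" );
        out.flush();

        serve( main, pool, 1 );

        BufferedReader in = new BufferedReader(
                                new InputStreamReader( client.getInputStream() ));
        System.out.println( in.readLine() );
        client.close();
        pool.shutdown();
        main.close();
    }
}
```

The executor recycles threads but not handler objects, so this version allocates one Runnable per connection, which is exactly the cost that the Client queue in Listing 5 avoids.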

Conclusion

So that's it for another month. This month I've presented a few ways to implement asynchronous methods in Java, using both the one-thread-per-method approach and a thread pool. The socket server presented this month is a good example of just how useful thread pools can be. Next month, we'll see how the Blocking_queue, used to implement the thread pool, is in itself an essential tool for interthread communication. In fact, in many multithreaded systems the Blocking_queue can be the only object that requires synchronization. I'll also look at another object-oriented architecture for implementing asynchronous methods that does just that: it uses the blocking queue as the sole point of synchronization in the system.
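As a rough preview of that idea, the sketch below funnels every request through one queue to a single worker thread, so the object the requests modify needs no locking of its own. It is not next month's code: the standard LinkedBlockingQueue stands in for Blocking_queue, and QueueSketch, STOP, execute(), close(), and demo() are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: LinkedBlockingQueue stands in for Blocking_queue.
class QueueSketch
{
    // A sentinel request that tells the worker thread to stop.
    private static final Runnable STOP = new Runnable(){ public void run(){} };

    // The queue is the only object shared between threads.
    private final BlockingQueue<Runnable> requests =
                                        new LinkedBlockingQueue<Runnable>();

    private final Thread worker = new Thread()
    {   public void run()
        {   try
            {   Runnable request;
                while( (request = requests.take()) != STOP )
                    request.run();
            }
            catch( InterruptedException e ){ /* treat as a request to stop */ }
        }
    };

    void start(){ worker.start(); }

    // The "asynchronous method": queue the request and return at once.
    void execute( Runnable request ){ requests.add( request ); }

    // Let the queued requests finish, then wait for the worker to exit.
    void close()
    {   requests.add( STOP );
        try{ worker.join(); }
        catch( InterruptedException e ){ Thread.currentThread().interrupt(); }
    }

    // Queues three requests and returns what the worker built from them.
    static String demo()
    {   final StringBuilder log = new StringBuilder();  // Appended to only by the worker
        QueueSketch active = new QueueSketch();
        active.start();
        String[] words = { "hello", " ", "world" };
        for( final String word : words )
            active.execute( new Runnable()
            {   public void run(){ log.append( word ); }
            });
        active.close();             // join() makes the worker's writes visible
        return log.toString();
    }

    public static void main( String[] args )
    {   System.out.println( demo() );       // prints "hello world"
    }
}
```

With a single worker, the requests run in the order they were queued, which is why the unsynchronized StringBuilder is safe here.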
