Using threads in Java, Part 2

Create communication channels that operate between threads

In part one of this series of colums, I showed you how two or more threads in the Java runtime can synchronize with the wait() and notify() methods in class Object. In this installment we will write a class that uses these techniques to create a communication channel that operates between threads.

The communication channel performs four tasks. It

  • opens a connection to a named channel;
  • puts data into that channel;
  • pulls data out of the channel;
  • releases a channel when it is no longer needed.

This model is pretty typical of such designs.

The first of these, opening a named channel, requires that the class be able to share information (the channel registry) between threads. In Java, threads typically run in a shared address space. Subsequently, static fields in a Java class are shared across all instances of that class. You will recall that static instance variables (sometimes called class variables) do not require that an object instance be created to access them. Instead, they may be accessed using the Classname.variable_name syntax. For the purposes of our example, however, the more salient property is that they are visible to all threads.

Let the games begin

To create our DataChannel class we start out with this code:

 1    package util.comm;
 3    import java.util.Hashtable;
 4    import java.util.Enumeration;
 6    public class DataChannel {
 8        private Hashtable TIDs;
10        private DataChannel() {
11            TIDs = new Hashtable();
12        }
14        private static Hashtable registry = null;
16    /** getChannel (writer version) */
17        public static synchronized DataChannel getChannel(String name) {
18        if (registry == null)
19        registry = new Hashtable();
20        DataChannel it = (DataChannel) registry.get(name);
21        if (it == null) {
22            it = new DataChannel();
23            it.myName = name;
24            registry.put(name, it);
25        }
26        return it;
27        }

Line 1 puts this code in the util.comm package. There are two advantages to doing this: the classes for the data channels don't clutter up our main classes directory; and we can create a "helper" class named DataItem that is private to this package. Note that the package statement must be the first non-comment line in the file.

Lines 3 through 13 are the constructor for a new DataChannel object. As you can see, the constructor is declared private. This declaration makes it impossible for any object, except a DataChannel object, to construct a new instance of a DataChannel. This is a common technique to restrict the ability to construct new objects to a static method like the one in line 17.

Line 14 declares the registry hash table. Because this declaration is static and private, the hash table is shared across thread address spaces, but accessible only to the methods in DataChannel.

Finally, in lines 17 through 27, comes the channel-writer version of the getChannel() method. As a static method, it is accessed using the syntax:

DataChannel dc = DataChannel.getChannel("name");

Because it is a method in DataChannel, it is allowed to manipulate the private registry hash table and to call the private constructor defined in the class. Static methods such as this one that return an object of their own class type are called factory methods. This term is used because unlike constructors that simply allocate an object from the heap, factory methods manufacture one.

Because getChannel is synchronized, the Java runtime will guarantee that only one thread at a time can be executing it. As you can see, if the named channel doesn't exist, it is created.

Reading is fundamental

The second version of the getChannel method creates a reader DataChannel. Here are the bits:

27    /** getChannel (reader version) */
28        public static DataChannel getChannel(String name, Thread myTID, int qSize) {
29        DataChannel it = getItem(name);
31        if (qSize == 0)
32            return it;
33        it.TIDs.put(myTID, new DataItem(myTID, qSize));
34        return it;
35        }

This version takes the additional parameters of a thread identifier and a queue size. It starts by accessing the globally named DataChannel using the other getChannel method. When that channel is returned, it attaches to it an instance of a DataItem object. This object provides the actual channel for the thread. Note that every data channel has one data item for every thread that is interested in receiving data on the channel.

As you may now see, our data channels can have many readers, and they can also have many writers. Further, there can be several data channels active at one time. To prevent all of them from attempting to synchronize on one object (the data channel), we'll create the DataItem class to provide a distinct object between each reader and writer.

The DataItem class is defined below; we'll describe it in sections. This version of DataItem is easy to understand, but it is not as flexible as it could be.

 1    package util.comm;
 3    class DataItem {
 4        private Thread TID;       // The consumer thread
 5        private Object q[];       // Queued values for thread 
 6        private int nQueue;       // Number of items in the Queue
 7        private int dataStatus;   // Status of the queue
 9        // legal Queue status values
10        final static int NONE = 0;    // No data available
11        final static int NEW = 1; // New data available
12        final static int OVERRUN = 2; // Data overrun (queue overflow)
14        DataItem(Thread t, int Q) {
15        q = new Object[Q];
16        nQueue = 0;
17        TID = t;
18        dataStatus = NONE;
19        }
21        /** shorter version for a queue size of two (2) */
22        DataItem(Thread t) {
23        this(t, 2);
24        }

Again, this class is part of the util.comm package. This is declared in line 1. Remember that the package statement must be the first non-comment line in the file.

Lines 3 through 12 declare the instance variables for this class and some constants used within the methods. These maintain the state in this instance of the data item.

Lines 14 through 24 are the constructors for DataItem. Neither are public since they are used only by DataChannel, which is also defined in the util.comm package. The first constructor allocates a data item for the referenced thread with a queue size as specified in Q. The second simply always allocates a queue size of two, which has been shown to be a useful default.

26    /** insert is the "write" method */ 
27        synchronized void insert(Object x) {
28        if (nQueue == q.length) {
29            dataStatus = OVERRUN;
30            q[nQueue-1] = x;
31        } else {
32            q[nQueue] = x;
33            dataStatus = NEW;
34            nQueue++;
35        }
36        notify();
37        }

The method insert() puts new data into the queue. It is synchronized so the writer gets the object's monitor before entering. This ensures multiple writers won't enter the method and potentially corrupt the state of the queue.

Lines 28 through 30 check for data overruns. An overrun occurs when an attempt to insert an object into a full queue is made. This version of the method implements a policy that the previous last value is discarded and the new value is placed at the end of the queue.

Lines 32 through 34 insert the item into the queue, change dataStatus, and then update the number of items queued. Finally, in line 36, any threads reading this data are notified that there is data available to be consumed.

The data is read by fetch() below. It is only a bit more complicated than insert().

39    /** fetch is the "read" method */
40        synchronized Object fetch() throws
        DataChannelOverrun, DataChannelShutdown, DataChannelTimeout {
41        Object r = null;
43  /* If nothing is waiting, we sleep. */
44  if (dataStatus == NONE) {
45      try {
46          long was = System.currentTimeMillis(); 
47          wait(timeout);
48          if ((System.currentTimeMillis() - was) >= timeout)
49          throw new DataChannelTimeout();
50      } catch (InterruptedException e) { dataStatus = SHUTDOWN; }
51  }
53  /* Data (or status) has arrived so dispatch it. */
54  switch (dataStatus) {
55      case NONE:
56      break;
57      case NEW:
58      r = q[0];
59      System.arraycopy(q, 1, q, 0, q.length - 1);
60      nQueue--;
61      break;
62      case OVERRUN:
63      dataStatus = NEW; // Next call will get the data
64      throw new DataChannelOverrun();
65      case SHUTDOWN:
66      throw new DataChannelShutdown();
67  }
68  if (nQueue == 0)
69      dataStatus = NONE;
70  return r;
71    }

In line 44 the code checks the value of dataStatus and goes to sleep in a wait if there is no data available. This is similar to the code in the earlier PingPong class described in Part 1 of this series ("Synchronizing threads in Java," April 1996 JavaWorld).

When the thread is notified, it awakens and then checks to see how much time has passed in case this was a timeout. If it was a timeout it throws a timeout exception. If the thread awakened because it was sent an InterruptedException, it immediately sets the data channel into "shutdown" mode and throws the DataChannelShutdown exception a bit later.

If the state is NEW, the code returns the next object in the array and then shuffles the array to make room for more data. In this example, we use the static method arraycopy in the System class. Alternatively, we could use a read index and a write index into the queue.

If the state is OVERRUN, the code throws the DataChannelOverrun exception. This exception has to be thrown here, rather than at write time, because Java doesn't currently support posting an asynchronous exception to a thread. Note that the only tricky thing here is that the data status changes from OVERRUN to NEW, so that the next time this function is called, the values in the queue will be read.

All of the exceptions are a subclass of DataChannelException. This is done so that clients can either catch individual exceptions if they are interested in those conditions, or they can simply catch DataChannelException if they wish to catch all possible exceptions thrown.

Finally, there are a few convenience methods that we use to monitor the state of the object. These are pretty self explanatory.

63        /** available items to be read */
64        int queueSize() { return nQueue; }
66        /** data is waiting check */
67        boolean hasData() {   return (nQueue > 0); }
69        /** last value posted (doesn't block) */
70        Object lastValue() { return q[0]; }
72        synchronized void delete() { dataStatus = SHUTDOWN; notify(); }
74    } 

However, the last is used by the releaseChannel method of the DataChannel class. The trick is that the thread reading the channel needs to be notified that the channel is being shut down, but the thread is asleep waiting for data that will never come. So in releaseChannel, the DataItem object associated with the passed thread identifier has its delete() method called. This sets the state to shutdown and wakes up the thread. The next thing the thread will see is a DataChannelShutdown exception that it must be prepared to deal with.

Straight is the gate

Using the DataItem class is reasonably straightforward. In the DataChannel class you will notice there are two methods for writing and reading data. The first is for writing and is shown below.

37        public synchronized void putValue(Object x) {
38            for (Enumeration e = TIDs.elements(); e.hasMoreElements(); ) {
39            ((DataItem)(e.nextElement())).insert(x);
40        }
41        }

This method, putValue(), puts the object reference passed in x into the queues of all the data items associated with this channel. Remember that there is one data-channel object but many data-item objects (one for each thread that registered interest in this channel.) This method uses an Enumeration to enumerate each element in the TIDs hash table. As insert() is called on each data item, the corresponding thread is sent a notify and may immediately begin running. Further, if no threads have yet registered an interest in this data channel, this method simply returns.

Reading values is straightforward as well. That is handled by the getValue() method shown here.

42        public Object getValue() throws DataChannelOverrun {
43        DataItem di = (DataItem) TIDs.get(Thread.currentThread());
44        return ((di != null) ? di.fetch() : null);
45        }

This method gets the data item associated with the current thread and returns it; if the data item has no information in its queue, the thread will block on the call to fetch(). Otherwise, it will return immediately with data from the data item.

Quadrophenia: building a simple application

Let's take a moment to put together a simple application of these classes. The example application is completely contrived, of course, but it does demonstrate how the DataChannel class might be used. We'll implement a parallel quadratic equation computation engine. This engine takes an equation of the form

AX2 + BX + C

and produces outputs for the value of X between 1 and 5.

The helper class in this application is called Computer. This class takes two values coming in on two data channels, applies a mathematical operation to those values, and writes the result out on a third (result) channel. Here it is.

1 2 Page 1
Page 1 of 2