Newsletter sign-up
View all newsletters

Enterprise Java Newsletter
Stay up to date on the latest tutorials and Java community news posted on JavaWorld

Sponsored Links

Optimize with a SATA RAID Storage Solution
Range of capacities as low as $1250 per TB. Ideal if you currently rely on servers/disks/JBODs

Master Merlin's new I/O classes

Squeeze maximum performance out of nonblocking I/O and memory-mapped buffers

  • Print
  • Feedback

Page 7 of 7

 class AcceptThread extends Thread {
  private ServerSocketChannel ssc;
  public AcceptThread(Selector connectSelector, 
                      ConnectionList list, 
                      int port) 
      throws Exception 
  {
    super("Acceptor");
    ...
    ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    InetSocketAddress address = new InetSocketAddress(port);
    ssc.socket().bind(address);
    ...
  }

SocketChannel

java.nio.channels.SocketChannel is the real workhorse in this application. It encapsulates a java.net.Socket and adds a nonblocking mode and a state machine.

SocketChannels can be created one of two ways. First, SocketChannel.open() creates a new, unconnected SocketChannel. Second, the Socket returned by ServerSocketChannel.accept() actually has an open and connected SocketChannel attached to it. This code fragment, from AcceptThread, illustrates the second approach to acquiring a SocketChannel:

class AcceptThread extends Thread {
  private ConnectionList acceptedConnections;
  ...
  protected void acceptPendingConnections() throws Exception {
    ...
    for(Iterator i = readyKeys.iterator(); i.hasNext(); ) {
      ...
      ServerSocketChannel readyChannel = (ServerSocketChannel)key.channel();
      SocketChannel incomingChannel = readyChannel.accept();
      acceptedConnections.push(incomingChannel);
    }
  }
}
Like SelectableChannel's other subclasses, SocketChannel can be blocking or nonblocking. If it is blocking, then read and write operations on the SocketChannel behave exactly like blocking reads and writes on a Socket, with one vital exception: these blocking reads and writes can be interrupted if another thread closes the channel.

FileChannel

Unlike SocketChannel and ServerSocketChannel, java.nio.channels.FileChannel does not derive from SelectableChannel. As you will see in the next section, that means that FileChannels cannot be used for nonblocking I/O. Nevertheless, FileChannel has a slew of sophisticated features that were previously reserved for C programmers. FileChannels allow locking of file portions and direct file-to-file transfers that use the operating system's file cache. FileChannel can also map file regions into memory. Memory mapping a file uses the native operating system's memory manager to make a file's contents look like memory locations. For more efficient mapping, the operating system uses its disk paging system. From the application's perspective, the file contents just exist in memory at some range of addresses. When it maps a file region into memory, FileChannel creates a MappedByteBuffer to represent that memory region. MappedByteBuffer is a type of direct byte buffer. A MappedByteBuffer offers two big advantages. First, reading memory-mapped files is fast. The biggest gains go to sequential access, but random access also speeds up. The operating system can page the file into memory far better than java.io.BufferedInputStream can do its block reads. The second advantage is that using MappedByteBuffers to send files is simple, as shown in the next code fragment, also from ReadWriteThread:
  protected void sendFile(String uri, SocketChannel channel) throws 
RequestException, IOException {
    if(Server.verbose) 
      System.out.println("ReadWriteThread: Sending " + uri);
    Object obj = fileCache.get(uri);
    
    if(obj == null) {
      Server.statistics.fileMiss();
      try {
            File f = new File(baseDirectory, uri);
            FileInputStream fis = new FileInputStream(f);
            FileChannel fc = fis.getChannel();
            
            int fileSize = (int)fc.size();
            responseBuffers[1] = fc.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
            fileCache.put(uri, responseBuffers[1]);
      } catch(FileNotFoundException fnfe) {
            throw RequestException.PAGE_NOT_FOUND;
      }
    } else {
      Server.statistics.fileHit();
      responseBuffers[1] = (MappedByteBuffer)obj;
      responseBuffers[1].rewind();
    }
    responseBuffers[0].rewind();
    channel.write(responseBuffers);
  }
The sendFile() method sends a file as an HTTP response. The lines inside the try block create the MappedByteBuffer. The rest of the method caches the memory-mapped file buffers in a WeakHashMap. That way, repeated requests for the same file are blindingly fast, yet when memory tightens, the garbage collector eliminates the cached files. You could keep the buffers in a normal HashMap, but only if you know that the file number is small (and fixed). Notice that the call to channel.write() actually passes an array of two ByteBuffers (one direct, one mapped). Passing two buffers makes the call a gathering write operation. The first buffer is fixed to contain the HTTP response code, headers, and body separator. The second buffer is the memory-mapped file. The channel sends the entire contents of the first buffer (the response header) followed by the entire contents of the second buffer (the file data).

The bridge to the old world



Before moving on to nonblocking operations, you should investigate the class java.nio.channels.Channels. Channels allows new I/O channels to interoperate with old I/O streams and readers. Channels has static methods that can create a channel from a stream or reader or vice versa. It proves most useful when you deal with third-party packages that expect streams, such as XML parsers.

Selectors

In the old days of blocking I/O, you always knew when you could read or write to a stream, because your call would not return until the stream was ready. Now, with nonblocking channels, you need some other way to tell when a channel is ready. In the new I/O packages, Selectors serve that purpose.

In Pattern Oriented Software Architecture, Volume 2, by Douglas Schmidt, Michael Stal, Hans Rohnert, and Frank Buschmann (John Wiley & Son Ltd, 1996), the authors present a pattern called Reactor. Reactor allows applications to decouple event arrival from event handling. Events arrive at arbitrary times but are not immediately dispatched. Instead, a Reactor keeps track of the events until the handlers ask for them.

A java.nio.channels.Selector plays the role of a Reactor. A Selector multiplexes events on SelectableChannels. In other words, a Selector provides a rendezvous point between I/O events on channels and client applications. Each SelectableChannel can register interest in certain events. Instead of notifying the application when the events happen, the channels track the events. Later, when the application calls one of the selection methods on the Selector, it polls the registered channels to see if any interesting events have occurred. Figure 10 depicts an example of a selector with two registered channels.

Figure 10. A selector polling its channels

Channels only register for operations they have interest in. Not every channel supports every operation. SelectionKey defines all possible operation bits, which are used twice. First, when the application registers the channel by calling SelectableChannel.register(Selector sel, int operations), it passes the sum of the desired operations as the second argument. Then, once a SelectionKey has been selected, the SelectionKey's readyOps() method returns the sum of all the operation bits that its channel is ready to perform. SelectableChannel.validOps() returns the allowed operations for each channel. Attempting to register a channel for operations it doesn't support results in an IllegalArgumentException. The following table lists the valid operations for each concrete subclass of SelectableChannel:

ServerSocketChannel OP_ACCEPT
SocketChannel OP_CONNECT, OP_READ, OP_WRITE
DatagramChannel OP_READ, OP_WRITE
Pipe.SourceChannel OP_READ
Pipe.SinkChannel OP_WRITE


Table 2. SelectableChannels and their valid operations


A channel can register for different operation sets on different selectors. When the operating system indicates that a channel can perform one of the valid operations that it registered for, the channel is ready. On each selection call, a selector undergoes a series of actions. First, every key cancelled since the last selection drops from the selector's key set. A key can be cancelled by explicitly calling SelectionKey.cancel(), by closing the key's channel, or by closing the key's selector. Keys can be cancelled asynchronously -- even while the selector is blocking. Second, the selector checks each channel to see if it's ready. If it is, then the selector adds the channel's key to the ready set. When a key is in the ready set, the key's readyOps() method always returns a set of operations that the key's channel can perform. If the key was already in the ready set before this call to select(), then the new operations are added to the key's readyOps(), so that the key reflects all the available operations.

Next, if any keys have cancelled while the operating system checks are underway, they drop from the ready set and the registered key set.

Finally, the selector returns the number of keys in its ready set. The set itself can be obtained with the selectedKeys() method. If you call Selector.selectNow() and no channels are ready, then selectNow() just returns zero. On the other hand, if you call Selector.select() or Selector.select(int timeout), then the selector blocks until at least one channel is ready or the timeout is reached. Selectors should be familiar to Unix or Win32 system programmers, who will recognize them as object-oriented versions of select() or WaitForSingleEvent(). Before Merlin, asynchronous I/O was the domain of C or C++ programmers; now it is available to Java programmers too. See the sidebar, "Is the New I/O Too Platform-Specific?", for a discussion of why Java is just now acquiring asynchronous I/O.

Applying selectors



The sample application uses two selectors. In AcceptThread, the first selector just handles the ServerSocketChannel:

class AcceptThread extends Thread {
  private ServerSocketChannel ssc;
  private Selector connectSelector;
  private ConnectionList acceptedConnections;
  public AcceptThread(Selector connectSelector, 
                      ConnectionList list, 
                      int port) 
      throws Exception 
  {
    super("Acceptor");
    this.connectSelector = connectSelector;
    this.acceptedConnections = list;
    ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    InetSocketAddress address = new InetSocketAddress(port);
    ssc.socket().bind(address);
    ssc.register(this.connectSelector, SelectionKey.OP_ACCEPT);
  }
   
  public void run() {
    while(true) {
      try {
        connectSelector.select();
        acceptPendingConnections();
      } catch(Exception ex) {
        ex.printStackTrace();
      }
    }
  }
  protected void acceptPendingConnections() throws Exception {
    Set readyKeys = connectSelector.selectedKeys();
    for(Iterator i = readyKeys.iterator(); i.hasNext(); ) {
      SelectionKey key = (SelectionKey)i.next();
      i.remove();
      ServerSocketChannel readyChannel = (ServerSocketChannel)key.channel();
      SocketChannel incomingChannel = readyChannel.accept();
      acceptedConnections.push(incomingChannel);
    }
  }
}
AcceptThread uses connectSelector to detect incoming connection attempts. Whenever the selector indicates that the ServerSocketChannel is ready, there must be a connection attempt. AcceptThread.acceptPendingConnections() iterates through the selected keys (there can be only one) and removes it from the set. Thanks to the selector, we know that the call to ServerSocketChannel.accept() returns immediately. We can get a SocketChannel -- representing a client connection -- from the new Socket. That new channel passes to ReadWriteThread, by way of a FIFO (first in, first out) queue.

ReadWriteThread uses readSelector to find out when a request has been received. Because it is only a sample, our server application assumes that all requests arrive in a single TCP packet. That is not a good assumption for a real Web server. Other code samples have already shown ReadWriteThread's buffer management, file mapping, and response sending, so this listing contains only the selection code:

class ReadWriteThread extends Thread {
  private Selector readSelector;
  private ConnectionList acceptedConnections;
  ...
  public void run() {
    while(true) {
      try {
        registerNewChannels();
        int keysReady = readSelector.select();
        if(keysReady > 0) {
          acceptPendingRequests();
        }
      } catch(Exception ex) {
        ex.printStackTrace();
      }
    }
  }
  protected void registerNewChannels() throws Exception {
    SocketChannel channel;
    while(null != (channel = acceptedConnections.removeFirst())) {
      channel.configureBlocking(false);
      channel.register(readSelector, SelectionKey.OP_READ, new StringBuffer());
    }  
  }
  protected void acceptPendingRequests() throws Exception {
    Set readyKeys = readSelector.selectedKeys();
    for(Iterator i = readyKeys.iterator(); i.hasNext(); ) {
      SelectionKey key = (SelectionKey)i.next();
      i.remove();
      SocketChannel incomingChannel = (SocketChannel)key.channel();
      Socket incomingSocket = incomingChannel.socket();
      ...
            String path = readRequest(incomingSocket);
            sendFile(path, incomingChannel);
      ...
    }
  }


The main loops of each thread resemble each other, with the main difference being that ReadWriteThread registers for OP_READ, while AcceptThread registers for OP_ACCEPT. Naturally, the ways in which the respective events are handled differ; overall, however, both threads are instances of the Reactor pattern.

The third argument of register() is an attachment to the SelectionKey, which can be any object. The key holds on to the attachment for later use. Here, the attachment is a StringBuffer that readRequest uses to receive the incoming HTTP request. Each time readRequest reads a Buffer from the socket, it decodes the buffer and appends it to the request string. Once the request string is finished, readRequest calls handleCompleteRequest to parse the request and send the response.

A few gotchas

You might encounter a few tricky points when using selectors. First, although selectors are thread safe, their key sets are not. When Selector.selectedKeys() is called, it actually returns the Set that the selector uses internally. That means that any other selection operations (perhaps called by other threads) can change the Set. Second, once the selector puts a key in the selected set, it stays there. The only time a selector removes keys from the selected set is when the key's channel closes. Otherwise, the key stays in the selected set until it is explicitly removed. Third, registering a new channel with a selector that is already blocking does not wake up the selector. Although the selector appears as if it misses events, it will detect the events with the next call to select(). Fourth, a selector can have only 63 channels registered, which is probably not a big deal.

Create high-performance servers



This article has introduced the JDK's new capabilities, including buffer management, byte ordering, character sets, channels, selectors, and nonblocking I/O. Along the way, a functional Web server illustrated the concepts. For a better idea of how to apply these features in real-world applications, check out Pattern-Oriented Software Architecture, Volume 2. It features a wealth of patterns aimed at networked applications and objects, all of which can be enabled with Merlin's new I/O classes. The new I/O packages in Merlin cover a broad scope. Amazingly, the Java Specification Request actually specifies even more features that did not make it into Merlin (see the sidebar, "Where Did the New I/O Come From?" below). These long overdue capabilities correct a serious deficiency in the Java platform. With the nonblocking I/O, you can now create even higher-performance Java servers.

About the author

Michael Nygard has been developing large-scale applications in Java since 1996. His experience runs the gamut, covering scientific, military, financial, educational, banking, retail, and manufacturing applications. He still remembers the API upheaval from Java 0.9 alpha to beta. Michael recently joined in founding Halley's Fifth to promote unit testing and agile methods.
  • Print
  • Feedback

Resources