Multicast the chatwaves

Use IP multicast along with custom stream classes to implement a cool, yet simple, peer-to-peer networked chat system

1 2 3 Page 2
Page 2 of 3
import java.io.*;
import java.net.*;
public class DatagramOutputStream extends ByteArrayOutputStream {
  DatagramSocket socket;
  DatagramPacket packet;
  
  public DatagramOutputStream
  (DatagramSocket socket, InetAddress address, int port) {
    this (socket, address, port, 512);
  }
  public DatagramOutputStream
  (DatagramSocket socket, InetAddress address, int port, int initialSize) {
    super (initialSize);
    this.socket = socket;
    packet = new DatagramPacket (buf, 0, address, port);
  }
  public synchronized void flush () throws IOException {
    if (count >= 65508)
      throw new IOException ("Packet overflow (" + count + ") bytes");
    packet.setData (buf, 0, count);
    socket.send (packet);
    reset ();
  }
}

In the code above, we extend the ByteArrayOutputStream class because it provides a convenient foundation on which to build; it buffers all data written to it into an expanding byte array.

In our constructor, we accept the DatagramSocket through which messages should be sent, the address and port to which messages should be addressed, and, optionally, an initial size for the internal buffer. We then create a DatagramPacket with the specified destination, and an empty payload. For efficiency and convenience, we will use this single packet for all messages that we transmit.

We override the flush() method to actually package the buffered data into a datagram and deliver this to the network. If too much data has been written to this stream before flushing (i.e., more than can fit in a datagram packet), we throw an exception; we cannot guarantee that it would be acceptable to split the data into multiple datagrams. Otherwise, we modify the DatagramPacket payload to be our internal buffer (buf and count are inherited from the superclass), and we deliver this to the network before calling reset() to reset the internal buffer.

We have chosen the standard flush() method to be the mechanism by which messages are delivered. We could have chosen instead to add a new method called send(); however, overriding flush() makes the operation of this stream just more transparent.

Note that we don't override the close() method, so closing this stream will have no effect on the associated socket.

Class DatagramInputStream

Now, let's look at our DatagramInputStream:

import java.io.*;
import java.net.*;
public class DatagramInputStream extends InputStream {
  byte[] buffer;
  int index, count;
  DatagramSocket socket;
  DatagramPacket packet;
  
  public DatagramInputStream (DatagramSocket socket) {
    this.socket = socket;
    buffer = new byte[65508];
    packet = new DatagramPacket (buffer, 0);
  }
  public synchronized int read () throws IOException {
    while (index >= count)
      receive ();
    return (int) buffer[index ++];
  }
  public synchronized int read (byte[] data, int offset, int length)
  throws IOException {
    if (length <= 0)
      return 0;
    while (index >= count)
      receive ();
    if (count - index < length)
      length = count - index;
    System.arraycopy (buffer, index, data, offset, length);
    index += length;
    return length;
  }
  public synchronized long skip (long amount) throws IOException {
    if (amount <= 0)
      return 0;
    while (index >= count)
      receive ();
    if (count - index < amount)
      amount = count - index;
    index += amount;
    return amount;
  }
  public synchronized int available () throws IOException {
    return count - index;
  }
  void receive () throws IOException {
    packet.setLength (buffer.length);
    socket.receive (packet);
    index = 0;
    count = packet.getLength ();
  }
}

DatagramInputStream is a bit more complex because there is no convenient existing stream that we can override. Instead, we must effectively reimplement the ByteArrayInputStream class.

In the constructor, we accept the DatagramSocket through which we should receive packets and create a DatagramPacket to hold these packets. We create a 65,508 byte buffer into which messages will be written and from which the resulting data will be read out.

All of our I/O methods (read(), skip(), and available()) follow a standard pattern. Each one checks to see if the internal buffer is empty. If it is, it calls receive() to receive a new packet. Otherwise, it returns some data from our internal buffer. As a result, the internal receive() method is the only interesting method.

We automatically call the receive() method whenever this stream needs to obtain more data to return to the user. We expand the DatagramPacket so that it can receive a maximum-sized packet from the network. Then we wait to receive a packet from the network. After this, we update the value of our count variable (the amount of valid data in our buffer) to be equal to the length of the packet just received, and we reset index.

A word of warning

After seeing these classes, you may think that they are the bee's knees, providing a clean, stream-based interface to datagram connections. However, remember that datagrams are not reliable. For stateless streams of data (for example, a stream of uncompressed audio), loss or duplication of datagrams is relatively inconsequential. However, in the case of a stateful stream, a single error can be catastrophic. Consider, for example, a stream encrypted with a stream or chaining block-cipher. If, in such an environment, a single packet is lost or duplicated, it will render all future communications wholly unintelligible.

Closer to home, the object streams are another example of stateful streams. The ObjectOutputStream class maintains a record of every object that you transmit through it, so that, if you write the same object again, only a back reference to the original is transmitted. This is necessary in order to preserve the referential integrity of interconnected object structures.

If you employ the object streams across an unreliable connection, it is possible that the packet containing the actual object data will be lost, and a later packet containing a back reference will be meaningless to the recipient. Similarly, duplication of packets may cause the recipient great distress. The problem of loss can be overcome by calling reset() on the object stream each time you flush it. This will clear it of all back reference information. However, if you actually require long-term referential equality of transmitted objects, this technique may introduce semantic errors to the data that you transmit. The problem of duplication, on the other hand, is only resolvable by adding some intelligence to these streams, so that duplicate packets can be discarded.

Class MulticastChat

This final class is the chat client itself:

import java.io.*;
import java.net.*;
import java.awt.*;
import java.awt.event.*;
public class MulticastChat {
  static final String DEFAULT_GROUP = "239.1.2.3";
  static final int DEFAULT_PORT = 1234;
  static final int DEFAULT_TTL = 1;
  
  InetAddress group;
  int port;
  
  public MulticastChat (InetAddress group, int port, int ttl) {
    this.group = group;
    this.port = port;
    this.ttl = ttl;
    initAWT ();
  }
  Frame frame;
  TextArea area;
  TextField field;
  public void initAWT () {
    frame = new Frame
      ("MulticastChat [" + group.getHostAddress () + ":" + port + "]");
    frame.addWindowListener (new WindowAdapter () {
      public void windowOpened (WindowEvent event) {
        field.requestFocus ();
      }
      public void windowClosing (WindowEvent event) {
        try {
          stop ();
        } catch (IOException ignored) {
        }
      }
    });
    area = new TextArea ("", 12, 24, TextArea.SCROLLBARS_VERTICAL_ONLY);
    area.setEditable (false);
    frame.add (area, "Center");
    field = new TextField ("");
    field.addActionListener (new ActionListener () {
      public void actionPerformed (ActionEvent event) {
        netSend (event.getActionCommand ());
        field.selectAll ();
      }
    });
    frame.add (field, "South");
    frame.pack ();
  }
      
  public void start () throws IOException {
    netStart ();
    frame.setVisible (true);
  }
  public void stop () throws IOException {
    netStop ();
    frame.setVisible (false);
  }
  MulticastSocket socket;
  BufferedReader in;
  OutputStreamWriter out;
  Thread listener;
  void netStart () throws IOException {
    socket = new MulticastSocket (port);
    socket.setTimeToLive (ttl);
    socket.joinGroup (group);
    in = new BufferedReader
      (new InputStreamReader (new DatagramInputStream (socket), "UTF8"));
    out =
      new OutputStreamWriter (new DatagramOutputStream (socket, group, port), "UTF8");
    listener = new Thread () {
      public void run () {
        netReceive ();
      }
    };
    listener.start ();
  }
  void netStop () throws IOException {
    listener.interrupt ();
    listener = null;
    socket.leaveGroup (group);
    socket.close ();
  }
  void netSend (String message) {
    try {
      out.write (message + "\n");
      out.flush ();
    } catch (IOException ex) {
      ex.printStackTrace ();
    }
  }
  void netReceive () {
    try {
      Thread myself = Thread.currentThread ();
      while (listener == myself) {
        String message = in.readLine ();
        area.append (message + "\n");
      }
    } catch (IOException ex) {
      area.append ("- listener stopped");
      ex.printStackTrace ();
    }
  }
  public static void main (String[] args) throws IOException {
    if ((args.length > 3) || ((args.length > 0) && args[1].endsWith ("help"))) {
      System.out.println
        ("Syntax: MulticastChat [<group:" + DEFAULT_GROUP +
         "> [<port:" + DEFAULT_PORT + ">] [<ttl:" + DEFAULT_TT + ">]]");
      System.exit (0);
    }
    String groupStr = (args.length > 0) ? args[0] : DEFAULT_GROUP;
    InetAddress group = InetAddress.getByName (groupStr);
    int port = (args.length > 1) ? Integer.parseInt (args[1]) : DEFAULT_PORT;
    int ttl = (args.length > 2) ? Integer.parseInt (args[2]) : DEFAULT_TTL;
    MulticastChat chat = new MulticastChat (group, port, ttl);
    chat.start ();
  }
}

In the MulticastChat constructor above, we set up the user interface, which consists of a Frame, a TextArea, and a TextField. We use an anonymous class to handle the basic window events appropriately, and another anonymous class to translate the user-pressed Return keystroke in the TextField into an invocation of the netSend() method.

The start() method invokes the netStart() method to initialize the network, then shows the Frame. To initialize the network, we create a MulticastSocket listening on the chosen port, set the time-to-live of packets that it sends to the user-specified value (the default is 1, remember), and then create our datagram streams. Note that MulticastSocket is a subclass of DatagramSocket, so it can be used by our datagram streams. To make textual communications easier, we attach an InputStreamReader and an OutputStreamWriter using UTF 8 encoding on top of these byte streams. Then we create an anonymous thread that invokes our netReceive() method to receive messages from the network.

The stop() method invokes the netStop() method to close down the network, then hides the Frame. To close down the network, we interrupt our listener thread, then close the MulticastSocket. You may find that, in practice, this results in an IOException being thrown whenever the chat system is shut down. A little bit of glue can catch these exceptions and ignore them as appropriate.

The netSend() and netReceive() methods behave as expected, sending and receiving messages using our datagram streams. To send a message, we simply write it out and flush the stream; to receive a message, we simply read from the stream. Note that, if you multicast a message to a group of which you are a member, you will automatically receive a copy of your own message. As a result, we don't need to explicitly display messages entered by the user; that will be handled naturally by the message-receiving thread. Of course, error handling here is very crude. You might wish to handle exceptions more gracefully in a real application.

Finally, we provide a main() method that starts up the chat client. If you specify -help as an argument, a brief usage message is displayed. Otherwise, we parse the command-line arguments to determine the host, port, and time-to-live to use, with defaults if an argument is omitted, and then start up an instance of the chat client.

1 2 3 Page 2
Page 2 of 3