Reading textual data: Fun with streams

Find out how to extend and customize the character-stream classes to easily read textual data

Never let it be said that I'm happy to rush out a simple article (or book) just to meet a publisher's deadline. What started out as a basic, inefficient character-stream filter, designed to read a stream of digits and parse it into a number, has gradually ballooned into a small stream library sporting unnecessary features and go-faster stripes. Not only has the evolving character-stream library discarded many genetically-inferior siblings along the way, it's behind schedule! (This brings back memories of that 300-page manuscript I discarded in favor of starting anew. I think I'm beginning to see a destructive pattern emerging...)

I initially set out to design a data-reading character-stream filter class. Analogous to the byte-stream filter DataInputStream, this character-stream filter was intended to provide the capability to read textual data from a character stream (namely the output of a human or the PrintWriter println() method).

Let me now, in retrospect, describe what I actually implemented.

First, I created an UndoReader class. This character-stream filter supports three special methods:

  • checkpoint()
  • commit()
  • rollback()

As you read characters through the stream, you have the option to checkpoint the stream -- that is, save the stream's current state and put it into a mode such that it stores all data subsequently read through it. From that point on, the UndoReader stores all the data you read. After any amount of reading, choosing to commit the stream will cause the stored data to be discarded, after which reading proceeds undisturbed. Alternatively, choosing to rollback the stream will cause it to rewind and revert to reading from the position at which you asserted the checkpoint -- just as if you hadn't yet read anything. This stream also supports a couple of related methods.
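To make this concrete, here is a minimal usage sketch; it assumes the UndoReader class developed later in this article, along with the usual java.io imports:

  Reader source = new StringReader ("some textual data");
  UndoReader in = new UndoReader (source);

  in.checkpoint ();                 // start recording everything read from here on
  in.read ();                       // reads 's'
  in.read ();                       // reads 'o'
  in.rollback ();                   // rewind: it is as if we had read nothing
  in.read ();                       // reads 's' again

  in.checkpoint ();
  in.read ();                       // reads 'o' again
  in.commit ();                     // keep our position; discard the stored data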

Next, I implemented the DataReader class, the character-stream filter. This class makes use of the UndoReader class and provides methods to read all the primitive Java types (readInt(), readFloat(), readBoolean(), and so on). What is special about this class is that if you attempt to read a primitive from the stream and it turns out the stream data is incorrect -- if, for example, you attempt to read a Boolean and the next token in the stream is truthfulness -- it rolls back, un-reading any characters read during the erroneous operation, and throws an exception. The stream also supports a feature whereby it can read data one line at a time, signaling each time the end of a line is reached (among other wonders).
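Again, a brief sketch of the intended usage; the constructor, the whitespace-separated textual format, and the exact exception thrown here are my assumptions about the class as just described, not its final API:

  DataReader data = new DataReader (new StringReader ("42 3.5 maybe"));
  int i = data.readInt ();          // 42
  float f = data.readFloat ();      // 3.5
  try {
    boolean b = data.readBoolean ();
  } catch (IOException ex) {
    // "maybe" is not a boolean: the characters examined are un-read, so the
    // stream is still positioned at the start of "maybe"
  }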

The classes I developed will only work in JDK 1.1-plus. Adapting them to work as InputStreams, usable under JDK 1.0.2, should be quite easy, however.

Justification for UndoReader

In the interest of brevity, I'll spare you an introduction to character streams. Todd Sundsted's November 1997 How-To Java column should serve that purpose quite adequately; if you want an introduction to byte streams, check out Todd's October 1997 column. For further details of the Java stream classes, I refer you to Java Network Programming, Second Edition, which I coauthored with Michael Shoffner and Derek Hamner, and which is due out any day now. (See Resources.)

What I should perhaps explain is my justification for the UndoReader class. At the most abstract level, I want the ability to undo a series of read operations, because otherwise my DataReader class will violate the basic law of propriety: It would be improper for an erroneous attempt at reading an int to end up consuming a Boolean. Furthermore, the behavior of my class wouldn't necessarily be clearly defined in the presence of such an error: the amount of erroneous input consumed would be implementation-dependent, and exposing implementation-dependent details of this nature simply invites abuse.

Those readers familiar with the stream classes may then ask about the mark() and reset() methods, or indeed the PushbackReader class: Do these basic features of the stream API not already address my needs? Indeed, the mark() and reset() methods do allow a sequence of read operations to be undone, and the PushbackReader unread() methods can be used to the same effect. However, both of these options are bounded: in each case, you must declare ahead of time the maximum volume of data you will un-read. In our situation, no such limit exists for textual data: "00...01" is a valid integer no matter how many leading zeros it carries, whereas "00...0z" is not, and in either case the entire run of characters may need to be un-read. I cannot, simply to avoid writing an extra class, presume to impose arbitrary limitations on the data I will process.
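To illustrate the bounds in question, here's a quick sketch of both standard mechanisms (the limit of 8 characters is arbitrary):

  Reader in = new BufferedReader (new StringReader ("000000001"));
  in.mark (8);                      // at most 8 characters may be read if reset()
                                    // is to be guaranteed to succeed
  // ... read ...
  in.reset ();                      // rewinds to the mark, within that limit

  PushbackReader pb = new PushbackReader (in, 8);
  pb.unread ('0');                  // at most 8 characters can be pushed back;
                                    // a ninth unread() throws an IOException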

Thus rationalized, we can now proceed with the code.

Class UndoReader

The UndoReader class is a character-stream filter that provides unbounded checkpoint, commit, and rollback operations and an additional undo facility.

Figure 1. Using methods checkpoint(), commit(), rollback(), and undo()
  • When checkpoint() is called, the stream proceeds to store all data subsequently read through it in an internal, expanding buffer

  • When commit() is used, the stored contents of the checkpoint buffer are discarded and further reads are no longer stored

  • When rollback() is used, reading reverts to data stored in the internal buffer; when this is used up, reading proceeds as normal

  • Any number of reads performed since a checkpoint can be undone without a full rollback by partially reverting in the internal buffer

  • It is an error to checkpoint a stream that has already been checkpointed or to commit, rollback, or undo a noncheckpointed stream

  • We must support the case where a checkpoint is placed while we're still reading out of the internal buffer

The class definition

We'll start by looking at our class definition:

package org.merlin.io;
import java.io.*;
public class UndoReader extends Reader {
  protected static final int INITIAL_UNDO_SIZE = 64;
  
  protected Reader reader;
  protected int[] buffer;
  protected int index, stored, capacity;
  protected boolean storing, restoring, closed;
  
  public UndoReader (Reader reader) {
    this (reader, INITIAL_UNDO_SIZE);
  }
  public UndoReader (Reader reader, int capacity) {
    super (reader);
    this.reader = reader;
    this.capacity = capacity;
    buffer = new int[capacity];
  }
  ...
}

Ordinarily, a character-stream filter would inherit from FilterReader; in this case, however, we must override the entire function of the Reader superclass, so FilterReader would be of no benefit.

The constructors we provide allow the attached stream, and optionally the initial buffer capacity, to be specified. As the internal buffer can grow as necessary, this will only affect efficiency. In the constructor, we initialize the buffer variable, in which checkpoint data are stored, and the capacity variable, which defines the current size of our buffer.

You may notice that the checkpoint buffer is, in fact, an int array and not a char array. The reason: I wish to be able to store not only normal character data, but also the end-of-file (EOF) value, -1, in this buffer. Although there may be other ways to implement this, this solution serves my purpose.
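A quick illustration of why a plain char cannot hold the EOF value:

  int eof = -1;                     // the value returned by read() at end of file
  char c = (char) eof;              // c is now '\uffff', a perfectly legal character
  int back = c;                     // back is 65535, not -1: the EOF is lost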

Other variables related to this buffer are stored, for the amount of checkpoint data that have been stored; and index, for the current read index when data are read from the buffer. The storing flag indicates that a stream has been checkpointed, so data should be stored in the checkpoint buffer; restoring indicates a rollback, so data should be read from the checkpoint buffer; and closed indicates the stream has been closed.

Figure 2. Internals of the UndoReader class

The read() methods

We now look at the operation of reading a single character:

  public int read () throws IOException {
    synchronized (lock) {
      int result;
      if (!storing && !restoring) {
        result = reader.read ();
      } else if (restoring) {
        result = buffer[index ++];
        if (index >= stored)
          restoring = false;
      } else {
        result = reader.read ();
        if (stored + 1 > capacity)
          ensureCapacity (1);
        buffer[stored ++] = result;
      }
      return result;
    }
  }
  protected void ensureCapacity (int amount) {
    capacity = capacity * 2 + amount;
    int[] copy = new int[capacity];
    System.arraycopy (buffer, 0, copy, 0, stored); // copy all stored checkpoint data
    buffer = copy;
  }

We start by synchronizing on lock. This variable, inherited from the superclass, is the attached stream (reader) we passed to the superconstructor. It is more efficient for us to synchronize on reader than on this, because if we subsequently call synchronized methods on the attached stream, those synchronization calls will not incur any significant costs.

Figure 3. Synchronization efficiency
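As a sketch of why the nested acquisition is cheap, assume the attached stream uses itself as its synchronization lock (as StringReader does); its synchronized methods then simply reacquire a monitor that the calling thread already holds:

  StringReader attached = new StringReader ("example");   // uses itself as its lock
  UndoReader undo = new UndoReader (attached);
  int c = undo.read ();   // undo.read() holds lock (== attached) when it calls
                          // attached.read(), which synchronizes on the same
                          // monitor: a reentrant, essentially free, acquisition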

If we have neither checkpointed the stream nor rolled it back, we can simply read a character from the attached stream. Otherwise, if we have rolled it back, we must return a character from the internal checkpoint buffer; if we then reach the end of the internal buffer, we clear the restoring flag. Otherwise, the stream has been checkpointed, so we must read a character from the attached stream and store it in the checkpoint buffer.

Before inserting data into the checkpoint buffer, we use the ensureCapacity() method to enlarge the array to hold the extra datum. Whenever we perform this enlargement, we more than double the buffer's size. This means it will grow very rapidly to accommodate our needs. (It could, for instance, grow from 1 byte to 1 megabyte in just 20 expansions.)
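For a quick sketch of that growth rate (starting from a capacity of 1 purely for illustration):

  int capacity = 1;
  for (int i = 0; i < 20; ++ i)
    capacity = capacity * 2 + 1;    // ensureCapacity (1), using the formula above
  // capacity is now 2,097,151: roughly two million entries after 20 expansions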

Next, we look at reading multiple characters in one call:

  public int read (char[] dst, int offset, int length) throws IOException {
    synchronized (lock) {
      int result;
      if (!storing && !restoring) {
        result = reader.read (dst, offset, length);
      } else if (restoring) {
        if (buffer[index] < 0) {
          result = buffer[index ++];
        } else {
          result = (length < stored - index) ? length : stored - index;
          for (int i = 0; i < result; ++ i) {
            if (buffer[index] < 0) { // stop short of a stored EOF marker
              result = i;
              break;
            }
            dst[offset + i] = (char) buffer[index ++];
          }
        }
        if (index >= stored)
          restoring = false;
      } else {
        result = reader.read (dst, offset, length);
        if (result < 0) {
          if (stored + 1 > capacity)
            ensureCapacity (1);
          buffer[stored ++] = result;
        } else {
          if (stored + result > capacity)
            ensureCapacity (result);
          for (int i = 0; i < result; ++ i)
            buffer[stored ++] = dst[offset + i];
        }
      }
      return result;
    }
  }

This method essentially follows the same logic as before. Note, however, the extra code to handle the end of file.

Finally, we test to see if the stream is ready:

  public boolean ready () throws IOException {
    synchronized (lock) {
      return restoring || reader.ready ();
    }
  }

This method returns true if an attempt to read data from this stream will return data immediately and without blocking (that is to say, if we are currently reading from the checkpoint buffer, or if the attached stream is ready).

The close() method

Here, we look at closing the stream:

  public void close () throws IOException {
    try {
      reader.close ();
    } finally {
      synchronized (lock) {
        storing = restoring = false;
        closed = true;
      }
    }
  }

To close our stream, we first close the attached stream, then synchronize on lock, and finally set all flags appropriately. Consider what might happen if we instead synchronized on the stream before closing it. If one thread calls read() and there are no data available, it will block while holding the synchronization lock. We cannot then close the stream from another thread to wake up the blocked reader, as the close() method would first need to obtain the synchronization lock itself. Synchronizing first, in this manner, would be improper.
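Here's a sketch of the scenario; blockingReader stands in for any attached Reader whose read() can block awaiting data, such as a pipe or network stream:

  // blockingReader: a hypothetical Reader whose read() blocks awaiting data
  final UndoReader undo = new UndoReader (blockingReader);
  new Thread () {
    public void run () {
      try {
        undo.read ();               // blocks awaiting data, holding the lock
      } catch (IOException ex) {
        // woken when the stream is closed below
      }
    }
  }.start ();
  // ... later, from another thread:
  undo.close ();                    // succeeds: it closes the attached stream
                                    // before it tries to take the lock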

The checkpoint() method

Here, we implement the checkpoint operation:

  public void checkpoint () throws IOException {
    synchronized (lock) {
      if (closed)
        throw new IOException ("Stream closed");
      else if (storing)
        throw new IOException ("Alreading checkpointed");
      if (restoring)
        System.arraycopy (buffer, index, buffer, 0, stored - index);
      stored -= index;
      index = 0;
      storing = true;
    }
  }

In this situation, we must verify that our state is OK and assert the storing flag. Additional logic is required, however, if we currently are restoring from the checkpoint buffer. In that case, we must copy the remaining checkpoint data back to the start of the buffer and update our indices.
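Here's a short sketch of checkpointing while the stream is still replaying buffered data:

  UndoReader in = new UndoReader (new StringReader ("abcdef"));
  in.checkpoint ();
  in.read (); in.read (); in.read ();   // reads 'a', 'b', 'c'; all three are stored
  in.rollback ();                       // rewind: 'a', 'b', 'c' will be replayed
  in.read ();                           // replays 'a'
  in.checkpoint ();                     // checkpoint mid-replay: 'b' and 'c' are kept
  in.read (); in.read (); in.read ();   // replays 'b' and 'c', then reads 'd' afresh
  in.rollback ();                       // rewinds to just after 'a'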

The commit() method

Here, we execute the commit operation:

  public void commit () throws IOException {
    synchronized (lock) {
      if (closed)
        throw new IOException ("Stream closed");
      else if (!storing)
        throw new IOException ("Undo without checkpoint");
      storing = false;
    }
  }

All we need do is verify that our state is okay and clear the storing flag.

The rollback() method

Here, we implement the rollback operation:

  public void rollback () throws IOException {
    synchronized (lock) {
      if (closed)
        throw new IOException ("Stream closed");
      else if (!storing)
        throw new IOException ("Rollback without checkpoint");
      storing = false;
      index = 0;
      restoring = (index < stored);
    }
  }

The rollback() method verifies that our state is okay, clears the storing flag, and asserts the restoring flag if data remain in the checkpoint buffer.
