Make room for JavaSpaces, Part 6

Build and use distributed data structures in your JavaSpaces programs

1 2 3 Page 3
Page 3 of 3
private void processNextRequest() {
    Index tail = readIndex("tail", channel);
    Index head = removeIndex("head", channel);
    if (tail.position.intValue() < head.position.intValue()) {
        // there are no requests 
        writeIndex(head);
        return;
    }
    // get the next request & increment the head:
    MP3Request request = removeRequest(channel, head.position);
    head.increment();
    writeIndex(head);
    if (request == null) {
        System.out.println("Communication error.");
        return;
    } 
        
    // retrieved an MP3 request, so call third-party freeware 
    // program to perform the conversion from WAV to MP3 
          
    String inputName = request.inputName;
    byte[] inputData = request.data;
    byte[] outputData = null;        
    String tmpInputFile = "./tmp" + request.position + ".wav";
    String tmpOutputFile = "./tmp" + request.position + ".mp3";
    Process subProcess = null;
                       
    Utils.putRawData(inputData, tmpInputFile);
    try {
        String[] cmdArray = { "\"C:\\Program Files\\BladeEnc\\BladeEnc\"", 
            "-quit", "-quiet", "-progress=0", tmpInputFile, tmpOutputFile};
        subProcess = Runtime.getRuntime().exec(cmdArray);
        subProcess.waitFor();
    } catch (Exception e) {
        . . .
        return;                 
    }        
         
    . . .
         
    // put MP3 result into the space
    outputData = Utils.getRawData(tmpOutputFile);
    MP3Result result = new MP3Result(inputName, outputData, from);
    try {
        space.write(result, null, Lease.FOREVER);
           . . . 
    } catch (Exception e) {
        . . .
        return;
    }            
}

The processNextRequest method first reads the tail index for the MP3 Request channel, but leaves it in the space. Then it removes the head index and compares the two positions. If the tail's position is less than the head's position, that means the channel is currently empty and there are no encoding requests to process. If that is the case, this method simply returns the unmodified head index to the space and returns.

However, if there are requests in the channel, processNextRequest calls removeRequest to remove the request entry at the head, increments the head index, and then writes the updated head index back into the space. Then the method is ready to obtain the wav data from the request entry and perform the actual conversion. Let's look at how that happens.

The method extracts the input filename and wav input data from the request entry. It also constructs two temporary local files -- one to hold the wav input data and one to hold the MP3 output data -- and writes the wav input data into the temporary input file. Next the method makes a call to an external, third-party freeware program that takes the wav input file, encodes it into the MP3 format, and writes the resulting MP3 data to the temporary output file. At that point, our method constructs an MP3Result (which I'll look at shortly) using the generated MP3 data, and writes it to the space.

For the sake of completeness, here are the definitions of a few helper methods used in the code. Their implementations should be fairly straightforward:

// remove a request entry from the channel 
private MP3Request removeRequest(String channel, 
    Integer position) 
{
    MP3Request template = new MP3Request (channel, position);
    MP3Request request = null;
    try {
        request = (MP3Request)
            space.take(template, null, Long.MAX_VALUE);
        return request;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}
// remove an index ("head" or "tail") from the channel
private Index removeIndex(String type, String channel) {
    Index template = new Index(type, channel);
    Index index = null;
    try {
        return (Index)space.take(template, null, Long.MAX_VALUE);
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}
// read an index ("head" or "tail") from the channel
private Index readIndex(String type, String channel) {
    Index template = new Index(type, channel);
    Index index = null;
    try {
        return (Index)space.read(template, null, Long.MAX_VALUE);
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}
// write a head or tail index to the channel
private void writeIndex(Index index) {
    try {
        space.write(index, null, Lease.FOREVER);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

The MP3Result entry

As the workers obtain MP3 data, they wrap the results in MP3Result entries and write those into the space for MP3 Requesters to find. Here's how an MP3Result entry is defined:

public class MP3Result implements Entry {
    public String inputName;    // name of file that was encoded
    public byte[] data;         // raw MP3 data 
    public String from;         // who sent the request
    public MP3Result() {        // the no-arg constructor
    }
    public MP3Result(String from) {
        this.from = from;
    }
    public MP3Result(String inputName, byte[] data, String from) {
        this.inputName = inputName;
        this.data = data;
        this.from = from;
    }    
}

The result entry is straightforward: it holds the filename that was encoded, the bytes representing the MP3 encoding, and a String representing the name of the person who requested the MP3.

Collecting and displaying the MP3 results

As I mentioned earlier, each MP3 Requester has a thread that loops continuously, removing MP3 results from the space and displaying them. To complete your understanding of the whole picture, here's the code that accomplishes this:

  
// thread in the MP3Requester that loops,
// taking & displaying links to MP3 results
public void run() {    
    MP3Result template = new MP3Result(from);
    MP3Result result = null;
    String outputName = "";   // name of MP3 output file
        
    while(true) {
        try {
            result = (MP3Result)
                space.take(template, null, Long.MAX_VALUE);
            int pos = result.inputName.indexOf(".wav");
            outputName = result.inputName.substring(0, pos) + ".mp3";
            Utils.putRawData(result.data, outputName);
            displayResult(outputName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

This method first constructs an MP3Result template, supplying only the username that was entered into the requester's GUI. Then the method enters a loop, looking for any MP3Result entries that are tagged with the requester's name. Whenever an entry is found, it is removed from the space, and the raw MP3 data is written to an output file that corresponds to the input file the user specified. For instance, if the user specified C:\Windows\Desktop\wavs\drpepper.wav as the file to encode, the resulting MP3 data is written to C:\Windows\Desktop\wavs\drpepper.mp3. Then displayResult is called to display the name of the file in the GUI (see the results area of Figure 4); if the user double clicks on the MP3 filename, an MP3 player (currently hardcoded in the application as C:\Program Files\winamp\winamp) will be launched to play the MP3 file.

Wrapping it up

With this article, we've created a real-world distributed application that revolves around two distributed data structures -- an ordered MP3 Request channel holding MP3Request entries, and an unordered bag holding MP3Result entries. The channel distributed data structure is versatile; with small changes to the protocols, it can form the basis of a variety of communication patterns. For instance, if workers don't remove the entries in the channel but simply read them, the channel can form the basis of an archived chat stream in the space. To explore channels further, refer to JavaSpaces Principles, Patterns, and Practice.

You might think of the pattern in this example as a variation on the master/worker pattern Eric Freeman and I used in earlier columns. Recall that, in the master/worker pattern, a single master process deposits tasks into a task bag in the space, and worker processes pick up arbitrary tasks, compute them, and write the results into a result bag. Now, in our MP3 encoder architecture, there are potentially many requester processes that append requests to an ordered channel of requests, and potentially many workers that pick up the requests in order and process them. Just as in the master/worker pattern, the workers write the results into a result bag in the space. The requester processes pick up just the results earmarked for them. You could apply the pattern developed here to a wide variety of domains: writers simply add tasks (or requests for services) to the channel, and worker processes remove and process the tasks (computing them or providing some other service). The channel distributed data structure allows requests to be processed in a fair, first-come, first-served manner.

It's also worth noting, once again, that space-based communication decouples the senders and receivers of information and promotes a loosely coupled communication style in which senders and receivers don't interact directly, but rather communicate through distributed data structures in a space. In this distributed MP3 encoding example, the requesters don't care which worker or backend server ultimately processes their request; they only care that they get a result back. Conversely, the workers don't care who wants MP3 data; they just know how to take the request and deliver a result back to the space. Loosely coupled applications tend to be more flexible and scalable than tightly coupled ones. Indeed, in this case you can deploy as many workers on as many machines as you would like, without revising or recompiling any code.

Distributed data structures are the building blocks of space-based distributed programs. There is no limit to the variety and complexity of distributed data structures you can build using entries: distributed linked lists, hierarchical tree structures, graphs, and so on. Now that you've seen a few distributed data structures and their power, you should be ready to start inventing new and useful structures for your own applications.

Dr. Susanne Hupfer is vice president of engineering for Mirror Worlds Technologies, a Java- and Jini-based software applications company, and a research affiliate in the Department of Computer Science at Yale University, where she completed her PhD in space-based systems and distributed computing. Previously, she taught Java network programming as an assistant professor of computer science at Trinity College. Susanne coauthored the Sun Microsystems book JavaSpaces Principles, Patterns, and Practice. She would like to thank Peter Sparago, principal architect of Mirror Worlds, for providing his valuable insights on distributed design, and Mark Rubelman and Priyantha Jayanetti for sharing their Java expertise.

Learn more about this topic

1 2 3 Page 3
Page 3 of 3