Make room for JavaSpaces, Part 5

Make your compute server robust and scalable with Jini and JavaSpaces

In "Make Room for JavaSpaces, Part 2: Build a compute server with JavaSpaces," you saw how to build a simple general-purpose compute server using JavaSpaces technology. Recall that a compute server is a powerful, all-purpose computing engine that accepts tasks, computes them, and returns results. In that design, a master process breaks a large, compute-intensive problem into smaller tasks -- entries that describe the task and contain a method to perform the necessary computation -- and writes them into a space. Meanwhile, worker processes watch the space, retrieve tasks as they become available, compute them, and write the results back to the space, where the master later retrieves them.

Simple as it is, the compute server has some impressive qualities. First, it is general purpose: you can send it new tasks whenever you want and know they will be computed. There's no need to bring the compute server down or install new task code on various machines, since executable content is built into the task entries themselves. Second, the compute server scales automatically: as new workers come along, they can pick up tasks and join in the computation, speeding up the overall solution. Workers can come and go without requiring code changes or reconfiguration. Finally, the compute server is well suited to load balancing: workers pick up tasks whenever they can, so the workload is balanced naturally among workers on slower and faster machines.
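To see why load balancing falls out of this design, here is a minimal single-machine sketch of the replicated-worker pattern. It assumes a java.util.concurrent.LinkedBlockingQueue as a stand-in for the space and a hypothetical Task class in place of a TaskEntry; a real JavaSpace matches entries by template and lives on the network, which this toy ignores. No scheduler assigns work: each worker thread simply takes the next available task, so a faster worker ends up computing more of them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReplicatedWorkers {
    // Hypothetical task type standing in for a TaskEntry: squares its input.
    static final class Task {
        final int n;
        Task(int n) { this.n = n; }
        int execute() { return n * n; }
    }

    // Sorted results plus how many tasks each worker ended up computing.
    static final class Outcome {
        final List<Integer> results;
        final Map<String, Integer> tasksPerWorker;
        Outcome(List<Integer> results, Map<String, Integer> tasksPerWorker) {
            this.results = results;
            this.tasksPerWorker = tasksPerWorker;
        }
    }

    static Outcome run(int taskCount, int workerCount) throws InterruptedException {
        // The queue plays the role of the space: the "master" writes all tasks into it.
        BlockingQueue<Task> space = new LinkedBlockingQueue<>();
        for (int i = 0; i < taskCount; i++) {
            space.add(new Task(i));
        }
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        Map<String, Integer> perWorker = new ConcurrentHashMap<>();

        ExecutorService pool = Executors.newFixedThreadPool(workerCount);
        for (int w = 0; w < workerCount; w++) {
            final String name = "worker-" + w;
            pool.submit(() -> {
                Task t;
                // Each worker grabs whatever task is next; poll() returns null
                // once no tasks remain, which ends this worker's loop.
                while ((t = space.poll()) != null) {
                    results.add(t.execute());
                    perWorker.merge(name, 1, Integer::sum);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);

        List<Integer> sorted = new ArrayList<>(results);
        Collections.sort(sorted);
        return new Outcome(sorted, perWorker);
    }

    public static void main(String[] args) throws InterruptedException {
        Outcome o = run(20, 3);
        System.out.println(o.results.size() + " results; tasks per worker: " + o.tasksPerWorker);
    }
}
```

The per-worker counts vary from run to run, which is the point: the split is decided by which worker happens to be free, not by any configuration.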

Despite these admirable properties, the compute server isn't quite ready for the real world. In this article, I'll show you two big improvements that gear it up for real-world use. If you read Part 4 of this series, you've probably guessed one of the compute server's weaknesses: it doesn't account for partial failure. Armed with what you learned about Jini transactions in that article, you'll revisit the compute server code and make it more fault tolerant. Another potential shortcoming is that the compute server relies on a single JavaSpace running on a single CPU. Under some circumstances that one space can become a bottleneck, so you'll also see how to use multiple spaces to achieve greater scalability.

Adding transactions to the worker

Take another look at the original worker code from the compute server and see why it's not fault tolerant:

import net.jini.core.entry.Entry;
import net.jini.space.JavaSpace;

public class Worker {
    private JavaSpace space;
    public static void main(String[] args) {
        Worker worker = new Worker();
        worker.startWork();
    }
    public Worker() {
        space = SpaceAccessor.getSpace();
    }
    public void startWork() {
        TaskEntry template = new TaskEntry();
        for (;;) {
            try {
                // take a task (no transaction), wait as long as necessary
                TaskEntry task = (TaskEntry)
                    space.take(template, null, Long.MAX_VALUE);
                Entry result = task.execute();
                if (result != null) {
                    // write the result with a 10-minute lease
                    space.write(result, null, 1000*60*10);
                }
            } catch (Exception e) {
                System.out.println("Task cancelled");
            }
        }
    }
}

After gaining access to a space and calling the startWork method, the worker repeatedly takes a task entry from the space, computes the task, and writes the result to the space. Note that take and write are both performed under a null transaction, which means each of those operations consists of one indivisible action (the operation itself). Step back and think about one scenario that can occur in networked environments, which are prone to partial failure. Suppose a worker removes a task and begins executing it, and then a failure occurs (maybe the worker dies unexpectedly or gets disconnected from the network). In this scenario, the task entry is lost for good, and as a result the overall computation will never be fully solved.

You can make the worker more robust by using transactions. (The complete code for the transaction-enabled compute server forms the javaworld.simplecompute2 package and can be found in Resources.) First, you'll modify the worker's constructor to obtain a TransactionManager proxy object and assign it to the variable mgr, and you'll define a getTransaction method that creates and returns new transactions:

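import java.rmi.RemoteException;
import net.jini.core.lease.LeaseDeniedException;
import net.jini.core.transaction.Transaction;
import net.jini.core.transaction.TransactionFactory;
import net.jini.core.transaction.server.TransactionManager;
import net.jini.space.JavaSpace;

public class Worker {
    private JavaSpace space;
    private TransactionManager mgr;
    . . .
    public Worker() {
        space = SpaceAccessor.getSpace();
        mgr = TransactionManagerAccessor.getManager();
    }
    // Creates a new transaction with the requested lease time (in ms),
    // or returns null if the transaction can't be created.
    public Transaction getTransaction(long leaseTime) {
        try {
            Transaction.Created created =
                TransactionFactory.create(mgr, leaseTime);
            return created.transaction;
        } catch(RemoteException e) {
            e.printStackTrace();
            return null;
        } catch(LeaseDeniedException e) {
            e.printStackTrace();
            return null;
        }
    }
}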

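If you've read Make Room for JavaSpaces, Part 4, most of the getTransaction method should look familiar. Note that the method has a leaseTime parameter, which specifies, in milliseconds, the lease time you'd like the transaction to have. This is a request: the lease the transaction manager actually grants is returned in the lease field of Transaction.Created.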
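Lease times and timeouts in the Jini and JavaSpaces APIs are expressed in milliseconds, so the 10-minute literals used in this article's worker and master code (1000*10*60 and 1000*60*10) both evaluate to 600,000 ms. A small sketch of naming such values with java.util.concurrent.TimeUnit, which can make the intent easier to read; the constant names are hypothetical, and the article's code uses inline literals instead.

```java
import java.util.concurrent.TimeUnit;

public class LeaseTimes {
    // Hypothetical named constants for the 10-minute leases used in the article.
    static final long TXN_LEASE_MS = TimeUnit.MINUTES.toMillis(10);     // 600,000 ms
    static final long RESULT_LEASE_MS = TimeUnit.MINUTES.toMillis(10);  // 600,000 ms

    public static void main(String[] args) {
        // The inline literal computes the same value (int 600000 widened to long).
        System.out.println(TXN_LEASE_MS == 1000 * 10 * 60);  // prints true
        System.out.println(RESULT_LEASE_MS);                 // prints 600000
    }
}
```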

Now let's modify the startWork method to add support for transactions:

public void startWork() {
    TaskEntry template = new TaskEntry();
    for (;;) {
        // try to get a transaction with a 10-min lease time
        Transaction txn = getTransaction(1000*10*60);
        if (txn == null) {
            throw new RuntimeException("Can't obtain a transaction");
        }
        try {
            try {
                // take the task under a transaction
                TaskEntry task = (TaskEntry)
                    space.take(template, txn, Long.MAX_VALUE);
                // perform the task
                Entry result = task.execute();
                // write the result into the space under a transaction
                if (result != null) {
                    space.write(result, txn, 1000*60*10);
                }
            } catch (Exception e) {
                System.out.println("Task cancelled: " + e);
                txn.abort();
                // the transaction is finished; don't try to commit it
                continue;
            }
            // take and write both succeeded: commit them as one atomic unit
            txn.commit();
        } catch (Exception e) {
            // commit or abort failed; the transaction expires with its lease
            System.out.println("Transaction failed: " + e);
        }
    }
}

Each time startWork iterates through its loop, it calls getTransaction to attempt to get a new transaction with a lease time of 10 minutes. If an exception occurs while creating the transaction, then the call to getTransaction returns null, and the worker throws a runtime exception. Otherwise, the worker has a transaction in hand and can continue with its work.

First, you call take (passing it the transaction) and wait until it returns a task entry. Once you have a task entry, you call the task's execute method and assign the returned value to the local variable result. If the result entry is non-null, then you write it into the space under the transaction, with a lease time of 10 minutes.

Three things can happen from here. First, the operations may complete without throwing any exceptions, in which case you call the transaction's commit method, asking the transaction manager to commit the transaction. If the commit succeeds, all the operations performed under the transaction (in this case, the take and the write) take effect in the space as one atomic operation.

The second possibility is that an exception occurs while carrying out the operations. In this case, the inner catch clause explicitly asks the transaction manager to abort the transaction. If the abort succeeds, none of the operations take effect in the space: the task remains there as if it had never been touched.

A third possibility is that an exception occurs in the process of committing or aborting the transaction. In this case, the outer catch clause catches the exception and prints a message, indicating that the transaction failed. The transaction will expire when its lease time ends (in this case after 10 minutes), and no operations will take place. The transaction will also expire if this client unexpectedly dies or becomes disconnected from the network during the series of calls.
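To make these outcomes concrete, here is a deliberately tiny, single-process model of transactional take and write. ToySpace and its Txn class are hypothetical and are not the JavaSpaces or Jini API: a real space hides entries taken under an uncommitted transaction rather than removing them, matches entries by template, and relies on a separate transaction manager and leases. The model captures only the visible semantics: a null-transaction take is final, a take under a transaction that is aborted (or, in the real system, whose lease expires) puts the entry back, and writes made under a transaction stay invisible until commit. Completing a transaction twice throws an IllegalStateException in this model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-process model of transactional take/write; NOT the JavaSpaces API.
public class ToySpace {
    // Entries that any client can currently see and take.
    private final List<String> entries = new ArrayList<>();

    // txn == null models a null transaction: the write is visible immediately.
    public synchronized void write(String entry, Txn txn) {
        if (txn == null) {
            entries.add(entry);
        } else {
            txn.check();
            txn.pendingWrites.add(entry);  // invisible until commit
        }
    }

    // Non-blocking take: returns null if the space is empty (a real take can wait).
    public synchronized String take(Txn txn) {
        if (txn != null) {
            txn.check();
        }
        if (entries.isEmpty()) {
            return null;
        }
        String entry = entries.remove(0);
        if (txn != null) {
            txn.taken.add(entry);  // remembered so that abort can restore it
        }
        return entry;
    }

    public synchronized List<String> contents() {
        return new ArrayList<>(entries);
    }

    public Txn newTxn() {
        return new Txn();
    }

    public final class Txn {
        private final List<String> taken = new ArrayList<>();
        private final List<String> pendingWrites = new ArrayList<>();
        private boolean done = false;

        // Commit: the takes become permanent and the writes become visible, together.
        public void commit() {
            synchronized (ToySpace.this) {
                check();
                entries.addAll(pendingWrites);
                done = true;
            }
        }

        // Abort (lease expiry has the same effect): taken entries return, writes vanish.
        public void abort() {
            synchronized (ToySpace.this) {
                check();
                entries.addAll(taken);
                done = true;
            }
        }

        private void check() {
            if (done) {
                throw new IllegalStateException("transaction already completed");
            }
        }
    }

    public static void main(String[] args) {
        ToySpace space = new ToySpace();

        // Null transaction: the worker takes the task, then "dies" before writing a result.
        space.write("task", null);
        space.take(null);
        System.out.println("after crash, no txn: " + space.contents());  // []

        // Under a transaction: the work fails, and the transaction is aborted (or expires).
        space.write("task", null);
        ToySpace.Txn txn = space.newTxn();
        String task = space.take(txn);
        space.write("result for " + task, txn);
        txn.abort();
        System.out.println("after abort: " + space.contents());  // [task]

        // Under a transaction that commits: the take and the write happen together.
        ToySpace.Txn txn2 = space.newTxn();
        space.write("result for " + space.take(txn2), txn2);
        txn2.commit();
        System.out.println("after commit: " + space.contents());  // [result for task]
    }
}
```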

Now that you've made the worker code robust, let's turn to the master code and show how you can improve it as well.

Adding transactions to the master

Recall the Master code from the compute server example, which calls the generateTasks method to generate a set of tasks and then calls the collectResults method to collect results:

import net.jini.core.lease.Lease;
import net.jini.space.JavaSpace;

public class Master {
    private JavaSpace space;
    public static void main(String[] args) {
        Master master = new Master();
        master.startComputing();
    }
    private void startComputing() {
        space = SpaceAccessor.getSpace();
        generateTasks();
        collectResults();
    }
    private void generateTasks() {
        for (int i = 0; i < 10; i++) {
            writeTask(new AddTask(Integer.valueOf(i), Integer.valueOf(i)));
        }
        for (int i = 0; i < 10; i++) {
            writeTask(new MultTask(Integer.valueOf(i), Integer.valueOf(i)));
        }
    }
    private void collectResults() {
        // one result is expected for each of the 20 tasks
        for (int i = 0; i < 20; i++) {
            ResultEntry result = takeResult();
            if (result != null) {
                System.out.println(result);
            }
        }
    }
    private void writeTask(Command task) {
        try {
            space.write(task, null, Lease.FOREVER);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    protected ResultEntry takeResult() {
        ResultEntry template = new ResultEntry();
        try {
            ResultEntry result = (ResultEntry)
                space.take(template, null, Long.MAX_VALUE);
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

You'll notice in writeTask that the write occurs under a null transaction. If the write returns without throwing an exception, then you can trust that the entry was committed to the space. But if a problem occurs during the operation and an exception is thrown, you can't know for sure whether or not the task entry was written to the space. If a RemoteException is thrown (which can occur whenever a space operation communicates with the remote JavaSpace service), the task entry may or may not have been written. If any other type of exception is thrown, then you know the entry wasn't written to the space.
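The rules in the preceding paragraph amount to a three-way classification of a null-transaction write. A minimal sketch of that decision, assuming a hypothetical WriteCall interface that wraps the space.write call; java.rmi.RemoteException is the real JDK exception class that remote calls throw.

```java
import java.rmi.RemoteException;

public class WriteOutcome {
    enum Status { WRITTEN, NOT_WRITTEN, UNKNOWN }

    // Hypothetical stand-in for a space.write(...) call made under a null transaction.
    interface WriteCall {
        void run() throws Exception;
    }

    // Interprets how a null-transaction write ended, following the rules above.
    static Status classify(WriteCall call) {
        try {
            call.run();
            return Status.WRITTEN;       // returned normally: the entry is in the space
        } catch (RemoteException e) {
            return Status.UNKNOWN;       // communication failure: may or may not be written
        } catch (Exception e) {
            return Status.NOT_WRITTEN;   // any other exception: the entry was not written
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(() -> { }));                                        // WRITTEN
        System.out.println(classify(() -> { throw new RemoteException("net down"); })); // UNKNOWN
        System.out.println(classify(() -> { throw new IllegalArgumentException(); }));  // NOT_WRITTEN
    }
}
```

The UNKNOWN case is the troublesome one: the master can't tell whether rewriting the task would fill a gap or create a duplicate.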

The master isn't very fault tolerant, since you never know for sure whether a task was written successfully into the space. And if some of the tasks don't get written, the compute server won't be able to completely solve the parallel computation it's working on. To make Master more robust, you'll first add the convenient getTransaction method that you saw previously. You'll also modify writeTask to use transactions and to return a boolean value that indicates whether or not it wrote the task:

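private boolean writeTask(Command task) {
    // try to get a transaction with a 10-min lease time
    Transaction txn = getTransaction(1000*10*60);
    if (txn == null) {
        throw new RuntimeException("Can't obtain a transaction");
    }
    try {
        try {
            // write the task under the transaction
            space.write(task, txn, Lease.FOREVER);
        } catch (Exception e) {
            // the write failed: abort, discarding the entry if it was written
            txn.abort();
            return false;
        }
        // the write succeeded: commit so the entry takes effect in the space
        txn.commit();
        return true;
    } catch (Exception e) {
        // commit or abort failed; the transaction expires with its lease
        System.err.println("Transaction failed: " + e);
        return false;
    }
}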

First, writeTask tries to obtain a transaction with a 10-minute lease by calling getTransaction, just as you did in the worker code. With the transaction in hand, you can retrofit the write operation to work under it. This time when you call write, you supply the transaction as the second argument.

Again, three things can happen as this code runs. If the write completes without throwing an exception, you attempt to commit the transaction. If the commit succeeds, then you know the task entry has been written to the space, and you return a status of true. On the other hand, if the write throws an exception, you attempt to abort the transaction. If the abort succeeds, you know that the task entry was not written to the space (if it was written, it's discarded), and you return a status of false.

The third possibility is that an exception is thrown in the process of either committing or aborting the transaction. In this case, the outer catch clause prints a message, the transaction expires when its lease time is up, and the write operation doesn't occur, so you return a status of false.
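Because writeTask now reports its outcome, the caller can decide what to do with a false return. One possibility, sketched here under stated assumptions, is to retry each task a bounded number of times and collect the ones that never succeed. TaskWriter, writeAll, and the attempt limit are hypothetical; the article itself doesn't prescribe a retry policy.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryingMaster {
    // Hypothetical stand-in for Master.writeTask: true means the task was written.
    interface TaskWriter {
        boolean writeTask(String task);
    }

    // Tries each task up to maxAttempts times; returns the tasks that never succeeded.
    static List<String> writeAll(List<String> tasks, TaskWriter writer, int maxAttempts) {
        List<String> failed = new ArrayList<>();
        for (String task : tasks) {
            boolean written = false;
            for (int attempt = 1; attempt <= maxAttempts && !written; attempt++) {
                written = writer.writeTask(task);
            }
            if (!written) {
                failed.add(task);  // the caller decides: give up, alert, or try later
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // A fake writer that fails the first time it sees each task, then succeeds.
        List<String> seen = new ArrayList<>();
        TaskWriter flaky = task -> {
            if (seen.contains(task)) {
                return true;
            }
            seen.add(task);
            return false;
        };
        List<String> failed = writeAll(List.of("add-0", "mult-0"), flaky, 3);
        System.out.println("unwritten tasks: " + failed);  // prints "unwritten tasks: []"
    }
}
```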
