Returning to the more mundane task of generating and buffering prime numbers, the simple application controls access to the results buffer by a gate that functions in a manner similar to a binary latch: when the gate is open, threads can pass, and when it is closed, threads must queue until it is re-opened. It is more clever than a latch in the sense that it batches waiting threads into generations, which ensures that threads in the same generation all pass through the gate at the same time; threads in an older generation always take priority over those in younger generations. The gate implementation also has an innovative feature that ensures that results can always be written to the results buffer regardless of how many waiting threads there are. If a generic thread gate is like a traffic light, then this would be more akin to a railway crossing in the sense that the train always gets priority -- and for good reason, too.
The gate implementation is contained in the compilation unit com.javaworld.primefinder.ThreadGate, and is detailed in Listing 3.
package com.javaworld.primefinder;

class ThreadGate
{
    private int waitPosition;
    private int generationSize;
    private int openAttempts;
    private int activeSearchThreads;
    private boolean open;

    // Method bodies are omitted in this outline.
    public ThreadGate(int searchThreadsCount)
    public synchronized void await() throws InterruptedException
    public synchronized void close() throws InterruptedException
    public synchronized void open()
}
To deconstruct this class, begin by taking a look at the class-level fields, of which there are five:
waitPosition: Tracks the generation index of waiting threads. It helps ensure fairness, because the threads that are let through together are those that arrived since the last time the gate was opened. Its value is incremented each time a call to open() actually opens the gate, which forces subsequent thread arrivals into a new, younger generation.
generationSize: Tracks the size of the current generation of threads waiting for the gate to open. Its value should be reset each time the open() or close() method is invoked.
openAttempts: An integer counter indicating the number of attempts that have been made to open the gate since the last time it was closed. In practice, it tracks the number of results that have been added to the results buffer since the last time the gate was closed. This is apparent from the result handle implementation, com.javaworld.primefinder.ConcurrentPrimeNumberSource, which invokes the open() method each time a new prime number is added to the results buffer. The field is incremented by each call to open() and reset to zero by each call to close().
activeSearchThreads: Reader threads should not wait needlessly if there is no prospect of any further results being made available. A key indicator of that prospect is whether there are still any active search threads, so the gate must keep track of how many there are. If this number is zero, the await() method will not block, thus allowing reader threads free passage through the gate.
open: A boolean flag that indicates whether the gate is open or closed.
Although the gate implementation's methods are self-explanatory, take some time to examine the await() and open() methods. The code for await() is shown in Listing 4.
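public synchronized void await() throws InterruptedException
{
    // Remember which generation this thread arrived in.
    int generationIndex = waitPosition;
    generationSize++;
    // Wait while the gate is closed, this generation has not been
    // released, and at least one search thread is still active.
    while (!open && generationIndex == waitPosition
            && activeSearchThreads != 0)
        wait();
}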
You'll note that this method captures the value of the current wait position -- the generation index, in other words. Whilst the gate is closed, the generation index has not been advanced, and at least one search thread is still active, the calling thread will remain in a wait state. This means that any threads arriving since the last time the gate was opened will have to wait until the previous generation of threads has been served and the gate undergoes another close() to open() cycle, or until the producer (search) threads terminate.
(As an exercise, consider how this method can be altered so that the gate does not necessarily have to be closed in order to batch threads into generations. In essence, how would altering the method help implement a scheme where the gate is simply used to batch processing threads? What would an application look like when efficiency demands that requests are serviced in bulk, rather than individually?)
The open() method is illustrated in Listing 5. It should be clear that the generation counter is advanced by 1 each time the gate re-opens.
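public synchronized void open()
{
    openAttempts++;
    // Open only once there have been at least as many attempts
    // as there are threads waiting in the current generation.
    if (generationSize <= openAttempts)
    {
        ++waitPosition; // advance to the next generation
        open = true;
        generationSize = 0;
        notifyAll();
    }
}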
You should also note that the method uses notifyAll() instead of notify(), thus ensuring that all the threads in a given generation are given the opportunity to proceed.
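The interplay between await() and open() can be tried out in isolation. What follows is a minimal sketch rather than the article's ThreadGate: the bodies of await() and open() follow Listings 4 and 5, but the class name GateSketch, the constructor body (which assumes the gate starts closed with every search thread active), and the helper methods waiting() and isOpen() are assumptions made purely for this demonstration. The close() method is left out because its body is not shown here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class GateSketch {
    /** Hypothetical stand-in for ThreadGate; only await() and open() follow the listings. */
    static class Gate {
        private int waitPosition;
        private int generationSize;
        private int openAttempts;
        private int activeSearchThreads;
        private boolean open;

        Gate(int searchThreadsCount) {
            // Assumption: the gate starts closed and all search threads are active.
            activeSearchThreads = searchThreadsCount;
        }

        synchronized void await() throws InterruptedException {
            int generationIndex = waitPosition;
            generationSize++;
            while (!open && generationIndex == waitPosition && activeSearchThreads != 0) {
                wait();
            }
        }

        synchronized void open() {
            openAttempts++;
            if (generationSize <= openAttempts) {
                ++waitPosition;
                open = true;
                generationSize = 0;
                notifyAll();
            }
        }

        // Demo-only helpers, not part of the article's outline.
        synchronized int waiting() { return generationSize; }
        synchronized boolean isOpen() { return open; }
    }

    /** Parks the given number of readers at the gate, then counts the open() calls needed to release them. */
    static int callsNeededToRelease(int readers) {
        Gate gate = new Gate(1);
        CountDownLatch passed = new CountDownLatch(readers);
        for (int i = 0; i < readers; i++) {
            Thread t = new Thread(() -> {
                try {
                    gate.await();
                    passed.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);
            t.start();
        }
        try {
            // Once waiting() reports every reader, each one is parked inside wait().
            while (gate.waiting() < readers) {
                Thread.sleep(1);
            }
            int calls = 0;
            while (!gate.isOpen()) {
                gate.open();
                calls++;
            }
            if (!passed.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("readers are still blocked");
            }
            return calls;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(callsNeededToRelease(3)); // prints 3
    }
}
```

With three readers parked at the gate, the first two calls to open() only increment openAttempts; the third satisfies generationSize <= openAttempts, advances the generation, and wakes all three readers at once through notifyAll().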
The next section outlines the implementation of the PrimeNumberSource interface, which serves as the handle for accessing search results.
The result handle is encapsulated in the compilation unit com.javaworld.primefinder.ConcurrentPrimeNumberSource; its contents are outlined in Listing 6.
public class ConcurrentPrimeNumberSource
    implements PrimeNumberSource
{
    private ThreadGate barrier;
    private List<BigInteger> resultsBuffer;
    private int resultsBufferAccessIndex;

    protected ConcurrentPrimeNumberSource(
        int searchThreadsCount)
    public synchronized BigInteger nextPrime()
    //delegates to barrier
    protected void searchThreadCompleted()
    //updates results
    protected void addResultToBuffer(
        BigInteger result)
}
As you can see in the code, instances of this class hold a reference to a gate, which is used to control concurrent access to the underlying results buffer -- as encapsulated by the field resultsBuffer.
An integer counter resultsBufferAccessIndex is used as a pointer to the last read buffer location. Note that the field is not atomic, and as such the nextPrime() method is defined as synchronized so as to prevent destructive updates. In Listing 7, you can also see that the gate's close() method is only invoked if the result buffer has been exhausted and the gate is not already closed.
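To see why a plain int read index needs a synchronized accessor, consider the following minimal sketch. It is not the article's nextPrime() implementation: the class BufferIndexSketch and its methods are hypothetical, the buffer is filled up front, and next() returns null when the buffer is exhausted instead of consulting a gate. The only point illustrated is that reading the index and advancing it must happen as a single step under the instance lock.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class BufferIndexSketch {
    private final List<BigInteger> resultsBuffer = new ArrayList<>();
    private int resultsBufferAccessIndex; // plain int, guarded by the instance lock

    BufferIndexSketch(int size) {
        // Assumption: placeholder values stand in for real search results.
        for (int i = 1; i <= size; i++) {
            resultsBuffer.add(BigInteger.valueOf(i));
        }
    }

    /** Returns the next unread entry, or null once the buffer is exhausted. */
    synchronized BigInteger next() {
        if (resultsBufferAccessIndex >= resultsBuffer.size()) {
            return null;
        }
        // Read and advance under one lock, so no two readers see the same index.
        return resultsBuffer.get(resultsBufferAccessIndex++);
    }

    /** Drains the buffer from several threads and reports whether each entry was handed out exactly once. */
    static boolean eachEntryReadOnce(int size, int readers) {
        BufferIndexSketch source = new BufferIndexSketch(size);
        Set<BigInteger> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger reads = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < readers; i++) {
            Thread t = new Thread(() -> {
                for (BigInteger v = source.next(); v != null; v = source.next()) {
                    seen.add(v);
                    reads.incrementAndGet();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // Total reads and distinct reads both equal the buffer size only if nothing was skipped or duplicated.
        return reads.get() == size && seen.size() == size;
    }

    public static void main(String[] args) {
        System.out.println(eachEntryReadOnce(10_000, 8)); // prints true
    }
}
```

Without the synchronized keyword on next(), two readers could observe the same index value before either advances it, so one entry would be returned twice and another skipped.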