Now it is time for the nitty-gritty third phase of SMT adaptation. If you haven't done so already, take a break to study the
code in BasicKMeans.java so you'll fully understand the listings that follow. This file is rigorously commented, as are the other files that accompany
it.
Based upon the benchmarks, computeDistances() and makeAssignments() are adapted to SMT in the final version of K-means, found in ConcurrentKMeans.java. Before jumping into the code, I should comment on two very important utility classes from java.util.concurrent: ThreadPoolExecutor and CyclicBarrier.
As you can probably guess from the name, ThreadPoolExecutor maintains a thread pool. It implements the Executor interface, which mandates the single method public void execute(Runnable command). An Executor executes a Runnable by calling its run method in some manner. A ThreadPoolExecutor executes a Runnable on one of the threads it keeps in an internal pool: when a Runnable is submitted to its execute method, the executor finds an idle thread in the pool and gives it the Runnable to run.
ThreadPoolExecutor may be configured with either a variable or a fixed number of threads. The threads may be configured to time out if not given
a task for a sufficient length of time. For the SMT adaptation described here, a simple fixed-size pool with threads that do not time out is what's needed. Such a pool is obtained from the static factory method in the Executors class: public static ExecutorService newFixedThreadPool(int nThreads). (ExecutorService extends Executor, so the result can be held in an Executor reference.)
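For instance, obtaining a fixed pool and feeding it Runnables looks roughly like this. This is a standalone sketch, not part of the K-means code; the task body is a placeholder:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedPoolDemo {

    public static void main(String[] args) throws InterruptedException {
        int nThreads = 4;
        // A pool with exactly nThreads threads that never time out.
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        final CountDownLatch latch = new CountDownLatch(nThreads);
        for (int i = 0; i < nThreads; i++) {
            final int id = i;
            pool.execute(new Runnable() {
                public void run() {
                    // Placeholder for a unit of work.
                    System.out.println("task " + id + " ran on "
                            + Thread.currentThread().getName());
                    latch.countDown();
                }
            });
        }
        latch.await();   // wait until all four tasks have run
        pool.shutdown(); // let the pool's threads terminate
        System.out.println("all tasks completed");
    }
}
```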
The other class, CyclicBarrier, is needed for coordination. When an SMT-adapted step is being performed by the pool's threads, the controlling thread needs
to know when they all have finished. The next step's success depends upon what the threads in the pool have done. A CyclicBarrier instance informs the controlling thread when an SMT-adapted step is complete, so the controlling thread knows when to proceed.
An appropriate CyclicBarrier is instantiated using the constructor: public CyclicBarrier(int parties, Runnable barrierAction). The argument parties is the number of threads that the barrier will coordinate. In this case, this is the number of threads in the thread pool.
The barrierAction is a Runnable for the barrier to execute whenever it is reached. Reaching the barrier means that all of its parties—the subtask threads
in the thread pool—have informed the barrier that they are finished. These threads inform the barrier of their completion
by calling the barrier's await() method. The barrier tracks the number of calls to await(), and when that number becomes equal to parties, the barrier executes barrierAction.
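Putting the two classes together, the coordination pattern looks roughly like the following. This is a minimal sketch with the workers' actual work elided:

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BarrierDemo {

    public static void main(String[] args) throws InterruptedException {
        final int parties = 3;
        // The barrier action runs once all parties have called await().
        final CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
            public void run() {
                System.out.println("all " + parties + " workers finished");
            }
        });
        ExecutorService pool = Executors.newFixedThreadPool(parties);
        for (int i = 0; i < parties; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        // ... this worker's share of the subtask would go here ...
                        barrier.await(); // report completion to the barrier
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (BrokenBarrierException e) {
                        // another party was interrupted or failed
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```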
In ConcurrentKMeans, the controlling thread waits while the thread pool's threads are doing one of the subtasks. The execution of the barrier
action breaks the controlling thread out of the wait, allowing K-means to continue.
Now let's plunge into the code. Compare the run method of ConcurrentKMeans in Listing 3 below with that of BasicKMeans in Listing 1. The only significant difference is the instantiation of an object variable of type SubtaskManager. This object manages the SMT-adapted subtasks of the concurrent version of K-means.
Listing 3. The run method of ConcurrentKMeans
public void run() {
try {
// Note the start time.
long startTime = System.currentTimeMillis();
postKMeansMessage("K-Means clustering started");
// Randomly initialize the cluster centers.
initCenters();
postKMeansMessage("... centers initialized");
// Instantiate the subtask manager.
mSubtaskManager = new SubtaskManager(mThreadCount);
// Post a message about the state of concurrent subprocessing.
if (mThreadCount > 1) {
postKMeansMessage("... concurrent processing mode with "
+ mThreadCount + " subtask threads");
} else {
postKMeansMessage("... non-concurrent processing mode");
}
// Perform the initial computation of distances.
computeDistances();
// Make the initial cluster assignments.
makeAssignments();
// Number of moves in the iteration and the iteration counter.
int moves = 0, it = 0;
// Main Loop:
//
// Two stopping criteria:
// - no moves in makeAssignments
// (moves == 0)
// OR
// - the maximum number of iterations has been reached
// (it == mMaxIterations)
//
do {
// Compute the centers of the clusters that need updating.
computeCenters();
// Compute the stored distances between the updated clusters and the
// coordinates.
computeDistances();
// Make this iteration's assignments.
moves = makeAssignments();
it++;
postKMeansMessage("... iteration " + it + " moves = " + moves);
} while (moves > 0 && it < mMaxIterations);
// Transform the array of ProtoClusters to an array
// of the simpler class Cluster.
mClusters = generateFinalClusters();
long executionTime = System.currentTimeMillis() - startTime;
postKMeansComplete(mClusters, executionTime);
} catch (Throwable t) {
postKMeansError(t);
} finally {
// Clean up temporary datastructures used during the algorithm.
cleanup();
}
}
Compare the version of computeDistances() from Listing 2 with the ConcurrentKMeans version, shown in Listing 4. The former uses an outer for loop to iterate over the coordinates, computing the distances to
changed clusters in the inner loop. This for loop has been replaced in ConcurrentKMeans by a call to mSubtaskManager.computeDistances(). Similarly, ConcurrentKMeans delegates the bulk of the work of the method makeAssignments() to the SubtaskManager.
Listing 4. computeDistances() from ConcurrentKMeans
/**
* Compute distances between coordinates and cluster centers,
* storing them in the distance cache. Only distances that
* need to be computed are computed. This is determined by
* distance update flags in the protocluster objects.
*/
private void computeDistances() throws InsufficientMemoryException {
if (mDistanceCache == null) { // Distance cache instantiated on first call.
int numCoords = mCoordinates.length;
int numClusters = mProtoClusters.length;
// Explicit garbage collection to reduce the likelihood of
// insufficient memory.
System.gc();
// Ensure there is enough memory available for the distances.
// Throw an exception if not.
long memRequired = 8L * numCoords * numClusters;
if (Runtime.getRuntime().freeMemory() < memRequired) {
throw new InsufficientMemoryException();
}
// Instantiate an array to hold the distances between coordinates
// and cluster centers.
mDistanceCache = new double[numCoords][numClusters];
}
// Bulk of the work is delegated to the
// SubtaskManager.
mSubtaskManager.computeDistances();
}
SubtaskManager is a non-static class nested within ConcurrentKMeans to manage the SMT-adapted steps. It has to be non-static so that it can access the object fields of the enclosing ConcurrentKMeans. Nested in SubtaskManager is yet another class, a non-static Runnable called Worker. While only one instance of SubtaskManager is created, the number of Worker instances is equal to the number of subtask threads in the thread pool. The Workers are the Runnables submitted to the ThreadPoolExecutor to do the grunt work.
Look at the SubtaskManager code shown in Listing 5 between Lines 6 and 31. The integer field mDoing, having possible values DOING_NOTHING, COMPUTING_DISTANCES, and MAKING_ASSIGNMENTS, tracks the current subtask. The boolean mWorking is a flag set to indicate that at least one of the Worker objects is executing. The field mExecutor is declared not as a ThreadPoolExecutor, but as the more general type Executor; you'll understand why when you read the explanation of the constructor. The CyclicBarrier is assigned to the reference field mBarrier. Finally, references to the Worker instances are placed in the array mWorkers.
The constructor, beginning on Line 39, takes the number of threads to use in the thread pool. This must, of course, be at least 1, or the constructor throws a well-deserved IllegalArgumentException. Line 52 reduces numThreads to the number of coordinates, in case someone attempts something ridiculous, such as clustering 100 coordinates with 200
threads. Lines 55-79 instantiate the Worker objects, dividing up the coordinates among them as evenly as possible and ensuring every coordinate is covered. The for loop
beginning on Line 68 exists simply to apportion any leftover coordinates, since the number of coordinates may not be evenly
divisible by the number of threads.
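The apportioning arithmetic is easiest to see with concrete numbers: 10 coordinates split across 3 workers gives a base of 3 each, with the 1 leftover handed to the first worker. This standalone sketch mirrors the logic in Listing 5:

```java
import java.util.Arrays;

public class ApportionDemo {

    /** Split coordCount coordinates across numThreads workers as evenly as possible. */
    static int[] split(int coordCount, int numThreads) {
        int[] coordsPerWorker = new int[numThreads];
        // Base amount for every worker.
        Arrays.fill(coordsPerWorker, coordCount / numThreads);
        // Hand out the remainder one coordinate at a time.
        int leftOvers = coordCount - numThreads * coordsPerWorker[0];
        for (int i = 0; i < leftOvers; i++) {
            coordsPerWorker[i]++;
        }
        return coordsPerWorker;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(split(10, 3))); // prints [4, 3, 3]
    }
}
```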
On Line 81, things get interesting. If numThreads equals 1, Line 85 sets mExecutor to an anonymous inner implementation that calls its Runnable directly. In other words, when only one subtask thread is requested, there is no thread pool. Execution of the single Worker is done on the controlling thread. Also, the CyclicBarrier is not set, because it is unnecessary.
The code within the else clause, between Lines 95 and 111, configures the SubtaskManager for the concurrent case. First, mBarrier is instantiated and given a barrier action that calls the SubtaskManager method workersDone(). Then, mExecutor is set to a fixed-size ThreadPoolExecutor.
To understand how SubtaskManager accomplishes a subtask, look at its methods makeAssignments() and computeDistances() on Lines 119 and 130, respectively. Both simply set the flag mDoing and call the method work(). This method, beginning on Line 141, returns a boolean indicating whether the subtask succeeded. It begins by initializing an ok flag to false on Line 142. Then it sets the flag mWorking. Entering the try-catch-finally block, on Line 149, mBarrier is reset if it is non-null. (Remember, the barrier is null if only one thread is being used; to reuse the barrier, reset() must be called.) The for loop beginning on Line 152 submits each of the Worker objects to the executor. If mExecutor is a ThreadPoolExecutor, as it is when the number of subtask threads is more than one, the Worker objects execute not on the controlling thread running work(), but on threads in the pool. If the number of subtask threads is one, however, the single Worker executes on the controlling thread.
Immediately after the for loop, if mBarrier is not null, the method waitOnWorkers() blocks the controlling thread in wait mode until the method workersDone() causes it to unblock. Recall that workersDone() is called by the barrier action Runnable when the barrier is reached. Then the ok flag is set to true if mBarrier.isBroken() returns false. The barrier reports a broken state if one of the worker threads is interrupted or throws an exception while
running.
If, on the other hand, mBarrier is null after the for loop, the one Worker is executed on the calling thread and there is no reason to block. In that case, the ok flag is set to true.
The catch clause traps RejectedExecutionException, an exception that ThreadPoolExecutor's execute method may throw if insufficient threads are available in the pool when the execute request is made, or if the thread pool has been shut down before the request is submitted. Because of the code's structure, neither of these cases can happen; however, to conform to good coding practice, the exception is trapped. (The impossible has a way of happening in software, doesn't it?) Before returning ok, the method ensures mWorking is set back to false in the finally clause.
The blocking and unblocking functions of SubtaskManager are completed by the methods waitOnWorkers() and workersDone(). Look at workersDone() first, which begins on Line 200. It sets mWorking to false, then calls notifyAll() to break the controlling thread out of its wait. Now look at waitOnWorkers() on Line 180. This method calls wait() inside a while loop that exits when mWorking is false.
I learned the hard way that waitOnWorkers() should check mWorking before calling wait() the first time, because it is possible for all of the workers to finish before the controlling thread even enters waitOnWorkers(). As the workers finish, they call the barrier's await() method, which eventually triggers the barrier action that calls workersDone(). If they finish their work so quickly that this happens before the controlling thread enters waitOnWorkers(), the thread gets stuck if it calls wait(). Thus, it ascertains that mWorking is true first. This flag can only be true if workersDone() did not precede the controlling thread's call to waitOnWorkers().
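The race is easy to reproduce in isolation. In this minimal sketch (a hypothetical stand-in for SubtaskManager, not the article's actual class), the "worker" finishes before the controlling thread ever calls waitOnWorkers(); the while (mWorking) guard is what keeps the call from hanging forever:

```java
public class GuardedWait {

    private boolean mWorking = true;

    synchronized void waitOnWorkers() {
        // Check the flag BEFORE waiting: if the workers already finished
        // and workersDone() already ran, an unconditional wait() would
        // block forever, since its notifyAll() has come and gone.
        while (mWorking) {
            try {
                wait();
            } catch (InterruptedException ie) {
                break;
            }
        }
    }

    synchronized void workersDone() {
        mWorking = false; // recorded even if nobody is waiting yet
        notifyAll();
    }

    public static void main(String[] args) throws InterruptedException {
        final GuardedWait g = new GuardedWait();
        Thread worker = new Thread(new Runnable() {
            public void run() {
                g.workersDone();
            }
        });
        worker.start();
        worker.join();     // the worker finishes FIRST...
        g.waitOnWorkers(); // ...yet this returns immediately
        System.out.println("no hang");
    }
}
```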
Now look at the code for Worker beginning on Line 234. As stated before, this class does the grunt work. It has integer object variables mStartCoord and mNumCoords to define the range of coordinates over which an instance operates. Worker's run method (see Line 268) contains a switch-case statement keyed on the SubtaskManager field mDoing. When mDoing is equal to COMPUTING_DISTANCES, the method calls workerComputeDistances(). If mDoing is equal to MAKING_ASSIGNMENTS, the method calls workerMakeAssignments(). After returning from one of these methods, the Worker calls mBarrier's await() method if it is non-null. When all Workers have called await(), the barrier action calls workersDone() to unblock SubtaskManager's controlling thread.
If you look at the Worker methods workerComputeDistances() and workerMakeAssignments(), you'll see that they resemble the methods computeDistances() and makeAssignments() in BasicKMeans. The chief difference is that the Worker methods loop through a subset of the coordinates instead of all of them.
Keeping track of the number of cluster-assignment moves is a little more complicated than before. Each Worker must keep a separate tally in the field mMoves. The SubtaskManager method numberOfMoves() returns their sum. ConcurrentKMeans.makeAssignments() calls mSubtaskManager.makeAssignments(), then returns the result of mSubtaskManager.numberOfMoves().
Listing 5. The nested class SubtaskManager
001 /**
002 * The class which manages the SMT-adapted subtasks.
003 */
004 private class SubtaskManager {
005
006 // Codes used to identify what step is being done.
007 static final int DOING_NOTHING = 0;
008 static final int COMPUTING_DISTANCES = 1;
009 static final int MAKING_ASSIGNMENTS = 2;
010
011 // What the object is currently doing. Set to one of the
012 // three codes above.
013 private int mDoing = DOING_NOTHING;
014
015 // True if at least one of the Workers is doing something.
016 private boolean mWorking;
017
018 // The executor that runs the Workers.
019 // When in multiple-processor mode, this is a ThreadPoolExecutor
020 // with a fixed number of threads. In single-processor mode, it's
021 // a simple implementation that calls the single worker's run
022 // method directly.
023 private Executor mExecutor;
024
025 // A Barrier to wait on multiple Workers to finish up the current task.
026 // In single-processor mode, there is no need for a barrier, so it
027 // is not set.
028 private CyclicBarrier mBarrier;
029
030 // The worker objects which implement Runnable.
031 private Worker[] mWorkers;
032
033 /**
034 * Constructor
035 *
036 * @param numThreads the number of worker threads to be used for
037 * the subtasks.
038 */
039 SubtaskManager(int numThreads) {
040
041 if (numThreads <= 0) {
042 throw new IllegalArgumentException("number of threads <= 0: "
043 + numThreads);
044 }
045
046 int coordCount = mCoordinates.length;
047
048 // There would be no point in having more workers than
049 // coordinates, since some of the workers would have nothing
050 // to do.
051 if (numThreads > coordCount) {
052 numThreads = coordCount;
053 }
054
055 // Create the workers.
056 mWorkers = new Worker[numThreads];
057
058 // To hold the number of coordinates for each worker.
059 int[] coordsPerWorker = new int[numThreads];
060
061 // Initialize with the base amounts.
062 Arrays.fill(coordsPerWorker, coordCount/numThreads);
063
064 // There may be some leftovers, since coordCount may not be
065 // evenly divisible by numWorkers. Add a coordinate to each
066 // until all are covered.
067 int leftOvers = coordCount - numThreads * coordsPerWorker[0];
068 for (int i = 0; i < leftOvers; i++) {
069 coordsPerWorker[i]++;
070 }
071
072 int startCoord = 0;
073 // Instantiate the workers.
074 for (int i = 0; i < numThreads; i++) {
075 // Each worker needs to know its starting coordinate and the
076 // number of coordinates it handles.
077 mWorkers[i] = new Worker(startCoord, coordsPerWorker[i]);
078 startCoord += coordsPerWorker[i];
079 }
080
081 if (numThreads == 1) { // Single-processor mode.
082
083 // Create a simple executor that directly calls the single
084 // worker's run method. Do not set the barrier.
085 mExecutor = new Executor() {
086 public void execute(Runnable runnable) {
087 if (!Thread.interrupted()) {
088 runnable.run();
089 } else {
090 throw new RejectedExecutionException();
091 }
092 }
093 };
094
095 } else { // Multiple-processor mode.
096
097 // Need the barrier to notify the controlling thread when the
098 // Workers are done.
099 mBarrier = new CyclicBarrier(numThreads, new Runnable() {
100 public void run() {
101 // Method called after all workers have called await() on the
102 // barrier. The call to workersDone()
103 // unblocks the controlling thread.
104 workersDone();
105 }
106 });
107
108 // Set the executor to a fixed thread pool with
109 // threads that do not time out.
110 mExecutor = Executors.newFixedThreadPool(numThreads);
111 }
112 }
113
114 /**
115 * Make the cluster assignments.
116 *
117 * @return true if nothing went wrong.
118 */
119 boolean makeAssignments() {
120 mDoing = MAKING_ASSIGNMENTS;
121 return work();
122 }
123
124 /**
125 * Compute the distances between the coordinates and those centers
126 * with update flags set.
127 *
128 * @return true if nothing went wrong.
129 */
130 boolean computeDistances() {
131 mDoing = COMPUTING_DISTANCES;
132 return work();
133 }
134
135 /**
136 * Perform the current subtask, waiting until all the workers
137 * finish their part of the current task before returning.
138 *
139 * @return true if the subtask succeeded.
140 */
141 private boolean work() {
142 boolean ok = false;
143 // Set the working flag to true.
144 mWorking = true;
145 try {
146 if (mBarrier != null) {
147 // Resets the barrier so it can be reused if
148 // this is not the first call to this method.
149 mBarrier.reset();
150 }
151 // Now execute the run methods on the Workers.
152 for (int i = 0; i < mWorkers.length; i++) {
153 mExecutor.execute(mWorkers[i]);
154 }
155 if (mBarrier != null) {
156 // Block until the workers are done. The barrier
157 // triggers the unblocking.
158 waitOnWorkers();
159 // If the isBroken() method of the barrier returns false,
160 // no problems.
161 ok = !mBarrier.isBroken();
162 } else {
163 // No barrier, so the run() method of a single worker
164 // was called directly and everything must have worked
165 // if we made it here.
166 ok = true;
167 }
168 } catch (RejectedExecutionException ree) {
169 // Possibly thrown by the executor.
170 } finally {
171 mWorking = false;
172 }
173 return ok;
174 }
175
176 /**
177 * Called from work() to put the controlling thread into
178 * wait mode until the barrier calls workersDone().
179 */
180 private synchronized void waitOnWorkers() {
181 // It is possible for the workers to have finished so quickly that
182 // workersDone() has already been called. Since workersDone() sets
183 // mWorking to false, check this flag before going into wait mode.
184 // Not doing so could result in hanging the SubtaskManager.
185 while (mWorking) {
186 try {
187 // Blocks until workersDone() is called.
188 wait();
189 } catch (InterruptedException ie) {
190 // mBarrier.isBroken() will return true.
191 break;
192 }
193 }
194 }
195
196 /**
197 * Notifies the controlling thread that it can come out of
198 * wait mode.
199 */
200 private synchronized void workersDone() {
201 // If this gets called before waitOnWorkers(), setting this
202 // to false prevents waitOnWorkers() from entering a
203 // permanent wait.
204 mWorking = false;
205 notifyAll();
206 }
207
208 /**
209 * Shutdown the thread pool when k-means is finished.
210 */
211 void shutdown() {
212 if (mExecutor instanceof ThreadPoolExecutor) {
213 // This terminates the threads in the thread pool.
214 ((ThreadPoolExecutor) mExecutor).shutdownNow();
215 }
216 }
217
218 /**
219 * Returns the number of cluster assignment changes made in the
220 * previous call to makeAssignments().
221 */
222 int numberOfMoves() {
223 // Sum the contributions from the workers.
224 int moves = 0;
225 for (int i=0; i<mWorkers.length; i++) {
226 moves += mWorkers[i].numberOfMoves();
227 }
228 return moves;
229 }
230
231 /**
232 * The class which does the hard work of the subtasks.
233 */
234 private class Worker implements Runnable {
235
236 // Defines range of coordinates to cover.
237 private int mStartCoord, mNumCoords;
238
239 // Number of moves made by this worker in the last call
240 // to workerMakeAssignments(). The SubtaskManager totals up
241 // this value from all the workers in numberOfMoves().
242 private int mMoves;
243
244 /**
245 * Constructor
246 *
247 * @param startCoord index of the first coordinate covered by
248 * this Worker.
249 * @param numCoords the number of coordinates covered.
250 */
251 Worker(int startCoord, int numCoords) {
252 mStartCoord = startCoord;
253 mNumCoords = numCoords;
254 }
255
256 /**
257 * Returns the number of moves this worker made in the last
258 * execution of workerMakeAssignments()
259 */
260 int numberOfMoves() {
261 return mMoves;
262 }
263
264 /**
265 * The run method. It accesses the SubtaskManager field mDoing
266 * to determine what subtask to perform.
267 */
268 public void run() {
269
270 try {
271 switch (mDoing) {
272 case COMPUTING_DISTANCES:
273 workerComputeDistances();
274 break;
275 case MAKING_ASSIGNMENTS:
276 workerMakeAssignments();
277 break;
278 }
279 } finally {
280 // If there's a barrier, call its await() method. To
281 // ensure that it gets done, it's placed in the finally clause.
282 if (mBarrier != null) {
283 try {
284 mBarrier.await();
285 // barrier.isBroken() will return true if either of
286 // these exceptions happens, so the SubtaskManager
287 // will detect the problem.
288 } catch (InterruptedException ex) {
289 } catch (BrokenBarrierException ex) {
290 }
291 }
292 }
293
294 }
295
296 /**
297 * Compute the distances for the covered coordinates
298 * to the updated centers.
299 */
300 private void workerComputeDistances() {
301 int lim = mStartCoord + mNumCoords;
302 for (int i = mStartCoord; i < lim; i++) {
303 int numClusters = mProtoClusters.length;
304 for (int c = 0; c < numClusters; c++) {
305 ProtoCluster cluster = mProtoClusters[c];
306 if (cluster.getConsiderForAssignment() &&
307 cluster.needsUpdate()) {
308 mDistanceCache[i][c] = distance(mCoordinates[i],
309 cluster.getCenter());
310 }
311 }
312 }
313 }
314
315 /**
316 * Assign each covered coordinate to the nearest cluster.
317 */
318 private void workerMakeAssignments() {
319 mMoves = 0;
320 int lim = mStartCoord + mNumCoords;
321 for (int i = mStartCoord; i < lim; i++) {
322 int c = nearestCluster(i);
323 mProtoClusters[c].add(i);
324 if (mClusterAssignments[i] != c) {
325 mClusterAssignments[i] = c;
326 mMoves++;
327 }
328 }
329 }
330
331 }
332 }
At some point while reading this, you've probably wondered what synchronization issues might be encountered in SMT adaptation.
I encountered two with ConcurrentKMeans, both in the nested class ProtoCluster, which is a class K-means uses to track intermediate clustering results. It was clear to me that something was wrong, because
ConcurrentKMeans gave results different from BasicKMeans even though it was using the same N, K, and random seed. I ran it several times, occasionally getting an ArrayIndexOutOfBoundsException originating from the ProtoCluster method add(int ndx). Then the obvious dawned on me: multiple threads were calling an unsynchronized method. (Doh!) The exception happened because
one worker thread attempted to add a coordinate, while another thread was expanding the array holding the coordinate indices.
Simply adding the synchronized modifier to the add(int ndx) method definition fixed the problem.
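The failure mode is the classic unsynchronized grow-and-store race. This hypothetical reconstruction (not the article's actual ProtoCluster code) shows the shape of the fixed method; removing the synchronized keyword reintroduces the race:

```java
public class MemberList {

    private int[] mIndices = new int[4];
    private int mSize = 0;

    synchronized void add(int ndx) {
        if (mSize == mIndices.length) {
            // Grow the array. Without synchronization, another thread's
            // add() can store through the stale, smaller array mid-copy,
            // producing the ArrayIndexOutOfBoundsException described above.
            int[] bigger = new int[mIndices.length * 2];
            System.arraycopy(mIndices, 0, bigger, 0, mSize);
            mIndices = bigger;
        }
        mIndices[mSize++] = ndx;
    }

    synchronized int size() {
        return mSize;
    }

    public static void main(String[] args) throws InterruptedException {
        final MemberList list = new MemberList();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 1000; i++) {
                        list.add(i);
                    }
                }
            });
            threads[t].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(list.size()); // prints 4000
    }
}
```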
The second problem was more subtle. I was disappointed with the speedup when running with four threads on my dual-processor
Xeon system. It was only 30 percent faster than the single-threaded version. So, stepping through the code in debug mode,
I finally noticed that computeDistances() computed nearly all of the distances each time it was called. This method is supposed to compute only those distances to
clusters whose update flag is set to true. The reason it was doing more work than necessary was that ProtoCluster's setUpdateFlag() method was setting the flag regardless of whether the cluster had actually changed. It turned out that the two arrays in ProtoCluster, mPreviousMembership and mCurrentMembership, which setUpdateFlag() compares element by element, were in mismatched order: even though they contained the same indices, their elements were out of order, because the order in which multiple threads add them is undefined. In single-threaded mode this did not happen, since the one thread always added the coordinates in order. To mend the problem, I needed to add only a single line sorting mCurrentMembership before comparing its elements to the other array.
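The fix boils down to sorting before comparing. This standalone sketch, with made-up membership arrays, shows why an element-by-element comparison misfires on arrays that differ only in insertion order (it assumes, as in the article, that the previous membership is already in sorted order):

```java
import java.util.Arrays;

public class UpdateFlagDemo {

    /** True if the two membership arrays hold the same indices, ignoring order. */
    static boolean sameMembership(int[] previous, int[] current) {
        int[] sorted = current.clone();
        Arrays.sort(sorted); // the one-line fix
        return Arrays.equals(previous, sorted);
    }

    public static void main(String[] args) {
        int[] previous = {2, 5, 9}; // added in order by a single thread
        int[] current  = {9, 2, 5}; // same indices, thread-dependent order

        // Element-by-element comparison spuriously reports a change:
        System.out.println(Arrays.equals(previous, current)); // prints false

        // Sorting first correctly reports that nothing changed:
        System.out.println(sameMembership(previous, current)); // prints true
    }
}
```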
Now for the fun part: the results. The table below shows the results I obtained running the test program on my Dell workstation
with two hyper-threaded Xeon processors. The percentages shown in parentheses are reductions in processing time relative to BasicKMeans. As you can see, ConcurrentKMeans gains significantly in speed when run with either two or four threads. Since my system is capable of executing four threads simultaneously, the maximum time reduction is 64 percent, which amounts to almost a tripling of the speed. I also tested the program on a Panasonic laptop with a Centrino Duo processor; running it with two threads gave a time reduction of 47 percent.
Results on Dell PC with Two Xeon Processors
| Number of coordinates (N) | Number of clusters (K) | BasicKMeans processing time | ConcurrentKMeans, 1 thread | ConcurrentKMeans, 2 threads | ConcurrentKMeans, 4 threads |
| 25,000 | 300 | 88,300 ms | 88,252 ms | 48,595 ms (45 percent) | 31,610 ms (64 percent) |
| 10,000 | 100 | 8,907 ms | 8,922 ms | 5,000 ms (44 percent) | 3,328 ms (63 percent) |
| 5,000 | 50 | 1,813 ms | 1,812 ms | 1,031 ms (43 percent) | 719 ms (60 percent) |
| 1,000 | 10 | 78 ms | 78 ms | 62 ms (21 percent) | 63 ms (19 percent) |
I included the single thread column for concurrent K-means to demonstrate that the SMT adaptation did not incur a performance penalty in single-threaded mode. This is an important point, since algorithms that you adapt to SMT almost certainly still need to execute well on SMT-incapable systems.
This article should convince you that adapting your time-consuming tasks for concurrency can be well worth the effort. In fact, it may be imperative if your programs are to compete successfully on the platforms of the future. If you get in the habit of programming your tasks for concurrency now, as they are redeployed on newer and newer systems capable of running increasing numbers of threads at once, your programs will scream ahead of the single-threaded products of your competitors.