By Alex. C. Punnen
Architect – Nokia Siemens Networks
Bangalore
Hanging threads are a common challenge in the development of software that has to interface with proprietary devices using proprietary or standardized interfaces such as SNMP, Q3, or Telnet. This problem is not limited to network management but occurs across a wide range of fields like web servers, processes invoking remote procedure calls, and so on.
A thread that sends a request to a device needs a way to detect when the device does not respond, or responds only partially. When such a hang is detected, a specific action often has to be taken: retrying the task, telling the end user that the task failed, or some other recovery option. When a component must fire a large number of tasks at a large number of network elements, hanging-thread detection also matters for throughput, so that hung threads do not become a bottleneck for processing other tasks. So there are two aspects to managing hanging threads: performance and notification.
For the notification aspect we can tailor the Java Observer pattern to fit in the multithreaded world.
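As a rough illustration of that idea, here is a minimal sketch (Java 8+) of an Observer subject that is safe to use from several threads. The names HangListener, HangNotifier and fireThreadHung are hypothetical. Listeners are kept in a CopyOnWriteArrayList so they can register or unregister from any thread while a monitoring thread is firing events.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical observer interface: anything that wants to hear about hung threads implements it.
interface HangListener {
    void threadHung(String threadName, long stalledMillis);
}

// Subject side of the Observer pattern, adapted for concurrent use.
class HangNotifier {
    // Iteration works on a snapshot, so add/remove from other threads never causes
    // a ConcurrentModificationException while events are being fired.
    private final List<HangListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(HangListener listener) {
        listeners.add(listener);
    }

    void removeListener(HangListener listener) {
        listeners.remove(listener);
    }

    // Called by whichever component detects the hang (for example a monitoring thread).
    void fireThreadHung(String threadName, long stalledMillis) {
        for (HangListener listener : listeners) {
            try {
                listener.threadHung(threadName, stalledMillis);
            } catch (RuntimeException e) {
                // One misbehaving listener must not stop the others from being notified;
                // real code would log this.
            }
        }
    }
}
```

Listeners run on the notifying thread, so a slow listener delays the monitor; handing notifications to a separate executor is one way around that.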
Because of hanging tasks, the first solution that comes to mind is the Java thread pool (ThreadPoolExecutor) with a suitable strategy. However, when some threads randomly hang over a period of time, the Java thread pool shows unwanted behavior that depends on the particular strategy used, such as thread starvation with a fixed thread pool. This is mainly because the Java thread pool has no mechanism to detect a hung thread.
We could try a cached thread pool, but it also has problems. If tasks are fired at a high rate and some threads hang, the number of threads can shoot up, eventually causing resource starvation and out-of-memory errors. Or we could use a custom ThreadPoolExecutor configuration with a CallerRunsPolicy. In this case, too, a thread hang could eventually cause all threads to hang. (The main thread should never be the caller, since any task passed to the main thread might hang, causing everything to grind to a halt.)
So, what is the solution? I'll demonstrate a not-so-simple ThreadPool pattern that adjusts the pool size according to the task rate and the number of hanging threads. Let's first look at the problem of detecting hanging threads.
Figure 1 shows an abstraction of the pattern:

There are two important classes here: ThreadManager and ManagedThread. Both extend the Java Thread class. The ThreadManager holds a container of ManagedThreads. When a new ManagedThread is created, it adds itself to this container. In the following snippet a test thread is started and registered with the manager, and then the manager itself is started:

ThreadHangTester testthread = new ThreadHangTester("threadhangertest", 2000, false);
testthread.start();
thrdManger.manage(testthread, ThreadManager.RESTART_THREAD, 10);
thrdManger.start();

The ThreadManager iterates through this list and calls each ManagedThread's isHung() method, which is basically a timestamp check:
if (System.currentTimeMillis() - lastprocessingtime.get() > maxprocessingtime) {
    logger.debug("Thread is hung");
    return true;
}

If it finds that a thread has gone into a task loop and never updated its results, it takes the recovery action that was registered for that ManagedThread:
while (isRunning) {
    // managedThreads must tolerate additions during iteration (for example a
    // ConcurrentLinkedQueue), because manage() below adds to it inside this loop
    for (Iterator<ManagedThreadData> iterator = managedThreads.iterator(); iterator.hasNext();) {
        ManagedThreadData thrddata = iterator.next();
        if (thrddata.getManagedThread().isHung()) {
            logger.warn("Thread Hang detected for ThreadName=" + thrddata.getManagedThread().getName());
            switch (thrddata.getManagedAction()) {
                case RESTART_THREAD: // The action here is to restart the thread
                    // remove from the manager
                    iterator.remove();
                    // stop the processing of this thread if possible
                    thrddata.getManagedThread().stopProcessing();
                    // to know which type of thread to create
                    if (thrddata.getManagedThread().getClass() == ThreadHangTester.class) {
                        // create a new thread
                        ThreadHangTester newThread = new ThreadHangTester("restarted_ThrdHangTest", 5000, true);
                        newThread.start();
                        // add it back to be managed
                        manage(newThread, thrddata.getManagedAction(), thrddata.getThreadChecktime());
                    }
                    break;
                // .........

For a new ManagedThread to be created and used in place of the hung one, the ManagedThread must not hold any state or any container of its own. For this, the container on which the ManagedThread acts should be separated out. Here we use the enum-based Singleton pattern to hold the task list, so the container holding the tasks is independent of the thread processing them. The source for the pattern described here can be downloaded: Java Thread Manager Source.
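To make the moving parts concrete, here is a minimal, self-contained sketch (Java 8+) of the same idea: stateless worker threads that stamp a heartbeat, an enum-based Singleton holding the tasks, and one pass of a monitoring loop that replaces hung workers. It is not the downloadable source; SharedTasks, HeartbeatWorker, HangMonitor and checkOnce are hypothetical names, and it assumes a worker counts as hung when it has not gone back to the queue within maxProcessingTime (which must be larger than the 50 ms poll timeout).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Enum-based Singleton: the task container lives outside the workers,
// so a hung worker can be replaced without losing queued tasks.
enum SharedTasks {
    INSTANCE;
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    void add(Runnable task) { tasks.add(task); }

    Runnable poll(long timeoutMillis) throws InterruptedException {
        return tasks.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}

// Stateless worker that stamps a heartbeat every time it goes back to the queue.
class HeartbeatWorker extends Thread {
    private final AtomicLong lastProcessingTime = new AtomicLong(System.currentTimeMillis());
    final long maxProcessingTime;
    private volatile boolean running = true;

    HeartbeatWorker(String name, long maxProcessingTime) {
        super(name);
        this.maxProcessingTime = maxProcessingTime;
        setDaemon(true); // an abandoned, hung worker must not keep the JVM alive
    }

    boolean isHung() {
        return System.currentTimeMillis() - lastProcessingTime.get() > maxProcessingTime;
    }

    void stopProcessing() {
        running = false;
        interrupt(); // best effort: only helps if the task responds to interruption
    }

    @Override
    public void run() {
        while (running) {
            lastProcessingTime.set(System.currentTimeMillis()); // heartbeat
            try {
                Runnable task = SharedTasks.INSTANCE.poll(50);
                if (task != null) {
                    task.run(); // no heartbeat while this runs, so a stuck task ages the stamp
                }
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}

// One pass of the manager loop: replace each hung worker with a fresh one.
class HangMonitor {
    final List<HeartbeatWorker> workers = new ArrayList<>();
    int restarts;

    void checkOnce() {
        List<HeartbeatWorker> replacements = new ArrayList<>();
        for (Iterator<HeartbeatWorker> it = workers.iterator(); it.hasNext();) {
            HeartbeatWorker w = it.next();
            if (w.isHung()) {
                it.remove();
                w.stopProcessing();
                HeartbeatWorker fresh = new HeartbeatWorker(w.getName() + "-restarted", w.maxProcessingTime);
                fresh.start();
                replacements.add(fresh);
                restarts++;
            }
        }
        workers.addAll(replacements); // added after the loop to avoid ConcurrentModificationException
    }
}
```

Because the tasks live in SharedTasks rather than in the worker, the replacement simply continues with whatever is queued. stopProcessing() can only interrupt; a task blocked in uninterruptible I/O keeps its thread, which is why the worker is a daemon that can be abandoned.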
The Java thread pool does not have a mechanism for detecting hanging threads. Using a strategy like a fixed thread pool (Executors.newFixedThreadPool()) will not work, because if some tasks hang over time, all of the threads will eventually be in a hung state. Another option is a cached thread pool (Executors.newCachedThreadPool()). This ensures that there will always be a thread available to process a task, constrained only by VM memory, CPU, and thread limits. However, with this policy there is no control over the number of threads that get created. Regardless of whether a processing thread hangs, using this policy while the task rate is high leads to a huge number of threads being created. If the JVM does not have enough resources, you will very soon hit the maximum memory threshold or high CPU. It is pretty common to see the number of threads hit hundreds or thousands. Even though idle threads are eventually released (a cached pool keeps an idle thread for 60 seconds), during burst handling the sheer number of threads can overwhelm system resources.
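The starvation effect of a fixed pool, compared with a cached pool, can be seen with a small check like the following sketch (Java 8+; StarvationCheck and quickTaskRuns are hypothetical names). A latch that is never counted down stands in for a device that never answers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class StarvationCheck {
    // Submits `hanging` tasks that block forever, then one quick task, and reports
    // whether the quick task ran within waitMillis. The pool is shut down afterwards.
    static boolean quickTaskRuns(ExecutorService pool, int hanging, long waitMillis) {
        CountDownLatch never = new CountDownLatch(1); // never counted down: a device that never answers
        for (int i = 0; i < hanging; i++) {
            pool.execute(() -> {
                try {
                    never.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // shutdownNow() releases the "hung" task
                }
            });
        }
        CountDownLatch ran = new CountDownLatch(1);
        pool.execute(ran::countDown);
        try {
            return ran.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // interrupt the blocked tasks so the demo can exit
        }
    }
}
```

With Executors.newFixedThreadPool(2) and two hanging tasks, the quick task waits in the queue behind two blocked workers and never runs; Executors.newCachedThreadPool() simply starts a third thread, which is also why its thread count grows without bound under load.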
A third option is to use custom strategies or policies. One such option is a thread pool that scales from 0 to some maximum number of threads, so even if one thread hangs, a new thread is created as long as the maximum thread count has not been reached:

execexec = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

Here 3 is the maximum thread count, and the keep-alive time is set to 60 seconds because this is a task-intensive process. With a high enough maximum thread count, this is a more or less reasonable policy to use in the context of hanging tasks. The only problem is that if the hanging threads are never released, there is a slight chance that all threads will hang at some point; from then on, new tasks are rejected with a RejectedExecutionException (the default AbortPolicy). If the maximum thread count is sufficiently high and task hangs are infrequent, this policy fits the bill.
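The following sketch (hypothetical names, Java 8+) shows what happens to the 0-to-maximum pool above when every task hangs: each new task gets a fresh thread until the maximum is reached, after which the default AbortPolicy rejects further tasks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolCheck {
    // Fires `tasks` tasks that never finish at a 0..maxThreads pool backed by a
    // SynchronousQueue and returns how many were rejected once every thread was stuck.
    static int rejectedWhenAllHang(int maxThreads, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, maxThreads, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch never = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            never.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // default AbortPolicy: no idle thread and no room to add one
                }
            }
        } finally {
            pool.shutdownNow(); // release the blocked tasks
        }
        return rejected;
    }
}
```

For example, rejectedWhenAllHang(3, 5) returns 2: the first three tasks each get their own thread, and the remaining two are rejected.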
It would have been sweet if the Java thread pool also had a pluggable mechanism for detecting hanging threads. I'll discuss one such design later. Of course, if all the threads are frozen, you could configure and use the rejected-task policy of the thread pool. If you do not want to discard the tasks, you would have to use the CallerRunsPolicy:

execexec = new ThreadPoolExecutor(0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());

In this case, if a thread hang caused a task to be rejected, that task would be handed to the calling thread. There is always a chance of that task hanging too, in which case the entire process would freeze. So it is better not to use such a policy in this context.
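A small sketch (hypothetical names, Java 8+) makes the risk of CallerRunsPolicy visible: with the only worker stuck, the rejected task runs synchronously on the thread that called execute(). Had that task hung as well, the caller would be stuck too.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsCheck {
    // Returns the name of the thread that ran the task rejected by a saturated pool.
    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 1, 250, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch never = new CountDownLatch(1);
        AtomicReference<String> runner = new AtomicReference<>();
        try {
            pool.execute(() -> { // occupies the only worker forever
                try {
                    never.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Rejected, so CallerRunsPolicy runs it right here, inside execute().
            pool.execute(() -> runner.set(Thread.currentThread().getName()));
        } finally {
            pool.shutdownNow();
        }
        return runner.get();
    }
}
```

Calling threadThatRanOverflowTask() from the main thread returns "main", which is exactly the situation warned against above.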
The following notification processor shows a thread pool plugged into a task-processing loop; the commented-out lines are the alternatives discussed above:

public class NotificationProcessor implements Runnable {
    private final NotificationOriginator notificationOriginator;
    volatile boolean isRunning = true;
    private final ExecutorService execexec;

    NotificationProcessor(NotificationOriginator norginator) { // ctor
        notificationOriginator = norginator;
        // execexec = Executors.newCachedThreadPool(); // Too many threads
        // execexec = Executors.newFixedThreadPool(2); // No hang task detection
        execexec = new ThreadPoolExecutor(0, 4, 250, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void run() {
        while (isRunning) {
            try {
                final Task task = TaskQueue.INSTANCE.getTask();
                // alarmid and nbialarmnew are defined in code elided from this excerpt
                Runnable thisTrap = new Runnable() {
                    public void run() {
                        ++alarmid;
                        notificationOriginator.notify(new OctetString(), // Task processing
                                nbialarmnew.getOID(),
                                nbialarmnew.createVariableBindingPayload());
                        // ........
                    }
                };
                execexec.execute(thisTrap);
            }
            // ... (catch clause and the rest of the class elided)

A thread pool library with the capability of task-hang detection and handling would be great to have. I've developed one, and I'll demonstrate it below. It is actually a port of a C++ thread pool that I designed and used some time back (see references). Basically, this solution uses the Command pattern and the Chain of Responsibility pattern. However, implementing the Command pattern in Java without the aid of function objects is a bit difficult, so I had to change the implementation slightly to use Java reflection. Note that this pattern was designed for a context in which a thread pool had to be fitted in, or plugged in, without modifying any of the existing classes. (I believe the one big benefit of object-oriented programming is that it gives us a way to design classes that make effective use of the Open Closed Principle. This is especially true of complex legacy code and might be of less relevance for new product development.) Hence I used reflection instead of an interface to implement the Command pattern. The rest of the code could be ported without major changes, as almost all the thread synchronization and signaling primitives are available from Java 1.5 onward.
public class Command<T> {
    private Object[] argParameter;
    // ........ (fields m_objptr, m_methodName, m_timeout, m_key and other members elided)

    // Ctor for a method with two args
    Command(T pObj, String methodName, long timeout, String key, int arg1, int arg2) {
        m_objptr = pObj;
        m_methodName = methodName;
        m_timeout = timeout;
        m_key = key;
        argParameter = new Object[2];
        argParameter[0] = arg1; // autoboxed to Integer
        argParameter[1] = arg2;
    }

    // Calls the method of the object via reflection (java.lang.reflect.Method)
    void execute() {
        Class<?> klass = m_objptr.getClass();
        Class<?>[] paramTypes = new Class<?>[] { int.class, int.class };
        try {
            Method method = klass.getMethod(m_methodName, paramTypes);
            if (argParameter.length == 2) {
                method.invoke(m_objptr, argParameter[0], argParameter[1]);
            }
        } catch (ReflectiveOperationException e) {
            // ........ (error handling elided)
        }
    }
    // ........
}

Here is an example of how this pattern is used:

public class CTask {
    // ..
    public int DoMultiplication(int a, int b) { ... }
}

Command<CTask> cmd4 = new Command<CTask>(task4, "DoMultiplication", 1, "key2", 2, 5);

Here task4 is a CTask instance, and the command will call task4.DoMultiplication(2, 5) with a timeout of 1 second.
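For experimentation, here is a self-contained variant of the reflective Command (a sketch, not the downloadable source). It accepts any number of int arguments through varargs and returns the result of the invoked method. ReflectiveCommand and the stand-in CTask below are hypothetical, and the key parameter of the original constructor is left out.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical stand-in for the CTask target class.
class CTask {
    public int DoMultiplication(int a, int b) {
        return a * b;
    }
}

// Target object + method name + int arguments, bound now and invoked later
// by whichever pool thread picks the command up.
class ReflectiveCommand<T> {
    private final T target;
    private final String methodName;
    private final long timeoutSeconds; // carried for the pool's hang check; not used in execute()
    private final Object[] args;

    ReflectiveCommand(T target, String methodName, long timeoutSeconds, int... intArgs) {
        this.target = target;
        this.methodName = methodName;
        this.timeoutSeconds = timeoutSeconds;
        this.args = new Object[intArgs.length];
        for (int i = 0; i < intArgs.length; i++) {
            this.args[i] = intArgs[i]; // autoboxed to Integer for Method.invoke
        }
    }

    long getTimeoutSeconds() {
        return timeoutSeconds;
    }

    // Looks up a public method with one int parameter per argument and calls it.
    Object execute() throws ReflectiveOperationException {
        Class<?>[] paramTypes = new Class<?>[args.length];
        Arrays.fill(paramTypes, int.class);
        Method method = target.getClass().getMethod(methodName, paramTypes);
        return method.invoke(target, args);
    }
}
```

new ReflectiveCommand<CTask>(new CTask(), "DoMultiplication", 1, 2, 5).execute() returns 10; a misspelled method name surfaces as a NoSuchMethodException only at execution time, which is the price paid for not requiring an interface.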
Now we have two more important classes here. One is the ThreadChain class, which implements the Chain of Responsibility pattern:

public class ThreadChain<T> implements Runnable {
    public ThreadChain(ThreadChain p, ThreadPool pool, String name) {
        AddRef();
        deleteMe = false;
        busy = false; //--> very important
        next = p; // set the thread chain - note this is like a linked list impl
        threadpool = pool; // set the thread pool - root of the thread pool
        // ........
        threadId = ++ThreadId;
        // ......
        // start the thread
        thisThread = new Thread(this, name + inttid.toString());
        thisThread.start();
    }
This class has two main methods. One is Boolean canHandle(), which is first called by the ThreadPool class and then proceeds recursively along the chain. It checks whether the current thread (the current ThreadChain instance) is free to handle the task. If it is already handling a task, it calls the next one in the chain.

public Boolean canHandle() {
    if (!busy) { // If not busy
        System.out.println("Can Handle This Event in id=" + threadId);
        // todo signal an event
        try {
            condLock.lock();
            condWait.signal(); // Signal the HandleRequest which is waiting for this in the run method
            // .........................................
            return true;
        }
        // .........................................
    }
    /// Else see if the next object in the chain is free
    /// to handle the request
    return next.canHandle();

Note that HandleRequest is a method of ThreadChain that is invoked from the thread's run() method and awaits the signal from the canHandle method. Also note how the task is handled via the Command pattern.
void HandleRequest() {
    // .......
    while (!deleteMe) {
        try {
            // ............................
            condLock.lock();
            condWait.await(); /// This event is signalled from the canHandle method
            condLock.unlock();
            // ............................
            starttime.set(System.currentTimeMillis()); /// Set the start time (AtomicLong), used to check whether the thread is hung
            // ............................
            // Now mark this object as busy; it is going to handle a request
            busy = true;
            // ............................
            Command<T> temp = threadpool.getRequest();
            m_timeout = temp.GetTimeOut() * 1000; /// The timeout is in seconds
            temp.execute();

Each ThreadChain object also checks whether its associated thread is in a hung state; if so, it sets the deleteMe flag to unlink itself from the chain.
void HandleHungThreads(ThreadChain prev) {
    ThreadChain next_p = GetNext();
    boolean bIsHung = false;
    // Basically resetting the links if one in the chain is hanging, akin to deleting a node in a linked list
    if (IsHung() || ReleaseIdleThread()) {
        bIsHung = true;
        if (this == threadpool.root) { /// case if root is hanging
            threadpool.root = next_p;
            prev = next_p;
        } else {
            prev.next = GetNext(); // remove this item from the thread chain link
        }
    }
    if (bIsHung) { /// if this is a hung thread that is waking up
        Release();
    }
    // ...........
    // Propagating the call to the next in the chain
    if (bIsHung) {
        next.HandleHungThreads(prev);
    } else {
        next.HandleHungThreads(this);
    }
Note that threads are added and removed in the ThreadChain class itself. For example, canHandle() adds a thread at the end of the chain when every existing thread is busy:

public Boolean canHandle() {
    // ............................
    if (busy) { /// if the associated thread is already processing a request
        if (next == null) {
            /// Means there is no next thread, so check if threads should be added;
            /// if the thread count is already at the maximum, nothing is to be done
            if (threadpool.m_threadCount < threadpool.maxthreadCount) {
                next = new ThreadChain<T>(null, threadpool); /// create a new thread; only one is added at a time
            }
            // ....................................

The last class here is the ThreadPool class. It acts as a sort of controller, though most of the work of detecting hanging threads and increasing or decreasing the thread count is done by the ThreadChain class. The ThreadPool holds the container with the task queue and basically starts up the chain for processing:
public ThreadPool(int _minthreadcount, int _maxthreadcount, long maxtimeout) {
    // .......
    // Creating the initial ThreadChain
    // There is at least one thread in the thread pool
    ThreadChain prev = new ThreadChain(null, this, "root"); // last node
    root = prev;
    // Initially only create the minimum threads needed
    for (int i = 0; i < minthreadCount - 1; ++i) {
        ThreadChain temp = new ThreadChain(prev, this); // the new node links to the previous one
        prev = temp;
        root = temp; // each new node becomes the head (root) of the chain
    }

The finer details are in the source code. Note that the custom ThreadPool has not been used in any production code; I ported it just for fun. So the provided source serves as an illustration only: Java ThreadPool Source.
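The linked-list mechanics of the ThreadChain are easier to see without threads and locks. The following is a simplified, single-threaded model (a sketch with hypothetical names, not the ported source): each node stands in for one pool thread, dispatch() walks the chain the way canHandle() does and grows it by one node up to a maximum, and pruneHung() unlinks nodes whose task has run too long, as HandleHungThreads() does. Time is passed in explicitly so the behavior is deterministic.

```java
// Single-threaded model of the ThreadChain idea; no real threads are started.
class ChainModel {
    static final class Node {
        boolean busy;
        long startTime; // when the node took its current task
        Node next;
    }

    private final int maxNodes;
    private final long maxTaskMillis;
    private Node root;
    private int count;

    ChainModel(int minNodes, int maxNodes, long maxTaskMillis) {
        this.maxNodes = maxNodes;
        this.maxTaskMillis = maxTaskMillis;
        for (int i = 0; i < minNodes; i++) { // prepend, as the ThreadPool constructor does
            Node n = new Node();
            n.next = root;
            root = n;
            count++;
        }
    }

    int size() { return count; }

    // Entry point, like ThreadPool asking the root to canHandle(). Returns the node that
    // took the task, or null if every node is busy and the chain is at maxNodes.
    Node dispatch(long now) {
        if (root == null) { // every node was pruned as hung
            if (count >= maxNodes) return null;
            root = new Node();
            count++;
        }
        return canHandle(root, now);
    }

    private Node canHandle(Node node, long now) {
        if (!node.busy) {
            node.busy = true;
            node.startTime = now;
            return node;
        }
        if (node.next == null) {
            if (count >= maxNodes) return null; // already at the maximum: nothing to add
            node.next = new Node();             // grow by one node at the tail
            count++;
        }
        return canHandle(node.next, now);        // pass the request down the chain
    }

    void finish(Node node) { node.busy = false; }

    // Unlink nodes whose current task has run longer than maxTaskMillis,
    // the same way a node is deleted from a singly linked list.
    int pruneHung(long now) {
        int removed = 0;
        Node prev = null;
        for (Node cur = root; cur != null; cur = cur.next) {
            if (cur.busy && now - cur.startTime > maxTaskMillis) {
                if (prev == null) root = cur.next; else prev.next = cur.next;
                count--;
                removed++;
            } else {
                prev = cur;
            }
        }
        return removed;
    }
}
```

With new ChainModel(2, 3, 1000), three dispatches fill the chain (the third grows it to the maximum), a fourth returns null, and after one node is finished and reused at t=100, pruneHung(1050) removes the two nodes that have been busy since t=0.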
This section compares thread creation and release behavior under the various Java thread pool strategies discussed. To test each strategy, I fired a thousand tasks, some of which randomly hung. After about five seconds the number of tasks executed was printed out.
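A scaled-down version of this experiment can be scripted roughly as follows. This is a sketch, not the harness used for the results that follow; PoolExperiment and its parameters are hypothetical, and every hangEvery-th task blocks until the pool is shut down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

class PoolExperiment {
    // Fires `tasks` tasks at the pool; every `hangEvery`-th task blocks until shutdown.
    // Waits `waitMillis`, then returns {completed quick tasks, largest pool size seen}.
    static int[] run(ThreadPoolExecutor pool, int tasks, int hangEvery, long waitMillis) {
        CountDownLatch never = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            boolean hang = i % hangEvery == 0;
            pool.execute(() -> {
                if (hang) {
                    try {
                        never.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    completed.incrementAndGet();
                }
            });
        }
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int[] result = { completed.get(), pool.getLargestPoolSize() };
        pool.shutdownNow(); // release the hung tasks
        return result;
    }
}
```

For example, a fixed pool of two threads with every second task hanging completes exactly one quick task before both threads are stuck, while a cached pool completes all of them at the cost of one extra thread per hung task.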
ExecutorService pool = Executors.newFixedThreadPool(100);

With enough threads, this strategy should cover the task-hang problem, though the task execution rate is pretty low. The only problem is that this many threads stay alive regardless of the number of tasks, which is overhead.

ExecutorService pool = Executors.newCachedThreadPool();

Note that some tasks hang, which is why some threads are still alive. Here the number of threads equals the number of tasks fired. If the task rate is high, or during burst conditions, this immediately becomes a problem: you can see the number of threads shooting up into the thousands.

ExecutorService pool = new ThreadPoolExecutor(0, 100, 250, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());

Don't use this one, as the calling thread may hang.
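ExecutorService pool = new ThreadPoolExecutor(10, 100, 250,
        TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10000));

Once all the threads hang, no more are created. This is because ThreadPoolExecutor creates threads beyond the core size (10 here) only when the queue is full, so until 10,000 tasks are waiting, the pool never grows past its 10 hung threads.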

ExecutorService pool = new ThreadPoolExecutor(10, 1000, 250, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());

Make sure that you handle the rejected-task exception, and keep the maximum thread count high enough for good performance:

try {
    pool.execute(task4);
} catch (RejectedExecutionException e) {
    // .....
}

Finally, the custom ThreadPool described earlier is created like this:

ThreadPool<CTask> tpool = new ThreadPool<CTask>(2, 100, 1000);

Here you can see that the threads are managed more actively and intelligently, and hanging threads are detected and removed. Hopefully you got some ideas from all of this!