Java - Hanging Thread Detection and Handling

By Alex. C. Punnen

Architect – Nokia Siemens Networks

Bangalore

Hanging threads are a common challenge in the development of software that has to interface with proprietary devices using proprietary or standardized interfaces such as SNMP, Q3, or Telnet. This problem is not limited to network management but occurs across a wide range of fields like web servers, processes invoking remote procedure calls, and so on.

A thread that initiates a request to a device needs a mechanism to detect when the device does not respond or responds only partially. In some cases a detected hang calls for a specific action: retrying the request, letting the end user know that the task failed, or some other recovery option. When a component must fire a large number of tasks at a large number of network elements, hanging thread detection is also important so that a hung thread does not become a bottleneck for the processing of other tasks. So there are two aspects to managing hanging threads: performance and notification.

For the notification aspect we can tailor the Java Observer pattern to fit in the multithreaded world.
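One way to make an Observer safe to use from several threads is to keep the listeners in a concurrent collection, so that a monitoring thread can notify while other threads register. The following is only a minimal sketch under that assumption (Java 8 or later); HangListener and HangNotifier are hypothetical names and are not taken from the source that accompanies this article.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical observer interface: called when a thread is considered hung.
interface HangListener {
    void threadHung(String threadName);
}

// Hypothetical subject: listeners may be added while another thread is notifying.
class HangNotifier {
    // CopyOnWriteArrayList lets notifyHung() iterate safely during concurrent adds.
    private final List<HangListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(HangListener listener) {
        listeners.add(listener);
    }

    // Called by whichever thread detects the hang.
    void notifyHung(String threadName) {
        for (HangListener listener : listeners) {
            listener.threadHung(threadName);
        }
    }
}
```

A listener could retry the task or report the failure to the end user; which of the two is appropriate depends on the application.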

Tailoring the Java Observer Pattern to Multithreaded Systems

When tasks can hang, using a Java thread pool (java.util.concurrent.ThreadPoolExecutor) with a fitting strategy is the first solution that comes to mind. However, when some threads randomly hang over a period of time, a thread pool gives unwanted behavior that depends on the particular strategy used, such as thread starvation in the case of a fixed thread pool. This is mainly because the Java thread pool has no mechanism to detect a thread hang.

We could try a cached thread pool, but it also has problems. If tasks are fired at a high rate and some threads hang, the number of threads could shoot up, eventually causing resource starvation and out-of-memory errors. Or we could use a custom ThreadPool strategy with a CallerRunsPolicy. In this case, too, a thread hang could eventually cause all threads to hang. (The main thread should never be the caller, since any task passed to the main thread could hang, causing everything to grind to a halt.)
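The starvation mentioned above for a fixed thread pool can be reproduced in a few lines. This is a rough sketch, not code from the article's source: FixedPoolStarvationDemo is a hypothetical name, and a CountDownLatch that is never released stands in for a device that never answers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class FixedPoolStarvationDemo {
    // Returns true if a normal task gets to run within waitMillis even though
    // hungTasks tasks are blocked in a fixed pool of poolSize threads.
    static boolean normalTaskRuns(int poolSize, int hungTasks, long waitMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch never = new CountDownLatch(1); // simulated silent device
        CountDownLatch ran = new CountDownLatch(1);
        try {
            for (int i = 0; i < hungTasks; i++) {
                pool.execute(() -> {
                    try {
                        never.await(); // simulated hang
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            pool.execute(ran::countDown); // the normal task
            return ran.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // interrupts the simulated hangs so the JVM can exit
        }
    }
}
```

With two threads and two hung tasks the normal task stays in the queue; with only one hung task it still gets a thread.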

So, what is the solution? I'll demonstrate a not-so-simple ThreadPool pattern that adjusts the pool size according to the task rate and the number of hanging threads. Let us first look at the problem of detecting hanging threads.

Detecting Hanging Threads

Figure 1 shows an abstraction of the pattern:

There are two important classes here: ThreadManager and ManagedThread. Both extend the Java Thread class. The ThreadManager holds a container of ManagedThreads. When a new ManagedThread is created, it is added to this container:


ThreadHangTester testthread = new ThreadHangTester("threadhangertest",2000,false);
testthread.start();
thrdManger.manage(testthread, ThreadManager.RESTART_THREAD, 10);
thrdManger.start();

The ThreadManager iterates through this list and calls each ManagedThread's isHung() method, which is basically a timestamp check:


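public boolean isHung() {
    // Hung if the thread has not reported progress within the allowed time
    if (System.currentTimeMillis() - lastprocessingtime.get() > maxprocessingtime) {
        logger.debug("Thread is hung");
        return true;
    }
    return false;
}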
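The timestamp check only works if the worker keeps refreshing the timestamp while it makes progress. The sketch below shows both halves of that contract in isolation. Heartbeat, beat() and isHungAt() are hypothetical names chosen for illustration; only the two field names are borrowed from the fragment above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not the article's ManagedThread.
class Heartbeat {
    private final AtomicLong lastprocessingtime = new AtomicLong(System.currentTimeMillis());
    private final long maxprocessingtime; // allowed silence in milliseconds

    Heartbeat(long maxprocessingtime) {
        this.maxprocessingtime = maxprocessingtime;
    }

    // Called by the worker thread whenever it completes a unit of work.
    void beat() {
        lastprocessingtime.set(System.currentTimeMillis());
    }

    // Called by the monitoring thread.
    boolean isHung() {
        return isHungAt(System.currentTimeMillis());
    }

    // Same check against an explicit clock value, which keeps the logic easy to test.
    boolean isHungAt(long nowMillis) {
        return nowMillis - lastprocessingtime.get() > maxprocessingtime;
    }
}
```

An AtomicLong is used because the worker writes the timestamp and the monitor reads it from a different thread.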

If it finds that a thread has gone into a task loop and has not updated its last-processing timestamp, it applies the recovery action stipulated for that ManagedThread:


while (isRunning)
{
    for (Iterator<ManagedThreadData> iterator = managedThreads.iterator(); iterator.hasNext();) {
        ManagedThreadData thrddata = iterator.next();

        if (thrddata.getManagedThread().isHung())
        {
            logger.warn("Thread Hang detected for ThreadName=" + thrddata.getManagedThread().getName());

            switch (thrddata.getManagedAction())
            {
                case RESTART_THREAD: // The action here is to restart the thread
                    // remove from the manager
                    iterator.remove();
                    // stop the processing of this thread if possible
                    thrddata.getManagedThread().stopProcessing();

                    if (thrddata.getManagedThread().getClass() == ThreadHangTester.class) // To know which type of thread to create
                    {
                        ThreadHangTester newThread = new ThreadHangTester("restarted_ThrdHangTest", 5000, true); // Create a new thread
                        newThread.start();
                        // add it back to be managed
                        manage(newThread, thrddata.getManagedAction(), thrddata.getThreadChecktime());
                    }
                    break;
.........

For a new ManagedThread to be created and used in place of the hung one, the thread should not hold any state or own any container. To achieve this, the container on which the ManagedThread acts must be separated out. Here we use an enum-based singleton to hold the task list, so the container holding the tasks is independent of the thread processing them. Click the following link to download the source for the pattern described: Java Thread Manager Source.
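To illustrate the enum-based singleton idea, here is a small sketch of a task container that lives outside any worker thread. SharedTaskQueue and its methods are hypothetical and are not the TaskQueue from the downloadable source; the assumption is simply that tasks are Runnables kept in a thread-safe queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical enum-based singleton holding the pending tasks.
enum SharedTaskQueue {
    INSTANCE;

    // Thread-safe, so any worker (original or replacement) can take tasks.
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    void addTask(Runnable task) {
        tasks.add(task);
    }

    // Returns the next task, or null if none is queued.
    Runnable pollTask() {
        return tasks.poll();
    }

    int size() {
        return tasks.size();
    }
}
```

Because the queue is not a field of the worker, a replacement thread created after a hang can continue with the remaining tasks without copying any state from the thread it replaces.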

Hanging Threads and Java ThreadPool Strategies

The Java ThreadPool does not have a mechanism for detecting hanging threads. A fixed thread pool (Executors.newFixedThreadPool()) will not work, because if some tasks hang over time, all of the threads will eventually be in a hung state. Another option is a cached thread pool (Executors.newCachedThreadPool()). This ensures that there will always be threads available to process a task, constrained only by VM memory, CPU, and thread limits. However, with this policy there is no control over the number of threads that get created. Regardless of whether a processing thread hangs or not, using this policy while the task rate is high leads to a huge number of threads being created. If the JVM does not have enough resources, you will very soon hit the maximum memory threshold or high CPU usage. It is pretty common to see the number of threads reach the hundreds or thousands. Even though threads are released after their tasks are processed, during burst handling the high number of threads can overwhelm system resources.
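The unbounded growth of a cached pool is easy to observe when every task blocks. In the sketch below, CachedPoolGrowthDemo is a hypothetical name; the executor is built by hand with core size 0, an unbounded maximum, and a SynchronousQueue, which mirrors what the OpenJDK implementation of Executors.newCachedThreadPool() does, so that getPoolSize() can be called without a cast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class CachedPoolGrowthDemo {
    // Submits hungTasks blocked tasks and returns how many threads the pool created.
    static int threadsCreated(int hungTasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch never = new CountDownLatch(1); // simulated silent device
        try {
            for (int i = 0; i < hungTasks; i++) {
                pool.execute(() -> {
                    try {
                        never.await(); // simulated hang
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // No thread is ever idle, so each submission had to start a new thread.
            return pool.getPoolSize();
        } finally {
            pool.shutdownNow(); // interrupt the blocked tasks
        }
    }
}
```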

A third option is using custom strategies or policies. One such option is a thread pool that scales from 0 to some maximum number of threads. Even if one thread hangs, a new thread is created for the next task as long as the maximum thread count has not been reached:


execexec = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

Here 3 is the maximum thread count, and the keep-alive time is set to 60 seconds as this is a task-intensive process. Given a high enough maximum thread count, this is more or less a reasonable policy to use in the context of hanging tasks. The only problem is that if the hanging threads are never released, there is a slight chance that all threads could hang at some point. If the maximum thread count is sufficiently high and task hangs are infrequent, this policy fits the bill.
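What happens once every thread of such a bounded pool is hung can be seen in the following sketch. BoundedPoolDemo is a hypothetical name, and the default rejection handler (AbortPolicy) is assumed, which throws RejectedExecutionException when no thread can take the task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class BoundedPoolDemo {
    // Returns how many of hungTasks submissions were rejected by a 0..maxThreads pool.
    static int rejectedCount(int maxThreads, int hungTasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, maxThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch never = new CountDownLatch(1); // simulated silent device
        Runnable hang = () -> {
            try {
                never.await(); // simulated hang
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            for (int i = 0; i < hungTasks; i++) {
                try {
                    pool.execute(hang);
                } catch (RejectedExecutionException e) {
                    rejected++; // every thread is busy and the queue holds nothing
                }
            }
            return rejected;
        } finally {
            pool.shutdownNow(); // interrupt the blocked tasks
        }
    }
}
```

With a maximum of three threads, the first three hung tasks each get a thread and every later submission is rejected until a thread is freed.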

It would have been nice if the ThreadPool also had a pluggable mechanism for detecting hanging threads. I'll discuss one such design later. Of course, if all the threads are frozen, you could configure and use the rejected-task policy of the thread pool. If you do not want to discard the tasks, you would have to use the CallerRunsPolicy:


execexec = new ThreadPoolExecutor(0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());

In this case, if a thread hang caused a task to be rejected, that task would be handed to the calling thread to run. There is always a chance that this task hangs too, in which case the entire process would freeze. So it is better not to add such a policy in this context.
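That the rejected task really runs on the submitting thread can be checked with a short experiment. The sketch below uses a hypothetical CallerRunsDemo class: it saturates a one-thread pool with a blocked task, submits one more task, and records the name of the thread that executed it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, for illustration only.
class CallerRunsDemo {
    // Returns the name of the thread that ran the task submitted to a saturated pool.
    static String threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 20, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch never = new CountDownLatch(1); // simulated silent device
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> {
                try {
                    never.await(); // the only pool thread is now hung
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The pool is full, so CallerRunsPolicy runs this task synchronously here.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            pool.shutdownNow(); // interrupt the blocked task
        }
    }
}
```

If the overflow task were itself a hanging request, the submitting thread would block inside execute(), which is the freeze described above.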


public class NotificationProcessor implements Runnable {

    private final NotificationOriginator notificationOrginator;

    boolean isRunning = true;

    private final ExecutorService execexec;

    NotificationProcessor(NotificationOriginator norginator) { // ctor
        notificationOrginator = norginator;

        // execexec = Executors.newCachedThreadPool(); // Too many threads
        // execexec = Executors.newFixedThreadPool(2); // No hung-task detection
        execexec = new ThreadPoolExecutor(0, 4, 250, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void run() {
        while (isRunning) {
            try {
                final Task task = TaskQueue.INSTANCE.getTask();
                Runnable thisTrap = new Runnable() {
                    public void run() {
                        ++alarmid;
                        notificationOrginator.notify(new OctetString(), // Task processing
                                nbialarmnew.getOID(),
                                nbialarmnew.createVariableBindingPayload());
                        ........
                    }
                };

                execexec.execute(thisTrap);
            }
            ........

A Custom ThreadPool with Hang Detection

A thread pool library capable of detecting and handling task hangs would be great to have. I've developed one and I'll demonstrate it below. It is actually a port of a C++ thread pool that I designed and used some time back (see references). Basically, this solution uses the Command pattern and the Chain of Responsibility pattern. However, implementing the Command pattern in Java without function object support is a bit difficult, so I had to change the implementation slightly to use Java reflection. Note that this pattern was designed for a context in which a thread pool had to be plugged in without modifying any of the existing classes. (I believe the one big benefit of object-oriented programming is that it gives us a way to design classes so as to make effective use of the Open Closed Principle. This is especially true of complex old legacy code and might be of less relevance for new product development.) Hence I used reflection instead of an interface to implement the Command pattern. The rest of the code could be ported without major changes, as almost all the thread synchronization and signaling primitives are available from Java 1.5 onwards.


public class Command<T> {
    private Object[] argParameter;
    ........
    // Ctor for a method with two args
    Command(T pObj, String methodName, long timeout, String key, int arg1, int arg2) {
        m_objptr = pObj;
        m_methodName = methodName;
        m_timeout = timeout;
        m_key = key;
        argParameter = new Object[2];
        argParameter[0] = arg1;
        argParameter[1] = arg2;
    }

    // Calls the method of the object
    void execute() {
        Class<?> klass = m_objptr.getClass();
        Class<?>[] paramTypes = new Class<?>[]{int.class, int.class};

        try {
            // Look up the public method by name and parameter types
            Method methodName = klass.getMethod(m_methodName, paramTypes);
            // System.out.println("Found the method--> " + methodName);
            if (argParameter.length == 2) {
                methodName.invoke(m_objptr, argParameter[0], argParameter[1]);
            }
            ........
            
 

Example of usage of this pattern:


public class CTask
{
    ...
    public int DoMultiplication(int a, int b) {...}
}

// The method name string must match a public method of CTask
Command<CTask> cmd4 = new Command<CTask>(task4, "DoMultiplication", 1, "key2", 2, 5);
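Since the listings above are abridged, a complete but much reduced version of the reflection idea may help. The following is only a sketch: Calculator, multiply and ReflectiveCommand are hypothetical names, the timeout and key parameters of the original Command are left out, and only the two-int-argument case is handled.

```java
import java.lang.reflect.Method;

// Hypothetical target class; it implements no command interface at all.
class Calculator {
    public int multiply(int a, int b) {
        return a * b;
    }
}

// Hypothetical, reduced command: binds an object, a method name and two int arguments.
class ReflectiveCommand {
    private final Object target;
    private final String methodName;
    private final int arg1;
    private final int arg2;

    ReflectiveCommand(Object target, String methodName, int arg1, int arg2) {
        this.target = target;
        this.methodName = methodName;
        this.arg1 = arg1;
        this.arg2 = arg2;
    }

    // Looks up the public method by name and invokes it on the target.
    Object execute() {
        try {
            Method method = target.getClass().getMethod(methodName, int.class, int.class);
            return method.invoke(target, arg1, arg2);
        } catch (ReflectiveOperationException e) {
            // Covers a misspelled method name as well as failures inside the method.
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }
}
```

A call such as new ReflectiveCommand(new Calculator(), "multiply", 2, 5).execute() then runs the method without Calculator knowing anything about commands. The price is that a wrong method name is only discovered at run time.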

Now we have two more important classes here. One is the ThreadChain class, which implements the Chain of Responsibility pattern:


public class ThreadChain<T> implements Runnable {

    public ThreadChain(ThreadChain p, ThreadPool pool, String name) {
        AddRef();
        deleteMe = false;
        busy = false; // --> very important
        next = p; // set the thread chain - note this is like a linked list impl
        threadpool = pool; // set the thread pool - Root of the threadpool
        ........
        threadId = ++ThreadId;
        ......
        // start the thread
        thisThread = new Thread(this, name + inttid.toString());
        thisThread.start();
    }
  

This class has two main methods. One is Boolean canHandle(), which is initiated by the ThreadPool class and then proceeds recursively. It checks whether the current thread (the current ThreadChain instance) is free to handle the task. If it is already handling a task, it calls the next one in the chain.


public Boolean canHandle() {
    if (!busy) { // If not busy
        System.out.println("Can Handle This Event in id=" + threadId);
        // todo signal an event
        try {
            condLock.lock();
            condWait.signal(); // Signal the HandleRequest which is waiting for this in the run method
            .........................................
            return true;
        }
        .........................................
    /// Else see if the next object in the chain is free
    /// to handle the request
    return next.canHandle();

Note that HandleRequest is a method of ThreadChain that is invoked from the Thread run() method and awaits the signal from the canHandle method. Also note how the task is handled via the Command pattern.
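The interplay between canHandle and the waiting run() loop can be shown in a compact, self-contained form. The sketch below is not the ThreadChain class itself: ChainWorker is a hypothetical name, the task is a plain Runnable instead of a Command, and hang detection is left out. It only illustrates the chain walk and the lock and condition signaling.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified link of the chain.
class ChainWorker implements Runnable {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition hasWork = lock.newCondition();
    private final ChainWorker next; // next link, or null at the end of the chain
    private Runnable pending;       // guarded by lock
    private boolean busy;           // guarded by lock

    ChainWorker(ChainWorker next) {
        this.next = next;
    }

    // Accepts the task if this worker is free, otherwise passes it down the chain.
    boolean canHandle(Runnable task) {
        lock.lock();
        try {
            if (!busy) {
                busy = true;
                pending = task;
                hasWork.signal(); // wake the run() loop waiting below
                return true;
            }
        } finally {
            lock.unlock();
        }
        return next != null && next.canHandle(task);
    }

    public void run() {
        try {
            while (true) {
                Runnable task;
                lock.lock();
                try {
                    while (pending == null) {
                        hasWork.await(); // wait for the signal from canHandle
                    }
                    task = pending;
                    pending = null;
                } finally {
                    lock.unlock();
                }
                task.run();
                lock.lock();
                try {
                    busy = false; // free to take the next task
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop when interrupted
        }
    }

    // Starts a two-link chain and reports whether two submitted tasks both ran.
    static boolean demo() {
        ChainWorker tail = new ChainWorker(null);
        ChainWorker head = new ChainWorker(tail);
        for (ChainWorker worker : new ChainWorker[] {head, tail}) {
            Thread thread = new Thread(worker);
            thread.setDaemon(true); // do not keep the JVM alive
            thread.start();
        }
        CountDownLatch done = new CountDownLatch(2);
        boolean accepted = head.canHandle(done::countDown) && head.canHandle(done::countDown);
        try {
            return accepted && done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The busy flag is set inside canHandle rather than in run(), so that two callers cannot hand tasks to the same free worker at the same time.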
