Open source Java projects: Akka

Building distributed systems for concurrent and scalable Java applications

Page 5 of 6

PrimeWorker

The PrimeWorker shown in Listing 6, below, examines the number range passed to it and returns a list of all prime numbers within that range.

Listing 6. PrimeWorker.java

package com.geekcap.akka.prime;

import akka.actor.UntypedActor;
import com.geekcap.akka.prime.message.NumberRangeMessage;
import com.geekcap.akka.prime.message.Result;

public class PrimeWorker extends UntypedActor
{
    /**
     * Invoked by the mailbox when it receives a thread timeslice and a message is available
     *
     * @param message   The message to process
     */
    @Override
    public void onReceive( Object message )
    {
        // We only handle NumberRangeMessages
        if( message instanceof NumberRangeMessage )
        {
            // Cast the message to a NumberRangeMessage
            NumberRangeMessage numberRangeMessage = ( NumberRangeMessage )message;
            System.out.println( "Number Range: " + numberRangeMessage.getStartNumber() + " to " + numberRangeMessage.getEndNumber() );

            // Iterate over the range, compute primes, and return the list of numbers that are prime
            Result result = new Result();
            for( long l = numberRangeMessage.getStartNumber(); l <= numberRangeMessage.getEndNumber(); l++ )
            {
                if( isPrime( l ) )
                {
                    result.getResults().add( l );
                }
            }

            // Send a notification back to the sender
            getSender().tell( result, getSelf() );
        }
        else
        {
            // Mark this message as unhandled
            unhandled( message );
        }
    }

    /**
     * Returns true if n is prime, false otherwise
     *
     * @param n     The long to check
     *
     * @return      True if n is prime, false otherwise
     */
    private boolean isPrime( long n )
    {
        // 0, 1, and negative numbers are not prime
        if( n < 2 )
        {
            return false;
        }

        // 2 and 3 are prime
        if( n == 2 || n == 3 )
        {
            return true;
        }

        // Is n an even number?
        if( n % 2 == 0 )
        {
            return false;
        }

        //if not, then just check the odds
        for( long i=3; i*i<=n; i+=2 )
        {
            if( n % i == 0)
            {
                return false;
            }
        }
        return true;
    }
}

PrimeWorker is the class that does the work of computing prime numbers. It inspects each received message and, if the message is a NumberRangeMessage, processes it. Otherwise, PrimeWorker invokes the unhandled() method, which by default publishes an UnhandledMessage to the actor system's event stream. PrimeWorker's logic is simple: it iterates over the numbers in its range and invokes its isPrime() method on each one, collecting the primes in a Result. When it finishes, it reports back to the PrimeMaster by calling getSender() to retrieve an ActorRef to the actor that sent the message (PrimeMaster) and then invoking that reference's tell() method with the results. PrimeWorkers run asynchronously, so this "callback" lets the PrimeMaster know that a given worker has completed its task.
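To see the worker's computation in isolation, here is a minimal sketch of the same range scan with the Akka plumbing removed. The class name PrimeRangeSketch and the method primesInRange() are made up for this illustration and are not part of the article's project; the sketch also treats 1 as not prime, following the usual definition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone version of the worker's logic (no actors involved)
final class PrimeRangeSketch
{
    // Trial division by odd numbers up to the square root of n
    static boolean isPrime( long n )
    {
        if( n < 2 )
        {
            return false;           // 0, 1, and negatives are not prime
        }
        if( n < 4 )
        {
            return true;            // 2 and 3 are prime
        }
        if( n % 2 == 0 )
        {
            return false;           // even numbers above 2 are not prime
        }
        for( long i = 3; i * i <= n; i += 2 )
        {
            if( n % i == 0 )
            {
                return false;
            }
        }
        return true;
    }

    // Collect every prime in the inclusive range [start, end]
    static List<Long> primesInRange( long start, long end )
    {
        List<Long> primes = new ArrayList<Long>();
        for( long l = start; l <= end; l++ )
        {
            if( isPrime( l ) )
            {
                primes.add( l );
            }
        }
        return primes;
    }

    public static void main( String[] args )
    {
        // Prints [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
        System.out.println( primesInRange( 1, 30 ) );
    }
}
```

In the actor version, the list built by primesInRange() corresponds to the contents of the Result message that the worker sends back with tell().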

PrimeMaster

The PrimeMaster divides the number range passed to it into 10 relatively even chunks and then distributes them to its PrimeWorker router.

Listing 7. PrimeMaster.java

package com.geekcap.akka.prime;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.RoundRobinRouter;
import com.geekcap.akka.prime.message.NumberRangeMessage;
import com.geekcap.akka.prime.message.Result;

public class PrimeMaster extends UntypedActor
{
    private final ActorRef workerRouter;
    private final ActorRef listener;

    private final int numberOfWorkers;
    private int numberOfResults = 0;

    private Result finalResults = new Result();

    public PrimeMaster( final int numberOfWorkers, ActorRef listener )
    {
        // Save our parameters locally
        this.numberOfWorkers = numberOfWorkers;
        this.listener = listener;

        // Create a new router to distribute messages out to numberOfWorkers workers
        workerRouter = this.getContext()
                .actorOf( new Props(PrimeWorker.class )
                        .withRouter( new RoundRobinRouter( numberOfWorkers )), "workerRouter" );
    }

    @Override
    public void onReceive( Object message )
    {
        if( message instanceof NumberRangeMessage )
        {
            // We have a new set of work to perform
            NumberRangeMessage numberRangeMessage = ( NumberRangeMessage )message;

            // Just as a demo: break the work up into one chunk of numbers per worker
            long numberOfNumbers = numberRangeMessage.getEndNumber() - numberRangeMessage.getStartNumber();
            long segmentLength = numberOfNumbers / numberOfWorkers;

            for( int i=0; i<numberOfWorkers; i++ )
            {
                // Compute the start and end numbers for this worker
                long startNumber = numberRangeMessage.getStartNumber() + ( i * segmentLength );
                long endNumber = startNumber + segmentLength - 1;

                // Handle any remainder
                if( i == numberOfWorkers - 1 )
                {
                    // Make sure we get the rest of the list
                    endNumber = numberRangeMessage.getEndNumber();
                }

                // Send a new message to the work router for this subset of numbers
                workerRouter.tell( new NumberRangeMessage( startNumber, endNumber ), getSelf() );
            }
        }
        else if( message instanceof Result )
        {
            // We received results from our worker: add its results to our final results
            Result result = ( Result )message;
            finalResults.getResults().addAll( result.getResults() );

            // One Result arrives per chunk, and one chunk is sent per worker
            if( ++numberOfResults >= numberOfWorkers )
            {
                // Notify our listener
                listener.tell( finalResults, getSelf() );

                // Stop our actor hierarchy
                getContext().stop( getSelf() );
            }

        }
        else
        {
            unhandled( message );
        }
    }
}

The PrimeMaster is a bit more complicated than the PrimeWorker. In its constructor it creates a RoundRobinRouter that distributes messages to a pool of PrimeWorkers. It does so by building a Props object for the PrimeWorker class, invoking withRouter() on that Props object with a new RoundRobinRouter, and passing the result to the actorOf() method, which returns the ActorRef for the router. A round-robin strategy distributes messages to each actor in turn; when it reaches the end of its list it starts over at the beginning.
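The round-robin idea itself is easy to sketch in plain Java. The class below is a hypothetical illustration of the selection strategy only, not Akka's RoundRobinRouter implementation; the name RoundRobinSketch and its methods are assumptions made for this example, and the sketch is not thread-safe.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of round-robin selection (not Akka's implementation)
final class RoundRobinSketch<T>
{
    private final List<T> routees;
    private int next = 0;

    RoundRobinSketch( List<T> routees )
    {
        if( routees.isEmpty() )
        {
            throw new IllegalArgumentException( "At least one routee is required" );
        }
        this.routees = routees;
    }

    // Return the next routee in turn, wrapping back to the first after the last
    T nextRoutee()
    {
        T routee = routees.get( next );
        next = ( next + 1 ) % routees.size();
        return routee;
    }

    public static void main( String[] args )
    {
        RoundRobinSketch<String> router =
                new RoundRobinSketch<String>( Arrays.asList( "worker-0", "worker-1", "worker-2" ) );

        // Five messages go to workers 0, 1, 2, 0, 1 in that order
        for( int message = 0; message < 5; message++ )
        {
            System.out.println( "message " + message + " -> " + router.nextRoutee() );
        }
    }
}
```

With a real router, the PrimeMaster never makes this choice itself: it calls tell() on the router's ActorRef and the router picks the next worker.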

The PrimeMaster's onReceive() method needs to handle two types of messages: a NumberRangeMessage that is received from the PrimeCalculator to start the process and a Result message that is received from each PrimeWorker when it completes its work. The NumberRangeMessage handler divides the work into 10 chunks and dispatches those chunks to the PrimeWorkers by invoking the router's tell() method. The router then handles distributing those messages to workers in a round-robin fashion.

An additional complexity in this handler is that dividing the range into 10 chunks can leave some numbers at the end of the range unassigned: unless the number of numbers to examine is evenly divisible by 10, there will be a remainder that needs to be accounted for. The handler deals with this in the last chunk, whose end is set to the end of the overall range so that every number is processed. This is not the best way to distribute numbers to workers, because smaller numbers take less processing time than larger numbers, but it illustrates the process without adding complexity.
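The chunking arithmetic can be checked on its own. The following sketch reproduces the handler's start and end calculations outside of an actor; the class RangeChunkSketch and its split() method are hypothetical names introduced for this illustration, and the sketch assumes the range contains at least as many numbers as there are chunks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone version of the PrimeMaster's range splitting
final class RangeChunkSketch
{
    // Split the inclusive range [start, end] into the given number of chunks.
    // Each element of the returned list is a two-element array: { chunkStart, chunkEnd }.
    static List<long[]> split( long start, long end, int chunks )
    {
        long segmentLength = ( end - start ) / chunks;
        List<long[]> ranges = new ArrayList<long[]>();

        for( int i = 0; i < chunks; i++ )
        {
            long chunkStart = start + ( i * segmentLength );
            long chunkEnd = chunkStart + segmentLength - 1;

            // The last chunk absorbs any remainder so that no number is skipped
            if( i == chunks - 1 )
            {
                chunkEnd = end;
            }
            ranges.add( new long[] { chunkStart, chunkEnd } );
        }
        return ranges;
    }

    public static void main( String[] args )
    {
        // For 1 to 105 in 10 chunks: 1-10, 11-20, ..., 81-90, and finally 91-105
        for( long[] range : split( 1, 105, 10 ) )
        {
            System.out.println( range[ 0 ] + " to " + range[ 1 ] );
        }
    }
}
```

Note how the final chunk in the example is longer than the others: it covers 15 numbers rather than 10, which is the remainder handling described above.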
