Open source Java projects: Akka

Building distributed systems for concurrent and scalable Java applications


The Result message handler adds the worker's results to the final result list and then checks whether it has heard from every worker. Because the NumberRangeMessage handler divided the work into 10 chunks, the Result handler counts responses and stops once it has received 10. When it completes, it invokes the listener's tell() method with the final results and then shuts itself down.
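
The counting logic described above can be sketched in plain Java, without Akka. This is only an illustration, not the PrimeMaster code from the earlier listing: the ResultCollector class and its method names are hypothetical, and the sketch assumes that each worker reports its results exactly once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of the article's code): gathers partial
// results and reports when every expected worker has responded.
class ResultCollector
{
    private final int expectedResponses;
    private final List<Long> finalResults = new ArrayList<Long>();
    private int responsesReceived = 0;

    ResultCollector( int expectedResponses )
    {
        this.expectedResponses = expectedResponses;
    }

    // Adds one worker's results; returns true once all workers have reported.
    boolean addResult( List<Long> workerResults )
    {
        finalResults.addAll( workerResults );
        responsesReceived++;
        return responsesReceived >= expectedResponses;
    }

    List<Long> getFinalResults()
    {
        return finalResults;
    }

    public static void main( String[] args )
    {
        ResultCollector collector = new ResultCollector( 2 );
        boolean doneAfterFirst = collector.addResult( Arrays.asList( 2L, 3L ) );
        boolean doneAfterSecond = collector.addResult( Arrays.asList( 5L ) );
        System.out.println( doneAfterFirst + " " + doneAfterSecond + " " + collector.getFinalResults() );
    }
}
```

The sketch needs no synchronization for the same reason the actor does not: an actor processes one message at a time, so the counter and the list are never updated concurrently.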

PrimeListener

Listing 8 shows the PrimeListener, which is the actor that receives the final results from the PrimeMaster.

Listing 8. PrimeListener.java

package com.geekcap.akka.prime;

import akka.actor.UntypedActor;
import com.geekcap.akka.prime.message.Result;


public class PrimeListener extends UntypedActor
{
    @Override
    public void onReceive( Object message ) throws Exception
    {
        if( message instanceof Result )
        {
            Result result = ( Result )message;

            System.out.println( "Results: " );
            for( Long value : result.getResults() )
            {
                System.out.print( value + ", " );
            }
            System.out.println();

            // Exit
            getContext().system().shutdown();
        }
        else
        {
            unhandled( message );
        }
    }
}

The PrimeListener handles Result messages that contain the final results of all PrimeWorker processing. It prints every value in the Result message's list and then shuts down the entire actor system by retrieving its actor context and invoking the actor system's shutdown() method. Skip this shutdown step if your application has additional work to perform. In this example we've reached the end of the work for the application, so we shut down the system so that the application can exit.

PrimeCalculator

Finally, Listing 9 shows the PrimeCalculator, which creates the ActorSystem, configures the PrimeMaster and PrimeListener, and then submits work to the PrimeMaster.

Listing 9. PrimeCalculator.java

package com.geekcap.akka.prime;

import akka.actor.*;
import com.geekcap.akka.prime.message.NumberRangeMessage;

public class PrimeCalculator
{
    public void calculate( long startNumber, long endNumber )
    {
        // Create our ActorSystem, which owns and configures the classes
        ActorSystem actorSystem = ActorSystem.create( "primeCalculator" );

        // Create our listener
        final ActorRef primeListener = actorSystem.actorOf( new Props( PrimeListener.class ), "primeListener" );

        // Create the PrimeMaster: we need to define an UntypedActorFactory so that we can control
        // how PrimeMaster instances are created (pass in the number of workers and the listener reference)
        ActorRef primeMaster = actorSystem.actorOf( new Props( new UntypedActorFactory() {
            public UntypedActor create() {
                return new PrimeMaster( 10, primeListener );
            }
        }), "primeMaster" );

        // Start the calculation
        primeMaster.tell( new NumberRangeMessage( startNumber, endNumber ) );
    }

    public static void main( String[] args )
    {
        if( args.length < 2 )
        {
            System.out.println( "Usage: java com.geekcap.akka.prime.PrimeCalculator <start-number> <end-number>" );
            System.exit( 0 );
        }

        long startNumber = Long.parseLong( args[ 0 ] );
        long endNumber = Long.parseLong( args[ 1 ] );

        PrimeCalculator primeCalculator = new PrimeCalculator();
        primeCalculator.calculate( startNumber, endNumber );
    }
}

The PrimeCalculator sets up the ActorSystem, the PrimeListener, and the PrimeMaster in its calculate() method. Note that actors live in their own hierarchies, so the PrimeCalculator sets up the PrimeMaster and then the PrimeMaster sets up all of its PrimeWorkers.

The additional complexity in the PrimeMaster ActorRef creation comes from the need to pass parameters to the PrimeMaster's constructor, namely the number of workers to use and a reference to the listener to notify when it is complete. To do this, we create an anonymous inner class that implements UntypedActorFactory and its create() method. This allows us to create and optionally configure the PrimeMaster before it is wrapped by an ActorRef and returned to the caller. Finally, we start the process by invoking the PrimeMaster ActorRef's tell() method, passing it the number range received from the command line.
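
The factory idiom itself is independent of Akka. The following sketch uses plain Java stand-ins to show the shape of the pattern: the Master, MasterFactory, and FactoryDemo types are hypothetical and exist only for this illustration. The code that builds the object knows only how to call create(), while the anonymous inner class supplies the constructor arguments.

```java
// Hypothetical stand-ins used only for illustration; these are not Akka types.
interface MasterFactory
{
    Master create();
}

class Master
{
    private final int workerCount;
    private final String listenerName;

    Master( int workerCount, String listenerName )
    {
        this.workerCount = workerCount;
        this.listenerName = listenerName;
    }

    String describe()
    {
        return workerCount + " workers, reporting to " + listenerName;
    }
}

class FactoryDemo
{
    // The "framework" side: it calls create() without knowing which
    // constructor arguments the object needs.
    static Master build( MasterFactory factory )
    {
        return factory.create();
    }

    static String demo()
    {
        // Declared final so the anonymous inner class can capture it
        final String listenerName = "primeListener";

        Master master = build( new MasterFactory() {
            public Master create() {
                return new Master( 10, listenerName );
            }
        } );
        return master.describe();
    }

    public static void main( String[] args )
    {
        System.out.println( demo() );
    }
}
```

This is also why primeListener is declared final in Listing 9: under the Java 1.6 target used by this project, an anonymous inner class can only capture local variables that are final.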

A Maven build for PrimeCalculator

Listing 10 shows the Maven POM file that builds this project. Note that it is the same as the POM file shown in Listing 1, but the mainClass now references PrimeCalculator.

Listing 10. pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.geekcap.akka</groupId>
    <artifactId>AkkaSample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>AkkaSample</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.geekcap.akka.prime.PrimeCalculator</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


    <dependencies>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.10</artifactId>
            <version>2.1.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

You can build the project with the following command:

mvn clean install

You can execute it from the target directory as follows:

java -jar AkkaSample-1.0-SNAPSHOT.jar 1 23

Passing "1 23" to the PrimeCalculator tells it to return all prime numbers between 1 and 23. The output should resemble the following:

Number Rage: 1 to 2
Number Rage: 3 to 4
Number Rage: 5 to 6
Number Rage: 11 to 12
Number Rage: 13 to 14
Number Rage: 15 to 16
Number Rage: 17 to 18
Number Rage: 19 to 23
Number Rage: 9 to 10
Number Rage: 7 to 8
Results: 
1, 2, 3, 5, 11, 13, 17, 19, 23, 7,
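
The ranges in this output are consistent with splitting 1 through 23 into 10 chunks of two numbers each, with the last chunk absorbing the remainder. The following is a minimal sketch of one way to produce that split. It is an assumption based on the output above, not a copy of the PrimeMaster code, and the RangeSplitter class is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reproduces the ranges seen in the sample output.
class RangeSplitter
{
    // Splits [start, end] into `chunks` consecutive ranges; the last range
    // absorbs any remainder. Each range is returned as { first, last }.
    // Assumes the range contains at least `chunks` numbers.
    static List<long[]> split( long start, long end, int chunks )
    {
        List<long[]> ranges = new ArrayList<long[]>();
        long length = ( end - start + 1 ) / chunks;
        for( int i = 0; i < chunks; i++ )
        {
            long first = start + i * length;
            long last = ( i == chunks - 1 ) ? end : first + length - 1;
            ranges.add( new long[] { first, last } );
        }
        return ranges;
    }

    public static void main( String[] args )
    {
        for( long[] range : split( 1, 23, 10 ) )
        {
            System.out.println( range[ 0 ] + " to " + range[ 1 ] );
        }
    }
}
```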

I left the number-range display so that you can see the numbers that each worker is processing. Because the workers run asynchronously, their results arrive in no particular order, and the list simply stores them in arrival order. As an exercise, try fixing this by changing the List to a TreeSet.
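
As a hint for that exercise, here is a small standalone sketch of how a TreeSet orders the values from the sample output. The SortedResults class is a hypothetical name, and the sketch leaves the actor code untouched.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical demo class: shows how a TreeSet orders results on insertion.
class SortedResults
{
    // Copies the results into a TreeSet, which keeps its elements in
    // ascending order regardless of the order in which they were added.
    static Set<Long> sorted( List<Long> arrivalOrder )
    {
        return new TreeSet<Long>( arrivalOrder );
    }

    public static void main( String[] args )
    {
        // Values in the arrival order shown in the sample output
        List<Long> arrivalOrder = Arrays.asList( 1L, 2L, 3L, 5L, 11L, 13L, 17L, 19L, 23L, 7L );
        System.out.println( sorted( arrivalOrder ) ); // [1, 2, 3, 5, 7, 11, 13, 17, 19, 23]
    }
}
```

A TreeSet also discards duplicates, which is harmless here because each number is tested only once.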

In conclusion

Any application of substance will eventually face concurrency issues; basically, "How can I accomplish all of my work at the same time?" The standard approach is to manually create threads and then synchronize access to shared objects in order to ensure that data is not corrupted. This is technically challenging and can lead to problems such as thread starvation, deadlock, and poor scalability. Akka resolves these familiar concurrency problems in a different way by implementing the actor model design pattern.

In the actor model, all work units are defined as actors, and communication between them is accomplished via message passing. Akka handles all of the underlying threading complexity, leaving you to divide your tasks into actors, define messages to pass between actors, and wire together their communication logic. Akka simplifies concurrent code, but more importantly, it provides infrastructure that allows you to scale without changing your application: if necessary, you could start up hundreds of Akka servers to run your actors. Akka seamlessly handles the distribution of messages and communication between actors.
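
To make the message-passing idea concrete, here is a toy sketch in plain Java. It is not how Akka is implemented, and the CounterActor class is hypothetical. It assumes that a single-threaded executor is an acceptable stand-in for a mailbox: many threads may send messages, but only one thread ever touches the actor's state, so no explicit locking is needed.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy illustration only: this is not how Akka is implemented.
class CounterActor
{
    // The single-threaded executor acts as the mailbox: messages queue up
    // and are handled one at a time, in the order they were enqueued.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private long count = 0; // touched only by the mailbox thread

    // Fire-and-forget message: add the increment to the counter.
    void tell( final long increment )
    {
        mailbox.execute( new Runnable() {
            public void run() { count += increment; }
        } );
    }

    // Request-reply message: queued behind all earlier messages.
    long ask()
    {
        try
        {
            return mailbox.submit( new Callable<Long>() {
                public Long call() { return count; }
            } ).get();
        }
        catch( Exception e )
        {
            throw new IllegalStateException( e );
        }
    }

    void stop()
    {
        mailbox.shutdown();
    }

    // Four sender threads each send 1000 messages; returns the final count.
    static long demo()
    {
        final CounterActor actor = new CounterActor();
        try
        {
            Thread[] senders = new Thread[ 4 ];
            for( int i = 0; i < senders.length; i++ )
            {
                senders[ i ] = new Thread( new Runnable() {
                    public void run() {
                        for( int j = 0; j < 1000; j++ ) actor.tell( 1 );
                    }
                } );
                senders[ i ].start();
            }
            for( Thread sender : senders ) sender.join();
            return actor.ask();
        }
        catch( InterruptedException e )
        {
            throw new IllegalStateException( e );
        }
        finally
        {
            actor.stop();
        }
    }

    public static void main( String[] args )
    {
        System.out.println( demo() ); // 4000
    }
}
```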

Being able to distribute actors across multiple machines is the real power of Akka, especially for systems where high scalability is a requirement.

Steven Haines is a technical architect at Kit Digital, currently working onsite at Disney in Orlando. He is the founder of www.geekcap.com, an online education website, and has written hundreds of Java-related articles as well as three books: Java 2 From Scratch, Java 2 Primer Plus, and Pro Java EE Performance Management and Optimization. He lives with his wife and two children in Apopka, Florida.
