Open source Java projects: Storm

Parallel realtime computation for unbounded data streams


The declareOutputFields() method defines the fields that the spout emits. In this example that is a single value named number. By default, tuple fields support primitive types, their wrappers (such as Integer), and Strings. It is up to the nextTuple() method to emit values in the same order in which their fields are declared in declareOutputFields().
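Values are matched to declared field names by position, so the first value a spout emits is the one a bolt later reads either by index (as Listing 2 does with getInteger(0)) or by field name. The plain-Java sketch below models only that pairing. SimpleTuple and FieldOrderDemo are hypothetical classes made up for illustration; they are not Storm's Tuple class, which provides its own positional and by-name accessors.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Storm tuple (NOT Storm's API): declared field
// names paired, position by position, with emitted values.
final class SimpleTuple
{
    private final List<String> fields;
    private final List<Object> values;

    SimpleTuple( List<String> fields, List<Object> values )
    {
        if( fields.size() != values.size() )
        {
            throw new IllegalArgumentException( "field count must match value count" );
        }
        this.fields = fields;
        this.values = values;
    }

    // Positional access, analogous to tuple.getInteger( 0 ) in Listing 2
    Integer getInteger( int index )
    {
        return (Integer) values.get( index );
    }

    // Name-based access: resolve the name to its declared position first
    Object getValueByField( String field )
    {
        int index = fields.indexOf( field );
        if( index < 0 )
        {
            throw new IllegalArgumentException( "unknown field: " + field );
        }
        return values.get( index );
    }
}

final class FieldOrderDemo
{
    public static void main( String[] args )
    {
        // Declared like new Fields( "number" ), emitted like new Values( 7 )
        SimpleTuple t = new SimpleTuple( Arrays.asList( "number" ), Arrays.<Object>asList( 7 ) );
        System.out.println( t.getInteger( 0 ) );               // prints 7
        System.out.println( t.getValueByField( "number" ) );   // prints 7
    }
}
```

If a spout emitted its values in a different order than it declared its fields, the name-based lookup would silently return the wrong value, which is why the ordering matters.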

PrimeNumberBolt

The PrimeNumberBolt's prepare() method, shown in Listing 2, is called when the bolt is initialized, and it saves a reference to the OutputCollector. If this bolt emitted results for another bolt to consume, it would use the collector to do so. In this example the bolt does not emit any tuples; it uses the collector only to acknowledge (ack) each tuple it has processed, but keeping the collector shows how the bolt would pass results along.

Listing 2. PrimeNumberBolt.java

package com.geekcap.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.util.Map;

public class PrimeNumberBolt extends BaseRichBolt 
{
    private OutputCollector collector;

    // Called once when the bolt is initialized: keep the collector for later use
    public void prepare( Map conf, TopologyContext context, OutputCollector collector ) 
    {
        this.collector = collector;
    }

    // Called once for every incoming tuple
    public void execute( Tuple tuple ) 
    {
        // The spout emits a single field, so the number is at index 0
        int number = tuple.getInteger( 0 );
        if( isPrime( number ) )
        {
            System.out.println( number );
        }
        // Tell Storm that this tuple has been fully processed
        collector.ack( tuple );
    }

    // Only needed if this bolt emits tuples; shown for reference
    public void declareOutputFields( OutputFieldsDeclarer declarer ) 
    {
        declarer.declare( new Fields( "number" ) );
    }   
    
    private boolean isPrime( int n ) 
    {
        // 0, 1, and negative numbers are not prime
        if( n < 2 )
        {
            return false;
        }

        // 2 and 3 are prime
        if( n == 2 || n == 3 )
        {
            return true;
        }
        
        // Any other even number is not prime
        if( n % 2 == 0 )
        {
            return false;
        }
        
        // Otherwise, test odd divisors up to the square root of n
        // (the long cast keeps i * i from overflowing near Integer.MAX_VALUE)
        for( int i = 3; (long) i * i <= n; i += 2 ) 
        {
            if( n % i == 0 )
            {
                return false;
            }
        }
        return true;
    }
}

When processing, the PrimeNumberBolt extracts the first value from the Tuple, which is the Integer to process, checks whether it is prime by calling the isPrime() method, and prints the value if it is. The declareOutputFields() method would be used if the bolt emitted a new Tuple for processing by another bolt. In this example it declares a single output field called number, but the bolt never calls collector.emit(), so nothing is actually emitted; the declaration is shown only for your reference.
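To make the "would emit to another bolt" idea concrete, here is a plain-Java model (not Storm code) of a prime-filtering stage that forwards its results instead of printing them. The Downstream interface is hypothetical and merely stands in for collector.emit() feeding a second bolt; the trial-division test treats values below 2 as non-prime.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a filtering bolt that passes results downstream.
// Downstream is a hypothetical stand-in for collector.emit() plus a next bolt.
final class PrimeFilterModel
{
    interface Downstream
    {
        void accept( int prime );
    }

    // Analogous to execute(): test one number and forward it if it is prime
    static void process( int number, Downstream next )
    {
        if( isPrime( number ) )
        {
            next.accept( number );
        }
    }

    // Trial division by odd divisors up to sqrt(n); values below 2 are not prime
    static boolean isPrime( int n )
    {
        if( n < 2 )
        {
            return false;
        }
        if( n < 4 )
        {
            return true; // 2 and 3
        }
        if( n % 2 == 0 )
        {
            return false;
        }
        for( int i = 3; (long) i * i <= n; i += 2 )
        {
            if( n % i == 0 )
            {
                return false;
            }
        }
        return true;
    }

    // Feeds 1..limit through process() and collects whatever is forwarded
    static List<Integer> primesUpTo( int limit )
    {
        final List<Integer> received = new ArrayList<Integer>();
        Downstream collectingBolt = new Downstream()
        {
            public void accept( int prime )
            {
                received.add( prime );
            }
        };
        for( int n = 1; n <= limit; n++ )
        {
            process( n, collectingBolt );
        }
        return received;
    }
}
```

For example, PrimeFilterModel.primesUpTo( 20 ) returns [2, 3, 5, 7, 11, 13, 17, 19]. In a real topology the downstream side would be another bolt subscribed to this bolt's stream rather than a callback.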

PrimeNumberTopology

The PrimeNumberTopology class in Listing 3 uses the TopologyBuilder helper class to build a topology. To construct one, create an instance of TopologyBuilder, invoke its setSpout() and setBolt() methods to add spouts and bolts, and then call createTopology() to obtain the StormTopology object that is submitted for execution. Each spout and bolt has a name: in this example the spout is named "spout" and the bolt is named "prime."

Listing 3. PrimeNumberTopology.java

package com.geekcap.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class PrimeNumberTopology 
{
    public static void main(String[] args) 
    {
        TopologyBuilder builder = new TopologyBuilder();

        // Wire the spout to the bolt; tuples are spread randomly across the bolt's tasks
        builder.setSpout( "spout", new NumberSpout() );
        builder.setBolt( "prime", new PrimeNumberBolt() )
                .shuffleGrouping("spout");

        Config conf = new Config();
        
        // Run the topology in-process for 10 seconds, then stop it
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("test");
        cluster.shutdown();
    }
}

The setBolt() method above returns an InputDeclarer, which is used to define the bolt's inputs. I've used the shuffleGrouping() method, which means that tuples from the named spout are distributed randomly across the bolt's tasks. The InputDeclarer also supports other distributions. For example, a "fields grouping" routes all tuples that share the same value for the specified fields to the same bolt task. (Use the fieldsGrouping() method if you want to try building a fields grouping.)
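The difference between the two groupings comes down to how a target task is chosen for each tuple. The sketch below is a simplified, hypothetical model of that choice, not Storm's implementation: the shuffle case simply picks a random task, and the fields case hashes the grouping values modulo the number of tasks, which is enough to guarantee that equal values always land on the same task.

```java
import java.util.List;
import java.util.Random;

// Simplified model (NOT Storm's code) of how a grouping picks a target task.
final class GroupingModel
{
    private final int numTasks;
    private final Random random;

    GroupingModel( int numTasks, long seed )
    {
        this.numTasks = numTasks;
        this.random = new Random( seed );
    }

    // Shuffle grouping (modeled as uniform random): any task may get any tuple
    int shuffleTarget()
    {
        return random.nextInt( numTasks );
    }

    // Fields grouping: hash the grouping field values so that equal values
    // always map to the same task; floorMod keeps the index non-negative
    int fieldsTarget( List<Object> groupingValues )
    {
        return Math.floorMod( groupingValues.hashCode(), numTasks );
    }
}
```

With four tasks, every tuple whose grouping value is "user-17" would be routed to the same task index, which is what makes per-key state (such as running counts) possible in a bolt.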

After creating the topology, the PrimeNumberTopology class sets up the infrastructure to execute it in local mode. It creates a LocalCluster object and passes the topology name ("test"), a default Config object, and the constructed topology to the submitTopology() method. It then calls Utils.sleep() to wait 10 seconds, kills the topology, and shuts down the LocalCluster. (In the next section we'll look at how to deploy such a topology to a real Storm cluster.)
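LocalCluster runs the whole topology inside the current JVM, which is why Listing 3 can simply sleep on the main thread and then shut everything down. As a rough analogy only (plain Java, no Storm classes), the same start, run-for-a-while, stop pattern looks like this; TimedRun and countFor() are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java analogy for the lifecycle in Listing 3: start processing,
// let it run for a fixed time, then stop it and clean up.
final class TimedRun
{
    // Runs a counting loop on a worker thread for about `millis` milliseconds
    // and returns how many iterations it completed.
    static long countFor( long millis ) throws InterruptedException
    {
        final AtomicLong processed = new AtomicLong();
        ExecutorService worker = Executors.newSingleThreadExecutor();

        // Like submitTopology(): start the processing loop
        worker.submit( new Runnable()
        {
            public void run()
            {
                while( !Thread.currentThread().isInterrupted() )
                {
                    processed.incrementAndGet();
                }
            }
        } );

        // Like Utils.sleep(): let the work run
        Thread.sleep( millis );

        // Like killTopology() plus shutdown(): interrupt the loop and wait for it
        worker.shutdownNow();
        worker.awaitTermination( 5, TimeUnit.SECONDS );
        return processed.get();
    }
}
```

Calling TimedRun.countFor( 10000 ) would mirror the 10-second run in Listing 3; the returned count varies from machine to machine.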

Building the project

Listing 4 shows the Maven pom.xml file for building this project.

Listing 4. pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.geekcap</groupId>
    <artifactId>storm-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>storm-test</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.geekcap.storm.PrimeNumberTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm-lib</artifactId>
            <version>0.8.1</version>
            <!-- keep storm out of the jar-with-dependencies
               <scope>provided</scope> -->
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>https://repo.clojars.org/</url>
        </repository>
    </repositories>
</project>

Some notable points about the POM file:

  • maven-jar-plugin defines the main class for the .jar file, which is com.geekcap.storm.PrimeNumberTopology.
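  • maven-dependency-plugin copies all dependencies to the lib directory under the target folder during the install phase, which matches the lib/ classpath prefix that maven-jar-plugin writes into the .jar manifest.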
  • The POM includes the storm-lib dependency. If you were building a distributed-mode application you would include storm rather than storm-lib.
  • The POM also includes the clojars.org Maven repository, which includes the storm-lib project. (Clojars is a community repository for open source Clojure libraries.)

You can build this project with mvn clean install and then execute it with java -jar storm-test-1.0-SNAPSHOT.jar from the target directory. In my environment I see the following output:

log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
1
3
2
7
5
11
...
3471031
3471283
3471317
3471407

The process runs for 10 seconds, and in that time the topology works through roughly 3.47 million numbers.
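As a back-of-the-envelope check, assuming the spout emits consecutive integers and that the last prime printed (3,471,407) approximates the last number tested, the average rate over the run is:

```latex
\text{throughput} \approx \frac{3{,}471{,}407\ \text{numbers}}{10\ \text{s}} \approx 3.47 \times 10^{5}\ \text{numbers/s}
```

This is only a rough figure: the 10-second window also covers topology startup, so it should not be read as a steady-state benchmark.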

Deploying a topology to a Storm cluster

Deploying a topology to a Storm cluster is an involved process, but once you have done it, it is not hard to repeat. The Storm homepage hosts a good, in-depth Storm clustering tutorial, but I'll summarize the process here.

  1. Define a class with a main method, just like we did for the PrimeNumberTopology class.
  2. Use the Storm TopologyBuilder to build a topology.
  3. Use the StormSubmitter class's submitTopology() method to name, configure, and submit your topology to the Storm cluster.
  4. Package your topology, including spouts and bolts and helper classes, in a .jar file with the mainClass defined to point to your topology class, like we did in the pom.xml file in Listing 4.
  5. Execute the storm command (in your Storm installation's bin folder) with the jar subcommand, passing it your .jar file and the name of your main class, for example storm jar storm-test-1.0-SNAPSHOT.jar com.geekcap.storm.PrimeNumberTopology. That class's main() method calls StormSubmitter.submitTopology() to submit the topology to the cluster.

Once submitted, the topology runs until you stop it. From the command line you can then use storm deactivate to pause the topology's spouts, storm activate to resume them, and storm kill to deactivate and then kill (stop) the topology. Each command takes the topology's name as its argument.

If you wanted to deploy the PrimeNumberTopology to a Storm cluster, your code would look something like this:

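// In main(), after building the topology with TopologyBuilder. Requires
// import backtype.storm.StormSubmitter; main() must also declare or handle
// AlreadyAliveException and InvalidTopologyException.
Config conf = new Config();
conf.setNumWorkers(20);          // worker processes to spread the topology across
conf.setMaxSpoutPending(5000);   // max un-acked tuples per spout task
StormSubmitter.submitTopology( "primenumbertopology", conf, builder.createTopology() );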
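The setMaxSpoutPending() setting caps how many tuples a single spout task may have pending (emitted but not yet acked or failed) at once. As we understand it, the cap applies only to tuples emitted with a message ID, and Storm stops calling nextTuple() on that spout task while the cap is reached. The hypothetical PendingLimiter class below models that back-pressure rule with a java.util.concurrent.Semaphore; it is an illustration, not Storm's mechanism.

```java
import java.util.concurrent.Semaphore;

// Hypothetical model of a max-spout-pending cap: at most `maxPending`
// emitted tuples may be waiting for an ack or fail at any one time.
final class PendingLimiter
{
    private final Semaphore permits;

    PendingLimiter( int maxPending )
    {
        this.permits = new Semaphore( maxPending );
    }

    // Call before emitting; false means the cap is reached and the spout
    // should skip emitting until something downstream acks or fails
    boolean tryEmit()
    {
        return permits.tryAcquire();
    }

    // Call once per previously emitted tuple when it is acked or fails
    // (calling it without a matching emit would raise the cap)
    void onAckOrFail()
    {
        permits.release();
    }
}
```

With a limit of 5000, as in the configuration above, a spout that has 5000 tuples in flight would stop emitting until acks start coming back, which keeps a fast spout from flooding slower bolts.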

In conclusion

Storm provides a framework for processing continuous streams of data to produce business value. Storm applications are composed of topologies that contain spouts, which emit data, and bolts, which consume data or emit additional data for other bolts to consume. Storm differs from Hadoop in that Hadoop jobs run to completion, whereas Storm topologies are meant to run indefinitely. In simple terms, Storm and Hadoop solve related but different problems.

We've reviewed the use cases for Storm applications, the structure of a Storm application, and a Storm deployment environment, and we built a simple PrimeNumberTopology that identifies prime numbers in a stream of integers. This example demonstrates how Storm tuples pass from a spout to a bolt to compute business value, and it should also leave you with a sense of Storm's processing power, even running on a local instance.

See the Resources section to learn more about Storm.

Steven Haines is a technical architect at Kit Digital, currently working onsite at Disney in Orlando. He is the founder of www.geekcap.com, an online education website, and has written hundreds of Java-related articles as well as three books: Java 2 From Scratch, Java 2 Primer Plus, and Pro Java EE Performance Management and Optimization. He lives with his wife and two children in Apopka, Florida.
