Open source Java projects: Storm

Parallel realtime computation for unbounded data streams

Storm is a big data framework that is similar to Hadoop but fine-tuned to handle unbounded data streams. In this installment of Open source Java projects, learn how Storm builds on the lessons and success of Hadoop to deliver massive amounts of data in realtime, then dive into Storm's API with a small demonstration app.

When I wrote my recent Open source Java projects introduction to GitHub, I noted that Storm, a project submitted by developer Nathan Marz, was among GitHub's most watched Java repositories. Curious, I decided to learn more about Storm and why it was causing a stir in the GitHub Java developer community.

What I found is that Storm is a big data processing system similar to Hadoop in its basic technology architecture, but tuned for a different set of use cases. Whereas Hadoop targets batch processing, Storm is an always-active service that receives and processes unbounded streams of data. Like Hadoop, Storm is a distributed system that offers massive scalability for applications that store and manipulate big data. Unlike Hadoop, it delivers that data instantaneously, in realtime.

Open source licensing

Storm is a free and open source project. It is hosted on GitHub and available under the Eclipse Public License for use in both open source and proprietary software.

In this installment of the Open source Java projects series I introduce Storm. We'll start with an overview of Storm's architecture and use case scenarios. Then I'll walk through a demonstration of setting up a Storm development environment and building a simple application whose goal is to process prime numbers in realtime. You'll learn a bit about Storm and get your hands into its code, and you'll also get a little taste of its speed and versatility. If Storm is applicable to your application needs, you'll be ready for next steps after reading this article.

What is Storm?

Storm is a free and open source distributed realtime computation system that can be used with any programming language. It is written primarily in Clojure and supports Java by default. In terms of categorical use cases, Storm is especially well suited to any of the following:

  • Realtime analytics
  • Online machine learning
  • Continuous computation
  • Distributed RPC
  • ETL

Perusing the Storm user group discussion forum, I found that Storm has been used in some very interesting real-world scenarios. Given a large database for an online auction site, for instance, Storm was used to view the top trending items in any category, in realtime. It has also been connected to the Twitter firehose and used to identify posting trends. Other potential uses for Storm include the following:

  • Watch live traffic to a website in order to understand user behavior and feed a recommendation engine.
  • Execute search operations in parallel (i.e., use parallel threads to search through different areas of a data set).
  • Compute video analytics, perform tagging, build tracks, and so forth.

See the Storm user group discussion forum for more real-world and speculative use cases for Storm.

Storm vs Hadoop

For many developers investigating Storm, the first question will be: How does it differ from Hadoop? The simple answer is that Storm analyzes realtime data while Hadoop analyzes offline data. In truth, the two frameworks complement one another more than they compete.

Hadoop provides its own file system (HDFS) and manages both data and code/tasks. It divides data into blocks and when a "job" executes, it pushes analysis code close to the data it is analyzing. This is how Hadoop avoids the overhead of network communication in loading data -- keeping the analysis code next to the data enables Hadoop to read it faster by orders of magnitude.

Hadoop's programming paradigm is the famous MapReduce. Basically, Hadoop partitions data into chunks and passes those chunks to mappers that map keys to values, such as the hit count for a resource on your website. Reducers then assemble those mapped key/value pairs into a usable output. The MapReduce paradigm operates quite elegantly, but it is targeted at batch data analysis. In order to leverage the full power of Hadoop, application data must be stored in the HDFS file system.

Storm solves a different problem altogether. Storm is interested in understanding things that are happening in realtime -- meaning right now -- and interpreting them. Storm does not have its own file system and its programming paradigm is quite a bit different from Hadoop's. Storm is all about obtaining streams of data from sources known as spouts (like a Twitter feed or live web traffic to your site) and passing that data through various processing components, known as bolts. Storm's data processing mechanism is extremely fast and is meant to help you identify live trends as they are happening. Unlike Hadoop, Storm doesn't care what happened yesterday or last week.

Some use cases have shown that Storm and Hadoop can work beautifully together (see Resources). For instance, you might use Storm to dynamically adjust your advertising engine to respond to current user behavior, then use Hadoop to identify the long-term patterns in that behavior. The important point is that you don't have to choose between Storm and Hadoop; rather, work to understand the problem you are trying to solve and then choose the best tool for the job.

Storm spouts and bolts

Storm has its own vernacular, but if you've studied Hadoop and other distributed data processing systems you should find its basic architecture familiar.

At the highest level, Storm is composed of topologies. A topology is a graph of computation: each node contains processing logic, and each edge between nodes indicates how data is passed between them.

Inside of topologies you have networks of streams, which are unbounded sequences of tuples. Storm provides a mechanism to transform streams into new streams using spouts and bolts. Spouts generate streams; a spout can pull data from a site like Twitter or Facebook and then publish it in an abstract format. Bolts consume input streams, process them, and then optionally generate new streams.

A stream transformation can be simple enough that a single bolt consumes the stream, or complex enough to require multiple streams, and hence multiple bolts. A bolt can do almost anything, including run functions, filter tuples, perform stream aggregations, join streams, or even execute calls to a database.

Figure 1 shows the relationship between spouts and bolts within a topology.

Figure 1. Spouts and Bolts within a Storm topology

As Figure 1 illustrates, a Storm topology can have multiple spouts, and each spout can feed one or more bolts. Additionally, each bolt can stand on its own or have additional bolts that listen to it. For example, you might have a spout that observes user behavior on your site and a bolt that uses a database field to categorize the types of pages that users are accessing. Another bolt might handle article views, adding a tick to a recommendation engine for each unique view of each type of article. You would simply configure the relationships between spouts and bolts to match your own business logic and application demands.

As previously mentioned, Storm's data model is represented by tuples. A tuple is a named list of values of any type. Storm supports all primitive types, Strings, and byte arrays out of the box, and you can build your own serializer if you want to use your own object types. Your spouts will "emit" tuples and your bolts will consume them. Your bolts may also emit tuples if their output is destined to be processed by another bolt downstream. Basically, emitting tuples is the mechanism for passing data from a spout to a bolt, or from one bolt to another.
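If you did want tuples to carry a custom type, you would register that type with Storm's Kryo-based serialization through the topology's Config. Here is a minimal sketch, assuming Storm 0.8's Config.registerSerialization() API; TrendRecord is a hypothetical application class, not part of this article's demo:

import backtype.storm.Config;

public class SerializationExample
{
    // TrendRecord is a hypothetical application type we want tuples to carry
    public static class TrendRecord
    {
        public String itemId;
        public long viewCount;
    }

    public static Config buildConfig()
    {
        Config conf = new Config();
        // Register the custom type with Storm's Kryo-based serialization
        // so that tuples containing it can be sent between worker processes
        conf.registerSerialization( TrendRecord.class );
        return conf;
    }
}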

A Storm cluster

I've covered the main vernacular for the structure of a Storm-based application, but deploying to a Storm cluster involves another set of terms.

A Storm cluster is somewhat similar to a Hadoop cluster, but while a Hadoop cluster runs MapReduce jobs, Storm runs topologies. As mentioned earlier, one of the primary differences between MapReduce jobs and topologies is that MapReduce jobs eventually end, while topologies are destined to run until you explicitly kill them. Storm clusters define two types of nodes:

  • Master node: This node runs a daemon process called Nimbus. Nimbus is responsible for distributing code across the cluster, assigning tasks to machines, and monitoring the success and failure of units of work.
  • Worker nodes: These nodes run a daemon process called the Supervisor. A Supervisor is responsible for listening for work assignments for its machine. It then subsequently starts and stops worker processes. Each worker process executes a subset of a topology, so that the execution of a topology is spread across a multitude of worker processes running on a multitude of machines.

Storm and ZooKeeper

Sitting between the Nimbus and the various Supervisors is the Apache open source project ZooKeeper. ZooKeeper's goal is to enable highly reliable distributed coordination, mainly by acting as a centralized service for distributed cluster functionality.

Storm topologies are deployed to the Nimbus, and then the Nimbus deploys spouts and bolts to Supervisors. When it comes time to execute spouts and bolts, the Nimbus communicates with the Supervisors by passing messages through ZooKeeper nodes. The ZooKeeper nodes maintain all state for the topology, which allows the Nimbus and Supervisors to be fail-fast and stateless: if the Nimbus or a Supervisor process goes down, the state of processing is not lost; work is reassigned to another Supervisor and processing continues. Then, when the Nimbus or a Supervisor is restarted, it simply rejoins the cluster and its capacity is added back to the cluster.

The relationship between the Nimbus, ZooKeeper nodes, and Supervisors is shown in Figure 2.

Figure 2. Nimbus communicates with Supervisors through ZooKeeper

Setting up a Storm development environment

Storm can run in one of two modes: local mode or distributed mode. In local mode, Storm executes topologies completely in-process by simulating worker nodes using threads. In distributed mode, it runs across a cluster of machines. We'll set up a local development environment as follows:

  1. Download the latest release of Storm and decompress it to your local hard drive.
  2. Add the Storm bin directory to your PATH environment variable.

The bin directory includes the storm command, which is the command-line interface for interacting with a Storm cluster. You can learn more about each command-line option from the Storm documentation. At a high level, the storm command allows you to deploy your topologies to a Storm cluster, as well as activate, deactivate, and kill topologies.

Creating a Storm application in local mode is a great way to test it before deploying it to a cluster, where its processing capacity can increase by orders of magnitude.

Storm demo: Crunching prime numbers

While you might not feel inclined to compute prime numbers for the rest of eternity, doing it once is a great demonstration of Storm's potential that is also easy to implement.

For this demo we'll create a NumberSpout that emits numbers, starting with 1 and incrementing until we kill the topology. To this spout we'll connect a PrimeNumberBolt that evaluates the number, which is passed in a Tuple, and prints the number out to the screen if it is prime. The NumberSpout and the PrimeNumberBolt are packaged together inside a PrimeNumberTopology class.

NumberSpout

Listing 1 shows the NumberSpout, which is responsible for emitting numbers, starting with 1 and incrementing until the spout is killed.

Listing 1. NumberSpout.java

package com.geekcap.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values; 
import java.util.Map;

public class NumberSpout extends BaseRichSpout 
{
    private SpoutOutputCollector collector;
    
    private static int currentNumber = 1;
        
    @Override
    public void open( Map conf, TopologyContext context, SpoutOutputCollector collector ) 
    {
        this.collector = collector;
    }
    
    @Override
    public void nextTuple() 
    {
        // Emit the next number
        collector.emit( new Values( currentNumber++ ) );
    }

    @Override
    public void ack(Object id) 
    {
        // This demo does not track acknowledged tuples
    }

    @Override
    public void fail(Object id) 
    {
        // This demo does not replay failed tuples
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    {
        declarer.declare( new Fields( "number" ) );
    }
}

In Listing 1, the open() method saves the SpoutOutputCollector; Storm invokes this method when the spout starts up. The collector is the mechanism through which the NumberSpout emits numbers. Storm invokes the nextTuple() method to retrieve the next tuple to pass to subscribed bolts: the method builds a new Values object that contains the next number in the sequence and then increments that number so that the next invocation will send a new number.

The declareOutputFields() method defines the fields that the spout emits, which in this example is a single value with the name number. Remember that tuples support primitive types (and their wrappers, like Integer) as well as Strings by default. It is up to the nextTuple() method to pass values in the same order in which they are declared in declareOutputFields().
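As an illustration, a spout that emitted more than one field would declare the fields together and emit its values in the same order. The following is a fragment of a hypothetical variation on NumberSpout that adds a timestamp field; the timestamp is my own addition for illustration, not part of the demo:

    @Override
    public void declareOutputFields( OutputFieldsDeclarer declarer ) 
    {
        // Declare two fields; emitted values must follow this order
        declarer.declare( new Fields( "number", "timestamp" ) );
    }

    @Override
    public void nextTuple() 
    {
        // Emit the number first and the timestamp second, matching the
        // order declared in declareOutputFields()
        collector.emit( new Values( currentNumber++, System.currentTimeMillis() ) );
    }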

PrimeNumberBolt

The PrimeNumberBolt's prepare() method, shown in Listing 2, is called when the bolt is initialized, so it saves the OutputCollector. If this bolt were to emit its results for another bolt to consume, it would use the collector to do so. In this example the bolt does not emit any tuples, but I've kept the collector to demonstrate how the bolt would continue the stream if it did.

Listing 2. PrimeNumberBolt.java

package com.geekcap.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.util.Map;

public class PrimeNumberBolt extends BaseRichBolt 
{
    private OutputCollector collector;

    public void prepare( Map conf, TopologyContext context, OutputCollector collector ) 
    {
        this.collector = collector;
    }

    public void execute( Tuple tuple ) 
    {
        int number = tuple.getInteger( 0 );
        if( isPrime( number) )
        {
            System.out.println( number );
        }
        collector.ack( tuple );
    }

    public void declareOutputFields( OutputFieldsDeclarer declarer ) 
    {
        declarer.declare( new Fields( "number" ) );
    }   
    
    private boolean isPrime( int n ) 
    {
        // 0 and 1 are not prime
        if( n < 2 )
        {
            return false;
        }
        if( n == 2 || n == 3 )
        {
            return true;
        }
        
        // Is n an even number?
        if( n % 2 == 0 )
        {
            return false;
        }
        
        // If not, check the odd divisors up to the square root of n
        for( int i = 3; i * i <= n; i += 2 ) 
        {
            if( n % i == 0 )
            {
                return false;
            }
        }
        return true;
    }
}

When processing, the PrimeNumberBolt extracts the first value from the Tuple, which is the Integer to evaluate, checks whether it is prime by calling the isPrime() method, and then prints the value if it is. The declareOutputFields() method would matter if the bolt were to emit a new Tuple for processing by another bolt: it declares that the bolt outputs a single field called number, but because the bolt never calls collector.emit(), the declaration is only shown for your reference.
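If you did want a downstream bolt to consume the primes, execute() could emit a new tuple before acking the input. Here's a minimal sketch of that variation; the anchored emit() overload is standard Storm API, and this version would also need the backtype.storm.tuple.Values import back:

    public void execute( Tuple tuple ) 
    {
        int number = tuple.getInteger( 0 );
        if( isPrime( number ) )
        {
            // Emit the prime, anchored to the input tuple, so that Storm
            // can replay the input if downstream processing fails
            collector.emit( tuple, new Values( number ) );
        }
        collector.ack( tuple );
    }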

PrimeNumberTopology

The PrimeNumberTopology class in Listing 3 uses the TopologyBuilder helper class to build a Topology object. To construct a Topology, create an instance of TopologyBuilder and invoke its setSpout() and setBolt() methods to add spouts and bolts to the topology. Each spout and bolt has a name: in this example the spout is named "spout" and the bolt is named "prime".

Listing 3. PrimeNumberTopology.java

package com.geekcap.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class PrimeNumberTopology 
{
    public static void main(String[] args) 
    {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout( "spout", new NumberSpout() );
        builder.setBolt( "prime", new PrimeNumberBolt() )
                .shuffleGrouping("spout");


        Config conf = new Config();
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("test");
        cluster.shutdown();
    }
}

The setBolt() method above returns an instance of InputDeclarer, which can be used to define the inputs to the bolt. I've used the shuffleGrouping() method, which means that tuples are randomly distributed from the named spout across the bolt's tasks. The InputDeclarer allows us to define other types of distributions beyond the random distribution provided by the shuffle grouping. For example, we could define a fields grouping so that tuples with the same value for a given field are always routed to the same bolt task. (Use the fieldsGrouping() method if you want to try building a fields grouping.)
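As a sketch, here is how the wiring might look with a parallelism hint and a fields grouping in place of the shuffle grouping. The parallelism hint of 4 is an arbitrary illustration, and the snippet assumes the backtype.storm.tuple.Fields import:

builder.setSpout( "spout", new NumberSpout() );
// Run four PrimeNumberBolt tasks; tuples with the same "number" field
// value are always routed to the same task
builder.setBolt( "prime", new PrimeNumberBolt(), 4 )
        .fieldsGrouping( "spout", new Fields( "number" ) );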

After creating the topology, the PrimeNumberTopology class creates the infrastructure to execute it in local mode. It creates a LocalCluster object and passes the topology name ("test"), a default Config object, and the constructed topology to its submitTopology() method. The example then uses Utils.sleep() to sleep for 10 seconds, kills the topology, and shuts down the LocalCluster. (In a later section we'll look at how to deploy such a topology to a real Storm cluster.)

Building the project

Listing 4 shows the Maven pom.xml file for building this project.

Listing 4. pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.geekcap</groupId>
    <artifactId>storm-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>storm-test</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.geekcap.storm.PrimeNumberTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm-lib</artifactId>
            <version>0.8.1</version>
            <!-- keep storm out of the jar-with-dependencies
               <scope>provided</scope> -->
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
</project>

Some notable points about the POM file:

  • maven-jar-plugin defines the main class for the .jar file, which is com.geekcap.storm.PrimeNumberTopology.
  • maven-dependency-plugin copies all dependencies to the target folder's lib directory.
  • The POM includes the storm-lib dependency. If you were building a distributed-mode application you would include storm rather than storm-lib.
  • The POM also includes the clojars.org Maven repository, which includes the storm-lib project. (Clojars is a community repository for open source Clojure libraries.)

You can build this project with mvn clean install and then execute it with java -jar storm-test-1.0-SNAPSHOT.jar from the target directory. In my environment I see the following output:

log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
3
2
7
5
11
...
3471031
3471283
3471317
3471407

The process runs for 10 seconds, and in that time the spout emits roughly 3.4 million numbers -- about 340,000 tuples per second in a single local-mode process.

Deploying a topology to a Storm cluster

Deploying a topology to a Storm cluster is an involved process, but once you've been through it, repeating it is straightforward. The Storm homepage hosts a good, in-depth Storm clustering tutorial, but I'll summarize the process here.

  1. Define a class with a main method, just like we did for the PrimeNumberTopology class.
  2. Use the Storm TopologyBuilder to build a topology.
  3. Use the StormSubmitter class's submitTopology() method to name, configure, and submit your topology to the storm cluster.
  4. Package your topology, including spouts and bolts and helper classes, in a .jar file with the mainClass defined to point to your topology class, like we did in the pom.xml file in Listing 4.
  5. Execute the storm command (in your storm installation's bin folder) with the jar command, passing it a reference to your .jar file and the main class name (which uses the StormSubmitter.submitTopology() method to submit the topology to the cluster).

At this point you can execute storm activate to activate your topology in the cluster, storm deactivate to deactivate it, and storm kill to stop it and remove it from the cluster.

If you wanted to deploy the PrimeNumberTopology to a Storm cluster, your code would look something like this:

Config conf = new Config();
// Request 20 worker processes across the cluster
conf.setNumWorkers(20);
// Limit the number of un-acked tuples in flight from each spout task
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology( "primenumbertopology", conf, builder.createTopology() );

In conclusion

Storm provides a framework for processing a continuous stream of data to produce business value. Storm applications are composed of topologies that contain spouts, which emit data, and bolts, which consume that data and optionally emit new data for other bolts to consume. Storm differs from Hadoop in that Hadoop jobs run to completion, whereas Storm topologies are meant to run indefinitely. In simple terms this means that Storm and Hadoop solve similar but different problems.

We've reviewed the use cases for Storm applications, the structure of a Storm application, and a Storm deployment environment, and we've built a simple PrimeNumberTopology that computes prime numbers across a range of integers. This example demonstrates how Storm tuples pass from a spout to a bolt to compute business value, and it should also leave you with an idea of Storm's processing power, even running on a local instance.

See the Resources section to learn more about Storm.

Steven Haines is a technical architect at Kit Digital, currently working onsite at Disney in Orlando. He is the founder of www.geekcap.com, an online education website, and has written hundreds of Java-related articles as well as three books: Java 2 From Scratch, Java 2 Primer Plus, and Pro Java EE Performance Management and Optimization. He lives with his wife and two children in Apopka, Florida.
