Open source Java projects: Storm

Parallel realtime computation for unbounded data streams


The declareOutputFields() method declares the fields that the spout emits; in this example, a single field named number. By default, Storm can serialize tuple values of primitive types (and their wrapper classes, such as Integer), Strings, byte arrays, and common collections such as ArrayList and HashMap; other types require you to register a serializer. Because values are matched to fields by position, it is up to the nextTuple() method to emit its values in the same order as the fields declared in declareOutputFields().
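To see why the order matters, here is a plain-Java sketch that does not use Storm at all. The class FieldPositionDemo, its valueByField() helper, and the second field named label are hypothetical, chosen only to mimic the rule that a value is looked up by finding its field name's position in the declared list and reading the emitted value at that same position.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, Storm-free illustration of positional field matching.
public class FieldPositionDemo {

    // Find the field name's index in the declared fields, then read the
    // emitted value at that same index.
    static Object valueByField(List<String> declared, List<Object> emitted, String field) {
        int index = declared.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown field: " + field);
        }
        return emitted.get(index);
    }

    public static void main(String[] args) {
        // "label" is a hypothetical second field added for contrast
        List<String> declared = Arrays.asList("number", "label");

        // Emitted in declaration order: "number" maps to the intended Integer
        List<Object> inOrder = Arrays.<Object>asList(7, "odd");
        System.out.println(valueByField(declared, inOrder, "number")); // prints 7

        // Emitted out of order: "number" silently maps to the String instead
        List<Object> swapped = Arrays.<Object>asList("odd", 7);
        System.out.println(valueByField(declared, swapped, "number")); // prints odd
    }
}
```

Nothing fails when the order is wrong; a downstream consumer simply reads the wrong value, which is why the emit order has to mirror the declaration.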

PrimeNumberBolt

The PrimeNumberBolt's prepare() method, shown in Listing 2, is called when the bolt is initialized, and it saves a reference to the OutputCollector. If this bolt were to emit its results for another bolt to consume, it would do so through this collector. In this example the bolt does not emit any tuples, but it still uses the collector to acknowledge (ack) each tuple it finishes processing, and keeping the reference shows where further processing would plug in.

Listing 2. PrimeNumberBolt.java

package com.geekcap.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.util.Map;

public class PrimeNumberBolt extends BaseRichBolt 
{
    private OutputCollector collector;

    // Called once when the bolt is initialized; keep the collector for acking (or emitting)
    @Override
    public void prepare( Map conf, TopologyContext context, OutputCollector collector ) 
    {
        this.collector = collector;
    }

    // Called once for every tuple the bolt receives
    @Override
    public void execute( Tuple tuple ) 
    {
        int number = tuple.getInteger( 0 );
        if( isPrime( number ) )
        {
            System.out.println( number );
        }
        // Tell Storm this tuple was processed successfully
        collector.ack( tuple );
    }

    // Declared for reference only; this bolt never calls collector.emit()
    @Override
    public void declareOutputFields( OutputFieldsDeclarer declarer ) 
    {
        declarer.declare( new Fields( "number" ) );
    }   
    
    private boolean isPrime( int n ) 
    {
        // 0, 1, and negative numbers are not prime
        if( n < 2 )
        {
            return false;
        }

        if( n == 2 || n == 3 )
        {
            return true;
        }
        
        // Even numbers greater than 2 are not prime
        if( n % 2 == 0 )
        {
            return false;
        }
        
        // Otherwise, test only odd divisors up to sqrt(n);
        // i <= n / i avoids the int overflow that i * i could cause
        for( int i = 3; i <= n / i; i += 2 ) 
        {
            if( n % i == 0 )
            {
                return false;
            }
        }
        return true;
    }
}

When processing, the PrimeNumberBolt extracts the first value from the Tuple, which is the Integer to process, checks whether it is prime by calling the isPrime() method, prints the value if it is, and then acknowledges the tuple with collector.ack(). The declareOutputFields() method would be used if the bolt were to emit a new Tuple for processing by another bolt. Here it declares a single output field named number, but because the bolt never calls collector.emit(), the declaration is shown only for your reference.
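Because isPrime() does not depend on Storm, it can be checked on its own before the topology is ever submitted. The sketch below copies the trial-division test from Listing 2 into a hypothetical standalone class, PrimeCheck, and prints the primes between 1 and 30; treat it as a quick sanity check rather than part of the article's application.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone copy of PrimeNumberBolt's trial-division test (no Storm needed)
public class PrimeCheck {

    static boolean isPrime(int n) {
        if (n < 2) {
            return false;            // 0, 1, and negative numbers are not prime
        }
        if (n == 2 || n == 3) {
            return true;
        }
        if (n % 2 == 0) {
            return false;            // even numbers greater than 2
        }
        // Only odd divisors up to sqrt(n) need checking; i <= n / i avoids overflowing i * i
        for (int i = 3; i <= n / i; i += 2) {
            if (n % i == 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> primes = new ArrayList<>();
        for (int n = 1; n <= 30; n++) {
            if (isPrime(n)) {
                primes.add(n);
            }
        }
        System.out.println(primes); // prints [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
    }
}
```

Stopping at the square root works because any composite n has at least one divisor no larger than sqrt(n), and skipping even divisors halves the remaining work.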

PrimeNumberTopology

The PrimeNumberTopology class in Listing 3 uses the TopologyBuilder helper class to build a StormTopology object. To construct a topology, create an instance of TopologyBuilder, invoke its setSpout() and setBolt() methods to add spouts and bolts, and then call createTopology(). Each spout and bolt is registered under a unique ID: in this example the spout is named "spout" and the bolt is named "prime".

Listing 3. PrimeNumberTopology.java

package com.geekcap.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

public class PrimeNumberTopology 
{
    public static void main(String[] args) 
    {
        TopologyBuilder builder = new TopologyBuilder();

        // Register the spout under the ID "spout"
        builder.setSpout( "spout", new NumberSpout() );
        // Register the bolt under the ID "prime" and subscribe it to the spout's tuples
        builder.setBolt( "prime", new PrimeNumberBolt() )
                .shuffleGrouping("spout");

        Config conf = new Config();
        
        // Run the topology in-process for 10 seconds, then stop it
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("test");
        cluster.shutdown();
    }
}

The setBolt() method returns an InputDeclarer, which is used to define the bolt's inputs. I've used the shuffleGrouping() method, which means that tuples from the named spout are distributed randomly across the bolt's tasks. The InputDeclarer also supports other distributions beyond the random one provided by the shuffle grouping. For example, a fields grouping partitions the stream by the values of one or more fields, so that all tuples with the same value for those fields are routed to the same task. (Use the fieldsGrouping() method, which takes the source component's ID and a Fields object, if you want to try building a fields grouping.)
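The difference between the two groupings comes down to how a target task is chosen for each tuple. The following plain-Java sketch is a simplified model, not Storm's actual routing code: the class GroupingDemo, the task count of 4, and the use of hashCode() modulo the number of tasks are assumptions for illustration. A shuffle grouping may pick any task, while a fields grouping derives the task from the grouped value, so repeated values always land on the same task.

```java
import java.util.Random;

// Simplified, Storm-free model of how two stream groupings pick a target task.
// Storm's real routing code differs in detail; this only illustrates the idea.
public class GroupingDemo {

    // Shuffle grouping (modeled): any task may receive the tuple
    static int shuffleTask(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    // Fields grouping (modeled): the task is derived from the grouped field's value,
    // so equal values always go to the same task
    static int fieldsTask(Object fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;                 // hypothetical number of bolt tasks
        Random random = new Random(42);

        for (int number : new int[] {7, 7, 10, 7}) {
            System.out.println("number=" + number
                    + "  shuffle -> task " + shuffleTask(random, numTasks)
                    + "  fields -> task " + fieldsTask(number, numTasks));
        }
    }
}
```

Every 7 reaches the same task under the fields model, while the shuffle column can vary from tuple to tuple; that stickiness is what lets a bolt keep per-value state, such as a running count, in a single task.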
