|
|
Optimize with a SATA RAID Storage Solution
Range of capacities as low as $1250 per TB. Ideal if you currently rely on servers/disks/JBODs
Page 5 of 7
The declareOutputFields() method defines the fields that the spout emits, which in this example includes a single value with the name number. Remember that by default Fields include support for primitive types (and primitive type wrappers) like Integers and Strings. So it will be up to the nextTuple() method to pass values in the order that they are declared in the declareOutputFields() response.
The PrimeNumberBolt's prepare() method, shown in Listing 2, is called when the bolt is initialized, so it saves the OutputCollector. If this bolt were to emit its result for another bolt to consume, it would use the collector to do so. In this example it
does not emit any bolts, but I've included it to demonstrate how it would continue processing.
package com.geekcap.storm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class PrimeNumberBolt extends BaseRichBolt
{
private OutputCollector collector;
public void prepare( Map conf, TopologyContext context, OutputCollector collector )
{
this.collector = collector;
}
public void execute( Tuple tuple )
{
int number = tuple.getInteger( 0 );
if( isPrime( number) )
{
System.out.println( number );
}
collector.ack( tuple );
}
public void declareOutputFields( OutputFieldsDeclarer declarer )
{
declarer.declare( new Fields( "number" ) );
}
private boolean isPrime( int n )
{
if( n == 1 || n == 2 || n == 3 )
{
return true;
}
// Is n an even number?
if( n % 2 == 0 )
{
return false;
}
//if not, then just check the odds
for( int i=3; i*i<=n; i+=2 )
{
if( n % i == 0)
{
return false;
}
}
return true;
}
}
When processing, the PrimeNumberBolt extracts the first value from the Tuple, which is the Integer to process, inspects it to see if it is prime by calling the isPrime() method, and then prints out the value if the number is prime. The declareOutputFields() method would be used if the bolt were to emit a new Tuple for processing by another bolt. In this example it defines that it outputs a single field called number, but it does not call collector.emit() to emit the tuple (it's only shown for your reference).
The PrimeNumberTopology class in Listing 3 uses the TopologyBuilder helper class to build a Topology object. To construct a Topology, create an instance of TopologyBuilder and invoke its setSpout() and setBolt() methods to add spouts and bolts to the topology. Each spout and bolt has a name -- in this example the spout has the name
"spout."
package com.geekcap.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
public class PrimeNumberTopology
{
public static void main(String[] args)
{
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout( "spout", new NumberSpout() );
builder.setBolt( "prime", new PrimeNumberBolt() )
.shuffleGrouping("spout");
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
}
}
The setBolt() method in above returns an instance of the InputDeclarer object that can be used to define the inputs to the bolt. I've used the shuffleGrouping() method, which means that the tuples should be randomly distributed from the named spout to the bolt's tasks. The InputDeclarer allows us to define other types of distributions beyond the random distribution provided by the shuffle grouping. For example,
we could define a "field grouping" so that a bolt only received a subset of tuple values. (Use the fieldGrouping() method if you want to try building a field grouping.)
Recent articles in the Open source Java projects series:
More about Storm: