# Aggregating with Apache Spark

### Solving aggregation with cluster computing

An aggregate in mathematics is defined as a "collective amount, sum, or mass arrived at by adding or putting together all components, elements, or parts of an assemblage or group without implying that the resulting total is whole." While there are many uses for aggregation in data science--examples include log aggregation, spatial aggregation, and network aggregation--it always pertains to some form of summation or collection. In this article, we'll look at the mechanics of aggregation in Apache Spark, a top-level Apache project that is popularly used for lightning-fast cluster computing.

Aggregation is abstract in theory, but we use it all the time in the real world. Imagine a garden (call it the JavaWorld garden) that is divided into four quadrants: east, west, south, and north. Each quadrant is landscaped with beautiful roses of different varieties, from Alba to Mini-flora to Polyantha. Three people are assigned to each quadrant of the garden, and each person is tasked with picking all of the roses in their section of the quadrant. In total, twelve people are picking roses in the garden.

Our task is to find the aggregate number of all the flowers picked. We'll then divide that number by 12, to determine the average number of flowers picked by each person.

Get the source code for the example applications demonstrated in this article: "Aggregating with Apache Spark." Created by Ravishankar Nair for JavaWorld.

## Aggregation with the Streams API

At first, the problem statement appears very simple. Let's consider the first part: find the aggregate number of roses picked from the entire garden. Starting in Java 8, we can use `Stream` to represent a sequence of elements and run various computational operations on those elements. In this case, we could use it to process the elements of an array, reducing them to a function able to take in another function, like so:

#### Listing 1. Aggregation with Stream

``````
import java.util.stream.IntStream;
public class  NormalAggregate
{
public static void main(String[] args)
{
/* First store number of flowers picked by each person in an
array called flowers */
int[] flowers = new int[]{11,12,13,24,25, 26, 35,36,37, 24,15,16};
int noofpersons=12;
int sum = IntStream.of(flowers).reduce( 0,(a, b) -> a + b);
System.out.println("The no of flowers aggregated: " + sum);
System.out.println("The average flowers picked per person: " + (float)sum/(float)noofpersons);
System.out.println("Another way to find aggregate :" + IntStream.of(flowers).sum());
}
}
```
```

## Streams and pipes

If you've ever worked with pipes in Unix, the code in Listing 1 will look familiar. Here's how we would use a pipe to filter out the Apache logs for the month of August:

#### Listing 1. Filtering with pipes

``````
ls -al | grep Aug
```
```

As its name indicates, a pipe (`|`) is a component in a Unix pipeline, taking output from one command and letting it flow to the next one. Going back to the problem in Java, we would first store the number of flowers picked by each person in an array called `flowers`. Using a lambda function, we would then add the elements with an accumulator value of 0. Internally, the parameter starts with the initial element of the array, then adds in succession until it reaches the last element:

#### Listing 2. Calculating in aggregate with Stream

``````
int accumulator = 0;
for( int i = 0; i < flowers.length; i++)
accumulator += flowers[i];
```
```

So far, so good. Now let's see what happens when we distribute the load.

## A better approach: Parallel aggregation with Java threads

Multicore CPUs are common enough now that we can take a multithreaded approach to solving aggregation. Assuming for the sake of example that we have an Intel i7 four-core processor, we would start by dividing the array, with a lower and upper bound for each available processor. We'd then calculate the aggregate. Listing 3 shows the most important part of the solution. You can find the rest in the source code for this article.

#### Listing 3. Multithreaded calculation using multicore processing

``````
public static int parallelAggregate(int[] arr, int threads)
{
int size = (int) Math.ceil(arr.length * 1.0 / threads);
for (int i = 0; i < threads; i++) {
individualTotals[i] = new ParallelAggregation(arr, i * size, (i + 1) * size);
individualTotals[i].start();
}
try {
for (ParallelAggregation sum : indivdualTotals) {
sum.join();
}
} catch (InterruptedException e) { }
int total = 0;
for (ParallelAggregation sum : individualTotals) {
}
}
```
```

The function in Listing 3 starts by dividing the size of the array almost equally among the number of threads. We can then run parallel threads, taking the sum from each. Finally, we add all of the sums to get the aggregate.

Here's how we would invoke the above method:

``````
public static int parallelAggregate(int[] arr){
return parallelAggregate(arr, Runtime.getRuntime().availableProcessors());
```
```

Figure 1 is a screenshot of the number of processors used to run this test in our standalone system:

The function in Listing 3 uses all of the available processors, with each processor running its own thread. Figure 2 shows the details of the Intel i7 processor.

Figure 2 shows that we have up to four threads available in the i7 Intel processor, so our program will divide the number of elements in the array into four equal sizes and calculate the aggregate. For the sake of comparison, we will run the aggregation using both threadless and multithreaded aggregation. For fun, let's assume we have 500 million flower pickers:

``````
public static void main(String[] args)
{
java.util.Random rand = new java.util.Random();
int[] flowers = new int[500000000];
for (int i = 0; i < flowers.length; i++) {
flowers[i] = rand.nextInt(50) + 1; // simulate 1..50
}
long start = System.currentTimeMillis();
ParallelAggregation.aggregate(flowers);
System.out.println("Single: " + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
ParallelAggregation.parallelAggregate(flowers);
System.out.println("Parallel: " + (System.currentTimeMillis() - start));     }
```
```

Figure 3 shows the sample output from an intel i7-5500 CPU based system.

The program in Listing 4 works, and it produces accurate results for both multithreaded and threadless solutions. But how would these solutions scale for big data? Let's say we wanted to aggregate for trillions of flower pickers. Figure 4 shows the results for the multithreaded program:

The program doesn't have sufficient processing power for an array of that size.  Next, we run the program again. This time we keep the number of flower pickers to 500 million, but increase our thread count from four to 1,000.

The program runs this time, but the output isn't as promising as it was when running four threads on four processors. In multicore, a common load-balancing recommendation is n+1 threads, with n being the number of CPU cores available. That way, n threads can work with the CPU while one thread waits for disk I/O. Having fewer threads would not fully utilize the CPU resource (because at some point there will always be I/O to wait for), while having more threads forces them to fight for CPU.

Threads come at a cost, which pays off when you have the dedicated CPU cores to run your code. On a single-core CPU, a single-process (threadless) solution is usually faster. Threads alone don't automatically increase the speed of processing, but they do require more work. As you saw, threads also don't necessarily scale for big data. Given these issues, our search for an ideal solution is not over.

## MapReduce: When scalability is the goal

In order to solve our problem statement and aggregate at a scale of one trillion or more flower pickers, we need a solution that can scale without dependency on the underlying machine's processing capacity. We want accuracy, consistency, fault tolerance, fail safety (gradual degradation), and efficient resource utilization.

MapReduce 2, with YARN, is a good framework for this challenge. The script below generates 500 million random numbers of flowers between 1 and 50:

#### Listing 5. Aggregation with MapReduce and YARN

``````
for  i in {1 .. 500000000 }; do echo \$[(\$RANDOM % 50 +1)];
done > test.dat
```
```

To run this example for yourself, find the complete "test.dat" script in the source download. Create a directory in your HDFS and place the test data file inside, then follow the instructions.

Here's the mapper for the MapReduce aggregation:

#### Listing 6. Aggregation mapper

``````
public void map(Object key,Text value,Context context)
throws IOException, InterruptedException
{
StringTokenizer tokenizer = new StringTokenizer(value.toString(), " \t\n\r\f,.:;?![]'");
while (tokenizer.hasMoreTokens())
{
// make the words lowercase so words like "an" and "An" are counted as one word
String s = tokenizer.nextToken().toLowerCase().trim();
IntWritable val = new IntWritable(Integer.parseInt(s));
word.set("aggregate");
context.write(word, val);
}
}
```
```

The `map` function in Listing 6 reads the input data line by line. It then creates a key called `aggregate`. It emits every number on each line, along with the `aggregate` key as a tuple. Note that we've used the same key for the reducer in Listing 7, thus directing the output from both `map` and `reduce` to a single node for aggregation.

#### Listing 7. Aggregation reducer

``````
public void reduce(Text key,
Iterable<IntWritable> values,Context context)
throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
total.set(sum);
// this writes the word and the count, like this:
// ("aggregate", 2)
context.write(key, total);
}
```
```

The reducer gets the emitted key and list of values from the mapper and aggregates all values. Because we've used a constant key, the reduce operation occurs in the same node. Using the reducer class as a combiner in the driver program makes this MapReduce program very efficient for executing on a large cluster.

Using MapReduce with YARN, we can efficiently execute data for a trillion rows on a two-node cluster. If we need to add even more flower pickers, we can scale by increasing application nodes. The inherent design of MapReduce allows speculative execution, which ensures fail safety by running a filed or slow task in a secondary node, where a copy of the original data resides. Figure 6 shows the output for a successful single-node execution.

## Issues with MapReduce

The MapReduce solution achieves scalability, but scalability is relative. We were able to scale our application to find the sum of the number of flowers for a trillion or more pickers. But what if we wanted to perform another operation, such as finding the standard deviation among the number of flowers picked, or the mean or mode of these numbers? We would need to write a new MapReduce program for each computation.

Every MapReduce application reads input data individually, and writes its output back to HDFS. As a result, MapReduce is made for acyclic data flow models, a paradigm that doesn't allow for iterative querying or repeated executions on results or inputs. This is limiting, because for every solution, there is always another problem waiting in the wings.

We also don't necessarily want to be bound by reading large data sets from HDFS, given that disk I/O is very expensive and bandwidth is limited. Ideally, we want an in-memory structure partitioned across commodity machines, which will enable us to do repeated querying without reloading from disk.

## Aggregating with Spark

As we try out different ways of resolving our problem statement, our list of requirements is growing. The code in Listings 6 and 7 is 104 lines long, and yet it doesn't let us create a chain of operations or computations. In addition to more composable code, we want a language that supports features like cohesion, loose coupling, and orthogonality--meaning that a relatively small set of primitive constructs can be combined in a relatively small number of ways to build the control and data structures of the language.

Java 8 added functional features to the language, but Scala is inherently functional. In this section we'll see what programming in Scala on Apache Spark brings to the table. Note that we assume some experience with Spark, and that you have Spark installed and setup in your development environment already.

## Spark vs MapReduce

Just like MapReduce is the programming model for Hadoop, DAG (Direct Acyclic Graph) is the programming model for Spark. We've already noted that MapReduce doesn't have the flexibility needed to create data flows. In MapReduce, you create a map, a shuffler, and a reduce operation. The map is executed on individual blocks, its output is spilled to a circular buffer, and then it's shuffled and sorted for the reducer to pick up. Even if we use a `ChainMapper` or a `ChainReducer`, the model is `[MAP+ / REDUCE MAP*]`, where `+` indicates one or more relevant tasks and `*` indicates zero or more.

As a result, we can never have a model that executes `map->  shuffle->reduce->reduce`, although this is an important requirement for iterative algorithms. If you want to set up something iterative, your only option is to schedule `map->shuffle->reduce` as your first job, followed by `map-> shuffle->reduce`. Since jobs in MapReduce are mutually exclusive, the first job has no idea whether a second job will follow. This is represented in Figure 7, where the blue line represents a `map` task and the black line represents a `reduce` task. We've excluded `shuffle` and `sort` for brevity.

Now let's analyze the same solution in Spark. You can see the data flow illustrated on the right side of Figure 7. First, note that there are no maps between reducers (i.e., no blue circles) for the Spark solution in Figure 7. That means output from the first MapReduce can be directly fed to the second set of reducers without an explicit `map` process. Second, there is no HDFS file system between operations. Instead, Spark leverages memory as long as the data fits it, which greatly reduces the cost for disk I/O. Most importantly, note the part of the diagram marked STAGE 1, where the yellow circles `1->2->4` and `3->5` are mutually exclusive. These can be simple transformations. Most importantly, Spark can intelligently connect the multiple stages into an execution pipeline, deciding which stages should run in parallel. This powerful design supports lightning-fast cluster computing in Spark.

## Solving aggregation with cluster computing

Cluster computing makes Spark an excellent choice for solving our aggregation problem. As we've mentioned, Spark leverages memory rather than relying on HDFS. Objects stored in memory are called RDDs, or resilient distributed datasets, and they are the horsepower in the Spark system.

An RDD is a pointer for a collection of individual memory units. When you combine multiple RDDs, you get the entire object stored in Spark. If you apply a transformation or action on an RDD, all the individual partitions across the cluster are affected. Any loss to an individual unit can be reconstructed by lineage graphs associated with the creation of corresponding units. A lineage graph is the plan Spark employs to create individual RDD units or partitions. Further, RDD is immutable, and is evaluated only till an action (terminal operation) is executed against it (lazy evaluation). RDDs can handle any type of data, which makes Spark very popular. There is one drawback, however: operations on RDD must be generic in order to be applicable to any kind of data, which limits the number and type of available operations.

Listing 8 shows the definition of the aggregate function in Spark, in its simplest form. Before using `aggregate`, we need to specify how many partitions are to be used. We specify the number of partitions when we create the initial RDD.

#### Listing 8. Definition of the aggregate function

``````
def aggregate( an initial value)(an intra-partition sequence operation)(an inter-partition combination operation)
```
```

Now let's take this aggregation model back to our garden example. Recall that there are four quadrants in the JavaWorld garden: south, north, east, and west. We'll use four partitions, one for each quadrant. Going back to Listing 1, here's how we would write the first line (listing the number of flowers) in Scala:

#### Listing 9. Aggregation with Spark

``````
val flowers =
sc.parallelize(List(11,12,13,24,25, 26, 35,36,37, 24,15,16),4)
```
```

Note that the second parameter of 4 represents the number of partitions available in our Spark cluster.

Now it's easy to visualize the data as it relates to our problem statement: 11, 12, and 13 are the number of flowers picked by each person in the south quadrant of the garden. The numbers 24, 25, and 26 are from the north quadrant; 35, 36, and 37 are from the west quadrant; and 24, 25,and 16 are from the east. Each quadrant corresponds to one node in the Spark cluster.

Next, we divide the problem statement into two parts. The first part of the problem is to aggregate the total number of flowers picked in each quadrant; that's the intra-partition sequence aggregation from Listing 8. The second part of the problem is to sum these individual aggregates across the partitions; that's the inter-partition aggregation.

So let's find the intra-partition sequence aggregation results. Note that each flower picker initially goes to the garden with an empty bucket. That will be our starting value of 0.

Southside garden: 11+12+13 = 36
Northside garden: 24+25+26 = 75
Westside garden: 35+36+37 = 108
Eastside garden: 24+25 +16 = 65

Next, we calculate the inter-partition aggregation results:

Southside + Northside + Westside + Eastside = 36 + 75 + 108 + 65 = 284

The sum, stored in an RDD can further be used and processed for any kind of transformation or other action, and it can be persisted for later iterative use. Now we write the last part of the code. This one line of Scala performs both of the above aggregations:

#### Listing 10. Complex aggregation with Scala

``````
val sum = flowers.aggregate(0)(_+_, _+_)
```
```

We start with 0 as the initial value in each of the 12 buckets. The first `_+_` is the intra-partition sum, adding the total number of flowers picked by each picker in each quadrant of the garden. The second `_+_` is the inter-partition sum, which aggregates the total sums from each quadrant.

For the aggregation to work, we need two `reduce` functions after the initial value. What would happen if initial value weren't zero? If it were 5, for example, that number would be added to each intra-partition aggregate, and also to the inter-partition aggregate. So the first calculation would be:

Southside garden: 11+12+13 = 36 + 5 = 41
Northside garden: 24+25+26 = 75 + 5 = 80
Westside garden: 35+36+37 = 108 + 5 = 113
Eastside garden: 24+25 +16 = 65 + 5 = 70

Here's the inter-partition aggregation calculation with the initial value of 5:

Southside + Northside + Westside + Eastside + 5 = 41 + 80 + 113 + 70 = 309

## Aggregation with accumulators

To illustrate the concept further, assume we want to find out the maximum number of flowers in each quadrant of the garden, and then aggregate the totals. We only need a slight alteration to the intra-partition function:

#### Listing 11. Accumulators for a quadrant

``````
val sumofmaximums = flowers.aggregate(0)(Math.max(_,_), _+_)
```
```

And what if we wanted to find the maximum number of flowers each person could pick, across the entire garden? We could do:

#### Listing 12. Accumulators for the garden

``````
val maximum = flowers.aggregate(0)(Math.max(_,_), Math.max(_,_))
```
```

The initial value used in these examples is called an accumulator, in this case, a value that is iterated across partitions and then propagated for the final result.

## Aggregation with tuples

For our final example, let's say that we can use as many initial values as we want. In that case, we could solve the problem of finding the average number of flowers among all of the flower pickers in each quadrant of our garden like this:

#### Listing 13. Tuples

``````
val flowersandpickers = flowers.aggregate((0,0)) (
(acc, value) => (acc._1 + value, acc_.2 +1),
(acc1, acc2) => acc1._1 + acc2._1, acc1._2 + acc2._2)
)
```
```

In this example, notice that the `reduce` functions applied within the aggregate have to be both commutative and associative. There shouldn't be any order of execution for sequencing or combining operations. The initial values are two zeros, representing a tuple, or pair. The first zero is the initial value for the sum total number of flowers picked (because you start with zero flowers in the basket); the second zero is the initial value we use to find the average sum of flowers picked per picker (because you start with zero flowers picked). The intra-sequence aggregation adds the number of flowers in each quadrant. At the same time, we add the number 1, indicating that we have added one flower picker per basket. The inter-partition combination function adds the number of flowers and the number of flower pickers from each quadrant. To find the average, we then write:

#### Listing 14. Averaging with tuples

``````
val avg = flowersandpickers._1/ flowersandpickers._2.toDouble
```
```

For comparison, here is how you would write equivalent code in Python instead of Scala:

#### Listing 15. Aggregating with Python

``````
flowersandpickers = sc.parallelize([11,12,13,24,25, 26, 35,36,37, 24,15,16],4).aggregate(
(0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
avg = flowersandpickers[0]/ float(flowersandpickers[1])
```
```

## In conclusion

A three-node Spark cluster outperforms MapReduce at about 14 times the speed, and the DAG execution patterns ensures that RDDs are reusable for future iterations. While you could program your Spark solution in Java, Scala code is significantly more efficient. The entire program, with accumulators and tuples, is just 10 lines of code.