Aggregating with Apache Spark

Solving aggregation with cluster computing


Issues with MapReduce

The MapReduce solution achieves scalability, but scalability is relative. We were able to scale our application to find the sum of the number of flowers for a trillion or more pickers. But what if we wanted to perform another operation, such as finding the standard deviation among the number of flowers picked, or the mean or mode of these numbers? We would need to write a new MapReduce program for each computation.

Every MapReduce application reads its input data independently and writes its output back to HDFS. As a result, MapReduce is made for acyclic data flow models, a paradigm that doesn't allow for iterative querying or repeated executions on results or inputs. This is limiting, because for every solution, there is always another problem waiting in the wings.

We also don't necessarily want to be bound by reading large data sets from HDFS, given that disk I/O is very expensive and bandwidth is limited. Ideally, we want an in-memory structure partitioned across commodity machines, which will enable us to do repeated querying without reloading from disk.

Aggregating with Spark

As we try out different ways of resolving our problem statement, our list of requirements is growing. The code in Listings 6 and 7 is 104 lines long, and yet it doesn't let us create a chain of operations or computations. In addition to more composable code, we want a language that supports features like cohesion, loose coupling, and orthogonality. Orthogonality means that a relatively small set of primitive constructs can be combined in a relatively small number of ways to build the control and data structures of the language.

Java 8 added functional features to the language, but Scala is inherently functional. In this section we'll see what programming in Scala on Apache Spark brings to the table. Note that we assume some experience with Spark, and that you already have Spark installed and set up in your development environment.

Spark vs MapReduce

Just as MapReduce is the programming model for Hadoop, the DAG (Directed Acyclic Graph) is the programming model for Spark. We've already noted that MapReduce doesn't have the flexibility needed to create data flows. In MapReduce, you create a map, a shuffle, and a reduce operation. The map is executed on individual blocks, its output is spilled to a circular buffer, and then it's shuffled and sorted for the reducer to pick up. Even if we use a ChainMapper or a ChainReducer, the model is [MAP+ / REDUCE MAP*], where + indicates one or more relevant tasks and * indicates zero or more.

As a result, we can never have a model that executes map->shuffle->reduce->reduce, although this is an important requirement for iterative algorithms. If you want to set up something iterative, your only option is to schedule map->shuffle->reduce as your first job, followed by a second map->shuffle->reduce job. Since jobs in MapReduce are independent of one another, the first job has no idea whether a second job will follow. This is represented in Figure 7, where the blue line represents a map task and the black line represents a reduce task. We've excluded shuffle and sort for brevity.

[Figure 7 image. Credit: Ravishankar Nair]

Figure 7. Hadoop MapReduce vs Spark DAG
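The MapReduce side of this comparison can be approximated in plain Python. This is only a sketch, not Hadoop code: run_job, the store dictionary (standing in for HDFS), and the sample records are hypothetical names introduced for illustration. It suggests why a second reduce requires a second job with its own map step, and why the intermediate result has to be written out and read back.

```python
from collections import defaultdict

# Hypothetical stand-in for HDFS: every job reads its input from here
# and writes its output back here.
store = {}

def run_job(input_key, output_key, mapper, reducer):
    """Run one map -> shuffle -> reduce pass over store[input_key]."""
    # Map: emit (key, value) pairs for every input record.
    mapped = [pair for record in store[input_key] for pair in mapper(record)]
    # Shuffle and sort: group the emitted values by key.
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    # Reduce: one output record per key, written back to the store.
    store[output_key] = [(key, reducer(values))
                         for key, values in sorted(groups.items())]

# Illustrative (quadrant, flowers picked) records.
store["input"] = [("south", 11), ("south", 12), ("south", 13),
                  ("north", 24), ("north", 25), ("north", 26)]

# Job 1: total per quadrant.
run_job("input", "per_quadrant", lambda rec: [rec], sum)

# Job 2: grand total. It cannot continue from job 1's reducers; it
# re-reads job 1's stored output and needs its own (trivial) map step.
run_job("per_quadrant", "total", lambda rec: [("all", rec[1])], sum)

per_quadrant = store["per_quadrant"]
total = store["total"]
```

Each call to run_job knows nothing about the other; the only link between them is the key under which the first job stored its output.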

Now let's analyze the same solution in Spark. You can see the data flow illustrated on the right side of Figure 7. First, note that there are no maps between reducers (i.e., no blue circles) for the Spark solution in Figure 7. That means output from the first set of reducers can be fed directly to the second set of reducers without an explicit map process. Second, there is no HDFS file system between operations. Instead, Spark leverages memory as long as the data fits in it, which greatly reduces the cost of disk I/O. Third, note the part of the diagram marked STAGE 1, where the yellow circles 1->2->4 and 3->5 are independent of each other. These can be simple transformations. Most importantly, Spark can intelligently connect the multiple stages into an execution pipeline, deciding which stages should run in parallel. This powerful design supports lightning-fast cluster computing in Spark.

Solving aggregation with cluster computing

Cluster computing makes Spark an excellent choice for solving our aggregation problem. As we've mentioned, Spark leverages memory rather than relying on HDFS. Objects stored in memory are called RDDs, or resilient distributed datasets, and they are the workhorses of the Spark system.

An RDD is a handle to a collection of individual memory units, or partitions, spread across the cluster. Taken together, the partitions make up the entire object stored in Spark. If you apply a transformation or action to an RDD, all of its partitions across the cluster are affected. If an individual partition is lost, it can be reconstructed from the lineage graph associated with its creation. A lineage graph is the plan Spark employs to create individual RDD partitions. Further, an RDD is immutable, and it is evaluated only when an action (a terminal operation) is executed against it; this is known as lazy evaluation. RDDs can handle any type of data, which makes Spark very popular. There is one drawback, however: operations on an RDD must be generic in order to apply to any kind of data, which limits the number and type of available operations.
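To make immutability, lazy evaluation, and lineage more concrete, here is a toy model in plain Python. It is a sketch of the ideas only, not Spark's implementation: the LazyDataset class and its methods are hypothetical names, and it supports nothing beyond a map-style transformation.

```python
class LazyDataset:
    """Toy model of an RDD: immutable partitions plus a lineage of pending steps."""

    def __init__(self, partitions, lineage=()):
        self._source = tuple(tuple(p) for p in partitions)  # never mutated
        self._lineage = tuple(lineage)                      # functions to apply, in order

    def map(self, fn):
        # A transformation only records the step and returns a new object;
        # nothing is computed yet.
        return LazyDataset(self._source, self._lineage + (fn,))

    def compute_partition(self, index):
        # Replaying the lineage rebuilds a single partition on demand,
        # which is how a lost partition could be recovered.
        values = self._source[index]
        for fn in self._lineage:
            values = tuple(fn(v) for v in values)
        return values

    def collect(self):
        # An action triggers evaluation of every partition.
        return [v for i in range(len(self._source))
                for v in self.compute_partition(i)]


calls = []

def double(x):
    calls.append(x)  # records when the function actually runs
    return 2 * x

base = LazyDataset([[1, 2], [3, 4]])
doubled = base.map(double)               # lazy: double() has not run yet
calls_before_action = len(calls)
result = doubled.collect()               # action: double() runs now
rebuilt = doubled.compute_partition(1)   # recompute one partition from lineage
```

Note that base is unchanged by the call to map, and that the second partition can be rebuilt on its own without touching the first.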

Listing 8 shows the definition of the aggregate function in Spark, in its simplest form. Before using aggregate, we need to specify how many partitions are to be used. We specify the number of partitions when we create the initial RDD.

Listing 8. Definition of the aggregate function

// zeroValue: an initial value; seqOp: an intra-partition sequence operation; combOp: an inter-partition combination operation
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
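The two-step shape of this function can be modeled without a cluster. The following is a minimal plain-Python sketch, assuming the partitions are already given as a list of lists; aggregate, zero_value, seq_op, and comb_op are illustrative names here, not Spark's API, and the word data is made up.

```python
from functools import reduce

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model of an aggregate over a list of partitions."""
    # Intra-partition step: fold each partition, starting from zero_value.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # Inter-partition step: combine the partial results, again starting
    # from zero_value.
    return reduce(comb_op, partials, zero_value)

# Hypothetical data: two partitions of words; count the total characters.
partitions = [["map", "reduce"], ["spark"]]
total_chars = aggregate(
    partitions,
    0,
    lambda acc, word: acc + len(word),  # seq_op: (result, element) -> result
    lambda a, b: a + b,                 # comb_op: (result, result) -> result
)
```

The example deliberately uses strings as elements and an integer as the result, to show that the result type need not match the element type.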

Now let's take this aggregation model back to our garden example. Recall that there are four quadrants in the JavaWorld garden: south, north, east, and west. We'll use four partitions, one for each quadrant. Going back to Listing 1, here's how we would write the first line (listing the number of flowers) in Scala:

Listing 9. Aggregation with Spark

val flowers =
     sc.parallelize(List(11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16), 4)

Note that the second parameter, 4, is the number of partitions into which Spark splits the data.

Now it's easy to visualize the data as it relates to our problem statement: 11, 12, and 13 are the number of flowers picked by each person in the south quadrant of the garden. The numbers 24, 25, and 26 are from the north quadrant; 35, 36, and 37 are from the west quadrant; and 24, 25, and 16 are from the east. Each quadrant corresponds to one partition of the RDD.
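The mapping from the flat list to quadrants can be sketched in plain Python. This assumes the data is cut into contiguous, near-equal slices; split_into_partitions is a hypothetical helper written for this illustration, and the quadrant order is the one given in the paragraph above.

```python
def split_into_partitions(data, num_partitions):
    """Split data into contiguous, near-equal slices (an assumed scheme)."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

# Quadrant values as described in the text.
flowers = [11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16]
quadrants = ["south", "north", "west", "east"]

# Pair each quadrant name with its slice of the data.
by_quadrant = dict(zip(quadrants, split_into_partitions(flowers, 4)))
```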

Next, we divide the problem statement into two parts. The first part of the problem is to aggregate the total number of flowers picked in each quadrant; that's the intra-partition sequence aggregation from Listing 8. The second part of the problem is to sum these individual aggregates across the partitions; that's the inter-partition aggregation.

So let's find the intra-partition sequence aggregation results. Note that each flower picker initially goes to the garden with an empty bucket. That will be our starting value of 0.

Southside garden: 11+12+13 = 36
Northside garden: 24+25+26 = 75
Westside garden: 35+36+37 = 108
Eastside garden: 24+25+16 = 65

Next, we calculate the inter-partition aggregation results:

Southside + Northside + Westside + Eastside = 36 + 75 + 108 + 65 = 284
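The two steps above can be checked by hand in a few lines of plain Python. This is just the arithmetic, with no Spark involved; the dictionary layout is an assumption made for readability.

```python
# Quadrant values as given in the text above.
partitions = {
    "south": [11, 12, 13],
    "north": [24, 25, 26],
    "west": [35, 36, 37],
    "east": [24, 25, 16],
}

# Intra-partition step: each quadrant starts from an empty bucket (0).
quadrant_totals = {name: sum(values, 0) for name, values in partitions.items()}

# Inter-partition step: add the per-quadrant totals together.
garden_total = sum(quadrant_totals.values(), 0)
```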

Because aggregate is an action, the sum comes back to the driver program as an ordinary value. The flowers RDD itself remains available for further transformations or actions, and it can be persisted for later iterative use. Now we write the last part of the code. This one line of Scala performs both of the above aggregations:

Listing 10. Complex aggregation with Scala

val sum = flowers.aggregate(0)(_+_, _+_)

We start with 0 as the initial value in each of the four partitions. The first _+_ is the intra-partition sum, adding up the flowers picked by the pickers within each quadrant of the garden. The second _+_ is the inter-partition sum, which aggregates the total sums from each quadrant.

For the aggregation to work, we need two reduce functions after the initial value. What would happen if the initial value weren't zero? If it were 5, for example, that number would be added to each intra-partition aggregate, and also to the inter-partition aggregate. So the first calculation would be:

Southside garden: 5 + 11+12+13 = 41
Northside garden: 5 + 24+25+26 = 80
Westside garden: 5 + 35+36+37 = 113
Eastside garden: 5 + 24+25+16 = 70

Here's the inter-partition aggregation calculation with the initial value of 5:

Southside + Northside + Westside + Eastside + 5 = 41 + 80 + 113 + 70 + 5 = 309
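The effect of a non-zero initial value can be reproduced with a small plain-Python model. This is a sketch under the assumption, consistent with the calculation above, that the initial value is applied once per partition and once more in the final combine; the aggregate helper is illustrative and is not Spark's API.

```python
from functools import reduce
from operator import add

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model: fold each partition, then combine the partials."""
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partials, zero_value)

four_partitions = [[11, 12, 13], [24, 25, 26], [35, 36, 37], [24, 25, 16]]
# The same data in a hypothetical two-partition layout.
two_partitions = [[11, 12, 13, 24, 25, 26], [35, 36, 37, 24, 25, 16]]

with_zero = aggregate(four_partitions, 0, add, add)
with_five = aggregate(four_partitions, 5, add, add)

# The initial value contributes (number of partitions + 1) times.
extra = 5 * (len(four_partitions) + 1)

# With a different partition count, the same non-zero initial value
# produces a different answer.
with_five_two_partitions = aggregate(two_partitions, 5, add, add)
```

Since the result with an initial value of 5 changes with the number of partitions, the safe choice for a sum is an initial value that leaves the result unchanged, which is 0.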

Aggregation with accumulators

To illustrate the concept further, assume we want to find out the maximum number of flowers in each quadrant of the garden, and then aggregate the totals. We only need a slight alteration to the intra-partition function:

Listing 11. Accumulators for a quadrant

val sumofmaximums = flowers.aggregate(0)(Math.max(_,_), _+_)

And what if we wanted to find the largest number of flowers picked by any one person, across the entire garden? We could do:

Listing 12. Accumulators for the garden

val maximum = flowers.aggregate(0)(Math.max(_,_), Math.max(_,_))
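Both variations can be traced in plain Python with the same kind of model. This sketch uses the quadrant values given in the text (24, 25, and 16 for the east) and an illustrative aggregate helper that is not Spark's API. Note that an initial value of 0 is only a safe starting point for a maximum because flower counts are never negative.

```python
from functools import reduce
from operator import add

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model: fold each partition, then combine the partials."""
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partials, zero_value)

partitions = [[11, 12, 13], [24, 25, 26], [35, 36, 37], [24, 25, 16]]

# Maximum within each quadrant, then the sum of those maximums:
# 13 + 26 + 37 + 25.
sum_of_maximums = aggregate(partitions, 0, max, add)

# Maximum within each quadrant, then the maximum of those maximums.
maximum = aggregate(partitions, 0, max, max)
```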

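The initial value used in these examples is called an accumulator: in this case, a value that is updated as each element within a partition is processed, and then combined across partitions to produce the final result. (This usage is distinct from Spark's Accumulator shared variables.)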

Aggregation with tuples

For our final example, let's say that we can use as many initial values as we want. In that case, we could solve the problem of finding the average number of flowers picked per flower picker across all the quadrants of our garden like this:

Listing 13. Tuples

val flowersandpickers = flowers.aggregate((0, 0))(
        (acc, value) => (acc._1 + value, acc._2 + 1),
        (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

In this example, notice that the reduce functions applied within the aggregate have to be both commutative and associative: the result must not depend on the order in which the sequencing or combining operations are executed. The initial values are two zeros, representing a tuple, or pair. The first zero is the initial value for the total number of flowers picked (because you start with zero flowers in the basket); the second zero is the initial value for the number of flower pickers counted (because you start with no pickers counted). The intra-partition sequence function adds each picker's flowers to the running total for the quadrant. At the same time, it adds 1 to the count, indicating that one more flower picker has been counted. The inter-partition combination function adds the number of flowers and the number of flower pickers from each quadrant. To find the average, we then write:

Listing 14. Averaging with tuples

val avg = flowersandpickers._1 / flowersandpickers._2.toDouble

For comparison, here is how you would write equivalent code in Python instead of Scala:

Listing 15. Aggregating with Python

flowersandpickers = sc.parallelize([11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16], 4).aggregate(
    (0, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
avg = flowersandpickers[0] / float(flowersandpickers[1])
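The requirement that the functions be commutative and associative is the reason for carrying a (sum, count) pair instead of averaging directly. The Spark-free sketch below illustrates this, using the quadrant values from the text; the aggregate helper and the lopsided two-partition split are assumptions made for the illustration.

```python
from functools import reduce

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model: fold each partition, then combine the partials."""
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partials, zero_value)

def seq_op(acc, value):
    # Add one picker's flowers to the sum and count that picker.
    return (acc[0] + value, acc[1] + 1)

def comb_op(acc1, acc2):
    # Merge two (sum, count) pairs.
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

flowers = [11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16]
even = [flowers[0:3], flowers[3:6], flowers[6:9], flowers[9:12]]
uneven = [flowers[0:9], flowers[9:12]]  # hypothetical lopsided split

# (sum, count) pairs combine to the same result however the data is split.
pair_even = aggregate(even, (0, 0), seq_op, comb_op)
pair_uneven = aggregate(uneven, (0, 0), seq_op, comb_op)
avg = pair_even[0] / float(pair_even[1])

# Averaging the per-partition averages depends on the split, so it
# would not be a safe combining function.
avg_of_avgs_uneven = sum(sum(p) / float(len(p)) for p in uneven) / len(uneven)
```

With the lopsided split, the average of averages comes out lower than the true average, because the three pickers in the small partition are given the same weight as the nine in the large one.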

In conclusion

A three-node Spark cluster runs about 14 times faster than MapReduce, and the DAG execution pattern ensures that RDDs are reusable for future iterations. While you could program your Spark solution in Java, Scala code is significantly more concise. The entire program, with accumulators and tuples, is just 10 lines of code.
