The reduceByKey() method iterates over the JavaPairRDD, finds all distinct keys in the tuples, and uses the provided call() method to combine all of the values associated with each key. Stated another way, it finds all instances of the same word (such as apple) and then passes the counts (each of the 1 values), two at a time, to the supplied function, which combines them into a single count of the word's occurrences. Our call() function simply adds the two counts and returns the result.
Note that this is similar to how we would implement a word count in a Hadoop MapReduce application: map each word in the text file to a count of 1 and then reduce the results by adding all of the counts for each unique key.
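To make these semantics concrete without a cluster, the following plain-Java sketch mimics the two steps on a single machine: it maps each word to a (word, 1) pair and then combines the values for each key two at a time with a reducer function, as our call() method does. It is an illustration only, with hypothetical class and method names; Map.merge() stands in for the work Spark actually performs by partitioning and shuffling the pairs across the cluster.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java illustration of reduceByKey() semantics; this is not Spark code.
class ReduceByKeySketch {

    // "Map" step: turn every word into a (word, 1) pair, like mapToPair().
    static List<Map.Entry<String, Integer>> toPairs(String text) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : text.split(" ")) {
            pairs.add(new SimpleEntry<>(word, 1));
        }
        return pairs;
    }

    // "Reduce" step: for each distinct key, combine its values two at a time
    // with the supplied function, the way reduceByKey() invokes call(x, y).
    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> pairs, BinaryOperator<V> reducer) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            // merge() stores the value for a new key, or replaces it with reducer(old, new)
            result.merge(pair.getKey(), pair.getValue(), reducer);
        }
        return result;
    }

    public static void main(String[] args) {
        String text = "This is a test This is a test The test is not over Okay, now the test is over";
        Map<String, Integer> counts = reduceByKey(toPairs(text), (x, y) -> x + y);
        System.out.println(counts);
    }
}
```

Running main() prints the same ten word counts as the Spark version, listed in first-seen order.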
Iterating the RDD with Java 8
Once again, the Java 8 example performs all of these same functions, but does it in a single, succinct line of code:
JavaPairRDD<String, Integer> counts = words.mapToPair( t -> new Tuple2<>( t, 1 ) ).reduceByKey( (x, y) -> x + y );
The mapToPair() method is passed a function. Given a t, which in our case is a word, the function returns a Tuple2 that maps the word to the number 1. We then chain the reduceByKey() method and pass it a function that reads: given inputs x and y, return their sum. Because the Tuple2 is created with the diamond operator (<>), the compiler infers that its values are Integers, so x and y can be added directly; with a raw Tuple2, we would need to cast the inputs to int before performing the addition.
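For comparison, the same map-then-reduce idea fits in one expression with the standard Java 8 Streams API. This is only a single-JVM analogy, not Spark: Collectors.toMap() with a merge function plays the role of reduceByKey(), and Integer::sum takes the place of the lambda passed to reduceByKey(). The class name is hypothetical.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Single-JVM analogue of words.mapToPair(...).reduceByKey(...); not Spark code.
class StreamWordCount {

    static Map<String, Integer> count(String text) {
        return Arrays.stream(text.split(" "))
                // key = the word itself, value = 1 (like mapping each word to a Tuple2 of (word, 1))
                // merge function = Integer::sum (like reduceByKey adding two counts)
                .collect(Collectors.toMap(Function.identity(), word -> 1, Integer::sum));
    }

    public static void main(String[] args) {
        System.out.println(count("to be or not to be"));
    }
}
```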
Transformations for key/value pairs
When writing Spark applications, you will frequently find yourself working with pairs of elements, so Spark provides a set of common transformations that apply specifically to key/value pairs:
- reduceByKey(function): combines values with the same key using the provided function.
- groupByKey(): maps each unique key to the collection of values assigned to that key.
- combineByKey(): combines values with the same key, but can return a result type that differs from the type of the input values.
- mapValues(function): applies a function to each of the values in the RDD without changing the keys.
- flatMapValues(function): applies a function to all values, but in this case the function can generate zero or more new values for each input value. Spark then creates a new pair for each generated value, mapping the original key to it.
- keys(): returns an RDD that contains only the keys from the pairs.
- values(): returns an RDD that contains only the values from the pairs.
- sortByKey(): sorts the RDD by key.
- subtractByKey(another RDD): removes all elements from the RDD whose key is present in the other RDD.
- join(another RDD): performs an inner join between the two RDDs; the result contains only keys present in both RDDs, with one pair for every combination of a value from this RDD and a matching value from the other RDD.
- rightOuterJoin(another RDD): performs a right outer join between the two RDDs; every key in the other RDD appears in the result, even if it has no match in this RDD.
- leftOuterJoin(another RDD): performs a left outer join between the two RDDs; every key in this (original) RDD appears in the result, even if it has no match in the other RDD.
- cogroup(another RDD): groups data from both RDDs that have the same key.
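To illustrate how join() and leftOuterJoin() treat keys, here is a small plain-Java sketch over lists of key/value pairs. It is not Spark's implementation (Spark partitions and shuffles both RDDs by key), and the class name, the pair() helper, and the printed row format are illustrative assumptions. It shows the two properties described above: an inner join emits one result per combination of matching values, and a left outer join keeps every key from the left side, pairing an unmatched key with an empty value (Spark's Java API likewise wraps the possibly missing side in an Optional).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java illustration of inner vs. left outer join semantics; not Spark's implementation.
class JoinSketch {

    // Hypothetical convenience factory for a key/value pair.
    static <K, V> Map.Entry<K, V> pair(K key, V value) {
        return new SimpleEntry<>(key, value);
    }

    // Formats a result the way Spark prints nested Tuple2s: (key,(left,right))
    private static String row(Object key, Object left, Object right) {
        return "(" + key + ",(" + left + "," + right + "))";
    }

    // Inner join: one output row per combination of matching left and right values.
    static <K, V, W> List<String> join(List<Map.Entry<K, V>> left, List<Map.Entry<K, W>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<K, V> l : left) {
            for (Map.Entry<K, W> r : right) {
                if (l.getKey().equals(r.getKey())) {
                    out.add(row(l.getKey(), l.getValue(), r.getValue()));
                }
            }
        }
        return out;
    }

    // Left outer join: every left key appears; an unmatched key gets an empty Optional.
    static <K, V, W> List<String> leftOuterJoin(List<Map.Entry<K, V>> left, List<Map.Entry<K, W>> right) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<K, V> l : left) {
            boolean matched = false;
            for (Map.Entry<K, W> r : right) {
                if (l.getKey().equals(r.getKey())) {
                    matched = true;
                    out.add(row(l.getKey(), l.getValue(), Optional.of(r.getValue())));
                }
            }
            if (!matched) {
                out.add(row(l.getKey(), l.getValue(), Optional.empty()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> left = Arrays.asList(pair("apple", 1), pair("apple", 2), pair("pear", 3));
        List<Map.Entry<String, String>> right = Arrays.asList(pair("apple", "red"), pair("apple", "green"), pair("plum", "purple"));
        System.out.println(join(left, right));          // four apple rows; pear and plum are dropped
        System.out.println(leftOuterJoin(left, right)); // the four apple rows plus (pear,(3,Optional.empty))
    }
}
```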
After we have transformed our RDD pairs, we invoke the saveAsTextFile() action method on the JavaPairRDD to create a directory called output and store the results in files in that directory.
With our data compiled into the format that we want, our final step is to build and execute the application, then use Spark actions to derive some result sets.
Actions in Spark
Build the WordCount application with the following command:
mvn clean install
Execute it from the target directory with the following command:
java -jar spark-example-1.0-SNAPSHOT.jar YOUR_TEXT_FILE
Let's create a short text file to test that it works. Enter the following text into a file called test.txt:
This is a test This is a test The test is not over Okay, now the test is over
With test.txt in the target folder, execute the following command:
java -jar spark-example-1.0-SNAPSHOT.jar test.txt
Spark will create an output folder with a new file called part-00000. If you look at this file, you should see the following output:
(not,1) (The,1) (is,4) (a,2) (This,2) (over,2) (Okay,,1) (now,1) (the,1) (test,4)
The output contains every word and the number of times it occurs. If you want to refine the output, you might want to convert all words to lower case (note that "The" and "the" are counted separately), but I'll leave that as an exercise for you.
Finally, if you really want to take Spark for a spin, check out Project Gutenberg and download the full text of a large book. (I parsed Homer's Odyssey to test this application.)
Common actions in Spark
The saveAsTextFile() method is a Spark action. Earlier we saw transformations, which transform an RDD into another RDD; actions, by contrast, generate our actual results. In addition to the various save actions, the following are the most common actions:
- collect(): returns all elements in the RDD to the driver program.
- count(): returns the number of elements in the RDD.
- countByValue(): returns a map from each distinct value in the RDD to the number of times that value occurs.
- take(count): returns the requested number of elements from the RDD.
- top(count): returns the top "count" elements from the RDD, according to their natural ordering or a supplied comparator.
- takeOrdered(count)(ordering): returns the specified number of elements ordered by the specified ordering function.
- takeSample(): returns a random sample of the requested number of elements from the RDD.
- reduce(): executes the provided function to combine the elements into a single result.
- fold(): is similar to reduce(), but also takes a "zero value" that serves as the starting value.
- aggregate(): is similar to fold() (it also accepts a zero value), but can return a different type than that of the source RDD.
- foreach(): executes the provided function on each element in the RDD, which is useful for things like writing to a database or publishing to a web service.
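The distinction between reduce(), fold(), and aggregate() can be tried out locally, because the three overloads of java.util.stream.Stream.reduce() follow the same pattern. The sketch below uses them as single-JVM stand-ins (it is not Spark code, and the class and method names are hypothetical): the one-argument reduce() has no zero value and so returns an Optional; the fold-style version starts from a zero value, which should be an identity for the operation (0 for addition) because Spark applies the zero value once per partition; and the aggregate-style version accumulates into a different type, here a {sum, count} array used to compute an average, with a combiner that merges partial results much as Spark merges per-partition results. A countByValue()-style map of values to occurrence counts is included as well.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

// Single-JVM stand-ins for Spark's reduce(), fold(), aggregate(), and countByValue(); not Spark code.
class ActionsSketch {

    // Like reduce(): combine elements pairwise; with no zero value, empty input yields Optional.empty()
    static Optional<Integer> reduceSum(List<Integer> xs) {
        return xs.stream().reduce(Integer::sum);
    }

    // Like fold(): same combining function, but starting from a zero value (the identity for +)
    static int foldSum(List<Integer> xs) {
        return xs.stream().reduce(0, Integer::sum);
    }

    // Like aggregate(): the accumulator type (long[] {sum, count}) differs from the element type
    static double average(List<Integer> xs) {
        long[] sumAndCount = xs.stream().reduce(
                new long[] {0L, 0L},                                   // zero value
                (acc, x) -> new long[] {acc[0] + x, acc[1] + 1},       // fold one element into an accumulator
                (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]});      // merge two partial results
        return sumAndCount[1] == 0 ? 0.0 : (double) sumAndCount[0] / sumAndCount[1];
    }

    // Like countByValue(): map each distinct value to the number of times it occurs
    static Map<Integer, Long> countByValue(List<Integer> xs) {
        return xs.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Integer> xs = Arrays.asList(3, 1, 4, 1, 5);
        System.out.println(reduceSum(xs));    // Optional[14]
        System.out.println(foldSum(xs));      // 14
        System.out.println(average(xs));      // 2.8
        System.out.println(countByValue(xs)); // occurrence count per distinct value (1 occurs twice)
    }
}
```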
The actions listed are some of the most common that you'll use, but other actions exist, including some designed to operate on specific types of RDDs. For example, mean() and variance() operate on RDDs of numbers, and countByKey() operates on RDDs of key/value pairs.
Summary: Data analysis with Spark
The steps for analyzing data with Spark can be broadly summarized as follows:
- Obtain a reference to an RDD.
- Perform transformations to convert the RDD to the form you want to analyze.
- Execute actions to derive your result set.
An important point is that although you may specify transformations, Spark does not actually execute them until you invoke an action. This allows Spark to optimize the transformations and reduce the amount of redundant work it needs to do. Another important point is that, by default, Spark recomputes an RDD's transformations each time you execute an action on it. If you know that you're going to execute multiple actions, you can persist the RDD before executing the first action by invoking the persist() method; just be sure to release it by invoking unpersist() when you're done.
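The cost of recomputation, and the effect of persisting, can be seen with a small single-JVM analogy. In the sketch below, which does not use Spark, a java.util.function.Supplier stands in for an RDD's lineage, a counter records how often the transformation actually runs, and a hypothetical cached() helper plays the role of persist() by memoizing the first result. Nothing executes until an "action" calls get(), mirroring Spark's lazy evaluation; the sketch does not model unpersist() or Spark's storage levels.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Single-JVM analogy for lazy transformations and persist(); this is not the Spark API.
class LazyPersistSketch {

    // Counts how many times the "transformation" actually executes.
    static final AtomicInteger transformRuns = new AtomicInteger();

    // Describes a transformation without running it, loosely like building an RDD lineage.
    static Supplier<List<String>> upperCased(List<String> source) {
        return () -> {
            transformRuns.incrementAndGet();
            return source.stream().map(String::toUpperCase).collect(Collectors.toList());
        };
    }

    // Hypothetical helper that memoizes the first result, loosely like persist(); not thread-safe.
    static <T> Supplier<T> cached(Supplier<T> lineage) {
        return new Supplier<T>() {
            private T value;
            private boolean computed;

            @Override
            public T get() {
                if (!computed) {
                    value = lineage.get();
                    computed = true;
                }
                return value;
            }
        };
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "pear", "apple");

        Supplier<List<String>> plain = upperCased(words);
        System.out.println(transformRuns.get());                              // 0: nothing has run yet
        int size = plain.get().size();                                        // first "action"
        boolean hasPear = plain.get().contains("PEAR");                       // second "action" recomputes
        System.out.println(size + " " + hasPear + " " + transformRuns.get()); // 3 true 2

        transformRuns.set(0);
        Supplier<List<String>> persisted = cached(upperCased(words));
        persisted.get();
        persisted.get();
        System.out.println(transformRuns.get());                              // 1: computed once, then reused
    }
}
```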
Spark in a distributed environment
Now that you've seen an overview of the programming model for Spark, let's briefly review how Spark works in a distributed environment. Figure 1 shows the distributed model for executing Spark analysis.
Spark consists of two main components:
- Spark Driver
- Executors
The Spark Driver is the process that contains your main() method and defines what the Spark application should do. This includes creating RDDs, transforming RDDs, and applying actions. Under the hood, when the Spark Driver runs, it performs two key activities:
- Converts your program into tasks: Your application will contain transformations and actions, and it's the Spark Driver's responsibility to convert those into executable tasks that can be distributed across the cluster to executors. Additionally, the Spark Driver pipelines your transformations together to reduce the number of separate passes over the data, and builds an execution plan. That execution plan defines how tasks will be executed; the tasks themselves are bundled up and sent to executors.
- Schedules tasks for executors: From the execution plan, the Spark Driver coordinates the scheduling of each task execution. As executors start, they register themselves with the Spark Driver, which gives the driver insight into all available executors. Because tasks execute against data, the Spark Driver tries to send each task to an executor running on a machine that already holds the relevant data, and then receives the results.
Spark Executors are processes running on distributed machines that execute Spark tasks. Executors start when the application starts and typically run for the duration of the application. They serve two key roles:
- Execute tasks sent to them by the driver and return the results.
- Maintain memory storage for hosting and caching RDDs.
The cluster manager is the glue that wires together drivers and executors. Spark supports different cluster managers, including Hadoop YARN and Apache Mesos. The cluster manager is the component that deploys and launches executors when the driver starts. You select the cluster manager through the master URL in your SparkContext configuration (for example, with SparkConf.setMaster()) or with the --master option of spark-submit.
The following steps summarize the execution model for distributed Spark applications:
- Submit a Spark application using the spark-submit command, which launches your driver program's main() method.
- The driver program will connect to the cluster manager and request resources on which to launch the executors.
- The cluster manager deploys the executor code and starts their processes on those resources.
- The driver runs and sends tasks to the executors.
- The executors run the tasks and send the results back to the driver.
- The driver completes its execution, stops the Spark context, and the resources managed by the cluster manager are released.
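The division of labor in these steps can be illustrated on a single machine with java.util.concurrent. In the sketch below, which is an analogy rather than Spark code, the calling thread plays the driver: it splits the input into partitions, submits one task per partition to a fixed thread pool standing in for the executors, and merges the partial results the tasks return. The class and method names are hypothetical, and real executors are separate JVM processes, usually on other machines, that a cluster manager launches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Single-JVM analogy of a driver scheduling tasks on executors; this is not Spark code.
class DriverExecutorSketch {

    // A "task": count the words in one partition and return a partial result.
    static Map<String, Integer> countPartition(List<String> partition) {
        Map<String, Integer> partial = new HashMap<>();
        for (String word : partition) {
            partial.merge(word, 1, Integer::sum);
        }
        return partial;
    }

    // The "driver": split the data, submit one task per partition, then merge the partial results.
    // numPartitions must be at least 1.
    static Map<String, Integer> run(List<String> words, int numPartitions) {
        // A thread pool stands in for executor processes launched by a cluster manager.
        ExecutorService executors = Executors.newFixedThreadPool(numPartitions);
        try {
            List<Future<Map<String, Integer>>> futures = new ArrayList<>();
            int size = (words.size() + numPartitions - 1) / numPartitions; // ceiling division
            for (int start = 0; start < words.size(); start += size) {
                List<String> partition = words.subList(start, Math.min(start + size, words.size()));
                futures.add(executors.submit(() -> countPartition(partition)));
            }
            Map<String, Integer> totals = new HashMap<>();
            for (Future<Map<String, Integer>> future : futures) {
                try {
                    // get() blocks until the task's result is sent back to the driver
                    future.get().forEach((word, n) -> totals.merge(word, n, Integer::sum));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
            return totals;
        } finally {
            executors.shutdown(); // release the pool, much as stopping the context frees cluster resources
        }
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to be or not to be".split(" "));
        System.out.println(run(words, 2));
    }
}
```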
Applying Spark to different technologies
We've reviewed the Spark programming model and seen how Spark applications are distributed across a Spark cluster. We'll conclude with a quick look at how Spark can be used to analyze different data sources. Figure 2 shows the logical components that make up the Spark stack.
At the center of the Spark stack is the Spark Core. The Spark Core contains all the basic functionality of Spark, including task scheduling, memory management, fault recovery, integration with storage systems, and so forth. It also defines the resilient distributed datasets (RDDs) that we saw in our WordCount example and the APIs to interact with RDDs, including all of the transformations and actions we explored in the previous section.
Four libraries built on top of the Spark Core extend it to other kinds of data and analysis:
- Spark SQL: Allows for querying data via SQL as well as the Hive Query Language (HQL), if you are running Hive on top of Hadoop. The important thing about Spark SQL is that it allows you to intermix SQL queries with the programmatic RDD operations and actions that we saw in the WordCount example.
- Spark Streaming: Allows you to process live streaming data, such as data that is generated by an application's log file or live feeds from a message queue. Spark Streaming provides an API that is very similar to the core RDD operations and actions.
- MLlib: Provides machine learning functionality and algorithms, including classification, regression, clustering, and collaborative filtering.
- GraphX: Extends the Spark Core RDD API to add support for manipulating graph data, such as might be found in a social network.
So where do you go from here? As we have already seen, you can load data from a local file system, and you can also load it from distributed storage such as HDFS and Amazon S3. Spark provides capabilities to read plain text, JSON, sequence files, protocol buffers, and more. Additionally, Spark allows you to read structured data through Spark SQL, and it allows you to read key/value data from data sources such as Cassandra, HBase, and Elasticsearch. In all of these cases the process is the same:
- Create one or more RDDs that reference your data sources (file, SQL database, HDFS files, key/value store, etc.).
- Transform those RDDs into the format that you want, including operating on two RDDs together.
- If you are going to execute multiple actions, persist your RDDs.
- Execute actions to derive your business value.
- If you persisted your RDDs, be sure to unpersist them when you're finished.
As more organizations, large and small, realize the benefits of big data, we are seeing an increase in solutions addressing specific problem domains for big data analysis. One of the challenges for big data is how to analyze collections of data distributed across different technology stacks. Apache Spark provides a computational engine that can pull data from multiple sources and analyze it using the common abstraction of resilient distributed datasets, or RDDs. Its core operation types include transformations to massage your data and convert it from its source format to the form you want to analyze, and actions that derive your business value.
In this article we've built a small, locally run Spark application whose purpose is to count words. You've practiced several different transformations and one save action, and had an overview of Spark's programming and execution models. I've also discussed how Spark runs distributed across multiple machines, using drivers and executors coordinated by a cluster manager such as Hadoop YARN or Apache Mesos. Finally, I discussed extensions built on top of Spark for querying data with SQL, processing streaming data, and performing machine learning and graph processing.
We've only scratched the surface of what is possible with Spark, but I hope that this article has both inspired you and equipped you with the fundamentals to start analyzing Internet-scale collections of data using Spark.