Open source Java projects: Apache Spark

High-performance big data analysis with Spark!

Big data adoption has been growing by leaps and bounds over the past few years, which has necessitated new technologies to analyze that data holistically. Individual big data solutions provide their own mechanisms for data analysis, but how do you analyze data that is contained in Hadoop, Splunk, files on a file system, a local database, and so forth?

The answer is that you need an abstraction that can pull data from all of these sources and analyze potentially petabytes of information very rapidly.

Spark is a computational engine that manages tasks across a collection of worker machines in what is called a computing cluster. It provides the necessary abstraction, integrates with a host of different data sources, and analyzes data very quickly. This installment in the Open source Java projects series reviews Spark, describes how to set up a local environment, and demonstrates how to use Spark to derive business value from your data.

Download: Source code for "Open source Java projects: Apache Spark," created by Steven Haines for JavaWorld.

Counting words with Spark

Let's begin by writing a simple word-counting application using Spark in Java. After this hands-on demonstration we'll explore Spark's architecture and how it works.

Similar to the standard "Hello, Hadoop" application, the "Hello, Spark" application will take a source text file and count the number of unique words that are in it. To start, create a new project using Maven with the following command:


mvn archetype:generate -DgroupId=com.geekcap.javaworld -DartifactId=spark-example
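
If you'd rather not answer Maven's interactive prompts, the same project can be generated non-interactively with the standard quickstart archetype (these flags are ordinary Maven options, not anything specific to this example):


mvn archetype:generate -DgroupId=com.geekcap.javaworld -DartifactId=spark-example -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false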

Next, modify your pom.xml file to include the following Spark dependency:


<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.4.0</version>
</dependency>		

Listing 1 shows the complete contents of my pom.xml file.

Listing 1. pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.geekcap.javaworld</groupId>
    <artifactId>spark-example</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>spark-example</name>
    <url>http://maven.apache.org</url>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.geekcap.javaworld.sparkexample.WordCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>

        <!-- Import Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>	

Note the three plugins I added to the build directive:

  • Setting the compilation level to Java 8 enables lambda support.
  • Defining a main class that references the WordCount class instructs Maven to create an executable JAR for WordCount.
  • Having Maven copy all dependencies to the target/lib directory enables the executable JAR file to run.
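
Together, these plugins produce a runnable JAR with its dependencies copied alongside it. Once the WordCount class from Listing 2 (below) is in place, building and running the example should look roughly like this -- the JAR name follows from the pom's artifactId and version, and sample.txt is a placeholder for your own input file:


mvn clean install
java -jar target/spark-example-1.0-SNAPSHOT.jar sample.txt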

With that out of the way, Listing 2 shows the source code for the WordCount application. Note that it shows how to write the Spark code in both Java 7 and Java 8. I'll discuss highlights of both below.

Listing 2. WordCount.java


package com.geekcap.javaworld.sparkexample;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Sample Spark application that counts the words in a text file
 */
public class WordCount
{

    public static void wordCountJava7( String filename )
    {
        // Define a configuration to use to interact with Spark
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Word Count App");

        // Create a Java version of the Spark Context from the configuration
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the input data, which is a text file read from the command line
        JavaRDD<String> input = sc.textFile( filename );

        // Java 7 and earlier
        JavaRDD<String> words = input.flatMap(
                new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String s) {
                        return Arrays.asList(s.split(" "));
                    }
                } );

        // Java 7 and earlier: transform the collection of words into pairs (word and 1)
        JavaPairRDD<String, Integer> counts = words.mapToPair(
            new PairFunction<String, String, Integer>(){
                public Tuple2<String, Integer> call(String s){
                    return new Tuple2<String, Integer>(s, 1);
                }
            } );

        // Java 7 and earlier: count the words
        JavaPairRDD<String, Integer> reducedCounts = counts.reduceByKey(
            new Function2<Integer, Integer, Integer>(){
                public Integer call(Integer x, Integer y){ return x + y; }
            } );

        // Save the word count back out to a text file, causing evaluation.
        reducedCounts.saveAsTextFile( "output" );
    }

    public static void wordCountJava8( String filename )
    {
        // Define a configuration to use to interact with Spark
        SparkConf conf = new SparkConf().setMaster("local").setAppName("Word Count App");

        // Create a Java version of the Spark Context from the configuration
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Load the input data, which is a text file read from the command line
        JavaRDD<String> input = sc.textFile( filename );

        // Java 8 with lambdas: split the input string into words
        JavaRDD<String> words = input.flatMap( s -> Arrays.asList( s.split( " " ) ) );

        // Java 8 with lambdas: transform the collection of words into pairs (word and 1) and then count them
        JavaPairRDD<String, Integer> counts = words.mapToPair( t -> new Tuple2<String, Integer>( t, 1 ) ).reduceByKey( (x, y) -> x + y );

        // Save the word count back out to a text file, causing evaluation.
        counts.saveAsTextFile( "output" );
    }

    public static void main( String[] args )
    {
        if( args.length == 0 )
        {
            System.out.println( "Usage: WordCount <file>" );
            System.exit( 0 );
        }

        wordCountJava8( args[ 0 ] );
    }
}	

The WordCount application's main method accepts the source text file name from the command line and then invokes the wordCountJava8() method. The class defines two helper methods -- wordCountJava7() and wordCountJava8() -- that perform the same function (counting words), first in Java 7's notation and then in Java 8's.

WordCount in Java 7

The wordCountJava7() method is more explicit, so we'll start there. We first create a SparkConf object that points to our Spark instance, which in this case is "local." This means that we're going to be running Spark locally in our Java process space. In order to start interacting with Spark, we need a SparkContext instance, so we create a new JavaSparkContext that is configured to use our SparkConf. Now we have four steps:

  1. Load our input data.
  2. Parse our input into words.
  3. Reduce our words into a tuple pair that contains the word and the count of occurrences.
  4. Save our results.
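
Before stepping through these, note that setMaster("local") is what keeps everything inside a single JVM. For illustration only, here are a few other master URLs Spark understands (the cluster address below is a placeholder, not part of this example):


SparkConf conf = new SparkConf()
        .setMaster("local")                 // run Spark in-process with a single worker thread
        // .setMaster("local[4]")           // run in-process with four worker threads
        // .setMaster("spark://host:7077")  // connect to a standalone Spark cluster (placeholder address)
        .setAppName("Word Count App");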

The first step is to leverage the JavaSparkContext's textFile() method to load our input from the specified file. This method reads the file from either the local file system or from a Hadoop Distributed File System (HDFS) and returns a resilient distributed dataset (RDD) of Strings. An RDD is Spark's core data abstraction and represents a distributed collection of elements. You'll find that we perform operations on RDDs, in the form of Spark transformations, and ultimately we leverage Spark actions to translate an RDD into our desired result set.
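
This transformation/action split matters because transformations are lazy: Spark records the chain of operations but does no work until an action forces evaluation. Here is a minimal sketch of that behavior (the file name and filter condition are hypothetical):


JavaRDD<String> lines = sc.textFile( "input.txt" );                // transformation: nothing is read yet
JavaRDD<String> longLines = lines.filter( s -> s.length() > 80 );  // transformation: still lazy
long howMany = longLines.count();                                  // action: triggers the actual computation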

In this case, the transformation we want to first apply to the RDD is the flat map transformation. Transformations come in many flavors, but the most common are as follows:

  • map() applies a function to each element in the RDD and returns an RDD of the result.
  • flatMap() is similar to map() in that it applies a function to each element in the RDD, but rather than returning a single element per input it returns an iterator of values; Spark then flattens all of those iterators into one large result.
  • filter() returns an RDD that contains only those elements that match the specified filter criteria.
  • distinct() returns an RDD with only distinct or unique elements -- it removes any duplicates.
  • union() returns a new RDD containing the union (set operation) of the RDD it is called on and another RDD. The result contains all elements from both RDDs.
  • intersection() returns an RDD that contains the intersection between two RDDs. The result contains only those elements that are in both RDDs.
  • subtract() removes the elements that are in one RDD from another RDD.
  • cartesian() computes the cartesian product between two RDDs; note that this transformation should be used very cautiously because the result could consume a lot of memory!
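
To make a few of these concrete, here is a small illustrative sketch -- it is not part of the WordCount example, it assumes the JavaSparkContext sc from Listing 2, and the RDD contents are made up:


JavaRDD<String> pets = sc.parallelize( Arrays.asList( "dog", "cat", "dog", "fish" ) );
JavaRDD<String> wild = sc.parallelize( Arrays.asList( "fox", "cat" ) );

JavaRDD<Integer> lengths  = pets.map( s -> s.length() );            // 3, 3, 3, 4
JavaRDD<String>  noCats   = pets.filter( s -> !s.equals( "cat" ) ); // dog, dog, fish
JavaRDD<String>  unique   = pets.distinct();                        // dog, cat, fish
JavaRDD<String>  combined = pets.union( wild );                     // all elements from both RDDs
JavaRDD<String>  common   = pets.intersection( wild );              // cat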

The flatMap() transformation in Listing 2 returns an RDD that contains one element for each word, split by a space character. The flatMap() method expects a function that accepts a String and returns an Iterable interface to a collection of Strings.
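
For example, a (hypothetical) input line of "Hello Spark Hello Java" is flattened into four separate elements -- "Hello", "Spark", "Hello", and "Java" -- with duplicates preserved; nothing is counted or de-duplicated at this stage.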

WordCount in Java 8

In the Java 7 example, we create an anonymous inner class of type FlatMapFunction and override its call() method. The call() method is passed the input String and returns an Iterable reference to the results. The Java 7 example leverages the Arrays class's asList() method to create an Iterable interface to the String[], returned by the String's split() method. In the Java 8 example we use a lambda expression to create the same function without creating the anonymous inner class:


s -> Arrays.asList( s.split( " " ) )

Given an input s, this function splits s into words separated by spaces and wraps the resulting String[] in an Iterable collection by calling Arrays.asList(). You can see that it is exactly the same implementation, just much more succinct.

Spark transformations

At this point we have an RDD that contains all of our words. Our next step is to transform the words RDD into a pair RDD that maps each word to a count of 1, and then use reduceByKey() to sum those counts for each distinct word. The mapToPair() method iterates over every element in the RDD and executes a PairFunction on the element. The PairFunction implements a call() method that accepts an input String (the word from the previous step) and returns a Tuple2 instance.
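
To see both steps on a concrete (made-up) line of input, "to be or not to be" becomes the pairs (to,1), (be,1), (or,1), (not,1), (to,1), (be,1) after mapToPair(), and reduceByKey() then sums the values for each key to produce (to,2), (be,2), (or,1), (not,1).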
