Getting Started

Getting Started will guide you through the process of creating a simple Crunch pipeline to count the words in a text document, which is the Hello World of distributed computing. Along the way, we'll explain the core Crunch concepts and how to use them to create effective and efficient data pipelines.

Overview

The Apache Crunch project develops and supports Java APIs that simplify the process of creating data pipelines on top of Apache Hadoop. The Crunch APIs are modeled after FlumeJava, the library that Google uses for building data pipelines on top of its own implementation of MapReduce.

One of the most common questions we hear is how Crunch compares to other projects that provide abstractions on top of MapReduce, such as Apache Pig, Apache Hive, and Cascading. There are three main points of comparison:

  1. Developer focused. Apache Hive and Apache Pig were built to make MapReduce accessible to data analysts with limited experience in Java programming. Crunch was designed for developers who understand Java and want to use MapReduce effectively in order to write fast, reliable applications that need to meet tight SLAs. Crunch is often used in conjunction with Hive and Pig; for example, a Crunch pipeline written by the development team sessionizes a set of user logs that are then processed by a diverse collection of Pig scripts and Hive queries written by analysts.
  2. Minimal abstractions. Crunch pipelines provide a thin veneer on top of MapReduce. Developers have access to low-level MapReduce APIs whenever they need them. This minimalism also means that Crunch is extremely fast, only slightly slower than a hand-tuned pipeline developed with the MapReduce APIs, and the community is working on making it faster all the time. That said, one of the goals of the project is portability, and the abstractions that Crunch provides are designed to ease the transition from Hadoop 1.0 to Hadoop 2.0 and to provide transparent support for future data processing frameworks that run on Hadoop, including Apache Spark and Apache Tez.
  3. Flexible Data Model. Hive, Pig, and Cascading all use a tuple-centric data model that works best when your input data can be represented using a named collection of scalar values, much like the rows of a database table. Crunch allows developers considerable flexibility in how they represent their data, which makes Crunch the best pipeline platform for developers working with complex structures like Apache Avro records or protocol buffers, geospatial and time series data, and data stored in Apache HBase tables.

Which Version of Crunch Do I Need?

The core libraries are primarily developed against Hadoop 1.1.2, and are also tested against Hadoop 2.2.0. They should work with any version of Hadoop 1.x after 1.0.3 and any version of Hadoop 2.x after 2.0.0-alpha, although you should note that some of Hadoop 2.x's dependencies changed between 2.0.4-alpha and 2.2.0 (for example, the protocol buffer library switched from 2.4.1 to 2.5.0). Crunch is also known to work with distributions from vendors like Cloudera, Hortonworks, and IBM. The Crunch libraries are not compatible with versions of Hadoop prior to 1.x, such as 0.20.2.

If you're using the crunch-hbase library, please note that Crunch 0.9.0 switched to using HBase 0.96.0, while all prior versions of crunch-hbase were developed against HBase 0.94.3.

Here are all of the currently recommended Crunch versions in one convenient table:

Hadoop Versions | HBase Versions | Recommended Crunch Version
1.x             | 0.94.x         | 0.8.4
2.x             | 0.94.x         | 0.8.4-hadoop2
1.x             | 0.96.x         | 0.11.0
2.x             | 0.96.x         | 0.11.0-hadoop2

Maven Dependencies

The Crunch project provides Maven artifacts on Maven Central of the form:

  <dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>${crunch.version}</version>
  </dependency>

The crunch-core artifact contains the core libraries for planning and executing MapReduce pipelines. Depending on your use case, you may also find the following artifacts useful:

  • crunch-test: Helper classes for integration testing of Crunch pipelines
  • crunch-hbase: Utilities for pipelines that read/write data to Apache HBase
  • crunch-scrunch: Scrunch, a Scala API for Crunch
  • crunch-spark: Executes Crunch pipelines using Apache Spark
  • crunch-contrib: Extra Crunch libraries for text processing, JDBC connections, and Bloom filters
  • crunch-examples: Example MapReduce and HBase pipelines
  • crunch-archetype: A Maven archetype for creating new Crunch pipeline projects

Building From Source

You can download the most recently released Crunch libraries from the Download page or from the Maven Central Repository.

If you prefer, you can also build the Crunch libraries from the source code using Maven and install it in your local repository:

mvn clean install

This also runs the integration test suite, which takes a while to complete; you can skip it by running with the -DskipTests option. If you are planning to run Crunch against Hadoop 2.x, you should also specify -Dcrunch.platform=2.

After you have built Crunch, you can run the bundled example applications such as WordCount:

hadoop jar crunch-examples/target/crunch-examples-*-job.jar org.apache.crunch.examples.WordCount <inputfile> <outputdir>

There are three additional examples in the org.apache.crunch.examples package: AverageBytesByIP, TotalBytesByIP, and WordAggregationHBase. AverageBytesByIP and TotalBytesByIP take as input a file in the Common Log Format (an example is provided in crunch-examples/src/main/resources/access_logs.tar.gz). The WordAggregationHBase example requires an Apache HBase cluster to be available, but creates tables and loads sample data as part of its run.

Your First Crunch Pipeline

There are a couple of ways to get started with Crunch. If you use Git, you can clone this project, which contains an example Crunch pipeline:

git clone http://github.com/jwills/crunch-demo

You can also use the following Maven archetype, which will generate the same code as the example and allow you to choose a different version of Crunch. Enter the following command and answer the questions as shown below:

$ mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype
[...]
1: remote -> org.apache.crunch:crunch-archetype (Create a basic, self-contained job with the core library.)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): : 1
[CHOOSE THE VERSION OF CRUNCH YOU WANT TO USE]
Define value for property 'groupId': : com.example
Define value for property 'artifactId': : crunch-demo
Define value for property 'version':  1.0-SNAPSHOT: : [HIT ENTER]
Define value for property 'package':  com.example: : [HIT ENTER]
Confirm properties configuration:
groupId: com.example
artifactId: crunch-demo
version: 1.0-SNAPSHOT
package: com.example
 Y: : [HIT ENTER]
[...]
$

The example Maven project contains an example application that counts word frequencies in text files:

$ cd crunch-demo
$ tree
.
|-- pom.xml
`-- src
    |-- main
    |   |-- assembly
    |   |   `-- hadoop-job.xml
    |   `-- java
    |       `-- com
    |           `-- example
    |               |-- StopWordFilter.java
    |               |-- Tokenizer.java
    |               `-- WordCount.java
    `-- test
        `-- java
            `-- com
                `-- example
                    |-- StopWordFilterTest.java
                    `-- TokenizerTest.java

The WordCount.java file contains the main class that defines a pipeline application which is referenced from pom.xml.

Build the code:

$ mvn package

Your packaged application is created in the target directory. The build process uses Maven's assembly plugin with some configuration in hadoop-job.xml to create a special JAR file (suffix -job.jar). Depending on your Hadoop configuration, you can run it locally or on a cluster using Hadoop's launcher script:

$ hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar <in> <out>

The <in> parameter references a text file or a directory containing text files, while <out> is the directory where the pipeline writes its final results.

The library also supports running applications from within an IDE, either as standalone Java applications or from unit tests. All required dependencies are on Maven's classpath so you can run the WordCount class directly without any additional setup.

Walking Through The WordCount Example

Let's walk through the run method of the WordCount example line by line and explain the data processing concepts we encounter.

Our WordCount application starts out with a main method that should be familiar to most MapReduce developers:

public class WordCount extends Configured implements Tool {
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordCount(), args);
  }

The WordCount class extends Configured and implements Tool, which allows us to use Hadoop's ToolRunner class to parse the standard commandline arguments for MapReduce jobs and make them available to the WordCount class via the getConf() method that is inherited from Configured. This is an easy way to allow us to override Hadoop configuration parameters for our job from the commandline, without having to update and recompile our application.
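For instance, a Hadoop property can be overridden for a single run without recompiling; a sketch of such an invocation (the jar path and property name here are illustrative) might look like:

```shell
$ hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar \
    -D mapred.reduce.tasks=10 <in> <out>
```

Note that ToolRunner's generic option parsing requires the -D flags to appear before the application's own arguments.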

The Crunch-specific bits are introduced in the run method, just after the commandline argument parsing is completed:

  String inputPath = args[0];
  String outputPath = args[1];

  // Create an object to coordinate pipeline creation and execution.
  Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

Every Crunch job begins with a Pipeline instance that manages the execution lifecycle of your data pipeline. As of the 0.9.0 release, there are three implementations of the Pipeline interface:

  1. MRPipeline: Executes a pipeline as a series of MapReduce jobs that can run locally or on a Hadoop cluster.
  2. MemPipeline: Executes a pipeline in-memory on the client.
  3. SparkPipeline: Executes a pipeline by running a series of Apache Spark jobs, either locally or on a Hadoop cluster.

The MemPipeline is most useful when you are initially developing and testing the logic of your pipeline on small, local data sets. The MRPipeline is the oldest and most robust of the Pipeline implementations for processing large amounts of data. The SparkPipeline is the newest implementation and leverages features of the underlying Spark engine that should allow it to run substantially faster than the MRPipeline, especially when your problem requires running many iterations over the same data. You can read more about the properties and configuration options of the different Pipeline implementations in this section of the user guide.
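As a quick sketch of the in-memory option (assuming the crunch-core jar is on the classpath; the class and method names here are our own, not part of the demo project), MemPipeline can build a collection from literal values and materialize the results immediately, which is handy in unit tests:

```java
import java.util.Map;
import org.apache.crunch.PTable;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class MemPipelineSketch {
  // Counts words entirely in memory; no cluster or HDFS involved.
  public static Map<String, Long> countWords(String... words) {
    PTable<String, Long> counts =
        MemPipeline.typedCollectionOf(Writables.strings(), words).count();
    // materializeToMap() pulls the results straight back to the client.
    return counts.materializeToMap();
  }

  public static void main(String[] args) {
    System.out.println(countWords("crunch", "rocks", "crunch"));
  }
}
```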

Once we've created our Pipeline instance, we need to identify the location and format of the data that our pipeline should process:

  // Reference a given text file as a collection of Strings.
  PCollection<String> lines = pipeline.readTextFile(inputPath);

A PCollection<T> is the core data abstraction of the Crunch API, representing a distributed, immutable collection of records of type T. PCollections are similar to Pig's relations, Hive's tables, or Cascading's Pipes. (The user guide has a table that illustrates how all of the various abstractions used by Crunch, Pig, Hive, and Cascading are related to each other.)

In this example, we create a PCollection of Strings from the input text file by using the readTextFile(String path) convenience method on the Pipeline interface, but we can create PCollections from any kind of Hadoop InputFormat. The Crunch API defines an interface, Source<T>, that wraps an InputFormat with any additional configuration information needed to read that InputFormat into a pipeline, such as the path(s) to read data from. A single Pipeline instance can read data from multiple Sources, which makes it convenient to use Crunch to join data from multiple sources together. You can read more about Sources, such as which ones are provided by the Crunch libraries and how to write your own, in this section of the user guide.
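To make the Source idea concrete, readTextFile is shorthand for the more general read(Source) call; a fragment like the following (the second path is illustrative) reads the same data through an explicit Source:

```java
// Equivalent to pipeline.readTextFile(inputPath), using an explicit
// Source from org.apache.crunch.io.From:
PCollection<String> lines = pipeline.read(From.textFile(inputPath));

// A second Source feeding the same Pipeline instance
// (the path here is illustrative):
PCollection<String> otherLines = pipeline.read(From.textFile("/data/other_logs"));
```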

After we've specified our input source, we indicate the processing that we want to perform on the records in the PCollection:

  // Define a function that splits each line in a PCollection of Strings into
  // a PCollection made up of the individual words in the file.
  // The second argument sets the serialization format.
  PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

The Tokenizer instance in this snippet is a subclass of Crunch's DoFn<S, T> class. DoFns are used by Crunch in the same way that MapReduce uses the Mapper or Reducer classes, but instead of overriding a map or reduce method, DoFns have an abstract void process(S input, Emitter<T> emitter) method that subclasses override to emit zero or more output records for each input record. The Tokenizer class is a simple example of a DoFn that parses a line of text and emits the individual word tokens:

public class Tokenizer extends DoFn<String, String> {
  private static final Splitter SPLITTER = Splitter.onPattern("\\s+").omitEmptyStrings();

  @Override
  public void process(String line, Emitter<String> emitter) {
    for (String word : SPLITTER.split(line)) {
      emitter.emit(word);
    }
  }
}

To apply a DoFn to a PCollection, we use the PCollection's parallelDo(DoFn<S, T> doFn, PType<T> ptype) method, which returns a new PCollection<T> instance (remember, PCollections are immutable). The PType<T> interface is a description of how to serialize the records in a PCollection, and is used by the Crunch runtime whenever it needs to checkpoint or shuffle the data in a PCollection. Crunch provides two different serialization frameworks with a number of convenience methods for defining PTypes; one is based on Hadoop's Writable interface, and the second is based on the Apache Avro serialization project. You can read more about data serialization for Crunch pipelines in this section of the user guide.
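To make the two serialization families concrete, here is the same parallelDo call expressed with each (a sketch; the Avro-based factory class is Avros, in org.apache.crunch.types.avro):

```java
// Writable-based serialization, as used in the walkthrough:
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

// The equivalent call using the Avro-based PType family:
PCollection<String> avroWords = lines.parallelDo(new Tokenizer(), Avros.strings());
```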

Although we can think of DoFns as analogous to Hadoop's Mapper or Reducer classes, Crunch can execute an individual DoFn in either the map or reduce phase of a MapReduce job, and we also have the option of executing multiple DoFns within a single phase. This allows us to break the logic of our pipeline into smaller DoFns that are easier to test and re-use than monolithic Mapper or Reducer classes. The Crunch APIs also provide several specializations of DoFns for common data processing tasks, such as filtering input records based on a boolean condition:

  // Take the collection of words and remove known stop words.
  PCollection<String> noStopWords = words.filter(new StopWordFilter());

This snippet references a StopWordFilter instance, which is a subclass of Crunch's FilterFn<S>. A FilterFn is a subclass of DoFn that implements DoFn's process method by referencing an abstract public boolean accept(S input) method. The StopWordFilter class implements the accept method by comparing the input word to a set of stop words:

public class StopWordFilter extends FilterFn<String> {
  // English stop words, borrowed from Lucene.
  private static final Set<String> STOP_WORDS = ImmutableSet.copyOf(new String[] {
      "a", "and", "are", "as", "at", "be", "but", "by",
      "for", "if", "in", "into", "is", "it",
      "no", "not", "of", "on", "or", "s", "such",
      "t", "that", "the", "their", "then", "there", "these",
      "they", "this", "to", "was", "will", "with"
  });

  @Override
  public boolean accept(String word) {
    return !STOP_WORDS.contains(word);
  }
}

The Crunch libraries have a number of specialized implementations of DoFn and associated methods for PCollection that can clarify the intent of the steps in your data processing pipelines. You can review these convenience classes in this section of the user guide.
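As one example of these specializations (a sketch of our own, not part of the crunch-demo project): MapFn<S, T> is a DoFn for the common case of exactly one output per input, so subclasses override a single map method instead of process:

```java
import org.apache.crunch.MapFn;

// Lower-cases each word so that "The" and "the" count together
// (a hypothetical helper, not part of the crunch-demo project).
public class LowerCaseFn extends MapFn<String, String> {
  @Override
  public String map(String word) {
    return word.toLowerCase();
  }
}
```

It would be applied with the same parallelDo call used elsewhere in this walkthrough, e.g. words.parallelDo(new LowerCaseFn(), Writables.strings()).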

Now that we have our filtered list of tokens from the input file, we would like to count how often each word occurs. Crunch provides a PTable<S, Long> count() method on PCollection<S> that makes this kind of aggregation very easy:

  // The count method applies a series of Crunch primitives and returns
  // a map of the unique words in the input PCollection to their counts.
  PTable<String, Long> counts = noStopWords.count();

A PTable<K, V> is a sub-interface of PCollection that provides a number of methods for working with PCollections of key-value pairs. Crunch provides a large collection of patterns for performing common data processing tasks, including joins, cogroups, sorting, set operations, and aggregations. A complete list of Crunch's built-in patterns is provided in this section of the user guide, which also contains information on how you can define your own custom operations in terms of Crunch's data processing primitives.
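Under the hood, count() is roughly a composition of the lower-level primitives parallelDo, groupByKey, and combineValues; a sketch of that expansion (illustrative, using the Aggregators helper from org.apache.crunch.fn) looks like:

```java
// Roughly what noStopWords.count() does internally (illustrative):
PTable<String, Long> counts = noStopWords
    .parallelDo(new MapFn<String, Pair<String, Long>>() {
      @Override
      public Pair<String, Long> map(String word) {
        // Pair each word with an initial count of one.
        return Pair.of(word, 1L);
      }
    }, Writables.tableOf(Writables.strings(), Writables.longs()))
    .groupByKey()
    .combineValues(Aggregators.SUM_LONGS());
```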

Finally, we'd like to store the output of our pipeline run to the location specified by the outputPath argument:

  // Instruct the pipeline to write the resulting counts to a text file.
  pipeline.writeTextFile(counts, outputPath);

Just as Pipeline has a convenience method for reading in text files, the writeTextFile(PCollection<S> data, String output) method is a convenient way to write a PCollection out as text. The analog of Crunch's Source interface for writing output data using Hadoop OutputFormats is the Target interface. Just as a single Pipeline instance can read data from multiple Sources, a Pipeline may also write each PCollection to multiple outputs. You can read about Crunch's built-in Targets, patterns for creating your own custom Targets, and support for output options like checkpointing in this section of the user guide.
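In explicit Target form, the same write looks like the following sketch (the second output path is illustrative; To is in org.apache.crunch.io):

```java
// Equivalent to pipeline.writeTextFile(counts, outputPath):
pipeline.write(counts, To.textFile(outputPath));

// The same PCollection can also be written to a second Target
// (the path here is illustrative):
pipeline.write(counts, To.sequenceFile("/data/word_counts_seq"));
```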

Although we have fully specified all of the stages in our data pipeline, Crunch hasn't actually done any data processing yet. Both the MRPipeline and SparkPipeline use a lazy execution model, which means that no jobs will be started until the client is ready for them to run. The Pipeline interface declares a number of methods for signalling that jobs should start running, including the run() method that blocks the client until the job finishes, the done() method which calls run() and also cleans up any temporary files the Crunch execution engine creates, and the runAsync() method that kicks off the jobs and returns a handler that can be used for monitoring their progress. In this example, we aren't doing any additional processing, and so we'll call Pipeline's done method to signal that Crunch should plan and execute our MapReduce jobs and then do any necessary cleanup:

  // Execute the pipeline as a MapReduce.
  PipelineResult result = pipeline.done();
  return result.succeeded() ? 0 : 1;

The PipelineResult instance has methods that indicate whether the jobs that were run as part of the pipeline succeeded or failed and also contains statistics and the Hadoop Counters associated with the individual jobs. You can get more details on how to manage pipeline runs in this section of the user guide.
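A sketch of inspecting those per-job results (method names from the PipelineResult API as of the 0.9.x releases; treat the fragment as illustrative):

```java
PipelineResult result = pipeline.done();
if (result.succeeded()) {
  for (PipelineResult.StageResult stage : result.getStageResults()) {
    // Each StageResult corresponds to one job that the planner ran.
    System.out.println("Completed stage: " + stage.getStageName());
  }
}
```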

We hope you enjoyed your first walk through a Crunch pipeline. You can get more detailed information about developing pipelines with the Crunch libraries in the user guide, and you are also welcome to ask questions or report any problems you have on the project's mailing list.