Getting Started will guide you through the process of creating a simple Crunch pipeline to count the words in a text document, which is the Hello World of distributed computing. Along the way, we'll explain the core Crunch concepts and how to use them to create effective and efficient data pipelines.
The Apache Crunch project develops and supports Java APIs that simplify the process of creating data pipelines on top of Apache Hadoop. The Crunch APIs are modeled after FlumeJava (PDF), which is the library that Google uses for building data pipelines on top of their own implementation of MapReduce.
One of the most common questions we hear is how Crunch compares to other projects that provide abstractions on top of MapReduce, such as Apache Pig, Apache Hive, and Cascading.
The core libraries are primarily developed against Hadoop 1.1.2 and are also tested against Hadoop 2.2.0. They should work with any version of Hadoop 1.x after 1.0.3 and any version of Hadoop 2.x after 2.0.0-alpha, although you should note that some of Hadoop 2.x's dependencies changed between 2.0.4-alpha and 2.2.0 (for example, the protocol buffer library switched from 2.4.1 to 2.5.0). Crunch is also known to work with distributions from vendors like Cloudera, Hortonworks, and IBM. The Crunch libraries are not compatible with versions of Hadoop prior to 1.x, such as 0.20.2.
If you're using the crunch-hbase library, please note that Crunch 0.9.0 switched to using HBase 0.96.0, while all prior versions of crunch-hbase were developed against HBase 0.94.3.
Here are all of the currently recommended Crunch versions in one convenient table:
| Hadoop Versions | HBase Versions | Recommended Crunch Version |
| 1.x             | 0.96.x         | 0.12.0                     |
| 2.x             | 1.0            | 0.14.0                     |
The Crunch project provides Maven artifacts on Maven Central of the form:
<dependency>
  <groupId>org.apache.crunch</groupId>
  <artifactId>crunch-core</artifactId>
  <version>${crunch.version}</version>
</dependency>
The crunch-core artifact contains the core libraries for planning and executing MapReduce pipelines. Depending on your use case, you may also find the following artifacts useful:

- crunch-test: Helper classes for integration testing of Crunch pipelines
- crunch-hbase: Utilities for pipelines that read/write data to Apache HBase
- crunch-scrunch: Scrunch, a Scala API for Crunch
- crunch-spark: Executes Crunch pipelines using Apache Spark
- crunch-contrib: Extra Crunch libraries for text processing, JDBC connections, and BloomFilters
- crunch-examples: Example MapReduce and HBase pipelines
- crunch-archetype: A Maven archetype for creating new Crunch pipeline projects

You can download the most recently released Crunch libraries from the Download page or from the Maven Central Repository.
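Most of these artifacts follow the same dependency pattern as crunch-core. For example, a test-scoped dependency on crunch-test might be declared like this (a sketch; reuse whatever version property you already use for crunch-core):

<dependency>
  <groupId>org.apache.crunch</groupId>
  <artifactId>crunch-test</artifactId>
  <version>${crunch.version}</version>
  <scope>test</scope>
</dependency>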
If you prefer, you can also build the Crunch libraries from the source code using Maven and install it in your local repository:
mvn clean install
This also runs the integration test suite, which will take a while to complete; you can skip the tests by running with the -DskipTests option. If you are planning to run Crunch against Hadoop 2.x, you should also specify -Dcrunch.platform=2.
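For example, a build that targets Hadoop 2.x and skips the integration tests would look like this:

mvn clean install -DskipTests -Dcrunch.platform=2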
After you have built Crunch, you can run the bundled example applications such as WordCount:
hadoop jar crunch-examples/target/crunch-examples-*-job.jar org.apache.crunch.examples.WordCount <inputfile> <outputdir>
There are three additional examples in the org.apache.crunch.examples package: AverageBytesByIP, TotalBytesByIP, and WordAggregationHBase.
AverageBytesByIP and TotalBytesByIP take as input a file in the Common Log Format (an example is provided in crunch-examples/src/main/resources/access_logs.tar.gz). The WordAggregationHBase example requires an Apache HBase cluster to be available, but it creates tables and loads sample data as part of its run.
There are a couple of ways to get started with Crunch. If you use Git, you can clone this project, which contains an example Crunch pipeline:
git clone http://github.com/jwills/crunch-demo
You can also use the following Maven archetype, which will generate the same code as the example and allow you to choose a different version of Crunch. Enter the following command and answer the questions as shown below:
$ mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype
[...]
1: remote -> org.apache.crunch:crunch-archetype (Create a basic, self-contained job with the core library.)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): : 1
[CHOOSE THE VERSION OF CRUNCH YOU WANT TO USE]
Define value for property 'groupId': : com.example
Define value for property 'artifactId': : crunch-demo
Define value for property 'version': 1.0-SNAPSHOT: : [HIT ENTER]
Define value for property 'package': com.example: : [HIT ENTER]
Confirm properties configuration:
groupId: com.example
artifactId: crunch-demo
version: 1.0-SNAPSHOT
package: com.example
Y: : [HIT ENTER]
[...]
$
The example Maven project contains an example application that counts word frequencies in text files:
$ cd crunch-demo
$ tree
.
|-- pom.xml
`-- src
    |-- main
    |   |-- assembly
    |   |   `-- hadoop-job.xml
    |   `-- java
    |       `-- com
    |           `-- example
    |               |-- StopWordFilter.java
    |               |-- Tokenizer.java
    |               `-- WordCount.java
    `-- test
        `-- java
            `-- com
                `-- example
                    |-- StopWordFilterTest.java
                    `-- TokenizerTest.java
The WordCount.java file contains the main class that defines a pipeline application, which is referenced from pom.xml.
Build the code:
$ mvn package
Your packaged application is created in the target directory. The build process uses Maven's assembly plugin with some configuration in hadoop-job.xml to create a special JAR file (with the suffix -job.jar).
Depending on your Hadoop configuration, you can run it locally or on a
cluster using Hadoop's launcher script:
$ hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar <in> <out>
The <in> parameter references a text file or a directory containing text files, while <out> is the directory to which the pipeline writes the final results.
The library also supports running applications from within an IDE, either as standalone Java applications or from unit tests. All required dependencies are on Maven's classpath, so you can run the WordCount class directly without any additional setup.
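For instance, a minimal unit test for the project's Tokenizer DoFn (shown later in this walkthrough) might hand it a mock Emitter and verify the emitted words. This is only a sketch and assumes a mocking library such as Mockito is on the test classpath:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.apache.crunch.Emitter;
import org.junit.Test;

public class TokenizerTest {
  @Test
  public void tokenizesOnWhitespace() {
    // Use a mock Emitter to capture the tokens the DoFn emits.
    Emitter<String> emitter = mock(Emitter.class);
    new Tokenizer().process("hello crunch world", emitter);
    verify(emitter).emit("hello");
    verify(emitter).emit("crunch");
    verify(emitter).emit("world");
  }
}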
Let's walk through the run method of the WordCount example line by line and explain the data processing concepts we encounter.

Our WordCount application starts out with a main method that should be familiar to most MapReduce developers:
public class WordCount extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordCount(), args);
  }
The WordCount class extends Configured and implements Tool, which allows us to use Hadoop's ToolRunner class to parse the standard commandline arguments for MapReduce jobs and make them available to the WordCount class via the getConf() method that is inherited from Configured. This is an easy way to allow us to override Hadoop configuration parameters for our job from the commandline, without having to update and recompile our application.
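For example, because ToolRunner uses Hadoop's standard argument parsing, you could adjust a configuration property at launch time with a -D option (the property shown here is purely illustrative):

hadoop jar target/crunch-demo-1.0-SNAPSHOT-job.jar -D mapreduce.job.reduces=10 <in> <out>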
The Crunch-specific bits are introduced in the run method, just after the commandline argument parsing is completed:
String inputPath = args[0];
String outputPath = args[1];
// Create an object to coordinate pipeline creation and execution.
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
Every Crunch job begins with a Pipeline instance that manages the execution lifecycle of your data pipeline. As of the 0.9.0 release, there are three implementations of the Pipeline interface:

- MRPipeline: Executes a pipeline as a series of MapReduce jobs that can run locally or on a Hadoop cluster.
- MemPipeline: Executes a pipeline in-memory on the client.
- SparkPipeline: Executes a pipeline by running a series of Apache Spark jobs, either locally or on a Hadoop cluster.

The MemPipeline is most useful when you are initially developing and testing the logic of your pipeline on small, local data sets. The MRPipeline is the oldest and most robust of the Pipeline implementations for processing large amounts of data. The SparkPipeline is the newest implementation and leverages features of the underlying Spark engine that should allow it to run substantially faster than the MRPipeline, especially when your problem requires running many iterations over the same data. You can read more about the properties and configuration options of the different Pipeline implementations in this section of the user guide.
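For example, while developing the WordCount logic you could swap in the in-memory implementation and feed it a few hard-coded lines; this is just a sketch using MemPipeline's static factory methods:

// Run entirely on the client: no MapReduce or Spark jobs are launched.
Pipeline pipeline = MemPipeline.getInstance();
PCollection<String> lines = MemPipeline.collectionOf("hello world", "goodbye cruel world");
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());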
Once we've created our Pipeline instance, we need to identify the location and format of the data that our pipeline should process:
// Reference a given text file as a collection of Strings.
PCollection<String> lines = pipeline.readTextFile(inputPath);
A PCollection<T> is the core data abstraction of the Crunch API, representing a distributed, immutable collection of records of type T.
PCollections are similar to Pig's relations, Hive's tables, or Cascading's Pipes. (The user guide has a table that illustrates how all of the various abstractions used by Crunch, Pig, Hive, and Cascading are related to each other.)
In this example, we create a PCollection of Strings from the input text file by using the readTextFile(String path) convenience method on the Pipeline interface, but we can create PCollections from any kind of Hadoop InputFormat. The Crunch API defines an interface, Source<T>, that wraps an InputFormat with any additional configuration information needed to read that InputFormat into a pipeline, such as the path(s) to read data from. A single Pipeline instance can read data from multiple Sources, which makes it convenient to use Crunch to join data from multiple sources together. You can read more about Sources, such as which ones are provided by the Crunch libraries and how to write your own, in this section of the user guide.
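For example, the readTextFile call above is shorthand for reading from a text Source; the same PCollection could be created by passing an explicit Source to the Pipeline's read method, using the From factory class in org.apache.crunch.io (a sketch):

// Wrap a text file InputFormat in a Source<String> and read it into the pipeline.
PCollection<String> lines = pipeline.read(From.textFile(inputPath));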
After we've specified our input source, we indicate the processing that we want to perform on the records in the PCollection:
// Define a function that splits each line in a PCollection of Strings into
// a PCollection made up of the individual words in the file.
// The second argument sets the serialization format.
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
The Tokenizer instance in this snippet is a subclass of Crunch's DoFn<S, T> class. DoFns are used by Crunch in the same way that MapReduce uses the Mapper or Reducer classes, but instead of overriding a map or reduce method, DoFns have an abstract void process(S input, Emitter<T> emitter) method that subclasses override to emit zero or more output records for each input record. The Tokenizer class is a simple example of a DoFn that parses a line of text and emits the individual word tokens:
public class Tokenizer extends DoFn<String, String> {
  private static final Splitter SPLITTER = Splitter.onPattern("\\s+").omitEmptyStrings();

  @Override
  public void process(String line, Emitter<String> emitter) {
    for (String word : SPLITTER.split(line)) {
      emitter.emit(word);
    }
  }
}
To apply a DoFn to a PCollection, we use the PCollection's parallelDo(DoFn<S, T> doFn, PType<T> ptype) method, which returns a new PCollection<T> instance (remember, PCollections are immutable). The PType<T> interface is a description of how to serialize the records in a PCollection, and is used by the Crunch runtime whenever it needs to checkpoint or shuffle the data in a PCollection. Crunch provides two different serialization frameworks with a number of convenience methods for defining PTypes; one is based on Hadoop's Writable interface, and the second is based on the Apache Avro serialization project. You can read more about data serialization for Crunch pipelines in this section of the user guide.
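For example, if we wanted the words PCollection from the snippet above to use Avro-based serialization instead of Writables, only the PType argument would change (a sketch using the Avros factory class):

// Same tokenization step, but records are serialized with Avro when
// Crunch needs to checkpoint or shuffle them.
PCollection<String> words = lines.parallelDo(new Tokenizer(), Avros.strings());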
Although we can think of DoFns as analogous to Hadoop's Mapper or Reducer classes, Crunch can execute an individual DoFn in either the map or reduce phase of a MapReduce job, and we also have the option of executing multiple DoFns within a single phase. This allows us to break the logic of our pipeline into smaller DoFns that are easier to test and re-use than monolithic Mapper or Reducer classes. The Crunch APIs also provide several specializations of DoFns for common data processing tasks, such as filtering input records based on a boolean condition:
// Take the collection of words and remove known stop words.
PCollection<String> noStopWords = words.filter(new StopWordFilter());
This snippet references a StopWordFilter instance, which is a subclass of Crunch's FilterFn<S>. A FilterFn is a subclass of DoFn that implements DoFn's process method in terms of an abstract public boolean accept(S input) method. The StopWordFilter class implements the accept method by comparing the input word to a set of stop words:
public class StopWordFilter extends FilterFn<String> {
  // English stop words, borrowed from Lucene.
  private static final Set<String> STOP_WORDS = ImmutableSet.copyOf(new String[] {
      "a", "and", "are", "as", "at", "be", "but", "by",
      "for", "if", "in", "into", "is", "it",
      "no", "not", "of", "on", "or", "s", "such",
      "t", "that", "the", "their", "then", "there", "these",
      "they", "this", "to", "was", "will", "with"
  });

  @Override
  public boolean accept(String word) {
    return !STOP_WORDS.contains(word);
  }
}
The Crunch libraries have a number of specialized implementations of DoFn and associated methods for PCollection that can clarify the intent of the steps in your data processing pipelines. You can review these convenience classes in this section of the user guide.
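One such specialization is MapFn<S, T>, which is the DoFn to use when every input record produces exactly one output record. A sketch that lower-cases each word might look like this (the lower-casing step is purely illustrative and not part of the WordCount example):

// MapFn implements DoFn's process method in terms of an abstract
// T map(S input) method that is called once per input record.
PCollection<String> lowered = noStopWords.parallelDo(new MapFn<String, String>() {
  @Override
  public String map(String word) {
    return word.toLowerCase();
  }
}, Writables.strings());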
Now that we have our filtered list of tokens from the input file, we would like to count how often each word occurs.
Crunch provides a PTable<S, Long> count() method on PCollection<S> that makes this kind of aggregation very easy:
// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();
A PTable<K, V> is a sub-interface of PCollection that provides a number of methods for working with PCollections of key-value pairs. Crunch provides a large collection of patterns for performing common data processing tasks, including joins, cogroups, sorting, set operations, and aggregations. A complete list of Crunch's built-in patterns is provided in this section of the user guide, which also contains information on how you can define your own custom operations in terms of Crunch's data processing primitives.
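For example, once we have the counts table defined above, the built-in top aggregation can keep only the most frequent words (a sketch; the choice of 20 is arbitrary and this step is not part of the WordCount example):

// Keep the 20 words with the highest counts.
PTable<String, Long> mostFrequent = counts.top(20);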
Finally, we'd like to store the output of our pipeline run to the location specified by the outputPath argument:
// Instruct the pipeline to write the resulting counts to a text file.
pipeline.writeTextFile(counts, outputPath);
Just as Pipeline has a convenience method for reading in text files, the writeTextFile(PCollection<S> data, String output) method is a convenient way to write a PCollection out as text. The analog of Crunch's Source interface for writing output data using Hadoop OutputFormats is the Target interface. Just as a single Pipeline instance can read data from multiple Sources, a Pipeline may also write multiple outputs for each PCollection. You can read about Crunch's built-in Targets, patterns for creating your own custom Targets, and support for output options like checkpointing in this section of the user guide.
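For example, writing the counts with an explicit Target is essentially equivalent to the writeTextFile call above, and it also exposes options such as the write mode (a sketch using the To factory class in org.apache.crunch.io):

// Write the counts as text, overwriting any existing output at outputPath.
pipeline.write(counts, To.textFile(outputPath), Target.WriteMode.OVERWRITE);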
Although we have fully specified all of the stages in our data pipeline, Crunch hasn't actually done any data processing yet. Both the MRPipeline and SparkPipeline use a lazy execution model, which means that no jobs will be started until the client is ready for them to run. The Pipeline interface declares a number of methods for signalling that jobs should start running, including the run() method that blocks the client until the job finishes, the done() method which calls run() and also cleans up any temporary files the Crunch execution engine creates, and the runAsync() method that kicks off the jobs and returns a handle that can be used for monitoring their progress. In this example, we aren't doing any additional processing, and so we'll call Pipeline's done method to signal that Crunch should plan and execute our MapReduce jobs and then do any necessary cleanup:
// Execute the pipeline as a MapReduce.
PipelineResult result = pipeline.done();
return result.succeeded() ? 0 : 1;
The PipelineResult instance has methods that indicate whether the jobs that were run as part of the pipeline succeeded or failed, and it also contains statistics and the Hadoop Counters associated with the individual jobs. You can get more details on how to manage pipeline runs in this section of the user guide.
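If your application shouldn't block while the jobs run, the runAsync() method mentioned above returns a handle you can monitor instead; a sketch of that variant (assuming Crunch's PipelineExecution API) might look like this:

// Kick off the jobs without blocking the client.
PipelineExecution execution = pipeline.runAsync();
// ... do other work, or poll execution.getStatus() to monitor progress ...
execution.waitUntilDone();
PipelineResult result = execution.getResult();
return result.succeeded() ? 0 : 1;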
We hope you enjoyed your first walk through a Crunch pipeline. You can get more detailed information about developing pipelines with the Crunch libraries in the user guide, and you are also welcome to ask questions or report any problems you have on the project's mailing list.