You can download the most recently released Crunch libraries from the Download page or from the Maven Central Repository.
If you prefer, you can also build the Crunch libraries from the source code using Maven and install them in your local repository:
mvn clean install
This also runs the integration test suite, which takes a while to complete. Afterwards, you can run the bundled example applications, such as WordCount:
hadoop jar crunch-examples/target/crunch-examples-*-job.jar org.apache.crunch.examples.WordCount <inputfile> <outputdir>
There are three additional examples in the org.apache.crunch.examples package: AverageBytesByIP, TotalBytesByIP, and WordAggregationHBase. AverageBytesByIP and TotalBytesByIP take input in Common Log Format; a sample is included at crunch-examples/src/main/resources/access_logs.tar.gz. WordAggregationHBase requires an Apache HBase cluster but no input data.
The Java API is centered around three interfaces that represent distributed datasets: PCollection<T>, PTable<K, V>, and PGroupedTable<K, V>.
A PCollection<T> represents a distributed, unordered collection of elements of type T. For example, we represent a text file as a
PCollection<String> object. PCollection provides a method, parallelDo, that applies a function to each element in a PCollection in parallel,
and returns a new PCollection as its result.
A PTable<K, V> is a sub-interface of PCollection that represents a distributed, unordered multimap of its key type K to its value type V.
In addition to the parallelDo operation, PTable provides a groupByKey operation that aggregates all of the values in the PTable that
have the same key into a single record. It is the groupByKey operation that triggers the sort phase of a MapReduce job.
The result of a groupByKey operation is a PGroupedTable<K, V> object, which is a distributed, sorted map of keys of type K to an Iterable
collection of values of type V. In addition to parallelDo, the PGroupedTable provides a combineValues operation, which allows for
a commutative and associative aggregation operator to be applied to the values of the PGroupedTable instance on both the map side and the
reduce side of a MapReduce job.
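The relationship between the three interfaces can be pictured with a toy, single-JVM model in plain Java. This is only a sketch of the semantics described above, not Crunch code: the class GroupThenCombineSketch and its helper methods are hypothetical names, a List of Map.Entry stands in for a PTable, and a TreeMap stands in for the sorted PGroupedTable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy, in-memory model of groupByKey followed by combineValues.
// These helpers are hypothetical and are not part of the Crunch API.
final class GroupThenCombineSketch {

  // Models groupByKey: collect every value that shares a key, keeping keys sorted.
  static <K extends Comparable<K>, V> TreeMap<K, List<V>> groupByKey(List<Map.Entry<K, V>> table) {
    TreeMap<K, List<V>> grouped = new TreeMap<>();
    for (Map.Entry<K, V> record : table) {
      grouped.computeIfAbsent(record.getKey(), k -> new ArrayList<>()).add(record.getValue());
    }
    return grouped;
  }

  // Models combineValues with a sum: reduce the values of each key to one total.
  static <K extends Comparable<K>> TreeMap<K, Long> sumValues(Map<K, List<Long>> grouped) {
    TreeMap<K, Long> totals = new TreeMap<>();
    for (Map.Entry<K, List<Long>> group : grouped.entrySet()) {
      long sum = 0L;
      for (long value : group.getValue()) {
        sum += value;
      }
      totals.put(group.getKey(), sum);
    }
    return totals;
  }

  public static void main(String[] args) {
    // An unordered multimap: the key "b" appears twice.
    List<Map.Entry<String, Long>> table = List.of(
        Map.entry("b", 1L), Map.entry("a", 1L), Map.entry("b", 1L));
    TreeMap<String, List<Long>> grouped = groupByKey(table);
    System.out.println(grouped);            // {a=[1], b=[1, 1]}
    System.out.println(sumValues(grouped)); // {a=1, b=2}
  }
}
```

In Crunch itself, the grouping happens in the shuffle and sort of a MapReduce job rather than in a local map, so the model only conveys the shape of the data at each step.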
Finally, PCollection, PTable, and PGroupedTable all support a union operation, which takes a series of distinct PCollections and treats
them as a single, virtual PCollection. The union operator is required for operations that combine multiple inputs, such as cogroups and
joins.
Every pipeline starts with a Pipeline object that coordinates building the pipeline and executing the underlying MapReduce
jobs. For efficiency, the Crunch planner uses lazy evaluation: it constructs MapReduce jobs from the stages of the pipeline only when
the Pipeline object's run or done method is called.
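The idea of lazy evaluation can be illustrated without Hadoop. The following is a minimal plain-Java sketch, assuming a hypothetical LazyPipelineSketch class that merely records stages and executes them when run is called; it is not how the Crunch planner is implemented, only a picture of "record first, execute later".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of deferred execution. LazyPipelineSketch is hypothetical and is
// not the Crunch planner; it only records stages and runs them on demand.
final class LazyPipelineSketch {
  private final List<Function<String, String>> stages = new ArrayList<>();
  private int executedStages = 0;

  // Record a stage; nothing is executed here.
  LazyPipelineSketch then(Function<String, String> stage) {
    stages.add(stage);
    return this;
  }

  int executedStages() {
    return executedStages;
  }

  // Only now is the recorded work performed, in the order it was added.
  String run(String input) {
    String current = input;
    for (Function<String, String> stage : stages) {
      current = stage.apply(current);
      executedStages++;
    }
    return current;
  }

  public static void main(String[] args) {
    LazyPipelineSketch pipeline = new LazyPipelineSketch()
        .then(String::trim)
        .then(String::toUpperCase);
    System.out.println(pipeline.executedStages()); // 0: stages are only recorded so far
    System.out.println(pipeline.run("  crunch ")); // CRUNCH
    System.out.println(pipeline.executedStages()); // 2
  }
}
```

Deferring the work this way gives a planner the chance to see every stage before deciding how to execute them, which is what allows several stages to share one job.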
Here is the classic WordCount application using the APIs:
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.types.writable.Writables;

public class WordCount {
  public static void main(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(WordCount.class);
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    PCollection<String> words = lines.parallelDo("my splitter", new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings());

    PTable<String, Long> counts = Aggregate.count(words);

    pipeline.writeTextFile(counts, args[1]);
    pipeline.run();
  }
}
Let's walk through the example line by line.
The MRPipeline implementation of the Pipeline interface compiles the individual stages of a
pipeline into a series of MapReduce jobs. The MRPipeline constructor takes a class argument
that is used to tell Hadoop where to find the code that is used in the pipeline execution.
We now need to tell the Pipeline about the inputs it will be consuming. The Pipeline interface
defines a readTextFile method that takes in a String and returns a PCollection of Strings.
In addition to text files, the library supports reading data from SequenceFiles and Avro container files,
via the SequenceFileSource and AvroFileSource classes defined in the org.apache.crunch.io package.
Note that each PCollection is a reference to a source of data; no data is actually loaded into a PCollection on the client machine.
The Crunch library defines a small set of primitive operations that can be composed in order to build complex data
pipelines. The first of these primitives is the parallelDo function, which applies a function (defined
by a subclass of DoFn) to every record in a PCollection, and returns a new PCollection that contains
the results.
The first argument to parallelDo is a string that identifies this step in the pipeline. When a pipeline is compiled into a series of MapReduce jobs, multiple stages often run within the same Mapper or Reducer. A string that identifies each processing step is useful for debugging errors that occur in a running pipeline.
The second argument to parallelDo is an anonymous subclass of DoFn. Each DoFn subclass must override
the process method, which takes in a record from the input PCollection and an Emitter object that
may have any number of output values written to it. In this case, our DoFn splits each line into
words, using runs of whitespace as the separator, and emits the words from the split to the output PCollection.
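The split call in the DoFn uses the regular expression \s+, and its behavior on lines with leading whitespace is worth checking in isolation. The plain-Java sketch below runs the same split outside of Crunch; tokenize is a hypothetical helper, shown only as one way a DoFn body might skip the empty token if that matters for your data.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java look at the split used in the WordCount DoFn.
// tokenize is a hypothetical helper and is not part of Crunch.
final class SplitSketch {

  static List<String> tokenize(String line) {
    List<String> words = new ArrayList<>();
    for (String word : line.split("\\s+")) {
      if (!word.isEmpty()) { // drop the empty token produced by leading whitespace
        words.add(word);
      }
    }
    return words;
  }

  public static void main(String[] args) {
    // Leading whitespace yields an empty first token: ["", "to", "be", "or"].
    System.out.println(" to be  or".split("\\s+").length); // 4
    System.out.println(tokenize(" to be  or"));            // [to, be, or]
  }
}
```

Without the isEmpty check, the empty string would be emitted and counted as a word of its own.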
The last argument to parallelDo is an instance of the PType interface, which specifies how the data
in the output PCollection is serialized. While the API takes advantage of Java Generics to provide
compile-time type safety, the generic type information is not available at runtime. The job planner needs to know
how to map the records stored in each PCollection into a Hadoop-supported serialization format in order
to read and write data to disk. Two serialization implementations are supported in Crunch via the
PTypeFamily interface: a Writable-based system that is defined in the org.apache.crunch.types.writable
package, and an Avro-based system that is defined in the org.apache.crunch.types.avro package. Each
implementation provides convenience methods for working with the common PTypes (Strings, longs, bytes, etc.)
as well as utility methods for creating PTypes from existing Writable classes or Avro schemas.
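The remark that generic type information is not available at runtime refers to Java's type erasure. A small plain-Java sketch (the class name ErasureSketch is hypothetical) shows the effect that makes an explicit PType argument necessary:

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates type erasure: the element type of a generic collection
// cannot be recovered from the object at runtime.
final class ErasureSketch {

  static boolean sameRuntimeClass(List<?> a, List<?> b) {
    return a.getClass() == b.getClass();
  }

  public static void main(String[] args) {
    List<String> strings = new ArrayList<>();
    List<Long> longs = new ArrayList<>();
    System.out.println(sameRuntimeClass(strings, longs)); // true
    System.out.println(strings.getClass().getName());     // java.util.ArrayList
  }
}
```

Both lists report the same runtime class, so a planner that only inspected the objects could not tell which serialization to use. Passing a value such as Writables.strings() supplies that missing information explicitly.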
Out of the simple primitive operations, we can build arbitrarily complex chains of operations in order
to perform higher-level operations, like aggregations and joins, that can work on any type of input data.
Let's look at the implementation of the Aggregate.count function:
package org.apache.crunch.lib;

import org.apache.crunch.CombineFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.types.PTypeFamily;

public class Aggregate {

  private static class Counter<S> extends MapFn<S, Pair<S, Long>> {
    public Pair<S, Long> map(S input) {
      return Pair.of(input, 1L);
    }
  }

  public static <S> PTable<S, Long> count(PCollection<S> collect) {
    PTypeFamily tf = collect.getTypeFamily();

    // Create a PTable from the PCollection by mapping each element
    // to a key of the PTable with the value equal to 1L
    PTable<S, Long> withCounts = collect.parallelDo("count:" + collect.getName(),
        new Counter<S>(), tf.tableOf(collect.getPType(), tf.longs()));

    // Group the records of the PTable based on their key.
    PGroupedTable<S, Long> grouped = withCounts.groupByKey();

    // Sum the 1L values associated with the keys to get the
    // count of each element in this PCollection, and return it
    // as a PTable so that it may be processed further or written
    // out for storage.
    return grouped.combineValues(CombineFn.<S>SUM_LONGS());
  }
}
First, we get the PTypeFamily that is associated with the PType for the collection. The
call to parallelDo converts each record in this PCollection into a Pair of the input record
and the number one by extending the MapFn convenience subclass of DoFn, and uses the
tableOf method of the PTypeFamily to specify that the returned PCollection should be a
PTable instance, with the key being the PType of the PCollection and the value being the Long
implementation for this PTypeFamily.
The next line features the second of the four primary operations, groupByKey. The groupByKey
operation may only be applied to a PTable, and returns an instance of the PGroupedTable
interface, which references the grouping of all of the values in the PTable that have the same key.
The groupByKey operation is what triggers the reduce phase of a MapReduce job.
The last line in the function returns the output of the third of the four primary operations,
combineValues. The combineValues operator takes a CombineFn as an argument, which is a
specialized subclass of DoFn that operates on an implementation of Java's Iterable interface. The
use of combineValues (as opposed to parallelDo) signals to the planner that the CombineFn may be used to
aggregate values for the same key on the map side of a MapReduce job as well as the reduce side.
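Why the aggregation may safely run on both sides can be seen in a toy model. The sketch below is plain Java, not Crunch code, and all of its names are hypothetical. It treats two lists as the inputs of two map tasks, sums each one separately (the map-side combine), and then sums the partial totals (the reduce side). Because addition is commutative and associative, the result matches a single pass over all records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of applying a sum on the map side and again on the reduce side.
// The names here are hypothetical and are not part of the Crunch API.
final class MapSideCombineSketch {

  // Sum the values per key within one batch of records.
  static TreeMap<String, Long> sumByKey(List<Map.Entry<String, Long>> records) {
    TreeMap<String, Long> totals = new TreeMap<>();
    for (Map.Entry<String, Long> record : records) {
      totals.merge(record.getKey(), record.getValue(), Long::sum);
    }
    return totals;
  }

  // Pre-aggregate each split on its own, then sum the partial totals.
  static TreeMap<String, Long> combineThenReduce(List<List<Map.Entry<String, Long>>> splits) {
    List<Map.Entry<String, Long>> partials = new ArrayList<>();
    for (List<Map.Entry<String, Long>> split : splits) {
      partials.addAll(sumByKey(split).entrySet()); // map-side combine
    }
    return sumByKey(partials);                     // reduce-side combine
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Long>> split1 = List.of(
        Map.entry("a", 1L), Map.entry("b", 1L), Map.entry("a", 1L));
    List<Map.Entry<String, Long>> split2 = List.of(
        Map.entry("a", 1L), Map.entry("c", 1L));

    List<Map.Entry<String, Long>> all = new ArrayList<>(split1);
    all.addAll(split2);

    System.out.println(sumByKey(all));                              // {a=3, b=1, c=1}
    System.out.println(combineThenReduce(List.of(split1, split2))); // {a=3, b=1, c=1}
  }
}
```

In this model the first split sends two partial records downstream instead of three, which hints at why map-side combining reduces the amount of data shuffled between the map and reduce phases.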
The Pipeline object also provides a writeTextFile convenience method for indicating that a
PCollection should be written to a text file. There are also output targets for SequenceFiles and
Avro container files, available in the org.apache.crunch.io package.
After you are finished constructing a pipeline and specifying the output destinations, call the
pipeline's blocking run method in order to compile the pipeline into one or more MapReduce
jobs and execute them.