1. Introduction
In this tutorial, we’ll demonstrate Apache Crunch with an example data processing application. We’ll run this application using the MapReduce framework.
We’ll start by briefly covering some Apache Crunch concepts. Then we’ll jump into a sample app that does text processing:
- First of all, we’ll read the lines from a text file
- Next, we’ll split them into words and remove some common words
- Then, we’ll group the remaining words to get a list of unique words and their counts
- Finally, we’ll write this list to a text file
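Before we bring in Crunch, it may help to see the four steps above as a plain-Java sketch that runs on an in-memory list. This is only an illustration of the logic, not Crunch code; the class name WordCountSketch and the shortened stop-word list are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java sketch of the four steps; it does not use Crunch.
class WordCountSketch {

    // Hypothetical, shortened stop-word list used only for illustration.
    static final Set<String> STOP_WORDS = Set.of("a", "is", "the", "of");

    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
            // Split each line into words on runs of whitespace.
            .flatMap(line -> Arrays.stream(line.trim().split("\\s+")))
            // Drop empty tokens (from blank lines) and stop words.
            .filter(word -> !word.isEmpty() && !STOP_WORDS.contains(word))
            // Group identical words and count them; TreeMap keeps the keys sorted.
            .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = List.of("the quick fox", "the lazy dog is a dog");
        System.out.println(countWords(lines)); // prints {dog=2, fox=1, lazy=1, quick=1}
    }
}
```

The rest of the tutorial expresses the same steps with Crunch types so that they can run as distributed jobs instead of on a single in-memory list.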
2. What Is Crunch?
MapReduce is a distributed, parallel programming model for processing large amounts of data on a cluster of servers. Hadoop provides the best-known implementation, and engines such as Spark support the same style of processing.
Crunch provides a framework for writing, testing and running MapReduce pipelines in Java. With Crunch, we don’t write the MapReduce jobs directly. Instead, we define a data pipeline (i.e. the input, processing, and output steps) using the Crunch APIs. The Crunch planner maps the pipeline to MapReduce jobs and executes them when needed.
Every Crunch data pipeline is coordinated by an instance of the Pipeline interface. This interface also defines methods for reading data into a pipeline via Source instances and writing data out from a pipeline to Target instances.
Crunch has three interfaces for representing data:
- PCollection<T> – an immutable, distributed collection of elements of type T
- PTable<K, V> – an immutable, distributed, unordered multi-map of keys and values
- PGroupedTable<K, V> – a distributed, sorted map of keys of type K to an Iterable<V> that may be iterated over exactly once
DoFn is the base class for all data processing functions. It corresponds to the Mapper, Reducer and Combiner classes in MapReduce. We spend most of our development time writing and testing logical computations with it.
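To make the idea of a processing function that emits records more concrete, here is a plain-Java analogue. The interfaces SimpleEmitter and SimpleFn are hypothetical stand-ins written for this sketch; they are not the Crunch DoFn and Emitter types, and they only mirror the general shape of "one input record in, any number of output records out".

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogue of the DoFn/Emitter contract; these are NOT the Crunch types.
class DoFnSketch {

    // Hypothetical stand-in for an emitter: receives each output record.
    interface SimpleEmitter<T> {
        void emit(T value);
    }

    // Hypothetical stand-in for a processing function: one input, any number of outputs.
    interface SimpleFn<S, T> {
        void process(S input, SimpleEmitter<T> emitter);
    }

    // Example function: emits each even number twice and drops odd numbers.
    static final SimpleFn<Integer, Integer> DUPLICATE_EVENS = (n, emitter) -> {
        if (n % 2 == 0) {
            emitter.emit(n);
            emitter.emit(n);
        }
    };

    // Applies fn to every input and collects whatever it emits.
    static <S, T> List<T> applyToAll(List<S> inputs, SimpleFn<S, T> fn) {
        List<T> outputs = new ArrayList<>();
        for (S input : inputs) {
            fn.process(input, outputs::add);
        }
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(applyToAll(List.of(1, 2, 3, 4), DUPLICATE_EVENS)); // prints [2, 2, 4, 4]
    }
}
```

Because a function may emit zero, one, or many records per input, the same shape can express filtering, mapping, and splitting.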
Now that we’re more familiar with Crunch, let’s use it to build the example application.
3. Setting up a Crunch Project
First of all, let’s set up a Crunch Project with Maven. We can do so in two ways:
- Add the required dependencies in the pom.xml file of an existing project
- Use an archetype to generate a starter project
Let’s have a quick look at both approaches.
3.1. Maven Dependencies
In order to add Crunch to an existing project, let’s add the required dependencies in the pom.xml file.
First, let’s add the crunch-core library:
<dependency>
    <groupId>org.apache.crunch</groupId>
    <artifactId>crunch-core</artifactId>
    <version>1.0.0</version>
</dependency>
Next, let’s add the hadoop-client library to communicate with Hadoop. We should use the version that matches our Hadoop installation:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <scope>provided</scope>
</dependency>
We can check Maven Central for the latest versions of crunch-core and hadoop-client libraries.
3.2. Maven Archetype
Another approach is to quickly generate a starter project using the Maven archetype provided by Crunch:
mvn archetype:generate -Dfilter=org.apache.crunch:crunch-archetype
When prompted by the above command, we provide the Crunch version and the project artifact details.
4. Crunch Pipeline Setup
After setting up the project, we need to create a Pipeline object. Crunch has 3 Pipeline implementations:
- MRPipeline – executes within Hadoop MapReduce
- SparkPipeline – executes as a series of Spark pipelines
- MemPipeline – executes in-memory on the client and is useful for unit testing
Usually, we develop and test using an instance of MemPipeline. Later we use an instance of MRPipeline or SparkPipeline for actual execution.
If we needed an in-memory pipeline, we could use the static method getInstance to get the MemPipeline instance:
Pipeline pipeline = MemPipeline.getInstance();
But for now, let’s create an instance of MRPipeline to execute the application with Hadoop:
Pipeline pipeline = new MRPipeline(WordCount.class, getConf());
5. Read Input Data
After creating the pipeline object, we want to read input data. The Pipeline interface provides a convenience method to read input from a text file, readTextFile(pathName).
Let’s call this method to read the input text file:
PCollection<String> lines = pipeline.readTextFile(inputPath);
The above code reads the text file as a collection of Strings, one per line.
As the next step, let’s write a test case for reading input:
@Test
public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead() {
    Pipeline pipeline = MemPipeline.getInstance();
    PCollection<String> lines = pipeline.readTextFile(INPUT_FILE_PATH);

    assertEquals(21, lines.asCollection()
        .getValue()
        .size());
}
In this test, we verify that we get the expected number of lines when reading a text file.
6. Data Processing Steps
After reading the input data, we need to process it. The Crunch API contains a number of subclasses of DoFn to handle common data processing scenarios:
- FilterFn – filters members of a collection based on a boolean condition
- MapFn – maps each input record to exactly one output record
- CombineFn – combines a number of values into a single value
- JoinFn – performs joins such as inner join, left outer join, right outer join and full outer join
Let’s implement the following data processing logic by using these classes:
- Split each line in the input file into words
- Remove the stop words
- Count the unique words
6.1. Split a Line of Text Into Words
First of all, let’s create the Tokenizer class to split a line into words.
We’ll extend the DoFn class. This class has an abstract method called process. This method processes the input records from a PCollection and sends the output to an Emitter.
We need to implement the splitting logic in this method:
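import com.google.common.base.Splitter;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

public class Tokenizer extends DoFn<String, String> {
    // Splits on runs of whitespace and skips empty tokens.
    private static final Splitter SPLITTER = Splitter
        .onPattern("\\s+")
        .omitEmptyStrings();

    @Override
    public void process(String line, Emitter<String> emitter) {
        for (String word : SPLITTER.split(line)) {
            emitter.emit(word);
        }
    }
}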
In the above implementation, we’ve used the Splitter class from the Guava library to extract words from a line.
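To see why skipping empty tokens matters, here is a plain-Java sketch of the same whitespace splitting without Guava or Crunch. The class name SplitSketch and its tokenize helper are assumptions made for this illustration; the helper only approximates what the Splitter above does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of whitespace tokenizing; it does not use Guava or Crunch.
class SplitSketch {

    // Splits on runs of whitespace and skips empty tokens,
    // roughly what omitEmptyStrings() does for the Splitter above.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.split("\\s+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // String.split keeps a leading empty token when the line starts with whitespace.
        System.out.println(Arrays.asList(" hello world ".split("\\s+")).size()); // prints 3
        System.out.println(tokenize(" hello world ")); // prints [hello, world]
    }
}
```

Without the empty-token check, a line with leading whitespace would contribute an empty "word" to the counts.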
Next, let’s write a unit test for the Tokenizer class:
@RunWith(MockitoJUnitRunner.class)
public class TokenizerUnitTest {

    @Mock
    private Emitter<String> emitter;

    @Test
    public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted() {
        Tokenizer tokenizer = new Tokenizer();
        tokenizer.process(" hello world ", emitter);

        verify(emitter).emit("hello");
        verify(emitter).emit("world");
        verifyNoMoreInteractions(emitter);
    }
}
The above test verifies that only the expected words are emitted and that the surrounding whitespace produces no extra tokens.
Finally, let’s split the lines read from the input text file using this class.
The parallelDo method of the PCollection interface applies the given DoFn to all the elements and returns a new PCollection. Its second argument sets the serialization format of the output.
Let’s call this method on the lines collection and pass an instance of Tokenizer:
PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());
As a result, we get the list of words in the input text file. We’ll remove the stop words in the next step.
6.2. Remove Stop Words
As in the previous step, let’s create a StopWordFilter class to filter out stop words.
This time, we’ll extend FilterFn instead of DoFn. FilterFn has an abstract method called accept. We need to implement the filtering logic in this method:
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.crunch.FilterFn;

public class StopWordFilter extends FilterFn<String> {

    // English stop words, borrowed from Lucene.
    private static final Set<String> STOP_WORDS = ImmutableSet
        .copyOf(new String[] { "a", "and", "are", "as", "at", "be", "but", "by",
            "for", "if", "in", "into", "is", "it", "no", "not", "of", "on",
            "or", "s", "such", "t", "that", "the", "their", "then", "there",
            "these", "they", "this", "to", "was", "will", "with" });

    // Keeps a word only if it is not in the stop-word set (case-sensitive).
    @Override
    public boolean accept(String word) {
        return !STOP_WORDS.contains(word);
    }
}
Next, let’s write the unit tests for the StopWordFilter class:
public class StopWordFilterUnitTest {

    @Test
    public void givenFilter_whenStopWordPassed_thenFalseReturned() {
        FilterFn<String> filter = new StopWordFilter();

        assertFalse(filter.accept("the"));
        assertFalse(filter.accept("a"));
    }

    @Test
    public void givenFilter_whenNonStopWordPassed_thenTrueReturned() {
        FilterFn<String> filter = new StopWordFilter();

        assertTrue(filter.accept("Hello"));
        assertTrue(filter.accept("World"));
    }

    @Test
    public void givenWordCollection_whenFiltered_thenStopWordsRemoved() {
        PCollection<String> words = MemPipeline
            .collectionOf("This", "is", "a", "test", "sentence");
        PCollection<String> noStopWords = words.filter(new StopWordFilter());

        assertEquals(ImmutableList.of("This", "test", "sentence"),
            Lists.newArrayList(noStopWords.materialize()));
    }
}
These tests verify that the filtering logic works correctly. Note that the comparison is case-sensitive: "This" passes the filter even though "this" is a stop word.
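In the last test above, "This" survives the filter because the lookup compares words exactly as written. If we wanted capitalized stop words removed as well, one option is to lower-case each word before the lookup. The following plain-Java sketch shows that variation; the class name CaseInsensitiveFilterSketch and the three-word stop list are assumptions for this example, and it is not part of the tutorial’s StopWordFilter.

```java
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

// Plain-Java sketch of a case-insensitive stop-word check; it does not use Crunch.
class CaseInsensitiveFilterSketch {

    // Hypothetical, shortened stop-word list used only for illustration.
    static final Set<String> STOP_WORDS = Set.of("a", "is", "this");

    // Normalizes the word to lower case before checking the set.
    static boolean accept(String word) {
        return !STOP_WORDS.contains(word.toLowerCase(Locale.ROOT));
    }

    // Keeps only the words that pass the check, preserving their order.
    static List<String> filter(List<String> words) {
        return words.stream()
            .filter(CaseInsensitiveFilterSketch::accept)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(filter(List.of("This", "is", "a", "test", "sentence"))); // prints [test, sentence]
    }
}
```

Whether to normalize case depends on the application; the tutorial keeps the simpler, case-sensitive behavior.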
Finally, let’s use StopWordFilter to filter the list of words generated in the previous step. The filter method of the PCollection interface applies the given FilterFn to all the elements and returns a new PCollection.
Let’s call this method on the words collection and pass an instance of StopWordFilter:
PCollection<String> noStopWords = words.filter(new StopWordFilter());
As a result, we get the filtered collection of words.
6.3. Count Unique Words
After getting the filtered collection of words, we want to count how often each word occurs. The PCollection interface has a number of methods to perform common aggregations:
- min – returns the minimum element of the collection
- max – returns the maximum element of the collection
- length – returns the number of elements in the collection
- count – returns a PTable that contains the count of each unique element of the collection
Let’s use the count method to get the unique words along with their counts:
// The count method applies a series of Crunch primitives and returns
// a map of the unique words in the input PCollection to their counts.
PTable<String, Long> counts = noStopWords.count();
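For intuition about what these four aggregations compute, here is a plain-Java analogue over an in-memory list. The class AggregationSketch and its methods are hypothetical helpers written for this sketch; they mimic the results only and say nothing about how Crunch computes them on a cluster.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java analogue of min, max, length and count; it does not use Crunch.
class AggregationSketch {

    // Smallest element by natural ordering.
    static String min(List<String> words) {
        return Collections.min(words);
    }

    // Largest element by natural ordering.
    static String max(List<String> words) {
        return Collections.max(words);
    }

    // Total number of elements, duplicates included.
    static long length(List<String> words) {
        return words.size();
    }

    // Each distinct element mapped to its number of occurrences.
    static Map<String, Long> count(List<String> words) {
        return words.stream()
            .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> words = List.of("pear", "apple", "pear", "fig");
        System.out.println(min(words));    // prints apple
        System.out.println(max(words));    // prints pear
        System.out.println(length(words)); // prints 4
        System.out.println(count(words));  // prints {apple=1, fig=1, pear=2}
    }
}
```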
7. Specify Output
As a result of the previous steps, we have a table of words and their counts. We want to write this result to a text file. The Pipeline interface provides convenience methods to write output:
void write(PCollection<?> collection, Target target);
void write(PCollection<?> collection, Target target,
Target.WriteMode writeMode);
<T> void writeTextFile(PCollection<T> collection, String pathName);
Since we’re writing to a text file, let’s call the writeTextFile method:
pipeline.writeTextFile(counts, outputPath);
8. Manage Pipeline Execution
All the steps so far have just defined the data pipeline. No input has been read or processed yet, because Crunch uses a lazy execution model.
Crunch doesn’t run the MapReduce jobs until we invoke one of the Pipeline methods that control job planning and execution:
- run – prepares an execution plan to create the required outputs and then executes it synchronously
- done – runs any remaining jobs required to generate outputs and then cleans up any intermediate data files created
- runAsync – similar to run method, but executes in a non-blocking fashion
Therefore, let’s call the done method to execute the pipeline as MapReduce jobs:
PipelineResult result = pipeline.done();
The above statement runs the MapReduce jobs that read the input, process it and write the result to the output directory.
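Lazy execution can be illustrated with Java Streams, which behave in a loosely comparable way: defining intermediate steps does no work until a terminal operation runs. The sketch below is only an analogy and does not use Crunch; the class name LazySketch and its method are assumptions for this example.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java analogy for lazy execution; it does not use Crunch.
class LazySketch {

    // Returns how many times the mapping function ran before and after
    // the terminal operation, as { before, after }.
    static int[] callsBeforeAndAfter(List<String> lines) {
        AtomicInteger calls = new AtomicInteger();

        // Defining the step runs nothing yet, much like defining pipeline steps.
        Stream<String> upper = lines.stream().map(line -> {
            calls.incrementAndGet();
            return line.toUpperCase();
        });
        int before = calls.get();

        // The terminal operation triggers the work, loosely comparable to done().
        List<String> result = upper.collect(Collectors.toList());
        int after = calls.get();

        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] calls = callsBeforeAndAfter(List.of("a", "b"));
        System.out.println(calls[0]); // prints 0
        System.out.println(calls[1]); // prints 2
    }
}
```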
9. Putting the Pipeline Together
So far we have developed and unit tested the logic to read input data, process it and write to the output file.
Next, let’s put them together to build the entire data pipeline:
public int run(String[] args) throws Exception {
    String inputPath = args[0];
    String outputPath = args[1];

    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class, getConf());

    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(inputPath);

    // Define a function that splits each line in a PCollection of Strings into
    // a PCollection made up of the individual words in the file.
    // The second argument sets the serialization format.
    PCollection<String> words = lines.parallelDo(new Tokenizer(), Writables.strings());

    // Take the collection of words and remove known stop words.
    PCollection<String> noStopWords = words.filter(new StopWordFilter());

    // The count method applies a series of Crunch primitives and returns
    // a map of the unique words in the input PCollection to their counts.
    PTable<String, Long> counts = noStopWords.count();

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, outputPath);

    // Execute the pipeline as MapReduce jobs.
    PipelineResult result = pipeline.done();

    return result.succeeded() ? 0 : 1;
}
10. Hadoop Launch Configuration
The data pipeline is now ready.
However, we still need code to launch it. Let’s write the main method that launches the application:
public class WordCount extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new WordCount(), args);
    }

    // The run(String[] args) method from the previous section goes here.
}
ToolRunner.run parses the generic Hadoop options from the command line, applies them to the configuration, and then calls the run method of WordCount, which executes the pipeline.
11. Run Application
The complete application is now ready. Let’s run the following command to build it:
mvn package
As a result of the above command, we get the packaged application and a special job jar in the target directory.
Let’s use this job jar to execute the application on Hadoop:
hadoop jar target/crunch-1.0-SNAPSHOT-job.jar <input file path> <output directory>
The application reads the input file and writes the result to the output directory. The output contains the unique words along with their counts, similar to the following:
[Add,1]
[Added,1]
[Admiration,1]
[Admitting,1]
[Allowance,1]
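If we want to post-process this output in Java, each line can be parsed back into a word and a count. The following is a minimal sketch that assumes every line has the bracketed form shown above, with the count after the last comma; the class OutputLineParser is a hypothetical helper and not part of Crunch.

```java
import java.util.Map;

// Hypothetical helper for reading lines such as "[Add,1]"; not part of Crunch.
class OutputLineParser {

    // Parses one output line into a (word, count) entry.
    // Assumes the line is wrapped in brackets and the count follows the last comma.
    static Map.Entry<String, Long> parse(String line) {
        String body = line.trim();
        if (!body.startsWith("[") || !body.endsWith("]")) {
            throw new IllegalArgumentException("Unexpected line: " + line);
        }
        body = body.substring(1, body.length() - 1);

        int comma = body.lastIndexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("Missing count: " + line);
        }
        String word = body.substring(0, comma);
        long count = Long.parseLong(body.substring(comma + 1).trim());
        return Map.entry(word, count);
    }

    public static void main(String[] args) {
        System.out.println(parse("[Add,1]")); // prints Add=1
    }
}
```

Splitting on the last comma rather than the first keeps words that themselves contain a comma intact.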
In addition to running on Hadoop, we can run the application within an IDE, as a stand-alone application or as unit tests.
12. Conclusion
In this tutorial, we created a data processing application running on MapReduce. Apache Crunch makes it easy to write, test and execute MapReduce pipelines in Java.
As usual, the full source code can be found over on GitHub.