1. Overview

Apache Spark is a fast, distributed data processing system. It processes data in memory and combines in-memory caching with optimized query execution to achieve high performance. It provides high-level APIs for popular programming languages such as Scala, Python, Java, and R.

In this quick tutorial, we’ll go through three of Spark’s basic concepts: DataFrames, Datasets, and RDDs.

2. DataFrame

Spark SQL introduced a tabular data abstraction called the DataFrame in Spark 1.3. Since then, it has become one of the most important features in Spark. This API is useful when we want to handle structured and semi-structured distributed data.

In section 4, we’ll discuss Resilient Distributed Datasets (RDDs). DataFrames store data more efficiently than RDDs because they build on the immutable, in-memory, resilient, distributed, and parallel capabilities of RDDs while also applying a schema to the data. DataFrames also translate SQL code into optimized low-level RDD operations.

We can create DataFrames in three ways:

  • Converting existing RDDs
  • Running SQL queries
  • Loading external data

The Spark team introduced SparkSession in version 2.0. It unifies the different contexts, so developers no longer need to create each one separately:

SparkSession session = SparkSession.builder()
  .appName("TouristDataFrameExample")
  .master("local[*]")
  .getOrCreate();

DataFrameReader dataFrameReader = session.read();

We’ll be analyzing the Tourist.csv file:

Dataset<Row> data = dataFrameReader.option("header", "true")
  .csv("data/Tourist.csv");

Since Spark 2.0, a DataFrame is a Dataset of type Row, so we can treat DataFrame as an alias for Dataset<Row>.

We can select specific columns that we are interested in. We can also filter and group by a given column:

// needs: import static org.apache.spark.sql.functions.col;
data.select(col("country"), col("year"), col("value"))
  .show();

data.filter(col("country").equalTo("Mexico"))
  .show();

data.groupBy(col("country"))
  .count()
  .show();

3. Datasets

A Dataset is a strongly-typed collection of structured data. Datasets provide the familiar object-oriented programming style plus the benefits of type safety, since the compiler can check types and catch errors at compile time.

Dataset is an extension of DataFrame, so we can consider a DataFrame an untyped view of a Dataset.

The Spark team released the Dataset API in Spark 1.6 and as they mentioned: “the goal of Spark Datasets is to provide an API that allows users to easily express transformations on object domains, while also providing the performance and robustness advantages of the Spark SQL execution engine”.

First, we’ll need to create the TouristData class:

public class TouristData {
    private String region;
    private String country;
    private String year;
    private String series;
    private Double value;
    private String footnotes;
    private String source;
    // ... getters and setters
}

To map each of our records to the specified type we will need to use an Encoder. Encoders translate between Java objects and Spark’s internal binary format:

// SparkSession initialization and data load
Dataset<Row> responseWithSelectedColumns = data.select(col("region"), 
  col("country"), col("year"), col("series"), col("value").cast("double"), 
  col("footnotes"), col("source"));

Dataset<TouristData> typedDataset = responseWithSelectedColumns
  .as(Encoders.bean(TouristData.class));
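
To make the mapping step more concrete, here is a plain-Java sketch (no Spark involved) of the general idea of populating a bean from named columns by following JavaBean naming conventions. The BeanMappingSketch class, its fromRow helper, and the reduced Tourist bean are hypothetical names introduced only for illustration. This is not how Spark’s encoders work internally, since they convert objects to and from a binary format rather than a map:

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

class BeanMappingSketch {

    // Hypothetical, reduced bean: no-arg constructor plus getters and setters.
    public static class Tourist {
        private String country;
        private Double value;

        public String getCountry() { return country; }
        public void setCountry(String country) { this.country = country; }
        public Double getValue() { return value; }
        public void setValue(Double value) { this.value = value; }
    }

    // Copies each column into the setter with the matching name,
    // for example "country" -> setCountry(...). Unknown columns are ignored.
    static <T> T fromRow(Map<String, Object> row, Class<T> type) {
        try {
            T bean = type.getDeclaredConstructor().newInstance();
            for (Map.Entry<String, Object> column : row.entrySet()) {
                String name = column.getKey();
                String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
                for (Method method : type.getMethods()) {
                    if (method.getName().equals(setter) && method.getParameterCount() == 1) {
                        method.invoke(bean, column.getValue());
                    }
                }
            }
            return bean;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot map row to " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("country", "Norway");
        row.put("value", 12.5);

        Tourist tourist = fromRow(row, Tourist.class);
        System.out.println(tourist.getCountry() + " " + tourist.getValue());
    }
}
```

The sketch also hints at why the select above casts value to double: the column type has to line up with the type of the corresponding bean property.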

As with DataFrame, we can filter and group by specific columns:

// The type argument lets the compiler see record as a TouristData
typedDataset.filter((FilterFunction<TouristData>) record -> "Norway"
  .equals(record.getCountry()))
  .show();

typedDataset.groupBy(typedDataset.col("country"))
  .count()
  .show();

We can also filter on a range of values in a column or sum a specific column to get its total:

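// Keep records whose year is strictly between 2010 and 2017
typedDataset.filter((FilterFunction<TouristData>) record -> record.getYear() != null 
  && (Long.valueOf(record.getYear()) > 2010 
  && Long.valueOf(record.getYear()) < 2017)).show();

// needs: import static org.apache.spark.sql.functions.sum;
typedDataset.filter((FilterFunction<TouristData>) record -> record.getValue() != null 
  && record.getSeries() != null
  && record.getSeries().contains("expenditure"))
  .groupBy("country")
  .agg(sum("value"))
  .show();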
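
One caveat with the range filter: Long.valueOf throws a NumberFormatException if a year value is not numeric. A minimal sketch of a more defensive predicate, written in plain Java so it can be tried without Spark, could look like the following. YearRangeSketch, parseYear, and isBetween are hypothetical helper names, not part of any Spark API:

```java
import java.util.OptionalLong;

class YearRangeSketch {

    // Returns the parsed year, or an empty value for null or non-numeric input.
    static OptionalLong parseYear(String raw) {
        if (raw == null) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(raw.trim()));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    // True only when the year parses and lies strictly between the two bounds.
    static boolean isBetween(String rawYear, long lowerExclusive, long upperExclusive) {
        OptionalLong year = parseYear(rawYear);
        return year.isPresent()
          && year.getAsLong() > lowerExclusive
          && year.getAsLong() < upperExclusive;
    }

    public static void main(String[] args) {
        System.out.println(isBetween("2015", 2010, 2017));
        System.out.println(isBetween("n/a", 2010, 2017));
    }
}
```

Inside the filter lambda, such a helper could stand in for the two Long.valueOf comparisons, for example isBetween(record.getYear(), 2010, 2017).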

4. RDDs

The Resilient Distributed Dataset or RDD is Spark’s primary programming abstraction. It represents a collection of elements that is immutable, resilient, and distributed.

An RDD encapsulates a large dataset. Spark automatically distributes the data contained in RDDs across our cluster and parallelizes the operations we perform on them.

We can create RDDs only through operations on data in stable storage or operations on other RDDs.

Fault tolerance is essential when we deal with large sets of data distributed across cluster machines. RDDs are resilient because of Spark’s built-in fault recovery mechanism. Spark relies on the fact that RDDs remember how they were created, so it can trace back the lineage to rebuild a lost partition.
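
The lineage idea can be illustrated with a toy model in plain Java. This is only a sketch under simplifying assumptions (a single in-memory list instead of distributed partitions), and LineageSketch, Node, and loseData are hypothetical names, not Spark classes. Each node keeps the recipe that produced it, so lost data can be recomputed from its parent:

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class LineageSketch {

    // A toy dataset that stores how to build its contents, not only the contents.
    static final class Node<T> {
        private final Supplier<List<T>> recipe; // the "lineage"
        private List<T> cached;                 // materialized data, may be lost

        Node(Supplier<List<T>> recipe) {
            this.recipe = recipe;
        }

        // Returns a new node; the current node is never modified.
        <R> Node<R> map(Function<T, R> fn) {
            return new Node<>(() -> compute().stream().map(fn).collect(Collectors.toList()));
        }

        // Rebuilds the data from the recipe whenever it is missing.
        List<T> compute() {
            if (cached == null) {
                cached = recipe.get();
            }
            return cached;
        }

        // Simulates losing the materialized data, for example after a node failure.
        void loseData() {
            cached = null;
        }
    }

    public static void main(String[] args) {
        Node<String> source = new Node<>(() -> List.of("mexico", "norway"));
        Node<String> upper = source.map(name -> name.toUpperCase());

        System.out.println(upper.compute());
        upper.loseData();
        System.out.println(upper.compute()); // recomputed from the recipe
    }
}
```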

There are two types of operations we can do on RDDs: Transformations and Actions.

4.1. Transformations

We can apply Transformations to an RDD to manipulate its data. After this manipulation is performed, we’ll get a brand-new RDD, since RDDs are immutable objects.

We’ll check how to implement Map and Filter, two of the most common transformations.

First, we need to create a JavaSparkContext and load the data as an RDD from the Tourist.csv file:

SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
  .setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");

Next, let’s apply the map function to get the name of the country from each record and convert the name to uppercase. We can save this newly generated dataset as text on disk (saveAsTextFile writes a directory of part files at the given path):

JavaRDD<String> upperCaseCountries = tourists.map(line -> {
    String[] columns = line.split(COMMA_DELIMITER);
    return columns[1].toUpperCase();
}).distinct();

upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

If we want to select only a specific country, we can apply the filter function on our original tourists RDD:

JavaRDD<String> touristsInMexico = tourists
  .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));

touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");
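
The snippets above use a COMMA_DELIMITER constant without showing its definition. One plausible definition, assumed here purely for illustration, is a regular expression that splits on commas outside double-quoted fields, so that a value such as "Korea, Republic of" stays in one column. The CsvSplitSketch class, the columns helper, and the sample line are hypothetical:

```java
import java.util.Arrays;

class CsvSplitSketch {

    // Assumed definition: a comma followed by an even number of double quotes,
    // that is, a comma that sits outside any quoted field.
    static final String COMMA_DELIMITER = ",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)";

    static String[] columns(String line) {
        // The -1 limit keeps trailing empty columns instead of dropping them.
        return line.split(COMMA_DELIMITER, -1);
    }

    public static void main(String[] args) {
        // Made-up sample line, not taken from Tourist.csv
        String line = "410,\"Korea, Republic of\",2010,Tourism expenditure,33.5,,";
        System.out.println(Arrays.toString(columns(line)));
    }
}
```

Note that the surrounding quotes remain part of the column value, so a comparison such as equals("Mexico") only matches unquoted fields. For anything beyond simple files, a dedicated CSV parser or the DataFrameReader shown in section 2 is usually the safer choice.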

4.2. Actions

Actions return a final value or save the results to disk after doing some computation on the data.

Two frequently used actions in Spark are count and reduce.

Let’s count the distinct countries in our CSV file:

// Spark Context initialization and data load
String header = tourists.first();
JavaRDD<String> countries = tourists
  .filter(line -> !line.equals(header)) // skip the header row so it isn't counted as a country
  .map(line -> line.split(COMMA_DELIMITER)[1])
  .distinct();

long numberOfCountries = countries.count();

Now, we’ll calculate the total expenditure by country. We’ll need to filter the records containing expenditure in their description.

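Instead of using a JavaRDD, we’ll use a JavaPairRDD. A pair RDD is a type of RDD that stores key-value pairs. Here, reduceByKey is a transformation that merges the values for each key, and collect is the action that returns the result to the driver: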

JavaRDD<String> touristsExpenditure = tourists
  .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));

JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure
  .mapToPair(line -> {
      String[] columns = line.split(COMMA_DELIMITER);
      return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
});

List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd
  .reduceByKey((x, y) -> x + y)
  .collect();
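
To see what reduceByKey computes, it can help to look at a local, single-machine equivalent. The following plain-Java sketch mimics only the semantics (merging all values that share a key with a binary function), not Spark’s distributed execution. ReduceByKeySketch and its sample figures are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class ReduceByKeySketch {

    // Folds all values that share a key into one value using the given reducer.
    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> pairs, BinaryOperator<V> reducer) {
        Map<K, V> totals = new LinkedHashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            // merge stores the value for a new key, or combines it with the existing one
            totals.merge(pair.getKey(), pair.getValue(), reducer);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Made-up figures, not taken from Tourist.csv
        List<Map.Entry<String, Double>> pairs = List.of(
          Map.entry("Mexico", 10.0),
          Map.entry("Norway", 5.0),
          Map.entry("Mexico", 2.5));

        System.out.println(reduceByKey(pairs, (x, y) -> x + y));
    }
}
```

As in the Spark version, the reducer should be associative and commutative, because in a distributed setting the order in which values for a key are combined is not fixed.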

5. Conclusion

To sum up, we should use DataFrames or Datasets when we need domain-specific APIs, high-level expressions such as aggregation, sum, or SQL queries, or type safety at compile time.

On the other hand, we should use RDDs when data is unstructured and we don’t need to implement a specific schema or when we need low-level transformations and actions.

As always, all of the code samples are available over on GitHub.