1. 概述
Apache Spark 是一个快速、分布式的通用数据处理系统,它支持内存计算,通过内存缓存和优化执行机制,实现极高的性能。Spark 提供了面向多种主流编程语言(如 Scala、Python、Java 和 R)的高级 API。
在本篇文章中,我们将深入探讨 Spark 中三个核心概念:DataFrame、Dataset 和 RDD。
2. DataFrame
从 Spark 1.3 版本开始,Spark SQL 引入了一种名为 DataFrame 的结构化数据抽象。自那以后,它就成为了 Spark 中最重要的特性之一。当我们需要处理结构化或半结构化的分布式数据时,DataFrame 是一个非常合适的选择。
在第 3 节中我们会讨论 RDD(Resilient Distributed Dataset)。DataFrame 在存储效率上优于 RDD,因为它不仅继承了 RDD 的不可变性、内存计算、容错性和分布式并行处理能力,还为数据添加了 schema(结构),使得 Spark 可以更好地优化执行计划。
此外,DataFrame 会将 SQL 查询翻译为优化后的低级 RDD 操作。
我们可以用以下三种方式创建 DataFrame:
- 将已有的 RDD 转换而来
- 通过 SQL 查询生成
- 从外部数据源加载
在 Spark 2.0 中,官方引入了 SparkSession,统一了之前多个上下文的使用方式,开发者不再需要手动管理不同的上下文对象:
SparkSession session = SparkSession.builder()
.appName("TouristDataFrameExample")
.master("local[*]")
.getOrCreate();
DataFrameReader dataFrameReader = session.read();
我们接下来将分析 Tourist.csv
文件中的数据:
Dataset<Row> data = dataFrameReader.option("header", "true")
.csv("data/Tourist.csv");
⚠️ 注意:从 Spark 2.0 开始,DataFrame 实际上是类型为 Row
的 Dataset,即 Dataset<Row>
的别名。
我们可以选择特定列进行展示、过滤或分组操作:
data.select(col("country"), col("year"), col("value"))
.show();
data.filter(col("country").equalTo("Mexico"))
.show();
data.groupBy(col("country"))
.count()
.show();
3. Dataset
Dataset 是一组强类型、结构化的数据集合,它结合了面向对象编程风格和类型安全的优势,可以在编译期进行语法检查和错误捕获。
Dataset 可以看作是 DataFrame 的扩展。从这个角度来说,DataFrame 可以被视作一个无类型的 Dataset。
Dataset API 是在 Spark 1.6 中引入的,正如 Spark 团队所说:
“Spark Dataset 的目标是提供一个既能方便表达对象域上的转换操作,又能充分利用 Spark SQL 执行引擎性能和稳定性的 API。”
首先,我们需要定义一个 Java 类 TouristData
来映射数据:
public class TouristData {
private String region;
private String country;
private String year;
private String series;
private Double value;
private String footnotes;
private String source;
// ... getters and setters
}
为了将记录映射到指定类型,我们需要使用 Encoder。Encoder 负责在 Java 对象和 Spark 内部的二进制格式之间进行转换:
// SparkSession 初始化与数据加载
Dataset<Row> responseWithSelectedColumns = data.select(
col("region"),
col("country"),
col("year"),
col("series"),
col("value").cast("double"),
col("footnotes"),
col("source")
);
Dataset<TouristData> typedDataset = responseWithSelectedColumns
.as(Encoders.bean(TouristData.class));
像 DataFrame 一样,我们也可以对 Dataset 进行过滤和分组操作:
typedDataset.filter((FilterFunction) record -> record.getCountry()
.equals("Norway"))
.show();
typedDataset.groupBy(typedDataset.col("country"))
.count()
.show();
我们还可以实现更复杂的操作,例如筛选某个年份范围内的数据,或者计算某列的总和:
typedDataset.filter((FilterFunction) record -> record.getYear() != null
&& (Long.valueOf(record.getYear()) > 2010
&& Long.valueOf(record.getYear()) < 2017)).show();
typedDataset.filter((FilterFunction) record -> record.getValue() != null
&& record.getSeries()
.contains("expenditure"))
.groupBy("country")
.agg(sum("value"))
.show();
4. RDD
RDD(Resilient Distributed Dataset)是 Spark 最核心的编程抽象。它表示一个不可变、容错、分布式的元素集合。
RDD 封装了大量数据,Spark 会自动将 RDD 中的数据分布到集群节点上,并并行执行操作。
我们只能通过以下两种方式创建 RDD:
- 从稳定存储中读取数据
- 从其他 RDD 上执行转换操作生成
在处理大规模分布式数据时,容错性至关重要。RDD 的容错能力来自于 Spark 内置的恢复机制。Spark 会记录 RDD 的血缘关系(lineage),以便在发生故障时能快速重建丢失的分区。
对 RDD 的操作主要分为两类:转换(Transformations)和动作(Actions)
4.1. 转换(Transformations)
转换操作用于对 RDD 中的数据进行处理,每次转换操作都会生成一个新的 RDD,因为 RDD 是不可变的。
我们以 Map 和 Filter 为例,这是最常用的两个转换操作。
首先创建一个 JavaSparkContext
并从 Tourist.csv
文件中加载数据为 RDD:
SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");
然后我们使用 map
操作提取每条记录中的国家名,并将其转换为大写,再保存到磁盘:
JavaRDD<String> upperCaseCountries = tourists.map(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return columns[1].toUpperCase();
}).distinct();
upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");
如果我们只想筛选出特定国家的数据,可以使用 filter
:
JavaRDD<String> touristsInMexico = tourists
.filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));
touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");
4.2. 动作(Actions)
动作操作用于触发实际的计算,并返回结果或将结果写入存储系统。
在 Spark 中,最常用的动作包括 count
和 reduce
。
我们先统计 CSV 文件中不同国家的数量:
// Spark Context 初始化与数据加载
JavaRDD<String> countries = tourists.map(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return columns[1];
}).distinct();
Long numberOfCountries = countries.count();
接下来,我们计算各国旅游支出总和。为此,我们需要筛选出描述中包含 “expenditure” 的记录。
这里我们使用 JavaPairRDD
来处理键值对数据。JavaPairRDD 是一种可以存储键值对的特殊 RDD 类型:
JavaRDD<String> touristsExpenditure = tourists
.filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));
JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure
.mapToPair(line -> {
String[] columns = line.split(COMMA_DELIMITER);
return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
});
List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd
.reduceByKey((x, y) -> x + y)
.collect();
5. 总结
✅ 简单总结一下:
- 当我们需要结构化数据处理、高级聚合操作或 SQL 查询能力时,推荐使用 DataFrame 或 Dataset。
- 如果你需要编译时类型安全,Dataset 是更好的选择。
- 如果数据是非结构化的,或者你需要进行低级别的转换和控制,RDD 仍然是不可替代的。
一如既往,本文所有示例代码都可以在 GitHub 上找到。