1. 概述
本教程将带你入门 Apache Storm,一个分布式实时计算系统。
我们将重点讲解以下内容:
- Apache Storm 是什么,以及它能解决什么问题
- 它的架构设计
- 如何在项目中使用它
2. Apache Storm 是什么?
Apache Storm 是一个免费开源的分布式实时计算系统。
✅ 它具备容错性、可扩展性,并且能保证数据处理的可靠性,尤其擅长处理无界的数据流。
一些典型的应用场景包括:
- 信用卡交易的欺诈检测
- 智能家居中传感器异常数据的实时处理
Storm 支持与多种数据库和消息队列系统集成,如 Kafka、Kestrel、ActiveMQ 等。
3. Maven 依赖配置
使用 Apache Storm 前,我们需要在项目中添加 storm-core 依赖:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<scope>provided</scope>
</dependency>
⚠️ 如果你打算将应用部署到 Storm 集群上运行,必须使用 provided
作用域。
如果想在本地运行,可以使用所谓的“本地模式”(Local Mode)来模拟 Storm 集群环境。此时,你应该移除 <scope>provided</scope>
。
4. 数据模型
Storm 的数据模型由两个核心元素构成:Tuple 和 Stream。
4.1. Tuple
Tuple 是一个有序的、带命名字段的列表,字段类型是动态的。也就是说,你不需要显式声明字段的类型。
Storm 需要知道如何序列化 Tuple 中的值。默认情况下,Storm 已经支持基本类型、String
和 byte[]
。
由于 Storm 使用 Kryo 进行序列化,如果使用自定义类型,需要通过 Config
注册对应的序列化器。有以下两种方式:
- 仅注册类名,使用默认序列化器:
Config config = new Config();
config.registerSerialization(User.class);
在这种情况下,Kryo 会使用 FieldSerializer
,默认序列化该类所有非 transient
字段(包括私有字段)。
- 指定类和自定义序列化器:
Config config = new Config();
config.registerSerialization(User.class, UserSerializer.class);
要实现自定义序列化器,需要继承泛型类 Serializer
,并实现 write
和 read
方法。
4.2. Stream
Stream 是 Storm 中的核心抽象,表示一个无界的 Tuple 序列。
Storm 支持并行处理多个 Stream。每个 Stream 在声明时都会被赋予一个唯一的 ID。
5. Topology(拓扑)
Storm 应用的逻辑被打包成一个 Topology(拓扑)。拓扑由 Spout 和 Bolt 组成。
5.1. Spout
Spout 是数据流的源头,负责将 Tuple 发送到拓扑中。
Tuple 可以从 Kafka、Kestrel 或 ActiveMQ 等外部系统读取。
Spout 分为两种类型:
- 可靠的(Reliable):当 Tuple 处理失败时,Spout 可以重新发送
- 不可靠的(Unreliable):使用“发完即忘”机制,不关心处理结果
要创建自定义 Spout,可以实现 IRichSpout
接口,或者继承 BaseRichSpout
类。
下面是一个简单的不可靠 Spout 示例:
public class RandomIntSpout extends BaseRichSpout {
private Random random;
private SpoutOutputCollector outputCollector;
@Override
public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
random = new Random();
outputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
}
}
这个 RandomIntSpout
每秒生成一个随机整数和时间戳。
5.2. Bolt
Bolt 用于处理 Tuple 流,可以执行过滤、聚合、函数计算等操作。
如果操作较复杂,可以拆分成多个 Bolt 协作完成。
要创建自定义 Bolt,可以实现 IRichBolt
或 IBasicBolt
接口。也可以使用辅助类,例如 BaseBasicBolt
。
下面是一个打印所有 Tuple 的 Bolt:
public class PrintingBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
System.out.println(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
6. 构建一个简单的拓扑
我们将把前面的内容整合成一个完整的拓扑结构,包含一个 Spout 和三个 Bolt。
6.1. RandomNumberSpout
首先,我们创建一个不可靠的 Spout,每秒生成一个 0 到 100 之间的随机整数:
public class RandomNumberSpout extends BaseRichSpout {
private Random random;
private SpoutOutputCollector collector;
@Override
public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
random = new Random();
collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
Utils.sleep(1000);
int operation = random.nextInt(101);
long timestamp = System.currentTimeMillis();
Values values = new Values(operation, timestamp);
collector.emit(values);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
}
}
6.2. FilteringBolt
接下来创建一个 Bolt,用于过滤掉 operation 为 0 的数据:
public class FilteringBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
int operation = tuple.getIntegerByField("operation");
if (operation > 0) {
basicOutputCollector.emit(tuple.getValues());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
}
}
6.3. AggregatingBolt
这个 Bolt 更复杂一些,它会基于时间窗口对数据进行聚合。
窗口(Window)是流处理中的关键概念,将无限流划分为有限块进行处理。常见窗口类型有:
- 时间窗口:按时间戳分组
- 计数窗口:按固定数量分组
我们的 AggregatingBolt
使用时间窗口,计算每个窗口内所有 operation 的总和,并记录窗口的起止时间:
public class AggregatingBolt extends BaseWindowedBolt {
private OutputCollector outputCollector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.outputCollector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
}
@Override
public void execute(TupleWindow tupleWindow) {
List<Tuple> tuples = tupleWindow.get();
tuples.sort(Comparator.comparing(this::getTimestamp));
int sumOfOperations = tuples.stream()
.mapToInt(tuple -> tuple.getIntegerByField("operation"))
.sum();
Long beginningTimestamp = getTimestamp(tuples.get(0));
Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));
Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
outputCollector.emit(values);
}
private Long getTimestamp(Tuple tuple) {
return tuple.getLongByField("timestamp");
}
}
⚠️ 注意:由于我们按时间戳分组,每个窗口至少会有一个元素,因此可以直接取第一个元素。
6.4. FileWritingBolt
最后一个 Bolt 会将 sumOfOperations
大于 2000 的结果写入文件:
public class FileWritingBolt extends BaseRichBolt {
public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
private BufferedWriter writer;
private String filePath;
private ObjectMapper objectMapper;
@Override
public void cleanup() {
try {
writer.close();
} catch (IOException e) {
logger.error("Failed to close writer!");
}
}
@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector outputCollector) {
objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
try {
writer = new BufferedWriter(new FileWriter(filePath));
} catch (IOException e) {
logger.error("Failed to open a file for writing.", e);
}
}
@Override
public void execute(Tuple tuple) {
int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
long endTimestamp = tuple.getLongByField("endTimestamp");
if (sumOfOperations > 2000) {
AggregatedWindow aggregatedWindow = new AggregatedWindow(
sumOfOperations, beginningTimestamp, endTimestamp);
try {
writer.write(objectMapper.writeValueAsString(aggregatedWindow));
writer.newLine();
writer.flush();
} catch (IOException e) {
logger.error("Failed to write data to file.", e);
}
}
}
// public constructor and other methods
}
✅ 注意:这是拓扑中的最后一个 Bolt,因此不需要声明输出字段。
6.5. 运行拓扑
最后,我们将所有组件组装起来并运行拓扑:
public static void runTopology() {
TopologyBuilder builder = new TopologyBuilder();
Spout random = new RandomNumberSpout();
builder.setSpout("randomNumberSpout");
Bolt filtering = new FilteringBolt();
builder.setBolt("filteringBolt", filtering)
.shuffleGrouping("randomNumberSpout");
Bolt aggregating = new AggregatingBolt()
.withTimestampField("timestamp")
.withLag(BaseWindowedBolt.Duration.seconds(1))
.withWindow(BaseWindowedBolt.Duration.seconds(5));
builder.setBolt("aggregatingBolt", aggregating)
.shuffleGrouping("filteringBolt");
String filePath = "./src/main/resources/data.txt";
Bolt file = new FileWritingBolt(filePath);
builder.setBolt("fileBolt", file)
.shuffleGrouping("aggregatingBolt");
Config config = new Config();
config.setDebug(false);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Test", config, builder.createTopology());
}
在拓扑中,我们需要通过 shuffleGrouping
来指定数据流向。例如:
builder.setBolt("filteringBolt", filtering)
.shuffleGrouping("randomNumberSpout");
表示 filteringBolt
的数据来自 randomNumberSpout
。
✅ 每个 Bolt 都需要指定其数据来源,可以是 Spout 或另一个 Bolt。如果多个 Bolt 共用一个来源,该来源会将数据广播给它们。
在这个例子中,我们使用 LocalCluster
在本地运行拓扑。
7. 总结
本文介绍了 Apache Storm 的基本概念,包括其架构、数据模型、Spout 和 Bolt 的使用方式,并通过一个完整的拓扑实例展示了如何构建实时处理应用。
如需查看完整代码,可以访问 GitHub 项目地址。