1. 概述

本教程将带你入门 Apache Storm一个分布式实时计算系统。

我们将重点讲解以下内容:

  • Apache Storm 是什么,以及它能解决什么问题
  • 它的架构设计
  • 如何在项目中使用它

2. Apache Storm 是什么?

Apache Storm 是一个免费开源的分布式实时计算系统。

✅ 它具备容错性、可扩展性,并且能保证数据处理的可靠性,尤其擅长处理无界的数据流。

一些典型的应用场景包括:

  • 信用卡交易的欺诈检测
  • 智能家居中传感器异常数据的实时处理

Storm 支持与多种数据库和消息队列系统集成,如 Kafka、Kestrel、ActiveMQ 等。

3. Maven 依赖配置

使用 Apache Storm 前,我们需要在项目中添加 storm-core 依赖

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

⚠️ 如果你打算将应用部署到 Storm 集群上运行,必须使用 provided 作用域。

如果想在本地运行,可以使用所谓的“本地模式”(Local Mode)来模拟 Storm 集群环境。此时,你应该移除 <scope>provided</scope>

4. 数据模型

Storm 的数据模型由两个核心元素构成:TupleStream

4.1. Tuple

Tuple 是一个有序的、带命名字段的列表,字段类型是动态的。也就是说,你不需要显式声明字段的类型。

Storm 需要知道如何序列化 Tuple 中的值。默认情况下,Storm 已经支持基本类型、Stringbyte[]

由于 Storm 使用 Kryo 进行序列化,如果使用自定义类型,需要通过 Config 注册对应的序列化器。有以下两种方式:

  1. 仅注册类名,使用默认序列化器:
Config config = new Config();
config.registerSerialization(User.class);

在这种情况下,Kryo 会使用 FieldSerializer,默认序列化该类所有非 transient 字段(包括私有字段)。

  1. 指定类和自定义序列化器:
Config config = new Config();
config.registerSerialization(User.class, UserSerializer.class);

要实现自定义序列化器,需要继承泛型类 Serializer,并实现 writeread 方法。

4.2. Stream

Stream 是 Storm 中的核心抽象,表示一个无界的 Tuple 序列。

Storm 支持并行处理多个 Stream。每个 Stream 在声明时都会被赋予一个唯一的 ID。

5. Topology(拓扑)

Storm 应用的逻辑被打包成一个 Topology(拓扑)。拓扑由 SpoutBolt 组成。

5.1. Spout

Spout 是数据流的源头,负责将 Tuple 发送到拓扑中。

Tuple 可以从 Kafka、Kestrel 或 ActiveMQ 等外部系统读取。

Spout 分为两种类型:

  • 可靠的(Reliable):当 Tuple 处理失败时,Spout 可以重新发送
  • 不可靠的(Unreliable):使用“发完即忘”机制,不关心处理结果

要创建自定义 Spout,可以实现 IRichSpout 接口,或者继承 BaseRichSpout 类。

下面是一个简单的不可靠 Spout 示例:

public class RandomIntSpout extends BaseRichSpout {

    private Random random;
    private SpoutOutputCollector outputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        outputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }
}

这个 RandomIntSpout 每秒生成一个随机整数和时间戳。

5.2. Bolt

Bolt 用于处理 Tuple 流,可以执行过滤、聚合、函数计算等操作。

如果操作较复杂,可以拆分成多个 Bolt 协作完成。

要创建自定义 Bolt,可以实现 IRichBoltIBasicBolt 接口。也可以使用辅助类,例如 BaseBasicBolt

下面是一个打印所有 Tuple 的 Bolt:

public class PrintingBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        System.out.println(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

6. 构建一个简单的拓扑

我们将把前面的内容整合成一个完整的拓扑结构,包含一个 Spout 和三个 Bolt。

6.1. RandomNumberSpout

首先,我们创建一个不可靠的 Spout,每秒生成一个 0 到 100 之间的随机整数:

public class RandomNumberSpout extends BaseRichSpout {
    private Random random;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, 
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        int operation = random.nextInt(101);
        long timestamp = System.currentTimeMillis();

        Values values = new Values(operation, timestamp);
        collector.emit(values);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.2. FilteringBolt

接下来创建一个 Bolt,用于过滤掉 operation 为 0 的数据:

public class FilteringBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        int operation = tuple.getIntegerByField("operation");
        if (operation > 0) {
            basicOutputCollector.emit(tuple.getValues());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.3. AggregatingBolt

这个 Bolt 更复杂一些,它会基于时间窗口对数据进行聚合。

窗口(Window)是流处理中的关键概念,将无限流划分为有限块进行处理。常见窗口类型有:

  • 时间窗口:按时间戳分组
  • 计数窗口:按固定数量分组

我们的 AggregatingBolt 使用时间窗口,计算每个窗口内所有 operation 的总和,并记录窗口的起止时间:

public class AggregatingBolt extends BaseWindowedBolt {
    private OutputCollector outputCollector;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
    }

    @Override
    public void execute(TupleWindow tupleWindow) {
        List<Tuple> tuples = tupleWindow.get();
        tuples.sort(Comparator.comparing(this::getTimestamp));

        int sumOfOperations = tuples.stream()
          .mapToInt(tuple -> tuple.getIntegerByField("operation"))
          .sum();
        Long beginningTimestamp = getTimestamp(tuples.get(0));
        Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));

        Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
        outputCollector.emit(values);
    }

    private Long getTimestamp(Tuple tuple) {
        return tuple.getLongByField("timestamp");
    }
}

⚠️ 注意:由于我们按时间戳分组,每个窗口至少会有一个元素,因此可以直接取第一个元素。

6.4. FileWritingBolt

最后一个 Bolt 会将 sumOfOperations 大于 2000 的结果写入文件:

public class FileWritingBolt extends BaseRichBolt {
    public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
    private BufferedWriter writer;
    private String filePath;
    private ObjectMapper objectMapper;

    @Override
    public void cleanup() {
        try {
            writer.close();
        } catch (IOException e) {
            logger.error("Failed to close writer!");
        }
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, 
      OutputCollector outputCollector) {
        objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        
        try {
            writer = new BufferedWriter(new FileWriter(filePath));
        } catch (IOException e) {
            logger.error("Failed to open a file for writing.", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
        long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
        long endTimestamp = tuple.getLongByField("endTimestamp");

        if (sumOfOperations > 2000) {
            AggregatedWindow aggregatedWindow = new AggregatedWindow(
                sumOfOperations, beginningTimestamp, endTimestamp);
            try {
                writer.write(objectMapper.writeValueAsString(aggregatedWindow));
                writer.newLine();
                writer.flush();
            } catch (IOException e) {
                logger.error("Failed to write data to file.", e);
            }
        }
    }
    
    // public constructor and other methods
}

✅ 注意:这是拓扑中的最后一个 Bolt,因此不需要声明输出字段。

6.5. 运行拓扑

最后,我们将所有组件组装起来并运行拓扑:

public static void runTopology() {
    TopologyBuilder builder = new TopologyBuilder();

    Spout random = new RandomNumberSpout();
    builder.setSpout("randomNumberSpout");

    Bolt filtering = new FilteringBolt();
    builder.setBolt("filteringBolt", filtering)
      .shuffleGrouping("randomNumberSpout");

    Bolt aggregating = new AggregatingBolt()
      .withTimestampField("timestamp")
      .withLag(BaseWindowedBolt.Duration.seconds(1))
      .withWindow(BaseWindowedBolt.Duration.seconds(5));
    builder.setBolt("aggregatingBolt", aggregating)
      .shuffleGrouping("filteringBolt"); 
      
    String filePath = "./src/main/resources/data.txt";
    Bolt file = new FileWritingBolt(filePath);
    builder.setBolt("fileBolt", file)
      .shuffleGrouping("aggregatingBolt");

    Config config = new Config();
    config.setDebug(false);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Test", config, builder.createTopology());
}

在拓扑中,我们需要通过 shuffleGrouping 来指定数据流向。例如:

builder.setBolt("filteringBolt", filtering)
  .shuffleGrouping("randomNumberSpout");

表示 filteringBolt 的数据来自 randomNumberSpout

✅ 每个 Bolt 都需要指定其数据来源,可以是 Spout 或另一个 Bolt。如果多个 Bolt 共用一个来源,该来源会将数据广播给它们。

在这个例子中,我们使用 LocalCluster 在本地运行拓扑。

7. 总结

本文介绍了 Apache Storm 的基本概念,包括其架构、数据模型、Spout 和 Bolt 的使用方式,并通过一个完整的拓扑实例展示了如何构建实时处理应用。

如需查看完整代码,可以访问 GitHub 项目地址


原始标题:Intro to Apache Storm