1. 简介

Spring Cloud Data Flow 是一个云原生编程模型,专为可组合的数据微服务设计。

通过 Spring Cloud Data Flow,开发者可以轻松构建和编排数据管道,覆盖数据摄入、实时分析、数据导入导出等常见场景。

这些数据管道分为两种类型:

  • 流处理管道:处理无界数据流(通过消息中间件)
  • 批处理管道:处理有限数据集(短期任务)

本文重点介绍流处理管道的实现。

2. 架构概览

核心组件包括:

  • 应用程序(Applications)
  • Data Flow 服务器
  • Skipper 服务器
  • 目标运行时

通常还包含:

  • Data Flow Shell(命令行工具)
  • 消息代理(如 RabbitMQ)

2.1. 应用程序类型

流处理管道包含三个标准组件:

  1. Source(数据源)
    • 从外部系统消费事件
    • ✅ 示例:IoT 传感器数据、24/7 事件流
  2. Processor(处理器)
    • 接收 Source 数据
    • 执行业务逻辑(如转换/过滤)
    • 发送处理结果到下一环节
  3. Sink(数据汇)
    • 接收 Source 或 Processor 的数据
    • 写入持久化层(数据库/文件系统等)

⚠️ 这些应用可打包为 Spring Boot 应用或 Docker 镜像

3. 安装消息代理

管道中的应用需要消息中间件通信,本文选用 RabbitMQ

安装步骤:

# Ubuntu 示例
sudo apt-get install rabbitmq-server
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server

详细安装指南参考 RabbitMQ 官网

4. 本地 Data Flow 和 Skipper 服务器

下载服务器组件:

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-server/2.11.2/spring-cloud-dataflow-server-2.11.2.jar 
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-skipper-server/2.11.2/spring-cloud-skipper-server-2.11.2.jar

启动顺序:

  1. Skipper 服务器(端口 7577):
    java -jar spring-cloud-skipper-server-2.11.2.jar
    
  2. Data Flow 服务器(端口 9393):
    java -jar spring-cloud-dataflow-server-2.11.2.jar
    

⚠️ 需确保 RabbitMQ 已启动,否则会报连接错误

5. Data Flow Shell

下载并启动 Shell:

wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.11.2/spring-cloud-dataflow-shell-2.11.2.jar
java -jar spring-cloud-dataflow-shell-2.11.2.jar

启动后输入 help 查看所有可用命令:

dataflow:>help

6. Source 应用开发

创建 Spring Boot 项目,添加依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

主类添加注解:

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeSourceApplication.class, args);
    }
}

定义数据源(每 10 秒生成时间戳):

@Bean
@InboundChannelAdapter(
  value = Source.OUTPUT, 
  poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
    return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}

✅ 简单粗暴的定时数据源,生产环境可替换为 Kafka/Kinesis 等

7. Processor 应用开发

创建新 Spring Boot 项目,添加相同依赖。

主类注解:

@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowTimeProcessorApplication.class, args);
    }
}

定义数据转换逻辑:

@Transformer(inputChannel = Processor.INPUT, 
  outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {

    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
    String date = dateFormat.format(timestamp);
    return date;
}

将时间戳转换为格式化日期字符串

8. Sink 应用开发

创建最终 Spring Boot 项目,添加依赖。

主类注解:

@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowLoggingSinkApplication.class, args);
    }
}

定义数据消费逻辑:

@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
    logger.info("Received: " + date);
}

✅ 将格式化日期输出到日志文件

9. 注册流应用

在 Data Flow Shell 中注册应用:

app register --name time-source --type source --uri file:///path/to/your/jars/time-source-0.0.1-SNAPSHOT.jar
app register --name time-processor --type processor --uri file:///path/to/your/jars/time-processor-0.0.1-SNAPSHOT.jar
app register --name log-sink --type sink --uri file:///path/to/your/jars/log-sink-0.0.1-SNAPSHOT.jar

⚠️ 路径需替换为实际 JAR 文件位置

10. 创建并部署流

定义流管道:

stream create --name time-to-log --definition 'time-source | time-processor | log-sink'

部署流:

stream deploy --name time-to-log

成功部署后 Data Flow 服务器日志会显示:

2024-04-15 15:15:27.153  INFO 23568 --- [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager        : Getting status for org.springframework.cloud.skipper.domain.Release@4de8f9c3 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:32.156  INFO 23568 --- [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager        : Getting status for org.springframework.cloud.skipper.domain.Release@4d72121e using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1

11. 验证结果

查看 Sink 应用的日志输出:

tail -f /tmp/spring-cloud-dataflow/time-to-log/log-sink/stdout_0.log

预期输出:

2024-04-15 12:40:42.029  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2024/04/15 11:40:01
2024-04-15 12:40:52.035  INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2024/04/15 11:40:11

✅ 时间戳被正确格式化并输出

12. 总结

通过本文我们实践了:

  1. 搭建 Spring Cloud Data Flow 本地环境
  2. 开发 Source/Processor/Sink 三种核心应用
  3. 使用 Shell 工具编排流处理管道
  4. 实现端到端的数据转换与输出

⚠️ 生产环境建议使用 Kubernetes 部署,并配置监控告警

Spring Cloud Data Flow 提供了声明式的流处理能力,特别适合实时 ETL、IoT 数据处理等场景。掌握其核心组件后,可快速构建高可扩展的数据管道。


原始标题:Getting Started with Stream Processing + Spring Cloud Data Flow | Baeldung