1. 简介
Spring Cloud Data Flow 是一个云原生编程模型,专为可组合的数据微服务设计。
通过 Spring Cloud Data Flow,开发者可以轻松构建和编排数据管道,覆盖数据摄入、实时分析、数据导入导出等常见场景。
这些数据管道分为两种类型:
- 流处理管道:处理无界数据流(通过消息中间件)
- 批处理管道:处理有限数据集(短期任务)
本文重点介绍流处理管道的实现。
2. 架构概览
核心组件包括:
- 应用程序(Applications)
- Data Flow 服务器
- Skipper 服务器
- 目标运行时
通常还包含:
- Data Flow Shell(命令行工具)
- 消息代理(如 RabbitMQ)
2.1. 应用程序类型
流处理管道包含三个标准组件:
- Source(数据源)
- 从外部系统消费事件
- ✅ 示例:IoT 传感器数据、24/7 事件流
- Processor(处理器)
- 接收 Source 数据
- 执行业务逻辑(如转换/过滤)
- 发送处理结果到下一环节
- Sink(数据汇)
- 接收 Source 或 Processor 的数据
- 写入持久化层(数据库/文件系统等)
⚠️ 这些应用可打包为 Spring Boot 应用或 Docker 镜像
3. 安装消息代理
管道中的应用需要消息中间件通信,本文选用 RabbitMQ。
安装步骤:
# Ubuntu 示例
sudo apt-get install rabbitmq-server
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server
详细安装指南参考 RabbitMQ 官网
4. 本地 Data Flow 和 Skipper 服务器
下载服务器组件:
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-server/2.11.2/spring-cloud-dataflow-server-2.11.2.jar
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-skipper-server/2.11.2/spring-cloud-skipper-server-2.11.2.jar
启动顺序:
- Skipper 服务器(端口 7577):
java -jar spring-cloud-skipper-server-2.11.2.jar
- Data Flow 服务器(端口 9393):
java -jar spring-cloud-dataflow-server-2.11.2.jar
⚠️ 需确保 RabbitMQ 已启动,否则会报连接错误
5. Data Flow Shell
下载并启动 Shell:
wget https://repo.maven.apache.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.11.2/spring-cloud-dataflow-shell-2.11.2.jar
java -jar spring-cloud-dataflow-shell-2.11.2.jar
启动后输入 help
查看所有可用命令:
dataflow:>help
6. Source 应用开发
创建 Spring Boot 项目,添加依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
主类添加注解:
@EnableBinding(Source.class)
@SpringBootApplication
public class SpringDataFlowTimeSourceApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeSourceApplication.class, args);
}
}
定义数据源(每 10 秒生成时间戳):
@Bean
@InboundChannelAdapter(
value = Source.OUTPUT,
poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1")
)
public MessageSource<Long> timeMessageSource() {
return () -> MessageBuilder.withPayload(new Date().getTime()).build();
}
✅ 简单粗暴的定时数据源,生产环境可替换为 Kafka/Kinesis 等
7. Processor 应用开发
创建新 Spring Boot 项目,添加相同依赖。
主类注解:
@EnableBinding(Processor.class)
@SpringBootApplication
public class SpringDataFlowTimeProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowTimeProcessorApplication.class, args);
}
}
定义数据转换逻辑:
@Transformer(inputChannel = Processor.INPUT,
outputChannel = Processor.OUTPUT)
public Object transform(Long timestamp) {
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy");
String date = dateFormat.format(timestamp);
return date;
}
将时间戳转换为格式化日期字符串
8. Sink 应用开发
创建最终 Spring Boot 项目,添加依赖。
主类注解:
@EnableBinding(Sink.class)
@SpringBootApplication
public class SpringDataFlowLoggingSinkApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowLoggingSinkApplication.class, args);
}
}
定义数据消费逻辑:
@StreamListener(Sink.INPUT)
public void loggerSink(String date) {
logger.info("Received: " + date);
}
✅ 将格式化日期输出到日志文件
9. 注册流应用
在 Data Flow Shell 中注册应用:
app register --name time-source --type source --uri file:///path/to/your/jars/time-source-0.0.1-SNAPSHOT.jar
app register --name time-processor --type processor --uri file:///path/to/your/jars/time-processor-0.0.1-SNAPSHOT.jar
app register --name log-sink --type sink --uri file:///path/to/your/jars/log-sink-0.0.1-SNAPSHOT.jar
⚠️ 路径需替换为实际 JAR 文件位置
10. 创建并部署流
定义流管道:
stream create --name time-to-log --definition 'time-source | time-processor | log-sink'
部署流:
stream deploy --name time-to-log
成功部署后 Data Flow 服务器日志会显示:
2024-04-15 15:15:27.153 INFO 23568 --- [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager : Getting status for org.springframework.cloud.skipper.domain.Release@4de8f9c3 using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
2024-04-15 15:15:32.156 INFO 23568 --- [pool-3-thread-1] o.s.c.s.s.d.DefaultReleaseManager : Getting status for org.springframework.cloud.skipper.domain.Release@4d72121e using deploymentIds time-to-log2.time-processor-v1,time-to-log2.time-source1-v1,time-to-log2.log-sink-v1
11. 验证结果
查看 Sink 应用的日志输出:
tail -f /tmp/spring-cloud-dataflow/time-to-log/log-sink/stdout_0.log
预期输出:
2024-04-15 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2024/04/15 11:40:01
2024-04-15 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2024/04/15 11:40:11
✅ 时间戳被正确格式化并输出
12. 总结
通过本文我们实践了:
- 搭建 Spring Cloud Data Flow 本地环境
- 开发 Source/Processor/Sink 三种核心应用
- 使用 Shell 工具编排流处理管道
- 实现端到端的数据转换与输出
⚠️ 生产环境建议使用 Kubernetes 部署,并配置监控告警
Spring Cloud Data Flow 提供了声明式的流处理能力,特别适合实时 ETL、IoT 数据处理等场景。掌握其核心组件后,可快速构建高可扩展的数据管道。