1. 简介
本文将演示如何使用Spring Cloud应用启动器——这些开箱即用的预构建应用模板,可作为未来开发的起点。
简单来说:
- Task应用启动器:适用于数据库迁移、分布式测试等场景
- Stream应用启动器:提供与外部系统的集成能力
目前官方提供超过55种启动器,完整列表可参考:
接下来我们将构建一个分布式Twitter应用,实现将推文流实时存储到Hadoop分布式文件系统(HDFS)的完整流程。
2. 环境准备
我们需要准备三块核心组件:
- Twitter应用:使用consumer-key和access-token创建
- Hadoop集群:用于持久化存储推文数据
- 应用组件:可选择以下两种方式组合:
- ✅ 使用Maven编译独立的
sources
-processors
-sinks
组件 - ✅ 通过Spring Stream绑定接口组合各组件
- ✅ 使用Maven编译独立的
⚠️ 注意:早期所有Stream启动器都集中在一个大仓库中,现在已拆分为独立模块(例如hdfs启动器)
3. 配置Twitter凭证
首先获取Twitter开发者凭证,步骤详见官方文档。
需要准备以下四项关键凭证:
- Consumer Key
- Consumer Key Secret
- Access Token
- Access Token Secret
操作建议:保持凭证页面打开或记录备用,后续配置会频繁使用!
4. 安装Hadoop
有两种安装方式可选:
方式一:手动安装
参考官方安装指南
方式二:Docker快速部署(推荐)
$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1
5. 编译应用启动器
5.1 Twitter Stream启动器
克隆Twitter Stream启动器仓库:
git clone https://github.com/spring-cloud-stream-app-starters/twitter.git
执行Maven编译:
./mvnw clean install -PgenerateApps
编译产物位于项目根目录的/target
文件夹。运行时可通过命令行传递凭证:
java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
--accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>
或者使用Spring标准配置文件application.properties
:
twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...
5.2 HDFS Sink启动器
克隆HDFS Sink启动器仓库:
git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git
执行Maven编译:
./mvnw clean install -PgenerateApps
编译产物同样位于/target
目录。运行时指定HDFS URI:
java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/
⚠️ 注意:hdfs://127.0.0.1:50010/
是Hadoop默认地址,实际端口可能因配置不同而变化。可通过http://0.0.0.0:50070
查看数据节点列表及端口。
同样支持在application.properties
中预配置:
hdfs.fs-uri=hdfs://127.0.0.1:50010/
6. 使用AggregateApplicationBuilder组合组件
更优雅的方式是通过AggregateApplicationBuilder
将Source和Sink组合为单个Spring Boot应用。
6.1 构建应用组件
首先在pom.xml
添加依赖:
<dependencies>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
</dependencies>
Source组件
@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
@InboundChannelAdapter(Source.OUTPUT)
public String timerMessageSource() {
return new SimpleDateFormat().format(new Date());
}
}
Processor组件(可选)
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public String processMessage(String payload) {
log.info("Payload received!");
return payload;
}
}
Sink组件
@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
@ServiceActivator(inputChannel= Sink.INPUT)
public void loggerSink(Object payload) {
log.info("Received: " + payload);
}
}
组合主应用
@SpringBootApplication
public class AggregateApp {
public static void main(String[] args) {
new AggregateApplicationBuilder()
.from(SourceApp.class).args("--fixedDelay=5000")
.via(ProcessorApp.class)
.to(SinkApp.class).args("--debug=true")
.run(args);
}
}
6.2 运行组合应用
编译并运行:
$ mvn install
$ java -jar twitterhdfs.jar
⚠️ 踩坑提醒:务必将每个@SpringBootApplication
类放在独立包中,否则会抛出绑定异常!
成功运行后控制台输出示例(实际推文内容会变化):
2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]
c.b.twitterhdfs.processor.ProcessorApp : Payload received!
2018-01-15 04:38:32.255 INFO 28778 --- [itterSource-1-1]
com.baeldung.twitterhdfs.sink.SinkApp : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...
这表明Processor和Sink组件正确接收并处理了来自Source的数据。本示例中HDFS Sink仅打印消息,实际生产环境可扩展为存储到HDFS。
7. 总结
我们成功将两个Spring Stream应用启动器组合为一个完整的Spring Boot应用,实现了Twitter流数据到HDFS的实时存储。
延伸阅读:
完整代码示例可在GitHub仓库获取。