1. 简介

本文将演示如何使用Spring Cloud应用启动器——这些开箱即用的预构建应用模板,可作为未来开发的起点。

简单来说:

  • Task应用启动器:适用于数据库迁移、分布式测试等场景
  • Stream应用启动器:提供与外部系统的集成能力

目前官方提供超过55种启动器,完整列表可参考:

接下来我们将构建一个分布式Twitter应用,实现将推文流实时存储到Hadoop分布式文件系统(HDFS)的完整流程。

2. 环境准备

我们需要准备三块核心组件:

  1. Twitter应用:使用consumer-key和access-token创建
  2. Hadoop集群:用于持久化存储推文数据
  3. 应用组件:可选择以下两种方式组合:
    • ✅ 使用Maven编译独立的sources-processors-sinks组件
    • ✅ 通过Spring Stream绑定接口组合各组件

⚠️ 注意:早期所有Stream启动器都集中在一个大仓库中,现在已拆分为独立模块(例如hdfs启动器

3. 配置Twitter凭证

首先获取Twitter开发者凭证,步骤详见官方文档

需要准备以下四项关键凭证:

  1. Consumer Key
  2. Consumer Key Secret
  3. Access Token
  4. Access Token Secret

操作建议:保持凭证页面打开或记录备用,后续配置会频繁使用!

4. 安装Hadoop

有两种安装方式可选:

方式一:手动安装

参考官方安装指南

方式二:Docker快速部署(推荐)

$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1

5. 编译应用启动器

5.1 Twitter Stream启动器

克隆Twitter Stream启动器仓库:

git clone https://github.com/spring-cloud-stream-app-starters/twitter.git

执行Maven编译:

./mvnw clean install -PgenerateApps

编译产物位于项目根目录的/target文件夹。运行时可通过命令行传递凭证:

java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
    --accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>

或者使用Spring标准配置文件application.properties

twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...

5.2 HDFS Sink启动器

克隆HDFS Sink启动器仓库:

git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git

执行Maven编译:

./mvnw clean install -PgenerateApps

编译产物同样位于/target目录。运行时指定HDFS URI:

java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/

⚠️ 注意:hdfs://127.0.0.1:50010/是Hadoop默认地址,实际端口可能因配置不同而变化。可通过http://0.0.0.0:50070查看数据节点列表及端口。

同样支持在application.properties中预配置:

hdfs.fs-uri=hdfs://127.0.0.1:50010/

6. 使用AggregateApplicationBuilder组合组件

更优雅的方式是通过AggregateApplicationBuilder将Source和Sink组合为单个Spring Boot应用。

6.1 构建应用组件

首先在pom.xml添加依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
</dependencies>

Source组件

@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
    @InboundChannelAdapter(Source.OUTPUT)
    public String timerMessageSource() {
        return new SimpleDateFormat().format(new Date());
    }
}

Processor组件(可选)

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        log.info("Payload received!");
        return payload;
    }
}

Sink组件

@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
    @ServiceActivator(inputChannel= Sink.INPUT)
    public void loggerSink(Object payload) {
        log.info("Received: " + payload);
    }
}

组合主应用

@SpringBootApplication
public class AggregateApp {
    public static void main(String[] args) {
        new AggregateApplicationBuilder()
          .from(SourceApp.class).args("--fixedDelay=5000")
          .via(ProcessorApp.class)
          .to(SinkApp.class).args("--debug=true")
          .run(args);
    }
}

6.2 运行组合应用

编译并运行:

$ mvn install
$ java -jar twitterhdfs.jar

⚠️ 踩坑提醒:务必将每个@SpringBootApplication类放在独立包中,否则会抛出绑定异常!

成功运行后控制台输出示例(实际推文内容会变化):

2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
c.b.twitterhdfs.processor.ProcessorApp   : Payload received!
2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
com.baeldung.twitterhdfs.sink.SinkApp    : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...

这表明Processor和Sink组件正确接收并处理了来自Source的数据。本示例中HDFS Sink仅打印消息,实际生产环境可扩展为存储到HDFS。

7. 总结

我们成功将两个Spring Stream应用启动器组合为一个完整的Spring Boot应用,实现了Twitter流数据到HDFS的实时存储。

延伸阅读:

完整代码示例可在GitHub仓库获取。


原始标题:Using a Spring Cloud App Starter