1. 概述

RxJava 是一个用于构建异步和事件驱动程序的流行库,其核心思想源自 Reactive Extensions 项目。而 Vert.x 作为 Eclipse 基金会下的项目,提供了专为响应式编程设计的组件。两者结合能为 Java 应用提供强大的响应式基础。

本文将演示一个实际场景:读取城市列表文件,计算每个城市的日出日落时间差。我们将使用 MetaWeather 的 REST API 获取数据,并通过 RxJava 和 Vert.x 实现纯响应式处理。

2. Maven 依赖

首先引入核心依赖 vertx-rx-java2

<dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-rx-java2</artifactId>
    <version>3.9.15</version>
</dependency>

⚠️ 注意:

  • 当前 Vert.x 与 RxJava 2 的集成仍处于 Beta 版,但稳定度足以满足本文示例
  • 该依赖已包含 rxjava2,无需额外引入
  • 最新版本可在 Maven Central 查询

3. 环境搭建

创建 Vert.x 实例作为入口点:

Vertx vertx = io.vertx.reactivex.core.Vertx.vertx();

关键区别:

  • io.vertx.core.Vertx:标准 Vert.x 入口
  • io.vertx.reactivex.core.Vertx:RxJava 集成专用入口(本文使用)

初始化所需组件:

FileSystem fileSystem = vertx.fileSystem();
HttpClient httpClient = vertx.createHttpClient();

FileSystem 提供响应式文件访问 ✅ HttpClient 提供响应式 HTTP 通信

4. 响应式链构建

通过操作符组合实现完整处理流程:

fileSystem
  .rxReadFile("cities.txt").toFlowable()
  .flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
  .flatMap(city -> searchByCityName(httpClient, city))
  .flatMap(HttpClientResponse::toFlowable)
  .map(extractingWoeid())
  .flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
  .flatMap(toBufferFlowable())
  .map(Buffer::toJsonObject)
  .map(toCityAndDayLength())
  .subscribe(System.out::println, Throwable::printStackTrace);

接下来逐步拆解每个环节的实现逻辑。

5. 城市名称读取

从文件加载城市列表(每行一个城市名):

fileSystem
 .rxReadFile("cities.txt").toFlowable()
 .flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))

处理流程:

  1. rxReadFile() 返回 Single<Buffer>(Vert.x 异步性 + RxJava 数据结构)
  2. 转换为 Flowable 后通过 flatMap 拆分行数据
  3. 最终得到每个城市名作为独立事件发射的流

6. 城市描述符获取

根据城市名查询 MetaWeather API 获取城市标识符:

.flatMap(city -> searchByCityName(httpClient, city))
.flatMap(HttpClientResponse::toFlowable)

实现 searchByCityName 方法:

Flowable<HttpClientResponse> searchByCityName(HttpClient httpClient, String cityName) {
    HttpClientRequest req = httpClient.get(
        new RequestOptions()
          .setHost("www.metaweather.com")
          .setPort(443)
          .setSsl(true)
          .setURI(format("/api/location/search/?query=%s", cityName)));
    return req
      .toFlowable()
      .doOnSubscribe(subscription -> req.end());
}

踩坑点:❌ Vert.x 要求必须调用 end() 发送请求,且需在订阅后执行。解决方案:使用 doOnSubscribe 确保 end() 在订阅时触发。

7. 城市标识符提取

从 JSON 响应中解析城市唯一标识 woeid

.map(extractingWoeid())

实现 extractingWoeid 方法:

private static Function<Buffer, Long> extractingWoeid() {
    return cityBuffer -> cityBuffer
      .toJsonArray()
      .getJsonObject(0)
      .getLong("woeid");
}

利用 Buffer 提供的 JSON 解析方法直接访问目标字段。

8. 城市详情获取

根据标识符获取完整城市数据:

.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
.flatMap(toBufferFlowable())

实现 getDataByPlaceId 方法:

static Flowable<HttpClientResponse> getDataByPlaceId(
  HttpClient httpClient, long placeId) {
 
    return autoPerformingReq(
      httpClient,
      format("/api/location/%s/", placeId));
}

响应数据聚合方法 toBufferFlowable

static Function<HttpClientResponse, Publisher<? extends Buffer>>
  toBufferFlowable() {
    return response -> response
      .toObservable()
      .reduce(
        Buffer.buffer(),
        Buffer::appendBuffer).toFlowable();
}

处理逻辑:将分块响应合并为完整 Buffer。

9. 日出日落时间计算

从 JSON 中提取关键数据并计算白昼时长:

.map(toCityAndDayLength())

实现 toCityAndDayLength 方法:

static Function<JsonObject, CityAndDayLength> toCityAndDayLength() {
    return json -> {
        ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
        ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
        String cityName = json.getString("title");
        return new CityAndDayLength(
          cityName, sunSet.toEpochSecond() - sunRise.toEpochSecond());
    };
}

简单粗暴:直接计算时间戳差值得到秒数。

10. 结果订阅

构建完整的订阅处理:

.subscribe(
  System.out::println, 
  Throwable::printStackTrace)

运行示例输出(实际结果取决于城市列表和运行日期):

In Chicago there are 13.3 hours of light.
In Milan there are 13.5 hours of light.
In Cairo there are 12.9 hours of light.
In Moscow there are 14.1 hours of light.
In Santiago there are 11.3 hours of light.
In Auckland there are 11.2 hours of light.

⚠️ 注意:城市输出顺序可能与文件不一致(异步请求导致)。

11. 总结

本文展示了 Vert.x 响应式模块与 RxJava 操作符的完美结合:

  1. 通过响应式链清晰表达复杂业务逻辑
  2. 充分利用 Vert.x 的异步能力
  3. 保持代码简洁可维护

完整源码可在 GitHub 获取。


原始标题:Example of Vertx and RxJava Integration