1. 概述

在这个教程中,我们将探讨如何使用zipWhen()方法协调两个或多个Mono流的结果。我们将从快速概述开始,然后通过一个涉及用户数据存储和电子邮件的简单示例来演示。在需要同时从多个源收集和处理数据的场景中,zipWhen()帮助我们组织和协调多个异步操作。

2. zipWhen()是什么?

在Reactive编程中,zipWhen()Mono的一个操作符,它允许我们以协调的方式结合两个或多个Mono流的结果。当我们需要并发执行多个异步操作,并将它们的结果合并到单个输出时,它非常有用。

我们从代表异步操作的两个或多个Mono流开始,这些Mono可以发出不同类型的数据,它们之间可能存在或可能不存在依赖关系。

接着,我们使用zipWhen()进行协调。我们将zipWhen()操作符应用于其中一个Mono。这个操作符会等待第一个Mono发出一个值,然后使用该值触发其他Mono的执行。zipWhen()的结果是一个新的Mono,它将所有Mono的结果合并到一个单一的数据结构中,通常是元组(Tuple)或我们定义的对象。

最后,我们可以指定如何组合Mono的结果。我们可以使用组合后的值创建新对象、执行计算或构造有意义的响应。

3. 示例设置

让我们设置一个简单的例子,包含三个简化服务:UserServiceEmailServiceDataBaseService。每个服务都以不同类型的Mono形式产生数据。我们希望将所有数据合并到单个响应中并返回给调用客户端。首先,让我们设置适当的Maven依赖项。

3.1. 依赖项

首先设置所需的依赖项。我们需要spring-boot-starter-webflux(版本3.1.3)和reactor-test(版本3.5.10):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

3.2. 设置UserService

首先介绍UserService

public class UserService {
    public Mono<User> getUser(String userId) {
        return Mono.just(new User(userId, "John Stewart"));
    }
}

在这里,UserService提供了一个根据给定的userId获取用户数据的方法,返回一个表示用户信息的Mono<User>

3.3. 设置EmailService

接下来添加EmailService

public class EmailService {
    private final UserService userService;

    public EmailService(UserService userService) {
        this.userService = userService;
    }

    public Mono<Boolean> sendEmail(String userId) {
        return userService.getUser(userId)
          .flatMap(user -> {
              System.out.println("Sending email to: " + user.getEmail());
              return Mono.just(true);
          })
          .defaultIfEmpty(false);
    }
}

顾名思义,EmailService负责向用户发送电子邮件。重要的是,它依赖于UserService获取用户详情,然后根据检索到的信息发送邮件。sendEmail()方法返回一个Mono<Boolean>,表示电子邮件是否成功发送。

3.4. 设置DatabaseService

public class DatabaseService {
    private Map<String, User> dataStore = new ConcurrentHashMap<>();

    public Mono<Boolean> saveUserData(User user) {
        return Mono.create(sink -> {
            try {
                dataStore.put(user.getId(), user);
                sink.success(true);
            } catch (Exception e) {
                sink.success(false);
            }
        });
    }
}

DatabaseService负责处理用户数据的持久化到数据库。为了简化,我们在这里使用一个并发映射来表示数据存储。

它提供一个saveUserData()方法,接受用户信息并返回一个Mono<Boolean>,表示数据库操作的成功或失败。

4. zipWhen()的使用

现在我们已经定义了所有服务,让我们定义一个控制器方法,将来自这三个服务的所有Mono流合并为一个类型为Mono<ResponseEntity<String>>的响应。我们将展示如何使用zipWhen()操作符协调各种异步操作,并将它们全部转换为单个响应供调用客户端使用。首先定义GET方法:

@GetMapping("/example/{userId}")
public Mono<ResponseEntity<String>> combineAllDataFor(@PathVariable String userId) {
    Mono<User> userMono = userService.getUser(userId);
    Mono<Boolean> emailSentMono = emailService.sendEmail(userId)
      .subscribeOn(Schedulers.parallel());
    Mono<String> databaseResultMono = userMono.flatMap(user -> databaseService.saveUserData(user)
      .map(Object::toString));

    return userMono.zipWhen(user -> emailSentMono, (t1, t2) -> Tuples.of(t1, t2))
      .zipWhen(tuple -> databaseResultMono, (tuple, databaseResult) -> {
          User user = tuple.getT1();
          Boolean emailSent = tuple.getT2();
          return ResponseEntity.ok()
            .body("Response: " + user + ", Email Sent: " + emailSent + ", Database Result: " + databaseResult);
      });
}

当客户端访问GET /example/{userId}端点时,userService会调用getUser(userId)来根据提供的userId获取用户信息。这个结果存储在名为userMonoMono<User>中。

接下来,它向同一用户发送电子邮件。但在发送电子邮件之前,它检查用户是否存在。电子邮件发送操作的结果(成功或失败)由emailSentMono类型的Mono<Boolean>表示。为了节省时间,这个操作并行执行。它使用databaseService.saveUserData(user)将用户数据(步骤1中获取的)保存到数据库。这个操作的结果(成功或失败)被转换为字符串并存储在Mono<String>中。

关键在于,它使用zipWhen()操作符结合先前步骤的结果。第一个zipWhen()将用户数据userMono和电子邮件发送状态的emailSentMono合并成一个元组。第二个zipWhen()将之前的元组与dataBaseResultMono的数据库结果结合起来,构建最终响应。在第二个zipWhen()内部,它使用组合的数据构建响应消息。

消息包含了用户信息、电子邮件是否成功发送以及数据库操作的结果。总的来说,这个方法为特定用户组织了用户数据的获取、电子邮件发送和数据库操作,并将结果合并成有意义的响应,确保一切都高效且并发地进行。

5. 测试

现在,让我们测试我们的系统,确保返回正确的响应,该响应结合了三种不同的Reactive流类型:

@Test
public void givenUserId_whenCombineAllData_thenReturnsMonoWithCombinedData() {
    UserService userService = Mockito.mock(UserService.class);
    EmailService emailService = Mockito.mock(EmailService.class);
    DatabaseService databaseService = Mockito.mock(DatabaseService.class);

    String userId = "123";
    User user = new User(userId, "John Doe");

    Mockito.when(userService.getUser(userId))
      .thenReturn(Mono.just(user));
    Mockito.when(emailService.sendEmail(userId))
      .thenReturn(Mono.just(true));
    Mockito.when(databaseService.saveUserData(user))
      .thenReturn(Mono.just(true));

    UserController userController = new UserController(userService, emailService, databaseService);

    Mono<ResponseEntity<String>> responseMono = userController.combineAllDataFor(userId);

    StepVerifier.create(responseMono)
      .expectNextMatches(responseEntity -> responseEntity.getStatusCode() == HttpStatus.OK && responseEntity.getBody()
        .equals("Response: " + user + ", Email Sent: true, Database Result: " + true))
      .verifyComplete();
}