1. 概述
在这个教程中,我们将探讨如何使用zipWhen()
方法协调两个或多个Mono
流的结果。我们将从快速概述开始,然后通过一个涉及用户数据存储和电子邮件的简单示例来演示。在需要同时从多个源收集和处理数据的场景中,zipWhen()
帮助我们组织和协调多个异步操作。
2. zipWhen()
是什么?
在Reactive编程中,zipWhen()
是Mono
的一个操作符,它允许我们以协调的方式结合两个或多个Mono
流的结果。当我们需要并发执行多个异步操作,并将它们的结果合并到单个输出时,它非常有用。
我们从代表异步操作的两个或多个Mono
流开始,这些Mono
可以发出不同类型的数据,它们之间可能存在或可能不存在依赖关系。
接着,我们使用zipWhen()
进行协调。我们将zipWhen()
操作符应用于其中一个Mono
。这个操作符会等待第一个Mono
发出一个值,然后使用该值触发其他Mono
的执行。zipWhen()
的结果是一个新的Mono
,它将所有Mono
的结果合并到一个单一的数据结构中,通常是元组(Tuple
)或我们定义的对象。
最后,我们可以指定如何组合Mono
的结果。我们可以使用组合后的值创建新对象、执行计算或构造有意义的响应。
3. 示例设置
让我们设置一个简单的例子,包含三个简化服务:UserService
、EmailService
和DataBaseService
。每个服务都以不同类型的Mono
形式产生数据。我们希望将所有数据合并到单个响应中并返回给调用客户端。首先,让我们设置适当的Maven依赖项。
3.1. 依赖项
首先设置所需的依赖项。我们需要spring-boot-starter-webflux
(版本3.1.3)和reactor-test
(版本3.5.10):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
3.2. 设置UserService
首先介绍UserService
:
public class UserService {
public Mono<User> getUser(String userId) {
return Mono.just(new User(userId, "John Stewart"));
}
}
在这里,UserService
提供了一个根据给定的userId
获取用户数据的方法,返回一个表示用户信息的Mono<User>
。
3.3. 设置EmailService
接下来添加EmailService
:
public class EmailService {
private final UserService userService;
public EmailService(UserService userService) {
this.userService = userService;
}
public Mono<Boolean> sendEmail(String userId) {
return userService.getUser(userId)
.flatMap(user -> {
System.out.println("Sending email to: " + user.getEmail());
return Mono.just(true);
})
.defaultIfEmpty(false);
}
}
顾名思义,EmailService
负责向用户发送电子邮件。重要的是,它依赖于UserService
获取用户详情,然后根据检索到的信息发送邮件。sendEmail()
方法返回一个Mono<Boolean>
,表示电子邮件是否成功发送。
3.4. 设置DatabaseService
public class DatabaseService {
private Map<String, User> dataStore = new ConcurrentHashMap<>();
public Mono<Boolean> saveUserData(User user) {
return Mono.create(sink -> {
try {
dataStore.put(user.getId(), user);
sink.success(true);
} catch (Exception e) {
sink.success(false);
}
});
}
}
DatabaseService
负责处理用户数据的持久化到数据库。为了简化,我们在这里使用一个并发映射来表示数据存储。
它提供一个saveUserData()
方法,接受用户信息并返回一个Mono<Boolean>
,表示数据库操作的成功或失败。
4. zipWhen()
的使用
现在我们已经定义了所有服务,让我们定义一个控制器方法,将来自这三个服务的所有Mono
流合并为一个类型为Mono<ResponseEntity<String>>
的响应。我们将展示如何使用zipWhen()
操作符协调各种异步操作,并将它们全部转换为单个响应供调用客户端使用。首先定义GET
方法:
@GetMapping("/example/{userId}")
public Mono<ResponseEntity<String>> combineAllDataFor(@PathVariable String userId) {
Mono<User> userMono = userService.getUser(userId);
Mono<Boolean> emailSentMono = emailService.sendEmail(userId)
.subscribeOn(Schedulers.parallel());
Mono<String> databaseResultMono = userMono.flatMap(user -> databaseService.saveUserData(user)
.map(Object::toString));
return userMono.zipWhen(user -> emailSentMono, (t1, t2) -> Tuples.of(t1, t2))
.zipWhen(tuple -> databaseResultMono, (tuple, databaseResult) -> {
User user = tuple.getT1();
Boolean emailSent = tuple.getT2();
return ResponseEntity.ok()
.body("Response: " + user + ", Email Sent: " + emailSent + ", Database Result: " + databaseResult);
});
}
当客户端访问GET /example/{userId}端点时,userService
会调用getUser(userId)
来根据提供的userId
获取用户信息。这个结果存储在名为userMono
的Mono<User>
中。
接下来,它向同一用户发送电子邮件。但在发送电子邮件之前,它检查用户是否存在。电子邮件发送操作的结果(成功或失败)由emailSentMono
类型的Mono<Boolean>
表示。为了节省时间,这个操作并行执行。它使用databaseService.saveUserData(user)
将用户数据(步骤1中获取的)保存到数据库。这个操作的结果(成功或失败)被转换为字符串并存储在Mono<String>
中。
关键在于,它使用zipWhen()
操作符结合先前步骤的结果。第一个zipWhen()
将用户数据userMono
和电子邮件发送状态的emailSentMono
合并成一个元组。第二个zipWhen()
将之前的元组与dataBaseResultMono
的数据库结果结合起来,构建最终响应。在第二个zipWhen()
内部,它使用组合的数据构建响应消息。
消息包含了用户信息、电子邮件是否成功发送以及数据库操作的结果。总的来说,这个方法为特定用户组织了用户数据的获取、电子邮件发送和数据库操作,并将结果合并成有意义的响应,确保一切都高效且并发地进行。
5. 测试
现在,让我们测试我们的系统,确保返回正确的响应,该响应结合了三种不同的Reactive流类型:
@Test
public void givenUserId_whenCombineAllData_thenReturnsMonoWithCombinedData() {
UserService userService = Mockito.mock(UserService.class);
EmailService emailService = Mockito.mock(EmailService.class);
DatabaseService databaseService = Mockito.mock(DatabaseService.class);
String userId = "123";
User user = new User(userId, "John Doe");
Mockito.when(userService.getUser(userId))
.thenReturn(Mono.just(user));
Mockito.when(emailService.sendEmail(userId))
.thenReturn(Mono.just(true));
Mockito.when(databaseService.saveUserData(user))
.thenReturn(Mono.just(true));
UserController userController = new UserController(userService, emailService, databaseService);
Mono<ResponseEntity<String>> responseMono = userController.combineAllDataFor(userId);
StepVerifier.create(responseMono)
.expectNextMatches(responseEntity -> responseEntity.getStatusCode() == HttpStatus.OK && responseEntity.getBody()
.equals("Response: " + user + ", Email Sent: true, Database Result: " + true))
.verifyComplete();
}