1. 概述
在这篇简短的文章中,我们将通过构建一个现实场景来介绍Reactor项目中的reactor-bus。
注意:在Reactor 3.x版本中已移除reactor-bus项目:存档的reactor-bus存储库
2. Reactor的基本原理
2.1. 为什么选择Reactor?
现代应用需要处理大量并发请求并处理大量数据。标准的阻塞代码已经不足以满足这些需求。
事件驱动架构模式 是一种异步处理大量并发服务请求的基于事件的架构方法,这些请求来自单个或多个服务处理者。
Reactor项目基于此模式,并有明确且雄心勃勃的目标,即在JVM上构建非阻塞、反应式的应用程序。
2.2. 示例场景
在开始之前,让我们列举一些使用反应式架构风格有意义的场景,以便了解我们可能在哪里应用它:
- 像亚马逊这样的大型在线购物平台的通知服务
- 银行业务中的大规模交易处理服务
- 股票交易业务,股票价格会同时变化
3. Maven依赖项
现在,让我们通过在pom.xml
中添加以下依赖来开始使用Project Reactor Bus:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>
<version>2.0.8.RELEASE</version>
</dependency>
您可以在Maven中央仓库检查reactor-bus的最新版本。
4. 构建示例应用
为了更好地理解基于reactor的方法的好处,让我们看一个实际的例子。
我们将构建一个简单的应用,负责向在线购物平台的用户发送通知。例如,如果用户下新订单,应用将通过电子邮件或短信发送订单确认。
传统的同步实现自然会受到电子邮件或短信服务吞吐量的限制。因此,像节假日这样的流量高峰通常会存在问题。
采用反应式方法,我们可以设计系统更灵活,更好地适应外部系统(如网关服务器)可能出现的故障或超时。
让我们来看看应用——从传统方面开始,然后转向更反应式的构建。
4.1. 简单POJO
首先,创建一个POJO类来表示通知数据:
public class NotificationData {
private long id;
private String name;
private String email;
private String mobile;
// getter and setter methods
}
4.2. 服务层
接下来,定义一个简单的服务层:
public interface NotificationService {
void initiateNotification(NotificationData notificationData)
throws InterruptedException;
}
以及模拟长时间运行操作的实现:
@Service
public class NotificationServiceimpl implements NotificationService {
@Override
public void initiateNotification(NotificationData notificationData)
throws InterruptedException {
System.out.println("Notification service started for "
+ "Notification ID: " + notificationData.getId());
Thread.sleep(5000);
System.out.println("Notification service ended for "
+ "Notification ID: " + notificationData.getId());
}
}
请注意,为了演示通过电子邮件或短信网关发送消息的真实场景,我们在initiateNotification
方法中故意引入了5秒的延迟,使用Thread.sleep(5000)
。
因此,当线程到达服务时,它会被阻塞5秒。
4.3. 消费者
现在,让我们转向应用的更反应式部分,实现一个消费者,然后将其映射到reactor事件总线:
@Service
public class NotificationConsumer implements
Consumer<Event<NotificationData>> {
@Autowired
private NotificationService notificationService;
@Override
public void accept(Event<NotificationData> notificationDataEvent) {
NotificationData notificationData = notificationDataEvent.getData();
try {
notificationService.initiateNotification(notificationData);
} catch (InterruptedException e) {
// ignore
}
}
}
如我们所见,我们创建的消费者实现了Consumer<T>
接口。主要逻辑位于accept
方法中。
这与典型的Spring监听器实现类似。
4.4. 控制器
最后,既然我们能够消费事件,我们也来生成它们。
我们将在一个简单的控制器中做到这一点:
@Controller
public class NotificationController {
@Autowired
private EventBus eventBus;
@GetMapping("/startNotification/{param}")
public void startNotification(@PathVariable Integer param) {
for (int i = 0; i < param; i++) {
NotificationData data = new NotificationData();
data.setId(i);
eventBus.notify("notificationConsumer", Event.wrap(data));
System.out.println(
"Notification " + i + ": notification task submitted successfully");
}
}
}
这相当直观——我们在这里通过EventBus
发出事件。
例如,如果客户端访问带有参数值为十的URL,则会通过事件总线发送十个事件。
4.5. Java配置
现在,让我们将所有内容放在一起,创建一个简单的Spring Boot应用。
首先,我们需要配置EventBus
和Environment
bean:
@Configuration
public class Config {
@Bean
public Environment env() {
return Environment.initializeIfEmpty().assignErrorJournal();
}
@Bean
public EventBus createEventBus(Environment env) {
return EventBus.create(env, Environment.THREAD_POOL);
}
}
在我们的例子中,**我们使用环境中的默认线程池实例化EventBus
**。
或者,我们可以使用自定义的Dispatcher
实例:
EventBus evBus = EventBus.create(
env,
Environment.newDispatcher(
REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT,
DispatcherType.THREAD_POOL_EXECUTOR));
现在,我们准备好创建主应用代码:
import static reactor.bus.selector.Selectors.$;
@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {
@Autowired
private EventBus eventBus;
@Autowired
private NotificationConsumer notificationConsumer;
@Override
public void run(String... args) throws Exception {
eventBus.on($("notificationConsumer"), notificationConsumer);
}
public static void main(String[] args) {
SpringApplication.run(NotificationApplication.class, args);
}
}
在run
方法中,我们注册notificationConsumer
,使其在匹配给定选择器的事件触发时被触发。
注意我们如何使用$
属性的静态导入来创建一个Selector
对象。
5. 测试应用
现在,让我们创建一个测试来看看NotificationApplication
的运行情况:
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {
@LocalServerPort
private int port;
@Test
public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
RestTemplate restTemplate = new RestTemplate();
restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
}
}
正如我们所看到的,一旦请求执行,所有十个任务立即提交,而不会产生任何阻塞。提交后,通知事件将并行处理。
Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9
重要的是要记住,在我们的场景中,没有必要按照特定顺序处理这些事件。
6. 总结
在这篇快速教程中,我们创建了一个简单的事件驱动应用。我们也看到了如何开始编写更反应式和非阻塞的代码。
然而,这个场景只是触及了主题的表面,仅作为开始实验反应式范式的良好基础。
如往常一样,源代码可在GitHub上找到。