1. 概述

在这篇简短的文章中,我们将通过构建一个现实场景来介绍Reactor项目中的reactor-bus

注意:在Reactor 3.x版本中已移除reactor-bus项目:存档的reactor-bus存储库

2. Reactor的基本原理

2.1. 为什么选择Reactor?

现代应用需要处理大量并发请求并处理大量数据。标准的阻塞代码已经不足以满足这些需求。

事件驱动架构模式 是一种异步处理大量并发服务请求的基于事件的架构方法,这些请求来自单个或多个服务处理者。

Reactor项目基于此模式,并有明确且雄心勃勃的目标,即在JVM上构建非阻塞、反应式的应用程序。

2.2. 示例场景

在开始之前,让我们列举一些使用反应式架构风格有意义的场景,以便了解我们可能在哪里应用它:

  • 像亚马逊这样的大型在线购物平台的通知服务
  • 银行业务中的大规模交易处理服务
  • 股票交易业务,股票价格会同时变化

3. Maven依赖项

现在,让我们通过在pom.xml中添加以下依赖来开始使用Project Reactor Bus:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bus</artifactId>
    <version>2.0.8.RELEASE</version>
</dependency>

您可以在Maven中央仓库检查reactor-bus的最新版本。

4. 构建示例应用

为了更好地理解基于reactor的方法的好处,让我们看一个实际的例子。

我们将构建一个简单的应用,负责向在线购物平台的用户发送通知。例如,如果用户下新订单,应用将通过电子邮件或短信发送订单确认。

传统的同步实现自然会受到电子邮件或短信服务吞吐量的限制。因此,像节假日这样的流量高峰通常会存在问题。

采用反应式方法,我们可以设计系统更灵活,更好地适应外部系统(如网关服务器)可能出现的故障或超时。

让我们来看看应用——从传统方面开始,然后转向更反应式的构建。

4.1. 简单POJO

首先,创建一个POJO类来表示通知数据:

public class NotificationData {
    
    private long id;
    private String name;
    private String email;
    private String mobile;
    
    // getter and setter methods
}

4.2. 服务层

接下来,定义一个简单的服务层:

public interface NotificationService {

    void initiateNotification(NotificationData notificationData) 
      throws InterruptedException;

}

以及模拟长时间运行操作的实现:

@Service
public class NotificationServiceimpl implements NotificationService {
    
    @Override
    public void initiateNotification(NotificationData notificationData) 
      throws InterruptedException {

      System.out.println("Notification service started for "
        + "Notification ID: " + notificationData.getId());
        
      Thread.sleep(5000);
        
      System.out.println("Notification service ended for "
        + "Notification ID: " + notificationData.getId());
    }
}

请注意,为了演示通过电子邮件或短信网关发送消息的真实场景,我们在initiateNotification方法中故意引入了5秒的延迟,使用Thread.sleep(5000)

因此,当线程到达服务时,它会被阻塞5秒。

4.3. 消费者

现在,让我们转向应用的更反应式部分,实现一个消费者,然后将其映射到reactor事件总线:

@Service
public class NotificationConsumer implements 
  Consumer<Event<NotificationData>> {

    @Autowired
    private NotificationService notificationService;
    
    @Override
    public void accept(Event<NotificationData> notificationDataEvent) {
        NotificationData notificationData = notificationDataEvent.getData();
        
        try {
            notificationService.initiateNotification(notificationData);
        } catch (InterruptedException e) {
            // ignore        
        }    
    }
}

如我们所见,我们创建的消费者实现了Consumer<T>接口。主要逻辑位于accept方法中。

这与典型的Spring监听器实现类似。

4.4. 控制器

最后,既然我们能够消费事件,我们也来生成它们。

我们将在一个简单的控制器中做到这一点:

@Controller
public class NotificationController {

    @Autowired
    private EventBus eventBus;

    @GetMapping("/startNotification/{param}")
    public void startNotification(@PathVariable Integer param) {
        for (int i = 0; i < param; i++) {
            NotificationData data = new NotificationData();
            data.setId(i);

            eventBus.notify("notificationConsumer", Event.wrap(data));

            System.out.println(
              "Notification " + i + ": notification task submitted successfully");
        }
    }
}

这相当直观——我们在这里通过EventBus发出事件。

例如,如果客户端访问带有参数值为十的URL,则会通过事件总线发送十个事件。

4.5. Java配置

现在,让我们将所有内容放在一起,创建一个简单的Spring Boot应用。

首先,我们需要配置EventBusEnvironment bean:

@Configuration
public class Config {

    @Bean
    public Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }

    @Bean
    public EventBus createEventBus(Environment env) {
        return EventBus.create(env, Environment.THREAD_POOL);
    }
}

在我们的例子中,**我们使用环境中的默认线程池实例化EventBus**。

或者,我们可以使用自定义的Dispatcher实例:

EventBus evBus = EventBus.create(
  env, 
  Environment.newDispatcher(
    REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT,   
    DispatcherType.THREAD_POOL_EXECUTOR));

现在,我们准备好创建主应用代码:

import static reactor.bus.selector.Selectors.$;

@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {

    @Autowired
    private EventBus eventBus;

    @Autowired
    private NotificationConsumer notificationConsumer;

    @Override
    public void run(String... args) throws Exception {
        eventBus.on($("notificationConsumer"), notificationConsumer);
    }

    public static void main(String[] args) {
        SpringApplication.run(NotificationApplication.class, args);
    }
}

run方法中,我们注册notificationConsumer,使其在匹配给定选择器的事件触发时被触发

注意我们如何使用$属性的静态导入来创建一个Selector对象。

5. 测试应用

现在,让我们创建一个测试来看看NotificationApplication的运行情况:

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {

    @LocalServerPort
    private int port;

    @Test
    public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
    }
}

正如我们所看到的,一旦请求执行,所有十个任务立即提交,而不会产生任何阻塞。提交后,通知事件将并行处理。

Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9

重要的是要记住,在我们的场景中,没有必要按照特定顺序处理这些事件

6. 总结

在这篇快速教程中,我们创建了一个简单的事件驱动应用。我们也看到了如何开始编写更反应式和非阻塞的代码。

然而,这个场景只是触及了主题的表面,仅作为开始实验反应式范式的良好基础。

如往常一样,源代码可在GitHub上找到。


« 上一篇: Java中的Dijkstra算法
» 下一篇: Nashorn介绍