1. 概述

在本教程中,我们将介绍 Wire Tap 企业集成模式 (EIP),它可以帮助我们监控流经系统的消息。

这种模式使我们能够 拦截消息,而无需将它们永久地从通道中消耗掉

2. 丝攻模式

Wire Tap 检查在点对点通道上传输的消息。它接收消息,制作副本,然后将其发送到 Tap Destination:

窃听企业集成模式

为了更好地理解这一点,让我们使用ActiveMQCamel创建一个 Spring Boot 应用程序。

3.Maven依赖

让我们添加 camel-spring-boot-dependency

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-dependencies</artifactId>
            <version>${camel.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

现在,我们将添加 camel-spring-boot-starter

<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-spring-boot-starter</artifactId>
</dependency>

要查看流经路由的消息,我们还需要包含 ActiveMQ

<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-activemq-starter</artifactId>
</dependency>

4. 消息交换

让我们创建一个消息对象:

public class MyPayload implements Serializable {
    private String value;
    ...
}

我们将将此消息发送到 direct:source 以启动路由:

try (CamelContext context = new DefaultCamelContext()) {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
    connectionFactory.setTrustAllPackages(true);
    context.addComponent("direct", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
    addRoute(context);

    try (ProducerTemplate template = context.createProducerTemplate()) {
        context.start();

        MyPayload payload = new MyPayload("One");
        template.sendBody("direct:source", payload);
        Thread.sleep(10000);
    } finally {
        context.stop();
    }
}

接下来,我们将添加路线并点击目的地。

5.利用交易所

我们将使用 wireTap 方法来设置Tap Destination 的端点URI。 Camel 不会等待 wireTap 的响应,因为它将消息交换模式 设置 InOnly 。 Wire Tap 处理器在 单独的线程 上处理它

wireTap("direct:tap").delay(1000)

Camel 的 Wire Tap 节点在点击交易所时支持两种风格:

5.1.传统窃听器

让我们添加传统的 Wire Tap 路由:

RoutesBuilder traditionalWireTapRoute() {
    return new RouteBuilder() {
        public void configure() {

            from("direct:source").wireTap("direct:tap")
                .delay(1000)
                .bean(MyBean.class, "addTwo")
                .to("direct:destination");

            from("direct:tap").log("Tap Wire route: received");

            from("direct:destination").log("Output at destination: '${body}'");
        }
    };
}

在这里,Camel 只会 复制 Exchange它不会进行深度克隆 。所有副本都可以共享原始交换中的对象。

同时处理多个消息时,可能 损坏最终的有效负载 。我们可以在将有效负载传递到 Tap Destination 之前创建一个深度克隆,以防止这种情况发生。

5.2.发送新的交换

Wire Tap EIP 支持 表达式处理器 ,预先填充了交换的副本。 表达式 只能用于设置消息正文。

处理器 变体赋予交换如何填充的全部权力(设置属性、标头等)。

让我们在有效负载中实现深度克隆:

public class MyPayload implements Serializable {

    private String value;
    ...
    public MyPayload deepClone() {
        MyPayload myPayload = new MyPayload(value);
        return myPayload;
   }
}

现在,让我们使用原始交换的副本作为输入来实现 Processor 类:

public class MyPayloadClonePrepare implements Processor {

    public void process(Exchange exchange) throws Exception {
        MyPayload myPayload = exchange.getIn().getBody(MyPayload.class);
        exchange.getIn().setBody(myPayload.deepClone());
        exchange.getIn().setHeader("date", new Date());
    }
}

我们将在 wireTap 之后立即使用 onPrepare 来调用它:

RoutesBuilder newExchangeRoute() throws Exception {
    return new RouteBuilder() {
        public void configure() throws Exception {

        from("direct:source").wireTap("direct:tap")
            .onPrepare(new MyPayloadClonePrepare())
            .end()
            .delay(1000);

        from("direct:tap").bean(MyBean.class, "addThree");
        }
     };
}

六,结论

在本文中,我们实现了 Wire Tap 模式来监视通过某些消息端点传递的消息。使用 Apache Camel 的 wireTap ,我们复制消息并将其发送到不同的端点,而不改变现有的流。

Camel 支持两种方式来挖掘交易所。在传统的 Wire Tap 中,原始交换被复制。第二,我们可以创建一个新的交换。我们可以使用 表达式 用消息正文的新值填充这个新的交换,或者我们可以使用 处理器 设置标头(以及可选的正文)。

代码示例可在 GitHub 上获取。