1. 概述
在本教程中,我们将介绍 Wire Tap 企业集成模式 (EIP),它可以帮助我们监控流经系统的消息。
这种模式使我们能够 拦截消息,而无需将它们永久地从通道中消耗掉 。
2. 丝攻模式
Wire Tap 检查在点对点通道上传输的消息。它接收消息,制作副本,然后将其发送到 Tap Destination:
为了更好地理解这一点,让我们使用ActiveMQ和Camel创建一个 Spring Boot 应用程序。
3.Maven依赖
让我们添加 camel-spring-boot-dependency :
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-dependencies</artifactId>
<version>${camel.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
现在,我们将添加 camel-spring-boot-starter :
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
</dependency>
要查看流经路由的消息,我们还需要包含 ActiveMQ :
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-activemq-starter</artifactId>
</dependency>
4. 消息交换
让我们创建一个消息对象:
public class MyPayload implements Serializable {
private String value;
...
}
我们将将此消息发送到 direct:source 以启动路由:
try (CamelContext context = new DefaultCamelContext()) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
connectionFactory.setTrustAllPackages(true);
context.addComponent("direct", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
addRoute(context);
try (ProducerTemplate template = context.createProducerTemplate()) {
context.start();
MyPayload payload = new MyPayload("One");
template.sendBody("direct:source", payload);
Thread.sleep(10000);
} finally {
context.stop();
}
}
接下来,我们将添加路线并点击目的地。
5.利用交易所
我们将使用 wireTap 方法来设置Tap Destination 的端点URI。 Camel 不会等待 wireTap 的响应,因为它将消息交换模式 设置 为 InOnly 。 Wire Tap 处理器在 单独的线程 上处理它 :
wireTap("direct:tap").delay(1000)
Camel 的 Wire Tap 节点在点击交易所时支持两种风格:
5.1.传统窃听器
让我们添加传统的 Wire Tap 路由:
RoutesBuilder traditionalWireTapRoute() {
return new RouteBuilder() {
public void configure() {
from("direct:source").wireTap("direct:tap")
.delay(1000)
.bean(MyBean.class, "addTwo")
.to("direct:destination");
from("direct:tap").log("Tap Wire route: received");
from("direct:destination").log("Output at destination: '${body}'");
}
};
}
在这里,Camel 只会 复制 Exchange – 它不会进行深度克隆 。所有副本都可以共享原始交换中的对象。
同时处理多个消息时,可能 会 损坏最终的有效负载 。我们可以在将有效负载传递到 Tap Destination 之前创建一个深度克隆,以防止这种情况发生。
5.2.发送新的交换
Wire Tap EIP 支持 表达式 或 处理器 ,预先填充了交换的副本。 表达式 只能用于设置消息正文。
处理器 变体赋予交换如何填充的全部权力(设置属性、标头等)。
让我们在有效负载中实现深度克隆:
public class MyPayload implements Serializable {
private String value;
...
public MyPayload deepClone() {
MyPayload myPayload = new MyPayload(value);
return myPayload;
}
}
现在,让我们使用原始交换的副本作为输入来实现 Processor 类:
public class MyPayloadClonePrepare implements Processor {
public void process(Exchange exchange) throws Exception {
MyPayload myPayload = exchange.getIn().getBody(MyPayload.class);
exchange.getIn().setBody(myPayload.deepClone());
exchange.getIn().setHeader("date", new Date());
}
}
我们将在 wireTap 之后立即使用 onPrepare 来调用它:
RoutesBuilder newExchangeRoute() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
from("direct:source").wireTap("direct:tap")
.onPrepare(new MyPayloadClonePrepare())
.end()
.delay(1000);
from("direct:tap").bean(MyBean.class, "addThree");
}
};
}
六,结论
在本文中,我们实现了 Wire Tap 模式来监视通过某些消息端点传递的消息。使用 Apache Camel 的 wireTap ,我们复制消息并将其发送到不同的端点,而不改变现有的流。
Camel 支持两种方式来挖掘交易所。在传统的 Wire Tap 中,原始交换被复制。第二,我们可以创建一个新的交换。我们可以使用 表达式 用消息正文的新值填充这个新的交换,或者我们可以使用 处理器 设置标头(以及可选的正文)。
代码示例可在 GitHub 上获取。