消息路由
本章介绍了使用 Spring Integration 路由消息的详细信息。
路由器
本节介绍路由器的工作原理。它包括以下主题:
概述
路由器是许多消息传递体系结构中的关键元素。它们从消息通道消费消息,并根据一组条件将每个消费消息转发到一个或多个不同的消息通道。
Spring Integration 提供以下路由器:
路由器实现共享许多配置参数。但是,路由器之间存在某些差异。此外,配置参数的可用性取决于路由器是在链内部还是外部使用。为了提供快速概览,所有可用属性都列在以下两个表格中。
下表显示了可用于链外路由器的配置参数:
属性 | 路由器 | 标头值路由器 | xpath 路由器 | 有效载荷类型路由器 | 收件人列表路由 | 异常类型路由器 |
---|---|---|---|---|---|---|
应用序列 |
||||||
默认输出通道 |
||||||
分辨率要求 |
||||||
忽略发送失败 |
||||||
暂停 |
||||||
ID |
||||||
自动启动 |
||||||
输入通道 |
||||||
命令 |
||||||
方法 |
||||||
参考 |
||||||
表达 |
||||||
标题名称 |
||||||
评估为字符串 |
||||||
xpath 表达式引用 |
||||||
转换器 |
下表显示了可用于链内路由器的配置参数:
属性 | 路由器 | 标头值路由器 | xpath 路由器 | 有效载荷类型路由器 | 收件人列表路由器 | 异常类型路由器 |
---|---|---|---|---|---|---|
应用序列 |
||||||
默认输出通道 |
||||||
分辨率要求 |
||||||
忽略发送失败 |
||||||
暂停 |
||||||
ID |
||||||
自动启动 |
||||||
输入通道 |
||||||
命令 |
||||||
方法 |
||||||
参考 |
||||||
表达 |
||||||
标题名称 |
||||||
评估为字符串 |
||||||
xpath 表达式引用 |
||||||
转换器 |
从 Spring Integration 2.1 开始,路由器参数在所有路由器实现中都更加标准化。因此,一些小的更改可能会破坏旧的基于 Spring Integration 的应用程序。 从 Spring Integration 2.1 开始,该 在进行这些更改之前,该 如果您确实希望静默发送消息,您可以设置 |
常用路由器参数
本节介绍所有路由器参数共有的参数(在本章前面显示的两个表格中勾选了所有框的参数)。
链的内部和外部
以下参数对链内外的所有路由器均有效。
apply-sequence
-
此属性指定是否应将序列号和大小标头添加到每条消息。此可选属性默认为
false
. default-output-channel
-
如果设置,则此属性提供对通道的引用,如果通道解析未能返回任何通道,则应将消息发送到该通道。如果没有提供默认输出通道,路由器会抛出异常。如果您想以静默方式删除这些消息,请将默认输出通道属性值设置为
nullChannel
.default-output-channel
仅向if 发送消息,resolution-required
并且false
通道未解析。 resolution-required
-
此属性指定通道名称是否必须始终成功解析为存在的通道实例。如果设置为
true
,MessagingException
则在无法解析通道时引发 a。将此属性设置为false
会导致忽略任何不可解析的通道。此可选属性默认为true
.消息仅发送到 default-output-channel
,如果指定,何时resolution-required
是false
并且通道未解析。 ignore-send-failures
-
如果设置为
true
,发送到消息通道的失败将被忽略。如果设置为false
,MessageDeliveryException
则抛出 a ,并且如果路由器解析多个通道,则任何后续通道都不会收到消息。此属性的确切行为取决于
Channel
消息发送到的类型。例如,当使用直接通道(单线程)时,发送失败可能是由更下游的组件抛出的异常引起的。但是,当向简单的队列通道(异步)发送消息时,抛出异常的可能性相当小。虽然大多数路由器路由到单个通道,但它们可以返回多个通道名称。例如 recipient-list-router
, 正是这样做的。如果true
在只路由到单个通道的路由器上将此属性设置为,则任何导致的异常都会被吞没,这通常没有什么意义。在这种情况下,最好在流入口点捕获错误流中的异常。因此,当路由器实现返回多个通道名称时 ,将ignore-send-failures
属性设置为通常更有意义,因为失败的通道之后的其他通道仍会收到消息。true
此属性默认为
false
. timeout
-
该
timeout
属性指定向目标消息通道发送消息时等待的最长时间(以毫秒为单位)。默认情况下,发送操作无限期阻塞。
顶层(链外)
以下参数仅对链外的所有顶级路由器有效。
id
-
标识底层 Spring bean 定义,在路由器的情况下,它是
EventDrivenConsumer
or的一个实例PollingConsumer
,具体取决于路由器input-channel
是 aSubscribableChannel
还是 aPollableChannel
。这是一个可选属性。 auto-startup
-
这个“生命周期”属性表明该组件是否应该在应用程序上下文启动期间启动。此可选属性默认为
true
. input-channel
-
此端点的接收消息通道。
order
-
此属性定义此端点作为订阅者连接到通道时的调用顺序。当该通道使用故障转移调度策略时,这一点尤其重要。当此端点本身是具有队列的通道的轮询使用者时,它没有任何作用。
路由器实现
由于基于内容的路由通常需要一些特定于域的逻辑,因此大多数用例都需要 Spring Integration 的选项,以便通过使用 XML 命名空间支持或注释来委托给 POJO。这两个将在后面讨论。但是,我们首先介绍几个满足常见要求的实现。
PayloadTypeRouter
APayloadTypeRouter
将消息发送到由有效负载类型映射定义的通道,如以下示例所示:
<bean id="payloadTypeRouter"
class="org.springframework.integration.router.PayloadTypeRouter">
<property name="channelMapping">
<map>
<entry key="java.lang.String" value-ref="stringChannel"/>
<entry key="java.lang.Integer" value-ref="integerChannel"/>
</map>
</property>
</bean>
PayloadTypeRouter
Spring Integration 提供的命名空间也支持配置(请参阅 参考资料Namespace Support
),它通过将<router/>
配置及其相应的实现(使用<bean/>
元素定义)组合成一个更简洁的配置元素,从本质上简化了配置。以下示例显示了PayloadTypeRouter
与上述配置等效但使用命名空间支持的配置:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="stringChannel" />
<int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>
以下示例显示了在 Java 中配置的等效路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
使用 Java DSL 时,有两种选择。
首先,您可以定义路由器对象,如前面的示例所示:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlows.from("routingChannel")
.route(router())
.get();
}
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
请注意,路由器可以是但不一定是@Bean
. 如果它不是@Bean
.
其次,您可以在 DSL 流本身内定义路由功能,如以下示例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlows.from("routingChannel")
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(String.class, "stringChannel")
.channelMapping(Integer.class, "integerChannel"))
.get();
}
HeaderValueRouter
AHeaderValueRouter
根据各个标头值映射将消息发送到通道。创建a 时HeaderValueRouter
,它会使用要评估的标头的名称进行初始化。标头的值可能是以下两种情况之一:
-
任意值
-
频道名称
如果它是任意值,则需要将这些标头值附加到通道名称的映射。否则,不需要额外的配置。
Spring Integration 提供了一个简单的基于命名空间的 XML 配置来配置HeaderValueRouter
. 以下示例演示了HeaderValueRouter
何时需要将标头值映射到通道的配置:
<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:mapping value="someHeaderValue" channel="channelA" />
<int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>
在解析过程中,上例中定义的路由器可能会遇到通道解析失败,导致异常。如果您想抑制此类异常并将未解决的消息发送到default-output-channel
设置resolution-required
为false
.
通常,标头值未显式映射到通道的消息将发送到default-output-channel
. 但是,当标头值映射到通道名称但无法解析通道时,将resolution-required
属性设置为会false
导致将此类消息路由到default-output-channel
.
从 Spring Integration 2.1 开始,该属性已从 更改ignore-channel-name-resolution-failures 为resolution-required 。属性resolution-required 默认为true .
|
以下示例显示了在 Java 中配置的等效路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
使用 Java DSL 时,有两种选择。首先,您可以定义路由器对象,如前面的示例所示:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlows.from("routingChannel")
.route(router())
.get();
}
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
请注意,路由器可以是但不一定是@Bean
. 如果它不是@Bean
.
其次,您可以在 DSL 流本身内定义路由功能,如以下示例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlows.from("routingChannel")
.route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
m -> m
.channelMapping("someHeaderValue", "channelA")
.channelMapping("someOtherHeaderValue", "channelB"),
e -> e.id("headerValueRouter"))
.get();
}
不需要将标头值映射到通道名称的配置,因为标头值本身代表通道名称。以下示例显示了不需要将标头值映射到通道名称的路由器:
<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>
从 Spring Integration 2.1 开始,解析通道的行为更加明确。例如,如果您省略该 基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。如果您真的想删除消息,您还必须 |
RecipientListRouter
ARecipientListRouter
将每个接收到的消息发送到一个静态定义的消息通道列表。以下示例创建一个RecipientListRouter
:
<bean id="recipientListRouter"
class="org.springframework.integration.router.RecipientListRouter">
<property name="channels">
<list>
<ref bean="channel1"/>
<ref bean="channel2"/>
<ref bean="channel3"/>
</list>
</property>
</bean>
Spring Integration 还为RecipientListRouter
配置提供命名空间支持(请参阅命名空间支持),如以下示例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel"
timeout="1234"
ignore-send-failures="true"
apply-sequence="true">
<int:recipient channel="channel1"/>
<int:recipient channel="channel2"/>
</int:recipient-list-router>
以下示例显示了在 Java 中配置的等效路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
RecipientListRouter router = new RecipientListRouter();
router.setSendTimeout(1_234L);
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channel1");
router.addRecipient("channel2");
router.addRecipient("channel3");
return router;
}
以下示例显示了使用 Java DSL 配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlows.from("routingChannel")
.routeToRecipients(r -> r
.applySequence(true)
.ignoreSendFailures(true)
.recipient("channel1")
.recipient("channel2")
.recipient("channel3")
.sendTimeout(1_234L))
.get();
}
此处的“应用序列”标志与发布订阅通道的效果相同,并且与发布订阅通道一样,默认情况下它在recipient-list-router . 有关详细信息,请参阅PublishSubscribeChannel 配置。
|
配置 a 时的另一个方便选项RecipientListRouter
是使用 Spring 表达式语言 (SpEL) 支持作为各个接收者通道的选择器。这样做类似于在“链”的开头使用过滤器来充当“选择性消费者”。但是,在这种情况下,它们都相当简洁地组合到路由器的配置中,如以下示例所示:
<int:recipient-list-router id="customRouter" input-channel="routingChannel">
<int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
<int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>
在前面的配置中,selector-expression
评估由属性标识的 SpEL 表达式以确定该收件人是否应包含在给定输入消息的收件人列表中。表达式的评估结果必须是布尔值。如果未定义此属性,则频道始终位于收件人列表中。
RecipientListRouterManagement
从 4.1 版开始,RecipientListRouter
提供了多种操作来在运行时动态操作收件人。这些管理操作是RecipientListRouterManagement
通过@ManagedResource
注解来呈现的。它们可通过使用控制总线和 JMX 获得,如以下示例所示:
<control-bus input-channel="controlBus"/>
<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
<recipient channel="channel1"/>
</recipient-list-router>
<channel id="channel2"/>
messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");
从应用程序启动simpleRouter
,只有一个channel1
收件人。但是在addRecipient
命令之后,channel2
添加了收件人。这是一个“注册对作为消息的一部分的东西感兴趣”的用例,当我们可能在某个时间段对来自路由器的消息感兴趣时,我们正在订阅recipient-list-router
并在某个时候决定取消订阅。
由于 的运行时管理操作<recipient-list-router>
,它可以<recipient>
从一开始就无需任何配置。RecipientListRouter
在这种情况下,当消息没有匹配的收件人时, 的行为是相同的。如果defaultOutputChannel
已配置,则将消息发送到那里。否则MessageDeliveryException
抛出。
XPath 路由器
XPath 路由器是 XML 模块的一部分。请参阅使用 XPath 路由 XML 消息。
路由和错误处理
Spring Integration 还提供了一个特殊的基于类型的路由器,称为ErrorMessageExceptionTypeRouter
路由错误消息(定义为实例的消息payload
)Throwable
。
ErrorMessageExceptionTypeRouter
类似于PayloadTypeRouter
. 事实上,它们几乎是相同的。唯一的区别是,在PayloadTypeRouter
导航有效负载实例的实例层次结构(例如,payload.getClass().getSuperclass()
)以查找最具体的类型和通道映射时,ErrorMessageExceptionTypeRouter
导航“异常原因”的层次结构(例如,payload.getCause()
)以查找最具体的Throwable
类型或通道映射并用于与类或任何超类mappingClass.isInstance(cause)
匹配。cause
在这种情况下,通道映射顺序很重要。因此,如果需要获取 aIllegalArgumentException 而非 a的映射RuntimeException ,则必须首先在路由器上配置最后一个。
|
从 4.3 版ErrorMessageExceptionTypeRouter 开始,在初始化阶段加载所有映射类以快速为ClassNotFoundException .
|
以下示例显示了以下示例配置ErrorMessageExceptionTypeRouter
:
<int:exception-type-router input-channel="inputChannel"
default-output-channel="defaultChannel">
<int:mapping exception-type="java.lang.IllegalArgumentException"
channel="illegalChannel"/>
<int:mapping exception-type="java.lang.NullPointerException"
channel="npeChannel"/>
</int:exception-type-router>
<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />
配置通用路由器
Spring Integration 提供了一个通用路由器。您可以将它用于通用路由(与 Spring Integration 提供的其他路由器相反,每个路由器都有某种形式的专业化)。
使用 XML 配置基于内容的路由器
该router
元素提供了一种将路由器连接到输入通道的方法,并且还接受可选default-output-channel
属性。该ref
属性引用自定义路由器实现的 bean 名称(必须扩展AbstractMessageRouter
)。以下示例显示了三个通用路由器:
<int:router ref="payloadTypeRouter" input-channel="input1"
default-output-channel="defaultOutput1"/>
<int:router ref="recipientListRouter" input-channel="input2"
default-output-channel="defaultOutput2"/>
<int:router ref="customRouter" input-channel="input3"
default-output-channel="defaultOutput3"/>
<beans:bean id="customRouterBean" class="org.foo.MyCustomRouter"/>
或者,ref
可以指向包含@Router
注释的 POJO(稍后显示),或者您可以将ref
与显式方法名称结合使用。指定方法会应用@Router
本文档后面的注释部分中描述的相同行为。ref
以下示例定义了一个路由器,该路由器在其属性中指向 POJO :
<int:router input-channel="input" ref="somePojo" method="someMethod"/>
ref
如果在其他定义中引用了自定义路由器实现,我们通常建议使用属性<router>
。但是,如果自定义路由器实现的范围应为 的单个定义<router>
,您可以提供内部 bean 定义,如以下示例所示:
<int:router method="someMethod" input-channel="input3"
default-output-channel="defaultOutput3">
<beans:bean class="org.foo.MyCustomRouter"/>
</int:router>
不允许在同一配置中
同时使用ref 属性和内部处理程序定义。<router> 这样做会产生模棱两可的条件并引发异常。
|
如果ref 属性引用了一个扩展的bean AbstractMessageProducingHandler (比如框架本身提供的路由器),那么配置会被优化为直接引用路由器。在这种情况下,每个ref 属性必须引用一个单独的 bean 实例(或一个prototype -scoped bean)或使用内部<bean/> 配置类型。但是,仅当您未在路由器 XML 定义中提供任何特定于路由器的属性时,此优化才适用。如果您无意中从多个 bean 中引用了相同的消息处理程序,您会得到一个配置异常。
|
以下示例显示了在 Java 中配置的等效路由器:
@Bean
@Router(inputChannel = "routingChannel")
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}
};
}
以下示例显示了使用 Java DSL 配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlows.from("routingChannel")
.route(myCustomRouter())
.get();
}
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}
};
}
或者,您可以路由来自消息有效负载的数据,如以下示例所示:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlows.from("routingChannel")
.route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")
.get();
}
路由器和 Spring 表达式语言 (SpEL)
有时,路由逻辑可能很简单,为它编写一个单独的类并将其配置为 bean 可能看起来有点矫枉过正。从 Spring Integration 2.0 开始,我们提供了一种替代方案,让您可以使用 SpEL 来实现以前需要自定义 POJO 路由器的简单计算。
有关 Spring 表达式语言的更多信息,请参阅Spring Framework Reference Guide 中的相关章节。 |
通常,对 SpEL 表达式求值并将其结果映射到通道,如以下示例所示:
<int:router input-channel="inChannel" expression="payload.paymentType">
<int:mapping value="CASH" channel="cashPaymentChannel"/>
<int:mapping value="CREDIT" channel="authorizePaymentChannel"/>
<int:mapping value="DEBIT" channel="authorizePaymentChannel"/>
</int:router>
以下示例显示了在 Java 中配置的等效路由器:
@Router(inputChannel = "routingChannel")
@Bean
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");
router.setChannelMapping("CASH", "cashPaymentChannel");
router.setChannelMapping("CREDIT", "authorizePaymentChannel");
router.setChannelMapping("DEBIT", "authorizePaymentChannel");
return router;
}
以下示例显示了在 Java DSL 中配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlows.from("routingChannel")
.route("payload.paymentType", r -> r
.channelMapping("CASH", "cashPaymentChannel")
.channelMapping("CREDIT", "authorizePaymentChannel")
.channelMapping("DEBIT", "authorizePaymentChannel"))
.get();
}
为了进一步简化,SpEL 表达式可以计算为通道名称,如下面的表达式所示:
<int:router input-channel="inChannel" expression="payload + 'Channel'"/>
在前面的配置中,结果通道由 SpEL 表达式计算,该表达式将 的值payload
与文字String
'Channel' 连接起来。
SpEL 用于配置路由器的另一个优点是表达式可以返回 a Collection
,从而有效地使每个<router>
收件人列表成为路由器。每当表达式返回多个通道值时,都会将消息转发到每个通道。以下示例显示了这样的表达式:
<int:router input-channel="inChannel" expression="headers.channels"/>
在上述配置中,如果消息包含名称为“channels”的标头,并且该标头的值是List
通道名称中的一个,则将消息发送到列表中的每个通道。当您需要选择多个通道时,您可能还会发现集合投影和集合选择表达式很有用。有关详细信息,请参阅:
使用注释配置路由器
当使用@Router
注解一个方法时,该方法可能返回一个MessageChannel
或一个String
类型。在后一种情况下,端点解析通道名称,就像它解析默认输出通道一样。此外,该方法可以返回单个值或集合。如果返回集合,则将回复消息发送到多个通道。总而言之,以下方法签名都是有效的:
@Router
public MessageChannel route(Message message) {...}
@Router
public List<MessageChannel> route(Message message) {...}
@Router
public String route(Foo payload) {...}
@Router
public List<String> route(Foo payload) {...}
除了基于负载的路由之外,还可以基于消息头中可用的元数据作为属性或属性来路由消息。在这种情况下,带有注释的方法@Router
可能包含带有注释的参数@Header
,该参数映射到标头值,如以下示例所示并记录在注释支持中:
@Router
public List<String> route(@Header("orderStatus") OrderStatus status)
有关基于 XML 的消息的路由,包括 XPath 支持,请参阅XML 支持 - 处理 XML 有效负载。 |
有关路由器配置的更多信息,另请参阅Java DSL 章节中的消息路由器。
动态路由器
Spring Integration 为常见的基于内容的路由用例提供了许多不同的路由器配置,以及将自定义路由器实现为 POJO 的选项。例如,PayloadTypeRouter
提供了一种简单的方法来配置路由器,该路由器根据传入消息的有效负载类型计算通道,HeaderValueRouter
同时在配置路由器时提供相同的便利,该路由器通过评估特定消息头的值来计算通道。还有基于表达式 (SpEL) 的路由器,其中通道是基于评估表达式来确定的。所有这些类型的路由器都表现出一些动态特性。
但是,这些路由器都需要静态配置。即使在基于表达式的路由器的情况下,表达式本身也被定义为路由器配置的一部分,这意味着对相同值操作的相同表达式总是导致计算相同通道。这在大多数情况下是可以接受的,因为这样的路线是明确定义的,因此是可预测的。但是有时我们需要动态更改路由器配置,以便消息流可以路由到不同的通道。
例如,您可能希望关闭系统的某些部分以进行维护,并临时将消息重新路由到不同的消息流。作为另一个示例,您可能希望通过添加另一个路由来处理更具体的类型java.lang.Number
(在 的情况下PayloadTypeRouter
)来为您的消息流引入更多粒度。
不幸的是,使用静态路由器配置来实现这些目标中的任何一个,您都必须关闭整个应用程序,更改路由器的配置(更改路由),然后重新启动应用程序。这显然不是任何人想要的解决方案。
动态路由器模式描述了您可以动态更改或配置路由器而无需关闭系统或单个路由器的机制。
在详细了解 Spring Integration 如何支持动态路由之前,我们需要考虑路由器的典型流程:
-
计算一个通道标识符,它是路由器收到消息后计算的一个值。通常,它是 String 或实际
MessageChannel
. -
将通道标识符解析为通道名称。我们将在本节后面描述此过程的细节。
-
将通道名称解析为实际
MessageChannel
如果步骤 1 产生了 的实际实例,那么动态路由方面就无能为力了MessageChannel
,因为MessageChannel
是任何路由器工作的最终产品。但是,如果第一步生成的通道标识符不是 的实例MessageChannel
,那么您有很多可能的方法来影响MessageChannel
. 考虑以下负载类型路由器的示例:
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="channel1" />
<int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>
在负载类型路由器的上下文中,前面提到的三个步骤将实现如下:
-
计算作为有效负载类型的完全限定名称的通道标识符(例如,
java.lang.String
)。 -
将通道标识符解析为通道名称,其中上一步的结果用于从
mapping
元素中定义的有效负载类型映射中选择适当的值。 -
将通道名称解析为 的实际实例,作为对由上一步的结果标识
MessageChannel
的应用程序上下文(希望是 a )中的 bean 的引用。MessageChannel
换句话说,每个步骤都支持下一步,直到过程完成。
现在考虑一个标头值路由器的示例:
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
<int:mapping value="foo" channel="fooChannel" />
<int:mapping value="bar" channel="barChannel" />
</int:header-value-router>
现在我们可以考虑这三个步骤如何用于标头值路由器:
-
计算一个通道标识符,它是由
header-name
属性标识的标头的值。 -
将通道标识符 a 解析为通道名称,其中上一步的结果用于从
mapping
元素中定义的通用映射中选择适当的值。 -
将通道名称解析为 的实际实例,作为对由上一步的结果标识
MessageChannel
的应用程序上下文(希望是 a )中的 bean 的引用。MessageChannel
两种不同路由器类型的前两种配置看起来几乎相同。但是,如果您查看替代配置,HeaderValueRouter
我们会清楚地看到没有mapping
子元素,如以下清单所示:
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
但是,配置仍然完全有效。那么自然的问题就是第二步的映射呢?
第二步现在是可选的。如果mapping
未定义,则在第一步中计算的通道标识符值将自动视为channel name
,现在解析为实际的MessageChannel
,如第三步中一样。这也意味着第二步是为路由器提供动态特征的关键步骤之一,因为它引入了一个过程,可以让您更改通道标识符解析为通道名称的方式,从而影响确定最终的过程MessageChannel
来自初始通道标识符的实例。
例如,在前面的配置中,假设testHeader
值为 'kermit',现在是通道标识符(第一步)。由于此路由器中没有映射,因此无法将此通道标识符解析为通道名称(第二步),并且此通道标识符现在被视为通道名称。但是,如果有一个映射但具有不同的值怎么办?最终结果仍然是相同的,因为如果无法通过将通道标识符解析为通道名称的过程确定新值,则通道标识符将成为通道名称。
剩下的就是第三步,将通道名称(“kermit”)解析MessageChannel
为由该名称标识的实际实例。这基本上涉及对提供的名称的 bean 查找。现在所有包含 header-value 对的消息testHeader=kermit
都将被路由到MessageChannel
其 bean 名称(其id
)为“kermit”的。
但是,如果您想将这些消息路由到“simpson”频道怎么办?显然更改静态配置是可行的,但这样做也需要关闭系统。但是,如果您可以访问频道标识符映射,则可以在 header-value 对 now 的位置引入一个新映射kermit=simpson
,从而让第二步将“kermit”视为频道标识符,同时将其解析为“辛普森”作为频道姓名。
这显然适用于PayloadTypeRouter
,您现在可以重新映射或删除特定的有效负载类型映射。事实上,它适用于所有其他路由器,包括基于表达式的路由器,因为它们的计算值现在有机会通过第二步解析为实际的channel name
.
任何作为 子类的路由器AbstractMappingMessageRouter
(包括大多数框架定义的路由器)都是动态路由器,因为channelMapping
是在级别定义的AbstractMappingMessageRouter
。该地图的 setter 方法与“setChannelMapping”和“removeChannelMapping”方法一起作为公共方法公开。这些允许您在运行时更改、添加和删除路由器映射,只要您有对路由器本身的引用。这也意味着您可以通过 JMX(请参阅JMX 支持)或 Spring Integration 控制总线(请参阅控制总线)功能公开这些相同的配置选项。
回退到频道键作为频道名称,灵活方便。但是,如果您不信任消息创建者,恶意行为者(了解系统)可能会创建路由到意外通道的消息。例如,如果 key 设置为路由器输入通道的通道名称,这样的消息将被路由回路由器,最终导致堆栈溢出错误。因此,您可能希望禁用此功能(将channelKeyFallback 属性设置为false ),并在需要时更改映射。
|
使用控制总线管理路由器映射
管理路由器映射的一种方法是通过控制总线模式,它公开了一个控制通道,您可以向该通道发送控制消息以管理和监视 Spring Integration 组件,包括路由器。
有关控制总线的更多信息,请参阅控制总线。 |
通常,您会发送一条控制消息,要求在特定托管组件(例如路由器)上调用特定操作。以下托管操作(方法)特定于更改路由器解析过程:
-
public void setChannelMapping(String key, String channelName)
channel identifier
:允许您在和之间添加新的或修改现有的映射channel name
-
public void removeChannelMapping(String key)
channel identifier
:让您删除特定的通道映射,从而断开和之间的关系channel name
请注意,这些方法可用于简单的更改(例如更新单个路由或添加或删除路由)。但是,如果您想删除一个路由并添加另一个路由,则更新不是原子的。这意味着路由表可能在更新之间处于不确定状态。从 4.0 版开始,您现在可以使用控制总线自动更新整个路由表。以下方法可让您这样做:
-
public Map<String, String>getChannelMappings()
:返回当前映射。 -
public void replaceChannelMappings(Properties channelMappings)
:更新映射。请注意,channelMappings
参数是一个Properties
对象。这种安排让控制总线命令使用内置的StringToPropertiesConverter
,如以下示例所示:
"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"
请注意,每个映射都由换行符 ( \n
) 分隔。setChannelMappings
对于地图的编程更改,出于类型安全的考虑
,我们建议您使用该方法。replaceChannelMappings
忽略不是String
对象的键或值。
使用 JMX 管理路由器映射
您还可以使用 Spring 的 JMX 支持来公开路由器实例,然后使用您最喜欢的 JMX 客户端(例如,JConsole)来管理这些操作(方法)以更改路由器的配置。
有关 Spring Integration 的 JMX 支持的更多信息,请参阅JMX 支持。 |
路由单
从版本 4.1 开始,Spring Integration 提供了路由单企业集成模式的实现。它被实现为一个消息头,当没有为端点指定an 时routingSlip
,它用于确定AbstractMessageProducingHandler
实例中的下一个通道。outputChannel
这种模式在复杂的动态情况下很有用,因为当配置多个路由器来确定消息流变得困难时。当消息到达没有 的端点时output-channel
,将routingSlip
咨询 以确定将消息发送到的下一个通道。当路由单用尽时,replyChannel
恢复正常处理。
路由单的配置作为一个HeaderEnricher
选项提供 - 一个包含path
条目的分号分隔的路由单,如以下示例所示:
<util:properties id="properties">
<beans:prop key="myRoutePath1">channel1</beans:prop>
<beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>
<context:property-placeholder properties-ref="properties"/>
<header-enricher input-channel="input" output-channel="process">
<routing-slip
value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>
前面的例子有:
-
一种
<context:property-placeholder>
配置,用于证明路由清单中的条目path
可以指定为可解析的键。 -
<header-enricher>
<routing-slip>
子元素用于填充RoutingSlipHeaderValueMessageProcessor
处理HeaderEnricher
程序。 -
RoutingSlipHeaderValueMessageProcessor
接受一String
组已解析的路由单条目path
并返回(从processMessage()
) asingletonMap
与path
askey
和0
as initialroutingSlipIndex
。
Routing Slippath
条目可以包含MessageChannel
bean 名称、RoutingSlipRouteStrategy
bean 名称和 Spring 表达式 (SpEL)。在第一次调用时RoutingSlipHeaderValueMessageProcessor
检查每个路由清单path
条目。它将条目(不是应用程序上下文中的 bean 名称)转换为实例。
条目被多次调用,直到它们返回 null 或空。BeanFactory
processMessage
ExpressionEvaluatingRoutingSlipRouteStrategy
RoutingSlipRouteStrategy
String
由于getOutputChannel
流程中涉及路由单,因此我们有一个请求-回复上下文。已RoutingSlipRouteStrategy
被引入来确定下一个outputChannel
使用requestMessage
和reply
对象。此策略的实现应在应用程序上下文中注册为 bean,并且其 bean 名称用于路由单path
。提供ExpressionEvaluatingRoutingSlipRouteStrategy
了实现。它接受一个 SpEL 表达式,并使用一个内部ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply
对象作为评估上下文的根对象。这是为了避免EvaluationContext
每次ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()
调用的创建开销。它是一个简单的 Java bean,具有两个属性:Message<?> request
和Object reply
. 通过这个表达式实现,我们可以使用 SpEL 指定路由path
表条目(例如,@routingSlipRoutingPojo.get(request, reply)
和request.headers[myRoutingSlipChannel]
) 并避免为RoutingSlipRouteStrategy
.
参数requestMessage 始终是
. Message<?> 根据上下文,回复对象可以是Message<?> 、AbstractIntegrationMessageBuilder 或任意应用程序域对象(例如,当它由服务激活器调用的 POJO 方法返回时)。在前两种情况下,在使用 SpEL(或 Java 实现)时,通常的Message 属性 (payload 和headers ) 可用。对于任意域对象,这些属性不可用。出于这个原因,如果结果用于确定下一条路径,则在将路由单与 POJO 方法结合使用时要小心。
|
如果分布式环境中涉及路由单,我们建议不要对路由单使用内联表达式path 。此建议适用于分布式环境,例如跨 JVM 应用程序,request-reply 通过消息代理(例如AMQP 支持或JMS 支持)或在集成流中使用持久性MessageStore (消息存储)。框架用于RoutingSlipHeaderValueMessageProcessor 将它们转换为ExpressionEvaluatingRoutingSlipRouteStrategy 对象,并在routingSlip 消息头中使用它们。由于这个类不是Serializable (它不能是,因为它依赖于BeanFactory ),所以整个Message 变得不可序列化,并且在任何分布式操作中,我们最终都会得到一个NotSerializableException . 要克服此限制,ExpressionEvaluatingRoutingSlipRouteStrategy 请使用所需的 SpEL 注册一个 bean,并在路由单path 配置中使用其 bean 名称。
|
对于 Java 配置,您可以将RoutingSlipHeaderValueMessageProcessor
实例添加到HeaderEnricher
bean 定义中,如以下示例所示:
@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
"@routingSlipRoutingPojo.get(request, reply)",
"routingSlipRoutingStrategy",
"request.headers[myRoutingSlipChannel]",
"finishChannel")));
}
当端点产生回复并且没有outputChannel
定义时,路由滑动算法的工作方式如下:
-
routingSlipIndex
用于从路由清单列表中获取值path
。 -
如果 from 的
routingSlipIndex
值为String
,则用于从 获取 beanBeanFactory
。 -
如果返回的 bean 是 的实例
MessageChannel
,则将其用作下一个outputChannel
,并routingSlipIndex
在回复消息头中递增 (路由清单path
条目保持不变)。 -
如果返回的 bean 是 的一个实例
RoutingSlipRouteStrategy
并且它getNextPath
不返回空String
值,那么该结果将用作下一个 bean 的名称outputChannel
。routingSlipIndex
保持不变。 -
如果
RoutingSlipRouteStrategy.getNextPath
返回一个空的String
ornull
,routingSlipIndex
则 递增,并为下一个 Routing Slip项getOutputChannelFromRoutingSlip
递归调用。path
-
如果下一个传送单
path
条目不是String
,它必须是 的实例RoutingSlipRouteStrategy
。 -
当
routingSlipIndex
超过路由path
清单列表的大小时,算法将移动到标准replyChannel
标题的默认行为。
流程管理器企业集成模式
企业集成模式包括流程管理器模式。您现在可以通过使用封装在RoutingSlipRouteStrategy
路由清单中的自定义流程管理器逻辑轻松实现此模式。除了 bean 名称之外,RoutingSlipRouteStrategy
还可以返回任何MessageChannel
对象,并且不要求此MessageChannel
实例是应用程序上下文中的 bean。这样,当无法预测应该使用哪个通道时,我们可以提供强大的动态路由逻辑。AMessageChannel
可以在 中创建RoutingSlipRouteStrategy
并返回。FixedSubscriberChannel
对于这种情况,具有关联MessageHandler
实现的A是一个很好的组合。例如,您可以路由到Reactive Streams,如以下示例所示:
@Bean
public PollableChannel resultsChannel() {
return new QueueChannel();
}
@Bean
public RoutingSlipRouteStrategy routeStrategy() {
return (requestMessage, reply) -> requestMessage.getPayload() instanceof String
? new FixedSubscriberChannel(m ->
Mono.just((String) m.getPayload())
.map(String::toUpperCase)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))
: new FixedSubscriberChannel(m ->
Mono.just((Integer) m.getPayload())
.map(v -> v * 2)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));
}
筛选
消息过滤器用于根据某些标准(例如消息头值或消息内容本身)来决定是否Message
应该传递或删除 a。因此,消息过滤器类似于路由器,不同之处在于,对于从过滤器的输入通道接收到的每条消息,相同的消息可能会或可能不会发送到过滤器的输出通道。与路由器不同,它不决定将消息发送到哪个消息通道,而只决定是否发送消息。
正如我们在本节后面描述的那样,过滤器还支持丢弃通道。在某些情况下,它可以根据布尔条件扮演一个非常简单的路由器(或“交换机”)的角色。 |
在 Spring Integration 中,您可以将消息过滤器配置为委托给MessageSelector
接口实现的消息端点。该界面本身非常简单,如以下清单所示:
public interface MessageSelector {
boolean accept(Message<?> message);
}
构造MessageFilter
函数接受一个选择器实例,如以下示例所示:
MessageFilter filter = new MessageFilter(someSelector);
结合命名空间和 SpEL,您可以用很少的 Java 代码配置强大的过滤器。
使用 XML 配置过滤器
您可以使用该<filter>
元素来创建消息选择端点。除了input-channel
和output-channel
属性,它还需要一个ref
属性。ref
可以指向一个实现,如以下MessageSelector
示例所示:
<int:filter input-channel="input" ref="selector" output-channel="output"/>
<bean id="selector" class="example.MessageSelectorImpl"/>
或者,您可以添加method
属性。在这种情况下,ref
属性可以引用任何对象。引用的方法可能需要Message
入站消息的类型或负载类型。该方法必须返回一个布尔值。如果该方法返回“true”,则将消息发送到输出通道。以下示例显示如何配置使用该method
属性的过滤器:
<int:filter input-channel="input" output-channel="output"
ref="exampleObject" method="someBooleanReturningMethod"/>
<bean id="exampleObject" class="example.SomeObject"/>
如果选择器或适配的 POJO 方法返回false
,则一些设置控制对被拒绝消息的处理。默认情况下(如果按照前面的示例进行配置),被拒绝的消息会被静默丢弃。如果拒绝反而会导致错误情况,请将throw-exception-on-rejection
属性设置为true
,如以下示例所示:
<int:filter input-channel="input" ref="selector"
output-channel="output" throw-exception-on-rejection="true"/>
如果您希望将被拒绝的消息路由到特定通道,请将该引用提供为discard-channel
,如以下示例所示:
<int:filter input-channel="input" ref="selector"
output-channel="output" discard-channel="rejectedMessages"/>
另请参阅建议过滤器。
消息过滤器通常与发布-订阅通道结合使用。许多过滤器端点可能订阅了同一个通道,它们决定是否将消息传递给下一个端点,下一个端点可以是任何受支持的类型(例如服务激活器)。这为使用具有单个点对点输入通道和多个输出通道的消息路由器的更主动方法提供了一种反应性替代方案。 |
ref
如果在其他定义中引用了自定义过滤器实现,我们建议使用属性<filter>
。但是,如果自定义过滤器实现的范围仅限于单个<filter>
元素,则应提供内部 bean 定义,如以下示例所示:
<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
<beans:bean class="org.foo.MyCustomFilter"/>
</filter>
不允许在同一配置中
同时使用ref 属性和内部处理程序定义,因为它会创建模棱两可的条件并引发异常。<filter> |
如果该ref 属性引用了一个扩展的bean MessageFilter (例如框架本身提供的过滤器),则通过将输出通道直接注入过滤器bean来优化配置。在这种情况下,每个都ref 必须是一个单独的 bean 实例(或一个prototype -scoped bean)或使用内部<bean/> 配置类型。但是,仅当您未在过滤器 XML 定义中提供任何特定于过滤器的属性时,此优化才适用。如果您无意中从多个 bean 中引用了相同的消息处理程序,您会得到一个配置异常。
|
随着 SpEL 支持的引入,Spring Integration 将expression
属性添加到过滤器元素中。对于简单的过滤器,它可以用来完全避免使用 Java,如以下示例所示:
<int:filter input-channel="input" expression="payload.equals('nonsense')"/>
作为表达式属性的值传递的字符串被评估为 SpEL 表达式,并且消息在评估上下文中可用。如果您必须在应用程序上下文的范围内包含表达式的结果,则可以使用SpEL 参考文档#{}
中定义的表示法,如以下示例所示:
<int:filter input-channel="input"
expression="payload.matches(#{filterPatterns.nonsensePattern})"/>
如果表达式本身需要是动态的,您可以使用“表达式”子元素。这提供了一定程度的间接性,用于通过ExpressionSource
. 这是一个您可以直接实现的策略接口,或者您可以依赖 Spring Integration 中可用的版本,该版本从“资源包”加载表达式,并可以在给定的秒数后检查修改。所有这些都在以下配置示例中进行了演示,如果基础文件已被修改,则表达式可以在一分钟内重新加载:
<int:filter input-channel="input" output-channel="output">
<int:expression key="filterPatterns.example" source="myExpressions"/>
</int:filter>
<beans:bean id="myExpressions" id="myExpressions"
class="o.s.i.expression.ReloadableResourceBundleExpressionSource">
<beans:property name="basename" value="config/integration/expressions"/>
<beans:property name="cacheSeconds" value="60"/>
</beans:bean>
如果ExpressionSource
bean 被命名expressionSource
,则不需要在<expression>
元素上提供`source` 属性。但是,在前面的示例中,我们将其显示为完整性。
'config/integration/expressions.properties' 文件(或任何更具体的版本,具有以加载资源包的典型方式解析的语言环境扩展名)可以包含键/值对,如以下示例所示:
filterPatterns.example=payload > 100
所有这些expression 用作属性或子元素的示例也可以应用于转换器、路由器、拆分器、服务激活器和标头丰富器元素。给定组件类型的语义和角色将影响评估结果的解释,就像解释方法调用的返回值一样。例如,表达式可以返回将被路由器组件视为消息通道名称的字符串。但是,在 Spring Integration 中的所有核心 EIP 组件中,将表达式作为根对象评估表达式并解析以“@”为前缀的 bean 名称的底层功能是一致的。
|
使用注释配置过滤器
以下示例显示了如何使用注释配置过滤器:
public class PetFilter {
...
@Filter (1)
public boolean dogsOnly(String input) {
...
}
}
1 | 指示此方法将用作过滤器的注释。如果要将此类用作过滤器,则必须指定它。 |
XML 元素提供的所有配置选项也可用于@Filter
注释。
过滤器可以从 XML 中显式引用,或者,如果@MessageEndpoint
注释是在类上定义的,则可以通过类路径扫描自动检测到。
另请参阅使用注释为端点提供建议。
分路器
拆分器是一个组件,其作用是将消息分割成多个部分,并将生成的消息发送到独立处理。很多时候,他们是包含聚合器的管道中的上游生产者。
编程模型
用于执行拆分的 API 由一个基类AbstractMessageSplitter
. 它是一种MessageHandler
封装拆分器通用功能的实现,例如在生成的消息上填充适当的消息头(CORRELATION_ID
、SEQUENCE_SIZE
和SEQUENCE_NUMBER
)。这种填充可以跟踪消息及其处理结果(在典型情况下,这些标头被复制到由各种转换端点生成的消息中)。然后可以使用这些值,例如,由组合消息处理器使用。
以下示例显示了以下内容的摘录AbstractMessageSplitter
:
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在应用程序中实现特定的拆分器,您可以扩展AbstractMessageSplitter
并实现该splitMessage
方法,其中包含用于拆分消息的逻辑。返回值可以是以下之一:
-
一个
Collection
或一个消息数组或一个Iterable
(或Iterator
)迭代消息。在这种情况下,消息作为消息发送(在CORRELATION_ID
,SEQUENCE_SIZE
和SEQUENCE_NUMBER
填充之后)。使用这种方法可以让您获得更多控制权——例如,在拆分过程中填充自定义消息头。 -
一个
Collection
或一组非消息对象,或一个Iterable
(orIterator
) 迭代非消息对象。它的工作原理与前一种情况类似,不同之处在于每个集合元素都用作消息有效负载。使用这种方法可以让您专注于域对象,而无需考虑消息传递系统并生成更易于测试的代码。 -
一个
Message
或非消息对象(但不是集合或数组)。它的工作方式与前面的情况类似,只是发送了一条消息。
在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值如前所述进行解释。输入参数可能是一个Message
POJO 或一个简单的 POJO。在后一种情况下,拆分器接收传入消息的有效负载。我们推荐这种方法,因为它将代码与 Spring Integration API 分离,并且通常更容易测试。
迭代器
从 4.1 版本开始,AbstractMessageSplitter
支持拆分的Iterator
类型。value
请注意,在Iterator
(或Iterable
)的情况下,我们无权访问基础项目的数量,并且SEQUENCE_SIZE
标题设置为0
。这意味着SequenceSizeReleaseStrategy
an的默认值<aggregator>
将不起作用,并且CORRELATION_ID
from 的组splitter
将不会被释放;它将保持为incomplete
. 在这种情况下,您应该使用适当的自定义ReleaseStrategy
或依赖send-partial-result-on-expiry
与group-timeout
或一起使用MessageGroupStoreReaper
。
从 5.0 版开始,如果可能的话,AbstractMessageSplitter
提供protected obtainSizeIfPossible()
了允许确定Iterable
和对象大小的方法。Iterator
例如XPathMessageSplitter
可以确定底层NodeList
对象的大小。从 5.0.9 版本开始,此方法还正确返回com.fasterxml.jackson.core.TreeNode
.
Iterator
对象有助于避免在拆分之前在内存中构建整个集合。例如,当MGET
使用迭代或流从某个外部系统(例如 DataBase 或 FTP )填充底层项目时。
流和通量
从 5.0 版开始,支持用于拆分AbstractMessageSplitter
的 JavaStream
和 Reactive StreamsPublisher
类型。value
在这种情况下,目标Iterator
是建立在它们的迭代功能之上的。
此外,如果拆分器的输出通道是 a 的实例ReactiveStreamsSubscribableChannel
,则AbstractMessageSplitter
生成Flux
结果而不是 a Iterator
,并且输出通道订阅此结果Flux
,以根据下游流量需求进行基于背压的拆分。
从 5.2 版开始,拆分器支持discardChannel
发送那些拆分函数返回空容器(集合、数组、流Flux
等)的请求消息的选项。在这种情况下,没有要迭代的项目以发送到outputChannel
. 拆分结果null
保留为流结束指示符。
使用 XML 配置拆分器
可以通过 XML 配置拆分器,如下所示:
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | 拆分器的 ID 是可选的。 |
2 | 对应用程序上下文中定义的 bean 的引用。bean 必须实现拆分逻辑,如前面部分所述。可选的。如果未提供对 bean 的引用,则假定到达的消息的有效负载input-channel 是 的实现,java.util.Collection 并且默认拆分逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到output-channel . |
3 | 实现拆分逻辑的方法(在 bean 上定义)。可选的。 |
4 | 分配器的输入通道。必需的。 |
5 | 拆分器将传入消息拆分结果发送到的通道。可选(因为传入的消息可以自己指定回复通道)。 |
6 | 拆分结果为空时请求消息发送到的通道。可选(将在null 结果的情况下停止)。 |
ref
如果可以在其他定义中引用自定义拆分器实现,我们建议使用属性<splitter>
。但是,如果自定义拆分器处理程序实现的范围应为 的单个定义,则<splitter>
可以配置内部 bean 定义,如下例所示:
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
不允许在同一配置中
同时使用ref 属性和内部处理程序定义,因为它会创建模棱两可的条件并导致引发异常。<int:splitter> |
如果ref 属性引用了扩展的 bean AbstractMessageProducingHandler (例如框架本身提供的拆分器),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个ref 必须是单独的 bean 实例(或prototype -scoped bean)或使用内部<bean/> 配置类型。但是,仅当您未在拆分器 XML 定义中提供任何拆分器特定属性时,此优化才适用。如果您无意中从多个 bean 中引用了相同的消息处理程序,您会得到一个配置异常。
|
聚合器
基本上是拆分器的镜像,聚合器是一种消息处理程序,它接收多条消息并将它们组合成一条消息。实际上,聚合器通常是包含拆分器的管道中的下游消费者。
从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。它必须保存要聚合的消息,并确定何时可以聚合完整的消息组。为此,它需要一个MessageStore
.
功能性
聚合器通过关联和存储一组相关消息来组合一组相关消息,直到该组被认为是完整的。此时,聚合器通过处理整个组创建单个消息并将聚合消息作为输出发送。
实现聚合器需要提供执行聚合的逻辑(即,从多个创建单个消息)。两个相关的概念是相关性和释放。
相关性确定消息如何分组以进行聚合。在 Spring Integration 中,默认情况下会根据IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头完成关联。相同的消息IntegrationMessageHeaderAccessor.CORRELATION_ID
被分组在一起。但是,您可以自定义关联策略以允许以其他方式指定消息应如何组合在一起。为此,您可以实现一个CorrelationStrategy
(本章稍后介绍)。
为了确定准备处理一组消息的点,ReleaseStrategy
请查阅 a。当序列中包含的所有消息都存在时,聚合器的默认释放策略会根据IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
标头释放组。您可以通过提供对自定义ReleaseStrategy
实现的引用来覆盖此默认策略。
编程模型
聚合 API 由许多类组成:
-
接口
MessageGroupProcessor
及其子类:MethodInvokingAggregatingMessageGroupProcessor
和ExpressionEvaluatingMessageGroupProcessor
-
ReleaseStrategy
接口及其默认实现:SimpleSequenceSizeReleaseStrategy
-
CorrelationStrategy
接口及其默认实现:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
AggregatingMessageHandler
(的子类)AbstractCorrelatingMessageHandler
是一个MessageHandler
实现,封装了聚合器(和其他相关用例)的通用功能,如下所示:
-
将消息关联到要聚合的组中
-
将这些消息保留在 a 中,
MessageStore
直到可以释放该组 -
决定何时可以释放该组
-
将已发布的组聚合为单个消息
-
识别和响应过期组
决定如何将消息组合在一起的责任委托给一个CorrelationStrategy
实例。决定是否可以释放消息组的责任委托给一个ReleaseStrategy
实例。
以下清单显示了基础的简要亮点AbstractAggregatingMessageGroupProcessor
(实现该aggregatePayloads
方法的责任留给开发人员):
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
请参阅DefaultAggregatingMessageGroupProcessor
和作为ExpressionEvaluatingMessageGroupProcessor
的MethodInvokingMessageGroupProcessor
开箱即用实现AbstractAggregatingMessageGroupProcessor
。
从 5.2 版开始,Function<MessageGroup, Map<String, Object>>
可以使用一种策略AbstractAggregatingMessageGroupProcessor
来合并和计算(聚合)输出消息的标头。该DefaultAggregateHeadersFunction
实现可用于返回组之间没有冲突的所有标头的逻辑;组内的一条或多条消息上缺少标头不被视为冲突。省略冲突的标题。与新引入的 一起DelegatingMessageGroupProcessor
,此函数用于任何任意(非AbstractAggregatingMessageGroupProcessor
)MessageGroupProcessor
实现。本质上,框架将提供的函数注入到AbstractAggregatingMessageGroupProcessor
实例中,并将所有其他实现包装到DelegatingMessageGroupProcessor
. AbstractAggregatingMessageGroupProcessor
the和 the之间的逻辑差异DelegatingMessageGroupProcessor
后者在调用委托策略之前不会提前计算标头,并且如果委托返回 aMessage
或,则不会调用该函数AbstractIntegrationMessageBuilder
。在这种情况下,框架假定目标实现已经负责生成一组正确的标题,这些标题填充到返回的结果中。该Function<MessageGroup, Map<String, Object>>
策略可用作headers-function
XML 配置的参考属性、AggregatorSpec.headersFunction()
Java DSL 的选项和AggregatorFactoryBean.setHeadersFunction()
纯 Java 配置的选项。
由CorrelationStrategy
拥有,AbstractCorrelatingMessageHandler
并且具有基于IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头的默认值,如以下示例所示:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是DefaultAggregatingMessageGroupProcessor
. 它创建一个单一Message
的,其有效负载是List
为给定组接收的有效负载中的一个。这适用于具有拆分器、发布-订阅通道或上游接收者列表路由器的简单分散-收集实现。
在这种情况下使用发布-订阅通道或收件人列表路由器时,请务必启用该apply-sequence 标志。这样做会添加必要的标题:CORRELATION_ID 、SEQUENCE_NUMBER 和SEQUENCE_SIZE . Spring Integration 中的拆分器默认启用该行为,但对于发布-订阅通道或收件人列表路由器未启用该行为,因为这些组件可能在不需要这些标头的各种上下文中使用。
|
在为应用程序实现特定聚合器策略时,您可以扩展AbstractAggregatingMessageGroupProcessor
和实现该aggregatePayloads
方法。但是,有更好的解决方案,与 API 耦合较少,用于实现聚合逻辑,可以通过 XML 或通过注释进行配置。
一般来说,任何 POJO 都可以实现聚合算法,如果它提供了一个接受单个java.util.List
作为参数的方法(也支持参数化列表)。调用此方法来聚合消息,如下所示:
-
如果参数是 a
java.util.Collection<T>
并且参数类型 T 可分配给Message
,则为聚合而累积的整个消息列表将发送到聚合器。 -
如果参数是非参数化的
java.util.Collection
或参数类型不可分配给Message
,则该方法接收累积消息的有效负载。 -
如果返回类型不可分配给
Message
,则将其视为Message
框架自动创建的 a 的有效负载。
为了简化代码并促进低耦合、可测试性等最佳实践,实现聚合逻辑的首选方式是通过 POJO 并使用 XML 或注释支持在应用程序中对其进行配置。 |
从 5.3 版开始,在处理消息组之后,将针对具有多个嵌套级别的正确拆分器-聚合器场景AbstractCorrelatingMessageHandler
执行MessageBuilder.popSequenceDetails()
消息头修改。仅当消息组发布结果不是消息集合时才会这样做。在这种情况下,目标在构建这些消息时MessageGroupProcessor
负责MessageBuilder.popSequenceDetails()
调用。
如果MessageGroupProcessor
返回 a Message
,则仅当与组中的第一条消息匹配MessageBuilder.popSequenceDetails()
时才会对输出消息执行a 。sequenceDetails
(以前只有当一个普通的有效载荷或AbstractIntegrationMessageBuilder
从MessageGroupProcessor
.
此功能可以由新popSequence
boolean
属性控制,因此MessageBuilder.popSequenceDetails()
在标准拆分器未填充相关详细信息时,可以在某些情况下禁用该功能。从本质上讲,此属性撤消applySequence = true
了AbstractMessageSplitter
. 有关详细信息,请参阅拆分器。
该SimpleMessageGroup.getMessages() 方法返回一个unmodifiableCollection . 因此,如果您的聚合 POJO 方法有一个Collection<Message> 参数,则传入的参数正是该Collection 实例,并且当您将 aSimpleMessageStore 用于聚合器时,该原始参数Collection<Message> 在释放组后被清除。因此,Collection<Message> 如果 POJO 中的变量从聚合器中传递出去,它也会被清除。如果您希望简单地按原样发布该集合以进行进一步处理,则必须构建一个新的Collection (例如,new ArrayList<Message>(messages) )。从 4.3 版开始,框架不再将消息复制到新集合中,以避免不必要的额外对象创建。
|
如果该processMessageGroup
方法MessageGroupProcessor
返回一个集合,它必须是一个Message<?>
对象的集合。在这种情况下,消息是单独发布的。在 4.2 版之前,无法MessageGroupProcessor
通过使用 XML 配置来提供。只有 POJO 方法可以用于聚合。现在,如果框架检测到引用的(或内部)bean 实现MessageProcessor
了 ,则它被用作聚合器的输出处理器。
如果您希望从自定义中释放一组对象MessageGroupProcessor
作为消息的有效负载,您的类应该扩展AbstractAggregatingMessageGroupProcessor
并实现aggregatePayloads()
.
此外,从 4.2 版开始,SimpleMessageGroupProcessor
提供了 a。它返回来自组的消息集合,如前所述,这会导致单独发送已发布的消息。
这让聚合器作为消息屏障工作,到达的消息被保留,直到释放策略触发并且组作为单个消息序列被释放。
ReleaseStrategy
ReleaseStrategy
接口定义如下:
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
一般来说,任何 POJO 都可以实现完成决策逻辑,前提是它提供了一个接受单个java.util.List
作为参数的方法(也支持参数化列表)并返回一个布尔值。每条新消息到达后调用该方法,判断组是否完整,如下:
-
如果参数是 a
java.util.List<T>
并且参数类型T
可分配给Message
,则将组中累积的整个消息列表发送到该方法。 -
如果参数是非参数化
java.util.List
的或参数类型不可分配给Message
,则该方法接收累积消息的有效负载。 -
true
如果消息组准备好聚合,则该方法必须返回,否则返回 false。
以下示例显示了如何将@ReleaseStrategy
注解用于 aList
类型Message
:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例显示了如何将@ReleaseStrategy
注解用于 aList
类型String
:
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
根据前面两个示例中的签名,基于 POJO 的发布策略是传递一个Collection
尚未发布的消息(如果您需要访问整体Message
)或一个Collection
有效负载对象(如果类型参数不是Message
) . 这满足了大多数用例。但是,如果由于某种原因需要访问完整的MessageGroup
,则应提供ReleaseStrategy
接口的实现。
在处理潜在的大型组时,您应该了解这些方法是如何调用的,因为在释放组之前可能会多次调用释放策略。最有效的是 的实现 由于这些原因,对于大型团体,我们建议您实施 |
当组被释放以进行聚合时,其所有尚未释放的消息都将被处理并从组中删除。如果组也是完整的(即,如果来自一个序列的所有消息都已到达,或者如果没有定义序列),则将该组标记为完整。该组的任何新消息都将发送到丢弃通道(如果已定义)。设置expire-groups-upon-completion
为true
(默认为false
)会删除整个组,并且任何新消息(与已删除组具有相同的相关 ID)形成一个新组。MessageGroupStoreReaper
您可以通过将 a与send-partial-result-on-expiry
设置为一起使用来释放部分序列true
。
为了便于丢弃迟到的消息,聚合器必须在组被释放后维护组的状态。这最终会导致内存不足的情况。为避免这种情况,您应该考虑配置 aMessageGroupStoreReaper 以删除组元数据。一旦到达某个点,过期参数应设置为过期组,之后延迟消息预计不会到达。有关配置收割机的信息,请参阅在聚合器中管理状态:MessageGroupStore 。
|
Spring Integration 提供了一个实现ReleaseStrategy
:SimpleSequenceSizeReleaseStrategy
. 此实现参考每个到达消息的SEQUENCE_NUMBER
和SEQUENCE_SIZE
标头来决定消息组何时完成并准备好进行聚合。如前所述,它也是默认策略。
在 5.0 版本之前,默认的发布策略是SequenceSizeReleaseStrategy ,这在大型组中表现不佳。使用该策略,可以检测并拒绝重复的序列号。此操作可能很昂贵。
|
如果您正在聚合大型组,则不需要释放部分组,并且不需要检测/拒绝重复序列,请考虑SimpleSequenceSizeReleaseStrategy
改用 - 对于这些用例来说效率更高,并且是默认设置,因为未指定部分组发布时的版本 5.0 。
聚合大型组
4.3 版本将Collection
a 中消息的默认值更改SimpleMessageGroup
为HashSet
(以前是 a BlockingQueue
)。当从大型组中删除单个消息时,这很昂贵(需要 O(n) 线性扫描)。尽管散列集的删除速度通常要快得多,但对于大型消息来说可能代价高昂,因为必须在插入和删除时都计算散列。如果您有散列成本很高的消息,请考虑使用其他一些集合类型。如UsingMessageGroupFactory
中所讨论的,提供了 aSimpleMessageGroupFactory
以便您可以选择Collection
最适合您需要的。您还可以提供自己的工厂实现来创建其他一些Collection<Message<?>>
.
以下示例显示了如何使用先前的实现和 a 配置聚合器SimpleSequenceSizeReleaseStrategy
:
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点参与聚合器的上游流,则序列大小释放策略(固定或基于sequenceSize 标头)将无法达到其目的,因为来自序列的某些消息可能会被过滤器丢弃。在这种情况下,建议选择另一个ReleaseStrategy ,或使用从丢弃子流发送的补偿消息,该消息在其内容中携带一些信息,以便在自定义完整组功能中跳过。有关详细信息,
请参阅过滤器。 |
关联策略
CorrelationStrategy
接口定义如下:
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个Object
表示用于将消息与消息组相关联的相关键的值。密钥必须满足用于 a 中的密钥的标准,Map
以实现equals()
和hashCode()
。
一般来说,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法的参数(或多个参数)的规则与 a 的规则相同ServiceActivator
(包括对@Header
注释的支持)。该方法必须返回一个值,并且该值不能是null
.
Spring Integration 提供了一个实现CorrelationStrategy
:HeaderAttributeCorrelationStrategy
. 此实现返回消息头之一(其名称由构造函数参数指定)的值作为关联键。默认情况下,关联策略是HeaderAttributeCorrelationStrategy
返回CORRELATION_ID
header 属性的值。如果您有一个想要用于关联的自定义标头名称,您可以在一个实例上对其进行配置,HeaderAttributeCorrelationStrategy
并将其作为聚合器关联策略的参考。
锁定注册表
对组的更改是线程安全的。因此,当您同时发送具有相同关联 ID 的消息时,聚合器中只会处理其中一个消息,从而使其有效地成为每个消息组的单线程。ALockRegistry
用于获取已解析的相关 ID 的锁定。ADefaultLockRegistry
默认使用(在内存中)。为了在使用共享MessageGroupStore
的服务器之间同步更新,您必须配置共享锁注册表。
避免死锁
如上所述,当消息组发生变异(添加或释放消息)时,会持有一个锁。
考虑以下流程:
...->aggregator1-> ... ->aggregator2-> ...
如果有多个线程,并且聚合器共享一个公共锁注册表,则可能会出现死锁。这将导致线程挂起,并jstack <pid>
可能出现以下结果:
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免这个问题:
-
确保每个聚合器都有自己的锁注册表(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须各自具有不同的注册表)
-
使用
ExecutorChannel
orQueueChannel
作为聚合器的输出通道,以便下游流在新线程上运行 -
从版本 5.1.1 开始,将
releaseLockBeforeSend
聚合器属性设置为true
如果由于某种原因,单个聚合器的输出最终被路由回同一个聚合器,也可能导致此问题。当然,上述第一种解决方案不适用于这种情况。 |
在 Java DSL 中配置聚合器
有关如何在 Java DSL 中配置聚合器的信息,请参阅聚合器和重新排序器。
使用 XML 配置聚合器
Spring Integration 支持通过元素配置带有 XML 的聚合<aggregator/>
器。以下示例显示了聚合器的示例:
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合器的 id 是可选的。 |
2 | 生命周期属性指示是否应在应用程序上下文启动期间启动聚合器。可选(默认为“真”)。 |
3 | 聚合器从中接收消息的通道。必需的。 |
4 | 聚合器将聚合结果发送到的通道。可选(因为传入消息本身可以在“replyChannel”消息头中指定回复通道)。 |
5 | 聚合器将超时消息发送到的通道(如果send-partial-result-on-expiry 是false )。可选的。 |
6 | 对 a 的引用,MessageGroupStore 用于将消息组存储在它们的相关键下,直到它们完成。可选的。默认情况下,它是一个易失的内存存储。有关详细信息,请参阅消息存储。 |
7 | 订阅多个句柄时此聚合器的顺序DirectChannel (用于负载平衡目的)。可选的。 |
8 | 指示一旦包含过期消息,就应该聚合过期消息并将其发送到“输出通道”或“回复通道” MessageGroup (请参阅 参考资料MessageGroupStore.expireMessageGroups(long) )。使 a 过期的一种方法MessageGroup 是配置 a MessageGroupStoreReaper 。MessageGroup 但是,您也可以通过调用来过期MessageGroupStore.expireMessageGroups(timeout) 。您可以通过控制总线操作来完成此操作,或者,如果您有对MessageGroupStore 实例的引用,则可以通过调用expireMessageGroups(timeout) . 否则,这个属性本身什么也不做。它仅用作是否丢弃或发送到输出或回复通道的任何仍然在MessageGroup 即将过期的消息的指示符。可选(默认为false )。注意:这个属性可能更恰当地称为send-partial-result-on-timeout ,因为如果组可能实际上不会过期expire-groups-upon-timeout 设置为false 。 |
9 | Message 向output-channel 或发送回复时等待的超时间隔discard-channel 。默认为-1 ,这会导致无限期阻塞。仅当输出通道具有某些“发送”限制时才应用它,例如QueueChannel 具有固定“容量”的 a。在这种情况下,MessageDeliveryException 抛出 a。对于AbstractSubscribableChannel 实现,send-timeout 被忽略。对于group-timeout(-expression) ,MessageDeliveryException 来自计划的过期任务导致该任务被重新计划。可选的。 |
10 | 对实现消息相关(分组)算法的 bean 的引用。bean 可以是CorrelationStrategy 接口或 POJO 的实现。在后一种情况下,correlation-strategy-method 还必须定义属性。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID 标头)。 |
11 | 在 引用的 bean 上定义的方法correlation-strategy 。它实现了相关决策算法。可选,有限制(correlation-strategy 必须存在)。 |
12 | 表示关联策略的 SpEL 表达式。示例:"headers['something']" 。只允许correlation-strategy 或之一。correlation-strategy-expression |
13 | 对应用程序上下文中定义的 bean 的引用。bean 必须实现聚合逻辑,如前所述。可选(默认情况下,聚合消息列表成为输出消息的有效负载)。 |
14 | ref 在属性引用的 bean 上定义的方法。它实现了消息聚合算法。可选(取决于ref 定义的属性)。 |
15 | 对实现发布策略的 bean 的引用。bean 可以是ReleaseStrategy 接口或 POJO 的实现。在后一种情况下,release-strategy-method 还必须定义属性。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header 属性)。 |
16 | release-strategy 在属性引用的 bean 上定义的方法。它实现了完成决策算法。可选,有限制(release-strategy 必须存在)。 |
17 | 表示发布策略的 SpEL 表达式。表达式的根对象是 a MessageGroup 。示例:"size() == 5" 。只允许release-strategy 或之一。release-strategy-expression |
18 | 当设置为true (默认为false )时,已完成的组将从消息存储中删除,让具有相同相关性的后续消息形成一个新组。默认行为是将具有与已完成组相同相关性的消息发送到discard-channel . |
19 | 仅当MessageGroupStoreReaper 为 的 配置了MessageStore a时才适用<aggregator> 。默认情况下,当 aMessageGroupStoreReaper 配置为使部分组过期时,也会删除空组。空组在组正常释放后存在。空组可以检测和丢弃迟到的消息。如果您希望空组的过期时间比部分组过期的时间长,请设置此属性。MessageStore 然后,直到至少在此毫秒数内没有修改空组,才会从 中删除空组。请注意,使空组过期的实际时间也受 reapertimeout 属性的影响,它可能与该值加上超时一样多。 |
20 | org.springframework.integration.util.LockRegistry 对bean的引用。它用于获取一个Lock 基于 的groupId for 并发操作MessageGroup 。默认情况下,使用内部DefaultLockRegistry 。使用分布式LockRegistry ,例如ZookeeperLockRegistry ,确保只有一个聚合器实例可以同时对组进行操作。有关详细信息,请参阅Redis 锁注册表、Gemfire 锁注册表和Zookeeper 锁注册表。 |
21 | 当前消息到达时未释放组时强制MessageGroup 完成的超时(以毫秒为单位) 。ReleaseStrategy 此属性为聚合器提供内置的基于时间的发布策略,如果新消息MessageGroup 在超时时间内未到达,则需要发出部分结果(或丢弃组),该超时时间从最后一个时间开始计算消息到达。要设置从MessageGroup 创建时间开始计算的超时,请参阅group-timeout-expression 信息。当一条新消息到达聚合器时,任何现有ScheduledFuture<?> 的消息MessageGroup 都会被取消。如果ReleaseStrategy 返回false (意思是不释放)和groupTimeout > 0 ,一个新的任务计划使该组到期。我们不建议将此属性设置为零(或负值)。这样做会有效地禁用聚合器,因为每个消息组都会立即完成。但是,您可以使用表达式有条件地将其设置为零(或负值)。有关group-timeout-expression 信息,请参阅。完成期间采取的操作取决于ReleaseStrategy 和send-partial-group-on-expiry 属性。有关详细信息,请参阅聚合器和组超时。它与 'group-timeout-expression' 属性互斥。 |
22 | 计算结果为 a 的 SpEL 表达式,groupTimeout 其中MessageGroup 为计算#root 上下文对象。用于调度MessageGroup 强制完成。如果表达式的计算结果为null ,则不安排完成。如果它的计算结果为零,则该组立即在当前线程上完成。实际上,这提供了动态group-timeout 属性。例如,如果您希望MessageGroup 在创建组后 10 秒后强制完成 a,您可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis() where timestamp is provided by MessageGroup.getTimestamp() as the MessageGroup here is the#root 评估上下文对象。但是请记住,组创建时间可能与首次到达消息的时间不同,具体取决于其他组过期属性的配置。有关group-timeout 更多信息,请参阅。与“组超时”属性互斥。 |
23 | 当组由于超时(或通过 a MessageGroupStoreReaper )完成时,默认情况下该组过期(完全删除)。迟到的消息会启动一个新组。您可以将其设置false 为完成组,但保留其元数据,以便丢弃迟到的消息。空组可以在以后使用 aMessageGroupStoreReaper 和empty-group-min-timeout 属性过期。它默认为“真”。 |
24 | 如果在. TaskScheduler _ MessageGroup _ 如果未提供,则使用在( ) 中注册的默认调度程序 ( ) 。如果指定或未指定,则此属性不适用。MessageGroup groupTimeout taskScheduler ApplicationContext ThreadPoolTaskScheduler group-timeout group-timeout-expression |
25 | 从 4.1 版开始。它允许为forceComplete 操作启动事务。它由 agroup-timeout(-expression) 或 a启动,MessageGroupStoreReaper 不适用于正常add 的release 、 和discard 操作。只有这个子元素 or<expire-advice-chain/> 是允许的。 |
26 | 从4.1 版开始。它允许Advice 为forceComplete 操作配置任何。它由 agroup-timeout(-expression) 或 a启动,MessageGroupStoreReaper 不适用于正常add 的release 、 和discard 操作。只有这个子元素 or<expire-transactional/> 是允许的。Advice 也可以使用 Spring 命名空间在此处配置事务tx 。 |
即将到期的组
有两个与过期(完全删除)组相关的属性。当一个组过期时,没有它的记录,如果新消息到达时具有相同的相关性,则启动一个新组。当一个组完成时(没有过期),空组仍然存在,迟到的消息被丢弃。稍后可以通过将 a
如果一个组没有正常完成,但由于超时而被释放或丢弃,则该组正常过期。从 4.1 版开始,您可以使用
从 5.0 版开始,空组也计划在 从 5.4 版开始,聚合器(和重排序器)可以配置为使孤立组(持久消息存储中的组,否则可能不会被释放)过期。( |
ref
如果自定义聚合器处理程序实现可能在其他<aggregator>
定义中被引用,我们通常建议使用属性。但是,如果自定义聚合器实现仅由 的单个定义使用<aggregator>
,则可以使用内部 bean 定义(从版本 1.0.3 开始)在<aggregator>
元素内配置聚合 POJO,如以下示例所示:
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
不允许在同一配置中
同时使用ref 属性和内部 bean 定义,因为它会创建模棱两可的条件。<aggregator> 在这种情况下,会引发异常。
|
以下示例显示了聚合器 bean 的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 bean 的实现可能如下所示:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
只要这样做有意义,发布策略方法和聚合器方法都可以组合到一个 bean 中。 |
上述示例的关联策略 bean 的实现可能如下所示:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器将按某些标准(在本例中为除以十后的余数)对数字进行分组,并保留该组,直到有效负载提供的数字总和超过某个值。
只要这样做有意义,发布策略方法、相关策略方法和聚合器方法都可以组合在一个 bean 中。(实际上,它们都可以组合起来,也可以任意组合起来。) |
聚合器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,您可以使用SpEL处理各种策略(相关、发布和聚合),如果此类发布策略背后的逻辑相对简单,我们建议您这样做。假设您有一个设计用于接收对象数组的遗留组件。我们知道,默认发布策略将所有聚合消息组合在List
. 现在我们有两个问题。首先,我们需要从列表中提取单个消息。其次,我们需要提取每条消息的有效负载并组装对象数组。以下示例解决了这两个问题:
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
然而,使用 SpEL,这样的需求实际上可以通过单行表达式相对容易地处理,从而使您不必编写自定义类并将其配置为 bean。以下示例显示了如何执行此操作:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载组装一个新集合,然后将其转换为数组,从而实现与早期 Java 代码相同的结果。
在处理自定义发布和关联策略时,您可以应用相同的基于表达式的方法。
您可以将简单的关联逻辑实现为 SpEL 表达式并在属性中配置它,而不是在CorrelationStrategy
属性中为自定义定义 bean ,如以下示例所示:correlation-strategy
correlation-strategy-expression
correlation-strategy-expression="payload.person.id"
在前面的示例中,我们假设有效负载具有带有 的person
属性,id
该属性将用于关联消息。
同样,对于ReleaseStrategy
,您可以将发布逻辑实现为 SpEL 表达式并在release-strategy-expression
属性中进行配置。评估上下文的根对象是它MessageGroup
本身。List
可以通过使用message
表达式中组的属性来引用消息。
在 5.0 之前的版本中,根对象是 的集合Message<?> ,如前面的示例所示:
|
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SpEL 评估上下文的根对象是MessageGroup
自身,并且您声明,一旦5
该组中存在有效负载为的消息,就应该释放该组。
聚合器和组超时
从 4.0 版开始,引入了两个新的互斥属性:group-timeout
和group-timeout-expression
. 请参阅使用 XML 配置聚合器。ReleaseStrategy
在某些情况下,如果当前消息到达时未释放,您可能需要在超时后发出聚合器结果(或丢弃组) 。为此,该选项允许强制完成groupTimeout
调度,如以下示例所示:MessageGroup
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器按release-strategy-expression
. 如果该特定消息未到达,则groupTimeout
强制组在十秒后完成,只要该组包含至少两条消息。
强制组完成的结果取决于ReleaseStrategy
和send-partial-result-on-expiry
。首先,再次咨询发布策略,看是否要进行正常发布。虽然该组没有改变,但此时ReleaseStrategy
可以决定释放该组。如果释放策略仍然没有释放该组,则它已过期。如果send-partial-result-on-expiry
是true
,(部分)MessageGroup
中的现有消息将作为普通聚合器回复消息发布到output-channel
. 否则,它被丢弃。
groupTimeout
行为和MessageGroupStoreReaper
(请参阅使用 XML 配置聚合器)之间存在差异。reaper周期性地为所有MessageGroup
s启动强制完成。如果在. MessageGroupStore
_ 此外,reaper 可用于删除空组(如果为 false ,则保留空组以丢弃迟到的消息)。groupTimeout
MessageGroup
groupTimeout
expire-groups-upon-completion
从 5.5 版开始,groupTimeoutExpression
可以对java.util.Date
实例进行评估。MessageGroup.getTimestamp()
这在诸如groupTimeoutExpression
根据long
组创建时间(
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注释配置聚合器
以下示例显示了一个配置有注释的聚合器:
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
1 | 指示此方法应用作聚合器的注释。如果将此类用作聚合器,则必须指定它。 |
2 | 表示此方法用作聚合器的发布策略的注解。如果在任何方法上都不存在,则聚合器使用SimpleSequenceSizeReleaseStrategy . |
3 | 指示此方法应用作聚合器的关联策略的注释。如果未指示关联策略,则聚合器使用HeaderAttributeCorrelationStrategy based on CORRELATION_ID 。 |
XML 元素提供的所有配置选项也可用于@Aggregator
注释。
可以从 XML 显式引用聚合器,或者如果在@MessageEndpoint
类上定义,则可以通过类路径扫描自动检测到聚合器。
聚合器组件的注释配置(@Aggregator
和其他)仅涵盖简单的用例,其中大多数默认选项就足够了。如果您在使用注解配置时需要对这些选项进行更多控制,请考虑使用@Bean
定义AggregatingMessageHandler
并用 标记其@Bean
方法@ServiceActivator
,如以下示例所示:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
从 4.2 版开始,AggregatorFactoryBean 可用于简化AggregatingMessageHandler .
|
在聚合器中管理状态:MessageGroupStore
聚合器(以及 Spring Integration 中的一些其他模式)是一种有状态模式,它需要根据在一段时间内到达的一组消息做出决策,所有消息都具有相同的关联键。有状态模式(例如ReleaseStrategy
)中的接口设计是由组件(无论是由框架定义还是由用户定义)应该能够保持无状态的原则驱动的。所有状态都由 承载,MessageGroup
其管理委托给MessageGroupStore
。MessageGroupStore
接口定义如下:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关详细信息,请参阅Javadoc。
在等待触发发布策略的同时累积状态信息,并且该事件可能永远不会发生MessageGroupStore
。MessageGroups
因此,为了防止陈旧的消息滞留,并为易失性存储提供一个挂钩以在应用程序关闭时进行清理,您可以注册回调以在它们过期时MessageGroupStore
应用到它。MessageGroups
该界面非常简单,如以下清单所示:
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过从存储中完全删除组)。
维护这些回调的MessageGroupStore
列表,它根据需要将其应用于时间戳早于作为参数提供的时间的所有消息(参见前面描述的registerMessageGroupExpiryCallback(..)
和expireMessageGroups(..)
方法)。
MessageGroupStore 当您打算依赖功能时,
不要在不同的聚合器组件中使用相同的实例,这一点很重要expireMessageGroups 。每个都根据回调AbstractCorrelatingMessageHandler 注册自己的。这样,每个到期组都可能被错误的聚合器完成或丢弃。从版本 5.0.10 开始,a用于. 反过来,检查该类的实例是否存在,并在回调集中已经存在的情况下用适当的消息记录错误。这种方式框架不允许使用MessageGroupCallback forceComplete() UniqueExpiryCallback AbstractCorrelatingMessageHandler MessageGroupStore MessageGroupStore MessageGroupStore 在不同的聚合器/重排序器中实例化,以避免提到的过期组的副作用不是由特定的相关处理程序创建的。
|
expireMessageGroups
您可以使用超时值调用该方法。任何早于当前时间减去此值的消息都会过期并应用回调。因此,商店的用户定义了消息组“过期”的含义。
为了方便用户,Spring Integration 以 a 的形式为消息过期提供了一个包装器MessageGroupStoreReaper
,如以下示例所示:
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割者是一个Runnable
。在前面的示例中,每十秒调用一次消息组存储的 expire 方法。超时本身是 30 秒。
重要的是要了解 'timeout' 属性MessageGroupStoreReaper 是一个近似值,并受任务调度程序的速率影响,因为此属性仅在MessageGroupStoreReaper 任务的下一个计划执行时检查。例如,如果超时设置为 10 分钟,但MessageGroupStoreReaper 任务计划每小时运行一次,并且MessageGroupStoreReaper 任务的最后一次执行发生在超时前一分钟,则MessageGroup 在接下来的 59 分钟内不会过期。因此,我们建议将速率设置为至少等于超时值或更短。
|
除了 reaper 之外,到期回调在应用程序关闭时通过AbstractCorrelatingMessageHandler
.
AbstractCorrelatingMessageHandler
注册自己的到期回调,这是聚合器的 XML 配置中带有布尔标志的链接send-partial-result-on-expiry
。如果标志设置为true
,则在调用到期回调时,组中尚未释放的任何未标记消息都可以发送到输出通道。
由于MessageGroupStoreReaper 是从计划任务中调用的,并且可能会导致向下游集成流生成消息(取决于sendPartialResultOnExpiry 选项),因此建议TaskScheduler 通过 aMessagePublishingErrorHandler 为处理程序异常提供自定义errorChannel ,正如预期的那样通过常规聚合器发布功能。相同的逻辑适用于组超时功能,它也依赖于TaskScheduler . 有关详细信息,请参阅错误处理。
|
当一个共享 一些 有关 |
通量聚合器
在 5.2 版本中,该FluxAggregatorMessageHandler
组件已被引入。它基于 Project ReactorFlux.groupBy()
和Flux.window()
运营商。传入的消息被发送到由该组件的构造函数中FluxSink
发起的。Flux.create()
如果outputChannel
未提供 或它不是 的实例,则从实现中完成ReactiveStreamsSubscribableChannel
对 main 的订阅。否则它被推迟到由实现完成的订阅。消息通过使用 a作为组键进行分组。默认情况下,查询消息的标题。Flux
Lifecycle.start()
ReactiveStreamsSubscribableChannel
Flux.groupBy()
CorrelationStrategy
IntegrationMessageHeaderAccessor.CORRELATION_ID
默认情况下,每个关闭的窗口都作为Flux
要生成的消息的有效负载释放。此消息包含窗口中第一条消息的所有标题。Flux
必须在下游订阅和处理输出消息有效负载中的此内容。这样的逻辑可以setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
通过FluxAggregatorMessageHandler
. 例如,如果我们想List
在最终消息中包含一个有效负载,我们可以Flux.collectList()
这样配置:
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
有几个选项FluxAggregatorMessageHandler
可以选择合适的窗口策略:
-
setBoundaryTrigger(Predicate<Message<?>>)
- 传播给Flux.windowUntil()
操作员。有关更多信息,请参阅其 JavaDocs。优先于所有其他窗口选项。 -
setWindowSize(int)
andsetWindowSizeFunction(Function<Message<?>, Integer>)
- 传播到Flux.window(int)
orwindowTimeout(int, Duration)
。默认情况下,窗口大小是根据组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
标头计算的。 -
setWindowTimespan(Duration)
- 传播到Flux.window(Duration)
或windowTimeout(int, Duration)
取决于窗口大小配置。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 将转换应用于未包含在公开选项中的任何自定义窗口操作的分组通量的函数。
由于这个组件是一个MessageHandler
实现,它可以简单地用作一个@Bean
定义和一个@ServiceActivator
消息注释。使用 Java DSL,它可以从.handle()
EIP 方法中使用。下面的示例演示了我们如何IntegrationFlow
在运行时注册 a 以及如何将 aFluxAggregatorMessageHandler
与上游拆分器相关联:
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
@SuppressWarnings("unchecked")
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
重测序仪
重排序器与聚合器相关,但用于不同的目的。当聚合器组合消息时,重排序器在不更改消息的情况下传递消息。
功能性
重排序器的工作方式与聚合器类似,因为它使用CORRELATION_ID
来将消息存储在组中。不同之处在于 Resequencer 不以任何方式处理消息。SEQUENCE_NUMBER
相反,它会按照标题值的顺序释放它们。
关于这一点,您可以选择立即释放所有消息(在整个序列之后,根据SEQUENCE_SIZE
和其他可能性)或在有效序列可用时立即释放。(我们将在本章后面讨论“有效序列”的含义。)
重新排序器旨在重新排序具有小间隙的相对较短的消息序列。如果您有大量具有许多间隙的不相交序列,则可能会遇到性能问题。 |
配置重定序器
有关在 Java DSL 中配置重新排序器的信息,请参阅聚合器和重新排序器。
配置重排序器只需要在 XML 中包含适当的元素。
以下示例显示了重新排序器配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
1 | 重定序器的 id 是可选的。 |
2 | 重定序器的输入通道。必需的。 |
3 | 重新排序器将重新排序的消息发送到的通道。可选的。 |
4 | 重定序器将超时消息发送到的通道(如果send-partial-result-on-timeout 设置为false )。可选的。 |
5 | 是在有序序列可用时立即发送还是仅在整个消息组到达后发送。可选的。(默认为false 。) |
6 | 对 a 的引用MessageGroupStore ,可用于将消息组存储在其相关键下,直到它们完成。可选的。(默认为易失性内存存储。) |
7 | 是否应在组到期时发送有序组(即使某些消息丢失)。可选的。(默认为 false。)请参阅在聚合器中管理状态:MessageGroupStore 。 |
8 | Message 向output-channel 或发送回复时等待的超时间隔discard-channel 。默认为-1 ,无限期阻塞。仅当输出通道具有某些“发送”限制时才应用它,例如QueueChannel 具有固定“容量”的 a。在这种情况下,MessageDeliveryException 抛出 a。被send-timeout 忽略的AbstractSubscribableChannel 实现。对于group-timeout(-expression) ,MessageDeliveryException 来自计划的过期任务导致该任务被重新计划。可选的。 |
9 | 对实现消息相关(分组)算法的 bean 的引用。bean 可以是CorrelationStrategy 接口或 POJO 的实现。在后一种情况下,correlation-strategy-method 还必须定义属性。可选的。(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID 标头。) |
10 | 在 bean 上定义correlation-strategy 并实现相关决策算法的方法。可选,有限制(需要correlation-strategy 存在)。 |
11 | 表示关联策略的 SpEL 表达式。示例:"headers['something']" 。只允许correlation-strategy 或之一。correlation-strategy-expression |
12 | 对实现发布策略的 bean 的引用。bean 可以是ReleaseStrategy 接口或 POJO 的实现。在后一种情况下,release-strategy-method 还必须定义属性。可选(默认情况下,聚合器将使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header 属性)。 |
13 | 在 bean 上定义release-strategy 并实现完成决策算法的方法。可选,有限制(需要release-strategy 存在)。 |
14 | 表示发布策略的 SpEL 表达式。表达式的根对象是 a MessageGroup 。示例:"size() == 5" 。只允许release-strategy 或之一。release-strategy-expression |
15 | 仅适用于MessageGroupStoreReaper 为<resequencer> MessageStore . 默认情况下,当 aMessageGroupStoreReaper 配置为使部分组过期时,也会删除空组。正常释放组后存在空组。这是为了能够检测和丢弃迟到的消息。如果您希望空组的过期时间比部分组过期的时间长,请设置此属性。MessageStore 然后,直到至少在此毫秒数内没有修改空组,才会从 中删除空组。请注意,使空组过期的实际时间也受 reaper 的 timeout 属性的影响,它可能与该值加上 timeout 一样多。 |
16 | 请参阅使用 XML 配置聚合器。 |
17 | 请参阅使用 XML 配置聚合器。 |
18 | 请参阅使用 XML 配置聚合器。 |
19 | 请参阅使用 XML 配置聚合器。 |
20 | 默认情况下,当组由于超时(或通过 a MessageGroupStoreReaper )完成时,将保留空组的元数据。迟到的消息会立即被丢弃。将此设置为true 以完全删除组。然后,迟到的消息开始一个新的组,直到组再次超时才被丢弃。由于序列范围中的“洞”导致超时,新组永远不会正常释放。稍后可以通过将 aMessageGroupStoreReaper 与empty-group-min-timeout 属性一起使用来使空组过期(完全删除)。从版本 5.0 开始,空组也计划在过期后删除empty-group-min-timeout 。默认值为“假”。 |
另请参阅聚合器到期组以获取更多信息。
由于没有在 Java 类中为重定序器实现自定义行为,因此没有注释支持。 |
消息处理链
这MessageHandlerChain
是一个MessageHandler
可以配置为单个消息端点的实现,同时实际上委托给其他处理程序链,例如过滤器、转换器、拆分器等。当需要以固定的线性进程连接多个处理程序时,这可能会导致更简单的配置。例如,在其他组件之前提供变压器是相当普遍的。类似地,当您在链中的某个其他组件之前提供过滤器时,您实际上创建了一个选择性消费者。在任何一种情况下,链都只需要一个input-channel
和一个output-channel
,从而无需为每个单独的组件定义通道。
主要MessageHandlerChain 是为 XML 配置设计的。对于 Java DSL 来说,IntegrationFlow 定义可以看作是一个链式组件,但它与下面本章描述的概念和原理无关。有关详细信息,请参阅Java DSL。
|
Spring IntegrationFilter 提供了一个布尔属性:throwExceptionOnRejection . 当您在同一个点对点通道上提供多个具有不同接受标准的选择性消费者时,您应该将此值设置为“true”(默认值为false ),以便调度程序知道消息被拒绝并因此尝试将消息传递给其他订阅者。如果没有抛出异常,即使过滤器已丢弃消息以防止进一步处理,调度程序也会显示消息已成功传递。如果您确实想要“丢弃”消息,过滤器的“丢弃通道”可能很有用,因为它确实让您有机会对丢弃的消息执行一些操作(例如将其发送到 JMS 队列或将其写入到日志)。
|
处理程序链简化了配置,同时在内部保持组件之间相同程度的松散耦合,如果在某些时候需要非线性安排,修改配置是微不足道的。
在内部,链扩展为列出的端点的线性设置,由匿名通道分隔。链中不考虑回复通道标头。只有在调用最后一个处理程序之后,才会将结果消息转发到回复通道或链的输出通道。由于这种设置,除最后一个之外的所有处理程序都必须实现该MessageProducer
接口(它提供了一个“setOutputChannel()”方法)。如果设置了outputChannel
on MessageHandlerChain
,则最后一个处理程序只需要一个输出通道。
与其他端点一样,output-channel 是可选的。如果在链的末尾有回复消息,则输出通道优先。但是,如果它不可用,则链处理程序会检查入站消息上的回复通道标头作为后备。
|
在大多数情况下,您不需要MessageHandler
自己实现。下一节重点介绍链元素的命名空间支持。大多数 Spring Integration 端点,例如服务激活器和转换器,都适合在MessageHandlerChain
.
配置链
该<chain>
元素提供了一个input-channel
属性。如果链中的最后一个元素能够产生回复消息(可选),它也支持一个output-channel
属性。子元素是过滤器、转换器、分离器和服务激活器。最后一个元素也可以是路由器或出站通道适配器。以下示例显示了链定义:
<int:chain input-channel="input" output-channel="output">
<int:filter ref="someSelector" throw-exception-on-rejection="true"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:service-activator ref="someService" method="someMethod"/>
</int:chain>
<header-enricher>
前面示例中使用的元素设置了一个消息头,该消息头以消息thing1
的值命名thing2
。Transformer
标头丰富器是仅涉及标头值的专业化。您可以通过实现一个MessageHandler
对标头进行修改并将其连接为 bean 的方法来获得相同的结果,但标头丰富器是一个更简单的选项。
可以将<chain>
其配置为消息流的最后一个“封闭箱”消费者。对于此解决方案,您可以将其放在 <chain> 的末尾一些 <outbound-channel-adapter>,如以下示例所示:
<int:chain input-channel="input">
<int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" />
<int:service-activator ref="someService" method="someMethod"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:logging-channel-adapter level="INFO" log-full-message="true"/>
</int:chain>
不允许的属性和元素
某些属性,例如 对于 Spring Integration 核心组件,XML 模式本身强制执行其中一些约束。但是,对于非核心组件或您自己的自定义组件,这些约束由 XML 命名空间解析器而非 XML 模式强制执行。 这些 XML 命名空间解析器约束是在 Spring Integration 2.2 中添加的。如果您尝试使用不允许的属性和元素,XML 命名空间解析器会抛出一个 |
使用“id”属性
从 Spring Integration 3.0 开始,如果给链元素一个id
属性,则元素的 bean 名称是链id
和id
元素本身的组合。没有属性的元素id
不会注册为 bean,但每个元素都会被赋予一个componentName
包含链的id
。考虑以下示例:
<int:chain id="somethingChain" input-channel="input">
<int:service-activator id="somethingService" ref="someService" method="someMethod"/>
<int:object-to-json-transformer/>
</int:chain>
在前面的示例中:
-
根
<chain>
元素有一个id
'somethingChain'。因此,AbstractEndpoint
实现(PollingConsumer
或EventDrivenConsumer
,取决于input-channel
类型)bean 将此值作为其 bean 名称。 -
MessageHandlerChain
bean 获取一个 bean 别名('somethingChain.handler'),它允许从BeanFactory
. -
<service-activator>
不是一个完全成熟的消息传递端点(它不是一个orPollingConsumer
)EventDrivenConsumer
。它是MessageHandler
.<chain>
在这种情况下,注册的 bean 名称BeanFactory
是 'somethingChain$child.somethingService.handler'。 -
的
componentName
thisServiceActivatingHandler
采用相同的值,但没有 '.handler' 后缀。它变成了“somethingChain$child.somethingService”。 -
最后一个
<chain>
子组件<object-to-json-transformer>
没有id
属性。它componentName
基于其在<chain>
. 在这种情况下,它是“somethingChain$child#1”。(名称的最后一个元素是链中的顺序,以“#0”开头)。请注意,此转换器未在应用程序上下文中注册为 bean,因此它不会获得beanName
. 然而,它componentName
有一个对日志记录和其他目的有用的值。
id 在元素上提供显式属性<chain> 以简化日志中子组件的标识并提供对它们的访问BeanFactory 等
很有用。 |
从链中调用链
有时,您需要从一个链中对另一个链进行嵌套调用,然后返回并在原始链中继续执行。为此,您可以通过包含 <gateway> 元素来使用消息传递网关,如以下示例所示:
<int:chain id="main-chain" input-channel="in" output-channel="out">
<int:header-enricher>
<int:header name="name" value="Many" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
<int:gateway request-channel="inputA"/>
</int:chain>
<int:chain id="nested-chain-a" input-channel="inputA">
<int:header-enricher>
<int:header name="name" value="Moe" />
</int:header-enricher>
<int:gateway request-channel="inputB"/>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>
<int:chain id="nested-chain-b" input-channel="inputB">
<int:header-enricher>
<int:header name="name" value="Jack" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>
在前面的示例中,nested-chain-a
在处理结束时main-chain
由那里配置的“网关”元素调用。在 中nested-chain-a
,对 a 的调用nested-chain-b
是在标头丰富之后进行的。然后流程返回以在 中完成执行nested-chain-b
。最后,流程返回到main-chain
。当<gateway>
元素的嵌套版本在链中定义时,它不需要该service-interface
属性。相反,它将消息处于其当前状态并将其放置在request-channel
属性中定义的通道上。当该网关启动的下游流完成时,aMessage
返回到网关并继续其在当前链中的旅程。
分散-聚集
从 4.1 版开始,Spring Integration 提供了分散-聚集企业集成模式的实现。它是一个复合端点,其目标是向收件人发送消息并聚合结果。如企业集成模式中所述,它是“最佳报价”等场景的组件,我们需要从多个供应商处请求信息,并决定哪一个为我们提供所请求项目的最佳条款。
以前,可以通过使用分立组件来配置模式。这种增强带来了更方便的配置。
TheScatterGatherHandler
是一个请求-回复端点,它结合了 a PublishSubscribeChannel
(或 a RecipientListRouter
)和 a AggregatingMessageHandler
。请求消息被发送到scatter
通道,并ScatterGatherHandler
等待聚合器发送给outputChannel
.
功能性
该Scatter-Gather
模式提出了两种情况:“拍卖”和“分配”。在这两种情况下,aggregation
功能是相同的,并提供所有可用的选项AggregatingMessageHandler
。(实际上,ScatterGatherHandler
只需要 aAggregatingMessageHandler
作为构造函数参数。)有关更多信息,请参阅聚合器。
拍卖
拍卖Scatter-Gather
变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是PublishSubscribeChannel
with apply-sequence="true"
。然而,这个通道可以是任何MessageChannel
实现(就像request-channel
在ContentEnricher
- 见Content Enricher中的情况一样)。correlationStrategy
但是,在这种情况下,您应该为该aggregation
函数创建自己的自定义。
分配
分发Scatter-Gather
变体基于RecipientListRouter
(参见RecipientListRouter
)以及RecipientListRouter
. 这是第二个ScatterGatherHandler
构造函数参数。如果您只想依赖 和 的默认值,correlationStrategy
您应该指定. 否则,您应该为. 与变体(拍卖变体)不同,拥有一个选项可以根据消息过滤目标供应商。使用,提供默认值,并且可以正确释放组。分配选项与拍卖选项互斥。recipient-list-router
aggregator
apply-sequence="true"
correlationStrategy
aggregator
PublishSubscribeChannel
recipient-list-router
selector
apply-sequence="true"
sequenceSize
aggregator
对于拍卖和分发变体,请求(分散)消息都带有gatherResultChannel
标题以等待来自aggregator
.
默认情况下,所有供应商都应将其结果发送到replyChannel
标头(通常通过省略output-channel
来自最终端点的 )。但是,gatherChannel
还提供了该选项,让供应商将他们的回复发送到该通道以进行聚合。
配置分散-聚集端点
以下示例显示了 bean 定义的 Java 配置Scatter-Gather
:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们使用接收通道列表来配置RecipientListRouter
distributor
bean 。applySequence="true"
下一个 bean 用于AggregatingMessageHandler
. 最后,我们将这两个 bean 注入到ScatterGatherHandler
bean 定义中并将其标记为 a@ServiceActivator
以将 scatter-gather 组件连接到集成流中。
以下示例显示如何<scatter-gather>
使用 XML 命名空间配置端点:
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
1 | 端点的 ID。bean使用ScatterGatherHandler 别名注册id + '.handler' 。bean使用RecipientListRouter 别名注册id + '.scatterer' 。AggregatingMessageHandler`bean is registered with an alias of `id + '.gatherer' 。_ 可选的。(BeanFactory 生成默认id 值。) |
2 | 生命周期属性表明端点是否应该在应用程序上下文初始化期间启动。此外,如果提供了 a ,则ScatterGatherHandler 还实现了Lifecycle 启动和停止gatherEndpoint ,这将在内部创建gather-channel 。可选的。(默认为true 。) |
3 | 接收请求消息以处理它们的通道ScatterGatherHandler 。必需的。 |
4 | ScatterGatherHandler 将聚合结果发送到的通道。可选的。replyChannel (传入的消息可以在消息头中自己指定回复通道)。 |
5 | 将拍卖场景的分散消息发送到的通道。可选的。<scatterer> 与子元素互斥。 |
6 | 接收来自每个供应商的聚合回复的通道。它用作replyChannel 分散消息中的标头。可选的。默认情况下,FixedSubscriberChannel 创建。 |
7 | 当多个处理程序订阅同一个处理程序时此组件的顺序DirectChannel (用于负载平衡目的)。可选的。 |
8 | 指定端点应该启动和停止的阶段。启动顺序从低到高,关机顺序从高到低。默认情况下,此值为Integer.MAX_VALUE ,表示此容器尽可能晚地启动并尽快停止。可选的。 |
9 | 向 发送回复时等待的超时Message 间隔output-channel 。默认情况下,发送会阻塞一秒钟。仅当输出通道具有某些“发送”限制时才适用——例如,QueueChannel 具有固定“容量”且已满的 a。在这种情况下,MessageDeliveryException 抛出 a。被send-timeout 忽略的AbstractSubscribableChannel 实现。对于group-timeout(-expression) ,MessageDeliveryException 来自计划的过期任务导致该任务被重新计划。可选的。 |
10 | 让您指定 scatter-gather 在返回之前等待回复消息的时间。默认情况下,它会无限期地等待。如果回复超时,则返回“null”。可选的。默认为-1 ,表示无限期等待。 |
11 | 指定 scatter-gather 是否必须返回非空值。该值是true 默认值。因此,ReplyRequiredException 当底层聚合器在 之后返回空值时,会抛出 a gather-timeout 。请注意,如果null 有可能,gather-timeout 应指定 以避免无限期等待。 |
12 | <recipient-list-router> 选项。可选的。scatter-channel 与属性互斥。 |
13 | <aggregator> 选项。必需的。 |
错误处理
由于 Scatter-Gather 是一个多请求-回复组件,因此错误处理具有一些额外的复杂性。在某些情况下,如果ReleaseStrategy
允许进程以比请求更少的回复完成,最好只捕获并忽略下游异常。在其他情况下,当发生错误时,应考虑从子流返回诸如“补偿消息”之类的东西。
每个异步子流都应该配置一个errorChannel
标头,以便从MessagePublishingErrorHandler
. errorChannel
否则,将使用通用错误处理逻辑将错误发送到全局。有关异步错误处理的更多信息,请参阅错误处理。
同步流可以使用ExpressionEvaluatingRequestHandlerAdvice
忽略异常或返回补偿消息。当从其中一个子流向 中抛出异常时ScatterGatherHandler
,它只是被重新抛出到上游。这样,所有其他子流程都将毫无用处,并且它们的回复将在ScatterGatherHandler
. 有时这可能是一种预期行为,但在大多数情况下,处理特定子流中的错误而不影响所有其他错误和收集器中的预期会更好。
从版本 5.1.3 开始,ScatterGatherHandler
随errorChannelName
选件一起提供。它填充到errorChannel
分散消息的标头中,并在发生异步错误时使用,或者可以在常规同步子流程中用于直接发送错误消息。
下面的示例配置通过返回补偿消息来演示异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.applySequence(true)
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了产生正确的回复,我们必须从 的 复制标题(包括replyChannel
和errorChannel
)发送给。这样,目标异常将返回给回复消息组完成的收集器。这种异常可以在收集器中被过滤掉,或者在分散收集端点之后以其他方式在下游处理。failedMessage
MessagingException
scatterGatherErrorChannel
MessagePublishingErrorHandler
ScatterGatherHandler
payload
MessageGroupProcessor
在将分散结果发送到收集器之前,ScatterGatherHandler 恢复请求消息头,包括回复和错误通道(如果有)。这样AggregatingMessageHandler ,即使在分散的接收者子流中应用了异步切换,来自 的错误也会传播到调用者。对于成功的操作,必须将 agatherResultChannel 和originalReplyChannel headersoriginalErrorChannel 传输回来自分散接收者子流的回复。在这种情况下,gatherTimeout 必须为ScatterGatherHandler . 否则,默认情况下,它将永远被阻止等待收集者的回复。
|
线程屏障
有时,我们需要暂停消息流线程,直到发生其他一些异步事件。例如,考虑一个向 RabbitMQ 发布消息的 HTTP 请求。我们可能希望在 RabbitMQ 代理发出已收到消息的确认之前不回复用户。
在 4.2 版本中,Spring Integration<barrier/>
为此引入了该组件。底层MessageHandler
是BarrierMessageHandler
. 此类还实现MessageTriggerAction
了 ,其中传递给trigger()
方法的消息释放方法中的相应线程handleRequestMessage()
(如果存在)。
CorrelationStrategy
挂起的线程和触发线程通过在消息上调用 a 来关联。当有消息发送到 时input-channel
,线程会暂停最多requestTimeout
毫秒,等待相应的触发消息。默认关联策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID
标头。当触发消息以相同的相关性到达时,线程被释放。发送到output-channel
后发布的消息是使用MessageGroupProcessor
. 默认情况下,消息是Collection<?>
两个有效负载中的一个,并且标头使用 a 合并DefaultAggregatingMessageGroupProcessor
。
如果trigger() 首先调用该方法(或在主线程超时之后),则将其挂起直到triggerTimeout 等待挂起消息到达。如果您不想挂起触发器线程,请考虑将其移交给 a TaskExecutor ,以便其线程改为挂起。
|
在 5.4 之前的版本中,请求和触发消息只有一个timeout 选项,但在某些用例中,最好为这些操作设置不同的超时时间。因此requestTimeout ,triggerTimeout 引入了选项。
|
如果在触发消息到达之前挂起的线程超时,该requires-reply
属性确定要采取的操作。默认为false
,表示端点返回null
,流程结束,线程返回给调用者。当 时true
,ReplyRequiredException
抛出 a。
您可以以编程方式调用该trigger()
方法(通过使用名称获取 bean 引用,barrier.handler
其中barrier
是屏障端点的 bean 名称)。或者,您可以配置一个<outbound-channel-adapter/>
来触发发布。
只有一个线程可以挂起具有相同的相关性。相同的关联可以多次使用,但只能同时使用一次。如果第二个线程到达时具有相同的相关性,则会引发异常。 |
以下示例显示了如何使用自定义标头进行关联:
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger(message);
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
根据哪个消息首先到达,发送消息in
的线程或发送消息的线程release
等待最多十秒钟,直到另一条消息到达。当消息发布时,out
通道会发送一条消息,该消息结合了调用自定义MessageGroupProcessor
bean 的结果,名为myOutputProcessor
. 如果主线程超时,触发延迟到达,可以配置发送延迟触发的丢弃通道。
有关此组件的示例,请参阅屏障示例应用程序。