消息路由

本章介绍了使用 Spring Integration 路由消息的详细信息。

路由器

本节介绍路由器的工作原理。它包括以下主题:

概述

路由器是许多消息传递体系结构中的关键元素。它们从消息通道消费消息,并根据一组条件将每个消费消息转发到一个或多个不同的消息通道。

Spring Integration 提供以下路由器:

路由器实现共享许多配置参数。但是,路由器之间存在某些差异。此外,配置参数的可用性取决于路由器是在链内部还是外部使用。为了提供快速概览,所有可用属性都列在以下两个表格中。

下表显示了可用于链外路由器的配置参数:

表 1. 链外的路由器
属性 路由器 标头值路由器 xpath 路由器 有效载荷类型路由器 收件人列表路由 异常类型路由器

应用序列

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

默认输出通道

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

分辨率要求

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

忽略发送失败

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

暂停

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

ID

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

自动启动

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

输入通道

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

命令

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

方法

刻度线

参考

刻度线

表达

刻度线

标题名称

刻度线

评估为字符串

刻度线

xpath 表达式引用

刻度线

转换器

刻度线

下表显示了可用于链内路由器的配置参数:

表 2. 链内的路由器
属性 路由器 标头值路由器 xpath 路由器 有效载荷类型路由器 收件人列表路由器 异常类型路由器

应用序列

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

默认输出通道

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

分辨率要求

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

忽略发送失败

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

暂停

刻度线
刻度线
刻度线
刻度线
刻度线
刻度线

ID

自动启动

输入通道

命令

方法

刻度线

参考

刻度线

表达

刻度线

标题名称

刻度线

评估为字符串

刻度线

xpath 表达式引用

刻度线

转换器

刻度线

从 Spring Integration 2.1 开始,路由器参数在所有路由器实现中都更加标准化。因此,一些小的更改可能会破坏旧的基于 Spring Integration 的应用程序。

从 Spring Integration 2.1 开始,该ignore-channel-name-resolution-failures属性被删除,以便将其行为与该resolution-required属性合并。此外,该resolution-required属性现在默认为true.

在进行这些更改之前,该resolution-required属性默认为false,当没有解析通道并且default-output-channel设置 no 时,会导致消息被静默丢弃。新行为至少需要一个已解析的通道,并且默认情况下,MessageDeliveryException如果未确定通道(或尝试发送不成功),则抛出一个。

如果您确实希望静默发送消息,您可以设置default-output-channel="nullChannel".

常用路由器参数

本节介绍所有路由器参数共有的参数(在本章前面显示的两个表格中勾选了所有框的参数)。

链的内部和外部

以下参数对链内外的所有路由器均有效。

apply-sequence

此属性指定是否应将序列号和大小标头添加到每条消息。此可选属性默认为false.

default-output-channel

如果设置,则此属性提供对通道的引用,如果通道解析未能返回任何通道,则应将消息发送到该通道。如果没有提供默认输出通道,路由器会抛出异常。如果您想以静默方式删除这些消息,请将默认输出通道属性值设置为nullChannel.

default-output-channel仅向if 发送消息,resolution-required并且false通道未解析。
resolution-required

此属性指定通道名称是否必须始终成功解析为存在的通道实例。如果设置为trueMessagingException则在无法解析通道时引发 a。将此属性设置为false会导致忽略任何不可解析的通道。此可选属性默认为true.

消息仅发送到default-output-channel,如果指定,何时resolution-requiredfalse并且通道未解析。
ignore-send-failures

如果设置为true,发送到消息通道的失败将被忽略。如果设置为falseMessageDeliveryException则抛出 a ,并且如果路由器解析多个通道,则任何后续通道都不会收到消息。

此属性的确切行为取决于Channel消息发送到的类型。例如,当使用直接通道(单线程)时,发送失败可能是由更下游的组件抛出的异常引起的。但是,当向简单的队列通道(异步)发送消息时,抛出异常的可能性相当小。

虽然大多数路由器路由到单个通道,但它们可以返回多个通道名称。例如recipient-list-router, 正是这样做的。如果true在只路由到单个通道的路由器上将此属性设置为,则任何导致的异常都会被吞没,这通常没有什么意义。在这种情况下,最好在流入口点捕获错误流中的异常。因此,当路由器实现返回多个通道名称时 ,将ignore-send-failures属性设置为通常更有意义,因为失败的通道之后的其他通道仍会收到消息。true

此属性默认为false.

timeout

timeout属性指定向目标消息通道发送消息时等待的最长时间(以毫秒为单位)。默认情况下,发送操作无限期阻塞。

顶层(链外)

以下参数仅对链外的所有顶级路由器有效。

id

标识底层 Spring bean 定义,在路由器的情况下,它是EventDrivenConsumeror的一个实例PollingConsumer,具体取决于路由器input-channel是 aSubscribableChannel还是 a PollableChannel。这是一个可选属性。

auto-startup

这个“生命周期”属性表明该组件是否应该在应用程序上下文启动期间启动。此可选属性默认为true.

input-channel

此端点的接收消息通道。

order

此属性定义此端点作为订阅者连接到通道时的调用顺序。当该通道使用故障转移调度策略时,这一点尤其重要。当此端点本身是具有队列的通道的轮询使用者时,它没有任何作用。

路由器实现

由于基于内容的路由通常需要一些特定于域的逻辑,因此大多数用例都需要 Spring Integration 的选项,以便通过使用 XML 命名空间支持或注释来委托给 POJO。这两个将在后面讨论。但是,我们首先介绍几个满足常见要求的实现。

PayloadTypeRouter

APayloadTypeRouter将消息发送到由有效负载类型映射定义的通道,如以下示例所示:

<bean id="payloadTypeRouter"
      class="org.springframework.integration.router.PayloadTypeRouter">
    <property name="channelMapping">
        <map>
            <entry key="java.lang.String" value-ref="stringChannel"/>
            <entry key="java.lang.Integer" value-ref="integerChannel"/>
        </map>
    </property>
</bean>

PayloadTypeRouterSpring Integration 提供的命名空间也支持配置(请参阅 参考资料Namespace Support),它通过将<router/>配置及其相应的实现(使用<bean/>元素定义)组合成一个更简洁的配置元素,从本质上简化了配置。以下示例显示了PayloadTypeRouter与上述配置等效但使用命名空间支持的配置:

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String" channel="stringChannel" />
    <int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>

以下示例显示了在 Java 中配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

使用 Java DSL 时,有两种选择。

首先,您可以定义路由器对象,如前面的示例所示:

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlows.from("routingChannel")
            .route(router())
            .get();
}

public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

请注意,路由器可以是但不一定是@Bean. 如果它不是@Bean.

其次,您可以在 DSL 流本身内定义路由功能,如以下示例所示:

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlows.from("routingChannel")
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(String.class, "stringChannel")
                    .channelMapping(Integer.class, "integerChannel"))
            .get();
}
HeaderValueRouter

AHeaderValueRouter根据各个标头值映射将消息发送到通道。创建a 时HeaderValueRouter,它会使用要评估的标头的名称进行初始化。标头的值可能是以下两种情况之一:

  • 任意值

  • 频道名称

如果它是任意值,则需要将这些标头值附加到通道名称的映射。否则,不需要额外的配置。

Spring Integration 提供了一个简单的基于命名空间的 XML 配置来配置HeaderValueRouter. 以下示例演示了HeaderValueRouter何时需要将标头值映射到通道的配置:

<int:header-value-router input-channel="routingChannel" header-name="testHeader">
    <int:mapping value="someHeaderValue" channel="channelA" />
    <int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>

在解析过程中,上例中定义的路由器可能会遇到通道解析失败,导致异常。如果您想抑制此类异常并将未解决的消息发送到default-output-channel设置resolution-requiredfalse.

通常,标头值未显式映射到通道的消息将发送到default-output-channel. 但是,当标头值映射到通道名称但无法解析通道时,将resolution-required属性设置为会false导致将此类消息路由到default-output-channel.

从 Spring Integration 2.1 开始,该属性已从 更改ignore-channel-name-resolution-failuresresolution-required。属性resolution-required默认为true.

以下示例显示了在 Java 中配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

使用 Java DSL 时,有两种选择。首先,您可以定义路由器对象,如前面的示例所示:

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlows.from("routingChannel")
            .route(router())
            .get();
}

public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

请注意,路由器可以是但不一定是@Bean. 如果它不是@Bean.

其次,您可以在 DSL 流本身内定义路由功能,如以下示例所示:

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlows.from("routingChannel")
            .route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
                    m -> m
                        .channelMapping("someHeaderValue", "channelA")
                        .channelMapping("someOtherHeaderValue", "channelB"),
                e -> e.id("headerValueRouter"))
            .get();
}

不需要将标头值映射到通道名称的配置,因为标头值本身代表通道名称。以下示例显示了不需要将标头值映射到通道名称的路由器:

<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>

从 Spring Integration 2.1 开始,解析通道的行为更加明确。例如,如果您省略该default-output-channel属性,则路由器无法解析至少一个有效通道,并且通过设置为 忽略任何通道名称解析失败resolution-requiredfalse然后MessageDeliveryException抛出 a。

基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。如果您真的想删除消息,您还必须default-output-channel设置为nullChannel

RecipientListRouter

ARecipientListRouter将每个接收到的消息发送到一个静态定义的消息通道列表。以下示例创建一个RecipientListRouter

<bean id="recipientListRouter"
      class="org.springframework.integration.router.RecipientListRouter">
    <property name="channels">
        <list>
            <ref bean="channel1"/>
            <ref bean="channel2"/>
            <ref bean="channel3"/>
        </list>
    </property>
</bean>

Spring Integration 还为RecipientListRouter配置提供命名空间支持(请参阅命名空间支持),如以下示例所示:

<int:recipient-list-router id="customRouter" input-channel="routingChannel"
        timeout="1234"
        ignore-send-failures="true"
        apply-sequence="true">
  <int:recipient channel="channel1"/>
  <int:recipient channel="channel2"/>
</int:recipient-list-router>

以下示例显示了在 Java 中配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
    RecipientListRouter router = new RecipientListRouter();
    router.setSendTimeout(1_234L);
    router.setIgnoreSendFailures(true);
    router.setApplySequence(true);
    router.addRecipient("channel1");
    router.addRecipient("channel2");
    router.addRecipient("channel3");
    return router;
}

以下示例显示了使用 Java DSL 配置的等效路由器:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
            .routeToRecipients(r -> r
                    .applySequence(true)
                    .ignoreSendFailures(true)
                    .recipient("channel1")
                    .recipient("channel2")
                    .recipient("channel3")
                    .sendTimeout(1_234L))
            .get();
}
此处的“应用序列”标志与发布订阅通道的效果相同,并且与发布订阅通道一样,默认情况下它在recipient-list-router. 有关详细信息,请参阅PublishSubscribeChannel配置

配置 a 时的另一个方便选项RecipientListRouter是使用 Spring 表达式语言 (SpEL) 支持作为各个接收者通道的选择器。这样做类似于在“链”的开头使用过滤器来充当“选择性消费者”。但是,在这种情况下,它们都相当简洁地组合到路由器的配置中,如以下示例所示:

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
    <int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
    <int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>

在前面的配置中,selector-expression评估由属性标识的 SpEL 表达式以确定该收件人是否应包含在给定输入消息的收件人列表中。表达式的评估结果必须是布尔值。如果未定义此属性,则频道始终位于收件人列表中。

RecipientListRouterManagement

从 4.1 版开始,RecipientListRouter提供了多种操作来在运行时动态操作收件人。这些管理操作是RecipientListRouterManagement通过@ManagedResource注解来呈现的。它们可通过使用控制总线和 JMX 获得,如以下示例所示:

<control-bus input-channel="controlBus"/>

<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
   <recipient channel="channel1"/>
</recipient-list-router>

<channel id="channel2"/>
messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");

从应用程序启动simpleRouter,只有一个channel1收件人。但是在addRecipient命令之后,channel2添加了收件人。这是一个“注册对作为消息的一部分的东西感兴趣”的用例,当我们可能在某个时间段对来自路由器的消息感兴趣时,我们正在订阅recipient-list-router并在某个时候决定取消订阅。

由于 的运行时管理操作<recipient-list-router>,它可以<recipient>从一开始就无需任何配置。RecipientListRouter在这种情况下,当消息没有匹配的收件人时, 的行为是相同的。如果defaultOutputChannel已配置,则将消息发送到那里。否则MessageDeliveryException抛出。

XPath 路由器

XPath 路由器是 XML 模块的一部分。请参阅使用 XPath 路由 XML 消息

路由和错误处理

Spring Integration 还提供了一个特殊的基于类型的路由器,称为ErrorMessageExceptionTypeRouter路由错误消息(定义为实例的消息payloadThrowableErrorMessageExceptionTypeRouter类似于PayloadTypeRouter. 事实上,它们几乎是相同的。唯一的区别是,在PayloadTypeRouter导航有效负载实例的实例层次结构(例如,payload.getClass().getSuperclass())以查找最具体的类型和通道映射时,ErrorMessageExceptionTypeRouter导航“异常原因”的层次结构(例如,payload.getCause())以查找最具体的Throwable类型或通道映射并用于与类或任何超类mappingClass.isInstance(cause)匹配。cause

在这种情况下,通道映射顺序很重要。因此,如果需要获取 aIllegalArgumentException而非 a的映射RuntimeException,则必须首先在路由器上配置最后一个。
从 4.3 版ErrorMessageExceptionTypeRouter开始,在初始化阶段加载所有映射类以快速为ClassNotFoundException.

以下示例显示了以下示例配置ErrorMessageExceptionTypeRouter

<int:exception-type-router input-channel="inputChannel"
                           default-output-channel="defaultChannel">
    <int:mapping exception-type="java.lang.IllegalArgumentException"
                 channel="illegalChannel"/>
    <int:mapping exception-type="java.lang.NullPointerException"
                 channel="npeChannel"/>
</int:exception-type-router>

<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />

配置通用路由器

Spring Integration 提供了一个通用路由器。您可以将它用于通用路由(与 Spring Integration 提供的其他路由器相反,每个路由器都有某种形式的专业化)。

使用 XML 配置基于内容的路由器

router元素提供了一种将路由器连接到输入通道的方法,并且还接受可选default-output-channel属性。该ref属性引用自定义路由器实现的 bean 名称(必须扩展AbstractMessageRouter)。以下示例显示了三个通用路由器:

<int:router ref="payloadTypeRouter" input-channel="input1"
            default-output-channel="defaultOutput1"/>

<int:router ref="recipientListRouter" input-channel="input2"
            default-output-channel="defaultOutput2"/>

<int:router ref="customRouter" input-channel="input3"
            default-output-channel="defaultOutput3"/>

<beans:bean id="customRouterBean" class="org.foo.MyCustomRouter"/>

或者,ref可以指向包含@Router注释的 POJO(稍后显示),或者您可以将ref与显式方法名称结合使用。指定方法会应用@Router本文档后面的注释部分中描述的相同行为。ref以下示例定义了一个路由器,该路由器在其属性中指向 POJO :

<int:router input-channel="input" ref="somePojo" method="someMethod"/>

ref如果在其他定义中引用了自定义路由器实现,我们通常建议使用属性<router>。但是,如果自定义路由器实现的范围应为 的单个定义<router>,您可以提供内部 bean 定义,如以下示例所示:

<int:router method="someMethod" input-channel="input3"
            default-output-channel="defaultOutput3">
    <beans:bean class="org.foo.MyCustomRouter"/>
</int:router>
不允许在同一配置中 同时使用ref属性和内部处理程序定义。<router>这样做会产生模棱两可的条件并引发异常。
如果ref属性引用了一个扩展的bean AbstractMessageProducingHandler(比如框架本身提供的路由器),那么配置会被优化为直接引用路由器。在这种情况下,每个ref属性必须引用一个单独的 bean 实例(或一个prototype-scoped bean)或使用内部<bean/>配置类型。但是,仅当您未在路由器 XML 定义中提供任何特定于路由器的属性时,此优化才适用。如果您无意中从多个 bean 中引用了相同的消息处理程序,您会得到一个配置异常。

以下示例显示了在 Java 中配置的等效路由器:

@Bean
@Router(inputChannel = "routingChannel")
public AbstractMessageRouter myCustomRouter() {
    return new AbstractMessageRouter() {

        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            return // determine channel(s) for message
        }

    };
}

以下示例显示了使用 Java DSL 配置的等效路由器:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
            .route(myCustomRouter())
            .get();
}

public AbstractMessageRouter myCustomRouter() {
    return new AbstractMessageRouter() {

        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            return // determine channel(s) for message
        }

    };
}

或者,您可以路由来自消息有效负载的数据,如以下示例所示:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
            .route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")
            .get();
}

路由器和 Spring 表达式语言 (SpEL)

有时,路由逻辑可能很简单,为它编写一个单独的类并将其配置为 bean 可能看起来有点矫枉过正。从 Spring Integration 2.0 开始,我们提供了一种替代方案,让您可以使用 SpEL 来实现以前需要自定义 POJO 路由器的简单计算。

有关 Spring 表达式语言的更多信息,请参阅Spring Framework Reference Guide 中的相关章节

通常,对 SpEL 表达式求值并将其结果映射到通道,如以下示例所示:

<int:router input-channel="inChannel" expression="payload.paymentType">
    <int:mapping value="CASH" channel="cashPaymentChannel"/>
    <int:mapping value="CREDIT" channel="authorizePaymentChannel"/>
    <int:mapping value="DEBIT" channel="authorizePaymentChannel"/>
</int:router>

以下示例显示了在 Java 中配置的等效路由器:

@Router(inputChannel = "routingChannel")
@Bean
public ExpressionEvaluatingRouter router() {
    ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");
    router.setChannelMapping("CASH", "cashPaymentChannel");
    router.setChannelMapping("CREDIT", "authorizePaymentChannel");
    router.setChannelMapping("DEBIT", "authorizePaymentChannel");
    return router;
}

以下示例显示了在 Java DSL 中配置的等效路由器:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
        .route("payload.paymentType", r -> r
            .channelMapping("CASH", "cashPaymentChannel")
            .channelMapping("CREDIT", "authorizePaymentChannel")
            .channelMapping("DEBIT", "authorizePaymentChannel"))
        .get();
}

为了进一步简化,SpEL 表达式可以计算为通道名称,如下面的表达式所示:

<int:router input-channel="inChannel" expression="payload + 'Channel'"/>

在前面的配置中,结果通道由 SpEL 表达式计算,该表达式将 的值payload与文字String'Channel' 连接起来。

SpEL 用于配置路由器的另一个优点是表达式可以返回 a Collection,从而有效地使每个<router>收件人列表成为路由器。每当表达式返回多个通道值时,都会将消息转发到每个通道。以下示例显示了这样的表达式:

<int:router input-channel="inChannel" expression="headers.channels"/>

在上述配置中,如果消息包含名称为“channels”的标头,并且该标头的值是List通道名称中的一个,则将消息发送到列表中的每个通道。当您需要选择多个通道时,您可能还会发现集合投影和集合选择表达式很有用。有关详细信息,请参阅:

使用注释配置路由器

当使用@Router注解一个方法时,该方法可能返回一个MessageChannel或一个String类型。在后一种情况下,端点解析通道名称,就像它解析默认输出通道一样。此外,该方法可以返回单个值或集合。如果返回集合,则将回复消息发送到多个通道。总而言之,以下方法签名都是有效的:

@Router
public MessageChannel route(Message message) {...}

@Router
public List<MessageChannel> route(Message message) {...}

@Router
public String route(Foo payload) {...}

@Router
public List<String> route(Foo payload) {...}

除了基于负载的路由之外,还可以基于消息头中可用的元数据作为属性或属性来路由消息。在这种情况下,带有注释的方法@Router可能包含带有注释的参数@Header,该参数映射到标头值,如以下示例所示并记录在注释支持中:

@Router
public List<String> route(@Header("orderStatus") OrderStatus status)
有关基于 XML 的消息的路由,包括 XPath 支持,请参阅XML 支持 - 处理 XML 有效负载

有关路由器配置的更多信息,另请参阅Java DSL 章节中的消息路由器

动态路由器

Spring Integration 为常见的基于内容的路由用例提供了许多不同的路由器配置,以及将自定义路由器实现为 POJO 的选项。例如,PayloadTypeRouter提供了一种简单的方法来配置路由器,该路由器根据传入消息的有效负载类型计算通道,HeaderValueRouter同时在配置路由器时提供相同的便利,该路由器通过评估特定消息头的值来计算通道。还有基于表达式 (SpEL) 的路由器,其中通道是基于评估表达式来确定的。所有这些类型的路由器都表现出一些动态特性。

但是,这些路由器都需要静态配置。即使在基于表达式的路由器的情况下,表达式本身也被定义为路由器配置的一部分,这意味着对相同值操作的相同表达式总是导致计算相同通道。这在大多数情况下是可以接受的,因为这样的路线是明确定义的,因此是可预测的。但是有时我们需要动态更改路由器配置,以便消息流可以路由到不同的通道。

例如,您可能希望关闭系统的某些部分以进行维护,并临时将消息重新路由到不同的消息流。作为另一个示例,您可能希望通过添加另一个路由来处理更具体的类型java.lang.Number(在 的情况下PayloadTypeRouter)来为您的消息流引入更多粒度。

不幸的是,使用静态路由器配置来实现这些目标中的任何一个,您都必须关闭整个应用程序,更改路由器的配置(更改路由),然后重新启动应用程序。这显然不是任何人想要的解决方案。

动态路由器模式描述了您可以动态更改或配置路由器而无需关闭系统或单个路由器的机制。

在详细了解 Spring Integration 如何支持动态路由之前,我们需要考虑路由器的典型流程:

  1. 计算一个通道标识符,它是路由器收到消息后计算的一个值。通常,它是 String 或实际MessageChannel.

  2. 将通道标识符解析为通道名称。我们将在本节后面描述此过程的细节。

  3. 将通道名称解析为实际MessageChannel

如果步骤 1 产生了 的实际实例,那么动态路由方面就无能为力了MessageChannel,因为MessageChannel是任何路由器工作的最终产品。但是,如果第一步生成的通道标识符不是 的实例MessageChannel,那么您有很多可能的方法来影响MessageChannel. 考虑以下负载类型路由器的示例:

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String"  channel="channel1" />
    <int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>

在负载类型路由器的上下文中,前面提到的三个步骤将实现如下:

  1. 计算作为有效负载类型的完全限定名称的通道标识符(例如,java.lang.String)。

  2. 将通道标识符解析为通道名称,其中上​​一步的结果用于从mapping元素中定义的有效负载类型映射中选择适当的值。

  3. 将通道名称解析为 的实际实例,作为对由上一步的结果标识MessageChannel的应用程序上下文(希望是 a )中的 bean 的引用。MessageChannel

换句话说,每个步骤都支持下一步,直到过程完成。

现在考虑一个标头值路由器的示例:

<int:header-value-router input-channel="inputChannel" header-name="testHeader">
    <int:mapping value="foo" channel="fooChannel" />
    <int:mapping value="bar" channel="barChannel" />
</int:header-value-router>

现在我们可以考虑这三个步骤如何用于标头值路由器:

  1. 计算一个通道标识符,它是由header-name属性标识的标头的值。

  2. 将通道标识符 a 解析为通道名称,其中上​​一步的结果用于从mapping元素中定义的通用映射中选择适当的值。

  3. 将通道名称解析为 的实际实例,作为对由上一步的结果标识MessageChannel的应用程序上下文(希望是 a )中的 bean 的引用。MessageChannel

两种不同路由器类型的前两种配置看起来几乎相同。但是,如果您查看替代配置,HeaderValueRouter我们会清楚地看到没有mapping子元素,如以下清单所示:

<int:header-value-router input-channel="inputChannel" header-name="testHeader">

但是,配置仍然完全有效。那么自然的问题就是第二步的映射呢?

第二步现在是可选的。如果mapping未定义,则在第一步中计算的通道标识符值将自动视为channel name,现在解析为实际的MessageChannel,如第三步中一样。这也意味着第二步是为路由器提供动态特征的关键步骤之一,因为它引入了一个过程,可以让您更改通道标识符解析为通道名称的方式,从而影响确定最终的过程MessageChannel来自初始通道标识符的实例。

例如,在前面的配置中,假设testHeader值为 'kermit',现在是通道标识符(第一步)。由于此路由器中没有映射,因此无法将此通道标识符解析为通道名称(第二步),并且此通道标识符现在被视为通道名称。但是,如果有一个映射但具有不同的值怎么办?最终结果仍然是相同的,因为如果无法通过将通道标识符解析为通道名称的过程确定新值,则通道标识符将成为通道名称。

剩下的就是第三步,将通道名称(“kermit”)解析MessageChannel为由该名称标识的实际实例。这基本上涉及对提供的名称的 bean 查找。现在所有包含 header-value 对的消息testHeader=kermit都将被路由到MessageChannel其 bean 名称(其id)为“kermit”的。

但是,如果您想将这些消息路由到“simpson”频道怎么办?显然更改静态配置是可行的,但这样做也需要关闭系统。但是,如果您可以访问频道标识符映射,则可以在 header-value 对 now 的位置引入一个新映射kermit=simpson,从而让第二步将“kermit”视为频道标识符,同时将其解析为“辛普森”作为频道姓名。

这显然适用于PayloadTypeRouter,您现在可以重新映射或删除特定的有效负载类型映射。事实上,它适用于所有其他路由器,包括基于表达式的路由器,因为它们的计算值现在有机会通过第二步解析为实际的channel name.

任何作为 子类的路由器AbstractMappingMessageRouter(包括大多数框架定义的路由器)都是动态路由器,因为channelMapping是在级别定义的AbstractMappingMessageRouter。该地图的 setter 方法与“setChannelMapping”和“removeChannelMapping”方法一起作为公共方法公开。这些允许您在运行时更改、添加和删除路由器映射,只要您有对路由器本身的引用。这也意味着您可以通过 JMX(请参阅JMX 支持)或 Spring Integration 控制总线(请参阅控制总线)功能公开这些相同的配置选项。

回退到频道键作为频道名称,灵活方便。但是,如果您不信任消息创建者,恶意行为者(了解系统)可能会创建路由到意外通道的消息。例如,如果 key 设置为路由器输入通道的通道名称,这样的消息将被路由回路由器,最终导致堆栈溢出错误。因此,您可能希望禁用此功能(将channelKeyFallback属性设置为false),并在需要时更改映射。
使用控制总线管理路由器映射

管理路由器映射的一种方法是通过控制总线模式,它公开了一个控制通道,您可以向该通道发送控制消息以管理和监视 Spring Integration 组件,包括路由器。

有关控制总线的更多信息,请参阅控制总线

通常,您会发送一条控制消息,要求在特定托管组件(例如路由器)上调用特定操作。以下托管操作(方法)特定于更改路由器解析过程:

  • public void setChannelMapping(String key, String channelName)channel identifier:允许您在和之间添加新的或修改现有的映射channel name

  • public void removeChannelMapping(String key)channel identifier:让您删除特定的通道映射,从而断开和之间的关系channel name

请注意,这些方法可用于简单的更改(例如更新单个路由或添加或删除路由)。但是,如果您想删除一个路由并添加另一个路由,则更新不是原子的。这意味着路由表可能在更新之间处于不确定状态。从 4.0 版开始,您现在可以使用控制总线自动更新整个路由表。以下方法可让您这样做:

  • public Map<String, String>getChannelMappings():返回当前映射。

  • public void replaceChannelMappings(Properties channelMappings):更新映射。请注意,channelMappings参数是一个Properties对象。这种安排让控制总线命令使用内置的StringToPropertiesConverter,如以下示例所示:

"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"

请注意,每个映射都由换行符 ( \n) 分隔。setChannelMappings对于地图的编程更改,出于类型安全的考虑 ,我们建议您使用该方法。replaceChannelMappings忽略不是String对象的键或值。

使用 JMX 管理路由器映射

您还可以使用 Spring 的 JMX 支持来公开路由器实例,然后使用您最喜欢的 JMX 客户端(例如,JConsole)来管理这些操作(方法)以更改路由器的配置。

有关 Spring Integration 的 JMX 支持的更多信息,请参阅JMX 支持
路由单

从版本 4.1 开始,Spring Integration 提供了路由单企业集成模式的实现。它被实现为一个消息头,当没有为端点指定an 时routingSlip,它用于确定AbstractMessageProducingHandler实例中的下一个通道。outputChannel这种模式在复杂的动态情况下很有用,因为当配置多个路由器来确定消息流变得困难时。当消息到达没有 的端点时output-channel,将routingSlip咨询 以确定将消息发送到的下一个通道。当路由单用尽时,replyChannel恢复正常处理。

路由单的配置作为一个HeaderEnricher选项提供 - 一个包含path条目的分号分隔的路由单,如以下示例所示:

<util:properties id="properties">
    <beans:prop key="myRoutePath1">channel1</beans:prop>
    <beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>

<context:property-placeholder properties-ref="properties"/>

<header-enricher input-channel="input" output-channel="process">
    <routing-slip
        value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
               routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>

前面的例子有:

  • 一种<context:property-placeholder>配置,用于证明路由清单中的条目path可以指定为可解析的键。

  • <header-enricher> <routing-slip>子元素用于填充RoutingSlipHeaderValueMessageProcessor处理HeaderEnricher程序。

  • RoutingSlipHeaderValueMessageProcessor接受一String组已解析的路由单条目path并返回(从processMessage()) asingletonMappathaskey0as initial routingSlipIndex

Routing Slippath条目可以包含MessageChannelbean 名称、RoutingSlipRouteStrategybean 名称和 Spring 表达式 (SpEL)。在第一次调用时RoutingSlipHeaderValueMessageProcessor检查每个路由清单path条目。它将条目(不是应用程序上下文中的 bean 名称)转换为实例。 条目被多次调用,直到它们返回 null 或空。BeanFactoryprocessMessageExpressionEvaluatingRoutingSlipRouteStrategyRoutingSlipRouteStrategyString

由于getOutputChannel流程中涉及路由单,因此我们有一个请求-回复上下文。已RoutingSlipRouteStrategy被引入来确定下一个outputChannel使用requestMessagereply对象。此策略的实现应在应用程序上下文中注册为 bean,并且其 bean 名称用于路由单path。提供ExpressionEvaluatingRoutingSlipRouteStrategy了实现。它接受一个 SpEL 表达式,并使用一个内部ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply对象作为评估上下文的根对象。这是为了避免EvaluationContext每次ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()调用的创建开销。它是一个简单的 Java bean,具有两个属性:Message<?> requestObject reply. 通过这个表达式实现,我们可以使用 SpEL 指定路由path表条目(例如,@routingSlipRoutingPojo.get(request, reply)request.headers[myRoutingSlipChannel]) 并避免为RoutingSlipRouteStrategy.

参数requestMessage始终是 . Message<?>根据上下文,回复对象可以是Message<?>AbstractIntegrationMessageBuilder或任意应用程序域对象(例如,当它由服务激活器调用的 POJO 方法返回时)。在前两种情况下,在使用 SpEL(或 Java 实现)时,通常的Message属性 (payloadheaders) 可用。对于任意域对象,这些属性不可用。出于这个原因,如果结果用于确定下一条路径,则在将路由单与 POJO 方法结合使用时要小心。
如果分布式环境中涉及路由单,我们建议不要对路由单使用内联表达式path。此建议适用于分布式环境,例如跨 JVM 应用程序,request-reply通过消息代理(例如AMQP 支持JMS 支持)或在集成流中使用持久性MessageStore消息存储)。框架用于RoutingSlipHeaderValueMessageProcessor将它们转换为ExpressionEvaluatingRoutingSlipRouteStrategy对象,并在routingSlip消息头中使用它们。由于这个类不是Serializable(它不能是,因为它依赖于BeanFactory),所以整个Message变得不可序列化,并且在任何分布式操作中,我们最终都会得到一个NotSerializableException. 要克服此限制,ExpressionEvaluatingRoutingSlipRouteStrategy请使用所需的 SpEL 注册一个 bean,并在路由单path配置中使用其 bean 名称。

对于 Java 配置,您可以将RoutingSlipHeaderValueMessageProcessor实例添加到HeaderEnricherbean 定义中,如以下示例所示:

@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
    return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
            new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
                                                       "@routingSlipRoutingPojo.get(request, reply)",
                                                       "routingSlipRoutingStrategy",
                                                       "request.headers[myRoutingSlipChannel]",
                                                       "finishChannel")));
}

当端点产生回复并且没有outputChannel定义时,路由滑动算法的工作方式如下:

  • routingSlipIndex用于从路由清单列表中获取值path

  • 如果 from 的routingSlipIndex值为String,则用于从 获取 bean BeanFactory

  • 如果返回的 bean 是 的实例MessageChannel,则将其用作下一个outputChannel,并routingSlipIndex在回复消息头中递增 (路由清单path条目保持不变)。

  • 如果返回的 bean 是 的一个实例RoutingSlipRouteStrategy并且它getNextPath不返回空String值,那么该结果将用作下一个 bean 的名称outputChannelroutingSlipIndex保持不变。

  • 如果RoutingSlipRouteStrategy.getNextPath返回一个空的Stringor nullroutingSlipIndex则 递增,并为下一个 Routing Slip项getOutputChannelFromRoutingSlip递归调用。path

  • 如果下一个传送单path条目不是String,它必须是 的实例RoutingSlipRouteStrategy

  • routingSlipIndex超过路由path清单列表的大小时,算法将移动到标准replyChannel标题的默认行为。

流程管理器企业集成模式

企业集成模式包括流程管理器模式。您现在可以通过使用封装在RoutingSlipRouteStrategy路由清单中的自定义流程管理器逻辑轻松实现此模式。除了 bean 名称之外,RoutingSlipRouteStrategy还可以返回任何MessageChannel对象,并且不要求此MessageChannel实例是应用程序上下文中的 bean。这样,当无法预测应该使用哪个通道时,我们可以提供强大的动态路由逻辑。AMessageChannel可以在 中创建RoutingSlipRouteStrategy并返回。FixedSubscriberChannel对于这种情况,具有关联MessageHandler实现的A是一个很好的组合。例如,您可以路由到Reactive Streams,如以下示例所示:

@Bean
public PollableChannel resultsChannel() {
    return new QueueChannel();
}
@Bean
public RoutingSlipRouteStrategy routeStrategy() {
    return (requestMessage, reply) -> requestMessage.getPayload() instanceof String
            ? new FixedSubscriberChannel(m ->
            Mono.just((String) m.getPayload())
                    .map(String::toUpperCase)
                    .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))
            : new FixedSubscriberChannel(m ->
            Mono.just((Integer) m.getPayload())
                    .map(v -> v * 2)
                    .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));
}

筛选

消息过滤器用于根据某些标准(例如消息头值或消息内容本身)来决定是否Message应该传递或删除 a。因此,消息过滤器类似于路由器,不同之处在于,对于从过滤器的输入通道接收到的每条消息,相同的消息可能会或可能不会发送到过滤器的输出通道。与路由器不同,它不决定将消息发送到哪个消息通道,而只决定是否发送消息。

正如我们在本节后面描述的那样,过滤器还支持丢弃通道。在某些情况下,它可以根据布尔条件扮演一个非常简单的路由器(或“交换机”)的角色。

在 Spring Integration 中,您可以将消息过滤器配置为委托给MessageSelector接口实现的消息端点。该界面本身非常简单,如以下清单所示:

public interface MessageSelector {

    boolean accept(Message<?> message);

}

构造MessageFilter函数接受一个选择器实例,如以下示例所示:

MessageFilter filter = new MessageFilter(someSelector);

结合命名空间和 SpEL,您可以用很少的 Java 代码配置强大的过滤器。

使用 XML 配置过滤器

您可以使用该<filter>元素来创建消息选择端点。除了input-channeloutput-channel属性,它还需要一个ref属性。ref可以指向一个实现,如以下MessageSelector示例所示:

<int:filter input-channel="input" ref="selector" output-channel="output"/>

<bean id="selector" class="example.MessageSelectorImpl"/>

或者,您可以添加method属性。在这种情况下,ref属性可以引用任何对象。引用的方法可能需要Message入站消息的类型或负载类型。该方法必须返回一个布尔值。如果该方法返回“true”,则将消息发送到输出通道。以下示例显示如何配置使用该method属性的过滤器:

<int:filter input-channel="input" output-channel="output"
    ref="exampleObject" method="someBooleanReturningMethod"/>

<bean id="exampleObject" class="example.SomeObject"/>

如果选择器或适配的 POJO 方法返回false,则一些设置控制对被拒绝消息的处理。默认情况下(如果按照前面的示例进行配置),被拒绝的消息会被静默丢弃。如果拒绝反而会导致错误情况,请将throw-exception-on-rejection属性设置为true,如以下示例所示:

<int:filter input-channel="input" ref="selector"
    output-channel="output" throw-exception-on-rejection="true"/>

如果您希望将被拒绝的消息路由到特定通道,请将该引用提供为discard-channel,如以下示例所示:

<int:filter input-channel="input" ref="selector"
    output-channel="output" discard-channel="rejectedMessages"/>

另请参阅建议过滤器

消息过滤器通常与发布-订阅通道结合使用。许多过滤器端点可能订阅了同一个通道,它们决定是否将消息传递给下一个端点,下一个端点可以是任何受支持的类型(例如服务激活器)。这为使用具有单个点对点输入通道和多个输出通道的消息路由器的更主动方法提供了一种反应性替代方案。

ref如果在其他定义中引用了自定义过滤器实现,我们建议使用属性<filter>。但是,如果自定义过滤器实现的范围仅限于单个<filter>元素,则应提供内部 bean 定义,如以下示例所示:

<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
  <beans:bean class="org.foo.MyCustomFilter"/>
</filter>
不允许在同一配置中 同时使用ref属性和内部处理程序定义,因为它会创建模棱两可的条件并引发异常。<filter>
如果该ref属性引用了一个扩展的bean MessageFilter(例如框架本身提供的过滤器),则通过将输出通道直接注入过滤器bean来优化配置。在这种情况下,每个都ref必须是一个单独的 bean 实例(或一个prototype-scoped bean)或使用内部<bean/>配置类型。但是,仅当您未在过滤器 XML 定义中提供任何特定于过滤器的属性时,此优化才适用。如果您无意中从多个 bean 中引用了相同的消息处理程序,您会得到一个配置异常。

随着 SpEL 支持的引入,Spring Integration 将expression属性添加到过滤器元素中。对于简单的过滤器,它可以用来完全避免使用 Java,如以下示例所示:

<int:filter input-channel="input" expression="payload.equals('nonsense')"/>

作为表达式属性的值传递的字符串被评估为 SpEL 表达式,并且消息在评估上下文中可用。如果您必须在应用程序上下文的范围内包含表达式的结果,则可以使用SpEL 参考文档#{}中定义的表示法,如以下示例所示:

<int:filter input-channel="input"
            expression="payload.matches(#{filterPatterns.nonsensePattern})"/>

如果表达式本身需要是动态的,您可以使用“表达式”子元素。这提供了一定程度的间接性,用于通过ExpressionSource. 这是一个您可以直接实现的策略接口,或者您可以依赖 Spring Integration 中可用的版本,该版本从“资源包”加载表达式,并可以在给定的秒数后检查修改。所有这些都在以下配置示例中进行了演示,如果基础文件已被修改,则表达式可以在一分钟内重新加载:

<int:filter input-channel="input" output-channel="output">
    <int:expression key="filterPatterns.example" source="myExpressions"/>
</int:filter>

<beans:bean id="myExpressions" id="myExpressions"
    class="o.s.i.expression.ReloadableResourceBundleExpressionSource">
    <beans:property name="basename" value="config/integration/expressions"/>
    <beans:property name="cacheSeconds" value="60"/>
</beans:bean>

如果ExpressionSourcebean 被命名expressionSource,则不需要在<expression>元素上提供`source` 属性。但是,在前面的示例中,我们将其显示为完整性。

'config/integration/expressions.properties' 文件(或任何更具体的版本,具有以加载资源包的典型方式解析的语言环境扩展名)可以包含键/值对,如以下示例所示:

filterPatterns.example=payload > 100
所有这些expression用作属性或子元素的示例也可以应用于转换器、路由器、拆分器、服务激活器和标头丰富器元素。给定组件类型的语义和角色将影响评估结果的解释,就像解释方法调用的返回值一样。例如,表达式可以返回将被路由器组件视为消息通道名称的字符串。但是,在 Spring Integration 中的所有核心 EIP 组件中,将表达式作为根对象评估表达式并解析以“@”为前缀的 bean 名称的底层功能是一致的。

使用注释配置过滤器

以下示例显示了如何使用注释配置过滤器:

public class PetFilter {
    ...
    @Filter  (1)
    public boolean dogsOnly(String input) {
        ...
    }
}
1 指示此方法将用作过滤器的注释。如果要将此类用作过滤器,则必须指定它。

XML 元素提供的所有配置选项也可用于@Filter注释。

过滤器可以从 XML 中显式引用,或者,如果@MessageEndpoint注释是在类上定义的,则可以通过类路径扫描自动检测到。

分路器

拆分器是一个组件,其作用是将消息分割成多个部分,并将生成的消息发送到独立处理。很多时候,他们是包含聚合器的管道中的上游生产者。

编程模型

用于执行拆分的 API 由一个基类AbstractMessageSplitter. 它是一种MessageHandler封装拆分器通用功能的实现,例如在生成的消息上填充适当的消息头(CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER)。这种填充可以跟踪消息及其处理结果(在典型情况下,这些标头被复制到由各种转换端点生成的消息中)。然后可以使用这些值,例如,由组合消息处理器使用

以下示例显示了以下内容的摘录AbstractMessageSplitter

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

要在应用程序中实现特定的拆分器,您可以扩展AbstractMessageSplitter并实现该splitMessage方法,其中包含用于拆分消息的逻辑。返回值可以是以下之一:

  • 一个Collection或一个消息数组或一个Iterable(或Iterator)迭代消息。在这种情况下,消息作为消息发送(在CORRELATION_ID,SEQUENCE_SIZESEQUENCE_NUMBER填充之后)。使用这种方法可以让您获得更多控制权——例如,在拆分过程中填充自定义消息头。

  • 一个Collection或一组非消息对象,或一个Iterable(or Iterator) 迭代非消息对象。它的工作原理与前一种情况类似,不同之处在于每个集合元素都用作消息有效负载。使用这种方法可以让您专注于域对象,而无需考虑消息传递系统并生成更易于测试的代码。

  • 一个Message或非消息对象(但不是集合或数组)。它的工作方式与前面的情况类似,只是发送了一条消息。

在 Spring Integration 中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值如前所述进行解释。输入参数可能是一个MessagePOJO 或一个简单的 POJO。在后一种情况下,拆分器接收传入消息的有效负载。我们推荐这种方法,因为它将代码与 Spring Integration API 分离,并且通常更容易测试。

迭代器

从 4.1 版本开始,AbstractMessageSplitter支持拆分的Iterator类型。value请注意,在Iterator(或Iterable)的情况下,我们无权访问基础项目的数量,并且SEQUENCE_SIZE标题设置为0。这意味着SequenceSizeReleaseStrategyan的默认值<aggregator>将不起作用,并且CORRELATION_IDfrom 的组splitter将不会被释放;它将保持为incomplete. 在这种情况下,您应该使用适当的自定义ReleaseStrategy或依赖send-partial-result-on-expirygroup-timeout或一起使用MessageGroupStoreReaper

从 5.0 版开始,如果可能的话,AbstractMessageSplitter提供protected obtainSizeIfPossible()了允许确定Iterable和对象大小的方法。Iterator例如XPathMessageSplitter可以确定底层NodeList对象的大小。从 5.0.9 版本开始,此方法还正确返回com.fasterxml.jackson.core.TreeNode.

Iterator对象有助于避免在拆分之前在内存中构建整个集合。例如,当MGET使用迭代或流从某个外部系统(例如 DataBase 或 FTP )填充底层项目时。

流和通量

从 5.0 版开始,支持用于拆分AbstractMessageSplitter的 JavaStream和 Reactive StreamsPublisher类型。value在这种情况下,目标Iterator是建立在它们的迭代功能之上的。

此外,如果拆分器的输出通道是 a 的实例ReactiveStreamsSubscribableChannel,则AbstractMessageSplitter生成Flux结果而不是 a Iterator,并且输出通道订阅此结果Flux,以根据下游流量需求进行基于背压的拆分。

从 5.2 版开始,拆分器支持discardChannel发送那些拆分函数返回空容器(集合、数组、流Flux等)的请求消息的选项。在这种情况下,没有要迭代的项目以发送到outputChannel. 拆分结果null保留为流结束指示符。

使用 XML 配置拆分器

可以通过 XML 配置拆分器,如下所示:

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           (1)
  ref="splitterBean"                  (2)
  method="split"                      (3)
  input-channel="inputChannel"        (4)
  output-channel="outputChannel"      (5)
  discard-channel="discardChannel" /> (6)

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 拆分器的 ID 是可选的。
2 对应用程序上下文中定义的 bean 的引用。bean 必须实现拆分逻辑,如前面部分所述。可选的。如果未提供对 bean 的引用,则假定到达的消息的有效负载input-channel是 的实现,java.util.Collection并且默认拆分逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到output-channel.
3 实现拆分逻辑的方法(在 bean 上定义)。可选的。
4 分配器的输入通道。必需的。
5 拆分器将传入消息拆分结果发送到的通道。可选(因为传入的消息可以自己指定回复通道)。
6 拆分结果为空时请求消息发送到的通道。可选(将在null结果的情况下停止)。

ref如果可以在其他定义中引用自定义拆分器实现,我们建议使用属性<splitter>。但是,如果自定义拆分器处理程序实现的范围应为 的单个定义,则<splitter>可以配置内部 bean 定义,如下例所示:

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
不允许在同一配置中 同时使用ref属性和内部处理程序定义,因为它会创建模棱两可的条件并导致引发异常。<int:splitter>
如果ref属性引用了扩展的 bean AbstractMessageProducingHandler(例如框架本身提供的拆分器),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个ref必须是单独的 bean 实例(或prototype-scoped bean)或使用内部<bean/>配置类型。但是,仅当您未在拆分器 XML 定义中提供任何拆分器特定属性时,此优化才适用。如果您无意中从多个 bean 中引用了相同的消息处理程序,您会得到一个配置异常。

使用注释配置拆分器

@Splitter注解适用于期望类型或消息负载类型的Message方法,并且方法的返回值应该是Collection任何类型的 a。如果返回的值不是实际Message对象,则每个项目都包装在 aMessage作为Message. 每个结果Message都被发送到定义 的端点的指定输出通道@Splitter

以下示例显示了如何使用@Splitter注解配置拆分器:

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}

另请参阅使用注释、拆分器文件拆分器为端点提供建议

聚合器

基本上是拆分器的镜像,聚合器是一种消息处理程序,它接收多条消息并将它们组合成一条消息。实际上,聚合器通常是包含拆分器的管道中的下游消费者。

从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。它必须保存要聚合的消息,并确定何时可以聚合完整的消息组。为此,它需要一个MessageStore.

功能性

聚合器通过关联和存储一组相关消息来组合一组相关消息,直到该组被认为是完整的。此时,聚合器通过处理整个组创建单个消息并将聚合消息作为输出发送。

实现聚合器需要提供执行聚合的逻辑(即,从多个创建单个消息)。两个相关的概念是相关性和释放。

相关性确定消息如何分组以进行聚合。在 Spring Integration 中,默认情况下会根据IntegrationMessageHeaderAccessor.CORRELATION_ID消息头完成关联。相同的消息IntegrationMessageHeaderAccessor.CORRELATION_ID被分组在一起。但是,您可以自定义关联策略以允许以其他方式指定消息应如何组合在一起。为此,您可以实现一个CorrelationStrategy(本章稍后介绍)。

为了确定准备处理一组消息的点,ReleaseStrategy请查阅 a。当序列中包含的所有消息都存在时,聚合器的默认释放策略会根据IntegrationMessageHeaderAccessor.SEQUENCE_SIZE标头释放组。您可以通过提供对自定义ReleaseStrategy实现的引用来覆盖此默认策略。

编程模型

聚合 API 由许多类组成:

  • 接口MessageGroupProcessor及其子类:MethodInvokingAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessor

  • ReleaseStrategy接口及其默认实现:SimpleSequenceSizeReleaseStrategy

  • CorrelationStrategy接口及其默认实现:HeaderAttributeCorrelationStrategy

AggregatingMessageHandler

AggregatingMessageHandler(的子类)AbstractCorrelatingMessageHandler是一个MessageHandler实现,封装了聚合器(和其他相关用例)的通用功能,如下所示:

  • 将消息关联到要聚合的组中

  • 将这些消息保留在 a 中,MessageStore直到可以释放该组

  • 决定何时可以释放该组

  • 将已发布的组聚合为单个消息

  • 识别和响应过期组

决定如何将消息组合在一起的责任委托给一个CorrelationStrategy实例。决定是否可以释放消息组的责任委托给一个ReleaseStrategy实例。

以下清单显示了基础的简要亮点AbstractAggregatingMessageGroupProcessor(实现该aggregatePayloads方法的责任留给开发人员):

public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }

    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);

}

请参阅DefaultAggregatingMessageGroupProcessor和作为ExpressionEvaluatingMessageGroupProcessorMethodInvokingMessageGroupProcessor开箱即用实现AbstractAggregatingMessageGroupProcessor

从 5.2 版开始,Function<MessageGroup, Map<String, Object>>可以使用一种策略AbstractAggregatingMessageGroupProcessor来合并和计算(聚合)输出消息的标头。该DefaultAggregateHeadersFunction实现可用于返回组之间没有冲突的所有标头的逻辑;组内的一条或多条消息上缺少标头不被视为冲突。省略冲突的标题。与新引入的 一起DelegatingMessageGroupProcessor,此函数用于任何任意(非AbstractAggregatingMessageGroupProcessorMessageGroupProcessor实现。本质上,框架将提供的函数注入到AbstractAggregatingMessageGroupProcessor实例中,并将所有其他实现包装到DelegatingMessageGroupProcessor. AbstractAggregatingMessageGroupProcessorthe和 the之间的逻辑差异DelegatingMessageGroupProcessor后者在调用委托策略之前不会提前计算标头,并且如果委托返回 aMessage或,则不会调用该函数AbstractIntegrationMessageBuilder。在这种情况下,框架假定目标实现已经负责生成一组正确的标题,这些标题填充到返回的结果中。该Function<MessageGroup, Map<String, Object>>策略可用作headers-functionXML 配置的参考属性、AggregatorSpec.headersFunction()Java DSL 的选项和AggregatorFactoryBean.setHeadersFunction()纯 Java 配置的选项。

CorrelationStrategy拥有,AbstractCorrelatingMessageHandler并且具有基于IntegrationMessageHeaderAccessor.CORRELATION_ID消息头的默认值,如以下示例所示:

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}

至于消息组的实际处理,默认实现是DefaultAggregatingMessageGroupProcessor. 它创建一个单一Message的,其有效负载是List为给定组接收的有效负载中的一个。这适用于具有拆分器、发布-订阅通道或上游接收者列表路由器的简单分散-收集实现。

在这种情况下使用发布-订阅通道或收件人列表路由器时,请务必启用该apply-sequence标志。这样做会添加必要的标题:CORRELATION_IDSEQUENCE_NUMBERSEQUENCE_SIZE. Spring Integration 中的拆分器默认启用该行为,但对于发布-订阅通道或收件人列表路由器未启用该行为,因为这些组件可能在不需要这些标头的各种上下文中使用。

在为应用程序实现特定聚合器策略时,您可以扩展AbstractAggregatingMessageGroupProcessor和实现该aggregatePayloads方法。但是,有更好的解决方案,与 API 耦合较少,用于实现聚合逻辑,可以通过 XML 或通过注释进行配置。

一般来说,任何 POJO 都可以实现聚合算法,如果它提供了一个接受单个java.util.List作为参数的方法(也支持参数化列表)。调用此方法来聚合消息,如下所示:

  • 如果参数是 ajava.util.Collection<T>并且参数类型 T 可分配给Message,则为聚合而累积的整个消息列表将发送到聚合器。

  • 如果参数是非参数化的java.util.Collection或参数类型不可分配给Message,则该方法接收累积消息的有效负载。

  • 如果返回类型不可分配给Message,则将其视为Message框架自动创建的 a 的有效负载。

为了简化代码并促进低耦合、可测试性等最佳实践,实现聚合逻辑的首选方式是通过 POJO 并使用 XML 或注释支持在应用程序中对其进行配置。

从 5.3 版开始,在处理消息组之后,将针对具有多个嵌套级别的正确拆分器-聚合器场景AbstractCorrelatingMessageHandler执行MessageBuilder.popSequenceDetails()消息头修改。仅当消息组发布结果不是消息集合时才会这样做。在这种情况下,目标在构建这些消息时MessageGroupProcessor负责MessageBuilder.popSequenceDetails()调用。

如果MessageGroupProcessor返回 a Message,则仅当与组中的第一条消息匹配MessageBuilder.popSequenceDetails()时才会对输出消息执行a 。sequenceDetails(以前只有当一个普通的有效载荷或AbstractIntegrationMessageBuilderMessageGroupProcessor.

此功能可以由新popSequence boolean属性控制,因此MessageBuilder.popSequenceDetails()在标准拆分器未填充相关详细信息时,可以在某些情况下禁用该功能。从本质上讲,此属性撤消applySequence = trueAbstractMessageSplitter. 有关详细信息,请参阅拆分器

SimpleMessageGroup.getMessages()方法返回一个unmodifiableCollection. 因此,如果您的聚合 POJO 方法有一个Collection<Message>参数,则传入的参数正是该Collection实例,并且当您将 aSimpleMessageStore用于聚合器时,该原始参数Collection<Message>在释放组后被清除。因此,Collection<Message>如果 POJO 中的变量从聚合器中传递出去,它也会被清除。如果您希望简单地按原样发布该集合以进行进一步处理,则必须构建一个新的Collection(例如,new ArrayList<Message>(messages))。从 4.3 版开始,框架不再将消息复制到新集合中,以避免不必要的额外对象创建。

如果该processMessageGroup方法MessageGroupProcessor返回一个集合,它必须是一个Message<?>对象的集合。在这种情况下,消息是单独发布的。在 4.2 版之前,无法MessageGroupProcessor通过使用 XML 配置来提供。只有 POJO 方法可以用于聚合。现在,如果框架检测到引用的(或内部)bean 实现MessageProcessor了 ,则它被用作聚合器的输出处理器。

如果您希望从自定义中释放一组对象MessageGroupProcessor作为消息的有效负载,您的类应该扩展AbstractAggregatingMessageGroupProcessor并实现aggregatePayloads().

此外,从 4.2 版开始,SimpleMessageGroupProcessor提供了 a。它返回来自组的消息集合,如前所述,这会导致单独发送已发布的消息。

这让聚合器作为消息屏障工作,到达的消息被保留,直到释放策略触发并且组作为单个消息序列被释放。

ReleaseStrategy

ReleaseStrategy接口定义如下:

public interface ReleaseStrategy {

  boolean canRelease(MessageGroup group);

}

一般来说,任何 POJO 都可以实现完成决策逻辑,前提是它提供了一个接受单个java.util.List作为参数的方法(也支持参数化列表)并返回一个布尔值。每条新消息到达后调用该方法,判断组是否完整,如下:

  • 如果参数是 ajava.util.List<T>并且参数类型T可分配给Message,则将组中累积的整个消息列表发送到该方法。

  • 如果参数是非参数化java.util.List的或参数类型不可分配给Message,则该方法接收累积消息的有效负载。

  • true如果消息组准备好聚合,则该方法必须返回,否则返回 false。

以下示例显示了如何将@ReleaseStrategy注解用于 aList类型Message

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>>) {...}
}

以下示例显示了如何将@ReleaseStrategy注解用于 aList类型String

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String>) {...}
}

根据前面两个示例中的签名,基于 POJO 的发布策略是传递一个Collection尚未发布的消息(如果您需要访问整体Message)或一个Collection有效负载对象(如果类型参数不是Message) . 这满足了大多数用例。但是,如果由于某种原因需要访问完整的MessageGroup,则应提供ReleaseStrategy接口的实现。

在处理潜在的大型组时,您应该了解这些方法是如何调用的,因为在释放组之前可能会多次调用释放策略。最有效的是 的实现ReleaseStrategy,因为聚合器可以直接调用它。第二个最有效的是带有Collection<Message<?>>参数类型的 POJO 方法。效率最低的是带有Collection<Something>类型的 POJO 方法。Something每次调用发布策略时,框架都必须将组中消息的有效负载复制到新的集合中(并可能尝试将有效负载转换为)。使用Collection<?>可避免转换,但仍需要创建新的Collection.

由于这些原因,对于大型团体,我们建议您实施ReleaseStrategy.

当组被释放以进行聚合时,其所有尚未释放的消息都将被处理并从组中删除。如果组也是完整的(即,如果来自一个序列的所有消息都已到达,或者如果没有定义序列),则将该组标记为完整。该组的任何新消息都将发送到丢弃通道(如果已定义)。设置expire-groups-upon-completiontrue(默认为false)会删除整个组,并且任何新消息(与已删除组具有相同的相关 ID)形成一个新组。MessageGroupStoreReaper您可以通过将 a与send-partial-result-on-expiry设置为一起使用来释放部分序列true

为了便于丢弃迟到的消息,聚合器必须在组被释放后维护组的状态。这最终会导致内存不足的情况。为避免这种情况,您应该考虑配置 aMessageGroupStoreReaper以删除组元数据。一旦到达某个点,过期参数应设置为过期组,之后延迟消息预计不会到达。有关配置收割机的信息,请参阅在聚合器中管理状态:MessageGroupStore

Spring Integration 提供了一个实现ReleaseStrategySimpleSequenceSizeReleaseStrategy. 此实现参考每个到达消息的SEQUENCE_NUMBERSEQUENCE_SIZE标头来决定消息组何时完成并准备好进行聚合。如前所述,它也是默认策略。

在 5.0 版本之前,默认的发布策略是SequenceSizeReleaseStrategy,这在大型组中表现不佳。使用该策略,可以检测并拒绝重复的序列号。此操作可能很昂贵。

如果您正在聚合大型组,则不需要释放部分组,并且不需要检测/拒绝重复序列,请考虑SimpleSequenceSizeReleaseStrategy改用 - 对于这些用例来说效率更高,并且是默认设置,因为未指定部分组发布时的版本 5.0 。

聚合大型组

4.3 版本将Collectiona 中消息的默认值更改SimpleMessageGroupHashSet(以前是 a BlockingQueue)。当从大型组中删除单个消息时,这很昂贵(需要 O(n) 线性扫描)。尽管散列集的删除速度通常要快得多,但对于大型消息来说可能代价高昂,因为必须在插入和删除时都计算散列。如果您有散列成本很高的消息,请考虑使用其他一些集合类型。如UsingMessageGroupFactory中所讨论的,提供了 aSimpleMessageGroupFactory以便您可以选择Collection最适合您需要的。您还可以提供自己的工厂实现来创建其他一些Collection<Message<?>>.

以下示例显示了如何使用先前的实现和 a 配置聚合器SimpleSequenceSizeReleaseStrategy

<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />

<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>

<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点参与聚合器的上游流,则序列大小释放策略(固定或基于sequenceSize标头)将无法达到其目的,因为来自序列的某些消息可能会被过滤器丢弃。在这种情况下,建议选择另一个ReleaseStrategy,或使用从丢弃子流发送的补偿消息,该消息在其内容中携带一些信息,以便在自定义完整组功能中跳过。有关详细信息, 请参阅过滤器。
关联策略

CorrelationStrategy接口定义如下:

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

该方法返回一个Object表示用于将消息与消息组相关联的相关键的值。密钥必须满足用于 a 中的密钥的标准,Map以实现equals()hashCode()

一般来说,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法的参数(或多个参数)的规则与 a 的规则相同ServiceActivator(包括对@Header注释的支持)。该方法必须返回一个值,并且该值不能是null.

Spring Integration 提供了一个实现CorrelationStrategyHeaderAttributeCorrelationStrategy. 此实现返回消息头之一(其名称由构造函数参数指定)的值作为关联键。默认情况下,关联策略是HeaderAttributeCorrelationStrategy返回CORRELATION_IDheader 属性的值。如果您有一个想要用于关联的自定义标头名称,您可以在一个实例上对其进行配置,HeaderAttributeCorrelationStrategy并将其作为聚合器关联策略的参考。

锁定注册表

对组的更改是线程安全的。因此,当您同时发送具有相同关联 ID 的消息时,聚合器中只会处理其中一个消息,从而使其有效地成为每个消息组的单线程。ALockRegistry用于获取已解析的相关 ID 的锁定。ADefaultLockRegistry默认使用(在内存中)。为了在使用共享MessageGroupStore的服务器之间同步更新,您必须配置共享锁注册表。

避免死锁

如上所述,当消息组发生变异(添加或释放消息)时,会持有一个锁。

考虑以下流程:

...->aggregator1-> ... ->aggregator2-> ...

如果有多个线程,并且聚合器共享一个公共锁注册表,则可能会出现死锁。这将导致线程挂起,并jstack <pid>可能出现以下结果:

Found one Java-level deadlock:
=============================
"t2":
  waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t1"
"t1":
  waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t2"

有几种方法可以避免这个问题:

  • 确保每个聚合器都有自己的锁注册表(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须各自具有不同的注册表)

  • 使用ExecutorChannelorQueueChannel作为聚合器的输出通道,以便下游流在新线程上运行

  • 从版本 5.1.1 开始,将releaseLockBeforeSend聚合器属性设置为true

如果由于某种原因,单个聚合器的输出最终被路由回同一个聚合器,也可能导致此问题。当然,上述第一种解决方案不适用于这种情况。

在 Java DSL 中配置聚合器

有关如何在 Java DSL 中配置聚合器的信息,请参阅聚合器和重新排序器。

使用 XML 配置聚合器

Spring Integration 支持通过元素配置带有 XML 的聚合<aggregator/>器。以下示例显示了聚合器​​的示例:

<channel id="inputChannel"/>

<int:aggregator id="myAggregator"                          (1)
        auto-startup="true"                                (2)
        input-channel="inputChannel"                       (3)
        output-channel="outputChannel"                     (4)
        discard-channel="throwAwayChannel"                 (5)
        message-store="persistentMessageStore"             (6)
        order="1"                                          (7)
        send-partial-result-on-expiry="false"              (8)
        send-timeout="1000"                                (9)

        correlation-strategy="correlationStrategyBean"     (10)
        correlation-strategy-method="correlate"            (11)
        correlation-strategy-expression="headers['foo']"   (12)

        ref="aggregatorBean"                               (13)
        method="aggregate"                                 (14)

        release-strategy="releaseStrategyBean"             (15)
        release-strategy-method="release"                  (16)
        release-strategy-expression="size() == 5"          (17)

        expire-groups-upon-completion="false"              (18)
        empty-group-min-timeout="60000"                    (19)

        lock-registry="lockRegistry"                       (20)

        group-timeout="60000"                              (21)
        group-timeout-expression="size() ge 2 ? 100 : -1"  (22)
        expire-groups-upon-timeout="true"                  (23)

        scheduler="taskScheduler" >                        (24)
            <expire-transactional/>                        (25)
            <expire-advice-chain/>                         (26)
</aggregator>

<int:channel id="outputChannel"/>

<int:channel id="throwAwayChannel"/>

<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 聚合器的 id 是可选的。
2 生命周期属性指示是否应在应用程序上下文启动期间启动聚合器。可选(默认为“真”)。
3 聚合器从中接收消息的通道。必需的。
4 聚合器将聚合结果发送到的通道。可选(因为传入消息本身可以在“replyChannel”消息头中指定回复通道)。
5 聚合器将超时消息发送到的通道(如果send-partial-result-on-expiryfalse)。可选的。
6 对 a 的引用,MessageGroupStore用于将消息组存储在它们的相关键下,直到它们完成。可选的。默认情况下,它是一个易失的内存存储。有关详细信息,请参阅消息存储
7 订阅多个句柄时此聚合器的顺序DirectChannel(用于负载平衡目的)。可选的。
8 指示一旦包含过期消息,就应该聚合过期消息并将其发送到“输出通道”或“回复通道” MessageGroup(请参阅​​ 参考资料MessageGroupStore.expireMessageGroups(long))。使 a 过期的一种方法MessageGroup是配置 a MessageGroupStoreReaperMessageGroup但是,您也可以通过调用来过期MessageGroupStore.expireMessageGroups(timeout)。您可以通过控制总线操作来完成此操作,或者,如果您有对MessageGroupStore实例的引用,则可以通过调用expireMessageGroups(timeout). 否则,这个属性本身什么也不做。它仅用作是否丢弃或发送到输出或回复通道的任何仍然在MessageGroup即将过期的消息的指示符。可选(默认为false)。注意:这个属性可能更恰当地称为send-partial-result-on-timeout,因为如果组可能实际上不会过期expire-groups-upon-timeout设置为false
9 Messageoutput-channel或发送回复时等待的超时间隔discard-channel。默认为-1,这会导致无限期阻塞。仅当输出通道具有某些“发送”限制时才应用它,例如QueueChannel具有固定“容量”的 a。在这种情况下,MessageDeliveryException抛出 a。对于AbstractSubscribableChannel实现,send-timeout被忽略。对于group-timeout(-expression)MessageDeliveryException来自计划的过期任务导致该任务被重新计划。可选的。
10 对实现消息相关(分组)算法的 bean 的引用。bean 可以是CorrelationStrategy接口或 POJO 的实现。在后一种情况下,correlation-strategy-method还必须定义属性。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID标头)。
11 在 引用的 bean 上定义的方法correlation-strategy。它实现了相关决策算法。可选,有限制(correlation-strategy必须存在)。
12 表示关联策略的 SpEL 表达式。示例:"headers['something']"。只允许correlation-strategy或之一。correlation-strategy-expression
13 对应用程序上下文中定义的 bean 的引用。bean 必须实现聚合逻辑,如前所述。可选(默认情况下,聚合消息列表成为输出消息的有效负载)。
14 ref在属性引用的 bean 上定义的方法。它实现了消息聚合算法。可选(取决于ref定义的属性)。
15 对实现发布策略的 bean 的引用。bean 可以是ReleaseStrategy接口或 POJO 的实现。在后一种情况下,release-strategy-method还必须定义属性。可选(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZEheader 属性)。
16 release-strategy在属性引用的 bean 上定义的方法。它实现了完成决策算法。可选,有限制(release-strategy必须存在)。
17 表示发布策略的 SpEL 表达式。表达式的根对象是 a MessageGroup。示例:"size() == 5"。只允许release-strategy或之一。release-strategy-expression
18 当设置为true(默认为false)时,已完成的组将从消息存储中删除,让具有相同相关性的后续消息形成一个新组。默认行为是将具有与已完成组相同相关性的消息发送到discard-channel.
19 仅当MessageGroupStoreReaper为 的 配置了MessageStorea时才适用<aggregator>。默认情况下,当 aMessageGroupStoreReaper配置为使部分组过期时,也会删除空组。空组在组正常释放后存在。空组可以检测和丢弃迟到的消息。如果您希望空组的过期时间比部分组过期的时间长,请设置此属性。MessageStore然后,直到至少在此毫秒数内没有修改空组,才会从 中删除空组。请注意,使空组过期的实际时间也受 reapertimeout属性的影响,它可能与该值加上超时一样多。
20 org.springframework.integration.util.LockRegistry对bean的引用。它用于获取一个Lock基于 的groupIdfor 并发操作MessageGroup。默认情况下,使用内部DefaultLockRegistry。使用分布式LockRegistry,例如ZookeeperLockRegistry,确保只有一个聚合器实例可以同时对组进行操作。有关详细信息,请参阅Redis 锁注册表Gemfire 锁注册表Zookeeper 锁注册表
21 当前消息到达时未释放组时强制MessageGroup完成的超时(以毫秒为单位) 。ReleaseStrategy此属性为聚合器提供内置的基于时间的发布策略,如果新消息MessageGroup在超时时间内未到达,则需要发出部分结果(或丢弃组),该超时时间从最后一个时间开始计算消息到达。要设置从MessageGroup创建时间开始计算的超时,请参阅group-timeout-expression信息。当一条新消息到达聚合器时,任何现有ScheduledFuture<?>的消息MessageGroup都会被取消。如果ReleaseStrategy返回false(意思是不释放)和groupTimeout > 0,一个新的任务计划使该组到期。我们不建议将此属性设置为零(或负值)。这样做会有效地禁用聚合器,因为每个消息组都会立即完成。但是,您可以使用表达式有条件地将其设置为零(或负值)。有关group-timeout-expression信息,请参阅。完成期间采取的操作取决于ReleaseStrategysend-partial-group-on-expiry属性。有关详细信息,请参阅聚合器和组超时。它与 'group-timeout-expression' 属性互斥。
22 计算结果为 a 的 SpEL 表达式,groupTimeout其中MessageGroup为计算#root上下文对象。用于调度MessageGroup强制完成。如果表达式的计算结果为null,则不安排完成。如果它的计算结果为零,则该组立即在当前线程上完成。实际上,这提供了动态group-timeout属性。例如,如果您希望MessageGroup在创建组后 10 秒后强制完成 a,您可以考虑使用以下 SpEL 表达式:timestamp + 10000 - T(System).currentTimeMillis()where timestampis provided by MessageGroup.getTimestamp()as the MessageGrouphere is the#root评估上下文对象。但是请记住,组创建时间可能与首次到达消息的时间不同,具体取决于其他组过期属性的配置。有关group-timeout更多信息,请参阅。与“组超时”属性互斥。
23 当组由于超时(或通过 a MessageGroupStoreReaper)完成时,默认情况下该组过期(完全删除)。迟到的消息会启动一个新组。您可以将其设置false为完成组,但保留其元数据,以便丢弃迟到的消息。空组可以在以后使用 aMessageGroupStoreReaperempty-group-min-timeout属性过期。它默认为“真”。
24 如果在. TaskScheduler_ MessageGroup_ 如果未提供,则使用在( ) 中注册的默认调度程序 ( ) 。如果指定或未指定,则此属性不适用。MessageGroupgroupTimeouttaskSchedulerApplicationContextThreadPoolTaskSchedulergroup-timeoutgroup-timeout-expression
25 从 4.1 版开始。它允许为forceComplete操作启动事务。它由 agroup-timeout(-expression)或 a启动,MessageGroupStoreReaper不适用于正常addrelease、 和discard操作。只有这个子元素 or<expire-advice-chain/>是允许的。
26 4.1 版开始。它允许AdviceforceComplete操作配置任何。它由 agroup-timeout(-expression)或 a启动,MessageGroupStoreReaper不适用于正常addrelease、 和discard操作。只有这个子元素 or<expire-transactional/>是允许的。Advice也可以使用 Spring 命名空间在此处配置事务tx
即将到期的组

有两个与过期(完全删除)组相关的属性。当一个组过期时,没有它的记录,如果新消息到达时具有相同的相关性,则启动一个新组。当一个组完成时(没有过期),空组仍然存在,迟到的消息被丢弃。稍后可以通过将 aMessageGroupStoreReaperempty-group-min-timeout属性结合使用来删除空组。

expire-groups-upon-completionReleaseStrategy释放组时的“正常”完成有关。这默认为false.

如果一个组没有正常完成,但由于超时而被释放或丢弃,则该组正常过期。从 4.1 版开始,您可以使用expire-groups-upon-timeout. 它默认true为向后兼容。

当一个组超时时,将ReleaseStrategy再给一次机会来释放该组。如果这样做并且expire-groups-upon-timeout为假,则过期由 控制expire-groups-upon-completion。如果组在超时期间没有被释放策略释放,则过期由expire-groups-upon-timeout. 超时组要么被丢弃,要么发生部分释放(基于send-partial-result-on-expiry)。

从 5.0 版开始,空组也计划在empty-group-min-timeout. 如果expireGroupsUponCompletion == falseminimumTimeoutForEmptyGroups > 0,则在正常或部分序列发布发生时安排删除组的任务。

从 5.4 版开始,聚合器(和重排序器)可以配置为使孤立组(持久消息存储中的组,否则可能不会被释放)过期。(expireTimeout如果大于0)表示应清除存储中比该值更早的组。该purgeOrphanedGroups()方法在启动时调用,并与提供的 一起expireDuration在计划任务中定期调用。该方法也可以随时被外部调用。过期逻辑完全委托给forceComplete(MessageGroup)根据上述提供的到期选项的功能。当需要从那些不再使用常规消息到达逻辑释放的旧组中清除消息存储时,这种定期清除功能很有用。在大多数情况下,这发生在应用程序重新启动后,使用持久消息组存储时。该功能类似于MessageGroupStoreReaper计划任务,但在使用组超时而不是收割机时,提供了一种方便的方式来处理特定组件中的旧组。这MessageGroupStore必须专门为当前相关端点提供。否则,一个聚合器可能会从另一个聚合器中清除组。使用聚合器,使用此技术过期的组将被丢弃或作为部分组释放,具体取决于expireGroupsUponCompletion属性。

ref如果自定义聚合器处理程序实现可能在其他<aggregator>定义中被引用,我们通常建议使用属性。但是,如果自定义聚合器实现仅由 的单个定义使用<aggregator>,则可以使用内部 bean 定义(从版本 1.0.3 开始)在<aggregator>元素内配置聚合 POJO,如以下示例所示:

<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
不允许在同一配置中 同时使用ref属性和内部 bean 定义,因为它会创建模棱两可的条件。<aggregator>在这种情况下,会引发异常。

以下示例显示了聚合器​​ bean 的实现:

public class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0l;
    for (long partialResult: results) {
      total += partialResult;
    }
    return total;
  }
}

前面示例的完成策略 bean 的实现可能如下所示:

public class PojoReleaseStrategy {
...
  public boolean canRelease(List<Long> numbers) {
    int sum = 0;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}
只要这样做有意义,发布策略方法和聚合器方法都可以组合到一个 bean 中。

上述示例的关联策略 bean 的实现可能如下所示:

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

前面示例中的聚合器将按某些标准(在本例中为除以十后的余数)对数字进行分组,并保留该组,直到有效负载提供的数字总和超过某个值。

只要这样做有意义,发布策略方法、相关策略方法和聚合器方法都可以组合在一个 bean 中。(实际上,它们都可以组合起来,也可以任意组合起来。)
聚合器和 Spring 表达式语言 (SpEL)

从 Spring Integration 2.0 开始,您可以使用SpEL处理各种策略(相关、发布和聚合),如果此类发布策略背后的逻辑相对简单,我们建议您这样做。假设您有一个设计用于接收对象数组的遗留组件。我们知道,默认发布策略将所有聚合消息组合在List. 现在我们有两个问题。首先,我们需要从列表中提取单个消息。其次,我们需要提取每条消息的有效负载并组装对象数组。以下示例解决了这两个问题:

public String[] processRelease(List<Message<String>> messages){
    List<String> stringList = new ArrayList<String>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[]{});
}

然而,使用 SpEL,这样的需求实际上可以通过单行表达式相对容易地处理,从而使您不必编写自定义类并将其配置为 bean。以下示例显示了如何执行此操作:

<int:aggregator input-channel="aggChannel"
    output-channel="replyChannel"
    expression="#this.![payload].toArray()"/>

在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载组装一个新集合,然后将其转换为数组,从而实现与早期 Java 代码相同的结果。

在处理自定义发布和关联策略时,您可以应用相同的基于表达式的方法。

您可以将简单的关联逻辑实现为 SpEL 表达式并在属性中配置它,而不是在CorrelationStrategy属性中为自定义定义 bean ,如以下示例所示:correlation-strategycorrelation-strategy-expression

correlation-strategy-expression="payload.person.id"

在前面的示例中,我们假设有效负载具有带有 的person属性,id该属性将用于关联消息。

同样,对于ReleaseStrategy,您可以将发布逻辑实现为 SpEL 表达式并在release-strategy-expression属性中进行配置。评估上下文的根对象是它MessageGroup本身。List可以通过使用message表达式中组的属性来引用消息。

在 5.0 之前的版本中,根对象是 的集合Message<?>,如前面的示例所示:
release-strategy-expression="!messages.?[payload==5].empty"

在前面的示例中,SpEL 评估上下文的根对象是MessageGroup自身,并且您声明,一旦5该组中存在有效负载为的消息,就应该释放该组。

聚合器和组超时

从 4.0 版开始,引入了两个新的互斥属性:group-timeoutgroup-timeout-expression. 请参阅使用 XML 配置聚合器ReleaseStrategy在某些情况下,如果当前消息到达时未释放,您可能需要在超时后发出聚合器结果(或丢弃组) 。为此,该选项允许强制完成groupTimeout调度,如以下示例所示:MessageGroup

<aggregator input-channel="input" output-channel="output"
        send-partial-result-on-expiry="true"
        group-timeout-expression="size() ge 2 ? 10000 : -1"
        release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>

在此示例中,如果聚合器按release-strategy-expression. 如果该特定消息未到达,则groupTimeout强制组在十秒后完成,只要该组包含至少两条消息。

强制组完成的结果取决于ReleaseStrategysend-partial-result-on-expiry。首先,再次咨询发布策略,看是否要进行正常发布。虽然该组没有改变,但此时ReleaseStrategy可以决定释放该组。如果释放策略仍然没有释放该组,则它已过期。如果send-partial-result-on-expirytrue,(部分)MessageGroup中的现有消息将作为普通聚合器回复消息发布到output-channel. 否则,它被丢弃。

groupTimeout行为和MessageGroupStoreReaper(请参阅使用 XML 配置聚合器)之间存在差异。reaper周期性地为所有MessageGroups启动强制完成。如果在. MessageGroupStore_ 此外,reaper 可用于删除空组(如果为 false ,则保留空组以丢弃迟到的消息)。groupTimeoutMessageGroupgroupTimeoutexpire-groups-upon-completion

从 5.5 版开始,groupTimeoutExpression可以对java.util.Date实例进行评估。MessageGroup.getTimestamp()这在诸如groupTimeoutExpression根据long组创建时间(

group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注释配置聚合器

以下示例显示了一个配置有注释的聚合器:

public class Waiter {
  ...

  @Aggregator  (1)
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @ReleaseStrategy  (2)
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy  (3)
  public String correlateBy(OrderItem item) {
    ...
  }
}
1 指示此方法应用作聚合器的注释。如果将此类用作聚合器,则必须指定它。
2 表示此方法用作聚合器的发布策略的注解。如果在任何方法上都不存在,则聚合器使用SimpleSequenceSizeReleaseStrategy.
3 指示此方法应用作聚合器的关联策略的注释。如果未指示关联策略,则聚合器使用HeaderAttributeCorrelationStrategybased on CORRELATION_ID

XML 元素提供的所有配置选项也可用于@Aggregator注释。

可以从 XML 显式引用聚合器,或者如果在@MessageEndpoint类上定义,则可以通过类路径扫描自动检测到聚合器。

聚合器组件的注释配置(@Aggregator和其他)仅涵盖简单的用例,其中大多数默认选项就足够了。如果您在使用注解配置时需要对这些选项进行更多控制,请考虑使用@Bean定义AggregatingMessageHandler并用 标记其@Bean方法@ServiceActivator,如以下示例所示:

@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
     AggregatingMessageHandler aggregator =
                       new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                                                 jdbcMessageGroupStore);
     aggregator.setOutputChannel(resultsChannel());
     aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
     aggregator.setTaskScheduler(this.taskScheduler);
     return aggregator;
}

有关更多信息,请参阅编程模型方法注释。@Bean

从 4.2 版开始,AggregatorFactoryBean可用于简化AggregatingMessageHandler.

在聚合器中管理状态:MessageGroupStore

聚合器(以及 Spring Integration 中的一些其他模式)是一种有状态模式,它需要根据在一段时间内到达的一组消息做出决策,所有消息都具有相同的关联键。有状态模式(例如ReleaseStrategy)中的接口设计是由组件(无论是由框架定义还是由用户定义)应该能够保持无状态的原则驱动的。所有状态都由 承载,MessageGroup其管理委托给MessageGroupStoreMessageGroupStore接口定义如下:

public interface MessageGroupStore {

    int getMessageCountForAllMessageGroups();

    int getMarkedMessageCountForAllMessageGroups();

    int getMessageGroupCount();

    MessageGroup getMessageGroup(Object groupId);

    MessageGroup addMessageToGroup(Object groupId, Message<?> message);

    MessageGroup markMessageGroup(MessageGroup group);

    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

    void removeMessageGroup(Object groupId);

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);
}

有关详细信息,请参阅Javadoc

在等待触发发布策略的同时累积状态信息,并且该事件可能永远不会发生MessageGroupStoreMessageGroups因此,为了防止陈旧的消息滞留,并为易失性存储提供一个挂钩以在应用程序关闭时进行清理,您可以注册回调以在它们过期时MessageGroupStore应用到它。MessageGroups该界面非常简单,如以下清单所示:

public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过从存储中完全删除组)。

维护这些回调的MessageGroupStore列表,它根据需要将其应用于时间戳早于作为参数提供的时间的所有消息(参见前面描述的registerMessageGroupExpiryCallback(..)expireMessageGroups(..)方法)。

MessageGroupStore当您打算依赖功能时, 不要在不同的聚合器组件中使用相同的实例,这一点很重要expireMessageGroups。每个都根据回调AbstractCorrelatingMessageHandler注册自己的。这样,每个到期组都可能被错误的聚合器完成或丢弃。从版本 5.0.10 开始,a用于. 反过来,检查该类的实例是否存在,并在回调集中已经存在的情况下用适当的消息记录错误。这种方式框架不允许使用MessageGroupCallbackforceComplete()UniqueExpiryCallbackAbstractCorrelatingMessageHandlerMessageGroupStoreMessageGroupStoreMessageGroupStore在不同的聚合器/重排序器中实例化,以避免提到的过期组的副作用不是由特定的相关处理程序创建的。

expireMessageGroups您可以使用超时值调用该方法。任何早于当前时间减去此值的消息都会过期并应用回调。因此,商店的用户定义了消息组“过期”的含义。

为了方便用户,Spring Integration 以 a 的形式为消息过期提供了一个包装器MessageGroupStoreReaper,如以下示例所示:

<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

收割者是一个Runnable。在前面的示例中,每十秒调用一次消息组存储的 expire 方法。超时本身是 30 秒。

重要的是要了解 'timeout' 属性MessageGroupStoreReaper是一个近似值,并受任务调度程序的速率影响,因为此属性仅在MessageGroupStoreReaper任务的下一个计划执行时检查。例如,如果超时设置为 10 分钟,但MessageGroupStoreReaper任务计划每小时运行一次,并且MessageGroupStoreReaper任务的最后一次执行发生在超时前一分钟,则MessageGroup在接下来的 59 分钟内不会过期。因此,我们建议将速率设置为至少等于超时值或更短。

除了 reaper 之外,到期回调在应用程序关闭时通过AbstractCorrelatingMessageHandler.

AbstractCorrelatingMessageHandler注册自己的到期回调,这是聚合器的 XML 配置中带有布尔标志的链接send-partial-result-on-expiry。如果标志设置为true,则在调用到期回调时,组中尚未释放的任何未标记消息都可以发送到输出通道。

由于MessageGroupStoreReaper是从计划任务中调用的,并且可能会导致向下游集成流生成消息(取决于sendPartialResultOnExpiry选项),因此建议TaskScheduler通过 aMessagePublishingErrorHandler为处理程序异常提供自定义errorChannel,正如预期的那样通过常规聚合器发布功能。相同的逻辑适用于组超时功能,它也依赖于TaskScheduler. 有关详细信息,请参阅错误处理

当一个共享MessageStore用于不同的关联端点时,您必须配置一个属性CorrelationStrategy来确保组 ID 的唯一性。否则,当一个关联端点释放或过期来自其他端点的消息时,可能会发生意外行为。具有相同关联键的消息存储在同一个消息组中。

一些MessageStore实现允许通过对数据进行分区来使用相同的物理资源。例如,JdbcMessageStore有一个region属性,而MongoDbMessageStore有一个collectionName属性。

有关MessageStore接口及其实现的更多信息,请参阅消息存储

通量聚合器

在 5.2 版本中,该FluxAggregatorMessageHandler组件已被引入。它基于 Project ReactorFlux.groupBy()Flux.window()运营商。传入的消息被发送到由该组件的构造函数中FluxSink发起的。Flux.create()如果outputChannel未提供 或它不是 的实例,则从实现中完成ReactiveStreamsSubscribableChannel对 main 的订阅。否则它被推迟到由实现完成的订阅。消息通过使用 a作为组键进行分组。默认情况下,查询消息的标题。FluxLifecycle.start()ReactiveStreamsSubscribableChannelFlux.groupBy()CorrelationStrategyIntegrationMessageHeaderAccessor.CORRELATION_ID

默认情况下,每个关闭的窗口都作为Flux要生成的消息的有效负载释放。此消息包含窗口中第一条消息的所有标题。Flux必须在下游订阅和处理输出消息有效负载中的此内容。这样的逻辑可以setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)通过FluxAggregatorMessageHandler. 例如,如果我们想List在最终消息中包含一个有效负载,我们可以Flux.collectList()这样配置:

fluxAggregatorMessageHandler.setCombineFunction(
                (messageFlux) ->
                        messageFlux
                                .map(Message::getPayload)
                                .collectList()
                                .map(GenericMessage::new));

有几个选项FluxAggregatorMessageHandler可以选择合适的窗口策略:

  • setBoundaryTrigger(Predicate<Message<?>>)- 传播给Flux.windowUntil()操作员。有关更多信息,请参阅其 JavaDocs。优先于所有其他窗口选项。

  • setWindowSize(int)and setWindowSizeFunction(Function<Message<?>, Integer>)- 传播到Flux.window(int)or windowTimeout(int, Duration)。默认情况下,窗口大小是根据组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE标头计算的。

  • setWindowTimespan(Duration)- 传播到Flux.window(Duration)windowTimeout(int, Duration)取决于窗口大小配置。

  • setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)- 将转换应用于未包含在公开选项中的任何自定义窗口操作的分组通量的函数。

由于这个组件是一个MessageHandler实现,它可以简单地用作一个@Bean定义和一个@ServiceActivator消息注释。使用 Java DSL,它可以从.handle()EIP 方法中使用。下面的示例演示了我们如何IntegrationFlow在运行时注册 a 以及如何将 aFluxAggregatorMessageHandler与上游拆分器相关联:

IntegrationFlow fluxFlow =
        (flow) -> flow
                .split()
                .channel(MessageChannels.flux())
                .handle(new FluxAggregatorMessageHandler());

IntegrationFlowContext.IntegrationFlowRegistration registration =
        this.integrationFlowContext.registration(fluxFlow)
                .register();

@SuppressWarnings("unchecked")
Flux<Message<?>> window =
        registration.getMessagingTemplate()
                .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);

消息组的条件

从 5.5 版开始,一个AbstractCorrelatingMessageHandler(包括其 Java 和 XML DSL)公开了一个实现groupConditionSupplier选项BiFunction<Message<?>, String, String>。此功能用于添加到组中的每条消息,并将结果条件语句存储到组中以供将来考虑。可以咨询此条件,ReleaseStrategy而不是遍历组中的所有消息。有关详细信息,请参阅GroupConditionProviderJavaDocs 和消息组条件

另请参阅文件聚合器

重测序仪

重排序器与聚合器相关,但用于不同的目的。当聚合器组合消息时,重排序器在不更改消息的情况下传递消息。

功能性

重排序器的工作方式与聚合器类似,因为它使用CORRELATION_ID来将消息存储在组中。不同之处在于 Resequencer 不以任何方式处理消息。SEQUENCE_NUMBER相反,它会按照标题值的顺序释放它们。

关于这一点,您可以选择立即释放所有消息(在整个序列之后,根据SEQUENCE_SIZE和其他可能性)或在有效序列可用时立即释放。(我们将在本章后面讨论“有效序列”的含义。)

重新排序器旨在重新排序具有小间隙的相对较短的消息序列。如果您有大量具有许多间隙的不相交序列,则可能会遇到性能问题。

配置重定序器

有关在 Java DSL 中配置重新排序器的信息,请参阅聚合器和重新排序器。

配置重排序器只需要在 XML 中包含适当的元素。

以下示例显示了重新排序器配置:

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer"  (1)
  input-channel="inputChannel"  (2)
  output-channel="outputChannel"  (3)
  discard-channel="discardChannel"  (4)
  release-partial-sequences="true"  (5)
  message-store="messageStore"  (6)
  send-partial-result-on-expiry="true"  (7)
  send-timeout="86420000"  (8)
  correlation-strategy="correlationStrategyBean"  (9)
  correlation-strategy-method="correlate"  (10)
  correlation-strategy-expression="headers['something']"  (11)
  release-strategy="releaseStrategyBean"  (12)
  release-strategy-method="release"  (13)
  release-strategy-expression="size() == 10"  (14)
  empty-group-min-timeout="60000"  (15)

  lock-registry="lockRegistry"  (16)

  group-timeout="60000"  (17)
  group-timeout-expression="size() ge 2 ? 100 : -1"  (18)
  scheduler="taskScheduler" />  (19)
  expire-group-upon-timeout="false" />  (20)
1 重定序器的 id 是可选的。
2 重定序器的输入通道。必需的。
3 重新排序器将重新排序的消息发送到的通道。可选的。
4 重定序器将超时消息发送到的通道(如果send-partial-result-on-timeout设置为false)。可选的。
5 是在有序序列可用时立即发送还是仅在整个消息组到达后发送。可选的。(默认为false。)
6 对 a 的引用MessageGroupStore,可用于将消息组存储在其相关键下,直到它们完成。可选的。(默认为易失性内存存储。)
7 是否应在组到期时发送有序组(即使某些消息丢失)。可选的。(默认为 false。)请参阅在聚合器中管理状态:MessageGroupStore
8 Messageoutput-channel或发送回复时等待的超时间隔discard-channel。默认为-1,无限期阻塞。仅当输出通道具有某些“发送”限制时才应用它,例如QueueChannel具有固定“容量”的 a。在这种情况下,MessageDeliveryException抛出 a。被send-timeout忽略的AbstractSubscribableChannel实现。对于group-timeout(-expression)MessageDeliveryException来自计划的过期任务导致该任务被重新计划。可选的。
9 对实现消息相关(分组)算法的 bean 的引用。bean 可以是CorrelationStrategy接口或 POJO 的实现。在后一种情况下,correlation-strategy-method还必须定义属性。可选的。(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID标头。)
10 在 bean 上定义correlation-strategy并实现相关决策算法的方法。可选,有限制(需要correlation-strategy存在)。
11 表示关联策略的 SpEL 表达式。示例:"headers['something']"。只允许correlation-strategy或之一。correlation-strategy-expression
12 对实现发布策略的 bean 的引用。bean 可以是ReleaseStrategy接口或 POJO 的实现。在后一种情况下,release-strategy-method还必须定义属性。可选(默认情况下,聚合器将使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZEheader 属性)。
13 在 bean 上定义release-strategy并实现完成决策算法的方法。可选,有限制(需要release-strategy存在)。
14 表示发布策略的 SpEL 表达式。表达式的根对象是 a MessageGroup。示例:"size() == 5"。只允许release-strategy或之一。release-strategy-expression
15 仅适用于MessageGroupStoreReaper<resequencer> MessageStore. 默认情况下,当 aMessageGroupStoreReaper配置为使部分组过期时,也会删除空组。正常释放组后存在空组。这是为了能够检测和丢弃迟到的消息。如果您希望空组的过期时间比部分组过期的时间长,请设置此属性。MessageStore然后,直到至少在此毫秒数内没有修改空组,才会从 中删除空组。请注意,使空组过期的实际时间也受 reaper 的 timeout 属性的影响,它可能与该值加上 timeout 一样多。
16 请参阅使用 XML 配置聚合器
17 请参阅使用 XML 配置聚合器
18 请参阅使用 XML 配置聚合器
19 请参阅使用 XML 配置聚合器
20 默认情况下,当组由于超时(或通过 a MessageGroupStoreReaper)完成时,将保留空组的元数据。迟到的消息会立即被丢弃。将此设置为true以完全删除组。然后,迟到的消息开始一个新的组,直到组再次超时才被丢弃。由于序列范围中的“洞”导致超时,新组永远不会正常释放。稍后可以通过将 aMessageGroupStoreReaperempty-group-min-timeout属性一起使用来使空组过期(完全删除)。从版本 5.0 开始,空组也计划在过期后删除empty-group-min-timeout。默认值为“假”。

另请参阅聚合器到期组以获取更多信息。

由于没有在 Java 类中为重定序器实现自定义行为,因此没有注释支持。

消息处理链

MessageHandlerChain是一个MessageHandler可以配置为单个消息端点的实现,同时实际上委托给其他处理程序链,例如过滤器、转换器、拆分器等。当需要以固定的线性进程连接多个处理程序时,这可能会导致更简单的配置。例如,在其他组件之前提供变压器是相当普遍的。类似地,当您在链中的某个其他组件之前提供过滤器时,您实际上创建了一个选择性消费者。在任何一种情况下,链都只需要一个input-channel和一个output-channel,从而无需为每个单独的组件定义通道。

主要MessageHandlerChain是为 XML 配置设计的。对于 Java DSL 来说,IntegrationFlow定义可以看作是一个链式组件,但它与下面本章描述的概念和原理无关。有关详细信息,请参阅Java DSL
Spring IntegrationFilter提供了一个布尔属性:throwExceptionOnRejection. 当您在同一个点对点通道上提供多个具有不同接受标准的选择性消费者时,您应该将此值设置为“true”(默认值为false),以便调度程序知道消息被拒绝并因此尝试将消息传递给其他订阅者。如果没有抛出异常,即使过滤器已丢弃消息以防止进一步处理,调度程序也会显示消息已成功传递。如果您确实想要“丢弃”消息,过滤器的“丢弃通道”可能很有用,因为它确实让您有机会对丢弃的消息执行一些操作(例如将其发送到 JMS 队列或将其写入到日志)。

处理程序链简化了配置,同时在内部保持组件之间相同程度的松散耦合,如果在某些时候需要非线性安排,修改配置是微不足道的。

在内部,链扩展为列出的端点的线性设置,由匿名通道分隔。链中不考虑回复通道标头。只有在调用最后一个处理程序之后,才会将结果消息转发到回复通道或链的输出通道。由于这种设置,除最后一个之外的所有处理程序都必须实现该MessageProducer接口(它提供了一个“setOutputChannel()”方法)。如果设置了outputChannelon MessageHandlerChain,则最后一个处理程序只需要一个输出通道。

与其他端点一样,output-channel是可选的。如果在链的末尾有回复消息,则输出通道优先。但是,如果它不可用,则链处理程序会检查入站消息上的回复通道标头作为后备。

在大多数情况下,您不需要MessageHandler自己实现。下一节重点介绍链元素的命名空间支持。大多数 Spring Integration 端点,例如服务激活器和转换器,都适合在MessageHandlerChain.

配置链

<chain>元素提供了一个input-channel属性。如果链中的最后一个元素能够产生回复消息(可选),它也支持一个output-channel属性。子元素是过滤器、转换器、分离器和服务激活器。最后一个元素也可以是路由器或出站通道适配器。以下示例显示了链定义:

<int:chain input-channel="input" output-channel="output">
    <int:filter ref="someSelector" throw-exception-on-rejection="true"/>
    <int:header-enricher>
        <int:header name="thing1" value="thing2"/>
    </int:header-enricher>
    <int:service-activator ref="someService" method="someMethod"/>
</int:chain>

<header-enricher>前面示例中使用的元素设置了一个消息头,该消息头以消息thing1的值命名thing2Transformer标头丰富器是仅涉及标头值的专业化。您可以通过实现一个MessageHandler对标头进行修改并将其连接为 bean 的方法来获得相同的结果,但标头丰富器是一个更简单的选项。

可以将<chain>其配置为消息流的最后一个“封闭箱”消费者。对于此解决方案,您可以将其放在 <chain> 的末尾一些 <outbound-channel-adapter>,如以下示例所示:

<int:chain input-channel="input">
    <int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" />
    <int:service-activator ref="someService" method="someMethod"/>
    <int:header-enricher>
        <int:header name="thing1" value="thing2"/>
    </int:header-enricher>
    <int:logging-channel-adapter level="INFO" log-full-message="true"/>
</int:chain>
不允许的属性和元素

某些属性,例如orderinput-channel不允许在链中使用的组件上指定。poller 子元素也是如此。

对于 Spring Integration 核心组件,XML 模式本身强制执行其中一些约束。但是,对于非核心组件或您自己的自定义组件,这些约束由 XML 命名空间解析器而非 XML 模式强制执行。

这些 XML 命名空间解析器约束是在 Spring Integration 2.2 中添加的。如果您尝试使用不允许的属性和元素,XML 命名空间解析器会抛出一个BeanDefinitionParsingException.

使用“id”属性

从 Spring Integration 3.0 开始,如果给链元素一个id属性,则元素的 bean 名称是链idid元素本身的组合。没有属性的元素id不会注册为 bean,但每个元素都会被赋予一个componentName包含链的id。考虑以下示例:

<int:chain id="somethingChain" input-channel="input">
    <int:service-activator id="somethingService" ref="someService" method="someMethod"/>
    <int:object-to-json-transformer/>
</int:chain>

在前面的示例中:

  • <chain>元素有一个id'somethingChain'。因此,AbstractEndpoint实现(PollingConsumerEventDrivenConsumer,取决于input-channel类型)bean 将此值作为其 bean 名称。

  • MessageHandlerChainbean 获取一个 bean 别名('somethingChain.handler'),它允许从BeanFactory.

  • <service-activator>不是一个完全成熟的消息传递端点(它不是一个or PollingConsumerEventDrivenConsumer。它是MessageHandler. <chain>在这种情况下,注册的 bean 名称BeanFactory是 'somethingChain$child.somethingService.handler'。

  • componentNamethisServiceActivatingHandler采用相同的值,但没有 '.handler' 后缀。它变成了“somethingChain$child.somethingService”。

  • 最后一个<chain>子组件<object-to-json-transformer>没有id属性。它componentName基于其在<chain>. 在这种情况下,它是“somethingChain$child#1”。(名称的最后一个元素是链中的顺序,以“#0”开头)。请注意,此转换器未在应用程序上下文中注册为 bean,因此它不会获得beanName. 然而,它componentName有一个对日志记录和其他目的有用的值。

元素的id属性<chain>使它们有资格进行JMX 导出,并且可以在消息历史记录中跟踪它们。BeanFactory如前所述,您可以使用适当的 bean 名称从 中访问它们。

id在元素上提供显式属性<chain>以简化日志中子组件的标识并提供对它们的访问BeanFactory等 很有用。

从链中调用链

有时,您需要从一个链中对另一个链进行嵌套调用,然后返回并在原始链中继续执行。为此,您可以通过包含 <gateway> 元素来使用消息传递网关,如以下示例所示:

<int:chain id="main-chain" input-channel="in" output-channel="out">
    <int:header-enricher>
      <int:header name="name" value="Many" />
    </int:header-enricher>
    <int:service-activator>
      <bean class="org.foo.SampleService" />
    </int:service-activator>
    <int:gateway request-channel="inputA"/>
</int:chain>

<int:chain id="nested-chain-a" input-channel="inputA">
    <int:header-enricher>
        <int:header name="name" value="Moe" />
    </int:header-enricher>
    <int:gateway request-channel="inputB"/>
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

<int:chain id="nested-chain-b" input-channel="inputB">
    <int:header-enricher>
        <int:header name="name" value="Jack" />
    </int:header-enricher>
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

在前面的示例中,nested-chain-a在处理结束时main-chain由那里配置的“网关”元素调用。在 中nested-chain-a,对 a 的调用nested-chain-b是在标头丰富之后进行的。然后流程返回以在 中完成执行nested-chain-b。最后,流程返回到main-chain。当<gateway>元素的嵌套版本在链中定义时,它不需要该service-interface属性。相反,它将消息处于其当前状态并将其放置在request-channel属性中定义的通道上。当该网关启动的下游流完成时,aMessage返回到网关并继续其在当前链中的旅程。

分散-聚集

从 4.1 版开始,Spring Integration 提供了分散-聚集企业集成模式的实现。它是一个复合端点,其目标是向收件人发送消息并聚合结果。如企业集成模式中所述,它是“最佳报价”等场景的组件,我们需要从多个供应商处请求信息,并决定哪一个为我们提供所请求项目的最佳条款。

以前,可以通过使用分立组件来配置模式。这种增强带来了更方便的配置。

TheScatterGatherHandler是一个请求-回复端点,它结合了 a PublishSubscribeChannel(或 a RecipientListRouter)和 a AggregatingMessageHandler。请求消息被发送到scatter通道,并ScatterGatherHandler等待聚合器发送给outputChannel.

功能性

Scatter-Gather模式提出了两种情况:“拍卖”和“分配”。在这两种情况下,aggregation功能是相同的,并提供所有可用的选项AggregatingMessageHandler。(实际上,ScatterGatherHandler只需要 aAggregatingMessageHandler作为构造函数参数。)有关更多信息,请参阅聚合器。

拍卖

拍卖Scatter-Gather变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是PublishSubscribeChannelwith apply-sequence="true"。然而,这个通道可以是任何MessageChannel实现(就像request-channelContentEnricher - 见Content Enricher中的情况一样)。correlationStrategy但是,在这种情况下,您应该为该aggregation函数创建自己的自定义。

分配

分发Scatter-Gather变体基于RecipientListRouter(参见RecipientListRouter)以及RecipientListRouter. 这是第二个ScatterGatherHandler构造函数参数。如果您只想依赖 和 的默认值,correlationStrategy您应该指定. 否则,您应该为. 与变体(拍卖变体)不同,拥有一个选项可以根据消息过滤目标供应商。使用,提供默认值,并且可以正确释放组。分配选项与拍卖选项互斥。recipient-list-routeraggregatorapply-sequence="true"correlationStrategyaggregatorPublishSubscribeChannelrecipient-list-router selectorapply-sequence="true"sequenceSizeaggregator

对于拍卖和分发变体,请求(分散)消息都带有gatherResultChannel标题以等待来自aggregator.

默认情况下,所有供应商都应将其结果发送到replyChannel标头(通常通过省略output-channel来自最终端点的 )。但是,gatherChannel还提供了该选项,让供应商将他们的回复发送到该通道以进行聚合。

配置分散-聚集端点

以下示例显示了 bean 定义的 Java 配置Scatter-Gather

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的示例中,我们使用接收通道列表来配置RecipientListRouter distributorbean 。applySequence="true"下一个 bean 用于AggregatingMessageHandler. 最后,我们将这两个 bean 注入到ScatterGatherHandlerbean 定义中并将其标记为 a@ServiceActivator以将 scatter-gather 组件连接到集成流中。

以下示例显示如何<scatter-gather>使用 XML 命名空间配置端点:

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 端点的 ID。bean使用ScatterGatherHandler别名注册id + '.handler'。bean使用RecipientListRouter别名注册id + '.scatterer'AggregatingMessageHandler`bean is registered with an alias of `id + '.gatherer'。_ 可选的。(BeanFactory生成默认id值。)
2 生命周期属性表明端点是否应该在应用程序上下文初始化期间启动。此外,如果提供了 a ,则ScatterGatherHandler还实现了Lifecycle启动和停止gatherEndpoint,这将在内部创建gather-channel。可选的。(默认为true。)
3 接收请求消息以处理它们的通道ScatterGatherHandler。必需的。
4 ScatterGatherHandler将聚合结果发送到的通道。可选的。replyChannel(传入的消息可以在消息头中自己指定回复通道)。
5 将拍卖场景的分散消息发送到的通道。可选的。<scatterer>与子元素互斥。
6 接收来自每个供应商的聚合回复的通道。它用作replyChannel分散消息中的标头。可选的。默认情况下,FixedSubscriberChannel创建。
7 当多个处理程序订阅同一个处理程序时此组件的顺序DirectChannel(用于负载平衡目的)。可选的。
8 指定端点应该启动和停止的阶段。启动顺序从低到高,关机顺序从高到低。默认情况下,此值为Integer.MAX_VALUE,表示此容器尽可能晚地启动并尽快停止。可选的。
9 向 发送回复时等待的超时Message间隔output-channel。默认情况下,发送会阻塞一秒钟。仅当输出通道具有某些“发送”限制时才适用——例如,QueueChannel具有固定“容量”且已满的 a。在这种情况下,MessageDeliveryException抛出 a。被send-timeout忽略的AbstractSubscribableChannel实现。对于group-timeout(-expression)MessageDeliveryException来自计划的过期任务导致该任务被重新计划。可选的。
10 让您指定 scatter-gather 在返回之前等待回复消息的时间。默认情况下,它会无限期地等待。如果回复超时,则返回“null”。可选的。默认为-1,表示无限期等待。
11 指定 scatter-gather 是否必须返回非空值。该值是true默认值。因此,ReplyRequiredException当底层聚合器在 之后返回空值时,会抛出 a gather-timeout。请注意,如果null有可能,gather-timeout应指定 以避免无限期等待。
12 <recipient-list-router>选项。可选的。scatter-channel与属性互斥。
13 <aggregator>选项。必需的。

错误处理

由于 Scatter-Gather 是一个多请求-回复组件,因此错误处理具有一些额外的复杂性。在某些情况下,如果ReleaseStrategy允许进程以比请求更少的回复完成,最好只捕获并忽略下游异常。在其他情况下,当发生错误时,应考虑从子流返回诸如“补偿消息”之类的东西。

每个异步子流都应该配置一个errorChannel标头,以便从MessagePublishingErrorHandler. errorChannel否则,将使用通用错误处理逻辑将错误发送到全局。有关异步错误处理的更多信息,请参阅错误处理

同步流可以使用ExpressionEvaluatingRequestHandlerAdvice忽略异常或返回补偿消息。当从其中一个子流向 中抛出异常时ScatterGatherHandler,它只是被重新抛出到上游。这样,所有其他子流程都将毫无用处,并且它们的回复将在ScatterGatherHandler. 有时这可能是一种预期行为,但在大多数情况下,处理特定子流中的错误而不影响所有其他错误和收集器中的预期会更好。

从版本 5.1.3 开始,ScatterGatherHandlererrorChannelName选件一起提供。它填充到errorChannel分散消息的标头中,并在发生异步错误时使用,或者可以在常规同步子流程中用于直接发送错误消息。

下面的示例配置通过返回补偿消息来演示异步错误处理:

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .applySequence(true)
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

为了产生正确的回复,我们必须从 的 复制标题(包括replyChannelerrorChannel)发送给。这样,目标异常将返回给回复消息组完成的收集器。这种异常可以在收集器中被过滤掉,或者在分散收集端点之后以其他方式在下游处理。failedMessageMessagingExceptionscatterGatherErrorChannelMessagePublishingErrorHandlerScatterGatherHandlerpayloadMessageGroupProcessor

在将分散结果发送到收集器之前,ScatterGatherHandler恢复请求消息头,包括回复和错误通道(如果有)。这样AggregatingMessageHandler,即使在分散的接收者子流中应用了异步切换,来自 的错误也会传播到调用者。对于成功的操作,必须将 agatherResultChanneloriginalReplyChannelheadersoriginalErrorChannel传输回来自分散接收者子流的回复。在这种情况下,gatherTimeout必须为ScatterGatherHandler. 否则,默认情况下,它将永远被阻止等待收集者的回复。

线程屏障

有时,我们需要暂停消息流线程,直到发生其他一些异步事件。例如,考虑一个向 RabbitMQ 发布消息的 HTTP 请求。我们可能希望在 RabbitMQ 代理发出已收到消息的确认之前不回复用户。

在 4.2 版本中,Spring Integration<barrier/>为此引入了该组件。底层MessageHandlerBarrierMessageHandler. 此类还实现MessageTriggerAction了 ,其中传递给trigger()方法的消息释放方法中的相应线程handleRequestMessage()(如果存在)。

CorrelationStrategy挂起的线程和触发线程通过在消息上调用 a 来关联。当有消息发送到 时input-channel,线程会暂停最多requestTimeout毫秒,等待相应的触发消息。默认关联策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID标头。当触发消息以相同的相关性到达时,线程被释放。发送到output-channel后发布的消息是使用MessageGroupProcessor. 默认情况下,消息是Collection<?>两个有效负载中的一个,并且标头使用 a 合并DefaultAggregatingMessageGroupProcessor

如果trigger()首先调用该方法(或在主线程超时之后),则将其挂起直到triggerTimeout等待挂起消息到达。如果您不想挂起触发器线程,请考虑将其移交给 a TaskExecutor,以便其线程改为挂起。
在 5.4 之前的版本中,请求和触发消息只有一个timeout选项,但在某些用例中,最好为这些操作设置不同的超时时间。因此requestTimeouttriggerTimeout引入了选项。

如果在触发消息到达之前挂起的线程超时,该requires-reply属性确定要采取的操作。默认为false,表示端点返回null,流程结束,线程返回给调用者。当 时trueReplyRequiredException抛出 a。

您可以以编程方式调用该trigger()方法(通过使用名称获取 bean 引用,barrier.handler 其中barrier是屏障端点的 bean 名称)。或者,您可以配置一个<outbound-channel-adapter/>来触发发布。

只有一个线程可以挂起具有相同的相关性。相同的关联可以多次使用,但只能同时使用一次。如果第二个线程到达时具有相同的相关性,则会引发异常。

以下示例显示了如何使用自定义标头进行关联:

java
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger(message);
}
XML
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

根据哪个消息首先到达,发送消息in的线程或发送消息的线程release等待最多十秒钟,直到另一条消息到达。当消息发布时,out通道会发送一条消息,该消息结合了调用自定义MessageGroupProcessorbean 的结果,名为myOutputProcessor. 如果主线程超时,触发延迟到达,可以配置发送延迟触发的丢弃通道。

有关此组件的示例,请参阅屏障示例应用程序


1. see XML Configuration