本节涵盖 Spring Integration 中核心消息传递 API 的所有方面。它涵盖了消息、消息通道和消息端点。它还涵盖了许多企业集成模式,例如过滤器、路由器、转换器、服务激活器、拆分器和聚合器。

本节还包含有关系统管理的材料,包括控制总线和消息历史支持。

消息渠道

消息渠道

虽然它Message在封装数据方面起着至关重要的作用,但它MessageChannel使消息生产者与消息消费者分离。

消息通道接口

Spring Integration 的顶层MessageChannel接口定义如下:

public interface MessageChannel {

    boolean send(Message message);

    boolean send(Message message, long timeout);
}

发送消息时,返回值为true消息是否发送成功。如果发送调用超时或被中断,则返回false

PollableChannel

由于消息通道可能会或可能不会缓冲消息(如Spring Integration Overview中所讨论的),因此两个子接口定义了缓冲(可轮询)和非缓冲(可订阅)通道行为。以下清单显示了PollableChannel接口的定义:

public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);

}

与 send 方法一样,接收消息时,在超时或中断的情况下返回值为 null。

SubscribableChannel

SubscribableChannel基本接口由将消息直接发送到其订阅MessageHandler实例的通道实现。因此,它们不提供轮询的接收方法。相反,他们定义了管理这些订阅者的方法。以下清单显示了SubscribableChannel接口的定义:

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

消息通道实现

Spring Integration 提供了几种不同的消息通道实现。以下部分简要介绍了每一部分。

PublishSubscribeChannel

该实现将发送给它的PublishSubscribeChannel任何内容广播到其所有订阅的处理程序。Message这最常用于发送事件消息,其主要作用是通知(与通常旨在由单个处理程序处理的文档消息相反)。请注意,PublishSubscribeChannel它仅用于发送。由于它在调用其方法时直接向其订阅者广播send(Message),因此消费者无法轮询消息(它没有实现PollableChannel,因此没有receive()方法)。相反,任何订阅者本身必须是 a ,然后依次调用MessageHandler订阅者的方法。handleMessage(Message)

在 3.0 版之前,在没有返回订阅者send的 a 上调​​用该方法。当与 a 一起使用时,会抛出 a。从版本 3.0 开始,行为发生了变化,如果至少存在最少的订阅者(并成功处理消息),则始终认为 a 是成功的。可以通过设置属性来修改此行为,该属性默认为.PublishSubscribeChannelfalseMessagingTemplateMessageDeliveryExceptionsendminSubscribers0

如果您使用 a TaskExecutor,则仅使用正确数量的订阅者来进行此确定,因为消息的实际处理是异步执行的。
QueueChannel

QueueChannel实现包装了一个队列。与 不同PublishSubscribeChannelQueueChannel具有点对点语义。换句话说,即使通道有多个消费者,也只有其中一个应该接收Message发送到该通道的任何内容。它提供了一个默认的无参数构造函数(提供了一个基本上无限的容量Integer.MAX_VALUE)以及一个接受队列容量的构造函数,如下面的清单所示:

public QueueChannel(int capacity)

未达到其容量限制的通道将消息存储在其内部队列中,并且该send(Message<?>)方法立即返回,即使没有接收者准备好处理该消息。如果队列已满,发送方将阻塞,直到队列中有可用空间。或者,如果您使用具有附加超时参数的发送方法,则队列将阻塞,直到任一房间可用或超时期限结束,以先发生者为准。同样,一个receive()如果队列中有消息可用,则调用立即返回,但是,如果队列为空,则接收调用可能会阻塞,直到消息可用或超时(如果提供)过去。在任何一种情况下,都可以通过传递超时值 0 来强制立即返回,而不管队列的状态如何。但是请注意,对版本send()receive()不带timeout参数块的调用会无限期地调用。

PriorityChannel

尽管QueueChannel强制执行先进先出 (FIFO) 排序,但PriorityChannel它是一种替代实现,它允许基于优先级在通道内对消息进行排序。默认情况下,优先级由priority每条消息中的标头确定。Comparator<Message<?>>但是,对于自定义优先级确定逻辑,可以向PriorityChannel构造函数提供类型比较器。

RendezvousChannel

RendezvousChannel启用了“直接切换”方案,其中发送方阻塞,直到另一方调用通道的receive()方法。对方阻塞,直到发送者发送消息。在内部,这个实现与 非常相似QueueChannel,除了它使用一个SynchronousQueue(一个零容量的实现BlockingQueue)。这适用于发送方和接收方在不同线程中操作的情况,但不适合将消息异步丢弃到队列中。换句话说,使用 a RendezvousChannel,发送者知道某个接收者已经接受了消息,而使用 a QueueChannel,消息将被存储到内部队列中并且可能永远不会收到。

请记住,所有这些基于队列的通道默认情况下仅将消息存储在内存中。当需要持久性时,您可以在“队列”元素中提供“消息存储”属性以引用持久性MessageStore实现,或者您可以将本地通道替换为由持久性代理支持的通道,例如 JMS 支持的通道或通道适配器。后一个选项允许您利用任何 JMS 提供程序的实现来实现消息持久性,如JMS 支持中所述。但是,当不需要在队列中缓冲时,最简单的方法是依赖DirectChannel,将在下一节中讨论。

RendezvousChannel对于实现请求-回复操作也很有用。发件人可以创建一个临时的匿名实例RendezvousChannel,然后在构建Message. 发送后Message,发送者可以立即调用receive(可选地提供超时值)以便在等待回复时阻塞Message。这与许多 Spring Integration 的请求-回复组件内部使用的实现非常相似。

DirectChannel

具有DirectChannel点对点语义,但在其他方面更类似于PublishSubscribeChannel前面描述的任何基于队列的通道实现。它实现了SubscribableChannel接口而不是PollableChannel接口,因此它直接将消息分派给订阅者。然而,作为一个点对点通道,它不同于PublishSubscribeChannel将每个通道发送Message给单个订阅的MessageHandler.

除了是最简单的点对点通道选项之外,它最重要的特性之一是它允许单个线程在通道的“两侧”执行操作。例如,如果处理程序订阅了 a DirectChannel,那么在方法调用返回之前,将 a 发送到该通道会直接在发送者的线程中Message触发该处理程序的方法的调用。handleMessage(Message)send()

提供具有此行为的通道实现的主要动机是支持必须跨越通道的事务,同时仍受益于通道提供的抽象和松散耦合。如果在事务范围内调用发送调用,则处理程序调用的结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)中发挥作用。

由于这DirectChannel是最简单的选项,并且不会增加调度和管理轮询器线程所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。总体思路是为应用程序定义通道,考虑哪些通道需要提供缓冲或限制输入,并将这些通道修改为基于队列的PollableChannels。同样,如果一个频道需要广播消息,它不应该是 aDirectChannel而应该是 a PublishSubscribeChannel。稍后,我们将展示如何配置这些通道中的每一个。

内部DirectChannel委托给消息调度程序以调用其订阅的消息处理程序,并且该调度程序可以具有由load-balancerload-balancer-ref属性公开的负载平衡策略(互斥)。当多个消息处理程序订阅同一个通道时,消息调度程序使用负载平衡策略来帮助确定消息在消息处理程序之间的分布方式。为方便起见,该load-balancer属性公开了指向预先存在的LoadBalancingStrategy. A round-robin(轮换处理程序之间的负载平衡)和none(对于想要明确禁用负载平衡的情况)是唯一可用的值。在未来的版本中可能会添加其他策略实现。但是,从 3.0 版开始,您可以提供自己的实现LoadBalancingStrategy并使用load-balancer-ref属性注入它,该属性应指向实现 的 bean LoadBalancingStrategy,如以下示例所示:

AFixedSubscriberChannelSubscribableChannel只支持一个MessageHandler不能取消订阅的订阅者。当不涉及其他订阅者并且不需要通道拦截器时,这对于高吞吐量性能用例很有用。

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

注意load-balancerload-balancer-ref属性是互斥的。

负载平衡还可以与布尔failover属性结合使用。如果该failover值为 true(默认值),则调度程序会在前面的处理程序抛出异常时回退到任何后续处理程序(根据需要)。顺序由在处理程序本身上定义的可选顺序值确定,如果不存在这样的值,则由处理程序订阅的顺序确定。

如果某种情况要求调度程序总是尝试调用第一个处理程序,然后每次发生错误时以相同的固定顺序回退,则不应提供负载平衡策略。换句话说,failover即使没有启用负载平衡,调度程序仍然支持布尔属性。然而,如果没有负载平衡,处理程序的调用总是根据它们的顺序从第一个开始。例如,当对初级、二级、三级等有明确的定义时,这种方法效果很好。使用命名空间支持时,order任何端点上的属性都会确定顺序。

请记住,负载平衡failover仅适用于通道具有多个订阅消息处理程序的情况。使用命名空间支持时,这意味着多个端点共享input-channel属性中定义的相同通道引用。

从版本 5.2 开始,当failover为 true 时,当前处理程序的失败以及失败的消息将记录在debuginfo分别配置下。

ExecutorChannel

ExecutorChannel是一个点对点通道,支持与DirectChannel(负载平衡策略和failover布尔属性)相同的调度程序配置。这两种调度通道类型之间的主要区别在于ExecutorChannel委托实例TaskExecutor执行调度。这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会发生在发送方的线程中。因此,它不支持跨越发送方和接收处理程序的事务。

发件人有时可以阻止。例如,当使用TaskExecutor带有限制客户端的拒绝策略(例如ThreadPoolExecutor.CallerRunsPolicy)时,发送者的线程可以在线程池达到其最大容量并且执行者的工作队列已满的任何时候执行该方法。由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行交易。
FluxMessageChannel

FluxMessageChannel是一种将消息发送到内部以供下游反应订阅者按需消费的org.reactivestreams.Publisher实现。此通道实现既不是 a也不是 a ,因此只有实例可用于从该通道消费,以尊重反应流的背压特性。另一方面,实现 a的合约允许从响应式源发布者接收事件,将响应式流桥接到集成流中。为了实现整个集成流的完全反应行为,必须在流中的所有端点之间放置这样的通道。"sinking"reactor.core.publisher.FluxSubscribableChannelPollableChannelorg.reactivestreams.SubscriberFluxMessageChannelReactiveStreamsSubscribableChannelsubscribeTo(Publisher<Message<?>>)

有关与 Reactive Streams 交互的更多信息,请参阅Reactive Streams Support 。

范围频道

Spring Integration 1.0 提供了一个ThreadLocalChannel实现,但在 2.0 中已被删除。现在处理相同需求的更通用方法是向scope通道添加属性。属性的值可以是上下文中可用的范围的名称。例如,在 Web 环境中,某些范围是可用的,并且任何自定义范围实现都可以在上下文中注册。以下示例显示了应用于通道的线程本地范围,包括范围本身的注册:

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

上例中定义的通道也在内部委托给了一个队列,但是通道绑定到当前线程,所以队列的内容也是类似绑定的。这样,发送到通道的线程稍后可以接收这些相同的消息,但没有其他线程能够访问它们。虽然很少需要线程范围的通道,但它们在DirectChannel使用实例来强制执行单个操作线程但任何回复消息都应发送到“终端”通道的情况下很有用。如果该终端通道是线程范围的,则原始发送线程可以从终端通道收集其回复。

现在,由于可以对任何通道进行作用域,因此除了 thread-Local 之外,您还可以定义自己的作用域。

通道拦截器

消息传递体系结构的优点之一是能够提供常见行为并以非侵入方式捕获有关通过系统的消息的有意义信息。由于Message实例是从MessageChannel实例发送和接收的,因此这些通道提供了拦截发送和接收操作的机会。ChannelInterceptor如下清单所示的策略接口为这些操作中的每一个提供了方法:

public interface ChannelInterceptor {

    Message<?> preSend(Message<?> message, MessageChannel channel);

    void postSend(Message<?> message, MessageChannel channel, boolean sent);

    void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

    boolean preReceive(MessageChannel channel);

    Message<?> postReceive(Message<?> message, MessageChannel channel);

    void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}

实现接口后,向通道注册拦截器只需进行以下调用:

channel.addInterceptor(someChannelInterceptor);

返回Message实例的方法可用于转换Message或可以返回 'null' 以防止进一步处理(当然,任何方法都可以抛出 a RuntimeException)。此外,该preReceive方法可以返回false以阻止接收操作继续进行。

请记住,receive()呼叫仅与PollableChannels. 事实上,SubscribableChannel接口甚至没有定义receive()方法。这样做的原因是,当 aMessage被发送到 aSubscribableChannel时,它会直接发送给零个或多个订阅者,具体取决于通道的类型(例如,aPublishSubscribeChannel发送给它的所有订阅者)。因此,只有当拦截器应用于 a 时,才会调用 、 和 拦截preReceive(…​)postReceive(…​)方法。 afterReceiveCompletion(…​)PollableChannel

Spring Integration 还提供了Wire Tap模式的实现。它是一个简单的拦截器,可以在Message不改变现有流的情况下将其发送到另一个通道。它对于调试和监控非常有用。Wire Tap中显示了一个示例。

因为很少需要实现所有的拦截器方法,所以接口提供了无操作方法(方法返回void方法没有代码,-返回方法按Message原样返回Messageboolean方法返回true)。

拦截器方法的调用顺序取决于通道的类型。如前所述,基于队列的通道是唯一首先拦截接收方法的通道。此外,发送和接收拦截之间的关系取决于单独的发送者和接收者线程的时间。例如,如果接收者在等待消息时已经被阻塞,则顺序可能如下:preSend, preReceive, postReceive, postSend。但是,如果接收者在发送者在通道上放置消息并且已经返回之后进行轮询,则顺序如下:preSend, postSend(some-time-elapses), preReceive,postReceive. 在这种情况下经过的时间取决于许多因素,因此通常是不可预测的(事实上,接收可能永远不会发生)。队列的类型也起作用(例如,集合点与优先级)。简而言之,除了preSend前面postSendpreReceive前面的事实之外,您不能依赖顺序postReceive

从 Spring Framework 4.1 和 Spring Integration 4.1 开始,ChannelInterceptor提供了新方法:afterSendCompletion()afterReceiveCompletion(). 无论引发任何异常,它们都会在调用后send()' and 'receive()调用,这允许资源清理。请注意,通道ChannelInterceptor以与初始preSend()preReceive()调用相反的顺序调用列表中的这些方法。

从 5.1 版开始,全局通道拦截器现在适用于动态注册的通道 - 例如通过使用beanFactory.initializeBean()IntegrationFlowContext使用 Java DSL 初始化的 bean。以前,在刷新应用程序上下文后创建 bean 时不应用拦截器。

此外,从 5.1 版本开始,ChannelInterceptor.postReceive()不再在没有收到消息时调用;不再需要检查null Message<?>. 以前,该方法被调用。如果您有一个依赖于先前行为的拦截器,请afterReceiveCompleted()改为实现,因为该方法被调用,无论是否接收到消息。

从版本 5.2 开始,ChannelInterceptorAware不推荐使用InterceptableChannelSpring Messaging 模块,现在它扩展了该模块以实现向后兼容性。

MessagingTemplate

当引入端点及其各种配置选项时,Spring Integration 为消息传递组件提供了基础,支持从消息传递系统对应用程序代码进行非侵入式调用。但是,有时需要从您的应用程序代码中调用消息传递系统。为了方便实现此类用例,Spring Integration 提供了MessagingTemplate支持跨消息通道的各种操作,包括请求和回复场景。例如,可以发送请求并等待回复,如下所示:

MessagingTemplate template = new MessagingTemplate();

Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));

在前面的示例中,模板将在内部创建一个临时匿名通道。'sendTimeout' 和 'receiveTimeout' 属性也可以在模板上设置,并且还支持其他交换类型。以下清单显示了此类方法的签名:

public boolean send(final MessageChannel channel, final Message<?> message) { ...
}

public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}

public Message<?> receive(final PollableChannel<?> channel) { ...
}
Message输入. _ _GatewayProxyFactoryBean

配置消息通道

要创建消息通道实例,您可以使用<channel/>xml 元素或DirectChannelJava 配置实例,如下所示:

java
@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
XML
<int:channel id="exampleChannel"/>

当您使用<channel/>没有任何子元素的元素时,它会创建一个DirectChannel实例 (a SubscribableChannel)。

要创建发布-订阅通道,请使用<publish-subscribe-channel/>元素(PublishSubscribeChannelJava 中的 ),如下所示:

java
@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
XML
<int:publish-subscribe-channel id="exampleChannel"/>

您也可以提供各种<queue/>子元素来创建任何可轮询通道类型(如消息通道实现中所述)。以下部分显示了每种通道类型的示例。

DirectChannel配置

如前所述,DirectChannel是默认类型。以下清单显示了定义谁:

java
@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
XML
<int:channel id="directChannel"/>

默认通道具有循环负载平衡器,并且还启用了故障转移(DirectChannel有关详细信息,请参阅)。要禁用其中一项或两项,请添加一个<dispatcher/>子元素( 的LoadBalancingStrategy构造函数DirectChannel)并按如下方式配置属性:

java
@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
XML
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>
数据类型通道配置

有时,消费者只能处理特定类型的有效负载,这迫使您确保输入消息的有效负载类型。首先想到的可能是使用消息过滤器。但是,消息过滤器所能做的就是过滤掉不符合消费者要求的消息。另一种方法是使用基于内容的路由器并将具有不兼容数据类型的消息路由到特定的转换器,以强制转换和转换为所需的数据类型。这会起作用,但完成同样事情的更简单方法是应用数据类型通道模式。您可以为每个特定的有效负载数据类型使用单独的数据类型通道。

要创建仅接受包含特定负载类型的消息的数据类型通道,请在通道元素的datatype属性中提供数据类型的完全限定类名称,如以下示例所示:

java
@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
XML
<int:channel id="numberChannel" datatype="java.lang.Number"/>

请注意,对于可分配给通道数据类型的任何类型,类型检查都会通过。换句话说,numberChannel前面示例中的 将接受有效负载为java.lang.Integer或的消息java.lang.Double。可以以逗号分隔列表的形式提供多种类型,如以下示例所示:

java
@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
XML
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

因此,前面示例中的“numberChannel”只接受数据类型为 的消息java.lang.Number。但是,如果消息的有效负载不是所需的类型,会发生什么?这取决于您是否定义了一个名为的 bean integrationConversionService,它是 Spring 的Conversion Service的一个实例。如果不是,那么 anException将立即被抛出。但是,如果您定义了一个integrationConversionServicebean,它会被用来尝试将消息的有效负载转换为可接受的类型。

您甚至可以注册自定义转换器。例如,假设您将带有String有效负载的消息发送到我们上面配置的“numberChannel”。您可以按如下方式处理该消息:

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

通常这将是一个完全合法的操作。但是,由于我们使用 Datatype Channel,因此此类操作的结果会产生类似于以下的异常:

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

发生异常是因为我们要求有效负载类型为 a Number,但我们发送了String. 所以我们需要一些东西来将 a 转换String为 a Number。为此,我们可以实现类似于以下示例的转换器:

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

然后我们可以将它注册为集成转换服务的转换器,如以下示例所示:

java
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
XML
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

或者在StringToIntegerConverter标有@Component自动扫描注释的类上。

解析“converter”元素时,integrationConversionService如果尚未定义 bean,它将创建 bean。使用该转换器,send操作现在将成功,因为数据类型通道使用该转换器将String有效负载转换为Integer.

有关有效负载类型转换的更多信息,请参阅有效负载类型转换

从 4.0 版开始, 由integrationConversionService调用DefaultDatatypeChannelMessageConverter,它在应用程序上下文中查找转换服务。要使用不同的转换技术,您可以message-converter在通道上指定属性。这必须是对MessageConverter实现的引用。仅使用fromMessage方法。它为转换器提供了对消息标头的访问(以防转换可能需要来自标头的信息,例如content-type)。该方法只能返回转换后的有效负载或完整Message对象。如果是后者,转换器必须小心复制入站消息中的所有标头。

或者,您可以声明一个ID 为<bean/>的类型,并且所有通道都使用该转换器。MessageConverterdatatypeChannelMessageConverterdatatype

QueueChannel配置

要创建QueueChannel,请使用<queue/>子元素。您可以按如下方式指定通道的容量:

java
@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
XML
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>
如果您没有为此<queue/>子元素的“容量”属性提供值,则生成的队列是无界的。为避免内存不足等问题,我们强烈建议您为有界队列设置显式值。
持久QueueChannel配置

由于 aQueueChannel提供了缓冲消息的能力,但默认情况下仅在内存中进行,因此它还引入了在系统故障时消息可能丢失的可能性。为了减轻这种风险,QueueChannel可以通过MessageGroupStore策略接口的持久实现来支持。有关MessageGroupStoreand的更多详细信息MessageStore,请参阅消息存储

使用capacity属性时不允许使用该message-store属性。

当 aQueueChannel接收到 aMessage时,它会将消息添加到消息存储中。当Message从 a 轮询 aQueueChannel时,它会从消息存储中删除。

默认情况下,aQueueChannel将其消息存储在内存队列中,这可能导致前面提到的消息丢失情况。但是,Spring Integration 提供了持久存储,例如JdbcChannelMessageStore.

QueueChannel您可以通过添加属性为任何配置消息存储message-store,如以下示例所示:

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(有关 Java/Kotlin 配置选项,请参阅下面的示例。)

Spring Integration JDBC 模块还为许多流行的数据库提供了模式数据定义语言 (DDL)。这些模式位于该模块 ( spring-integration-jdbc) 的 org.springframework.integration.jdbc.store.channel 包中。

一个重要的特性是,对于任何事务性持久存储(例如JdbcChannelMessageStore),只要轮询器配置了事务,从存储中删除的消息只有在事务成功完成时才能被永久删除。否则事务回滚,并且Message不会丢失。

随着越来越多的与“NoSQL”数据存储相关的 Spring 项目开始为这些存储提供底层支持,消息存储的许多其他实现都可用。MessageGroupStore如果找不到满足您特定需求的接口,您也可以提供自己的接口实现。

从 4.0 版开始,如果可能,我们建议将QueueChannel实例配置为使用. ChannelMessageStore与一般消息存储相比,这些通常针对此用途进行了优化。如果ChannelMessageStore是 a ChannelPriorityMessageStore,则按照优先级顺序在 FIFO 中接收消息。优先级的概念由消息存储实现确定。例如,以下示例显示了MongoDB Channel Message Store的 Java 配置:

java
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
Java DSL
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlows.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
科特林 DSL
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }
注意MessageGroupQueue上课。那是BlockingQueue使用MessageGroupStore操作的实现。

QueueChannel自定义环境的另一个选项由子元素或其特定构造函数的ref属性提供。<int:queue>此属性提供对任何java.util.Queue实现的引用。例如,一个 Hazelcast 分布式IQueue可以配置如下:

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel配置

要创建PublishSubscribeChannel,请使用 <publish-subscribe-channel/> 元素。使用该元素时,还可以指定task-executor用于发布消息(如果没有指定,则在发送者的线程中发布),如下:

java
@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
XML
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

如果您在 a 的下游提供重新排序器或聚合器PublishSubscribeChannel,则可以将通道上的“应用序列”属性设置为true。这样做表示通道应该在传递消息之前设置sequence-sizesequence-number消息头以及相关 ID。例如,如果有五个订阅者,则sequence-size将设置为5,并且消息的sequence-number标头值范围从15

除了Executor,您还可以配置ErrorHandler. 默认情况下,PublishSubscribeChannel使用MessagePublishingErrorHandler实现将错误发送到MessageChannelerrorChannel头或全局errorChannel实例。如果 anExecutor未配置,ErrorHandler则忽略并直接向调用者的线程抛出异常。

如果您提供 aResequencer或a 的Aggregator下游PublishSubscribeChannel,则可以将通道上的 'apply-sequence' 属性设置为true。这样做表示通道应该在传递消息之前设置序列大小和序列号消息头以及相关 ID。例如,如果有五个订阅者,则序列大小将设置为5,并且消息将具有范围从1到的序列号标头值5

以下示例显示如何将apply-sequence标头设置为true

java
@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(false);
    return channel;
}
XML
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
apply-sequence值是false默认值,以便发布-订阅通道可以将完全相同的消息实例发送到多个出站通道。由于 Spring Integration 强制有效负载和标头引用的不变性,当标志设置为true时,通道创建Message具有相同有效负载引用但不同标头值的新实例。

从版本 5.4.3 开始,PublishSubscribeChannel还可以使用它的requireSubscribers选项进行配置,BroadcastingDispatcher以指示该频道在没有订阅者时不会静默忽略消息。当没有订阅者并且此选项设置为 时,会抛出带有MessageDispatchingException消息的A。Dispatcher has no subscriberstrue

ExecutorChannel

要创建ExecutorChannel,请添加具有属性的<dispatcher>子元素。task-executor属性的值可以引用TaskExecutor上下文中的任何值。例如,这样做可以配置线程池以将消息分派到订阅的处理程序。如前所述,这样做会破坏发送方和接收方之间的单线程执行上下文,因此任何活动的事务上下文都不会被处理程序的调用共享(即处理程序可能会抛出一个Exception,但send调用已经成功返回) . 以下示例显示了如何使用该元素并在属性dispatcher中指定执行器:task-executor

java
@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
XML
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

load-balancer和选项在failover<dispatcher/> 子元素上也可用,如前面在DirectChannelConfiguration中所述。相同的默认值适用。因此,通道具有启用故障转移的循环负载平衡策略,除非为这些属性中的一个或两个提供显式配置,如以下示例所示:

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>
PriorityChannel配置

要创建PriorityChannel,请使用<priority-queue/>子元素,如以下示例所示:

java
@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
XML
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

默认情况下,通道会参考priority消息的标头。但是,您可以改为提供自定义Comparator参考。另外,请注意PriorityChannel(与其他类型一样)确实支持该datatype属性。与 一样QueueChannel,它也支持capacity属性。以下示例演示了所有这些:

java
@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
XML
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

从 4.0 版开始,priority-channel子元素支持该message-store选项(在这种情况comparatorcapacity是不允许的)。消息存储必须是PriorityCapableChannelMessageStore. PriorityCapableChannelMessageStore目前为RedisJDBC和提供了的实现MongoDB。有关详细信息,请参阅QueueChannel配置消息存储您可以在Backing Message Channels中找到示例配置。

RendezvousChannel配置

ARendezvousChannel在队列子元素是 a 时创建<rendezvous-queue>。它不为前面描述的那些提供任何额外的配置选项,并且它的队列不接受任何容量值,因为它是一个零容量的直接切换队列。以下示例显示了如何声明 a RendezvousChannel

java
@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
XML
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>
范围通道配置

任何通道都可以配置一个scope属性,如以下示例所示:

<int:channel id="threadLocalChannel" scope="thread"/>
通道拦截器配置

消息通道也可能有拦截器,如Channel Interceptors中所述。<interceptors/>子元素可以添加到一个(或更具体的<channel/>元素类型)。您可以提供该ref属性来引用任何实现该ChannelInterceptor接口的 Spring 管理的对象,如以下示例所示:

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

一般来说,我们建议在单独的位置定义拦截器实现,因为它们通常提供可以跨多个通道重用的通用行为。

全局通道拦截器配置

通道拦截器提供了一种简洁明了的方式来应用每个通道的横切行为。如果应该在多个通道上应用相同的行为,那么为每个通道配置相同的拦截器集将不是最有效的方法。为了避免重复配置,同时也使拦截器可以应用于多个通道,Spring Integration 提供了全局拦截器。考虑以下一对示例:

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

每个元素都允许您定义一个全局拦截器,该拦截器应用于与该属性<channel-interceptor/>定义的任何模式匹配的所有通道。pattern在上述情况下,全局拦截器应用于“thing1”通道和所有其他以“thing2”或“input”开头的通道,但不适用于以“thing3”开头的通道(从 5.0 版开始)。

将此语法添加到模式中会导致一个可能(尽管可能不太可能)的问题。如果您有一个名为的 bean ,并且您在通道拦截器的模式中!thing1包含了一个模式,则它不再匹配。该模式现在匹配所有未命名的 bean 。在这种情况下,您可以使用 转义模式中的。该模式与名为 的 bean 匹配。 !thing1patternthing1!\\!thing1!thing1

当给定通道上有多个拦截器时,order 属性允许您管理此拦截器的注入位置。例如,通道“inputChannel”可以在本地配置单独的拦截器(见下文),如下例所示:

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

一个合理的问题是“一个全局拦截器是如何相对于本地配置的其他拦截器或通过其他全局拦截器定义注入的?” 当前的实现提供了一种简单的机制来定义拦截器的执行顺序。属性中的正数order确保拦截器在任何现有拦截器之后注入,而负数确保拦截器在现有拦截器之前注入。这意味着,在前面的示例中,全局拦截器是在本地配置的“wire-tap”拦截器之后(因为它order大于)注入的。0如果有另一个匹配的全局拦截器pattern,它的顺序将通过比较两个拦截器的值来确定order属性。order要在现有拦截器之前注入全局拦截器,请为属性使用负值。

请注意,orderpattern属性都是可选的。的默认值为order0,而pattern的默认值为 '*'(匹配所有通道)。
丝锥

如前所述,Spring Integration 提供了一个简单的窃听拦截器。<interceptors/>您可以在元素内的任何通道上配置接线。这样做对于调试特别有用,可以与 Spring Integration 的日志通道适配器结合使用,如下所示:

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter' 还接受 'expression' 属性,以便您可以根据 'payload' 和 'headers' 变量评估 SpEL 表达式。或者,要记录完整的消息结果,请为“log-full-message”属性toString()提供一个值。true默认情况下,false只记录有效负载。将其设置为true启用除有效负载之外的所有标头的日志记录。'expression' 选项提供了最大的灵活性(例如,expression="payload.user.name")。

关于窃听器和其他类似组件的常见误解之一(消息发布配置) 是它们本质上是自动异步的。默认情况下,不会异步调用作为组件的窃听器。相反,Spring Integration 专注于配置异步行为的单一统一方法:消息通道。使消息流的某些部分同步或异步的原因是已在该流中配置的消息通道的类型。这是消息通道抽象的主要好处之一。从框架诞生之初,我们就一直强调消息通道作为框架一等公民的必要性和价值。它不仅仅是 EIP 模式的内部隐式实现。它作为可配置组件完全公开给最终用户。因此,wire tap 组件只负责执行以下任务:

  • 通过点击通道来拦截消息流(例如,channelA

  • 抓住每条消息

  • 将消息发送到另一个频道(例如,channelB

它本质上是桥接模式的一种变体,但它被封装在通道定义中(因此更容易启用和禁用而不会中断流)。此外,与网桥不同的是,它基本上分叉了另一个消息流。该流程是同步的还是异步的?答案取决于“channelB”的消息通道类型。我们有以下选项:直接通道、可轮询通道和执行器通道。最后两个打破了线程边界,使这些通道上的通信异步,因为从该通道向其订阅的处理程序分派消息发生在与用于将消息发送到该通道的线程不同的线程上。这就是使您的窃听流同步或异步的原因。它与框架内的其他组件(例如消息发布者)保持一致,并且通过让您不必提前担心(除了编写线程安全代码之外)是否应该将特定代码实现为,从而增加了一致性和简单性同步或异步。两条代码(例如,组件 A 和组件 B)在消息通道上的实际连接是使它们的协作同步或异步的原因。您甚至可能希望将来从同步更改为异步,而消息通道可让您快速完成此操作,而无需接触代码。两条代码(例如,组件 A 和组件 B)在消息通道上的实际连接是使它们的协作同步或异步的原因。您甚至可能希望将来从同步更改为异步,而消息通道可让您快速完成此操作,而无需接触代码。两条代码(例如,组件 A 和组件 B)在消息通道上的实际连接是使它们的协作同步或异步的原因。您甚至可能希望将来从同步更改为异步,而消息通道可让您快速完成此操作,而无需接触代码。

关于窃听的最后一点是,尽管上面提供了默认情况下不异步的基本原理,但您应该记住,通常希望尽快传递消息。因此,使用异步通道选项作为接线的出站通道是很常见的。但是,默认情况下我们不强制执行异步行为。如果我们这样做,有许多用例会中断,包括您可能不想打破事务边界。也许您将窃听模式用于审计目的,并且您确实希望在原始事务中发送审计消息。例如,您可以将接线连接到 JMS 出站通道适配器。这样,您将获得两全其美:

从版本 4.0 开始,当拦截器(例如WireTapclass)引用通道时,避免循环引用很重要。您需要从当前拦截器拦截的通道中排除此类通道。这可以通过适当的模式或以编程方式完成。如果您有一个ChannelInterceptor引用 a的自定义channel,请考虑实施VetoCapableInterceptor. 这样,框架会根据提供的模式询问拦截器是否可以拦截每个候选通道。您还可以在拦截器方法中添加运行时保护,以确保通道不是被拦截器引用的通道。使用WireTap这两种技术。

从版本 4.3 开始,WireTap有额外的构造函数,channelName而不是 MessageChannel实例。这对于 Java 配置和使用通道自动创建逻辑时很方便。目标bean在与拦截器的第一次交互时MessageChannel从稍后提供的解析。channelName

通道解析需要一个BeanFactory,所以接线实例必须是 Spring 管理的 bean。

这种后期绑定方法还允许使用 Java DSL 配置简化典型的窃听模式,如以下示例所示:

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}
有条件的丝锥

可以使用selectororselector-expression属性来设置分接头。selector引用一个bean ,MessageSelector它可以在运行时确定消息是否应该发送到 tap 通道。类似地,selector-expression是执行相同目的的布尔 SpEL 表达式:如果表达式的计算结果为true,则将消息发送到点击通道。

全局接线配置

可以将全局线接头配置为全局通道拦截器配置的特例。为此,请配置顶级wire-tap元素。现在,除了正常的wire-tap命名空间支持之外,还支持patternorder属性,并且它们的工作方式与它们对channel-interceptor. 以下示例显示了如何配置全局接线:

java
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
XML
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局接线提供了一种在外部配置单通道接线的便捷方式,无需修改现有通道配置。为此,请将pattern属性设置为目标通道名称。例如,您可以使用此技术配置测试用例以验证通道上的消息。

特殊频道

默认情况下,在应用程序上下文中定义了两个特殊通道:errorChannelnullChannel. 'nullChannel'( 的一个实例NullChannel)的作用类似于/dev/null,在级别记录发送给它的任何消息DEBUG并立即返回。对已发送消息的有效负载进行特殊处理org.reactivestreams.Publisher:立即在该通道中订阅它,以启动反应流处理,尽管数据被丢弃。反应式流处理引发的错误(请参阅 参考资料Subscriber.onError(Throwable))记录在警告级别下,以便进行可能的调查。如果需要对此类错误进行任何处理,可以将ReactiveRequestHandlerAdviceMono.doOnError()定义应用到消息处理程序,从而产生Mono对此的回复nullChannel. 每当您遇到您不关心的回复的通道解析错误时,您可以将受影响组件的output-channel属性设置为“nullChannel”(名称“nullChannel”在应用程序上下文中保留)。

'errorChannel' 在内部用于发送错误消息,并且可以用自定义配置覆盖。这在错误处理中有更详细的讨论。

有关消息通道和拦截器的更多信息,另请参阅Java DSL 章节中的消息通道。

轮询器

本节介绍 Spring Integration 中轮询的工作原理。

轮询消费者

当消息端点(通道适配器)连接到通道并被实例化时,它们会产生以下实例之一:

实际实现取决于这些端点连接的通道类型。连接到实现org.springframework.messaging.SubscribableChannel接口的通道的通道适配器会生成EventDrivenConsumer. 另一方面,连接到实现org.springframework.messaging.PollableChannel接口的通道(例如 a QueueChannel)的通道适配器会生成 的实例PollingConsumer

轮询消费者让 Spring Integration 组件主动轮询消息,而不是以事件驱动的方式处理消息。

它们代表了许多消息传递场景中的关键横切关注点。在 Spring Integration 中,轮询消费者基于同名模式,该模式在 Gregor Hohpe 和 Bobby Woolf所著的Enterprise Integration Patterns一书中进行了描述。您可以在本书的网站上找到该模式的描述。

可轮询消息源

Spring Integration 提供了轮询消费者模式的第二种变体。当使用入站通道适配器时,这些适配器通常由SourcePollingChannelAdapter. 例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器中描述的适配器配置有轮询器以定期检索消息。因此,当组件配置有轮询器时,生成的实例属于以下类型之一:

这意味着轮询器用于入站和出站消息传递方案。以下是使用轮询器的一些用例:

  • 轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务

  • 轮询内部(可轮询)消息通道

  • 轮询内部服务(例如在 Java 类上重复执行方法)

AOP 建议类可以应用于轮询器,advice-chain例如用于启动事务的事务建议。从 4.1 版开始,PollSkipAdvice提供了 a。轮询器使用触发器来确定下一次轮询的时间。可PollSkipAdvice用于抑制(跳过)轮询,可能是因为某些下游条件会阻止消息被处理。要使用此建议,您必须为其提供一个PollSkipStrategy. 从版本 4.2.5 开始,SimplePollSkipStrategy提供了 a。要使用它,您可以将实例作为 bean 添加到应用程序上下文中,将其注入到 aPollSkipAdvice中,然后将其添加到轮询器的建议链中。要跳过轮询,请调用skipPolls()。要恢复轮询,请调用reset()。4.2 版在这方面增加了更多的灵活性。看消息源的条件轮询器

本章仅对轮询消费者以及它们如何适应消息通道(请参阅消息通道)和通道适配器(请参阅通道适配器)的概念进行高级概述。有关一般消息端点和特别是轮询消费者的更多信息,请参阅消息端点

延迟确认可轮询消息源

从版本 5.0.1 开始,某些模块提供MessageSource了支持延迟确认直到下游流程完成(或将消息移交给另一个线程)的实现。目前仅限于AmqpMessageSourceKafkaMessageSource

使用这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK标头(请参阅MessageHeaderAccessorAPI)被添加到消息中。与可轮询消息源一起使用时,标头的值是 的实例AcknowledgmentCallback,如以下示例所示:

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

并非所有消息源(例如 a KafkaMessageSource)都支持该REJECT状态。它的处理方式与 相同ACCEPT

应用程序可以随时确认消息,如以下示例所示:

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果MessageSource连接到 a SourcePollingChannelAdapter,当轮询线程在下游流程完成后返回到适配器时,适配器会检查确认是否已经被确认,如果没有,则将其状态设置为ACCEPT它(或者REJECT如果流程抛出异常) . 状态值在AcknowledgmentCallback.Status枚举中定义。

Spring Integration 提供MessageSourcePollingTemplateMessageSource. 这也负责设置回调何时返回(ACCEPT或抛出异常)REJECT。以下示例显示如何使用 进行轮询:AcknowledgmentCallbackMessageHandlerMessageSourcePollingTemplate

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在这两种情况下(SourcePollingChannelAdapter和),您都可以通过调用回调MessageSourcePollingTemplate来禁用自动确认/确认。noAutoAck()如果您将消息传递给另一个线程并希望稍后确认,您可能会这样做。并非所有实现都支持这一点(例如,Apache Kafka 不支持,因为偏移提交必须在同一个线程上执行)。

消息源的条件轮询器

本节介绍如何使用条件轮询器。

背景

Advice轮询器中的对象advice-chain建议整个轮询任务(消息检索和处理)。这些“环绕建议”方法无法访问任何轮询上下文——只能访问轮询本身。如前所述,这对于诸如使任务具有事务性或由于某些外部条件而跳过轮询之类的要求很好。如果我们希望根据receive轮询部分的结果采取一些行动,或者如果我们想根据条件调整轮询器怎么办?对于这些实例,Spring Integration 提供“智能”轮询。

“智能”投票

5.3 版引入了该ReceiveMessageAdvice接口。(AbstractMessageSourceAdvice已弃用 ,取而代之的是 中的default方法MessageSourceMutator。) 中实现此接口的任何Advice对象advice-chain仅适用于接收操作 -MessageSource.receive()PollableChannel.receive(timeout)。因此它们只能应用于SourcePollingChannelAdapteror PollingConsumer。此类类实现以下方法:

  • beforeReceive(Object source) 此方法在方法之前调用Object.receive()。它允许您检查和重新配置源。返回false取消此轮询(类似于PollSkipAdvice前面提到的)。

  • Message<?> afterReceive(Message<?> result, Object source) 该方法在方法之后调用receive()。同样,您可以重新配置源或采取任何操作(可能取决于结果,null如果源没有创建消息)。您甚至可以返回不同的消息

线程安全

如果通知改变了 ,则不应使用TaskExecutor. 如果一个通知改变了源,这种突变不是线程安全的,并且可能导致意想不到的结果,尤其是对于高频轮询器。如果您需要同时处理轮询结果,请考虑使用下游ExecutorChannel而不是向轮询器添加执行器。

建议链订购

您应该了解在初始化期间如何处理建议链。 Advice未实现ReceiveMessageAdvice的对象将应用于整个轮询过程,并在 any 之前按顺序首先调用ReceiveMessageAdvice。然后围绕源方法ReceiveMessageAdvice按顺序调用对象。receive()例如,如果您有Adviceobjects a, b, c, d、 wherebdare ReceiveMessageAdvice,则这些对象将按以下顺序应用:a, c, b, d。此外,如果源已经是 a Proxy,则在任何现有对象ReceiveMessageAdvice之后调用。Advice如果您想更改订单,您必须自己连接代理。

SimpleActiveIdleReceiveMessageAdvice

(以前SimpleActiveIdleMessageSourceAdvice的 for onlyMessageSource已弃用。)此建议是ReceiveMessageAdvice. 当与 a 一起使用时DynamicPeriodicTrigger,它会根据之前的轮询是否产生消息来调整轮询频率。轮询器还必须具有对相同的引用DynamicPeriodicTrigger

重要:异步切换
SimpleActiveIdleReceiveMessageAdvice根据receive()结果​​修改触发器。这仅在轮询线程上调用通知时才有效。如果轮询器有task-executor. 要在轮询结果后使用异步操作的地方使用此建议,请稍后执行异步切换,可能使用ExecutorChannel.
CompoundTriggerAdvice

此建议允许根据轮询是否返回消息来选择两个触发器之一。考虑一个使用CronTrigger. CronTrigger实例是不可变的,因此一旦构造它们就不能更改。考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但如果没有收到消息,则每分钟轮询一次,当检索到消息时,恢复使用 cron 表达式。

建议(和轮询器)CompoundTrigger为此目的使用 a 。触发器的primary触发器可以是CronTrigger. 当通知检测到没有收到消息时,它会将辅助触发器添加到CompoundTrigger. 当CompoundTrigger实例的nextExecutionTime方法被调用时,它会委托给辅助触发器(如果存在)。否则,它会委托给主触发器。

轮询器还必须具有对相同的引用CompoundTrigger

以下示例显示了每小时 cron 表达式的配置,并回退到每分钟:

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要:异步切换
CompoundTriggerAdvice根据receive()结果​​修改触发器。这仅在轮询线程上调用通知时才有效。如果轮询器有task-executor. 要在轮询结果后使用异步操作的地方使用此建议,请稍后执行异步切换,可能使用ExecutorChannel.
MessageSource-only 建议

有些建议可能仅适用于 ,MessageSource.receive()而对PollableChannel. 为此,仍然存在一个MessageSourceMutator接口( 的扩展)。ReceiveMessageAdvice使用default方法,它完全取代了已经弃用的方法AbstractMessageSourceAdvice,应该在那些只需要MessageSource代理的实现中使用。有关详细信息,请参阅入站通道适配器:轮询多个服务器和目录

通道适配器

通道适配器是一个消息端点,可以将单个发送者或接收者连接到消息通道。Spring Integration 提供了许多适配器来支持各种传输,例如 JMS、文件、HTTP、Web 服务、邮件等。本参考指南的后续章节将讨论每个适配器。但是,本章重点介绍简单但灵活的方法调用通道适配器支持。有入站和出站适配器,每个都可以使用核心命名空间中提供的 XML 元素进行配置。这些提供了一种扩展 Spring Integration 的简单方法,只要您有一个可以作为源或目标调用的方法。

配置入站通道适配器

inbound-channel-adapter元素(Java 配置中的 a )可以调用 Spring 管理的对象上的任何方法,并在将方法的输出转换SourcePollingChannelAdapter为 a 之后将非 null 返回值发送到 a 。当适配器的订阅被激活时,轮询器会尝试从源接收消息。根据提供的配置安排轮询器。要为单个通道适配器配置轮询间隔或 cron 表达式,您可以提供具有调度属性之一的“poller”元素,例如“fixed-rate”或“cron”。以下示例定义了两个实例:MessageChannelMessageTaskSchedulerinbound-channel-adapter

Java DSL
@Bean
public IntegrationFlow source1() {
    return IntegrationFlows.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlows.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
java
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
科特林 DSL
@Bean
fun messageSourceFlow() =
    integrationFlow( { GenericMessage<>(...) },
                    { poller { it.fixedRate(5000) } }) {
        ...
    }
XML
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
如果没有提供轮询器,则必须在上下文中注册一个默认轮询器。有关更多详细信息,请参阅端点命名空间支持
重要:轮询器配置

所有inbound-channel-adapter类型都由 a 支持SourcePollingChannelAdapter,这意味着它们包含一个 poller 配置,该配置根据 Poller 中指定的配置轮询MessageSource(以调用生成成为Message有效负载的值的自定义方法)。以下示例显示了两个轮询器的配置:

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

max-messages-per-poll在第一种配置中,每次轮询调用一次轮询任务,并且在每个任务(轮询)期间,根据属性值调用一次方法(导致生成消息) 。在第二种配置中,每次轮询会调用轮询任务 10 次,或者直到它返回“null”,因此每次轮询可能会产生 10 条消息,而每次轮询以一秒的间隔发生。但是,如果配置类似于以下示例,会发生什么情况:

<int:poller fixed-rate="1000"/>

请注意,没有max-messages-per-poll指定。正如我们稍后介绍的那样,相同的轮询器配置PollingConsumer(例如service-activator,、、filterrouter)将具有默认值-1for max-messages-per-poll,这意味着“除非轮询方法返回 null(可能是因为有)”中没有更多消息,QueueChannel然后休眠一秒钟。

但是,在 中SourcePollingChannelAdapter,它有点不同。的默认值为,除非您明确将其设置max-messages-per-poll1负值(例如-1)。这确保轮询器可以对生命周期事件(例如启动和停止)做出反应,并防止它在自定义方法的实现MessageSource有可能永远不会返回 null 并且恰好是非可中断的。

但是,如果您确定您的方法可以返回 null 并且您需要在每次轮询时轮询尽可能多的可用源,则应显式设置max-messages-per-poll为负值,如以下示例所示:

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

从版本 5.5 开始,0值 formax-messages-per-poll具有特殊含义 -MessageSource.receive()完全跳过调用,这可能被视为暂停此入站通道适配器,直到maxMessagesPerPoll稍后更改为非零值,例如通过控制总线。

另请参阅全局默认轮询器以获取更多信息。

配置出站通道适配器

outbound-channel-adapter元素(@ServiceActivator用于 Java 配置的 a)还可以将 a 连接到MessageChannel任何 POJO 使用者方法,该方法应使用发送到该通道的消息的有效负载来调用。以下示例显示了如何定义出站通道适配器:

Java DSL
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
java
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
科特林 DSL
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
XML
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>

<beans:bean id="target" class="org.MyPojo"/>

如果要适配的频道是 a PollableChannel,则必须提供 poller 子元素( 上的@Poller子注释@ServiceActivator),如以下示例所示:

java
public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
XML
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
    <int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>

<beans:bean id="target" class="org.MyPojo"/>

ref如果 POJO 消费者实现可以在其他<outbound-channel-adapter>定义中重用,则应该使用属性。但是,如果消费者实现仅由 的单个定义引用,则<outbound-channel-adapter>可以将其定义为内部 bean,如以下示例所示:

<int:outbound-channel-adapter channel="channel" method="handle">
    <beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
不允许在同一配置中 同时使用ref属性和内部处理程序定义,因为它会创建模棱两可的条件。<outbound-channel-adapter>这样的配置会导致抛出异常。

任何通道适配器都可以在没有channel引用的情况下创建,在这种情况下,它会隐式创建DirectChannel. 创建的频道名称与or元素的id属性相匹配。因此,如果未提供,则为必填项。<inbound-channel-adapter><outbound-channel-adapter>channelid

通道适配器表达式和脚本

与许多其他 Spring Integration 组件一样,它<inbound-channel-adapter><outbound-channel-adapter>提供对 SpEL 表达式评估的支持。要使用 SpEL,请在 'expression' 属性中提供表达式字符串,而不是提供用于 bean 上的方法调用的 'ref' 和 'method' 属性。计算表达式时,它遵循与方法调用相同的约定,其中:<inbound-channel-adapter>每当计算结果为非空值时,an 的表达式都会生成一条消息,而 an 的表达式<outbound-channel-adapter>必须等效于返回 void方法调用。

从 Spring Integration 3.0 开始,<int:inbound-channel-adapter/>还可以使用 SpEL <expression/>(或什至使用<script/>)子元素来配置 an,当需要比使用简单的 'expression' 属性更复杂时。Resource如果您使用该属性提供脚本作为 a location,您还可以设置refresh-check-delay,这允许定期刷新资源。如果您希望在每次轮询时检查脚本,则需要将此设置与轮询器的触发器协调,如以下示例所示:

<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller max-messages-per-poll="1" fixed-delay="5000"/>
    <script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>

另请参阅使用子元素时的cacheSeconds属性。有关表达式的更多信息,请参阅Spring 表达式语言 (SpEL)。有关脚本,请参阅Groovy 支持脚本支持ReloadableResourceBundleExpressionSource<expression/>

( ) 是一个端点<int:inbound-channel-adapter/>SourcePollingChannelAdapter它通过定期触发轮询一些底层来启动消息流MessageSource。由于在轮询时没有消息对象,因此表达式和脚本无权访问 root Message,因此在大多数其他消息传递 SpEL 表达式中没有可用的有效负载或标头属性。该脚本可以生成并返回Message带有标头和有效负载的完整对象,也可以仅返回一个有效负载,框架将其添加到带有基本标头的消息中。

消息桥

消息传递桥是连接两个消息通道或通道适配器的相对简单的端点。例如,您可能希望将 a 连接PollableChannel到 a SubscribableChannel,以便订阅端点不必担心任何轮询配置。相反,消息传递桥提供轮询配置。

通过在两个通道之间提供中间轮询器,您可以使用消息传递桥来限制入站消息。轮询器的触发器确定消息到达第二个通道的速率,轮询器的maxMessagesPerPoll属性强制限制吞吐量。

消息传递桥的另一个有效用途是连接两个不同的系统。在这种情况下,Spring Integration 的角色仅限于在这些系统之间建立连接并在必要时管理轮询器。在两个系统之间至少有一个转换器可能更常见,以便在它们的格式之间进行转换。在这种情况下,通道可以作为变压器端点的“输入通道”和“输出通道”提供。如果不需要数据格式转换,消息传递桥可能确实足够了。

使用 XML 配置网桥

您可以使用该<bridge>元素来创建两个消息通道或通道适配器之间的消息传递桥。为此,请提供input-channeloutput-channel属性,如以下示例所示:

<int:bridge input-channel="input" output-channel="output"/>

如上所述,消息传递桥的一个常见用例是将 a 连接PollableChannel到 a SubscribableChannel。执行此角色时,消息传递桥也可以用作节流器:

<int:bridge input-channel="pollable" output-channel="subscribable">
     <int:poller max-messages-per-poll="10" fixed-rate="5000"/>
 </int:bridge>

您可以使用类似的机制来连接通道适配器。以下示例显示了Spring Integration 命名空间中的stdinstdout适配器之间的简单“回声” stream

<int-stream:stdin-channel-adapter id="stdin"/>

 <int-stream:stdout-channel-adapter id="stdout"/>

 <int:bridge id="echo" input-channel="stdin" output-channel="stdout"/>

类似的配置适用于其他(可能更有用的)通道适配器桥,例如文件到 JMS 或邮件到文件。即将到来的章节将介绍各种通道适配器。

如果在网桥上没有定义“输出通道”,则使用入站消息提供的回复通道(如果可用)。如果输出和回复通道均不可用,则会引发异常。

使用 Java 配置配置网桥

以下示例显示了如何使用@BridgeFrom注解在 Java 中配置网桥:

@Bean
public PollableChannel polled() {
    return new QueueChannel();
}

@Bean
@BridgeFrom(value = "polled", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public SubscribableChannel direct() {
    return new DirectChannel();
}

以下示例显示了如何使用@BridgeTo注解在 Java 中配置网桥:

@Bean
@BridgeTo(value = "direct", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public PollableChannel polled() {
    return new QueueChannel();
}

@Bean
public SubscribableChannel direct() {
    return new DirectChannel();
}

或者,您可以使用 a BridgeHandler,如以下示例所示:

@Bean
@ServiceActivator(inputChannel = "polled",
        poller = @Poller(fixedRate = "5000", maxMessagesPerPoll = "10"))
public BridgeHandler bridge() {
    BridgeHandler bridge = new BridgeHandler();
    bridge.setOutputChannelName("direct");
    return bridge;
}

使用 Java DSL 配置网桥

您可以使用 Java 领域特定语言 (DSL) 来配置网桥,如以下示例所示:

@Bean
public IntegrationFlow bridgeFlow() {
    return IntegrationFlows.from("polled")
            .bridge(e -> e.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(10)))
            .channel("direct")
            .get();
}

1. see XML Configuration