本节涵盖 Spring Integration 中核心消息传递 API 的所有方面。它涵盖了消息、消息通道和消息端点。它还涵盖了许多企业集成模式,例如过滤器、路由器、转换器、服务激活器、拆分器和聚合器。
本节还包含有关系统管理的材料,包括控制总线和消息历史支持。
消息渠道
消息渠道
虽然它Message
在封装数据方面起着至关重要的作用,但它MessageChannel
使消息生产者与消息消费者分离。
消息通道接口
Spring Integration 的顶层MessageChannel
接口定义如下:
public interface MessageChannel {
boolean send(Message message);
boolean send(Message message, long timeout);
}
发送消息时,返回值为true
消息是否发送成功。如果发送调用超时或被中断,则返回false
。
PollableChannel
由于消息通道可能会或可能不会缓冲消息(如Spring Integration Overview中所讨论的),因此两个子接口定义了缓冲(可轮询)和非缓冲(可订阅)通道行为。以下清单显示了PollableChannel
接口的定义:
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
与 send 方法一样,接收消息时,在超时或中断的情况下返回值为 null。
消息通道实现
Spring Integration 提供了几种不同的消息通道实现。以下部分简要介绍了每一部分。
PublishSubscribeChannel
该实现将发送给它的PublishSubscribeChannel
任何内容广播到其所有订阅的处理程序。Message
这最常用于发送事件消息,其主要作用是通知(与通常旨在由单个处理程序处理的文档消息相反)。请注意,PublishSubscribeChannel
它仅用于发送。由于它在调用其方法时直接向其订阅者广播send(Message)
,因此消费者无法轮询消息(它没有实现PollableChannel
,因此没有receive()
方法)。相反,任何订阅者本身必须是 a ,然后依次调用MessageHandler
订阅者的方法。handleMessage(Message)
在 3.0 版之前,在没有返回订阅者send
的 a 上调用该方法。当与 a 一起使用时,会抛出 a。从版本 3.0 开始,行为发生了变化,如果至少存在最少的订阅者(并成功处理消息),则始终认为 a 是成功的。可以通过设置属性来修改此行为,该属性默认为.PublishSubscribeChannel
false
MessagingTemplate
MessageDeliveryException
send
minSubscribers
0
如果您使用 a TaskExecutor ,则仅使用正确数量的订阅者来进行此确定,因为消息的实际处理是异步执行的。
|
QueueChannel
该QueueChannel
实现包装了一个队列。与 不同PublishSubscribeChannel
,QueueChannel
具有点对点语义。换句话说,即使通道有多个消费者,也只有其中一个应该接收Message
发送到该通道的任何内容。它提供了一个默认的无参数构造函数(提供了一个基本上无限的容量Integer.MAX_VALUE
)以及一个接受队列容量的构造函数,如下面的清单所示:
public QueueChannel(int capacity)
未达到其容量限制的通道将消息存储在其内部队列中,并且该send(Message<?>)
方法立即返回,即使没有接收者准备好处理该消息。如果队列已满,发送方将阻塞,直到队列中有可用空间。或者,如果您使用具有附加超时参数的发送方法,则队列将阻塞,直到任一房间可用或超时期限结束,以先发生者为准。同样,一个receive()
如果队列中有消息可用,则调用立即返回,但是,如果队列为空,则接收调用可能会阻塞,直到消息可用或超时(如果提供)过去。在任何一种情况下,都可以通过传递超时值 0 来强制立即返回,而不管队列的状态如何。但是请注意,对版本send()
和receive()
不带timeout
参数块的调用会无限期地调用。
PriorityChannel
尽管QueueChannel
强制执行先进先出 (FIFO) 排序,但PriorityChannel
它是一种替代实现,它允许基于优先级在通道内对消息进行排序。默认情况下,优先级由priority
每条消息中的标头确定。Comparator<Message<?>>
但是,对于自定义优先级确定逻辑,可以向PriorityChannel
构造函数提供类型比较器。
RendezvousChannel
这RendezvousChannel
启用了“直接切换”方案,其中发送方阻塞,直到另一方调用通道的receive()
方法。对方阻塞,直到发送者发送消息。在内部,这个实现与 非常相似QueueChannel
,除了它使用一个SynchronousQueue
(一个零容量的实现BlockingQueue
)。这适用于发送方和接收方在不同线程中操作的情况,但不适合将消息异步丢弃到队列中。换句话说,使用 a RendezvousChannel
,发送者知道某个接收者已经接受了消息,而使用 a QueueChannel
,消息将被存储到内部队列中并且可能永远不会收到。
请记住,所有这些基于队列的通道默认情况下仅将消息存储在内存中。当需要持久性时,您可以在“队列”元素中提供“消息存储”属性以引用持久性MessageStore 实现,或者您可以将本地通道替换为由持久性代理支持的通道,例如 JMS 支持的通道或通道适配器。后一个选项允许您利用任何 JMS 提供程序的实现来实现消息持久性,如JMS 支持中所述。但是,当不需要在队列中缓冲时,最简单的方法是依赖DirectChannel ,将在下一节中讨论。
|
这RendezvousChannel
对于实现请求-回复操作也很有用。发件人可以创建一个临时的匿名实例RendezvousChannel
,然后在构建Message
. 发送后Message
,发送者可以立即调用receive
(可选地提供超时值)以便在等待回复时阻塞Message
。这与许多 Spring Integration 的请求-回复组件内部使用的实现非常相似。
DirectChannel
具有DirectChannel
点对点语义,但在其他方面更类似于PublishSubscribeChannel
前面描述的任何基于队列的通道实现。它实现了SubscribableChannel
接口而不是PollableChannel
接口,因此它直接将消息分派给订阅者。然而,作为一个点对点通道,它不同于PublishSubscribeChannel
将每个通道发送Message
给单个订阅的MessageHandler
.
除了是最简单的点对点通道选项之外,它最重要的特性之一是它允许单个线程在通道的“两侧”执行操作。例如,如果处理程序订阅了 a DirectChannel
,那么在方法调用返回之前,将 a 发送到该通道会直接在发送者的线程中Message
触发该处理程序的方法的调用。handleMessage(Message)
send()
提供具有此行为的通道实现的主要动机是支持必须跨越通道的事务,同时仍受益于通道提供的抽象和松散耦合。如果在事务范围内调用发送调用,则处理程序调用的结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)中发挥作用。
由于这DirectChannel 是最简单的选项,并且不会增加调度和管理轮询器线程所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。总体思路是为应用程序定义通道,考虑哪些通道需要提供缓冲或限制输入,并将这些通道修改为基于队列的PollableChannels 。同样,如果一个频道需要广播消息,它不应该是 aDirectChannel 而应该是 a PublishSubscribeChannel 。稍后,我们将展示如何配置这些通道中的每一个。
|
内部DirectChannel
委托给消息调度程序以调用其订阅的消息处理程序,并且该调度程序可以具有由load-balancer
或load-balancer-ref
属性公开的负载平衡策略(互斥)。当多个消息处理程序订阅同一个通道时,消息调度程序使用负载平衡策略来帮助确定消息在消息处理程序之间的分布方式。为方便起见,该load-balancer
属性公开了指向预先存在的LoadBalancingStrategy
. A round-robin
(轮换处理程序之间的负载平衡)和none
(对于想要明确禁用负载平衡的情况)是唯一可用的值。在未来的版本中可能会添加其他策略实现。但是,从 3.0 版开始,您可以提供自己的实现LoadBalancingStrategy
并使用load-balancer-ref
属性注入它,该属性应指向实现 的 bean LoadBalancingStrategy
,如以下示例所示:
AFixedSubscriberChannel
是SubscribableChannel
只支持一个MessageHandler
不能取消订阅的订阅者。当不涉及其他订阅者并且不需要通道拦截器时,这对于高吞吐量性能用例很有用。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
注意load-balancer
和load-balancer-ref
属性是互斥的。
负载平衡还可以与布尔failover
属性结合使用。如果该failover
值为 true(默认值),则调度程序会在前面的处理程序抛出异常时回退到任何后续处理程序(根据需要)。顺序由在处理程序本身上定义的可选顺序值确定,如果不存在这样的值,则由处理程序订阅的顺序确定。
如果某种情况要求调度程序总是尝试调用第一个处理程序,然后每次发生错误时以相同的固定顺序回退,则不应提供负载平衡策略。换句话说,failover
即使没有启用负载平衡,调度程序仍然支持布尔属性。然而,如果没有负载平衡,处理程序的调用总是根据它们的顺序从第一个开始。例如,当对初级、二级、三级等有明确的定义时,这种方法效果很好。使用命名空间支持时,order
任何端点上的属性都会确定顺序。
请记住,负载平衡failover 仅适用于通道具有多个订阅消息处理程序的情况。使用命名空间支持时,这意味着多个端点共享input-channel 属性中定义的相同通道引用。
|
从版本 5.2 开始,当failover
为 true 时,当前处理程序的失败以及失败的消息将记录在debug
或info
分别配置下。
ExecutorChannel
这ExecutorChannel
是一个点对点通道,支持与DirectChannel
(负载平衡策略和failover
布尔属性)相同的调度程序配置。这两种调度通道类型之间的主要区别在于ExecutorChannel
委托实例TaskExecutor
执行调度。这意味着 send 方法通常不会阻塞,但这也意味着处理程序调用可能不会发生在发送方的线程中。因此,它不支持跨越发送方和接收处理程序的事务。
发件人有时可以阻止。例如,当使用TaskExecutor 带有限制客户端的拒绝策略(例如ThreadPoolExecutor.CallerRunsPolicy )时,发送者的线程可以在线程池达到其最大容量并且执行者的工作队列已满的任何时候执行该方法。由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行交易。
|
FluxMessageChannel
这FluxMessageChannel
是一种将消息发送到内部以供下游反应订阅者按需消费的org.reactivestreams.Publisher
实现。此通道实现既不是 a也不是 a ,因此只有实例可用于从该通道消费,以尊重反应流的背压特性。另一方面,实现 a的合约允许从响应式源发布者接收事件,将响应式流桥接到集成流中。为了实现整个集成流的完全反应行为,必须在流中的所有端点之间放置这样的通道。"sinking"
reactor.core.publisher.Flux
SubscribableChannel
PollableChannel
org.reactivestreams.Subscriber
FluxMessageChannel
ReactiveStreamsSubscribableChannel
subscribeTo(Publisher<Message<?>>)
有关与 Reactive Streams 交互的更多信息,请参阅Reactive Streams Support 。
范围频道
Spring Integration 1.0 提供了一个ThreadLocalChannel
实现,但在 2.0 中已被删除。现在处理相同需求的更通用方法是向scope
通道添加属性。属性的值可以是上下文中可用的范围的名称。例如,在 Web 环境中,某些范围是可用的,并且任何自定义范围实现都可以在上下文中注册。以下示例显示了应用于通道的线程本地范围,包括范围本身的注册:
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
上例中定义的通道也在内部委托给了一个队列,但是通道绑定到当前线程,所以队列的内容也是类似绑定的。这样,发送到通道的线程稍后可以接收这些相同的消息,但没有其他线程能够访问它们。虽然很少需要线程范围的通道,但它们在DirectChannel
使用实例来强制执行单个操作线程但任何回复消息都应发送到“终端”通道的情况下很有用。如果该终端通道是线程范围的,则原始发送线程可以从终端通道收集其回复。
现在,由于可以对任何通道进行作用域,因此除了 thread-Local 之外,您还可以定义自己的作用域。
通道拦截器
消息传递体系结构的优点之一是能够提供常见行为并以非侵入方式捕获有关通过系统的消息的有意义信息。由于Message
实例是从MessageChannel
实例发送和接收的,因此这些通道提供了拦截发送和接收操作的机会。ChannelInterceptor
如下清单所示的策略接口为这些操作中的每一个提供了方法:
public interface ChannelInterceptor {
Message<?> preSend(Message<?> message, MessageChannel channel);
void postSend(Message<?> message, MessageChannel channel, boolean sent);
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
boolean preReceive(MessageChannel channel);
Message<?> postReceive(Message<?> message, MessageChannel channel);
void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}
实现接口后,向通道注册拦截器只需进行以下调用:
channel.addInterceptor(someChannelInterceptor);
返回Message
实例的方法可用于转换Message
或可以返回 'null' 以防止进一步处理(当然,任何方法都可以抛出 a RuntimeException
)。此外,该preReceive
方法可以返回false
以阻止接收操作继续进行。
请记住,receive() 呼叫仅与PollableChannels . 事实上,SubscribableChannel 接口甚至没有定义receive() 方法。这样做的原因是,当 aMessage 被发送到 aSubscribableChannel 时,它会直接发送给零个或多个订阅者,具体取决于通道的类型(例如,aPublishSubscribeChannel 发送给它的所有订阅者)。因此,只有当拦截器应用于 a 时,才会调用 、 和 拦截preReceive(…) 器postReceive(…) 方法。
afterReceiveCompletion(…) PollableChannel |
Spring Integration 还提供了Wire Tap模式的实现。它是一个简单的拦截器,可以在Message
不改变现有流的情况下将其发送到另一个通道。它对于调试和监控非常有用。Wire Tap中显示了一个示例。
因为很少需要实现所有的拦截器方法,所以接口提供了无操作方法(方法返回void
方法没有代码,-返回方法按Message
原样返回Message
,boolean
方法返回true
)。
拦截器方法的调用顺序取决于通道的类型。如前所述,基于队列的通道是唯一首先拦截接收方法的通道。此外,发送和接收拦截之间的关系取决于单独的发送者和接收者线程的时间。例如,如果接收者在等待消息时已经被阻塞,则顺序可能如下:preSend , preReceive , postReceive , postSend 。但是,如果接收者在发送者在通道上放置消息并且已经返回之后进行轮询,则顺序如下:preSend , postSend (some-time-elapses), preReceive ,postReceive . 在这种情况下经过的时间取决于许多因素,因此通常是不可预测的(事实上,接收可能永远不会发生)。队列的类型也起作用(例如,集合点与优先级)。简而言之,除了preSend 前面postSend 和preReceive 前面的事实之外,您不能依赖顺序postReceive 。
|
从 Spring Framework 4.1 和 Spring Integration 4.1 开始,ChannelInterceptor
提供了新方法:afterSendCompletion()
和afterReceiveCompletion()
. 无论引发任何异常,它们都会在调用后send()' and 'receive()
调用,这允许资源清理。请注意,通道ChannelInterceptor
以与初始preSend()
和preReceive()
调用相反的顺序调用列表中的这些方法。
从 5.1 版开始,全局通道拦截器现在适用于动态注册的通道 - 例如通过使用beanFactory.initializeBean()
或IntegrationFlowContext
使用 Java DSL 初始化的 bean。以前,在刷新应用程序上下文后创建 bean 时不应用拦截器。
此外,从 5.1 版本开始,ChannelInterceptor.postReceive()
不再在没有收到消息时调用;不再需要检查null
Message<?>
. 以前,该方法被调用。如果您有一个依赖于先前行为的拦截器,请afterReceiveCompleted()
改为实现,因为该方法被调用,无论是否接收到消息。
从版本 5.2 开始,ChannelInterceptorAware 不推荐使用InterceptableChannel Spring Messaging 模块,现在它扩展了该模块以实现向后兼容性。
|
MessagingTemplate
当引入端点及其各种配置选项时,Spring Integration 为消息传递组件提供了基础,支持从消息传递系统对应用程序代码进行非侵入式调用。但是,有时需要从您的应用程序代码中调用消息传递系统。为了方便实现此类用例,Spring Integration 提供了MessagingTemplate
支持跨消息通道的各种操作,包括请求和回复场景。例如,可以发送请求并等待回复,如下所示:
MessagingTemplate template = new MessagingTemplate();
Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));
在前面的示例中,模板将在内部创建一个临时匿名通道。'sendTimeout' 和 'receiveTimeout' 属性也可以在模板上设置,并且还支持其他交换类型。以下清单显示了此类方法的签名:
public boolean send(final MessageChannel channel, final Message<?> message) { ...
}
public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}
public Message<?> receive(final PollableChannel<?> channel) { ...
}
Message 输入.
_ _GatewayProxyFactoryBean |
配置消息通道
要创建消息通道实例,您可以使用<channel/>
xml 元素或DirectChannel
Java 配置实例,如下所示:
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
当您使用<channel/>
没有任何子元素的元素时,它会创建一个DirectChannel
实例 (a SubscribableChannel
)。
要创建发布-订阅通道,请使用<publish-subscribe-channel/>
元素(PublishSubscribeChannel
Java 中的 ),如下所示:
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
您也可以提供各种<queue/>
子元素来创建任何可轮询通道类型(如消息通道实现中所述)。以下部分显示了每种通道类型的示例。
DirectChannel
配置
如前所述,DirectChannel
是默认类型。以下清单显示了定义谁:
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
默认通道具有循环负载平衡器,并且还启用了故障转移(DirectChannel
有关详细信息,请参阅)。要禁用其中一项或两项,请添加一个<dispatcher/>
子元素( 的LoadBalancingStrategy
构造函数DirectChannel
)并按如下方式配置属性:
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
数据类型通道配置
有时,消费者只能处理特定类型的有效负载,这迫使您确保输入消息的有效负载类型。首先想到的可能是使用消息过滤器。但是,消息过滤器所能做的就是过滤掉不符合消费者要求的消息。另一种方法是使用基于内容的路由器并将具有不兼容数据类型的消息路由到特定的转换器,以强制转换和转换为所需的数据类型。这会起作用,但完成同样事情的更简单方法是应用数据类型通道模式。您可以为每个特定的有效负载数据类型使用单独的数据类型通道。
要创建仅接受包含特定负载类型的消息的数据类型通道,请在通道元素的datatype
属性中提供数据类型的完全限定类名称,如以下示例所示:
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
请注意,对于可分配给通道数据类型的任何类型,类型检查都会通过。换句话说,numberChannel
前面示例中的 将接受有效负载为java.lang.Integer
或的消息java.lang.Double
。可以以逗号分隔列表的形式提供多种类型,如以下示例所示:
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
因此,前面示例中的“numberChannel”只接受数据类型为 的消息java.lang.Number
。但是,如果消息的有效负载不是所需的类型,会发生什么?这取决于您是否定义了一个名为的 bean integrationConversionService
,它是 Spring 的Conversion Service的一个实例。如果不是,那么 anException
将立即被抛出。但是,如果您定义了一个integrationConversionService
bean,它会被用来尝试将消息的有效负载转换为可接受的类型。
您甚至可以注册自定义转换器。例如,假设您将带有String
有效负载的消息发送到我们上面配置的“numberChannel”。您可以按如下方式处理该消息:
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常这将是一个完全合法的操作。但是,由于我们使用 Datatype Channel,因此此类操作的结果会产生类似于以下的异常:
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
发生异常是因为我们要求有效负载类型为 a Number
,但我们发送了String
. 所以我们需要一些东西来将 a 转换String
为 a Number
。为此,我们可以实现类似于以下示例的转换器:
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然后我们可以将它注册为集成转换服务的转换器,如以下示例所示:
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
或者在StringToIntegerConverter
标有@Component
自动扫描注释的类上。
解析“converter”元素时,integrationConversionService
如果尚未定义 bean,它将创建 bean。使用该转换器,send
操作现在将成功,因为数据类型通道使用该转换器将String
有效负载转换为Integer
.
有关有效负载类型转换的更多信息,请参阅有效负载类型转换。
从 4.0 版开始, 由integrationConversionService
调用DefaultDatatypeChannelMessageConverter
,它在应用程序上下文中查找转换服务。要使用不同的转换技术,您可以message-converter
在通道上指定属性。这必须是对MessageConverter
实现的引用。仅使用fromMessage
方法。它为转换器提供了对消息标头的访问(以防转换可能需要来自标头的信息,例如content-type
)。该方法只能返回转换后的有效负载或完整Message
对象。如果是后者,转换器必须小心复制入站消息中的所有标头。
或者,您可以声明一个ID 为<bean/>
的类型,并且所有通道都使用该转换器。MessageConverter
datatypeChannelMessageConverter
datatype
QueueChannel
配置
要创建QueueChannel
,请使用<queue/>
子元素。您可以按如下方式指定通道的容量:
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果您没有为此<queue/> 子元素的“容量”属性提供值,则生成的队列是无界的。为避免内存不足等问题,我们强烈建议您为有界队列设置显式值。
|
持久QueueChannel
配置
由于 aQueueChannel
提供了缓冲消息的能力,但默认情况下仅在内存中进行,因此它还引入了在系统故障时消息可能丢失的可能性。为了减轻这种风险,QueueChannel
可以通过MessageGroupStore
策略接口的持久实现来支持。有关MessageGroupStore
and的更多详细信息MessageStore
,请参阅消息存储。
使用capacity 属性时不允许使用该message-store 属性。
|
当 aQueueChannel
接收到 aMessage
时,它会将消息添加到消息存储中。当Message
从 a 轮询 aQueueChannel
时,它会从消息存储中删除。
默认情况下,aQueueChannel
将其消息存储在内存队列中,这可能导致前面提到的消息丢失情况。但是,Spring Integration 提供了持久存储,例如JdbcChannelMessageStore
.
QueueChannel
您可以通过添加属性为任何配置消息存储message-store
,如以下示例所示:
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(有关 Java/Kotlin 配置选项,请参阅下面的示例。)
Spring Integration JDBC 模块还为许多流行的数据库提供了模式数据定义语言 (DDL)。这些模式位于该模块 ( spring-integration-jdbc
) 的 org.springframework.integration.jdbc.store.channel 包中。
一个重要的特性是,对于任何事务性持久存储(例如JdbcChannelMessageStore ),只要轮询器配置了事务,从存储中删除的消息只有在事务成功完成时才能被永久删除。否则事务回滚,并且Message 不会丢失。
|
随着越来越多的与“NoSQL”数据存储相关的 Spring 项目开始为这些存储提供底层支持,消息存储的许多其他实现都可用。MessageGroupStore
如果找不到满足您特定需求的接口,您也可以提供自己的接口实现。
从 4.0 版开始,如果可能,我们建议将QueueChannel
实例配置为使用. ChannelMessageStore
与一般消息存储相比,这些通常针对此用途进行了优化。如果ChannelMessageStore
是 a ChannelPriorityMessageStore
,则按照优先级顺序在 FIFO 中接收消息。优先级的概念由消息存储实现确定。例如,以下示例显示了MongoDB Channel Message Store的 Java 配置:
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlows.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
注意MessageGroupQueue 上课。那是BlockingQueue 使用MessageGroupStore 操作的实现。
|
QueueChannel
自定义环境的另一个选项由子元素或其特定构造函数的ref
属性提供。<int:queue>
此属性提供对任何java.util.Queue
实现的引用。例如,一个 Hazelcast 分布式IQueue
可以配置如下:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel
配置
要创建PublishSubscribeChannel
,请使用 <publish-subscribe-channel/> 元素。使用该元素时,还可以指定task-executor
用于发布消息(如果没有指定,则在发送者的线程中发布),如下:
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果您在 a 的下游提供重新排序器或聚合器PublishSubscribeChannel
,则可以将通道上的“应用序列”属性设置为true
。这样做表示通道应该在传递消息之前设置sequence-size
和sequence-number
消息头以及相关 ID。例如,如果有五个订阅者,则sequence-size
将设置为5
,并且消息的sequence-number
标头值范围从1
到5
。
除了Executor
,您还可以配置ErrorHandler
. 默认情况下,PublishSubscribeChannel
使用MessagePublishingErrorHandler
实现将错误发送到MessageChannel
标errorChannel
头或全局errorChannel
实例。如果 anExecutor
未配置,ErrorHandler
则忽略并直接向调用者的线程抛出异常。
如果您提供 aResequencer
或a 的Aggregator
下游PublishSubscribeChannel
,则可以将通道上的 'apply-sequence' 属性设置为true
。这样做表示通道应该在传递消息之前设置序列大小和序列号消息头以及相关 ID。例如,如果有五个订阅者,则序列大小将设置为5
,并且消息将具有范围从1
到的序列号标头值5
。
以下示例显示如何将apply-sequence
标头设置为true
:
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(false);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
该apply-sequence 值是false 默认值,以便发布-订阅通道可以将完全相同的消息实例发送到多个出站通道。由于 Spring Integration 强制有效负载和标头引用的不变性,当标志设置为true 时,通道创建Message 具有相同有效负载引用但不同标头值的新实例。
|
从版本 5.4.3 开始,PublishSubscribeChannel
还可以使用它的requireSubscribers
选项进行配置,BroadcastingDispatcher
以指示该频道在没有订阅者时不会静默忽略消息。当没有订阅者并且此选项设置为 时,会抛出带有MessageDispatchingException
消息的A。Dispatcher has no subscribers
true
ExecutorChannel
要创建ExecutorChannel
,请添加具有属性的<dispatcher>
子元素。task-executor
属性的值可以引用TaskExecutor
上下文中的任何值。例如,这样做可以配置线程池以将消息分派到订阅的处理程序。如前所述,这样做会破坏发送方和接收方之间的单线程执行上下文,因此任何活动的事务上下文都不会被处理程序的调用共享(即处理程序可能会抛出一个Exception
,但send
调用已经成功返回) . 以下示例显示了如何使用该元素并在属性dispatcher
中指定执行器:task-executor
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
|
PriorityChannel
配置
要创建PriorityChannel
,请使用<priority-queue/>
子元素,如以下示例所示:
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
默认情况下,通道会参考priority
消息的标头。但是,您可以改为提供自定义Comparator
参考。另外,请注意PriorityChannel
(与其他类型一样)确实支持该datatype
属性。与 一样QueueChannel
,它也支持capacity
属性。以下示例演示了所有这些:
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
从 4.0 版开始,priority-channel
子元素支持该message-store
选项(在这种情况comparator
下capacity
是不允许的)。消息存储必须是PriorityCapableChannelMessageStore
. PriorityCapableChannelMessageStore
目前为Redis
、JDBC
和提供了的实现MongoDB
。有关详细信息,请参阅QueueChannel
配置和消息存储。您可以在Backing Message Channels中找到示例配置。
RendezvousChannel
配置
ARendezvousChannel
在队列子元素是 a 时创建<rendezvous-queue>
。它不为前面描述的那些提供任何额外的配置选项,并且它的队列不接受任何容量值,因为它是一个零容量的直接切换队列。以下示例显示了如何声明 a RendezvousChannel
:
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
通道拦截器配置
消息通道也可能有拦截器,如Channel Interceptors中所述。<interceptors/>
子元素可以添加到一个(或更具体的<channel/>
元素类型)。您可以提供该ref
属性来引用任何实现该ChannelInterceptor
接口的 Spring 管理的对象,如以下示例所示:
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
一般来说,我们建议在单独的位置定义拦截器实现,因为它们通常提供可以跨多个通道重用的通用行为。
全局通道拦截器配置
通道拦截器提供了一种简洁明了的方式来应用每个通道的横切行为。如果应该在多个通道上应用相同的行为,那么为每个通道配置相同的拦截器集将不是最有效的方法。为了避免重复配置,同时也使拦截器可以应用于多个通道,Spring Integration 提供了全局拦截器。考虑以下一对示例:
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每个元素都允许您定义一个全局拦截器,该拦截器应用于与该属性<channel-interceptor/>
定义的任何模式匹配的所有通道。pattern
在上述情况下,全局拦截器应用于“thing1”通道和所有其他以“thing2”或“input”开头的通道,但不适用于以“thing3”开头的通道(从 5.0 版开始)。
将此语法添加到模式中会导致一个可能(尽管可能不太可能)的问题。如果您有一个名为的 bean ,并且您在通道拦截器的模式中!thing1 包含了一个模式,则它不再匹配。该模式现在匹配所有未命名的 bean 。在这种情况下,您可以使用 转义模式中的。该模式与名为 的 bean 匹配。
!thing1 pattern thing1 ! \ \!thing1 !thing1 |
当给定通道上有多个拦截器时,order 属性允许您管理此拦截器的注入位置。例如,通道“inputChannel”可以在本地配置单独的拦截器(见下文),如下例所示:
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一个合理的问题是“一个全局拦截器是如何相对于本地配置的其他拦截器或通过其他全局拦截器定义注入的?” 当前的实现提供了一种简单的机制来定义拦截器的执行顺序。属性中的正数order
确保拦截器在任何现有拦截器之后注入,而负数确保拦截器在现有拦截器之前注入。这意味着,在前面的示例中,全局拦截器是在本地配置的“wire-tap”拦截器之后(因为它order
大于)注入的。0
如果有另一个匹配的全局拦截器pattern
,它的顺序将通过比较两个拦截器的值来确定order
属性。order
要在现有拦截器之前注入全局拦截器,请为属性使用负值。
请注意,order 和pattern 属性都是可选的。的默认值为order 0,而pattern 的默认值为 '*'(匹配所有通道)。
|
丝锥
如前所述,Spring Integration 提供了一个简单的窃听拦截器。<interceptors/>
您可以在元素内的任何通道上配置接线。这样做对于调试特别有用,可以与 Spring Integration 的日志通道适配器结合使用,如下所示:
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter' 还接受 'expression' 属性,以便您可以根据 'payload' 和 'headers' 变量评估 SpEL 表达式。或者,要记录完整的消息结果,请为“log-full-message”属性toString() 提供一个值。true 默认情况下,false 只记录有效负载。将其设置为true 启用除有效负载之外的所有标头的日志记录。'expression' 选项提供了最大的灵活性(例如,expression="payload.user.name" )。
|
关于窃听器和其他类似组件的常见误解之一(消息发布配置) 是它们本质上是自动异步的。默认情况下,不会异步调用作为组件的窃听器。相反,Spring Integration 专注于配置异步行为的单一统一方法:消息通道。使消息流的某些部分同步或异步的原因是已在该流中配置的消息通道的类型。这是消息通道抽象的主要好处之一。从框架诞生之初,我们就一直强调消息通道作为框架一等公民的必要性和价值。它不仅仅是 EIP 模式的内部隐式实现。它作为可配置组件完全公开给最终用户。因此,wire tap 组件只负责执行以下任务:
-
通过点击通道来拦截消息流(例如,
channelA
) -
抓住每条消息
-
将消息发送到另一个频道(例如,
channelB
)
它本质上是桥接模式的一种变体,但它被封装在通道定义中(因此更容易启用和禁用而不会中断流)。此外,与网桥不同的是,它基本上分叉了另一个消息流。该流程是同步的还是异步的?答案取决于“channelB”的消息通道类型。我们有以下选项:直接通道、可轮询通道和执行器通道。最后两个打破了线程边界,使这些通道上的通信异步,因为从该通道向其订阅的处理程序分派消息发生在与用于将消息发送到该通道的线程不同的线程上。这就是使您的窃听流同步或异步的原因。它与框架内的其他组件(例如消息发布者)保持一致,并且通过让您不必提前担心(除了编写线程安全代码之外)是否应该将特定代码实现为,从而增加了一致性和简单性同步或异步。两条代码(例如,组件 A 和组件 B)在消息通道上的实际连接是使它们的协作同步或异步的原因。您甚至可能希望将来从同步更改为异步,而消息通道可让您快速完成此操作,而无需接触代码。两条代码(例如,组件 A 和组件 B)在消息通道上的实际连接是使它们的协作同步或异步的原因。您甚至可能希望将来从同步更改为异步,而消息通道可让您快速完成此操作,而无需接触代码。两条代码(例如,组件 A 和组件 B)在消息通道上的实际连接是使它们的协作同步或异步的原因。您甚至可能希望将来从同步更改为异步,而消息通道可让您快速完成此操作,而无需接触代码。
关于窃听的最后一点是,尽管上面提供了默认情况下不异步的基本原理,但您应该记住,通常希望尽快传递消息。因此,使用异步通道选项作为接线的出站通道是很常见的。但是,默认情况下我们不强制执行异步行为。如果我们这样做,有许多用例会中断,包括您可能不想打破事务边界。也许您将窃听模式用于审计目的,并且您确实希望在原始事务中发送审计消息。例如,您可以将接线连接到 JMS 出站通道适配器。这样,您将获得两全其美:
从版本 4.0 开始,当拦截器(例如WireTap class)引用通道时,避免循环引用很重要。您需要从当前拦截器拦截的通道中排除此类通道。这可以通过适当的模式或以编程方式完成。如果您有一个ChannelInterceptor 引用 a的自定义channel ,请考虑实施VetoCapableInterceptor . 这样,框架会根据提供的模式询问拦截器是否可以拦截每个候选通道。您还可以在拦截器方法中添加运行时保护,以确保通道不是被拦截器引用的通道。使用WireTap 这两种技术。
|
从版本 4.3 开始,WireTap
有额外的构造函数,channelName
而不是
MessageChannel
实例。这对于 Java 配置和使用通道自动创建逻辑时很方便。目标bean在与拦截器的第一次交互时MessageChannel
从稍后提供的解析。channelName
通道解析需要一个BeanFactory ,所以接线实例必须是 Spring 管理的 bean。
|
这种后期绑定方法还允许使用 Java DSL 配置简化典型的窃听模式,如以下示例所示:
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
有条件的丝锥
可以使用selector
orselector-expression
属性来设置分接头。selector
引用一个bean ,MessageSelector
它可以在运行时确定消息是否应该发送到 tap 通道。类似地,selector-expression
是执行相同目的的布尔 SpEL 表达式:如果表达式的计算结果为true
,则将消息发送到点击通道。
全局接线配置
可以将全局线接头配置为全局通道拦截器配置的特例。为此,请配置顶级wire-tap
元素。现在,除了正常的wire-tap
命名空间支持之外,还支持pattern
和order
属性,并且它们的工作方式与它们对channel-interceptor
. 以下示例显示了如何配置全局接线:
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全局接线提供了一种在外部配置单通道接线的便捷方式,无需修改现有通道配置。为此,请将pattern 属性设置为目标通道名称。例如,您可以使用此技术配置测试用例以验证通道上的消息。
|
特殊频道
默认情况下,在应用程序上下文中定义了两个特殊通道:errorChannel
和nullChannel
. 'nullChannel'( 的一个实例NullChannel
)的作用类似于/dev/null
,在级别记录发送给它的任何消息DEBUG
并立即返回。对已发送消息的有效负载进行特殊处理org.reactivestreams.Publisher
:立即在该通道中订阅它,以启动反应流处理,尽管数据被丢弃。反应式流处理引发的错误(请参阅 参考资料Subscriber.onError(Throwable)
)记录在警告级别下,以便进行可能的调查。如果需要对此类错误进行任何处理,可以将ReactiveRequestHandlerAdvice
自Mono.doOnError()
定义应用到消息处理程序,从而产生Mono
对此的回复nullChannel
. 每当您遇到您不关心的回复的通道解析错误时,您可以将受影响组件的output-channel
属性设置为“nullChannel”(名称“nullChannel”在应用程序上下文中保留)。
'errorChannel' 在内部用于发送错误消息,并且可以用自定义配置覆盖。这在错误处理中有更详细的讨论。
有关消息通道和拦截器的更多信息,另请参阅Java DSL 章节中的消息通道。
轮询器
本节介绍 Spring Integration 中轮询的工作原理。
轮询消费者
当消息端点(通道适配器)连接到通道并被实例化时,它们会产生以下实例之一:
实际实现取决于这些端点连接的通道类型。连接到实现org.springframework.messaging.SubscribableChannel
接口的通道的通道适配器会生成EventDrivenConsumer
. 另一方面,连接到实现org.springframework.messaging.PollableChannel
接口的通道(例如 a QueueChannel
)的通道适配器会生成 的实例PollingConsumer
。
轮询消费者让 Spring Integration 组件主动轮询消息,而不是以事件驱动的方式处理消息。
它们代表了许多消息传递场景中的关键横切关注点。在 Spring Integration 中,轮询消费者基于同名模式,该模式在 Gregor Hohpe 和 Bobby Woolf所著的Enterprise Integration Patterns一书中进行了描述。您可以在本书的网站上找到该模式的描述。
可轮询消息源
Spring Integration 提供了轮询消费者模式的第二种变体。当使用入站通道适配器时,这些适配器通常由SourcePollingChannelAdapter
. 例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器中描述的适配器配置有轮询器以定期检索消息。因此,当组件配置有轮询器时,生成的实例属于以下类型之一:
这意味着轮询器用于入站和出站消息传递方案。以下是使用轮询器的一些用例:
-
轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务
-
轮询内部(可轮询)消息通道
-
轮询内部服务(例如在 Java 类上重复执行方法)
AOP 建议类可以应用于轮询器,advice-chain 例如用于启动事务的事务建议。从 4.1 版开始,PollSkipAdvice 提供了 a。轮询器使用触发器来确定下一次轮询的时间。可PollSkipAdvice 用于抑制(跳过)轮询,可能是因为某些下游条件会阻止消息被处理。要使用此建议,您必须为其提供一个PollSkipStrategy . 从版本 4.2.5 开始,SimplePollSkipStrategy 提供了 a。要使用它,您可以将实例作为 bean 添加到应用程序上下文中,将其注入到 aPollSkipAdvice 中,然后将其添加到轮询器的建议链中。要跳过轮询,请调用skipPolls() 。要恢复轮询,请调用reset() 。4.2 版在这方面增加了更多的灵活性。看消息源的条件轮询器。
|
延迟确认可轮询消息源
从版本 5.0.1 开始,某些模块提供MessageSource
了支持延迟确认直到下游流程完成(或将消息移交给另一个线程)的实现。目前仅限于AmqpMessageSource
和KafkaMessageSource
。
使用这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
标头(请参阅MessageHeaderAccessor
API)被添加到消息中。与可轮询消息源一起使用时,标头的值是 的实例AcknowledgmentCallback
,如以下示例所示:
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如 a KafkaMessageSource
)都支持该REJECT
状态。它的处理方式与 相同ACCEPT
。
应用程序可以随时确认消息,如以下示例所示:
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果MessageSource
连接到 a SourcePollingChannelAdapter
,当轮询线程在下游流程完成后返回到适配器时,适配器会检查确认是否已经被确认,如果没有,则将其状态设置为ACCEPT
它(或者REJECT
如果流程抛出异常) . 状态值在AcknowledgmentCallback.Status
枚举中定义。
Spring Integration 提供MessageSourcePollingTemplate
对MessageSource
. 这也负责设置回调何时返回(ACCEPT
或抛出异常)REJECT
。以下示例显示如何使用 进行轮询:AcknowledgmentCallback
MessageHandler
MessageSourcePollingTemplate
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在这两种情况下(SourcePollingChannelAdapter
和),您都可以通过调用回调MessageSourcePollingTemplate
来禁用自动确认/确认。noAutoAck()
如果您将消息传递给另一个线程并希望稍后确认,您可能会这样做。并非所有实现都支持这一点(例如,Apache Kafka 不支持,因为偏移提交必须在同一个线程上执行)。
消息源的条件轮询器
本节介绍如何使用条件轮询器。
背景
Advice
轮询器中的对象advice-chain
建议整个轮询任务(消息检索和处理)。这些“环绕建议”方法无法访问任何轮询上下文——只能访问轮询本身。如前所述,这对于诸如使任务具有事务性或由于某些外部条件而跳过轮询之类的要求很好。如果我们希望根据receive
轮询部分的结果采取一些行动,或者如果我们想根据条件调整轮询器怎么办?对于这些实例,Spring Integration 提供“智能”轮询。
“智能”投票
5.3 版引入了该ReceiveMessageAdvice
接口。(AbstractMessageSourceAdvice
已弃用 ,取而代之的是 中的default
方法MessageSourceMutator
。) 中实现此接口的任何Advice
对象advice-chain
仅适用于接收操作 -MessageSource.receive()
和PollableChannel.receive(timeout)
。因此它们只能应用于SourcePollingChannelAdapter
or PollingConsumer
。此类类实现以下方法:
-
beforeReceive(Object source)
此方法在方法之前调用Object.receive()
。它允许您检查和重新配置源。返回false
取消此轮询(类似于PollSkipAdvice
前面提到的)。 -
Message<?> afterReceive(Message<?> result, Object source)
该方法在方法之后调用receive()
。同样,您可以重新配置源或采取任何操作(可能取决于结果,null
如果源没有创建消息)。您甚至可以返回不同的消息
线程安全
如果通知改变了 ,则不应使用 |
建议链订购
您应该了解在初始化期间如何处理建议链。
|
SimpleActiveIdleReceiveMessageAdvice
(以前SimpleActiveIdleMessageSourceAdvice
的 for onlyMessageSource
已弃用。)此建议是ReceiveMessageAdvice
. 当与 a 一起使用时DynamicPeriodicTrigger
,它会根据之前的轮询是否产生消息来调整轮询频率。轮询器还必须具有对相同的引用DynamicPeriodicTrigger
。
重要:异步切换
SimpleActiveIdleReceiveMessageAdvice 根据receive() 结果修改触发器。这仅在轮询线程上调用通知时才有效。如果轮询器有task-executor . 要在轮询结果后使用异步操作的地方使用此建议,请稍后执行异步切换,可能使用ExecutorChannel .
|
CompoundTriggerAdvice
此建议允许根据轮询是否返回消息来选择两个触发器之一。考虑一个使用CronTrigger
.
CronTrigger
实例是不可变的,因此一旦构造它们就不能更改。考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但如果没有收到消息,则每分钟轮询一次,当检索到消息时,恢复使用 cron 表达式。
建议(和轮询器)CompoundTrigger
为此目的使用 a 。触发器的primary
触发器可以是CronTrigger
. 当通知检测到没有收到消息时,它会将辅助触发器添加到CompoundTrigger
. 当CompoundTrigger
实例的nextExecutionTime
方法被调用时,它会委托给辅助触发器(如果存在)。否则,它会委托给主触发器。
轮询器还必须具有对相同的引用CompoundTrigger
。
以下示例显示了每小时 cron 表达式的配置,并回退到每分钟:
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要:异步切换
CompoundTriggerAdvice 根据receive() 结果修改触发器。这仅在轮询线程上调用通知时才有效。如果轮询器有task-executor . 要在轮询结果后使用异步操作的地方使用此建议,请稍后执行异步切换,可能使用ExecutorChannel .
|
MessageSource-only 建议
有些建议可能仅适用于 ,MessageSource.receive()
而对PollableChannel
. 为此,仍然存在一个MessageSourceMutator
接口( 的扩展)。ReceiveMessageAdvice
使用default
方法,它完全取代了已经弃用的方法AbstractMessageSourceAdvice
,应该在那些只需要MessageSource
代理的实现中使用。有关详细信息,请参阅入站通道适配器:轮询多个服务器和目录。
通道适配器
通道适配器是一个消息端点,可以将单个发送者或接收者连接到消息通道。Spring Integration 提供了许多适配器来支持各种传输,例如 JMS、文件、HTTP、Web 服务、邮件等。本参考指南的后续章节将讨论每个适配器。但是,本章重点介绍简单但灵活的方法调用通道适配器支持。有入站和出站适配器,每个都可以使用核心命名空间中提供的 XML 元素进行配置。这些提供了一种扩展 Spring Integration 的简单方法,只要您有一个可以作为源或目标调用的方法。
配置入站通道适配器
inbound-channel-adapter
元素(Java 配置中的 a )可以调用 Spring 管理的对象上的任何方法,并在将方法的输出转换SourcePollingChannelAdapter
为 a 之后将非 null 返回值发送到 a 。当适配器的订阅被激活时,轮询器会尝试从源接收消息。根据提供的配置安排轮询器。要为单个通道适配器配置轮询间隔或 cron 表达式,您可以提供具有调度属性之一的“poller”元素,例如“fixed-rate”或“cron”。以下示例定义了两个实例:MessageChannel
Message
TaskScheduler
inbound-channel-adapter
@Bean
public IntegrationFlow source1() {
return IntegrationFlows.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.fixedRate(5000)))
...
.get();
}
@Bean
public IntegrationFlow source2() {
return IntegrationFlows.from(() -> new GenericMessage<>(...),
e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
...
.get();
}
public class SourceService {
@InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
Object method1() {
...
}
@InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
Object method2() {
...
}
}
@Bean
fun messageSourceFlow() =
integrationFlow( { GenericMessage<>(...) },
{ poller { it.fixedRate(5000) } }) {
...
}
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>
<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
<int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
另请参阅通道适配器表达式和脚本。
如果没有提供轮询器,则必须在上下文中注册一个默认轮询器。有关更多详细信息,请参阅端点命名空间支持。 |
重要:轮询器配置
所有
请注意,没有 但是,在 中 但是,如果您确定您的方法可以返回 null 并且您需要在每次轮询时轮询尽可能多的可用源,则应显式设置
从版本 5.5 开始, 另请参阅全局默认轮询器以获取更多信息。 |
配置出站通道适配器
outbound-channel-adapter
元素(@ServiceActivator
用于 Java 配置的 a)还可以将 a 连接到MessageChannel
任何 POJO 使用者方法,该方法应使用发送到该通道的消息的有效负载来调用。以下示例显示了如何定义出站通道适配器:
@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
return f -> f
.handle(myPojo, "handle");
}
public class MyPojo {
@ServiceActivator(channel = "channel1")
void handle(Object payload) {
...
}
}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
integrationFlow {
handle(myPojo, "handle")
}
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>
<beans:bean id="target" class="org.MyPojo"/>
如果要适配的频道是 a PollableChannel
,则必须提供 poller 子元素( 上的@Poller
子注释@ServiceActivator
),如以下示例所示:
public class MyPojo {
@ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
void handle(Object payload) {
...
}
}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
<int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>
<beans:bean id="target" class="org.MyPojo"/>
ref
如果 POJO 消费者实现可以在其他<outbound-channel-adapter>
定义中重用,则应该使用属性。但是,如果消费者实现仅由 的单个定义引用,则<outbound-channel-adapter>
可以将其定义为内部 bean,如以下示例所示:
<int:outbound-channel-adapter channel="channel" method="handle">
<beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
不允许在同一配置中
同时使用ref 属性和内部处理程序定义,因为它会创建模棱两可的条件。<outbound-channel-adapter> 这样的配置会导致抛出异常。
|
任何通道适配器都可以在没有channel
引用的情况下创建,在这种情况下,它会隐式创建DirectChannel
. 创建的频道名称与or元素的id
属性相匹配。因此,如果未提供,则为必填项。<inbound-channel-adapter>
<outbound-channel-adapter>
channel
id
通道适配器表达式和脚本
与许多其他 Spring Integration 组件一样,它<inbound-channel-adapter>
也<outbound-channel-adapter>
提供对 SpEL 表达式评估的支持。要使用 SpEL,请在 'expression' 属性中提供表达式字符串,而不是提供用于 bean 上的方法调用的 'ref' 和 'method' 属性。计算表达式时,它遵循与方法调用相同的约定,其中:<inbound-channel-adapter>
每当计算结果为非空值时,an 的表达式都会生成一条消息,而 an 的表达式<outbound-channel-adapter>
必须等效于返回 void方法调用。
从 Spring Integration 3.0 开始,<int:inbound-channel-adapter/>
还可以使用 SpEL <expression/>
(或什至使用<script/>
)子元素来配置 an,当需要比使用简单的 'expression' 属性更复杂时。Resource
如果您使用该属性提供脚本作为 a location
,您还可以设置refresh-check-delay
,这允许定期刷新资源。如果您希望在每次轮询时检查脚本,则需要将此设置与轮询器的触发器协调,如以下示例所示:
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
<int:poller max-messages-per-poll="1" fixed-delay="5000"/>
<script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>
另请参阅使用子元素时的cacheSeconds
属性。有关表达式的更多信息,请参阅Spring 表达式语言 (SpEL)。有关脚本,请参阅Groovy 支持和脚本支持。ReloadableResourceBundleExpressionSource
<expression/>
( ) 是一个端点<int:inbound-channel-adapter/> ,SourcePollingChannelAdapter 它通过定期触发轮询一些底层来启动消息流MessageSource 。由于在轮询时没有消息对象,因此表达式和脚本无权访问 root Message ,因此在大多数其他消息传递 SpEL 表达式中没有可用的有效负载或标头属性。该脚本可以生成并返回Message 带有标头和有效负载的完整对象,也可以仅返回一个有效负载,框架将其添加到带有基本标头的消息中。
|
消息桥
消息传递桥是连接两个消息通道或通道适配器的相对简单的端点。例如,您可能希望将 a 连接PollableChannel
到 a SubscribableChannel
,以便订阅端点不必担心任何轮询配置。相反,消息传递桥提供轮询配置。
通过在两个通道之间提供中间轮询器,您可以使用消息传递桥来限制入站消息。轮询器的触发器确定消息到达第二个通道的速率,轮询器的maxMessagesPerPoll
属性强制限制吞吐量。
消息传递桥的另一个有效用途是连接两个不同的系统。在这种情况下,Spring Integration 的角色仅限于在这些系统之间建立连接并在必要时管理轮询器。在两个系统之间至少有一个转换器可能更常见,以便在它们的格式之间进行转换。在这种情况下,通道可以作为变压器端点的“输入通道”和“输出通道”提供。如果不需要数据格式转换,消息传递桥可能确实足够了。
使用 XML 配置网桥
您可以使用该<bridge>
元素来创建两个消息通道或通道适配器之间的消息传递桥。为此,请提供input-channel
和output-channel
属性,如以下示例所示:
<int:bridge input-channel="input" output-channel="output"/>
如上所述,消息传递桥的一个常见用例是将 a 连接PollableChannel
到 a SubscribableChannel
。执行此角色时,消息传递桥也可以用作节流器:
<int:bridge input-channel="pollable" output-channel="subscribable">
<int:poller max-messages-per-poll="10" fixed-rate="5000"/>
</int:bridge>
您可以使用类似的机制来连接通道适配器。以下示例显示了Spring Integration 命名空间中的stdin
和stdout
适配器之间的简单“回声” stream
:
<int-stream:stdin-channel-adapter id="stdin"/>
<int-stream:stdout-channel-adapter id="stdout"/>
<int:bridge id="echo" input-channel="stdin" output-channel="stdout"/>
类似的配置适用于其他(可能更有用的)通道适配器桥,例如文件到 JMS 或邮件到文件。即将到来的章节将介绍各种通道适配器。
如果在网桥上没有定义“输出通道”,则使用入站消息提供的回复通道(如果可用)。如果输出和回复通道均不可用,则会引发异常。 |
使用 Java 配置配置网桥
以下示例显示了如何使用@BridgeFrom
注解在 Java 中配置网桥:
@Bean
public PollableChannel polled() {
return new QueueChannel();
}
@Bean
@BridgeFrom(value = "polled", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public SubscribableChannel direct() {
return new DirectChannel();
}
以下示例显示了如何使用@BridgeTo
注解在 Java 中配置网桥:
@Bean
@BridgeTo(value = "direct", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public PollableChannel polled() {
return new QueueChannel();
}
@Bean
public SubscribableChannel direct() {
return new DirectChannel();
}
或者,您可以使用 a BridgeHandler
,如以下示例所示:
@Bean
@ServiceActivator(inputChannel = "polled",
poller = @Poller(fixedRate = "5000", maxMessagesPerPoll = "10"))
public BridgeHandler bridge() {
BridgeHandler bridge = new BridgeHandler();
bridge.setOutputChannelName("direct");
return bridge;
}