消息传递端点
消息端点
本章的第一部分涵盖了一些背景理论,并揭示了很多关于驱动 Spring Integration 的各种消息传递组件的底层 API。如果您想真正了解幕后发生的事情,这些信息会很有帮助。但是,如果您想启动并运行各种元素的基于命名空间的简化配置,请暂时跳到端点命名空间支持。
如概述中所述,消息端点负责将各种消息传递组件连接到通道。在接下来的几章中,我们将介绍许多使用消息的不同组件。其中一些还能够发送回复消息。发送消息非常简单。如前面消息通道中所示,您可以将消息发送到消息通道。但是,接收有点复杂。主要原因是有两种消费者:轮询消费者和事件驱动消费者。
在这两者中,事件驱动的消费者要简单得多。无需管理和调度单独的 poller 线程,它们本质上是具有回调方法的侦听器。当连接到 Spring Integration 的一个可订阅消息通道时,这个简单的选项效果很好。但是,当连接到缓冲的、可轮询的消息通道时,某些组件必须调度和管理轮询线程。Spring Integration 提供了两种不同的端点实现来适应这两种类型的消费者。因此,消费者自己只需要实现回调接口即可。当需要轮询时,端点充当消费者实例的容器。好处类似于使用容器来托管消息驱动的 bean,但是,因为这些消费者是 Spring 管理的对象,在ApplicationContext
,它更接近于 Spring 自己的MessageListener
容器。
消息处理程序
Spring Integration 的MessageHandler
接口由框架内的许多组件实现。换句话说,这不是公共 API 的一部分,您通常不会MessageHandler
直接实现。然而,消息消费者使用它来实际处理所消费的消息,因此了解此策略接口确实有助于理解消费者的整体角色。接口定义如下:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管它很简单,但该接口为以下章节中介绍的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。这些组件各自对它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。Spring Integration 提供了两个端点实现来托管这些基于回调的处理程序并让它们连接到消息通道。
事件驱动的消费者
因为它是两者中更简单的一个,所以我们首先介绍事件驱动的消费者端点。您可能还记得该SubscribableChannel
接口提供了一个subscribe()
方法,并且该方法接受一个MessageHandler
参数(如 中所示SubscribableChannel
)。以下清单显示了该subscribe
方法的定义:
subscribableChannel.subscribe(messageHandler);
由于订阅频道的处理程序不必主动轮询该频道,因此这是一个事件驱动的消费者,并且 Spring Integration 提供的实现接受 aSubscribableChannel
和 a MessageHandler
,如以下示例所示:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring Integration 也提供了PollingConsumer
,除了通道必须实现 之外,它可以以相同的方式实例化,PollableChannel
如下例所示:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
轮询使用者还有许多其他配置选项。例如,触发器是必需的属性。以下示例显示了如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(30, TimeUnit.SECONDS));
PeriodicTrigger
通常使用简单的时间间隔(以毫秒为单位)定义,但也支持属性initialDelay
和布尔fixedRate
属性(默认为false
- 即没有固定延迟)。以下示例设置了这两个属性:
PeriodicTrigger trigger = new PeriodicTrigger(1000);
trigger.setInitialDelay(5000);
trigger.setFixedRate(true);
前面示例中的三个设置的结果是等待五秒钟然后每秒触发一次的触发器。
CronTrigger
需要有效的 cron 表达式。有关详细信息,请参阅Javadoc。以下示例设置了一个新的CronTrigger
:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一个示例中定义的触发器的结果是每十秒触发一次的触发器,从星期一到星期五。
除了触发器之外,您还可以指定另外两个与轮询相关的配置属性:maxMessagesPerPoll
和receiveTimeout
。以下示例显示了如何设置这两个属性:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
该maxMessagesPerPoll
属性指定在给定轮询操作中接收的最大消息数。这意味着轮询器继续调用receive()
而不等待,直到null
返回或达到最大值。例如,如果轮询器具有 10 秒间隔触发器和maxMessagesPerPoll
设置25
,并且它正在轮询在其队列中有 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。它抓取 25,等待 10 秒,然后抓取下一个 25,依此类推。如果maxMessagesPerPoll
配置为负值,则MessageSource.receive()
在单个轮询周期内调用 then ,直到它返回null
。从 5.5 版开始,0
值具有特殊含义 - 跳过MessageSource.receive()
完全调用,这可被视为暂停此轮询端点,直到maxMessagesPerPoll
稍后更改为非零值,例如通过控制总线。
这receiveTimeout
属性指定轮询器在调用接收操作时如果没有可用消息则应等待的时间量。例如,考虑两个表面上看起来相似但实际上完全不同的选项:第一个具有 5 秒的间隔触发和 50 毫秒的接收超时,而第二个具有 50 毫秒的间隔触发和接收超时5秒。第一个可能会比它到达通道晚 4950 毫秒收到一条消息(如果该消息在它的一个轮询调用返回后立即到达)。另一方面,第二种配置永远不会错过超过 50 毫秒的消息。不同之处在于第二个选项需要一个线程来等待。但是,因此,它可以更快地响应到达的消息。这种技术,
轮询消费者也可以委托给 Spring TaskExecutor
,如以下示例所示:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外, aPollingConsumer
有一个名为 的属性adviceChain
。此属性允许您指定一个List
AOP 建议来处理包括事务在内的其他横切关注点。这些建议适用于该doPoll()
方法。有关更深入的信息,请参阅Endpoint Namespace Support下有关 AOP 建议链和事务支持的部分。
前面的示例显示了依赖项查找。但是,请记住,这些消费者通常配置为 Spring bean 定义。事实上,Spring Integration 还提供了一个FactoryBean
调用ConsumerEndpointFactoryBean
,可以根据通道的类型创建适当的消费者类型。此外,Spring Integration 具有完整的 XML 命名空间支持,可以进一步隐藏这些细节。本指南中介绍了基于名称空间的配置,因为每个组件类型都被介绍了。
许多MessageHandler 实现可以生成回复消息。如前所述,与接收消息相比,发送消息是微不足道的。然而,发送回复消息的时间和数量取决于处理程序类型。例如,聚合器等待大量消息到达,并且通常配置为拆分器的下游消费者,它可以为它处理的每条消息生成多个回复。使用命名空间配置时,您不必严格了解所有细节。但是,仍然值得知道其中几个组件共享一个公共基类 the AbstractReplyProducingMessageHandler ,并且它提供了一个setOutputChannel(..) 方法。
|
端点命名空间支持
在本参考手册中,您可以找到端点元素的特定配置示例,例如路由器、转换器、服务激活器等。其中大多数支持一个input-channel
属性,许多支持一个output-channel
属性。解析后,这些端点元素会产生一个PollingConsumer
或的实例EventDrivenConsumer
,具体取决于input-channel
所引用的类型:PollableChannel
或SubscribableChannel
。当通道可轮询时,轮询行为基于端点元素的poller
子元素及其属性。
以下清单列出了 a 的所有可用配置选项poller
:
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
id="" (6)
max-messages-per-poll="" (7)
receive-timeout="" (8)
ref="" (9)
task-executor="" (10)
time-unit="MILLISECONDS" (11)
trigger=""> (12)
<int:advice-chain /> (13)
<int:transactional /> (14)
</int:poller>
1 | 提供使用 Cron 表达式配置轮询器的能力。底层实现使用org.springframework.scheduling.support.CronTrigger . 如果设置了此属性,则不得指定以下任何属性:fixed-delay 、trigger 、fixed-rate 和ref 。 |
2 | 通过将此属性设置为true ,您可以准确定义一个全局默认轮询器。如果在应用程序上下文中定义了多个默认轮询器,则会引发异常。任何连接到PollableChannel ( PollingConsumer ) 的端点或任何SourcePollingChannelAdapter 没有显式配置轮询器的端点都使用全局默认轮询器。它默认为false . 可选的。 |
3 | 如果在此轮询器的调用中发生故障,则标识将错误消息发送到的通道。要完全抑制异常,您可以提供对nullChannel . 可选的。 |
4 | 固定延迟触发器PeriodicTrigger 在幕后使用 a。如果不使用该time-unit 属性,则指定值以毫秒为单位表示。如果设置了此属性,则不得指定以下任何属性:fixed-rate 、trigger 、cron 和ref 。 |
5 | 固定利率触发器PeriodicTrigger 在幕后使用 a。如果不使用该time-unit 属性,则指定值以毫秒为单位表示。如果设置了此属性,则不得指定以下任何属性:fixed-delay 、trigger 、cron 和ref 。 |
6 | 引用轮询器底层 bean 定义的 ID,其类型为org.springframework.integration.scheduling.PollerMetadata . id 顶级轮询器元素需要该属性,除非它是默认轮询器 ( default="true" )。 |
7 | 有关详细信息,请参阅配置入站通道适配器。如果未指定,则默认值取决于上下文。如果您使用PollingConsumer ,则此属性默认为-1 。但是,如果您使用SourcePollingChannelAdapter ,则该max-messages-per-poll 属性默认为1 。可选的。 |
8 | 在基础类上设置值PollerMetadata 。如果未指定,则默认为 1000(毫秒)。可选的。 |
9 | 对另一个顶级轮询器的 Bean 引用。该ref 属性不得出现在顶级poller 元素上。但是,如果设置了此属性,则不得指定以下任何属性:fixed-rate 、trigger 、cron 和fixed-delay 。 |
10 | 提供引用自定义任务执行器的能力。有关详细信息,请参阅TaskExecutor 支持。可选的。 |
11 | 此属性指定java.util.concurrent.TimeUnit 底层的枚举值org.springframework.scheduling.support.PeriodicTrigger 。因此,该属性只能与fixed-delay orfixed-rate 属性结合使用。如果与其中一个cron 或一个trigger 引用属性结合使用,则会导致失败。a 的最小支持粒度PeriodicTrigger 是毫秒。因此,唯一可用的选项是毫秒和秒。如果未提供此值,则任何fixed-delay 或fixed-rate 值都被解释为毫秒。基本上,这个枚举为基于秒的间隔触发值提供了便利。对于每小时、每天和每月设置,我们建议改用cron 触发器。 |
12 | org.springframework.scheduling.Trigger 对实现接口的任何 Spring 配置的 bean 的引用。但是,如果设置了此属性,则不得指定以下任何属性:fixed-delay 、fixed-rate 、cron 和ref 。可选的。 |
13 | 允许指定额外的 AOP 建议来处理额外的横切关注点。有关详细信息,请参阅事务支持。可选的。 |
14 | 轮询器可以进行事务处理。有关详细信息,请参阅AOP 建议链。可选的。 |
例子
一个简单的基于间隔的轮询器,间隔为 1 秒,可以配置如下:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用fixed-rate
属性的替代方法,您还可以使用fixed-delay
属性。
对于基于 Cron 表达式的轮询器,请改用该cron
属性,如以下示例所示:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是 a PollableChannel
,则需要轮询器配置。具体来说,如前所述,trigger
是PollingConsumer
类的必需属性。因此,如果您省略poller
轮询消费者端点配置的子元素,则可能会引发异常。如果您尝试在连接到不可轮询通道的元素上配置轮询器,也可能会引发异常。
也可以创建顶级轮询器,在这种情况下只ref
需要一个属性,如以下示例所示:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
该ref 属性仅允许在内部轮询器定义中使用。在顶级轮询器上定义此属性会导致在应用程序上下文初始化期间引发配置异常。
|
全局默认轮询器
为了进一步简化配置,您可以定义一个全局默认轮询器。XML DSL 中的单个顶级轮询器组件可能将default
属性设置为true
. 对于 Java 配置,在这种情况下必须声明PollerMetadata
具有名称的 bean 。PollerMetadata.DEFAULT_POLLER
在这种情况下,任何带有 a 作为其输入通道的端点,PollableChannel
在相同的 中定义ApplicationContext
,并且没有明确配置poller
使用该默认值。下面的例子展示了这样一个轮询器和一个使用它的转换器:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlows.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
交易支持
Spring Integration 还为轮询器提供事务支持,以便每个接收和转发操作都可以作为一个原子工作单元执行。要为轮询器配置事务,请添加<transactional/>
子元素。以下示例显示了可用的属性:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关详细信息,请参阅轮询器事务支持。
AOP 建议链
由于 Spring 事务支持依赖于代理机制,通过TransactionInterceptor
(AOP Advice) 处理轮询器发起的消息流的事务行为,您有时必须提供额外的建议来处理与轮询器相关的其他横切行为。为此,poller
定义了一个元素,允许您在实现接口advice-chain
的类中添加更多建议。MethodInterceptor
下面的例子展示了如何advice-chain
为 a 定义一个poller
:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实现MethodInterceptor
接口的更多信息,请参阅Spring Framework Reference Guide 的 AOP 部分。建议链也可以应用于没有任何事务配置的轮询器,让您增强轮询器启动的消息流的行为。
使用通知链时,<transactional/> 不能指定子元素。相反,声明一个<tx:advice/> bean 并将其添加到<advice-chain/> . 有关完整的配置详细信息,请参阅轮询事务支持。
|
任务执行器支持
轮询线程可以由 SpringTaskExecutor
抽象的任何实例执行。这为一个端点或一组端点启用并发性。从 Spring 3.0 开始,核心 Spring Framework 有一个task
命名空间,它的<executor/>
element 支持创建简单的线程池执行器。该元素接受常见并发设置的属性,例如池大小和队列容量。配置线程池执行器可以对端点在负载下的执行方式产生重大影响。这些设置可用于每个端点,因为端点的性能是要考虑的主要因素之一(另一个主要因素是端点订阅的频道上的预期音量)。要为配置了 XML 命名空间支持的轮询端点启用并发,请提供task-executor
对其<poller/>
元素的引用,然后提供以下示例中显示的一个或多个属性:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您不提供任务执行器,则在调用者的线程中调用消费者的处理程序。请注意,调用者通常是默认调用者TaskScheduler
(请参阅配置任务计划程序)。您还应该记住,该属性可以通过指定 bean 名称task-executor
来提供对 Spring 接口的任何实现的引用。TaskExecutor
前面显示的executor
元素是为了方便而提供的。
正如前面在轮询消费者的背景部分中提到的,您还可以配置轮询消费者以模拟事件驱动的行为。通过较长的接收超时和较短的触发器间隔,即使在轮询消息源上,您也可以确保对到达的消息做出非常及时的反应。请注意,这仅适用于具有超时阻塞等待调用的源。例如,文件轮询器不会阻塞。每个receive()
调用都会立即返回,并且要么包含新文件,要么不包含新文件。因此,即使轮询器包含一个长receive-timeout
,在这种情况下永远不会使用该值。另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与。以下示例显示了轮询使用者如何几乎立即接收消息:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用这种方法不会带来太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像(例如)一个颠簸的无限 while 循环那样多的 CPU 资源使用。
在运行时更改轮询率
fixed-delay
当使用 a或属性配置轮询器时fixed-rate
,默认实现使用PeriodicTrigger
实例。它PeriodicTrigger
是核心 Spring 框架的一部分。它仅接受区间作为构造函数参数。因此,它不能在运行时更改。
但是,您可以定义自己的org.springframework.scheduling.Trigger
接口实现。您甚至可以使用PeriodicTrigger
作为起点。然后你可以为间隔(周期)添加一个设置器,或者你甚至可以在触发器本身中嵌入你自己的节流逻辑。period
每次调用都使用该属性nextExecutionTime
来安排下一次轮询。trigger
要在轮询器中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并使用引用自定义触发器 bean 实例的属性将依赖项注入轮询器配置。您现在可以获得对触发器 bean 的引用并更改轮询之间的轮询间隔。
例如,请参阅Spring Integration Samples项目。它包含一个名为 的示例dynamic-poller
,该示例使用自定义触发器并演示了在运行时更改轮询间隔的能力。
该示例提供了一个实现该org.springframework.scheduling.Trigger
接口的自定义触发器。该示例的触发器基于 Spring 的PeriodicTrigger
实现。但是,自定义触发器的字段不是最终的,并且属性具有显式的 getter 和 setter,让您可以在运行时动态更改轮询周期。
不过需要注意的是,由于 Trigger 方法是nextExecutionTime() ,因此根据现有配置,对动态触发器的任何更改直到下一次轮询才会生效。无法强制触发器在其当前配置的下一个执行时间之前触发。
|
有效载荷类型转换
在本参考手册中,您还可以看到各种端点的特定配置和实现示例,这些端点接受消息或任意任意Object
作为输入参数。在 的情况下Object
,此类参数被映射到消息有效负载或有效负载或标头的一部分(使用 Spring 表达式语言时)。但是,端点方法的输入参数类型有时与有效负载或其部分的类型不匹配。在这种情况下,我们需要进行类型转换。Spring Integration 提供了一种方便的方法来ConversionService
在其自己的名为的转换服务 bean 实例中注册类型转换器(通过使用 Spring )integrationConversionService
. 一旦使用 Spring Integration 基础结构定义了第一个转换器,就会自动创建该 bean。要注册转换器,您可以实现org.springframework.core.convert.converter.Converter
、org.springframework.core.convert.converter.GenericConverter
或org.springframework.core.convert.converter.ConverterFactory
。
实现是最简单的Converter
,可以从一种类型转换为另一种类型。对于更复杂的情况,例如转换为类层次结构,您可以实现 aGenericConverter
并且可能实现 a ConditionalConverter
。这些使您可以完全访问from
和to
类型描述符,从而实现复杂的转换。例如,如果您有一个名为的抽象类Something
,它是您转换的目标(参数类型、通道数据类型等),您有两个名为Thing1
and的具体实现Thing
,并且您希望转换为基于输入类型,这GenericConverter
将是一个很好的选择。有关更多信息,请参阅这些接口的 Javadoc:
实现转换器后,可以使用方便的命名空间支持注册它,如下例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,您可以使用内部 bean,如以下示例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring Integration 4.0 开始,您可以使用注解来创建上述配置,如以下示例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用@Configuration
注释,如以下示例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
在配置应用程序上下文时,Spring 框架允许您添加一个 相反, 但是,如果您确实想使用 Spring
在这种情况下,提供的转换器 |
内容类型转换
从 5.0 版本开始,默认情况下,方法调用机制是基于org.springframework.messaging.handler.invocation.InvocableHandlerMethod
基础架构的。它的HandlerMethodArgumentResolver
实现(例如PayloadArgumentResolver
and MessageMethodArgumentResolver
)可以使用MessageConverter
抽象将传入payload
的方法参数类型转换为目标方法。转换可以基于contentType
消息头。为此,Spring Integration 提供了ConfigurableCompositeMessageConverter
,它委托给要调用的已注册转换器列表,直到其中一个返回非空结果。默认情况下,此转换器提供(按严格顺序):
contentType
有关它们的用途和适当的转换值的更多信息,请参阅 Javadoc(链接在前面的列表中) 。使用ConfigurableCompositeMessageConverter
是因为它可以与任何其他MessageConverter
实现一起提供,包括或排除前面提到的默认转换器。它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如以下示例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新转换器在默认值之前在复合中注册。您也可以不使用 a ,而是通过使用名称注册一个 bean 来ConfigurableCompositeMessageConverter
提供您自己的(通过设置属性)。MessageConverter
integrationArgumentResolverMessageConverter
IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
使用 SpEL 方法调用时,MessageConverter 基于 - 的(包括contentType 标头)转换不可用。在这种情况下,只有上述Payload Type Conversion中提到的常规类到类转换可用。
|
异步轮询
如果您希望轮询是异步的,轮询器可以选择指定一个task-executor
指向任何TaskExecutor
bean 的现有实例的属性(Spring 3.0 通过命名空间提供了方便的命名空间配置task
)。但是,在使用TaskExecutor
.
问题是有两种配置,轮询器和TaskExecutor
. 它们必须相互协调。否则,您最终可能会造成人为的内存泄漏。
考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
前面的配置演示了一个失调的配置。
默认情况下,任务执行器有一个无界的任务队列。即使所有线程都被阻塞,轮询器也会继续调度新任务,等待新消息到达或超时到期。假设有 20 个线程执行超时 5 秒的任务,它们以每秒 4 个的速率执行。但是,新任务以每秒 20 个的速度被调度,因此任务执行器中的内部队列以每秒 16 个的速度增长(当进程处于空闲状态时),所以我们有内存泄漏。
处理此问题的方法之一是设置queue-capacity
任务执行器的属性。即使是 0 也是一个合理的值。rejection-policy
您还可以通过设置Task Executor 的属性(例如 to DISCARD
)来指定如何处理无法排队的消息来管理它。换句话说,在配置时必须了解某些细节TaskExecutor
。有关该主题的更多详细信息,请参阅Spring 参考手册中的“任务执行和调度”。
端点内部 Bean
许多端点是复合 bean。这包括所有消费者和所有轮询的入站通道适配器。消费者(轮询或事件驱动)委托给MessageHandler
. 轮询适配器通过委托给MessageSource
. 通常,获取对委托 bean 的引用很有用,也许可以在运行时更改配置或进行测试。这些 bean 可以从ApplicationContext
具有众所周知的名称的 中获得。
MessageHandler
实例在应用程序上下文中注册,其 bean ID 类似于someConsumer.handler
(其中“消费者”是端点id
属性的值)。
MessageSource
实例使用类似于 的 bean ID 注册somePolledAdapter.source
,其中 'somePolledAdapter' 是适配器的 ID。
以上仅适用于框架组件本身。您可以改为使用内部 bean 定义,如以下示例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
bean 被视为任何声明的内部 bean,并且未在应用程序上下文中注册。如果您希望以其他方式访问此 bean,请在顶层使用 an 声明它,id
并改用该ref
属性。有关更多信息,请参阅Spring 文档。
端点角色
从版本 4.2 开始,可以将端点分配给角色。角色允许端点作为一个组启动和停止。这在使用领导选举时特别有用,当领导被授予或撤销时,一组端点可以分别启动或停止。为此,框架SmartLifecycleRoleController
在应用程序上下文中注册一个名为 的 bean IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER
。每当需要控制生命周期时,可以注入此 bean 或@Autowired
:
<bean class="com.some.project.SomeLifecycleControl">
<property name="roleController" ref="integrationLifecycleRoleController"/>
</bean>
您可以使用 XML、Java 配置或以编程方式将端点分配给角色。以下示例显示如何使用 XML 配置端点角色:
<int:inbound-channel-adapter id="ica" channel="someChannel" expression="'foo'" role="cluster"
auto-startup="false">
<int:poller fixed-rate="60000" />
</int:inbound-channel-adapter>
以下示例显示了如何为在 Java 中创建的 bean 配置端点角色:
@Bean
@ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false")
@Role("cluster")
public MessageHandler sendAsyncHandler() {
return // some MessageHandler
}
以下示例显示如何在 Java 中的方法上配置端点角色:
@Payload("#args[0].toLowerCase()")
@Role("cluster")
public String handle(String payload) {
return payload.toUpperCase();
}
以下示例显示如何使用SmartLifecycleRoleController
Java 配置端点角色:
@Autowired
private SmartLifecycleRoleController roleController;
...
this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint);
...
以下示例显示如何使用IntegrationFlow
Java 配置端点角色:
IntegrationFlow flow -> flow
.handle(..., e -> e.role("cluster"));
这些中的每一个都将端点添加到cluster
角色中。
调用roleController.startLifecyclesInRole("cluster")
和相应的stop…
方法启动和停止端点。
任何实现的对象SmartLifecycle 都可以通过编程方式添加——不仅仅是端点。
|
当领导权被授予或撤销时(当某些 bean 分别发布或)时,SmartLifecycleRoleController
实现ApplicationListener<AbstractLeaderEvent>
和它会自动启动和停止其配置的对象。SmartLifecycle
OnGrantedEvent
OnRevokedEvent
当使用领导选举来启动和停止组件时,设置auto-startup XML 属性(autoStartup bean 属性)很重要,false 这样应用程序上下文就不会在上下文初始化期间启动组件。
|
从 4.3.8 版本开始,SmartLifecycleRoleController
提供了几种状态方法:
public Collection<String> getRoles() (1)
public boolean allEndpointsRunning(String role) (2)
public boolean noEndpointsRunning(String role) (3)
public Map<String, Boolean> getEndpointsRunningStatus(String role) (4)
1 | 返回被管理角色的列表。 |
2 | true 如果角色中的所有端点都在运行,则返回。 |
3 | true 如果角色中没有任何端点正在运行,则返回。 |
4 | 返回 的地图component name : running status 。组件名称通常是 bean 名称。 |
领导事件处理
可以根据授予或撤销的领导权分别启动和停止端点组。这在共享资源必须仅由单个实例使用的集群场景中很有用。这方面的一个示例是轮询共享目录的文件入站通道适配器。(请参阅阅读文件)。
To participate in a leader election and be notified when elected leader, when leadership is revoked, or on failure to acquire the resources to become leader, an application creates a component in the application context called a “leader initiator”. 通常,领导发起者是 a SmartLifecycle
,因此它在上下文启动时启动(可选),然后在领导改变时发布通知。您还可以通过将 设置publishFailedEvents
为true
(从版本 5.0 开始)来接收故障通知,以便在发生故障时采取特定操作。按照惯例,您应该提供一个Candidate
接收回调的。您还可以通过Context
框架提供的对象撤销领导。您的代码还可以侦听o.s.i.leader.event.AbstractLeaderEvent
实例(OnGrantedEvent
和OnRevokedEvent
) 并做出相应的响应(例如,使用 a SmartLifecycleRoleController
)。事件包含对Context
对象的引用。以下清单显示了Context
接口的定义:
public interface Context {
boolean isLeader();
void yield();
String getRole();
}
从版本 5.0.6 开始,上下文提供了对候选人角色的引用。
Spring Integration 提供了基于LockRegistry
抽象的领导发起者的基本实现。要使用它,您需要创建一个实例作为 bean,如以下示例所示:
@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
return new LockRegistryLeaderInitiator(locks);
}
如果锁注册表被正确实现,那么最多只有一个领导者。如果锁注册表还提供了InterruptedException
在过期或被破坏时抛出异常(理想情况下)的锁,那么无领导周期的持续时间可以短于锁实现中的固有延迟所允许的时间。默认情况下,该busyWaitMillis
属性会增加一些额外的延迟,以防止在(更常见的)锁不完美且您仅在尝试再次获取锁时才知道它们已过期的情况下(更常见的)CPU 饥饿。
有关使用 Zookeeper 的领导选举和事件的更多信息,请参阅Zookeeper 领导事件处理。
消息传递网关
网关隐藏了 Spring Integration 提供的消息传递 API。它让您的应用程序的业务逻辑不知道 Spring Integration API。通过使用通用网关,您的代码只与一个简单的界面交互。
输入GatewayProxyFactoryBean
如前所述,不依赖 Spring Integration API 会很棒——包括网关类。出于这个原因,Spring Integration 提供了GatewayProxyFactoryBean
,它为任何接口生成代理并在内部调用如下所示的网关方法。通过使用依赖注入,您可以将接口暴露给您的业务方法。
以下示例显示了一个可用于与 Spring Integration 交互的接口:
package org.cafeteria;
public interface Cafe {
void placeOrder(Order order);
}
网关 XML 命名空间支持
还提供命名空间支持。它允许您将接口配置为服务,如以下示例所示:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
定义此配置后,cafeService
现在可以将其注入到其他 bean 中,并且调用该Cafe
接口代理实例上的方法的代码不知道 Spring Integration API。一般方法类似于 Spring Remoting(RMI、HttpInvoker 等)。有关使用该元素的示例(在 Cafe 演示中) ,请参见“示例”附录。gateway
上述配置中的默认值适用于网关接口上的所有方法。如果未指定回复超时,则调用线程将无限期地等待回复。请参阅无响应到达时的网关行为。
可以覆盖单个方法的默认值。请参阅带有注释和 XML 的网关配置。
设置默认回复渠道
通常,您不需要指定default-reply-channel
,因为网关会自动创建一个临时的匿名回复通道,它会在其中侦听回复。但是,某些情况下可能会提示您定义一个default-reply-channel
(或reply-channel
使用适配器网关,例如 HTTP、JMS 等)。
对于一些背景,我们简要讨论网关的一些内部工作原理。网关创建一个临时的点对点回复通道。它是匿名的,并以名称添加到邮件标题中replyChannel
。在提供显式default-reply-channel
(reply-channel
带有远程适配器网关)时,您可以指向发布-订阅通道,该通道之所以如此命名是因为您可以向其添加多个订阅者。在内部,Spring Integration 在临时replyChannel
和明确定义的default-reply-channel
.
假设您希望您的回复不仅发送到网关,还发送到其他消费者。在这种情况下,您需要两件事:
-
您可以订阅的命名频道
-
该频道将成为发布-订阅-频道
网关使用的默认策略不能满足这些需求,因为添加到标头的回复通道是匿名的和点对点的。这意味着没有其他订阅者可以处理它,即使可以,通道也具有点对点行为,因此只有一个订阅者会收到消息。通过定义 adefault-reply-channel
您可以指向您选择的频道。在这种情况下,这是一个publish-subscribe-channel
. 网关创建一个从它到存储在标头中的临时匿名回复通道的桥。
您可能还希望通过拦截器(例如,窃听器)显式提供用于监视或审计的回复通道。要配置通道拦截器,您需要一个命名通道。
从版本 5.4 开始,当网关方法返回类型为 时,如果未显式提供此类标头,则框架会将标头void 填充为 bean 引用。这允许丢弃来自下游流的任何可能的回复,以满足单向网关合同。
replyChannel nullChannel |
带有注释和 XML 的网关配置
考虑以下示例,该Cafe
示例通过添加@Gateway
注释扩展了前面的接口示例:
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
注释允许您添加被解释为消息头的@Header
值,如以下示例所示:
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果您更喜欢 XML 方法来配置网关方法,则可以将method
元素添加到网关配置中,如以下示例所示:
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
您还可以使用 XML 为每个方法调用提供单独的标头。如果您要设置的标头本质上是静态的,并且您不想通过使用@Header
注释将它们嵌入网关的方法签名中,这可能很有用。例如,在贷款经纪人示例中,我们希望根据发起的请求类型(单个报价或所有报价)来影响贷款报价的聚合方式。尽管可能通过评估调用哪个网关方法来确定请求的类型,但会违反关注点分离范式(该方法是 Java 工件)。但是,在消息头中表达您的意图(元信息)在消息传递体系结构中是很自然的。以下示例显示如何为两种方法中的每一种添加不同的消息头:
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在前面的示例中,根据网关的方法为“RESPONSE_TYPE”标头设置了不同的值。
例如,如果您指定requestChannel in<int:method/> 和 in@Gateway 注释,则注释值获胜。
|
如果在 XML 中指定了无参数网关,并且接口方法同时具有 a@Payload 和@Gateway 注释(在元素中带有 apayloadExpression 或 a ),则忽略该值。
payload-expression <int:method/> @Payload |
表达式和“全局”标题
该<header/>
元素支持expression
作为value
. 评估 SpEL 表达式以确定标头的值。从版本 5.2 开始,#root
评估上下文的对象是MethodArgsHolder
withgetMethod()
和getArgs()
访问器。
这两个表达式评估上下文变量自 5.2 版起已弃用:
-
#args:
Object[]
包含方法参数的一个 -
#gatewayMethod:表示被调用
java.reflect.Method
的方法的对象(派生自)。service-interface
包含此变量的标头可以稍后在流程中使用(例如,用于路由)。例如,如果您希望在简单方法名称上进行路由,您可以添加带有以下表达式的标头:#gatewayMethod.name
.
java.reflect.Method 是不可序列化
的。如果您稍后序列化消息,则带有表达式的标头method 会丢失。因此,您可能希望在这些情况下使用method.name 或。方法提供方法的method.toString() 表示,包括参数和返回类型。
toString() String |
从 3.0 版开始,<default-header/>
可以定义元素以向网关生成的所有消息添加标头,而不管调用的方法如何。为方法定义的特定标头优先于默认标头。此处为方法定义的特定标头会覆盖@Header
服务接口中的任何注释。但是,默认标头不会覆盖@Header
服务接口中的任何注释。
网关现在还支持 a default-payload-expression
,它适用于所有方法(除非被覆盖)。
将方法参数映射到消息
使用上一节中的配置技术可以控制方法参数如何映射到消息元素(有效负载和标头)。当没有使用显式配置时,使用某些约定来执行映射。在某些情况下,这些约定无法确定哪个参数是有效负载,哪个应该映射到标头。考虑以下示例:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是 a Map
),第二个参数的内容成为标头。
在第二种情况下(或第一种情况,当参数的参数thing1
是 a时Map
),框架无法确定哪个参数应该是有效负载。因此,映射失败。这通常可以使用payload-expression
、@Payload
注释或@Headers
注释来解决。
或者(并且每当约定失效时),您可以承担将方法调用映射到消息的全部责任。为此,请实现 anMethodArgsMessageMapper
并<gateway/>
使用mapper
属性将其提供给 。映射器映射 a MethodArgsHolder
,这是一个包装java.reflect.Method
实例和Object[]
包含参数的简单类。提供自定义映射器时,网关上不允许使用default-payload-expression
属性和元素。<default-header/>
同样,payload-expression
属性和<header/>
元素也不允许出现在任何<method/>
元素上。
映射方法参数
以下示例展示了如何将方法参数映射到消息,并展示了一些无效配置的示例:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("#args[0] + #args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(#args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
1 | 请注意,在本例中,SpEL 变量 ,#this 指的是参数——在本例中为 的值s 。 |
XML 等效项看起来有些不同,因为方法参数没有#this
上下文。但是,表达式可以通过使用变量来引用方法参数#args
,如以下示例所示:
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="#args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(#args[0])"/>
<int:method name="send3" payload-expression="#method"/>
<int:method name="send4">
<int:header name="thing1" expression="#args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway
注解
从 4.0 版本开始,网关服务接口可以使用@MessagingGateway
注解进行标记,而不需要定义<gateway />
xml 元素进行配置。以下一对示例比较了配置同一网关的两种方法:
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
与 XML 版本类似,当 Spring Integration 在组件扫描期间发现这些注释时,它会proxy 使用其消息传递基础架构创建实现。要执行此扫描并BeanDefinition 在应用程序上下文中注册,请将@IntegrationComponentScan 注释添加到@Configuration 类中。标准@ComponentScan 基础设施不处理接口。因此,我们引入了自定义@IntegrationComponentScan 逻辑来细化@MessagingGateway 接口上的注释并GatewayProxyFactoryBean 为它们注册实例。另请参阅注释支持。
|
与@MessagingGateway
注释一起,您可以使用注释标记服务接口@Profile
以避免创建 bean,如果这样的配置文件不活动。
如果您没有 XML 配置,则@EnableIntegration 至少一个@Configuration 类需要注释。有关详细信息,请参阅配置@EnableIntegration 。
|
调用无参数方法
在网关接口上调用没有任何参数的方法时,默认行为是Message
从 a 接收 a PollableChannel
。
但是,有时您可能希望触发无参数方法,以便您可以与不需要用户提供参数的下游其他组件交互,例如触发无参数 SQL 调用或存储过程。
要实现发送和接收语义,您必须提供有效负载。要生成有效负载,不需要接口上的方法参数。您可以在元素上使用 XML 中的@Payload
注释或payload-expression
属性。method
以下列表包括一些有效负载的示例:
-
文字字符串
-
#gatewayMethod.name
-
新的 java.util.Date()
-
@someBean.someMethod() 的返回值
以下示例显示了如何使用@Payload
注解:
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
您也可以使用@Gateway
注释。
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果两个注释都存在(并且payloadExpression 提供了),则@Gateway 获胜。
|
另请参阅带有注释和 XML 的网关配置。
如果方法没有参数也没有返回值,但包含有效负载表达式,则将其视为仅发送操作。
调用default
方法
网关代理的接口也可能有default
方法,从 5.3 版本开始,框架将 a注入到代理中,以使用方法而不是DefaultMethodInvokingMethodInterceptor
代理来调用方法。来自 JDK 的接口,例如,仍可用于网关代理,但由于内部 Java 安全原因,无法调用它们的方法,以针对 JDK 类进行实例化。这些方法也可以使用方法或注释或XML 组件上的显式注释来代理(丢失它们的实现逻辑,同时恢复以前的网关代理行为) 。default
java.lang.invoke.MethodHandle
java.util.function.Function
default
MethodHandles.Lookup
@Gateway
proxyDefaultMethods
@MessagingGateway
<gateway>
错误处理
网关调用可能会导致错误。默认情况下,下游发生的任何错误都会在网关的方法调用时“按原样”重新抛出。例如,考虑以下简单流程:
gateway -> service-activator
如果服务激活器调用的服务抛出 a MyException
(例如),框架会将其包装在 a 中MessagingException
,并将传递给服务激活器的消息附加到failedMessage
属性中。因此,框架执行的任何日志记录都具有完整的故障上下文。默认情况下,当网关捕获到异常时,会将其MyException
解包并抛出给调用者。您可以在网关方法声明上配置一个throws
子句,以匹配原因链中的特定异常类型。例如,如果你想捕获一个MessagingException
包含下游错误原因的所有消息传递信息的整体,你应该有一个类似于以下的网关方法:
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励 POJO 编程,您可能不希望将调用者暴露给消息传递基础设施。
如果您的网关方法没有throws
子句,则网关会遍历原因树,寻找RuntimeException
不是 a 的 a MessagingException
。如果没有找到,框架会抛出MessagingException
. 如果MyException
前面讨论中的原因是SomeOtherException
和你的方法throws SomeOtherException
,网关会进一步解包并将其扔给调用者。
当网关声明为 noservice-interface
时,将使用内部框架接口RequestReplyExchanger
。
考虑以下示例:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
在 5.0 版本之前,此exchange
方法没有throws
子句,因此异常被解包。如果您使用此接口并希望恢复之前的展开行为,请改用自定义service-interface
或访问您自己cause
的MessagingException
。
但是,您可能希望记录错误而不是传播它,或者您可能希望将异常视为有效回复(通过将其映射到符合调用者理解的某些“错误消息”协定的消息)。为此,网关通过包含对error-channel
属性的支持来提供对专用于错误的消息通道的支持。在以下示例中,“transformer”创建Message
来自 的回复Exception
:
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
这exceptionTransformer
可能是一个简单的 POJO,它知道如何创建预期的错误响应对象。这成为发送回调用者的有效负载。如有必要,您可以在这样的“错误流”中做更多复杂的事情。它可能涉及路由器(包括 Spring Integration 的ErrorMessageExceptionTypeRouter
)、过滤器等。然而,大多数时候,一个简单的“变压器”就足够了。
或者,您可能只想记录异常(或将其异步发送到某处)。如果您提供单向流,则不会将任何内容发送回调用者。如果要完全抑制异常,可以提供对全局的引用nullChannel
(本质上是一种/dev/null
方法)。最后,如上所述,如果没有error-channel
定义,那么异常会照常传播。
当您使用@MessagingGateway
注解(参见 参考资料
)时,您可以使用使用@MessagingGateway
AnnotationerrorChannel
属性。
从版本 5.0 开始,当您使用具有void
返回类型(单向流)的网关方法时,error-channel
引用(如果提供)将填充到errorChannel
每个已发送消息的标准标头中。此功能允许基于标准ExecutorChannel
配置(或 a QueueChannel
)的下游异步流覆盖默认的全局errorChannel
异常发送行为。以前,您必须手动指定带有注释或元素的errorChannel
标题。对于具有异步流的方法,该属性被忽略。相反,错误消息被发送到默认的.@GatewayHeader
<header>
error-channel
void
errorChannel
通过简单的 POJI 网关公开消息系统会带来好处,但“隐藏”底层消息系统的现实确实是有代价的,因此您应该考虑一些事情。我们希望我们的 Java 方法尽快返回,而不是在调用者等待它返回时无限期挂起(无论是 void、返回值还是抛出的异常)。当常规方法用作消息系统前面的代理时,我们必须考虑底层消息传递的潜在异步性质。这意味着由网关发起的消息可能会被过滤器丢弃,并且永远不会到达负责产生回复的组件。某些服务激活方法可能会导致异常,因此不提供回复(因为我们不生成空消息)。换句话说,多种情况可能导致回复消息永远不会出现。这在消息传递系统中是非常自然的。但是,考虑一下网关方法的含义。网关的方法输入参数被合并到消息中并发送到下游。回复消息将被转换为网关方法的返回值。因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。否则,您的网关方法可能永远不会返回并无限期挂起。处理这种情况的一种方法是使用异步网关(本节稍后解释)。另一种处理方法是显式设置 这在消息传递系统中是非常自然的。但是,考虑一下网关方法的含义。网关的方法输入参数被合并到消息中并发送到下游。回复消息将被转换为网关方法的返回值。因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。否则,您的网关方法可能永远不会返回并无限期挂起。处理这种情况的一种方法是使用异步网关(本节稍后解释)。另一种处理方法是显式设置 这在消息传递系统中是非常自然的。但是,考虑一下网关方法的含义。网关的方法输入参数被合并到消息中并发送到下游。回复消息将被转换为网关方法的返回值。因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。否则,您的网关方法可能永远不会返回并无限期挂起。处理这种情况的一种方法是使用异步网关(本节稍后解释)。另一种处理方法是显式设置 回复消息将被转换为网关方法的返回值。因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。否则,您的网关方法可能永远不会返回并无限期挂起。处理这种情况的一种方法是使用异步网关(本节稍后解释)。另一种处理方法是显式设置 回复消息将被转换为网关方法的返回值。因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。否则,您的网关方法可能永远不会返回并无限期挂起。处理这种情况的一种方法是使用异步网关(本节稍后解释)。另一种处理方法是显式设置reply-timeout 属性。这样,网关的挂起时间不会超过 指定的时间,reply-timeout 并且如果超时确实过去,则返回“null”。最后,您可能需要考虑在服务激活器上设置下游标志,例如“requires-reply”或在过滤器上设置“throw-exceptions-on-rejection”。这些选项将在本章的最后一节中更详细地讨论。
|
如果下游流返回 a ErrorMessage ,则其payload (a Throwable ) 被视为常规下游错误。如果有error-channel 配置,则将其发送到错误流。否则,有效负载将被扔给网关的调用者。同样,如果错误流error-channel 返回一个ErrorMessage ,它的有效负载被抛出给调用者。这同样适用于任何带有Throwable 有效负载的消息。当您需要将 anException 直接传播给调用者时,这在异步情况下很有用。为此,您可以返回一个Exception (作为reply 来自某些服务的)或抛出它。通常,即使使用异步流,框架也会负责将下游流抛出的异常传播回网关。这TCP Client-Server Multiplex示例演示了将异常返回给调用者的两种技术。它通过使用aggregator with group-timeout (参见Aggregator and Group Timeout)和MessagingTimeoutException 对丢弃流的回复来模拟等待线程的套接字 IO 错误。
|
网关超时
网关有两个超时属性:requestTimeout
和replyTimeout
. 请求超时仅适用于通道可以阻塞(例如,QueueChannel
已满的有界)。该replyTimeout
值是网关等待回复或返回的时间null
。它默认为无穷大。
可以将超时设置为网关(defaultRequestTimeout
和defaultReplyTimeout
)或MessagingGateway
接口注释上的所有方法的默认值。个别方法可以覆盖这些默认值(在<method/>
子元素中)或@Gateway
注释上。
从 5.0 版开始,可以将超时定义为表达式,如以下示例所示:
@Gateway(payloadExpression = "#args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "#args[1]", replyTimeoutExpression = "#args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文有一个BeanResolver
(用于@someBean
引用其他bean),并且#args
数组变量可用。
使用 XML 配置时,超时属性可以是长值或 SpEL 表达式,如以下示例所示:
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="#args[0]"
request-timeout="1000"
reply-timeout="#args[1]">
</method>
异步网关
作为一种模式,消息传递网关提供了一种很好的方式来隐藏特定于消息传递的代码,同时仍然公开消息传递系统的全部功能。如前所述,_GatewayProxyFactoryBean
提供了一种通过服务接口公开代理的便捷方法,使您可以基于 POJO 访问消息传递系统(基于您自己域中的对象、原语/字符串或其他对象)。但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。由于消息传递系统自然是异步的,因此您可能无法始终保证“对于每个请求,总会有回复”的约定。Spring Integration 2.0 引入了对异步网关的支持,当您可能不知道是否需要回复或回复需要多长时间时,它提供了一种启动流程的便捷方式。
为了处理这些类型的场景,Spring Integration 使用java.util.concurrent.Future
实例来支持异步网关。
从 XML 配置来看,没有任何变化,您仍然可以像定义常规网关一样定义异步网关,如以下示例所示:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
但是,网关接口(一个服务接口)有点不同,如下:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
如前面的示例所示,网关方法的返回类型是 a Future
。当GatewayProxyFactoryBean
看到网关方法的返回类型是 aFuture
时,立即使用 a 切换到异步模式AsyncTaskExecutor
。这就是差异的程度。对这种方法的调用总是立即返回一个Future
实例。Future
然后您可以按照自己的节奏与 进行交互以获得结果、取消等。此外,与Future
实例的任何其他使用一样,调用get()
可能会显示超时、执行异常等。以下示例显示如何使用Future
从异步网关返回的 a:
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
有关更详细的示例,请参阅 Spring Integration 示例中的async-gateway示例。
ListenableFuture
从 4.1 版本开始,异步网关方法也可以返回ListenableFuture
(在 Spring Framework 4.0 中引入)。这些返回类型允许您提供一个回调,当结果可用(或发生异常)时调用该回调。当网关检测到这个返回类型并且任务执行器是一个AsyncListenableTaskExecutor
时,执行器的submitListenable()
方法被调用。以下示例显示了如何使用 a ListenableFuture
:
ListenableFuture<String> result = this.asyncGateway.async("something");
result.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
...
}
@Override
public void onFailure(Throwable t) {
...
}
});
AsyncTaskExecutor
默认情况下,在为返回类型为 a 的任何网关方法提交内部实例时GatewayProxyFactoryBean
使用。但是,元素配置中的属性允许您提供对 Spring 应用程序上下文中可用的任何实现的引用。org.springframework.core.task.SimpleAsyncTaskExecutor
AsyncInvocationTask
Future
async-executor
<gateway/>
java.util.concurrent.Executor
(默认)SimpleAsyncTaskExecutor
同时支持Future
和ListenableFuture
返回类型,分别返回FutureTask
或ListenableFutureTask
。见CompletableFuture
。即使有一个默认的执行器,提供一个外部的执行器通常很有用,这样您就可以在日志中识别它的线程(使用 XML 时,线程名称基于执行器的 bean 名称),如以下示例所示:
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果您希望返回不同的Future
实现,您可以提供自定义执行器或完全禁用执行器并Future
从下游流返回回复消息有效负载。要禁用执行器,请将其设置为null
(GatewayProxyFactoryBean
通过使用setAsyncTaskExecutor(null)
)。使用 XML 配置网关时,请使用async-executor=""
. 使用@MessagingGateway
注解进行配置时,使用类似如下的代码:
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果返回类型是特定的具体Future 实现或配置的执行程序不支持的其他子接口,则流程在调用者的线程上运行,并且流程必须在回复消息有效负载中返回所需的类型。
|
CompletableFuture
从 4.2 版开始,网关方法现在可以返回CompletableFuture<?>
. 返回此类型时有两种操作模式:
-
当提供了异步执行器并且返回类型准确
CompletableFuture
(不是子类)时,框架在执行器上运行任务并立即将 a 返回CompletableFuture
给调用者。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
用于创造未来。 -
当异步执行器显式设置为
null
且返回类型为CompletableFuture
或返回类型为 的子类时CompletableFuture
,将在调用者的线程上调用流程。在这种情况下,下游流应返回CompletableFuture
适当类型的 a。
使用场景
在以下场景中,调用者线程立即返回 a CompletableFuture<Invoice>
,当下游流回复网关(带有Invoice
对象)时完成。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在以下场景中,CompletableFuture<Invoice>
当下游流将其作为回复的有效负载提供给网关时,调用者线程返回 a。当发票准备好时,其他一些过程必须完成未来。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在以下场景中,CompletableFuture<Invoice>
当下游流将其作为回复的有效负载提供给网关时,调用者线程返回 a。当发票准备好时,其他一些过程必须完成未来。如果DEBUG
启用了日志记录,则会发出一个日志条目,指示异步执行器不能用于此场景。
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
CompletableFuture
实例可用于对回复执行额外的操作,如以下示例所示:
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
反应堆Mono
从 5.0 版开始,GatewayProxyFactoryBean
允许使用带有网关接口方法的Project Reactor,使用Mono<T>
返回类型。内部AsyncInvocationTask
包裹在一个Mono.fromCallable()
.
AMono
可用于稍后检索结果(类似于 a ),或者您可以通过在结果返回到网关时Future<?>
调用 your 来使用调度程序使用它。Consumer
Mono 框架不会立即刷新
。因此,在网关方法返回之前,底层消息流不会启动(与Future<?> Executor 任务一样)。当Mono 订阅 时,流程开始。或者,Mono 当subscribe() 与整个Flux . 以下示例显示如何使用 Project Reactor 创建网关:
|
@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Mono<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(integers -> ...);
另一个使用 Project Reactor 的示例是一个简单的回调场景,如以下示例所示:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
调用线程继续,handleInvoice()
当流程完成时被调用。
返回异步类型的下游流
如上ListenableFuture
节所述,如果您希望某些下游组件返回带有异步负载(Future
、、Mono
等)的消息,则必须将异步执行器显式设置为null
(或""
在使用 XML 配置时)。然后在调用者线程上调用流程,稍后可以检索结果。
void
返回类型
与前面提到的返回类型不同,当方法返回类型为void
时,框架无法隐式确定您希望下游流异步运行,调用者线程立即返回。在这种情况下,您必须使用 注释接口方法@Async
,如以下示例所示:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
与Future<?>
返回类型不同,如果流程抛出了某些异常,则无法通知调用者,除非某些自定义TaskExecutor
(例如ErrorHandlingTaskExecutor
)与@Async
注释相关联。
无响应时的网关行为
如前所述,网关提供了一种通过 POJO 方法调用与消息传递系统交互的便捷方式。然而,一个典型的方法调用,通常期望总是返回(即使有异常),可能并不总是一对一地映射到消息交换(例如,回复消息可能不会到达 - 相当于方法没有返回)。
本节的其余部分涵盖各种场景以及如何使网关的行为更具可预测性。可以配置某些属性以使同步网关行为更可预测,但其中一些属性可能并不总是像您预期的那样工作。其中之一是reply-timeout
(在方法级别或default-reply-timeout
网关级别)。我们检查该reply-timeout
属性以了解它如何影响和不能影响同步网关在各种场景中的行为。我们检查单线程场景(所有下游组件都通过直接通道连接)和多线程场景(例如,在下游某处,您可能有一个可轮询或执行器通道,它打破了单线程边界)。
下游长期运行的流程
- 同步网关,单线程
-
如果下游的某个组件还在运行(可能是因为死循环或者服务慢),设置a
reply-timeout
没有任何作用,网关方法调用直到下游服务退出(通过返回或抛出异常)才返回。 - 同步网关,多线程
-
如果下游组件仍在多线程消息流中运行(可能是由于无限循环或服务缓慢),
reply-timeout
则GatewayProxyFactoryBean
设置回复通道,等待消息直到超时到期。但是,如果在生成实际回复之前已达到超时,则可能导致网关方法返回“null”。您应该了解,回复消息(如果已生成)在网关方法调用可能返回后发送到回复通道,因此您必须意识到这一点并在设计流程时牢记这一点。
下游组件返回“null”
- 同步网关——单线程
-
如果下游组件返回“null”并且没有
reply-timeout
配置,则网关方法调用将无限期挂起,除非reply-timeout
已配置 a 或requires-reply
已在下游组件(例如,服务激活器)上设置了可能返回“null”的属性. 在这种情况下,将抛出异常并将其传播到网关。 - 同步网关——多线程
-
行为与前一种情况相同。
下游组件返回签名为“无效”,而网关方法签名为非无效
- 同步网关——单线程
-
如果下游组件返回 'void' 并且
reply-timeout
已配置 no,则网关方法调用将无限期挂起,除非reply-timeout
已配置 a。 - 同步网关——多线程
-
行为与前一种情况相同。
下游组件导致运行时异常
- 同步网关——单线程
-
如果下游组件抛出运行时异常,异常会通过错误消息传播回网关并重新抛出。
- 同步网关——多线程
-
行为与前一种情况相同。
您应该明白,默认情况下reply-timeout 是无界的。因此,如果您没有显式设置reply-timeout ,您的网关方法调用可能会无限期挂起。因此,为了确保您分析您的流程,并且如果这些场景之一发生的可能性很小,您应该将该reply-timeout 属性设置为“安全”值。更好的是,您可以将requires-reply 下游组件的属性设置为 'true' 以确保及时响应,正如下游组件在内部返回 null 时立即抛出异常所产生的那样。但是,您还应该意识到,在某些情况下(请参见第一个)reply-timeout 没有帮助。这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也很重要。如前所述,后一种情况是定义返回Future 实例的网关方法的问题。然后,您可以保证接收到该返回值,并且您可以更精细地控制调用结果。此外,在处理路由器时,您应该记住将resolution-required 属性设置为“true”会导致路由器在无法解析特定通道时抛出异常。同样,在处理过滤器时,您可以设置throw-exception-on-rejection 属性。在这两种情况下,结果流的行为就像它包含一个具有“requires-reply”属性的服务激活器。换句话说,它有助于确保网关方法调用的及时响应。
|
reply-timeout 对于<gateway/> 元素是无界的(由 创建GatewayProxyFactoryBean )。用于外部集成(WS、HTTP 等)的入站网关与这些网关共享许多特征和属性。但是,对于那些入站网关,默认reply-timeout 值为 1000 毫秒(一秒)。如果下游异步切换到另一个线程,您可能需要增加此属性以在网关超时之前留出足够的时间让流程完成。
|
您应该了解,计时器在线程返回网关时启动——即,当流程完成或消息被移交给另一个线程时。此时,调用线程开始等待回复。如果流程完全同步,则回复立即可用。对于异步流,线程最多等待这个时间。 |
请参阅Java DSL 章节中的网关,了解IntegrationFlow
通过IntegrationFlows
.
服务激活器
服务激活器是将任何 Spring 管理的对象连接到输入通道的端点类型,以便它可以扮演服务的角色。如果服务产生输出,它也可以连接到输出通道。或者,输出生成服务可能位于处理管道或消息流的末端,在这种情况下,replyChannel
可以使用入站消息的标头。如果未定义输出通道,这是默认行为。与此处描述的大多数配置选项一样,相同的行为实际上适用于大多数其他组件。
配置服务激活器
要创建服务激活器,请使用具有 'input-channel' 和 'ref' 属性的 'service-activator' 元素,如以下示例所示:
<int:service-activator input-channel="exampleChannel" ref="exampleHandler"/>
上述配置选择了exampleHandler
满足其中一种消息传递要求的所有方法,如下所示:
-
注释
@ServiceActivator
-
是
public
-
void
如果不返回requiresReply == true
在运行时调用的目标方法是通过它们的类型为每个请求消息选择的,或者如果目标类上存在这样的方法,则payload
作为该类型的回退。Message<?>
从 5.0 版开始,一种服务方法可以用 标记@org.springframework.integration.annotation.Default
为所有不匹配情况的后备。这在使用内容类型转换和转换后调用的目标方法时很有用。
要委托给任何对象的明确定义的方法,您可以添加该method
属性,如以下示例所示:
<int:service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>
在任何一种情况下,当服务方法返回非空值时,端点都会尝试将回复消息发送到适当的回复通道。为了确定回复通道,它首先检查output-channel
端点配置中是否提供了 an,如以下示例所示:
<int:service-activator input-channel="exampleChannel" output-channel="replyChannel"
ref="somePojo" method="someMethod"/>
如果该方法返回结果并且没有output-channel
定义,则框架然后检查请求消息的replyChannel
标头值。如果该值可用,则检查其类型。如果是MessageChannel
,则将回复消息发送到该通道。如果是 a String
,则端点尝试将通道名称解析为通道实例。如果通道无法解析,DestinationResolutionException
则抛出 a。它可以解决,消息发送到那里。如果请求消息没有replyChannel
标头且reply
对象是 a Message
,则查询其replyChannel
标头以获取目标目的地。这是 Spring Integration 中用于请求-回复消息传递的技术,也是返回地址模式的一个示例。
如果您的方法返回结果并且您想丢弃它并结束流程,您应该将 配置output-channel
为发送到NullChannel
. 为方便起见,框架注册了一个名称为nullChannel
. 有关详细信息,请参阅特殊频道。
服务激活器是不需要生成回复消息的组件之一。如果您的方法返回null
或具有void
返回类型,则服务激活器在方法调用后退出,没有任何信号。此行为可以由选项控制,该选项在使用 XML 命名空间进行配置时AbstractReplyProducingMessageHandler.requiresReply
也会公开。requires-reply
如果标志设置为true
并且方法返回 null,ReplyRequiredException
则抛出 a。
服务方法中的参数可以是消息或任意类型。如果是后者,则假定它是消息负载,从消息中提取并注入到服务方法中。我们通常推荐这种方法,因为它在使用 Spring Integration 时遵循并促进了 POJO 模型。参数也可能有@Header
或@Headers
注释,如注释支持中所述。
服务方法不需要任何参数,这意味着您可以实现事件样式的服务激活器(您只关心服务方法的调用)而不必担心消息的内容。将其视为空 JMS 消息。这种实现的一个示例用例是一个简单的计数器或输入通道上存放的消息的监视器。 |
从版本 4.1 开始,框架将消息属性 (payload
和headers
) 正确转换为 Java 8 Optional
POJO 方法参数,如以下示例所示:
public class MyBean {
public String computeValue(Optional<String> payload,
@Header(value="foo", required=false) String foo1,
@Header(value="foo") Optional<String> foo2) {
if (payload.isPresent()) {
String value = payload.get();
...
}
else {
...
}
}
}
ref
如果自定义服务激活器处理程序实现可以在其他<service-activator>
定义中重用,我们通常建议使用属性。但是,如果自定义服务激活器处理程序实现仅在 的单个定义中使用<service-activator>
,您可以提供内部 bean 定义,如以下示例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="someMethod">
<beans:bean class="org.something.ExampleServiceActivator"/>
</int:service-activator>
不允许在同一配置中
同时使用ref 属性和内部处理程序定义,因为它会创建模棱两可的条件并导致引发异常。<service-activator> |
如果该ref 属性引用了一个扩展的bean AbstractMessageProducingHandler (例如框架本身提供的处理程序),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个都ref 必须是一个单独的 bean 实例(或一个prototype -scoped bean)或使用内部<bean/> 配置类型。如果您无意中从多个 bean 中引用了相同的消息处理程序,您会得到一个配置异常。
|
服务激活器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,服务激活器也可以从SpEL中受益。
例如,您可以调用任何 bean 方法,而无需在ref
属性中指向 bean 或将其作为内部 bean 定义包括在内,如下所示:
<int:service-activator input-channel="in" output-channel="out"
expression="@accountService.processAccount(payload, headers.accountId)"/>
<bean id="accountService" class="thing1.thing2.Account"/>
在前面的配置中,我们不是通过使用ref
or 作为内部 bean 来注入“accountService”,而是使用 SpEL 的@beanId
表示法并调用一个采用与消息有效负载兼容的类型的方法。我们还传递了一个标头值。任何有效的 SpEL 表达式都可以针对消息中的任何内容进行评估。对于简单的场景,如果所有逻辑都可以封装在这样的表达式中,那么您的服务激活器不需要引用 bean,如以下示例所示:
<int:service-activator input-channel="in" output-channel="out" expression="payload * 2"/>
在前面的配置中,我们的服务逻辑是将有效载荷值乘以 2。SpEL 让我们可以相对轻松地处理它。
有关配置服务激活器的更多信息,请参阅Java DSL 章节中的服务激活器和.handle()
方法。
异步服务激活器
服务激活器由调用线程调用。SubscribableChannel
如果输入通道是 a或 a 的轮询线程,则这是上游线程PollableChannel
。如果服务返回 a ListenableFuture<?>
,则默认操作是将其作为发送到输出(或回复)通道的消息的有效负载发送。从版本 4.3 开始,您现在可以将async
属性设置为true
(setAsync(true)
在使用 Java 配置时使用)。如果该属性设置为ListenableFuture<?>
时服务返回 a ,则调用线程将立即释放,并在完成未来的线程(从您的服务内部)上发送回复消息。这对于使用async
true
PollableChannel
,因为轮询线程被释放以执行框架内的其他服务。
如果服务以 完成未来,则Exception
发生正常的错误处理。AnErrorMessage
被发送到errorChannel
消息头(如果存在)。否则,将ErrorMessage
发送到默认值errorChannel
(如果可用)。
服务激活器和方法返回类型
服务方法可以返回任何成为回复消息有效负载的类型。在这种情况下,将创建一个新Message<?>
对象并复制请求消息中的所有标头。MessageHandler
当交互基于 POJO 方法调用时,这对于大多数 Spring Integration 实现的工作方式相同。
Message<?>
该方法也可以返回一个完整的对象。但是请记住,与转换器不同的是,对于服务激活器,如果返回的消息中尚不存在此消息,则将通过从请求消息中复制标头来修改此消息。因此,如果您的方法参数是 aMessage<?>
并且您在服务方法中复制了一些(但不是全部)现有标头,它们将重新出现在回复消息中。从回复消息中删除标头不是 Service Activator 的职责,并且遵循松散耦合原则,最好HeaderFilter
在集成流程中添加一个。或者,可以使用 Transformer 代替 Service Activator,但在这种情况下,当返回一个完整的Message<?>
该方法完全负责消息,包括复制请求消息头(如果需要)。您必须确保必须保留重要的框架头文件(例如replyChannel
,errorChannel
),如果存在的话。
延迟器
延迟器是一个简单的端点,它可以让消息流延迟一定的时间间隔。当消息延迟时,原始发件人不会阻止。相反,延迟的消息被安排org.springframework.scheduling.TaskScheduler
在延迟过去后发送到输出通道的实例。这种方法即使对于相当长的延迟也是可扩展的,因为它不会导致大量阻塞的发送者线程。相反,在典型情况下,线程池用于实际执行释放消息。本节包含配置延迟器的几个示例。
配置延迟器
该<delayer>
元素用于延迟两个消息通道之间的消息流。与其他端点一样,您可以提供“输入通道”和“输出通道”属性,但延迟器还具有确定数量的“默认延迟”和“表达式”属性(以及“表达式”元素)每条消息应延迟的毫秒数。以下示例将所有消息延迟三秒:
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果您需要确定每条消息的延迟,您还可以使用 'expression' 属性提供 SpEL 表达式,如以下表达式所示:
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from("input")
.delay("delayer.messageGroupId", d -> d
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay("delayer.messageGroupId") {
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的示例中,三秒延迟仅适用于给定入站消息的表达式计算结果为 null 时。如果您只想对具有有效表达式评估结果的消息应用延迟,您可以使用0
(默认值)的“默认延迟”。对于延迟为0
(或更少)的任何消息,消息会立即在调用线程上发送。
XML 解析器使用消息组 ID <beanName>.messageGroupId 。
|
延迟处理程序支持表示以毫秒为单位的间隔的表达式评估结果(任何Object 方法toString() 产生可以解析为 a 的值Long )以及java.util.Date 表示绝对时间的实例。在第一种情况下,毫秒数是从当前时间开始计算的(例如,5000 从延迟器收到消息的时间起,值将延迟消息至少五秒)。对于Date 实例,消息直到该Date 对象表示的时间才会发布。等于非正延迟或过去日期的值不会导致延迟。相反,它直接发送到原始发送者线程上的输出通道。如果表达式评估结果不是Date 并且不能被解析为Long ,应用默认延迟(如果有的话——默认为0 )。
|
表达式评估可能会由于各种原因引发评估异常,包括无效的表达式或其他条件。默认情况下,此类异常被忽略(尽管在 DEBUG 级别记录)并且延迟器回退到默认延迟(如果有)。ignore-expression-failures 您可以通过设置属性来修改此行为。默认情况下,此属性设置为true ,延迟器行为如前所述。但是,如果您不希望忽略表达式求值异常并将它们抛出给延迟器的调用者,请将ignore-expression-failures 属性设置为false .
|
在前面的示例中,延迟表达式被指定为
因此,如果有可能忽略标头并且您想回退到默认延迟,则使用索引器语法而不是点属性访问器语法通常更有效(并且推荐),因为检测空值更快而不是捕捉异常。 |
延迟器委托给 SpringTaskScheduler
抽象的一个实例。延迟器使用的默认调度器是ThreadPoolTaskScheduler
Spring Integration 在启动时提供的实例。请参阅配置任务计划程序。如果你想委托给不同的调度器,你可以通过延迟器元素的“调度器”属性提供一个引用,如以下示例所示:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果配置 external ThreadPoolTaskScheduler ,则可以waitForTasksToCompleteOnShutdown = true 在此属性上进行设置。它允许在应用程序关闭时成功完成已经处于执行状态(释放消息)的“延迟”任务。在 Spring Integration 2.2 之前,该属性在<delayer> 元素上可用,因为DelayHandler 可以在后台创建自己的调度程序。从 2.2 开始,延迟器需要一个外部调度器实例waitForTasksToCompleteOnShutdown 并被删除。您应该使用调度程序自己的配置。
|
ThreadPoolTaskScheduler 有一个属性errorHandler ,可以注入org.springframework.util.ErrorHandler . 此处理程序允许处理Exception 来自发送延迟消息的计划任务的线程。默认情况下,它使用org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler ,您可以在日志中看到堆栈跟踪。您可能需要考虑使用一个,它将一个从失败消息的标头或默认org.springframework.integration.channel.MessagePublishingErrorHandler 发送ErrorMessage 到一个。此错误处理在事务回滚(如果存在)之后执行。请参阅发布失败。
error-channel error-channel |
延迟器和消息存储
将DelayHandler
延迟消息持久化到提供的消息组中MessageStore
。(“groupId”基于<delayer>
元素所需的“id”属性。)在将消息发送到MessageStore
之前,计划任务立即从 中删除延迟消息。如果提供的是持久的(例如),它提供了在应用程序关闭时不丢失消息的能力。应用程序启动后,从其消息组中读取消息,并根据消息的原始到达时间(如果延迟是数字的)延迟重新调度它们。对于延迟标头为 的消息,在重新安排时使用。如果延迟消息留在DelayHandler
output-channel
MessageStore
JdbcMessageStore
DelayHandler
MessageStore
Date
Date
MessageStore
超过它的“延迟”,它在启动后立即发送。
<delayer>
可以用两个相互排斥的元素中的任何一个来丰富:<transactional>
和<advice-chain>
。这些List
AOP 建议中的 应用到被代理的内部DelayHandler.ReleaseMessageHandler
,它有责任在延迟后发布Thread
计划任务的消息。例如,当下游消息流抛出异常并且事务ReleaseMessageHandler
被回滚时,可能会使用它。在这种情况下,延迟的消息会保留在持久化的MessageStore
. 您可以org.aopalliance.aop.Advice
在<advice-chain>
. 该<transactional>
元素定义了一个仅包含事务性建议的简单建议链。以下示例显示了一个advice-chain
inside a <delayer>
:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
DelayHandler
可以导出为MBean
具有托管操作 (getDelayedMessageCount
和)的JMX reschedulePersistedMessages
,这允许在运行时重新安排延迟的持久消息 - 例如,如果TaskScheduler
之前已停止。可以通过Control Bus
命令调用这些操作,如以下示例所示:
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参阅系统管理。 |
从版本 5.3.7 开始,如果在将消息存储到 a 时事务处于活动状态,则在回调MessageStore
中安排发布任务。TransactionSynchronization.afterCommit()
这对于防止竞争条件是必要的,在这种情况下,计划的发布可能在事务提交之前运行,并且找不到消息。在这种情况下,消息将在延迟后或事务提交后释放,以较晚者为准。
发布失败
从版本 5.0.8 开始,延迟器上有两个新属性:
-
maxAttempts
(默认 5) -
retryDelay
(默认 1 秒)
发布消息时,如果下游流失败,将在retryDelay
. 如果maxAttempts
达到,则丢弃消息(除非发布是事务性的,在这种情况下,消息将保留在存储中,但将不再安排发布,直到重新启动应用程序或reschedulePersistedMessages()
调用该方法,如上所述以上)。
此外,您可以配置一个delayedMessageErrorChannel
; 当发布失败时,将一个ErrorMessage
异常发送到该通道,作为有效负载并具有originalMessage
属性。包含一个包含当前计数的ErrorMessage
标题。IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
如果错误流消费了错误信息并正常退出,则不采取进一步行动;如果发布是事务性的,则事务将提交并将消息从存储中删除。如果错误流抛出异常,将重试释放,直到maxAttempts
如上所述。
脚本支持
Spring Integration 2.1 添加了对 Java 版本 6 中引入的JSR223 Scripting for Java 规范的支持。它允许您使用以任何受支持的语言(包括 Ruby、JRuby、Groovy 和 Kotlin)编写的脚本来为各种集成组件提供逻辑,类似于Spring 集成中使用 Spring 表达式语言 (SpEL) 的方式。有关 JSR223 的更多信息,请参阅文档。
从 Java 11 开始,Nashorn JavaScript 引擎已被弃用,可能会在 Java 15 中删除。建议从现在开始重新考虑支持其他脚本语言。 |
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-scripting</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-scripting:5.5.13"
另外你需要添加一个脚本引擎实现,例如JRuby、Jython。
从 5.2 版开始,Spring Integration 提供了对 Kotlin Jsr223 的支持。您需要将这些依赖项添加到您的项目中以使其正常工作:
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-script-util</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-compiler-embeddable</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-scripting-compiler-embeddable</artifactId>
<scope>runtime</scope>
</dependency>
runtime 'org.jetbrains.kotlin:kotlin-script-util'
runtime 'org.jetbrains.kotlin:kotlin-compiler-embeddable'
runtime 'org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable'
由KotlinScriptExecutor
提供的kotlin
语言指示器或带有.kts
扩展名的脚本文件选择。
第三方已经开发了各种 JSR223 语言实现。特定实现与 Spring Integration 的兼容性取决于它与规范的符合程度以及实现者对规范的解释。 |
如果您打算使用 Groovy 作为脚本语言,我们建议您使用Spring-Integration 的 Groovy Support,因为它提供了特定于 Groovy 的附加功能。然而,这部分也是相关的。 |
脚本配置
根据您的集成要求的复杂性,脚本可以作为 XML 配置中的 CDATA 内联提供,或者作为对包含该脚本的 Spring 资源的引用。为了启用脚本支持,Spring Integration 定义了一个ScriptExecutingMessageProcessor
,它将消息有效负载绑定到一个名为的变量payload
,并将消息头绑定到一个headers
变量,两者都可以在脚本执行上下文中访问。您需要做的就是编写一个使用这些变量的脚本。以下一对示例显示了创建过滤器的示例配置:
@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor("some/path/to/ruby/script/RubyFilterTests.rb"));
}
...
@Bean
public Resource scriptResource() {
return new ByteArrayResource("headers.type == 'good'".getBytes());
}
@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor(scriptResource()).lang("groovy"));
}
<int:filter input-channel="referencedScriptInput">
<int-script:script location="some/path/to/ruby/script/RubyFilterTests.rb"/>
</int:filter>
<int:filter input-channel="inlineScriptInput">
<int-script:script lang="groovy">
<![CDATA[
return payload == 'good'
]]>
</int-script:script>
</int:filter>
如前面的示例所示,脚本可以内联包含,也可以通过引用资源位置(通过使用location
属性)来包含。此外,该lang
属性对应于语言名称(或其 JSR223 别名)。
其他支持脚本的 Spring Integration 端点元素包括router
、service-activator
、transformer
和splitter
. 每种情况下的脚本配置都与上述相同(除了端点元素)。
脚本支持的另一个有用特性是无需重新启动应用程序上下文即可更新(重新加载)脚本的能力。为此,请refresh-check-delay
在元素上指定属性script
,如以下示例所示:
Scripts.processor(...).refreshCheckDelay(5000)
}
<int-script:script location="..." refresh-check-delay="5000"/>
在前面的示例中,每 5 秒检查一次脚本位置是否有更新。如果脚本已更新,则在更新后 5 秒后发生的任何调用都会导致运行新脚本。
考虑以下示例:
Scripts.processor(...).refreshCheckDelay(0)
}
<int-script:script location="..." refresh-check-delay="0"/>
在前面的示例中,一旦发生任何脚本修改,上下文就会更新,从而为“实时”配置提供了一种简单的机制。任何负值都意味着在应用程序上下文初始化后不会重新加载脚本。这是默认行为。以下示例显示了一个从不更新的脚本:
Scripts.processor(...).refreshCheckDelay(-1)
}
<int-script:script location="..." refresh-check-delay="-1"/>
内联脚本无法重新加载。 |
脚本变量绑定
需要变量绑定才能使脚本能够引用外部提供给脚本执行上下文的变量。默认情况下,payload
和headers
用作绑定变量。您可以使用<variable>
元素(或ScriptSpec.variables()
选项)将其他变量绑定到脚本,如以下示例所示:
Scripts.processor("foo/bar/MyScript.py")
.variables(Map.of("var1", "thing1", "var2", "thing2", "date", date))
}
<script:script lang="py" location="foo/bar/MyScript.py">
<script:variable name="var1" value="thing1"/>
<script:variable name="var2" value="thing2"/>
<script:variable name="date" ref="date"/>
</script:script>
如前面的示例所示,您可以将脚本变量绑定到标量值或 Spring bean 引用。请注意,payload
andheaders
仍然作为绑定变量包含在内。
在 Spring Integration 3.0 中,除了variable
元素之外,variables
还引入了属性。此属性和variable
元素不是相互排斥的,您可以将它们组合在一个script
组件中。但是,变量必须是唯一的,无论它们是在哪里定义的。此外,从 Spring Integration 3.0 开始,内联脚本也允许变量绑定,如以下示例所示:
<service-activator input-channel="input">
<script:script lang="ruby" variables="thing1=THING1, date-ref=dateBean">
<script:variable name="thing2" ref="thing2Bean"/>
<script:variable name="thing3" value="thing2"/>
<![CDATA[
payload.foo = thing1
payload.date = date
payload.bar = thing2
payload.baz = thing3
payload
]]>
</script:script>
</service-activator>
前面的示例显示了内联脚本、variable
元素和variables
属性的组合。该variables
属性包含一个逗号分隔的值,其中每个段包含一个“=”分隔的变量及其值对。变量名称可以以 为后缀-ref
,如上例中的date-ref
变量。这意味着绑定变量的名称为date
,但其值是dateBean
应用程序上下文中对 bean 的引用。这在使用属性占位符配置或命令行参数时可能很有用。
如果您需要更多地控制变量的生成方式,您可以实现自己的使用该ScriptVariableGenerator
策略的 Java 类,该策略由以下接口定义:
public interface ScriptVariableGenerator {
Map<String, Object> generateScriptVariables(Message<?> message);
}
此接口需要您实现该generateScriptVariables(Message)
方法。message 参数允许您访问消息有效负载和标头中可用的任何数据,返回值是Map
绑定变量的值。每次为消息执行脚本时都会调用此方法。以下示例显示了如何提供实现并使用属性ScriptVariableGenerator
引用它:script-variable-generator
Scripts.processor("foo/bar/MyScript.groovy")
.variableGenerator(new foo.bar.MyScriptVariableGenerator())
}
<int-script:script location="foo/bar/MyScript.groovy"
script-variable-generator="variableGenerator"/>
<bean id="variableGenerator" class="foo.bar.MyScriptVariableGenerator"/>
如果 ascript-variable-generator
未提供,脚本组件将使用DefaultScriptVariableGenerator
,它将任何提供的<variable>
元素与其方法中的payload
和headers
变量合并。Message
generateScriptVariables(Message)
您不能同时提供script-variable-generator 属性和<variable> 元素。它们是相互排斥的。
|
Groovy 支持
在 Spring Integration 2.0 中,我们添加了对 Groovy 的支持,让您可以使用 Groovy 脚本语言为各种集成组件提供逻辑——类似于 Spring Expression Language (SpEL) 支持路由、转换和其他集成问题的方式。有关 Groovy 的更多信息,请参阅 Groovy 文档,您可以在项目网站上找到该文档。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-groovy</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-groovy:5.5.13"
Groovy 配置
在 Spring Integration 2.1 中,Groovy 支持的配置命名空间是 Spring Integration 脚本支持的扩展,并共享脚本支持部分中详细描述的核心配置和行为。尽管通用脚本支持很好地支持了 Groovy 脚本,但 Groovy 支持提供了Groovy
由 Spring 框架和相关组件支持的配置命名空间,org.springframework.scripting.groovy.GroovyScriptFactory
为使用 Groovy 提供了扩展功能。以下清单显示了两个示例配置:
<int:filter input-channel="referencedScriptInput">
<int-groovy:script location="some/path/to/groovy/file/GroovyFilterTests.groovy"/>
</int:filter>
<int:filter input-channel="inlineScriptInput">
<int-groovy:script><![CDATA[
return payload == 'good'
]]></int-groovy:script>
</int:filter>
如前面的示例所示,配置看起来与一般脚本支持配置相同。唯一的区别是 Groovy 命名空间的使用,如int-groovy
命名空间前缀所示。另请注意,标记上的lang
属性<script>
在此命名空间中无效。
Groovy 对象自定义
如果您需要自定义 Groovy 对象本身(除了设置变量),您可以引用GroovyObjectCustomizer
通过使用customizer
属性实现的 bean。例如,如果您想通过修改MetaClass
和注册函数以在脚本中可用来实现域特定语言 (DSL),这可能很有用。以下示例显示了如何执行此操作:
<int:service-activator input-channel="groovyChannel">
<int-groovy:script location="somewhere/SomeScript.groovy" customizer="groovyCustomizer"/>
</int:service-activator>
<beans:bean id="groovyCustomizer" class="org.something.MyGroovyObjectCustomizer"/>
设置自定义与元素或属性GroovyObjectCustomizer
并不相互排斥。也可以在定义内联脚本时提供。<variable>
script-variable-generator
Spring Integration 3.0 引入了variables
属性,它与variable
元素一起工作。BeanFactory
此外,如果绑定变量未提供名称,则groovy 脚本能够将变量解析为 bean 中的 bean 。以下示例显示了如何使用变量 ( entityManager
):
<int-groovy:script>
<![CDATA[
entityManager.persist(payload)
payload
]]>
</int-groovy:script>
entityManager
必须是应用程序上下文中的适当 bean。
有关<variable>
元素、variables
属性和script-variable-generator
属性的更多信息,请参阅脚本变量绑定。
Groovy 脚本编译器自定义
@CompileStatic
提示是最流行的 Groovy 编译器自定义选项。它可以在类或方法级别上使用。有关更多信息,请参阅 Groovy参考手册,特别是@CompileStatic。为了将此特性用于短脚本(在集成场景中),我们不得不将简单的脚本更改为更类似于 Java 的代码。考虑以下<filter>
脚本:
headers.type == 'good'
上述脚本在 Spring Integration 中变成如下方法:
@groovy.transform.CompileStatic
String filter(Map headers) {
headers.type == 'good'
}
filter(headers)
这样,该filter()
方法就被转换并编译为静态 Java 代码,绕过了 Groovy 的动态调用阶段,例如getProperty()
工厂和CallSite
代理。
从版本 4.3 开始,您可以使用compile-static
boolean
选项配置 Spring Integration Groovy 组件,指定应该将ASTTransformationCustomizer
for@CompileStatic
添加到 internal CompilerConfiguration
. 有了它,您可以@CompileStatic
在我们的脚本代码中省略方法声明,仍然可以获得编译后的纯 Java 代码。在这种情况下,前面的脚本可以很短,但仍需要比解释脚本更详细一点,如以下示例所示:
binding.variables.headers.type == 'good'
您必须通过属性访问headers
和payload
(或任何其他)变量,因为使用,我们没有动态能力。groovy.lang.Script
binding
@CompileStatic
GroovyObject.getProperty()
此外,我们还介绍了compiler-configuration
bean 引用。使用此属性,您可以提供任何其他必需的 Groovy 编译器自定义,例如ImportCustomizer
. 有关此功能的更多信息,请参阅 Groovy 文档以获取高级编译器配置。
UsingcompilerConfiguration 不会自动ASTTransformationCustomizer 为@CompileStatic 注解添加一个,它会覆盖该compileStatic 选项。如果您仍然需要CompileStatic ,您应该手动将一个添加new ASTTransformationCustomizer(CompileStatic.class) 到该CompilationCustomizers 自定义compilerConfiguration 中。
|
Groovy 编译器自定义对该选项没有任何影响refresh-check-delay ,并且可重新加载的脚本也可以静态编译。
|
控制总线
如(企业集成模式)中所述,控制总线背后的想法是,您可以使用与“应用程序级”消息传递相同的消息传递系统来监视和管理框架内的组件。在 Spring Integration 中,我们构建在前面描述的适配器之上,以便您可以发送消息作为调用公开操作的一种方式。这些操作的一种选择是 Groovy 脚本。以下示例为控制总线配置了一个 Groovy 脚本:
<int-groovy:control-bus input-channel="operationChannel"/>
控制总线有一个输入通道,可以访问该通道以在应用程序上下文中调用对 bean 的操作。
Groovy 控制总线将输入通道上的消息作为 Groovy 脚本运行。它接收一条消息,将正文编译为脚本,使用 a 对其进行自定义GroovyObjectCustomizer
,然后运行它。控制总线'MessageProcessor
公开应用程序上下文中的所有 bean,这些 bean 使用 Spring 的接口进行注释@ManagedResource
并实现 Spring 的Lifecycle
接口或扩展 Spring 的CustomizableThreadCreator
基类(例如,几个TaskExecutor
andTaskScheduler
实现)。
在控制总线的命令脚本中使用具有自定义范围(例如“请求”)的托管 bean 时要小心,尤其是在异步消息流中。如果MessageProcessor 控制总线不能从应用程序上下文中公开一个 bean,那么您可能会BeansException 在命令脚本运行期间得到一些。例如,如果未建立自定义范围的上下文,则在该范围内获取 bean 的尝试会触发BeanCreationException .
|
如果需要进一步自定义 Groovy 对象,还可以提供对GroovyObjectCustomizer
通过customizer
属性实现的 bean 的引用,如以下示例所示:
<int-groovy:control-bus input-channel="input"
output-channel="output"
customizer="groovyCustomizer"/>
<beans:bean id="groovyCustomizer" class="org.foo.MyGroovyObjectCustomizer"/>
向端点添加行为
<advice-chain/>
在 Spring Integration 2.2 之前,您可以通过将 AOP Advice 添加到轮询器的元素来将行为添加到整个集成流。但是,假设您想重试,比如说,只是一个 REST Web 服务调用,而不是任何下游端点。
例如,考虑以下流程:
inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter
如果您将一些重试逻辑配置到轮询器上的建议链中,并且http-gateway2
由于网络故障而导致调用失败,则重试会导致http-gateway1
和http-gateway2
被第二次调用。类似地,在 jdbc-outbound-adapter 发生短暂故障后,两个 HTTP 网关都被再次调用,然后再次调用jdbc-outbound-adapter
.
Spring Integration 2.2 增加了向单个端点添加行为的能力。这是通过将<request-handler-advice-chain/>
元素添加到许多端点来实现的。以下示例显示了如何在 中的<request-handler-advice-chain/>
元素outbound-gateway
:
<int-http:outbound-gateway id="withAdvice"
url-expression="'http://localhost/test1'"
request-channel="requests"
reply-channel="nextChannel">
<int-http:request-handler-advice-chain>
<ref bean="myRetryAdvice" />
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
在这种情况下,myRetryAdvice
仅在本地应用于此网关,并且不适用于在将回复发送到后采取的进一步下游操作nextChannel
。建议的范围仅限于端点本身。
此时,您无法建议整个 但是,可以将 a添加到元素 |
提供咨询课程
除了提供应用 AOP 建议类的通用机制之外,Spring Integration 还提供了这些开箱即用的建议实现:
重试建议
重试建议 ( o.s.i.handler.advice.RequestHandlerRetryAdvice
) 利用Spring Retry项目提供的丰富重试机制。的核心组件spring-retry
是RetryTemplate
,它允许配置复杂的重试场景,包括RetryPolicy
和BackoffPolicy
策略(具有许多实现)以及RecoveryCallback
确定重试耗尽时要采取的操作的策略。
- 无状态重试
-
无状态重试是指重试活动完全在通知内处理的情况。线程暂停(如果配置为这样做)并重试该操作。
- 有状态重试
-
有状态重试是在通知中管理重试状态但抛出异常并且调用者重新提交请求的情况。有状态重试的一个例子是当我们希望消息发起者(例如,JMS)负责重新提交,而不是在当前线程上执行它。有状态重试需要某种机制来检测重试提交。
有关 的更多信息spring-retry
,请参阅项目的 Javadoc和Spring Batch的参考文档,该文档的来源spring-retry
。
默认退避行为是不退避。立即尝试重试。使用导致线程在尝试之间暂停的退避策略可能会导致性能问题,包括内存使用过多和线程不足。在高容量环境中,应谨慎使用回退策略。 |
配置重试建议
本节中的示例使用以下<service-activator>
始终引发异常的内容:
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 简单的无状态重试
-
默认
RetryTemplate
有一个SimpleRetryPolicy
尝试三次。没有BackOffPolicy
,因此三个尝试是背靠背进行的,尝试之间没有延迟。没有RecoveryCallback
,所以结果是在最后一次失败重试发生后,将异常抛给调用者。在 Spring Integration 环境中,最终的异常可能会通过使用error-channel
入站端点上的 an 来处理。以下示例使用RetryTemplate
并显示其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- 带恢复的简单无状态重试
-
以下示例将 a 添加
RecoveryCallback
到前面的示例并使用 anErrorMessageSendingRecoverer
将 an 发送ErrorMessage
到通道:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- 使用自定义策略和恢复的无状态重试
-
为了更复杂,我们可以提供定制的建议
RetryTemplate
。此示例继续使用,SimpleRetryPolicy
但将尝试次数增加到四次。它还添加了ExponentialBackoffPolicy
第一次重试等待一秒钟,第二次等待五秒钟,第三次等待 25 秒(总共四次尝试)。以下清单显示了示例及其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- 无状态重试的命名空间支持
-
从 4.0 版本开始,由于重试建议的命名空间支持,上述配置可以大大简化,如下例所示:
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>
在前面的示例中,通知被定义为顶级 bean,以便它可以在多个
request-handler-advice-chain
实例中使用。您还可以直接在链中定义通知,如以下示例所示:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator>
A
<handler-retry-advice>
可以有一个<fixed-back-off>
或<exponential-back-off>
子元素,也可以没有子元素。没有子元素的 A<handler-retry-advice>
不使用退避。如果没有recovery-channel
,则在重试用尽时抛出异常。命名空间只能用于无状态重试。对于更复杂的环境(自定义策略等),请使用常规
<bean>
定义。 - 带恢复的简单有状态重试
-
为了使重试有状态,我们需要提供带有
RetryStateGenerator
实现的建议。此类用于将消息标识为重新提交,以便RetryTemplate
可以确定此消息的当前重试状态。该框架提供了一个SpelExpressionRetryStateGenerator
,它通过使用 SpEL 表达式来确定消息标识符。此示例再次使用默认策略(没有退避的三次尝试)。与无状态重试一样,这些策略可以自定义。以下清单显示了示例及其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
如果将前面的示例与无状态示例进行比较,您可以看到,通过有状态重试,每次失败都会向调用者抛出异常。
- 重试异常分类
-
Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。默认配置重试所有异常,异常分类器查看顶级异常。如果您将其配置为仅重试 on并且您的应用程序在原因为 a 的地方
MyException
抛出 a ,则不会发生重试。SomeOtherException
MyException
从 Spring Retry 1.0.3 开始,
BinaryExceptionClassifier
有一个名为traverseCauses
(默认为false
)的属性。当 时true
,它会遍历异常原因,直到找到匹配项或用完遍历的原因。要使用此分类器进行重试,请使用带构造函数的
SimpleRetryPolicy
created ,该构造函数采用最大尝试次数、Map
对象数Exception
和traverseCauses
布尔值。然后您可以将此策略注入到RetryTemplate
.
traverseCauses 在这种情况下是必需的,因为用户异常可能包含在MessagingException .
|
断路器建议
断路器模式的一般思想是,如果服务当前不可用,则不要浪费时间(和资源)尝试使用它。实现了这种o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
模式。当断路器处于关闭状态时,端点会尝试调用服务。如果一定数量的连续尝试失败,断路器将进入打开状态。当它处于打开状态时,新请求“快速失败”并且在一段时间到期之前不会尝试调用服务。
当该时间到期时,断路器被设置为半开状态。在这种状态下,即使一次尝试失败,断路器也会立即进入打开状态。如果尝试成功,断路器将进入关闭状态,在这种情况下,它不会再次进入打开状态,直到再次发生配置的连续故障次数。任何成功的尝试都会将状态重置为零故障,以确定断路器何时可能再次进入打开状态。
通常,此建议可能用于外部服务,它可能需要一些时间才能失败(例如尝试建立网络连接的超时)。
有RequestHandlerCircuitBreakerAdvice
两个属性:threshold
和halfOpenAfter
。该threshold
属性表示在断路器打开之前需要发生的连续故障数。它默认为5
. 该halfOpenAfter
属性表示在最后一次失败后,断路器在尝试另一个请求之前等待的时间。默认值为 1000 毫秒。
以下示例配置断路器并显示其DEBUG
和ERROR
输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,阈值2
设置halfOpenAfter
为12
秒。每 5 秒有一个新请求到达。前两次尝试调用了该服务。第三次和第四次失败,异常表明断路器已打开。尝试了第五个请求,因为该请求在上次失败后 15 秒。第六次尝试立即失败,因为断路器立即打开。
表达评估建议
最后提供的建议类是o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
. 这个建议比其他两个建议更笼统。它提供了一种机制来评估发送到端点的原始入站消息的表达式。在成功或失败之后,可以评估单独的表达式。可选地,可以将包含评估结果的消息与输入消息一起发送到消息通道。
此建议的典型用例可能是 ,<ftp:outbound-channel-adapter/>
如果传输成功,则可能将文件移动到一个目录,如果传输失败,则可能将文件移动到另一个目录:
建议具有设置成功时的表达式、失败时的表达式以及每个对应的通道的属性。对于成功的情况,发送到的消息successChannel
是一个AdviceMessage
,有效负载是表达式评估的结果。一个名为 的附加属性inputMessage
包含发送到处理程序的原始消息。发送到failureChannel
(当处理程序抛出异常时)的消息是ErrorMessage
带有有效负载的MessageHandlingExpressionEvaluatingAdviceException
. 与所有MessagingException
实例一样,此有效负载具有failedMessage
和cause
属性,以及一个名为 的附加属性evaluationResult
,其中包含表达式评估的结果。
从版本 5.1.3 开始,如果配置了通道,但未提供表达式,则使用默认表达式计算payload 消息的值。
|
当在通知范围内抛出异常时,默认情况下,在failureExpression
评估 any 后,该异常会被抛出给调用者。如果您希望禁止抛出异常,请将trapException
属性设置为true
. 以下建议显示了如何使用 Java DSL 配置建议:
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.handle((GenericHandler<String>) (payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
速率限制器建议
速率限制器建议 ( RateLimiterRequestHandlerAdvice
) 允许确保端点不会因请求而过载。当超出速率限制时,请求将进入阻塞状态。
此建议的典型用例可能是外部服务提供商不允许超过n
每分钟的请求数。
该RateLimiterRequestHandlerAdvice
实现完全基于Resilience4j项目,需要注入RateLimiter
或RateLimiterConfig
注入。也可以使用默认值和/或自定义名称进行配置。
以下示例配置了一个速率限制器建议,每 1 秒有一个请求:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从 5.2 版开始,CacheRequestHandlerAdvice
引入了 。它基于Spring Framework@Caching
中的缓存抽象,并与注解家族提供的概念和功能保持一致。内部逻辑基于CacheAspectSupport
扩展,其中代理缓存操作是围绕AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
以请求Message<?>
为参数的方法完成的。可以使用 SpEL 表达式或 a 配置此建议Function
以评估缓存键。该请求Message<?>
可用作 SpEL 评估上下文的根对象,或作为Function
输入参数。默认情况下,payload
请求消息的 用于缓存键。CacheRequestHandlerAdvice
必须配置,当cacheNames
默认缓存操作是CacheableOperation
,或与任意任意CacheOperation
s 的集合。每个CacheOperation
都可以单独配置或具有共享选项,例如 a和CacheManager
,可以从配置中重用。此配置功能类似于 Spring Framework和注解的组合。如果未提供 a,则默认情况下从.CacheResolver
CacheErrorHandler
CacheRequestHandlerAdvice
@CacheConfig
@Caching
CacheManager
BeanFactory
CacheAspectSupport
以下示例配置了两个具有不同缓存操作集的建议:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}
反应性建议
从 5.3 版开始,aReactiveRequestHandlerAdvice
可用于请求消息处理程序产生Mono
回复。BiFunction<Message<?>, Mono<?>, Publisher<?>>
必须为此建议提供A ,并从操作员根据截获的方法实现Mono.transform()
产生的回复调用它。通常,当我们想通过和类似的支持运营商控制网络波动时,handleRequestMessage()
这种定制是必要的。例如,当我们可以通过 WebFlux 客户端发出 HTTP 请求时,我们可以使用以下配置来等待响应不超过 5 秒:Mono
timeout()
retry()
.handle(WebFlux.outboundGateway("https://somehost/"),
e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));
该message
参数是消息处理程序的请求消息,可用于确定请求范围属性。mono
参数是此消息处理程序的handleRequestMessage()
方法实现的结果。Mono.transform()
也可以从此函数调用嵌套以应用例如Reactive Circuit Breaker。
自定义建议类
除了前面描述的提供的建议类之外,您还可以实现自己的建议类。虽然您可以提供org.aopalliance.aop.Advice
(通常org.aopalliance.intercept.MethodInterceptor
)的任何实现,但我们通常建议您将o.s.i.handler.advice.AbstractRequestHandlerAdvice
. 这样做的好处是避免了编写低级的面向方面的编程代码,并提供了一个专门为在这种环境中使用而定制的起点。
子类需要实现该doInvoke()
方法,其定义如下:
/**
* Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
* invokes the handler method and returns its result, or null).
* @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
* @param target The target handler.
* @param message The message that will be sent to the handler.
* @return the result after invoking the {@link MessageHandler}.
* @throws Exception
*/
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;
回调参数是为了方便避免子类直接处理 AOP。调用该callback.execute()
方法会调用消息处理程序。
该target
参数是为那些需要为特定处理程序维护状态的子类提供的,可能是Map
通过在目标键中维护该状态。此功能允许将相同的建议应用于多个处理程序。使用RequestHandlerCircuitBreakerAdvice
建议来保持每个处理程序的断路器状态。
message
参数是发送给处理程序的消息。虽然通知不能在调用处理程序之前修改消息,但它可以修改有效负载(如果它具有可变属性)。通常,通知将使用消息进行日志记录或在调用处理程序之前或之后在某处发送消息的副本。
返回值通常是由callback.execute()
. 但是,通知确实具有修改返回值的能力。请注意,只有AbstractReplyProducingMessageHandler
实例返回值。以下示例显示了扩展的自定义建议类AbstractRequestHandlerAdvice
:
public class MyAdvice extends AbstractRequestHandlerAdvice {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
// add code before the invocation
Object result = callback.execute();
// add code after the invocation
return result;
}
}
除了 有关详细信息,请参阅ReflectiveMethodInvocation Javadoc。 |
处理消息通知
正如本节介绍中所讨论的,请求处理程序建议链中的建议对象仅应用于当前端点,而不是下游流(如果有)。对于MessageHandler
产生回复的对象(例如那些扩展的对象AbstractReplyProducingMessageHandler
),建议将应用于内部方法:(handleRequestMessage()
调用 from MessageHandler.handleMessage()
)。对于其他消息处理程序,建议应用于MessageHandler.handleMessage()
.
在某些情况下,即使消息处理程序是AbstractReplyProducingMessageHandler
,也必须将通知应用于handleMessage
方法。例如,幂等接收器可能返回,如果处理程序的属性设置为null
,这将导致异常。另一个例子是 - 请参阅严格消息排序。replyRequired
true
BoundRabbitChannelAdvice
从版本 4.3.1 开始,引入了一个新HandleMessageAdvice
接口及其基本实现 ( AbstractHandleMessageAdvice
)。
无论处理程序类型如何Advice
,实现HandleMessageAdvice
的对象始终应用于方法。handleMessage()
重要的是要理解HandleMessageAdvice
实现(例如幂等接收器)在应用于返回响应的处理程序时,会adviceChain
与方法分离并正确应用于MessageHandler.handleMessage()
方法。
由于这种分离,建议链顺序不受尊重。 |
考虑以下配置:
<some-reply-producing-endpoint ... >
<int:request-handler-advice-chain>
<tx:advice ... />
<ref bean="myHandleMessageAdvice" />
</int:request-handler-advice-chain>
</some-reply-producing-endpoint>
在前面的示例中,<tx:advice>
应用于AbstractReplyProducingMessageHandler.handleRequestMessage()
. 但是,myHandleMessageAdvice
申请到MessageHandler.handleMessage()
。因此,它在. <tx:advice>
要保留顺序,您应该遵循标准的Spring AOP配置方法,并使用端点id
和.handler
后缀来获取目标MessageHandler
bean。请注意,在这种情况下,整个下游流程都在事务范围内。
在MessageHandler
不返回响应的情况下,建议链顺序被保留。
从 5.3 版开始,HandleMessageAdviceAdapter
存在 可以应用任何现有MethodInterceptor
的MessageHandler.handleMessage()
,因此,整个子流。例如,aRetryOperationsInterceptor
可以应用于从某个端点开始的整个子流,默认情况下这是不可能的,因为消费者端点仅对AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()
. 从 5.3 版开始,HandleMessageAdviceAdapter
提供了对方法应用 anyMethodInterceptor
的MessageHandler.handleMessage()
方法,因此也适用于整个子流程。例如,aRetryOperationsInterceptor
可以应用于从某个端点开始的整个子流;默认情况下这是不可能的,因为消费者端点仅将建议应用于AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()
.
交易支持
从 5.0 版开始TransactionHandleMessageAdvice
,由于实施,引入了一个新功能以使整个下游流程具有事务性HandleMessageAdvice
。当TransactionInterceptor
在<request-handler-advice-chain>
元素中使用正则时(例如,通过配置<tx:advice>
),启动的事务仅适用于内部AbstractReplyProducingMessageHandler.handleRequestMessage()
,不会传播到下游流。
为了简化 XML 配置,除了 ,还向所有和相关组件添加了一个元素<request-handler-advice-chain>
。下面的例子展示了在使用中:<transactional>
<outbound-gateway>
<service-activator>
<transactional>
<int-rmi:outbound-gateway remote-channel="foo" host="localhost"
request-channel="good" reply-channel="reply" port="#{@port}">
<int-rmi:transactional/>
</int-rmi:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
Java 配置可以使用 来简化TransactionInterceptorBuilder
,结果 bean 名称可以用在消息注解 adviceChain
属性中,如以下示例所示:
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
注意构造true
函数上的参数TransactionInterceptorBuilder
。它会导致创建一个TransactionHandleMessageAdvice
,而不是一个常规的TransactionInterceptor
。
Java DSL 支持Advice
通过.transactional()
端点配置上的选项,如以下示例所示:
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}
建议过滤器
提供建议时还有一个额外的考虑Filter
。默认情况下,任何丢弃操作(当过滤器返回时false
)都在建议链的范围内执行。这可能包括丢弃通道下游的所有流。因此,例如,如果丢弃通道下游的元素抛出异常并且有重试建议,则重试该过程。此外,如果throwExceptionOnRejection
设置为true
(在建议范围内引发异常)。
设置discard-within-advice
为false
修改此行为,并且在调用建议链后发生丢弃(或异常)。
使用注解建议端点
使用注释(@Filter
、@ServiceActivator
、@Splitter
和)配置某些端点时,您可以在属性@Transformer
中为建议链提供 bean 名称。adviceChain
此外,@Filter
注解还具有discardWithinAdvice
属性,可用于配置丢弃行为,如Advising Filters中所述。以下示例导致在通知之后执行丢弃:
@MessageEndpoint
public class MyAdvisedFilter {
@Filter(inputChannel="input", outputChannel="output",
adviceChain="adviceChain", discardWithinAdvice="false")
public boolean filter(String s) {
return s.contains("good");
}
}
在建议链中订购建议
建议类是“围绕”建议,并以嵌套方式应用。第一个建议是最外面的,而最后一个建议是最里面的(即最接近被建议的处理程序)。以正确的顺序放置建议类以实现您想要的功能非常重要。
例如,假设您要添加重试建议和事务建议。您可能希望首先放置重试通知通知,然后是事务通知。因此,每次重试都在新事务中执行。另一方面,如果您希望所有尝试和任何恢复操作(在 retry 中RecoveryCallback
)都在事务范围内,您可以将事务建议放在首位。
建议的处理程序属性
有时,从建议中访问处理程序属性很有用。例如,大多数处理程序实现NamedComponent
让您访问组件名称。
可以通过target
参数(子类化时AbstractRequestHandlerAdvice
)或invocation.getThis()
(实现时)访问目标对象org.aopalliance.intercept.MethodInterceptor
。
当通知整个处理程序时(例如处理程序不产生回复或通知实现时HandleMessageAdvice
),您可以将目标对象强制转换为接口,例如NamedComponent
,如下例所示:
String componentName = ((NamedComponent) target).getComponentName();
当您MethodInterceptor
直接实现时,您可以按如下方式转换目标对象:
String componentName = ((NamedComponent) invocation.getThis()).getComponentName();
当仅handleRequestMessage()
建议方法时(在产生回复的处理程序中),您需要访问完整的处理程序,即AbstractReplyProducingMessageHandler
. 以下示例显示了如何执行此操作:
AbstractReplyProducingMessageHandler handler =
((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();
String componentName = handler.getComponentName();
幂等接收器企业集成模式
从 4.1 版开始,Spring Integration 提供了Idempotent Receiver Enterprise Integration Pattern 的实现。它是一种功能模式,整个幂等性逻辑应该在应用程序中实现。但是,为了简化决策,IdempotentReceiverInterceptor
提供了该组件。Advice
这是一个应用于MessageHandler.handleMessage()
方法的 AOP ,可以根据其配置filter
请求消息或将其标记为。duplicate
例如,以前,您可以通过MessageSelector
在 a 中使用自定义<filter/>
(请参阅Filter)来实现此模式。然而,由于这种模式真正定义了端点的行为而不是端点本身,所以幂等接收器实现不提供端点组件。相反,它应用于应用程序中声明的端点。
的逻辑IdempotentReceiverInterceptor
基于提供MessageSelector
的信息,如果该选择器不接受该消息,则会通过duplicateMessage
设置为 的标头来丰富它true
。目标MessageHandler
(或下游流)可以查阅此标头以实现正确的幂等性逻辑。如果IdempotentReceiverInterceptor
用discardChannel
or配置throwExceptionOnRejection = true
,则不会将重复消息发送到目标MessageHandler.handleMessage()
。相反,它被丢弃了。如果您想丢弃(不处理)重复消息,discardChannel
则应使用 配置NullChannel
,例如默认nullChannel
bean。
为了维护消息之间的状态并提供比较消息以实现幂等性的能力,我们提供了MetadataStoreSelector
. 它接受一个MessageProcessor
实现(它基于创建一个查找键Message
)和一个可选的ConcurrentMetadataStore
(元数据存储)。有关更多信息,请参阅MetadataStoreSelector
Javadoc。您还可以使用附加value
的. 默认情况下,使用消息头。ConcurrentMetadataStore
MessageProcessor
MetadataStoreSelector
timestamp
通常,如果键不存在现有值,则选择器会选择要接受的消息。在某些情况下,比较一个键的当前值和新值是有用的,以确定是否应该接受该消息。从 5.3 版开始,compareValues
提供了引用 a 的属性BiPredicate<String, String>
;第一个参数是旧值;返回true
以接受消息并将旧值替换为MetadataStore
. 这对于减少键的数量很有用;例如,在处理文件中的行时,可以将文件名存储在 key 中,将当前行号存储在 value 中。然后,在重新启动后,您可以跳过已经处理过的行。有关示例,请参阅幂等下游处理拆分文件。
为方便起见,这些MetadataStoreSelector
选项可直接在<idempotent-receiver>
组件上进行配置。以下清单显示了所有可能的属性:
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | IdempotentReceiverInterceptor bean的 ID 。可选的。 |
2 | 应用此拦截器的消费者端点名称或模式。用逗号 ( ) 分隔名称(模式), ,例如. 然后使用与这些模式匹配的端点 bean 名称来检索目标端点的bean(使用其后缀),并将应用于这些 bean。必需的。endpoint="aaa, bbb*, ccc, *ddd, eee*fff" MessageHandler .handler IdempotentReceiverInterceptor |
3 | MessageSelector 豆类参考。metadata-store 与和互斥key-strategy (key-expression) 。如果selector 未提供,则需要key-strategy 或之一key-strategy-expression 。 |
4 | IdempotentReceiverInterceptor 标识在不接受消息时向其发送消息的通道。省略时,重复的消息将转发到带有duplicateMessage 标头的处理程序。可选的。 |
5 | 一个ConcurrentMetadataStore 参考。由底层证券使用MetadataStoreSelector 。与 互斥selector 。可选的。默认使用不跨应用程序执行维护状态MetadataStoreSelector 的内部。SimpleMetadataStore |
6 | 一个MessageProcessor 参考。由底层证券使用MetadataStoreSelector 。idempotentKey 从请求消息中评估一个。selector 与和互斥key-expression 。如果selector 未提供 a,则需要key-strategy or之一key-strategy-expression 。 |
7 | 用于填充ExpressionEvaluatingMessageProcessor . 由底层证券使用MetadataStoreSelector 。idempotentKey 通过使用请求消息作为评估上下文根对象来评估一个。selector 与和互斥key-strategy 。如果selector 未提供 a,则需要key-strategy or之一key-strategy-expression 。 |
8 | 一个MessageProcessor 参考。由底层证券使用MetadataStoreSelector 。评估来自请求消息的value a 。与和idempotentKey 互斥。默认情况下,“MetadataStoreSelector”使用“timestamp”消息头作为元数据“值”。selector value-expression |
9 | 用于填充ExpressionEvaluatingMessageProcessor . 由底层证券使用MetadataStoreSelector 。通过使用请求消息作为评估上下文根对象来value 评估a 。与和idempotentKey 互斥。默认情况下,“MetadataStoreSelector”使用“timestamp”消息头作为元数据“值”。selector value-strategy |
10 | 对 bean 的引用,BiPredicate<String, String> 它允许您通过比较键的旧值和新值来选择性地选择消息;null 默认。 |
11 | IdempotentReceiverInterceptor 如果拒绝消息,是否抛出异常。默认为false . 无论是否discard-channel 提供 a 都将应用它。 |
对于 Java 配置,Spring Integration 提供了方法级别的@IdempotentReceiver
注解。它用于标记method
具有消息注释 ( @ServiceActivator
,@Router, and others) to specify which `IdempotentReceiverInterceptor
对象应用于此端点。以下示例显示如何使用@IdempotentReceiver
注释:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
当您使用 Java DSL 时,您可以将拦截器添加到端点的通知链中,如以下示例所示:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
IdempotentReceiverInterceptor 仅为该
方法设计MessageHandler.handleMessage(Message<?>) 。从版本 4.3.1 开始,它实现HandleMessageAdvice 了 ,AbstractHandleMessageAdvice 作为基类,以实现更好的分离。有关详细信息,请参阅处理消息通知。
|
记录通道适配器
如Wire Tap<logging-channel-adapter>
中所讨论的,通常与线接头一起使用。但是,它也可以用作任何流的最终消费者。例如,考虑一个以 a 结尾的流,它返回一个结果,但您希望丢弃该结果。为此,您可以将结果发送到. 或者,您可以将其路由到一个级别。这样,您可以在级别登录时看到丢弃的消息,但在(例如)级别登录时看不到它。使用 a ,您在级别登录时只会看到丢弃的消息。以下清单显示了该元素的所有可能属性:<service-activator>
NullChannel
INFO
<logging-channel-adapter>
INFO
WARN
NullChannel
DEBUG
logging-channel-adapter
<int:logging-channel-adapter
channel="" (1)
level="INFO" (2)
expression="" (3)
log-full-message="false" (4)
logger-name="" /> (5)
1 | 将日志适配器连接到上游组件的通道。 |
2 | 将记录发送到此适配器的消息的日志记录级别。默认值:INFO . |
3 | 一个 SpEL 表达式,表示准确记录了消息的哪些部分。默认值:payload — 仅记录有效负载。如果log-full-message 指定,则不能指定该属性。 |
4 | 当 时true ,将记录整个消息(包括标题)。默认值:false — 仅记录有效负载。如果已指定,则无法指定此属性expression 。 |
5 | 指定name 记录器的类型(称为category in log4j )。用于标识此适配器创建的日志消息。这可以为各个适配器设置日志名称(在日志子系统中)。默认情况下,所有适配器都以以下名称记录:org.springframework.integration.handler.LoggingHandler . |
使用 Java 配置
以下 Spring Boot 应用程序显示了LoggingHandler
使用 Java 配置进行配置的示例:
@SpringBootApplication
public class LoggingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(LoggingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToLogger("foo");
}
@Bean
@ServiceActivator(inputChannel = "logChannel")
public LoggingHandler logging() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(defaultRequestChannel = "logChannel")
public interface MyGateway {
void sendToLogger(String data);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了使用 Java DSL 配置日志记录通道适配器的示例:
@SpringBootApplication
public class LoggingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(LoggingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToLogger("foo");
}
@Bean
public IntegrationFlow loggingFlow() {
return IntegrationFlows.from(MyGateway.class)
.log(LoggingHandler.Level.DEBUG, "TEST_LOGGER",
m -> m.getHeaders().getId() + ": " + m.getPayload());
}
@MessagingGateway
public interface MyGateway {
void sendToLogger(String data);
}
}
java.util.function
接口支持
从 5.1 版本开始,Spring Integration 提供对包中接口的直接支持java.util.function
。所有消息传递端点(Service Activator、Transformer、Filter 等)现在都可以引用Function
(或Consumer
)bean。消息注释可以直接应用于这些 bean,类似于常规定义MessageHandler
。例如,如果你有这个Function
bean 定义:
@Configuration
public class FunctionConfiguration {
@Bean
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
}
您可以将其用作 XML 配置文件中的简单参考:
<service-activator input-channel="processorViaFunctionChannel" ref="functionAsService"/>
当我们使用消息注释配置我们的流程时,代码很简单:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
当函数返回一个数组Collection
(本质上是 any Iterable
)Stream
或 ReactorFlux
时,@Splitter
可以在这样的 bean 上使用以对结果内容执行迭代。
该java.util.function.Consumer
接口可用于<int:outbound-channel-adapter>
or 与@ServiceActivator
注释一起执行流程的最后一步:
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
public Consumer<Message<?>> messageConsumerAsService() {
// Has to be an anonymous class for proper type inference
return new Consumer<Message<?>>() {
@Override
public void accept(Message<?> e) {
collector().add(e);
}
};
}
另外,请注意上面代码片段中的注释:如果您想处理Function
/中的整个消息,Consumer
则不能使用 lambda 定义。由于 Java 类型擦除,我们无法确定apply()/accept()
方法调用的目标类型。
该java.util.function.Supplier
接口可以简单地与@InboundChannelAdapter
注解一起使用,或者作为 a ref
in an 使用<int:inbound-channel-adapter>
:
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<String> pojoSupplier() {
return () -> "foo";
}
使用 Java DSL,我们只需要在端点定义中使用对函数 bean 的引用。同时,Supplier
接口的实现可以用作常规MessageSource
定义:
@Bean
public Function<String, String> toUpperCaseFunction() {
return String::toUpperCase;
}
@Bean
public Supplier<String> stringSupplier() {
return () -> "foo";
}
@Bean
public IntegrationFlow supplierFlow() {
return IntegrationFlows.from(stringSupplier())
.transform(toUpperCaseFunction())
.channel("suppliedChannel")
.get();
}
当与Spring Cloud Function框架一起使用时,此函数支持很有用,我们有一个函数目录,并且可以从集成流定义中引用其成员函数。
Kotlin Lambda
该框架也得到了改进,以支持函数的 Kotlin lambda,因此现在您可以结合使用 Kotlin 语言和 Spring Integration 流定义:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = [Poller(fixedRate = "10", maxMessagesPerPoll = "1")])
fun kotlinSupplier(): () -> String {
return { "baz" }
}