AMQP 支持

Spring Integration 通过使用高级消息队列协议 (AMQP) 提供用于接收和发送消息的通道适配器。

您需要将此依赖项包含到您的项目中:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-amqp</artifactId>
    <version>5.5.13</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-amqp:5.5.13"

可以使用以下适配器:

Spring Integration 还提供了点对点消息通道和由 AMQP 交换和队列支持的发布-订阅消息通道。

为了提供 AMQP 支持,Spring Integration 依赖于 ( Spring AMQP ),它将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。Spring AMQP 提供与 ( Spring JMS ) 类似的语义。

虽然提供的 AMQP 通道适配器仅用于单向消息传递(发送或接收),但 Spring Integration 还为请求-回复操作提供了入站和出站 AMQP 网关。

提示:您应该熟悉Spring AMQP 项目的参考文档。它提供了关于 Spring 与 AMQP 的集成,特别是 RabbitMQ 的更深入的信息。

入站通道适配器

以下清单显示了 AMQP 入站通道适配器的可能配置选项:

Java DSL
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
java
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
XML
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                (1)
                                  channel="inboundChannel"        (2)
                                  queue-names="si.test.queue"     (3)
                                  acknowledge-mode="AUTO"         (4)
                                  advice-chain=""                 (5)
                                  channel-transacted=""           (6)
                                  concurrent-consumers=""         (7)
                                  connection-factory=""           (8)
                                  error-channel=""                (9)
                                  expose-listener-channel=""      (10)
                                  header-mapper=""                (11)
                                  mapped-request-headers=""       (12)
                                  listener-container=""           (13)
                                  message-converter=""            (14)
                                  message-properties-converter="" (15)
                                  phase=""                        (16)
                                  prefetch-count=""               (17)
                                  receive-timeout=""              (18)
                                  recovery-interval=""            (19)
                                  missing-queues-fatal=""         (20)
                                  shutdown-timeout=""             (21)
                                  task-executor=""                (22)
                                  transaction-attribute=""        (23)
                                  transaction-manager=""          (24)
                                  tx-size=""                      (25)
                                  consumers-per-queue             (26)
                                  batch-mode="MESSAGES"/>         (27)
1 此适配器的唯一 ID。可选的。
2 应将转换后的消息发送到的消息通道。必需的。
3 应该从中消费消息的 AMQP 队列(逗号分隔列表)的名称。必需的。
4 的确认模式MessageListenerContainer。当设置为MANUAL时,传递标签和通道分别在消息头amqp_deliveryTag和中提供amqp_channel。用户应用程序负责确认。 NONE表示没有确认 ( autoAck)。 AUTO表示适配器的容器在下游流程完成时进行确认。可选(默认为自动)。请参阅入站端点确认模式
5 处理与此入站通道适配器相关的横切行为的额外 AOP 建议。可选的。
6 指示此组件创建的通道是事务性的标志。如果为真,它会告诉框架使用事务通道并根据结果以提交或回滚结束所有操作(发送或接收),但会发出回滚信号。可选(默认为 false)。
7 指定要创建的并发使用者的数量。默认值为1. 我们建议增加并发消费者的数量以扩展对来自队列的消息的消费。但是,请注意,一旦注册了多个消费者,任何订购保证都将丢失。通常,对低容量队列使用一个消费者。设置“每个队列的消费者”时不允许。可选的。
8 对 RabbitMQ 的 Bean 引用ConnectionFactory。可选(默认为connectionFactory)。
9 错误消息应该发送到的消息通道。可选的。
10 侦听器通道 (com.rabbitmq.client.Channel) 是否暴露给已注册的ChannelAwareMessageListener. 可选(默认为真)。
11 AmqpHeaderMapper接收 AMQP 消息时使用的引用。可选的。默认情况下,仅将标准 AMQP 属性(例如contentType)复制到 Spring Integration MessageHeaders。默认情况下,AMQP 中的任何用户定义的标头MessageProperties都不会复制到消息中DefaultAmqpHeaderMapper。如果提供了“请求标头名称”,则不允许。
12 要从 AMQP 请求映射到MessageHeaders. 仅当未提供“header-mapper”参考时才能提供。此列表中的值也可以是与标头名称匹配的简单模式(例如“*”或“thing1*、thing2”或“*something”)。
13 AbstractMessageListenerContainer用于接收 AMQP 消息的参考。如果提供了此属性,则不应提供与侦听器容器配置相关的其他属性。换句话说,通过设置此引用,您必须对侦听器容器配置承担全部责任。唯一的例外是它MessageListener本身。由于这实际上是此通道适配器实现的核心职责,因此引用的侦听器容器必须没有自己的MessageListener. 可选的。
14 MessageConverter接收 AMQP 消息时使用的。可选的。
15 MessagePropertiesConverter接收 AMQP 消息时使用的。可选的。
16 AbstractMessageListenerContainer指定应该启动和停止底层证券的阶段。启动顺序从最低到最高,关闭顺序与之相反。默认情况下,此值为Integer.MAX_VALUE,表示此容器尽可能晚地启动并尽快停止。可选的。
17 告诉 AMQP 代理在单个请求中向每个消费者发送多少消息。通常,您可以将此值设置得较高以提高吞吐量。它应该大于或等于事务大小(请参阅tx-size此列表后面的属性)。可选(默认为1)。
18 以毫秒为单位接收超时。可选(默认为1000)。
19 指定底层证券的恢复尝试之间的间隔AbstractMessageListenerContainer(以毫秒为单位)。可选(默认为5000)。
20 如果为“true”并且代理上没有可用的队列,则容器在启动期间会引发致命异常,如果在容器运行时删除队列(在尝试被动声明队列三次之后),则容器将停止。如果false,容器不会抛出异常并进入恢复模式,尝试根据recovery-interval. 可选(默认为true)。
21 AbstractMessageListenerContainer在底层停止之后和 AMQP 连接被强制关闭之前等待工作人员的时间(以毫秒为单位) 。如果有任何工作人员在关闭信号到来时处于活动状态,则只要他们能够在此超时时间内完成,就允许他们完成处理。否则,连接将关闭并且消息保持未被确认(如果通道是事务性的)。可选(默认为5000)。
22 默认情况下,底层AbstractMessageListenerContainer使用一个SimpleAsyncTaskExecutor实现,它为每个任务启动一个新线程,异步运行它。默认情况下,并发线程数是无限的。请注意,此实现不重用线程。考虑使用线程池TaskExecutor实现作为替代方案。可选(默认为SimpleAsyncTaskExecutor)。
23 默认情况下,底层AbstractMessageListenerContainer创建一个新的实例DefaultTransactionAttribute(它采用 EJB 方法在运行时回滚但不检查异常)。可选(默认为DefaultTransactionAttribute)。
24 PlatformTransactionManager设置对底层外部的 bean 引用AbstractMessageListenerContainer。事务管理器与channel-transacted属性一起工作。如果框架在发送或接收消息时已经有一个事务在进行中并且channelTransacted标志为true,则消息事务的提交或回滚将推迟到当前事务结束。如果channelTransacted标志是false,则没有事务语义适用于消息传递操作(它是自动确认的)。有关详细信息,请参阅 Spring AMQP 的事务。可选的。
25 告诉SimpleMessageListenerContainer在单个事务中处理多少消息(如果通道是事务性的)。为获得最佳效果,它应小于或等于 中设置的值prefetch-count。设置“每个队列的消费者”时不允许。可选(默认为1)。
26 指示底层侦听器容器应该是 aDirectMessageListenerContainer而不是默认的SimpleMessageListenerContainer。有关详细信息,请参阅Spring AMQP 参考手册
27 当容器consumerBatchEnabledtrue时,确定适配器如何在消息有效负载中呈现这批消息。当设置为MESSAGES(默认)时,有效负载是 aList<Message<?>>其中每条消息都有从传入 AMQP 映射的标头Message,有效负载是转换后的body. 当设置为EXTRACT_PAYLOADS时,有效负载是从 AMQP主体 List<?>转换元素的地方。类似于,但除此之外,每个消息的标头都从相应索引处的a映射;标题名称是.MessageEXTRACT_PAYLOADS_WITH_HEADERSEXTRACT_PAYLOADSMessagePropertiesList<Map<String, Object>AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS
容器

请注意,使用 XML 配置外部容器时,不能使用 Spring AMQP 命名空间来定义容器。这是因为命名空间至少需要一个<listener/>元素。在此环境中,侦听器位于适配器内部。因此,您必须使用普通的 Spring<bean/>定义来定义容器,如以下示例所示:

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
尽管 Spring Integration JMS 和 AMQP 支持相似,但仍存在重要差异。JMS 入站通道适配器JmsDestinationPollingSource在幕后使用 a 并期望配置轮询器。AMQP 入站通道适配器使用一个AbstractMessageListenerContainer并且是消息驱动的。在这方面,它更类似于 JMS 消息驱动的通道适配器。

从 5.5 版开始,可以使用在内部调用重试操作时使用的策略AmqpInboundChannelAdapter进行配置。有关更多信息,请参阅JavaDocs。org.springframework.amqp.rabbit.retry.MessageRecovererRecoveryCallbacksetMessageRecoverer()

批量消息

有关批处理消息的更多信息,请参阅Spring AMQP 文档

要使用 Spring Integration 生成批处理消息,只需使用BatchingRabbitTemplate.

接收批量消息时,默认情况下,侦听器容器提取每个片段消息,适配器将为Message<?>每个片段生成一个。从版本 5.2 开始,如果容器的deBatchingEnabled属性设置为false,则由适配器执行反批处理,并Message<List<?>>生成一个有效负载是片段有效负载列表的单个(如果适当,在转换后)。

默认BatchingStrategy值为SimpleBatchingStrategy,但这可以在适配器上被覆盖。

org.springframework.amqp.rabbit.retry.MessageBatchRecoverer重试操作需要恢复时,必须与批处理一起使用。

轮询入站通道适配器

概述

5.0.1 版引入了轮询通道适配器,让您可以按需获取单个消息——例如,使用 aMessageSourcePollingTemplate或 poller。有关详细信息,请参阅延迟确认可轮询消息源

它目前不支持 XML 配置。

以下示例显示了如何配置AmqpMessageSource

Java DSL
@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
                    e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
            .handle(p -> {
                ...
            })
            .get();
}
java
@Bean
public AmqpMessageSource source(ConnectionFactory connectionFactory) {
    return new AmqpMessageSource(connectionFactory, "someQueue");
}

有关配置属性,请参阅Javadoc

XML
This adapter currently does not have XML configuration support.

批量消息

请参阅批处理消息

对于轮询适配器,没有侦听器容器,批量消息总是被分批(如果BatchingStrategy支持这样做的话)。

入站网关

入站网关支持入站通道适配器上的所有属性(除了“通道”被“请求通道”替换),以及一些附加属性。以下清单显示了可用的属性:

Java DSL
@Bean // return the upper cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, "foo"))
            .transform(String.class, String::toUpperCase)
            .get();
}
java
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);
    gateway.setRequestChannel(channel);
    gateway.setDefaultReplyTo("bar");
    return gateway;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                    new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("foo");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new AbstractReplyProducingMessageHandler() {

        @Override
        protected Object handleRequestMessage(Message<?> requestMessage) {
            return "reply to " + requestMessage.getPayload();
        }

    };
}
XML
<int-amqp:inbound-gateway
                          id="inboundGateway"                (1)
                          request-channel="myRequestChannel" (2)
                          header-mapper=""                   (3)
                          mapped-request-headers=""          (4)
                          mapped-reply-headers=""            (5)
                          reply-channel="myReplyChannel"     (6)
                          reply-timeout="1000"               (7)
                          amqp-template=""                   (8)
                          default-reply-to="" />             (9)
1 此适配器的唯一 ID。可选的。
2 将转换后的消息发送到的消息通道。必需的。
3 AmqpHeaderMapper接收 AMQP 消息时使用的引用。可选的。默认情况下,只有标准 AMQP 属性(例如contentType)被复制到 Spring Integration 和从 Spring Integration 复制MessageHeaders。默认情况下,AMQP 中的任何用户定义的标头MessageProperties都不会复制到 AMQP 消息或从 AMQP 消息复制DefaultAmqpHeaderMapper。如果提供了“request-header-names”或“reply-header-names”,则不允许。
4 要从 AMQP 请求映射到MessageHeaders. 仅当未提供“header-mapper”引用时才能提供此属性。此列表中的值也可以是与标头名称匹配的简单模式(例如"*"or"thing1*, thing2""*thing1")。
5 MessageHeaders要映射到 AMQP 回复消息的 AMQP 消息属性的名称的逗号分隔列表。所有标准标头(例如contentType)都映射到 AMQP 消息属性,而用户定义的标头映射到“标头”属性。仅当未提供“header-mapper”引用时才能提供此属性。此列表中的值也可以是与标头名称匹配的简单模式(例如,"*""foo*, bar""*foo")。
6 期望回复消息的消息通道。可选的。
7 设置receiveTimeout基础o.s.i.core.MessagingTemplate以从回复通道接收消息。如果未指定,则此属性默认为1000(1 秒)。仅适用于容器线程在发送回复之前移交给另一个线程的情况。
8 定制的AmqpTemplatebean 引用(对要发送的回复消息有更多的控制)。您可以为RabbitTemplate.
9 没有 属性replyTo o.s.amqp.core.Address时使用的。如果未指定此选项,则提供 no ,请求消息中不存在任何属性,并且由于无法路由回复而引发 an。如果未指定此选项并提供了外部选项,则不会引发异常。如果您预计请求消息中不存在任何属性的情况,您必须指定此选项或在该模板上配置默认值。requestMessagereplyToamqp-templatereplyToIllegalStateExceptionamqp-templateexchangeroutingKeyreplyTo

请参阅入站通道适配器中有关配置listener-container属性的注释。

从 5.5 版开始,可以使用在内部调用重试操作时使用的策略AmqpInboundChannelAdapter进行配置。有关更多信息,请参阅JavaDocs。org.springframework.amqp.rabbit.retry.MessageRecovererRecoveryCallbacksetMessageRecoverer()

批量消息

请参阅批处理消息

入站端点确认模式

默认情况下,入站端点使用确认模式,这意味着容器会在下游集成流程完成时自动确认消息(或使用或AUTO将消息传递给另一个线程)。将模式设置为配置消费者,以便根本不使用确认(代理在发送消息后自动确认消息)。将模式设置为让用户代码在处理过程中的某个其他时间点确认消息。为了支持这一点,在这种模式下,端点分别在和标头中提供和。QueueChannelExecutorChannelNONEMANUALChanneldeliveryTagamqp_channelamqp_deliveryTag

您可以对 执行任何有效的 Rabbit 命令,Channel但通常只使用basicAckbasicNack(或basicReject)。为了不干扰容器的操作,您不应保留对通道的引用并仅在当前消息的上下文中使用它。

由于Channel是对“活动”对象的引用,因此如果消息被持久化,它不能被序列化并且会丢失。

以下示例显示了如何使用MANUAL确认:

@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {

    // Do some processing

    if (allOK) {
        channel.basicAck(deliveryTag, false);

        // perhaps do some more processing

    }
    else {
        channel.basicNack(deliveryTag, false, true);
    }
    return someResultForDownStreamProcessing;
}

出站端点

以下出站端点具有许多类似的配置选项。从 5.2 版开始,confirm-timeout添加了 。通常,当发布者确认启用时,代理将快速返回一个 ack(或 nack),该 ack 将发送到适当的通道。如果在收到确认之前关闭了通道,Spring AMQP 框架将合成一个 nack。“丢失”的确认永远不会发生,但是,如果您设置此属性,端点将定期检查它们并在时间过去而没有收到确认的情况下合成一个 nack。

出站通道适配器

以下示例显示了 AMQP 出站通道适配器的可用属性:

Java DSL
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
        MessageChannel amqpOutboundChannel) {
    return IntegrationFlows.from(amqpOutboundChannel)
            .handle(Amqp.outboundAdapter(amqpTemplate)
                        .routingKey("queue1")) // default exchange - route to queue 'queue1'
            .get();
}
java
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}
XML
<int-amqp:outbound-channel-adapter id="outboundAmqp"             (1)
                               channel="outboundChannel"         (2)
                               amqp-template="myAmqpTemplate"    (3)
                               exchange-name=""                  (4)
                               exchange-name-expression=""       (5)
                               order="1"                         (6)
                               routing-key=""                    (7)
                               routing-key-expression=""         (8)
                               default-delivery-mode""           (9)
                               confirm-correlation-expression="" (10)
                               confirm-ack-channel=""            (11)
                               confirm-nack-channel=""           (12)
                               confirm-timeout=""                (13)
                               wait-for-confirm=""               (14)
                               return-channel=""                 (15)
                               error-message-strategy=""         (16)
                               header-mapper=""                  (17)
                               mapped-request-headers=""         (18)
                               lazy-connect="true"               (19)
                               multi-send="false"/>              (20)
1 此适配器的唯一 ID。可选的。
2 消息应该发送到的消息通道,以便将它们转换并发布到 AMQP 交换。必需的。
3 对配置的 AMQP 模板的 Bean 引用。可选(默认为amqpTemplate)。
4 向其发送消息的 AMQP 交换的名称。如果未提供,则将消息发送到默认的无名交换器。与“exchange-name-expression”互斥。可选的。
5 一个 SpEL 表达式,用于确定向其发送消息的 AMQP 交换的名称,消息作为根对象。如果未提供,则将消息发送到默认的无名交换器。与“exchange-name”互斥。可选的。
6 注册多个消费者时此消费者的顺序,从而启用负载平衡和故障转移。可选(默认为Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7 发送消息时使用的固定路由键。默认情况下,这是一个空的String. 与“路由键表达式”互斥。可选的。
8 一个 SpEL 表达式,用于确定发送消息时要使用的路由键,将消息作为根对象(例如,'payload.key')。默认情况下,这是一个空的String. 与“路由键”互斥。可选的。
9 消息的默认传递模式:PERSISTENTNON_PERSISTENT。如果header-mapper设置了交付模式,则被覆盖。如果存在 Spring Integration 消息头amqp_deliveryMode,则DefaultHeaderMapper设置该值。MessagePropertiesConverter如果未提供此属性并且标头映射器未设置它,则默认值取决于RabbitTemplate. 如果根本没有自定义,则默认为PERSISTENT. 可选的。
10 定义相关数据的表达式。如果提供,这会将底层 AMQP 模板配置为接收发布者确认。需要一个 dedicatedRabbitTemplate和一个CachingConnectionFactory属性publisherConfirms设置为true. 当收到发布者确认并提供相关数据时,将根据确认类型将其写入confirm-ack-channel或。confirm-nack-channel确认的有效负载是相关数据,如该表达式所定义。该消息的“amqp_publishConfirm”标头设置为true( ack) 或false( nack)。示例:headers['myCorrelationData']payload。4.1 版引入了amqp_publishConfirmNackCause消息头。它包含cause发布者确认的“nack”。从 4.2 版开始,如果表达式解析为Message<?>实例(例如#this),则在ack/nack通道上发出的消息基于该消息,并添加了额外的标头。以前,无论类型如何,都会使用关联数据作为其有效负载创建新消息。另请参阅发布者确认和退货的替代机制。可选的。
11 ack肯定 ( ) 发布者确认发送到的通道。有效载荷是由confirm-correlation-expression. 如果表达式是#rootor #this,则消息是从原始消息构建的,并且amqp_publishConfirm标头设置为true。另请参阅发布者确认和退货的替代机制。可选(默认为nullChannel)。
12 nack向其发送否定 ( ) 发布者确认的通道。有效载荷是由定义的相关数据confirm-correlation-expression(如果没有ErrorMessageStrategy配置)。如果表达式是#rootor #this,则消息是从原始消息构建的,并且amqp_publishConfirm标头设置为false。当有 时ErrorMessageStrategy,消息是ErrorMessage带有NackedAmqpMessageException有效负载的。另请参阅发布者确认和退货的替代机制。可选(默认为nullChannel)。
13 设置后,如果在毫秒时间内未收到发布者确认,适配器将合成否定确认 (nack)。每 50% 的该值检查一次未决确认,因此发送 nack 的实际时间将在该值的 1x 和 1.5x 之间。另请参阅发布者确认和退货的替代机制。默认无(不会生成nacks)。
14 当设置为 true 时,调用线程将阻塞,等待发布者确认。这需要一个RabbitTemplate配置为 Confirms 以及一个confirm-correlation-expression. 线程最多会阻塞confirm-timeout(或默认为 5 秒)。如果发生超时,MessageTimeoutException将抛出 a。如果启用返回并返回一条消息,或者在等待确认时发生任何其他异常,MessageHandlingException则将抛出 a 并带有适当的消息。
15 返回消息发送到的通道。提供时,底层 AMQP 模板配置为将无法传递的消息返回到适配器。如果没有ErrorMessageStrategy配置,则从 AMQP 接收到的数据构造消息,并带有以下附加标头:amqp_returnReplyCode, amqp_returnReplyText, amqp_returnExchange, amqp_returnRoutingKey。当有 时ErrorMessageStrategy,消息是ErrorMessage带有ReturnedAmqpMessageException有效负载的。另请参阅发布者确认和退货的替代机制。可选的。
16 对用于在发送返回或否定确认消息时ErrorMessageStrategy构建实例的实现的引用。ErrorMessage
17 AmqpHeaderMapper发送 AMQP 消息时使用的引用。默认情况下,仅将标准 AMQP 属性(例如contentType)复制到 Spring Integration MessageHeaders。默认情况下,任何用户定义的标头都不会复制到消息中。DefaultAmqpHeaderMapper。如果提供了“请求标头名称”,则不允许。可选的。
18 要从 映射MessageHeaders到 AMQP 消息的 AMQP 标头名称的逗号分隔列表。如果提供了“header-mapper”引用,则不允许。此列表中的值也可以是与标头名称匹配的简单模式(例如"*"or"thing1*, thing2""*thing1")。
19 当设置为false时,端点会在应用程序上下文初始化期间尝试连接到代理。这允许“快速失败”检测错误配置,但如果代理关闭,也会导致初始化失败。当true(默认值),在发送第一条消息时建立连接(如果它不存在,因为某些其他组件建立了它)。
20 当设置为true时,类型的有效负载Iterable<Message<?>>将在单个RabbitTemplate调用范围内的同一通道上作为离散消息发送。需要一个RabbitTemplate. 当wait-for-confirms为真时,RabbitTemplate.waitForConfirmsOrDie()在消息发送后调用。使用事务模板,发送将在新事务或已启动的事务(如果存在)中执行。
返回通道

使用 areturn-channel需要 aRabbitTemplatemandatory属性设置为true和 aCachingConnectionFactorypublisherReturns属性设置为true。当使用带返回的多个出站端点时,RabbitTemplate每个端点都需要一个单独的端点。

出站网关

以下清单显示了 AMQP 出站网关的可能属性:

Java DSL
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
    return f -> f.handle(Amqp.outboundGateway(amqpTemplate)
                    .routingKey("foo")) // default exchange - route to queue 'foo'
            .get();
}

@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")
public interface MyGateway {

    String sendToRabbit(String data);

}
java
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setExpectReply(true);
    outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}

@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {

    String sendToRabbit(String data);

}
XML
<int-amqp:outbound-gateway id="outboundGateway"               (1)
                           request-channel="myRequestChannel" (2)
                           amqp-template=""                   (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           error-message-strategy=""          (18)
                           lazy-connect="true" />             (19)
1 此适配器的唯一 ID。可选的。
2 将消息发送到的消息通道,以便将它们转换并发布到 AMQP 交换。必需的。
3 对配置的 AMQP 模板的 Bean 引用。可选(默认为amqpTemplate)。
4 应该向其发送消息的 AMQP 交换的名称。如果未提供,则将消息发送到默认的无名 cxchange。与“exchange-name-expression”互斥。可选的。
5 一个 SpEL 表达式,用于确定应该向其发送消息的 AMQP 交换的名称,消息作为根对象。如果未提供,则将消息发送到默认的无名交换器。与“exchange-name”互斥。可选的。
6 注册多个消费者时此消费者的顺序,从而启用负载平衡和故障转移。可选(默认为Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7 从 AMQP 队列接收并转换后应将回复发送到的消息通道。可选的。
8 网关在发送回复消息到reply-channel. 这仅适用于reply-channelcan 阻塞的情况——例如QueueChannel当前已满的容量限制。默认为无穷大。
9 时,如果在属性true内没有收到回复消息,网关将引发异常。AmqpTemplate’s `replyTimeout默认为true.
10 routing-key发送消息时使用的。默认情况下,这是一个空的String. 与“路由键表达式”互斥。可选的。
11 一个 SpEL 表达式,用于确定routing-key在发送消息时使用的,消息作为根对象(例如,'payload.key')。默认情况下,这是一个空的String. 与“路由键”互斥。可选的。
12 消息的默认传递模式:PERSISTENTNON_PERSISTENT。如果header-mapper设置了交付模式,则被覆盖。如果存在 Spring Integration 消息头amqp_deliveryMode,则DefaultHeaderMapper设置该值。MessagePropertiesConverter如果未提供此属性并且标头映射器未设置它,则默认值取决于RabbitTemplate. 如果根本没有自定义,则默认为PERSISTENT. 可选的。
13 从 4.2 版开始。定义相关数据的表达式。提供时,这会将底层 AMQP 模板配置为接收发布者确认。需要一个 dedicatedRabbitTemplate和一个CachingConnectionFactory属性publisherConfirms设置为true. 当接收到发布者确认并提供相关数据时,将根据确认类型将其写入confirm-ack-channel或。confirm-nack-channel确认的有效负载是相关数据,如该表达式所定义。该消息的标头“amqp_publishConfirm”设置为true( ack) 或false( nack)。为了nack确认,Spring Integration 提供了一个额外的 header amqp_publishConfirmNackCause。示例:headers['myCorrelationData']payload。如果表达式解析为一个Message<?>实例(例如#this),在ack/nack通道上发出的消息基于该消息,并添加了额外的标头。以前,无论类型如何,都会使用关联数据作为其有效负载创建新消息。另请参阅发布者确认和退货的替代机制。可选的。
14 ack向其发送肯定 ( ) 发布者确认的通道。有效载荷是定义的相关数据confirm-correlation-expression。如果表达式是#rootor #this,则消息是从原始消息构建的,并且amqp_publishConfirm标头设置为true。另请参阅发布者确认和退货的替代机制。可选(默认为nullChannel)。
15 nack向其发送否定 ( ) 发布者确认的通道。有效载荷是定义的相关数据confirm-correlation-expression(如果没有ErrorMessageStrategy配置)。如果表达式是#rootor #this,则消息是从原始消息构建的,并且amqp_publishConfirm标头设置为false。当有 时ErrorMessageStrategy,消息是ErrorMessage带有NackedAmqpMessageException有效负载的。另请参阅发布者确认和退货的替代机制。可选(默认为nullChannel)。
16 设置后,如果在毫秒时间内未收到发布者确认,网关将合成否定确认 (nack)。每 50% 的该值检查一次未决确认,因此发送 nack 的实际时间将在该值的 1x 和 1.5x 之间。默认无(不会生成 nack)。
17 返回消息发送到的通道。提供时,底层 AMQP 模板配置为将无法传递的消息返回到适配器。当没有ErrorMessageStrategy配置时,消息是根据从 AMQP 接收到的数据构造的,带有以下附加标头:amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey。当有 时ErrorMessageStrategy,消息是ErrorMessage带有ReturnedAmqpMessageException有效负载的。另请参阅发布者确认和退货的替代机制。可选的。
18 对用于在发送返回或否定确认消息时ErrorMessageStrategy构建实例的实现的引用。ErrorMessage
19 当设置为false时,端点会在应用程序上下文初始化期间尝试连接到代理。这允许通过在代理关闭时记录错误消息来“快速”检测错误配置。当true(默认值),在发送第一条消息时建立连接(如果它不存在,因为某些其他组件建立了它)。
返回通道

使用 areturn-channel需要 aRabbitTemplatemandatory属性设置为true和 aCachingConnectionFactorypublisherReturns属性设置为true。当使用带返回的多个出站端点时,RabbitTemplate每个端点都需要一个单独的端点。

底层证券AmqpTemplate的默认replyTimeout值为五秒。如果您需要更长的超时时间,则必须在template.

请注意,出站适配器和出站网关配置之间的唯一区别是 expectReply属性的设置。

异步出站网关

上一节中讨论的网关是同步的,因为发送线程被挂起,直到收到回复(或发生超时)。Spring Integration 4.3 版添加了一个异步网关,它使用AsyncRabbitTemplate来自 Spring AMQP。当发送消息时,线程在发送操作完成后立即返回,并且,当接收到消息时,在模板的侦听器容器线程上发送回复。当在轮询线程上调用网关时,这可能很有用。线程被释放并且可用于框架中的其他任务。

以下清单显示了 AMQP 异步出站网关的可能配置选项:

Java DSL
@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
java
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
XML
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)
                           request-channel="myRequestChannel" (2)
                           async-template=""                  (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           lazy-connect="true" />             (18)
1 此适配器的唯一 ID。可选的。
2 消息应该发送到的消息通道,以便将它们转换并发布到 AMQP 交换。必需的。
3 Bean 对已配置的AsyncRabbitTemplate. 可选(默认为asyncRabbitTemplate)。
4 应该向其发送消息的 AMQP 交换的名称。如果未提供,则将消息发送到默认的无名交换器。与“exchange-name-expression”互斥。可选的。
5 一个 SpEL 表达式,用于确定向其发送消息的 AMQP 交换的名称,消息作为根对象。如果未提供,则将消息发送到默认的无名交换器。与“exchange-name”互斥。可选的。
6 注册多个消费者时此消费者的顺序,从而启用负载平衡和故障转移。可选(默认为Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7 从 AMQP 队列接收并转换后应将回复发送到的消息通道。可选的。
8 网关在发送回复消息到reply-channel. 这仅适用于reply-channelcan 阻塞的情况——例如QueueChannel当前已满的容量限制。默认值为无穷大。
9 如果在属性内未收到回复消息AsyncRabbitTemplate’s `receiveTimeout且此设置为true,网关将向入站消息的标头发送错误消息errorChannel。当在属性内没有收到回复消息AsyncRabbitTemplate’s `receiveTimeout并且此设置为false时,网关将错误消息发送到默认值errorChannel(如果可用)。它默认为true.
10 发送消息时使用的路由键。默认情况下,这是一个空的String. 与“路由键表达式”互斥。可选的。
11 一个 SpEL 表达式,用于确定发送消息时使用的路由键,消息作为根对象(例如,'payload.key')。默认情况下,这是一个空的String. 与“路由键”互斥。可选的。
12 消息的默认传递模式:PERSISTENTNON_PERSISTENT。如果header-mapper设置了交付模式,则被覆盖。如果存在 Spring Integration 消息头 ( amqp_deliveryMode),则DefaultHeaderMapper设置该值。MessagePropertiesConverter如果未提供此属性并且标头映射器未设置它,则默认值取决于RabbitTemplate. 如果未自定义,则默认为PERSISTENT. 可选的。
13 定义相关数据的表达式。如果提供,这会将底层 AMQP 模板配置为接收发布者确认。需要一个 dedicatedRabbitTemplate和 a CachingConnectionFactory,其publisherConfirms属性设置为true。当收到发布者确认并提供相关数据时,确认将写入confirm-ack-channelconfirm-nack-channel,具体取决于确认类型。确认的有效负载是此表达式定义的相关数据,并且消息的“amqp_publishConfirm”标头设置为true( ack) 或false( nack)。例如nack,提供了一个额外的标题 ( amqp_publishConfirmNackCause)。例子:headers['myCorrelationData'], payload. 如果表达式解析为Message<?>实例(例如“#this”),在ack/nack通道上发出的消息基于该消息,并添加了额外的标头。另请参阅发布者确认和退货的替代机制。可选的。
14 ack向其发送肯定 ( ) 发布者确认的通道。有效载荷是由confirm-correlation-expression. 要求底层证券AsyncRabbitTemplateenableConfirms属性设置为true。另请参阅发布者确认和退货的替代机制。可选(默认为nullChannel)。
15 从 4.2 版开始。nack向其发送否定 ( ) 发布者确认的通道。有效载荷是由confirm-correlation-expression. 要求底层证券AsyncRabbitTemplateenableConfirms属性设置为true。另请参阅发布者确认和退货的替代机制。可选(默认为nullChannel)。
16 设置后,如果在毫秒时间内未收到发布者确认,网关将合成否定确认 (nack)。每 50% 的该值检查一次未决确认,因此发送 nack 的实际时间将在该值的 1x 和 1.5x 之间。另请参阅发布者确认和退货的替代机制。默认无(不会生成nacks)。
17 返回消息发送到的通道。提供时,底层 AMQP 模板配置为将无法传递的消息返回到网关。该消息是根据从 AMQP 接收的数据构造的,带有以下附加标头:amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey。要求底层证券AsyncRabbitTemplatemandatory属性设置为true。另请参阅发布者确认和退货的替代机制。可选的。
18 当设置为false时,端点会在应用程序上下文初始化期间尝试连接到代理。这样做可以通过在代理关闭时记录错误消息来“快速”检测错误配置。当true(默认值),在发送第一条消息时建立连接(如果它不存在,因为某些其他组件建立了它)。

另请参阅异步服务激活器以获取更多信息。

兔子模板

当您使用确认和退货时,我们建议您使用专用的RabbitTemplate接线。AsyncRabbitTemplate否则,可能会遇到意想不到的副作用。

出版商确认和退货的替代机制

当为发布者确认和返回配置连接工厂时,上面的部分讨论了配置消息通道以异步接收确认和返回。从 5.4 版开始,有一个额外的机制通常更容易使用。

在这种情况下,不要配置confirm-correlation-expression确认和返回通道。相反,在标题中添加一个CorrelationData实例;然后,您可以稍后通过检查您已发送消息AmqpHeaders.PUBLISH_CONFIRM_CORRELATION的实例中的未来状态来等待结果。CorrelationData在未来完成之前,该returnedMessage字段将始终被填充(如果返回消息)。

CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returns
someFlow.getInputChannel().send(MessageBuilder.withPayload("test")
        .setHeader("rk", "someKeyThatWontRoute")
        .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
        .build());
...
try {
    Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS);
    Message returned = corr.getReturnedMessage();
    if (returned !- null) {
        // message could not be routed
    }
}
catch { ... }

为了提高性能,您可能希望发送多条消息并稍后等待确认,而不是一次一条。返回的消息是转换后的原始消息;您可以CorrelationData使用所需的任何其他数据进行子类化。

入站消息转换

到达通道适配器或网关的入站消息spring-messaging Message<?>使用消息转换器转换为有效负载。默认情况下,使用 aSimpleMessageConverter来处理 java 序列化和文本。DefaultHeaderMapper.inboundMapper()默认情况下使用 映射标头。如果发生转换错误,并且没有定义错误通道,则将异常抛出到容器并由侦听器容器的错误处理程序处理。默认错误处理程序将转换错误视为致命错误,并且消息将被拒绝(并路由到死信交换,如果队列是这样配置的)。如果定义了错误通道,则ErrorMessage有效负载是ListenerExecutionFailedException带有属性failedMessage(无法转换的 Spring AMQP 消息)和cause. 如果容器AcknowledgeModeAUTO(默认)并且错误流消耗错误而不抛出异常,原始消息将被确认。如果错误流抛出异常,异常类型连同容器的错误处理程序将决定消息是否被重新排队。如果容器配置了AcknowledgeMode.MANUAL,则有效负载是ManualAckListenerExecutionFailedException具有附加属性的 achanneldeliveryTag。这使错误流能够为消息调用basicAckbasicNack(或basicReject),以控制其处置。

出站消息转换

Spring AMQP 1.4 引入了ContentTypeDelegatingMessageConverter,根据传入的内容类型消息属性选择实际转换器。这可以由入站端点使用。

从 Spring Integration 版本 4.3 开始,您也可以使用ContentTypeDelegatingMessageConverteron 出站端点,contentType标头指定使用哪个转换器。

下面的示例配置一个ContentTypeDelegatingMessageConverter,默认转换器是SimpleMessageConverter(处理 Java 序列化和纯文本),以及一个 JSON 转换器:

<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
                               exchange-name="someExchange"
                               routing-key="someKey"
                               amqp-template="amqpTemplateContentTypeConverter" />

<int:channel id="ctRequestChannel"/>

<rabbit:template id="amqpTemplateContentTypeConverter"
        connection-factory="connectionFactory" message-converter="ctConverter" />

<bean id="ctConverter"
        class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json">
                <bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />
            </entry>
        </map>
    </property>
</bean>

将消息发送到ctRequestChannelcontentType头设置为application/json会导致选择 JSON 转换器。

这适用于出站通道适配器和网关。

从版本 5.0 开始,添加到MessageProperties出站邮件的标头永远不会被映射的标头覆盖(默认情况下)。以前,只有消息转换器是 a 时才会出现这种情况ContentTypeDelegatingMessageConverter(在这种情况下,首先映射标头以便可以选择正确的转换器)。对于其他转换器,例如SimpleMessageConverter,映射的标头会覆盖转换器添加的任何标头。当出站消息有一些剩余的contentType标头(可能来自入站通道适配器)并且正确的出站contentType被错误地覆盖时,这会导致问题。解决方法是在将消息发送到出站端点之前使用标头过滤器删除标头。

但是,在某些情况下,需要前面的行为——例如,当一个String包含 JSON 的有效负载时,SimpleMessageConverter它不知道内容并将contentTypemessage 属性设置为,text/plain但您的应用程序希望application/json通过设置contentTypeheader来覆盖它发送到出站端点的消息。正是这样ObjectToJsonTransformer做的(默认情况下)。

现在有一个headersMappedLast在出站通道适配器和网关(以及 AMQP 支持的通道)上调用的属性。将此设置为true恢复覆盖转换器添加的属性的行为。

从版本 5.1.9 开始,当我们产生回复并希望覆盖由转换器填充的标头时,replyHeadersMappedLast提供了类似的功能。AmqpInboundGateway有关更多信息,请参阅其 JavaDocs。

出站用户 ID

Spring AMQP 1.6 版引入了一种机制,允许为出站消息指定默认用户 ID。始终可以设置AmqpHeaders.USER_ID标题,现在它优先于默认值。这可能对消息接收者有用。对于入站消息,如果消息发布者设置了该属性,则它在AmqpHeaders.RECEIVED_USER_ID标头中可用。请注意,RabbitMQ验证用户 ID 是连接的实际用户 ID,或者连接允许模拟

要为出站消息配置默认用户 ID,请在 a 上配置它RabbitTemplate并将出站适配器或网关配置为使用该模板。同样,要在回复中设置用户 ID 属性,请将适当配置的模板注入入站网关。有关更多信息,请参阅Spring AMQP 文档

延迟消息交换

Spring AMQP 支持RabbitMQ 延迟消息交换插件。对于入站消息,x-delay标头映射到AmqpHeaders.RECEIVED_DELAY标头。设置标头会导致在出站消息中设置AMQPHeaders.DELAY相应的标头。x-delay您还可以在出站端点上指定delaydelayExpression属性(delay-expression使用 XML 配置时)。这些属性优先于AmqpHeaders.DELAY标题。

AMQP 支持的消息通道

有两种可用的消息通道实现。一种是点对点,另一种是发布-订阅。AmqpTemplate这两个通道都为底层和 网关提供了广泛的配置属性SimpleMessageListenerContainer(如本章前面的通道适配器和网关所示)。但是,我们在此处展示的示例具有最少的配置。探索 XML 模式以查看可用属性。

点对点通道可能类似于以下示例:

<int-amqp:channel id="p2pChannel"/>

在幕后,前面的例子导致一个Queue命名si.p2pChannel的被声明,并且这个通道发送到那个Queue(从技术上讲,通过发送到一个与这个名称匹配的路由键的无名直接交换Queue)。该频道还注册了一个消费者Queue。如果您希望通道是“可轮询的”而不是消息驱动的,请为message-driven标志提供值false,如以下示例所示:

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

发布-订阅通道可能如下所示:

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在幕后,前面的示例导致si.fanout.pubSubChannel声明一个名为的扇出交换,并且该通道发送到该扇出交换。该通道还声明了一个服务器命名的独占、自动删除、非持久Queue,并将其绑定到扇出交换,同时在其上注册消费者Queue以接收消息。发布-订阅-通道没有“可轮询”选项。它必须是消息驱动的。

从 4.1 版开始,AMQP 支持的消息通道(与 结合使用)channel-transacted支持 template-channel-transacted单独transactional配置AbstractMessageListenerContainer. RabbitTemplate请注意,以前channel-transactedtrue默认设置。现在,默认情况下,它false用于AbstractMessageListenerContainer.

在 4.3 版之前,AMQP 支持的通道仅支持带有Serializable有效负载和标头的消息。整个消息被转换(序列化)并发送到 RabbitMQ。现在,您可以将extract-payload属性(或setExtractPayload()在使用 Java 配置时)设置为true. 当此标志为true时,将转换消息有效负载并映射标头,其方式类似于您使用通道适配器时的方式。这种安排允许 AMQP 支持的通道与不可序列化的有效负载一起使用(可能与另一个消息转换器一起使用,例如Jackson2JsonMessageConverter. 有关默认映射标头的更多信息,请参阅AMQP 消息标头。您可以通过提供使用outbound-header-mapperinbound-header-mapper属性的自定义映射器来修改映射。您现在还可以指定一个default-delivery-mode,用于设置没有amqp_deliveryModeheader时的投递方式。默认情况下,Spring AMQPMessageProperties使用PERSISTENT交付模式。

与其他支持持久性的通道一样,支持 AMQP 的通道旨在提供消息持久性以避免消息丢失。它们不打算将工作分配给其他对等应用程序。为此,请改用通道适配器。
从 5.0 版开始,可轮询通道现在阻塞指定的轮询线程receiveTimeout(默认为 1 秒)。以前,与其他PollableChannel实现不同,如果没有可用消息,线程会立即返回调度程序,而不管接收超时。阻塞比使用 abasicGet()来检索消息(没有超时)要贵一点,因为必须创建一个消费者来接收每条消息。要恢复以前的行为,请将轮询器设置receiveTimeout为 0。

使用 Java 配置进行配置

以下示例显示如何使用 Java 配置配置通道:

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 进行配置

以下示例显示如何使用 Java DSL 配置通道:

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}

AMQP 消息头

概述

Spring Integration AMQP 适配器自动映射所有 AMQP 属性和标头。(这是对 4.3 的更改 - 以前,仅映射了标准标头)。默认情况下,这些属性MessageHeaders通过使用 DefaultAmqpHeaderMapper.

您可以传入您自己的 AMQP 特定标头映射器的实现,因为适配器具有支持这样做的属性。

除非MessageProperties被. requestHeaderNames_ replyHeaderNames_ DefaultAmqpHeaderMapper默认情况下,对于出站映射器,不映射任何x-*标头。请参阅本节后面出现的警告以了解原因。

要覆盖默认值并恢复到 4.3 之前的行为,请在属性中使用STANDARD_REQUEST_HEADERS和 。STANDARD_REPLY_HEADERS

映射用户定义的标头时,值还可以包含要匹配 的简单通配符模式(例如thing*or )。匹配所有标题 *thing*

从版本 4.1 开始,AbstractHeaderMapperDefaultAmqpHeaderMapper超类)允许为and属性NON_STANDARD_HEADERS配置令牌(除了现有的and )以映射所有用户定义的标头。requestHeaderNamesreplyHeaderNamesSTANDARD_REQUEST_HEADERSSTANDARD_REPLY_HEADERS

该类org.springframework.amqp.support.AmqpHeaders标识了以下使用的默认标头DefaultAmqpHeaderMapper

  • amqp_appId

  • amqp_clusterId

  • amqp_contentEncoding

  • amqp_contentLength

  • content-type(见标题contentType

  • amqp_correlationId

  • amqp_delay

  • amqp_deliveryMode

  • amqp_deliveryTag

  • amqp_expiration

  • amqp_messageCount

  • amqp_messageId

  • amqp_receivedDelay

  • amqp_receivedDeliveryMode

  • amqp_receivedExchange

  • amqp_receivedRoutingKey

  • amqp_redelivered

  • amqp_replyTo

  • amqp_timestamp

  • amqp_type

  • amqp_userId

  • amqp_publishConfirm

  • amqp_publishConfirmNackCause

  • amqp_returnReplyCode

  • amqp_returnReplyText

  • amqp_returnExchange

  • amqp_returnRoutingKey

  • amqp_channel

  • amqp_consumerTag

  • amqp_consumerQueue

如本节前面所述,使用头映射模式*是复制所有头的常用方法。但是,这可能会产生一些意想不到的副作用,因为某些 RabbitMQ 专有属性/标头也会被复制。例如,当您使用federation时,接收到的消息可能有一个名为 的属性x-received-from,其中包含发送消息的节点。如果您使用通配符*对于入站网关上的请求和回复标头映射,会复制此标头,这可能会导致一些联合问题。此回复消息可能会被联合回发送代理,它可能认为消息正在循环,因此会默默地丢弃它。如果您希望使用通配符标头映射的便利,您可能需要过滤掉下游流中的一些标头。例如,为了避免将x-received-from标头复制回回复,您可以<int:header-filter …​ header-names="x-received-from">在将回复发送到 AMQP 入站网关之前使用。或者,您可以明确列出您实际想要映射的那些属性,而不是使用通配符。由于这些原因,对于入站消息,映射器(默认情况下)不映射任何x-*标头。它也没有映射deliveryModeamqp_deliveryMode标头,以避免该标头从入站消息传播到出站消息。相反,此标头映射到amqp_receivedDeliveryMode未映射到输出的 。

从版本 4.3 开始,标头映射中的模式可以通过在模式前面加上!. 否定模式获得优先权,因此诸如STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1不映射thing1(nor thing2nor thing3) 之类的列表。标准标题加上badqux被映射。否定技术可能很有用,例如当 JSON 反序列化逻辑在接收器下游以不同方式完成时,不映射传入消息的 JSON 类型标头。为此,!json_*应为入站通道适配器/网关的标头映射器配置模式。

!如果您有一个以您希望映射 的开头的用户定义标头,则需要使用 对其进行转义\,如下所示STANDARD_REQUEST_HEADERS,\!myBangHeader:命名的标头!myBangHeader现在已映射。
从版本 5.1 开始,如果出站消息上不存在相应的或标头,则将分别DefaultAmqpHeaderMapper回退到映射MessageHeaders.IDMessageHeaders.TIMESTAMPMessageProperties.messageId和。入站属性将像以前一样映射到标头。当消息消费者使用有状态重试时 ,填充属性很有用。MessageProperties.timestampamqp_messageIdamqp_timestampamqp_*messageId

contentType标题_

与其他标题不同,AmqpHeaders.CONTENT_TYPE不以amqp_;为前缀 这允许跨不同技术透明地传递 contentType 标头。例如,发送到 RabbitMQ 队列的入站 HTTP 消息。

contentType头映射到 Spring AMQP 的MessageProperties.contentType属性,随后映射到 RabbitMQ 的content_type属性。

在 5.1 版本之前,此标头也被映射为映射中的条目MessageProperties.headers;这是不正确的,此外,由于基础 Spring AMQP 消息转换器可能已更改内容类型,因此该值可能是错误的。这样的更改将反映在第一类content_type属性中,但不会反映在 RabbitMQ 标头映射中。入站映射忽略了标头映射值。 contentType不再映射到标题映射中的条目。

严格的消息排序

本节介绍入站和出站消息的消息排序。

入站

如果您需要对入站消息进行严格排序,则必须将入站侦听器容器的prefetchCount属性配置为1. 这是因为,如果消息失败并重新传递,它会在现有预取消息之后到达。从 Spring AMQP 2.0 版开始,prefetchCount默认250为提高性能。严格的订购要求是以降低性能为代价的。

出境

考虑以下集成流程:

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlows.from(Gateway.class)
            .split(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

假设我们发送消息A,BC到网关。虽然消息A, B,很可能C是按顺序发送的,但不能保证。这是因为模板为每个发送操作从缓存中“借用”了一个通道,并且不能保证每个消息都使用相同的通道。一种解决方案是在拆分器之前启动事务,但是在 RabbitMQ 中事务的成本很高,并且会降低数百倍的性能。

为了更有效地解决这个问题,从 5.1 版本开始,Spring Integration 提供了BoundRabbitChannelAdvice一个HandleMessageAdvice. 请参阅处理消息通知。在拆分器之前应用时,它确保所有下游操作都在同一通道上执行,并且可以选择等待,直到收到所有已发送消息的发布者确认(如果连接工厂配置为确认)。下面的例子展示了如何使用BoundRabbitChannelAdvice

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlows.from(Gateway.class)
            .split(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

请注意,在通知和出站适配器中使用了相同的RabbitTemplate(实现)。RabbitOperations通知在模板的方法中运行下游流,invoke以便所有操作在同一通道上运行。如果提供了可选的超时,当流程完成时,通知会调用该waitForConfirmsOrDie方法,如果在指定时间内未收到确认,则会引发异常。

QueueChannel下游流( 、、ExecutorChannel等) 中不得有线程切换。

AMQP 样本

目前,一个示例通过使用出站通道适配器和入站通道适配器来演示 Spring Integration AMQP 适配器的基本功能。由于示例中的 AMQP 代理实现使用RabbitMQ

为了运行该示例,您需要一个正在运行的 RabbitMQ 实例。仅具有基本默认值的本地安装就足够了。详细的 RabbitMQ 安装过程见https://www.rabbitmq.com/install.html

启动示例应用程序后,在命令提示符处输入一些文本,然后将包含该输入文本的消息发送到 AMQP 队列。作为回报,该消息由 Spring Integration 检索并打印到控制台。

下图说明了此示例中使用的基本 Spring Integration 组件集。

spring 集成 amqp 示例图
图 1. AMQP 示例的 Spring Integration 图

1. see XML Configuration