JMS 支持

Spring Integration 提供了用于接收和发送 JMS 消息的通道适配器。

您需要将此依赖项包含到您的项目中:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
    <version>5.5.13</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-jms:5.5.13"

javax.jms:javax.jms-api必须通过一些特定于 JMS 供应商的实现来显式添加,例如 Apache ActiveMQ 。

实际上有两个基于 JMS 的入站通道适配器。第一个使用 Spring 的JmsTemplate基于轮询周期的接收。第二种是“消息驱动”,依赖于 SpringMessageListener容器。出站通道适配器使用JmsTemplate来按需转换和发送 JMS 消息。

通过使用JmsTemplateMessageListener容器,Spring Integration 依赖于 Spring 的 JMS 支持。理解这一点很重要,因为这些适配器上公开的大多数属性都配置了底层JmsTemplateMessageListener容器。有关容器的更多详细信息JmsTemplateMessageListener请参阅Spring JMS 文档

尽管 JMS 通道适配器旨在用于单向消息传递(仅发送或仅接收),但 Spring Integration 还为请求和回复操作提供了入站和出站 JMS 网关。入站网关依赖于 Spring 的一种MessageListener容器实现来进行消息驱动的接收。它还能够根据reply-to接收到的消息向目的地发送返回值。request-destination出站网关向(orrequest-destination-name或)发送 JMS 消息request-destination-expression,然后接收回复消息。您可以显式配置reply-destination引用(或reply-destination-namereply-destination-expression)。否则,出站网关使用 JMS TemporaryQueue

在 Spring Integration 2.2 之前,如有必要,TemporaryQueue会为每个请求或回复创建(并删除)一个。从 Spring Integration 2.2 开始,您可以将出站网关配置为使用MessageListener容器来接收回复,而不是直接使用新的(或缓存的)Consumer来接收每个请求的回复。当这样配置并且没有提供明确的回复目的地时,TemporaryQueue每个网关使用一个,而不是每个请求使用一个。

入站通道适配器

入站通道适配器需要对单个JmsTemplate实例或 aConnectionFactory和 a的引用Destination(您可以提供“destinationName”来代替“destination”引用)。以下示例定义了一个带有Destination引用的入站通道适配器:

Java DSL
@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(
                    Jms.inboundAdapter(connectionFactory)
                       .destination("inQueue"),
                    e -> e.poller(poller -> poller.fixedRate(30000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
科特林 DSL
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
    integrationFlow(
            Jms.inboundAdapter(connectionFactory).destination("inQueue"),
            { poller { Pollers.fixedRate(30000) } })
       {
            handle { m -> println(m.payload) }
       }
java
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
    JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
    source.setDestinationName("inQueue");
    return source;
}
XML
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
    <int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
从前面的配置中可以看出,这inbound-channel-adapter是一个轮询消费者。这意味着它receive()在触发时调用。您应该仅在轮询相对不频繁且及时性不重要的情况下使用此选项。对于所有其他情况(绝大多数基于 JMS 的用例),message-driven-channel-adapter稍后描述)是更好的选择。
默认情况下,所有需要引用的 JMS 适配器都会ConnectionFactory自动查找名为jmsConnectionFactory. 这就是为什么您connection-factory在许多示例中看​​不到属性的原因。但是,如果您的 JMSConnectionFactory具有不同的 bean 名称,则需要提供该属性。

如果extract-payload设置为true(默认值),则接收到的 JMS 消息通过MessageConverter. 当依赖 defaultSimpleMessageConverter时,这意味着生成的 Spring Integration Message 将 JMS 消息的主体作为其有效负载。JMSTextMessage生成基于字符串的有效负载,JMSBytesMessage生成字节数组有效负载,而 JMS 的可序列化实例ObjectMessage成为 Spring Integration 消息的有效负载。如果您希望将原始 JMS 消息作为 Spring Integration 消息的有效负载,请将extractPayload选项设置为false.

从版本 5.0.8 开始, and 的默认值为(receive-timeout-1等待),否则为 1 秒。JMS 入站通道适配器根据提供的和选项创建一个。如果需要外部(例如在 Spring Boot 环境中),或者没有缓存,或者没有,建议设置是否期望非阻塞消耗:org.springframework.jms.connection.CachingConnectionFactorycacheConsumersDynamicJmsTemplateConnectionFactoryJmsTemplateConnectionFactorycacheConsumersjmsTemplate.receiveTimeout(-1)

Jms.inboundAdapter(connectionFactory)
        .destination(queueName)
        .configureJmsTemplate(template -> template.receiveTimeout(-1))

交易

从 4.0 版开始,入站通道适配器支持该session-transacted属性。在早期版本中,您必须注入一个JmsTemplatewith sessionTransactedset to true。(适配器确实允许您将acknowledge属性设置为transacted,但这是不正确的并且不起作用)。

但是请注意,设置session-transactedtrue没有什么价值,因为在receive()操作之后和消息发送到channel.

如果您希望整个流程是事务性的(例如,如果有下游出站通道适配器),则必须使用transactional带有JmsTransactionManager. 或者,考虑使用jms-message-driven-channel-adapter设置acknowledgetransacted(默认)。

消息驱动的通道适配器

message-driven-channel-adapter需要引用 Spring 容器的实例(的任何子MessageListenerAbstractMessageListenerContainer)或两者ConnectionFactory,并且Destination(可以提供“destinationName”来代替“destination”引用)。以下示例定义了一个带有Destination引用的消息驱动通道适配器:

Java DSL
@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                     .destination("inQueue"))
            .channel("exampleChannel")
            .get();
}
科特林 DSL
@Bean
fun jmsMessageDrivenFlowWithContainer() =
        integrationFlow(
                Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                             .destination("inQueue")) {
            channel("exampleChannel")
        }
java
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
    JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
    return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(cf());
    container.setDestinationName("inQueue");
    return container;
}

@Bean
public ChannelPublishingJmsMessageListener listener() {
    ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
    listener.setRequestChannelName("exampleChannel");
    return listener;
}
XML
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>

MessageListener消息驱动的适配器还接受与容器相关的几个属性。仅当您不提供container参考时才会考虑这些值。在这种情况下,DefaultMessageListenerContainer会根据这些属性创建和配置 的实例。例如,您可以指定transaction-manager引用、concurrent-consumers值以及其他几个属性引用和值。有关更多详细信息,请参阅Javadoc和 Spring Integration 的 JMS 模式 ( spring-integration-jms.xsd)。

如果您有自定义侦听器容器实现(通常是 的子类DefaultMessageListenerContainer),则可以使用属性提供对它的实例的引用,也可以使用container属性提供其完全限定的类名container-class。在这种情况下,适配器上的属性将传输到您的自定义容器的实例。

您不能使用 Spring JMS 命名空间元素<jms:listener-container/>来配置容器引用,<int-jms:message-driven-channel-adapter>因为该元素实际上并不引用容器。每个<jms:listener/>子元素都有自己的DefaultMessageListenerContainer(在父<jms:listener-container/>元素上定义共享属性)。您可以给每个监听器子元素一个id,并使用它来注入通道适配器,但是,<jms:/>命名空间需要一个真正的监听器。

建议为 配置一个正则<bean>DefaultMessageListenerContainer并在通道适配器中使用它作为参考。

从版本 4.2 开始,默认acknowledge模式是transacted,除非您提供外部容器。在这种情况下,您应该根据需要配置容器。我们建议使用transactedwithDefaultMessageListenerContainer以避免消息丢失。

'extract-payload' 属性具有相同的效果,其默认值为'true'。该poller元素不适用于消息驱动的通道适配器,因为它是主动调用的。MessageChannel对于大多数情况,消息驱动的方法更好,因为一旦从底层 JMS 使用者接收到消息,它们就会传递给。

最后,该<message-driven-channel-adapter>元素还接受 'error-channel' 属性。这提供了相同的基本功能,如输入GatewayProxyFactoryBean. 以下示例显示如何在消息驱动的通道适配器上设置错误通道:

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
    channel="exampleChannel"
    error-channel="exampleErrorChannel"/>

将前面的示例与我们稍后讨论的通用网关配置或 JMS“入站网关”进行比较时,主要区别在于我们处于单向流中,因为这是一个“通道适配器”,而不是网关. 因此,“错误通道”下游的流也应该是单向的。例如,它可以发送到日志处理程序,也可以连接到不同的 JMS<outbound-channel-adapter>元素。

从主题消费时,将pub-sub-domain属性设置为 true。设置subscription-durabletrue用于持久订阅或subscription-shared共享订阅(需要 JMS 2.0 代理,并且从 4.2 版开始可用)。用于subscription-name命名订阅。

从 5.1 版开始,当端点停止而应用程序仍在运行时,底层侦听器容器将关闭,关闭其共享连接和消费者。以前,连接和消费者保持打开状态。要恢复到以前的行为,请将设置shutdownContainerOnStop为。JmsMessageDrivenEndpointfalse

入站转换错误

从版本 4.2 开始,“错误通道”也用于转换错误。以前,如果 JMS<message-driven-channel-adapter/><inbound-gateway/>由于转换错误而无法传递消息,则会将异常抛出回容器。如果容器配置为使用事务,则消息会回滚并重复传递。转换过程发生在消息构造之前和期间,因此此类错误不会发送到“错误通道”。现在这种转换异常会导致ErrorMessage被发送到“错误通道”,异常为payload. 如果您希望事务回滚并且定义了“错误通道”,则“错误通道”上的集成流必须重新抛出异常(或另一个异常)。如果错误流未引发异常,则提交事务并删除消息。如果没有定义“错误通道”,则异常将像以前一样被抛出回容器。

出站通道适配器

JmsSendingMessageHandler实现MessageHandler接口并且能够将 Spring Integration 转换为MessagesJMS 消息,然后发送到 JMS 目标。它需要一个jmsTemplate参考或两者都需要jmsConnectionFactorydestination参考(destinationName可以提供代替destination)。与入站通道适配器一样,配置此适配器的最简单方法是使用命名空间支持。以下配置生成一个适配器,它从 接收 Spring Integration 消息exampleChannel,将它们转换为 JMS 消息,并将它们发送到 bean 名称为的 JMS 目标引用outQueue

Java DSL
@Bean
public IntegrationFlow jmsOutboundFlow() {
    return f -> f
            .handle(Jms.outboundAdapter(cachingConnectionFactory())
                    .destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                    .configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}
科特林 DSL
@Bean
fun jmsOutboundFlow() =
        integrationFlow {
            handle(Jms.outboundAdapter(jmsConnectionFactory())
                    .apply {
                        destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                        deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
                        timeToLiveExpression("10000")
                        configureJmsTemplate { it.explicitQosEnabled(true) }
                    }
            )
        }
java
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
    JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
    handler.setDestinationName("outQueue");
    return handler;
}
XML
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>

与入站通道适配器一样,有一个“提取有效负载”属性。但是,出站适配器的含义相反。布尔属性不是应用于 JMS 消息,而是应用于 Spring Integration 消息有效负载。换句话说,决定是将 Spring Integration 消息本身作为 JMS 消息体传递,还是将 Spring Integration 消息有效负载作为 JMS 消息体传递。默认值是true'。因此,如果您传递有效负载为 a 的 Spring Integration 消息,则会创建String一个 JMS TextMessage。另一方面,如果您想通过 JMS 将实际的 Spring Integration 消息发送到另一个系统,请将其设置为“false”。

无论有效负载提取的布尔值如何,MessageHeaders只要您依赖默认转换器或提供对另一个实例的引用,Spring Integration 就会映射到 JMS 属性MessageConverter。(对于“入站”适配器也是如此,除了在这些情况下,JMS 属性映射到 Spring Integration MessageHeaders)。

从版本 5.1 开始,可以使用和属性配置<int-jms:outbound-channel-adapter>( )以评估 JMS 消息的适当 QoS 值,以便在运行时针对请求 Spring 发送。的新选项和选项可能有助于作为动态和消息标题的信息来源:JmsSendingMessageHandlerdeliveryModeExpressiontimeToLiveExpressionMessagesetMapInboundDeliveryMode(true)setMapInboundExpiration(true)DefaultJmsHeaderMapperdeliveryModetimeToLive

<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
                        time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>

交易

从版本 4.0 开始,出站通道适配器支持该session-transacted属性。在早期版本中,您必须注入一个JmsTemplatewith sessionTransactedset to true。该属性现在将属性设置为内置的 default JmsTemplate。如果存在事务(可能来自上游message-driven-channel-adapter),则在同一事务中执行发送操作。否则,将启动一个新事务。

入站网关

Spring Integration 的消息驱动的 JMS 入站网关委托给一个MessageListener容器,支持动态调整并发消费者,还可以处理回复。入站网关需要引用 aConnectionFactory和请求Destination(或“requestDestinationName”)。以下示例定义了一个 JMS inbound-gateway,该 JMS 从 bean id 引用的 JMS 队列接收,并发inQueue送到名为 的 Spring Integration 通道exampleChannel

<int-jms:inbound-gateway id="jmsInGateway"
    request-destination="inQueue"
    request-channel="exampleChannel"/>

由于网关提供请求-回复行为而不是单向发送或接收行为,因此它们还具有两个不同的“有效负载提取”属性(如前面讨论的通道适配器的“提取有效负载”设置)。对于入站网关,“extract-request-payload”属性确定是否提取接收到的 JMS 消息正文。如果为“false”,则 JMS 消息本身成为 Spring Integration 消息有效负载。默认值为“真”。

类似地,对于入站网关,“extract-reply-payload”属性适用于要转换为回复 JMS 消息的 Spring Integration 消息。如果要传递整个 Spring Integration 消息(作为 JMS ObjectMessage 的主体),请将值设置为“false”。默认情况下,将 Spring Integration 消息有效负载转换为 JMS 消息也是“真”(例如,String有效负载变成 JMS TextMessage)。

与其他任何事情一样,网关调用可能会导致错误。默认情况下,生产者不会收到消费者端可能发生的错误的通知,并会超时等待回复。但是,有时您可能希望将错误条件传达回消费者(换句话说,您可能希望通过将异常映射到消息来将其视为有效回复)。为了实现这一点,JMS 入站网关提供对消息通道的支持,错误可以发送到该消息通道进行处理,这可能会导致回复消息有效负载符合某些合同,该合同定义了调用者可能期望的“错误”回复。您可以使用 error-channel 属性来配置这样的通道,如以下示例所示:

<int-jms:inbound-gateway request-destination="requestQueue"
          request-channel="jmsInputChannel"
          error-channel="errorTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

您可能会注意到此示例与EnterGatewayProxyFactoryBean中包含的示例非常相似。同样的想法也适用于这里:exceptionTransformer可以是创建错误响应对象的 POJO,您可以引用nullChannel来抑制错误,或者您可以保留“错误通道”以让异常传播。

请参阅入站转换错误

从主题消费时,将pub-sub-domain属性设置为 true。为持久订阅或共享订阅设置subscription-durable为(需要 JMS 2.0 代理,并且从 4.2 版开始可用)。用于命名订阅。truesubscription-sharedsubscription-name

从 4.2 版开始,默认acknowledge模式是transacted,除非提供了外部容器。在这种情况下,您应该根据需要配置容器。我们建议您使用transactedwithDefaultMessageListenerContainer以避免消息丢失。

从 5.1 版开始,当端点停止而应用程序仍在运行时,底层侦听器容器将关闭,关闭其共享连接和消费者。以前,连接和消费者保持打开状态。要恢复到以前的行为,请将设置shutdownContainerOnStop为。JmsInboundGatewayfalse

出站网关

出站网关从 Spring Integration 消息创建 JMS 消息并将它们发送到“请求目标”。然后,它通过使用选择器从您配置的“回复目标”接收,或者如果没有提供“回复目标”,则通过创建 JMSTemporaryQueue实例来处理 JMS 回复消息。

将 a reply-destination(or reply-destination-name) 与CachingConnectionFactorycacheConsumers 设置为的 a 一起使用true可能会导致内存不足的情况。这是因为每个请求都会获得一个带有新选择器的新消费者(在correlation-key值上进行选择,或者,如果没有correlation-key,则在发送的 JMSMessageID 上进行选择)。鉴于这些选择器是唯一的,它们会在当前请求完成后保留在缓存中(未使用)。

如果您指定回复目的地,建议您不要使用缓存的消费者。或者,考虑使用 a <reply-listener/>,如下所述

以下示例显示了如何配置出站网关:

<int-jms:outbound-gateway id="jmsOutGateway"
    request-destination="outQueue"
    request-channel="outboundJmsRequests"
    reply-channel="jmsReplies"/>

'outbound-gateway' 有效负载提取属性与'inbound-gateway' 的属性负相关(参见前面的讨论)。这意味着“extract-request-payload”属性值适用于被转换为 JMS 消息以作为请求发送的 Spring Integration 消息。'extract-reply-payload' 属性值适用于作为回复接收的 JMS 消息,然后转换为 Spring Integration 消息,随后发送到 'reply-channel',如前面的配置示例所示。

用一个<reply-listener/>

Spring Integration 2.2 引入了另一种处理回复的技术。如果您将<reply-listener/>子元素添加到网关而不是为每个回复创建消费者,MessageListener则使用容器来接收回复并将它们交给请求线程。这提供了许多性能优势,并减轻了前面警告中描述的缓存消费者内存使用问题。

将 a<reply-listener/>与没有 的出站网关一起使用时,不是为每个请求reply-destination创建一个,而是使用一个。(如果与代理的连接丢失并恢复,网关会根据需要创建一个额外的 )。TemporaryQueueTemporaryQueueTemporaryQueue

使用 a 时correlation-key,多个网关可以共享相同的回复目标,因为侦听器容器使用每个网关唯一的选择器。

如果您指定回复侦听器并指定回复目标(或回复目标名称)但未提供关联键,则网关会记录警告并回退到 2.2 之前的行为。这是因为在这种情况下无法配置选择器。因此,无法避免将回复发送到可能配置有相同回复目的地的不同网关。

请注意,在这种情况下,每个请求都会使用一个新的消费者,消费者可以按照上面的注意事项在内存中建立;因此在这种情况下不应使用缓存的消费者。

以下示例显示了具有默认属性的回复侦听器:

<int-jms:outbound-gateway id="jmsOutGateway"
        request-destination="outQueue"
        request-channel="outboundJmsRequests"
        reply-channel="jmsReplies">
    <int-jms:reply-listener />
</int-jms-outbound-gateway>

侦听器非常轻量级,我们预计在大多数情况下,您只需要一个消费者。但是,您可以添加concurrent-consumersmax-concurrent-consumers等属性。有关受支持属性的完整列表,请参阅架构以及Spring JMS 文档以了解它们的含义。

空闲回复监听器

从版本 4.2 开始,您可以根据需要启动回复侦听器(并在空闲时间后停止),而不是在网关的生命周期内运行。如果您在应用程序上下文中有许多网关,而这些网关大多处于空闲状态,这将很有用。一种这样的情况是使用 Spring Integration 和 JMS 进行分区分配的许多(非活动)分区Spring Batch作业的上下文。如果所有回复侦听器都处于活动状态,则 JMS 代理对每个网关都有一个活动使用者。通过启用空闲超时,每个消费者仅在相应的批处理作业运行时存在(并且在它完成后的短时间内)。

请参阅属性参考idle-reply-listener-timeout

网关回复相关性

本节描述了用于回复关联的机制(确保始发网关仅接收对其请求的回复),具体取决于网关的配置方式。有关此处讨论的属性的完整描述,请参阅属性参考

以下列表描述了各种场景(数字用于识别 - 顺序无关紧要):

  1. 没有reply-destination*属性也没有<reply-listener>

    ATemporaryQueue为每个请求创建并在请求完成(成功或其他方式)时删除。 correlation-key无关紧要。

  2. 提供了一个reply-destination*属性,但既没有提供 a<reply-listener/>也没有correlation-key提供a

    JMSCorrelationID等于传出消息 IS 用作消费者的消息选择器:

    messageSelector = "JMSCorrelationID = '" + messageId + "'"

    JMSMessageID响应系统应在回复中返回入站JMSCorrelationID。这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter消息驱动 POJO 实现。

    使用此配置时,不应使用主题进行回复。回复可能会丢失。
  3. 提供了reply-destination*属性,没有<reply-listener/>提供,并且correlation-key="JMSCorrelationID"

    网关生成一个唯一的相关 IS 并将其插入到JMSCorrelationID标头中。消息选择器是:

    messageSelector = "JMSCorrelationID = '" + uniqueId + "'"

    JMSCorrelationID响应系统应在回复中返回入站JMSCorrelationID。这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter消息驱动 POJO 实现。

  4. 提供了reply-destination*属性,没有<reply-listener/>提供,并且correlation-key="myCorrelationHeader"

    网关生成一个唯一的相关 ID 并将其插入到myCorrelationHeader消息属性中。correlation-key可以是任何用户定义的值。消息选择器是:

    messageSelector = "myCorrelationHeader = '" + uniqueId + "'"

    myCorrelationHeader响应系统应在回复中返回入站myCorrelationHeader

  5. 提供了一个reply-destination*属性,没有<reply-listener/>提供,并且correlation-key="JMSCorrelationID*" (注意*相关键中的。)

    网关使用jms_correlationId请求消息的标头(如果存在)中的值并将其插入JMSCorrelationID标头中。消息选择器是:

    messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"

    用户必须确保该值是唯一的。

    如果标头不存在,则网关的行为与3.

    JMSCorrelationID响应系统应在回复中返回入站JMSCorrelationID。这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter消息驱动 POJO 实现。

  6. 没有reply-destination*提供属性,只提供了<reply-listener>a

    创建一个临时队列并将其用于来自此网关实例的所有回复。消息中不需要相关数据,但JMSMessageID在网关内部使用传出以将回复定向到正确的请求线程。

  7. 提供了一个reply-destination*属性,提供了一个<reply-listener>,没有correlation-key提供了

    不允许。

    <reply-listener/>配置被忽略,网关的行为与2. 将写入警告日志消息以指示这种情况。

  8. 提供了一个reply-destination*属性,提供了一个<reply-listener>,并且correlation-key="JMSCorrelationID"

    JMSCorrelationID网关具有唯一的相关 ID 并将其与标头 ( gatewayId + "_" + ++seq)中的递增值一起插入。消息选择器是:

    messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"

    JMSCorrelationID响应系统应在回复中返回入站JMSCorrelationID。这是一种常见的模式,由 Spring Integration 入站网关以及 Spring 的MessageListenerAdapter消息驱动 POJO 实现。由于每个网关都有一个唯一的 ID,每个实例只能得到自己的回复。完整的相关数据用于将回复路由到正确的请求线程。

  9. 提供了一个reply-destination*属性<reply-listener/>,并且correlation-key="myCorrelationHeader"

    myCorrelationHeader网关具有唯一的相关 ID 并将其与属性 ( gatewayId + "_" + ++seq)中的递增值一起插入。correlation-key可以是任何用户定义的值。消息选择器是:

    messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"

    myCorrelationHeader响应系统应在回复中返回入站myCorrelationHeader。由于每个网关都有一个唯一的 ID,每个实例只能得到自己的回复。完整的相关数据用于将回复路由到正确的请求线程。

  10. 提供了一个reply-destination*属性,提供了一个<reply-listener/>,并且correlation-key="JMSCorrelationID*"

    (注意*相关键中的)

    不允许。

    回复侦听器不允许用户提供的相关 ID。网关不会使用此配置进行初始化。

异步网关

从 4.3 版开始,您现在可以在配置出站网关时指定async="true"(或setAsync(true)在 Java 中)。

默认情况下,当向网关发送请求时,请求线程会暂停,直到收到回复。然后流程在该线程上继续。如果asynctrue,则发送完成后立即释放请求线程,并在侦听器容器线程上返回回复(并且流程继续)。当在轮询线程上调用网关时,这可能很有用。线程被释放并且可用于框架内的其他任务。

async需要一个<reply-listener/>(或setUseReplyContainer(true)在使用 Java 配置时)。它还需要指定correlationKey(通常JMSCorrelationID)。如果其中任何一个条件不满足,async则忽略。

属性参考

以下清单显示了 的所有可用属性outbound-gateway

<int-jms:outbound-gateway
    connection-factory="connectionFactory" (1)
    correlation-key="" (2)
    delivery-persistent="" (3)
    destination-resolver="" (4)
    explicit-qos-enabled="" (5)
    extract-reply-payload="true" (6)
    extract-request-payload="true" (7)
    header-mapper="" (8)
    message-converter="" (9)
    priority="" (10)
    receive-timeout="" (11)
    reply-channel="" (12)
    reply-destination="" (13)
    reply-destination-expression="" (14)
    reply-destination-name="" (15)
    reply-pub-sub-domain="" (16)
    reply-timeout="" (17)
    request-channel="" (18)
    request-destination="" (19)
    request-destination-expression="" (20)
    request-destination-name="" (21)
    request-pub-sub-domain="" (22)
    time-to-live="" (23)
    requires-reply="" (24)
    idle-reply-listener-timeout="" (25)
    async=""> (26)
  <int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 参考一个javax.jms.ConnectionFactory. 默认jmsConnectionFactory.
2 包含关联数据以将响应与回复关联的属性的名称。如果省略,网关期望响应系统返回JMSMessageID标头中的出站标头的值JMSCorrelationID。如果指定,网关会生成一个相关 ID 并用它填充指定的属性。响应系统必须在同一属性中回显该值。它可以设置为JMSCorrelationID,在这种情况下,使用标准标题而不是String属性来保存相关数据。使用 时,如果提供显式,则<reply-container/>必须指定。从 4.0.1 版本开始,该属性也支持 value ,这意味着如果出站消息已经有一个(从correlation-keyreply-destinationJMSCorrelationID*JMSCorrelationIDjms_correlationId) 标头,它被用来代替生成一个新的标头。注意,JMSCorrelationID*使用 a 时不允许使用 key <reply-container/>,因为容器在初始化时需要设置消息选择器。
您应该了解网关无法确保唯一性,如果提供的关联 ID 不唯一,可能会出现意想不到的副作用。
3 一个布尔值,指示交付模式应该是DeliveryMode.PERSISTENT( true) 还是DeliveryMode.NON_PERSISTENT( false)。explicit-qos-enabled此设置仅在为时生效true
4 一个DestinationResolver。默认值为 a DynamicDestinationResolver,它将目标名称映射到该名称的队列或主题。
5 当设置为 时true,它可以使用服务质量属性:prioritydelivery-modetime-to-live
6 当设置为true(默认值)时,Spring Integration 回复消息的有效负载是从 JMS 回复消息的正文中创建的(通过使用MessageConverter)。当设置为false时,整个 JMS 消息将成为 Spring Integration 消息的负载。
7 当设置为true(默认值)时,Spring Integration 消息的有效负载将转换为JMSMessage(通过使用MessageConverter)。当设置为false时,整个 Spring 集成消息将转换为JMSMessage. 在这两种情况下,Spring Integration 消息头都通过使用HeaderMapper.
8 用于HeaderMapper将 Spring Integration 消息头映射到 JMS 消息头和属性以及从 JMS 消息头和属性映射。
9 MessageConverter对用于在 JMS 消息和 Spring Integration 消息有效负载(或消息,如果extract-request-payloadfalse)之间转换的引用。默认值为SimpleMessageConverter.
10 请求消息的默认优先级。被消息优先级标头覆盖(如果存在)。其范围09explicit-qos-enabled此设置仅在为时生效true
11 等待回复的时间(以毫秒为单位)。默认值为5000(五秒)。
12 回复消息发送到的通道。
13 对 a 的引用Destination,它被设置为JMSReplyTo标头。最多只允许reply-destinationreply-destination-expression或之一。reply-destination-name如果未提供,TemporaryQueue则使用 a 回复此网关。
14 评估为 a 的 SpEL 表达式Destination,将设置为JMSReplyTo标头。表达式可以产生一个Destination对象或一个String. 它被用于DestinationResolver解决实际的Destination. 最多只允许reply-destinationreply-destination-expression或之一。reply-destination-name如果未提供,TemporaryQueue则使用 a 回复此网关。
15 设置为 JMSReplyTo 标头的目标名称。它被用于DestinationResolver解决实际的Destination. 最多只允许reply-destinationreply-destination-expression或之一。reply-destination-name如果未提供,TemporaryQueue则使用 a 回复此网关。
16 当设置为 时true,它表示Destination由 解析的任何回复DestinationResolver都应该是 aTopic而不是 a Queue
17 网关在发送回复消息到reply-channel. 这仅在reply-channelcan 阻塞时才有效——例如QueueChannel容量限制当前已满。默认值为无穷大。
18 此网关接收请求消息的通道。
19 Destination对发送请求消息的a 的引用。reply-destinationreply-destination-expression或之一reply-destination-name是必需的。您只能使用这三个属性之一。
20 一个 SpEL 表达式,计算Destination请求消息发送到的 a。表达式可以产生一个Destination对象或一个String. 它被用于DestinationResolver解决实际的Destination. reply-destinationreply-destination-expression或之一reply-destination-name是必需的。您只能使用这三个属性之一。
21 请求消息发送到的目的地的名称。它被用于DestinationResolver解决实际的Destination. reply-destinationreply-destination-expression或之一reply-destination-name是必需的。您只能使用这三个属性之一。
22 当设置为 时true,它表示Destination由 解析的任何请求DestinationResolver应该是 aTopic而不是 a Queue
23 指定消息的生存时间。explicit-qos-enabled此设置仅在为时生效true
24 指定此出站网关是否必须返回非空值。默认情况下,该值为trueMessageTimeoutException当底层服务在receive-timeout. 请注意,如果从不期望服务返回回复,则最好使用 a<int-jms:outbound-channel-adapter/>而不是<int-jms:outbound-gateway/>with requires-reply="false"。使用后者,发送线程被阻塞,等待receive-timeout一段时间的回复。
25 当您使用 a<reply-listener />时,它的生命周期(开始和停止)默认与网关的生命周期相匹配。当此值大于0时,容器按需启动(发送请求时)。容器继续运行,直到至少经过这段时间而没有收到任何请求(并且直到没有未完成的回复)。容器在下一个请求时再次启动。停止时间最短,实际上可能高达该值的 1.5 倍。
26 请参阅异步网关
27 当包含此元素时,异步接收回复,MessageListenerContainer而不是为每个回复创建消费者。在许多情况下,这可能更有效。

将消息头映射到 JMS 消息和从 JMS 消息映射

JMS 消息可以包含元信息,例如 JMS API 标头和简单属性。您可以使用JmsHeaderMapper. JMS API 标头被传递给适当的 setter 方法(例如setJMSReplyTo),而其他标头被复制到 JMS 消息的一般属性中。JMS 出站网关使用 的默认实现进行引导JmsHeaderMapper,它将映射标准 JMS API 标头以及原始或String消息标头。header-mapper您还可以通过使用入站和出站网关的属性来提供自定义标头映射器。

许多特定于 JMS 供应商的客户端不允许直接在已创建的 JMS 消息上设置deliveryModepriority和属性。timeToLive它们被认为是 QoS 属性,因此必须传播到目标MessageProducer.send(message, deliveryMode, priority, timeToLive)API。由于这个原因,DefaultJmsHeaderMapper不会将适当的 Spring Integration 标头(或表达式结果)映射到提到的 JMS 消息属性中。相反,aDynamicJmsTemplate用于JmsSendingMessageHandler将请求消息中的标头值传播到MessageProducer.send()API。要启用此功能,您必须配置出站端点,DynamicJmsTemplate并将其explicitQosEnabled属性设置为 true。Spring Integration Java DSLDynamicJmsTemplate默认配置一个,但您仍必须设置该explicitQosEnabled属性。
从 4.0 版开始,JMSPriority标头映射到priority入站消息的标准标头。以前,priority标头仅用于出站消息。要恢复到以前的行为(即不映射入站优先级),请将 的mapInboundPriority属性设置DefaultJmsHeaderMapperfalse
从 4.3 版开始,通过调用其方法(通常是 a ,JMS 不支持)DefaultJmsHeaderMapper将标准标头映射为消息属性。在入站端,它被映射为. 这独立于标头,该标头映射到标头和从标头映射。通常用于关联请求和回复,而通常用于将相关消息组合成一个组(例如与聚合器或重排序器)。 correlationIdtoString()correlationIdUUIDStringjms_correlationIdJMSCorrelationIDJMSCorrelationIDcorrelationId

从 5.1 版开始,DefaultJmsHeaderMapper可以配置用于映射入站JMSDeliveryModeJMSExpiration属性:

@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
    DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
    mapper.setMapInboundDeliveryMode(true)
    mapper.setMapInboundExpiration(true)
    return mapper;
}

这些 JMS 属性分别映射到JmsHeaders.DELIVERY_MODEJmsHeaders.EXPIRATIONSpring Message 标头。

消息转换、编组和解组

如果您需要转换消息,所有 JMS 适配器和网关都允许您MessageConverter通过设置message-converter属性来提供一个。为此,请提供MessageConverter在同一 ApplicationContext 中可用的实例的 bean 名称。此外,为了提供与 marshaller 和 unmarshaller 接口的一致性,Spring 提供了MarshallingMessageConverter,您可以使用自己的自定义 marshaller 和 unmarshaller 进行配置。以下示例显示了如何执行此操作

<int-jms:inbound-gateway request-destination="requestQueue"
    request-channel="inbound-gateway-channel"
    message-converter="marshallingMessageConverter"/>

<bean id="marshallingMessageConverter"
    class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <constructor-arg>
        <bean class="org.bar.SampleMarshaller"/>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.bar.SampleUnmarshaller"/>
    </constructor-arg>
</bean>
当您提供自己的MessageConverter实例时,它仍然包含在HeaderMappingMessageConverter. 这意味着“extract-request-payload”和“extract-reply-payload”属性会影响传递给转换器的实际对象。它HeaderMappingMessageConverter本身委托给一个目标MessageConverter,同时还将 Spring Integration 映射MessageHeaders到 JMS 消息属性并再次映射回来。

JMS 支持的消息通道

前面介绍的通道适配器和网关都适用于与其他外部系统集成的应用程序。入站选项假设某个其他系统正在向 JMS 目标发送 JMS 消息,出站选项假设某个其他系统正在从目标接收。另一个系统可能是也可能不是 Spring Integration 应用程序。当然,当发送一个 Spring Integration 消息实例作为 JMS 消息本身的主体(将 'extract-payload' 值设置为false)时,假设另一个系统是基于 Spring Integration 的。然而,这绝不是一个要求。这种灵活性是使用基于消息的集成选项和抽象的“通道”(或 JMS 中的目的地)的好处之一。

有时,给定 JMS 目标的生产者和消费者都打算成为同一个应用程序的一部分,在同一个进程中运行。您可以通过使用一对入站和出站通道适配器来完成此操作。这种方法的问题是您需要两个适配器,尽管从概念上讲,目标是拥有一个消息通道。从 Spring Integration 版本 2.0 开始支持更好的选择。现在可以在使用 JMS 命名空间时定义单个“通道”,如以下示例所示:

<int-jms:channel id="jmsChannel" queue="exampleQueue"/>

前面示例中的通道的行为<channel/>与主 Spring Integration 命名空间中的普通元素非常相似。它可以被任何端点的input-channel和属性引用。output-channel不同之处在于此通道由名为exampleQueue. 这意味着在生产和消费端点之间可以进行异步消息传递。<queue/>但是,与通过在非 JMS 中添加元素创建的更简单的异步消息通道不同<channel/>元素,消息不存储在内存队列中。相反,这些消息在 JMS 消息体中传递,然后底层 JMS 提供程序的全部功能可用于该通道。使用这种替代方法最常见的理由可能是利用 JMS 消息传递的存储转发方法提供的持久性。

如果配置正确,JMS 支持的消息通道也支持事务。换句话说,如果生产者的发送操作是回滚事务的一部分,则生产者实际上不会写入事务性 JMS 支持的通道。同样,如果消息的接收是回滚事务的一部分,则使用者不会从通道中物理删除 JMS 消息。请注意,在这种情况下,生产者和消费者事务是分开的。<channel/>这与跨没有<queue/>子元素的简单同步元素传播事务上下文有很大不同。

由于上述示例引用了一个 JMS Queue 实例,因此它充当点对点通道。另一方面,如果您需要发布-订阅行为,则可以使用单独的元素并引用 JMS 主题。以下示例显示了如何执行此操作:

<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>

对于任何一种 JMS 支持的通道,都可以提供目标名称而不是引用,如以下示例所示:

<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>

<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>

在前面的示例中,目标名称由 Spring 的默认DynamicDestinationResolver实现解析,但您可以提供DestinationResolver接口的任何实现。此外,JMSConnectionFactory是通道的必需属性,但默认情况下,预期的 bean 名称为jmsConnectionFactory. 以下示例提供了用于解析 JMS 目标名称的自定义实例和不同的名称ConnectionFactory

<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
    destination-resolver="customDestinationResolver"
    connection-factory="customConnectionFactory"/>

对于<publish-subscribe-channel />,将durable属性设置true为 用于持久订阅或subscription-shared共享订阅(需要 JMS 2.0 代理,并且从 4.2 版开始可用)。用于subscription命名订阅。

使用 JMS 消息选择器

使用 JMS 消息选择器,您可以根据 JMS 标头和 JMS 属性过滤JMS 消息。例如,如果您想侦听其自定义 JMS 标头属性myHeaderProperty、 等于的消息something,您可以指定以下表达式:

myHeaderProperty = 'something'

消息选择器表达式是SQL-92条件表达式语法的子集,被定义为Java 消息服务规范的一部分。selector您可以使用 XML 命名空间配置为以下 Spring Integration JMS 组件指定 JMS 消息属性:

  • JMS 频道

  • JMS 发布订阅频道

  • JMS 入站通道适配器

  • JMS 入站网关

  • JMS 消息驱动的通道适配器

您不能使用 JMS 消息选择器来引用消息正文值。

JMS 示例

要试验这些 JMS 适配器,请查看位于https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms的 Spring Integration Samples Git 存储库中提供的 JMS 示例。

该存储库包括两个示例。一个提供入站和出站通道适配器,另一个提供入站和出站网关。它们被配置为使用嵌入式ActiveMQ进程运行,但您可以修改每个示例的common.xml Spring 应用程序上下文文件以支持不同的 JMS 提供程序或独立的 ActiveMQ 进程。

换句话说,您可以拆分配置,以便入站和出站适配器在不同的 JVM 中运行。如果您安装了 ActiveMQ,请修改要使用的文件中的brokerURL属性(而不是)。这两个样本都接受来自标准输入的输入并回显到标准输出。查看配置以了解这些消息是如何通过 JMS 路由的。common.xmltcp://localhost:61616vm://localhost


1. see XML Configuration