STOMP 支持

Spring Integration 4.2 版引入了 STOMP(面向简单文本的消息传递协议)客户端支持。它基于 Spring Framework 的消息传递模块 stomp 包中的架构、基础设施和 API。Spring Integration 使用了许多 Spring STOMP 组件(例如StompSessionStompClientSupport)。有关详细信息,请参阅Spring Framework 参考手册中的Spring Framework STOMP Support一章。

您需要将此依赖项包含到您的项目中:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>5.5.13</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-stomp:5.5.13"

对于服务器端组件,您需要添加一个org.springframework:spring-websocket和/或io.projectreactor.netty:reactor-netty依赖项。

概述

要配置 STOMP,您应该从 STOMP 客户端对象开始。Spring 框架提供了以下实现:

  • WebSocketStompClient:基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 SockJS,用于使用 SockJS 客户端进行基于 HTTP 的 WebSocket 仿真。

  • ReactorNettyTcpStompClient: 建立在ReactorNettyTcpClient项目reactor-netty之上。

您可以提供任何其他StompClientSupport实现。请参阅这些类的Javadoc

该类StompClientSupport被设计为一个工厂StompSession,为提供的对象生成一个StompSessionHandler,所有剩余的工作都是通过对该对象的回调StompSessionHandlerStompSession抽象来完成的。使用 Spring Integration适配器抽象,我们需要提供一些托管共享对象来将我们的应用程序表示为具有唯一会话的 STOMP 客户端。为此,Spring Integration 提供了StompSessionManager抽象来管理任何 StompSession提供的StompSessionHandler. 这允许为特定的 STOMP 代理使用入站出站通道适配器(或两者)。StompSessionManager(及其实现)JavaDocs 了解更多信息。

STOMP 入站通道适配器

StompInboundChannelAdapter是一个一站式MessageProducer组件,它将您的 Spring Integration 应用程序订阅到提供的 STOMP 目标并从它们接收消息(通过使用MessageConverterconnected 上的提供从 STOMP 帧转换StompSession)。您可以在运行时通过@ManagedOperationStompInboundChannelAdapter.

有关更多配置选项,请参阅STOMP 命名空间支持StompInboundChannelAdapter Javadoc

STOMP 出站通道适配器

是forStompMessageHandler并且用于通过(由 shared 提供)将传出实例发送到 STOMP (预先配置或在运行时使用 SpEL 表达式确定)。MessageHandler<int-stomp:outbound-channel-adapter>Message<?>destinationStompSessionStompSessionManager

有关更多配置选项,请参阅STOMP 命名空间支持StompMessageHandler Javadoc

STOMP 标头映射

STOMP 协议提供标头作为其帧的一部分。STOMP 帧的整个结构具有以下格式:

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring Framework 提供StompHeaders来表示这些标头。有关更多详细信息,请参阅Javadoc。STOMP 帧与实例相互转换,Message<?>这些标头映射到实例和从MessageHeaders实例映射。Spring Integration 为 STOMP 适配器提供了默认HeaderMapper实现。实施是StompHeaderMapper. 它分别为入站和出站适配器提供fromHeaders()和操作。toHeaders()

与许多其他 Spring Integration 模块一样,IntegrationStompHeaders该类已被引入以将标准 STOMP 标头映射到MessageHeadersstomp_作为标头名称前缀。此外,MessageHeaders具有该前缀的所有实例StompHeaders在发送到目的地时都映射到。

有关更多信息,请参阅这些类的Javadoc和STOMP 命名空间支持mapped-headers中的属性描述。

STOMP 整合活动

许多 STOMP 操作是异步的,包括错误处理。例如,STOMP 有一个RECEIPT服务器框架,当客户端框架通过添加RECEIPT标头请求一个服务器框架时,它会返回该框架。为了提供对这些异步事件的访问,Spring Integration 会发出StompIntegrationEvent实例,您可以通过实现一个ApplicationListener或使用一个来获得这些实例<int-event:inbound-channel-adapter>(请参阅接收 Spring 应用程序事件)。

具体来说,由于无法连接到 STOMP 代理,当 a接收时StompExceptionEvent发出a。另一个例子是. 它处理STOMP 帧,这些帧是服务器对 this 发送的不正确(未接受)消息的响应。AbstractStompSessionManagerstompSessionListenableFutureonFailure()StompMessageHandlerERRORStompMessageHandler

StompMessageHandler发送到. StompReceiptEvent_ 可以是正数或负数,具体取决于是否在该时间段内从服务器接收到帧,您可以在实例上进行配置。它默认为(以毫秒为单位,因此为 15 秒)。StompSession.ReceiptableStompSessionStompReceiptEventRECEIPTreceiptTimeLimitStompClientSupport15 * 1000

只有当要发送的消息StompSession.Receiptable的 STOMP 标头不是时,才会添加回调。您可以通过其选项和分别 启用自动标头生成。RECEIPTnullRECEIPTStompSessionautoReceiptStompSessionManager

有关如何配置 Spring Integration 以接受这些实例的更多信息,请参阅STOMP 适配器 Java 配置。ApplicationEvent

STOMP 适配器 Java 配置

以下示例显示了 STOMP 适配器的全面 Java 配置:

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP 命名空间支持

Spring Integration STOMP 命名空间实现了入站和出站通道适配器组件。要将其包含在您的配置中,请在您的应用程序上下文配置文件中提供以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

了解<int-stomp:outbound-channel-adapter>元素

以下清单显示了 STOMP 出站通道适配器的可用属性:

<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
1 组件 bean 名称。MessageHandler注册了一个 bean别名idplus .handler。如果您不设置该channel属性,DirectChannel则会在应用程序上下文中创建并注册 a,并将该id属性的值作为 bean 名称。在这种情况下,端点注册了一个 bean 名称idplus .adapter
2 id如果存在,则标识连接到此适配器的通道。见id。可选的。
3 引用一个StompSessionManagerbean,它封装了低级的连接和StompSession处理操作。必需的。
4 对实现 的 bean 的引用HeaderMapper<StompHeaders>,它将 Spring Integration 映射MessageHeaders到和来自 STOMP 帧头。它与 互斥mapped-headers。它默认为StompHeaderMapper.
5 要映射到 STOMP 帧头的 STOMP 标头名称的逗号分隔列表。只有在header-mapper未设置参考时才能提供。此列表中的值也可以是与标头名称匹配的简单模式(例如myheader**myheader)。一个特殊的标记 ( STOMP_OUTBOUND_HEADERS) 表示所有标准 STOMP 标头(内容长度、收据、心跳等)。默认情况下包含它们。如果要添加自己的标头并希望也映射标准标头,则必须包含此令牌或HeaderMapper使用header-mapper.
6 STOMP 消息发送到的目的地的名称。它与destination-expression.
7 一个 SpEL 表达式,在运行时针对Message作为根对象的每个 Spring Integration 进行评估。它与destination.
8 指示此端点是否应自动启动的布尔值。它默认为true.
9 此端点应启动和停止的生命周期阶段。该值越低,此端点启动越早,停止越晚。默认值为Integer.MIN_VALUE. 值可以是负数。见SmartLifeCycle

了解<int-stomp:inbound-channel-adapter>元素

以下清单显示了 STOMP 入站通道适配器的可用属性:

<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
1 The component bean name. If you do not set the channel attribute, a DirectChannel is created and registered in the application context with the value of this id attribute as the bean name. In this case, the endpoint is registered with the bean name id plus .adapter.
2 Identifies the channel attached to this adapter.
3 The MessageChannel bean reference to which ErrorMessage instances should be sent.
4 See the same option on the <int-stomp:outbound-channel-adapter>.
5 要从 STOMP 帧标头映射的 STOMP 标头名称的逗号分隔列表。header-mapper如果未设置参考,您只能提供此信息。此列表中的值也可以是与标头名称匹配的简单模式(例如,myheader**myheader)。一个特殊的标记 ( STOMP_INBOUND_HEADERS) 表示所有标准 STOMP 标头(内容长度、收据、心跳等)。默认情况下包含它们。如果要添加自己的标头并希望也映射标准标头,则还必须包含此令牌或HeaderMapper使用header-mapper.
6 在 上查看相同的选项<int-stomp:outbound-channel-adapter>
7 要订阅的 STOMP 目标名称的逗号分隔列表。目标列表(以及订阅)可以在运行时通过addDestination()removeDestination() @ManagedOperation注释进行修改。
8 如果通道可以阻塞,则在向通道发送消息时等待的最长时间(以毫秒为单位)。例如,QueueChannel如果已达到其最大容量,则可以阻塞直到有可用空间。
9 payload要从传入的 STOMP 帧转换的目标的 Java 类型的完全限定名称。它默认为String.class.
10 在 上查看相同的选项<int-stomp:outbound-channel-adapter>
11 在 上查看相同的选项<int-stomp:outbound-channel-adapter>

1. see XML Configuration