STOMP 支持
Spring Integration 4.2 版引入了 STOMP(面向简单文本的消息传递协议)客户端支持。它基于 Spring Framework 的消息传递模块 stomp 包中的架构、基础设施和 API。Spring Integration 使用了许多 Spring STOMP 组件(例如StompSession
和StompClientSupport
)。有关详细信息,请参阅Spring Framework 参考手册中的Spring Framework STOMP Support一章。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:5.5.13"
对于服务器端组件,您需要添加一个org.springframework:spring-websocket
和/或io.projectreactor.netty:reactor-netty
依赖项。
概述
要配置 STOMP,您应该从 STOMP 客户端对象开始。Spring 框架提供了以下实现:
-
WebSocketStompClient
:基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 SockJS,用于使用 SockJS 客户端进行基于 HTTP 的 WebSocket 仿真。 -
ReactorNettyTcpStompClient
: 建立在ReactorNettyTcpClient
项目reactor-netty
之上。
您可以提供任何其他StompClientSupport
实现。请参阅这些类的Javadoc。
该类StompClientSupport
被设计为一个工厂StompSession
,为提供的对象生成一个StompSessionHandler
,所有剩余的工作都是通过对该对象的回调StompSessionHandler
和StompSession
抽象来完成的。使用 Spring Integration适配器抽象,我们需要提供一些托管共享对象来将我们的应用程序表示为具有唯一会话的 STOMP 客户端。为此,Spring Integration 提供了StompSessionManager
抽象来管理任何 StompSession
提供的StompSessionHandler
. 这允许为特定的 STOMP 代理使用入站或出站通道适配器(或两者)。看StompSessionManager
(及其实现)JavaDocs 了解更多信息。
STOMP 入站通道适配器
这StompInboundChannelAdapter
是一个一站式MessageProducer
组件,它将您的 Spring Integration 应用程序订阅到提供的 STOMP 目标并从它们接收消息(通过使用MessageConverter
connected 上的提供从 STOMP 帧转换StompSession
)。您可以在运行时通过@ManagedOperation
在StompInboundChannelAdapter
.
有关更多配置选项,请参阅STOMP 命名空间支持和StompInboundChannelAdapter
Javadoc。
STOMP 出站通道适配器
是forStompMessageHandler
并且用于通过(由 shared 提供)将传出实例发送到 STOMP (预先配置或在运行时使用 SpEL 表达式确定)。MessageHandler
<int-stomp:outbound-channel-adapter>
Message<?>
destination
StompSession
StompSessionManager
有关更多配置选项,请参阅STOMP 命名空间支持和StompMessageHandler
Javadoc。
STOMP 标头映射
STOMP 协议提供标头作为其帧的一部分。STOMP 帧的整个结构具有以下格式:
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework 提供StompHeaders
来表示这些标头。有关更多详细信息,请参阅Javadoc。STOMP 帧与实例相互转换,Message<?>
这些标头映射到实例和从MessageHeaders
实例映射。Spring Integration 为 STOMP 适配器提供了默认HeaderMapper
实现。实施是StompHeaderMapper
. 它分别为入站和出站适配器提供fromHeaders()
和操作。toHeaders()
与许多其他 Spring Integration 模块一样,IntegrationStompHeaders
该类已被引入以将标准 STOMP 标头映射到MessageHeaders
,stomp_
作为标头名称前缀。此外,MessageHeaders
具有该前缀的所有实例StompHeaders
在发送到目的地时都映射到。
有关更多信息,请参阅这些类的Javadoc和STOMP 命名空间支持mapped-headers
中的属性描述。
STOMP 整合活动
许多 STOMP 操作是异步的,包括错误处理。例如,STOMP 有一个RECEIPT
服务器框架,当客户端框架通过添加RECEIPT
标头请求一个服务器框架时,它会返回该框架。为了提供对这些异步事件的访问,Spring Integration 会发出StompIntegrationEvent
实例,您可以通过实现一个ApplicationListener
或使用一个来获得这些实例<int-event:inbound-channel-adapter>
(请参阅接收 Spring 应用程序事件)。
具体来说,由于无法连接到 STOMP 代理,当 a接收时StompExceptionEvent
发出a。另一个例子是. 它处理STOMP 帧,这些帧是服务器对 this 发送的不正确(未接受)消息的响应。AbstractStompSessionManager
stompSessionListenableFuture
onFailure()
StompMessageHandler
ERROR
StompMessageHandler
在StompMessageHandler
发送到. StompReceiptEvent
_ 可以是正数或负数,具体取决于是否在该时间段内从服务器接收到帧,您可以在实例上进行配置。它默认为(以毫秒为单位,因此为 15 秒)。StompSession.Receiptable
StompSession
StompReceiptEvent
RECEIPT
receiptTimeLimit
StompClientSupport
15 * 1000
只有当要发送的消息StompSession.Receiptable 的 STOMP 标头不是时,才会添加回调。您可以通过其选项和分别
启用自动标头生成。RECEIPT null RECEIPT StompSession autoReceipt StompSessionManager |
有关如何配置 Spring Integration 以接受这些实例的更多信息,请参阅STOMP 适配器 Java 配置。ApplicationEvent
STOMP 适配器 Java 配置
以下示例显示了 STOMP 适配器的全面 Java 配置:
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP 命名空间支持
Spring Integration STOMP 命名空间实现了入站和出站通道适配器组件。要将其包含在您的配置中,请在您的应用程序上下文配置文件中提供以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
了解<int-stomp:outbound-channel-adapter>
元素
以下清单显示了 STOMP 出站通道适配器的可用属性:
<int-stomp:outbound-channel-adapter
id="" (1)
channel="" (2)
stomp-session-manager="" (3)
header-mapper="" (4)
mapped-headers="" (5)
destination="" (6)
destination-expression="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 组件 bean 名称。MessageHandler 注册了一个 bean别名id plus .handler 。如果您不设置该channel 属性,DirectChannel 则会在应用程序上下文中创建并注册 a,并将该id 属性的值作为 bean 名称。在这种情况下,端点注册了一个 bean 名称id plus .adapter 。 |
2 | id 如果存在,则标识连接到此适配器的通道。见id 。可选的。 |
3 | 引用一个StompSessionManager bean,它封装了低级的连接和StompSession 处理操作。必需的。 |
4 | 对实现 的 bean 的引用HeaderMapper<StompHeaders> ,它将 Spring Integration 映射MessageHeaders 到和来自 STOMP 帧头。它与 互斥mapped-headers 。它默认为StompHeaderMapper . |
5 | 要映射到 STOMP 帧头的 STOMP 标头名称的逗号分隔列表。只有在header-mapper 未设置参考时才能提供。此列表中的值也可以是与标头名称匹配的简单模式(例如myheader* 或*myheader )。一个特殊的标记 ( STOMP_OUTBOUND_HEADERS ) 表示所有标准 STOMP 标头(内容长度、收据、心跳等)。默认情况下包含它们。如果要添加自己的标头并希望也映射标准标头,则必须包含此令牌或HeaderMapper 使用header-mapper . |
6 | STOMP 消息发送到的目的地的名称。它与destination-expression . |
7 | 一个 SpEL 表达式,在运行时针对Message 作为根对象的每个 Spring Integration 进行评估。它与destination . |
8 | 指示此端点是否应自动启动的布尔值。它默认为true . |
9 | 此端点应启动和停止的生命周期阶段。该值越低,此端点启动越早,停止越晚。默认值为Integer.MIN_VALUE . 值可以是负数。见SmartLifeCycle 。 |
了解<int-stomp:inbound-channel-adapter>
元素
以下清单显示了 STOMP 入站通道适配器的可用属性:
<int-stomp:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
stomp-session-manager="" (4)
header-mapper="" (5)
mapped-headers="" (6)
destinations="" (7)
send-timeout="" (8)
payload-type="" (9)
auto-startup="" (10)
phase=""/> (11)
1 | The component bean name.
If you do not set the channel attribute, a DirectChannel is created and registered in the application context with the value of this id attribute as the bean name.
In this case, the endpoint is registered with the bean name id plus .adapter . |
2 | Identifies the channel attached to this adapter. |
3 | The MessageChannel bean reference to which ErrorMessage instances should be sent. |
4 | See the same option on the <int-stomp:outbound-channel-adapter> . |
5 | 要从 STOMP 帧标头映射的 STOMP 标头名称的逗号分隔列表。header-mapper 如果未设置参考,您只能提供此信息。此列表中的值也可以是与标头名称匹配的简单模式(例如,myheader* 或*myheader )。一个特殊的标记 ( STOMP_INBOUND_HEADERS ) 表示所有标准 STOMP 标头(内容长度、收据、心跳等)。默认情况下包含它们。如果要添加自己的标头并希望也映射标准标头,则还必须包含此令牌或HeaderMapper 使用header-mapper . |
6 | 在 上查看相同的选项<int-stomp:outbound-channel-adapter> 。 |
7 | 要订阅的 STOMP 目标名称的逗号分隔列表。目标列表(以及订阅)可以在运行时通过addDestination() 和removeDestination() @ManagedOperation 注释进行修改。 |
8 | 如果通道可以阻塞,则在向通道发送消息时等待的最长时间(以毫秒为单位)。例如,QueueChannel 如果已达到其最大容量,则可以阻塞直到有可用空间。 |
9 | payload 要从传入的 STOMP 帧转换的目标的 Java 类型的完全限定名称。它默认为String.class . |
10 | 在 上查看相同的选项<int-stomp:outbound-channel-adapter> 。 |
11 | 在 上查看相同的选项<int-stomp:outbound-channel-adapter> 。 |