WebSocket 支持
从 4.1 版开始,Spring Integration 支持 WebSocket。它基于 Spring Frameworkweb-socket
模块中的架构、基础设施和 API。因此,Spring WebSocket 的许多组件(例如SubProtocolHandler
或WebSocketClient
)和配置选项(例如@EnableWebSocketMessageBroker
)都可以在 Spring Integration 中重用。有关详细信息,请参阅Spring Framework 参考手册中的Spring Framework WebSocket Support一章。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:5.5.13"
对于服务器端,org.springframework:spring-webmvc
必须明确包含依赖项。
Spring Framework WebSocket 基础架构基于 Spring 消息传递基础,并提供了一个基本的消息传递框架,该框架基于 Spring Integration 使用的相同MessageChannel
实现和MessageHandler
实现(以及一些 POJO 方法注释映射)。因此,即使没有 WebSocket 适配器,Spring Integration 也可以直接参与 WebSocket 流。为此,您可以@MessagingGateway
使用适当的注释配置 Spring 集成,如以下示例所示:
@MessagingGateway
@Controller
public interface WebSocketGateway {
@MessageMapping("/greeting")
@SendToUser("/queue/answer")
@Gateway(requestChannel = "greetingChannel")
String greeting(String payload);
}
概述
由于 WebSocket 协议根据定义是流式传输的,并且我们可以同时向 WebSocket 发送和接收消息,因此我们可以处理适当的WebSocketSession
,无论是在客户端还是服务器端。为了封装连接管理和WebSocketSession
注册,IntegrationWebSocketContainer
提供了ClientWebSocketContainer
和ServerWebSocketContainer
实现。由于WebSocket API及其在 Spring Framework 中的实现(具有许多扩展),服务器端和客户端都使用了相同的类(当然,从 Java 的角度来看)。因此,大多数连接和WebSocketSession
两侧的注册表选项相同。这让我们可以重用许多配置项和基础设施挂钩来在服务器端和客户端构建 WebSocket 应用程序。下面的例子展示了组件如何服务于这两个目的:
//Client side
@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}
@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}
//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/endpoint").withSockJs();
}
旨在实现双向消息传递,IntegrationWebSocketContainer
并且可以在入站和出站通道适配器之间共享(见下文),在使用单向(发送或接收)WebSocket 消息传递时只能从其中一个引用。它可以在没有任何通道适配器的情况下使用,但在这种情况下,IntegrationWebSocketContainer
它只起到WebSocketSession
注册表的作用。
将ServerWebSocketContainer 内部WebSocketConfigurer 注册IntegrationWebSocketContainer.IntegrationWebSocketHandler 为Endpoint . 它在目标供应商 WebSocket 容器中提供的paths 和其他服务器 WebSocket 选项(例如HandshakeHandler 或SockJS fallback )下执行此操作。ServletWebSocketHandlerRegistry 这种注册是通过一个基础设施WebSocketIntegrationConfigurationInitializer 组件来实现的,它的作用与@EnableWebSocket 注释相同。这意味着,通过使用@EnableIntegration (或应用程序上下文中的任何 Spring Integration 命名空间),您可以省略@EnableWebSocket 声明,因为 Spring Integration 基础设施检测所有 WebSocket 端点。
|
WebSocket 入站通道适配器
实现交互的WebSocketInboundChannelAdapter
接收部分。WebSocketSession
您必须为它提供 a IntegrationWebSocketContainer
,并且适配器将自己注册为 aWebSocketListener
以处理传入的消息和WebSocketSession
事件。
只能WebSocketListener 注册一个IntegrationWebSocketContainer 。
|
对于 WebSocket 子协议,WebSocketInboundChannelAdapter
可以将其配置SubProtocolHandlerRegistry
为第二个构造函数参数。适配器委托给来确定接受SubProtocolHandlerRegistry
的合适,并根据子协议实现将 a 转换为 a 。SubProtocolHandler
WebSocketSession
WebSocketMessage
Message
默认情况下,WebSocketInboundChannelAdapter 仅依赖于原始PassThruSubProtocolHandler 实现,它将 转换WebSocketMessage 为Message .
|
仅接受具有或为空标头的实例并将其WebSocketInboundChannelAdapter
发送到底层集成流。所有其他类型都通过从实现发出的实例(例如)来处理。Message
SimpMessageType.MESSAGE
simpMessageType
Message
ApplicationEvent
SubProtocolHandler
StompSubProtocolHandler
在服务器端,如果@EnableWebSocketMessageBroker
存在配置,您可以WebSocketInboundChannelAdapter
使用该useBroker = true
选项进行配置。在这种情况下,所有non-MESSAGE
Message
类型都委托给提供的AbstractBrokerMessageHandler
. 此外,如果代理中继配置了目标前缀,则与代理目标匹配的那些消息将路由到 .而AbstractBrokerMessageHandler
不是.outputChannel
WebSocketInboundChannelAdapter
如果useBroker = false
接收到的消息属于该SimpMessageType.CONNECT
类型,则WebSocketInboundChannelAdapter
立即向 发送SimpMessageType.CONNECT_ACK
消息WebSocketSession
而不将其发送到通道。
Spring 的 WebSocket 支持只允许配置一个代理中继。因此,我们不需要AbstractBrokerMessageHandler 参考。它在应用程序上下文中被检测到。
|
有关更多配置选项,请参阅WebSockets 命名空间支持。
WebSocket 出站通道适配器
WebSocketOutboundChannelAdapter
: _
-
接受来自其的 Spring Integration 消息
MessageChannel
-
确定
WebSocketSession
id
从MessageHeaders
-
WebSocketSession
从提供的检索IntegrationWebSocketContainer
-
WebSocketMessage
将工作的转换和发送委托给SubProtocolHandler
提供的SubProtocolHandlerRegistry
.
在客户端,WebSocketSession
id
不需要消息头,因为ClientWebSocketContainer
只处理单个连接及其WebSocketSession
分别。
要使用 STOMP 子协议,您应该使用StompSubProtocolHandler
. StompHeaderAccessor.create(StompCommand…)
然后,您可以使用and aMessageBuilder
或仅使用 a将任何 STOMP 消息类型发送到此适配器HeaderEnricher
(请参阅Header Enricher)。
本章的其余部分主要介绍其他配置选项。
WebSockets 命名空间支持
Spring Integration WebSocket 命名空间包括本章其余部分中描述的几个组件。要将其包含在您的配置中,请在应用程序上下文配置文件中使用以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
<int-websocket:client-container>
属性
以下清单显示了<int-websocket:client-container>
元素可用的属性:
<int-websocket:client-container
id="" (1)
client="" (2)
uri="" (3)
uri-variables="" (4)
origin="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
auto-startup="" (8)
phase=""> (9)
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> (10)
</int-websocket:client-container>
1 | 组件 bean 名称。 |
2 | WebSocketClient bean 参考。 |
3 | 目标 WebSocket 服务的uri 或。uriTemplate 如果将其用作uriTemplate 带有 URI 变量的占位符,则该uri-variables 属性是必需的。 |
4 | 属性值中 URI 变量占位符的逗号分隔值uri 。这些值根据它们在uri . 见UriComponents.expand(Object…uriVariableValues) 。 |
5 | Origin 握手 HTTP 标头值。 |
6 | WebSocket 会话“发送”超时限制。默认为10000 . |
7 | WebSocket 会话“发送”消息大小限制。默认为524288 . |
8 | 指示此端点是否应自动启动的布尔值。默认为false ,假设此容器是从WebSocket 入站适配器启动的。 |
9 | 此端点应启动和停止的生命周期阶段。该值越低,此端点启动越早,停止越晚。默认值为Integer.MAX_VALUE . 值可以是负数。见SmartLifeCycle 。 |
10 | A Map ofHttpHeaders 与握手请求一起使用。 |
<int-websocket:server-container>
属性
以下清单显示了<int-websocket:server-container>
元素可用的属性:
<int-websocket:server-container
id="" (1)
path="" (2)
handshake-handler="" (3)
handshake-interceptors="" (4)
decorator-factories="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
allowed-origins=""> (8)
<int-websocket:sockjs
client-library-url="" (9)
stream-bytes-limit="" (10)
session-cookie-needed="" (11)
heartbeat-time="" (12)
disconnect-delay="" (13)
message-cache-size="" (14)
websocket-enabled="" (15)
scheduler="" (16)
message-codec="" (17)
transport-handlers="" (18)
suppress-cors="true"="" /> (19)
</int-websocket:server-container>
1 | 组件 bean 名称。 |
2 | 将特定请求映射到WebSocketHandler . 支持精确的路径映射 URI(例如/myPath )和 ant 样式的路径模式(例如/myPath/** )。 |
3 | HandshakeHandler bean 参考。默认为DefaultHandshakeHandler . |
4 | HandshakeInterceptor bean 引用列表。 |
5 | WebSocketHandlerDecoratorFactory 装饰用于处理 WebSocket 消息的处理程序的一个或多个工厂 ( ) 的列表。这对于一些高级用例可能很有用(例如,允许 Spring Security 在相应的 HTTP 会话过期时强制关闭 WebSocket 会话)。有关更多信息,请参阅Spring Session 项目。 |
6 | 在 上查看相同的选项<int-websocket:client-container> 。 |
7 | 在 上查看相同的选项<int-websocket:client-container> 。 |
8 | 允许的原始标头值。您可以将多个来源指定为逗号分隔的列表。此检查主要是为浏览器客户端设计的。没有什么可以阻止其他类型的客户端修改原始标头值。当启用 SockJS 并限制允许的来源时,不使用来源标头进行跨域请求的传输类型(jsonp-polling 、iframe-xhr-polling 、iframe-eventsource 和iframe-htmlfile )将被禁用。因此,不支持 IE6 和 IE7,仅支持不带 cookie 的 IE8 和 IE9。默认情况下,允许所有来源。 |
9 | 没有本机跨域通信的传输(例如eventsource and htmlfile )必须从不可见 iframe 中的“外部”域中获取一个简单页面,以便 iframe 中的代码可以从 SockJS 服务器的本地域运行。由于 iframe 需要加载 SockJS javascript 客户端库,因此该属性允许您指定加载它的位置。默认情况下,它指向https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js . 但是,您也可以将其设置为指向应用程序提供的 URL。请注意,可以指定相对 URL,在这种情况下,该 URL 必须相对于 iframe URL。例如,假设映射到的 SockJS 端点/sockjs 和生成的 iframe URL 是/sockjs/iframe.html , 相对 URL 必须以 "../../" 开头才能遍历到 SockJS 映射上方的位置。对于基于前缀的 servlet 映射,您可能需要再进行一次遍历。 |
10 | 在关闭之前可以通过单个 HTTP 流请求发送的最小字节数。默认为128K (即 128*1024 或 131072 字节)。 |
11 | cookie_needed 来自 SockJs/info 端点的响应中的值。此属性指示JSESSIONID 应用程序是否需要 cookie 才能正常运行(例如,用于负载平衡或在 Java Servlet 容器中以使用 HTTP 会话)。 |
12 | 服务器未发送任何消息的时间量(以毫秒为单位),之后服务器应向客户端发送心跳帧以防止连接中断。默认值为25,000 (25 秒)。 |
13 | 在没有接收连接(即服务器可以向客户端发送数据的活动连接)之后,客户端被认为断开连接之前的时间量(以毫秒为单位)。默认值为5000 。 |
14 | 会话在等待来自客户端的下一个 HTTP 轮询请求时可以缓存的服务器到客户端消息的数量。默认大小为100 . |
15 | 一些负载均衡器不支持 WebSocket。将此选项设置false 为禁用服务器端的 WebSocket 传输。默认值为true 。 |
16 | TaskScheduler bean 参考。ThreadPoolTaskScheduler 如果没有提供值,则会创建一个新实例。此调度程序实例用于调度心跳消息。 |
17 | SockJsMessageCodec 用于编码和解码 SockJS 消息的bean 引用。默认情况下,Jackson2SockJsMessageCodec 使用,这要求 Jackson 库存在于类路径中。 |
18 | TransportHandler bean 引用列表。 |
19 | 是否禁用为 SockJS 请求自动添加 CORS 标头。默认值为false 。 |
<int-websocket:outbound-channel-adapter>
属性
以下清单显示了<int-websocket:outbound-channel-adapter>
元素可用的属性:
<int-websocket:outbound-channel-adapter
id="" (1)
channel="" (2)
container="" (3)
default-protocol-handler="" (4)
protocol-handlers="" (5)
message-converters="" (6)
merge-with-default-converters="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 组件 bean 名称。如果您不提供该channel 属性,DirectChannel 则会在应用程序上下文中创建并注册一个并使用该id 属性作为 bean 名称。在这种情况下,端点注册了 bean 名称id plus .adapter 。并且MessageHandler 使用 bean 别名id plus注册.handler 。 |
2 | 标识连接到此适配器的通道。 |
3 | 对 bean 的引用IntegrationWebSocketContainer ,它封装了低级连接和WebSocketSession 处理操作。必需的。 |
4 | SubProtocolHandler 对实例的可选引用。当客户端没有请求子协议或者它是单个协议处理程序时使用它。如果未提供此引用或protocol-handlers 列表,PassThruSubProtocolHandler 则默认使用 。 |
5 | SubProtocolHandler 此通道适配器的 bean 引用列表。如果您只提供一个 bean 引用而不提供一个default-protocol-handler ,则该单个SubProtocolHandler 将用作default-protocol-handler . 如果不设置此属性 或default-protocol-handler ,PassThruSubProtocolHandler 则默认使用 。 |
6 | MessageConverter 此通道适配器的 bean 引用列表。 |
7 | 布尔值,指示是否应在任何自定义转换器之后注册默认转换器。此标志仅在message-converters 提供时使用。否则,将注册所有默认转换器。默认为false . 默认转换器是(按顺序):StringMessageConverter 、、ByteArrayMessageConverter 和MappingJackson2MessageConverter (如果 Jackson 库存在于类路径中)。 |
8 | 指示此端点是否应自动启动的布尔值。默认为true . |
9 | 此端点应启动和停止的生命周期阶段。该值越低,此端点启动越早,停止越晚。默认值为Integer.MIN_VALUE . 值可以是负数。见SmartLifeCycle 。 |
<int-websocket:inbound-channel-adapter>
属性
以下清单显示了<int-websocket:outbound-channel-adapter>
元素可用的属性:
<int-websocket:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
container="" (4)
default-protocol-handler="" (5)
protocol-handlers="" (6)
message-converters="" (7)
merge-with-default-converters="" (8)
send-timeout="" (9)
payload-type="" (10)
use-broker="" (11)
auto-startup="" (12)
phase=""/> (13)
1 | 组件 bean 名称。如果您不设置该channel 属性,DirectChannel 则会在应用程序上下文中创建并注册一个并使用该id 属性作为 bean 名称。在这种情况下,端点注册了 bean 名称id plus .adapter 。 |
2 | 标识连接到此适配器的通道。 |
3 | 实例应发送到的MessageChannel bean 引用。ErrorMessage |
4 | 在 上查看相同的选项<int-websocket:outbound-channel-adapter> 。 |
5 | 在 上查看相同的选项<int-websocket:outbound-channel-adapter> 。 |
6 | 在 上查看相同的选项<int-websocket:outbound-channel-adapter> 。 |
7 | 在 上查看相同的选项<int-websocket:outbound-channel-adapter> 。 |
8 | 在 上查看相同的选项<int-websocket:outbound-channel-adapter> 。 |
9 | 如果通道可以阻塞,则在向通道发送消息时等待的最长时间(以毫秒为单位)。例如,QueueChannel 如果已达到其最大容量,则可以阻塞直到有可用空间。 |
10 | payload 要从传入的 转换的目标的 Java 类型的完全限定名称WebSocketMessage 。默认为java.lang.String . |
11 | 指示此适配器是否将non-MESSAGE WebSocketMessage 具有代理目标的实例和消息AbstractBrokerMessageHandler 从应用程序上下文发送到。该属性为true 时,Broker Relay 需要配置。此属性仅在服务器端使用。在客户端,它被忽略。默认为false . |
12 | 在 上查看相同的选项<int-websocket:outbound-channel-adapter> 。 |
13 | 在 上查看相同的选项<int-websocket:outbound-channel-adapter> 。 |
使用ClientStompEncoder
从版本 4.3.13 开始,Spring Integration 提供ClientStompEncoder
(作为标准的扩展StompEncoder
)用于 WebSocket 通道适配器的客户端。为了正确准备客户端消息,您必须ClientStompEncoder
将StompSubProtocolHandler
. 默认值的一个问题StompSubProtocolHandler
是它是为服务器端设计的,因此它将SEND
stompCommand
标头更新为MESSAGE
(根据服务器端的 STOMP 协议的要求)。如果客户端没有在正确的SEND
Web 套接字帧中发送其消息,则某些 STOMP 代理不接受它们。ClientStompEncoder
在这种情况下,的目的是覆盖stompCommand
标头并将其设置为SEND
在将消息编码为 之前的值byte[]
。
动态 WebSocket 端点注册
从版本 5.5 开始,WebSocket 服务器端点(基于 a 的通道适配器ServerWebSocketContainer
)现在可以在运行时注册(和删除) - 映射的paths
aServerWebSocketContainer
通过暴露HandlerMapping
到 aDispatcherServlet
中并且可供 WebSocket 客户端访问。动态和运行时集成流支持有助于以透明的方式注册这些端点:
@Autowired
IntegrationFlowContext integrationFlowContext;
@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
new ServerWebSocketContainer("/dynamic")
.setHandshakeHandler(this.handshakeHandler);
WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
new WebSocketInboundChannelAdapter(serverWebSocketContainer);
QueueChannel dynamicRequestsChannel = new QueueChannel();
IntegrationFlow serverFlow =
IntegrationFlows.from(webSocketInboundChannelAdapter)
.channel(dynamicRequestsChannel)
.get();
IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
this.integrationFlowContext.registration(serverFlow)
.addBean(serverWebSocketContainer)
.register();
...
dynamicServerFlow.destroy();
调用.addBean(serverWebSocketContainer) 动态流注册以将实例添加ServerWebSocketContainer 到ApplicationContext 端点注册中很重要。当动态流注册被销毁时,关联的ServerWebSocketContainer 实例也会被销毁,以及相应的端点注册,包括 URL 路径映射。
|
动态 Websocket 端点只能通过 Spring Integration 机制注册:当使用常规 Spring 时@EnableWebsocket ,Spring Integration 配置回退,并且没有注册动态端点的基础设施。
|