WebSocket 支持

从 4.1 版开始,Spring Integration 支持 WebSocket。它基于 Spring Frameworkweb-socket模块中的架构、基础设施和 API。因此,Spring WebSocket 的许多组件(例如SubProtocolHandlerWebSocketClient)和配置选项(例如@EnableWebSocketMessageBroker)都可以在 Spring Integration 中重用。有关详细信息,请参阅Spring Framework 参考手册中的Spring Framework WebSocket Support一章。

您需要将此依赖项包含到您的项目中:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-websocket</artifactId>
    <version>5.5.13</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-websocket:5.5.13"

对于服务器端,org.springframework:spring-webmvc必须明确包含依赖项。

Spring Framework WebSocket 基础架构基于 Spring 消息传递基础,并提供了一个基本的消息传递框架,该框架基于 Spring Integration 使用的相同MessageChannel实现和MessageHandler实现(以及一些 POJO 方法注释映射)。因此,即使没有 WebSocket 适配器,Spring Integration 也可以直接参与 WebSocket 流。为此,您可以@MessagingGateway使用适当的注释配置 Spring 集成,如以下示例所示:

@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}

概述

由于 WebSocket 协议根据定义是流式传输的,并且我们可以同时向 WebSocket 发送和接收消息,因此我们可以处理适当的WebSocketSession,无论是在客户端还是服务器端。为了封装连接管理和WebSocketSession注册,IntegrationWebSocketContainer提供了ClientWebSocketContainerServerWebSocketContainer实现。由于WebSocket API及其在 Spring Framework 中的实现(具有许多扩展),服务器端和客户端都使用了相同的类(当然,从 Java 的角度来看)。因此,大多数连接和WebSocketSession两侧的注册表选项相同。这让我们可以重用许多配置项和基础设施挂钩来在服务器端和客户端构建 WebSocket 应用程序。下面的例子展示了组件如何服务于这两个目的:

//Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}

旨在实现双向消息传递,IntegrationWebSocketContainer并且可以在入站和出站通道适配器之间共享(见下文),在使用单向(发送或接收)WebSocket 消息传递时只能从其中一个引用。它可以在没有任何通道适配器的情况下使用,但在这种情况下,IntegrationWebSocketContainer它只起到WebSocketSession注册表的作用。

ServerWebSocketContainer内部WebSocketConfigurer注册IntegrationWebSocketContainer.IntegrationWebSocketHandlerEndpoint. 它在目标供应商 WebSocket 容器中提供的paths和其他服务器 WebSocket 选项(例如HandshakeHandlerSockJS fallback)下执行此操作。ServletWebSocketHandlerRegistry这种注册是通过一个基础设施WebSocketIntegrationConfigurationInitializer组件来实现的,它的作用与@EnableWebSocket注释相同。这意味着,通过使用@EnableIntegration(或应用程序上下文中的任何 Spring Integration 命名空间),您可以省略@EnableWebSocket声明,因为 Spring Integration 基础设施检测所有 WebSocket 端点。

WebSocket 入站通道适配器

实现交互的WebSocketInboundChannelAdapter接收部分。WebSocketSession您必须为它提供 a IntegrationWebSocketContainer,并且适配器将自己注册为 aWebSocketListener以处理传入的消息和WebSocketSession事件。

只能WebSocketListener注册一个IntegrationWebSocketContainer

对于 WebSocket 子协议,WebSocketInboundChannelAdapter可以将其配置SubProtocolHandlerRegistry为第二个构造函数参数。适配器委托给来确定接受SubProtocolHandlerRegistry的合适,并根据子协议实现将 a 转换为 a 。SubProtocolHandlerWebSocketSessionWebSocketMessageMessage

默认情况下,WebSocketInboundChannelAdapter仅依赖于原始PassThruSubProtocolHandler实现,它将 转换WebSocketMessageMessage.

仅接受具有或为空标头的实例并将其WebSocketInboundChannelAdapter发送到底层集成流。所有其他类型都通过从实现发出的实例(例如)来处理。MessageSimpMessageType.MESSAGEsimpMessageTypeMessageApplicationEventSubProtocolHandlerStompSubProtocolHandler

在服务器端,如果@EnableWebSocketMessageBroker存在配置,您可以WebSocketInboundChannelAdapter使用该useBroker = true选项进行配置。在这种情况下,所有non-MESSAGE Message类型都委托给提供的AbstractBrokerMessageHandler. 此外,如果代理中继配置了目标前缀,则与代理目标匹配的那些消息将路由到 .而AbstractBrokerMessageHandler不是.outputChannelWebSocketInboundChannelAdapter

如果useBroker = false接收到的消息属于该SimpMessageType.CONNECT类型,则WebSocketInboundChannelAdapter立即向 发送SimpMessageType.CONNECT_ACK消息WebSocketSession而不将其发送到通道。

Spring 的 WebSocket 支持只允许配置一个代理中继。因此,我们不需要AbstractBrokerMessageHandler参考。它在应用程序上下文中被检测到。

有关更多配置选项,请参阅WebSockets 命名空间支持

WebSocket 出站通道适配器

WebSocketOutboundChannelAdapter: _

  1. 接受来自其的 Spring Integration 消息MessageChannel

  2. 确定WebSocketSession idMessageHeaders

  3. WebSocketSession从提供的检索IntegrationWebSocketContainer

  4. WebSocketMessage将工作的转换和发送委托给SubProtocolHandler提供的SubProtocolHandlerRegistry.

在客户端,WebSocketSession id不需要消息头,因为ClientWebSocketContainer只处理单个连接及其WebSocketSession分别。

要使用 STOMP 子协议,您应该使用StompSubProtocolHandler. StompHeaderAccessor.create(StompCommand…​)然后,您可以使用and aMessageBuilder或仅使用 a将任何 STOMP 消息类型发送到此适配器HeaderEnricher(请参阅Header Enricher)。

本章的其余部分主要介绍其他配置选项。

WebSockets 命名空间支持

Spring Integration WebSocket 命名空间包括本章其余部分中描述的几个组件。要将其包含在您的配置中,请在应用程序上下文配置文件中使用以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
    ...
</beans>

<int-websocket:client-container>属性

以下清单显示了<int-websocket:client-container>元素可用的属性:

<int-websocket:client-container
                  id=""                        (1)
                  client=""                    (2)
                  uri=""                       (3)
                  uri-variables=""             (4)
                  origin=""                    (5)
                  send-time-limit=""           (6)
                  send-buffer-size-limit=""    (7)
                  auto-startup=""              (8)
                  phase="">                    (9)
                <int-websocket:http-headers>
                  <entry key="" value=""/>
                </int-websocket:http-headers>  (10)
</int-websocket:client-container>
1 组件 bean 名称。
2 WebSocketClientbean 参考。
3 目标 WebSocket 服务的uri或。uriTemplate如果将其用作uriTemplate带有 URI 变量的占位符,则该uri-variables属性是必需的。
4 属性值中 URI 变量占位符的逗号分隔值uri。这些值根据它们在uri. 见UriComponents.expand(Object…​uriVariableValues)
5 Origin握手 HTTP 标头值。
6 WebSocket 会话“发送”超时限制。默认为10000.
7 WebSocket 会话“发送”消息大小限制。默认为524288.
8 指示此端点是否应自动启动的布尔值。默认为false,假设此容器是从WebSocket 入站适配器启动的。
9 此端点应启动和停止的生命周期阶段。该值越低,此端点启动越早,停止越晚。默认值为Integer.MAX_VALUE. 值可以是负数。见SmartLifeCycle
10 A MapofHttpHeaders与握手请求一起使用。

<int-websocket:server-container>属性

以下清单显示了<int-websocket:server-container>元素可用的属性:

<int-websocket:server-container
          id=""                         (1)
          path=""                       (2)
          handshake-handler=""          (3)
          handshake-interceptors=""     (4)
          decorator-factories=""        (5)
          send-time-limit=""            (6)
          send-buffer-size-limit=""     (7)
          allowed-origins="">           (8)
          <int-websocket:sockjs
            client-library-url=""       (9)
            stream-bytes-limit=""       (10)
            session-cookie-needed=""    (11)
            heartbeat-time=""           (12)
            disconnect-delay=""         (13)
            message-cache-size=""       (14)
            websocket-enabled=""        (15)
            scheduler=""                (16)
            message-codec=""            (17)
            transport-handlers=""       (18)
            suppress-cors="true"="" />  (19)
</int-websocket:server-container>
1 组件 bean 名称。
2 将特定请求映射到WebSocketHandler. 支持精确的路径映射 URI(例如/myPath)和 ant 样式的路径模式(例如/myPath/**)。
3 HandshakeHandlerbean 参考。默认为DefaultHandshakeHandler.
4 HandshakeInterceptorbean 引用列表。
5 WebSocketHandlerDecoratorFactory装饰用于处理 WebSocket 消息的处理程序的一个或多个工厂 ( ) 的列表。这对于一些高级用例可能很有用(例如,允许 Spring Security 在相应的 HTTP 会话过期时强制关闭 WebSocket 会话)。有关更多信息,请参阅Spring Session 项目
6 在 上查看相同的选项<int-websocket:client-container>
7 在 上查看相同的选项<int-websocket:client-container>
8 允许的原始标头值。您可以将多个来源指定为逗号分隔的列表。此检查主要是为浏览器客户端设计的。没有什么可以阻止其他类型的客户端修改原始标头值。当启用 SockJS 并限制允许的来源时,不使用来源标头进行跨域请求的传输类型(jsonp-pollingiframe-xhr-pollingiframe-eventsourceiframe-htmlfile)将被禁用。因此,不支持 IE6 和 IE7,仅支持不带 cookie 的 IE8 和 IE9。默认情况下,允许所有来源。
9 没有本机跨域通信的传输(例如eventsourceand htmlfile)必须从不可见 iframe 中的“外部”域中获取一个简单页面,以便 iframe 中的代码可以从 SockJS 服务器的本地域运行。由于 iframe 需要加载 SockJS javascript 客户端库,因此该属性允许您指定加载它的位置。默认情况下,它指向https://d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js. 但是,您也可以将其设置为指向应用程序提供的 URL。请注意,可以指定相对 URL,在这种情况下,该 URL 必须相对于 iframe URL。例如,假设映射到的 SockJS 端点/sockjs和生成的 iframe URL 是/sockjs/iframe.html, 相对 URL 必须以 "../../" 开头才能遍历到 SockJS 映射上方的位置。对于基于前缀的 servlet 映射,您可能需要再进行一次遍历。
10 在关闭之前可以通过单个 HTTP 流请求发送的最小字节数。默认为128K(即 128*1024 或 131072 字节)。
11 cookie_needed来自 SockJs/info端点的响应中的值。此属性指示JSESSIONID应用程序是否需要 cookie 才能正常运行(例如,用于负载平衡或在 Java Servlet 容器中以使用 HTTP 会话)。
12 服务器未发送任何消息的时间量(以毫秒为单位),之后服务器应向客户端发送心跳帧以防止连接中断。默认值为25,000(25 秒)。
13 在没有接收连接(即服务器可以向客户端发送数据的活动连接)之后,客户端被认为断开连接之前的时间量(以毫秒为单位)。默认值为5000
14 会话在等待来自客户端的下一个 HTTP 轮询请求时可以缓存的服务器到客户端消息的数量。默认大小为100.
15 一些负载均衡器不支持 WebSocket。将此选项设置false为禁用服务器端的 WebSocket 传输。默认值为true
16 TaskSchedulerbean 参考。ThreadPoolTaskScheduler如果没有提供值,则会创建一个新实例。此调度程序实例用于调度心跳消息。
17 SockJsMessageCodec用于编码和解码 SockJS 消息的bean 引用。默认情况下,Jackson2SockJsMessageCodec使用,这要求 Jackson 库存在于类路径中。
18 TransportHandlerbean 引用列表。
19 是否禁用为 SockJS 请求自动添加 CORS 标头。默认值为false

<int-websocket:outbound-channel-adapter>属性

以下清单显示了<int-websocket:outbound-channel-adapter>元素可用的属性:

<int-websocket:outbound-channel-adapter
                          id=""                             (1)
                          channel=""                        (2)
                          container=""                      (3)
                          default-protocol-handler=""       (4)
                          protocol-handlers=""              (5)
                          message-converters=""             (6)
                          merge-with-default-converters=""  (7)
                          auto-startup=""                   (8)
                          phase=""/>                        (9)
1 组件 bean 名称。如果您不提供该channel属性,DirectChannel则会在应用程序上下文中创建并注册一个并使用该id属性作为 bean 名称。在这种情况下,端点注册了 bean 名称idplus .adapter。并且MessageHandler使用 bean 别名idplus注册.handler
2 标识连接到此适配器的通道。
3 对 bean 的引用IntegrationWebSocketContainer,它封装了低级连接和WebSocketSession处理操作。必需的。
4 SubProtocolHandler对实例的可选引用。当客户端没有请求子协议或者它是单个协议处理程序时使用它。如果未提供此引用或protocol-handlers列表,PassThruSubProtocolHandler则默认使用 。
5 SubProtocolHandler此通道适配器的 bean 引用列表。如果您只提供一个 bean 引用而不提供一个default-protocol-handler,则该单个SubProtocolHandler将用作default-protocol-handler. 如果不设置此属性 或default-protocol-handlerPassThruSubProtocolHandler则默认使用 。
6 MessageConverter此通道适配器的 bean 引用列表。
7 布尔值,指示是否应在任何自定义转换器之后注册默认转换器。此标志仅在message-converters提供时使用。否则,将注册所有默认转换器。默认为false. 默认转换器是(按顺序):StringMessageConverter、、ByteArrayMessageConverterMappingJackson2MessageConverter(如果 Jackson 库存在于类路径中)。
8 指示此端点是否应自动启动的布尔值。默认为true.
9 此端点应启动和停止的生命周期阶段。该值越低,此端点启动越早,停止越晚。默认值为Integer.MIN_VALUE. 值可以是负数。见SmartLifeCycle

<int-websocket:inbound-channel-adapter>属性

以下清单显示了<int-websocket:outbound-channel-adapter>元素可用的属性:

<int-websocket:inbound-channel-adapter
                            id=""  (1)
                            channel=""  (2)
                            error-channel=""  (3)
                            container=""  (4)
                            default-protocol-handler=""  (5)
                            protocol-handlers=""  (6)
                            message-converters=""  (7)
                            merge-with-default-converters=""  (8)
                            send-timeout=""  (9)
                            payload-type=""  (10)
                            use-broker=""  (11)
                            auto-startup=""  (12)
                            phase=""/>  (13)
1 组件 bean 名称。如果您不设置该channel属性,DirectChannel则会在应用程序上下文中创建并注册一个并使用该id属性作为 bean 名称。在这种情况下,端点注册了 bean 名称idplus .adapter
2 标识连接到此适配器的通道。
3 实例应发送到的MessageChannelbean 引用。ErrorMessage
4 在 上查看相同的选项<int-websocket:outbound-channel-adapter>
5 在 上查看相同的选项<int-websocket:outbound-channel-adapter>
6 在 上查看相同的选项<int-websocket:outbound-channel-adapter>
7 在 上查看相同的选项<int-websocket:outbound-channel-adapter>
8 在 上查看相同的选项<int-websocket:outbound-channel-adapter>
9 如果通道可以阻塞,则在向通道发送消息时等待的最长时间(以毫秒为单位)。例如,QueueChannel如果已达到其最大容量,则可以阻塞直到有可用空间。
10 payload要从传入的 转换的目标的 Java 类型的完全限定名称WebSocketMessage。默认为java.lang.String.
11 指示此适配器是否将non-MESSAGE WebSocketMessage具有代理目标的实例和消息AbstractBrokerMessageHandler从应用程序上下文发送到。该属性为true时,Broker Relay需要配置。此属性仅在服务器端使用。在客户端,它被忽略。默认为false.
12 在 上查看相同的选项<int-websocket:outbound-channel-adapter>
13 在 上查看相同的选项<int-websocket:outbound-channel-adapter>

使用ClientStompEncoder

从版本 4.3.13 开始,Spring Integration 提供ClientStompEncoder(作为标准的扩展StompEncoder)用于 WebSocket 通道适配器的客户端。为了正确准备客户端消息,您必须ClientStompEncoderStompSubProtocolHandler. 默认值的一个问题StompSubProtocolHandler是它是为服务器端设计的,因此它将SEND stompCommand标头更新为MESSAGE(根据服务器端的 STOMP 协议的要求)。如果客户端没有在正确的SENDWeb 套接字帧中发送其消息,则某些 STOMP 代理不接受它们。ClientStompEncoder在这种情况下,的目的是覆盖stompCommand标头并将其设置为SEND在将消息编码为 之前的值byte[]

动态 WebSocket 端点注册

从版本 5.5 开始,WebSocket 服务器端点(基于 a 的通道适配器ServerWebSocketContainer)现在可以在运行时注册(和删除) - 映射的pathsaServerWebSocketContainer通过暴露HandlerMapping到 aDispatcherServlet中并且可供 WebSocket 客户端访问。动态和运行时集成流支持有助于以透明的方式注册这些端点:

@Autowired
IntegrationFlowContext integrationFlowContext;

@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
       new ServerWebSocketContainer("/dynamic")
               .setHandshakeHandler(this.handshakeHandler);

WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
       new WebSocketInboundChannelAdapter(serverWebSocketContainer);

QueueChannel dynamicRequestsChannel = new QueueChannel();

IntegrationFlow serverFlow =
       IntegrationFlows.from(webSocketInboundChannelAdapter)
               .channel(dynamicRequestsChannel)
               .get();

IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
       this.integrationFlowContext.registration(serverFlow)
               .addBean(serverWebSocketContainer)
               .register();
...
dynamicServerFlow.destroy();
调用.addBean(serverWebSocketContainer)动态流注册以将实例添加ServerWebSocketContainerApplicationContext端点注册中很重要。当动态流注册被销毁时,关联的ServerWebSocketContainer实例也会被销毁,以及相应的端点注册,包括 URL 路径映射。
动态 Websocket 端点只能通过 Spring Integration 机制注册:当使用常规 Spring 时@EnableWebsocket,Spring Integration 配置回退,并且没有注册动态端点的基础设施。

1. see XML Configuration