RSocket 支持

RSocket Spring 集成模块 ( spring-integration-rsocket) 允许执行RSocket 应用程序协议

您需要将此依赖项包含到您的项目中:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>5.5.13</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-rsocket:5.5.13"

该模块从 5.2 版开始可用,它基于 Spring Messaging 基础及其 RSocket 组件实现,例如RSocketRequesterRSocketMessageHandlerRSocketStrategies。有关 RSocket 协议、术语和组件的更多信息,请参阅Spring Framework RSocket Support

在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。为此,Spring Integration RSocket 支持提供ServerRSocketConnector了.ClientRSocketConnectorAbstractRSocketConnector

根据ServerRSocketConnector提供io.rsocket.transport.ServerTransport的用于接受来自客户端的连接,在主机和端口上公开一个侦听器。内部RSocketServer实例可以使用setServerConfigurer()以及其他可以配置的选项进行定制,例如RSocketStrategies有效MimeType载荷数据和标头元数据。当setupRoute客户端请求者(见ClientRSocketConnector下文)提供 a 时,连接的客户端将作为 a 存储RSocketRequester在由clientRSocketKeyStrategy BiFunction<Map<String, Object>, DataBuffer, Object>. 默认情况下,连接数据用作键的转换值,转换为 UTF-8 字符集的字符串。这样一个RSocketRequester注册表可以在应用程序逻辑中用于确定特定的客户端连接以与之交互,或将相同的消息发布到所有连接的客户端。当从客户端建立连接时,RSocketConnectedEventServerRSocketConnector. 这类似于@ConnectMappingSpring Messaging 模块中的注释提供的内容。映射模式*意味着接受所有客户端路由。可RSocketConnectedEvent用于通过DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER标头区分不同的路由。

典型的服务器配置可能如下所示:

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

所有选项,包括RSocketStrategiesbean 和@EventListenerfor RSocketConnectedEvent,都是可选的。有关更多信息,请参阅ServerRSocketConnectorJavaDocs。

从版本 5.2.1 开始,它ServerRSocketMessageHandler被提取到一个公共的顶级类,以便与现有的 RSocket 服务器进行可能的连接。当 aServerRSocketConnector提供了 的外部实例时ServerRSocketMessageHandler,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。此外,ServerRSocketMessageHandler可以配置一个messageMappingCompatible标志来处理@MessageMappingRSocket 控制器,完全取代标准提供的功能RSocketMessageHandler。这在混合配置中很有用,当经典@MessageMapping方法与 RSocket 通道适配器一起存在于同一个应用程序中并且应用程序中存在外部配置的 RSocket 服务器时。

ClientRSocketConnector用作RSocketRequester基于所提供的连接RSocket的支架ClientTransportRSocketConnector可以使用提供的RSocketConnectorConfigurer. (setupRoute带有可选模板变量)和setupData带有元数据的也可以在此组件上配置。

典型的客户端配置可能如下所示:

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

大多数这些选项(包括RSocketStrategiesbean)都是可选的。请注意我们如何在任意端口上连接到本地启动的 RSocket 服务器。请参阅ServerRSocketConnector.clientRSocketKeyStrategysetupData例。另请参阅ClientRSocketConnector及其AbstractRSocketConnector超类 JavaDocs 了解更多信息。

两者ClientRSocketConnectorServerRSocketConnector负责将入站通道适配器映射到它们的path配置,以路由传入的 RSocket 请求。有关详细信息,请参阅下一节。

RSocket 入站网关

RSocketInboundGateway负责接收 RSocket 请求并产生响应(如果有)。它需要一个映射数组path,可以是类似于 MVC 请求映射或@MessageMapping语义的模式。此外(从 5.2.2 版开始),RSocketInteractionModel可以在 上配置一组交互模型(请参阅 参考资料),RSocketInboundGateway以通过特定帧类型限制对该端点的 RSocket 请求。默认情况下,支持所有交互模型。这样的 bean,根据它的IntegrationRSocketEndpoint实现(a 的扩展ReactiveMessageHandler),由ServerRSocketConnectorClientRSocketConnector内部的路由逻辑自动检测到IntegrationRSocketMessageHandler传入请求。可以AbstractRSocketConnector提供给RSocketInboundGateway用于显式端点注册。这样,自动检测选项就被禁用了AbstractRSocketConnectorRSocketStrategies也可以注入到RSocketInboundGateway或它们是从提供的AbstractRSocketConnector覆盖任何显式注入中获得的。解码器用于RSocketStrategies根据提供的解码请求有效负载requestElementType。如果RSocketPayloadReturnValueHandler.RESPONSE_HEADER传入的 中未提供标头Message,则将RSocketInboundGateway请求视为fireAndForgetRSocket 交互模型。在这种情况下,RSocketInboundGatewayansendoutputChannel. 否则,标头中的MonoProcessorRSocketPayloadReturnValueHandler.RESPONSE_HEADER用于向 RSocket 发送回复。为此,RSocketInboundGatewayansendAndReceiveMessageReactiveoutputChannel. 这payload向下游发送的消息始终是Flux根据MessagingRSocket逻辑的。在fireAndForgetRSocket 交互模型中,消息有一个普通的 converted payload. 回复payload可以是一个普通的对象,也可以是一个Publisher-RSocketInboundGateway根据RSocketStrategies.

从 5.3 版开始decodeFluxAsUnitfalseRSocketInboundGateway. 默认情况下,传入Flux会转换为每个事件单独解码的方式。这是目前存在于@MessageMapping语义上的确切行为。Flux要根据应用要求恢复以前的行为或将整体解码为单个单元,decodeFluxAsUnit必须将 设置为true。然而,目标解码逻辑取决于所Decoder选择的,例如,aStringDecoder需要在流中出现新的行分隔符(默认情况下)以指示字节缓冲区结束。

有关如何配置端点和处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 端点。RSocketInboundGateway

RSocket 出站网关

RSocketOutboundGateway是一个AbstractReplyProducingMessageHandler向 RSocket 执行请求并根据 RSocket 回复(如果有)生成回复。一个低级别的 RSocket 协议交互被委派为从提供的或从服务器端请求消息中的标头RSocketRequester解析的。根据为连接请求映射选择的某些业务密钥,可以从 API或使用API解析服务器端的目标。有关更多信息,请参阅JavaDocs。ClientRSocketConnectorRSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADERRSocketRequesterRSocketConnectedEventServerRSocketConnector.getClientRSocketRequester()ServerRSocketConnector.setClientRSocketKeyStrategy()ServerRSocketConnector

route发送请求必须显式配置(连同路径变量)或通过根据请求消息评估的 SpEL 表达式。

RSocket 交互模型可以通过RSocketInteractionModel选项或相应的表达式设置来提供。默认情况下,arequestResponse用于常见网关用例。

当请求消息有效负载是 a时,可以提供Publisher一个选项来根据目标中提供的a 对其元素进行编码。此选项的表达式可以计算为. 有关数据及其类型的更多信息,请参阅JavaDocs。publisherElementTypeRSocketStrategiesRSocketRequesterParameterizedTypeReferenceRSocketRequester.RequestSpec.data()

RSocket 请求也可以使用metadata. 为此metadataExpression,可以在RSocketOutboundGateway. 这样的表达式必须计算为 a Map<Object, MimeType>

interactionModelis notfireAndForget时,expectedResponseType必须提供 an。默认情况下是一个String.class。此选项的表达式可以计算为ParameterizedTypeReference. 有关回复数据及其类型的更多信息,请参阅RSocketRequester.RetrieveSpec.retrieveMono()和JavaDocs。RSocketRequester.RetrieveSpec.retrieveFlux()

payload来自 的回复RSocketOutboundGatewayMono(即使对于fireAndForget交互模型也是如此Mono<Void>)总是将此组件设置为async。这样的一个Mono在生成到outputChannel常规频道之前订阅或由FluxMessageChannel. FluxrequestStreamor交互模型的响应requestChannel也包含在回复Mono中。它可以通过FluxMessageChannel带有直通服务激活器的下游展平:

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

或在目标应用程序逻辑中显式订阅。

预期的响应类型也可以配置(或通过表达式评估)以void将此网关视为出站通道适配器。然而,outputChannel仍然必须配置(即使它只是一个NullChannel)来启动对返回的订阅Mono

有关如何配置端点以处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 端点。RSocketOutboundGateway

RSocket 命名空间支持

Spring Integration 提供了一个rsocket命名空间和相应的模式定义。要将其包含在您的配置中,请在应用程序上下文配置文件中添加以下命名空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

入站

要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要使用命名空间中的适当inbound-gateway组件int-rsocket。以下示例显示了如何配置它:

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

AClientRSocketConnectorServerRSocketConnector应配置为通用<bean>定义。

出境

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

有关spring-integration-rsocket.xsd所有这些 XML 属性的说明,请参阅。

使用 Java 配置 RSocket 端点

以下示例显示如何使用 Java 配置 RSocket 入站端点:

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

在此配置中假定为ClientRSocketConnector或,用于自动检测“回声”路径上的此类端点。ServerRSocketConnector请注意@Transformer签名及其对 RSocket 请求的完全反应式处理并产生反应性回复。

以下示例显示如何使用 Java DSL 配置 RSocket 入站网关:

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlows
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

在此配置中假定为“或”,用于自动检测“/大写”路径上的此类端点,并将预期的交互模型视为“请求通道” ClientRSocketConnectorServerRSocketConnector

以下示例显示如何使用 Java 配置 RSocket 出站网关:

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

setClientRSocketConnector()仅客户端需要。在服务器端,必须在请求消息中提供RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER带有值的标头。RSocketRequester

以下示例显示如何使用 Java DSL 配置 RSocket 出站网关:

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlows
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

有关如何使用上述流程开头提到的接口的更多信息,请参阅IntegrationFlow网关。Function


1. see XML Configuration