RSocket 支持
RSocket Spring 集成模块 ( spring-integration-rsocket
) 允许执行RSocket 应用程序协议。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:5.5.13"
该模块从 5.2 版开始可用,它基于 Spring Messaging 基础及其 RSocket 组件实现,例如RSocketRequester
、RSocketMessageHandler
和RSocketStrategies
。有关 RSocket 协议、术语和组件的更多信息,请参阅Spring Framework RSocket Support。
在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。为此,Spring Integration RSocket 支持提供ServerRSocketConnector
了.ClientRSocketConnector
AbstractRSocketConnector
根据ServerRSocketConnector
提供io.rsocket.transport.ServerTransport
的用于接受来自客户端的连接,在主机和端口上公开一个侦听器。内部RSocketServer
实例可以使用setServerConfigurer()
以及其他可以配置的选项进行定制,例如RSocketStrategies
有效MimeType
载荷数据和标头元数据。当setupRoute
客户端请求者(见ClientRSocketConnector
下文)提供 a 时,连接的客户端将作为 a 存储RSocketRequester
在由clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
. 默认情况下,连接数据用作键的转换值,转换为 UTF-8 字符集的字符串。这样一个RSocketRequester
注册表可以在应用程序逻辑中用于确定特定的客户端连接以与之交互,或将相同的消息发布到所有连接的客户端。当从客户端建立连接时,RSocketConnectedEvent
从ServerRSocketConnector
. 这类似于@ConnectMapping
Spring Messaging 模块中的注释提供的内容。映射模式*
意味着接受所有客户端路由。可RSocketConnectedEvent
用于通过DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
标头区分不同的路由。
典型的服务器配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项,包括RSocketStrategies
bean 和@EventListener
for RSocketConnectedEvent
,都是可选的。有关更多信息,请参阅ServerRSocketConnector
JavaDocs。
从版本 5.2.1 开始,它ServerRSocketMessageHandler
被提取到一个公共的顶级类,以便与现有的 RSocket 服务器进行可能的连接。当 aServerRSocketConnector
提供了 的外部实例时ServerRSocketMessageHandler
,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。此外,ServerRSocketMessageHandler
可以配置一个messageMappingCompatible
标志来处理@MessageMapping
RSocket 控制器,完全取代标准提供的功能RSocketMessageHandler
。这在混合配置中很有用,当经典@MessageMapping
方法与 RSocket 通道适配器一起存在于同一个应用程序中并且应用程序中存在外部配置的 RSocket 服务器时。
ClientRSocketConnector
用作RSocketRequester
基于所提供的连接RSocket
的支架ClientTransport
。RSocketConnector
可以使用提供的RSocketConnectorConfigurer
. (setupRoute
带有可选模板变量)和setupData
带有元数据的也可以在此组件上配置。
典型的客户端配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
大多数这些选项(包括RSocketStrategies
bean)都是可选的。请注意我们如何在任意端口上连接到本地启动的 RSocket 服务器。请参阅ServerRSocketConnector.clientRSocketKeyStrategy
用setupData
例。另请参阅ClientRSocketConnector
及其AbstractRSocketConnector
超类 JavaDocs 了解更多信息。
两者ClientRSocketConnector
都ServerRSocketConnector
负责将入站通道适配器映射到它们的path
配置,以路由传入的 RSocket 请求。有关详细信息,请参阅下一节。
RSocket 入站网关
RSocketInboundGateway
负责接收 RSocket 请求并产生响应(如果有)。它需要一个映射数组path
,可以是类似于 MVC 请求映射或@MessageMapping
语义的模式。此外(从 5.2.2 版开始),RSocketInteractionModel
可以在 上配置一组交互模型(请参阅 参考资料),RSocketInboundGateway
以通过特定帧类型限制对该端点的 RSocket 请求。默认情况下,支持所有交互模型。这样的 bean,根据它的IntegrationRSocketEndpoint
实现(a 的扩展ReactiveMessageHandler
),由ServerRSocketConnector
或ClientRSocketConnector
内部的路由逻辑自动检测到IntegrationRSocketMessageHandler
传入请求。可以AbstractRSocketConnector
提供给RSocketInboundGateway
用于显式端点注册。这样,自动检测选项就被禁用了AbstractRSocketConnector
。RSocketStrategies
也可以注入到RSocketInboundGateway
或它们是从提供的AbstractRSocketConnector
覆盖任何显式注入中获得的。解码器用于RSocketStrategies
根据提供的解码请求有效负载requestElementType
。如果RSocketPayloadReturnValueHandler.RESPONSE_HEADER
传入的 中未提供标头Message
,则将RSocketInboundGateway
请求视为fireAndForget
RSocket 交互模型。在这种情况下,RSocketInboundGateway
ansend
对outputChannel
. 否则,标头中的MonoProcessor
值RSocketPayloadReturnValueHandler.RESPONSE_HEADER
用于向 RSocket 发送回复。为此,RSocketInboundGateway
ansendAndReceiveMessageReactive
对outputChannel
. 这payload
向下游发送的消息始终是Flux
根据MessagingRSocket
逻辑的。在fireAndForget
RSocket 交互模型中,消息有一个普通的 converted payload
. 回复payload
可以是一个普通的对象,也可以是一个Publisher
-RSocketInboundGateway
根据RSocketStrategies
.
从 5.3 版开始decodeFluxAsUnit
,false
在RSocketInboundGateway
. 默认情况下,传入Flux
会转换为每个事件单独解码的方式。这是目前存在于@MessageMapping
语义上的确切行为。Flux
要根据应用要求恢复以前的行为或将整体解码为单个单元,decodeFluxAsUnit
必须将 设置为true
。然而,目标解码逻辑取决于所Decoder
选择的,例如,aStringDecoder
需要在流中出现新的行分隔符(默认情况下)以指示字节缓冲区结束。
有关如何配置端点和处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 端点。RSocketInboundGateway
RSocket 出站网关
这RSocketOutboundGateway
是一个AbstractReplyProducingMessageHandler
向 RSocket 执行请求并根据 RSocket 回复(如果有)生成回复。一个低级别的 RSocket 协议交互被委派为从提供的或从服务器端请求消息中的标头RSocketRequester
解析的。根据为连接请求映射选择的某些业务密钥,可以从 API或使用API解析服务器端的目标。有关更多信息,请参阅JavaDocs。ClientRSocketConnector
RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
RSocketRequester
RSocketConnectedEvent
ServerRSocketConnector.getClientRSocketRequester()
ServerRSocketConnector.setClientRSocketKeyStrategy()
ServerRSocketConnector
route
发送请求必须显式配置(连同路径变量)或通过根据请求消息评估的 SpEL 表达式。
RSocket 交互模型可以通过RSocketInteractionModel
选项或相应的表达式设置来提供。默认情况下,arequestResponse
用于常见网关用例。
当请求消息有效负载是 a时,可以提供Publisher
一个选项来根据目标中提供的a 对其元素进行编码。此选项的表达式可以计算为. 有关数据及其类型的更多信息,请参阅JavaDocs。publisherElementType
RSocketStrategies
RSocketRequester
ParameterizedTypeReference
RSocketRequester.RequestSpec.data()
RSocket 请求也可以使用metadata
. 为此metadataExpression
,可以在RSocketOutboundGateway
. 这样的表达式必须计算为 a Map<Object, MimeType>
。
当interactionModel
is notfireAndForget
时,expectedResponseType
必须提供 an。默认情况下是一个String.class
。此选项的表达式可以计算为ParameterizedTypeReference
. 有关回复数据及其类型的更多信息,请参阅RSocketRequester.RetrieveSpec.retrieveMono()
和JavaDocs。RSocketRequester.RetrieveSpec.retrieveFlux()
payload
来自 的回复RSocketOutboundGateway
是Mono
(即使对于fireAndForget
交互模型也是如此Mono<Void>
)总是将此组件设置为async
。这样的一个Mono
在生成到outputChannel
常规频道之前订阅或由FluxMessageChannel
. Flux
对requestStream
or交互模型的响应requestChannel
也包含在回复Mono
中。它可以通过FluxMessageChannel
带有直通服务激活器的下游展平:
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或在目标应用程序逻辑中显式订阅。
预期的响应类型也可以配置(或通过表达式评估)以void
将此网关视为出站通道适配器。然而,outputChannel
仍然必须配置(即使它只是一个NullChannel
)来启动对返回的订阅Mono
。
有关如何配置端点以处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 端点。RSocketOutboundGateway
RSocket 命名空间支持
Spring Integration 提供了一个rsocket
命名空间和相应的模式定义。要将其包含在您的配置中,请在应用程序上下文配置文件中添加以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入站
要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要使用命名空间中的适当inbound-gateway
组件int-rsocket
。以下示例显示了如何配置它:
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
AClientRSocketConnector
和ServerRSocketConnector
应配置为通用<bean>
定义。
出境
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
有关spring-integration-rsocket.xsd
所有这些 XML 属性的说明,请参阅。
使用 Java 配置 RSocket 端点
以下示例显示如何使用 Java 配置 RSocket 入站端点:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
在此配置中假定为ClientRSocketConnector
或,用于自动检测“回声”路径上的此类端点。ServerRSocketConnector
请注意@Transformer
签名及其对 RSocket 请求的完全反应式处理并产生反应性回复。
以下示例显示如何使用 Java DSL 配置 RSocket 入站网关:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
在此配置中假定为“或”,用于自动检测“/大写”路径上的此类端点,并将预期的交互模型视为“请求通道” ClientRSocketConnector
。ServerRSocketConnector
以下示例显示如何使用 Java 配置 RSocket 出站网关:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
setClientRSocketConnector()
仅客户端需要。在服务器端,必须在请求消息中提供RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
带有值的标头。RSocketRequester
以下示例显示如何使用 Java DSL 配置 RSocket 出站网关:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
有关如何使用上述流程开头提到的接口的更多信息,请参阅IntegrationFlow
网关。Function