反应式流支持

Spring Integration在框架的某些地方以及从不同方面提供了对Reactive Streams交互的支持。我们将在这里讨论其中的大部分内容,并在必要时提供指向目标章节的适当链接以获取详细信息。

前言

回顾一下,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。Spring Integration 在基于 Spring 的应用程序中启用轻量级消息传递,并支持通过声明性适配器与外部系统集成。Spring Integration 的主要目标是为构建企业集成解决方案提供一个简单的模型,同时保持关注点的分离,这对于生成可维护、可测试的代码至关重要。这个目标是在目标应用程序中使用一等公民实现的,message例如channelendpoint,它允许我们构建一个集成流(管道),其中(在大多数情况下)一个端点将消息生成到一个通道中以供另一个端点使用。通过这种方式,我们将集成交互模型与目标业务逻辑区分开来。这里的关键部分是介于两者之间的通道:流行为取决于其实现,而端点不受影响。

另一方面,Reactive Streams 是具有非阻塞背压的异步流处理标准。Reactive Streams 的主要目标是管理跨异步边界的流数据交换——比如将元素传递到另一个线程或线程池——同时确保接收端不会被迫缓冲任意数量的数据。换句话说,背压是这个模型的一个组成部分,以便允许在线程之间进行调解的队列有界。Reactive Streams 实现的意图,例如Project Reactor,是在流应用程序的整个处理图上保留这些好处和特征。Reactive Streams 库的最终目标是使用可用的编程语言结构以透明和流畅的方式为目标应用程序提供类型、运算符集和支持 API,但最终的解决方案并不像使用正常的函数链调用。它分为两个阶段:定义和执行,这发生在订阅最终反应式发布者的一段时间后,并且对数据的需求从定义的底部推送到顶部,根据需要施加背压 - 我们请求尽可能多的我们目前可以处理的事件。反应式应用程序看起来像"stream"我们在 Spring Integration 术语中习惯的那样 -"flow". 事实上,自 Java 9 以来的 Reactive Streams SPI 已在java.util.concurrent.Flow课程中呈现。

从这里开始,当我们在端点上应用一些响应式框架操作符时,看起来 Spring Integration 流确实非常适合编写响应式流应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如JdbcMessageHandler) 可以在反应流中透明地处理。当然,Spring Integration 中对 Reactive Streams 支持的主要目标是允许整个过程完全响应,按需启动并准备好背压。在通道适配器的目标协议和系统提供 Reactive Streams 交互模型之前,这是不可能的。在下面的部分中,我们将描述 Spring Integration 中提供了哪些组件和方法,用于开发保留集成流结构的反应式应用程序。

Spring Integration 中的所有 Reactive Streams 交互都使用Project Reactor类型实现,例如MonoFlux

消息网关

与 Reactive Streams 交互的最简单点是@MessagingGateway,我们只需将网关方法的返回类型设置为- 并且当在返回的实例Mono<?>上发生订阅时,将执行网关方法调用背后的整个集成流程。有关更多信息,Mono请参阅反应器。Mono在框架内部,类似的Mono-reply 方法用于完全基于 Reactive Streams 兼容协议的入站网关(有关更多信息,请参阅下面的Reactive Channel Adapters)。发送和接收操作被包装到一个Mono.deffer()与链接来自的回复评估replyChannel标头可用时。这样,特定响应式协议(例如 Netty)的入站组件将作为在 Spring Integration 上执行的响应式流的订阅者和发起者。如果请求有效负载是响应式类型,则最好使用响应式流定义来处理它,从而将进程推迟到发起者订阅。为此,处理程序方法也必须返回反应类型。有关详细信息,请参阅下一节。

反应性回复有效载荷

当回复产生MessageHandler返回回复消息的反应类型有效负载时,它会以异步方式处理,并使用为MessageChannel提供的常规实现,outputChannel并在输出通道是实现时使用按需订阅扁平化ReactiveStreamsSubscribableChannel,例如FluxMessageChannel。使用标准的命令式MessageChannel用例,并且如果回复有效负载是多值发布者(ReactiveAdapter.isMultiValue()有关更多信息,请参阅 ),则将其包装到Mono.just(). 因此,Mono必须在下游显式订阅或由FluxMessageChannel下游展平。有了ReactiveStreamsSubscribableChannelfor outputChannel,就不用关心返回类型和订阅了;一切都由框架在内部顺利处理。

有关详细信息,请参阅异步服务激活器。

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannel和的组合实现Publisher<Message<?>>。AFlux作为热源,在内部创建用于接收来自send()实现的传入消息。Publisher.subscribe()实现委托给该internal Flux。此外,对于按需上游消费,FluxMessageChannel提供了ReactiveStreamsSubscribableChannel合约的实现。为该通道提供的任何上游Publisher(例如,请参阅下面的源轮询通道适配器和拆分器)都会在此通道准备好订阅时自动订阅。来自该委托发布者的事件被沉入Flux上述内部。

的消费者FluxMessageChannel必须是org.reactivestreams.Subscriber遵守 Reactive Streams 合同的实例。幸运的是,MessageHandlerSpring Integration 中的所有实现也都实现了一个CoreSubscriberfrom project Reactor。由于ReactiveStreamsConsumer两者之间的实现,整个集成流程配置对目标开发人员来说是透明的。在这种情况下,流程行为从命令式推送模型更改为反应式拉取模型。AReactiveStreamsConsumer也可用于使用 将 anyMessageChannel转换为反应源IntegrationReactiveUtils,从而使集成流部分反应。

有关FluxMessageChannel更多信息,请参阅。

从 5.5 版开始,ConsumerEndpointSpec引入了一个reactive()选项,使流中的端点ReactiveStreamsConsumer独立于输入通道。Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>可以提供可选选项以Flux通过操作自定义输入通道的源Flux.transform(),例如使用publishOn(),doOnNext()retry()。此功能通过其属性表示为所有消息注释(等)的@Reactive子注释。@ServiceActivator@Splitterreactive()

源轮询通道适配器

通常,SourcePollingChannelAdapter依赖于由TaskScheduler. 轮询触发器是根据提供的选项构建的,用于定期调度任务以轮询目标数据或事件源。当 anoutputChannel是 aReactiveStreamsSubscribableChannel时,同样Trigger用于确定下一次执行时间,但不是调度任务,而是基于上一步的值和持续时间SourcePollingChannelAdapter创建 a 。然后使用A轮询并将它们接收到输出中。该生成器由提供的下游背压订阅。从 5.5 版开始,当 时,根本不调用源,并且Flux<Message<?>>Flux.generate()nextExecutionTimeMono.delay()Flux.flatMapMany()maxMessagesPerPollFluxFluxReactiveStreamsSubscribableChannelmaxMessagesPerPoll == 0flatMapMany()通过Mono.empty()结果立即完成,直到maxMessagesPerPoll稍后更改为非零值,例如通过控制总线。这样,任何MessageSource实现都可以变成反应式热源。

有关更多信息,请参阅轮询消费者

事件驱动的通道适配器

MessageProducerSupport是事件驱动的通道适配器的基类,通常,它sendMessage(Message<?>)用作生产驱动程序 API 中的侦听器回调。doOnNext()当消息生产者实现构建Flux消息而不是基于侦听器的功能时,此回调也可以轻松插入Reactor 运算符。实际上,这是在outputChannel消息生产者的 an 不是 a时在框架中完成的ReactiveStreamsSubscribableChannel。但是,为了改善最终用户体验,并允许更多的背压就绪功能,当 a是来自目标系统的数据源时,它MessageProducerSupport提供了subscribeToPublisher(Publisher<? extends Message<?>>)在目标实现中使用的 API 。Publisher<Message<?>>>通常,它在doStart()调用目标驱动程序 API 时从实现中使用Publisher的源数据。建议将响应式MessageProducerSupport实现与下游FluxMessageChanneloutputChannel按需订阅和事件消费结合起来。当Publisher取消订阅时,通道适配器进入停止状态。调用stop()这样的通道适配器就完成了从源头的制作Publisher。可以通过自动订阅新创建的源来重新启动通道适配器Publisher

响应式流的消息源

从版本 5.3 开始,ReactiveMessageSourceProducer提供了 a。它是将提供的MessageSource和事件驱动的生产组合到已配置的outputChannel. 在内部,它将 a 包装MessageSource到重复重新订阅中,从而Mono产生上述Flux<Message<?>>要订阅的a subscribeToPublisher(Publisher<? extends Message<?>>)。对此的订阅MonoSchedulers.boundedElastic()为了避免在 target 中可能发生的阻塞MessageSource。当消息源返回null(没有数据可拉)时,它会根据订阅者上下文中的条目Mono转换为具有后续重新订阅的repeatWhenEmpty()状态。默认为 1 秒。如果生成的消息在标头中包含信息,则在delayIntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY DurationMessageSourceIntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKdoOnSuccess()如果下游流抛出带有失败消息以拒绝的原始Mono和拒绝。当轮询通道适配器的功能应转变为任何现有实现的反应式按需解决方案时,这可用于任何用例。doOnError()MessagingExceptionReactiveMessageSourceProducerMessageSource<?>

拆分器和聚合器

当 an为其逻辑AbstractMessageSplitter获取 a时,该过程自然会遍历 中的项目以将它们映射为消息以发送到. 如果该通道是 a ,则该通道的包装器是按需订阅的,当我们将传入事件映射到多值输出时,这种拆分器行为看起来更像是一个Reactor 操作符。当整个集成流程使用拆分器之前和之后构建时最有意义,将 Spring Integration 配置与 Reactive Streams 要求及其用于事件处理的操作符对齐。使用常规通道,a被转换为标准迭代和生成拆分逻辑。PublisherPublisheroutputChannelReactiveStreamsSubscribableChannelFluxPublisherflatMapPublisherFluxMessageChannelPublisherIterable

AFluxAggregatorMessageHandler是特定 Reactive Streams 逻辑实现的另一个示例,可以将其视为"reactive operator"Project Reactor。它基于Flux.groupBy()and Flux.window()(or buffer()) 运算符。在创建 a 时,传入的消息会沉入到Flux.create()启动FluxAggregatorMessageHandler中,使其成为热源。这是由一个按需Flux订阅的,或者直接在没有反应的时候订阅。当整个集成流程在此组件之前和之后构建时,这有其强大的功能,从而使整个逻辑背压准备就绪。ReactiveStreamsSubscribableChannelFluxAggregatorMessageHandler.start()outputChannelMessageHandlerFluxMessageChannel

有关详细信息,请参阅流和通量拆分通量聚合器

Java DSL

IntegrationFlowJava DSL 可以从任何实例开始Publisher(请参阅 参考资料IntegrationFlows.from(Publisher<Message<T>>))。此外,通过IntegrationFlowBuilder.toReactivePublisher()操作员,IntegrationFlow可以将其变成反应性热源。AFluxMessageChannel在这两种情况下都在内部使用;Publisher它可以根据其ReactiveStreamsSubscribableChannel合同订阅一个入站,并且它Publisher<Message<?>>本身就是一个下游订阅者。通过动态IntegrationFlow注册,我们可以实现一个强大的逻辑,将 Reactive Streams 与此集成流桥接至/从Publisher.

从版本 5.5.6 开始,存在一个运算符变体来控制返回toReactivePublisher(boolean autoStartOnSubscribe)的整个生命周期。通常,来自响应式发布者的订阅和消费发生在运行时后期,而不是在响应式流组合甚至启动期间。为了避免在订阅点进行生命周期管理的样板代码并获得更好的最终用户体验,引入了这个带有标志的新运算符。它将 (if )及其组件标记为,因此不会自动启动流中消息的生产和消费。相反,for是从内部发起的IntegrationFlowPublisher<Message<?>>ApplicationContextIntegrationFlowPublisher<Message<?>>autoStartOnSubscribetrueIntegrationFlowautoStartup = falseApplicationContextstart()IntegrationFlowFlux.doOnSubscribe(). 与autoStartOnSubscribe值无关,流从 a 停止Flux.doOnCancel()并且Flux.doOnTerminate()- 如果没有任何东西可以使用它们,则生成消息是没有意义的。

对于完全相反的用例,何时IntegrationFlow应该调用反应流并在完成后继续,fluxTransform()IntegrationFlowDefinition. 此时的流变成了 a FluxMessageChannel,传播到了 providedfluxFunction中,在Flux.transform()算子中执行。该函数的结果被包装到一个Mono<Message<?>>用于平面映射的输出Flux中,该输出由另一个订阅FluxMessageChannel用于下游流。

有关详细信息,请参阅Java DSL 章节

ReactiveMessageHandler

从 5.3 版开始,ReactiveMessageHandler框架原生支持 。这种类型的消息处理程序是为响应式客户端设计的,这些客户端返回响应式类型以用于低级操作执行的按需订阅,并且不提供任何回复数据来继续响应式流组合。当ReactiveMessageHandler在命令式集成流程中使用 a 时,handleMessage()结果在返回后立即被订阅,只是因为在这样的流程中没有响应式流组合来兑现背压。在这种情况下,框架将其包装ReactiveMessageHandlerReactiveMessageHandlerAdapter一个简单的MessageHandler. 但是,当ReactiveStreamsConsumer流中涉及 a 时(例如,当要消费的通道是 a 时FluxMessageChannel),这样的 aReactiveMessageHandler将由带有 a 的整个反应流组成flatMap()反应器操作员在消耗期间遵守背压。

开箱即用的ReactiveMessageHandler实现之一是ReactiveMongoDbStoringMessageHandler用于出站通道适配器。有关更多信息,请参阅MongoDB 反应式通道适配器

反应式通道适配器

当集成的目标协议提供了 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器变得简单。

入站的、事件驱动的通道适配器实现是关于将请求(如果需要)包装到延迟Mono或仅当协议组件启动对从侦听器方法返回Flux的订阅时才执行发送(并产生回复,如果有) 。Mono这样我们就有了一个完全封装在这个组件中的反应流解决方案。当然,在输出通道上订阅的下游集成流应该遵守 Reactive Streams 规范,并以按需、背压就绪的方式执行。这并不总是由MessageHandler集成流程中使用的处理器的性质(或当前实现)提供。可以使用线程池和队列来处理此限制,或者FluxMessageChannel(见上文)在没有响应式实现时集成端点之前和之后。

反应式出站通道适配器实现是关于根据为目标协议提供的反应式 API 启动(或继续)反应式流以与外部系统交互。入站有效负载本身可以是反应类型,也可以作为整个集成流的事件,它是顶部反应流的一部分。如果我们处于单向、即发即弃的场景中,可以立即订阅返回的响应式类型,或者将其传播到下游(请求-回复场景)以进行进一步的集成流程或目标业务逻辑中的显式订阅,但仍然下游保留反应流语义。

目前 Spring Integration 为WebFluxRSocketMongoDbR2DBC提供通道适配器(或网关)实现。Redis 流通道适配器也是响应式的,并使用Spring ReactiveStreamOperationsData。此外,Apache Cassandra 扩展提供了MessageHandlerCassandra 反应式驱动程序的实现。更多的反应式通道适配器即将出现,例如基于Kafka 的 Apache KafkaReactiveKafkaProducerTemplate以及ReactiveKafkaConsumerTemplate来自Spring 的 Apache Kafka等。对于许多其他非反应式通道适配器,建议使用线程池以避免反应式流处理期间的阻塞。


1. see XML Configuration