Java DSL

Spring Integration Java 配置和 DSL 提供了一组方便的构建器和流畅的 API,允许您从 Spring@Configuration类配置 Spring Integration 消息流。

(另请参阅Kotlin DSL。)

用于 Spring 集成的 Java DSL 本质上是 Spring 集成的外观。DSL 提供了一种将 Spring Integration 消息流嵌入到应用程序中的简单方法,方法是使用 fluentBuilder模式以及来自 Spring Framework 和 Spring Integration 的现有 Java 配置。我们还使用并支持 lambdas(Java 8 提供)以进一步简化 Java 配置。

咖啡馆提供了使用 DSL的一个很好的例子。

DSL 由IntegrationFlows工厂提供,用于IntegrationFlowBuilder. 这将生成IntegrationFlow组件,该组件应注册为 Spring bean(通过使用@Bean注释)。构建器模式用于将任意复杂的结构表示为可以接受 lambda 作为参数的方法层次结构。

IntegrationFlowBuilder唯一收集 bean 中的集成组件(实例MessageChannelAbstractEndpoint实例等),IntegrationFlow用于进一步解析和注册应用程序上下文中的具体 bean IntegrationFlowBeanPostProcessor

Java DSL 直接使用 Spring Integration 类并绕过任何 XML 生成和解析。然而,DSL 提供的不仅仅是 XML 之上的语法糖。它最引人注目的功能之一是能够定义内联 lambda 来实现端点逻辑,从而无需外部类来实现自定义逻辑。从某种意义上说,Spring Integration 对 Spring 表达式语言 (SpEL) 和内联脚本的支持解决了这个问题,但 lambda 更简单且功能更强大。

以下示例显示了如何使用 Java 配置进行 Spring 集成:

@Configuration
@EnableIntegration
public class MyConfiguration {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @Bean
    public IntegrationFlow myFlow() {
        return IntegrationFlows.fromSupplier(integerSource()::getAndIncrement,
                                         c -> c.poller(Pollers.fixedRate(100)))
                    .channel("inputChannel")
                    .filter((Integer p) -> p > 0)
                    .transform(Object::toString)
                    .channel(MessageChannels.queue())
                    .get();
    }
}

前面的配置示例的结果是它在ApplicationContext启动后创建了 Spring Integration 端点和消息通道。Java 配置可用于替换和扩充 XML 配置。您无需替换所有现有 XML 配置即可使用 Java 配置。

DSL 基础知识

org.springframework.integration.dsl包包含IntegrationFlowBuilder前面提到的 API 和许多IntegrationComponentSpec实现,它们也是构建器并提供流式 API 来配置具体端点。该基础架构为基于消息的应用程序(例如通道、端点、轮询器和通道拦截器)IntegrationFlowBuilder提供通用企业集成模式(EIP)。

端点在 DSL 中表示为动词以提高可读性。以下列表包括常见的 DSL 方法名称和关联的 EIP 端点:

  • 变换 →Transformer

  • 过滤 →Filter

  • 把手 →ServiceActivator

  • 分裂→Splitter

  • 聚合 →Aggregator

  • 路线 →Router

  • 桥 →Bridge

从概念上讲,集成过程是通过将这些端点组合成一个或多个消息流来构建的。请注意,EIP 没有正式定义术语“消息流”,但将其视为使用众所周知的消息传递模式的工作单元是有用的。DSL 提供了一个IntegrationFlow组件来定义它们之间的通道和端点的组合,但现在IntegrationFlow只扮演在应用程序上下文中填充真实 bean 的配置角色,而不是在运行时使用。然而 beanIntegrationFlow可以自动装配为一个Lifecycle控制start()stop()整个流程,它被委托给与 this 关联的所有 Spring Integration 组件IntegrationFlow。下面的例子使用IntegrationFlows工厂定义一个IntegrationFlowbean 通过使用来自以下的 EIP 方法IntegrationFlowBuilder

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

transform方法接受 lambda 作为端点参数以对消息有效负载进行操作。这种方法的真正论据是GenericTransformer<S, T>。因此,任何提供的转换器(ObjectToJsonTransformerFileToStringTransformer和其他)都可以在这里使用。

在幕后,分别用和IntegrationFlowBuilder识别它的MessageHandler和 端点。考虑另一个例子:MessageTransformingHandlerConsumerEndpointFactoryBean

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

前面的示例组成一个序列Filter → Transformer → Service Activator。流程是“单向​​”。也就是说,它不提供回复消息,而仅将有效负载打印到 STDOUT。端点通过使用直接通道自动连接在一起。

Lambda 和Message<?>参数

在 EIP 方法中使用 lambda 时,“输入”参数通常是消息负载。如果您希望访问整个消息,请使用以 aClass<?>作为第一个参数的重载方法之一。例如,这不起作用:

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

这将在运行时以 a 失败,ClassCastException因为 lambda 不保留参数类型,并且框架将尝试将有效负载转换为 a Message<?>

相反,使用:

.(Message.class, m -> newFooFromMessage(m))
Bean 定义覆盖

Java DSL 可以为流定义中内联定义的对象注册 bean,也可以重用现有的注入 bean。如果为 in-line 对象定义的 bean 名称和现有 bean 定义相同,BeanDefinitionOverrideException则抛出 a 指示这样的配置是错误的。但是,当您处理prototypebean 时,无法从集成流处理器中检测到现有的 bean 定义,因为每次我们prototype从其中调用 bean 时BeanFactory都会得到一个新实例。这样,提供的实例在IntegrationFlow没有任何 bean 注册和对现有prototypebean 定义的任何可能检查的情况下按原样使用。但是BeanFactory.initializeBean(),如果此对象具有显式id且此名称的 bean 定义在prototype范围内,则会为此对象调用。

消息渠道

除了IntegrationFlowBuilder使用 EIP 方法之外,Java DSL 还提供了一个流畅的 API 来配置MessageChannel实例。为此,MessageChannels提供了构建器工厂。以下示例显示了如何使用它:

@Bean
public MessageChannel priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap())
                        .get();
}

MessageChannels可以在channel()EIP 方法中使用相同的构建器工厂IntegrationFlowBuilder来连接端点,类似于在 XML 配置中连接input-channel/output-channel对。默认情况下,端点与DirectChannelbean 名称基于以下模式的实例连接:[IntegrationFlow.beanName].channel#[channelNameIndex]. 此规则也适用于内联MessageChannels构建器工厂使用产生的未命名通道。但是,所有MessageChannels方法都有一个变体,它知道channelId您可以用来设置MessageChannel实例的 bean 名称。MessageChannel引用 和可以beanName用作 bean 方法调用。以下示例显示了使用channel()EIP 方法的可能方式:

@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

@Bean
public MessageChannel publishSubscribe() {
    return MessageChannels.publishSubscribe().get();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlows.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input")表示“'查找并使用MessageChannel带有“输入”id的,或创建一个'”。

  • fixedSubscriberChannel()生成 的实例FixedSubscriberChannel并将其注册为channelFlow.channel#0.

  • channel("queueChannel")工作方式相同,但使用现有的queueChannelbean。

  • channel(publishSubscribe())是 bean 方法参考。

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor))IntegrationFlowBuilder暴露IntegrationComponentSpecExecutorChannel并将其注册为 的executorChannel

  • channel("output")只要不存在具有此名称的DirectChannelbean,就将 bean注册为其名称。output

注意:前面的IntegrationFlow定义是有效的,它的所有通道都应用于具有BridgeHandler实例的端点。

请注意通过MessageChannels来自不同IntegrationFlow实例的工厂使用相同的内联通道定义。即使 DSL 解析器将不存在的对象注册为 bean,它也无法确定MessageChannel来自不同IntegrationFlow容器的相同对象 ()。下面的例子是错误的:
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlows.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlows.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

该不良示例的结果是以下异常:

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

要使其工作,您需要@Bean为该通道声明并从不同IntegrationFlow实例中使用其 bean 方法。

轮询器

Spring Integration 还提供了一个流畅的 API,让您可以配置PollerMetadata实现AbstractPollingEndpoint。您可以使用Pollers构建器工厂来配置公共 bean 定义或从IntegrationFlowBuilderEIP 方法创建的定义,如以下示例所示:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(500)
        .errorChannel("myErrors");
}

有关更多信息,请参阅JavadocPollers和中的内容。PollerSpec

如果使用 DSL 构造 a PollerSpecas a @Bean,请不要调用get()bean 定义中的方法。是PollerSpec从规范FactoryBean生成PollerMetadata对象并初始化其所有属性的。

reactive()端点_

从 5.5 版开始,ConsumerEndpointSpec提供了一个reactive()带有可选定制器的配置属性Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>。此选项将目标端点配置为ReactiveStreamsConsumer实例,独立于输入通道类型,输入通道类型转换为Fluxvia IntegrationReactiveUtils.messageChannelToFlux()。操作员使用提供的函数来自Flux.transform()定义(publishOn()log()doOnNext()等)来自输入通道的反应流源。

以下示例演示了如何将发布线程从独立于最终订阅者和生产者的输入通道更改为DirectChannel

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlows
            .from("inputChannel")
            .<String, Integer>transform(Integer::parseInt,
                    e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())))
            .get();
}

有关更多信息,请参阅反应式流支持

DSL 和端点配置

所有IntegrationFlowBuilderEIP 方法都有一个变体,它应用 lambda 参数来为AbstractEndpoint实例提供选项:SmartLifecyclePollerMetadatarequest-handler-advice-chain和其他。它们中的每一个都有通用参数,因此它允许您配置端点,甚至MessageHandler在上下文中配置端点,如以下示例所示:

@Bean
public IntegrationFlow flow2() {
    return IntegrationFlows.from(this.inputChannel)
                .transform(new PayloadSerializingTransformer(),
                       c -> c.autoStartup(false).id("payloadSerializingTransformer"))
                .transform((Integer p) -> p * 2, c -> c.advice(this.expressionAdvice()))
                .get();
}

此外,EndpointSpec提供了一种id()方法让您使用给定的 bean 名称注册端点 bean,而不是生成的。

如果MessageHandler被引用为 bean,那么如果DSL 定义中存在adviceChain该方法,则任何现有配置都将被覆盖:.advice()

@Bean
public TcpOutboundGateway tcpOut() {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(cf());
    gateway.setAdviceChain(Collections.singletonList(fooAdvice()));
    return gateway;
}

@Bean
public IntegrationFlow clientTcpFlow() {
    return f -> f
        .handle(tcpOut(), e -> e.advice(testAdvice()))
        .transform(Transformers.objectToString());
}

也就是说,它们没有被合并,testAdvice()在这种情况下只使用 bean。

变形金刚

DSL API 提供了一个方便、流畅的Transformers工厂,可用作.transform()EIP 方法中的内联目标对象定义。以下示例显示了如何使用它:

@Bean
public IntegrationFlow transformFlow() {
    return IntegrationFlows.from("input")
            .transform(Transformers.fromJson(MyPojo.class))
            .transform(Transformers.serializer())
            .get();
}

它避免了使用 setter 进行不方便的编码,并使流定义更加直接。请注意,您可以使用Transformers将目标Transformer实例声明为@Bean实例,并再次从IntegrationFlow定义中将它们用作 bean 方法。尽管如此,如果内联对象尚未定义为 bean,则 DSL 解析器会处理它们的 bean 声明。

有关更多信息和支持的工厂方法,请参阅Javadoc 中的Transformers

入站通道适配器

通常,消息流从入站通道适配器(例如<int-jdbc:inbound-channel-adapter>)开始。适配器配置有<poller>,它要求 aMessageSource<?>定期生成消息。Java DSL 也允许IntegrationFlow从 a开始MessageSource<?>。为此,IntegrationFlows构建器工厂提供了一个重载IntegrationFlows.from(MessageSource<?> messageSource)方法。您可以将 配置MessageSource<?>为 bean 并将其作为该方法的参数提供。的第二个参数IntegrationFlows.from()是一个Consumer<SourcePollingChannelAdapterSpec>lambda,它允许您为. 以下示例展示了如何使用 fluent API 和 lambda 来创建:PollerMetadataSmartLifecycleSourcePollingChannelAdapterIntegrationFlow

@Bean
public MessageSource<Object> jdbcMessageSource() {
    return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}

@Bean
public IntegrationFlow pollingFlow() {
    return IntegrationFlows.from(jdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
            .transform(Transformers.toJson())
            .channel("furtherProcessChannel")
            .get();
}

对于那些不需要Message直接构建对象的情况,您可以使用IntegrationFlows.fromSupplier()基于java.util.function.Supplier. 的结果Supplier.get()自动包装在 a 中Message(如果它还不是 a Message)。

消息路由器

Spring Integration 原生提供了专门的路由器类型,包括:

  • HeaderValueRouter

  • PayloadTypeRouter

  • ExceptionTypeRouter

  • RecipientListRouter

  • XPathRouter

与许多其他 DSL IntegrationFlowBuilderEIP 方法一样,该route()方法可以应用任何AbstractMessageRouter实现,或者为方便起见,String作为 SpEL 表达式或ref-method对。此外,您可以route()使用 lambda 进行配置,并将 lambda 用于Consumer<RouterSpec<MethodInvokingRouter>>. fluent API 还提供了对AbstractMappingMessageRouter等选项channelMapping(String key, String channelName),如以下示例所示:

@Bean
public IntegrationFlow routeFlowByLambda() {
    return IntegrationFlows.from("routerInput")
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.suffix("Channel")
                            .channelMapping(true, "even")
                            .channelMapping(false, "odd")
            )
            .get();
}

以下示例显示了一个简单的基于表达式的路由器:

@Bean
public IntegrationFlow routeFlowByExpression() {
    return IntegrationFlows.from("routerInput")
            .route("headers['destChannel']")
            .get();
}

routeToRecipients()方法采用Consumer<RecipientListRouterSpec>,如以下示例所示:

@Bean
public IntegrationFlow recipientListFlow() {
    return IntegrationFlows.from("recipientListInput")
            .<String, String>transform(p -> p.replaceFirst("Payload", ""))
            .routeToRecipients(r -> r
                    .recipient("thing1-channel", "'thing1' == payload")
                    .recipientMessageSelector("thing2-channel", m ->
                            m.getHeaders().containsKey("recipient")
                                    && (boolean) m.getHeaders().get("recipient"))
                    .recipientFlow("'thing1' == payload or 'thing2' == payload or 'thing3' == payload",
                            f -> f.<String, String>transform(String::toUpperCase)
                                    .channel(c -> c.queue("recipientListSubFlow1Result")))
                    .recipientFlow((String p) -> p.startsWith("thing3"),
                            f -> f.transform("Hello "::concat)
                                    .channel(c -> c.queue("recipientListSubFlow2Result")))
                    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                    "thing3".equals(m.getPayload())),
                            f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
                    .defaultOutputToParentFlow())
            .get();
}

定义允许您将路由器设置.defaultOutputToParentFlow()为网关,以继续处理主流中不匹配的消息。.routeToRecipients()defaultOutput

分离器

要创建拆分器,请使用split()EIP 方法。默认情况下,如果有效负载是 an Iterable、 an Iterator、 an Array、 aStream或 reactive Publisher,则该split()方法将每个项目作为单独的消息输出。它接受 lambda、SpEL 表达式或任何AbstractMessageSplitter实现。或者,您可以使用它不带参数来提供DefaultMessageSplitter. 以下示例显示了如何split()通过提供 lambda 来使用该方法:

@Bean
public IntegrationFlow splitFlow() {
    return IntegrationFlows.from("splitInput")
              .split(s -> s.applySequence(false).delimiters(","))
              .channel(MessageChannels.executor(taskExecutor()))
              .get();
}

前面的示例创建了一个拆分器,用于拆分包含逗号分隔的消息String

聚合器和重排序器

AnAggregator在概念上与 a 相反Splitter。它将一系列单独的消息聚合成单个消息,并且必然更复杂。默认情况下,聚合器返回一条消息,其中包含来自传入消息的有效负载集合。相同的规则适用于Resequencer. 以下示例显示了拆分器-聚合器模式的规范示例:

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlows.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split()方法将列表拆分为单独的消息并将它们发送到ExecutorChannel. 该resequence()方法通过在消息头中找到的序列详细信息对消息进行重新排序。该aggregate()方法收集这些消息。

但是,您可以通过指定发布策略和关联策略等来更改默认行为。考虑以下示例:

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

前面的示例关联具有myCorrelationKey标头的消息,并在至少累积十个消息后释放这些消息。

resequence()为EIP 方法提供了类似的 lambda 配置。

服务激活器及其.handle()方法

EIP 方法的.handle()目标是调用MessageHandler某个 POJO 上的任何实现或任何方法。另一种选择是使用 lambda 表达式定义“活动”。因此,我们引入了一个通用的GenericHandler<P>功能接口。它的handle方法需要两个参数:P payloadMessageHeaders headers(从 5.1 版开始)。有了这个,我们可以定义一个流程如下:

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("flow3Input")
        .<Integer>handle((p, h) -> p * 2)
        .get();
}

前面的示例将它接收到的任何整数加倍。

然而,Spring Integration 的一个主要目标是loose coupling,通过从消息负载到消息处理程序的目标参数的运行时类型转换。由于 Java 不支持 lambda 类的泛型类型解析,我们payloadType为大多数 EIP 方法和LambdaMessageProcessor. 这样做会将硬转换工作委托给 Spring ConversionService,后者使用提供type的和请求的消息来定位方法参数。以下示例显示了结果IntegrationFlow可能的样子:

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
            .<byte[], String>transform(p - > new String(p, "UTF-8"))
            .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

我们还可以BytesToIntegerConverter在其中注册一些ConversionService以摆脱额外的.transform()

@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
   return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlows.from("input")
             .handle(Integer.class, (p, h) -> p * 2)
            .get();
}

运营商网关()

定义中的gateway()操作符IntegrationFlow是一个特殊的服务激活器实现,通过其输入通道调用其他端点或集成流并等待回复。<gateway>从技术上讲,它与定义中的嵌套组件具有相同的作用<chain>(请参阅从链中调用链),并允许流更清晰、更直接。从逻辑上讲,从业务角度来看,它是一个消息传递网关,允许在目标集成解决方案的不同部分之间分配和重用功能(请参阅消息传递网关)。这个操作符有几个针对不同目标的重载:

  • gateway(String requestChannel)按名称向某个端点的输入通道发送消息;

  • gateway(MessageChannel requestChannel)通过直接注入将消息发送到某个端点的输入通道;

  • gateway(IntegrationFlow flow)向提供的输入通道发送消息IntegrationFlow

所有这些都有一个带有第二个Consumer<GatewayEndpointSpec>参数的变体来配置目标GatewayMessageHandler和各自的AbstractEndpoint. 此外,IntegrationFlow基于 - 的方法允许调用现有IntegrationFlowbean 或通过功能接口的就地 lambda 将流声明为子流,或者以方法更清晰的代码样式IntegrationFlow将其提取:private

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlows
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流并不总是返回回复,则应将 设置requestTimeout为 0 以防止无限期挂起调用线程。在这种情况下,流程将在该点结束并释放线程以进行进一步的工作。

操作员日志()

为方便起见,为了通过 Spring 集成流 ( <logging-channel-adapter>) 记录消息旅程,提供了一个log()运算符。在内部,它由作为其订阅者的WireTap ChannelInterceptora表示。LoggingHandler它负责将传入消息记录到下一个端点或当前通道。下面的例子展示了如何使用LoggingHandler

.filter(...)
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(...)

在前面的示例中,仅在通过过滤器且在路由之前的消息的级别上id记录标头。ERRORtest.category

当这个操作符用在流的末尾时,它是一个单向处理程序,流结束。要将其作为产生回复的流程,您可以bridge()在 the 之后使用简单的,log()或者从 5.1 版开始,您可以使用logAndReply()运算符来代替。 logAndReply只能在流程结束时使用。

运算符拦截()

从 5.3 版本开始,intercept()操作员允许在流中ChannelInterceptor的当前注册一个或多个实例MessageChannelMessageChannel这是通过MessageChannelsAPI创建显式的替代方法。以下示例使用 aMessageSelectingInterceptor拒绝某些消息,但有异常:

.transform(...)
.intercept(new MessageSelectingInterceptor(m -> m.getPayload().isValid()))
.handle(...)

MessageChannelSpec.wireTap()

Spring Integration 包括一个.wireTap()流畅的 APIMessageChannelSpec构建器。以下示例显示了如何使用该wireTap方法记录输入:

@Bean
public QueueChannelSpec myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input");
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

如果MessageChannel是 的实例InterceptableChannel,则log()wireTap()intercept()运算符将应用于当前MessageChannel。否则,将中间体DirectChannel注入到当前配置的端点的流中。在下面的示例中,直接WireTap添加了拦截器myChannel,因为DirectChannelimplements InterceptableChannel

@Bean
MessageChannel myChannel() {
    return new DirectChannel();
}

...
    .channel(myChannel())
    .log()
}

当当前MessageChannel没有实现InterceptableChannel时,一个隐式的DirectChannelandBridgeHandler被注入到 中IntegrationFlow,并且WireTap被添加到这个 newDirectChannel中。以下示例没有任何通道声明:

.handle(...)
.log()
}

在前面的示例中(并且任何时候都没有声明任何通道),隐式DirectChannel注入到 的当前位置IntegrationFlow并用作当前配置的输出通道ServiceActivatingHandler(来自.handle()前面描述的)。

使用消息流

IntegrationFlowBuilder提供顶级 API 来生成连接到消息流的集成组件。当您的集成可以通过单个流程完成时(通常是这种情况),这很方便。或者IntegrationFlow,可以通过实例加入MessageChannel实例。

默认情况下,MessageFlow在 Spring Integration 用语中表现为“链”。也就是说,端点是由DirectChannel实例自动和隐式连接的。消息流实际上并未构建为链,它提供了更大的灵活性。例如,如果您知道它的inputChannel名称(也就是说,如果您明确定义它),您可以向流中的任何组件发送消息。您还可以在流中引用外部定义的通道,以允许使用通道适配器(以启用远程传输协议、文件 I/O 等),而不是直接通道。因此,DSL 不支持 Spring Integrationchain元素,因为在这种情况下它不会增加太多价值。

由于 Spring Integration Java DSL 生成与任何其他配置选项相同的 bean 定义模型,并且基于现有的 Spring Framework@Configuration基础设施,因此它可以与 XML 定义一起使用,并与 Spring Integration 消息传递注释配置连接。

您还可以IntegrationFlow使用 lambda 定义直接实例。以下示例显示了如何执行此操作:

@Bean
public IntegrationFlow lambdaFlow() {
    return f -> f.filter("World"::equals)
                   .transform("Hello "::concat)
                   .handle(System.out::println);
}

此定义的结果是与隐式直接通道连接的同一组集成组件。这里唯一的限制是这个流程是从一个命名的直接通道开始的 - lambdaFlow.input。此外,Lambda 流不能从MessageSource或开始MessageProducer

从 5.1 版本开始,这种类型IntegrationFlow被包装到代理以公开生命周期控制并提供inputChannel对内部关联的StandardIntegrationFlow.

从版本 5.0.6 开始,为组件生成的 bean 名称IntegrationFlow包括流 bean,后跟一个点 ( .) 作为前缀。例如,ConsumerEndpointFactoryBean前面.transform("Hello "::concat)示例中的 bean 名称为lambdaFlow.o.s.i.config.ConsumerEndpointFactoryBean#0。(theo.s.i是一个缩短的 fromorg.springframework.integration以适应页面。)该Transformer端点的实现 bean 的 bean 名称为(从 5.1 版开始),其中使用其组件类型lambdaFlow.transformer#0而不是类的完全限定名称。MethodInvokingTransformer相同的模式适用于所有NamedComponents 当必须在流中生成 bean 名称时。这些生成的 bean 名称前面带有流 ID,用于解析日志或在某些分析工具中将组件分组在一起,以及避免我们在运行时同时注册集成流时出现竞争条件。有关更多信息,请参阅动态和运行时集成流

FunctionExpression

我们引入了FunctionExpression类(SpELExpression接口的实现)让我们使用 lambda 和generics. 当存在来自 Core Spring Integration 的隐式变体时Function<T, R>,为 DSL 组件提供了该选项以及一个选项。以下示例显示了如何使用函数表达式:expressionStrategy

.enrich(e -> e.requestChannel("enrichChannel")
            .requestPayload(Message::getPayload)
            .propertyFunction("date", m -> new Date()))

FunctionExpression还支持运行时类型转换,如SpelExpression.

子流支持

一些组件if…​else提供publish-subscribe了通过使用子流来指定其逻辑或映射的能力。最简单的示例是.publishSubscribeChannel(),如以下示例所示:

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

您可以通过单独的IntegrationFlow @Bean定义获得相同的结果,但我们希望您发现逻辑组合的子流程样式很有用。我们发现它导致代码更短(因此更易读)。

从版本 5.3 开始,提供了一个BroadcastCapableChannel基于- 的publishSubscribeChannel()实现来在代理支持的消息通道上配置子流订阅者。例如,我们现在可以在以下位置配置多个订阅者作为子流Jms.publishSubscribeChannel()

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub")
                .get();
}

@Bean
public IntegrationFlow pubSubFlow() {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel(),
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

@Bean
public BroadcastCapableChannel jmsPublishSubscribeChannel(ConnectionFactory jmsConnectionFactory) {
    return (BroadcastCapableChannel) Jms.publishSubscribeChannel(jmsConnectionFactory)
            .destination("pubsub")
            .get();
}

类似publish-subscribe的子流程组合提供了该.routeToRecipients()方法。

另一个例子是在方法上使用.discardFlow()而不是。.discardChannel().filter()

.route()值得特别关注。考虑以下示例:

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping()继续像在常规Router映射中一样工作,但将该子.subFlowMapping()流绑定到主流。换句话说,任何路由器的子流在 之后返回主流.route()

有时,您需要IntegrationFlow @Bean.subFlowMapping(). 以下示例显示了如何执行此操作:

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


在这种情况下,当您需要接收来自这样一个子流的回复并继续主流时,这个IntegrationFlowbean 引用(或其输入通道)必须用 a 包装,.gateway()如前面的示例所示。oddFlow()前面示例中的引用未包装到.gateway(). 因此,我们不期望来自此路由分支的回复。否则,您最终会遇到类似于以下内容的异常:

引起:org.springframework.beans.factory.BeanCreationException:
    'currentComponent'([电子邮件保护] a51c)
    是单向的“MessageHandler”,不适合配置“outputChannel”。
    这是集成流程的结束。

当您将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,并且不需要网关。

子流可以嵌套到任何深度,但我们不建议这样做。事实上,即使在路由器的情况下,在流中添加复杂的子流也会很快开始看起来像一盘意大利面条,并且很难让人解析。

在 DSL 支持子流配置的情况下,当正在配置的组件通常需要通道并且该子流以channel()元素开始时,框架会隐式地bridge()在组件输出通道和流的输入通道之间放置 a。例如,在这个filter定义中:

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

框架在内部创建一个DirectChannelbean 用于注入MessageFilter.discardChannel. 然后它将子流包装到一个IntegrationFlow以此隐式通道开始的订阅中,并将 a放在流中指定的bridge前面。channel()当现有IntegrationFlowbean 用作子流引用(而不是内联子流,例如 lambda)时,不需要这样的桥接,因为框架可以解析来自流 bean 的第一个通道。对于内联子流,输入通道尚不可用。

使用协议适配器

到目前为止显示的所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型来支持消息传递架构。但是,我们还没有进行任何真正的整合。这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源或访问本地文件系统。Spring Integration 支持所有这些以及更多。理想情况下,DSL 应该为所有这些提供一流的支持,但是实现所有这些并跟上新的适配器添加到 Spring Integration 中是一项艰巨的任务。因此,人们期望 DSL 不断赶上 Spring Integration。

因此,我们提供了高级 API 来无缝定义特定协议的消息传递。我们使用工厂模式和构建器模式以及 lambdas 来做到这一点。您可以将工厂类视为“命名空间工厂”,因为它们与来自具体协议特定 Spring Integration 模块的组件的 XML 命名空间扮演相同的角色。目前,Spring Integration Java DSL 支持Amqp, Feed, Jms, Files, (S)Ftp, Http, JPA, MongoDb, TCP/UDP, Mail,WebFluxScripts命名空间工厂。以下示例显示了如何使用其中的三个(AmqpJmsMail):

@Bean
public IntegrationFlow amqpFlow() {
    return IntegrationFlows.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
            .transform("hello "::concat)
            .transform(String.class, String::toUpperCase)
            .get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return IntegrationFlows.from("jmsOutboundGatewayChannel")
            .handle(Jms.outboundGateway(this.jmsConnectionFactory)
                        .replyContainer(c ->
                                    c.concurrentConsumers(3)
                                            .sessionTransacted(true))
                        .requestDestination("jmsPipelineTest"))
            .get();
}

@Bean
public IntegrationFlow sendMailFlow() {
    return IntegrationFlows.from("sendMailChannel")
            .handle(Mail.outboundAdapter("localhost")
                            .port(smtpPort)
                            .credentials("user", "pw")
                            .protocol("smtp")
                            .javaMailProperties(p -> p.put("mail.debug", "true")),
                    e -> e.id("sendMailEndpoint"))
            .get();
}

前面的示例展示了如何使用“命名空间工厂”作为内联适配器声明。但是,您可以从@Bean定义中使用它们以使IntegrationFlow方法链更具可读性。

在我们为其他命名空间工厂付出努力之前,我们正在征求社区对这些命名空间工厂的反馈。我们也感谢任何关于我们接下来应该支持哪些适配器和网关的优先级的输入。

您可以在本参考手册中特定于协议的章节中找到更多 Java DSL 示例。

所有其他协议通道适配器都可以配置为通用 bean 并连接到IntegrationFlow,如以下示例所示:

@Bean
public QueueChannelSpec wrongMessagesChannel() {
    return MessageChannels
            .queue()
            .wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
    return IntegrationFlows.from("inputChannel")
            .filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
                    e -> e.discardChannel(wrongMessagesChannel))
            .log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
            .route(xpathRouter(wrongMessagesChannel))
            .get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
    XPathRouter router = new XPathRouter("local-name(/*)");
    router.setEvaluateAsString(true);
    router.setResolutionRequired(false);
    router.setDefaultOutputChannel(wrongMessagesChannel);
    router.setChannelMapping("Tags", "splittingChannel");
    router.setChannelMapping("Tag", "receivedChannel");
    return router;
}

IntegrationFlowAdapter

接口可以直接实现,IntegrationFlow指定为组件进行扫描,如下例所示:

@Component
public class MyFlow implements IntegrationFlow {

    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }

}

它由应用程序上下文拾取IntegrationFlowBeanPostProcessor并正确解析和注册。

为了方便并获得松散耦合架构的好处,我们提供了IntegrationFlowAdapter基类实现。它需要一个buildFlow()方法实现来IntegrationFlowDefinition使用其中一种from()方法生成一个,如以下示例所示:

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    private final AtomicBoolean invoked = new AtomicBoolean();

    public Date nextExecutionTime(TriggerContext triggerContext) {
          return this.invoked.getAndSet(true) ? null : new Date();
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(this::messageSource,
                      e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                 .split(this)
                 .transform(this)
                 .aggregate(a -> a.processor(this, null), null)
                 .enrichHeaders(Collections.singletonMap("thing1", "THING1"))
                 .filter(this)
                 .handle(this)
                 .channel(c -> c.queue("myFlowAdapterOutput"));
    }

    public String messageSource() {
         return "T,H,I,N,G,2";
    }

    @Splitter
    public String[] split(String payload) {
         return StringUtils.commaDelimitedListToStringArray(payload);
    }

    @Transformer
    public String transform(String payload) {
         return payload.toLowerCase();
    }

    @Aggregator
    public String aggregate(List<String> payloads) {
           return payloads.stream().collect(Collectors.joining());
    }

    @Filter
    public boolean filter(@Header Optional<String> thing1) {
            return thing1.isPresent();
    }

    @ServiceActivator
    public String handle(String payload, @Header String thing1) {
           return payload + ":" + thing1;
    }

}

动态和运行时集成流

IntegrationFlow并且它的所有依赖组件都可以在运行时注册。在 5.0 版本之前,我们使用了BeanFactory.registerSingleton()钩子。从 Spring Framework 开始5.0,我们使用instanceSupplierhook 进行编程BeanDefinition注册。以下示例显示了如何以编程方式注册 bean:

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

请注意,在前面的示例中,instanceSupplier钩子是genericBeanDefinition方法的最后一个参数,在这种情况下由 lambda 提供。

所有必要的 bean 初始化和生命周期都是自动完成的,就像使用标准上下文配置 bean 定义一样。

为了简化开发体验,Spring Integration 引入IntegrationFlowContext了在运行时注册和管理IntegrationFlow实例,如下例所示:

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

当我们有多个配置选项并且必须创建多个类似流程的实例时,这很有用。为此,我们可以迭代我们的选项并IntegrationFlow在循环中创建和注册实例。另一个变体是当我们的数据源不是基于 Spring 的时,我们必须动态创建它。这样的示例就是 Reactive Streams 事件源,如以下示例所示:

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlows.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

IntegrationFlowRegistrationBuilder作为 的结果IntegrationFlowContext.registration())可用于指定IntegrationFlow要注册的 bean 名称、控制其autoStartup和注册非 Spring 集成 bean。通常,这些额外的 bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和反序列化器,或任何其他所需的支持组件。

当您不再需要IntegrationFlowRegistration.destroy()动态注册的 bean 及其所有依赖的 bean 时,可以使用回调来删除它们。IntegrationFlow有关更多信息,请参阅IntegrationFlowContextJavadoc

从版本 5.0.6 开始,IntegrationFlow定义中所有生成的 bean 名称都以流 ID 作为前缀。我们建议始终指定显式流 ID。否则,将在 , 中启动同步屏障IntegrationFlowContext,以生成 bean 名称IntegrationFlow并注册其 bean。我们在这两个操作上同步以避免当相同的生成 bean 名称可能用于不同的IntegrationFlow实例时出现竞争条件。

此外,从版本 5.0.6 开始,注册构建器 API 有一个新方法:useFlowIdAsPrefix(). 如果您希望声明同一流的多个实例并在流中的组件具有相同 ID 时避免 bean 名称冲突,这将非常有用,如以下示例所示:

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

在这种情况下,第一个流的消息处理程序可以使用名称为 的 bean 来引用tcp1.client.handler

使用 E 时需要一个id属性useFlowIdAsPrefix()

IntegrationFlow作为网关

IntegrationFlow可以从提供组件的服务接口开始,如下GatewayProxyFactoryBean例所示:

public interface ControlBusGateway {

    void send(String command);
}

...

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from(ControlBusGateway.class)
            .controlBus()
            .get();
}

接口方法的所有代理都与通道一起提供,以将消息发送到IntegrationFlow. 您可以使用注解标记服务接口,并使用@MessagingGateway注解标记方法@Gateway。尽管如此,requestChannel. 中的下一个组件的内部通道被忽略并覆盖IntegrationFlow。否则,通过使用来创建这样的配置是IntegrationFlow没有意义的。

默认情况下 aGatewayProxyFactoryBean获取一个常规的 bean 名称,例如[FLOW_BEAN_NAME.gateway]. 您可以使用@MessagingGateway.name()属性或重载的IntegrationFlows.from(Class<?> serviceInterface, Consumer<GatewayProxySpec> endpointConfigurer)工厂方法更改该 ID。此外,接口上注释的所有属性@MessagingGateway都应用于目标GatewayProxyFactoryBean。当注释配置不适用时,该Consumer<GatewayProxySpec>变体可用于为目标代理提供适当的选项。此 DSL 方法从 5.2 版开始可用。

使用 Java 8,您甚至可以使用java.util.function接口创建集成网关,如以下示例所示:

@Bean
public IntegrationFlow errorRecovererFlow() {
    return IntegrationFlows.from(Function.class, (gateway) -> gateway.beanName("errorRecovererFunction"))
            .handle((GenericHandler<?>) (p, h) -> {
                throw new RuntimeException("intentional");
            }, e -> e.advice(retryAdvice()))
            .get();
}

errorRecovererFlow可以按如下方式使用:

@Autowired
@Qualifier("errorRecovererFunction")
private Function<String, String> errorRecovererFlowGateway;

DSL 扩展

从 5.3 版开始,IntegrationFlowExtension引入了 an 以允许使用自定义或组合的 EIP 运算符扩展现有的 Java DSL。所需要的只是这个类的扩展,它提供了可以在IntegrationFlowbean 定义中使用的方法。扩展类也可以用于自定义IntegrationComponentSpec配置;例如,可以在现有IntegrationComponentSpec扩展中实现错过或默认选项。下面的示例演示了复合自定义运算符和AggregatorSpec默认自定义扩展的用法outputProcessor

public class CustomIntegrationFlowDefinition
        extends IntegrationFlowExtension<CustomIntegrationFlowDefinition> {

    public CustomIntegrationFlowDefinition upperCaseAfterSplit() {
        return split()
                .transform("payload.toUpperCase()");
    }

    public CustomIntegrationFlowDefinition customAggregate(Consumer<CustomAggregatorSpec> aggregator) {
        return register(new CustomAggregatorSpec(), aggregator);
    }

}

public class CustomAggregatorSpec extends AggregatorSpec {

    CustomAggregatorSpec() {
        outputProcessor(group ->
                group.getMessages()
                        .stream()
                        .map(Message::getPayload)
                        .map(String.class::cast)
                        .collect(Collectors.joining(", ")));
    }

}

对于方法链流,这些扩展中的新 DSL 运算符必须返回扩展类。这样,目标IntegrationFlow定义将适用于新的和现有的 DSL 运营商:

@Bean
public IntegrationFlow customFlowDefinition() {
    return
            new CustomIntegrationFlowDefinition()
                    .log()
                    .upperCaseAfterSplit()
                    .channel("innerChannel")
                    .customAggregate(customAggregatorSpec ->
                            customAggregatorSpec.expireGroupsUponCompletion(true))
                    .logAndReply();
}

集成流组合

由于MessageChannel抽象是 Spring Integration 中的一等公民,因此始终假定集成流的组合。流中任何端点的输入通道可用于从任何其他端点发送消息,而不仅仅是从具有此通道作为输出的端点发送消息。此外,有了@MessagingGateway契约、Content Enricher 组件、像 a 这样的复合端点<chain>,现在有了IntegrationFlowbean(例如IntegrationFlowAdapter),在更短的、可重用的部分之间分配业务逻辑就足够简单了。最终组合所需的只是关于MessageChannel发送或接收的知识。

从 version 开始5.5.4,为了从最终用户中抽象出更多MessageChannel并隐藏实现细节,IntegrationFlows引入了from(IntegrationFlow)工厂方法以允许从IntegrationFlow现有流的输出开始当前:

@Bean
IntegrationFlow templateSourceFlow() {
    return IntegrationFlows.fromSupplier(() -> "test data")
            .channel("sourceChannel")
            .get();
}

@Bean
IntegrationFlow compositionMainFlow(IntegrationFlow templateSourceFlow) {
    return IntegrationFlows.from(templateSourceFlow)
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("compositionMainFlowResult"))
            .get();
}

另一方面,IntegrationFlowDefinition添加了一个to(IntegrationFlow)终端操作符以在其他一些流的输入通道继续当前流:

@Bean
IntegrationFlow mainFlow(IntegrationFlow otherFlow) {
    return f -> f
            .<String, String>transform(String::toUpperCase)
            .to(otherFlow);
}

@Bean
IntegrationFlow otherFlow() {
    return f -> f
            .<String, String>transform(p -> p + " from other flow")
            .channel(c -> c.queue("otherFlowResultChannel"));
}

流程中间的组合可以通过现有的gateway(IntegrationFlow)EIP 方法轻松实现。通过这种方式,我们可以通过由更简单、可重用的逻辑块组合它们来构建具有任何复杂性的流。例如,您可以添加一个IntegrationFlowbean 库作为依赖项,将它们的配置类导入最终项目并为您的IntegrationFlow定义自动装配就足够了。


1. see XML Configuration