Spring Integration 提供了 Spring 编程模型的扩展,以支持众所周知的企业集成模式。它支持基于 Spring 的应用程序中的轻量级消息传递,并支持通过声明性适配器与外部系统集成。与 Spring 对远程处理、消息传递和调度的支持相比,这些适配器提供了更高级别的抽象。

Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点的分离,这对于生成可维护、可测试的代码至关重要。

Spring 集成概述

本章对 Spring Integration 的核心概念和组件进行了高级介绍。它包括一些编程技巧,可帮助您充分利用 Spring Integration。

背景

Spring 框架的关键主题之一是控制反转 (IoC)。从最广泛的意义上讲,这意味着框架代表在其上下文中管理的组件处理职责。组件本身被简化了,因为它们免除了这些责任。例如,依赖注入减轻了组件定位或创建它们的依赖的责任。同样,面向方面的编程通过将业务组件模块化为可重用方面来减轻通用横切关注点的业务组件。在每种情况下,最终结果都是一个更易于测试、理解、维护和扩展的系统。

此外,Spring 框架和产品组合为构建企业应用程序提供了一个全面的编程模型。开发人员受益于该模型的一致性,尤其是它基于完善的最佳实践这一事实,例如对接口进行编程和支持组合而不是继承。Spring 的简化抽象和强大的支持库提高了开发人员的生产力,同时提高了可测试性和可移植性的水平。

Spring Integration 的动机是这些相同的目标和原则。它将 Spring 编程模型扩展到消息传递领域,并建立在 Spring 现有的企业集成支持之上,以提供更高级别的抽象。它支持消息驱动的架构,其中控制反转适用于运行时问题,例如某些业务逻辑应该何时运行以及响应应该发送到哪里。它支持消息的路由和转换,以便可以在不影响可测试性的情况下集成不同的传输和不同的数据格式。换句话说,消息传递和集成问题由框架处理。业务组件与基础设施进一步隔离,开发人员无需承担复杂的集成责任。

作为 Spring 编程模型的扩展,Spring Integration 提供了多种配置选项,包括注解、支持命名空间的 XML、带有通用“bean”元素的 XML,以及直接使用底层 API。该 API 基于明确定义的策略接口和非侵入式委托适配器。Spring Integration 的设计灵感来自于对 Spring 中的常见模式与企业集成模式中描述的众所周知的模式之间的强烈相似性,Gregor Hohpe 和 Bobby Woolf (Addison Wesley, 2004)。阅读过该书的开发人员应该会立即熟悉 Spring Integration 的概念和术语。

目标和原则

Spring Integration 的动机如下:

  • 为实现复杂的企业集成解决方案提供一个简单的模型。

  • 在基于 Spring 的应用程序中促进异步、消息驱动的行为。

  • 促进现有 Spring 用户的直观、增量采用。

Spring Integration 遵循以下原则:

  • 组件应该松散耦合以实现模块化和可测试性。

  • 该框架应该强制分离业务逻辑和集成逻辑之间的关注点。

  • 扩展点本质上应该是抽象的(但在明确定义的边界内),以促进重用和可移植性。

主要成分

从垂直角度来看,分层架构有助于关注点分离,层之间基于接口的契约促进松散耦合。基于 Spring 的应用程序通常以这种方式设计,并且 Spring 框架和产品组合为遵循这种最佳实践以实现企业应用程序的完整堆栈提供了坚实的基础。消息驱动的架构增加了水平视角,但这些相同的目标仍然相关。正如“分层架构”是一种极其通用和抽象的范式一样,消息传递系统通常遵循类似抽象的“管道和过滤器”模型。“过滤器”代表任何能够产生或使用消息的组件,“管道”在过滤器之间传输消息,以便组件本身保持松散耦合。需要注意的是,这两个高级范式并不相互排斥。支持“管道”的底层消息传递基础设施仍应封装在其合约被定义为接口的层中。同样,“过滤器”本身应该在逻辑上高于应用程序服务层的层内进行管理,通过接口与这些服务交互,其方式与 Web 层的方式非常相似。

信息

在 Spring Integration 中,消息是任何 Java 对象的通用包装器,并结合了框架在处理该对象时使用的元数据。它由有效负载和标头组成。有效负载可以是任何类型,并且标头包含通常需要的信息,例如 ID、时间戳、相关 ID 和返回地址。标头也用于在连接的传输中传递值。例如,当从接收到的文件创建消息时,文件名可以存储在标题中以供下游组件访问。同样,如果消息的内容最终将由出站邮件适配器发送,则上游组件可以将各种属性(to、from、cc、subject 等)配置为消息头值。开发人员还可以在标头中存储任意键值对。

信息
图 1. 消息

留言频道

消息通道代表管道和过滤器架构的“管道”。生产者向通道发送消息,消费者从通道接收消息。因此,消息通道将消息传递组件解耦,并且还为拦截和监视消息提供了便利点。

留言频道
图 2. 消息通道

消息通道可以遵循点对点或发布-订阅语义。使用点对点通道,不能超过一个消费者可以接收发送到通道的每条消息。另一方面,发布-订阅频道尝试将每条消息广播给频道上的所有订阅者。Spring Integration 支持这两种模型。

尽管“点对点”和“发布-订阅”定义了最终接收每条消息的消费者数量的两个选项,但还有另一个重要的考虑因素:通道是否应该缓冲消息?在 Spring Integration 中,可轮询通道能够缓冲队列中的消息。缓冲的优点是它允许限制入站消息,从而防止消费者过载。然而,顾名思义,这也增加了一些复杂性,因为如果配置了轮询器,消费者只能从这样的通道接收消息。另一方面,连接到可订阅频道的消费者只是消息驱动的。 Message Channel Implementations详细讨论了 Spring Integration 中可用的各种通道实现。

消息端点

Spring Integration 的主要目标之一是通过控制反转来简化企业集成解决方案的开发。这意味着您不必直接实现消费者和生产者,甚至不必构建消息并在消息通道上调用发送或接收操作。相反,您应该能够通过基于普通对象的实现专注于您的特定领域模型。然后,通过提供声明性配置,您可以将特定于域的代码“连接”到 Spring Integration 提供的消息传递基础设施。负责这些连接的组件是消息端点。这并不意味着您必须直接连接现有的应用程序代码。任何现实世界的企业集成解决方案都需要一些专注于集成问题的代码,例如路由和转换。重要的是实现集成逻辑和业务逻辑之间的关注点分离。换句话说,与 Web 应用程序的模型-视图-控制器 (MVC) 范例一样,目标应该是提供一个薄但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节概述了处理这些职责的消息端点类型,并且在接下来的章节中,您可以看到 Spring Integration 的声明性配置选项如何提供一种非侵入性的方式来使用其中的每一个。重要的是实现集成逻辑和业务逻辑之间的关注点分离。换句话说,与 Web 应用程序的模型-视图-控制器 (MVC) 范例一样,目标应该是提供一个薄但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节概述了处理这些职责的消息端点类型,并且在接下来的章节中,您可以看到 Spring Integration 的声明性配置选项如何提供一种非侵入性的方式来使用其中的每一个。重要的是实现集成逻辑和业务逻辑之间的关注点分离。换句话说,与 Web 应用程序的模型-视图-控制器 (MVC) 范例一样,目标应该是提供一个薄但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节概述了处理这些职责的消息端点类型,并且在接下来的章节中,您可以看到 Spring Integration 的声明性配置选项如何提供一种非侵入性的方式来使用其中的每一个。目标应该是提供一个精简但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节概述了处理这些职责的消息端点类型,并且在接下来的章节中,您可以看到 Spring Integration 的声明性配置选项如何提供一种非侵入性的方式来使用其中的每一个。目标应该是提供一个精简但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节概述了处理这些职责的消息端点类型,并且在接下来的章节中,您可以看到 Spring Integration 的声明性配置选项如何提供一种非侵入性的方式来使用其中的每一个。

消息端点

消息端点代表管道和过滤器架构的“过滤器”。如前所述,端点的主要作用是将应用程序代码连接到消息传递框架,并以非侵入方式进行。换句话说,理想情况下,应用程序代码应该不知道消息对象或消息通道。这类似于 MVC 范例中控制器的角色。正如控制器处理 HTTP 请求一样,消息端点也处理消息。正如控制器映射到 URL 模式一样,消息端点也映射到消息通道。这两种情况的目标是相同的:将应用程序代码与基础设施隔离开来。企业集成模式中详细讨论了这些概念和所有模式书。在这里,我们仅提供 Spring Integration 支持的主要端点类型以及与这些类型关联的角色的高级描述。随后的章节详细说明并提供示例代码和配置示例。

消息转换器

消息转换器负责转换消息的内容或结构并返回修改后的消息。可能最常见的转换器类型是将消息的有效负载从一种格式转换为另一种格式(例如从 XML 转换为java.lang.String. 同样,转换器可以添加、删除或修改消息的标头值。

消息过滤器

消息过滤器确定是否应该将消息传递到输出通道。这仅需要一个布尔测试方法,该方法可以检查特定的有效负载内容类型、属性值、标头的存在或其他条件。如果消息被接受,则将其发送到输出通道。如果不是,它会被丢弃(或者,对于更严格的实现,Exception可能会被抛出)。消息过滤器通常与发布-订阅通道一起使用,其中多个消费者可能会收到相同的消息并使用过滤器的标准来缩小要处理的消息集。

注意不要将管道和过滤器架构模式中“过滤器”的通用使用与这种选择性地缩小两个通道之间的消息流动范围的特定端点类型混淆。“过滤器”的管道和过滤器概念与 Spring Integration 的消息端点更匹配:任何可以连接到消息通道以发送或接收消息的组件。

消息路由器

消息路由器负责决定接下来应该接收消息的通道或通道(如果有)。通常,该决定基于消息的内容或消息标头中可用的元数据。消息路由器通常用作服务激活器或其他能够发送回复消息的端点上静态配置的输出通道的动态替代方案。同样,消息路由器为多个订阅者使用的被动消息过滤器提供了一种主动替代方案,如前所述。

路由器
图 3. 消息路由器

分路器

拆分器是另一种类型的消息端点,其职责是接受来自其输入通道的消息,将该消息拆分为多个消息,并将每个消息发送到其输出通道。这通常用于将“复合”有效负载对象划分为一组包含细分有效负载的消息。

聚合器

基本上是拆分器的镜像,聚合器是一种消息端点,它接收多条消息并将它们组合成一条消息。事实上,聚合器通常是包含拆分器的管道中的下游消费者。从技术上讲,聚合器比拆分器更复杂,因为它需要维护状态(要聚合的消息),决定何时完整的消息组可用,并在必要时超时。此外,在超时的情况下,聚合器需要知道是发送部分结果、丢弃它们还是将它们发送到单独的通道。Spring Integration为超时、是否在超时时发送部分结果以及丢弃通道提供了 a CorrelationStrategy、 a和可配置设置。ReleaseStrategy

服务激活器

服务激活器是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法能够返回值,也可以提供输出消息通道。

输出通道是可选的,因为每条消息还可以提供自己的“返回地址”标头。同样的规则适用于所有消费者端点。

服务激活器调用对某个服务对象的操作来处理请求消息,提取请求消息的有效负载并进行转换(如果该方法不期望消息类型的参数)。每当服务对象的方法返回一个值时,该返回值同样会在必要时转换为回复消息(如果它还不是消息类型)。该回复消息被发送到输出通道。如果没有配置输出通道,则将回复发送到消息的“返回地址”中指定的通道(如果可用)。

请求-回复服务激活器端点将目标对象的方法连接到输入和输出消息通道。

处理程序端点
图 4. 服务激活器
如前所述,在Message Channel中,通道可以是可轮询或可订阅的。在上图中,这由“时钟”符号、实线箭头(轮询)和虚线箭头(订阅)来描述。

通道适配器

通道适配器是将消息通道连接到其他系统或传输的端点。通道适配器可以是入站的,也可以是出站的。通常,通道适配器在消息与从其他系统接收或发送到其他系统(文件、HTTP 请求、JMS 消息等)的任何对象或资源之间进行一些映射。根据传输方式,通道适配器还可以填充或提取消息头值。Spring Integration 提供了许多通道适配器,这些将在接下来的章节中介绍。

源端点
图 5. 入站通道适配器端点将源系统连接到MessageChannel.
消息源可以是可轮询的(例如,POP3)或消息驱动的(例如,IMAP Idle)。在上图中,这由“时钟”符号和实线箭头(轮询)和虚线箭头(消息驱动)来描述。
目标端点
图 6. 出站通道适配器端点将 a 连接MessageChannel到目标系统。
正如前面在消息通道中所讨论的,通道可以是可轮询或可订阅的。在上图中,这由“时钟”符号、实线箭头(轮询)和虚线箭头(订阅)来描述。

端点 Bean 名称

消费端点(带有 的任何东西inputChannel)由两个 bean 组成,消费者和消息处理程序。消费者拥有对消息处理程序的引用,并在消息到达时调用它。

考虑以下 XML 示例:

<int:service-activator id = "someService" ... />

给定前面的示例,bean 名称如下:

  • 消费者someService:(id

  • 处理程序:someService.handler

使用企业集成模式 (EIP) 注释时,名称取决于几个因素。考虑以下带注释的 POJO 示例:

@Component
public class SomeComponent {

    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

给定前面的示例,bean 名称如下:

  • 消费者:someComponent.someMethod.serviceActivator

  • 处理程序:someComponent.someMethod.serviceActivator.handler

从版本 5.0.4 开始,您可以使用@EndpointId注解修改这些名称,如以下示例所示:

@Component
public class SomeComponent {

    @EndpointId("someService")
    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

给定前面的示例,bean 名称如下:

  • 消费者:someService

  • 处理程序:someService.handler

@EndpointId创建由id具有 XML 配置的属性创建的名称。考虑以下带注释的 bean 示例:

@Configuration
public class SomeConfiguration {

    @Bean
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}

给定前面的示例,bean 名称如下:

  • 消费者:someConfiguration.someHandler.serviceActivator

  • 处理人:(someHandler姓名@Bean

从版本 5.0.4 开始,您可以使用@EndpointId注解修改这些名称,如以下示例所示:

@Configuration
public class SomeConfiguration {

    @Bean("someService.handler")             (1)
    @EndpointId("someService")               (2)
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}
1 处理程序:someService.handler(bean名称)
2 消费者:(someService端点 ID)

只要您使用附加到名称的约定,@EndpointId注释就会创建由具有 XML 配置的属性创建的名称。id.handler@Bean

创建第三个 bean 有一种特殊情况:出于架构原因,如果 aMessageHandler @Bean未定义 an AbstractReplyProducingMessageHandler,则框架将提供的 bean 包装在 a 中ReplyProducingMessageHandlerWrapper。此包装器支持请求处理程序建议处理并发出正常的“未产生回复”调试日志消息。它的 bean 名称是处理程序 bean 名称加上.wrapper(如果有@EndpointId - 否则,它是正常生成的处理程序名称)。

类似地,可轮询消息源创建两个 bean,一个SourcePollingChannelAdapter(SPCA) 和一个MessageSource.

考虑以下 XML 配置:

<int:inbound-channel-adapter id = "someAdapter" ... />

给定前面的 XML 配置,bean 名称如下:

  • SPCA someAdapter:(id

  • 处理程序:someAdapter.source

考虑 POJO 的以下 Java 配置来定义@EndpointId

@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
    ...
}

给定前面的 Java 配置示例,bean 名称如下:

  • SPCA:someAdapter

  • 处理程序:someAdapter.source

考虑以下 bean 的 Java 配置来定义@EndpointID

@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
    return () -> {
        ...
    };
}

给定前面的示例,bean 名称如下:

  • SPCA:someAdapter

  • 处理程序:(someAdapter.source只要您使用附加.source@Bean名称的约定)

配置和@EnableIntegration

在整个文档中,您可以看到对 XML 命名空间支持的引用,以在 Spring 集成流中声明元素。这种支持由一系列命名空间解析器提供,这些解析器生成适当的 bean 定义以实现特定组件。例如,许多端点由一个MessageHandlerbean 和一个ConsumerEndpointFactoryBean注入处理程序和输入通道名称的 bean 组成。

第一次遇到 Spring Integration 命名空间元素时,框架会自动声明一些用于支持运行时环境的 bean(任务调度程序、隐式通道创建器等)。

4.0 版引入了@EnableIntegration注解,以允许注册 Spring Integration 基础设施 bean(请参阅Javadoc)。当仅使用 Java 配置时,此注解是必需的——例如使用 Spring Boot 或 Spring Integration Messaging Annotation 支持和没有 XML 集成配置的 Spring Integration Java DSL。

当您有一个没有 Spring Integration 组件的父上下文和两个或多个使用 Spring Integration 的子上下文时,该@EnableIntegration注释也很有用。它只允许在父上下文中声明这些公共组件一次。

注释将@EnableIntegration许多基础设施组件注册到应用程序上下文中。特别是,它:

  • 为轮询器、SpEL 函数等注册一些内置 bean,例如和errorChannelits 。LoggingHandlertaskSchedulerjsonPath

  • 添加多个BeanFactoryPostProcessor实例以增强BeanFactory全局和默认集成环境。

  • 添加几个BeanPostProcessor实例以增强或转换和包装特定 bean 以用于集成目的。

  • 添加注解处理器来解析消息注解并在应用程序上下文中为它们注册组件。

@IntegrationComponentScan注释还允许类路径扫描。该注解与标准 Spring Framework 注解的作用类似@ComponentScan,但仅限于 Spring Integration 特有的组件和注解,这是标准 Spring Framework 组件扫描机制无法达到的。有关示例,请参阅@MessagingGateway注解

@EnablePublisher注解注册一个PublisherAnnotationBeanPostProcessorbean 并default-publisher-channel为那些@Publisher没有提供channel属性的注解配置。如果找到多个@EnablePublisher注释,则它们必须都具有相同的默认通道值。有关详细信息,请参阅使用注释的注释驱动配置@Publisher

引入了@GlobalChannelInterceptor注解来标记ChannelInterceptorbean 以进行全局通道拦截。此注释是<int:channel-interceptor>XML 元素的类似物(请参阅Global Channel Interceptor Configuration)。 @GlobalChannelInterceptor注释可以放置在类级别(带有@Component原型注释)或类中的@Bean方法@Configuration。在任何一种情况下,bean 都必须实现ChannelInterceptor.

从 5.1 版开始,全局通道拦截器适用于动态注册的通道——例如使用 Java DSL 时使用beanFactory.initializeBean()或通过初始化的 bean。IntegrationFlowContext以前,在刷新应用程序上下文后创建 bean 时不应用拦截器。

注释将、或bean@IntegrationConverter标记为. 此注释是XML 元素的类似物(请参阅Payload Type Conversion)。您可以在类级别(使用构造型注释)或类中的方法上放置注释。ConverterGenericConverterConverterFactoryintegrationConversionService<int:converter>@IntegrationConverter@Component@Bean@Configuration

有关消息注释的更多信息,请参阅注释支持

编程注意事项

您应该尽可能使用普通的旧 Java 对象 (POJO),并且仅在绝对必要时才在代码中公开框架。有关详细信息,请参阅POJO 方法调用

如果您确实将框架暴露给您的类,则需要考虑一些注意事项,尤其是在应用程序启动期间:

  • 如果你的组件是ApplicationContextAware,你一般不应该ApplicationContextsetApplicationContext()方法中使用 。相反,存储一个引用并将此类​​使用推迟到上下文生命周期的后期。

  • 如果您的组件是一个InitializingBean或使用@PostConstruct方法,请不要从这些初始化方法发送任何消息。调用这些方法时,应用程序上下文尚未初始化,发送此类消息很可能会失败。如果您需要在启动期间发送消息,请执行ApplicationListener并等待ContextRefreshedEvent. 或者,实施SmartLifecycle,将您的 bean 置于后期阶段,并从该start()方法发送消息。

使用打包(例如,阴影)jar 时的注意事项

Spring Integration 通过使用 Spring Framework 的SpringFactories机制加载多个IntegrationConfigurationInitializer类来引导某些功能。这包括-corejar 以及某些其他的,包括-http-jmx. 此过程的信息存储META-INF/spring.factories在每个 jar 中的一个文件中。

一些开发人员更喜欢使用众所周知的工具(例如Apache Maven Shade 插件)将他们的应用程序和所有依赖项重新打包到一个 jar 中。

spring.factories默认情况下,shade 插件在生成 shaded jar 时不会合并文件。

除了 之外spring.factories,其他META-INF文件 (spring.handlersspring.schemas) 用于 XML 配置。这些文件也需要合并。

Spring Boot 的可执行 jar 机制采用了不同的方法,因为它嵌套了 jar,从而将每个spring.factories文件保留在类路径中。因此,对于 Spring Boot 应用程序,如果您使用其默认的可执行 jar 格式,则无需再做任何事情。

即使您不使用 Spring Boot,您仍然可以使用 Boot 提供的工具通过为上述文件添加转换器来增强 shade 插件。以下示例显示了如何配置插件:

示例 1. pom.xml
...
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <configuration>
                <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <dependencies>
                <dependency> (1)
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                </dependency>
            </dependencies>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers> (2)
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                            </transformer>
                            <transformer
                                implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
...

具体来说,

1 添加spring-boot-maven-plugin作为依赖项。
2 配置变压器。

您可以为显式版本添加属性${spring.boot.version}或使用显式版本。

编程技巧和窍门

本节记录了从 Spring Integration 中获得最大收益的一些方法。

XML 模式

使用 XML 配置时,为避免出现错误的模式验证错误,您应该使用“Spring-aware”IDE,例如 Spring Tool Suite (STS)、带有 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。这些 IDE 知道如何从类路径解析正确的 XML 模式(通过使用META-INF/spring.schemasjar 中的文件)。当使用带有插件的 STS 或 Eclipse 时,您必须Spring Project Nature在项目上启用。

出于兼容性原因,某些遗留模块(存在于版本 1.0 中的模块)托管在 Internet 上的模式是 1.0 版本。如果您的 IDE 使用这些模式,您可能会看到错误的错误。

这些在线模式中的每一个都有类似于以下内容的警告:

此模式适用于 Spring Integration Core 的 1.0 版本。我们无法将其更新为当前架构,因为这会破坏任何使用 1.0.3 或更低版本的应用程序。对于后续版本,未版本化的模式是从类路径中解析并从 jar 中获取的。请参考github:

受影响的模块是

  • core( spring-integration.xsd)

  • file

  • http

  • jms

  • mail

  • rmi

  • security

  • stream

  • ws

  • xml

查找 Java 和 DSL 配置的类名

通过 XML 配置和 Spring Integration Namespace 支持,XML 解析器隐藏了目标 bean 的声明和连接方式。对于 Java 配置,了解目标最终用户应用程序的框架 API 很重要。

EIP 实现的一等公民是MessageChannelEndpoint(参见本章前面的主要组件)。它们的实现(合同)是:

  • org.springframework.messaging.Message:见留言

  • org.springframework.messaging.MessageChannel:见消息渠道

  • org.springframework.integration.endpoint.AbstractEndpoint: 见轮询器

前两个足够简单,可以理解如何实现、配置和使用。最后一个更值得关注

AbstractEndpoint整个 Spring 框架中广泛用于不同的组件实现。其主要实现有:

  • EventDrivenConsumer, 当我们订阅 aSubscribableChannel来监听消息时使用。

  • PollingConsumer,当我们轮询来自 的消息时使用PollableChannel

当您使用消息注释或 Java DSL 时,您无需担心这些组件,因为框架会自动生成它们并带有适当的注释和BeanPostProcessor实现。手动构建组件时,您应该根据提供的属性使用ConsumerEndpointFactoryBean帮助确定AbstractEndpoint要创建的目标使用者实现。inputChannel

另一方面,ConsumerEndpointFactoryBean代表框架中的另一个一等公民 - org.springframework.messaging.MessageHandler. 该接口实现的目标是处理端点从通道消费的消息。Spring Integration 中的所有 EIP 组件都是MessageHandler实现(例如,AggregatingMessageHandlerMessageTransformingHandlerAbstractMessageSplitter等)。目标协议出站适配器(FileWritingMessageHandlerHttpRequestExecutingMessageHandlerAbstractMqttMessageHandler等)也是MessageHandler实现。MessageHandler当您使用 Java 配置开发 Spring Integration 应用程序时,您应该查看 Spring Integration 模块以找到用于配置的适当实现@ServiceActivator。例如,要发送 XMPP 消息(请参阅XMPP 支持) 您应该配置如下内容:

@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
    ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);

    DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
    xmppHeaderMapper.setRequestHeaderNames("*");
    handler.setHeaderMapper(xmppHeaderMapper);

    return handler;
}

这些MessageHandler实现代表消息流的出站和处理部分。

入站消息流侧有自己的组件,分为轮询和监听行为。监听(消息驱动)组件很简单,通常只需要一个目标类实现即可准备好生成消息。监听组件可以是单向MessageProducerSupport实现(例如AbstractMqttMessageDrivenChannelAdapterand ImapIdleChannelAdapter)或请求-回复MessagingGatewaySupport实现(例如AmqpInboundGatewayand AbstractWebServiceInboundGateway)。

轮询入站端点适用于那些不提供侦听器 API 或不适用于此类行为的协议,包括任何基于文件的协议(如 FTP)、任何数据库(RDBMS 或 NoSQL)等。

这些入站端点由两个组件组成:轮询器配置,用于定期启动轮询任务,以及一个消息源类,用于从目标协议读取数据并为下游集成流生成消息。轮询器配置的第一类是SourcePollingChannelAdapter. 它是另一种AbstractEndpoint实现,但特别是用于轮询以启动集成流程。通常,使用消息注释或 Java DSL,您不必担心此类。框架根据@InboundChannelAdapter配置或 Java DSL 构建器规范为其生成一个 bean。

消息源组件对于目标应用程序的开发更为重要,它们都实现了MessageSource接口(例如,MongoDbMessageSourceAbstractTwitterMessageSource)。考虑到这一点,我们使用 JDBC 从 RDBMS 表中读取数据的配置可能类似于以下内容:

@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
    return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}

您可以在特定的 Spring Integration 模块中找到目标协议所需的所有入站和出站类(在大多数情况下,在相应的包中)。例如,spring-integration-websocket适配器是:

  • o.s.i.websocket.inbound.WebSocketInboundChannelAdapter:实现MessageProducerSupport侦听套接字上的帧并向通道生成消息。

  • o.s.i.websocket.outbound.WebSocketOutboundMessageHandler:将传入消息转换为适当的帧并通过 websocket 发送的单向AbstractMessageHandler实现。

如果您熟悉 Spring Integration XML 配置,从 4.3 版开始,我们会在 XSD 元素定义中提供有关哪些目标类用于为适配器或网关声明 bean 的信息,如以下示例所示:

<xsd:element name="outbound-async-gateway">
    <xsd:annotation>
		<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
       </xsd:documentation>
	</xsd:annotation>

POJO 方法调用

编程注意事项中所述,我们建议使用 POJO 编程风格,如以下示例所示:

@ServiceActivator
public String myService(String payload) { ... }

在这种情况下,框架提取String负载,调用您的方法,并将结果包装在消息中以发送到流中的下一个组件(原始标头被复制到新消息中)。事实上,如果你使用 XML 配置,你甚至不需要@ServiceActivator注解,如以下配对示例所示:

<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }

method只要类的公共方法没有歧义,就可以省略该属性。

您还可以在 POJO 方法中获取标头信息,如以下示例所示:

@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }

您还可以取消引用消息的属性,如以下示例所示:

@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }

Because various POJO method invocations are available, versions prior to 5.0 used SpEL (Spring Expression Language) to invoke the POJO methods. SpEL (even interpreted) is usually “fast enough” for these operations, when compared to the actual work usually done in the methods. However, starting with version 5.0, the org.springframework.messaging.handler.invocation.InvocableHandlerMethod is used by default whenever possible. This technique is usually faster to execute than interpreted SpEL and is consistent with other Spring messaging projects. The InvocableHandlerMethod is similar to the technique used to invoke controller methods in Spring MVC. There are certain methods that are still always invoked when using SpEL. Examples include annotated parameters with dereferenced properties, as discussed earlier. This is because SpEL has the capability to navigate a property path.

可能还有一些我们没有考虑过的极端情况也不适用于InvocableHandlerMethod实例。出于这个原因,在这些情况下,我们会自动回退到使用 SpEL。

如果您愿意,您还可以设置您的 POJO 方法,使其始终使用带有UseSpelInvoker注释的 SpEL,如以下示例所示:

@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }

如果compilerMode省略该属性,则spring.expression.compiler.mode系统属性确定编译器模式。有关已编译 SpEL的更多信息,请参阅SpEL 编译。


1. see XML Configuration