交易支持

本章介绍 Spring Integration 对事务的支持。它涵盖以下主题:

了解消息流中的事务

Spring Integration 公开了几个钩子来解决消息流的事务需求。为了更好地理解这些钩子以及如何从中受益,我们必须首先重新审视可用于启动消息流的六种机制,并了解如何在每种机制中满足这些流的事务需求。

以下六种机制启动消息流(本手册中提供了每种机制的详细信息):

  • 网关代理:一个基本的消息传递网关。

  • 消息通道:与方法的直接交互MessageChannel(例如,channel.send(message))。

  • 消息发布者:作为 Spring bean 方法调用的副产品启动消息流的方式。

  • 入站通道适配器和网关:基于连接第三方系统与 Spring Integration 消息系统(例如,[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel)发起消息流的方式。

  • 调度器:基于预先配置的调度器分发的调度事件来启动消息流的方式。

  • Poller:与调度器类似,这是基于调度或由预先配置的轮询器分发的基于间隔的事件来启动消息流的方式。

我们可以将这六种机制分为两大类:

  • 由用户进程发起的消息流:此类别中的示例场景将调用网关方法或显式发送 aMessageMessageChannel. 换句话说,这些消息流依赖于第三方进程(比如你写的一些代码)来启动。

  • 由守护进程启动的消息流:此类别中的示例场景包括轮询消息队列以使用轮询消息启动新消息流或调度程序通过创建新消息并在预定义时间启动消息流来调度进程.

显然网关代理,MessageChannel.send(…​)并且MessagePublisher都属于第一类,而入站适配器和网关、调度器和轮询器属于第二类。

那么,如何解决每个类别中各种场景中的事务需求,Spring Integration 是否需要为特定场景的事务提供明确的内容?或者您可以改用 Spring 的事务支持吗?

Spring 本身为事务管理提供了一流的支持。因此,我们的目标不是提供新的东西,而是使用 Spring 从其现有的事务支持中受益。换句话说,作为一个框架,我们必须向 Spring 的事务管理功能公开钩子。然而,由于 Spring Integration 配置是基于 Spring 配置的,我们不需要总是暴露这些钩子,因为 Spring 已经暴露了它们。毕竟,每个 Spring Integration 组件都是一个 Spring Bean。

考虑到这个目标,我们可以再次考虑两种情况:由用户进程发起的消息流和由守护进程发起的消息流。

由用户进程启动并在 Spring 应用程序上下文中配置的消息流受此类进程的通常事务配置的约束。因此它们不需要由 Spring Integration 显式配置来支持事务。事务可以而且应该通过 Spring 的标准事务支持来启动。Spring Integration 消息流自然尊重组件的事务语义,因为它本身是由 Spring 配置的。例如,网关或服务激活器方法可以用@TransactionalTransactionInterceptor可以在 XML 配置中定义一个切入点表达式,该表达式指向应该是事务性的特定方法。底线是您可以完全控制这些场景中的事务配置和边界。

但是,当涉及到由守护进程启动的消息流时,情况就有些不同了。尽管由开发人员配置,但这些流程并不直接涉及要启动的人或其他一些过程。这些是基于触发器的流,由触发器进程(守护进程)基于进程的配置启动。例如,我们可以让调度程序在每个星期五晚上启动一个消息流。我们还可以配置一个触发器,每秒启动一个消息流,依此类推。因此,我们需要一种方法来让这些基于触发器的流程知道我们打算使生成的消息流成为事务性的,以便在启动新消息流时可以创建事务上下文。也就是说,我们需要暴露一些事务配置,

轮询器事务支持

Spring Integration 为轮询器提供事务支持。轮询器是一种特殊类型的组件,因为在轮询器任务中,我们可以调用receive()本身是事务性的资源,因此包括receive()事务边界中的调用,这样可以在任务失败的情况下回滚。如果我们要为通道添加相同的支持,则添加的事务将影响从send()调用开始的所有下游组件。这为事务划分提供了相当广泛的范围,没有任何充分的理由,尤其是当 Spring 已经提供了多种方法来解决下游任何组件的事务需求时。然而,receive()包含在事务边界中的方法是轮询器的“强有力的理由”。

任何时候配置轮询器时,都可以使用transactional子元素及其属性来提供事务配置,如以下示例所示:

<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <transactional transaction-manager="txManager"
                   isolation="DEFAULT"
                   propagation="REQUIRED"
                   read-only="true"
                   timeout="1000"/>
</poller>

上述配置看起来类似于原生 Spring 事务配置。您仍然必须提供对事务管理器的引用,并指定事务属性或依赖默认值(例如,如果未指定“事务管理器”属性,则默认为名为“事务管理器”的 bean)。在内部,流程被包裹在 Spring 的原生事务中,TransactionInterceptor负责处理事务。有关如何配置事务管理器、事务管理器的类型(例如 JTA、Datasource 等)以及与事务配置相关的其他详细信息的更多信息,请参阅Spring Framework 参考指南

使用上述配置,此轮询器发起的所有消息流都是事务性的。有关轮询器事务配置的更多信息和详细信息,请参阅轮询和事务

在运行轮询器时,除了事务之外,您可能还需要解决更多的跨领域问题。为了帮助解决这个问题,poller 元素接受一个<advice-chain>子元素,它允许您定义要应用于 Poller 的自定义建议实例链。(有关详细信息,请参阅Pollable Message Source。)在 Spring Integration 2.0 中,Poller 进行了重构工作,现在使用代理机制来解决事务关注点以及其他横切关注点。从这项工作中产生的重大变化之一是我们使<transactional><advice-chain>元素相互排斥。这背后的基本原理是,如果您需要多个建议,其中之一是交易建议,您可以将其包含在<advice-chain>与以前一样方便,但具有更多控制权,因为您现在可以选择按所需顺序放置建议。以下示例显示了如何执行此操作:

<int:poller max-messages-per-poll="1" fixed-rate="10000">
  <advice-chain>
    <ref bean="txAdvice"/>
    <ref bean="someOtherAdviceBean" />
    <beans:bean class="foo.bar.SampleAdvice"/>
  </advice-chain>
</poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
  <tx:attributes>
    <tx:method name="get*" read-only="true"/>
    <tx:method name="*"/>
  </tx:attributes>
</tx:advice>

txAdvice前面的示例显示了 Spring Transaction 建议 ( )的基于 XML 的基本配置,并将其包含<advice-chain>在 Poller 定义的范围内。如果您只需要解决轮询器的事务性问题,您仍然可以使用该<transactional>元素作为一种方便。

交易边界

另一个重要因素是消息流中事务的边界。当一个事务启动时,事务上下文被绑定到当前线程。因此,无论您的消息流中有多少端点和通道,只要您确保流在同一线程上继续,您的事务上下文都将被保留。一旦你通过引入一个可轮询通道执行器通道来打破它或者在某些服务中手动启动一个新线程,事务边界也会被打破。本质上,事务将在此处结束,并且如果在线程之间发生了成功的切换,则流程将被视为成功,并且即使流程将继续并且可能仍会在下游某处导致异常,也会发送 COMMIT 信号。如果这样的流是同步的,则该异常可能会被扔回消息流的发起者,该发起者也是事务上下文的发起者,并且事务将导致 ROLLBACK。中间立场是在线程边界被打破的任何地方使用事务通道。例如,您可以使用委托给事务性 MessageStore 策略的队列支持通道,或者您可以使用 JMS 支持的通道。

事务同步

在某些环境中,它有助于将操作与包含整个流程的事务同步。例如,考虑<file:inbound-channel-adapter/>在执行多个数据库更新的流程的开头。如果事务提交,我们可能希望将文件移动到一个success目录,而failure如果事务回滚,我们可能希望将它移动到一个目录。

Spring Integration 2.2 引入了将这些操作与事务同步的能力。此外,PseudoTransactionManager如果您没有“真实”事务但仍希望在成功或失败时执行不同的操作,则可以配置 a。有关详细信息,请参阅伪事务

以下清单显示了此功能的关键策略接口:

public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}

工厂负责创建TransactionSynchronization对象。您可以实现自己的,也可以使用框架提供的:DefaultTransactionSynchronizationFactory. 此实现返回一个TransactionSynchronization委托给默认实现的TransactionSynchronizationProcessor: ExpressionEvaluatingTransactionSynchronizationProcessor。该处理器支持三种 SpEL 表达式:beforeCommitExpressionafterCommitExpressionafterRollbackExpression

这些操作对于那些熟悉交易的人来说应该是不言自明的。在每种情况下,#root变量都是原始的Message。在某些情况下,其他 SpEL 变量可用,具体取决于MessageSource轮询器轮询的对象。例如,MongoDbMessageSource提供#mongoTemplate变量,它引用消息源的MongoTemplate. 类似地,RedisStoreMessageSource提供了#store变量,该变量引用RedisStore了民意调查创建的变量。

要为特定轮询器启用该功能,您可以使用属性提供TransactionSynchronizationFactory对轮询器<transactional/>元素的synchronization-factory引用。

从 5.0 版开始,Spring Integration 提供了,当配置了 no 但通知链中存在类型的通知PassThroughTransactionSynchronizationFactory时,默认情况下将其应用于轮询端点。当使用任何开箱即用的实现时,轮询端点将轮询消息绑定到当前事务上下文,并在事务建议之后抛出异常时将其作为in a提供。当使用未实现的自定义事务建议时,您可以显式配置 a来实现此行为。在任何一种情况下, 都会成为发送到的有效负载,原因是通知引发的原始异常。此前,该TransactionSynchronizationFactoryTransactionInterceptorTransactionSynchronizationFactoryfailedMessageMessagingExceptionTransactionInterceptorPassThroughTransactionSynchronizationFactoryMessagingExceptionErrorMessageerrorChannelErrorMessage有一个负载是通知抛出的原始异常,并且没有提供对failedMessage信息的引用,因此很难确定事务提交问题的原因。

为了简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。以下示例显示如何使用命名空间来配置文件入站通道适配器:

<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="someChannel"
    directory="/foo/bar"
    filter="filter"
    comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
        channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
        channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

SpEL 评估的结果作为有效负载发送到committedChannelor rolledBackChannel(在这种情况下,这将是Boolean.TRUEor Boolean.FALSE —java.io.File.renameTo()方法调用的结果)。

如果您希望发送整个有效负载以进行进一步的 Spring 集成处理,请使用“有效负载”表达式。

重要的是要了解这会将操作与事务同步。它不会使本质上不是事务性的资源实际上是事务性的。相反,事务(无论是 JDBC 还是其他方式)在轮询之前启动,并在流程完成时提交或回滚,然后执行同步操作。

如果您提供一个 custom TransactionSynchronizationFactory,它负责创建一个资源同步,使绑定的资源在事务完成时自动解除绑定。默认TransactionSynchronizationFactory通过返回 的子类来实现ResourceHolderSynchronization,默认shouldUnbindAtCompletion()返回true

除了after-commitandafter-rollback表达式,before-commit还支持。在这种情况下,如果评估(或下游处理)抛出异常,事务将回滚而不是提交。

伪交易

阅读事务同步部分后,您可能会认为在流程完成时采取这些“成功”或“失败”操作会很有用,即使轮询器下游没有“真正的”事务资源(例如 JDBC)。例如,考虑一个“<file:inbound-channel-adapter/>”后跟一个“<ftp:outbout-channel-adapter/>”。这些组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到不同的目录。

为了提供这个功能,框架提供了一个PseudoTransactionManager, 启用上述配置,即使没有涉及真正的事务资源。如果流程正常完成,则调用beforeCommitafterCommit同步。失败时,afterRollback调用同步。因为它不是真正的事务,所以不会发生实际的提交或回滚。伪事务是用于启用同步功能的工具。

要使用 a PseudoTransactionManager,您可以将其定义为 <bean/>,就像配置真正的事务管理器一样。以下示例显示了如何执行此操作:

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />

反应式交易

从 5.3 版开始,aReactiveTransactionManager也可以与TransactionInterceptor返回反应类型的端点的通知一起使用。这包括产生带有或有效载荷的消息MessageSourceReactiveMessageHandler实现(例如)。当他们的回复有效负载也是某种反应类型时,所有其他产生回复的消息处理程序实现都可以依赖。ReactiveMongoDbMessageSourceFluxMonoReactiveTransactionManager


1. see XML Configuration