JDBC 支持

Spring Integration 通过使用数据库查询提供了用于接收和发送消息的通道适配器。通过这些适配器,Spring Integration 不仅支持普通的 JDBC SQL 查询,还支持存储过程和存储函数调用。

您需要将此依赖项包含到您的项目中:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
    <version>5.5.13</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-jdbc:5.5.13"

默认情况下,以下 JDBC 组件可用:

Spring Integration JDBC Module 还提供了一个JDBC Message Store

入站通道适配器

入站通道适配器的主要功能是执行 SQLSELECT查询并将结果集转换为消息。消息负载是整个结果集(表示为 a List),列表中项目的类型取决于行映射策略。默认策略是一个通用映射器,它Map为查询结果中的每一行返回一个。或者,您可以通过添加对实例的引用来更改此设置(有关行映射的更多详细信息,RowMapper请参阅Spring JDBC文档)。

如果要将SELECT查询结果中的行转换为单个消息,可以使用下游拆分器。

入站适配器还需要引用JdbcTemplate实例或DataSource.

除了SELECT生成消息的语句外,适配器还有一个UPDATE语句,将记录标记为已处理,以便它们不会出现在下一次轮询中。更新可以通过来自原始选择的 ID 列表进行参数化。默认情况下,这是通过命名约定来完成的(输入结果集中称为的列id被转换为参数映射中的列表以用于更新称为id)。以下示例定义了一个带有更新查询和DataSource引用的入站通道适配器。

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />
更新查询中的参数在参数名称前使用冒号 ( :) 前缀指定(在前面的示例中,该参数是要应用于轮询结果集中的每一行的表达式)。这是 Spring JDBC 中命名参数 JDBC 支持的标准特性,结合了 Spring Integration 中采用的约定(投影到轮询结果列表上)。底层 Spring JDBC 特性限制了可用的表达式(例如,不允许使用除句点之外的大多数特殊字符),但由于目标通常是可通过 bean 路径寻址的对象列表(可能是一个对象的列表),因此这不是限制不当。

要更改参数生成策略,您可以将 aSqlParameterSourceFactory注入适配器以覆盖默认行为(适配器具有sql-parameter-source-factory属性)。Spring Integration 提供了ExpressionEvaluatingSqlParameterSourceFactory,它创建了一个基于 SpEL 的参数源,以查询的结果作为#root对象。(如果update-per-row为真,则根对象为行)。如果更新查询中多次出现相同的参数名称,则只计算一次,并缓存其结果。

您还可以将参数源用于选择查询。在这种情况下,由于没有要评估的“结果”对象,因此每次都使用单个参数源(而不是使用参数源工厂)。从 4.0 版本开始,您可以使用 Spring 创建基于 SpEL 的参数源,如以下示例所示:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

每个value参数表达式中的 可以是任何有效的 SpEL 表达式。#root表达式求值的对象是定义在parameterSourcebean 上的构造函数参数。它对于所有评估都是静态的(在前面的示例中,一个空的String)。

从版本 5.0 开始,您可以提供ExpressionEvaluatingSqlParameterSourceFactory用于sqlParameterTypes指定特定参数的目标 SQL 类型。

以下示例为查询中使用的参数提供 SQL 类型:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>
使用createParameterSourceNoCache工厂方法。否则,参数源缓存评估结果。另请注意,由于禁用了缓存,如果相同的参数名称多次出现在选择查询中,则每次出现都会重新计算。

轮询和交易

入站适配器接受常规 Spring Integration 轮询器作为子元素。因此,可以控制轮询的频率(以及其他用途)。用于 JDBC 的轮询器的一个重要特性是将轮询操作包装在事务中的选项,如以下示例所示:

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
如果您没有明确指定轮询器,则使用默认值。与 Spring Integration 一样,它可以定义为顶级 bean)。

在前面的示例中,数据库每 1000 毫秒(或每秒一次)轮询一次,并且更新和选择查询都在同一个事务中执行。事务管理器配置未显示。但是,只要它知道数据源,轮询就是事务性的。一个常见的用例是下游通道是直接通道(默认),以便在同一个线程中调用端点,因此在同一个事务中。这样,如果其中任何一个失败,事务就会回滚,输入数据将恢复到其原始状态。

max-rows相对max-messages-per-poll

JDBC 入站通道适配器定义了一个名为max-rows. 当您指定适配器的轮询器时,您还可以定义一个名为max-messages-per-poll. 虽然这两个属性看起来很相似,但它们的含义却截然不同。

max-messages-per-poll指定每个轮询间隔执行查询的次数,而max-rows指定每次执行返回的行数。

在正常情况下,您可能不想在max-messages-per-poll使用 JDBC 入站通道适配器时设置轮询器的属性。它的默认值是1,这意味着 JDBC 入站通道适配器的receive()方法在每个轮询间隔中只执行一次。

max-messages-per-poll属性设置为更大的值意味着查询会连续多次执行。有关该max-messages-per-poll属性的更多信息,请参阅配置入站通道适配器

相反,max-rows如果该属性大于0,则指定该receive()方法创建的查询结果集中要使用的最大行数。如果该属性设置为0,则所有行都包含在结果消息中。该属性默认为0.

建议通过供应商特定的查询选项使用结果集限制,例如 MySQLLIMIT或 SQL ServerTOP或 Oracle 的ROWNUM. 有关详细信息,请参阅特定的供应商文档。

出站通道适配器

出站通道适配器与入站相反:它的作用是处理消息并使用它来执行 SQL 查询。默认情况下,消息负载和标头可用作查询的输入参数,如以下示例所示:

<int-jdbc:outbound-channel-adapter
    query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])"
    data-source="dataSource"
    channel="input"/>

在前面的示例中,到达标记为通道的消息input具有映射的有效负载,其键为something,因此[]操作员从映射中取消引用该值。标题也作为映射访问。

前面查询中的参数是传入消息的 bean 属性表达式(不是 SpEL 表达式)。此行为是 的一部分SqlParameterSource,它是出站适配器创建的默认源。您可以注入不同SqlParameterSourceFactory的以获得不同的行为。

出站适配器需要对 aDataSource或 a的引用JdbcTemplate。您还可以注入 aSqlParameterSourceFactory来控制每个传入消息与查询的绑定。

如果输入通道是直接通道,则出站适配器在同一线程中运行其查询,因此,与消息的发送者在同一事务(如果有)中运行。

使用 SpEL 表达式传递参数

大多数 JDBC 通道适配器的一个常见要求是将参数作为 SQL 查询或存储过程或函数的一部分传递。如前所述,这些参数默认是 bean 属性表达式,而不是 SpEL 表达式。但是,如果您需要将 SpEL 表达式作为参数传递,则必须显式注入一个SqlParameterSourceFactory.

以下示例使用 aExpressionEvaluatingSqlParameterSourceFactory来实现该要求:

<jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
    query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE)     \
    values (:id, :payload, :createdDate)"
    sql-parameter-source-factory="spelSource"/>

<bean id="spelSource"
      class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <entry key="id"          value="headers['id'].toString()"/>
            <entry key="createdDate" value="new java.util.Date()"/>
            <entry key="payload"     value="payload"/>
        </map>
    </property>
</bean>

有关详细信息,请参阅定义参数源

使用PreparedStatement回调

有时,灵活性和松耦合SqlParameterSourceFactory并不能满足我们对目标的需求,PreparedStatement或者我们需要做一些低级的 JDBC 工作。Spring JDBC 模块提供 API 来配置执行环境(例如ConnectionCallbackPreparedStatementCreator)和操作参数值(例如SqlParameterSource)。它甚至可以访问用于低级操作的 API,例如StatementCallback.

从 Spring Integration 4.2 开始,MessagePreparedStatementSetter允许在上下文中PreparedStatement手动指定参数。requestMessage这个类的作用与标准 Spring JDBC API 中的作用完全相同PreparedStatementSetter。实际上,PreparedStatementSetterJdbcMessageHandler在.executeJdbcTemplate

此功能接口选项sqlParameterSourceFactoryPreparedStatement. requestMessage例如,当我们需要以流的方式将File数据存储到 DataBaseBLOB列时,它很有用。以下示例显示了如何执行此操作:

@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
    JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
            "INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
    jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
        ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME));
        try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) {
            ps.setBlob(2, inputStream);
        }
        catch (Exception e) {
            throw new MessageHandlingException(m, e);
        }
        ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
    });
    return jdbcMessageHandler;
}

从 XML 配置的角度来看,该prepared-statement-setter属性在<int-jdbc:outbound-channel-adapter>组件上可用。它允许您指定MessagePreparedStatementSetterbean 引用。

批量更新

从版本 5.1 开始,如果请求消息的有效负载是实例,则JdbcMessageHandler执行 a 。的每个元素都使用请求消息中的标头包装到 a中。在基于常规的配置的情况下,这些消息用于为上述函数中使用的参数构建一个。应用配置时,将使用一个变体来迭代每个项目的这些消息,并针对它们调用提供的消息。选择模式时不支持批量更新。JdbcOperations.batchUpdate()IterableIterableMessageSqlParameterSourceFactorySqlParameterSource[]JdbcOperations.batchUpdate()MessagePreparedStatementSetterBatchPreparedStatementSetterMessagePreparedStatementSetterkeysGenerated

出站网关

出站网关就像出站和入站适配器的组合:它的作用是处理消息并使用它来执行 SQL 查询,然后通过将结果发送到回复通道来响应结果。默认情况下,消息负载和标头可用作查询的输入参数,如以下示例所示:

<int-jdbc:outbound-gateway
    update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])"
    request-channel="input" reply-channel="output" data-source="dataSource" />

前面示例的结果是向表中插入一条记录,并向输出通道mythings返回一条消息,指示受影响的行数(有效负载是一个映射:)。{UPDATED=1}

如果更新查询是带有自动生成键的插入,您可以通过添加keys-generated="true"到前面的示例来使用生成的键填充回复消息(这不是默认设置,因为某些数据库平台不支持它)。以下示例显示了更改后的配置:

<int-jdbc:outbound-gateway
    update="insert into mythings (status, name) values (0, :payload[thing])"
    request-channel="input" reply-channel="output" data-source="dataSource"
    keys-generated="true"/>

除了更新计数或生成的键之外,您还可以提供一个选择查询来执行并根据结果(例如入站适配器)生成回复消息,如以下示例所示:

<int-jdbc:outbound-gateway
    update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
    query="select * from foos where id=:headers[$id]"
    request-channel="input" reply-channel="output" data-source="dataSource"/>

从 Spring Integration 2.2 开始,更新 SQL 查询不再是强制性的。您现在可以通过使用query属性或query元素仅提供选择查询。如果您需要使用通用网关或有效负载丰富器等主动检索数据,这将非常有用。然后从结果中生成回复消息(类似于入站适配器的工作方式)并传递给回复通道。以下示例显示使用该query属性:

<int-jdbc:outbound-gateway
    query="select * from foos where id=:headers[id]"
    request-channel="input"
    reply-channel="output"
    data-source="dataSource"/>

默认情况下,SELECT查询组件仅从游标返回一个(第一)行。max-rows您可以使用该选项调整此行为。如果您需要从 SELECT 返回所有行,请考虑指定max-rows="0".

与通道适配器一样,您还可以提供SqlParameterSourceFactory请求和回复的实例。默认值与出站适配器相同,因此请求消息可用作表达式的根。如果keys-generated="true",则表达式的根是生成的键(如果只有一个则为映射,如果为多值则为映射列表)。

出站网关需要对 aDataSource或 a的引用JdbcTemplate。它还可以有一个SqlParameterSourceFactory注入来控制传入消息与查询的绑定。

从 4.2 版开始,该request-prepared-statement-setter属性可<int-jdbc:outbound-gateway>用于request-sql-parameter-source-factory. 它允许您指定一个MessagePreparedStatementSetterbean 引用,它在执行之前实现更复杂的PreparedStatement准备。

有关. _ _MessagePreparedStatementSetter

JDBC 消息存储

Spring Integration 提供了两个 JDBC 特定的消息存储实现。这JdbcMessageStore适用于聚合器和声明检查模式。该JdbcChannelMessageStore实现专门为消息通道提供了更有针对性和可扩展性的实现。

请注意,您可以使用 aJdbcMessageStore来支持消息通道,JdbcChannelMessageStore并为此目的进行了优化。

从版本 5.0.11、5.1.2 开始,JdbcChannelMessageStore已经优化了 的索引。如果您在此类存储中有大型消息组,您可能希望更改索引。此外,索引 forPriorityChannel已被注释掉,因为除非您使用由 JDBC 支持的此类通道,否则不需要它。
使用 时,必须OracleChannelMessageStoreQueryProvider添加优先通道索引,因为它包含在查询的提示中。

初始化数据库

在开始使用 JDBC 消息存储组件之前,您应该为目标数据库提供适当的对象。

Spring Integration 附带了一些可用于初始化数据库的示例脚本。在spring-integration-jdbcJAR 文件中,您可以在org.springframework.integration.jdbc包中找到脚本。它为一系列常见数据库平台提供了示例创建和示例删除脚本。使用这些脚本的一种常见方法是在Spring JDBC 数据源初始化程序中引用它们。请注意,这些脚本是作为示例提供的,并且作为所需表和列名称的规范提供。您可能会发现需要增强它们以供生产使用(例如,通过添加索引声明)。

通用 JDBC 消息存储

JDBC 模块提供了由数据库支持的 Spring Integration MessageStore(在声明检查模式中MessageGroupStore很重要)和(在有状态模式中很重要,例如聚合器)的实现。这两个接口都由 实现JdbcMessageStore,并且支持在 XML 中配置 store 实例,如下例所示:

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

您可以指定 aJdbcTemplate而不是 a DataSource

以下示例显示了一些其他可选属性:

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

在前面的示例中,我们指定了一个LobHandler用于将消息作为大对象处理(这对于 Oracle 来说通常是必需的),并为存储生成的查询中的表名指定了一个前缀。表名前缀默认为INT_.

支持消息通道

如果您打算使用 JDBC 支持消息通道,我们建议使用该JdbcChannelMessageStore实现。它仅与消息通道一起使用。

支持的数据库

使用JdbcChannelMessageStore特定于数据库的 SQL 查询从数据库中检索消息。因此,您必须ChannelMessageStoreQueryProviderJdbcChannelMessageStore. 这channelMessageStoreQueryProvider为您指定的特定数据库提供 SQL 查询。Spring Integration 提供对以下关系数据库的支持:

  • PostgreSQL

  • 数据库

  • MySQL

  • 甲骨文

  • 德比

  • H2

  • SqlServer

  • 赛贝斯

  • DB2

如果您的数据库未列出,您可以扩展AbstractChannelMessageStoreQueryProvider该类并提供您自己的自定义查询。

4.0 版将该MESSAGE_SEQUENCE列添加到表中,以确保即使消息存储在同一毫秒内,也可以进行先进先出 (FIFO) 排队。

自定义消息插入

从 5.0 版本开始,通过重载ChannelMessageStorePreparedStatementSetter类,您可以在JdbcChannelMessageStore. 您可以使用它来设置不同的列或更改表结构或序列化策略。例如byte[],您可以将其结构存储为 JSON 字符串,而不是默认序列化为 。

以下示例使用 的默认实现setValues来存储公共列并覆盖将消息有效负载存储为的行为varchar

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}

通常,我们不建议使用关系数据库进行排队。相反,如果可能,请考虑改用 JMS 或 AMQP 支持的通道。如需进一步参考,请参阅以下资源:

并发轮询

轮询消息通道时,您可以选择配置PollerTaskExecutor引用关联的消息。

但请记住,如果您使用 JDBC 支持的消息通道,并且您计划轮询通道并因此使用多个线程以事务方式轮询消息存储,则应确保使用支持多版本并发控制(MVCC) 的关系数据库。否则,锁定可能是一个问题,并且在使用多个线程时,性能可能无法按预期实现。例如,Apache Derby 在这方面存在问题。

为了获得更好的 JDBC 队列吞吐量并避免不同线程可能Message从队列中轮询相同的问题,在使用不支持 MVCC 的数据库时设置 的属性很重要。以下示例显示了如何执行此操作:usingIdCacheJdbcChannelMessageStoretrue

<bean id="queryProvider"
    class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

<task:executor id="pool" pool-size="10"
    queue-capacity="10" rejection-policy="CALLER_RUNS" />

<bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
    <property name="region" value="TX_TIMEOUT"/>
    <property name="usingIdCache" value="true"/>
</bean>

<int:channel id="inputChannel">
    <int:queue message-store="store"/>
</int:channel>

<int:bridge input-channel="inputChannel" output-channel="outputChannel">
    <int:poller fixed-delay="500" receive-timeout="500"
        max-messages-per-poll="1" task-executor="pool">
        <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"
        isolation="READ_COMMITTED" transaction-manager="transactionManager" />
    </int:poller>
</int:bridge>

<int:channel id="outputChannel" />
优先频道

从 4.0 版开始,JdbcChannelMessageStore实现PriorityCapableChannelMessageStore并提供priorityEnabled选项,让它用作实例的message-store参考。priority-queue为此,该INT_CHANNEL_MESSAGE表有一个MESSAGE_PRIORITY列来存储PRIORITY消息头的值。此外,一个新MESSAGE_SEQUENCE列使我们能够实现强大的先进先出 (FIFO) 轮询机制,即使在同一毫秒内以相同优先级存储多条消息也是如此。使用 .从数据库中轮询(选择)消息order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE

我们不建议JdbcChannelMessageStore对优先级和非优先级队列通道使用相同的 bean,因为该priorityEnabled选项适用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。但是,同一个INT_CHANNEL_MESSAGE表(甚至region)可以用于这两种JdbcChannelMessageStore类型。要配置该场景,您可以从另一个扩展一个消息存储 bean,如以下示例所示:
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

对消息存储进行分区

通常将 aJdbcMessageStore用作同一应用程序中的一组应用程序或节点的全局存储。为了提供一些防止名称冲突的保护并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。一种方法是使用单独的表名,通过更改前缀(如前所述)。另一种方法是为region单个表中的数据分区指定一个名称。第二种方法的一个重要用例是当MessageStore正在管理支持 Spring 集成消息通道的持久队列。持久通道的消息数据在通道名称的存储中键入。因此,如果通道名称不是全局唯一的,则通道可以拾取不适合它们的数据。为了避免这种危险,您可以使用消息存储region为具有相同逻辑名称的不同物理通道保持数据分离。

存储过程

在某些情况下,纯 JDBC 支持是不够的。也许您处理遗留的关系数据库模式或者您有复杂的数据处理需求,但最终您必须使用存储过程或存储函数。从 Spring Integration 2.1 开始,我们提供了三个组件来执行存储过程或存储函数:

  • 存储过程入站通道适配器

  • 存储过程出站通道适配器

  • 存储过程出站网关

支持的数据库

为了能够调用存储过程和存储函数,存储过程组件使用org.springframework.jdbc.core.simple.SimpleJdbcCall该类。因此,完全支持以下数据库来执行存储过程:

  • 阿帕奇德比

  • DB2

  • MySQL

  • 微软 SQL 服务器

  • 甲骨文

  • PostgreSQL

  • 赛贝斯

如果要改为执行存储函数,则完全支持以下数据库:

  • MySQL

  • 微软 SQL 服务器

  • 甲骨文

  • PostgreSQL

即使您的特定数据库可能没有得到完全支持,但您仍然可以非常成功地使用存储过程 Spring Integration 组件,前提是您的 RDBMS 支持存储过程或存储函数。

事实上,一些提供的集成测试使用H2 数据库。尽管如此,彻底测试这些使用场景非常重要。

配置

存储过程组件提供完整的 XML 命名空间支持,并且配置组件类似于前面讨论的通用 JDBC 组件。

常用配置属性

所有存储过程组件共享某些配置参数:

  • auto-startup:生命周期属性,指示是否应在应用程序上下文启动期间启动此组件。它默认为true. 可选的。

  • data-source: 引用 a javax.sql.DataSource,用于访问数据库。必需的。

  • id: 标识底层 Spring bean 定义,它是一个EventDrivenConsumer或的实例PollingConsumer,取决于出站通道适配器的channel属性是引用 aSubscribableChannel还是 a PollableChannel。可选的。

  • ignore-column-meta-data:对于完全支持的数据库,底层SimpleJdbcCall类可以自动从JDBC元数据中检索存储过程或存储函数的参数信息。

    但是,如果数据库不支持元数据查找或者您需要提供自定义参数定义,则可以将此标志设置为true。它默认为false. 可选的。

  • is-function: 如果true,则调用 SQL 函数。在这种情况下,stored-procedure-nameorstored-procedure-name-expression属性定义了被调用函数的名称。它默认为false. 可选的。

  • stored-procedure-name:此属性指定存储过程的名称。如果is-function属性设置为true,则此属性指定函数名称。stored-procedure-name-expression必须指定此属性或。

  • stored-procedure-name-expression: 该属性使用 SpEL 表达式指定存储过程的名称。通过使用 SpEL,您可以访问完整的消息(如果可用),包括其标头和有效负载。您可以使用此属性在运行时调用不同的存储过程。例如,您可以提供您希望作为消息头执行的存储过程名称。表达式必须解析为String.

    如果该is-function属性设置为true,则该属性指定一个存储函数。stored-procedure-name必须指定此属性或。

  • jdbc-call-operations-cache-sizeSimpleJdbcCallOperations:定义缓存实例的最大数量。基本上,对于每个存储过程名称,SimpleJdbcCallOperations都会创建一个新实例,作为回报,它会被缓存。

    Spring Integration 2.2 增加了stored-procedure-name-expression属性和jdbc-call-operations-cache-size属性。

    默认缓存大小为10. 值0禁用缓存。不允许使用负值。

    如果启用 JMX,有关的统计信息jdbc-call-operations-cache将作为 MBean 公开。有关详细信息,请参阅MBean 导出器。

  • sql-parameter-source-factory: (不适用于存储过程入站通道适配器。)对SqlParameterSourceFactory. 默认情况下,传入Message有效负载的 bean 属性通过使用BeanPropertySqlParameterSourceFactory.

    这对于基本用例可能就足够了。对于更复杂的选项,请考虑传入一个或多个ProcedureParameter值。请参阅定义参数源。可选的。

  • use-payload-as-parameter-source: (不适用于存储过程入站通道适配器。)如果设置为true,则 的有效负载Message用作提供参数的源。但是,如果设置为false,则整体Message可用作参数的来源。

    如果没有传入过程参数,则此属性默认为true。这意味着,通过使用默认值BeanPropertySqlParameterSourceFactory,有效负载的 bean 属性将用作存储过程或存储函数的参数值的来源。

    但是,如果传入过程参数,则此属性(默认情况下)的计算结果为falseProcedureParameter让我们提供 SpEL 表达式。因此,访问整个Message. 该属性设置在基础StoredProcExecutor. 可选的。

通用配置子元素

存储过程组件共享一组公共子元素,您可以使用它们来定义参数并将参数传递给存储过程或存储函数。可以使用以下元素:

  • parameter

  • returning-resultset

  • sql-parameter-definition

  • poller

  • parameter:提供一种机制来提供存储过程参数。参数可以是静态的,也可以使用 SpEL 表达式提供。

    <int-jdbc:parameter name=""         (1)
                        type=""         (2)
                        value=""/>      (3)
    
    <int-jdbc:parameter name=""
                        expression=""/> (4)

    + <1> 要传递给存储过程或存储函数的参数的名称。必需的。<2> 该属性指定值的类型。如果未提供任何内容,则此属性默认为java.lang.String. 仅在使用该属性时才使用该value属性。可选的。<3> 参数的值。您必须提供此属性或expression属性。可选的。<4> 除了value属性,您可以指定一个SpEL 表达式来传递参数的值。如果指定expressionvalue则不允许使用该属性。可选的。

    可选的。

  • returning-resultset: 存储过程可能会返回多个结果集。通过设置一个或多个returning-resultset元素,您可以指定RowMappers将每个返回的对象转换ResultSet为有意义的对象。可选的。

    <int-jdbc:returning-resultset name="" row-mapper="" />
  • sql-parameter-definition:如果您使用完全受支持的数据库,您通常不必指定存储过程参数定义。相反,这些参数可以自动从 JDBC 元数据派生。但是,如果您使用不完全支持的数据库,则必须使用sql-parameter-definition元素显式设置这些参数。

    ignore-column-meta-data您还可以使用该属性选择关闭对通过 JDBC 获取的参数元数据信息的任何处理。

    <int-jdbc:sql-parameter-definition
                                       name=""                           (1)
                                       direction="IN"                    (2)
                                       type="STRING"                     (3)
                                       scale="5"                         (4)
                                       type-name="FOO_STRUCT"            (5)
                                       return-type="fooSqlReturnType"/>  (6)
    1 指定 SQL 参数的名称。必需的。
    2 指定 SQL 参数定义的方向。默认为IN. 有效值为:INOUTINOUT。如果您的过程正在返回结果集,请使用该returning-resultset元素。可选的。
    3 用于此 SQL 参数定义的 SQL 类型。转换为由 定义的整数值java.sql.Types。或者,您也可以提供整数值。如果未明确设置此属性,则默认为“VARCHAR”。可选的。
    4 SQL 参数的小数位数。仅用于数字和十进制参数。可选的。
    5 用于用户命名的类型,typeName例如:STRUCT、、、和命名数组类型。该属性与该属性互斥。可选的。DISTINCTJAVA_OBJECTscale
    6 对复杂类型的自定义值处理程序的引用。的一个实现SqlReturnType。该属性与属性互斥,scale仅适用于 OUT 和 INOUT 参数。可选的。
  • poller:如果此端点是PollingConsumer. 可选的。

定义参数源

参数源控制检索 Spring Integration 消息属性并将其映射到相关存储过程输入参数的技术。

存储过程组件遵循一定的规则。默认情况下,Message有效负载的 bean 属性用作存储过程输入参数的来源。在这种情况下,BeanPropertySqlParameterSourceFactory使用 a。这对于基本用例可能就足够了。下一个示例说明了该默认行为。

对于使用BeanPropertySqlParameterSourceFactoryto 来“自动”查找 bean 属性,您的 bean 属性必须以小写形式定义。这是因为在org.springframework.jdbc.core.metadata.CallMetaDataContext(Java 方法是matchInParameterValuesWithCallParameters())中,检索到的存储过程参数声明被转换为小写。因此,如果您有驼峰式 bean 属性(例如lastName),则查找失败。在这种情况下,提供一个明确的ProcedureParameter.

假设我们有一个负载,它由一个简单的 bean 组成,它具有以下三个属性:idnamedescription。此外,我们有一个称为的简单存储过程INSERT_COFFEE,它接受三个输入参数:idnamedescription。我们还使用完全支持的数据库。在这种情况下,存储过程出站适配器的以下配置就足够了:

<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
    channel="insertCoffeeProcedureRequestChannel"
    stored-procedure-name="INSERT_COFFEE"/>

对于更复杂的选项,请考虑传入一个或多个ProcedureParameter值。

如果您ProcedureParameter明确提供值,默认情况下,anExpressionEvaluatingSqlParameterSourceFactory用于参数处理,以启用 SpEL 表达式的全部功能。

如果您需要对如何检索参数进行更多控制,请考虑SqlParameterSourceFactory使用sql-parameter-source-factory属性传入自定义实现。

存储过程入站通道适配器

以下清单列出了对存储过程入站通道适配器很重要的属性:

<int-jdbc:stored-proc-inbound-channel-adapter
                                   channel=""                                    (1)
                                   stored-procedure-name=""
                                   data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   skip-undeclared-results=""                    (2)
                                   return-value-required="false"                 (3)
    <int:poller/>
    <int-jdbc:sql-parameter-definition name="" direction="IN"
                                               type="STRING"
                                               scale=""/>
    <int-jdbc:parameter name="" type="" value=""/>
    <int-jdbc:parameter name="" expression=""/>
    <int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>
1 轮询消息发送到的通道。如果存储过程或函数不返回任何数据,则其有效负载Message为空。必需的。
2 如果此属性设置为,则绕过true存储过程调用中没有相应声明的所有结果。SqlOutParameter例如,存储过程可以返回一个更新计数值,即使您的存储过程只声明了一个结果参数。确切的行为取决于数据库实现。该值是在底层证券上设置的JdbcTemplate。该值默认为true。可选的。
3 指示是否应包含此过程的返回值。从 Spring 集成 3.0 开始。可选的。

存储过程出站通道适配器

以下清单列出了对存储过程出站通道适配器很重要的属性:

<int-jdbc:stored-proc-outbound-channel-adapter channel=""                        (1)
                                               stored-procedure-name=""
                                               data-source=""
                                               auto-startup="true"
                                               id=""
                                               ignore-column-meta-data="false"
                                               order=""                          (2)
                                               sql-parameter-source-factory=""
                                               use-payload-as-parameter-source="">
    <int:poller fixed-rate=""/>
    <int-jdbc:sql-parameter-definition name=""/>
    <int-jdbc:parameter name=""/>

</int-jdbc:stored-proc-outbound-channel-adapter>
1 此端点的接收消息通道。必需的。
2 指定此端点作为订阅者连接到通道时的调用顺序。当该通道使用failover调度策略时,这一点尤其重要。当此端点本身是具有队列的通道的轮询消费者时,它不起作用。可选的。

存储过程出站网关

以下清单列出了对存储过程出站通道适配器很重要的属性:

<int-jdbc:stored-proc-outbound-gateway request-channel=""                        (1)
                                       stored-procedure-name=""
                                       data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   order=""
                                   reply-channel=""                              (2)
                                   reply-timeout=""                              (3)
                                   return-value-required="false"                 (4)
                                   skip-undeclared-results=""                    (5)
                                   sql-parameter-source-factory=""
                                   use-payload-as-parameter-source="">
<int-jdbc:sql-parameter-definition name="" direction="IN"
                                   type=""
                                   scale="10"/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
1 此端点的接收消息通道。必需的。
2 收到数据库响应后应向其发送回复的消息通道。可选的。
3 让您指定此网关在引发异常之前等待成功发送回复消息的时间。请记住,当发送到 a 时DirectChannel,调用发生在发送者的线程中。因此,发送操作的失败可能是由更下游的其他组件引起的。默认情况下,网关无限期等待。该值以毫秒为单位指定。可选的。
4 指示是否应包含此过程的返回值。可选的。
5 如果该skip-undeclared-results属性设置为,则绕过true存储过程调用中没有相应声明的所有结果。SqlOutParameter例如,存储过程可能会返回一个更新计数值,即使您的存储过程只声明了一个结果参数。确切的行为取决于数据库。该值是在底层证券上设置的JdbcTemplate。该值默认为true。可选的。

例子

本节包含两个调用Apache Derby存储过程的示例。第一个过程调用一个存储过程,该过程返回一个ResultSet. 通过使用 a RowMapper,将数据转换为域对象,然后成为 Spring Integration 消息有效负载。

在第二个示例中,我们调用了一个使用输出参数返回数据的存储过程。

该项目包含此处引用的 Apache Derby 示例,以及有关如何运行它的说明。Spring Integration Samples 项目还提供了使用 Oracle 存储过程的示例

在第一个示例中,我们调用了一个名为的存储过程FIND_ALL_COFFEE_BEVERAGES,它没有定义任何输入参数,但返回一个ResultSet.

在 Apache Derby 中,存储过程是用 Java 实现的。以下清单显示了方法签名:

public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
            throws SQLException {
    ...
}

以下清单显示了相应的 SQL:

CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';

在 Spring Integration 中,您现在可以使用例如 a 来调用此存储过程stored-proc-outbound-gateway,如以下示例所示:

<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
                                       data-source="dataSource"
                                       request-channel="findAllProcedureRequestChannel"
                                       expect-single-result="true"
                                       stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
<int-jdbc:returning-resultset name="coffeeBeverages"
    row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>

在第二个示例中,我们调用一个名为的存储过程FIND_COFFEE,该过程具有一个输入参数。它不返回 a ResultSet,而是使用输出参数。以下示例显示了方法签名:

public static void findCoffee(int coffeeId, String[] coffeeDescription)
            throws SQLException {
    ...
}

以下清单显示了相应的 SQL:

CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';

在 Spring Integration 中,您现在可以使用例如 a 来调用此存储过程stored-proc-outbound-gateway,如以下示例所示:

<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
                                       data-source="dataSource"
                                       request-channel="findCoffeeProcedureRequestChannel"
                                       skip-undeclared-results="true"
                                       stored-procedure-name="FIND_COFFEE"
                                       expect-single-result="true">
    <int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>

JDBC 锁注册表

4.3 版引入了JdbcLockRegistry. 某些组件(例如,聚合器和重排序器)使用从LockRegistry实例获得的锁来确保一次只有一个线程操作一个组。在DefaultLockRegistry单个组件中执行此功能。您现在可以在这些组件上配置外部锁定注册表。与 shared 一起使用时MessageGroupStore,您可以使用JdbcLockRegistry跨多个应用程序实例提供此功能,这样一次只有一个实例可以操作该组。

当一个本地线程释放一个锁时,另一个本地线程通常可以立即获得该锁。如果锁由使用不同注册表实例的线程释放,则获取锁最多可能需要 100 毫秒。

JdbcLockRegistry是基于LockRepository抽象的,它有一个实现DefaultLockRepository。数据库模式脚本位于org.springframework.integration.jdbc包中,该包针对特定的 RDBMS 供应商进行了划分。例如,以下清单显示了锁表的 H2 DDL:

CREATE TABLE INT_LOCK  (
    LOCK_KEY CHAR(36),
    REGION VARCHAR(100),
    CLIENT_ID CHAR(36),
    CREATED_DATE TIMESTAMP NOT NULL,
    constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);

INT_可以根据目标数据库设计要求进行更改。因此,您必须在bean 定义上使用prefix属性。DefaultLockRepository

有时,一个应用程序已经进入这样一种状态,它无法释放分布式锁并删除数据库中的特定记录。为此,其他应用程序可以在下一次锁定调用时使此类死锁过期。为此提供了( timeToLiveTTL) 选项。DefaultLockRepository您可能还想指定CLIENT_ID为给定DefaultLockRepository实例存储的锁。如果是这样,您可以将id要与 关联的指定DefaultLockRepository为构造函数参数。

从版本 5.1.8 开始,JdbcLockRegistry可以使用 -a 配置idleBetweenTriesDuration锁定记录插入/更新执行之间休眠。默认情况下是100毫秒,在某些环境中,非领导者过于频繁地污染与数据源的连接。

从 5.4 版本开始,该RenewableLockRegistry接口已被引入并添加到JdbcLockRegistry. 该renewLock()方法必须在锁定过程中调用,以防被锁定的过程会比锁的生存时间长。因此,生存时间可以大大减少,部署可以快速重新获得丢失的锁。

只有当前线程持有锁,才能进行锁更新。

版本为 5.5.6 的字符串,支持通过JdbcLockRegistry自动清理 JdbcLock 的缓存。有关更多信息,请参阅其 JavaDocs。JdbcLockRegistry.locksJdbcLockRegistry.setCacheCapacity()

JDBC 元数据存储

5.0 版引入了 JDBC MetadataStore(参见Metadata Store)实现。您可以使用JdbcMetadataStore来维护跨应用程序重新启动的元数据状态。此MetadataStore实现可与以下适配器一起使用:

要将这些适配器配置为使用JdbcMetadataStore,请使用 bean 名称声明 Spring bean metadataStore。Feed 入站通道适配器和 Feed 入站通道适配器都自动拾取并使用声明的JdbcMetadataStore,如以下示例所示:

@Bean
public MetadataStore metadataStore(DataSource dataSource) {
    return new JdbcMetadataStore(dataSource);
}

org.springframework.integration.jdbc软件包具有多个 RDMBS 供应商的数据库模式脚本。例如,以下清单显示了元数据表的 H2 DDL:

CREATE TABLE INT_METADATA_STORE  (
	METADATA_KEY VARCHAR(255) NOT NULL,
	METADATA_VALUE VARCHAR(4000),
	REGION VARCHAR(100) NOT NULL,
	constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);

You can change the INT_ prefix to match the target database design requirements. You can also configure JdbcMetadataStore to use the custom prefix.

The JdbcMetadataStore implements ConcurrentMetadataStore, letting it be reliably shared across multiple application instances, where only one instance can store or modify a key’s value. All of these operations are atomic, thanks to transaction guarantees.

Transaction management must use JdbcMetadataStore. Inbound channel adapters can be supplied with a reference to the TransactionManager in the poller configuration. Unlike non-transactional MetadataStore implementations, with JdbcMetadataStore, the entry appears in the target table only after the transaction commits. When a rollback occurs, no entries are added to the INT_METADATA_STORE table.

从版本 5.0.7 开始,您可以JdbcMetadataStore使用 RDBMS 供应商特定lockHint选项配置元数据存储条目上基于锁的查询。默认情况下,FOR UPDATE如果目标数据库不支持行锁定功能,它可以配置为空字符串。SELECT请咨询您的供应商,以了解表达式中更新前锁定行的特定和可能提示。


1. see XML Configuration