JDBC 支持
Spring Integration 通过使用数据库查询提供了用于接收和发送消息的通道适配器。通过这些适配器,Spring Integration 不仅支持普通的 JDBC SQL 查询,还支持存储过程和存储函数调用。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-jdbc:5.5.13"
默认情况下,以下 JDBC 组件可用:
Spring Integration JDBC Module 还提供了一个JDBC Message Store。
入站通道适配器
入站通道适配器的主要功能是执行 SQLSELECT
查询并将结果集转换为消息。消息负载是整个结果集(表示为 a List
),列表中项目的类型取决于行映射策略。默认策略是一个通用映射器,它Map
为查询结果中的每一行返回一个。或者,您可以通过添加对实例的引用来更改此设置(有关行映射的更多详细信息,RowMapper
请参阅Spring JDBC文档)。
如果要将SELECT 查询结果中的行转换为单个消息,可以使用下游拆分器。
|
入站适配器还需要引用JdbcTemplate
实例或DataSource
.
除了SELECT
生成消息的语句外,适配器还有一个UPDATE
语句,将记录标记为已处理,以便它们不会出现在下一次轮询中。更新可以通过来自原始选择的 ID 列表进行参数化。默认情况下,这是通过命名约定来完成的(输入结果集中称为的列id
被转换为参数映射中的列表以用于更新称为id
)。以下示例定义了一个带有更新查询和DataSource
引用的入站通道适配器。
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
channel="target" data-source="dataSource"
update="update item set status=10 where id in (:id)" />
更新查询中的参数在参数名称前使用冒号 ( : ) 前缀指定(在前面的示例中,该参数是要应用于轮询结果集中的每一行的表达式)。这是 Spring JDBC 中命名参数 JDBC 支持的标准特性,结合了 Spring Integration 中采用的约定(投影到轮询结果列表上)。底层 Spring JDBC 特性限制了可用的表达式(例如,不允许使用除句点之外的大多数特殊字符),但由于目标通常是可通过 bean 路径寻址的对象列表(可能是一个对象的列表),因此这不是限制不当。
|
要更改参数生成策略,您可以将 aSqlParameterSourceFactory
注入适配器以覆盖默认行为(适配器具有sql-parameter-source-factory
属性)。Spring Integration 提供了ExpressionEvaluatingSqlParameterSourceFactory
,它创建了一个基于 SpEL 的参数源,以查询的结果作为#root
对象。(如果update-per-row
为真,则根对象为行)。如果更新查询中多次出现相同的参数名称,则只计算一次,并缓存其结果。
您还可以将参数源用于选择查询。在这种情况下,由于没有要评估的“结果”对象,因此每次都使用单个参数源(而不是使用参数源工厂)。从 4.0 版本开始,您可以使用 Spring 创建基于 SpEL 的参数源,如以下示例所示:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="status" value="@statusBean.which()" />
</map>
</property>
</bean>
<bean id="statusBean" class="foo.StatusDetermination" />
每个value
参数表达式中的 可以是任何有效的 SpEL 表达式。#root
表达式求值的对象是定义在parameterSource
bean 上的构造函数参数。它对于所有评估都是静态的(在前面的示例中,一个空的String
)。
从版本 5.0 开始,您可以提供ExpressionEvaluatingSqlParameterSourceFactory
用于sqlParameterTypes
指定特定参数的目标 SQL 类型。
以下示例为查询中使用的参数提供 SQL 类型:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="sqlParameterTypes">
<map>
<entry key="status" value="#{ T(java.sql.Types).BINARY}" />
</map>
</property>
</bean>
使用createParameterSourceNoCache 工厂方法。否则,参数源缓存评估结果。另请注意,由于禁用了缓存,如果相同的参数名称多次出现在选择查询中,则每次出现都会重新计算。
|
轮询和交易
入站适配器接受常规 Spring Integration 轮询器作为子元素。因此,可以控制轮询的频率(以及其他用途)。用于 JDBC 的轮询器的一个重要特性是将轮询操作包装在事务中的选项,如以下示例所示:
<int-jdbc:inbound-channel-adapter query="..."
channel="target" data-source="dataSource" update="...">
<int:poller fixed-rate="1000">
<int:transactional/>
</int:poller>
</int-jdbc:inbound-channel-adapter>
如果您没有明确指定轮询器,则使用默认值。与 Spring Integration 一样,它可以定义为顶级 bean)。 |
在前面的示例中,数据库每 1000 毫秒(或每秒一次)轮询一次,并且更新和选择查询都在同一个事务中执行。事务管理器配置未显示。但是,只要它知道数据源,轮询就是事务性的。一个常见的用例是下游通道是直接通道(默认),以便在同一个线程中调用端点,因此在同一个事务中。这样,如果其中任何一个失败,事务就会回滚,输入数据将恢复到其原始状态。
max-rows
相对max-messages-per-poll
JDBC 入站通道适配器定义了一个名为max-rows
. 当您指定适配器的轮询器时,您还可以定义一个名为max-messages-per-poll
. 虽然这两个属性看起来很相似,但它们的含义却截然不同。
max-messages-per-poll
指定每个轮询间隔执行查询的次数,而max-rows
指定每次执行返回的行数。
在正常情况下,您可能不想在max-messages-per-poll
使用 JDBC 入站通道适配器时设置轮询器的属性。它的默认值是1
,这意味着 JDBC 入站通道适配器的receive()
方法在每个轮询间隔中只执行一次。
将max-messages-per-poll
属性设置为更大的值意味着查询会连续多次执行。有关该max-messages-per-poll
属性的更多信息,请参阅配置入站通道适配器。
相反,max-rows
如果该属性大于0
,则指定该receive()
方法创建的查询结果集中要使用的最大行数。如果该属性设置为0
,则所有行都包含在结果消息中。该属性默认为0
.
建议通过供应商特定的查询选项使用结果集限制,例如 MySQLLIMIT 或 SQL ServerTOP 或 Oracle 的ROWNUM . 有关详细信息,请参阅特定的供应商文档。
|
出站通道适配器
出站通道适配器与入站相反:它的作用是处理消息并使用它来执行 SQL 查询。默认情况下,消息负载和标头可用作查询的输入参数,如以下示例所示:
<int-jdbc:outbound-channel-adapter
query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])"
data-source="dataSource"
channel="input"/>
在前面的示例中,到达标记为通道的消息input
具有映射的有效负载,其键为something
,因此[]
操作员从映射中取消引用该值。标题也作为映射访问。
前面查询中的参数是传入消息的 bean 属性表达式(不是 SpEL 表达式)。此行为是 的一部分SqlParameterSource ,它是出站适配器创建的默认源。您可以注入不同SqlParameterSourceFactory 的以获得不同的行为。
|
出站适配器需要对 aDataSource
或 a的引用JdbcTemplate
。您还可以注入 aSqlParameterSourceFactory
来控制每个传入消息与查询的绑定。
如果输入通道是直接通道,则出站适配器在同一线程中运行其查询,因此,与消息的发送者在同一事务(如果有)中运行。
使用 SpEL 表达式传递参数
大多数 JDBC 通道适配器的一个常见要求是将参数作为 SQL 查询或存储过程或函数的一部分传递。如前所述,这些参数默认是 bean 属性表达式,而不是 SpEL 表达式。但是,如果您需要将 SpEL 表达式作为参数传递,则必须显式注入一个SqlParameterSourceFactory
.
以下示例使用 aExpressionEvaluatingSqlParameterSourceFactory
来实现该要求:
<jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) \
values (:id, :payload, :createdDate)"
sql-parameter-source-factory="spelSource"/>
<bean id="spelSource"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="id" value="headers['id'].toString()"/>
<entry key="createdDate" value="new java.util.Date()"/>
<entry key="payload" value="payload"/>
</map>
</property>
</bean>
有关详细信息,请参阅定义参数源。
使用PreparedStatement
回调
有时,灵活性和松耦合SqlParameterSourceFactory
并不能满足我们对目标的需求,PreparedStatement
或者我们需要做一些低级的 JDBC 工作。Spring JDBC 模块提供 API 来配置执行环境(例如ConnectionCallback
或PreparedStatementCreator
)和操作参数值(例如SqlParameterSource
)。它甚至可以访问用于低级操作的 API,例如StatementCallback
.
从 Spring Integration 4.2 开始,MessagePreparedStatementSetter
允许在上下文中PreparedStatement
手动指定参数。requestMessage
这个类的作用与标准 Spring JDBC API 中的作用完全相同PreparedStatementSetter
。实际上,PreparedStatementSetter
当JdbcMessageHandler
在.execute
JdbcTemplate
此功能接口选项sqlParameterSourceFactory
与PreparedStatement
. requestMessage
例如,当我们需要以流的方式将File
数据存储到 DataBaseBLOB
列时,它很有用。以下示例显示了如何执行此操作:
@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
"INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME));
try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) {
ps.setBlob(2, inputStream);
}
catch (Exception e) {
throw new MessageHandlingException(m, e);
}
ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
});
return jdbcMessageHandler;
}
从 XML 配置的角度来看,该prepared-statement-setter
属性在<int-jdbc:outbound-channel-adapter>
组件上可用。它允许您指定MessagePreparedStatementSetter
bean 引用。
批量更新
从版本 5.1 开始,如果请求消息的有效负载是实例,则JdbcMessageHandler
执行 a 。的每个元素都使用请求消息中的标头包装到 a中。在基于常规的配置的情况下,这些消息用于为上述函数中使用的参数构建一个。应用配置时,将使用一个变体来迭代每个项目的这些消息,并针对它们调用提供的消息。选择模式时不支持批量更新。JdbcOperations.batchUpdate()
Iterable
Iterable
Message
SqlParameterSourceFactory
SqlParameterSource[]
JdbcOperations.batchUpdate()
MessagePreparedStatementSetter
BatchPreparedStatementSetter
MessagePreparedStatementSetter
keysGenerated
出站网关
出站网关就像出站和入站适配器的组合:它的作用是处理消息并使用它来执行 SQL 查询,然后通过将结果发送到回复通道来响应结果。默认情况下,消息负载和标头可用作查询的输入参数,如以下示例所示:
<int-jdbc:outbound-gateway
update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])"
request-channel="input" reply-channel="output" data-source="dataSource" />
前面示例的结果是向表中插入一条记录,并向输出通道mythings
返回一条消息,指示受影响的行数(有效负载是一个映射:)。{UPDATED=1}
如果更新查询是带有自动生成键的插入,您可以通过添加keys-generated="true"
到前面的示例来使用生成的键填充回复消息(这不是默认设置,因为某些数据库平台不支持它)。以下示例显示了更改后的配置:
<int-jdbc:outbound-gateway
update="insert into mythings (status, name) values (0, :payload[thing])"
request-channel="input" reply-channel="output" data-source="dataSource"
keys-generated="true"/>
除了更新计数或生成的键之外,您还可以提供一个选择查询来执行并根据结果(例如入站适配器)生成回复消息,如以下示例所示:
<int-jdbc:outbound-gateway
update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
query="select * from foos where id=:headers[$id]"
request-channel="input" reply-channel="output" data-source="dataSource"/>
从 Spring Integration 2.2 开始,更新 SQL 查询不再是强制性的。您现在可以通过使用query
属性或query
元素仅提供选择查询。如果您需要使用通用网关或有效负载丰富器等主动检索数据,这将非常有用。然后从结果中生成回复消息(类似于入站适配器的工作方式)并传递给回复通道。以下示例显示使用该query
属性:
<int-jdbc:outbound-gateway
query="select * from foos where id=:headers[id]"
request-channel="input"
reply-channel="output"
data-source="dataSource"/>
默认情况下, |
与通道适配器一样,您还可以提供SqlParameterSourceFactory
请求和回复的实例。默认值与出站适配器相同,因此请求消息可用作表达式的根。如果keys-generated="true"
,则表达式的根是生成的键(如果只有一个则为映射,如果为多值则为映射列表)。
出站网关需要对 aDataSource
或 a的引用JdbcTemplate
。它还可以有一个SqlParameterSourceFactory
注入来控制传入消息与查询的绑定。
从 4.2 版开始,该request-prepared-statement-setter
属性可<int-jdbc:outbound-gateway>
用于request-sql-parameter-source-factory
. 它允许您指定一个MessagePreparedStatementSetter
bean 引用,它在执行之前实现更复杂的PreparedStatement
准备。
有关. _ _MessagePreparedStatementSetter
JDBC 消息存储
Spring Integration 提供了两个 JDBC 特定的消息存储实现。这JdbcMessageStore
适用于聚合器和声明检查模式。该JdbcChannelMessageStore
实现专门为消息通道提供了更有针对性和可扩展性的实现。
请注意,您可以使用 aJdbcMessageStore
来支持消息通道,JdbcChannelMessageStore
并为此目的进行了优化。
从版本 5.0.11、5.1.2 开始,JdbcChannelMessageStore 已经优化了 的索引。如果您在此类存储中有大型消息组,您可能希望更改索引。此外,索引 forPriorityChannel 已被注释掉,因为除非您使用由 JDBC 支持的此类通道,否则不需要它。
|
使用 时,必须OracleChannelMessageStoreQueryProvider 添加优先通道索引,因为它包含在查询的提示中。
|
初始化数据库
在开始使用 JDBC 消息存储组件之前,您应该为目标数据库提供适当的对象。
Spring Integration 附带了一些可用于初始化数据库的示例脚本。在spring-integration-jdbc
JAR 文件中,您可以在org.springframework.integration.jdbc
包中找到脚本。它为一系列常见数据库平台提供了示例创建和示例删除脚本。使用这些脚本的一种常见方法是在Spring JDBC 数据源初始化程序中引用它们。请注意,这些脚本是作为示例提供的,并且作为所需表和列名称的规范提供。您可能会发现需要增强它们以供生产使用(例如,通过添加索引声明)。
通用 JDBC 消息存储
JDBC 模块提供了由数据库支持的 Spring Integration MessageStore
(在声明检查模式中MessageGroupStore
很重要)和(在有状态模式中很重要,例如聚合器)的实现。这两个接口都由 实现JdbcMessageStore
,并且支持在 XML 中配置 store 实例,如下例所示:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
您可以指定 aJdbcTemplate
而不是 a DataSource
。
以下示例显示了一些其他可选属性:
<int-jdbc:message-store id="messageStore" data-source="dataSource"
lob-handler="lobHandler" table-prefix="MY_INT_"/>
在前面的示例中,我们指定了一个LobHandler
用于将消息作为大对象处理(这对于 Oracle 来说通常是必需的),并为存储生成的查询中的表名指定了一个前缀。表名前缀默认为INT_
.
支持消息通道
如果您打算使用 JDBC 支持消息通道,我们建议使用该JdbcChannelMessageStore
实现。它仅与消息通道一起使用。
支持的数据库
使用JdbcChannelMessageStore
特定于数据库的 SQL 查询从数据库中检索消息。因此,您必须ChannelMessageStoreQueryProvider
在JdbcChannelMessageStore
. 这channelMessageStoreQueryProvider
为您指定的特定数据库提供 SQL 查询。Spring Integration 提供对以下关系数据库的支持:
-
PostgreSQL
-
数据库
-
MySQL
-
甲骨文
-
德比
-
H2
-
SqlServer
-
赛贝斯
-
DB2
如果您的数据库未列出,您可以扩展AbstractChannelMessageStoreQueryProvider
该类并提供您自己的自定义查询。
4.0 版将该MESSAGE_SEQUENCE
列添加到表中,以确保即使消息存储在同一毫秒内,也可以进行先进先出 (FIFO) 排队。
自定义消息插入
从 5.0 版本开始,通过重载ChannelMessageStorePreparedStatementSetter
类,您可以在JdbcChannelMessageStore
. 您可以使用它来设置不同的列或更改表结构或序列化策略。例如byte[]
,您可以将其结构存储为 JSON 字符串,而不是默认序列化为 。
以下示例使用 的默认实现setValues
来存储公共列并覆盖将消息有效负载存储为的行为varchar
:
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常,我们不建议使用关系数据库进行排队。相反,如果可能,请考虑改用 JMS 或 AMQP 支持的通道。如需进一步参考,请参阅以下资源: |
并发轮询
轮询消息通道时,您可以选择配置Poller
与TaskExecutor
引用关联的消息。
但请记住,如果您使用 JDBC 支持的消息通道,并且您计划轮询通道并因此使用多个线程以事务方式轮询消息存储,则应确保使用支持多版本并发控制(MVCC) 的关系数据库。否则,锁定可能是一个问题,并且在使用多个线程时,性能可能无法按预期实现。例如,Apache Derby 在这方面存在问题。 为了获得更好的 JDBC 队列吞吐量并避免不同线程可能
|
优先频道
从 4.0 版开始,JdbcChannelMessageStore
实现PriorityCapableChannelMessageStore
并提供priorityEnabled
选项,让它用作实例的message-store
参考。priority-queue
为此,该INT_CHANNEL_MESSAGE
表有一个MESSAGE_PRIORITY
列来存储PRIORITY
消息头的值。此外,一个新MESSAGE_SEQUENCE
列使我们能够实现强大的先进先出 (FIFO) 轮询机制,即使在同一毫秒内以相同优先级存储多条消息也是如此。使用 .从数据库中轮询(选择)消息order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
。
我们不建议JdbcChannelMessageStore 对优先级和非优先级队列通道使用相同的 bean,因为该priorityEnabled 选项适用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。但是,同一个INT_CHANNEL_MESSAGE 表(甚至region )可以用于这两种JdbcChannelMessageStore 类型。要配置该场景,您可以从另一个扩展一个消息存储 bean,如以下示例所示:
|
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
对消息存储进行分区
通常将 aJdbcMessageStore
用作同一应用程序中的一组应用程序或节点的全局存储。为了提供一些防止名称冲突的保护并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。一种方法是使用单独的表名,通过更改前缀(如前所述)。另一种方法是为region
单个表中的数据分区指定一个名称。第二种方法的一个重要用例是当MessageStore
正在管理支持 Spring 集成消息通道的持久队列。持久通道的消息数据在通道名称的存储中键入。因此,如果通道名称不是全局唯一的,则通道可以拾取不适合它们的数据。为了避免这种危险,您可以使用消息存储region
为具有相同逻辑名称的不同物理通道保持数据分离。
存储过程
在某些情况下,纯 JDBC 支持是不够的。也许您处理遗留的关系数据库模式或者您有复杂的数据处理需求,但最终您必须使用存储过程或存储函数。从 Spring Integration 2.1 开始,我们提供了三个组件来执行存储过程或存储函数:
-
存储过程入站通道适配器
-
存储过程出站通道适配器
-
存储过程出站网关
支持的数据库
为了能够调用存储过程和存储函数,存储过程组件使用org.springframework.jdbc.core.simple.SimpleJdbcCall
该类。因此,完全支持以下数据库来执行存储过程:
-
阿帕奇德比
-
DB2
-
MySQL
-
微软 SQL 服务器
-
甲骨文
-
PostgreSQL
-
赛贝斯
如果要改为执行存储函数,则完全支持以下数据库:
-
MySQL
-
微软 SQL 服务器
-
甲骨文
-
PostgreSQL
即使您的特定数据库可能没有得到完全支持,但您仍然可以非常成功地使用存储过程 Spring Integration 组件,前提是您的 RDBMS 支持存储过程或存储函数。 事实上,一些提供的集成测试使用H2 数据库。尽管如此,彻底测试这些使用场景非常重要。 |
常用配置属性
所有存储过程组件共享某些配置参数:
-
auto-startup
:生命周期属性,指示是否应在应用程序上下文启动期间启动此组件。它默认为true
. 可选的。 -
data-source
: 引用 ajavax.sql.DataSource
,用于访问数据库。必需的。 -
id
: 标识底层 Spring bean 定义,它是一个EventDrivenConsumer
或的实例PollingConsumer
,取决于出站通道适配器的channel
属性是引用 aSubscribableChannel
还是 aPollableChannel
。可选的。 -
ignore-column-meta-data
:对于完全支持的数据库,底层SimpleJdbcCall
类可以自动从JDBC元数据中检索存储过程或存储函数的参数信息。但是,如果数据库不支持元数据查找或者您需要提供自定义参数定义,则可以将此标志设置为
true
。它默认为false
. 可选的。 -
is-function
: 如果true
,则调用 SQL 函数。在这种情况下,stored-procedure-name
orstored-procedure-name-expression
属性定义了被调用函数的名称。它默认为false
. 可选的。 -
stored-procedure-name
:此属性指定存储过程的名称。如果is-function
属性设置为true
,则此属性指定函数名称。stored-procedure-name-expression
必须指定此属性或。 -
stored-procedure-name-expression
: 该属性使用 SpEL 表达式指定存储过程的名称。通过使用 SpEL,您可以访问完整的消息(如果可用),包括其标头和有效负载。您可以使用此属性在运行时调用不同的存储过程。例如,您可以提供您希望作为消息头执行的存储过程名称。表达式必须解析为String
.如果该
is-function
属性设置为true
,则该属性指定一个存储函数。stored-procedure-name
必须指定此属性或。 -
jdbc-call-operations-cache-size
SimpleJdbcCallOperations
:定义缓存实例的最大数量。基本上,对于每个存储过程名称,SimpleJdbcCallOperations
都会创建一个新实例,作为回报,它会被缓存。Spring Integration 2.2 增加了 stored-procedure-name-expression
属性和jdbc-call-operations-cache-size
属性。默认缓存大小为
10
. 值0
禁用缓存。不允许使用负值。如果启用 JMX,有关的统计信息
jdbc-call-operations-cache
将作为 MBean 公开。有关详细信息,请参阅MBean 导出器。 -
sql-parameter-source-factory
: (不适用于存储过程入站通道适配器。)对SqlParameterSourceFactory
. 默认情况下,传入Message
有效负载的 bean 属性通过使用BeanPropertySqlParameterSourceFactory
.这对于基本用例可能就足够了。对于更复杂的选项,请考虑传入一个或多个
ProcedureParameter
值。请参阅定义参数源。可选的。 -
use-payload-as-parameter-source
: (不适用于存储过程入站通道适配器。)如果设置为true
,则 的有效负载Message
用作提供参数的源。但是,如果设置为false
,则整体Message
可用作参数的来源。如果没有传入过程参数,则此属性默认为
true
。这意味着,通过使用默认值BeanPropertySqlParameterSourceFactory
,有效负载的 bean 属性将用作存储过程或存储函数的参数值的来源。但是,如果传入过程参数,则此属性(默认情况下)的计算结果为
false
。ProcedureParameter
让我们提供 SpEL 表达式。因此,访问整个Message
. 该属性设置在基础StoredProcExecutor
. 可选的。
通用配置子元素
存储过程组件共享一组公共子元素,您可以使用它们来定义参数并将参数传递给存储过程或存储函数。可以使用以下元素:
-
parameter
-
returning-resultset
-
sql-parameter-definition
-
poller
-
parameter
:提供一种机制来提供存储过程参数。参数可以是静态的,也可以使用 SpEL 表达式提供。<int-jdbc:parameter name="" (1) type="" (2) value=""/> (3) <int-jdbc:parameter name="" expression=""/> (4)
+ <1> 要传递给存储过程或存储函数的参数的名称。必需的。<2> 该属性指定值的类型。如果未提供任何内容,则此属性默认为
java.lang.String
. 仅在使用该属性时才使用该value
属性。可选的。<3> 参数的值。您必须提供此属性或expression
属性。可选的。<4> 除了value
属性,您可以指定一个SpEL 表达式来传递参数的值。如果指定expression
,value
则不允许使用该属性。可选的。可选的。
-
returning-resultset
: 存储过程可能会返回多个结果集。通过设置一个或多个returning-resultset
元素,您可以指定RowMappers
将每个返回的对象转换ResultSet
为有意义的对象。可选的。<int-jdbc:returning-resultset name="" row-mapper="" />
-
sql-parameter-definition
:如果您使用完全受支持的数据库,您通常不必指定存储过程参数定义。相反,这些参数可以自动从 JDBC 元数据派生。但是,如果您使用不完全支持的数据库,则必须使用sql-parameter-definition
元素显式设置这些参数。ignore-column-meta-data
您还可以使用该属性选择关闭对通过 JDBC 获取的参数元数据信息的任何处理。<int-jdbc:sql-parameter-definition name="" (1) direction="IN" (2) type="STRING" (3) scale="5" (4) type-name="FOO_STRUCT" (5) return-type="fooSqlReturnType"/> (6)
1 指定 SQL 参数的名称。必需的。 2 指定 SQL 参数定义的方向。默认为 IN
. 有效值为:IN
、OUT
和INOUT
。如果您的过程正在返回结果集,请使用该returning-resultset
元素。可选的。3 用于此 SQL 参数定义的 SQL 类型。转换为由 定义的整数值 java.sql.Types
。或者,您也可以提供整数值。如果未明确设置此属性,则默认为“VARCHAR”。可选的。4 SQL 参数的小数位数。仅用于数字和十进制参数。可选的。 5 用于用户命名的类型, typeName
例如:STRUCT
、、、和命名数组类型。该属性与该属性互斥。可选的。DISTINCT
JAVA_OBJECT
scale
6 对复杂类型的自定义值处理程序的引用。的一个实现 SqlReturnType
。该属性与属性互斥,scale
仅适用于 OUT 和 INOUT 参数。可选的。 -
poller
:如果此端点是PollingConsumer
. 可选的。
定义参数源
参数源控制检索 Spring Integration 消息属性并将其映射到相关存储过程输入参数的技术。
存储过程组件遵循一定的规则。默认情况下,Message
有效负载的 bean 属性用作存储过程输入参数的来源。在这种情况下,BeanPropertySqlParameterSourceFactory
使用 a。这对于基本用例可能就足够了。下一个示例说明了该默认行为。
对于使用BeanPropertySqlParameterSourceFactory to 来“自动”查找 bean 属性,您的 bean 属性必须以小写形式定义。这是因为在org.springframework.jdbc.core.metadata.CallMetaDataContext (Java 方法是matchInParameterValuesWithCallParameters() )中,检索到的存储过程参数声明被转换为小写。因此,如果您有驼峰式 bean 属性(例如lastName ),则查找失败。在这种情况下,提供一个明确的ProcedureParameter .
|
假设我们有一个负载,它由一个简单的 bean 组成,它具有以下三个属性:id
、name
和description
。此外,我们有一个称为的简单存储过程INSERT_COFFEE
,它接受三个输入参数:id
、name
和description
。我们还使用完全支持的数据库。在这种情况下,存储过程出站适配器的以下配置就足够了:
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
channel="insertCoffeeProcedureRequestChannel"
stored-procedure-name="INSERT_COFFEE"/>
对于更复杂的选项,请考虑传入一个或多个ProcedureParameter
值。
如果您ProcedureParameter
明确提供值,默认情况下,anExpressionEvaluatingSqlParameterSourceFactory
用于参数处理,以启用 SpEL 表达式的全部功能。
如果您需要对如何检索参数进行更多控制,请考虑SqlParameterSourceFactory
使用sql-parameter-source-factory
属性传入自定义实现。
存储过程入站通道适配器
以下清单列出了对存储过程入站通道适配器很重要的属性:
<int-jdbc:stored-proc-inbound-channel-adapter
channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
is-function="false"
skip-undeclared-results="" (2)
return-value-required="false" (3)
<int:poller/>
<int-jdbc:sql-parameter-definition name="" direction="IN"
type="STRING"
scale=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>
1 | 轮询消息发送到的通道。如果存储过程或函数不返回任何数据,则其有效负载Message 为空。必需的。 |
2 | 如果此属性设置为,则绕过true 存储过程调用中没有相应声明的所有结果。SqlOutParameter 例如,存储过程可以返回一个更新计数值,即使您的存储过程只声明了一个结果参数。确切的行为取决于数据库实现。该值是在底层证券上设置的JdbcTemplate 。该值默认为true 。可选的。 |
3 | 指示是否应包含此过程的返回值。从 Spring 集成 3.0 开始。可选的。 |
存储过程出站通道适配器
以下清单列出了对存储过程出站通道适配器很重要的属性:
<int-jdbc:stored-proc-outbound-channel-adapter channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
order="" (2)
sql-parameter-source-factory=""
use-payload-as-parameter-source="">
<int:poller fixed-rate=""/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name=""/>
</int-jdbc:stored-proc-outbound-channel-adapter>
1 | 此端点的接收消息通道。必需的。 |
2 | 指定此端点作为订阅者连接到通道时的调用顺序。当该通道使用failover 调度策略时,这一点尤其重要。当此端点本身是具有队列的通道的轮询消费者时,它不起作用。可选的。 |
存储过程出站网关
以下清单列出了对存储过程出站通道适配器很重要的属性:
<int-jdbc:stored-proc-outbound-gateway request-channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
is-function="false"
order=""
reply-channel="" (2)
reply-timeout="" (3)
return-value-required="false" (4)
skip-undeclared-results="" (5)
sql-parameter-source-factory=""
use-payload-as-parameter-source="">
<int-jdbc:sql-parameter-definition name="" direction="IN"
type=""
scale="10"/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
1 | 此端点的接收消息通道。必需的。 |
2 | 收到数据库响应后应向其发送回复的消息通道。可选的。 |
3 | 让您指定此网关在引发异常之前等待成功发送回复消息的时间。请记住,当发送到 a 时DirectChannel ,调用发生在发送者的线程中。因此,发送操作的失败可能是由更下游的其他组件引起的。默认情况下,网关无限期等待。该值以毫秒为单位指定。可选的。 |
4 | 指示是否应包含此过程的返回值。可选的。 |
5 | 如果该skip-undeclared-results 属性设置为,则绕过true 存储过程调用中没有相应声明的所有结果。SqlOutParameter 例如,存储过程可能会返回一个更新计数值,即使您的存储过程只声明了一个结果参数。确切的行为取决于数据库。该值是在底层证券上设置的JdbcTemplate 。该值默认为true 。可选的。 |
例子
本节包含两个调用Apache Derby存储过程的示例。第一个过程调用一个存储过程,该过程返回一个ResultSet
. 通过使用 a RowMapper
,将数据转换为域对象,然后成为 Spring Integration 消息有效负载。
在第二个示例中,我们调用了一个使用输出参数返回数据的存储过程。
该项目包含此处引用的 Apache Derby 示例,以及有关如何运行它的说明。Spring Integration Samples 项目还提供了使用 Oracle 存储过程的示例。 |
在第一个示例中,我们调用了一个名为的存储过程FIND_ALL_COFFEE_BEVERAGES
,它没有定义任何输入参数,但返回一个ResultSet
.
在 Apache Derby 中,存储过程是用 Java 实现的。以下清单显示了方法签名:
public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
throws SQLException {
...
}
以下清单显示了相应的 SQL:
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';
在 Spring Integration 中,您现在可以使用例如 a 来调用此存储过程stored-proc-outbound-gateway
,如以下示例所示:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
data-source="dataSource"
request-channel="findAllProcedureRequestChannel"
expect-single-result="true"
stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
<int-jdbc:returning-resultset name="coffeeBeverages"
row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>
在第二个示例中,我们调用一个名为的存储过程FIND_COFFEE
,该过程具有一个输入参数。它不返回 a ResultSet
,而是使用输出参数。以下示例显示了方法签名:
public static void findCoffee(int coffeeId, String[] coffeeDescription)
throws SQLException {
...
}
以下清单显示了相应的 SQL:
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';
在 Spring Integration 中,您现在可以使用例如 a 来调用此存储过程stored-proc-outbound-gateway
,如以下示例所示:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
data-source="dataSource"
request-channel="findCoffeeProcedureRequestChannel"
skip-undeclared-results="true"
stored-procedure-name="FIND_COFFEE"
expect-single-result="true">
<int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>
JDBC 锁注册表
4.3 版引入了JdbcLockRegistry
. 某些组件(例如,聚合器和重排序器)使用从LockRegistry
实例获得的锁来确保一次只有一个线程操作一个组。在DefaultLockRegistry
单个组件中执行此功能。您现在可以在这些组件上配置外部锁定注册表。与 shared 一起使用时MessageGroupStore
,您可以使用JdbcLockRegistry
跨多个应用程序实例提供此功能,这样一次只有一个实例可以操作该组。
当一个本地线程释放一个锁时,另一个本地线程通常可以立即获得该锁。如果锁由使用不同注册表实例的线程释放,则获取锁最多可能需要 100 毫秒。
JdbcLockRegistry
是基于LockRepository
抽象的,它有一个实现DefaultLockRepository
。数据库模式脚本位于org.springframework.integration.jdbc
包中,该包针对特定的 RDBMS 供应商进行了划分。例如,以下清单显示了锁表的 H2 DDL:
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
INT_
可以根据目标数据库设计要求进行更改。因此,您必须在bean 定义上使用prefix
属性。DefaultLockRepository
有时,一个应用程序已经进入这样一种状态,它无法释放分布式锁并删除数据库中的特定记录。为此,其他应用程序可以在下一次锁定调用时使此类死锁过期。为此提供了( timeToLive
TTL) 选项。DefaultLockRepository
您可能还想指定CLIENT_ID
为给定DefaultLockRepository
实例存储的锁。如果是这样,您可以将id
要与 关联的指定DefaultLockRepository
为构造函数参数。
从版本 5.1.8 开始,JdbcLockRegistry
可以使用 -a 配置idleBetweenTries
在Duration
锁定记录插入/更新执行之间休眠。默认情况下是100
毫秒,在某些环境中,非领导者过于频繁地污染与数据源的连接。
从 5.4 版本开始,该RenewableLockRegistry
接口已被引入并添加到JdbcLockRegistry
. 该renewLock()
方法必须在锁定过程中调用,以防被锁定的过程会比锁的生存时间长。因此,生存时间可以大大减少,部署可以快速重新获得丢失的锁。
只有当前线程持有锁,才能进行锁更新。 |
版本为 5.5.6 的字符串,支持通过JdbcLockRegistry
自动清理 JdbcLock 的缓存。有关更多信息,请参阅其 JavaDocs。JdbcLockRegistry.locks
JdbcLockRegistry.setCacheCapacity()
JDBC 元数据存储
5.0 版引入了 JDBC MetadataStore
(参见Metadata Store)实现。您可以使用JdbcMetadataStore
来维护跨应用程序重新启动的元数据状态。此MetadataStore
实现可与以下适配器一起使用:
要将这些适配器配置为使用JdbcMetadataStore
,请使用 bean 名称声明 Spring bean metadataStore
。Feed 入站通道适配器和 Feed 入站通道适配器都自动拾取并使用声明的JdbcMetadataStore
,如以下示例所示:
@Bean
public MetadataStore metadataStore(DataSource dataSource) {
return new JdbcMetadataStore(dataSource);
}
该org.springframework.integration.jdbc
软件包具有多个 RDMBS 供应商的数据库模式脚本。例如,以下清单显示了元数据表的 H2 DDL:
CREATE TABLE INT_METADATA_STORE (
METADATA_KEY VARCHAR(255) NOT NULL,
METADATA_VALUE VARCHAR(4000),
REGION VARCHAR(100) NOT NULL,
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);
You can change the INT_
prefix to match the target database design requirements.
You can also configure JdbcMetadataStore
to use the custom prefix.
The JdbcMetadataStore
implements ConcurrentMetadataStore
, letting it be reliably shared across multiple application instances, where only one instance can store or modify a key’s value.
All of these operations are atomic, thanks to transaction guarantees.
Transaction management must use JdbcMetadataStore
.
Inbound channel adapters can be supplied with a reference to the TransactionManager
in the poller configuration.
Unlike non-transactional MetadataStore
implementations, with JdbcMetadataStore
, the entry appears in the target table only after the transaction commits.
When a rollback occurs, no entries are added to the INT_METADATA_STORE
table.
从版本 5.0.7 开始,您可以JdbcMetadataStore
使用 RDBMS 供应商特定lockHint
选项配置元数据存储条目上基于锁的查询。默认情况下,FOR UPDATE
如果目标数据库不支持行锁定功能,它可以配置为空字符串。SELECT
请咨询您的供应商,以了解表达式中更新前锁定行的特定和可能提示。