MongoDb 支持
2.1 版引入了对MongoDB的支持:“高性能、开源、面向文档的数据库”。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:5.5.13"
要下载、安装和运行 MongoDB,请参阅MongoDB 文档。
连接到 MongoDb
阻塞还是反应?
从 5.3 版开始,Spring Integration 提供对响应式 MongoDB 驱动程序的支持,以在访问 MongoDB 时启用非阻塞 I/O。要启用响应式支持,请将 MongoDB 响应式流驱动程序添加到您的依赖项中:
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
对于常规同步客户端,您需要将其各自的驱动程序添加到依赖项中:
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
它们都optional
在框架中,以提供更好的最终用户选择支持。
要开始与 MongoDB 交互,您首先需要连接到它。Spring Integration 建立在另一个 Spring 项目Spring Data MongoDB提供的支持之上。MongoDatabaseFactory
它提供了名为and的工厂类ReactiveMongoDatabaseFactory
,简化了与 MongoDB 客户端 API 的集成。
Spring Data 默认提供阻塞 MongoDB 驱动程序,但您可以通过包含上述依赖项来选择响应式使用。 |
使用MongoDatabaseFactory
要连接到 MongoDB,您可以使用MongoDatabaseFactory
接口的实现。
下面的例子展示了如何使用SimpleMongoClientDatabaseFactory
:
MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
SimpleMongoClientDatabaseFactory
有两个参数:一个MongoClient
实例和一个String
指定数据库名称的。如果需要配置 、 等属性host
,port
可以使用底层MongoClients
类提供的构造函数之一来传递这些属性。有关如何配置 MongoDB 的更多信息,请参阅Spring-Data-MongoDB参考。
使用ReactiveMongoDatabaseFactory
要使用响应式驱动程序连接到 MongoDB,您可以使用ReactiveMongoDatabaseFactory
接口的实现。
下面的例子展示了如何使用SimpleReactiveMongoDatabaseFactory
:
ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
MongoDB 消息存储
如企业集成模式(EIP) 书中所述,消息存储可让您持久化消息。如果需要考虑可靠性,那么在处理能够缓冲消息(QueueChannel
、aggregator
、resequencer
等)的组件时,这样做会很有用。在 Spring Integration 中,该策略还为声明检查MessageStore
模式提供了基础,这也在 EIP 中进行了描述。
Spring Integration 的 MongoDB 模块提供MongoDbMessageStore
,它是MessageStore
策略(主要由声明检查模式使用)和MessageGroupStore
策略(主要由聚合器和重排序器模式使用)的实现。
以下示例将 a 配置MongoDbMessageStore
为使用 aQueueChannel
和 an aggregator
:
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
前面的示例是一个简单的 bean 配置,它需要 aMongoDbFactory
作为构造函数参数。
使用 Spring Data Mongo 映射机制将其MongoDbMessageStore
扩展Message
为具有所有嵌套属性的 Mongo 文档。当您需要访问payload
或headers
进行审计或分析时,它很有用——例如,针对存储的消息。
使用MongoDbMessageStore 自定义MappingMongoConverter 实现将Message 实例存储为 MongoDB 文档,payload 并且.
header Message |
从版本 5.1.6 开始,MongoDbMessageStore
可以使用自定义转换器进行配置,这些转换器会传播到内部MappingMongoConverter
实现中。有关更多信息,请参阅MongoDbMessageStore.setCustomConverters(Object… customConverters)
JavaDocs。
Spring Integration 3.0 引入了ConfigurableMongoDbMessageStore
. 它实现了MessageStore
和MessageGroupStore
接口。此类可以接收作为构造函数参数的 a MongoTemplate
,例如,您可以使用它配置一个 custom WriteConcern
。另一个构造函数需要 aMappingMongoConverter
和 a MongoDbFactory
,它允许您为Message
实例及其属性提供一些自定义转换。请注意,默认情况下,使用标准 Java 序列化向MongoDBConfigurableMongoDbMessageStore
写入和读取实例(请参阅 参考资料),并依赖. 它从提供的和构建一个。存储的集合的默认名称是Message
MongoDbMessageBytesConverter
MongoTemplate
MongoTemplate
MongoDbFactory
MappingMongoConverter
ConfigurableMongoDbMessageStore
configurableStoreMessages
. 当消息包含复杂的数据类型时,我们建议使用此实现来创建强大而灵活的解决方案。
MongoDB 通道消息存储
4.0 版引入了新的MongoDbChannelMessageStore
. 它针对MessageGroupStore
在QueueChannel
实例中的使用进行了优化。使用priorityEnabled = true
,您可以在<int:priority-queue>
实例中使用它来实现对持久消息的优先顺序轮询。优先级 MongoDB 文档字段由IntegrationMessageHeaderAccessor.PRIORITY
( priority
) 消息头填充。
此外,所有 MongoDBMessageStore
实例现在都有一个文档sequence
字段。MessageGroup
该sequence
值是对同一集合$inc
中的简单sequence
文档进行操作的结果,该集合是按需创建的。当消息在同一毫秒内存储时,该sequence
字段在poll
操作中用于提供先进先出 (FIFO) 消息顺序(在优先级内,如果已配置)。
我们不建议MongoDbChannelMessageStore 对优先级和非优先级使用相同的 bean,因为该priorityEnabled 选项适用于整个商店。但是,collection 这两种类型都可以使用相同的方法MongoDbChannelMessageStore ,因为来自存储的消息轮询已排序并使用索引。要配置该场景,您可以从另一个扩展一个消息存储 bean,如以下示例所示:
|
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
MongoDB 元数据存储
Spring Integration 4.2 引入了一个新的基于 MongoDB MetadataStore
(参见Metadata Store)的实现。您可以使用MongoDbMetadataStore
来维护跨应用程序重新启动的元数据状态。您可以将此新MetadataStore
实现与适配器一起使用,例如:
要指示这些适配器使用新的MongoDbMetadataStore
,请声明一个带有 bean 名称的 Spring bean metadataStore
。提要入站通道适配器自动拾取并使用声明的MongoDbMetadataStore
. 下面的例子展示了如何声明一个名称为 的 bean metadataStore
:
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
MongoDbMetadataStore
还实现ConcurrentMetadataStore
了 ,让它在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。由于 MongoDB 的保证,所有这些操作都是原子的。
MongoDB 入站通道适配器
MongoDB 入站通道适配器是一个轮询消费者,它从 MongoDB 读取数据并将其作为Message
有效负载发送。以下示例显示了如何配置 MongoDB 入站通道适配器:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
如前面的配置所示,您可以通过使用inbound-channel-adapter
元素并为各种属性提供值来配置 MongoDb 入站通道适配器,例如:
-
query
:一个 JSON 查询(参见MongoDB 查询) -
query-expression
: 一个 SpEL 表达式,它被评估为 JSON 查询字符串(如query
上面的属性)或o.s.data.mongodb.core.query.Query
.query
与属性互斥。 -
entity-class
:负载对象的类型。如果未提供,com.mongodb.DBObject
则返回 a。 -
collection-name
或collection-name-expression
:标识要使用的 MongoDB 集合的名称。 -
mongodb-factory
: 引用一个实例o.s.data.mongodb.MongoDbFactory
-
mongo-template
: 引用一个实例o.s.data.mongodb.core.MongoTemplate
-
所有其他入站适配器共有的其他属性(例如“通道”)。
您不能同时设置mongo-template 和mongodb-factory 。
|
前面的示例相对简单且静态,因为它有一个文字值,query
并且使用默认名称 a collection
。有时,您可能需要在运行时根据某些条件更改这些值。为此,请使用它们的-expression
等价物 (query-expression
和collection-name-expression
),其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,您可能希望对从 MongoDB 读取的成功处理的数据进行一些后处理。例如; 您可能希望在处理完文档后移动或删除它。您可以使用 Spring Integration 2.2 添加的事务同步功能来做到这一点,如以下示例所示:
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
以下示例显示了DocumentCleaner
前面示例中引用的内容:
public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?>){
List<?> documents = (List<?>) target;
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
您可以使用该transactional
元素将您的轮询器声明为事务性的。该元素可以引用一个真正的事务管理器(例如,如果您的流程的其他部分调用 JDBC)。如果您没有“真正的”事务,则可以使用 的实例o.s.i.transaction.PseudoTransactionManager
,它是 Spring 的一个实现,PlatformTransactionManager
并且在没有实际事务时可以使用 Mongo 适配器的事务同步特性。
这样做不会使 MongoDB 本身具有事务性。它允许在成功(提交)或失败(回滚)之前或之后执行操作的同步。 |
o.s.i.transaction.TransactionSynchronizationFactory
一旦您的轮询器是事务性的,您就可以在transactional
元素上设置一个实例。ATransactionSynchronizationFactory
创建 的实例TransactionSynchronization
。为了您的方便,我们公开了一个默认的基于 SpEL 的方法TransactionSynchronizationFactory
,允许您配置 SpEL 表达式,它们的执行与事务协调(同步)。支持 before-commit、after-commit 和 after-rollback 事件的表达式,以及用于发送评估结果(如果有)的每个事件的通道。对于每个子元素,您可以指定expression
和channel
属性。如果仅存在channel
属性,则将接收到的消息作为特定同步场景的一部分发送到那里。如果只有expression
如果存在属性并且表达式的结果是非空值,则生成带有结果作为有效负载的消息并将其发送到默认通道 ( NullChannel
) 并出现在日志中(在DEBUG
级别上)。如果您希望评估结果转到特定渠道,请添加一个channel
属性。如果表达式的结果为 null 或 void,则不会生成任何消息。
有关事务同步的更多信息,请参阅事务同步。
从版本 5.5 开始,MongoDbMessageSource
可以使用 配置updateExpression
,它必须String
使用 MongoDbupdate
语法评估为 a 或org.springframework.data.mongodb.core.query.Update
实例。它可以用作上述后处理过程的替代方法,它会修改从集合中获取的那些实体,因此它们不会在下一个轮询周期再次从集合中拉出(假设更新更改了一些使用的值在查询中)。MongoDbMessageSource
当集群中使用同一个集合的多个实例时,仍然建议使用事务来实现执行隔离和数据一致性。
MongoDB 更改流入站通道适配器
从 5.3 版开始,该spring-integration-mongodb
模块引入了Spring Data APIMongoDbChangeStreamMessageProducer
的响应式MessageProducerSupport
实现。ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)
默认情况下,此组件生成 a Flux
of 消息,其中 a body
ofChangeStreamEvent
作为有效负载和一些更改流相关的标头(请参阅 参考资料MongoHeaders
)。建议将此MongoDbChangeStreamMessageProducer
与FluxMessageChannel
作为outputChannel
按需订阅和下游事件消费的组合。
此通道适配器的 Java DSL 配置可能如下所示:
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlows.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
当MongoDbChangeStreamMessageProducer
停止,或下游取消订阅,或 MongoDb 更改流产生OperationType.INVALIDATE
时,Publisher
完成。可以再次启动通道适配器,并Publisher
创建一个新的源数据,并在MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>)
. 如果需要使用来自其他地方的更改流事件,则可以重新配置此通道适配器以在启动之间使用新选项。
在 Spring Data MongoDb文档中查看有关更改流支持的更多信息。
MongoDB 出站通道适配器
MongoDB 出站通道适配器允许您将消息负载写入 MongoDB 文档存储,如以下示例所示:
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
如前面的配置所示,您可以使用该outbound-channel-adapter
元素配置 MongoDB 出站通道适配器,为各种属性提供值,例如:
-
collection-name
orcollection-name-expression
:标识要使用的 MongoDb 集合的名称。 -
mongo-converter
o.s.data.mongodb.core.convert.MongoConverter
:对有助于将原始 Java 对象转换为 JSON 文档表示的实例的引用。 -
mongodb-factory
: 引用o.s.data.mongodb.MongoDbFactory
. -
mongo-template
: 引用o.s.data.mongodb.core.MongoTemplate
. 注意:您不能同时设置 mongo-template 和 mongodb-factory。 -
所有其他入站适配器共有的其他属性(例如“通道”)。
前面的示例相对简单且静态,因为它具有collection-name
. 有时,您可能需要在运行时根据某些条件更改此值。为此,请使用collection-name-expression
,其中提供的表达式是任何有效的 SpEL 表达式。
MongoDB 出站网关
5.0 版引入了 MongoDB 出站网关。它允许您通过向其请求通道发送消息来查询数据库。然后网关将响应发送到回复通道。您可以使用消息负载和标头来指定查询和集合名称,如以下示例所示:
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory;
@Autowired
private MongoConverter;
@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}
private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}
}
class MongoDbKotlinApplication {
fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
@Autowired
lateinit var mongoDbFactory: MongoDatabaseFactory;
@Autowired
lateinit var mongoConverter: MongoConverter;
@Bean
fun gatewaySingleQueryFlow() =
integrationFlow {
handle(queryOutboundGateway())
channel { queue("retrieveResults") }
}
private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction<Any> { m -> m.headers["collection"] as String }
.expectSingleResult(true)
.entityClass(Person::class.java)
}
}
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory mongoDbFactory;
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler mongoDbOutboundGateway() {
MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
gateway.setCollectionNameExpressionString("'myCollection'");
gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
gateway.setExpectSingleResult(true);
gateway.setEntityClass(Person.class);
gateway.setOutputChannelName("replyChannel");
return gateway;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
<int-mongodb:outbound-gateway id="gatewayQuery"
mongodb-factory="mongoDbFactory"
mongo-converter="mongoConverter"
query="{firstName: 'Bob'}"
collection-name="myCollection"
request-channel="in"
reply-channel="out"
entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
您可以将以下属性与 MongoDB 出站网关一起使用:
-
collection-name
或collection-name-expression
:标识要使用的 MongoDB 集合的名称。 -
mongo-converter
o.s.data.mongodb.core.convert.MongoConverter
:对有助于将原始 Java 对象转换为 JSON 文档表示的实例的引用。 -
mongodb-factory
: 引用o.s.data.mongodb.MongoDbFactory
. -
mongo-template
: 引用o.s.data.mongodb.core.MongoTemplate
. 注意:您不能同时设置mongo-template
和mongodb-factory
。 -
entity-class
:要传递给MongoTemplate中的find(..)
and方法的实体类的完全限定名称。findOne(..)
如果未提供此属性,则默认值为org.bson.Document
。 -
query
或query-expression
:指定 MongoDB 查询。有关更多查询示例,请参阅MongoDB 文档。 -
collection-callback
: 引用org.springframework.data.mongodb.core.CollectionCallback
. 最好是o.s.i.mongodb.outbound.MessageCollectionCallback
自 5.0.11 起带有请求消息上下文的实例。有关更多信息,请参阅其 Javadocs。注意:您不能同时拥有collection-callback
任何查询属性。
作为query
和属性的替代,您可以通过使用该属性作为对功能接口实现的引用来query-expression
指定其他数据库操作。以下示例指定计数操作:collectionCallback
MessageCollectionCallback
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}
MongoDB 反应式通道适配器
从 5.3 版开始,提供了ReactiveMongoDbStoringMessageHandler
和ReactiveMongoDbMessageSource
实现。它们基于ReactiveMongoOperations
Spring Data 并需要org.mongodb:mongodb-driver-reactivestreams
依赖项。
当集成流定义中涉及反应流组合时,它是框架中本机支持的ReactiveMongoDbStoringMessageHandler
实现。ReactiveMessageHandler
在ReactiveMessageHandler中查看更多信息。
从配置的角度来看,与许多其他标准通道适配器没有区别。例如,对于 Java DSL,这样的通道适配器可以像这样使用:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
在这个示例中,我们将通过提供连接到 MongoDb ReactiveMongoDatabaseFactory
,并将请求消息中的数据存储到具有data
名称的默认集合中。真正的操作将根据内部创建的反应流组合按需执行ReactiveStreamsConsumer
。
这ReactiveMongoDbMessageSource
是AbstractMessageSource
基于提供的ReactiveMongoDatabaseFactory
orReactiveMongoOperations
和 MongoDb 查询(或表达式)的实现,根据具有预期类型的选项调用find()
或findOne()
操作以转换查询结果。当(或根据选项)在生成的消息的有效负载中被订阅时,将按需执行查询执行和结果评估。框架可以在拆分器时自动(本质上)订阅这样的有效负载并在下游使用。否则,订阅下游端点中的轮询发布者是目标应用程序的责任。expectSingleResult
entityClass
Publisher
Flux
Mono
expectSingleResult
flatMap
FluxMessageChannel
使用 Java DSL,这样的通道适配器可以配置为:
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlows
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
从 5.5 版开始,ReactiveMongoDbMessageSource
可以使用updateExpression
. 它具有与阻塞相同的功能MongoDbMessageSource
。有关更多信息,请参阅MongoDB 入站通道适配器和AbstractMongoDbMessageSourceSpec
JavaDocs。