关键的 GemFire 和 Apache Geode 支持
Spring Integration 提供对 Pivotal GemFire 和 Apache Geode 的支持。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-gemfire</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-gemfire:5.5.13"
GemFire 是一个分布式数据管理平台,提供键值数据网格以及高级分布式系统功能,例如事件处理、连续查询和远程函数执行。本指南假定您熟悉商业Pivotal GemFire或开源Apache Geode。
Spring 集成通过实现入口和连续查询事件的入站适配器、将条目写入缓存的出站适配器以及消息和元数据存储和GemfireLockRegistry
实现来为 GemFire 提供支持。Spring 集成利用Spring Data for Pivotal GemFire项目,为其组件提供了一个瘦包装器。
从 5.1 版开始,Spring Integration GemFire 模块默认使用Spring Data for Apache Geode传递依赖。要为 Pivotal GemFire 切换到基于商业 Pivotal GemFire 的 Spring Data,请spring-data-geode
从依赖项中排除并添加spring-data-gemfire
,如以下 Maven 片段所示:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-gemfire</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-geode</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-gemfire</artifactId>
</dependency>
要配置“int-gfe”命名空间,请在 XML 配置文件的标头中包含以下元素:
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd"
入站通道适配器
当被 GemFire 触发时,入站通道适配器在通道上生成消息EntryEvent
。CREATED
每当条目是、UPDATED
、DESTROYED
或INVALIDATED
在相关区域中时, GemFire 都会生成事件。入站通道适配器允许您过滤这些事件的子集。例如,您可能只想生成消息以响应正在创建的条目。此外,入站通道适配器可以评估 SpEL 表达式,例如,如果您希望消息负载包含事件属性(例如新条目值)。以下示例显示如何使用 SpEL 语言(在expression
属性中)配置入站通道适配器:
<gfe:cache/>
<gfe:replicated-region id="region"/>
<int-gfe:inbound-channel-adapter id="inputChannel" region="region"
cache-events="CREATED" expression="newValue"/>
前面的配置创建了一个 GemFire Cache
,并Region
使用 Spring GemFire 的“gfe”命名空间。该inbound-channel-adapter
元素需要对适配器侦听事件的 GemFire 区域的引用。可选属性包括cache-events
,它可以包含在输入通道上为其生成消息的事件类型的逗号分隔列表。默认情况下,CREATED
和UPDATED
已启用。如果未channel
提供任何属性,则从该id
属性创建通道。此适配器还支持error-channel
. GemFireEntryEvent
是评估的#root
对象。expression
以下示例显示了替换键值的表达式:
expression="new something.MyEvent(key, oldValue, newValue)"
如果expression
未提供该属性,则消息负载是 GemFireEntryEvent
本身。
此适配器符合 Spring Integration 约定。 |
连续查询入站通道适配器
CqEvent
当被 GemFire 连续查询或事件触发时,连续查询入站通道适配器会在通道上生成消息。在 release1.1
中,Spring Data 引入了持续查询支持,包括ContinuousQueryListenerContainer
,它提供了对 GemFire 原生 API 的一个很好的抽象。这个适配器需要一个ContinuousQueryListenerContainer
实例的引用,为给定的创建一个监听器query
,并执行查询。连续查询充当事件源,只要其结果集更改状态就会触发。
GemFire 查询是用 OQL 编写的,并作用于整个缓存(不仅仅是一个区域)。此外,连续查询需要远程(即在单独的进程或远程主机中运行)缓存服务器。有关实现连续查询的更多信息, 请参阅GemFire 文档。 |
下面的配置创建了一个 GemFire 客户端缓存(回想一下,这个实现需要一个远程缓存服务器,并且它的地址被配置为池的子元素)、一个客户端区域和一个ContinuousQueryListenerContainer
使用 Spring Data 的:
<gfe:client-cache id="client-cache" pool-name="client-pool"/>
<gfe:pool id="client-pool" subscription-enabled="true" >
<!--configure server or locator here required to address the cache server -->
</gfe:pool>
<gfe:client-region id="test" cache-ref="client-cache" pool-name="client-pool"/>
<gfe:cq-listener-container id="queryListenerContainer" cache="client-cache"
pool-name="client-pool"/>
<int-gfe:cq-inbound-channel-adapter id="inputChannel"
cq-listener-container="queryListenerContainer"
query="select * from /test"/>
连续查询入站通道适配器需要一个cq-listener-container
属性,该属性必须包含对ContinuousQueryListenerContainer
. 可选地,它接受一个expression
属性,该属性使用 SpEL 根据需要转换CqEvent
或提取单个属性。cq-inbound-channel-adapter
提供了一个属性,该query-events
属性包含在输入通道上为其生成消息的事件类型的逗号分隔列表。可用的事件类型有CREATED
、UPDATED
、DESTROYED
、REGION_DESTROYED
和REGION_INVALIDATED
。默认情况下,CREATED
和UPDATED
已启用。其他可选属性包括query-name
(提供可选的查询名称)、expression
(如上一节所述)和durable
(一个布尔值,指示查询是否持久 - 默认情况下为 false)。如果您不提供channel
,则从该id
属性创建通道。此适配器还支持error-channel
.
此适配器符合 Spring Integration 约定。 |
出站通道适配器
出站通道适配器写入从消息有效负载映射的缓存条目。在最简单的形式中,它需要一个类型的有效负载java.util.Map
并将映射条目放入其配置的区域。以下示例显示了如何配置出站通道适配器:
<int-gfe:outbound-channel-adapter id="cacheChannel" region="region"/>
给定上述配置,如果有效负载不是Map
. 此外,您可以配置出站通道适配器以使用 SpEL 创建缓存条目映射。以下示例显示了如何执行此操作:
<int-gfe:outbound-channel-adapter id="cacheChannel" region="region">
<int-gfe:cache-entries>
<entry key="payload.toUpperCase()" value="payload.toLowerCase()"/>
<entry key="'thing1'" value="'thing2'"/>
</int-gfe:cache-entries>
</int-gfe:outbound-channel-adapter>
在前面的配置中,内部元素 ( cache-entries
) 在语义上等同于 Spring 'map' 元素。适配器将key
和value
属性解释为 SpEL 表达式,并将消息作为评估上下文。请注意,这可以包含任意缓存条目(不仅是从消息派生的那些),并且文字值必须用单引号引起来。在前面的示例中,如果发送到的消息cacheChannel
具有String
值为 的有效负载,则会在缓存区域中写入(创建或更新)Hello
两个条目 ( )。[HELLO:hello, thing1:thing2]
此适配器还支持该order
属性,如果它绑定到一个PublishSubscribeChannel
.
Gemfire 消息存储
如 EIP 中所述,消息存储可让您持久化消息。如果可靠性是一个问题,这在处理能够缓冲消息(QueueChannel
、Aggregator
、和其他)的组件时很有用。Resequencer
在 Spring Integration 中,策略接口还为声明检查MessageStore
模式提供了基础,这也在 EIP 中进行了描述。
Spring Integration 的 Gemfire 模块提供GemfireMessageStore
了MessageStore
策略(主要由QueueChannel
andClaimCheck
模式使用)和MessageGroupStore
策略(主要由Aggregator
andResequencer
模式使用)的实现。
以下示例使用spring-gemfire
命名空间(不要与命名空间混淆spring-integration-gemfire
)配置缓存和区域:
<bean id="gemfireMessageStore" class="o.s.i.gemfire.store.GemfireMessageStore">
<constructor-arg ref="myRegion"/>
</bean>
<gfe:cache/>
<gfe:replicated-region id="myRegion"/>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="gemfireMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="gemfireMessageStore"/>
通常,希望消息存储在客户端-服务器配置中的一个或多个远程缓存服务器中维护。在这种情况下,您应该配置客户端缓存、客户端区域和客户端池,并将该区域注入MessageStore
. 以下示例显示了如何执行此操作:
<bean id="gemfireMessageStore"
class="org.springframework.integration.gemfire.store.GemfireMessageStore">
<constructor-arg ref="myRegion"/>
</bean>
<gfe:client-cache/>
<gfe:client-region id="myRegion" shortcut="PROXY" pool-name="messageStorePool"/>
<gfe:pool id="messageStorePool">
<gfe:server host="localhost" port="40404" />
</gfe:pool>
请注意,该pool
元素配置有缓存服务器的地址(您可以在此处替换定位器)。该区域被配置为“代理”,因此没有数据存储在本地。该区域id
对应于缓存服务器中同名的区域。
从版本 4.3.12 开始,GemfireMessageStore
支持 keyprefix
选项以允许区分同一 GemFire 区域上的商店实例。
Gemfire 锁注册表
从 4.0 版开始,GemfireLockRegistry
可以使用。某些组件(例如,聚合器和重定序器)使用从LockRegistry
实例获得的锁来确保在任何给定时间只有一个线程在操作组。在DefaultLockRegistry
单个组件中执行此功能。您现在可以在这些组件上配置外部锁定注册表。当您使用 shared MessageGroupStore
with 时GemfireLockRegistry
,它可以跨多个应用程序实例提供此功能,这样一次只有一个实例可以操作该组。
其中一个GemfireLockRegistry 构造函数需要 aRegion 作为参数。它用于Lock 从getDistributedLock() 方法中获取 a。此操作GLOBAL 需要Region . 另一个构造函数需要 a Cache ,并且Region 使用GLOBAL 范围和名称创建 ,LockRegistry 。
|
Gemfire 元数据存储
4.0 版引入了一个新的基于 Gemfire MetadataStore
(元数据存储)的实现。您可以使用GemfireMetadataStore
来维护跨应用程序重新启动的元数据状态。这个新的MetadataStore
实现可以与适配器一起使用,例如:
为了让这些适配器使用新GemfireMetadataStore
的 . 声明一个 bean 名称为metadataStore
. 提要入站通道适配器自动拾取并使用声明的GemfireMetadataStore
.
GemfireMetadataStore 还实现ConcurrentMetadataStore 了 ,让它在多个应用程序实例之间可靠地共享,其中只有一个实例可以存储或修改键的值
。这些方法根据区域的范围和数据策略提供不同级别的并发保证。它们在对等缓存和客户端-服务器缓存中实现,但在具有NORMAL 或EMPTY 数据策略的对等区域中是不允许的。
|
从 5.0 版开始,GemfireMetadataStore 还实现了ListenableMetadataStore ,它允许您通过向 store 提供MetadataStoreListener 实例来监听缓存事件,如以下示例所示:
|
GemfireMetadataStore metadataStore = new GemfireMetadataStore(cache);
metadataStore.addListener(new MetadataStoreListenerAdapter() {
@Override
public void onAdd(String key, String value) {
...
}
});