关键的 GemFire 和 Apache Geode 支持

Spring Integration 提供对 Pivotal GemFire 和 Apache Geode 的支持。

您需要将此依赖项包含到您的项目中:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-gemfire</artifactId>
    <version>5.5.13</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-gemfire:5.5.13"

GemFire 是一个分布式数据管理平台,提供键值数据网格以及高级分布式系统功能,例如事件处理、连续查询和远程函数执行。本指南假定您熟悉商业Pivotal GemFire或开源Apache Geode

Spring 集成通过实现入口和连续查询事件的入站适配器、将条目写入缓存的出站适配器以及消息和元数据存储和GemfireLockRegistry实现来为 GemFire 提供支持。Spring 集成利用Spring Data for Pivotal GemFire项目,为其组件提供了一个瘦包装器。

从 5.1 版开始,Spring Integration GemFire 模块默认使用Spring Data for Apache Geode传递依赖。要为 Pivotal GemFire 切换到基于商业 Pivotal GemFire 的 Spring Data,请spring-data-geode从依赖项中排除并添加spring-data-gemfire,如以下 Maven 片段所示:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-gemfire</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-geode</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-gemfire</artifactId>
</dependency>

要配置“int-gfe”命名空间,请在 XML 配置文件的标头中包含以下元素:

xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
	https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd"

入站通道适配器

当被 GemFire 触发时,入站通道适配器在通道上生成消息EntryEventCREATED每当条目是、UPDATEDDESTROYEDINVALIDATED在相关区域中时, GemFire 都会生成事件。入站通道适配器允许您过滤这些事件的子集。例如,您可能只想生成消息以响应正在创建的条目。此外,入站通道适配器可以评估 SpEL 表达式,例如,如果您希望消息负载包含事件属性(例如新条目值)。以下示例显示如何使用 SpEL 语言(在expression属性中)配置入站通道适配器:

<gfe:cache/>
<gfe:replicated-region id="region"/>
<int-gfe:inbound-channel-adapter id="inputChannel" region="region"
    cache-events="CREATED" expression="newValue"/>

前面的配置创建了一个 GemFire Cache,并Region使用 Spring GemFire 的“gfe”命名空间。该inbound-channel-adapter元素需要对适配器侦听事件的 GemFire 区域的引用。可选属性包括cache-events,它可以包含在输入通道上为其生成消息的事件类型的逗号分隔列表。默认情况下,CREATEDUPDATED已启用。如果未channel提供任何属性,则从该id属性创建通道。此适配器还支持error-channel. GemFireEntryEvent是评估的#root对象。expression以下示例显示了替换键值的表达式:

expression="new something.MyEvent(key, oldValue, newValue)"

如果expression未提供该属性,则消息负载是 GemFireEntryEvent本身。

此适配器符合 Spring Integration 约定。

连续查询入站通道适配器

CqEvent当被 GemFire 连续查询或事件触发时,连续查询入站通道适配器会在通道上生成消息。在 release1.1中,Spring Data 引入了持续查询支持,包括ContinuousQueryListenerContainer,它提供了对 GemFire 原生 API 的一个很好的抽象。这个适配器需要一个ContinuousQueryListenerContainer实例的引用,为给定的创建一个监听器query,并执行查询。连续查询充当事件源,只要其结果集更改状态就会触发。

GemFire 查询是用 OQL 编写的,并作用于整个缓存(不仅仅是一个区域)。此外,连续查询需要远程(即在单独的进程或远程主机中运行)缓存服务器。有关实现连续查询的更多信息, 请参阅GemFire 文档。

下面的配置创建了一个 GemFire 客户端缓存(回想一下,这个实现需要一个远程缓存服务器,并且它的地址被配置为池的子元素)、一个客户端区域和一个ContinuousQueryListenerContainer使用 Spring Data 的:

<gfe:client-cache id="client-cache" pool-name="client-pool"/>

<gfe:pool id="client-pool" subscription-enabled="true" >
    <!--configure server or locator here required to address the cache server -->
</gfe:pool>

<gfe:client-region id="test" cache-ref="client-cache" pool-name="client-pool"/>

<gfe:cq-listener-container id="queryListenerContainer" cache="client-cache"
    pool-name="client-pool"/>

<int-gfe:cq-inbound-channel-adapter id="inputChannel"
    cq-listener-container="queryListenerContainer"
    query="select * from /test"/>

连续查询入站通道适配器需要一个cq-listener-container属性,该属性必须包含对ContinuousQueryListenerContainer. 可选地,它接受一个expression属性,该属性使用 SpEL 根据需要转换CqEvent或提取单个属性。cq-inbound-channel-adapter提供了一个属性,该query-events属性包含在输入通道上为其生成消息的事件类型的逗号分隔列表。可用的事件类型有CREATEDUPDATEDDESTROYEDREGION_DESTROYEDREGION_INVALIDATED。默认情况下,CREATEDUPDATED已启用。其他可选属性包括query-name(提供可选的查询名称)、expression(如上一节所述)和durable(一个布尔值,指示查询是否持久 - 默认情况下为 false)。如果您不提供channel,则从该id属性创建通道。此适配器还支持error-channel.

此适配器符合 Spring Integration 约定。

出站通道适配器

出站通道适配器写入从消息有效负载映射的缓存条目。在最简单的形式中,它需要一个类型的有效负载java.util.Map并将映射条目放入其配置的区域。以下示例显示了如何配置出站通道适配器:

<int-gfe:outbound-channel-adapter id="cacheChannel" region="region"/>

给定上述配置,如果有效负载不是Map. 此外,您可以配置出站通道适配器以使用 SpEL 创建缓存条目映射。以下示例显示了如何执行此操作:

<int-gfe:outbound-channel-adapter id="cacheChannel" region="region">
    <int-gfe:cache-entries>
        <entry key="payload.toUpperCase()" value="payload.toLowerCase()"/>
        <entry key="'thing1'" value="'thing2'"/>
    </int-gfe:cache-entries>
</int-gfe:outbound-channel-adapter>

在前面的配置中,内部元素 ( cache-entries) 在语义上等同于 Spring 'map' 元素。适配器将keyvalue属性解释为 SpEL 表达式,并将消息作为评估上下文。请注意,这可以包含任意缓存条目(不仅是从消息派生的那些),并且文字值必须用单引号引起来。在前面的示例中,如果发送到的消息cacheChannel具有String值为 的有效负载,则会在缓存区域中写入(创建或更新)Hello两个条目 ( )。[HELLO:hello, thing1:thing2]此适配器还支持该order属性,如果它绑定到一个PublishSubscribeChannel.

Gemfire 消息存储

如 EIP 中所述,消息存储可让您持久化消息。如果可靠性是一个问题,这在处理能够缓冲消息(QueueChannelAggregator、和其他)的组件时很有用。Resequencer在 Spring Integration 中,策略接口还为声明检查MessageStore模式提供了基础,这也在 EIP 中进行了描述。

Spring Integration 的 Gemfire 模块提供GemfireMessageStoreMessageStore策略(主要由QueueChannelandClaimCheck模式使用)和MessageGroupStore策略(主要由AggregatorandResequencer模式使用)的实现。

以下示例使用spring-gemfire命名空间(不要与命名空间混淆spring-integration-gemfire)配置缓存和区域:

<bean id="gemfireMessageStore" class="o.s.i.gemfire.store.GemfireMessageStore">
    <constructor-arg ref="myRegion"/>
</bean>

<gfe:cache/>

<gfe:replicated-region id="myRegion"/>


<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="gemfireMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
    message-store="gemfireMessageStore"/>

通常,希望消息存储在客户端-服务器配置中的一个或多个远程缓存服务器中维护。在这种情况下,您应该配置客户端缓存、客户端区域和客户端池,并将该区域注入MessageStore. 以下示例显示了如何执行此操作:

<bean id="gemfireMessageStore"
    class="org.springframework.integration.gemfire.store.GemfireMessageStore">
    <constructor-arg ref="myRegion"/>
</bean>

<gfe:client-cache/>

<gfe:client-region id="myRegion" shortcut="PROXY" pool-name="messageStorePool"/>

<gfe:pool id="messageStorePool">
    <gfe:server host="localhost" port="40404" />
</gfe:pool>

请注意,该pool元素配置有缓存服务器的地址(您可以在此处替换定位器)。该区域被配置为“代理”,因此没有数据存储在本地。该区域id对应于缓存服务器中同名的区域。

从版本 4.3.12 开始,GemfireMessageStore支持 keyprefix选项以允许区分同一 GemFire 区域上的商店实例。

Gemfire 锁注册表

从 4.0 版开始,GemfireLockRegistry可以使用。某些组件(例如,聚合器和重定序器)使用从LockRegistry实例获得的锁来确保在任何给定时间只有一个线程在操作组。在DefaultLockRegistry单个组件中执行此功能。您现在可以在这些组件上配置外部锁定注册表。当您使用 shared MessageGroupStorewith 时GemfireLockRegistry,它可以跨多个应用程序实例提供此功能,这样一次只有一个实例可以操作该组。

其中一个GemfireLockRegistry构造函数需要 aRegion作为参数。它用于LockgetDistributedLock()方法中获取 a。此操作GLOBAL需要Region. 另一个构造函数需要 a Cache,并且Region使用GLOBAL范围和名称创建 ,LockRegistry

Gemfire 元数据存储

4.0 版引入了一个新的基于 Gemfire MetadataStore元数据存储)的实现。您可以使用GemfireMetadataStore来维护跨应用程序重新启动的元数据状态。这个新的MetadataStore实现可以与适配器一起使用,例如:

为了让这些适配器使用新GemfireMetadataStore的 . 声明一个 bean 名称为metadataStore. 提要入站通道适配器自动拾取并使用声明的GemfireMetadataStore.

GemfireMetadataStore还实现ConcurrentMetadataStore了 ,让它在多个应用程序实例之间可靠地共享,其中只有一个实例可以存储或修改键的值 。这些方法根据区域的范围和数据策略提供不同级别的并发保证。它们在对等缓存和客户端-服务器缓存中实现,但在具有NORMALEMPTY数据策略的对等区域中是不允许的。
从 5.0 版开始,GemfireMetadataStore还实现了ListenableMetadataStore,它允许您通过向 store 提供MetadataStoreListener实例来监听缓存事件,如以下示例所示:
GemfireMetadataStore metadataStore = new GemfireMetadataStore(cache);
metadataStore.addListener(new MetadataStoreListenerAdapter() {

    @Override
    public void onAdd(String key, String value) {
         ...
    }

});

1. see XML Configuration