Redis 支持
Spring Integration 2.1 引入了对Redis的支持:“一个开源的高级键值存储”。这种支持以基于 RedisMessageStore
以及发布-订阅消息适配器的形式出现,Redis 通过其 、 和 命令支持这些PUBLISH
消息SUBSCRIBE
适配器UNSUBSCRIBE
。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:5.5.13"
您还需要包括 Redis 客户端依赖项,例如生菜。
要下载、安装和运行 Redis,请参阅Redis 文档。
连接到 Redis
要开始与 Redis 交互,您首先需要连接到它。Spring Integration 使用另一个 Spring 项目Spring Data Redis提供的支持,它提供了典型的 Spring 构造:ConnectionFactory
和Template
. 这些抽象简化了与多个 Redis 客户端 Java API 的集成。目前 Spring Data Redis 支持Jedis和Lettuce。
使用RedisConnectionFactory
RedisConnectionFactory
要连接到 Redis,您可以使用接口的实现之一。以下清单显示了接口定义:
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例显示了如何LettuceConnectionFactory
在 Java 中创建一个:
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
下面的例子展示了如何LettuceConnectionFactory
在 Spring 的 XML 配置中创建一个:
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
的实现RedisConnectionFactory
提供了一组属性,例如端口和主机,您可以根据需要进行设置。一旦有了 的实例RedisConnectionFactory
,就可以创建 的实例RedisTemplate
并将其注入RedisConnectionFactory
.
使用RedisTemplate
与 Spring 中的其他模板类(例如JdbcTemplate
和JmsTemplate
)一样,RedisTemplate
它是一个帮助类,用于简化 Redis 数据访问代码。有关RedisTemplate
及其变体(例如StringRedisTemplate
)的更多信息,请参阅Spring Data Redis 文档。
以下示例显示了如何RedisTemplate
在 Java 中创建 的实例:
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
下面的例子展示了如何RedisTemplate
在 Spring 的 XML 配置中创建一个实例:
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 进行消息传递
如介绍中所述PUBLISH
,Redis 通过其、SUBSCRIBE
和UNSUBSCRIBE
命令提供对发布-订阅消息传递的支持。与 JMS 和 AMQP 一样,Spring Integration 提供消息通道和适配器,用于通过 Redis 发送和接收消息。
Redis 发布/订阅通道
与 JMS 类似,在某些情况下,生产者和消费者都打算成为同一个应用程序的一部分,在同一个进程中运行。您可以通过使用一对入站和出站通道适配器来完成此操作。但是,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决这个用例。您可以创建一个发布-订阅通道,如以下示例所示:
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
A 的行为与主 Spring Integration 命名空间中publish-subscribe-channel
的普通元素非常相似。<publish-subscribe-channel/>
它可以被任何端点的input-channel
和属性引用。output-channel
不同之处在于此通道由 Redis 主题名称支持:String
由属性指定的值topic-name
。但是,与 JMS 不同的是,此主题不必提前创建,甚至不必由 Redis 自动创建。在 Redis 中,主题是String
扮演地址角色的简单值。生产者和消费者可以使用String
与其主题名称相同的值进行通信。对该通道的简单订阅意味着可以在生产和消费端点之间进行异步发布-订阅消息传递。但是,与通过添加一个<queue/>
在一个简单的 Spring Integration<channel/>
元素中,消息不存储在内存队列中。相反,这些消息通过 Redis 传递,这让您可以依赖它对持久性和集群的支持以及它与其他非 Java 平台的互操作性。
Redis 入站通道适配器
Redis 入站通道适配器 ( RedisInboundChannelAdapter
) 以与其他入站适配器相同的方式将传入的 Redis 消息适配为 Spring 消息。它接收特定于平台的消息(在本例中为 Redis)并使用MessageConverter
策略将它们转换为 Spring 消息。以下示例显示了如何配置 Redis 入站通道适配器:
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例显示了 Redis 入站通道适配器的简单但完整的配置。请注意,前面的配置依赖于熟悉的自动发现某些 bean 的 Spring 范例。在这种情况下,redisConnectionFactory
被隐式注入到适配器中。您可以改为使用connection-factory
属性显式指定它。
另外,请注意,前面的配置使用自定义的MessageConverter
. 该方法类似于 JMS,其中MessageConverter
使用实例在 Redis 消息和 Spring Integration 消息有效负载之间进行转换。默认值为SimpleMessageConverter
.
入站适配器可以订阅多个主题名称,因此topics
属性中的值以逗号分隔。
从 3.0 版开始,入站适配器除了现有topics
属性外,现在还具有该topic-patterns
属性。此属性包含一组以逗号分隔的 Redis 主题模式。有关 Redis 发布-订阅的更多信息,请参阅Redis Pub/Sub。
入站适配器可以使用 aRedisSerializer
来反序列化 Redis 消息的正文。的serializer
属性<int-redis:inbound-channel-adapter>
可以设置为空字符串,这会导致属性的null
值RedisSerializer
。在这种情况下,Redis 消息的原始byte[]
正文作为消息有效负载提供。
从5.0版开始,您可以Executor
使用. 此外,接收到的 Spring Integration 消息现在具有指示已发布消息来源的标头:主题或模式。您可以将此下游用于路由逻辑。task-executor
<int-redis:inbound-channel-adapter>
RedisHeaders.MESSAGE_SOURCE
Redis 出站通道适配器
Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息适配为 Redis 消息。它接收 Spring Integration 消息并使用MessageConverter
策略将它们转换为特定于平台的消息(在本例中为 Redis)。以下示例显示了如何配置 Redis 出站通道适配器:
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
该配置与 Redis 入站通道适配器平行。适配器被隐式注入 a RedisConnectionFactory
,它被定义redisConnectionFactory
为它的 bean 名称。这个例子还包括可选的(和自定义MessageConverter
的)(testConverter
bean)。
从 Spring Integration 3.0 开始,<int-redis:outbound-channel-adapter>
提供了topic
属性的替代方案:您可以使用topic-expression
属性在运行时确定消息的 Redis 主题。这些属性是互斥的。
Redis 队列入站通道适配器
Spring Integration 3.0 引入了一个队列入站通道适配器以从 Redis 列表中“弹出”消息。默认情况下,它使用“右弹出”,但您可以将其配置为使用“左弹出”。适配器是消息驱动的。它使用内部侦听器线程并且不使用轮询器。
以下清单显示了 的所有可用属性queue-inbound-channel-adapter
:
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
1 | 组件 bean 名称。如果您不提供该channel 属性,DirectChannel 则会在应用程序上下文中创建并注册一个并使用该id 属性作为 bean 名称。在这种情况下,端点本身使用 bean 名称id plus注册.adapter 。(如果 bean 名称是thing1 ,则端点注册为thing1.adapter 。) |
2 | 从此端点MessageChannel 发送实例的目标。Message |
3 | 一个SmartLifecycle 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。它默认为true . |
4 | SmartLifecycle 用于指定此端点开始的阶段的属性。它默认为0 . |
5 | RedisConnectionFactory 对bean的引用。它默认为redisConnectionFactory . |
6 | Redis 列表的名称,在该列表上执行基于队列的“pop”操作以获取 Redis 消息。 |
7 | 当从端点的侦听任务接收到异常时MessageChannel 将ErrorMessage 实例发送到的对象。默认情况下,底层MessagePublishingErrorHandler 使用errorChannel 来自应用程序上下文的默认值。 |
8 | RedisSerializer bean 参考。它可以是一个空字符串,这意味着“没有序列化程序”。byte[] 在这种情况下,来自入站 Redis 消息的原始数据将channel 作为Message 有效负载发送到。默认情况下它是一个JdkSerializationRedisSerializer . |
9 | 'pop' 操作等待队列中的 Redis 消息的超时时间(以毫秒为单位)。默认值为 1 秒。 |
10 | 在“pop”操作发生异常后,在重新启动侦听器任务之前,侦听器任务应休眠的时间(以毫秒为单位)。 |
11 | 指定此端点是否希望 Redis 队列中的数据包含整个Message 实例。如果此属性设置为true ,serializer 则 不能为空字符串,因为消息需要某种形式的反序列化(JDK 默认为序列化)。它的默认值为false . |
12 | 对 Spring TaskExecutor (或标准 JDK 1.5+ Executor )bean 的引用。它用于底层监听任务。它默认为SimpleAsyncTaskExecutor . |
13 | 指定此端点应使用“右弹出”(何时true )或“左弹出”(何时false )从 Redis 列表中读取消息。如果true ,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当FIFO 队列。将其设置为false 与通过“右推”写入列表的软件一起使用或实现类似堆栈的消息顺序。它的默认值为true . 从 4.3 版开始。 |
必须配置task-executor 多个线程进行处理;否则在RedisQueueMessageDrivenEndpoint 出错后尝试重新启动侦听器任务时可能会出现死锁。可errorChannel 用于处理这些错误,以避免重新启动,但最好不要将您的应用程序暴露于可能的死锁情况。有关可能的实现
,请参阅 Spring Framework参考手册。TaskExecutor |
Redis 队列出站通道适配器
Spring Integration 3.0 引入了一个队列出站通道适配器,用于从 Spring Integration 消息“推送”到 Redis 列表。默认情况下,它使用“左推”,但您可以将其配置为使用“右推”。以下清单显示了 Redis 的所有可用属性queue-outbound-channel-adapter
:
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
1 | 组件 bean 名称。如果您不提供该channel 属性,DirectChannel 则会在应用程序上下文中创建并注册一个并使用该id 属性作为 bean 名称。在这种情况下,端点注册了一个 bean 名称id plus .adapter 。(如果 bean 名称是thing1 ,则端点注册为thing1.adapter 。) |
2 | 此MessageChannel 端点从中接收Message 实例的 。 |
3 | RedisConnectionFactory 对bean的引用。它默认为redisConnectionFactory . |
4 | 执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表的名称。此属性与 互斥queue-expression 。 |
5 | Expression 用于确定 Redis 列表名称的SpEL 。Message 它使用运行时的传入作为#root 变量。此属性与 互斥queue 。 |
6 | RedisSerializer 豆类参考。它默认为JdkSerializationRedisSerializer . 但是,对于String 有效载荷,StringRedisSerializer 如果serializer 未提供参考,则使用 a。 |
7 | 指定此端点是否应仅将有效负载或全部Message 发送到 Redis 队列。它默认为true . |
8 | 指定此端点应使用“左推”(何时true )或“右推”(何时false )将消息写入 Redis 列表。如果true ,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当FIFO 队列。将其设置为false 与使用“left pop”从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。它默认为true . 从 4.3 版开始。 |
Redis 应用程序事件
从 Spring Integration 3.0 开始,Redis 模块提供IntegrationEvent
了org.springframework.context.ApplicationEvent
. 封装来自 Redis 操作的RedisExceptionEvent
异常(端点是事件的“源”)。例如,在从操作中<int-redis:queue-inbound-channel-adapter/>
捕获异常后会发出这些事件。BoundListOperations.rightPop
异常可以是任何泛型org.springframework.data.redis.RedisSystemException
或org.springframework.data.redis.RedisConnectionFailureException
. 使用 an 处理这些事件<int-event:inbound-channel-adapter/>
有助于确定后台 Redis 任务的问题并采取管理措施。
Redis 消息存储
如企业集成模式(EIP) 书中所述,消息存储可让您持久化消息。这在处理具有缓冲消息能力的组件(聚合器、重定序器等)时非常有用,并且可靠性受到关注。在 Spring Integration 中,该策略还为声明检查MessageStore
模式提供了基础,这也在 EIP 中进行了描述。
Spring Integration 的 Redis 模块提供了RedisMessageStore
. 以下示例显示了如何将其与聚合器一起使用:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的示例是一个 bean 配置,它需要 aRedisConnectionFactory
作为构造函数参数。
默认情况下,RedisMessageStore
使用 Java 序列化来序列化消息。但是,如果您想使用不同的序列化技术(例如 JSON),您可以通过设置valueSerializer
.RedisMessageStore
从版本 4.3.10 开始,框架为Message
实例和MessageHeaders
实例提供 Jackson 序列化器和反序列化器实现 - MessageJacksonDeserializer
和MessageHeadersJacksonSerializer
,分别。它们必须SimpleModule
使用ObjectMapper
. 此外,您应该设置enableDefaultTyping
为ObjectMapper
每个序列化的复杂对象添加类型信息(如果您信任源)。然后在反序列化期间使用该类型信息。该框架提供了一个名为 的实用程序方法JacksonJsonUtils.messagingAwareMapper()
,它已经提供了所有前面提到的属性和序列化程序。此实用程序方法带有trustedPackages
限制 Java 包进行反序列化以避免安全漏洞的参数。默认的受信任包:java.util
, java.lang
,org.springframework.messaging.support
, org.springframework.integration.support
, org.springframework.integration.message
, org.springframework.integration.store
. 要在 中管理 JSON 序列化RedisMessageStore
,您必须以类似于以下示例的方式对其进行配置:
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
从版本 4.3.12 开始,RedisMessageStore
支持prefix
允许区分同一 Redis 服务器上的存储实例的选项。
Redis 通道消息存储
前面RedisMessageStore
显示的将每个组维护为单个键(组 ID)下的值。虽然您可以使用它来支持QueueChannel
持久性,RedisChannelMessageStore
但为此目的提供了一个专门的(从版本 4.0 开始)。该存储在发送消息和接收消息时LIST
为每个通道使用一个。默认情况下,这个 store 也使用 JDK 序列化,但是你可以修改 value 序列化器,如前所述。LPUSH
RPOP
我们建议使用此商店后备渠道,而不是使用一般的RedisMessageStore
. 以下示例定义了一个 Redis 消息存储并在带有队列的通道中使用它:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用于存储数据的键具有以下形式:(<storeBeanName>:<channelId>
在前面的示例中,redisMessageStore:somePersistentQueueChannel
)。
此外,RedisChannelPriorityMessageStore
还提供了一个子类。当您将其与 a 一起使用时QueueChannel
,将按 (FIFO) 优先级顺序接收消息。它使用标准IntegrationMessageHeaderAccessor.PRIORITY
标头并支持优先级值 ( 0 - 9
)。具有其他优先级的消息(和没有优先级的消息)在任何具有优先级的消息之后按 FIFO 顺序检索。
这些商店只实施BasicMessageGroupStore 而不实施MessageGroupStore 。它们只能用于支持QueueChannel .
|
Redis 元数据存储
Spring Integration 3.0 引入了一个新的基于 Redis MetadataStore
(参见Metadata Store)的实现。您可以使用RedisMetadataStore
来维护MetadataStore
跨应用程序重新启动的状态。您可以将此新MetadataStore
实现与适配器一起使用,例如:
要指示这些适配器使用新的RedisMetadataStore
,请声明一个名为 的 Spring bean metadataStore
。Feed 入站通道适配器和 Feed 入站通道适配器都自动拾取并使用声明的RedisMetadataStore
. 下面的例子展示了如何声明这样一个 bean:
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
RedisMetadataStore
由RedisProperties
. _ 与它的交互使用BoundHashOperations
,这反过来又需要key
整个Properties
商店的 a。在 的情况下MetadataStore
,它key
扮演一个区域的角色,这在分布式环境中很有用,当多个应用程序使用同一个 Redis 服务器时。默认情况下,它key
的值为MetaData
。
从 4.0 版本开始,这个 store 实现ConcurrentMetadataStore
了,让它在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。
您不能在Redis 集群中使用RedisMetadataStore.replace() (例如,在 中),因为当前不支持原子性命令。
AbstractPersistentAcceptOnceFileListFilter WATCH |
Redis 存储入站通道适配器
Redis 存储入站通道适配器是一个轮询消费者,它从 Redis 集合中读取数据并将其作为Message
有效负载发送。以下示例显示了如何配置 Redis 存储入站通道适配器:
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的示例展示了如何使用store-inbound-channel-adapter
元素配置 Redis 存储入站通道适配器,为各种属性提供值,例如:
-
key
或key-expression
:正在使用的集合的键名。 -
collection-type
:此适配器支持的集合类型的枚举。支持的集合是LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
connection-factory
: 引用o.s.data.redis.connection.RedisConnectionFactory
. -
redis-template
: 引用o.s.data.redis.core.RedisTemplate
. -
所有其他入站适配器共有的其他属性(例如“通道”)。
您不能同时设置redis-template 和connection-factory 。
|
默认情况下,适配器使用
对键和散列键 |
因为它具有 的文字值key
,所以前面的示例相对简单且静态。有时,您可能需要在运行时根据某些条件更改键的值。为此,请key-expression
改用,其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,您可能希望对从 Redis 集合中读取的成功处理的数据执行一些后处理。例如,您可能希望在处理后移动或删除该值。您可以使用 Spring Integration 2.2 中添加的事务同步功能来做到这一点。以下示例使用key-expression
和事务同步:
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
您可以使用transactional
元素将您的轮询器声明为事务性的。该元素可以引用一个真正的事务管理器(例如,如果您的流程的其他部分调用 JDBC)。如果你没有“真正的”事务,你可以使用一个o.s.i.transaction.PseudoTransactionManager
Spring 的实现,它PlatformTransactionManager
可以在没有实际事务时使用 Redis 适配器的事务同步特性。
这不会使 Redis 活动本身具有事务性。它允许在成功(提交)或失败(回滚)之前或之后执行操作的同步。 |
o.s.i.transaction.TransactionSynchronizationFactory
一旦您的轮询器是事务性的,您就可以在transactional
元素
上设置一个实例。TransactionSynchronizationFactory
创建TransactionSynchronization
. 为了您的方便,我们公开了一个默认的基于 SpEL 的TransactionSynchronizationFactory
,它允许您配置 SpEL 表达式,它们的执行与事务协调(同步)。支持 before-commit、after-commit 和 after-rollback 的表达式,以及发送评估结果(如果有)的通道(每种事件一个)。对于每个子元素,您可以指定expression
和channel
属性。如果仅存在channel
属性,则将接收到的消息作为特定同步场景的一部分发送到那里。如果只有expression
如果存在属性并且表达式的结果是非空值,则生成带有结果作为有效负载的消息并将其发送到默认通道 ( NullChannel
) 并出现在日志中(在DEBUG
级别)。如果您希望评估结果转到特定渠道,请添加一个channel
属性。如果表达式的结果为 null 或 void,则不会生成任何消息。
有关事务同步的更多信息,请参阅事务同步。
RedisStore 出站通道适配器
RedisStore 出站通道适配器允许您将消息负载写入 Redis 集合,如以下示例所示:
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
上述配置一个 Redis 存储出站通道适配器通过使用该store-inbound-channel-adapter
元素。它为各种属性提供值,例如:
-
key
或key-expression
:正在使用的集合的键名。 -
extract-payload-elements
:如果设置为true
(默认值)并且有效负载是“多值”对象的实例(即 aCollection
或 aMap
),则使用“addAll”和“putAll”语义存储。否则,如果设置为false
,则无论其类型如何,有效负载都将存储为单个条目。如果负载不是“多值”对象的实例,则忽略此属性的值,并且负载始终存储为单个条目。 -
collection-type
Collection
:此适配器支持的类型的枚举。支持的集合是LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
map-key-expression
: SpEL 表达式,返回正在存储的条目的键名。仅当collection-type
isMAP
或PROPERTIES
'extract-payload-elements' 为假时才适用。 -
connection-factory
: 引用o.s.data.redis.connection.RedisConnectionFactory
. -
redis-template
: 引用o.s.data.redis.core.RedisTemplate
. -
所有其他入站适配器共有的其他属性(例如“通道”)。
您不能同时设置redis-template 和connection-factory 。
|
默认情况下,适配器使用StringRedisTemplate . 这使用StringRedisSerializer 键、值、散列键和散列值的实例。但是,如果extract-payload-elements 设置为false ,则将使用RedisTemplate 具有StringRedisSerializer 键和散列键JdkSerializationRedisSerializer 的实例以及值和散列值的实例 s 的 a。对于 JDK 序列化程序,重要的是要了解 Java 序列化用于所有值,无论该值是否实际上是一个集合。如果您需要对值的序列化进行更多控制,请考虑提供您自己的RedisTemplate 而不是依赖这些默认值。
|
因为它具有 the 和其他属性的字面值key
,所以前面的示例相对简单且静态。有时,您可能需要根据某些条件在运行时动态更改值。为此,请使用它们的-expression
等效项(key-expression
、map-key-expression
等),其中提供的表达式可以是任何有效的 SpEL 表达式。
Redis 出站命令网关
Spring Integration 4.0 引入了 Redis 命令网关,让您可以使用泛型RedisConnection#execute
方法执行任何标准的 Redis 命令。以下清单显示了 Redis 出站网关的可用属性:
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
1 | 此MessageChannel 端点从中接收Message 实例的 。 |
2 | MessageChannel 此端点发送回复Message 实例的位置。 |
3 | 指定此出站网关是否必须返回非空值。它默认为true . ReplyRequiredException 当 Redis 返回null 值时抛出A。 |
4 | 等待发送回复消息的超时时间(以毫秒为单位)。它通常应用于基于队列的有限回复通道。 |
5 | RedisConnectionFactory 对bean的引用。它默认为redisConnectionFactory . 它与 'redis-template' 属性互斥。 |
6 | RedisTemplate 对bean的引用。它与“connection-factory”属性互斥。 |
7 | 对 的实例的引用org.springframework.data.redis.serializer.RedisSerializer 。如有必要,它用于将每个命令参数序列化为 byte[]。 |
8 | 返回命令键的 SpEL 表达式。它默认为redis_command 消息头。它不能评估为null 。 |
9 | 作为命令参数计算的逗号分隔的 SpEL 表达式。arguments-strategy 与属性互斥。如果您不提供任何属性,则将payload 用作命令参数。参数表达式可以评估为“null”以支持可变数量的参数。 |
10 | 一个boolean 标志,用于指定评估的 Redis 命令字符串是否可用作配置时#cmd 表达式评估上下文中的变量。否则,此属性将被忽略。o.s.i.redis.outbound.ExpressionArgumentsStrategy argument-expressions |
11 | 引用 的实例o.s.i.redis.outbound.ArgumentsStrategy 。argument-expressions 与属性互斥。如果您不提供任何属性,则将payload 用作命令参数。 |
您可以将其<int-redis:outbound-gateway>
用作通用组件来执行任何所需的 Redis 操作。以下示例显示了如何从 Redis 原子序数中获取递增值:
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message
有效负载应该有一个名称,redisCounter
它可以由org.springframework.data.redis.support.atomic.RedisAtomicInteger
bean 定义提供。
该RedisConnection#execute
方法具有泛型Object
作为其返回类型。实际结果取决于命令类型。例如,MGET
返回一个List<byte[]>
. 有关命令、它们的参数和结果类型的更多信息,请参阅Redis 规范。
Redis 队列出站网关
Spring Integration 引入了 Redis 队列出站网关来执行请求和回复场景。它将对话推UUID
送到提供的queue
,将其作为键的值推UUID
送到 Redis 列表,并等待来自具有键的 Redis 列表的回复UUID' plus '.reply
。每次交互使用不同的 UUID。以下清单显示了 Redis 出站网关的可用属性:
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
1 | 此MessageChannel 端点从中接收Message 实例的 。 |
2 | MessageChannel 此端点发送回复Message 实例的位置。 |
3 | 指定此出站网关是否必须返回非空值。该值是false 默认值。否则,ReplyRequiredException 当 Redis 返回null 值时会抛出 a 。 |
4 | 等待发送回复消息的超时时间(以毫秒为单位)。它通常应用于基于队列的有限回复通道。 |
5 | RedisConnectionFactory 对bean的引用。它默认为redisConnectionFactory . 它与“redis-template”属性互斥。 |
6 | 出站网关向其发送对话的 Redis 列表的名称UUID 。 |
7 | 注册多个网关时此出站网关的顺序。 |
8 | RedisSerializer bean 参考。它可以是一个空字符串,这意味着“没有序列化程序”。byte[] 在这种情况下,来自入站 Redis 消息的原始数据将channel 作为Message 有效负载发送到。默认情况下,它是一个JdkSerializationRedisSerializer . |
9 | 指定此端点是否希望 Redis 队列中的数据包含整个Message 实例。如果此属性设置为true ,serializer 则 不能为空字符串,因为消息需要某种形式的反序列化(JDK 默认为序列化)。 |
Redis 队列入站网关
Spring Integration 4.1 引入了 Redis 队列入站网关来执行请求和回复场景。它UUID
从提供的会话中弹出一个对话,从 Redis 列表中弹出一个作为其键queue
的值,并使用键加号将回复推送到 Redis 列表。以下清单显示了 Redis 队列入站网关的可用属性:UUID
UUID
.reply
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
1 | MessageChannel 此端点发送Message 从 Redis 数据创建的实例的位置。 |
2 | MessageChannel 此端点等待回复Message 实例的位置。可选 -replyChannel 标头仍在使用中。 |
3 | 对 Spring TaskExecutor (或标准 JDK Executor )bean 的引用。它用于底层监听任务。它默认为SimpleAsyncTaskExecutor . |
4 | 等待发送回复消息的超时时间(以毫秒为单位)。它通常应用于基于队列的有限回复通道。 |
5 | RedisConnectionFactory 对bean的引用。它默认为redisConnectionFactory . 它与 'redis-template' 属性互斥。 |
6 | 对话的 Redis 列表的名称UUID 。 |
7 | 注册多个网关时此入站网关的顺序。 |
8 | RedisSerializer bean 参考。它可以是一个空字符串,这意味着“没有序列化程序”。byte[] 在这种情况下,来自入站 Redis 消息的原始数据将channel 作为Message 有效负载发送到。它默认为JdkSerializationRedisSerializer . (请注意,在 4.3 版之前的版本中,StringRedisSerializer 默认情况下它是 a 。要恢复该行为,请提供对 a 的引用StringRedisSerializer )。 |
9 | 等待获取接收消息的超时时间(以毫秒为单位)。它通常应用于基于队列的有限请求通道。 |
10 | 指定此端点是否希望 Redis 队列中的数据包含整个Message 实例。如果此属性设置为true ,serializer 则 不能为空字符串,因为消息需要某种形式的反序列化(JDK 默认为序列化)。 |
11 | 在“右弹出”操作发生异常后,在重新启动侦听器任务之前,侦听器任务应休眠的时间(以毫秒为单位)。 |
必须配置task-executor 多个线程进行处理;否则在RedisQueueMessageDrivenEndpoint 出错后尝试重新启动侦听器任务时可能会出现死锁。可errorChannel 用于处理这些错误,以避免重新启动,但最好不要将您的应用程序暴露于可能的死锁情况。有关可能的实现
,请参阅 Spring Framework参考手册。TaskExecutor |
Redis Stream 出站通道适配器
Spring Integration 5.4 引入了 Reactive Redis Stream 出站通道适配器,用于将消息有效负载写入 Redis 流。出站通道适配器用于ReactiveStreamOperations.add(…)
向流中添加一个Record
。以下示例展示了如何为 Redis Stream Outbound Channel Adapter 使用 Java 配置和 Service 类。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
1 | ReactiveRedisStreamMessageHandler 构造一个usingReactiveRedisConnectionFactory 和 stream name的实例来添加记录。另一个构造函数变体基于 SpEL 表达式来根据请求消息评估流密钥。 |
2 | 设置一个RedisSerializationContext 用于在添加到流之前序列化记录键和值。 |
3 | SetHashMapper 提供 Java 类型和 Redis 哈希/映射之间的契约。 |
4 | 如果为“真”,通道适配器将从请求消息中提取有效负载以添加流记录。或者使用整个消息作为一个值。它默认为true . |
Redis 流入站通道适配器
Spring Integration 5.4 引入了 Reactive Stream 入站通道适配器,用于从 Redis Stream 读取消息。入站通道适配器使用StreamReceiver.receive(…)
或StreamReceiver.receiveAutoAck()
基于自动确认标志从 Redis 流中读取记录。以下示例显示了如何为 Redis Stream Inbound Channel Adapter 使用 Java 配置。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
1 | ReactiveRedisStreamMessageProducer 构造一个使用ReactiveRedisConnectionFactory 和流键读取记录的实例。 |
2 | AStreamReceiver.StreamReceiverOptions 使用反应式基础设施来消费 redis 流。 |
3 | 一个SmartLifecycle 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。它默认为true . 如果false ,RedisStreamMessageProducer 应该手动启动messageProducer.start() 。 |
4 | 如果false ,则不会自动确认收到的消息。消息的确认将推迟到客户端消费消息。它默认为true . |
5 | 如果true ,将创建一个消费者组。在创建消费者组流期间,也会创建(如果尚不存在)。消费者组跟踪消息传递并区分消费者。它默认为false . |
6 | 设置消费者组名称。它默认为定义的 bean 名称。 |
7 | 设置消费者名称。my-consumer 从 group读取消息my-group 。 |
8 | 从此端点向其发送消息的消息通道。 |
9 | 定义要读取消息的偏移量。它默认为ReadOffset.latest() . |
10 | 如果为“真”,通道适配器将从Record . 否则,整体Record 被用作有效载荷。它默认为true . |
如果autoAck
设置为false
,则Record
Redis 驱动程序不会自动确认 Redis 流中的消息,而是将IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
标头添加到消息中以生成SimpleAcknowledgment
实例作为值。acknowledge()
每当基于这样的记录为消息完成业务逻辑时,调用其回调是目标集成流的职责。即使在反序列化过程中发生异常并errorChannel
配置时,也需要类似的逻辑。因此,目标错误处理程序必须决定是否确认此类失败消息。除了IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
,ReactiveRedisStreamMessageProducer
还将这些标头填充到消息中以生成:RedisHeaders.STREAM_KEY
、RedisHeaders.STREAM_MESSAGE_ID
和。RedisHeaders.CONSUMER_GROUP
RedisHeaders.CONSUMER
从 5.5 版本开始,您可以StreamReceiver.StreamReceiverOptionsBuilder
在 上显式配置选项ReactiveRedisStreamMessageProducer
,包括新引入的onErrorResume
功能,如果 Redis Stream 消费者在发生反序列化错误时应继续轮询,则需要该功能。如上所述,默认函数向错误通道(如果提供)发送一条消息,并可能确认失败消息。所有这些StreamReceiver.StreamReceiverOptionsBuilder
都与外部提供的相互排斥StreamReceiver.StreamReceiverOptions
。
Redis 锁注册表
Spring Integration 4.0 引入了RedisLockRegistry
. 某些组件(例如,聚合器和重排序器)使用从LockRegistry
实例获得的锁来确保一次只有一个线程操作一个组。在DefaultLockRegistry
单个组件中执行此功能。您现在可以在这些组件上配置外部锁定注册表。当您将它与 shared 一起使用时MessageGroupStore
,您可以使用RedisLockRegistry
来跨多个应用程序实例提供此功能,这样一次只有一个实例可以操作该组。
当一个本地线程释放一个锁时,另一个本地线程通常可以立即获得该锁。如果锁由使用不同注册表实例的线程释放,则获取锁最多可能需要 100 毫秒。
To avoid “hung” locks (when a server fails), the locks in this registry are expired after a default 60 seconds, but you can configure this value on the registry. Locks are normally held for a much smaller time.
Because the keys can expire, an attempt to unlock an expired lock results in an exception being thrown. However, the resources protected by such a lock may have been compromised, so such exceptions should be considered to be severe. You should set the expiry at a large enough value to prevent this condition, but set it low enough that the lock can be recovered after a server failure in a reasonable amount of time. |
Starting with version 5.0, the RedisLockRegistry
implements ExpirableLockRegistry
, which removes locks last acquired more than age
ago and that are not currently locked.
String with version 5.5.6, the RedisLockRegistry
is support automatically clean up cache for redisLocks in RedisLockRegistry.locks
via RedisLockRegistry.setCacheCapacity()
.
See its JavaDocs for more information.
String with version 5.5.13, the RedisLockRegistry
exposes a setRedisLockType(RedisLockType)
option to determine in which mode a Redis lock acquisition should happen:
-
RedisLockType.SPIN_LOCK
- the lock is acquired by periodic loop (100ms) checking whether the lock can be acquired. Default. -
RedisLockType.PUB_SUB_LOCK
- The lock is acquired by redis pub-sub subscription.
pub-sub 是首选模式 - 客户端 Redis 服务器之间的网络聊天更少,并且性能更高 - 当订阅通知在其他进程中解锁时立即获取锁。但是,Redis 不支持主/副本连接中的 pub-sub(例如在 AWS ElastiCache 环境中),因此默认选择忙自旋模式以使注册表在任何环境中都可以正常工作。