阿帕奇卡夫卡支持
概述
Spring Integration for Apache Kafka 基于Spring for Apache Kafka 项目。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-kafka:5.5.13"
它提供以下组件:
出站通道适配器
出站通道适配器用于将消息从 Spring 集成通道发布到 Apache Kafka 主题。通道在应用程序上下文中定义,然后连接到向 Apache Kafka 发送消息的应用程序。发送方应用程序可以使用 Spring Integration 消息发布到 Apache Kafka,这些消息在内部由出站通道适配器转换为 Kafka 记录,如下所示:
-
Spring Integration 消息的有效负载用于填充 Kafka 记录的有效负载。
-
默认情况下,
kafka_messageKey
Spring Integration 消息的标头用于填充 Kafka 记录的键。
您可以分别通过kafka_topic
和标头自定义发布消息的目标主题和分区。kafka_partitionId
此外,<int-kafka:outbound-channel-adapter>
它还提供了通过在出站消息上应用 SpEL 表达式来提取键、目标主题和目标分区的能力。为此,它支持三对互斥的属性:
-
topic
和topic-expression
-
message-key
和message-key-expression
-
partition-id
和partition-id-expression
这些允许您将topic
、message-key
和partition-id
分别指定为适配器上的静态值,或者在运行时根据请求消息动态评估它们的值。
KafkaHeaders 接口(由 提供spring-kafka )包含用于与标头交互的常量
。和默认标头现在需要一个messageKey 前缀。从使用旧标头的早期版本迁移时,您需要在. 或者,您可以使用 a或 a将上游标头更改为新标头。如果您使用常量值,您还可以使用和在适配器上配置它们。
topic kafka_ message-key-expression="headers['messageKey']" topic-expression="headers['topic']" <int-kafka:outbound-channel-adapter> KafkaHeaders <header-enricher> MessageBuilder topic message-key |
注意:如果适配器配置了主题或消息键(使用常量或表达式),则使用它们并忽略相应的标头。如果您希望标头覆盖配置,则需要在表达式中对其进行配置,例如:
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
适配器需要一个KafkaTemplate
,而后者又需要一个适当配置的KafkaProducerFactory
.
如果提供了send-failure-channel
( sendFailureChannel
) 并且接收到发送失败(同步或异步),ErrorMessage
则将发送到通道。有效负载是KafkaSendFailureException
带有failedMessage
、record
( ProducerRecord
) 和cause
属性的。您可以DefaultErrorMessageStrategy
通过设置error-message-strategy
属性来覆盖 。
如果提供了send-success-channel
( ),则在成功发送后发送sendSuccessChannel
带有有效负载类型的消息。org.apache.kafka.clients.producer.RecordMetadata
如果您的应用程序使用事务并且相同的通道适配器用于在事务由侦听器容器启动的地方发布消息,以及在没有现有事务的地方发布消息,则必须在 上配置 atransactionIdPrefix 以KafkaTemplate 覆盖容器使用的前缀或事务管理器。容器启动的事务(生产者工厂或事务管理器属性)使用的前缀在所有应用程序实例上必须相同。用于仅生产者事务的前缀在所有应用程序实例上必须是唯一的。
|
您可以配置flushExpression
必须解析为布尔值的 a。如果您使用Kafka 生产者属性,linger.ms
则在发送多条消息后刷新可能会很有用;batch.size
该表达式应Boolean.TRUE
在最后一条消息上计算为,并且将立即发送不完整的批次。默认情况下,表达式在标头 ( )中查找Boolean
值。如果值是,则刷新将发生,如果它是或不存在标头,则不会发生。KafkaIntegrationHeaders.FLUSH
kafka_flush
true
false
KafkaProducerMessageHandler.sendTimeoutExpression
默认值已从 10 秒更改为delivery.timeout.ms
Kafka 生产者属性+ 5000
,以便将超时后的实际 Kafka 错误传播到应用程序,而不是由该框架生成的超时。为了保持一致性,这已更改,因为您可能会遇到意外行为(Spring 可能会超时发送,而实际上它最终是成功的)。重要提示:默认情况下超时为 120 秒,因此您可能希望减少它以获得更及时的故障。
Java 配置
以下示例显示如何使用 Java 为 Apache Kafka 配置出站通道适配器:
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
handler.setSuccessChannel(successes());
handler.setFailureChannel(failures());
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
Java DSL 配置
以下示例显示如何使用 Spring Integration Java DSL 为 Apache Kafka 配置出站通道适配器:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}
@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.<String>split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')"),
e -> e.id("kafkaProducer1")))
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.timestamp(m -> 1487694048644L),
e -> e.id("kafkaProducer2")))
);
}
@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}
private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
ProducerFactory<Integer, String> producerFactory, String topic) {
return Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.headerMapper(mapper())
.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
}
XML 配置
以下示例显示如何使用 XML 配置 Kafka 出站通道适配器:
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="false"
channel="inputToKafka"
topic="foo"
sync="false"
message-key-expression="'bar'"
send-failure-channel="failures"
send-success-channel="successes"
error-message-strategy="ems"
partition-id-expression="2">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
... <!-- more producer properties -->
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
消息驱动的通道适配器
( KafkaMessageDrivenChannelAdapter
)<int-kafka:message-driven-channel-adapter>
使用spring-kafka
KafkaMessageListenerContainer
or ConcurrentListenerContainer
。
该mode
属性也可用。它可以接受record
or的值batch
(默认值:record
)。对于record
模式,每个消息有效负载都是从单个ConsumerRecord
. 对于模式,有效负载是从消费者轮询返回的batch
所有实例转换而来的对象列表。ConsumerRecord
与 batched 一样@KafkaListener
,KafkaHeaders.RECEIVED_MESSAGE_KEY
, KafkaHeaders.RECEIVED_PARTITION_ID
,KafkaHeaders.RECEIVED_TOPIC
和KafkaHeaders.OFFSET
headers 也是列表,其位置对应于有效负载中的位置。
收到的消息填充了某些标题。有关更多信息,请参阅KafkaHeaders
课程。
该Consumer 对象(在kafka_consumer 标头中)不是线程安全的。您必须仅在调用适配器内的侦听器的线程上调用其方法。如果您将消息传递给另一个线程,则不得调用其方法。
|
提供a 时retry-template
,将根据其重试策略重试传递失败。error-channel
在这种情况下不允许使用An 。recovery-callback
当重试用尽时,您可以使用来处理错误。在大多数情况下,这是一个ErrorMessageSendingRecoverer
将 发送ErrorMessage
到通道的。
在构建ErrorMessage
(用于error-channel
or )时,您可以通过设置属性recovery-callback
来自定义错误消息。error-message-strategy
默认情况下,RawRecordHeaderErrorMessageStrategy
使用 a 来提供对转换后的消息以及 raw 的访问ConsumerRecord
。
Java 配置
以下示例显示了如何使用 Java 配置消息驱动的通道适配器:
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
return kafkaMessageDrivenChannelAdapter;
}
@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
ContainerProperties properties = new ContainerProperties(this.topic);
// set more properties
return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaConsumerFactory<>(props);
}
Java DSL 配置
以下示例显示了如何使用 Spring Integration Java DSL 配置消息驱动的通道适配器:
@Bean
public IntegrationFlow topic1ListenerFromKafkaFlow() {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC1)
.configureListenerContainer(c ->
c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL)
.id("topic1ListenerContainer"))
.recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel(),
new RawRecordHeaderErrorMessageStrategy()))
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults1"))
.get();
}
您还可以使用用于@KafkaListener
注释的容器工厂来创建ConcurrentMessageListenerContainer
用于其他目的的实例。有关示例,请参阅Spring for Apache Kafka 文档。
使用 Java DSL,容器不必配置为@Bean
,因为 DSL 将容器注册为 bean。以下示例显示了如何执行此操作:
@Bean
public IntegrationFlow topic2ListenerFromKafkaFlow() {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory().createContainer(TEST_TOPIC2),
KafkaMessageDrivenChannelAdapter.ListenerMode.record)
.id("topic2Adapter"))
...
get();
}
请注意,在这种情况下,适配器被赋予一个id
( topic2Adapter
)。该容器在应用程序上下文中注册,名称为topic2Adapter.container
. 如果适配器没有id
属性,则容器的 bean 名称是容器的完全限定类名 plus #n
,其中n
每个容器递增。
XML 配置
以下示例显示了如何使用 XML 配置消息驱动的通道适配器:
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
mode="record"
retry-template="template"
recovery-callback="callback"
error-message-strategy="ems"
channel="someChannel"
error-channel="errorChannel" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="foo" />
</bean>
</constructor-arg>
</bean>
入站通道适配器
提供KafkaMessageSource
了一个可轮询的通道适配器实现。
Java 配置
@InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
@Bean
public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
KafkaMessageSource<String, String> source = new KafkaMessageSource<>(cf, "myTopic");
source.setGroupId("myGroupId");
source.setClientId("myClientId");
return source;
}
有关可用属性,请参阅 javadocs。
默认情况下,max.poll.records
必须在消费者工厂中显式设置,否则如果消费者工厂是DefaultKafkaConsumerFactory
. 您可以将属性设置allowMultiFetch
为true
覆盖此行为。
您必须在内部轮询消费者max.poll.interval.ms 以避免重新平衡。如果设置allowMultiFetch 为true 必须处理所有检索到的记录,并再次轮询,在max.poll.interval.ms .
|
此适配器发出的消息包含一个标头,其中包含kafka_remainingRecords
上一次轮询中剩余的记录数。
Java DSL 配置
@Bean
public IntegrationFlow flow(ConsumerFactory<String, String> cf) {
return IntegrationFlows.from(Kafka.inboundChannelAdapter(cf, "myTopic")
.groupId("myDslGroupId"), e -> e.poller(Pollers.fixedDelay(5000)))
.handle(System.out::println)
.get();
}
XML 配置
<int-kafka:inbound-channel-adapter
id="adapter1"
consumer-factory="consumerFactory"
ack-factory="ackFactory"
topics="topic1"
channel="inbound"
client-id="client"
group-id="group"
message-converter="converter"
payload-type="java.lang.String"
raw-header="true"
auto-startup="false"
rebalance-listener="rebal">
<int:poller fixed-delay="5000"/>
</int-kafka:inbound-channel-adapter>
出站网关
出站网关用于请求/回复操作。它与大多数 Spring Integration 网关的不同之处在于,发送线程不会在网关中阻塞,并且在回复侦听器容器线程上处理回复。如果您的代码调用同步消息网关后面的网关,则用户线程会阻塞在那里,直到收到回复(或发生超时)。
网关不接受请求,直到回复容器被分配了它的主题和分区。建议您在模板的回复容器属性中添加一个,并在向网关发送消息之前
ConsumerRebalanceListener 等待调用。onPartitionsAssigned |
KafkaProducerMessageHandler
sendTimeoutExpression
默认值为delivery.timeout.ms
Kafka 生产者属性+ 5000
,以便将超时后的实际 Kafka 错误传播到应用程序,而不是由该框架生成的超时。为了保持一致性,这已更改,因为您可能会遇到意外行为(Spring 可能会超时发送,而实际上它最终是成功的)。重要提示:默认情况下超时为 120 秒,因此您可能希望减少它以获得更及时的故障。
Java 配置
以下示例显示如何使用 Java 配置网关:
@Bean
@ServiceActivator(inputChannel = "kafkaRequests", outputChannel = "kafkaReplies")
public KafkaProducerMessageHandler<String, String> outGateway(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return new KafkaProducerMessageHandler<>(kafkaTemplate);
}
有关可用属性,请参阅 javadocs。
请注意,使用了与出站通道适配器相同的类,唯一的区别是KafkaTemplate
传递给构造函数的是ReplyingKafkaTemplate
. 有关更多信息,请参阅Spring for Apache Kafka 文档。
出站主题、分区、键等的确定方式与出站适配器相同。回复主题确定如下:
-
一个名为
KafkaHeaders.REPLY_TOPIC
(如果存在,它必须有一个String
或byte[]
值)的消息头根据模板的回复容器的订阅主题进行验证。 -
如果模板
replyContainer
只订阅了一个主题,则使用它。
您还可以指定一个KafkaHeaders.REPLY_PARTITION
标头来确定要用于回复的特定分区。同样,这是针对模板的回复容器的订阅进行验证的。
Java DSL 配置
以下示例显示如何使用 Java DSL 配置出站网关:
@Bean
public IntegrationFlow outboundGateFlow(
ReplyingKafkaTemplate<String, String, String> kafkaTemplate) {
return IntegrationFlows.from("kafkaRequests")
.handle(Kafka.outboundGateway(kafkaTemplate))
.channel("kafkaReplies")
.get();
}
或者,您也可以使用类似于以下 bean 的配置:
@Bean
public IntegrationFlow outboundGateFlow() {
return IntegrationFlows.from("kafkaRequests")
.handle(Kafka.outboundGateway(producerFactory(), replyContainer())
.configureKafkaTemplate(t -> t.replyTimeout(30_000)))
.channel("kafkaReplies")
.get();
}
XML 配置
<int-kafka:outbound-gateway
id="allProps"
error-message-strategy="ems"
kafka-template="template"
message-key-expression="'key'"
order="23"
partition-id-expression="2"
reply-channel="replies"
reply-timeout="43"
request-channel="requests"
requires-reply="false"
send-success-channel="successes"
send-failure-channel="failures"
send-timeout-expression="44"
sync="true"
timestamp-expression="T(System).currentTimeMillis()"
topic-expression="'topic'"/>
入站网关
入站网关用于请求/回复操作。
以下示例显示如何使用 Java 配置入站网关:
@Bean
public KafkaInboundGateway<Integer, String, String> inboundGateway(
AbstractMessageListenerContainer<Integer, String>container,
KafkaTemplate<Integer, String> replyTemplate) {
KafkaInboundGateway<Integer, String, String> gateway =
new KafkaInboundGateway<>(container, replyTemplate);
gateway.setRequestChannel(requests);
gateway.setReplyChannel(replies);
gateway.setReplyTimeout(30_000);
return gateway;
}
有关可用属性,请参阅 javadocs。
以下示例显示了如何使用 Java DSL 配置简单的大写转换器:
@Bean
public IntegrationFlow serverGateway(
ConcurrentMessageListenerContainer<Integer, String> container,
KafkaTemplate<Integer, String> replyTemplate) {
return IntegrationFlows
.from(Kafka.inboundGateway(container, replyTemplate)
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
或者,您可以使用类似于以下的代码来配置大写转换器:
@Bean
public IntegrationFlow serverGateway() {
return IntegrationFlows
.from(Kafka.inboundGateway(consumerFactory(), containerProperties(),
producerFactory())
.replyTimeout(30_000))
.<String, String>transform(String::toUpperCase)
.get();
}
您还可以使用用于@KafkaListener
注释的容器工厂来创建ConcurrentMessageListenerContainer
用于其他目的的实例。有关示例,请参阅Spring for Apache Kafka 文档和消息驱动的通道适配器。
XML 配置
<int-kafka:inbound-gateway
id="gateway1"
listener-container="container1"
kafka-template="template"
auto-startup="false"
phase="100"
request-timeout="5000"
request-channel="nullChannel"
reply-channel="errorChannel"
reply-timeout="43"
message-converter="messageConverter"
payload-type="java.lang.String"
error-message-strategy="ems"
retry-template="retryTemplate"
recovery-callback="recoveryCallback"/>
有关每个属性的描述,请参见 XML 模式。
Apache Kafka 主题支持的频道
Spring Integration 具有MessageChannel
由 Apache Kafka 主题支持的持久性实现。
每个通道都需要一个KafkaTemplate
用于发送方,一个侦听器容器工厂(用于可订阅通道)或一个KafkaMessageSource
用于可轮询通道。
Java DSL 配置
@Bean
public IntegrationFlow flowWithSubscribable(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlows.from(...)
...
.channel(Kafka.channel(template, containerFactory, "someTopic1").groupId("group1"))
...
.get();
}
@Bean
public IntegrationFlow flowWithPubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return IntegrationFlows.from(...)
...
.publishSubscribeChannel(pubSub(template, containerFactory),
pubsub -> pubsub
.subscribe(subflow -> ...)
.subscribe(subflow -> ...))
.get();
}
@Bean
public BroadcastCapableChannel pubSub(KafkaTemplate<Integer, String> template,
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory) {
return Kafka.publishSubscribeChannel(template, containerFactory, "someTopic2")
.groupId("group2")
.get();
}
@Bean
public IntegrationFlow flowWithPollable(KafkaTemplate<Integer, String> template,
KafkaMessageSource<Integer, String> source) {
return IntegrationFlows.from(...)
...
.channel(Kafka.pollableChannel(template, source, "someTopic3").groupId("group3"))
.handle(..., e -> e.poller(...))
...
.get();
}
Java 配置
/**
* Channel for a single subscriber.
**/
@Bean
SubscribableKafkaChannel pointToPoint(KafkaTemplate<String, String> template,
KafkaListenerContainerFactory<String, String> factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicA");
channel.setGroupId("group1");
return channel;
}
/**
* Channel for multiple subscribers.
**/
@Bean
SubscribableKafkaChannel pubsub(KafkaTemplate<String, String> template,
KafkaListenerContainerFactory<String, String> factory)
SubscribableKafkaChannel channel =
new SubscribableKafkaChannel(template, factory, "topicB", true);
channel.setGroupId("group2");
return channel;
}
/**
* Pollable channel (topic is configured on the source)
**/
@Bean
PollableKafkaChannel pollable(KafkaTemplate<String, String> template,
KafkaMessageSource<String, String> source)
PollableKafkaChannel channel =
new PollableKafkaChannel(template, source);
channel.setGroupId("group3");
return channel;
}
XML 配置
<int-kafka:channel kafka-template="template" id="ptp" topic="ptpTopic" group-id="ptpGroup"
container-factory="containerFactory" />
<int-kafka:pollable-channel kafka-template="template" id="pollable" message-source="source"
group-id = "pollableGroup"/>
<int-kafka:publish-subscribe-channel kafka-template="template" id="pubSub" topic="pubSubTopic"
group-id="pubSubGroup" container-factory="containerFactory" />
消息转换
提供了一个StringJsonMessageConverter
。有关更多信息,请参阅Spring for Apache Kafka 文档。
当将此转换器与消息驱动的通道适配器一起使用时,您可以指定要将传入有效负载转换为的类型。这是通过在适配器上设置payload-type
属性( property)来实现的。payloadType
以下示例显示了如何在 XML 配置中执行此操作:
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
message-converter="messageConverter"
payload-type="com.example.Foo"
error-channel="errorChannel" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
下面的例子展示了如何在 Java 配置中设置适配器的payload-type
属性( property):payloadType
@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
kafkaMessageDrivenChannelAdapter.setOutputChannel(received());
kafkaMessageDrivenChannelAdapter.setMessageConverter(converter());
kafkaMessageDrivenChannelAdapter.setPayloadType(Foo.class);
return kafkaMessageDrivenChannelAdapter;
}
空负载和日志压缩“墓碑”记录
Spring MessagingMessage<?>
对象不能有null
有效负载。当您使用 Apache Kafka 的端点时,null
有效负载(也称为墓碑记录)由 类型的有效负载表示KafkaNull
。有关更多信息,请参阅Spring for Apache Kafka 文档。
Spring Integration 端点的 POJO 方法可以使用 truenull
值而不是KafkaNull
. 为此,请用 标记参数@Payload(required = false)
。以下示例显示了如何执行此操作:
@ServiceActivator(inputChannel = "fromSomeKafkaInboundEndpoint")
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Payload(required = false) Customer customer) {
// customer is null if a tombstone record
...
}
从一个调用 Spring Integration 流KStream
您可以使用 aMessagingTransformer
从 a 调用集成流KStream
:
@Bean
public KStream<byte[], byte[]> kStream(StreamsBuilder kStreamBuilder,
MessagingTransformer<byte[], byte[], byte[]> transformer) transformer) {
KStream<byte[], byte[]> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
stream.mapValues((ValueMapper<byte[], byte[]>) String::toUpperCase)
...
.transform(() -> transformer)
.to(streamingTopic2);
stream.print(Printed.toSysOut());
return stream;
}
@Bean
@DependsOn("flow")
public MessagingTransformer<byte[], byte[], String> transformer(
MessagingFunction function) {
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(new SimpleKafkaHeaderMapper("*"));
return new MessagingTransformer<>(function, converter);
}
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(MessagingFunction.class)
...
.get();
}
当集成流以接口开始时,创建的代理具有流 bean 的名称,附加“.gateway”,因此@Qualifier
如果需要,可以使用此 bean 名称。
读/处理/写场景的性能注意事项
许多应用程序从一个主题消费,执行一些处理并写入另一个主题。在大多数情况下,如果写入失败,应用程序会希望抛出异常,以便可以重试传入请求和/或将其发送到死信主题。此功能由底层消息侦听器容器以及适当配置的错误处理程序支持。但是,为了支持这一点,我们需要阻塞监听线程,直到写操作成功(或失败),以便可以将任何异常抛出到容器中。使用单个记录时,这是通过sync
在出站适配器上设置属性来实现的。但是,在使用批次时,使用sync
导致性能显着下降,因为应用程序会在发送下一条消息之前等待每次发送的结果。您还可以执行多次发送,然后等待这些发送的结果。这是通过向futuresChannel
消息处理程序添加 a 来实现的。要启用该功能,请添加KafkaIntegrationHeaders.FUTURE_TOKEN
到出站消息;然后可以使用它来将 aFuture
与特定发送的消息相关联。以下是如何使用此功能的示例:
@SpringBootApplication
public class FuturesChannelApplication {
public static void main(String[] args) {
SpringApplication.run(FuturesChannelApplication.class, args);
}
@Bean
IntegrationFlow inbound(ConsumerFactory<String, String> consumerFactory, Handler handler) {
return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(consumerFactory,
ListenerMode.batch, "inTopic"))
.handle(handler)
.get();
}
@Bean
IntegrationFlow outbound(KafkaTemplate<String, String> kafkaTemplate) {
return IntegrationFlows.from(Gate.class)
.enrichHeaders(h -> h
.header(KafkaHeaders.TOPIC, "outTopic")
.headerExpression(KafkaIntegrationHeaders.FUTURE_TOKEN, "headers[id]"))
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.futuresChannel("futures"))
.get();
}
@Bean
PollableChannel futures() {
return new QueueChannel();
}
}
@Component
@DependsOn("outbound")
class Handler {
@Autowired
Gate gate;
@Autowired
PollableChannel futures;
public void handle(List<String> input) throws Exception {
System.out.println(input);
input.forEach(str -> this.gate.send(str.toUpperCase()));
for (int i = 0; i < input.size(); i++) {
Message<?> future = this.futures.receive(10000);
((Future<?>) future.getPayload()).get(10, TimeUnit.SECONDS);
}
}
}
interface Gate {
void send(String out);
}