Spring Framework 为与消息传递系统的集成提供了广泛的支持,从使用 JMS API 的简化使用JmsTemplate
到异步接收消息的完整基础架构。Spring AMQP 为高级消息队列协议提供了类似的功能集。RabbitTemplate
Spring Boot 还为RabbitMQ提供了自动配置选项。Spring WebSocket 原生包括对 STOMP 消息传递的支持,Spring Boot 通过启动器和少量的自动配置来支持它。Spring Boot 还支持 Apache Kafka。
1. JMS
该javax.jms.ConnectionFactory
接口提供了创建javax.jms.Connection
用于与 JMS 代理交互的标准方法。尽管 Spring 需要ConnectionFactory
与 JMS 一起工作,但您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。(有关详细信息,请参阅 Spring Framework 参考文档的相关部分。)Spring Boot 还自动配置必要的基础设施来发送和接收消息。
1.1。ActiveMQ 支持
当ActiveMQ在类路径上可用时,Spring Boot 还可以配置一个ConnectionFactory
. 如果存在代理,则会自动启动和配置嵌入式代理(前提是未通过配置指定代理 URL,并且未在配置中禁用嵌入式代理)。
如果使用spring-boot-starter-activemq ,则提供连接或嵌入 ActiveMQ 实例所需的依赖项,以及与 JMS 集成的 Spring 基础架构。
|
ActiveMQ 配置由spring.activemq.*
.
默认情况下,ActiveMQ 自动配置为使用VM 传输,它启动嵌入在同一 JVM 实例中的代理。
您可以通过配置属性来禁用嵌入式代理,spring.activemq.in-memory
如下例所示:
spring.activemq.in-memory=false
spring:
activemq:
in-memory: false
如果您配置代理 URL,嵌入式代理也将被禁用,如以下示例所示:
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
activemq:
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
如果您想完全控制嵌入式代理,请参阅ActiveMQ 文档以获取更多信息。
默认情况下,a使用您可以通过外部配置属性控制的合理设置CachingConnectionFactory
包装本机:ConnectionFactory
spring.jms.*
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果您更愿意使用本机池,可以通过添加依赖项org.messaginghub:pooled-jms
并相应地配置来实现JmsPoolConnectionFactory
,如以下示例所示:
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
activemq:
pool:
enabled: true
max-connections: 50
有关ActiveMQProperties 更多受支持的选项,请参阅。您还可以注册实现ActiveMQConnectionFactoryCustomizer 更高级自定义的任意数量的 bean。
|
默认情况下,如果目标尚不存在,ActiveMQ 会创建一个目标,以便根据提供的名称解析目标。
1.2. ActiveMQ Artemis 支持
ConnectionFactory
当 Spring Boot检测到ActiveMQ Artemis在类路径上可用时,它可以自动配置。如果代理存在,嵌入式代理会自动启动和配置(除非已显式设置模式属性)。支持的模式是embedded
(明确需要嵌入式代理,并且如果代理在类路径上不可用,则应发生错误)和(使用传输协议native
连接到代理)。netty
配置后者时,Spring Boot 会配置一个ConnectionFactory
连接到本地机器上运行的代理的默认设置。
如果使用spring-boot-starter-artemis ,则提供连接到现有 ActiveMQ Artemis 实例的必要依赖项,以及与 JMS 集成的 Spring 基础架构。添加org.apache.activemq:artemis-jms-server 到您的应用程序可以让您使用嵌入式模式。
|
ActiveMQ Artemis 配置由spring.artemis.*
. 例如,您可以在 中声明以下部分application.properties
:
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
artemis:
mode: native
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
嵌入代理时,您可以选择是否要启用持久性并列出应该可用的目标。这些可以指定为逗号分隔的列表以使用默认选项创建它们,或者您可以分别为高级队列和主题配置定义类型为org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
或的bean。org.apache.activemq.artemis.jms.server.config.TopicConfiguration
默认情况下,a使用您可以通过外部配置属性控制的合理设置CachingConnectionFactory
包装本机:ConnectionFactory
spring.jms.*
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果您更愿意使用本机池,可以通过添加依赖项org.messaginghub:pooled-jms
并相应地配置来实现JmsPoolConnectionFactory
,如以下示例所示:
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
artemis:
pool:
enabled: true
max-connections: 50
有关ArtemisProperties
更多支持的选项,请参阅。
不涉及 JNDI 查找,并且使用name
Artemis 配置中的属性或通过配置提供的名称根据其名称解析目的地。
1.3. 使用 JNDI 连接工厂
如果您在应用程序服务器中运行应用程序,Spring Boot 会尝试ConnectionFactory
使用 JNDI 定位 JMS。默认情况下,检查java:/JmsXA
和java:/XAConnectionFactory
位置。spring.jms.jndi-name
如果需要指定替代位置,可以使用该属性,如以下示例所示:
spring.jms.jndi-name=java:/MyConnectionFactory
spring:
jms:
jndi-name: "java:/MyConnectionFactory"
1.4. 发送消息
SpringJmsTemplate
是自动配置的,您可以直接将其自动装配到您自己的 bean 中,如下例所示:
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
@Component
class MyBean(private val jmsTemplate: JmsTemplate) {
}
JmsMessagingTemplate 可以类似的方式注入。如果定义了 aDestinationResolver 或MessageConverter bean,它会自动关联到 auto-configured JmsTemplate 。
|
1.5。接收消息
当 JMS 基础设施存在时,可以对任何 bean 进行注释@JmsListener
以创建侦听器端点。如果没有JmsListenerContainerFactory
定义,则自动配置一个默认值。如果定义了 a DestinationResolver
、 aMessageConverter
或 a javax.jms.ExceptionListener
bean,它们将自动与默认工厂关联。
默认情况下,默认工厂是事务性的。如果您在存在 a 的基础架构中运行JtaTransactionManager
,则默认情况下它与侦听器容器相关联。如果不是,sessionTransacted
则启用该标志。在后一种情况下,您可以通过添加@Transactional
侦听器方法(或其委托)将本地数据存储事务与传入消息的处理相关联。这可确保在本地事务完成后确认传入消息。这还包括发送已在同一 JMS 会话上执行的响应消息。
以下组件在someQueue
目标上创建侦听器端点:
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue")
fun processMessage(content: String?) {
// ...
}
}
有关更多详细信息,请参阅的 Javadoc@EnableJms 。
|
如果你需要创建更多JmsListenerContainerFactory
实例或者你想覆盖默认值,Spring Boot 提供了一个DefaultJmsListenerContainerFactoryConfigurer
你可以使用DefaultJmsListenerContainerFactory
与自动配置相同的设置来初始化 a 的实例。
例如,以下示例公开了另一个使用特定的工厂MessageConverter
:
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {
@Bean
fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后您可以在任何带注释的方法中使用工厂,@JmsListener
如下所示:
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
2. AMQP
高级消息队列协议 (AMQP) 是面向消息中间件的平台中立、线路级协议。Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括spring-boot-starter-amqp
“Starter”。
2.1。RabbitMQ 支持
RabbitMQ是基于 AMQP 协议的轻量级、可靠、可扩展、可移植的消息代理。Spring 使用RabbitMQ
通过 AMQP 协议进行通信。
RabbitMQ 配置由spring.rabbitmq.*
. 例如,您可以在 中声明以下部分application.properties
:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,您可以使用以下addresses
属性配置相同的连接:
spring.rabbitmq.addresses=amqp://admin:[email protected]
spring:
rabbitmq:
addresses: "amqp://admin:[email protected]"
以这种方式指定地址时,host 和port 属性将被忽略。如果地址使用amqps 协议,则会自动启用 SSL 支持。
|
有关RabbitProperties
受支持的基于属性的配置选项的更多信息,请参阅。要配置 Spring AMQP 使用的 RabbitMQ 的较低级别的详细信息ConnectionFactory
,请定义一个ConnectionFactoryCustomizer
bean。
如果ConnectionNameStrategy
上下文中存在 bean,它将自动用于命名由 auto-configured 创建的连接CachingConnectionFactory
。
有关更多详细信息,请参阅了解 RabbitMQ 使用的协议 AMQP。 |
2.2. 发送消息
Spring 的AmqpTemplate
和AmqpAdmin
是自动配置的,您可以将它们直接自动装配到您自己的 bean 中,如以下示例所示:
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
}
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
}
RabbitMessagingTemplate 可以类似的方式注入。如果MessageConverter 定义了 bean,它会自动关联到 auto-configured AmqpTemplate 。
|
如有必要,任何org.springframework.amqp.core.Queue
定义为 bean 的内容都会自动用于在 RabbitMQ 实例上声明相应的队列。
要重试操作,您可以启用重试AmqpTemplate
(例如,在代理连接丢失的情况下):
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
默认情况下禁用重试。您还可以RetryTemplate
通过声明RabbitRetryTemplateCustomizer
bean 以编程方式自定义。
如果您需要创建更多RabbitTemplate
实例或想要覆盖默认值,Spring Boot 提供了一个bean,您可以使用该 bean 来使用与自动配置使用的工厂相同的设置RabbitTemplateConfigurer
来初始化 a 。RabbitTemplate
2.3. 向流发送消息
要将消息发送到特定流,请指定流的名称,如以下示例所示:
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果定义了MessageConverter
、StreamMessageConverter
或ProducerCustomizer
bean,它会自动关联到 auto-configured RabbitStreamTemplate
。
如果您需要创建更多RabbitStreamTemplate
实例或想要覆盖默认值,Spring Boot 提供了一个bean,您可以使用该 bean 来使用与自动配置使用的工厂相同的设置RabbitStreamTemplateConfigurer
来初始化 a 。RabbitStreamTemplate
2.4. 接收消息
当 Rabbit 基础设施存在时,可以对任何 bean 进行注释@RabbitListener
以创建侦听器端点。如果没有RabbitListenerContainerFactory
定义,则自动配置默认值,您可以使用该属性SimpleRabbitListenerContainerFactory
切换到直接容器。spring.rabbitmq.listener.type
如果定义了 aMessageConverter
或MessageRecoverer
bean,它会自动与默认工厂关联。
以下示例组件在someQueue
队列上创建一个侦听器端点:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
有关更多详细信息,请参阅的 Javadoc@EnableRabbit 。
|
如果您需要创建更多RabbitListenerContainerFactory
实例或想要覆盖默认值,Spring Boot 提供了 aSimpleRabbitListenerContainerFactoryConfigurer
和 a DirectRabbitListenerContainerFactoryConfigurer
,您可以使用它们来初始化 aSimpleRabbitListenerContainerFactory
和 a DirectRabbitListenerContainerFactory
,其设置与自动配置使用的工厂相同。
您选择哪种容器类型并不重要。这两个 bean 由自动配置公开。 |
例如,以下配置类公开了另一个使用特定的工厂MessageConverter
:
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后您可以在任何带注释的方法中使用工厂@RabbitListener
,如下所示:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
您可以启用重试以处理侦听器引发异常的情况。默认情况下,RejectAndDontRequeueRecoverer
使用,但您可以定义MessageRecoverer
自己的。当重试用尽时,如果代理配置为这样做,则消息将被拒绝并丢弃或路由到死信交换。默认情况下,重试被禁用。您还可以RetryTemplate
通过声明RabbitRetryTemplateCustomizer
bean 以编程方式自定义。
默认情况下,如果重试被禁用并且侦听器抛出异常,则会无限期地重试传递。您可以通过两种方式修改此行为:将defaultRequeueRejected 属性设置为,false 以便尝试进行零次重新传递,或者抛出一个AmqpRejectAndDontRequeueException 信号来表示消息应该被拒绝。后者是启用重试并达到最大传递尝试次数时使用的机制。
|
3. Apache Kafka 支持
通过提供项目的自动配置来支持Apache Kafkaspring-kafka
。
Kafka 配置由spring.kafka.*
. 例如,您可以在 中声明以下部分application.properties
:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
要在启动时创建主题,请添加一个 bean 类型NewTopic 。如果主题已经存在,则忽略该 bean。
|
有关KafkaProperties
更多支持的选项,请参阅。
3.1。发送消息
SpringKafkaTemplate
是自动配置的,您可以直接在自己的 bean 中自动装配它,如下例所示:
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
}
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
}
如果spring.kafka.producer.transaction-id-prefix 定义了属性,KafkaTransactionManager 则会自动配置 a。此外,如果RecordMessageConverter 定义了 bean,它会自动关联到 auto-configured KafkaTemplate 。
|
3.2. 接收消息
当存在 Apache Kafka 基础架构时,可以使用任何 bean 进行注释@KafkaListener
以创建侦听器端点。如果没有KafkaListenerContainerFactory
定义,则自动配置一个默认值,其中定义了spring.kafka.listener.*
.
以下组件在someTopic
主题上创建侦听器端点:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
如果KafkaTransactionManager
定义了 bean,它会自动关联到容器工厂。类似地,如果定义了RecordFilterStrategy
、或bean CommonErrorHandler
,它会自动关联到默认工厂。AfterRollbackProcessor
ConsumerAwareRebalanceListener
根据侦听器类型,一个RecordMessageConverter
或BatchMessageConverter
bean 与默认工厂相关联。如果RecordMessageConverter
批处理侦听器仅存在一个 bean,则将其包装在BatchMessageConverter
.
ChainedKafkaTransactionManager 必须标记
自定义@Primary ,因为它通常引用自动配置的KafkaTransactionManager bean。
|
3.3. 卡夫卡流
Spring for Apache Kafka 提供了一个工厂 bean 来创建StreamsBuilder
对象并管理其流的生命周期。KafkaStreamsConfiguration
只要在类路径上, Spring Boot 就会自动配置所需的bean,并且注释kafka-streams
启用了 Kafka Streams 。@EnableKafkaStreams
启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。前者可以使用 配置,如果未设置则spring.kafka.streams.application-id
默认为。spring.application.name
后者可以全局设置或仅针对流专门覆盖。
使用专用属性可以使用几个附加属性;可以使用spring.kafka.streams.properties
命名空间设置其他任意 Kafka 属性。另请参阅其他 Kafka 属性以获取更多信息。
要使用工厂 bean,请按照以下示例所示连接StreamsBuilder
到您的 bean:@Bean
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase());
}
}
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
return stream
}
private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
return KeyValue(key, value.uppercase())
}
}
默认情况下,由StreamBuilder
它创建的对象管理的流是自动启动的。spring.kafka.streams.auto-startup
您可以使用该属性自定义此行为。
3.4. 其他 Kafka 属性
自动配置支持的属性显示在附录的“集成属性”部分。请注意,在大多数情况下,这些属性(连字符或驼峰式)直接映射到 Apache Kafka 点属性。有关详细信息,请参阅 Apache Kafka 文档。
这些属性中的前几个适用于所有组件(生产者、消费者、管理员和流),但如果您希望使用不同的值,可以在组件级别指定。Apache Kafka 指定具有 HIGH、MEDIUM 或 LOW 重要性的属性。Spring Boot 自动配置支持所有 HIGH 重要性属性、一些选定的 MEDIUM 和 LOW 属性,以及任何没有默认值的属性。
只有 Kafka 支持的属性的一个子集可以直接通过KafkaProperties
该类获得。如果您希望使用不直接支持的其他属性配置生产者或消费者,请使用以下属性:
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
这将通用prop.one
Kafka 属性设置为first
(适用于生产者、消费者和管理员),将prop.two
admin 属性设置为second
,将prop.three
消费者属性设置为third
,将prop.four
生产者属性设置为fourth
,将prop.five
流属性设置为fifth
。
您还可以JsonDeserializer
按如下方式配置 Spring Kafka:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
同样,您可以禁用JsonSerializer
在标头中发送类型信息的默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
以这种方式设置的属性会覆盖 Spring Boot 明确支持的任何配置项。 |
3.5. 使用嵌入式 Kafka 进行测试
Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方式。要使用此功能,请使用模块注释测试@EmbeddedKafka
类spring-kafka-test
。有关更多信息,请参阅 Spring for Apache Kafka参考手册。
要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起工作,您需要将嵌入式代理地址(由 填充EmbeddedKafkaBroker
)的系统属性重新映射到 Apache Kafka 的 Spring Boot 配置属性中。有几种方法可以做到这一点:
-
提供一个系统属性以将嵌入的代理地址映射到
spring.kafka.bootstrap-servers
测试类中:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
-
@EmbeddedKafka
在注解上配置一个属性名称:
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
-
在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"
4.RSocket
RSocket是用于字节流传输的二进制协议。它通过通过单个连接传递的异步消息启用对称交互模型。
Spring Framework的spring-messaging
模块为客户端和服务器端的 RSocket 请求者和响应者提供支持。有关更多详细信息,请参阅 Spring Framework 参考的RSocket 部分,包括 RSocket 协议的概述。
4.1。RSocket 策略自动配置
Spring Boot 自动配置一个RSocketStrategies
bean,该 bean 提供编码和解码 RSocket 有效负载所需的所有基础设施。默认情况下,自动配置将尝试配置以下内容(按顺序):
-
Jackson 的CBOR编解码器
-
杰克逊的 JSON 编解码器
启动器提供了这spring-boot-starter-rsocket
两个依赖项。请参阅Jackson 支持部分以了解有关自定义可能性的更多信息。
开发人员可以RSocketStrategies
通过创建实现RSocketStrategiesCustomizer
接口的 bean 来自定义组件。请注意,它们@Order
很重要,因为它决定了编解码器的顺序。
4.2. RSocket 服务器自动配置
Spring Boot 提供 RSocket 服务器自动配置。所需的依赖项由spring-boot-starter-rsocket
.
Spring Boot 允许从 WebFlux 服务器通过 WebSocket 暴露 RSocket,或者建立一个独立的 RSocket 服务器。这取决于应用程序的类型及其配置。
对于 WebFlux 应用程序(类型为WebApplicationType.REACTIVE
),仅当以下属性匹配时,RSocket 服务器才会插入 Web 服务器:
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
spring:
rsocket:
server:
mapping-path: "/rsocket"
transport: "websocket"
仅 Reactor Netty 支持将 RSocket 插入 Web 服务器,因为 RSocket 本身是使用该库构建的。 |
或者,RSocket TCP 或 websocket 服务器作为独立的嵌入式服务器启动。除了依赖要求之外,唯一需要的配置是为该服务器定义一个端口:
spring.rsocket.server.port=9898
spring:
rsocket:
server:
port: 9898
4.3. Spring 消息传递 RSocket 支持
Spring Boot 将为 RSocket 自动配置 Spring Messaging 基础设施。
这意味着 Spring Boot 将创建一个RSocketMessageHandler
bean 来处理对应用程序的 RSocket 请求。
4.4. 使用 RSocketRequester 调用 RSocket 服务
一旦RSocket
服务器和客户端之间建立了通道,任何一方都可以向另一方发送或接收请求。
作为服务器,您可以RSocketRequester
在 RSocket 的任何处理程序方法上注入实例@Controller
。作为客户端,首先需要配置并建立 RSocket 连接。RSocketRequester.Builder
Spring Boot使用预期的编解码器为这种情况自动配置一个并应用任何RSocketConnectorConfigurer
bean。
该RSocketRequester.Builder
实例是一个原型 bean,这意味着每个注入点都会为您提供一个新的实例。这是故意这样做的,因为此构建器是有状态的,您不应使用同一实例创建具有不同设置的请求者。
下面的代码展示了一个典型的例子:
@Service
public class MyService {
private final RSocketRequester rsocketRequester;
public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
}
public Mono<User> someRSocketCall(String name) {
return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
}
}
@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {
private val rsocketRequester: RSocketRequester
init {
rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
}
fun someRSocketCall(name: String): Mono<User> {
return rsocketRequester.route("user").data(name).retrieveMono(
User::class.java
)
}
}
5. 弹簧集成
Spring Boot 为使用Spring Integration提供了多种便利,包括spring-boot-starter-integration
“Starter”。Spring Integration 提供了对消息传递以及其他传输(如 HTTP、TCP 等)的抽象。如果 Spring Integration 在您的类路径上可用,它会通过@EnableIntegration
注解进行初始化。
Spring Integration 轮询逻辑依赖于自动配置的TaskScheduler
. 可以使用配置属性自定义默认值PollerMetadata
(每秒轮询无限数量的消息) 。spring.integration.poller.*
Spring Boot 还配置了一些由额外的 Spring Integration 模块触发的特性。如果spring-integration-jmx
也在类路径上,则通过 JMX 发布消息处理统计信息。如果spring-integration-jdbc
可用,则可以在启动时创建默认数据库模式,如下行所示:
spring.integration.jdbc.initialize-schema=always
spring:
integration:
jdbc:
initialize-schema: "always"
如果spring-integration-rsocket
可用,开发人员可以使用"spring.rsocket.server.*"
属性配置 RSocket 服务器,并让它使用IntegrationRSocketEndpoint
或RSocketOutboundGateway
组件来处理传入的 RSocket 消息。此基础设施可以处理 Spring Integration RSocket 通道适配器和@MessageMapping
处理程序("spring.integration.rsocket.server.message-mapping-enabled"
已配置)。
Spring Boot 还可以自动配置ClientRSocketConnector
using 配置属性:
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
# Connecting to a RSocket server over TCP
spring:
integration:
rsocket:
client:
host: "example.org"
port: 9898
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
# Connecting to a RSocket Server over WebSocket
spring:
integration:
rsocket:
client:
uri: "ws://example.org"
有关更多详细信息,请参阅IntegrationAutoConfiguration
和IntegrationProperties
类。
6. WebSockets
Spring Boot provides WebSockets auto-configuration for embedded Tomcat, Jetty, and Undertow. If you deploy a war file to a standalone container, Spring Boot assumes that the container is responsible for the configuration of its WebSocket support.
Spring Framework provides rich WebSocket support for MVC web applications that can be easily accessed through the spring-boot-starter-websocket
module.
WebSocket support is also available for reactive web applications and requires to include the WebSocket API alongside spring-boot-starter-webflux
:
<dependency>
<groupId>javax.websocket</groupId>
<artifactId>javax.websocket-api</artifactId>
</dependency>