MQTT 支持

Spring Integration 提供入站和出站通道适配器以支持消息队列遥测传输 (MQTT) 协议。

您需要将此依赖项包含到您的项目中:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.13</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-mqtt:5.5.13"

当前实现使用Eclipse Paho MQTT 客户端库。

XML 配置和本章的大部分内容是关于 MQTT v3.1 协议支持和各自的 Paho 客户端。有关相应协议的支持,请参阅MQTT v5 支持段落。

两个适配器的配置是使用DefaultMqttPahoClientFactory. 有关配置选项的更多信息,请参阅 Paho 文档。

我们建议配置一个MqttConnectOptions对象并将其注入工厂,而不是在工厂本身上设置(不推荐使用的)选项。

入站(消息驱动)通道适配器

入站通道适配器由MqttPahoMessageDrivenChannelAdapter. 为方便起见,您可以使用命名空间进行配置。一个最小的配置可能如下:

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

以下清单显示了可用的属性:

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
1 客户端 ID。
2 代理 URL。
3 此适配器从中接收消息的以逗号分隔的主题列表。
4 QoS 值的逗号分隔列表。它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。
5 一个MqttMessageConverter(可选)。默认情况下,默认DefaultPahoMessageConverter生成String带有以下标头的有效负载的消息:
  • mqtt_topic: 收到消息的主题

  • mqtt_duplicate:true如果消息是重复的

  • mqtt_qos: 服务质量 您可以通过将其声明为 a并将属性设置为 来配置DefaultPahoMessageConverter返回有效负载中的原始数据。byte[]<bean/>payloadAsBytestrue

6 客户工厂。
7 发送超时。它仅适用于通道可能阻塞(例如QueueChannel当前已满的有界)。
8 错误通道。如果提供了下游异常,则以ErrorMessage. 有效负载是MessagingException包含失败消息和原因的。
9 恢复间隔。它控制适配器在失败后尝试重新连接的时间间隔。默认为10000ms(十秒)。
10 确认方式;设置为 true 以进行手动确认。
从 4.1 版开始,您可以省略 URL。相反,您可以serverURIsDefaultMqttPahoClientFactory. 例如,这样做可以连接到高可用性 (HA) 集群。

从版本 4.2.2 开始,MqttSubscribedEvent当适配器成功订阅主题时发布。 MqttConnectionFailedEvent连接或订阅失败时发布事件。这些事件可以由实现ApplicationListener.

此外,一个名为的新属性recoveryInterval控制适配器在失败后尝试重新连接的时间间隔。默认为10000ms(十秒)。

在 4.2.3 版本之前,当适配器停止时,客户端总是取消订阅。这是不正确的,因为如果客户端 QOS 大于 0,我们需要保持订阅处于活动状态,以便在适配器停止时到达的消息在下次启动时传递。这也需要将cleanSession客户端工厂的属性设置为false. 它默认为true.

从版本 4.2.3 开始,如果cleanSession属性为false.

consumerCloseAction可以通过在工厂上设置属性来覆盖此行为。它可以有值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN。后者(默认)仅在cleanSession属性为时取消订阅true

要恢复到 4.2.3 之前的行为,请使用UNSUBSCRIBE_ALWAYS.

从 5.0 版开始topicqos、 和retained属性映射到.RECEIVED_…​标头(MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED),以避免无意传播到(默认情况下)使用MqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINED标头的出站消息。

在运行时添加和删除主题

从 4.1 版开始,您可以通过编程方式更改适配器订阅的主题。Spring Integration 提供了addTopic()removeTopic()方法。添加主题时,您可以选择指定QoS(默认值:1)。您还可以通过向具有<control-bus/>适当负载的适当消息发送适当的消息来修改主题 - 例如:"myMqttAdapter.addTopic('foo', 1)".

停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。这些更改不会保留在应用程序上下文的生命周期之外。新的应用程序上下文恢复为配置的设置。

在适配器停止(或与代理断开连接)时更改主题将在下次建立连接时生效。

手动确认

从 5.3 版开始,您可以将该manualAcks属性设置为 true。通常用于异步确认交付。当设置为 时true,标头 ( IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) 将添加到值为 a 的消息中SimpleAcknowledgment。您必须调用该acknowledge()方法才能完成交付。有关更多信息,IMqttClient setManualAcks()请参阅 Javadocs 。messageArrivedComplete()为方便起见,提供了标题访问器:

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

从 version 开始5.2.11,当消息转换器抛出异常或nullMqttMessage转换返回时,如果提供了,则MqttPahoMessageDrivenChannelAdapter发送一个ErrorMessage到 中。errorChannel否则将此转换错误重新抛出到 MQTT 客户端回调中。

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlows.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2");)
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

出站通道适配器

出站通道适配器由MqttPahoMessageHandler包装在ConsumerEndpoint. 为方便起见,您可以使用命名空间进行配置。

从 4.1 版本开始,适配器支持异步发送操作,避免阻塞直到确认发送。如果需要,您可以发出应用程序事件以使应用程序能够确认交付。

以下清单显示了可用于出站通道适配器的属性:

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
1 客户端 ID。
2 代理 URL。
3 一个MqttMessageConverter(可选)。默认DefaultPahoMessageConverter识别以下标头:
  • mqtt_topic: 消息将发送到的主题

  • mqtt_retained:true如果要保留消息

  • mqtt_qos: 服务质量

4 客户工厂。
5 默认服务质量。如果没有mqtt_qos找到标题或qos-expression返回,则使用它null。如果您提供自定义converter.
6 用于评估以确定 qos 的表达式。默认值为headers[mqtt_qos].
7 保留标志的默认值。如果没有mqtt_retained找到标题,则使用它。如果提供了自定义,则不使用它converter
8 要评估以确定保留的布尔值的表达式。默认值为headers[mqtt_retained].
9 消息发送到的默认主题(如果没有mqtt_topic找到标头,则使用)。
10 用于评估以确定目标主题的表达式。默认值为headers['mqtt_topic'].
11 当 时true,调用者不会阻塞。相反,它在发送消息时等待传递确认。默认值为false(在确认交付之前发送阻止)。
12 asyncasync-eventsare bothtrue时,MqttMessageSentEvent会发出 an (请参阅事件)。它包含消息、主题、messageId客户端库生成的clientId、 和clientInstance(每次连接客户端时递增)。当客户端库确认交付时,MqttMessageDeliveredEvent会发出 an 。它包含messageIdclientIdclientInstance,使传递与发送相关联。任何ApplicationListener或事件入站通道适配器都可以接收这些事件。请注意,有可能在MqttMessageDeliveredEvent之前收到MqttMessageSentEvent。默认值为false.
从 4.1 版开始,可以省略 URL。相反,可以serverURIsDefaultMqttPahoClientFactory. 例如,这可以连接到高可用性 (HA) 集群。

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了使用 Java DSL 配置出站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

   	@Bean
   	public IntegrationFlow mqttOutboundFlow() {
   	    return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

活动

某些应用程序事件由适配器发布。

  • MqttConnectionFailedEvent- 如果我们无法连接或随后丢失连接,则由两个适配器发布。

  • MqttMessageSentEvent- 如果在异步模式下运行,则在发送消息时由出站适配器发布。

  • MqttMessageDeliveredEvent- 如果在异步模式下运行,则在客户端指示消息已传递时由出站适配器发布。

  • MqttSubscribedEvent- 订阅主题后由入站适配器发布。

这些事件可以通过一个ApplicationListener<MqttIntegrationEvent>或一个@EventListener方法接收。

要确定事件的来源,请使用以下内容;您可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支持

从版本 5.5.5 开始,该spring-integration-mqtt模块为 MQTT v5 协议提供通道适配器实现。这org.eclipse.paho:org.eclipse.paho.mqttv5.client是一个optional依赖项,因此必须明确包含在目标项目中。

由于 MQTT v5 协议在 MQTT 消息中支持额外的任意属性,MqttHeaderMapper因此引入了实现来映射到/从发布和接收操作的标头。默认情况下(通过*模式)它映射所有接收到的PUBLISH帧属性(包括用户属性)。PUBLISH在出站端,它为frame: contentType, mqtt_messageExpiryInterval, mqtt_responseTopic,映射此标头子集mqtt_correlationData

MQTT v5 协议的出站通道适配器以Mqttv5PahoMessageHandler. 它需要一个clientIdMQTT 代理 URL 或MqttConnectionOptions参考。它支持一个MqttClientPersistence选项,在这种情况下可以async并且可以发射MqttIntegrationEvent对象(参见asyncEvents选项)。如果请求消息有效负载是org.eclipse.paho.mqttv5.common.MqttMessage,它会通过 internal 发布IMqttAsyncClient。如果有效负载是byte[],则按原样使用目标MqttMessage有效负载来发布。如果有效负载是 a String,则将其转换byte[]为发布。其余的用例被委托给提供的MessageConverter,它是IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter来自应用程序上下文的 bean。注意:HeaderMapper<MqttProperties>当请求的消息有效负载已经是MqttMessage. 以下 Java DSL 配置示例演示了如何在集成流程中使用此通道适配器:

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter不能与 the 一起使用, 因为Mqttv5PahoMessageHandler它的合约仅针对 MQTT v3 协议。

如果在启动或运行时连接失败,则Mqttv5PahoMessageHandler尝试在生成到此处理程序的下一条消息时重新连接。如果此手动重新连接失败,则将连接异常抛出给调用者。在这种情况下,将应用标准 Spring Integration 错误处理过程,包括请求处理程序建议,例如重试或断路器。

Mqttv5PahoMessageHandler在javadocs 及其超类中查看更多信息。

MQTT v5 协议的入站通道适配器以Mqttv5PahoMessageDrivenChannelAdapter. 它需要一个clientIdMQTT 代理 URL 或MqttConnectionOptions参考,以及订阅和消费的主题。它支持一个MqttClientPersistence选项,默认情况下在内存中。可以配置预期的payloadType(默认情况下)并将其传播到提供的转换 from的接收。如果设置了该选项,则会将标头添加到消息中以作为. 用于将框架属性(包括用户属性)映射到目标消息头中。标准属性,例如, , ,byte[]SmartMessageConverterbyte[]MqttMessagemanualAckIntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKSimpleAcknowledgmentHeaderMapper<MqttProperties>PUBLISHMqttMessageqosiddupretained, 加上收到的主题总是映射到标题。有关MqttHeaders更多信息,请参阅。

以下 Java DSL 配置示例演示了如何在集成流程中使用此通道适配器:

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlows.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter不能与 the 一起使用, 因为Mqttv5PahoMessageDrivenChannelAdapter它的合约仅针对 MQTT v3 协议。

Mqttv5PahoMessageDrivenChannelAdapter在javadocs 及其超类中查看更多信息。

建议将MqttConnectionOptions#setAutomaticReconnect(boolean)设置设置为 true 以让内部IMqttAsyncClient实例处理重新连接。否则,只有手动重启Mqttv5PahoMessageDrivenChannelAdapter才能处理重新连接,例如通过MqttConnectionFailedEvent处理断开连接。

1. see XML Configuration