ZeroMQ 支持
Spring Integration 提供组件来支持应用程序中的ZeroMQ通信。该实现基于JeroMQ库得到良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期并在内部为它们管理线程,从而使与这些组件的交互无锁且线程安全。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:5.5.13"
ZeroMQ 代理
ZeroMqProxy
是内置ZMQ.proxy()
函数的Spring 友好包装器。它封装了套接字生命周期和线程管理。该代理的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。除了标准之外,ZContext
它还需要一种著名的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。这样,一对合适的 ZeroMQ 套接字类型被用于代理的前端和后端。详情请参阅ZeroMqProxy.Type
。
用于创建、绑定和配置套接字并从(如果有的话)在专用线程中启动的ZeroMqProxy
实现。前端和后端套接字的绑定是通过协议在所有可用的网络接口和提供的端口上完成的。否则,它们将绑定到随机端口,稍后可以通过相应的API方法获得这些端口。SmartLifecycle
ZMQ.proxy()
Executor
tcp://
getFrontendPort()
getBackendPort()
控制套接字公开为地址SocketType.PAIR
上的线程间传输"inproc://" + beanName + ".control"
;可以通过getControlAddress()
. 它应该与来自另一个SocketType.PAIR
套接字的相同应用程序一起使用以发送ZMQ.PROXY_TERMINATE
和ZMQ.PROXY_PAUSE
/或ZMQ.PROXY_RESUME
命令。当调用它的生命周期时ZeroMqProxy
执行一个ZMQ.PROXY_TERMINATE
命令来终止循环并优雅地关闭所有绑定的套接字。stop()
ZMQ.proxy()
该setExposeCaptureSocket(boolean)
选项使该组件绑定一个额外的线程间套接字,SocketType.PUB
以捕获和发布前端和后端套接字之间的所有通信,因为它在ZMQ.proxy()
实现中声明。此套接字绑定到该"inproc://" + beanName + ".capture"
地址,并且不需要任何特定的过滤订阅。
前端和后端套接字可以使用其他属性进行自定义,例如读/写超时或安全性。此自定义可分别通过setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
和setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
回调获得。
ZeroMqProxy
可以像这样提供简单的bean :
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有客户端节点都应通过连接到该代理的主机tcp://
并使用他们感兴趣的相应端口。
ZeroMQ 消息通道
它使用ZeroMqChannel
一SubscribableChannel
对 ZeroMQ 套接字来连接发布者和订阅者以进行消息交互。它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地线程间通道(使用PAIR
套接字) -connectUrl
在这种情况下不提供。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与连接到同一代理的其他类似通道交换消息。connect url 选项是一个标准的 ZeroMQ 连接字符串,带有协议和主机以及一对冒号上的端口,用于 ZeroMQ 代理的前端和后端套接字。ZeroMqProxy
为方便起见,如果在与代理相同的应用程序中配置通道,则可以为通道提供实例而不是连接字符串。
发送和接收套接字都在它们自己的专用线程中进行管理,从而使该通道对并发友好。这样,我们可以在ZeroMqChannel
不同步的情况下从不同的线程发布和消费。
默认情况下,ZeroMqChannel
使用杰克逊 JSON 处理器EmbeddedJsonHeadersMessageMapper
来(反)序列化Message
(包括标题)从/到。byte[]
这个逻辑可以通过配置setMessageMapper(BytesMessageMapper)
。
可以通过相应setSendSocketConfigurer(Consumer<ZMQ.Socket>)
的和setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
回调为任何选项(读/写超时、安全等)自定义发送和接收套接字。
的内部逻辑ZeroMqChannel
基于通过 Project ReactorFlux
和Mono
操作符的反应流。这提供了更简单的线程控制,并允许无锁并发发布和消费到/从通道。本地 PUB/SUB 逻辑被实现为一个Flux.publish()
操作符,以允许该通道的所有本地订阅者像PUB
套接字的分布式订阅者一样接收相同的发布消息。
下面是一个简单的ZeroMqChannel
配置示例:
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 入站通道适配器
这ZeroMqMessageProducer
是一个MessageProducerSupport
具有反应语义的实现。它以非阻塞方式不断地从 ZeroMQ 套接字读取数据,并将消息发布Flux
到由 a 订阅FluxMessageChannel
或在start()
方法中显式订阅的无限,如果输出通道不是反应式的。当套接字上没有接收到数据时,consumeDelay
在下一次读取尝试之前应用一个(默认为 1 秒)。
只有SocketType.PAIR
和SocketType.PULL
都SocketType.SUB
支持ZeroMqMessageProducer
。该组件可以使用提供的或随机端口连接到远程套接字或绑定到 TCP 协议。getBoundPort()
启动该组件并绑定 ZeroMQ 套接字后,即可获取实际端口。可以通过setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调配置套接字选项(例如安全性或写入超时)。
如果receiveRaw
选项设置为true
,ZMsg
则从套接字消耗的 a 将按原样在生产者的有效负载中发送Message
:由下游流解析和转换ZMsg
. 否则InboundMessageMapper
使用 an 将消费的数据转换为Message
. 如果接收ZMsg
的是多帧,则第一帧被视为ZeroMqHeaders.TOPIC
此 ZeroMQ 消息发布到的标头。
使用SocketType.SUB
,ZeroMqMessageProducer
使用提供topics
的订阅选项;默认订阅所有。可以在运行时使用subscribeToTopics()
和unsubscribeFromTopics()
@ManagedOperation
s 调整订阅。
这是一个ZeroMqMessageProducer
配置示例:
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 出站通道适配器
这ZeroMqMessageHandler
是一种ReactiveMessageHandler
将发布消息生成到 ZeroMQ 套接字的实现。仅SocketType.PAIR
,SocketType.PUSH
和SocketType.PUB
受支持。ZeroMqMessageHandler
唯一支持连接ZeroMQ socket ;不支持绑定。使用 时SocketType.PUB
,将topicExpression
针对请求消息进行评估,以将主题框架注入 ZeroMQ 消息(如果它不为空)。订阅方 ( SocketType.SUB
) 必须先接收主题帧,然后才能解析实际数据。当请求消息的有效负载是 aZMsg
时,不执行转换或主题提取:按ZMsg
原样发送到套接字并且不会销毁它以供进一步重用。否则一个OutboundMessageMapper<byte[]>
用于将请求消息(或仅其有效负载)转换为 ZeroMQ 帧以进行发布。默认情况下, aConvertingBytesMessageMapper
与 a 一起使用ConfigurableCompositeMessageConverter
。可以通过setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
回调配置套接字选项(例如安全性或写入超时)。
这是一个ZeroMqMessageHandler
配置示例:
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
ZeroMQ Java DSL 支持
它通过工厂和上述组件的实现spring-integration-zeromq
提供了一个方便的 Java DSL fluent API 。ZeroMq
IntegrationComponentSpec
这是一个 Java DSL 示例ZeroMqChannel
:
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的入站通道适配器是:
IntegrationFlows.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的出站通道适配器是:
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}