TCP 和 UDP 支持
Spring Integration 提供了用于通过 Internet 协议接收和发送消息的通道适配器。提供了 UDP(用户数据报协议)和 TCP(传输控制协议)适配器。每个适配器都通过底层协议提供单向通信。此外,Spring Integration 提供了简单的入站和出站 TCP 网关。当需要双向通信时使用这些。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-ip:5.5.13"
介绍
提供了两种 UDP 入站和出站通道适配器:
-
UnicastSendingMessageHandler
将数据报包发送到单个目的地。 -
UnicastReceivingChannelAdapter
接收传入的数据报包。 -
MulticastSendingMessageHandler
向多播地址发送(广播)数据报包。 -
MulticastReceivingChannelAdapter
通过加入多播地址来接收传入的数据报包。
提供 TCP 入站和出站通道适配器:
-
TcpSendingMessageHandler
通过 TCP 发送消息。 -
TcpReceivingChannelAdapter
通过 TCP 接收消息。
提供了一个入站 TCP 网关。它允许简单的请求-响应处理。虽然网关可以支持任意数量的连接,但每个连接只能串行处理。从套接字读取的线程在再次读取之前等待并发送响应。如果连接工厂配置为单次使用连接,则在套接字超时后关闭连接。
提供了出站 TCP 网关。它允许简单的请求-响应处理。如果关联的连接工厂配置为一次性连接,则会立即为每个新请求创建一个新连接。否则,如果连接正在使用中,则调用线程会阻塞连接,直到收到响应或发生超时或 I/O 错误。
TCP 和 UDP 入站通道适配器以及 TCP 入站网关支持该error-channel
属性。这提供了与输入GatewayProxyFactoryBean
.
UDP 适配器
本节介绍如何配置和使用 UDP 适配器。
出站 UDP 适配器(XML 配置)
以下示例配置 UDP 出站通道适配器:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
socket-customizer="udpCustomizer"
channel="exampleChannel"/>
当设置multicast 为 时true ,您还应该在主机属性中提供多播地址。
|
UDP 是一种高效但不可靠的协议。Spring Integration 添加了两个属性来提高可靠性:check-length
和acknowledge
. 当check-length
设置为true
时,适配器在消息数据之前加上一个长度字段(网络字节顺序中的四个字节)。这使接收方能够验证接收到的数据包的长度。如果接收系统使用的缓冲区太短而无法容纳数据包,则可以截断数据包。标length
头提供了一种机制来检测这一点。
从版本 4.3 开始,您可以将 设置port
为0
,在这种情况下操作系统会选择端口。选择的端口可以通过getPort()
在适配器启动并isListening()
返回后调用来发现true
。
从版本 5.3.3 开始,您可以添加一个SocketCustomizer
bean 以DatagramSocket
在创建后对其进行修改(例如,调用setTrafficClass(0x10)
)。
以下示例显示了一个向数据报包添加长度检查的出站通道适配器:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
check-length="true"
channel="exampleChannel"/>
数据包的接收者还必须配置为期望在实际数据之前有一个长度。对于 Spring Integration UDP 入站通道适配器,设置其check-length 属性。
|
第二个可靠性改进允许使用应用程序级确认协议。接收方必须在指定时间内向发送方发送确认。
以下示例显示了一个出站通道适配器,它向数据报包添加长度检查并等待确认:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
check-length="true"
acknowledge="true"
ack-host="thishost"
ack-port="22222"
ack-timeout="10000"
channel="exampleChannel"/>
设置acknowledge 为true 意味着数据包的接收者可以解释添加到包含确认数据(主机和端口)的数据包的标头。最有可能的是,接收方是 Spring Integration 入站通道适配器。
|
当多播为真时,附加属性 ( min-acks-for-success ) 指定必须在ack-timeout .
|
从版本 4.3 开始,您可以将 设置ackPort
为0
,在这种情况下操作系统会选择端口。
出站 UDP 适配器(Java 配置)
以下示例显示如何使用 Java 配置出站 UDP 适配器:
@Bean
@ServiceActivator(inputChannel = "udpOut")
public UnicastSendingMessageHandler handler() {
return new UnicastSendingMessageHandler("localhost", 11111);
}
(或MulticastSendingChannelAdapter
用于多播)。
出站 UDP 适配器(Java DSL 配置)
以下示例显示如何使用 Java DSL 配置出站 UDP 适配器:
@Bean
public IntegrationFlow udpOutFlow() {
return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
.configureSocket(socket -> socket.setTrafficClass(0x10)))
.get();
}
入站 UDP 适配器(XML 配置)
以下示例显示如何配置基本单播入站 udp 通道适配器。
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="false"
socket-customizer="udpCustomizer"
check-length="true"/>
以下示例显示如何配置基本多播入站 udp 通道适配器:
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="true"
multicast-address="225.6.7.8"
check-length="true"/>
默认情况下,对入站数据包执行反向 DNS 查找,以将 IP 地址转换为主机名,以便在消息头中使用。在未配置 DNS 的环境中,这可能会导致延迟。lookup-host
您可以通过将属性设置为 来覆盖此默认行为false
。
从版本 5.3.3 开始,您可以添加一个SocketCustomizer
bean 以DatagramSocket
在创建后对其进行修改。为接收套接字和为发送确认而创建的任何套接字调用它。
入站 UDP 适配器(Java 配置)
以下示例显示如何使用 Java 配置入站 UDP 适配器:
@Bean
public UnicastReceivingChannelAdapter udpIn() {
UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
adapter.setOutputChannelName("udpChannel");
return adapter;
}
以下示例显示如何使用 Java DSL 配置入站 UDP 适配器:
入站 UDP 适配器(Java DSL 配置)
@Bean
public IntegrationFlow udpIn() {
return IntegrationFlows.from(Udp.inboundAdapter(11111))
.channel("udpChannel")
.get();
}
服务器监听事件
从版本 5.0.2 开始,UdpServerListeningEvent
当入站适配器启动并开始侦听时会发出 a。当适配器配置为侦听端口 0(即操作系统选择端口)时,这很有用。isListening()
如果您需要在启动其他将连接到套接字的进程之前等待,它也可以用来代替 polling 。
高级出站配置
( <int-ip:udp-outbound-channel-adapter>
)UnicastSendingMessageHandler
有destination-expression
和socket-expression
选项。
您可以将用作硬编码-对destination-expression
的运行时替代方案,以确定针对 a 的传出数据报包的目标地址(使用评估上下文的根对象)。表达式必须计算为URI 样式(参见RFC-2396)中的 a 、 a 或a 。您还可以为此表达式使用入站标头。在框架中,当我们在 中接收数据报并将它们转换为消息时,会填充此标头。标头值正是传入数据报的结果。host
port
requestMessage
URI
String
SocketAddress
IpHeaders.PACKET_ADDRESS
DatagramPacketMessageMapper
UnicastReceivingChannelAdapter
DatagramPacket.getSocketAddress()
使用socket-expression
,出站通道适配器可以使用(例如)入站通道适配器套接字通过接收数据报的同一端口发送数据报。它在我们的应用程序作为 UDP 服务器工作并且客户端在网络地址转换 (NAT) 后面运行的场景中很有用。此表达式必须计算为 a DatagramSocket
。requestMessage
用作评估上下文的根对象。不能将socket-expression
参数与multicast
和acknowledge
参数一起使用。以下示例显示如何使用转换为大写并使用套接字的转换器配置 UDP 入站通道适配器:
<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" />
<int:channel id="in" />
<int:transformer expression="new String(payload).toUpperCase()"
input-channel="in" output-channel="out"/>
<int:channel id="out" />
<int-ip:udp-outbound-channel-adapter id="outbound"
socket-expression="@inbound.socket"
destination-expression="headers['ip_packetAddress']"
channel="out" />
以下示例显示了 Java DSL 的等效配置:
@Bean
public IntegrationFlow udpEchoUpcaseServer() {
return IntegrationFlows.from(Udp.inboundAdapter(11111).id("udpIn"))
.<byte[], String>transform(p -> new String(p).toUpperCase())
.handle(Udp.outboundAdapter("headers['ip_packetAddress']")
.socketExpression("@udpIn.socket"))
.get();
}
TCP 连接工厂
概述
对于 TCP,底层连接的配置是通过使用连接工厂来提供的。提供了两种类型的连接工厂:客户端连接工厂和服务器连接工厂。客户端连接工厂建立传出连接。服务器连接工厂监听传入的连接。
出站通道适配器使用客户端连接工厂,但您也可以将客户端连接工厂的引用提供给入站通道适配器。该适配器接收在出站适配器创建的连接上接收到的任何传入消息。
入站通道适配器或网关使用服务器连接工厂。(事实上,没有一个连接工厂就无法运行)。您还可以提供对出站适配器的服务器连接工厂的引用。然后,您可以使用该适配器向同一连接上的传入消息发送回复。
仅当回复包含ip_connectionId 连接工厂插入到原始消息中的标头时,回复消息才会路由到连接。
|
这是在入站和出站适配器之间共享连接工厂时执行的消息关联程度。这种共享允许通过 TCP 进行异步双向通信。默认情况下,仅使用 TCP 传输有效负载信息。因此,任何消息关联都必须由下游组件(例如聚合器或其他端点)执行。在 3.0 版中引入了对传输所选标头的支持。有关详细信息,请参阅TCP 消息关联。 |
您可以将连接工厂的引用引用到每种类型的最多一个适配器。
Spring Integration 提供了使用java.net.Socket
和的连接工厂java.nio.channel.SocketChannel
。
以下示例显示了一个使用java.net.Socket
连接的简单服务器连接工厂:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"/>
以下示例显示了一个使用java.nio.channel.SocketChannel
连接的简单服务器连接工厂:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
using-nio="true"/>
从 Spring Integration 版本 4.2 开始,如果服务器配置为侦听随机端口(通过将端口设置为0 ),您可以使用getPort() . 此外,还getServerSocketAddress() 可以让您获得完整的SocketAddress . 有关更多信息,
请参阅接口的Javadoc 。TcpServerConnectionFactory |
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>
以下示例显示了一个客户端连接工厂,它使用java.net.Socket
连接并为每条消息创建一个新连接:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>
从 5.2 版本开始,客户端连接工厂支持属性connectTimeout
,以秒为单位指定,默认为 60。
消息划分(序列化器和反序列化器)
TCP 是一种流协议。这意味着必须为通过 TCP 传输的数据提供某种结构,以便接收方可以将数据划分为离散的消息。连接工厂配置为使用序列化器和反序列化器在消息有效负载和通过 TCP 发送的位之间进行转换。这是通过分别为入站和出站消息提供反序列化器和序列化器来实现的。Spring Integration 提供了许多标准的序列化器和反序列化器。
ByteArrayCrlfSerializer
*将字节数组转换为字节流,后跟回车符和换行符 ( \r\n
)。这是默认的序列化程序(和反序列化程序),可以(例如)与 telnet 作为客户端一起使用。
ByteArraySingleTerminatorSerializer
*将字节数组转换为字节流,后跟单个终止字符(默认为0x00
)。
ByteArrayLfSerializer
*将字节数组转换为后跟单个换行符 ( 0x0a
) 的字节流。
ByteArrayStxEtxSerializer
*将一个字节数组转换为一个字节流,前面是一个 STX ( 0x02
),后面是一个 ETX ( 0x03
)。
将ByteArrayLengthHeaderSerializer
字节数组转换为字节流,前面是网络字节顺序(大端)的二进制长度。这是一个高效的反序列化器,因为它不必解析每个字节来查找终止字符序列。它也可以用于包含二进制数据的有效载荷。前面的序列化程序仅支持负载中的文本。长度标头的默认大小是四个字节(一个整数),允许最多 (2^31 - 1) 个字节的消息。但是,length
对于最多 255 字节的消息,标头可以是单字节(无符号),对于最多 (2^16 - 1) 字节的消息,标头可以是无符号短(2 字节)。如果您需要任何其他格式的标头,您可以子类化并为andByteArrayLengthHeaderSerializer
提供实现readHeader
writeHeader
方法。绝对最大数据大小为 (2^31 - 1) 字节。从版本 5.2 开始,标头值可以包括标头的长度以及有效负载。设置inclusive
属性以启用该机制(对于生产者和消费者,它必须设置为相同)。
ByteArrayRawSerializer
*将字节数组转换为字节流,并且不添加额外的消息分界数据。使用这个序列化器(和反序列化器),消息的结束由客户端以有序的方式关闭套接字来指示。使用此序列化程序时,消息接收会挂起,直到客户端关闭套接字或发生超时。超时不会产生消息。当使用此序列化程序且客户端是 Spring Integration 应用程序时,客户端必须使用配置有的连接工厂single-use="true"
. 这样做会导致适配器在发送消息后关闭套接字。序列化程序本身不会关闭连接。您应该仅将此序列化程序与通道适配器(而不是网关)使用的连接工厂一起使用,并且连接工厂应该由入站或出站适配器使用,但不能同时使用。另请参阅ByteArrayElasticRawDeserializer
本节后面的 。但是,从 5.2 版开始,出站网关有了一个新属性closeStreamAfterSend
;这允许使用原始序列化器/反序列化器,因为 EOF 被发送给服务器,同时保持连接打开以接收回复。
在版本 4.2.2 之前,当使用非阻塞 I/O (NIO) 时,此序列化程序将超时(读取期间)视为文件结束,并且到目前为止读取的数据作为消息发出。这是不可靠的,不应用于分隔消息。它现在将此类情况视为例外。万一您以这种方式使用它,您可以通过将treatTimeoutAsEndOfMessage 构造函数参数设置为 来恢复以前的行为true 。
|
它们中的每一个都是 的子类AbstractByteArraySerializer
,它同时实现org.springframework.core.serializer.Serializer
和org.springframework.core.serializer.Deserializer
。为了向后兼容,使用任何子类AbstractByteArraySerializer
进行序列化的连接也接受String
首先转换为字节数组的 a。这些序列化器和反序列化器中的每一个都将包含相应格式的输入流转换为字节数组有效负载。
为了避免由于表现不佳的客户端(不遵守配置的序列化程序协议的客户端)而导致内存耗尽,这些序列化程序会施加最大消息大小。如果传入消息超过此大小,则会引发异常。默认的最大消息大小为 2048 字节。maxMessageSize
您可以通过设置属性来增加它。如果您使用默认序列化器或反序列化器并希望增加最大消息大小,则必须将最大消息大小声明为具有maxMessageSize
属性集的显式 bean,并将连接工厂配置为使用该 bean。
本节前面标有*的类使用中间缓冲区并将解码的数据复制到正确大小的最终缓冲区。从版本 4.3 开始,您可以通过设置一个poolSize
属性来配置这些缓冲区,以让这些原始缓冲区被重用,而不是为每条消息分配和丢弃,这是默认行为。将该属性设置为负值会创建一个没有边界的池。如果池是有界的,您还可以设置poolWaitTimeout
属性(以毫秒为单位),之后如果没有可用的缓冲区,则会引发异常。它默认为无穷大。这样的异常会导致套接字关闭。
如果您希望在自定义反序列化器中使用相同的机制,您可以扩展AbstractPooledBufferByteArraySerializer
(而不是它的超类AbstractByteArraySerializer
)并实现doDeserialize()
而不是deserialize()
. 缓冲区会自动返回到池中。
AbstractPooledBufferByteArraySerializer
还提供了一个方便的实用方法:copyToSizedArray()
.
5.0 版添加了ByteArrayElasticRawDeserializer
. 这类似于ByteArrayRawSerializer
上面的解串器端,除了不需要设置 a maxMessageSize
。在内部,它使用 aByteArrayOutputStream
让缓冲区根据需要增长。客户端必须按顺序关闭套接字以发出消息结束的信号。
此反序列化程序仅应在对等点受信任时使用;由于内存不足,它很容易受到 DoS 附加的影响。 |
使用MapJsonSerializer
Jackson在 a和 JSONObjectMapper
之间进行转换。Map
您可以将此序列化程序与 aMessageConvertingTcpMessageMapper
和 a结合使用,MapMessageConverter
以 JSON 格式传输选定的标头和有效负载。
JacksonObjectMapper 无法区分流中的消息。因此,MapJsonSerializer 需要委托给另一个序列化器或反序列化器来处理消息划分。默认情况下,ByteArrayLfSerializer 使用 a,从而生成格式为<json><LF> 在线的消息,但您可以将其配置为使用其他格式。(下一个示例显示了如何执行此操作。)
|
最终的标准序列化器是org.springframework.core.serializer.DefaultSerializer
,您可以使用它通过 Java 序列化来转换可序列化对象。
org.springframework.core.serializer.DefaultDeserializer
提供用于包含可序列化对象的流的入站反序列化。
如果您不希望使用默认的序列化器和反序列化器 ( ByteArrayCrLfSerializer
),则必须在连接工厂上设置serializer
和属性。deserializer
以下示例显示了如何执行此操作:
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
使用java.net.Socket
连接并在线上使用 Java 序列化的服务器连接工厂。
有关连接工厂可用属性的完整详细信息,请参阅本节末尾的参考资料。
默认情况下,对入站数据包执行反向 DNS 查找,以将 IP 地址转换为主机名,以便在消息头中使用。在未配置 DNS 的环境中,这可能会导致连接延迟。lookup-host
您可以通过将属性设置为 来覆盖此默认行为false
。
您还可以修改套接字和套接字工厂的属性。有关详细信息,请参阅SSL/TLS 支持。如那里所述,无论是否使用 SSL,都可以进行此类修改。 |
自定义序列化器和反序列化器
如果您的数据不是标准反序列化器支持的格式,您可以实现自己的;您还可以实现自定义序列化程序。
要实现自定义序列化器和反序列化器对,请实现org.springframework.core.serializer.Deserializer
和org.springframework.core.serializer.Serializer
接口。
当反序列化器检测到消息之间关闭的输入流时,它必须抛出一个SoftEndOfStreamException
; 这是向框架发出的信号,表明收盘是“正常的”。如果在解码消息时关闭了流,则应该抛出一些其他异常。
从 5.2 版开始,SoftEndOfStreamException
现在是 aRuntimeException
而不是扩展IOException
.
TCP 缓存客户端连接工厂
如前所述, TCP 套接字可以是“一次性的”(一个请求或响应)或共享的。在大容量环境中,共享套接字与出站网关的性能不佳,因为套接字一次只能处理一个请求或响应。
为了提高性能,您可以使用协作通道适配器而不是网关,但这需要应用程序级别的消息关联。有关详细信息,请参阅TCP 消息相关性。
Spring Integration 2.2 引入了缓存客户端连接工厂,它使用共享套接字池,让网关使用共享连接池处理多个并发请求。
TCP 故障转移客户端连接工厂
您可以配置支持故障转移到一台或多台其他服务器的 TCP 连接工厂。发送消息时,工厂会遍历其所有配置的工厂,直到可以发送消息或找不到连接。最初,使用配置列表中的第一个工厂。如果随后连接失败,则下一个工厂将成为当前工厂。以下示例显示如何配置故障转移客户端连接工厂:
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>
使用故障转移连接工厂时,singleUse 工厂本身和它配置使用的工厂列表之间的属性必须一致。
|
当与共享连接 ( singleUse=false
) 一起使用时,连接工厂有两个与故障回复相关的属性:
-
refreshSharedInterval
-
closeOnRefresh
基于上述配置考虑以下场景:假设clientFactory1
无法建立连接,但clientFactory2
可以。当该failCF
getConnection()
方法在通过后被调用时refreshSharedInterval
,我们将再次尝试使用clientFactory1
;进行连接。如果成功,连接clientFactory2
将被关闭。如果closeOnRefresh
是false
,则“旧”连接将保持打开状态,并且如果第一个工厂再次失败,将来可能会重复使用。
设置refreshSharedInterval
为仅在该时间到期后尝试与第一个工厂重新连接;Long.MAX_VALUE
如果您只想在当前连接失败时恢复到第一个工厂,请将其设置为(默认)。
设置closeOnRefresh
为在刷新后关闭“旧”连接实际上会创建一个新连接。
如果任何委托工厂是 a,则这些属性不适用,CachingClientConnectionFactory 因为在那里处理了连接缓存;在这种情况下,将始终咨询连接工厂列表以获取连接。
|
从 5.3 版开始,这些默认设置为Long.MAX_VALUE
,true
因此工厂仅在当前连接失败时尝试故障恢复。要恢复到以前版本的默认行为,请将它们设置为0
和false
。
另请参阅测试连接。
TCP 线程亲和连接工厂
Spring Integration 5.0 版本引入了这个连接工厂。它将一个连接绑定到调用线程,并且每次该线程发送消息时都会重用相同的连接。这一直持续到连接关闭(由服务器或网络)或直到线程调用该releaseConnection()
方法。连接本身由另一个客户端工厂实现提供,必须将其配置为提供非共享(一次性)连接,以便每个线程都获得一个连接。
以下示例显示如何配置 TCP 线程亲和连接工厂:
@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}
@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}
@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}
测试连接
在某些情况下,首次打开连接时发送某种健康检查请求会很有用。一种这样的情况可能是在使用TCP 故障转移客户端连接工厂时,如果所选服务器允许打开连接但报告它不健康,我们可以进行故障转移。
为了支持这个特性,添加一个connectionTest
到客户端连接工厂。
/**
* Set a {@link Predicate} that will be invoked to test a new connection; return true
* to accept the connection, false the reject.
* @param connectionTest the predicate.
* @since 5.3
*/
public void setConnectionTest(@Nullable Predicate<TcpConnectionSupport> connectionTest) {
this.connectionTest = connectionTest;
}
要测试连接,请将临时侦听器附加到测试中的连接。如果测试失败,则关闭连接并抛出异常。当与TCP 故障转移客户端连接工厂一起使用时,这会触发尝试下一个服务器。
只有来自服务器的第一个回复才会发送到测试侦听器。 |
在下面的示例中,如果服务器PONG
在我们发送时回复,则认为服务器是健康的PING
。
Message<String> ping = new GenericMessage<>("PING");
byte[] pong = "PONG".getBytes();
clientFactory.setConnectionTest(conn -> {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean result = new AtomicBoolean();
conn.registerTestListener(msg -> {
if (Arrays.equals(pong, (byte[]) msg.getPayload())) {
result.set(true);
}
latch.countDown();
return false;
});
conn.send(ping);
try {
latch.await(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return result.get();
});
TCP 连接拦截器
您可以使用对 a 的引用来配置连接工厂TcpConnectionInterceptorFactoryChain
。您可以使用拦截器向连接添加行为,例如协商、安全和其他选项。框架目前没有提供拦截器,但请参阅InterceptedSharedConnectionTests
源存储库中的示例。
HelloWorldInterceptor
测试用例中使用的工作如下:
拦截器首先配置了一个客户端连接工厂。当通过拦截的连接发送第一条消息时,拦截器通过连接发送“Hello”并期望接收“world!”。发生这种情况时,协商完成并发送原始消息。使用相同连接的更多消息无需任何额外协商即可发送。
当配置了服务器连接工厂时,拦截器要求第一条消息是“Hello”,如果是,则返回“world!”。否则,它会引发导致连接关闭的异常。
所有TcpConnection
方法都被拦截。拦截器实例由拦截器工厂为每个连接创建。如果拦截器是有状态的,工厂应该为每个连接创建一个新实例。如果没有状态,同一个拦截器可以包装每个连接。interceptor-factory
拦截器工厂被添加到拦截器工厂链的配置中,您可以通过设置属性将其提供给连接工厂。拦截器必须扩展TcpConnectionInterceptorSupport
。工厂必须实现TcpConnectionInterceptorFactory
接口。
TcpConnectionInterceptorSupport
有直通方法。通过扩展这个类,你只需要实现那些你想拦截的方法。
以下示例显示了如何配置连接拦截器工厂链:
<bean id="helloWorldInterceptorFactory"
class="o.s.i.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
<property name="interceptors">
<array>
<bean class="o.s.i.ip.tcp.connection.HelloWorldInterceptorFactory"/>
</array>
</property>
</bean>
<int-ip:tcp-connection-factory id="server"
type="server"
port="12345"
using-nio="true"
single-use="true"
interceptor-factory-chain="helloWorldInterceptorFactory"/>
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
single-use="true"
so-timeout="100000"
using-nio="true"
interceptor-factory-chain="helloWorldInterceptorFactory"/>
TCP 连接事件
从版本 3.0 开始,对实例的更改TcpConnection
由实例报告TcpConnectionEvent
。
TcpConnectionEvent
是的子类,ApplicationEvent
因此可以被任何ApplicationListener
定义在其中的对象接收ApplicationContext
——例如事件入站通道适配器。
TcpConnectionEvents
具有以下属性:
-
connectionId
:连接标识符,您可以在消息头中使用它来向连接发送数据。 -
connectionFactoryName
:连接所属的连接工厂的bean名称。 -
throwable
:Throwable
(仅用于TcpConnectionExceptionEvent
事件)。 -
source
: 的TcpConnection
。例如,您可以使用它来确定远程 IP 地址getHostAddress()
(需要强制转换)。
此外,从 4.0 版开始,TCP 连接工厂TcpDeserializationExceptionEvent
中讨论的标准反序列化器现在会在解码数据流时遇到问题时发出实例。这些事件包含异常、正在构建的缓冲区以及异常发生时缓冲区的偏移量(如果可用)。应用程序可以使用一个普通ApplicationListener
的或一个ApplicationEventListeningMessageProducer
(请参阅接收 Spring 应用程序事件)来捕获这些事件,以便分析问题。
从版本 4.0.7 和 4.1.3 开始,TcpConnectionServerExceptionEvent
只要在服务器套接字上发生意外异常(例如BindException
在使用服务器套接字时),就会发布实例。这些事件都引用了连接工厂和原因。
从版本 4.2 开始,只要端点(入站网关或协作出站通道适配器)收到由于标头无效TcpConnectionFailedCorrelationEvent
而无法路由到连接的消息,就会发布实例。ip_connectionId
当收到迟到的回复(发送者线程已超时)时,出站网关也会发布此事件。该事件包含连接 ID 以及cause
属性中的异常,其中包含失败的消息。
从版本 4.3 开始,在TcpConnectionServerListeningEvent
启动服务器连接工厂时会发出 a 。这在工厂配置为侦听端口 0 时很有用,这意味着操作系统选择了端口。isListening()
如果您需要在启动连接到套接字的其他进程之前等待,它也可以用来代替 polling 。
为避免延迟侦听线程接受连接,事件在单独的线程上发布。 |
从版本 4.3.2 开始,TcpConnectionFailedEvent
每当无法创建客户端连接时都会发出 a 。事件的来源是连接工厂,您可以使用它来确定无法建立连接的主机和端口。
TCP 适配器
提供了使用前面提到的连接工厂的 TCP 入站和出站通道适配器。这些适配器有两个相关的属性:connection-factory
和channel
. 该connection-factory
属性指示要使用哪个连接工厂来管理适配器的连接。这channel
属性指定消息到达出站适配器的通道以及入站适配器放置消息的通道。虽然入站和出站适配器都可以共享一个连接工厂,但服务器连接工厂总是由一个入站适配器“拥有”。客户端连接工厂始终由出站适配器“拥有”。每种类型只有一个适配器可以引用连接工厂。以下示例显示如何定义客户端和服务器 TCP 连接工厂:
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer"/>
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer"/>
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"
using-nio="true"
single-use="true"/>
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="#{server.port}"
single-use="true"
so-timeout="10000"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
<int:channel id="input" />
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-ip:tcp-outbound-channel-adapter id="outboundClient"
channel="input"
connection-factory="client"/>
<int-ip:tcp-inbound-channel-adapter id="inboundClient"
channel="replies"
connection-factory="client"/>
<int-ip:tcp-inbound-channel-adapter id="inboundServer"
channel="loop"
connection-factory="server"/>
<int-ip:tcp-outbound-channel-adapter id="outboundServer"
channel="loop"
connection-factory="server"/>
<int:channel id="loop"/>
在前面的配置中,到达input
通道的消息通过client
连接工厂创建的连接进行序列化,在服务器处接收并放置在loop
通道上。由于loop
是 的输入通道outboundServer
,因此消息通过同一连接环回,由 接收inboundClient
并存放在replies
通道中。在线上使用 Java 序列化。
通常,入站适配器使用type="server"
连接工厂,它侦听传入的连接请求。在某些情况下,您可能希望反向建立连接,以便入站适配器连接到外部服务器,然后等待该连接上的入站消息。
client-mode="true"
入站适配器上的设置支持此拓扑。在这种情况下,连接工厂必须是 typeclient
并且必须single-use
设置为false
。
两个附加属性支持此机制。retry-interval
指定(以毫秒为单位)框架在连接失败后尝试重新连接的频率。提供scheduler
一个TaskScheduler
来安排连接尝试并测试连接是否仍然处于活动状态。
如果您不提供调度程序,则使用框架的默认taskScheduler bean。
对于出站适配器,通常在发送第一条消息时建立连接。出站适配器上的client-mode="true"
会导致在启动适配器时建立连接。默认情况下,适配器会自动启动。同样,连接工厂必须是 typeclient
并且具有single-use="false"
. Aretry-interval
和scheduler
也受支持。如果连接失败,则由调度程序或在发送下一条消息时重新建立。
对于入站和出站,如果启动了适配器,可以通过发送<control-bus />
命令强制适配器建立连接:@adapter_id.retryConnection()
. 然后您可以使用 来检查当前状态@adapter_id.isClientModeConnected()
。
TCP 网关
入站 TCP 网关TcpInboundGateway
和出站 TCP 网关TcpOutboundGateway
分别使用服务器和客户端连接工厂。每个连接一次可以处理一个请求或响应。
入站网关在使用传入有效负载构造消息并将其发送到 后requestChannel
,等待响应并将响应消息中的有效负载写入连接中。
对于入站网关,您必须保留或填充ip_connectionId 标头,因为它用于将消息与连接相关联。源自网关的消息会自动设置标头。如果回复构造为新消息,则需要设置标头。可以从传入消息中捕获标头值。
|
与入站适配器一样,入站网关通常使用type="server"
连接工厂,它侦听传入的连接请求。在某些情况下,您可能希望反向建立连接,以便入站网关连接到外部服务器,然后等待并回复该连接上的入站消息。
client-mode="true"
在入站网关上使用支持此拓扑。在这种情况下,连接工厂必须是 typeclient
并且必须single-use
设置为false
。
两个附加属性支持此机制。
retry-interval
指定(以毫秒为单位)框架在连接失败后尝试重新连接的频率。
scheduler
提供 aTaskScheduler
来安排连接尝试并测试连接是否仍处于活动状态。
如果网关已启动,您可以通过发送<control-bus/>
命令强制网关建立连接:@adapter_id.retryConnection()
并检查当前状态@adapter_id.isClientModeConnected()
。
出站网关在通过连接发送消息后,等待响应,构造响应消息,并将其放在回复通道上。连接上的通信是单线程的。一次只能处理一条消息。如果另一个线程在收到当前响应之前尝试发送消息,它将阻塞,直到任何先前的请求完成(或超时)。但是,如果客户端连接工厂配置为一次性连接,则每个新请求都会获得自己的连接并立即进行处理。以下示例配置入站 TCP 网关:
<int-ip:tcp-inbound-gateway id="inGateway"
request-channel="tcpChannel"
reply-channel="replyChannel"
connection-factory="cfServer"
reply-timeout="10000"/>
如果使用配置了默认序列化器或反序列化器的连接工厂,则消息是\r\n
分隔数据,并且网关可以由简单的客户端(例如 telnet)使用。
以下示例显示出站 TCP 网关:
<int-ip:tcp-outbound-gateway id="outGateway"
request-channel="tcpChannel"
reply-channel="replyChannel"
connection-factory="cfClient"
request-timeout="10000"
remote-timeout="10000"/> <!-- or e.g.
remote-timeout-expression="headers['timeout']" -->
client-mode
目前不适用于出站网关。
从版本 5.2 开始,可以使用属性配置出站网关closeStreamAfterSend
。single-use
如果为(每个请求/回复的新连接)配置了连接工厂,网关将关闭输出流;这向服务器发送 EOF 信号。如果服务器使用 EOF 来确定消息的结尾,而不是流中的某个分隔符,但保持连接打开以接收回复,这将很有用。
通常,调用线程会阻塞在网关中,等待回复(或超时)。从5.3版本开始,可以async
在网关上设置属性,释放发送线程做其他工作。回复(或错误)将在接收线程上发送。这仅在使用 时适用TcpNetClientConnectionFactory
,在使用 NIO 时会被忽略,因为存在竞争条件,即在收到回复之后发生的套接字错误可以在回复之前传递给网关。
当使用共享连接 ( singleUse=false ) 时,一个新请求,而另一个正在处理中,将被阻止,直到收到当前回复。CachingClientConnectionFactory 如果您希望在长期连接池上支持并发请求,
请考虑使用。 |
从 5.4 版开始,入站可以配置为unsolicitedMessageChannel
. 未经请求的入站消息以及延迟回复(客户端超时)将发送到此通道。为了在服务器端支持这一点,您现在可以TcpSender
向连接工厂注册多个 s。网关和通道适配器会自动注册自己。从服务器发送未经请求的消息时,您必须将适当的添加IpHeaders.CONNECTION_ID
到发送的消息中。
TCP 消息关联
IP 端点的一个目标是提供与 Spring Integration 应用程序以外的系统的通信。因此,默认情况下仅发送和接收消息负载。从 3.0 开始,您可以使用 JSON、Java 序列化或自定义序列化器和反序列化器来传输标头。有关详细信息,请参阅传输标头。框架(使用网关时除外)或服务器端的协作通道适配器不提供消息关联。 在本文档的后面部分,我们将讨论可用于应用程序的各种关联技术。在大多数情况下,这需要消息的特定应用程序级关联,即使消息有效负载包含一些自然关联数据(例如订单号)。
网关
网关自动关联消息。但是,对于容量相对较小的应用程序,您应该使用出站网关。当您将连接工厂配置为对所有消息对使用单个共享连接 ('single-use="false"') 时,一次只能处理一条消息。新消息必须等到收到对前一条消息的回复。当为每条新消息配置连接工厂以使用新连接('single-use="true"')时,此限制不适用。虽然此设置可以提供比共享连接环境更高的吞吐量,但它会带来为每个消息对打开和关闭新连接的开销。
因此,对于大量消息,请考虑使用一对协作的通道适配器。但是,为此,您需要提供协作逻辑。
Spring Integration 2.2 中引入的另一个解决方案是使用 a CachingClientConnectionFactory
,它允许使用共享连接池。
协作出站和入站通道适配器
要实现大容量吞吐量(避免使用网关的陷阱,如前所述),您可以配置一对协作的出站和入站通道适配器。您还可以使用协作适配器(服务器端或客户端)进行完全异步通信(而不是使用请求-回复语义)。在服务器端,消息关联由适配器自动处理,因为入站适配器添加了一个标头,允许出站适配器在发送回复消息时确定使用哪个连接。
在服务器端,您必须填充ip_connectionId 标头,因为它用于将消息与连接相关联。源自入站适配器的消息会自动设置标头。如果您希望构建其他消息发送,您需要设置标头。您可以从传入消息中获取标头值。
|
在客户端,如果需要,应用程序必须提供自己的关联逻辑。您可以通过多种方式做到这一点。
如果消息有效负载具有一些自然关联数据(例如事务 ID 或订单号)并且您不需要保留原始出站消息中的任何信息(例如回复通道标头),则关联很简单,并且将是在任何情况下都在应用程序级别完成。
如果消息payload有一些自然相关的数据(比如交易ID或者订单号),但是需要保留原始出站消息的一些信息(比如回复通道头),可以保留一份原始的出站消息(可能通过使用发布-订阅通道)并使用聚合器重新组合必要的数据。
对于前两种情况中的任何一种,如果负载没有自然相关数据,您可以在出站通道适配器上游提供一个转换器,以使用此类数据增强负载。这样的转换器可以将原始有效负载转换为包含原始有效负载和消息头的某些子集的新对象。当然,来自标头的活动对象(例如回复通道)不能包含在转换后的有效负载中。
如果您选择这样的策略,则需要确保连接工厂具有适当的序列化器-反序列化器对来处理此类负载(例如DefaultSerializer
and DefaultDeserializer
,它使用 java 序列化,或自定义序列化器和反序列化器)。TCP Connection FactoriesByteArray*Serializer
中提到的选项(包括默认选项)不支持此类有效负载,除非转换后的有效负载是 a或。ByteArrayCrLfSerializer
String
byte[]
在 2.2 版本之前,当协作通道适配器使用客户端连接工厂时,该 这种默认行为在真正的异步环境中是不合适的,因此它现在默认为无限超时。 |
从 5.4 版开始,多个出站通道适配器和一个TcpInboundChannelAdapter
可以共享同一个连接工厂。这允许应用程序支持请求/回复和任意服务器→客户端消息传递。有关详细信息,请参阅TCP 网关。
传输标头
TCP 是一种流协议。
Serializers
并Deserializers
在流中划分消息。在 3.0 之前,只有消息有效负载 (String
或byte[]
) 可以通过 TCP 传输。从 3.0 开始,您可以传输选定的标头以及有效负载。replyChannel
但是,不能序列化“活动”对象,例如标头。
通过 TCP 发送标头信息需要一些额外的配置。
第一步是提供ConnectionFactory
一个MessageConvertingTcpMessageMapper
使用该mapper
属性的对象。此映射器委托给任何MessageConverter
实现,以将消息与某个可以由配置的和反序列化的对象相互serializer
转换deserializer
。
Spring Integration 提供了一个MapMessageConverter
,它允许指定添加到Map
对象的标头列表以及有效负载。生成的 Map 有两个条目:payload
和headers
。该headers
条目本身是 aMap
并包含选定的标题。
第二步是提供一个串行器和一个解串器,可以在 aMap
和某种线格式之间进行转换。这可以是自定义的Serializer
或Deserializer
,如果对等系统不是 Spring Integration 应用程序,您通常需要它。
Spring Integration 提供了一个MapJsonSerializer
将 a 转换为Map
JSON 和从 JSON 转换的方法。它使用 Spring Integration JsonObjectMapper
。JsonObjectMapper
如果需要,您可以提供自定义。默认情况下,序列化程序会0x0a
在对象之间插入换行符 ( )。有关更多信息,请参阅Javadoc。
使用类路径中
的JsonObjectMapper 任何版本。Jackson |
您还可以Map
通过使用DefaultSerializer
and来使用 , 的标准 Java 序列化DefaultDeserializer
。
以下示例显示了使用 JSON 传输 、 和 标头correlationId
的sequenceNumber
连接工厂的配置:sequenceSize
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>
<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
使用上述配置发送的消息,有效负载为“某事”,将出现在网络上,如下所示:
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}
关于非阻塞 I/O (NIO)
使用 NIO(参见using-nio
IP配置属性)避免了专用线程从每个套接字中读取。对于少数套接字,您可能会发现不使用 NIO 以及异步切换(例如到 a QueueChannel
)的性能与使用 NIO 一样好或更好。
在处理大量连接时,您应该考虑使用 NIO。但是,NIO 的使用还有其他一些影响。一个线程池(在任务执行器中)在所有套接字之间共享。每个传入消息都被组装并发送到配置的通道,作为从该池中选择的线程上的一个单独的工作单元。到达同一个套接字的两个顺序消息可能由不同的线程处理。这意味着消息发送到通道的顺序是不确定的。不维护到达套接字的消息的严格顺序。
对于某些应用程序,这不是问题。对其他人来说,这是一个问题。如果您需要严格的排序,请考虑设置using-nio
并false
使用异步切换。
或者,您可以在入站端点的下游插入一个重新排序器,以将消息返回到正确的顺序。如果您设置apply-sequence
为true
连接工厂,则到达 TCP 连接的消息将sequenceNumber
设置correlationId
标头。重定序器使用这些标头将消息返回到正确的顺序。
从版本 5.1.4 开始,接受新连接优先于从现有连接读取。通常,除非您的新传入连接率非常高,否则这应该几乎没有影响。如果您希望恢复到之前给予读取优先级的行为,请将multiAccept 属性设置为TcpNioServerConnectionFactory to false 。
|
池大小
不再使用池大小属性。以前,它在未指定任务执行器时指定默认线程池的大小。它还用于设置服务器套接字上的连接积压。不再需要第一个函数(参见下一段)。第二个函数被backlog
属性替换。
以前,当使用 NIO 的固定线程池任务执行器(这是默认值)时,可能会出现死锁并且处理将停止。当缓冲区已满,从套接字读取的线程试图向缓冲区添加更多数据,并且没有线程可用于在缓冲区中腾出空间时,就会出现问题。这仅发生在池规模非常小的情况下,但在极端条件下也可能发生。从 2.2 开始,有两个变化消除了这个问题。首先,默认的任务执行器是一个缓存的线程池执行器。其次,添加了死锁检测逻辑,如果发生线程饥饿,则不是死锁,而是抛出异常,从而释放死锁的资源。
现在默认任务执行程序是无限制的,如果消息处理需要较长时间,则可能会出现内存不足的情况,并且传入消息的频率很高。如果您的应用程序表现出这种行为,您应该使用具有适当池大小的池任务执行器,但请参阅下一节。 |
CALLER_RUNS
带策略的线程池任务执行器
CallerRunsPolicy
当您将固定线程池与(CALLER_RUNS
使用<task/>
命名空间时)一起使用并且队列容量较小时,您应该牢记一些重要的注意事项。
如果您不使用固定线程池,则以下内容不适用。
对于 NIO 连接,存在三种不同的任务类型。I/O 选择器处理在一个专用线程上执行(检测事件、接受新连接以及使用任务执行器将 I/O 读取操作分派给其他线程)。当一个 I/O 读取器线程(读取操作被分派到该线程)读取数据时,它会移交给另一个线程来组装传入的消息。大型消息可能需要多次读取才能完成。这些“汇编程序”线程可以在等待数据时阻塞。当发生新的读取事件时,读取器确定此套接字是否已经有一个汇编器,如果没有,则运行一个新的。当组装过程完成时,组装线程返回到池中。
当池耗尽、CALLER_RUNS
拒绝策略正在使用以及任务队列已满时,这可能会导致死锁。当池为空且队列中没有空间时,IO 选择器线程接收OP_READ
事件并使用执行器调度读取。队列已满,因此选择器线程自己启动读取过程。现在它检测到这个套接字没有汇编器,并且在它进行读取之前,触发了一个汇编器。同样,队列已满,选择器线程成为汇编器。汇编器现在被阻塞,等待读取数据,这永远不会发生。连接工厂现在已死锁,因为选择器线程无法处理新事件。
为了避免这种死锁,我们必须避免选择器(或读取器)线程执行组装任务。我们希望为 IO 和组装操作使用单独的池。
该框架提供了一个CompositeExecutor
,它允许配置两个不同的执行器:一个用于执行 IO 操作,一个用于消息组装。在这种环境下,IO线程永远不可能成为汇编线程,也不会发生死锁。
此外,任务执行器应配置为使用AbortPolicy
( ABORT
when using <task>
)。当一个 I/O 任务无法完成时,它会被延迟一小段时间并不断重试,直到它可以完成并分配一个汇编程序。默认情况下,延迟为 100 毫秒,但您可以通过设置readDelay
连接工厂的属性来更改它(read-delay
使用 XML 命名空间进行配置时)。
以下三个示例展示了如何配置复合执行器:
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>
SSL/TLS 支持
支持安全套接字层/传输层安全。使用 NIO 时,JDK 5+SSLEngine
的特性用于处理建立连接后的握手。不使用 NIO 时,使用标准SSLSocketFactory
和SSLServerSocketFactory
对象来创建连接。提供了许多策略界面以允许进行重要的定制。这些接口的默认实现提供了开始安全通信的最简单方法。
入门
不管你是否使用NIO,都需要ssl-context-support
在连接工厂上配置属性。该属性引用了一个 <bean/> 定义,该定义描述了所需密钥存储的位置和密码。
SSL/TLS 对等点分别需要两个密钥存储:
-
包含用于识别对等点的私钥和公钥对的密钥库
-
包含受信任对等方的公钥的信任库。
keytool
请参阅随 JDK 提供的实用程序的文档。基本步骤是-
创建一个新的密钥对并将其存储在密钥库中。
-
导出公钥。
-
将公钥导入对等方的信任库。
-
对另一个同伴重复。
-
在测试用例中,在两个对等点上使用相同的密钥存储是很常见的,但在生产环境中应该避免这种情况。 |
建立密钥存储之后,下一步是向TcpSSLContextSupport
bean 指示它们的位置,并向连接工厂提供对该 bean 的引用。
以下示例配置 SSL 连接:
<bean id="sslContextSupport"
class="o.sf.integration.ip.tcp.connection.support.DefaultTcpSSLContextSupport">
<constructor-arg value="client.ks"/>
<constructor-arg value="client.truststore.ks"/>
<constructor-arg value="secret"/>
<constructor-arg value="secret"/>
</bean>
<ip:tcp-connection-factory id="clientFactory"
type="client"
host="localhost"
port="1234"
ssl-context-support="sslContextSupport" />
该类DefaultTcpSSLContextSupport
还有一个可选protocol
属性,可以是SSL
或TLS
(默认值)。
密钥库文件名(前两个构造函数参数)使用 SpringResource
抽象。默认情况下,文件位于类路径上,但您可以使用file:
前缀覆盖它(改为在文件系统上查找文件)。
从 4.3.6 版本开始,当你使用 NIO 时,你可以ssl-handshake-timeout
在连接工厂上指定一个(以秒为单位)。在 SSL 握手期间等待数据时使用此超时(默认为 30 秒)。如果超时,则停止进程并关闭套接字。
主机验证
从版本 5.0.8 开始,您可以配置是否启用主机验证。从 5.1 版本开始,默认启用;禁用它的机制取决于您是否使用 NIO。
主机验证用于确保您连接的服务器与证书中的信息匹配,即使证书是受信任的。
使用 NIO 时,DefaultTcpNioSSLConnectionSupport
例如配置 .
@Bean
public DefaultTcpNioSSLConnectionSupport connectionSupport() {
DefaultTcpSSLContextSupport sslContextSupport = new DefaultTcpSSLContextSupport("test.ks",
"test.truststore.ks", "secret", "secret");
sslContextSupport.setProtocol("SSL");
DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport =
new DefaultTcpNioSSLConnectionSupport(sslContextSupport, false);
return tcpNioConnectionSupport;
}
第二个构造函数参数禁用主机验证。然后将connectionSupport
bean 注入 NIO 连接工厂。
不使用 NIO 时,配置位于TcpSocketSupport
:
connectionFactory.setTcpSocketSupport(new DefaultTcpSocketSupport(false));
同样,构造函数参数禁用主机验证。
先进技术
本节介绍在某些情况下您可能会发现有用的高级技术。
策略接口
在许多情况下,前面描述的配置就是启用 TCP/IP 上的安全通信所需的全部内容。但是,Spring Integration 提供了许多策略接口来允许自定义和修改套接字工厂和套接字:
-
TcpSSLContextSupport
-
TcpSocketFactorySupport
-
TcpSocketSupport
-
TcpNetConnectionSupport
-
TcpNioConnectionSupport
策略TcpSSLContextSupport
界面
以下清单显示了TcpSSLContextSupport
策略界面:
public interface TcpSSLContextSupport {
SSLContext getSSLContext() throws Exception;
}
接口的实现TcpSSLContextSupport
负责创建SSLContext
对象。框架提供的实现是DefaultTcpSSLContextSupport
前面描述的. 如果您需要不同的行为,请实现此接口并为连接工厂提供对您的类实现的 bean 的引用。
策略TcpSocketFactorySupport
界面
以下清单显示了TcpSocketFactorySupport
策略接口的定义:
public interface TcpSocketFactorySupport {
ServerSocketFactory getServerSocketFactory();
SocketFactory getSocketFactory();
}
此接口的实现负责获取对ServerSocketFactory
和的引用SocketFactory
。提供了两种实现方式。第一个DefaultTcpNetSocketFactorySupport
用于非 SSL 套接字(ssl-context-support
未定义属性时)。这使用 JDK 的默认工厂。第二种实现是DefaultTcpNetSSLSocketFactorySupport
. 默认情况下,这在ssl-context-support
定义属性时使用。它使用该SSLContext
bean 创建的来创建套接字工厂。
此接口仅适用于using-nio is false 。NIO 不使用套接字工厂。
|
策略TcpSocketSupport
界面
以下清单显示了TcpSocketSupport
策略接口的定义:
public interface TcpSocketSupport {
void postProcessServerSocket(ServerSocket serverSocket);
void postProcessSocket(Socket socket);
}
此接口的实现可以在创建套接字之后以及在应用所有配置的属性之后但在使用套接字之前修改套接字。无论您是否使用 NIO,这都适用。例如,您可以使用此接口的实现来修改 SSL 套接字上支持的密码套件,或者您可以添加一个侦听器,在 SSL 握手完成后收到通知。框架提供的唯一实现是DefaultTcpSocketSupport
,它不会以任何方式修改套接字。
要提供您自己的TcpSocketFactorySupport
or实现,请分别TcpSocketSupport
通过设置socket-factory-support
和socket-support
属性为连接工厂提供对您自定义类型的 bean 的引用。
策略TcpNetConnectionSupport
界面
以下清单显示了TcpNetConnectionSupport
策略接口的定义:
public interface TcpNetConnectionSupport {
TcpNetConnection createNewConnection(Socket socket,
boolean server, boolean lookupHost,
ApplicationEventPublisher applicationEventPublisher,
String connectionFactoryName) throws Exception;
}
调用此接口以创建类型TcpNetConnection
(或其子类)的对象。该框架提供了一个单一的实现 ( DefaultTcpNetConnectionSupport
),默认情况下,它会创建简单的TcpNetConnection
对象。它有两个属性:pushbackCapable
和pushbackBufferSize
。启用推回后,实现返回一个子类,该子类将连接包装InputStream
在PushbackInputStream
. 与默认值对齐PushbackInputStream
,缓冲区大小默认为 1。这让反序列化器“未读”(推回)字节到流中。下面的简单示例展示了如何在委托解串器中使用它,该解串器“窥视”第一个字节以确定要调用哪个解串器:
public class CompositeDeserializer implements Deserializer<byte[]> {
private final ByteArrayStxEtxSerializer stxEtx = new ByteArrayStxEtxSerializer();
private final ByteArrayCrLfSerializer crlf = new ByteArrayCrLfSerializer();
@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
PushbackInputStream pbis = (PushbackInputStream) inputStream;
int first = pbis.read();
if (first < 0) {
throw new SoftEndOfStreamException();
}
pbis.unread(first);
if (first == ByteArrayStxEtxSerializer.STX) {
this.receivedStxEtx = true;
return this.stxEtx.deserialize(pbis);
}
else {
this.receivedCrLf = true;
return this.crlf.deserialize(pbis);
}
}
}
策略TcpNioConnectionSupport
界面
以下清单显示了TcpNioConnectionSupport
策略接口的定义:
public interface TcpNioConnectionSupport {
TcpNioConnection createNewConnection(SocketChannel socketChannel,
boolean server, boolean lookupHost,
ApplicationEventPublisher applicationEventPublisher,
String connectionFactoryName) throws Exception;
}
调用此接口来创建TcpNioConnection
对象(或来自子类的对象)。Spring Integration 提供了两种实现:DefaultTcpNioSSLConnectionSupport
和DefaultTcpNioConnectionSupport
. 使用哪一种取决于是否使用 SSL。一个常见的用例是子类化DefaultTcpNioSSLConnectionSupport
和覆盖postProcessSSLEngine
。请参阅SSL 客户端身份验证示例。与 一样DefaultTcpNetConnectionSupport
,这些实现也支持推回。
示例:启用 SSL 客户端身份验证
要在使用 SSL 时启用客户端证书身份验证,该技术取决于您是否使用 NIO。当您不使用 NIO 时,提供自定义TcpSocketSupport
实现来后处理服务器套接字:
serverFactory.setTcpSocketSupport(new DefaultTcpSocketSupport() {
@Override
public void postProcessServerSocket(ServerSocket serverSocket) {
((SSLServerSocket) serverSocket).setNeedClientAuth(true);
}
});
(当您使用 XML 配置时,通过设置socket-support
属性来提供对 bean 的引用)。
使用 NIO 时,提供自定义TcpNioSslConnectionSupport
实现来对 进行后处理SSLEngine
,如以下示例所示:
@Bean
public DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport() {
return new DefaultTcpNioSSLConnectionSupport(serverSslContextSupport) {
@Override
protected void postProcessSSLEngine(SSLEngine sslEngine) {
sslEngine.setNeedClientAuth(true);
}
}
}
@Bean
public TcpNioServerConnectionFactory server() {
...
serverFactory.setTcpNioConnectionSupport(tcpNioConnectionSupport());
...
}
(当您使用 XML 配置时,从版本 4.3.7 开始,通过设置nio-connection-support
属性来提供对您的 bean 的引用)。
IP 配置属性
下表描述了您可以设置以配置 IP 连接的属性:
属性名称 | 客户? | 服务器? | 允许值 | 属性说明 |
---|---|---|---|---|
|
是 |
是 |
客户端服务器 |
确定连接工厂是客户端还是服务器。 |
|
是 |
ñ |
目标的主机名或 IP 地址。 |
|
|
是 |
是 |
港口。 |
|
|
是 |
是 |
|
|
|
是 |
是 |
|
|
|
是 |
是 |
|
连接是否使用 NIO。有关详细信息,请参阅 |
|
是 |
ñ |
|
使用 NIO 时,连接是否使用直接缓冲区。有关详细信息,请参阅 |
|
是 |
是 |
|
当您使用 NIO 时,可能需要对消息重新排序。当此属性设置为 |
|
是 |
是 |
默认为 |
|
|
是 |
是 |
见 |
|
|
是 |
是 |
见 |
|
|
是 |
是 |
|
见 |
|
是 |
是 |
使用提供的值设置 |
|
|
是 |
是 |
|
见 |
|
是 |
是 |
见 |
|
|
ñ |
是 |
在多宿主系统上,指定套接字绑定到的接口的 IP 地址。 |
|
|
是 |
是 |
指定用于套接字处理的特定执行程序。如果未提供,则使用内部缓存线程执行器。在一些需要使用特定任务执行器的平台上需要,例如 |
|
|
是 |
是 |
|
指定一个连接是否可用于多条消息。如果 |
|
ñ |
ñ |
该属性不再使用。为了向后兼容,它设置了积压,但您应该使用它 |
|
|
ñ |
是 |
设置服务器工厂的连接积压。 |
|
|
是 |
是 |
|
指定是否对 IP 地址进行反向查找以转换为主机名以用于消息头。如果为 false,则使用 IP 地址。默认值: |
|
是 |
是 |
请参阅TCP 连接拦截器。 |
|
|
是 |
是 |
||
|
是 |
是 |
||
|
是 |
是 |
请参阅SSL/TLS 支持。 |
|
|
是 |
是 |
请参阅高级技术。 |
|
|
是 |
是 |
长 > 0 |
由于线程不足,上一次尝试失败后重试读取之前的延迟(以毫秒为单位)。默认值:100。仅适用于 |
下表描述了您可以设置以配置 UDP 入站通道适配器的属性:
属性名称 | 允许值 | 属性说明 |
---|---|---|
|
适配器侦听的端口。 |
|
|
|
UDP 适配器是否使用多播。 |
|
当多播为真时,适配器加入的多播地址。 |
|
|
指定可以同时处理的数据包数量。它仅适用于未配置任务执行器的情况。默认值:5。 |
|
任务执行者 |
指定用于套接字处理的特定执行程序。如果未提供,则使用内部池执行器。在一些需要使用特定任务执行器的平台上需要,例如 |
|
|
用于接收的缓冲区的大小 |
|
|
|
UDP 适配器是否需要接收到的数据包中的数据长度字段。用于检测数据包截断。 |
|
有关详细信息,请参阅中的 |
|
|
用于 UDP 确认数据包。有关详细信息,请参阅中的 setSendBufferSize() 方法 |
|
|
有关 |
|
|
在多宿主系统上,指定套接字绑定到的接口的 IP 地址。 |
|
|
如果下游组件抛出异常,则将 |
|
|
|
指定是否对 IP 地址进行反向查找以转换为主机名以用于消息头。如果 |
下表描述了您可以设置以配置 UDP 出站通道适配器的属性:
属性名称 | 允许值 | 属性说明 |
---|---|---|
|
目标的主机名或 IP 地址。对于多播 udp 适配器,多播地址。 |
|
|
目的地港口。 |
|
|
|
udp 适配器是否使用多播。 |
|
|
UDP 适配器是否需要来自目标的确认。启用后,需要设置以下四个属性: |
|
当 |
|
|
当 |
|
|
When |
|
|
默认为 1。对于多播适配器,您可以将其设置为更大的值,这需要来自多个目标的确认。 |
|
|
|
UDP 适配器是否在发送到目的地的数据包中包含数据长度字段。 |
|
对于多播适配器,指定 |
|
|
有关详细信息,请参阅 |
|
|
有关详细信息,请参阅中的 |
|
|
用于 UDP 确认数据包。有关详细信息,请参阅中的 |
|
本地地址 |
在多宿主系统上,对于 UDP 适配器,指定接口的 IP 地址,套接字绑定到该接口以用于回复消息。对于多播适配器,它还确定通过哪个接口发送多播数据包。 |
|
|
指定用于确认处理的特定执行程序。如果未提供,则使用内部单线程执行器。在一些需要使用特定任务执行器的平台上需要,例如 |
|
|
SpEL 表达式 |
要评估的 SpEL 表达式以确定将哪个 |
|
SpEL 表达式 |
要评估的 SpEL 表达式以确定哪个数据报套接字用于发送传出 UDP 数据包。 |
下表描述了您可以设置以配置 TCP 入站通道适配器的属性:
属性名称 | 允许值 | 属性说明 |
---|---|---|
|
入站消息发送到的通道。 |
|
|
如果连接工厂的类型为 |
|
|
如果下游组件抛出异常,则将 |
|
|
|
当 时 |
|
在 中时 |
|
|
|
指定 |
下表描述了您可以设置以配置 TCP 出站通道适配器的属性:
属性名称 | 允许值 | 属性说明 |
---|---|---|
|
出站消息到达的通道。 |
|
|
如果连接工厂的类型为 |
|
|
|
时 |
|
在 中时 |
|
|
|
指定 |
下表描述了您可以设置以配置 TCP 入站网关的属性:
属性名称 | 允许值 | 属性说明 |
---|---|---|
|
连接工厂必须是服务器类型。 |
|
|
传入消息发送到的通道。 |
|
|
回复消息可能到达的通道。通常,回复到达添加到入站消息头的临时回复通道。 |
|
|
网关等待回复的时间(以毫秒为单位)。默认值:1000(1 秒)。 |
|
|
如果下游组件抛出异常,则将 |
|
|
|
当 时 |
|
在 中时 |
|
|
|
指定 |
下表描述了您可以设置以配置 TCP 出站网关的属性:
属性名称 | 允许值 | 属性说明 |
---|---|---|
|
连接工厂的类型必须是 |
|
|
传出消息到达的通道。 |
|
|
可选的。回复消息发送到的通道。 |
|
|
网关等待远程系统回复的时间(以毫秒为单位)。与 互斥 |
|
|
一个 SpEL 表达式,根据消息评估以确定网关等待远程系统回复的时间(以毫秒为单位)。与 互斥 |
|
|
如果未使用一次性连接工厂,则网关等待访问共享连接的时间(以毫秒为单位)。 |
|
|
网关在向回复通道发送回复时等待的时间(以毫秒为单位)。仅当回复通道可能阻塞时才适用(例如当前已满的有界 QueueChannel)。 |
|
|
发送后释放发送线程;回复(或错误)将在接收线程上发送。 |
|
|
向其发送未经请求的消息和延迟回复的通道。 |
IP 消息头
该模块使用以下MessageHeader
实例:
标题名称 | IpHeaders 常量 | 描述 |
---|---|---|
|
|
从中接收 TCP 消息或 UDP 数据包的主机名。如果 |
|
|
接收 TCP 消息或 UDP 数据包的 IP 地址。 |
|
|
UDP 数据包的远程端口。 |
ip_localInetAddress |
|
|
|
|
向其发送 UDP 应用程序级确认的远程 IP 地址。该框架在数据包中包含确认信息。 |
|
|
UDP 应用程序级确认的相关 ID。该框架在数据包中包含确认信息。 |
|
|
TCP 连接的远程端口。 |
|
|
TCP 连接的唯一标识符。由框架为入站消息设置。当发送到服务器端入站通道适配器或回复入站网关时,需要此标头,以便端点可以确定要将消息发送到的连接。 |
|
|
仅供参考。使用缓存或故障转移客户端连接工厂时,它包含实际的底层连接 ID。 |
|
|
入站消息的可选内容类型 在此表之后进行描述。请注意,与其他标头常量不同,此常量位于 |
对于入站消息,默认情况下会映射ip_hostname
、ip_address
、ip_tcp_remotePort
和。如果您将映射器的属性设置为,则映射器会设置标头(默认情况下为 )。您可以通过设置属性来更改默认值。您可以通过子类化和覆盖该方法来添加其他标头。例如,当您使用 SSL 时,您可以通过从对象中获取会话对象来添加 的属性,会话对象作为方法的参数提供。ip_connectionId
TcpHeaderMapper
addContentTypeHeader
true
contentType
application/octet-stream;charset="UTF-8"
contentType
TcpHeaderMapper
supplyCustomHeaders
SSLSession
TcpConnection
supplyCustomHeaders
对于出站消息,String
有效负载将转换为byte[]
使用默认 ( UTF-8
) 字符集。设置charset
属性以更改默认值。
mapper
在自定义映射器属性或子类化时,将映射器声明为 bean,并使用该属性向连接工厂提供实例。
基于注释的配置
示例存储库中的以下示例显示了当您使用注释而不是 XML 时可用的一些配置选项:
@EnableIntegration (1)
@IntegrationComponentScan (2)
@Configuration
public static class Config {
@Value(${some.port})
private int port;
@MessagingGateway(defaultRequestChannel="toTcp") (3)
public interface Gateway {
String viaTcp(String in);
}
@Bean
@ServiceActivator(inputChannel="toTcp") (4)
public MessageHandler tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
TcpOutboundGateway gate = new TcpOutboundGateway();
gate.setConnectionFactory(connectionFactory);
gate.setOutputChannelName("resultToString");
return gate;
}
@Bean (5)
public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory) {
TcpInboundGateway inGate = new TcpInboundGateway();
inGate.setConnectionFactory(connectionFactory);
inGate.setRequestChannel(fromTcp());
return inGate;
}
@Bean
public MessageChannel fromTcp() {
return new DirectChannel();
}
@MessageEndpoint
public static class Echo { (6)
@Transformer(inputChannel="fromTcp", outputChannel="toEcho")
public String convert(byte[] bytes) {
return new String(bytes);
}
@ServiceActivator(inputChannel="toEcho")
public String upCase(String in) {
return in.toUpperCase();
}
@Transformer(inputChannel="resultToString")
public String convertResult(byte[] bytes) {
return new String(bytes);
}
}
@Bean
public AbstractClientConnectionFactory clientCF() { (7)
return new TcpNetClientConnectionFactory("localhost", this.port);
}
@Bean
public AbstractServerConnectionFactory serverCF() { (8)
return new TcpNetServerConnectionFactory(this.port);
}
}
1 | 为集成应用程序启用基础设施的标准 Spring Integration 注释。 |
2 | 搜索@MessagingGateway 接口。 |
3 | 流的客户端的入口点。调用应用程序可以使用@Autowired 这个Gateway bean 并调用它的方法。 |
4 | 出站端点由一个MessageHandler 和一个包装它的消费者组成。在这种情况下,@ServiceActivator 根据通道类型配置端点。 |
5 | 入站端点(在 TCP/UDP 模块中)都是消息驱动的,因此只需要声明为简单@Bean 实例。 |
6 | 此类提供了许多用于此示例流程的 POJO 方法(一个在服务器端,一个@Transformer 在客户端)。@ServiceActivator @Transformer |
7 | 客户端连接工厂。 |
8 | 服务器端连接工厂。 |
将 Java DSL 用于 TCP 组件
对 TCP 组件的 DSL 支持包括适配器和网关规范、Tcp
具有创建连接工厂 bean 的TcpCodecs
工厂方法的类以及具有创建序列化器和反序列化器的工厂方法的类。有关更多信息,请参阅他们的 javadocs。
下面是一些使用 DSL 配置流的示例。
@Bean
public IntegrationFlow server() {
return IntegrationFlows.from(Tcp.inboundAdapter(Tcp.netServer(1234)
.deserializer(TcpCodecs.lengthHeader1())
.backlog(30))
.errorChannel("tcpIn.errorChannel")
.id("tcpIn"))
.transform(Transformers.objectToString())
.channel("tcpInbound")
.get();
}
@Bean
public IntegrationFlow client() {
return f -> f.handle(Tcp.outboundAdapter(Tcp.nioClient("localhost", 1234)
.serializer(TcpCodecs.lengthHeader1())));
}
@Bean
public IntegrationFlow server() {
return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234)
.deserializer(TcpCodecs.lengthHeader1())
.serializer(TcpCodecs.lengthHeader1())
.backlog(30))
.errorChannel("tcpIn.errorChannel")
.id("tcpIn"))
.transform(Transformers.objectToString())
.channel("tcpInbound")
.get();
}
@Bean
public IntegrationFlow client() {
return f -> f.handle(Tcp.outboundGateway(Tcp.nioClient("localhost", 1234)
.deserializer(TcpCodecs.lengthHeader1())
.serializer(TcpCodecs.lengthHeader1())));
}