信息

Spring IntegrationMessage是一个通用的数据容器。任何对象都可以作为有效负载提供,每个Message实例都包含包含用户可扩展属性作为键值对的标头。

Message界面_

以下清单显示了Message接口的定义:

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message接口是 API 的核心部分。通过将数据封装在通用包装器中,消息传递系统可以在不知道数据类型的情况下传递它。随着应用程序发展以支持新类型,或者当类型本身被修改或扩展时,消息传递系统不会受到影响。另一方面,当消息传递系统中的某些组件确实需要访问有关Message.

消息头

正如 Spring Integration 允许Object将 any 用作 a 的有效负载一样Message,它也支持任何Object类型作为标头值。实际上,MessageHeaders该类实现了java.util.Map_ interface,如以下类定义所示:

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
即使MessageHeaders该类实现Map了 ,它实际上也是一个只读实现。put对 Map 中的值的任何尝试都会导致UnsupportedOperationException. 这同样适用于removeclear。由于消息可能会传递给多个消费者,因此Map无法修改其结构。同样,消息的有效负载Object不能set在初始创建之后。但是,标头值本身(或有效负载对象)的可变性故意留给框架用户决定。

作为 的实现Map,可以通过使用标头的名称调用来检索get(..)标头。或者,您可以提供预期Class作为附加参数。更好的是,在检索预定义值之一时,可以使用方便的 getter。以下示例显示了这三个选项中的每一个:

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预定义的消息头:

表 1. 预定义的消息头
标题名称 标题类型 用法
MessageHeaders.ID
java.util.UUID

此消息实例的标识符。每次更改消息时都会更改。

消息头。
时间戳
java.lang.Long

创建消息的时间。每次更改消息时都会更改。

消息头。
REPLY_CHANNEL
java.lang.Object
(字符串或
消息频道)

当没有配置显式输出通道并且没有ROUTING_SLIPROUTING_SLIP已用尽时,向其发送回复(如果有)的通道。如果值为 a String,则它必须代表一个 bean 名称或由 a 生成ChannelRegistry.

消息头。
ERROR_CHANNEL
java.lang.Object
(字符串或
消息频道)

向其发送错误的通道。如果值为 a String,它必须代表一个 bean 名称或由 a 生成ChannelRegistry.

许多入站和出站适配器实现也提供或期望某些标头,您可以配置其他用户定义的标头。这些标头的常量可以在存在此类标头的那些模块中找到 - 例如。 AmqpHeaders, JmsHeaders, 等等。

MessageHeaderAccessorAPI

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息传递抽象已移至spring-messaging模块,并MessageHeaderAccessor引入了 API 以提供对消息传递实现的额外抽象。所有(核心)Spring Integration 特定的消息头常量现在都在IntegrationMessageHeaderAccessor类中声明。下表描述了预定义的消息头:

表 2. 预定义的消息头
标题名称 标题类型 用法
集成消息头访问器。
CORRELATION_ID
java.lang.Object

用于关联两个或多个消息。

集成消息头访问器。
序列号
java.lang.Integer

通常一个带有 a 的消息组的序列号,SEQUENCE_SIZE但也可以用在 a<resequencer/>中以对无限制的消息组重新排序。

集成消息头访问器。
SEQUENCE_SIZE
java.lang.Integer

一组相关消息中的消息数。

集成消息头访问器。
截止日期
java.lang.Long

指示消息何时过期。框架不直接使用,但可以使用标头丰富器进行设置,并用于<filter/>配置了UnexpiredMessageSelector.

集成消息头访问器。
优先
java.lang.Integer

消息优先级 - 例如,在PriorityChannel.

集成消息头访问器。
DUPLICATE_MESSAGE
java.lang.Boolean

如果消息被幂等接收器拦截器检测为重复消息,则为真。请参阅幂等接收器企业集成模式

集成消息头访问器。
CLOSEABLE_RESOURCE
java.io.Closeable

Closeable如果消息与在消息处理完成时应关闭的消息相关联,则存在此标头。一个示例是Session与使用 FTP、SFTP 等的流式文件传输相关联。

集成消息头访问器。
DELIVERY_ATTEMPT
java.lang.
原子整数

如果消息驱动的通道适配器支持 a 的配置RetryTemplate,则此标头包含当前的传递尝试。

集成消息头访问器。
ACKNOWLEDGMENT_CALLBACK
支持。
致谢
打回来

如果入站端点支持它,则回调以接受、拒绝或重新排队消息。请参阅延迟确认可轮询消息源MQTT 手动确认。

类中为其中一些标头提供了方便的类型化 getter IntegrationMessageHeaderAccessor,如以下示例所示:

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出现在IntegrationMessageHeaderAccessor但通常不被用户代码使用的标题(也就是说,它们通常被 Spring Integration 的内部部分使用——它们包含在此处是为了完整性):

表 3. 预定义的消息头
标题名称 标题类型 用法
集成消息头访问器。
SEQUENCE_DETAILS
java.util.
列表<列表<对象>>

需要嵌套相关时使用的相关数据堆栈(例如, splitter→…​→splitter→…​→aggregator→…​→aggregator)。

集成消息头访问器。
ROUTING_SLIP
java.util.
地图<列表<对象>,整数>

请参阅传送单

消息 ID 生成

当消息通过应用程序转换时,每次它被改变(例如,通过转换器)时,都会分配一个新的消息 ID。消息 ID 是UUID. 从 Spring Integration 3.0 开始,用于 IS 生成的默认策略比之前的java.util.UUID.randomUUID()实现更高效。它使用基于安全随机种子的简单随机数,而不是每次都创建一个安全随机数。

可以通过声明org.springframework.util.IdGenerator在应用程序上下文中实现的 bean 来选择不同的 UUID 生成策略。

在类加载器中只能使用一种 UUID 生成策略。这意味着,如果两个或多个应用程序上下文在同一个类加载器中运行,它们共享相同的策略。如果其中一个上下文改变了策略,则它被所有上下文使用。如果同一个类加载器中的两个或多个上下文声明了一个 type 的 bean org.springframework.util.IdGenerator,它们必须都是同一个类的实例。否则,尝试替换自定义策略的上下文无法初始化。如果策略相同,但已参数化,则使用要初始化的第一个上下文中的策略。

除了默认策略之外,IdGenerators还提供了两个附加策略。 org.springframework.util.JdkIdGenerator使用之前的UUID.randomUUID()机制。您可以o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator在真正不需要 UUID 并且简单的递增值就足够时使用。

只读标头

和是只读标头,不能被覆盖MessageHeaders.IDMessageHeaders.TIMESTAMP

从版本 4.3.2 开始,MessageBuilder提供readOnlyHeaders(String…​ readOnlyHeaders)API 来自定义不应从上游复制的标头列表Message。只有MessageHeaders.IDandMessageHeaders.TIMESTAMP是默认只读的。提供全局spring.integration.readOnly.headers属性(请参阅全局属性)以自定义DefaultMessageBuilderFactory框架组件。当您不希望填充一些开箱即用的标头时,这可能很有用,例如contentTypeObjectToJsonTransformer请参阅JSON Transformers)。

当您尝试使用 构建新消息MessageBuilder时,会忽略此类标头,并将特定INFO消息发送到日志。

从 5.0 版开始,Messaging GatewayHeader EnricherContent EnricherHeader Filter不允许您在使用时配置MessageHeaders.IDMessageHeaders.TIMESTAMPheader 名称DefaultMessageBuilderFactory,它们会抛出BeanInitializationException.

标头传播

当消息由产生消息的端点(例如服务激活器)处理(和修改)时,通常会将入站标头传播到出站消息。一个例外是转换器,当完整的消息返回到框架时。在这种情况下,用户代码负责整个出站消息。当转换器仅返回有效负载时,会传播入站标头。此外,仅当出站消息中不存在标头时才会传播标头,让您可以根据需要更改标头值。

从版本 4.3.10 开始,您可以配置消息处理程序(修改消息并产生输出)以抑制特定标头的传播。要配置您不想被复制的标头,请调用抽象类上的setNotPropagatedHeaders()addNotPropagatedHeaders()方法。MessageProducingMessageHandler

readOnlyHeaders您还可以通过将属性设置META-INF/spring.integration.properties为以逗号分隔的标头列表来全局抑制特定消息标头的传播。

从 5.0 版开始,setNotPropagatedHeaders()应用AbstractMessageProducingHandler简单模式(xxx*xxx*xxxxxx*yyy)的实现允许过滤具有通用后缀或前缀的标头。有关详细信息,请参阅PatternMatchUtilsJavadoc。当其中一种模式是*(星号)时,不会传播任何标头。所有其他模式都将被忽略。在这种情况下,服务激活器的行为方式与转换器相同,并且必须在Message服务方法的返回中提供任何所需的标头。该notPropagatedHeaders()选项可ConsumerEndpointSpec用于 Java DSL 也可用于<service-activator>组件的 XML 配置作为not-propagated-headers属性。

报头传播抑制不适用于那些不修改消息的端点,例如网桥路由器

消息实现

Message接口的基本实现是GenericMessage<T>,它提供了两个构造函数,如下面的清单所示:

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

创建a 时Message,会生成一个随机唯一 ID。接受 a Mapof headers 的构造函数将提供的 headers 复制到新创建的Message.

还有一个方便的实现,Message旨在传达错误条件。此实现将Throwable对象作为其有效负载,如以下示例所示:

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了GenericMessage基类已参数化这一事实。Message因此,如两个示例中所示,在检索有效负载时不需要强制转换Object

MessageBuilder助手类

您可能会注意到该Message接口为其有效负载和标头定义了检索方法,但没有提供设置器。这样做的原因是 aMessage在其初始创建后无法修改。因此,当一个Message实例发送给多个消费者(例如,通过发布-订阅通道)时,如果其中一个消费者需要发送具有不同负载类型的回复,则必须创建一个新的Message. 因此,其他消费者不受这些变化的影响。请记住,多个消费者可能会访问相同的有效负载实例或标头值,而此类实例本身是否不可变则由您决定。换句话说,Message实例的契约类似于不可修改的契约Collection,并且MessageHeadersmap 进一步说明了这一点。即使MessageHeaders该类实现了java.util.Map,任何put对实例调用操作(或“删除”或“清除”)的尝试都会MessageHeaders导致UnsupportedOperationException.

Spring Integration 确实提供了一种更方便的方式来构造消息,而不是要求创建和填充 Map 以传递给 GenericMessage 构造函数:MessageBuilder. 提供MessageBuilder了两种工厂方法,用于Message从现有Message或使用有效负载创建实例Object。从现有的 构建时Message,其标头和有效负载Message将复制到新的Message,如以下示例所示:

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要Message使用新的有效负载创建一个,但仍想从现有的 复制标头Message,则可以使用“复制”方法之一,如以下示例所示:

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,该copyHeadersIfAbsent方法不会覆盖现有值。此外,在前面的示例中,您可以看到如何使用setHeader. 最后,有一些set方法可用于预定义的标头以及用于设置任何标头的非破坏性方法(MessageHeaders还为预定义的标头名称定义常量)。

您还可以使用MessageBuilder设置消息的优先级,如以下示例所示:

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

priority仅在使用 a 时才考虑标头(PriorityChannel如下一章所述)。它被定义为一个java.lang.Integer.


1. see XML Configuration