消息转换

变压器

消息转换器在实现消息生产者和消息消费者的松散耦合方面发挥着非常重要的作用。您可以在这些组件之间添加转换器,而不是要求每个生成消息的组件都知道下一个消费者期望什么类型。通用转换器,例如将 a 转换String为 XML 文档的转换器,也是高度可重用的。

对于某些系统,最好提供规范的数据模型,但 Spring Integration 的一般理念是不要求任何特定格式。相反,为了获得最大的灵活性,Spring Integration 旨在提供最简单的扩展模型。与其他端点类型一样,在 XML 或 Java 注释中使用声明性配置可以使简单的 POJO 适应消息转换器的角色。本章的其余部分描述了这些配置选项。

为了最大限度地提高灵活性,Spring 不需要基于 XML 的消息负载。尽管如此,如果这确实是您的应用程序的正确选择,该框架确实提供了一些方便的转换器来处理基于 XML 的有效负载。有关这些转换器的更多信息,请参阅XML 支持 - 处理 XML 有效负载

使用 XML 配置转换器

<transformer>元素用于创建消息转换端点。除了input-channeloutput-channel属性,它还需要一个ref属性。ref可以指向包含@Transformer单个方法上的注释的对象(请参阅使用注释配置转换器),或者它可以与method属性中提供的显式方法名称值组合。

<int:transformer id="testTransformer" ref="testTransformerBean" input-channel="inChannel"
             method="transform" output-channel="outChannel"/>
<beans:bean id="testTransformerBean" class="org.foo.TestTransformer" />

ref如果自定义转换器处理程序实现可以在其他<transformer>定义中重用,则通常建议使用属性。但是,如果自定义转换器处理程序实现的范围应限定为 的单个定义,则<transformer>可以定义内部 bean 定义,如以下示例所示:

<int:transformer id="testTransformer" input-channel="inChannel" method="transform"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestTransformer"/>
</transformer>
不允许在同一配置中 同时使用ref属性和内部处理程序定义,因为它会创建模棱两可的条件并导致引发异常。<transformer>
如果ref属性引用了扩展的 bean AbstractMessageProducingHandler(例如框架本身提供的转换器),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个都ref必须是一个单独的 bean 实例(或一个prototype-scoped bean)或使用内部<bean/>配置类型。如果您无意中从多个 bean 中引用了相同的消息处理程序,您会得到一个配置异常。

使用 POJO 时,用于转换的方法可能需要Message入站消息的类型或负载类型。@Header它还可以分别使用和@Headers参数注释单独或作为完整映射接受消息头值。方法的返回值可以是任何类型。如果返回值本身是 a Message,则将其传递到转换器的输出通道。

从 Spring Integration 2.0 开始,消息转换器的转换方法不能再返回null。返回null导致异常,因为应该始终期望消息转换器将每个源消息转换为有效的目标消息。换句话说,消息转换器不应该用作消息过滤器,因为有一个专门的<filter>选项。但是,如果您确实需要这种类型的行为(组件可能会返回null并且不应被视为错误),您可以使用服务激活器。它的requires-reply值是false默认值,但可以设置为true以便为null返回值抛出异常,就像转换器一样。

Transformers 和 Spring 表达式语言 (SpEL)

与路由器、聚合器和其他组件一样,从 Spring Integration 2.0 开始,只要转换逻辑相对简单,转换器也可以从SpEL 支持中受益。以下示例显示了如何使用 SpEL 表达式:

<int:transformer input-channel="inChannel"
	output-channel="outChannel"
	expression="payload.toUpperCase() + '- [' + T(System).currentTimeMillis() + ']'"/>

前面的示例在不编写自定义转换器的情况下转换有效负载。我们的有效负载(假设为 a String)是大写的,与当前时间戳连接,并应用了一些格式。

普通变压器

Spring Integration 提供了一些转换器实现。

对象到字符串转换器

因为使用 a 的toString()表示是相当普遍的Object,所以 Spring Integration 提供了 an ObjectToStringTransformer,其输出是Message带有 String的 a payload。这是对入站消息的有效负载String调用操作的结果。toString()以下示例显示了如何声明对象到字符串转换器的实例:

<int:object-to-string-transformer input-channel="in" output-channel="out"/>

该转换器的潜在用途是将一些任意对象发送到file命名空间中的“出站通道适配器”。虽然该通道适配器默认仅支持String、字节数组或java.io.File有效负载,但在适配器处理必要的转换之前立即添加此转换器。只要toString()调用的结果是您想要写入文件的内容,它就可以正常工作。否则,您可以使用前面显示的通用“transformer”元素来提供自定义的基于 POJO 的转换器。

调试时,通常不需要此转换器,因为logging-channel-adapter它能够记录消息有效负载。有关更多详细信息,请参见Wire Tap

对象到字符串的转换器非常简单。toString()它在入站有效负载上调用。从 Spring Integration 3.0 开始,这条规则有两个例外:

  • 如果有效载荷是 a char[],它会调用new String(payload).

  • 如果有效载荷是 a byte[],它会调用new String(payload, charset)charset默认情况下是 UTF-8。charset可以通过在转换器上提供字符集属性来修改。

为了更复杂(例如在运行时动态选择字符集),您可以改用基于 SpEL 表达式的转换器,如以下示例所示:

<int:transformer input-channel="in" output-channel="out"
       expression="new java.lang.String(payload, headers['myCharset']" />

如果您需要将 a 序列化为Object字节数组或将字节数组反序列化回Object,Spring Integration 提供了对称的序列化转换器。默认情况下,它们使用标准 Java 序列化,但您可以分别使用和属性提供 SpringSerializer或策略的实现。下面的例子展示了如何使用 Spring 的序列化器和反序列化器:Deserializerserializerdeserializer

<int:payload-serializing-transformer input-channel="objectsIn" output-channel="bytesOut"/>

<int:payload-deserializing-transformer input-channel="bytesIn" output-channel="objectsOut"
    allow-list="com.mycom.*,com.yourcom.*"/>
从不受信任的来源反序列化数据时,您应该考虑添加一个allow-list包和类模式。默认情况下,所有类都被反序列化。
Object对变压器Map和对Map变压器Object

Spring Integration 还提供了Object-to-MapMap-to-Object转换器,它们使用 JSON 来序列化和反序列化对象图。对象层次结构内省为最原始的类型(Stringint等)。这种类型的路径用 SpEL 描述,key在转换后的Map. 原始类型成为值。

考虑以下示例:

public class Parent{
    private Child child;
    private String name;
    // setters and getters are omitted
}

public class Child{
    private String name;
    private List<String> nickNames;
    // setters and getters are omitted
}

前面示例中的两个类转换为以下内容Map

{person.name=George, person.child.name=Jenna, person.child.nickNames[0]=Jen ...}

基于 JSON 的方法Map可以让您在不共享实际类型的情况下描述对象结构,只要您维护结构,就可以将对象图恢复并重建为不同类型的对象图。

Map例如,可以通过使用-to-Object转换器将前面的结构恢复为以下对象图:

public class Father {
    private Kid child;
    private String name;
    // setters and getters are omitted
}

public class Kid {
    private String name;
    private List<String> nickNames;
    // setters and getters are omitted
}

如果您需要创建“结构化”地图,您可以提供flatten属性。默认值为“真”。如果将其设置为“false”,则结构是Map对象Map

考虑以下示例:

public class Parent {
	private Child child;
	private String name;
	// setters and getters are omitted
}

public class Child {
	private String name;
	private List<String> nickNames;
	// setters and getters are omitted
}

前面示例中的两个类转换为以下内容Map

{name=George, child={name=Jenna, nickNames=[Bimbo, ...]}}

为了配置这些转换器,Spring Integration 为 Object-to-Map 提供命名空间支持,如以下示例所示:

<int:object-to-map-transformer input-channel="directInput" output-channel="output"/>

您还可以将flatten属性设置为 false,如下所示:

<int:object-to-map-transformer input-channel="directInput" output-channel="output" flatten="false"/>

Spring Integration 为 Map-to-Object 提供命名空间支持,如以下示例所示:

<int:map-to-object-transformer input-channel="input"
                         output-channel="output"
                         type="org.something.Person"/>

或者,您可以使用ref属性和原型范围的 bean,如以下示例所示:

<int:map-to-object-transformer input-channel="inputA"
                               output-channel="outputA"
                               ref="person"/>
<bean id="person" class="org.something.Person" scope="prototype"/>
'ref' 和 'type' 属性是互斥的。此外,如果您使用 'ref' 属性,则必须指向一个 'prototype' 范围的 bean。否则,BeanCreationException抛出 a。

从 5.0 版开始,您可以提供ObjectToMapTransformer自定义的JsonObjectMapper - 当您需要特殊格式的日期或空集合(和其他用途)的空值时。有关实现的更多信息,请参阅JSON 转换JsonObjectMapper器。

流转换器

将有效负载转换为 a StreamTransformer(如果提供了 a ,则为 a )。InputStreambyte[]Stringcharset

下面的例子展示了如何stream-transformer在 XML 中使用元素:

<int:stream-transformer input-channel="directInput" output-channel="output"/> <!-- byte[] -->

<int:stream-transformer id="withCharset" charset="UTF-8"
    input-channel="charsetChannel" output-channel="output"/> <!-- String -->

以下示例展示了如何使用StreamTransformer类和@Transformer注解在 Java 中配置流转换器:

@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToBytes() {
    return new StreamTransformer(); // transforms to byte[]
}

@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public StreamTransformer streamToString() {
    return new StreamTransformer("UTF-8"); // transforms to String
}
JSON 转换器

Spring Integration 提供 Object-to-JSON 和 JSON-to-Object 转换器。以下一对示例展示了如何在 XML 中声明它们:

<int:object-to-json-transformer input-channel="objectMapperInput"/>
<int:json-to-object-transformer input-channel="objectMapperInput"
    type="foo.MyDomainObject"/>

默认情况下,前面清单中的转换器使用 vanilla JsonObjectMapper。它基于类路径的实现。JsonObjectMapper您可以使用适当的选项或基于所需的库(例如 GSON)提供自己的自定义实现,如以下示例所示:

<int:json-to-object-transformer input-channel="objectMapperInput"
    type="something.MyDomainObject" object-mapper="customObjectMapper"/>

从 3.0 版开始,该object-mapper属性引用了一个新策略接口的实例:JsonObjectMapper. 这种抽象允许使用 JSON 映射器的多种实现。提供了包装Jackson 2的实现,并在类路径中检测到版本。类Jackson2JsonObjectMapper分别是 。

您可能希望考虑使用 aFactoryBean或工厂方法来创建JsonObjectMapper具有所需特性的 。下面的例子展示了如何使用这样的工厂:

public class ObjectMapperFactory {

    public static Jackson2JsonObjectMapper getMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
        return new Jackson2JsonObjectMapper(mapper);
    }
}

下面的例子展示了如何在 XML 中做同样的事情

<bean id="customObjectMapper" class="something.ObjectMapperFactory"
            factory-method="getMapper"/>

从 2.2 版开始,默认情况下,如果输入消息还没有该标头,则将object-to-json-transformercontent-type头设置为 。application/json

如果您希望将content-type标头设置为其他值或使用某个值(包括application/json)显式覆盖任何现有标头,请使用该content-type属性。如果您希望禁止设置标题,请将content-type属性设置为空字符串 ( "")。这样做会导致消息没有content-type标题,除非输入消息中存在这样的标题。

从版本 3.0 开始,ObjectToJsonTransformer将反映源类型的标头添加到消息中。同样,JsonToObjectTransformer在将 JSON 转换为对象时可以使用这些类型的标头。这些标头映射在 AMQP 适配器中,因此它们与 Spring-AMQP 完全兼容JsonMessageConverter

这使得以下流程无需任何特殊配置即可工作:

  • …​→amqp-outbound-adapter---→

  • ---→amqp-inbound-adapter→json-to-object-transformer→…​

    其中出站适配器配置了 aJsonMessageConverter而入站适配器使用默认的SimpleMessageConverter.

  • …​→object-to-json-transformer→amqp-outbound-adapter---→

  • ---→amqp-inbound-adapter→…​

    其中出站适配器配置了 aSimpleMessageConverter而入站适配器使用默认的JsonMessageConverter.

  • …​→object-to-json-transformer→amqp-outbound-adapter---→

  • ---→amqp-inbound-adapter→json-to-object-transformer→

    两个适配器都配置了SimpleMessageConverter.

使用标头确定类型时,不应提供class属性,因为它优先于标头。

除了 JSON Transformers,Spring Integration 还提供了一个内置的#jsonPathSpEL 函数用于表达式。有关更多信息,请参阅Spring 表达式语言 (SpEL)

从 3.0 版开始,Spring Integration 还提供了一个内置的#xpathSpEL 函数用于表达式。有关详细信息,请参阅#xpath SpEL 函数

从版本 4.0 开始,ObjectToJsonTransformer支持resultType属性,以指定节点 JSON 表示。结果节点树表示取决于提供的JsonObjectMapper. 默认情况下,ObjectToJsonTransformer使用 aJackson2JsonObjectMapper并将对象到节点树的转换委托给ObjectMapper#valueToTree方法。JsonPropertyAccessor当下游消息流使用 SpEL 表达式访问 JSON 数据的属性时,节点 JSON 表示提供了使用效率。有关详细信息,请参阅属性访问器。

从 5.1 版开始,resultType可以将其配置为在使用使用此数据类型操作的下游处理程序时BYTES生成带有有效负载的消息,以便于使用。byte[]

从版本 5.2 开始,JsonToObjectTransformer可以使用 a 配置以ResolvableType在使用目标 JSON 处理器进行反序列化期间支持泛型。此外,此组件现在首先查询请求消息标头以了解 or 的存在,JsonHeaders.RESOLVABLE_TYPE否则JsonHeaders.TYPE_ID回退到配置的类型。现在还根据任何可能的下游场景的请求消息有效负载ObjectToJsonTransformer填充标头。JsonHeaders.RESOLVABLE_TYPE

从版本 5.2.6 开始,JsonToObjectTransformer可以提供 avalueTypeExpression来解析 a ResolvableType,以便有效负载在运行时针对请求消息从 JSON 转换。默认情况下,它会JsonHeaders在请求消息中进行咨询。如果此表达式返回nullResolvableType构建抛出 a ClassNotFoundException,则转换器将回退到提供的targetType. 这个逻辑以表达式的形式出现,因为JsonHeaders可能没有真正的类值,而是一些类型 id,必须根据一些外部注册表映射到目标类。

Apache Avro 变形金刚

5.2 版添加了简单的转换器来转换到/从 Apache Avro。

它们不复杂,因为没有模式注册表;转换器只需使用嵌入在SpecificRecord从 Avro 模式生成的实现中的模式。

发送到的消息SimpleToAvroTransformer必须具有实现的有效负载SpecificRecord;变压器可以处理多种类型。SimpleFromAvroTransformer必须配置一个类,SpecificRecord该类用作反序列化的默认类型。您还可以指定一个 SpEL 表达式来确定使用该setTypeExpression方法反序列化的类型。默认 SpEL 表达式是headers[avro_type]( AvroHeaders.TYPE),默认情况下,它由SimpleToAvroTransformer源类的完全限定类名填充。如果表达式返回nulldefaultType则使用 。

SimpleToAvroTransformer也有方法setTypeExpression。这允许生产者和消费者解耦,发送者可以将标头设置为表示类型的某个令牌,然后消费者将该令牌映射到一个类型。

使用注解配置转换器

您可以将@Transformer注释添加到需要Message类型或消息负载类型的方法。返回值的处理方式与前面描述元素的部分中描述的<transformer>方式完全相同。以下示例显示如何使用@Transformer注解将 aString转换为Order

@Transformer
Order generateOrder(String productId) {
    return new Order(productId);
}

Transformer 方法也可以接受@Header@Headers注释,如Annotation Support. 以下示例展示了如何使用@Header注解:

@Transformer
Order generateOrder(String productId, @Header("customerName") String customer) {
    return new Order(productId, customer);
}

标题过滤器

有时,您的转换用例可能就像删除一些标题一样简单。对于这样的用例,Spring Integration 提供了一个 header 过滤器,允许您指定应该从输出消息中删除的某些 header 名称(例如,出于安全原因删除 headers 或仅临时需要的值)。基本上,标头过滤器与标头丰富器相反。后者在Header Enricher中讨论。以下示例定义了一个标头过滤器:

<int:header-filter input-channel="inputChannel"
		output-channel="outputChannel" header-names="lastName, state"/>

如您所见,标头过滤器的配置非常简单。它是具有输入和输出通道和header-names属性的典型端点。该属性接受需要删除的标题的名称(如果有多个,则用逗号分隔)。因此,在前面的示例中,出站消息中不存在名为“lastName”和“state”的标头。

基于编解码器的转换器

请参阅编解码器

内容丰富器

有时,您可能需要使用比目标系统提供的更多信息来增强请求。数据丰富器模式描述了各种场景以及可让您满足此类需求的组件 (Enricher)。

Spring IntegrationCore模块包括两个丰富器:

它还包括三个特定于适配器的标头丰富器:

请参阅本参考手册的适配器特定部分以了解有关这些适配器的更多信息。

有关表达式支持的更多信息,请参阅Spring Expression Language (SpEL)

标题丰富器

如果您只需要向消息添加标头,并且标头不是由消息内容动态确定的,那么引用转换器的自定义实现可能会过大。出于这个原因,Spring Integration 提供了对标头丰富器模式的支持。它通过<header-enricher>元素暴露出来。以下示例显示了如何使用它:

<int:header-enricher input-channel="in" output-channel="out">
    <int:header name="foo" value="123"/>
    <int:header name="bar" ref="someBean"/>
</int:header-enricher>

标题丰富器还提供有用的子元素来设置众所周知的标题名称,如以下示例所示:

<int:header-enricher input-channel="in" output-channel="out">
    <int:error-channel ref="applicationErrorChannel"/>
    <int:reply-channel ref="quoteReplyChannel"/>
    <int:correlation-id value="123"/>
    <int:priority value="HIGHEST"/>
    <routing-slip value="channel1; routingSlipRoutingStrategy; request.headers[myRoutingSlipChannel]"/>
    <int:header name="bar" ref="someBean"/>
</int:header-enricher>

前面的配置表明,对于众所周知的标题(例如errorChannel, correlationId, priority, replyChannel,routing-slip等),<header>您可以使用方便的子元素,而不是使用必须同时提供标题“名称”和“值”的通用子元素-elements 直接设置这些值。

从版本 4.1 开始,标题丰富器提供了一个routing-slip子元素。有关详细信息,请参阅路由表。

POJO 支持

通常,标头值不能静态定义,必须根据消息中的某些内容动态确定。ref这就是为什么标头丰富器还允许您使用andmethod属性来指定 bean 引用。指定的方法计算标题值。考虑以下配置和一个带有修改 a 的方法的 bean String

<int:header-enricher input-channel="in" output-channel="out">
    <int:header name="something" method="computeValue" ref="myBean"/>
</int:header-enricher>

<bean id="myBean" class="thing1.thing2.MyBean"/>
public class MyBean {

    public String computeValue(String payload){
        return payload.toUpperCase() + "_US";
    }
}

您还可以将 POJO 配置为内部 bean,如以下示例所示:

<int:header-enricher  input-channel="inputChannel" output-channel="outputChannel">
    <int:header name="some_header">
        <bean class="org.MyEnricher"/>
    </int:header>
</int:header-enricher>

您可以类似地指向 Groovy 脚本,如以下示例所示:

<int:header-enricher  input-channel="inputChannel" output-channel="outputChannel">
    <int:header name="some_header">
        <int-groovy:script location="org/SampleGroovyHeaderEnricher.groovy"/>
    </int:header>
</int:header-enricher>
SpEL 支持

在 Spring Integration 2.0 中,我们引入了Spring 表达式语言 (SpEL)的便利性,以帮助配置许多不同的组件。标题丰富器就是其中之一。再次查看前面显示的 POJO 示例。您可以看到确定标头值的计算逻辑非常简单。一个自然的问题是:“有没有更简单的方法来完成这个?”。这就是 SpEL 展示其真正力量的地方。考虑以下示例:

<int:header-enricher input-channel="in" output-channel="out">
    <int:header name="foo" expression="payload.toUpperCase() + '_US'"/>
</int:header-enricher>

通过在这种简单的情况下使用 SpEL,您不再需要提供单独的类并在应用程序上下文中对其进行配置。您需要做的就是expression使用有效的 SpEL 表达式配置属性。'payload' 和 'headers' 变量绑定到 SpEL 评估上下文,让您可以完全访问传入的消息。

使用 Java 配置配置 Header Enricher

以下两个示例展示了如何将 Java 配置用于标头丰富器:

@Bean
@Transformer(inputChannel = "enrichHeadersChannel", outputChannel = "emailChannel")
public HeaderEnricher enrichHeaders() {
    Map<String, ? extends HeaderValueMessageProcessor<?>> headersToAdd =
            Collections.singletonMap("emailUrl",
                      new StaticHeaderValueMessageProcessor<>(this.imapUrl));
    HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
    return enricher;
}

@Bean
@Transformer(inputChannel="enrichHeadersChannel", outputChannel="emailChannel")
public HeaderEnricher enrichHeaders() {
    Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<>();
    headersToAdd.put("emailUrl", new StaticHeaderValueMessageProcessor<String>(this.imapUrl));
    Expression expression = new SpelExpressionParser().parseExpression("payload.from[0].toString()");
    headersToAdd.put("from",
               new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
    HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
    return enricher;
}

第一个示例添加了一个文字标头。第二个示例添加了两个标头,一个字面标头和一个基于 SpEL 表达式的标头。

使用 Java DSL 配置 Header Enricher

以下示例显示了标头丰富器的 Java DSL 配置:

@Bean
public IntegrationFlow enrichHeadersInFlow() {
    return f -> f
                ...
                .enrichHeaders(h -> h.header("emailUrl", this.emailUrl)
                                     .headerExpression("from", "payload.from[0].toString()"))
                .handle(...);
}
标头频道注册表

从 Spring Integration 3.0 开始,一个新的子元素<int:header-channels-to-string/>可用。它没有属性。这个新的子元素将现有的replyChannelerrorChannel标头(当它们是 a 时MessageChannel)转换为 aString并将通道存储在注册表中以供以后解决,当需要发送回复或处理错误时。这对于标头可能丢失的情况很有用——例如,将消息序列化到消息存储中或通过 JMS 传输消息时。如果标头尚不存在或不是 a MessageChannel,则不会进行任何更改。

使用此功能需要HeaderChannelRegistrybean 的存在。默认情况下,框架会创建一个DefaultHeaderChannelRegistry默认过期时间(60 秒)的。在此之后,频道将从注册表中删除。id要更改此行为,请使用of定义一个 bean,integrationHeaderChannelRegistry并使用构造函数参数(以毫秒为单位)配置所需的默认延迟。

从 4.1 版开始,您可以在定义上设置调用的属性removeOnGet,并且在第一次使用时立即删除映射条目。这在大容量环境中可能很有用,并且当通道仅使用一次时,而不是等待收割者将其移除。true<bean/>

HeaderChannelRegistry一种size()方法可以确定注册表的当前大小。该runReaper()方法取消当前计划任务并立即运行 reaper。然后根据当前延迟安排任务再次运行。可以通过获取对注册表的引用直接调用这些方法,或者您可以将带有例如以下内容的消息发送到控制总线:

"@integrationHeaderChannelRegistry.runReaper()"

这个子元素很方便,相当于指定以下配置:

<int:reply-channel
    expression="@integrationHeaderChannelRegistry.channelToChannelName(headers.replyChannel)"
    overwrite="true" />
<int:error-channel
    expression="@integrationHeaderChannelRegistry.channelToChannelName(headers.errorChannel)"
    overwrite="true" />

从 4.1 版开始,您现在可以覆盖注册表配置的收割机延迟,以便通道映射至少保留指定的时间,而不管收割机延迟如何。以下示例显示了如何执行此操作:

<int:header-enricher input-channel="inputTtl" output-channel="next">
    <int:header-channels-to-string time-to-live-expression="120000" />
</int:header-enricher>

<int:header-enricher input-channel="inputCustomTtl" output-channel="next">
    <int:header-channels-to-string
        time-to-live-expression="headers['channelTTL'] ?: 120000" />
</int:header-enricher>

在第一种情况下,每个标头通道映射的生存时间为两分钟。在第二种情况下,生存时间在消息头中指定,如果没有头,则使用 E​​lvis 运算符使用两分钟。

有效载荷丰富器

在某些情况下,如前所述,标头丰富器可能不够用,并且有效负载本身可能必须使用附加信息进行丰富。例如,进入 Spring Integration 消息系统的订单消息必须根据提供的客户编号查找订单的客户,然后使用该信息丰富原始有效负载。

Spring Integration 2.1 引入了有效负载丰富器。有效负载丰富器定义了一个端点,该端点将 a 传递Message给公开的请求通道,然后期待回复消息。然后,回复消息成为评估表达式以丰富目标有效负载的根对象。

有效负载丰富器通过enricher元素提供完整的 XML 命名空间支持。为了发送请求消息,有效负载丰富器具有一个request-channel属性,可让您将消息分派到请求通道。

基本上,通过定义请求通道,有效负载丰富器充当网关,等待发送到请求通道的消息返回。然后,丰富器使用回复消息提供的数据来扩充消息的有效负载。

向请求通道发送消息时,您还可以选择使用该request-payload-expression属性仅发送原始有效负载的子集。

有效载荷的丰富是通过 SpEL 表达式配置的,提供了最大程度的灵活性。因此,您不仅可以使用来自回复通道的直接值来丰富有效负载Message,还可以使用 SpEL 表达式从该消息中提取子集或应用其他内联转换,从而进一步操作数据。

如果您只需要使用静态值丰富有效负载,则无需提供该request-channel属性。

浓缩器是变压器的一种变体。在许多情况下,您可以使用有效负载丰富器或通用转换器实现向您的消息有效负载添加额外数据。您应该熟悉 Spring Integration 提供的所有支持转换的组件,并仔细选择在语义上最适合您的业务案例的实现。
配置

以下示例显示了有效负载丰富器的所有可用配置选项:

<int:enricher request-channel=""                           (1)
              auto-startup="true"                          (2)
              id=""                                        (3)
              order=""                                     (4)
              output-channel=""                            (5)
              request-payload-expression=""                (6)
              reply-channel=""                             (7)
              error-channel=""                             (8)
              send-timeout=""                              (9)
              should-clone-payload="false">                (10)
    <int:poller></int:poller>                              (11)
    <int:property name="" expression="" null-result-expression="'Could not determine the name'"/>   (12)
    <int:property name="" value="23" type="java.lang.Integer" null-result-expression="'0'"/>
    <int:header name="" expression="" null-result-expression=""/>   (13)
    <int:header name="" value="" overwrite="" type="" null-result-expression=""/>
</int:enricher>
1 向其发送消息以获取用于扩充的数据的通道。可选的。
2 生命周期属性表明此组件是否应在应用程序上下文启动期间启动。默认为真。可选的。
3 底层 bean 定义的 ID,可以是 anEventDrivenConsumer或 a PollingConsumer。可选的。
4 指定此端点作为订阅者连接到通道时的调用顺序。当该通道使用“故障转移”调度策略时,这一点尤其重要。当此端点本身是具有队列的通道的轮询使用者时,它不起作用。可选的。
5 标识在此端点处理消息后将消息发送到的消息通道。可选的。
6 默认情况下,原始消息的有效负载用作发送到request-channel. 通过将 SpEL 表达式指定为request-payload-expression属性的值,您可以使用原始负载的子集、标头值或任何其他可解析的 SpEL 表达式作为发送到请求通道的负载的基础。对于表达式评估,完整消息可作为“根对象”使用。例如,以下 SpEL 表达式(以及其他)是可能的:payload.something, headers.something, new java.util.Date(),'thing1' + 'thing2'
7 期望回复消息的通道。这是可选的。通常,自动生成的临时回复通道就足够了。可选的。
8 ErrorMessage如果 anException发生在 下游,则发送到的通道request-channel。这使您能够返回替代对象以用于扩充。如果未设置,则将 anException抛出给调用者。可选的。
9 如果通道可能阻塞,则在向通道发送消息时等待的最长时间(以毫秒为单位)。例如,如果已达到最大容量,队列通道可以阻塞直到有可用空间。在内部,发送超时设置在 上,MessagingTemplate并最终在调用 上的发送操作时应用MessageChannel。默认情况下,发送超时设置为“-1”,这可能导致 上的发送操作MessageChannel(取决于实现)无限期阻塞。可选的。
10 Cloneable布尔值,指示在将消息发送到请求通道以获取丰富数据之前是否应克隆任何实现的有效负载。克隆版本将用作最终回复的目标有效负载。默认值为false. 可选的。
11 如果此端点是轮询使用者,则允许您配置消息轮询器。可选的。
12 每个property子元素提供一个属性的名称(通过强制name属性)。该属性应该可以在目标有效负载实例上设置。还必须提供其中一个valueorexpression属性——前者用于设置文字值,后者用于评估 SpEL 表达式。评估上下文的根对象是从此丰富器启动的流返回的消息——如果没有请求通道或应用程序上下文(使用@<beanName>.<beanProperty>SpEL 语法),则为输入消息。从 4.0 版本开始,在指定value属性时,还可以指定可选type属性。当目标是类型化的 setter 方法时,框架会适当地强制该值(只要PropertyEditor) 用于处理转换。但是,如果目标有效负载是 a Map,则将使用值填充条目而不进行转换。type例如,该属性允许您将String包含数字的 a转换Integer为目标有效负载中的值。从 4.1 版开始,您还可以指定可选null-result-expression属性。当enricher返回 null 时,将对其进行评估,并返回评估的输出。
13 每个header子元素提供消息头的名称(通过强制name属性)。还必须提供其中一个valueorexpression属性——前者用于设置文字值,后者用于评估 SpEL 表达式。评估上下文的根对象是从此丰富器启动的流中返回的消息 — 如果没有请求通道或应用程序上下文,则为输入消息(使用 '@<beanName>.<beanProperty>' SpEL 语法) . 请注意,与 类似<header-enricher><enricher>元素的header元素具有typeoverwrite属性。但是,一个关键的区别是,对于<enricher>,该overwrite属性true默认情况下与<enricher>元素的<property>子元素。从 4.1 版开始,您还可以指定可选null-result-expression属性。当enricher返回 null 时,将对其进行评估,并返回评估的输出。
例子

本节包含在各种情况下使用有效负载丰富器的几个示例。

此处显示的代码示例是 Spring Integration Samples 项目的一部分。请参阅Spring 集成示例

在以下示例中,User对象作为 的有效负载传递Message

<int:enricher id="findUserEnricher"
              input-channel="findUserEnricherChannel"
              request-channel="findUserServiceChannel">
    <int:property name="email"    expression="payload.email"/>
    <int:property name="password" expression="payload.password"/>
</int:enricher>

User几个属性,但username最初只设置了 。丰富器的request-channel属性配置为将 传递UserfindUserServiceChannel.

通过隐式设置reply-channel,返回一个User对象,并通过使用property子元素,从回复中提取属性并用于丰富原始有效负载。

如何只将数据子集传递给请求通道?

使用request-payload-expression属性时,可以将有效负载的单个属性而不是完整消息传递到请求通道。在以下示例中,用户名属性被传递到请求通道:

<int:enricher id="findUserByUsernameEnricher"
              input-channel="findUserByUsernameEnricherChannel"
              request-channel="findUserByUsernameServiceChannel"
              request-payload-expression="payload.username">
    <int:property name="email"    expression="payload.email"/>
    <int:property name="password" expression="payload.password"/>
</int:enricher>

请记住,虽然只传递了用户名,但发送到请求通道的结果消息包含完整的MessageHeaders.

如何丰富由收集数据组成的有效负载?

在以下示例中,传入User的是 a 而不是对象:Map

<int:enricher id="findUserWithMapEnricher"
              input-channel="findUserWithMapEnricherChannel"
              request-channel="findUserByUsernameServiceChannel"
              request-payload-expression="payload.username">
    <int:property name="user" expression="payload"/>
</int:enricher>

Map包含username映射键下的用户名。只有username被传递到请求通道。回复包含一个完整的User对象,最终添加到键Mapuser

如何在不使用请求通道的情况下使用静态信息丰富有效负载?

以下示例根本不使用请求通道,而是仅使用静态值丰富了消息的有效负载:

<int:enricher id="userEnricher"
              input-channel="input">
    <int:property name="user.updateDate" expression="new java.util.Date()"/>
    <int:property name="user.firstName" value="William"/>
    <int:property name="user.lastName"  value="Shakespeare"/>
    <int:property name="user.age"       value="42"/>
</int:enricher>

请注意,“静态”这个词在这里用得很松散。您仍然可以使用 SpEL 表达式来设置这些值。

索赔检查

在前面的部分中,我们介绍了几个内容丰富器组件,它们可以帮助您处理消息丢失一条数据的情况。我们还讨论了内容过滤,它允许您从消息中删除数据项。但是,有时我们想暂时隐藏数据。例如,在分布式系统中,我们可能会收到一条带有非常大有效负载的消息。一些间歇性消息处理步骤可能不需要访问此有效负载,而有些可能只需要访问某些标头,因此通过每个处理步骤携带较大的消息有效负载可能会导致性能下降,可能产生安全风险,并且可能使调试更加困难。

中存储(或声明检查)模式描述了一种机制,该机制允许您将数据存储在众所周知的位置,同时只维护指向该数据所在位置的指针(声明检查)。您可以将该指针作为新消息的有效负载传递,从而让消息流中的任何组件在需要时立即获取实际数据。这种方法与挂号信的流程非常相似,您在邮箱中获得索赔支票,然后必须去邮局领取您的实际包裹。这也与航班或酒店后的行李提取相同。

Spring Integration 提供了两种类型的声明检查转换器:

  • 传入索赔检查变压器

  • 传出索赔检查变压器

方便的基于名称空间的机制可用于配置它们。

传入索赔检查变压器

传入声明检查转换器通过将传入消息存储在由其message-store属性标识的消息存储中来转换传入消息。以下示例定义了一个传入声明检查转换器:

<int:claim-check-in id="checkin"
        input-channel="checkinChannel"
        message-store="testMessageStore"
        output-channel="output"/>

在前面的配置中,接收input-channel到的消息被持久化到使用message-store属性标识的消息存储中,并使用生成的 ID 进行索引。该 ID 是该消息的声明检查。声明检查也成为发送到output-channel.

现在,假设在某些时候您确实需要访问实际消息。您可以手动访问消息存储并获取消息的内容,也可以使用相同的方法(创建转换器),只是现在您使用传出声明检查转换器将声明检查转换为实际消息。

以下清单概述了传入索赔检查转换器的所有可用参数:

<int:claim-check-in auto-startup="true"             (1)
                    id=""                           (2)
                    input-channel=""                (3)
                    message-store="messageStore"    (4)
                    order=""                        (5)
                    output-channel=""               (6)
                    send-timeout="">                (7)
    <int:poller></int:poller>                       (8)
</int:claim-check-in>
1 生命周期属性表明此组件是否应在应用程序上下文启动期间启动。它默认为true. Chain此属性在元素内不可用。可选的。
2 标识底层 bean 定义的 ID ( MessageTransformingHandler)。Chain此属性在元素内不可用。可选的。
3 此端点的接收消息通道。Chain此属性在元素内不可用。可选的。
4 引用MessageStore此声明检查变压器将使用的 。如果未指定,则默认引用是一个名为messageStore. 可选的。
5 指定此端点作为订阅者连接到通道时的调用顺序。当该通道使用failover调度策略时,这一点尤其重要。当此端点本身是具有队列的通道的轮询消费者时,它不起作用。Chain此属性在元素内不可用。可选的。
6 标识消息经过此端点处理后发送到的消息通道。Chain此属性在元素内不可用。可选的。
7 指定向输出通道发送回复消息时要等待的最长时间(以毫秒为单位)。默认为-1 — 无限期阻塞。Chain此属性在元素内不可用。可选的。
8 定义一个轮询器。此元素在元素内不可用Chain。可选的。

传出索赔检查变压器

传出声明检查转换器允许您将带有声明检查有效负载的消息转换为以原始内容作为其有效负载的消息。

<int:claim-check-out id="checkout"
        input-channel="checkoutChannel"
        message-store="testMessageStore"
        output-channel="output"/>

在前面的配置中,接收到的消息input-channel应该有一个声明检查作为它的有效负载。传出声明检查转换器通过在消息存储中查询由提供的声明检查标识的消息,将其转换为具有原始有效负载的消息。然后它将新签出的消息发送到output-channel.

以下清单概述了传出索赔检查变压器的所有可用参数:

<int:claim-check-out auto-startup="true"             (1)
                     id=""                           (2)
                     input-channel=""                (3)
                     message-store="messageStore"    (4)
                     order=""                        (5)
                     output-channel=""               (6)
                     remove-message="false"          (7)
                     send-timeout="">                (8)
    <int:poller></int:poller>                        (9)
</int:claim-check-out>
1 生命周期属性表明此组件是否应在应用程序上下文启动期间启动。它默认为true. Chain此属性在元素内不可用。可选的。
2 标识底层 bean 定义的 ID ( MessageTransformingHandler)。Chain此属性在元素内不可用。可选的。
3 此端点的接收消息通道。Chain此属性在元素内不可用。可选的。
4 引用MessageStore此声明检查变压器将使用的 。如果未指定,则默认引用是一个名为messageStore. 可选的。
5 指定此端点作为订阅者连接到通道时的调用顺序。当该通道使用failover调度策略时,这一点尤其重要。当此端点本身是具有队列的通道的轮询消费者时,它不起作用。Chain此属性在元素内不可用。可选的。
6 标识消息经过此端点处理后发送到的消息通道。Chain此属性在元素内不可用。可选的。
7 如果设置为,则此转换true器将消息从 中删除。MessageStore当消息只能被“认领”一次时,此设置很有用。它默认为false. 可选的。
8 指定向输出通道发送回复消息时要等待的最长时间(以毫秒为单位)。它默认为-1 - 无限期阻塞。Chain此属性在元素内不可用。可选的。
9 定义一个轮询器。此元素在元素内不可用Chain。可选的。

索赔一次

有时,一条特定的消息只能声明一次。作为类比,考虑处理飞机行李的过程。您在出发时托运行李并在抵达时领取。一旦行李被认领,必须先重新托运,才能再次认领。为了适应这种情况,我们在转换器上引入了remove-message布尔属性。claim-check-out此属性false默认设置为。但是,如果设置为true,则已声明的消息将从 中删除,MessageStore因此无法再次声明该消息。

此功能在存储空间方面会产生影响,尤其是在Map基于内存SimpleMessageStoreOutOfMemoryException. 因此,如果您不希望提出多个声明,我们建议您将remove-message属性的值设置为true。以下示例显示了如何使用该remove-message属性:

<int:claim-check-out id="checkout"
        input-channel="checkoutChannel"
        message-store="testMessageStore"
        output-channel="output"
        remove-message="true"/>

消息存储上的一句话

尽管我们很少关心声明检查的细节(只要它们有效),但您应该知道 Spring Integration 中实际声明检查(指针)的当前实现使用 UUID 来确保唯一性。

org.springframework.integration.store.MessageStore是用于存储和检索消息的策略接口。Spring Integration 提供了两种方便的实现:

  • SimpleMessageStore: 一个Map基于内存的实现(默认,有利于测试)

  • JdbcMessageStore:通过 JDBC 使用关系数据库的实现

编解码器

Spring Integration 4.2 版本引入了Codec抽象。编解码器将对象编码和解码到byte[]. 它们提供了 Java 序列化的替代方案。一个优点是,通常,对象不需要实现Serializable。我们提供了一种使用Kryo进行序列化的实现,但您可以提供自己的实现以用于以下任何组件:

  • EncodingPayloadTransformer

  • DecodingTransformer

  • CodecMessageConverter

EncodingPayloadTransformer

该转换byte[]器使用编解码器将有效负载编码为 a。它不影响邮件标头。

有关更多信息,请参阅Javadoc

DecodingTransformer

该转换器使用编解码器对 abyte[]进行解码。它需要配置Class对象应该解码到的 (或解析为 a 的表达式Class)。如果结果对象是 a Message<?>,则不保留入站标头。

有关更多信息,请参阅Javadoc

CodecMessageConverter

某些端点(例如 TCP 和 Redis)没有消息头的概念。它们支持使用 a MessageConverter,并且CodecMessageConverter可用于将消息转换为 a 或从 abyte[]传输。

有关更多信息,请参阅Javadoc

克里奥

目前,这是唯一的实现Codec,它提供了两种Codec

  • PojoCodec: 用于变压器

  • MessageCodec: 用在CodecMessageConverter

该框架提供了几个自定义序列化器:

  • FileSerializer

  • MessageHeadersSerializer

  • MutableMessageHeadersSerializer

第一个可以与 一起使用,方法是PojoCodec用 初始化它FileKryoRegistrar。第二个和第三个与 一起使用MessageCodec,它是用 初始化的MessageKryoRegistrar

自定义 Kryo

默认情况下,Kryo 将未知的 Java 类型委托给它的FieldSerializer. StringKryo 还为每个原始类型以及、Collection和注册了默认序列化程序MapFieldSerializer使用反射来导航对象图。一种更有效的方法是实现一个自定义序列化程序,该序列化程序知道对象的结构并且可以直接序列化选定的原始字段。以下示例显示了这样一个序列化程序:

public class AddressSerializer extends Serializer<Address> {

    @Override
    public void write(Kryo kryo, Output output, Address address) {
        output.writeString(address.getStreet());
        output.writeString(address.getCity());
        output.writeString(address.getCountry());
    }

    @Override
    public Address read(Kryo kryo, Input input, Class<Address> type) {
        return new Address(input.readString(), input.readString(), input.readString());
    }
}

如Kryo 文档中所述,该Serializer接口公开了KryoInputOutput,它们提供了对包含哪些字段和其他内部设置的完全控制。

注册自定义序列化程序时,您需要一个注册 ID。注册 ID 是任意的。但是,在我们的例子中,ID 必须明确定义,因为分布式应用程序中的每个 Kryo 实例都必须使用相同的 ID。Kryo 推荐使用小的正整数并保留一些 id(值 < 10)。Spring Integration 当前默认使用 40、41 和 42(用于前面提到的文件和消息头序列化程序)。我们建议您从 60 开始,以允许在框架中进行扩展。您可以通过配置前面提到的注册商来覆盖这些框架默认值。
使用自定义 Kryo 序列化程序

如果您需要自定义序列化,请参阅Kryo文档,因为您需要使用原生 API 进行自定义。例如,请参阅MessageCodec实现。

实现 KryoSerializable

如果您对域对象源代码具有写入权限,则可以按照此处KryoSerializable的描述实现。在这种情况下,该类本身提供序列化方法,不需要进一步配置。然而,基准测试表明这不如显式注册自定义序列化程序有效。以下示例显示了一个自定义 Kryo 序列化程序:

public class Address implements KryoSerializable {
    ...

    @Override
    public void write(Kryo kryo, Output output) {
        output.writeString(this.street);
        output.writeString(this.city);
        output.writeString(this.country);
    }

    @Override
    public void read(Kryo kryo, Input input) {
        this.street = input.readString();
        this.city = input.readString();
        this.country = input.readString();
    }
}

您还可以使用此技术来包装 Kryo 以外的序列化库。

使用@DefaultSerializer注解

Kryo 还提供了@DefaultSerializer注释,如此所述。

@DefaultSerializer(SomeClassSerializer.class)
public class SomeClass {
       // ...
}

如果您对域对象具有写入权限,这可能是指定自定义序列化程序的更简单方法。请注意,这不会使用 ID 注册类,这可能会使该技术在某些情况下无用。


1. see XML Configuration