消息发布

(面向方面​​的编程)AOP 消息发布功能允许您构建和发送消息作为方法调用的副产品。例如,假设您有一个组件,并且每次该组件的状态发生变化时,您都希望收到一条消息通知。发送此类通知的最简单方法是将消息发送到专用通道,但是如何将更改对象状态的方法调用连接到消息发送过程,通知消息应该如何构造?AOP 消息发布功能使用配置驱动的方法处理这些职责。

消息发布配置

Spring Integration 提供了两种方法:XML 配置和注解驱动 (Java) 配置。

@Publisher带有注释的注释驱动配置

注释驱动的方法允许您使用@Publisher注释来注释任何方法以指定“通道”属性。从 5.1 版开始,要打开此功能,您必须在某些类上使用@EnablePublisher注释。@Configuration有关详细信息,请参阅配置@EnableIntegration。消息由方法调用的返回值构造并发送到“通道”属性指定的通道。要进一步管理消息结构,您还可以结合使用@Payload@Header注释。

在内部,Spring Integration 的这个消息发布功能通过定义使用 Spring AOPPublisherAnnotationAdvisor和 Spring Expression Language (SpEL),为您提供了相当大的灵活性和Message对其发布结构的控制。

PublisherAnnotationAdvisor定义并绑定以下变量:

  • #return: 绑定到返回值,让您引用它或其属性(例如,#return.something其中 'something' 是绑定到的对象的属性#return

  • #exception:如果方法调用抛出异常,则绑定到异常

  • #args:绑定到方法参数,以便您可以按名称提取单个参数(例如,#args.fname

考虑以下示例:

@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}

在前面的示例中,消息是使用以下结构构造的:

  • 消息负载是方法的返回类型和值。这是默认设置。

  • 一条新构建的消息被发送到默认发布者通道,该通道配置了注释后处理器(本节稍后介绍)。

以下示例与前面的示例相同,不同之处在于它不使用默认发布通道:

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}

我们不使用默认发布通道,而是通过设置@Publisher注释的“通道”属性来指定发布通道。我们还添加了一个@Header注释,这导致名为“last”的消息头与“lname”方法参数具有相同的值。该标头被添加到新构造的消息中。

以下示例与前面的示例几乎相同:

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}

唯一的区别是我们@Payload在方法上使用注解来显式指定方法的返回值应该用作消息的有效负载。

以下示例通过在@Payload注释中使用 Spring 表达式语言来扩展先前的配置,以进一步指示框架应如何构造消息:

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}

在前面的示例中,消息是方法调用的返回值和“lname”输入参数的串联。名为“x”的消息头的值由“num”输入参数确定。该标头被添加到新构造的消息中。

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}

在前面的示例中,您会看到@Payload注释的另一种用法。在这里,我们注释了一个方法参数,该参数成为新构造的消息的有效负载。

与 Spring 中的大多数其他注释驱动特性一样,您需要注册一个后处理器 ( PublisherAnnotationBeanPostProcessor)。以下示例显示了如何执行此操作:

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

要获得更简洁的配置,您可以改用命名空间支持,如以下示例所示:

<int:annotation-config>
    <int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>

对于 Java 配置,您必须使用@EnablePublisher注解,如以下示例所示:

@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
    ...
}

从版本 5.1.3 开始,<int:enable-publisher>组件以及@EnablePublisher注解具有用于调整配置的proxy-target-class和属性。orderProxyFactory

与其他 Spring 注释(@Component@Scheduled等)类似,您也可以@Publisher用作元注释。这意味着您可以定义自己的注释,这些注释的处理方式与@Publisher自身相同。以下示例显示了如何执行此操作:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}

在前面的例子中,我们定义了@Audit注解,它本身是用 进行注解的@Publisher。另请注意,您可以channel在元注释上定义一个属性,以封装在此注释内发送消息的位置。现在您可以使用注解来注解任何方法@Audit,如以下示例所示:

@Audit
public String test() {
    return "Hello";
}

在前面的示例中,该test()方法的每次调用都会生成一条消息,其中包含根据其返回值创建的有效负载。每条消息都发送到名为 的通道auditChannel。这种技术的好处之一是您可以避免在多个注释中重复相同的通道名称。您还可以在您自己的、可能特定于域的注释和框架提供的注释之间提供一定程度的间接性。

您还可以对类进行注解,这样您就可以将此注解的属性应用于该类的每个公共方法,如以下示例所示:

@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .

  }

  public String credit(String amount) {
     . . .
  }

}

<publishing-interceptor>使用元素的基于 XML 的方法

基于 XML 的方法允许您配置与基于名称空间的MessagePublishingInterceptor. 它肯定比注解驱动的方法有一些好处,因为它允许您使用 AOP 切入点表达式,因此可能一次拦截多个方法或拦截并发布您没有源代码的方法。

使用 XML 配置消息发布,您只需要做以下两件事:

  • MessagePublishingInterceptor使用<publishing-interceptor>XML 元素提供配置。

  • 提供 AOP 配置以将其MessagePublishingInterceptor应用于托管对象。

以下示例显示了如何配置publishing-interceptor元素:

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" value="something"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" expression="'something'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

<publishing-interceptor>配置看起来与基于注释的方法非常相似,并且还使用了 Spring 表达式语言的强大功能。

在前面的示例中,执行 a 的echo方法testBean会呈现Message具有以下结构的 a:

  • 有效负载的Message类型String具有以下内容:Echoing: [value],其中value是执行方法返回的值。

  • Message一个标题,其名称为things,值为something

  • Message发送到echoChannel

第二种方法与第一种方法非常相似。在这里,每个以 'repl' 开头的方法都会呈现Message具有以下结构的 a:

  • Message有效负载与前面的示例相同。

  • Message有一个名为的标头,things其值是 SpEL 表达式的结果'something'.toUpperCase()

  • Message发送到echoChannel

第二种方法,映射以 开头的任何方法的执行echoDef,产生Message具有以下结构的 a:

  • Message有效负载是执行方法返回的值。

  • 由于channel未提供该属性,因此Message将 发送到由defaultChannel定义的publisher

对于简单的映射规则,您可以依赖publisher默认值,如以下示例所示:

<publishing-interceptor id="anotherInterceptor"/>

前面的示例将与切入点表达式匹配的每个方法的返回值映射到有效负载并发送到default-channel. 如果您不指定defaultChannel(如前面的示例未指定),则消息将发送到全局nullChannel(相当于/dev/null)。

异步发布

发布与组件的执行发生在同一个线程中。因此,默认情况下,它是同步的。这意味着整个消息流必须等到发布者的流完成。但是,开发人员通常希望完全相反:使用此消息发布功能来启动异步流。例如,您可能托管一个接收远程请求的服务(HTTP、WS 等)。您可能希望在内部将此请求发送到可能需要一段时间的进程中。但是,您可能还想立即回复用户。因此,您可以使用“output-channel”或“replyChannel”标头在使用消息时将类似确认的简单回复发送回调用方,而不是向输出通道发送入站请求以进行处理(传统方式)-发布者功能来启动一个复杂的流程。

以下示例中的服务接收到一个复杂的有效负载(需要进一步发送以进行处理),但它还需要通过简单的确认来回复调用者:

public String echo(Object complexPayload) {
     return "ACK";
}

因此,我们没有将复杂流连接到输出通道,而是使用消息发布功能。我们将其配置为通过使用服务方法的输入参数(如上例所示)创建一条新消息,并将其发送到“localProcessChannel”。为了确保这个流程是异步的,我们需要做的就是将它发送到任何类型的异步通道(ExecutorChannel在下一个示例中)。以下示例显示了如何异步publishing-interceptor

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleservice" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleservice)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>

处理这种情况的另一种方法是使用窃听器。请参阅接线接头

基于预定触发器生成和发布消息

在前面的部分中,我们查看了消息发布功能,它构建和发布消息作为方法调用的副产品。但是,在这些情况下,您仍然负责调用该方法。expressionSpring Integration 2.0通过“inbound-channel-adapter”元素上的新属性增加了对预定消息生产者和发布者的支持。您可以基于多个触发器进行调度,其中任何一个都可以在“poller”元素上配置。目前,我们支持cronfixed-ratefixed-delay任何由您实现并由 'trigger' 属性值引用的自定义触发器。

如前所述,通过<inbound-channel-adapter>XML 元素提供对预定生产者和发布者的支持。考虑以下示例:

<int:inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>

前面的示例创建了一个构造 a 的入站通道适配器,Message其有效负载是expression属性中定义的表达式的结果。每次fixed-delay发生由属性指定的延迟时,都会创建和发送此类消息。

以下示例与前面的示例类似,不同之处在于它使用了fixed-rate属性:

<int:inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>

fixed-rate属性允许您以固定速率发送消息(从每个任务的开始时间开始测量)。

以下示例显示了如何应用具有在cron属性中指定的值的 Cron 触发器:

<int:inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>

以下示例显示了如何在消息中插入额外的标头:

<int:inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <int:poller fixed-delay="5000"/>
    <int:header name="foo" expression="6 * 7"/>
    <int:header name="bar" value="x"/>
</int:inbound-channel-adapter>

附加的消息头可以采用标量值或评估 Spring 表达式的结果。

如果您需要实现自己的自定义触发器,则可以使用该trigger属性来提供对任何实现该org.springframework.scheduling.Trigger接口的 spring 配置 bean 的引用。以下示例显示了如何执行此操作:

<int:inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>

1. see XML Configuration