系统管理
指标和管理
本节介绍如何捕获 Spring Integration 的指标。在最近的版本中,我们更多地依赖于 Micrometer(参见https://micrometer.io),我们计划在未来的版本中更多地使用 Micrometer。
在高容量环境中禁用日志记录
您可以在主消息流中控制调试日志记录。在容量非常大的应用程序中,调用isDebugEnabled()
某些日志子系统可能会非常昂贵。您可以禁用所有此类日志记录以避免这种开销。异常日志记录(调试或其他)不受此设置的影响。
以下清单显示了用于控制日志记录的可用选项:
@Configuration
@EnableIntegration
@EnableIntegrationManagement(
defaultLoggingEnabled = "true" <1>)
public static class ContextConfiguration {
...
}
<int:management default-logging-enabled="true"/> (1)
1 | 设置为false 禁用主消息流中的所有日志记录,而不考虑日志系统类别设置。设置为“true”以启用调试日志记录(如果也由日志记录子系统启用)。仅当您没有在 bean 定义中显式配置设置时才应用。默认值为true . |
defaultLoggingEnabled 仅当您没有在 bean 定义中显式配置相应设置时才应用。
|
千分尺积分
概述
从版本 5.0.3 开始,应用程序上下文中存在Micrometer MeterRegistry
会触发对 Micrometer 度量的支持。
要使用 Micrometer,请将其中一个MeterRegistry
bean 添加到应用程序上下文中。
对于每个MessageHandler
和MessageChannel
,都注册了计时器。对于每个MessageSource
,都会注册一个计数器。
这仅适用于扩展AbstractMessageHandler
、AbstractMessageChannel
和的对象AbstractMessageSource
(大多数框架组件都是这种情况)。
消息通道上发送操作的Timer
仪表具有以下名称或标签:
-
name
:spring.integration.send
-
tag
:type:channel
-
tag
:name:<componentName>
-
tag
:result:(success|failure)
-
tag
:exception:(none|exception simple class name)
-
description
:Send processing time
(failure
有none
异常的结果意味着通道的send()
操作返回false
。)
可轮询消息通道上用于接收操作的Counter
仪表具有以下名称或标签:
-
name
:spring.integration.receive
-
tag
:type:channel
-
tag
:name:<componentName>
-
tag
:result:(success|failure)
-
tag
:exception:(none|exception simple class name)
-
description
:Messages received
消息处理程序上的Timer
操作仪表具有以下名称或标签:
-
name
:spring.integration.send
-
tag
:type:handler
-
tag
:name:<componentName>
-
tag
:result:(success|failure)
-
tag
:exception:(none|exception simple class name)
-
description
:Send processing time
消息源的Counter
计量器具有以下名称/标签:
-
name
:spring.integration.receive
-
tag
:type:source
-
tag
:name:<componentName>
-
tag
:result:success
-
tag
:exception:none
-
description
:Messages received
此外,还有三个Gauge
Meter:
-
spring.integration.channels
:MessageChannels
应用程序中的数量。 -
spring.integration.handlers
:MessageHandlers
应用程序中的数量。 -
spring.integration.sources
:MessageSources
应用程序中的数量。
可以Meters
通过提供MicrometerMetricsCaptor
. MicrometerCustomMetricsTests测试用例展示了如何做到这一点的简单示例。您还可以通过重载build()
构建器子类上的方法来进一步自定义计量器。
从版本 5.1.13 开始,QueueChannel
公开了用于队列大小和剩余容量的千分尺:
-
name
:spring.integration.channel.queue.size
-
tag
:type:channel
-
tag
:name:<componentName>
-
description
:The size of the queue channel
和
-
name
:spring.integration.channel.queue.remaining.capacity
-
tag
:type:channel
-
tag
:name:<componentName>
-
description
:The remaining capacity of the queue channel
禁用仪表
使用Legacy Metrics(现已删除),您可以指定哪些集成组件将收集指标。默认情况下,所有仪表在首次使用时都会注册。现在,使用 Micrometer,您可以将MeterFilter
s 添加到MeterRegistry
以防止部分或全部被注册。name
您可以通过提供的任何属性、tag
、 等过滤掉(拒绝)仪表。有关详细信息,请参阅 Micrometer 文档中的仪表过滤器。
例如,给定:
@Bean
public QueueChannel noMeters() {
return new QueueChannel(10);
}
您可以通过以下方式禁止仅此通道的仪表注册:
registry.config().meterFilter(MeterFilter.deny(id ->
"channel".equals(id.getTag("type")) &&
"noMeters".equals(id.getTag("name"))));
Spring 集成 JMX 支持
另请参阅JMX 支持。
消息历史
消息传递体系结构的主要好处是松散耦合,这样参与的组件不会保持对彼此的任何了解。仅这一点就使应用程序非常灵活,让您可以在不影响流程的其余部分的情况下更改组件、更改消息传递路由、更改消息使用样式(轮询与事件驱动)等等。然而,当出现问题时,这种谦逊的建筑风格可能会变得很困难。调试时,您可能需要尽可能多的有关消息的信息(其来源、遍历的通道以及其他详细信息)。
消息历史是其中一种模式,它为您提供了一个选项来保持对消息路径的某种程度的了解,无论是出于调试目的还是维护审计跟踪。Spring 集成提供了一种简单的方法来配置消息流以维护消息历史记录,方法是向消息添加标头并在每次消息通过跟踪组件时更新该标头。
消息历史配置
要启用消息历史记录,您只需在配置中定义message-history
元素(或@EnableMessageHistory
),如下例所示:
@Configuration
@EnableIntegration
@EnableMessageHistory
<int:message-history/>
现在跟踪每个命名组件(定义了“id”的组件)。框架在您的消息中设置“历史”标题。它的值 a List<Properties>
。
考虑以下配置示例:
@MessagingGateway(defaultRequestChannel = "bridgeInChannel")
public interface SampleGateway {
...
}
@Bean
@Transformer(inputChannel = "enricherChannel", outputChannel="filterChannel")
HeaderEnricher sampleEnricher() {
HeaderEnricher enricher =
new HeaderEnricher(Collections.singletonMap("baz", new StaticHeaderValueMessageProcessor("baz")));
return enricher;
}
<int:gateway id="sampleGateway"
service-interface="org.springframework.integration.history.sample.SampleGateway"
default-request-channel="bridgeInChannel"/>
<int:header-enricher id="sampleEnricher" input-channel="enricherChannel" output-channel="filterChannel">
<int:header name="baz" value="baz"/>
</int:header-enricher>
上述配置生成了一个简单的消息历史结构,输出类似于以下内容:
[{name=sampleGateway, type=gateway, timestamp=1283281668091},
{name=sampleEnricher, type=header-enricher, timestamp=1283281668094}]
要访问消息历史记录,您只需访问MessageHistory
标题。以下示例显示了如何执行此操作:
Iterator<Properties> historyIterator =
message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));
您可能不想跟踪所有组件。要根据名称限制特定组件的历史记录,您可以提供tracked-components
属性并指定与您要跟踪的组件匹配的组件名称和模式的逗号分隔列表。以下示例显示了如何执行此操作:
@Configuration
@EnableIntegration
@EnableMessageHistory("*Gateway", "sample*", "aName")
<int:message-history tracked-components="*Gateway, sample*, aName"/>
在前面的示例中,仅为以“Gateway”结尾、以“sample”开头或与名称“aName”完全匹配的组件维护消息历史记录。
此外,该MessageHistoryConfigurer
bean 现在由IntegrationMBeanExporter
(请参阅MBean Exporter)作为 JMX MBean 公开,允许您在运行时更改模式。但是请注意,必须停止 bean(关闭消息历史记录)才能更改模式。此功能可能有助于暂时打开历史记录以分析系统。MBean 的对象名称是<domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer
.
只有一个@EnableMessageHistory (或<message-history/> )必须在应用程序上下文中声明为组件跟踪配置的单一来源。不要对MessageHistoryConfigurer .
|
根据定义,消息历史标头是不可变的(您不能重写历史记录)。因此,当写入消息历史值时,组件要么创建新消息(当组件是源时),要么从请求消息中复制历史,修改它并在回复消息上设置新列表。在任何一种情况下,即使消息本身跨越线程边界,也可以附加这些值。这意味着历史值可以大大简化异步消息流中的调试。 |
消息存储
Enterprise Integration Patterns (EIP) 一书确定了几种能够缓冲消息的模式。例如,聚合器缓冲消息直到它们可以被释放,并且QueueChannel
缓冲消息直到消费者明确地从该通道接收到这些消息。由于消息流中的任何点都可能发生故障,缓冲消息的 EIP 组件也会引入消息可能丢失的点。
为了降低丢失消息的风险,EIP 定义了消息存储模式,它允许 EIP 组件存储消息,通常在某种类型的持久存储(例如 RDBMS)中。
Spring Integration 通过以下方式为消息存储模式提供支持:
-
定义
org.springframework.integration.store.MessageStore
策略接口 -
提供此接口的几种实现
-
在所有能够缓冲消息的组件上公开一个
message-store
属性,以便您可以注入任何实现该MessageStore
接口的实例。
有关如何配置特定消息存储实现以及如何将MessageStore
实现注入特定缓冲组件的详细信息在整个手册中进行了描述(请参阅特定组件,例如QueueChannel、Aggregator、Delayer等)。以下一对示例展示了如何为 aQueueChannel
和 aggregator 添加对消息存储的引用:
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator … message-store="refToMessageStore"/>
默认情况下,消息使用o.s.i.store.SimpleMessageStore
的实现存储在内存中MessageStore
。这对于开发或简单的低容量环境可能很好,因为非持久性消息的潜在丢失不是问题。但是,典型的生产应用程序需要更强大的选项,不仅可以降低消息丢失的风险,还可以避免潜在的内存不足错误。因此,我们还为MessageStore
各种数据存储提供了实现。以下是支持的实现的完整列表:
-
JDBC 消息存储:使用 RDBMS 存储消息
-
Redis 消息存储:使用 Redis 键/值数据存储来存储消息
-
MongoDB 消息存储:使用 MongoDB 文档存储来存储消息
-
Gemfire 消息存储:使用 Gemfire 分布式缓存来存储消息
但是,在使用 根据 特别注意代表某些类型数据的标题。例如,如果其中一个标头包含某个 Spring bean 的实例,则在反序列化时,您可能最终得到该 bean 的不同实例,这直接影响框架创建的一些隐式标头(例如 从 Spring Integration 版本 3.0 开始,您可以使用配置为在将通道注册到 另外,考虑如下配置消息流时会发生什么:网关→队列通道(由持久消息存储支持)→服务激活器。该网关创建一个临时回复通道,该通道在服务激活器的轮询器从队列中读取时丢失。同样,您可以使用标题丰富器将标题替换为 有关详细信息,请参阅Header Enricher。 |
Spring Integration 4.0 引入了两个新接口:
-
ChannelMessageStore
:实现特定于QueueChannel
实例的操作 -
PriorityCapableChannelMessageStore
:标记MessageStore
要用于PriorityChannel
实例的实现并为持久化消息提供优先级顺序。
实际行为取决于实现。该框架提供了以下实现,可以用作和的持久MessageStore
化:QueueChannel
PriorityChannel
注意事项
SimpleMessageStore 从 4.1 版本开始, 访问组件(例如聚合器)之外的组存储的用户现在可以直接引用聚合器使用的组,而不是副本。在聚合器之外对组进行操作可能会导致不可预知的结果。 因此,您不应执行此类操作或将 |
使用MessageGroupFactory
从 4.3 版开始,一些实现MessageGroupStore
可以注入自定义MessageGroupFactory
策略来创建和自定义. 这默认为 a ,它基于( ) 内部集合生成实例。其他可能的选项是and ,其中最后一个可用于恢复以前的行为。该选项也可用。有关详细信息,请参阅下一节。从版本 5.0.1 开始,当组中消息的顺序和唯一性无关紧要时,该选项也可用。MessageGroup
MessageGroupStore
SimpleMessageGroupFactory
SimpleMessageGroup
GroupType.HASH_SET
LinkedHashSet
SYNCHRONISED_SET
BLOCKING_QUEUE
SimpleMessageGroup
PERSISTENT
LIST
持久MessageGroupStore
和延迟加载
从 4.3 版开始,所有持久实例都以延迟加载的方式从存储中MessageGroupStore
检索MessageGroup
实例及其实例。messages
在大多数情况下,它对关联MessageHandler
实例很有用(请参阅Aggregator和ResequencerMessageGroup
),因为它会增加开销以在每个关联操作上从存储中加载整个数据。
您可以使用该AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
选项从配置中关闭延迟加载行为。
MessageStore
我们在 MongoDB ( MongoDB Message Store ) 和<aggregator>
( Aggregator )上对延迟加载的性能测试使用release-strategy
类似于以下的自定义:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
它为 1000 条简单消息生成类似于以下的结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
但是从 5.5 版开始,所有持久性MessageGroupStore
实现都提供了streamMessagesForGroup(Object groupId)
基于目标数据库流 API 的协定。当商店中的群体非常大时,这可以提高资源利用率。在框架内部,这个新的 API 在延迟器(例如)中使用,当它在启动时重新安排持久消息时。返回的Stream<Message<?>>
必须在处理结束时关闭,例如通过try-with-resources
. 每当使用 aPersistentMessageGroup
时,其streamMessages()
代表MessageGroupStore.streamMessagesForGroup()
.
消息组条件
从 5.5 版开始,MessageGroup
抽象提供了一个condition
字符串选项。此选项的值可以是以后可以出于任何原因进行解析以为组做出决定的任何值。例如,ReleaseStrategy
来自相关消息处理程序的消息可能会从组中查询此属性,而不是迭代组中的所有消息。MessageGroupStore
公开一个setGroupCondition(Object groupId, String condition)
API 。为此,setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
已将一个选项添加到AbstractCorrelatingMessageHandler
. 在将每条消息添加到组后以及该组的现有条件之后,都会针对每条消息评估此函数。实现可能决定返回一个新值、现有值,或者将目标条件重置为null
。的价值condition
可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并随后解析的内容。例如,FileMarkerReleaseStrategy
来自File AggregatorFileHeaders.LINE_COUNT
组件,将条件从消息的标头填充到组中,并通过将组大小与此条件中的值进行比较FileSplitter.FileMarker.Mark.END
来咨询它。canRelease()
这样它就不会遍历组中的所有消息来查找FileSplitter.FileMarker.Mark.END
带有FileHeaders.LINE_COUNT
标题的消息。它还允许结束标记在所有其他记录之前到达聚合器;例如在多线程环境中处理文件时。
另外,为了配置方便,GroupConditionProvider
引入了合约。AbstractCorrelatingMessageHandler
检查提供的是否ReleaseStrategy
实现此接口并提取conditionSupplier
组条件评估逻辑。
元数据存储
许多外部系统、服务或资源不是事务性的(Twitter、RSS、文件系统等),并且无法将数据标记为已读。此外,有时,您可能需要在某些集成解决方案中实现企业集成模式幂等接收器。为了实现这个目标并在与外部系统的下一次交互或处理下一条消息之前存储端点的一些先前状态,Spring Integration 提供元数据存储组件作为具有org.springframework.integration.metadata.MetadataStore
通用键值合约的接口的实现。
元数据存储旨在存储各种类型的通用元数据(例如,已处理的最后一个提要条目的发布日期),以帮助提要适配器等组件处理重复项。如果组件没有直接提供对 a 的引用MetadataStore
,则定位元数据存储的算法如下:首先,metadataStore
在应用程序上下文中查找具有 ID 的 bean。如果找到,请使用它。否则,创建 的新实例SimpleMetadataStore
,这是一个内存实现,仅在当前运行的应用程序上下文的生命周期内保留元数据。这意味着,在重新启动时,您可能会得到重复的条目。
如果您需要在应用程序上下文重新启动之间保留元数据,框架提供以下持久性MetadataStores
:
-
PropertiesPersistingMetadataStore
由PropertiesPersistingMetadataStore
属性文件和PropertiesPersister
.
默认情况下,它只保留应用程序上下文正常关闭时的状态。它的实现Flushable
是为了让您可以通过调用flush()
. 以下示例显示如何使用 XML 配置“PropertiesPersistingMetadataStore”:
<bean id="metadataStore"
class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>
或者,您可以提供自己的MetadataStore
接口实现(例如,JdbcMetadataStore
)并将其配置为应用程序上下文中的 bean。
从 4.0 版开始SimpleMetadataStore
,PropertiesPersistingMetadataStore
, 和RedisMetadataStore
实施ConcurrentMetadataStore
. 这些提供原子更新并且可以跨多个组件或应用程序实例使用。
幂等接收器和元数据存储
元数据存储对于实现 EIP幂等接收器模式很有用,当需要过滤传入消息(如果它已经被处理)并且您可以丢弃它或在丢弃时执行一些其他逻辑时。以下配置显示了如何执行此操作的示例:
<int:filter input-channel="serviceChannel"
output-channel="idempotentServiceChannel"
discard-channel="discardChannel"
expression="@metadataStore.get(headers.businessKey) == null"/>
<int:publish-subscribe-channel id="idempotentServiceChannel"/>
<int:outbound-channel-adapter channel="idempotentServiceChannel"
expression="@metadataStore.put(headers.businessKey, '')"/>
<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>
幂等条目的value
可能是到期日期,在此之后,该条目应该由某个计划的收割者从元数据存储中删除。
MetadataStoreListener
一些元数据存储(目前只有 zookeeper)支持注册监听器以在项目更改时接收事件,如以下示例所示:
public interface MetadataStoreListener {
void onAdd(String key, String value);
void onRemove(String key, String oldValue);
void onUpdate(String key, String newValue);
}
有关更多信息,请参阅Javadoc。MetadataStoreListenerAdapter
如果您只对事件的子集感兴趣,则可以将其子类化。
控制总线
正如企业集成模式(EIP) 书中所述,控制总线背后的想法是,可以使用相同的消息传递系统来监视和管理框架内的组件,就像用于“应用程序级”消息传递一样。在 Spring Integration 中,我们基于上述适配器构建,以便您可以发送消息作为调用公开操作的一种方式。
以下示例显示如何使用 XML 配置控制总线:
<int:control-bus input-channel="operationChannel"/>
控制总线有一个输入通道,可以访问该通道以在应用程序上下文中调用对 bean 的操作。它还具有服务激活端点的所有常见属性。例如,如果操作的结果具有要发送到下游通道的返回值,则可以指定输出通道。
控制总线在输入通道上运行消息作为 Spring 表达式语言 (SpEL) 表达式。它接收一条消息,将正文编译为表达式,添加一些上下文,然后运行它。@ManagedAttribute
默认上下文支持使用或注释的任何方法@ManagedOperation
。它还支持 SpringLifecycle
接口上的方法(及其Pausable
自 5.2 版以来的扩展),并支持用于配置多个 SpringTaskExecutor
和TaskScheduler
实现的方法。确保您自己的方法可用于控制总线的最简单方法是使用@ManagedAttribute
或@ManagedOperation
注释。由于这些注释还用于向 JMX MBean 注册表公开方法,因此它们提供了一个方便的副产品:通常,您希望向控制总线公开的相同类型的操作对于通过 JMX 公开是合理的)。应用程序上下文中的任何特定实例的解析都是在典型的 SpEL 语法中实现的。为此,为 bean 提供带有 SpEL 前缀的 bean 名称 ( @
)。例如,要在 Spring Bean 上执行方法,客户端可以向操作通道发送消息,如下所示:
Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)
表达式上下文的根是其Message
本身,因此您还可以访问payload
和headers
作为表达式中的变量。这与 Spring Integration 端点中的所有其他表达式支持一致。
使用 Java 注解,您可以按如下方式配置控制总线:
@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
return new ExpressionControlBusFactoryBean();
}
同样,您可以按如下方式配置 Java DSL 流定义:
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("controlBus")
.controlBus()
.get();
}
如果您更喜欢使用自动DirectChannel
创建的 lambda,您可以按如下方式创建控制总线:
@Bean
public IntegrationFlow controlBus() {
return IntegrationFlowDefinition::controlBus;
}
在这种情况下,通道被命名为controlBus.input
。
有序关机
如“ MBean Exporter ”中所述,MBean exporter 提供了一个名为 的JMX 操作stopActiveComponents
,用于有序地停止应用程序。该操作只有一个Long
参数。该参数指示操作等待多长时间(以毫秒为单位)以允许进行中的消息完成。操作如下:
-
调用
beforeShutdown()
所有实现OrderlyShutdownCapable
.这样做可以让这些组件为关闭做好准备。实现此接口的组件示例以及它们对此调用的作用包括停止其侦听器容器的 JMS 和 AMQP 消息驱动适配器、停止接受新连接(同时保持现有连接打开)的 TCP 服务器连接工厂、丢弃的 TCP 入站端点(记录)收到的任何新消息,以及
503 - Service Unavailable
为任何新请求返回的 HTTP 入站端点。 -
停止任何活动通道,例如 JMS 或 AMQP 支持的通道。
-
停止所有
MessageSource
实例。 -
停止所有入站
MessageProducer
(不是OrderlyShutdownCapable
)。 -
等待任何剩余时间,由
Long
传递给操作的参数值定义。这样做可以让任何飞行中的消息完成它们的旅程。因此,在调用此操作时选择适当的超时非常重要。
-
调用
afterShutdown()
所有OrderlyShutdownCapable
组件。这样做可以让这些组件执行最终的关闭任务(例如关闭所有打开的套接字)。
如有序关闭托管操作中所述,可以使用 JMX 调用此操作。如果您希望以编程方式调用该方法,则需要注入或以其他方式获取对IntegrationMBeanExporter
. 如果定义中没有id
提供属性<int-jmx:mbean-export/>
,则 bean 具有生成的名称。如果同一个 JVM ( )ObjectName
中存在多个 Spring Integration 上下文,此名称包含一个随机组件以避免冲突。MBeanServer
因此,如果您希望以编程方式调用该方法,我们建议您为导出器提供一个id
属性,以便您可以在应用程序上下文中轻松访问它。
最后,可以使用<control-bus>
元素调用操作。有关详细信息,请参阅监控 Spring Integration 示例应用程序。
前面描述的算法在 4.1 版中得到了改进。以前,所有任务执行程序和调度程序都已停止。QueueChannel 这可能会导致在实例中保留中间流消息。现在关闭让轮询器运行,让这些消息被耗尽和处理。
|
积分图
从 4.3 版开始,Spring Integration 提供对应用程序运行时对象模型的访问,该模型可以选择性地包含组件指标。它以图形的形式公开,可用于可视化集成应用程序的当前状态。该o.s.i.support.management.graph
包包含收集、构建和呈现 Spring Integration 组件的运行时状态作为单个树状Graph
对象所需的所有类。IntegrationGraphServer
应该声明为 bean 来构建、检索和刷新对象Graph
。生成的Graph
对象可以序列化为任何格式,尽管 JSON 在客户端解析和表示时灵活方便。仅具有默认组件的 Spring Integration 应用程序将公开如下图:
{
"contentDescriptor" : {
"providerVersion" : "5.5.13",
"providerFormatVersion" : 1.2,
"provider" : "spring-integration",
"name" : "myAppName:1.0"
},
"nodes" : [ {
"nodeId" : 1,
"componentType" : "null-channel",
"integrationPatternType" : "null_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 0.0,
"max" : 0.0
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"receiveCounters" : {
"successes" : 0,
"failures" : 0
},
"name" : "nullChannel"
}, {
"nodeId" : 2,
"componentType" : "publish-subscribe-channel",
"integrationPatternType" : "publish_subscribe_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 7.807002,
"max" : 7.807002
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorChannel"
}, {
"nodeId" : 3,
"componentType" : "logging-channel-adapter",
"integrationPatternType" : "outbound_channel_adapter",
"integrationPatternCategory" : "messaging_endpoint",
"properties" : { },
"output" : null,
"input" : "errorChannel",
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 6.742722,
"max" : 6.742722
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorLogger"
} ],
"links" : [ {
"from" : 2,
"to" : 3,
"type" : "input"
} ]
}
5.2 版弃用了传统的度量标准,取而代之的是 Micrometer 米,正如所讨论的Metrics Management。旧版指标已在 5.4 版中删除,将不再出现在图表中。 |
在前面的示例中,图表由三个顶级元素组成。
图形元素包含有关提供数据的应用程序的contentDescriptor
一般信息。name
可以在beanIntegrationGraphServer
或spring.application.name
应用程序上下文环境属性中自定义。框架提供了其他属性,可让您将类似模型与其他来源区分开来。
links
graph 元素表示来自 graph 元素的节点之间的连接,nodes
因此也表示源 Spring Integration 应用程序中的集成组件之间的连接。例如,from a MessageChannel
to an EventDrivenConsumer
with someMessageHandler
或 from an AbstractReplyProducingMessageHandler
to a MessageChannel
。为方便起见并让您确定链接的用途,模型包含该type
属性。可能的类型有:
-
input
: 标识从MessageChannel
到端点inputChannel
、 或requestChannel
属性的方向 -
output
: 从MessageHandler
,MessageProducer
, orSourcePollingChannelAdapter
到MessageChannel
通过outputChannel
orreplyChannel
属性的方向 -
error
:从MessageHandler
或PollingConsumer
或MessageProducer
到SourcePollingChannelAdapter
通过MessageChannel
财产errorChannel
; -
discard
:从DiscardingMessageHandler
(例如MessageFilter
)到MessageChannel
通过errorChannel
属性。 -
route
:从AbstractMappingMessageRouter
(如HeaderValueRouter
)到MessageChannel
。类似于output
但在运行时确定。可能是配置的通道映射或动态解析的通道。为此,路由器通常最多只保留 100 条动态路由,但您可以通过设置该dynamicChannelLimit
属性来修改此值。
可视化工具可以使用来自该元素的信息来呈现来自nodes
图形元素的节点之间的连接,其中from
和to
数字表示来自nodeId
链接节点的属性的值。例如,该link
元素可用于确定port
目标节点上的正确性。
以下“文本图像”显示了类型之间的关系:
+---(丢弃) | +----o----+ | | | | | | (输入)--o o---(输出) | | | | | | +----o----+ | +---(错误)
graph 元素可能是最nodes
有趣的,因为它的元素不仅包含运行时组件及其componentType
实例和name
值,还可以选择包含组件公开的指标。节点元素包含通常不言自明的各种属性。例如,基于表达式的组件包括expression
包含该组件的主要表达式字符串的属性。要启用指标,@EnableIntegrationManagement
请向@Configuration
类添加一个或<int:management/>
向您的 XML 配置添加一个元素。有关完整信息,请参阅指标和管理。
表示唯一的nodeId
增量标识符,可让您将一个组件与另一个组件区分开来。它还用于links
元素中,表示此组件与其他组件的关系(连接)(如果有)。input
和output
属性用于、、或的inputChannel
和outputChannel
属性。有关详细信息,请参阅下一节。AbstractEndpoint
MessageHandler
SourcePollingChannelAdapter
MessageProducerSupport
从 5.1 版开始,IntegrationGraphServer
接受 aFunction<NamedComponent, Map<String, Object>> additionalPropertiesCallback
为IntegrationNode
特定的NamedComponent
. 例如,您可以将SmartLifecycle
autoStartup
andrunning
属性公开到目标图中:
server.setAdditionalPropertiesCallback(namedComponent -> {
Map<String, Object> properties = null;
if (namedComponent instanceof SmartLifecycle) {
SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
properties = new HashMap<>();
properties.put("auto-startup", smartLifecycle.isAutoStartup());
properties.put("running", smartLifecycle.isRunning());
}
return properties;
});
图运行时模型
Spring Integration 组件具有不同级别的复杂性。例如,任何被轮询的对象MessageSource
还具有 aSourcePollingChannelAdapter
和 aMessageChannel
以定期从源数据向其发送消息。其他组件可能是中间件请求-回复组件(例如JmsOutboundGateway
),其中一个消费者AbstractEndpoint
订阅(或轮询)消息的requestChannel
( input
),以及一个replyChannel
( output
) 生成回复消息以发送到下游。同时,任何MessageProducerSupport
实现(例如ApplicationEventListeningMessageProducer
)都包装了一些源协议监听逻辑并将消息发送到outputChannel
.
在图中,Spring Integration 组件使用IntegrationNode
类层次结构表示,您可以在o.s.i.support.management.graph
包中找到它。例如,您可以使用ErrorCapableDiscardingMessageHandlerNode
for the AggregatingMessageHandler
(因为它有一个选项),并且在使用 adiscardChannel
从 a 消费时会产生错误。另一个例子是 -当使用订阅a 时。PollableChannel
PollingConsumer
CompositeMessageHandlerNode
MessageHandlerChain
SubscribableChannel
EventDrivenConsumer
(@MessagingGateway 请参阅消息传递网关)为其每个方法提供节点,其中name 属性基于网关的 bean 名称和简短的方法签名。考虑以下网关示例:
|
@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {
void foo(String foo);
void foo(Integer foo);
void bar(String bar);
}
前面的网关产生类似于以下的节点:
{
"nodeId" : 10,
"name" : "gate.bar(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 11,
"name" : "gate.foo(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 12,
"name" : "gate.foo(class java.lang.Integer)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
}
您可以使用此IntegrationNode
层次结构在客户端解析图形模型以及了解一般 Spring Integration 运行时行为。另请参阅编程提示和技巧以获取更多信息。
5.3 版引入了一个IntegrationPattern
抽象和所有开箱即用的组件,它们代表了企业集成模式 (EIP),实现了这个抽象并提供了一个IntegrationPatternType
枚举值。此信息对于目标应用程序中的某些分类逻辑可能很有用,或者暴露在图形节点中,UI 可以使用它来确定如何绘制组件。
集成图控制器
如果您的应用程序是基于 Web 的(或构建在带有嵌入式 Web 容器的 Spring Boot 之上)并且 Spring Integration HTTP 或 WebFlux 模块(分别参见HTTP Support和WebFlux Support)存在于类路径中,您可以IntegrationGraphController
使用将IntegrationGraphServer
功能公开为 REST 服务。为此,HTTP 模块中提供了@EnableIntegrationGraphController
和@Configuration
类注解和<int-http:graph-controller/>
XML 元素。连同@EnableWebMvc
注解(或<mvc:annotation-driven/>
用于 XML 定义),此配置注册一个可以在注解或元素上配置的IntegrationGraphController
@RestController
位置。默认路径是.@RequestMapping.path
@EnableIntegrationGraphController
<int-http:graph-controller/>
/integration
提供以下IntegrationGraphController
@RestController
服务:
-
@GetMapping(name = "getGraph")
:检索自上次IntegrationGraphServer
刷新以来 Spring Integration 组件的状态。作为REST 服务的o.s.i.support.management.graph.Graph
一个返回。@ResponseBody
-
@GetMapping(path = "/refresh", name = "refreshGraph")
:刷新当前Graph
的实际运行时状态并将其作为 REST 响应返回。不必为指标刷新图表。它们在检索图形时实时提供。如果自上次检索图形后应用程序上下文已被修改,则可以调用刷新。在这种情况下,图形将完全重建。
IntegrationGraphController
您可以使用 Spring Security 和 Spring MVC 项目提供的标准配置选项和组件设置安全性和跨域限制。以下示例实现了这些目标:
<mvc:annotation-driven />
<mvc:cors>
<mvc:mapping path="/myIntegration/**"
allowed-origins="http://localhost:9090"
allowed-methods="GET" />
</mvc:cors>
<security:http>
<security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>
<int-http:graph-controller path="/myIntegration" />
下面的例子展示了如何用 Java 配置做同样的事情:
@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/testIntegration/**").hasRole("ADMIN")
// ...
.formLogin();
}
//...
}
请注意,为方便起见,@EnableIntegrationGraphController
注释提供了一个allowedOrigins
属性。这提供GET
了对path
. 为了更复杂,您可以使用标准 Spring MVC 机制配置 CORS 映射。