Spring 批量集成
Spring Batch 集成介绍
Spring Batch 的许多用户可能会遇到 Spring Batch 范围之外的需求,但可以通过使用 Spring Integration 高效而简洁地实现这些需求。相反,Spring Integration 用户可能会遇到 Spring Batch 需求,需要一种有效集成这两个框架的方法。在这种情况下,出现了几种模式和用例,Spring Batch Integration 解决了这些需求。
Spring Batch 和 Spring Integration 之间的界限并不总是很清楚,但有两条建议可以提供帮助:考虑粒度,并应用通用模式。本参考手册部分描述了其中一些常见模式。
将消息添加到批处理过程可以实现操作自动化以及关键关注点的分离和策略化。例如,一条消息可能会触发一个作业执行,然后消息的发送可以通过多种方式暴露出来。或者,当作业完成或失败时,该事件可能会触发要发送的消息,而这些消息的使用者可能会遇到与应用程序本身无关的操作问题。消息传递也可以嵌入到作业中(例如读取或写入通过通道处理的项目)。远程分区和远程分块提供了将工作负载分配给多个工作人员的方法。
本节涵盖以下关键概念:
命名空间支持
从 Spring Batch Integration 1.3 开始,添加了专用的 XML 命名空间支持,旨在提供更轻松的配置体验。为了激活命名空间,将以下命名空间声明添加到您的 Spring XML 应用程序上下文文件中:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
xsi:schemaLocation="
http://www.springframework.org/schema/batch-integration
https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">
...
</beans>
用于 Spring Batch Integration 的完全配置的 Spring XML Application Context 文件可能如下所示:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
xsi:schemaLocation="
http://www.springframework.org/schema/batch-integration
https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd">
...
</beans>
也允许将版本号附加到引用的 XSD 文件,但由于无版本声明始终使用最新模式,我们通常不建议将版本号附加到 XSD 名称。添加版本号可能会在更新 Spring Batch Integration 依赖项时产生问题,因为它们可能需要更新版本的 XML 模式。
通过消息启动批处理作业
使用核心 Spring Batch API 启动批处理作业时,您基本上有 2 个选项:
-
从命令行,使用
CommandLineJobRunner
-
以编程方式,使用
JobOperator.start()
或JobLauncher.run()
例如,您可能希望在使用
CommandLineJobRunner
shell 脚本调用批处理作业时使用 。或者,您可以
JobOperator
直接使用(例如,当使用 Spring Batch 作为 Web 应用程序的一部分时)。但是,更复杂的用例呢?也许您需要轮询远程 (S)FTP 服务器来检索批处理作业的数据,或者您的应用程序必须同时支持多个不同的数据源。例如,您可能不仅从 Web 接收数据文件,还从 FTP 和其他来源接收数据文件。在调用 Spring Batch 之前,可能需要对输入文件进行额外的转换。
因此,使用 Spring Integration 及其众多适配器执行批处理作业会更强大。例如,您可以使用文件入站通道适配器来监视文件系统中的目录,并在输入文件到达时立即启动批处理作业。此外,您可以创建使用多个不同适配器的 Spring Integration 流,以便仅使用配置轻松地同时从多个源为批处理作业摄取数据。使用 Spring Integration 实现所有这些场景很容易,因为它允许解耦、事件驱动的
JobLauncher
.
Spring Batch Integration 提供了
JobLaunchingMessageHandler
可用于启动批处理作业的类。的输入
JobLaunchingMessageHandler
由 Spring Integration 消息提供,该消息的有效负载类型为
JobLaunchRequest
。此类是Job
需要启动的包装器以及JobParameters
启动批处理作业所需的包装器。
下图说明了用于启动批处理作业的典型 Spring Integration 消息流。EIP(企业集成模式)网站 提供了消息图标及其描述的完整概述。
将文件转换为 JobLaunchRequest
package io.spring.sbi;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution
回应_
执行批处理作业时,将
JobExecution
返回一个实例。此实例可用于确定执行的状态。如果 aJobExecution
能够创建成功,无论实际执行是否成功,它总是返回。
如何返回实例的确切行为JobExecution
取决于提供的
TaskExecutor
. 如果使用
synchronous
(单线程)
TaskExecutor
实现,
则仅在作业完成JobExecution
时才返回响应
。after
使用 时
asynchronous
TaskExecutor
,
JobExecution
会立即返回实例。然后,用户可以获取id
of
JobExecution
实例(使用JobExecution.getJobId()
)并使用 查询
JobRepository
作业的更新状态JobExplorer
。有关更多信息,请参阅 Spring Batch 参考文档中的
Querying the Repository。
Spring Batch 集成配置
考虑这样一种情况,有人需要创建一个文件inbound-channel-adapter
来侦听提供的目录中的 CSV 文件,将它们交给转换器 ( FileMessageToJobRequest
),通过Job Launching Gateway 启动作业,然后JobExecution
使用logging-channel-adapter
.
以下示例显示了如何在 XML 中配置该常见情况:
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
以下示例显示了如何在 Java 中配置这种常见情况:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
示例 ItemReader 配置
现在我们正在轮询文件并启动作业,我们需要配置我们的 Spring Batch ItemReader
(例如)以使用在名为“input.file.name”的作业参数定义的位置找到的文件,如下面的 bean 所示配置:
以下 XML 示例显示了必要的 bean 配置:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
以下 Java 示例显示了必要的 bean 配置:
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
前面示例中的主要兴趣点是将 的值
#{jobParameters['input.file.name']}
作为 Resource 属性值注入并将ItemReader
bean 设置为具有Step 范围。将 bean 设置为 Step 范围利用了后期绑定支持,它允许访问
jobParameters
变量。
作业启动网关的可用属性
作业启动网关具有以下属性,您可以设置这些属性来控制作业:
-
id
: 标识底层 Spring bean 定义,它是以下任一实例:-
EventDrivenConsumer
-
PollingConsumer
(具体实现取决于组件的输入通道是 aSubscribableChannel
还是PollableChannel
。)
-
-
auto-startup
:布尔标志,指示端点应在启动时自动启动。默认值为true。 -
request-channel
:MessageChannel
此端点的输入。 -
reply-channel
:MessageChannel
生成的JobExecution
有效载荷发送到的位置。 -
reply-timeout
:让您指定此网关在抛出异常之前等待回复消息成功发送到回复通道的时间(以毫秒为单位)。此属性仅适用于通道可能阻塞的情况(例如,使用当前已满的有界队列通道时)。另外,请记住,当发送到 a 时DirectChannel
,调用发生在发送者的线程中。因此,发送操作的失败可能是由更下游的其他组件引起的。该reply-timeout
属性映射到sendTimeout
底层MessagingTemplate
实例的属性。如果未指定,则该属性默认为<emphasis>-1</emphasis>,这意味着默认情况下,Gateway
无限期等待。 -
job-launcher
: 可选的。接受自定义JobLauncher
bean 引用。如果未指定,适配器将重新使用在 of 下注册的id
实例jobLauncher
。如果不存在默认实例,则会引发异常。 -
order
: 指定此端点作为订阅者连接到 时的调用顺序SubscribableChannel
。
子元素
当它Gateway
从 接收消息时
PollableChannel
,您必须提供全局默认值Poller
或向 提供Poller
子元素
Job Launching Gateway
。
以下示例显示了如何在 XML 中提供轮询器:
<batch-int:job-launching-gateway request-channel="queueChannel"
reply-channel="replyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>
以下示例显示了如何在 Java 中提供轮询器:
@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
jobLaunchingGateway.setOutputChannel(replyChannel());
return jobLaunchingGateway;
}
通过信息性消息提供反馈
由于 Spring Batch 作业可以运行很长时间,因此提供进度信息通常很关键。例如,如果批处理作业的部分或全部部分失败,利益相关者可能希望得到通知。Spring Batch 支持通过以下方式收集此信息:
-
主动轮询
-
事件驱动的监听器
当异步启动 Spring Batch 作业时(例如,通过使用Job Launching
Gateway
),将JobExecution
返回一个实例。因此,JobExecution.getJobId()
可用于
JobExecution
通过JobRepository
使用JobExplorer
. 但是,这被认为是次优的,应该首选事件驱动的方法。
因此,Spring Batch 提供了监听器,包括三个最常用的监听器:
-
StepListener
-
ChunkListener
-
JobExecutionListener
在下图中显示的示例中,Spring Batch 作业已配置为
StepExecutionListener
. 因此,Spring Integration 接收并处理事件之前或之后的任何步骤。例如,StepExecution
可以使用
Router
. 根据该检查的结果,可能会发生各种事情(例如将消息路由到邮件出站通道适配器),以便可以根据某些条件发送电子邮件通知。
以下由两部分组成的示例显示了如何配置侦听器以将消息发送到 aGateway
以获取StepExecution
事件并将其输出记录到 a
logging-channel-adapter
。
首先,创建通知集成 bean。
以下示例显示了如何在 XML 中创建通知集成 bean:
<int:channel id="stepExecutionsChannel"/>
<int:gateway id="notificationExecutionsListener"
service-interface="org.springframework.batch.core.StepExecutionListener"
default-request-channel="stepExecutionsChannel"/>
<int:logging-channel-adapter channel="stepExecutionsChannel"/>
以下示例显示了如何在 Java 中创建通知集成 bean:
@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}
您需要将@IntegrationComponentScan 注释添加到您的配置中。
|
其次,修改您的工作以添加一个步进级侦听器。
以下示例显示了如何在 XML 中添加步骤级侦听器:
<job id="importPayments">
<step id="step1">
<tasklet ../>
<chunk ../>
<listeners>
<listener ref="notificationExecutionsListener"/>
</listeners>
</tasklet>
...
</step>
</job>
以下示例显示了如何在 Java 中添加步进级侦听器:
public Job importPaymentsJob() {
return jobBuilderFactory.get("importPayments")
.start(stepBuilderFactory.get("step1")
.chunk(200)
.listener(notificationExecutionsListener())
...
}
异步处理器
异步处理器可帮助您扩展项目的处理。在异步处理器用例中,anAsyncItemProcessor
充当调度程序,ItemProcessor
在新线程上执行项目的逻辑。一旦项目完成, 将Future
传递给AsynchItemWriter
要写入的。
因此,您可以通过使用异步项目处理来提高性能,基本上是让您实现fork-join方案。收集AsyncItemWriter
结果并在所有结果可用后立即写回该块。
以下示例显示了如何AsyncItemProcessor
在 XML 中配置:
<bean id="processor"
class="org.springframework.batch.integration.async.AsyncItemProcessor">
<property name="delegate">
<bean class="your.ItemProcessor"/>
</property>
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
</property>
</bean>
以下示例显示了如何AsyncItemProcessor
在 XML 中配置:
@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
asyncItemProcessor.setTaskExecutor(taskExecutor);
asyncItemProcessor.setDelegate(itemProcessor);
return asyncItemProcessor;
}
delegate
属性指的是你的bean ItemProcessor
,taskExecutor
属性指的TaskExecutor
是你选择的。
以下示例显示了如何AsyncItemWriter
在 XML 中配置:
<bean id="itemWriter"
class="org.springframework.batch.integration.async.AsyncItemWriter">
<property name="delegate">
<bean id="itemWriter" class="your.ItemWriter"/>
</property>
</bean>
以下示例显示了如何AsyncItemWriter
在 Java 中配置:
@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
asyncItemWriter.setDelegate(itemWriter);
return asyncItemWriter;
}
同样,该delegate
属性实际上是对您的ItemWriter
bean 的引用。
外部化批处理执行
到目前为止讨论的集成方法建议了 Spring Integration 像外壳一样包装 Spring Batch 的用例。但是,Spring Batch 也可以在内部使用 Spring Integration。使用这种方法,Spring Batch 用户可以将项目甚至块的处理委托给外部进程。这使您可以卸载复杂的处理。Spring Batch Integration 提供专门的支持:
-
远程分块
-
远程分区
远程分块
更进一步,还可以通过使用
ChunkMessageChannelItemWriter
(由 Spring Batch Integration 提供)将块处理外部化,它将项目发送出去并收集结果。发送后,Spring Batch 继续读取和分组项目的过程,而无需等待结果。ChunkMessageChannelItemWriter
相反,收集结果并将它们重新集成到 Spring Batch 过程中是他们的责任。
使用 Spring Integration,您可以完全控制进程的并发性(例如,通过使用 aQueueChannel
而不是 a
DirectChannel
)。此外,通过依赖 Spring Integration 丰富的通道适配器(例如 JMS 和 AMQP)集合,您可以将批处理作业的块分发到外部系统进行处理。
具有要远程分块的步骤的作业可能具有类似于 XML 中的以下配置:
<job id="personJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
</tasklet>
...
</step>
</job>
具有要远程分块的步骤的作业可能具有类似于 Java 中的以下配置:
public Job chunkJob() {
return jobBuilderFactory.get("personJob")
.start(stepBuilderFactory.get("step1")
.<Person, Person>chunk(200)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
ItemReader
引用指向要用于在管理器上读取数据的 bean 。如上所述,ItemWriter
引用指向一个特殊的ItemWriter
(称为
ChunkMessageChannelItemWriter
)。处理器(如果有的话)不在管理器配置中,因为它是在工作器上配置的。在实施您的用例时,您应该检查任何其他组件属性,例如油门限制等。
以下 XML 配置提供了基本的管理器设置:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
<bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="requests"/>
<property name="receiveTimeout" value="2000"/>
</bean>
<bean id="itemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step">
<property name="messagingOperations" ref="messagingTemplate"/>
<property name="replyChannel" ref="replies"/>
</bean>
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsReplies"
destination-name="replies"
channel="replies"/>
以下 Java 配置提供了基本的管理器设置:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}
/*
* Configure the ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
前面的配置为我们提供了许多 bean。我们使用 ActiveMQ 和 Spring Integration 提供的入站/出站 JMS 适配器配置我们的消息传递中间件。如图所示,我们itemWriter
的作业步骤引用的 bean 使用
ChunkMessageChannelItemWriter
用于在配置的中间件上写入块。
现在我们可以继续进行工作人员配置,如以下示例所示:
以下示例显示了 XML 中的工作器配置:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int:channel id="requests"/>
<int:channel id="replies"/>
<int-jms:message-driven-channel-adapter id="incomingRequests"
destination-name="requests"
channel="requests"/>
<int-jms:outbound-channel-adapter id="outgoingReplies"
destination-name="replies"
channel="replies">
</int-jms:outbound-channel-adapter>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
method="handleChunk"/>
<bean id="chunkProcessorChunkHandler"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="io.spring.sbi.PersonItemWriter"/>
</property>
<property name="itemProcessor">
<bean class="io.spring.sbi.PersonItemProcessor"/>
</property>
</bean>
</property>
</bean>
以下示例显示了 Java 中的 worker 配置:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}
/*
* Configure outbound flow (replies going to the manager)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}
/*
* Configure the ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
这些配置项中的大多数应该从管理器配置中看起来很熟悉。Workers 不需要访问 Spring BatchJobRepository
也不需要访问实际的作业配置文件。感兴趣的主要 bean 是chunkProcessorChunkHandler
. 的
chunkProcessor
属性ChunkProcessorChunkHandler
需要一个已配置SimpleChunkProcessor
的 ,您可以在其中提供对您的
ItemWriter
(以及,可选的,您的
ItemProcessor
)的引用,当它从管理器接收块时,它将在工作人员上运行。
有关详细信息,请参阅远程分块的“可扩展性”章节 。
从 4.1 版开始,Spring Batch Integration 引入了@EnableBatchIntegration
可用于简化远程分块设置的注解。此注解提供了两个可以在应用程序上下文中自动装配的 bean:
-
RemoteChunkingManagerStepBuilderFactory
:用于配置管理器步骤 -
RemoteChunkingWorkerBuilder
:用于配置远程工作者集成流程
这些 API 负责配置许多组件,如下图所述:
在管理器方面,RemoteChunkingManagerStepBuilderFactory
您可以通过声明来配置管理器步骤:
-
项目阅读器阅读项目并将其发送给工人
-
向工作人员发送请求的输出通道(“传出请求”)
-
用于接收工作人员回复的输入通道(“传入回复”)
AChunkMessageChannelItemWriter
和 theMessagingTemplate
不需要显式配置(如果需要,仍然可以显式配置)。
在工作人员方面,RemoteChunkingWorkerBuilder
允许您将工作人员配置为:
-
收听管理器在输入通道上发送的请求(“传入请求”)
-
使用配置的和调用每个请求的
handleChunk
方法ChunkProcessorChunkHandler
ItemProcessor
ItemWriter
-
将输出通道上的回复(“传出回复”)发送给经理
不需要显式配置SimpleChunkProcessor
和ChunkProcessorChunkHandler
(如果需要,可以显式配置)。
以下示例显示了如何使用这些 API:
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();
}
// Middleware beans setup omitted
}
}
您可以在此处找到远程分块作业的完整示例 。
远程分区
另一方面,当不是项目处理而是相关 I/O 导致瓶颈时,远程分区很有用。使用远程分区,可以将工作外包给执行完整 Spring Batch 步骤的工作人员。因此,每个工人都有自己的ItemReader
,ItemProcessor
和
ItemWriter
。为此,Spring Batch Integration 提供了MessageChannelPartitionHandler
.
该PartitionHandler
接口的实现使用MessageChannel
实例向远程工作人员发送指令并接收他们的响应。这为用于与远程工作人员通信的传输(例如 JMS 和 AMQP)提供了一个很好的抽象。
“可扩展性”一章中涉及
远程分区的部分提供了配置远程分区所需的概念和组件的概述,并显示了使用默认值
TaskExecutorPartitionHandler
在单独的本地执行线程中进行分区的示例。对于多个 JVM 的远程分区,需要两个附加组件:
-
远程结构或网格环境
-
PartitionHandler
支持所需远程结构或网格环境的实现
与远程分块类似,JMS 可以用作“远程结构”。在这种情况下,使用MessageChannelPartitionHandler
实例作为PartitionHandler
实现,如前所述。
以下示例假定现有的分区作业并着重
MessageChannelPartitionHandler
于 XML 中的 JMS 配置:
<bean id="partitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="stepName" value="step1"/>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outbound-replies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outbound-requests"/>
<property name="receiveTimeout" value="100000"/>
</bean>
</property>
</bean>
<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
channel="outbound-requests"/>
<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
channel="inbound-requests"/>
<bean id="stepExecutionRequestHandler"
class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="stepLocator" ref="stepLocator"/>
</bean>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
output-channel="outbound-staging"/>
<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
channel="outbound-staging"/>
<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
channel="inbound-staging"/>
<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
output-channel="outbound-replies"/>
<int:channel id="outbound-replies">
<int:queue/>
</int:channel>
<bean id="stepLocator"
class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
以下示例假定现有的分区作业并着重
MessageChannelPartitionHandler
于 Java 中的 JMS 配置:
/*
* Configuration of the manager side
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}
@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlows.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// configure other propeties of the aggregatorFactoryBean
return aggregatorFactoryBean;
}
@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}
/*
* Configuration of the worker side
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}
@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}
public IntegrationFlow inboundJmsRequests() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}
@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlows.from("outboundStaging")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue"))
.get();
}
您还必须确保分区handler
属性映射到partitionHandler
bean。
以下示例将分区handler
属性映射到partitionHandler
XML 中:
<job id="personJob">
<step id="step1.manager">
<partition partitioner="partitioner" handler="partitionHandler"/>
...
</step>
</job>
以下示例将分区handler
属性映射到partitionHandler
Java 中:
public Job personJob() {
return jobBuilderFactory.get("personJob")
.start(stepBuilderFactory.get("step1.manager")
.partitioner("step1.worker", partitioner())
.partitionHandler(partitionHandler())
.build())
.build();
}
您可以在此处找到远程分区作业的完整示例 。
@EnableBatchIntegration
可用于简化远程分区设置的注释。这个注解提供了两个对远程分区有用的 bean:
-
RemotePartitioningManagerStepBuilderFactory
:用于配置管理器步骤 -
RemotePartitioningWorkerStepBuilderFactory
:用于配置worker step
这些 API 负责配置许多组件,如下图所述:
在管理器方面,RemotePartitioningManagerStepBuilderFactory
允许您通过声明来配置管理器步骤:
-
Partitioner
用于对数据进行分区 -
向工作人员发送请求的输出通道(“传出请求”)
-
接收来自工作人员的回复的输入通道(“传入回复”)(在配置回复聚合时)
-
轮询间隔和超时参数(配置作业存储库轮询时)
和不需要显式配置(如果需要,仍然可以显式配置)MessageChannelPartitionHandler
。MessagingTemplate
在工作人员方面,RemotePartitioningWorkerStepBuilderFactory
允许您将工作人员配置为:
-
收听管理器在输入通道上发送的请求(“传入请求”)
-
为每个请求调用
handle
方法StepExecutionRequestHandler
-
将输出通道上的回复(“传出回复”)发送给经理
不需要显式配置StepExecutionRequestHandler
(如果需要,可以显式配置)。
以下示例显示了如何使用这些 API:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
// Middleware beans setup omitted
}
}