缩放和并行处理
许多批处理问题可以通过单线程、单进程作业来解决,因此在考虑更复杂的实现之前正确检查是否满足您的需求总是一个好主意。衡量实际工作的性能,看看最简单的实现是否首先满足您的需求。即使使用标准硬件,您也可以在一分钟内读取和写入数百兆字节的文件。
当您准备开始使用一些并行处理来实现作业时,Spring Batch 提供了一系列选项,本章将介绍这些选项,尽管其他地方会介绍一些特性。在高层次上,有两种并行处理模式:
-
单进程,多线程
-
多进程
这些也分为以下几类:
-
多线程步骤(单进程)
-
并行步骤(单个进程)
-
步骤的远程分块(多进程)
-
对步骤进行分区(单个或多个进程)
首先,我们回顾单进程选项。然后我们查看多进程选项。
多线程步骤
启动并行处理的最简单方法是TaskExecutor
在 Step 配置中添加一个。
例如,您可以添加 的属性tasklet
,如下所示:
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
使用java配置时,TaskExecutor
可以在step中添加a,如下例所示:
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
在此示例中,是对实现该接口
taskExecutor
的另一个 bean 定义的引用。
是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以获取可用实现的详细信息。最简单的多线程是
.TaskExecutor
TaskExecutor
TaskExecutor
SimpleAsyncTaskExecutor
上述配置的结果是Step
通过在单独的执行线程中读取、处理和写入每个项目块(每个提交间隔)来执行。请注意,这意味着要处理的项目没有固定的顺序,并且与单线程情况相比,块可能包含不连续的项目。除了任务执行器设置的任何限制(例如它是否由线程池支持)之外,tasklet 配置中还有一个节流限制,默认为 4。您可能需要增加此限制以确保线程池是充分利用。
例如,您可能会增加油门限制,如以下示例所示:
<step id="loading"> <tasklet
task-executor="taskExecutor"
throttle-limit="20">...</tasklet>
</step>
使用 Java 配置时,构建器提供对节流限制的访问,如以下示例所示:
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}
另请注意,您的步骤中使用的任何池化资源(例如DataSource
. 确保使这些资源中的池至少与步骤中所需的并发线程数一样大。
Step
对于一些常见的批处理用例,使用多线程实现存在一些实际限制。a 中的许多参与者Step
(例如读者和作者)都是有状态的。如果状态不是按线程隔离的,那么这些组件在多线程中不可用Step
。特别是,Spring Batch 中的大多数现成的读取器和写入器都不是为多线程使用而设计的。但是,可以使用无状态或线程安全的读取器和写入器,并且在parallelJob
Spring
Batch Samples中有
一个示例(称为已在数据库输入表中处理。
ItemWriter
Spring Batch 提供了和的一些实现ItemReader
。通常,他们会在 Javadoc 中说明它们是否是线程安全的,或者您必须采取哪些措施来避免并发环境中的问题。如果 Javadoc 中没有信息,您可以检查实现以查看是否有任何状态。如果阅读器不是线程安全的,你可以用提供的来装饰它,SynchronizedItemStreamReader
或者在你自己的同步委托器中使用它。您可以同步调用,read()
只要处理和写入是块中最昂贵的部分,您的步骤仍然可能比在单线程配置中更快地完成。
平行步骤
只要可以将需要并行化的应用程序逻辑拆分为不同的职责并分配给各个步骤,那么它就可以在单个进程中并行化。并行步骤执行易于配置和使用。
例如,(step1,step2)
并行执行步骤step3
很简单,如下例所示:
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
使用 Java 配置时,(step1,step2)
并行执行步骤step3
很简单,如下例所示:
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
可配置的任务执行器用于指定TaskExecutor
应该使用哪个实现来执行各个流。默认值为
SyncTaskExecutor
,但需要异步TaskExecutor
才能并行运行这些步骤。请注意,该作业确保拆分中的每个流在聚合退出状态和转换之前完成。
有关详细信息,请参阅拆分流部分。
远程分块
在远程分块中,Step
处理被分成多个进程,通过一些中间件相互通信。下图显示了该模式:
manager 组件是单个进程,worker 是多个远程进程。如果经理不是瓶颈,这种模式效果最好,因此处理必须比读取项目更昂贵(实践中经常出现这种情况)。
Step
管理器是 Spring Batch的一个实现,ItemWriter
替换为通用版本,该版本知道如何将项目块作为消息发送到中间件。工作人员是正在使用的任何中间件的标准侦听器(例如,对于 JMS,它们将是MessageListener
实现),它们的作用是通过接口使用标准ItemWriter
或ItemProcessor
plus
处理项目块。使用此模式的优点之一是读取器、处理器和写入器组件是现成的(与用于本地执行步骤的组件相同)。这些项目是动态划分的,工作是通过中间件共享的,因此,如果侦听器都是热切的消费者,那么负载平衡是自动的。ItemWriter
ChunkProcessor
中间件必须是持久的,有保证的交付和每条消息的单一消费者。JMS 是显而易见的候选者,但网格计算和共享内存产品空间中存在其他选项(例如 JavaSpaces)。
有关更多详细信息,请参阅 Spring Batch Integration - Remote Chunking部分 。
分区
Spring Batch 还提供了一个 SPI 用于对Step
执行进行分区并远程执行它。在这种情况下,远程参与者是Step
可以很容易地配置并用于本地处理的实例。下图显示了该模式:
Job
左侧作为一系列Step
实例运行,其中一个
实例Step
被标记为管理器。这张图片中的工人都是 a 的相同实例Step
,它实际上可以代替经理,从而导致相同的结果Job
。工作人员通常是远程服务,但也可能是本地执行线程。经理在这种模式下发送给工人的消息不需要是持久的或有保证的传递。中的 Spring Batch 元数据JobRepository
确保每个工作人员执行一次,并且每次Job
执行仅执行一次。
Spring Batch 中的 SPI 由一个特殊的实现Step
(称为
PartitionStep
)和两个需要针对特定环境实现的策略接口组成。策略接口为PartitionHandler
和StepExecutionSplitter
,它们的作用如下图所示:
在这种Step
情况下,右边是“远程”工作人员,因此,可能有许多对象和/或进程扮演这个角色,并且PartitionStep
显示驱动执行。
以下示例显示了PartitionStep
使用 XML 配置时的配置:
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
以下示例显示了PartitionStep
使用 Java 配置时的配置:
@Bean
public Step step1Manager() {
return stepBuilderFactory.get("step1.manager")
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
类似于多线程步骤的throttle-limit
属性,该grid-size
属性防止任务执行器被来自单个步骤的请求饱和。
有一个简单的示例可以在
Spring Batch Samples的单元测试套件中复制和扩展(请参阅partition*Job.xml
配置)。
Spring Batch 为名为“step1:partition0”的分区创建步骤执行,依此类推。许多人喜欢将管理器步骤称为“step1:manager”以保持一致性。您可以为步骤使用别名(通过指定name
属性而不是id
属性)。
分区处理程序
这PartitionHandler
是了解远程处理或网格环境结构的组件。它能够向StepExecution
远程实例发送请求Step
,以某种特定于结构的格式(如 DTO)进行包装。它不必知道如何拆分输入数据或如何聚合多次Step
执行的结果。一般来说,它可能也不需要了解弹性或故障转移,因为在许多情况下,这些都是结构的特性。在任何情况下,Spring Batch 始终提供独立于结构的可重启性。失败的Job
总是可以重新启动,只有失败Steps
的才会重新执行。
该PartitionHandler
接口可以为各种结构类型提供专门的实现,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java 空间、共享内存网格(如 Terracotta 或 Coherence)和网格执行结构(如 GridGain)。Spring Batch 不包含任何专有网格或远程结构的实现。
然而,Spring Batch 确实提供了一个有用的实现,它使用Spring 的策略在单独的执行线程中本地PartitionHandler
执行Step
实例
。TaskExecutor
该实现称为
TaskExecutorPartitionHandler
.
这TaskExecutorPartitionHandler
是使用前面显示的 XML 命名空间配置的步骤的默认值。也可以显式配置,如下例所示:
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
可以在TaskExecutorPartitionHandler
java 配置中显式配置,如下例所示:
@Bean
public Step step1Manager() {
return stepBuilderFactory.get("step1.manager")
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
该gridSize
属性确定要创建的单独步骤执行的数量,因此它可以与TaskExecutor
. 或者,可以将其设置为大于可用线程数,从而使工作块更小。
这TaskExecutorPartitionHandler
对于 IO 密集型Step
实例很有用,例如复制大量文件或将文件系统复制到内容管理系统中。它还可以通过提供作为Step
远程调用代理的实现来用于远程执行(例如使用 Spring Remoting)。
分区器
有一个更简单的Partitioner
职责:生成执行上下文作为仅用于新步骤执行的输入参数(无需担心重新启动)。它只有一个方法,如下面的接口定义所示:
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
此方法的返回值将每个步骤执行的唯一名称 (the
String
) 与 形式的输入参数相关联ExecutionContext
。这些名称稍后在批处理元数据中显示为分区中的步骤名称StepExecutions
。这
ExecutionContext
只是一个名称-值对的包,因此它可能包含一系列主键、行号或输入文件的位置。然后,遥控器Step
通常使用占位符绑定到上下文输入#{…}
(步骤范围内的后期绑定),如下一节所示。
步骤执行的名称(Map
返回 by中的键Partitioner
)在 a 的步骤执行中必须是唯一的,Job
但没有任何其他特定要求。最简单的方法(并使名称对用户有意义)是使用前缀+后缀命名约定,其中前缀是正在执行的步骤的名称(它本身在 中是唯一的Job
),并且后缀只是一个计数器。框架中有一个SimplePartitioner
使用此约定。
调用的可选接口PartitionNameProvider
可用于提供与分区本身分开的分区名称。如果 aPartitioner
实现了这个接口,那么在重新启动时,只查询名称。如果分区很昂贵,这可能是一个有用的优化。提供的名称PartitionNameProvider
必须与提供的名称相匹配Partitioner
。
将输入数据绑定到步骤
由 执行的步骤PartitionHandler
具有相同的配置并且它们的输入参数在运行时从
ExecutionContext
. 这很容易通过 Spring Batch 的 StepScope 特性实现(在后期绑定一节中有更详细的介绍)。例如,如果Partitioner
创建ExecutionContext
具有名为 的属性键的实例,并fileName
为每个步骤调用指向不同的文件(或目录),则Partitioner
输出可能类似于下表的内容:
步骤执行名称(键) |
执行上下文(值) |
文件复制:partition0 |
文件名=/home/data/one |
文件复制:partition1 |
文件名=/home/data/二 |
文件复制:partition2 |
文件名=/home/data/三 |
然后可以使用后期绑定到执行上下文将文件名绑定到步骤。
以下示例显示如何在 XML 中定义后期绑定:
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>
下面的例子展示了如何在 Java 中定义后期绑定:
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}