配置一个Step
正如在领域章节中所讨论的,aStep
是一个领域对象,它封装了批处理作业的独立、顺序阶段,并包含定义和控制实际批处理所需的所有信息。这必然是一个模糊的描述,因为任何给定的内容
Step
都由编写Job
. AStep
可以像开发人员希望的那样简单或复杂。一个简单的Step
可能将数据从文件加载到数据库中,需要很少或不需要代码(取决于使用的实现)。更复杂的Step
可能具有作为处理的一部分应用的复杂业务规则,如下图所示:
面向块的处理
Spring Batch 在其最常见的实现中使用“面向块”的处理方式。面向块的处理是指一次读取一个数据并创建在事务边界内写出的“块”。一旦读取的项目数等于提交间隔,整个块被 写出
ItemWriter
,然后事务被提交。下图显示了该过程:
以下伪代码以简化形式显示了相同的概念:
List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
Object item = itemReader.read();
if (item != null) {
items.add(item);
}
}
itemWriter.write(items);
面向块的步骤也可以配置为ItemProcessor
在将项目传递给ItemWriter
. 下图显示了ItemProcessor
在步骤中注册 an 时的过程:
下面的伪代码显示了这是如何以简化的形式实现的:
List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
Object item = itemReader.read();
if (item != null) {
items.add(item);
}
}
List processedItems = new Arraylist();
for(Object item: items){
Object processedItem = itemProcessor.process(item);
if (processedItem != null) {
processedItems.add(processedItem);
}
}
itemWriter.write(processedItems);
有关项目处理器及其用例的更多详细信息,请参阅 项目处理部分。
配置一个Step
尽管 a 所需依赖项的列表相对较短Step
,但它是一个极其复杂的类,可能包含许多协作者。
为了简化配置,可以使用 Spring Batch XML 命名空间,如下例所示:
<job id="sampleJob" job-repository="jobRepository">
<step id="step1">
<tasklet transaction-manager="transactionManager">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
</job>
使用 Java 配置时,可以使用 Spring Batch 构建器,如下例所示:
/**
* Note the JobRepository is typically autowired in and not needed to be explicitly
* configured
*/
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
return this.jobBuilderFactory.get("sampleJob")
.repository(jobRepository)
.start(sampleStep)
.build();
}
/**
* Note the TransactionManager is typically autowired in and not needed to be explicitly
* configured
*/
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
return this.stepBuilderFactory.get("sampleStep")
.transactionManager(transactionManager)
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.build();
}
上面的配置包括创建面向项目的步骤所需的唯一依赖项:
-
reader
:ItemReader
提供要处理的项目。 -
writer
:ItemWriter
处理由 提供的项目ItemReader
。
-
transaction-manager
:SpringPlatformTransactionManager
在处理期间开始并提交事务。
-
transactionManager
:SpringPlatformTransactionManager
在处理期间开始并提交事务。
-
job-repository
:在处理期间(就在提交之前)JobRepository
定期存储StepExecution
和的 XML 特定名称。ExecutionContext
对于内联<step/>
(在 a 中定义的<job/>
),它是<job/>
元素上的一个属性。对于一个独立的<step/>
,它被定义为 <tasklet/> 的一个属性。
-
repository
:在处理期间(就在提交之前)JobRepository
定期存储StepExecution
和的 Java 特定名称。ExecutionContext
-
commit-interval
:提交事务之前要处理的项目数的 XML 特定名称。
-
chunk
:依赖项的 Java 特定名称,表明这是一个基于项目的步骤,以及在提交事务之前要处理的项目数。
需要注意的是job-repository
defaultsjobRepository
和
transaction-manager
defaults transactionManager
。此外,ItemProcessor
是可选的,因为项目可以直接从阅读器传递给作者。
需要注意的是repository
defaults tojobRepository
和transactionManager
defaults to transactionManager
(都是通过基础设施提供的
@EnableBatchProcessing
)。此外,ItemProcessor
是可选的,因为项目可以直接从阅读器传递给作者。
从父母那里继承Step
如果一组Steps
共享相似的配置,那么定义一个“父级”可能会有所帮助,Step
具体Steps
可以从该“父级”继承属性。与 Java 中的类继承类似,“子”Step
将其元素和属性与父的元素和属性相结合。孩子还覆盖了任何父母的Steps
.
在以下示例中Step
,“concreteStep1”继承自“parentStep”。它用 'itemReader'、'itemProcessor'、'itemWriter'、startLimit=5
和
来实例化allowStartIfComplete=true
。此外,它commitInterval
是 '5',因为它被 "concreteStep1" 覆盖,Step
如下例所示:
<step id="parentStep">
<tasklet allow-start-if-complete="true">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
<step id="concreteStep1" parent="parentStep">
<tasklet start-limit="5">
<chunk processor="itemProcessor" commit-interval="5"/>
</tasklet>
</step>
作业元素中的步骤仍需要该id
属性。这有两个原因:
-
持久化
id
时用作步骤名称StepExecution
。如果在作业中的多个步骤中引用了相同的独立步骤,则会发生错误。
-
在创建作业流程时,如本章后面所述,该
next
属性应指流程中的步骤,而不是独立步骤。
抽象的Step
有时,可能需要定义一个Step
不是完整Step
配置的父级。例如,如果配置中不包含reader
、writer
和tasklet
属性Step
,则初始化失败。如果必须在没有这些属性的情况下定义父级,abstract
则应使用该属性。An
abstract
Step
只是扩展,从不实例化。
在以下示例中,Step
abstractParentStep
如果未将其声明为抽象,则不会实例化。“ Step
concreteStep2”具有“itemReader”、“itemWriter”和 commit-interval=10。
<step id="abstractParentStep" abstract="true">
<tasklet>
<chunk commit-interval="10"/>
</tasklet>
</step>
<step id="concreteStep2" parent="abstractParentStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter"/>
</tasklet>
</step>
合并列表
上的一些可配置元素Steps
是列表,例如<listeners/>
元素。如果父项和子项都Steps
声明了一个<listeners/>
元素,则子项的列表会覆盖父项的列表。为了允许子级向父级定义的列表添加额外的侦听器,每个列表元素都有一个merge
属性。如果元素指定了merge="true"
,则子列表与父列表合并,而不是覆盖它。
在以下示例中,Step
“concreteStep3”创建了两个侦听器:
listenerOne
和listenerTwo
:
<step id="listenersParentStep" abstract="true">
<listeners>
<listener ref="listenerOne"/>
<listeners>
</step>
<step id="concreteStep3" parent="listenersParentStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
</tasklet>
<listeners merge="true">
<listener ref="listenerTwo"/>
<listeners>
</step>
提交间隔
如前所述,一个步骤读入和写出项目,定期使用提供的PlatformTransactionManager
. acommit-interval
为 1 时,它会在写入每个单独的项目后提交。这在许多情况下都不太理想,因为开始和提交事务的成本很高。理想情况下,最好在每个事务中处理尽可能多的项目,这完全取决于正在处理的数据类型以及与该步骤交互的资源。因此,可以配置提交中处理的项目数。
下面的示例显示了一个step
其
值为 10 的值,正如它在 XML 中定义的那样tasklet
:commit-interval
<job id="sampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
</job>
以下示例显示了一个step
其
值为 10 的值,正如它在 Java 中定义的那样tasklet
:commit-interval
@Bean
public Job sampleJob() {
return this.jobBuilderFactory.get("sampleJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.build();
}
在前面的示例中,每个事务中处理了 10 个项目。在处理开始时,事务开始。此外,每次read
在 上调用时
ItemReader
,都会增加一个计数器。当它达到 10 时,聚合项目列表被传递给ItemWriter
,并且事务被提交。
配置Step
重启
在“配置和运行作业”部分中,
Job
讨论了重新启动 a。重新启动对步骤有很多影响,因此可能需要一些特定的配置。
设置开始限制
在许多情况下,您可能希望控制 aStep
可以启动的次数。例如,Step
可能需要配置一个特定的,以便它只运行一次,因为它会使某些必须手动修复的资源无效,然后才能再次运行。这可以在步骤级别进行配置,因为不同的步骤可能有不同的要求。Step
只能执行一次的 A可以作为可以无限运行Job
的 a 的一部分存在。Step
以下代码片段显示了 XML 中的开始限制配置示例:
<step id="step1">
<tasklet start-limit="1">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
以下代码片段显示了 Java 中的开始限制配置示例:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.startLimit(1)
.build();
}
前面示例中显示的步骤只能运行一次。尝试再次运行它会导致 aStartLimitExceededException
被抛出。请注意,开始限制的默认值为Integer.MAX_VALUE
.
重新启动已完成Step
在可重新启动作业的情况下,可能有一个或多个步骤应该始终运行,无论它们第一次是否成功。一个示例可能是验证步骤或Step
在处理之前清理资源的步骤。在重新启动作业的正常处理过程中,将跳过任何状态为“已完成”的步骤,这意味着它已经成功完成。设置allow-start-if-complete
为“true”会覆盖此设置,以便该步骤始终运行。
以下代码片段显示了如何在 XML 中定义可重新启动的作业:
<step id="step1">
<tasklet allow-start-if-complete="true">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
以下代码片段显示了如何在 Java 中定义可重新启动的作业:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
Step
重启配置示例
以下 XML 示例显示如何将作业配置为具有可重新启动的步骤:
<job id="footballJob" restartable="true">
<step id="playerload" next="gameLoad">
<tasklet>
<chunk reader="playerFileItemReader" writer="playerWriter"
commit-interval="10" />
</tasklet>
</step>
<step id="gameLoad" next="playerSummarization">
<tasklet allow-start-if-complete="true">
<chunk reader="gameFileItemReader" writer="gameWriter"
commit-interval="10"/>
</tasklet>
</step>
<step id="playerSummarization">
<tasklet start-limit="2">
<chunk reader="playerSummarizationSource" writer="summaryWriter"
commit-interval="10"/>
</tasklet>
</step>
</job>
以下 Java 示例显示了如何将作业配置为具有可重新启动的步骤:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
@Bean
public Step playerLoad() {
return this.stepBuilderFactory.get("playerLoad")
.<String, String>chunk(10)
.reader(playerFileItemReader())
.writer(playerWriter())
.build();
}
@Bean
public Step gameLoad() {
return this.stepBuilderFactory.get("gameLoad")
.allowStartIfComplete(true)
.<String, String>chunk(10)
.reader(gameFileItemReader())
.writer(gameWriter())
.build();
}
@Bean
public Step playerSummarization() {
return this.stepBuilderFactory.get("playerSummarization")
.startLimit(2)
.<String, String>chunk(10)
.reader(playerSummarizationSource())
.writer(summaryWriter())
.build();
}
前面的示例配置用于加载有关足球比赛的信息并对其进行汇总的作业。它包含三个步骤:playerLoad
、gameLoad
和
playerSummarization
。该playerLoad
步骤从平面文件加载玩家信息,而该gameLoad
步骤对游戏执行相同操作。最后一步,
playerSummarization
然后根据提供的游戏汇总每个玩家的统计数据。假设加载的文件playerLoad
只能加载一次,但gameLoad
可以加载在特定目录中找到的任何游戏,在成功加载到数据库后将其删除。因此,该playerLoad
步骤不包含其他配置。它可以启动任意次数,如果完成,则跳过。这gameLoad
但是,每次都需要运行 step,以防自上次运行以来添加了额外的文件。它已将“allow-start-if-complete”设置为“true”,以便始终启动。(假设加载的数据库桌面游戏上有一个进程指示器,以确保汇总步骤可以正确找到新游戏)。作业中最重要的汇总步骤配置为开始限制为 2。这很有用,因为如果该步骤连续失败,则会将新的退出代码返回给控制作业执行的操作员,并且它可以在进行手动干预之前不要重新开始。
该作业为本文档提供了一个示例, |
本节的其余部分描述了该
footballJob
示例的三个运行中的每一个会发生什么。
运行 1:
-
playerLoad
运行并成功完成,将 400 名玩家添加到“PLAYERS”表中。 -
gameLoad
运行并处理 11 个游戏数据文件,将其内容加载到“游戏”表中。 -
playerSummarization
开始处理并在 5 分钟后失败。
运行 2:
-
playerLoad
不运行,因为它已经成功完成,并且allow-start-if-complete
是“假”(默认值)。 -
gameLoad
再次运行并处理另外 2 个文件,将它们的内容也加载到“GAMES”表中(过程指示器指示它们尚未处理)。 -
playerSummarization
开始处理所有剩余的游戏数据(使用进程指示器过滤)并在 30 分钟后再次失败。
运行 3:
-
playerLoad
不运行,因为它已经成功完成,并且allow-start-if-complete
是“假”(默认值)。 -
gameLoad
再次运行并处理另外 2 个文件,将它们的内容也加载到“GAMES”表中(过程指示器指示它们尚未处理)。 -
playerSummarization
未启动并且作业立即终止,因为这是 的第三次执行playerSummarization
,其限制仅为 2。必须提高限制或Job
必须作为新的JobInstance
.
配置跳过逻辑
在许多情况下,处理过程中遇到的错误不应该导致
Step
失败,而是应该跳过。这通常是必须由了解数据本身及其含义的人做出的决定。例如,财务数据可能无法跳过,因为它会导致资金转移,这需要完全准确。另一方面,加载供应商列表可能会允许跳过。如果供应商由于格式不正确或缺少必要信息而未加载,则可能没有问题。通常,这些不良记录也会被记录下来,稍后讨论侦听器时会涉及到这些记录。
以下 XML 示例显示了使用跳过限制的示例:
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter"
commit-interval="10" skip-limit="10">
<skippable-exception-classes>
<include class="org.springframework.batch.item.file.FlatFileParseException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
以下 Java 示例显示了使用跳过限制的示例:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(FlatFileParseException.class)
.build();
}
在前面的示例中,FlatFileItemReader
使用了 a。如果在任何时候
FlatFileParseException
抛出 a,则跳过该项目并计入 10 的总跳过限制。声明的异常(及其子类)可能会在块处理的任何阶段(读取、处理、写入)抛出但在步骤执行中读取、处理和写入的跳过是单独计数的,但该限制适用于所有跳过。一旦达到跳过限制,发现的下一个异常会导致该步骤失败。换句话说,第十一次跳过会触发异常,而不是第十次。
前面示例的一个问题是除 a 之外的任何其他异常
FlatFileParseException
都会导致Job
失败。在某些情况下,这可能是正确的行为。但是,在其他情况下,可能更容易识别哪些异常应该导致失败并跳过其他所有内容。
以下 XML 示例显示了排除特定异常的示例:
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter"
commit-interval="10" skip-limit="10">
<skippable-exception-classes>
<include class="java.lang.Exception"/>
<exclude class="java.io.FileNotFoundException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
以下 Java 示例显示了一个排除特定异常的示例:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
通过标识java.lang.Exception
为可跳过的异常类,配置指示所有Exceptions
都是可跳过的。但是,通过 '排除'
java.io.FileNotFoundException
,配置将可跳过的异常类列表细化为全部Exceptions
except FileNotFoundException
。如果遇到任何排除的异常类都是致命的(也就是说,它们不会被跳过)。
对于遇到的任何异常,可跳过性由类层次结构中最近的超类确定。任何未分类的异常都被视为“致命”。
<include/>
和元素的顺序<exclude/>
无关紧要。
skip
and方法调用的顺序noSkip
无关紧要。
配置重试逻辑
在大多数情况下,您希望异常导致跳过或Step
失败。但是,并非所有异常都是确定性的。如果FlatFileParseException
在读取时遇到 a,则始终为该记录抛出它。重置ItemReader
没有帮助。但是,对于其他异常,例如 a DeadlockLoserDataAccessException
,它表示当前进程已尝试更新另一个进程持有锁定的记录。等待并再次尝试可能会成功。
在 XML 中,重试应该配置如下:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter"
commit-interval="2" retry-limit="3">
<retryable-exception-classes>
<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</retryable-exception-classes>
</chunk>
</tasklet>
</step>
在Java中,重试应该配置如下:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
Step
允许限制单个项目可以重试的次数以及“可重试”的异常列表。有关重试如何工作的更多详细信息,请参阅
retry。
控制回滚
默认情况下,无论重试还是跳过,任何引发的异常都会ItemWriter
导致由事务控制的事务Step
回滚。如果按照前面描述的方式配置了 skip,则从 抛出的异常ItemReader
不会导致回滚。但是,在许多情况下,从 引发的异常ItemWriter
不应导致回滚,因为没有采取任何措施使事务无效。出于这个原因,Step
可以配置一个不应导致回滚的异常列表。
在 XML 中,您可以按如下方式控制回滚:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
<no-rollback-exception-classes>
<include class="org.springframework.batch.item.validator.ValidationException"/>
</no-rollback-exception-classes>
</tasklet>
</step>
在 Java 中,您可以按如下方式控制回滚:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.noRollback(ValidationException.class)
.build();
}
交易读者
的基本合同ItemReader
是它只是向前的。步骤缓冲阅读器输入,因此在回滚的情况下,不需要从阅读器重新读取项目。但是,在某些情况下,阅读器构建在事务资源之上,例如 JMS 队列。在这种情况下,由于队列与回滚的事务相关联,因此从队列中拉出的消息将被放回。因此,可以将该步骤配置为不缓冲项目。
以下示例显示了如何创建不缓冲 XML 中的项目的阅读器:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"
is-reader-transactional-queue="true"/>
</tasklet>
</step>
以下示例显示了如何在 Java 中创建不缓冲项目的阅读器:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.readerIsTransactionalQueue()
.build();
}
交易属性
事务属性可用于控制isolation
、propagation
和
timeout
设置。更多关于设置事务属性的信息可以在
Spring 核心文档中找到。
以下示例在 XML 中设置isolation
、propagation
和timeout
transaction 属性:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
<transaction-attributes isolation="DEFAULT"
propagation="REQUIRED"
timeout="30"/>
</tasklet>
</step>
以下示例在 Java 中设置isolation
、propagation
和timeout
transaction 属性:
@Bean
public Step step1() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.transactionAttribute(attribute)
.build();
}
注册ItemStream
_Step
该步骤ItemStream
必须在其生命周期的必要点处理回调(有关ItemStream
接口的更多信息,请参阅
ItemStream)。如果步骤失败并且可能需要重新启动,这一点至关重要,因为ItemStream
接口是步骤获取有关执行之间的持久状态所需的信息的地方。
如果ItemReader
、ItemProcessor
或ItemWriter
本身实现了ItemStream
接口,那么这些会自动注册。任何其他流都需要单独注册。这通常是间接依赖关系(例如委托)被注入读取器和写入器的情况。可以
step
通过 'stream' 元素在 上注册流。
以下示例显示如何在 XML中注册stream
a :step
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="compositeWriter" commit-interval="2">
<streams>
<stream ref="fileItemWriter1"/>
<stream ref="fileItemWriter2"/>
</streams>
</chunk>
</tasklet>
</step>
<beans:bean id="compositeWriter"
class="org.springframework.batch.item.support.CompositeItemWriter">
<beans:property name="delegates">
<beans:list>
<beans:ref bean="fileItemWriter1" />
<beans:ref bean="fileItemWriter2" />
</beans:list>
</beans:property>
</beans:bean>
以下示例显示了如何在 Java中注册stream
a :step
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(compositeItemWriter())
.stream(fileItemWriter1())
.stream(fileItemWriter2())
.build();
}
/**
* In Spring Batch 4, the CompositeItemWriter implements ItemStream so this isn't
* necessary, but used for an example.
*/
@Bean
public CompositeItemWriter compositeItemWriter() {
List<ItemWriter> writers = new ArrayList<>(2);
writers.add(fileItemWriter1());
writers.add(fileItemWriter2());
CompositeItemWriter itemWriter = new CompositeItemWriter();
itemWriter.setDelegates(writers);
return itemWriter;
}
在上面的示例中, theCompositeItemWriter
不是ItemStream
,但它的两个代表都是。因此,两个委托编写器都必须显式注册为流,以便框架正确处理它们。ItemReader
不需要显式注册为流,因为它是Step
. 该步骤现在可以重新启动,并且读取器和写入器的状态在发生故障时正确保持。
拦截Step
执行
与 一样,Job
在执行期间有许多事件,Step
用户可能需要执行某些功能。例如,为了写出需要页脚的平面文件,需要在完成ItemWriter
时通知Step
,以便可以写入页脚。这可以通过许多
Step
作用域侦听器之一来完成。
任何实现扩展之一的类StepListener
(但不是该接口本身,因为它是空的)都可以应用于通过listeners
元素的步骤。该listeners
元素在步骤、tasklet 或块声明中有效。建议您在其功能应用的级别声明侦听器,或者,如果它是多功能的(例如StepExecutionListener
and ItemReadListener
),则在其应用的最细粒度级别声明它。
以下示例显示了在 XML 中的块级别应用的侦听器:
<step id="step1">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"/>
<listeners>
<listener ref="chunkListener"/>
</listeners>
</tasklet>
</step>
以下示例显示了在 Java 中的块级别应用的侦听器:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}
一个ItemReader
, ItemWriter
orItemProcessor
本身实现了接口之一
的if 使用命名空间元素或工厂之一StepListener
自动注册。这仅适用于直接注入. 如果侦听器嵌套在另一个组件中,则需要显式注册(如前面
使用 a注册中所述)。Step
<step>
*StepFactoryBean
Step
ItemStream
Step
除了StepListener
接口之外,还提供了注释来解决相同的问题。普通的旧 Java 对象可以具有带有这些注释的方法,然后将其转换为相应的StepListener
类型。注释块组件的自定义实现也很常见,例如ItemReader
orItemWriter
或
Tasklet
。注释由 XML 解析器分析<listener/>
元素并使用listener
构建器中的方法注册,因此您需要做的就是使用 XML 命名空间或构建器通过一个步骤注册侦听器。
StepExecutionListener
StepExecutionListener
代表最通用的Step
执行监听器。它允许在 aStep
启动之前和结束之后进行通知,无论它是正常结束还是失败,如以下示例所示:
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
ExitStatus
是的返回类型,afterStep
以便让侦听器有机会修改在完成 a 时返回的退出代码Step
。
该接口对应的注解为:
-
@BeforeStep
-
@AfterStep
ChunkListener
块被定义为在事务范围内处理的项目。在每个提交间隔提交一个事务,提交一个“块”。AChunkListener
可用于在块开始处理之前或块成功完成之后执行逻辑,如下面的接口定义所示:
public interface ChunkListener extends StepListener {
void beforeChunk(ChunkContext context);
void afterChunk(ChunkContext context);
void afterChunkError(ChunkContext context);
}
beforeChunk 方法在事务启动后调用,但在ItemReader
. 相反,afterChunk
在提交块之后调用(如果有回滚则根本不调用)。
该接口对应的注解为:
-
@BeforeChunk
-
@AfterChunk
-
@AfterChunkError
AChunkListener
可以在没有块声明时应用。TaskletStep
负责调用,因此ChunkListener
它也适用于非面向项目的tasklet(它在tasklet 之前和之后调用)。
ItemReadListener
之前讨论跳过逻辑时,提到过记录跳过的记录可能会有好处,以便以后处理。在读取错误的情况下,可以使用 来完成ItemReaderListener
,如以下接口定义所示:
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
beforeRead
在每次调用 read 之前调用该方法ItemReader
。afterRead
每次成功调用 read 后都会调用该
方法,并将读取的项目传递给该方法。如果读取时出错,onReadError
则调用该方法。提供遇到的异常以便可以记录它。
该接口对应的注解为:
-
@BeforeRead
-
@AfterRead
-
@OnReadError
ItemProcessListener
就像使用 一样,ItemReadListener
可以“监听”一个项目的处理,如下面的接口定义所示:
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
该beforeProcess
方法在之前被调用process
,ItemProcessor
并被交给要处理的项目。afterProcess
成功处理项目后调用该方法。如果处理时出错,
onProcessError
则调用该方法。提供遇到的异常和尝试处理的项目,以便记录它们。
该接口对应的注解为:
-
@BeforeProcess
-
@AfterProcess
-
@OnProcessError
ItemWriteListener
项目的写入可以用 来“收听” ItemWriteListener
,如下面的接口定义所示:
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
该beforeWrite
方法在之前被调用write
,ItemWriter
并被传递给写入的项目列表。afterWrite
成功写入项目后调用该方法。如果写入时出错,onWriteError
则调用该方法。提供遇到的异常和尝试写入的项目,以便记录它们。
该接口对应的注解为:
-
@BeforeWrite
-
@AfterWrite
-
@OnWriteError
SkipListener
ItemReadListener
, ItemProcessListener
, 和ItemWriteListener
所有都提供了错误通知机制,但没有一个通知您记录实际上已被跳过。onWriteError
例如,即使一个项目被重试并且成功,也会被调用。为此,有一个单独的接口用于跟踪跳过的项目,如下面的接口定义所示:
public interface SkipListener<T,S> extends StepListener {
void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);
}
onSkipInRead
每当阅读时跳过项目时调用。需要注意的是,回滚可能会导致同一项目被多次注册为跳过。
onSkipInWrite
在写入时跳过项目时调用。因为项目已被成功读取(并且没有被跳过),所以项目本身也作为参数提供。
该接口对应的注解为:
-
@OnSkipInRead
-
@OnSkipInWrite
-
@OnSkipInProcess
TaskletStep
面向块的处理并不是在
Step
. 如果Step
必须包含一个简单的存储过程调用怎么办?您可以将调用实现为 anItemReader
并在过程完成后返回 null。但是,这样做有点不自然,因为需要一个 no-op ItemWriter
。Spring BatchTaskletStep
为这个场景提供了。
Tasklet
是一个简单的接口,它有一个方法,execute
被 反复调用,TaskletStep
直到它返回RepeatStatus.FINISHED
或抛出异常以表示失败。对 a 的每个调用Tasklet
都包含在一个事务中。
Tasklet
实现者可能会调用存储过程、脚本或简单的 SQL 更新语句。
要在 XML 中创建一个,元素TaskletStep
的 'ref' 属性<tasklet/>
应该引用一个定义Tasklet
对象的 bean。不应<chunk/>
在<tasklet/>
. 以下示例显示了一个简单的 tasklet:
<step id="step1">
<tasklet ref="myTasklet"/>
</step>
要在 Java 中创建一个TaskletStep
,传递给tasklet
构建器的方法的 bean 应该实现该Tasklet
接口。chunk
构建TaskletStep
. _ 以下示例显示了一个简单的 tasklet:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.tasklet(myTasklet())
.build();
}
|
TaskletAdapter
ItemReader
与和ItemWriter
接口的其他适配器一样,该Tasklet
接口包含一个实现,该实现允许自己适应任何预先存在的类:TaskletAdapter
. 这可能有用的一个示例是用于更新一组记录上的标志的现有 DAO。可TaskletAdapter
用于调用此类,而无需为Tasklet
接口编写适配器。
以下示例显示如何TaskletAdapter
在 XML 中定义 a:
<bean id="myTasklet" class="o.s.b.core.step.tasklet.MethodInvokingTaskletAdapter">
<property name="targetObject">
<bean class="org.mycompany.FooDao"/>
</property>
<property name="targetMethod" value="updateFoo" />
</bean>
以下示例显示了如何TaskletAdapter
在 Java 中定义 a:
@Bean
public MethodInvokingTaskletAdapter myTasklet() {
MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();
adapter.setTargetObject(fooDao());
adapter.setTargetMethod("updateFoo");
return adapter;
}
示例Tasklet
实现
许多批处理作业包含必须在主处理开始之前完成的步骤,以便设置各种资源或在处理完成后清理这些资源。在大量使用文件的作业的情况下,通常需要在将某些文件成功上传到另一个位置后在本地删除它们。以下示例(取自
Spring Batch 示例项目)是一个Tasklet
具有此类责任的实现:
public class FileDeletingTasklet implements Tasklet, InitializingBean {
private Resource directory;
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
File dir = directory.getFile();
Assert.state(dir.isDirectory());
File[] files = dir.listFiles();
for (int i = 0; i < files.length; i++) {
boolean deleted = files[i].delete();
if (!deleted) {
throw new UnexpectedJobExecutionException("Could not delete file " +
files[i].getPath());
}
}
return RepeatStatus.FINISHED;
}
public void setDirectoryResource(Resource directory) {
this.directory = directory;
}
public void afterPropertiesSet() throws Exception {
Assert.notNull(directory, "directory must be set");
}
}
前面的tasklet
实现删除给定目录中的所有文件。需要注意的是,该execute
方法只被调用一次。剩下的就是tasklet
从step
.
以下示例显示如何tasklet
从step
XML 中引用:
<job id="taskletJob">
<step id="deleteFilesInDir">
<tasklet ref="fileDeletingTasklet"/>
</step>
</job>
<beans:bean id="fileDeletingTasklet"
class="org.springframework.batch.sample.tasklet.FileDeletingTasklet">
<beans:property name="directoryResource">
<beans:bean id="directory"
class="org.springframework.core.io.FileSystemResource">
<beans:constructor-arg value="target/test-outputs/test-dir" />
</beans:bean>
</beans:property>
</beans:bean>
以下示例显示了如何在 Java中引用tasklet
from :step
@Bean
public Job taskletJob() {
return this.jobBuilderFactory.get("taskletJob")
.start(deleteFilesInDir())
.build();
}
@Bean
public Step deleteFilesInDir() {
return this.stepBuilderFactory.get("deleteFilesInDir")
.tasklet(fileDeletingTasklet())
.build();
}
@Bean
public FileDeletingTasklet fileDeletingTasklet() {
FileDeletingTasklet tasklet = new FileDeletingTasklet();
tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));
return tasklet;
}
控制步骤流程
由于能够在拥有的作业中将步骤分组在一起,因此需要能够控制作业如何从一个步骤“流动”到另一个步骤。a 的失败Step
并不一定意味着Job
应该失败。此外,可能有不止一种类型的“成功”来确定Step
接下来应该执行哪一个。根据一组的Steps
配置方式,某些步骤甚至可能根本不被处理。
顺序流
最简单的流程场景是所有步骤都按顺序执行的作业,如下图所示:
这可以通过在 a 中使用“下一个”来实现step
。
以下示例显示了如何next
在 XML 中使用该属性:
<job id="job">
<step id="stepA" parent="s1" next="stepB" />
<step id="stepB" parent="s2" next="stepC"/>
<step id="stepC" parent="s3" />
</job>
以下示例显示了如何next()
在 Java 中使用该方法:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(stepA())
.next(stepB())
.next(stepC())
.build();
}
在上面的场景中,“步骤 A”首先运行,因为它是第一个Step
列出的。如果“步骤 A”正常完成,则“步骤 B”运行,依此类推。但是,如果“步骤 A”失败,则整个Job
失败并且“步骤 B”不执行。
使用 Spring Batch XML 命名空间,配置中列出的第一步
始终是 |
条件流
在上面的例子中,只有两种可能:
-
step
成功了,应该执行下一个step
。 -
失败的
step
,因此,job
应该失败。
在许多情况下,这可能就足够了。但是,如果 a 的失败step
应该触发不同step
的而不是导致失败,那该怎么办?下图显示了这样一个流程:
为了处理更复杂的场景,Spring Batch XML 命名空间允许在 step 元素中定义转换元素。一种这样的过渡是next
元素。与next
属性一样,next
元素告诉接下来要执行Job
哪个。Step
但是,与属性不同的next
是,给定的 允许有任意数量的元素Step
,并且在失败的情况下没有默认行为。这意味着,如果使用转换元素,则Step
必须明确定义转换的所有行为。另请注意,单个步骤不能同时具有next
属性和transition
元素。
该next
元素指定要匹配的模式和接下来要执行的步骤,如下例所示:
<job id="job">
<step id="stepA" parent="s1">
<next on="*" to="stepB" />
<next on="FAILED" to="stepC" />
</step>
<step id="stepB" parent="s2" next="stepC" />
<step id="stepC" parent="s3" />
</job>
Java API 提供了一组流畅的方法,可让您指定流程以及步骤失败时的操作。以下示例显示如何指定一个步骤 ( ),然后根据是否
成功stepA
继续执行两个不同步骤 (stepB
和) 中的任何一个:stepC
stepA
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(stepA())
.on("*").to(stepB())
.from(stepA()).on("FAILED").to(stepC())
.end()
.build();
}
使用 XML 配置时,on
过渡元素的属性使用简单的模式匹配方案来ExitStatus
匹配
Step
.
使用 java 配置时,该on()
方法使用简单的模式匹配方案来ExitStatus
匹配Step
.
模式中只允许使用两个特殊字符:
-
"*" 匹配零个或多个字符
-
“?” 恰好匹配一个字符
例如,“c*t”匹配“cat”和“count”,而“c?t”匹配“cat”但不匹配“count”。
虽然 a 上的过渡元素的数量没有限制,但Step
如果Step
执行导致aExitStatus
未被元素覆盖,则框架会抛出异常并且Job
失败。该框架会自动对从最具体到最不具体的转换进行排序。这意味着,即使在上面的示例中将排序替换为“stepA”,ExitStatus
“FAILED”的一个仍然会转到“stepC”。
批处理状态与退出状态
在为条件流配置 a 时,了解和Job
之间的区别很重要。是一个枚举,它是和的属性,框架使用它来记录 a或的状态。它可以是下列值之一:
, , , , , , , 或
. 它们中的大多数是不言自明的:是在步骤或作业成功完成时设置的状态,在失败时设置,等等。BatchStatus
ExitStatus
BatchStatus
JobExecution
StepExecution
Job
Step
COMPLETED
STARTING
STARTED
STOPPING
STOPPED
FAILED
ABANDONED
UNKNOWN
COMPLETED
FAILED
以下示例在使用 XML 配置时包含“下一个”元素:
<next on="FAILED" to="stepB" />
以下示例在使用 Java 配置时包含“on”元素:
...
.from(stepA()).on("FAILED").to(stepB())
...
乍一看,'on' 似乎引用了它所属BatchStatus
的 the 。Step
但是,它实际上引用ExitStatus
了Step
. 顾名思义,ExitStatus
代表 aStep
完成执行后的状态。
更具体地说,当使用 XML 配置时,前面 XML 配置示例中显示的“下一个”元素引用ExitStatus
.
使用 Java 配置时,上述 Java 配置示例中显示的“on()”方法引用ExitStatus
.
FAILED
在英语中,它说:“如果退出代码是” ,则转到步骤B。默认情况下,退出代码始终与 的 相同BatchStatus
,Step
这就是上面的条目有效的原因。但是,如果退出代码需要不同怎么办?一个很好的例子来自示例项目中的跳过示例作业:
以下示例显示了如何在 XML 中使用不同的退出代码:
<step id="step1" parent="s1">
<end on="FAILED" />
<next on="COMPLETED WITH SKIPS" to="errorPrint1" />
<next on="*" to="step2" />
</step>
以下示例显示了如何在 Java 中使用不同的退出代码:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()).on("FAILED").end()
.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
.from(step1()).on("*").to(step2())
.end()
.build();
}
step1
有三种可能:
-
失败的
Step
,在这种情况下,作业应该失败。 -
Step
顺利完成。 -
成功完成,
Step
但退出代码为“COMPLETED WITH SKIPS”。在这种情况下,应运行不同的步骤来处理错误。
上述配置有效。但是,需要根据跳过记录的执行条件更改退出代码,如下例所示:
public class SkipCheckingListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED WITH SKIPS");
}
else {
return null;
}
}
}
上面的代码StepExecutionListener
首先检查以确保Step
成功,然后检查跳过计数StepExecution
是否高于 0。如果两个条件都满足,则返回ExitStatus
退出代码为的新的
。COMPLETED WITH SKIPS
配置停止
在讨论了BatchStatus 和 ExitStatus 之后,人们可能想知道BatchStatus
和ExitStatus
是如何确定的Job
。虽然这些状态是Step
由执行的代码确定的,但它们的状态是Job
根据配置确定的。
到目前为止,讨论的所有作业配置都至少有一个Step
没有过渡的最终配置。
在以下 XML 示例中,step
执行后,Job
结束:
<step id="stepC" parent="s3"/>
在以下 Java 示例中,step
执行后,Job
结束:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.build();
}
如果没有为 a 定义转换Step
,则 的状态Job
定义如下:
-
如果
Step
以 FAILED 结尾,ExitStatus
则BatchStatus
和ExitStatus
都是。Job
FAILED
-
否则,的
BatchStatus
和ExitStatus
都是。Job
COMPLETED
虽然这种终止批处理作业的方法对于某些批处理作业(例如简单的顺序步骤作业)来说已经足够了,但可能需要自定义定义的作业停止方案。为此,Spring Batch 提供了三个过渡元素来停止 a Job
(除了我们之前讨论的next
元素)。这些停止元素中的每一个都Job
以特定的BatchStatus
. 需要注意的是,停止
过渡BatchStatus
元素ExitStatus
对Steps
. Job
这些元素仅影响Job
. 例如,作业中的每个步骤的状态可能为 ,FAILED
但作业的状态可能为COMPLETED
。
一步结束
配置 step end 指示 aJob
以 a BatchStatus
of停止COMPLETED
。Job
已完成状态的 A
COMPLETED
无法重新启动(框架抛出 a JobInstanceAlreadyCompleteException
)。
当使用 XML 配置时,'end' 元素用于此任务。该end
元素还允许一个可选的“退出代码”属性,可用于自
定义ExitStatus
. Job
如果没有给出 'exit-code' 属性,则ExitStatus
默认
COMPLETED
情况下是匹配BatchStatus
.
当使用 Java 配置时,'end' 方法用于此任务。该end
方法还允许使用可选的“exitStatus”参数来自
ExitStatus
定义Job
. 如果没有提供 'exitStatus' 值,则ExitStatus
默认
COMPLETED
情况下是匹配BatchStatus
.
考虑以下场景:如果step2
失败,则以 a of和 an ofJob
停止
并且不运行。否则,执行移至。请注意,如果失败,则不可重新启动(因为状态为)。BatchStatus
COMPLETED
ExitStatus
COMPLETED
step3
step3
step2
Job
COMPLETED
以下示例显示了 XML 中的场景:
<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<end on="FAILED"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
以下示例显示了 Java 中的场景:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.next(step2())
.on("FAILED").end()
.from(step2()).on("*").to(step3())
.end()
.build();
}
一步失败
将步骤配置为在给定点失败会指示 a以Job
a 停止
BatchStatus
。FAILED
与 end 不同, a 的失败Job
不会阻止 theJob
重新启动。
ExitStatus
当使用 XML 配置时,'fail' 元素还允许一个可选的 'exit-code' 属性,该属性可用于自定义Job
. 如果没有给出 'exit-code' 属性,则ExitStatus
默认FAILED
情况下是匹配
BatchStatus
.
考虑以下场景,如果step2
失败,则以 a of和 an ofJob
停止
并且不执行。否则,执行移至。此外,如果失败并
重新启动,则在 上重新开始执行。BatchStatus
FAILED
ExitStatus
EARLY TERMINATION
step3
step3
step2
Job
step2
以下示例显示了 XML 中的场景:
<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<fail on="FAILED" exit-code="EARLY TERMINATION"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
以下示例显示了 Java 中的场景:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.next(step2()).on("FAILED").fail()
.from(step2()).on("*").to(step3())
.end()
.build();
}
在给定步骤停止作业
将作业配置为在特定步骤停止会指示 a以Job
a 停止
BatchStatus
。STOPPED
停止 aJob
可以暂时中断处理,以便操作员可以在重新启动Job
.
使用 XML 配置时,'stop' 元素需要一个 'restart' 属性,该属性指定重新启动作业时应执行的步骤。
使用 Java 配置时,该stopAndRestart
方法需要一个“重新启动”属性,该属性指定重新启动作业时应执行的步骤。
考虑以下场景:如果step1
以 结束COMPLETE
,则作业将停止。重新启动后,开始执行step2
。
以下清单显示了 XML 中的场景:
<step id="step1" parent="s1">
<stop on="COMPLETED" restart="step2"/>
</step>
<step id="step2" parent="s2"/>
以下示例显示了 Java 中的场景:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()).on("COMPLETED").stopAndRestart(step2())
.end()
.build();
}
程序化流程决策
在某些情况下,ExitStatus
可能需要比确定下一步执行的步骤更多的信息。在这种情况下,JobExecutionDecider
可以使用 a 来辅助决策,如下例所示:
public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}
在以下示例作业配置中,adecision
指定要使用的决策者以及所有转换:
<job id="job">
<step id="step1" parent="s1" next="decision" />
<decision id="decision" decider="decider">
<next on="FAILED" to="step2" />
<next on="COMPLETED" to="step3" />
</decision>
<step id="step2" parent="s2" next="step3"/>
<step id="step3" parent="s3" />
</job>
<beans:bean id="decider" class="com.MyDecider"/>
在下面的示例中,当使用 Java 配置时,实现 bean 的 beanJobExecutionDecider
直接传递给调用。next
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.next(decider()).on("FAILED").to(step2())
.from(decider()).on("COMPLETED").to(step3())
.end()
.build();
}
拆分流
到目前为止描述的每个场景都涉及Job
以线性方式一次执行其步骤的 a。除了这种典型的风格,Spring Batch 还允许使用并行流配置作业。
XML 命名空间允许您使用“拆分”元素。如以下示例所示,“split”元素包含一个或多个“流”元素,其中可以定义整个单独的流。'split' 元素还可以包含任何先前讨论的转换元素,例如'next' 属性或'next'、'end' 或'fail' 元素。
<split id="split1" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
基于 Java 的配置允许您通过提供的构建器配置拆分。如以下示例所示,“split”元素包含一个或多个“流”元素,其中可以定义整个单独的流。'split' 元素还可以包含任何先前讨论的转换元素,例如'next' 属性或'next'、'end' 或'fail' 元素。
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public Job job(Flow flow1, Flow flow2) {
return this.jobBuilderFactory.get("job")
.start(flow1)
.split(new SimpleAsyncTaskExecutor())
.add(flow2)
.next(step4())
.end()
.build();
}
外部化流程定义和作业之间的依赖关系
作业中的部分流程可以外部化为单独的 bean 定义,然后重新使用。有两种方法可以做到这一点。第一种是简单地将流声明为对其他地方定义的流的引用。
以下示例显示如何将流声明为对 XML 中其他地方定义的流的引用:
<job id="job">
<flow id="job1.flow1" parent="flow1" next="step3"/>
<step id="step3" parent="s3"/>
</job>
<flow id="flow1">
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
以下示例显示了如何将流声明为对 Java 中其他地方定义的流的引用:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(flow1())
.next(step3())
.end()
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
如上例所示定义外部流程的效果是将外部流程中的步骤插入到作业中,就好像它们已被声明为内联一样。这样,许多作业可以引用相同的模板流,并将这些模板组合成不同的逻辑流。这也是分离各个流的集成测试的好方法。
外部化流程的另一种形式是使用JobStep
. AJobStep
类似于 a
FlowStep
但实际上为指定流程中的步骤创建并启动单独的作业执行。
以下示例是JobStep
XML 中的 a 示例:
<job id="jobStepJob" restartable="true">
<step id="jobStepJob.step1">
<job ref="job" job-launcher="jobLauncher"
job-parameters-extractor="jobParametersExtractor"/>
</step>
</job>
<job id="job" restartable="true">...</job>
<bean id="jobParametersExtractor" class="org.spr...DefaultJobParametersExtractor">
<property name="keys" value="input.file"/>
</bean>
以下示例显示了JobStep
Java 中的 a 示例:
@Bean
public Job jobStepJob() {
return this.jobBuilderFactory.get("jobStepJob")
.start(jobStepJobStep1(null))
.build();
}
@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher) {
return this.stepBuilderFactory.get("jobStepJobStep1")
.job(job())
.launcher(jobLauncher)
.parametersExtractor(jobParametersExtractor())
.build();
}
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.build();
}
@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
extractor.setKeys(new String[]{"input.file"});
return extractor;
}
作业参数提取器是一种策略,用于确定如何将ExecutionContext
for theStep
转换为JobParameters
for theJob
运行。JobStep
当您希望有一些更精细的选项来监视和报告作业和步骤时,这很有用。使用JobStep
通常也是对以下问题的一个很好的回答:“我如何创建作业之间的依赖关系?” 将大型系统分解为较小的模块并控制作业流程是一种好方法。
Job
和Step
属性的后期绑定
前面显示的 XML 和平面文件示例都使用 SpringResource
抽象来获取文件。这是有效的,因为Resource
有一个getFile
方法,它返回一个
java.io.File
. XML 和平面文件资源都可以使用标准 Spring 结构进行配置:
以下示例显示了 XML 中的后期绑定:
<bean id="flatFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource"
value="file://outputs/file.txt" />
</bean>
以下示例显示了 Java 中的后期绑定:
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource("file://outputs/file.txt"))
...
}
前面Resource
从指定的文件系统位置加载文件。请注意,绝对位置必须以双斜杠 ( //
) 开头。在大多数 Spring 应用程序中,这种解决方案已经足够好了,因为这些资源的名称在编译时是已知的。但是,在批处理场景中,可能需要在运行时将文件名确定为作业的参数。这可以使用“-D”参数读取系统属性来解决。
以下示例显示如何从 XML 中的属性读取文件名:
<bean id="flatFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="${input.file.name}" />
</bean>
下面展示了如何从 Java 中的属性中读取文件名:
@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
该解决方案需要的只是一个系统参数(例如
-Dinput.file.name="file://outputs/file.txt"
)。
虽然在PropertyPlaceholderConfigurer 这里可以使用 a ,但如果系统属性始终设置,则没有必要,因为ResourceEditor Spring 中的 已经过滤并在系统属性上进行了占位符替换。
|
通常,在批处理设置中,最好在
JobParameters
作业中参数化文件名,而不是通过系统属性,然后以这种方式访问它们。Job
为了实现这一点,
Spring Batch 允许后期绑定各种Step
属性。
以下示例显示如何在 XML 中参数化文件名:
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobParameters['input.file.name']}" />
</bean>
以下示例显示如何在 Java 中参数化文件名:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
JobExecution
和StepExecution
级别都ExecutionContext
可以以相同的方式访问。
以下示例显示了如何访问ExecutionContext
XML 中的内容:
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobExecutionContext['input.file.name']}" />
</bean>
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{stepExecutionContext['input.file.name']}" />
</bean>
以下示例显示了如何ExecutionContext
在 Java 中访问:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
任何使用后期绑定的 bean 都必须用 scope="step" 声明。有关详细信息,请参阅
步骤范围。应该注意的是, |
如果您使用的是 Spring 3.0(或更高版本),则步进范围 bean 中的表达式使用 Spring 表达式语言,这是一种功能强大的通用语言,具有许多有趣的特性。为了提供向后兼容性,如果 Spring Batch 检测到旧版本 Spring 的存在,它会使用一种功能较弱且解析规则略有不同的原生表达式语言。主要区别在于上面示例中的映射键在 Spring 2.5 中不需要引用,但在 Spring 3.0 中引号是强制性的。 |
步骤范围
前面显示的所有后期绑定示例都在 bean 定义中声明了“步骤”范围。
以下示例显示了在 XML 中绑定到步骤范围的示例:
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobParameters[input.file.name]}" />
</bean>
以下示例显示了在 Java 中绑定到步骤范围的示例:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
为了使用后期绑定,需要使用范围Step
,因为在开始之前实际上无法实例化 bean Step
,以允许找到属性。因为默认情况下它不是 Spring 容器的一部分,所以必须显式添加作用域,方法是使用batch
命名空间或显式包含 bean 定义StepScope
,或使用@EnableBatchProcessing
注解。仅使用其中一种方法。以下示例使用batch
命名空间:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="...">
<batch:job .../>
...
</beans>
以下示例显式包含 bean 定义:
<bean class="org.springframework.batch.core.scope.StepScope" />
工作范围
Job
Step
在 Spring Batch 3.0 中引入的范围与配置中的范围相似,但它是Job
上下文的范围,因此每个正在运行的作业只有一个此类 bean 的实例。此外,还支持后期绑定可从JobContext
using#{..}
占位符访问的引用。使用此功能,可以从作业或作业执行上下文和作业参数中提取 bean 属性。
以下示例显示了在 XML 中绑定到作业范围的示例:
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobParameters[input]}" />
</bean>
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>
以下示例显示了在 Java 中绑定到作业范围的示例:
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
因为默认情况下它不是 Spring 容器的一部分,所以必须显式添加范围,通过使用batch
命名空间,通过显式包含 JobScope 的 bean 定义,或使用@EnableBatchProcessing
注解(但不是全部)。以下示例使用batch
命名空间:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="...">
<batch:job .../>
...
</beans>
以下示例包含一个明确定义的 bean JobScope
:
<bean class="org.springframework.batch.core.scope.JobScope" />
在多线程或分区步骤中使用作业范围的 bean 存在一些实际限制。Spring Batch 不控制在这些用例中产生的线程,因此无法正确设置它们以使用此类 bean。因此,不建议在多线程或分区步骤中使用作业范围的 bean。 |