Batch 的领域语言

对于任何有经验的批处理架构师来说,Spring Batch 中使用的批处理的整体概念应该是熟悉和熟悉的。有“作业”和“步骤”以及开发人员提供的称为ItemReader和的处理单元ItemWriter。但是,由于 Spring 模式、操作、模板、回调和惯用语,有以下机会:

  • 在遵守明确的关注点分离方面有显着改善。

  • 清晰描述的架构层和作为接口提供的服务。

  • 简单和默认的实现,允许快速采用和开箱即用的易用性。

  • 可扩展性显着增强。

下图是已经使用了几十年的批处理参考架构的简化版本。它概述了构成批处理领域语言的组件。这个架构框架是一个蓝图,已经在过去几代平台(COBOL/Mainframe、C/Unix,以及现在的 Java/anywhere)上经过数十年的实施证明。JCL 和 COBOL 开发人员可能与 C、C# 和 Java 开发人员一样熟悉这些概念。Spring Batch 提供了在健壮的、可维护的系统中常见的层、组件和技术服务的物理实现,这些系统用于解决从简单到复杂的批处理应用程序的创建,以及用于解决非常复杂的处理需求的基础设施和扩展。

图 2.1:批次定型
图 1. 批量定型

上图突出了构成 Spring Batch 领域语言的关键概念。一个 Job 有一对多的步骤,每个步骤正好有一个ItemReader、一个ItemProcessor和一个ItemWriter。需要启动一个作业(使用 JobLauncher),并且需要存储有关当前运行进程的元数据(在 中 JobRepository)。

工作

本节描述与批处理作业概念相关的构造型。AJob是封装整个批处理过程的实体。与其他 Spring 项目一样,aJob与 XML 配置文件或基于 Java 的配置连接在一起。这种配置可以称为“作业配置”。但是, Job它只是整个层次结构的顶部,如下图所示:

工作层次结构
图 2. 作业层次结构

在 Spring Batch 中,aJob只是Step实例的容器。它将逻辑上属于流中的多个步骤组合在一起,并允许配置所有步骤的全局属性,例如可重新启动性。作业配置包含:

  • 作业的简单名称。

  • Step实例的定义和排序。

  • 作业是否可重新启动。

对于那些使用 Java 配置的人,Spring Batch 以SimpleJob类的形式提供了 Job 接口的默认实现,它在Job. 使用基于 java 的配置时,构建器集合可用于实例化 a Job,如以下示例所示:

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

对于使用 XML 配置的人,Spring Batch 以类Job的形式提供了接口的默认实现,它在. 但是,批处理命名空间抽象出直接实例化它的需要。相反,可以使用该元素,如下例所示:SimpleJobJob<job>

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

作业实例

AJobInstance指的是逻辑作业运行的概念。考虑一个应该在一天结束时运行一次的批处理作业,例如Job上图中的“EndOfDay”。有一个“EndOfDay”作业,但Job必须单独跟踪每个单独的运行。在这项工作的情况下,JobInstance每天有一个合乎逻辑的。例如,有 1 月 1 日运行、1 月 2 日运行等。如果 1 月 1 日运行第一次失败并在第二天再次运行,则仍然是 1 月 1 日运行。(通常,这也与它正在处理的数据相对应,这意味着 1 月 1 日的运行会处理 1 月 1 日的数据)。因此,每个都JobInstance可以有多个执行(JobExecution本章后面会详细讨论),并且只有一个JobInstance对应于特定的Job和识别JobParameters可以在给定的时间运行。

a 的定义与JobInstance要加载的数据完全无关。完全取决于ItemReader实现来确定如何加载数据。例如,在 EndOfDay 场景中,数据上可能有一列指示数据所属的“生效日期”或“计划日期”。因此,1 月 1 日的运行将仅加载第 1 次的数据,而 1 月 2 日的运行将仅使用第 2 次的数据。因为这个决定很可能是一个商业决定,所以它留给了 ItemReader决定。但是,使用相同的方法JobInstance确定是否使用ExecutionContext先前执行的“状态”(即,本章稍后讨论的 )。使用新的JobInstance表示“从头开始”,使用现有实例通常表示“从你离开的地方开始”。

作业参数

讨论过JobInstance它与约伯有何不同,自然要问的问题是:“一个人JobInstance与另一个人有何区别?” 答案是: JobParameters。一个JobParameters对象包含一组用于启动批处理作业的参数。它们可用于识别,甚至可在运行期间用作参考数据,如下图所示:

作业参数
图 3. 作业参数

在前面的示例中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,实际上只有一个Job,但它有两个JobParameter对象:一个以作业参数 01-01-2017 开始,另一个以 01-02-2017 的参数开始。因此,合同可以定义为:JobInstance= Job + 识别JobParameters。这允许开发人员有效地控制 a JobInstance的定义方式,因为他们控制传入的参数。

并非所有作业参数都需要有助于识别 JobInstance. 默认情况下,他们这样做。但是,该框架还允许提交Job带有不影响 a 身份的参数的 a JobInstance

作业执行

AJobExecution指的是单次尝试运行 Job 的技术概念。执行可能以失败或成功结束,但JobInstance除非执行成功完成,否则对应于给定执行的执行不被视为完成。以前面描述的 EndOfDayJob为例,考虑JobInstance01-01-2017 的第一次运行失败。如果使用与第一次运行 (01-01-2017) 相同的识别作业参数再次运行它,JobExecution则会创建一个新的。但是,仍然只有一个JobInstance

AJob定义了作业是什么以及如何执行,aJobInstance是一个纯粹的组织对象,用于将执行分组在一起,主要是为了启用正确的重启语义。然而, AJobExecution是运行期间实际发生的事情的主要存储机制,并且包含更多必须控制和持久化的属性,如下表所示:

表 1. JobExecution 属性

财产

定义

地位

BatchStatus指示执行状态的对象。运行时,它是 BatchStatus#STARTED. 如果失败,则为BatchStatus#FAILED. 如果它成功完成,它是BatchStatus#COMPLETED

开始时间

Ajava.util.Date表示开始执行时的当前系统时间。如果作业尚未开始,则此字段为空。

时间结束

Ajava.util.Date表示执行完成时的当前系统时间,无论是否成功。如果作业尚未完成,则该字段为空。

退出状态

,ExitStatus表示运行的结果。这是最重要的,因为它包含一个返回给调用者的退出代码。有关详细信息,请参阅第 5 章。如果作业尚未完成,则该字段为空。

创建时间

A表示第一次持久化java.util.Date时的当前系统时间。JobExecution作业可能还没有开始(因此没有开始时间),但它总是有一个 createTime,这是管理作业级别的框架所需要的 ExecutionContexts

最近更新时间

Ajava.util.Date表示最后一次JobExecution持久化 a。如果作业尚未开始,则此字段为空。

执行上下文

包含需要在执行之间持久化的任何用户数据的“属性包”。

失败异常

执行过程中遇到的异常列表Job。如果在Job.

这些属性很重要,因为它们是持久的,可用于完全确定执行的状态。例如,如果 01-01 的 EndOfDay 作业在晚上 9:00 执行并在 9:30 失败,则在批处理元数据表中创建以下条目:

表 2. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

识别

1

日期

时间表.日期

2017-01-01

真的

表 4. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

开始时间

时间结束

地位

1

1

2017-01-01 21:00

2017-01-01 21:30

失败的

为了清晰和格式化,列名可能已被缩写或删除。

既然作业失败了,假设需要一整晚才能确定问题,因此“批处理窗口”现在已关闭。进一步假设窗口从晚上 9:00 开始,作业在 01-01 再次开始,从中断的地方开始并在 9:30 成功完成。因为现在是第二天,01-02 作业也必须运行,然后在 9:31 开始,并在 10:30 以正常的一小时时间完成。没有要求一个JobInstance接一个地开始,除非这两个作业有可能尝试访问相同的数据,从而导致数据库级别的锁定问题。完全由调度程序决定何时Job运行 a。因为它们是分开的JobInstances, Spring Batch 不会尝试阻止它们同时运行。(尝试JobInstance在另一个已经在运行时运行相同的结果会导致 JobExecutionAlreadyRunningException被抛出)。现在应该在 和 表中都有一个额外的条目,JobInstance并且在JobParameters表中应该有两个额外的条目, JobExecution如下表所示:

表 5. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

2

EndOfDayJob

表 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

识别

1

日期

时间表.日期

2017-01-01 00:00:00

真的

2

日期

时间表.日期

2017-01-01 00:00:00

真的

3

日期

时间表.日期

2017-01-02 00:00:00

真的

表 7. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

开始时间

时间结束

地位

1

1

2017-01-01 21:00

2017-01-01 21:30

失败的

2

1

2017-01-02 21:00

2017-01-02 21:30

完全的

3

2

2017-01-02 21:31

2017-01-02 22:29

完全的

为了清晰和格式化,列名可能已被缩写或删除。

AStep是一个域对象,它封装了批处理作业的一个独立的、顺序的阶段。因此,每个 Job 完全由一个或多个步骤组成。AStep包含定义和控制实际批处理所需的所有信息。这必然是一个模糊的描述,因为任何给定的内容Step都由编写Job. AStep可以像开发人员希望的那样简单或复杂。一个简单的Step可能将数据从文件加载到数据库中,需要很少或不需要代码(取决于使用的实现)。更复杂的 Step可能具有作为处理的一部分应用的复杂业务规则。与 a 一样Job,aStep有一个个体StepExecution与 unique 相关联 JobExecution,如下图所示:

图 2.1:包含步骤的作业层次结构
图 4. 带有步骤的作业层次结构

分步执行

AStepExecution表示执行 a 的单次尝试StepStepExecution 每次运行 a 时都会创建一个新的Step,类似于JobExecution. 但是,如果一个步骤因为它之前的步骤失败而无法执行,则不会为它持久执行。A StepExecution仅在其Step实际启动时创建。

Step执行由StepExecution类的对象表示。每个执行都包含对其相应步骤和JobExecution事务相关数据的引用,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个ExecutionContext,其中包含开发人员需要在批处理运行中持久保存的任何数据,例如重新启动所需的统计信息或状态信息。下表列出了 的属性StepExecution

表 8. StepExecution 属性

财产

定义

地位

BatchStatus指示执行状态的对象。运行时,状态为BatchStatus.STARTED。如果失败,则状态为BatchStatus.FAILED。如果成功完成,状态为BatchStatus.COMPLETED

开始时间

Ajava.util.Date表示开始执行时的当前系统时间。如果步骤尚未开始,则此字段为空。

时间结束

Ajava.util.Date表示执行完成时的当前系统时间,无论是否成功。如果步骤尚未退出,则此字段为空。

退出状态

ExitStatus表示执行结果。这是最重要的,因为它包含一个返回给调用者的退出代码。有关详细信息,请参阅第 5 章。如果作业尚未退出,则此字段为空。

执行上下文

包含需要在执行之间持久化的任何用户数据的“属性包”。

读取计数

已成功读取的项目数。

写计数

已成功写入的项目数。

提交计数

已为此执行提交的事务数。

回滚计数

被控制的业务事务Step被回滚的次数。

读取跳过计数

失败的次数read,导致跳过的项目。

进程跳过计数

失败的次数process,导致跳过的项目。

过滤器计数

已被“过滤”的项目数ItemProcessor

writeSkipCount

失败的次数write,导致跳过的项目。

执行上下文

AnExecutionContext表示由框架持久化和控制的键/值对的集合,以便开发人员可以在一个地方存储范围为StepExecution对象或JobExecution对象的持久状态。对于熟悉 Quartz 的人来说,它与 JobDataMap 非常相似。最好的使用例子是方便重启。以平面文件输入为例,在处理单个行时,框架会定期保存ExecutionContextat commit 点。这样做可以ItemReader存储其状态,以防在运行期间发生致命错误或即使断电。所需要的只是将当前读取的行数放入上下文中,如下例所示,其余的由框架完成:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

JobStereotypes 部分中的 EndOfDay 示例为例,假设有一个步骤“loadData”将文件加载到数据库中。第一次运行失败后,元数据表将类似于以下示例:

表 9. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID

TYPE_CD

KEY_NAME

DATE_VAL

1

日期

时间表.日期

2017-01-01

表 11. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

开始时间

时间结束

地位

1

1

2017-01-01 21:00

2017-01-01 21:30

失败的

表 12. BATCH_STEP_EXECUTION

STEP_EXEC_ID

JOB_EXEC_ID

STEP_NAME

开始时间

时间结束

地位

1

1

加载数据

2017-01-01 21:00

2017-01-01 21:30

失败的

表 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID

SHORT_CONTEXT

1

{piece.count=40321}

在前面的案例中,Step运行了 30 分钟并处理了 40,321 个“片段”,在这种情况下,这将代表文件中的行。该值在框架每次提交之前更新,并且可以包含与 ExecutionContext. 在提交之前收到通知需要各种 StepListener实现(或ItemStream)之一,本指南稍后将对此进行更详细的讨论。与前面的示例一样,假设Job第二天重新启动。当它重新启动时,ExecutionContext上次运行的值会从数据库中重新构成。当ItemReader打开时,它可以检查它是否在上下文中存储了任何状态并从那里初始化自己,如下面的示例所示:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,在上面的代码运行之后,当前行是 40,322,允许Step 从它停止的地方重新开始。ExecutionContext也可用于需要保留有关运行本身的统计信息。例如,如果一个平面文件包含跨多行存在的处理订单,则可能需要存储已处理的订单数量(这与读取的行数有很大不同),以便可以发送电子邮件至Step以正文中处理的订单总数结尾。该框架为开发人员处理存储此内容,以便使用个人正确确定其范围JobInstance。很难知道现有的ExecutionContext应该使用或不使用。例如,使用上面的 'EndOfDay' 示例,当 01-01 运行第二次再次开始时,框架识别出它是相同的JobInstance,并且在单独的Step基础上,ExecutionContext从数据库中拉出并交给它(作为 StepExecution) 到Step自身的一部分。相反,对于 01-02 运行,框架会识别出它是一个不同的实例,因此必须将空上下文交给 Step. 框架为开发人员做出了许多此类决定,以确保在正确的时间向他们提供状态。同样重要的是要注意在任何给定时间都ExecutionContext存在一个。StepExecution的客户ExecutionContext应该小心,因为这会创建一个共享的密钥空间。因此,在输入值时应注意确保没有数据被覆盖。但是,Step上下文中绝对不存储任何数据,因此无法对框架产生不利影响。

同样重要的是要注意,至少有一个ExecutionContextper JobExecution和一个 for each StepExecution。例如,考虑以下代码片段:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如评论中所述,ecStep不等于ecJob. 他们是两个不同的 ExecutionContexts。范围为 的Step保存在 中的每个提交点 Step,而范围为 Job 的则保存在每次Step执行之间。

作业存储库

JobRepository是上述所有刻板印象的持久性机制。JobLauncher它为、JobStep实现提供 CRUD 操作。当 a Job首次启动时,aJobExecution从存储库中获取,并且在执行过程中,StepExecution通过JobExecution将实现传递到存储库来持久化它们。

Spring Batch XML 命名空间支持JobRepository使用标签配置实例,<job-repository>如下例所示:

<job-repository id="jobRepository"/>

使用 Java 配置时,@EnableBatchProcessing注解提供了一个 JobRepository作为开箱即用自动配置的组件之一。

作业启动器

JobLauncherJob表示使用给定集合 启动 a 的简单接口JobParameters,如以下示例所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

预计实现会从 中获取有效JobExecutionJobRepository并执行Job.

项目阅读器

ItemReader是一种抽象,表示一次检索 a 的输入Step。当ItemReader已经用尽它可以提供的项目时,它通过返回来表示这一点nullItemReader有关接口及其各种实现 的更多详细信息,请参阅Readers 和 Writers

物品作者

ItemWriter是一种抽象,表示一次输出一个Step、一个批次或一大块项目。通常,anItemWriter不知道它接下来应该接收的输入,并且只知道在其当前调用中传递的项目。ItemWriter有关接口及其各种实现 的更多详细信息,请参阅Readers 和 Writers

项目处理器

ItemProcessor是表示项目的业务处理的抽象。当ItemReader读取一个项目并ItemWriter写入它们时,它们 ItemProcessor提供了一个访问点来转换或应用其他业务处理。如果在处理该项目时确定该项目无效,则返回 null表示该项目不应被写出。更多关于 ItemProcessor接口的细节可以在 Readers And Writers中找到。

批处理命名空间

前面列出的许多领域概念都需要在 Spring 中进行配置 ApplicationContext。虽然上述接口的实现可以在标准 bean 定义中使用,但为了便于配置,已经提供了命名空间,如下例所示:

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

只要声明了批处理命名空间,就可以使用它的任何元素。有关配置作业的更多信息,请参阅配置和运行作业。有关配置 a 的更多信息,请Step参阅 配置步骤


1. see XML Configuration