配置和运行作业

域部分,使用下图作为指导,讨论了整体架构设计:

图 2.1:批次定型
图 1. 批量定型

虽然该Job对象可能看起来像是一个简单的步骤容器,但开发人员必须了解许多配置选项。Job此外,对于 a将如何运行以及在该运行期间如何存储其元数据,有许多考虑因素。本章将解释Job.

配置作业

接口有多种实现Job。但是,构建器抽象出配置上的差异。

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

A Job(通常是其中的任何Step一个)需要一个JobRepository. 的配置JobRepository是通过BatchConfigurer.

上面的示例说明了Job由三个Step实例组成的 a。与作业相关的构建器还可以包含其他有助于并行化 ( Split)、声明式流控制 ( Decision) 和流定义外部化( ) 的元素Flow

无论您使用 Java 还是 XML,接口都有多种实现Job 。然而,命名空间抽象出配置中的差异。它只有三个必需的依赖项:名称、JobRepositoryStep实例列表。

<job id="footballJob">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s2" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
</job>

此处的示例使用父 bean 定义来创建步骤。 有关内联声明特定步骤详细信息的更多选项,请参阅步骤配置部分。XML 命名空间默认引用 id 为“jobRepository”的存储库,这是一个合理的默认设置。但是,这可以被显式覆盖:

<job id="footballJob" job-repository="specialRepository">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s3" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
</job>

除了步骤之外,作业配置还可以包含其他有助于并行化 ( <split>)、声明式流控制 ( <decision>) 和流定义外部化( ) 的元素<flow/>

可重启性

执行批处理作业时的一个关键问题涉及Job重新启动时的行为。 如果 a已经存在于特定的 ,则启动 aJob被认为是“重新启动” 。理想情况下,所有作业都应该能够从中断的地方开始,但在某些情况下这是不可能的。确保在这种情况下创建新的完全取决于开发人员。但是,Spring Batch 确实提供了一些帮助。如果 a永远不应该重新启动,但应该始终作为 new 的一部分运行,那么 restartable 属性可以设置为“false”。JobExecutionJobInstanceJobInstanceJobJobInstance

以下示例显示如何将restartable字段设置为falseXML 格式:

XML 配置
<job id="footballJob" restartable="false">
    ...
</job>

以下示例显示如何在 Java中将restartable字段设置为:false

Java 配置
@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .preventRestart()
                     ...
                     .build();
}

换句话说,将 restartable 设置为 false 意味着“这 Job不支持重新启动”。重新启动Job不可重新启动的 a 会导致 aJobRestartException被抛出。

Job job = new SimpleJob();
job.setRestartable(false);

JobParameters jobParameters = new JobParameters();

JobExecution firstExecution = jobRepository.createJobExecution(job, jobParameters);
jobRepository.saveOrUpdate(firstExecution);

try {
    jobRepository.createJobExecution(job, jobParameters);
    fail();
}
catch (JobRestartException e) {
    // expected
}

这段 JUnit 代码展示了如何尝试 JobExecution为不可重新启动的作业创建第一次不会导致任何问题。但是,第二次尝试会抛出一个JobRestartException.

拦截作业执行

在 Job 的执行过程中,通知其生命周期中的各种事件可能很有用,以便可以执行自定义代码。SimpleJob允许通过在适当的时间调用 a 来实现这 一点 JobListener

public interface JobExecutionListener {

    void beforeJob(JobExecution jobExecution);

    void afterJob(JobExecution jobExecution);

}

JobListeners可以SimpleJob通过在作业上设置监听器来添加。

以下示例显示如何将侦听器元素添加到 XML 作业定义:

XML 配置
<job id="footballJob">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s2" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
    <listeners>
        <listener ref="sampleListener"/>
    </listeners>
</job>

以下示例显示如何将侦听器方法添加到 Java 作业定义:

Java 配置
@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .listener(sampleListener())
                     ...
                     .build();
}

需要注意的是,afterJob无论成功与否,都会调用该方法Job。如果需要判断成败,可以从 中获取JobExecution,如下:

public void afterJob(JobExecution jobExecution){
    if (jobExecution.getStatus() == BatchStatus.COMPLETED ) {
        //job success
    }
    else if (jobExecution.getStatus() == BatchStatus.FAILED) {
        //job failure
    }
}

该接口对应的注解为:

  • @BeforeJob

  • @AfterJob

从父作业继承

如果一组作业共享相似但不相同的配置,那么定义一个“父”可能会有所帮助, Job具体作业可以从中继承属性。与 Java 中的类继承类似,“子”Job将其元素和属性与父的元素和属性结合起来。

在以下示例中,“baseJob”是一个抽象 Job定义,仅定义了一个侦听器列表。“ Jobjob1”是一个具体定义,它从“baseJob”继承侦听器列表,并将其与自己的侦听器列表合并,以生成 Job具有两个侦听器和一个侦听 器的Step“step1”。

<job id="baseJob" abstract="true">
    <listeners>
        <listener ref="listenerOne"/>
    <listeners>
</job>

<job id="job1" parent="baseJob">
    <step id="step1" parent="standaloneStep"/>

    <listeners merge="true">
        <listener ref="listenerTwo"/>
    <listeners>
</job>

有关更多详细信息,请参阅从父步骤继承部分。

JobParametersValidator

在 XML 命名空间中声明或使用任何子类的 AbstractJob作业可以选择在运行时为作业参数声明验证器。例如,当您需要断言作业以其所有必需参数启动时,这很有用。有一个 DefaultJobParametersValidator可以用来约束简单的强制和可选参数的组合,对于更复杂的约束你可以自己实现接口。

通过作业的子元素通过 XML 命名空间支持验证器的配置,如下例所示:

<job id="job1" parent="baseJob3">
    <step id="step1" parent="standaloneStep"/>
    <validator ref="parametersValidator"/>
</job>

验证器可以指定为引用(如前所示)或 bean 命名空间中的嵌套 bean 定义。

通过 java builder 支持验证器的配置,如以下示例所示:

@Bean
public Job job1() {
    return this.jobBuilderFactory.get("job1")
                     .validator(parametersValidator())
                     ...
                     .build();
}

Java 配置

Spring 3 带来了通过 java 而不是 XML 配置应用程序的能力。从 Spring Batch 2.2.0 开始,可以使用相同的 java config 配置批处理作业。基于 java 的配置有两个组件:@EnableBatchProcessing 注解和两个构建器。

@EnableBatchProcessing工作方式类似于 Spring 家族中的其他 @Enable* 注释。在这种情况下,@EnableBatchProcessing为构建批处理作业提供基本配置。StepScope在这个基本配置中,除了一些可用于自动装配的 bean 之外,还会创建一个实例:

  • JobRepository:bean名称“jobRepository”

  • JobLauncher:bean名称“jobLauncher”

  • JobRegistry:bean名称“jobRegistry”

  • PlatformTransactionManager: bean 名称 "transactionManager"

  • JobBuilderFactory:bean名称“jobBuilders”

  • StepBuilderFactory:bean名称“stepBuilders”

此配置的核心接口是BatchConfigurer. 默认实现提供了上面提到的 bean,并且需要 aDataSource作为上下文中的 bean 来提供。JobRepository 使用此数据源。您可以通过创建接口的自定义实现来自定义这些 bean BatchConfigurer。通常,扩展DefaultBatchConfigurer(如果 BatchConfigurer未找到 a 则提供)并覆盖所需的 getter 就足够了。但是,可能需要从头开始实施您自己的。以下示例显示了如何提供自定义事务管理器:

@Bean
public BatchConfigurer batchConfigurer(DataSource dataSource) {
	return new DefaultBatchConfigurer(dataSource) {
		@Override
		public PlatformTransactionManager getTransactionManager() {
			return new MyTransactionManager();
		}
	};
}

只有一个配置类需要@EnableBatchProcessing注解。一旦你有一个用它注释的类,你将拥有上述所有内容。

有了基本配置,用户就可以使用提供的构建器工厂来配置作业。JobBuilderFactory以下示例显示了使用和配置的两步作业 StepBuilderFactory

@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class AppConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
        return jobs.get("myJob").start(step1).next(step2).build();
    }

    @Bean
    protected Step step1(ItemReader<Person> reader,
                         ItemProcessor<Person, Person> processor,
                         ItemWriter<Person> writer) {
        return steps.get("step1")
            .<Person, Person> chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
    }

    @Bean
    protected Step step2(Tasklet tasklet) {
        return steps.get("step2")
            .tasklet(tasklet)
            .build();
    }
}

配置 JobRepository

使用时@EnableBatchProcessingJobRepository为您提供开箱即用的一个。本节介绍如何配置您自己的。

如前所述,JobRepository用于 Spring Batch 中各种持久域对象的基本 CRUD 操作,例如 JobExecutionStepExecution。许多主要框架功能都需要它,例如JobLauncherJobStep.

批处理命名空间抽象了 JobRepository实现及其协作者的许多实现细节。但是,仍然有一些配置选项可用,如以下示例所示:

XML 配置
<job-repository id="jobRepository"
    data-source="dataSource"
    transaction-manager="transactionManager"
    isolation-level-for-create="SERIALIZABLE"
    table-prefix="BATCH_"
	max-varchar-length="1000"/>

上面列出的所有配置选项都不是必需的,除了id. 如果未设置,将使用上面显示的默认值。出于提高认识的目的,它们在上面显示。默认值为 2500,这是示例架构脚本max-varchar-length中长 VARCHAR列的长度。

使用 java 配置时,JobRepository为您提供了一个。如果提供了 a,则开箱即用地提供基于 JDBC 的,如果没有DataSource,则提供基于 JDBC 的Map。但是,您可以JobRepository通过接口的实现来自 定义配置BatchConfigurer

Java 配置
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
    factory.setTablePrefix("BATCH_");
    factory.setMaxVarCharLength(1000);
    return factory.getObject();
}
...

除了 dataSource 和 transactionManager 之外,上面列出的配置选项都不是必需的。如果未设置,将使用上面显示的默认值。出于提高认识的目的,它们在上面显示。最大 varchar 长度默认为 2500,这是示例架构脚本中长VARCHAR列 的长度

JobRepository 的事务配置

如果使用命名空间或提供的命名空间FactoryBean,则会自动围绕存储库创建事务建议。这是为了确保批处理元数据(包括故障后重新启动所需的状态)正确持久化。如果存储库方法不是事务性的,那么框架的行为就没有很好的定义。方法属性中的隔离级别create*是单独指定的,以确保在启动作业时,如果两个进程尝试同时启动同一个作业,则只有一个成功。该方法的默认隔离级别是SERIALIZABLE,这是非常激进的。READ_COMMITTED也可以。READ_UNCOMMITTED如果两个进程不太可能以这种方式发生冲突,那就没问题了。然而,由于调用create*方法很短,不太可能 SERIALIZED出问题,只要数据库平台支持。但是,这可以被覆盖。

以下示例显示了如何覆盖 XML 中的隔离级别:

XML 配置
<job-repository id="jobRepository"
                isolation-level-for-create="REPEATABLE_READ" />

以下示例显示了如何覆盖 Java 中的隔离级别:

Java 配置
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
    return factory.getObject();
}

如果不使用命名空间或工厂 bean,那么使用 AOP 配置存储库的事务行为也很重要。

以下示例显示了如何在 XML 中配置存储库的事务行为:

XML 配置
<aop:config>
    <aop:advisor
           pointcut="execution(* org.springframework.batch.core..*Repository+.*(..))"/>
    <advice-ref="txAdvice" />
</aop:config>

<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="*" />
    </tx:attributes>
</tx:advice>

前面的片段几乎可以按原样使用,几乎没有变化。还要记住包括适当的命名空间声明,并确保 spring-tx 和 spring-aop(或整个 Spring)在类路径上。

以下示例显示了如何在 Java 中配置存储库的事务行为:

Java 配置
@Bean
public TransactionProxyFactoryBean baseProxy() {
	TransactionProxyFactoryBean transactionProxyFactoryBean = new TransactionProxyFactoryBean();
	Properties transactionAttributes = new Properties();
	transactionAttributes.setProperty("*", "PROPAGATION_REQUIRED");
	transactionProxyFactoryBean.setTransactionAttributes(transactionAttributes);
	transactionProxyFactoryBean.setTarget(jobRepository());
	transactionProxyFactoryBean.setTransactionManager(transactionManager());
	return transactionProxyFactoryBean;
}

更改表前缀

另一个可修改的属性JobRepository是元数据表的表前缀。默认情况下,它们都以BATCH_. BATCH_JOB_EXECUTION并且 BATCH_STEP_EXECUTION是两个例子。但是,有可能修改此前缀的原因。如果模式名称需要添加到表名称之前,或者如果同一模式中需要多组元数据表,则需要更改表前缀:

以下示例显示如何更改 XML 中的表前缀:

XML 配置
<job-repository id="jobRepository"
                table-prefix="SYSTEM.TEST_" />

以下示例显示了如何在 Java 中更改表前缀:

Java 配置
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setTablePrefix("SYSTEM.TEST_");
    return factory.getObject();
}

鉴于前面的更改,对元数据表的每个查询都以 SYSTEM.TEST_. BATCH_JOB_EXECUTION被称为系统。TEST_JOB_EXECUTION.

只有表前缀是可配置的。表名和列名不是。

内存存储库

在某些情况下,您可能不希望将域对象持久化到数据库中。一个原因可能是速度。在每个提交点存储域对象需要额外的时间。另一个原因可能是您不需要为特定工作保留状态。出于这个原因,Spring 批处理提供了Map作业存储库的内存版本。

以下示例显示了MapJobRepositoryFactoryBeanXML 中的包含:

XML 配置
<bean id="jobRepository"
  class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager"/>
</bean>

以下示例显示了MapJobRepositoryFactoryBeanJava 中的包含:

Java 配置
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}

请注意,内存存储库是易失性的,因此不允许在 JVM 实例之间重新启动。它也不能保证两个具有相同参数的作业实例同时启动,并且不适合在多线程作业或本地分区的作业中使用Step。因此,只要您需要这些功能,就可以使用存储库的数据库版本。

然而,它确实需要定义事务管理器,因为存储库中有回滚语义,并且因为业务逻辑可能仍然是事务性的(例如 RDBMS 访问)。出于测试目的,许多人发现它 ResourcelessTransactionManager很有用。

和相关的MapJobRepositoryFactoryBean类在 v4 中已被弃用,并计划在 v5 中删除。如果要使用内存中的作业存储库,可以使用嵌入式数据库,如 H2、Apache Derby 或 HSQLDB。有几种方法可以创建嵌入式数据库并在 Spring Batch 应用程序中使用它。一种方法是使用Spring JDBC中的 API :

@Bean
public DataSource dataSource() {
    return new EmbeddedDatabaseBuilder()
            .setType(EmbeddedDatabaseType.H2)
            .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
            .addScript("/org/springframework/batch/core/schema-h2.sql")
            .build();
}

在应用程序上下文中将嵌入式数据源定义为 bean 后,如果您使用@EnableBatchProcessing. 否则,您可以使用基于 JDBC 的手动配置它JobRepositoryFactoryBean,如配置 JobRepository 部分所示。

存储库中的非标准数据库类型

如果您使用的数据库平台不在受支持的平台列表中,如果 SQL 变体足够接近,您可以使用其中一种受支持的类型。为此,您可以使用 rawJobRepositoryFactoryBean而不是命名空间快捷方式,并使用它将数据库类型设置为最接近的匹配。

以下示例显示如何使用JobRepositoryFactoryBeanXML 中将数据库类型设置为最接近的匹配:

XML 配置
<bean id="jobRepository" class="org...JobRepositoryFactoryBean">
    <property name="databaseType" value="db2"/>
    <property name="dataSource" ref="dataSource"/>
</bean>

以下示例显示了如何使用JobRepositoryFactoryBeanJava 将数据库类型设置为最接近的匹配项:

Java 配置
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setDatabaseType("db2");
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}

( 如果未指定,则 JobRepositoryFactoryBean尝试从来自 Spring 框架的标准实现)。DataSourceincrementerFactory

如果即使这样也不起作用,或者您没有使用 RDBMS,那么唯一的选择可能是实现依赖的各种Dao 接口并以正常的 Spring 方式手动连接一个。SimpleJobRepository

配置 JobLauncher

使用时@EnableBatchProcessingJobRegistry为您提供开箱即用的一个。本节介绍如何配置您自己的。

JobLauncher接口的最基本实现是SimpleJobLauncher. 它唯一需要的依赖项是 a JobRepository,以便获得执行。

以下示例显示了SimpleJobLauncherXML 格式的 a:

XML 配置
<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

以下示例显示了SimpleJobLauncherJava 中的 a:

Java 配置
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobLauncher createJobLauncher() throws Exception {
	SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
	jobLauncher.setJobRepository(jobRepository);
	jobLauncher.afterPropertiesSet();
	return jobLauncher;
}
...

获取到JobExecution后,将其传递给 的 execute 方法Job,最终返回JobExecution给调用者,如下图所示:

作业启动器序列
图 2. Job Launcher 序列

该序列很简单,从调度程序启动时效果很好。但是,尝试从 HTTP 请求启动时会出现问题。在这种情况下,启动需要异步完成,以便SimpleJobLauncher立即返回给它的调用者。这是因为在长时间运行的进程(例如批处理)所需的时间内保持 HTTP 请求打开并不是一个好习惯。下图显示了一个示例序列:

异步作业启动器序列
图 3. 异步作业启动器序列

SimpleJobLauncher可以通过 配置TaskExecutor.

以下 XML 示例显示了一个SimpleJobLauncher配置为立即返回:

XML 配置
<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="taskExecutor">
        <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    </property>
</bean>

以下 Java 示例显示了一个SimpleJobLauncher配置为立即返回:

Java 配置
@Bean
public JobLauncher jobLauncher() {
	SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
	jobLauncher.setJobRepository(jobRepository());
	jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
	jobLauncher.afterPropertiesSet();
	return jobLauncher;
}

springTaskExecutor 接口的任何实现都可以用来控制作业的异步执行方式。

运行作业

启动批处理作业至少需要两件事: Job要启动的和 JobLauncher. 两者都可以包含在相同的上下文或不同的上下文中。例如,如果从命令行启动作业,将为每个作业实例化一个新的 JVM,因此每个作业都有自己的JobLauncher. 但是,如果从 范围内的 Web 容器中运行, HttpRequest通常会有一个 JobLauncher配置为异步作业启动,多个请求将调用它来启动它们的作业。

从命令行运行作业

对于想要从企业调度程序运行其作业的用户,命令行是主要界面。这是因为大多数调度程序(Quartz 除外,除非使用 NativeJob)直接与操作系统进程一起工作,主要由 shell 脚本启动。除了 shell 脚本之外,还有很多方法可以启动 Java 进程,例如 Perl、Ruby,甚至是 ant 或 maven 等“构建工具”。但是,由于大多数人都熟悉 shell 脚本,因此本示例将重点介绍它们。

CommandLineJobRunner

因为启动作业的脚本必须启动 Java 虚拟机,所以需要有一个带有 main 方法的类作为主要入口点。Spring Batch 提供了一个实现这个目的的实现: CommandLineJobRunner. 需要注意的是,这只是引导应用程序的一种方式,但启动 Java 进程的方式有很多种,并且绝不应将此类视为确定性的。执行CommandLineJobRunner 四项任务:

  • 加载适当的 ApplicationContext

  • 将命令行参数解析为 JobParameters

  • 根据参数找到合适的工作

  • 使用JobLauncher应用程序上下文中提供的启动作业。

所有这些任务都只使用传入的参数来完成。以下是必需的参数:

表 1. CommandLineJobRunner 参数

工作路径

将用于创建ApplicationContext. 该文件应包含运行完整作业所需的所有内容

职位名称

要运行的作业的名称。

这些参数必须首先传入路径,然后传入名称。这些之后的所有参数都被认为是作业参数,被转换为一个 JobParameters 对象,并且必须采用'name=value'的格式。

以下示例显示了作为作业参数传递给 XML 中定义的作业的日期:

<bash$ java CommandLineJobRunner endOfDayJob.xml endOfDay schedule.date(date)=2007/05/05

以下示例显示了作为作业参数传递给 Java 中定义的作业的日期:

<bash$ java CommandLineJobRunner io.spring.EndOfDayJobConfiguration endOfDay schedule.date(date)=2007/05/05

默认情况下,CommandLineJobRunner使用 aDefaultJobParametersConverter隐式转换键/值对以识别作业参数。+但是,可以通过分别在它们前面加上或来明确指定哪些作业参数正在识别,哪些不是-

在以下示例中,schedule.date是识别作业参数,而vendor.id不是:

<bash$ java CommandLineJobRunner endOfDayJob.xml endOfDay \
                                 +schedule.date(date)=2007/05/05 -vendor.id=123
<bash$ java CommandLineJobRunner io.spring.EndOfDayJobConfiguration endOfDay \
                                 +schedule.date(date)=2007/05/05 -vendor.id=123

可以使用自定义覆盖此行为JobParametersConverter

在大多数情况下,您会希望使用清单在 jar 中声明您的主类,但为简单起见,直接使用该类。此示例使用domainLanguageOfBatch中的相同“EndOfDay”示例。第一个参数是“endOfDayJob.xml”,它是包含 Job. 第二个参数“endOfDay”表示作业名称。最后一个参数“schedule.date(date)=2007/05/05”被转换为 JobParameters 对象。

以下示例显示了endOfDayXML 中的示例配置:

<job id="endOfDay">
    <step id="step1" parent="simpleStep" />
</job>

<!-- Launcher details removed for clarity -->
<beans:bean id="jobLauncher"
         class="org.springframework.batch.core.launch.support.SimpleJobLauncher" />

在大多数情况下,您会希望使用清单在 jar 中声明您的主类,但为简单起见,直接使用该类。此示例使用domainLanguageOfBatch中的相同“EndOfDay”示例。第一个参数是“io.spring.EndOfDayJobConfiguration”,它是包含作业的配置类的完全限定类名。第二个参数“endOfDay”表示作业名称。最后一个参数 'schedule.date(date)=2007/05/05' 被转换为一个 JobParameters对象。java配置的一个例子如下:

以下示例显示了endOfDayJava 中的示例配置:

@Configuration
@EnableBatchProcessing
public class EndOfDayJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job endOfDay() {
        return this.jobBuilderFactory.get("endOfDay")
    				.start(step1())
    				.build();
    }

    @Bean
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
    				.tasklet((contribution, chunkContext) -> null)
    				.build();
    }
}

前面的例子过于简单化了,因为在 Spring Batch 中运行批处理作业通常有更多的要求,但它可以显示CommandLineJobRunner:JobJobLauncher.

退出代码

从命令行启动批处理作业时,通常使用企业调度程序。大多数调度程序都相当愚蠢,只在进程级别工作。这意味着他们只知道一些操作系统进程,例如他们正在调用的 shell 脚本。在这种情况下,向调度程序反馈作业成功或失败的唯一方法是通过返回码。返回码是进程返回给调度程序的一个数字,表示运行的结果。在最简单的情况下:0 是成功,1 是失败。但是,可能还有更复杂的场景:如果job A返回4 kick off job B,如果返回5 kick off job C。这种行为是在调度器级别配置的,但重要的是,诸如 Spring Batch 之类的处理框架提供了一种为特定批处理作业返回“退出代码”的数字表示的方法。在 Spring Batch 中,这被封装在一个ExitStatus,这在第 5 章中有更详细的介绍。为了讨论退出代码,唯一需要知道的是 an ExitStatus具有由框架(或开发人员)设置的退出代码属性,并作为JobExecutionJobLauncher. _ 使用 接口CommandLineJobRunner将此字符串值转换为数字ExitCodeMapper

public interface ExitCodeMapper {

    public int intValue(String exitCode);

}

an 的基本约定 ExitCodeMapper是,给定一个字符串退出代码,将返回一个数字表示。作业运行程序使用的默认实现是SimpleJvmExitCodeMapper 返回 0 表示完成,1 表示一般错误,2 表示任何作业运行程序错误,例如无法 Job在提供的上下文中找到 a。如果需要比上述 3 个值更复杂的值,则ExitCodeMapper必须提供接口的自定义实现。因为 CommandLineJobRunner是创建 的类,ApplicationContext因此不能“连接在一起”,任何需要覆盖的值都必须自动连接。这意味着如果 ExitCodeMapperBeanFactory,它将在创建上下文后注入到运行器中。提供您自己需要做的 ExitCodeMapper就是将实现声明为根级别 bean,并确保它是运行 ApplicationContext程序加载的 bean 的一部分。

从 Web 容器中运行作业

从历史上看,诸如批处理作业之类的离线处理是从命令行启动的,如上所述。但是,在许多情况下,从 an 启动HttpRequest是更好的选择。许多此类用例包括报告、临时作业运行和 Web 应用程序支持。因为根据定义,批处理作业是长时间运行的,所以最重要的问题是确保异步启动作业:

来自 Web 容器的异步作业启动器序列
图 4. 来自 Web 容器的异步作业启动器序列

本例中的控制器是 Spring MVC 控制器。关于 Spring MVC 的更多信息可以在这里找到:https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#mvc。控制器Job使用 JobLauncher已配置为 异步启动的 a 启动 a ,它立即返回 a JobExecution。可能仍在运行,但是,这种 Job非阻塞行为允许控制器立即返回,这在处理HttpRequest. 下面是一个例子:

@Controller
public class JobLauncherController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @RequestMapping("/jobLauncher.html")
    public void handle() throws Exception{
        jobLauncher.run(job, new JobParameters());
    }
}

高级元数据使用

到目前为止,已经讨论了JobLauncher和接口。JobRepository它们共同代表了作业的简单启动和批处理域对象的基本 CRUD 操作:

作业存储库
图 5. 作业存储库

AJobLauncher使用 JobRepository来创建新 JobExecution对象并运行它们。 Job和以后的实现在 Job 运行期间Step使用相同的基本更新相同的执行。JobRepository基本的操作对于简单的场景来说已经足够了,但是在具有数百个批处理作业和复杂调度要求的大批处理环境中,需要对元数据进行更高级的访问:

作业存储库高级
图 6. 高级作业存储库访问

将在下面讨论的JobExplorerJobOperator接口添加了用于查询和控制元数据的附加功能。

查询仓库

在任何高级功能之前,最基本的需求是能够查询存储库以获取现有执行。此功能由JobExplorer接口提供:

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

从上面的方法签名中可以明显看出,JobExplorer它是 的只读版本JobRepository,并且与 一样JobRepository,它可以通过使用工厂 bean 轻松配置:

以下示例显示如何JobExplorer在 XML 中配置 a:

XML 配置
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource" />

以下示例显示了如何JobExplorer在 Java 中配置 a:

Java 配置
...
// This would reside in your BatchConfigurer implementation
@Override
public JobExplorer getJobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	return factoryBean.getObject();
}
...

在本章前面,我们注意到JobRepository可以修改 的表前缀以允许不同的版本或模式。因为JobExplorer使用相同的表,它也需要设置前缀的能力。

以下示例显示了如何JobExplorer在 XML 中为 a 设置表前缀:

XML 配置
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
		p:tablePrefix="SYSTEM."/>

以下示例显示了如何JobExplorer在 Java 中为 a 设置表前缀:

Java 配置
...
// This would reside in your BatchConfigurer implementation
@Override
public JobExplorer getJobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	factoryBean.setTablePrefix("SYSTEM.");
	return factoryBean.getObject();
}
...

工作登记处

A JobRegistry(及其父接口JobLocator)不是强制性的,但如果您想跟踪上下文中可用的作业,它会很有用。当作业在其他地方(例如,在子上下文中)创建时,它对于在应用程序上下文中集中收集作业也很有用。自定义JobRegistry实现也可用于操作已注册作业的名称和其他属性。该框架只提供了一种实现,它基于从作业名称到作业实例的简单映射。

以下示例显示如何为JobRegistryXML 中定义的作业包含 a:

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

以下示例显示了如何为JobRegistryJava 中定义的作业包含 a:

使用时@EnableBatchProcessingJobRegistry为您提供开箱即用的一个。如果你想自己配置:

...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the getter in the SimpleBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
	return new MapJobRegistry();
}
...

有两种方法可以JobRegistry自动填充:使用 bean 后处理器和使用注册器生命周期组件。这两种机制将在以下部分中进行描述。

JobRegistryBeanPostProcessor

这是一个 bean 后处理器,可以在创建所有作业时对其进行注册。

以下示例显示了如何为JobRegistryBeanPostProcessorXML 中定义的作业包含:

XML 配置
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
    <property name="jobRegistry" ref="jobRegistry"/>
</bean>

以下示例显示了如何为JobRegistryBeanPostProcessorJava 中定义的作业包含:

Java 配置
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry());
    return postProcessor;
}

虽然不是绝对必要的,但示例中的后处理器已被赋予一个 id,以便它可以包含在子上下文中(例如,作为父 bean 定义)并导致在那里创建的所有作业也自动注册。

AutomaticJobRegistrar

这是一个生命周期组件,它创建子上下文并在创建时从这些上下文中注册作业。这样做的一个好处是,虽然子上下文中的作业名称在注册表中仍然必须是全局唯一的,但它们的依赖关系可以具有“自然”名称。例如,您可以创建一组 XML 配置文件,每个文件只有一个 Job,但都有不同的定义,ItemReader具有相同的 bean 名称,例如“reader”。如果所有这些文件都被导入到同一个上下文中,读取器定义会发生冲突并相互覆盖,但是使用自动注册器可以避免这种情况。这使得集成从应用程序的单独模块贡献的作业变得更加容易。

以下示例显示了如何为AutomaticJobRegistrarXML 中定义的作业包含:

XML 配置
<bean class="org.spr...AutomaticJobRegistrar">
   <property name="applicationContextFactories">
      <bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
         <property name="resources" value="classpath*:/config/job*.xml" />
      </bean>
   </property>
   <property name="jobLoader">
      <bean class="org.spr...DefaultJobLoader">
         <property name="jobRegistry" ref="jobRegistry" />
      </bean>
   </property>
</bean>

以下示例显示了如何为AutomaticJobRegistrarJava 中定义的作业包含:

Java 配置
@Bean
public AutomaticJobRegistrar registrar() {

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setJobLoader(jobLoader());
    registrar.setApplicationContextFactories(applicationContextFactories());
    registrar.afterPropertiesSet();
    return registrar;

}

注册器有两个强制属性,一个是一个数组 ApplicationContextFactory(这里是从一个方便的工厂 bean 创建的),另一个是一个 JobLoader. JobLoader 负责管理子上下文的生命周期并在JobRegistry.

ApplicationContextFactory负责创建子上下文,最常见的用法是 使用ClassPathXmlApplicationContextFactory. 这个工厂的特点之一是默认情况下它会将一些配置从父上下文复制到子上下文。PropertyPlaceholderConfigurer因此,例如,如果它应该与父级相同,则不必在子级中重新定义 或 AOP 配置。

如果需要,AutomaticJobRegistrar可以与 a 一起使用JobRegistryBeanPostProcessor (只要DefaultJobLoader也使用了)。例如,如果在主父上下文以及子位置中定义了作业,这可能是可取的。

作业操作员

如前所述,JobRepository 提供对元数据的 CRUD 操作,以及 JobExplorer提供对元数据的只读操作。但是,这些操作在一起用于执行常见的监视任务时最有用,例如停止、重新启动或汇总作业,这通常由批处理操作员完成。Spring Batch 通过接口提供这些类型的操作 JobOperator

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

上述操作表示来自许多不同接口的方法,例如 JobLauncherJobRepositoryJobExplorerJobRegistry。出于这个原因,提供的JobOperator, SimpleJobOperator, 的实现有很多依赖。

以下示例显示了SimpleJobOperatorXML 中的典型 bean 定义:

<bean id="jobOperator" class="org.spr...SimpleJobOperator">
    <property name="jobExplorer">
        <bean class="org.spr...JobExplorerFactoryBean">
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
    <property name="jobRepository" ref="jobRepository" />
    <property name="jobRegistry" ref="jobRegistry" />
    <property name="jobLauncher" ref="jobLauncher" />
</bean>

以下示例显示了SimpleJobOperatorJava 中的典型 bean 定义:

 /**
  * All injected dependencies for this bean are provided by the @EnableBatchProcessing
  * infrastructure out of the box.
  */
 @Bean
 public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
                                JobRepository jobRepository,
                                JobRegistry jobRegistry) {

	SimpleJobOperator jobOperator = new SimpleJobOperator();

	jobOperator.setJobExplorer(jobExplorer);
	jobOperator.setJobRepository(jobRepository);
	jobOperator.setJobRegistry(jobRegistry);
	jobOperator.setJobLauncher(jobLauncher);

	return jobOperator;
 }

如果您在作业存储库上设置表前缀,请不要忘记在作业资源管理器上也设置它。

JobParametersIncrementer

上的大部分方法JobOperator都是不言自明的,更详细的解释可以 在接口的javadoc上找到。但是,该 startNextInstance方法值得注意。此方法将始终启动 Job 的新实例。如果 a 中存在严重问题 JobExecution并且需要从头开始重新开始作业,这将非常有用。与 它不同JobLauncher的是,它需要一个新 对象,如果参数与任何先前的参数集不同,则该 JobParameters对象将触发一个 新对象,该方法将使用 绑定到 来强制到一个新实例:JobInstancestartNextInstanceJobParametersIncrementerJobJob

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

的约定JobParametersIncrementer是,给定一个JobParameters 对象,它将通过增加它可能包含的任何必要值来返回“下一个”JobParameters 对象。此策略很有用,因为框架无法知道哪些更改会JobParameters使其成为“下一个”实例。例如,如果唯一的值 JobParameters是日期,并且应该创建下一个实例,那么该值是否应该增加一天?还是一周(例如,如果工作是每周一次)?对于任何有助于识别作业的数值都可以这样说,如下所示:

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

在此示例中,键为“run.id”的值用于区分JobInstances. 如果 JobParameters传入的是 null,则可以假设Job之前从未运行过,因此可以返回其初始状态。但是,如果不是,则获取旧值,加一并返回。

对于在 XML 中定义的作业,可以通过命名空间中的 'incrementer' 属性关联一个增量器Job,如下所示:

<job id="footballJob" incrementer="sampleIncrementer">
    ...
</job>

对于 Java 中定义的作业,可以通过 incrementer构建器中提供的方法将增量器与“作业”相关联,如下所示:

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
    				 .incrementer(sampleIncrementer())
    				 ...
                     .build();
}

停止作业

最常见的用例之一 JobOperator是优雅地停止作业:

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

关闭不是立即的,因为没有办法强制立即关闭,特别是如果执行当前在框架无法控制的开发人员代码中,例如业务服务。但是,一旦控制权返回给框架,它会将当前的状态设置 StepExecutionBatchStatus.STOPPED,保存它,然后对JobExecution之前的完成执行相同的操作。

中止作业

A job execution which is FAILED can be restarted (if the Job is restartable). A job execution whose status is ABANDONED will not be restarted by the framework. The ABANDONED status is also used in step executions to mark them as skippable in a restarted job execution: if a job is executing and encounters a step that has been marked ABANDONED in the previous failed job execution, it will move on to the next step (as determined by the job flow definition and the step execution exit status).

如果进程死亡(kill -9或服务器故障),则该作业当然没有运行,但JobRepository无法知道,因为在进程死亡之前没有人告诉它。您必须手动告诉它您知道执行失败或应该被视为中止(将其状态更改为 FAILEDor ABANDONED)。这是一个商业决策,没有办法让它自动化。将状态更改为FAILED仅当它可重新启动并且您知道重新启动数据有效时。


1. see XML Configuration