ItemReaders 和 ItemWriters

所有批处理都可以用最简单的形式描述为读取大量数据,执行某种类型的计算或转换,然后写出结果。Spring Batch 提供了三个关键接口来帮助执行批量读写: ItemReaderItemProcessorItemWriter.

ItemReader

虽然是一个简单的概念,但它ItemReader是从许多不同类型的输入中提供数据的方法。最普遍的例子包括:

  • 平面文件:平面文件项目阅读器从平面文件中读取数据行,该文件通常描述具有由文件中的固定位置定义或由某些特殊字符(例如逗号)分隔的数据字段的记录。

  • XML:ItemReaders独立于用于解析、映射和验证对象的技术的 XML 处理 XML。输入数据允许针对 XSD 模式验证 XML 文件。

  • 数据库:访问数据库资源以返回可以映射到对象进行处理的结果集。默认的 SQLItemReader实现调用 aRowMapper 来返回对象,如果需要重新启动,则跟踪当前行,存储基本统计信息,并提供一些稍后解释的事务增强。

还有更多的可能性,但我们只关注本章的基本可能性。所有可用实现的完整列表ItemReader可在 附录 A中找到。

ItemReader是通用输入操作的基本接口,如下接口定义所示:

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

read方法定义了ItemReader. 调用它会返回一个项目,或者null如果没有更多项目。一个项目可能代表文件中的一行、数据库中的一行或 XML 文件中的一个元素。通常期望这些映射到可用的域对象(例如TradeFoo或其他),但合同中没有要求这样做。

预计ItemReader接口的实现只是向前的。但是,如果底层资源是事务性的(例如 JMS 队列),那么调用 read可能会在回滚场景中的后续调用中返回相同的逻辑项。还值得注意的是,缺少要由 an 处理的项目ItemReader不会导致引发异常。例如,ItemReader配置有返回 0 结果的查询的数据库null在第一次调用read.

ItemWriter

ItemWriter在功能上与 an 相似,ItemReader但具有逆运算。资源仍然需要定位、打开和关闭,但它们的不同之处在于 ItemWriter写出而不是读入。在数据库或队列的情况下,这些操作可能是插入、更新或发送。输出的序列化格式特定于每个批处理作业。

与 一样ItemReaderItemWriter是一个相当通用的接口,如下面的接口定义所示:

public interface ItemWriter<T> {

    void write(List<? extends T> items) throws Exception;

}

readon一样ItemReaderwrite提供了基本的合约ItemWriter。只要它处于打开状态,它就会尝试写出传入的项目列表。因为通常期望项目被“批处理”成一个块然后输出,所以接口接受一个项目列表,而不是一个项目本身。写出列表后,可以在从 write 方法返回之前执行任何可能需要的刷新。例如,如果写入 Hibernate DAO,可以进行多次 write 调用,每个项目调用一次。然后作者可以flush在返回之前调用休眠会话。

ItemStream

两者都很好ItemReadersItemWriters服务于它们各自的目的,但是它们之间有一个共同的问题,即需要另一个接口。通常,作为批处理作业范围的一部分,需要打开、关闭读取器和写入器,并且需要一种持久化状态的机制。该ItemStream接口用于此目的,如以下示例所示:

public interface ItemStream {

    void open(ExecutionContext executionContext) throws ItemStreamException;

    void update(ExecutionContext executionContext) throws ItemStreamException;

    void close() throws ItemStreamException;
}

在描述每种方法之前,我们应该提到ExecutionContext. ItemReader同样实现的客户端 ItemStream应该在对 的open任何调用之前调用 read,以便打开任何资源(例如文件)或获取连接。类似的限制适用于ItemWriter实现ItemStream. 如第 2 章所述,如果在 中找到预期数据ExecutionContext,则它可能用于在其初始状态以外的位置启动ItemReader或。ItemWriter相反, close调用以确保在打开期间分配的任何资源都被安全释放。 update调用主要是为了确保当前持有的任何状态都加载到提供的ExecutionContext. 该方法在提交前调用,以确保当前状态在提交前被持久化在数据库中。

ItemStream在 an 的客户端是 a的特殊情况下Step(来自 Spring Batch Core),ExecutionContext为每个 StepExecution 创建 an 以允许用户存储特定执行的状态,并期望在JobInstance再次启动时返回它. 对于熟悉 Quartz 的人来说,语义与 Quartz 非常相似JobDataMap

委托模式和注册步骤

请注意,这CompositeItemWriter是委托模式的一个示例,在 Spring Batch 中很常见。委托本身可能实现回调接口,例如StepListener. 如果它们这样做并且如果它们与 Spring Batch Core 一起作为 aStep的一部分使用Job,那么它们几乎肯定需要在Step. 直接连接到接口的读取器、写入器或处理器在Step实现ItemStreamStepListener接口时会自动注册。但是,由于代理不知道Step,因此需要将它们作为侦听器或流(或两者都注入,如果合适的话)。

以下示例显示了如何将委托作为 XML 流注入:

XML 配置
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
                   commit-interval="2">
                <streams>
                    <stream ref="barWriter" />
                </streams>
            </chunk>
        </tasklet>
    </step>
</job>

<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
    <property name="delegate" ref="barWriter" />
</bean>

<bean id="barWriter" class="...BarWriter" />

以下示例显示了如何将委托作为 XML 流注入:

Java 配置
@Bean
public Job ioSampleJob() {
	return this.jobBuilderFactory.get("ioSampleJob")
				.start(step1())
				.build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(compositeItemWriter())
				.stream(barWriter())
				.build();
}

@Bean
public CustomCompositeItemWriter compositeItemWriter() {

	CustomCompositeItemWriter writer = new CustomCompositeItemWriter();

	writer.setDelegate(barWriter());

	return writer;
}

@Bean
public BarWriter barWriter() {
	return new BarWriter();
}

平面文件

交换批量数据的最常见机制之一一直是平面文件。与 XML 不同,XML 具有定义其结构 (XSD) 的公认标准,任何阅读平面文件的人都必须提前了解文件的结构。一般来说,所有平面文件都分为两种类型:定界文件和定长文件。分隔文件是其中字段由分隔符(例如逗号)分隔的文件。固定长度文件具有设定长度的字段。

FieldSet

在 Spring Batch 中处理平面文件时,无论是用于输入还是输出,最重要的类之一是FieldSet. 许多体系结构和库包含帮助您从文件中读取的抽象,但它们通常返回一个String或一组String对象。这真的只能让你走到一半。AFieldSet是 Spring Batch 的抽象,用于启用来自文件资源的字段绑定。它允许开发人员以与处理数据库输入相同的方式处理文件输入。AFieldSet在概念上类似于 JDBC ResultSet。AFieldSet只需要一个参数:aString令牌数组。(可选)您还可以配置字段的名称,以便可以通过索引或名称访问字段ResultSet,如以下示例所示:

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);

界面上还有更多的选项FieldSet,如Date、long 、 BigDecimal等。最大的优点FieldSet是它提供了对平面文件输入的一致解析。在处理由格式异常引起的错误或进行简单的数据转换时,它可以保持一致,而不是每个批处理作业以潜在的意外方式进行不同的解析。

FlatFileItemReader

平面文件是最多包含二维(表格)数据的任何类型的文件。在 Spring Batch 框架中读取平面文件是由名为 的类促进的 FlatFileItemReader,它提供了读取和解析平面文件的基本功能。两个最重要的必需依赖项FlatFileItemReaderResourceLineMapper。该LineMapper接口将在下一节中进行更多探讨。resource 属性表示一个 Spring Core Resource。说明如何创建这种类型的 bean 的文档可以在 Spring Framework 第 5 章资源中找到。Resource因此,除了显示以下简单示例之外,本指南不会详细介绍创建对象:

Resource resource = new FileSystemResource("resources/trades.csv");

在复杂的批处理环境中,目录结构通常由企业应用程序集成 (EAI) 基础设施管理,其中为外部接口建立放置区,用于将文件从 FTP 位置移动到批处理位置,反之亦然。文件移动实用程序超出了 Spring Batch 架构的范围,但是批处理作业流将文件移动实用程序作为作业流中的步骤包括在内并不罕见。批处理架构只需要知道如何定位要处理的文件。Spring Batch 从这个起点开始将数据输入管道的过程。但是, Spring Integration提供了许多这些类型的服务。

中的其他属性可FlatFileItemReader让您进一步指定数据的解释方式,如下表所述:

表 1.FlatFileItemReader属性
财产 类型 描述

注释

细绳[]

指定指示注释行的行前缀。

编码

细绳

指定要使用的文本编码。默认值为 的值Charset.defaultCharset()

线映射器

LineMapper

将 a 转换StringObject表示项目的 a。

linesToSkip

整数

文件顶部要忽略的行数。

记录分隔符策略

记录分隔符策略

用于确定行尾在哪里,并在带引号的字符串内执行诸如继续行尾之类的操作。

资源

Resource

要从中读取的资源。

skippedLinesCallback

LineCallbackHandler

传递文件中要跳过的行的原始行内容的接口。如果linesToSkip设置为 2,则该接口被调用两次。

严格的

布尔值

ExecutionContext在严格模式下,如果输入资源不存在,阅读器会抛出异常。否则,它会记录问题并继续。

LineMapper

与 一样RowMapper,它采用低级构造,例如ResultSet并返回 an Object,平面文件处理需要相同的构造来将Stringline 转换为 an Object,如以下接口定义所示:

public interface LineMapper<T> {

    T mapLine(String line, int lineNumber) throws Exception;

}

基本约定是,给定当前行和与其关联的行号,映射器应返回结果域对象。这类似于 RowMapper,因为每一行都与其行号相关联,就像 a 中的每一行都 ResultSet与其行号相关联。这允许将行号绑定到生成的域对象,以进行身份​​比较或获取更多信息的日志记录。但是,与 不同RowMapper的是,LineMapper它给出了一条原始线,如上所述,它只会让你走到一半。该行必须被标记为 a FieldSet,然后可以映射到一个对象,如本文档后面所述。

LineTokenizer

将输入行转换为 a 的抽象FieldSet是必要的,因为可能有许多格式的平面文件数据需要转换为FieldSet. 在 Spring Batch 中,这个接口是LineTokenizer

public interface LineTokenizer {

    FieldSet tokenize(String line);

}

a 的契约LineTokenizer是这样的,给定一行输入(理论上 String可以包含多行),FieldSet返回代表该行的 a。然后FieldSet可以将其传递给FieldSetMapper. Spring Batch 包含以下LineTokenizer实现:

  • DelimitedLineTokenizer:用于记录中的字段由分隔符分隔的文件。最常见的分隔符是逗号,但也经常使用管道或分号。

  • FixedLengthTokenizer:用于记录中的每个字段都是“固定宽度”的文件。必须为每种记录类型定义每个字段的宽度。

  • PatternMatchingCompositeLineTokenizerLineTokenizer通过检查模式来确定应在特定行上使用标记器列表中的哪个。

FieldSetMapper

FieldSetMapper接口定义了一个方法,mapFieldSet该方法接受一个 FieldSet对象并将其内容映射到一个对象。此对象可能是自定义 DTO、域对象或数组,具体取决于作业的需要。与FieldSetMapper结合使用,LineTokenizer将资源中的一行数据转换为所需类型的对象,如下面的接口定义所示:

public interface FieldSetMapper<T> {

    T mapFieldSet(FieldSet fieldSet) throws BindException;

}

RowMapper使用的模式与使用的相同JdbcTemplate

DefaultLineMapper

现在已经定义了读取平面文件的基本接口,很明显需要三个基本步骤:

  1. 从文件中读取一行。

  2. String将该行传递给LineTokenizer#tokenize()方法以检索 FieldSet.

  3. 将标记化返回的值传递FieldSet给 a FieldSetMapper,从方法返回结果ItemReader#read()

上面描述的两个接口代表两个独立的任务:将行转换为 a FieldSet并将 a 映射FieldSet到域对象。因为 a LineTokenizer的输入与LineMapper(a 行)的输入相匹配,并且 a 的输出 FieldSetMapper与 的输出相匹配,所以提供了同时使用 a和 aLineMapper的默认实现。下面的类定义中显示的 代表大多数用户需要的行为:LineTokenizerFieldSetMapperDefaultLineMapper

public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }
}

上述功能是在默认实现中提供的,而不是内置在阅读器本身中(就像在以前版本的框架中所做的那样),以允许用户在控制解析过程方面具有更大的灵活性,尤其是在需要访问原始行的情况下。

简单的分隔文件读取示例

以下示例说明了如何读取具有实际域场景的平面文件。这个特定的批处理作业从以下文件中读取足球运动员:

ID、姓氏、名字、职位、出生年份、出道年份
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
“AdamCh00,亚当斯,查理,wr,1979,2003”

此文件的内容映射到以下 Player域对象:

public class Player implements Serializable {

    private String ID;
    private String lastName;
    private String firstName;
    private String position;
    private int birthYear;
    private int debutYear;

    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
            ",First Name=" + firstName + ",Position=" + position +
            ",Birth Year=" + birthYear + ",DebutYear=" +
            debutYear;
    }

    // setters and getters...
}

要将 a 映射FieldSetPlayer对象中,FieldSetMapper需要定义返回玩家的 a,如下例所示:

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2));
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}

然后可以通过正确构造 aFlatFileItemReader和调用 来读取该文件read,如下例所示:

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<>();
//DelimitedLineTokenizer defaults to comma as its delimiter
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

每次调用都会从文件中的每一行read返回一个新 Player对象。当到达文件末尾时,null返回。

按名称映射字段

两者都允许另外一项功能, DelimitedLineTokenizer并且FixedLengthTokenizer在功能上类似于 JDBC ResultSet。字段的名称可以注入到这些 LineTokenizer实现中的任何一个中,以增加映射函数的可读性。首先,将平面文件中所有字段的列名注入到分词器中,如下例所示:

tokenizer.setNames(new String[] {"ID", "lastName", "firstName", "position", "birthYear", "debutYear"});

AFieldSetMapper可以按如下方式使用此信息:

public class PlayerMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fs) {

       if (fs == null) {
           return null;
       }

       Player player = new Player();
       player.setID(fs.readString("ID"));
       player.setLastName(fs.readString("lastName"));
       player.setFirstName(fs.readString("firstName"));
       player.setPosition(fs.readString("position"));
       player.setDebutYear(fs.readInt("debutYear"));
       player.setBirthYear(fs.readInt("birthYear"));

       return player;
   }
}
将 FieldSet 自动映射到域对象

对于许多人来说,必须编写一个具体FieldSetMapper的内容与RowMapper为一个JdbcTemplate. FieldSetMapperSpring Batch 通过使用 JavaBean 规范将字段名称与对象上的 setter 匹配来自动映射字段,从而使这变得更容易。

再次使用足球示例,BeanWrapperFieldSetMapper配置类似于以下 XML 片段:

XML 配置
<bean id="fieldSetMapper"
      class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
    <property name="prototypeBeanName" value="player" />
</bean>

<bean id="player"
      class="org.springframework.batch.sample.domain.Player"
      scope="prototype" />

再次使用足球示例,BeanWrapperFieldSetMapper配置类似于 Java 中的以下代码段:

Java 配置
@Bean
public FieldSetMapper fieldSetMapper() {
	BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();

	fieldSetMapper.setPrototypeBeanName("player");

	return fieldSetMapper;
}

@Bean
@Scope("prototype")
public Player player() {
	return new Player();
}

对于 中的每个条目FieldSet,映射器在对象的新实例上查找相应的设置器Player(因此,需要原型范围),就像 Spring 容器查找与属性名称匹配的设置器一样。FieldSet映射中的每个可用字段,并Player返回结果对象,无需代码。

固定长度文件格式

到目前为止,只详细讨论了分隔文件。但是,它们仅代表文件读取图片的一半。许多使用平面文件的组织使用固定长度格式。示例固定长度文件如下:

UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5

虽然这看起来像一个大字段,但它实际上代表了 4 个不同的字段:

  1. ISIN:所订购商品的唯一标识符 - 12 个字符长。

  2. 数量:订购商品的数量 - 3 个字符长。

  3. 价格:商品的价格 - 5 个字符长。

  4. 客户:订购商品的客户 ID - 9 个字符长。

配置 时FixedLengthLineTokenizer,必须以范围的形式提供这些长度中的每一个。

以下示例显示如何为FixedLengthLineTokenizerin XML 定义范围:

XML 配置
<bean id="fixedLengthLineTokenizer"
      class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
    <property name="names" value="ISIN,Quantity,Price,Customer" />
    <property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>

因为FixedLengthLineTokenizer使用与LineTokenizer前面讨论的相同的接口,所以它返回的结果与FieldSet使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用 BeanWrapperFieldSetMapper.

支持上述范围语法需要 RangeArrayPropertyEditorApplicationContext. ApplicationContext但是,此 bean 会在使用批处理命名空间的地方自动声明。

以下示例显示了如何FixedLengthLineTokenizer在 Java 中定义范围:

Java 配置
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
	FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

	tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
	tokenizer.setColumns(new Range(1, 12),
						new Range(13, 15),
						new Range(16, 20),
						new Range(21, 29));

	return tokenizer;
}

因为FixedLengthLineTokenizer使用与LineTokenizer上面讨论的相同的接口,所以它返回的结果与FieldSet使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用 BeanWrapperFieldSetMapper.

单个文件中的多种记录类型

为简单起见,到目前为止所有的文件读取示例都做了一个关键假设:文件中的所有记录都具有相同的格式。然而,情况可能并非总是如此。一个文件可能包含不同格式的记录,这些记录需要进行不同的标记并映射到不同的对象,这是很常见的。以下文件摘录说明了这一点:

用户;史密斯;彼得;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI

在这个文件中,我们有三种类型的记录,“USER”、“LINEA”和“LINEB”。“USER”行对应一个User对象。“LINEA”和“LINEB”都对应于Line对象,尽管“LINEA”比“LINEB”具有更多信息。

ItemReader单独读取每一行,但我们必须指定不同 的对象LineTokenizerFieldSetMapper以便ItemWriter接收正确的项目。通过PatternMatchingCompositeLineMapper允许配置模式映射LineTokenizers和模式来使这变得容易FieldSetMappers

以下示例显示如何为FixedLengthLineTokenizerin XML 定义范围:

XML 配置
<bean id="orderFileLineMapper"
      class="org.spr...PatternMatchingCompositeLineMapper">
    <property name="tokenizers">
        <map>
            <entry key="USER*" value-ref="userTokenizer" />
            <entry key="LINEA*" value-ref="lineATokenizer" />
            <entry key="LINEB*" value-ref="lineBTokenizer" />
        </map>
    </property>
    <property name="fieldSetMappers">
        <map>
            <entry key="USER*" value-ref="userFieldSetMapper" />
            <entry key="LINE*" value-ref="lineFieldSetMapper" />
        </map>
    </property>
</bean>
Java 配置
@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
	PatternMatchingCompositeLineMapper lineMapper =
		new PatternMatchingCompositeLineMapper();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
	tokenizers.put("USER*", userTokenizer());
	tokenizers.put("LINEA*", lineATokenizer());
	tokenizers.put("LINEB*", lineBTokenizer());

	lineMapper.setTokenizers(tokenizers);

	Map<String, FieldSetMapper> mappers = new HashMap<>(2);
	mappers.put("USER*", userFieldSetMapper());
	mappers.put("LINE*", lineFieldSetMapper());

	lineMapper.setFieldSetMappers(mappers);

	return lineMapper;
}

在此示例中,“LINEA”和“LINEB”具有不同LineTokenizer的实例,但它们都使用相同的FieldSetMapper.

PatternMatchingCompositeLineMapper使用该方法PatternMatcher#match为每一行选择正确的委托。允许两个具有特殊含义的PatternMatcher通配符:问号 ("?") 匹配一个字符,而星号 ("*") 匹配零个或多个字符。请注意,在前面的配置中,所有模式都以星号结尾,使它们有效地作为行的前缀。无论配置中的PatternMatcher顺序如何,始终匹配最具体的模式。因此,如果“LINE*”和“LINEA*”都被列为模式,“LINEA”将匹配模式“LINEA*”,而“LINEB”将匹配模式“LINE*”。此外,单个星号(“*”

以下示例显示如何匹配 XML 中任何其他模式都不匹配的行:

XML 配置
<entry key="*" value-ref="defaultLineTokenizer" />

以下示例显示了如何匹配 Java 中任何其他模式都不匹配的行:

Java 配置
...
tokenizers.put("*", defaultLineTokenizer());
...

还有一个PatternMatchingCompositeLineTokenizer可以单独用于标记化。

平面文件通常包含跨越多行的记录。为了处理这种情况,需要更复杂的策略。在示例中可以找到这种常见模式的演示multiLineRecords

平面文件中的异常处理

标记一行可能会导致抛出异常的情况有很多。许多平面文件并不完美,并且包含格式不正确的记录。许多用户在记录问题、原始行和行号时选择跳过这些错误行。以后可以手动或通过另一个批处理作业检查这些日志。出于这个原因,Spring Batch 提供了一个层次结构的异常来处理解析异常: FlatFileParseExceptionFlatFileFormatException. 在尝试读取文件时遇到任何错误时FlatFileParseException抛出。由 接口的实现抛出并指示在标记化时遇到的更具体的错误。FlatFileItemReaderFlatFileFormatExceptionLineTokenizer

IncorrectTokenCountException

两者DelimitedLineTokenizerFixedLengthLineTokenizer可以指定可用于创建FieldSet. 但是,如果列名的数量与标记行时找到的列数不匹配,FieldSet 则无法创建并IncorrectTokenCountException抛出 an,其中包含遇到的标记数和预期的数量,如下例所示:

tokenizer.setNames(new String[] {"A", "B", "C", "D"});

try {
    tokenizer.tokenize("a,b,c");
}
catch (IncorrectTokenCountException e) {
    assertEquals(4, e.getExpectedCount());
    assertEquals(3, e.getActualCount());
}

因为标记器配置了 4 个列名,但在文件中只找到了 3 个标记,所以IncorrectTokenCountException抛出了一个。

IncorrectLineLengthException

以固定长度格式格式化的文件在解析时有额外的要求,因为与分隔格式不同,每列必须严格遵守其预定义的宽度。如果总行长不等于该列的最宽值,则抛出异常,如下例所示:

tokenizer.setColumns(new Range[] { new Range(1, 5),
                                   new Range(6, 10),
                                   new Range(11, 15) });
try {
    tokenizer.tokenize("12345");
    fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
    assertEquals(15, ex.getExpectedLength());
    assertEquals(5, ex.getActualLength());
}

上面标记器的配置范围是:1-5、6-10 和 11-15。因此,该行的总长度为 15。但是,在前面的示例中,传入了长度为 5 的行,导致IncorrectLineLengthException抛出 an。在此处抛出异常而不是仅映射第一列允许该行的处理更早地失败,并且如果在尝试读取 a 中的第 2 列时失败,它将包含更多的信息FieldSetMapper。但是,在某些情况下,线的长度并不总是恒定的。因此,可以通过 'strict' 属性关闭行长验证,如下例所示:

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

前面的例子几乎和前面的例子一样,只是它 tokenizer.setStrict(false)被调用了。此设置告诉标记器在标记行时不强制行长度。AFieldSet现在已正确创建并返回。但是,它只包含剩余值的空标记。

FlatFileItemWriter

写入平面文件具有相同的问题和从文件读取必须克服的问题。步骤必须能够以事务方式编写定界或固定长度格式。

LineAggregator

正如LineTokenizer接口对于获取项目并将其转换为 是必需的 一样String,文件写入必须有一种方法将多个字段聚合为单个字符串以写入文件。在 Spring Batch 中,这是LineAggregator,如下面的接口定义所示:

public interface LineAggregator<T> {

    public String aggregate(T item);

}

是的LineAggregator逻辑反义词LineTokenizerLineTokenizer接受 a String并返回 a FieldSet,而LineAggregator接受 aitem并返回 a String

PassThroughLineAggregator

LineAggregator接口 最基本的实现是PassThroughLineAggregator,它假设对象已经是一个字符串或者它的字符串表示是可以写的,如下代码所示:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {

    public String aggregate(T item) {
        return item.toString();
    }
}

FlatFileItemWriter如果需要直接控制创建字符串但需要 a 的优点(例如事务和重新启动支持),则上述实现很有用。

简化文件写入示例

既然已经定义了LineAggregator接口及其最基本的实现, PassThroughLineAggregator那么就可以解释基本的编写流程了:

  1. 将要写入的对象传递给LineAggregator以获取 String.

  2. 返回String的内容写入配置文件。

以下摘录FlatFileItemWriter在代码中表达了这一点:

public void write(T item) throws Exception {
    write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}

在 XML 中,一个简单的配置示例可能如下所示:

XML 配置
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" value="file:target/test-outputs/output.txt" />
    <property name="lineAggregator">
        <bean class="org.spr...PassThroughLineAggregator"/>
    </property>
</bean>

在 Java 中,一个简单的配置示例可能如下所示:

Java 配置
@Bean
public FlatFileItemWriter itemWriter() {
	return  new FlatFileItemWriterBuilder<Foo>()
           			.name("itemWriter")
           			.resource(new FileSystemResource("target/test-outputs/output.txt"))
           			.lineAggregator(new PassThroughLineAggregator<>())
           			.build();
}
FieldExtractor

前面的示例对于写入文件的最基本用途可能很有用。但是,大多数用户FlatFileItemWriter都有一个需要写出的域对象,因此必须将其转换为一行。在文件读取中,需要以下内容:

  1. 从文件中读取一行。

  2. 将该行传递给LineTokenizer#tokenize()方法,以便检索 FieldSet.

  3. 将标记化返回的值传递FieldSet给 a FieldSetMapper,从方法返回结果ItemReader#read()

文件写入有类似但相反的步骤:

  1. 将要写入的项目传递给编写器。

  2. 将项目上的字段转换为数组。

  3. 将结果数组聚合成一行。

因为框架无法知道需要写出对象中的哪些字段,所以FieldExtractor必须写a来完成将项变成数组的任务,如下面的接口定义所示:

public interface FieldExtractor<T> {

    Object[] extract(T item);

}

接口的实现FieldExtractor应该从提供的对象的字段创建一个数组,然后可以用元素之间的分隔符或作为固定宽度行的一部分写出。

PassThroughFieldExtractor

在许多情况下,需要写出一个集合,例如一个数组、Collection或。FieldSet从其中一种集合类型中“提取”一个数组非常简单。为此,请将集合转换为数组。因此, PassThroughFieldExtractor应该在这种情况下使用。需要注意的是,如果传入的对象不是集合类型,则PassThroughFieldExtractor 返回一个仅包含要提取的项的数组。

BeanWrapperFieldExtractor

BeanWrapperFieldSetMapper文件读取部分中的描述一样,通常最好配置如何将域对象转换为对象数组,而不是自己编写转换。提供BeanWrapperFieldExtractor此功能,如下例所示:

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] { "first", "last", "born" });

String first = "Alan";
String last = "Turing";
int born = 1912;

Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);

assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

这个提取器实现只有一个必需的属性:要映射的字段的名称。正如BeanWrapperFieldSetMapper需要将字段名称映射 FieldSet到所提供对象的 setter 上的字段一样,BeanWrapperFieldExtractor需要将名称映射到 getter 以创建对象数组。值得注意的是,名称的顺序决定了数组中字段的顺序。

分隔文件写入示例

最基本的平面文件格式是所有字段都由分隔符分隔的格式。这可以使用DelimitedLineAggregator. 以下示例写出一个简单的域对象,该对象表示对客户帐户的信用:

public class CustomerCredit {

    private int id;
    private String name;
    private BigDecimal credit;

    //getters and setters removed for clarity
}

因为正在使用域对象,所以FieldExtractor 必须提供接口的实现以及要使用的分隔符。

以下示例显示了如何FieldExtractor在 XML 中使用带分隔符的 :

XML 配置
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...DelimitedLineAggregator">
            <property name="delimiter" value=","/>
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

以下示例显示了如何FieldExtractor在 Java 中使用带分隔符:

Java 配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
	fieldExtractor.setNames(new String[] {"name", "credit"});
	fieldExtractor.afterPropertiesSet();

	DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
	lineAggregator.setDelimiter(",");
	lineAggregator.setFieldExtractor(fieldExtractor);

	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.lineAggregator(lineAggregator)
				.build();
}

在前面的示例BeanWrapperFieldExtractor中,本章前面描述的用于将其中的名称和信用字段CustomerCredit转换为对象数组,然后在每个字段之间用逗号写出。

也可以使用FlatFileItemWriterBuilder.DelimitedBuilder自动创建BeanWrapperFieldExtractorand DelimitedLineAggregator ,如下例所示:

Java 配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.delimited()
				.delimiter("|")
				.names(new String[] {"name", "credit"})
				.build();
}
固定宽度文件写入示例

分隔不是唯一的平面文件格式类型。许多人更喜欢为每列使用设置宽度来划分字段之间的界限,这通常称为“固定宽度”。Spring Batch 在使用FormatterLineAggregator.

使用与上述相同的CustomerCredit域对象,可以在 XML 中进行如下配置:

XML 配置
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...FormatterLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit" />
                </bean>
            </property>
            <property name="format" value="%-9s%-2.0f" />
        </bean>
    </property>
</bean>

使用与上述相同的CustomerCredit域对象,可以在 Java 中进行如下配置:

Java 配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
	fieldExtractor.setNames(new String[] {"name", "credit"});
	fieldExtractor.afterPropertiesSet();

	FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
	lineAggregator.setFormat("%-9s%-2.0f");
	lineAggregator.setFieldExtractor(fieldExtractor);

	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.lineAggregator(lineAggregator)
				.build();
}

前面的大多数示例应该看起来很熟悉。但是,格式属性的值是新的。

以下示例显示了 XML 中的格式属性:

<property name="format" value="%-9s%-2.0f" />

以下示例显示了 Java 中的格式属性:

...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...

底层实现是使用 Formatter作为 Java 5 的一部分添加的相同内容构建的。Java Formatter基于 printfC 编程语言的功能。有关如何配置格式化程序的大多数详细信息可以在Formatter的 Javadoc 中找到。

也可以使用FlatFileItemWriterBuilder.FormattedBuilder自动创建BeanWrapperFieldExtractorand FormatterLineAggregator ,如下例所示:

Java 配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.formatted()
				.format("%-9s%-2.0f")
				.names(new String[] {"name", "credit"})
				.build();
}
处理文件创建

FlatFileItemReader与文件资源有非常简单的关系。初始化阅读器时,它会打开文件(如果存在),如果不存在则抛出异常。文件写入并不是那么简单。乍一看,似乎应该存在类似的简单契约FlatFileItemWriter:如果文件已经存在,则抛出异常,如果不存在,则创建它并开始编写。但是,可能重新启动 aJob可能会导致问题。在正常的重启场景中,合约是相反的:如果文件存在,则从最后一个已知的正确位置开始写入,如果不存在,则抛出异常。但是,如果此作业的文件名始终相同,会发生什么情况?在这种情况下,如果文件存在,您可能希望删除该文件,除非它是重新启动。由于这种可能性,FlatFileItemWriter 包含属性,shouldDeleteIfExists。将此属性设置为 true 会导致在打开编写器时删除同名的现有文件。

XML 项读取器和写入器

Spring Batch 提供了用于读取 XML 记录并将它们映射到 Java 对象以及将 Java 对象写入为 XML 记录的事务基础结构。

对流式 XML 的限制

StAX API 用于 I/O,因为其他标准 XML 解析 API 不适合批处理要求(DOM 一次将整个输入加载到内存中,而 SAX 通过允许用户仅提供回调来控制解析过程)。

我们需要考虑 XML 输入和输出在 Spring Batch 中是如何工作的。首先,有一些概念与文件读取和写入不同,但在 Spring Batch XML 处理中很常见。使用 XML 处理,而不是需要标记的记录行(FieldSet实例),假定 XML 资源是对应于各个记录的“片段”的集合,如下图所示:

XML 输入
图 1. XML 输入

“交易”标签在上述场景中被定义为“根元素”。'<trade>' 和 '</trade>' 之间的所有内容都被视为一个“片段”。Spring Batch 使用 Object/XML Mapping (OXM) 将片段绑定到对象。但是,Spring Batch 不依赖于任何特定的 XML 绑定技术。典型的用途是委托给 Spring OXM,它为最流行的 OXM 技术提供统一的抽象。对 Spring OXM 的依赖是可选的,如果需要,您可以选择实现 Spring Batch 特定的接口。与 OXM 支持的技术的关系如下图所示:

OXM 绑定
图 2. OXM 绑定

通过对 OXM 的介绍以及如何使用 XML 片段来表示记录,我们现在可以更仔细地检查读取器和写入器。

StaxEventItemReader

StaxEventItemReader配置提供了用于处理来自 XML 输入流的记录的典型设置。首先,考虑StaxEventItemReadercan 处理的以下 XML 记录集:

<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>

为了能够处理 XML 记录,需要以下内容:

  • 根元素名称:构成要映射的对象的片段的根元素的名称。示例配置通过贸易价值证明了这一点。

  • Resource:表示要读取的文件的 Spring Resource。

  • Unmarshaller:Spring OXM 提供的一种解组工具,用于将 XML 片段映射到对象。

以下示例显示如何定义StaxEventItemReader与名为 的根元素trade、 的资源和在 XML 中data/iosample/input/input.xml调用的解组器一起使用的 a:tradeMarshaller

XML 配置
<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
    <property name="fragmentRootElementName" value="trade" />
    <property name="resource" value="org/springframework/batch/item/xml/domain/trades.xml" />
    <property name="unmarshaller" ref="tradeMarshaller" />
</bean>

以下示例显示如何定义StaxEventItemReader与名为 的根元素trade、 的资源和在 Java 中data/iosample/input/input.xml调用的解组器一起使用的 a:tradeMarshaller

Java 配置
@Bean
public StaxEventItemReader itemReader() {
	return new StaxEventItemReaderBuilder<Trade>()
			.name("itemReader")
			.resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
			.addFragmentRootElements("trade")
			.unmarshaller(tradeMarshaller())
			.build();

}

请注意,在此示例中,我们选择使用 an XStreamMarshaller,它接受作为映射传入的别名,其中第一个键和值是片段的名称(即根元素)和要绑定的对象类型。然后,与 a 类似FieldSet,映射到对象类型中的字段的其他元素的名称在映射中被描述为键/值对。在配置文件中,我们可以使用 Spring 配置实用程序来描述所需的别名。

以下示例显示如何在 XML 中描述别名:

XML 配置
<bean id="tradeMarshaller"
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="trade"
                   value="org.springframework.batch.sample.domain.trade.Trade" />
            <entry key="price" value="java.math.BigDecimal" />
            <entry key="isin" value="java.lang.String" />
            <entry key="customer" value="java.lang.String" />
            <entry key="quantity" value="java.lang.Long" />
        </util:map>
    </property>
</bean>

以下示例显示了如何在 Java 中描述别名:

Java 配置
@Bean
public XStreamMarshaller tradeMarshaller() {
	Map<String, Class> aliases = new HashMap<>();
	aliases.put("trade", Trade.class);
	aliases.put("price", BigDecimal.class);
	aliases.put("isin", String.class);
	aliases.put("customer", String.class);
	aliases.put("quantity", Long.class);

	XStreamMarshaller marshaller = new XStreamMarshaller();

	marshaller.setAliases(aliases);

	return marshaller;
}

在输入时,阅读器读取 XML 资源,直到它识别出一个新的片段即将开始。默认情况下,阅读器匹配元素名称以识别新片段即将开始。阅读器从片段创建一个独立的 XML 文档,并将文档传递给反序列化器(通常是 Spring OXM 的包装器Unmarshaller)以将 XML 映射到 Java 对象。

总之,这个过程类似于下面的 Java 代码,它使用了 Spring 配置提供的注入:

StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());

boolean hasNext = true;

Trade trade = null;

while (hasNext) {
    trade = xmlStaxEventItemReader.read();
    if (trade == null) {
        hasNext = false;
    }
    else {
        System.out.println(trade);
    }
}

StaxEventItemWriter

输出与输入对称。StaxEventItemWriter需要一个Resource、一个编组器和一个rootTagName. Resource一个 Java 对象被传递给一个编组器(通常是一个标准的 Spring OXM 编组器),该编组器使用一个自定义事件编写器来写入 a ,该编写器过滤OXM 工具为每个片段生成的事件StartDocumentEndDocument

以下 XML 示例使用MarshallingEventWriterSerializer

XML 配置
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="marshaller" ref="tradeMarshaller" />
    <property name="rootTagName" value="trade" />
    <property name="overwriteOutput" value="true" />
</bean>

以下 Java 示例使用MarshallingEventWriterSerializer

Java 配置
@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
	return new StaxEventItemWriterBuilder<Trade>()
			.name("tradesWriter")
			.marshaller(tradeMarshaller())
			.resource(outputResource)
			.rootTagName("trade")
			.overwriteOutput(true)
			.build();

}

前面的配置设置了三个必需的属性并设置了 overwriteOutput=true本章前面提到的可选属性,用于指定是否可以覆盖现有文件。

以下 XML 示例使用与本章前面的阅读示例中使用的相同的编组器:

XML 配置
<bean id="customerCreditMarshaller"
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="customer"
                   value="org.springframework.batch.sample.domain.trade.Trade" />
            <entry key="price" value="java.math.BigDecimal" />
            <entry key="isin" value="java.lang.String" />
            <entry key="customer" value="java.lang.String" />
            <entry key="quantity" value="java.lang.Long" />
        </util:map>
    </property>
</bean>

以下 Java 示例使用与本章前面的阅读示例中使用的相同的编组器:

Java 配置
@Bean
public XStreamMarshaller customerCreditMarshaller() {
	XStreamMarshaller marshaller = new XStreamMarshaller();

	Map<String, Class> aliases = new HashMap<>();
	aliases.put("trade", Trade.class);
	aliases.put("price", BigDecimal.class);
	aliases.put("isin", String.class);
	aliases.put("customer", String.class);
	aliases.put("quantity", Long.class);

	marshaller.setAliases(aliases);

	return marshaller;
}

总结一个 Java 示例,以下代码说明了讨论的所有要点,演示了所需属性的编程设置:

FileSystemResource resource = new FileSystemResource("data/outputFile.xml")

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

StaxEventItemWriter staxItemWriter =
	new StaxEventItemWriterBuilder<Trade>()
				.name("tradesWriter")
				.marshaller(marshaller)
				.resource(resource)
				.rootTagName("trade")
				.overwriteOutput(true)
				.build();

staxItemWriter.afterPropertiesSet();

ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);

JSON 项目读取器和写入器

Spring Batch 提供对以下格式的 JSON 资源的读写支持:

[
  {
    "isin": "123",
    "quantity": 1,
    "price": 1.2,
    "customer": "foo"
  },
  {
    "isin": "456",
    "quantity": 2,
    "price": 1.4,
    "customer": "bar"
  }
]

假设 JSON 资源是对应于单个项目的 JSON 对象数组。Spring Batch 不绑定到任何特定的 JSON 库。

JsonItemReader

JsonItemReaderJSON 解析和绑定委托给 org.springframework.batch.item.json.JsonObjectReader接口的实现。此接口旨在通过使用流式 API 以块读取 JSON 对象来实现。目前提供了两种实现:

  • 杰克逊通过org.springframework.batch.item.json.JacksonJsonObjectReader

  • Gson通过org.springframework.batch.item.json.GsonJsonObjectReader

为了能够处理 JSON 记录,需要以下内容:

  • Resource:代表要读取的 JSON 文件的 Spring 资源。

  • JsonObjectReader:一个 JSON 对象阅读器,用于解析 JSON 对象并将其绑定到项目

下面的例子展示了如何定义一个JsonItemReader与之前的 JSON 资源一起工作的org/springframework/batch/item/json/trades.jsona 和一个 JsonObjectReader基于 Jackson 的 a:

@Bean
public JsonItemReader<Trade> jsonItemReader() {
   return new JsonItemReaderBuilder<Trade>()
                 .jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonItemReader")
                 .build();
}

JsonFileItemWriter

JsonFileItemWriter项目编组委托给 org.springframework.batch.item.json.JsonObjectMarshaller接口。该接口的约定是获取一个对象并将其编组为 JSON String。目前提供了两种实现:

  • 杰克逊通过org.springframework.batch.item.json.JacksonJsonObjectMarshaller

  • Gson通过org.springframework.batch.item.json.GsonJsonObjectMarshaller

为了能够写入 JSON 记录,需要以下内容:

  • ResourceResource:代表要写入的 JSON 文件的 Spring

  • JsonObjectMarshaller:将对象编组为 JSON 格式的 JSON 对象编组器

以下示例显示了如何定义 a JsonFileItemWriter

@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
   return new JsonFileItemWriterBuilder<Trade>()
                 .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonFileItemWriter")
                 .build();
}

多文件输入

在单个Step. 假设文件都具有相同的格式,则MultiResourceItemReader支持 XML 和平面文件处理的这种类型的输入。考虑目录中的以下文件:

文件-1.txt 文件-2.txt 被忽略的.txt

file-1.txt 和 file-2.txt 的格式相同,出于业务原因,应一起处理。可MultiResourceItemReader用于通过使用通配符读取两个文件。

以下示例显示了如何在 XML 中读取带有通配符的文件:

XML 配置
<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
    <property name="resources" value="classpath:data/input/file-*.txt" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>

以下示例显示了如何在 Java 中读取带有通配符的文件:

Java 配置
@Bean
public MultiResourceItemReader multiResourceReader() {
	return new MultiResourceItemReaderBuilder<Foo>()
					.delegate(flatFileItemReader())
					.resources(resources())
					.build();
}

引用的委托是一个简单的FlatFileItemReader. 上面的配置从两个文件中读取输入,处理回滚和重启场景。应该注意的是,与 any 一样ItemReader,添加额外的输入(在本例中为文件)可能会在重新启动时导致潜在的问题。建议批处理作业使用它们自己的单独目录,直到成功完成。

输入资源通过 using 进行排序,MultiResourceItemReader#setComparator(Comparator) 以确保在重新启动场景中的作业运行之间保留资源排序。

数据库

像大多数企业应用程序样式一样,数据库是批处理的中央存储机制。但是,由于系统必须使用的数据集的庞大规模,批处理与其他应用程序样式不同。如果一条 SQL 语句返回 100 万行,则结果集可能会将所有返回的结果保存在内存中,直到读取所有行。Spring Batch 为这个问题提供了两种类型的解决方案:

基于光标的ItemReader实现

使用数据库游标通常是大多数批处理开发人员的默认方法,因为它是数据库对“流式传输”关系数据问题的解决方案。JavaResultSet类本质上是一种用于操作游标的面向对象的机制。AResultSet维护一个指向当前数据行的游标。调用nexta ResultSet会将此光标移动到下一行。Spring Batch 基于游标的ItemReader 实现在初始化时打开一个游标,并在每次调用时将游标向前移动一行read,返回一个可用于处理的映射对象。然后调用该 close方法以确保释放所有资源。Spring 核心 JdbcTemplate通过使用回调模式将所有行完全映射到一个ResultSet并在将控制权返回给方法调用者之前关闭。但是,在批处理中,这必须等到步骤完成。下图显示了基于光标的ItemReader工作原理的通用图。请注意,虽然该示例使用 SQL(因为 SQL 广为人知),但任何技术都可以实现基本方法。

光标示例
图 3. 光标示例

这个例子说明了基本模式。给定一个 'FOO' 表,它具有三列: IDNAMEBAR,选择 ID 大于 1 但小于 7 的所有行。这会将光标(第 1 行)的开头放在 ID 2 上。这一行的结果应该是一个完全映射的Foo对象。再次调用read()会将光标移动到下一行,即FooID 为 3 的 。这些读取的结果在每个 之后写出 read,从而允许对对象进行垃圾收集(假设没有实例变量维护对它们的引用)。

JdbcCursorItemReader

JdbcCursorItemReader是基于游标技术的 JDBC 实现。它直接与 a 一起工作,ResultSet并且需要一条 SQL 语句来针对从 a 获得的连接运行DataSource。以下数据库模式用作示例:

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

许多人喜欢为每一行使用一个域对象,所以下面的例子使用RowMapper接口的一个实现来映射一个CustomerCredit对象:

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

由于JdbcCursorItemReader与 共享关键接口JdbcTemplate,因此查看如何使用 读取此数据的示例JdbcTemplate以将其与 进行对比很有用ItemReader。出于本示例的目的,假设CUSTOMER数据库中有 1,000 行。第一个示例使用JdbcTemplate

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

运行上述代码片段后,customerCredits列表包含 1,000 个 CustomerCredit对象。在查询方法中,从 中获取连接 DataSource,针对它运行提供的 SQL,并mapRow为 中的每一行调用该方法ResultSet。将此与 的方法进行对比 JdbcCursorItemReader,如下例所示:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

运行上述代码片段后,计数器等于 1,000。如果上面的代码将返回的值放入一个列表中,结果将与示例customerCredit完全相同。JdbcTemplate但是,它的最大优点ItemReader 是它允许“流式传输”项目。该read方法可以调用一次,可以用 写出项目ItemWriter,然后可以用 获取下一个项目 read。这允许项目的读取和写入在“块”中完成并定期提交,这是高性能批处理的本质。此外,它很容易配置为注入 Spring Batch Step

以下示例显示了如何将一个注入ItemReaderStepXML 中:

XML 配置
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

以下示例展示了如何在 Java 中将an 注入ItemReader到 a中:Step

Java 配置
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
	return new JdbcCursorItemReaderBuilder<CustomerCredit>()
			.dataSource(this.dataSource)
			.name("creditReader")
			.sql("select ID, NAME, CREDIT from CUSTOMER")
			.rowMapper(new CustomerCreditRowMapper())
			.build();

}
附加属性

因为在 Java 中打开游标有很多不同的选项,所以JdbcCursorItemReader可以设置许多属性,如下表所述:

表 2. JdbcCursorItemReader 属性

忽略警告

确定是否记录 SQLWarnings 或导致异常。默认值为true(意味着记录警告)。

取大小

当. ResultSet_ ItemReader默认情况下,不给出任何提示。

最大行数

设置底层证券ResultSet在任何时候可以容纳的最大行数限制。

查询超时

Statement设置驱动程序等待对象运行的秒数。如果超出限制,DataAccessException则抛出 a。(有关详细信息,请参阅您的驱动程序供应商文档)。

验证光标位置

因为由ResultSet持有的相同ItemReader被传递给RowMapper,所以用户可能会调用ResultSet.next()自己,这可能会导致阅读器的内部计数出现问题。 如果光标位置在调用后与之前不同,则将此值设置为true会引发异常。RowMapper

保存状态

指示是否应将阅读器的状态保存在 ExecutionContext提供的 中ItemStream#update(ExecutionContext)。默认值为 true.

驱动程序支持绝对

指示 JDBC 驱动程序是否支持在ResultSet. true 对于支持 的 JDBC 驱动程序,建议将其设置为ResultSet.absolute(),因为它可能会提高性能,尤其是在处理大型数据集时某个步骤失败的情况下。默认为false.

setUseSharedExtendedConnection

指示用于游标的连接是否应由所有其他处理使用,从而共享同一事务。如果将其设置为false,则游标将使用其自己的连接打开,并且不参与为其余步骤处理启动的任何事务。如果将此标志设置为,true则必须将 DataSource 包装在 an 中 ExtendedConnectionDataSourceProxy,以防止在每次提交后关闭和释放连接。当您将此选项设置为true,用于打开游标的语句是使用“READ_ONLY”和“HOLD_CURSORS_OVER_COMMIT”选项创建的。这允许在步骤处理中执行的事务开始和提交上保持光标打开。要使用此功能,您需要一个支持此功能的数据库和一个支持 JDBC 3.0 或更高版本的 JDBC 驱动程序。默认为false.

HibernateCursorItemReader

就像普通的 Spring 用户对是否使用 ORM 解决方案做出重要决定一样,这会影响他们是否使用 aJdbcTemplate或 a HibernateTemplate,Spring Batch 用户也有相同的选择。 HibernateCursorItemReader是游标技术的 Hibernate 实现。Hibernate 在批处理中的使用颇具争议。这主要是因为 Hibernate 最初是为了支持在线应用程序样式而开发的。但是,这并不意味着它不能用于批处理。解决此问题的最简单方法是使用StatelessSession而不是标准会话。这消除了 Hibernate 使用的所有缓存和脏检查,这可能会导致批处理场景中的问题。有关无状态和正常休眠会话之间差异的更多信息,请参阅特定休眠版本的文档。HibernateCursorItemReader允许您声明 HQL 语句并传入 a ,这将在 每次 SessionFactory调用时传回一个项目,以与JdbcCursorItemReader. 以下示例配置使用与 JDBC 读取器相同的“客户信用”示例:

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

假设已为表正确创建了休眠映射文件,此配置以与 描述的完全相同的方式ItemReader返回对象。'useStatelessSession' 属性默认为 true,但已在此处添加以提醒人们注意打开或关闭它的能力。还值得注意的是,可以使用该 属性设置底层游标的获取大小。与 一样,配置很简单。CustomerCreditJdbcCursorItemReaderCustomersetFetchSizeJdbcCursorItemReader

以下示例显示了如何ItemReader在 XML 中注入 Hibernate:

XML 配置
<bean id="itemReader"
      class="org.springframework.batch.item.database.HibernateCursorItemReader">
    <property name="sessionFactory" ref="sessionFactory" />
    <property name="queryString" value="from CustomerCredit" />
</bean>

以下示例显示了如何ItemReader在 Java 中注入 Hibernate:

Java 配置
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
	return new HibernateCursorItemReaderBuilder<CustomerCredit>()
			.name("creditReader")
			.sessionFactory(sessionFactory)
			.queryString("from CustomerCredit")
			.build();
}
StoredProcedureItemReader

有时需要使用存储过程来获取游标数据。其 StoredProcedureItemReader工作方式与 类似JdbcCursorItemReader,不同之处在于,它不是运行查询来获取游标,而是运行返回游标的存储过程。存储过程可以通过三种不同的方式返回游标:

  • 作为返回值ResultSet(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。

  • 作为作为 out 参数返回的引用游标(由 Oracle 和 PostgreSQL 使用)。

  • 作为存储函数调用的返回值。

以下 XML 示例配置使用与前面示例相同的“客户信用”示例:

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

以下 Java 示例配置使用与前面示例相同的“客户信用”示例:

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());

	return reader;
}

前面的示例依赖存储过程来提供 aResultSet作为返回结果(前面的选项 1)。

如果存储过程返回 a ref-cursor(选项 2),那么我们需要提供返回的 out 参数的位置ref-cursor

以下示例显示了如何使用 XML 中的第一个参数作为引用游标:

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

以下示例显示了如何使用第一个参数作为 Java 中的 ref-cursor:

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setRefCursorPosition(1);

	return reader;
}

如果游标是从存储函数返回的(选项 3),我们需要将属性“ function ”设置为true. 它默认为false.

以下示例显示true了 XML 中的属性 to:

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

以下示例显示true了 Java 中的属性:

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setFunction(true);

	return reader;
}

在所有这些情况下,我们需要定义 aRowMapper以及 aDataSource和实际的过程名称。

如果存储过程或函数接受参数,则必须使用parameters属性声明和设置它们。以下示例对于 Oracle,声明了三个参数。第一个是out返回 ref-cursor 的参数,第二个和第三个是接受 type 值的参数INTEGER

以下示例显示了如何使用 XML 中的参数:

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

以下示例显示了如何在 Java 中使用参数:

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	List<SqlParameter> parameters = new ArrayList<>();
	parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
	parameters.add(new SqlParameter("amount", Types.INTEGER);
	parameters.add(new SqlParameter("custId", Types.INTEGER);

	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("spring.cursor_func");
	reader.setParameters(parameters);
	reader.setRefCursorPosition(1);
	reader.setRowMapper(rowMapper());
	reader.setPreparedStatementSetter(parameterSetter());

	return reader;
}

除了参数声明之外,我们还需要指定一个PreparedStatementSetter 设置调用参数值的实现。这与JdbcCursorItemReader上述相同。附加属性中列出的所有 附加属性也适用于StoredProcedureItemReader

分页ItemReader实现

使用数据库游标的替代方法是运行多个查询,其中每个查询获取部分结果。我们将此部分称为页面。每个查询都必须指定起始行号和我们希望在页面中返回的行数。

JdbcPagingItemReader

分页的一种实现ItemReaderJdbcPagingItemReader. JdbcPagingItemReader需要一个PagingQueryProvider负责提供用于检索组成页面的行的 SQL 查询。 由于每个数据库都有自己的策略来提供分页支持,因此我们需要PagingQueryProvider 为每种支持的数据库类型使用不同的策略。还有SqlPagingQueryProviderFactoryBean 一个自动检测正在使用的数据库并确定适当的 PagingQueryProvider实现。这简化了配置,是推荐的最佳实践。

SqlPagingQueryProviderFactoryBean要求您指定一个子句select和一个 from子句。您还可以提供一个可选where子句。这些子句和 requiredsortKey用于构建 SQL 语句。

重要的是对 有一个唯一的键约束,sortKey以保证在执行之间不会丢失任何数据。

read打开阅读器后,它会以与任何其他相同的基本方式将每次调用传回一个项目ItemReader。当需要额外的行时,分页发生在幕后。

以下 XML 示例配置使用与ItemReaders前面显示的基于光标的类似“客户信用”示例:

XML 配置
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>

以下 Java 示例配置使用与ItemReaders前面显示的基于光标的类似“客户信用”示例:

Java 配置
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
	Map<String, Object> parameterValues = new HashMap<>();
	parameterValues.put("status", "NEW");

	return new JdbcPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.dataSource(dataSource)
           				.queryProvider(queryProvider)
           				.parameterValues(parameterValues)
           				.rowMapper(customerCreditMapper())
           				.pageSize(1000)
           				.build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
	SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

	provider.setSelectClause("select id, name, credit");
	provider.setFromClause("from customer");
	provider.setWhereClause("where status=:status");
	provider.setSortKey("id");

	return provider;
}

此配置使用必须指定的ItemReader返回CustomerCredit对象。RowMapper“pageSize”属性确定每次查询运行时从数据库读取的实体数。

'parameterValues' 属性可用于指定Map查询的参数值。如果在where子句中使用命名参数,则每个条目的键应与命名参数的名称匹配。如果您使用传统的“?” placeholder,那么每个条目的 key 应该是占位符的编号,从 1 开始。

JpaPagingItemReader

分页的另一种实现ItemReaderJpaPagingItemReader. JPA 没有类似于 Hibernate 的概念StatelessSession,所以我们必须使用 JPA 规范提供的其他特性。由于 JPA 支持分页,因此在使用 JPA 进行批处理时这是一个自然的选择。读取每个页面后,实体将分离并清除持久性上下文,以便在处理页面后对实体进行垃圾收集。

JpaPagingItemReader允许您声明 JPQL 语句并 EntityManagerFactory传入. 然后,它在每次调用时传回一个项目,以与任何其他项目相同的基本方式进行读取ItemReader。当需要其他实体时,分页发生在幕后。

以下 XML 示例配置使用与前面显示的 JDBC 阅读器相同的“客户信用”示例:

XML 配置
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>

以下 Java 示例配置使用与前面显示的 JDBC 阅读器相同的“客户信用”示例:

Java 配置
@Bean
public JpaPagingItemReader itemReader() {
	return new JpaPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.entityManagerFactory(entityManagerFactory())
           				.queryString("select c from CustomerCredit c")
           				.pageSize(1000)
           				.build();
}

假设对象具有正确的 JPA 注释或 ORM 映射文件,此配置以与上述完全相同的方式ItemReader返回对象。“pageSize”属性确定每次查询执行时从数据库读取的实体数。CustomerCreditJdbcPagingItemReaderCustomerCredit

数据库 ItemWriters

虽然平面文件和 XML 文件都有特定的ItemWriter实例,但在数据库世界中并没有完全相同的实例。这是因为事务提供了所有需要的功能。 ItemWriter文件的实现是必要的,因为它们必须像事务性的那样工作,跟踪写入的项目并在适当的时间刷新或清除。数据库不需要此功能,因为写入已包含在事务中。用户可以创建自己的 DAO 来实现ItemWriter接口或使用自定义的 DAOItemWriter这是为通用处理问题而编写的。无论哪种方式,它们都应该毫无问题地工作。需要注意的一件事是通过批处理输出提供的性能和错误处理能力。这在使用 hibernate 时最常见,ItemWriter但在使用 JDBC 批处理模式时可能会遇到相同的问题。批处理数据库输出没有任何固有缺陷,假设我们小心刷新并且数据没有错误。但是,写入时的任何错误都可能导致混淆,因为无法知道是哪个单个项目导致了异常,或者即使任何单个项目负责,如下图所示:

刷新错误
图 4. 刷新错误

如果项目在写入之前被缓冲,则在提交之前刷新缓冲区之前不会引发任何错误。例如,假设每个块写入 20 个项目,第 15 个项目抛出一个DataIntegrityViolationException. 就目前Step 而言,所有 20 项都已成功写入,因为在实际写入之前无法知道是否发生了错误。一旦Session#flush()被调用,缓冲区就会被清空并触发异常。在这一点上,没有什么Step 可以做。事务必须回滚。通常,此异常可能会导致项目被跳过(取决于跳过/重试策略),然后不再写入。但是,在批处理场景中,无法知道是哪个项目导致了问题。发生故障时正在写入整个缓冲区。解决此问题的唯一方法是在每个项目之后刷新,如下图所示:

写入错误
图 5. 写入错误

这是一个常见的用例,尤其是在使用 Hibernate 时,实现的简单准则ItemWriter是在每次调用write(). 这样做可以可靠地跳过项目,Spring Batch 在内部负责处理ItemWriter错误后调用的粒度。

重用现有服务

批处理系统通常与其他应用程序样式结合使用。最常见的是在线系统,但它也可以通过移动每种应用程序样式使用的必要批量数据来支持集成甚至是胖客户端应用程序。出于这个原因,许多用户希望在他们的批处理作业中重用现有的 DAO 或其他服务是很常见的。Spring 容器本身通过允许注入任何必要的类使这变得相当容易。但是,可能存在现有服务需要充当ItemReaderorItemWriter的情况,以满足另一个 Spring Batch 类的依赖关系,或者因为它确实是主要的ItemReader一步。为每个需要包装的服务编写一个适配器类是相当简单的,但是因为这是一个常见的问题,Spring Batch 提供了实现: ItemReaderAdapterItemWriterAdapter. 这两个类都通过调用委托模式来实现标准的 Spring 方法,并且设置起来相当简单。

以下 XML 示例使用ItemReaderAdapter

XML 配置
<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="generateFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

以下 Java 示例使用ItemReaderAdapter

Java 配置
@Bean
public ItemReaderAdapter itemReader() {
	ItemReaderAdapter reader = new ItemReaderAdapter();

	reader.setTargetObject(fooService());
	reader.setTargetMethod("generateFoo");

	return reader;
}

@Bean
public FooService fooService() {
	return new FooService();
}

需要注意的重要一点是,合同的合同targetMethod必须与合同相同read:用尽时,它会返回null。否则,它返回一个 Object. 其他任何事情都会阻止框架知道处理何时结束,导致无限循环或不正确的失败,具体取决于ItemWriter.

以下 XML 示例使用ItemWriterAdapter

XML 配置
<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="processFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

以下 Java 示例使用ItemWriterAdapter

Java 配置
@Bean
public ItemWriterAdapter itemWriter() {
	ItemWriterAdapter writer = new ItemWriterAdapter();

	writer.setTargetObject(fooService());
	writer.setTargetMethod("processFoo");

	return writer;
}

@Bean
public FooService fooService() {
	return new FooService();
}

防止状态持续存在

默认情况下,所有ItemReaderItemWriter实现都将其当前状态存储在ExecutionContext提交之前。但是,这可能并不总是期望的行为。例如,许多开发人员选择通过使用进程指示器使他们的数据库读取器“可重新运行”。一个额外的列被添加到输入数据以指示它是否已被处理。当读取(或写入)特定记录时,已处理标志从 翻转falsetrue。然后 SQL 语句可以在子句中包含额外的语句where,例如where PROCESSED_IND = false,从而确保在重新启动的情况下只返回未处理的记录。在这种情况下,最好不要存储任何状态,例如当前行号,因为它在重新启动时无关紧要。因此,所有读取器和写入器都包含“saveState”属性。

以下 bean 定义显示了如何防止 XML 中的状态持久性:

XML 配置
<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
    </property>
    <property name="saveState" value="false" />
    <property name="sql">
        <value>
            SELECT games.player_id, games.year_no, SUM(COMPLETES),
            SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
            SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
            SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
            from games, players where players.player_id =
            games.player_id group by games.player_id, games.year_no
        </value>
    </property>
</bean>

以下 bean 定义显示了如何防止 Java 中的状态持久性:

Java 配置
@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
	return new JdbcCursorItemReaderBuilder<PlayerSummary>()
				.dataSource(dataSource)
				.rowMapper(new PlayerSummaryMapper())
				.saveState(false)
				.sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
				  + "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
				  + "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
				  + "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
				  + "from games, players where players.player_id ="
				  + "games.player_id group by games.player_id, games.year_no")
				.build();

}

上面的ItemReader配置不会ExecutionContext在它参与的任何执行中创建任何条目。

创建自定义 ItemReaders 和 ItemWriters

到目前为止,本章已经讨论了 Spring Batch 中读写的基本契约以及一些常见的实现。但是,这些都是相当通用的,并且有许多开箱即用的实现可能无法涵盖的潜在场景。本节通过一个简单的示例展示了如何创建自定义 ItemReaderItemWriter实现并正确实施他们的合同。ItemReader还实现了, ItemStream以说明如何使读取器或写入器可重新启动。

自定义ItemReader示例

出于本示例的目的,我们创建了一个ItemReader从提供的列表中读取的简单实现。我们从实现最基本的合约 ItemReaderread方法开始,如下代码所示:

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

前面的类接受一个项目列表并一次返回一个项目,从列表中删除每个项目。当列表为空时,返回null,从而满足 an 的最基本要求,ItemReader如下测试代码所示:

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使可ItemReader重启

最后的挑战是使ItemReader可重新启动。目前,如果处理被中断并重新开始,则ItemReader必须从头开始。这在许多情况下实际上是有效的,但有时最好从停止的地方重新启动批处理作业。关键的判别通常是读者是有状态的还是无状态的。无状态阅读器不需要担心可重新启动性,但有状态阅读器必须尝试在重新启动时重建其最后一个已知状态。因此,我们建议您尽可能使自定义阅读器保持无状态,这样您就不必担心可重新启动性。

如果确实需要存储状态,ItemStream则应使用该接口:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

在每次调用该ItemStream update方法时,当前索引ItemReader 都存储在提供ExecutionContext的键“current.index”中。调用该 ItemStream open方法时,ExecutionContext会检查它是否包含具有该键的条目。如果找到该键,则将当前索引移动到该位置。这是一个相当琐碎的例子,但它仍然符合一般合同:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多数ItemReaders都有更复杂的重启逻辑。JdbcCursorItemReader例如,将最后处理的行的行 ID 存储在游标中。

还值得注意的是,其中使用的密钥ExecutionContext不应该是微不足道的。那是因为相同ExecutionContext的内容用于 a 中的所有ItemStreams内容Step。在大多数情况下,只需在键前面加上类名就足以保证唯一性。但是,在同一步骤中使用两个相同类型的极少数情况下 ItemStream(如果需要两个文件进行输出,可能会发生这种情况),需要一个更唯一的名称。出于这个原因,许多 Spring Batch ItemReaderItemWriter实现都有一个setName()属性,可以覆盖这个键名。

自定义ItemWriter示例

实现自定义ItemWriter在许多方面与ItemReader上面的示例相似,但在足以保证其自己的示例的方式上有所不同。但是,添加可重启性本质上是相同的,因此在此示例中不涉及。与 ItemReader示例一样,List使用 a 是为了使示例尽可能简单:

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(List<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}
使可ItemWriter重启

为了使ItemWriter可重新启动,我们将遵循与 相同的过程 ItemReader,添加和实现ItemStream接口以同步执行上下文。在示例中,我们可能必须计算处理的项目数并将其添加为页脚记录。如果我们需要这样做,我们可以 ItemStream在我们的中实现,ItemWriter以便在重新打开流时从执行上下文中重构计数器。

在许多实际情况下,customItemWriters还委托给另一个本身可重新启动的写入程序(例如,写入文件时),或者它写入事务性资源因此不需要可重新启动,因为它是无状态的。当你有一个有状态的 writer 时,你可能应该确保ItemStream实现ItemWriter. 还请记住,编写器的客户端需要知道ItemStream,因此您可能需要在配置中将其注册为流。

项目读取器和写入器实现

在本节中,我们将向您介绍前几节中尚未讨论的读者和作者。

装饰器

在某些情况下,用户需要将专门的行为附加到预先存在的 ItemReader. Spring Batch 提供了一些开箱即用的装饰器,可以为您的实现添加额外的ItemReader行为ItemWriter

Spring Batch 包括以下装饰器:

SynchronizedItemStreamReader

当使用ItemReader非线程安全的时,Spring Batch 提供 SynchronizedItemStreamReader装饰器,可用于使ItemReader 线程安全。Spring Batch 提供了一个SynchronizedItemStreamReaderBuilder用于构造SynchronizedItemStreamReader.

SingleItemPeekableItemReader

Spring Batch 包含一个装饰器,该装饰器将 peek 方法添加到ItemReader. 这种查看方法让用户可以查看前面的一项。对 peek 的重复调用返回相同的项目,这是该read方法返回的下一个项目。Spring Batch 提供了一个 SingleItemPeekableItemReaderBuilder用于构造 SingleItemPeekableItemReader.

SingleItemPeekableItemReader 的 peek 方法不是线程安全的,因为不可能在多个线程中实现 peek。只有一个偷看的线程会在下一次读取调用中获得该项目。
SynchronizedItemStreamWriter

当使用ItemWriter非线程安全的时,Spring Batch 提供 SynchronizedItemStreamWriter装饰器,可用于使ItemWriter 线程安全。Spring Batch 提供了一个SynchronizedItemStreamWriterBuilder用于构造SynchronizedItemStreamWriter.

MultiResourceItemWriter

MultiResourceItemWriter当前ResourceAwareItemWriterItemStream资源中写入的项目数超过 itemCountLimitPerResource. Spring Batch 提供了一个MultiResourceItemWriterBuilder用于构造MultiResourceItemWriter.

ClassifierCompositeItemWriter

基于ClassifierCompositeItemWriter通过ItemWriter 提供的 Classifier. 如果所有委托都是线程安全的,则实现是线程安全的。Spring Batch 提供了一个ClassifierCompositeItemWriterBuilder用于构造 ClassifierCompositeItemWriter.

ClassifierCompositeItemProcessor

ClassifierCompositeItemProcessor是一个ItemProcessor基于ItemProcessor通过提供的Classifier. Spring Batch 提供了一个 ClassifierCompositeItemProcessorBuilder用于构造 ClassifierCompositeItemProcessor.

消息读者和作家

Spring Batch 为常用的消息传递系统提供以下阅读器和编写器:

AmqpItemReader

TheAmqpItemReader是一个ItemReader使用 anAmqpTemplate来接收或转换来自交换的消息。Spring Batch 提供了一个AmqpItemReaderBuilder用于构造AmqpItemReader.

AmqpItemWriter

AmqpItemWriter一个ItemWriter使用AmqpTemplate发送消息到 AMQP 交换。如果提供的名称未指定,则消息将发送到无名交换器AmqpTemplate。Spring Batch 提供了一个AmqpItemWriterBuilder用于构造AmqpItemWriter.

JmsItemReader

JmsItemReader是用于 JMS的ItemReader,它使用JmsTemplate. 模板应该有一个默认目的地,用于为read() 方法提供项目。Spring Batch 提供了一个JmsItemReaderBuilder用于构造 JmsItemReader.

JmsItemWriter

JmsItemWriter是用于 JMS的ItemWriter,它使用JmsTemplate. 模板应该有一个默认目的地,用于在write(List). Spring Batch 提供了一个JmsItemWriterBuilder用于构造JmsItemWriter.

KafkaItemReader

KafkaItemReaderItemReader一个 Apache Kafka 主题。它可以配置为从同一主题的多个分区读取消息。它在执行上下文中存储消息偏移量以支持重新启动功能。Spring Batch 提供了一个 KafkaItemReaderBuilder用于构造KafkaItemReader.

KafkaItemWriter

KafkaItemWriterItemWriterApache Kafka 的一个,它使用一个KafkaTemplate将事件发送到默认主题。Spring Batch 提供了一个KafkaItemWriterBuilder用于构造KafkaItemWriter.

数据库阅读器

Spring Batch 提供以下数据库阅读器:

Neo4jItemReader

Neo4jItemReader是一个ItemReader通过使用分页技术从图形数据库 Neo4j 中读取对象的方法。Spring Batch 提供了一个Neo4jItemReaderBuilder用于构造Neo4jItemReader.

MongoItemReader

MongoItemReader是一个ItemReader使用分页技术从 MongoDB 读取文档的方法。Spring Batch 提供了一个MongoItemReaderBuilder用于构造MongoItemReader.

HibernateCursorItemReader

HibernateCursorItemReader是一个ItemStreamReader用于读取建立在 Hibernate 之上的数据库记录。它执行 HQL 查询,然后在初始化时,在调用方法时迭代结果集read(),依次返回与当前行对应的对象。Spring Batch 提供了一个 HibernateCursorItemReaderBuilder用于构造 HibernateCursorItemReader.

HibernatePagingItemReader

HibernatePagingItemReader是一个ItemReader用于读取建立在 Hibernate 之上的数据库记录,并且一次只能读取固定数量的项目。Spring Batch 提供了一个HibernatePagingItemReaderBuilder用于构造 HibernatePagingItemReader.

RepositoryItemReader

RepositoryItemReader是一个ItemReader使用 a 读取记录的 a PagingAndSortingRepository。Spring Batch 提供了一个RepositoryItemReaderBuilder用于构造RepositoryItemReader.

数据库编写器

Spring Batch 提供以下数据库编写器:

Neo4jItemWriter

Neo4jItemWriter是一个ItemWriter写入 Neo4j 数据库的实现。Spring Batch 提供了一个Neo4jItemWriterBuilder用于构造 Neo4jItemWriter.

MongoItemWriter

MongoItemWriter是一个ItemWriter使用 Spring Data 的实现写入 MongoDB 存储的实现MongoOperations。Spring Batch 提供了一个 MongoItemWriterBuilder用于构造MongoItemWriter.

RepositoryItemWriter

RepositoryItemWriter是来自 Spring Data的ItemWriter包装器。CrudRepositorySpring Batch 提供了一个RepositoryItemWriterBuilder用于构造RepositoryItemWriter.

HibernateItemWriter

HibernateItemWriter使用ItemWriterHibernate 会话来保存或更新不属于当前 Hibernate 会话的实体。Spring Batch 提供了一个HibernateItemWriterBuilder用于构造HibernateItemWriter.

JdbcBatchItemWriter

JdbcBatchItemWriter是一个ItemWriter使用批处理功能 NamedParameterJdbcTemplate为所有提供的项目执行批处理语句的方法。Spring Batch 提供了一个JdbcBatchItemWriterBuilder用于构造 JdbcBatchItemWriter.

JpaItemWriter

JpaItemWriter使用ItemWriterJPAEntityManagerFactory来合并不属于持久性上下文的任何实体。Spring Batch 提供了一个 JpaItemWriterBuilder用于构造JpaItemWriter.

GemfireItemWriter

GemfireItemWriter是一个ItemWriter使用 aGemfireTemplate将 GemFire 中的项目作为键/值对存储的一个。Spring Batch 提供了一个GemfireItemWriterBuilder 用于构造GemfireItemWriter.

专业读者

Spring Batch 提供以下专业阅读器:

LdifReader

LdifReadera 读取 LDIF(LDAP 数据交换格式)记录Resource,解析它们,并LdapAttribute为每个read执行返回一个对象。Spring Batch 提供了一个LdifReaderBuilder用于构造LdifReader.

MappingLdifReader

MappingLdifReadera 读取 LDIF(LDAP 数据交换格式)记录 Resource,解析它们,然后将每个 LDIF 记录映射到 POJO(普通旧 Java 对象)。每次读取都会返回一个 POJO。Spring Batch 提供了一个MappingLdifReaderBuilder用于构造MappingLdifReader.

AvroItemReader

从资源中AvroItemReader读取序列化的 Avro 数据。每次读取都会返回一个由 Java 类或 Avro Schema 指定的类型的实例。可以选择将阅读器配置为嵌入或不嵌入 Avro 模式的输入。Spring Batch 提供了一个AvroItemReaderBuilder用于构造AvroItemReader.

专业作家

Spring Batch 提供以下专业编写器:

SimpleMailMessageItemWriter

SimpleMailMessageItemWriter是一个ItemWriter可以发送邮件的消息。它将消息的实际发送委托给MailSender. Spring Batch 提供了一个SimpleMailMessageItemWriterBuilder用于构造 SimpleMailMessageItemWriter.

AvroItemWriter

根据AvroItemWrite给定的类型或模式将 Java 对象序列化为 WriteableResource。可以选择将编写器配置为在输出中嵌入或不嵌入 Avro 模式。Spring Batch 提供了一个AvroItemWriterBuilder用于构造AvroItemWriter.

专用处理器

Spring Batch 提供以下专用处理器:

ScriptItemProcessor

ScriptItemProcessor是一个ItemProcessor将要处理的当前项目传递给提供的脚本,并且脚本的结果由处理器返回。Spring Batch 提供了一个ScriptItemProcessorBuilder用于构造 ScriptItemProcessor.


1. see XML Configuration