ItemReaders 和 ItemWriters
所有批处理都可以用最简单的形式描述为读取大量数据,执行某种类型的计算或转换,然后写出结果。Spring Batch 提供了三个关键接口来帮助执行批量读写:
ItemReader
、ItemProcessor
和ItemWriter
.
ItemReader
虽然是一个简单的概念,但它ItemReader
是从许多不同类型的输入中提供数据的方法。最普遍的例子包括:
-
平面文件:平面文件项目阅读器从平面文件中读取数据行,该文件通常描述具有由文件中的固定位置定义或由某些特殊字符(例如逗号)分隔的数据字段的记录。
-
XML:
ItemReaders
独立于用于解析、映射和验证对象的技术的 XML 处理 XML。输入数据允许针对 XSD 模式验证 XML 文件。 -
数据库:访问数据库资源以返回可以映射到对象进行处理的结果集。默认的 SQL
ItemReader
实现调用 aRowMapper
来返回对象,如果需要重新启动,则跟踪当前行,存储基本统计信息,并提供一些稍后解释的事务增强。
还有更多的可能性,但我们只关注本章的基本可能性。所有可用实现的完整列表ItemReader
可在
附录 A中找到。
ItemReader
是通用输入操作的基本接口,如下接口定义所示:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
该read
方法定义了ItemReader
. 调用它会返回一个项目,或者null
如果没有更多项目。一个项目可能代表文件中的一行、数据库中的一行或 XML 文件中的一个元素。通常期望这些映射到可用的域对象(例如Trade
、Foo
或其他),但合同中没有要求这样做。
预计ItemReader
接口的实现只是向前的。但是,如果底层资源是事务性的(例如 JMS 队列),那么调用
read
可能会在回滚场景中的后续调用中返回相同的逻辑项。还值得注意的是,缺少要由 an 处理的项目ItemReader
不会导致引发异常。例如,ItemReader
配置有返回 0 结果的查询的数据库null
在第一次调用read
.
ItemWriter
ItemWriter
在功能上与 an 相似,ItemReader
但具有逆运算。资源仍然需要定位、打开和关闭,但它们的不同之处在于
ItemWriter
写出而不是读入。在数据库或队列的情况下,这些操作可能是插入、更新或发送。输出的序列化格式特定于每个批处理作业。
与 一样ItemReader
,
ItemWriter
是一个相当通用的接口,如下面的接口定义所示:
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
与read
on一样ItemReader
,write
提供了基本的合约ItemWriter
。只要它处于打开状态,它就会尝试写出传入的项目列表。因为通常期望项目被“批处理”成一个块然后输出,所以接口接受一个项目列表,而不是一个项目本身。写出列表后,可以在从 write 方法返回之前执行任何可能需要的刷新。例如,如果写入 Hibernate DAO,可以进行多次 write 调用,每个项目调用一次。然后作者可以flush
在返回之前调用休眠会话。
ItemStream
两者都很好ItemReaders
地ItemWriters
服务于它们各自的目的,但是它们之间有一个共同的问题,即需要另一个接口。通常,作为批处理作业范围的一部分,需要打开、关闭读取器和写入器,并且需要一种持久化状态的机制。该ItemStream
接口用于此目的,如以下示例所示:
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
在描述每种方法之前,我们应该提到ExecutionContext
. ItemReader
同样实现的客户端
ItemStream
应该在对 的open
任何调用之前调用
read
,以便打开任何资源(例如文件)或获取连接。类似的限制适用于ItemWriter
实现ItemStream
. 如第 2 章所述,如果在 中找到预期数据ExecutionContext
,则它可能用于在其初始状态以外的位置启动ItemReader
或。ItemWriter
相反,
close
调用以确保在打开期间分配的任何资源都被安全释放。
update
调用主要是为了确保当前持有的任何状态都加载到提供的ExecutionContext
. 该方法在提交前调用,以确保当前状态在提交前被持久化在数据库中。
ItemStream
在 an 的客户端是 a的特殊情况下Step
(来自 Spring Batch Core),ExecutionContext
为每个 StepExecution 创建 an 以允许用户存储特定执行的状态,并期望在JobInstance
再次启动时返回它. 对于熟悉 Quartz 的人来说,语义与 Quartz 非常相似JobDataMap
。
委托模式和注册步骤
请注意,这CompositeItemWriter
是委托模式的一个示例,在 Spring Batch 中很常见。委托本身可能实现回调接口,例如StepListener
. 如果它们这样做并且如果它们与 Spring Batch Core 一起作为 aStep
的一部分使用Job
,那么它们几乎肯定需要在Step
. 直接连接到接口的读取器、写入器或处理器在Step
实现ItemStream
或
StepListener
接口时会自动注册。但是,由于代理不知道Step
,因此需要将它们作为侦听器或流(或两者都注入,如果合适的话)。
以下示例显示了如何将委托作为 XML 流注入:
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
commit-interval="2">
<streams>
<stream ref="barWriter" />
</streams>
</chunk>
</tasklet>
</step>
</job>
<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
<property name="delegate" ref="barWriter" />
</bean>
<bean id="barWriter" class="...BarWriter" />
以下示例显示了如何将委托作为 XML 流注入:
@Bean
public Job ioSampleJob() {
return this.jobBuilderFactory.get("ioSampleJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(fooReader())
.processor(fooProcessor())
.writer(compositeItemWriter())
.stream(barWriter())
.build();
}
@Bean
public CustomCompositeItemWriter compositeItemWriter() {
CustomCompositeItemWriter writer = new CustomCompositeItemWriter();
writer.setDelegate(barWriter());
return writer;
}
@Bean
public BarWriter barWriter() {
return new BarWriter();
}
平面文件
交换批量数据的最常见机制之一一直是平面文件。与 XML 不同,XML 具有定义其结构 (XSD) 的公认标准,任何阅读平面文件的人都必须提前了解文件的结构。一般来说,所有平面文件都分为两种类型:定界文件和定长文件。分隔文件是其中字段由分隔符(例如逗号)分隔的文件。固定长度文件具有设定长度的字段。
这FieldSet
在 Spring Batch 中处理平面文件时,无论是用于输入还是输出,最重要的类之一是FieldSet
. 许多体系结构和库包含帮助您从文件中读取的抽象,但它们通常返回一个String
或一组String
对象。这真的只能让你走到一半。AFieldSet
是 Spring Batch 的抽象,用于启用来自文件资源的字段绑定。它允许开发人员以与处理数据库输入相同的方式处理文件输入。AFieldSet
在概念上类似于 JDBC
ResultSet
。AFieldSet
只需要一个参数:aString
令牌数组。(可选)您还可以配置字段的名称,以便可以通过索引或名称访问字段ResultSet
,如以下示例所示:
String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);
界面上还有更多的选项FieldSet
,如Date
、long 、
BigDecimal
等。最大的优点FieldSet
是它提供了对平面文件输入的一致解析。在处理由格式异常引起的错误或进行简单的数据转换时,它可以保持一致,而不是每个批处理作业以潜在的意外方式进行不同的解析。
FlatFileItemReader
平面文件是最多包含二维(表格)数据的任何类型的文件。在 Spring Batch 框架中读取平面文件是由名为 的类促进的
FlatFileItemReader
,它提供了读取和解析平面文件的基本功能。两个最重要的必需依赖项FlatFileItemReader
是
Resource
和LineMapper
。该LineMapper
接口将在下一节中进行更多探讨。resource 属性表示一个 Spring Core Resource
。说明如何创建这种类型的 bean 的文档可以在
Spring Framework 第 5 章资源中找到。Resource
因此,除了显示以下简单示例之外,本指南不会详细介绍创建对象:
Resource resource = new FileSystemResource("resources/trades.csv");
在复杂的批处理环境中,目录结构通常由企业应用程序集成 (EAI) 基础设施管理,其中为外部接口建立放置区,用于将文件从 FTP 位置移动到批处理位置,反之亦然。文件移动实用程序超出了 Spring Batch 架构的范围,但是批处理作业流将文件移动实用程序作为作业流中的步骤包括在内并不罕见。批处理架构只需要知道如何定位要处理的文件。Spring Batch 从这个起点开始将数据输入管道的过程。但是, Spring Integration提供了许多这些类型的服务。
中的其他属性可FlatFileItemReader
让您进一步指定数据的解释方式,如下表所述:
财产 | 类型 | 描述 |
---|---|---|
注释 |
细绳[] |
指定指示注释行的行前缀。 |
编码 |
细绳 |
指定要使用的文本编码。默认值为 的值 |
线映射器 |
|
将 a 转换 |
linesToSkip |
整数 |
文件顶部要忽略的行数。 |
记录分隔符策略 |
记录分隔符策略 |
用于确定行尾在哪里,并在带引号的字符串内执行诸如继续行尾之类的操作。 |
资源 |
|
要从中读取的资源。 |
skippedLinesCallback |
LineCallbackHandler |
传递文件中要跳过的行的原始行内容的接口。如果 |
严格的 |
布尔值 |
|
LineMapper
与 一样RowMapper
,它采用低级构造,例如ResultSet
并返回 an Object
,平面文件处理需要相同的构造来将String
line 转换为 an Object
,如以下接口定义所示:
public interface LineMapper<T> {
T mapLine(String line, int lineNumber) throws Exception;
}
基本约定是,给定当前行和与其关联的行号,映射器应返回结果域对象。这类似于
RowMapper
,因为每一行都与其行号相关联,就像 a 中的每一行都
ResultSet
与其行号相关联。这允许将行号绑定到生成的域对象,以进行身份比较或获取更多信息的日志记录。但是,与 不同RowMapper
的是,LineMapper
它给出了一条原始线,如上所述,它只会让你走到一半。该行必须被标记为 a FieldSet
,然后可以映射到一个对象,如本文档后面所述。
LineTokenizer
将输入行转换为 a 的抽象FieldSet
是必要的,因为可能有许多格式的平面文件数据需要转换为FieldSet
. 在 Spring Batch 中,这个接口是LineTokenizer
:
public interface LineTokenizer {
FieldSet tokenize(String line);
}
a 的契约LineTokenizer
是这样的,给定一行输入(理论上
String
可以包含多行),FieldSet
返回代表该行的 a。然后FieldSet
可以将其传递给FieldSetMapper
. Spring Batch 包含以下LineTokenizer
实现:
-
DelimitedLineTokenizer
:用于记录中的字段由分隔符分隔的文件。最常见的分隔符是逗号,但也经常使用管道或分号。 -
FixedLengthTokenizer
:用于记录中的每个字段都是“固定宽度”的文件。必须为每种记录类型定义每个字段的宽度。 -
PatternMatchingCompositeLineTokenizer
:LineTokenizer
通过检查模式来确定应在特定行上使用标记器列表中的哪个。
FieldSetMapper
该FieldSetMapper
接口定义了一个方法,mapFieldSet
该方法接受一个
FieldSet
对象并将其内容映射到一个对象。此对象可能是自定义 DTO、域对象或数组,具体取决于作业的需要。与FieldSetMapper
结合使用,LineTokenizer
将资源中的一行数据转换为所需类型的对象,如下面的接口定义所示:
public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet) throws BindException;
}
RowMapper
使用的模式与使用的相同JdbcTemplate
。
DefaultLineMapper
现在已经定义了读取平面文件的基本接口,很明显需要三个基本步骤:
-
从文件中读取一行。
-
String
将该行传递给LineTokenizer#tokenize()
方法以检索FieldSet
. -
将标记化返回的值传递
FieldSet
给 aFieldSetMapper
,从方法返回结果ItemReader#read()
。
上面描述的两个接口代表两个独立的任务:将行转换为 a
FieldSet
并将 a 映射FieldSet
到域对象。因为 a
LineTokenizer
的输入与LineMapper
(a 行)的输入相匹配,并且 a 的输出
FieldSetMapper
与 的输出相匹配,所以提供了同时使用 a和 aLineMapper
的默认实现。下面的类定义中显示的 代表大多数用户需要的行为:LineTokenizer
FieldSetMapper
DefaultLineMapper
public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {
private LineTokenizer tokenizer;
private FieldSetMapper<T> fieldSetMapper;
public T mapLine(String line, int lineNumber) throws Exception {
return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
}
public void setLineTokenizer(LineTokenizer tokenizer) {
this.tokenizer = tokenizer;
}
public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
this.fieldSetMapper = fieldSetMapper;
}
}
上述功能是在默认实现中提供的,而不是内置在阅读器本身中(就像在以前版本的框架中所做的那样),以允许用户在控制解析过程方面具有更大的灵活性,尤其是在需要访问原始行的情况下。
简单的分隔文件读取示例
以下示例说明了如何读取具有实际域场景的平面文件。这个特定的批处理作业从以下文件中读取足球运动员:
ID、姓氏、名字、职位、出生年份、出道年份 "AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996", "AbduRa00,Abdullah,Rabih,rb,1975,1999", "AberWa00,Abercrombie,Walter,rb,1959,1982", "AbraDa00,Abramowicz,Danny,wr,1945,1967", "AdamBo00,Adams,Bob,te,1946,1969", “AdamCh00,亚当斯,查理,wr,1979,2003”
此文件的内容映射到以下
Player
域对象:
public class Player implements Serializable {
private String ID;
private String lastName;
private String firstName;
private String position;
private int birthYear;
private int debutYear;
public String toString() {
return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
",First Name=" + firstName + ",Position=" + position +
",Birth Year=" + birthYear + ",DebutYear=" +
debutYear;
}
// setters and getters...
}
要将 a 映射FieldSet
到Player
对象中,FieldSetMapper
需要定义返回玩家的 a,如下例所示:
protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fieldSet) {
Player player = new Player();
player.setID(fieldSet.readString(0));
player.setLastName(fieldSet.readString(1));
player.setFirstName(fieldSet.readString(2));
player.setPosition(fieldSet.readString(3));
player.setBirthYear(fieldSet.readInt(4));
player.setDebutYear(fieldSet.readInt(5));
return player;
}
}
然后可以通过正确构造 aFlatFileItemReader
和调用
来读取该文件read
,如下例所示:
FlatFileItemReader<Player> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<>();
//DelimitedLineTokenizer defaults to comma as its delimiter
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();
每次调用都会从文件中的每一行read
返回一个新
Player
对象。当到达文件末尾时,null
返回。
按名称映射字段
两者都允许另外一项功能,
DelimitedLineTokenizer
并且FixedLengthTokenizer
在功能上类似于 JDBC ResultSet
。字段的名称可以注入到这些
LineTokenizer
实现中的任何一个中,以增加映射函数的可读性。首先,将平面文件中所有字段的列名注入到分词器中,如下例所示:
tokenizer.setNames(new String[] {"ID", "lastName", "firstName", "position", "birthYear", "debutYear"});
AFieldSetMapper
可以按如下方式使用此信息:
public class PlayerMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fs) {
if (fs == null) {
return null;
}
Player player = new Player();
player.setID(fs.readString("ID"));
player.setLastName(fs.readString("lastName"));
player.setFirstName(fs.readString("firstName"));
player.setPosition(fs.readString("position"));
player.setDebutYear(fs.readInt("debutYear"));
player.setBirthYear(fs.readInt("birthYear"));
return player;
}
}
将 FieldSet 自动映射到域对象
对于许多人来说,必须编写一个具体FieldSetMapper
的内容与RowMapper
为一个JdbcTemplate
. FieldSetMapper
Spring Batch 通过使用 JavaBean 规范将字段名称与对象上的 setter 匹配来自动映射字段,从而使这变得更容易。
再次使用足球示例,BeanWrapperFieldSetMapper
配置类似于以下 XML 片段:
<bean id="fieldSetMapper"
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="player" />
</bean>
<bean id="player"
class="org.springframework.batch.sample.domain.Player"
scope="prototype" />
再次使用足球示例,BeanWrapperFieldSetMapper
配置类似于 Java 中的以下代码段:
@Bean
public FieldSetMapper fieldSetMapper() {
BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();
fieldSetMapper.setPrototypeBeanName("player");
return fieldSetMapper;
}
@Bean
@Scope("prototype")
public Player player() {
return new Player();
}
对于 中的每个条目FieldSet
,映射器在对象的新实例上查找相应的设置器Player
(因此,需要原型范围),就像 Spring 容器查找与属性名称匹配的设置器一样。FieldSet
映射中的每个可用字段,并Player
返回结果对象,无需代码。
固定长度文件格式
到目前为止,只详细讨论了分隔文件。但是,它们仅代表文件读取图片的一半。许多使用平面文件的组织使用固定长度格式。示例固定长度文件如下:
UK21341EAH4121131.11customer1 UK21341EAH4221232.11customer2 UK21341EAH4321333.11customer3 UK21341EAH4421434.11customer4 UK21341EAH4521535.11customer5
虽然这看起来像一个大字段,但它实际上代表了 4 个不同的字段:
-
ISIN:所订购商品的唯一标识符 - 12 个字符长。
-
数量:订购商品的数量 - 3 个字符长。
-
价格:商品的价格 - 5 个字符长。
-
客户:订购商品的客户 ID - 9 个字符长。
配置 时FixedLengthLineTokenizer
,必须以范围的形式提供这些长度中的每一个。
以下示例显示如何为FixedLengthLineTokenizer
in XML 定义范围:
<bean id="fixedLengthLineTokenizer"
class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
<property name="names" value="ISIN,Quantity,Price,Customer" />
<property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>
因为FixedLengthLineTokenizer
使用与LineTokenizer
前面讨论的相同的接口,所以它返回的结果与FieldSet
使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用
BeanWrapperFieldSetMapper
.
支持上述范围语法需要
|
以下示例显示了如何FixedLengthLineTokenizer
在 Java 中定义范围:
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
tokenizer.setColumns(new Range(1, 12),
new Range(13, 15),
new Range(16, 20),
new Range(21, 29));
return tokenizer;
}
因为FixedLengthLineTokenizer
使用与LineTokenizer
上面讨论的相同的接口,所以它返回的结果与FieldSet
使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用
BeanWrapperFieldSetMapper
.
单个文件中的多种记录类型
为简单起见,到目前为止所有的文件读取示例都做了一个关键假设:文件中的所有记录都具有相同的格式。然而,情况可能并非总是如此。一个文件可能包含不同格式的记录,这些记录需要进行不同的标记并映射到不同的对象,这是很常见的。以下文件摘录说明了这一点:
用户;史密斯;彼得;;T;20014539;F LINEA;1044391041ABC037.49G201XX1383.12H LINEB;2134776319DEF422.99M005LI
在这个文件中,我们有三种类型的记录,“USER”、“LINEA”和“LINEB”。“USER”行对应一个User
对象。“LINEA”和“LINEB”都对应于Line
对象,尽管“LINEA”比“LINEB”具有更多信息。
ItemReader
单独读取每一行,但我们必须指定不同
的对象LineTokenizer
,FieldSetMapper
以便ItemWriter
接收正确的项目。通过PatternMatchingCompositeLineMapper
允许配置模式映射LineTokenizers
和模式来使这变得容易FieldSetMappers
。
以下示例显示如何为FixedLengthLineTokenizer
in XML 定义范围:
<bean id="orderFileLineMapper"
class="org.spr...PatternMatchingCompositeLineMapper">
<property name="tokenizers">
<map>
<entry key="USER*" value-ref="userTokenizer" />
<entry key="LINEA*" value-ref="lineATokenizer" />
<entry key="LINEB*" value-ref="lineBTokenizer" />
</map>
</property>
<property name="fieldSetMappers">
<map>
<entry key="USER*" value-ref="userFieldSetMapper" />
<entry key="LINE*" value-ref="lineFieldSetMapper" />
</map>
</property>
</bean>
@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
PatternMatchingCompositeLineMapper lineMapper =
new PatternMatchingCompositeLineMapper();
Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
tokenizers.put("USER*", userTokenizer());
tokenizers.put("LINEA*", lineATokenizer());
tokenizers.put("LINEB*", lineBTokenizer());
lineMapper.setTokenizers(tokenizers);
Map<String, FieldSetMapper> mappers = new HashMap<>(2);
mappers.put("USER*", userFieldSetMapper());
mappers.put("LINE*", lineFieldSetMapper());
lineMapper.setFieldSetMappers(mappers);
return lineMapper;
}
在此示例中,“LINEA”和“LINEB”具有不同LineTokenizer
的实例,但它们都使用相同的FieldSetMapper
.
PatternMatchingCompositeLineMapper
使用该方法PatternMatcher#match
为每一行选择正确的委托。允许两个具有特殊含义的PatternMatcher
通配符:问号 ("?") 匹配一个字符,而星号 ("*") 匹配零个或多个字符。请注意,在前面的配置中,所有模式都以星号结尾,使它们有效地作为行的前缀。无论配置中的PatternMatcher
顺序如何,始终匹配最具体的模式。因此,如果“LINE*”和“LINEA*”都被列为模式,“LINEA”将匹配模式“LINEA*”,而“LINEB”将匹配模式“LINE*”。此外,单个星号(“*”
以下示例显示如何匹配 XML 中任何其他模式都不匹配的行:
<entry key="*" value-ref="defaultLineTokenizer" />
以下示例显示了如何匹配 Java 中任何其他模式都不匹配的行:
...
tokenizers.put("*", defaultLineTokenizer());
...
还有一个PatternMatchingCompositeLineTokenizer
可以单独用于标记化。
平面文件通常包含跨越多行的记录。为了处理这种情况,需要更复杂的策略。在示例中可以找到这种常见模式的演示multiLineRecords
。
平面文件中的异常处理
标记一行可能会导致抛出异常的情况有很多。许多平面文件并不完美,并且包含格式不正确的记录。许多用户在记录问题、原始行和行号时选择跳过这些错误行。以后可以手动或通过另一个批处理作业检查这些日志。出于这个原因,Spring Batch 提供了一个层次结构的异常来处理解析异常:
FlatFileParseException
和FlatFileFormatException
. 在尝试读取文件时遇到任何错误时FlatFileParseException
抛出。由
接口的实现抛出并指示在标记化时遇到的更具体的错误。FlatFileItemReader
FlatFileFormatException
LineTokenizer
IncorrectTokenCountException
两者DelimitedLineTokenizer
都FixedLengthLineTokenizer
可以指定可用于创建FieldSet
. 但是,如果列名的数量与标记行时找到的列数不匹配,FieldSet
则无法创建并IncorrectTokenCountException
抛出 an,其中包含遇到的标记数和预期的数量,如下例所示:
tokenizer.setNames(new String[] {"A", "B", "C", "D"});
try {
tokenizer.tokenize("a,b,c");
}
catch (IncorrectTokenCountException e) {
assertEquals(4, e.getExpectedCount());
assertEquals(3, e.getActualCount());
}
因为标记器配置了 4 个列名,但在文件中只找到了 3 个标记,所以IncorrectTokenCountException
抛出了一个。
IncorrectLineLengthException
以固定长度格式格式化的文件在解析时有额外的要求,因为与分隔格式不同,每列必须严格遵守其预定义的宽度。如果总行长不等于该列的最宽值,则抛出异常,如下例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5),
new Range(6, 10),
new Range(11, 15) });
try {
tokenizer.tokenize("12345");
fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
assertEquals(15, ex.getExpectedLength());
assertEquals(5, ex.getActualLength());
}
上面标记器的配置范围是:1-5、6-10 和 11-15。因此,该行的总长度为 15。但是,在前面的示例中,传入了长度为 5 的行,导致IncorrectLineLengthException
抛出 an。在此处抛出异常而不是仅映射第一列允许该行的处理更早地失败,并且如果在尝试读取 a 中的第 2 列时失败,它将包含更多的信息FieldSetMapper
。但是,在某些情况下,线的长度并不总是恒定的。因此,可以通过 'strict' 属性关闭行长验证,如下例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));
前面的例子几乎和前面的例子一样,只是它
tokenizer.setStrict(false)
被调用了。此设置告诉标记器在标记行时不强制行长度。AFieldSet
现在已正确创建并返回。但是,它只包含剩余值的空标记。
FlatFileItemWriter
写入平面文件具有相同的问题和从文件读取必须克服的问题。步骤必须能够以事务方式编写定界或固定长度格式。
LineAggregator
正如LineTokenizer
接口对于获取项目并将其转换为 是必需的
一样String
,文件写入必须有一种方法将多个字段聚合为单个字符串以写入文件。在 Spring Batch 中,这是LineAggregator
,如下面的接口定义所示:
public interface LineAggregator<T> {
public String aggregate(T item);
}
是的LineAggregator
逻辑反义词LineTokenizer
。LineTokenizer
接受 a
String
并返回 a FieldSet
,而LineAggregator
接受 aitem
并返回 a
String
。
PassThroughLineAggregator
LineAggregator
接口
最基本的实现是PassThroughLineAggregator
,它假设对象已经是一个字符串或者它的字符串表示是可以写的,如下代码所示:
public class PassThroughLineAggregator<T> implements LineAggregator<T> {
public String aggregate(T item) {
return item.toString();
}
}
FlatFileItemWriter
如果需要直接控制创建字符串但需要 a 的优点(例如事务和重新启动支持),则上述实现很有用。
简化文件写入示例
既然已经定义了LineAggregator
接口及其最基本的实现,
PassThroughLineAggregator
那么就可以解释基本的编写流程了:
-
将要写入的对象传递给
LineAggregator
以获取String
. -
返回
String
的内容写入配置文件。
以下摘录FlatFileItemWriter
在代码中表达了这一点:
public void write(T item) throws Exception {
write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}
在 XML 中,一个简单的配置示例可能如下所示:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" value="file:target/test-outputs/output.txt" />
<property name="lineAggregator">
<bean class="org.spr...PassThroughLineAggregator"/>
</property>
</bean>
在 Java 中,一个简单的配置示例可能如下所示:
@Bean
public FlatFileItemWriter itemWriter() {
return new FlatFileItemWriterBuilder<Foo>()
.name("itemWriter")
.resource(new FileSystemResource("target/test-outputs/output.txt"))
.lineAggregator(new PassThroughLineAggregator<>())
.build();
}
FieldExtractor
前面的示例对于写入文件的最基本用途可能很有用。但是,大多数用户FlatFileItemWriter
都有一个需要写出的域对象,因此必须将其转换为一行。在文件读取中,需要以下内容:
-
从文件中读取一行。
-
将该行传递给
LineTokenizer#tokenize()
方法,以便检索FieldSet
. -
将标记化返回的值传递
FieldSet
给 aFieldSetMapper
,从方法返回结果ItemReader#read()
。
文件写入有类似但相反的步骤:
-
将要写入的项目传递给编写器。
-
将项目上的字段转换为数组。
-
将结果数组聚合成一行。
因为框架无法知道需要写出对象中的哪些字段,所以FieldExtractor
必须写a来完成将项变成数组的任务,如下面的接口定义所示:
public interface FieldExtractor<T> {
Object[] extract(T item);
}
接口的实现FieldExtractor
应该从提供的对象的字段创建一个数组,然后可以用元素之间的分隔符或作为固定宽度行的一部分写出。
PassThroughFieldExtractor
在许多情况下,需要写出一个集合,例如一个数组、Collection
或。FieldSet
从其中一种集合类型中“提取”一个数组非常简单。为此,请将集合转换为数组。因此,
PassThroughFieldExtractor
应该在这种情况下使用。需要注意的是,如果传入的对象不是集合类型,则PassThroughFieldExtractor
返回一个仅包含要提取的项的数组。
BeanWrapperFieldExtractor
与BeanWrapperFieldSetMapper
文件读取部分中的描述一样,通常最好配置如何将域对象转换为对象数组,而不是自己编写转换。提供BeanWrapperFieldExtractor
此功能,如下例所示:
BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] { "first", "last", "born" });
String first = "Alan";
String last = "Turing";
int born = 1912;
Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);
assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);
这个提取器实现只有一个必需的属性:要映射的字段的名称。正如BeanWrapperFieldSetMapper
需要将字段名称映射
FieldSet
到所提供对象的 setter 上的字段一样,BeanWrapperFieldExtractor
需要将名称映射到 getter 以创建对象数组。值得注意的是,名称的顺序决定了数组中字段的顺序。
分隔文件写入示例
最基本的平面文件格式是所有字段都由分隔符分隔的格式。这可以使用DelimitedLineAggregator
. 以下示例写出一个简单的域对象,该对象表示对客户帐户的信用:
public class CustomerCredit {
private int id;
private String name;
private BigDecimal credit;
//getters and setters removed for clarity
}
因为正在使用域对象,所以FieldExtractor
必须提供接口的实现以及要使用的分隔符。
以下示例显示了如何FieldExtractor
在 XML 中使用带分隔符的 :
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit"/>
</bean>
</property>
</bean>
</property>
</bean>
以下示例显示了如何FieldExtractor
在 Java 中使用带分隔符:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
在前面的示例BeanWrapperFieldExtractor
中,本章前面描述的用于将其中的名称和信用字段CustomerCredit
转换为对象数组,然后在每个字段之间用逗号写出。
也可以使用FlatFileItemWriterBuilder.DelimitedBuilder
自动创建BeanWrapperFieldExtractor
and DelimitedLineAggregator
,如下例所示:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.delimited()
.delimiter("|")
.names(new String[] {"name", "credit"})
.build();
}
固定宽度文件写入示例
分隔不是唯一的平面文件格式类型。许多人更喜欢为每列使用设置宽度来划分字段之间的界限,这通常称为“固定宽度”。Spring Batch 在使用FormatterLineAggregator
.
使用与上述相同的CustomerCredit
域对象,可以在 XML 中进行如下配置:
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...FormatterLineAggregator">
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit" />
</bean>
</property>
<property name="format" value="%-9s%-2.0f" />
</bean>
</property>
</bean>
使用与上述相同的CustomerCredit
域对象,可以在 Java 中进行如下配置:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
前面的大多数示例应该看起来很熟悉。但是,格式属性的值是新的。
以下示例显示了 XML 中的格式属性:
<property name="format" value="%-9s%-2.0f" />
以下示例显示了 Java 中的格式属性:
...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...
底层实现是使用
Formatter
作为 Java 5 的一部分添加的相同内容构建的。Java
Formatter
基于
printf
C 编程语言的功能。有关如何配置格式化程序的大多数详细信息可以在Formatter的 Javadoc 中找到。
也可以使用FlatFileItemWriterBuilder.FormattedBuilder
自动创建BeanWrapperFieldExtractor
and FormatterLineAggregator
,如下例所示:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.formatted()
.format("%-9s%-2.0f")
.names(new String[] {"name", "credit"})
.build();
}
处理文件创建
FlatFileItemReader
与文件资源有非常简单的关系。初始化阅读器时,它会打开文件(如果存在),如果不存在则抛出异常。文件写入并不是那么简单。乍一看,似乎应该存在类似的简单契约FlatFileItemWriter
:如果文件已经存在,则抛出异常,如果不存在,则创建它并开始编写。但是,可能重新启动 aJob
可能会导致问题。在正常的重启场景中,合约是相反的:如果文件存在,则从最后一个已知的正确位置开始写入,如果不存在,则抛出异常。但是,如果此作业的文件名始终相同,会发生什么情况?在这种情况下,如果文件存在,您可能希望删除该文件,除非它是重新启动。由于这种可能性,FlatFileItemWriter
包含属性,shouldDeleteIfExists
。将此属性设置为 true 会导致在打开编写器时删除同名的现有文件。
XML 项读取器和写入器
Spring Batch 提供了用于读取 XML 记录并将它们映射到 Java 对象以及将 Java 对象写入为 XML 记录的事务基础结构。
对流式 XML 的限制
StAX API 用于 I/O,因为其他标准 XML 解析 API 不适合批处理要求(DOM 一次将整个输入加载到内存中,而 SAX 通过允许用户仅提供回调来控制解析过程)。 |
我们需要考虑 XML 输入和输出在 Spring Batch 中是如何工作的。首先,有一些概念与文件读取和写入不同,但在 Spring Batch XML 处理中很常见。使用 XML 处理,而不是需要标记的记录行(FieldSet
实例),假定 XML 资源是对应于各个记录的“片段”的集合,如下图所示:
“交易”标签在上述场景中被定义为“根元素”。'<trade>' 和 '</trade>' 之间的所有内容都被视为一个“片段”。Spring Batch 使用 Object/XML Mapping (OXM) 将片段绑定到对象。但是,Spring Batch 不依赖于任何特定的 XML 绑定技术。典型的用途是委托给 Spring OXM,它为最流行的 OXM 技术提供统一的抽象。对 Spring OXM 的依赖是可选的,如果需要,您可以选择实现 Spring Batch 特定的接口。与 OXM 支持的技术的关系如下图所示:
通过对 OXM 的介绍以及如何使用 XML 片段来表示记录,我们现在可以更仔细地检查读取器和写入器。
StaxEventItemReader
该StaxEventItemReader
配置提供了用于处理来自 XML 输入流的记录的典型设置。首先,考虑StaxEventItemReader
can 处理的以下 XML 记录集:
<?xml version="1.0" encoding="UTF-8"?>
<records>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0001</isin>
<quantity>5</quantity>
<price>11.39</price>
<customer>Customer1</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0002</isin>
<quantity>2</quantity>
<price>72.99</price>
<customer>Customer2c</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0003</isin>
<quantity>9</quantity>
<price>99.99</price>
<customer>Customer3</customer>
</trade>
</records>
为了能够处理 XML 记录,需要以下内容:
-
根元素名称:构成要映射的对象的片段的根元素的名称。示例配置通过贸易价值证明了这一点。
-
Resource:表示要读取的文件的 Spring Resource。
-
Unmarshaller
:Spring OXM 提供的一种解组工具,用于将 XML 片段映射到对象。
以下示例显示如何定义StaxEventItemReader
与名为 的根元素trade
、 的资源和在 XML 中data/iosample/input/input.xml
调用的解组器一起使用的 a:tradeMarshaller
<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
<property name="fragmentRootElementName" value="trade" />
<property name="resource" value="org/springframework/batch/item/xml/domain/trades.xml" />
<property name="unmarshaller" ref="tradeMarshaller" />
</bean>
以下示例显示如何定义StaxEventItemReader
与名为 的根元素trade
、 的资源和在 Java 中data/iosample/input/input.xml
调用的解组器一起使用的 a:tradeMarshaller
@Bean
public StaxEventItemReader itemReader() {
return new StaxEventItemReaderBuilder<Trade>()
.name("itemReader")
.resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
.addFragmentRootElements("trade")
.unmarshaller(tradeMarshaller())
.build();
}
请注意,在此示例中,我们选择使用 an XStreamMarshaller
,它接受作为映射传入的别名,其中第一个键和值是片段的名称(即根元素)和要绑定的对象类型。然后,与 a 类似FieldSet
,映射到对象类型中的字段的其他元素的名称在映射中被描述为键/值对。在配置文件中,我们可以使用 Spring 配置实用程序来描述所需的别名。
以下示例显示如何在 XML 中描述别名:
<bean id="tradeMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="trade"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
以下示例显示了如何在 Java 中描述别名:
@Bean
public XStreamMarshaller tradeMarshaller() {
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
return marshaller;
}
在输入时,阅读器读取 XML 资源,直到它识别出一个新的片段即将开始。默认情况下,阅读器匹配元素名称以识别新片段即将开始。阅读器从片段创建一个独立的 XML 文档,并将文档传递给反序列化器(通常是 Spring OXM 的包装器Unmarshaller
)以将 XML 映射到 Java 对象。
总之,这个过程类似于下面的 Java 代码,它使用了 Spring 配置提供的注入:
StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());
boolean hasNext = true;
Trade trade = null;
while (hasNext) {
trade = xmlStaxEventItemReader.read();
if (trade == null) {
hasNext = false;
}
else {
System.out.println(trade);
}
}
StaxEventItemWriter
输出与输入对称。StaxEventItemWriter
需要一个Resource
、一个编组器和一个rootTagName
. Resource
一个 Java 对象被传递给一个编组器(通常是一个标准的 Spring OXM 编组器),该编组器使用一个自定义事件编写器来写入 a ,该编写器过滤OXM 工具为每个片段生成的事件StartDocument
。EndDocument
以下 XML 示例使用MarshallingEventWriterSerializer
:
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" ref="outputResource" />
<property name="marshaller" ref="tradeMarshaller" />
<property name="rootTagName" value="trade" />
<property name="overwriteOutput" value="true" />
</bean>
以下 Java 示例使用MarshallingEventWriterSerializer
:
@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
return new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(tradeMarshaller())
.resource(outputResource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
}
前面的配置设置了三个必需的属性并设置了
overwriteOutput=true
本章前面提到的可选属性,用于指定是否可以覆盖现有文件。
以下 XML 示例使用与本章前面的阅读示例中使用的相同的编组器:
<bean id="customerCreditMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="customer"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
以下 Java 示例使用与本章前面的阅读示例中使用的相同的编组器:
@Bean
public XStreamMarshaller customerCreditMarshaller() {
XStreamMarshaller marshaller = new XStreamMarshaller();
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
marshaller.setAliases(aliases);
return marshaller;
}
总结一个 Java 示例,以下代码说明了讨论的所有要点,演示了所需属性的编程设置:
FileSystemResource resource = new FileSystemResource("data/outputFile.xml")
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
StaxEventItemWriter staxItemWriter =
new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(marshaller)
.resource(resource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
staxItemWriter.afterPropertiesSet();
ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);
JSON 项目读取器和写入器
Spring Batch 提供对以下格式的 JSON 资源的读写支持:
[
{
"isin": "123",
"quantity": 1,
"price": 1.2,
"customer": "foo"
},
{
"isin": "456",
"quantity": 2,
"price": 1.4,
"customer": "bar"
}
]
假设 JSON 资源是对应于单个项目的 JSON 对象数组。Spring Batch 不绑定到任何特定的 JSON 库。
JsonItemReader
将JsonItemReader
JSON 解析和绑定委托给
org.springframework.batch.item.json.JsonObjectReader
接口的实现。此接口旨在通过使用流式 API 以块读取 JSON 对象来实现。目前提供了两种实现:
为了能够处理 JSON 记录,需要以下内容:
-
Resource
:代表要读取的 JSON 文件的 Spring 资源。 -
JsonObjectReader
:一个 JSON 对象阅读器,用于解析 JSON 对象并将其绑定到项目
下面的例子展示了如何定义一个JsonItemReader
与之前的 JSON 资源一起工作的org/springframework/batch/item/json/trades.json
a 和一个
JsonObjectReader
基于 Jackson 的 a:
@Bean
public JsonItemReader<Trade> jsonItemReader() {
return new JsonItemReaderBuilder<Trade>()
.jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonItemReader")
.build();
}
JsonFileItemWriter
将JsonFileItemWriter
项目编组委托给
org.springframework.batch.item.json.JsonObjectMarshaller
接口。该接口的约定是获取一个对象并将其编组为 JSON String
。目前提供了两种实现:
为了能够写入 JSON 记录,需要以下内容:
-
Resource
Resource
:代表要写入的 JSON 文件的 Spring -
JsonObjectMarshaller
:将对象编组为 JSON 格式的 JSON 对象编组器
以下示例显示了如何定义 a JsonFileItemWriter
:
@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
return new JsonFileItemWriterBuilder<Trade>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonFileItemWriter")
.build();
}
多文件输入
在单个Step
. 假设文件都具有相同的格式,则MultiResourceItemReader
支持 XML 和平面文件处理的这种类型的输入。考虑目录中的以下文件:
文件-1.txt 文件-2.txt 被忽略的.txt
file-1.txt 和 file-2.txt 的格式相同,出于业务原因,应一起处理。可MultiResourceItemReader
用于通过使用通配符读取两个文件。
以下示例显示了如何在 XML 中读取带有通配符的文件:
<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
<property name="resources" value="classpath:data/input/file-*.txt" />
<property name="delegate" ref="flatFileItemReader" />
</bean>
以下示例显示了如何在 Java 中读取带有通配符的文件:
@Bean
public MultiResourceItemReader multiResourceReader() {
return new MultiResourceItemReaderBuilder<Foo>()
.delegate(flatFileItemReader())
.resources(resources())
.build();
}
引用的委托是一个简单的FlatFileItemReader
. 上面的配置从两个文件中读取输入,处理回滚和重启场景。应该注意的是,与 any 一样ItemReader
,添加额外的输入(在本例中为文件)可能会在重新启动时导致潜在的问题。建议批处理作业使用它们自己的单独目录,直到成功完成。
输入资源通过 using 进行排序,MultiResourceItemReader#setComparator(Comparator)
以确保在重新启动场景中的作业运行之间保留资源排序。
|
数据库
像大多数企业应用程序样式一样,数据库是批处理的中央存储机制。但是,由于系统必须使用的数据集的庞大规模,批处理与其他应用程序样式不同。如果一条 SQL 语句返回 100 万行,则结果集可能会将所有返回的结果保存在内存中,直到读取所有行。Spring Batch 为这个问题提供了两种类型的解决方案:
基于光标的ItemReader
实现
使用数据库游标通常是大多数批处理开发人员的默认方法,因为它是数据库对“流式传输”关系数据问题的解决方案。JavaResultSet
类本质上是一种用于操作游标的面向对象的机制。AResultSet
维护一个指向当前数据行的游标。调用next
a
ResultSet
会将此光标移动到下一行。Spring Batch 基于游标的ItemReader
实现在初始化时打开一个游标,并在每次调用时将游标向前移动一行read
,返回一个可用于处理的映射对象。然后调用该
close
方法以确保释放所有资源。Spring 核心
JdbcTemplate
通过使用回调模式将所有行完全映射到一个ResultSet
并在将控制权返回给方法调用者之前关闭。但是,在批处理中,这必须等到步骤完成。下图显示了基于光标的ItemReader
工作原理的通用图。请注意,虽然该示例使用 SQL(因为 SQL 广为人知),但任何技术都可以实现基本方法。
这个例子说明了基本模式。给定一个 'FOO' 表,它具有三列:
ID
、NAME
和BAR
,选择 ID 大于 1 但小于 7 的所有行。这会将光标(第 1 行)的开头放在 ID 2 上。这一行的结果应该是一个完全映射的Foo
对象。再次调用read()
会将光标移动到下一行,即Foo
ID 为 3 的 。这些读取的结果在每个 之后写出
read
,从而允许对对象进行垃圾收集(假设没有实例变量维护对它们的引用)。
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标技术的 JDBC 实现。它直接与 a 一起工作,ResultSet
并且需要一条 SQL 语句来针对从 a 获得的连接运行DataSource
。以下数据库模式用作示例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人喜欢为每一行使用一个域对象,所以下面的例子使用RowMapper
接口的一个实现来映射一个CustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
由于JdbcCursorItemReader
与 共享关键接口JdbcTemplate
,因此查看如何使用 读取此数据的示例JdbcTemplate
以将其与 进行对比很有用ItemReader
。出于本示例的目的,假设CUSTOMER
数据库中有 1,000 行。第一个示例使用JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行上述代码片段后,customerCredits
列表包含 1,000 个
CustomerCredit
对象。在查询方法中,从 中获取连接
DataSource
,针对它运行提供的 SQL,并mapRow
为 中的每一行调用该方法ResultSet
。将此与 的方法进行对比
JdbcCursorItemReader
,如下例所示:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行上述代码片段后,计数器等于 1,000。如果上面的代码将返回的值放入一个列表中,结果将与示例customerCredit
完全相同。JdbcTemplate
但是,它的最大优点ItemReader
是它允许“流式传输”项目。该read
方法可以调用一次,可以用 写出项目ItemWriter
,然后可以用 获取下一个项目
read
。这允许项目的读取和写入在“块”中完成并定期提交,这是高性能批处理的本质。此外,它很容易配置为注入 Spring Batch Step
。
以下示例显示了如何将一个注入ItemReader
到Step
XML 中:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下示例展示了如何在 Java 中将an 注入ItemReader
到 a中:Step
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
附加属性
因为在 Java 中打开游标有很多不同的选项,所以JdbcCursorItemReader
可以设置许多属性,如下表所述:
忽略警告 |
确定是否记录 SQLWarnings 或导致异常。默认值为 |
取大小 |
当. |
最大行数 |
设置底层证券 |
查询超时 |
|
验证光标位置 |
因为由 |
保存状态 |
指示是否应将阅读器的状态保存在
|
驱动程序支持绝对 |
指示 JDBC 驱动程序是否支持在 |
setUseSharedExtendedConnection |
指示用于游标的连接是否应由所有其他处理使用,从而共享同一事务。如果将其设置为 |
HibernateCursorItemReader
就像普通的 Spring 用户对是否使用 ORM 解决方案做出重要决定一样,这会影响他们是否使用 aJdbcTemplate
或 a
HibernateTemplate
,Spring Batch 用户也有相同的选择。
HibernateCursorItemReader
是游标技术的 Hibernate 实现。Hibernate 在批处理中的使用颇具争议。这主要是因为 Hibernate 最初是为了支持在线应用程序样式而开发的。但是,这并不意味着它不能用于批处理。解决此问题的最简单方法是使用StatelessSession
而不是标准会话。这消除了 Hibernate 使用的所有缓存和脏检查,这可能会导致批处理场景中的问题。有关无状态和正常休眠会话之间差异的更多信息,请参阅特定休眠版本的文档。HibernateCursorItemReader
允许您声明 HQL 语句并传入 a ,这将在
每次
SessionFactory
调用时传回一个项目,以与JdbcCursorItemReader
. 以下示例配置使用与 JDBC 读取器相同的“客户信用”示例:
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
假设已为表正确创建了休眠映射文件,此配置以与 描述的完全相同的方式ItemReader
返回对象。'useStatelessSession' 属性默认为 true,但已在此处添加以提醒人们注意打开或关闭它的能力。还值得注意的是,可以使用该
属性设置底层游标的获取大小。与 一样,配置很简单。CustomerCredit
JdbcCursorItemReader
Customer
setFetchSize
JdbcCursorItemReader
以下示例显示了如何ItemReader
在 XML 中注入 Hibernate:
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
以下示例显示了如何ItemReader
在 Java 中注入 Hibernate:
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
StoredProcedureItemReader
有时需要使用存储过程来获取游标数据。其
StoredProcedureItemReader
工作方式与 类似JdbcCursorItemReader
,不同之处在于,它不是运行查询来获取游标,而是运行返回游标的存储过程。存储过程可以通过三种不同的方式返回游标:
-
作为返回值
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作为作为 out 参数返回的引用游标(由 Oracle 和 PostgreSQL 使用)。
-
作为存储函数调用的返回值。
以下 XML 示例配置使用与前面示例相同的“客户信用”示例:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下 Java 示例配置使用与前面示例相同的“客户信用”示例:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
前面的示例依赖存储过程来提供 aResultSet
作为返回结果(前面的选项 1)。
如果存储过程返回 a ref-cursor
(选项 2),那么我们需要提供返回的 out 参数的位置ref-cursor
。
以下示例显示了如何使用 XML 中的第一个参数作为引用游标:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下示例显示了如何使用第一个参数作为 Java 中的 ref-cursor:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
如果游标是从存储函数返回的(选项 3),我们需要将属性“ function ”设置为true
. 它默认为false
.
以下示例显示true
了 XML 中的属性 to:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下示例显示true
了 Java 中的属性:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
在所有这些情况下,我们需要定义 aRowMapper
以及 aDataSource
和实际的过程名称。
如果存储过程或函数接受参数,则必须使用parameters
属性声明和设置它们。以下示例对于 Oracle,声明了三个参数。第一个是out
返回 ref-cursor 的参数,第二个和第三个是接受 type 值的参数INTEGER
。
以下示例显示了如何使用 XML 中的参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
以下示例显示了如何在 Java 中使用参数:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
除了参数声明之外,我们还需要指定一个PreparedStatementSetter
设置调用参数值的实现。这与JdbcCursorItemReader
上述相同。附加属性中列出的所有
附加属性也适用于StoredProcedureItemReader
。
分页ItemReader
实现
使用数据库游标的替代方法是运行多个查询,其中每个查询获取部分结果。我们将此部分称为页面。每个查询都必须指定起始行号和我们希望在页面中返回的行数。
JdbcPagingItemReader
分页的一种实现ItemReader
是JdbcPagingItemReader
. JdbcPagingItemReader
需要一个PagingQueryProvider
负责提供用于检索组成页面的行的 SQL 查询。
由于每个数据库都有自己的策略来提供分页支持,因此我们需要PagingQueryProvider
为每种支持的数据库类型使用不同的策略。还有SqlPagingQueryProviderFactoryBean
一个自动检测正在使用的数据库并确定适当的
PagingQueryProvider
实现。这简化了配置,是推荐的最佳实践。
SqlPagingQueryProviderFactoryBean
要求您指定一个子句select
和一个
from
子句。您还可以提供一个可选where
子句。这些子句和 requiredsortKey
用于构建 SQL 语句。
重要的是对 有一个唯一的键约束,sortKey 以保证在执行之间不会丢失任何数据。
|
read
打开阅读器后,它会以与任何其他相同的基本方式将每次调用传回一个项目ItemReader
。当需要额外的行时,分页发生在幕后。
以下 XML 示例配置使用与ItemReaders
前面显示的基于光标的类似“客户信用”示例:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
以下 Java 示例配置使用与ItemReaders
前面显示的基于光标的类似“客户信用”示例:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
此配置使用必须指定的ItemReader
返回CustomerCredit
对象。RowMapper
“pageSize”属性确定每次查询运行时从数据库读取的实体数。
'parameterValues' 属性可用于指定Map
查询的参数值。如果在where
子句中使用命名参数,则每个条目的键应与命名参数的名称匹配。如果您使用传统的“?” placeholder,那么每个条目的 key 应该是占位符的编号,从 1 开始。
JpaPagingItemReader
分页的另一种实现ItemReader
是JpaPagingItemReader
. JPA 没有类似于 Hibernate 的概念StatelessSession
,所以我们必须使用 JPA 规范提供的其他特性。由于 JPA 支持分页,因此在使用 JPA 进行批处理时这是一个自然的选择。读取每个页面后,实体将分离并清除持久性上下文,以便在处理页面后对实体进行垃圾收集。
JpaPagingItemReader
允许您声明 JPQL 语句并
EntityManagerFactory
传入. 然后,它在每次调用时传回一个项目,以与任何其他项目相同的基本方式进行读取ItemReader
。当需要其他实体时,分页发生在幕后。
以下 XML 示例配置使用与前面显示的 JDBC 阅读器相同的“客户信用”示例:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
以下 Java 示例配置使用与前面显示的 JDBC 阅读器相同的“客户信用”示例:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
假设对象具有正确的 JPA 注释或 ORM 映射文件,此配置以与上述完全相同的方式ItemReader
返回对象。“pageSize”属性确定每次查询执行时从数据库读取的实体数。CustomerCredit
JdbcPagingItemReader
CustomerCredit
数据库 ItemWriters
虽然平面文件和 XML 文件都有特定的ItemWriter
实例,但在数据库世界中并没有完全相同的实例。这是因为事务提供了所有需要的功能。
ItemWriter
文件的实现是必要的,因为它们必须像事务性的那样工作,跟踪写入的项目并在适当的时间刷新或清除。数据库不需要此功能,因为写入已包含在事务中。用户可以创建自己的 DAO 来实现ItemWriter
接口或使用自定义的 DAOItemWriter
这是为通用处理问题而编写的。无论哪种方式,它们都应该毫无问题地工作。需要注意的一件事是通过批处理输出提供的性能和错误处理能力。这在使用 hibernate 时最常见,ItemWriter
但在使用 JDBC 批处理模式时可能会遇到相同的问题。批处理数据库输出没有任何固有缺陷,假设我们小心刷新并且数据没有错误。但是,写入时的任何错误都可能导致混淆,因为无法知道是哪个单个项目导致了异常,或者即使任何单个项目负责,如下图所示:
如果项目在写入之前被缓冲,则在提交之前刷新缓冲区之前不会引发任何错误。例如,假设每个块写入 20 个项目,第 15 个项目抛出一个DataIntegrityViolationException
. 就目前Step
而言,所有 20 项都已成功写入,因为在实际写入之前无法知道是否发生了错误。一旦Session#flush()
被调用,缓冲区就会被清空并触发异常。在这一点上,没有什么Step
可以做。事务必须回滚。通常,此异常可能会导致项目被跳过(取决于跳过/重试策略),然后不再写入。但是,在批处理场景中,无法知道是哪个项目导致了问题。发生故障时正在写入整个缓冲区。解决此问题的唯一方法是在每个项目之后刷新,如下图所示:
这是一个常见的用例,尤其是在使用 Hibernate 时,实现的简单准则ItemWriter
是在每次调用write()
. 这样做可以可靠地跳过项目,Spring Batch 在内部负责处理ItemWriter
错误后调用的粒度。
重用现有服务
批处理系统通常与其他应用程序样式结合使用。最常见的是在线系统,但它也可以通过移动每种应用程序样式使用的必要批量数据来支持集成甚至是胖客户端应用程序。出于这个原因,许多用户希望在他们的批处理作业中重用现有的 DAO 或其他服务是很常见的。Spring 容器本身通过允许注入任何必要的类使这变得相当容易。但是,可能存在现有服务需要充当ItemReader
orItemWriter
的情况,以满足另一个 Spring Batch 类的依赖关系,或者因为它确实是主要的ItemReader
一步。为每个需要包装的服务编写一个适配器类是相当简单的,但是因为这是一个常见的问题,Spring Batch 提供了实现:
ItemReaderAdapter
和ItemWriterAdapter
. 这两个类都通过调用委托模式来实现标准的 Spring 方法,并且设置起来相当简单。
以下 XML 示例使用ItemReaderAdapter
:
<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="generateFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
以下 Java 示例使用ItemReaderAdapter
:
@Bean
public ItemReaderAdapter itemReader() {
ItemReaderAdapter reader = new ItemReaderAdapter();
reader.setTargetObject(fooService());
reader.setTargetMethod("generateFoo");
return reader;
}
@Bean
public FooService fooService() {
return new FooService();
}
需要注意的重要一点是,合同的合同targetMethod
必须与合同相同read
:用尽时,它会返回null
。否则,它返回一个
Object
. 其他任何事情都会阻止框架知道处理何时结束,导致无限循环或不正确的失败,具体取决于ItemWriter
.
以下 XML 示例使用ItemWriterAdapter
:
<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="processFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
以下 Java 示例使用ItemWriterAdapter
:
@Bean
public ItemWriterAdapter itemWriter() {
ItemWriterAdapter writer = new ItemWriterAdapter();
writer.setTargetObject(fooService());
writer.setTargetMethod("processFoo");
return writer;
}
@Bean
public FooService fooService() {
return new FooService();
}
防止状态持续存在
默认情况下,所有ItemReader
和ItemWriter
实现都将其当前状态存储在ExecutionContext
提交之前。但是,这可能并不总是期望的行为。例如,许多开发人员选择通过使用进程指示器使他们的数据库读取器“可重新运行”。一个额外的列被添加到输入数据以指示它是否已被处理。当读取(或写入)特定记录时,已处理标志从 翻转false
到true
。然后 SQL 语句可以在子句中包含额外的语句where
,例如where PROCESSED_IND = false
,从而确保在重新启动的情况下只返回未处理的记录。在这种情况下,最好不要存储任何状态,例如当前行号,因为它在重新启动时无关紧要。因此,所有读取器和写入器都包含“saveState”属性。
以下 bean 定义显示了如何防止 XML 中的状态持久性:
<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
<property name="rowMapper">
<bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
</property>
<property name="saveState" value="false" />
<property name="sql">
<value>
SELECT games.player_id, games.year_no, SUM(COMPLETES),
SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
from games, players where players.player_id =
games.player_id group by games.player_id, games.year_no
</value>
</property>
</bean>
以下 bean 定义显示了如何防止 Java 中的状态持久性:
@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<PlayerSummary>()
.dataSource(dataSource)
.rowMapper(new PlayerSummaryMapper())
.saveState(false)
.sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
+ "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
+ "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
+ "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
+ "from games, players where players.player_id ="
+ "games.player_id group by games.player_id, games.year_no")
.build();
}
上面的ItemReader
配置不会ExecutionContext
在它参与的任何执行中创建任何条目。
创建自定义 ItemReaders 和 ItemWriters
到目前为止,本章已经讨论了 Spring Batch 中读写的基本契约以及一些常见的实现。但是,这些都是相当通用的,并且有许多开箱即用的实现可能无法涵盖的潜在场景。本节通过一个简单的示例展示了如何创建自定义
ItemReader
和ItemWriter
实现并正确实施他们的合同。ItemReader
还实现了,
ItemStream
以说明如何使读取器或写入器可重新启动。
自定义ItemReader
示例
出于本示例的目的,我们创建了一个ItemReader
从提供的列表中读取的简单实现。我们从实现最基本的合约
ItemReader
,read
方法开始,如下代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前面的类接受一个项目列表并一次返回一个项目,从列表中删除每个项目。当列表为空时,返回null
,从而满足 an 的最基本要求,ItemReader
如下测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使可ItemReader
重启
最后的挑战是使ItemReader
可重新启动。目前,如果处理被中断并重新开始,则ItemReader
必须从头开始。这在许多情况下实际上是有效的,但有时最好从停止的地方重新启动批处理作业。关键的判别通常是读者是有状态的还是无状态的。无状态阅读器不需要担心可重新启动性,但有状态阅读器必须尝试在重新启动时重建其最后一个已知状态。因此,我们建议您尽可能使自定义阅读器保持无状态,这样您就不必担心可重新启动性。
如果确实需要存储状态,ItemStream
则应使用该接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
在每次调用该ItemStream
update
方法时,当前索引ItemReader
都存储在提供ExecutionContext
的键“current.index”中。调用该
ItemStream
open
方法时,ExecutionContext
会检查它是否包含具有该键的条目。如果找到该键,则将当前索引移动到该位置。这是一个相当琐碎的例子,但它仍然符合一般合同:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
大多数ItemReaders
都有更复杂的重启逻辑。JdbcCursorItemReader
例如,将最后处理的行的行 ID 存储在游标中。
还值得注意的是,其中使用的密钥ExecutionContext
不应该是微不足道的。那是因为相同ExecutionContext
的内容用于 a 中的所有ItemStreams
内容Step
。在大多数情况下,只需在键前面加上类名就足以保证唯一性。但是,在同一步骤中使用两个相同类型的极少数情况下
ItemStream
(如果需要两个文件进行输出,可能会发生这种情况),需要一个更唯一的名称。出于这个原因,许多 Spring Batch
ItemReader
和ItemWriter
实现都有一个setName()
属性,可以覆盖这个键名。
自定义ItemWriter
示例
实现自定义ItemWriter
在许多方面与ItemReader
上面的示例相似,但在足以保证其自己的示例的方式上有所不同。但是,添加可重启性本质上是相同的,因此在此示例中不涉及。与
ItemReader
示例一样,List
使用 a 是为了使示例尽可能简单:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(List<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使可ItemWriter
重启
为了使ItemWriter
可重新启动,我们将遵循与 相同的过程
ItemReader
,添加和实现ItemStream
接口以同步执行上下文。在示例中,我们可能必须计算处理的项目数并将其添加为页脚记录。如果我们需要这样做,我们可以
ItemStream
在我们的中实现,ItemWriter
以便在重新打开流时从执行上下文中重构计数器。
在许多实际情况下,customItemWriters
还委托给另一个本身可重新启动的写入程序(例如,写入文件时),或者它写入事务性资源因此不需要可重新启动,因为它是无状态的。当你有一个有状态的 writer 时,你可能应该确保ItemStream
实现ItemWriter
. 还请记住,编写器的客户端需要知道ItemStream
,因此您可能需要在配置中将其注册为流。
项目读取器和写入器实现
在本节中,我们将向您介绍前几节中尚未讨论的读者和作者。
装饰器
在某些情况下,用户需要将专门的行为附加到预先存在的
ItemReader
. Spring Batch 提供了一些开箱即用的装饰器,可以为您的实现添加额外的ItemReader
行为ItemWriter
。
Spring Batch 包括以下装饰器:
SynchronizedItemStreamReader
当使用ItemReader
非线程安全的时,Spring Batch 提供
SynchronizedItemStreamReader
装饰器,可用于使ItemReader
线程安全。Spring Batch 提供了一个SynchronizedItemStreamReaderBuilder
用于构造SynchronizedItemStreamReader
.
SingleItemPeekableItemReader
Spring Batch 包含一个装饰器,该装饰器将 peek 方法添加到ItemReader
. 这种查看方法让用户可以查看前面的一项。对 peek 的重复调用返回相同的项目,这是该read
方法返回的下一个项目。Spring Batch 提供了一个
SingleItemPeekableItemReaderBuilder
用于构造
SingleItemPeekableItemReader
.
SingleItemPeekableItemReader 的 peek 方法不是线程安全的,因为不可能在多个线程中实现 peek。只有一个偷看的线程会在下一次读取调用中获得该项目。 |
SynchronizedItemStreamWriter
当使用ItemWriter
非线程安全的时,Spring Batch 提供
SynchronizedItemStreamWriter
装饰器,可用于使ItemWriter
线程安全。Spring Batch 提供了一个SynchronizedItemStreamWriterBuilder
用于构造SynchronizedItemStreamWriter
.
MultiResourceItemWriter
当MultiResourceItemWriter
当前ResourceAwareItemWriterItemStream
资源中写入的项目数超过
itemCountLimitPerResource
. Spring Batch 提供了一个MultiResourceItemWriterBuilder
用于构造MultiResourceItemWriter
.
消息读者和作家
Spring Batch 为常用的消息传递系统提供以下阅读器和编写器:
AmqpItemReader
TheAmqpItemReader
是一个ItemReader
使用 anAmqpTemplate
来接收或转换来自交换的消息。Spring Batch 提供了一个AmqpItemReaderBuilder
用于构造AmqpItemReader
.
AmqpItemWriter
是AmqpItemWriter
一个ItemWriter
使用AmqpTemplate
发送消息到 AMQP 交换。如果提供的名称未指定,则消息将发送到无名交换器AmqpTemplate
。Spring Batch 提供了一个AmqpItemWriterBuilder
用于构造AmqpItemWriter
.
JmsItemReader
JmsItemReader
是用于 JMS的ItemReader
,它使用JmsTemplate
. 模板应该有一个默认目的地,用于为read()
方法提供项目。Spring Batch 提供了一个JmsItemReaderBuilder
用于构造
JmsItemReader
.
JmsItemWriter
JmsItemWriter
是用于 JMS的ItemWriter
,它使用JmsTemplate
. 模板应该有一个默认目的地,用于在write(List)
. Spring Batch 提供了一个JmsItemWriterBuilder
用于构造JmsItemWriter
.
数据库阅读器
Spring Batch 提供以下数据库阅读器:
Neo4jItemReader
这Neo4jItemReader
是一个ItemReader
通过使用分页技术从图形数据库 Neo4j 中读取对象的方法。Spring Batch 提供了一个Neo4jItemReaderBuilder
用于构造Neo4jItemReader
.
MongoItemReader
这MongoItemReader
是一个ItemReader
使用分页技术从 MongoDB 读取文档的方法。Spring Batch 提供了一个MongoItemReaderBuilder
用于构造MongoItemReader
.
HibernateCursorItemReader
这HibernateCursorItemReader
是一个ItemStreamReader
用于读取建立在 Hibernate 之上的数据库记录。它执行 HQL 查询,然后在初始化时,在调用方法时迭代结果集read()
,依次返回与当前行对应的对象。Spring Batch 提供了一个
HibernateCursorItemReaderBuilder
用于构造
HibernateCursorItemReader
.
数据库编写器
Spring Batch 提供以下数据库编写器:
Neo4jItemWriter
这Neo4jItemWriter
是一个ItemWriter
写入 Neo4j 数据库的实现。Spring Batch 提供了一个Neo4jItemWriterBuilder
用于构造
Neo4jItemWriter
.
MongoItemWriter
这MongoItemWriter
是一个ItemWriter
使用 Spring Data 的实现写入 MongoDB 存储的实现MongoOperations
。Spring Batch 提供了一个
MongoItemWriterBuilder
用于构造MongoItemWriter
.
RepositoryItemWriter
这RepositoryItemWriter
是来自 Spring Data的ItemWriter
包装器。CrudRepository
Spring Batch 提供了一个RepositoryItemWriterBuilder
用于构造RepositoryItemWriter
.
HibernateItemWriter
它HibernateItemWriter
使用ItemWriter
Hibernate 会话来保存或更新不属于当前 Hibernate 会话的实体。Spring Batch 提供了一个HibernateItemWriterBuilder
用于构造HibernateItemWriter
.
JdbcBatchItemWriter
这JdbcBatchItemWriter
是一个ItemWriter
使用批处理功能
NamedParameterJdbcTemplate
为所有提供的项目执行批处理语句的方法。Spring Batch 提供了一个JdbcBatchItemWriterBuilder
用于构造
JdbcBatchItemWriter
.
专业读者
Spring Batch 提供以下专业阅读器:
LdifReader
从LdifReader
a 读取 LDIF(LDAP 数据交换格式)记录Resource
,解析它们,并LdapAttribute
为每个read
执行返回一个对象。Spring Batch 提供了一个LdifReaderBuilder
用于构造LdifReader
.
专业作家
Spring Batch 提供以下专业编写器: