文件支持
Spring Integration 的文件支持通过专用词汇表扩展了 Spring Integration 核心,以处理读取、写入和转换文件。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-file:5.5.13"
它提供了一个命名空间,允许元素定义专用于文件的通道适配器,并支持可以将文件内容读入字符串或字节数组的转换器。
本节解释了它们的工作原理FileReadingMessageSource
以及FileWritingMessageHandler
如何将它们配置为 bean。它还讨论了通过Transformer
. 最后,它解释了文件特定的命名空间。
读取文件
AFileReadingMessageSource
可用于使用文件系统中的文件。这是一个MessageSource
从文件系统目录创建消息的实现。以下示例显示了如何配置FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
为防止为某些文件创建消息,您可以提供FileListFilter
. 默认情况下,我们使用以下过滤器:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter
确保不处理隐藏文件。请注意,隐藏的确切定义取决于系统。例如,在基于 UNIX 的系统上,以句点字符开头的文件被认为是隐藏的。另一方面,Microsoft Windows 有一个专用的文件属性来指示隐藏文件。
4.2 版引入了 |
AcceptOnceFileListFilter
确保文件仅从目录中提取一次。
将 从 4.0 版开始,此过滤器需要 从 4.1.5 版本开始,此过滤器有一个新属性 ( |
持久文件列表过滤器现在有一个布尔属性forRecursion
。将此属性设置为true
, 也设置alwaysAcceptDirectories
,这意味着出站网关 (ls
和mget
) 上的递归操作现在将始终遍历整个目录树。这是为了解决未检测到目录树深处更改的问题。此外,forRecursion=true
使文件的完整路径用作元数据存储键;这解决了如果同名文件在不同目录中多次出现时过滤器无法正常工作的问题。重要提示:这意味着将无法为顶级目录下的文件找到持久元数据存储中的现有密钥。为此,该物业false
默认; 这可能会在未来的版本中改变。
以下示例FileReadingMessageSource
使用过滤器配置 a:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的一个常见问题是文件可能在准备好之前被检测到(即,其他一些进程可能仍在写入文件)。默认设置AcceptOnceFileListFilter
不会阻止这一点。在大多数情况下,如果文件写入过程在每个文件准备好读取时重命名它,则可以防止这种情况发生。一个filename-pattern
orfilename-regex
过滤器只接受准备好的文件(可能基于已知的后缀),由 default 组成AcceptOnceFileListFilter
,允许这种情况。启用组合,如以下CompositeFileListFilter
示例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,则 Spring Integration 提供了另一种选择。4.2 版添加了LastModifiedFileListFilter
. 可以使用age
属性配置此过滤器,以便过滤器仅传递比此值更旧的文件。年龄默认为 60 秒,但您应该选择足够大的年龄以避免提早获取文件(例如,由于网络故障)。以下示例显示了如何配置LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从版本 4.3.7 开始,引入了一个ChainFileListFilter
(的扩展CompositeFileListFilter
)以允许后续过滤器只能看到前一个过滤器的结果的情况。(使用CompositeFileListFilter
,所有过滤器都可以看到所有文件,但它只通过已通过所有过滤器的文件)。需要新行为的一个示例是 和 的组合LastModifiedFileListFilter
,AcceptOnceFileListFilter
当我们不希望在经过一段时间后接受文件时。使用CompositeFileListFilter
, 因为AcceptOnceFileListFilter
在第一次通过时会看到所有文件,所以稍后在其他过滤器执行时它不会通过它。这CompositeFileListFilter
当模式过滤器与查找辅助文件以指示文件传输已完成的自定义过滤器结合使用时,该方法很有用。模式过滤器可能只传递主文件(例如something.txt
),但“完成”过滤器需要查看(例如)something.done
是否存在。
假设我们有文件a.txt
,a.done
和b.txt
.
模式过滤器仅通过a.txt
和b.txt
,而“完成”过滤器查看所有三个文件并仅通过a.txt
。复合过滤器的最终结果是只有a.txt
被释放。
使用ChainFileListFilter ,如果链中的任何过滤器返回一个空列表,则不会调用剩余的过滤器。
|
5.0 版引入了ExpressionFileListFilter
针对文件执行 SpEL 表达式作为上下文评估根对象。为此,所有用于文件处理(本地和远程)的 XML 组件以及现有filter
属性都提供了该filter-expression
选项,如以下示例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
5.0.5 版引入了DiscardAwareFileListFilter
对被拒绝文件感兴趣的实现。为此,应通过回调为此类过滤器实现提供回调addDiscardCallback(Consumer<File>)
。在框架中,此功能从FileReadingMessageSource.WatchServiceDirectoryScanner
, 与 . 结合使用LastModifiedFileListFilter
。与常规不同DirectoryScanner
,WatchService
提供文件根据目标文件系统上的事件进行处理。在使用这些文件轮询内部队列时,LastModifiedFileListFilter
可能会丢弃它们,因为它们相对于其配置的age
. 因此,我们会丢失文件以备将来可能的考虑。丢弃回调钩子让我们将文件保留在内部队列中,以便age
在随后的轮询中对其进行检查。这CompositeFileListFilter
还实现了 aDiscardAwareFileListFilter
并向其所有DiscardAwareFileListFilter
委托填充了丢弃回调。
由于CompositeFileListFilter 将文件与所有委托进行匹配,因此discardCallback 可能会为同一个文件多次调用 。
|
从版本 5.1 开始,FileReadingMessageSource
不检查目录是否存在,并且在start()
调用它之前不会创建它(通常通过 wrapping SourcePollingChannelAdapter
)。以前,在引用目录时(例如从测试中或稍后应用权限时)没有简单的方法来防止操作系统权限错误。
消息头
从 5.0 版开始,FileReadingMessageSource
(除了payload
as a polled File
)将以下标头填充到 outbound Message
:
-
FileHeaders.FILENAME
:File.getName()
要发送的文件。可用于后续重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE
:File
对象本身。通常,当我们丢失原始对象时,框架组件(例如拆分器或转换器)会自动填充此标头。File
但是,为了与任何其他自定义用例保持一致和方便,此标头可用于访问原始文件。 -
FileHeaders.RELATIVE_PATH
:引入了一个新的标头,用于表示相对于根目录的文件路径部分以进行扫描。当需要在其他位置恢复源目录层次结构时,此标头可能很有用。为此,DefaultFileNameGenerator
(请参阅“生成文件名” )可以配置为使用此标头。
目录扫描和轮询
FileReadingMessageSource
不会立即为目录中的文件生成消息。它使用内部队列来存储scanner
. 该scanEachPoll
选项用于确保在每次轮询时使用最新的输入目录内容刷新内部队列。默认情况下 ( scanEachPoll = false
),在FileReadingMessageSource
再次扫描目录之前清空其队列。此默认行为对于减少对目录中大量文件的扫描特别有用。但是,在需要自定义排序的情况下,考虑将此标志设置为 的影响很重要true
。处理文件的顺序可能与预期不同。默认情况下,队列中的文件按其自然(path
) 命令。扫描添加的新文件,即使队列已经有文件,也会插入到适当的位置以保持自然顺序。要自定义顺序,FileReadingMessageSource
可以接受 aComparator<File>
作为构造函数参数。内部 ( PriorityBlockingQueue
) 使用它来根据业务需求对其内容进行重新排序。因此,要按特定顺序处理文件,您应该提供一个比较器,FileReadingMessageSource
而不是对自定义生成的列表进行排序DirectoryScanner
。
引入 5.0 版RecursiveDirectoryScanner
以执行文件树访问。该实现基于Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能。根目录 ( DirectoryScanner.listFiles(File)
) 参数从结果中排除。所有其他子目录包含和排除都基于目标FileListFilter
实现。例如,SimplePatternFileListFilter
默认情况下过滤掉目录。有关详细信息,请参阅AbstractDirectoryAwareFileListFilter
及其实现。
从 5.5 版开始,FileInboundChannelAdapterSpec Java DSL 提供了一个方便的recursive(boolean) 选项,可以RecursiveDirectoryScanner 在目标中使用 aFileReadingMessageSource 而不是默认的。
|
命名空间支持
文件读取的配置可以通过使用文件特定的命名空间来简化。为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间中,您可以减少FileReadingMessageSource
并将其包装在入站通道适配器中,如下所示:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认FileListFilter
实现:
-
IgnoreHiddenFileListFilter
(不处理隐藏文件) -
AcceptOnceFileListFilter
(防止重复)
因此,您也可以省略prevent-duplicates
andignore-hidden
属性,因为它们是true
默认的。
Spring Integration 4.2 引入了该 |
第二个通道适配器示例使用自定义过滤器,第三个使用filename-pattern
属性添加AntPathMatcher
基于过滤器,第四个使用filename-regex
属性将基于正则表达式模式的过滤器添加到FileReadingMessageSource
. 和属性都filename-pattern
与常规引用属性filename-regex
互斥。filter
但是,您可以使用该filter
属性来引用一个实例,CompositeFileListFilter
该实例组合了任意数量的过滤器,包括一个或多个基于模式的过滤器以满足您的特定需求。
当多个进程从同一个目录读取时,您可能希望锁定文件以防止它们被同时拾取。为此,您可以使用FileLocker
. 有一个java.nio
基于 - 的实现可用,但也可以实现您自己的锁定方案。nio
可以按如下方式注入更衣室:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以按如下方式配置自定义储物柜:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了锁时,它负责在允许接收文件之前获取锁。它不承担解锁文件的责任。如果您已经处理了文件并保留了锁,那么您就有内存泄漏。如果这是一个问题,您应该FileLocker.unlock(File file) 在适当的时候给自己打电话。
|
当过滤和锁定文件还不够时,您可能需要控制完全列出文件的方式。要实现这种类型的需求,您可以使用DirectoryScanner
. 此扫描仪可让您准确确定每次投票中列出的文件。这也是 Spring Integration 在内部用于连接FileListFilter
实例和FileLocker
连接到FileReadingMessageSource
. 您可以将自定义DirectoryScanner
注入到<int-file:inbound-channel-adapter/>
onscanner
属性中,如以下示例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以让您完全自由地选择排序、列表和锁定策略。
了解过滤器(包括patterns
、regex
、prevent-duplicates
和其他)和locker
实例实际上是由scanner
. 适配器上设置的任何这些属性随后都会注入到 internalscanner
中。对于external 的情况scanner
,所有filter 和locker 属性都被禁止FileReadingMessageSource
。必须在该 custom 上指定(如果需要)它们DirectoryScanner
。换句话说,如果你将 ascanner
注入FileReadingMessageSource
,你应该提供filter
and locker
,scanner
而不是FileReadingMessageSource
.
默认情况下,DefaultDirectoryScanner 使用 anIgnoreHiddenFileListFilter 和 an AcceptOnceFileListFilter 。为防止它们被使用,您可以配置自己的过滤器(例如AcceptAllFileListFilter ),甚至将其设置为null .
|
WatchServiceDirectoryScanner
将FileReadingMessageSource.WatchServiceDirectoryScanner
新文件添加到目录时依赖于文件系统事件。在初始化期间,该目录被注册以生成事件。初始文件列表也在初始化期间建立。在遍历目录树时,遇到的任何子目录也会被注册以生成事件。在第一次轮询时,返回遍历目录的初始文件列表。在随后的轮询中,将返回来自新创建事件的文件。如果添加了新的子目录,则使用其创建事件遍历新子树以查找现有文件并注册找到的任何新子目录。
当程序没有像目录修改事件发生一样快地耗尽WatchKey 其内部事件时,
存在一个问题。queue 如果超出队列大小,StandardWatchEventKinds.OVERFLOW 则发出 a 以指示某些文件系统事件可能会丢失。在这种情况下,会完全重新扫描根目录。为避免重复,请考虑使用适当的FileListFilter (例如AcceptOnceFileListFilter )或在处理完成时删除文件。
|
WatchServiceDirectoryScanner
可以通过选项启用,与选项FileReadingMessageSource.use-watch-service
互斥scanner
。为FileReadingMessageSource.WatchServiceDirectoryScanner
提供的directory
.
此外,现在WatchService
轮询逻辑可以跟踪StandardWatchEventKinds.ENTRY_MODIFY
和StandardWatchEventKinds.ENTRY_DELETE
。
如果您需要跟踪现有文件以及新文件的修改,则应ENTRY_MODIFY
在FileListFilter
. 否则,来自这些事件的文件将以相同的方式处理。
ResettableFileListFilter
实现拾取事件ENTRY_DELETE
。因此,为操作提供了它们的文件remove()
。启用此事件后,过滤器(例如AcceptOnceFileListFilter
删除文件)。因此,如果出现同名文件,它会通过过滤器并作为消息发送。
为此,引入了watch-events
属性 ( )。FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
(WatchEventType
是 中的公共内部枚举FileReadingMessageSource
。)使用这样的选项,我们可以对新文件使用一个下游流逻辑,对修改的文件使用其他一些逻辑。下面的例子展示了如何为在同一个目录中创建和修改事件配置不同的逻辑:
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
限制内存消耗
您可以使用 aHeadDirectoryScanner
来限制内存中保留的文件数。这在扫描大型目录时很有用。使用 XML 配置,可以通过queue-size
在入站通道适配器上设置属性来启用此功能。
在 4.2 版之前,此设置与任何其他过滤器的使用不兼容。任何其他过滤器(包括prevent-duplicates="true"
)覆盖了用于限制大小的过滤器。
a 的使用与 . 通常, |
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
'尾随文件
另一个流行的用例是从文件的末尾(或尾部)获取“行”,在添加新行时捕获它们。提供了两种实现方式。第一个,OSDelegatingFileTailingMessageProducer
,使用本机tail
命令(在有一个的操作系统上)。这通常是这些平台上最有效的实现。对于没有tail
命令的操作系统,第二个实现ApacheCommonsFileTailingMessageProducer
使用 Apachecommons-io
Tailer
类。
在这两种情况下,文件系统事件,例如文件不可用和其他事件,都ApplicationEvent
通过使用正常的 Spring 事件发布机制作为实例发布。此类事件的示例包括:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
前面示例中显示的事件序列可能会发生,例如,当文件旋转时。
从 5.0 版开始,FileTailingIdleEvent
在idleEventInterval
. 以下示例显示了此类事件的外观:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持tail 命令的平台都提供这些状态消息。
|
从这些端点发出的消息具有以下标头:
-
FileHeaders.ORIGINAL_FILE
:File
对象 -
FileHeaders.FILENAME
: 文件名 (File.getName()
)
在 5.0 之前的版本中,FileHeaders.FILENAME 标头包含文件绝对路径的字符串表示形式。您现在可以通过调用getAbsolutePath() 原始文件头来获得该字符串表示。
|
以下示例使用默认选项('-F -n 0',意思是从当前结尾跟随文件名)创建一个本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例使用“-F -n +0”选项创建本机适配器(意味着遵循文件名,发出所有现有行)。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果tail
命令失败(在某些平台上,丢失的文件会导致tail
失败,即使-F
指定了),命令每 10 秒重试一次。
默认情况下,本机适配器从标准输出捕获并将内容作为消息发送。它们还从标准错误中捕获以引发事件。从版本 4.3.6 开始,您可以通过将 设置为enable-status-reader
来丢弃标准错误事件false
,如以下示例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval
设置为5000
,表示如果五秒内没有写入任何行,FileTailingIdleEvent
则每五秒触发一次:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
当您需要停止适配器时,这会很有用。
下面的示例创建一个 Apachecommons-io
Tailer
适配器,它每两秒检查一次文件是否有新行,并每十秒检查一次是否存在丢失的文件:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 该文件从开头 ( end="false" ) 而不是结尾(这是默认设置)拖尾。 |
2 | 为每个块重新打开文件(默认为保持文件打开)。 |
指定delay 、end 或reopen 属性会强制使用 Apachecommons-io 适配器并使该native-options 属性不可用。
|
写入文件
要将消息写入文件系统,您可以使用FileWritingMessageHandler
. 此类可以处理以下负载类型:
-
File
-
String
-
字节数组
-
InputStream
(从4.2 版开始)
对于字符串负载,您可以配置编码和字符集。
为了使事情变得更简单,您可以FileWritingMessageHandler
使用 XML 命名空间将其配置为出站通道适配器或出站网关的一部分。
从 4.3 版开始,您可以指定写入文件时使用的缓冲区大小。
从 5.1 版开始,您可以提供一个BiConsumer<File, Message<?>>
newFileCallback
如果您使用FileExistsMode.APPEND
orFileExistsMode.APPEND_NO_FLUSH
并且必须创建一个新文件时触发的。这个回调接收一个新创建的文件和触发它的消息。例如,此回调可用于编写在消息头中定义的 CSV 头。
生成文件名
在最简单的形式中,FileWritingMessageHandler
只需要一个目标目录来写入文件。要写入的文件的名称由处理程序的FileNameGenerator
. 默认实现会查找其键与定义为 的常量匹配的消息头FileHeaders.FILENAME
。
或者,您可以指定要针对消息评估的表达式以生成文件名 - 例如,headers['myCustomHeader'] + '.something'
. 表达式必须计算为 a String
。为方便起见,DefaultFileNameGenerator
还提供了该setHeaderName
方法,让您显式指定将其值用作文件名的消息头。
设置完成后,将DefaultFileNameGenerator
采用以下解析步骤来确定给定消息有效负载的文件名:
-
根据消息评估表达式,如果结果为非空
String
,则将其用作文件名。 -
否则,如果有效负载是 a
java.io.File
,则使用File
对象的文件名。 -
否则,使用附加的消息 ID。
msg
作为文件名。
当您使用 XML 命名空间支持时,文件出站通道适配器和文件出站网关都支持以下互斥配置属性:
-
filename-generator
(对实现的引用FileNameGenerator
) -
filename-generator-expression
(计算结果为 a 的表达式String
)
写入文件时,使用临时文件后缀(默认为.writing
)。在写入文件时,它会附加到文件名。要自定义后缀,您可以temporary-file-suffix
在文件出站通道适配器和文件出站网关上设置属性。
使用APPEND filemode 时,该temporary-file-suffix 属性被忽略,因为数据直接附加到文件中。
|
从 4.2.5 版开始,生成的文件名(作为filename-generator
或filename-generator-expression
评估的结果)可以与目标文件名一起表示子路径。它File(File parent, String child)
像以前一样用作第二个构造函数参数。但是,过去我们没有mkdirs()
为子路径创建 () 目录,只假设文件名。这种方法在我们需要恢复文件系统树以匹配源目录的情况下很有用——例如,当解压缩存档并以原始顺序保存目标目录中的所有文件时。
指定输出目录
文件出站通道适配器和文件出站网关都提供了两个互斥的配置属性来指定输出目录:
-
directory
-
directory-expression
Spring Integration 2.2 引入了该directory-expression 属性。
|
使用directory
属性
使用该directory
属性时,输出目录设置为一个固定值,该值在FileWritingMessageHandler
初始化时设置。如果不指定此属性,则必须使用该directory-expression
属性。
使用directory-expression
属性
如果您想获得完整的 SpEL 支持,可以使用该directory-expression
属性。此属性接受一个 SpEL 表达式,该表达式针对正在处理的每条消息进行评估。因此,当您动态指定输出文件目录时,您可以完全访问消息的有效负载及其标头。
SpEL 表达式必须解析为aString
或。(无论如何,后者被评估为 a 。)此外,结果or必须指向一个目录。如果不指定属性,则必须设置属性。java.io.File
org.springframework.core.io.Resource
File
String
File
directory-expression
directory
使用auto-create-directory
属性
默认情况下,如果目标目录不存在,则会自动创建相应的目标目录和任何不存在的父目录。为了防止这种行为,您可以将auto-create-directory
属性设置为false
. 此属性适用于directory
和directory-expression
属性。
使用 与初始化适配器时检查目标目录的存在不同,现在对正在处理的每条消息执行此检查。 此外,如果 |
处理现有的目标文件
当您写入文件并且目标文件已经存在时,默认行为是覆盖该目标文件。mode
您可以通过在相关文件出站组件上设置属性来更改此行为。存在以下选项:
-
REPLACE
(默认) -
REPLACE_IF_MODIFIED
-
APPEND
-
APPEND_NO_FLUSH
-
FAIL
-
IGNORE
Spring Integration 2.2 引入了mode 属性和APPEND , FAIL , 和IGNORE 选项。
|
REPLACE
-
如果目标文件已存在,则将其覆盖。如果
mode
未指定该属性,则这是写入文件时的默认行为。 REPLACE_IF_MODIFIED
-
如果目标文件已经存在,则仅当最后修改的时间戳与源文件的时间戳不同时才会覆盖它。对于
File
有效负载,将有效负载lastModified
时间与现有文件进行比较。对于其他有效负载,将FileHeaders.SET_MODIFIED
(file_setModified
) 标头与现有文件进行比较。如果标头丢失或具有不是 a 的值,Number
则始终替换该文件。 APPEND
-
此模式允许您将消息内容附加到现有文件,而不是每次都创建一个新文件。请注意,此属性与该属性互斥,
temporary-file-suffix
因为当它向现有文件附加内容时,适配器不再使用临时文件。该文件在每条消息后关闭。 APPEND_NO_FLUSH
-
此选项与 具有相同的语义
APPEND
,但在每条消息之后不会刷新数据并且不会关闭文件。这可以提供显着的性能,但在发生故障时可能会丢失数据。有关详细信息,请参阅使用时刷新文件APPEND_NO_FLUSH
。 FAIL
-
如果目标文件存在,
MessageHandlingException
则抛出 a。 IGNORE
-
如果目标文件存在,则消息负载将被静默忽略。
使用临时文件后缀(默认为.writing )时,IGNORE 如果存在最终文件名或临时文件名,则该选项适用。
|
使用时刷新文件APPEND_NO_FLUSH
该APPEND_NO_FLUSH
模式是在 4.3 版中添加的。使用它可以提高性能,因为文件不会在每条消息后关闭。但是,如果发生故障,这可能会导致数据丢失。
Spring Integration 提供了几种刷新策略来减轻这种数据丢失:
-
使用
flushInterval
. 如果在这段时间内没有写入文件,则会自动刷新。这是近似值,可能到1.33x
现在为止(平均值为1.167x
)。 -
将包含正则表达式的消息发送到消息处理程序的
trigger
方法。具有与模式匹配的绝对路径名的文件将被刷新。 -
为处理程序提供自定义实现,以修改将消息发送到方法
MessageFlushPredicate
时所采取的操作。trigger
-
flushIfNeeded
通过传入自定义FileWritingMessageHandler.FlushPredicate
或FileWritingMessageHandler.MessageFlushPredicate
实现来调用处理程序的方法之一。
每个打开的文件都会调用谓词。有关更多信息,请参阅这些接口的Javadoc。请注意,从 5.0 版本开始,谓词方法提供了另一个参数:如果是新文件或之前关闭,则当前文件首次写入的时间。
使用flushInterval
时,间隔从最后一次写入开始。仅当文件在该时间间隔内处于空闲状态时才会刷新该文件。从版本 4.3.7 开始,可以将附加属性 ( flushWhenIdle
) 设置为false
,这意味着间隔从第一次写入之前刷新的(或新的)文件开始。
文件时间戳
默认情况下,目标文件的lastModified
时间戳是文件创建的时间(除了就地重命名保留当前时间戳)。从 4.3 版开始,您现在可以配置preserve-timestamp
(或setPreserveTimestamp(true)
在使用 Java 配置时)。对于File
有效负载,这会将时间戳从入站文件传输到出站文件(无论是否需要副本)。对于其他有效负载,如果FileHeaders.SET_MODIFIED
标头 ( file_setModified
) 存在,则它用于设置目标文件的lastModified
时间戳,只要标头是Number
.
文件权限
从版本 5.0 开始,将文件写入支持 Posix 权限的文件系统时,您可以在出站通道适配器或网关上指定这些权限。该属性是一个整数,通常以熟悉的八进制格式提供——例如,0640
表示所有者具有读/写权限,组具有只读权限,而其他人没有访问权限。
文件出站通道适配器
以下示例配置文件出站通道适配器:
<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>
基于命名空间的配置也支持delete-source-files
属性。如果设置为true
,它会在写入目标后触发删除原始源文件。该标志的默认值为false
. 以下示例显示如何将其设置为true
:
<int-file:outbound-channel-adapter id="filesOut"
directory="${output.directory}"
delete-source-files="true"/>
delete-source-files 仅当入站消息具有File 有效负载或FileHeaders.ORIGINAL_FILE 标头值包含源File 实例或String 表示原始文件路径
的 a 时,
该属性才有效。 |
从版本 4.2 开始,FileWritingMessageHandler
支持一个append-new-line
选项。如果设置为true
,则在写入消息后将新行附加到文件中。默认属性值为false
。以下示例显示了如何使用该append-new-line
选项:
<int-file:outbound-channel-adapter id="newlineAdapter"
append-new-line="true"
directory="${output.directory}"/>
出站网关
如果您希望根据写入的文件继续处理消息,则可以outbound-gateway
改用。它的作用类似于outbound-channel-adapter
. 但是,在写入文件后,它也会将其作为消息的有效负载发送到回复通道。
以下示例配置出站网关:
<int-file:outbound-gateway id="mover" request-channel="moveInput"
reply-channel="output"
directory="${output.directory}"
mode="REPLACE" delete-source-files="true"/>
如前所述,您还可以指定mode
属性,该属性定义了如何处理目标文件已经存在的情况的行为。有关详细信息,请参阅处理现有目标文件。通常,使用文件出站网关时,结果文件作为回复通道上的消息负载返回。
这也适用于指定IGNORE
模式。在这种情况下,将返回预先存在的目标文件。如果请求消息的负载是一个文件,您仍然可以通过消息头访问该原始文件。请参阅FileHeaders.ORIGINAL_FILE。
在您想首先移动文件然后通过处理管道发送它的情况下,“出站网关”效果很好。在这种情况下,您可以将文件命名空间的inbound-channel-adapter 元素连接到outbound-gateway ,然后将该网关连接reply-channel 到管道的开头。
|
如果您有更详细的要求或需要支持其他有效负载类型作为要转换为文件内容的输入,您可以扩展FileWritingMessageHandler
. 但更好的选择是依赖Transformer
.
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
}
@Bean
@ServiceActivator(inputChannel = "writeToFileChannel")
public MessageHandler fileWritingMessageHandler() {
Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
handler.setFileExistsMode(FileExistsMode.APPEND);
return handler;
}
@MessagingGateway(defaultRequestChannel = "writeToFileChannel")
public interface MyGateway {
void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
@Header(FileHeaders.FILENAME) File directory, String data);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class FileWritingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(false)
.run(args);
MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
fileWritingInput.send(new GenericMessage<>("foo"));
}
@Bean
public IntegrationFlow fileWritingFlow() {
return IntegrationFlows.from("fileWritingInput")
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
.header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
.handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
.channel(MessageChannels.queue("fileWritingResultChannel"))
.get();
}
}
文件转换器
要将从文件系统读取的数据转换为对象,反之亦然,您需要做一些工作。与FileReadingMessageSource
在较小程度上不同的是FileWritingMessageHandler
,您可能需要自己的机制来完成工作。为此,您可以实现Transformer
接口。或者,您可以扩展AbstractFilePayloadTransformer
入站消息。Spring Integration 提供了一些明显的实现。
查看接口的JavadocTransformer
以了解哪些 Spring Integration 类实现了它。同样,您可以检查该类的JavadocAbstractFilePayloadTransformer
以查看哪些 Spring Integration 类对其进行了扩展。
FileToByteArrayTransformer
通过使用SpringAbstractFilePayloadTransformer
的. 使用一系列转换器通常比将所有转换放在一个类中更好。在这种情况下,转换可能是合乎逻辑的第一步。File
byte[]
FileCopyUtils
File
byte[]
FileToStringTransformer
extendsAbstractFilePayloadTransformer
将File
对象转换为String
. 如果不出意外,这对于调试很有用(考虑将它与丝锥一起使用)。
要配置特定于文件的转换器,您可以使用文件命名空间中的适当元素,如以下示例所示:
<int-file:file-to-bytes-transformer input-channel="input" output-channel="output"
delete-files="true"/>
<int-file:file-to-string-transformer input-channel="input" output-channel="output"
delete-files="true" charset="UTF-8"/>
该delete-files
选项向转换器发出信号,它应该在转换完成后删除入站文件。这绝不是在多线程环境中使用AcceptOnceFileListFilter
时的替代品FileReadingMessageSource
(例如,当您通常使用 Spring Integration 时)。
文件拆分器
FileSplitter
是在 4.1.2 版中添加的,它的命名空间支持是在 4.2 版中添加的。将FileSplitter
文本文件拆分为单独的行,基于BufferedReader.readLine()
. 默认情况下,当Iterator
从文件中读取行时,拆分器使用 an 一次发出一行。将iterator
属性设置为false
使其在将所有行作为消息发出之前将它们读入内存。一个用例可能是如果您想在发送任何包含行的消息之前检测文件上的 I/O 错误。但是,它只适用于相对较短的文件。
入站有效负载可以是File
、String
(File
路径)InputStream
、 或Reader
。其他有效载荷类型不变地发射。
以下清单显示了配置 a 的可能方法FileSplitter
:
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | 拆分器的 bean 名称。 |
2 | 设置为true (默认值)以使用迭代器或false 在发送行之前将文件加载到内存中。 |
3 | 设置为true 在文件数据之前和之后发出文件开始和文件结束标记消息。标记是带有FileSplitter.FileMarker 有效负载的消息(属性中带有START 和END 值mark )。在过滤某些行的下游流中按顺序处理文件时,您可能会使用标记。它们使下游处理能够知道文件何时已完全处理。此外,file_marker 包含START 或END 添加到这些消息的标头。标记包括行END 数。如果文件为空,则只有START 和END 标记0 以lineCount . 默认值为false . 当true ,默认情况下apply-sequence 。false 另见markers-json (下一个属性)。 |
4 | 当markers 为 true 时,将其设置为true 将FileMarker 对象转换为 JSON 字符串。(使用SimpleJsonSerializer 下面)。 |
5 | 设置为false 禁用消息中包含sequenceSize 和sequenceNumber 标题。默认是true ,除非markers 是true 。当true 和markers 是true ,标记被包括在测序中。当true 和iterator 是true 时,sequenceSize 标头设置为0 ,因为大小未知。 |
6 | 如果文件中没有行,则设置为true 导致抛出 a 。RequiresReplyException 默认值为false . |
7 | 设置将文本数据读入String 有效负载时使用的字符集名称。默认为平台字符集。 |
8 | 在为其余行发出的消息中作为标题携带的第一行的标题名称。从 5.0 版开始。 |
9 | 设置用于向拆分器发送消息的输入通道。 |
10 | 设置消息发送到的输出通道。 |
11 | 设置发送超时。仅适用于output-channel can 阻塞 - 例如完整的QueueChannel . |
12 | 设置为false 在刷新上下文时禁用自动启动拆分器。默认值为true . |
13 | input-channel 如果是,则设置此端点的顺序<publish-subscribe-channel/> 。 |
14 | 设置分离器的启动阶段(在auto-startup is时使用true )。 |
FileSplitter
还将任何基于文本的内容拆分为InputStream
行。从版本 4.3 开始,当与 FTP 或 SFTP 流式入站通道适配器或使用stream
选项检索文件的 FTP 或 SFTP 出站网关结合使用时,拆分器会在文件完全使用时自动关闭支持流的会话有关这些工具的更多信息,请参阅FTP 流式入站通道适配器和SFTP 流式入站通道适配器以及FTP 出站网关和SFTP 出站网关。
使用 Java 配置时,可以使用额外的构造函数,如以下示例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
当markersJson
为真时,标记表示为 JSON 字符串(使用 a SimpleJsonSerializer
)。
5.0 版引入了firstLineAsHeader
将第一行内容指定为标题的选项(例如 CSV 文件中的列名)。传递给此属性的参数是标题名称,第一行作为标题在为其余行发出的消息中携带。此行不包含在序列标题中(如果applySequence
为真),也不包含在lineCount
关联的FileMarker.END
. 注意:从 5.5 版开始,lineCount` 也包含FileHeaders.LINE_COUNT
在消息的标题中FileMarker.END
,因为FileMarker
可以序列化为 JSON。如果文件仅包含标题行,则该文件被视为空文件,因此仅FileMarker
在拆分期间发出实例(如果启用了标记 - 否则,不会发出任何消息)。默认情况下(如果没有设置标题名称),第一行被认为是数据,并成为第一个发出的消息的有效负载。
如果您需要关于从文件内容中提取标题的更复杂的逻辑(不是第一行,不是行的全部内容,不是一个特定的标题,等等),请考虑在FileSplitter
. 请注意,已移动到标题的行可能会在正常内容过程的下游被过滤掉。
幂等下游处理拆分文件
当为真时,拆分器在标题apply-sequence
中添加行号(当为真时,标记计为行)。行号可以与幂等接收器一起使用,以避免在重新启动后重新处理行。SEQUENCE_NUMBER
markers
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}
文件聚合器
从 5.5 版开始,当启用 START/END 标记时,FileAggregator
引入了 a 以涵盖用例的另一面。FileSplitter
为方便起见,FileAggregator
实现了所有三个序列细节策略:
-
HeaderAttributeCorrelationStrategy
带有属性的FileHeaders.FILENAME
用于相关键计算。当在 上启用标记时FileSplitter
,它不会填充序列详细信息标头,因为 START/END 标记消息也包含在序列大小中。FileHeaders.FILENAME
仍然为发出的每一行填充 ,包括 START/END 标记消息。 -
FileMarkerReleaseStrategy
- 检查组中的消息FileSplitter.FileMarker.Mark.END
,然后将FileHeaders.LINE_COUNT
标头值与组大小减去2
-FileSplitter.FileMarker
实例进行比较。它还实现了一个方便的GroupConditionProvider
联系人,conditionSupplier
以便在AbstractCorrelatingMessageHandler
. 有关详细信息,请参阅消息组条件。 -
FileAggregatingMessageGroupProcessor
只是从组中删除消息FileSplitter.FileMarker
并将其余消息收集到列表有效负载中以生成。
以下清单显示了配置 a 的可能方法FileAggregator
:
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
integrationFlow {
split(Files.splitter().markers().firstLineAsHeader("firstLine"))
channel { executor(taskExecutor) }
filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
transform(String::toUpperCase)
channel("aggregatorChannel")
aggregate(FileAggregator())
channel { queue("resultChannel") }
}
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new FileAggregator());
aggregator.setOutputChannel(outputChannel);
return aggregator;
}
<int:chain input-channel="input" output-channel="output">
<int-file:splitter markers="true"/>
<int:aggregator>
<bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
</int:aggregator>
</int:chain>
如果默认行为FileAggregator
不满足目标逻辑,建议使用单独的策略配置聚合器端点。有关更多信息,请参阅FileAggregator
JavaDocs。
远程持久文件列表过滤器
入站和流式入站远程文件通道适配器(FTP
、SFTP
和其他技术)默认配置有相应的实现AbstractPersistentFileListFilter
,配置有 in-memory MetadataStore
。要在集群中运行,可以使用共享过滤器替换这些过滤器MetadataStore
(有关更多信息,请参阅元数据存储)。这些过滤器用于防止多次获取同一个文件(除非修改时间更改)。从版本 5.2 开始,在文件被提取之前立即将文件添加到过滤器(如果提取失败,则将其反转)。
如果发生灾难性故障(例如断电),当前正在获取的文件可能会保留在过滤器中,并且在重新启动应用程序时不会重新获取。在这种情况下,您需要手动从MetadataStore .
|
在以前的版本中,在获取任何文件之前对文件进行了过滤,这意味着在发生灾难性故障后可能有多个文件处于这种状态。
为了促进这种新行为,已将两种新方法添加到FileListFilter
.
boolean accept(F file);
boolean supportsSingleFileFiltering();
如果过滤器返回true
,supportsSingleFileFiltering
它必须实现accept()
。
如果远程过滤器不支持单个文件过滤(例如AbstractMarkerFilePresentFileListFilter
),则适配器将恢复到以前的行为。
If multiple filters are in used (using a CompositeFileListFilter
or ChainFileListFilter
), then all of the delegate filters must support single file filtering in order for the composite filter to support it.
持久文件列表过滤器现在有一个布尔属性forRecursion
。将此属性设置为true
, 也设置alwaysAcceptDirectories
,这意味着出站网关 (ls
和mget
) 上的递归操作现在将始终遍历整个目录树。这是为了解决未检测到目录树深处更改的问题。此外,forRecursion=true
使文件的完整路径用作元数据存储键;这解决了如果同名文件在不同目录中多次出现时过滤器无法正常工作的问题。重要提示:这意味着将无法为顶级目录下的文件找到持久元数据存储中的现有密钥。为此,该物业false
默认; 这可能会在未来的版本中改变。