FTP/FTPS 适配器
Spring Integration 支持使用 FTP 和 FTPS 进行文件传输操作。
文件传输协议 (FTP) 是一种简单的网络协议,可让您在 Internet 上的两台计算机之间传输文件。FTPS 代表“基于 SSL 的 FTP”。
您需要将此依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>5.5.13</version>
</dependency>
compile "org.springframework.integration:spring-integration-ftp:5.5.13"
FTP 通信有两个参与者:客户端和服务器。要使用 FTP 或 FTPS 传输文件,您可以使用客户端来启动与运行 FTP 服务器的远程计算机的连接。建立连接后,客户端可以选择发送或接收文件副本。
Spring Integration 通过提供三个客户端端点支持通过 FTP 或 FTPS 发送和接收文件:入站通道适配器、出站通道适配器和出站网关。它还为定义这些客户端组件提供了方便的基于名称空间的配置选项。
要使用 FTP 命名空间,请将以下内容添加到 XML 文件的标题中:
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd"
FTP 会话工厂
Spring Integration 提供了可用于创建 FTP(或 FTPS)会话的工厂。
默认工厂
从 3.0 版开始,默认情况下不再缓存会话。请参阅FTP 会话缓存。 |
在配置 FTP 适配器之前,您必须配置一个 FTP 会话工厂。您可以使用实现类所在的常规 bean 定义来配置 FTP 会话工厂o.s.i.ftp.session.DefaultFtpSessionFactory
。以下示例显示了基本配置:
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="kermit"/>
<property name="password" value="frog"/>
<property name="clientMode" value="0"/>
<property name="fileType" value="2"/>
<property name="bufferSize" value="100000"/>
</bean>
对于 FTPS 连接,您可以o.s.i.ftp.session.DefaultFtpsSessionFactory
改用。
以下示例显示了完整的配置:
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpsSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="oleg"/>
<property name="password" value="password"/>
<property name="clientMode" value="1"/>
<property name="fileType" value="2"/>
<property name="useClientMode" value="true"/>
<property name="cipherSuites" value="a,b.c"/>
<property name="keyManager" ref="keyManager"/>
<property name="protocol" value="SSL"/>
<property name="trustManager" ref="trustManager"/>
<property name="prot" value="P"/>
<property name="needClientAuth" value="true"/>
<property name="authValue" value="oleg"/>
<property name="sessionCreation" value="true"/>
<property name="protocols" value="SSL, TLS"/>
<property name="implicit" value="true"/>
</bean>
如果您遇到连接问题并希望跟踪会话创建以及查看轮询了哪些会话,您可以通过将记录器设置为TRACE 级别来启用会话跟踪(例如,log4j.category.org.springframework.integration.file=TRACE )。
|
现在您只需要将这些会话工厂注入到您的适配器中。适配器使用的协议(FTP 或 FTPS)取决于已注入适配器的会话工厂的类型。
为 FTP 或 FTPS 会话工厂提供值的更实用的方法是使用 Spring 的属性占位符支持(参见https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-工厂占位符配置器)。 |
高级配置
DefaultFtpSessionFactory
提供了对底层客户端 API 的抽象,它(从 Spring Integration 2.0 开始)是Apache Commons Net。这使您免于org.apache.commons.net.ftp.FTPClient
. 会话工厂中公开了几个常用属性(从 4.0 版开始,现在包括connectTimeout
、defaultTimeout
和dataTimeout
)。但是,有时您需要访问较低级别FTPClient
的配置以实现更高级的配置(例如设置活动模式的端口范围)。为此,AbstractFtpSessionFactory
(所有 FTP 会话工厂的基类)公开了钩子,其形式为以下清单中所示的两种后处理方法:
/**
* Will handle additional initialization after client.connect() method was invoked,
* but before any action on the client has been taken
*/
protected void postProcessClientAfterConnect(T t) throws IOException {
// NOOP
}
/**
* Will handle additional initialization before client.connect() method was invoked.
*/
protected void postProcessClientBeforeConnect(T client) throws IOException {
// NOOP
}
如您所见,这两种方法没有默认实现。但是,通过扩展DefaultFtpSessionFactory
,您可以覆盖这些方法以提供更高级的 配置FTPClient
,如以下示例所示:
public class AdvancedFtpSessionFactory extends DefaultFtpSessionFactory {
protected void postProcessClientBeforeConnect(FTPClient ftpClient) throws IOException {
ftpClient.setActivePortRange(4000, 5000);
}
}
FTPS 和共享 SSLSession
当使用基于 SSL 或 TLS 的 FTP 时,某些服务器要求SSLSession
在控制和数据连接上使用相同的内容。这是为了防止“窃取”数据连接。有关详细信息,请参阅https://scarybeastsecurity.blogspot.cz/2009/02/vsftpd-210-released.html。
目前,Apache FTPSClient 不支持此功能。请参阅NET-408。
以下由Stack Overflow提供的解决方案在 上使用反射sun.security.ssl.SSLSessionContextImpl
,因此它可能不适用于其他 JVM。堆栈溢出的答案是2015年提交的,最近Spring Integration团队在JDK 1.8.0_112上测试了该解决方案。
以下示例显示了如何创建 FTPS 会话:
@Bean
public DefaultFtpsSessionFactory sf() {
DefaultFtpsSessionFactory sf = new DefaultFtpsSessionFactory() {
@Override
protected FTPSClient createClientInstance() {
return new SharedSSLFTPSClient();
}
};
sf.setHost("...");
sf.setPort(21);
sf.setUsername("...");
sf.setPassword("...");
sf.setNeedClientAuth(true);
return sf;
}
private static final class SharedSSLFTPSClient extends FTPSClient {
@Override
protected void _prepareDataSocket_(final Socket socket) throws IOException {
if (socket instanceof SSLSocket) {
// Control socket is SSL
final SSLSession session = ((SSLSocket) _socket_).getSession();
final SSLSessionContext context = session.getSessionContext();
context.setSessionCacheSize(0); // you might want to limit the cache
try {
final Field sessionHostPortCache = context.getClass()
.getDeclaredField("sessionHostPortCache");
sessionHostPortCache.setAccessible(true);
final Object cache = sessionHostPortCache.get(context);
final Method method = cache.getClass().getDeclaredMethod("put", Object.class,
Object.class);
method.setAccessible(true);
String key = String.format("%s:%s", socket.getInetAddress().getHostName(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
key = String.format("%s:%s", socket.getInetAddress().getHostAddress(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
}
catch (NoSuchFieldException e) {
// Not running in expected JRE
logger.warn("No field sessionHostPortCache in SSLSessionContext", e);
}
catch (Exception e) {
// Not running in expected JRE
logger.warn(e.getMessage());
}
}
}
}
委托会话工厂
4.2 版引入了DelegatingSessionFactory
,它允许在运行时选择实际的会话工厂。在调用 FTP 端点之前,调用setThreadKey()
工厂以将密钥与当前线程相关联。然后使用该密钥来查找要使用的实际会话工厂。您可以在使用后通过调用来清除密钥clearThreadKey()
。
我们添加了便利方法,以便您可以轻松地从消息流中使用委托会话工厂。
以下示例显示如何声明委托会话工厂:
<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
<constructor-arg>
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
<!-- delegate factories here -->
</bean>
</constructor-arg>
</bean>
<int:service-activator input-channel="in" output-channel="c1"
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />
<int-ftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />
<int:service-activator input-channel="c2" output-channel="out"
expression="@dsf.clearThreadKey(#root)" />
当您使用会话缓存时(请参阅FTP 会话缓存),每个委托都应该被缓存。您不能缓存DelegatingSessionFactory 自身。
|
从 5.0.7 版本开始,DelegatingSessionFactory
可以与 a 结合使用RotatingServerAdvice
来轮询多个服务器;请参阅入站通道适配器:轮询多个服务器和目录。
FTP 入站通道适配器
FTP 入站通道适配器是一个特殊的侦听器,它连接到 FTP 服务器并侦听远程目录事件(例如,创建的新文件),此时它会启动文件传输。以下示例显示了如何配置inbound-channel-adapter
:
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
如前面的配置所示,您可以使用该inbound-channel-adapter
元素配置 FTP 入站通道适配器,同时还提供各种属性的值,例如local-directory
, filename-pattern
(基于简单的模式匹配,而不是正则表达式),以及对session-factory
.
默认情况下,传输的文件与原始文件具有相同的名称。如果您想覆盖此行为,您可以设置该local-filename-generator-expression
属性,它允许您提供一个 SpEL 表达式来生成本地文件的名称。与 SpEL 评估上下文的根对象是 a 的出站网关和适配器不同Message
,此入站适配器在评估时还没有消息,因为这是它最终使用传输的文件作为其有效负载生成的。因此,SpEL 评估上下文的根对象是远程文件的原始名称 (a String
)。
入站通道适配器首先检索File
本地目录的对象,然后根据轮询器配置发出每个文件。从 5.0 版开始,您现在可以在需要检索新文件时限制从 FTP 服务器获取的文件数量。当目标文件非常大或在具有持久文件列表过滤器的集群系统中运行时,这可能很有用,稍后将讨论。用于max-fetch-size
此目的。负值(默认值)表示没有限制并检索所有匹配的文件。有关详细信息,请参阅入站通道适配器:控制远程文件获取。从 5.0 版开始,您还可以通过设置属性DirectoryScanner
来提供自定义实现。inbound-channel-adapter
scanner
从 Spring Integration 3.0 开始,您可以指定preserve-timestamp
属性(其默认值为false
)。当 时true
,本地文件的修改时间戳设置为从服务器检索的值。否则,将其设置为当前时间。
从版本 4.2 开始,您可以指定remote-directory-expression
而不是remote-directory
,让您在每次轮询时动态确定目录 - 例如,remote-directory-expression="@myBean.determineRemoteDir()"
.
从 4.3 版开始,您可以省略remote-directory
andremote-directory-expression
属性。他们默认为null
. 在这种情况下,根据 FTP 协议,客户端工作目录被用作默认的远程目录。
有时,基于filename-pattern
属性指定的简单模式的文件过滤可能不够。如果是这种情况,您可以使用该filename-regex
属性来指定正则表达式(例如filename-regex=".*\.test$"
)。此外,如果您需要完全控制,您可以使用该filter
属性并提供对任何自定义实现的引用,这o.s.i.file.filters.FileListFilter
是一个用于过滤文件列表的策略接口。此过滤器确定检索哪些远程文件。您还可以AcceptOnceFileListFilter
通过使用CompositeFileListFilter
.
将AcceptOnceFileListFilter
其状态存储在内存中。如果您希望状态在系统重新启动后仍然存在,请考虑FtpPersistentAcceptOnceFileListFilter
改用。此过滤器将接受的文件名存储在MetadataStore
策略实例中(请参阅元数据存储)。此过滤器匹配文件名和远程修改时间。
从 4.0 版开始,此过滤器需要ConcurrentMetadataStore
. 当与共享数据存储(例如)Redis
一起使用时RedisMetadataStore
,它允许在多个应用程序或服务器实例之间共享过滤器键。
从 5.0 版开始,FtpPersistentAcceptOnceFileListFilter
默认SimpleMetadataStore
情况下对FtpInboundFileSynchronizer
. 此过滤器还与 XML 配置中的regex
orpattern
选项以及FtpInboundChannelAdapterSpec
Java DSL 中的选项一起应用。任何其他用例都可以使用CompositeFileListFilter
(或ChainFileListFilter
)进行管理。
前面的讨论是指在检索文件之前过滤文件。检索文件后,将对文件系统上的文件应用附加过滤器。默认情况下,AcceptOnceFileListFilter
如前所述,这是一个在内存中保留状态并且不考虑文件修改时间的方法。除非您的应用程序在处理后删除文件,否则适配器将默认在应用程序重新启动后重新处理磁盘上的文件。
此外,如果您将 配置filter
为使用 aFtpPersistentAcceptOnceFileListFilter
并且远程文件时间戳发生更改(导致它被重新获取),则默认的本地过滤器不会让这个新文件被处理。
有关此过滤器及其使用方式的更多信息,请参阅远程持久文件列表过滤器。
您可以使用该local-filter
属性来配置本地文件系统过滤器的行为。从版本 4.3.8 开始,FileSystemPersistentAcceptOnceFileListFilter
默认配置 a。此过滤器将接受的文件名和修改的时间戳存储在MetadataStore
策略实例中(请参阅元数据存储),并检测本地文件修改时间的更改。默认MetadataStore
值为 a SimpleMetadataStore
,它将状态存储在内存中。
从版本 4.1.5 开始,这些过滤器有一个新属性 ( flushOnUpdate
),它会导致它们在每次更新时刷新元数据存储(如果存储实现了Flushable
)。
实际的本地过滤器是一个CompositeFileListFilter
包含提供的过滤器和一个模式过滤器,用于阻止处理正在下载的文件(基于temporary-file-suffix
)。使用此后缀(默认为.writing
)下载文件,并在传输完成后将文件重命名为其最终名称,使其对过滤器“可见”。
remote-file-separator
如果默认的“/”不适用于您的特定环境,该属性允许您配置要使用的文件分隔符。
有关这些属性的更多详细信息,请参见架构。
您还应该了解 FTP 入站通道适配器是轮询使用者。因此,您必须配置轮询器(通过使用全局默认值或本地子元素)。传输文件后,将生成带有 ajava.io.File
作为其有效负载的消息并将其发送到该channel
属性标识的通道。
更多关于文件过滤和不完整文件
有时,刚刚出现在被监控(远程)目录中的文件并不完整。通常,此类文件使用临时扩展名(例如somefile.txt.writing
)写入,然后在写入过程完成后重命名。在大多数情况下,您只对完整的文件感兴趣,并且只想过滤完整的文件。要处理这些情况,您可以使用 、 和 属性提供的filename-pattern
过滤filename-regex
支持filter
。以下示例使用自定义过滤器实现:
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
入站 FTP 适配器的轮询器配置说明
入站 FTP 适配器的工作包括两个任务:
-
与远程服务器通信,以便将文件从远程目录传输到本地目录。
-
对于每个传输的文件,生成一条带有该文件作为有效负载的消息,并将其发送到由“通道”属性标识的通道。这就是为什么它们被称为“'通道适配器'”而不仅仅是“'适配器'”。这种适配器的主要工作是生成要发送到消息通道的消息。本质上,第二个任务以这样一种方式优先,如果您的本地目录已经有一个或多个文件,它首先会从这些文件中生成消息。只有在处理完所有本地文件后,它才会启动远程通信以检索更多文件。
此外,在轮询器上配置触发器时,应密切注意该max-messages-per-poll
属性。它的默认值适用1
于所有SourcePollingChannelAdapter
实例(包括 FTP)。这意味着,一旦处理了一个文件,它就会等待由您的触发器配置确定的下一个执行时间。如果您碰巧有一个或多个文件位于 中local-directory
,它会在启动与远程 FTP 服务器的通信之前处理这些文件。此外,如果max-messages-per-poll
设置为1
(默认值),它一次只处理一个文件,间隔由触发器定义,本质上是“单轮询 === 一个文件”。
对于典型的文件传输用例,您很可能想要相反的行为:处理每次轮询可以处理的所有文件,然后才等待下一次轮询。如果是这种情况,请设置max-messages-per-poll
为 -1。然后,在每次轮询时,适配器都会尝试生成尽可能多的消息。换句话说,它处理本地目录中的所有内容,然后连接到远程目录以传输那里可用的所有内容以在本地处理。只有这样轮询操作才被认为完成,轮询器等待下一个执行时间。
您也可以将 'max-messages-per-poll' 值设置为正值,表示每次轮询时从文件创建的消息的上限。例如,值10
表示在每次轮询时,它尝试处理的文件不超过 10 个。
从故障中恢复
了解适配器的体系结构很重要。有一个获取文件的文件同步器和一个FileReadingMessageSource
为每个同步文件发出消息的文件同步器。如前所述,涉及两个过滤器。属性(filter
和模式)引用远程 (FTP) 文件列表,以避免获取已经获取的文件。local-filter
用于FileReadingMessageSource
确定要作为消息发送的文件。
同步器列出远程文件并查询其过滤器。然后传输文件。如果在文件传输过程中发生 IO 错误,任何已添加到过滤器的文件都将被删除,以便在下次轮询时重新获取它们。这仅适用于过滤器实现ReversibleFileListFilter
(例如AcceptOnceFileListFilter
)的情况。
如果文件同步后,下游流处理文件出错,过滤器不会自动回滚,所以默认不重新处理失败的文件。
如果您希望在失败后重新处理此类文件,可以使用类似于以下的配置来帮助从过滤器中删除失败的文件:
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何ResettableFileListFilter
.
从 5.0 版本开始,入站通道适配器可以在本地构建与生成的本地文件名对应的子目录。这也可以是远程子路径。为了能够根据层次结构支持递归地读取本地目录以进行修改,您现在可以根据算法提供一个FileReadingMessageSource
带有新的内部目录。有关更多信息,请参阅。此外,您现在可以使用选项切换到-based 。它还为所有实例配置,以对本地目录中的任何修改做出反应。前面显示的重新处理示例基于从本地目录中删除文件 ( ) 时执行的内置功能。看RecursiveDirectoryScanner
Files.walk()
AbstractInboundFileSynchronizingMessageSource.setScanner()
AbstractInboundFileSynchronizingMessageSource
WatchService
DirectoryScanner
setUseWatchService()
WatchEventType
FileReadingMessageSource.WatchServiceDirectoryScanner
ResettableFileListFilter.remove()
StandardWatchEventKinds.ENTRY_DELETE
WatchServiceDirectoryScanner
了解更多信息。
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
FTP 流入站通道适配器
4.3 版引入了流式入站通道适配器。该适配器生成带有有效负载类型的消息InputStream
,让文件无需写入本地文件系统即可获取。由于会话保持打开状态,因此消费应用程序负责在文件被消费后关闭会话。会话在closeableResource
标题 ( IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
) 中提供。标准框架组件,例如FileSplitter
and StreamTransformer
,会自动关闭会话。有关这些组件的更多信息,请参阅文件拆分器和流转换器。以下示例显示了如何配置inbound-streaming-channel-adapter
:
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
仅允许filename-pattern
, filename-regex
, filter
, 或之一。filter-expression
从 5.0 版开始,默认情况下,适配器基于 in-memoryFtpStreamingMessageSource 防止远程文件重复。默认情况下,此过滤器也适用于文件名模式(或正则表达式)。如果需要允许重复,可以使用. 任何其他用例都可以由(或)处理。Java 配置(稍后在文档中)显示了一种在处理后删除远程文件以避免重复的技术。
FtpPersistentAcceptOnceFileListFilter SimpleMetadataStore AcceptAllFileListFilter CompositeFileListFilter ChainFileListFilter |
有关FtpPersistentAcceptOnceFileListFilter
以及如何使用它的更多信息,请参阅远程持久文件列表过滤器。
当需要获取时,使用该max-fetch-size
属性限制每次轮询获取的文件数。在集群环境中运行时将其设置为1
并使用持久过滤器。有关详细信息,请参阅入站通道适配器:控制远程文件获取。
适配器将远程目录和文件名分别放在FileHeaders.REMOTE_DIRECTORY
和FileHeaders.REMOTE_FILE
标头中。从 5.0 版开始,FileHeaders.REMOTE_FILE_INFO
标头提供了额外的远程文件信息(默认以 JSON 表示)。如果在to上设置fileInfoJson
属性,则标头包含一个对象。使用该方法可以访问底层 Apache Net 库提供的对象。当您使用 XML 配置时,该属性不可用,但您可以通过将其注入到您的配置类之一中来设置它。另请参阅远程文件信息。FtpStreamingMessageSource
false
FtpFileInfo
FTPFile
FtpFileInfo.getFileInfo()
fileInfoJson
FtpStreamingMessageSource
从 5.1 版开始, 的泛型类型comparator
是FTPFile
. 以前,它是AbstractFileInfo<FTPFile>
. 这是因为排序现在在处理的早期执行,在过滤和应用之前maxFetch
。
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
请注意,在此示例中,转换器下游的消息处理程序具有在处理后删除远程文件的建议。
入站通道适配器:轮询多个服务器和目录
从版本 5.0.7 开始,RotatingServerAdvice
可用;当配置为轮询建议时,入站适配器可以轮询多个服务器和目录。像往常一样配置建议并将其添加到轮询器的建议链中。ADelegatingSessionFactory
用于选择服务器,请参阅委托会话工厂了解更多信息。建议配置由RotationPolicy.KeyDirectory
对象列表组成。
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
此建议将轮询foo
服务器上的目录,one
直到不存在新文件,然后移至目录bar
,然后移至baz
服务器上的目录two
等。
可以使用fair
构造函数 arg 修改此默认行为:
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
在这种情况下,建议将移动到下一个服务器/目录,而不管之前的轮询是否返回了文件。
或者,您可以根据需要提供自己RotationPolicy
的重新配置消息源:
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
和
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
local-filename-generator-expression
属性(在localFilenameGeneratorExpression
同步器上)现在可以包含#remoteDirectory
变量。这允许从不同目录检索的文件下载到本地类似目录:
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(Ftp.inboundAdapter(sf())
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
TaskExecutor 使用此建议时
不要在轮询器上配置 a ;有关详细信息,请参阅消息源的条件轮询器。
|
入站通道适配器:控制远程文件获取
配置入站通道适配器时应考虑两个属性。
max-messages-per-poll
,与所有轮询器一样,可用于限制每次轮询时发出的消息数量(如果准备好配置的值以上)。
max-fetch-size
(从 5.0 版开始)可以限制一次从远程服务器检索的文件数量。
以下场景假设起始状态是一个空的本地目录:
-
max-messages-per-poll=2
andmax-fetch-size=1
:适配器获取一个文件,发出它,取出下一个文件,发出它,然后休眠直到下一次轮询。 -
max-messages-per-poll=2
andmax-fetch-size=2
):适配器获取两个文件,然后发出每个文件。 -
max-messages-per-poll=2
andmax-fetch-size=4
:适配器最多获取四个文件(如果可用)并发出前两个文件(如果至少有两个)。接下来的两个文件将在下一次轮询时发出。 -
max-messages-per-poll=2
且未max-fetch-size
指定:适配器获取所有远程文件并发出前两个文件(如果至少有两个)。随后的文件在随后的轮询中发出(一次两个)。当所有文件都用完后,再次尝试远程获取,以获取任何新文件。
当您部署应用程序的多个实例时,我们建议使用较小的max-fetch-size , 以避免一个实例“抓取”所有文件并使其他实例挨饿。
|
的另一个用途max-fetch-size
是,如果您想停止获取远程文件但继续处理已经获取的文件。在(以编程方式,使用 JMX 或使用控制总线)上设置maxFetchSize
属性有效地阻止适配器获取更多文件,但让轮询器继续为先前获取的文件发出消息。如果属性更改时轮询器处于活动状态,则更改将在下一次轮询时生效。MessageSource
从 5.1 版开始,同步器可以提供一个Comparator<FTPFile>
. 这在限制使用maxFetchSize
.
FTP 出站通道适配器
FTP 出站通道适配器依赖于MessageHandler
连接到 FTP 服务器并为它在传入消息的有效负载中接收到的每个文件启动 FTP 传输的实现。它还支持文件的多种表示形式,因此您不仅限于java.io.File
-typed 有效负载。FTP 出站通道适配器支持以下负载:
-
java.io.File
: 实际的文件对象 -
byte[]
: 表示文件内容的字节数组 -
java.lang.String
:表示文件内容的文本 -
java.io.InputStream
:要传输到远程文件的数据流 -
org.springframework.core.io.Resource
: 数据传输到远程文件的资源
以下示例显示了如何配置outbound-channel-adapter
:
<int-ftp:outbound-channel-adapter id="ftpOutbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
charset="UTF-8"
remote-file-separator="/"
auto-create-directory="true"
remote-directory-expression="headers['remote_dir']"
temporary-remote-directory-expression="headers['temp_remote_dir']"
filename-generator="fileNameGenerator"
use-temporary-filename="true"
chmod="600"
mode="REPLACE"/>
前面的配置展示了如何使用outbound-channel-adapter
元素配置 FTP 出站通道适配器,同时还为各种属性提供值,例如filename-generator
(o.s.i.file.FileNameGenerator
策略接口的实现)、对 a 的引用session-factory
和其他属性。您还可以查看一些*expression
属性示例,这些示例允许您使用 SpEL 来配置设置,例如remote-directory-expression
、temporary-remote-directory-expression
和remote-filename-generator-expression
(SpEL 的替代品filename-generator
,如前面的示例所示)。与任何允许使用 SpEL 的组件一样,可以通过“payload”和“headers”变量访问有效负载和消息头。有关可用属性的更多详细信息,请参阅架构。
默认情况下,如果没有指定文件名生成器,Spring Integration 使用o.s.i.file.DefaultFileNameGenerator .
根据 中的标头(如果存在)DefaultFileNameGenerator 的值确定文件名,或者,如果 Message 的有效负载已经是 a ,则使用该文件的原始名称。
file_name MessageHeaders java.io.File |
定义某些值(例如remote-directory )可能取决于平台或 FTP 服务器。例如,正如https://forum.spring.io/showthread.php?p=333478&posted=1#post333478remote-directory="/thing1/thing2/" 上报道的那样,在某些平台上,您必须在目录定义的末尾添加一个斜杠(例如,) remote-directory="/thing1/thing2" 。
|
从版本 4.1 开始,您可以指定mode
传输文件的时间。默认情况下,现有文件会被覆盖。模式由FileExistsMode
枚举定义,包括以下值:
-
REPLACE
(默认) -
REPLACE_IF_MODIFIED
-
APPEND
-
APPEND_NO_FLUSH
-
IGNORE
-
FAIL
IGNORE
并且FAIL
不要传输文件。
FAIL
导致抛出异常,同时IGNORE
默默地忽略传输(尽管DEBUG
会生成日志条目)。
5.2 版本引入了该chmod
属性,您可以使用该属性在上传后更改远程文件权限。您可以使用传统的 Unix 八进制格式(例如,600
只允许文件所有者读写)。使用 java 配置适配器时,可以使用setChmodOctal("600")
或setChmod(0600)
。仅当您的 FTP 服务器支持该SITE CHMOD
子命令时才适用。
避免部分写入文件
处理文件传输时出现的常见问题之一是处理部分文件的可能性。也就是说,文件可能在其传输实际完成之前出现在文件系统中。
为了解决这个问题,Spring Integration FTP 适配器使用一种通用算法:文件以临时名称传输,然后在完全传输后重命名。
默认情况下,正在传输的每个文件都出现在文件系统中,并带有一个附加后缀,默认情况下为.writing
. temporary-file-suffix
您可以通过设置属性来更改此后缀。
但是,在某些情况下您可能不想使用此技术(例如,如果服务器不允许重命名文件)。对于这种情况,您可以通过设置use-temporary-file-name
为false
(默认为true
)来禁用此功能。当此属性为false
时,文件将写入其最终名称,并且消费应用程序需要一些其他机制来检测文件是否已完全上传,然后才能访问它。
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpMessageHandler handler = new FtpMessageHandler(ftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message<?> message) {
return "handlerContent.test";
}
});
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public IntegrationFlow ftpOutboundFlow() {
return IntegrationFlows.from("toFtpChannel")
.handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
.useTemporaryFileName(false)
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
.remoteDirectory(this.ftpServer.getTargetFtpDirectory().getName())
).get();
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
FTP 出站网关
FTP 出站网关提供一组有限的命令来与远程 FTP 或 FTPS 服务器交互。支持的命令有:
-
ls
(列出文件) -
nlst
(列出文件名) -
get
(检索文件) -
mget
(检索文件) -
rm
(删除文件) -
mv
(移动/重命名文件) -
put
(发送文件) -
mput
(发送多个文件)
使用ls
命令
ls
列出远程文件并支持以下选项:
-
-1
:检索文件名列表。默认是检索FileInfo
对象列表。 -
-a
: 包括所有文件(包括以'.'开头的文件) -
-f
: 不对列表进行排序 -
-dirs
: 包括目录(默认不包括在内) -
-links
:包括符号链接(默认情况下不包括在内) -
-R
: 递归列出远程目录
此外,还提供了文件名过滤,其方式与inbound-channel-adapter
. 请参阅FTP 入站通道适配器。
操作产生的消息有效负载ls
是文件名列表或FileInfo
对象列表。这些对象提供修改时间、权限和其他详细信息等信息。
该ls
命令所作用的远程目录在file_remoteDirectory
标头中提供。
使用递归选项 ( -R
) 时,fileName
包括任何子目录元素,表示文件的相对路径(相对于远程目录)。如果-dirs
包含该选项,则每个递归目录也作为列表中的一个元素返回。在这种情况下,建议您不要使用该-1
选项,因为您将无法区分文件和目录,而您可以使用FileInfo
对象来做到这一点。
从 4.3 版开始,支持FtpSession
和null
方法。因此,您可以省略该属性。为方便起见,Java 配置有两个没有参数的构造函数。根据 FTP 协议,或、、和命令被视为客户端工作目录。必须提供所有其他命令以根据请求消息评估远程路径。您可以在扩展并实现回调时使用该函数设置工作目录。list()
listNames()
expression
expression
LS
NLST
PUT
MPUT
null
expression
FTPClient.changeWorkingDirectory()
DefaultFtpSessionFactory
postProcessClientAfterConnect()
使用nlst
命令
版本 5 引入了对nlst
命令的支持。
nlst
列出远程文件名并且只支持一个选项:
-
-f
: 不对列表进行排序
操作产生的消息有效负载nlst
是文件名列表。
该nlst
命令所作用的远程目录在file_remoteDirectory
标头中提供。
使用get
命令
get
检索远程文件。它支持以下选项:
-
-P
:保留远程文件的时间戳。 -
-stream
: 以流的形式检索远程文件。 -
-D
: 传输成功后删除远程文件。如果忽略传输,远程文件不会被删除,因为FileExistsMode
它IGNORE
和本地文件已经存在。
标file_remoteDirectory
头提供远程目录名称,file_remoteFile
标头提供文件名。
操作产生的消息有效负载get
是一个File
对象,它表示检索到的文件或InputStream
当您使用该-stream
选项时。该-stream
选项允许将文件作为流检索。对于文本文件,一个常见的用例是将此操作与文件拆分器或流转换器结合使用。将远程文件作为流使用时,您有责任Session
在使用流后关闭。为方便起见,Session
在标题中提供了closeableResource
,您可以使用便捷方法访问它IntegrationMessageHeaderAccessor
。以下示例显示了如何使用便捷方法:
Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}
以下示例显示了如何将文件作为流使用:
<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />
<int-file:splitter input-channel="stream" output-channel="lines" />
如果您在自定义组件中使用输入流,则必须关闭Session . 您可以在自定义代码中执行此操作,也可以通过将消息副本路由到 aservice-activator 并使用 SpEL 来执行此操作,如以下示例所示:
|
<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />
使用mget
命令
mget
基于模式检索多个远程文件并支持以下选项:
-
-P
:保留远程文件的时间戳。 -
-R
:递归检索整个目录树。 -
-x
:如果没有文件与模式匹配,则抛出异常(否则返回空列表)。 -
-D
:成功传输后删除每个远程文件。如果忽略传输,远程文件不会被删除,因为FileExistsMode
它IGNORE
和本地文件已经存在。
操作产生的消息有效负载mget
是一个List<File>
对象(即一个List
对象File
,每个对象代表一个检索到的文件)。
从版本 5.0 开始,如果FileExistsMode 是IGNORE ,则输出消息的有效负载不再包含由于文件已存在而未获取的文件。以前,该列表包含所有文件,包括那些已经存在的文件。
|
用于确定远程路径的表达式应该产生一个以- 结尾的结果,例如
somedir/
将获取 . 下的完整树somedir
。
从 5.0 版本开始,递归mget
结合新FileExistsMode.REPLACE_IF_MODIFIED
模式可用于定期在本地同步整个远程目录树。此模式将本地文件的最后修改时间戳替换为远程时间戳,无论-P
(preserve timestamp) 选项如何。
使用递归 (
-R )该模式被忽略,并被 如果过滤了子目录,则不会执行对该子目录的额外遍历。
通常,您会在 中使用该 |
持久文件列表过滤器现在有一个布尔属性forRecursion
。将此属性设置为true
, 也设置alwaysAcceptDirectories
,这意味着出站网关 (ls
和mget
) 上的递归操作现在将始终遍历整个目录树。这是为了解决未检测到目录树深处更改的问题。此外,forRecursion=true
使文件的完整路径用作元数据存储键;这解决了如果同名文件在不同目录中多次出现时过滤器无法正常工作的问题。重要提示:这意味着将无法为顶级目录下的文件找到持久元数据存储中的现有密钥。为此,该物业false
默认; 这可能会在未来的版本中改变。
从 5.0 版开始,FtpSimplePatternFileListFilter
和FtpRegexPatternFileListFilter
可以通过将alwaysAcceptDirectories
属性设置为 来配置为始终传递目录true
。这样做允许对简单模式进行递归,如以下示例所示:
<bean id="starDotTxtFilter"
class="org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter">
<constructor-arg value="*.txt" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
<bean id="dotStarDotTxtFilter"
class="org.springframework.integration.ftp.filters.FtpRegexPatternFileListFilter">
<constructor-arg value="^.*\.txt$" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
一旦您定义了如前面示例中的过滤器,您可以通过filter
在网关上设置属性来使用一个。
另请参阅出站网关部分成功(mget
和mput
)。
使用put
命令
commad将put
文件发送到远程服务器。消息的有效负载可以是 a java.io.File
、 abyte[]
或 a String
。A remote-filename-generator
(或表达式)用于命名远程文件。其他可用属性包括remote-directory
,temporary-remote-directory
和它们的*-expression
等价物:use-temporary-file-name
和auto-create-directory
. 有关更多信息,请参阅架构文档。
put
操作产生的消息有效负载是String
表示传输后服务器上文件的完整路径的 a。
5.2 版本引入了该chmod
属性,该属性会在上传后更改远程文件的权限。您可以使用传统的 Unix 八进制格式(例如,600
只允许文件所有者读写)。使用 java 配置适配器时,可以使用setChmod(0600)
. 仅当您的 FTP 服务器支持该SITE CHMOD
子命令时才适用。
使用mput
命令
将mput
多个文件发送到服务器并且仅支持一个选项:
-
-R
: 递归。发送目录及其子目录中的所有文件(可能已过滤)。
消息有效负载必须是表示本地目录的java.io.File
(或)。从 5.1 版开始,也支持orString
的集合。File
String
put
此命令支持与command相同的属性。此外,可以使用 、 、 或 之一过滤本地目录mput-pattern
中mput-regex
的mput-filter
文件mput-filter-expression
。只要子目录本身通过过滤器,过滤器就可以使用递归。不通过过滤器的子目录不会被递归。
由操作产生的消息有效负载mput
是一个List<String>
对象(即,List
由传输产生的远程文件路径的一个)。
另请参阅出站网关部分成功(mget
和mput
)。
5.2 版引入了该chmod
属性,可让您在上传后更改远程文件权限。您可以使用传统的 Unix 八进制格式(例如,600
只允许文件所有者读写)。使用 Java 配置适配器时,可以使用setChmodOctal("600")
或setChmod(0600)
。仅当您的 FTP 服务器支持该SITE CHMOD
子命令时才适用。
使用rm
命令
该rm
命令删除文件。
该rm
命令没有选项。
rm
操作产生的消息有效负载是Boolean.TRUE
删除是否成功Boolean.FALSE
。标file_remoteDirectory
头提供远程目录,file_remoteFile
标头提供文件名。
使用mv
命令
该mv
命令移动文件。
该mv
命令没有选项。
属性定义“ expression
from”路径,rename-expression
属性定义“to”路径。默认情况下,rename-expression
是headers['file_renameTo']
. 此表达式的计算结果不得为 null 或空String
。如有必要,将创建任何必要的远程目录。结果消息的有效负载是Boolean.TRUE
。标file_remoteDirectory
头提供原始远程目录,file_remoteFile
标头提供文件名。新路径在file_renameTo
标题中。
从版本 5.5.6 开始,为了方便,remoteDirectoryExpression
可以在命令中使用。mv
如果“from”文件不是完整的文件路径,则将结果remoteDirectoryExpression
用作远程目录。这同样适用于“to”文件,例如,如果任务只是重命名某个目录中的远程文件。
有关 FTP 出站网关命令的附加信息
get
andmget
命令支持该属性local-filename-generator-expression
。它定义了一个 SpEL 表达式以在传输过程中生成本地文件的名称。评估上下文的根对象是请求消息。remoteFileName
对尤其有用的变量mget
也可用,例如local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.something"
。
get
andmget
命令支持该属性local-directory-expression
。它定义了一个 SpEL 表达式以在传输过程中生成本地目录的名称。评估上下文的根对象是请求消息但是。remoteDirectory
对尤其有用的变量mget
也可用 - 例如:local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.something"
。该属性与该属性互斥local-directory
。
对于所有命令,网关的“表达式”属性提供了命令作用的路径。对于mget
命令,表达式的计算结果可能为 ' ',意思是检索所有文件,或 'somedirectory/ ',等等。
以下示例显示了为ls
命令配置的网关:
<int-ftp:outbound-gateway id="gateway1"
session-factory="ftpSessionFactory"
request-channel="inbound1"
command="ls"
command-options="-1"
expression="payload"
reply-channel="toSplitter"/>
发送到toSplitter
通道的消息的有效负载是一个对象列表String
,每个对象都包含一个文件的名称。如果command-options
省略该属性,则它包含FileInfo
对象。它使用空格分隔的选项——例如,command-options="-1 -dirs -links"
.
从 4.2 版开始GET
,MGET
、PUT
和MPUT
命令支持FileExistsMode
属性(mode
使用命名空间支持时)。这会影响本地文件存在 ( GET
and MGET
) 或远程文件存在 ( PUT
and MPUT
) 时的行为。支持的模式是REPLACE
、APPEND
、FAIL
和IGNORE
。为了向后兼容,PUT
和MPUT
操作的默认模式是REPLACE
. 对于GET
和MGET
运算,默认为FAIL
。
从版本 5.0 开始,在 (in XML) 上提供了setWorkingDirExpression()
( working-dir-expression
in XML)FtpOutboundGateway
选项<int-ftp:outbound-gateway>
。它允许您在运行时更改客户端工作目录。根据请求消息评估表达式。每次网关操作后都会恢复以前的工作目录。
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站网关的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpOutboundGateway ftpOutboundGateway =
new FtpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
ftpOutboundGateway.setOutputChannelName("lsReplyChannel");
return ftpOutboundGateway;
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站网关的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpOutboundGatewaySpec ftpOutboundGateway() {
return Ftp.outboundGateway(ftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
.regexFileNameFilter("(subFtpSource|.*1.txt)")
.localDirectoryExpression("'localDirectory/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replaceFirst('ftpSource', 'localTarget')");
}
@Bean
public IntegrationFlow ftpMGetFlow(AbstractRemoteFileOutboundGateway<FTPFile> ftpOutboundGateway) {
return f -> f
.handle(ftpOutboundGateway)
.channel(c -> c.queue("remoteFileOutputChannel"));
}
}
出站网关部分成功(mget
和mput
)
当您对多个文件执行操作时(通过使用mget
and mput
),在传输一个或多个文件后的某个时间可能会发生异常。在这种情况下(从 4.2 版开始),PartialSuccessException
会抛出 a。除了通常的MessagingException
属性 (failedMessage
和cause
),此异常还有两个附加属性:
-
partialResults
: 传输成功的结果。 -
derivedInput
:从请求消息生成的文件列表(例如,要传输的本地文件mput
)。
这些属性可让您确定哪些文件已成功传输,哪些未成功传输。
在递归的情况下mput
,PartialSuccessException
可能有嵌套的PartialSuccessException
出现。
考虑以下目录结构:
root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt
如果异常发生在file3.txt
,网关抛出的PartialSuccessException
有derivedInput
of file1.txt
、subdir
、zoo.txt
和partialResults
。file1.txt
它是cause
另一个PartialSuccessException
与derivedInput
offile2.txt
和file3.txt
of
partialResults
。file2.txt
FTP 会话缓存
从 Spring Integration 3.0 开始,默认情况下不再缓存会话。cache-sessions 端点不再支持该属性。如果您希望缓存会话,则必须使用CachingSessionFactory (在下一个示例中显示)。
|
在 3.0 之前的版本中,默认情况下会自动缓存会话。有一个cache-sessions
属性可用于禁用自动缓存,但该解决方案没有提供配置其他会话缓存属性的方法。例如,您不能限制创建的会话数。为了支持该要求和其他配置选项,CachingSessionFactory
添加了一个。它提供sessionCacheSize
和sessionWaitTimeout
属性。该sessionCacheSize
属性控制工厂在其缓存中维护的活动会话数(默认为无界)。如果sessionCacheSize
已达到阈值,则任何获取另一个会话的尝试都会阻塞,直到其中一个缓存会话变得可用或直到会话的等待时间到期(默认等待时间为Integer.MAX_VALUE
)。这sessionWaitTimeout
属性配置该值。
如果您希望缓存会话,请按照前面所述配置默认会话工厂,然后将其包装在 的实例中CachingSessionFactory
,您可以在其中提供这些附加属性。以下示例显示了如何执行此操作:
<bean id="ftpSessionFactory" class="o.s.i.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory" class="o.s.i.file.remote.session.CachingSessionFactory">
<constructor-arg ref="ftpSessionFactory"/>
<constructor-arg value="10"/>
<property name="sessionWaitTimeout" value="1000"/>
</bean>
前面的示例显示了一个CachingSessionFactory
createdsessionCacheSize
设置为10
和
sessionWaitTimeout
设置为一秒(其值以毫秒为单位)。
从 Spring Integration 3.0 开始,CachingConnectionFactory
提供了一个resetCache()
方法。调用时,所有空闲会话都立即关闭,使用中的会话在返回缓存时关闭。新的会话请求会根据需要建立新的会话。
从 5.1 版开始,CachingSessionFactory
有一个新属性testSession
。当为 true 时,会话将通过发送 NOOP 命令进行测试,以确保它仍然处于活动状态;如果没有,它将从缓存中删除;如果缓存中没有活动会话,则会创建一个新会话。
使用RemoteFileTemplate
从 Spring Integration 3.0 开始,为对象提供了一个新的抽象FtpSession
。该模板提供了发送、检索(作为InputStream
)、删除和重命名文件的方法。此外,execute
还提供了一种方法,允许调用者在会话上执行多个操作。在所有情况下,模板都会负责可靠地关闭会话。有关更多信息,请
参阅RemoteFileTemplate
. FTP 有一个子类:FtpRemoteFileTemplate
.
4.1 版添加了额外的方法,包括getClientInstance()
,它提供对底层的访问FTPClient
,从而使您可以访问低级 API。
并非所有 FTP 服务器都能正确执行该STAT <path>
命令。有些返回不存在路径的肯定结果。NLST
当路径是文件并且存在时,该命令可靠地返回名称。但是,这不支持检查是否存在空目录,因为NLST
当路径是目录时总是返回一个空列表。由于模板不知道路径是否代表目录,因此当路径似乎不存在时(使用 时NLST
),它必须执行额外的检查。这增加了开销,需要向服务器发出多个请求。从版本 4.1.9 开始,FtpRemoteFileTemplate
提供了FtpRemoteFileTemplate.ExistsMode
属性,该属性具有以下选项:
-
STAT
: 执行STAT
FTP 命令 (FTPClient.getStatus(path)
) 以检查路径是否存在。这是默认设置,需要您的 FTP 服务器正确支持该STAT
命令(带有路径)。 -
NLST
:执行NLST
FTP 命令 —FTPClient.listName(path)
。如果您要测试的路径是文件的完整路径,请使用此选项。它不适用于空目录。 -
NLST_AND_DIRS
:首先执行NLST
命令,如果它没有返回文件,则回退到使用 . 临时切换工作目录的技术FTPClient.changeWorkingDirectory(path)
。有关FtpSession.exists()
更多信息,请参阅。
因为我们知道FileExistsMode.FAIL
case 总是只查找文件(而不是目录),所以我们安全地为and组件使用NLST
模式。FtpMessageHandler
FtpOutboundGateway
对于任何其他情况,FtpRemoteFileTemplate
可以扩展以在重写的exist()
方法中实现自定义逻辑。
从 5.0 版开始,新RemoteFileOperations.invoke(OperationsCallback<F, T> action)
方法可用。此方法允许RemoteFileOperations
在同一个线程有界的范围内调用多个调用Session
。当您需要将多个高级操作RemoteFileTemplate
作为一个工作单元执行时,这很有用。例如,AbstractRemoteFileOutboundGateway
将它与mput
命令实现一起使用,我们put
对提供的目录中的每个文件执行操作,并递归地对其子目录执行操作。有关更多信息,请参阅Javadoc。
使用MessageSessionCallback
从 Spring Integration 4.2 开始,您可以使用MessageSessionCallback<F, T>
带有
<int-ftp:outbound-gateway/>
(FtpOutboundGateway
在 Java 中)的实现来对Session<FTPFile>
带有requestMessage
上下文的任何操作。它可用于任何非标准或低级 FTP 操作,并允许从集成流定义和功能接口 (Lambda) 实现注入进行访问,如以下示例所示:
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler ftpOutboundGateway(SessionFactory<FTPFile> sessionFactory) {
return new FtpOutboundGateway(sessionFactory,
(session, requestMessage) -> session.list(requestMessage.getPayload()));
}
另一个示例可能是对正在发送或检索的文件数据进行预处理或后处理。
使用 XML 配置时,<int-ftp:outbound-gateway/>
提供了一个session-callback
属性让您指定MessageSessionCallback
bean 名称。
与和属性session-callback 互斥。使用 Java 进行配置时,类中可以使用不同的构造函数。
command expression FtpOutboundGateway |
Apache Mina FTP 服务器事件
在 5.2 版中添加的ApacheMinaFtplet
,侦听某些 Apache Mina FTP 服务器事件并将它们发布为ApplicationEvent
s,可以由任何ApplicationListener
bean、@EventListener
bean 方法或Event Inbound Channel Adapter接收。
目前支持的事件有:
-
SessionOpenedEvent
- 客户端会话已打开 -
DirectoryCreatedEvent
- 创建了一个目录 -
FileWrittenEvent
- 一个文件被写入 -
PathMovedEvent
- 文件或目录被重命名 -
PathRemovedEvent
- 删除了文件或目录 -
SessionClosedEvent
- 客户端已断开连接
这些中的每一个都是ApacheMinaFtpEvent
; 您可以配置单个侦听器来接收所有事件类型。每个事件的source
属性是一个FtpSession
,可以从中获取客户端地址等信息;在抽象事件上提供了一种方便的getSession()
方法。
会话打开/关闭以外的事件具有另一个属性FtpRequest
,该属性具有命令和参数等属性。
要使用侦听器(必须是 Spring bean)配置服务器,请将其添加到服务器工厂:
FtpServerFactory serverFactory = new FtpServerFactory();
...
ListenerFactory factory = new ListenerFactory();
...
serverFactory.addListener("default", factory.createListener());
serverFactory.setFtplets(new HashMap<>(Collections.singletonMap("springFtplet", apacheMinaFtpletBean)));
server = serverFactory.createServer();
server.start();
要使用 Spring Integration 事件适配器使用这些事件:
@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaFtpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}
远程文件信息
从版本 5.2 开始,FtpStreamingMessageSource
( FTP 流入站通道适配器)、FtpInboundFileSynchronizingMessageSource
( FTP 入站通道适配器FtpOutboundGateway
) 和( FTP 出站网关)的“读取”命令在消息中提供了额外的标头,以生成有关远程文件的信息:
-
FileHeaders.REMOTE_HOST_PORT
- 在文件传输操作期间远程会话已连接到的主机:端口对; -
FileHeaders.REMOTE_DIRECTORY
- 已执行操作的远程目录; -
FileHeaders.REMOTE_FILE
- 远程文件名;仅适用于单个文件操作。
由于FtpInboundFileSynchronizingMessageSource
不生成针对远程文件的消息,而是使用本地副本,因此在同步操作期间,AbstractInboundFileSynchronizer
将有关远程文件的信息存储MetadataStore
在 URI 样式 ( ) 中(可以在外部配置) 。protocol://host:port/remoteDirectory#remoteFileName
此元数据由FtpInboundFileSynchronizingMessageSource
轮询本地文件时检索。当本地文件被删除时,建议删除其元数据条目。为此目的提供AbstractInboundFileSynchronizer
回调。removeRemoteFileMetadata()
此外setMetadataStorePrefix()
,元数据键中还有一个要使用的键。当相同的实例在这些组件之间共享时,建议将此前缀与MetadataStore
基于- 的实现中使用的前缀不同,以避免条目覆盖,因为过滤器和FileListFilter
MetadataStore
AbstractInboundFileSynchronizer
元数据条目键使用相同的本地文件名。