Skip to content

Commit 053cc00

Browse files
GH-3557: Add maxDepth, dirPredicate to FileReadMS (#8596)
* GH-3557: Add maxDepth, dirPredicate to FileReadMS Fixes #3557 * Expose a `watchMaxDepth` on the `FileReadingMessageSource` for its `Files.walkFileTree()` API usage * Add `watchDirPredicate` option ot the `FileReadingMessageSource` to skip sub-tree for `Files.walkFileTree()` scanning according to some condition against directory `Path` * Fix language in docs Co-authored-by: Gary Russell <[email protected]> --------- Co-authored-by: Gary Russell <[email protected]>
1 parent aaaa489 commit 053cc00

File tree

5 files changed

+159
-77
lines changed

5 files changed

+159
-77
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/FileReadingMessageSource.java

+69-34
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.nio.file.FileSystems;
2222
import java.nio.file.FileVisitResult;
23+
import java.nio.file.FileVisitor;
2324
import java.nio.file.Files;
2425
import java.nio.file.Path;
2526
import java.nio.file.SimpleFileVisitor;
@@ -29,6 +30,7 @@
2930
import java.nio.file.WatchService;
3031
import java.nio.file.attribute.BasicFileAttributes;
3132
import java.util.Arrays;
33+
import java.util.Collections;
3234
import java.util.Comparator;
3335
import java.util.Iterator;
3436
import java.util.LinkedHashSet;
@@ -39,6 +41,7 @@
3941
import java.util.concurrent.ConcurrentMap;
4042
import java.util.concurrent.PriorityBlockingQueue;
4143
import java.util.concurrent.atomic.AtomicBoolean;
44+
import java.util.function.Predicate;
4245
import java.util.regex.Matcher;
4346

4447
import org.springframework.context.Lifecycle;
@@ -122,6 +125,10 @@ public class FileReadingMessageSource extends AbstractMessageSource<File> implem
122125

123126
private WatchEventType[] watchEvents = {WatchEventType.CREATE};
124127

128+
private int watchMaxDepth = Integer.MAX_VALUE;
129+
130+
private Predicate<Path> watchDirPredicate = path -> true;
131+
125132
/**
126133
* Create a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
127134
*/
@@ -237,15 +244,14 @@ public void setLocker(FileLocker locker) {
237244
* Set this flag if you want to make sure the internal queue is
238245
* refreshed with the latest content of the input directory on each poll.
239246
* <p>
240-
* By default this implementation will empty its queue before looking at the
247+
* By default, this implementation will empty its queue before looking at the
241248
* directory again. In cases where order is relevant it is important to
242249
* consider the effects of setting this flag. The internal
243250
* {@link java.util.concurrent.BlockingQueue} that this class is keeping
244251
* will more likely be out of sync with the file system if this flag is set
245252
* to false, but it will change more often (causing expensive reordering) if it is set to true.
246-
* @param scanEachPoll
247-
* whether or not the component should re-scan (as opposed to not
248-
* rescanning until the entire backlog has been delivered)
253+
* @param scanEachPoll whether the component should re-scan (as opposed to not
254+
* rescanning until the entire backlog has been delivered)
249255
*/
250256
public void setScanEachPoll(boolean scanEachPoll) {
251257
this.scanEachPoll = scanEachPoll;
@@ -282,6 +288,28 @@ public void setWatchEvents(WatchEventType... watchEvents) {
282288
this.watchEvents = Arrays.copyOf(watchEvents, watchEvents.length);
283289
}
284290

291+
/**
292+
* Set a max depth for the {@link Files#walkFileTree(Path, Set, int, FileVisitor)} API when
293+
* {@link #useWatchService} is enabled.
294+
* Defaults to {@link Integer#MAX_VALUE} - walk the whole tree.
295+
* @param watchMaxDepth the depth for {@link Files#walkFileTree(Path, Set, int, FileVisitor)}.
296+
* @since 6.1
297+
*/
298+
public void setWatchMaxDepth(int watchMaxDepth) {
299+
this.watchMaxDepth = watchMaxDepth;
300+
}
301+
302+
/**
303+
* Set a {@link Predicate} to check a directory in the {@link Files#walkFileTree(Path, Set, int, FileVisitor)} call
304+
* if it is eligible for {@link WatchService}.
305+
* @param watchDirPredicate the {@link Predicate} to check dirs for walking.
306+
* @since 6.1
307+
*/
308+
public void setWatchDirPredicate(Predicate<Path> watchDirPredicate) {
309+
Assert.notNull(watchDirPredicate, "'watchDirPredicate' must not be null.");
310+
this.watchDirPredicate = watchDirPredicate;
311+
}
312+
285313
@Override
286314
public String getComponentType() {
287315
return "file:inbound-channel-adapter";
@@ -299,16 +327,16 @@ public void start() {
299327
() -> "Source path [" + this.directory + "] does not point to a directory.");
300328
Assert.isTrue(this.directory.canRead(),
301329
() -> "Source directory [" + this.directory + "] is not readable.");
302-
if (this.scanner instanceof Lifecycle) {
303-
((Lifecycle) this.scanner).start();
330+
if (this.scanner instanceof Lifecycle lifecycle) {
331+
lifecycle.start();
304332
}
305333
}
306334
}
307335

308336
@Override
309337
public void stop() {
310-
if (this.running.getAndSet(false) && this.scanner instanceof Lifecycle) {
311-
((Lifecycle) this.scanner).stop();
338+
if (this.running.getAndSet(false) && this.scanner instanceof Lifecycle lifecycle) {
339+
lifecycle.stop();
312340
}
313341
}
314342

@@ -418,8 +446,8 @@ private class WatchServiceDirectoryScanner extends DefaultDirectoryScanner imple
418446

419447
@Override
420448
public void setFilter(FileListFilter<File> filter) {
421-
if (filter instanceof DiscardAwareFileListFilter) {
422-
((DiscardAwareFileListFilter<File>) filter).addDiscardCallback(this.filesToPoll::add);
449+
if (filter instanceof DiscardAwareFileListFilter<File> discardAwareFileListFilter) {
450+
discardAwareFileListFilter.addDiscardCallback(this.filesToPoll::add);
423451
}
424452
super.setFilter(filter);
425453
}
@@ -505,8 +533,8 @@ private void processFilesFromNormalEvent(Set<File> files, File parentDir, WatchE
505533
logger.debug(() -> "Watch event [" + event.kind() + "] for file [" + file + "]");
506534

507535
if (StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind())) {
508-
if (getFilter() instanceof ResettableFileListFilter) {
509-
((ResettableFileListFilter<File>) getFilter()).remove(file);
536+
if (getFilter() instanceof ResettableFileListFilter<File> resettableFileListFilter) {
537+
resettableFileListFilter.remove(file);
510538
}
511539
boolean fileRemoved = files.remove(file);
512540
if (fileRemoved) {
@@ -540,8 +568,8 @@ private void processFilesFromOverflowEvent(Set<File> files, WatchEvent<?> event)
540568
}
541569
this.pathKeys.clear();
542570

543-
if (event.context() != null && event.context() instanceof Path) {
544-
files.addAll(walkDirectory((Path) event.context(), event.kind()));
571+
if (event.context() != null && event.context() instanceof Path path) {
572+
files.addAll(walkDirectory(path, event.kind()));
545573
}
546574
else {
547575
files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath(), event.kind()));
@@ -552,25 +580,32 @@ private Set<File> walkDirectory(Path directory, final WatchEvent.Kind<?> kind) {
552580
final Set<File> walkedFiles = new LinkedHashSet<>();
553581
try {
554582
registerWatch(directory);
555-
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
556-
557-
@Override
558-
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
559-
FileVisitResult fileVisitResult = super.preVisitDirectory(dir, attrs);
560-
registerWatch(dir);
561-
return fileVisitResult;
562-
}
563-
564-
@Override
565-
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
566-
FileVisitResult fileVisitResult = super.visitFile(file, attrs);
567-
if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) {
568-
walkedFiles.add(file.toFile());
569-
}
570-
return fileVisitResult;
571-
}
572-
573-
});
583+
Files.walkFileTree(directory, Collections.emptySet(), FileReadingMessageSource.this.watchMaxDepth,
584+
new SimpleFileVisitor<>() {
585+
586+
@Override
587+
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
588+
throws IOException {
589+
590+
if (FileReadingMessageSource.this.watchDirPredicate.test(dir)) {
591+
registerWatch(dir);
592+
return FileVisitResult.CONTINUE;
593+
}
594+
else {
595+
return FileVisitResult.SKIP_SUBTREE;
596+
}
597+
}
598+
599+
@Override
600+
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
601+
FileVisitResult fileVisitResult = super.visitFile(file, attrs);
602+
if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) {
603+
walkedFiles.add(file.toFile());
604+
}
605+
return fileVisitResult;
606+
}
607+
608+
});
574609
}
575610
catch (IOException ex) {
576611
logger.error(ex, () -> "Failed to walk directory: " + directory.toString());

spring-integration-file/src/main/java/org/springframework/integration/file/dsl/FileInboundChannelAdapterSpec.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,10 +17,12 @@
1717
package org.springframework.integration.file.dsl;
1818

1919
import java.io.File;
20+
import java.nio.file.Path;
2021
import java.util.Collections;
2122
import java.util.Comparator;
2223
import java.util.Map;
2324
import java.util.function.Function;
25+
import java.util.function.Predicate;
2426

2527
import org.springframework.beans.DirectFieldAccessor;
2628
import org.springframework.integration.dsl.ComponentsRegistration;
@@ -276,6 +278,32 @@ public FileInboundChannelAdapterSpec watchEvents(FileReadingMessageSource.WatchE
276278
return this;
277279
}
278280

281+
/**
282+
* Set a depth for files walk API.
283+
* @param watchMaxDepth the depth for files walk.
284+
* @return the spec.
285+
* @since 6.1
286+
* @see #useWatchService
287+
* @see FileReadingMessageSource#setWatchMaxDepth(int)
288+
*/
289+
public FileInboundChannelAdapterSpec watchMaxDepth(int watchMaxDepth) {
290+
target.setWatchMaxDepth(watchMaxDepth);
291+
return this;
292+
}
293+
294+
/**
295+
* Set a {@link Predicate} to check if it is eligible for {@link java.nio.file.WatchService}.
296+
* @param watchDirPredicate the {@link Predicate} to check dirs for walking.
297+
* @return the spec.
298+
* @since 6.1
299+
* @see #useWatchService
300+
* @see FileReadingMessageSource#setWatchDirPredicate(Predicate)
301+
*/
302+
public FileInboundChannelAdapterSpec watchDirPredicate(Predicate<Path> watchDirPredicate) {
303+
target.setWatchDirPredicate(watchDirPredicate);
304+
return this;
305+
}
306+
279307
@Override
280308
public Map<Object, String> getComponentsToRegister() {
281309
if (this.scanner == null || this.filtersSet) {

0 commit comments

Comments
 (0)