Skip to content

GH-3557: Add maxDepth, dirPredicate to FileReadMS #8596

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
Expand All @@ -29,6 +30,7 @@
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashSet;
Expand All @@ -39,6 +41,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Matcher;

import org.springframework.context.Lifecycle;
Expand Down Expand Up @@ -122,6 +125,10 @@ public class FileReadingMessageSource extends AbstractMessageSource<File> implem

private WatchEventType[] watchEvents = {WatchEventType.CREATE};

private int watchMaxDepth = Integer.MAX_VALUE;

private Predicate<Path> watchDirPredicate = path -> true;

/**
* Create a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
*/
Expand Down Expand Up @@ -237,15 +244,14 @@ public void setLocker(FileLocker locker) {
* Set this flag if you want to make sure the internal queue is
* refreshed with the latest content of the input directory on each poll.
* <p>
* By default this implementation will empty its queue before looking at the
* By default, this implementation will empty its queue before looking at the
* directory again. In cases where order is relevant it is important to
* consider the effects of setting this flag. The internal
* {@link java.util.concurrent.BlockingQueue} that this class is keeping
* will more likely be out of sync with the file system if this flag is set
* to false, but it will change more often (causing expensive reordering) if it is set to true.
* @param scanEachPoll
* whether or not the component should re-scan (as opposed to not
* rescanning until the entire backlog has been delivered)
* @param scanEachPoll whether the component should re-scan (as opposed to not
* rescanning until the entire backlog has been delivered)
*/
public void setScanEachPoll(boolean scanEachPoll) {
this.scanEachPoll = scanEachPoll;
Expand Down Expand Up @@ -282,6 +288,28 @@ public void setWatchEvents(WatchEventType... watchEvents) {
this.watchEvents = Arrays.copyOf(watchEvents, watchEvents.length);
}

/**
* Set a max depth for the {@link Files#walkFileTree(Path, Set, int, FileVisitor)} API when
* {@link #useWatchService} is enabled.
* Defaults to {@link Integer#MAX_VALUE} - walk the whole tree.
* @param watchMaxDepth the depth for {@link Files#walkFileTree(Path, Set, int, FileVisitor)}.
* @since 6.1
*/
public void setWatchMaxDepth(int watchMaxDepth) {
this.watchMaxDepth = watchMaxDepth;
}

/**
* Set a {@link Predicate} to check a directory in the {@link Files#walkFileTree(Path, Set, int, FileVisitor)} call
* if it is eligible for {@link WatchService}.
* @param watchDirPredicate the {@link Predicate} to check dirs for walking.
* @since 6.1
*/
public void setWatchDirPredicate(Predicate<Path> watchDirPredicate) {
Assert.notNull(watchDirPredicate, "'watchDirPredicate' must not be null.");
this.watchDirPredicate = watchDirPredicate;
}

@Override
public String getComponentType() {
return "file:inbound-channel-adapter";
Expand All @@ -299,16 +327,16 @@ public void start() {
() -> "Source path [" + this.directory + "] does not point to a directory.");
Assert.isTrue(this.directory.canRead(),
() -> "Source directory [" + this.directory + "] is not readable.");
if (this.scanner instanceof Lifecycle) {
((Lifecycle) this.scanner).start();
if (this.scanner instanceof Lifecycle lifecycle) {
lifecycle.start();
}
}
}

@Override
public void stop() {
if (this.running.getAndSet(false) && this.scanner instanceof Lifecycle) {
((Lifecycle) this.scanner).stop();
if (this.running.getAndSet(false) && this.scanner instanceof Lifecycle lifecycle) {
lifecycle.stop();
}
}

Expand Down Expand Up @@ -418,8 +446,8 @@ private class WatchServiceDirectoryScanner extends DefaultDirectoryScanner imple

@Override
public void setFilter(FileListFilter<File> filter) {
if (filter instanceof DiscardAwareFileListFilter) {
((DiscardAwareFileListFilter<File>) filter).addDiscardCallback(this.filesToPoll::add);
if (filter instanceof DiscardAwareFileListFilter<File> discardAwareFileListFilter) {
discardAwareFileListFilter.addDiscardCallback(this.filesToPoll::add);
}
super.setFilter(filter);
}
Expand Down Expand Up @@ -505,8 +533,8 @@ private void processFilesFromNormalEvent(Set<File> files, File parentDir, WatchE
logger.debug(() -> "Watch event [" + event.kind() + "] for file [" + file + "]");

if (StandardWatchEventKinds.ENTRY_DELETE.equals(event.kind())) {
if (getFilter() instanceof ResettableFileListFilter) {
((ResettableFileListFilter<File>) getFilter()).remove(file);
if (getFilter() instanceof ResettableFileListFilter<File> resettableFileListFilter) {
resettableFileListFilter.remove(file);
}
boolean fileRemoved = files.remove(file);
if (fileRemoved) {
Expand Down Expand Up @@ -540,8 +568,8 @@ private void processFilesFromOverflowEvent(Set<File> files, WatchEvent<?> event)
}
this.pathKeys.clear();

if (event.context() != null && event.context() instanceof Path) {
files.addAll(walkDirectory((Path) event.context(), event.kind()));
if (event.context() != null && event.context() instanceof Path path) {
files.addAll(walkDirectory(path, event.kind()));
}
else {
files.addAll(walkDirectory(FileReadingMessageSource.this.directory.toPath(), event.kind()));
Expand All @@ -552,25 +580,32 @@ private Set<File> walkDirectory(Path directory, final WatchEvent.Kind<?> kind) {
final Set<File> walkedFiles = new LinkedHashSet<>();
try {
registerWatch(directory);
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
FileVisitResult fileVisitResult = super.preVisitDirectory(dir, attrs);
registerWatch(dir);
return fileVisitResult;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
FileVisitResult fileVisitResult = super.visitFile(file, attrs);
if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) {
walkedFiles.add(file.toFile());
}
return fileVisitResult;
}

});
Files.walkFileTree(directory, Collections.emptySet(), FileReadingMessageSource.this.watchMaxDepth,
new SimpleFileVisitor<>() {

@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {

if (FileReadingMessageSource.this.watchDirPredicate.test(dir)) {
registerWatch(dir);
return FileVisitResult.CONTINUE;
}
else {
return FileVisitResult.SKIP_SUBTREE;
}
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
FileVisitResult fileVisitResult = super.visitFile(file, attrs);
if (!StandardWatchEventKinds.ENTRY_MODIFY.equals(kind)) {
walkedFiles.add(file.toFile());
}
return fileVisitResult;
}

});
}
catch (IOException ex) {
logger.error(ex, () -> "Failed to walk directory: " + directory.toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,10 +17,12 @@
package org.springframework.integration.file.dsl;

import java.io.File;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.integration.dsl.ComponentsRegistration;
Expand Down Expand Up @@ -276,6 +278,32 @@ public FileInboundChannelAdapterSpec watchEvents(FileReadingMessageSource.WatchE
return this;
}

/**
* Set a depth for files walk API.
* @param watchMaxDepth the depth for files walk.
* @return the spec.
* @since 6.1
* @see #useWatchService
* @see FileReadingMessageSource#setWatchMaxDepth(int)
*/
public FileInboundChannelAdapterSpec watchMaxDepth(int watchMaxDepth) {
target.setWatchMaxDepth(watchMaxDepth);
return this;
}

/**
* Set a {@link Predicate} to check if it is eligible for {@link java.nio.file.WatchService}.
* @param watchDirPredicate the {@link Predicate} to check dirs for walking.
* @return the spec.
* @since 6.1
* @see #useWatchService
* @see FileReadingMessageSource#setWatchDirPredicate(Predicate)
*/
public FileInboundChannelAdapterSpec watchDirPredicate(Predicate<Path> watchDirPredicate) {
target.setWatchDirPredicate(watchDirPredicate);
return this;
}

@Override
public Map<Object, String> getComponentsToRegister() {
if (this.scanner == null || this.filtersSet) {
Expand Down
Loading