11/*
2- * Copyright 2002-2018 the original author or authors.
2+ * Copyright 2002-2019 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
4747import org .springframework .integration .file .filters .FileListFilter ;
4848import org .springframework .integration .file .filters .ResettableFileListFilter ;
4949import org .springframework .integration .support .AbstractIntegrationMessageBuilder ;
50+ import org .springframework .lang .Nullable ;
5051import org .springframework .messaging .Message ;
5152import org .springframework .util .Assert ;
5253
@@ -103,23 +104,23 @@ public class FileReadingMessageSource extends AbstractMessageSource<File>
103104 */
104105 private final Queue <File > toBeReceived ;
105106
106- private volatile File directory ;
107+ private File directory ;
107108
108- private volatile DirectoryScanner scanner = new DefaultDirectoryScanner ();
109+ private DirectoryScanner scanner = new DefaultDirectoryScanner ();
109110
110- private volatile boolean scannerExplicitlySet ;
111+ private boolean scannerExplicitlySet ;
111112
112- private volatile boolean autoCreateDirectory = true ;
113+ private boolean autoCreateDirectory = true ;
113114
114- private volatile boolean scanEachPoll = false ;
115+ private boolean scanEachPoll = false ;
115116
116117 private FileListFilter <File > filter ;
117118
118119 private FileLocker locker ;
119120
120121 private boolean useWatchService ;
121122
122- private WatchEventType [] watchEvents = new WatchEventType [] { WatchEventType .CREATE };
123+ private WatchEventType [] watchEvents = { WatchEventType .CREATE };
123124
124125 /**
125126 * Creates a FileReadingMessageSource with a naturally ordered queue of unbounded capacity.
@@ -132,7 +133,6 @@ public FileReadingMessageSource() {
132133 * Creates a FileReadingMessageSource with a bounded queue of the given
133134 * capacity. This can be used to reduce the memory footprint of this
134135 * component when reading from a large directory.
135- *
136136 * @param internalQueueCapacity
137137 * the size of the queue used to cache files to be received
138138 * internally. This queue can be made larger to optimize the
@@ -151,26 +151,22 @@ public FileReadingMessageSource(int internalQueueCapacity) {
151151
152152 /**
153153 * Creates a FileReadingMessageSource with a {@link PriorityBlockingQueue}
154- * ordered with the passed in {@link Comparator}
155- * <p>
156- * The size of the queue used should be large enough to hold all the files
154+ * ordered with the passed in {@link Comparator}.
155+ * <p> The size of the queue used should be large enough to hold all the files
157156 * in the input directory in order to sort all of them, so restricting the
158157 * size of the queue is mutually exclusive with ordering. No guarantees
159158 * about file delivery order can be made under concurrent access.
160- * <p>
161- *
162159 * @param receptionOrderComparator
163160 * the comparator to be used to order the files in the internal
164161 * queue
165162 */
166- public FileReadingMessageSource (Comparator <File > receptionOrderComparator ) {
163+ public FileReadingMessageSource (@ Nullable Comparator <File > receptionOrderComparator ) {
167164 this .toBeReceived = new PriorityBlockingQueue <>(DEFAULT_INTERNAL_QUEUE_CAPACITY , receptionOrderComparator );
168165 }
169166
170167
171168 /**
172169 * Specify the input directory.
173- *
174170 * @param directory to monitor
175171 */
176172 public void setDirectory (File directory ) {
@@ -181,7 +177,6 @@ public void setDirectory(File directory) {
181177 /**
182178 * Optionally specify a custom scanner, for example the
183179 * {@link WatchServiceDirectoryScanner}
184- *
185180 * @param scanner scanner implementation
186181 */
187182 public void setScanner (DirectoryScanner scanner ) {
@@ -207,7 +202,6 @@ public DirectoryScanner getScanner() {
207202 * <em>true</em>. If set to <em>false</em> and the
208203 * source directory does not exist, an Exception will be thrown upon
209204 * initialization.
210- *
211205 * @param autoCreateDirectory
212206 * should the directory to be monitored be created when this
213207 * component starts up?
@@ -224,8 +218,7 @@ public void setAutoCreateDirectory(boolean autoCreateDirectory) {
224218 * If multiple filters are required a
225219 * {@link org.springframework.integration.file.filters.CompositeFileListFilter}
226220 * can be used to group them together.
227- * <p>
228- * <b>The supplied filter must be thread safe.</b>.
221+ * <p> <b>The supplied filter must be thread safe.</b>.
229222 * @param filter a filter
230223 */
231224 public void setFilter (FileListFilter <File > filter ) {
@@ -256,7 +249,6 @@ public void setLocker(FileLocker locker) {
256249 * will more likely be out of sync with the file system if this flag is set
257250 * to <code>false</code>, but it will change more often (causing expensive
258251 * reordering) if it is set to <code>true</code>.
259- *
260252 * @param scanEachPoll
261253 * whether or not the component should re-scan (as opposed to not
262254 * rescanning until the entire backlog has been delivered)
@@ -304,15 +296,15 @@ public String getComponentType() {
304296 @ Override
305297 public void start () {
306298 if (!this .running .getAndSet (true )) {
307- if (!this .directory .exists () && this .autoCreateDirectory ) {
308- this .directory . mkdirs ( );
299+ if (!this .directory .exists () && this .autoCreateDirectory && ! this . directory . mkdirs () ) {
300+ throw new IllegalStateException ( "Cannot create directory or ita parents: " + this .directory );
309301 }
310302 Assert .isTrue (this .directory .exists (),
311- "Source directory [" + this .directory + "] does not exist." );
303+ () -> "Source directory [" + this .directory + "] does not exist." );
312304 Assert .isTrue (this .directory .isDirectory (),
313- "Source path [" + this .directory + "] does not point to a directory." );
305+ () -> "Source path [" + this .directory + "] does not point to a directory." );
314306 Assert .isTrue (this .directory .canRead (),
315- "Source directory [" + this .directory + "] is not readable." );
307+ () -> "Source directory [" + this .directory + "] is not readable." );
316308 if (this .scanner instanceof Lifecycle ) {
317309 ((Lifecycle ) this .scanner ).start ();
318310 }
@@ -336,7 +328,7 @@ protected void onInit() {
336328 Assert .notNull (this .directory , "'directory' must not be null" );
337329
338330 Assert .state (!(this .scannerExplicitlySet && this .useWatchService ),
339- "The 'scanner' and 'useWatchService' options are mutually exclusive: " + this .scanner );
331+ () -> "The 'scanner' and 'useWatchService' options are mutually exclusive: " + this .scanner );
340332
341333 if (this .useWatchService ) {
342334 this .scanner = new WatchServiceDirectoryScanner ();
@@ -345,8 +337,8 @@ protected void onInit() {
345337 // Check that the filter and locker options are _NOT_ set if an external scanner has been set.
346338 // The external scanner is responsible for the filter and locker options in that case.
347339 Assert .state (!(this .scannerExplicitlySet && (this .filter != null || this .locker != null )),
348- "When using an external scanner the 'filter' and 'locker' options should not be used. Instead, set these options on the external DirectoryScanner: "
349- + this .scanner );
340+ () -> "When using an external scanner the 'filter' and 'locker' options should not be used. " +
341+ "Instead, set these options on the external DirectoryScanner: " + this .scanner );
350342 if (this .filter != null ) {
351343 this .scanner .setFilter (this .filter );
352344 }
@@ -492,7 +484,7 @@ protected File[] listEligibleFiles(File directory) {
492484
493485 files .addAll (filesFromEvents ());
494486
495- return files .toArray (new File [files . size () ]);
487+ return files .toArray (new File [0 ]);
496488 }
497489
498490 private Set <File > filesFromEvents () {
@@ -504,57 +496,11 @@ private Set<File> filesFromEvents() {
504496 if (event .kind () == StandardWatchEventKinds .ENTRY_CREATE ||
505497 event .kind () == StandardWatchEventKinds .ENTRY_MODIFY ||
506498 event .kind () == StandardWatchEventKinds .ENTRY_DELETE ) {
507- Path item = (Path ) event .context ();
508- File file = new File (parentDir , item .toFile ().getName ());
509- if (logger .isDebugEnabled ()) {
510- logger .debug ("Watch event [" + event .kind () + "] for file [" + file + "]" );
511- }
512499
513- if (event .kind () == StandardWatchEventKinds .ENTRY_DELETE ) {
514- if (getFilter () instanceof ResettableFileListFilter ) {
515- ((ResettableFileListFilter <File >) getFilter ()).remove (file );
516- }
517- boolean fileRemoved = files .remove (file );
518- if (fileRemoved && logger .isDebugEnabled ()) {
519- logger .debug ("The file [" + file +
520- "] has been removed from the queue because of DELETE event." );
521- }
522- }
523- else {
524- if (file .exists ()) {
525- if (file .isDirectory ()) {
526- files .addAll (walkDirectory (file .toPath (), event .kind ()));
527- }
528- else {
529- files .remove (file );
530- files .add (file );
531- }
532- }
533- else {
534- if (logger .isDebugEnabled ()) {
535- logger .debug ("A file [" + file + "] for the event [" + event .kind () +
536- "] doesn't exist. Ignored." );
537- }
538- }
539- }
500+ processFilesFromNormalEvent (files , parentDir , event );
540501 }
541502 else if (event .kind () == StandardWatchEventKinds .OVERFLOW ) {
542- if (logger .isDebugEnabled ()) {
543- logger .debug ("Watch event [" + StandardWatchEventKinds .OVERFLOW +
544- "] with context [" + event .context () + "]" );
545- }
546-
547- for (WatchKey watchKey : this .pathKeys .values ()) {
548- watchKey .cancel ();
549- }
550- this .pathKeys .clear ();
551-
552- if (event .context () != null && event .context () instanceof Path ) {
553- files .addAll (walkDirectory ((Path ) event .context (), event .kind ()));
554- }
555- else {
556- files .addAll (walkDirectory (FileReadingMessageSource .this .directory .toPath (), event .kind ()));
557- }
503+ processFilesFromOverflowEvent (files , event );
558504 }
559505 }
560506 key .reset ();
@@ -563,6 +509,61 @@ else if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
563509 return files ;
564510 }
565511
512+ private void processFilesFromNormalEvent (Set <File > files , File parentDir , WatchEvent <?> event ) {
513+ Path item = (Path ) event .context ();
514+ File file = new File (parentDir , item .toFile ().getName ());
515+ if (logger .isDebugEnabled ()) {
516+ logger .debug ("Watch event [" + event .kind () + "] for file [" + file + "]" );
517+ }
518+
519+ if (event .kind () == StandardWatchEventKinds .ENTRY_DELETE ) {
520+ if (getFilter () instanceof ResettableFileListFilter ) {
521+ ((ResettableFileListFilter <File >) getFilter ()).remove (file );
522+ }
523+ boolean fileRemoved = files .remove (file );
524+ if (fileRemoved && logger .isDebugEnabled ()) {
525+ logger .debug ("The file [" + file +
526+ "] has been removed from the queue because of DELETE event." );
527+ }
528+ }
529+ else {
530+ if (file .exists ()) {
531+ if (file .isDirectory ()) {
532+ files .addAll (walkDirectory (file .toPath (), event .kind ()));
533+ }
534+ else {
535+ files .remove (file );
536+ files .add (file );
537+ }
538+ }
539+ else {
540+ if (logger .isDebugEnabled ()) {
541+ logger .debug ("A file [" + file + "] for the event [" + event .kind () +
542+ "] doesn't exist. Ignored." );
543+ }
544+ }
545+ }
546+ }
547+
548+ private void processFilesFromOverflowEvent (Set <File > files , WatchEvent <?> event ) {
549+ if (logger .isDebugEnabled ()) {
550+ logger .debug ("Watch event [" + StandardWatchEventKinds .OVERFLOW +
551+ "] with context [" + event .context () + "]" );
552+ }
553+
554+ for (WatchKey watchKey : this .pathKeys .values ()) {
555+ watchKey .cancel ();
556+ }
557+ this .pathKeys .clear ();
558+
559+ if (event .context () != null && event .context () instanceof Path ) {
560+ files .addAll (walkDirectory ((Path ) event .context (), event .kind ()));
561+ }
562+ else {
563+ files .addAll (walkDirectory (FileReadingMessageSource .this .directory .toPath (), event .kind ()));
564+ }
565+ }
566+
566567 private Set <File > walkDirectory (Path directory , final WatchEvent .Kind <?> kind ) {
567568 final Set <File > walkedFiles = new LinkedHashSet <>();
568569 try {
0 commit comments