1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
7use std::{
8 collections::HashMap,
9 path::{Path, PathBuf},
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc, Mutex,
13 },
14 thread,
15 time::Duration,
16};
17
/// Event sent to registered handlers on initial directory scans.
///
/// Wraps the scanned path in [`crate::Result`].
pub type ScanEvent = crate::Result<PathBuf>;
20
/// Handler trait for receivers of [`ScanEvent`].
///
/// Very much the same as [`EventHandler`], except that the payload is a
/// [`ScanEvent`], i.e. a `Result` wrapping the scanned path.
///
/// See the full example for more information.
pub trait ScanEventHandler: Send + 'static {
    /// Handles a single scan event.
    fn handle_event(&mut self, event: ScanEvent);
}
29
30impl<F> ScanEventHandler for F
31where
32 F: FnMut(ScanEvent) + Send + 'static,
33{
34 fn handle_event(&mut self, event: ScanEvent) {
35 (self)(event);
36 }
37}
38
/// Forwards scan events into a `crossbeam_channel` sender.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    /// Sends the event on the channel; the outcome of the send is discarded.
    fn handle_event(&mut self, event: ScanEvent) {
        self.send(event).ok();
    }
}
45
46impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
47 fn handle_event(&mut self, event: ScanEvent) {
48 let _ = self.send(event);
49 }
50}
51
52impl ScanEventHandler for () {
53 fn handle_event(&mut self, _event: ScanEvent) {}
54}
55
56use data::{DataBuilder, WatchData};
57mod data {
58 use crate::{
59 event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
60 EventHandler,
61 };
62 use filetime::FileTime;
63 use std::{
64 cell::RefCell,
65 collections::{hash_map::RandomState, HashMap},
66 fmt::{self, Debug},
67 fs::{self, File, Metadata},
68 hash::{BuildHasher, Hasher},
69 io::{self, Read},
70 path::{Path, PathBuf},
71 time::Instant,
72 };
73 use walkdir::WalkDir;
74
75 use super::ScanEventHandler;
76
    /// Builder for [`WatchData`] & [`PathData`].
    ///
    /// Bundles the event emitters, the optional content hasher and the
    /// timestamp that is stamped onto every [`PathData`] it creates.
    pub(super) struct DataBuilder {
        // Sink for change events and error events.
        emitter: EventEmitter,
        // Optional handler that is told about every path seen on an initial scan.
        scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,

        // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
        // in future.
        //
        // `None` when file contents are not compared.
        build_hasher: Option<RandomState>,

        // current timestamp for building Data.
        now: Instant,
    }
89
90 impl DataBuilder {
91 pub(super) fn new<F, G>(
92 event_handler: F,
93 compare_content: bool,
94 scan_emitter: Option<G>,
95 ) -> Self
96 where
97 F: EventHandler,
98 G: ScanEventHandler,
99 {
100 let scan_emitter = match scan_emitter {
101 None => None,
102 Some(v) => {
103 // workaround for a weird type resolution bug when directly going to dyn Trait
104 let intermediate: Box<RefCell<dyn ScanEventHandler>> =
105 Box::new(RefCell::new(v));
106 Some(intermediate)
107 }
108 };
109 Self {
110 emitter: EventEmitter::new(event_handler),
111 scan_emitter,
112 build_hasher: compare_content.then(RandomState::default),
113 now: Instant::now(),
114 }
115 }
116
117 /// Update internal timestamp.
118 pub(super) fn update_timestamp(&mut self) {
119 self.now = Instant::now();
120 }
121
122 /// Create [`WatchData`].
123 ///
124 /// This function will return `Err(_)` if can not retrieve metadata from
125 /// the path location. (e.g., not found).
126 pub(super) fn build_watch_data(
127 &self,
128 root: PathBuf,
129 is_recursive: bool,
130 follow_symlinks: bool,
131 ) -> Option<WatchData> {
132 WatchData::new(self, root, is_recursive, follow_symlinks)
133 }
134
135 /// Create [`PathData`].
136 fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
137 PathData::new(self, meta_path)
138 }
139 }
140
141 impl Debug for DataBuilder {
142 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
143 f.debug_struct("DataBuilder")
144 .field("build_hasher", &self.build_hasher)
145 .field("now", &self.now)
146 .finish()
147 }
148 }
149
    /// State of one watched root: its scan configuration plus the data
    /// recorded for every path found by the most recent scan.
    #[derive(Debug)]
    pub(super) struct WatchData {
        // config part, won't change.
        root: PathBuf,
        is_recursive: bool,
        follow_symlinks: bool,

        // current status part.
        // Keyed by the scanned path; updated on every rescan.
        all_path_data: HashMap<PathBuf, PathData>,
    }
160
161 impl WatchData {
162 /// Scan filesystem and create a new `WatchData`.
163 ///
164 /// # Side effect
165 ///
166 /// This function may send event by `data_builder.emitter`.
167 fn new(
168 data_builder: &DataBuilder,
169 root: PathBuf,
170 is_recursive: bool,
171 follow_symlinks: bool,
172 ) -> Option<Self> {
173 // If metadata read error at `root` path, it will emit
174 // a error event and stop to create the whole `WatchData`.
175 //
176 // QUESTION: inconsistent?
177 //
178 // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`,
179 // if `root` path hit an io error, then watcher will reject to
180 // create this new watch.
181 //
182 // This may inconsistent with *POLLING* a watch. When watcher
183 // continue polling, io error at root path will not delete
184 // a existing watch. polling still working.
185 //
186 // So, consider a config file may not exists at first time but may
187 // create after a while, developer cannot watch it.
188 //
189 // FIXME: Can we always allow to watch a path, even file not
190 // found at this path?
191 if let Err(e) = fs::metadata(&root) {
192 data_builder.emitter.emit_io_err(e, Some(&root));
193 return None;
194 }
195
196 let all_path_data = Self::scan_all_path_data(
197 data_builder,
198 root.clone(),
199 is_recursive,
200 follow_symlinks,
201 true,
202 )
203 .collect();
204
205 Some(Self {
206 root,
207 is_recursive,
208 follow_symlinks,
209 all_path_data,
210 })
211 }
212
213 /// Rescan filesystem and update this `WatchData`.
214 ///
215 /// # Side effect
216 ///
217 /// This function may emit event by `data_builder.emitter`.
218 pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
219 // scan current filesystem.
220 for (path, new_path_data) in Self::scan_all_path_data(
221 data_builder,
222 self.root.clone(),
223 self.is_recursive,
224 self.follow_symlinks,
225 false,
226 ) {
227 let old_path_data = self
228 .all_path_data
229 .insert(path.clone(), new_path_data.clone());
230
231 // emit event
232 let event =
233 PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
234 if let Some(event) = event {
235 data_builder.emitter.emit_ok(event);
236 }
237 }
238
239 // scan for disappeared paths.
240 let mut disappeared_paths = Vec::new();
241 for (path, path_data) in self.all_path_data.iter() {
242 if path_data.last_check < data_builder.now {
243 disappeared_paths.push(path.clone());
244 }
245 }
246
247 // remove disappeared paths
248 for path in disappeared_paths {
249 let old_path_data = self.all_path_data.remove(&path);
250
251 // emit event
252 let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
253 if let Some(event) = event {
254 data_builder.emitter.emit_ok(event);
255 }
256 }
257 }
258
259 /// Get all `PathData` by given configuration.
260 ///
261 /// # Side Effect
262 ///
263 /// This function may emit some IO Error events by `data_builder.emitter`.
264 fn scan_all_path_data(
265 data_builder: &'_ DataBuilder,
266 root: PathBuf,
267 is_recursive: bool,
268 follow_symlinks: bool,
269 // whether this is an initial scan, used only for events
270 is_initial: bool,
271 ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
272 log::trace!("rescanning {root:?}");
273 // WalkDir return only one entry if root is a file (not a folder),
274 // so we can use single logic to do the both file & dir's jobs.
275 //
276 // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
277 WalkDir::new(root)
278 .follow_links(follow_symlinks)
279 .max_depth(Self::dir_scan_depth(is_recursive))
280 .into_iter()
281 .filter_map(|entry_res| match entry_res {
282 Ok(entry) => Some(entry),
283 Err(err) => {
284 log::warn!("walkdir error scanning {err:?}");
285 if let Some(io_error) = err.io_error() {
286 // clone an io::Error, so we have to create a new one.
287 let new_io_error = io::Error::new(io_error.kind(), err.to_string());
288 data_builder.emitter.emit_io_err(new_io_error, err.path());
289 } else {
290 let crate_err =
291 crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
292 data_builder.emitter.emit(Err(crate_err));
293 }
294 None
295 }
296 })
297 .filter_map(move |entry| match entry.metadata() {
298 Ok(metadata) => {
299 let path = entry.into_path();
300 if is_initial {
301 // emit initial scans
302 if let Some(ref emitter) = data_builder.scan_emitter {
303 emitter.borrow_mut().handle_event(Ok(path.clone()));
304 }
305 }
306 let meta_path = MetaPath::from_parts_unchecked(path, metadata);
307 let data_path = data_builder.build_path_data(&meta_path);
308
309 Some((meta_path.into_path(), data_path))
310 }
311 Err(e) => {
312 // emit event.
313 let path = entry.into_path();
314 data_builder.emitter.emit_io_err(e, Some(path));
315
316 None
317 }
318 })
319 }
320
321 fn dir_scan_depth(is_recursive: bool) -> usize {
322 if is_recursive {
323 usize::MAX
324 } else {
325 1
326 }
327 }
328 }
329
    /// Stored data for a single path location.
    ///
    /// See [`WatchData`] for more detail.
    #[derive(Debug, Clone)]
    struct PathData {
        /// File updated time, in whole seconds.
        mtime: i64,

        /// Content's hash value, only available if the user requested
        /// comparing file contents and the read was successful.
        hash: Option<u64>,

        /// Checked time.
        last_check: Instant,
    }
345
346 impl PathData {
347 /// Create a new `PathData`.
348 fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
349 let metadata = meta_path.metadata();
350
351 PathData {
352 mtime: FileTime::from_last_modification_time(metadata).seconds(),
353 hash: data_builder
354 .build_hasher
355 .as_ref()
356 .filter(|_| metadata.is_file())
357 .and_then(|build_hasher| {
358 Self::get_content_hash(build_hasher, meta_path.path()).ok()
359 }),
360
361 last_check: data_builder.now,
362 }
363 }
364
365 /// Get hash value for the data content in given file `path`.
366 fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
367 let mut hasher = build_hasher.build_hasher();
368 let mut file = File::open(path)?;
369 let mut buf = [0; 512];
370
371 loop {
372 let n = match file.read(&mut buf) {
373 Ok(0) => break,
374 Ok(len) => len,
375 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
376 Err(e) => return Err(e),
377 };
378
379 hasher.write(&buf[..n]);
380 }
381
382 Ok(hasher.finish())
383 }
384
385 /// Get [`Event`] by compare two optional [`PathData`].
386 fn compare_to_event<P>(
387 path: P,
388 old: Option<&PathData>,
389 new: Option<&PathData>,
390 ) -> Option<Event>
391 where
392 P: Into<PathBuf>,
393 {
394 match (old, new) {
395 (Some(old), Some(new)) => {
396 if new.mtime > old.mtime {
397 Some(EventKind::Modify(ModifyKind::Metadata(
398 MetadataKind::WriteTime,
399 )))
400 } else if new.hash != old.hash {
401 Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
402 } else {
403 None
404 }
405 }
406 (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
407 (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
408 (None, None) => None,
409 }
410 .map(|event_kind| Event::new(event_kind).add_path(path.into()))
411 }
412 }
413
    /// A path together with its metadata.
    ///
    /// This data structure is designed to make sure a path and its metadata
    /// are passed around in a consistent way, and it may avoid some
    /// duplicated `fs::metadata()` calls in some situations.
    #[derive(Debug)]
    pub(super) struct MetaPath {
        path: PathBuf,
        metadata: Metadata,
    }
424
425 impl MetaPath {
426 /// Create `MetaPath` by given parts.
427 ///
428 /// # Invariant
429 ///
430 /// User must make sure the input `metadata` are associated with `path`.
431 fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
432 Self { path, metadata }
433 }
434
435 fn path(&self) -> &Path {
436 &self.path
437 }
438
439 fn metadata(&self) -> &Metadata {
440 &self.metadata
441 }
442
443 fn into_path(self) -> PathBuf {
444 self.path
445 }
446 }
447
    /// Thin wrapper around the outer event handler, for ease of use.
    struct EventEmitter(
        // Use `RefCell` so that `emit()` only needs a shared borrow of self (&self).
        // Use `Box` so that EventEmitter is Sized.
        Box<RefCell<dyn EventHandler>>,
    );
454
455 impl EventEmitter {
456 fn new<F: EventHandler>(event_handler: F) -> Self {
457 Self(Box::new(RefCell::new(event_handler)))
458 }
459
460 /// Emit single event.
461 fn emit(&self, event: crate::Result<Event>) {
462 self.0.borrow_mut().handle_event(event);
463 }
464
465 /// Emit event.
466 fn emit_ok(&self, event: Event) {
467 self.emit(Ok(event))
468 }
469
470 /// Emit io error event.
471 fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
472 where
473 E: Into<io::Error>,
474 P: Into<PathBuf>,
475 {
476 let e = crate::Error::io(err.into());
477 if let Some(path) = path {
478 self.emit(Err(e.add_path(path.into())));
479 } else {
480 self.emit(Err(e));
481 }
482 }
483 }
484}
485
/// Polling based `Watcher` implementation.
///
/// By default scans through all files and checks for changed entries based on their change date.
/// Can also be changed to perform file content change checks.
///
/// See [Config] for more details.
#[derive(Debug)]
pub struct PollWatcher {
    /// watched roots and their last scanned state, keyed by the watched path
    watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
    /// builder shared with the poll loop; holds the event handlers
    data_builder: Arc<Mutex<DataBuilder>>,
    /// stop flag, checked by the poll loop at the start of every iteration
    want_to_stop: Arc<AtomicBool>,
    /// channel to the poll loop
    /// currently used only for manual polling
    message_channel: Sender<()>,
    /// interval between automatic polls; with `None` the poll loop waits
    /// for a message on the channel instead
    delay: Option<Duration>,
    /// whether scans follow symbolic links
    follow_sylinks: bool,
}
503
504impl PollWatcher {
505 /// Create a new [`PollWatcher`], configured as needed.
506 pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
507 Self::with_opt::<_, ()>(event_handler, config, None)
508 }
509
510 /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
511 pub fn poll(&self) -> crate::Result<()> {
512 self.message_channel
513 .send(())
514 .map_err(|_| Error::generic("failed to send poll message"))?;
515 Ok(())
516 }
517
518 /// Create a new [`PollWatcher`] with an scan event handler.
519 ///
520 /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
521 pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
522 event_handler: F,
523 config: Config,
524 scan_callback: G,
525 ) -> crate::Result<PollWatcher> {
526 Self::with_opt(event_handler, config, Some(scan_callback))
527 }
528
529 /// create a new [`PollWatcher`] with all options.
530 fn with_opt<F: EventHandler, G: ScanEventHandler>(
531 event_handler: F,
532 config: Config,
533 scan_callback: Option<G>,
534 ) -> crate::Result<PollWatcher> {
535 let data_builder =
536 DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
537
538 let (tx, rx) = unbounded();
539
540 let poll_watcher = PollWatcher {
541 watches: Default::default(),
542 data_builder: Arc::new(Mutex::new(data_builder)),
543 want_to_stop: Arc::new(AtomicBool::new(false)),
544 delay: config.poll_interval(),
545 follow_sylinks: config.follow_symlinks(),
546 message_channel: tx,
547 };
548
549 poll_watcher.run(rx);
550
551 Ok(poll_watcher)
552 }
553
554 fn run(&self, rx: Receiver<()>) {
555 let watches = Arc::clone(&self.watches);
556 let data_builder = Arc::clone(&self.data_builder);
557 let want_to_stop = Arc::clone(&self.want_to_stop);
558 let delay = self.delay;
559
560 let _ = thread::Builder::new()
561 .name("notify-rs poll loop".to_string())
562 .spawn(move || {
563 loop {
564 if want_to_stop.load(Ordering::SeqCst) {
565 break;
566 }
567
568 // HINT: Make sure always lock in the same order to avoid deadlock.
569 //
570 // FIXME: inconsistent: some place mutex poison cause panic,
571 // some place just ignore.
572 if let (Ok(mut watches), Ok(mut data_builder)) =
573 (watches.lock(), data_builder.lock())
574 {
575 data_builder.update_timestamp();
576
577 let vals = watches.values_mut();
578 for watch_data in vals {
579 watch_data.rescan(&mut data_builder);
580 }
581 }
582 // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
583 if let Some(delay) = delay {
584 let _ = rx.recv_timeout(delay);
585 } else {
586 let _ = rx.recv();
587 }
588 }
589 });
590 }
591
592 /// Watch a path location.
593 ///
594 /// QUESTION: this function never return an Error, is it as intend?
595 /// Please also consider the IO Error event problem.
596 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
597 // HINT: Make sure always lock in the same order to avoid deadlock.
598 //
599 // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
600 if let (Ok(mut watches), Ok(mut data_builder)) =
601 (self.watches.lock(), self.data_builder.lock())
602 {
603 data_builder.update_timestamp();
604
605 let watch_data = data_builder.build_watch_data(
606 path.to_path_buf(),
607 recursive_mode.is_recursive(),
608 self.follow_sylinks,
609 );
610
611 // if create watch_data successful, add it to watching list.
612 if let Some(watch_data) = watch_data {
613 watches.insert(path.to_path_buf(), watch_data);
614 }
615 }
616 }
617
618 /// Unwatch a path.
619 ///
620 /// Return `Err(_)` if given path has't be monitored.
621 fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
622 // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
623 self.watches
624 .lock()
625 .unwrap()
626 .remove(path)
627 .map(|_| ())
628 .ok_or_else(crate::Error::watch_not_found)
629 }
630}
631
632impl Watcher for PollWatcher {
633 /// Create a new [`PollWatcher`].
634 fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
635 Self::new(event_handler, config)
636 }
637
638 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
639 self.watch_inner(path, recursive_mode);
640
641 Ok(())
642 }
643
644 fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
645 self.unwatch_inner(path)
646 }
647
648 fn kind() -> crate::WatcherKind {
649 crate::WatcherKind::PollWatcher
650 }
651}
652
653impl Drop for PollWatcher {
654 fn drop(&mut self) {
655 self.want_to_stop.store(val:true, order:Ordering::Relaxed);
656 }
657}
658
/// Compile-time check that [`PollWatcher`] is both `Send` and `Sync`.
#[test]
fn poll_watcher_is_send_and_sync() {
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_sync::<PollWatcher>();
}
664