1//! Generic Watcher implementation based on polling
2//!
3//! Checks the `watch`ed paths periodically to detect changes. This implementation only uses
4//! Rust stdlib APIs and should work on all of the platforms it supports.
5
6use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
7use std::{
8 collections::HashMap,
9 path::{Path, PathBuf},
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Arc, Mutex,
13 },
14 thread,
15 time::Duration,
16};
17
/// Event sent to the registered handler during the initial scan of a watched path.
19pub type ScanEvent = crate::Result<PathBuf>;
20
21/// Handler trait for receivers of ScanEvent.
22/// Very much the same as [EventHandler], but including the Result.
23///
24/// See the full example for more information.
25pub trait ScanEventHandler: Send + 'static {
26 /// Handles an event.
27 fn handle_event(&mut self, event: ScanEvent);
28}
29
30impl<F> ScanEventHandler for F
31where
32 F: FnMut(ScanEvent) + Send + 'static,
33{
34 fn handle_event(&mut self, event: ScanEvent) {
35 (self)(event);
36 }
37}
38
/// Forwards every scan event into a crossbeam channel.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // Best effort: a failed send is deliberately ignored.
        self.send(event).ok();
    }
}
45
46impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
47 fn handle_event(&mut self, event: ScanEvent) {
48 let _ = self.send(event);
49 }
50}
51
52impl ScanEventHandler for () {
53 fn handle_event(&mut self, _event: ScanEvent) {}
54}
55
56use data::{DataBuilder, WatchData};
57mod data {
58 use crate::{
59 event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
60 EventHandler,
61 };
62 use filetime::FileTime;
63 use std::{
64 cell::RefCell,
65 collections::{hash_map::RandomState, HashMap},
66 fmt::{self, Debug},
67 fs::{self, File, Metadata},
68 hash::{BuildHasher, Hasher},
69 io::{self, Read},
70 path::{Path, PathBuf},
71 time::Instant,
72 };
73 use walkdir::WalkDir;
74
75 use super::ScanEventHandler;
76
77 /// Builder for [`WatchData`] & [`PathData`].
78 pub(super) struct DataBuilder {
79 emitter: EventEmitter,
80 scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
81
82 // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault
83 // in future.
84 build_hasher: Option<RandomState>,
85
86 // current timestamp for building Data.
87 now: Instant,
88 }
89
90 impl DataBuilder {
91 pub(super) fn new<F, G>(
92 event_handler: F,
93 compare_content: bool,
94 scan_emitter: Option<G>,
95 ) -> Self
96 where
97 F: EventHandler,
98 G: ScanEventHandler,
99 {
100 let scan_emitter = match scan_emitter {
101 None => None,
102 Some(v) => {
103 // workaround for a weird type resolution bug when directly going to dyn Trait
104 let intermediate: Box<RefCell<dyn ScanEventHandler>> =
105 Box::new(RefCell::new(v));
106 Some(intermediate)
107 }
108 };
109 Self {
110 emitter: EventEmitter::new(event_handler),
111 scan_emitter,
112 build_hasher: compare_content.then(RandomState::default),
113 now: Instant::now(),
114 }
115 }
116
117 /// Update internal timestamp.
118 pub(super) fn update_timestamp(&mut self) {
119 self.now = Instant::now();
120 }
121
122 /// Create [`WatchData`].
123 ///
124 /// This function will return `Err(_)` if can not retrieve metadata from
125 /// the path location. (e.g., not found).
126 pub(super) fn build_watch_data(
127 &self,
128 root: PathBuf,
129 is_recursive: bool,
130 ) -> Option<WatchData> {
131 WatchData::new(self, root, is_recursive)
132 }
133
134 /// Create [`PathData`].
135 fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
136 PathData::new(self, meta_path)
137 }
138 }
139
140 impl Debug for DataBuilder {
141 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142 f.debug_struct("DataBuilder")
143 .field("build_hasher", &self.build_hasher)
144 .field("now", &self.now)
145 .finish()
146 }
147 }
148
149 #[derive(Debug)]
150 pub(super) struct WatchData {
151 // config part, won't change.
152 root: PathBuf,
153 is_recursive: bool,
154
155 // current status part.
156 all_path_data: HashMap<PathBuf, PathData>,
157 }
158
159 impl WatchData {
160 /// Scan filesystem and create a new `WatchData`.
161 ///
162 /// # Side effect
163 ///
164 /// This function may send event by `data_builder.emitter`.
165 fn new(data_builder: &DataBuilder, root: PathBuf, is_recursive: bool) -> Option<Self> {
166 // If metadata read error at `root` path, it will emit
167 // a error event and stop to create the whole `WatchData`.
168 //
169 // QUESTION: inconsistent?
170 //
171 // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`,
172 // if `root` path hit an io error, then watcher will reject to
173 // create this new watch.
174 //
175 // This may inconsistent with *POLLING* a watch. When watcher
176 // continue polling, io error at root path will not delete
177 // a existing watch. polling still working.
178 //
179 // So, consider a config file may not exists at first time but may
180 // create after a while, developer cannot watch it.
181 //
182 // FIXME: Can we always allow to watch a path, even file not
183 // found at this path?
184 if let Err(e) = fs::metadata(&root) {
185 data_builder.emitter.emit_io_err(e, &root);
186 return None;
187 }
188
189 let all_path_data =
190 Self::scan_all_path_data(data_builder, root.clone(), is_recursive, true).collect();
191
192 Some(Self {
193 root,
194 is_recursive,
195 all_path_data,
196 })
197 }
198
199 /// Rescan filesystem and update this `WatchData`.
200 ///
201 /// # Side effect
202 ///
203 /// This function may emit event by `data_builder.emitter`.
204 pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
205 // scan current filesystem.
206 for (path, new_path_data) in
207 Self::scan_all_path_data(data_builder, self.root.clone(), self.is_recursive, false)
208 {
209 let old_path_data = self
210 .all_path_data
211 .insert(path.clone(), new_path_data.clone());
212
213 // emit event
214 let event =
215 PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
216 if let Some(event) = event {
217 data_builder.emitter.emit_ok(event);
218 }
219 }
220
221 // scan for disappeared paths.
222 let mut disappeared_paths = Vec::new();
223 for (path, path_data) in self.all_path_data.iter() {
224 if path_data.last_check < data_builder.now {
225 disappeared_paths.push(path.clone());
226 }
227 }
228
229 // remove disappeared paths
230 for path in disappeared_paths {
231 let old_path_data = self.all_path_data.remove(&path);
232
233 // emit event
234 let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
235 if let Some(event) = event {
236 data_builder.emitter.emit_ok(event);
237 }
238 }
239 }
240
241 /// Get all `PathData` by given configuration.
242 ///
243 /// # Side Effect
244 ///
245 /// This function may emit some IO Error events by `data_builder.emitter`.
246 fn scan_all_path_data(
247 data_builder: &'_ DataBuilder,
248 root: PathBuf,
249 is_recursive: bool,
250 // whether this is an initial scan, used only for events
251 is_initial: bool,
252 ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
253 log::trace!("rescanning {root:?}");
254 // WalkDir return only one entry if root is a file (not a folder),
255 // so we can use single logic to do the both file & dir's jobs.
256 //
257 // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new
258 WalkDir::new(root)
259 .follow_links(true)
260 .max_depth(Self::dir_scan_depth(is_recursive))
261 .into_iter()
262 //
263 // QUESTION: should we ignore IO Error?
264 //
265 // current implementation ignore some IO error, e.g.,
266 //
267 // - `.filter_map(|entry| entry.ok())`
268 // - all read error when hashing
269 //
270 // but the code also interest with `fs::metadata()` error and
271 // propagate to event handler. It may not consistent.
272 //
273 // FIXME: Should we emit all IO error events? Or ignore them all?
274 .filter_map(|entry_res| match entry_res {
275 Ok(entry) => Some(entry),
276 Err(err) => {
277 log::warn!("walkdir error scanning {err:?}");
278 let crate_err =
279 crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
280 data_builder.emitter.emit(Err(crate_err));
281 None
282 }
283 })
284 .filter_map(move |entry| match entry.metadata() {
285 Ok(metadata) => {
286 let path = entry.into_path();
287 if is_initial {
288 // emit initial scans
289 if let Some(ref emitter) = data_builder.scan_emitter {
290 emitter.borrow_mut().handle_event(Ok(path.clone()));
291 }
292 }
293 let meta_path = MetaPath::from_parts_unchecked(path, metadata);
294 let data_path = data_builder.build_path_data(&meta_path);
295
296 Some((meta_path.into_path(), data_path))
297 }
298 Err(e) => {
299 // emit event.
300 let path = entry.into_path();
301 data_builder.emitter.emit_io_err(e, path);
302
303 None
304 }
305 })
306 }
307
308 fn dir_scan_depth(is_recursive: bool) -> usize {
309 if is_recursive {
310 usize::max_value()
311 } else {
312 1
313 }
314 }
315 }
316
/// Data recorded for a single path location.
///
/// See [`WatchData`] for more detail.
#[derive(Debug, Clone)]
struct PathData {
    /// Last modification time of the file, in whole seconds.
    mtime: i64,

    /// Hash of the file content; only present when content comparison was
    /// requested and the file could be read successfully.
    hash: Option<u64>,

    /// Builder timestamp at the moment this entry was created.
    last_check: Instant,
}
332
333 impl PathData {
334 /// Create a new `PathData`.
335 fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
336 let metadata = meta_path.metadata();
337
338 PathData {
339 mtime: FileTime::from_last_modification_time(metadata).seconds(),
340 hash: data_builder
341 .build_hasher
342 .as_ref()
343 .filter(|_| metadata.is_file())
344 .and_then(|build_hasher| {
345 Self::get_content_hash(build_hasher, meta_path.path()).ok()
346 }),
347
348 last_check: data_builder.now,
349 }
350 }
351
352 /// Get hash value for the data content in given file `path`.
353 fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> {
354 let mut hasher = build_hasher.build_hasher();
355 let mut file = File::open(path)?;
356 let mut buf = [0; 512];
357
358 loop {
359 let n = match file.read(&mut buf) {
360 Ok(0) => break,
361 Ok(len) => len,
362 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
363 Err(e) => return Err(e),
364 };
365
366 hasher.write(&buf[..n]);
367 }
368
369 Ok(hasher.finish())
370 }
371
372 /// Get [`Event`] by compare two optional [`PathData`].
373 fn compare_to_event<P>(
374 path: P,
375 old: Option<&PathData>,
376 new: Option<&PathData>,
377 ) -> Option<Event>
378 where
379 P: Into<PathBuf>,
380 {
381 match (old, new) {
382 (Some(old), Some(new)) => {
383 if new.mtime > old.mtime {
384 Some(EventKind::Modify(ModifyKind::Metadata(
385 MetadataKind::WriteTime,
386 )))
387 } else if new.hash != old.hash {
388 Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
389 } else {
390 None
391 }
392 }
393 (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
394 (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
395 (None, None) => None,
396 }
397 .map(|event_kind| Event::new(event_kind).add_path(path.into()))
398 }
399 }
400
401 /// Compose path and its metadata.
402 ///
403 /// This data structure designed for make sure path and its metadata can be
404 /// transferred in consistent way, and may avoid some duplicated
405 /// `fs::metadata()` function call in some situations.
406 #[derive(Debug)]
407 pub(super) struct MetaPath {
408 path: PathBuf,
409 metadata: Metadata,
410 }
411
412 impl MetaPath {
413 /// Create `MetaPath` by given parts.
414 ///
415 /// # Invariant
416 ///
417 /// User must make sure the input `metadata` are associated with `path`.
418 fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
419 Self { path, metadata }
420 }
421
422 fn path(&self) -> &Path {
423 &self.path
424 }
425
426 fn metadata(&self) -> &Metadata {
427 &self.metadata
428 }
429
430 fn into_path(self) -> PathBuf {
431 self.path
432 }
433 }
434
435 /// Thin wrapper for outer event handler, for easy to use.
436 struct EventEmitter(
437 // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self).
438 // Use `Box` to make sure EventEmitter is Sized.
439 Box<RefCell<dyn EventHandler>>,
440 );
441
442 impl EventEmitter {
443 fn new<F: EventHandler>(event_handler: F) -> Self {
444 Self(Box::new(RefCell::new(event_handler)))
445 }
446
447 /// Emit single event.
448 fn emit(&self, event: crate::Result<Event>) {
449 self.0.borrow_mut().handle_event(event);
450 }
451
452 /// Emit event.
453 fn emit_ok(&self, event: Event) {
454 self.emit(Ok(event))
455 }
456
457 /// Emit io error event.
458 fn emit_io_err<E, P>(&self, err: E, path: P)
459 where
460 E: Into<io::Error>,
461 P: Into<PathBuf>,
462 {
463 self.emit(Err(crate::Error::io(err.into()).add_path(path.into())))
464 }
465 }
466}
467
468/// Polling based `Watcher` implementation.
469///
470/// By default scans through all files and checks for changed entries based on their change date.
471/// Can also be changed to perform file content change checks.
472///
473/// See [Config] for more details.
474#[derive(Debug)]
475pub struct PollWatcher {
476 watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
477 data_builder: Arc<Mutex<DataBuilder>>,
478 want_to_stop: Arc<AtomicBool>,
479 /// channel to the poll loop
480 /// currently used only for manual polling
481 message_channel: Sender<()>,
482 delay: Option<Duration>,
483}
484
485impl PollWatcher {
486 /// Create a new [PollWatcher], configured as needed.
487 pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
488 Self::with_opt::<_, ()>(event_handler, config, None)
489 }
490
491 /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling.
492 pub fn poll(&self) -> crate::Result<()> {
493 self.message_channel
494 .send(())
495 .map_err(|_| Error::generic("failed to send poll message"))?;
496 Ok(())
497 }
498
499 /// Create a new [PollWatcher] with an scan event handler.
500 ///
501 /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher.
502 pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
503 event_handler: F,
504 config: Config,
505 scan_callback: G,
506 ) -> crate::Result<PollWatcher> {
507 Self::with_opt(event_handler, config, Some(scan_callback))
508 }
509
510 /// create a new PollWatcher with all options
511 fn with_opt<F: EventHandler, G: ScanEventHandler>(
512 event_handler: F,
513 config: Config,
514 scan_callback: Option<G>,
515 ) -> crate::Result<PollWatcher> {
516 let data_builder =
517 DataBuilder::new(event_handler, config.compare_contents(), scan_callback);
518
519 let (tx, rx) = unbounded();
520
521 let poll_watcher = PollWatcher {
522 watches: Default::default(),
523 data_builder: Arc::new(Mutex::new(data_builder)),
524 want_to_stop: Arc::new(AtomicBool::new(false)),
525 delay: config.poll_interval_v2(),
526 message_channel: tx,
527 };
528
529 poll_watcher.run(rx);
530
531 Ok(poll_watcher)
532 }
533
534 fn run(&self, rx: Receiver<()>) {
535 let watches = Arc::clone(&self.watches);
536 let data_builder = Arc::clone(&self.data_builder);
537 let want_to_stop = Arc::clone(&self.want_to_stop);
538 let delay = self.delay;
539
540 let _ = thread::Builder::new()
541 .name("notify-rs poll loop".to_string())
542 .spawn(move || {
543 loop {
544 if want_to_stop.load(Ordering::SeqCst) {
545 break;
546 }
547
548 // HINT: Make sure always lock in the same order to avoid deadlock.
549 //
550 // FIXME: inconsistent: some place mutex poison cause panic,
551 // some place just ignore.
552 if let (Ok(mut watches), Ok(mut data_builder)) =
553 (watches.lock(), data_builder.lock())
554 {
555 data_builder.update_timestamp();
556
557 let vals = watches.values_mut();
558 for watch_data in vals {
559 watch_data.rescan(&mut data_builder);
560 }
561 }
562 // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start))
563 if let Some(delay) = delay {
564 let _ = rx.recv_timeout(delay);
565 } else {
566 let _ = rx.recv();
567 }
568 }
569 });
570 }
571
572 /// Watch a path location.
573 ///
574 /// QUESTION: this function never return an Error, is it as intend?
575 /// Please also consider the IO Error event problem.
576 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
577 // HINT: Make sure always lock in the same order to avoid deadlock.
578 //
579 // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
580 if let (Ok(mut watches), Ok(mut data_builder)) =
581 (self.watches.lock(), self.data_builder.lock())
582 {
583 data_builder.update_timestamp();
584
585 let watch_data =
586 data_builder.build_watch_data(path.to_path_buf(), recursive_mode.is_recursive());
587
588 // if create watch_data successful, add it to watching list.
589 if let Some(watch_data) = watch_data {
590 watches.insert(path.to_path_buf(), watch_data);
591 }
592 }
593 }
594
595 /// Unwatch a path.
596 ///
597 /// Return `Err(_)` if given path has't be monitored.
598 fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
599 // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore.
600 self.watches
601 .lock()
602 .unwrap()
603 .remove(path)
604 .map(|_| ())
605 .ok_or_else(crate::Error::watch_not_found)
606 }
607}
608
609impl Watcher for PollWatcher {
610 /// Create a new [PollWatcher].
611 fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
612 Self::new(event_handler, config)
613 }
614
615 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
616 self.watch_inner(path, recursive_mode);
617
618 Ok(())
619 }
620
621 fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
622 self.unwatch_inner(path)
623 }
624
625 fn kind() -> crate::WatcherKind {
626 crate::WatcherKind::PollWatcher
627 }
628}
629
630impl Drop for PollWatcher {
631 fn drop(&mut self) {
632 self.want_to_stop.store(val:true, order:Ordering::Relaxed);
633 }
634}
635
/// Compile-time check that [`PollWatcher`] is both `Send` and `Sync`.
#[test]
fn poll_watcher_is_send_and_sync() {
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_sync::<PollWatcher>();
}
641