1 | //! Generic Watcher implementation based on polling |
2 | //! |
3 | //! Checks the `watch`ed paths periodically to detect changes. This implementation only uses |
4 | //! Rust stdlib APIs and should work on all of the platforms it supports. |
5 | |
6 | use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher}; |
7 | use std::{ |
8 | collections::HashMap, |
9 | path::{Path, PathBuf}, |
10 | sync::{ |
11 | atomic::{AtomicBool, Ordering}, |
12 | Arc, Mutex, |
13 | }, |
14 | thread, |
15 | time::Duration, |
16 | }; |
17 | |
18 | /// Event sent for registered handlers on initial directory scans |
19 | pub type ScanEvent = crate::Result<PathBuf>; |
20 | |
21 | /// Handler trait for receivers of [`ScanEvent`]. |
22 | /// Very much the same as [`EventHandler`], but including the Result. |
23 | /// |
24 | /// See the full example for more information. |
25 | pub trait ScanEventHandler: Send + 'static { |
26 | /// Handles an event. |
27 | fn handle_event(&mut self, event: ScanEvent); |
28 | } |
29 | |
30 | impl<F> ScanEventHandler for F |
31 | where |
32 | F: FnMut(ScanEvent) + Send + 'static, |
33 | { |
34 | fn handle_event(&mut self, event: ScanEvent) { |
35 | (self)(event); |
36 | } |
37 | } |
38 | |
/// Forwards every scan event into a crossbeam channel.
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
    fn handle_event(&mut self, event: ScanEvent) {
        // The outcome of `send` is deliberately discarded: a failed send is ignored.
        self.send(event).ok();
    }
}
45 | |
46 | impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> { |
47 | fn handle_event(&mut self, event: ScanEvent) { |
48 | let _ = self.send(event); |
49 | } |
50 | } |
51 | |
52 | impl ScanEventHandler for () { |
53 | fn handle_event(&mut self, _event: ScanEvent) {} |
54 | } |
55 | |
56 | use data::{DataBuilder, WatchData}; |
57 | mod data { |
58 | use crate::{ |
59 | event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind}, |
60 | EventHandler, |
61 | }; |
62 | use filetime::FileTime; |
63 | use std::{ |
64 | cell::RefCell, |
65 | collections::{hash_map::RandomState, HashMap}, |
66 | fmt::{self, Debug}, |
67 | fs::{self, File, Metadata}, |
68 | hash::{BuildHasher, Hasher}, |
69 | io::{self, Read}, |
70 | path::{Path, PathBuf}, |
71 | time::Instant, |
72 | }; |
73 | use walkdir::WalkDir; |
74 | |
75 | use super::ScanEventHandler; |
76 | |
77 | /// Builder for [`WatchData`] & [`PathData`]. |
78 | pub(super) struct DataBuilder { |
79 | emitter: EventEmitter, |
80 | scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>, |
81 | |
82 | // TODO: May allow user setup their custom BuildHasher / BuildHasherDefault |
83 | // in future. |
84 | build_hasher: Option<RandomState>, |
85 | |
86 | // current timestamp for building Data. |
87 | now: Instant, |
88 | } |
89 | |
90 | impl DataBuilder { |
91 | pub(super) fn new<F, G>( |
92 | event_handler: F, |
93 | compare_content: bool, |
94 | scan_emitter: Option<G>, |
95 | ) -> Self |
96 | where |
97 | F: EventHandler, |
98 | G: ScanEventHandler, |
99 | { |
100 | let scan_emitter = match scan_emitter { |
101 | None => None, |
102 | Some(v) => { |
103 | // workaround for a weird type resolution bug when directly going to dyn Trait |
104 | let intermediate: Box<RefCell<dyn ScanEventHandler>> = |
105 | Box::new(RefCell::new(v)); |
106 | Some(intermediate) |
107 | } |
108 | }; |
109 | Self { |
110 | emitter: EventEmitter::new(event_handler), |
111 | scan_emitter, |
112 | build_hasher: compare_content.then(RandomState::default), |
113 | now: Instant::now(), |
114 | } |
115 | } |
116 | |
117 | /// Update internal timestamp. |
118 | pub(super) fn update_timestamp(&mut self) { |
119 | self.now = Instant::now(); |
120 | } |
121 | |
122 | /// Create [`WatchData`]. |
123 | /// |
124 | /// This function will return `Err(_)` if can not retrieve metadata from |
125 | /// the path location. (e.g., not found). |
126 | pub(super) fn build_watch_data( |
127 | &self, |
128 | root: PathBuf, |
129 | is_recursive: bool, |
130 | follow_symlinks: bool, |
131 | ) -> Option<WatchData> { |
132 | WatchData::new(self, root, is_recursive, follow_symlinks) |
133 | } |
134 | |
135 | /// Create [`PathData`]. |
136 | fn build_path_data(&self, meta_path: &MetaPath) -> PathData { |
137 | PathData::new(self, meta_path) |
138 | } |
139 | } |
140 | |
141 | impl Debug for DataBuilder { |
142 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
143 | f.debug_struct("DataBuilder" ) |
144 | .field("build_hasher" , &self.build_hasher) |
145 | .field("now" , &self.now) |
146 | .finish() |
147 | } |
148 | } |
149 | |
150 | #[derive (Debug)] |
151 | pub(super) struct WatchData { |
152 | // config part, won't change. |
153 | root: PathBuf, |
154 | is_recursive: bool, |
155 | follow_symlinks: bool, |
156 | |
157 | // current status part. |
158 | all_path_data: HashMap<PathBuf, PathData>, |
159 | } |
160 | |
161 | impl WatchData { |
162 | /// Scan filesystem and create a new `WatchData`. |
163 | /// |
164 | /// # Side effect |
165 | /// |
166 | /// This function may send event by `data_builder.emitter`. |
167 | fn new( |
168 | data_builder: &DataBuilder, |
169 | root: PathBuf, |
170 | is_recursive: bool, |
171 | follow_symlinks: bool, |
172 | ) -> Option<Self> { |
173 | // If metadata read error at `root` path, it will emit |
174 | // a error event and stop to create the whole `WatchData`. |
175 | // |
176 | // QUESTION: inconsistent? |
177 | // |
178 | // When user try to *CREATE* a watch by `poll_watcher.watch(root, ..)`, |
179 | // if `root` path hit an io error, then watcher will reject to |
180 | // create this new watch. |
181 | // |
182 | // This may inconsistent with *POLLING* a watch. When watcher |
183 | // continue polling, io error at root path will not delete |
184 | // a existing watch. polling still working. |
185 | // |
186 | // So, consider a config file may not exists at first time but may |
187 | // create after a while, developer cannot watch it. |
188 | // |
189 | // FIXME: Can we always allow to watch a path, even file not |
190 | // found at this path? |
191 | if let Err(e) = fs::metadata(&root) { |
192 | data_builder.emitter.emit_io_err(e, Some(&root)); |
193 | return None; |
194 | } |
195 | |
196 | let all_path_data = Self::scan_all_path_data( |
197 | data_builder, |
198 | root.clone(), |
199 | is_recursive, |
200 | follow_symlinks, |
201 | true, |
202 | ) |
203 | .collect(); |
204 | |
205 | Some(Self { |
206 | root, |
207 | is_recursive, |
208 | follow_symlinks, |
209 | all_path_data, |
210 | }) |
211 | } |
212 | |
213 | /// Rescan filesystem and update this `WatchData`. |
214 | /// |
215 | /// # Side effect |
216 | /// |
217 | /// This function may emit event by `data_builder.emitter`. |
218 | pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) { |
219 | // scan current filesystem. |
220 | for (path, new_path_data) in Self::scan_all_path_data( |
221 | data_builder, |
222 | self.root.clone(), |
223 | self.is_recursive, |
224 | self.follow_symlinks, |
225 | false, |
226 | ) { |
227 | let old_path_data = self |
228 | .all_path_data |
229 | .insert(path.clone(), new_path_data.clone()); |
230 | |
231 | // emit event |
232 | let event = |
233 | PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data)); |
234 | if let Some(event) = event { |
235 | data_builder.emitter.emit_ok(event); |
236 | } |
237 | } |
238 | |
239 | // scan for disappeared paths. |
240 | let mut disappeared_paths = Vec::new(); |
241 | for (path, path_data) in self.all_path_data.iter() { |
242 | if path_data.last_check < data_builder.now { |
243 | disappeared_paths.push(path.clone()); |
244 | } |
245 | } |
246 | |
247 | // remove disappeared paths |
248 | for path in disappeared_paths { |
249 | let old_path_data = self.all_path_data.remove(&path); |
250 | |
251 | // emit event |
252 | let event = PathData::compare_to_event(path, old_path_data.as_ref(), None); |
253 | if let Some(event) = event { |
254 | data_builder.emitter.emit_ok(event); |
255 | } |
256 | } |
257 | } |
258 | |
259 | /// Get all `PathData` by given configuration. |
260 | /// |
261 | /// # Side Effect |
262 | /// |
263 | /// This function may emit some IO Error events by `data_builder.emitter`. |
264 | fn scan_all_path_data( |
265 | data_builder: &'_ DataBuilder, |
266 | root: PathBuf, |
267 | is_recursive: bool, |
268 | follow_symlinks: bool, |
269 | // whether this is an initial scan, used only for events |
270 | is_initial: bool, |
271 | ) -> impl Iterator<Item = (PathBuf, PathData)> + '_ { |
272 | log::trace!("rescanning {root:?}" ); |
273 | // WalkDir return only one entry if root is a file (not a folder), |
274 | // so we can use single logic to do the both file & dir's jobs. |
275 | // |
276 | // See: https://docs.rs/walkdir/2.0.1/walkdir/struct.WalkDir.html#method.new |
277 | WalkDir::new(root) |
278 | .follow_links(follow_symlinks) |
279 | .max_depth(Self::dir_scan_depth(is_recursive)) |
280 | .into_iter() |
281 | .filter_map(|entry_res| match entry_res { |
282 | Ok(entry) => Some(entry), |
283 | Err(err) => { |
284 | log::warn!("walkdir error scanning {err:?}" ); |
285 | if let Some(io_error) = err.io_error() { |
286 | // clone an io::Error, so we have to create a new one. |
287 | let new_io_error = io::Error::new(io_error.kind(), err.to_string()); |
288 | data_builder.emitter.emit_io_err(new_io_error, err.path()); |
289 | } else { |
290 | let crate_err = |
291 | crate::Error::new(crate::ErrorKind::Generic(err.to_string())); |
292 | data_builder.emitter.emit(Err(crate_err)); |
293 | } |
294 | None |
295 | } |
296 | }) |
297 | .filter_map(move |entry| match entry.metadata() { |
298 | Ok(metadata) => { |
299 | let path = entry.into_path(); |
300 | if is_initial { |
301 | // emit initial scans |
302 | if let Some(ref emitter) = data_builder.scan_emitter { |
303 | emitter.borrow_mut().handle_event(Ok(path.clone())); |
304 | } |
305 | } |
306 | let meta_path = MetaPath::from_parts_unchecked(path, metadata); |
307 | let data_path = data_builder.build_path_data(&meta_path); |
308 | |
309 | Some((meta_path.into_path(), data_path)) |
310 | } |
311 | Err(e) => { |
312 | // emit event. |
313 | let path = entry.into_path(); |
314 | data_builder.emitter.emit_io_err(e, Some(path)); |
315 | |
316 | None |
317 | } |
318 | }) |
319 | } |
320 | |
321 | fn dir_scan_depth(is_recursive: bool) -> usize { |
322 | if is_recursive { |
323 | usize::MAX |
324 | } else { |
325 | 1 |
326 | } |
327 | } |
328 | } |
329 | |
/// Data recorded for a single path location.
///
/// See [`WatchData`] for more detail.
#[derive(Debug, Clone)]
struct PathData {
    /// Last modification time of the file.
    mtime: i64,

    /// Hash of the file contents. Only present when the user asked for content
    /// comparison and the file could be read.
    hash: Option<u64>,

    /// Time at which this entry was last checked.
    last_check: Instant,
}
345 | |
346 | impl PathData { |
347 | /// Create a new `PathData`. |
348 | fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData { |
349 | let metadata = meta_path.metadata(); |
350 | |
351 | PathData { |
352 | mtime: FileTime::from_last_modification_time(metadata).seconds(), |
353 | hash: data_builder |
354 | .build_hasher |
355 | .as_ref() |
356 | .filter(|_| metadata.is_file()) |
357 | .and_then(|build_hasher| { |
358 | Self::get_content_hash(build_hasher, meta_path.path()).ok() |
359 | }), |
360 | |
361 | last_check: data_builder.now, |
362 | } |
363 | } |
364 | |
365 | /// Get hash value for the data content in given file `path`. |
366 | fn get_content_hash(build_hasher: &RandomState, path: &Path) -> io::Result<u64> { |
367 | let mut hasher = build_hasher.build_hasher(); |
368 | let mut file = File::open(path)?; |
369 | let mut buf = [0; 512]; |
370 | |
371 | loop { |
372 | let n = match file.read(&mut buf) { |
373 | Ok(0) => break, |
374 | Ok(len) => len, |
375 | Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, |
376 | Err(e) => return Err(e), |
377 | }; |
378 | |
379 | hasher.write(&buf[..n]); |
380 | } |
381 | |
382 | Ok(hasher.finish()) |
383 | } |
384 | |
385 | /// Get [`Event`] by compare two optional [`PathData`]. |
386 | fn compare_to_event<P>( |
387 | path: P, |
388 | old: Option<&PathData>, |
389 | new: Option<&PathData>, |
390 | ) -> Option<Event> |
391 | where |
392 | P: Into<PathBuf>, |
393 | { |
394 | match (old, new) { |
395 | (Some(old), Some(new)) => { |
396 | if new.mtime > old.mtime { |
397 | Some(EventKind::Modify(ModifyKind::Metadata( |
398 | MetadataKind::WriteTime, |
399 | ))) |
400 | } else if new.hash != old.hash { |
401 | Some(EventKind::Modify(ModifyKind::Data(DataChange::Any))) |
402 | } else { |
403 | None |
404 | } |
405 | } |
406 | (None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)), |
407 | (Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)), |
408 | (None, None) => None, |
409 | } |
410 | .map(|event_kind| Event::new(event_kind).add_path(path.into())) |
411 | } |
412 | } |
413 | |
414 | /// Compose path and its metadata. |
415 | /// |
416 | /// This data structure designed for make sure path and its metadata can be |
417 | /// transferred in consistent way, and may avoid some duplicated |
418 | /// `fs::metadata()` function call in some situations. |
419 | #[derive (Debug)] |
420 | pub(super) struct MetaPath { |
421 | path: PathBuf, |
422 | metadata: Metadata, |
423 | } |
424 | |
425 | impl MetaPath { |
426 | /// Create `MetaPath` by given parts. |
427 | /// |
428 | /// # Invariant |
429 | /// |
430 | /// User must make sure the input `metadata` are associated with `path`. |
431 | fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self { |
432 | Self { path, metadata } |
433 | } |
434 | |
435 | fn path(&self) -> &Path { |
436 | &self.path |
437 | } |
438 | |
439 | fn metadata(&self) -> &Metadata { |
440 | &self.metadata |
441 | } |
442 | |
443 | fn into_path(self) -> PathBuf { |
444 | self.path |
445 | } |
446 | } |
447 | |
448 | /// Thin wrapper for outer event handler, for easy to use. |
449 | struct EventEmitter( |
450 | // Use `RefCell` to make sure `emit()` only need shared borrow of self (&self). |
451 | // Use `Box` to make sure EventEmitter is Sized. |
452 | Box<RefCell<dyn EventHandler>>, |
453 | ); |
454 | |
455 | impl EventEmitter { |
456 | fn new<F: EventHandler>(event_handler: F) -> Self { |
457 | Self(Box::new(RefCell::new(event_handler))) |
458 | } |
459 | |
460 | /// Emit single event. |
461 | fn emit(&self, event: crate::Result<Event>) { |
462 | self.0.borrow_mut().handle_event(event); |
463 | } |
464 | |
465 | /// Emit event. |
466 | fn emit_ok(&self, event: Event) { |
467 | self.emit(Ok(event)) |
468 | } |
469 | |
470 | /// Emit io error event. |
471 | fn emit_io_err<E, P>(&self, err: E, path: Option<P>) |
472 | where |
473 | E: Into<io::Error>, |
474 | P: Into<PathBuf>, |
475 | { |
476 | let e = crate::Error::io(err.into()); |
477 | if let Some(path) = path { |
478 | self.emit(Err(e.add_path(path.into()))); |
479 | } else { |
480 | self.emit(Err(e)); |
481 | } |
482 | } |
483 | } |
484 | } |
485 | |
486 | /// Polling based `Watcher` implementation. |
487 | /// |
488 | /// By default scans through all files and checks for changed entries based on their change date. |
489 | /// Can also be changed to perform file content change checks. |
490 | /// |
491 | /// See [Config] for more details. |
492 | #[derive (Debug)] |
493 | pub struct PollWatcher { |
494 | watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>, |
495 | data_builder: Arc<Mutex<DataBuilder>>, |
496 | want_to_stop: Arc<AtomicBool>, |
497 | /// channel to the poll loop |
498 | /// currently used only for manual polling |
499 | message_channel: Sender<()>, |
500 | delay: Option<Duration>, |
501 | follow_sylinks: bool, |
502 | } |
503 | |
504 | impl PollWatcher { |
505 | /// Create a new [`PollWatcher`], configured as needed. |
506 | pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> { |
507 | Self::with_opt::<_, ()>(event_handler, config, None) |
508 | } |
509 | |
510 | /// Actively poll for changes. Can be combined with a timeout of 0 to perform only manual polling. |
511 | pub fn poll(&self) -> crate::Result<()> { |
512 | self.message_channel |
513 | .send(()) |
514 | .map_err(|_| Error::generic("failed to send poll message" ))?; |
515 | Ok(()) |
516 | } |
517 | |
518 | /// Create a new [`PollWatcher`] with an scan event handler. |
519 | /// |
520 | /// `scan_fallback` is called on the initial scan with all files seen by the pollwatcher. |
521 | pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>( |
522 | event_handler: F, |
523 | config: Config, |
524 | scan_callback: G, |
525 | ) -> crate::Result<PollWatcher> { |
526 | Self::with_opt(event_handler, config, Some(scan_callback)) |
527 | } |
528 | |
529 | /// create a new [`PollWatcher`] with all options. |
530 | fn with_opt<F: EventHandler, G: ScanEventHandler>( |
531 | event_handler: F, |
532 | config: Config, |
533 | scan_callback: Option<G>, |
534 | ) -> crate::Result<PollWatcher> { |
535 | let data_builder = |
536 | DataBuilder::new(event_handler, config.compare_contents(), scan_callback); |
537 | |
538 | let (tx, rx) = unbounded(); |
539 | |
540 | let poll_watcher = PollWatcher { |
541 | watches: Default::default(), |
542 | data_builder: Arc::new(Mutex::new(data_builder)), |
543 | want_to_stop: Arc::new(AtomicBool::new(false)), |
544 | delay: config.poll_interval(), |
545 | follow_sylinks: config.follow_symlinks(), |
546 | message_channel: tx, |
547 | }; |
548 | |
549 | poll_watcher.run(rx); |
550 | |
551 | Ok(poll_watcher) |
552 | } |
553 | |
554 | fn run(&self, rx: Receiver<()>) { |
555 | let watches = Arc::clone(&self.watches); |
556 | let data_builder = Arc::clone(&self.data_builder); |
557 | let want_to_stop = Arc::clone(&self.want_to_stop); |
558 | let delay = self.delay; |
559 | |
560 | let _ = thread::Builder::new() |
561 | .name("notify-rs poll loop" .to_string()) |
562 | .spawn(move || { |
563 | loop { |
564 | if want_to_stop.load(Ordering::SeqCst) { |
565 | break; |
566 | } |
567 | |
568 | // HINT: Make sure always lock in the same order to avoid deadlock. |
569 | // |
570 | // FIXME: inconsistent: some place mutex poison cause panic, |
571 | // some place just ignore. |
572 | if let (Ok(mut watches), Ok(mut data_builder)) = |
573 | (watches.lock(), data_builder.lock()) |
574 | { |
575 | data_builder.update_timestamp(); |
576 | |
577 | let vals = watches.values_mut(); |
578 | for watch_data in vals { |
579 | watch_data.rescan(&mut data_builder); |
580 | } |
581 | } |
582 | // TODO: v7.0 use delay - (Instant::now().saturating_duration_since(start)) |
583 | if let Some(delay) = delay { |
584 | let _ = rx.recv_timeout(delay); |
585 | } else { |
586 | let _ = rx.recv(); |
587 | } |
588 | } |
589 | }); |
590 | } |
591 | |
592 | /// Watch a path location. |
593 | /// |
594 | /// QUESTION: this function never return an Error, is it as intend? |
595 | /// Please also consider the IO Error event problem. |
596 | fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) { |
597 | // HINT: Make sure always lock in the same order to avoid deadlock. |
598 | // |
599 | // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore. |
600 | if let (Ok(mut watches), Ok(mut data_builder)) = |
601 | (self.watches.lock(), self.data_builder.lock()) |
602 | { |
603 | data_builder.update_timestamp(); |
604 | |
605 | let watch_data = data_builder.build_watch_data( |
606 | path.to_path_buf(), |
607 | recursive_mode.is_recursive(), |
608 | self.follow_sylinks, |
609 | ); |
610 | |
611 | // if create watch_data successful, add it to watching list. |
612 | if let Some(watch_data) = watch_data { |
613 | watches.insert(path.to_path_buf(), watch_data); |
614 | } |
615 | } |
616 | } |
617 | |
618 | /// Unwatch a path. |
619 | /// |
620 | /// Return `Err(_)` if given path has't be monitored. |
621 | fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> { |
622 | // FIXME: inconsistent: some place mutex poison cause panic, some place just ignore. |
623 | self.watches |
624 | .lock() |
625 | .unwrap() |
626 | .remove(path) |
627 | .map(|_| ()) |
628 | .ok_or_else(crate::Error::watch_not_found) |
629 | } |
630 | } |
631 | |
632 | impl Watcher for PollWatcher { |
633 | /// Create a new [`PollWatcher`]. |
634 | fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> { |
635 | Self::new(event_handler, config) |
636 | } |
637 | |
638 | fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> { |
639 | self.watch_inner(path, recursive_mode); |
640 | |
641 | Ok(()) |
642 | } |
643 | |
644 | fn unwatch(&mut self, path: &Path) -> crate::Result<()> { |
645 | self.unwatch_inner(path) |
646 | } |
647 | |
648 | fn kind() -> crate::WatcherKind { |
649 | crate::WatcherKind::PollWatcher |
650 | } |
651 | } |
652 | |
653 | impl Drop for PollWatcher { |
654 | fn drop(&mut self) { |
655 | self.want_to_stop.store(val:true, order:Ordering::Relaxed); |
656 | } |
657 | } |
658 | |
/// Compile-time check that `PollWatcher` can be sent to and shared between threads.
#[test]
fn poll_watcher_is_send_and_sync() {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<PollWatcher>();
}
664 | |