1 | //! Watcher implementation for the inotify Linux API |
2 | //! |
3 | //! The inotify API provides a mechanism for monitoring filesystem events. Inotify can be used to |
4 | //! monitor individual files, or to monitor directories. When a directory is monitored, inotify |
5 | //! will return events for the directory itself, and for files inside the directory. |
6 | |
7 | use super::event::*; |
8 | use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher}; |
9 | use crate::{bounded, unbounded, BoundSender, Receiver, Sender}; |
10 | use inotify as inotify_sys; |
11 | use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask}; |
12 | use std::collections::HashMap; |
13 | use std::env; |
14 | use std::ffi::OsStr; |
15 | use std::fs::metadata; |
16 | use std::os::unix::io::AsRawFd; |
17 | use std::path::{Path, PathBuf}; |
18 | use std::sync::Arc; |
19 | use std::thread; |
20 | use walkdir::WalkDir; |
21 | |
/// Poll token under which the inotify file descriptor is registered; a readiness event
/// carrying this token means inotify has events to read.
const INOTIFY: mio::Token = mio::Token(0);
/// Poll token attached to the `mio::Waker`; a readiness event carrying this token means
/// control messages are waiting on the event loop channel.
const MESSAGE: mio::Token = mio::Token(1);
24 | |
25 | // The EventLoop will set up a mio::Poll and use it to wait for the following: |
26 | // |
27 | // - messages telling it what to do |
28 | // |
29 | // - events telling it that something has happened on one of the watched files. |
30 | |
/// State owned by the background thread that services inotify and control messages.
struct EventLoop {
    /// Cleared when a `Shutdown` message is handled; the thread exits once it is `false`.
    running: bool,
    /// Poll instance the thread blocks on, waiting for the `INOTIFY` and `MESSAGE` tokens.
    poll: mio::Poll,
    /// Waker registered under the `MESSAGE` token; a clone is handed to the watcher so it can
    /// interrupt the poll after queueing a message.
    event_loop_waker: Arc<mio::Waker>,
    /// Sending half of the control channel; a clone is handed to the watcher.
    event_loop_tx: Sender<EventLoopMsg>,
    /// Receiving half of the control channel, drained whenever the waker fires.
    event_loop_rx: Receiver<EventLoopMsg>,
    /// The inotify instance; taken (leaving `None`) and closed on shutdown.
    inotify: Option<Inotify>,
    /// Receives every translated event and every read error.
    event_handler: Box<dyn EventHandler>,
    /// PathBuf -> (WatchDescriptor, WatchMask, is_recursive, is_dir)
    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
    /// Reverse lookup of `watches`: resolves the descriptor carried by an inotify event to
    /// the watched path.
    paths: HashMap<WatchDescriptor, PathBuf>,
    /// Most recent `MOVED_FROM` event, kept so that a following `MOVED_TO` with the same
    /// cookie can be reported as a combined rename.
    rename_event: Option<Event>,
    /// Whether the directory walk performed for recursive watches follows symbolic links.
    follow_links: bool,
}
45 | |
/// Watcher implementation based on inotify
///
/// This is only a handle: the actual work happens on a background event loop thread, which
/// the handle talks to through `channel` and wakes up through `waker`.
#[derive (Debug)]
pub struct INotifyWatcher {
    /// Control channel into the event loop.
    channel: Sender<EventLoopMsg>,
    /// Wakes the event loop's poll so that it notices a newly queued message.
    waker: Arc<mio::Waker>,
}
52 | |
/// Control messages sent from the watcher handle to the event loop thread.
enum EventLoopMsg {
    /// Start watching the path with the given recursion mode; the outcome is sent back on
    /// the enclosed sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    /// Stop watching the path; the outcome is sent back on the enclosed sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Remove all watches, close inotify and stop the event loop.
    Shutdown,
    /// Apply a runtime configuration; the outcome is sent back on the enclosed sender.
    Configure(Config, BoundSender<Result<bool>>),
}
59 | |
60 | #[inline ] |
61 | fn add_watch_by_event( |
62 | path: &Option<PathBuf>, |
63 | event: &inotify_sys::Event<&OsStr>, |
64 | watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>, |
65 | add_watches: &mut Vec<PathBuf>, |
66 | ) { |
67 | if let Some(ref path: &PathBuf) = *path { |
68 | if event.mask.contains(EventMask::ISDIR) { |
69 | if let Some(parent_path: &Path) = path.parent() { |
70 | if let Some(&(_, _, is_recursive: bool, _)) = watches.get(parent_path) { |
71 | if is_recursive { |
72 | add_watches.push(path.to_owned()); |
73 | } |
74 | } |
75 | } |
76 | } |
77 | } |
78 | } |
79 | |
80 | #[inline ] |
81 | fn remove_watch_by_event( |
82 | path: &Option<PathBuf>, |
83 | watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>, |
84 | remove_watches: &mut Vec<PathBuf>, |
85 | ) { |
86 | if let Some(ref path: &PathBuf) = *path { |
87 | if watches.contains_key(path) { |
88 | remove_watches.push(path.to_owned()); |
89 | } |
90 | } |
91 | } |
92 | |
93 | impl EventLoop { |
94 | pub fn new( |
95 | inotify: Inotify, |
96 | event_handler: Box<dyn EventHandler>, |
97 | follow_links: bool, |
98 | ) -> Result<Self> { |
99 | let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>(); |
100 | let poll = mio::Poll::new()?; |
101 | |
102 | let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?); |
103 | |
104 | let inotify_fd = inotify.as_raw_fd(); |
105 | let mut evented_inotify = mio::unix::SourceFd(&inotify_fd); |
106 | poll.registry() |
107 | .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?; |
108 | |
109 | let event_loop = EventLoop { |
110 | running: true, |
111 | poll, |
112 | event_loop_waker, |
113 | event_loop_tx, |
114 | event_loop_rx, |
115 | inotify: Some(inotify), |
116 | event_handler, |
117 | watches: HashMap::new(), |
118 | paths: HashMap::new(), |
119 | rename_event: None, |
120 | follow_links, |
121 | }; |
122 | Ok(event_loop) |
123 | } |
124 | |
125 | // Run the event loop. |
126 | pub fn run(self) { |
127 | let _ = thread::Builder::new() |
128 | .name("notify-rs inotify loop" .to_string()) |
129 | .spawn(|| self.event_loop_thread()); |
130 | } |
131 | |
132 | fn event_loop_thread(mut self) { |
133 | let mut events = mio::Events::with_capacity(16); |
134 | loop { |
135 | // Wait for something to happen. |
136 | match self.poll.poll(&mut events, None) { |
137 | Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => { |
138 | // System call was interrupted, we will retry |
139 | // TODO: Not covered by tests (to reproduce likely need to setup signal handlers) |
140 | } |
141 | Err(e) => panic!("poll failed: {}" , e), |
142 | Ok(()) => {} |
143 | } |
144 | |
145 | // Process whatever happened. |
146 | for event in &events { |
147 | self.handle_event(event); |
148 | } |
149 | |
150 | // Stop, if we're done. |
151 | if !self.running { |
152 | break; |
153 | } |
154 | } |
155 | } |
156 | |
157 | // Handle a single event. |
158 | fn handle_event(&mut self, event: &mio::event::Event) { |
159 | match event.token() { |
160 | MESSAGE => { |
161 | // The channel is readable - handle messages. |
162 | self.handle_messages() |
163 | } |
164 | INOTIFY => { |
165 | // inotify has something to tell us. |
166 | self.handle_inotify() |
167 | } |
168 | _ => unreachable!(), |
169 | } |
170 | } |
171 | |
172 | fn handle_messages(&mut self) { |
173 | while let Ok(msg) = self.event_loop_rx.try_recv() { |
174 | match msg { |
175 | EventLoopMsg::AddWatch(path, recursive_mode, tx) => { |
176 | let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true)); |
177 | } |
178 | EventLoopMsg::RemoveWatch(path, tx) => { |
179 | let _ = tx.send(self.remove_watch(path, false)); |
180 | } |
181 | EventLoopMsg::Shutdown => { |
182 | let _ = self.remove_all_watches(); |
183 | if let Some(inotify) = self.inotify.take() { |
184 | let _ = inotify.close(); |
185 | } |
186 | self.running = false; |
187 | break; |
188 | } |
189 | EventLoopMsg::Configure(config, tx) => { |
190 | self.configure_raw_mode(config, tx); |
191 | } |
192 | } |
193 | } |
194 | } |
195 | |
196 | fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) { |
197 | tx.send(Ok(false)) |
198 | .expect("configuration channel disconnected" ); |
199 | } |
200 | |
201 | fn handle_inotify(&mut self) { |
202 | let mut add_watches = Vec::new(); |
203 | let mut remove_watches = Vec::new(); |
204 | |
205 | if let Some(ref mut inotify) = self.inotify { |
206 | let mut buffer = [0; 1024]; |
207 | // Read all buffers available. |
208 | loop { |
209 | match inotify.read_events(&mut buffer) { |
210 | Ok(events) => { |
211 | let mut num_events = 0; |
212 | for event in events { |
213 | log::trace!("inotify event: {event:?}" ); |
214 | |
215 | num_events += 1; |
216 | if event.mask.contains(EventMask::Q_OVERFLOW) { |
217 | let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan)); |
218 | self.event_handler.handle_event(ev); |
219 | } |
220 | |
221 | let path = match event.name { |
222 | Some(name) => self.paths.get(&event.wd).map(|root| root.join(name)), |
223 | None => self.paths.get(&event.wd).cloned(), |
224 | }; |
225 | |
226 | let mut evs = Vec::new(); |
227 | |
228 | if event.mask.contains(EventMask::MOVED_FROM) { |
229 | remove_watch_by_event(&path, &self.watches, &mut remove_watches); |
230 | |
231 | let event = Event::new(EventKind::Modify(ModifyKind::Name( |
232 | RenameMode::From, |
233 | ))) |
234 | .add_some_path(path.clone()) |
235 | .set_tracker(event.cookie as usize); |
236 | |
237 | self.rename_event = Some(event.clone()); |
238 | |
239 | evs.push(event); |
240 | } else if event.mask.contains(EventMask::MOVED_TO) { |
241 | evs.push( |
242 | Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To))) |
243 | .set_tracker(event.cookie as usize) |
244 | .add_some_path(path.clone()), |
245 | ); |
246 | |
247 | let trackers_match = self |
248 | .rename_event |
249 | .as_ref() |
250 | .and_then(|e| e.tracker()) |
251 | .map_or(false, |from_tracker| { |
252 | from_tracker == event.cookie as usize |
253 | }); |
254 | |
255 | if trackers_match { |
256 | let rename_event = self.rename_event.take().unwrap(); // unwrap is safe because `rename_event` must be set at this point |
257 | evs.push( |
258 | Event::new(EventKind::Modify(ModifyKind::Name( |
259 | RenameMode::Both, |
260 | ))) |
261 | .set_tracker(event.cookie as usize) |
262 | .add_some_path(rename_event.paths.first().cloned()) |
263 | .add_some_path(path.clone()), |
264 | ); |
265 | } |
266 | add_watch_by_event(&path, &event, &self.watches, &mut add_watches); |
267 | } |
268 | if event.mask.contains(EventMask::MOVE_SELF) { |
269 | evs.push( |
270 | Event::new(EventKind::Modify(ModifyKind::Name( |
271 | RenameMode::From, |
272 | ))) |
273 | .add_some_path(path.clone()), |
274 | ); |
275 | // TODO stat the path and get to new path |
276 | // - emit To and Both events |
277 | // - change prefix for further events |
278 | } |
279 | if event.mask.contains(EventMask::CREATE) { |
280 | evs.push( |
281 | Event::new(EventKind::Create( |
282 | if event.mask.contains(EventMask::ISDIR) { |
283 | CreateKind::Folder |
284 | } else { |
285 | CreateKind::File |
286 | }, |
287 | )) |
288 | .add_some_path(path.clone()), |
289 | ); |
290 | add_watch_by_event(&path, &event, &self.watches, &mut add_watches); |
291 | } |
292 | if event.mask.contains(EventMask::DELETE) { |
293 | evs.push( |
294 | Event::new(EventKind::Remove( |
295 | if event.mask.contains(EventMask::ISDIR) { |
296 | RemoveKind::Folder |
297 | } else { |
298 | RemoveKind::File |
299 | }, |
300 | )) |
301 | .add_some_path(path.clone()), |
302 | ); |
303 | remove_watch_by_event(&path, &self.watches, &mut remove_watches); |
304 | } |
305 | if event.mask.contains(EventMask::DELETE_SELF) { |
306 | let remove_kind = match &path { |
307 | Some(watched_path) => { |
308 | let current_watch = self.watches.get(watched_path); |
309 | match current_watch { |
310 | Some(&(_, _, _, true)) => RemoveKind::Folder, |
311 | Some(&(_, _, _, false)) => RemoveKind::File, |
312 | None => RemoveKind::Other, |
313 | } |
314 | } |
315 | None => { |
316 | log::trace!( |
317 | "No patch for DELETE_SELF event, may be a bug?" |
318 | ); |
319 | RemoveKind::Other |
320 | } |
321 | }; |
322 | evs.push( |
323 | Event::new(EventKind::Remove(remove_kind)) |
324 | .add_some_path(path.clone()), |
325 | ); |
326 | remove_watch_by_event(&path, &self.watches, &mut remove_watches); |
327 | } |
328 | if event.mask.contains(EventMask::MODIFY) { |
329 | evs.push( |
330 | Event::new(EventKind::Modify(ModifyKind::Data( |
331 | DataChange::Any, |
332 | ))) |
333 | .add_some_path(path.clone()), |
334 | ); |
335 | } |
336 | if event.mask.contains(EventMask::CLOSE_WRITE) { |
337 | evs.push( |
338 | Event::new(EventKind::Access(AccessKind::Close( |
339 | AccessMode::Write, |
340 | ))) |
341 | .add_some_path(path.clone()), |
342 | ); |
343 | } |
344 | if event.mask.contains(EventMask::CLOSE_NOWRITE) { |
345 | evs.push( |
346 | Event::new(EventKind::Access(AccessKind::Close( |
347 | AccessMode::Read, |
348 | ))) |
349 | .add_some_path(path.clone()), |
350 | ); |
351 | } |
352 | if event.mask.contains(EventMask::ATTRIB) { |
353 | evs.push( |
354 | Event::new(EventKind::Modify(ModifyKind::Metadata( |
355 | MetadataKind::Any, |
356 | ))) |
357 | .add_some_path(path.clone()), |
358 | ); |
359 | } |
360 | if event.mask.contains(EventMask::OPEN) { |
361 | evs.push( |
362 | Event::new(EventKind::Access(AccessKind::Open( |
363 | AccessMode::Any, |
364 | ))) |
365 | .add_some_path(path.clone()), |
366 | ); |
367 | } |
368 | |
369 | for ev in evs { |
370 | self.event_handler.handle_event(Ok(ev)); |
371 | } |
372 | } |
373 | |
374 | // All events read. Break out. |
375 | if num_events == 0 { |
376 | break; |
377 | } |
378 | } |
379 | Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { |
380 | // No events read. Break out. |
381 | break; |
382 | } |
383 | Err(e) => { |
384 | self.event_handler.handle_event(Err(Error::io(e))); |
385 | } |
386 | } |
387 | } |
388 | } |
389 | |
390 | for path in remove_watches { |
391 | self.remove_watch(path, true).ok(); |
392 | } |
393 | |
394 | for path in add_watches { |
395 | self.add_watch(path, true, false).ok(); |
396 | } |
397 | } |
398 | |
399 | fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> { |
400 | // If the watch is not recursive, or if we determine (by stat'ing the path to get its |
401 | // metadata) that the watched path is not a directory, add a single path watch. |
402 | if !is_recursive || !metadata(&path).map_err(Error::io_watch)?.is_dir() { |
403 | return self.add_single_watch(path, false, true); |
404 | } |
405 | |
406 | for entry in WalkDir::new(path) |
407 | .follow_links(self.follow_links) |
408 | .into_iter() |
409 | .filter_map(filter_dir) |
410 | { |
411 | self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?; |
412 | watch_self = false; |
413 | } |
414 | |
415 | Ok(()) |
416 | } |
417 | |
418 | fn add_single_watch( |
419 | &mut self, |
420 | path: PathBuf, |
421 | is_recursive: bool, |
422 | watch_self: bool, |
423 | ) -> Result<()> { |
424 | let mut watchmask = WatchMask::ATTRIB |
425 | | WatchMask::CREATE |
426 | | WatchMask::OPEN |
427 | | WatchMask::DELETE |
428 | | WatchMask::CLOSE_WRITE |
429 | | WatchMask::MODIFY |
430 | | WatchMask::MOVED_FROM |
431 | | WatchMask::MOVED_TO; |
432 | |
433 | if watch_self { |
434 | watchmask.insert(WatchMask::DELETE_SELF); |
435 | watchmask.insert(WatchMask::MOVE_SELF); |
436 | } |
437 | |
438 | if let Some(&(_, old_watchmask, _, _)) = self.watches.get(&path) { |
439 | watchmask.insert(old_watchmask); |
440 | watchmask.insert(WatchMask::MASK_ADD); |
441 | } |
442 | |
443 | if let Some(ref mut inotify) = self.inotify { |
444 | log::trace!("adding inotify watch: {}" , path.display()); |
445 | |
446 | match inotify.watches().add(&path, watchmask) { |
447 | Err(e) => { |
448 | Err(if e.raw_os_error() == Some(libc::ENOSPC) { |
449 | // do not report inotify limits as "no more space" on linux #266 |
450 | Error::new(ErrorKind::MaxFilesWatch) |
451 | } else { |
452 | Error::io(e) |
453 | } |
454 | .add_path(path)) |
455 | } |
456 | Ok(w) => { |
457 | watchmask.remove(WatchMask::MASK_ADD); |
458 | let is_dir = metadata(&path).map_err(Error::io)?.is_dir(); |
459 | self.watches |
460 | .insert(path.clone(), (w.clone(), watchmask, is_recursive, is_dir)); |
461 | self.paths.insert(w, path); |
462 | Ok(()) |
463 | } |
464 | } |
465 | } else { |
466 | Ok(()) |
467 | } |
468 | } |
469 | |
470 | fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> { |
471 | match self.watches.remove(&path) { |
472 | None => return Err(Error::watch_not_found().add_path(path)), |
473 | Some((w, _, is_recursive, _)) => { |
474 | if let Some(ref mut inotify) = self.inotify { |
475 | let mut inotify_watches = inotify.watches(); |
476 | log::trace!("removing inotify watch: {}" , path.display()); |
477 | |
478 | inotify_watches |
479 | .remove(w.clone()) |
480 | .map_err(|e| Error::io(e).add_path(path.clone()))?; |
481 | self.paths.remove(&w); |
482 | |
483 | if is_recursive || remove_recursive { |
484 | let mut remove_list = Vec::new(); |
485 | for (w, p) in &self.paths { |
486 | if p.starts_with(&path) { |
487 | inotify_watches |
488 | .remove(w.clone()) |
489 | .map_err(|e| Error::io(e).add_path(p.into()))?; |
490 | self.watches.remove(p); |
491 | remove_list.push(w.clone()); |
492 | } |
493 | } |
494 | for w in remove_list { |
495 | self.paths.remove(&w); |
496 | } |
497 | } |
498 | } |
499 | } |
500 | } |
501 | Ok(()) |
502 | } |
503 | |
504 | fn remove_all_watches(&mut self) -> Result<()> { |
505 | if let Some(ref mut inotify) = self.inotify { |
506 | let mut inotify_watches = inotify.watches(); |
507 | for (w, p) in &self.paths { |
508 | inotify_watches |
509 | .remove(w.clone()) |
510 | .map_err(|e| Error::io(e).add_path(p.into()))?; |
511 | } |
512 | self.watches.clear(); |
513 | self.paths.clear(); |
514 | } |
515 | Ok(()) |
516 | } |
517 | } |
518 | |
519 | /// return `DirEntry` when it is a directory |
520 | fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> { |
521 | if let Ok(e: DirEntry) = e { |
522 | if let Ok(metadata: Metadata) = e.metadata() { |
523 | if metadata.is_dir() { |
524 | return Some(e); |
525 | } |
526 | } |
527 | } |
528 | None |
529 | } |
530 | |
531 | impl INotifyWatcher { |
532 | fn from_event_handler( |
533 | event_handler: Box<dyn EventHandler>, |
534 | follow_links: bool, |
535 | ) -> Result<Self> { |
536 | let inotify = Inotify::init()?; |
537 | let event_loop = EventLoop::new(inotify, event_handler, follow_links)?; |
538 | let channel = event_loop.event_loop_tx.clone(); |
539 | let waker = event_loop.event_loop_waker.clone(); |
540 | event_loop.run(); |
541 | Ok(INotifyWatcher { channel, waker }) |
542 | } |
543 | |
544 | fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { |
545 | let pb = if path.is_absolute() { |
546 | path.to_owned() |
547 | } else { |
548 | let p = env::current_dir().map_err(Error::io)?; |
549 | p.join(path) |
550 | }; |
551 | let (tx, rx) = unbounded(); |
552 | let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx); |
553 | |
554 | // we expect the event loop to live and reply => unwraps must not panic |
555 | self.channel.send(msg).unwrap(); |
556 | self.waker.wake().unwrap(); |
557 | rx.recv().unwrap() |
558 | } |
559 | |
560 | fn unwatch_inner(&mut self, path: &Path) -> Result<()> { |
561 | let pb = if path.is_absolute() { |
562 | path.to_owned() |
563 | } else { |
564 | let p = env::current_dir().map_err(Error::io)?; |
565 | p.join(path) |
566 | }; |
567 | let (tx, rx) = unbounded(); |
568 | let msg = EventLoopMsg::RemoveWatch(pb, tx); |
569 | |
570 | // we expect the event loop to live and reply => unwraps must not panic |
571 | self.channel.send(msg).unwrap(); |
572 | self.waker.wake().unwrap(); |
573 | rx.recv().unwrap() |
574 | } |
575 | } |
576 | |
577 | impl Watcher for INotifyWatcher { |
578 | /// Create a new watcher. |
579 | fn new<F: EventHandler>(event_handler: F, config: Config) -> Result<Self> { |
580 | Self::from_event_handler(Box::new(event_handler), config.follow_symlinks()) |
581 | } |
582 | |
583 | fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { |
584 | self.watch_inner(path, recursive_mode) |
585 | } |
586 | |
587 | fn unwatch(&mut self, path: &Path) -> Result<()> { |
588 | self.unwatch_inner(path) |
589 | } |
590 | |
591 | fn configure(&mut self, config: Config) -> Result<bool> { |
592 | let (tx, rx) = bounded(1); |
593 | self.channel.send(EventLoopMsg::Configure(config, tx))?; |
594 | self.waker.wake()?; |
595 | rx.recv()? |
596 | } |
597 | |
598 | fn kind() -> crate::WatcherKind { |
599 | crate::WatcherKind::Inotify |
600 | } |
601 | } |
602 | |
603 | impl Drop for INotifyWatcher { |
604 | fn drop(&mut self) { |
605 | // we expect the event loop to live => unwrap must not panic |
606 | self.channel.send(EventLoopMsg::Shutdown).unwrap(); |
607 | self.waker.wake().unwrap(); |
608 | } |
609 | } |
610 | |
/// Compile-time check that the watcher handle can be moved to and shared between threads.
#[test]
fn inotify_watcher_is_send_and_sync() {
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_sync::<INotifyWatcher>();
}
616 | |