1//! Watcher implementation for the inotify Linux API
2//!
3//! The inotify API provides a mechanism for monitoring filesystem events. Inotify can be used to
4//! monitor individual files, or to monitor directories. When a directory is monitored, inotify
5//! will return events for the directory itself, and for files inside the directory.
6
7use super::event::*;
8use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
10use inotify as inotify_sys;
11use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsStr;
15use std::fs::metadata;
16use std::os::unix::io::AsRawFd;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::thread;
20use walkdir::WalkDir;
21
// Poll token under which the inotify file descriptor is registered for readability.
const INOTIFY: mio::Token = mio::Token(0);
// Poll token used by the `mio::Waker` that signals pending `EventLoopMsg`s.
const MESSAGE: mio::Token = mio::Token(1);
24
// The EventLoop sets up a mio::Poll and uses it to wait for the following:
//
// - messages telling it what to do (signalled through the waker, `MESSAGE` token)
//
// - events telling it that something has happened on one of the watched files
//   (the inotify descriptor became readable, `INOTIFY` token).
struct EventLoop {
    // `true` until a `Shutdown` message is handled; the loop thread exits once it is `false`.
    running: bool,
    // Poll instance the loop thread blocks on.
    poll: mio::Poll,
    // Waker registered with `poll` under the `MESSAGE` token; cloned into `INotifyWatcher`.
    event_loop_waker: Arc<mio::Waker>,
    // Sending half of the control channel; cloned into `INotifyWatcher`.
    event_loop_tx: Sender<EventLoopMsg>,
    // Receiving half of the control channel, drained in `handle_messages`.
    event_loop_rx: Receiver<EventLoopMsg>,
    // The inotify instance; taken and closed when `Shutdown` is handled.
    inotify: Option<Inotify>,
    // User-supplied sink that receives every translated event and error.
    event_handler: Box<dyn EventHandler>,
    // Watched path -> (watch descriptor, mask the watch was added with, is_recursive).
    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
    // Watch descriptor -> watched path; used to rebuild full paths for incoming events.
    paths: HashMap<WatchDescriptor, PathBuf>,
    // Most recent `MOVED_FROM` event, kept so a following `MOVED_TO` with the same cookie
    // can be reported as a combined rename.
    rename_event: Option<Event>,
}
42
/// Watcher implementation based on inotify
///
/// This is a handle to a background event loop thread: requests are sent over `channel`
/// and the loop is then woken through `waker`.
#[derive(Debug)]
pub struct INotifyWatcher {
    // Control channel into the event loop thread.
    channel: Sender<EventLoopMsg>,
    // Wakes the event loop's poll so it notices a newly sent message.
    waker: Arc<mio::Waker>,
}
49
// Requests sent from an `INotifyWatcher` handle to the event loop thread.
enum EventLoopMsg {
    // Watch the path with the given recursion mode; the outcome is sent back on the sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    // Stop watching the path; the outcome is sent back on the sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    // Remove all watches, close inotify and stop the loop.
    Shutdown,
    // Apply a runtime configuration; the outcome is sent back on the bounded sender.
    Configure(Config, BoundSender<Result<bool>>),
}
56
57#[inline]
58fn add_watch_by_event(
59 path: &Option<PathBuf>,
60 event: &inotify_sys::Event<&OsStr>,
61 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
62 add_watches: &mut Vec<PathBuf>,
63) {
64 if let Some(ref path: &PathBuf) = *path {
65 if event.mask.contains(EventMask::ISDIR) {
66 if let Some(parent_path: &Path) = path.parent() {
67 if let Some(&(_, _, is_recursive: bool)) = watches.get(parent_path) {
68 if is_recursive {
69 add_watches.push(path.to_owned());
70 }
71 }
72 }
73 }
74 }
75}
76
77#[inline]
78fn remove_watch_by_event(
79 path: &Option<PathBuf>,
80 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
81 remove_watches: &mut Vec<PathBuf>,
82) {
83 if let Some(ref path: &PathBuf) = *path {
84 if watches.contains_key(path) {
85 remove_watches.push(path.to_owned());
86 }
87 }
88}
89
90impl EventLoop {
91 pub fn new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self> {
92 let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
93 let poll = mio::Poll::new()?;
94
95 let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
96
97 let inotify_fd = inotify.as_raw_fd();
98 let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
99 poll.registry()
100 .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
101
102 let event_loop = EventLoop {
103 running: true,
104 poll,
105 event_loop_waker,
106 event_loop_tx,
107 event_loop_rx,
108 inotify: Some(inotify),
109 event_handler,
110 watches: HashMap::new(),
111 paths: HashMap::new(),
112 rename_event: None,
113 };
114 Ok(event_loop)
115 }
116
117 // Run the event loop.
118 pub fn run(self) {
119 let _ = thread::Builder::new()
120 .name("notify-rs inotify loop".to_string())
121 .spawn(|| self.event_loop_thread());
122 }
123
124 fn event_loop_thread(mut self) {
125 let mut events = mio::Events::with_capacity(16);
126 loop {
127 // Wait for something to happen.
128 match self.poll.poll(&mut events, None) {
129 Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
130 // System call was interrupted, we will retry
131 // TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
132 }
133 Err(e) => panic!("poll failed: {}", e),
134 Ok(()) => {}
135 }
136
137 // Process whatever happened.
138 for event in &events {
139 self.handle_event(event);
140 }
141
142 // Stop, if we're done.
143 if !self.running {
144 break;
145 }
146 }
147 }
148
149 // Handle a single event.
150 fn handle_event(&mut self, event: &mio::event::Event) {
151 match event.token() {
152 MESSAGE => {
153 // The channel is readable - handle messages.
154 self.handle_messages()
155 }
156 INOTIFY => {
157 // inotify has something to tell us.
158 self.handle_inotify()
159 }
160 _ => unreachable!(),
161 }
162 }
163
164 fn handle_messages(&mut self) {
165 while let Ok(msg) = self.event_loop_rx.try_recv() {
166 match msg {
167 EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
168 let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
169 }
170 EventLoopMsg::RemoveWatch(path, tx) => {
171 let _ = tx.send(self.remove_watch(path, false));
172 }
173 EventLoopMsg::Shutdown => {
174 let _ = self.remove_all_watches();
175 if let Some(inotify) = self.inotify.take() {
176 let _ = inotify.close();
177 }
178 self.running = false;
179 break;
180 }
181 EventLoopMsg::Configure(config, tx) => {
182 self.configure_raw_mode(config, tx);
183 }
184 }
185 }
186 }
187
188 fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
189 tx.send(Ok(false))
190 .expect("configuration channel disconnected");
191 }
192
193 fn handle_inotify(&mut self) {
194 let mut add_watches = Vec::new();
195 let mut remove_watches = Vec::new();
196
197 if let Some(ref mut inotify) = self.inotify {
198 let mut buffer = [0; 1024];
199 // Read all buffers available.
200 loop {
201 match inotify.read_events(&mut buffer) {
202 Ok(events) => {
203 let mut num_events = 0;
204 for event in events {
205 log::trace!("inotify event: {event:?}");
206
207 num_events += 1;
208 if event.mask.contains(EventMask::Q_OVERFLOW) {
209 let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
210 self.event_handler.handle_event(ev);
211 }
212
213 let path = match event.name {
214 Some(name) => {
215 self.paths.get(&event.wd).map(|root| root.join(&name))
216 }
217 None => self.paths.get(&event.wd).cloned(),
218 };
219
220 let mut evs = Vec::new();
221
222 if event.mask.contains(EventMask::MOVED_FROM) {
223 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
224
225 let event = Event::new(EventKind::Modify(ModifyKind::Name(
226 RenameMode::From,
227 )))
228 .add_some_path(path.clone())
229 .set_tracker(event.cookie as usize);
230
231 self.rename_event = Some(event.clone());
232
233 evs.push(event);
234 } else if event.mask.contains(EventMask::MOVED_TO) {
235 evs.push(
236 Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
237 .set_tracker(event.cookie as usize)
238 .add_some_path(path.clone()),
239 );
240
241 let trackers_match = self
242 .rename_event
243 .as_ref()
244 .and_then(|e| e.tracker())
245 .map_or(false, |from_tracker| {
246 from_tracker == event.cookie as usize
247 });
248
249 if trackers_match {
250 let rename_event = self.rename_event.take().unwrap(); // unwrap is safe because `rename_event` must be set at this point
251 evs.push(
252 Event::new(EventKind::Modify(ModifyKind::Name(
253 RenameMode::Both,
254 )))
255 .set_tracker(event.cookie as usize)
256 .add_some_path(rename_event.paths.first().cloned())
257 .add_some_path(path.clone()),
258 );
259 }
260 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
261 }
262 if event.mask.contains(EventMask::MOVE_SELF) {
263 evs.push(
264 Event::new(EventKind::Modify(ModifyKind::Name(
265 RenameMode::From,
266 )))
267 .add_some_path(path.clone()),
268 );
269 // TODO stat the path and get to new path
270 // - emit To and Both events
271 // - change prefix for further events
272 }
273 if event.mask.contains(EventMask::CREATE) {
274 evs.push(
275 Event::new(EventKind::Create(
276 if event.mask.contains(EventMask::ISDIR) {
277 CreateKind::Folder
278 } else {
279 CreateKind::File
280 },
281 ))
282 .add_some_path(path.clone()),
283 );
284 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
285 }
286 if event.mask.contains(EventMask::DELETE_SELF)
287 || event.mask.contains(EventMask::DELETE)
288 {
289 evs.push(
290 Event::new(EventKind::Remove(
291 if event.mask.contains(EventMask::ISDIR) {
292 RemoveKind::Folder
293 } else {
294 RemoveKind::File
295 },
296 ))
297 .add_some_path(path.clone()),
298 );
299 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
300 }
301 if event.mask.contains(EventMask::MODIFY) {
302 evs.push(
303 Event::new(EventKind::Modify(ModifyKind::Data(
304 DataChange::Any,
305 )))
306 .add_some_path(path.clone()),
307 );
308 }
309 if event.mask.contains(EventMask::CLOSE_WRITE) {
310 evs.push(
311 Event::new(EventKind::Access(AccessKind::Close(
312 AccessMode::Write,
313 )))
314 .add_some_path(path.clone()),
315 );
316 }
317 if event.mask.contains(EventMask::CLOSE_NOWRITE) {
318 evs.push(
319 Event::new(EventKind::Access(AccessKind::Close(
320 AccessMode::Read,
321 )))
322 .add_some_path(path.clone()),
323 );
324 }
325 if event.mask.contains(EventMask::ATTRIB) {
326 evs.push(
327 Event::new(EventKind::Modify(ModifyKind::Metadata(
328 MetadataKind::Any,
329 )))
330 .add_some_path(path.clone()),
331 );
332 }
333 if event.mask.contains(EventMask::OPEN) {
334 evs.push(
335 Event::new(EventKind::Access(AccessKind::Open(
336 AccessMode::Any,
337 )))
338 .add_some_path(path.clone()),
339 );
340 }
341
342 for ev in evs {
343 self.event_handler.handle_event(Ok(ev));
344 }
345 }
346
347 // All events read. Break out.
348 if num_events == 0 {
349 break;
350 }
351 }
352 Err(e) => {
353 self.event_handler.handle_event(Err(Error::io(e)));
354 }
355 }
356 }
357 }
358
359 for path in remove_watches {
360 self.remove_watch(path, true).ok();
361 }
362
363 for path in add_watches {
364 self.add_watch(path, true, false).ok();
365 }
366 }
367
368 fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
369 // If the watch is not recursive, or if we determine (by stat'ing the path to get its
370 // metadata) that the watched path is not a directory, add a single path watch.
371 if !is_recursive || !metadata(&path).map_err(Error::io)?.is_dir() {
372 return self.add_single_watch(path, false, true);
373 }
374
375 for entry in WalkDir::new(path)
376 .follow_links(true)
377 .into_iter()
378 .filter_map(filter_dir)
379 {
380 self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
381 watch_self = false;
382 }
383
384 Ok(())
385 }
386
387 fn add_single_watch(
388 &mut self,
389 path: PathBuf,
390 is_recursive: bool,
391 watch_self: bool,
392 ) -> Result<()> {
393 let mut watchmask = WatchMask::ATTRIB
394 | WatchMask::CREATE
395 | WatchMask::DELETE
396 | WatchMask::CLOSE_WRITE
397 | WatchMask::MODIFY
398 | WatchMask::MOVED_FROM
399 | WatchMask::MOVED_TO;
400
401 if watch_self {
402 watchmask.insert(WatchMask::DELETE_SELF);
403 watchmask.insert(WatchMask::MOVE_SELF);
404 }
405
406 if let Some(&(_, old_watchmask, _)) = self.watches.get(&path) {
407 watchmask.insert(old_watchmask);
408 watchmask.insert(WatchMask::MASK_ADD);
409 }
410
411 if let Some(ref mut inotify) = self.inotify {
412 log::trace!("adding inotify watch: {}", path.display());
413
414 match inotify.add_watch(&path, watchmask) {
415 Err(e) => {
416 Err(if e.raw_os_error() == Some(libc::ENOSPC) {
417 // do not report inotify limits as "no more space" on linux #266
418 Error::new(ErrorKind::MaxFilesWatch)
419 } else {
420 Error::io(e)
421 }
422 .add_path(path))
423 }
424 Ok(w) => {
425 watchmask.remove(WatchMask::MASK_ADD);
426 self.watches
427 .insert(path.clone(), (w.clone(), watchmask, is_recursive));
428 self.paths.insert(w, path);
429 Ok(())
430 }
431 }
432 } else {
433 Ok(())
434 }
435 }
436
437 fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
438 match self.watches.remove(&path) {
439 None => return Err(Error::watch_not_found().add_path(path)),
440 Some((w, _, is_recursive)) => {
441 if let Some(ref mut inotify) = self.inotify {
442 log::trace!("removing inotify watch: {}", path.display());
443
444 inotify
445 .rm_watch(w.clone())
446 .map_err(|e| Error::io(e).add_path(path.clone()))?;
447 self.paths.remove(&w);
448
449 if is_recursive || remove_recursive {
450 let mut remove_list = Vec::new();
451 for (w, p) in &self.paths {
452 if p.starts_with(&path) {
453 inotify
454 .rm_watch(w.clone())
455 .map_err(|e| Error::io(e).add_path(p.into()))?;
456 self.watches.remove(p);
457 remove_list.push(w.clone());
458 }
459 }
460 for w in remove_list {
461 self.paths.remove(&w);
462 }
463 }
464 }
465 }
466 }
467 Ok(())
468 }
469
470 fn remove_all_watches(&mut self) -> Result<()> {
471 if let Some(ref mut inotify) = self.inotify {
472 for (w, p) in &self.paths {
473 inotify
474 .rm_watch(w.clone())
475 .map_err(|e| Error::io(e).add_path(p.into()))?;
476 }
477 self.watches.clear();
478 self.paths.clear();
479 }
480 Ok(())
481 }
482}
483
484/// return `DirEntry` when it is a directory
485fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
486 if let Ok(e: DirEntry) = e {
487 if let Ok(metadata: Metadata) = e.metadata() {
488 if metadata.is_dir() {
489 return Some(e);
490 }
491 }
492 }
493 None
494}
495
496impl INotifyWatcher {
497 fn from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self> {
498 let inotify = Inotify::init()?;
499 let event_loop = EventLoop::new(inotify, event_handler)?;
500 let channel = event_loop.event_loop_tx.clone();
501 let waker = event_loop.event_loop_waker.clone();
502 event_loop.run();
503 Ok(INotifyWatcher { channel, waker })
504 }
505
506 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
507 let pb = if path.is_absolute() {
508 path.to_owned()
509 } else {
510 let p = env::current_dir().map_err(Error::io)?;
511 p.join(path)
512 };
513 let (tx, rx) = unbounded();
514 let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
515
516 // we expect the event loop to live and reply => unwraps must not panic
517 self.channel.send(msg).unwrap();
518 self.waker.wake().unwrap();
519 rx.recv().unwrap()
520 }
521
522 fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
523 let pb = if path.is_absolute() {
524 path.to_owned()
525 } else {
526 let p = env::current_dir().map_err(Error::io)?;
527 p.join(path)
528 };
529 let (tx, rx) = unbounded();
530 let msg = EventLoopMsg::RemoveWatch(pb, tx);
531
532 // we expect the event loop to live and reply => unwraps must not panic
533 self.channel.send(msg).unwrap();
534 self.waker.wake().unwrap();
535 rx.recv().unwrap()
536 }
537}
538
539impl Watcher for INotifyWatcher {
540 /// Create a new watcher.
541 fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
542 Self::from_event_handler(Box::new(event_handler))
543 }
544
545 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
546 self.watch_inner(path, recursive_mode)
547 }
548
549 fn unwatch(&mut self, path: &Path) -> Result<()> {
550 self.unwatch_inner(path)
551 }
552
553 fn configure(&mut self, config: Config) -> Result<bool> {
554 let (tx, rx) = bounded(1);
555 self.channel.send(EventLoopMsg::Configure(config, tx))?;
556 self.waker.wake()?;
557 rx.recv()?
558 }
559
560 fn kind() -> crate::WatcherKind {
561 crate::WatcherKind::Inotify
562 }
563}
564
565impl Drop for INotifyWatcher {
566 fn drop(&mut self) {
567 // we expect the event loop to live => unwrap must not panic
568 self.channel.send(EventLoopMsg::Shutdown).unwrap();
569 self.waker.wake().unwrap();
570 }
571}
572
// Compile-time check: the watcher handle must be usable across threads.
#[test]
fn inotify_watcher_is_send_and_sync() {
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_sync::<INotifyWatcher>();
}
578