1 | //! Watcher implementation for the inotify Linux API
|
2 | //!
|
3 | //! The inotify API provides a mechanism for monitoring filesystem events. Inotify can be used to
|
4 | //! monitor individual files, or to monitor directories. When a directory is monitored, inotify
|
5 | //! will return events for the directory itself, and for files inside the directory.
|
6 |
|
7 | use super::event::*;
|
8 | use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
|
9 | use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
|
10 | use inotify as inotify_sys;
|
11 | use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
|
12 | use std::collections::HashMap;
|
13 | use std::env;
|
14 | use std::ffi::OsStr;
|
15 | use std::fs::metadata;
|
16 | use std::os::unix::io::AsRawFd;
|
17 | use std::path::{Path, PathBuf};
|
18 | use std::sync::Arc;
|
19 | use std::thread;
|
20 | use walkdir::WalkDir;
|
21 |
|
22 | const INOTIFY: mio::Token = mio::Token(0);
|
23 | const MESSAGE: mio::Token = mio::Token(1);
|
24 |
|
25 | // The EventLoop will set up a mio::Poll and use it to wait for the following:
|
26 | //
|
27 | // - messages telling it what to do
|
28 | //
|
29 | // - events telling it that something has happened on one of the watched files.
|
30 | struct EventLoop {
|
31 | running: bool,
|
32 | poll: mio::Poll,
|
33 | event_loop_waker: Arc<mio::Waker>,
|
34 | event_loop_tx: Sender<EventLoopMsg>,
|
35 | event_loop_rx: Receiver<EventLoopMsg>,
|
36 | inotify: Option<Inotify>,
|
37 | event_handler: Box<dyn EventHandler>,
|
38 | watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
|
39 | paths: HashMap<WatchDescriptor, PathBuf>,
|
40 | rename_event: Option<Event>,
|
41 | }
|
42 |
|
43 | /// Watcher implementation based on inotify
|
44 | #[derive (Debug)]
|
45 | pub struct INotifyWatcher {
|
46 | channel: Sender<EventLoopMsg>,
|
47 | waker: Arc<mio::Waker>,
|
48 | }
|
49 |
|
50 | enum EventLoopMsg {
|
51 | AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
|
52 | RemoveWatch(PathBuf, Sender<Result<()>>),
|
53 | Shutdown,
|
54 | Configure(Config, BoundSender<Result<bool>>),
|
55 | }
|
56 |
|
57 | #[inline ]
|
58 | fn add_watch_by_event(
|
59 | path: &Option<PathBuf>,
|
60 | event: &inotify_sys::Event<&OsStr>,
|
61 | watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
|
62 | add_watches: &mut Vec<PathBuf>,
|
63 | ) {
|
64 | if let Some(ref path: &PathBuf) = *path {
|
65 | if event.mask.contains(EventMask::ISDIR) {
|
66 | if let Some(parent_path: &Path) = path.parent() {
|
67 | if let Some(&(_, _, is_recursive: bool)) = watches.get(parent_path) {
|
68 | if is_recursive {
|
69 | add_watches.push(path.to_owned());
|
70 | }
|
71 | }
|
72 | }
|
73 | }
|
74 | }
|
75 | }
|
76 |
|
77 | #[inline ]
|
78 | fn remove_watch_by_event(
|
79 | path: &Option<PathBuf>,
|
80 | watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
|
81 | remove_watches: &mut Vec<PathBuf>,
|
82 | ) {
|
83 | if let Some(ref path: &PathBuf) = *path {
|
84 | if watches.contains_key(path) {
|
85 | remove_watches.push(path.to_owned());
|
86 | }
|
87 | }
|
88 | }
|
89 |
|
90 | impl EventLoop {
|
91 | pub fn new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self> {
|
92 | let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
|
93 | let poll = mio::Poll::new()?;
|
94 |
|
95 | let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
|
96 |
|
97 | let inotify_fd = inotify.as_raw_fd();
|
98 | let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
|
99 | poll.registry()
|
100 | .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
|
101 |
|
102 | let event_loop = EventLoop {
|
103 | running: true,
|
104 | poll,
|
105 | event_loop_waker,
|
106 | event_loop_tx,
|
107 | event_loop_rx,
|
108 | inotify: Some(inotify),
|
109 | event_handler,
|
110 | watches: HashMap::new(),
|
111 | paths: HashMap::new(),
|
112 | rename_event: None,
|
113 | };
|
114 | Ok(event_loop)
|
115 | }
|
116 |
|
117 | // Run the event loop.
|
118 | pub fn run(self) {
|
119 | let _ = thread::Builder::new()
|
120 | .name("notify-rs inotify loop" .to_string())
|
121 | .spawn(|| self.event_loop_thread());
|
122 | }
|
123 |
|
124 | fn event_loop_thread(mut self) {
|
125 | let mut events = mio::Events::with_capacity(16);
|
126 | loop {
|
127 | // Wait for something to happen.
|
128 | match self.poll.poll(&mut events, None) {
|
129 | Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
|
130 | // System call was interrupted, we will retry
|
131 | // TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
|
132 | }
|
133 | Err(e) => panic!("poll failed: {}" , e),
|
134 | Ok(()) => {}
|
135 | }
|
136 |
|
137 | // Process whatever happened.
|
138 | for event in &events {
|
139 | self.handle_event(event);
|
140 | }
|
141 |
|
142 | // Stop, if we're done.
|
143 | if !self.running {
|
144 | break;
|
145 | }
|
146 | }
|
147 | }
|
148 |
|
149 | // Handle a single event.
|
150 | fn handle_event(&mut self, event: &mio::event::Event) {
|
151 | match event.token() {
|
152 | MESSAGE => {
|
153 | // The channel is readable - handle messages.
|
154 | self.handle_messages()
|
155 | }
|
156 | INOTIFY => {
|
157 | // inotify has something to tell us.
|
158 | self.handle_inotify()
|
159 | }
|
160 | _ => unreachable!(),
|
161 | }
|
162 | }
|
163 |
|
164 | fn handle_messages(&mut self) {
|
165 | while let Ok(msg) = self.event_loop_rx.try_recv() {
|
166 | match msg {
|
167 | EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
|
168 | let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
|
169 | }
|
170 | EventLoopMsg::RemoveWatch(path, tx) => {
|
171 | let _ = tx.send(self.remove_watch(path, false));
|
172 | }
|
173 | EventLoopMsg::Shutdown => {
|
174 | let _ = self.remove_all_watches();
|
175 | if let Some(inotify) = self.inotify.take() {
|
176 | let _ = inotify.close();
|
177 | }
|
178 | self.running = false;
|
179 | break;
|
180 | }
|
181 | EventLoopMsg::Configure(config, tx) => {
|
182 | self.configure_raw_mode(config, tx);
|
183 | }
|
184 | }
|
185 | }
|
186 | }
|
187 |
|
188 | fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
|
189 | tx.send(Ok(false))
|
190 | .expect("configuration channel disconnected" );
|
191 | }
|
192 |
|
193 | fn handle_inotify(&mut self) {
|
194 | let mut add_watches = Vec::new();
|
195 | let mut remove_watches = Vec::new();
|
196 |
|
197 | if let Some(ref mut inotify) = self.inotify {
|
198 | let mut buffer = [0; 1024];
|
199 | // Read all buffers available.
|
200 | loop {
|
201 | match inotify.read_events(&mut buffer) {
|
202 | Ok(events) => {
|
203 | let mut num_events = 0;
|
204 | for event in events {
|
205 | log::trace!("inotify event: {event:?}" );
|
206 |
|
207 | num_events += 1;
|
208 | if event.mask.contains(EventMask::Q_OVERFLOW) {
|
209 | let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
|
210 | self.event_handler.handle_event(ev);
|
211 | }
|
212 |
|
213 | let path = match event.name {
|
214 | Some(name) => {
|
215 | self.paths.get(&event.wd).map(|root| root.join(&name))
|
216 | }
|
217 | None => self.paths.get(&event.wd).cloned(),
|
218 | };
|
219 |
|
220 | let mut evs = Vec::new();
|
221 |
|
222 | if event.mask.contains(EventMask::MOVED_FROM) {
|
223 | remove_watch_by_event(&path, &self.watches, &mut remove_watches);
|
224 |
|
225 | let event = Event::new(EventKind::Modify(ModifyKind::Name(
|
226 | RenameMode::From,
|
227 | )))
|
228 | .add_some_path(path.clone())
|
229 | .set_tracker(event.cookie as usize);
|
230 |
|
231 | self.rename_event = Some(event.clone());
|
232 |
|
233 | evs.push(event);
|
234 | } else if event.mask.contains(EventMask::MOVED_TO) {
|
235 | evs.push(
|
236 | Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
|
237 | .set_tracker(event.cookie as usize)
|
238 | .add_some_path(path.clone()),
|
239 | );
|
240 |
|
241 | let trackers_match = self
|
242 | .rename_event
|
243 | .as_ref()
|
244 | .and_then(|e| e.tracker())
|
245 | .map_or(false, |from_tracker| {
|
246 | from_tracker == event.cookie as usize
|
247 | });
|
248 |
|
249 | if trackers_match {
|
250 | let rename_event = self.rename_event.take().unwrap(); // unwrap is safe because `rename_event` must be set at this point
|
251 | evs.push(
|
252 | Event::new(EventKind::Modify(ModifyKind::Name(
|
253 | RenameMode::Both,
|
254 | )))
|
255 | .set_tracker(event.cookie as usize)
|
256 | .add_some_path(rename_event.paths.first().cloned())
|
257 | .add_some_path(path.clone()),
|
258 | );
|
259 | }
|
260 | add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
|
261 | }
|
262 | if event.mask.contains(EventMask::MOVE_SELF) {
|
263 | evs.push(
|
264 | Event::new(EventKind::Modify(ModifyKind::Name(
|
265 | RenameMode::From,
|
266 | )))
|
267 | .add_some_path(path.clone()),
|
268 | );
|
269 | // TODO stat the path and get to new path
|
270 | // - emit To and Both events
|
271 | // - change prefix for further events
|
272 | }
|
273 | if event.mask.contains(EventMask::CREATE) {
|
274 | evs.push(
|
275 | Event::new(EventKind::Create(
|
276 | if event.mask.contains(EventMask::ISDIR) {
|
277 | CreateKind::Folder
|
278 | } else {
|
279 | CreateKind::File
|
280 | },
|
281 | ))
|
282 | .add_some_path(path.clone()),
|
283 | );
|
284 | add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
|
285 | }
|
286 | if event.mask.contains(EventMask::DELETE_SELF)
|
287 | || event.mask.contains(EventMask::DELETE)
|
288 | {
|
289 | evs.push(
|
290 | Event::new(EventKind::Remove(
|
291 | if event.mask.contains(EventMask::ISDIR) {
|
292 | RemoveKind::Folder
|
293 | } else {
|
294 | RemoveKind::File
|
295 | },
|
296 | ))
|
297 | .add_some_path(path.clone()),
|
298 | );
|
299 | remove_watch_by_event(&path, &self.watches, &mut remove_watches);
|
300 | }
|
301 | if event.mask.contains(EventMask::MODIFY) {
|
302 | evs.push(
|
303 | Event::new(EventKind::Modify(ModifyKind::Data(
|
304 | DataChange::Any,
|
305 | )))
|
306 | .add_some_path(path.clone()),
|
307 | );
|
308 | }
|
309 | if event.mask.contains(EventMask::CLOSE_WRITE) {
|
310 | evs.push(
|
311 | Event::new(EventKind::Access(AccessKind::Close(
|
312 | AccessMode::Write,
|
313 | )))
|
314 | .add_some_path(path.clone()),
|
315 | );
|
316 | }
|
317 | if event.mask.contains(EventMask::CLOSE_NOWRITE) {
|
318 | evs.push(
|
319 | Event::new(EventKind::Access(AccessKind::Close(
|
320 | AccessMode::Read,
|
321 | )))
|
322 | .add_some_path(path.clone()),
|
323 | );
|
324 | }
|
325 | if event.mask.contains(EventMask::ATTRIB) {
|
326 | evs.push(
|
327 | Event::new(EventKind::Modify(ModifyKind::Metadata(
|
328 | MetadataKind::Any,
|
329 | )))
|
330 | .add_some_path(path.clone()),
|
331 | );
|
332 | }
|
333 | if event.mask.contains(EventMask::OPEN) {
|
334 | evs.push(
|
335 | Event::new(EventKind::Access(AccessKind::Open(
|
336 | AccessMode::Any,
|
337 | )))
|
338 | .add_some_path(path.clone()),
|
339 | );
|
340 | }
|
341 |
|
342 | for ev in evs {
|
343 | self.event_handler.handle_event(Ok(ev));
|
344 | }
|
345 | }
|
346 |
|
347 | // All events read. Break out.
|
348 | if num_events == 0 {
|
349 | break;
|
350 | }
|
351 | }
|
352 | Err(e) => {
|
353 | self.event_handler.handle_event(Err(Error::io(e)));
|
354 | }
|
355 | }
|
356 | }
|
357 | }
|
358 |
|
359 | for path in remove_watches {
|
360 | self.remove_watch(path, true).ok();
|
361 | }
|
362 |
|
363 | for path in add_watches {
|
364 | self.add_watch(path, true, false).ok();
|
365 | }
|
366 | }
|
367 |
|
368 | fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
|
369 | // If the watch is not recursive, or if we determine (by stat'ing the path to get its
|
370 | // metadata) that the watched path is not a directory, add a single path watch.
|
371 | if !is_recursive || !metadata(&path).map_err(Error::io)?.is_dir() {
|
372 | return self.add_single_watch(path, false, true);
|
373 | }
|
374 |
|
375 | for entry in WalkDir::new(path)
|
376 | .follow_links(true)
|
377 | .into_iter()
|
378 | .filter_map(filter_dir)
|
379 | {
|
380 | self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
|
381 | watch_self = false;
|
382 | }
|
383 |
|
384 | Ok(())
|
385 | }
|
386 |
|
387 | fn add_single_watch(
|
388 | &mut self,
|
389 | path: PathBuf,
|
390 | is_recursive: bool,
|
391 | watch_self: bool,
|
392 | ) -> Result<()> {
|
393 | let mut watchmask = WatchMask::ATTRIB
|
394 | | WatchMask::CREATE
|
395 | | WatchMask::DELETE
|
396 | | WatchMask::CLOSE_WRITE
|
397 | | WatchMask::MODIFY
|
398 | | WatchMask::MOVED_FROM
|
399 | | WatchMask::MOVED_TO;
|
400 |
|
401 | if watch_self {
|
402 | watchmask.insert(WatchMask::DELETE_SELF);
|
403 | watchmask.insert(WatchMask::MOVE_SELF);
|
404 | }
|
405 |
|
406 | if let Some(&(_, old_watchmask, _)) = self.watches.get(&path) {
|
407 | watchmask.insert(old_watchmask);
|
408 | watchmask.insert(WatchMask::MASK_ADD);
|
409 | }
|
410 |
|
411 | if let Some(ref mut inotify) = self.inotify {
|
412 | log::trace!("adding inotify watch: {}" , path.display());
|
413 |
|
414 | match inotify.add_watch(&path, watchmask) {
|
415 | Err(e) => {
|
416 | Err(if e.raw_os_error() == Some(libc::ENOSPC) {
|
417 | // do not report inotify limits as "no more space" on linux #266
|
418 | Error::new(ErrorKind::MaxFilesWatch)
|
419 | } else {
|
420 | Error::io(e)
|
421 | }
|
422 | .add_path(path))
|
423 | }
|
424 | Ok(w) => {
|
425 | watchmask.remove(WatchMask::MASK_ADD);
|
426 | self.watches
|
427 | .insert(path.clone(), (w.clone(), watchmask, is_recursive));
|
428 | self.paths.insert(w, path);
|
429 | Ok(())
|
430 | }
|
431 | }
|
432 | } else {
|
433 | Ok(())
|
434 | }
|
435 | }
|
436 |
|
437 | fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
|
438 | match self.watches.remove(&path) {
|
439 | None => return Err(Error::watch_not_found().add_path(path)),
|
440 | Some((w, _, is_recursive)) => {
|
441 | if let Some(ref mut inotify) = self.inotify {
|
442 | log::trace!("removing inotify watch: {}" , path.display());
|
443 |
|
444 | inotify
|
445 | .rm_watch(w.clone())
|
446 | .map_err(|e| Error::io(e).add_path(path.clone()))?;
|
447 | self.paths.remove(&w);
|
448 |
|
449 | if is_recursive || remove_recursive {
|
450 | let mut remove_list = Vec::new();
|
451 | for (w, p) in &self.paths {
|
452 | if p.starts_with(&path) {
|
453 | inotify
|
454 | .rm_watch(w.clone())
|
455 | .map_err(|e| Error::io(e).add_path(p.into()))?;
|
456 | self.watches.remove(p);
|
457 | remove_list.push(w.clone());
|
458 | }
|
459 | }
|
460 | for w in remove_list {
|
461 | self.paths.remove(&w);
|
462 | }
|
463 | }
|
464 | }
|
465 | }
|
466 | }
|
467 | Ok(())
|
468 | }
|
469 |
|
470 | fn remove_all_watches(&mut self) -> Result<()> {
|
471 | if let Some(ref mut inotify) = self.inotify {
|
472 | for (w, p) in &self.paths {
|
473 | inotify
|
474 | .rm_watch(w.clone())
|
475 | .map_err(|e| Error::io(e).add_path(p.into()))?;
|
476 | }
|
477 | self.watches.clear();
|
478 | self.paths.clear();
|
479 | }
|
480 | Ok(())
|
481 | }
|
482 | }
|
483 |
|
484 | /// return `DirEntry` when it is a directory
|
485 | fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
|
486 | if let Ok(e: DirEntry) = e {
|
487 | if let Ok(metadata: Metadata) = e.metadata() {
|
488 | if metadata.is_dir() {
|
489 | return Some(e);
|
490 | }
|
491 | }
|
492 | }
|
493 | None
|
494 | }
|
495 |
|
496 | impl INotifyWatcher {
|
497 | fn from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self> {
|
498 | let inotify = Inotify::init()?;
|
499 | let event_loop = EventLoop::new(inotify, event_handler)?;
|
500 | let channel = event_loop.event_loop_tx.clone();
|
501 | let waker = event_loop.event_loop_waker.clone();
|
502 | event_loop.run();
|
503 | Ok(INotifyWatcher { channel, waker })
|
504 | }
|
505 |
|
506 | fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
|
507 | let pb = if path.is_absolute() {
|
508 | path.to_owned()
|
509 | } else {
|
510 | let p = env::current_dir().map_err(Error::io)?;
|
511 | p.join(path)
|
512 | };
|
513 | let (tx, rx) = unbounded();
|
514 | let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
|
515 |
|
516 | // we expect the event loop to live and reply => unwraps must not panic
|
517 | self.channel.send(msg).unwrap();
|
518 | self.waker.wake().unwrap();
|
519 | rx.recv().unwrap()
|
520 | }
|
521 |
|
522 | fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
|
523 | let pb = if path.is_absolute() {
|
524 | path.to_owned()
|
525 | } else {
|
526 | let p = env::current_dir().map_err(Error::io)?;
|
527 | p.join(path)
|
528 | };
|
529 | let (tx, rx) = unbounded();
|
530 | let msg = EventLoopMsg::RemoveWatch(pb, tx);
|
531 |
|
532 | // we expect the event loop to live and reply => unwraps must not panic
|
533 | self.channel.send(msg).unwrap();
|
534 | self.waker.wake().unwrap();
|
535 | rx.recv().unwrap()
|
536 | }
|
537 | }
|
538 |
|
539 | impl Watcher for INotifyWatcher {
|
540 | /// Create a new watcher.
|
541 | fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
|
542 | Self::from_event_handler(Box::new(event_handler))
|
543 | }
|
544 |
|
545 | fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
|
546 | self.watch_inner(path, recursive_mode)
|
547 | }
|
548 |
|
549 | fn unwatch(&mut self, path: &Path) -> Result<()> {
|
550 | self.unwatch_inner(path)
|
551 | }
|
552 |
|
553 | fn configure(&mut self, config: Config) -> Result<bool> {
|
554 | let (tx, rx) = bounded(1);
|
555 | self.channel.send(EventLoopMsg::Configure(config, tx))?;
|
556 | self.waker.wake()?;
|
557 | rx.recv()?
|
558 | }
|
559 |
|
560 | fn kind() -> crate::WatcherKind {
|
561 | crate::WatcherKind::Inotify
|
562 | }
|
563 | }
|
564 |
|
565 | impl Drop for INotifyWatcher {
|
566 | fn drop(&mut self) {
|
567 | // we expect the event loop to live => unwrap must not panic
|
568 | self.channel.send(EventLoopMsg::Shutdown).unwrap();
|
569 | self.waker.wake().unwrap();
|
570 | }
|
571 | }
|
572 |
|
573 | #[test ]
|
574 | fn inotify_watcher_is_send_and_sync() {
|
575 | fn check<T: Send + Sync>() {}
|
576 | check::<INotifyWatcher>();
|
577 | }
|
578 | |