1 | //! A generic event source wrapping an IO objects or file descriptor |
2 | //! |
3 | //! You can use this general purpose adapter around file-descriptor backed objects to |
4 | //! insert into an [`EventLoop`](crate::EventLoop). |
5 | //! |
6 | //! The event generated by this [`Generic`] event source are the [`Readiness`](crate::Readiness) |
7 | //! notification itself, and the monitored object is provided to your callback as the second |
8 | //! argument. |
9 | //! |
10 | #![cfg_attr (unix, doc = "```" )] |
11 | #![cfg_attr (not(unix), doc = "```no_run" )] |
12 | //! # extern crate calloop; |
13 | //! use calloop::{generic::Generic, Interest, Mode, PostAction}; |
14 | //! |
15 | //! # fn main() { |
16 | //! # let mut event_loop = calloop::EventLoop::<()>::try_new() |
17 | //! # .expect("Failed to initialize the event loop!" ); |
18 | //! # let handle = event_loop.handle(); |
19 | //! # #[cfg (unix)] |
20 | //! # let io_object = std::io::stdin(); |
21 | //! # #[cfg (windows)] |
22 | //! # let io_object: std::net::TcpStream = panic!(); |
23 | //! handle.insert_source( |
24 | //! // wrap your IO object in a Generic, here we register for read readiness |
25 | //! // in level-triggering mode |
26 | //! Generic::new(io_object, Interest::READ, Mode::Level), |
27 | //! |readiness, io_object, shared_data| { |
28 | //! // The first argument of the callback is a Readiness |
29 | //! // The second is a &mut reference to your object |
30 | //! |
31 | //! // your callback needs to return a Result<PostAction, std::io::Error> |
32 | //! // if it returns an error, the event loop will consider this event |
33 | //! // event source as erroring and report it to the user. |
34 | //! Ok(PostAction::Continue) |
35 | //! } |
36 | //! ); |
37 | //! # } |
38 | //! ``` |
39 | //! |
40 | //! It can also help you implementing your own event sources: just have |
41 | //! these `Generic<_>` as fields of your event source, and delegate the |
42 | //! [`EventSource`](crate::EventSource) implementation to them. |
43 | |
44 | use polling::Poller; |
45 | use std::{borrow, marker::PhantomData, ops, sync::Arc}; |
46 | |
47 | #[cfg (unix)] |
48 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd}; |
49 | #[cfg (windows)] |
50 | use std::os::windows::io::{ |
51 | AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, |
52 | }; |
53 | |
54 | use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory}; |
55 | |
56 | /// Wrapper to use a type implementing `AsRawFd` but not `AsFd` with `Generic` |
57 | #[derive (Debug)] |
58 | pub struct FdWrapper<T: AsRawFd>(T); |
59 | |
60 | impl<T: AsRawFd> FdWrapper<T> { |
61 | /// Wrap `inner` with an `AsFd` implementation. |
62 | /// |
63 | /// # Safety |
64 | /// This is safe if the `AsRawFd` implementation of `inner` always returns |
65 | /// a valid fd. This should usually be true for types implementing |
66 | /// `AsRawFd`. But this isn't guaranteed with `FdWrapper<RawFd>`. |
67 | pub unsafe fn new(inner: T) -> Self { |
68 | Self(inner) |
69 | } |
70 | } |
71 | |
72 | impl<T: AsRawFd> ops::Deref for FdWrapper<T> { |
73 | type Target = T; |
74 | |
75 | fn deref(&self) -> &Self::Target { |
76 | &self.0 |
77 | } |
78 | } |
79 | |
80 | impl<T: AsRawFd> ops::DerefMut for FdWrapper<T> { |
81 | fn deref_mut(&mut self) -> &mut Self::Target { |
82 | &mut self.0 |
83 | } |
84 | } |
85 | |
86 | impl<T: AsRawFd> AsFd for FdWrapper<T> { |
87 | #[cfg (unix)] |
88 | fn as_fd(&self) -> BorrowedFd { |
89 | unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) } |
90 | } |
91 | |
92 | #[cfg (windows)] |
93 | fn as_socket(&self) -> BorrowedFd { |
94 | unsafe { BorrowedFd::borrow_raw(self.0.as_raw_socket()) } |
95 | } |
96 | } |
97 | |
98 | /// A wrapper around a type that doesn't expose it mutably safely. |
99 | /// |
100 | /// The [`EventSource`] trait's `Metadata` type demands mutable access to the inner I/O source. |
101 | /// However, the inner polling source used by `calloop` keeps the handle-based equivalent of an |
102 | /// immutable pointer to the underlying object's I/O handle. Therefore, if the inner source is |
103 | /// dropped, this leaves behind a dangling pointer which immediately invokes undefined behavior |
104 | /// on the next poll of the event loop. |
105 | /// |
106 | /// In order to prevent this from happening, the [`Generic`] I/O source must not directly expose |
107 | /// a mutable reference to the underlying handle. This type wraps around the underlying handle and |
108 | /// easily allows users to take immutable (`&`) references to the type, but makes mutable (`&mut`) |
109 | /// references unsafe to get. Therefore, it prevents the source from being moved out and dropped |
110 | /// while it is still registered in the event loop. |
111 | /// |
112 | /// [`EventSource`]: crate::EventSource |
113 | #[derive (Debug)] |
114 | pub struct NoIoDrop<T>(T); |
115 | |
116 | impl<T> NoIoDrop<T> { |
117 | /// Get a mutable reference. |
118 | /// |
119 | /// # Safety |
120 | /// |
121 | /// The inner type's I/O source must not be dropped. |
122 | pub unsafe fn get_mut(&mut self) -> &mut T { |
123 | &mut self.0 |
124 | } |
125 | } |
126 | |
127 | impl<T> AsRef<T> for NoIoDrop<T> { |
128 | fn as_ref(&self) -> &T { |
129 | &self.0 |
130 | } |
131 | } |
132 | |
133 | impl<T> borrow::Borrow<T> for NoIoDrop<T> { |
134 | fn borrow(&self) -> &T { |
135 | &self.0 |
136 | } |
137 | } |
138 | |
139 | impl<T> ops::Deref for NoIoDrop<T> { |
140 | type Target = T; |
141 | |
142 | fn deref(&self) -> &Self::Target { |
143 | &self.0 |
144 | } |
145 | } |
146 | |
147 | impl<T: AsFd> AsFd for NoIoDrop<T> { |
148 | #[cfg (unix)] |
149 | fn as_fd(&self) -> BorrowedFd<'_> { |
150 | // SAFETY: The innter type is not mutated. |
151 | self.0.as_fd() |
152 | } |
153 | |
154 | #[cfg (windows)] |
155 | fn as_socket(&self) -> BorrowedFd<'_> { |
156 | // SAFETY: The innter type is not mutated. |
157 | self.0.as_socket() |
158 | } |
159 | } |
160 | |
161 | /// A generic event source wrapping a FD-backed type |
162 | #[derive (Debug)] |
163 | pub struct Generic<F: AsFd, E = std::io::Error> { |
164 | /// The wrapped FD-backed type. |
165 | /// |
166 | /// This must be deregistered before it is dropped. |
167 | file: Option<NoIoDrop<F>>, |
168 | /// The programmed interest |
169 | pub interest: Interest, |
170 | /// The programmed mode |
171 | pub mode: Mode, |
172 | |
173 | /// Back-reference to the poller. |
174 | /// |
175 | /// This is needed to drop the original file. |
176 | poller: Option<Arc<Poller>>, |
177 | |
178 | // This token is used by the event loop logic to look up this source when an |
179 | // event occurs. |
180 | token: Option<Token>, |
181 | |
182 | // This allows us to make the associated error and return types generic. |
183 | _error_type: PhantomData<E>, |
184 | } |
185 | |
186 | impl<F: AsFd> Generic<F, std::io::Error> { |
187 | /// Wrap a FD-backed type into a `Generic` event source that uses |
188 | /// [`std::io::Error`] as its error type. |
189 | pub fn new(file: F, interest: Interest, mode: Mode) -> Generic<F, std::io::Error> { |
190 | Generic { |
191 | file: Some(NoIoDrop(file)), |
192 | interest, |
193 | mode, |
194 | token: None, |
195 | poller: None, |
196 | _error_type: PhantomData, |
197 | } |
198 | } |
199 | |
200 | /// Wrap a FD-backed type into a `Generic` event source using an arbitrary error type. |
201 | pub fn new_with_error<E>(file: F, interest: Interest, mode: Mode) -> Generic<F, E> { |
202 | Generic { |
203 | file: Some(NoIoDrop(file)), |
204 | interest, |
205 | mode, |
206 | token: None, |
207 | poller: None, |
208 | _error_type: PhantomData, |
209 | } |
210 | } |
211 | } |
212 | |
213 | impl<F: AsFd, E> Generic<F, E> { |
214 | /// Unwrap the `Generic` source to retrieve the underlying type |
215 | pub fn unwrap(mut self) -> F { |
216 | let NoIoDrop(file) = self.file.take().unwrap(); |
217 | |
218 | // Remove it from the poller. |
219 | if let Some(poller) = self.poller.take() { |
220 | poller |
221 | .delete( |
222 | #[cfg (unix)] |
223 | file.as_fd(), |
224 | #[cfg (windows)] |
225 | file.as_socket(), |
226 | ) |
227 | .ok(); |
228 | } |
229 | |
230 | file |
231 | } |
232 | |
233 | /// Get a reference to the underlying type. |
234 | pub fn get_ref(&self) -> &F { |
235 | &self.file.as_ref().unwrap().0 |
236 | } |
237 | |
238 | /// Get a mutable reference to the underlying type. |
239 | /// |
240 | /// # Safety |
241 | /// |
242 | /// This is unsafe because it allows you to modify the underlying type, which |
243 | /// allows you to drop the underlying event source. Dropping the underlying source |
244 | /// leads to a dangling reference. |
245 | pub unsafe fn get_mut(&mut self) -> &mut F { |
246 | self.file.as_mut().unwrap().get_mut() |
247 | } |
248 | } |
249 | |
250 | impl<F: AsFd, E> Drop for Generic<F, E> { |
251 | fn drop(&mut self) { |
252 | // Remove it from the poller. |
253 | if let (Some(file: NoIoDrop), Some(poller: Arc)) = (self.file.take(), self.poller.take()) { |
254 | pollerResult<(), Error> |
255 | .delete( |
256 | #[cfg (unix)] |
257 | file.as_fd(), |
258 | #[cfg (windows)] |
259 | file.as_socket(), |
260 | ) |
261 | .ok(); |
262 | } |
263 | } |
264 | } |
265 | |
266 | impl<F, E> EventSource for Generic<F, E> |
267 | where |
268 | F: AsFd, |
269 | E: Into<Box<dyn std::error::Error + Send + Sync>>, |
270 | { |
271 | type Event = Readiness; |
272 | type Metadata = NoIoDrop<F>; |
273 | type Ret = Result<PostAction, E>; |
274 | type Error = E; |
275 | |
276 | fn process_events<C>( |
277 | &mut self, |
278 | readiness: Readiness, |
279 | token: Token, |
280 | mut callback: C, |
281 | ) -> Result<PostAction, Self::Error> |
282 | where |
283 | C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
284 | { |
285 | // If the token is invalid or not ours, skip processing. |
286 | if self.token != Some(token) { |
287 | return Ok(PostAction::Continue); |
288 | } |
289 | |
290 | callback(readiness, self.file.as_mut().unwrap()) |
291 | } |
292 | |
293 | fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { |
294 | let token = token_factory.token(); |
295 | |
296 | // SAFETY: We ensure that we have a poller to deregister with (see below). |
297 | unsafe { |
298 | poll.register( |
299 | &self.file.as_ref().unwrap().0, |
300 | self.interest, |
301 | self.mode, |
302 | token, |
303 | )?; |
304 | } |
305 | |
306 | // Make sure we can use the poller to deregister if need be. |
307 | // But only if registration actually succeeded |
308 | // So that we don't try to unregister the FD on drop if it wasn't registered |
309 | // in the first place (for example if registration failed because of a duplicate insertion) |
310 | self.poller = Some(poll.poller().clone()); |
311 | self.token = Some(token); |
312 | |
313 | Ok(()) |
314 | } |
315 | |
316 | fn reregister( |
317 | &mut self, |
318 | poll: &mut Poll, |
319 | token_factory: &mut TokenFactory, |
320 | ) -> crate::Result<()> { |
321 | let token = token_factory.token(); |
322 | |
323 | poll.reregister( |
324 | &self.file.as_ref().unwrap().0, |
325 | self.interest, |
326 | self.mode, |
327 | token, |
328 | )?; |
329 | |
330 | self.token = Some(token); |
331 | Ok(()) |
332 | } |
333 | |
334 | fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { |
335 | poll.unregister(&self.file.as_ref().unwrap().0)?; |
336 | self.poller = None; |
337 | self.token = None; |
338 | Ok(()) |
339 | } |
340 | } |
341 | |
342 | #[cfg (all(unix, test))] |
343 | mod tests { |
344 | use std::io::{Read, Write}; |
345 | |
346 | use super::Generic; |
347 | use crate::{Dispatcher, Interest, Mode, PostAction}; |
348 | #[cfg (unix)] |
349 | #[test ] |
350 | fn dispatch_unix() { |
351 | use std::os::unix::net::UnixStream; |
352 | |
353 | let mut event_loop = crate::EventLoop::try_new().unwrap(); |
354 | |
355 | let handle = event_loop.handle(); |
356 | |
357 | let (mut tx, rx) = UnixStream::pair().unwrap(); |
358 | |
359 | let generic = Generic::new(rx, Interest::READ, Mode::Level); |
360 | |
361 | let mut dispached = false; |
362 | |
363 | let _generic_token = handle |
364 | .insert_source(generic, move |readiness, file, d| { |
365 | assert!(readiness.readable); |
366 | // we have not registered for writability |
367 | assert!(!readiness.writable); |
368 | let mut buffer = vec![0; 10]; |
369 | let ret = (&**file).read(&mut buffer).unwrap(); |
370 | assert_eq!(ret, 6); |
371 | assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]); |
372 | |
373 | *d = true; |
374 | Ok(PostAction::Continue) |
375 | }) |
376 | .unwrap(); |
377 | |
378 | event_loop |
379 | .dispatch(Some(::std::time::Duration::ZERO), &mut dispached) |
380 | .unwrap(); |
381 | |
382 | assert!(!dispached); |
383 | |
384 | let ret = tx.write(&[1, 2, 3, 4, 5, 6]).unwrap(); |
385 | assert_eq!(ret, 6); |
386 | tx.flush().unwrap(); |
387 | |
388 | event_loop |
389 | .dispatch(Some(::std::time::Duration::ZERO), &mut dispached) |
390 | .unwrap(); |
391 | |
392 | assert!(dispached); |
393 | } |
394 | |
395 | #[test ] |
396 | fn register_deregister_unix() { |
397 | use std::os::unix::net::UnixStream; |
398 | |
399 | let mut event_loop = crate::EventLoop::try_new().unwrap(); |
400 | |
401 | let handle = event_loop.handle(); |
402 | |
403 | let (mut tx, rx) = UnixStream::pair().unwrap(); |
404 | |
405 | let generic = Generic::new(rx, Interest::READ, Mode::Level); |
406 | let dispatcher = Dispatcher::new(generic, move |_, _, d| { |
407 | *d = true; |
408 | Ok(PostAction::Continue) |
409 | }); |
410 | |
411 | let mut dispached = false; |
412 | |
413 | let generic_token = handle.register_dispatcher(dispatcher.clone()).unwrap(); |
414 | |
415 | event_loop |
416 | .dispatch(Some(::std::time::Duration::ZERO), &mut dispached) |
417 | .unwrap(); |
418 | |
419 | assert!(!dispached); |
420 | |
421 | // remove the source, and then write something |
422 | |
423 | event_loop.handle().remove(generic_token); |
424 | |
425 | let ret = tx.write(&[1, 2, 3, 4, 5, 6]).unwrap(); |
426 | assert_eq!(ret, 6); |
427 | tx.flush().unwrap(); |
428 | |
429 | event_loop |
430 | .dispatch(Some(::std::time::Duration::ZERO), &mut dispached) |
431 | .unwrap(); |
432 | |
433 | // the source has not been dispatched, as the source is no longer here |
434 | assert!(!dispached); |
435 | |
436 | // insert it again |
437 | let generic = dispatcher.into_source_inner(); |
438 | let _generic_token = handle |
439 | .insert_source(generic, move |readiness, file, d| { |
440 | assert!(readiness.readable); |
441 | // we have not registered for writability |
442 | assert!(!readiness.writable); |
443 | let mut buffer = vec![0; 10]; |
444 | let ret = (&**file).read(&mut buffer).unwrap(); |
445 | assert_eq!(ret, 6); |
446 | assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]); |
447 | |
448 | *d = true; |
449 | Ok(PostAction::Continue) |
450 | }) |
451 | .unwrap(); |
452 | |
453 | event_loop |
454 | .dispatch(Some(::std::time::Duration::ZERO), &mut dispached) |
455 | .unwrap(); |
456 | |
457 | // the has now been properly dispatched |
458 | assert!(dispached); |
459 | } |
460 | |
461 | // Duplicate insertion does not fail on all platforms, but does on Linux |
462 | #[cfg (target_os = "linux" )] |
463 | #[test ] |
464 | fn duplicate_insert() { |
465 | use std::os::unix::{ |
466 | io::{AsFd, BorrowedFd}, |
467 | net::UnixStream, |
468 | }; |
469 | let event_loop = crate::EventLoop::<()>::try_new().unwrap(); |
470 | |
471 | let handle = event_loop.handle(); |
472 | |
473 | let (_, rx) = UnixStream::pair().unwrap(); |
474 | |
475 | // Rc only implements AsFd since 1.69... |
476 | struct RcFd<T> { |
477 | rc: std::rc::Rc<T>, |
478 | } |
479 | |
480 | impl<T: AsFd> AsFd for RcFd<T> { |
481 | fn as_fd(&self) -> BorrowedFd<'_> { |
482 | self.rc.as_fd() |
483 | } |
484 | } |
485 | |
486 | let rx = std::rc::Rc::new(rx); |
487 | |
488 | let token = handle |
489 | .insert_source( |
490 | Generic::new(RcFd { rc: rx.clone() }, Interest::READ, Mode::Level), |
491 | |_, _, _| Ok(PostAction::Continue), |
492 | ) |
493 | .unwrap(); |
494 | |
495 | // inserting the same FD a second time should fail |
496 | let ret = handle.insert_source( |
497 | Generic::new(RcFd { rc: rx.clone() }, Interest::READ, Mode::Level), |
498 | |_, _, _| Ok(PostAction::Continue), |
499 | ); |
500 | assert!(ret.is_err()); |
501 | std::mem::drop(ret); |
502 | |
503 | // but the original token is still registered |
504 | handle.update(&token).unwrap(); |
505 | } |
506 | } |
507 | |