1//! A generic event source wrapping an IO objects or file descriptor
2//!
3//! You can use this general purpose adapter around file-descriptor backed objects to
4//! insert into an [`EventLoop`](crate::EventLoop).
5//!
6//! The event generated by this [`Generic`] event source are the [`Readiness`](crate::Readiness)
7//! notification itself, and the monitored object is provided to your callback as the second
8//! argument.
9//!
10#![cfg_attr(unix, doc = "```")]
11#![cfg_attr(not(unix), doc = "```no_run")]
12//! # extern crate calloop;
13//! use calloop::{generic::Generic, Interest, Mode, PostAction};
14//!
15//! # fn main() {
16//! # let mut event_loop = calloop::EventLoop::<()>::try_new()
17//! # .expect("Failed to initialize the event loop!");
18//! # let handle = event_loop.handle();
19//! # #[cfg(unix)]
20//! # let io_object = std::io::stdin();
21//! # #[cfg(windows)]
22//! # let io_object: std::net::TcpStream = panic!();
23//! handle.insert_source(
24//! // wrap your IO object in a Generic, here we register for read readiness
25//! // in level-triggering mode
26//! Generic::new(io_object, Interest::READ, Mode::Level),
27//! |readiness, io_object, shared_data| {
28//! // The first argument of the callback is a Readiness
29//! // The second is a &mut reference to your object
30//!
31//! // your callback needs to return a Result<PostAction, std::io::Error>
32//! // if it returns an error, the event loop will consider this event
33//! // event source as erroring and report it to the user.
34//! Ok(PostAction::Continue)
35//! }
36//! );
37//! # }
38//! ```
39//!
40//! It can also help you implementing your own event sources: just have
41//! these `Generic<_>` as fields of your event source, and delegate the
42//! [`EventSource`](crate::EventSource) implementation to them.
43
44use polling::Poller;
45use std::{borrow, marker::PhantomData, ops, sync::Arc};
46
47#[cfg(unix)]
48use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd};
49#[cfg(windows)]
50use std::os::windows::io::{
51 AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd,
52};
53
54use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
55
56/// Wrapper to use a type implementing `AsRawFd` but not `AsFd` with `Generic`
57#[derive(Debug)]
58pub struct FdWrapper<T: AsRawFd>(T);
59
60impl<T: AsRawFd> FdWrapper<T> {
61 /// Wrap `inner` with an `AsFd` implementation.
62 ///
63 /// # Safety
64 /// This is safe if the `AsRawFd` implementation of `inner` always returns
65 /// a valid fd. This should usually be true for types implementing
66 /// `AsRawFd`. But this isn't guaranteed with `FdWrapper<RawFd>`.
67 pub unsafe fn new(inner: T) -> Self {
68 Self(inner)
69 }
70}
71
72impl<T: AsRawFd> ops::Deref for FdWrapper<T> {
73 type Target = T;
74
75 fn deref(&self) -> &Self::Target {
76 &self.0
77 }
78}
79
80impl<T: AsRawFd> ops::DerefMut for FdWrapper<T> {
81 fn deref_mut(&mut self) -> &mut Self::Target {
82 &mut self.0
83 }
84}
85
86impl<T: AsRawFd> AsFd for FdWrapper<T> {
87 #[cfg(unix)]
88 fn as_fd(&self) -> BorrowedFd {
89 unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
90 }
91
92 #[cfg(windows)]
93 fn as_socket(&self) -> BorrowedFd {
94 unsafe { BorrowedFd::borrow_raw(self.0.as_raw_socket()) }
95 }
96}
97
98/// A wrapper around a type that doesn't expose it mutably safely.
99///
100/// The [`EventSource`] trait's `Metadata` type demands mutable access to the inner I/O source.
101/// However, the inner polling source used by `calloop` keeps the handle-based equivalent of an
102/// immutable pointer to the underlying object's I/O handle. Therefore, if the inner source is
103/// dropped, this leaves behind a dangling pointer which immediately invokes undefined behavior
104/// on the next poll of the event loop.
105///
106/// In order to prevent this from happening, the [`Generic`] I/O source must not directly expose
107/// a mutable reference to the underlying handle. This type wraps around the underlying handle and
108/// easily allows users to take immutable (`&`) references to the type, but makes mutable (`&mut`)
109/// references unsafe to get. Therefore, it prevents the source from being moved out and dropped
110/// while it is still registered in the event loop.
111///
112/// [`EventSource`]: crate::EventSource
113#[derive(Debug)]
114pub struct NoIoDrop<T>(T);
115
116impl<T> NoIoDrop<T> {
117 /// Get a mutable reference.
118 ///
119 /// # Safety
120 ///
121 /// The inner type's I/O source must not be dropped.
122 pub unsafe fn get_mut(&mut self) -> &mut T {
123 &mut self.0
124 }
125}
126
127impl<T> AsRef<T> for NoIoDrop<T> {
128 fn as_ref(&self) -> &T {
129 &self.0
130 }
131}
132
133impl<T> borrow::Borrow<T> for NoIoDrop<T> {
134 fn borrow(&self) -> &T {
135 &self.0
136 }
137}
138
139impl<T> ops::Deref for NoIoDrop<T> {
140 type Target = T;
141
142 fn deref(&self) -> &Self::Target {
143 &self.0
144 }
145}
146
147impl<T: AsFd> AsFd for NoIoDrop<T> {
148 #[cfg(unix)]
149 fn as_fd(&self) -> BorrowedFd<'_> {
150 // SAFETY: The innter type is not mutated.
151 self.0.as_fd()
152 }
153
154 #[cfg(windows)]
155 fn as_socket(&self) -> BorrowedFd<'_> {
156 // SAFETY: The innter type is not mutated.
157 self.0.as_socket()
158 }
159}
160
161/// A generic event source wrapping a FD-backed type
162#[derive(Debug)]
163pub struct Generic<F: AsFd, E = std::io::Error> {
164 /// The wrapped FD-backed type.
165 ///
166 /// This must be deregistered before it is dropped.
167 file: Option<NoIoDrop<F>>,
168 /// The programmed interest
169 pub interest: Interest,
170 /// The programmed mode
171 pub mode: Mode,
172
173 /// Back-reference to the poller.
174 ///
175 /// This is needed to drop the original file.
176 poller: Option<Arc<Poller>>,
177
178 // This token is used by the event loop logic to look up this source when an
179 // event occurs.
180 token: Option<Token>,
181
182 // This allows us to make the associated error and return types generic.
183 _error_type: PhantomData<E>,
184}
185
186impl<F: AsFd> Generic<F, std::io::Error> {
187 /// Wrap a FD-backed type into a `Generic` event source that uses
188 /// [`std::io::Error`] as its error type.
189 pub fn new(file: F, interest: Interest, mode: Mode) -> Generic<F, std::io::Error> {
190 Generic {
191 file: Some(NoIoDrop(file)),
192 interest,
193 mode,
194 token: None,
195 poller: None,
196 _error_type: PhantomData,
197 }
198 }
199
200 /// Wrap a FD-backed type into a `Generic` event source using an arbitrary error type.
201 pub fn new_with_error<E>(file: F, interest: Interest, mode: Mode) -> Generic<F, E> {
202 Generic {
203 file: Some(NoIoDrop(file)),
204 interest,
205 mode,
206 token: None,
207 poller: None,
208 _error_type: PhantomData,
209 }
210 }
211}
212
213impl<F: AsFd, E> Generic<F, E> {
214 /// Unwrap the `Generic` source to retrieve the underlying type
215 pub fn unwrap(mut self) -> F {
216 let NoIoDrop(file) = self.file.take().unwrap();
217
218 // Remove it from the poller.
219 if let Some(poller) = self.poller.take() {
220 poller
221 .delete(
222 #[cfg(unix)]
223 file.as_fd(),
224 #[cfg(windows)]
225 file.as_socket(),
226 )
227 .ok();
228 }
229
230 file
231 }
232
233 /// Get a reference to the underlying type.
234 pub fn get_ref(&self) -> &F {
235 &self.file.as_ref().unwrap().0
236 }
237
238 /// Get a mutable reference to the underlying type.
239 ///
240 /// # Safety
241 ///
242 /// This is unsafe because it allows you to modify the underlying type, which
243 /// allows you to drop the underlying event source. Dropping the underlying source
244 /// leads to a dangling reference.
245 pub unsafe fn get_mut(&mut self) -> &mut F {
246 self.file.as_mut().unwrap().get_mut()
247 }
248}
249
250impl<F: AsFd, E> Drop for Generic<F, E> {
251 fn drop(&mut self) {
252 // Remove it from the poller.
253 if let (Some(file: NoIoDrop), Some(poller: Arc)) = (self.file.take(), self.poller.take()) {
254 pollerResult<(), Error>
255 .delete(
256 #[cfg(unix)]
257 file.as_fd(),
258 #[cfg(windows)]
259 file.as_socket(),
260 )
261 .ok();
262 }
263 }
264}
265
266impl<F, E> EventSource for Generic<F, E>
267where
268 F: AsFd,
269 E: Into<Box<dyn std::error::Error + Send + Sync>>,
270{
271 type Event = Readiness;
272 type Metadata = NoIoDrop<F>;
273 type Ret = Result<PostAction, E>;
274 type Error = E;
275
276 fn process_events<C>(
277 &mut self,
278 readiness: Readiness,
279 token: Token,
280 mut callback: C,
281 ) -> Result<PostAction, Self::Error>
282 where
283 C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
284 {
285 // If the token is invalid or not ours, skip processing.
286 if self.token != Some(token) {
287 return Ok(PostAction::Continue);
288 }
289
290 callback(readiness, self.file.as_mut().unwrap())
291 }
292
293 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
294 let token = token_factory.token();
295
296 // SAFETY: We ensure that we have a poller to deregister with (see below).
297 unsafe {
298 poll.register(
299 &self.file.as_ref().unwrap().0,
300 self.interest,
301 self.mode,
302 token,
303 )?;
304 }
305
306 // Make sure we can use the poller to deregister if need be.
307 // But only if registration actually succeeded
308 // So that we don't try to unregister the FD on drop if it wasn't registered
309 // in the first place (for example if registration failed because of a duplicate insertion)
310 self.poller = Some(poll.poller().clone());
311 self.token = Some(token);
312
313 Ok(())
314 }
315
316 fn reregister(
317 &mut self,
318 poll: &mut Poll,
319 token_factory: &mut TokenFactory,
320 ) -> crate::Result<()> {
321 let token = token_factory.token();
322
323 poll.reregister(
324 &self.file.as_ref().unwrap().0,
325 self.interest,
326 self.mode,
327 token,
328 )?;
329
330 self.token = Some(token);
331 Ok(())
332 }
333
334 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
335 poll.unregister(&self.file.as_ref().unwrap().0)?;
336 self.poller = None;
337 self.token = None;
338 Ok(())
339 }
340}
341
342#[cfg(all(unix, test))]
343mod tests {
344 use std::io::{Read, Write};
345
346 use super::Generic;
347 use crate::{Dispatcher, Interest, Mode, PostAction};
348 #[cfg(unix)]
349 #[test]
350 fn dispatch_unix() {
351 use std::os::unix::net::UnixStream;
352
353 let mut event_loop = crate::EventLoop::try_new().unwrap();
354
355 let handle = event_loop.handle();
356
357 let (mut tx, rx) = UnixStream::pair().unwrap();
358
359 let generic = Generic::new(rx, Interest::READ, Mode::Level);
360
361 let mut dispached = false;
362
363 let _generic_token = handle
364 .insert_source(generic, move |readiness, file, d| {
365 assert!(readiness.readable);
366 // we have not registered for writability
367 assert!(!readiness.writable);
368 let mut buffer = vec![0; 10];
369 let ret = (&**file).read(&mut buffer).unwrap();
370 assert_eq!(ret, 6);
371 assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]);
372
373 *d = true;
374 Ok(PostAction::Continue)
375 })
376 .unwrap();
377
378 event_loop
379 .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
380 .unwrap();
381
382 assert!(!dispached);
383
384 let ret = tx.write(&[1, 2, 3, 4, 5, 6]).unwrap();
385 assert_eq!(ret, 6);
386 tx.flush().unwrap();
387
388 event_loop
389 .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
390 .unwrap();
391
392 assert!(dispached);
393 }
394
395 #[test]
396 fn register_deregister_unix() {
397 use std::os::unix::net::UnixStream;
398
399 let mut event_loop = crate::EventLoop::try_new().unwrap();
400
401 let handle = event_loop.handle();
402
403 let (mut tx, rx) = UnixStream::pair().unwrap();
404
405 let generic = Generic::new(rx, Interest::READ, Mode::Level);
406 let dispatcher = Dispatcher::new(generic, move |_, _, d| {
407 *d = true;
408 Ok(PostAction::Continue)
409 });
410
411 let mut dispached = false;
412
413 let generic_token = handle.register_dispatcher(dispatcher.clone()).unwrap();
414
415 event_loop
416 .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
417 .unwrap();
418
419 assert!(!dispached);
420
421 // remove the source, and then write something
422
423 event_loop.handle().remove(generic_token);
424
425 let ret = tx.write(&[1, 2, 3, 4, 5, 6]).unwrap();
426 assert_eq!(ret, 6);
427 tx.flush().unwrap();
428
429 event_loop
430 .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
431 .unwrap();
432
433 // the source has not been dispatched, as the source is no longer here
434 assert!(!dispached);
435
436 // insert it again
437 let generic = dispatcher.into_source_inner();
438 let _generic_token = handle
439 .insert_source(generic, move |readiness, file, d| {
440 assert!(readiness.readable);
441 // we have not registered for writability
442 assert!(!readiness.writable);
443 let mut buffer = vec![0; 10];
444 let ret = (&**file).read(&mut buffer).unwrap();
445 assert_eq!(ret, 6);
446 assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]);
447
448 *d = true;
449 Ok(PostAction::Continue)
450 })
451 .unwrap();
452
453 event_loop
454 .dispatch(Some(::std::time::Duration::ZERO), &mut dispached)
455 .unwrap();
456
457 // the has now been properly dispatched
458 assert!(dispached);
459 }
460
461 // Duplicate insertion does not fail on all platforms, but does on Linux
462 #[cfg(target_os = "linux")]
463 #[test]
464 fn duplicate_insert() {
465 use std::os::unix::{
466 io::{AsFd, BorrowedFd},
467 net::UnixStream,
468 };
469 let event_loop = crate::EventLoop::<()>::try_new().unwrap();
470
471 let handle = event_loop.handle();
472
473 let (_, rx) = UnixStream::pair().unwrap();
474
475 // Rc only implements AsFd since 1.69...
476 struct RcFd<T> {
477 rc: std::rc::Rc<T>,
478 }
479
480 impl<T: AsFd> AsFd for RcFd<T> {
481 fn as_fd(&self) -> BorrowedFd<'_> {
482 self.rc.as_fd()
483 }
484 }
485
486 let rx = std::rc::Rc::new(rx);
487
488 let token = handle
489 .insert_source(
490 Generic::new(RcFd { rc: rx.clone() }, Interest::READ, Mode::Level),
491 |_, _, _| Ok(PostAction::Continue),
492 )
493 .unwrap();
494
495 // inserting the same FD a second time should fail
496 let ret = handle.insert_source(
497 Generic::new(RcFd { rc: rx.clone() }, Interest::READ, Mode::Level),
498 |_, _, _| Ok(PostAction::Continue),
499 );
500 assert!(ret.is_err());
501 std::mem::drop(ret);
502
503 // but the original token is still registered
504 handle.update(&token).unwrap();
505 }
506}
507