1//! Portable interface to epoll, kqueue, event ports, and IOCP.
2//!
3//! Supported platforms:
4//! - [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android
5//! - [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD,
6//! DragonFly BSD
7//! - [event ports](https://illumos.org/man/port_create): illumos, Solaris
8//! - [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, other Unix systems
9//! - [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+)
10//!
11//! By default, polling is done in oneshot mode, which means interest in I/O events needs to
12//! be re-enabled after an event is delivered if we're interested in the next event of the same
13//! kind. However, level and edge triggered modes are also available for certain operating
14//! systems. See the documentation of the [`PollMode`] type for more information.
15//!
16//! Only one thread can be waiting for I/O events at a time.
17//!
18//! # Examples
19//!
20//! ```no_run
21//! use polling::{Event, Poller};
22//! use std::net::TcpListener;
23//!
24//! // Create a TCP listener.
25//! let socket = TcpListener::bind("127.0.0.1:8000")?;
26//! socket.set_nonblocking(true)?;
27//! let key = 7; // Arbitrary key identifying the socket.
28//!
29//! // Create a poller and register interest in readability on the socket.
30//! let poller = Poller::new()?;
31//! poller.add(&socket, Event::readable(key))?;
32//!
33//! // The event loop.
34//! let mut events = Vec::new();
35//! loop {
36//! // Wait for at least one I/O event.
37//! events.clear();
38//! poller.wait(&mut events, None)?;
39//!
40//! for ev in &events {
41//! if ev.key == key {
42//! // Perform a non-blocking accept operation.
43//! socket.accept()?;
44//! // Set interest in the next readability event.
45//! poller.modify(&socket, Event::readable(key))?;
46//! }
47//! }
48//! }
49//! # std::io::Result::Ok(())
50//! ```
51
52#![cfg(feature = "std")]
53#![cfg_attr(not(feature = "std"), no_std)]
54#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
55#![allow(clippy::useless_conversion, clippy::unnecessary_cast)]
56#![cfg_attr(docsrs, feature(doc_cfg))]
57
58use std::fmt;
59use std::io;
60use std::sync::atomic::{AtomicBool, Ordering};
61use std::sync::Mutex;
62use std::time::Duration;
63use std::usize;
64
65use cfg_if::cfg_if;
66
67/// Calls a libc function and results in `io::Result`.
68#[cfg(unix)]
69macro_rules! syscall {
70 ($fn:ident $args:tt) => {{
71 let res = unsafe { libc::$fn $args };
72 if res == -1 {
73 Err(std::io::Error::last_os_error())
74 } else {
75 Ok(res)
76 }
77 }};
78}
79
80cfg_if! {
81 // Note: This cfg is intended to make it easy for polling developers to test
82 // the backend that uses poll, and is not a public API.
83 if #[cfg(polling_test_poll_backend)] {
84 mod poll;
85 use poll as sys;
86 } else if #[cfg(any(target_os = "linux", target_os = "android"))] {
87 mod epoll;
88 use epoll as sys;
89 } else if #[cfg(any(
90 target_os = "illumos",
91 target_os = "solaris",
92 ))] {
93 mod port;
94 use port as sys;
95 } else if #[cfg(any(
96 target_os = "macos",
97 target_os = "ios",
98 target_os = "tvos",
99 target_os = "watchos",
100 target_os = "freebsd",
101 target_os = "netbsd",
102 target_os = "openbsd",
103 target_os = "dragonfly",
104 ))] {
105 mod kqueue;
106 use kqueue as sys;
107 } else if #[cfg(any(
108 target_os = "vxworks",
109 target_os = "fuchsia",
110 target_os = "horizon",
111 unix,
112 ))] {
113 mod poll;
114 use poll as sys;
115 } else if #[cfg(target_os = "windows")] {
116 mod iocp;
117 use iocp as sys;
118 } else {
119 compile_error!("polling does not support this target OS");
120 }
121}
122
123pub mod os;
124
125/// Key associated with notifications.
126const NOTIFY_KEY: usize = std::usize::MAX;
127
128/// Indicates that a file descriptor or socket can read or write without blocking.
129#[derive(Debug, Clone, Copy, PartialEq, Eq)]
130pub struct Event {
131 /// Key identifying the file descriptor or socket.
132 pub key: usize,
133 /// Can it do a read operation without blocking?
134 pub readable: bool,
135 /// Can it do a write operation without blocking?
136 pub writable: bool,
137}
138
139/// The mode in which the poller waits for I/O events.
140#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
141#[non_exhaustive]
142pub enum PollMode {
143 /// Poll in oneshot mode.
144 ///
145 /// In this mode, the poller will only deliver one event per file descriptor or socket.
146 /// Once an event has been delivered, interest in the event needs to be re-enabled
147 /// by calling `Poller::modify` or `Poller::add`.
148 ///
149 /// This is the default mode.
150 Oneshot,
151
152 /// Poll in level-triggered mode.
153 ///
154 /// Once an event has been delivered, polling will continue to deliver that event
155 /// until interest in the event is disabled by calling `Poller::modify` or `Poller::delete`.
156 ///
157 /// Not all operating system support this mode. Trying to register a file descriptor with
158 /// this mode in an unsupported operating system will raise an error. You can check if
159 /// the operating system supports this mode by calling `Poller::supports_level`.
160 Level,
161
162 /// Poll in edge-triggered mode.
163 ///
164 /// Once an event has been delivered, polling will not deliver that event again unless
165 /// a new event occurs.
166 ///
167 /// Not all operating system support this mode. Trying to register a file descriptor with
168 /// this mode in an unsupported operating system will raise an error. You can check if
169 /// the operating system supports this mode by calling `Poller::supports_edge`.
170 Edge,
171
172 /// Poll in both edge-triggered and oneshot mode.
173 ///
174 /// This mode is similar to the `Oneshot` mode, but it will only deliver one event per new
175 /// event.
176 ///
177 /// Not all operating system support this mode. Trying to register a file descriptor with
178 /// this mode in an unsupported operating system will raise an error. You can check if
179 /// the operating system supports this mode by calling `Poller::supports_edge`.
180 EdgeOneshot,
181}
182
183impl Event {
184 /// All kinds of events (readable and writable).
185 ///
186 /// Equivalent to: `Event { key, readable: true, writable: true }`
187 pub fn all(key: usize) -> Event {
188 Event {
189 key,
190 readable: true,
191 writable: true,
192 }
193 }
194
195 /// Only the readable event.
196 ///
197 /// Equivalent to: `Event { key, readable: true, writable: false }`
198 pub fn readable(key: usize) -> Event {
199 Event {
200 key,
201 readable: true,
202 writable: false,
203 }
204 }
205
206 /// Only the writable event.
207 ///
208 /// Equivalent to: `Event { key, readable: false, writable: true }`
209 pub fn writable(key: usize) -> Event {
210 Event {
211 key,
212 readable: false,
213 writable: true,
214 }
215 }
216
217 /// No events.
218 ///
219 /// Equivalent to: `Event { key, readable: false, writable: false }`
220 pub fn none(key: usize) -> Event {
221 Event {
222 key,
223 readable: false,
224 writable: false,
225 }
226 }
227}
228
229/// Waits for I/O events.
230pub struct Poller {
231 poller: sys::Poller,
232 events: Mutex<sys::Events>,
233 notified: AtomicBool,
234}
235
236impl Poller {
237 /// Creates a new poller.
238 ///
239 /// # Examples
240 ///
241 /// ```
242 /// use polling::Poller;
243 ///
244 /// let poller = Poller::new()?;
245 /// # std::io::Result::Ok(())
246 /// ```
247 pub fn new() -> io::Result<Poller> {
248 Ok(Poller {
249 poller: sys::Poller::new()?,
250 events: Mutex::new(sys::Events::new()),
251 notified: AtomicBool::new(false),
252 })
253 }
254
255 /// Tell whether or not this `Poller` supports level-triggered polling.
256 pub fn supports_level(&self) -> bool {
257 self.poller.supports_level()
258 }
259
260 /// Tell whether or not this `Poller` supports edge-triggered polling.
261 pub fn supports_edge(&self) -> bool {
262 self.poller.supports_edge()
263 }
264
265 /// Adds a file descriptor or socket to the poller.
266 ///
267 /// A file descriptor or socket is considered readable or writable when a read or write
268 /// operation on it would not block. This doesn't mean the read or write operation will
269 /// succeed, it only means the operation will return immediately.
270 ///
271 /// If interest is set in both readability and writability, the two kinds of events might be
272 /// delivered either separately or together.
273 ///
274 /// For example, interest in `Event { key: 7, readable: true, writable: true }` might result in
275 /// a single [`Event`] of the same form, or in two separate [`Event`]s:
276 /// - `Event { key: 7, readable: true, writable: false }`
277 /// - `Event { key: 7, readable: false, writable: true }`
278 ///
279 /// Note that interest in I/O events needs to be re-enabled using
280 /// [`modify()`][`Poller::modify()`] again after an event is delivered if we're interested in
281 /// the next event of the same kind.
282 ///
283 /// Don't forget to [`delete()`][`Poller::delete()`] the file descriptor or socket when it is
284 /// no longer used!
285 ///
286 /// # Errors
287 ///
288 /// This method returns an error in the following situations:
289 ///
290 /// * If `key` equals `usize::MAX` because that key is reserved for internal use.
291 /// * If an error is returned by the syscall.
292 ///
293 /// # Examples
294 ///
295 /// Set interest in all events:
296 ///
297 /// ```no_run
298 /// use polling::{Event, Poller};
299 ///
300 /// let source = std::net::TcpListener::bind("127.0.0.1:0")?;
301 /// source.set_nonblocking(true)?;
302 /// let key = 7;
303 ///
304 /// let poller = Poller::new()?;
305 /// poller.add(&source, Event::all(key))?;
306 /// # std::io::Result::Ok(())
307 /// ```
308 pub fn add(&self, source: impl Source, interest: Event) -> io::Result<()> {
309 self.add_with_mode(source, interest, PollMode::Oneshot)
310 }
311
312 /// Adds a file descriptor or socket to the poller in the specified mode.
313 ///
314 /// This is identical to the `add()` function, but allows specifying the
315 /// polling mode to use for this socket.
316 ///
317 /// # Errors
318 ///
319 /// If the operating system does not support the specified mode, this function
320 /// will return an error.
321 pub fn add_with_mode(
322 &self,
323 source: impl Source,
324 interest: Event,
325 mode: PollMode,
326 ) -> io::Result<()> {
327 if interest.key == NOTIFY_KEY {
328 return Err(io::Error::new(
329 io::ErrorKind::InvalidInput,
330 "the key is not allowed to be `usize::MAX`",
331 ));
332 }
333 self.poller.add(source.raw(), interest, mode)
334 }
335
336 /// Modifies the interest in a file descriptor or socket.
337 ///
338 /// This method has the same behavior as [`add()`][`Poller::add()`] except it modifies the
339 /// interest of a previously added file descriptor or socket.
340 ///
341 /// To use this method with a file descriptor or socket, you must first add it using
342 /// [`add()`][`Poller::add()`].
343 ///
344 /// Note that interest in I/O events needs to be re-enabled using
345 /// [`modify()`][`Poller::modify()`] again after an event is delivered if we're interested in
346 /// the next event of the same kind.
347 ///
348 /// # Errors
349 ///
350 /// This method returns an error in the following situations:
351 ///
352 /// * If `key` equals `usize::MAX` because that key is reserved for internal use.
353 /// * If an error is returned by the syscall.
354 ///
355 /// # Examples
356 ///
357 /// To enable interest in all events:
358 ///
359 /// ```no_run
360 /// # use polling::{Event, Poller};
361 /// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
362 /// # let key = 7;
363 /// # let poller = Poller::new()?;
364 /// # poller.add(&source, Event::none(key))?;
365 /// poller.modify(&source, Event::all(key))?;
366 /// # std::io::Result::Ok(())
367 /// ```
368 ///
369 /// To enable interest in readable events and disable interest in writable events:
370 ///
371 /// ```no_run
372 /// # use polling::{Event, Poller};
373 /// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
374 /// # let key = 7;
375 /// # let poller = Poller::new()?;
376 /// # poller.add(&source, Event::none(key))?;
377 /// poller.modify(&source, Event::readable(key))?;
378 /// # std::io::Result::Ok(())
379 /// ```
380 ///
381 /// To disable interest in readable events and enable interest in writable events:
382 ///
383 /// ```no_run
384 /// # use polling::{Event, Poller};
385 /// # let poller = Poller::new()?;
386 /// # let key = 7;
387 /// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
388 /// # poller.add(&source, Event::none(key))?;
389 /// poller.modify(&source, Event::writable(key))?;
390 /// # std::io::Result::Ok(())
391 /// ```
392 ///
393 /// To disable interest in all events:
394 ///
395 /// ```no_run
396 /// # use polling::{Event, Poller};
397 /// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
398 /// # let key = 7;
399 /// # let poller = Poller::new()?;
400 /// # poller.add(&source, Event::none(key))?;
401 /// poller.modify(&source, Event::none(key))?;
402 /// # std::io::Result::Ok(())
403 /// ```
404 pub fn modify(&self, source: impl Source, interest: Event) -> io::Result<()> {
405 self.modify_with_mode(source, interest, PollMode::Oneshot)
406 }
407
408 /// Modifies interest in a file descriptor or socket to the poller, but with the specified
409 /// mode.
410 ///
411 /// This is identical to the `modify()` function, but allows specifying the polling mode
412 /// to use for this socket.
413 ///
414 /// # Performance Notes
415 ///
416 /// This function can be used to change a source from one polling mode to another. However,
417 /// on some platforms, this switch can cause delays in the delivery of events.
418 ///
419 /// # Errors
420 ///
421 /// If the operating system does not support the specified mode, this function will return
422 /// an error.
423 pub fn modify_with_mode(
424 &self,
425 source: impl Source,
426 interest: Event,
427 mode: PollMode,
428 ) -> io::Result<()> {
429 if interest.key == NOTIFY_KEY {
430 return Err(io::Error::new(
431 io::ErrorKind::InvalidInput,
432 "the key is not allowed to be `usize::MAX`",
433 ));
434 }
435 self.poller.modify(source.raw(), interest, mode)
436 }
437
438 /// Removes a file descriptor or socket from the poller.
439 ///
440 /// Unlike [`add()`][`Poller::add()`], this method only removes the file descriptor or
441 /// socket from the poller without putting it back into blocking mode.
442 ///
443 /// # Examples
444 ///
445 /// ```
446 /// use polling::{Event, Poller};
447 /// use std::net::TcpListener;
448 ///
449 /// let socket = TcpListener::bind("127.0.0.1:0")?;
450 /// socket.set_nonblocking(true)?;
451 /// let key = 7;
452 ///
453 /// let poller = Poller::new()?;
454 /// poller.add(&socket, Event::all(key))?;
455 /// poller.delete(&socket)?;
456 /// # std::io::Result::Ok(())
457 /// ```
458 pub fn delete(&self, source: impl Source) -> io::Result<()> {
459 self.poller.delete(source.raw())
460 }
461
462 /// Waits for at least one I/O event and returns the number of new events.
463 ///
464 /// New events will be appended to `events`. If necessary, make sure to clear the [`Vec`]
465 /// before calling [`wait()`][`Poller::wait()`]!
466 ///
467 /// This method will return with no new events if a notification is delivered by the
468 /// [`notify()`] method, or the timeout is reached. Sometimes it may even return with no events
469 /// spuriously.
470 ///
471 /// Only one thread can wait on I/O. If another thread is already in [`wait()`], concurrent
472 /// calls to this method will return immediately with no new events.
473 ///
474 /// If the operating system is ready to deliver a large number of events at once, this method
475 /// may decide to deliver them in smaller batches.
476 ///
477 /// [`notify()`]: `Poller::notify()`
478 /// [`wait()`]: `Poller::wait()`
479 ///
480 /// # Examples
481 ///
482 /// ```
483 /// use polling::{Event, Poller};
484 /// use std::net::TcpListener;
485 /// use std::time::Duration;
486 ///
487 /// let socket = TcpListener::bind("127.0.0.1:0")?;
488 /// socket.set_nonblocking(true)?;
489 /// let key = 7;
490 ///
491 /// let poller = Poller::new()?;
492 /// poller.add(&socket, Event::all(key))?;
493 ///
494 /// let mut events = Vec::new();
495 /// let n = poller.wait(&mut events, Some(Duration::from_secs(1)))?;
496 /// # std::io::Result::Ok(())
497 /// ```
498 pub fn wait(&self, events: &mut Vec<Event>, timeout: Option<Duration>) -> io::Result<usize> {
499 log::trace!("Poller::wait(_, {:?})", timeout);
500
501 if let Ok(mut lock) = self.events.try_lock() {
502 // Wait for I/O events.
503 self.poller.wait(&mut lock, timeout)?;
504
505 // Clear the notification, if any.
506 self.notified.swap(false, Ordering::SeqCst);
507
508 // Collect events.
509 let len = events.len();
510 events.extend(lock.iter().filter(|ev| ev.key != usize::MAX));
511 Ok(events.len() - len)
512 } else {
513 log::trace!("wait: skipping because another thread is already waiting on I/O");
514 Ok(0)
515 }
516 }
517
518 /// Wakes up the current or the following invocation of [`wait()`].
519 ///
520 /// If no thread is calling [`wait()`] right now, this method will cause the following call
521 /// to wake up immediately.
522 ///
523 /// [`wait()`]: `Poller::wait()`
524 ///
525 /// # Examples
526 ///
527 /// ```
528 /// use polling::Poller;
529 ///
530 /// let poller = Poller::new()?;
531 ///
532 /// // Notify the poller.
533 /// poller.notify()?;
534 ///
535 /// let mut events = Vec::new();
536 /// poller.wait(&mut events, None)?; // wakes up immediately
537 /// assert!(events.is_empty());
538 /// # std::io::Result::Ok(())
539 /// ```
540 pub fn notify(&self) -> io::Result<()> {
541 log::trace!("Poller::notify()");
542 if self
543 .notified
544 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
545 .is_ok()
546 {
547 self.poller.notify()?;
548 }
549 Ok(())
550 }
551}
552
553#[cfg(all(
554 any(
555 target_os = "linux",
556 target_os = "android",
557 target_os = "illumos",
558 target_os = "solaris",
559 target_os = "macos",
560 target_os = "ios",
561 target_os = "tvos",
562 target_os = "watchos",
563 target_os = "freebsd",
564 target_os = "netbsd",
565 target_os = "openbsd",
566 target_os = "dragonfly",
567 ),
568 not(polling_test_poll_backend),
569))]
570#[cfg_attr(
571 docsrs,
572 doc(cfg(any(
573 target_os = "linux",
574 target_os = "android",
575 target_os = "illumos",
576 target_os = "solaris",
577 target_os = "macos",
578 target_os = "ios",
579 target_os = "tvos",
580 target_os = "watchos",
581 target_os = "freebsd",
582 target_os = "netbsd",
583 target_os = "openbsd",
584 target_os = "dragonfly",
585 )))
586)]
587mod raw_fd_impl {
588 use crate::Poller;
589 use std::os::unix::io::{AsRawFd, RawFd};
590
591 #[cfg(not(polling_no_io_safety))]
592 use std::os::unix::io::{AsFd, BorrowedFd};
593
594 impl AsRawFd for Poller {
595 fn as_raw_fd(&self) -> RawFd {
596 self.poller.as_raw_fd()
597 }
598 }
599
600 #[cfg(not(polling_no_io_safety))]
601 impl AsFd for Poller {
602 fn as_fd(&self) -> BorrowedFd<'_> {
603 self.poller.as_fd()
604 }
605 }
606}
607
608#[cfg(windows)]
609#[cfg_attr(docsrs, doc(cfg(windows)))]
610mod raw_handle_impl {
611 use crate::Poller;
612 use std::os::windows::io::{AsRawHandle, RawHandle};
613
614 #[cfg(not(polling_no_io_safety))]
615 use std::os::windows::io::{AsHandle, BorrowedHandle};
616
617 impl AsRawHandle for Poller {
618 fn as_raw_handle(&self) -> RawHandle {
619 self.poller.as_raw_handle()
620 }
621 }
622
623 #[cfg(not(polling_no_io_safety))]
624 impl AsHandle for Poller {
625 fn as_handle(&self) -> BorrowedHandle<'_> {
626 self.poller.as_handle()
627 }
628 }
629}
630
631impl fmt::Debug for Poller {
632 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
633 self.poller.fmt(f)
634 }
635}
636
637cfg_if! {
638 if #[cfg(unix)] {
639 use std::os::unix::io::{AsRawFd, RawFd};
640
641 /// A [`RawFd`] or a reference to a type implementing [`AsRawFd`].
642 pub trait Source {
643 /// Returns the [`RawFd`] for this I/O object.
644 fn raw(&self) -> RawFd;
645 }
646
647 impl Source for RawFd {
648 fn raw(&self) -> RawFd {
649 *self
650 }
651 }
652
653 impl<T: AsRawFd> Source for &T {
654 fn raw(&self) -> RawFd {
655 self.as_raw_fd()
656 }
657 }
658 } else if #[cfg(windows)] {
659 use std::os::windows::io::{AsRawSocket, RawSocket};
660
661 /// A [`RawSocket`] or a reference to a type implementing [`AsRawSocket`].
662 pub trait Source {
663 /// Returns the [`RawSocket`] for this I/O object.
664 fn raw(&self) -> RawSocket;
665 }
666
667 impl Source for RawSocket {
668 fn raw(&self) -> RawSocket {
669 *self
670 }
671 }
672
673 impl<T: AsRawSocket> Source for &T {
674 fn raw(&self) -> RawSocket {
675 self.as_raw_socket()
676 }
677 }
678 }
679}
680
681#[allow(unused)]
682fn unsupported_error(err: impl Into<String>) -> io::Error {
683 io::Error::new(
684 #[cfg(not(polling_no_unsupported_error_kind))]
685 io::ErrorKind::Unsupported,
686 #[cfg(polling_no_unsupported_error_kind)]
687 io::ErrorKind::Other,
688 err.into(),
689 )
690}
691