1use std::{cell::RefCell, collections::HashMap, rc::Rc, sync::Arc, time::Duration};
2
3#[cfg(unix)]
4use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd as Borrowed, RawFd as Raw};
5
6#[cfg(windows)]
7use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket as Borrowed, RawSocket as Raw};
8
9use polling::{Event, Events, PollMode, Poller};
10
11use crate::sources::timer::TimerWheel;
12use crate::token::TokenInner;
13use crate::RegistrationToken;
14
/// Triggering modes available when registering a file descriptor
#[derive(Clone, Copy, Debug)]
pub enum Mode {
    /// Single event generation
    ///
    /// The FD is disabled as soon as it has generated one event.
    ///
    /// To receive further events, the user has to re-enable it with
    /// `LoopHandle::update()`.
    OneShot,

    /// Level-triggering
    ///
    /// The FD reports events on every poll for as long as the requested
    /// interests are available.
    Level,

    /// Edge-triggering
    ///
    /// The FD reports events only when it *gains* one of the requested interests.
    /// It must thus be fully processed before it will generate events again.
    ///
    /// This mode is not supported on certain platforms, and an error will be returned
    /// if it is used.
    ///
    /// ## Supported Platforms
    ///
    /// As of the time of writing, the platforms that support edge triggered polling are
    /// as follows:
    ///
    /// - Linux/Android
    /// - macOS/iOS/tvOS/watchOS
    /// - FreeBSD/OpenBSD/NetBSD/DragonflyBSD
    Edge,
}
50
/// The kinds of readiness a registration asks to be notified about
#[derive(Clone, Copy, Debug)]
pub struct Interest {
    /// Wait for the FD to be readable
    pub readable: bool,

    /// Wait for the FD to be writable
    pub writable: bool,
}

impl Interest {
    /// Shorthand for empty interest (neither readable nor writable)
    pub const EMPTY: Self = Self {
        readable: false,
        writable: false,
    };

    /// Shorthand for read-only interest
    pub const READ: Self = Self {
        readable: true,
        writable: false,
    };

    /// Shorthand for write-only interest
    pub const WRITE: Self = Self {
        readable: false,
        writable: true,
    };

    /// Shorthand for combined read and write interest
    pub const BOTH: Self = Self {
        readable: true,
        writable: true,
    };
}
86
/// Readiness state reported for a file descriptor notification
#[derive(Clone, Copy, Debug)]
pub struct Readiness {
    /// Is the FD readable
    pub readable: bool,

    /// Is the FD writable
    pub writable: bool,

    /// Is the FD in an error state
    pub error: bool,
}

impl Readiness {
    /// Shorthand for empty readiness (every flag cleared)
    pub const EMPTY: Self = Self {
        readable: false,
        writable: false,
        error: false,
    };
}
108
/// A single event produced by one call to `Poll::poll()`
///
/// Pairs the readiness that was observed with the token identifying the
/// (sub-)source it is addressed to.
#[derive(Debug)]
pub(crate) struct PollEvent {
    /// What the source was reported ready for.
    pub(crate) readiness: Readiness,
    /// Token identifying the source this event belongs to.
    pub(crate) token: Token,
}
114
/// Factory for creating tokens in your registrations
///
/// When composing event sources, each sub-source needs to
/// have its own token to identify itself. This factory is
/// provided to produce such unique tokens.
#[derive(Debug)]
pub struct TokenFactory {
    /// The token that the next call to `token()` will hand out.
    next_token: TokenInner,
}
125
126impl TokenFactory {
127 pub(crate) fn new(token: TokenInner) -> TokenFactory {
128 TokenFactory {
129 next_token: token.forget_sub_id(),
130 }
131 }
132
133 /// Get the "raw" registration token of this TokenFactory
134 pub(crate) fn registration_token(&self) -> RegistrationToken {
135 RegistrationToken::new(self.next_token.forget_sub_id())
136 }
137
138 /// Produce a new unique token
139 pub fn token(&mut self) -> Token {
140 let token: TokenInner = self.next_token;
141 self.next_token = token.increment_sub_id();
142 Token { inner: token }
143 }
144}
145
/// A token (for implementation of the [`EventSource`](crate::EventSource) trait)
///
/// This token is produced by the [`TokenFactory`] and is used when calling the
/// [`EventSource`](crate::EventSource) implementations to process events, in order
/// to identify which sub-source produced them.
///
/// You should forward it to the [`Poll`] when registering your file descriptors.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Token {
    /// The crate-internal token value wrapped by this public type.
    pub(crate) inner: TokenInner,
}
157
/// The polling system
///
/// This type represents the polling system of calloop, on which you
/// can register your file descriptors. This interface is only accessible in
/// implementations of the [`EventSource`](crate::EventSource) trait.
///
/// You only need to interact with this type if you are implementing your
/// own event sources, while implementing the [`EventSource`](crate::EventSource) trait.
/// And even in this case, you can often just use the [`Generic`](crate::generic::Generic) event
/// source and delegate the implementations to it.
pub struct Poll {
    /// The handle to wepoll/epoll/kqueue/... used to poll for events.
    pub(crate) poller: Arc<Poller>,

    /// The buffer of events returned by the poller.
    events: RefCell<Events>,

    /// The sources registered as level triggered.
    ///
    /// Some platforms that `polling` supports do not support level-triggered events. As of the time
    /// of writing, this only includes Solaris and illumos. To work around this, we emulate level
    /// triggered events by keeping this map of file descriptors.
    ///
    /// One can emulate level triggered events on top of oneshot events by just re-registering the
    /// file descriptor every time it is polled. However, this is not ideal, as it requires a
    /// system call every time. It's better to use the integrated system, if available.
    ///
    /// The map is keyed by the event key of the registration and stores the raw source together
    /// with the event it has to be re-armed with. It is `None` when no emulation is needed.
    level_triggered: Option<RefCell<HashMap<usize, (Raw, polling::Event)>>>,

    /// The timer wheel; `poll()` consults it for the next deadline and for expired timers.
    pub(crate) timers: Rc<RefCell<TimerWheel>>,
}
188
189impl std::fmt::Debug for Poll {
190 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192 f.write_str(data:"Poll { ... }")
193 }
194}
195
196impl Poll {
197 pub(crate) fn new() -> crate::Result<Poll> {
198 Self::new_inner(false)
199 }
200
201 fn new_inner(force_fallback_lt: bool) -> crate::Result<Poll> {
202 let poller = Poller::new()?;
203 let level_triggered = if poller.supports_level() && !force_fallback_lt {
204 None
205 } else {
206 Some(RefCell::new(HashMap::new()))
207 };
208
209 Ok(Poll {
210 poller: Arc::new(poller),
211 events: RefCell::new(Events::new()),
212 timers: Rc::new(RefCell::new(TimerWheel::new())),
213 level_triggered,
214 })
215 }
216
217 pub(crate) fn poll(&self, mut timeout: Option<Duration>) -> crate::Result<Vec<PollEvent>> {
218 let now = std::time::Instant::now();
219
220 // Adjust the timeout for the timers.
221 if let Some(next_timeout) = self.timers.borrow().next_deadline() {
222 if next_timeout <= now {
223 timeout = Some(Duration::ZERO);
224 } else if let Some(deadline) = timeout {
225 timeout = Some(std::cmp::min(deadline, next_timeout - now));
226 } else {
227 timeout = Some(next_timeout - now);
228 }
229 };
230
231 let mut events = self.events.borrow_mut();
232 events.clear();
233 self.poller.wait(&mut events, timeout)?;
234
235 // Convert `polling` events to `calloop` events.
236 let level_triggered = self.level_triggered.as_ref().map(RefCell::borrow);
237 let mut poll_events = events
238 .iter()
239 .map(|ev| {
240 // If we need to emulate level-triggered events...
241 if let Some(level_triggered) = level_triggered.as_ref() {
242 // ...and this event is from a level-triggered source...
243 if let Some((source, interest)) = level_triggered.get(&ev.key) {
244 // ...then we need to re-register the source.
245 // SAFETY: The source is valid.
246 self.poller
247 .modify(unsafe { Borrowed::borrow_raw(*source) }, *interest)?;
248 }
249 }
250
251 Ok(PollEvent {
252 readiness: Readiness {
253 readable: ev.readable,
254 writable: ev.writable,
255 error: false,
256 },
257 token: Token {
258 inner: TokenInner::from(ev.key),
259 },
260 })
261 })
262 .collect::<std::io::Result<Vec<_>>>()?;
263
264 drop(events);
265
266 // Update 'now' as some time may have elapsed in poll()
267 let now = std::time::Instant::now();
268 let mut timers = self.timers.borrow_mut();
269 while let Some((_, token)) = timers.next_expired(now) {
270 poll_events.push(PollEvent {
271 readiness: Readiness {
272 readable: true,
273 writable: false,
274 error: false,
275 },
276 token,
277 });
278 }
279
280 Ok(poll_events)
281 }
282
283 /// Register a new file descriptor for polling
284 ///
285 /// The file descriptor will be registered with given interest,
286 /// mode and token. This function will fail if given a
287 /// bad file descriptor or if the provided file descriptor is already
288 /// registered.
289 ///
290 /// # Safety
291 ///
292 /// The registered source must not be dropped before it is unregistered.
293 ///
294 /// # Leaking tokens
295 ///
296 /// If your event source is dropped without being unregistered, the token
297 /// passed in here will remain on the heap and continue to be used by the
298 /// polling system even though no event source will match it.
299 pub unsafe fn register(
300 &self,
301 #[cfg(unix)] fd: impl AsFd,
302 #[cfg(windows)] fd: impl AsSocket,
303 interest: Interest,
304 mode: Mode,
305 token: Token,
306 ) -> crate::Result<()> {
307 let raw = {
308 #[cfg(unix)]
309 {
310 fd.as_fd().as_raw_fd()
311 }
312
313 #[cfg(windows)]
314 {
315 fd.as_socket().as_raw_socket()
316 }
317 };
318
319 let ev = cvt_interest(interest, token);
320
321 // SAFETY: See invariant on function.
322 unsafe {
323 self.poller
324 .add_with_mode(raw, ev, cvt_mode(mode, self.poller.supports_level()))?;
325 }
326
327 // If this is level triggered and we're emulating level triggered mode...
328 if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
329 // ...then we need to keep track of the source.
330 let mut level_triggered = level_triggered.borrow_mut();
331 level_triggered.insert(ev.key, (raw, ev));
332 }
333
334 Ok(())
335 }
336
337 /// Update the registration for a file descriptor
338 ///
339 /// This allows you to change the interest, mode or token of a file
340 /// descriptor. Fails if the provided fd is not currently registered.
341 ///
342 /// See note on [`register()`](Self::register()) regarding leaking.
343 pub fn reregister(
344 &self,
345 #[cfg(unix)] fd: impl AsFd,
346 #[cfg(windows)] fd: impl AsSocket,
347 interest: Interest,
348 mode: Mode,
349 token: Token,
350 ) -> crate::Result<()> {
351 let (borrowed, raw) = {
352 #[cfg(unix)]
353 {
354 (fd.as_fd(), fd.as_fd().as_raw_fd())
355 }
356
357 #[cfg(windows)]
358 {
359 (fd.as_socket(), fd.as_socket().as_raw_socket())
360 }
361 };
362
363 let ev = cvt_interest(interest, token);
364 self.poller
365 .modify_with_mode(borrowed, ev, cvt_mode(mode, self.poller.supports_level()))?;
366
367 // If this is level triggered and we're emulating level triggered mode...
368 if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
369 // ...then we need to keep track of the source.
370 let mut level_triggered = level_triggered.borrow_mut();
371 level_triggered.insert(ev.key, (raw, ev));
372 }
373
374 Ok(())
375 }
376
377 /// Unregister a file descriptor
378 ///
379 /// This file descriptor will no longer generate events. Fails if the
380 /// provided file descriptor is not currently registered.
381 pub fn unregister(
382 &self,
383 #[cfg(unix)] fd: impl AsFd,
384 #[cfg(windows)] fd: impl AsSocket,
385 ) -> crate::Result<()> {
386 let (borrowed, raw) = {
387 #[cfg(unix)]
388 {
389 (fd.as_fd(), fd.as_fd().as_raw_fd())
390 }
391
392 #[cfg(windows)]
393 {
394 (fd.as_socket(), fd.as_socket().as_raw_socket())
395 }
396 };
397 self.poller.delete(borrowed)?;
398
399 if let Some(level_triggered) = self.level_triggered.as_ref() {
400 let mut level_triggered = level_triggered.borrow_mut();
401 level_triggered.retain(|_, (source, _)| *source != raw);
402 }
403
404 Ok(())
405 }
406
407 /// Get a thread-safe handle which can be used to wake up the `Poll`.
408 pub(crate) fn notifier(&self) -> Notifier {
409 Notifier(self.poller.clone())
410 }
411
412 /// Get a reference to the poller.
413 pub(crate) fn poller(&self) -> &Arc<Poller> {
414 &self.poller
415 }
416}
417
418/// Thread-safe handle which can be used to wake up the `Poll`.
419#[derive(Clone)]
420pub(crate) struct Notifier(Arc<Poller>);
421
422impl Notifier {
423 pub(crate) fn notify(&self) -> crate::Result<()> {
424 self.0.notify()?;
425
426 Ok(())
427 }
428}
429
430fn cvt_interest(interest: Interest, tok: Token) -> Event {
431 let mut event: Event = Event::none(key:tok.inner.into());
432 event.readable = interest.readable;
433 event.writable = interest.writable;
434 event
435}
436
437fn cvt_mode(mode: Mode, supports_other_modes: bool) -> PollMode {
438 if !supports_other_modes {
439 return PollMode::Oneshot;
440 }
441
442 match mode {
443 Mode::Edge => PollMode::Edge,
444 Mode::Level => PollMode::Level,
445 Mode::OneShot => PollMode::Oneshot,
446 }
447}
448