use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use concurrent_queue::ConcurrentQueue;
use futures_lite::ready;
use polling::{Event, Events, Poller};
use slab::Slab;

// Choose the proper implementation of `Registration` based on the target platform.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        mod windows;
        pub use windows::Registration;
    } else if #[cfg(any(
        target_vendor = "apple",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "dragonfly",
    ))] {
        mod kqueue;
        pub use kqueue::Registration;
    } else if #[cfg(unix)] {
        mod unix;
        pub use unix::Registration;
    } else {
        compile_error!("unsupported platform");
    }
}

#[cfg(not(target_os = "espidf"))]
const TIMER_QUEUE_SIZE: usize = 1000;

/// ESP-IDF, being an embedded OS, does not need as many timers, and the smaller queue saves
/// ~20K of RAM, which is significant for an MCU with less than 400K of RAM.
#[cfg(target_os = "espidf")]
const TIMER_QUEUE_SIZE: usize = 100;

const READ: usize = 0;
const WRITE: usize = 1;

/// The reactor.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
pub(crate) struct Reactor {
    /// Portable bindings to epoll/kqueue/event ports/IOCP.
    ///
    /// This is where I/O is polled, producing I/O events.
    pub(crate) poller: Poller,

    /// Ticker bumped before polling.
    ///
    /// This is useful for checking the current "round" of `ReactorLock::react()` when
    /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
    /// methods must make sure they don't receive stale I/O events - they only accept events from a
    /// fresh "round" of `ReactorLock::react()`.
    ticker: AtomicUsize,

    /// Registered sources.
    sources: Mutex<Slab<Arc<Source>>>,

    /// Temporary storage for I/O events when polling the reactor.
    ///
    /// Holding a lock on this event list implies the exclusive right to poll I/O.
    events: Mutex<Events>,

    /// An ordered map of registered timers.
    ///
    /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
    /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
    /// timer.
    timers: Mutex<BTreeMap<(Instant, usize), Waker>>,

    /// A queue of timer operations (insert and remove).
    ///
    /// When inserting or removing a timer, we don't process it immediately - we just push it into
    /// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
    timer_ops: ConcurrentQueue<TimerOp>,
}

impl Reactor {
    /// Returns a reference to the reactor.
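    ///
    /// A minimal usage sketch (crate-internal; not compiled here): the reactor is lazily
    /// initialized on first access, and every later call returns the same instance.
    ///
    /// ```ignore
    /// let reactor = Reactor::get();
    /// // Subsequent calls hand back the same global instance.
    /// assert!(std::ptr::eq(reactor, Reactor::get()));
    /// ```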
    pub(crate) fn get() -> &'static Reactor {
        static REACTOR: OnceCell<Reactor> = OnceCell::new();

        REACTOR.get_or_init_blocking(|| {
            crate::driver::init();
            Reactor {
                poller: Poller::new().expect("cannot initialize I/O event notification"),
                ticker: AtomicUsize::new(0),
                sources: Mutex::new(Slab::new()),
                events: Mutex::new(Events::new()),
                timers: Mutex::new(BTreeMap::new()),
                timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
            }
        })
    }

    /// Returns the current ticker.
    pub(crate) fn ticker(&self) -> usize {
        self.ticker.load(Ordering::SeqCst)
    }

    /// Registers an I/O source in the reactor.
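    ///
    /// A hedged sketch of the intended call pattern (crate-internal; `registration` is assumed to
    /// already wrap a valid OS handle):
    ///
    /// ```ignore
    /// let source = Reactor::get().insert_io(registration)?;
    /// // ... await readiness via `source` ...
    /// Reactor::get().remove_io(&source)?;
    /// ```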
    pub(crate) fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
        // Create an I/O source for this file descriptor.
        let source = {
            let mut sources = self.sources.lock().unwrap();
            let key = sources.vacant_entry().key();
            let source = Arc::new(Source {
                registration: raw,
                key,
                state: Default::default(),
            });
            sources.insert(source.clone());
            source
        };

        // Register the file descriptor.
        if let Err(err) = source.registration.add(&self.poller, source.key) {
            let mut sources = self.sources.lock().unwrap();
            sources.remove(source.key);
            return Err(err);
        }

        Ok(source)
    }

    /// Deregisters an I/O source from the reactor.
    pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
        let mut sources = self.sources.lock().unwrap();
        sources.remove(source.key);
        source.registration.delete(&self.poller)
    }

    /// Registers a timer in the reactor.
    ///
    /// Returns the inserted timer's ID.
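    ///
    /// A sketch of how a timer future might use this pair of calls (crate-internal; `cx` is
    /// assumed to be the polled task's `Context`):
    ///
    /// ```ignore
    /// let when = Instant::now() + Duration::from_millis(10);
    /// let id = Reactor::get().insert_timer(when, cx.waker());
    /// // If the timer is dropped or rescheduled before firing:
    /// Reactor::get().remove_timer(when, id);
    /// ```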
    pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
        // Generate a new timer ID.
        static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
        let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);

        // Push an insert operation.
        while self
            .timer_ops
            .push(TimerOp::Insert(when, id, waker.clone()))
            .is_err()
        {
            // If the queue is full, drain it and try again.
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }

        // Notify that a timer has been inserted.
        self.notify();

        id
    }

    /// Deregisters a timer from the reactor.
    pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
        // Push a remove operation.
        while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
            // If the queue is full, drain it and try again.
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }
    }

    /// Notifies the thread blocked on the reactor.
    pub(crate) fn notify(&self) {
        self.poller.notify().expect("failed to notify reactor");
    }

    /// Locks the reactor, potentially blocking if the lock is held by another thread.
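    ///
    /// A sketch of the two locking modes (crate-internal): a dedicated driver thread can block on
    /// `lock()`, while other threads may poll opportunistically via [`Reactor::try_lock()`]:
    ///
    /// ```ignore
    /// if let Some(mut lock) = Reactor::get().try_lock() {
    ///     // Drain whatever is already pending without blocking.
    ///     lock.react(Some(Duration::from_secs(0)))?;
    /// }
    /// ```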
    pub(crate) fn lock(&self) -> ReactorLock<'_> {
        let reactor = self;
        let events = self.events.lock().unwrap();
        ReactorLock { reactor, events }
    }

    /// Attempts to lock the reactor.
    pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
        self.events.try_lock().ok().map(|events| {
            let reactor = self;
            ReactorLock { reactor, events }
        })
    }

    /// Processes ready timers and extends the list of wakers to wake.
    ///
    /// Returns the duration until the next timer, computed at the time this method was called.
    fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
        let span = tracing::trace_span!("process_timers");
        let _enter = span.enter();

        let mut timers = self.timers.lock().unwrap();
        self.process_timer_ops(&mut timers);

        let now = Instant::now();

        // Split timers into ready and pending timers.
        //
        // Careful to split just *after* `now`, so that a timer set for exactly `now` is considered
        // ready.
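        //
        // Illustrative example with hypothetical keys: given timers `(now - 1ms, 7)`, `(now, 3)`
        // and `(now + 5ms, 9)`, splitting at `(now + 1ns, 0)` puts the first two into `ready` and
        // leaves only `(now + 5ms, 9)` pending.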
        let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
        let ready = mem::replace(&mut *timers, pending);

        // Calculate the duration until the next event.
        let dur = if ready.is_empty() {
            // Duration until the next timer.
            timers
                .keys()
                .next()
                .map(|(when, _)| when.saturating_duration_since(now))
        } else {
            // Timers are about to fire right now.
            Some(Duration::from_secs(0))
        };

        // Drop the lock before waking.
        drop(timers);

        // Add wakers to the list.
        tracing::trace!("{} ready wakers", ready.len());

        for (_, waker) in ready {
            wakers.push(waker);
        }

        dur
    }

    /// Processes queued timer operations.
    fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
        // Process only as much as fits into the queue, or else this loop could in theory run
        // forever.
        self.timer_ops
            .try_iter()
            .take(self.timer_ops.capacity().unwrap())
            .for_each(|op| match op {
                TimerOp::Insert(when, id, waker) => {
                    timers.insert((when, id), waker);
                }
                TimerOp::Remove(when, id) => {
                    timers.remove(&(when, id));
                }
            });
    }
}

/// A lock on the reactor.
pub(crate) struct ReactorLock<'a> {
    reactor: &'a Reactor,
    events: MutexGuard<'a, Events>,
}

impl ReactorLock<'_> {
    /// Processes new events, blocking until the first event or the timeout.
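    ///
    /// A sketch of a driver loop (crate-internal): a `timeout` of `None` blocks until the next
    /// I/O event or timer, while a zero timeout only drains events that are already pending.
    ///
    /// ```ignore
    /// loop {
    ///     let mut lock = Reactor::get().lock();
    ///     // Block until something becomes ready, then wake the affected tasks.
    ///     lock.react(None)?;
    /// }
    /// ```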
    pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        let span = tracing::trace_span!("react");
        let _enter = span.enter();

        let mut wakers = Vec::new();

        // Process ready timers.
        let next_timer = self.reactor.process_timers(&mut wakers);

        // Compute the timeout for blocking on I/O events.
        let timeout = match (next_timer, timeout) {
            (None, None) => None,
            (Some(t), None) | (None, Some(t)) => Some(t),
            (Some(a), Some(b)) => Some(a.min(b)),
        };

        // Bump the ticker before polling I/O.
        let tick = self
            .reactor
            .ticker
            .fetch_add(1, Ordering::SeqCst)
            .wrapping_add(1);

        self.events.clear();

        // Block on I/O events.
        let res = match self.reactor.poller.wait(&mut self.events, timeout) {
            // No I/O events occurred.
            Ok(0) => {
                if timeout != Some(Duration::from_secs(0)) {
                    // The non-zero timeout was hit so fire ready timers.
                    self.reactor.process_timers(&mut wakers);
                }
                Ok(())
            }

            // At least one I/O event occurred.
            Ok(_) => {
                // Iterate over sources in the event list.
                let sources = self.reactor.sources.lock().unwrap();

                for ev in self.events.iter() {
                    // Check if there is a source in the table with this key.
                    if let Some(source) = sources.get(ev.key) {
                        let mut state = source.state.lock().unwrap();

                        // Collect wakers if a readability or writability event was emitted.
                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                            if emitted {
                                state[dir].tick = tick;
                                state[dir].drain_into(&mut wakers);
                            }
                        }

                        // Re-register if there are still writers or readers. This can happen if
                        // e.g. we were previously interested in both readability and writability,
                        // but only one of them was emitted.
                        if !state[READ].is_empty() || !state[WRITE].is_empty() {
                            // Create the event that we are interested in.
                            let event = {
                                let mut event = Event::none(source.key);
                                event.readable = !state[READ].is_empty();
                                event.writable = !state[WRITE].is_empty();
                                event
                            };

                            // Register interest in this event.
                            source.registration.modify(&self.reactor.poller, event)?;
                        }
                    }
                }

                Ok(())
            }

            // The syscall was interrupted.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),

            // An actual error occurred.
            Err(err) => Err(err),
        };

        // Wake up ready tasks.
        tracing::trace!("{} ready wakers", wakers.len());
        for waker in wakers {
            // Don't let a panicking waker blow everything up.
            panic::catch_unwind(|| waker.wake()).ok();
        }

        res
    }
}

/// A single timer operation.
enum TimerOp {
    Insert(Instant, usize, Waker),
    Remove(Instant, usize),
}

/// A registered source of I/O events.
#[derive(Debug)]
pub(crate) struct Source {
    /// This source's registration into the reactor.
    registration: Registration,

    /// The key of this source obtained during registration.
    key: usize,

    /// Inner state with registered wakers.
    state: Mutex<[Direction; 2]>,
}

/// A read or write direction.
#[derive(Debug, Default)]
struct Direction {
    /// Last reactor tick that delivered an event.
    tick: usize,

    /// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
    ticks: Option<(usize, usize)>,

    /// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
    waker: Option<Waker>,

    /// Wakers of tasks waiting for the next event.
    ///
    /// Registered by `Async::readable()` and `Async::writable()`.
    wakers: Slab<Option<Waker>>,
}
impl Direction {
    /// Returns `true` if there are no wakers interested in this direction.
    fn is_empty(&self) -> bool {
        self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
    }

    /// Moves all wakers into a `Vec`.
    fn drain_into(&mut self, dst: &mut Vec<Waker>) {
        if let Some(w) = self.waker.take() {
            dst.push(w);
        }
        for (_, opt) in self.wakers.iter_mut() {
            if let Some(w) = opt.take() {
                dst.push(w);
            }
        }
    }
}

impl Source {
    /// Polls the I/O source for readability.
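    ///
    /// A sketch of the intended call site (crate-internal; `cx` is assumed to be the current
    /// task's `Context`):
    ///
    /// ```ignore
    /// match source.poll_readable(cx) {
    ///     Poll::Ready(Ok(())) => { /* likely readable; retry the I/O operation */ }
    ///     Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
    ///     Poll::Pending => return Poll::Pending,
    /// }
    /// ```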
    pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }

    /// Polls the I/O source for writability.
    pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(WRITE, cx)
    }

    /// Registers a waker from `poll_readable()` or `poll_writable()`.
    ///
    /// If a different waker is already registered, it gets replaced and woken.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = state[dir].ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[dir].tick != a && state[dir].tick != b {
                state[dir].ticks = None;
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[dir].is_empty();

        // Register the current task's waker.
        if let Some(w) = state[dir].waker.take() {
            if w.will_wake(cx.waker()) {
                state[dir].waker = Some(w);
                return Poll::Pending;
            }
            // Wake the previous waker because it's going to get replaced.
            panic::catch_unwind(|| w.wake()).ok();
        }
        state[dir].waker = Some(cx.waker().clone());
        state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));

        // Update interest in this I/O handle.
        if was_empty {
            // Create the event that we are interested in.
            let event = {
                let mut event = Event::none(self.key);
                event.readable = !state[READ].is_empty();
                event.writable = !state[WRITE].is_empty();
                event
            };

            // Register interest in it.
            self.registration.modify(&Reactor::get().poller, event)?;
        }

        Poll::Pending
    }

    /// Waits until the I/O source is readable.
    pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
        Readable(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is readable.
    pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
        ReadableOwned(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is writable.
    pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
        Writable(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is writable.
    pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
        WritableOwned(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is readable or writable.
    fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
        Ready {
            handle,
            dir,
            ticks: None,
            index: None,
            _capture: PhantomData,
        }
    }
}

/// Future for [`Async::readable`](crate::Async::readable).
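///
/// A usage sketch, assuming `listener` is an `Async<std::net::TcpListener>`:
///
/// ```ignore
/// listener.readable().await?;
/// // The socket reported readiness, so `accept` is now unlikely to block.
/// ```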
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);

impl<T> Future for Readable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        tracing::trace!(fd = ?self.0.handle.source.registration, "readable");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for Readable<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Readable").finish()
    }
}

/// Future for [`Async::readable_owned`](crate::Async::readable_owned).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);

impl<T> Future for ReadableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for ReadableOwned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReadableOwned").finish()
    }
}

/// Future for [`Async::writable`](crate::Async::writable).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);

impl<T> Future for Writable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        tracing::trace!(fd = ?self.0.handle.source.registration, "writable");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for Writable<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Writable").finish()
    }
}

/// Future for [`Async::writable_owned`](crate::Async::writable_owned).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);

impl<T> Future for WritableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for WritableOwned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WritableOwned").finish()
    }
}

struct Ready<H: Borrow<crate::Async<T>>, T> {
    handle: H,
    dir: usize,
    ticks: Option<(usize, usize)>,
    index: Option<usize>,
    _capture: PhantomData<fn() -> T>,
}

impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}

impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            ref handle,
            dir,
            ticks,
            index,
            ..
        } = &mut *self;

        let mut state = handle.borrow().source.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = *ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[*dir].tick != a && state[*dir].tick != b {
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[*dir].is_empty();

        // Register the current task's waker.
        let i = match *index {
            Some(i) => i,
            None => {
                let i = state[*dir].wakers.insert(None);
                *index = Some(i);
                *ticks = Some((Reactor::get().ticker(), state[*dir].tick));
                i
            }
        };
        state[*dir].wakers[i] = Some(cx.waker().clone());

        // Update interest in this I/O handle.
        if was_empty {
            // Create the event that we are interested in.
            let event = {
                let mut event = Event::none(handle.borrow().source.key);
                event.readable = !state[READ].is_empty();
                event.writable = !state[WRITE].is_empty();
                event
            };

            // Indicate that we are interested in this event.
            handle
                .borrow()
                .source
                .registration
                .modify(&Reactor::get().poller, event)?;
        }

        Poll::Pending
    }
}

impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
    fn drop(&mut self) {
        // Remove our waker when dropped.
        if let Some(key) = self.index {
            let mut state = self.handle.borrow().source.state.lock().unwrap();
            let wakers = &mut state[self.dir].wakers;
            if wakers.contains(key) {
                wakers.remove(key);
            }
        }
    }
}