use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use concurrent_queue::ConcurrentQueue;
use futures_lite::ready;
use polling::{Event, Events, Poller};
use slab::Slab;

// Choose the proper implementation of `Registration` based on the target platform.
cfg_if::cfg_if! {
    if #[cfg(windows)] {
        mod windows;
        pub use windows::Registration;
    } else if #[cfg(any(
        target_os = "macos",
        target_os = "ios",
        target_os = "tvos",
        target_os = "watchos",
        target_os = "freebsd",
        target_os = "netbsd",
        target_os = "openbsd",
        target_os = "dragonfly",
    ))] {
        mod kqueue;
        pub use kqueue::Registration;
    } else if #[cfg(unix)] {
        mod unix;
        pub use unix::Registration;
    } else {
        compile_error!("unsupported platform");
    }
}

#[cfg(not(target_os = "espidf"))]
const TIMER_QUEUE_SIZE: usize = 1000;

/// ESP-IDF - being an embedded OS - does not need that many timers,
/// and this saves ~20K of RAM, which is a lot for an MCU with less than 400K of RAM.
#[cfg(target_os = "espidf")]
const TIMER_QUEUE_SIZE: usize = 100;

const READ: usize = 0;
const WRITE: usize = 1;
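// `READ` and `WRITE` index into each source's `[Direction; 2]` readiness state defined below.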

/// The reactor.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
pub(crate) struct Reactor {
    /// Portable bindings to epoll/kqueue/event ports/IOCP.
    ///
    /// This is where I/O is polled, producing I/O events.
    pub(crate) poller: Poller,

    /// Ticker bumped before polling.
    ///
    /// This is useful for checking which "round" of `ReactorLock::react()` is current when
    /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
    /// methods must make sure they don't receive stale I/O events - they only accept events from
    /// a fresh "round" of `ReactorLock::react()`.
    ticker: AtomicUsize,

    /// Registered sources.
    sources: Mutex<Slab<Arc<Source>>>,

    /// Temporary storage for I/O events when polling the reactor.
    ///
    /// Holding a lock on this event list implies the exclusive right to poll I/O.
    events: Mutex<Events>,

    /// An ordered map of registered timers.
    ///
    /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
    /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting
    /// the timer.
    timers: Mutex<BTreeMap<(Instant, usize), Waker>>,

    /// A queue of timer operations (insert and remove).
    ///
    /// When inserting or removing a timer, we don't process it immediately - we just push it into
    /// this queue. Timers actually get processed when the queue fills up or the reactor is
    /// polled.
    timer_ops: ConcurrentQueue<TimerOp>,
}

impl Reactor {
    /// Returns a reference to the reactor.
    pub(crate) fn get() -> &'static Reactor {
        static REACTOR: OnceCell<Reactor> = OnceCell::new();

        REACTOR.get_or_init_blocking(|| {
            crate::driver::init();
            Reactor {
                poller: Poller::new().expect("cannot initialize I/O event notification"),
                ticker: AtomicUsize::new(0),
                sources: Mutex::new(Slab::new()),
                events: Mutex::new(Events::new()),
                timers: Mutex::new(BTreeMap::new()),
                timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
            }
        })
    }

    /// Returns the current ticker.
    pub(crate) fn ticker(&self) -> usize {
        self.ticker.load(Ordering::SeqCst)
    }

    /// Registers an I/O source in the reactor.
    pub(crate) fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
        // Create an I/O source for this file descriptor.
        let source = {
            let mut sources = self.sources.lock().unwrap();
            let key = sources.vacant_entry().key();
            let source = Arc::new(Source {
                registration: raw,
                key,
                state: Default::default(),
            });
            sources.insert(source.clone());
            source
        };

        // Register the file descriptor.
        if let Err(err) = source.registration.add(&self.poller, source.key) {
            let mut sources = self.sources.lock().unwrap();
            sources.remove(source.key);
            return Err(err);
        }

        Ok(source)
    }

    /// Deregisters an I/O source from the reactor.
    pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
        let mut sources = self.sources.lock().unwrap();
        sources.remove(source.key);
        source.registration.delete(&self.poller)
    }

    /// Registers a timer in the reactor.
    ///
    /// Returns the inserted timer's ID.
    pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
        // Generate a new timer ID.
        static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
        let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);

        // Push an insert operation.
        while self
            .timer_ops
            .push(TimerOp::Insert(when, id, waker.clone()))
            .is_err()
        {
            // If the queue is full, drain it and try again.
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }

        // Notify that a timer has been inserted.
        self.notify();

        id
    }

    /// Deregisters a timer from the reactor.
    pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
        // Push a remove operation.
        while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
            // If the queue is full, drain it and try again.
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }
    }
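
    // A minimal sketch of how a timer future could drive `insert_timer`/`remove_timer`
    // (illustrative only; `TimerFuture` and its fields are hypothetical, not the crate's
    // actual timer type):
    //
    //     struct TimerFuture { when: Instant, id: Option<usize> }
    //
    //     impl Future for TimerFuture {
    //         type Output = ();
    //         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
    //             if Instant::now() >= self.when {
    //                 if let Some(id) = self.id.take() {
    //                     Reactor::get().remove_timer(self.when, id);
    //                 }
    //                 return Poll::Ready(());
    //             }
    //             // Re-register on every poll so the reactor holds the current waker.
    //             if let Some(id) = self.id.take() {
    //                 Reactor::get().remove_timer(self.when, id);
    //             }
    //             self.id = Some(Reactor::get().insert_timer(self.when, cx.waker()));
    //             Poll::Pending
    //         }
    //     }
    //
    // (A real implementation would also remove the timer in `Drop`.)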

    /// Notifies the thread blocked on the reactor.
    pub(crate) fn notify(&self) {
        self.poller.notify().expect("failed to notify reactor");
    }

    /// Locks the reactor, potentially blocking if the lock is held by another thread.
    pub(crate) fn lock(&self) -> ReactorLock<'_> {
        let reactor = self;
        let events = self.events.lock().unwrap();
        ReactorLock { reactor, events }
    }

    /// Attempts to lock the reactor.
    pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
        self.events.try_lock().ok().map(|events| {
            let reactor = self;
            ReactorLock { reactor, events }
        })
    }

    /// Processes ready timers and extends the list of wakers to wake.
    ///
    /// Returns the duration until the next timer before this method was called.
    fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
        let span = tracing::trace_span!("process_timers");
        let _enter = span.enter();

        let mut timers = self.timers.lock().unwrap();
        self.process_timer_ops(&mut timers);

        let now = Instant::now();

        // Split timers into ready and pending timers.
        //
        // Careful to split just *after* `now`, so that a timer set for exactly `now` is
        // considered ready.
        let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
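        // For example, with timers keyed `(now, 7)` and `(now + 1ms, 8)`, splitting at
        // `(now + 1ns, 0)` leaves `(now, 7)` in the ready half below and moves
        // `(now + 1ms, 8)` into `pending`.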
        let ready = mem::replace(&mut *timers, pending);

        // Calculate the duration until the next event.
        let dur = if ready.is_empty() {
            // Duration until the next timer.
            timers
                .keys()
                .next()
                .map(|(when, _)| when.saturating_duration_since(now))
        } else {
            // Timers are about to fire right now.
            Some(Duration::from_secs(0))
        };

        // Drop the lock before waking.
        drop(timers);

        // Add wakers to the list.
        tracing::trace!("{} ready wakers", ready.len());

        for (_, waker) in ready {
            wakers.push(waker);
        }

        dur
    }

    /// Processes queued timer operations.
    fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
        // Process only as many operations as fit into the queue, or else this loop could in
        // theory run forever.
        self.timer_ops
            .try_iter()
            .take(self.timer_ops.capacity().unwrap())
            .for_each(|op| match op {
                TimerOp::Insert(when, id, waker) => {
                    timers.insert((when, id), waker);
                }
                TimerOp::Remove(when, id) => {
                    timers.remove(&(when, id));
                }
            });
    }
}
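
// A minimal sketch of the kind of polling loop a driver thread might run against this
// API (the crate's real driver lives elsewhere; this is illustrative only):
//
//     fn reactor_main_loop() -> ! {
//         loop {
//             // Block until an I/O event arrives or the nearest timer fires,
//             // waking any tasks that became ready.
//             let mut lock = Reactor::get().lock();
//             let _ = lock.react(None);
//         }
//     }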

/// A lock on the reactor.
pub(crate) struct ReactorLock<'a> {
    reactor: &'a Reactor,
    events: MutexGuard<'a, Events>,
}

impl ReactorLock<'_> {
    /// Processes new events, blocking until the first event or the timeout.
    pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        let span = tracing::trace_span!("react");
        let _enter = span.enter();

        let mut wakers = Vec::new();

        // Process ready timers.
        let next_timer = self.reactor.process_timers(&mut wakers);

        // Compute the timeout for blocking on I/O events.
        let timeout = match (next_timer, timeout) {
            (None, None) => None,
            (Some(t), None) | (None, Some(t)) => Some(t),
            (Some(a), Some(b)) => Some(a.min(b)),
        };

        // Bump the ticker before polling I/O.
        let tick = self
            .reactor
            .ticker
            .fetch_add(1, Ordering::SeqCst)
            .wrapping_add(1);
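        // `fetch_add` returns the previous ticker value, so adding 1 again yields the tick
        // for this round, matching what `Reactor::ticker()` now reports.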

        self.events.clear();

        // Block on I/O events.
        let res = match self.reactor.poller.wait(&mut self.events, timeout) {
            // No I/O events occurred.
            Ok(0) => {
                if timeout != Some(Duration::from_secs(0)) {
                    // The non-zero timeout was hit, so fire ready timers. (With a zero timeout
                    // we didn't block, so the timers processed above are still current.)
                    self.reactor.process_timers(&mut wakers);
                }
                Ok(())
            }

            // At least one I/O event occurred.
            Ok(_) => {
                // Iterate over sources in the event list.
                let sources = self.reactor.sources.lock().unwrap();

                for ev in self.events.iter() {
                    // Check if there is a source in the table with this key.
                    if let Some(source) = sources.get(ev.key) {
                        let mut state = source.state.lock().unwrap();

                        // Collect wakers for each direction whose event was emitted.
                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                            if emitted {
                                state[dir].tick = tick;
                                state[dir].drain_into(&mut wakers);
                            }
                        }

                        // Re-register if there are still writers or readers. This can happen
                        // if e.g. we were previously interested in both readability and
                        // writability, but only one of them was emitted.
                        if !state[READ].is_empty() || !state[WRITE].is_empty() {
                            // Create the event that we are interested in.
                            let event = {
                                let mut event = Event::none(source.key);
                                event.readable = !state[READ].is_empty();
                                event.writable = !state[WRITE].is_empty();
                                event
                            };

                            // Register interest in this event.
                            source.registration.modify(&self.reactor.poller, event)?;
                        }
                    }
                }

                Ok(())
            }

            // The syscall was interrupted.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),

            // An actual error occurred.
            Err(err) => Err(err),
        };

        // Wake up ready tasks.
        tracing::trace!("{} ready wakers", wakers.len());
        for waker in wakers {
            // Don't let a panicking waker blow everything up.
            panic::catch_unwind(|| waker.wake()).ok();
        }

        res
    }
}

/// A single timer operation.
enum TimerOp {
    Insert(Instant, usize, Waker),
    Remove(Instant, usize),
}

/// A registered source of I/O events.
#[derive(Debug)]
pub(crate) struct Source {
    /// This source's registration into the reactor.
    registration: Registration,

    /// The key of this source obtained during registration.
    key: usize,

    /// Inner state with registered wakers.
    state: Mutex<[Direction; 2]>,
}

/// A read or write direction.
#[derive(Debug, Default)]
struct Direction {
    /// Last reactor tick that delivered an event.
    tick: usize,

    /// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
    ticks: Option<(usize, usize)>,

    /// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
    waker: Option<Waker>,

    /// Wakers of tasks waiting for the next event.
    ///
    /// Registered by `Async::readable()` and `Async::writable()`.
    wakers: Slab<Option<Waker>>,
}

impl Direction {
    /// Returns `true` if there are no wakers interested in this direction.
    fn is_empty(&self) -> bool {
        self.waker.is_none() && self.wakers.iter().all(|(_, opt)| opt.is_none())
    }

    /// Moves all wakers into a `Vec`.
    fn drain_into(&mut self, dst: &mut Vec<Waker>) {
        if let Some(w) = self.waker.take() {
            dst.push(w);
        }
        for (_, opt) in self.wakers.iter_mut() {
            if let Some(w) = opt.take() {
                dst.push(w);
            }
        }
    }
}

impl Source {
    /// Polls the I/O source for readability.
    pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }

    /// Polls the I/O source for writability.
    pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(WRITE, cx)
    }

    /// Registers a waker from `poll_readable()` or `poll_writable()`.
    ///
    /// If a different waker is already registered, it gets replaced and woken.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = state[dir].ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[dir].tick != a && state[dir].tick != b {
                state[dir].ticks = None;
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[dir].is_empty();

        // Register the current task's waker.
        if let Some(w) = state[dir].waker.take() {
            if w.will_wake(cx.waker()) {
                state[dir].waker = Some(w);
                return Poll::Pending;
            }
            // Wake the previous waker because it's going to get replaced.
            panic::catch_unwind(|| w.wake()).ok();
        }
        state[dir].waker = Some(cx.waker().clone());
        state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));
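        // The stored pair is (current reactor tick, this source's last-delivered tick); a
        // later poll treats an event matching either value as potentially stale, since it
        // could have been delivered before this registration.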

        // Update interest in this I/O handle.
        if was_empty {
            // Create the event that we are interested in.
            let event = {
                let mut event = Event::none(self.key);
                event.readable = !state[READ].is_empty();
                event.writable = !state[WRITE].is_empty();
                event
            };

            // Register interest in it.
            self.registration.modify(&Reactor::get().poller, event)?;
        }

        Poll::Pending
    }

    /// Waits until the I/O source is readable.
    pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
        Readable(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is readable.
    pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
        ReadableOwned(Self::ready(handle, READ))
    }

    /// Waits until the I/O source is writable.
    pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
        Writable(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is writable.
    pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
        WritableOwned(Self::ready(handle, WRITE))
    }

    /// Waits until the I/O source is readable or writable.
    fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
        Ready {
            handle,
            dir,
            ticks: None,
            index: None,
            _capture: PhantomData,
        }
    }
}
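
// A minimal sketch of how these futures are usually reached through the public `Async`
// wrapper (illustrative only; `listener` here is assumed to be an
// `Async<std::net::TcpListener>`):
//
//     listener.readable().await?;
//     match listener.get_ref().accept() {
//         Ok((stream, addr)) => { /* handle the connection */ }
//         Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { /* poll again */ }
//         Err(err) => return Err(err),
//     }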

/// Future for [`Async::readable`](crate::Async::readable).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);

impl<T> Future for Readable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        tracing::trace!(fd = ?self.0.handle.source.registration, "readable");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for Readable<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Readable").finish()
    }
}

/// Future for [`Async::readable_owned`](crate::Async::readable_owned).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);

impl<T> Future for ReadableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for ReadableOwned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReadableOwned").finish()
    }
}

/// Future for [`Async::writable`](crate::Async::writable).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);

impl<T> Future for Writable<'_, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        tracing::trace!(fd = ?self.0.handle.source.registration, "writable");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for Writable<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Writable").finish()
    }
}

/// Future for [`Async::writable_owned`](crate::Async::writable_owned).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);

impl<T> Future for WritableOwned<T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        ready!(Pin::new(&mut self.0).poll(cx))?;
        tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");
        Poll::Ready(Ok(()))
    }
}

impl<T> fmt::Debug for WritableOwned<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WritableOwned").finish()
    }
}

struct Ready<H: Borrow<crate::Async<T>>, T> {
    handle: H,
    dir: usize,
    ticks: Option<(usize, usize)>,
    index: Option<usize>,
    _capture: PhantomData<fn() -> T>,
}

impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}

impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
    type Output = io::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Self {
            ref handle,
            dir,
            ticks,
            index,
            ..
        } = &mut *self;

        let mut state = handle.borrow().source.state.lock().unwrap();

        // Check if the reactor has delivered an event.
        if let Some((a, b)) = *ticks {
            // If `state[dir].tick` has changed to a value other than the old reactor tick,
            // that means a newer reactor tick has delivered an event.
            if state[*dir].tick != a && state[*dir].tick != b {
                return Poll::Ready(Ok(()));
            }
        }
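        // Same staleness rule as in `Source::poll_ready()` above: only an event delivered
        // after this future registered its waker counts as fresh.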

        let was_empty = state[*dir].is_empty();

        // Register the current task's waker.
        let i = match *index {
            Some(i) => i,
            None => {
                let i = state[*dir].wakers.insert(None);
                *index = Some(i);
                *ticks = Some((Reactor::get().ticker(), state[*dir].tick));
                i
            }
        };
        state[*dir].wakers[i] = Some(cx.waker().clone());

        // Update interest in this I/O handle.
        if was_empty {
            // Create the event that we are interested in.
            let event = {
                let mut event = Event::none(handle.borrow().source.key);
                event.readable = !state[READ].is_empty();
                event.writable = !state[WRITE].is_empty();
                event
            };

            // Indicate that we are interested in this event.
            handle
                .borrow()
                .source
                .registration
                .modify(&Reactor::get().poller, event)?;
        }

        Poll::Pending
    }
}

impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
    fn drop(&mut self) {
        // Remove our waker when dropped.
        if let Some(key) = self.index {
            let mut state = self.handle.borrow().source.state.lock().unwrap();
            let wakers = &mut state[self.dir].wakers;
            if wakers.contains(key) {
                wakers.remove(key);
            }
        }
    }
}