1use std::borrow::Borrow;
2use std::collections::BTreeMap;
3use std::fmt;
4use std::future::Future;
5use std::io;
6use std::marker::PhantomData;
7use std::mem;
8#[cfg(unix)]
9use std::os::unix::io::RawFd;
10#[cfg(windows)]
11use std::os::windows::io::RawSocket;
12use std::panic;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicUsize, Ordering};
15use std::sync::{Arc, Mutex, MutexGuard};
16use std::task::{Context, Poll, Waker};
17use std::time::{Duration, Instant};
18
19use async_lock::OnceCell;
20use concurrent_queue::ConcurrentQueue;
21use futures_lite::ready;
22use polling::{Event, Poller};
23use slab::Slab;
24
25const READ: usize = 0;
26const WRITE: usize = 1;
27
28/// The reactor.
29///
30/// There is only one global instance of this type, accessible by [`Reactor::get()`].
31pub(crate) struct Reactor {
32 /// Portable bindings to epoll/kqueue/event ports/IOCP.
33 ///
34 /// This is where I/O is polled, producing I/O events.
35 poller: Poller,
36
37 /// Ticker bumped before polling.
38 ///
39 /// This is useful for checking what is the current "round" of `ReactorLock::react()` when
40 /// synchronizing things in `Source::readable()` and `Source::writable()`. Both of those
41 /// methods must make sure they don't receive stale I/O events - they only accept events from a
42 /// fresh "round" of `ReactorLock::react()`.
43 ticker: AtomicUsize,
44
45 /// Registered sources.
46 sources: Mutex<Slab<Arc<Source>>>,
47
48 /// Temporary storage for I/O events when polling the reactor.
49 ///
50 /// Holding a lock on this event list implies the exclusive right to poll I/O.
51 events: Mutex<Vec<Event>>,
52
53 /// An ordered map of registered timers.
54 ///
55 /// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
56 /// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
57 /// timer.
58 timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
59
60 /// A queue of timer operations (insert and remove).
61 ///
62 /// When inserting or removing a timer, we don't process it immediately - we just push it into
63 /// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
64 timer_ops: ConcurrentQueue<TimerOp>,
65}
66
67impl Reactor {
68 /// Returns a reference to the reactor.
69 pub(crate) fn get() -> &'static Reactor {
70 static REACTOR: OnceCell<Reactor> = OnceCell::new();
71
72 REACTOR.get_or_init_blocking(|| {
73 crate::driver::init();
74 Reactor {
75 poller: Poller::new().expect("cannot initialize I/O event notification"),
76 ticker: AtomicUsize::new(0),
77 sources: Mutex::new(Slab::new()),
78 events: Mutex::new(Vec::new()),
79 timers: Mutex::new(BTreeMap::new()),
80 timer_ops: ConcurrentQueue::bounded(1000),
81 }
82 })
83 }
84
85 /// Returns the current ticker.
86 pub(crate) fn ticker(&self) -> usize {
87 self.ticker.load(Ordering::SeqCst)
88 }
89
90 /// Registers an I/O source in the reactor.
91 pub(crate) fn insert_io(
92 &self,
93 #[cfg(unix)] raw: RawFd,
94 #[cfg(windows)] raw: RawSocket,
95 ) -> io::Result<Arc<Source>> {
96 // Create an I/O source for this file descriptor.
97 let source = {
98 let mut sources = self.sources.lock().unwrap();
99 let key = sources.vacant_entry().key();
100 let source = Arc::new(Source {
101 raw,
102 key,
103 state: Default::default(),
104 });
105 sources.insert(source.clone());
106 source
107 };
108
109 // Register the file descriptor.
110 if let Err(err) = self.poller.add(raw, Event::none(source.key)) {
111 let mut sources = self.sources.lock().unwrap();
112 sources.remove(source.key);
113 return Err(err);
114 }
115
116 Ok(source)
117 }
118
119 /// Deregisters an I/O source from the reactor.
120 pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
121 let mut sources = self.sources.lock().unwrap();
122 sources.remove(source.key);
123 self.poller.delete(source.raw)
124 }
125
126 /// Registers a timer in the reactor.
127 ///
128 /// Returns the inserted timer's ID.
129 pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
130 // Generate a new timer ID.
131 static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
132 let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
133
134 // Push an insert operation.
135 while self
136 .timer_ops
137 .push(TimerOp::Insert(when, id, waker.clone()))
138 .is_err()
139 {
140 // If the queue is full, drain it and try again.
141 let mut timers = self.timers.lock().unwrap();
142 self.process_timer_ops(&mut timers);
143 }
144
145 // Notify that a timer has been inserted.
146 self.notify();
147
148 id
149 }
150
151 /// Deregisters a timer from the reactor.
152 pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
153 // Push a remove operation.
154 while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
155 // If the queue is full, drain it and try again.
156 let mut timers = self.timers.lock().unwrap();
157 self.process_timer_ops(&mut timers);
158 }
159 }
160
161 /// Notifies the thread blocked on the reactor.
162 pub(crate) fn notify(&self) {
163 self.poller.notify().expect("failed to notify reactor");
164 }
165
166 /// Locks the reactor, potentially blocking if the lock is held by another thread.
167 pub(crate) fn lock(&self) -> ReactorLock<'_> {
168 let reactor = self;
169 let events = self.events.lock().unwrap();
170 ReactorLock { reactor, events }
171 }
172
173 /// Attempts to lock the reactor.
174 pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
175 self.events.try_lock().ok().map(|events| {
176 let reactor = self;
177 ReactorLock { reactor, events }
178 })
179 }
180
181 /// Processes ready timers and extends the list of wakers to wake.
182 ///
183 /// Returns the duration until the next timer before this method was called.
184 fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
185 let mut timers = self.timers.lock().unwrap();
186 self.process_timer_ops(&mut timers);
187
188 let now = Instant::now();
189
190 // Split timers into ready and pending timers.
191 //
192 // Careful to split just *after* `now`, so that a timer set for exactly `now` is considered
193 // ready.
194 let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
195 let ready = mem::replace(&mut *timers, pending);
196
197 // Calculate the duration until the next event.
198 let dur = if ready.is_empty() {
199 // Duration until the next timer.
200 timers
201 .keys()
202 .next()
203 .map(|(when, _)| when.saturating_duration_since(now))
204 } else {
205 // Timers are about to fire right now.
206 Some(Duration::from_secs(0))
207 };
208
209 // Drop the lock before waking.
210 drop(timers);
211
212 // Add wakers to the list.
213 log::trace!("process_timers: {} ready wakers", ready.len());
214 for (_, waker) in ready {
215 wakers.push(waker);
216 }
217
218 dur
219 }
220
221 /// Processes queued timer operations.
222 fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
223 // Process only as much as fits into the queue, or else this loop could in theory run
224 // forever.
225 for _ in 0..self.timer_ops.capacity().unwrap() {
226 match self.timer_ops.pop() {
227 Ok(TimerOp::Insert(when, id, waker)) => {
228 timers.insert((when, id), waker);
229 }
230 Ok(TimerOp::Remove(when, id)) => {
231 timers.remove(&(when, id));
232 }
233 Err(_) => break,
234 }
235 }
236 }
237}
238
239/// A lock on the reactor.
240pub(crate) struct ReactorLock<'a> {
241 reactor: &'a Reactor,
242 events: MutexGuard<'a, Vec<Event>>,
243}
244
245impl ReactorLock<'_> {
246 /// Processes new events, blocking until the first event or the timeout.
247 pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
248 let mut wakers = Vec::new();
249
250 // Process ready timers.
251 let next_timer = self.reactor.process_timers(&mut wakers);
252
253 // compute the timeout for blocking on I/O events.
254 let timeout = match (next_timer, timeout) {
255 (None, None) => None,
256 (Some(t), None) | (None, Some(t)) => Some(t),
257 (Some(a), Some(b)) => Some(a.min(b)),
258 };
259
260 // Bump the ticker before polling I/O.
261 let tick = self
262 .reactor
263 .ticker
264 .fetch_add(1, Ordering::SeqCst)
265 .wrapping_add(1);
266
267 self.events.clear();
268
269 // Block on I/O events.
270 let res = match self.reactor.poller.wait(&mut self.events, timeout) {
271 // No I/O events occurred.
272 Ok(0) => {
273 if timeout != Some(Duration::from_secs(0)) {
274 // The non-zero timeout was hit so fire ready timers.
275 self.reactor.process_timers(&mut wakers);
276 }
277 Ok(())
278 }
279
280 // At least one I/O event occurred.
281 Ok(_) => {
282 // Iterate over sources in the event list.
283 let sources = self.reactor.sources.lock().unwrap();
284
285 for ev in self.events.iter() {
286 // Check if there is a source in the table with this key.
287 if let Some(source) = sources.get(ev.key) {
288 let mut state = source.state.lock().unwrap();
289
290 // Collect wakers if a writability event was emitted.
291 for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
292 if emitted {
293 state[dir].tick = tick;
294 state[dir].drain_into(&mut wakers);
295 }
296 }
297
298 // Re-register if there are still writers or readers. The can happen if
299 // e.g. we were previously interested in both readability and writability,
300 // but only one of them was emitted.
301 if !state[READ].is_empty() || !state[WRITE].is_empty() {
302 self.reactor.poller.modify(
303 source.raw,
304 Event {
305 key: source.key,
306 readable: !state[READ].is_empty(),
307 writable: !state[WRITE].is_empty(),
308 },
309 )?;
310 }
311 }
312 }
313
314 Ok(())
315 }
316
317 // The syscall was interrupted.
318 Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
319
320 // An actual error occureed.
321 Err(err) => Err(err),
322 };
323
324 // Wake up ready tasks.
325 log::trace!("react: {} ready wakers", wakers.len());
326 for waker in wakers {
327 // Don't let a panicking waker blow everything up.
328 panic::catch_unwind(|| waker.wake()).ok();
329 }
330
331 res
332 }
333}
334
335/// A single timer operation.
336enum TimerOp {
337 Insert(Instant, usize, Waker),
338 Remove(Instant, usize),
339}
340
341/// A registered source of I/O events.
342#[derive(Debug)]
343pub(crate) struct Source {
344 /// Raw file descriptor on Unix platforms.
345 #[cfg(unix)]
346 pub(crate) raw: RawFd,
347
348 /// Raw socket handle on Windows.
349 #[cfg(windows)]
350 pub(crate) raw: RawSocket,
351
352 /// The key of this source obtained during registration.
353 key: usize,
354
355 /// Inner state with registered wakers.
356 state: Mutex<[Direction; 2]>,
357}
358
359/// A read or write direction.
360#[derive(Debug, Default)]
361struct Direction {
362 /// Last reactor tick that delivered an event.
363 tick: usize,
364
365 /// Ticks remembered by `Async::poll_readable()` or `Async::poll_writable()`.
366 ticks: Option<(usize, usize)>,
367
368 /// Waker stored by `Async::poll_readable()` or `Async::poll_writable()`.
369 waker: Option<Waker>,
370
371 /// Wakers of tasks waiting for the next event.
372 ///
373 /// Registered by `Async::readable()` and `Async::writable()`.
374 wakers: Slab<Option<Waker>>,
375}
376
377impl Direction {
378 /// Returns `true` if there are no wakers interested in this direction.
379 fn is_empty(&self) -> bool {
380 self.waker.is_none() && self.wakers.iter().all(|(_, opt: &Option)| opt.is_none())
381 }
382
383 /// Moves all wakers into a `Vec`.
384 fn drain_into(&mut self, dst: &mut Vec<Waker>) {
385 if let Some(w: Waker) = self.waker.take() {
386 dst.push(w);
387 }
388 for (_, opt: &mut Option) in self.wakers.iter_mut() {
389 if let Some(w: Waker) = opt.take() {
390 dst.push(w);
391 }
392 }
393 }
394}
395
396impl Source {
397 /// Polls the I/O source for readability.
398 pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
399 self.poll_ready(READ, cx)
400 }
401
402 /// Polls the I/O source for writability.
403 pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
404 self.poll_ready(WRITE, cx)
405 }
406
407 /// Registers a waker from `poll_readable()` or `poll_writable()`.
408 ///
409 /// If a different waker is already registered, it gets replaced and woken.
410 fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
411 let mut state = self.state.lock().unwrap();
412
413 // Check if the reactor has delivered an event.
414 if let Some((a, b)) = state[dir].ticks {
415 // If `state[dir].tick` has changed to a value other than the old reactor tick,
416 // that means a newer reactor tick has delivered an event.
417 if state[dir].tick != a && state[dir].tick != b {
418 state[dir].ticks = None;
419 return Poll::Ready(Ok(()));
420 }
421 }
422
423 let was_empty = state[dir].is_empty();
424
425 // Register the current task's waker.
426 if let Some(w) = state[dir].waker.take() {
427 if w.will_wake(cx.waker()) {
428 state[dir].waker = Some(w);
429 return Poll::Pending;
430 }
431 // Wake the previous waker because it's going to get replaced.
432 panic::catch_unwind(|| w.wake()).ok();
433 }
434 state[dir].waker = Some(cx.waker().clone());
435 state[dir].ticks = Some((Reactor::get().ticker(), state[dir].tick));
436
437 // Update interest in this I/O handle.
438 if was_empty {
439 Reactor::get().poller.modify(
440 self.raw,
441 Event {
442 key: self.key,
443 readable: !state[READ].is_empty(),
444 writable: !state[WRITE].is_empty(),
445 },
446 )?;
447 }
448
449 Poll::Pending
450 }
451
452 /// Waits until the I/O source is readable.
453 pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
454 Readable(Self::ready(handle, READ))
455 }
456
457 /// Waits until the I/O source is readable.
458 pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
459 ReadableOwned(Self::ready(handle, READ))
460 }
461
462 /// Waits until the I/O source is writable.
463 pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
464 Writable(Self::ready(handle, WRITE))
465 }
466
467 /// Waits until the I/O source is writable.
468 pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
469 WritableOwned(Self::ready(handle, WRITE))
470 }
471
472 /// Waits until the I/O source is readable or writable.
473 fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
474 Ready {
475 handle,
476 dir,
477 ticks: None,
478 index: None,
479 _capture: PhantomData,
480 }
481 }
482}
483
484/// Future for [`Async::readable`](crate::Async::readable).
485#[must_use = "futures do nothing unless you `.await` or poll them"]
486pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);
487
488impl<T> Future for Readable<'_, T> {
489 type Output = io::Result<()>;
490
491 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
492 ready!(Pin::new(&mut self.0).poll(cx))?;
493 log::trace!("readable: fd={}", self.0.handle.source.raw);
494 Poll::Ready(Ok(()))
495 }
496}
497
498impl<T> fmt::Debug for Readable<'_, T> {
499 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
500 f.debug_struct(name:"Readable").finish()
501 }
502}
503
504/// Future for [`Async::readable_owned`](crate::Async::readable_owned).
505#[must_use = "futures do nothing unless you `.await` or poll them"]
506pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
507
508impl<T> Future for ReadableOwned<T> {
509 type Output = io::Result<()>;
510
511 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
512 ready!(Pin::new(&mut self.0).poll(cx))?;
513 log::trace!("readable_owned: fd={}", self.0.handle.source.raw);
514 Poll::Ready(Ok(()))
515 }
516}
517
518impl<T> fmt::Debug for ReadableOwned<T> {
519 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
520 f.debug_struct(name:"ReadableOwned").finish()
521 }
522}
523
524/// Future for [`Async::writable`](crate::Async::writable).
525#[must_use = "futures do nothing unless you `.await` or poll them"]
526pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);
527
528impl<T> Future for Writable<'_, T> {
529 type Output = io::Result<()>;
530
531 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
532 ready!(Pin::new(&mut self.0).poll(cx))?;
533 log::trace!("writable: fd={}", self.0.handle.source.raw);
534 Poll::Ready(Ok(()))
535 }
536}
537
538impl<T> fmt::Debug for Writable<'_, T> {
539 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
540 f.debug_struct(name:"Writable").finish()
541 }
542}
543
544/// Future for [`Async::writable_owned`](crate::Async::writable_owned).
545#[must_use = "futures do nothing unless you `.await` or poll them"]
546pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
547
548impl<T> Future for WritableOwned<T> {
549 type Output = io::Result<()>;
550
551 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
552 ready!(Pin::new(&mut self.0).poll(cx))?;
553 log::trace!("writable_owned: fd={}", self.0.handle.source.raw);
554 Poll::Ready(Ok(()))
555 }
556}
557
558impl<T> fmt::Debug for WritableOwned<T> {
559 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
560 f.debug_struct(name:"WritableOwned").finish()
561 }
562}
563
564struct Ready<H: Borrow<crate::Async<T>>, T> {
565 handle: H,
566 dir: usize,
567 ticks: Option<(usize, usize)>,
568 index: Option<usize>,
569 _capture: PhantomData<fn() -> T>,
570}
571
572impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}
573
574impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
575 type Output = io::Result<()>;
576
577 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
578 let Self {
579 ref handle,
580 dir,
581 ticks,
582 index,
583 ..
584 } = &mut *self;
585
586 let mut state = handle.borrow().source.state.lock().unwrap();
587
588 // Check if the reactor has delivered an event.
589 if let Some((a, b)) = *ticks {
590 // If `state[dir].tick` has changed to a value other than the old reactor tick,
591 // that means a newer reactor tick has delivered an event.
592 if state[*dir].tick != a && state[*dir].tick != b {
593 return Poll::Ready(Ok(()));
594 }
595 }
596
597 let was_empty = state[*dir].is_empty();
598
599 // Register the current task's waker.
600 let i = match *index {
601 Some(i) => i,
602 None => {
603 let i = state[*dir].wakers.insert(None);
604 *index = Some(i);
605 *ticks = Some((Reactor::get().ticker(), state[*dir].tick));
606 i
607 }
608 };
609 state[*dir].wakers[i] = Some(cx.waker().clone());
610
611 // Update interest in this I/O handle.
612 if was_empty {
613 Reactor::get().poller.modify(
614 handle.borrow().source.raw,
615 Event {
616 key: handle.borrow().source.key,
617 readable: !state[READ].is_empty(),
618 writable: !state[WRITE].is_empty(),
619 },
620 )?;
621 }
622
623 Poll::Pending
624 }
625}
626
627impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
628 fn drop(&mut self) {
629 // Remove our waker when dropped.
630 if let Some(key: usize) = self.index {
631 let mut state = self.handle.borrow().source.state.lock().unwrap();
632 let wakers: &mut {unknown} = &mut state[self.dir].wakers;
633 if wakers.contains(key) {
634 wakers.remove(key);
635 }
636 }
637 }
638}
639