1use std::cell::{Cell, RefCell};
2use std::future::Future;
3use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::task::Waker;
6use std::task::{Context, Poll};
7use std::thread;
8use std::time::{Duration, Instant};
9
10use async_lock::OnceCell;
11use futures_lite::pin;
12use parking::Parker;
13
14use crate::reactor::Reactor;
15
16/// Number of currently active `block_on()` invocations.
17static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);
18
19/// Unparker for the "async-io" thread.
20fn unparker() -> &'static parking::Unparker {
21 static UNPARKER: OnceCell<parking::Unparker> = OnceCell::new();
22
23 UNPARKER.get_or_init_blocking(|| {
24 let (parker: Parker, unparker: Unparker) = parking::pair();
25
26 // Spawn a helper thread driving the reactor.
27 //
28 // Note that this thread is not exactly necessary, it's only here to help push things
29 // forward if there are no `Parker`s around or if `Parker`s are just idling and never
30 // parking.
31 thread::Builder::new()
32 .name("async-io".to_string())
33 .spawn(move || main_loop(parker))
34 .expect(msg:"cannot spawn async-io thread");
35
36 unparker
37 })
38}
39
40/// Initializes the "async-io" thread.
41pub(crate) fn init() {
42 let _ = unparker();
43}
44
45/// The main loop for the "async-io" thread.
46fn main_loop(parker: parking::Parker) {
47 let span = tracing::trace_span!("async_io::main_loop");
48 let _enter = span.enter();
49
50 // The last observed reactor tick.
51 let mut last_tick = 0;
52 // Number of sleeps since this thread has called `react()`.
53 let mut sleeps = 0u64;
54
55 loop {
56 let tick = Reactor::get().ticker();
57
58 if last_tick == tick {
59 let reactor_lock = if sleeps >= 10 {
60 // If no new ticks have occurred for a while, stop sleeping and spinning in
61 // this loop and just block on the reactor lock.
62 Some(Reactor::get().lock())
63 } else {
64 Reactor::get().try_lock()
65 };
66
67 if let Some(mut reactor_lock) = reactor_lock {
68 tracing::trace!("waiting on I/O");
69 reactor_lock.react(None).ok();
70 last_tick = Reactor::get().ticker();
71 sleeps = 0;
72 }
73 } else {
74 last_tick = tick;
75 }
76
77 if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 {
78 // Exponential backoff from 50us to 10ms.
79 let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
80 .get(sleeps as usize)
81 .unwrap_or(&10_000);
82
83 tracing::trace!("sleeping for {} us", delay_us);
84 if parker.park_timeout(Duration::from_micros(*delay_us)) {
85 tracing::trace!("notified");
86
87 // If notified before timeout, reset the last tick and the sleep counter.
88 last_tick = Reactor::get().ticker();
89 sleeps = 0;
90 } else {
91 sleeps += 1;
92 }
93 }
94 }
95}
96
97/// Blocks the current thread on a future, processing I/O events when idle.
98///
99/// # Examples
100///
101/// ```
102/// use async_io::Timer;
103/// use std::time::Duration;
104///
105/// async_io::block_on(async {
106/// // This timer will likely be processed by the current
107/// // thread rather than the fallback "async-io" thread.
108/// Timer::after(Duration::from_millis(1)).await;
109/// });
110/// ```
111pub fn block_on<T>(future: impl Future<Output = T>) -> T {
112 let span = tracing::trace_span!("async_io::block_on");
113 let _enter = span.enter();
114
115 // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
116 BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
117
118 // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
119 let _guard = CallOnDrop(|| {
120 BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
121 unparker().unpark();
122 });
123
124 // Creates a parker and an associated waker that unparks it.
125 fn parker_and_waker() -> (Parker, Waker, Arc<AtomicBool>) {
126 // Parker and unparker for notifying the current thread.
127 let (p, u) = parking::pair();
128
129 // This boolean is set to `true` when the current thread is blocked on I/O.
130 let io_blocked = Arc::new(AtomicBool::new(false));
131
132 // Prepare the waker.
133 let waker = BlockOnWaker::create(io_blocked.clone(), u);
134
135 (p, waker, io_blocked)
136 }
137
138 thread_local! {
139 // Cached parker and waker for efficiency.
140 static CACHE: RefCell<(Parker, Waker, Arc<AtomicBool>)> = RefCell::new(parker_and_waker());
141
142 // Indicates that the current thread is polling I/O, but not necessarily blocked on it.
143 static IO_POLLING: Cell<bool> = const { Cell::new(false) };
144 }
145
146 struct BlockOnWaker {
147 io_blocked: Arc<AtomicBool>,
148 unparker: parking::Unparker,
149 }
150
151 impl BlockOnWaker {
152 fn create(io_blocked: Arc<AtomicBool>, unparker: parking::Unparker) -> Waker {
153 Waker::from(Arc::new(BlockOnWaker {
154 io_blocked,
155 unparker,
156 }))
157 }
158 }
159
160 impl std::task::Wake for BlockOnWaker {
161 fn wake_by_ref(self: &Arc<Self>) {
162 if self.unparker.unpark() {
163 // Check if waking from another thread and if currently blocked on I/O.
164 if !IO_POLLING.with(Cell::get) && self.io_blocked.load(Ordering::SeqCst) {
165 Reactor::get().notify();
166 }
167 }
168 }
169
170 fn wake(self: Arc<Self>) {
171 self.wake_by_ref()
172 }
173 }
174
175 CACHE.with(|cache| {
176 // Try grabbing the cached parker and waker.
177 let tmp_cached;
178 let tmp_fresh;
179 let (p, waker, io_blocked) = match cache.try_borrow_mut() {
180 Ok(cache) => {
181 // Use the cached parker and waker.
182 tmp_cached = cache;
183 &*tmp_cached
184 }
185 Err(_) => {
186 // Looks like this is a recursive `block_on()` call.
187 // Create a fresh parker and waker.
188 tmp_fresh = parker_and_waker();
189 &tmp_fresh
190 }
191 };
192
193 pin!(future);
194
195 let cx = &mut Context::from_waker(waker);
196
197 loop {
198 // Poll the future.
199 if let Poll::Ready(t) = future.as_mut().poll(cx) {
200 // Ensure the cached parker is reset to the unnotified state for future block_on calls,
201 // in case this future called wake and then immediately returned Poll::Ready.
202 p.park_timeout(Duration::from_secs(0));
203 tracing::trace!("completed");
204 return t;
205 }
206
207 // Check if a notification was received.
208 if p.park_timeout(Duration::from_secs(0)) {
209 tracing::trace!("notified");
210
211 // Try grabbing a lock on the reactor to process I/O events.
212 if let Some(mut reactor_lock) = Reactor::get().try_lock() {
213 // First let wakers know this parker is processing I/O events.
214 IO_POLLING.with(|io| io.set(true));
215 let _guard = CallOnDrop(|| {
216 IO_POLLING.with(|io| io.set(false));
217 });
218
219 // Process available I/O events.
220 reactor_lock.react(Some(Duration::from_secs(0))).ok();
221 }
222 continue;
223 }
224
225 // Try grabbing a lock on the reactor to wait on I/O.
226 if let Some(mut reactor_lock) = Reactor::get().try_lock() {
227 // Record the instant at which the lock was grabbed.
228 let start = Instant::now();
229
230 loop {
231 // First let wakers know this parker is blocked on I/O.
232 IO_POLLING.with(|io| io.set(true));
233 io_blocked.store(true, Ordering::SeqCst);
234 let _guard = CallOnDrop(|| {
235 IO_POLLING.with(|io| io.set(false));
236 io_blocked.store(false, Ordering::SeqCst);
237 });
238
239 // Check if a notification has been received before `io_blocked` was updated
240 // because in that case the reactor won't receive a wakeup.
241 if p.park_timeout(Duration::from_secs(0)) {
242 tracing::trace!("notified");
243 break;
244 }
245
246 // Wait for I/O events.
247 tracing::trace!("waiting on I/O");
248 reactor_lock.react(None).ok();
249
250 // Check if a notification has been received.
251 if p.park_timeout(Duration::from_secs(0)) {
252 tracing::trace!("notified");
253 break;
254 }
255
256 // Check if this thread been handling I/O events for a long time.
257 if start.elapsed() > Duration::from_micros(500) {
258 tracing::trace!("stops hogging the reactor");
259
260 // This thread is clearly processing I/O events for some other threads
261 // because it didn't get a notification yet. It's best to stop hogging the
262 // reactor and give other threads a chance to process I/O events for
263 // themselves.
264 drop(reactor_lock);
265
266 // Unpark the "async-io" thread in case no other thread is ready to start
267 // processing I/O events. This way we prevent a potential latency spike.
268 unparker().unpark();
269
270 // Wait for a notification.
271 p.park();
272 break;
273 }
274 }
275 } else {
276 // Wait for an actual notification.
277 tracing::trace!("sleep until notification");
278 p.park();
279 }
280 }
281 })
282}
283
284/// Runs a closure when dropped.
285struct CallOnDrop<F: Fn()>(F);
286
287impl<F: Fn()> Drop for CallOnDrop<F> {
288 fn drop(&mut self) {
289 (self.0)();
290 }
291}
292