1use std::cell::Cell;
2use std::future::Future;
3use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::task::{Context, Poll};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use async_lock::OnceCell;
10use futures_lite::pin;
11use waker_fn::waker_fn;
12
13use crate::reactor::Reactor;
14
/// Number of currently active `block_on()` invocations.
///
/// Incremented when `block_on()` is entered and decremented again when it exits. The
/// "async-io" thread reads it in `main_loop()`: while the count is non-zero, that thread parks
/// with a backoff timeout between iterations of its loop.
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);
17
18/// Unparker for the "async-io" thread.
19fn unparker() -> &'static parking::Unparker {
20 static UNPARKER: OnceCell<parking::Unparker> = OnceCell::new();
21
22 UNPARKER.get_or_init_blocking(|| {
23 let (parker: Parker, unparker: Unparker) = parking::pair();
24
25 // Spawn a helper thread driving the reactor.
26 //
27 // Note that this thread is not exactly necessary, it's only here to help push things
28 // forward if there are no `Parker`s around or if `Parker`s are just idling and never
29 // parking.
30 thread::Builder::new()
31 .name("async-io".to_string())
32 .spawn(move || main_loop(parker))
33 .expect(msg:"cannot spawn async-io thread");
34
35 unparker
36 })
37}
38
39/// Initializes the "async-io" thread.
40pub(crate) fn init() {
41 let _ = unparker();
42}
43
44/// The main loop for the "async-io" thread.
45fn main_loop(parker: parking::Parker) {
46 // The last observed reactor tick.
47 let mut last_tick = 0;
48 // Number of sleeps since this thread has called `react()`.
49 let mut sleeps = 0u64;
50
51 loop {
52 let tick = Reactor::get().ticker();
53
54 if last_tick == tick {
55 let reactor_lock = if sleeps >= 10 {
56 // If no new ticks have occurred for a while, stop sleeping and spinning in
57 // this loop and just block on the reactor lock.
58 Some(Reactor::get().lock())
59 } else {
60 Reactor::get().try_lock()
61 };
62
63 if let Some(mut reactor_lock) = reactor_lock {
64 log::trace!("main_loop: waiting on I/O");
65 reactor_lock.react(None).ok();
66 last_tick = Reactor::get().ticker();
67 sleeps = 0;
68 }
69 } else {
70 last_tick = tick;
71 }
72
73 if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 {
74 // Exponential backoff from 50us to 10ms.
75 let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
76 .get(sleeps as usize)
77 .unwrap_or(&10_000);
78
79 log::trace!("main_loop: sleeping for {} us", delay_us);
80 if parker.park_timeout(Duration::from_micros(*delay_us)) {
81 log::trace!("main_loop: notified");
82
83 // If notified before timeout, reset the last tick and the sleep counter.
84 last_tick = Reactor::get().ticker();
85 sleeps = 0;
86 } else {
87 sleeps += 1;
88 }
89 }
90 }
91}
92
93/// Blocks the current thread on a future, processing I/O events when idle.
94///
95/// # Examples
96///
97/// ```
98/// use async_io::Timer;
99/// use std::time::Duration;
100///
101/// async_io::block_on(async {
102/// // This timer will likely be processed by the current
103/// // thread rather than the fallback "async-io" thread.
104/// Timer::after(Duration::from_millis(1)).await;
105/// });
106/// ```
107pub fn block_on<T>(future: impl Future<Output = T>) -> T {
108 log::trace!("block_on()");
109
110 // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
111 BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
112
113 // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
114 let _guard = CallOnDrop(|| {
115 BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
116 unparker().unpark();
117 });
118
119 // Parker and unparker for notifying the current thread.
120 let (p, u) = parking::pair();
121 // This boolean is set to `true` when the current thread is blocked on I/O.
122 let io_blocked = Arc::new(AtomicBool::new(false));
123
124 thread_local! {
125 // Indicates that the current thread is polling I/O, but not necessarily blocked on it.
126 static IO_POLLING: Cell<bool> = Cell::new(false);
127 }
128
129 // Prepare the waker.
130 let waker = waker_fn({
131 let io_blocked = io_blocked.clone();
132 move || {
133 if u.unpark() {
134 // Check if waking from another thread and if currently blocked on I/O.
135 if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
136 Reactor::get().notify();
137 }
138 }
139 }
140 });
141 let cx = &mut Context::from_waker(&waker);
142 pin!(future);
143
144 loop {
145 // Poll the future.
146 if let Poll::Ready(t) = future.as_mut().poll(cx) {
147 log::trace!("block_on: completed");
148 return t;
149 }
150
151 // Check if a notification was received.
152 if p.park_timeout(Duration::from_secs(0)) {
153 log::trace!("block_on: notified");
154
155 // Try grabbing a lock on the reactor to process I/O events.
156 if let Some(mut reactor_lock) = Reactor::get().try_lock() {
157 // First let wakers know this parker is processing I/O events.
158 IO_POLLING.with(|io| io.set(true));
159 let _guard = CallOnDrop(|| {
160 IO_POLLING.with(|io| io.set(false));
161 });
162
163 // Process available I/O events.
164 reactor_lock.react(Some(Duration::from_secs(0))).ok();
165 }
166 continue;
167 }
168
169 // Try grabbing a lock on the reactor to wait on I/O.
170 if let Some(mut reactor_lock) = Reactor::get().try_lock() {
171 // Record the instant at which the lock was grabbed.
172 let start = Instant::now();
173
174 loop {
175 // First let wakers know this parker is blocked on I/O.
176 IO_POLLING.with(|io| io.set(true));
177 io_blocked.store(true, Ordering::SeqCst);
178 let _guard = CallOnDrop(|| {
179 IO_POLLING.with(|io| io.set(false));
180 io_blocked.store(false, Ordering::SeqCst);
181 });
182
183 // Check if a notification has been received before `io_blocked` was updated
184 // because in that case the reactor won't receive a wakeup.
185 if p.park_timeout(Duration::from_secs(0)) {
186 log::trace!("block_on: notified");
187 break;
188 }
189
190 // Wait for I/O events.
191 log::trace!("block_on: waiting on I/O");
192 reactor_lock.react(None).ok();
193
194 // Check if a notification has been received.
195 if p.park_timeout(Duration::from_secs(0)) {
196 log::trace!("block_on: notified");
197 break;
198 }
199
200 // Check if this thread been handling I/O events for a long time.
201 if start.elapsed() > Duration::from_micros(500) {
202 log::trace!("block_on: stops hogging the reactor");
203
204 // This thread is clearly processing I/O events for some other threads
205 // because it didn't get a notification yet. It's best to stop hogging the
206 // reactor and give other threads a chance to process I/O events for
207 // themselves.
208 drop(reactor_lock);
209
210 // Unpark the "async-io" thread in case no other thread is ready to start
211 // processing I/O events. This way we prevent a potential latency spike.
212 unparker().unpark();
213
214 // Wait for a notification.
215 p.park();
216 break;
217 }
218 }
219 } else {
220 // Wait for an actual notification.
221 log::trace!("block_on: sleep until notification");
222 p.park();
223 }
224 }
225}
226
/// Guard that invokes the wrapped closure once, at the moment the guard is dropped.
struct CallOnDrop<F>(F)
where
    F: Fn();

impl<F> Drop for CallOnDrop<F>
where
    F: Fn(),
{
    fn drop(&mut self) {
        let callback = &self.0;
        callback();
    }
}
235