1 | use std::cell::{Cell, RefCell}; |
2 | use std::future::Future; |
3 | use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
4 | use std::sync::Arc; |
5 | use std::task::Waker; |
6 | use std::task::{Context, Poll}; |
7 | use std::thread; |
8 | use std::time::{Duration, Instant}; |
9 | |
10 | use async_lock::OnceCell; |
11 | use futures_lite::pin; |
12 | use parking::Parker; |
13 | |
14 | use crate::reactor::Reactor; |
15 | |
16 | /// Number of currently active `block_on()` invocations. |
17 | static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0); |
18 | |
19 | /// Unparker for the "async-io" thread. |
20 | fn unparker() -> &'static parking::Unparker { |
21 | static UNPARKER: OnceCell<parking::Unparker> = OnceCell::new(); |
22 | |
23 | UNPARKER.get_or_init_blocking(|| { |
24 | let (parker: Parker, unparker: Unparker) = parking::pair(); |
25 | |
26 | // Spawn a helper thread driving the reactor. |
27 | // |
28 | // Note that this thread is not exactly necessary, it's only here to help push things |
29 | // forward if there are no `Parker`s around or if `Parker`s are just idling and never |
30 | // parking. |
31 | thread::Builder::new() |
32 | .name("async-io" .to_string()) |
33 | .spawn(move || main_loop(parker)) |
34 | .expect(msg:"cannot spawn async-io thread" ); |
35 | |
36 | unparker |
37 | }) |
38 | } |
39 | |
40 | /// Initializes the "async-io" thread. |
41 | pub(crate) fn init() { |
42 | let _ = unparker(); |
43 | } |
44 | |
45 | /// The main loop for the "async-io" thread. |
46 | fn main_loop(parker: parking::Parker) { |
47 | let span = tracing::trace_span!("async_io::main_loop" ); |
48 | let _enter = span.enter(); |
49 | |
50 | // The last observed reactor tick. |
51 | let mut last_tick = 0; |
52 | // Number of sleeps since this thread has called `react()`. |
53 | let mut sleeps = 0u64; |
54 | |
55 | loop { |
56 | let tick = Reactor::get().ticker(); |
57 | |
58 | if last_tick == tick { |
59 | let reactor_lock = if sleeps >= 10 { |
60 | // If no new ticks have occurred for a while, stop sleeping and spinning in |
61 | // this loop and just block on the reactor lock. |
62 | Some(Reactor::get().lock()) |
63 | } else { |
64 | Reactor::get().try_lock() |
65 | }; |
66 | |
67 | if let Some(mut reactor_lock) = reactor_lock { |
68 | tracing::trace!("waiting on I/O" ); |
69 | reactor_lock.react(None).ok(); |
70 | last_tick = Reactor::get().ticker(); |
71 | sleeps = 0; |
72 | } |
73 | } else { |
74 | last_tick = tick; |
75 | } |
76 | |
77 | if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 { |
78 | // Exponential backoff from 50us to 10ms. |
79 | let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000] |
80 | .get(sleeps as usize) |
81 | .unwrap_or(&10_000); |
82 | |
83 | tracing::trace!("sleeping for {} us" , delay_us); |
84 | if parker.park_timeout(Duration::from_micros(*delay_us)) { |
85 | tracing::trace!("notified" ); |
86 | |
87 | // If notified before timeout, reset the last tick and the sleep counter. |
88 | last_tick = Reactor::get().ticker(); |
89 | sleeps = 0; |
90 | } else { |
91 | sleeps += 1; |
92 | } |
93 | } |
94 | } |
95 | } |
96 | |
97 | /// Blocks the current thread on a future, processing I/O events when idle. |
98 | /// |
99 | /// # Examples |
100 | /// |
101 | /// ``` |
102 | /// use async_io::Timer; |
103 | /// use std::time::Duration; |
104 | /// |
105 | /// async_io::block_on(async { |
106 | /// // This timer will likely be processed by the current |
107 | /// // thread rather than the fallback "async-io" thread. |
108 | /// Timer::after(Duration::from_millis(1)).await; |
109 | /// }); |
110 | /// ``` |
111 | pub fn block_on<T>(future: impl Future<Output = T>) -> T { |
112 | let span = tracing::trace_span!("async_io::block_on" ); |
113 | let _enter = span.enter(); |
114 | |
115 | // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive. |
116 | BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst); |
117 | |
118 | // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread. |
119 | let _guard = CallOnDrop(|| { |
120 | BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst); |
121 | unparker().unpark(); |
122 | }); |
123 | |
124 | // Creates a parker and an associated waker that unparks it. |
125 | fn parker_and_waker() -> (Parker, Waker, Arc<AtomicBool>) { |
126 | // Parker and unparker for notifying the current thread. |
127 | let (p, u) = parking::pair(); |
128 | |
129 | // This boolean is set to `true` when the current thread is blocked on I/O. |
130 | let io_blocked = Arc::new(AtomicBool::new(false)); |
131 | |
132 | // Prepare the waker. |
133 | let waker = BlockOnWaker::create(io_blocked.clone(), u); |
134 | |
135 | (p, waker, io_blocked) |
136 | } |
137 | |
138 | thread_local! { |
139 | // Cached parker and waker for efficiency. |
140 | static CACHE: RefCell<(Parker, Waker, Arc<AtomicBool>)> = RefCell::new(parker_and_waker()); |
141 | |
142 | // Indicates that the current thread is polling I/O, but not necessarily blocked on it. |
143 | static IO_POLLING: Cell<bool> = const { Cell::new(false) }; |
144 | } |
145 | |
146 | struct BlockOnWaker { |
147 | io_blocked: Arc<AtomicBool>, |
148 | unparker: parking::Unparker, |
149 | } |
150 | |
151 | impl BlockOnWaker { |
152 | fn create(io_blocked: Arc<AtomicBool>, unparker: parking::Unparker) -> Waker { |
153 | Waker::from(Arc::new(BlockOnWaker { |
154 | io_blocked, |
155 | unparker, |
156 | })) |
157 | } |
158 | } |
159 | |
160 | impl std::task::Wake for BlockOnWaker { |
161 | fn wake_by_ref(self: &Arc<Self>) { |
162 | if self.unparker.unpark() { |
163 | // Check if waking from another thread and if currently blocked on I/O. |
164 | if !IO_POLLING.with(Cell::get) && self.io_blocked.load(Ordering::SeqCst) { |
165 | Reactor::get().notify(); |
166 | } |
167 | } |
168 | } |
169 | |
170 | fn wake(self: Arc<Self>) { |
171 | self.wake_by_ref() |
172 | } |
173 | } |
174 | |
175 | CACHE.with(|cache| { |
176 | // Try grabbing the cached parker and waker. |
177 | let tmp_cached; |
178 | let tmp_fresh; |
179 | let (p, waker, io_blocked) = match cache.try_borrow_mut() { |
180 | Ok(cache) => { |
181 | // Use the cached parker and waker. |
182 | tmp_cached = cache; |
183 | &*tmp_cached |
184 | } |
185 | Err(_) => { |
186 | // Looks like this is a recursive `block_on()` call. |
187 | // Create a fresh parker and waker. |
188 | tmp_fresh = parker_and_waker(); |
189 | &tmp_fresh |
190 | } |
191 | }; |
192 | |
193 | pin!(future); |
194 | |
195 | let cx = &mut Context::from_waker(waker); |
196 | |
197 | loop { |
198 | // Poll the future. |
199 | if let Poll::Ready(t) = future.as_mut().poll(cx) { |
200 | // Ensure the cached parker is reset to the unnotified state for future block_on calls, |
201 | // in case this future called wake and then immediately returned Poll::Ready. |
202 | p.park_timeout(Duration::from_secs(0)); |
203 | tracing::trace!("completed" ); |
204 | return t; |
205 | } |
206 | |
207 | // Check if a notification was received. |
208 | if p.park_timeout(Duration::from_secs(0)) { |
209 | tracing::trace!("notified" ); |
210 | |
211 | // Try grabbing a lock on the reactor to process I/O events. |
212 | if let Some(mut reactor_lock) = Reactor::get().try_lock() { |
213 | // First let wakers know this parker is processing I/O events. |
214 | IO_POLLING.with(|io| io.set(true)); |
215 | let _guard = CallOnDrop(|| { |
216 | IO_POLLING.with(|io| io.set(false)); |
217 | }); |
218 | |
219 | // Process available I/O events. |
220 | reactor_lock.react(Some(Duration::from_secs(0))).ok(); |
221 | } |
222 | continue; |
223 | } |
224 | |
225 | // Try grabbing a lock on the reactor to wait on I/O. |
226 | if let Some(mut reactor_lock) = Reactor::get().try_lock() { |
227 | // Record the instant at which the lock was grabbed. |
228 | let start = Instant::now(); |
229 | |
230 | loop { |
231 | // First let wakers know this parker is blocked on I/O. |
232 | IO_POLLING.with(|io| io.set(true)); |
233 | io_blocked.store(true, Ordering::SeqCst); |
234 | let _guard = CallOnDrop(|| { |
235 | IO_POLLING.with(|io| io.set(false)); |
236 | io_blocked.store(false, Ordering::SeqCst); |
237 | }); |
238 | |
239 | // Check if a notification has been received before `io_blocked` was updated |
240 | // because in that case the reactor won't receive a wakeup. |
241 | if p.park_timeout(Duration::from_secs(0)) { |
242 | tracing::trace!("notified" ); |
243 | break; |
244 | } |
245 | |
246 | // Wait for I/O events. |
247 | tracing::trace!("waiting on I/O" ); |
248 | reactor_lock.react(None).ok(); |
249 | |
250 | // Check if a notification has been received. |
251 | if p.park_timeout(Duration::from_secs(0)) { |
252 | tracing::trace!("notified" ); |
253 | break; |
254 | } |
255 | |
256 | // Check if this thread been handling I/O events for a long time. |
257 | if start.elapsed() > Duration::from_micros(500) { |
258 | tracing::trace!("stops hogging the reactor" ); |
259 | |
260 | // This thread is clearly processing I/O events for some other threads |
261 | // because it didn't get a notification yet. It's best to stop hogging the |
262 | // reactor and give other threads a chance to process I/O events for |
263 | // themselves. |
264 | drop(reactor_lock); |
265 | |
266 | // Unpark the "async-io" thread in case no other thread is ready to start |
267 | // processing I/O events. This way we prevent a potential latency spike. |
268 | unparker().unpark(); |
269 | |
270 | // Wait for a notification. |
271 | p.park(); |
272 | break; |
273 | } |
274 | } |
275 | } else { |
276 | // Wait for an actual notification. |
277 | tracing::trace!("sleep until notification" ); |
278 | p.park(); |
279 | } |
280 | } |
281 | }) |
282 | } |
283 | |
284 | /// Runs a closure when dropped. |
285 | struct CallOnDrop<F: Fn()>(F); |
286 | |
287 | impl<F: Fn()> Drop for CallOnDrop<F> { |
288 | fn drop(&mut self) { |
289 | (self.0)(); |
290 | } |
291 | } |
292 | |