use std::cell::Cell;
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread;
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use futures_lite::pin;
use waker_fn::waker_fn;

use crate::reactor::Reactor;

/// Number of currently active `block_on()` invocations.
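///
/// While this is nonzero, the "async-io" helper thread backs off between reactor polls, since
/// the `block_on()` callers can drive the reactor themselves.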
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Unparker for the "async-io" thread.
fn unparker() -> &'static parking::Unparker {
    static UNPARKER: OnceCell<parking::Unparker> = OnceCell::new();

    UNPARKER.get_or_init_blocking(|| {
        let (parker, unparker) = parking::pair();

        // Spawn a helper thread driving the reactor.
        //
        // Note that this thread is not exactly necessary; it's only here to help push things
        // forward if there are no `Parker`s around or if `Parker`s are just idling and never
        // parking.
        thread::Builder::new()
            .name("async-io".to_string())
            .spawn(move || main_loop(parker))
            .expect("cannot spawn async-io thread");

        unparker
    })
}

/// Initializes the "async-io" thread.
pub(crate) fn init() {
    let _ = unparker();
}

/// The main loop for the "async-io" thread.
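///
/// Watches the reactor's tick counter and processes I/O when no other thread appears to be
/// making progress, backing off with increasingly long sleeps while `block_on()` callers are
/// active.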
fn main_loop(parker: parking::Parker) {
    // The last observed reactor tick.
    let mut last_tick = 0;
    // Number of sleeps since this thread last called `react()`.
    let mut sleeps = 0u64;

    loop {
        let tick = Reactor::get().ticker();

        if last_tick == tick {
            let reactor_lock = if sleeps >= 10 {
                // If no new ticks have occurred for a while, stop sleeping and spinning in
                // this loop and just block on the reactor lock.
                Some(Reactor::get().lock())
            } else {
                Reactor::get().try_lock()
            };

            if let Some(mut reactor_lock) = reactor_lock {
                log::trace!("main_loop: waiting on I/O");
                reactor_lock.react(None).ok();
                last_tick = Reactor::get().ticker();
                sleeps = 0;
            }
        } else {
            last_tick = tick;
        }

        if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 {
            // Exponential backoff from 50us to 10ms.
            let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
                .get(sleeps as usize)
                .unwrap_or(&10_000);

            log::trace!("main_loop: sleeping for {} us", delay_us);
            if parker.park_timeout(Duration::from_micros(*delay_us)) {
                log::trace!("main_loop: notified");

                // If notified before timeout, reset the last tick and the sleep counter.
                last_tick = Reactor::get().ticker();
                sleeps = 0;
            } else {
                sleeps += 1;
            }
        }
    }
}

/// Blocks the current thread on a future, processing I/O events when idle.
///
/// # Examples
///
/// ```
/// use async_io::Timer;
/// use std::time::Duration;
///
/// async_io::block_on(async {
///     // This timer will likely be processed by the current
///     // thread rather than the fallback "async-io" thread.
///     Timer::after(Duration::from_millis(1)).await;
/// });
/// ```
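///
/// The future's output is returned once it completes, so `block_on()` also works for futures
/// that never touch I/O:
///
/// ```
/// let sum = async_io::block_on(async { 1 + 2 });
/// assert_eq!(sum, 3);
/// ```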
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
    log::trace!("block_on()");

    // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
    BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);

    // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
    let _guard = CallOnDrop(|| {
        BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
        unparker().unpark();
    });

    // Parker and unparker for notifying the current thread.
    let (p, u) = parking::pair();
    // This boolean is set to `true` when the current thread is blocked on I/O.
    let io_blocked = Arc::new(AtomicBool::new(false));

    thread_local! {
        // Indicates that the current thread is polling I/O, but not necessarily blocked on it.
        static IO_POLLING: Cell<bool> = Cell::new(false);
    }

    // Prepare the waker.
    let waker = waker_fn({
        let io_blocked = io_blocked.clone();
        move || {
            if u.unpark() {
                // Check if waking from another thread and if currently blocked on I/O.
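                //
                // (`IO_POLLING` is thread-local, so it is only `true` while the waking thread is
                // itself processing I/O events; such a call observes the unpark after `react()`
                // returns. Only a wakeup from another thread, while this `block_on()` is blocked
                // inside `react(None)`, needs to interrupt the reactor with a notification.)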
                if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
                    Reactor::get().notify();
                }
            }
        }
    });
    let cx = &mut Context::from_waker(&waker);
    pin!(future);

    loop {
        // Poll the future.
        if let Poll::Ready(t) = future.as_mut().poll(cx) {
            log::trace!("block_on: completed");
            return t;
        }

        // Check if a notification was received.
        if p.park_timeout(Duration::from_secs(0)) {
            log::trace!("block_on: notified");

            // Try grabbing a lock on the reactor to process I/O events.
            if let Some(mut reactor_lock) = Reactor::get().try_lock() {
                // First let wakers know this parker is processing I/O events.
                IO_POLLING.with(|io| io.set(true));
                let _guard = CallOnDrop(|| {
                    IO_POLLING.with(|io| io.set(false));
                });

                // Process available I/O events.
                reactor_lock.react(Some(Duration::from_secs(0))).ok();
            }
            continue;
        }

        // Try grabbing a lock on the reactor to wait on I/O.
        if let Some(mut reactor_lock) = Reactor::get().try_lock() {
            // Record the instant at which the lock was grabbed.
            let start = Instant::now();

            loop {
                // First let wakers know this parker is blocked on I/O.
                IO_POLLING.with(|io| io.set(true));
                io_blocked.store(true, Ordering::SeqCst);
                let _guard = CallOnDrop(|| {
                    IO_POLLING.with(|io| io.set(false));
                    io_blocked.store(false, Ordering::SeqCst);
                });

                // Check if a notification has been received before `io_blocked` was updated
                // because in that case the reactor won't receive a wakeup.
                if p.park_timeout(Duration::from_secs(0)) {
                    log::trace!("block_on: notified");
                    break;
                }

                // Wait for I/O events.
                log::trace!("block_on: waiting on I/O");
                reactor_lock.react(None).ok();

                // Check if a notification has been received.
                if p.park_timeout(Duration::from_secs(0)) {
                    log::trace!("block_on: notified");
                    break;
                }

                // Check if this thread has been handling I/O events for a long time.
                if start.elapsed() > Duration::from_micros(500) {
                    log::trace!("block_on: stops hogging the reactor");

                    // This thread is clearly processing I/O events for some other threads
                    // because it didn't get a notification yet. It's best to stop hogging the
                    // reactor and give other threads a chance to process I/O events for
                    // themselves.
                    drop(reactor_lock);

                    // Unpark the "async-io" thread in case no other thread is ready to start
                    // processing I/O events. This way we prevent a potential latency spike.
                    unparker().unpark();

                    // Wait for a notification.
                    p.park();
                    break;
                }
            }
        } else {
            // Wait for an actual notification.
            log::trace!("block_on: sleep until notification");
            p.park();
        }
    }
}

/// Runs a closure when dropped.
struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        (self.0)();
    }
}
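
// `CallOnDrop` serves as a small scope guard in `block_on()` above: the closure runs when the
// guard goes out of scope, including on early `return`s and `break`s, e.g.:
//
//     let cleaned_up = Cell::new(false);
//     {
//         let _guard = CallOnDrop(|| cleaned_up.set(true));
//         // ... work that may leave the scope early ...
//     }
//     assert!(cleaned_up.get());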