use std::cell::{Cell, RefCell};
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Waker;
use std::task::{Context, Poll};
use std::thread;
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use futures_lite::pin;
use parking::Parker;

use crate::reactor::Reactor;

/// Number of currently active `block_on()` invocations.
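///
/// While this is nonzero, the "async-io" thread parks with a timeout between
/// reactor polls instead of blocking on the reactor indefinitely, giving
/// `block_on()` callers a chance to process I/O themselves.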
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Unparker for the "async-io" thread.
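///
/// The thread is lazily spawned on first use; later calls return the cached
/// `Unparker`. Unparking it nudges the helper loop out of its backoff sleep.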
fn unparker() -> &'static parking::Unparker {
    static UNPARKER: OnceCell<parking::Unparker> = OnceCell::new();

    UNPARKER.get_or_init_blocking(|| {
        let (parker, unparker) = parking::pair();

        // Spawn a helper thread driving the reactor.
        //
        // Note that this thread is not strictly necessary; it's only here to help push
        // things forward if there are no `Parker`s around or if `Parker`s are just idling
        // and never parking.
        thread::Builder::new()
            .name("async-io".to_string())
            .spawn(move || main_loop(parker))
            .expect("cannot spawn async-io thread");

        unparker
    })
}

/// Initializes the "async-io" thread.
pub(crate) fn init() {
    let _ = unparker();
}

/// The main loop for the "async-io" thread.
fn main_loop(parker: parking::Parker) {
    let span = tracing::trace_span!("async_io::main_loop");
    let _enter = span.enter();

    // The last observed reactor tick.
    let mut last_tick = 0;
    // Number of sleeps since this thread last called `react()`.
    let mut sleeps = 0u64;
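    // `sleeps` both indexes the backoff table below and, once it reaches 10,
    // switches the loop to blocking on the reactor lock instead of spinning.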

    loop {
        let tick = Reactor::get().ticker();

        if last_tick == tick {
            let reactor_lock = if sleeps >= 10 {
                // If no new ticks have occurred for a while, stop sleeping and spinning in
                // this loop and just block on the reactor lock.
                Some(Reactor::get().lock())
            } else {
                Reactor::get().try_lock()
            };

            if let Some(mut reactor_lock) = reactor_lock {
                tracing::trace!("waiting on I/O");
                reactor_lock.react(None).ok();
                last_tick = Reactor::get().ticker();
                sleeps = 0;
            }
        } else {
            last_tick = tick;
        }

        if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 {
            // Exponential backoff from 50us to 10ms.
            let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
                .get(sleeps as usize)
                .unwrap_or(&10_000);

            tracing::trace!("sleeping for {} us", delay_us);
            if parker.park_timeout(Duration::from_micros(*delay_us)) {
                tracing::trace!("notified");

                // If notified before the timeout, reset the last tick and the sleep counter.
                last_tick = Reactor::get().ticker();
                sleeps = 0;
            } else {
                sleeps += 1;
            }
        }
    }
}

/// Blocks the current thread on a future, processing I/O events when idle.
///
/// # Examples
///
/// ```
/// use async_io::Timer;
/// use std::time::Duration;
///
/// async_io::block_on(async {
///     // This timer will likely be processed by the current
///     // thread rather than the fallback "async-io" thread.
///     Timer::after(Duration::from_millis(1)).await;
/// });
/// ```
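///
/// The output of the future is returned to the caller:
///
/// ```
/// let sum = async_io::block_on(async { 1 + 2 });
/// assert_eq!(sum, 3);
/// ```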
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
    let span = tracing::trace_span!("async_io::block_on");
    let _enter = span.enter();

    // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
    BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);

    // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
    let _guard = CallOnDrop(|| {
        BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
        unparker().unpark();
    });

    // Creates a parker and an associated waker that unparks it.
    fn parker_and_waker() -> (Parker, Waker, Arc<AtomicBool>) {
        // Parker and unparker for notifying the current thread.
        let (p, u) = parking::pair();

        // This boolean is set to `true` when the current thread is blocked on I/O.
        let io_blocked = Arc::new(AtomicBool::new(false));

        // Prepare the waker.
        let waker = BlockOnWaker::create(io_blocked.clone(), u);

        (p, waker, io_blocked)
    }
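
    // For reference: `parking::pair()` returns linked `(Parker, Unparker)` halves.
    // `Unparker::unpark()` posts a notification token and `Parker::park_timeout()`
    // consumes it, each returning a `bool` that indicates whether a notification
    // was actually delivered or received (see their uses below).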

    thread_local! {
        // Cached parker and waker for efficiency.
        static CACHE: RefCell<(Parker, Waker, Arc<AtomicBool>)> = RefCell::new(parker_and_waker());

        // Indicates that the current thread is polling I/O, but not necessarily blocked on it.
        static IO_POLLING: Cell<bool> = const { Cell::new(false) };
    }

    struct BlockOnWaker {
        io_blocked: Arc<AtomicBool>,
        unparker: parking::Unparker,
    }

    impl BlockOnWaker {
        fn create(io_blocked: Arc<AtomicBool>, unparker: parking::Unparker) -> Waker {
            Waker::from(Arc::new(BlockOnWaker {
                io_blocked,
                unparker,
            }))
        }
    }

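    // Implementing `std::task::Wake` lets `Waker::from(Arc<BlockOnWaker>)` build
    // the waker vtable automatically, avoiding manual `RawWaker` construction.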
    impl std::task::Wake for BlockOnWaker {
        fn wake_by_ref(self: &Arc<Self>) {
            if self.unparker.unpark() {
                // Check if waking from another thread and if currently blocked on I/O.
                if !IO_POLLING.with(Cell::get) && self.io_blocked.load(Ordering::SeqCst) {
                    Reactor::get().notify();
                }
            }
        }

        fn wake(self: Arc<Self>) {
            self.wake_by_ref()
        }
    }

    CACHE.with(|cache| {
        // Try grabbing the cached parker and waker.
        let tmp_cached;
        let tmp_fresh;
        let (p, waker, io_blocked) = match cache.try_borrow_mut() {
            Ok(cache) => {
                // Use the cached parker and waker.
                tmp_cached = cache;
                &*tmp_cached
            }
            Err(_) => {
                // Looks like this is a recursive `block_on()` call.
                // Create a fresh parker and waker.
                tmp_fresh = parker_and_waker();
                &tmp_fresh
            }
        };

        pin!(future);

        let cx = &mut Context::from_waker(waker);

        loop {
            // Poll the future.
            if let Poll::Ready(t) = future.as_mut().poll(cx) {
                // Ensure the cached parker is reset to the unnotified state for future
                // `block_on()` calls, in case this future called `wake()` and then
                // immediately returned `Poll::Ready`.
                p.park_timeout(Duration::from_secs(0));
                tracing::trace!("completed");
                return t;
            }

            // Check if a notification was received.
            if p.park_timeout(Duration::from_secs(0)) {
                tracing::trace!("notified");

                // Try grabbing a lock on the reactor to process I/O events.
                if let Some(mut reactor_lock) = Reactor::get().try_lock() {
                    // First let wakers know this parker is processing I/O events.
                    IO_POLLING.with(|io| io.set(true));
                    let _guard = CallOnDrop(|| {
                        IO_POLLING.with(|io| io.set(false));
                    });

                    // Process available I/O events.
                    reactor_lock.react(Some(Duration::from_secs(0))).ok();
                }
                continue;
            }

            // Try grabbing a lock on the reactor to wait on I/O.
            if let Some(mut reactor_lock) = Reactor::get().try_lock() {
                // Record the instant at which the lock was grabbed.
                let start = Instant::now();

                loop {
                    // First let wakers know this parker is blocked on I/O.
                    IO_POLLING.with(|io| io.set(true));
                    io_blocked.store(true, Ordering::SeqCst);
                    let _guard = CallOnDrop(|| {
                        IO_POLLING.with(|io| io.set(false));
                        io_blocked.store(false, Ordering::SeqCst);
                    });

                    // Check if a notification was received before `io_blocked` was updated,
                    // because in that case the reactor won't receive a wakeup.
                    if p.park_timeout(Duration::from_secs(0)) {
                        tracing::trace!("notified");
                        break;
                    }

                    // Wait for I/O events.
                    tracing::trace!("waiting on I/O");
                    reactor_lock.react(None).ok();

                    // Check if a notification has been received.
                    if p.park_timeout(Duration::from_secs(0)) {
                        tracing::trace!("notified");
                        break;
                    }

                    // Check if this thread has been handling I/O events for a long time.
                    if start.elapsed() > Duration::from_micros(500) {
                        tracing::trace!("stops hogging the reactor");

                        // This thread is clearly processing I/O events for some other threads
                        // because it didn't get a notification yet. It's best to stop hogging
                        // the reactor and give other threads a chance to process I/O events
                        // for themselves.
                        drop(reactor_lock);

                        // Unpark the "async-io" thread in case no other thread is ready to
                        // start processing I/O events. This way we prevent a potential
                        // latency spike.
                        unparker().unpark();

                        // Wait for a notification.
                        p.park();
                        break;
                    }
                }
            } else {
                // Wait for an actual notification.
                tracing::trace!("sleep until notification");
                p.park();
            }
        }
    })
}

/// Runs a closure when dropped.
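///
/// A minimal usage sketch; the closure runs on every exit path, including
/// early returns and unwinding:
///
/// ```
/// # struct CallOnDrop<F: Fn()>(F);
/// # impl<F: Fn()> Drop for CallOnDrop<F> { fn drop(&mut self) { (self.0)(); } }
/// let _guard = CallOnDrop(|| println!("cleanup"));
/// // ... do work; the closure runs when `_guard` goes out of scope.
/// ```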
struct CallOnDrop<F: Fn()>(F);

impl<F: Fn()> Drop for CallOnDrop<F> {
    fn drop(&mut self) {
        (self.0)();
    }
}