| 1 | use crate::io::interest::Interest; |
| 2 | use crate::io::ready::Ready; |
| 3 | use crate::loom::sync::atomic::AtomicUsize; |
| 4 | use crate::loom::sync::Mutex; |
| 5 | use crate::runtime::io::{Direction, ReadyEvent, Tick}; |
| 6 | use crate::util::bit; |
| 7 | use crate::util::linked_list::{self, LinkedList}; |
| 8 | use crate::util::WakeList; |
| 9 | |
| 10 | use std::cell::UnsafeCell; |
| 11 | use std::future::Future; |
| 12 | use std::marker::PhantomPinned; |
| 13 | use std::pin::Pin; |
| 14 | use std::ptr::NonNull; |
| 15 | use std::sync::atomic::Ordering::{AcqRel, Acquire}; |
| 16 | use std::task::{Context, Poll, Waker}; |
| 17 | |
| 18 | /// Stored in the I/O driver resource slab. |
| 19 | #[derive (Debug)] |
| 20 | // # This struct should be cache padded to avoid false sharing. The cache padding rules are copied |
| 21 | // from crossbeam-utils/src/cache_padded.rs |
| 22 | // |
| 23 | // Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache |
| 24 | // lines at a time, so we have to align to 128 bytes rather than 64. |
| 25 | // |
| 26 | // Sources: |
| 27 | // - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf |
| 28 | // - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 |
| 29 | // |
| 30 | // ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. |
| 31 | // |
| 32 | // Sources: |
| 33 | // - https://www.mono-project.com/news/2016/09/12/arm64-icache/ |
| 34 | // |
| 35 | // powerpc64 has 128-byte cache line size. |
| 36 | // |
| 37 | // Sources: |
| 38 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 |
| 39 | #[cfg_attr ( |
| 40 | any( |
| 41 | target_arch = "x86_64" , |
| 42 | target_arch = "aarch64" , |
| 43 | target_arch = "powerpc64" , |
| 44 | ), |
| 45 | repr(align(128)) |
| 46 | )] |
| 47 | // arm, mips, mips64, sparc, and hexagon have 32-byte cache line size. |
| 48 | // |
| 49 | // Sources: |
| 50 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 |
| 51 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 |
| 52 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 |
| 53 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 |
| 54 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17 |
| 55 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12 |
| 56 | #[cfg_attr ( |
| 57 | any( |
| 58 | target_arch = "arm" , |
| 59 | target_arch = "mips" , |
| 60 | target_arch = "mips64" , |
| 61 | target_arch = "sparc" , |
| 62 | target_arch = "hexagon" , |
| 63 | ), |
| 64 | repr(align(32)) |
| 65 | )] |
| 66 | // m68k has 16-byte cache line size. |
| 67 | // |
| 68 | // Sources: |
| 69 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9 |
| 70 | #[cfg_attr (target_arch = "m68k" , repr(align(16)))] |
| 71 | // s390x has 256-byte cache line size. |
| 72 | // |
| 73 | // Sources: |
| 74 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 |
| 75 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13 |
| 76 | #[cfg_attr (target_arch = "s390x" , repr(align(256)))] |
| 77 | // x86, riscv, wasm, and sparc64 have 64-byte cache line size. |
| 78 | // |
| 79 | // Sources: |
| 80 | // - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 |
| 81 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 |
| 82 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19 |
| 83 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10 |
| 84 | // |
| 85 | // All others are assumed to have 64-byte cache line size. |
| 86 | #[cfg_attr ( |
| 87 | not(any( |
| 88 | target_arch = "x86_64" , |
| 89 | target_arch = "aarch64" , |
| 90 | target_arch = "powerpc64" , |
| 91 | target_arch = "arm" , |
| 92 | target_arch = "mips" , |
| 93 | target_arch = "mips64" , |
| 94 | target_arch = "sparc" , |
| 95 | target_arch = "hexagon" , |
| 96 | target_arch = "m68k" , |
| 97 | target_arch = "s390x" , |
| 98 | )), |
| 99 | repr(align(64)) |
| 100 | )] |
| 101 | pub(crate) struct ScheduledIo { |
| 102 | pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>, |
| 103 | |
| 104 | /// Packs the resource's readiness and I/O driver latest tick. |
| 105 | readiness: AtomicUsize, |
| 106 | |
| 107 | waiters: Mutex<Waiters>, |
| 108 | } |
| 109 | |
| 110 | type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; |
| 111 | |
| 112 | #[derive (Debug, Default)] |
| 113 | struct Waiters { |
| 114 | /// List of all current waiters. |
| 115 | list: WaitList, |
| 116 | |
| 117 | /// Waker used for `AsyncRead`. |
| 118 | reader: Option<Waker>, |
| 119 | |
| 120 | /// Waker used for `AsyncWrite`. |
| 121 | writer: Option<Waker>, |
| 122 | } |
| 123 | |
| 124 | #[derive (Debug)] |
| 125 | struct Waiter { |
| 126 | pointers: linked_list::Pointers<Waiter>, |
| 127 | |
| 128 | /// The waker for this task. |
| 129 | waker: Option<Waker>, |
| 130 | |
| 131 | /// The interest this waiter is waiting on. |
| 132 | interest: Interest, |
| 133 | |
| 134 | is_ready: bool, |
| 135 | |
| 136 | /// Should never be `!Unpin`. |
| 137 | _p: PhantomPinned, |
| 138 | } |
| 139 | |
| 140 | generate_addr_of_methods! { |
| 141 | impl<> Waiter { |
| 142 | unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> { |
| 143 | &self.pointers |
| 144 | } |
| 145 | } |
| 146 | } |
| 147 | |
| 148 | /// Future returned by `readiness()`. |
| 149 | struct Readiness<'a> { |
| 150 | scheduled_io: &'a ScheduledIo, |
| 151 | |
| 152 | state: State, |
| 153 | |
| 154 | /// Entry in the waiter `LinkedList`. |
| 155 | waiter: UnsafeCell<Waiter>, |
| 156 | } |
| 157 | |
/// Life cycle of a `Readiness` future.
#[derive(Debug)]
enum State {
    /// Not polled yet, or polled without having been linked into the waiter
    /// list.
    Init,
    /// The waiter is linked into the waiter list and awaits a wake-up.
    Waiting,
    /// The future has observed readiness (or shutdown) and is no longer in the
    /// waiter list.
    Done,
}
| 163 | |
| 164 | // The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. |
| 165 | // |
| 166 | // | shutdown | driver tick | readiness | |
| 167 | // |----------+-------------+-----------| |
| 168 | // | 1 bit | 15 bits + 16 bits | |
| 169 | |
| 170 | const READINESS: bit::Pack = bit::Pack::least_significant(width:16); |
| 171 | |
| 172 | const TICK: bit::Pack = READINESS.then(width:15); |
| 173 | |
| 174 | const SHUTDOWN: bit::Pack = TICK.then(width:1); |
| 175 | |
| 176 | // ===== impl ScheduledIo ===== |
| 177 | |
| 178 | impl Default for ScheduledIo { |
| 179 | fn default() -> ScheduledIo { |
| 180 | ScheduledIo { |
| 181 | linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()), |
| 182 | readiness: AtomicUsize::new(val:0), |
| 183 | waiters: Mutex::new(Waiters::default()), |
| 184 | } |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | impl ScheduledIo { |
| 189 | pub(crate) fn token(&self) -> mio::Token { |
| 190 | mio::Token(super::EXPOSE_IO.expose_provenance(self)) |
| 191 | } |
| 192 | |
| 193 | /// Invoked when the IO driver is shut down; forces this `ScheduledIo` into a |
| 194 | /// permanently shutdown state. |
| 195 | pub(super) fn shutdown(&self) { |
| 196 | let mask = SHUTDOWN.pack(1, 0); |
| 197 | self.readiness.fetch_or(mask, AcqRel); |
| 198 | self.wake(Ready::ALL); |
| 199 | } |
| 200 | |
| 201 | /// Sets the readiness on this `ScheduledIo` by invoking the given closure on |
| 202 | /// the current value, returning the previous readiness value. |
| 203 | /// |
| 204 | /// # Arguments |
| 205 | /// - `tick`: whether setting the tick or trying to clear readiness for a |
| 206 | /// specific tick. |
| 207 | /// - `f`: a closure returning a new readiness value given the previous |
| 208 | /// readiness. |
| 209 | pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) { |
| 210 | let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| { |
| 211 | // If the io driver is shut down, then you are only allowed to clear readiness. |
| 212 | debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_))); |
| 213 | |
| 214 | const MAX_TICK: usize = TICK.max_value() + 1; |
| 215 | let tick = TICK.unpack(curr); |
| 216 | |
| 217 | let new_tick = match tick_op { |
| 218 | // Trying to clear readiness with an old event! |
| 219 | Tick::Clear(t) if tick as u8 != t => return None, |
| 220 | Tick::Clear(t) => t as usize, |
| 221 | Tick::Set => tick.wrapping_add(1) % MAX_TICK, |
| 222 | }; |
| 223 | let ready = Ready::from_usize(READINESS.unpack(curr)); |
| 224 | Some(TICK.pack(new_tick, f(ready).as_usize())) |
| 225 | }); |
| 226 | } |
| 227 | |
| 228 | /// Notifies all pending waiters that have registered interest in `ready`. |
| 229 | /// |
| 230 | /// There may be many waiters to notify. Waking the pending task **must** be |
| 231 | /// done from outside of the lock otherwise there is a potential for a |
| 232 | /// deadlock. |
| 233 | /// |
| 234 | /// A stack array of wakers is created and filled with wakers to notify, the |
| 235 | /// lock is released, and the wakers are notified. Because there may be more |
| 236 | /// than 32 wakers to notify, if the stack array fills up, the lock is |
| 237 | /// released, the array is cleared, and the iteration continues. |
| 238 | pub(super) fn wake(&self, ready: Ready) { |
| 239 | let mut wakers = WakeList::new(); |
| 240 | |
| 241 | let mut waiters = self.waiters.lock(); |
| 242 | |
| 243 | // check for AsyncRead slot |
| 244 | if ready.is_readable() { |
| 245 | if let Some(waker) = waiters.reader.take() { |
| 246 | wakers.push(waker); |
| 247 | } |
| 248 | } |
| 249 | |
| 250 | // check for AsyncWrite slot |
| 251 | if ready.is_writable() { |
| 252 | if let Some(waker) = waiters.writer.take() { |
| 253 | wakers.push(waker); |
| 254 | } |
| 255 | } |
| 256 | |
| 257 | 'outer: loop { |
| 258 | let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); |
| 259 | |
| 260 | while wakers.can_push() { |
| 261 | match iter.next() { |
| 262 | Some(waiter) => { |
| 263 | let waiter = unsafe { &mut *waiter.as_ptr() }; |
| 264 | |
| 265 | if let Some(waker) = waiter.waker.take() { |
| 266 | waiter.is_ready = true; |
| 267 | wakers.push(waker); |
| 268 | } |
| 269 | } |
| 270 | None => { |
| 271 | break 'outer; |
| 272 | } |
| 273 | } |
| 274 | } |
| 275 | |
| 276 | drop(waiters); |
| 277 | |
| 278 | wakers.wake_all(); |
| 279 | |
| 280 | // Acquire the lock again. |
| 281 | waiters = self.waiters.lock(); |
| 282 | } |
| 283 | |
| 284 | // Release the lock before notifying |
| 285 | drop(waiters); |
| 286 | |
| 287 | wakers.wake_all(); |
| 288 | } |
| 289 | |
| 290 | pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { |
| 291 | let curr = self.readiness.load(Acquire); |
| 292 | |
| 293 | ReadyEvent { |
| 294 | tick: TICK.unpack(curr) as u8, |
| 295 | ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), |
| 296 | is_shutdown: SHUTDOWN.unpack(curr) != 0, |
| 297 | } |
| 298 | } |
| 299 | |
| 300 | /// Polls for readiness events in a given direction. |
| 301 | /// |
| 302 | /// These are to support `AsyncRead` and `AsyncWrite` polling methods, |
| 303 | /// which cannot use the `async fn` version. This uses reserved reader |
| 304 | /// and writer slots. |
| 305 | pub(super) fn poll_readiness( |
| 306 | &self, |
| 307 | cx: &mut Context<'_>, |
| 308 | direction: Direction, |
| 309 | ) -> Poll<ReadyEvent> { |
| 310 | let curr = self.readiness.load(Acquire); |
| 311 | |
| 312 | let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); |
| 313 | let is_shutdown = SHUTDOWN.unpack(curr) != 0; |
| 314 | |
| 315 | if ready.is_empty() && !is_shutdown { |
| 316 | // Update the task info |
| 317 | let mut waiters = self.waiters.lock(); |
| 318 | let waker = match direction { |
| 319 | Direction::Read => &mut waiters.reader, |
| 320 | Direction::Write => &mut waiters.writer, |
| 321 | }; |
| 322 | |
| 323 | // Avoid cloning the waker if one is already stored that matches the |
| 324 | // current task. |
| 325 | match waker { |
| 326 | Some(waker) => waker.clone_from(cx.waker()), |
| 327 | None => *waker = Some(cx.waker().clone()), |
| 328 | } |
| 329 | |
| 330 | // Try again, in case the readiness was changed while we were |
| 331 | // taking the waiters lock |
| 332 | let curr = self.readiness.load(Acquire); |
| 333 | let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); |
| 334 | let is_shutdown = SHUTDOWN.unpack(curr) != 0; |
| 335 | if is_shutdown { |
| 336 | Poll::Ready(ReadyEvent { |
| 337 | tick: TICK.unpack(curr) as u8, |
| 338 | ready: direction.mask(), |
| 339 | is_shutdown, |
| 340 | }) |
| 341 | } else if ready.is_empty() { |
| 342 | Poll::Pending |
| 343 | } else { |
| 344 | Poll::Ready(ReadyEvent { |
| 345 | tick: TICK.unpack(curr) as u8, |
| 346 | ready, |
| 347 | is_shutdown, |
| 348 | }) |
| 349 | } |
| 350 | } else { |
| 351 | Poll::Ready(ReadyEvent { |
| 352 | tick: TICK.unpack(curr) as u8, |
| 353 | ready, |
| 354 | is_shutdown, |
| 355 | }) |
| 356 | } |
| 357 | } |
| 358 | |
| 359 | pub(crate) fn clear_readiness(&self, event: ReadyEvent) { |
| 360 | // This consumes the current readiness state **except** for closed |
| 361 | // states. Closed states are excluded because they are final states. |
| 362 | let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED; |
| 363 | self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed); |
| 364 | } |
| 365 | |
| 366 | pub(crate) fn clear_wakers(&self) { |
| 367 | let mut waiters = self.waiters.lock(); |
| 368 | waiters.reader.take(); |
| 369 | waiters.writer.take(); |
| 370 | } |
| 371 | } |
| 372 | |
| 373 | impl Drop for ScheduledIo { |
| 374 | fn drop(&mut self) { |
| 375 | self.wake(ready:Ready::ALL); |
| 376 | } |
| 377 | } |
| 378 | |
| 379 | unsafe impl Send for ScheduledIo {} |
| 380 | unsafe impl Sync for ScheduledIo {} |
| 381 | |
| 382 | impl ScheduledIo { |
| 383 | /// An async version of `poll_readiness` which uses a linked list of wakers. |
| 384 | pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { |
| 385 | self.readiness_fut(interest).await |
| 386 | } |
| 387 | |
| 388 | // This is in a separate function so that the borrow checker doesn't think |
| 389 | // we are borrowing the `UnsafeCell` possibly over await boundaries. |
| 390 | // |
| 391 | // Go figure. |
| 392 | fn readiness_fut(&self, interest: Interest) -> Readiness<'_> { |
| 393 | Readiness { |
| 394 | scheduled_io: self, |
| 395 | state: State::Init, |
| 396 | waiter: UnsafeCell::new(Waiter { |
| 397 | pointers: linked_list::Pointers::new(), |
| 398 | waker: None, |
| 399 | is_ready: false, |
| 400 | interest, |
| 401 | _p: PhantomPinned, |
| 402 | }), |
| 403 | } |
| 404 | } |
| 405 | } |
| 406 | |
| 407 | unsafe impl linked_list::Link for Waiter { |
| 408 | type Handle = NonNull<Waiter>; |
| 409 | type Target = Waiter; |
| 410 | |
| 411 | fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { |
| 412 | *handle |
| 413 | } |
| 414 | |
| 415 | unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { |
| 416 | ptr |
| 417 | } |
| 418 | |
| 419 | unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { |
| 420 | Waiter::addr_of_pointers(me:target) |
| 421 | } |
| 422 | } |
| 423 | |
| 424 | // ===== impl Readiness ===== |
| 425 | |
| 426 | impl Future for Readiness<'_> { |
| 427 | type Output = ReadyEvent; |
| 428 | |
| 429 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
| 430 | use std::sync::atomic::Ordering::SeqCst; |
| 431 | |
| 432 | let (scheduled_io, state, waiter) = unsafe { |
| 433 | let me = self.get_unchecked_mut(); |
| 434 | (&me.scheduled_io, &mut me.state, &me.waiter) |
| 435 | }; |
| 436 | |
| 437 | loop { |
| 438 | match *state { |
| 439 | State::Init => { |
| 440 | // Optimistically check existing readiness |
| 441 | let curr = scheduled_io.readiness.load(SeqCst); |
| 442 | let is_shutdown = SHUTDOWN.unpack(curr) != 0; |
| 443 | |
| 444 | // Safety: `waiter.interest` never changes |
| 445 | let interest = unsafe { (*waiter.get()).interest }; |
| 446 | let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest); |
| 447 | |
| 448 | if !ready.is_empty() || is_shutdown { |
| 449 | // Currently ready! |
| 450 | let tick = TICK.unpack(curr) as u8; |
| 451 | *state = State::Done; |
| 452 | return Poll::Ready(ReadyEvent { |
| 453 | tick, |
| 454 | ready, |
| 455 | is_shutdown, |
| 456 | }); |
| 457 | } |
| 458 | |
| 459 | // Wasn't ready, take the lock (and check again while locked). |
| 460 | let mut waiters = scheduled_io.waiters.lock(); |
| 461 | |
| 462 | let curr = scheduled_io.readiness.load(SeqCst); |
| 463 | let mut ready = Ready::from_usize(READINESS.unpack(curr)); |
| 464 | let is_shutdown = SHUTDOWN.unpack(curr) != 0; |
| 465 | |
| 466 | if is_shutdown { |
| 467 | ready = Ready::ALL; |
| 468 | } |
| 469 | |
| 470 | let ready = ready.intersection(interest); |
| 471 | |
| 472 | if !ready.is_empty() || is_shutdown { |
| 473 | // Currently ready! |
| 474 | let tick = TICK.unpack(curr) as u8; |
| 475 | *state = State::Done; |
| 476 | return Poll::Ready(ReadyEvent { |
| 477 | tick, |
| 478 | ready, |
| 479 | is_shutdown, |
| 480 | }); |
| 481 | } |
| 482 | |
| 483 | // Not ready even after locked, insert into list... |
| 484 | |
| 485 | // Safety: called while locked |
| 486 | unsafe { |
| 487 | (*waiter.get()).waker = Some(cx.waker().clone()); |
| 488 | } |
| 489 | |
| 490 | // Insert the waiter into the linked list |
| 491 | // |
| 492 | // safety: pointers from `UnsafeCell` are never null. |
| 493 | waiters |
| 494 | .list |
| 495 | .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); |
| 496 | *state = State::Waiting; |
| 497 | } |
| 498 | State::Waiting => { |
| 499 | // Currently in the "Waiting" state, implying the caller has |
| 500 | // a waiter stored in the waiter list (guarded by |
| 501 | // `notify.waiters`). In order to access the waker fields, |
| 502 | // we must hold the lock. |
| 503 | |
| 504 | let waiters = scheduled_io.waiters.lock(); |
| 505 | |
| 506 | // Safety: called while locked |
| 507 | let w = unsafe { &mut *waiter.get() }; |
| 508 | |
| 509 | if w.is_ready { |
| 510 | // Our waker has been notified. |
| 511 | *state = State::Done; |
| 512 | } else { |
| 513 | // Update the waker, if necessary. |
| 514 | w.waker.as_mut().unwrap().clone_from(cx.waker()); |
| 515 | return Poll::Pending; |
| 516 | } |
| 517 | |
| 518 | // Explicit drop of the lock to indicate the scope that the |
| 519 | // lock is held. Because holding the lock is required to |
| 520 | // ensure safe access to fields not held within the lock, it |
| 521 | // is helpful to visualize the scope of the critical |
| 522 | // section. |
| 523 | drop(waiters); |
| 524 | } |
| 525 | State::Done => { |
| 526 | // Safety: State::Done means it is no longer shared |
| 527 | let w = unsafe { &mut *waiter.get() }; |
| 528 | |
| 529 | let curr = scheduled_io.readiness.load(Acquire); |
| 530 | let is_shutdown = SHUTDOWN.unpack(curr) != 0; |
| 531 | |
| 532 | // The returned tick might be newer than the event |
| 533 | // which notified our waker. This is ok because the future |
| 534 | // still didn't return `Poll::Ready`. |
| 535 | let tick = TICK.unpack(curr) as u8; |
| 536 | |
| 537 | // The readiness state could have been cleared in the meantime, |
| 538 | // but we allow the returned ready set to be empty. |
| 539 | let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest); |
| 540 | |
| 541 | return Poll::Ready(ReadyEvent { |
| 542 | tick, |
| 543 | ready, |
| 544 | is_shutdown, |
| 545 | }); |
| 546 | } |
| 547 | } |
| 548 | } |
| 549 | } |
| 550 | } |
| 551 | |
| 552 | impl Drop for Readiness<'_> { |
| 553 | fn drop(&mut self) { |
| 554 | let mut waiters: MutexGuard<'_, Waiters> = self.scheduled_io.waiters.lock(); |
| 555 | |
| 556 | // Safety: `waiter` is only ever stored in `waiters` |
| 557 | unsafe { |
| 558 | waiters |
| 559 | .list |
| 560 | .remove(node:NonNull::new_unchecked(self.waiter.get())) |
| 561 | }; |
| 562 | } |
| 563 | } |
| 564 | |
| 565 | unsafe impl Send for Readiness<'_> {} |
| 566 | unsafe impl Sync for Readiness<'_> {} |
| 567 | |