use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::task::{Context, Poll, Waker};

/// Stored in the I/O driver resource slab.
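///
/// Each registered I/O resource gets one `ScheduledIo`. It packs the readiness
/// reported by the driver into a single atomic word and stores the wakers of
/// tasks waiting on that readiness.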
#[derive(Debug)]
// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct ScheduledIo {
    pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,

    /// Packs the resource's readiness and the I/O driver's latest tick.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}

type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;

#[derive(Debug, Default)]
struct Waiters {
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for `AsyncRead`.
    reader: Option<Waker>,

    /// Waker used for `AsyncWrite`.
    writer: Option<Waker>,
}

#[derive(Debug)]
struct Waiter {
    pointers: linked_list::Pointers<Waiter>,

    /// The waker for this task.
    waker: Option<Waker>,

    /// The interest this waiter is waiting on.
    interest: Interest,

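    /// Set to `true` by `ScheduledIo::wake` when this waiter is notified; the
    /// `Readiness` future uses it to detect that it can move to `State::Done`.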
    is_ready: bool,

    /// Should never be `Unpin`.
    _p: PhantomPinned,
}

generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}

/// Future returned by `readiness()`.
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,

    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

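/// State of a `Readiness` future: `Init` before the waiter is linked into the
/// wait list, `Waiting` while it is linked, and `Done` once readiness (or
/// shutdown) has been observed.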
enum State {
    Init,
    Waiting,
    Done,
}

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |  1 bit   |   15 bits   |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(15);

const SHUTDOWN: bit::Pack = TICK.then(1);
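
// For example (a sketch using the helpers above), readiness `ready` and a
// driver tick `tick` round-trip through the packed word like this:
//
//     let word = TICK.pack(tick, READINESS.pack(ready.as_usize(), 0));
//     assert_eq!(TICK.unpack(word), tick);
//     assert_eq!(Ready::from_usize(READINESS.unpack(word)), ready);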

// ===== impl ScheduledIo =====

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()),
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Waiters::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn token(&self) -> mio::Token {
        // use `expose_addr` when stable
        mio::Token(self as *const _ as usize)
    }

    /// Invoked when the I/O driver is shut down; forces this `ScheduledIo` into
    /// a permanently shutdown state.
    pub(super) fn shutdown(&self) {
        let mask = SHUTDOWN.pack(1, 0);
        self.readiness.fetch_or(mask, AcqRel);
        self.wake(Ready::ALL);
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure
    /// on the current value.
    ///
    /// # Arguments
    /// - `tick`: whether to set the tick or to clear readiness for a specific
    ///   tick.
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
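    ///
    /// The tick is what makes a stale `Tick::Clear` harmless: if the driver has
    /// published a newer event since the caller observed its `ReadyEvent`, the
    /// stored tick no longer matches and the clear is skipped, so the newer
    /// readiness is not lost.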
    pub(super) fn set_readiness(&self, tick: Tick, f: impl Fn(Ready) -> Ready) {
        let mut current = self.readiness.load(Acquire);

        // If the I/O driver is shut down, then you are only allowed to clear readiness.
        debug_assert!(SHUTDOWN.unpack(current) == 0 || matches!(tick, Tick::Clear(_)));

        loop {
            // Mask out the tick bits so that the modifying function doesn't see
            // them.
            let current_readiness = Ready::from_usize(current);
            let new = f(current_readiness);

            let new_tick = match tick {
                Tick::Set => {
                    let current = TICK.unpack(current);
                    current.wrapping_add(1) % (TICK.max_value() + 1)
                }
                Tick::Clear(t) => {
                    if TICK.unpack(current) as u8 != t {
                        // Trying to clear readiness with an old event!
                        return;
                    }

                    t as usize
                }
            };
            let next = TICK.pack(new_tick, new.as_usize());

            match self
                .readiness
                .compare_exchange(current, next, AcqRel, Acquire)
            {
                Ok(_) => return,
                // We lost the race, retry!
                Err(actual) => current = actual,
            }
        }
    }

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending tasks **must** be
    /// done from outside of the lock; otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of wakers is created and filled with wakers to notify, the
    /// lock is released, and the wakers are notified. Because there may be more
    /// than 32 wakers to notify, if the stack array fills up, the lock is
    /// released, the array is cleared, and the iteration continues.
    pub(super) fn wake(&self, ready: Ready) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }

    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
            is_shutdown: SHUTDOWN.unpack(curr) != 0,
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
    /// which cannot use the `async fn` version. This uses reserved reader
    /// and writer slots.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

        if ready.is_empty() && !is_shutdown {
            // Update the task info
            let mut waiters = self.waiters.lock();
            let slot = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches the
            // current task.
            match slot {
                Some(existing) => {
                    if !existing.will_wake(cx.waker()) {
                        *existing = cx.waker().clone();
                    }
                }
                None => {
                    *slot = Some(cx.waker().clone());
                }
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            let is_shutdown = SHUTDOWN.unpack(curr) != 0;
            if is_shutdown {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                    is_shutdown,
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                    is_shutdown,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
                is_shutdown,
            })
        }
    }

    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
        self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }

    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}

impl Drop for ScheduledIo {
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}

impl ScheduledIo {
    /// An async version of `poll_readiness` which uses a linked list of wakers.
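    ///
    /// A sketch of how callers drive this internally (the `try_read` helper and
    /// surrounding loop are illustrative, not actual code in this module):
    ///
    /// ```ignore
    /// loop {
    ///     let event = scheduled_io.readiness(Interest::READABLE).await;
    ///
    ///     match try_read() {
    ///         Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
    ///             // The readiness was stale; clear it and wait for the next event.
    ///             scheduled_io.clear_readiness(event);
    ///         }
    ///         res => return res,
    ///     }
    /// }
    /// ```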
    pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
        self.readiness_fut(interest).await
    }

    // This is in a separate function so that the borrow checker doesn't think
    // we are borrowing the `UnsafeCell` possibly over await boundaries.
    //
    // Go figure.
    fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
        Readiness {
            scheduled_io: self,
            state: State::Init,
            waiter: UnsafeCell::new(Waiter {
                pointers: linked_list::Pointers::new(),
                waker: None,
                is_ready: false,
                interest,
                _p: PhantomPinned,
            }),
        }
    }
}

unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}

// ===== impl Readiness =====

impl Future for Readiness<'_> {
    type Output = ReadyEvent;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        use std::sync::atomic::Ordering::SeqCst;

        let (scheduled_io, state, waiter) = unsafe {
            let me = self.get_unchecked_mut();
            (&me.scheduled_io, &mut me.state, &me.waiter)
        };

        loop {
            match *state {
                State::Init => {
                    // Optimistically check existing readiness
                    let curr = scheduled_io.readiness.load(SeqCst);
                    let ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // Safety: `waiter.interest` never changes
                    let interest = unsafe { (*waiter.get()).interest };
                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Wasn't ready, take the lock (and check again while locked).
                    let mut waiters = scheduled_io.waiters.lock();

                    let curr = scheduled_io.readiness.load(SeqCst);
                    let mut ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    if is_shutdown {
                        ready = Ready::ALL;
                    }

                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Not ready even after locked, insert into list...

                    // Safety: called while locked
                    unsafe {
                        (*waiter.get()).waker = Some(cx.waker().clone());
                    }

                    // Insert the waiter into the linked list
                    //
                    // Safety: pointers from `UnsafeCell` are never null.
                    waiters
                        .list
                        .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                    *state = State::Waiting;
                }
                State::Waiting => {
                    // Currently in the "Waiting" state, implying the caller has
                    // a waiter stored in the waiter list (guarded by
                    // `scheduled_io.waiters`). In order to access the waker
                    // fields, we must hold the lock.

                    let waiters = scheduled_io.waiters.lock();

                    // Safety: called while locked
                    let w = unsafe { &mut *waiter.get() };

                    if w.is_ready {
                        // Our waker has been notified.
                        *state = State::Done;
                    } else {
                        // Update the waker, if necessary.
                        if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
                            w.waker = Some(cx.waker().clone());
                        }

                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope over which
                    // the lock is held. Because holding the lock is required to
                    // ensure safe access to fields that are not stored inside the
                    // lock, it is helpful to visualize the scope of the critical
                    // section.
                    drop(waiters);
                }
                State::Done => {
                    // Safety: `State::Done` means the waiter is no longer shared.
                    let w = unsafe { &mut *waiter.get() };

                    let curr = scheduled_io.readiness.load(Acquire);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // The returned tick might be newer than the event
                    // which notified our waker. This is ok because the future
                    // has not yet returned `Poll::Ready`.
                    let tick = TICK.unpack(curr) as u8;

                    // The readiness state could have been cleared in the meantime,
                    // but we allow the returned ready set to be empty.
                    let curr_ready = Ready::from_usize(READINESS.unpack(curr));
                    let ready = curr_ready.intersection(w.interest);

                    return Poll::Ready(ReadyEvent {
                        tick,
                        ready,
                        is_shutdown,
                    });
                }
            }
        }
    }
}

impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        let mut waiters = self.scheduled_io.waiters.lock();

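        // The future may be dropped before it completes (for example when the
        // caller's read is cancelled). In that case the waiter must be unlinked
        // here so the wait list never holds a pointer into freed memory.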
        // Safety: `waiter` is only ever stored in `waiters`
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()))
        };
    }
}

unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}