use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
use std::task::{Context, Poll, Waker};

/// Stored in the I/O driver resource slab.
#[derive(Debug)]
// This struct should be cache padded to avoid false sharing. The cache padding rules are copied
// from crossbeam-utils/src/cache_padded.rs
//
// Starting from Intel's Sandy Bridge, the spatial prefetcher pulls pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, riscv, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct ScheduledIo {
    pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>,

    /// Packs the resource's readiness with the I/O driver's latest tick.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}
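
// A sanity check, illustrative only: on x86_64 the attributes above give
// `ScheduledIo` a 128-byte alignment, so neighboring slab entries can never
// share a prefetched cache-line pair.
//
//     #[cfg(target_arch = "x86_64")]
//     const _: () = assert!(std::mem::align_of::<ScheduledIo>() == 128);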

type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;

#[derive(Debug, Default)]
struct Waiters {
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for `AsyncRead`.
    reader: Option<Waker>,

    /// Waker used for `AsyncWrite`.
    writer: Option<Waker>,
}

#[derive(Debug)]
struct Waiter {
    pointers: linked_list::Pointers<Waiter>,

    /// The waker for this task.
    waker: Option<Waker>,

    /// The interest this waiter is waiting on.
    interest: Interest,

    /// Set to `true` by `wake` once the waiter has been notified.
    is_ready: bool,

    /// Should never be `Unpin`: the waiter is linked into an intrusive list
    /// and must not move while registered.
    _p: PhantomPinned,
}

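// Generates `Waiter::addr_of_pointers`, which projects a raw pointer to the
// intrusive-list pointers without creating a reference to the whole `Waiter`,
// since the node may be concurrently linked into the waiter list.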
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}

/// Future returned by `readiness()`.
struct Readiness<'a> {
    scheduled_io: &'a ScheduledIo,

    state: State,

    /// Entry in the waiter `LinkedList`.
    waiter: UnsafeCell<Waiter>,
}

enum State {
    /// The future has not yet registered a waiter.
    Init,
    /// A waiter is stored in the `ScheduledIo` waiter list.
    Waiting,
    /// The waiter has been notified, or readiness was observed directly.
    Done,
}

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |  1 bit   |   15 bits   |  16 bits  |

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(15);

const SHUTDOWN: bit::Pack = TICK.then(1);
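
// An illustrative round trip through the packing above, assuming
// `least_significant(n)` covers the low `n` bits and `then(m)` the next `m`:
//
//     let packed = TICK.pack(3, Ready::READABLE.as_usize());
//     assert_eq!(TICK.unpack(packed), 3);
//     assert_eq!(Ready::from_usize(packed), Ready::READABLE);
//     assert_eq!(SHUTDOWN.unpack(packed), 0);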

// ===== impl ScheduledIo =====

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()),
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Waiters::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn token(&self) -> mio::Token {
        // use `expose_addr` when stable
        mio::Token(self as *const _ as usize)
    }

    /// Invoked when the I/O driver is shut down; forces this `ScheduledIo`
    /// into a permanently shutdown state.
    pub(super) fn shutdown(&self) {
        let mask = SHUTDOWN.pack(1, 0);
        self.readiness.fetch_or(mask, AcqRel);
        self.wake(Ready::ALL);
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure
    /// on the current value and atomically storing the result.
    ///
    /// # Arguments
    /// - `tick`: whether to advance the driver tick (`Tick::Set`) or to clear
    ///   readiness only if the stored tick still matches (`Tick::Clear`).
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
    pub(super) fn set_readiness(&self, tick: Tick, f: impl Fn(Ready) -> Ready) {
        let mut current = self.readiness.load(Acquire);

        // If the I/O driver is shut down, then only clearing readiness is allowed.
        debug_assert!(SHUTDOWN.unpack(current) == 0 || matches!(tick, Tick::Clear(_)));

        loop {
            // Mask out the tick bits so that the modifying function doesn't
            // see them.
            let current_readiness = Ready::from_usize(current);
            let new = f(current_readiness);

            let new_tick = match tick {
                Tick::Set => {
                    let current = TICK.unpack(current);
                    current.wrapping_add(1) % (TICK.max_value() + 1)
                }
                Tick::Clear(t) => {
                    if TICK.unpack(current) as u8 != t {
                        // Trying to clear readiness with an old event!
                        return;
                    }

                    t as usize
                }
            };
            let next = TICK.pack(new_tick, new.as_usize());

            match self
                .readiness
                .compare_exchange(current, next, AcqRel, Acquire)
            {
                Ok(_) => return,
                // We lost the race, retry!
                Err(actual) => current = actual,
            }
        }
    }
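
    // A worked example of the tick guard above (illustrative): suppose a task
    // snapshots readiness while the stored tick is 3, and the driver then
    // observes a new event:
    //
    //     driver: set_readiness(Tick::Set, |r| r | Ready::READABLE)  // tick 3 -> 4
    //     task:   set_readiness(Tick::Clear(3), |r| r - Ready::READABLE)
    //
    // The `Tick::Clear(3)` call sees tick 4 and returns early, so the fresh
    // readiness from tick 4 is not lost.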

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending task **must**
    /// be done from outside of the lock; otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of wakers is created and filled with wakers to notify,
    /// the lock is released, and the wakers are notified. Because there may
    /// be more than 32 wakers to notify, if the stack array fills up, the
    /// lock is released, the array is cleared, and the iteration continues.
    pub(super) fn wake(&self, ready: Ready) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // Check for the `AsyncRead` slot.
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // Check for the `AsyncWrite` slot.
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying.
        drop(waiters);

        wakers.wake_all();
    }

    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
            is_shutdown: SHUTDOWN.unpack(curr) != 0,
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
    /// which cannot use the `async fn` version. This uses reserved reader
    /// and writer slots.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

        if ready.is_empty() && !is_shutdown {
            // Update the task info.
            let mut waiters = self.waiters.lock();
            let slot = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches
            // the current task.
            match slot {
                Some(existing) => {
                    if !existing.will_wake(cx.waker()) {
                        *existing = cx.waker().clone();
                    }
                }
                None => {
                    *slot = Some(cx.waker().clone());
                }
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock.
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            let is_shutdown = SHUTDOWN.unpack(curr) != 0;
            if is_shutdown {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                    is_shutdown,
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                    is_shutdown,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
                is_shutdown,
            })
        }
    }

    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
        self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }

    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}

impl Drop for ScheduledIo {
    fn drop(&mut self) {
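        // Wake all pending waiters so that no task is left waiting on a
        // resource that is going away.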
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}

impl ScheduledIo {
    /// An async version of `poll_readiness` which uses a linked list of wakers.
    pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
        self.readiness_fut(interest).await
    }

    // This is in a separate function so that the borrow checker doesn't think
    // we are borrowing the `UnsafeCell` possibly over await boundaries.
    //
    // Go figure.
    fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
        Readiness {
            scheduled_io: self,
            state: State::Init,
            waiter: UnsafeCell::new(Waiter {
                pointers: linked_list::Pointers::new(),
                waker: None,
                is_ready: false,
                interest,
                _p: PhantomPinned,
            }),
        }
    }
}
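
// A minimal usage sketch for the async path (illustrative; the real callers
// are the registration types elsewhere in the runtime):
//
//     let event = scheduled_io.readiness(Interest::READABLE).await;
//     if !event.is_shutdown {
//         // ... attempt the I/O operation ...
//         // On `WouldBlock`, consume the snapshot so the next call waits:
//         scheduled_io.clear_readiness(event);
//     }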

unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}

// ===== impl Readiness =====

impl Future for Readiness<'_> {
    type Output = ReadyEvent;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        use std::sync::atomic::Ordering::SeqCst;

        let (scheduled_io, state, waiter) = unsafe {
            let me = self.get_unchecked_mut();
            (&me.scheduled_io, &mut me.state, &me.waiter)
        };

        loop {
            match *state {
                State::Init => {
                    // Optimistically check existing readiness.
                    let curr = scheduled_io.readiness.load(SeqCst);
                    let ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // Safety: `waiter.interest` never changes.
                    let interest = unsafe { (*waiter.get()).interest };
                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Wasn't ready, take the lock (and check again while locked).
                    let mut waiters = scheduled_io.waiters.lock();

                    let curr = scheduled_io.readiness.load(SeqCst);
                    let mut ready = Ready::from_usize(READINESS.unpack(curr));
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    if is_shutdown {
                        ready = Ready::ALL;
                    }

                    let ready = ready.intersection(interest);

                    if !ready.is_empty() || is_shutdown {
                        // Currently ready!
                        let tick = TICK.unpack(curr) as u8;
                        *state = State::Done;
                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }

                    // Not ready even after locked, insert into list...

                    // Safety: called while locked.
                    unsafe {
                        (*waiter.get()).waker = Some(cx.waker().clone());
                    }

                    // Insert the waiter into the linked list.
                    //
                    // Safety: pointers from `UnsafeCell` are never null.
                    waiters
                        .list
                        .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                    *state = State::Waiting;
                }
                State::Waiting => {
                    // Currently in the "Waiting" state, implying the caller
                    // has a waiter stored in the waiter list (guarded by
                    // `scheduled_io.waiters`). In order to access the waker
                    // fields, we must hold the lock.

                    let waiters = scheduled_io.waiters.lock();

                    // Safety: called while locked.
                    let w = unsafe { &mut *waiter.get() };

                    if w.is_ready {
                        // Our waker has been notified.
                        *state = State::Done;
                    } else {
                        // Update the waker, if necessary.
                        if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
                            w.waker = Some(cx.waker().clone());
                        }

                        return Poll::Pending;
                    }

                    // Explicit drop of the lock to indicate the scope that
                    // the lock is held. Because holding the lock is required
                    // to ensure safe access to fields not held within the
                    // lock, it is helpful to visualize the scope of the
                    // critical section.
                    drop(waiters);
                }
                State::Done => {
                    // Safety: `State::Done` means the waiter is no longer shared.
                    let w = unsafe { &mut *waiter.get() };

                    let curr = scheduled_io.readiness.load(Acquire);
                    let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                    // The returned tick might be newer than the event that
                    // notified our waker. This is fine because the future has
                    // not yet returned `Poll::Ready`.
                    let tick = TICK.unpack(curr) as u8;

                    // The readiness state could have been cleared in the
                    // meantime, but we allow the returned ready set to be
                    // empty.
                    let curr_ready = Ready::from_usize(READINESS.unpack(curr));
                    let ready = curr_ready.intersection(w.interest);

                    return Poll::Ready(ReadyEvent {
                        tick,
                        ready,
                        is_shutdown,
                    });
                }
            }
        }
    }
}

impl Drop for Readiness<'_> {
    fn drop(&mut self) {
        let mut waiters = self.scheduled_io.waiters.lock();

        // Safety: `waiter` is only ever stored in `waiters`.
        unsafe {
            waiters
                .list
                .remove(NonNull::new_unchecked(self.waiter.get()))
        };
    }
}

unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}