1 | use crate::io::interest::Interest; |
2 | use crate::io::ready::Ready; |
3 | use crate::loom::sync::atomic::AtomicUsize; |
4 | use crate::loom::sync::Mutex; |
5 | use crate::runtime::io::{Direction, ReadyEvent, Tick}; |
6 | use crate::util::bit; |
7 | use crate::util::linked_list::{self, LinkedList}; |
8 | use crate::util::WakeList; |
9 | |
10 | use std::cell::UnsafeCell; |
11 | use std::future::Future; |
12 | use std::marker::PhantomPinned; |
13 | use std::pin::Pin; |
14 | use std::ptr::NonNull; |
15 | use std::sync::atomic::Ordering::{AcqRel, Acquire}; |
16 | use std::task::{Context, Poll, Waker}; |
17 | |
18 | /// Stored in the I/O driver resource slab. |
#[derive(Debug)]
20 | // # This struct should be cache padded to avoid false sharing. The cache padding rules are copied |
21 | // from crossbeam-utils/src/cache_padded.rs |
22 | // |
23 | // Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache |
24 | // lines at a time, so we have to align to 128 bytes rather than 64. |
25 | // |
26 | // Sources: |
27 | // - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf |
28 | // - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107 |
29 | // |
30 | // ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size. |
31 | // |
32 | // Sources: |
33 | // - https://www.mono-project.com/news/2016/09/12/arm64-icache/ |
34 | // |
35 | // powerpc64 has 128-byte cache line size. |
36 | // |
37 | // Sources: |
38 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9 |
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
47 | // arm, mips, mips64, sparc, and hexagon have 32-byte cache line size. |
48 | // |
49 | // Sources: |
50 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7 |
51 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7 |
52 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7 |
53 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9 |
54 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17 |
55 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12 |
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
66 | // m68k has 16-byte cache line size. |
67 | // |
68 | // Sources: |
69 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9 |
70 | #[cfg_attr (target_arch = "m68k" , repr(align(16)))] |
71 | // s390x has 256-byte cache line size. |
72 | // |
73 | // Sources: |
74 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7 |
75 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13 |
76 | #[cfg_attr (target_arch = "s390x" , repr(align(256)))] |
77 | // x86, riscv, wasm, and sparc64 have 64-byte cache line size. |
78 | // |
79 | // Sources: |
80 | // - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9 |
81 | // - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7 |
82 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19 |
83 | // - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/riscv/include/asm/cache.h#L10 |
84 | // |
85 | // All others are assumed to have 64-byte cache line size. |
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
101 | pub(crate) struct ScheduledIo { |
102 | pub(super) linked_list_pointers: UnsafeCell<linked_list::Pointers<Self>>, |
103 | |
    /// Packs the resource's readiness and the I/O driver's latest tick.
105 | readiness: AtomicUsize, |
106 | |
107 | waiters: Mutex<Waiters>, |
108 | } |
109 | |
110 | type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>; |
111 | |
#[derive(Debug, Default)]
113 | struct Waiters { |
114 | /// List of all current waiters. |
115 | list: WaitList, |
116 | |
117 | /// Waker used for `AsyncRead`. |
118 | reader: Option<Waker>, |
119 | |
120 | /// Waker used for `AsyncWrite`. |
121 | writer: Option<Waker>, |
122 | } |
123 | |
#[derive(Debug)]
125 | struct Waiter { |
126 | pointers: linked_list::Pointers<Waiter>, |
127 | |
128 | /// The waker for this task. |
129 | waker: Option<Waker>, |
130 | |
131 | /// The interest this waiter is waiting on. |
132 | interest: Interest, |
133 | |
134 | is_ready: bool, |
135 | |
    /// Should never be `Unpin`.
137 | _p: PhantomPinned, |
138 | } |
139 | |
140 | generate_addr_of_methods! { |
141 | impl<> Waiter { |
142 | unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> { |
143 | &self.pointers |
144 | } |
145 | } |
146 | } |
147 | |
148 | /// Future returned by `readiness()`. |
149 | struct Readiness<'a> { |
150 | scheduled_io: &'a ScheduledIo, |
151 | |
152 | state: State, |
153 | |
154 | /// Entry in the waiter `LinkedList`. |
155 | waiter: UnsafeCell<Waiter>, |
156 | } |
157 | |
158 | enum State { |
159 | Init, |
160 | Waiting, |
161 | Done, |
162 | } |
163 | |
164 | // The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. |
165 | // |
// | shutdown | driver tick | readiness |
// |----------+-------------+-----------|
// |  1 bit   |   15 bits   |  16 bits  |
169 | |
const READINESS: bit::Pack = bit::Pack::least_significant(16);
171 | |
const TICK: bit::Pack = READINESS.then(15);
173 | |
const SHUTDOWN: bit::Pack = TICK.then(1);
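
// A worked example of the packing (a sketch, assuming the `bit::Pack` helpers
// shift and mask as described): for a stored value of
//
//     (1 << 31) | (3 << 16) | 0b01
//
// `SHUTDOWN.unpack` yields 1, `TICK.unpack` yields 3, and `READINESS.unpack`
// yields `0b01` (the readable bit in `Ready`'s encoding).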
175 | |
176 | // ===== impl ScheduledIo ===== |
177 | |
178 | impl Default for ScheduledIo { |
179 | fn default() -> ScheduledIo { |
180 | ScheduledIo { |
181 | linked_list_pointers: UnsafeCell::new(linked_list::Pointers::new()), |
            readiness: AtomicUsize::new(0),
183 | waiters: Mutex::new(Waiters::default()), |
184 | } |
185 | } |
186 | } |
187 | |
188 | impl ScheduledIo { |
189 | pub(crate) fn token(&self) -> mio::Token { |
190 | mio::Token(super::EXPOSE_IO.expose_provenance(self)) |
191 | } |
192 | |
193 | /// Invoked when the IO driver is shut down; forces this `ScheduledIo` into a |
194 | /// permanently shutdown state. |
195 | pub(super) fn shutdown(&self) { |
196 | let mask = SHUTDOWN.pack(1, 0); |
197 | self.readiness.fetch_or(mask, AcqRel); |
198 | self.wake(Ready::ALL); |
199 | } |
200 | |
201 | /// Sets the readiness on this `ScheduledIo` by invoking the given closure on |
202 | /// the current value, returning the previous readiness value. |
203 | /// |
204 | /// # Arguments |
    /// - `tick_op`: whether setting the tick (`Tick::Set`) or trying to clear
    ///   readiness for a specific tick (`Tick::Clear`).
207 | /// - `f`: a closure returning a new readiness value given the previous |
208 | /// readiness. |
209 | pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) { |
210 | let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| { |
211 | // If the io driver is shut down, then you are only allowed to clear readiness. |
212 | debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_))); |
213 | |
214 | const MAX_TICK: usize = TICK.max_value() + 1; |
215 | let tick = TICK.unpack(curr); |
216 | |
217 | let new_tick = match tick_op { |
218 | // Trying to clear readiness with an old event! |
219 | Tick::Clear(t) if tick as u8 != t => return None, |
220 | Tick::Clear(t) => t as usize, |
221 | Tick::Set => tick.wrapping_add(1) % MAX_TICK, |
222 | }; |
223 | let ready = Ready::from_usize(READINESS.unpack(curr)); |
224 | Some(TICK.pack(new_tick, f(ready).as_usize())) |
225 | }); |
226 | } |
227 | |
228 | /// Notifies all pending waiters that have registered interest in `ready`. |
229 | /// |
230 | /// There may be many waiters to notify. Waking the pending task **must** be |
231 | /// done from outside of the lock otherwise there is a potential for a |
232 | /// deadlock. |
233 | /// |
234 | /// A stack array of wakers is created and filled with wakers to notify, the |
235 | /// lock is released, and the wakers are notified. Because there may be more |
236 | /// than 32 wakers to notify, if the stack array fills up, the lock is |
237 | /// released, the array is cleared, and the iteration continues. |
238 | pub(super) fn wake(&self, ready: Ready) { |
239 | let mut wakers = WakeList::new(); |
240 | |
241 | let mut waiters = self.waiters.lock(); |
242 | |
243 | // check for AsyncRead slot |
244 | if ready.is_readable() { |
245 | if let Some(waker) = waiters.reader.take() { |
246 | wakers.push(waker); |
247 | } |
248 | } |
249 | |
250 | // check for AsyncWrite slot |
251 | if ready.is_writable() { |
252 | if let Some(waker) = waiters.writer.take() { |
253 | wakers.push(waker); |
254 | } |
255 | } |
256 | |
257 | 'outer: loop { |
258 | let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); |
259 | |
260 | while wakers.can_push() { |
261 | match iter.next() { |
262 | Some(waiter) => { |
263 | let waiter = unsafe { &mut *waiter.as_ptr() }; |
264 | |
265 | if let Some(waker) = waiter.waker.take() { |
266 | waiter.is_ready = true; |
267 | wakers.push(waker); |
268 | } |
269 | } |
270 | None => { |
271 | break 'outer; |
272 | } |
273 | } |
274 | } |
275 | |
276 | drop(waiters); |
277 | |
278 | wakers.wake_all(); |
279 | |
280 | // Acquire the lock again. |
281 | waiters = self.waiters.lock(); |
282 | } |
283 | |
284 | // Release the lock before notifying |
285 | drop(waiters); |
286 | |
287 | wakers.wake_all(); |
288 | } |
289 | |
290 | pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { |
291 | let curr = self.readiness.load(Acquire); |
292 | |
293 | ReadyEvent { |
294 | tick: TICK.unpack(curr) as u8, |
295 | ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), |
296 | is_shutdown: SHUTDOWN.unpack(curr) != 0, |
297 | } |
298 | } |
299 | |
300 | /// Polls for readiness events in a given direction. |
301 | /// |
302 | /// These are to support `AsyncRead` and `AsyncWrite` polling methods, |
303 | /// which cannot use the `async fn` version. This uses reserved reader |
304 | /// and writer slots. |
305 | pub(super) fn poll_readiness( |
306 | &self, |
307 | cx: &mut Context<'_>, |
308 | direction: Direction, |
309 | ) -> Poll<ReadyEvent> { |
310 | let curr = self.readiness.load(Acquire); |
311 | |
312 | let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); |
313 | let is_shutdown = SHUTDOWN.unpack(curr) != 0; |
314 | |
315 | if ready.is_empty() && !is_shutdown { |
316 | // Update the task info |
317 | let mut waiters = self.waiters.lock(); |
318 | let waker = match direction { |
319 | Direction::Read => &mut waiters.reader, |
320 | Direction::Write => &mut waiters.writer, |
321 | }; |
322 | |
323 | // Avoid cloning the waker if one is already stored that matches the |
324 | // current task. |
325 | match waker { |
326 | Some(waker) => waker.clone_from(cx.waker()), |
327 | None => *waker = Some(cx.waker().clone()), |
328 | } |
329 | |
330 | // Try again, in case the readiness was changed while we were |
331 | // taking the waiters lock |
332 | let curr = self.readiness.load(Acquire); |
333 | let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); |
334 | let is_shutdown = SHUTDOWN.unpack(curr) != 0; |
335 | if is_shutdown { |
336 | Poll::Ready(ReadyEvent { |
337 | tick: TICK.unpack(curr) as u8, |
338 | ready: direction.mask(), |
339 | is_shutdown, |
340 | }) |
341 | } else if ready.is_empty() { |
342 | Poll::Pending |
343 | } else { |
344 | Poll::Ready(ReadyEvent { |
345 | tick: TICK.unpack(curr) as u8, |
346 | ready, |
347 | is_shutdown, |
348 | }) |
349 | } |
350 | } else { |
351 | Poll::Ready(ReadyEvent { |
352 | tick: TICK.unpack(curr) as u8, |
353 | ready, |
354 | is_shutdown, |
355 | }) |
356 | } |
357 | } |
358 | |
359 | pub(crate) fn clear_readiness(&self, event: ReadyEvent) { |
360 | // This consumes the current readiness state **except** for closed |
361 | // states. Closed states are excluded because they are final states. |
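        //
        // For example: if `event.ready` is `READABLE | READ_CLOSED`, only the
        // `READABLE` bit is cleared; `READ_CLOSED` stays set so later polls
        // still observe the final, closed state.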
362 | let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED; |
363 | self.set_readiness(Tick::Clear(event.tick), |curr| curr - mask_no_closed); |
364 | } |
365 | |
366 | pub(crate) fn clear_wakers(&self) { |
367 | let mut waiters = self.waiters.lock(); |
368 | waiters.reader.take(); |
369 | waiters.writer.take(); |
370 | } |
371 | } |
372 | |
373 | impl Drop for ScheduledIo { |
374 | fn drop(&mut self) { |
        self.wake(Ready::ALL);
376 | } |
377 | } |
378 | |
379 | unsafe impl Send for ScheduledIo {} |
380 | unsafe impl Sync for ScheduledIo {} |
381 | |
382 | impl ScheduledIo { |
383 | /// An async version of `poll_readiness` which uses a linked list of wakers. |
384 | pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { |
385 | self.readiness_fut(interest).await |
386 | } |
387 | |
388 | // This is in a separate function so that the borrow checker doesn't think |
389 | // we are borrowing the `UnsafeCell` possibly over await boundaries. |
390 | // |
391 | // Go figure. |
392 | fn readiness_fut(&self, interest: Interest) -> Readiness<'_> { |
393 | Readiness { |
394 | scheduled_io: self, |
395 | state: State::Init, |
396 | waiter: UnsafeCell::new(Waiter { |
397 | pointers: linked_list::Pointers::new(), |
398 | waker: None, |
399 | is_ready: false, |
400 | interest, |
401 | _p: PhantomPinned, |
402 | }), |
403 | } |
404 | } |
405 | } |
406 | |
407 | unsafe impl linked_list::Link for Waiter { |
408 | type Handle = NonNull<Waiter>; |
409 | type Target = Waiter; |
410 | |
411 | fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> { |
412 | *handle |
413 | } |
414 | |
415 | unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { |
416 | ptr |
417 | } |
418 | |
419 | unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { |
        Waiter::addr_of_pointers(target)
421 | } |
422 | } |
423 | |
424 | // ===== impl Readiness ===== |
425 | |
426 | impl Future for Readiness<'_> { |
427 | type Output = ReadyEvent; |
428 | |
429 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
430 | use std::sync::atomic::Ordering::SeqCst; |
431 | |
432 | let (scheduled_io, state, waiter) = unsafe { |
433 | let me = self.get_unchecked_mut(); |
434 | (&me.scheduled_io, &mut me.state, &me.waiter) |
435 | }; |
436 | |
437 | loop { |
438 | match *state { |
439 | State::Init => { |
440 | // Optimistically check existing readiness |
441 | let curr = scheduled_io.readiness.load(SeqCst); |
442 | let is_shutdown = SHUTDOWN.unpack(curr) != 0; |
443 | |
444 | // Safety: `waiter.interest` never changes |
445 | let interest = unsafe { (*waiter.get()).interest }; |
446 | let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest); |
447 | |
448 | if !ready.is_empty() || is_shutdown { |
449 | // Currently ready! |
450 | let tick = TICK.unpack(curr) as u8; |
451 | *state = State::Done; |
452 | return Poll::Ready(ReadyEvent { |
453 | tick, |
454 | ready, |
455 | is_shutdown, |
456 | }); |
457 | } |
458 | |
459 | // Wasn't ready, take the lock (and check again while locked). |
460 | let mut waiters = scheduled_io.waiters.lock(); |
461 | |
462 | let curr = scheduled_io.readiness.load(SeqCst); |
463 | let mut ready = Ready::from_usize(READINESS.unpack(curr)); |
464 | let is_shutdown = SHUTDOWN.unpack(curr) != 0; |
465 | |
466 | if is_shutdown { |
467 | ready = Ready::ALL; |
468 | } |
469 | |
470 | let ready = ready.intersection(interest); |
471 | |
472 | if !ready.is_empty() || is_shutdown { |
473 | // Currently ready! |
474 | let tick = TICK.unpack(curr) as u8; |
475 | *state = State::Done; |
476 | return Poll::Ready(ReadyEvent { |
477 | tick, |
478 | ready, |
479 | is_shutdown, |
480 | }); |
481 | } |
482 | |
483 | // Not ready even after locked, insert into list... |
484 | |
485 | // Safety: called while locked |
486 | unsafe { |
487 | (*waiter.get()).waker = Some(cx.waker().clone()); |
488 | } |
489 | |
490 | // Insert the waiter into the linked list |
491 | // |
492 | // safety: pointers from `UnsafeCell` are never null. |
493 | waiters |
494 | .list |
495 | .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); |
496 | *state = State::Waiting; |
497 | } |
498 | State::Waiting => { |
499 | // Currently in the "Waiting" state, implying the caller has |
500 | // a waiter stored in the waiter list (guarded by |
501 | // `notify.waiters`). In order to access the waker fields, |
502 | // we must hold the lock. |
503 | |
504 | let waiters = scheduled_io.waiters.lock(); |
505 | |
506 | // Safety: called while locked |
507 | let w = unsafe { &mut *waiter.get() }; |
508 | |
509 | if w.is_ready { |
510 | // Our waker has been notified. |
511 | *state = State::Done; |
512 | } else { |
513 | // Update the waker, if necessary. |
514 | w.waker.as_mut().unwrap().clone_from(cx.waker()); |
515 | return Poll::Pending; |
516 | } |
517 | |
518 | // Explicit drop of the lock to indicate the scope that the |
519 | // lock is held. Because holding the lock is required to |
520 | // ensure safe access to fields not held within the lock, it |
521 | // is helpful to visualize the scope of the critical |
522 | // section. |
523 | drop(waiters); |
524 | } |
525 | State::Done => { |
526 | // Safety: State::Done means it is no longer shared |
527 | let w = unsafe { &mut *waiter.get() }; |
528 | |
529 | let curr = scheduled_io.readiness.load(Acquire); |
530 | let is_shutdown = SHUTDOWN.unpack(curr) != 0; |
531 | |
532 | // The returned tick might be newer than the event |
533 | // which notified our waker. This is ok because the future |
534 | // still didn't return `Poll::Ready`. |
535 | let tick = TICK.unpack(curr) as u8; |
536 | |
537 | // The readiness state could have been cleared in the meantime, |
538 | // but we allow the returned ready set to be empty. |
539 | let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest); |
540 | |
541 | return Poll::Ready(ReadyEvent { |
542 | tick, |
543 | ready, |
544 | is_shutdown, |
545 | }); |
546 | } |
547 | } |
548 | } |
549 | } |
550 | } |
551 | |
552 | impl Drop for Readiness<'_> { |
553 | fn drop(&mut self) { |
        let mut waiters = self.scheduled_io.waiters.lock();
555 | |
556 | // Safety: `waiter` is only ever stored in `waiters` |
557 | unsafe { |
558 | waiters |
559 | .list |
                .remove(NonNull::new_unchecked(self.waiter.get()))
561 | }; |
562 | } |
563 | } |
564 | |
565 | unsafe impl Send for Readiness<'_> {} |
566 | unsafe impl Sync for Readiness<'_> {} |
567 | |