1use super::{ReadyEvent, Tick};
2use crate::io::interest::Interest;
3use crate::io::ready::Ready;
4use crate::loom::sync::atomic::AtomicUsize;
5use crate::loom::sync::Mutex;
6use crate::util::bit;
7use crate::util::slab::Entry;
8use crate::util::WakeList;
9
10use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
11use std::task::{Context, Poll, Waker};
12
13use super::Direction;
14
15cfg_io_readiness! {
16 use crate::util::linked_list::{self, LinkedList};
17
18 use std::cell::UnsafeCell;
19 use std::future::Future;
20 use std::marker::PhantomPinned;
21 use std::pin::Pin;
22 use std::ptr::NonNull;
23}
24
25/// Stored in the I/O driver resource slab.
26#[derive(Debug)]
27pub(crate) struct ScheduledIo {
28 /// Packs the resource's readiness with the resource's generation.
29 readiness: AtomicUsize,
30
31 waiters: Mutex<Waiters>,
32}
33
34cfg_io_readiness! {
35 type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
36}
37
38#[derive(Debug, Default)]
39struct Waiters {
40 #[cfg(feature = "net")]
41 /// List of all current waiters.
42 list: WaitList,
43
44 /// Waker used for AsyncRead.
45 reader: Option<Waker>,
46
47 /// Waker used for AsyncWrite.
48 writer: Option<Waker>,
49}
50
51cfg_io_readiness! {
52 #[derive(Debug)]
53 struct Waiter {
54 pointers: linked_list::Pointers<Waiter>,
55
56 /// The waker for this task.
57 waker: Option<Waker>,
58
59 /// The interest this waiter is waiting on.
60 interest: Interest,
61
62 is_ready: bool,
63
64 /// Should never be `!Unpin`.
65 _p: PhantomPinned,
66 }
67
68 generate_addr_of_methods! {
69 impl<> Waiter {
70 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
71 &self.pointers
72 }
73 }
74 }
75
76 /// Future returned by `readiness()`.
77 struct Readiness<'a> {
78 scheduled_io: &'a ScheduledIo,
79
80 state: State,
81
82 /// Entry in the waiter `LinkedList`.
83 waiter: UnsafeCell<Waiter>,
84 }
85
86 enum State {
87 Init,
88 Waiting,
89 Done,
90 }
91}
92
93// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
94//
95// | shutdown | generation | driver tick | readiness |
96// |----------+------------+--------------+-----------|
97// | 1 bit | 7 bits + 8 bits + 16 bits |
98
99const READINESS: bit::Pack = bit::Pack::least_significant(width:16);
100
101const TICK: bit::Pack = READINESS.then(width:8);
102
103const GENERATION: bit::Pack = TICK.then(width:7);
104
105const SHUTDOWN: bit::Pack = GENERATION.then(width:1);
106
107#[test]
108fn test_generations_assert_same() {
109 assert_eq!(super::GENERATION, GENERATION);
110}
111
112// ===== impl ScheduledIo =====
113
114impl Entry for ScheduledIo {
115 fn reset(&self) {
116 let state: usize = self.readiness.load(order:Acquire);
117
118 let generation: usize = GENERATION.unpack(src:state);
119 let next: usize = GENERATION.pack_lossy(value:generation + 1, base:0);
120
121 self.readiness.store(val:next, order:Release);
122 }
123}
124
125impl Default for ScheduledIo {
126 fn default() -> ScheduledIo {
127 ScheduledIo {
128 readiness: AtomicUsize::new(val:0),
129 waiters: Mutex::new(Default::default()),
130 }
131 }
132}
133
134impl ScheduledIo {
135 pub(crate) fn generation(&self) -> usize {
136 GENERATION.unpack(self.readiness.load(Acquire))
137 }
138
139 /// Invoked when the IO driver is shut down; forces this ScheduledIo into a
140 /// permanently shutdown state.
141 pub(super) fn shutdown(&self) {
142 let mask = SHUTDOWN.pack(1, 0);
143 self.readiness.fetch_or(mask, AcqRel);
144 self.wake(Ready::ALL);
145 }
146
147 /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
148 /// the current value, returning the previous readiness value.
149 ///
150 /// # Arguments
151 /// - `token`: the token for this `ScheduledIo`.
152 /// - `tick`: whether setting the tick or trying to clear readiness for a
153 /// specific tick.
154 /// - `f`: a closure returning a new readiness value given the previous
155 /// readiness.
156 ///
157 /// # Returns
158 ///
159 /// If the given token's generation no longer matches the `ScheduledIo`'s
160 /// generation, then the corresponding IO resource has been removed and
161 /// replaced with a new resource. In that case, this method returns `Err`.
162 /// Otherwise, this returns the previous readiness.
163 pub(super) fn set_readiness(
164 &self,
165 token: Option<usize>,
166 tick: Tick,
167 f: impl Fn(Ready) -> Ready,
168 ) -> Result<(), ()> {
169 let mut current = self.readiness.load(Acquire);
170
171 loop {
172 let current_generation = GENERATION.unpack(current);
173
174 if let Some(token) = token {
175 // Check that the generation for this access is still the
176 // current one.
177 if GENERATION.unpack(token) != current_generation {
178 return Err(());
179 }
180 }
181
182 // Mask out the tick/generation bits so that the modifying
183 // function doesn't see them.
184 let current_readiness = Ready::from_usize(current);
185 let new = f(current_readiness);
186
187 let packed = match tick {
188 Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
189 Tick::Clear(t) => {
190 if TICK.unpack(current) as u8 != t {
191 // Trying to clear readiness with an old event!
192 return Err(());
193 }
194
195 TICK.pack(t as usize, new.as_usize())
196 }
197 };
198
199 let next = GENERATION.pack(current_generation, packed);
200
201 match self
202 .readiness
203 .compare_exchange(current, next, AcqRel, Acquire)
204 {
205 Ok(_) => return Ok(()),
206 // we lost the race, retry!
207 Err(actual) => current = actual,
208 }
209 }
210 }
211
212 /// Notifies all pending waiters that have registered interest in `ready`.
213 ///
214 /// There may be many waiters to notify. Waking the pending task **must** be
215 /// done from outside of the lock otherwise there is a potential for a
216 /// deadlock.
217 ///
218 /// A stack array of wakers is created and filled with wakers to notify, the
219 /// lock is released, and the wakers are notified. Because there may be more
220 /// than 32 wakers to notify, if the stack array fills up, the lock is
221 /// released, the array is cleared, and the iteration continues.
222 pub(super) fn wake(&self, ready: Ready) {
223 let mut wakers = WakeList::new();
224
225 let mut waiters = self.waiters.lock();
226
227 // check for AsyncRead slot
228 if ready.is_readable() {
229 if let Some(waker) = waiters.reader.take() {
230 wakers.push(waker);
231 }
232 }
233
234 // check for AsyncWrite slot
235 if ready.is_writable() {
236 if let Some(waker) = waiters.writer.take() {
237 wakers.push(waker);
238 }
239 }
240
241 #[cfg(feature = "net")]
242 'outer: loop {
243 let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
244
245 while wakers.can_push() {
246 match iter.next() {
247 Some(waiter) => {
248 let waiter = unsafe { &mut *waiter.as_ptr() };
249
250 if let Some(waker) = waiter.waker.take() {
251 waiter.is_ready = true;
252 wakers.push(waker);
253 }
254 }
255 None => {
256 break 'outer;
257 }
258 }
259 }
260
261 drop(waiters);
262
263 wakers.wake_all();
264
265 // Acquire the lock again.
266 waiters = self.waiters.lock();
267 }
268
269 // Release the lock before notifying
270 drop(waiters);
271
272 wakers.wake_all();
273 }
274
275 pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
276 let curr = self.readiness.load(Acquire);
277
278 ReadyEvent {
279 tick: TICK.unpack(curr) as u8,
280 ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
281 is_shutdown: SHUTDOWN.unpack(curr) != 0,
282 }
283 }
284
285 /// Polls for readiness events in a given direction.
286 ///
287 /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
288 /// which cannot use the `async fn` version. This uses reserved reader
289 /// and writer slots.
290 pub(super) fn poll_readiness(
291 &self,
292 cx: &mut Context<'_>,
293 direction: Direction,
294 ) -> Poll<ReadyEvent> {
295 let curr = self.readiness.load(Acquire);
296
297 let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
298 let is_shutdown = SHUTDOWN.unpack(curr) != 0;
299
300 if ready.is_empty() && !is_shutdown {
301 // Update the task info
302 let mut waiters = self.waiters.lock();
303 let slot = match direction {
304 Direction::Read => &mut waiters.reader,
305 Direction::Write => &mut waiters.writer,
306 };
307
308 // Avoid cloning the waker if one is already stored that matches the
309 // current task.
310 match slot {
311 Some(existing) => {
312 if !existing.will_wake(cx.waker()) {
313 *existing = cx.waker().clone();
314 }
315 }
316 None => {
317 *slot = Some(cx.waker().clone());
318 }
319 }
320
321 // Try again, in case the readiness was changed while we were
322 // taking the waiters lock
323 let curr = self.readiness.load(Acquire);
324 let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
325 let is_shutdown = SHUTDOWN.unpack(curr) != 0;
326 if is_shutdown {
327 Poll::Ready(ReadyEvent {
328 tick: TICK.unpack(curr) as u8,
329 ready: direction.mask(),
330 is_shutdown,
331 })
332 } else if ready.is_empty() {
333 Poll::Pending
334 } else {
335 Poll::Ready(ReadyEvent {
336 tick: TICK.unpack(curr) as u8,
337 ready,
338 is_shutdown,
339 })
340 }
341 } else {
342 Poll::Ready(ReadyEvent {
343 tick: TICK.unpack(curr) as u8,
344 ready,
345 is_shutdown,
346 })
347 }
348 }
349
350 pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
351 // This consumes the current readiness state **except** for closed
352 // states. Closed states are excluded because they are final states.
353 let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;
354
355 // result isn't important
356 let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
357 }
358
359 pub(crate) fn clear_wakers(&self) {
360 let mut waiters = self.waiters.lock();
361 waiters.reader.take();
362 waiters.writer.take();
363 }
364}
365
366impl Drop for ScheduledIo {
367 fn drop(&mut self) {
368 self.wake(ready:Ready::ALL);
369 }
370}
371
372unsafe impl Send for ScheduledIo {}
373unsafe impl Sync for ScheduledIo {}
374
375cfg_io_readiness! {
376 impl ScheduledIo {
377 /// An async version of `poll_readiness` which uses a linked list of wakers.
378 pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
379 self.readiness_fut(interest).await
380 }
381
382 // This is in a separate function so that the borrow checker doesn't think
383 // we are borrowing the `UnsafeCell` possibly over await boundaries.
384 //
385 // Go figure.
386 fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
387 Readiness {
388 scheduled_io: self,
389 state: State::Init,
390 waiter: UnsafeCell::new(Waiter {
391 pointers: linked_list::Pointers::new(),
392 waker: None,
393 is_ready: false,
394 interest,
395 _p: PhantomPinned,
396 }),
397 }
398 }
399 }
400
401 unsafe impl linked_list::Link for Waiter {
402 type Handle = NonNull<Waiter>;
403 type Target = Waiter;
404
405 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
406 *handle
407 }
408
409 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
410 ptr
411 }
412
413 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
414 Waiter::addr_of_pointers(target)
415 }
416 }
417
418 // ===== impl Readiness =====
419
420 impl Future for Readiness<'_> {
421 type Output = ReadyEvent;
422
423 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
424 use std::sync::atomic::Ordering::SeqCst;
425
426 let (scheduled_io, state, waiter) = unsafe {
427 let me = self.get_unchecked_mut();
428 (&me.scheduled_io, &mut me.state, &me.waiter)
429 };
430
431 loop {
432 match *state {
433 State::Init => {
434 // Optimistically check existing readiness
435 let curr = scheduled_io.readiness.load(SeqCst);
436 let ready = Ready::from_usize(READINESS.unpack(curr));
437 let is_shutdown = SHUTDOWN.unpack(curr) != 0;
438
439 // Safety: `waiter.interest` never changes
440 let interest = unsafe { (*waiter.get()).interest };
441 let ready = ready.intersection(interest);
442
443 if !ready.is_empty() || is_shutdown {
444 // Currently ready!
445 let tick = TICK.unpack(curr) as u8;
446 *state = State::Done;
447 return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
448 }
449
450 // Wasn't ready, take the lock (and check again while locked).
451 let mut waiters = scheduled_io.waiters.lock();
452
453 let curr = scheduled_io.readiness.load(SeqCst);
454 let mut ready = Ready::from_usize(READINESS.unpack(curr));
455 let is_shutdown = SHUTDOWN.unpack(curr) != 0;
456
457 if is_shutdown {
458 ready = Ready::ALL;
459 }
460
461 let ready = ready.intersection(interest);
462
463 if !ready.is_empty() || is_shutdown {
464 // Currently ready!
465 let tick = TICK.unpack(curr) as u8;
466 *state = State::Done;
467 return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
468 }
469
470 // Not ready even after locked, insert into list...
471
472 // Safety: called while locked
473 unsafe {
474 (*waiter.get()).waker = Some(cx.waker().clone());
475 }
476
477 // Insert the waiter into the linked list
478 //
479 // safety: pointers from `UnsafeCell` are never null.
480 waiters
481 .list
482 .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
483 *state = State::Waiting;
484 }
485 State::Waiting => {
486 // Currently in the "Waiting" state, implying the caller has
487 // a waiter stored in the waiter list (guarded by
488 // `notify.waiters`). In order to access the waker fields,
489 // we must hold the lock.
490
491 let waiters = scheduled_io.waiters.lock();
492
493 // Safety: called while locked
494 let w = unsafe { &mut *waiter.get() };
495
496 if w.is_ready {
497 // Our waker has been notified.
498 *state = State::Done;
499 } else {
500 // Update the waker, if necessary.
501 if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
502 w.waker = Some(cx.waker().clone());
503 }
504
505 return Poll::Pending;
506 }
507
508 // Explicit drop of the lock to indicate the scope that the
509 // lock is held. Because holding the lock is required to
510 // ensure safe access to fields not held within the lock, it
511 // is helpful to visualize the scope of the critical
512 // section.
513 drop(waiters);
514 }
515 State::Done => {
516 // Safety: State::Done means it is no longer shared
517 let w = unsafe { &mut *waiter.get() };
518
519 let curr = scheduled_io.readiness.load(Acquire);
520 let is_shutdown = SHUTDOWN.unpack(curr) != 0;
521
522 // The returned tick might be newer than the event
523 // which notified our waker. This is ok because the future
524 // still didn't return `Poll::Ready`.
525 let tick = TICK.unpack(curr) as u8;
526
527 // The readiness state could have been cleared in the meantime,
528 // but we allow the returned ready set to be empty.
529 let curr_ready = Ready::from_usize(READINESS.unpack(curr));
530 let ready = curr_ready.intersection(w.interest);
531
532 return Poll::Ready(ReadyEvent {
533 tick,
534 ready,
535 is_shutdown,
536 });
537 }
538 }
539 }
540 }
541 }
542
543 impl Drop for Readiness<'_> {
544 fn drop(&mut self) {
545 let mut waiters = self.scheduled_io.waiters.lock();
546
547 // Safety: `waiter` is only ever stored in `waiters`
548 unsafe {
549 waiters
550 .list
551 .remove(NonNull::new_unchecked(self.waiter.get()))
552 };
553 }
554 }
555
556 unsafe impl Send for Readiness<'_> {}
557 unsafe impl Sync for Readiness<'_> {}
558}
559