use super::{ReadyEvent, Tick};
use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
use crate::util::slab::Entry;
use crate::util::WakeList;

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};

use super::Direction;

cfg_io_readiness! {
    use crate::util::linked_list::{self, LinkedList};

    use std::cell::UnsafeCell;
    use std::future::Future;
    use std::marker::PhantomPinned;
    use std::pin::Pin;
    use std::ptr::NonNull;
}

/// Stored in the I/O driver resource slab.
#[derive(Debug)]
pub(crate) struct ScheduledIo {
    /// Packs the resource's readiness with the resource's generation.
    readiness: AtomicUsize,

    waiters: Mutex<Waiters>,
}

cfg_io_readiness! {
    type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
}

#[derive(Debug, Default)]
struct Waiters {
    #[cfg(feature = "net")]
    /// List of all current waiters.
    list: WaitList,

    /// Waker used for AsyncRead.
    reader: Option<Waker>,

    /// Waker used for AsyncWrite.
    writer: Option<Waker>,
}

cfg_io_readiness! {
    #[derive(Debug)]
    struct Waiter {
        pointers: linked_list::Pointers<Waiter>,

        /// The waker for this task.
        waker: Option<Waker>,

        /// The interest this waiter is waiting on.
        interest: Interest,

        is_ready: bool,

        /// Should never be `Unpin`.
        _p: PhantomPinned,
    }

    generate_addr_of_methods! {
        impl<> Waiter {
            unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
                &self.pointers
            }
        }
    }

    /// Future returned by `readiness()`.
    struct Readiness<'a> {
        scheduled_io: &'a ScheduledIo,

        state: State,

        /// Entry in the waiter `LinkedList`.
        waiter: UnsafeCell<Waiter>,
    }

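    // State machine for the `Readiness` future (see its `Future::poll` impl
    // below): `Init` moves straight to `Done` if the resource is already ready
    // (or shut down), otherwise to `Waiting` once the waiter has been pushed
    // into the list. `wake()` marks a waiter ready, and the next poll observes
    // that and finishes in `Done`.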
    enum State {
        Init,
        Waiting,
        Done,
    }
}

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | shutdown | generation | driver tick | readiness |
// |----------+------------+-------------+-----------|
// |  1 bit   |   7 bits   |   8 bits    |  16 bits  |
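//
// For example, a stored value of 0x0103_0001 (hypothetical) decodes as:
// shutdown = 0, generation = 1, driver tick = 3, readiness = 0x0001.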

const READINESS: bit::Pack = bit::Pack::least_significant(16);

const TICK: bit::Pack = READINESS.then(8);

const GENERATION: bit::Pack = TICK.then(7);

const SHUTDOWN: bit::Pack = GENERATION.then(1);

#[test]
fn test_generations_assert_same() {
    assert_eq!(super::GENERATION, GENERATION);
}
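
// A worked example of the packed layout above, using `pack`/`unpack` exactly
// as this file does elsewhere. The field values (readiness 0x1, tick 5,
// generation 3) are arbitrary.
#[test]
fn test_packed_layout_example() {
    let packed = SHUTDOWN.pack(1, GENERATION.pack(3, TICK.pack(5, 0x1)));

    assert_eq!(READINESS.unpack(packed), 0x1);
    assert_eq!(TICK.unpack(packed), 5);
    assert_eq!(GENERATION.unpack(packed), 3);
    assert_eq!(SHUTDOWN.unpack(packed), 1);
}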

// ===== impl ScheduledIo =====

impl Entry for ScheduledIo {
    fn reset(&self) {
        let state = self.readiness.load(Acquire);

        let generation = GENERATION.unpack(state);
        // The generation field is only 7 bits wide, so `pack_lossy` is used to
        // let `generation + 1` wrap back to 0 instead of overflowing the field.
        let next = GENERATION.pack_lossy(generation + 1, 0);

        self.readiness.store(next, Release);
    }
}

impl Default for ScheduledIo {
    fn default() -> ScheduledIo {
        ScheduledIo {
            readiness: AtomicUsize::new(0),
            waiters: Mutex::new(Default::default()),
        }
    }
}

impl ScheduledIo {
    pub(crate) fn generation(&self) -> usize {
        GENERATION.unpack(self.readiness.load(Acquire))
    }

    /// Invoked when the I/O driver is shut down; forces this `ScheduledIo`
    /// into a permanent shutdown state.
    pub(super) fn shutdown(&self) {
        let mask = SHUTDOWN.pack(1, 0);
        self.readiness.fetch_or(mask, AcqRel);
        self.wake(Ready::ALL);
    }

    /// Sets the readiness on this `ScheduledIo` by invoking the given closure
    /// on the current readiness value.
    ///
    /// # Arguments
    /// - `token`: the token for this `ScheduledIo`.
    /// - `tick`: whether setting the tick or trying to clear readiness for a
    ///   specific tick.
    /// - `f`: a closure returning a new readiness value given the previous
    ///   readiness.
    ///
    /// # Returns
    ///
    /// If the given token's generation no longer matches the `ScheduledIo`'s
    /// generation, then the corresponding I/O resource has been removed and
    /// replaced with a new resource. In that case, this method returns `Err`.
    /// Otherwise, this returns `Ok(())`.
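    ///
    /// # Example
    ///
    /// A hypothetical call that ORs read readiness into the current value (a
    /// sketch: the tick value `0` is arbitrary and `io` stands in for some
    /// `ScheduledIo`):
    ///
    /// ```ignore
    /// let _ = io.set_readiness(None, Tick::Set(0), |curr| curr | Ready::READABLE);
    /// ```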
    pub(super) fn set_readiness(
        &self,
        token: Option<usize>,
        tick: Tick,
        f: impl Fn(Ready) -> Ready,
    ) -> Result<(), ()> {
        let mut current = self.readiness.load(Acquire);

        loop {
            let current_generation = GENERATION.unpack(current);

            if let Some(token) = token {
                // Check that the generation for this access is still the
                // current one.
                if GENERATION.unpack(token) != current_generation {
                    return Err(());
                }
            }

            // Mask out the tick/generation bits so that the modifying
            // function doesn't see them.
            let current_readiness = Ready::from_usize(current);
            let new = f(current_readiness);

            let packed = match tick {
                Tick::Set(t) => TICK.pack(t as usize, new.as_usize()),
                Tick::Clear(t) => {
                    if TICK.unpack(current) as u8 != t {
                        // Trying to clear readiness with an old event!
                        return Err(());
                    }

                    TICK.pack(t as usize, new.as_usize())
                }
            };

            let next = GENERATION.pack(current_generation, packed);

            match self
                .readiness
                .compare_exchange(current, next, AcqRel, Acquire)
            {
                Ok(_) => return Ok(()),
                // we lost the race, retry!
                Err(actual) => current = actual,
            }
        }
    }

    /// Notifies all pending waiters that have registered interest in `ready`.
    ///
    /// There may be many waiters to notify. Waking the pending tasks **must**
    /// be done from outside of the lock; otherwise there is a potential for a
    /// deadlock.
    ///
    /// A stack array of wakers is created and filled with wakers to notify,
    /// the lock is released, and the wakers are notified. Because there may be
    /// more than 32 wakers to notify, if the stack array fills up, the lock is
    /// released, the array is cleared, and the iteration continues.
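    ///
    /// For example, a hypothetical `io.wake(Ready::READABLE)` call wakes the
    /// stored `reader` waker plus every list waiter whose interest is
    /// satisfied by `READABLE`, while leaving write-only waiters untouched.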
    pub(super) fn wake(&self, ready: Ready) {
        let mut wakers = WakeList::new();

        let mut waiters = self.waiters.lock();

        // check for AsyncRead slot
        if ready.is_readable() {
            if let Some(waker) = waiters.reader.take() {
                wakers.push(waker);
            }
        }

        // check for AsyncWrite slot
        if ready.is_writable() {
            if let Some(waker) = waiters.writer.take() {
                wakers.push(waker);
            }
        }

        #[cfg(feature = "net")]
        'outer: loop {
            let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

            while wakers.can_push() {
                match iter.next() {
                    Some(waiter) => {
                        let waiter = unsafe { &mut *waiter.as_ptr() };

                        if let Some(waker) = waiter.waker.take() {
                            waiter.is_ready = true;
                            wakers.push(waker);
                        }
                    }
                    None => {
                        break 'outer;
                    }
                }
            }

            drop(waiters);

            wakers.wake_all();

            // Acquire the lock again.
            waiters = self.waiters.lock();
        }

        // Release the lock before notifying
        drop(waiters);

        wakers.wake_all();
    }

    pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
        let curr = self.readiness.load(Acquire);

        ReadyEvent {
            tick: TICK.unpack(curr) as u8,
            ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
            is_shutdown: SHUTDOWN.unpack(curr) != 0,
        }
    }

    /// Polls for readiness events in a given direction.
    ///
    /// These are to support `AsyncRead` and `AsyncWrite` polling methods,
    /// which cannot use the `async fn` version. This uses reserved reader
    /// and writer slots.
    pub(super) fn poll_readiness(
        &self,
        cx: &mut Context<'_>,
        direction: Direction,
    ) -> Poll<ReadyEvent> {
        let curr = self.readiness.load(Acquire);

        let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

        if ready.is_empty() && !is_shutdown {
            // Update the task info
            let mut waiters = self.waiters.lock();
            let slot = match direction {
                Direction::Read => &mut waiters.reader,
                Direction::Write => &mut waiters.writer,
            };

            // Avoid cloning the waker if one is already stored that matches the
            // current task.
            match slot {
                Some(existing) => {
                    if !existing.will_wake(cx.waker()) {
                        *existing = cx.waker().clone();
                    }
                }
                None => {
                    *slot = Some(cx.waker().clone());
                }
            }

            // Try again, in case the readiness was changed while we were
            // taking the waiters lock
            let curr = self.readiness.load(Acquire);
            let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
            let is_shutdown = SHUTDOWN.unpack(curr) != 0;
            if is_shutdown {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready: direction.mask(),
                    is_shutdown,
                })
            } else if ready.is_empty() {
                Poll::Pending
            } else {
                Poll::Ready(ReadyEvent {
                    tick: TICK.unpack(curr) as u8,
                    ready,
                    is_shutdown,
                })
            }
        } else {
            Poll::Ready(ReadyEvent {
                tick: TICK.unpack(curr) as u8,
                ready,
                is_shutdown,
            })
        }
    }

    pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
        // This consumes the current readiness state **except** for closed
        // states. Closed states are excluded because they are final states.
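        // For example, if `event.ready` is `READABLE | READ_CLOSED`, only
        // `READABLE` is cleared here; `READ_CLOSED` persists because a closed
        // resource never transitions back to open.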
        let mask_no_closed = event.ready - Ready::READ_CLOSED - Ready::WRITE_CLOSED;

        // result isn't important
        let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr - mask_no_closed);
    }

    pub(crate) fn clear_wakers(&self) {
        let mut waiters = self.waiters.lock();
        waiters.reader.take();
        waiters.writer.take();
    }
}

impl Drop for ScheduledIo {
    fn drop(&mut self) {
        self.wake(Ready::ALL);
    }
}

unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}

cfg_io_readiness! {
    impl ScheduledIo {
        /// An async version of `poll_readiness` which uses a linked list of wakers.
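        ///
        /// A hypothetical call site (a sketch, not code from this crate):
        ///
        /// ```ignore
        /// let event = scheduled_io.readiness(Interest::READABLE).await;
        /// // ... use the resource, then release the consumed readiness:
        /// scheduled_io.clear_readiness(event);
        /// ```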
        pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
            self.readiness_fut(interest).await
        }

        // This is in a separate function so that the borrow checker doesn't think
        // we are borrowing the `UnsafeCell` possibly over await boundaries.
        //
        // Go figure.
        fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
            Readiness {
                scheduled_io: self,
                state: State::Init,
                waiter: UnsafeCell::new(Waiter {
                    pointers: linked_list::Pointers::new(),
                    waker: None,
                    is_ready: false,
                    interest,
                    _p: PhantomPinned,
                }),
            }
        }
    }

    unsafe impl linked_list::Link for Waiter {
        type Handle = NonNull<Waiter>;
        type Target = Waiter;

        fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
            *handle
        }

        unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
            ptr
        }

        unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
            Waiter::addr_of_pointers(target)
        }
    }

    // ===== impl Readiness =====

    impl Future for Readiness<'_> {
        type Output = ReadyEvent;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            use std::sync::atomic::Ordering::SeqCst;

            let (scheduled_io, state, waiter) = unsafe {
                let me = self.get_unchecked_mut();
                (&me.scheduled_io, &mut me.state, &me.waiter)
            };

            loop {
                match *state {
                    State::Init => {
                        // Optimistically check existing readiness
                        let curr = scheduled_io.readiness.load(SeqCst);
                        let ready = Ready::from_usize(READINESS.unpack(curr));
                        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                        // Safety: `waiter.interest` never changes
                        let interest = unsafe { (*waiter.get()).interest };
                        let ready = ready.intersection(interest);

                        if !ready.is_empty() || is_shutdown {
                            // Currently ready!
                            let tick = TICK.unpack(curr) as u8;
                            *state = State::Done;
                            return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
                        }

                        // Wasn't ready, take the lock (and check again while locked).
                        let mut waiters = scheduled_io.waiters.lock();

                        let curr = scheduled_io.readiness.load(SeqCst);
                        let mut ready = Ready::from_usize(READINESS.unpack(curr));
                        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                        if is_shutdown {
                            ready = Ready::ALL;
                        }

                        let ready = ready.intersection(interest);

                        if !ready.is_empty() || is_shutdown {
                            // Currently ready!
                            let tick = TICK.unpack(curr) as u8;
                            *state = State::Done;
                            return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
                        }

                        // Not ready even after locked, insert into list...

                        // Safety: called while locked
                        unsafe {
                            (*waiter.get()).waker = Some(cx.waker().clone());
                        }

                        // Insert the waiter into the linked list
                        //
                        // safety: pointers from `UnsafeCell` are never null.
                        waiters
                            .list
                            .push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
                        *state = State::Waiting;
                    }
                    State::Waiting => {
                        // Currently in the "Waiting" state, implying the caller
                        // has a waiter stored in the waiter list (guarded by
                        // `scheduled_io.waiters`). In order to access the waker
                        // fields, we must hold the lock.

                        let waiters = scheduled_io.waiters.lock();

                        // Safety: called while locked
                        let w = unsafe { &mut *waiter.get() };

                        if w.is_ready {
                            // Our waker has been notified.
                            *state = State::Done;
                        } else {
                            // Update the waker, if necessary.
                            if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
                                w.waker = Some(cx.waker().clone());
                            }

                            return Poll::Pending;
                        }

                        // Explicit drop of the lock to indicate the scope that the
                        // lock is held. Because holding the lock is required to
                        // ensure safe access to fields not held within the lock, it
                        // is helpful to visualize the scope of the critical
                        // section.
                        drop(waiters);
                    }
                    State::Done => {
                        // Safety: State::Done means it is no longer shared
                        let w = unsafe { &mut *waiter.get() };

                        let curr = scheduled_io.readiness.load(Acquire);
                        let is_shutdown = SHUTDOWN.unpack(curr) != 0;

                        // The returned tick might be newer than the event
                        // which notified our waker. This is ok because the future
                        // still didn't return `Poll::Ready`.
                        let tick = TICK.unpack(curr) as u8;

                        // The readiness state could have been cleared in the meantime,
                        // but we allow the returned ready set to be empty.
                        let curr_ready = Ready::from_usize(READINESS.unpack(curr));
                        let ready = curr_ready.intersection(w.interest);

                        return Poll::Ready(ReadyEvent {
                            tick,
                            ready,
                            is_shutdown,
                        });
                    }
                }
            }
        }
    }

    impl Drop for Readiness<'_> {
        fn drop(&mut self) {
            let mut waiters = self.scheduled_io.waiters.lock();

            // Safety: `waiter` is only ever stored in `waiters`
            unsafe {
                waiters
                    .list
                    .remove(NonNull::new_unchecked(self.waiter.get()))
            };
        }
    }

    unsafe impl Send for Readiness<'_> {}
    unsafe impl Sync for Readiness<'_> {}
}
559 | |