1#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
2//! # Implementation Details.
3//!
4//! The semaphore is implemented using an intrusive linked list of waiters. An
5//! atomic counter tracks the number of available permits. If the semaphore does
6//! not contain the required number of permits, the task attempting to acquire
7//! permits places its waker at the end of a queue. When new permits are made
8//! available (such as by releasing an initial acquisition), they are assigned
9//! to the task at the front of the queue, waking that task if its requested
10//! number of permits is met.
11//!
12//! Because waiters are enqueued at the back of the linked list and dequeued
13//! from the front, the semaphore is fair. Tasks trying to acquire large numbers
14//! of permits at a time will always be woken eventually, even if many other
15//! tasks are acquiring smaller numbers of permits. This means that in a
16//! use-case like tokio's read-write lock, writers will not be starved by
17//! readers.
18use crate::loom::cell::UnsafeCell;
19use crate::loom::sync::atomic::AtomicUsize;
20use crate::loom::sync::{Mutex, MutexGuard};
21use crate::util::linked_list::{self, LinkedList};
22#[cfg(all(tokio_unstable, feature = "tracing"))]
23use crate::util::trace;
24use crate::util::WakeList;
25
26use std::future::Future;
27use std::marker::PhantomPinned;
28use std::pin::Pin;
29use std::ptr::NonNull;
30use std::sync::atomic::Ordering::*;
31use std::task::Poll::*;
32use std::task::{Context, Poll, Waker};
33use std::{cmp, fmt};
34
/// An asynchronous counting semaphore which permits waiting on multiple permits at once.
pub(crate) struct Semaphore {
    /// The wait queue and the `closed` flag, guarded together by one mutex.
    waiters: Mutex<Waitlist>,
    /// The current number of available permits in the semaphore.
    ///
    /// The count is stored shifted left by `Semaphore::PERMIT_SHIFT`; the
    /// least-significant bit holds the `Semaphore::CLOSED` flag.
    permits: AtomicUsize,
    /// Tracing span under which permit state updates for this semaphore are
    /// emitted. Only present when tracing instrumentation is compiled in.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
}
43
/// The state protected by the semaphore's `waiters` mutex.
struct Waitlist {
    /// Intrusive linked list of the waiters that still need permits.
    queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    /// Set to `true` by `Semaphore::close`; `Semaphore::poll_acquire`
    /// re-checks it once it holds the lock.
    closed: bool,
}
48
/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
#[derive(Debug, PartialEq, Eq)]
pub enum TryAcquireError {
    /// The semaphore has been [closed] and cannot issue new permits.
    ///
    /// [closed]: crate::sync::Semaphore::close
    Closed,

    /// The semaphore does not currently hold enough permits to satisfy the
    /// request.
    NoPermits,
}
/// Error returned from the [`Semaphore::acquire`] function.
///
/// An `acquire` operation can only fail if the semaphore has been
/// [closed].
///
/// [closed]: crate::sync::Semaphore::close
/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
#[derive(Debug)]
// The private unit field keeps the type opaque: code outside this module
// cannot construct it.
pub struct AcquireError(());
71
/// Future returned by `Semaphore::acquire`; resolves once the requested
/// permits have been obtained or the semaphore has been closed.
pub(crate) struct Acquire<'a> {
    /// The wait-queue entry for this acquisition. It is linked into the
    /// semaphore's queue by address, so it is only handed out pinned.
    node: Waiter,
    /// The semaphore the permits are acquired from.
    semaphore: &'a Semaphore,
    /// The total number of permits requested.
    num_permits: u32,
    /// `true` after a poll returned `Pending`, i.e. while `node` may be linked
    /// into the wait queue and must be unlinked on drop.
    queued: bool,
}
78
/// An entry in the wait queue.
struct Waiter {
    /// The current state of the waiter.
    ///
    /// This is either the number of remaining permits required by
    /// the waiter, or a flag indicating that the waiter is not yet queued.
    ///
    /// NOTE(review): within this file the field is initialised to the
    /// requested permit count and only ever decremented by `assign_permits`;
    /// no flag value is stored. Confirm whether the "flag" wording is stale.
    state: AtomicUsize,

    /// The waker to notify the task awaiting permits.
    ///
    /// # Safety
    ///
    /// This may only be accessed while the wait queue is locked.
    waker: UnsafeCell<Option<Waker>>,

    /// Intrusive linked-list pointers.
    ///
    /// # Safety
    ///
    /// This may only be accessed while the wait queue is locked.
    ///
    /// TODO: Ideally, we would be able to use loom to enforce that
    /// this isn't accessed concurrently. However, it is difficult to
    /// use a `UnsafeCell` here, since the `Link` trait requires _returning_
    /// references to `Pointers`, and `UnsafeCell` requires that checked access
    /// take place inside a closure. We should consider changing `Pointers` to
    /// use `UnsafeCell` internally.
    pointers: linked_list::Pointers<Waiter>,

    /// Tracing spans associated with the acquire operation that owns this
    /// waiter. Only present when tracing instrumentation is compiled in.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    ctx: trace::AsyncOpTracingCtx,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
114
// Declares `Waiter::addr_of_pointers`, which maps a pointer to a `Waiter` to a
// pointer to its `pointers` field; the `Link` impl for `Waiter` relies on it.
// NOTE(review): the macro is defined elsewhere; presumably it performs the
// projection on raw pointers without materialising a `&Waiter` -- confirm
// against the macro definition.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
122
123impl Semaphore {
124 /// The maximum number of permits which a semaphore can hold.
125 ///
126 /// Note that this reserves three bits of flags in the permit counter, but
127 /// we only actually use one of them. However, the previous semaphore
128 /// implementation used three bits, so we will continue to reserve them to
129 /// avoid a breaking change if additional flags need to be added in the
130 /// future.
131 pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3;
132 const CLOSED: usize = 1;
133 // The least-significant bit in the number of permits is reserved to use
134 // as a flag indicating that the semaphore has been closed. Consequently
135 // PERMIT_SHIFT is used to leave that bit for that purpose.
136 const PERMIT_SHIFT: usize = 1;
137
138 /// Creates a new semaphore with the initial number of permits
139 ///
140 /// Maximum number of permits on 32-bit platforms is `1<<29`.
141 pub(crate) fn new(permits: usize) -> Self {
142 assert!(
143 permits <= Self::MAX_PERMITS,
144 "a semaphore may not have more than MAX_PERMITS permits ({})",
145 Self::MAX_PERMITS
146 );
147
148 #[cfg(all(tokio_unstable, feature = "tracing"))]
149 let resource_span = {
150 let resource_span = tracing::trace_span!(
151 "runtime.resource",
152 concrete_type = "Semaphore",
153 kind = "Sync",
154 is_internal = true
155 );
156
157 resource_span.in_scope(|| {
158 tracing::trace!(
159 target: "runtime::resource::state_update",
160 permits = permits,
161 permits.op = "override",
162 )
163 });
164 resource_span
165 };
166
167 Self {
168 permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
169 waiters: Mutex::new(Waitlist {
170 queue: LinkedList::new(),
171 closed: false,
172 }),
173 #[cfg(all(tokio_unstable, feature = "tracing"))]
174 resource_span,
175 }
176 }
177
178 /// Creates a new semaphore with the initial number of permits.
179 ///
180 /// Maximum number of permits on 32-bit platforms is `1<<29`.
181 ///
182 /// If the specified number of permits exceeds the maximum permit amount
183 /// Then the value will get clamped to the maximum number of permits.
184 #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
185 pub(crate) const fn const_new(mut permits: usize) -> Self {
186 // NOTE: assertions and by extension panics are still being worked on: https://github.com/rust-lang/rust/issues/74925
187 // currently we just clamp the permit count when it exceeds the max
188 permits &= Self::MAX_PERMITS;
189
190 Self {
191 permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
192 waiters: Mutex::const_new(Waitlist {
193 queue: LinkedList::new(),
194 closed: false,
195 }),
196 #[cfg(all(tokio_unstable, feature = "tracing"))]
197 resource_span: tracing::Span::none(),
198 }
199 }
200
201 /// Returns the current number of available permits.
202 pub(crate) fn available_permits(&self) -> usize {
203 self.permits.load(Acquire) >> Self::PERMIT_SHIFT
204 }
205
206 /// Adds `added` new permits to the semaphore.
207 ///
208 /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
209 pub(crate) fn release(&self, added: usize) {
210 if added == 0 {
211 return;
212 }
213
214 // Assign permits to the wait queue
215 self.add_permits_locked(added, self.waiters.lock());
216 }
217
218 /// Closes the semaphore. This prevents the semaphore from issuing new
219 /// permits and notifies all pending waiters.
220 pub(crate) fn close(&self) {
221 let mut waiters = self.waiters.lock();
222 // If the semaphore's permits counter has enough permits for an
223 // unqueued waiter to acquire all the permits it needs immediately,
224 // it won't touch the wait list. Therefore, we have to set a bit on
225 // the permit counter as well. However, we must do this while
226 // holding the lock --- otherwise, if we set the bit and then wait
227 // to acquire the lock we'll enter an inconsistent state where the
228 // permit counter is closed, but the wait list is not.
229 self.permits.fetch_or(Self::CLOSED, Release);
230 waiters.closed = true;
231 while let Some(mut waiter) = waiters.queue.pop_back() {
232 let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
233 if let Some(waker) = waker {
234 waker.wake();
235 }
236 }
237 }
238
239 /// Returns true if the semaphore is closed.
240 pub(crate) fn is_closed(&self) -> bool {
241 self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
242 }
243
244 pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
245 assert!(
246 num_permits as usize <= Self::MAX_PERMITS,
247 "a semaphore may not have more than MAX_PERMITS permits ({})",
248 Self::MAX_PERMITS
249 );
250 let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
251 let mut curr = self.permits.load(Acquire);
252 loop {
253 // Has the semaphore closed?
254 if curr & Self::CLOSED == Self::CLOSED {
255 return Err(TryAcquireError::Closed);
256 }
257
258 // Are there enough permits remaining?
259 if curr < num_permits {
260 return Err(TryAcquireError::NoPermits);
261 }
262
263 let next = curr - num_permits;
264
265 match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
266 Ok(_) => {
267 // TODO: Instrument once issue has been solved
268 return Ok(());
269 }
270 Err(actual) => curr = actual,
271 }
272 }
273 }
274
275 pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
276 Acquire::new(self, num_permits)
277 }
278
279 /// Release `rem` permits to the semaphore's wait list, starting from the
280 /// end of the queue.
281 ///
282 /// If `rem` exceeds the number of permits needed by the wait list, the
283 /// remainder are assigned back to the semaphore.
284 fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
285 let mut wakers = WakeList::new();
286 let mut lock = Some(waiters);
287 let mut is_empty = false;
288 while rem > 0 {
289 let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
290 'inner: while wakers.can_push() {
291 // Was the waiter assigned enough permits to wake it?
292 match waiters.queue.last() {
293 Some(waiter) => {
294 if !waiter.assign_permits(&mut rem) {
295 break 'inner;
296 }
297 }
298 None => {
299 is_empty = true;
300 // If we assigned permits to all the waiters in the queue, and there are
301 // still permits left over, assign them back to the semaphore.
302 break 'inner;
303 }
304 };
305 let mut waiter = waiters.queue.pop_back().unwrap();
306 if let Some(waker) =
307 unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
308 {
309 wakers.push(waker);
310 }
311 }
312
313 if rem > 0 && is_empty {
314 let permits = rem;
315 assert!(
316 permits <= Self::MAX_PERMITS,
317 "cannot add more than MAX_PERMITS permits ({})",
318 Self::MAX_PERMITS
319 );
320 let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
321 let prev = prev >> Self::PERMIT_SHIFT;
322 assert!(
323 prev + permits <= Self::MAX_PERMITS,
324 "number of added permits ({}) would overflow MAX_PERMITS ({})",
325 rem,
326 Self::MAX_PERMITS
327 );
328
329 // add remaining permits back
330 #[cfg(all(tokio_unstable, feature = "tracing"))]
331 self.resource_span.in_scope(|| {
332 tracing::trace!(
333 target: "runtime::resource::state_update",
334 permits = rem,
335 permits.op = "add",
336 )
337 });
338
339 rem = 0;
340 }
341
342 drop(waiters); // release the lock
343
344 wakers.wake_all();
345 }
346
347 assert_eq!(rem, 0);
348 }
349
350 fn poll_acquire(
351 &self,
352 cx: &mut Context<'_>,
353 num_permits: u32,
354 node: Pin<&mut Waiter>,
355 queued: bool,
356 ) -> Poll<Result<(), AcquireError>> {
357 let mut acquired = 0;
358
359 let needed = if queued {
360 node.state.load(Acquire) << Self::PERMIT_SHIFT
361 } else {
362 (num_permits as usize) << Self::PERMIT_SHIFT
363 };
364
365 let mut lock = None;
366 // First, try to take the requested number of permits from the
367 // semaphore.
368 let mut curr = self.permits.load(Acquire);
369 let mut waiters = loop {
370 // Has the semaphore closed?
371 if curr & Self::CLOSED > 0 {
372 return Ready(Err(AcquireError::closed()));
373 }
374
375 let mut remaining = 0;
376 let total = curr
377 .checked_add(acquired)
378 .expect("number of permits must not overflow");
379 let (next, acq) = if total >= needed {
380 let next = curr - (needed - acquired);
381 (next, needed >> Self::PERMIT_SHIFT)
382 } else {
383 remaining = (needed - acquired) - curr;
384 (0, curr >> Self::PERMIT_SHIFT)
385 };
386
387 if remaining > 0 && lock.is_none() {
388 // No permits were immediately available, so this permit will
389 // (probably) need to wait. We'll need to acquire a lock on the
390 // wait queue before continuing. We need to do this _before_ the
391 // CAS that sets the new value of the semaphore's `permits`
392 // counter. Otherwise, if we subtract the permits and then
393 // acquire the lock, we might miss additional permits being
394 // added while waiting for the lock.
395 lock = Some(self.waiters.lock());
396 }
397
398 match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
399 Ok(_) => {
400 acquired += acq;
401 if remaining == 0 {
402 if !queued {
403 #[cfg(all(tokio_unstable, feature = "tracing"))]
404 self.resource_span.in_scope(|| {
405 tracing::trace!(
406 target: "runtime::resource::state_update",
407 permits = acquired,
408 permits.op = "sub",
409 );
410 tracing::trace!(
411 target: "runtime::resource::async_op::state_update",
412 permits_obtained = acquired,
413 permits.op = "add",
414 )
415 });
416
417 return Ready(Ok(()));
418 } else if lock.is_none() {
419 break self.waiters.lock();
420 }
421 }
422 break lock.expect("lock must be acquired before waiting");
423 }
424 Err(actual) => curr = actual,
425 }
426 };
427
428 if waiters.closed {
429 return Ready(Err(AcquireError::closed()));
430 }
431
432 #[cfg(all(tokio_unstable, feature = "tracing"))]
433 self.resource_span.in_scope(|| {
434 tracing::trace!(
435 target: "runtime::resource::state_update",
436 permits = acquired,
437 permits.op = "sub",
438 )
439 });
440
441 if node.assign_permits(&mut acquired) {
442 self.add_permits_locked(acquired, waiters);
443 return Ready(Ok(()));
444 }
445
446 assert_eq!(acquired, 0);
447 let mut old_waker = None;
448
449 // Otherwise, register the waker & enqueue the node.
450 node.waker.with_mut(|waker| {
451 // Safety: the wait list is locked, so we may modify the waker.
452 let waker = unsafe { &mut *waker };
453 // Do we need to register the new waker?
454 if waker
455 .as_ref()
456 .map(|waker| !waker.will_wake(cx.waker()))
457 .unwrap_or(true)
458 {
459 old_waker = std::mem::replace(waker, Some(cx.waker().clone()));
460 }
461 });
462
463 // If the waiter is not already in the wait queue, enqueue it.
464 if !queued {
465 let node = unsafe {
466 let node = Pin::into_inner_unchecked(node) as *mut _;
467 NonNull::new_unchecked(node)
468 };
469
470 waiters.queue.push_front(node);
471 }
472 drop(waiters);
473 drop(old_waker);
474
475 Pending
476 }
477}
478
479impl fmt::Debug for Semaphore {
480 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
481 fmt&mut DebugStruct<'_, '_>.debug_struct("Semaphore")
482 .field(name:"permits", &self.available_permits())
483 .finish()
484 }
485}
486
487impl Waiter {
488 fn new(
489 num_permits: u32,
490 #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx,
491 ) -> Self {
492 Waiter {
493 waker: UnsafeCell::new(None),
494 state: AtomicUsize::new(num_permits as usize),
495 pointers: linked_list::Pointers::new(),
496 #[cfg(all(tokio_unstable, feature = "tracing"))]
497 ctx,
498 _p: PhantomPinned,
499 }
500 }
501
502 /// Assign permits to the waiter.
503 ///
504 /// Returns `true` if the waiter should be removed from the queue
505 fn assign_permits(&self, n: &mut usize) -> bool {
506 let mut curr = self.state.load(Acquire);
507 loop {
508 let assign = cmp::min(curr, *n);
509 let next = curr - assign;
510 match self.state.compare_exchange(curr, next, AcqRel, Acquire) {
511 Ok(_) => {
512 *n -= assign;
513 #[cfg(all(tokio_unstable, feature = "tracing"))]
514 self.ctx.async_op_span.in_scope(|| {
515 tracing::trace!(
516 target: "runtime::resource::async_op::state_update",
517 permits_obtained = assign,
518 permits.op = "add",
519 );
520 });
521 return next == 0;
522 }
523 Err(actual) => curr = actual,
524 }
525 }
526 }
527}
528
529impl Future for Acquire<'_> {
530 type Output = Result<(), AcquireError>;
531
532 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
533 #[cfg(all(tokio_unstable, feature = "tracing"))]
534 let _resource_span = self.node.ctx.resource_span.clone().entered();
535 #[cfg(all(tokio_unstable, feature = "tracing"))]
536 let _async_op_span = self.node.ctx.async_op_span.clone().entered();
537 #[cfg(all(tokio_unstable, feature = "tracing"))]
538 let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered();
539
540 let (node, semaphore, needed, queued) = self.project();
541
542 // First, ensure the current task has enough budget to proceed.
543 #[cfg(all(tokio_unstable, feature = "tracing"))]
544 let coop = ready!(trace_poll_op!(
545 "poll_acquire",
546 crate::runtime::coop::poll_proceed(cx),
547 ));
548
549 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
550 let coop = ready!(crate::runtime::coop::poll_proceed(cx));
551
552 let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
553 Pending => {
554 *queued = true;
555 Pending
556 }
557 Ready(r) => {
558 coop.made_progress();
559 r?;
560 *queued = false;
561 Ready(Ok(()))
562 }
563 };
564
565 #[cfg(all(tokio_unstable, feature = "tracing"))]
566 return trace_poll_op!("poll_acquire", result);
567
568 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
569 return result;
570 }
571}
572
573impl<'a> Acquire<'a> {
574 fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self {
575 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
576 return Self {
577 node: Waiter::new(num_permits),
578 semaphore,
579 num_permits,
580 queued: false,
581 };
582
583 #[cfg(all(tokio_unstable, feature = "tracing"))]
584 return semaphore.resource_span.in_scope(|| {
585 let async_op_span =
586 tracing::trace_span!("runtime.resource.async_op", source = "Acquire::new");
587 let async_op_poll_span = async_op_span.in_scope(|| {
588 tracing::trace!(
589 target: "runtime::resource::async_op::state_update",
590 permits_requested = num_permits,
591 permits.op = "override",
592 );
593
594 tracing::trace!(
595 target: "runtime::resource::async_op::state_update",
596 permits_obtained = 0usize,
597 permits.op = "override",
598 );
599
600 tracing::trace_span!("runtime.resource.async_op.poll")
601 });
602
603 let ctx = trace::AsyncOpTracingCtx {
604 async_op_span,
605 async_op_poll_span,
606 resource_span: semaphore.resource_span.clone(),
607 };
608
609 Self {
610 node: Waiter::new(num_permits, ctx),
611 semaphore,
612 num_permits,
613 queued: false,
614 }
615 });
616 }
617
618 fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) {
619 fn is_unpin<T: Unpin>() {}
620 unsafe {
621 // Safety: all fields other than `node` are `Unpin`
622
623 is_unpin::<&Semaphore>();
624 is_unpin::<&mut bool>();
625 is_unpin::<u32>();
626
627 let this = self.get_unchecked_mut();
628 (
629 Pin::new_unchecked(&mut this.node),
630 this.semaphore,
631 this.num_permits,
632 &mut this.queued,
633 )
634 }
635 }
636}
637
638impl Drop for Acquire<'_> {
639 fn drop(&mut self) {
640 // If the future is completed, there is no node in the wait list, so we
641 // can skip acquiring the lock.
642 if !self.queued {
643 return;
644 }
645
646 // This is where we ensure safety. The future is being dropped,
647 // which means we must ensure that the waiter entry is no longer stored
648 // in the linked list.
649 let mut waiters: MutexGuard<'_, Waitlist> = self.semaphore.waiters.lock();
650
651 // remove the entry from the list
652 let node: NonNull = NonNull::from(&mut self.node);
653 // Safety: we have locked the wait list.
654 unsafe { waiters.queue.remove(node) };
655
656 let acquired_permits: usize = self.num_permits as usize - self.node.state.load(order:Acquire);
657 if acquired_permits > 0 {
658 self.semaphore.add_permits_locked(rem:acquired_permits, waiters);
659 }
660 }
661}
662
// Safety: the `Acquire` future is not `Sync` automatically because it contains
// a `Waiter`, which, in turn, contains an `UnsafeCell`. However, through the
// future itself the `UnsafeCell` is only accessed when the future is borrowed
// mutably (either in `poll` or in `drop`). Therefore, it is safe (although not
// particularly _useful_) for the future to be borrowed immutably across
// threads.
unsafe impl Sync for Acquire<'_> {}
669
670// ===== impl AcquireError ====
671
672impl AcquireError {
673 fn closed() -> AcquireError {
674 AcquireError(())
675 }
676}
677
678impl fmt::Display for AcquireError {
679 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
680 write!(fmt, "semaphore closed")
681 }
682}
683
// `AcquireError` wraps no underlying cause, so the default `Error` methods are
// used as-is; `Debug` and `Display` are provided above.
impl std::error::Error for AcquireError {}
685
686// ===== impl TryAcquireError =====
687
688impl TryAcquireError {
689 /// Returns `true` if the error was caused by a closed semaphore.
690 #[allow(dead_code)] // may be used later!
691 pub(crate) fn is_closed(&self) -> bool {
692 matches!(self, TryAcquireError::Closed)
693 }
694
695 /// Returns `true` if the error was caused by calling `try_acquire` on a
696 /// semaphore with no available permits.
697 #[allow(dead_code)] // may be used later!
698 pub(crate) fn is_no_permits(&self) -> bool {
699 matches!(self, TryAcquireError::NoPermits)
700 }
701}
702
703impl fmt::Display for TryAcquireError {
704 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
705 match self {
706 TryAcquireError::Closed => write!(fmt, "semaphore closed"),
707 TryAcquireError::NoPermits => write!(fmt, "no permits available"),
708 }
709 }
710}
711
// `TryAcquireError` wraps no underlying cause, so the default `Error` methods
// are used as-is; `Debug` is derived and `Display` is provided above.
impl std::error::Error for TryAcquireError {}
713
714/// # Safety
715///
716/// `Waiter` is forced to be !Unpin.
717unsafe impl linked_list::Link for Waiter {
718 type Handle = NonNull<Waiter>;
719 type Target = Waiter;
720
721 fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
722 *handle
723 }
724
725 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
726 ptr
727 }
728
729 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
730 Waiter::addr_of_pointers(me:target)
731 }
732}
733