1#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
2//! # Implementation Details.
3//!
4//! The semaphore is implemented using an intrusive linked list of waiters. An
5//! atomic counter tracks the number of available permits. If the semaphore does
6//! not contain the required number of permits, the task attempting to acquire
7//! permits places its waker at the end of a queue. When new permits are made
8//! available (such as by releasing an initial acquisition), they are assigned
9//! to the task at the front of the queue, waking that task if its requested
10//! number of permits is met.
11//!
12//! Because waiters are enqueued at the back of the linked list and dequeued
13//! from the front, the semaphore is fair. Tasks trying to acquire large numbers
14//! of permits at a time will always be woken eventually, even if many other
15//! tasks are acquiring smaller numbers of permits. This means that in a
16//! use-case like tokio's read-write lock, writers will not be starved by
17//! readers.
18use crate::loom::cell::UnsafeCell;
19use crate::loom::sync::atomic::AtomicUsize;
20use crate::loom::sync::{Mutex, MutexGuard};
21use crate::util::linked_list::{self, LinkedList};
22#[cfg(all(tokio_unstable, feature = "tracing"))]
23use crate::util::trace;
24use crate::util::WakeList;
25
26use std::future::Future;
27use std::marker::PhantomPinned;
28use std::pin::Pin;
29use std::ptr::NonNull;
30use std::sync::atomic::Ordering::*;
31use std::task::{Context, Poll, Waker};
32use std::{cmp, fmt};
33
34/// An asynchronous counting semaphore which permits waiting on multiple permits at once.
35pub(crate) struct Semaphore {
36 waiters: Mutex<Waitlist>,
37 /// The current number of available permits in the semaphore.
38 permits: AtomicUsize,
39 #[cfg(all(tokio_unstable, feature = "tracing"))]
40 resource_span: tracing::Span,
41}
42
43struct Waitlist {
44 queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
45 closed: bool,
46}
47
48/// Error returned from the [`Semaphore::try_acquire`] function.
49///
50/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
51#[derive(Debug, PartialEq, Eq)]
52pub enum TryAcquireError {
53 /// The semaphore has been [closed] and cannot issue new permits.
54 ///
55 /// [closed]: crate::sync::Semaphore::close
56 Closed,
57
58 /// The semaphore has no available permits.
59 NoPermits,
60}
61/// Error returned from the [`Semaphore::acquire`] function.
62///
63/// An `acquire` operation can only fail if the semaphore has been
64/// [closed].
65///
66/// [closed]: crate::sync::Semaphore::close
67/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
68#[derive(Debug)]
69pub struct AcquireError(());
70
71pub(crate) struct Acquire<'a> {
72 node: Waiter,
73 semaphore: &'a Semaphore,
74 num_permits: usize,
75 queued: bool,
76}
77
78/// An entry in the wait queue.
79struct Waiter {
80 /// The current state of the waiter.
81 ///
82 /// This is either the number of remaining permits required by
83 /// the waiter, or a flag indicating that the waiter is not yet queued.
84 state: AtomicUsize,
85
86 /// The waker to notify the task awaiting permits.
87 ///
88 /// # Safety
89 ///
90 /// This may only be accessed while the wait queue is locked.
91 waker: UnsafeCell<Option<Waker>>,
92
93 /// Intrusive linked-list pointers.
94 ///
95 /// # Safety
96 ///
97 /// This may only be accessed while the wait queue is locked.
98 ///
99 /// TODO: Ideally, we would be able to use loom to enforce that
100 /// this isn't accessed concurrently. However, it is difficult to
101 /// use a `UnsafeCell` here, since the `Link` trait requires _returning_
102 /// references to `Pointers`, and `UnsafeCell` requires that checked access
103 /// take place inside a closure. We should consider changing `Pointers` to
104 /// use `UnsafeCell` internally.
105 pointers: linked_list::Pointers<Waiter>,
106
107 #[cfg(all(tokio_unstable, feature = "tracing"))]
108 ctx: trace::AsyncOpTracingCtx,
109
110 /// Should not be `Unpin`.
111 _p: PhantomPinned,
112}
113
114generate_addr_of_methods! {
115 impl<> Waiter {
116 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
117 &self.pointers
118 }
119 }
120}
121
122impl Semaphore {
123 /// The maximum number of permits which a semaphore can hold.
124 ///
125 /// Note that this reserves three bits of flags in the permit counter, but
126 /// we only actually use one of them. However, the previous semaphore
127 /// implementation used three bits, so we will continue to reserve them to
128 /// avoid a breaking change if additional flags need to be added in the
129 /// future.
130 pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3;
131 const CLOSED: usize = 1;
132 // The least-significant bit in the number of permits is reserved to use
133 // as a flag indicating that the semaphore has been closed. Consequently
134 // PERMIT_SHIFT is used to leave that bit for that purpose.
135 const PERMIT_SHIFT: usize = 1;
136
137 /// Creates a new semaphore with the initial number of permits
138 ///
139 /// Maximum number of permits on 32-bit platforms is `1<<29`.
140 pub(crate) fn new(permits: usize) -> Self {
141 assert!(
142 permits <= Self::MAX_PERMITS,
143 "a semaphore may not have more than MAX_PERMITS permits ({})",
144 Self::MAX_PERMITS
145 );
146
147 #[cfg(all(tokio_unstable, feature = "tracing"))]
148 let resource_span = {
149 let resource_span = tracing::trace_span!(
150 "runtime.resource",
151 concrete_type = "Semaphore",
152 kind = "Sync",
153 is_internal = true
154 );
155
156 resource_span.in_scope(|| {
157 tracing::trace!(
158 target: "runtime::resource::state_update",
159 permits = permits,
160 permits.op = "override",
161 )
162 });
163 resource_span
164 };
165
166 Self {
167 permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
168 waiters: Mutex::new(Waitlist {
169 queue: LinkedList::new(),
170 closed: false,
171 }),
172 #[cfg(all(tokio_unstable, feature = "tracing"))]
173 resource_span,
174 }
175 }
176
177 /// Creates a new semaphore with the initial number of permits.
178 ///
179 /// Maximum number of permits on 32-bit platforms is `1<<29`.
180 #[cfg(not(all(loom, test)))]
181 pub(crate) const fn const_new(permits: usize) -> Self {
182 assert!(permits <= Self::MAX_PERMITS);
183
184 Self {
185 permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
186 waiters: Mutex::const_new(Waitlist {
187 queue: LinkedList::new(),
188 closed: false,
189 }),
190 #[cfg(all(tokio_unstable, feature = "tracing"))]
191 resource_span: tracing::Span::none(),
192 }
193 }
194
195 /// Creates a new closed semaphore with 0 permits.
196 pub(crate) fn new_closed() -> Self {
197 Self {
198 permits: AtomicUsize::new(Self::CLOSED),
199 waiters: Mutex::new(Waitlist {
200 queue: LinkedList::new(),
201 closed: true,
202 }),
203 #[cfg(all(tokio_unstable, feature = "tracing"))]
204 resource_span: tracing::Span::none(),
205 }
206 }
207
208 /// Creates a new closed semaphore with 0 permits.
209 #[cfg(not(all(loom, test)))]
210 pub(crate) const fn const_new_closed() -> Self {
211 Self {
212 permits: AtomicUsize::new(Self::CLOSED),
213 waiters: Mutex::const_new(Waitlist {
214 queue: LinkedList::new(),
215 closed: true,
216 }),
217 #[cfg(all(tokio_unstable, feature = "tracing"))]
218 resource_span: tracing::Span::none(),
219 }
220 }
221
222 /// Returns the current number of available permits.
223 pub(crate) fn available_permits(&self) -> usize {
224 self.permits.load(Acquire) >> Self::PERMIT_SHIFT
225 }
226
227 /// Adds `added` new permits to the semaphore.
228 ///
229 /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
230 pub(crate) fn release(&self, added: usize) {
231 if added == 0 {
232 return;
233 }
234
235 // Assign permits to the wait queue
236 self.add_permits_locked(added, self.waiters.lock());
237 }
238
239 /// Closes the semaphore. This prevents the semaphore from issuing new
240 /// permits and notifies all pending waiters.
241 pub(crate) fn close(&self) {
242 let mut waiters = self.waiters.lock();
243 // If the semaphore's permits counter has enough permits for an
244 // unqueued waiter to acquire all the permits it needs immediately,
245 // it won't touch the wait list. Therefore, we have to set a bit on
246 // the permit counter as well. However, we must do this while
247 // holding the lock --- otherwise, if we set the bit and then wait
248 // to acquire the lock we'll enter an inconsistent state where the
249 // permit counter is closed, but the wait list is not.
250 self.permits.fetch_or(Self::CLOSED, Release);
251 waiters.closed = true;
252 while let Some(mut waiter) = waiters.queue.pop_back() {
253 let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
254 if let Some(waker) = waker {
255 waker.wake();
256 }
257 }
258 }
259
260 /// Returns true if the semaphore is closed.
261 pub(crate) fn is_closed(&self) -> bool {
262 self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
263 }
264
265 pub(crate) fn try_acquire(&self, num_permits: usize) -> Result<(), TryAcquireError> {
266 assert!(
267 num_permits <= Self::MAX_PERMITS,
268 "a semaphore may not have more than MAX_PERMITS permits ({})",
269 Self::MAX_PERMITS
270 );
271 let num_permits = num_permits << Self::PERMIT_SHIFT;
272 let mut curr = self.permits.load(Acquire);
273 loop {
274 // Has the semaphore closed?
275 if curr & Self::CLOSED == Self::CLOSED {
276 return Err(TryAcquireError::Closed);
277 }
278
279 // Are there enough permits remaining?
280 if curr < num_permits {
281 return Err(TryAcquireError::NoPermits);
282 }
283
284 let next = curr - num_permits;
285
286 match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
287 Ok(_) => {
288 // TODO: Instrument once issue has been solved
289 return Ok(());
290 }
291 Err(actual) => curr = actual,
292 }
293 }
294 }
295
296 pub(crate) fn acquire(&self, num_permits: usize) -> Acquire<'_> {
297 Acquire::new(self, num_permits)
298 }
299
300 /// Release `rem` permits to the semaphore's wait list, starting from the
301 /// end of the queue.
302 ///
303 /// If `rem` exceeds the number of permits needed by the wait list, the
304 /// remainder are assigned back to the semaphore.
305 fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
306 let mut wakers = WakeList::new();
307 let mut lock = Some(waiters);
308 let mut is_empty = false;
309 while rem > 0 {
310 let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
311 'inner: while wakers.can_push() {
312 // Was the waiter assigned enough permits to wake it?
313 match waiters.queue.last() {
314 Some(waiter) => {
315 if !waiter.assign_permits(&mut rem) {
316 break 'inner;
317 }
318 }
319 None => {
320 is_empty = true;
321 // If we assigned permits to all the waiters in the queue, and there are
322 // still permits left over, assign them back to the semaphore.
323 break 'inner;
324 }
325 };
326 let mut waiter = waiters.queue.pop_back().unwrap();
327 if let Some(waker) =
328 unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
329 {
330 wakers.push(waker);
331 }
332 }
333
334 if rem > 0 && is_empty {
335 let permits = rem;
336 assert!(
337 permits <= Self::MAX_PERMITS,
338 "cannot add more than MAX_PERMITS permits ({})",
339 Self::MAX_PERMITS
340 );
341 let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
342 let prev = prev >> Self::PERMIT_SHIFT;
343 assert!(
344 prev + permits <= Self::MAX_PERMITS,
345 "number of added permits ({}) would overflow MAX_PERMITS ({})",
346 rem,
347 Self::MAX_PERMITS
348 );
349
350 // add remaining permits back
351 #[cfg(all(tokio_unstable, feature = "tracing"))]
352 self.resource_span.in_scope(|| {
353 tracing::trace!(
354 target: "runtime::resource::state_update",
355 permits = rem,
356 permits.op = "add",
357 )
358 });
359
360 rem = 0;
361 }
362
363 drop(waiters); // release the lock
364
365 wakers.wake_all();
366 }
367
368 assert_eq!(rem, 0);
369 }
370
371 fn poll_acquire(
372 &self,
373 cx: &mut Context<'_>,
374 num_permits: usize,
375 node: Pin<&mut Waiter>,
376 queued: bool,
377 ) -> Poll<Result<(), AcquireError>> {
378 let mut acquired = 0;
379
380 let needed = if queued {
381 node.state.load(Acquire) << Self::PERMIT_SHIFT
382 } else {
383 num_permits << Self::PERMIT_SHIFT
384 };
385
386 let mut lock = None;
387 // First, try to take the requested number of permits from the
388 // semaphore.
389 let mut curr = self.permits.load(Acquire);
390 let mut waiters = loop {
391 // Has the semaphore closed?
392 if curr & Self::CLOSED > 0 {
393 return Poll::Ready(Err(AcquireError::closed()));
394 }
395
396 let mut remaining = 0;
397 let total = curr
398 .checked_add(acquired)
399 .expect("number of permits must not overflow");
400 let (next, acq) = if total >= needed {
401 let next = curr - (needed - acquired);
402 (next, needed >> Self::PERMIT_SHIFT)
403 } else {
404 remaining = (needed - acquired) - curr;
405 (0, curr >> Self::PERMIT_SHIFT)
406 };
407
408 if remaining > 0 && lock.is_none() {
409 // No permits were immediately available, so this permit will
410 // (probably) need to wait. We'll need to acquire a lock on the
411 // wait queue before continuing. We need to do this _before_ the
412 // CAS that sets the new value of the semaphore's `permits`
413 // counter. Otherwise, if we subtract the permits and then
414 // acquire the lock, we might miss additional permits being
415 // added while waiting for the lock.
416 lock = Some(self.waiters.lock());
417 }
418
419 match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
420 Ok(_) => {
421 acquired += acq;
422 if remaining == 0 {
423 if !queued {
424 #[cfg(all(tokio_unstable, feature = "tracing"))]
425 self.resource_span.in_scope(|| {
426 tracing::trace!(
427 target: "runtime::resource::state_update",
428 permits = acquired,
429 permits.op = "sub",
430 );
431 tracing::trace!(
432 target: "runtime::resource::async_op::state_update",
433 permits_obtained = acquired,
434 permits.op = "add",
435 )
436 });
437
438 return Poll::Ready(Ok(()));
439 } else if lock.is_none() {
440 break self.waiters.lock();
441 }
442 }
443 break lock.expect("lock must be acquired before waiting");
444 }
445 Err(actual) => curr = actual,
446 }
447 };
448
449 if waiters.closed {
450 return Poll::Ready(Err(AcquireError::closed()));
451 }
452
453 #[cfg(all(tokio_unstable, feature = "tracing"))]
454 self.resource_span.in_scope(|| {
455 tracing::trace!(
456 target: "runtime::resource::state_update",
457 permits = acquired,
458 permits.op = "sub",
459 )
460 });
461
462 if node.assign_permits(&mut acquired) {
463 self.add_permits_locked(acquired, waiters);
464 return Poll::Ready(Ok(()));
465 }
466
467 assert_eq!(acquired, 0);
468 let mut old_waker = None;
469
470 // Otherwise, register the waker & enqueue the node.
471 node.waker.with_mut(|waker| {
472 // Safety: the wait list is locked, so we may modify the waker.
473 let waker = unsafe { &mut *waker };
474 // Do we need to register the new waker?
475 if waker
476 .as_ref()
477 .map_or(true, |waker| !waker.will_wake(cx.waker()))
478 {
479 old_waker = std::mem::replace(waker, Some(cx.waker().clone()));
480 }
481 });
482
483 // If the waiter is not already in the wait queue, enqueue it.
484 if !queued {
485 let node = unsafe {
486 let node = Pin::into_inner_unchecked(node) as *mut _;
487 NonNull::new_unchecked(node)
488 };
489
490 waiters.queue.push_front(node);
491 }
492 drop(waiters);
493 drop(old_waker);
494
495 Poll::Pending
496 }
497}
498
499impl fmt::Debug for Semaphore {
500 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
501 fmt&mut DebugStruct<'_, '_>.debug_struct("Semaphore")
502 .field(name:"permits", &self.available_permits())
503 .finish()
504 }
505}
506
507impl Waiter {
508 fn new(
509 num_permits: usize,
510 #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx,
511 ) -> Self {
512 Waiter {
513 waker: UnsafeCell::new(None),
514 state: AtomicUsize::new(num_permits),
515 pointers: linked_list::Pointers::new(),
516 #[cfg(all(tokio_unstable, feature = "tracing"))]
517 ctx,
518 _p: PhantomPinned,
519 }
520 }
521
522 /// Assign permits to the waiter.
523 ///
524 /// Returns `true` if the waiter should be removed from the queue
525 fn assign_permits(&self, n: &mut usize) -> bool {
526 let mut curr = self.state.load(Acquire);
527 loop {
528 let assign = cmp::min(curr, *n);
529 let next = curr - assign;
530 match self.state.compare_exchange(curr, next, AcqRel, Acquire) {
531 Ok(_) => {
532 *n -= assign;
533 #[cfg(all(tokio_unstable, feature = "tracing"))]
534 self.ctx.async_op_span.in_scope(|| {
535 tracing::trace!(
536 target: "runtime::resource::async_op::state_update",
537 permits_obtained = assign,
538 permits.op = "add",
539 );
540 });
541 return next == 0;
542 }
543 Err(actual) => curr = actual,
544 }
545 }
546 }
547}
548
549impl Future for Acquire<'_> {
550 type Output = Result<(), AcquireError>;
551
552 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
553 #[cfg(all(tokio_unstable, feature = "tracing"))]
554 let _resource_span = self.node.ctx.resource_span.clone().entered();
555 #[cfg(all(tokio_unstable, feature = "tracing"))]
556 let _async_op_span = self.node.ctx.async_op_span.clone().entered();
557 #[cfg(all(tokio_unstable, feature = "tracing"))]
558 let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered();
559
560 let (node, semaphore, needed, queued) = self.project();
561
562 // First, ensure the current task has enough budget to proceed.
563 #[cfg(all(tokio_unstable, feature = "tracing"))]
564 let coop = ready!(trace_poll_op!(
565 "poll_acquire",
566 crate::runtime::coop::poll_proceed(cx),
567 ));
568
569 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
570 let coop = ready!(crate::runtime::coop::poll_proceed(cx));
571
572 let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
573 Poll::Pending => {
574 *queued = true;
575 Poll::Pending
576 }
577 Poll::Ready(r) => {
578 coop.made_progress();
579 r?;
580 *queued = false;
581 Poll::Ready(Ok(()))
582 }
583 };
584
585 #[cfg(all(tokio_unstable, feature = "tracing"))]
586 return trace_poll_op!("poll_acquire", result);
587
588 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
589 return result;
590 }
591}
592
593impl<'a> Acquire<'a> {
594 fn new(semaphore: &'a Semaphore, num_permits: usize) -> Self {
595 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
596 return Self {
597 node: Waiter::new(num_permits),
598 semaphore,
599 num_permits,
600 queued: false,
601 };
602
603 #[cfg(all(tokio_unstable, feature = "tracing"))]
604 return semaphore.resource_span.in_scope(|| {
605 let async_op_span =
606 tracing::trace_span!("runtime.resource.async_op", source = "Acquire::new");
607 let async_op_poll_span = async_op_span.in_scope(|| {
608 tracing::trace!(
609 target: "runtime::resource::async_op::state_update",
610 permits_requested = num_permits,
611 permits.op = "override",
612 );
613
614 tracing::trace!(
615 target: "runtime::resource::async_op::state_update",
616 permits_obtained = 0usize,
617 permits.op = "override",
618 );
619
620 tracing::trace_span!("runtime.resource.async_op.poll")
621 });
622
623 let ctx = trace::AsyncOpTracingCtx {
624 async_op_span,
625 async_op_poll_span,
626 resource_span: semaphore.resource_span.clone(),
627 };
628
629 Self {
630 node: Waiter::new(num_permits, ctx),
631 semaphore,
632 num_permits,
633 queued: false,
634 }
635 });
636 }
637
638 fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, usize, &mut bool) {
639 fn is_unpin<T: Unpin>() {}
640 unsafe {
641 // Safety: all fields other than `node` are `Unpin`
642
643 is_unpin::<&Semaphore>();
644 is_unpin::<&mut bool>();
645 is_unpin::<usize>();
646
647 let this = self.get_unchecked_mut();
648 (
649 Pin::new_unchecked(&mut this.node),
650 this.semaphore,
651 this.num_permits,
652 &mut this.queued,
653 )
654 }
655 }
656}
657
658impl Drop for Acquire<'_> {
659 fn drop(&mut self) {
660 // If the future is completed, there is no node in the wait list, so we
661 // can skip acquiring the lock.
662 if !self.queued {
663 return;
664 }
665
666 // This is where we ensure safety. The future is being dropped,
667 // which means we must ensure that the waiter entry is no longer stored
668 // in the linked list.
669 let mut waiters: MutexGuard<'_, Waitlist> = self.semaphore.waiters.lock();
670
671 // remove the entry from the list
672 let node: NonNull = NonNull::from(&mut self.node);
673 // Safety: we have locked the wait list.
674 unsafe { waiters.queue.remove(node) };
675
676 let acquired_permits: usize = self.num_permits - self.node.state.load(order:Acquire);
677 if acquired_permits > 0 {
678 self.semaphore.add_permits_locked(rem:acquired_permits, waiters);
679 }
680 }
681}
682
683// Safety: the `Acquire` future is not `Sync` automatically because it contains
684// a `Waiter`, which, in turn, contains an `UnsafeCell`. However, the
685// `UnsafeCell` is only accessed when the future is borrowed mutably (either in
686// `poll` or in `drop`). Therefore, it is safe (although not particularly
687// _useful_) for the future to be borrowed immutably across threads.
688unsafe impl Sync for Acquire<'_> {}
689
690// ===== impl AcquireError ====
691
692impl AcquireError {
693 fn closed() -> AcquireError {
694 AcquireError(())
695 }
696}
697
698impl fmt::Display for AcquireError {
699 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
700 write!(fmt, "semaphore closed")
701 }
702}
703
704impl std::error::Error for AcquireError {}
705
706// ===== impl TryAcquireError =====
707
708impl TryAcquireError {
709 /// Returns `true` if the error was caused by a closed semaphore.
710 #[allow(dead_code)] // may be used later!
711 pub(crate) fn is_closed(&self) -> bool {
712 matches!(self, TryAcquireError::Closed)
713 }
714
715 /// Returns `true` if the error was caused by calling `try_acquire` on a
716 /// semaphore with no available permits.
717 #[allow(dead_code)] // may be used later!
718 pub(crate) fn is_no_permits(&self) -> bool {
719 matches!(self, TryAcquireError::NoPermits)
720 }
721}
722
723impl fmt::Display for TryAcquireError {
724 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
725 match self {
726 TryAcquireError::Closed => write!(fmt, "semaphore closed"),
727 TryAcquireError::NoPermits => write!(fmt, "no permits available"),
728 }
729 }
730}
731
732impl std::error::Error for TryAcquireError {}
733
734/// # Safety
735///
736/// `Waiter` is forced to be !Unpin.
737unsafe impl linked_list::Link for Waiter {
738 type Handle = NonNull<Waiter>;
739 type Target = Waiter;
740
741 fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
742 *handle
743 }
744
745 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
746 ptr
747 }
748
749 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
750 Waiter::addr_of_pointers(me:target)
751 }
752}
753