1 | #![cfg_attr (not(feature = "sync" ), allow(unreachable_pub, dead_code))] |
2 | //! # Implementation Details. |
3 | //! |
4 | //! The semaphore is implemented using an intrusive linked list of waiters. An |
5 | //! atomic counter tracks the number of available permits. If the semaphore does |
6 | //! not contain the required number of permits, the task attempting to acquire |
7 | //! permits places its waker at the end of a queue. When new permits are made |
8 | //! available (such as by releasing an initial acquisition), they are assigned |
9 | //! to the task at the front of the queue, waking that task if its requested |
10 | //! number of permits is met. |
11 | //! |
12 | //! Because waiters are enqueued at the back of the linked list and dequeued |
13 | //! from the front, the semaphore is fair. Tasks trying to acquire large numbers |
14 | //! of permits at a time will always be woken eventually, even if many other |
15 | //! tasks are acquiring smaller numbers of permits. This means that in a |
16 | //! use-case like tokio's read-write lock, writers will not be starved by |
17 | //! readers. |
18 | use crate::loom::cell::UnsafeCell; |
19 | use crate::loom::sync::atomic::AtomicUsize; |
20 | use crate::loom::sync::{Mutex, MutexGuard}; |
21 | use crate::util::linked_list::{self, LinkedList}; |
22 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
23 | use crate::util::trace; |
24 | use crate::util::WakeList; |
25 | |
26 | use std::future::Future; |
27 | use std::marker::PhantomPinned; |
28 | use std::pin::Pin; |
29 | use std::ptr::NonNull; |
30 | use std::sync::atomic::Ordering::*; |
31 | use std::task::Poll::*; |
32 | use std::task::{Context, Poll, Waker}; |
33 | use std::{cmp, fmt}; |
34 | |
35 | /// An asynchronous counting semaphore which permits waiting on multiple permits at once. |
36 | pub(crate) struct Semaphore { |
37 | waiters: Mutex<Waitlist>, |
38 | /// The current number of available permits in the semaphore. |
39 | permits: AtomicUsize, |
40 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
41 | resource_span: tracing::Span, |
42 | } |
43 | |
44 | struct Waitlist { |
45 | queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>, |
46 | closed: bool, |
47 | } |
48 | |
/// The error produced by [`Semaphore::try_acquire`] when permits cannot be
/// handed out immediately.
///
/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
#[derive(Debug, Eq, PartialEq)]
pub enum TryAcquireError {
    /// No new permits can be issued because the semaphore was [closed].
    ///
    /// [closed]: crate::sync::Semaphore::close
    Closed,

    /// The semaphore does not currently have enough permits available.
    NoPermits,
}
/// The error produced by [`Semaphore::acquire`].
///
/// The only way an `acquire` operation fails is when the semaphore has been
/// [closed].
///
/// [closed]: crate::sync::Semaphore::close
/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
#[derive(Debug)]
pub struct AcquireError(());
71 | |
72 | pub(crate) struct Acquire<'a> { |
73 | node: Waiter, |
74 | semaphore: &'a Semaphore, |
75 | num_permits: u32, |
76 | queued: bool, |
77 | } |
78 | |
79 | /// An entry in the wait queue. |
80 | struct Waiter { |
81 | /// The current state of the waiter. |
82 | /// |
83 | /// This is either the number of remaining permits required by |
84 | /// the waiter, or a flag indicating that the waiter is not yet queued. |
85 | state: AtomicUsize, |
86 | |
87 | /// The waker to notify the task awaiting permits. |
88 | /// |
89 | /// # Safety |
90 | /// |
91 | /// This may only be accessed while the wait queue is locked. |
92 | waker: UnsafeCell<Option<Waker>>, |
93 | |
94 | /// Intrusive linked-list pointers. |
95 | /// |
96 | /// # Safety |
97 | /// |
98 | /// This may only be accessed while the wait queue is locked. |
99 | /// |
100 | /// TODO: Ideally, we would be able to use loom to enforce that |
101 | /// this isn't accessed concurrently. However, it is difficult to |
102 | /// use a `UnsafeCell` here, since the `Link` trait requires _returning_ |
103 | /// references to `Pointers`, and `UnsafeCell` requires that checked access |
104 | /// take place inside a closure. We should consider changing `Pointers` to |
105 | /// use `UnsafeCell` internally. |
106 | pointers: linked_list::Pointers<Waiter>, |
107 | |
108 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
109 | ctx: trace::AsyncOpTracingCtx, |
110 | |
111 | /// Should not be `Unpin`. |
112 | _p: PhantomPinned, |
113 | } |
114 | |
// Declares `Waiter::addr_of_pointers`, which turns a `NonNull<Waiter>` into a
// `NonNull` to that waiter's `pointers` field. The `Link` implementation for
// `Waiter` calls it from `pointers()`.
// NOTE(review): the macro is defined elsewhere in the crate; presumably it
// computes the field address without materialising a reference — confirm.
generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}
122 | |
123 | impl Semaphore { |
/// Largest number of permits a semaphore may hold.
///
/// Three low bits of the permit counter are set aside for flags even
/// though only one is in use today. The previous semaphore implementation
/// needed three, and keeping them reserved means more flags can be added
/// later without a breaking change.
pub(crate) const MAX_PERMITS: usize = usize::MAX >> 3;
/// Flag bit in the permit counter that marks the semaphore as closed.
const CLOSED: usize = 1;
/// Permit counts are stored shifted left by this many bits so that the
/// least-significant bit of the counter stays free for the `CLOSED` flag.
const PERMIT_SHIFT: usize = 1;
137 | |
138 | /// Creates a new semaphore with the initial number of permits |
139 | /// |
140 | /// Maximum number of permits on 32-bit platforms is `1<<29`. |
141 | pub(crate) fn new(permits: usize) -> Self { |
142 | assert!( |
143 | permits <= Self::MAX_PERMITS, |
144 | "a semaphore may not have more than MAX_PERMITS permits ( {})" , |
145 | Self::MAX_PERMITS |
146 | ); |
147 | |
148 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
149 | let resource_span = { |
150 | let resource_span = tracing::trace_span!( |
151 | "runtime.resource" , |
152 | concrete_type = "Semaphore" , |
153 | kind = "Sync" , |
154 | is_internal = true |
155 | ); |
156 | |
157 | resource_span.in_scope(|| { |
158 | tracing::trace!( |
159 | target: "runtime::resource::state_update" , |
160 | permits = permits, |
161 | permits.op = "override" , |
162 | ) |
163 | }); |
164 | resource_span |
165 | }; |
166 | |
167 | Self { |
168 | permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), |
169 | waiters: Mutex::new(Waitlist { |
170 | queue: LinkedList::new(), |
171 | closed: false, |
172 | }), |
173 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
174 | resource_span, |
175 | } |
176 | } |
177 | |
/// Creates a new semaphore with the initial number of permits, usable in
/// `const` contexts.
///
/// The maximum number of permits is `MAX_PERMITS` (`usize::MAX >> 3`, i.e.
/// `(1 << 29) - 1` on 32-bit platforms).
///
/// If `permits` exceeds that maximum, it is clamped to `MAX_PERMITS`
/// instead of panicking.
#[cfg(all(feature = "parking_lot", not(all(loom, test))))]
pub(crate) const fn const_new(mut permits: usize) -> Self {
    // NOTE: assertions and by extension panics are still being worked on:
    // https://github.com/rust-lang/rust/issues/74925
    // so an out-of-range count is clamped rather than rejected. A bitwise
    // AND with `MAX_PERMITS` would only mask off the high bits (turning
    // `MAX_PERMITS + 1` into 0), which is not a clamp, so compare instead.
    if permits > Self::MAX_PERMITS {
        permits = Self::MAX_PERMITS;
    }

    Self {
        // The counter keeps its lowest bit free for the `CLOSED` flag.
        permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
        waiters: Mutex::const_new(Waitlist {
            queue: LinkedList::new(),
            closed: false,
        }),
        // No span is created here; the disabled span is used instead.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        resource_span: tracing::Span::none(),
    }
}
200 | |
201 | /// Returns the current number of available permits. |
202 | pub(crate) fn available_permits(&self) -> usize { |
203 | self.permits.load(Acquire) >> Self::PERMIT_SHIFT |
204 | } |
205 | |
206 | /// Adds `added` new permits to the semaphore. |
207 | /// |
208 | /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded. |
209 | pub(crate) fn release(&self, added: usize) { |
210 | if added == 0 { |
211 | return; |
212 | } |
213 | |
214 | // Assign permits to the wait queue |
215 | self.add_permits_locked(added, self.waiters.lock()); |
216 | } |
217 | |
218 | /// Closes the semaphore. This prevents the semaphore from issuing new |
219 | /// permits and notifies all pending waiters. |
220 | pub(crate) fn close(&self) { |
221 | let mut waiters = self.waiters.lock(); |
222 | // If the semaphore's permits counter has enough permits for an |
223 | // unqueued waiter to acquire all the permits it needs immediately, |
224 | // it won't touch the wait list. Therefore, we have to set a bit on |
225 | // the permit counter as well. However, we must do this while |
226 | // holding the lock --- otherwise, if we set the bit and then wait |
227 | // to acquire the lock we'll enter an inconsistent state where the |
228 | // permit counter is closed, but the wait list is not. |
229 | self.permits.fetch_or(Self::CLOSED, Release); |
230 | waiters.closed = true; |
231 | while let Some(mut waiter) = waiters.queue.pop_back() { |
232 | let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }; |
233 | if let Some(waker) = waker { |
234 | waker.wake(); |
235 | } |
236 | } |
237 | } |
238 | |
239 | /// Returns true if the semaphore is closed. |
240 | pub(crate) fn is_closed(&self) -> bool { |
241 | self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED |
242 | } |
243 | |
244 | pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> { |
245 | assert!( |
246 | num_permits as usize <= Self::MAX_PERMITS, |
247 | "a semaphore may not have more than MAX_PERMITS permits ( {})" , |
248 | Self::MAX_PERMITS |
249 | ); |
250 | let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT; |
251 | let mut curr = self.permits.load(Acquire); |
252 | loop { |
253 | // Has the semaphore closed? |
254 | if curr & Self::CLOSED == Self::CLOSED { |
255 | return Err(TryAcquireError::Closed); |
256 | } |
257 | |
258 | // Are there enough permits remaining? |
259 | if curr < num_permits { |
260 | return Err(TryAcquireError::NoPermits); |
261 | } |
262 | |
263 | let next = curr - num_permits; |
264 | |
265 | match self.permits.compare_exchange(curr, next, AcqRel, Acquire) { |
266 | Ok(_) => { |
267 | // TODO: Instrument once issue has been solved |
268 | return Ok(()); |
269 | } |
270 | Err(actual) => curr = actual, |
271 | } |
272 | } |
273 | } |
274 | |
275 | pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> { |
276 | Acquire::new(self, num_permits) |
277 | } |
278 | |
279 | /// Release `rem` permits to the semaphore's wait list, starting from the |
280 | /// end of the queue. |
281 | /// |
282 | /// If `rem` exceeds the number of permits needed by the wait list, the |
283 | /// remainder are assigned back to the semaphore. |
284 | fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) { |
285 | let mut wakers = WakeList::new(); |
286 | let mut lock = Some(waiters); |
287 | let mut is_empty = false; |
288 | while rem > 0 { |
289 | let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock()); |
290 | 'inner: while wakers.can_push() { |
291 | // Was the waiter assigned enough permits to wake it? |
292 | match waiters.queue.last() { |
293 | Some(waiter) => { |
294 | if !waiter.assign_permits(&mut rem) { |
295 | break 'inner; |
296 | } |
297 | } |
298 | None => { |
299 | is_empty = true; |
300 | // If we assigned permits to all the waiters in the queue, and there are |
301 | // still permits left over, assign them back to the semaphore. |
302 | break 'inner; |
303 | } |
304 | }; |
305 | let mut waiter = waiters.queue.pop_back().unwrap(); |
306 | if let Some(waker) = |
307 | unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) } |
308 | { |
309 | wakers.push(waker); |
310 | } |
311 | } |
312 | |
313 | if rem > 0 && is_empty { |
314 | let permits = rem; |
315 | assert!( |
316 | permits <= Self::MAX_PERMITS, |
317 | "cannot add more than MAX_PERMITS permits ( {})" , |
318 | Self::MAX_PERMITS |
319 | ); |
320 | let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release); |
321 | let prev = prev >> Self::PERMIT_SHIFT; |
322 | assert!( |
323 | prev + permits <= Self::MAX_PERMITS, |
324 | "number of added permits ( {}) would overflow MAX_PERMITS ( {})" , |
325 | rem, |
326 | Self::MAX_PERMITS |
327 | ); |
328 | |
329 | // add remaining permits back |
330 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
331 | self.resource_span.in_scope(|| { |
332 | tracing::trace!( |
333 | target: "runtime::resource::state_update" , |
334 | permits = rem, |
335 | permits.op = "add" , |
336 | ) |
337 | }); |
338 | |
339 | rem = 0; |
340 | } |
341 | |
342 | drop(waiters); // release the lock |
343 | |
344 | wakers.wake_all(); |
345 | } |
346 | |
347 | assert_eq!(rem, 0); |
348 | } |
349 | |
/// Performs one poll of an acquisition on behalf of `node`.
///
/// * `cx` - task context; its waker is stored in `node` when this returns
///   `Pending`.
/// * `num_permits` - total number of permits requested. Only consulted when
///   `queued` is `false`.
/// * `node` - the caller's pinned wait-queue entry.
/// * `queued` - whether an earlier poll already left `node` in the wait
///   queue. If so, the number of permits still needed is read from
///   `node.state` instead of `num_permits`.
///
/// Returns `Ready(Ok(()))` once the node needs no more permits,
/// `Ready(Err(_))` if the semaphore is closed, and `Pending` after the waker
/// has been registered (and the node enqueued, if it was not already).
fn poll_acquire(
    &self,
    cx: &mut Context<'_>,
    num_permits: u32,
    node: Pin<&mut Waiter>,
    queued: bool,
) -> Poll<Result<(), AcquireError>> {
    // Permits taken from the counter by this call. It is still 0 on every
    // pass through the loop below, because every successful CAS either
    // returns or breaks out of the loop.
    let mut acquired = 0;

    // Permits still required, in the counter's shifted representation.
    let needed = if queued {
        node.state.load(Acquire) << Self::PERMIT_SHIFT
    } else {
        (num_permits as usize) << Self::PERMIT_SHIFT
    };

    let mut lock = None;
    // First, try to take the requested number of permits from the
    // semaphore.
    let mut curr = self.permits.load(Acquire);
    let mut waiters = loop {
        // Has the semaphore closed?
        if curr & Self::CLOSED > 0 {
            return Ready(Err(AcquireError::closed()));
        }

        let mut remaining = 0;
        let total = curr
            .checked_add(acquired)
            .expect("number of permits must not overflow");
        // `next` is the counter value to install; `acq` is the number of
        // (unshifted) permits this CAS would take.
        let (next, acq) = if total >= needed {
            // Enough permits: take exactly what is needed.
            let next = curr - (needed - acquired);
            (next, needed >> Self::PERMIT_SHIFT)
        } else {
            // Not enough: take everything and remember the shortfall.
            remaining = (needed - acquired) - curr;
            (0, curr >> Self::PERMIT_SHIFT)
        };

        if remaining > 0 && lock.is_none() {
            // No permits were immediately available, so this permit will
            // (probably) need to wait. We'll need to acquire a lock on the
            // wait queue before continuing. We need to do this _before_ the
            // CAS that sets the new value of the semaphore's `permits`
            // counter. Otherwise, if we subtract the permits and then
            // acquire the lock, we might miss additional permits being
            // added while waiting for the lock.
            lock = Some(self.waiters.lock());
        }

        match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
            Ok(_) => {
                acquired += acq;
                if remaining == 0 {
                    if !queued {
                        // Everything was taken straight from the counter and
                        // the node never entered the queue: done.
                        #[cfg(all(tokio_unstable, feature = "tracing"))]
                        self.resource_span.in_scope(|| {
                            tracing::trace!(
                                target: "runtime::resource::state_update",
                                permits = acquired,
                                permits.op = "sub",
                            );
                            tracing::trace!(
                                target: "runtime::resource::async_op::state_update",
                                permits_obtained = acquired,
                                permits.op = "add",
                            )
                        });

                        return Ready(Ok(()));
                    } else if lock.is_none() {
                        // The node was queued by an earlier poll, so the lock
                        // is needed for the bookkeeping below.
                        break self.waiters.lock();
                    }
                }
                break lock.expect("lock must be acquired before waiting");
            }
            // Another thread changed the counter; retry with its value.
            Err(actual) => curr = actual,
        }
    };

    // Re-check under the lock: `close` sets this flag while holding it.
    if waiters.closed {
        return Ready(Err(AcquireError::closed()));
    }

    #[cfg(all(tokio_unstable, feature = "tracing"))]
    self.resource_span.in_scope(|| {
        tracing::trace!(
            target: "runtime::resource::state_update",
            permits = acquired,
            permits.op = "sub",
        )
    });

    // Credit the permits taken above to the node. If that satisfies it,
    // whatever is left in `acquired` is passed on via `add_permits_locked`,
    // which also consumes the guard.
    if node.assign_permits(&mut acquired) {
        self.add_permits_locked(acquired, waiters);
        return Ready(Ok(()));
    }

    // The node still needs permits, so it must have absorbed everything
    // that was taken from the counter.
    assert_eq!(acquired, 0);
    let mut old_waker = None;

    // Otherwise, register the waker & enqueue the node.
    node.waker.with_mut(|waker| {
        // Safety: the wait list is locked, so we may modify the waker.
        let waker = unsafe { &mut *waker };
        // Do we need to register the new waker?
        if waker
            .as_ref()
            .map(|waker| !waker.will_wake(cx.waker()))
            .unwrap_or(true)
        {
            // Keep the previous waker so it is dropped only after the lock
            // has been released below.
            old_waker = std::mem::replace(waker, Some(cx.waker().clone()));
        }
    });

    // If the waiter is not already in the wait queue, enqueue it.
    if !queued {
        let node = unsafe {
            let node = Pin::into_inner_unchecked(node) as *mut _;
            NonNull::new_unchecked(node)
        };

        // New waiters enter at the front; permits are handed out from the
        // back (`last` / `pop_back`).
        waiters.queue.push_front(node);
    }
    drop(waiters);
    drop(old_waker);

    Pending
}
477 | } |
478 | |
479 | impl fmt::Debug for Semaphore { |
480 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
481 | fmt&mut DebugStruct<'_, '_>.debug_struct("Semaphore" ) |
482 | .field(name:"permits" , &self.available_permits()) |
483 | .finish() |
484 | } |
485 | } |
486 | |
487 | impl Waiter { |
488 | fn new( |
489 | num_permits: u32, |
490 | #[cfg (all(tokio_unstable, feature = "tracing" ))] ctx: trace::AsyncOpTracingCtx, |
491 | ) -> Self { |
492 | Waiter { |
493 | waker: UnsafeCell::new(None), |
494 | state: AtomicUsize::new(num_permits as usize), |
495 | pointers: linked_list::Pointers::new(), |
496 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
497 | ctx, |
498 | _p: PhantomPinned, |
499 | } |
500 | } |
501 | |
502 | /// Assign permits to the waiter. |
503 | /// |
504 | /// Returns `true` if the waiter should be removed from the queue |
505 | fn assign_permits(&self, n: &mut usize) -> bool { |
506 | let mut curr = self.state.load(Acquire); |
507 | loop { |
508 | let assign = cmp::min(curr, *n); |
509 | let next = curr - assign; |
510 | match self.state.compare_exchange(curr, next, AcqRel, Acquire) { |
511 | Ok(_) => { |
512 | *n -= assign; |
513 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
514 | self.ctx.async_op_span.in_scope(|| { |
515 | tracing::trace!( |
516 | target: "runtime::resource::async_op::state_update" , |
517 | permits_obtained = assign, |
518 | permits.op = "add" , |
519 | ); |
520 | }); |
521 | return next == 0; |
522 | } |
523 | Err(actual) => curr = actual, |
524 | } |
525 | } |
526 | } |
527 | } |
528 | |
529 | impl Future for Acquire<'_> { |
530 | type Output = Result<(), AcquireError>; |
531 | |
532 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
533 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
534 | let _resource_span = self.node.ctx.resource_span.clone().entered(); |
535 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
536 | let _async_op_span = self.node.ctx.async_op_span.clone().entered(); |
537 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
538 | let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered(); |
539 | |
540 | let (node, semaphore, needed, queued) = self.project(); |
541 | |
542 | // First, ensure the current task has enough budget to proceed. |
543 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
544 | let coop = ready!(trace_poll_op!( |
545 | "poll_acquire" , |
546 | crate::runtime::coop::poll_proceed(cx), |
547 | )); |
548 | |
549 | #[cfg (not(all(tokio_unstable, feature = "tracing" )))] |
550 | let coop = ready!(crate::runtime::coop::poll_proceed(cx)); |
551 | |
552 | let result = match semaphore.poll_acquire(cx, needed, node, *queued) { |
553 | Pending => { |
554 | *queued = true; |
555 | Pending |
556 | } |
557 | Ready(r) => { |
558 | coop.made_progress(); |
559 | r?; |
560 | *queued = false; |
561 | Ready(Ok(())) |
562 | } |
563 | }; |
564 | |
565 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
566 | return trace_poll_op!("poll_acquire" , result); |
567 | |
568 | #[cfg (not(all(tokio_unstable, feature = "tracing" )))] |
569 | return result; |
570 | } |
571 | } |
572 | |
573 | impl<'a> Acquire<'a> { |
574 | fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self { |
575 | #[cfg (any(not(tokio_unstable), not(feature = "tracing" )))] |
576 | return Self { |
577 | node: Waiter::new(num_permits), |
578 | semaphore, |
579 | num_permits, |
580 | queued: false, |
581 | }; |
582 | |
583 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
584 | return semaphore.resource_span.in_scope(|| { |
585 | let async_op_span = |
586 | tracing::trace_span!("runtime.resource.async_op" , source = "Acquire::new" ); |
587 | let async_op_poll_span = async_op_span.in_scope(|| { |
588 | tracing::trace!( |
589 | target: "runtime::resource::async_op::state_update" , |
590 | permits_requested = num_permits, |
591 | permits.op = "override" , |
592 | ); |
593 | |
594 | tracing::trace!( |
595 | target: "runtime::resource::async_op::state_update" , |
596 | permits_obtained = 0usize, |
597 | permits.op = "override" , |
598 | ); |
599 | |
600 | tracing::trace_span!("runtime.resource.async_op.poll" ) |
601 | }); |
602 | |
603 | let ctx = trace::AsyncOpTracingCtx { |
604 | async_op_span, |
605 | async_op_poll_span, |
606 | resource_span: semaphore.resource_span.clone(), |
607 | }; |
608 | |
609 | Self { |
610 | node: Waiter::new(num_permits, ctx), |
611 | semaphore, |
612 | num_permits, |
613 | queued: false, |
614 | } |
615 | }); |
616 | } |
617 | |
618 | fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) { |
619 | fn is_unpin<T: Unpin>() {} |
620 | unsafe { |
621 | // Safety: all fields other than `node` are `Unpin` |
622 | |
623 | is_unpin::<&Semaphore>(); |
624 | is_unpin::<&mut bool>(); |
625 | is_unpin::<u32>(); |
626 | |
627 | let this = self.get_unchecked_mut(); |
628 | ( |
629 | Pin::new_unchecked(&mut this.node), |
630 | this.semaphore, |
631 | this.num_permits, |
632 | &mut this.queued, |
633 | ) |
634 | } |
635 | } |
636 | } |
637 | |
638 | impl Drop for Acquire<'_> { |
639 | fn drop(&mut self) { |
640 | // If the future is completed, there is no node in the wait list, so we |
641 | // can skip acquiring the lock. |
642 | if !self.queued { |
643 | return; |
644 | } |
645 | |
646 | // This is where we ensure safety. The future is being dropped, |
647 | // which means we must ensure that the waiter entry is no longer stored |
648 | // in the linked list. |
649 | let mut waiters: MutexGuard<'_, Waitlist> = self.semaphore.waiters.lock(); |
650 | |
651 | // remove the entry from the list |
652 | let node: NonNull = NonNull::from(&mut self.node); |
653 | // Safety: we have locked the wait list. |
654 | unsafe { waiters.queue.remove(node) }; |
655 | |
656 | let acquired_permits: usize = self.num_permits as usize - self.node.state.load(order:Acquire); |
657 | if acquired_permits > 0 { |
658 | self.semaphore.add_permits_locked(rem:acquired_permits, waiters); |
659 | } |
660 | } |
661 | } |
662 | |
// Safety: `Acquire` is not automatically `Sync` because it contains a
// `Waiter`, which in turn contains an `UnsafeCell`. That `UnsafeCell` is only
// accessed while the future is borrowed mutably (in `poll` or in `drop`), so
// sharing an immutable borrow of the future across threads is safe (although
// not particularly _useful_).
unsafe impl Sync for Acquire<'_> {}
669 | |
670 | // ===== impl AcquireError ==== |
671 | |
672 | impl AcquireError { |
673 | fn closed() -> AcquireError { |
674 | AcquireError(()) |
675 | } |
676 | } |
677 | |
678 | impl fmt::Display for AcquireError { |
679 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
680 | write!(fmt, "semaphore closed" ) |
681 | } |
682 | } |
683 | |
684 | impl std::error::Error for AcquireError {} |
685 | |
686 | // ===== impl TryAcquireError ===== |
687 | |
688 | impl TryAcquireError { |
689 | /// Returns `true` if the error was caused by a closed semaphore. |
690 | #[allow (dead_code)] // may be used later! |
691 | pub(crate) fn is_closed(&self) -> bool { |
692 | matches!(self, TryAcquireError::Closed) |
693 | } |
694 | |
695 | /// Returns `true` if the error was caused by calling `try_acquire` on a |
696 | /// semaphore with no available permits. |
697 | #[allow (dead_code)] // may be used later! |
698 | pub(crate) fn is_no_permits(&self) -> bool { |
699 | matches!(self, TryAcquireError::NoPermits) |
700 | } |
701 | } |
702 | |
703 | impl fmt::Display for TryAcquireError { |
704 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
705 | match self { |
706 | TryAcquireError::Closed => write!(fmt, "semaphore closed" ), |
707 | TryAcquireError::NoPermits => write!(fmt, "no permits available" ), |
708 | } |
709 | } |
710 | } |
711 | |
712 | impl std::error::Error for TryAcquireError {} |
713 | |
714 | /// # Safety |
715 | /// |
716 | /// `Waiter` is forced to be !Unpin. |
717 | unsafe impl linked_list::Link for Waiter { |
718 | type Handle = NonNull<Waiter>; |
719 | type Target = Waiter; |
720 | |
721 | fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> { |
722 | *handle |
723 | } |
724 | |
725 | unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { |
726 | ptr |
727 | } |
728 | |
729 | unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { |
730 | Waiter::addr_of_pointers(me:target) |
731 | } |
732 | } |
733 | |