1 | #![cfg_attr (not(feature = "sync" ), allow(unreachable_pub, dead_code))] |
2 | //! # Implementation Details. |
3 | //! |
4 | //! The semaphore is implemented using an intrusive linked list of waiters. An |
5 | //! atomic counter tracks the number of available permits. If the semaphore does |
6 | //! not contain the required number of permits, the task attempting to acquire |
7 | //! permits places its waker at the end of a queue. When new permits are made |
8 | //! available (such as by releasing an initial acquisition), they are assigned |
9 | //! to the task at the front of the queue, waking that task if its requested |
10 | //! number of permits is met. |
11 | //! |
12 | //! Because waiters are enqueued at the back of the linked list and dequeued |
13 | //! from the front, the semaphore is fair. Tasks trying to acquire large numbers |
14 | //! of permits at a time will always be woken eventually, even if many other |
15 | //! tasks are acquiring smaller numbers of permits. This means that in a |
16 | //! use-case like tokio's read-write lock, writers will not be starved by |
17 | //! readers. |
18 | use crate::loom::cell::UnsafeCell; |
19 | use crate::loom::sync::atomic::AtomicUsize; |
20 | use crate::loom::sync::{Mutex, MutexGuard}; |
21 | use crate::util::linked_list::{self, LinkedList}; |
22 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
23 | use crate::util::trace; |
24 | use crate::util::WakeList; |
25 | |
26 | use std::future::Future; |
27 | use std::marker::PhantomPinned; |
28 | use std::pin::Pin; |
29 | use std::ptr::NonNull; |
30 | use std::sync::atomic::Ordering::*; |
31 | use std::task::{Context, Poll, Waker}; |
32 | use std::{cmp, fmt}; |
33 | |
34 | /// An asynchronous counting semaphore which permits waiting on multiple permits at once. |
35 | pub(crate) struct Semaphore { |
36 | waiters: Mutex<Waitlist>, |
37 | /// The current number of available permits in the semaphore. |
38 | permits: AtomicUsize, |
39 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
40 | resource_span: tracing::Span, |
41 | } |
42 | |
43 | struct Waitlist { |
44 | queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>, |
45 | closed: bool, |
46 | } |
47 | |
48 | /// Error returned from the [`Semaphore::try_acquire`] function. |
49 | /// |
50 | /// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire |
51 | #[derive(Debug, PartialEq, Eq)] |
52 | pub enum TryAcquireError { |
53 | /// The semaphore has been [closed] and cannot issue new permits. |
54 | /// |
55 | /// [closed]: crate::sync::Semaphore::close |
56 | Closed, |
57 | |
58 | /// The semaphore has no available permits. |
59 | NoPermits, |
60 | } |
61 | /// Error returned from the [`Semaphore::acquire`] function. |
62 | /// |
63 | /// An `acquire` operation can only fail if the semaphore has been |
64 | /// [closed]. |
65 | /// |
66 | /// [closed]: crate::sync::Semaphore::close |
67 | /// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire |
68 | #[derive(Debug)] |
69 | pub struct AcquireError(()); |
70 | |
71 | pub(crate) struct Acquire<'a> { |
72 | node: Waiter, |
73 | semaphore: &'a Semaphore, |
74 | num_permits: u32, |
75 | queued: bool, |
76 | } |
77 | |
78 | /// An entry in the wait queue. |
79 | struct Waiter { |
80 | /// The current state of the waiter. |
81 | /// |
82 | /// This is either the number of remaining permits required by |
83 | /// the waiter, or a flag indicating that the waiter is not yet queued. |
84 | state: AtomicUsize, |
85 | |
86 | /// The waker to notify the task awaiting permits. |
87 | /// |
88 | /// # Safety |
89 | /// |
90 | /// This may only be accessed while the wait queue is locked. |
91 | waker: UnsafeCell<Option<Waker>>, |
92 | |
93 | /// Intrusive linked-list pointers. |
94 | /// |
95 | /// # Safety |
96 | /// |
97 | /// This may only be accessed while the wait queue is locked. |
98 | /// |
99 | /// TODO: Ideally, we would be able to use loom to enforce that |
100 | /// this isn't accessed concurrently. However, it is difficult to |
101 | /// use a `UnsafeCell` here, since the `Link` trait requires _returning_ |
102 | /// references to `Pointers`, and `UnsafeCell` requires that checked access |
103 | /// take place inside a closure. We should consider changing `Pointers` to |
104 | /// use `UnsafeCell` internally. |
105 | pointers: linked_list::Pointers<Waiter>, |
106 | |
107 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
108 | ctx: trace::AsyncOpTracingCtx, |
109 | |
110 | /// Should not be `Unpin`. |
111 | _p: PhantomPinned, |
112 | } |
113 | |
114 | generate_addr_of_methods! { |
115 | impl<> Waiter { |
116 | unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> { |
117 | &self.pointers |
118 | } |
119 | } |
120 | } |
121 | |
122 | impl Semaphore { |
123 | /// The maximum number of permits which a semaphore can hold. |
124 | /// |
125 | /// Note that this reserves three bits of flags in the permit counter, but |
126 | /// we only actually use one of them. However, the previous semaphore |
127 | /// implementation used three bits, so we will continue to reserve them to |
128 | /// avoid a breaking change if additional flags need to be added in the |
129 | /// future. |
130 | pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3; |
131 | const CLOSED: usize = 1; |
132 | // The least-significant bit in the number of permits is reserved to use |
133 | // as a flag indicating that the semaphore has been closed. Consequently |
134 | // PERMIT_SHIFT is used to leave that bit for that purpose. |
135 | const PERMIT_SHIFT: usize = 1; |
136 | |
137 | /// Creates a new semaphore with the initial number of permits |
138 | /// |
139 | /// Maximum number of permits on 32-bit platforms is `1<<29`. |
140 | pub(crate) fn new(permits: usize) -> Self { |
141 | assert!( |
142 | permits <= Self::MAX_PERMITS, |
143 | "a semaphore may not have more than MAX_PERMITS permits ({})" , |
144 | Self::MAX_PERMITS |
145 | ); |
146 | |
147 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
148 | let resource_span = { |
149 | let resource_span = tracing::trace_span!( |
150 | "runtime.resource" , |
151 | concrete_type = "Semaphore" , |
152 | kind = "Sync" , |
153 | is_internal = true |
154 | ); |
155 | |
156 | resource_span.in_scope(|| { |
157 | tracing::trace!( |
158 | target: "runtime::resource::state_update" , |
159 | permits = permits, |
160 | permits.op = "override" , |
161 | ) |
162 | }); |
163 | resource_span |
164 | }; |
165 | |
166 | Self { |
167 | permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), |
168 | waiters: Mutex::new(Waitlist { |
169 | queue: LinkedList::new(), |
170 | closed: false, |
171 | }), |
172 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
173 | resource_span, |
174 | } |
175 | } |
176 | |
177 | /// Creates a new semaphore with the initial number of permits. |
178 | /// |
179 | /// Maximum number of permits on 32-bit platforms is `1<<29`. |
180 | #[cfg (not(all(loom, test)))] |
181 | pub(crate) const fn const_new(permits: usize) -> Self { |
182 | assert!(permits <= Self::MAX_PERMITS); |
183 | |
184 | Self { |
185 | permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), |
186 | waiters: Mutex::const_new(Waitlist { |
187 | queue: LinkedList::new(), |
188 | closed: false, |
189 | }), |
190 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
191 | resource_span: tracing::Span::none(), |
192 | } |
193 | } |
194 | |
195 | /// Creates a new closed semaphore with 0 permits. |
196 | pub(crate) fn new_closed() -> Self { |
197 | Self { |
198 | permits: AtomicUsize::new(Self::CLOSED), |
199 | waiters: Mutex::new(Waitlist { |
200 | queue: LinkedList::new(), |
201 | closed: true, |
202 | }), |
203 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
204 | resource_span: tracing::Span::none(), |
205 | } |
206 | } |
207 | |
208 | /// Creates a new closed semaphore with 0 permits. |
209 | #[cfg (not(all(loom, test)))] |
210 | pub(crate) const fn const_new_closed() -> Self { |
211 | Self { |
212 | permits: AtomicUsize::new(Self::CLOSED), |
213 | waiters: Mutex::const_new(Waitlist { |
214 | queue: LinkedList::new(), |
215 | closed: true, |
216 | }), |
217 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
218 | resource_span: tracing::Span::none(), |
219 | } |
220 | } |
221 | |
222 | /// Returns the current number of available permits. |
223 | pub(crate) fn available_permits(&self) -> usize { |
224 | self.permits.load(Acquire) >> Self::PERMIT_SHIFT |
225 | } |
226 | |
227 | /// Adds `added` new permits to the semaphore. |
228 | /// |
229 | /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded. |
230 | pub(crate) fn release(&self, added: usize) { |
231 | if added == 0 { |
232 | return; |
233 | } |
234 | |
235 | // Assign permits to the wait queue |
236 | self.add_permits_locked(added, self.waiters.lock()); |
237 | } |
238 | |
239 | /// Closes the semaphore. This prevents the semaphore from issuing new |
240 | /// permits and notifies all pending waiters. |
241 | pub(crate) fn close(&self) { |
242 | let mut waiters = self.waiters.lock(); |
243 | // If the semaphore's permits counter has enough permits for an |
244 | // unqueued waiter to acquire all the permits it needs immediately, |
245 | // it won't touch the wait list. Therefore, we have to set a bit on |
246 | // the permit counter as well. However, we must do this while |
247 | // holding the lock --- otherwise, if we set the bit and then wait |
248 | // to acquire the lock we'll enter an inconsistent state where the |
249 | // permit counter is closed, but the wait list is not. |
250 | self.permits.fetch_or(Self::CLOSED, Release); |
251 | waiters.closed = true; |
252 | while let Some(mut waiter) = waiters.queue.pop_back() { |
253 | let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }; |
254 | if let Some(waker) = waker { |
255 | waker.wake(); |
256 | } |
257 | } |
258 | } |
259 | |
260 | /// Returns true if the semaphore is closed. |
261 | pub(crate) fn is_closed(&self) -> bool { |
262 | self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED |
263 | } |
264 | |
265 | pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> { |
266 | assert!( |
267 | num_permits as usize <= Self::MAX_PERMITS, |
268 | "a semaphore may not have more than MAX_PERMITS permits ({})" , |
269 | Self::MAX_PERMITS |
270 | ); |
271 | let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT; |
272 | let mut curr = self.permits.load(Acquire); |
273 | loop { |
274 | // Has the semaphore closed? |
275 | if curr & Self::CLOSED == Self::CLOSED { |
276 | return Err(TryAcquireError::Closed); |
277 | } |
278 | |
279 | // Are there enough permits remaining? |
280 | if curr < num_permits { |
281 | return Err(TryAcquireError::NoPermits); |
282 | } |
283 | |
284 | let next = curr - num_permits; |
285 | |
286 | match self.permits.compare_exchange(curr, next, AcqRel, Acquire) { |
287 | Ok(_) => { |
288 | // TODO: Instrument once issue has been solved |
289 | return Ok(()); |
290 | } |
291 | Err(actual) => curr = actual, |
292 | } |
293 | } |
294 | } |
295 | |
296 | pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> { |
297 | Acquire::new(self, num_permits) |
298 | } |
299 | |
300 | /// Release `rem` permits to the semaphore's wait list, starting from the |
301 | /// end of the queue. |
302 | /// |
303 | /// If `rem` exceeds the number of permits needed by the wait list, the |
304 | /// remainder are assigned back to the semaphore. |
305 | fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) { |
306 | let mut wakers = WakeList::new(); |
307 | let mut lock = Some(waiters); |
308 | let mut is_empty = false; |
309 | while rem > 0 { |
310 | let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock()); |
311 | 'inner: while wakers.can_push() { |
312 | // Was the waiter assigned enough permits to wake it? |
313 | match waiters.queue.last() { |
314 | Some(waiter) => { |
315 | if !waiter.assign_permits(&mut rem) { |
316 | break 'inner; |
317 | } |
318 | } |
319 | None => { |
320 | is_empty = true; |
321 | // If we assigned permits to all the waiters in the queue, and there are |
322 | // still permits left over, assign them back to the semaphore. |
323 | break 'inner; |
324 | } |
325 | }; |
326 | let mut waiter = waiters.queue.pop_back().unwrap(); |
327 | if let Some(waker) = |
328 | unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) } |
329 | { |
330 | wakers.push(waker); |
331 | } |
332 | } |
333 | |
334 | if rem > 0 && is_empty { |
335 | let permits = rem; |
336 | assert!( |
337 | permits <= Self::MAX_PERMITS, |
338 | "cannot add more than MAX_PERMITS permits ({})" , |
339 | Self::MAX_PERMITS |
340 | ); |
341 | let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release); |
342 | let prev = prev >> Self::PERMIT_SHIFT; |
343 | assert!( |
344 | prev + permits <= Self::MAX_PERMITS, |
345 | "number of added permits ({}) would overflow MAX_PERMITS ({})" , |
346 | rem, |
347 | Self::MAX_PERMITS |
348 | ); |
349 | |
350 | // add remaining permits back |
351 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
352 | self.resource_span.in_scope(|| { |
353 | tracing::trace!( |
354 | target: "runtime::resource::state_update" , |
355 | permits = rem, |
356 | permits.op = "add" , |
357 | ) |
358 | }); |
359 | |
360 | rem = 0; |
361 | } |
362 | |
363 | drop(waiters); // release the lock |
364 | |
365 | wakers.wake_all(); |
366 | } |
367 | |
368 | assert_eq!(rem, 0); |
369 | } |
370 | |
371 | fn poll_acquire( |
372 | &self, |
373 | cx: &mut Context<'_>, |
374 | num_permits: u32, |
375 | node: Pin<&mut Waiter>, |
376 | queued: bool, |
377 | ) -> Poll<Result<(), AcquireError>> { |
378 | let mut acquired = 0; |
379 | |
380 | let needed = if queued { |
381 | node.state.load(Acquire) << Self::PERMIT_SHIFT |
382 | } else { |
383 | (num_permits as usize) << Self::PERMIT_SHIFT |
384 | }; |
385 | |
386 | let mut lock = None; |
387 | // First, try to take the requested number of permits from the |
388 | // semaphore. |
389 | let mut curr = self.permits.load(Acquire); |
390 | let mut waiters = loop { |
391 | // Has the semaphore closed? |
392 | if curr & Self::CLOSED > 0 { |
393 | return Poll::Ready(Err(AcquireError::closed())); |
394 | } |
395 | |
396 | let mut remaining = 0; |
397 | let total = curr |
398 | .checked_add(acquired) |
399 | .expect("number of permits must not overflow" ); |
400 | let (next, acq) = if total >= needed { |
401 | let next = curr - (needed - acquired); |
402 | (next, needed >> Self::PERMIT_SHIFT) |
403 | } else { |
404 | remaining = (needed - acquired) - curr; |
405 | (0, curr >> Self::PERMIT_SHIFT) |
406 | }; |
407 | |
408 | if remaining > 0 && lock.is_none() { |
409 | // No permits were immediately available, so this permit will |
410 | // (probably) need to wait. We'll need to acquire a lock on the |
411 | // wait queue before continuing. We need to do this _before_ the |
412 | // CAS that sets the new value of the semaphore's `permits` |
413 | // counter. Otherwise, if we subtract the permits and then |
414 | // acquire the lock, we might miss additional permits being |
415 | // added while waiting for the lock. |
416 | lock = Some(self.waiters.lock()); |
417 | } |
418 | |
419 | match self.permits.compare_exchange(curr, next, AcqRel, Acquire) { |
420 | Ok(_) => { |
421 | acquired += acq; |
422 | if remaining == 0 { |
423 | if !queued { |
424 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
425 | self.resource_span.in_scope(|| { |
426 | tracing::trace!( |
427 | target: "runtime::resource::state_update" , |
428 | permits = acquired, |
429 | permits.op = "sub" , |
430 | ); |
431 | tracing::trace!( |
432 | target: "runtime::resource::async_op::state_update" , |
433 | permits_obtained = acquired, |
434 | permits.op = "add" , |
435 | ) |
436 | }); |
437 | |
438 | return Poll::Ready(Ok(())); |
439 | } else if lock.is_none() { |
440 | break self.waiters.lock(); |
441 | } |
442 | } |
443 | break lock.expect("lock must be acquired before waiting" ); |
444 | } |
445 | Err(actual) => curr = actual, |
446 | } |
447 | }; |
448 | |
449 | if waiters.closed { |
450 | return Poll::Ready(Err(AcquireError::closed())); |
451 | } |
452 | |
453 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
454 | self.resource_span.in_scope(|| { |
455 | tracing::trace!( |
456 | target: "runtime::resource::state_update" , |
457 | permits = acquired, |
458 | permits.op = "sub" , |
459 | ) |
460 | }); |
461 | |
462 | if node.assign_permits(&mut acquired) { |
463 | self.add_permits_locked(acquired, waiters); |
464 | return Poll::Ready(Ok(())); |
465 | } |
466 | |
467 | assert_eq!(acquired, 0); |
468 | let mut old_waker = None; |
469 | |
470 | // Otherwise, register the waker & enqueue the node. |
471 | node.waker.with_mut(|waker| { |
472 | // Safety: the wait list is locked, so we may modify the waker. |
473 | let waker = unsafe { &mut *waker }; |
474 | // Do we need to register the new waker? |
475 | if waker |
476 | .as_ref() |
477 | .map_or(true, |waker| !waker.will_wake(cx.waker())) |
478 | { |
479 | old_waker = std::mem::replace(waker, Some(cx.waker().clone())); |
480 | } |
481 | }); |
482 | |
483 | // If the waiter is not already in the wait queue, enqueue it. |
484 | if !queued { |
485 | let node = unsafe { |
486 | let node = Pin::into_inner_unchecked(node) as *mut _; |
487 | NonNull::new_unchecked(node) |
488 | }; |
489 | |
490 | waiters.queue.push_front(node); |
491 | } |
492 | drop(waiters); |
493 | drop(old_waker); |
494 | |
495 | Poll::Pending |
496 | } |
497 | } |
498 | |
499 | impl fmt::Debug for Semaphore { |
500 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
501 | fmt.debug_struct("Semaphore" ) |
502 | .field("permits" , &self.available_permits()) |
503 | .finish() |
504 | } |
505 | } |
506 | |
507 | impl Waiter { |
508 | fn new( |
509 | num_permits: u32, |
510 | #[cfg (all(tokio_unstable, feature = "tracing" ))] ctx: trace::AsyncOpTracingCtx, |
511 | ) -> Self { |
512 | Waiter { |
513 | waker: UnsafeCell::new(None), |
514 | state: AtomicUsize::new(num_permits as usize), |
515 | pointers: linked_list::Pointers::new(), |
516 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
517 | ctx, |
518 | _p: PhantomPinned, |
519 | } |
520 | } |
521 | |
522 | /// Assign permits to the waiter. |
523 | /// |
524 | /// Returns `true` if the waiter should be removed from the queue |
525 | fn assign_permits(&self, n: &mut usize) -> bool { |
526 | let mut curr = self.state.load(Acquire); |
527 | loop { |
528 | let assign = cmp::min(curr, *n); |
529 | let next = curr - assign; |
530 | match self.state.compare_exchange(curr, next, AcqRel, Acquire) { |
531 | Ok(_) => { |
532 | *n -= assign; |
533 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
534 | self.ctx.async_op_span.in_scope(|| { |
535 | tracing::trace!( |
536 | target: "runtime::resource::async_op::state_update" , |
537 | permits_obtained = assign, |
538 | permits.op = "add" , |
539 | ); |
540 | }); |
541 | return next == 0; |
542 | } |
543 | Err(actual) => curr = actual, |
544 | } |
545 | } |
546 | } |
547 | } |
548 | |
549 | impl Future for Acquire<'_> { |
550 | type Output = Result<(), AcquireError>; |
551 | |
552 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
553 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
554 | let _resource_span = self.node.ctx.resource_span.clone().entered(); |
555 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
556 | let _async_op_span = self.node.ctx.async_op_span.clone().entered(); |
557 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
558 | let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered(); |
559 | |
560 | let (node, semaphore, needed, queued) = self.project(); |
561 | |
562 | // First, ensure the current task has enough budget to proceed. |
563 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
564 | let coop = ready!(trace_poll_op!( |
565 | "poll_acquire" , |
566 | crate::runtime::coop::poll_proceed(cx), |
567 | )); |
568 | |
569 | #[cfg (not(all(tokio_unstable, feature = "tracing" )))] |
570 | let coop = ready!(crate::runtime::coop::poll_proceed(cx)); |
571 | |
572 | let result = match semaphore.poll_acquire(cx, needed, node, *queued) { |
573 | Poll::Pending => { |
574 | *queued = true; |
575 | Poll::Pending |
576 | } |
577 | Poll::Ready(r) => { |
578 | coop.made_progress(); |
579 | r?; |
580 | *queued = false; |
581 | Poll::Ready(Ok(())) |
582 | } |
583 | }; |
584 | |
585 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
586 | return trace_poll_op!("poll_acquire" , result); |
587 | |
588 | #[cfg (not(all(tokio_unstable, feature = "tracing" )))] |
589 | return result; |
590 | } |
591 | } |
592 | |
593 | impl<'a> Acquire<'a> { |
594 | fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self { |
595 | #[cfg (any(not(tokio_unstable), not(feature = "tracing" )))] |
596 | return Self { |
597 | node: Waiter::new(num_permits), |
598 | semaphore, |
599 | num_permits, |
600 | queued: false, |
601 | }; |
602 | |
603 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
604 | return semaphore.resource_span.in_scope(|| { |
605 | let async_op_span = |
606 | tracing::trace_span!("runtime.resource.async_op" , source = "Acquire::new" ); |
607 | let async_op_poll_span = async_op_span.in_scope(|| { |
608 | tracing::trace!( |
609 | target: "runtime::resource::async_op::state_update" , |
610 | permits_requested = num_permits, |
611 | permits.op = "override" , |
612 | ); |
613 | |
614 | tracing::trace!( |
615 | target: "runtime::resource::async_op::state_update" , |
616 | permits_obtained = 0usize, |
617 | permits.op = "override" , |
618 | ); |
619 | |
620 | tracing::trace_span!("runtime.resource.async_op.poll" ) |
621 | }); |
622 | |
623 | let ctx = trace::AsyncOpTracingCtx { |
624 | async_op_span, |
625 | async_op_poll_span, |
626 | resource_span: semaphore.resource_span.clone(), |
627 | }; |
628 | |
629 | Self { |
630 | node: Waiter::new(num_permits, ctx), |
631 | semaphore, |
632 | num_permits, |
633 | queued: false, |
634 | } |
635 | }); |
636 | } |
637 | |
638 | fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) { |
639 | fn is_unpin<T: Unpin>() {} |
640 | unsafe { |
641 | // Safety: all fields other than `node` are `Unpin` |
642 | |
643 | is_unpin::<&Semaphore>(); |
644 | is_unpin::<&mut bool>(); |
645 | is_unpin::<u32>(); |
646 | |
647 | let this = self.get_unchecked_mut(); |
648 | ( |
649 | Pin::new_unchecked(&mut this.node), |
650 | this.semaphore, |
651 | this.num_permits, |
652 | &mut this.queued, |
653 | ) |
654 | } |
655 | } |
656 | } |
657 | |
658 | impl Drop for Acquire<'_> { |
659 | fn drop(&mut self) { |
660 | // If the future is completed, there is no node in the wait list, so we |
661 | // can skip acquiring the lock. |
662 | if !self.queued { |
663 | return; |
664 | } |
665 | |
666 | // This is where we ensure safety. The future is being dropped, |
667 | // which means we must ensure that the waiter entry is no longer stored |
668 | // in the linked list. |
669 | let mut waiters = self.semaphore.waiters.lock(); |
670 | |
671 | // remove the entry from the list |
672 | let node = NonNull::from(&mut self.node); |
673 | // Safety: we have locked the wait list. |
674 | unsafe { waiters.queue.remove(node) }; |
675 | |
676 | let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire); |
677 | if acquired_permits > 0 { |
678 | self.semaphore.add_permits_locked(acquired_permits, waiters); |
679 | } |
680 | } |
681 | } |
682 | |
683 | // Safety: the `Acquire` future is not `Sync` automatically because it contains |
684 | // a `Waiter`, which, in turn, contains an `UnsafeCell`. However, the |
685 | // `UnsafeCell` is only accessed when the future is borrowed mutably (either in |
686 | // `poll` or in `drop`). Therefore, it is safe (although not particularly |
687 | // _useful_) for the future to be borrowed immutably across threads. |
688 | unsafe impl Sync for Acquire<'_> {} |
689 | |
690 | // ===== impl AcquireError ==== |
691 | |
692 | impl AcquireError { |
693 | fn closed() -> AcquireError { |
694 | AcquireError(()) |
695 | } |
696 | } |
697 | |
698 | impl fmt::Display for AcquireError { |
699 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
700 | write!(fmt, "semaphore closed" ) |
701 | } |
702 | } |
703 | |
704 | impl std::error::Error for AcquireError {} |
705 | |
706 | // ===== impl TryAcquireError ===== |
707 | |
708 | impl TryAcquireError { |
709 | /// Returns `true` if the error was caused by a closed semaphore. |
710 | #[allow (dead_code)] // may be used later! |
711 | pub(crate) fn is_closed(&self) -> bool { |
712 | matches!(self, TryAcquireError::Closed) |
713 | } |
714 | |
715 | /// Returns `true` if the error was caused by calling `try_acquire` on a |
716 | /// semaphore with no available permits. |
717 | #[allow (dead_code)] // may be used later! |
718 | pub(crate) fn is_no_permits(&self) -> bool { |
719 | matches!(self, TryAcquireError::NoPermits) |
720 | } |
721 | } |
722 | |
723 | impl fmt::Display for TryAcquireError { |
724 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
725 | match self { |
726 | TryAcquireError::Closed => write!(fmt, "semaphore closed" ), |
727 | TryAcquireError::NoPermits => write!(fmt, "no permits available" ), |
728 | } |
729 | } |
730 | } |
731 | |
732 | impl std::error::Error for TryAcquireError {} |
733 | |
734 | /// # Safety |
735 | /// |
736 | /// `Waiter` is forced to be !Unpin. |
737 | unsafe impl linked_list::Link for Waiter { |
738 | type Handle = NonNull<Waiter>; |
739 | type Target = Waiter; |
740 | |
741 | fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> { |
742 | *handle |
743 | } |
744 | |
745 | unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { |
746 | ptr |
747 | } |
748 | |
749 | unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { |
750 | Waiter::addr_of_pointers(target) |
751 | } |
752 | } |
753 | |