1 | use super::batch_semaphore as ll; // low level implementation |
2 | use super::{AcquireError, TryAcquireError}; |
3 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
4 | use crate::util::trace; |
5 | use std::sync::Arc; |
6 | |
7 | /// Counting semaphore performing asynchronous permit acquisition. |
8 | /// |
9 | /// A semaphore maintains a set of permits. Permits are used to synchronize |
10 | /// access to a shared resource. A semaphore differs from a mutex in that it |
11 | /// can allow more than one concurrent caller to access the shared resource at a |
12 | /// time. |
13 | /// |
14 | /// When `acquire` is called and the semaphore has remaining permits, the |
15 | /// function immediately returns a permit. However, if no remaining permits are |
16 | /// available, `acquire` (asynchronously) waits until an outstanding permit is |
17 | /// dropped. At this point, the freed permit is assigned to the caller. |
18 | /// |
19 | /// This `Semaphore` is fair, which means that permits are given out in the order |
20 | /// they were requested. This fairness is also applied when `acquire_many` gets |
21 | /// involved, so if a call to `acquire_many` at the front of the queue requests |
22 | /// more permits than currently available, this can prevent a call to `acquire` |
23 | /// from completing, even if the semaphore has enough permits complete the call |
24 | /// to `acquire`. |
25 | /// |
26 | /// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`] |
27 | /// utility. |
28 | /// |
29 | /// # Examples |
30 | /// |
31 | /// Basic usage: |
32 | /// |
33 | /// ``` |
34 | /// use tokio::sync::{Semaphore, TryAcquireError}; |
35 | /// |
36 | /// #[tokio::main] |
37 | /// async fn main() { |
38 | /// let semaphore = Semaphore::new(3); |
39 | /// |
40 | /// let a_permit = semaphore.acquire().await.unwrap(); |
41 | /// let two_permits = semaphore.acquire_many(2).await.unwrap(); |
42 | /// |
43 | /// assert_eq!(semaphore.available_permits(), 0); |
44 | /// |
45 | /// let permit_attempt = semaphore.try_acquire(); |
46 | /// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits)); |
47 | /// } |
48 | /// ``` |
49 | /// |
50 | /// Use [`Semaphore::acquire_owned`] to move permits across tasks: |
51 | /// |
52 | /// ``` |
53 | /// use std::sync::Arc; |
54 | /// use tokio::sync::Semaphore; |
55 | /// |
56 | /// #[tokio::main] |
57 | /// async fn main() { |
58 | /// let semaphore = Arc::new(Semaphore::new(3)); |
59 | /// let mut join_handles = Vec::new(); |
60 | /// |
61 | /// for _ in 0..5 { |
62 | /// let permit = semaphore.clone().acquire_owned().await.unwrap(); |
63 | /// join_handles.push(tokio::spawn(async move { |
64 | /// // perform task... |
65 | /// // explicitly own `permit` in the task |
66 | /// drop(permit); |
67 | /// })); |
68 | /// } |
69 | /// |
70 | /// for handle in join_handles { |
71 | /// handle.await.unwrap(); |
72 | /// } |
73 | /// } |
74 | /// ``` |
75 | /// |
76 | /// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html |
77 | /// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned |
78 | #[derive (Debug)] |
79 | pub struct Semaphore { |
80 | /// The low level semaphore |
81 | ll_sem: ll::Semaphore, |
82 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
83 | resource_span: tracing::Span, |
84 | } |
85 | |
86 | /// A permit from the semaphore. |
87 | /// |
88 | /// This type is created by the [`acquire`] method. |
89 | /// |
90 | /// [`acquire`]: crate::sync::Semaphore::acquire() |
91 | #[must_use ] |
92 | #[clippy::has_significant_drop] |
93 | #[derive (Debug)] |
94 | pub struct SemaphorePermit<'a> { |
95 | sem: &'a Semaphore, |
96 | permits: u32, |
97 | } |
98 | |
99 | /// An owned permit from the semaphore. |
100 | /// |
101 | /// This type is created by the [`acquire_owned`] method. |
102 | /// |
103 | /// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned() |
104 | #[must_use ] |
105 | #[clippy::has_significant_drop] |
106 | #[derive (Debug)] |
107 | pub struct OwnedSemaphorePermit { |
108 | sem: Arc<Semaphore>, |
109 | permits: u32, |
110 | } |
111 | |
// Compile-time check of the auto-trait bounds: `Semaphore` and
// `SemaphorePermit` must be `Unpin`, `Semaphore` must be `Send + Sync`, and
// the future returned by `Semaphore::acquire` must be `Send + Sync` as well.
#[test]
#[cfg(not(loom))]
fn bounds() {
    fn check_unpin<T: Unpin>() {}
    // This has to take a value, since the async fn's return type is unnameable.
    fn check_send_sync_val<T: Send + Sync>(_t: T) {}
    fn check_send_sync<T: Send + Sync>() {}
    check_unpin::<Semaphore>();
    check_unpin::<SemaphorePermit<'_>>();
    check_send_sync::<Semaphore>();

    // Rust has no named call arguments: the permit count is passed
    // positionally. The permit count itself is irrelevant here, only the
    // type of the returned future is inspected.
    let semaphore = Semaphore::new(0);
    check_send_sync_val(semaphore.acquire());
}
126 | |
127 | impl Semaphore { |
128 | /// The maximum number of permits which a semaphore can hold. It is `usize::MAX >> 3`. |
129 | /// |
130 | /// Exceeding this limit typically results in a panic. |
131 | pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS; |
132 | |
133 | /// Creates a new semaphore with the initial number of permits. |
134 | /// |
135 | /// Panics if `permits` exceeds [`Semaphore::MAX_PERMITS`]. |
136 | #[track_caller ] |
137 | pub fn new(permits: usize) -> Self { |
138 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
139 | let resource_span = { |
140 | let location = std::panic::Location::caller(); |
141 | |
142 | tracing::trace_span!( |
143 | "runtime.resource" , |
144 | concrete_type = "Semaphore" , |
145 | kind = "Sync" , |
146 | loc.file = location.file(), |
147 | loc.line = location.line(), |
148 | loc.col = location.column(), |
149 | inherits_child_attrs = true, |
150 | ) |
151 | }; |
152 | |
153 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
154 | let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits)); |
155 | |
156 | #[cfg (any(not(tokio_unstable), not(feature = "tracing" )))] |
157 | let ll_sem = ll::Semaphore::new(permits); |
158 | |
159 | Self { |
160 | ll_sem, |
161 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
162 | resource_span, |
163 | } |
164 | } |
165 | |
166 | /// Creates a new semaphore with the initial number of permits. |
167 | /// |
168 | /// # Examples |
169 | /// |
170 | /// ``` |
171 | /// use tokio::sync::Semaphore; |
172 | /// |
173 | /// static SEM: Semaphore = Semaphore::const_new(10); |
174 | /// ``` |
175 | /// |
176 | #[cfg (all(feature = "parking_lot" , not(all(loom, test))))] |
177 | #[cfg_attr (docsrs, doc(cfg(feature = "parking_lot" )))] |
178 | pub const fn const_new(permits: usize) -> Self { |
179 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
180 | return Self { |
181 | ll_sem: ll::Semaphore::const_new(permits), |
182 | resource_span: tracing::Span::none(), |
183 | }; |
184 | |
185 | #[cfg (any(not(tokio_unstable), not(feature = "tracing" )))] |
186 | return Self { |
187 | ll_sem: ll::Semaphore::const_new(permits), |
188 | }; |
189 | } |
190 | |
191 | /// Returns the current number of available permits. |
192 | pub fn available_permits(&self) -> usize { |
193 | self.ll_sem.available_permits() |
194 | } |
195 | |
196 | /// Adds `n` new permits to the semaphore. |
197 | /// |
198 | /// The maximum number of permits is [`Semaphore::MAX_PERMITS`], and this function will panic if the limit is exceeded. |
199 | pub fn add_permits(&self, n: usize) { |
200 | self.ll_sem.release(n); |
201 | } |
202 | |
203 | /// Acquires a permit from the semaphore. |
204 | /// |
205 | /// If the semaphore has been closed, this returns an [`AcquireError`]. |
206 | /// Otherwise, this returns a [`SemaphorePermit`] representing the |
207 | /// acquired permit. |
208 | /// |
209 | /// # Cancel safety |
210 | /// |
211 | /// This method uses a queue to fairly distribute permits in the order they |
212 | /// were requested. Cancelling a call to `acquire` makes you lose your place |
213 | /// in the queue. |
214 | /// |
215 | /// # Examples |
216 | /// |
217 | /// ``` |
218 | /// use tokio::sync::Semaphore; |
219 | /// |
220 | /// #[tokio::main] |
221 | /// async fn main() { |
222 | /// let semaphore = Semaphore::new(2); |
223 | /// |
224 | /// let permit_1 = semaphore.acquire().await.unwrap(); |
225 | /// assert_eq!(semaphore.available_permits(), 1); |
226 | /// |
227 | /// let permit_2 = semaphore.acquire().await.unwrap(); |
228 | /// assert_eq!(semaphore.available_permits(), 0); |
229 | /// |
230 | /// drop(permit_1); |
231 | /// assert_eq!(semaphore.available_permits(), 1); |
232 | /// } |
233 | /// ``` |
234 | /// |
235 | /// [`AcquireError`]: crate::sync::AcquireError |
236 | /// [`SemaphorePermit`]: crate::sync::SemaphorePermit |
237 | pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> { |
238 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
239 | let inner = trace::async_op( |
240 | || self.ll_sem.acquire(1), |
241 | self.resource_span.clone(), |
242 | "Semaphore::acquire" , |
243 | "poll" , |
244 | true, |
245 | ); |
246 | #[cfg (not(all(tokio_unstable, feature = "tracing" )))] |
247 | let inner = self.ll_sem.acquire(1); |
248 | |
249 | inner.await?; |
250 | Ok(SemaphorePermit { |
251 | sem: self, |
252 | permits: 1, |
253 | }) |
254 | } |
255 | |
256 | /// Acquires `n` permits from the semaphore. |
257 | /// |
258 | /// If the semaphore has been closed, this returns an [`AcquireError`]. |
259 | /// Otherwise, this returns a [`SemaphorePermit`] representing the |
260 | /// acquired permits. |
261 | /// |
262 | /// # Cancel safety |
263 | /// |
264 | /// This method uses a queue to fairly distribute permits in the order they |
265 | /// were requested. Cancelling a call to `acquire_many` makes you lose your |
266 | /// place in the queue. |
267 | /// |
268 | /// # Examples |
269 | /// |
270 | /// ``` |
271 | /// use tokio::sync::Semaphore; |
272 | /// |
273 | /// #[tokio::main] |
274 | /// async fn main() { |
275 | /// let semaphore = Semaphore::new(5); |
276 | /// |
277 | /// let permit = semaphore.acquire_many(3).await.unwrap(); |
278 | /// assert_eq!(semaphore.available_permits(), 2); |
279 | /// } |
280 | /// ``` |
281 | /// |
282 | /// [`AcquireError`]: crate::sync::AcquireError |
283 | /// [`SemaphorePermit`]: crate::sync::SemaphorePermit |
284 | pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> { |
285 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
286 | trace::async_op( |
287 | || self.ll_sem.acquire(n), |
288 | self.resource_span.clone(), |
289 | "Semaphore::acquire_many" , |
290 | "poll" , |
291 | true, |
292 | ) |
293 | .await?; |
294 | |
295 | #[cfg (not(all(tokio_unstable, feature = "tracing" )))] |
296 | self.ll_sem.acquire(n).await?; |
297 | |
298 | Ok(SemaphorePermit { |
299 | sem: self, |
300 | permits: n, |
301 | }) |
302 | } |
303 | |
304 | /// Tries to acquire a permit from the semaphore. |
305 | /// |
306 | /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`] |
307 | /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise, |
308 | /// this returns a [`SemaphorePermit`] representing the acquired permits. |
309 | /// |
310 | /// # Examples |
311 | /// |
312 | /// ``` |
313 | /// use tokio::sync::{Semaphore, TryAcquireError}; |
314 | /// |
315 | /// # fn main() { |
316 | /// let semaphore = Semaphore::new(2); |
317 | /// |
318 | /// let permit_1 = semaphore.try_acquire().unwrap(); |
319 | /// assert_eq!(semaphore.available_permits(), 1); |
320 | /// |
321 | /// let permit_2 = semaphore.try_acquire().unwrap(); |
322 | /// assert_eq!(semaphore.available_permits(), 0); |
323 | /// |
324 | /// let permit_3 = semaphore.try_acquire(); |
325 | /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits)); |
326 | /// # } |
327 | /// ``` |
328 | /// |
329 | /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed |
330 | /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits |
331 | /// [`SemaphorePermit`]: crate::sync::SemaphorePermit |
332 | pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> { |
333 | match self.ll_sem.try_acquire(1) { |
334 | Ok(_) => Ok(SemaphorePermit { |
335 | sem: self, |
336 | permits: 1, |
337 | }), |
338 | Err(e) => Err(e), |
339 | } |
340 | } |
341 | |
342 | /// Tries to acquire `n` permits from the semaphore. |
343 | /// |
344 | /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`] |
345 | /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left. |
346 | /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits. |
347 | /// |
348 | /// # Examples |
349 | /// |
350 | /// ``` |
351 | /// use tokio::sync::{Semaphore, TryAcquireError}; |
352 | /// |
353 | /// # fn main() { |
354 | /// let semaphore = Semaphore::new(4); |
355 | /// |
356 | /// let permit_1 = semaphore.try_acquire_many(3).unwrap(); |
357 | /// assert_eq!(semaphore.available_permits(), 1); |
358 | /// |
359 | /// let permit_2 = semaphore.try_acquire_many(2); |
360 | /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits)); |
361 | /// # } |
362 | /// ``` |
363 | /// |
364 | /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed |
365 | /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits |
366 | /// [`SemaphorePermit`]: crate::sync::SemaphorePermit |
367 | pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> { |
368 | match self.ll_sem.try_acquire(n) { |
369 | Ok(_) => Ok(SemaphorePermit { |
370 | sem: self, |
371 | permits: n, |
372 | }), |
373 | Err(e) => Err(e), |
374 | } |
375 | } |
376 | |
377 | /// Acquires a permit from the semaphore. |
378 | /// |
379 | /// The semaphore must be wrapped in an [`Arc`] to call this method. |
380 | /// If the semaphore has been closed, this returns an [`AcquireError`]. |
381 | /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the |
382 | /// acquired permit. |
383 | /// |
384 | /// # Cancel safety |
385 | /// |
386 | /// This method uses a queue to fairly distribute permits in the order they |
387 | /// were requested. Cancelling a call to `acquire_owned` makes you lose your |
388 | /// place in the queue. |
389 | /// |
390 | /// # Examples |
391 | /// |
392 | /// ``` |
393 | /// use std::sync::Arc; |
394 | /// use tokio::sync::Semaphore; |
395 | /// |
396 | /// #[tokio::main] |
397 | /// async fn main() { |
398 | /// let semaphore = Arc::new(Semaphore::new(3)); |
399 | /// let mut join_handles = Vec::new(); |
400 | /// |
401 | /// for _ in 0..5 { |
402 | /// let permit = semaphore.clone().acquire_owned().await.unwrap(); |
403 | /// join_handles.push(tokio::spawn(async move { |
404 | /// // perform task... |
405 | /// // explicitly own `permit` in the task |
406 | /// drop(permit); |
407 | /// })); |
408 | /// } |
409 | /// |
410 | /// for handle in join_handles { |
411 | /// handle.await.unwrap(); |
412 | /// } |
413 | /// } |
414 | /// ``` |
415 | /// |
416 | /// [`Arc`]: std::sync::Arc |
417 | /// [`AcquireError`]: crate::sync::AcquireError |
418 | /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit |
419 | pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> { |
420 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
421 | let inner = trace::async_op( |
422 | || self.ll_sem.acquire(1), |
423 | self.resource_span.clone(), |
424 | "Semaphore::acquire_owned" , |
425 | "poll" , |
426 | true, |
427 | ); |
428 | #[cfg (not(all(tokio_unstable, feature = "tracing" )))] |
429 | let inner = self.ll_sem.acquire(1); |
430 | |
431 | inner.await?; |
432 | Ok(OwnedSemaphorePermit { |
433 | sem: self, |
434 | permits: 1, |
435 | }) |
436 | } |
437 | |
438 | /// Acquires `n` permits from the semaphore. |
439 | /// |
440 | /// The semaphore must be wrapped in an [`Arc`] to call this method. |
441 | /// If the semaphore has been closed, this returns an [`AcquireError`]. |
442 | /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the |
443 | /// acquired permit. |
444 | /// |
445 | /// # Cancel safety |
446 | /// |
447 | /// This method uses a queue to fairly distribute permits in the order they |
448 | /// were requested. Cancelling a call to `acquire_many_owned` makes you lose |
449 | /// your place in the queue. |
450 | /// |
451 | /// # Examples |
452 | /// |
453 | /// ``` |
454 | /// use std::sync::Arc; |
455 | /// use tokio::sync::Semaphore; |
456 | /// |
457 | /// #[tokio::main] |
458 | /// async fn main() { |
459 | /// let semaphore = Arc::new(Semaphore::new(10)); |
460 | /// let mut join_handles = Vec::new(); |
461 | /// |
462 | /// for _ in 0..5 { |
463 | /// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap(); |
464 | /// join_handles.push(tokio::spawn(async move { |
465 | /// // perform task... |
466 | /// // explicitly own `permit` in the task |
467 | /// drop(permit); |
468 | /// })); |
469 | /// } |
470 | /// |
471 | /// for handle in join_handles { |
472 | /// handle.await.unwrap(); |
473 | /// } |
474 | /// } |
475 | /// ``` |
476 | /// |
477 | /// [`Arc`]: std::sync::Arc |
478 | /// [`AcquireError`]: crate::sync::AcquireError |
479 | /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit |
480 | pub async fn acquire_many_owned( |
481 | self: Arc<Self>, |
482 | n: u32, |
483 | ) -> Result<OwnedSemaphorePermit, AcquireError> { |
484 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
485 | let inner = trace::async_op( |
486 | || self.ll_sem.acquire(n), |
487 | self.resource_span.clone(), |
488 | "Semaphore::acquire_many_owned" , |
489 | "poll" , |
490 | true, |
491 | ); |
492 | #[cfg (not(all(tokio_unstable, feature = "tracing" )))] |
493 | let inner = self.ll_sem.acquire(n); |
494 | |
495 | inner.await?; |
496 | Ok(OwnedSemaphorePermit { |
497 | sem: self, |
498 | permits: n, |
499 | }) |
500 | } |
501 | |
502 | /// Tries to acquire a permit from the semaphore. |
503 | /// |
504 | /// The semaphore must be wrapped in an [`Arc`] to call this method. If |
505 | /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`] |
506 | /// and a [`TryAcquireError::NoPermits`] if there are no permits left. |
507 | /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the |
508 | /// acquired permit. |
509 | /// |
510 | /// # Examples |
511 | /// |
512 | /// ``` |
513 | /// use std::sync::Arc; |
514 | /// use tokio::sync::{Semaphore, TryAcquireError}; |
515 | /// |
516 | /// # fn main() { |
517 | /// let semaphore = Arc::new(Semaphore::new(2)); |
518 | /// |
519 | /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap(); |
520 | /// assert_eq!(semaphore.available_permits(), 1); |
521 | /// |
522 | /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap(); |
523 | /// assert_eq!(semaphore.available_permits(), 0); |
524 | /// |
525 | /// let permit_3 = semaphore.try_acquire_owned(); |
526 | /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits)); |
527 | /// # } |
528 | /// ``` |
529 | /// |
530 | /// [`Arc`]: std::sync::Arc |
531 | /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed |
532 | /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits |
533 | /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit |
534 | pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> { |
535 | match self.ll_sem.try_acquire(1) { |
536 | Ok(_) => Ok(OwnedSemaphorePermit { |
537 | sem: self, |
538 | permits: 1, |
539 | }), |
540 | Err(e) => Err(e), |
541 | } |
542 | } |
543 | |
544 | /// Tries to acquire `n` permits from the semaphore. |
545 | /// |
546 | /// The semaphore must be wrapped in an [`Arc`] to call this method. If |
547 | /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`] |
548 | /// and a [`TryAcquireError::NoPermits`] if there are no permits left. |
549 | /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the |
550 | /// acquired permit. |
551 | /// |
552 | /// # Examples |
553 | /// |
554 | /// ``` |
555 | /// use std::sync::Arc; |
556 | /// use tokio::sync::{Semaphore, TryAcquireError}; |
557 | /// |
558 | /// # fn main() { |
559 | /// let semaphore = Arc::new(Semaphore::new(4)); |
560 | /// |
561 | /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap(); |
562 | /// assert_eq!(semaphore.available_permits(), 1); |
563 | /// |
564 | /// let permit_2 = semaphore.try_acquire_many_owned(2); |
565 | /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits)); |
566 | /// # } |
567 | /// ``` |
568 | /// |
569 | /// [`Arc`]: std::sync::Arc |
570 | /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed |
571 | /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits |
572 | /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit |
573 | pub fn try_acquire_many_owned( |
574 | self: Arc<Self>, |
575 | n: u32, |
576 | ) -> Result<OwnedSemaphorePermit, TryAcquireError> { |
577 | match self.ll_sem.try_acquire(n) { |
578 | Ok(_) => Ok(OwnedSemaphorePermit { |
579 | sem: self, |
580 | permits: n, |
581 | }), |
582 | Err(e) => Err(e), |
583 | } |
584 | } |
585 | |
586 | /// Closes the semaphore. |
587 | /// |
588 | /// This prevents the semaphore from issuing new permits and notifies all pending waiters. |
589 | /// |
590 | /// # Examples |
591 | /// |
592 | /// ``` |
593 | /// use tokio::sync::Semaphore; |
594 | /// use std::sync::Arc; |
595 | /// use tokio::sync::TryAcquireError; |
596 | /// |
597 | /// #[tokio::main] |
598 | /// async fn main() { |
599 | /// let semaphore = Arc::new(Semaphore::new(1)); |
600 | /// let semaphore2 = semaphore.clone(); |
601 | /// |
602 | /// tokio::spawn(async move { |
603 | /// let permit = semaphore.acquire_many(2).await; |
604 | /// assert!(permit.is_err()); |
605 | /// println!("waiter received error" ); |
606 | /// }); |
607 | /// |
608 | /// println!("closing semaphore" ); |
609 | /// semaphore2.close(); |
610 | /// |
611 | /// // Cannot obtain more permits |
612 | /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed)) |
613 | /// } |
614 | /// ``` |
615 | pub fn close(&self) { |
616 | self.ll_sem.close(); |
617 | } |
618 | |
619 | /// Returns true if the semaphore is closed |
620 | pub fn is_closed(&self) -> bool { |
621 | self.ll_sem.is_closed() |
622 | } |
623 | } |
624 | |
625 | impl<'a> SemaphorePermit<'a> { |
626 | /// Forgets the permit **without** releasing it back to the semaphore. |
627 | /// This can be used to reduce the amount of permits available from a |
628 | /// semaphore. |
629 | pub fn forget(mut self) { |
630 | self.permits = 0; |
631 | } |
632 | |
633 | /// Merge two [`SemaphorePermit`] instances together, consuming `other` |
634 | /// without releasing the permits it holds. |
635 | /// |
636 | /// Permits held by both `self` and `other` are released when `self` drops. |
637 | /// |
638 | /// # Panics |
639 | /// |
640 | /// This function panics if permits from different [`Semaphore`] instances |
641 | /// are merged. |
642 | #[track_caller ] |
643 | pub fn merge(&mut self, mut other: Self) { |
644 | assert!( |
645 | std::ptr::eq(self.sem, other.sem), |
646 | "merging permits from different semaphore instances" |
647 | ); |
648 | self.permits += other.permits; |
649 | other.permits = 0; |
650 | } |
651 | } |
652 | |
653 | impl OwnedSemaphorePermit { |
654 | /// Forgets the permit **without** releasing it back to the semaphore. |
655 | /// This can be used to reduce the amount of permits available from a |
656 | /// semaphore. |
657 | pub fn forget(mut self) { |
658 | self.permits = 0; |
659 | } |
660 | |
661 | /// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other` |
662 | /// without releasing the permits it holds. |
663 | /// |
664 | /// Permits held by both `self` and `other` are released when `self` drops. |
665 | /// |
666 | /// # Panics |
667 | /// |
668 | /// This function panics if permits from different [`Semaphore`] instances |
669 | /// are merged. |
670 | #[track_caller ] |
671 | pub fn merge(&mut self, mut other: Self) { |
672 | assert!( |
673 | Arc::ptr_eq(&self.sem, &other.sem), |
674 | "merging permits from different semaphore instances" |
675 | ); |
676 | self.permits += other.permits; |
677 | other.permits = 0; |
678 | } |
679 | |
680 | /// Returns the [`Semaphore`] from which this permit was acquired. |
681 | pub fn semaphore(&self) -> &Arc<Semaphore> { |
682 | &self.sem |
683 | } |
684 | } |
685 | |
686 | impl Drop for SemaphorePermit<'_> { |
687 | fn drop(&mut self) { |
688 | self.sem.add_permits(self.permits as usize); |
689 | } |
690 | } |
691 | |
692 | impl Drop for OwnedSemaphorePermit { |
693 | fn drop(&mut self) { |
694 | self.sem.add_permits(self.permits as usize); |
695 | } |
696 | } |
697 | |