1use super::batch_semaphore as ll; // low level implementation
2use super::{AcquireError, TryAcquireError};
3#[cfg(all(tokio_unstable, feature = "tracing"))]
4use crate::util::trace;
5use std::sync::Arc;
6
7/// Counting semaphore performing asynchronous permit acquisition.
8///
9/// A semaphore maintains a set of permits. Permits are used to synchronize
10/// access to a shared resource. A semaphore differs from a mutex in that it
11/// can allow more than one concurrent caller to access the shared resource at a
12/// time.
13///
14/// When `acquire` is called and the semaphore has remaining permits, the
15/// function immediately returns a permit. However, if no remaining permits are
16/// available, `acquire` (asynchronously) waits until an outstanding permit is
17/// dropped. At this point, the freed permit is assigned to the caller.
18///
19/// This `Semaphore` is fair, which means that permits are given out in the order
20/// they were requested. This fairness is also applied when `acquire_many` gets
21/// involved, so if a call to `acquire_many` at the front of the queue requests
22/// more permits than currently available, this can prevent a call to `acquire`
23/// from completing, even if the semaphore has enough permits complete the call
24/// to `acquire`.
25///
26/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
27/// utility.
28///
29/// # Examples
30///
31/// Basic usage:
32///
33/// ```
34/// use tokio::sync::{Semaphore, TryAcquireError};
35///
36/// #[tokio::main]
37/// async fn main() {
38/// let semaphore = Semaphore::new(3);
39///
40/// let a_permit = semaphore.acquire().await.unwrap();
41/// let two_permits = semaphore.acquire_many(2).await.unwrap();
42///
43/// assert_eq!(semaphore.available_permits(), 0);
44///
45/// let permit_attempt = semaphore.try_acquire();
46/// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
47/// }
48/// ```
49///
50/// ## Limit the number of simultaneously opened files in your program
51///
52/// Most operating systems have limits on the number of open file
53/// handles. Even in systems without explicit limits, resource constraints
54/// implicitly set an upper bound on the number of open files. If your
55/// program attempts to open a large number of files and exceeds this
56/// limit, it will result in an error.
57///
58/// This example uses a Semaphore with 100 permits. By acquiring a permit from
59/// the Semaphore before accessing a file, you ensure that your program opens
60/// no more than 100 files at a time. When trying to open the 101st
61/// file, the program will wait until a permit becomes available before
62/// proceeding to open another file.
63/// ```
64/// use std::io::Result;
65/// use tokio::fs::File;
66/// use tokio::sync::Semaphore;
67/// use tokio::io::AsyncWriteExt;
68///
69/// static PERMITS: Semaphore = Semaphore::const_new(100);
70///
71/// async fn write_to_file(message: &[u8]) -> Result<()> {
72/// let _permit = PERMITS.acquire().await.unwrap();
73/// let mut buffer = File::create("example.txt").await?;
74/// buffer.write_all(message).await?;
75/// Ok(()) // Permit goes out of scope here, and is available again for acquisition
76/// }
77/// ```
78///
79/// ## Limit the number of incoming requests being handled at the same time
80///
81/// Similar to limiting the number of simultaneously opened files, network handles
82/// are a limited resource. Allowing an unbounded amount of requests to be processed
83/// could result in a denial-of-service, among many other issues.
84///
85/// This example uses an `Arc<Semaphore>` instead of a global variable.
86/// To limit the number of requests that can be processed at the time,
87/// we acquire a permit for each task before spawning it. Once acquired,
88/// a new task is spawned; and once finished, the permit is dropped inside
89/// of the task to allow others to spawn. Permits must be acquired via
90/// [`Semaphore::acquire_owned`] to be movable across the task boundary.
91/// (Since our semaphore is not a global variable — if it was, then `acquire` would be enough.)
92///
93/// ```no_run
94/// use std::sync::Arc;
95/// use tokio::sync::Semaphore;
96/// use tokio::net::TcpListener;
97///
98/// #[tokio::main]
99/// async fn main() -> std::io::Result<()> {
100/// let semaphore = Arc::new(Semaphore::new(3));
101/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
102///
103/// loop {
104/// // Acquire permit before accepting the next socket.
105/// //
106/// // We use `acquire_owned` so that we can move `permit` into
107/// // other tasks.
108/// let permit = semaphore.clone().acquire_owned().await.unwrap();
109/// let (mut socket, _) = listener.accept().await?;
110///
111/// tokio::spawn(async move {
112/// // Do work using the socket.
113/// handle_connection(&mut socket).await;
114/// // Drop socket while the permit is still live.
115/// drop(socket);
116/// // Drop the permit, so more tasks can be created.
117/// drop(permit);
118/// });
119/// }
120/// }
121/// # async fn handle_connection(_socket: &mut tokio::net::TcpStream) {
122/// # // Do work
123/// # }
124/// ```
125///
126/// ## Prevent tests from running in parallel
127///
128/// By default, Rust runs tests in the same file in parallel. However, in some
129/// cases, running two tests in parallel may lead to problems. For example, this
130/// can happen when tests use the same database.
131///
132/// Consider the following scenario:
133/// 1. `test_insert`: Inserts a key-value pair into the database, then retrieves
134/// the value using the same key to verify the insertion.
135/// 2. `test_update`: Inserts a key, then updates the key to a new value and
136/// verifies that the value has been accurately updated.
137/// 3. `test_others`: A third test that doesn't modify the database state. It
138/// can run in parallel with the other tests.
139///
140/// In this example, `test_insert` and `test_update` need to run in sequence to
141/// work, but it doesn't matter which test runs first. We can leverage a
142/// semaphore with a single permit to address this challenge.
143///
144/// ```
145/// # use tokio::sync::Mutex;
146/// # use std::collections::BTreeMap;
147/// # struct Database {
148/// # map: Mutex<BTreeMap<String, i32>>,
149/// # }
150/// # impl Database {
151/// # pub const fn setup() -> Database {
152/// # Database {
153/// # map: Mutex::const_new(BTreeMap::new()),
154/// # }
155/// # }
156/// # pub async fn insert(&self, key: &str, value: i32) {
157/// # self.map.lock().await.insert(key.to_string(), value);
158/// # }
159/// # pub async fn update(&self, key: &str, value: i32) {
160/// # self.map.lock().await
161/// # .entry(key.to_string())
162/// # .and_modify(|origin| *origin = value);
163/// # }
164/// # pub async fn delete(&self, key: &str) {
165/// # self.map.lock().await.remove(key);
166/// # }
167/// # pub async fn get(&self, key: &str) -> i32 {
168/// # *self.map.lock().await.get(key).unwrap()
169/// # }
170/// # }
171/// use tokio::sync::Semaphore;
172///
173/// // Initialize a static semaphore with only one permit, which is used to
174/// // prevent test_insert and test_update from running in parallel.
175/// static PERMIT: Semaphore = Semaphore::const_new(1);
176///
177/// // Initialize the database that will be used by the subsequent tests.
178/// static DB: Database = Database::setup();
179///
180/// #[tokio::test]
181/// # async fn fake_test_insert() {}
182/// async fn test_insert() {
183/// // Acquire permit before proceeding. Since the semaphore has only one permit,
184/// // the test will wait if the permit is already acquired by other tests.
185/// let permit = PERMIT.acquire().await.unwrap();
186///
187/// // Do the actual test stuff with database
188///
189/// // Insert a key-value pair to database
190/// let (key, value) = ("name", 0);
191/// DB.insert(key, value).await;
192///
193/// // Verify that the value has been inserted correctly.
194/// assert_eq!(DB.get(key).await, value);
195///
196/// // Undo the insertion, so the database is empty at the end of the test.
197/// DB.delete(key).await;
198///
199/// // Drop permit. This allows the other test to start running.
200/// drop(permit);
201/// }
202///
203/// #[tokio::test]
204/// # async fn fake_test_update() {}
205/// async fn test_update() {
206/// // Acquire permit before proceeding. Since the semaphore has only one permit,
207/// // the test will wait if the permit is already acquired by other tests.
208/// let permit = PERMIT.acquire().await.unwrap();
209///
210/// // Do the same insert.
211/// let (key, value) = ("name", 0);
212/// DB.insert(key, value).await;
213///
214/// // Update the existing value with a new one.
215/// let new_value = 1;
216/// DB.update(key, new_value).await;
217///
218/// // Verify that the value has been updated correctly.
219/// assert_eq!(DB.get(key).await, new_value);
220///
221/// // Undo any modificattion.
222/// DB.delete(key).await;
223///
224/// // Drop permit. This allows the other test to start running.
225/// drop(permit);
226/// }
227///
228/// #[tokio::test]
229/// # async fn fake_test_others() {}
230/// async fn test_others() {
231/// // This test can run in parallel with test_insert and test_update,
232/// // so it does not use PERMIT.
233/// }
234/// # #[tokio::main(flavor = "current_thread")]
235/// # async fn main() {
236/// # test_insert().await;
237/// # test_update().await;
238/// # test_others().await;
239/// # }
240/// ```
241///
242/// ## Rate limiting using a token bucket
243///
244/// This example showcases the [`add_permits`] and [`SemaphorePermit::forget`] methods.
245///
246/// Many applications and systems have constraints on the rate at which certain
247/// operations should occur. Exceeding this rate can result in suboptimal
248/// performance or even errors.
249///
250/// This example implements rate limiting using a [token bucket]. A token bucket is a form of rate
251/// limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that
252/// arrive at the same time.
253///
254/// With a token bucket, each incoming request consumes a token, and the tokens are refilled at a
255/// certain rate that defines the rate limit. When a burst of requests arrives, tokens are
256/// immediately given out until the bucket is empty. Once the bucket is empty, requests will have to
257/// wait for new tokens to be added.
258///
259/// Unlike the example that limits how many requests can be handled at the same time, we do not add
260/// tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
261///
262/// Note that this implementation is suboptimal when the duration is small, because it consumes a
263/// lot of cpu constantly looping and sleeping.
264///
265/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
266/// [`add_permits`]: crate::sync::Semaphore::add_permits
267/// [`SemaphorePermit::forget`]: crate::sync::SemaphorePermit::forget
268/// ```
269/// use std::sync::Arc;
270/// use tokio::sync::Semaphore;
271/// use tokio::time::{interval, Duration};
272///
273/// struct TokenBucket {
274/// sem: Arc<Semaphore>,
275/// jh: tokio::task::JoinHandle<()>,
276/// }
277///
278/// impl TokenBucket {
279/// fn new(duration: Duration, capacity: usize) -> Self {
280/// let sem = Arc::new(Semaphore::new(capacity));
281///
282/// // refills the tokens at the end of each interval
283/// let jh = tokio::spawn({
284/// let sem = sem.clone();
285/// let mut interval = interval(duration);
286/// interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
287///
288/// async move {
289/// loop {
290/// interval.tick().await;
291///
292/// if sem.available_permits() < capacity {
293/// sem.add_permits(1);
294/// }
295/// }
296/// }
297/// });
298///
299/// Self { jh, sem }
300/// }
301///
302/// async fn acquire(&self) {
303/// // This can return an error if the semaphore is closed, but we
304/// // never close it, so this error can never happen.
305/// let permit = self.sem.acquire().await.unwrap();
306/// // To avoid releasing the permit back to the semaphore, we use
307/// // the `SemaphorePermit::forget` method.
308/// permit.forget();
309/// }
310/// }
311///
312/// impl Drop for TokenBucket {
313/// fn drop(&mut self) {
314/// // Kill the background task so it stops taking up resources when we
315/// // don't need it anymore.
316/// self.jh.abort();
317/// }
318/// }
319///
320/// #[tokio::main]
321/// # async fn _hidden() {}
322/// # #[tokio::main(flavor = "current_thread", start_paused = true)]
323/// async fn main() {
324/// let capacity = 5;
325/// let update_interval = Duration::from_secs_f32(1.0 / capacity as f32);
326/// let bucket = TokenBucket::new(update_interval, capacity);
327///
328/// for _ in 0..5 {
329/// bucket.acquire().await;
330///
331/// // do the operation
332/// }
333/// }
334/// ```
335///
336/// [`PollSemaphore`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSemaphore.html
337/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
338#[derive(Debug)]
339pub struct Semaphore {
340 /// The low level semaphore
341 ll_sem: ll::Semaphore,
342 #[cfg(all(tokio_unstable, feature = "tracing"))]
343 resource_span: tracing::Span,
344}
345
346/// A permit from the semaphore.
347///
348/// This type is created by the [`acquire`] method.
349///
350/// [`acquire`]: crate::sync::Semaphore::acquire()
351#[must_use]
352#[clippy::has_significant_drop]
353#[derive(Debug)]
354pub struct SemaphorePermit<'a> {
355 sem: &'a Semaphore,
356 permits: u32,
357}
358
359/// An owned permit from the semaphore.
360///
361/// This type is created by the [`acquire_owned`] method.
362///
363/// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
364#[must_use]
365#[clippy::has_significant_drop]
366#[derive(Debug)]
367pub struct OwnedSemaphorePermit {
368 sem: Arc<Semaphore>,
369 permits: u32,
370}
371
372#[test]
373#[cfg(not(loom))]
374fn bounds() {
375 fn check_unpin<T: Unpin>() {}
376 // This has to take a value, since the async fn's return type is unnameable.
377 fn check_send_sync_val<T: Send + Sync>(_t: T) {}
378 fn check_send_sync<T: Send + Sync>() {}
379 check_unpin::<Semaphore>();
380 check_unpin::<SemaphorePermit<'_>>();
381 check_send_sync::<Semaphore>();
382
383 let semaphore = Semaphore::new(0);
384 check_send_sync_val(semaphore.acquire());
385}
386
387impl Semaphore {
388 /// The maximum number of permits which a semaphore can hold. It is `usize::MAX >> 3`.
389 ///
390 /// Exceeding this limit typically results in a panic.
391 pub const MAX_PERMITS: usize = super::batch_semaphore::Semaphore::MAX_PERMITS;
392
393 /// Creates a new semaphore with the initial number of permits.
394 ///
395 /// Panics if `permits` exceeds [`Semaphore::MAX_PERMITS`].
396 #[track_caller]
397 pub fn new(permits: usize) -> Self {
398 #[cfg(all(tokio_unstable, feature = "tracing"))]
399 let resource_span = {
400 let location = std::panic::Location::caller();
401
402 tracing::trace_span!(
403 parent: None,
404 "runtime.resource",
405 concrete_type = "Semaphore",
406 kind = "Sync",
407 loc.file = location.file(),
408 loc.line = location.line(),
409 loc.col = location.column(),
410 inherits_child_attrs = true,
411 )
412 };
413
414 #[cfg(all(tokio_unstable, feature = "tracing"))]
415 let ll_sem = resource_span.in_scope(|| ll::Semaphore::new(permits));
416
417 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
418 let ll_sem = ll::Semaphore::new(permits);
419
420 Self {
421 ll_sem,
422 #[cfg(all(tokio_unstable, feature = "tracing"))]
423 resource_span,
424 }
425 }
426
427 /// Creates a new semaphore with the initial number of permits.
428 ///
429 /// When using the `tracing` [unstable feature], a `Semaphore` created with
430 /// `const_new` will not be instrumented. As such, it will not be visible
431 /// in [`tokio-console`]. Instead, [`Semaphore::new`] should be used to
432 /// create an instrumented object if that is needed.
433 ///
434 /// # Examples
435 ///
436 /// ```
437 /// use tokio::sync::Semaphore;
438 ///
439 /// static SEM: Semaphore = Semaphore::const_new(10);
440 /// ```
441 ///
442 /// [`tokio-console`]: https://github.com/tokio-rs/console
443 /// [unstable feature]: crate#unstable-features
444 #[cfg(not(all(loom, test)))]
445 pub const fn const_new(permits: usize) -> Self {
446 Self {
447 ll_sem: ll::Semaphore::const_new(permits),
448 #[cfg(all(tokio_unstable, feature = "tracing"))]
449 resource_span: tracing::Span::none(),
450 }
451 }
452
453 /// Creates a new closed semaphore with 0 permits.
454 pub(crate) fn new_closed() -> Self {
455 Self {
456 ll_sem: ll::Semaphore::new_closed(),
457 #[cfg(all(tokio_unstable, feature = "tracing"))]
458 resource_span: tracing::Span::none(),
459 }
460 }
461
462 /// Creates a new closed semaphore with 0 permits.
463 #[cfg(not(all(loom, test)))]
464 pub(crate) const fn const_new_closed() -> Self {
465 Self {
466 ll_sem: ll::Semaphore::const_new_closed(),
467 #[cfg(all(tokio_unstable, feature = "tracing"))]
468 resource_span: tracing::Span::none(),
469 }
470 }
471
472 /// Returns the current number of available permits.
473 pub fn available_permits(&self) -> usize {
474 self.ll_sem.available_permits()
475 }
476
477 /// Adds `n` new permits to the semaphore.
478 ///
479 /// The maximum number of permits is [`Semaphore::MAX_PERMITS`], and this function will panic if the limit is exceeded.
480 pub fn add_permits(&self, n: usize) {
481 self.ll_sem.release(n);
482 }
483
484 /// Acquires a permit from the semaphore.
485 ///
486 /// If the semaphore has been closed, this returns an [`AcquireError`].
487 /// Otherwise, this returns a [`SemaphorePermit`] representing the
488 /// acquired permit.
489 ///
490 /// # Cancel safety
491 ///
492 /// This method uses a queue to fairly distribute permits in the order they
493 /// were requested. Cancelling a call to `acquire` makes you lose your place
494 /// in the queue.
495 ///
496 /// # Examples
497 ///
498 /// ```
499 /// use tokio::sync::Semaphore;
500 ///
501 /// #[tokio::main]
502 /// async fn main() {
503 /// let semaphore = Semaphore::new(2);
504 ///
505 /// let permit_1 = semaphore.acquire().await.unwrap();
506 /// assert_eq!(semaphore.available_permits(), 1);
507 ///
508 /// let permit_2 = semaphore.acquire().await.unwrap();
509 /// assert_eq!(semaphore.available_permits(), 0);
510 ///
511 /// drop(permit_1);
512 /// assert_eq!(semaphore.available_permits(), 1);
513 /// }
514 /// ```
515 ///
516 /// [`AcquireError`]: crate::sync::AcquireError
517 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
518 pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
519 #[cfg(all(tokio_unstable, feature = "tracing"))]
520 let inner = trace::async_op(
521 || self.ll_sem.acquire(1),
522 self.resource_span.clone(),
523 "Semaphore::acquire",
524 "poll",
525 true,
526 );
527 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
528 let inner = self.ll_sem.acquire(1);
529
530 inner.await?;
531 Ok(SemaphorePermit {
532 sem: self,
533 permits: 1,
534 })
535 }
536
537 /// Acquires `n` permits from the semaphore.
538 ///
539 /// If the semaphore has been closed, this returns an [`AcquireError`].
540 /// Otherwise, this returns a [`SemaphorePermit`] representing the
541 /// acquired permits.
542 ///
543 /// # Cancel safety
544 ///
545 /// This method uses a queue to fairly distribute permits in the order they
546 /// were requested. Cancelling a call to `acquire_many` makes you lose your
547 /// place in the queue.
548 ///
549 /// # Examples
550 ///
551 /// ```
552 /// use tokio::sync::Semaphore;
553 ///
554 /// #[tokio::main]
555 /// async fn main() {
556 /// let semaphore = Semaphore::new(5);
557 ///
558 /// let permit = semaphore.acquire_many(3).await.unwrap();
559 /// assert_eq!(semaphore.available_permits(), 2);
560 /// }
561 /// ```
562 ///
563 /// [`AcquireError`]: crate::sync::AcquireError
564 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
565 pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
566 #[cfg(all(tokio_unstable, feature = "tracing"))]
567 trace::async_op(
568 || self.ll_sem.acquire(n),
569 self.resource_span.clone(),
570 "Semaphore::acquire_many",
571 "poll",
572 true,
573 )
574 .await?;
575
576 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
577 self.ll_sem.acquire(n).await?;
578
579 Ok(SemaphorePermit {
580 sem: self,
581 permits: n,
582 })
583 }
584
585 /// Tries to acquire a permit from the semaphore.
586 ///
587 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
588 /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
589 /// this returns a [`SemaphorePermit`] representing the acquired permits.
590 ///
591 /// # Examples
592 ///
593 /// ```
594 /// use tokio::sync::{Semaphore, TryAcquireError};
595 ///
596 /// # fn main() {
597 /// let semaphore = Semaphore::new(2);
598 ///
599 /// let permit_1 = semaphore.try_acquire().unwrap();
600 /// assert_eq!(semaphore.available_permits(), 1);
601 ///
602 /// let permit_2 = semaphore.try_acquire().unwrap();
603 /// assert_eq!(semaphore.available_permits(), 0);
604 ///
605 /// let permit_3 = semaphore.try_acquire();
606 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
607 /// # }
608 /// ```
609 ///
610 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
611 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
612 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
613 pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
614 match self.ll_sem.try_acquire(1) {
615 Ok(()) => Ok(SemaphorePermit {
616 sem: self,
617 permits: 1,
618 }),
619 Err(e) => Err(e),
620 }
621 }
622
623 /// Tries to acquire `n` permits from the semaphore.
624 ///
625 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
626 /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
627 /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
628 ///
629 /// # Examples
630 ///
631 /// ```
632 /// use tokio::sync::{Semaphore, TryAcquireError};
633 ///
634 /// # fn main() {
635 /// let semaphore = Semaphore::new(4);
636 ///
637 /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
638 /// assert_eq!(semaphore.available_permits(), 1);
639 ///
640 /// let permit_2 = semaphore.try_acquire_many(2);
641 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
642 /// # }
643 /// ```
644 ///
645 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
646 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
647 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
648 pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
649 match self.ll_sem.try_acquire(n) {
650 Ok(()) => Ok(SemaphorePermit {
651 sem: self,
652 permits: n,
653 }),
654 Err(e) => Err(e),
655 }
656 }
657
658 /// Acquires a permit from the semaphore.
659 ///
660 /// The semaphore must be wrapped in an [`Arc`] to call this method.
661 /// If the semaphore has been closed, this returns an [`AcquireError`].
662 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
663 /// acquired permit.
664 ///
665 /// # Cancel safety
666 ///
667 /// This method uses a queue to fairly distribute permits in the order they
668 /// were requested. Cancelling a call to `acquire_owned` makes you lose your
669 /// place in the queue.
670 ///
671 /// # Examples
672 ///
673 /// ```
674 /// use std::sync::Arc;
675 /// use tokio::sync::Semaphore;
676 ///
677 /// #[tokio::main]
678 /// async fn main() {
679 /// let semaphore = Arc::new(Semaphore::new(3));
680 /// let mut join_handles = Vec::new();
681 ///
682 /// for _ in 0..5 {
683 /// let permit = semaphore.clone().acquire_owned().await.unwrap();
684 /// join_handles.push(tokio::spawn(async move {
685 /// // perform task...
686 /// // explicitly own `permit` in the task
687 /// drop(permit);
688 /// }));
689 /// }
690 ///
691 /// for handle in join_handles {
692 /// handle.await.unwrap();
693 /// }
694 /// }
695 /// ```
696 ///
697 /// [`Arc`]: std::sync::Arc
698 /// [`AcquireError`]: crate::sync::AcquireError
699 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
700 pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
701 #[cfg(all(tokio_unstable, feature = "tracing"))]
702 let inner = trace::async_op(
703 || self.ll_sem.acquire(1),
704 self.resource_span.clone(),
705 "Semaphore::acquire_owned",
706 "poll",
707 true,
708 );
709 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
710 let inner = self.ll_sem.acquire(1);
711
712 inner.await?;
713 Ok(OwnedSemaphorePermit {
714 sem: self,
715 permits: 1,
716 })
717 }
718
719 /// Acquires `n` permits from the semaphore.
720 ///
721 /// The semaphore must be wrapped in an [`Arc`] to call this method.
722 /// If the semaphore has been closed, this returns an [`AcquireError`].
723 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
724 /// acquired permit.
725 ///
726 /// # Cancel safety
727 ///
728 /// This method uses a queue to fairly distribute permits in the order they
729 /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
730 /// your place in the queue.
731 ///
732 /// # Examples
733 ///
734 /// ```
735 /// use std::sync::Arc;
736 /// use tokio::sync::Semaphore;
737 ///
738 /// #[tokio::main]
739 /// async fn main() {
740 /// let semaphore = Arc::new(Semaphore::new(10));
741 /// let mut join_handles = Vec::new();
742 ///
743 /// for _ in 0..5 {
744 /// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
745 /// join_handles.push(tokio::spawn(async move {
746 /// // perform task...
747 /// // explicitly own `permit` in the task
748 /// drop(permit);
749 /// }));
750 /// }
751 ///
752 /// for handle in join_handles {
753 /// handle.await.unwrap();
754 /// }
755 /// }
756 /// ```
757 ///
758 /// [`Arc`]: std::sync::Arc
759 /// [`AcquireError`]: crate::sync::AcquireError
760 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
761 pub async fn acquire_many_owned(
762 self: Arc<Self>,
763 n: u32,
764 ) -> Result<OwnedSemaphorePermit, AcquireError> {
765 #[cfg(all(tokio_unstable, feature = "tracing"))]
766 let inner = trace::async_op(
767 || self.ll_sem.acquire(n),
768 self.resource_span.clone(),
769 "Semaphore::acquire_many_owned",
770 "poll",
771 true,
772 );
773 #[cfg(not(all(tokio_unstable, feature = "tracing")))]
774 let inner = self.ll_sem.acquire(n);
775
776 inner.await?;
777 Ok(OwnedSemaphorePermit {
778 sem: self,
779 permits: n,
780 })
781 }
782
783 /// Tries to acquire a permit from the semaphore.
784 ///
785 /// The semaphore must be wrapped in an [`Arc`] to call this method. If
786 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
787 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
788 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
789 /// acquired permit.
790 ///
791 /// # Examples
792 ///
793 /// ```
794 /// use std::sync::Arc;
795 /// use tokio::sync::{Semaphore, TryAcquireError};
796 ///
797 /// # fn main() {
798 /// let semaphore = Arc::new(Semaphore::new(2));
799 ///
800 /// let permit_1 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
801 /// assert_eq!(semaphore.available_permits(), 1);
802 ///
803 /// let permit_2 = Arc::clone(&semaphore).try_acquire_owned().unwrap();
804 /// assert_eq!(semaphore.available_permits(), 0);
805 ///
806 /// let permit_3 = semaphore.try_acquire_owned();
807 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
808 /// # }
809 /// ```
810 ///
811 /// [`Arc`]: std::sync::Arc
812 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
813 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
814 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
815 pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
816 match self.ll_sem.try_acquire(1) {
817 Ok(()) => Ok(OwnedSemaphorePermit {
818 sem: self,
819 permits: 1,
820 }),
821 Err(e) => Err(e),
822 }
823 }
824
825 /// Tries to acquire `n` permits from the semaphore.
826 ///
827 /// The semaphore must be wrapped in an [`Arc`] to call this method. If
828 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
829 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
830 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
831 /// acquired permit.
832 ///
833 /// # Examples
834 ///
835 /// ```
836 /// use std::sync::Arc;
837 /// use tokio::sync::{Semaphore, TryAcquireError};
838 ///
839 /// # fn main() {
840 /// let semaphore = Arc::new(Semaphore::new(4));
841 ///
842 /// let permit_1 = Arc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
843 /// assert_eq!(semaphore.available_permits(), 1);
844 ///
845 /// let permit_2 = semaphore.try_acquire_many_owned(2);
846 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
847 /// # }
848 /// ```
849 ///
850 /// [`Arc`]: std::sync::Arc
851 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
852 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
853 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
854 pub fn try_acquire_many_owned(
855 self: Arc<Self>,
856 n: u32,
857 ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
858 match self.ll_sem.try_acquire(n) {
859 Ok(()) => Ok(OwnedSemaphorePermit {
860 sem: self,
861 permits: n,
862 }),
863 Err(e) => Err(e),
864 }
865 }
866
867 /// Closes the semaphore.
868 ///
869 /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
870 ///
871 /// # Examples
872 ///
873 /// ```
874 /// use tokio::sync::Semaphore;
875 /// use std::sync::Arc;
876 /// use tokio::sync::TryAcquireError;
877 ///
878 /// #[tokio::main]
879 /// async fn main() {
880 /// let semaphore = Arc::new(Semaphore::new(1));
881 /// let semaphore2 = semaphore.clone();
882 ///
883 /// tokio::spawn(async move {
884 /// let permit = semaphore.acquire_many(2).await;
885 /// assert!(permit.is_err());
886 /// println!("waiter received error");
887 /// });
888 ///
889 /// println!("closing semaphore");
890 /// semaphore2.close();
891 ///
892 /// // Cannot obtain more permits
893 /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
894 /// }
895 /// ```
896 pub fn close(&self) {
897 self.ll_sem.close();
898 }
899
900 /// Returns true if the semaphore is closed
901 pub fn is_closed(&self) -> bool {
902 self.ll_sem.is_closed()
903 }
904}
905
906impl<'a> SemaphorePermit<'a> {
907 /// Forgets the permit **without** releasing it back to the semaphore.
908 /// This can be used to reduce the amount of permits available from a
909 /// semaphore.
910 pub fn forget(mut self) {
911 self.permits = 0;
912 }
913
914 /// Merge two [`SemaphorePermit`] instances together, consuming `other`
915 /// without releasing the permits it holds.
916 ///
917 /// Permits held by both `self` and `other` are released when `self` drops.
918 ///
919 /// # Panics
920 ///
921 /// This function panics if permits from different [`Semaphore`] instances
922 /// are merged.
923 #[track_caller]
924 pub fn merge(&mut self, mut other: Self) {
925 assert!(
926 std::ptr::eq(self.sem, other.sem),
927 "merging permits from different semaphore instances"
928 );
929 self.permits += other.permits;
930 other.permits = 0;
931 }
932}
933
934impl OwnedSemaphorePermit {
935 /// Forgets the permit **without** releasing it back to the semaphore.
936 /// This can be used to reduce the amount of permits available from a
937 /// semaphore.
938 pub fn forget(mut self) {
939 self.permits = 0;
940 }
941
942 /// Merge two [`OwnedSemaphorePermit`] instances together, consuming `other`
943 /// without releasing the permits it holds.
944 ///
945 /// Permits held by both `self` and `other` are released when `self` drops.
946 ///
947 /// # Panics
948 ///
949 /// This function panics if permits from different [`Semaphore`] instances
950 /// are merged.
951 #[track_caller]
952 pub fn merge(&mut self, mut other: Self) {
953 assert!(
954 Arc::ptr_eq(&self.sem, &other.sem),
955 "merging permits from different semaphore instances"
956 );
957 self.permits += other.permits;
958 other.permits = 0;
959 }
960
961 /// Returns the [`Semaphore`] from which this permit was acquired.
962 pub fn semaphore(&self) -> &Arc<Semaphore> {
963 &self.sem
964 }
965}
966
967impl Drop for SemaphorePermit<'_> {
968 fn drop(&mut self) {
969 self.sem.add_permits(self.permits as usize);
970 }
971}
972
973impl Drop for OwnedSemaphorePermit {
974 fn drop(&mut self) {
975 self.sem.add_permits(self.permits as usize);
976 }
977}
978