1 | use std::future::Future; |
2 | use std::sync::Arc; |
3 | use std::task::Poll; |
4 | use tokio::sync::{OwnedSemaphorePermit, Semaphore}; |
5 | use tokio_util::sync::PollSemaphore; |
6 | |
7 | type SemRet = Option<OwnedSemaphorePermit>; |
8 | |
9 | fn semaphore_poll( |
10 | sem: &mut PollSemaphore, |
11 | ) -> tokio_test::task::Spawn<impl Future<Output = SemRet> + '_> { |
12 | let fut = futures::future::poll_fn(move |cx| sem.poll_acquire(cx)); |
13 | tokio_test::task::spawn(fut) |
14 | } |
15 | |
16 | fn semaphore_poll_many( |
17 | sem: &mut PollSemaphore, |
18 | permits: u32, |
19 | ) -> tokio_test::task::Spawn<impl Future<Output = SemRet> + '_> { |
20 | let fut = futures::future::poll_fn(move |cx| sem.poll_acquire_many(cx, permits)); |
21 | tokio_test::task::spawn(fut) |
22 | } |
23 | |
24 | #[tokio::test ] |
25 | async fn it_works() { |
26 | let sem = Arc::new(Semaphore::new(1)); |
27 | let mut poll_sem = PollSemaphore::new(sem.clone()); |
28 | |
29 | let permit = sem.acquire().await.unwrap(); |
30 | let mut poll = semaphore_poll(&mut poll_sem); |
31 | assert!(poll.poll().is_pending()); |
32 | drop(permit); |
33 | |
34 | assert!(matches!(poll.poll(), Poll::Ready(Some(_)))); |
35 | drop(poll); |
36 | |
37 | sem.close(); |
38 | |
39 | assert!(semaphore_poll(&mut poll_sem).await.is_none()); |
40 | |
41 | // Check that it is fused. |
42 | assert!(semaphore_poll(&mut poll_sem).await.is_none()); |
43 | assert!(semaphore_poll(&mut poll_sem).await.is_none()); |
44 | } |
45 | |
46 | #[tokio::test ] |
47 | async fn can_acquire_many_permits() { |
48 | let sem = Arc::new(Semaphore::new(4)); |
49 | let mut poll_sem = PollSemaphore::new(sem.clone()); |
50 | |
51 | let permit1 = semaphore_poll(&mut poll_sem).poll(); |
52 | assert!(matches!(permit1, Poll::Ready(Some(_)))); |
53 | |
54 | let permit2 = semaphore_poll_many(&mut poll_sem, 2).poll(); |
55 | assert!(matches!(permit2, Poll::Ready(Some(_)))); |
56 | |
57 | assert_eq!(sem.available_permits(), 1); |
58 | |
59 | drop(permit2); |
60 | |
61 | let mut permit4 = semaphore_poll_many(&mut poll_sem, 4); |
62 | assert!(permit4.poll().is_pending()); |
63 | |
64 | drop(permit1); |
65 | |
66 | let permit4 = permit4.poll(); |
67 | assert!(matches!(permit4, Poll::Ready(Some(_)))); |
68 | assert_eq!(sem.available_permits(), 0); |
69 | } |
70 | |
71 | #[tokio::test ] |
72 | async fn can_poll_different_amounts_of_permits() { |
73 | let sem = Arc::new(Semaphore::new(4)); |
74 | let mut poll_sem = PollSemaphore::new(sem.clone()); |
75 | assert!(semaphore_poll_many(&mut poll_sem, 5).poll().is_pending()); |
76 | assert!(semaphore_poll_many(&mut poll_sem, 4).poll().is_ready()); |
77 | |
78 | let permit = sem.acquire_many(4).await.unwrap(); |
79 | assert!(semaphore_poll_many(&mut poll_sem, 5).poll().is_pending()); |
80 | assert!(semaphore_poll_many(&mut poll_sem, 4).poll().is_pending()); |
81 | drop(permit); |
82 | assert!(semaphore_poll_many(&mut poll_sem, 5).poll().is_pending()); |
83 | assert!(semaphore_poll_many(&mut poll_sem, 4).poll().is_ready()); |
84 | } |
85 | |