1use crate::sync::batch_semaphore::Semaphore;
2use tokio_test::*;
3
4const MAX_PERMITS: usize = crate::sync::Semaphore::MAX_PERMITS;
5
6#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7use wasm_bindgen_test::wasm_bindgen_test as test;
8
// With spare capacity, a one-permit request is granted on its first poll.
#[test]
fn poll_acquire_one_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    // No waiting is expected: the very first poll must yield `Ready(Ok(_))`.
    let first_poll = task::spawn(sem.acquire(1)).poll();
    assert_ready_ok!(first_poll);
    assert_eq!(sem.available_permits(), 99);
}
18
// Multi-permit requests are granted on the first poll while enough permits
// remain.
#[test]
fn poll_acquire_many_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    // Two back-to-back batches of five; each one resolves immediately and
    // lowers the available count by five.
    for &expected_left in &[95, 90] {
        assert_ready_ok!(task::spawn(sem.acquire(5)).poll());
        assert_eq!(sem.available_permits(), expected_left);
    }
}
31
// `try_acquire` hands out a single permit per call while permits remain.
#[test]
fn try_acquire_one_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    // Each successful call lowers the available count by exactly one.
    for &expected_left in &[99, 98] {
        assert_ok!(sem.try_acquire(1));
        assert_eq!(sem.available_permits(), expected_left);
    }
}
43
// `try_acquire` can take several permits at once while enough remain.
#[test]
fn try_acquire_many_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    let first_batch = sem.try_acquire(5);
    assert_ok!(first_batch);
    assert_eq!(sem.available_permits(), 95);

    let second_batch = sem.try_acquire(5);
    assert_ok!(second_batch);
    assert_eq!(sem.available_permits(), 90);
}
55
// A request made against an exhausted semaphore stays pending until a permit
// is released, at which point the waiting task is notified.
#[test]
fn poll_acquire_one_unavailable() {
    let sem = Semaphore::new(1);

    // Take the only permit.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(sem.available_permits(), 0);

    // A further request has nothing to take and must remain pending.
    let mut waiter = task::spawn(sem.acquire(1));
    assert_pending!(waiter.poll());
    assert_eq!(sem.available_permits(), 0);

    sem.release(1);

    // The count still reads zero after the release, the waiter has been
    // woken, and its next poll completes successfully.
    assert_eq!(sem.available_permits(), 0);
    assert!(waiter.is_woken());
    assert_ready_ok!(waiter.poll());
    assert_eq!(sem.available_permits(), 0);

    // With no request outstanding, a release raises the count.
    sem.release(1);
    assert_eq!(sem.available_permits(), 1);
}
79
// Two multi-permit requests wait on a semaphore that cannot satisfy them; the
// test checks which waiter is woken after each release.
#[test]
fn poll_acquire_many_unavailable() {
    let sem = Semaphore::new(5);

    // One permit is taken up front, leaving four.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(sem.available_permits(), 4);

    // A request for five cannot complete; afterwards the count reads zero.
    let mut wants_five = task::spawn(sem.acquire(5));
    assert_pending!(wants_five.poll());
    assert_eq!(sem.available_permits(), 0);

    // A later request for three is pending as well.
    let mut wants_three = task::spawn(sem.acquire(3));
    assert_pending!(wants_three.poll());
    assert_eq!(sem.available_permits(), 0);

    sem.release(1);

    // One released permit is enough to wake and complete the first waiter.
    assert_eq!(sem.available_permits(), 0);
    assert!(wants_five.is_woken());
    assert_ready_ok!(wants_five.poll());

    // The second waiter has not been notified yet.
    assert!(!wants_three.is_woken());
    assert_eq!(sem.available_permits(), 0);

    // A single extra permit does not wake it either ...
    sem.release(1);
    assert!(!wants_three.is_woken());
    assert_eq!(sem.available_permits(), 0);

    // ... but two more do.
    sem.release(2);
    assert!(wants_three.is_woken());

    assert_ready_ok!(wants_three.poll());
}
116
// `try_acquire` fails on an exhausted semaphore and succeeds again once a
// permit has been released.
#[test]
fn try_acquire_one_unavailable() {
    let sem = Semaphore::new(1);

    // Drain the semaphore.
    assert_ok!(sem.try_acquire(1));
    assert_eq!(sem.available_permits(), 0);

    // Nothing is left, so the non-blocking attempt reports an error.
    let denied = sem.try_acquire(1);
    assert_err!(denied);

    sem.release(1);
    assert_eq!(sem.available_permits(), 1);

    // The released permit can be taken again ...
    assert_ok!(sem.try_acquire(1));

    // ... and handed back once more.
    sem.release(1);
    assert_eq!(sem.available_permits(), 1);
}
135
// A multi-permit `try_acquire` fails when too few permits are available and
// succeeds once the missing permit has been released.
#[test]
fn try_acquire_many_unavailable() {
    let sem = Semaphore::new(5);

    // Take one permit so that only four remain.
    assert_ok!(sem.try_acquire(1));
    assert_eq!(sem.available_permits(), 4);

    // Four permits cannot satisfy a request for five.
    assert_err!(sem.try_acquire(5));

    sem.release(1);
    assert_eq!(sem.available_permits(), 5);

    assert_ok!(sem.try_acquire(5));

    // Hand permits back one at a time and watch the count climb.
    for &expected in &[1, 2] {
        sem.release(1);
        assert_eq!(sem.available_permits(), expected);
    }
}
157
// A semaphore created with zero permits keeps a request pending until the
// first release.
#[test]
fn poll_acquire_one_zero_permits() {
    let sem = Semaphore::new(0);
    assert_eq!(sem.available_permits(), 0);

    // There is nothing to hand out yet.
    let mut pending_acquire = task::spawn(sem.acquire(1));
    assert_pending!(pending_acquire.poll());

    sem.release(1);

    // The release wakes the waiter, and its next poll succeeds.
    assert!(pending_acquire.is_woken());
    assert_ready_ok!(pending_acquire.poll());
}
172
// Constructing a semaphore with exactly `MAX_PERMITS` must be accepted.
#[test]
fn max_permits_doesnt_panic() {
    // Only construction matters here; the value is discarded right away.
    drop(Semaphore::new(MAX_PERMITS));
}
177
// Constructing a semaphore with one permit more than `MAX_PERMITS` is
// expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn validates_max_permits() {
    let over_limit = MAX_PERMITS + 1;
    drop(Semaphore::new(over_limit));
}
184
// After `close`, acquire attempts fail immediately and leave the permit count
// untouched.
#[test]
fn close_semaphore_prevents_acquire() {
    let sem = Semaphore::new(5);
    sem.close();

    assert_eq!(5, sem.available_permits());

    // Every attempt resolves to an error on its first poll, and the count
    // stays at five.
    for _ in 0..2 {
        assert_ready_err!(task::spawn(sem.acquire(1)).poll());
        assert_eq!(5, sem.available_permits());
    }
}
198
// Closing the semaphore wakes a pending waiter, which then resolves to an
// error.
#[test]
fn close_semaphore_notifies_permit1() {
    let sem = Semaphore::new(0);

    // With zero permits the request cannot complete.
    let mut waiter = task::spawn(sem.acquire(1));
    assert_pending!(waiter.poll());

    sem.close();

    assert!(waiter.is_woken());
    assert_ready_err!(waiter.poll());
}
211
// Closing the semaphore wakes every pending waiter; releases made afterwards
// still raise the permit count, but new acquires keep failing.
#[test]
fn close_semaphore_notifies_permit2() {
    let sem = Semaphore::new(2);

    // Use up both permits.
    for _ in 0..2 {
        assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    }

    // Two more requests have to wait.
    let mut blocked_a = task::spawn(sem.acquire(1));
    let mut blocked_b = task::spawn(sem.acquire(1));
    assert_pending!(blocked_a.poll());
    assert_pending!(blocked_b.poll());

    sem.close();

    // Both waiters are notified and both resolve to an error.
    assert!(blocked_a.is_woken());
    assert!(blocked_b.is_woken());

    assert_ready_err!(blocked_a.poll());
    assert_ready_err!(blocked_b.poll());

    assert_eq!(0, sem.available_permits());

    sem.release(1);

    assert_eq!(1, sem.available_permits());

    // A permit is available, yet acquiring on the closed semaphore fails.
    assert_ready_err!(task::spawn(sem.acquire(1)).poll());

    sem.release(1);

    assert_eq!(2, sem.available_permits());
}
245
// Dropping a pending multi-permit acquire makes the permit count it had
// reduced available again.
#[test]
fn cancel_acquire_releases_permits() {
    let sem = Semaphore::new(10);
    sem.try_acquire(4).expect("uncontended try_acquire succeeds");
    assert_eq!(6, sem.available_permits());

    // Eight are requested while only six are free, so the future is pending
    // and the count drops to zero.
    let mut unfinished = task::spawn(sem.acquire(8));
    assert_pending!(unfinished.poll());
    assert_eq!(0, sem.available_permits());

    // Cancelling the request brings the six permits back ...
    drop(unfinished);
    assert_eq!(6, sem.available_permits());

    // ... and they can be taken by someone else.
    assert_ok!(sem.try_acquire(6));
}
261
// Polls a pending `acquire` twice, each time through a waker that owns
// whatever permit `try_acquire_owned` could obtain at that moment. Neither
// poll may complete, and the second one must not deadlock.
#[test]
fn release_permits_at_drop() {
    use crate::sync::semaphore::*;
    use futures::task::ArcWake;
    use std::future::Future;
    use std::sync::Arc;

    let sem = Arc::new(Semaphore::new(1));

    // Waker payload that owns an optional permit. The field is never read; it
    // exists only so that the permit is dropped together with the waker,
    // which is why the `dead_code` lint is silenced for it.
    struct ReleaseOnDrop(#[allow(dead_code)] Option<OwnedSemaphorePermit>);

    impl ArcWake for ReleaseOnDrop {
        // Waking is a no-op: this test drives the future by hand.
        fn wake_by_ref(_arc_self: &Arc<Self>) {}
    }

    let mut fut = Box::pin(async {
        let _permit = sem.acquire().await.unwrap();
    });

    // Second iteration shouldn't deadlock.
    for _ in 0..=1 {
        let waker = futures::task::waker(Arc::new(ReleaseOnDrop(
            sem.clone().try_acquire_owned().ok(),
        )));
        let mut cx = std::task::Context::from_waker(&waker);
        assert!(fut.as_mut().poll(&mut cx).is_pending());
    }
}
290