1 | use crate::sync::batch_semaphore::Semaphore; |
2 | use tokio_test::*; |
3 | |
// Permit ceiling re-exported from the public `Semaphore` type; the boundary
// tests in this file build semaphores with exactly this value and with one
// more than it.
const MAX_PERMITS: usize = crate::sync::Semaphore::MAX_PERMITS;
5 | |
6 | #[cfg (all(target_family = "wasm" , not(target_os = "wasi" )))] |
7 | use wasm_bindgen_test::wasm_bindgen_test as test; |
8 | |
// With spare capacity, a one-permit acquire is expected to be ready on its
// first poll and to lower the available count by one.
#[test]
fn poll_acquire_one_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    // No contention: the very first poll must yield `Ready(Ok(..))`.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(sem.available_permits(), 99);
}
18 | |
// With spare capacity, two back-to-back five-permit acquires are each
// expected to be ready on the first poll, taking the count from 100 to 90.
#[test]
fn poll_acquire_many_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    // First batch of five is granted without waiting.
    assert_ready_ok!(task::spawn(sem.acquire(5)).poll());
    assert_eq!(sem.available_permits(), 95);

    // So is the second batch.
    assert_ready_ok!(task::spawn(sem.acquire(5)).poll());
    assert_eq!(sem.available_permits(), 90);
}
31 | |
// Non-blocking single-permit acquisition on an uncontended semaphore: each
// call is expected to succeed and take exactly one permit.
#[test]
fn try_acquire_one_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    // First permit.
    assert_ok!(sem.try_acquire(1));
    assert_eq!(sem.available_permits(), 99);

    // Second permit.
    assert_ok!(sem.try_acquire(1));
    assert_eq!(sem.available_permits(), 98);
}
43 | |
// Non-blocking multi-permit acquisition on an uncontended semaphore: each
// call is expected to succeed and take exactly five permits.
#[test]
fn try_acquire_many_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    // First batch of five.
    assert_ok!(sem.try_acquire(5));
    assert_eq!(sem.available_permits(), 95);

    // Second batch of five.
    assert_ok!(sem.try_acquire(5));
    assert_eq!(sem.available_permits(), 90);
}
55 | |
// A one-permit acquire against an exhausted semaphore is expected to stay
// pending until a permit is released, at which point the waiter is woken and
// the released permit goes to it rather than back to the available pool.
#[test]
fn poll_acquire_one_unavailable() {
    let sem = Semaphore::new(1);

    // Drain the only permit.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(sem.available_permits(), 0);

    // A further request has nothing to take and must park.
    let mut waiter = task::spawn(sem.acquire(1));
    assert_pending!(waiter.poll());
    assert_eq!(sem.available_permits(), 0);

    sem.release(1);

    // The count is asserted to remain zero across the release, the wake-up
    // and the successful re-poll of the waiter.
    assert_eq!(sem.available_permits(), 0);
    assert!(waiter.is_woken());
    assert_ready_ok!(waiter.poll());
    assert_eq!(sem.available_permits(), 0);

    // With nobody waiting, a release shows up in the available count.
    sem.release(1);
    assert_eq!(sem.available_permits(), 1);
}
79 | |
// Two multi-permit requests queue up behind an insufficient supply. The
// assertions pin down that the available count reads zero while they wait,
// that the earlier request is woken first, and that the later request is
// woken only once all three of its permits have been released.
#[test]
fn poll_acquire_many_unavailable() {
    let sem = Semaphore::new(5);

    // Take one permit so that only four remain.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(sem.available_permits(), 4);

    // Ask for five: more than what is left, so the request parks and the
    // available count is asserted to drop to zero.
    let mut wants_five = task::spawn(sem.acquire(5));
    assert_pending!(wants_five.poll());
    assert_eq!(sem.available_permits(), 0);

    // Ask for three behind it: this one parks as well.
    let mut wants_three = task::spawn(sem.acquire(3));
    assert_pending!(wants_three.poll());
    assert_eq!(sem.available_permits(), 0);

    sem.release(1);

    // One released permit is enough to wake and complete the first request.
    assert_eq!(sem.available_permits(), 0);
    assert!(wants_five.is_woken());
    assert_ready_ok!(wants_five.poll());

    // The second request has not been woken yet.
    assert!(!wants_three.is_woken());
    assert_eq!(sem.available_permits(), 0);

    // One more permit is still not enough for it.
    sem.release(1);
    assert!(!wants_three.is_woken());
    assert_eq!(sem.available_permits(), 0);

    // Two more bring the total released since it parked to three.
    sem.release(2);
    assert!(wants_three.is_woken());

    assert_ready_ok!(wants_three.poll());
}
116 | |
// Non-blocking acquisition on an exhausted semaphore is expected to fail
// immediately and to succeed again once a permit has been released.
#[test]
fn try_acquire_one_unavailable() {
    let sem = Semaphore::new(1);

    // Drain the only permit.
    assert_ok!(sem.try_acquire(1));
    assert_eq!(sem.available_permits(), 0);

    // Nothing left to hand out.
    assert_err!(sem.try_acquire(1));

    sem.release(1);

    // The released permit is visible and can be taken again.
    assert_eq!(sem.available_permits(), 1);
    assert_ok!(sem.try_acquire(1));

    sem.release(1);
    assert_eq!(sem.available_permits(), 1);
}
135 | |
// Non-blocking multi-permit acquisition is expected to fail when fewer
// permits are available than requested, and to succeed once enough permits
// have been released.
#[test]
fn try_acquire_many_unavailable() {
    let sem = Semaphore::new(5);

    // Take one permit so that only four remain.
    assert_ok!(sem.try_acquire(1));
    assert_eq!(sem.available_permits(), 4);

    // Five were requested but only four are available.
    assert_err!(sem.try_acquire(5));

    // Back to full capacity.
    sem.release(1);
    assert_eq!(sem.available_permits(), 5);

    // Now the whole batch can be taken at once.
    assert_ok!(sem.try_acquire(5));

    // Individual releases are reflected one at a time.
    sem.release(1);
    assert_eq!(sem.available_permits(), 1);

    sem.release(1);
    assert_eq!(sem.available_permits(), 2);
}
157 | |
// A semaphore created with zero permits: an acquire is expected to park
// until the first permit is released, then be woken and complete.
#[test]
fn poll_acquire_one_zero_permits() {
    let sem = Semaphore::new(0);
    assert_eq!(sem.available_permits(), 0);

    // Nothing to hand out yet, so the request must park.
    let mut waiter = task::spawn(sem.acquire(1));
    assert_pending!(waiter.poll());

    sem.release(1);

    // The release wakes the waiter, which then completes successfully.
    assert!(waiter.is_woken());
    assert_ready_ok!(waiter.poll());
}
172 | |
// Constructing a semaphore with exactly `MAX_PERMITS` permits must not panic.
#[test]
fn max_permits_doesnt_panic() {
    // The value itself is irrelevant; only successful construction matters.
    let _ = Semaphore::new(MAX_PERMITS);
}
177 | |
// Constructing a semaphore with one permit more than `MAX_PERMITS` is
// expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn validates_max_permits() {
    let _ = Semaphore::new(MAX_PERMITS + 1);
}
184 | |
// After `close()`, acquires are expected to resolve to an error on the first
// poll while the available-permit count stays untouched.
#[test]
fn close_semaphore_prevents_acquire() {
    let sem = Semaphore::new(5);
    sem.close();

    // Closing by itself does not change the count.
    assert_eq!(5, sem.available_permits());

    // A first attempt fails immediately and takes nothing.
    assert_ready_err!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(5, sem.available_permits());

    // A repeated attempt behaves the same way.
    assert_ready_err!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(5, sem.available_permits());
}
198 | |
// Closing the semaphore is expected to wake a single parked acquire, which
// then resolves to an error.
#[test]
fn close_semaphore_notifies_permit1() {
    let sem = Semaphore::new(0);
    let mut waiter = task::spawn(sem.acquire(1));

    // Zero permits: the request parks.
    assert_pending!(waiter.poll());

    sem.close();

    // The parked request is notified and observes the closure as an error.
    assert!(waiter.is_woken());
    assert_ready_err!(waiter.poll());
}
211 | |
// Closing the semaphore is expected to wake every parked acquire with an
// error. Afterwards, releases still raise the available count, but new
// acquires keep failing.
#[test]
fn close_semaphore_notifies_permit2() {
    let sem = Semaphore::new(2);

    // Use up both permits.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());

    // Two further requests have nothing to take and park.
    let mut third = task::spawn(sem.acquire(1));
    let mut fourth = task::spawn(sem.acquire(1));
    assert_pending!(third.poll());
    assert_pending!(fourth.poll());

    sem.close();

    // Both parked requests are notified...
    assert!(third.is_woken());
    assert!(fourth.is_woken());

    // ...and both resolve to an error.
    assert_ready_err!(third.poll());
    assert_ready_err!(fourth.poll());

    assert_eq!(0, sem.available_permits());

    sem.release(1);

    assert_eq!(1, sem.available_permits());

    // A permit is available, yet acquiring on a closed semaphore still fails.
    assert_ready_err!(task::spawn(sem.acquire(1)).poll());

    sem.release(1);

    assert_eq!(2, sem.available_permits());
}
245 | |
// Dropping a parked acquire is expected to hand back the permits that had
// been set aside for it.
#[test]
fn cancel_acquire_releases_permits() {
    let sem = Semaphore::new(10);
    sem.try_acquire(4).expect("uncontended try_acquire succeeds");
    assert_eq!(6, sem.available_permits());

    // Eight requested with six available: the request parks.
    let mut pending_acquire = task::spawn(sem.acquire(8));
    assert_pending!(pending_acquire.poll());

    // While it waits, the available count is asserted to read zero.
    assert_eq!(0, sem.available_permits());
    drop(pending_acquire);

    // Cancelling restores the six permits, and they can be taken again.
    assert_eq!(6, sem.available_permits());
    assert_ok!(sem.try_acquire(6));
}
261 | |
// Polls a pending `acquire` twice, each time with a waker whose payload may
// own a permit of the same semaphore; the second poll must return instead of
// deadlocking. The glob import below brings the public `Semaphore` into
// scope for this test.
#[test]
fn release_permits_at_drop() {
    use crate::sync::semaphore::*;
    use futures::task::ArcWake;
    use std::future::Future;
    use std::sync::Arc;

    let shared = Arc::new(Semaphore::new(1));

    // Waker payload: holds an owned permit when one could be obtained, so the
    // permit lives exactly as long as the waker does.
    struct ReleaseOnDrop(Option<OwnedSemaphorePermit>);

    impl ArcWake for ReleaseOnDrop {
        // Waking does nothing.
        fn wake_by_ref(_arc_self: &Arc<Self>) {}
    }

    let mut pending_acquire = Box::pin(async {
        let _permit = shared.acquire().await.unwrap();
    });

    // Two polls in total; the second one shouldn't deadlock.
    for _ in 0..2 {
        // Grab the permit if it is free; `None` otherwise.
        let held_permit = shared.clone().try_acquire_owned().ok();
        let permit_waker = futures::task::waker(Arc::new(ReleaseOnDrop(held_permit)));
        let mut ctx = std::task::Context::from_waker(&permit_waker);
        assert!(pending_acquire.as_mut().poll(&mut ctx).is_pending());
    }
}
290 | |