1 | #![cfg (test)] |
2 | |
3 | use std::sync::atomic::{AtomicUsize, Ordering}; |
4 | use std::sync::mpsc::channel; |
5 | use std::sync::{Arc, Mutex}; |
6 | |
7 | use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder}; |
8 | |
9 | #[test] |
10 | #[should_panic (expected = "Hello, world!" )] |
11 | fn panic_propagate() { |
12 | let thread_pool = ThreadPoolBuilder::new().build().unwrap(); |
13 | thread_pool.install(|| { |
14 | panic!("Hello, world!" ); |
15 | }); |
16 | } |
17 | |
18 | #[test] |
19 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
20 | fn workers_stop() { |
21 | let registry; |
22 | |
23 | { |
24 | // once we exit this block, thread-pool will be dropped |
25 | let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); |
26 | registry = thread_pool.install(|| { |
27 | // do some work on these threads |
28 | join_a_lot(22); |
29 | |
30 | Arc::clone(&thread_pool.registry) |
31 | }); |
32 | assert_eq!(registry.num_threads(), 22); |
33 | } |
34 | |
35 | // once thread-pool is dropped, registry should terminate, which |
36 | // should lead to worker threads stopping |
37 | registry.wait_until_stopped(); |
38 | } |
39 | |
40 | fn join_a_lot(n: usize) { |
41 | if n > 0 { |
42 | join(|| join_a_lot(n - 1), || join_a_lot(n - 1)); |
43 | } |
44 | } |
45 | |
46 | #[test] |
47 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
48 | fn sleeper_stop() { |
49 | use std::{thread, time}; |
50 | |
51 | let registry; |
52 | |
53 | { |
54 | // once we exit this block, thread-pool will be dropped |
55 | let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap(); |
56 | registry = Arc::clone(&thread_pool.registry); |
57 | |
58 | // Give time for at least some of the thread pool to fall asleep. |
59 | thread::sleep(time::Duration::from_secs(1)); |
60 | } |
61 | |
62 | // once thread-pool is dropped, registry should terminate, which |
63 | // should lead to worker threads stopping |
64 | registry.wait_until_stopped(); |
65 | } |
66 | |
67 | /// Creates a start/exit handler that increments an atomic counter. |
68 | fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) { |
69 | let count = Arc::new(AtomicUsize::new(0)); |
70 | (Arc::clone(&count), move |_| { |
71 | count.fetch_add(1, Ordering::SeqCst); |
72 | }) |
73 | } |
74 | |
75 | /// Wait until a counter is no longer shared, then return its value. |
76 | fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize { |
77 | use std::{thread, time}; |
78 | |
79 | for _ in 0..60 { |
80 | counter = match Arc::try_unwrap(counter) { |
81 | Ok(counter) => return counter.into_inner(), |
82 | Err(counter) => { |
83 | thread::sleep(time::Duration::from_secs(1)); |
84 | counter |
85 | } |
86 | }; |
87 | } |
88 | |
89 | // That's too long! |
90 | panic!("Counter is still shared!" ); |
91 | } |
92 | |
93 | #[test] |
94 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
95 | fn failed_thread_stack() { |
96 | // Note: we first tried to force failure with a `usize::MAX` stack, but |
97 | // macOS and Windows weren't fazed, or at least didn't fail the way we want. |
98 | // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a |
99 | // 2GB stack, so it might not fail until the second thread. |
100 | let stack_size = ::std::isize::MAX as usize; |
101 | |
102 | let (start_count, start_handler) = count_handler(); |
103 | let (exit_count, exit_handler) = count_handler(); |
104 | let builder = ThreadPoolBuilder::new() |
105 | .num_threads(10) |
106 | .stack_size(stack_size) |
107 | .start_handler(start_handler) |
108 | .exit_handler(exit_handler); |
109 | |
110 | let pool = builder.build(); |
111 | assert!(pool.is_err(), "thread stack should have failed!" ); |
112 | |
113 | // With such a huge stack, 64-bit will probably fail on the first thread; |
114 | // 32-bit might manage the first 2GB, but certainly fail the second. |
115 | let start_count = wait_for_counter(start_count); |
116 | assert!(start_count <= 1); |
117 | assert_eq!(start_count, wait_for_counter(exit_count)); |
118 | } |
119 | |
120 | #[test] |
121 | #[cfg_attr (not(panic = "unwind" ), ignore)] |
122 | fn panic_thread_name() { |
123 | let (start_count, start_handler) = count_handler(); |
124 | let (exit_count, exit_handler) = count_handler(); |
125 | let builder = ThreadPoolBuilder::new() |
126 | .num_threads(10) |
127 | .start_handler(start_handler) |
128 | .exit_handler(exit_handler) |
129 | .thread_name(|i| { |
130 | if i >= 5 { |
131 | panic!(); |
132 | } |
133 | format!("panic_thread_name#{}" , i) |
134 | }); |
135 | |
136 | let pool = crate::unwind::halt_unwinding(|| builder.build()); |
137 | assert!(pool.is_err(), "thread-name panic should propagate!" ); |
138 | |
139 | // Assuming they're created in order, threads 0 through 4 should have |
140 | // been started already, and then terminated by the panic. |
141 | assert_eq!(5, wait_for_counter(start_count)); |
142 | assert_eq!(5, wait_for_counter(exit_count)); |
143 | } |
144 | |
145 | #[test] |
146 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
147 | fn self_install() { |
148 | let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); |
149 | |
150 | // If the inner `install` blocks, then nothing will actually run it! |
151 | assert!(pool.install(|| pool.install(|| true))); |
152 | } |
153 | |
154 | #[test] |
155 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
156 | fn mutual_install() { |
157 | let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); |
158 | let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); |
159 | |
160 | let ok = pool1.install(|| { |
161 | // This creates a dependency from `pool1` -> `pool2` |
162 | pool2.install(|| { |
163 | // This creates a dependency from `pool2` -> `pool1` |
164 | pool1.install(|| { |
165 | // If they blocked on inter-pool installs, there would be no |
166 | // threads left to run this! |
167 | true |
168 | }) |
169 | }) |
170 | }); |
171 | assert!(ok); |
172 | } |
173 | |
174 | #[test] |
175 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
176 | fn mutual_install_sleepy() { |
177 | use std::{thread, time}; |
178 | |
179 | let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); |
180 | let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); |
181 | |
182 | let ok = pool1.install(|| { |
183 | // This creates a dependency from `pool1` -> `pool2` |
184 | pool2.install(|| { |
185 | // Give `pool1` time to fall asleep. |
186 | thread::sleep(time::Duration::from_secs(1)); |
187 | |
188 | // This creates a dependency from `pool2` -> `pool1` |
189 | pool1.install(|| { |
190 | // Give `pool2` time to fall asleep. |
191 | thread::sleep(time::Duration::from_secs(1)); |
192 | |
193 | // If they blocked on inter-pool installs, there would be no |
194 | // threads left to run this! |
195 | true |
196 | }) |
197 | }) |
198 | }); |
199 | assert!(ok); |
200 | } |
201 | |
202 | #[test] |
203 | #[allow (deprecated)] |
204 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
205 | fn check_thread_pool_new() { |
206 | let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap(); |
207 | assert_eq!(pool.current_num_threads(), 22); |
208 | } |
209 | |
210 | macro_rules! test_scope_order { |
211 | ($scope:ident => $spawn:ident) => {{ |
212 | let builder = ThreadPoolBuilder::new().num_threads(1); |
213 | let pool = builder.build().unwrap(); |
214 | pool.install(|| { |
215 | let vec = Mutex::new(vec![]); |
216 | pool.$scope(|scope| { |
217 | let vec = &vec; |
218 | for i in 0..10 { |
219 | scope.$spawn(move |_| { |
220 | vec.lock().unwrap().push(i); |
221 | }); |
222 | } |
223 | }); |
224 | vec.into_inner().unwrap() |
225 | }) |
226 | }}; |
227 | } |
228 | |
229 | #[test] |
230 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
231 | fn scope_lifo_order() { |
232 | let vec = test_scope_order!(scope => spawn); |
233 | let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed |
234 | assert_eq!(vec, expected); |
235 | } |
236 | |
237 | #[test] |
238 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
239 | fn scope_fifo_order() { |
240 | let vec = test_scope_order!(scope_fifo => spawn_fifo); |
241 | let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order |
242 | assert_eq!(vec, expected); |
243 | } |
244 | |
245 | macro_rules! test_spawn_order { |
246 | ($spawn:ident) => {{ |
247 | let builder = ThreadPoolBuilder::new().num_threads(1); |
248 | let pool = &builder.build().unwrap(); |
249 | let (tx, rx) = channel(); |
250 | pool.install(move || { |
251 | for i in 0..10 { |
252 | let tx = tx.clone(); |
253 | pool.$spawn(move || { |
254 | tx.send(i).unwrap(); |
255 | }); |
256 | } |
257 | }); |
258 | rx.iter().collect::<Vec<i32>>() |
259 | }}; |
260 | } |
261 | |
262 | #[test] |
263 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
264 | fn spawn_lifo_order() { |
265 | let vec = test_spawn_order!(spawn); |
266 | let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed |
267 | assert_eq!(vec, expected); |
268 | } |
269 | |
270 | #[test] |
271 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
272 | fn spawn_fifo_order() { |
273 | let vec = test_spawn_order!(spawn_fifo); |
274 | let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order |
275 | assert_eq!(vec, expected); |
276 | } |
277 | |
278 | #[test] |
279 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
280 | fn nested_scopes() { |
281 | // Create matching scopes for every thread pool. |
282 | fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP) |
283 | where |
284 | OP: FnOnce(&[&Scope<'scope>]) + Send, |
285 | { |
286 | if let Some((pool, tail)) = pools.split_first() { |
287 | pool.scope(move |s| { |
288 | // This move reduces the reference lifetimes by variance to match s, |
289 | // but the actual scopes are still tied to the invariant 'scope. |
290 | let mut scopes = scopes; |
291 | scopes.push(s); |
292 | nest(tail, scopes, op) |
293 | }) |
294 | } else { |
295 | (op)(&scopes) |
296 | } |
297 | } |
298 | |
299 | let pools: Vec<_> = (0..10) |
300 | .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) |
301 | .collect(); |
302 | |
303 | let counter = AtomicUsize::new(0); |
304 | nest(&pools, vec![], |scopes| { |
305 | for &s in scopes { |
306 | s.spawn(|_| { |
307 | // Our 'scope lets us borrow the counter in every pool. |
308 | counter.fetch_add(1, Ordering::Relaxed); |
309 | }); |
310 | } |
311 | }); |
312 | assert_eq!(counter.into_inner(), pools.len()); |
313 | } |
314 | |
315 | #[test] |
316 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
317 | fn nested_fifo_scopes() { |
318 | // Create matching fifo scopes for every thread pool. |
319 | fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP) |
320 | where |
321 | OP: FnOnce(&[&ScopeFifo<'scope>]) + Send, |
322 | { |
323 | if let Some((pool, tail)) = pools.split_first() { |
324 | pool.scope_fifo(move |s| { |
325 | // This move reduces the reference lifetimes by variance to match s, |
326 | // but the actual scopes are still tied to the invariant 'scope. |
327 | let mut scopes = scopes; |
328 | scopes.push(s); |
329 | nest(tail, scopes, op) |
330 | }) |
331 | } else { |
332 | (op)(&scopes) |
333 | } |
334 | } |
335 | |
336 | let pools: Vec<_> = (0..10) |
337 | .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap()) |
338 | .collect(); |
339 | |
340 | let counter = AtomicUsize::new(0); |
341 | nest(&pools, vec![], |scopes| { |
342 | for &s in scopes { |
343 | s.spawn_fifo(|_| { |
344 | // Our 'scope lets us borrow the counter in every pool. |
345 | counter.fetch_add(1, Ordering::Relaxed); |
346 | }); |
347 | } |
348 | }); |
349 | assert_eq!(counter.into_inner(), pools.len()); |
350 | } |
351 | |
352 | #[test] |
353 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
354 | fn in_place_scope_no_deadlock() { |
355 | let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); |
356 | let (tx, rx) = channel(); |
357 | let rx_ref = ℞ |
358 | pool.in_place_scope(move |s| { |
359 | // With regular scopes this closure would never run because this scope op |
360 | // itself would block the only worker thread. |
361 | s.spawn(move |_| { |
362 | tx.send(()).unwrap(); |
363 | }); |
364 | rx_ref.recv().unwrap(); |
365 | }); |
366 | } |
367 | |
368 | #[test] |
369 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
370 | fn in_place_scope_fifo_no_deadlock() { |
371 | let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap(); |
372 | let (tx, rx) = channel(); |
373 | let rx_ref = ℞ |
374 | pool.in_place_scope_fifo(move |s| { |
375 | // With regular scopes this closure would never run because this scope op |
376 | // itself would block the only worker thread. |
377 | s.spawn_fifo(move |_| { |
378 | tx.send(()).unwrap(); |
379 | }); |
380 | rx_ref.recv().unwrap(); |
381 | }); |
382 | } |
383 | |
384 | #[test] |
385 | fn yield_now_to_spawn() { |
386 | let (tx, rx) = channel(); |
387 | |
388 | // Queue a regular spawn. |
389 | crate::spawn(move || tx.send(22).unwrap()); |
390 | |
391 | // The single-threaded fallback mode (for wasm etc.) won't |
392 | // get a chance to run the spawn if we never yield to it. |
393 | crate::registry::in_worker(move |_, _| { |
394 | crate::yield_now(); |
395 | }); |
396 | |
397 | // The spawn **must** have started by now, but we still might have to wait |
398 | // for it to finish if a different thread stole it first. |
399 | assert_eq!(22, rx.recv().unwrap()); |
400 | } |
401 | |
402 | #[test] |
403 | fn yield_local_to_spawn() { |
404 | let (tx, rx) = channel(); |
405 | |
406 | // Queue a regular spawn. |
407 | crate::spawn(move || tx.send(22).unwrap()); |
408 | |
409 | // The single-threaded fallback mode (for wasm etc.) won't |
410 | // get a chance to run the spawn if we never yield to it. |
411 | crate::registry::in_worker(move |_, _| { |
412 | crate::yield_local(); |
413 | }); |
414 | |
415 | // The spawn **must** have started by now, but we still might have to wait |
416 | // for it to finish if a different thread stole it first. |
417 | assert_eq!(22, rx.recv().unwrap()); |
418 | } |
419 | |