#![cfg(test)]

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};

use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};

#[test]
#[should_panic(expected = "Hello, world!")]
fn panic_propagate() {
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
    thread_pool.install(|| {
        panic!("Hello, world!");
    });
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn workers_stop() {
    let registry;

    {
        // once we exit this block, thread-pool will be dropped
        let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
        registry = thread_pool.install(|| {
            // do some work on these threads
            join_a_lot(22);

            Arc::clone(&thread_pool.registry)
        });
        assert_eq!(registry.num_threads(), 22);
    }

    // once thread-pool is dropped, registry should terminate, which
    // should lead to worker threads stopping
    registry.wait_until_stopped();
}

fn join_a_lot(n: usize) {
    if n > 0 {
        join(|| join_a_lot(n - 1), || join_a_lot(n - 1));
    }
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sleeper_stop() {
    use std::{thread, time};

    let registry;

    {
        // once we exit this block, thread-pool will be dropped
        let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
        registry = Arc::clone(&thread_pool.registry);

        // Give time for at least some of the thread pool to fall asleep.
        thread::sleep(time::Duration::from_secs(1));
    }

    // once thread-pool is dropped, registry should terminate, which
    // should lead to worker threads stopping
    registry.wait_until_stopped();
}

/// Creates a start/exit handler that increments an atomic counter.
fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
    let count = Arc::new(AtomicUsize::new(0));
    (Arc::clone(&count), move |_| {
        count.fetch_add(1, Ordering::SeqCst);
    })
}

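// NOTE: the registry stores the handler closures, so each counter stays shared
// until the registry (and every worker thread holding a reference to it) is
// gone; a successful `Arc::try_unwrap` below therefore implies the pool has
// fully shut down.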
/// Wait until a counter is no longer shared, then return its value.
fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
    use std::{thread, time};

    for _ in 0..60 {
        counter = match Arc::try_unwrap(counter) {
            Ok(counter) => return counter.into_inner(),
            Err(counter) => {
                thread::sleep(time::Duration::from_secs(1));
                counter
            }
        };
    }

    // That's too long!
    panic!("Counter is still shared!");
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn failed_thread_stack() {
    // Note: we first tried to force failure with a `usize::MAX` stack, but
    // macOS and Windows weren't fazed, or at least didn't fail the way we want.
    // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a
    // 2GB stack, so it might not fail until the second thread.
    let stack_size = isize::MAX as usize;

    let (start_count, start_handler) = count_handler();
    let (exit_count, exit_handler) = count_handler();
    let builder = ThreadPoolBuilder::new()
        .num_threads(10)
        .stack_size(stack_size)
        .start_handler(start_handler)
        .exit_handler(exit_handler);

    let pool = builder.build();
    assert!(pool.is_err(), "thread stack should have failed!");

    // With such a huge stack, 64-bit will probably fail on the first thread;
    // 32-bit might manage the first 2GB, but certainly fail the second.
    let start_count = wait_for_counter(start_count);
    assert!(start_count <= 1);
    assert_eq!(start_count, wait_for_counter(exit_count));
}

#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_thread_name() {
    let (start_count, start_handler) = count_handler();
    let (exit_count, exit_handler) = count_handler();
    let builder = ThreadPoolBuilder::new()
        .num_threads(10)
        .start_handler(start_handler)
        .exit_handler(exit_handler)
        .thread_name(|i| {
            if i >= 5 {
                panic!();
            }
            format!("panic_thread_name#{}", i)
        });

    let pool = crate::unwind::halt_unwinding(|| builder.build());
    assert!(pool.is_err(), "thread-name panic should propagate!");

    // Assuming they're created in order, threads 0 through 4 should have
    // been started already, and then terminated by the panic.
    assert_eq!(5, wait_for_counter(start_count));
    assert_eq!(5, wait_for_counter(exit_count));
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn self_install() {
    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    // If the inner `install` blocks, then nothing will actually run it!
    assert!(pool.install(|| pool.install(|| true)));
}

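// NOTE: a worker that calls `install` on a *different* pool does not simply
// block; while it waits for the cross-pool job to finish, it keeps executing
// work in its own pool. The next two tests rely on that behavior.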
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install() {
    let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    let ok = pool1.install(|| {
        // This creates a dependency from `pool1` -> `pool2`
        pool2.install(|| {
            // This creates a dependency from `pool2` -> `pool1`
            pool1.install(|| {
                // If they blocked on inter-pool installs, there would be no
                // threads left to run this!
                true
            })
        })
    });
    assert!(ok);
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install_sleepy() {
    use std::{thread, time};

    let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    let ok = pool1.install(|| {
        // This creates a dependency from `pool1` -> `pool2`
        pool2.install(|| {
            // Give `pool1` time to fall asleep.
            thread::sleep(time::Duration::from_secs(1));

            // This creates a dependency from `pool2` -> `pool1`
            pool1.install(|| {
                // Give `pool2` time to fall asleep.
                thread::sleep(time::Duration::from_secs(1));

                // If they blocked on inter-pool installs, there would be no
                // threads left to run this!
                true
            })
        })
    });
    assert!(ok);
}

#[test]
#[allow(deprecated)]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_thread_pool_new() {
    // Exercises the deprecated `ThreadPool::new(Configuration)` constructor,
    // which the builder API used elsewhere in this module replaces.
    let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
    assert_eq!(pool.current_num_threads(), 22);
}

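// Runs ten indexed spawns on a single-threaded pool and returns the order in
// which they actually executed.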
macro_rules! test_scope_order {
    ($scope:ident => $spawn:ident) => {{
        let builder = ThreadPoolBuilder::new().num_threads(1);
        let pool = builder.build().unwrap();
        pool.install(|| {
            let vec = Mutex::new(vec![]);
            pool.$scope(|scope| {
                let vec = &vec;
                for i in 0..10 {
                    scope.$spawn(move |_| {
                        vec.lock().unwrap().push(i);
                    });
                }
            });
            vec.into_inner().unwrap()
        })
    }};
}

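// With only one worker thread, all ten tasks are queued before any of them
// runs, so the observed order depends purely on the queue discipline:
// LIFO for `scope`/`spawn`, FIFO for the `*_fifo` variants.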
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_lifo_order() {
    let vec = test_scope_order!(scope => spawn);
    let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
    assert_eq!(vec, expected);
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_fifo_order() {
    let vec = test_scope_order!(scope_fifo => spawn_fifo);
    let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
    assert_eq!(vec, expected);
}

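// Like `test_scope_order`, but using free-standing pool spawns, with a channel
// recording the execution order. `rx.iter()` ends once the channel hangs up,
// which requires both the original `tx` (dropped when `install` returns) and
// all ten clones (each dropped as its spawn completes) to be gone.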
macro_rules! test_spawn_order {
    ($spawn:ident) => {{
        let builder = ThreadPoolBuilder::new().num_threads(1);
        let pool = &builder.build().unwrap();
        let (tx, rx) = channel();
        pool.install(move || {
            for i in 0..10 {
                let tx = tx.clone();
                pool.$spawn(move || {
                    tx.send(i).unwrap();
                });
            }
        });
        rx.iter().collect::<Vec<i32>>()
    }};
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_lifo_order() {
    let vec = test_spawn_order!(spawn);
    let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
    assert_eq!(vec, expected);
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_fifo_order() {
    let vec = test_spawn_order!(spawn_fifo);
    let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
    assert_eq!(vec, expected);
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_scopes() {
    // Create matching scopes for every thread pool.
    fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
    where
        OP: FnOnce(&[&Scope<'scope>]) + Send,
    {
        if let Some((pool, tail)) = pools.split_first() {
            pool.scope(move |s| {
                // This move reduces the reference lifetimes by variance to match s,
                // but the actual scopes are still tied to the invariant 'scope.
                let mut scopes = scopes;
                scopes.push(s);
                nest(tail, scopes, op)
            })
        } else {
            (op)(&scopes)
        }
    }

    let pools: Vec<_> = (0..10)
        .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
        .collect();

    let counter = AtomicUsize::new(0);
    nest(&pools, vec![], |scopes| {
        for &s in scopes {
            s.spawn(|_| {
                // Our 'scope lets us borrow the counter in every pool.
                counter.fetch_add(1, Ordering::Relaxed);
            });
        }
    });
    assert_eq!(counter.into_inner(), pools.len());
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_scopes() {
    // Create matching fifo scopes for every thread pool.
    fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
    where
        OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
    {
        if let Some((pool, tail)) = pools.split_first() {
            pool.scope_fifo(move |s| {
                // This move reduces the reference lifetimes by variance to match s,
                // but the actual scopes are still tied to the invariant 'scope.
                let mut scopes = scopes;
                scopes.push(s);
                nest(tail, scopes, op)
            })
        } else {
            (op)(&scopes)
        }
    }

    let pools: Vec<_> = (0..10)
        .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
        .collect();

    let counter = AtomicUsize::new(0);
    nest(&pools, vec![], |scopes| {
        for &s in scopes {
            s.spawn_fifo(|_| {
                // Our 'scope lets us borrow the counter in every pool.
                counter.fetch_add(1, Ordering::Relaxed);
            });
        }
    });
    assert_eq!(counter.into_inner(), pools.len());
}

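// `in_place_scope` runs the scope op on the calling thread rather than on a
// pool worker, so the single worker stays free to execute the spawn while the
// scope op blocks on `recv`.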
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_no_deadlock() {
    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let (tx, rx) = channel();
    let rx_ref = &rx;
    pool.in_place_scope(move |s| {
        // With a regular scope, this spawn would never run: the scope op itself
        // would occupy the only worker thread while blocking on `recv` below.
        s.spawn(move |_| {
            tx.send(()).unwrap();
        });
        rx_ref.recv().unwrap();
    });
}

#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_fifo_no_deadlock() {
    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let (tx, rx) = channel();
    let rx_ref = &rx;
    pool.in_place_scope_fifo(move |s| {
        // With a regular scope, this spawn would never run: the scope op itself
        // would occupy the only worker thread while blocking on `recv` below.
        s.spawn_fifo(move |_| {
            tx.send(()).unwrap();
        });
        rx_ref.recv().unwrap();
    });
}

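// The two tests below differ only in which yield they call: `yield_now` offers
// to run any pending work in the pool, while `yield_local` limits itself to
// work queued on the current thread.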
#[test]
fn yield_now_to_spawn() {
    let (tx, rx) = channel();

    // Queue a regular spawn.
    crate::spawn(move || tx.send(22).unwrap());

    // The single-threaded fallback mode (for wasm etc.) won't
    // get a chance to run the spawn if we never yield to it.
    crate::registry::in_worker(move |_, _| {
        crate::yield_now();
    });

    // The spawn **must** have started by now, but we still might have to wait
    // for it to finish if a different thread stole it first.
    assert_eq!(22, rx.recv().unwrap());
}

#[test]
fn yield_local_to_spawn() {
    let (tx, rx) = channel();

    // Queue a regular spawn.
    crate::spawn(move || tx.send(22).unwrap());

    // The single-threaded fallback mode (for wasm etc.) won't
    // get a chance to run the spawn if we never yield to it.
    crate::registry::in_worker(move |_, _| {
        crate::yield_local();
    });

    // The spawn **must** have started by now, but we still might have to wait
    // for it to finish if a different thread stole it first.
    assert_eq!(22, rx.recv().unwrap());
}