1 | use crate::scope; |
2 | use std::any::Any; |
3 | use std::sync::mpsc::channel; |
4 | use std::sync::Mutex; |
5 | |
6 | use super::{spawn, spawn_fifo}; |
7 | use crate::ThreadPoolBuilder; |
8 | |
9 | #[test] |
10 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
11 | fn spawn_then_join_in_worker() { |
12 | let (tx, rx) = channel(); |
13 | scope(move |_| { |
14 | spawn(move || tx.send(22).unwrap()); |
15 | }); |
16 | assert_eq!(22, rx.recv().unwrap()); |
17 | } |
18 | |
19 | #[test] |
20 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
21 | fn spawn_then_join_outside_worker() { |
22 | let (tx, rx) = channel(); |
23 | spawn(move || tx.send(22).unwrap()); |
24 | assert_eq!(22, rx.recv().unwrap()); |
25 | } |
26 | |
27 | #[test] |
28 | #[cfg_attr (not(panic = "unwind" ), ignore)] |
29 | fn panic_fwd() { |
30 | let (tx, rx) = channel(); |
31 | |
32 | let tx = Mutex::new(tx); |
33 | let panic_handler = move |err: Box<dyn Any + Send>| { |
34 | let tx = tx.lock().unwrap(); |
35 | if let Some(&msg) = err.downcast_ref::<&str>() { |
36 | if msg == "Hello, world!" { |
37 | tx.send(1).unwrap(); |
38 | } else { |
39 | tx.send(2).unwrap(); |
40 | } |
41 | } else { |
42 | tx.send(3).unwrap(); |
43 | } |
44 | }; |
45 | |
46 | let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); |
47 | |
48 | builder |
49 | .build() |
50 | .unwrap() |
51 | .spawn(move || panic!("Hello, world!" )); |
52 | |
53 | assert_eq!(1, rx.recv().unwrap()); |
54 | } |
55 | |
56 | /// Test what happens when the thread-pool is dropped but there are |
57 | /// still active asynchronous tasks. We expect the thread-pool to stay |
58 | /// alive and executing until those threads are complete. |
59 | #[test] |
60 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
61 | fn termination_while_things_are_executing() { |
62 | let (tx0, rx0) = channel(); |
63 | let (tx1, rx1) = channel(); |
64 | |
65 | // Create a thread-pool and spawn some code in it, but then drop |
66 | // our reference to it. |
67 | { |
68 | let thread_pool = ThreadPoolBuilder::new().build().unwrap(); |
69 | thread_pool.spawn(move || { |
70 | let data = rx0.recv().unwrap(); |
71 | |
72 | // At this point, we know the "main" reference to the |
73 | // `ThreadPool` has been dropped, but there are still |
74 | // active threads. Launch one more. |
75 | spawn(move || { |
76 | tx1.send(data).unwrap(); |
77 | }); |
78 | }); |
79 | } |
80 | |
81 | tx0.send(22).unwrap(); |
82 | let v = rx1.recv().unwrap(); |
83 | assert_eq!(v, 22); |
84 | } |
85 | |
86 | #[test] |
87 | #[cfg_attr (not(panic = "unwind" ), ignore)] |
88 | fn custom_panic_handler_and_spawn() { |
89 | let (tx, rx) = channel(); |
90 | |
91 | // Create a parallel closure that will send panics on the |
92 | // channel; since the closure is potentially executed in parallel |
93 | // with itself, we have to wrap `tx` in a mutex. |
94 | let tx = Mutex::new(tx); |
95 | let panic_handler = move |e: Box<dyn Any + Send>| { |
96 | tx.lock().unwrap().send(e).unwrap(); |
97 | }; |
98 | |
99 | // Execute an async that will panic. |
100 | let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); |
101 | builder.build().unwrap().spawn(move || { |
102 | panic!("Hello, world!" ); |
103 | }); |
104 | |
105 | // Check that we got back the panic we expected. |
106 | let error = rx.recv().unwrap(); |
107 | if let Some(&msg) = error.downcast_ref::<&str>() { |
108 | assert_eq!(msg, "Hello, world!" ); |
109 | } else { |
110 | panic!("did not receive a string from panic handler" ); |
111 | } |
112 | } |
113 | |
114 | #[test] |
115 | #[cfg_attr (not(panic = "unwind" ), ignore)] |
116 | fn custom_panic_handler_and_nested_spawn() { |
117 | let (tx, rx) = channel(); |
118 | |
119 | // Create a parallel closure that will send panics on the |
120 | // channel; since the closure is potentially executed in parallel |
121 | // with itself, we have to wrap `tx` in a mutex. |
122 | let tx = Mutex::new(tx); |
123 | let panic_handler = move |e| { |
124 | tx.lock().unwrap().send(e).unwrap(); |
125 | }; |
126 | |
127 | // Execute an async that will (eventually) panic. |
128 | const PANICS: usize = 3; |
129 | let builder = ThreadPoolBuilder::new().panic_handler(panic_handler); |
130 | builder.build().unwrap().spawn(move || { |
131 | // launch 3 nested spawn-asyncs; these should be in the same |
132 | // thread-pool and hence inherit the same panic handler |
133 | for _ in 0..PANICS { |
134 | spawn(move || { |
135 | panic!("Hello, world!" ); |
136 | }); |
137 | } |
138 | }); |
139 | |
140 | // Check that we get back the panics we expected. |
141 | for _ in 0..PANICS { |
142 | let error = rx.recv().unwrap(); |
143 | if let Some(&msg) = error.downcast_ref::<&str>() { |
144 | assert_eq!(msg, "Hello, world!" ); |
145 | } else { |
146 | panic!("did not receive a string from panic handler" ); |
147 | } |
148 | } |
149 | } |
150 | |
151 | macro_rules! test_order { |
152 | ($outer_spawn:ident, $inner_spawn:ident) => {{ |
153 | let builder = ThreadPoolBuilder::new().num_threads(1); |
154 | let pool = builder.build().unwrap(); |
155 | let (tx, rx) = channel(); |
156 | pool.install(move || { |
157 | for i in 0..10 { |
158 | let tx = tx.clone(); |
159 | $outer_spawn(move || { |
160 | for j in 0..10 { |
161 | let tx = tx.clone(); |
162 | $inner_spawn(move || { |
163 | tx.send(i * 10 + j).unwrap(); |
164 | }); |
165 | } |
166 | }); |
167 | } |
168 | }); |
169 | rx.iter().collect::<Vec<i32>>() |
170 | }}; |
171 | } |
172 | |
173 | #[test] |
174 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
175 | fn lifo_order() { |
176 | // In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order. |
177 | let vec = test_order!(spawn, spawn); |
178 | let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed |
179 | assert_eq!(vec, expected); |
180 | } |
181 | |
182 | #[test] |
183 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
184 | fn fifo_order() { |
185 | // In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order. |
186 | let vec = test_order!(spawn_fifo, spawn_fifo); |
187 | let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order |
188 | assert_eq!(vec, expected); |
189 | } |
190 | |
191 | #[test] |
192 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
193 | fn lifo_fifo_order() { |
194 | // LIFO on the outside, FIFO on the inside |
195 | let vec = test_order!(spawn, spawn_fifo); |
196 | let expected: Vec<i32> = (0..10) |
197 | .rev() |
198 | .flat_map(|i| (0..10).map(move |j| i * 10 + j)) |
199 | .collect(); |
200 | assert_eq!(vec, expected); |
201 | } |
202 | |
203 | #[test] |
204 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
205 | fn fifo_lifo_order() { |
206 | // FIFO on the outside, LIFO on the inside |
207 | let vec = test_order!(spawn_fifo, spawn); |
208 | let expected: Vec<i32> = (0..10) |
209 | .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) |
210 | .collect(); |
211 | assert_eq!(vec, expected); |
212 | } |
213 | |
214 | macro_rules! spawn_send { |
215 | ($spawn:ident, $tx:ident, $i:expr) => {{ |
216 | let tx = $tx.clone(); |
217 | $spawn(move || tx.send($i).unwrap()); |
218 | }}; |
219 | } |
220 | |
221 | /// Test mixed spawns pushing a series of numbers, interleaved such |
222 | /// such that negative values are using the second kind of spawn. |
223 | macro_rules! test_mixed_order { |
224 | ($pos_spawn:ident, $neg_spawn:ident) => {{ |
225 | let builder = ThreadPoolBuilder::new().num_threads(1); |
226 | let pool = builder.build().unwrap(); |
227 | let (tx, rx) = channel(); |
228 | pool.install(move || { |
229 | spawn_send!($pos_spawn, tx, 0); |
230 | spawn_send!($neg_spawn, tx, -1); |
231 | spawn_send!($pos_spawn, tx, 1); |
232 | spawn_send!($neg_spawn, tx, -2); |
233 | spawn_send!($pos_spawn, tx, 2); |
234 | spawn_send!($neg_spawn, tx, -3); |
235 | spawn_send!($pos_spawn, tx, 3); |
236 | }); |
237 | rx.iter().collect::<Vec<i32>>() |
238 | }}; |
239 | } |
240 | |
241 | #[test] |
242 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
243 | fn mixed_lifo_fifo_order() { |
244 | let vec = test_mixed_order!(spawn, spawn_fifo); |
245 | let expected = vec![3, -1, 2, -2, 1, -3, 0]; |
246 | assert_eq!(vec, expected); |
247 | } |
248 | |
249 | #[test] |
250 | #[cfg_attr (any(target_os = "emscripten" , target_family = "wasm" ), ignore)] |
251 | fn mixed_fifo_lifo_order() { |
252 | let vec = test_mixed_order!(spawn_fifo, spawn); |
253 | let expected = vec![0, -3, 1, -2, 2, -1, 3]; |
254 | assert_eq!(vec, expected); |
255 | } |
256 | |