1 | #![cfg (test)] |
2 | |
3 | use crate::ThreadPoolBuilder; |
4 | use std::sync::atomic::{AtomicUsize, Ordering}; |
5 | use std::sync::mpsc::channel; |
6 | use std::sync::Arc; |
7 | use std::{thread, time}; |
8 | |
/// Checks that a blocking broadcast on the global pool returns each worker's
/// index, already in ascending order, with one entry per current thread.
#[test]
fn broadcast_global() {
    let indices = crate::broadcast(|ctx| ctx.index());
    let expected = 0..crate::current_num_threads();
    assert!(indices.into_iter().eq(expected));
}
14 | |
/// Checks that `spawn_broadcast` on the global pool runs the closure once for
/// every worker index.
///
/// Ignored on emscripten and wasm targets.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_global() {
    let (sender, receiver) = channel();
    crate::spawn_broadcast(move |ctx| {
        sender.send(ctx.index()).unwrap();
    });

    // Draining the receiver ends only once the sender has been dropped.
    let mut seen = receiver.into_iter().collect::<Vec<_>>();
    // Sort so the comparison does not depend on the order of arrival.
    seen.sort_unstable();
    let expected = 0..crate::current_num_threads();
    assert!(seen.into_iter().eq(expected));
}
25 | |
/// Checks that a blocking broadcast on a dedicated seven-thread pool returns
/// the indices `0..7` in ascending order.
///
/// Ignored on emscripten and wasm targets.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_pool() {
    const THREADS: usize = 7;

    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        .build()
        .unwrap();
    let indices = pool.broadcast(|ctx| ctx.index());
    assert!(indices.into_iter().eq(0..THREADS));
}
33 | |
/// Checks that `spawn_broadcast` on a dedicated seven-thread pool runs the
/// closure once for each index in `0..7`.
///
/// Ignored on emscripten and wasm targets.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_pool() {
    const THREADS: usize = 7;

    let (sender, receiver) = channel();
    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        .build()
        .unwrap();
    pool.spawn_broadcast(move |ctx| {
        sender.send(ctx.index()).unwrap();
    });

    // Draining the receiver ends only once the sender has been dropped.
    let mut seen = receiver.into_iter().collect::<Vec<_>>();
    // Sort so the comparison does not depend on the order of arrival.
    seen.sort_unstable();
    assert!(seen.into_iter().eq(0..THREADS));
}
45 | |
/// Checks that the free function `crate::broadcast`, when called from inside
/// `install` on a seven-thread pool, yields the indices `0..7` in order.
///
/// Ignored on emscripten and wasm targets.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_self() {
    const THREADS: usize = 7;

    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        .build()
        .unwrap();
    let indices = pool.install(|| {
        // Issued from within the pool, so there is no explicit pool handle here.
        crate::broadcast(|ctx| ctx.index())
    });
    assert!(indices.into_iter().eq(0..THREADS));
}
53 | |
/// Checks that the free function `crate::spawn_broadcast`, when called from a
/// job spawned on a seven-thread pool, runs once for each index in `0..7`.
///
/// Ignored on emscripten and wasm targets.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_self() {
    const THREADS: usize = 7;

    let (sender, receiver) = channel();
    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        .build()
        .unwrap();
    pool.spawn(move || {
        crate::spawn_broadcast(move |ctx| {
            sender.send(ctx.index()).unwrap();
        });
    });

    // Draining the receiver ends only once the sender has been dropped.
    let mut seen = receiver.into_iter().collect::<Vec<_>>();
    // Sort so the comparison does not depend on the order of arrival.
    seen.sort_unstable();
    assert!(seen.into_iter().eq(0..THREADS));
}
65 | |
/// Checks nested blocking broadcasts across two pools: from inside the
/// three-thread pool, broadcast to the seven-thread pool, and from each of
/// those workers broadcast back to the three-thread pool.
///
/// The innermost closure must therefore run `3 * 7` times.
///
/// Ignored on emscripten and wasm targets.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual() {
    const SMALL: usize = 3;
    const LARGE: usize = 7;

    let hits = AtomicUsize::new(0);
    let small_pool = ThreadPoolBuilder::new()
        .num_threads(SMALL)
        .build()
        .unwrap();
    let large_pool = ThreadPoolBuilder::new()
        .num_threads(LARGE)
        .build()
        .unwrap();

    small_pool.install(|| {
        large_pool.broadcast(|_| {
            small_pool.broadcast(|_| {
                hits.fetch_add(1, Ordering::Relaxed);
            })
        })
    });

    assert_eq!(hits.into_inner(), SMALL * LARGE);
}
81 | |
/// Checks nested `spawn_broadcast` calls across two pools: a job spawned on
/// the three-thread pool broadcasts to the seven-thread pool, and each of
/// those workers broadcasts back to the three-thread pool.
///
/// Every innermost run sends one unit message, so `3 * 7` messages are
/// expected in total.
///
/// Ignored on emscripten and wasm targets.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual() {
    const SMALL: usize = 3;
    const LARGE: usize = 7;

    let (sender, receiver) = channel();
    let small_pool = Arc::new(
        ThreadPoolBuilder::new()
            .num_threads(SMALL)
            .build()
            .unwrap(),
    );
    let large_pool = ThreadPoolBuilder::new()
        .num_threads(LARGE)
        .build()
        .unwrap();

    // The spawned job needs its own owned handle to the small pool.
    let small_pool_handle = Arc::clone(&small_pool);
    small_pool.spawn(move || {
        large_pool.spawn_broadcast(move |_| {
            // Each inner broadcast gets its own sender clone to move in.
            let sender = sender.clone();
            small_pool_handle.spawn_broadcast(move |_| sender.send(()).unwrap())
        })
    });

    // Counting consumes the receiver until every sender clone is dropped.
    assert_eq!(receiver.into_iter().count(), SMALL * LARGE);
}
99 | |
/// Same nesting as the mutual blocking-broadcast test (three-thread pool ->
/// seven-thread pool -> three-thread pool), but with a sleep inserted at
/// every level before the next step.
///
/// The innermost closure must still run `3 * 7` times.
///
/// Ignored on emscripten and wasm targets.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual_sleepy() {
    const SMALL: usize = 3;
    const LARGE: usize = 7;

    let long_nap = time::Duration::from_secs(1);
    let short_nap = time::Duration::from_millis(100);

    let hits = AtomicUsize::new(0);
    let small_pool = ThreadPoolBuilder::new()
        .num_threads(SMALL)
        .build()
        .unwrap();
    let large_pool = ThreadPoolBuilder::new()
        .num_threads(LARGE)
        .build()
        .unwrap();

    small_pool.install(|| {
        thread::sleep(long_nap);
        large_pool.broadcast(|_| {
            thread::sleep(long_nap);
            small_pool.broadcast(|_| {
                thread::sleep(short_nap);
                hits.fetch_add(1, Ordering::Relaxed);
            })
        })
    });

    assert_eq!(hits.into_inner(), SMALL * LARGE);
}
118 | |
/// Same nesting as the mutual `spawn_broadcast` test (three-thread pool ->
/// seven-thread pool -> three-thread pool), but with a sleep inserted at
/// every level before the next step.
///
/// Every innermost run sends one unit message, so `3 * 7` messages are
/// expected in total.
///
/// Ignored on emscripten and wasm targets.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual_sleepy() {
    const SMALL: usize = 3;
    const LARGE: usize = 7;

    let long_nap = time::Duration::from_secs(1);
    let short_nap = time::Duration::from_millis(100);

    let (sender, receiver) = channel();
    let small_pool = Arc::new(
        ThreadPoolBuilder::new()
            .num_threads(SMALL)
            .build()
            .unwrap(),
    );
    let large_pool = ThreadPoolBuilder::new()
        .num_threads(LARGE)
        .build()
        .unwrap();

    // The spawned job needs its own owned handle to the small pool.
    let small_pool_handle = Arc::clone(&small_pool);
    small_pool.spawn(move || {
        thread::sleep(long_nap);
        large_pool.spawn_broadcast(move |_| {
            // Each inner broadcast gets its own sender clone to move in.
            let sender = sender.clone();
            thread::sleep(long_nap);
            small_pool_handle.spawn_broadcast(move |_| {
                thread::sleep(short_nap);
                sender.send(()).unwrap();
            })
        })
    });

    // Counting consumes the receiver until every sender clone is dropped.
    assert_eq!(receiver.into_iter().count(), SMALL * LARGE);
}
141 | |
/// Checks a blocking broadcast in which exactly one worker (index 3) panics:
/// all seven workers must still have run the closure, and the panic must
/// surface as an error to the caller.
///
/// Ignored unless panics unwind.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_one() {
    const THREADS: usize = 7;

    let hits = AtomicUsize::new(0);
    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        .build()
        .unwrap();

    // Capture the propagated panic so the test itself can inspect it.
    let outcome = crate::unwind::halt_unwinding(|| {
        pool.broadcast(|ctx| {
            // Count first, so the panicking worker is included in the total.
            hits.fetch_add(1, Ordering::Relaxed);
            if ctx.index() == 3 {
                panic!("Hello, world!");
            }
        })
    });

    assert_eq!(hits.into_inner(), THREADS);
    assert!(outcome.is_err(), "broadcast panic should propagate!");
}
158 | |
/// Checks a `spawn_broadcast` in which exactly one worker (index 3) panics:
/// all seven workers must still send their message, and the pool's panic
/// handler must be invoked exactly once.
///
/// Ignored unless panics unwind.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_one() {
    const THREADS: usize = 7;

    let (sender, receiver) = channel();
    let (panic_sender, panic_receiver) = channel();
    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        // Forward every panic payload so the panics can be counted below.
        .panic_handler(move |payload| panic_sender.send(payload).unwrap())
        .build()
        .unwrap();

    pool.spawn_broadcast(move |ctx| {
        // Send first, so the panicking worker is included in the total.
        sender.send(()).unwrap();
        if ctx.index() == 3 {
            panic!("Hello, world!");
        }
    });

    // Release the pool, and with it the handler's `panic_sender`, so that
    // draining `panic_receiver` below can come to an end.
    drop(pool);
    assert_eq!(receiver.into_iter().count(), THREADS);
    assert_eq!(panic_receiver.into_iter().count(), 1);
}
179 | |
/// Checks a blocking broadcast in which every even-indexed worker panics:
/// all seven workers must still have run the closure, and a panic must
/// surface as an error to the caller.
///
/// Ignored unless panics unwind.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_many() {
    const THREADS: usize = 7;

    let hits = AtomicUsize::new(0);
    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        .build()
        .unwrap();

    // Capture the propagated panic so the test itself can inspect it.
    let outcome = crate::unwind::halt_unwinding(|| {
        pool.broadcast(|ctx| {
            // Count first, so panicking workers are included in the total.
            hits.fetch_add(1, Ordering::Relaxed);
            if ctx.index() % 2 == 0 {
                panic!("Hello, world!");
            }
        })
    });

    assert_eq!(hits.into_inner(), THREADS);
    assert!(outcome.is_err(), "broadcast panic should propagate!");
}
196 | |
/// Checks a `spawn_broadcast` in which every even-indexed worker panics:
/// all seven workers must still send their message, and the pool's panic
/// handler must be invoked once per panicking worker (indices 0, 2, 4 and 6,
/// i.e. four times).
///
/// Ignored unless panics unwind.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_many() {
    const THREADS: usize = 7;

    let (sender, receiver) = channel();
    let (panic_sender, panic_receiver) = channel();
    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        // Forward every panic payload so the panics can be counted below.
        .panic_handler(move |payload| panic_sender.send(payload).unwrap())
        .build()
        .unwrap();

    pool.spawn_broadcast(move |ctx| {
        // Send first, so panicking workers are included in the total.
        sender.send(()).unwrap();
        if ctx.index() % 2 == 0 {
            panic!("Hello, world!");
        }
    });

    // Release the pool, and with it the handler's `panic_sender`, so that
    // draining `panic_receiver` below can come to an end.
    drop(pool);
    assert_eq!(receiver.into_iter().count(), THREADS);
    assert_eq!(panic_receiver.into_iter().count(), 4);
}
217 | |
/// Stress test: repeatedly issues blocking broadcasts on a seven-thread pool
/// for about one second, with each worker sleeping for a slightly different
/// number of microseconds. The test has no assertions; it passes as long as
/// every broadcast returns.
///
/// Ignored on emscripten and wasm targets.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_sleep_race() {
    let test_duration = time::Duration::from_secs(1);
    let pool = ThreadPoolBuilder::new()
        .num_threads(7)
        .build()
        .unwrap();

    // Start the clock only after the pool has been built.
    let deadline = time::Instant::now() + test_duration;
    while time::Instant::now() < deadline {
        pool.broadcast(|ctx| {
            // A slight spread of sleep duration increases the chance that one
            // of the threads will race in the pool's idle sleep afterward.
            let stagger = time::Duration::from_micros(ctx.index() as u64);
            thread::sleep(stagger);
        });
    }
}
232 | |
/// Checks ordering between broadcasts on the global pool: once a blocking
/// `broadcast` issued after a `spawn_broadcast` has returned, the earlier
/// spawned broadcast must already have run on every thread.
#[test]
fn broadcast_after_spawn_broadcast() {
    let (sender, receiver) = channel();

    // Queue a non-blocking spawn_broadcast.
    crate::spawn_broadcast(move |ctx| {
        sender.send(ctx.index()).unwrap();
    });

    // This blocking broadcast runs after all prior broadcasts.
    crate::broadcast(|_| {});

    // The spawn_broadcast **must** have run by now on all threads, so a
    // non-blocking drain of the channel has to see every index.
    let mut seen = receiver.try_iter().collect::<Vec<_>>();
    seen.sort_unstable();
    let expected = 0..crate::current_num_threads();
    assert!(seen.into_iter().eq(expected));
}
248 | |
/// Checks ordering between a regular spawn and a broadcast on the global
/// pool: once a blocking `broadcast` issued after a `spawn` from inside a
/// worker has returned, the spawned job must already have run.
#[test]
fn broadcast_after_spawn() {
    let (sender, receiver) = channel();

    // Queue a regular spawn on a thread-local deque.
    crate::registry::in_worker(move |_, _| {
        crate::spawn(move || {
            sender.send(22).unwrap();
        });
    });

    // Broadcast runs after the local deque is empty.
    crate::broadcast(|_| {});

    // The spawn **must** have run by now, so a non-blocking receive succeeds.
    let received = receiver.try_recv().unwrap();
    assert_eq!(22, received);
}
264 | |