1#![cfg(test)]
2
3use crate::ThreadPoolBuilder;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::mpsc::channel;
6use std::sync::Arc;
7use std::{thread, time};
8
9#[test]
10fn broadcast_global() {
11 let v = crate::broadcast(|ctx| ctx.index());
12 assert!(v.into_iter().eq(0..crate::current_num_threads()));
13}
14
15#[test]
16#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
17fn spawn_broadcast_global() {
18 let (tx, rx) = channel();
19 crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
20
21 let mut v: Vec<_> = rx.into_iter().collect();
22 v.sort_unstable();
23 assert!(v.into_iter().eq(0..crate::current_num_threads()));
24}
25
26#[test]
27#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
28fn broadcast_pool() {
29 let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
30 let v = pool.broadcast(|ctx| ctx.index());
31 assert!(v.into_iter().eq(0..7));
32}
33
34#[test]
35#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
36fn spawn_broadcast_pool() {
37 let (tx, rx) = channel();
38 let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
39 pool.spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
40
41 let mut v: Vec<_> = rx.into_iter().collect();
42 v.sort_unstable();
43 assert!(v.into_iter().eq(0..7));
44}
45
46#[test]
47#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
48fn broadcast_self() {
49 let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
50 let v = pool.install(|| crate::broadcast(|ctx| ctx.index()));
51 assert!(v.into_iter().eq(0..7));
52}
53
54#[test]
55#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
56fn spawn_broadcast_self() {
57 let (tx, rx) = channel();
58 let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
59 pool.spawn(|| crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap()));
60
61 let mut v: Vec<_> = rx.into_iter().collect();
62 v.sort_unstable();
63 assert!(v.into_iter().eq(0..7));
64}
65
66#[test]
67#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
68fn broadcast_mutual() {
69 let count = AtomicUsize::new(0);
70 let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
71 let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
72 pool1.install(|| {
73 pool2.broadcast(|_| {
74 pool1.broadcast(|_| {
75 count.fetch_add(1, Ordering::Relaxed);
76 })
77 })
78 });
79 assert_eq!(count.into_inner(), 3 * 7);
80}
81
82#[test]
83#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
84fn spawn_broadcast_mutual() {
85 let (tx, rx) = channel();
86 let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
87 let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
88 pool1.spawn({
89 let pool1 = Arc::clone(&pool1);
90 move || {
91 pool2.spawn_broadcast(move |_| {
92 let tx = tx.clone();
93 pool1.spawn_broadcast(move |_| tx.send(()).unwrap())
94 })
95 }
96 });
97 assert_eq!(rx.into_iter().count(), 3 * 7);
98}
99
100#[test]
101#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
102fn broadcast_mutual_sleepy() {
103 let count = AtomicUsize::new(0);
104 let pool1 = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
105 let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
106 pool1.install(|| {
107 thread::sleep(time::Duration::from_secs(1));
108 pool2.broadcast(|_| {
109 thread::sleep(time::Duration::from_secs(1));
110 pool1.broadcast(|_| {
111 thread::sleep(time::Duration::from_millis(100));
112 count.fetch_add(1, Ordering::Relaxed);
113 })
114 })
115 });
116 assert_eq!(count.into_inner(), 3 * 7);
117}
118
119#[test]
120#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
121fn spawn_broadcast_mutual_sleepy() {
122 let (tx, rx) = channel();
123 let pool1 = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
124 let pool2 = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
125 pool1.spawn({
126 let pool1 = Arc::clone(&pool1);
127 move || {
128 thread::sleep(time::Duration::from_secs(1));
129 pool2.spawn_broadcast(move |_| {
130 let tx = tx.clone();
131 thread::sleep(time::Duration::from_secs(1));
132 pool1.spawn_broadcast(move |_| {
133 thread::sleep(time::Duration::from_millis(100));
134 tx.send(()).unwrap();
135 })
136 })
137 }
138 });
139 assert_eq!(rx.into_iter().count(), 3 * 7);
140}
141
142#[test]
143#[cfg_attr(not(panic = "unwind"), ignore)]
144fn broadcast_panic_one() {
145 let count = AtomicUsize::new(0);
146 let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
147 let result = crate::unwind::halt_unwinding(|| {
148 pool.broadcast(|ctx| {
149 count.fetch_add(1, Ordering::Relaxed);
150 if ctx.index() == 3 {
151 panic!("Hello, world!");
152 }
153 })
154 });
155 assert_eq!(count.into_inner(), 7);
156 assert!(result.is_err(), "broadcast panic should propagate!");
157}
158
159#[test]
160#[cfg_attr(not(panic = "unwind"), ignore)]
161fn spawn_broadcast_panic_one() {
162 let (tx, rx) = channel();
163 let (panic_tx, panic_rx) = channel();
164 let pool = ThreadPoolBuilder::new()
165 .num_threads(7)
166 .panic_handler(move |e| panic_tx.send(e).unwrap())
167 .build()
168 .unwrap();
169 pool.spawn_broadcast(move |ctx| {
170 tx.send(()).unwrap();
171 if ctx.index() == 3 {
172 panic!("Hello, world!");
173 }
174 });
175 drop(pool); // including panic_tx
176 assert_eq!(rx.into_iter().count(), 7);
177 assert_eq!(panic_rx.into_iter().count(), 1);
178}
179
180#[test]
181#[cfg_attr(not(panic = "unwind"), ignore)]
182fn broadcast_panic_many() {
183 let count = AtomicUsize::new(0);
184 let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
185 let result = crate::unwind::halt_unwinding(|| {
186 pool.broadcast(|ctx| {
187 count.fetch_add(1, Ordering::Relaxed);
188 if ctx.index() % 2 == 0 {
189 panic!("Hello, world!");
190 }
191 })
192 });
193 assert_eq!(count.into_inner(), 7);
194 assert!(result.is_err(), "broadcast panic should propagate!");
195}
196
197#[test]
198#[cfg_attr(not(panic = "unwind"), ignore)]
199fn spawn_broadcast_panic_many() {
200 let (tx, rx) = channel();
201 let (panic_tx, panic_rx) = channel();
202 let pool = ThreadPoolBuilder::new()
203 .num_threads(7)
204 .panic_handler(move |e| panic_tx.send(e).unwrap())
205 .build()
206 .unwrap();
207 pool.spawn_broadcast(move |ctx| {
208 tx.send(()).unwrap();
209 if ctx.index() % 2 == 0 {
210 panic!("Hello, world!");
211 }
212 });
213 drop(pool); // including panic_tx
214 assert_eq!(rx.into_iter().count(), 7);
215 assert_eq!(panic_rx.into_iter().count(), 4);
216}
217
218#[test]
219#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
220fn broadcast_sleep_race() {
221 let test_duration = time::Duration::from_secs(1);
222 let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
223 let start = time::Instant::now();
224 while start.elapsed() < test_duration {
225 pool.broadcast(|ctx| {
226 // A slight spread of sleep duration increases the chance that one
227 // of the threads will race in the pool's idle sleep afterward.
228 thread::sleep(time::Duration::from_micros(ctx.index() as u64));
229 });
230 }
231}
232
233#[test]
234fn broadcast_after_spawn_broadcast() {
235 let (tx, rx) = channel();
236
237 // Queue a non-blocking spawn_broadcast.
238 crate::spawn_broadcast(move |ctx| tx.send(ctx.index()).unwrap());
239
240 // This blocking broadcast runs after all prior broadcasts.
241 crate::broadcast(|_| {});
242
243 // The spawn_broadcast **must** have run by now on all threads.
244 let mut v: Vec<_> = rx.try_iter().collect();
245 v.sort_unstable();
246 assert!(v.into_iter().eq(0..crate::current_num_threads()));
247}
248
249#[test]
250fn broadcast_after_spawn() {
251 let (tx, rx) = channel();
252
253 // Queue a regular spawn on a thread-local deque.
254 crate::registry::in_worker(move |_, _| {
255 crate::spawn(move || tx.send(22).unwrap());
256 });
257
258 // Broadcast runs after the local deque is empty.
259 crate::broadcast(|_| {});
260
261 // The spawn **must** have run by now.
262 assert_eq!(22, rx.try_recv().unwrap());
263}
264