| 1 | #![cfg (test)] |
| 2 | |
| 3 | use crate::ThreadPoolBuilder; |
| 4 | use std::sync::atomic::{AtomicUsize, Ordering}; |
| 5 | use std::sync::mpsc::channel; |
| 6 | use std::sync::Arc; |
| 7 | use std::{thread, time}; |
| 8 | |
/// Runs a blocking broadcast on the global pool and checks that the returned
/// values are exactly the indices `0..current_num_threads()`, in order.
#[test]
fn broadcast_global() {
    let indices = crate::broadcast(|ctx| ctx.index());
    let expected = 0..crate::current_num_threads();
    assert!(indices.into_iter().eq(expected));
}
| 14 | |
/// Fires a non-blocking broadcast on the global pool; every invocation reports
/// its index through a channel, and the sorted reports must be
/// `0..current_num_threads()`.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_global() {
    let (sender, receiver) = channel();
    crate::spawn_broadcast(move |ctx| {
        sender.send(ctx.index()).unwrap();
    });

    // Iterating the receiver ends only once every sender has been dropped.
    let mut seen = receiver.into_iter().collect::<Vec<_>>();
    seen.sort_unstable();
    let expected = 0..crate::current_num_threads();
    assert!(seen.into_iter().eq(expected));
}
| 25 | |
/// Runs a blocking broadcast on a dedicated 7-thread pool and checks that the
/// returned indices are `0..7`, in order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_pool() {
    const THREADS: usize = 7;

    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        .build()
        .unwrap();
    let indices = pool.broadcast(|ctx| ctx.index());
    assert!(indices.into_iter().eq(0..THREADS));
}
| 33 | |
/// Fires a non-blocking broadcast on a dedicated 7-thread pool; the indices
/// reported through the channel, once sorted, must be `0..7`.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_pool() {
    const THREADS: usize = 7;

    let (sender, receiver) = channel();
    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        .build()
        .unwrap();
    pool.spawn_broadcast(move |ctx| {
        sender.send(ctx.index()).unwrap();
    });

    // Iterating the receiver ends only once every sender has been dropped.
    let mut seen = receiver.into_iter().collect::<Vec<_>>();
    seen.sort_unstable();
    assert!(seen.into_iter().eq(0..THREADS));
}
| 45 | |
/// Calls the free-function `broadcast` from inside `install` on a 7-thread
/// pool and expects the indices `0..7` back, in order.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_self() {
    const THREADS: usize = 7;

    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        .build()
        .unwrap();
    let indices = pool.install(|| {
        // Deliberately the crate-level function, not `pool.broadcast`.
        crate::broadcast(|ctx| ctx.index())
    });
    assert!(indices.into_iter().eq(0..THREADS));
}
| 53 | |
/// Spawns a job on a 7-thread pool that itself calls the free-function
/// `spawn_broadcast`; the indices reported through the channel, once sorted,
/// must be `0..7`.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_self() {
    const THREADS: usize = 7;

    let (sender, receiver) = channel();
    let pool = ThreadPoolBuilder::new()
        .num_threads(THREADS)
        .build()
        .unwrap();
    pool.spawn(move || {
        // Deliberately the crate-level function, not `pool.spawn_broadcast`.
        crate::spawn_broadcast(move |ctx| {
            sender.send(ctx.index()).unwrap();
        })
    });

    // Iterating the receiver ends only once every sender has been dropped.
    let mut seen = receiver.into_iter().collect::<Vec<_>>();
    seen.sort_unstable();
    assert!(seen.into_iter().eq(0..THREADS));
}
| 65 | |
/// Nests blocking broadcasts across two pools: from inside the 3-thread pool,
/// broadcast to the 7-thread pool, and from each of those invocations
/// broadcast back to the 3-thread pool. The innermost closure must run
/// `3 * 7` times in total.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual() {
    const SMALL: usize = 3;
    const LARGE: usize = 7;

    let hits = AtomicUsize::new(0);
    let small_pool = ThreadPoolBuilder::new()
        .num_threads(SMALL)
        .build()
        .unwrap();
    let large_pool = ThreadPoolBuilder::new()
        .num_threads(LARGE)
        .build()
        .unwrap();

    small_pool.install(|| {
        large_pool.broadcast(|_| {
            small_pool.broadcast(|_| {
                hits.fetch_add(1, Ordering::Relaxed);
            })
        })
    });

    let total = hits.into_inner();
    assert_eq!(total, SMALL * LARGE);
}
| 81 | |
/// Nests non-blocking broadcasts across two pools: a job spawned on the
/// 3-thread pool broadcasts to the 7-thread pool, and each of those
/// invocations broadcasts back to the 3-thread pool. The channel must receive
/// `3 * 7` messages in total.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual() {
    let (sender, receiver) = channel();
    let small_pool = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
    let large_pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();

    // The spawned job needs its own handle to the small pool; the original
    // handle stays owned by this function.
    let small_handle = Arc::clone(&small_pool);
    small_pool.spawn(move || {
        large_pool.spawn_broadcast(move |_| {
            // Each invocation hands a fresh sender to the inner broadcast.
            let sender = sender.clone();
            small_handle.spawn_broadcast(move |_| sender.send(()).unwrap())
        })
    });

    let received = receiver.into_iter().count();
    assert_eq!(received, 3 * 7);
}
| 99 | |
/// Same shape as the nested blocking broadcast across a 3-thread and a
/// 7-thread pool, but with sleeps inserted at every level (1s, 1s, 100ms).
/// The innermost closure must still run `3 * 7` times.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_mutual_sleepy() {
    let long_nap = time::Duration::from_secs(1);
    let short_nap = time::Duration::from_millis(100);

    let hits = AtomicUsize::new(0);
    let small_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
    let large_pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();

    small_pool.install(|| {
        thread::sleep(long_nap);
        large_pool.broadcast(|_| {
            thread::sleep(long_nap);
            small_pool.broadcast(|_| {
                thread::sleep(short_nap);
                hits.fetch_add(1, Ordering::Relaxed);
            })
        })
    });

    let total = hits.into_inner();
    assert_eq!(total, 3 * 7);
}
| 118 | |
/// Same shape as the nested non-blocking broadcast across a 3-thread and a
/// 7-thread pool, but with sleeps inserted at every level (1s, 1s, 100ms).
/// The channel must still receive `3 * 7` messages.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_broadcast_mutual_sleepy() {
    let long_nap = time::Duration::from_secs(1);
    let short_nap = time::Duration::from_millis(100);

    let (sender, receiver) = channel();
    let small_pool = Arc::new(ThreadPoolBuilder::new().num_threads(3).build().unwrap());
    let large_pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();

    // The spawned job needs its own handle to the small pool; the original
    // handle stays owned by this function.
    let small_handle = Arc::clone(&small_pool);
    small_pool.spawn(move || {
        thread::sleep(long_nap);
        large_pool.spawn_broadcast(move |_| {
            // Clone first, then sleep, then fan back out to the small pool.
            let sender = sender.clone();
            thread::sleep(long_nap);
            small_handle.spawn_broadcast(move |_| {
                thread::sleep(short_nap);
                sender.send(()).unwrap();
            })
        })
    });

    let received = receiver.into_iter().count();
    assert_eq!(received, 3 * 7);
}
| 141 | |
/// Blocking broadcast on a 7-thread pool where the invocation with index 3
/// panics. All 7 invocations must still bump the counter, and the call
/// wrapped in `halt_unwinding` must report an error.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_one() {
    let hits = AtomicUsize::new(0);
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();

    let outcome = crate::unwind::halt_unwinding(|| {
        pool.broadcast(|ctx| {
            // Count first so the panicking invocation is included.
            hits.fetch_add(1, Ordering::Relaxed);
            let is_victim = ctx.index() == 3;
            if is_victim {
                panic!("Hello, world!");
            }
        })
    });

    let total = hits.into_inner();
    assert_eq!(total, 7);
    assert!(outcome.is_err(), "broadcast panic should propagate!");
}
| 158 | |
/// Non-blocking broadcast on a 7-thread pool where the invocation with index
/// 3 panics. All 7 invocations must send on the work channel, and the pool's
/// panic handler must forward exactly one payload.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_one() {
    let (work_tx, work_rx) = channel();
    let (panic_tx, panic_rx) = channel();

    let builder = ThreadPoolBuilder::new()
        .num_threads(7)
        .panic_handler(move |payload| panic_tx.send(payload).unwrap());
    let pool = builder.build().unwrap();

    pool.spawn_broadcast(move |ctx| {
        // Report before panicking so the panicking invocation is counted too.
        work_tx.send(()).unwrap();
        let is_victim = ctx.index() == 3;
        if is_victim {
            panic!("Hello, world!");
        }
    });

    // Dropping the pool also releases the handler that owns `panic_tx`.
    drop(pool);

    let completed = work_rx.into_iter().count();
    assert_eq!(completed, 7);
    let panics = panic_rx.into_iter().count();
    assert_eq!(panics, 1);
}
| 179 | |
/// Blocking broadcast on a 7-thread pool where every even-indexed invocation
/// panics. All 7 invocations must still bump the counter, and the call
/// wrapped in `halt_unwinding` must report an error.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn broadcast_panic_many() {
    let hits = AtomicUsize::new(0);
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();

    let outcome = crate::unwind::halt_unwinding(|| {
        pool.broadcast(|ctx| {
            // Count first so the panicking invocations are included.
            hits.fetch_add(1, Ordering::Relaxed);
            let is_victim = ctx.index() % 2 == 0;
            if is_victim {
                panic!("Hello, world!");
            }
        })
    });

    let total = hits.into_inner();
    assert_eq!(total, 7);
    assert!(outcome.is_err(), "broadcast panic should propagate!");
}
| 196 | |
/// Non-blocking broadcast on a 7-thread pool where every even-indexed
/// invocation panics (indices 0, 2, 4, 6). All 7 invocations must send on the
/// work channel, and the pool's panic handler must forward exactly 4 payloads.
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn spawn_broadcast_panic_many() {
    let (work_tx, work_rx) = channel();
    let (panic_tx, panic_rx) = channel();

    let builder = ThreadPoolBuilder::new()
        .num_threads(7)
        .panic_handler(move |payload| panic_tx.send(payload).unwrap());
    let pool = builder.build().unwrap();

    pool.spawn_broadcast(move |ctx| {
        // Report before panicking so the panicking invocations are counted too.
        work_tx.send(()).unwrap();
        let is_victim = ctx.index() % 2 == 0;
        if is_victim {
            panic!("Hello, world!");
        }
    });

    // Dropping the pool also releases the handler that owns `panic_tx`.
    drop(pool);

    let completed = work_rx.into_iter().count();
    assert_eq!(completed, 7);
    let panics = panic_rx.into_iter().count();
    assert_eq!(panics, 4);
}
| 217 | |
/// Repeatedly issues blocking broadcasts on a 7-thread pool for one second.
/// The test has no assertions; it passes if every broadcast returns.
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn broadcast_sleep_race() {
    let budget = time::Duration::from_secs(1);
    let pool = ThreadPoolBuilder::new().num_threads(7).build().unwrap();
    let began = time::Instant::now();

    loop {
        if began.elapsed() >= budget {
            break;
        }
        pool.broadcast(|ctx| {
            // Stagger the sleeps by thread index: a slight spread makes it
            // more likely that some thread races with the pool's idle sleep
            // once the broadcast is over.
            let nap = time::Duration::from_micros(ctx.index() as u64);
            thread::sleep(nap);
        });
    }
}
| 232 | |
/// Checks ordering between a non-blocking and a blocking broadcast on the
/// global pool: once the blocking broadcast returns, every invocation of the
/// earlier `spawn_broadcast` must already have sent its index.
#[test]
fn broadcast_after_spawn_broadcast() {
    let (sender, receiver) = channel();

    // First enqueue the non-blocking broadcast.
    crate::spawn_broadcast(move |ctx| {
        sender.send(ctx.index()).unwrap();
    });

    // A blocking broadcast is expected to run only after all earlier ones.
    crate::broadcast(|_| {});

    // Use the non-blocking `try_iter`: every message **must** already be in
    // the channel, so nothing here is allowed to wait for stragglers.
    let mut seen = receiver.try_iter().collect::<Vec<_>>();
    seen.sort_unstable();
    let expected = 0..crate::current_num_threads();
    assert!(seen.into_iter().eq(expected));
}
| 248 | |
/// Checks ordering between a regular spawn and a blocking broadcast on the
/// global pool: once the broadcast returns, the earlier spawned job must
/// already have sent its value.
#[test]
fn broadcast_after_spawn() {
    let (sender, receiver) = channel();

    // Spawn from inside a worker so the job is queued on a thread-local deque.
    crate::registry::in_worker(move |_, _| {
        crate::spawn(move || {
            sender.send(22).unwrap();
        });
    });

    // The broadcast is expected to run only once that local deque is empty.
    crate::broadcast(|_| {});

    // Non-blocking receive: the value **must** already be in the channel.
    let received = receiver.try_recv().unwrap();
    assert_eq!(22, received);
}
| 264 | |