1 | use std::sync::atomic::Ordering::SeqCst; |
2 | use std::sync::atomic::{AtomicBool, AtomicUsize}; |
3 | use std::sync::{Arc, Mutex}; |
4 | |
5 | use crossbeam_deque::Steal::{Empty, Success}; |
6 | use crossbeam_deque::{Injector, Worker}; |
7 | use crossbeam_utils::thread::scope; |
8 | use rand::Rng; |
9 | |
10 | #[test] |
11 | fn smoke() { |
12 | let q = Injector::new(); |
13 | assert_eq!(q.steal(), Empty); |
14 | |
15 | q.push(1); |
16 | q.push(2); |
17 | assert_eq!(q.steal(), Success(1)); |
18 | assert_eq!(q.steal(), Success(2)); |
19 | assert_eq!(q.steal(), Empty); |
20 | |
21 | q.push(3); |
22 | assert_eq!(q.steal(), Success(3)); |
23 | assert_eq!(q.steal(), Empty); |
24 | } |
25 | |
26 | #[test] |
27 | fn is_empty() { |
28 | let q = Injector::new(); |
29 | assert!(q.is_empty()); |
30 | |
31 | q.push(1); |
32 | assert!(!q.is_empty()); |
33 | q.push(2); |
34 | assert!(!q.is_empty()); |
35 | |
36 | let _ = q.steal(); |
37 | assert!(!q.is_empty()); |
38 | let _ = q.steal(); |
39 | assert!(q.is_empty()); |
40 | |
41 | q.push(3); |
42 | assert!(!q.is_empty()); |
43 | let _ = q.steal(); |
44 | assert!(q.is_empty()); |
45 | } |
46 | |
47 | #[test] |
48 | fn spsc() { |
49 | #[cfg (miri)] |
50 | const COUNT: usize = 500; |
51 | #[cfg (not(miri))] |
52 | const COUNT: usize = 100_000; |
53 | |
54 | let q = Injector::new(); |
55 | |
56 | scope(|scope| { |
57 | scope.spawn(|_| { |
58 | for i in 0..COUNT { |
59 | loop { |
60 | if let Success(v) = q.steal() { |
61 | assert_eq!(i, v); |
62 | break; |
63 | } |
64 | #[cfg (miri)] |
65 | std::hint::spin_loop(); |
66 | } |
67 | } |
68 | |
69 | assert_eq!(q.steal(), Empty); |
70 | }); |
71 | |
72 | for i in 0..COUNT { |
73 | q.push(i); |
74 | } |
75 | }) |
76 | .unwrap(); |
77 | } |
78 | |
79 | #[test] |
80 | fn mpmc() { |
81 | #[cfg (miri)] |
82 | const COUNT: usize = 500; |
83 | #[cfg (not(miri))] |
84 | const COUNT: usize = 25_000; |
85 | const THREADS: usize = 4; |
86 | |
87 | let q = Injector::new(); |
88 | let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); |
89 | |
90 | scope(|scope| { |
91 | for _ in 0..THREADS { |
92 | scope.spawn(|_| { |
93 | for i in 0..COUNT { |
94 | q.push(i); |
95 | } |
96 | }); |
97 | } |
98 | |
99 | for _ in 0..THREADS { |
100 | scope.spawn(|_| { |
101 | for _ in 0..COUNT { |
102 | loop { |
103 | if let Success(n) = q.steal() { |
104 | v[n].fetch_add(1, SeqCst); |
105 | break; |
106 | } |
107 | #[cfg (miri)] |
108 | std::hint::spin_loop(); |
109 | } |
110 | } |
111 | }); |
112 | } |
113 | }) |
114 | .unwrap(); |
115 | |
116 | for c in v { |
117 | assert_eq!(c.load(SeqCst), THREADS); |
118 | } |
119 | } |
120 | |
121 | #[test] |
122 | fn stampede() { |
123 | const THREADS: usize = 8; |
124 | #[cfg (miri)] |
125 | const COUNT: usize = 500; |
126 | #[cfg (not(miri))] |
127 | const COUNT: usize = 50_000; |
128 | |
129 | let q = Injector::new(); |
130 | |
131 | for i in 0..COUNT { |
132 | q.push(Box::new(i + 1)); |
133 | } |
134 | let remaining = Arc::new(AtomicUsize::new(COUNT)); |
135 | |
136 | scope(|scope| { |
137 | for _ in 0..THREADS { |
138 | let remaining = remaining.clone(); |
139 | let q = &q; |
140 | |
141 | scope.spawn(move |_| { |
142 | let mut last = 0; |
143 | while remaining.load(SeqCst) > 0 { |
144 | if let Success(x) = q.steal() { |
145 | assert!(last < *x); |
146 | last = *x; |
147 | remaining.fetch_sub(1, SeqCst); |
148 | } |
149 | } |
150 | }); |
151 | } |
152 | |
153 | let mut last = 0; |
154 | while remaining.load(SeqCst) > 0 { |
155 | if let Success(x) = q.steal() { |
156 | assert!(last < *x); |
157 | last = *x; |
158 | remaining.fetch_sub(1, SeqCst); |
159 | } |
160 | } |
161 | }) |
162 | .unwrap(); |
163 | } |
164 | |
165 | #[test] |
166 | fn stress() { |
167 | const THREADS: usize = 8; |
168 | #[cfg (miri)] |
169 | const COUNT: usize = 500; |
170 | #[cfg (not(miri))] |
171 | const COUNT: usize = 50_000; |
172 | |
173 | let q = Injector::new(); |
174 | let done = Arc::new(AtomicBool::new(false)); |
175 | let hits = Arc::new(AtomicUsize::new(0)); |
176 | |
177 | scope(|scope| { |
178 | for _ in 0..THREADS { |
179 | let done = done.clone(); |
180 | let hits = hits.clone(); |
181 | let q = &q; |
182 | |
183 | scope.spawn(move |_| { |
184 | let w2 = Worker::new_fifo(); |
185 | |
186 | while !done.load(SeqCst) { |
187 | if let Success(_) = q.steal() { |
188 | hits.fetch_add(1, SeqCst); |
189 | } |
190 | |
191 | let _ = q.steal_batch(&w2); |
192 | |
193 | if let Success(_) = q.steal_batch_and_pop(&w2) { |
194 | hits.fetch_add(1, SeqCst); |
195 | } |
196 | |
197 | while w2.pop().is_some() { |
198 | hits.fetch_add(1, SeqCst); |
199 | } |
200 | } |
201 | }); |
202 | } |
203 | |
204 | let mut rng = rand::thread_rng(); |
205 | let mut expected = 0; |
206 | while expected < COUNT { |
207 | if rng.gen_range(0..3) == 0 { |
208 | while let Success(_) = q.steal() { |
209 | hits.fetch_add(1, SeqCst); |
210 | } |
211 | } else { |
212 | q.push(expected); |
213 | expected += 1; |
214 | } |
215 | } |
216 | |
217 | while hits.load(SeqCst) < COUNT { |
218 | while let Success(_) = q.steal() { |
219 | hits.fetch_add(1, SeqCst); |
220 | } |
221 | } |
222 | done.store(true, SeqCst); |
223 | }) |
224 | .unwrap(); |
225 | } |
226 | |
227 | #[cfg_attr (miri, ignore)] // Miri is too slow |
228 | #[test] |
229 | fn no_starvation() { |
230 | const THREADS: usize = 8; |
231 | const COUNT: usize = 50_000; |
232 | |
233 | let q = Injector::new(); |
234 | let done = Arc::new(AtomicBool::new(false)); |
235 | let mut all_hits = Vec::new(); |
236 | |
237 | scope(|scope| { |
238 | for _ in 0..THREADS { |
239 | let done = done.clone(); |
240 | let hits = Arc::new(AtomicUsize::new(0)); |
241 | all_hits.push(hits.clone()); |
242 | let q = &q; |
243 | |
244 | scope.spawn(move |_| { |
245 | let w2 = Worker::new_fifo(); |
246 | |
247 | while !done.load(SeqCst) { |
248 | if let Success(_) = q.steal() { |
249 | hits.fetch_add(1, SeqCst); |
250 | } |
251 | |
252 | let _ = q.steal_batch(&w2); |
253 | |
254 | if let Success(_) = q.steal_batch_and_pop(&w2) { |
255 | hits.fetch_add(1, SeqCst); |
256 | } |
257 | |
258 | while w2.pop().is_some() { |
259 | hits.fetch_add(1, SeqCst); |
260 | } |
261 | } |
262 | }); |
263 | } |
264 | |
265 | let mut rng = rand::thread_rng(); |
266 | let mut my_hits = 0; |
267 | loop { |
268 | for i in 0..rng.gen_range(0..COUNT) { |
269 | if rng.gen_range(0..3) == 0 && my_hits == 0 { |
270 | while let Success(_) = q.steal() { |
271 | my_hits += 1; |
272 | } |
273 | } else { |
274 | q.push(i); |
275 | } |
276 | } |
277 | |
278 | if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) { |
279 | break; |
280 | } |
281 | } |
282 | done.store(true, SeqCst); |
283 | }) |
284 | .unwrap(); |
285 | } |
286 | |
287 | #[test] |
288 | fn destructors() { |
289 | #[cfg (miri)] |
290 | const THREADS: usize = 2; |
291 | #[cfg (not(miri))] |
292 | const THREADS: usize = 8; |
293 | #[cfg (miri)] |
294 | const COUNT: usize = 500; |
295 | #[cfg (not(miri))] |
296 | const COUNT: usize = 50_000; |
297 | #[cfg (miri)] |
298 | const STEPS: usize = 100; |
299 | #[cfg (not(miri))] |
300 | const STEPS: usize = 1000; |
301 | |
302 | struct Elem(usize, Arc<Mutex<Vec<usize>>>); |
303 | |
304 | impl Drop for Elem { |
305 | fn drop(&mut self) { |
306 | self.1.lock().unwrap().push(self.0); |
307 | } |
308 | } |
309 | |
310 | let q = Injector::new(); |
311 | let dropped = Arc::new(Mutex::new(Vec::new())); |
312 | let remaining = Arc::new(AtomicUsize::new(COUNT)); |
313 | |
314 | for i in 0..COUNT { |
315 | q.push(Elem(i, dropped.clone())); |
316 | } |
317 | |
318 | scope(|scope| { |
319 | for _ in 0..THREADS { |
320 | let remaining = remaining.clone(); |
321 | let q = &q; |
322 | |
323 | scope.spawn(move |_| { |
324 | let w2 = Worker::new_fifo(); |
325 | let mut cnt = 0; |
326 | |
327 | while cnt < STEPS { |
328 | if let Success(_) = q.steal() { |
329 | cnt += 1; |
330 | remaining.fetch_sub(1, SeqCst); |
331 | } |
332 | |
333 | let _ = q.steal_batch(&w2); |
334 | |
335 | if let Success(_) = q.steal_batch_and_pop(&w2) { |
336 | cnt += 1; |
337 | remaining.fetch_sub(1, SeqCst); |
338 | } |
339 | |
340 | while w2.pop().is_some() { |
341 | cnt += 1; |
342 | remaining.fetch_sub(1, SeqCst); |
343 | } |
344 | } |
345 | }); |
346 | } |
347 | |
348 | for _ in 0..STEPS { |
349 | if let Success(_) = q.steal() { |
350 | remaining.fetch_sub(1, SeqCst); |
351 | } |
352 | } |
353 | }) |
354 | .unwrap(); |
355 | |
356 | let rem = remaining.load(SeqCst); |
357 | assert!(rem > 0); |
358 | |
359 | { |
360 | let mut v = dropped.lock().unwrap(); |
361 | assert_eq!(v.len(), COUNT - rem); |
362 | v.clear(); |
363 | } |
364 | |
365 | drop(q); |
366 | |
367 | { |
368 | let mut v = dropped.lock().unwrap(); |
369 | assert_eq!(v.len(), rem); |
370 | v.sort_unstable(); |
371 | for pair in v.windows(2) { |
372 | assert_eq!(pair[0] + 1, pair[1]); |
373 | } |
374 | } |
375 | } |
376 | |