1 | use std::sync::atomic::Ordering::SeqCst; |
2 | use std::sync::atomic::{AtomicBool, AtomicUsize}; |
3 | use std::sync::{Arc, Mutex}; |
4 | |
5 | use crossbeam_deque::Steal::{Empty, Success}; |
6 | use crossbeam_deque::Worker; |
7 | use crossbeam_utils::thread::scope; |
8 | use rand::Rng; |
9 | |
/// Single-threaded sanity check of a LIFO worker and its stealer: the owner
/// pops the most recently pushed item, while the stealer takes the oldest one.
#[test]
fn smoke() {
    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    // A fresh queue yields nothing from either end.
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // A single item taken back by the owner.
    worker.push(1);
    assert_eq!(worker.pop(), Some(1));
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // A single item taken by the stealer.
    worker.push(2);
    assert_eq!(stealer.steal(), Success(2));
    assert_eq!(stealer.steal(), Empty);
    assert_eq!(worker.pop(), None);

    // Stealing hands items out in the order they were pushed.
    for item in 3..=5 {
        worker.push(item);
    }
    for expected in 3..=5 {
        assert_eq!(stealer.steal(), Success(expected));
    }
    assert_eq!(stealer.steal(), Empty);

    // Mixed access: pops come from the newest end, the steal from the oldest.
    for item in 6..=9 {
        worker.push(item);
    }
    assert_eq!(worker.pop(), Some(9));
    assert_eq!(stealer.steal(), Success(6));
    assert_eq!(worker.pop(), Some(8));
    assert_eq!(worker.pop(), Some(7));
    assert_eq!(worker.pop(), None);
}
45 | |
/// Checks that `is_empty` on both the worker and the stealer tracks the
/// queue contents as items are pushed and then removed one by one.
#[test]
fn is_empty() {
    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    // Observed through the worker handle, draining with `pop`.
    assert!(worker.is_empty());
    for item in 1..=2 {
        worker.push(item);
        assert!(!worker.is_empty());
    }
    let _ = worker.pop();
    assert!(!worker.is_empty());
    let _ = worker.pop();
    assert!(worker.is_empty());

    // Observed through the stealer handle, draining with `steal`.
    assert!(stealer.is_empty());
    for item in 1..=2 {
        worker.push(item);
        assert!(!stealer.is_empty());
    }
    let _ = stealer.steal();
    assert!(!stealer.is_empty());
    let _ = stealer.steal();
    assert!(stealer.is_empty());
}
71 | |
/// One producer pushes `0..STEPS` while one consumer thread steals; the
/// consumer must observe the values in exactly the order they were pushed
/// and find the queue empty afterwards.
#[test]
fn spsc() {
    // A smaller workload is used when the test is built under Miri.
    const STEPS: usize = if cfg!(miri) { 500 } else { 50_000 };

    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    scope(|threads| {
        threads.spawn(|_| {
            for expected in 0..STEPS {
                // Keep retrying until a steal actually succeeds.
                let stolen = loop {
                    match stealer.steal() {
                        Success(value) => break value,
                        _ => {
                            #[cfg(miri)]
                            std::hint::spin_loop();
                        }
                    }
                };
                assert_eq!(expected, stolen);
            }

            assert_eq!(stealer.steal(), Empty);
        });

        // Producer side runs on the current thread.
        for item in 0..STEPS {
            worker.push(item);
        }
    })
    .unwrap();
}
104 | |
/// Pre-fills the queue with boxed `1..=COUNT`, then lets several stealer
/// threads and the owner drain it concurrently. Each stealer must see
/// strictly increasing values and the owner strictly decreasing ones.
#[test]
fn stampede() {
    const THREADS: usize = 8;
    // A smaller workload is used when the test is built under Miri.
    const COUNT: usize = if cfg!(miri) { 500 } else { 50_000 };

    let worker = Worker::new_lifo();
    (1..=COUNT).for_each(|value| worker.push(Box::new(value)));

    // Number of items not yet consumed by anyone.
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    scope(|threads| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let remaining = Arc::clone(&remaining);

            threads.spawn(move |_| {
                let mut prev = 0;
                while remaining.load(SeqCst) > 0 {
                    if let Success(boxed) = stealer.steal() {
                        let value = *boxed;
                        assert!(prev < value);
                        prev = value;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owner pops from the opposite end, so its values go downwards.
        let mut prev = COUNT + 1;
        while remaining.load(SeqCst) > 0 {
            if let Some(boxed) = worker.pop() {
                let value = *boxed;
                assert!(prev > value);
                prev = value;
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();
}
148 | |
/// Hammers one worker with several stealer threads using every stealing
/// entry point while the owner randomly alternates between pushing and
/// draining. The test finishes once `COUNT` items have been counted in total.
#[test]
fn stress() {
    const THREADS: usize = 8;
    // A smaller workload is used when the test is built under Miri.
    const COUNT: usize = if cfg!(miri) { 500 } else { 50_000 };

    let worker = Worker::new_lifo();
    let done = Arc::new(AtomicBool::new(false));
    // Total number of items consumed by any thread.
    let hits = Arc::new(AtomicUsize::new(0));

    scope(|threads| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let done = Arc::clone(&done);
            let hits = Arc::clone(&hits);

            threads.spawn(move |_| {
                // Thread-local destination queue for the batch steals.
                let local = Worker::new_lifo();

                while !done.load(SeqCst) {
                    if matches!(stealer.steal(), Success(_)) {
                        hits.fetch_add(1, SeqCst);
                    }

                    // Items moved here are counted when `local` is drained below.
                    let _ = stealer.steal_batch(&local);

                    if matches!(stealer.steal_batch_and_pop(&local), Success(_)) {
                        hits.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        hits.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut pushed = 0;
        while pushed < COUNT {
            // Roughly one time in three, drain instead of pushing.
            let drain_now = rng.gen_range(0..3) == 0;
            if drain_now {
                while worker.pop().is_some() {
                    hits.fetch_add(1, SeqCst);
                }
            } else {
                worker.push(pushed);
                pushed += 1;
            }
        }

        // Help consume the leftovers until every pushed item is accounted for.
        while hits.load(SeqCst) < COUNT {
            while worker.pop().is_some() {
                hits.fetch_add(1, SeqCst);
            }
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
210 | |
/// Keeps feeding the queue until the owner and every stealer thread have
/// each obtained at least one item, i.e. no participant is starved.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn no_starvation() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();
    let done = Arc::new(AtomicBool::new(false));
    // One hit counter per stealer thread.
    let mut all_hits = Vec::new();

    scope(|threads| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let done = Arc::clone(&done);
            let hits = Arc::new(AtomicUsize::new(0));
            all_hits.push(Arc::clone(&hits));

            threads.spawn(move |_| {
                // Thread-local destination queue for the batch steals.
                let local = Worker::new_lifo();

                while !done.load(SeqCst) {
                    if matches!(stealer.steal(), Success(_)) {
                        hits.fetch_add(1, SeqCst);
                    }

                    // Items moved here are counted when `local` is drained below.
                    let _ = stealer.steal_batch(&local);

                    if matches!(stealer.steal_batch_and_pop(&local), Success(_)) {
                        hits.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        hits.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut my_hits = 0;
        loop {
            let batch = rng.gen_range(0..COUNT);
            for item in 0..batch {
                // The owner only drains until it has scored its first hit;
                // after that it purely produces work for the stealers.
                let drain_now = rng.gen_range(0..3) == 0 && my_hits == 0;
                if drain_now {
                    while worker.pop().is_some() {
                        my_hits += 1;
                    }
                } else {
                    worker.push(item);
                }
            }

            // Stop once nobody is left with a zero counter.
            if my_hits > 0 && !all_hits.iter().any(|h| h.load(SeqCst) == 0) {
                break;
            }
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
270 | |
/// Verifies destructor bookkeeping: every element taken out of the queue is
/// dropped exactly once, and the elements still queued when the worker is
/// dropped are dropped at that point and form one contiguous run of ids.
#[test]
fn destructors() {
    // Smaller workloads are used when the test is built under Miri.
    const THREADS: usize = if cfg!(miri) { 2 } else { 8 };
    const COUNT: usize = if cfg!(miri) { 500 } else { 50_000 };
    const STEPS: usize = if cfg!(miri) { 100 } else { 1000 };

    /// Queue element that appends its id to a shared log when dropped.
    struct Elem {
        id: usize,
        log: Arc<Mutex<Vec<usize>>>,
    }

    impl Drop for Elem {
        fn drop(&mut self) {
            self.log.lock().unwrap().push(self.id);
        }
    }

    let worker = Worker::new_lifo();
    let dropped = Arc::new(Mutex::new(Vec::new()));
    // Number of elements that have not been taken out of the queue yet.
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    for id in 0..COUNT {
        worker.push(Elem {
            id,
            log: Arc::clone(&dropped),
        });
    }

    scope(|threads| {
        for _ in 0..THREADS {
            let remaining = Arc::clone(&remaining);
            let stealer = worker.stealer();

            threads.spawn(move |_| {
                // Thread-local destination queue for the batch steals.
                let local = Worker::new_lifo();
                let mut taken = 0;

                while taken < STEPS {
                    if let Success(_) = stealer.steal() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    // Items moved here are counted when `local` is drained below.
                    let _ = stealer.steal_batch(&local);

                    if let Success(_) = stealer.steal_batch_and_pop(&local) {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owner makes a bounded number of pop attempts of its own.
        for _ in 0..STEPS {
            if worker.pop().is_some() {
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();

    let rem = remaining.load(SeqCst);
    assert!(rem > 0);

    // Scoped so the guard is released before the worker is dropped:
    // `Elem::drop` locks the same mutex.
    {
        let mut log = dropped.lock().unwrap();
        assert_eq!(log.len(), COUNT - rem);
        log.clear();
    }

    // Everything still queued is expected to be dropped together with the worker.
    drop(worker);

    {
        let mut log = dropped.lock().unwrap();
        assert_eq!(log.len(), rem);
        log.sort_unstable();
        // After sorting, the leftover ids must be consecutive.
        for (prev, next) in log.iter().zip(log.iter().skip(1)) {
            assert_eq!(prev + 1, *next);
        }
    }
}
360 | |