1 | use std::sync::atomic::Ordering::SeqCst; |
2 | use std::sync::atomic::{AtomicBool, AtomicUsize}; |
3 | use std::sync::{Arc, Mutex}; |
4 | |
5 | use crossbeam_deque::Steal::{Empty, Success}; |
6 | use crossbeam_deque::Worker; |
7 | use crossbeam_utils::thread::scope; |
8 | use rand::Rng; |
9 | |
10 | #[test] |
11 | fn smoke() { |
12 | let w = Worker::new_fifo(); |
13 | let s = w.stealer(); |
14 | assert_eq!(w.pop(), None); |
15 | assert_eq!(s.steal(), Empty); |
16 | |
17 | w.push(1); |
18 | assert_eq!(w.pop(), Some(1)); |
19 | assert_eq!(w.pop(), None); |
20 | assert_eq!(s.steal(), Empty); |
21 | |
22 | w.push(2); |
23 | assert_eq!(s.steal(), Success(2)); |
24 | assert_eq!(s.steal(), Empty); |
25 | assert_eq!(w.pop(), None); |
26 | |
27 | w.push(3); |
28 | w.push(4); |
29 | w.push(5); |
30 | assert_eq!(s.steal(), Success(3)); |
31 | assert_eq!(s.steal(), Success(4)); |
32 | assert_eq!(s.steal(), Success(5)); |
33 | assert_eq!(s.steal(), Empty); |
34 | |
35 | w.push(6); |
36 | w.push(7); |
37 | w.push(8); |
38 | w.push(9); |
39 | assert_eq!(w.pop(), Some(6)); |
40 | assert_eq!(s.steal(), Success(7)); |
41 | assert_eq!(w.pop(), Some(8)); |
42 | assert_eq!(w.pop(), Some(9)); |
43 | assert_eq!(w.pop(), None); |
44 | } |
45 | |
46 | #[test] |
47 | fn is_empty() { |
48 | let w = Worker::new_fifo(); |
49 | let s = w.stealer(); |
50 | |
51 | assert!(w.is_empty()); |
52 | w.push(1); |
53 | assert!(!w.is_empty()); |
54 | w.push(2); |
55 | assert!(!w.is_empty()); |
56 | let _ = w.pop(); |
57 | assert!(!w.is_empty()); |
58 | let _ = w.pop(); |
59 | assert!(w.is_empty()); |
60 | |
61 | assert!(s.is_empty()); |
62 | w.push(1); |
63 | assert!(!s.is_empty()); |
64 | w.push(2); |
65 | assert!(!s.is_empty()); |
66 | let _ = s.steal(); |
67 | assert!(!s.is_empty()); |
68 | let _ = s.steal(); |
69 | assert!(s.is_empty()); |
70 | } |
71 | |
72 | #[test] |
73 | fn spsc() { |
74 | #[cfg (miri)] |
75 | const STEPS: usize = 500; |
76 | #[cfg (not(miri))] |
77 | const STEPS: usize = 50_000; |
78 | |
79 | let w = Worker::new_fifo(); |
80 | let s = w.stealer(); |
81 | |
82 | scope(|scope| { |
83 | scope.spawn(|_| { |
84 | for i in 0..STEPS { |
85 | loop { |
86 | if let Success(v) = s.steal() { |
87 | assert_eq!(i, v); |
88 | break; |
89 | } |
90 | } |
91 | } |
92 | |
93 | assert_eq!(s.steal(), Empty); |
94 | }); |
95 | |
96 | for i in 0..STEPS { |
97 | w.push(i); |
98 | } |
99 | }) |
100 | .unwrap(); |
101 | } |
102 | |
103 | #[test] |
104 | fn stampede() { |
105 | const THREADS: usize = 8; |
106 | #[cfg (miri)] |
107 | const COUNT: usize = 500; |
108 | #[cfg (not(miri))] |
109 | const COUNT: usize = 50_000; |
110 | |
111 | let w = Worker::new_fifo(); |
112 | |
113 | for i in 0..COUNT { |
114 | w.push(Box::new(i + 1)); |
115 | } |
116 | let remaining = Arc::new(AtomicUsize::new(COUNT)); |
117 | |
118 | scope(|scope| { |
119 | for _ in 0..THREADS { |
120 | let s = w.stealer(); |
121 | let remaining = remaining.clone(); |
122 | |
123 | scope.spawn(move |_| { |
124 | let mut last = 0; |
125 | while remaining.load(SeqCst) > 0 { |
126 | if let Success(x) = s.steal() { |
127 | assert!(last < *x); |
128 | last = *x; |
129 | remaining.fetch_sub(1, SeqCst); |
130 | } |
131 | } |
132 | }); |
133 | } |
134 | |
135 | let mut last = 0; |
136 | while remaining.load(SeqCst) > 0 { |
137 | if let Some(x) = w.pop() { |
138 | assert!(last < *x); |
139 | last = *x; |
140 | remaining.fetch_sub(1, SeqCst); |
141 | } |
142 | } |
143 | }) |
144 | .unwrap(); |
145 | } |
146 | |
147 | #[test] |
148 | fn stress() { |
149 | const THREADS: usize = 8; |
150 | #[cfg (miri)] |
151 | const COUNT: usize = 500; |
152 | #[cfg (not(miri))] |
153 | const COUNT: usize = 50_000; |
154 | |
155 | let w = Worker::new_fifo(); |
156 | let done = Arc::new(AtomicBool::new(false)); |
157 | let hits = Arc::new(AtomicUsize::new(0)); |
158 | |
159 | scope(|scope| { |
160 | for _ in 0..THREADS { |
161 | let s = w.stealer(); |
162 | let done = done.clone(); |
163 | let hits = hits.clone(); |
164 | |
165 | scope.spawn(move |_| { |
166 | let w2 = Worker::new_fifo(); |
167 | |
168 | while !done.load(SeqCst) { |
169 | if let Success(_) = s.steal() { |
170 | hits.fetch_add(1, SeqCst); |
171 | } |
172 | |
173 | let _ = s.steal_batch(&w2); |
174 | |
175 | if let Success(_) = s.steal_batch_and_pop(&w2) { |
176 | hits.fetch_add(1, SeqCst); |
177 | } |
178 | |
179 | while w2.pop().is_some() { |
180 | hits.fetch_add(1, SeqCst); |
181 | } |
182 | } |
183 | }); |
184 | } |
185 | |
186 | let mut rng = rand::thread_rng(); |
187 | let mut expected = 0; |
188 | while expected < COUNT { |
189 | if rng.gen_range(0..3) == 0 { |
190 | while w.pop().is_some() { |
191 | hits.fetch_add(1, SeqCst); |
192 | } |
193 | } else { |
194 | w.push(expected); |
195 | expected += 1; |
196 | } |
197 | } |
198 | |
199 | while hits.load(SeqCst) < COUNT { |
200 | while w.pop().is_some() { |
201 | hits.fetch_add(1, SeqCst); |
202 | } |
203 | } |
204 | done.store(true, SeqCst); |
205 | }) |
206 | .unwrap(); |
207 | } |
208 | |
209 | #[cfg_attr (miri, ignore)] // Miri is too slow |
210 | #[test] |
211 | fn no_starvation() { |
212 | const THREADS: usize = 8; |
213 | const COUNT: usize = 50_000; |
214 | |
215 | let w = Worker::new_fifo(); |
216 | let done = Arc::new(AtomicBool::new(false)); |
217 | let mut all_hits = Vec::new(); |
218 | |
219 | scope(|scope| { |
220 | for _ in 0..THREADS { |
221 | let s = w.stealer(); |
222 | let done = done.clone(); |
223 | let hits = Arc::new(AtomicUsize::new(0)); |
224 | all_hits.push(hits.clone()); |
225 | |
226 | scope.spawn(move |_| { |
227 | let w2 = Worker::new_fifo(); |
228 | |
229 | while !done.load(SeqCst) { |
230 | if let Success(_) = s.steal() { |
231 | hits.fetch_add(1, SeqCst); |
232 | } |
233 | |
234 | let _ = s.steal_batch(&w2); |
235 | |
236 | if let Success(_) = s.steal_batch_and_pop(&w2) { |
237 | hits.fetch_add(1, SeqCst); |
238 | } |
239 | |
240 | while w2.pop().is_some() { |
241 | hits.fetch_add(1, SeqCst); |
242 | } |
243 | } |
244 | }); |
245 | } |
246 | |
247 | let mut rng = rand::thread_rng(); |
248 | let mut my_hits = 0; |
249 | loop { |
250 | for i in 0..rng.gen_range(0..COUNT) { |
251 | if rng.gen_range(0..3) == 0 && my_hits == 0 { |
252 | while w.pop().is_some() { |
253 | my_hits += 1; |
254 | } |
255 | } else { |
256 | w.push(i); |
257 | } |
258 | } |
259 | |
260 | if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) { |
261 | break; |
262 | } |
263 | } |
264 | done.store(true, SeqCst); |
265 | }) |
266 | .unwrap(); |
267 | } |
268 | |
269 | #[test] |
270 | fn destructors() { |
271 | #[cfg (miri)] |
272 | const THREADS: usize = 2; |
273 | #[cfg (not(miri))] |
274 | const THREADS: usize = 8; |
275 | #[cfg (miri)] |
276 | const COUNT: usize = 500; |
277 | #[cfg (not(miri))] |
278 | const COUNT: usize = 50_000; |
279 | #[cfg (miri)] |
280 | const STEPS: usize = 100; |
281 | #[cfg (not(miri))] |
282 | const STEPS: usize = 1000; |
283 | |
284 | struct Elem(usize, Arc<Mutex<Vec<usize>>>); |
285 | |
286 | impl Drop for Elem { |
287 | fn drop(&mut self) { |
288 | self.1.lock().unwrap().push(self.0); |
289 | } |
290 | } |
291 | |
292 | let w = Worker::new_fifo(); |
293 | let dropped = Arc::new(Mutex::new(Vec::new())); |
294 | let remaining = Arc::new(AtomicUsize::new(COUNT)); |
295 | |
296 | for i in 0..COUNT { |
297 | w.push(Elem(i, dropped.clone())); |
298 | } |
299 | |
300 | scope(|scope| { |
301 | for _ in 0..THREADS { |
302 | let remaining = remaining.clone(); |
303 | let s = w.stealer(); |
304 | |
305 | scope.spawn(move |_| { |
306 | let w2 = Worker::new_fifo(); |
307 | let mut cnt = 0; |
308 | |
309 | while cnt < STEPS { |
310 | if let Success(_) = s.steal() { |
311 | cnt += 1; |
312 | remaining.fetch_sub(1, SeqCst); |
313 | } |
314 | |
315 | let _ = s.steal_batch(&w2); |
316 | |
317 | if let Success(_) = s.steal_batch_and_pop(&w2) { |
318 | cnt += 1; |
319 | remaining.fetch_sub(1, SeqCst); |
320 | } |
321 | |
322 | while w2.pop().is_some() { |
323 | cnt += 1; |
324 | remaining.fetch_sub(1, SeqCst); |
325 | } |
326 | } |
327 | }); |
328 | } |
329 | |
330 | for _ in 0..STEPS { |
331 | if w.pop().is_some() { |
332 | remaining.fetch_sub(1, SeqCst); |
333 | } |
334 | } |
335 | }) |
336 | .unwrap(); |
337 | |
338 | let rem = remaining.load(SeqCst); |
339 | assert!(rem > 0); |
340 | |
341 | { |
342 | let mut v = dropped.lock().unwrap(); |
343 | assert_eq!(v.len(), COUNT - rem); |
344 | v.clear(); |
345 | } |
346 | |
347 | drop(w); |
348 | |
349 | { |
350 | let mut v = dropped.lock().unwrap(); |
351 | assert_eq!(v.len(), rem); |
352 | v.sort_unstable(); |
353 | for pair in v.windows(2) { |
354 | assert_eq!(pair[0] + 1, pair[1]); |
355 | } |
356 | } |
357 | } |
358 | |