1use std::sync::atomic::Ordering::SeqCst;
2use std::sync::atomic::{AtomicBool, AtomicUsize};
3use std::sync::{Arc, Mutex};
4
5use crossbeam_deque::Steal::{Empty, Success};
6use crossbeam_deque::Worker;
7use crossbeam_utils::thread::scope;
8use rand::Rng;
9
10#[test]
11fn smoke() {
12 let w = Worker::new_lifo();
13 let s = w.stealer();
14 assert_eq!(w.pop(), None);
15 assert_eq!(s.steal(), Empty);
16
17 w.push(1);
18 assert_eq!(w.pop(), Some(1));
19 assert_eq!(w.pop(), None);
20 assert_eq!(s.steal(), Empty);
21
22 w.push(2);
23 assert_eq!(s.steal(), Success(2));
24 assert_eq!(s.steal(), Empty);
25 assert_eq!(w.pop(), None);
26
27 w.push(3);
28 w.push(4);
29 w.push(5);
30 assert_eq!(s.steal(), Success(3));
31 assert_eq!(s.steal(), Success(4));
32 assert_eq!(s.steal(), Success(5));
33 assert_eq!(s.steal(), Empty);
34
35 w.push(6);
36 w.push(7);
37 w.push(8);
38 w.push(9);
39 assert_eq!(w.pop(), Some(9));
40 assert_eq!(s.steal(), Success(6));
41 assert_eq!(w.pop(), Some(8));
42 assert_eq!(w.pop(), Some(7));
43 assert_eq!(w.pop(), None);
44}
45
46#[test]
47fn is_empty() {
48 let w = Worker::new_lifo();
49 let s = w.stealer();
50
51 assert!(w.is_empty());
52 w.push(1);
53 assert!(!w.is_empty());
54 w.push(2);
55 assert!(!w.is_empty());
56 let _ = w.pop();
57 assert!(!w.is_empty());
58 let _ = w.pop();
59 assert!(w.is_empty());
60
61 assert!(s.is_empty());
62 w.push(1);
63 assert!(!s.is_empty());
64 w.push(2);
65 assert!(!s.is_empty());
66 let _ = s.steal();
67 assert!(!s.is_empty());
68 let _ = s.steal();
69 assert!(s.is_empty());
70}
71
72#[test]
73fn spsc() {
74 #[cfg(miri)]
75 const STEPS: usize = 500;
76 #[cfg(not(miri))]
77 const STEPS: usize = 50_000;
78
79 let w = Worker::new_lifo();
80 let s = w.stealer();
81
82 scope(|scope| {
83 scope.spawn(|_| {
84 for i in 0..STEPS {
85 loop {
86 if let Success(v) = s.steal() {
87 assert_eq!(i, v);
88 break;
89 }
90 #[cfg(miri)]
91 std::hint::spin_loop();
92 }
93 }
94
95 assert_eq!(s.steal(), Empty);
96 });
97
98 for i in 0..STEPS {
99 w.push(i);
100 }
101 })
102 .unwrap();
103}
104
105#[test]
106fn stampede() {
107 const THREADS: usize = 8;
108 #[cfg(miri)]
109 const COUNT: usize = 500;
110 #[cfg(not(miri))]
111 const COUNT: usize = 50_000;
112
113 let w = Worker::new_lifo();
114
115 for i in 0..COUNT {
116 w.push(Box::new(i + 1));
117 }
118 let remaining = Arc::new(AtomicUsize::new(COUNT));
119
120 scope(|scope| {
121 for _ in 0..THREADS {
122 let s = w.stealer();
123 let remaining = remaining.clone();
124
125 scope.spawn(move |_| {
126 let mut last = 0;
127 while remaining.load(SeqCst) > 0 {
128 if let Success(x) = s.steal() {
129 assert!(last < *x);
130 last = *x;
131 remaining.fetch_sub(1, SeqCst);
132 }
133 }
134 });
135 }
136
137 let mut last = COUNT + 1;
138 while remaining.load(SeqCst) > 0 {
139 if let Some(x) = w.pop() {
140 assert!(last > *x);
141 last = *x;
142 remaining.fetch_sub(1, SeqCst);
143 }
144 }
145 })
146 .unwrap();
147}
148
149#[test]
150fn stress() {
151 const THREADS: usize = 8;
152 #[cfg(miri)]
153 const COUNT: usize = 500;
154 #[cfg(not(miri))]
155 const COUNT: usize = 50_000;
156
157 let w = Worker::new_lifo();
158 let done = Arc::new(AtomicBool::new(false));
159 let hits = Arc::new(AtomicUsize::new(0));
160
161 scope(|scope| {
162 for _ in 0..THREADS {
163 let s = w.stealer();
164 let done = done.clone();
165 let hits = hits.clone();
166
167 scope.spawn(move |_| {
168 let w2 = Worker::new_lifo();
169
170 while !done.load(SeqCst) {
171 if let Success(_) = s.steal() {
172 hits.fetch_add(1, SeqCst);
173 }
174
175 let _ = s.steal_batch(&w2);
176
177 if let Success(_) = s.steal_batch_and_pop(&w2) {
178 hits.fetch_add(1, SeqCst);
179 }
180
181 while w2.pop().is_some() {
182 hits.fetch_add(1, SeqCst);
183 }
184 }
185 });
186 }
187
188 let mut rng = rand::thread_rng();
189 let mut expected = 0;
190 while expected < COUNT {
191 if rng.gen_range(0..3) == 0 {
192 while w.pop().is_some() {
193 hits.fetch_add(1, SeqCst);
194 }
195 } else {
196 w.push(expected);
197 expected += 1;
198 }
199 }
200
201 while hits.load(SeqCst) < COUNT {
202 while w.pop().is_some() {
203 hits.fetch_add(1, SeqCst);
204 }
205 }
206 done.store(true, SeqCst);
207 })
208 .unwrap();
209}
210
211#[cfg_attr(miri, ignore)] // Miri is too slow
212#[test]
213fn no_starvation() {
214 const THREADS: usize = 8;
215 const COUNT: usize = 50_000;
216
217 let w = Worker::new_lifo();
218 let done = Arc::new(AtomicBool::new(false));
219 let mut all_hits = Vec::new();
220
221 scope(|scope| {
222 for _ in 0..THREADS {
223 let s = w.stealer();
224 let done = done.clone();
225 let hits = Arc::new(AtomicUsize::new(0));
226 all_hits.push(hits.clone());
227
228 scope.spawn(move |_| {
229 let w2 = Worker::new_lifo();
230
231 while !done.load(SeqCst) {
232 if let Success(_) = s.steal() {
233 hits.fetch_add(1, SeqCst);
234 }
235
236 let _ = s.steal_batch(&w2);
237
238 if let Success(_) = s.steal_batch_and_pop(&w2) {
239 hits.fetch_add(1, SeqCst);
240 }
241
242 while w2.pop().is_some() {
243 hits.fetch_add(1, SeqCst);
244 }
245 }
246 });
247 }
248
249 let mut rng = rand::thread_rng();
250 let mut my_hits = 0;
251 loop {
252 for i in 0..rng.gen_range(0..COUNT) {
253 if rng.gen_range(0..3) == 0 && my_hits == 0 {
254 while w.pop().is_some() {
255 my_hits += 1;
256 }
257 } else {
258 w.push(i);
259 }
260 }
261
262 if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) {
263 break;
264 }
265 }
266 done.store(true, SeqCst);
267 })
268 .unwrap();
269}
270
271#[test]
272fn destructors() {
273 #[cfg(miri)]
274 const THREADS: usize = 2;
275 #[cfg(not(miri))]
276 const THREADS: usize = 8;
277 #[cfg(miri)]
278 const COUNT: usize = 500;
279 #[cfg(not(miri))]
280 const COUNT: usize = 50_000;
281 #[cfg(miri)]
282 const STEPS: usize = 100;
283 #[cfg(not(miri))]
284 const STEPS: usize = 1000;
285
286 struct Elem(usize, Arc<Mutex<Vec<usize>>>);
287
288 impl Drop for Elem {
289 fn drop(&mut self) {
290 self.1.lock().unwrap().push(self.0);
291 }
292 }
293
294 let w = Worker::new_lifo();
295 let dropped = Arc::new(Mutex::new(Vec::new()));
296 let remaining = Arc::new(AtomicUsize::new(COUNT));
297
298 for i in 0..COUNT {
299 w.push(Elem(i, dropped.clone()));
300 }
301
302 scope(|scope| {
303 for _ in 0..THREADS {
304 let remaining = remaining.clone();
305 let s = w.stealer();
306
307 scope.spawn(move |_| {
308 let w2 = Worker::new_lifo();
309 let mut cnt = 0;
310
311 while cnt < STEPS {
312 if let Success(_) = s.steal() {
313 cnt += 1;
314 remaining.fetch_sub(1, SeqCst);
315 }
316
317 let _ = s.steal_batch(&w2);
318
319 if let Success(_) = s.steal_batch_and_pop(&w2) {
320 cnt += 1;
321 remaining.fetch_sub(1, SeqCst);
322 }
323
324 while w2.pop().is_some() {
325 cnt += 1;
326 remaining.fetch_sub(1, SeqCst);
327 }
328 }
329 });
330 }
331
332 for _ in 0..STEPS {
333 if w.pop().is_some() {
334 remaining.fetch_sub(1, SeqCst);
335 }
336 }
337 })
338 .unwrap();
339
340 let rem = remaining.load(SeqCst);
341 assert!(rem > 0);
342
343 {
344 let mut v = dropped.lock().unwrap();
345 assert_eq!(v.len(), COUNT - rem);
346 v.clear();
347 }
348
349 drop(w);
350
351 {
352 let mut v = dropped.lock().unwrap();
353 assert_eq!(v.len(), rem);
354 v.sort_unstable();
355 for pair in v.windows(2) {
356 assert_eq!(pair[0] + 1, pair[1]);
357 }
358 }
359}
360