1use std::sync::atomic::Ordering::SeqCst;
2use std::sync::atomic::{AtomicBool, AtomicUsize};
3use std::sync::{Arc, Mutex};
4
5use crossbeam_deque::Steal::{Empty, Success};
6use crossbeam_deque::Worker;
7use crossbeam_utils::thread::scope;
8use rand::Rng;
9
/// Single-threaded walk through `push`, `pop` and `steal` on one queue,
/// asserting that elements come out in the order they were pushed.
#[test]
fn smoke() {
    let worker = Worker::new_fifo();
    let stealer = worker.stealer();

    // Nothing has been pushed yet: both handles report an empty queue.
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // One element, taken back by the owner.
    worker.push(1);
    assert_eq!(worker.pop(), Some(1));
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // One element, taken by the stealer.
    worker.push(2);
    assert_eq!(stealer.steal(), Success(2));
    assert_eq!(stealer.steal(), Empty);
    assert_eq!(worker.pop(), None);

    // Three elements, all stolen in push order.
    for value in 3..=5 {
        worker.push(value);
    }
    for expected in 3..=5 {
        assert_eq!(stealer.steal(), Success(expected));
    }
    assert_eq!(stealer.steal(), Empty);

    // Four elements, with pops and a steal interleaved.
    for value in 6..=9 {
        worker.push(value);
    }
    assert_eq!(worker.pop(), Some(6));
    assert_eq!(stealer.steal(), Success(7));
    assert_eq!(worker.pop(), Some(8));
    assert_eq!(worker.pop(), Some(9));
    assert_eq!(worker.pop(), None);
}
45
/// Checks `is_empty` on both the worker and the stealer handle while the
/// queue is filled with two elements and then drained again.
#[test]
fn is_empty() {
    let queue = Worker::new_fifo();
    let thief = queue.stealer();

    // Observed through the worker handle, drained with `pop`.
    assert!(queue.is_empty());
    for item in 1..=2 {
        queue.push(item);
        assert!(!queue.is_empty());
    }
    let _ = queue.pop();
    assert!(!queue.is_empty());
    let _ = queue.pop();
    assert!(queue.is_empty());

    // Observed through the stealer handle, drained with `steal`.
    assert!(thief.is_empty());
    for item in 1..=2 {
        queue.push(item);
        assert!(!thief.is_empty());
    }
    let _ = thief.steal();
    assert!(!thief.is_empty());
    let _ = thief.steal();
    assert!(thief.is_empty());
}
71
/// One pushing thread and one stealing thread: the stealer must observe
/// `0..STEPS` in exactly the order they were pushed.
#[test]
fn spsc() {
    // A much smaller workload is used when running under Miri.
    const STEPS: usize = if cfg!(miri) { 500 } else { 50_000 };

    let producer = Worker::new_fifo();
    let consumer = producer.stealer();

    scope(|sc| {
        sc.spawn(|_| {
            for expected in 0..STEPS {
                // Retry until a steal actually yields an element.
                let stolen = loop {
                    if let Success(v) = consumer.steal() {
                        break v;
                    }
                };
                assert_eq!(expected, stolen);
            }

            // Every pushed element has been consumed by now.
            assert_eq!(consumer.steal(), Empty);
        });

        // The current thread plays the producer.
        (0..STEPS).for_each(|i| producer.push(i));
    })
    .unwrap();
}
102
/// Pre-fills the queue with boxed values `1..=COUNT`, then lets eight stealer
/// threads and the owner race to empty it. Each participant asserts that the
/// values it receives are strictly increasing.
#[test]
fn stampede() {
    const THREADS: usize = 8;
    // A much smaller workload is used when running under Miri.
    const COUNT: usize = if cfg!(miri) { 500 } else { 50_000 };

    let w = Worker::new_fifo();
    (1..=COUNT).for_each(|n| w.push(Box::new(n)));

    // Number of elements nobody has taken yet; shared by all participants.
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    scope(|sc| {
        for _ in 0..THREADS {
            let stealer = w.stealer();
            let left = Arc::clone(&remaining);

            sc.spawn(move |_| {
                let mut prev = 0;
                while left.load(SeqCst) != 0 {
                    if let Success(boxed) = stealer.steal() {
                        let value = *boxed;
                        assert!(prev < value);
                        prev = value;
                        left.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owner competes with the stealers by popping from its own queue.
        let mut prev = 0;
        while remaining.load(SeqCst) != 0 {
            if let Some(boxed) = w.pop() {
                let value = *boxed;
                assert!(prev < value);
                prev = value;
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();
}
146
/// The owner randomly alternates between pushing and draining while eight
/// threads steal single elements and batches. The test finishes once the
/// shared hit counter has reached `COUNT`.
#[test]
fn stress() {
    const THREADS: usize = 8;
    // A much smaller workload is used when running under Miri.
    const COUNT: usize = if cfg!(miri) { 500 } else { 50_000 };

    let w = Worker::new_fifo();
    let done = Arc::new(AtomicBool::new(false));
    let hits = Arc::new(AtomicUsize::new(0));

    scope(|sc| {
        for _ in 0..THREADS {
            let stealer = w.stealer();
            let finished = Arc::clone(&done);
            let tally = Arc::clone(&hits);

            sc.spawn(move |_| {
                // Thread-local queue that receives stolen batches.
                let local = Worker::new_fifo();

                loop {
                    if finished.load(SeqCst) {
                        break;
                    }

                    if matches!(stealer.steal(), Success(_)) {
                        tally.fetch_add(1, SeqCst);
                    }

                    // Elements moved into `local` here are counted by the drain below.
                    let _ = stealer.steal_batch(&local);

                    if matches!(stealer.steal_batch_and_pop(&local), Success(_)) {
                        tally.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        tally.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        // Pops from the owner's side until a pop returns `None`, counting each element.
        let drain_owner = || {
            while w.pop().is_some() {
                hits.fetch_add(1, SeqCst);
            }
        };

        let mut rng = rand::thread_rng();
        let mut pushed = 0;
        while pushed < COUNT {
            // Roughly one iteration in three drains; the others push the next value.
            let drain_now = rng.gen_range(0..3) == 0;
            if drain_now {
                drain_owner();
            } else {
                w.push(pushed);
                pushed += 1;
            }
        }

        // Everything has been pushed; keep draining until every element is counted.
        while hits.load(SeqCst) < COUNT {
            drain_owner();
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
208
/// Keeps pushing (and occasionally popping) until the owner and every one of
/// the eight stealer threads has obtained at least one element.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn no_starvation() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let w = Worker::new_fifo();
    let done = Arc::new(AtomicBool::new(false));
    // One hit counter per stealer thread, inspected by the owner below.
    let mut counters = Vec::new();

    scope(|sc| {
        for _ in 0..THREADS {
            let stealer = w.stealer();
            let finished = Arc::clone(&done);
            let tally = Arc::new(AtomicUsize::new(0));
            counters.push(Arc::clone(&tally));

            sc.spawn(move |_| {
                // Thread-local queue that receives stolen batches.
                let local = Worker::new_fifo();

                loop {
                    if finished.load(SeqCst) {
                        break;
                    }

                    if matches!(stealer.steal(), Success(_)) {
                        tally.fetch_add(1, SeqCst);
                    }

                    // Elements moved into `local` here are counted by the drain below.
                    let _ = stealer.steal_batch(&local);

                    if matches!(stealer.steal_batch_and_pop(&local), Success(_)) {
                        tally.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        tally.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut own_hits = 0;
        loop {
            let rounds = rng.gen_range(0..COUNT);
            for i in 0..rounds {
                // The owner pops only until it has scored its first hit;
                // after that every iteration pushes.
                if rng.gen_range(0..3) == 0 && own_hits == 0 {
                    while w.pop().is_some() {
                        own_hits += 1;
                    }
                } else {
                    w.push(i);
                }
            }

            let everyone_fed = own_hits > 0 && counters.iter().all(|c| c.load(SeqCst) > 0);
            if everyone_fed {
                break;
            }
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
268
/// Verifies destructor bookkeeping: every element taken from the queue is
/// dropped once it has been consumed, and the elements still inside are
/// dropped when the worker itself is dropped.
#[test]
fn destructors() {
    // Much smaller parameters are used when running under Miri.
    const THREADS: usize = if cfg!(miri) { 2 } else { 8 };
    const COUNT: usize = if cfg!(miri) { 500 } else { 50_000 };
    const STEPS: usize = if cfg!(miri) { 100 } else { 1000 };

    /// An element that appends its id to a shared log when it is dropped.
    struct Elem(usize, Arc<Mutex<Vec<usize>>>);

    impl Drop for Elem {
        fn drop(&mut self) {
            let Elem(id, log) = self;
            log.lock().unwrap().push(*id);
        }
    }

    let dropped = Arc::new(Mutex::new(Vec::new()));
    let remaining = Arc::new(AtomicUsize::new(COUNT));
    let w = Worker::new_fifo();

    for id in 0..COUNT {
        w.push(Elem(id, Arc::clone(&dropped)));
    }

    scope(|sc| {
        for _ in 0..THREADS {
            let left = Arc::clone(&remaining);
            let stealer = w.stealer();

            sc.spawn(move |_| {
                // Thread-local queue that receives stolen batches.
                let local = Worker::new_fifo();
                let mut taken = 0;

                // Each thread stops once it has consumed at least `STEPS` elements.
                while taken < STEPS {
                    if let Success(_) = stealer.steal() {
                        taken += 1;
                        left.fetch_sub(1, SeqCst);
                    }

                    // Elements moved into `local` here are counted by the drain below.
                    let _ = stealer.steal_batch(&local);

                    if let Success(_) = stealer.steal_batch_and_pop(&local) {
                        taken += 1;
                        left.fetch_sub(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        taken += 1;
                        left.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owner makes `STEPS` pop attempts of its own.
        for _ in 0..STEPS {
            if w.pop().is_some() {
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();

    let rem = remaining.load(SeqCst);
    assert!(rem > 0);

    // Everything taken out of the queue so far must already have been dropped.
    let mut log = dropped.lock().unwrap();
    assert_eq!(log.len(), COUNT - rem);
    log.clear();
    // Release the lock first: `Elem::drop` locks the same mutex.
    drop(log);

    drop(w);

    // Dropping the worker must drop exactly the leftover elements, and their
    // ids must form a run of consecutive integers.
    let mut log = dropped.lock().unwrap();
    assert_eq!(log.len(), rem);
    log.sort_unstable();
    for pair in log.windows(2) {
        let (lo, hi) = (pair[0], pair[1]);
        assert_eq!(lo + 1, hi);
    }
    drop(log);
}
358