1use std::sync::atomic::Ordering::SeqCst;
2use std::sync::atomic::{AtomicBool, AtomicUsize};
3use std::sync::{Arc, Mutex};
4
5use crossbeam_deque::Steal::{Empty, Success};
6use crossbeam_deque::{Injector, Worker};
7use crossbeam_utils::thread::scope;
8use rand::Rng;
9
10#[test]
11fn smoke() {
12 let q = Injector::new();
13 assert_eq!(q.steal(), Empty);
14
15 q.push(1);
16 q.push(2);
17 assert_eq!(q.steal(), Success(1));
18 assert_eq!(q.steal(), Success(2));
19 assert_eq!(q.steal(), Empty);
20
21 q.push(3);
22 assert_eq!(q.steal(), Success(3));
23 assert_eq!(q.steal(), Empty);
24}
25
26#[test]
27fn is_empty() {
28 let q = Injector::new();
29 assert!(q.is_empty());
30
31 q.push(1);
32 assert!(!q.is_empty());
33 q.push(2);
34 assert!(!q.is_empty());
35
36 let _ = q.steal();
37 assert!(!q.is_empty());
38 let _ = q.steal();
39 assert!(q.is_empty());
40
41 q.push(3);
42 assert!(!q.is_empty());
43 let _ = q.steal();
44 assert!(q.is_empty());
45}
46
47#[test]
48fn spsc() {
49 #[cfg(miri)]
50 const COUNT: usize = 500;
51 #[cfg(not(miri))]
52 const COUNT: usize = 100_000;
53
54 let q = Injector::new();
55
56 scope(|scope| {
57 scope.spawn(|_| {
58 for i in 0..COUNT {
59 loop {
60 if let Success(v) = q.steal() {
61 assert_eq!(i, v);
62 break;
63 }
64 #[cfg(miri)]
65 std::hint::spin_loop();
66 }
67 }
68
69 assert_eq!(q.steal(), Empty);
70 });
71
72 for i in 0..COUNT {
73 q.push(i);
74 }
75 })
76 .unwrap();
77}
78
79#[test]
80fn mpmc() {
81 #[cfg(miri)]
82 const COUNT: usize = 500;
83 #[cfg(not(miri))]
84 const COUNT: usize = 25_000;
85 const THREADS: usize = 4;
86
87 let q = Injector::new();
88 let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
89
90 scope(|scope| {
91 for _ in 0..THREADS {
92 scope.spawn(|_| {
93 for i in 0..COUNT {
94 q.push(i);
95 }
96 });
97 }
98
99 for _ in 0..THREADS {
100 scope.spawn(|_| {
101 for _ in 0..COUNT {
102 loop {
103 if let Success(n) = q.steal() {
104 v[n].fetch_add(1, SeqCst);
105 break;
106 }
107 #[cfg(miri)]
108 std::hint::spin_loop();
109 }
110 }
111 });
112 }
113 })
114 .unwrap();
115
116 for c in v {
117 assert_eq!(c.load(SeqCst), THREADS);
118 }
119}
120
121#[test]
122fn stampede() {
123 const THREADS: usize = 8;
124 #[cfg(miri)]
125 const COUNT: usize = 500;
126 #[cfg(not(miri))]
127 const COUNT: usize = 50_000;
128
129 let q = Injector::new();
130
131 for i in 0..COUNT {
132 q.push(Box::new(i + 1));
133 }
134 let remaining = Arc::new(AtomicUsize::new(COUNT));
135
136 scope(|scope| {
137 for _ in 0..THREADS {
138 let remaining = remaining.clone();
139 let q = &q;
140
141 scope.spawn(move |_| {
142 let mut last = 0;
143 while remaining.load(SeqCst) > 0 {
144 if let Success(x) = q.steal() {
145 assert!(last < *x);
146 last = *x;
147 remaining.fetch_sub(1, SeqCst);
148 }
149 }
150 });
151 }
152
153 let mut last = 0;
154 while remaining.load(SeqCst) > 0 {
155 if let Success(x) = q.steal() {
156 assert!(last < *x);
157 last = *x;
158 remaining.fetch_sub(1, SeqCst);
159 }
160 }
161 })
162 .unwrap();
163}
164
165#[test]
166fn stress() {
167 const THREADS: usize = 8;
168 #[cfg(miri)]
169 const COUNT: usize = 500;
170 #[cfg(not(miri))]
171 const COUNT: usize = 50_000;
172
173 let q = Injector::new();
174 let done = Arc::new(AtomicBool::new(false));
175 let hits = Arc::new(AtomicUsize::new(0));
176
177 scope(|scope| {
178 for _ in 0..THREADS {
179 let done = done.clone();
180 let hits = hits.clone();
181 let q = &q;
182
183 scope.spawn(move |_| {
184 let w2 = Worker::new_fifo();
185
186 while !done.load(SeqCst) {
187 if let Success(_) = q.steal() {
188 hits.fetch_add(1, SeqCst);
189 }
190
191 let _ = q.steal_batch(&w2);
192
193 if let Success(_) = q.steal_batch_and_pop(&w2) {
194 hits.fetch_add(1, SeqCst);
195 }
196
197 while w2.pop().is_some() {
198 hits.fetch_add(1, SeqCst);
199 }
200 }
201 });
202 }
203
204 let mut rng = rand::thread_rng();
205 let mut expected = 0;
206 while expected < COUNT {
207 if rng.gen_range(0..3) == 0 {
208 while let Success(_) = q.steal() {
209 hits.fetch_add(1, SeqCst);
210 }
211 } else {
212 q.push(expected);
213 expected += 1;
214 }
215 }
216
217 while hits.load(SeqCst) < COUNT {
218 while let Success(_) = q.steal() {
219 hits.fetch_add(1, SeqCst);
220 }
221 }
222 done.store(true, SeqCst);
223 })
224 .unwrap();
225}
226
227#[cfg_attr(miri, ignore)] // Miri is too slow
228#[test]
229fn no_starvation() {
230 const THREADS: usize = 8;
231 const COUNT: usize = 50_000;
232
233 let q = Injector::new();
234 let done = Arc::new(AtomicBool::new(false));
235 let mut all_hits = Vec::new();
236
237 scope(|scope| {
238 for _ in 0..THREADS {
239 let done = done.clone();
240 let hits = Arc::new(AtomicUsize::new(0));
241 all_hits.push(hits.clone());
242 let q = &q;
243
244 scope.spawn(move |_| {
245 let w2 = Worker::new_fifo();
246
247 while !done.load(SeqCst) {
248 if let Success(_) = q.steal() {
249 hits.fetch_add(1, SeqCst);
250 }
251
252 let _ = q.steal_batch(&w2);
253
254 if let Success(_) = q.steal_batch_and_pop(&w2) {
255 hits.fetch_add(1, SeqCst);
256 }
257
258 while w2.pop().is_some() {
259 hits.fetch_add(1, SeqCst);
260 }
261 }
262 });
263 }
264
265 let mut rng = rand::thread_rng();
266 let mut my_hits = 0;
267 loop {
268 for i in 0..rng.gen_range(0..COUNT) {
269 if rng.gen_range(0..3) == 0 && my_hits == 0 {
270 while let Success(_) = q.steal() {
271 my_hits += 1;
272 }
273 } else {
274 q.push(i);
275 }
276 }
277
278 if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) {
279 break;
280 }
281 }
282 done.store(true, SeqCst);
283 })
284 .unwrap();
285}
286
287#[test]
288fn destructors() {
289 #[cfg(miri)]
290 const THREADS: usize = 2;
291 #[cfg(not(miri))]
292 const THREADS: usize = 8;
293 #[cfg(miri)]
294 const COUNT: usize = 500;
295 #[cfg(not(miri))]
296 const COUNT: usize = 50_000;
297 #[cfg(miri)]
298 const STEPS: usize = 100;
299 #[cfg(not(miri))]
300 const STEPS: usize = 1000;
301
302 struct Elem(usize, Arc<Mutex<Vec<usize>>>);
303
304 impl Drop for Elem {
305 fn drop(&mut self) {
306 self.1.lock().unwrap().push(self.0);
307 }
308 }
309
310 let q = Injector::new();
311 let dropped = Arc::new(Mutex::new(Vec::new()));
312 let remaining = Arc::new(AtomicUsize::new(COUNT));
313
314 for i in 0..COUNT {
315 q.push(Elem(i, dropped.clone()));
316 }
317
318 scope(|scope| {
319 for _ in 0..THREADS {
320 let remaining = remaining.clone();
321 let q = &q;
322
323 scope.spawn(move |_| {
324 let w2 = Worker::new_fifo();
325 let mut cnt = 0;
326
327 while cnt < STEPS {
328 if let Success(_) = q.steal() {
329 cnt += 1;
330 remaining.fetch_sub(1, SeqCst);
331 }
332
333 let _ = q.steal_batch(&w2);
334
335 if let Success(_) = q.steal_batch_and_pop(&w2) {
336 cnt += 1;
337 remaining.fetch_sub(1, SeqCst);
338 }
339
340 while w2.pop().is_some() {
341 cnt += 1;
342 remaining.fetch_sub(1, SeqCst);
343 }
344 }
345 });
346 }
347
348 for _ in 0..STEPS {
349 if let Success(_) = q.steal() {
350 remaining.fetch_sub(1, SeqCst);
351 }
352 }
353 })
354 .unwrap();
355
356 let rem = remaining.load(SeqCst);
357 assert!(rem > 0);
358
359 {
360 let mut v = dropped.lock().unwrap();
361 assert_eq!(v.len(), COUNT - rem);
362 v.clear();
363 }
364
365 drop(q);
366
367 {
368 let mut v = dropped.lock().unwrap();
369 assert_eq!(v.len(), rem);
370 v.sort_unstable();
371 for pair in v.windows(2) {
372 assert_eq!(pair[0] + 1, pair[1]);
373 }
374 }
375}
376