1use crate::runtime::scheduler::multi_thread::{queue, Stats};
2use crate::runtime::task::{self, Schedule, Task};
3
4use std::cell::RefCell;
5use std::thread;
6use std::time::Duration;
7
// Flushes `$stats` into a freshly created `WorkerMetrics` and asserts that the
// atomic counter `$field` on it equals `$v`.
//
// Every call site in this file sits inside `cfg_metrics! { .. }`, so the macro
// may end up with no users in some builds, hence the `allow`.
#[allow(unused)]
macro_rules! assert_metrics {
    ($stats:ident, $field:ident == $v:expr) => {{
        let sink = crate::runtime::WorkerMetrics::new();
        $stats.submit(&sink);

        // `$v` is evaluated after the submit and before the counter is read.
        let expect = $v;
        let actual = sink.$field.load(std::sync::atomic::Ordering::Relaxed);

        assert!(actual == expect, "expect = {}; actual = {}", expect, actual)
    }};
}
23
24fn new_stats() -> Stats {
25 use crate::runtime::WorkerMetrics;
26 Stats::new(&WorkerMetrics::new())
27}
28
/// Pushes 256 tasks one by one and checks that nothing was diverted to the
/// overflow vector.
#[test]
fn fits_256_one_at_a_time() {
    const NUM_TASKS: usize = 256;

    let mut stats = new_stats();
    let inject = RefCell::new(vec![]);
    let (_, mut local) = queue::local();

    for _ in 0..NUM_TASKS {
        let task = super::unowned(async {}).0;
        local.push_back_or_overflow(task, &inject, &mut stats);
    }

    cfg_metrics! {
        assert_metrics!(stats, overflow_count == 0);
    }

    // No task may have been handed to the overflow target.
    assert!(inject.borrow_mut().pop().is_none());

    // Empty the queue before it goes out of scope.
    // NOTE(review): presumably the local queue must be empty when dropped - confirm.
    while let Some(_task) = local.pop() {}
}
48
/// Pushes 256 tasks in a single batch and checks that exactly 256 can be
/// popped back out.
#[test]
fn fits_256_all_at_once() {
    let (_, mut local) = queue::local();

    let mut batch: Vec<_> = (0..256).map(|_| super::unowned(async {}).0).collect();
    local.push_back(batch.drain(..));

    let mut popped = 0;
    while let Some(_task) = local.pop() {
        popped += 1;
    }

    assert_eq!(popped, 256);
}
65
/// Pushes 256 tasks as four batches (10 + 100 + 46 + 100) and checks that
/// exactly 256 can be popped back out.
#[test]
fn fits_256_all_in_chunks() {
    let (_, mut local) = queue::local();

    let mut pending: Vec<_> = (0..256).map(|_| super::unowned(async {}).0).collect();

    // Each batch is taken from the front of the remaining tasks.
    for chunk_len in [10, 100, 46, 100] {
        local.push_back(pending.drain(..chunk_len));
    }

    // Pop until the queue reports empty, counting as we go.
    let popped = std::iter::from_fn(|| local.pop()).count();

    assert_eq!(popped, 256);
}
86
/// Pushes 257 tasks one by one, expects exactly one overflow to be recorded,
/// and checks that every task is found either in the overflow vector or in
/// the local queue.
#[test]
fn overflow() {
    const NUM_TASKS: usize = 257;

    let mut stats = new_stats();
    let inject = RefCell::new(vec![]);
    let (_, mut local) = queue::local();

    for _ in 0..NUM_TASKS {
        let task = super::unowned(async {}).0;
        local.push_back_or_overflow(task, &inject, &mut stats);
    }

    cfg_metrics! {
        assert_metrics!(stats, overflow_count == 1);
    }

    // Tasks that were diverted to the overflow vector ...
    let overflowed = inject.borrow_mut().drain(..).count();

    // ... plus the ones still held by the local queue.
    let mut still_local = 0;
    while let Some(_task) = local.pop() {
        still_local += 1;
    }

    assert_eq!(overflowed + still_local, NUM_TASKS);
}
112
/// Pushes four tasks onto one queue, steals from it into a second queue, and
/// checks where the tasks ended up: one returned by the steal call, one in
/// the destination queue, two left in the source queue.
#[test]
fn steal_batch() {
    let mut stats = new_stats();
    let inject = RefCell::new(vec![]);

    let (victim_steal, mut victim) = queue::local();
    let (_, mut thief) = queue::local();

    for _ in 0..4 {
        let task = super::unowned(async {}).0;
        victim.push_back_or_overflow(task, &inject, &mut stats);
    }

    // The steal call itself must hand back a task.
    assert!(victim_steal.steal_into(&mut thief, &mut stats).is_some());

    cfg_metrics! {
        assert_metrics!(stats, steal_count == 2);
    }

    // Exactly one task landed in the destination queue.
    assert!(thief.pop().is_some());
    assert!(thief.pop().is_none());

    // Exactly two tasks stayed behind in the source queue.
    assert!(victim.pop().is_some());
    assert!(victim.pop().is_some());
    assert!(victim.pop().is_none());
}
144
/// Returns `miri` when compiled with `cfg(miri)` set and `normal` otherwise.
///
/// Usable in constant contexts; the stress tests use it to pick their
/// iteration counts.
const fn normal_or_miri(normal: usize, miri: usize) -> usize {
    match cfg!(miri) {
        true => miri,
        false => normal,
    }
}
152
/// Stress test: one thread pushes and pops on its local queue in rounds while
/// a second thread repeatedly steals from it.
///
/// After each iteration every pushed task must be accounted for exactly once:
/// popped locally, diverted to the overflow vector, taken by the stealer, or
/// still sitting in the local queue.
#[test]
fn stress1() {
    const NUM_ITER: usize = 5;
    const NUM_STEAL: usize = normal_or_miri(1_000, 10);
    const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
    const NUM_PUSH: usize = normal_or_miri(500, 10);
    const NUM_POP: usize = normal_or_miri(250, 10);

    let mut stats = new_stats();

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = RefCell::new(vec![]);

        // Stealer: takes ownership of the steal handle and uses its own
        // private queue and stats.
        let th = thread::spawn(move || {
            let mut stats = new_stats();
            let (_, mut local) = queue::local();
            let mut n = 0;

            for _ in 0..NUM_STEAL {
                // A successful steal returns one task directly ...
                if steal.steal_into(&mut local, &mut stats).is_some() {
                    n += 1;
                }

                // ... and any others are found in the stealer's own queue.
                while local.pop().is_some() {
                    n += 1;
                }

                thread::yield_now();
            }

            cfg_metrics! {
                assert_metrics!(stats, steal_count == n as _);
            }

            n
        });

        let mut n = 0;

        for _ in 0..NUM_LOCAL {
            for _ in 0..NUM_PUSH {
                let (task, _) = super::unowned(async {});
                local.push_back_or_overflow(task, &inject, &mut stats);
            }

            // Pop at most NUM_POP tasks per round; stop early once empty.
            for _ in 0..NUM_POP {
                if local.pop().is_some() {
                    n += 1;
                } else {
                    break;
                }
            }
        }

        n += inject.borrow_mut().drain(..).count();

        n += th.join().unwrap();

        // The per-round pop loop is capped at NUM_POP, so depending on how
        // steals interleave with the pushes a few tasks can still be in the
        // local queue once the stealer has finished. Count them here, after
        // the join, so the total does not depend on thread timing.
        while local.pop().is_some() {
            n += 1;
        }

        assert_eq!(n, NUM_LOCAL * NUM_PUSH);
    }
}
215
/// Stress test: one thread pushes a long stream of tasks, popping one every
/// 128 pushes, while a second thread steals with a short sleep between
/// attempts.
///
/// At the end the tasks popped locally, diverted to the overflow vector,
/// taken by the stealer and left in the local queue must add up to the
/// number pushed.
#[test]
fn stress2() {
    const NUM_ITER: usize = 1;
    const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
    const NUM_STEAL: usize = normal_or_miri(1_000, 10);

    let mut stats = new_stats();

    for _ in 0..NUM_ITER {
        let inject = RefCell::new(vec![]);
        let (steal, mut local) = queue::local();

        // Stealer thread with its own destination queue and stats.
        let stealer = thread::spawn(move || {
            let mut stats = new_stats();
            let (_, mut dst) = queue::local();
            let mut taken = 0;

            for _ in 0..NUM_STEAL {
                // Task returned directly by the steal call.
                if let Some(_task) = steal.steal_into(&mut dst, &mut stats) {
                    taken += 1;
                }

                // Tasks the steal placed into the destination queue.
                while let Some(_task) = dst.pop() {
                    taken += 1;
                }

                thread::sleep(Duration::from_micros(10));
            }

            taken
        });

        let mut accounted = 0;

        for i in 0..NUM_TASKS {
            let task = super::unowned(async {}).0;
            local.push_back_or_overflow(task, &inject, &mut stats);

            // Pop a single task on every 128th push.
            let is_pop_round = i % 128 == 0;
            if is_pop_round && local.pop().is_some() {
                accounted += 1;
            }

            // Collect whatever was diverted to the overflow vector so far.
            accounted += inject.borrow_mut().drain(..).count();
        }

        accounted += stealer.join().unwrap();

        // With the stealer gone, count what remains in the local queue.
        while let Some(_task) = local.pop() {
            accounted += 1;
        }

        accounted += inject.borrow_mut().drain(..).count();

        assert_eq!(accounted, NUM_TASKS);
    }
}
272
273struct Runtime;
274
275impl Schedule for Runtime {
276 fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
277 None
278 }
279
280 fn schedule(&self, _task: task::Notified<Self>) {
281 unreachable!();
282 }
283}
284