1 | use crate::runtime::scheduler::multi_thread::{queue, Stats}; |
2 | use crate::runtime::task::{self, Schedule, Task}; |
3 | |
4 | use std::cell::RefCell; |
5 | use std::thread; |
6 | use std::time::Duration; |
7 | |
// In this file the macro is only invoked from inside `cfg_metrics!` blocks, so it is
// presumably unused (hence the `allow`) in builds where those blocks are compiled out.
#[allow(unused)]
macro_rules! assert_metrics {
    // Usage: `assert_metrics!(stats, some_field == expected_value)`.
    ($stats:ident, $field:ident == $v:expr) => {{
        // Submit the stats into a fresh `WorkerMetrics`, then read the named field back.
        let scratch = crate::runtime::WorkerMetrics::new();
        $stats.submit(&scratch);

        // The expected expression is evaluated after the submit and before the load.
        let expect = $v;
        let actual = scratch.$field.load(std::sync::atomic::Ordering::Relaxed);

        assert!(actual == expect, "expect = {}; actual = {}", expect, actual)
    }};
}
23 | |
24 | fn new_stats() -> Stats { |
25 | use crate::runtime::WorkerMetrics; |
26 | Stats::new(&WorkerMetrics::new()) |
27 | } |
28 | |
/// Pushes 256 tasks one by one and checks that nothing ends up in the overflow
/// (`inject`) list.
#[test]
fn fits_256_one_at_a_time() {
    let (_, mut worker) = queue::local();
    let inject = RefCell::new(vec![]);
    let mut stats = new_stats();

    // Tasks are created lazily, one right before each push.
    for task in (0..256).map(|_| super::unowned(async {}).0) {
        worker.push_back_or_overflow(task, &inject, &mut stats);
    }

    cfg_metrics! {
        assert_metrics!(stats, overflow_count == 0);
    }

    let spilled = inject.borrow_mut().pop();
    assert!(spilled.is_none());

    // Empty the queue before it goes out of scope.
    // NOTE(review): presumably the queue is expected to be empty when dropped; confirm
    // against the queue implementation.
    while let Some(task) = worker.pop() {
        drop(task);
    }
}
48 | |
/// Pushes 256 tasks with a single `push_back` call and checks that exactly 256 can be
/// popped afterwards.
#[test]
fn fits_256_all_at_once() {
    let (_, mut worker) = queue::local();

    let mut batch: Vec<_> = (0..256).map(|_| super::unowned(async {}).0).collect();
    worker.push_back(batch.drain(..));

    // Pop until the queue reports empty, counting what came out.
    let popped = std::iter::from_fn(|| worker.pop()).count();

    assert_eq!(popped, 256);
}
65 | |
/// Pushes 256 tasks through several `push_back` calls of different sizes and checks
/// that exactly 256 can be popped afterwards.
#[test]
fn fits_256_all_in_chunks() {
    let (_, mut worker) = queue::local();

    let mut batch: Vec<_> = (0..256).map(|_| super::unowned(async {}).0).collect();

    // 10 + 100 + 46 + 100 == 256, so the chunks consume the whole batch.
    for chunk_len in [10, 100, 46, 100] {
        worker.push_back(batch.drain(..chunk_len));
    }

    let popped = std::iter::from_fn(|| worker.pop()).count();

    assert_eq!(popped, 256);
}
86 | |
/// Pushes 257 tasks one by one, expects exactly one recorded overflow, and checks that
/// the overflow list and the queue together still hold all 257 tasks.
#[test]
fn overflow() {
    let (_, mut worker) = queue::local();
    let inject = RefCell::new(vec![]);
    let mut stats = new_stats();

    for task in (0..257).map(|_| super::unowned(async {}).0) {
        worker.push_back_or_overflow(task, &inject, &mut stats);
    }

    cfg_metrics! {
        assert_metrics!(stats, overflow_count == 1);
    }

    // Count what spilled into the overflow list first, then what stayed in the queue.
    let spilled = inject.borrow_mut().drain(..).count();
    let kept = std::iter::from_fn(|| worker.pop()).count();

    assert_eq!(spilled + kept, 257);
}
112 | |
/// Queues four tasks on one queue, steals from it into a second queue, and checks how
/// the tasks are split afterwards.
///
/// The expectations (steal count of 2, one task in the destination, two left in the
/// source) are consistent with `steal_into` taking half of the tasks and handing one
/// of them back as its return value. NOTE(review): inferred from the assertions;
/// confirm against the queue implementation.
#[test]
fn steal_batch() {
    let mut stats = new_stats();

    let (victim_steal, mut victim) = queue::local();
    let (_, mut thief) = queue::local();
    let inject = RefCell::new(vec![]);

    for task in (0..4).map(|_| super::unowned(async {}).0) {
        victim.push_back_or_overflow(task, &inject, &mut stats);
    }

    assert!(victim_steal.steal_into(&mut thief, &mut stats).is_some());

    cfg_metrics! {
        assert_metrics!(stats, steal_count == 2);
    }

    // The destination queue holds exactly one task.
    assert!(thief.pop().is_some());
    assert!(thief.pop().is_none());

    // The source queue keeps exactly two tasks.
    assert!(victim.pop().is_some());
    assert!(victim.pop().is_some());
    assert!(victim.pop().is_none());
}
144 | |
/// Returns `miri` when compiled with `cfg(miri)` set, and `normal` otherwise.
///
/// Usable in `const` contexts, so it can size the stress-test constants.
const fn normal_or_miri(normal: usize, miri: usize) -> usize {
    if cfg!(miri) {
        return miri;
    }

    normal
}
152 | |
/// Stress test: one thread pushes and pops in rounds while a second thread repeatedly
/// steals from it; every pushed task must be accounted for exactly once.
///
/// Per iteration the owner runs `NUM_LOCAL` rounds of `NUM_PUSH` pushes followed by up
/// to `NUM_POP` pops, and the stealing thread makes `NUM_STEAL` steal attempts. The
/// final count adds up tasks popped by the owner, tasks in the overflow list, tasks
/// taken by the stealing thread, and tasks still left in the owner's queue.
#[test]
fn stress1() {
    const NUM_ITER: usize = 5;
    const NUM_STEAL: usize = normal_or_miri(1_000, 10);
    const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
    const NUM_PUSH: usize = normal_or_miri(500, 10);
    const NUM_POP: usize = normal_or_miri(250, 10);

    let mut stats = new_stats();

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = RefCell::new(vec![]);

        let th = thread::spawn(move || {
            // The stealing thread uses its own stats and its own destination queue.
            let mut stats = new_stats();
            let (_, mut local) = queue::local();
            let mut n = 0;

            for _ in 0..NUM_STEAL {
                // A successful steal returns one task directly...
                if steal.steal_into(&mut local, &mut stats).is_some() {
                    n += 1;
                }

                // ...and whatever else landed in the destination queue is drained here.
                while local.pop().is_some() {
                    n += 1;
                }

                thread::yield_now();
            }

            cfg_metrics! {
                assert_metrics!(stats, steal_count == n as _);
            }

            n
        });

        let mut n = 0;

        for _ in 0..NUM_LOCAL {
            for _ in 0..NUM_PUSH {
                let (task, _) = super::unowned(async {});
                local.push_back_or_overflow(task, &inject, &mut stats);
            }

            // Pop at most `NUM_POP` tasks, stopping early once the queue is empty.
            for _ in 0..NUM_POP {
                if local.pop().is_some() {
                    n += 1;
                } else {
                    break;
                }
            }
        }

        n += inject.borrow_mut().drain(..).count();

        n += th.join().unwrap();

        // Each round pushes more tasks than it pops, so tasks can still be sitting in
        // the owner's queue after the last round (for example when a steal changes the
        // point at which the queue overflows). Count them too, as `stress2` does;
        // otherwise the total can come up short. This is a no-op when the queue is
        // already empty.
        while local.pop().is_some() {
            n += 1;
        }

        assert_eq!(n, NUM_LOCAL * NUM_PUSH);
    }
}
215 | |
/// Stress test: one thread pushes `NUM_TASKS` tasks (occasionally popping one back)
/// while a second thread repeatedly steals; every pushed task must be accounted for
/// exactly once.
#[test]
fn stress2() {
    const NUM_ITER: usize = 1;
    const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
    const NUM_STEAL: usize = normal_or_miri(1_000, 10);

    let mut stats = new_stats();

    for _ in 0..NUM_ITER {
        let (steal, mut owner) = queue::local();
        let inject = RefCell::new(vec![]);

        let thief = thread::spawn(move || {
            // The stealing thread uses its own stats and its own destination queue.
            let mut thief_stats = new_stats();
            let (_, mut thief_queue) = queue::local();
            let mut taken = 0usize;

            for _ in 0..NUM_STEAL {
                // A successful steal returns one task directly; count it.
                let direct = steal.steal_into(&mut thief_queue, &mut thief_stats).is_some();
                taken += usize::from(direct);

                // Drain whatever else landed in the destination queue.
                taken += std::iter::from_fn(|| thief_queue.pop()).count();

                thread::sleep(Duration::from_micros(10));
            }

            taken
        });

        let mut accounted = 0;

        for idx in 0..NUM_TASKS {
            let task = super::unowned(async {}).0;
            owner.push_back_or_overflow(task, &inject, &mut stats);

            // On every 128th push (starting with the first) also try to pop one task.
            let pop_round = idx % 128 == 0;
            if pop_round && owner.pop().is_some() {
                accounted += 1;
            }

            accounted += inject.borrow_mut().drain(..).count();
        }

        accounted += thief.join().unwrap();

        // Once the thief has finished, count what is still in the owner's queue and
        // in the overflow list.
        accounted += std::iter::from_fn(|| owner.pop()).count();
        accounted += inject.borrow_mut().drain(..).count();

        assert_eq!(accounted, NUM_TASKS);
    }
}
272 | |
273 | struct Runtime; |
274 | |
275 | impl Schedule for Runtime { |
276 | fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { |
277 | None |
278 | } |
279 | |
280 | fn schedule(&self, _task: task::Notified<Self>) { |
281 | unreachable!(); |
282 | } |
283 | } |
284 | |