//! Benchmark implementation details of the threaded scheduler. These benches
//! are intended as a form of regression testing, not as general-purpose
//! benchmarks demonstrating real-world performance.

use tokio::runtime::{self, Runtime};
use tokio::sync::oneshot;

use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{mpsc, Arc};
use std::time::{Duration, Instant};

use criterion::{criterion_group, criterion_main, Criterion};

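// Tunables shared across the benches below: the number of runtime worker
// threads, the number of tasks spawned per iteration, and how long `stall()`
// busy-loops to simulate CPU-bound work.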
const NUM_WORKERS: usize = 4;
const NUM_SPAWN: usize = 10_000;
const STALL_DUR: Duration = Duration::from_micros(10);

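// Spawns `NUM_SPAWN` trivial tasks from inside the runtime (within `block_on`)
// and waits for the last one to complete via a channel signal.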
fn spawn_many_local(c: &mut Criterion) {
    let rt = rt();

    let (tx, rx) = mpsc::sync_channel(1000);
    let rem = Arc::new(AtomicUsize::new(0));

    c.bench_function("spawn_many_local", |b| {
        b.iter(|| {
            rem.store(NUM_SPAWN, Relaxed);

            rt.block_on(async {
                for _ in 0..NUM_SPAWN {
                    let tx = tx.clone();
                    let rem = rem.clone();

                    tokio::spawn(async move {
                        if 1 == rem.fetch_sub(1, Relaxed) {
                            tx.send(()).unwrap();
                        }
                    });
                }

                rx.recv().unwrap();
            });
        })
    });
}

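// Spawns `NUM_SPAWN` trivial tasks from outside the runtime while the workers
// are otherwise idle, then awaits every join handle.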
fn spawn_many_remote_idle(c: &mut Criterion) {
    let rt = rt();

    let mut handles = Vec::with_capacity(NUM_SPAWN);

    c.bench_function("spawn_many_remote_idle", |b| {
        b.iter(|| {
            for _ in 0..NUM_SPAWN {
                handles.push(rt.spawn(async {}));
            }

            rt.block_on(async {
                for handle in handles.drain(..) {
                    handle.await.unwrap();
                }
            });
        })
    });
}

// The runtime is busy with tasks that consume CPU time and yield. Yielding uses
// a lower notification priority than spawning / regular notification.
fn spawn_many_remote_busy1(c: &mut Criterion) {
    let rt = rt();
    let rt_handle = rt.handle();
    let mut handles = Vec::with_capacity(NUM_SPAWN);
    let flag = Arc::new(AtomicBool::new(true));

    // Spawn some tasks to keep the runtime busy
    for _ in 0..(2 * NUM_WORKERS) {
        let flag = flag.clone();
        rt.spawn(async move {
            while flag.load(Relaxed) {
                tokio::task::yield_now().await;
                stall();
            }
        });
    }

    c.bench_function("spawn_many_remote_busy1", |b| {
        b.iter(|| {
            for _ in 0..NUM_SPAWN {
                handles.push(rt_handle.spawn(async {}));
            }

            rt.block_on(async {
                for handle in handles.drain(..) {
                    handle.await.unwrap();
                }
            });
        })
    });

    flag.store(false, Relaxed);
}

// The runtime is busy with tasks that consume CPU time and spawn new high-CPU
// tasks. Spawning uses a higher notification priority than yielding.
fn spawn_many_remote_busy2(c: &mut Criterion) {
    const NUM_SPAWN: usize = 1_000;

    let rt = rt();
    let rt_handle = rt.handle();
    let mut handles = Vec::with_capacity(NUM_SPAWN);
    let flag = Arc::new(AtomicBool::new(true));

    // Spawn some tasks to keep the runtime busy
    for _ in 0..NUM_WORKERS {
        let flag = flag.clone();
        fn iter(flag: Arc<AtomicBool>) {
            tokio::spawn(async {
                if flag.load(Relaxed) {
                    stall();
                    iter(flag);
                }
            });
        }
        rt.spawn(async {
            iter(flag);
        });
    }

    c.bench_function("spawn_many_remote_busy2", |b| {
        b.iter(|| {
            for _ in 0..NUM_SPAWN {
                handles.push(rt_handle.spawn(async {}));
            }

            rt.block_on(async {
                for handle in handles.drain(..) {
                    handle.await.unwrap();
                }
            });
        })
    });

    flag.store(false, Relaxed);
}

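// Measures raw scheduler churn: `TASKS` tasks each yield back to the scheduler
// `NUM_YIELD` times before signalling completion.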
fn yield_many(c: &mut Criterion) {
    const NUM_YIELD: usize = 1_000;
    const TASKS: usize = 200;

    c.bench_function("yield_many", |b| {
        let rt = rt();
        let (tx, rx) = mpsc::sync_channel(TASKS);

        b.iter(move || {
            for _ in 0..TASKS {
                let tx = tx.clone();

                rt.spawn(async move {
                    for _ in 0..NUM_YIELD {
                        tokio::task::yield_now().await;
                    }

                    tx.send(()).unwrap();
                });
            }

            for _ in 0..TASKS {
                rx.recv().unwrap();
            }
        })
    });
}

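// Each "ping" spawns a pair of tasks that exchange messages over two oneshot
// channels; a shared counter signals once all `NUM_PINGS` round trips finish.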
fn ping_pong(c: &mut Criterion) {
    const NUM_PINGS: usize = 1_000;

    let rt = rt();

    let (done_tx, done_rx) = mpsc::sync_channel(1000);
    let rem = Arc::new(AtomicUsize::new(0));

    c.bench_function("ping_pong", |b| {
        b.iter(|| {
            let done_tx = done_tx.clone();
            let rem = rem.clone();
            rem.store(NUM_PINGS, Relaxed);

            rt.block_on(async {
                tokio::spawn(async move {
                    for _ in 0..NUM_PINGS {
                        let rem = rem.clone();
                        let done_tx = done_tx.clone();

                        tokio::spawn(async move {
                            let (tx1, rx1) = oneshot::channel();
                            let (tx2, rx2) = oneshot::channel();

                            tokio::spawn(async move {
                                rx1.await.unwrap();
                                tx2.send(()).unwrap();
                            });

                            tx1.send(()).unwrap();
                            rx2.await.unwrap();

                            if 1 == rem.fetch_sub(1, Relaxed) {
                                done_tx.send(()).unwrap();
                            }
                        });
                    }
                });

                done_rx.recv().unwrap();
            });
        })
    });
}

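// Each task spawns the next one in a chain of `ITER` spawns, ending with a
// signal back to the `block_on` caller.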
fn chained_spawn(c: &mut Criterion) {
    const ITER: usize = 1_000;

    fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
        if n == 0 {
            done_tx.send(()).unwrap();
        } else {
            tokio::spawn(async move {
                iter(done_tx, n - 1);
            });
        }
    }

    c.bench_function("chained_spawn", |b| {
        let rt = rt();
        let (done_tx, done_rx) = mpsc::sync_channel(1000);

        b.iter(move || {
            let done_tx = done_tx.clone();

            rt.block_on(async {
                tokio::spawn(async move {
                    iter(done_tx, ITER);
                });

                done_rx.recv().unwrap();
            });
        })
    });
}

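// Builds the multi-threaded runtime used by every bench above, with
// `NUM_WORKERS` worker threads and all drivers enabled.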
fn rt() -> Runtime {
    runtime::Builder::new_multi_thread()
        .worker_threads(NUM_WORKERS)
        .enable_all()
        .build()
        .unwrap()
}

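// Simulates CPU-bound work by busy-waiting for `STALL_DUR`, yielding the OS
// thread on each spin.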
fn stall() {
    let now = Instant::now();
    while now.elapsed() < STALL_DUR {
        std::thread::yield_now();
    }
}

criterion_group!(
    scheduler,
    spawn_many_local,
    spawn_many_remote_idle,
    spawn_many_remote_busy1,
    spawn_many_remote_busy2,
    ping_pong,
    yield_many,
    chained_spawn,
);

criterion_main!(scheduler);