1//! Benchmark implementation details of the threaded scheduler. These benches are
2//! intended to be used as a form of regression testing and not as a general
3//! purpose benchmark demonstrating real-world performance.
4
5use tokio::runtime::{self, Runtime};
6use tokio::sync::oneshot;
7
8use std::sync::atomic::Ordering::Relaxed;
9use std::sync::atomic::{AtomicBool, AtomicUsize};
10use std::sync::{mpsc, Arc};
11use std::time::{Duration, Instant};
12
13use criterion::{criterion_group, criterion_main, Criterion};
14
15const NUM_WORKERS: usize = 4;
16const NUM_SPAWN: usize = 10_000;
17const STALL_DUR: Duration = Duration::from_micros(10);
18
19fn spawn_many_local(c: &mut Criterion) {
20 let rt = rt();
21
22 let (tx, rx) = mpsc::sync_channel(1000);
23 let rem = Arc::new(AtomicUsize::new(0));
24
25 c.bench_function("spawn_many_local", |b| {
26 b.iter(|| {
27 rem.store(NUM_SPAWN, Relaxed);
28
29 rt.block_on(async {
30 for _ in 0..NUM_SPAWN {
31 let tx = tx.clone();
32 let rem = rem.clone();
33
34 tokio::spawn(async move {
35 if 1 == rem.fetch_sub(1, Relaxed) {
36 tx.send(()).unwrap();
37 }
38 });
39 }
40
41 let _ = rx.recv().unwrap();
42 });
43 })
44 });
45}
46
47fn spawn_many_remote_idle(c: &mut Criterion) {
48 let rt = rt();
49
50 let mut handles = Vec::with_capacity(NUM_SPAWN);
51
52 c.bench_function("spawn_many_remote_idle", |b| {
53 b.iter(|| {
54 for _ in 0..NUM_SPAWN {
55 handles.push(rt.spawn(async {}));
56 }
57
58 rt.block_on(async {
59 for handle in handles.drain(..) {
60 handle.await.unwrap();
61 }
62 });
63 })
64 });
65}
66
67// The runtime is busy with tasks that consume CPU time and yield. Yielding is a
68// lower notification priority than spawning / regular notification.
69fn spawn_many_remote_busy1(c: &mut Criterion) {
70 let rt = rt();
71 let rt_handle = rt.handle();
72 let mut handles = Vec::with_capacity(NUM_SPAWN);
73 let flag = Arc::new(AtomicBool::new(true));
74
75 // Spawn some tasks to keep the runtimes busy
76 for _ in 0..(2 * NUM_WORKERS) {
77 let flag = flag.clone();
78 rt.spawn(async move {
79 while flag.load(Relaxed) {
80 tokio::task::yield_now().await;
81 stall();
82 }
83 });
84 }
85
86 c.bench_function("spawn_many_remote_busy1", |b| {
87 b.iter(|| {
88 for _ in 0..NUM_SPAWN {
89 handles.push(rt_handle.spawn(async {}));
90 }
91
92 rt.block_on(async {
93 for handle in handles.drain(..) {
94 handle.await.unwrap();
95 }
96 });
97 })
98 });
99
100 flag.store(false, Relaxed);
101}
102
103// The runtime is busy with tasks that consume CPU time and spawn new high-CPU
104// tasks. Spawning goes via a higher notification priority than yielding.
105fn spawn_many_remote_busy2(c: &mut Criterion) {
106 const NUM_SPAWN: usize = 1_000;
107
108 let rt = rt();
109 let rt_handle = rt.handle();
110 let mut handles = Vec::with_capacity(NUM_SPAWN);
111 let flag = Arc::new(AtomicBool::new(true));
112
113 // Spawn some tasks to keep the runtimes busy
114 for _ in 0..(NUM_WORKERS) {
115 let flag = flag.clone();
116 fn iter(flag: Arc<AtomicBool>) {
117 tokio::spawn(async {
118 if flag.load(Relaxed) {
119 stall();
120 iter(flag);
121 }
122 });
123 }
124 rt.spawn(async {
125 iter(flag);
126 });
127 }
128
129 c.bench_function("spawn_many_remote_busy2", |b| {
130 b.iter(|| {
131 for _ in 0..NUM_SPAWN {
132 handles.push(rt_handle.spawn(async {}));
133 }
134
135 rt.block_on(async {
136 for handle in handles.drain(..) {
137 handle.await.unwrap();
138 }
139 });
140 })
141 });
142
143 flag.store(false, Relaxed);
144}
145
146fn yield_many(c: &mut Criterion) {
147 const NUM_YIELD: usize = 1_000;
148 const TASKS: usize = 200;
149
150 c.bench_function("yield_many", |b| {
151 let rt = rt();
152 let (tx, rx) = mpsc::sync_channel(TASKS);
153
154 b.iter(move || {
155 for _ in 0..TASKS {
156 let tx = tx.clone();
157
158 rt.spawn(async move {
159 for _ in 0..NUM_YIELD {
160 tokio::task::yield_now().await;
161 }
162
163 tx.send(()).unwrap();
164 });
165 }
166
167 for _ in 0..TASKS {
168 let _ = rx.recv().unwrap();
169 }
170 })
171 });
172}
173
174fn ping_pong(c: &mut Criterion) {
175 const NUM_PINGS: usize = 1_000;
176
177 let rt = rt();
178
179 let (done_tx, done_rx) = mpsc::sync_channel(1000);
180 let rem = Arc::new(AtomicUsize::new(0));
181
182 c.bench_function("ping_pong", |b| {
183 b.iter(|| {
184 let done_tx = done_tx.clone();
185 let rem = rem.clone();
186 rem.store(NUM_PINGS, Relaxed);
187
188 rt.block_on(async {
189 tokio::spawn(async move {
190 for _ in 0..NUM_PINGS {
191 let rem = rem.clone();
192 let done_tx = done_tx.clone();
193
194 tokio::spawn(async move {
195 let (tx1, rx1) = oneshot::channel();
196 let (tx2, rx2) = oneshot::channel();
197
198 tokio::spawn(async move {
199 rx1.await.unwrap();
200 tx2.send(()).unwrap();
201 });
202
203 tx1.send(()).unwrap();
204 rx2.await.unwrap();
205
206 if 1 == rem.fetch_sub(1, Relaxed) {
207 done_tx.send(()).unwrap();
208 }
209 });
210 }
211 });
212
213 done_rx.recv().unwrap();
214 });
215 })
216 });
217}
218
219fn chained_spawn(c: &mut Criterion) {
220 const ITER: usize = 1_000;
221
222 fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
223 if n == 0 {
224 done_tx.send(()).unwrap();
225 } else {
226 tokio::spawn(async move {
227 iter(done_tx, n - 1);
228 });
229 }
230 }
231
232 c.bench_function("chained_spawn", |b| {
233 let rt = rt();
234 let (done_tx, done_rx) = mpsc::sync_channel(1000);
235
236 b.iter(move || {
237 let done_tx = done_tx.clone();
238
239 rt.block_on(async {
240 tokio::spawn(async move {
241 iter(done_tx, ITER);
242 });
243
244 done_rx.recv().unwrap();
245 });
246 })
247 });
248}
249
250fn rt() -> Runtime {
251 runtime::Builder::new_multi_thread()
252 .worker_threads(NUM_WORKERS)
253 .enable_all()
254 .build()
255 .unwrap()
256}
257
258fn stall() {
259 let now = Instant::now();
260 while now.elapsed() < STALL_DUR {
261 std::thread::yield_now();
262 }
263}
264
// Register the benchmark group and generate the `main` entry point via
// criterion's macros.
criterion_group!(
    scheduler,
    spawn_many_local,
    spawn_many_remote_idle,
    spawn_many_remote_busy1,
    spawn_many_remote_busy2,
    ping_pong,
    yield_many,
    chained_spawn,
);

criterion_main!(scheduler);
277