1use tokio::sync::mpsc;
2
3use criterion::measurement::WallTime;
4use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion};
5
/// Mid-sized benchmark payload: a fixed array of 64 `usize` values.
#[derive(Debug, Copy, Clone)]
struct Medium([usize; 64]);

// Written by hand rather than derived: the standard library only provides
// `Default` for arrays of up to 32 elements, and this array has 64.
impl Default for Medium {
    /// Returns a `Medium` whose 64 elements are all zero.
    fn default() -> Self {
        Self([0usize; 64])
    }
}
13
14#[derive(Debug, Copy, Clone)]
15struct Large([Medium; 64]);
16impl Default for Large {
17 fn default() -> Self {
18 Large([Medium::default(); 64])
19 }
20}
21
22fn rt() -> tokio::runtime::Runtime {
23 tokio::runtime::Builder::new_multi_thread()
24 .worker_threads(6)
25 .build()
26 .unwrap()
27}
28
29fn create_medium<const SIZE: usize>(g: &mut BenchmarkGroup<WallTime>) {
30 g.bench_function(SIZE.to_string(), |b| {
31 b.iter(|| {
32 black_box(&mpsc::channel::<Medium>(SIZE));
33 })
34 });
35}
36
37fn send_data<T: Default, const SIZE: usize>(g: &mut BenchmarkGroup<WallTime>, prefix: &str) {
38 let rt = rt();
39
40 g.bench_function(format!("{}_{}", prefix, SIZE), |b| {
41 b.iter(|| {
42 let (tx, mut rx) = mpsc::channel::<T>(SIZE);
43
44 let _ = rt.block_on(tx.send(T::default()));
45
46 rt.block_on(rx.recv()).unwrap();
47 })
48 });
49}
50
51fn contention_bounded(g: &mut BenchmarkGroup<WallTime>) {
52 let rt = rt();
53
54 g.bench_function("bounded", |b| {
55 b.iter(|| {
56 rt.block_on(async move {
57 let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);
58
59 for _ in 0..5 {
60 let tx = tx.clone();
61 tokio::spawn(async move {
62 for i in 0..1000 {
63 tx.send(i).await.unwrap();
64 }
65 });
66 }
67
68 for _ in 0..1_000 * 5 {
69 let _ = rx.recv().await;
70 }
71 })
72 })
73 });
74}
75
76fn contention_bounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
77 let rt = rt();
78
79 g.bench_function("bounded_recv_many", |b| {
80 b.iter(|| {
81 rt.block_on(async move {
82 let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);
83
84 for _ in 0..5 {
85 let tx = tx.clone();
86 tokio::spawn(async move {
87 for i in 0..1000 {
88 tx.send(i).await.unwrap();
89 }
90 });
91 }
92
93 let mut buffer = Vec::<usize>::with_capacity(5_000);
94 let mut total = 0;
95 while total < 1_000 * 5 {
96 total += rx.recv_many(&mut buffer, 5_000).await;
97 }
98 })
99 })
100 });
101}
102
103fn contention_bounded_full(g: &mut BenchmarkGroup<WallTime>) {
104 let rt = rt();
105
106 g.bench_function("bounded_full", |b| {
107 b.iter(|| {
108 rt.block_on(async move {
109 let (tx, mut rx) = mpsc::channel::<usize>(100);
110
111 for _ in 0..5 {
112 let tx = tx.clone();
113 tokio::spawn(async move {
114 for i in 0..1000 {
115 tx.send(i).await.unwrap();
116 }
117 });
118 }
119
120 for _ in 0..1_000 * 5 {
121 let _ = rx.recv().await;
122 }
123 })
124 })
125 });
126}
127
128fn contention_bounded_full_recv_many(g: &mut BenchmarkGroup<WallTime>) {
129 let rt = rt();
130
131 g.bench_function("bounded_full_recv_many", |b| {
132 b.iter(|| {
133 rt.block_on(async move {
134 let (tx, mut rx) = mpsc::channel::<usize>(100);
135
136 for _ in 0..5 {
137 let tx = tx.clone();
138 tokio::spawn(async move {
139 for i in 0..1000 {
140 tx.send(i).await.unwrap();
141 }
142 });
143 }
144
145 let mut buffer = Vec::<usize>::with_capacity(5_000);
146 let mut total = 0;
147 while total < 1_000 * 5 {
148 total += rx.recv_many(&mut buffer, 5_000).await;
149 }
150 })
151 })
152 });
153}
154
155fn contention_unbounded(g: &mut BenchmarkGroup<WallTime>) {
156 let rt = rt();
157
158 g.bench_function("unbounded", |b| {
159 b.iter(|| {
160 rt.block_on(async move {
161 let (tx, mut rx) = mpsc::unbounded_channel::<usize>();
162
163 for _ in 0..5 {
164 let tx = tx.clone();
165 tokio::spawn(async move {
166 for i in 0..1000 {
167 tx.send(i).unwrap();
168 }
169 });
170 }
171
172 for _ in 0..1_000 * 5 {
173 let _ = rx.recv().await;
174 }
175 })
176 })
177 });
178}
179
180fn contention_unbounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
181 let rt = rt();
182
183 g.bench_function("unbounded_recv_many", |b| {
184 b.iter(|| {
185 rt.block_on(async move {
186 let (tx, mut rx) = mpsc::unbounded_channel::<usize>();
187
188 for _ in 0..5 {
189 let tx = tx.clone();
190 tokio::spawn(async move {
191 for i in 0..1000 {
192 tx.send(i).unwrap();
193 }
194 });
195 }
196
197 let mut buffer = Vec::<usize>::with_capacity(5_000);
198 let mut total = 0;
199 while total < 1_000 * 5 {
200 total += rx.recv_many(&mut buffer, 5_000).await;
201 }
202 })
203 })
204 });
205}
206
207fn uncontented_bounded(g: &mut BenchmarkGroup<WallTime>) {
208 let rt = rt();
209
210 g.bench_function("bounded", |b| {
211 b.iter(|| {
212 rt.block_on(async move {
213 let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);
214
215 for i in 0..5000 {
216 tx.send(i).await.unwrap();
217 }
218
219 for _ in 0..5_000 {
220 let _ = rx.recv().await;
221 }
222 })
223 })
224 });
225}
226
227fn uncontented_bounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
228 let rt = rt();
229
230 g.bench_function("bounded_recv_many", |b| {
231 b.iter(|| {
232 rt.block_on(async move {
233 let (tx, mut rx) = mpsc::channel::<usize>(1_000_000);
234
235 for i in 0..5000 {
236 tx.send(i).await.unwrap();
237 }
238
239 let mut buffer = Vec::<usize>::with_capacity(5_000);
240 let mut total = 0;
241 while total < 1_000 * 5 {
242 total += rx.recv_many(&mut buffer, 5_000).await;
243 }
244 })
245 })
246 });
247}
248
249fn uncontented_unbounded(g: &mut BenchmarkGroup<WallTime>) {
250 let rt = rt();
251
252 g.bench_function("unbounded", |b| {
253 b.iter(|| {
254 rt.block_on(async move {
255 let (tx, mut rx) = mpsc::unbounded_channel::<usize>();
256
257 for i in 0..5000 {
258 tx.send(i).unwrap();
259 }
260
261 for _ in 0..5_000 {
262 let _ = rx.recv().await;
263 }
264 })
265 })
266 });
267}
268
269fn uncontented_unbounded_recv_many(g: &mut BenchmarkGroup<WallTime>) {
270 let rt = rt();
271
272 g.bench_function("unbounded_recv_many", |b| {
273 b.iter(|| {
274 rt.block_on(async move {
275 let (tx, mut rx) = mpsc::unbounded_channel::<usize>();
276
277 for i in 0..5000 {
278 tx.send(i).unwrap();
279 }
280
281 let mut buffer = Vec::<usize>::with_capacity(5_000);
282 let mut total = 0;
283 while total < 1_000 * 5 {
284 total += rx.recv_many(&mut buffer, 5_000).await;
285 }
286 })
287 })
288 });
289}
290
291fn bench_create_medium(c: &mut Criterion) {
292 let mut group = c.benchmark_group("create_medium");
293 create_medium::<1>(&mut group);
294 create_medium::<100>(&mut group);
295 create_medium::<100_000>(&mut group);
296 group.finish();
297}
298
299fn bench_send(c: &mut Criterion) {
300 let mut group = c.benchmark_group("send");
301 send_data::<Medium, 1000>(&mut group, "medium");
302 send_data::<Large, 1000>(&mut group, "large");
303 group.finish();
304}
305
306fn bench_contention(c: &mut Criterion) {
307 let mut group = c.benchmark_group("contention");
308 contention_bounded(&mut group);
309 contention_bounded_recv_many(&mut group);
310 contention_bounded_full(&mut group);
311 contention_bounded_full_recv_many(&mut group);
312 contention_unbounded(&mut group);
313 contention_unbounded_recv_many(&mut group);
314 group.finish();
315}
316
317fn bench_uncontented(c: &mut Criterion) {
318 let mut group = c.benchmark_group("uncontented");
319 uncontented_bounded(&mut group);
320 uncontented_bounded_recv_many(&mut group);
321 uncontented_unbounded(&mut group);
322 uncontented_unbounded_recv_many(&mut group);
323 group.finish();
324}
325
326criterion_group!(create, bench_create_medium);
327criterion_group!(send, bench_send);
328criterion_group!(contention, bench_contention);
329criterion_group!(uncontented, bench_uncontented);
330
331criterion_main!(create, send, contention, uncontented);
332