1 | use tokio::sync::mpsc; |
2 | |
3 | use criterion::measurement::WallTime; |
4 | use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion}; |
5 | |
6 | #[derive(Debug, Copy, Clone)] |
7 | struct Medium([usize; 64]); |
8 | impl Default for Medium { |
9 | fn default() -> Self { |
10 | Medium([0; 64]) |
11 | } |
12 | } |
13 | |
14 | #[derive(Debug, Copy, Clone)] |
15 | struct Large([Medium; 64]); |
16 | impl Default for Large { |
17 | fn default() -> Self { |
18 | Large([Medium::default(); 64]) |
19 | } |
20 | } |
21 | |
22 | fn rt() -> tokio::runtime::Runtime { |
23 | tokio::runtime::Builder::new_multi_thread() |
24 | .worker_threads(6) |
25 | .build() |
26 | .unwrap() |
27 | } |
28 | |
29 | fn create_medium<const SIZE: usize>(g: &mut BenchmarkGroup<WallTime>) { |
30 | g.bench_function(SIZE.to_string(), |b| { |
31 | b.iter(|| { |
32 | black_box(&mpsc::channel::<Medium>(SIZE)); |
33 | }) |
34 | }); |
35 | } |
36 | |
37 | fn send_data<T: Default, const SIZE: usize>(g: &mut BenchmarkGroup<WallTime>, prefix: &str) { |
38 | let rt = rt(); |
39 | |
40 | g.bench_function(format!("{}_{}" , prefix, SIZE), |b| { |
41 | b.iter(|| { |
42 | let (tx, mut rx) = mpsc::channel::<T>(SIZE); |
43 | |
44 | let _ = rt.block_on(tx.send(T::default())); |
45 | |
46 | rt.block_on(rx.recv()).unwrap(); |
47 | }) |
48 | }); |
49 | } |
50 | |
51 | fn contention_bounded(g: &mut BenchmarkGroup<WallTime>) { |
52 | let rt = rt(); |
53 | |
54 | g.bench_function("bounded" , |b| { |
55 | b.iter(|| { |
56 | rt.block_on(async move { |
57 | let (tx, mut rx) = mpsc::channel::<usize>(1_000_000); |
58 | |
59 | for _ in 0..5 { |
60 | let tx = tx.clone(); |
61 | tokio::spawn(async move { |
62 | for i in 0..1000 { |
63 | tx.send(i).await.unwrap(); |
64 | } |
65 | }); |
66 | } |
67 | |
68 | for _ in 0..1_000 * 5 { |
69 | let _ = rx.recv().await; |
70 | } |
71 | }) |
72 | }) |
73 | }); |
74 | } |
75 | |
76 | fn contention_bounded_recv_many(g: &mut BenchmarkGroup<WallTime>) { |
77 | let rt = rt(); |
78 | |
79 | g.bench_function("bounded_recv_many" , |b| { |
80 | b.iter(|| { |
81 | rt.block_on(async move { |
82 | let (tx, mut rx) = mpsc::channel::<usize>(1_000_000); |
83 | |
84 | for _ in 0..5 { |
85 | let tx = tx.clone(); |
86 | tokio::spawn(async move { |
87 | for i in 0..1000 { |
88 | tx.send(i).await.unwrap(); |
89 | } |
90 | }); |
91 | } |
92 | |
93 | let mut buffer = Vec::<usize>::with_capacity(5_000); |
94 | let mut total = 0; |
95 | while total < 1_000 * 5 { |
96 | total += rx.recv_many(&mut buffer, 5_000).await; |
97 | } |
98 | }) |
99 | }) |
100 | }); |
101 | } |
102 | |
103 | fn contention_bounded_full(g: &mut BenchmarkGroup<WallTime>) { |
104 | let rt = rt(); |
105 | |
106 | g.bench_function("bounded_full" , |b| { |
107 | b.iter(|| { |
108 | rt.block_on(async move { |
109 | let (tx, mut rx) = mpsc::channel::<usize>(100); |
110 | |
111 | for _ in 0..5 { |
112 | let tx = tx.clone(); |
113 | tokio::spawn(async move { |
114 | for i in 0..1000 { |
115 | tx.send(i).await.unwrap(); |
116 | } |
117 | }); |
118 | } |
119 | |
120 | for _ in 0..1_000 * 5 { |
121 | let _ = rx.recv().await; |
122 | } |
123 | }) |
124 | }) |
125 | }); |
126 | } |
127 | |
128 | fn contention_bounded_full_recv_many(g: &mut BenchmarkGroup<WallTime>) { |
129 | let rt = rt(); |
130 | |
131 | g.bench_function("bounded_full_recv_many" , |b| { |
132 | b.iter(|| { |
133 | rt.block_on(async move { |
134 | let (tx, mut rx) = mpsc::channel::<usize>(100); |
135 | |
136 | for _ in 0..5 { |
137 | let tx = tx.clone(); |
138 | tokio::spawn(async move { |
139 | for i in 0..1000 { |
140 | tx.send(i).await.unwrap(); |
141 | } |
142 | }); |
143 | } |
144 | |
145 | let mut buffer = Vec::<usize>::with_capacity(5_000); |
146 | let mut total = 0; |
147 | while total < 1_000 * 5 { |
148 | total += rx.recv_many(&mut buffer, 5_000).await; |
149 | } |
150 | }) |
151 | }) |
152 | }); |
153 | } |
154 | |
155 | fn contention_unbounded(g: &mut BenchmarkGroup<WallTime>) { |
156 | let rt = rt(); |
157 | |
158 | g.bench_function("unbounded" , |b| { |
159 | b.iter(|| { |
160 | rt.block_on(async move { |
161 | let (tx, mut rx) = mpsc::unbounded_channel::<usize>(); |
162 | |
163 | for _ in 0..5 { |
164 | let tx = tx.clone(); |
165 | tokio::spawn(async move { |
166 | for i in 0..1000 { |
167 | tx.send(i).unwrap(); |
168 | } |
169 | }); |
170 | } |
171 | |
172 | for _ in 0..1_000 * 5 { |
173 | let _ = rx.recv().await; |
174 | } |
175 | }) |
176 | }) |
177 | }); |
178 | } |
179 | |
180 | fn contention_unbounded_recv_many(g: &mut BenchmarkGroup<WallTime>) { |
181 | let rt = rt(); |
182 | |
183 | g.bench_function("unbounded_recv_many" , |b| { |
184 | b.iter(|| { |
185 | rt.block_on(async move { |
186 | let (tx, mut rx) = mpsc::unbounded_channel::<usize>(); |
187 | |
188 | for _ in 0..5 { |
189 | let tx = tx.clone(); |
190 | tokio::spawn(async move { |
191 | for i in 0..1000 { |
192 | tx.send(i).unwrap(); |
193 | } |
194 | }); |
195 | } |
196 | |
197 | let mut buffer = Vec::<usize>::with_capacity(5_000); |
198 | let mut total = 0; |
199 | while total < 1_000 * 5 { |
200 | total += rx.recv_many(&mut buffer, 5_000).await; |
201 | } |
202 | }) |
203 | }) |
204 | }); |
205 | } |
206 | |
207 | fn uncontented_bounded(g: &mut BenchmarkGroup<WallTime>) { |
208 | let rt = rt(); |
209 | |
210 | g.bench_function("bounded" , |b| { |
211 | b.iter(|| { |
212 | rt.block_on(async move { |
213 | let (tx, mut rx) = mpsc::channel::<usize>(1_000_000); |
214 | |
215 | for i in 0..5000 { |
216 | tx.send(i).await.unwrap(); |
217 | } |
218 | |
219 | for _ in 0..5_000 { |
220 | let _ = rx.recv().await; |
221 | } |
222 | }) |
223 | }) |
224 | }); |
225 | } |
226 | |
227 | fn uncontented_bounded_recv_many(g: &mut BenchmarkGroup<WallTime>) { |
228 | let rt = rt(); |
229 | |
230 | g.bench_function("bounded_recv_many" , |b| { |
231 | b.iter(|| { |
232 | rt.block_on(async move { |
233 | let (tx, mut rx) = mpsc::channel::<usize>(1_000_000); |
234 | |
235 | for i in 0..5000 { |
236 | tx.send(i).await.unwrap(); |
237 | } |
238 | |
239 | let mut buffer = Vec::<usize>::with_capacity(5_000); |
240 | let mut total = 0; |
241 | while total < 1_000 * 5 { |
242 | total += rx.recv_many(&mut buffer, 5_000).await; |
243 | } |
244 | }) |
245 | }) |
246 | }); |
247 | } |
248 | |
249 | fn uncontented_unbounded(g: &mut BenchmarkGroup<WallTime>) { |
250 | let rt = rt(); |
251 | |
252 | g.bench_function("unbounded" , |b| { |
253 | b.iter(|| { |
254 | rt.block_on(async move { |
255 | let (tx, mut rx) = mpsc::unbounded_channel::<usize>(); |
256 | |
257 | for i in 0..5000 { |
258 | tx.send(i).unwrap(); |
259 | } |
260 | |
261 | for _ in 0..5_000 { |
262 | let _ = rx.recv().await; |
263 | } |
264 | }) |
265 | }) |
266 | }); |
267 | } |
268 | |
269 | fn uncontented_unbounded_recv_many(g: &mut BenchmarkGroup<WallTime>) { |
270 | let rt = rt(); |
271 | |
272 | g.bench_function("unbounded_recv_many" , |b| { |
273 | b.iter(|| { |
274 | rt.block_on(async move { |
275 | let (tx, mut rx) = mpsc::unbounded_channel::<usize>(); |
276 | |
277 | for i in 0..5000 { |
278 | tx.send(i).unwrap(); |
279 | } |
280 | |
281 | let mut buffer = Vec::<usize>::with_capacity(5_000); |
282 | let mut total = 0; |
283 | while total < 1_000 * 5 { |
284 | total += rx.recv_many(&mut buffer, 5_000).await; |
285 | } |
286 | }) |
287 | }) |
288 | }); |
289 | } |
290 | |
291 | fn bench_create_medium(c: &mut Criterion) { |
292 | let mut group = c.benchmark_group("create_medium" ); |
293 | create_medium::<1>(&mut group); |
294 | create_medium::<100>(&mut group); |
295 | create_medium::<100_000>(&mut group); |
296 | group.finish(); |
297 | } |
298 | |
299 | fn bench_send(c: &mut Criterion) { |
300 | let mut group = c.benchmark_group("send" ); |
301 | send_data::<Medium, 1000>(&mut group, "medium" ); |
302 | send_data::<Large, 1000>(&mut group, "large" ); |
303 | group.finish(); |
304 | } |
305 | |
306 | fn bench_contention(c: &mut Criterion) { |
307 | let mut group = c.benchmark_group("contention" ); |
308 | contention_bounded(&mut group); |
309 | contention_bounded_recv_many(&mut group); |
310 | contention_bounded_full(&mut group); |
311 | contention_bounded_full_recv_many(&mut group); |
312 | contention_unbounded(&mut group); |
313 | contention_unbounded_recv_many(&mut group); |
314 | group.finish(); |
315 | } |
316 | |
317 | fn bench_uncontented(c: &mut Criterion) { |
318 | let mut group = c.benchmark_group("uncontented" ); |
319 | uncontented_bounded(&mut group); |
320 | uncontented_bounded_recv_many(&mut group); |
321 | uncontented_unbounded(&mut group); |
322 | uncontented_unbounded_recv_many(&mut group); |
323 | group.finish(); |
324 | } |
325 | |
326 | criterion_group!(create, bench_create_medium); |
327 | criterion_group!(send, bench_send); |
328 | criterion_group!(contention, bench_contention); |
329 | criterion_group!(uncontented, bench_uncontented); |
330 | |
331 | criterion_main!(create, send, contention, uncontented); |
332 | |