1use rand::prelude::*;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4use tokio::sync::{watch, Notify};
5
6use criterion::measurement::WallTime;
7use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion};
8
9fn rt() -> tokio::runtime::Runtime {
10 tokio::runtime::Builder::new_multi_thread()
11 .worker_threads(6)
12 .build()
13 .unwrap()
14}
15
16fn do_work(rng: &mut impl RngCore) -> u32 {
17 use std::fmt::Write;
18 let mut message = String::new();
19 for i in 1..=10 {
20 let _ = write!(&mut message, " {i}={}", rng.gen::<f64>());
21 }
22 message
23 .as_bytes()
24 .iter()
25 .map(|&c| c as u32)
26 .fold(0, u32::wrapping_add)
27}
28
29fn contention_resubscribe<const N_TASKS: usize>(g: &mut BenchmarkGroup<WallTime>) {
30 let rt = rt();
31 let (snd, _) = watch::channel(0i32);
32 let snd = Arc::new(snd);
33 let wg = Arc::new((AtomicU64::new(0), Notify::new()));
34 for n in 0..N_TASKS {
35 let mut rcv = snd.subscribe();
36 let wg = wg.clone();
37 let mut rng = rand::rngs::StdRng::seed_from_u64(n as u64);
38 rt.spawn(async move {
39 while rcv.changed().await.is_ok() {
40 let _ = *rcv.borrow(); // contend on rwlock
41 let r = do_work(&mut rng);
42 let _ = black_box(r);
43 if wg.0.fetch_sub(1, Ordering::Release) == 1 {
44 wg.1.notify_one();
45 }
46 }
47 });
48 }
49
50 const N_ITERS: usize = 100;
51 g.bench_function(N_TASKS.to_string(), |b| {
52 b.iter(|| {
53 rt.block_on({
54 let snd = snd.clone();
55 let wg = wg.clone();
56 async move {
57 tokio::spawn(async move {
58 for _ in 0..N_ITERS {
59 assert_eq!(wg.0.fetch_add(N_TASKS as u64, Ordering::Relaxed), 0);
60 let _ = snd.send(black_box(42));
61 while wg.0.load(Ordering::Acquire) > 0 {
62 wg.1.notified().await;
63 }
64 }
65 })
66 .await
67 .unwrap();
68 }
69 });
70 })
71 });
72}
73
74fn bench_contention_resubscribe(c: &mut Criterion) {
75 let mut group = c.benchmark_group("contention_resubscribe");
76 contention_resubscribe::<10>(&mut group);
77 contention_resubscribe::<100>(&mut group);
78 contention_resubscribe::<500>(&mut group);
79 contention_resubscribe::<1000>(&mut group);
80 group.finish();
81}
82
83criterion_group!(contention, bench_contention_resubscribe);
84
85criterion_main!(contention);
86