1use criterion::{criterion_group, criterion_main, Criterion};
2
3use rand::{Rng, SeedableRng};
4use rand_chacha::ChaCha20Rng;
5
6use tokio::io::{copy, repeat, AsyncRead, AsyncReadExt, AsyncWrite};
7use tokio::time::{interval, Interval, MissedTickBehavior};
8
9use std::task::Poll;
10use std::time::Duration;
11
12const KILO: usize = 1024;
13
14// Tunable parameters if you want to change this benchmark. If reader and writer
15// are matched in kilobytes per second, then this only exposes buffering to the
16// benchmark.
17const RNG_SEED: u64 = 0;
18// How much data to copy in a single benchmark run
19const SOURCE_SIZE: u64 = 256 * KILO as u64;
20// Read side provides CHUNK_SIZE every READ_SERVICE_PERIOD. If it's not called
21// frequently, it'll burst to catch up (representing OS buffers draining)
22const CHUNK_SIZE: usize = 2 * KILO;
23const READ_SERVICE_PERIOD: Duration = Duration::from_millis(1);
24// Write side buffers up to WRITE_BUFFER, and flushes to disk every
25// WRITE_SERVICE_PERIOD.
26const WRITE_BUFFER: usize = 40 * KILO;
27const WRITE_SERVICE_PERIOD: Duration = Duration::from_millis(20);
28// How likely you are to have to wait for previously written data to be flushed
29// because another writer claimed the buffer space
30const PROBABILITY_FLUSH_WAIT: f64 = 0.1;
31
32/// A slow writer that aims to simulate HDD behaviour under heavy load.
33///
34/// There is a limited buffer, which is fully drained on the next write after
35/// a time limit is reached. Flush waits for the time limit to be reached
36/// and then drains the buffer.
37///
38/// At random, the HDD will stall writers while it flushes out all buffers. If
39/// this happens to you, you will be unable to write until the next time the
40/// buffer is drained.
41struct SlowHddWriter {
42 service_intervals: Interval,
43 blocking_rng: ChaCha20Rng,
44 buffer_size: usize,
45 buffer_used: usize,
46}
47
48impl SlowHddWriter {
49 fn new(service_interval: Duration, buffer_size: usize) -> Self {
50 let blocking_rng = ChaCha20Rng::seed_from_u64(RNG_SEED);
51 let mut service_intervals = interval(service_interval);
52 service_intervals.set_missed_tick_behavior(MissedTickBehavior::Delay);
53 Self {
54 service_intervals,
55 blocking_rng,
56 buffer_size,
57 buffer_used: 0,
58 }
59 }
60
61 fn service_write(
62 mut self: std::pin::Pin<&mut Self>,
63 cx: &mut std::task::Context<'_>,
64 ) -> std::task::Poll<Result<(), std::io::Error>> {
65 // If we hit a service interval, the buffer can be cleared
66 let res = self.service_intervals.poll_tick(cx).map(|_| Ok(()));
67 if let Poll::Ready(_) = res {
68 self.buffer_used = 0;
69 }
70 res
71 }
72
73 fn write_bytes(
74 mut self: std::pin::Pin<&mut Self>,
75 cx: &mut std::task::Context<'_>,
76 writeable: usize,
77 ) -> std::task::Poll<Result<usize, std::io::Error>> {
78 let service_res = self.as_mut().service_write(cx);
79
80 if service_res.is_pending() && self.blocking_rng.gen_bool(PROBABILITY_FLUSH_WAIT) {
81 return Poll::Pending;
82 }
83 let available = self.buffer_size - self.buffer_used;
84
85 if available == 0 {
86 assert!(service_res.is_pending());
87 Poll::Pending
88 } else {
89 let written = available.min(writeable);
90 self.buffer_used += written;
91 Poll::Ready(Ok(written))
92 }
93 }
94}
95
96impl Unpin for SlowHddWriter {}
97
98impl AsyncWrite for SlowHddWriter {
99 fn poll_write(
100 self: std::pin::Pin<&mut Self>,
101 cx: &mut std::task::Context<'_>,
102 buf: &[u8],
103 ) -> std::task::Poll<Result<usize, std::io::Error>> {
104 self.write_bytes(cx, buf.len())
105 }
106
107 fn poll_flush(
108 self: std::pin::Pin<&mut Self>,
109 cx: &mut std::task::Context<'_>,
110 ) -> std::task::Poll<Result<(), std::io::Error>> {
111 self.service_write(cx)
112 }
113
114 fn poll_shutdown(
115 self: std::pin::Pin<&mut Self>,
116 cx: &mut std::task::Context<'_>,
117 ) -> std::task::Poll<Result<(), std::io::Error>> {
118 self.service_write(cx)
119 }
120
121 fn poll_write_vectored(
122 self: std::pin::Pin<&mut Self>,
123 cx: &mut std::task::Context<'_>,
124 bufs: &[std::io::IoSlice<'_>],
125 ) -> std::task::Poll<Result<usize, std::io::Error>> {
126 let writeable = bufs.into_iter().fold(0, |acc, buf| acc + buf.len());
127 self.write_bytes(cx, writeable)
128 }
129
130 fn is_write_vectored(&self) -> bool {
131 true
132 }
133}
134
135/// A reader that limits the maximum chunk it'll give you back
136///
137/// Simulates something reading from a slow link - you get one chunk per call,
138/// and you are offered chunks on a schedule
139struct ChunkReader {
140 data: Vec<u8>,
141 service_intervals: Interval,
142}
143
144impl ChunkReader {
145 fn new(chunk_size: usize, service_interval: Duration) -> Self {
146 let mut service_intervals = interval(service_interval);
147 service_intervals.set_missed_tick_behavior(MissedTickBehavior::Burst);
148 let data: Vec<u8> = std::iter::repeat(0).take(chunk_size).collect();
149 Self {
150 data,
151 service_intervals,
152 }
153 }
154}
155
156impl AsyncRead for ChunkReader {
157 fn poll_read(
158 mut self: std::pin::Pin<&mut Self>,
159 cx: &mut std::task::Context<'_>,
160 buf: &mut tokio::io::ReadBuf<'_>,
161 ) -> Poll<std::io::Result<()>> {
162 if self.service_intervals.poll_tick(cx).is_pending() {
163 return Poll::Pending;
164 }
165 buf.put_slice(&self.data[..buf.remaining().min(self.data.len())]);
166 Poll::Ready(Ok(()))
167 }
168}
169
170fn rt() -> tokio::runtime::Runtime {
171 tokio::runtime::Builder::new_current_thread()
172 .enable_time()
173 .build()
174 .unwrap()
175}
176
177fn copy_mem_to_mem(c: &mut Criterion) {
178 let rt = rt();
179
180 c.bench_function("copy_mem_to_mem", |b| {
181 b.iter(|| {
182 let task = || async {
183 let mut source = repeat(0).take(SOURCE_SIZE);
184 let mut dest = Vec::new();
185 copy(&mut source, &mut dest).await.unwrap();
186 };
187
188 rt.block_on(task());
189 })
190 });
191}
192
193fn copy_mem_to_slow_hdd(c: &mut Criterion) {
194 let rt = rt();
195
196 c.bench_function("copy_mem_to_slow_hdd", |b| {
197 b.iter(|| {
198 let task = || async {
199 let mut source = repeat(0).take(SOURCE_SIZE);
200 let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
201 copy(&mut source, &mut dest).await.unwrap();
202 };
203
204 rt.block_on(task());
205 })
206 });
207}
208
209fn copy_chunk_to_mem(c: &mut Criterion) {
210 let rt = rt();
211
212 c.bench_function("copy_chunk_to_mem", |b| {
213 b.iter(|| {
214 let task = || async {
215 let mut source =
216 ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
217 let mut dest = Vec::new();
218 copy(&mut source, &mut dest).await.unwrap();
219 };
220
221 rt.block_on(task());
222 })
223 });
224}
225
226fn copy_chunk_to_slow_hdd(c: &mut Criterion) {
227 let rt = rt();
228
229 c.bench_function("copy_chunk_to_slow_hdd", |b| {
230 b.iter(|| {
231 let task = || async {
232 let mut source =
233 ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
234 let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
235 copy(&mut source, &mut dest).await.unwrap();
236 };
237
238 rt.block_on(task());
239 })
240 });
241}
242
243criterion_group!(
244 copy_bench,
245 copy_mem_to_mem,
246 copy_mem_to_slow_hdd,
247 copy_chunk_to_mem,
248 copy_chunk_to_slow_hdd,
249);
250criterion_main!(copy_bench);
251