use criterion::{criterion_group, criterion_main, Criterion};

use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha20Rng;

use tokio::io::{copy, repeat, AsyncRead, AsyncReadExt, AsyncWrite};
use tokio::time::{interval, Interval, MissedTickBehavior};

use std::task::Poll;
use std::time::Duration;

const KILO: usize = 1024;

// Tunable parameters for this benchmark. Note that if the reader and writer
// are matched in kilobytes per second, the benchmark only exercises
// buffering.
const RNG_SEED: u64 = 0;
// How much data to copy in a single benchmark run.
const SOURCE_SIZE: u64 = 256 * KILO as u64;
// The read side provides CHUNK_SIZE bytes every READ_SERVICE_PERIOD. If it is
// not polled frequently enough, it bursts to catch up (representing OS
// buffers draining).
const CHUNK_SIZE: usize = 2 * KILO;
const READ_SERVICE_PERIOD: Duration = Duration::from_millis(1);
// The write side buffers up to WRITE_BUFFER bytes and flushes to disk every
// WRITE_SERVICE_PERIOD.
const WRITE_BUFFER: usize = 40 * KILO;
const WRITE_SERVICE_PERIOD: Duration = Duration::from_millis(20);
// How likely a write is to stall waiting for previously written data to be
// flushed because another writer claimed the buffer space.
const PROBABILITY_FLUSH_WAIT: f64 = 0.1;

/// A slow writer that aims to simulate HDD behaviour under heavy load.
///
/// There is a limited buffer, which is fully drained on the next write after
/// a time limit is reached. Flush waits for the time limit to be reached
/// and then drains the buffer.
///
/// At random, the HDD will stall writers while it flushes out all buffers.
/// When this happens, no writes can proceed until the next time the buffer
/// is drained.
struct SlowHddWriter {
    service_intervals: Interval,
    blocking_rng: ChaCha20Rng,
    buffer_size: usize,
    buffer_used: usize,
}

impl SlowHddWriter {
    fn new(service_interval: Duration, buffer_size: usize) -> Self {
        let blocking_rng = ChaCha20Rng::seed_from_u64(RNG_SEED);
        let mut service_intervals = interval(service_interval);
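        // Delay missed ticks rather than bursting: a drive that fell behind
        // does not flush faster afterwards to catch up.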
        service_intervals.set_missed_tick_behavior(MissedTickBehavior::Delay);
        Self {
            service_intervals,
            blocking_rng,
            buffer_size,
            buffer_used: 0,
        }
    }

    fn service_write(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        // If we hit a service interval, the buffer can be cleared.
        let res = self.service_intervals.poll_tick(cx).map(|_| Ok(()));
        if let Poll::Ready(_) = res {
            self.buffer_used = 0;
        }
        res
    }

    fn write_bytes(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        writeable: usize,
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        let service_res = self.as_mut().service_write(cx);

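        // Simulate another writer having claimed the buffer space: with
        // probability PROBABILITY_FLUSH_WAIT, stall until the next service
        // interval (whose tick, polled above, has registered the waker).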
        if service_res.is_pending() && self.blocking_rng.gen_bool(PROBABILITY_FLUSH_WAIT) {
            return Poll::Pending;
        }
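        // Accept whatever fits in the remaining buffer space; if the buffer
        // is full, the pending service tick will wake the task when it drains.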
        let available = self.buffer_size - self.buffer_used;

        if available == 0 {
            assert!(service_res.is_pending());
            Poll::Pending
        } else {
            let written = available.min(writeable);
            self.buffer_used += written;
            Poll::Ready(Ok(written))
        }
    }
}

impl Unpin for SlowHddWriter {}

impl AsyncWrite for SlowHddWriter {
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        self.write_bytes(cx, buf.len())
    }

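    // Flushing waits for the next service interval, which drains the buffer.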
    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        self.service_write(cx)
    }

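    // Shutdown behaves like flush: wait for the final drain.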
    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        self.service_write(cx)
    }

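    // Vectored writes are handled as a single write of the combined length,
    // since the simulated buffer only tracks how many bytes are used.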
    fn poll_write_vectored(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        let writeable = bufs.iter().map(|buf| buf.len()).sum();
        self.write_bytes(cx, writeable)
    }

    fn is_write_vectored(&self) -> bool {
        true
    }
}

/// A reader that limits the maximum chunk it'll give you back.
///
/// Simulates something reading from a slow link: you get one chunk per call,
/// and you are offered chunks on a schedule.
struct ChunkReader {
    data: Vec<u8>,
    service_intervals: Interval,
}

impl ChunkReader {
    fn new(chunk_size: usize, service_interval: Duration) -> Self {
        let mut service_intervals = interval(service_interval);
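        // Burst missed ticks so a reader that was not polled for a while
        // catches up immediately, like an OS buffer draining.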
        service_intervals.set_missed_tick_behavior(MissedTickBehavior::Burst);
        let data = vec![0u8; chunk_size];
        Self {
            data,
            service_intervals,
        }
    }
}

impl AsyncRead for ChunkReader {
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        if self.service_intervals.poll_tick(cx).is_pending() {
            return Poll::Pending;
        }
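        // Hand out at most one chunk per tick, truncated to the space
        // remaining in buf.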
        buf.put_slice(&self.data[..buf.remaining().min(self.data.len())]);
        Poll::Ready(Ok(()))
    }
}

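/// Builds a current-thread runtime with the time driver enabled, which the
/// simulated service intervals rely on.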
fn rt() -> tokio::runtime::Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()
        .unwrap()
}

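/// Baseline: an always-ready reader copied into an in-memory Vec; measures
/// the overhead of `copy` itself.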
fn copy_mem_to_mem(c: &mut Criterion) {
    let rt = rt();

    c.bench_function("copy_mem_to_mem", |b| {
        b.iter(|| {
            let task = || async {
                let mut source = repeat(0).take(SOURCE_SIZE);
                let mut dest = Vec::new();
                copy(&mut source, &mut dest).await.unwrap();
            };

            rt.block_on(task());
        })
    });
}

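/// Writer-bound: an always-ready reader feeding the simulated slow writer,
/// so throughput is limited by the write buffer and its service interval.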
fn copy_mem_to_slow_hdd(c: &mut Criterion) {
    let rt = rt();

    c.bench_function("copy_mem_to_slow_hdd", |b| {
        b.iter(|| {
            let task = || async {
                let mut source = repeat(0).take(SOURCE_SIZE);
                let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
                copy(&mut source, &mut dest).await.unwrap();
            };

            rt.block_on(task());
        })
    });
}

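/// Reader-bound: the chunked reader copied into memory, so throughput is
/// limited by the read schedule.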
fn copy_chunk_to_mem(c: &mut Criterion) {
    let rt = rt();

    c.bench_function("copy_chunk_to_mem", |b| {
        b.iter(|| {
            let task = || async {
                let mut source =
                    ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
                let mut dest = Vec::new();
                copy(&mut source, &mut dest).await.unwrap();
            };

            rt.block_on(task());
        })
    });
}

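/// Both ends constrained: the chunked reader feeds the slow writer, so the
/// mismatched service schedules interact through `copy`'s internal buffer.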
fn copy_chunk_to_slow_hdd(c: &mut Criterion) {
    let rt = rt();

    c.bench_function("copy_chunk_to_slow_hdd", |b| {
        b.iter(|| {
            let task = || async {
                let mut source =
                    ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE);
                let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER);
                copy(&mut source, &mut dest).await.unwrap();
            };

            rt.block_on(task());
        })
    });
}

criterion_group!(
    copy_bench,
    copy_mem_to_mem,
    copy_mem_to_slow_hdd,
    copy_chunk_to_mem,
    copy_chunk_to_slow_hdd,
);
criterion_main!(copy_bench);