1 | use criterion::{criterion_group, criterion_main, Criterion}; |
2 | |
3 | use rand::{Rng, SeedableRng}; |
4 | use rand_chacha::ChaCha20Rng; |
5 | |
6 | use tokio::io::{copy, repeat, AsyncRead, AsyncReadExt, AsyncWrite}; |
7 | use tokio::time::{interval, Interval, MissedTickBehavior}; |
8 | |
9 | use std::task::Poll; |
10 | use std::time::Duration; |
11 | |
/// Number of bytes in one kibibyte; the sizes below are expressed in it.
const KILO: usize = 1024;

// Benchmark tuning knobs. With the defaults, the reader (CHUNK_SIZE per
// READ_SERVICE_PERIOD) and the writer (WRITE_BUFFER per WRITE_SERVICE_PERIOD)
// move the same number of kilobytes per second, so the benchmark only exposes
// the effect of buffering.

/// Seed for the RNG that decides when the simulated HDD stalls a writer.
const RNG_SEED: u64 = 0;

/// Total number of bytes copied in a single benchmark iteration.
const SOURCE_SIZE: u64 = (256 * KILO) as u64;

/// Bytes the read side hands out per service period. If the reader is not
/// polled often enough it bursts to catch up (representing OS buffers
/// draining).
const CHUNK_SIZE: usize = 2 * KILO;

/// How often the read side makes a new chunk available.
const READ_SERVICE_PERIOD: Duration = Duration::from_millis(1);

/// Bytes the write side can buffer before it has to wait for a flush.
const WRITE_BUFFER: usize = 40 * KILO;

/// How often the write side flushes its buffer to the simulated disk.
const WRITE_SERVICE_PERIOD: Duration = Duration::from_millis(20);

/// Probability that a write has to wait for previously written data to be
/// flushed because another writer claimed the buffer space.
const PROBABILITY_FLUSH_WAIT: f64 = 0.1;
31 | |
32 | /// A slow writer that aims to simulate HDD behaviour under heavy load. |
33 | /// |
34 | /// There is a limited buffer, which is fully drained on the next write after |
35 | /// a time limit is reached. Flush waits for the time limit to be reached |
36 | /// and then drains the buffer. |
37 | /// |
38 | /// At random, the HDD will stall writers while it flushes out all buffers. If |
39 | /// this happens to you, you will be unable to write until the next time the |
40 | /// buffer is drained. |
41 | struct SlowHddWriter { |
42 | service_intervals: Interval, |
43 | blocking_rng: ChaCha20Rng, |
44 | buffer_size: usize, |
45 | buffer_used: usize, |
46 | } |
47 | |
48 | impl SlowHddWriter { |
49 | fn new(service_interval: Duration, buffer_size: usize) -> Self { |
50 | let blocking_rng = ChaCha20Rng::seed_from_u64(RNG_SEED); |
51 | let mut service_intervals = interval(service_interval); |
52 | service_intervals.set_missed_tick_behavior(MissedTickBehavior::Delay); |
53 | Self { |
54 | service_intervals, |
55 | blocking_rng, |
56 | buffer_size, |
57 | buffer_used: 0, |
58 | } |
59 | } |
60 | |
61 | fn service_write( |
62 | mut self: std::pin::Pin<&mut Self>, |
63 | cx: &mut std::task::Context<'_>, |
64 | ) -> std::task::Poll<Result<(), std::io::Error>> { |
65 | // If we hit a service interval, the buffer can be cleared |
66 | let res = self.service_intervals.poll_tick(cx).map(|_| Ok(())); |
67 | if let Poll::Ready(_) = res { |
68 | self.buffer_used = 0; |
69 | } |
70 | res |
71 | } |
72 | |
73 | fn write_bytes( |
74 | mut self: std::pin::Pin<&mut Self>, |
75 | cx: &mut std::task::Context<'_>, |
76 | writeable: usize, |
77 | ) -> std::task::Poll<Result<usize, std::io::Error>> { |
78 | let service_res = self.as_mut().service_write(cx); |
79 | |
80 | if service_res.is_pending() && self.blocking_rng.gen_bool(PROBABILITY_FLUSH_WAIT) { |
81 | return Poll::Pending; |
82 | } |
83 | let available = self.buffer_size - self.buffer_used; |
84 | |
85 | if available == 0 { |
86 | assert!(service_res.is_pending()); |
87 | Poll::Pending |
88 | } else { |
89 | let written = available.min(writeable); |
90 | self.buffer_used += written; |
91 | Poll::Ready(Ok(written)) |
92 | } |
93 | } |
94 | } |
95 | |
96 | impl Unpin for SlowHddWriter {} |
97 | |
98 | impl AsyncWrite for SlowHddWriter { |
99 | fn poll_write( |
100 | self: std::pin::Pin<&mut Self>, |
101 | cx: &mut std::task::Context<'_>, |
102 | buf: &[u8], |
103 | ) -> std::task::Poll<Result<usize, std::io::Error>> { |
104 | self.write_bytes(cx, buf.len()) |
105 | } |
106 | |
107 | fn poll_flush( |
108 | self: std::pin::Pin<&mut Self>, |
109 | cx: &mut std::task::Context<'_>, |
110 | ) -> std::task::Poll<Result<(), std::io::Error>> { |
111 | self.service_write(cx) |
112 | } |
113 | |
114 | fn poll_shutdown( |
115 | self: std::pin::Pin<&mut Self>, |
116 | cx: &mut std::task::Context<'_>, |
117 | ) -> std::task::Poll<Result<(), std::io::Error>> { |
118 | self.service_write(cx) |
119 | } |
120 | |
121 | fn poll_write_vectored( |
122 | self: std::pin::Pin<&mut Self>, |
123 | cx: &mut std::task::Context<'_>, |
124 | bufs: &[std::io::IoSlice<'_>], |
125 | ) -> std::task::Poll<Result<usize, std::io::Error>> { |
126 | let writeable = bufs.into_iter().fold(0, |acc, buf| acc + buf.len()); |
127 | self.write_bytes(cx, writeable) |
128 | } |
129 | |
130 | fn is_write_vectored(&self) -> bool { |
131 | true |
132 | } |
133 | } |
134 | |
135 | /// A reader that limits the maximum chunk it'll give you back |
136 | /// |
137 | /// Simulates something reading from a slow link - you get one chunk per call, |
138 | /// and you are offered chunks on a schedule |
139 | struct ChunkReader { |
140 | data: Vec<u8>, |
141 | service_intervals: Interval, |
142 | } |
143 | |
144 | impl ChunkReader { |
145 | fn new(chunk_size: usize, service_interval: Duration) -> Self { |
146 | let mut service_intervals = interval(service_interval); |
147 | service_intervals.set_missed_tick_behavior(MissedTickBehavior::Burst); |
148 | let data: Vec<u8> = std::iter::repeat(0).take(chunk_size).collect(); |
149 | Self { |
150 | data, |
151 | service_intervals, |
152 | } |
153 | } |
154 | } |
155 | |
156 | impl AsyncRead for ChunkReader { |
157 | fn poll_read( |
158 | mut self: std::pin::Pin<&mut Self>, |
159 | cx: &mut std::task::Context<'_>, |
160 | buf: &mut tokio::io::ReadBuf<'_>, |
161 | ) -> Poll<std::io::Result<()>> { |
162 | if self.service_intervals.poll_tick(cx).is_pending() { |
163 | return Poll::Pending; |
164 | } |
165 | buf.put_slice(&self.data[..buf.remaining().min(self.data.len())]); |
166 | Poll::Ready(Ok(())) |
167 | } |
168 | } |
169 | |
170 | fn rt() -> tokio::runtime::Runtime { |
171 | tokio::runtime::Builder::new_current_thread() |
172 | .enable_time() |
173 | .build() |
174 | .unwrap() |
175 | } |
176 | |
177 | fn copy_mem_to_mem(c: &mut Criterion) { |
178 | let rt = rt(); |
179 | |
180 | c.bench_function("copy_mem_to_mem" , |b| { |
181 | b.iter(|| { |
182 | let task = || async { |
183 | let mut source = repeat(0).take(SOURCE_SIZE); |
184 | let mut dest = Vec::new(); |
185 | copy(&mut source, &mut dest).await.unwrap(); |
186 | }; |
187 | |
188 | rt.block_on(task()); |
189 | }) |
190 | }); |
191 | } |
192 | |
193 | fn copy_mem_to_slow_hdd(c: &mut Criterion) { |
194 | let rt = rt(); |
195 | |
196 | c.bench_function("copy_mem_to_slow_hdd" , |b| { |
197 | b.iter(|| { |
198 | let task = || async { |
199 | let mut source = repeat(0).take(SOURCE_SIZE); |
200 | let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER); |
201 | copy(&mut source, &mut dest).await.unwrap(); |
202 | }; |
203 | |
204 | rt.block_on(task()); |
205 | }) |
206 | }); |
207 | } |
208 | |
209 | fn copy_chunk_to_mem(c: &mut Criterion) { |
210 | let rt = rt(); |
211 | |
212 | c.bench_function("copy_chunk_to_mem" , |b| { |
213 | b.iter(|| { |
214 | let task = || async { |
215 | let mut source = |
216 | ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE); |
217 | let mut dest = Vec::new(); |
218 | copy(&mut source, &mut dest).await.unwrap(); |
219 | }; |
220 | |
221 | rt.block_on(task()); |
222 | }) |
223 | }); |
224 | } |
225 | |
226 | fn copy_chunk_to_slow_hdd(c: &mut Criterion) { |
227 | let rt = rt(); |
228 | |
229 | c.bench_function("copy_chunk_to_slow_hdd" , |b| { |
230 | b.iter(|| { |
231 | let task = || async { |
232 | let mut source = |
233 | ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE); |
234 | let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER); |
235 | copy(&mut source, &mut dest).await.unwrap(); |
236 | }; |
237 | |
238 | rt.block_on(task()); |
239 | }) |
240 | }); |
241 | } |
242 | |
243 | criterion_group!( |
244 | copy_bench, |
245 | copy_mem_to_mem, |
246 | copy_mem_to_slow_hdd, |
247 | copy_chunk_to_mem, |
248 | copy_chunk_to_slow_hdd, |
249 | ); |
250 | criterion_main!(copy_bench); |
251 | |