1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (feature = "full" )] |
3 | |
4 | // https://github.com/rust-lang/futures-rs/blob/1803948ff091b4eabf7f3bf39e16bbbdefca5cc8/futures/tests/io_buf_reader.rs |
5 | |
6 | use futures::task::{noop_waker_ref, Context, Poll}; |
7 | use std::cmp; |
8 | use std::io::{self, Cursor}; |
9 | use std::pin::Pin; |
10 | use tokio::io::{ |
11 | AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt, |
12 | BufReader, ReadBuf, SeekFrom, |
13 | }; |
14 | use tokio_test::task::spawn; |
15 | use tokio_test::{assert_pending, assert_ready}; |
16 | |
17 | macro_rules! run_fill_buf { |
18 | ($reader:expr) => {{ |
19 | let mut cx = Context::from_waker(noop_waker_ref()); |
20 | loop { |
21 | if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) { |
22 | break x; |
23 | } |
24 | } |
25 | }}; |
26 | } |
27 | |
28 | struct MaybePending<'a> { |
29 | inner: &'a [u8], |
30 | ready_read: bool, |
31 | ready_fill_buf: bool, |
32 | } |
33 | |
34 | impl<'a> MaybePending<'a> { |
35 | fn new(inner: &'a [u8]) -> Self { |
36 | Self { |
37 | inner, |
38 | ready_read: false, |
39 | ready_fill_buf: false, |
40 | } |
41 | } |
42 | } |
43 | |
44 | impl AsyncRead for MaybePending<'_> { |
45 | fn poll_read( |
46 | mut self: Pin<&mut Self>, |
47 | cx: &mut Context<'_>, |
48 | buf: &mut ReadBuf<'_>, |
49 | ) -> Poll<io::Result<()>> { |
50 | if self.ready_read { |
51 | self.ready_read = false; |
52 | Pin::new(&mut self.inner).poll_read(cx, buf) |
53 | } else { |
54 | self.ready_read = true; |
55 | cx.waker().wake_by_ref(); |
56 | Poll::Pending |
57 | } |
58 | } |
59 | } |
60 | |
61 | impl AsyncBufRead for MaybePending<'_> { |
62 | fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { |
63 | if self.ready_fill_buf { |
64 | self.ready_fill_buf = false; |
65 | if self.inner.is_empty() { |
66 | return Poll::Ready(Ok(&[])); |
67 | } |
68 | let len = cmp::min(2, self.inner.len()); |
69 | Poll::Ready(Ok(&self.inner[0..len])) |
70 | } else { |
71 | self.ready_fill_buf = true; |
72 | Poll::Pending |
73 | } |
74 | } |
75 | |
76 | fn consume(mut self: Pin<&mut Self>, amt: usize) { |
77 | self.inner = &self.inner[amt..]; |
78 | } |
79 | } |
80 | |
81 | #[tokio::test ] |
82 | async fn test_buffered_reader() { |
83 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
84 | let mut reader = BufReader::with_capacity(2, inner); |
85 | |
86 | let mut buf = [0, 0, 0]; |
87 | let nread = reader.read(&mut buf).await.unwrap(); |
88 | assert_eq!(nread, 3); |
89 | assert_eq!(buf, [5, 6, 7]); |
90 | assert_eq!(reader.buffer(), []); |
91 | |
92 | let mut buf = [0, 0]; |
93 | let nread = reader.read(&mut buf).await.unwrap(); |
94 | assert_eq!(nread, 2); |
95 | assert_eq!(buf, [0, 1]); |
96 | assert_eq!(reader.buffer(), []); |
97 | |
98 | let mut buf = [0]; |
99 | let nread = reader.read(&mut buf).await.unwrap(); |
100 | assert_eq!(nread, 1); |
101 | assert_eq!(buf, [2]); |
102 | assert_eq!(reader.buffer(), [3]); |
103 | |
104 | let mut buf = [0, 0, 0]; |
105 | let nread = reader.read(&mut buf).await.unwrap(); |
106 | assert_eq!(nread, 1); |
107 | assert_eq!(buf, [3, 0, 0]); |
108 | assert_eq!(reader.buffer(), []); |
109 | |
110 | let nread = reader.read(&mut buf).await.unwrap(); |
111 | assert_eq!(nread, 1); |
112 | assert_eq!(buf, [4, 0, 0]); |
113 | assert_eq!(reader.buffer(), []); |
114 | |
115 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
116 | } |
117 | |
118 | #[tokio::test ] |
119 | async fn test_buffered_reader_seek() { |
120 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
121 | let mut reader = BufReader::with_capacity(2, Cursor::new(inner)); |
122 | |
123 | assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3); |
124 | assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); |
125 | assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err()); |
126 | assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); |
127 | assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); |
128 | assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]); |
129 | Pin::new(&mut reader).consume(1); |
130 | assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); |
131 | } |
132 | |
133 | #[tokio::test ] |
134 | async fn test_buffered_reader_seek_underflow() { |
135 | // gimmick reader that yields its position modulo 256 for each byte |
136 | struct PositionReader { |
137 | pos: u64, |
138 | } |
139 | impl AsyncRead for PositionReader { |
140 | fn poll_read( |
141 | mut self: Pin<&mut Self>, |
142 | _: &mut Context<'_>, |
143 | buf: &mut ReadBuf<'_>, |
144 | ) -> Poll<io::Result<()>> { |
145 | let b = buf.initialize_unfilled(); |
146 | let len = b.len(); |
147 | for x in b { |
148 | *x = self.pos as u8; |
149 | self.pos = self.pos.wrapping_add(1); |
150 | } |
151 | buf.advance(len); |
152 | Poll::Ready(Ok(())) |
153 | } |
154 | } |
155 | impl AsyncSeek for PositionReader { |
156 | fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { |
157 | match pos { |
158 | SeekFrom::Start(n) => { |
159 | self.pos = n; |
160 | } |
161 | SeekFrom::Current(n) => { |
162 | self.pos = self.pos.wrapping_add(n as u64); |
163 | } |
164 | SeekFrom::End(n) => { |
165 | self.pos = u64::MAX.wrapping_add(n as u64); |
166 | } |
167 | } |
168 | Ok(()) |
169 | } |
170 | fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> { |
171 | Poll::Ready(Ok(self.pos)) |
172 | } |
173 | } |
174 | |
175 | let mut reader = BufReader::with_capacity(5, PositionReader { pos: 0 }); |
176 | assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1, 2, 3, 4][..]); |
177 | assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5); |
178 | assert_eq!(run_fill_buf!(reader).unwrap().len(), 5); |
179 | // the following seek will require two underlying seeks |
180 | let expected = 9_223_372_036_854_775_802; |
181 | assert_eq!( |
182 | reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), |
183 | expected |
184 | ); |
185 | assert_eq!(run_fill_buf!(reader).unwrap().len(), 5); |
186 | // seeking to 0 should empty the buffer. |
187 | assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected); |
188 | assert_eq!(reader.get_ref().pos, expected); |
189 | } |
190 | |
191 | #[tokio::test ] |
192 | async fn test_short_reads() { |
193 | /// A dummy reader intended at testing short-reads propagation. |
194 | struct ShortReader { |
195 | lengths: Vec<usize>, |
196 | } |
197 | |
198 | impl AsyncRead for ShortReader { |
199 | fn poll_read( |
200 | mut self: Pin<&mut Self>, |
201 | _: &mut Context<'_>, |
202 | buf: &mut ReadBuf<'_>, |
203 | ) -> Poll<io::Result<()>> { |
204 | if !self.lengths.is_empty() { |
205 | buf.advance(self.lengths.remove(0)); |
206 | } |
207 | Poll::Ready(Ok(())) |
208 | } |
209 | } |
210 | |
211 | let inner = ShortReader { |
212 | lengths: vec![0, 1, 2, 0, 1, 0], |
213 | }; |
214 | let mut reader = BufReader::new(inner); |
215 | let mut buf = [0, 0]; |
216 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
217 | assert_eq!(reader.read(&mut buf).await.unwrap(), 1); |
218 | assert_eq!(reader.read(&mut buf).await.unwrap(), 2); |
219 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
220 | assert_eq!(reader.read(&mut buf).await.unwrap(), 1); |
221 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
222 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
223 | } |
224 | |
225 | #[tokio::test ] |
226 | async fn maybe_pending() { |
227 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
228 | let mut reader = BufReader::with_capacity(2, MaybePending::new(inner)); |
229 | |
230 | let mut buf = [0, 0, 0]; |
231 | let nread = reader.read(&mut buf).await.unwrap(); |
232 | assert_eq!(nread, 3); |
233 | assert_eq!(buf, [5, 6, 7]); |
234 | assert_eq!(reader.buffer(), []); |
235 | |
236 | let mut buf = [0, 0]; |
237 | let nread = reader.read(&mut buf).await.unwrap(); |
238 | assert_eq!(nread, 2); |
239 | assert_eq!(buf, [0, 1]); |
240 | assert_eq!(reader.buffer(), []); |
241 | |
242 | let mut buf = [0]; |
243 | let nread = reader.read(&mut buf).await.unwrap(); |
244 | assert_eq!(nread, 1); |
245 | assert_eq!(buf, [2]); |
246 | assert_eq!(reader.buffer(), [3]); |
247 | |
248 | let mut buf = [0, 0, 0]; |
249 | let nread = reader.read(&mut buf).await.unwrap(); |
250 | assert_eq!(nread, 1); |
251 | assert_eq!(buf, [3, 0, 0]); |
252 | assert_eq!(reader.buffer(), []); |
253 | |
254 | let nread = reader.read(&mut buf).await.unwrap(); |
255 | assert_eq!(nread, 1); |
256 | assert_eq!(buf, [4, 0, 0]); |
257 | assert_eq!(reader.buffer(), []); |
258 | |
259 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
260 | } |
261 | |
262 | #[tokio::test ] |
263 | async fn maybe_pending_buf_read() { |
264 | let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]); |
265 | let mut reader = BufReader::with_capacity(2, inner); |
266 | let mut v = Vec::new(); |
267 | reader.read_until(3, &mut v).await.unwrap(); |
268 | assert_eq!(v, [0, 1, 2, 3]); |
269 | v.clear(); |
270 | reader.read_until(1, &mut v).await.unwrap(); |
271 | assert_eq!(v, [1]); |
272 | v.clear(); |
273 | reader.read_until(8, &mut v).await.unwrap(); |
274 | assert_eq!(v, [0]); |
275 | v.clear(); |
276 | reader.read_until(9, &mut v).await.unwrap(); |
277 | assert_eq!(v, []); |
278 | } |
279 | |
280 | // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 |
281 | #[tokio::test ] |
282 | async fn maybe_pending_seek() { |
283 | struct MaybePendingSeek<'a> { |
284 | inner: Cursor<&'a [u8]>, |
285 | ready: bool, |
286 | seek_res: Option<io::Result<()>>, |
287 | } |
288 | |
289 | impl<'a> MaybePendingSeek<'a> { |
290 | fn new(inner: &'a [u8]) -> Self { |
291 | Self { |
292 | inner: Cursor::new(inner), |
293 | ready: true, |
294 | seek_res: None, |
295 | } |
296 | } |
297 | } |
298 | |
299 | impl AsyncRead for MaybePendingSeek<'_> { |
300 | fn poll_read( |
301 | mut self: Pin<&mut Self>, |
302 | cx: &mut Context<'_>, |
303 | buf: &mut ReadBuf<'_>, |
304 | ) -> Poll<io::Result<()>> { |
305 | Pin::new(&mut self.inner).poll_read(cx, buf) |
306 | } |
307 | } |
308 | |
309 | impl AsyncBufRead for MaybePendingSeek<'_> { |
310 | fn poll_fill_buf( |
311 | mut self: Pin<&mut Self>, |
312 | cx: &mut Context<'_>, |
313 | ) -> Poll<io::Result<&[u8]>> { |
314 | let this: *mut Self = &mut *self as *mut _; |
315 | Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) |
316 | } |
317 | |
318 | fn consume(mut self: Pin<&mut Self>, amt: usize) { |
319 | Pin::new(&mut self.inner).consume(amt) |
320 | } |
321 | } |
322 | |
323 | impl AsyncSeek for MaybePendingSeek<'_> { |
324 | fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> { |
325 | self.seek_res = Some(Pin::new(&mut self.inner).start_seek(pos)); |
326 | Ok(()) |
327 | } |
328 | fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> { |
329 | if self.ready { |
330 | self.ready = false; |
331 | self.seek_res.take().unwrap_or(Ok(()))?; |
332 | Pin::new(&mut self.inner).poll_complete(cx) |
333 | } else { |
334 | self.ready = true; |
335 | cx.waker().wake_by_ref(); |
336 | Poll::Pending |
337 | } |
338 | } |
339 | } |
340 | |
341 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
342 | let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); |
343 | |
344 | assert_eq!(reader.seek(SeekFrom::Current(3)).await.unwrap(), 3); |
345 | assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); |
346 | assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err()); |
347 | assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]); |
348 | assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); |
349 | assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]); |
350 | Pin::new(&mut reader).consume(1); |
351 | assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); |
352 | } |
353 | |
354 | // This tests the AsyncBufReadExt::fill_buf wrapper. |
355 | #[tokio::test ] |
356 | async fn test_fill_buf_wrapper() { |
357 | let (mut write, read) = tokio::io::duplex(16); |
358 | |
359 | let mut read = BufReader::new(read); |
360 | write.write_all(b"hello world" ).await.unwrap(); |
361 | |
362 | assert_eq!(read.fill_buf().await.unwrap(), b"hello world" ); |
363 | read.consume(b"hello " .len()); |
364 | assert_eq!(read.fill_buf().await.unwrap(), b"world" ); |
365 | assert_eq!(read.fill_buf().await.unwrap(), b"world" ); |
366 | read.consume(b"world" .len()); |
367 | |
368 | let mut fill = spawn(read.fill_buf()); |
369 | assert_pending!(fill.poll()); |
370 | |
371 | write.write_all(b"foo bar" ).await.unwrap(); |
372 | assert_eq!(assert_ready!(fill.poll()).unwrap(), b"foo bar" ); |
373 | drop(fill); |
374 | |
375 | drop(write); |
376 | assert_eq!(read.fill_buf().await.unwrap(), b"foo bar" ); |
377 | read.consume(b"foo bar" .len()); |
378 | assert_eq!(read.fill_buf().await.unwrap(), b"" ); |
379 | } |
380 | |