1 | use futures::executor::block_on; |
2 | use futures::future::{Future, FutureExt}; |
3 | use futures::io::{ |
4 | AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, |
5 | BufReader, SeekFrom, |
6 | }; |
7 | use futures::pin_mut; |
8 | use futures::task::{Context, Poll}; |
9 | use futures_test::task::noop_context; |
10 | use pin_project::pin_project ; |
11 | use std::cmp; |
12 | use std::io; |
13 | use std::pin::Pin; |
14 | |
15 | // helper for maybe_pending_* tests |
16 | fn run<F: Future + Unpin>(mut f: F) -> F::Output { |
17 | let mut cx = noop_context(); |
18 | loop { |
19 | if let Poll::Ready(x) = f.poll_unpin(&mut cx) { |
20 | return x; |
21 | } |
22 | } |
23 | } |
24 | |
25 | // https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719 |
26 | #[pin_project (!Unpin)] |
27 | struct Cursor<T> { |
28 | #[pin] |
29 | inner: futures::io::Cursor<T>, |
30 | } |
31 | |
32 | impl<T> Cursor<T> { |
33 | fn new(inner: T) -> Self { |
34 | Self { inner: futures::io::Cursor::new(inner) } |
35 | } |
36 | } |
37 | |
38 | impl AsyncRead for Cursor<&[u8]> { |
39 | fn poll_read( |
40 | self: Pin<&mut Self>, |
41 | cx: &mut Context<'_>, |
42 | buf: &mut [u8], |
43 | ) -> Poll<io::Result<usize>> { |
44 | self.project().inner.poll_read(cx, buf) |
45 | } |
46 | } |
47 | |
48 | impl AsyncBufRead for Cursor<&[u8]> { |
49 | fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { |
50 | self.project().inner.poll_fill_buf(cx) |
51 | } |
52 | |
53 | fn consume(self: Pin<&mut Self>, amt: usize) { |
54 | self.project().inner.consume(amt) |
55 | } |
56 | } |
57 | |
58 | impl AsyncSeek for Cursor<&[u8]> { |
59 | fn poll_seek( |
60 | self: Pin<&mut Self>, |
61 | cx: &mut Context<'_>, |
62 | pos: SeekFrom, |
63 | ) -> Poll<io::Result<u64>> { |
64 | self.project().inner.poll_seek(cx, pos) |
65 | } |
66 | } |
67 | |
68 | struct MaybePending<'a> { |
69 | inner: &'a [u8], |
70 | ready_read: bool, |
71 | ready_fill_buf: bool, |
72 | } |
73 | |
74 | impl<'a> MaybePending<'a> { |
75 | fn new(inner: &'a [u8]) -> Self { |
76 | Self { inner, ready_read: false, ready_fill_buf: false } |
77 | } |
78 | } |
79 | |
80 | impl AsyncRead for MaybePending<'_> { |
81 | fn poll_read( |
82 | mut self: Pin<&mut Self>, |
83 | cx: &mut Context<'_>, |
84 | buf: &mut [u8], |
85 | ) -> Poll<io::Result<usize>> { |
86 | if self.ready_read { |
87 | self.ready_read = false; |
88 | Pin::new(&mut self.inner).poll_read(cx, buf) |
89 | } else { |
90 | self.ready_read = true; |
91 | Poll::Pending |
92 | } |
93 | } |
94 | } |
95 | |
96 | impl AsyncBufRead for MaybePending<'_> { |
97 | fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { |
98 | if self.ready_fill_buf { |
99 | self.ready_fill_buf = false; |
100 | if self.inner.is_empty() { |
101 | return Poll::Ready(Ok(&[])); |
102 | } |
103 | let len = cmp::min(2, self.inner.len()); |
104 | Poll::Ready(Ok(&self.inner[0..len])) |
105 | } else { |
106 | self.ready_fill_buf = true; |
107 | Poll::Pending |
108 | } |
109 | } |
110 | |
111 | fn consume(mut self: Pin<&mut Self>, amt: usize) { |
112 | self.inner = &self.inner[amt..]; |
113 | } |
114 | } |
115 | |
116 | #[test] |
117 | fn test_buffered_reader() { |
118 | block_on(async { |
119 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
120 | let mut reader = BufReader::with_capacity(2, inner); |
121 | |
122 | let mut buf = [0, 0, 0]; |
123 | let nread = reader.read(&mut buf).await.unwrap(); |
124 | assert_eq!(nread, 3); |
125 | assert_eq!(buf, [5, 6, 7]); |
126 | assert_eq!(reader.buffer(), []); |
127 | |
128 | let mut buf = [0, 0]; |
129 | let nread = reader.read(&mut buf).await.unwrap(); |
130 | assert_eq!(nread, 2); |
131 | assert_eq!(buf, [0, 1]); |
132 | assert_eq!(reader.buffer(), []); |
133 | |
134 | let mut buf = [0]; |
135 | let nread = reader.read(&mut buf).await.unwrap(); |
136 | assert_eq!(nread, 1); |
137 | assert_eq!(buf, [2]); |
138 | assert_eq!(reader.buffer(), [3]); |
139 | |
140 | let mut buf = [0, 0, 0]; |
141 | let nread = reader.read(&mut buf).await.unwrap(); |
142 | assert_eq!(nread, 1); |
143 | assert_eq!(buf, [3, 0, 0]); |
144 | assert_eq!(reader.buffer(), []); |
145 | |
146 | let nread = reader.read(&mut buf).await.unwrap(); |
147 | assert_eq!(nread, 1); |
148 | assert_eq!(buf, [4, 0, 0]); |
149 | assert_eq!(reader.buffer(), []); |
150 | |
151 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
152 | }); |
153 | } |
154 | |
155 | #[test] |
156 | fn test_buffered_reader_seek() { |
157 | block_on(async { |
158 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
159 | let reader = BufReader::with_capacity(2, Cursor::new(inner)); |
160 | pin_mut!(reader); |
161 | |
162 | assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3); |
163 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); |
164 | assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err()); |
165 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); |
166 | assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4); |
167 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1, 2][..]); |
168 | reader.as_mut().consume(1); |
169 | assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3); |
170 | }); |
171 | } |
172 | |
173 | #[test] |
174 | fn test_buffered_reader_seek_relative() { |
175 | block_on(async { |
176 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
177 | let reader = BufReader::with_capacity(2, Cursor::new(inner)); |
178 | pin_mut!(reader); |
179 | |
180 | assert!(reader.as_mut().seek_relative(3).await.is_ok()); |
181 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); |
182 | assert!(reader.as_mut().seek_relative(0).await.is_ok()); |
183 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); |
184 | assert!(reader.as_mut().seek_relative(1).await.is_ok()); |
185 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]); |
186 | assert!(reader.as_mut().seek_relative(-1).await.is_ok()); |
187 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]); |
188 | assert!(reader.as_mut().seek_relative(2).await.is_ok()); |
189 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]); |
190 | }); |
191 | } |
192 | |
193 | #[test] |
194 | fn test_buffered_reader_invalidated_after_read() { |
195 | block_on(async { |
196 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
197 | let reader = BufReader::with_capacity(3, Cursor::new(inner)); |
198 | pin_mut!(reader); |
199 | |
200 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]); |
201 | reader.as_mut().consume(3); |
202 | |
203 | let mut buffer = [0, 0, 0, 0, 0]; |
204 | assert_eq!(reader.read(&mut buffer).await.unwrap(), 5); |
205 | assert_eq!(buffer, [0, 1, 2, 3, 4]); |
206 | |
207 | assert!(reader.as_mut().seek_relative(-2).await.is_ok()); |
208 | let mut buffer = [0, 0]; |
209 | assert_eq!(reader.read(&mut buffer).await.unwrap(), 2); |
210 | assert_eq!(buffer, [3, 4]); |
211 | }); |
212 | } |
213 | |
214 | #[test] |
215 | fn test_buffered_reader_invalidated_after_seek() { |
216 | block_on(async { |
217 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
218 | let reader = BufReader::with_capacity(3, Cursor::new(inner)); |
219 | pin_mut!(reader); |
220 | |
221 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[5, 6, 7][..]); |
222 | reader.as_mut().consume(3); |
223 | |
224 | assert!(reader.seek(SeekFrom::Current(5)).await.is_ok()); |
225 | |
226 | assert!(reader.as_mut().seek_relative(-2).await.is_ok()); |
227 | let mut buffer = [0, 0]; |
228 | assert_eq!(reader.read(&mut buffer).await.unwrap(), 2); |
229 | assert_eq!(buffer, [3, 4]); |
230 | }); |
231 | } |
232 | |
233 | #[test] |
234 | fn test_buffered_reader_seek_underflow() { |
235 | // gimmick reader that yields its position modulo 256 for each byte |
236 | struct PositionReader { |
237 | pos: u64, |
238 | } |
239 | impl io::Read for PositionReader { |
240 | fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |
241 | let len = buf.len(); |
242 | for x in buf { |
243 | *x = self.pos as u8; |
244 | self.pos = self.pos.wrapping_add(1); |
245 | } |
246 | Ok(len) |
247 | } |
248 | } |
249 | impl io::Seek for PositionReader { |
250 | fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { |
251 | match pos { |
252 | SeekFrom::Start(n) => { |
253 | self.pos = n; |
254 | } |
255 | SeekFrom::Current(n) => { |
256 | self.pos = self.pos.wrapping_add(n as u64); |
257 | } |
258 | SeekFrom::End(n) => { |
259 | self.pos = u64::MAX.wrapping_add(n as u64); |
260 | } |
261 | } |
262 | Ok(self.pos) |
263 | } |
264 | } |
265 | |
266 | block_on(async { |
267 | let reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); |
268 | pin_mut!(reader); |
269 | assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1, 2, 3, 4][..]); |
270 | assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5); |
271 | assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5); |
272 | // the following seek will require two underlying seeks |
273 | let expected = 9_223_372_036_854_775_802; |
274 | assert_eq!(reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(), expected); |
275 | assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5); |
276 | // seeking to 0 should empty the buffer. |
277 | assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected); |
278 | assert_eq!(reader.get_ref().get_ref().pos, expected); |
279 | }); |
280 | } |
281 | |
282 | #[test] |
283 | fn test_short_reads() { |
284 | /// A dummy reader intended at testing short-reads propagation. |
285 | struct ShortReader { |
286 | lengths: Vec<usize>, |
287 | } |
288 | |
289 | impl io::Read for ShortReader { |
290 | fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { |
291 | if self.lengths.is_empty() { |
292 | Ok(0) |
293 | } else { |
294 | Ok(self.lengths.remove(0)) |
295 | } |
296 | } |
297 | } |
298 | |
299 | block_on(async { |
300 | let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; |
301 | let mut reader = BufReader::new(AllowStdIo::new(inner)); |
302 | let mut buf = [0, 0]; |
303 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
304 | assert_eq!(reader.read(&mut buf).await.unwrap(), 1); |
305 | assert_eq!(reader.read(&mut buf).await.unwrap(), 2); |
306 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
307 | assert_eq!(reader.read(&mut buf).await.unwrap(), 1); |
308 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
309 | assert_eq!(reader.read(&mut buf).await.unwrap(), 0); |
310 | }); |
311 | } |
312 | |
313 | #[test] |
314 | fn maybe_pending() { |
315 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
316 | let mut reader = BufReader::with_capacity(2, MaybePending::new(inner)); |
317 | |
318 | let mut buf = [0, 0, 0]; |
319 | let nread = run(reader.read(&mut buf)); |
320 | assert_eq!(nread.unwrap(), 3); |
321 | assert_eq!(buf, [5, 6, 7]); |
322 | assert_eq!(reader.buffer(), []); |
323 | |
324 | let mut buf = [0, 0]; |
325 | let nread = run(reader.read(&mut buf)); |
326 | assert_eq!(nread.unwrap(), 2); |
327 | assert_eq!(buf, [0, 1]); |
328 | assert_eq!(reader.buffer(), []); |
329 | |
330 | let mut buf = [0]; |
331 | let nread = run(reader.read(&mut buf)); |
332 | assert_eq!(nread.unwrap(), 1); |
333 | assert_eq!(buf, [2]); |
334 | assert_eq!(reader.buffer(), [3]); |
335 | |
336 | let mut buf = [0, 0, 0]; |
337 | let nread = run(reader.read(&mut buf)); |
338 | assert_eq!(nread.unwrap(), 1); |
339 | assert_eq!(buf, [3, 0, 0]); |
340 | assert_eq!(reader.buffer(), []); |
341 | |
342 | let nread = run(reader.read(&mut buf)); |
343 | assert_eq!(nread.unwrap(), 1); |
344 | assert_eq!(buf, [4, 0, 0]); |
345 | assert_eq!(reader.buffer(), []); |
346 | |
347 | assert_eq!(run(reader.read(&mut buf)).unwrap(), 0); |
348 | } |
349 | |
350 | #[test] |
351 | fn maybe_pending_buf_read() { |
352 | let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]); |
353 | let mut reader = BufReader::with_capacity(2, inner); |
354 | let mut v = Vec::new(); |
355 | run(reader.read_until(3, &mut v)).unwrap(); |
356 | assert_eq!(v, [0, 1, 2, 3]); |
357 | v.clear(); |
358 | run(reader.read_until(1, &mut v)).unwrap(); |
359 | assert_eq!(v, [1]); |
360 | v.clear(); |
361 | run(reader.read_until(8, &mut v)).unwrap(); |
362 | assert_eq!(v, [0]); |
363 | v.clear(); |
364 | run(reader.read_until(9, &mut v)).unwrap(); |
365 | assert_eq!(v, []); |
366 | } |
367 | |
368 | // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 |
369 | #[test] |
370 | fn maybe_pending_seek() { |
371 | #[pin_project ] |
372 | struct MaybePendingSeek<'a> { |
373 | #[pin] |
374 | inner: Cursor<&'a [u8]>, |
375 | ready: bool, |
376 | } |
377 | |
378 | impl<'a> MaybePendingSeek<'a> { |
379 | fn new(inner: &'a [u8]) -> Self { |
380 | Self { inner: Cursor::new(inner), ready: true } |
381 | } |
382 | } |
383 | |
384 | impl AsyncRead for MaybePendingSeek<'_> { |
385 | fn poll_read( |
386 | self: Pin<&mut Self>, |
387 | cx: &mut Context<'_>, |
388 | buf: &mut [u8], |
389 | ) -> Poll<io::Result<usize>> { |
390 | self.project().inner.poll_read(cx, buf) |
391 | } |
392 | } |
393 | |
394 | impl AsyncBufRead for MaybePendingSeek<'_> { |
395 | fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { |
396 | self.project().inner.poll_fill_buf(cx) |
397 | } |
398 | |
399 | fn consume(self: Pin<&mut Self>, amt: usize) { |
400 | self.project().inner.consume(amt) |
401 | } |
402 | } |
403 | |
404 | impl AsyncSeek for MaybePendingSeek<'_> { |
405 | fn poll_seek( |
406 | mut self: Pin<&mut Self>, |
407 | cx: &mut Context<'_>, |
408 | pos: SeekFrom, |
409 | ) -> Poll<io::Result<u64>> { |
410 | if self.ready { |
411 | *self.as_mut().project().ready = false; |
412 | self.project().inner.poll_seek(cx, pos) |
413 | } else { |
414 | *self.project().ready = true; |
415 | Poll::Pending |
416 | } |
417 | } |
418 | } |
419 | |
420 | let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; |
421 | let reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner)); |
422 | pin_mut!(reader); |
423 | |
424 | assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3)); |
425 | assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..])); |
426 | assert_eq!(run(reader.seek(SeekFrom::Current(i64::MIN))).ok(), None); |
427 | assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[0, 1][..])); |
428 | assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); |
429 | assert_eq!(run(reader.as_mut().fill_buf()).ok(), Some(&[1, 2][..])); |
430 | Pin::new(&mut reader).consume(1); |
431 | assert_eq!(run(reader.seek(SeekFrom::Current(-2))).ok(), Some(3)); |
432 | } |
433 | |