1use futures::executor::block_on;
2use futures::future::{Future, FutureExt};
3use futures::io::{
4 AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt,
5 BufReader, SeekFrom,
6};
7use futures::pin_mut;
8use futures::task::{Context, Poll};
9use futures_test::task::noop_context;
10use pin_project::pin_project;
11use std::cmp;
12use std::io;
13use std::pin::Pin;
14
15// helper for maybe_pending_* tests
16fn run<F: Future + Unpin>(mut f: F) -> F::Output {
17 let mut cx = noop_context();
18 loop {
19 if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
20 return x;
21 }
22 }
23}
24
25// https://github.com/rust-lang/futures-rs/pull/2489#discussion_r697865719
26#[pin_project(!Unpin)]
27struct Cursor<T> {
28 #[pin]
29 inner: futures::io::Cursor<T>,
30}
31
32impl<T> Cursor<T> {
33 fn new(inner: T) -> Self {
34 Self { inner: futures::io::Cursor::new(inner) }
35 }
36}
37
38impl AsyncRead for Cursor<&[u8]> {
39 fn poll_read(
40 self: Pin<&mut Self>,
41 cx: &mut Context<'_>,
42 buf: &mut [u8],
43 ) -> Poll<io::Result<usize>> {
44 self.project().inner.poll_read(cx, buf)
45 }
46}
47
48impl AsyncBufRead for Cursor<&[u8]> {
49 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
50 self.project().inner.poll_fill_buf(cx)
51 }
52
53 fn consume(self: Pin<&mut Self>, amt: usize) {
54 self.project().inner.consume(amt)
55 }
56}
57
58impl AsyncSeek for Cursor<&[u8]> {
59 fn poll_seek(
60 self: Pin<&mut Self>,
61 cx: &mut Context<'_>,
62 pos: SeekFrom,
63 ) -> Poll<io::Result<u64>> {
64 self.project().inner.poll_seek(cx, pos)
65 }
66}
67
/// Test reader over a byte slice whose `poll_read` and `poll_fill_buf` each
/// alternate between `Poll::Pending` and an actual result, tracked by one
/// flag per operation.
struct MaybePending<'a> {
    /// Bytes that have not been consumed yet.
    inner: &'a [u8],
    /// When `true`, the next `poll_read` call performs the read.
    ready_read: bool,
    /// When `true`, the next `poll_fill_buf` call hands out data.
    ready_fill_buf: bool,
}

impl<'a> MaybePending<'a> {
    /// Wraps `inner` with both flags cleared, so the first poll of either
    /// kind reports `Poll::Pending`.
    fn new(inner: &'a [u8]) -> Self {
        Self { ready_read: false, ready_fill_buf: false, inner }
    }
}
79
80impl AsyncRead for MaybePending<'_> {
81 fn poll_read(
82 mut self: Pin<&mut Self>,
83 cx: &mut Context<'_>,
84 buf: &mut [u8],
85 ) -> Poll<io::Result<usize>> {
86 if self.ready_read {
87 self.ready_read = false;
88 Pin::new(&mut self.inner).poll_read(cx, buf)
89 } else {
90 self.ready_read = true;
91 Poll::Pending
92 }
93 }
94}
95
96impl AsyncBufRead for MaybePending<'_> {
97 fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
98 if self.ready_fill_buf {
99 self.ready_fill_buf = false;
100 if self.inner.is_empty() {
101 return Poll::Ready(Ok(&[]));
102 }
103 let len = cmp::min(2, self.inner.len());
104 Poll::Ready(Ok(&self.inner[0..len]))
105 } else {
106 self.ready_fill_buf = true;
107 Poll::Pending
108 }
109 }
110
111 fn consume(mut self: Pin<&mut Self>, amt: usize) {
112 self.inner = &self.inner[amt..];
113 }
114}
115
/// Reads of several sizes through a `BufReader` with a 2-byte buffer, checking
/// after each one how many bytes arrived and what is still buffered.
#[test]
fn test_buffered_reader() {
    block_on(async {
        let data: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
        let mut reader = BufReader::with_capacity(2, data);

        // Destination wider than the buffer: all three bytes arrive, nothing stays buffered.
        let mut three = [0, 0, 0];
        assert_eq!(reader.read(&mut three).await.unwrap(), 3);
        assert_eq!(three, [5, 6, 7]);
        assert_eq!(reader.buffer(), []);

        // Destination exactly as wide as the buffer.
        let mut two = [0, 0];
        assert_eq!(reader.read(&mut two).await.unwrap(), 2);
        assert_eq!(two, [0, 1]);
        assert_eq!(reader.buffer(), []);

        // Destination narrower than the buffer: one byte is left behind in it.
        let mut one = [0];
        assert_eq!(reader.read(&mut one).await.unwrap(), 1);
        assert_eq!(one, [2]);
        assert_eq!(reader.buffer(), [3]);

        // Only the buffered byte is returned; the rest of the destination is untouched.
        let mut scratch = [0, 0, 0];
        assert_eq!(reader.read(&mut scratch).await.unwrap(), 1);
        assert_eq!(scratch, [3, 0, 0]);
        assert_eq!(reader.buffer(), []);

        // Last byte of the input.
        assert_eq!(reader.read(&mut scratch).await.unwrap(), 1);
        assert_eq!(scratch, [4, 0, 0]);
        assert_eq!(reader.buffer(), []);

        // Input exhausted.
        assert_eq!(reader.read(&mut scratch).await.unwrap(), 0);
    });
}
154
/// Absolute and relative `seek` calls on a 2-byte `BufReader`, interleaved with
/// `fill_buf` to observe which bytes are visible at each position.
#[test]
fn test_buffered_reader_seek() {
    block_on(async {
        let data: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
        let reader = BufReader::with_capacity(2, Cursor::new(data));
        pin_mut!(reader);

        let pos = reader.seek(SeekFrom::Start(3)).await.unwrap();
        assert_eq!(pos, 3);
        let window = reader.as_mut().fill_buf().await.unwrap();
        assert_eq!(window, &[0, 1][..]);

        // A seek that fails must leave the visible data unchanged.
        let underflow = reader.seek(SeekFrom::Current(i64::MIN)).await;
        assert!(underflow.is_err());
        let window = reader.as_mut().fill_buf().await.unwrap();
        assert_eq!(window, &[0, 1][..]);

        let pos = reader.seek(SeekFrom::Current(1)).await.unwrap();
        assert_eq!(pos, 4);
        let window = reader.as_mut().fill_buf().await.unwrap();
        assert_eq!(window, &[1, 2][..]);

        // Relative seeks are measured from the position after the consumed byte.
        reader.as_mut().consume(1);
        let pos = reader.seek(SeekFrom::Current(-2)).await.unwrap();
        assert_eq!(pos, 3);
    });
}
172
173#[test]
174fn test_buffered_reader_seek_relative() {
175 block_on(async {
176 let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
177 let reader = BufReader::with_capacity(2, Cursor::new(inner));
178 pin_mut!(reader);
179
180 assert!(reader.as_mut().seek_relative(3).await.is_ok());
181 assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
182 assert!(reader.as_mut().seek_relative(0).await.is_ok());
183 assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
184 assert!(reader.as_mut().seek_relative(1).await.is_ok());
185 assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[1][..]);
186 assert!(reader.as_mut().seek_relative(-1).await.is_ok());
187 assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[0, 1][..]);
188 assert!(reader.as_mut().seek_relative(2).await.is_ok());
189 assert_eq!(reader.as_mut().fill_buf().await.unwrap(), &[2, 3][..]);
190 });
191}
192
/// After the buffer has been consumed and a further read has taken the rest of
/// the input, a backwards `seek_relative` must still land on the right bytes.
#[test]
fn test_buffered_reader_invalidated_after_read() {
    block_on(async {
        let data: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
        let reader = BufReader::with_capacity(3, Cursor::new(data));
        pin_mut!(reader);

        let prefetched = reader.as_mut().fill_buf().await.unwrap();
        assert_eq!(prefetched, &[5, 6, 7][..]);
        reader.as_mut().consume(3);

        let mut rest = [0, 0, 0, 0, 0];
        let nread = reader.read(&mut rest).await.unwrap();
        assert_eq!(nread, 5);
        assert_eq!(rest, [0, 1, 2, 3, 4]);

        assert!(reader.as_mut().seek_relative(-2).await.is_ok());
        let mut tail = [0, 0];
        let nread = reader.read(&mut tail).await.unwrap();
        assert_eq!(nread, 2);
        assert_eq!(tail, [3, 4]);
    });
}
213
/// After the buffer has been consumed and a forward `seek` has moved to the end
/// of the input, a backwards `seek_relative` must still land on the right bytes.
#[test]
fn test_buffered_reader_invalidated_after_seek() {
    block_on(async {
        let data: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
        let reader = BufReader::with_capacity(3, Cursor::new(data));
        pin_mut!(reader);

        let prefetched = reader.as_mut().fill_buf().await.unwrap();
        assert_eq!(prefetched, &[5, 6, 7][..]);
        reader.as_mut().consume(3);

        let forward = reader.seek(SeekFrom::Current(5)).await;
        assert!(forward.is_ok());

        let backward = reader.as_mut().seek_relative(-2).await;
        assert!(backward.is_ok());

        let mut tail = [0, 0];
        let nread = reader.read(&mut tail).await.unwrap();
        assert_eq!(nread, 2);
        assert_eq!(tail, [3, 4]);
    });
}
232
/// Seeks near the ends of the `u64` position range on a reader whose position
/// arithmetic wraps, checking the positions reported by `BufReader::seek`.
#[test]
fn test_buffered_reader_seek_underflow() {
    /// Gimmick reader: each byte it produces is its current position modulo 256.
    struct PositionReader {
        pos: u64,
    }

    impl io::Read for PositionReader {
        /// Fills all of `buf`, advancing the position by one (wrapping) per byte.
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            for byte in buf.iter_mut() {
                *byte = self.pos as u8;
                self.pos = self.pos.wrapping_add(1);
            }
            Ok(buf.len())
        }
    }

    impl io::Seek for PositionReader {
        /// Never fails; relative offsets wrap around and the "end" is `u64::MAX`.
        fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
            self.pos = match pos {
                SeekFrom::Start(n) => n,
                SeekFrom::Current(n) => self.pos.wrapping_add(n as u64),
                SeekFrom::End(n) => u64::MAX.wrapping_add(n as u64),
            };
            Ok(self.pos)
        }
    }

    block_on(async {
        let source = AllowStdIo::new(PositionReader { pos: 0 });
        let reader = BufReader::with_capacity(5, source);
        pin_mut!(reader);

        let first = reader.as_mut().fill_buf().await.unwrap();
        assert_eq!(first, &[0, 1, 2, 3, 4][..]);

        let near_end = reader.seek(SeekFrom::End(-5)).await.unwrap();
        assert_eq!(near_end, u64::MAX - 5);
        assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);

        // the following seek will require two underlying seeks
        // (u64::MAX - 5) moved by i64::MIN is 2^63 - 6.
        let expected = 9_223_372_036_854_775_802;
        let landed = reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap();
        assert_eq!(landed, expected);
        assert_eq!(reader.as_mut().fill_buf().await.unwrap().len(), 5);

        // seeking to 0 should empty the buffer.
        let unchanged = reader.seek(SeekFrom::Current(0)).await.unwrap();
        assert_eq!(unchanged, expected);
        assert_eq!(reader.get_ref().get_ref().pos, expected);
    });
}
281
/// Checks that the byte counts reported by the inner reader, zeros included,
/// come back unchanged and in order from `BufReader::read`.
#[test]
fn test_short_reads() {
    /// A dummy reader intended at testing short-reads propagation.
    struct ShortReader {
        /// Byte counts still to be reported, front first.
        lengths: Vec<usize>,
    }

    impl io::Read for ShortReader {
        /// Reports the next scripted length without touching the buffer,
        /// or 0 once the script has run out.
        fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
            if self.lengths.is_empty() {
                return Ok(0);
            }
            Ok(self.lengths.remove(0))
        }
    }

    block_on(async {
        let script = vec![0, 1, 2, 0, 1, 0];
        let mut reader = BufReader::new(AllowStdIo::new(ShortReader { lengths: script }));
        let mut buf = [0, 0];

        // The scripted lengths, followed by one extra 0 after the script is exhausted.
        for &expected in [0usize, 1, 2, 0, 1, 0, 0].iter() {
            assert_eq!(reader.read(&mut buf).await.unwrap(), expected);
        }
    });
}
312
/// Same read sequence as `test_buffered_reader`, but over a `MaybePending`
/// source and driven by `run` instead of `block_on`.
#[test]
fn maybe_pending() {
    let data: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
    let mut reader = BufReader::with_capacity(2, MaybePending::new(data));

    // Destination wider than the buffer.
    let mut three = [0, 0, 0];
    assert_eq!(run(reader.read(&mut three)).unwrap(), 3);
    assert_eq!(three, [5, 6, 7]);
    assert_eq!(reader.buffer(), []);

    // Destination exactly as wide as the buffer.
    let mut two = [0, 0];
    assert_eq!(run(reader.read(&mut two)).unwrap(), 2);
    assert_eq!(two, [0, 1]);
    assert_eq!(reader.buffer(), []);

    // Destination narrower than the buffer: one byte is left behind in it.
    let mut one = [0];
    assert_eq!(run(reader.read(&mut one)).unwrap(), 1);
    assert_eq!(one, [2]);
    assert_eq!(reader.buffer(), [3]);

    // Only the buffered byte is returned; the rest of the destination is untouched.
    let mut scratch = [0, 0, 0];
    assert_eq!(run(reader.read(&mut scratch)).unwrap(), 1);
    assert_eq!(scratch, [3, 0, 0]);
    assert_eq!(reader.buffer(), []);

    // Last byte of the input.
    assert_eq!(run(reader.read(&mut scratch)).unwrap(), 1);
    assert_eq!(scratch, [4, 0, 0]);
    assert_eq!(reader.buffer(), []);

    // Input exhausted.
    assert_eq!(run(reader.read(&mut scratch)).unwrap(), 0);
}
349
/// Runs `read_until` with several delimiters over a `MaybePending` source and
/// checks the bytes collected by each call.
#[test]
fn maybe_pending_buf_read() {
    let source = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
    let mut reader = BufReader::with_capacity(2, source);

    // (delimiter, bytes expected in the output vector after the call)
    let cases: [(u8, &[u8]); 4] = [
        (3, &[0, 1, 2, 3]),
        (1, &[1]),
        (8, &[0]),
        (9, &[]),
    ];

    let mut collected = Vec::new();
    for &(delimiter, expected) in cases.iter() {
        // `read_until` appends, so start every case from an empty vector.
        collected.clear();
        run(reader.read_until(delimiter, &mut collected)).unwrap();
        assert_eq!(collected, expected);
    }
}
367
// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
/// Same seek sequence as `test_buffered_reader_seek`, but over a source whose
/// `poll_seek` returns `Poll::Pending` on every other call.
#[test]
fn maybe_pending_seek() {
    /// Cursor-backed source: reads and buffer access go straight to the inner
    /// cursor, while `poll_seek` alternates between seeking and pending.
    #[pin_project]
    struct MaybePendingSeek<'a> {
        #[pin]
        inner: Cursor<&'a [u8]>,
        /// When `true`, the next `poll_seek` call performs the seek.
        ready: bool,
    }

    impl<'a> MaybePendingSeek<'a> {
        /// Starts out ready, so the very first `poll_seek` reaches the cursor.
        fn new(inner: &'a [u8]) -> Self {
            let inner = Cursor::new(inner);
            Self { inner, ready: true }
        }
    }

    impl AsyncRead for MaybePendingSeek<'_> {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            let this = self.project();
            this.inner.poll_read(cx, buf)
        }
    }

    impl AsyncBufRead for MaybePendingSeek<'_> {
        fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
            let this = self.project();
            this.inner.poll_fill_buf(cx)
        }

        fn consume(self: Pin<&mut Self>, amt: usize) {
            let this = self.project();
            this.inner.consume(amt)
        }
    }

    impl AsyncSeek for MaybePendingSeek<'_> {
        fn poll_seek(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            pos: SeekFrom,
        ) -> Poll<io::Result<u64>> {
            let this = self.project();
            if *this.ready {
                // Seek now; the following call will report pending.
                *this.ready = false;
                this.inner.poll_seek(cx, pos)
            } else {
                // Report pending now; the following call will seek.
                *this.ready = true;
                Poll::Pending
            }
        }
    }

    let data: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
    let reader = BufReader::with_capacity(2, MaybePendingSeek::new(data));
    pin_mut!(reader);

    let pos = run(reader.seek(SeekFrom::Current(3))).ok();
    assert_eq!(pos, Some(3));
    let window = run(reader.as_mut().fill_buf()).ok();
    assert_eq!(window, Some(&[0, 1][..]));

    // A seek that fails must leave the visible data unchanged.
    let underflow = run(reader.seek(SeekFrom::Current(i64::MIN))).ok();
    assert_eq!(underflow, None);
    let window = run(reader.as_mut().fill_buf()).ok();
    assert_eq!(window, Some(&[0, 1][..]));

    let pos = run(reader.seek(SeekFrom::Current(1))).ok();
    assert_eq!(pos, Some(4));
    let window = run(reader.as_mut().fill_buf()).ok();
    assert_eq!(window, Some(&[1, 2][..]));

    Pin::new(&mut reader).consume(1);
    let pos = run(reader.seek(SeekFrom::Current(-2))).ok();
    assert_eq!(pos, Some(3));
}
433