1#![warn(rust_2018_idioms)]
2#![cfg(feature = "full")]
3
4// https://github.com/rust-lang/futures-rs/blob/1803948ff091b4eabf7f3bf39e16bbbdefca5cc8/futures/tests/io_buf_reader.rs
5
6use futures::task::{noop_waker_ref, Context, Poll};
7use std::cmp;
8use std::io::{self, Cursor};
9use std::pin::Pin;
10use tokio::io::{
11 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWriteExt,
12 BufReader, ReadBuf, SeekFrom,
13};
14use tokio_test::task::spawn;
15use tokio_test::{assert_pending, assert_ready};
16
/// Repeatedly polls `poll_fill_buf` on `$reader` with a no-op waker and
/// evaluates to the first `Poll::Ready` payload.
///
/// `$reader` is expanded inside the loop, so it is re-evaluated on every
/// iteration; pass a plain variable. The loop spins without yielding while the
/// reader reports `Poll::Pending`.
macro_rules! run_fill_buf {
    ($reader:expr) => {{
        let mut cx = Context::from_waker(noop_waker_ref());
        loop {
            match Pin::new(&mut $reader).poll_fill_buf(&mut cx) {
                Poll::Ready(filled) => break filled,
                // Not ready yet: poll again straight away.
                Poll::Pending => {}
            }
        }
    }};
}
27
/// Test reader over a byte slice whose `poll_read` and `poll_fill_buf` each
/// alternate between `Poll::Pending` and a ready result.
///
/// The two operations keep separate toggles, so polling one does not affect
/// the readiness of the other.
struct MaybePending<'a> {
    /// Bytes that have not been consumed yet.
    inner: &'a [u8],
    /// `true` when the next `poll_read` should produce a ready result.
    ready_read: bool,
    /// `true` when the next `poll_fill_buf` should produce a ready result.
    ready_fill_buf: bool,
}

impl<'a> MaybePending<'a> {
    /// Wraps `inner` with both toggles cleared, so the first poll of either
    /// kind reports `Poll::Pending`.
    fn new(inner: &'a [u8]) -> Self {
        MaybePending {
            ready_fill_buf: false,
            ready_read: false,
            inner,
        }
    }
}
43
44impl AsyncRead for MaybePending<'_> {
45 fn poll_read(
46 mut self: Pin<&mut Self>,
47 cx: &mut Context<'_>,
48 buf: &mut ReadBuf<'_>,
49 ) -> Poll<io::Result<()>> {
50 if self.ready_read {
51 self.ready_read = false;
52 Pin::new(&mut self.inner).poll_read(cx, buf)
53 } else {
54 self.ready_read = true;
55 cx.waker().wake_by_ref();
56 Poll::Pending
57 }
58 }
59}
60
61impl AsyncBufRead for MaybePending<'_> {
62 fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
63 if self.ready_fill_buf {
64 self.ready_fill_buf = false;
65 if self.inner.is_empty() {
66 return Poll::Ready(Ok(&[]));
67 }
68 let len = cmp::min(2, self.inner.len());
69 Poll::Ready(Ok(&self.inner[0..len]))
70 } else {
71 self.ready_fill_buf = true;
72 Poll::Pending
73 }
74 }
75
76 fn consume(mut self: Pin<&mut Self>, amt: usize) {
77 self.inner = &self.inner[amt..];
78 }
79}
80
81#[tokio::test]
82async fn test_buffered_reader() {
83 let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
84 let mut reader = BufReader::with_capacity(2, inner);
85
86 let mut buf = [0, 0, 0];
87 let nread = reader.read(&mut buf).await.unwrap();
88 assert_eq!(nread, 3);
89 assert_eq!(buf, [5, 6, 7]);
90 assert_eq!(reader.buffer(), []);
91
92 let mut buf = [0, 0];
93 let nread = reader.read(&mut buf).await.unwrap();
94 assert_eq!(nread, 2);
95 assert_eq!(buf, [0, 1]);
96 assert_eq!(reader.buffer(), []);
97
98 let mut buf = [0];
99 let nread = reader.read(&mut buf).await.unwrap();
100 assert_eq!(nread, 1);
101 assert_eq!(buf, [2]);
102 assert_eq!(reader.buffer(), [3]);
103
104 let mut buf = [0, 0, 0];
105 let nread = reader.read(&mut buf).await.unwrap();
106 assert_eq!(nread, 1);
107 assert_eq!(buf, [3, 0, 0]);
108 assert_eq!(reader.buffer(), []);
109
110 let nread = reader.read(&mut buf).await.unwrap();
111 assert_eq!(nread, 1);
112 assert_eq!(buf, [4, 0, 0]);
113 assert_eq!(reader.buffer(), []);
114
115 assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
116}
117
118#[tokio::test]
119async fn test_buffered_reader_seek() {
120 let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
121 let mut reader = BufReader::with_capacity(2, Cursor::new(inner));
122
123 assert_eq!(reader.seek(SeekFrom::Start(3)).await.unwrap(), 3);
124 assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
125 assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err());
126 assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
127 assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
128 assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]);
129 Pin::new(&mut reader).consume(1);
130 assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
131}
132
133#[tokio::test]
134async fn test_buffered_reader_seek_underflow() {
135 // gimmick reader that yields its position modulo 256 for each byte
136 struct PositionReader {
137 pos: u64,
138 }
139 impl AsyncRead for PositionReader {
140 fn poll_read(
141 mut self: Pin<&mut Self>,
142 _: &mut Context<'_>,
143 buf: &mut ReadBuf<'_>,
144 ) -> Poll<io::Result<()>> {
145 let b = buf.initialize_unfilled();
146 let len = b.len();
147 for x in b {
148 *x = self.pos as u8;
149 self.pos = self.pos.wrapping_add(1);
150 }
151 buf.advance(len);
152 Poll::Ready(Ok(()))
153 }
154 }
155 impl AsyncSeek for PositionReader {
156 fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
157 match pos {
158 SeekFrom::Start(n) => {
159 self.pos = n;
160 }
161 SeekFrom::Current(n) => {
162 self.pos = self.pos.wrapping_add(n as u64);
163 }
164 SeekFrom::End(n) => {
165 self.pos = u64::MAX.wrapping_add(n as u64);
166 }
167 }
168 Ok(())
169 }
170 fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> {
171 Poll::Ready(Ok(self.pos))
172 }
173 }
174
175 let mut reader = BufReader::with_capacity(5, PositionReader { pos: 0 });
176 assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1, 2, 3, 4][..]);
177 assert_eq!(reader.seek(SeekFrom::End(-5)).await.unwrap(), u64::MAX - 5);
178 assert_eq!(run_fill_buf!(reader).unwrap().len(), 5);
179 // the following seek will require two underlying seeks
180 let expected = 9_223_372_036_854_775_802;
181 assert_eq!(
182 reader.seek(SeekFrom::Current(i64::MIN)).await.unwrap(),
183 expected
184 );
185 assert_eq!(run_fill_buf!(reader).unwrap().len(), 5);
186 // seeking to 0 should empty the buffer.
187 assert_eq!(reader.seek(SeekFrom::Current(0)).await.unwrap(), expected);
188 assert_eq!(reader.get_ref().pos, expected);
189}
190
191#[tokio::test]
192async fn test_short_reads() {
193 /// A dummy reader intended at testing short-reads propagation.
194 struct ShortReader {
195 lengths: Vec<usize>,
196 }
197
198 impl AsyncRead for ShortReader {
199 fn poll_read(
200 mut self: Pin<&mut Self>,
201 _: &mut Context<'_>,
202 buf: &mut ReadBuf<'_>,
203 ) -> Poll<io::Result<()>> {
204 if !self.lengths.is_empty() {
205 buf.advance(self.lengths.remove(0));
206 }
207 Poll::Ready(Ok(()))
208 }
209 }
210
211 let inner = ShortReader {
212 lengths: vec![0, 1, 2, 0, 1, 0],
213 };
214 let mut reader = BufReader::new(inner);
215 let mut buf = [0, 0];
216 assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
217 assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
218 assert_eq!(reader.read(&mut buf).await.unwrap(), 2);
219 assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
220 assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
221 assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
222 assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
223}
224
225#[tokio::test]
226async fn maybe_pending() {
227 let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
228 let mut reader = BufReader::with_capacity(2, MaybePending::new(inner));
229
230 let mut buf = [0, 0, 0];
231 let nread = reader.read(&mut buf).await.unwrap();
232 assert_eq!(nread, 3);
233 assert_eq!(buf, [5, 6, 7]);
234 assert_eq!(reader.buffer(), []);
235
236 let mut buf = [0, 0];
237 let nread = reader.read(&mut buf).await.unwrap();
238 assert_eq!(nread, 2);
239 assert_eq!(buf, [0, 1]);
240 assert_eq!(reader.buffer(), []);
241
242 let mut buf = [0];
243 let nread = reader.read(&mut buf).await.unwrap();
244 assert_eq!(nread, 1);
245 assert_eq!(buf, [2]);
246 assert_eq!(reader.buffer(), [3]);
247
248 let mut buf = [0, 0, 0];
249 let nread = reader.read(&mut buf).await.unwrap();
250 assert_eq!(nread, 1);
251 assert_eq!(buf, [3, 0, 0]);
252 assert_eq!(reader.buffer(), []);
253
254 let nread = reader.read(&mut buf).await.unwrap();
255 assert_eq!(nread, 1);
256 assert_eq!(buf, [4, 0, 0]);
257 assert_eq!(reader.buffer(), []);
258
259 assert_eq!(reader.read(&mut buf).await.unwrap(), 0);
260}
261
262#[tokio::test]
263async fn maybe_pending_buf_read() {
264 let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]);
265 let mut reader = BufReader::with_capacity(2, inner);
266 let mut v = Vec::new();
267 reader.read_until(3, &mut v).await.unwrap();
268 assert_eq!(v, [0, 1, 2, 3]);
269 v.clear();
270 reader.read_until(1, &mut v).await.unwrap();
271 assert_eq!(v, [1]);
272 v.clear();
273 reader.read_until(8, &mut v).await.unwrap();
274 assert_eq!(v, [0]);
275 v.clear();
276 reader.read_until(9, &mut v).await.unwrap();
277 assert_eq!(v, []);
278}
279
280// https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309
281#[tokio::test]
282async fn maybe_pending_seek() {
283 struct MaybePendingSeek<'a> {
284 inner: Cursor<&'a [u8]>,
285 ready: bool,
286 seek_res: Option<io::Result<()>>,
287 }
288
289 impl<'a> MaybePendingSeek<'a> {
290 fn new(inner: &'a [u8]) -> Self {
291 Self {
292 inner: Cursor::new(inner),
293 ready: true,
294 seek_res: None,
295 }
296 }
297 }
298
299 impl AsyncRead for MaybePendingSeek<'_> {
300 fn poll_read(
301 mut self: Pin<&mut Self>,
302 cx: &mut Context<'_>,
303 buf: &mut ReadBuf<'_>,
304 ) -> Poll<io::Result<()>> {
305 Pin::new(&mut self.inner).poll_read(cx, buf)
306 }
307 }
308
309 impl AsyncBufRead for MaybePendingSeek<'_> {
310 fn poll_fill_buf(
311 mut self: Pin<&mut Self>,
312 cx: &mut Context<'_>,
313 ) -> Poll<io::Result<&[u8]>> {
314 let this: *mut Self = &mut *self as *mut _;
315 Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx)
316 }
317
318 fn consume(mut self: Pin<&mut Self>, amt: usize) {
319 Pin::new(&mut self.inner).consume(amt)
320 }
321 }
322
323 impl AsyncSeek for MaybePendingSeek<'_> {
324 fn start_seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> io::Result<()> {
325 self.seek_res = Some(Pin::new(&mut self.inner).start_seek(pos));
326 Ok(())
327 }
328 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
329 if self.ready {
330 self.ready = false;
331 self.seek_res.take().unwrap_or(Ok(()))?;
332 Pin::new(&mut self.inner).poll_complete(cx)
333 } else {
334 self.ready = true;
335 cx.waker().wake_by_ref();
336 Poll::Pending
337 }
338 }
339 }
340
341 let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4];
342 let mut reader = BufReader::with_capacity(2, MaybePendingSeek::new(inner));
343
344 assert_eq!(reader.seek(SeekFrom::Current(3)).await.unwrap(), 3);
345 assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
346 assert!(reader.seek(SeekFrom::Current(i64::MIN)).await.is_err());
347 assert_eq!(run_fill_buf!(reader).unwrap(), &[0, 1][..]);
348 assert_eq!(reader.seek(SeekFrom::Current(1)).await.unwrap(), 4);
349 assert_eq!(run_fill_buf!(reader).unwrap(), &[1, 2][..]);
350 Pin::new(&mut reader).consume(1);
351 assert_eq!(reader.seek(SeekFrom::Current(-2)).await.unwrap(), 3);
352}
353
354// This tests the AsyncBufReadExt::fill_buf wrapper.
355#[tokio::test]
356async fn test_fill_buf_wrapper() {
357 let (mut write, read) = tokio::io::duplex(16);
358
359 let mut read = BufReader::new(read);
360 write.write_all(b"hello world").await.unwrap();
361
362 assert_eq!(read.fill_buf().await.unwrap(), b"hello world");
363 read.consume(b"hello ".len());
364 assert_eq!(read.fill_buf().await.unwrap(), b"world");
365 assert_eq!(read.fill_buf().await.unwrap(), b"world");
366 read.consume(b"world".len());
367
368 let mut fill = spawn(read.fill_buf());
369 assert_pending!(fill.poll());
370
371 write.write_all(b"foo bar").await.unwrap();
372 assert_eq!(assert_ready!(fill.poll()).unwrap(), b"foo bar");
373 drop(fill);
374
375 drop(write);
376 assert_eq!(read.fill_buf().await.unwrap(), b"foo bar");
377 read.consume(b"foo bar".len());
378 assert_eq!(read.fill_buf().await.unwrap(), b"");
379}
380