1//! Asynchronous I/O.
2//!
3//! This module is the asynchronous version of `std::io`. It defines four
4//! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
5//! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
6//! standard library. However, these traits integrate with the asynchronous
7//! task system, so that if an I/O object isn't ready for reading (or writing),
8//! the thread is not blocked, and instead the current task is queued to be
9//! woken when I/O is ready.
10//!
11//! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
12//! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
13//! for operating with asynchronous I/O objects, including ways to work with
14//! them using futures, streams and sinks.
15//!
16//! This module is only available when the `std` feature of this
17//! library is activated, and it is activated by default.
18
19#[cfg(feature = "io-compat")]
20#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
21use crate::compat::Compat;
22use crate::future::assert_future;
23use crate::stream::assert_stream;
24use std::{pin::Pin, ptr};
25
26// Re-export some types from `std::io` so that users don't have to deal
27// with conflicts when `use`ing `futures::io` and `std::io`.
28#[doc(no_inline)]
29pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
30
31pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
32
33// used by `BufReader` and `BufWriter`
34// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
35const DEFAULT_BUF_SIZE: usize = 8 * 1024;
36
37/// Initializes a buffer if necessary.
38///
39/// A buffer is currently always initialized.
40#[inline]
41unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
42 ptr::write_bytes(dst:buf.as_mut_ptr(), val:0, count:buf.len())
43}
44
45mod allow_std;
46pub use self::allow_std::AllowStdIo;
47
48mod buf_reader;
49pub use self::buf_reader::{BufReader, SeeKRelative};
50
51mod buf_writer;
52pub use self::buf_writer::BufWriter;
53
54mod line_writer;
55pub use self::line_writer::LineWriter;
56
57mod chain;
58pub use self::chain::Chain;
59
60mod close;
61pub use self::close::Close;
62
63mod copy;
64pub use self::copy::{copy, Copy};
65
66mod copy_buf;
67pub use self::copy_buf::{copy_buf, CopyBuf};
68
69mod copy_buf_abortable;
70pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable};
71
72mod cursor;
73pub use self::cursor::Cursor;
74
75mod empty;
76pub use self::empty::{empty, Empty};
77
78mod fill_buf;
79pub use self::fill_buf::FillBuf;
80
81mod flush;
82pub use self::flush::Flush;
83
84#[cfg(feature = "sink")]
85#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
86mod into_sink;
87#[cfg(feature = "sink")]
88#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
89pub use self::into_sink::IntoSink;
90
91mod lines;
92pub use self::lines::Lines;
93
94mod read;
95pub use self::read::Read;
96
97mod read_vectored;
98pub use self::read_vectored::ReadVectored;
99
100mod read_exact;
101pub use self::read_exact::ReadExact;
102
103mod read_line;
104pub use self::read_line::ReadLine;
105
106mod read_to_end;
107pub use self::read_to_end::ReadToEnd;
108
109mod read_to_string;
110pub use self::read_to_string::ReadToString;
111
112mod read_until;
113pub use self::read_until::ReadUntil;
114
115mod repeat;
116pub use self::repeat::{repeat, Repeat};
117
118mod seek;
119pub use self::seek::Seek;
120
121mod sink;
122pub use self::sink::{sink, Sink};
123
124mod split;
125pub use self::split::{ReadHalf, ReuniteError, WriteHalf};
126
127mod take;
128pub use self::take::Take;
129
130mod window;
131pub use self::window::Window;
132
133mod write;
134pub use self::write::Write;
135
136mod write_vectored;
137pub use self::write_vectored::WriteVectored;
138
139mod write_all;
140pub use self::write_all::WriteAll;
141
142#[cfg(feature = "write-all-vectored")]
143mod write_all_vectored;
144#[cfg(feature = "write-all-vectored")]
145pub use self::write_all_vectored::WriteAllVectored;
146
147/// An extension trait which adds utility methods to `AsyncRead` types.
148pub trait AsyncReadExt: AsyncRead {
149 /// Creates an adaptor which will chain this stream with another.
150 ///
151 /// The returned `AsyncRead` instance will first read all bytes from this object
152 /// until EOF is encountered. Afterwards the output is equivalent to the
153 /// output of `next`.
154 ///
155 /// # Examples
156 ///
157 /// ```
158 /// # futures::executor::block_on(async {
159 /// use futures::io::{AsyncReadExt, Cursor};
160 ///
161 /// let reader1 = Cursor::new([1, 2, 3, 4]);
162 /// let reader2 = Cursor::new([5, 6, 7, 8]);
163 ///
164 /// let mut reader = reader1.chain(reader2);
165 /// let mut buffer = Vec::new();
166 ///
167 /// // read the value into a Vec.
168 /// reader.read_to_end(&mut buffer).await?;
169 /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
170 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
171 /// ```
172 fn chain<R>(self, next: R) -> Chain<Self, R>
173 where
174 Self: Sized,
175 R: AsyncRead,
176 {
177 assert_read(Chain::new(self, next))
178 }
179
180 /// Tries to read some bytes directly into the given `buf` in asynchronous
181 /// manner, returning a future type.
182 ///
183 /// The returned future will resolve to the number of bytes read once the read
184 /// operation is completed.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// # futures::executor::block_on(async {
190 /// use futures::io::{AsyncReadExt, Cursor};
191 ///
192 /// let mut reader = Cursor::new([1, 2, 3, 4]);
193 /// let mut output = [0u8; 5];
194 ///
195 /// let bytes = reader.read(&mut output[..]).await?;
196 ///
197 /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous
198 /// // reader. In a real system you could get anywhere from 1 to
199 /// // `output.len()` bytes in a single read.
200 /// assert_eq!(bytes, 4);
201 /// assert_eq!(output, [1, 2, 3, 4, 0]);
202 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
203 /// ```
204 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
205 where
206 Self: Unpin,
207 {
208 assert_future::<Result<usize>, _>(Read::new(self, buf))
209 }
210
211 /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
212 /// IO operations.
213 ///
214 /// The returned future will resolve to the number of bytes read once the read
215 /// operation is completed.
216 fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
217 where
218 Self: Unpin,
219 {
220 assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
221 }
222
223 /// Creates a future which will read exactly enough bytes to fill `buf`,
224 /// returning an error if end of file (EOF) is hit sooner.
225 ///
226 /// The returned future will resolve once the read operation is completed.
227 ///
228 /// In the case of an error the buffer and the object will be discarded, with
229 /// the error yielded.
230 ///
231 /// # Examples
232 ///
233 /// ```
234 /// # futures::executor::block_on(async {
235 /// use futures::io::{AsyncReadExt, Cursor};
236 ///
237 /// let mut reader = Cursor::new([1, 2, 3, 4]);
238 /// let mut output = [0u8; 4];
239 ///
240 /// reader.read_exact(&mut output).await?;
241 ///
242 /// assert_eq!(output, [1, 2, 3, 4]);
243 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
244 /// ```
245 ///
246 /// ## EOF is hit before `buf` is filled
247 ///
248 /// ```
249 /// # futures::executor::block_on(async {
250 /// use futures::io::{self, AsyncReadExt, Cursor};
251 ///
252 /// let mut reader = Cursor::new([1, 2, 3, 4]);
253 /// let mut output = [0u8; 5];
254 ///
255 /// let result = reader.read_exact(&mut output).await;
256 ///
257 /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
258 /// # });
259 /// ```
260 fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
261 where
262 Self: Unpin,
263 {
264 assert_future::<Result<()>, _>(ReadExact::new(self, buf))
265 }
266
267 /// Creates a future which will read all the bytes from this `AsyncRead`.
268 ///
269 /// On success the total number of bytes read is returned.
270 ///
271 /// # Examples
272 ///
273 /// ```
274 /// # futures::executor::block_on(async {
275 /// use futures::io::{AsyncReadExt, Cursor};
276 ///
277 /// let mut reader = Cursor::new([1, 2, 3, 4]);
278 /// let mut output = Vec::with_capacity(4);
279 ///
280 /// let bytes = reader.read_to_end(&mut output).await?;
281 ///
282 /// assert_eq!(bytes, 4);
283 /// assert_eq!(output, vec![1, 2, 3, 4]);
284 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
285 /// ```
286 fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
287 where
288 Self: Unpin,
289 {
290 assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
291 }
292
293 /// Creates a future which will read all the bytes from this `AsyncRead`.
294 ///
295 /// On success the total number of bytes read is returned.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// # futures::executor::block_on(async {
301 /// use futures::io::{AsyncReadExt, Cursor};
302 ///
303 /// let mut reader = Cursor::new(&b"1234"[..]);
304 /// let mut buffer = String::with_capacity(4);
305 ///
306 /// let bytes = reader.read_to_string(&mut buffer).await?;
307 ///
308 /// assert_eq!(bytes, 4);
309 /// assert_eq!(buffer, String::from("1234"));
310 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
311 /// ```
312 fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self>
313 where
314 Self: Unpin,
315 {
316 assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
317 }
318
319 /// Helper method for splitting this read/write object into two halves.
320 ///
321 /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
322 /// traits, respectively.
323 ///
324 /// # Examples
325 ///
326 /// ```
327 /// # futures::executor::block_on(async {
328 /// use futures::io::{self, AsyncReadExt, Cursor};
329 ///
330 /// // Note that for `Cursor` the read and write halves share a single
331 /// // seek position. This may or may not be true for other types that
332 /// // implement both `AsyncRead` and `AsyncWrite`.
333 ///
334 /// let reader = Cursor::new([1, 2, 3, 4]);
335 /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
336 /// let mut writer = Cursor::new(vec![0u8; 5]);
337 ///
338 /// {
339 /// let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
340 /// io::copy(reader, &mut buffer_writer).await?;
341 /// io::copy(buffer_reader, &mut writer).await?;
342 /// }
343 ///
344 /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
345 /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
346 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
347 /// ```
348 fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
349 where
350 Self: AsyncWrite + Sized,
351 {
352 let (r, w) = split::split(self);
353 (assert_read(r), assert_write(w))
354 }
355
356 /// Creates an AsyncRead adapter which will read at most `limit` bytes
357 /// from the underlying reader.
358 ///
359 /// # Examples
360 ///
361 /// ```
362 /// # futures::executor::block_on(async {
363 /// use futures::io::{AsyncReadExt, Cursor};
364 ///
365 /// let reader = Cursor::new(&b"12345678"[..]);
366 /// let mut buffer = [0; 5];
367 ///
368 /// let mut take = reader.take(4);
369 /// let n = take.read(&mut buffer).await?;
370 ///
371 /// assert_eq!(n, 4);
372 /// assert_eq!(&buffer, b"1234\0");
373 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
374 /// ```
375 fn take(self, limit: u64) -> Take<Self>
376 where
377 Self: Sized,
378 {
379 assert_read(Take::new(self, limit))
380 }
381
382 /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
383 /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
384 /// implements [`AsyncWrite`] as well, the result will also implement the
385 /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
386 ///
387 /// Requires the `io-compat` feature to enable.
388 #[cfg(feature = "io-compat")]
389 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
390 fn compat(self) -> Compat<Self>
391 where
392 Self: Sized + Unpin,
393 {
394 Compat::new(self)
395 }
396}
397
398impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
399
400/// An extension trait which adds utility methods to `AsyncWrite` types.
401pub trait AsyncWriteExt: AsyncWrite {
402 /// Creates a future which will entirely flush this `AsyncWrite`.
403 ///
404 /// # Examples
405 ///
406 /// ```
407 /// # futures::executor::block_on(async {
408 /// use futures::io::{AllowStdIo, AsyncWriteExt};
409 /// use std::io::{BufWriter, Cursor};
410 ///
411 /// let mut output = vec![0u8; 5];
412 ///
413 /// {
414 /// let writer = Cursor::new(&mut output);
415 /// let mut buffered = AllowStdIo::new(BufWriter::new(writer));
416 /// buffered.write_all(&[1, 2]).await?;
417 /// buffered.write_all(&[3, 4]).await?;
418 /// buffered.flush().await?;
419 /// }
420 ///
421 /// assert_eq!(output, [1, 2, 3, 4, 0]);
422 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
423 /// ```
424 fn flush(&mut self) -> Flush<'_, Self>
425 where
426 Self: Unpin,
427 {
428 assert_future::<Result<()>, _>(Flush::new(self))
429 }
430
431 /// Creates a future which will entirely close this `AsyncWrite`.
432 fn close(&mut self) -> Close<'_, Self>
433 where
434 Self: Unpin,
435 {
436 assert_future::<Result<()>, _>(Close::new(self))
437 }
438
439 /// Creates a future which will write bytes from `buf` into the object.
440 ///
441 /// The returned future will resolve to the number of bytes written once the write
442 /// operation is completed.
443 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
444 where
445 Self: Unpin,
446 {
447 assert_future::<Result<usize>, _>(Write::new(self, buf))
448 }
449
450 /// Creates a future which will write bytes from `bufs` into the object using vectored
451 /// IO operations.
452 ///
453 /// The returned future will resolve to the number of bytes written once the write
454 /// operation is completed.
455 fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
456 where
457 Self: Unpin,
458 {
459 assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
460 }
461
462 /// Write data into this object.
463 ///
464 /// Creates a future that will write the entire contents of the buffer `buf` into
465 /// this `AsyncWrite`.
466 ///
467 /// The returned future will not complete until all the data has been written.
468 ///
469 /// # Examples
470 ///
471 /// ```
472 /// # futures::executor::block_on(async {
473 /// use futures::io::{AsyncWriteExt, Cursor};
474 ///
475 /// let mut writer = Cursor::new(vec![0u8; 5]);
476 ///
477 /// writer.write_all(&[1, 2, 3, 4]).await?;
478 ///
479 /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
480 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
481 /// ```
482 fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
483 where
484 Self: Unpin,
485 {
486 assert_future::<Result<()>, _>(WriteAll::new(self, buf))
487 }
488
489 /// Attempts to write multiple buffers into this writer.
490 ///
491 /// Creates a future that will write the entire contents of `bufs` into this
492 /// `AsyncWrite` using [vectored writes].
493 ///
494 /// The returned future will not complete until all the data has been
495 /// written.
496 ///
497 /// [vectored writes]: std::io::Write::write_vectored
498 ///
499 /// # Notes
500 ///
501 /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
502 /// a slice of `IoSlice`s, not an immutable one. That's because we need to
503 /// modify the slice to keep track of the bytes already written.
504 ///
505 /// Once this futures returns, the contents of `bufs` are unspecified, as
506 /// this depends on how many calls to `write_vectored` were necessary. It is
507 /// best to understand this function as taking ownership of `bufs` and to
508 /// not use `bufs` afterwards. The underlying buffers, to which the
509 /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
510 /// can be reused.
511 ///
512 /// # Examples
513 ///
514 /// ```
515 /// # futures::executor::block_on(async {
516 /// use futures::io::AsyncWriteExt;
517 /// use futures_util::io::Cursor;
518 /// use std::io::IoSlice;
519 ///
520 /// let mut writer = Cursor::new(Vec::new());
521 /// let bufs = &mut [
522 /// IoSlice::new(&[1]),
523 /// IoSlice::new(&[2, 3]),
524 /// IoSlice::new(&[4, 5, 6]),
525 /// ];
526 ///
527 /// writer.write_all_vectored(bufs).await?;
528 /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
529 ///
530 /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
531 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
532 /// ```
533 #[cfg(feature = "write-all-vectored")]
534 fn write_all_vectored<'a>(
535 &'a mut self,
536 bufs: &'a mut [IoSlice<'a>],
537 ) -> WriteAllVectored<'a, Self>
538 where
539 Self: Unpin,
540 {
541 assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
542 }
543
544 /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
545 /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
546 /// Requires the `io-compat` feature to enable.
547 #[cfg(feature = "io-compat")]
548 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
549 fn compat_write(self) -> Compat<Self>
550 where
551 Self: Sized + Unpin,
552 {
553 Compat::new(self)
554 }
555
556 /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
557 ///
558 /// This adapter produces a sink that will write each value passed to it
559 /// into the underlying writer.
560 ///
561 /// Note that this function consumes the given writer, returning a wrapped
562 /// version.
563 ///
564 /// # Examples
565 ///
566 /// ```
567 /// # futures::executor::block_on(async {
568 /// use futures::io::AsyncWriteExt;
569 /// use futures::stream::{self, StreamExt};
570 ///
571 /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
572 ///
573 /// let mut writer = vec![];
574 ///
575 /// stream.forward((&mut writer).into_sink()).await?;
576 ///
577 /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
578 /// # Ok::<(), Box<dyn std::error::Error>>(())
579 /// # })?;
580 /// # Ok::<(), Box<dyn std::error::Error>>(())
581 /// ```
582 #[cfg(feature = "sink")]
583 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
584 fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
585 where
586 Self: Sized,
587 {
588 crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
589 }
590}
591
592impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
593
594/// An extension trait which adds utility methods to `AsyncSeek` types.
595pub trait AsyncSeekExt: AsyncSeek {
596 /// Creates a future which will seek an IO object, and then yield the
597 /// new position in the object and the object itself.
598 ///
599 /// In the case of an error the buffer and the object will be discarded, with
600 /// the error yielded.
601 fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
602 where
603 Self: Unpin,
604 {
605 assert_future::<Result<u64>, _>(Seek::new(self, pos))
606 }
607
608 /// Creates a future which will return the current seek position from the
609 /// start of the stream.
610 ///
611 /// This is equivalent to `self.seek(SeekFrom::Current(0))`.
612 fn stream_position(&mut self) -> Seek<'_, Self>
613 where
614 Self: Unpin,
615 {
616 self.seek(pos:SeekFrom::Current(0))
617 }
618}
619
620impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
621
622/// An extension trait which adds utility methods to `AsyncBufRead` types.
623pub trait AsyncBufReadExt: AsyncBufRead {
624 /// Creates a future which will wait for a non-empty buffer to be available from this I/O
625 /// object or EOF to be reached.
626 ///
627 /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
628 ///
629 /// ```rust
630 /// # futures::executor::block_on(async {
631 /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
632 ///
633 /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
634 ///
635 /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
636 /// stream.consume_unpin(2);
637 ///
638 /// assert_eq!(stream.fill_buf().await?, vec![3]);
639 /// stream.consume_unpin(1);
640 ///
641 /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
642 /// stream.consume_unpin(3);
643 ///
644 /// assert_eq!(stream.fill_buf().await?, vec![]);
645 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
646 /// ```
647 fn fill_buf(&mut self) -> FillBuf<'_, Self>
648 where
649 Self: Unpin,
650 {
651 assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
652 }
653
654 /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
655 ///
656 /// ```rust
657 /// # futures::executor::block_on(async {
658 /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
659 ///
660 /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
661 ///
662 /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
663 /// stream.consume_unpin(2);
664 ///
665 /// assert_eq!(stream.fill_buf().await?, vec![3]);
666 /// stream.consume_unpin(1);
667 ///
668 /// assert_eq!(stream.fill_buf().await?, vec![]);
669 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
670 /// ```
671 fn consume_unpin(&mut self, amt: usize)
672 where
673 Self: Unpin,
674 {
675 Pin::new(self).consume(amt)
676 }
677
678 /// Creates a future which will read all the bytes associated with this I/O
679 /// object into `buf` until the delimiter `byte` or EOF is reached.
680 /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
681 ///
682 /// This function will read bytes from the underlying stream until the
683 /// delimiter or EOF is found. Once found, all bytes up to, and including,
684 /// the delimiter (if found) will be appended to `buf`.
685 ///
686 /// The returned future will resolve to the number of bytes read once the read
687 /// operation is completed.
688 ///
689 /// In the case of an error the buffer and the object will be discarded, with
690 /// the error yielded.
691 ///
692 /// # Examples
693 ///
694 /// ```
695 /// # futures::executor::block_on(async {
696 /// use futures::io::{AsyncBufReadExt, Cursor};
697 ///
698 /// let mut cursor = Cursor::new(b"lorem-ipsum");
699 /// let mut buf = vec![];
700 ///
701 /// // cursor is at 'l'
702 /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
703 /// assert_eq!(num_bytes, 6);
704 /// assert_eq!(buf, b"lorem-");
705 /// buf.clear();
706 ///
707 /// // cursor is at 'i'
708 /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
709 /// assert_eq!(num_bytes, 5);
710 /// assert_eq!(buf, b"ipsum");
711 /// buf.clear();
712 ///
713 /// // cursor is at EOF
714 /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
715 /// assert_eq!(num_bytes, 0);
716 /// assert_eq!(buf, b"");
717 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
718 /// ```
719 fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
720 where
721 Self: Unpin,
722 {
723 assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
724 }
725
726 /// Creates a future which will read all the bytes associated with this I/O
727 /// object into `buf` until a newline (the 0xA byte) or EOF is reached,
728 /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
729 ///
730 /// This function will read bytes from the underlying stream until the
731 /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
732 /// up to, and including, the delimiter (if found) will be appended to
733 /// `buf`.
734 ///
735 /// The returned future will resolve to the number of bytes read once the read
736 /// operation is completed.
737 ///
738 /// In the case of an error the buffer and the object will be discarded, with
739 /// the error yielded.
740 ///
741 /// # Errors
742 ///
743 /// This function has the same error semantics as [`read_until`] and will
744 /// also return an error if the read bytes are not valid UTF-8. If an I/O
745 /// error is encountered then `buf` may contain some bytes already read in
746 /// the event that all data read so far was valid UTF-8.
747 ///
748 /// [`read_until`]: AsyncBufReadExt::read_until
749 ///
750 /// # Examples
751 ///
752 /// ```
753 /// # futures::executor::block_on(async {
754 /// use futures::io::{AsyncBufReadExt, Cursor};
755 ///
756 /// let mut cursor = Cursor::new(b"foo\nbar");
757 /// let mut buf = String::new();
758 ///
759 /// // cursor is at 'f'
760 /// let num_bytes = cursor.read_line(&mut buf).await?;
761 /// assert_eq!(num_bytes, 4);
762 /// assert_eq!(buf, "foo\n");
763 /// buf.clear();
764 ///
765 /// // cursor is at 'b'
766 /// let num_bytes = cursor.read_line(&mut buf).await?;
767 /// assert_eq!(num_bytes, 3);
768 /// assert_eq!(buf, "bar");
769 /// buf.clear();
770 ///
771 /// // cursor is at EOF
772 /// let num_bytes = cursor.read_line(&mut buf).await?;
773 /// assert_eq!(num_bytes, 0);
774 /// assert_eq!(buf, "");
775 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
776 /// ```
777 fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
778 where
779 Self: Unpin,
780 {
781 assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
782 }
783
784 /// Returns a stream over the lines of this reader.
785 /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
786 ///
787 /// The stream returned from this function will yield instances of
788 /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
789 /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
790 ///
791 /// [`io::Result`]: std::io::Result
792 /// [`String`]: String
793 ///
794 /// # Errors
795 ///
796 /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
797 ///
798 /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
799 ///
800 /// # Examples
801 ///
802 /// ```
803 /// # futures::executor::block_on(async {
804 /// use futures::io::{AsyncBufReadExt, Cursor};
805 /// use futures::stream::StreamExt;
806 ///
807 /// let cursor = Cursor::new(b"lorem\nipsum\xc2\r\ndolor");
808 ///
809 /// let mut lines_stream = cursor.lines().map(|l| l.unwrap_or(String::from("invalid UTF_8")));
810 /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
811 /// assert_eq!(lines_stream.next().await, Some(String::from("invalid UTF_8")));
812 /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
813 /// assert_eq!(lines_stream.next().await, None);
814 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
815 /// ```
816 fn lines(self) -> Lines<Self>
817 where
818 Self: Sized,
819 {
820 assert_stream::<Result<String>, _>(Lines::new(self))
821 }
822}
823
824impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
825
826// Just a helper function to ensure the reader we're returning all have the
827// right implementations.
828pub(crate) fn assert_read<R>(reader: R) -> R
829where
830 R: AsyncRead,
831{
832 reader
833}
834// Just a helper function to ensure the writer we're returning all have the
835// right implementations.
836pub(crate) fn assert_write<W>(writer: W) -> W
837where
838 W: AsyncWrite,
839{
840 writer
841}
842