1//! Tools and combinators for I/O.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::io::{self, AsyncReadExt};
7//!
8//! # spin_on::spin_on(async {
9//! let input: &[u8] = b"hello";
10//! let mut reader = io::BufReader::new(input);
11//!
12//! let mut contents = String::new();
13//! reader.read_to_string(&mut contents).await?;
14//! # std::io::Result::Ok(()) });
15//! ```
16
17#[doc(no_inline)]
18pub use std::io::{Error, ErrorKind, Result, SeekFrom};
19
20#[doc(no_inline)]
21pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
22
23use std::borrow::{Borrow, BorrowMut};
24use std::cmp;
25use std::fmt;
26use std::future::Future;
27use std::io::{IoSlice, IoSliceMut};
28use std::mem;
29use std::pin::Pin;
30use std::sync::{Arc, Mutex};
31use std::task::{Context, Poll};
32
33use futures_core::stream::Stream;
34use pin_project_lite::pin_project;
35
36use crate::future;
37use crate::ready;
38
/// Default capacity in bytes (8 KiB) used by `BufReader::new()` and `BufWriter::new()` when the
/// caller does not pick a capacity explicitly.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
40
41/// Copies the entire contents of a reader into a writer.
42///
43/// This function will read data from `reader` and write it into `writer` in a streaming fashion
44/// until `reader` returns EOF.
45///
46/// On success, returns the total number of bytes copied.
47///
48/// # Examples
49///
50/// ```
51/// use futures_lite::io::{self, BufReader, BufWriter};
52///
53/// # spin_on::spin_on(async {
54/// let input: &[u8] = b"hello";
55/// let reader = BufReader::new(input);
56///
57/// let mut output = Vec::new();
58/// let writer = BufWriter::new(&mut output);
59///
60/// io::copy(reader, writer).await?;
61/// # std::io::Result::Ok(()) });
62/// ```
63pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
64where
65 R: AsyncRead + Unpin,
66 W: AsyncWrite + Unpin,
67{
68 pin_project! {
69 struct CopyFuture<R, W> {
70 #[pin]
71 reader: R,
72 #[pin]
73 writer: W,
74 amt: u64,
75 }
76 }
77
78 impl<R, W> Future for CopyFuture<R, W>
79 where
80 R: AsyncBufRead,
81 W: AsyncWrite + Unpin,
82 {
83 type Output = Result<u64>;
84
85 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86 let mut this = self.project();
87 loop {
88 let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
89 if buffer.is_empty() {
90 ready!(this.writer.as_mut().poll_flush(cx))?;
91 return Poll::Ready(Ok(*this.amt));
92 }
93
94 let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
95 if i == 0 {
96 return Poll::Ready(Err(ErrorKind::WriteZero.into()));
97 }
98 *this.amt += i as u64;
99 this.reader.as_mut().consume(i);
100 }
101 }
102 }
103
104 let future = CopyFuture {
105 reader: BufReader::new(reader),
106 writer,
107 amt: 0,
108 };
109 future.await
110}
111
/// Asserts that a type implementing [`std::io`] traits can be used as an async type.
///
/// The wrapper implements [`AsyncRead`], [`AsyncWrite`] and [`AsyncSeek`] by calling the
/// corresponding blocking method of the inner handle directly from the poll function.
///
/// The underlying I/O handle should never block nor return the [`ErrorKind::WouldBlock`] error.
/// This is usually the case for in-memory buffered I/O.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AssertAsync, AsyncReadExt};
///
/// let reader: &[u8] = b"hello";
///
/// # spin_on::spin_on(async {
/// let mut async_reader = AssertAsync::new(reader);
/// let mut contents = String::new();
///
/// // This line works in async manner - note that there is await:
/// async_reader.read_to_string(&mut contents).await?;
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AssertAsync<T>(T);

// The wrapper is `Unpin` for every `T`; the async trait impls rely on this to reach the inner
// handle mutably through `Pin<&mut Self>`.
impl<T> Unpin for AssertAsync<T> {}
136
137impl<T> AssertAsync<T> {
138 /// Wraps an I/O handle implementing [`std::io`] traits.
139 ///
140 /// # Examples
141 ///
142 /// ```
143 /// use futures_lite::io::AssertAsync;
144 ///
145 /// let reader: &[u8] = b"hello";
146 ///
147 /// let async_reader = AssertAsync::new(reader);
148 /// ```
149 #[inline(always)]
150 pub fn new(io: T) -> Self {
151 AssertAsync(io)
152 }
153
154 /// Gets a reference to the inner I/O handle.
155 ///
156 /// # Examples
157 ///
158 /// ```
159 /// use futures_lite::io::AssertAsync;
160 ///
161 /// let reader: &[u8] = b"hello";
162 ///
163 /// let async_reader = AssertAsync::new(reader);
164 /// let r = async_reader.get_ref();
165 /// ```
166 #[inline(always)]
167 pub fn get_ref(&self) -> &T {
168 &self.0
169 }
170
171 /// Gets a mutable reference to the inner I/O handle.
172 ///
173 /// # Examples
174 ///
175 /// ```
176 /// use futures_lite::io::AssertAsync;
177 ///
178 /// let reader: &[u8] = b"hello";
179 ///
180 /// let mut async_reader = AssertAsync::new(reader);
181 /// let r = async_reader.get_mut();
182 /// ```
183 #[inline(always)]
184 pub fn get_mut(&mut self) -> &mut T {
185 &mut self.0
186 }
187
188 /// Extracts the inner I/O handle.
189 ///
190 /// # Examples
191 ///
192 /// ```
193 /// use futures_lite::io::AssertAsync;
194 ///
195 /// let reader: &[u8] = b"hello";
196 ///
197 /// let async_reader = AssertAsync::new(reader);
198 /// let inner = async_reader.into_inner();
199 /// ```
200 #[inline(always)]
201 pub fn into_inner(self) -> T {
202 self.0
203 }
204}
205
/// Runs a blocking I/O closure and reports its outcome as an already-completed poll.
///
/// The closure is called again for as long as it fails with [`ErrorKind::Interrupted`]. Any other
/// outcome, success or error, is returned immediately inside [`Poll::Ready`]; this function never
/// returns [`Poll::Pending`].
fn assert_async_wrapio<F, T>(mut f: F) -> Poll<std::io::Result<T>>
where
    F: FnMut() -> std::io::Result<T>,
{
    loop {
        match f() {
            // An interrupted call carries no result; simply try again.
            Err(err) if err.kind() == ErrorKind::Interrupted => {}
            res => return Poll::Ready(res),
        }
    }
}
217
218impl<T: std::io::Read> AsyncRead for AssertAsync<T> {
219 #[inline]
220 fn poll_read(
221 mut self: Pin<&mut Self>,
222 _: &mut Context<'_>,
223 buf: &mut [u8],
224 ) -> Poll<Result<usize>> {
225 assert_async_wrapio(move || self.0.read(buf))
226 }
227
228 #[inline]
229 fn poll_read_vectored(
230 mut self: Pin<&mut Self>,
231 _: &mut Context<'_>,
232 bufs: &mut [IoSliceMut<'_>],
233 ) -> Poll<Result<usize>> {
234 assert_async_wrapio(move || self.0.read_vectored(bufs))
235 }
236}
237
238impl<T: std::io::Write> AsyncWrite for AssertAsync<T> {
239 #[inline]
240 fn poll_write(
241 mut self: Pin<&mut Self>,
242 _: &mut Context<'_>,
243 buf: &[u8],
244 ) -> Poll<Result<usize>> {
245 assert_async_wrapio(move || self.0.write(buf))
246 }
247
248 #[inline]
249 fn poll_write_vectored(
250 mut self: Pin<&mut Self>,
251 _: &mut Context<'_>,
252 bufs: &[IoSlice<'_>],
253 ) -> Poll<Result<usize>> {
254 assert_async_wrapio(move || self.0.write_vectored(bufs))
255 }
256
257 #[inline]
258 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
259 assert_async_wrapio(move || self.0.flush())
260 }
261
262 #[inline]
263 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
264 self.poll_flush(cx)
265 }
266}
267
268impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
269 #[inline]
270 fn poll_seek(
271 mut self: Pin<&mut Self>,
272 _: &mut Context<'_>,
273 pos: SeekFrom,
274 ) -> Poll<Result<u64>> {
275 assert_async_wrapio(move || self.0.seek(pos))
276 }
277}
278
/// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending`
/// polls to `WouldBlock` errors.
///
/// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types
/// that take `Read` as a parameter.
///
/// # Examples
///
/// ```
/// use std::io::Read;
/// use std::task::{Poll, Context};
///
/// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> {
///     // Assume we have a library that's built around `Read` and `Write` traits.
///     use cooltls::Session;
///
///     // We want to use it with our stream, which implements `AsyncRead`.
///     let reader = Stream::new();
///
///     // First, we wrap our stream with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`.
///     use futures_lite::io::AsyncAsSync;
///     let reader = AsyncAsSync::new(cx, reader);
///
///     // Now, we can use it with `cooltls`.
///     let mut session = Session::new(reader);
///
///     // Match on the result of `read()` and translate it to poll.
///     match session.read(&mut [0; 1024]) {
///         Ok(n) => Poll::Ready(n),
///         Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
///         Err(err) => panic!("unexpected error: {}", err),
///     }
/// }
///
/// // Usually, poll-based functions are best wrapped using `poll_fn`.
/// use futures_lite::future::poll_fn;
/// # futures_lite::future::block_on(async {
/// poll_fn(|cx| poll_for_io(cx)).await;
/// # });
/// # struct Stream;
/// # impl Stream {
/// #     fn new() -> Stream {
/// #         Stream
/// #     }
/// # }
/// # impl futures_lite::io::AsyncRead for Stream {
/// #     fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> {
/// #         Poll::Ready(Ok(0))
/// #     }
/// # }
/// # mod cooltls {
/// #     pub struct Session<W> {
/// #         reader: W,
/// #     }
/// #     impl<W> Session<W> {
/// #         pub fn new(reader: W) -> Session<W> {
/// #             Session { reader }
/// #         }
/// #     }
/// #     impl<W: std::io::Read> std::io::Read for Session<W> {
/// #         fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
/// #             self.reader.read(buf)
/// #         }
/// #     }
/// # }
/// ```
#[derive(Debug)]
pub struct AsyncAsSync<'r, 'ctx, T> {
    /// The task context handed to the inner handle's `poll_*` methods on every operation.
    pub context: &'r mut Context<'ctx>,

    /// The actual reader/writer we are wrapping.
    pub inner: T,
}
353
354impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> {
355 /// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits.
356 ///
357 /// # Examples
358 ///
359 /// ```
360 /// use futures_lite::io::AsyncAsSync;
361 /// use std::task::Context;
362 /// use waker_fn::waker_fn;
363 ///
364 /// let reader: &[u8] = b"hello";
365 /// let waker = waker_fn(|| {});
366 /// let mut context = Context::from_waker(&waker);
367 ///
368 /// let async_reader = AsyncAsSync::new(&mut context, reader);
369 /// ```
370 #[inline]
371 pub fn new(context: &'r mut Context<'ctx>, inner: T) -> Self {
372 AsyncAsSync { context, inner }
373 }
374
375 /// Attempt to shutdown the I/O handle.
376 ///
377 /// # Examples
378 ///
379 /// ```
380 /// use futures_lite::io::AsyncAsSync;
381 /// use std::task::Context;
382 /// use waker_fn::waker_fn;
383 ///
384 /// let reader: Vec<u8> = b"hello".to_vec();
385 /// let waker = waker_fn(|| {});
386 /// let mut context = Context::from_waker(&waker);
387 ///
388 /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
389 /// async_reader.close().unwrap();
390 /// ```
391 #[inline]
392 pub fn close(&mut self) -> Result<()>
393 where
394 T: AsyncWrite + Unpin,
395 {
396 self.poll_with(|io, cx| io.poll_close(cx))
397 }
398
399 /// Poll this `AsyncAsSync` for some function.
400 ///
401 /// # Examples
402 ///
403 /// ```
404 /// use futures_lite::io::{AsyncAsSync, AsyncRead};
405 /// use std::task::Context;
406 /// use waker_fn::waker_fn;
407 ///
408 /// let reader: &[u8] = b"hello";
409 /// let waker = waker_fn(|| {});
410 /// let mut context = Context::from_waker(&waker);
411 ///
412 /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
413 /// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024]));
414 /// assert_eq!(r.unwrap(), 5);
415 /// ```
416 #[inline]
417 pub fn poll_with<R>(
418 &mut self,
419 f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>,
420 ) -> Result<R>
421 where
422 T: Unpin,
423 {
424 match f(Pin::new(&mut self.inner), self.context) {
425 Poll::Ready(res) => res,
426 Poll::Pending => Err(ErrorKind::WouldBlock.into()),
427 }
428 }
429}
430
431impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> {
432 #[inline]
433 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
434 self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_read(cx, buf))
435 }
436
437 #[inline]
438 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
439 self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_read_vectored(cx, bufs))
440 }
441}
442
443impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> {
444 #[inline]
445 fn write(&mut self, buf: &[u8]) -> Result<usize> {
446 self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_write(cx, buf))
447 }
448
449 #[inline]
450 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
451 self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_write_vectored(cx, bufs))
452 }
453
454 #[inline]
455 fn flush(&mut self) -> Result<()> {
456 self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_flush(cx))
457 }
458}
459
460impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> {
461 #[inline]
462 fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
463 self.poll_with(|io: Pin<&mut T>, cx: &mut Context<'_>| io.poll_seek(cx, pos))
464 }
465}
466
467impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> {
468 #[inline]
469 fn as_ref(&self) -> &T {
470 &self.inner
471 }
472}
473
474impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> {
475 #[inline]
476 fn as_mut(&mut self) -> &mut T {
477 &mut self.inner
478 }
479}
480
481impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> {
482 #[inline]
483 fn borrow(&self) -> &T {
484 &self.inner
485 }
486}
487
488impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> {
489 #[inline]
490 fn borrow_mut(&mut self) -> &mut T {
491 &mut self.inner
492 }
493}
494
/// Blocks on all async I/O operations and implements [`std::io`] traits.
///
/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
/// manually all the time becomes too tedious, use this type for more convenient blocking on async
/// I/O operations.
///
/// This type implements traits [`Read`][`std::io::Read`], [`BufRead`][`std::io::BufRead`],
/// [`Write`][`std::io::Write`], or [`Seek`][`std::io::Seek`] if the inner type implements
/// [`AsyncRead`], [`AsyncBufRead`], [`AsyncWrite`], or [`AsyncSeek`], respectively (and is
/// [`Unpin`]).
///
/// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before
/// dropping the [`BlockOn`] handle or some buffered data might get lost.
///
/// # Examples
///
/// ```
/// use futures_lite::io::BlockOn;
/// use futures_lite::pin;
/// use std::io::Read;
///
/// let reader: &[u8] = b"hello";
/// pin!(reader);
///
/// let mut blocking_reader = BlockOn::new(reader);
/// let mut contents = String::new();
///
/// // This line blocks - note that there is no await:
/// blocking_reader.read_to_string(&mut contents)?;
/// # std::io::Result::Ok(())
/// ```
#[derive(Debug)]
pub struct BlockOn<T>(T);
527
528impl<T> BlockOn<T> {
529 /// Wraps an async I/O handle into a blocking interface.
530 ///
531 /// # Examples
532 ///
533 /// ```
534 /// use futures_lite::io::BlockOn;
535 /// use futures_lite::pin;
536 ///
537 /// let reader: &[u8] = b"hello";
538 /// pin!(reader);
539 ///
540 /// let blocking_reader = BlockOn::new(reader);
541 /// ```
542 pub fn new(io: T) -> BlockOn<T> {
543 BlockOn(io)
544 }
545
546 /// Gets a reference to the async I/O handle.
547 ///
548 /// # Examples
549 ///
550 /// ```
551 /// use futures_lite::io::BlockOn;
552 /// use futures_lite::pin;
553 ///
554 /// let reader: &[u8] = b"hello";
555 /// pin!(reader);
556 ///
557 /// let blocking_reader = BlockOn::new(reader);
558 /// let r = blocking_reader.get_ref();
559 /// ```
560 pub fn get_ref(&self) -> &T {
561 &self.0
562 }
563
564 /// Gets a mutable reference to the async I/O handle.
565 ///
566 /// # Examples
567 ///
568 /// ```
569 /// use futures_lite::io::BlockOn;
570 /// use futures_lite::pin;
571 ///
572 /// let reader: &[u8] = b"hello";
573 /// pin!(reader);
574 ///
575 /// let mut blocking_reader = BlockOn::new(reader);
576 /// let r = blocking_reader.get_mut();
577 /// ```
578 pub fn get_mut(&mut self) -> &mut T {
579 &mut self.0
580 }
581
582 /// Extracts the inner async I/O handle.
583 ///
584 /// # Examples
585 ///
586 /// ```
587 /// use futures_lite::io::BlockOn;
588 /// use futures_lite::pin;
589 ///
590 /// let reader: &[u8] = b"hello";
591 /// pin!(reader);
592 ///
593 /// let blocking_reader = BlockOn::new(reader);
594 /// let inner = blocking_reader.into_inner();
595 /// ```
596 pub fn into_inner(self) -> T {
597 self.0
598 }
599}
600
601impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> {
602 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
603 future::block_on(self.0.read(buf))
604 }
605}
606
607impl<T: AsyncBufRead + Unpin> std::io::BufRead for BlockOn<T> {
608 fn fill_buf(&mut self) -> Result<&[u8]> {
609 future::block_on(self.0.fill_buf())
610 }
611
612 fn consume(&mut self, amt: usize) {
613 Pin::new(&mut self.0).consume(amt)
614 }
615}
616
617impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> {
618 fn write(&mut self, buf: &[u8]) -> Result<usize> {
619 future::block_on(self.0.write(buf))
620 }
621
622 fn flush(&mut self) -> Result<()> {
623 future::block_on(self.0.flush())
624 }
625}
626
627impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> {
628 fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
629 future::block_on(self.0.seek(pos))
630 }
631}
632
pin_project! {
    /// Adds buffering to a reader.
    ///
    /// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A
    /// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and
    /// maintains an in-memory buffer of the incoming byte stream.
    ///
    /// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to
    /// the same file or networking socket. It does not help when reading very large amounts at
    /// once, or reading just once or a few times. It also provides no advantage when reading from
    /// a source that is already in memory, like a `Vec<u8>`.
    ///
    /// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating
    /// multiple instances of [`BufReader`] on the same reader can cause data loss.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let mut line = String::new();
    /// reader.read_line(&mut line).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub struct BufReader<R> {
        // The underlying reader.
        #[pin]
        inner: R,
        // Fixed-size storage; its length is the buffer capacity chosen at construction.
        buf: Box<[u8]>,
        // Start of the not-yet-consumed data inside `buf`.
        pos: usize,
        // End of the valid data inside `buf`; `buf[pos..cap]` is what `buffer()` returns.
        cap: usize,
    }
}
669
670impl<R: AsyncRead> BufReader<R> {
671 /// Creates a buffered reader with the default buffer capacity.
672 ///
673 /// The default capacity is currently 8 KB, but that may change in the future.
674 ///
675 /// # Examples
676 ///
677 /// ```
678 /// use futures_lite::io::BufReader;
679 ///
680 /// let input: &[u8] = b"hello";
681 /// let reader = BufReader::new(input);
682 /// ```
683 pub fn new(inner: R) -> BufReader<R> {
684 BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
685 }
686
687 /// Creates a buffered reader with the specified capacity.
688 ///
689 /// # Examples
690 ///
691 /// ```
692 /// use futures_lite::io::BufReader;
693 ///
694 /// let input: &[u8] = b"hello";
695 /// let reader = BufReader::with_capacity(1024, input);
696 /// ```
697 pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
698 BufReader {
699 inner,
700 buf: vec![0; capacity].into_boxed_slice(),
701 pos: 0,
702 cap: 0,
703 }
704 }
705}
706
707impl<R> BufReader<R> {
708 /// Gets a reference to the underlying reader.
709 ///
710 /// It is not advisable to directly read from the underlying reader.
711 ///
712 /// # Examples
713 ///
714 /// ```
715 /// use futures_lite::io::BufReader;
716 ///
717 /// let input: &[u8] = b"hello";
718 /// let reader = BufReader::new(input);
719 ///
720 /// let r = reader.get_ref();
721 /// ```
722 pub fn get_ref(&self) -> &R {
723 &self.inner
724 }
725
726 /// Gets a mutable reference to the underlying reader.
727 ///
728 /// It is not advisable to directly read from the underlying reader.
729 ///
730 /// # Examples
731 ///
732 /// ```
733 /// use futures_lite::io::BufReader;
734 ///
735 /// let input: &[u8] = b"hello";
736 /// let mut reader = BufReader::new(input);
737 ///
738 /// let r = reader.get_mut();
739 /// ```
740 pub fn get_mut(&mut self) -> &mut R {
741 &mut self.inner
742 }
743
744 /// Gets a pinned mutable reference to the underlying reader.
745 ///
746 /// It is not advisable to directly read from the underlying reader.
747 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
748 self.project().inner
749 }
750
751 /// Returns a reference to the internal buffer.
752 ///
753 /// This method will not attempt to fill the buffer if it is empty.
754 ///
755 /// # Examples
756 ///
757 /// ```
758 /// use futures_lite::io::BufReader;
759 ///
760 /// let input: &[u8] = b"hello";
761 /// let reader = BufReader::new(input);
762 ///
763 /// // The internal buffer is empty until the first read request.
764 /// assert_eq!(reader.buffer(), &[]);
765 /// ```
766 pub fn buffer(&self) -> &[u8] {
767 &self.buf[self.pos..self.cap]
768 }
769
770 /// Unwraps the buffered reader, returning the underlying reader.
771 ///
772 /// Note that any leftover data in the internal buffer will be lost.
773 ///
774 /// # Examples
775 ///
776 /// ```
777 /// use futures_lite::io::BufReader;
778 ///
779 /// let input: &[u8] = b"hello";
780 /// let reader = BufReader::new(input);
781 ///
782 /// assert_eq!(reader.into_inner(), input);
783 /// ```
784 pub fn into_inner(self) -> R {
785 self.inner
786 }
787
788 /// Invalidates all data in the internal buffer.
789 #[inline]
790 fn discard_buffer(self: Pin<&mut Self>) {
791 let this = self.project();
792 *this.pos = 0;
793 *this.cap = 0;
794 }
795}
796
797impl<R: AsyncRead> AsyncRead for BufReader<R> {
798 fn poll_read(
799 mut self: Pin<&mut Self>,
800 cx: &mut Context<'_>,
801 buf: &mut [u8],
802 ) -> Poll<Result<usize>> {
803 // If we don't have any buffered data and we're doing a massive read
804 // (larger than our internal buffer), bypass our internal buffer
805 // entirely.
806 if self.pos == self.cap && buf.len() >= self.buf.len() {
807 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
808 self.discard_buffer();
809 return Poll::Ready(res);
810 }
811 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
812 let nread = std::io::Read::read(&mut rem, buf)?;
813 self.consume(nread);
814 Poll::Ready(Ok(nread))
815 }
816
817 fn poll_read_vectored(
818 mut self: Pin<&mut Self>,
819 cx: &mut Context<'_>,
820 bufs: &mut [IoSliceMut<'_>],
821 ) -> Poll<Result<usize>> {
822 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
823 if self.pos == self.cap && total_len >= self.buf.len() {
824 let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
825 self.discard_buffer();
826 return Poll::Ready(res);
827 }
828 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
829 let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
830 self.consume(nread);
831 Poll::Ready(Ok(nread))
832 }
833}
834
835impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
836 fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
837 let mut this: Projection<'_, R> = self.project();
838
839 // If we've reached the end of our internal buffer then we need to fetch
840 // some more data from the underlying reader.
841 // Branch using `>=` instead of the more correct `==`
842 // to tell the compiler that the pos..cap slice is always valid.
843 if *this.pos >= *this.cap {
844 debug_assert!(*this.pos == *this.cap);
845 *this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
846 *this.pos = 0;
847 }
848 Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
849 }
850
851 fn consume(self: Pin<&mut Self>, amt: usize) {
852 let this: Projection<'_, R> = self.project();
853 *this.pos = cmp::min(*this.pos + amt, *this.cap);
854 }
855}
856
857impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
858 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
859 f&mut DebugStruct<'_, '_>.debug_struct("BufReader")
860 .field("reader", &self.inner)
861 .field(
862 name:"buffer",
863 &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
864 )
865 .finish()
866 }
867}
868
869impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
870 /// Seeks to an offset, in bytes, in the underlying reader.
871 ///
872 /// The position used for seeking with [`SeekFrom::Current`] is the position the underlying
873 /// reader would be at if the [`BufReader`] had no internal buffer.
874 ///
875 /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
876 /// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`]
877 /// immediately after a seek yields the underlying reader at the same position.
878 ///
879 /// See [`AsyncSeek`] for more details.
880 ///
881 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
882 /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
883 /// the second seek returns `Err`, the underlying reader will be left at the same position it
884 /// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`.
885 fn poll_seek(
886 mut self: Pin<&mut Self>,
887 cx: &mut Context<'_>,
888 pos: SeekFrom,
889 ) -> Poll<Result<u64>> {
890 let result: u64;
891 if let SeekFrom::Current(n) = pos {
892 let remainder = (self.cap - self.pos) as i64;
893 // it should be safe to assume that remainder fits within an i64 as the alternative
894 // means we managed to allocate 8 exbibytes and that's absurd.
895 // But it's not out of the realm of possibility for some weird underlying reader to
896 // support seeking by i64::min_value() so we need to handle underflow when subtracting
897 // remainder.
898 if let Some(offset) = n.checked_sub(remainder) {
899 result = ready!(self
900 .as_mut()
901 .get_pin_mut()
902 .poll_seek(cx, SeekFrom::Current(offset)))?;
903 } else {
904 // seek backwards by our remainder, and then by the offset
905 ready!(self
906 .as_mut()
907 .get_pin_mut()
908 .poll_seek(cx, SeekFrom::Current(-remainder)))?;
909 self.as_mut().discard_buffer();
910 result = ready!(self
911 .as_mut()
912 .get_pin_mut()
913 .poll_seek(cx, SeekFrom::Current(n)))?;
914 }
915 } else {
916 // Seeking with Start/End doesn't care about our buffer length.
917 result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
918 }
919 self.discard_buffer();
920 Poll::Ready(Ok(result))
921 }
922}
923
924impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
925 fn poll_write(
926 mut self: Pin<&mut Self>,
927 cx: &mut Context<'_>,
928 buf: &[u8],
929 ) -> Poll<Result<usize>> {
930 self.as_mut().get_pin_mut().poll_write(cx, buf)
931 }
932
933 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
934 self.as_mut().get_pin_mut().poll_flush(cx)
935 }
936
937 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
938 self.as_mut().get_pin_mut().poll_close(cx)
939 }
940}
941
pin_project! {
    /// Adds buffering to a writer.
    ///
    /// It can be excessively inefficient to work directly with something that implements
    /// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP
    /// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and
    /// writes it to the underlying writer in large, infrequent batches.
    ///
    /// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to
    /// the same file or networking socket. It does not help when writing very large amounts at
    /// once, or writing just once or a few times. It also provides no advantage when writing to a
    /// destination that is in memory, like a `Vec<u8>`.
    ///
    /// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when
    /// it is dropped. Therefore, it is important that users explicitly flush the buffer before
    /// dropping the [`BufWriter`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// writer.write_all(b"hello").await?;
    /// writer.flush().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub struct BufWriter<W> {
        // The underlying writer.
        #[pin]
        inner: W,
        // Bytes accepted from callers that have not yet been removed from the buffer.
        buf: Vec<u8>,
        // How many bytes from the front of `buf` the in-progress flush has already handed to
        // `inner`; reset to zero once the flush drains them from `buf`.
        written: usize,
    }
}
979
980impl<W: AsyncWrite> BufWriter<W> {
981 /// Creates a buffered writer with the default buffer capacity.
982 ///
983 /// The default capacity is currently 8 KB, but that may change in the future.
984 ///
985 /// # Examples
986 ///
987 /// ```
988 /// use futures_lite::io::BufWriter;
989 ///
990 /// let mut output = Vec::new();
991 /// let writer = BufWriter::new(&mut output);
992 /// ```
993 pub fn new(inner: W) -> BufWriter<W> {
994 BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
995 }
996
997 /// Creates a buffered writer with the specified buffer capacity.
998 ///
999 /// # Examples
1000 ///
1001 /// ```
1002 /// use futures_lite::io::BufWriter;
1003 ///
1004 /// let mut output = Vec::new();
1005 /// let writer = BufWriter::with_capacity(100, &mut output);
1006 /// ```
1007 pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
1008 BufWriter {
1009 inner,
1010 buf: Vec::with_capacity(capacity),
1011 written: 0,
1012 }
1013 }
1014
1015 /// Gets a reference to the underlying writer.
1016 ///
1017 /// # Examples
1018 ///
1019 /// ```
1020 /// use futures_lite::io::BufWriter;
1021 ///
1022 /// let mut output = Vec::new();
1023 /// let writer = BufWriter::new(&mut output);
1024 ///
1025 /// let r = writer.get_ref();
1026 /// ```
1027 pub fn get_ref(&self) -> &W {
1028 &self.inner
1029 }
1030
1031 /// Gets a mutable reference to the underlying writer.
1032 ///
1033 /// It is not advisable to directly write to the underlying writer.
1034 ///
1035 /// # Examples
1036 ///
1037 /// ```
1038 /// use futures_lite::io::BufWriter;
1039 ///
1040 /// let mut output = Vec::new();
1041 /// let mut writer = BufWriter::new(&mut output);
1042 ///
1043 /// let r = writer.get_mut();
1044 /// ```
1045 pub fn get_mut(&mut self) -> &mut W {
1046 &mut self.inner
1047 }
1048
1049 /// Gets a pinned mutable reference to the underlying writer.
1050 ///
1051 /// It is not not advisable to directly write to the underlying writer.
1052 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
1053 self.project().inner
1054 }
1055
1056 /// Unwraps the buffered writer, returning the underlying writer.
1057 ///
1058 /// Note that any leftover data in the internal buffer will be lost. If you don't want to lose
1059 /// that data, flush the buffered writer before unwrapping it.
1060 ///
1061 /// # Examples
1062 ///
1063 /// ```
1064 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
1065 ///
1066 /// # spin_on::spin_on(async {
1067 /// let mut output = vec![1, 2, 3];
1068 /// let mut writer = BufWriter::new(&mut output);
1069 ///
1070 /// writer.write_all(&[4]).await?;
1071 /// writer.flush().await?;
1072 /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]);
1073 /// # std::io::Result::Ok(()) });
1074 /// ```
1075 pub fn into_inner(self) -> W {
1076 self.inner
1077 }
1078
1079 /// Returns a reference to the internal buffer.
1080 ///
1081 /// # Examples
1082 ///
1083 /// ```
1084 /// use futures_lite::io::BufWriter;
1085 ///
1086 /// let mut output = Vec::new();
1087 /// let writer = BufWriter::new(&mut output);
1088 ///
1089 /// // The internal buffer is empty until the first write request.
1090 /// assert_eq!(writer.buffer(), &[]);
1091 /// ```
1092 pub fn buffer(&self) -> &[u8] {
1093 &self.buf
1094 }
1095
1096 /// Flush the buffer.
1097 fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1098 let mut this = self.project();
1099 let len = this.buf.len();
1100 let mut ret = Ok(());
1101
1102 while *this.written < len {
1103 match this
1104 .inner
1105 .as_mut()
1106 .poll_write(cx, &this.buf[*this.written..])
1107 {
1108 Poll::Ready(Ok(0)) => {
1109 ret = Err(Error::new(
1110 ErrorKind::WriteZero,
1111 "Failed to write buffered data",
1112 ));
1113 break;
1114 }
1115 Poll::Ready(Ok(n)) => *this.written += n,
1116 Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {}
1117 Poll::Ready(Err(e)) => {
1118 ret = Err(e);
1119 break;
1120 }
1121 Poll::Pending => return Poll::Pending,
1122 }
1123 }
1124
1125 if *this.written > 0 {
1126 this.buf.drain(..*this.written);
1127 }
1128 *this.written = 0;
1129
1130 Poll::Ready(ret)
1131 }
1132}
1133
1134impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
1135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1136 f&mut DebugStruct<'_, '_>.debug_struct("BufWriter")
1137 .field("writer", &self.inner)
1138 .field(name:"buf", &self.buf)
1139 .finish()
1140 }
1141}
1142
1143impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
1144 fn poll_write(
1145 mut self: Pin<&mut Self>,
1146 cx: &mut Context<'_>,
1147 buf: &[u8],
1148 ) -> Poll<Result<usize>> {
1149 if self.buf.len() + buf.len() > self.buf.capacity() {
1150 ready!(self.as_mut().poll_flush_buf(cx))?;
1151 }
1152 if buf.len() >= self.buf.capacity() {
1153 self.get_pin_mut().poll_write(cx, buf)
1154 } else {
1155 Pin::new(&mut *self.project().buf).poll_write(cx, buf)
1156 }
1157 }
1158
1159 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1160 ready!(self.as_mut().poll_flush_buf(cx))?;
1161 self.get_pin_mut().poll_flush(cx)
1162 }
1163
1164 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1165 ready!(self.as_mut().poll_flush_buf(cx))?;
1166 self.get_pin_mut().poll_close(cx)
1167 }
1168}
1169
1170impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
1171 /// Seek to the offset, in bytes, in the underlying writer.
1172 ///
1173 /// Seeking always writes out the internal buffer before seeking.
1174 fn poll_seek(
1175 mut self: Pin<&mut Self>,
1176 cx: &mut Context<'_>,
1177 pos: SeekFrom,
1178 ) -> Poll<Result<u64>> {
1179 ready!(self.as_mut().poll_flush_buf(cx))?;
1180 self.get_pin_mut().poll_seek(cx, pos)
1181 }
1182}
1183
1184/// Gives an in-memory buffer a cursor for reading and writing.
1185///
1186/// # Examples
1187///
1188/// ```
1189/// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom};
1190///
1191/// # spin_on::spin_on(async {
1192/// let mut bytes = b"hello".to_vec();
1193/// let mut cursor = Cursor::new(&mut bytes);
1194///
1195/// // Overwrite 'h' with 'H'.
1196/// cursor.write_all(b"H").await?;
1197///
1198/// // Move the cursor one byte forward.
1199/// cursor.seek(SeekFrom::Current(1)).await?;
1200///
1201/// // Read a byte.
1202/// let mut byte = [0];
1203/// cursor.read_exact(&mut byte).await?;
1204/// assert_eq!(&byte, b"l");
1205///
1206/// // Check the final buffer.
1207/// assert_eq!(bytes, b"Hello");
1208/// # std::io::Result::Ok(()) });
1209/// ```
#[derive(Clone, Debug, Default)]
pub struct Cursor<T> {
    // The standard-library cursor; the methods and async trait impls in this file delegate
    // to it.
    inner: std::io::Cursor<T>,
}
1214
impl<T> Cursor<T> {
    /// Creates a cursor for an in-memory buffer.
    ///
    /// Cursor's initial position is 0 even if the underlying buffer is not empty. Writing using
    /// [`Cursor`] will overwrite the existing contents unless the cursor is moved to the end of
    /// the buffer using [`set_position()`][`Cursor::set_position()`] or
    /// [`seek()`][`AsyncSeekExt::seek()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(Vec::<u8>::new());
    /// ```
    pub fn new(inner: T) -> Cursor<T> {
        Cursor {
            inner: std::io::Cursor::new(inner),
        }
    }

    /// Gets a reference to the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(Vec::<u8>::new());
    /// let r = cursor.get_ref();
    /// ```
    pub fn get_ref(&self) -> &T {
        self.inner.get_ref()
    }

    /// Gets a mutable reference to the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let mut cursor = Cursor::new(Vec::<u8>::new());
    /// let r = cursor.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        self.inner.get_mut()
    }

    /// Unwraps the cursor, returning the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(vec![1, 2, 3]);
    /// assert_eq!(cursor.into_inner(), [1, 2, 3]);
    /// ```
    pub fn into_inner(self) -> T {
        self.inner.into_inner()
    }

    /// Returns the current position of this cursor.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
    ///
    /// # spin_on::spin_on(async {
    /// let mut cursor = Cursor::new(b"hello");
    /// assert_eq!(cursor.position(), 0);
    ///
    /// cursor.seek(SeekFrom::Start(2)).await?;
    /// assert_eq!(cursor.position(), 2);
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn position(&self) -> u64 {
        self.inner.position()
    }

    /// Sets the position of this cursor.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let mut cursor = Cursor::new(b"hello");
    /// assert_eq!(cursor.position(), 0);
    ///
    /// cursor.set_position(2);
    /// assert_eq!(cursor.position(), 2);
    /// ```
    pub fn set_position(&mut self, pos: u64) {
        self.inner.set_position(pos)
    }
}
1314
1315impl<T> AsyncSeek for Cursor<T>
1316where
1317 T: AsRef<[u8]> + Unpin,
1318{
1319 fn poll_seek(
1320 mut self: Pin<&mut Self>,
1321 _: &mut Context<'_>,
1322 pos: SeekFrom,
1323 ) -> Poll<Result<u64>> {
1324 Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
1325 }
1326}
1327
1328impl<T> AsyncRead for Cursor<T>
1329where
1330 T: AsRef<[u8]> + Unpin,
1331{
1332 fn poll_read(
1333 mut self: Pin<&mut Self>,
1334 _cx: &mut Context<'_>,
1335 buf: &mut [u8],
1336 ) -> Poll<Result<usize>> {
1337 Poll::Ready(std::io::Read::read(&mut self.inner, buf))
1338 }
1339
1340 fn poll_read_vectored(
1341 mut self: Pin<&mut Self>,
1342 _: &mut Context<'_>,
1343 bufs: &mut [IoSliceMut<'_>],
1344 ) -> Poll<Result<usize>> {
1345 Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
1346 }
1347}
1348
1349impl<T> AsyncBufRead for Cursor<T>
1350where
1351 T: AsRef<[u8]> + Unpin,
1352{
1353 fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
1354 Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
1355 }
1356
1357 fn consume(mut self: Pin<&mut Self>, amt: usize) {
1358 std::io::BufRead::consume(&mut self.inner, amt)
1359 }
1360}
1361
1362impl AsyncWrite for Cursor<&mut [u8]> {
1363 fn poll_write(
1364 mut self: Pin<&mut Self>,
1365 _: &mut Context<'_>,
1366 buf: &[u8],
1367 ) -> Poll<Result<usize>> {
1368 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1369 }
1370
1371 fn poll_write_vectored(
1372 mut self: Pin<&mut Self>,
1373 _: &mut Context<'_>,
1374 bufs: &[IoSlice<'_>],
1375 ) -> Poll<Result<usize>> {
1376 Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
1377 }
1378
1379 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1380 Poll::Ready(std::io::Write::flush(&mut self.inner))
1381 }
1382
1383 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1384 self.poll_flush(cx)
1385 }
1386}
1387
1388impl AsyncWrite for Cursor<&mut Vec<u8>> {
1389 fn poll_write(
1390 mut self: Pin<&mut Self>,
1391 _: &mut Context<'_>,
1392 buf: &[u8],
1393 ) -> Poll<Result<usize>> {
1394 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1395 }
1396
1397 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1398 self.poll_flush(cx)
1399 }
1400
1401 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1402 Poll::Ready(std::io::Write::flush(&mut self.inner))
1403 }
1404}
1405
1406impl AsyncWrite for Cursor<Vec<u8>> {
1407 fn poll_write(
1408 mut self: Pin<&mut Self>,
1409 _: &mut Context<'_>,
1410 buf: &[u8],
1411 ) -> Poll<Result<usize>> {
1412 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1413 }
1414
1415 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1416 self.poll_flush(cx)
1417 }
1418
1419 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1420 Poll::Ready(std::io::Write::flush(&mut self.inner))
1421 }
1422}
1423
1424/// Creates an empty reader.
1425///
1426/// # Examples
1427///
1428/// ```
1429/// use futures_lite::io::{self, AsyncReadExt};
1430///
1431/// # spin_on::spin_on(async {
1432/// let mut reader = io::empty();
1433///
1434/// let mut contents = Vec::new();
1435/// reader.read_to_end(&mut contents).await?;
1436/// assert!(contents.is_empty());
1437/// # std::io::Result::Ok(()) });
1438/// ```
pub fn empty() -> Empty {
    // `Empty` carries no state; `_private` is just a placeholder field.
    Empty { _private: () }
}
1442
/// Reader for the [`empty()`] function.
///
/// Every read reports `Ok(0)`, and the buffered view is always an empty slice.
pub struct Empty {
    // Private placeholder field: code outside this module cannot build the struct literally.
    _private: (),
}

impl fmt::Debug for Empty {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fixed text; `pad` honors width/alignment flags given by the caller.
        f.pad("Empty { .. }")
    }
}

impl AsyncRead for Empty {
    #[inline]
    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> {
        // Zero bytes read, immediately.
        Poll::Ready(Ok(0))
    }
}

impl AsyncBufRead for Empty {
    #[inline]
    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
        // There is never any buffered data.
        Poll::Ready(Ok(&[]))
    }

    // Nothing is ever buffered, so consuming is a no-op.
    #[inline]
    fn consume(self: Pin<&mut Self>, _: usize) {}
}
1470
1471/// Creates an infinite reader that reads the same byte repeatedly.
1472///
1473/// # Examples
1474///
1475/// ```
1476/// use futures_lite::io::{self, AsyncReadExt};
1477///
1478/// # spin_on::spin_on(async {
1479/// let mut reader = io::repeat(b'a');
1480///
1481/// let mut contents = vec![0; 5];
1482/// reader.read_exact(&mut contents).await?;
1483/// assert_eq!(contents, b"aaaaa");
1484/// # std::io::Result::Ok(()) });
1485/// ```
pub fn repeat(byte: u8) -> Repeat {
    // The reader only needs to remember which byte to produce.
    Repeat { byte }
}
1489
1490/// Reader for the [`repeat()`] function.
1491#[derive(Debug)]
1492pub struct Repeat {
1493 byte: u8,
1494}
1495
1496impl AsyncRead for Repeat {
1497 #[inline]
1498 fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
1499 for b: &mut u8 in &mut *buf {
1500 *b = self.byte;
1501 }
1502 Poll::Ready(Ok(buf.len()))
1503 }
1504}
1505
1506/// Creates a writer that consumes and drops all data.
1507///
1508/// # Examples
1509///
1510/// ```
1511/// use futures_lite::io::{self, AsyncWriteExt};
1512///
1513/// # spin_on::spin_on(async {
1514/// let mut writer = io::sink();
1515/// writer.write_all(b"hello").await?;
1516/// # std::io::Result::Ok(()) });
1517/// ```
pub fn sink() -> Sink {
    // `Sink` carries no state; `_private` is just a placeholder field.
    Sink { _private: () }
}
1521
/// Writer for the [`sink()`] function.
///
/// Writes report the full input length as written; flush and close succeed immediately.
#[derive(Debug)]
pub struct Sink {
    // Private placeholder field: code outside this module cannot build the struct literally.
    _private: (),
}

impl AsyncWrite for Sink {
    #[inline]
    fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        // Claim the whole buffer was written; the data itself is ignored.
        Poll::Ready(Ok(buf.len()))
    }

    #[inline]
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        // Nothing is buffered, so there is nothing to flush.
        Poll::Ready(Ok(()))
    }

    #[inline]
    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        // No resource to release.
        Poll::Ready(Ok(()))
    }
}
1544
1545/// Extension trait for [`AsyncBufRead`].
1546pub trait AsyncBufReadExt: AsyncBufRead {
1547 /// Returns the contents of the internal buffer, filling it with more data if empty.
1548 ///
1549 /// If the stream has reached EOF, an empty buffer will be returned.
1550 ///
1551 /// # Examples
1552 ///
1553 /// ```
1554 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1555 /// use std::pin::Pin;
1556 ///
1557 /// # spin_on::spin_on(async {
1558 /// let input: &[u8] = b"hello world";
1559 /// let mut reader = BufReader::with_capacity(5, input);
1560 ///
1561 /// assert_eq!(reader.fill_buf().await?, b"hello");
1562 /// reader.consume(2);
1563 /// assert_eq!(reader.fill_buf().await?, b"llo");
1564 /// reader.consume(3);
1565 /// assert_eq!(reader.fill_buf().await?, b" worl");
1566 /// # std::io::Result::Ok(()) });
1567 /// ```
fn fill_buf(&mut self) -> FillBuf<'_, Self>
where
    Self: Unpin,
{
    // The reader is wrapped in `Some` so the future can move it out when it is polled.
    FillBuf { reader: Some(self) }
}
1574
1575 /// Consumes `amt` buffered bytes.
1576 ///
1577 /// This method does not perform any I/O, it simply consumes some amount of bytes from the
1578 /// internal buffer.
1579 ///
1580 /// The `amt` must be <= the number of bytes in the buffer returned by
1581 /// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`].
1582 ///
1583 /// # Examples
1584 ///
1585 /// ```
1586 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1587 /// use std::pin::Pin;
1588 ///
1589 /// # spin_on::spin_on(async {
1590 /// let input: &[u8] = b"hello";
1591 /// let mut reader = BufReader::with_capacity(4, input);
1592 ///
1593 /// assert_eq!(reader.fill_buf().await?, b"hell");
1594 /// reader.consume(2);
1595 /// assert_eq!(reader.fill_buf().await?, b"ll");
1596 /// # std::io::Result::Ok(()) });
1597 /// ```
fn consume(&mut self, amt: usize)
where
    Self: Unpin,
{
    // `AsyncBufRead::consume` takes a pinned receiver; `Self: Unpin` lets us pin a plain
    // `&mut` reference on the spot.
    AsyncBufRead::consume(Pin::new(self), amt);
}
1604
1605 /// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found.
1606 ///
1607 /// This method will read bytes from the underlying stream until the delimiter or EOF is
1608 /// found. All bytes up to and including the delimiter (if found) will be appended to `buf`.
1609 ///
1610 /// If successful, returns the total number of bytes read.
1611 ///
1612 /// # Examples
1613 ///
1614 /// ```
1615 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1616 ///
1617 /// # spin_on::spin_on(async {
1618 /// let input: &[u8] = b"hello";
1619 /// let mut reader = BufReader::new(input);
1620 ///
1621 /// let mut buf = Vec::new();
1622 /// let n = reader.read_until(b'\n', &mut buf).await?;
1623 /// # std::io::Result::Ok(()) });
1624 /// ```
1625 fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'_, Self>
1626 where
1627 Self: Unpin,
1628 {
1629 ReadUntilFuture {
1630 reader: self,
1631 byte,
1632 buf,
1633 read: 0,
1634 }
1635 }
1636
1637 /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found.
1638 ///
1639 /// This method will read bytes from the underlying stream until the newline delimiter (the
1640 /// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found)
1641 /// will be appended to `buf`.
1642 ///
1643 /// If successful, returns the total number of bytes read.
1644 ///
1645 /// # Examples
1646 ///
1647 /// ```
1648 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1649 ///
1650 /// # spin_on::spin_on(async {
1651 /// let input: &[u8] = b"hello";
1652 /// let mut reader = BufReader::new(input);
1653 ///
1654 /// let mut line = String::new();
1655 /// let n = reader.read_line(&mut line).await?;
1656 /// # std::io::Result::Ok(()) });
1657 /// ```
1658 fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'_, Self>
1659 where
1660 Self: Unpin,
1661 {
1662 ReadLineFuture {
1663 reader: self,
1664 buf,
1665 bytes: Vec::new(),
1666 read: 0,
1667 }
1668 }
1669
1670 /// Returns a stream over the lines of this byte stream.
1671 ///
1672 /// The stream returned from this method yields items of type
1673 /// [`io::Result`][`super::io::Result`]`<`[`String`]`>`.
1674 /// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes)
1675 /// at the end.
1676 ///
1677 /// # Examples
1678 ///
1679 /// ```
1680 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1681 /// use futures_lite::stream::StreamExt;
1682 ///
1683 /// # spin_on::spin_on(async {
1684 /// let input: &[u8] = b"hello\nworld\n";
1685 /// let mut reader = BufReader::new(input);
1686 /// let mut lines = reader.lines();
1687 ///
1688 /// while let Some(line) = lines.next().await {
1689 /// println!("{}", line?);
1690 /// }
1691 /// # std::io::Result::Ok(()) });
1692 /// ```
fn lines(self) -> Lines<Self>
where
    Self: Unpin + Sized,
{
    // `buf`, `bytes` and `read` are scratch state for the stream; all start out empty/zero.
    Lines {
        reader: self,
        buf: String::new(),
        bytes: Vec::new(),
        read: 0,
    }
}
1704
1705 /// Returns a stream over the contents of this reader split on the specified `byte`.
1706 ///
1707 /// The stream returned from this method yields items of type
1708 /// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`.
1709 /// Each vector returned will *not* have the delimiter byte at the end.
1710 ///
1711 /// # Examples
1712 ///
1713 /// ```
1714 /// use futures_lite::io::{AsyncBufReadExt, Cursor};
1715 /// use futures_lite::stream::StreamExt;
1716 ///
1717 /// # spin_on::spin_on(async {
1718 /// let cursor = Cursor::new(b"lorem-ipsum-dolor");
1719 /// let items: Vec<Vec<u8>> = cursor.split(b'-').try_collect().await?;
1720 ///
1721 /// assert_eq!(items[0], b"lorem");
1722 /// assert_eq!(items[1], b"ipsum");
1723 /// assert_eq!(items[2], b"dolor");
1724 /// # std::io::Result::Ok(()) });
1725 /// ```
fn split(self, byte: u8) -> Split<Self>
where
    Self: Sized,
{
    // `buf` and `read` are scratch state for the stream; `delim` is the byte to split on.
    Split {
        reader: self,
        buf: Vec::new(),
        delim: byte,
        read: 0,
    }
}
1737}
1738
// Blanket impl: every `AsyncBufRead` type (sized or not) gets the extension methods.
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
1740
1741/// Future for the [`AsyncBufReadExt::fill_buf()`] method.
1742#[derive(Debug)]
1743#[must_use = "futures do nothing unless you `.await` or poll them"]
1744pub struct FillBuf<'a, R: ?Sized> {
1745 reader: Option<&'a mut R>,
1746}
1747
1748impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
1749
1750impl<'a, R> Future for FillBuf<'a, R>
1751where
1752 R: AsyncBufRead + Unpin + ?Sized,
1753{
1754 type Output = Result<&'a [u8]>;
1755
1756 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1757 let this: &mut FillBuf<'_, R> = &mut *self;
1758 let reader: &mut R = this
1759 .reader
1760 .take()
1761 .expect(msg:"polled `FillBuf` after completion");
1762
1763 match Pin::new(&mut *reader).poll_fill_buf(cx) {
1764 Poll::Ready(Ok(_)) => match Pin::new(pointer:reader).poll_fill_buf(cx) {
1765 Poll::Ready(Ok(slice: &[u8])) => Poll::Ready(Ok(slice)),
1766 poll: Poll> => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}", poll),
1767 },
1768 Poll::Ready(Err(err: Error)) => Poll::Ready(Err(err)),
1769 Poll::Pending => {
1770 this.reader = Some(reader);
1771 Poll::Pending
1772 }
1773 }
1774 }
1775}
1776
1777/// Future for the [`AsyncBufReadExt::read_until()`] method.
1778#[derive(Debug)]
1779#[must_use = "futures do nothing unless you `.await` or poll them"]
1780pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> {
1781 reader: &'a mut R,
1782 byte: u8,
1783 buf: &'a mut Vec<u8>,
1784 read: usize,
1785}
1786
1787impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {}
1788
1789impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> {
1790 type Output = Result<usize>;
1791
1792 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1793 let Self {
1794 reader: &mut &mut R,
1795 byte: &mut u8,
1796 buf: &mut &mut Vec,
1797 read: &mut usize,
1798 } = &mut *self;
1799 read_until_internal(reader:Pin::new(pointer:reader), cx, *byte, buf, read)
1800 }
1801}
1802
1803fn read_until_internal<R: AsyncBufReadExt + ?Sized>(
1804 mut reader: Pin<&mut R>,
1805 cx: &mut Context<'_>,
1806 byte: u8,
1807 buf: &mut Vec<u8>,
1808 read: &mut usize,
1809) -> Poll<Result<usize>> {
1810 loop {
1811 let (done: bool, used: usize) = {
1812 let available: &[u8] = ready!(reader.as_mut().poll_fill_buf(cx))?;
1813
1814 if let Some(i: usize) = memchr::memchr(needle:byte, haystack:available) {
1815 buf.extend_from_slice(&available[..=i]);
1816 (true, i + 1)
1817 } else {
1818 buf.extend_from_slice(available);
1819 (false, available.len())
1820 }
1821 };
1822
1823 reader.as_mut().consume(amt:used);
1824 *read += used;
1825
1826 if done || used == 0 {
1827 return Poll::Ready(Ok(mem::replace(dest:read, src:0)));
1828 }
1829 }
1830}
1831
1832/// Future for the [`AsyncBufReadExt::read_line()`] method.
1833#[derive(Debug)]
1834#[must_use = "futures do nothing unless you `.await` or poll them"]
1835pub struct ReadLineFuture<'a, R: Unpin + ?Sized> {
1836 reader: &'a mut R,
1837 buf: &'a mut String,
1838 bytes: Vec<u8>,
1839 read: usize,
1840}
1841
1842impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {}
1843
1844impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> {
1845 type Output = Result<usize>;
1846
1847 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1848 let Self {
1849 reader: &mut &mut R,
1850 buf: &mut &mut String,
1851 bytes: &mut Vec,
1852 read: &mut usize,
1853 } = &mut *self;
1854 read_line_internal(reader:Pin::new(pointer:reader), cx, buf, bytes, read)
1855 }
1856}
1857
pin_project! {
    /// Stream for the [`AsyncBufReadExt::lines()`] method.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Lines<R> {
        // The only pinned field; the rest is plain scratch state.
        #[pin]
        reader: R,
        // Holds the current line until it is yielded.
        buf: String,
        // Raw bytes of the line being read, before UTF-8 validation.
        bytes: Vec<u8>,
        // Bytes read so far for the current line; kept across polls.
        read: usize,
    }
}
1870
1871impl<R: AsyncBufRead> Stream for Lines<R> {
1872 type Item = Result<String>;
1873
1874 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1875 let this = self.project();
1876
1877 let n = ready!(read_line_internal(
1878 this.reader,
1879 cx,
1880 this.buf,
1881 this.bytes,
1882 this.read
1883 ))?;
1884 if n == 0 && this.buf.is_empty() {
1885 return Poll::Ready(None);
1886 }
1887
1888 if this.buf.ends_with('\n') {
1889 this.buf.pop();
1890 if this.buf.ends_with('\r') {
1891 this.buf.pop();
1892 }
1893 }
1894 Poll::Ready(Some(Ok(mem::take(this.buf))))
1895 }
1896}
1897
1898fn read_line_internal<R: AsyncBufRead + ?Sized>(
1899 reader: Pin<&mut R>,
1900 cx: &mut Context<'_>,
1901 buf: &mut String,
1902 bytes: &mut Vec<u8>,
1903 read: &mut usize,
1904) -> Poll<Result<usize>> {
1905 let ret: Result = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
1906
1907 match String::from_utf8(vec:mem::take(dest:bytes)) {
1908 Ok(s: String) => {
1909 debug_assert!(buf.is_empty());
1910 debug_assert_eq!(*read, 0);
1911 *buf = s;
1912 Poll::Ready(ret)
1913 }
1914 Err(_) => Poll::Ready(ret.and_then(|_| {
1915 Err(Error::new(
1916 kind:ErrorKind::InvalidData,
1917 error:"stream did not contain valid UTF-8",
1918 ))
1919 })),
1920 }
1921}
1922
pin_project! {
    /// Stream for the [`AsyncBufReadExt::split()`] method.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Split<R> {
        // The only pinned field; the rest is plain scratch state.
        #[pin]
        reader: R,
        // Bytes of the segment being read; handed out when the segment is complete.
        buf: Vec<u8>,
        // Bytes read so far for the current segment; kept across polls.
        read: usize,
        // Byte the input is split on.
        delim: u8,
    }
}
1935
1936impl<R: AsyncBufRead> Stream for Split<R> {
1937 type Item = Result<Vec<u8>>;
1938
1939 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1940 let this: Projection<'_, R> = self.project();
1941
1942 let n: usize = ready!(read_until_internal(
1943 this.reader,
1944 cx,
1945 *this.delim,
1946 this.buf,
1947 this.read
1948 ))?;
1949 if n == 0 && this.buf.is_empty() {
1950 return Poll::Ready(None);
1951 }
1952
1953 if this.buf[this.buf.len() - 1] == *this.delim {
1954 this.buf.pop();
1955 }
1956 Poll::Ready(Some(Ok(mem::take(dest:this.buf))))
1957 }
1958}
1959
1960/// Extension trait for [`AsyncRead`].
1961pub trait AsyncReadExt: AsyncRead {
1962 /// Reads some bytes from the byte stream.
1963 ///
1964 /// On success, returns the total number of bytes read.
1965 ///
1966 /// If the return value is `Ok(n)`, then it must be guaranteed that
1967 /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
1968 /// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two
1969 /// scenarios:
1970 ///
1971 /// 1. This reader has reached its "end of file" and will likely no longer be able to
1972 /// produce bytes. Note that this does not mean that the reader will always no
1973 /// longer be able to produce bytes.
1974 /// 2. The buffer specified was 0 bytes in length.
1975 ///
1976 /// # Examples
1977 ///
1978 /// ```
1979 /// use futures_lite::io::{AsyncReadExt, BufReader};
1980 ///
1981 /// # spin_on::spin_on(async {
1982 /// let input: &[u8] = b"hello";
1983 /// let mut reader = BufReader::new(input);
1984 ///
1985 /// let mut buf = vec![0; 1024];
1986 /// let n = reader.read(&mut buf).await?;
1987 /// # std::io::Result::Ok(()) });
1988 /// ```
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
where
    Self: Unpin,
{
    // The future borrows both the reader and the destination buffer for `'a`.
    ReadFuture { reader: self, buf }
}
1995
1996 /// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers.
1997 ///
1998 /// Data is copied to fill each buffer in order, with the final buffer possibly being
1999 /// only partially filled. This method must behave same as a single call to
2000 /// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would.
fn read_vectored<'a>(
    &'a mut self,
    bufs: &'a mut [IoSliceMut<'a>],
) -> ReadVectoredFuture<'a, Self>
where
    Self: Unpin,
{
    // The future borrows both the reader and the slice of buffers for `'a`.
    ReadVectoredFuture { reader: self, bufs }
}
2010
2011 /// Reads the entire contents and appends them to a [`Vec`].
2012 ///
2013 /// On success, returns the total number of bytes read.
2014 ///
2015 /// # Examples
2016 ///
2017 /// ```
2018 /// use futures_lite::io::{AsyncReadExt, Cursor};
2019 ///
2020 /// # spin_on::spin_on(async {
2021 /// let mut reader = Cursor::new(vec![1, 2, 3]);
2022 /// let mut contents = Vec::new();
2023 ///
2024 /// let n = reader.read_to_end(&mut contents).await?;
2025 /// assert_eq!(n, 3);
2026 /// assert_eq!(contents, [1, 2, 3]);
2027 /// # std::io::Result::Ok(()) });
2028 /// ```
2029 fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
2030 where
2031 Self: Unpin,
2032 {
2033 let start_len = buf.len();
2034 ReadToEndFuture {
2035 reader: self,
2036 buf,
2037 start_len,
2038 }
2039 }
2040
2041 /// Reads the entire contents and appends them to a [`String`].
2042 ///
2043 /// On success, returns the total number of bytes read.
2044 ///
2045 /// # Examples
2046 ///
2047 /// ```
2048 /// use futures_lite::io::{AsyncReadExt, Cursor};
2049 ///
2050 /// # spin_on::spin_on(async {
2051 /// let mut reader = Cursor::new(&b"hello");
2052 /// let mut contents = String::new();
2053 ///
2054 /// let n = reader.read_to_string(&mut contents).await?;
2055 /// assert_eq!(n, 5);
2056 /// assert_eq!(contents, "hello");
2057 /// # std::io::Result::Ok(()) });
2058 /// ```
fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
where
    Self: Unpin,
{
    ReadToStringFuture {
        reader: self,
        buf,
        // Scratch vector owned by the future; starts empty, hence `start_len: 0`.
        bytes: Vec::new(),
        start_len: 0,
    }
}
2070
2071 /// Reads the exact number of bytes required to fill `buf`.
2072 ///
2073 /// On success, returns the total number of bytes read.
2074 ///
2075 /// # Examples
2076 ///
2077 /// ```
2078 /// use futures_lite::io::{AsyncReadExt, Cursor};
2079 ///
2080 /// # spin_on::spin_on(async {
2081 /// let mut reader = Cursor::new(&b"hello");
2082 /// let mut contents = vec![0; 3];
2083 ///
2084 /// reader.read_exact(&mut contents).await?;
2085 /// assert_eq!(contents, b"hel");
2086 /// # std::io::Result::Ok(()) });
2087 /// ```
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
where
    Self: Unpin,
{
    // The future borrows both the reader and the destination buffer for `'a`.
    ReadExactFuture { reader: self, buf }
}
2094
2095 /// Creates an adapter which will read at most `limit` bytes from it.
2096 ///
2097 /// This method returns a new instance of [`AsyncRead`] which will read at most
2098 /// `limit` bytes, after which it will always return `Ok(0)` indicating EOF.
2099 ///
2100 /// # Examples
2101 ///
2102 /// ```
2103 /// use futures_lite::io::{AsyncReadExt, Cursor};
2104 ///
2105 /// # spin_on::spin_on(async {
2106 /// let mut reader = Cursor::new(&b"hello");
2107 /// let mut contents = String::new();
2108 ///
2109 /// let n = reader.take(3).read_to_string(&mut contents).await?;
2110 /// assert_eq!(n, 3);
2111 /// assert_eq!(contents, "hel");
2112 /// # std::io::Result::Ok(()) });
2113 /// ```
fn take(self, limit: u64) -> Take<Self>
where
    Self: Sized,
{
    // The adapter takes ownership of the reader and stores the byte limit.
    Take { inner: self, limit }
}
2120
2121 /// Converts this [`AsyncRead`] into a [`Stream`] of bytes.
2122 ///
2123 /// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`.
2124 ///
2125 /// ```
2126 /// use futures_lite::io::{AsyncReadExt, Cursor};
2127 /// use futures_lite::stream::StreamExt;
2128 ///
2129 /// # spin_on::spin_on(async {
2130 /// let reader = Cursor::new(&b"hello");
2131 /// let mut bytes = reader.bytes();
2132 ///
2133 /// while let Some(byte) = bytes.next().await {
2134 /// println!("byte: {}", byte?);
2135 /// }
2136 /// # std::io::Result::Ok(()) });
2137 /// ```
fn bytes(self) -> Bytes<Self>
where
    Self: Sized,
{
    // The stream takes ownership of the reader.
    Bytes { inner: self }
}
2144
2145 /// Creates an adapter which will chain this stream with another.
2146 ///
2147 /// The returned [`AsyncRead`] instance will first read all bytes from this reader
2148 /// until EOF is found, and then continue with `next`.
2149 ///
2150 /// # Examples
2151 ///
2152 /// ```
2153 /// use futures_lite::io::{AsyncReadExt, Cursor};
2154 ///
2155 /// # spin_on::spin_on(async {
2156 /// let r1 = Cursor::new(&b"hello");
2157 /// let r2 = Cursor::new(&b"world");
2158 /// let mut reader = r1.chain(r2);
2159 ///
2160 /// let mut contents = String::new();
2161 /// reader.read_to_string(&mut contents).await?;
2162 /// assert_eq!(contents, "helloworld");
2163 /// # std::io::Result::Ok(()) });
2164 /// ```
fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R>
where
    Self: Sized,
{
    Chain {
        first: self,
        second: next,
        // NOTE(review): presumably set once `first` reaches EOF; confirm against the
        // `Chain` impl.
        done_first: false,
    }
}
2175
2176 /// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`.
2177 ///
2178 /// # Examples
2179 ///
2180 /// ```
2181 /// use futures_lite::io::AsyncReadExt;
2182 ///
2183 /// let reader = [1, 2, 3].boxed_reader();
2184 /// ```
#[cfg(feature = "alloc")]
fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>>
where
    Self: Sized + Send + 'a,
{
    // Pin the reader on the heap; the return type turns it into a trait object.
    Box::pin(self)
}
2192}
2193
// Blanket impl: every `AsyncRead` type (sized or not) gets the extension methods.
impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
2195
2196/// Future for the [`AsyncReadExt::read()`] method.
2197#[derive(Debug)]
2198#[must_use = "futures do nothing unless you `.await` or poll them"]
2199pub struct ReadFuture<'a, R: Unpin + ?Sized> {
2200 reader: &'a mut R,
2201 buf: &'a mut [u8],
2202}
2203
2204impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {}
2205
2206impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> {
2207 type Output = Result<usize>;
2208
2209 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2210 let Self { reader: &mut &mut R, buf: &mut &mut [u8] } = &mut *self;
2211 Pin::new(pointer:reader).poll_read(cx, buf)
2212 }
2213}
2214
2215/// Future for the [`AsyncReadExt::read_vectored()`] method.
2216#[derive(Debug)]
2217#[must_use = "futures do nothing unless you `.await` or poll them"]
2218pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> {
2219 reader: &'a mut R,
2220 bufs: &'a mut [IoSliceMut<'a>],
2221}
2222
2223impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {}
2224
2225impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> {
2226 type Output = Result<usize>;
2227
2228 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2229 let Self { reader: &mut &mut R, bufs: &mut &mut [IoSliceMut<'_>] } = &mut *self;
2230 Pin::new(pointer:reader).poll_read_vectored(cx, bufs)
2231 }
2232}
2233
2234/// Future for the [`AsyncReadExt::read_to_end()`] method.
2235#[derive(Debug)]
2236#[must_use = "futures do nothing unless you `.await` or poll them"]
2237pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> {
2238 reader: &'a mut R,
2239 buf: &'a mut Vec<u8>,
2240 start_len: usize,
2241}
2242
2243impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {}
2244
2245impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> {
2246 type Output = Result<usize>;
2247
2248 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2249 let Self {
2250 reader: &mut &mut R,
2251 buf: &mut &mut Vec,
2252 start_len: &mut usize,
2253 } = &mut *self;
2254 read_to_end_internal(rd:Pin::new(pointer:reader), cx, buf, *start_len)
2255 }
2256}
2257
2258/// Future for the [`AsyncReadExt::read_to_string()`] method.
2259#[derive(Debug)]
2260#[must_use = "futures do nothing unless you `.await` or poll them"]
2261pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> {
2262 reader: &'a mut R,
2263 buf: &'a mut String,
2264 bytes: Vec<u8>,
2265 start_len: usize,
2266}
2267
2268impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {}
2269
2270impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> {
2271 type Output = Result<usize>;
2272
2273 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2274 let Self {
2275 reader,
2276 buf,
2277 bytes,
2278 start_len,
2279 } = &mut *self;
2280 let reader = Pin::new(reader);
2281
2282 let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len));
2283
2284 match String::from_utf8(mem::take(bytes)) {
2285 Ok(s) => {
2286 debug_assert!(buf.is_empty());
2287 **buf = s;
2288 Poll::Ready(ret)
2289 }
2290 Err(_) => Poll::Ready(ret.and_then(|_| {
2291 Err(Error::new(
2292 ErrorKind::InvalidData,
2293 "stream did not contain valid UTF-8",
2294 ))
2295 })),
2296 }
2297 }
2298}
2299
2300// This uses an adaptive system to extend the vector when it fills. We want to
2301// avoid paying to allocate and zero a huge chunk of memory if the reader only
2302// has 4 bytes while still making large reads if the reader does have a ton
2303// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
2304// time is 4,500 times (!) slower than this if the reader has a very small
2305// amount of data to return.
2306//
2307// Because we're extending the buffer with uninitialized data for trusted
2308// readers, we need to make sure to truncate that if any of this panics.
2309fn read_to_end_internal<R: AsyncRead + ?Sized>(
2310 mut rd: Pin<&mut R>,
2311 cx: &mut Context<'_>,
2312 buf: &mut Vec<u8>,
2313 start_len: usize,
2314) -> Poll<Result<usize>> {
2315 struct Guard<'a> {
2316 buf: &'a mut Vec<u8>,
2317 len: usize,
2318 }
2319
2320 impl Drop for Guard<'_> {
2321 fn drop(&mut self) {
2322 self.buf.resize(self.len, 0);
2323 }
2324 }
2325
2326 let mut g = Guard {
2327 len: buf.len(),
2328 buf,
2329 };
2330 let ret;
2331 loop {
2332 if g.len == g.buf.len() {
2333 g.buf.reserve(32);
2334 let capacity = g.buf.capacity();
2335 g.buf.resize(capacity, 0);
2336 }
2337
2338 match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
2339 Ok(0) => {
2340 ret = Poll::Ready(Ok(g.len - start_len));
2341 break;
2342 }
2343 Ok(n) => g.len += n,
2344 Err(e) => {
2345 ret = Poll::Ready(Err(e));
2346 break;
2347 }
2348 }
2349 }
2350
2351 ret
2352}
2353
2354/// Future for the [`AsyncReadExt::read_exact()`] method.
2355#[derive(Debug)]
2356#[must_use = "futures do nothing unless you `.await` or poll them"]
2357pub struct ReadExactFuture<'a, R: Unpin + ?Sized> {
2358 reader: &'a mut R,
2359 buf: &'a mut [u8],
2360}
2361
2362impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {}
2363
2364impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> {
2365 type Output = Result<()>;
2366
2367 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2368 let Self { reader: &mut &mut R, buf: &mut &mut [u8] } = &mut *self;
2369
2370 while !buf.is_empty() {
2371 let n: usize = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
2372 let (_, rest: &mut [u8]) = mem::take(buf).split_at_mut(mid:n);
2373 *buf = rest;
2374
2375 if n == 0 {
2376 return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
2377 }
2378 }
2379
2380 Poll::Ready(Ok(()))
2381 }
2382}
2383
pin_project! {
    /// Reader for the [`AsyncReadExt::take()`] method.
    #[derive(Debug)]
    pub struct Take<R> {
        // The wrapped reader; marked `#[pin]` so it can be polled through a pinned `Take`.
        #[pin]
        inner: R,
        // Number of bytes that may still be handed out before this adapter reports EOF.
        limit: u64,
    }
}
2393
2394impl<R> Take<R> {
2395 /// Returns the number of bytes before this adapter will return EOF.
2396 ///
2397 /// Note that EOF may be reached sooner if the underlying reader is shorter than the limit.
2398 ///
2399 /// # Examples
2400 ///
2401 /// ```
2402 /// use futures_lite::io::{AsyncReadExt, Cursor};
2403 ///
2404 /// let reader = Cursor::new("hello");
2405 ///
2406 /// let reader = reader.take(3);
2407 /// assert_eq!(reader.limit(), 3);
2408 /// ```
2409 pub fn limit(&self) -> u64 {
2410 self.limit
2411 }
2412
2413 /// Puts a limit on the number of bytes.
2414 ///
2415 /// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`].
2416 ///
2417 /// # Examples
2418 ///
2419 /// ```
2420 /// use futures_lite::io::{AsyncReadExt, Cursor};
2421 ///
2422 /// let reader = Cursor::new("hello");
2423 ///
2424 /// let mut reader = reader.take(10);
2425 /// assert_eq!(reader.limit(), 10);
2426 ///
2427 /// reader.set_limit(3);
2428 /// assert_eq!(reader.limit(), 3);
2429 /// ```
2430 pub fn set_limit(&mut self, limit: u64) {
2431 self.limit = limit;
2432 }
2433
2434 /// Gets a reference to the underlying reader.
2435 ///
2436 /// # Examples
2437 ///
2438 /// ```
2439 /// use futures_lite::io::{AsyncReadExt, Cursor};
2440 ///
2441 /// let reader = Cursor::new("hello");
2442 ///
2443 /// let reader = reader.take(3);
2444 /// let r = reader.get_ref();
2445 /// ```
2446 pub fn get_ref(&self) -> &R {
2447 &self.inner
2448 }
2449
2450 /// Gets a mutable reference to the underlying reader.
2451 ///
2452 /// # Examples
2453 ///
2454 /// ```
2455 /// use futures_lite::io::{AsyncReadExt, Cursor};
2456 ///
2457 /// let reader = Cursor::new("hello");
2458 ///
2459 /// let mut reader = reader.take(3);
2460 /// let r = reader.get_mut();
2461 /// ```
2462 pub fn get_mut(&mut self) -> &mut R {
2463 &mut self.inner
2464 }
2465
2466 /// Unwraps the adapter, returning the underlying reader.
2467 ///
2468 /// # Examples
2469 ///
2470 /// ```
2471 /// use futures_lite::io::{AsyncReadExt, Cursor};
2472 ///
2473 /// let reader = Cursor::new("hello");
2474 ///
2475 /// let reader = reader.take(3);
2476 /// let reader = reader.into_inner();
2477 /// ```
2478 pub fn into_inner(self) -> R {
2479 self.inner
2480 }
2481}
2482
2483impl<R: AsyncRead> AsyncRead for Take<R> {
2484 fn poll_read(
2485 self: Pin<&mut Self>,
2486 cx: &mut Context<'_>,
2487 buf: &mut [u8],
2488 ) -> Poll<Result<usize>> {
2489 let this: Projection<'_, R> = self.project();
2490 take_read_internal(rd:this.inner, cx, buf, this.limit)
2491 }
2492}
2493
2494fn take_read_internal<R: AsyncRead + ?Sized>(
2495 mut rd: Pin<&mut R>,
2496 cx: &mut Context<'_>,
2497 buf: &mut [u8],
2498 limit: &mut u64,
2499) -> Poll<Result<usize>> {
2500 // Don't call into inner reader at all at EOF because it may still block
2501 if *limit == 0 {
2502 return Poll::Ready(Ok(0));
2503 }
2504
2505 let max: usize = cmp::min(v1:buf.len() as u64, *limit) as usize;
2506
2507 match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
2508 Ok(n: usize) => {
2509 *limit -= n as u64;
2510 Poll::Ready(Ok(n))
2511 }
2512 Err(e: Error) => Poll::Ready(Err(e)),
2513 }
2514}
2515
2516impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
2517 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2518 let this = self.project();
2519
2520 if *this.limit == 0 {
2521 return Poll::Ready(Ok(&[]));
2522 }
2523
2524 match ready!(this.inner.poll_fill_buf(cx)) {
2525 Ok(buf) => {
2526 let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
2527 Poll::Ready(Ok(&buf[..cap]))
2528 }
2529 Err(e) => Poll::Ready(Err(e)),
2530 }
2531 }
2532
2533 fn consume(self: Pin<&mut Self>, amt: usize) {
2534 let this = self.project();
2535 // Don't let callers reset the limit by passing an overlarge value
2536 let amt = cmp::min(amt as u64, *this.limit) as usize;
2537 *this.limit -= amt as u64;
2538
2539 this.inner.consume(amt);
2540 }
2541}
2542
pin_project! {
    /// Reader for the [`AsyncReadExt::bytes()`] method.
    #[derive(Debug)]
    pub struct Bytes<R> {
        // The wrapped reader; marked `#[pin]` so it can be polled through a pinned `Bytes`.
        #[pin]
        inner: R,
    }
}
2551
2552impl<R: AsyncRead + Unpin> Stream for Bytes<R> {
2553 type Item = Result<u8>;
2554
2555 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2556 let mut byte: u8 = 0;
2557
2558 let rd: Pin<&mut R> = Pin::new(&mut self.inner);
2559
2560 match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
2561 Ok(0) => Poll::Ready(None),
2562 Ok(..) => Poll::Ready(Some(Ok(byte))),
2563 Err(ref e: &Error) if e.kind() == ErrorKind::Interrupted => Poll::Pending,
2564 Err(e: Error) => Poll::Ready(Some(Err(e))),
2565 }
2566 }
2567}
2568
2569impl<R: AsyncRead> AsyncRead for Bytes<R> {
2570 fn poll_read(
2571 self: Pin<&mut Self>,
2572 cx: &mut Context<'_>,
2573 buf: &mut [u8],
2574 ) -> Poll<Result<usize>> {
2575 self.project().inner.poll_read(cx, buf)
2576 }
2577
2578 fn poll_read_vectored(
2579 self: Pin<&mut Self>,
2580 cx: &mut Context<'_>,
2581 bufs: &mut [IoSliceMut<'_>],
2582 ) -> Poll<Result<usize>> {
2583 self.project().inner.poll_read_vectored(cx, bufs)
2584 }
2585}
2586
pin_project! {
    /// Reader for the [`AsyncReadExt::chain()`] method.
    pub struct Chain<R1, R2> {
        // Reader that is drained first.
        #[pin]
        first: R1,
        // Reader that takes over once `first` has reported EOF.
        #[pin]
        second: R2,
        // Set once `first` has reported EOF; from then on only `second` is polled.
        done_first: bool,
    }
}
2597
2598impl<R1, R2> Chain<R1, R2> {
2599 /// Gets references to the underlying readers.
2600 ///
2601 /// # Examples
2602 ///
2603 /// ```
2604 /// use futures_lite::io::{AsyncReadExt, Cursor};
2605 ///
2606 /// let r1 = Cursor::new(b"hello");
2607 /// let r2 = Cursor::new(b"world");
2608 ///
2609 /// let reader = r1.chain(r2);
2610 /// let (r1, r2) = reader.get_ref();
2611 /// ```
2612 pub fn get_ref(&self) -> (&R1, &R2) {
2613 (&self.first, &self.second)
2614 }
2615
2616 /// Gets mutable references to the underlying readers.
2617 ///
2618 /// # Examples
2619 ///
2620 /// ```
2621 /// use futures_lite::io::{AsyncReadExt, Cursor};
2622 ///
2623 /// let r1 = Cursor::new(b"hello");
2624 /// let r2 = Cursor::new(b"world");
2625 ///
2626 /// let mut reader = r1.chain(r2);
2627 /// let (r1, r2) = reader.get_mut();
2628 /// ```
2629 pub fn get_mut(&mut self) -> (&mut R1, &mut R2) {
2630 (&mut self.first, &mut self.second)
2631 }
2632
2633 /// Unwraps the adapter, returning the underlying readers.
2634 ///
2635 /// # Examples
2636 ///
2637 /// ```
2638 /// use futures_lite::io::{AsyncReadExt, Cursor};
2639 ///
2640 /// let r1 = Cursor::new(b"hello");
2641 /// let r2 = Cursor::new(b"world");
2642 ///
2643 /// let reader = r1.chain(r2);
2644 /// let (r1, r2) = reader.into_inner();
2645 /// ```
2646 pub fn into_inner(self) -> (R1, R2) {
2647 (self.first, self.second)
2648 }
2649}
2650
2651impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> {
2652 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2653 f&mut DebugStruct<'_, '_>.debug_struct("Chain")
2654 .field("r1", &self.first)
2655 .field(name:"r2", &self.second)
2656 .finish()
2657 }
2658}
2659
2660impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> {
2661 fn poll_read(
2662 self: Pin<&mut Self>,
2663 cx: &mut Context<'_>,
2664 buf: &mut [u8],
2665 ) -> Poll<Result<usize>> {
2666 let this = self.project();
2667 if !*this.done_first {
2668 match ready!(this.first.poll_read(cx, buf)) {
2669 Ok(0) if !buf.is_empty() => *this.done_first = true,
2670 Ok(n) => return Poll::Ready(Ok(n)),
2671 Err(err) => return Poll::Ready(Err(err)),
2672 }
2673 }
2674
2675 this.second.poll_read(cx, buf)
2676 }
2677
2678 fn poll_read_vectored(
2679 self: Pin<&mut Self>,
2680 cx: &mut Context<'_>,
2681 bufs: &mut [IoSliceMut<'_>],
2682 ) -> Poll<Result<usize>> {
2683 let this = self.project();
2684 if !*this.done_first {
2685 match ready!(this.first.poll_read_vectored(cx, bufs)) {
2686 Ok(0) if !bufs.is_empty() => *this.done_first = true,
2687 Ok(n) => return Poll::Ready(Ok(n)),
2688 Err(err) => return Poll::Ready(Err(err)),
2689 }
2690 }
2691
2692 this.second.poll_read_vectored(cx, bufs)
2693 }
2694}
2695
2696impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> {
2697 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2698 let this = self.project();
2699 if !*this.done_first {
2700 match ready!(this.first.poll_fill_buf(cx)) {
2701 Ok(buf) if buf.is_empty() => {
2702 *this.done_first = true;
2703 }
2704 Ok(buf) => return Poll::Ready(Ok(buf)),
2705 Err(err) => return Poll::Ready(Err(err)),
2706 }
2707 }
2708
2709 this.second.poll_fill_buf(cx)
2710 }
2711
2712 fn consume(self: Pin<&mut Self>, amt: usize) {
2713 let this = self.project();
2714 if !*this.done_first {
2715 this.first.consume(amt)
2716 } else {
2717 this.second.consume(amt)
2718 }
2719 }
2720}
2721
2722/// Extension trait for [`AsyncSeek`].
2723pub trait AsyncSeekExt: AsyncSeek {
2724 /// Seeks to a new position in a byte stream.
2725 ///
2726 /// Returns the new position in the byte stream.
2727 ///
2728 /// A seek beyond the end of stream is allowed, but behavior is defined by the implementation.
2729 ///
2730 /// # Examples
2731 ///
2732 /// ```
2733 /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
2734 ///
2735 /// # spin_on::spin_on(async {
2736 /// let mut cursor = Cursor::new("hello");
2737 ///
2738 /// // Move the cursor to the end.
2739 /// cursor.seek(SeekFrom::End(0)).await?;
2740 ///
2741 /// // Check the current position.
2742 /// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5);
2743 /// # std::io::Result::Ok(()) });
2744 /// ```
2745 fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
2746 where
2747 Self: Unpin,
2748 {
2749 SeekFuture { seeker: self, pos }
2750 }
2751}
2752
2753impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
2754
2755/// Future for the [`AsyncSeekExt::seek()`] method.
2756#[derive(Debug)]
2757#[must_use = "futures do nothing unless you `.await` or poll them"]
2758pub struct SeekFuture<'a, S: Unpin + ?Sized> {
2759 seeker: &'a mut S,
2760 pos: SeekFrom,
2761}
2762
2763impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {}
2764
2765impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> {
2766 type Output = Result<u64>;
2767
2768 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2769 let pos: SeekFrom = self.pos;
2770 Pin::new(&mut *self.seeker).poll_seek(cx, pos)
2771 }
2772}
2773
2774/// Extension trait for [`AsyncWrite`].
2775pub trait AsyncWriteExt: AsyncWrite {
2776 /// Writes some bytes into the byte stream.
2777 ///
2778 /// Returns the number of bytes written from the start of the buffer.
2779 ///
2780 /// If the return value is `Ok(n)` then it must be guaranteed that
2781 /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
2782 /// object is no longer able to accept bytes and will likely not be able to in the
2783 /// future as well, or that the provided buffer is empty.
2784 ///
2785 /// # Examples
2786 ///
2787 /// ```
2788 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2789 ///
2790 /// # spin_on::spin_on(async {
2791 /// let mut output = Vec::new();
2792 /// let mut writer = BufWriter::new(&mut output);
2793 ///
2794 /// let n = writer.write(b"hello").await?;
2795 /// # std::io::Result::Ok(()) });
2796 /// ```
2797 fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
2798 where
2799 Self: Unpin,
2800 {
2801 WriteFuture { writer: self, buf }
2802 }
2803
2804 /// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers.
2805 ///
2806 /// Data is copied from each buffer in order, with the final buffer possibly being only
2807 /// partially consumed. This method must behave same as a call to
2808 /// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would.
2809 fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
2810 where
2811 Self: Unpin,
2812 {
2813 WriteVectoredFuture { writer: self, bufs }
2814 }
2815
2816 /// Writes an entire buffer into the byte stream.
2817 ///
2818 /// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more
2819 /// data to be written or an error occurs. It will not return before the entire buffer is
2820 /// successfully written or an error occurs.
2821 ///
2822 /// # Examples
2823 ///
2824 /// ```
2825 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2826 ///
2827 /// # spin_on::spin_on(async {
2828 /// let mut output = Vec::new();
2829 /// let mut writer = BufWriter::new(&mut output);
2830 ///
2831 /// let n = writer.write_all(b"hello").await?;
2832 /// # std::io::Result::Ok(()) });
2833 /// ```
2834 fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
2835 where
2836 Self: Unpin,
2837 {
2838 WriteAllFuture { writer: self, buf }
2839 }
2840
2841 /// Flushes the stream to ensure that all buffered contents reach their destination.
2842 ///
2843 /// # Examples
2844 ///
2845 /// ```
2846 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2847 ///
2848 /// # spin_on::spin_on(async {
2849 /// let mut output = Vec::new();
2850 /// let mut writer = BufWriter::new(&mut output);
2851 ///
2852 /// writer.write_all(b"hello").await?;
2853 /// writer.flush().await?;
2854 /// # std::io::Result::Ok(()) });
2855 /// ```
2856 fn flush(&mut self) -> FlushFuture<'_, Self>
2857 where
2858 Self: Unpin,
2859 {
2860 FlushFuture { writer: self }
2861 }
2862
2863 /// Closes the writer.
2864 ///
2865 /// # Examples
2866 ///
2867 /// ```
2868 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2869 ///
2870 /// # spin_on::spin_on(async {
2871 /// let mut output = Vec::new();
2872 /// let mut writer = BufWriter::new(&mut output);
2873 ///
2874 /// writer.close().await?;
2875 /// # std::io::Result::Ok(()) });
2876 /// ```
2877 fn close(&mut self) -> CloseFuture<'_, Self>
2878 where
2879 Self: Unpin,
2880 {
2881 CloseFuture { writer: self }
2882 }
2883
2884 /// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`.
2885 ///
2886 /// # Examples
2887 ///
2888 /// ```
2889 /// use futures_lite::io::AsyncWriteExt;
2890 ///
2891 /// let writer = Vec::<u8>::new().boxed_writer();
2892 /// ```
2893 #[cfg(feature = "alloc")]
2894 fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>>
2895 where
2896 Self: Sized + Send + 'a,
2897 {
2898 Box::pin(self)
2899 }
2900}
2901
2902impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
2903
2904/// Future for the [`AsyncWriteExt::write()`] method.
2905#[derive(Debug)]
2906#[must_use = "futures do nothing unless you `.await` or poll them"]
2907pub struct WriteFuture<'a, W: Unpin + ?Sized> {
2908 writer: &'a mut W,
2909 buf: &'a [u8],
2910}
2911
2912impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {}
2913
2914impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> {
2915 type Output = Result<usize>;
2916
2917 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2918 let buf: &[u8] = self.buf;
2919 Pin::new(&mut *self.writer).poll_write(cx, buf)
2920 }
2921}
2922
2923/// Future for the [`AsyncWriteExt::write_vectored()`] method.
2924#[derive(Debug)]
2925#[must_use = "futures do nothing unless you `.await` or poll them"]
2926pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> {
2927 writer: &'a mut W,
2928 bufs: &'a [IoSlice<'a>],
2929}
2930
2931impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {}
2932
2933impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> {
2934 type Output = Result<usize>;
2935
2936 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2937 let bufs: &[IoSlice<'_>] = self.bufs;
2938 Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
2939 }
2940}
2941
2942/// Future for the [`AsyncWriteExt::write_all()`] method.
2943#[derive(Debug)]
2944#[must_use = "futures do nothing unless you `.await` or poll them"]
2945pub struct WriteAllFuture<'a, W: Unpin + ?Sized> {
2946 writer: &'a mut W,
2947 buf: &'a [u8],
2948}
2949
2950impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {}
2951
2952impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> {
2953 type Output = Result<()>;
2954
2955 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2956 let Self { writer: &mut &mut W, buf: &mut &[u8] } = &mut *self;
2957
2958 while !buf.is_empty() {
2959 let n: usize = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
2960 let (_, rest: &[u8]) = mem::take(buf).split_at(mid:n);
2961 *buf = rest;
2962
2963 if n == 0 {
2964 return Poll::Ready(Err(ErrorKind::WriteZero.into()));
2965 }
2966 }
2967
2968 Poll::Ready(Ok(()))
2969 }
2970}
2971
2972/// Future for the [`AsyncWriteExt::flush()`] method.
2973#[derive(Debug)]
2974#[must_use = "futures do nothing unless you `.await` or poll them"]
2975pub struct FlushFuture<'a, W: Unpin + ?Sized> {
2976 writer: &'a mut W,
2977}
2978
2979impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {}
2980
2981impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> {
2982 type Output = Result<()>;
2983
2984 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2985 Pin::new(&mut *self.writer).poll_flush(cx)
2986 }
2987}
2988
2989/// Future for the [`AsyncWriteExt::close()`] method.
2990#[derive(Debug)]
2991#[must_use = "futures do nothing unless you `.await` or poll them"]
2992pub struct CloseFuture<'a, W: Unpin + ?Sized> {
2993 writer: &'a mut W,
2994}
2995
2996impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {}
2997
2998impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> {
2999 type Output = Result<()>;
3000
3001 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3002 Pin::new(&mut *self.writer).poll_close(cx)
3003 }
3004}
3005
/// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`.
///
/// Only available when the `alloc` feature is enabled.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncReadExt;
///
/// let reader = [1, 2, 3].boxed_reader();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>;
3017
/// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`.
///
/// Only available when the `alloc` feature is enabled.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncWriteExt;
///
/// let writer = Vec::<u8>::new().boxed_writer();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
3029
3030/// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves.
3031///
3032/// # Examples
3033///
3034/// ```
3035/// use futures_lite::io::{self, Cursor};
3036///
3037/// # spin_on::spin_on(async {
3038/// let stream = Cursor::new(vec![]);
3039/// let (mut reader, mut writer) = io::split(stream);
3040/// # std::io::Result::Ok(()) });
3041/// ```
3042pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
3043where
3044 T: AsyncRead + AsyncWrite + Unpin,
3045{
3046 let inner: Arc> = Arc::new(data:Mutex::new(stream));
3047 (ReadHalf(inner.clone()), WriteHalf(inner))
3048}
3049
/// The read half returned by [`split()`].
///
/// Holds one handle to the `Arc<Mutex<T>>` that wraps the original stream.
#[derive(Debug)]
pub struct ReadHalf<T>(Arc<Mutex<T>>);
3053
/// The write half returned by [`split()`].
///
/// Holds one handle to the `Arc<Mutex<T>>` that wraps the original stream.
#[derive(Debug)]
pub struct WriteHalf<T>(Arc<Mutex<T>>);
3057
3058impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> {
3059 fn poll_read(
3060 self: Pin<&mut Self>,
3061 cx: &mut Context<'_>,
3062 buf: &mut [u8],
3063 ) -> Poll<Result<usize>> {
3064 let mut inner: MutexGuard<'_, T> = self.0.lock().unwrap();
3065 Pin::new(&mut *inner).poll_read(cx, buf)
3066 }
3067
3068 fn poll_read_vectored(
3069 self: Pin<&mut Self>,
3070 cx: &mut Context<'_>,
3071 bufs: &mut [IoSliceMut<'_>],
3072 ) -> Poll<Result<usize>> {
3073 let mut inner: MutexGuard<'_, T> = self.0.lock().unwrap();
3074 Pin::new(&mut *inner).poll_read_vectored(cx, bufs)
3075 }
3076}
3077
3078impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
3079 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
3080 let mut inner: MutexGuard<'_, T> = self.0.lock().unwrap();
3081 Pin::new(&mut *inner).poll_write(cx, buf)
3082 }
3083
3084 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3085 let mut inner: MutexGuard<'_, T> = self.0.lock().unwrap();
3086 Pin::new(&mut *inner).poll_flush(cx)
3087 }
3088
3089 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3090 let mut inner: MutexGuard<'_, T> = self.0.lock().unwrap();
3091 Pin::new(&mut *inner).poll_close(cx)
3092 }
3093}
3094