1 | use std::future::Future; |
2 | use std::pin::Pin; |
3 | |
4 | use pin_project_lite::pin_project; |
5 | |
6 | use crate::io::{self, BufRead, BufReader, Read, Write}; |
7 | use crate::task::{Context, Poll}; |
8 | use crate::utils::Context as _; |
9 | |
10 | // Note: There are two otherwise-identical implementations of this |
11 | // function because unstable has removed the `?Sized` bound for the |
12 | // reader and writer and accepts `R` and `W` instead of `&mut R` and |
13 | // `&mut W`. If making a change to either of the implementations, |
14 | // ensure that you copy it into the other. |
15 | |
16 | /// Copies the entire contents of a reader into a writer. |
17 | /// |
18 | /// This function will continuously read data from `reader` and then |
19 | /// write it into `writer` in a streaming fashion until `reader` |
20 | /// returns EOF. |
21 | /// |
22 | /// On success, the total number of bytes that were copied from |
23 | /// `reader` to `writer` is returned. |
24 | /// |
25 | /// If you’re wanting to copy the contents of one file to another and you’re |
26 | /// working with filesystem paths, see the [`fs::copy`] function. |
27 | /// |
28 | /// This function is an async version of [`std::io::copy`]. |
29 | /// |
30 | /// [`std::io::copy`]: https://doc.rust-lang.org/std/io/fn.copy.html |
31 | /// [`fs::copy`]: ../fs/fn.copy.html |
32 | /// |
33 | /// # Errors |
34 | /// |
35 | /// This function will return an error immediately if any call to `read` or |
36 | /// `write` returns an error. All instances of `ErrorKind::Interrupted` are |
37 | /// handled by this function and the underlying operation is retried. |
38 | /// |
39 | /// # Examples |
40 | /// |
41 | /// ``` |
42 | /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
43 | /// # |
44 | /// use async_std::io; |
45 | /// |
46 | /// let mut reader: &[u8] = b"hello" ; |
47 | /// let mut writer = io::stdout(); |
48 | /// |
49 | /// io::copy(&mut reader, &mut writer).await?; |
50 | /// # |
51 | /// # Ok(()) }) } |
52 | /// ``` |
53 | #[cfg (any(feature = "docs" , not(feature = "unstable" )))] |
54 | pub async fn copy<R, W>(reader: &mut R, writer: &mut W) -> io::Result<u64> |
55 | where |
56 | R: Read + Unpin + ?Sized, |
57 | W: Write + Unpin + ?Sized, |
58 | { |
59 | pin_project! { |
60 | struct CopyFuture<R, W> { |
61 | #[pin] |
62 | reader: R, |
63 | #[pin] |
64 | writer: W, |
65 | amt: u64, |
66 | reader_eof: bool |
67 | } |
68 | } |
69 | |
70 | impl<R, W> Future for CopyFuture<R, W> |
71 | where |
72 | R: BufRead, |
73 | W: Write + Unpin, |
74 | { |
75 | type Output = io::Result<u64>; |
76 | |
77 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
78 | let mut this = self.project(); |
79 | |
80 | loop { |
81 | if *this.reader_eof { |
82 | futures_core::ready!(this.writer.as_mut().poll_flush(cx))?; |
83 | return Poll::Ready(Ok(*this.amt)); |
84 | } |
85 | |
86 | let buffer = futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx))?; |
87 | |
88 | if buffer.is_empty() { |
89 | *this.reader_eof = true; |
90 | continue; |
91 | } |
92 | |
93 | let i = futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer))?; |
94 | if i == 0 { |
95 | return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); |
96 | } |
97 | *this.amt += i as u64; |
98 | this.reader.as_mut().consume(i); |
99 | } |
100 | } |
101 | } |
102 | |
103 | let future = CopyFuture { |
104 | reader: BufReader::new(reader), |
105 | writer, |
106 | reader_eof: false, |
107 | amt: 0, |
108 | }; |
109 | future.await.context(|| String::from("io::copy failed" )) |
110 | } |
111 | |
/// Copies the entire contents of a reader into a writer.
///
/// This function will continuously read data from `reader` and then
/// write it into `writer` in a streaming fashion until `reader`
/// returns EOF. Once EOF is reached, `writer` is flushed before the
/// function returns.
///
/// On success, the total number of bytes that were copied from
/// `reader` to `writer` is returned.
///
/// If you're wanting to copy the contents of one file to another and you're
/// working with filesystem paths, see the [`fs::copy`] function.
///
/// This function is an async version of [`std::io::copy`].
///
/// [`std::io::copy`]: https://doc.rust-lang.org/std/io/fn.copy.html
/// [`fs::copy`]: ../fs/fn.copy.html
///
/// # Errors
///
/// This function will return an error immediately if any call to `read` or
/// `write` returns an error. All instances of `ErrorKind::Interrupted` are
/// handled by this function and the underlying operation is retried.
///
/// An error of kind `ErrorKind::WriteZero` is returned if `writer` accepts
/// zero bytes while there is still data left to copy. An error reported by
/// the final flush is returned as well.
///
/// # Examples
///
/// ```
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
///
/// let mut reader: &[u8] = b"hello";
/// let mut writer = io::stdout();
///
/// io::copy(&mut reader, &mut writer).await?;
/// #
/// # Ok(()) }) }
/// ```
#[cfg(all(feature = "unstable", not(feature = "docs")))]
pub async fn copy<R, W>(reader: R, writer: W) -> io::Result<u64>
where
    R: Read + Unpin,
    W: Write + Unpin,
{
    pin_project! {
        // State machine driving the copy: a buffered reader, the destination
        // writer, the number of bytes written so far, and whether the reader
        // has already reported EOF (so only the final flush remains).
        struct CopyFuture<R, W> {
            #[pin]
            reader: R,
            #[pin]
            writer: W,
            amt: u64,
            reader_eof: bool
        }
    }

    impl<R, W> Future for CopyFuture<R, W>
    where
        R: BufRead,
        W: Write + Unpin,
    {
        type Output = io::Result<u64>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let mut this = self.project();

            loop {
                // All data has been forwarded: flush the writer and report
                // the total. `reader_eof` persists across polls, so a pending
                // flush resumes here instead of reading again.
                if *this.reader_eof {
                    futures_core::ready!(this.writer.as_mut().poll_flush(cx))?;
                    return Poll::Ready(Ok(*this.amt));
                }

                // Borrow the reader's buffered data without consuming it.
                let buffer = match futures_core::ready!(this.reader.as_mut().poll_fill_buf(cx)) {
                    Ok(buffer) => buffer,
                    // Documented contract: interrupted reads are retried.
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
                    Err(e) => return Poll::Ready(Err(e)),
                };

                // An empty buffer signals EOF; loop around to the flush step.
                if buffer.is_empty() {
                    *this.reader_eof = true;
                    continue;
                }

                let i = match futures_core::ready!(this.writer.as_mut().poll_write(cx, buffer)) {
                    // The writer made no progress although data was offered.
                    Ok(0) => return Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
                    Ok(i) => i,
                    // Documented contract: interrupted writes are retried.
                    // Nothing was consumed, so the next iteration offers the
                    // same buffered bytes again.
                    Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
                    Err(e) => return Poll::Ready(Err(e)),
                };

                // Only the bytes the writer actually accepted are counted and
                // removed from the reader's buffer; the rest is retried.
                *this.amt += i as u64;
                this.reader.as_mut().consume(i);
            }
        }
    }

    let future = CopyFuture {
        // Wrap the reader so the loop above can use `poll_fill_buf`/`consume`.
        reader: BufReader::new(reader),
        writer,
        reader_eof: false,
        amt: 0,
    };
    future.await.context(|| String::from("io::copy failed"))
}
207 | |