1use crate::io::util::fill_buf::{fill_buf, FillBuf};
2use crate::io::util::lines::{lines, Lines};
3use crate::io::util::read_line::{read_line, ReadLine};
4use crate::io::util::read_until::{read_until, ReadUntil};
5use crate::io::util::split::{split, Split};
6use crate::io::AsyncBufRead;
7
cfg_io_util! {
    /// An extension trait which adds utility methods to [`AsyncBufRead`] types.
    ///
    /// Every method has a default implementation, and the trait is
    /// implemented automatically for all [`AsyncBufRead`] types, so it only
    /// needs to be brought into scope to be used.
    ///
    /// [`AsyncBufRead`]: crate::io::AsyncBufRead
    pub trait AsyncBufReadExt: AsyncBufRead {
        /// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
        ///
        /// Equivalent to:
        ///
        /// ```ignore
        /// async fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize>;
        /// ```
        ///
        /// This function will read bytes from the underlying stream until the
        /// delimiter or EOF is found. Once found, all bytes up to, and including,
        /// the delimiter (if found) will be appended to `buf`.
        ///
        /// If successful, this function will return the total number of bytes read.
        ///
        /// If this function returns `Ok(0)`, the stream has reached EOF.
        ///
        /// # Errors
        ///
        /// This function will ignore all instances of [`ErrorKind::Interrupted`] and
        /// will otherwise return any errors returned by [`fill_buf`].
        ///
        /// If an I/O error is encountered then all bytes read so far will be
        /// present in `buf` and its length will have been adjusted appropriately.
        ///
        /// [`fill_buf`]: AsyncBufRead::poll_fill_buf
        /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
        ///
        /// # Cancel safety
        ///
        /// If the method is used as the event in a
        /// [`tokio::select!`](crate::select) statement and some other branch
        /// completes first, then some data may have been partially read. Any
        /// partially read bytes are appended to `buf`, and the method can be
        /// called again to continue reading until `byte`.
        ///
        /// This method returns the total number of bytes read. If you cancel
        /// the call to `read_until` and then call it again to continue reading,
        /// the counter is reset.
        ///
        /// # Examples
        ///
        /// [`std::io::Cursor`][`Cursor`] is a type that implements
        /// `AsyncBufRead`. In this example, we use [`Cursor`] to read all the
        /// bytes in a byte slice in hyphen delimited segments:
        ///
        /// [`Cursor`]: std::io::Cursor
        ///
        /// ```
        /// use tokio::io::AsyncBufReadExt;
        ///
        /// use std::io::Cursor;
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let mut cursor = Cursor::new(b"lorem-ipsum");
        ///     let mut buf = vec![];
        ///
        ///     // cursor is at 'l'
        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///
        ///     assert_eq!(num_bytes, 6);
        ///     assert_eq!(buf, b"lorem-");
        ///     buf.clear();
        ///
        ///     // cursor is at 'i'
        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///
        ///     assert_eq!(num_bytes, 5);
        ///     assert_eq!(buf, b"ipsum");
        ///     buf.clear();
        ///
        ///     // cursor is at EOF
        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///     assert_eq!(num_bytes, 0);
        ///     assert_eq!(buf, b"");
        /// }
        /// ```
        fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
        where
            Self: Unpin,
        {
            // Delegates to the free function imported from `io::util::read_until`,
            // which builds the returned `ReadUntil` value.
            read_until(self, byte, buf)
        }

        /// Reads all bytes until a newline (the 0xA byte) is reached, and appends
        /// them to the provided buffer.
        ///
        /// Equivalent to:
        ///
        /// ```ignore
        /// async fn read_line(&mut self, buf: &mut String) -> io::Result<usize>;
        /// ```
        ///
        /// This function will read bytes from the underlying stream until the
        /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
        /// up to, and including, the delimiter (if found) will be appended to
        /// `buf`.
        ///
        /// If successful, this function will return the total number of bytes read.
        ///
        /// If this function returns `Ok(0)`, the stream has reached EOF.
        ///
        /// # Errors
        ///
        /// This function has the same error semantics as [`read_until`] and will
        /// also return an error if the read bytes are not valid UTF-8. If an I/O
        /// error is encountered then `buf` may contain some bytes already read in
        /// the event that all data read so far was valid UTF-8.
        ///
        /// [`read_until`]: AsyncBufReadExt::read_until
        ///
        /// # Cancel safety
        ///
        /// This method is not cancellation safe. If the method is used as the
        /// event in a [`tokio::select!`](crate::select) statement and some
        /// other branch completes first, then some data may have been partially
        /// read, and this data is lost. There are no guarantees regarding the
        /// contents of `buf` when the call is cancelled. The current
        /// implementation replaces `buf` with the empty string, but this may
        /// change in the future.
        ///
        /// This function does not behave like [`read_until`] because of the
        /// requirement that a string contains only valid UTF-8. If you need a
        /// cancellation safe `read_line`, there are three options:
        ///
        /// * Call [`read_until`] with a newline character and manually perform the UTF-8 check.
        /// * The stream returned by [`lines`] has a cancellation safe
        ///   [`next_line`] method.
        /// * Use [`tokio_util::codec::LinesCodec`][LinesCodec].
        ///
        /// [LinesCodec]: https://docs.rs/tokio-util/latest/tokio_util/codec/struct.LinesCodec.html
        /// [`lines`]: Self::lines
        /// [`next_line`]: crate::io::Lines::next_line
        ///
        /// # Examples
        ///
        /// [`std::io::Cursor`][`Cursor`] is a type that implements
        /// `AsyncBufRead`. In this example, we use [`Cursor`] to read all the
        /// lines in a byte slice:
        ///
        /// [`Cursor`]: std::io::Cursor
        ///
        /// ```
        /// use tokio::io::AsyncBufReadExt;
        ///
        /// use std::io::Cursor;
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let mut cursor = Cursor::new(b"foo\nbar");
        ///     let mut buf = String::new();
        ///
        ///     // cursor is at 'f'
        ///     let num_bytes = cursor.read_line(&mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///
        ///     assert_eq!(num_bytes, 4);
        ///     assert_eq!(buf, "foo\n");
        ///     buf.clear();
        ///
        ///     // cursor is at 'b'
        ///     let num_bytes = cursor.read_line(&mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///
        ///     assert_eq!(num_bytes, 3);
        ///     assert_eq!(buf, "bar");
        ///     buf.clear();
        ///
        ///     // cursor is at EOF
        ///     let num_bytes = cursor.read_line(&mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///
        ///     assert_eq!(num_bytes, 0);
        ///     assert_eq!(buf, "");
        /// }
        /// ```
        fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
        where
            Self: Unpin,
        {
            // Delegates to the free function imported from `io::util::read_line`,
            // which builds the returned `ReadLine` value.
            read_line(self, buf)
        }

        /// Returns a stream of the contents of this reader split on the byte
        /// `byte`.
        ///
        /// This method is the asynchronous equivalent to
        /// [`BufRead::split`](std::io::BufRead::split).
        ///
        /// The stream returned from this function will yield instances of
        /// [`io::Result`]`<`[`Option`]`<`[`Vec<u8>`]`>>`. Each vector returned will *not* have
        /// the delimiter byte at the end.
        ///
        /// [`io::Result`]: std::io::Result
        /// [`Option`]: core::option::Option
        /// [`Vec<u8>`]: std::vec::Vec
        ///
        /// # Errors
        ///
        /// Each item of the stream has the same error semantics as
        /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until).
        ///
        /// # Examples
        ///
        /// ```
        /// # use tokio::io::AsyncBufRead;
        /// use tokio::io::AsyncBufReadExt;
        ///
        /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
        /// let mut segments = my_buf_read.split(b'f');
        ///
        /// while let Some(segment) = segments.next_segment().await? {
        ///     println!("length = {}", segment.len())
        /// }
        /// # Ok(())
        /// # }
        /// ```
        fn split(self, byte: u8) -> Split<Self>
        where
            Self: Sized + Unpin,
        {
            // Takes the reader by value (hence `Sized`) and hands it to the free
            // function imported from `io::util::split`.
            split(self, byte)
        }

        /// Returns the contents of the internal buffer, filling it with more
        /// data from the inner reader if it is empty.
        ///
        /// This function is a lower-level call. It needs to be paired with the
        /// [`consume`] method to function properly. When calling this method,
        /// none of the contents will be "read" in the sense that later calling
        /// `read` may return the same contents. As such, [`consume`] must be
        /// called with the number of bytes that are consumed from this buffer
        /// to ensure that the bytes are never returned twice.
        ///
        /// An empty buffer returned indicates that the stream has reached EOF.
        ///
        /// Equivalent to:
        ///
        /// ```ignore
        /// async fn fill_buf(&mut self) -> io::Result<&[u8]>;
        /// ```
        ///
        /// # Errors
        ///
        /// This function will return an I/O error if the underlying reader was
        /// read, but returned an error.
        ///
        /// [`consume`]: crate::io::AsyncBufReadExt::consume
        fn fill_buf(&mut self) -> FillBuf<'_, Self>
        where
            Self: Unpin,
        {
            // Delegates to the free function imported from `io::util::fill_buf`,
            // which builds the returned `FillBuf` value.
            fill_buf(self)
        }

        /// Tells this buffer that `amt` bytes have been consumed from the
        /// buffer, so they should no longer be returned in calls to [`read`].
        ///
        /// This function is a lower-level call. It needs to be paired with the
        /// [`fill_buf`] method to function properly. This function does not
        /// perform any I/O, it simply informs this object that some amount of
        /// its buffer, returned from [`fill_buf`], has been consumed and should
        /// no longer be returned. As such, this function may do odd things if
        /// [`fill_buf`] isn't called before calling it.
        ///
        /// The `amt` must be less than or equal to the number of bytes in the
        /// buffer returned by [`fill_buf`]; consuming the entire returned
        /// buffer is valid.
        ///
        /// [`read`]: crate::io::AsyncReadExt::read
        /// [`fill_buf`]: crate::io::AsyncBufReadExt::fill_buf
        fn consume(&mut self, amt: usize)
        where
            Self: Unpin,
        {
            // `Self: Unpin` is what allows pinning the plain `&mut Self` with the
            // safe `Pin::new`. The call on the pinned reference is intended to
            // reach the underlying `AsyncBufRead::consume` rather than this
            // extension method, so keep this exact call shape.
            std::pin::Pin::new(self).consume(amt)
        }

        /// Returns a stream over the lines of this reader.
        ///
        /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
        ///
        /// The stream returned from this function will yield instances of
        /// [`io::Result`]`<`[`Option`]`<`[`String`]`>>`. Each string returned will *not* have a newline
        /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
        ///
        /// [`io::Result`]: std::io::Result
        /// [`Option`]: core::option::Option
        /// [`String`]: String
        ///
        /// # Errors
        ///
        /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
        ///
        /// # Examples
        ///
        /// [`std::io::Cursor`][`Cursor`] is a type that implements
        /// `AsyncBufRead`. In this example, we use [`Cursor`] to iterate over
        /// all the lines in a byte slice.
        ///
        /// [`Cursor`]: std::io::Cursor
        ///
        /// ```
        /// use tokio::io::AsyncBufReadExt;
        ///
        /// use std::io::Cursor;
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
        ///
        ///     let mut lines = cursor.lines();
        ///
        ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("lorem")));
        ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("ipsum")));
        ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("dolor")));
        ///     assert_eq!(lines.next_line().await.unwrap(), None);
        /// }
        /// ```
        ///
        /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
        fn lines(self) -> Lines<Self>
        where
            // NOTE(review): unlike `split`, no `Unpin` bound is required here;
            // presumably it is enforced on the methods of `Lines` instead -- confirm.
            Self: Sized,
        {
            // Takes the reader by value (hence `Sized`) and hands it to the free
            // function imported from `io::util::lines`.
            lines(self)
        }
    }
}
350
// Blanket implementation: every type that implements `AsyncBufRead`
// (including unsized ones, thanks to `?Sized`) gets the extension methods.
// The body is empty because all trait methods have default implementations.
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
352