1 | use crate::io::util::fill_buf::{fill_buf, FillBuf}; |
2 | use crate::io::util::lines::{lines, Lines}; |
3 | use crate::io::util::read_line::{read_line, ReadLine}; |
4 | use crate::io::util::read_until::{read_until, ReadUntil}; |
5 | use crate::io::util::split::{split, Split}; |
6 | use crate::io::AsyncBufRead; |
7 | |
8 | cfg_io_util! { |
9 | /// An extension trait which adds utility methods to [`AsyncBufRead`] types. |
10 | /// |
11 | /// [`AsyncBufRead`]: crate::io::AsyncBufRead |
12 | pub trait AsyncBufReadExt: AsyncBufRead { |
13 | /// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached. |
14 | /// |
15 | /// Equivalent to: |
16 | /// |
17 | /// ```ignore |
18 | /// async fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize>; |
19 | /// ``` |
20 | /// |
21 | /// This function will read bytes from the underlying stream until the |
22 | /// delimiter or EOF is found. Once found, all bytes up to, and including, |
23 | /// the delimiter (if found) will be appended to `buf`. |
24 | /// |
25 | /// If successful, this function will return the total number of bytes read. |
26 | /// |
27 | /// If this function returns `Ok(0)`, the stream has reached EOF. |
28 | /// |
29 | /// # Errors |
30 | /// |
31 | /// This function will ignore all instances of [`ErrorKind::Interrupted`] and |
32 | /// will otherwise return any errors returned by [`fill_buf`]. |
33 | /// |
34 | /// If an I/O error is encountered then all bytes read so far will be |
35 | /// present in `buf` and its length will have been adjusted appropriately. |
36 | /// |
37 | /// [`fill_buf`]: AsyncBufRead::poll_fill_buf |
38 | /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted |
39 | /// |
40 | /// # Cancel safety |
41 | /// |
42 | /// If the method is used as the event in a |
43 | /// [`tokio::select!`](crate::select) statement and some other branch |
44 | /// completes first, then some data may have been partially read. Any |
45 | /// partially read bytes are appended to `buf`, and the method can be |
46 | /// called again to continue reading until `byte`. |
47 | /// |
48 | /// This method returns the total number of bytes read. If you cancel |
49 | /// the call to `read_until` and then call it again to continue reading, |
50 | /// the counter is reset. |
51 | /// |
52 | /// # Examples |
53 | /// |
54 | /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In |
55 | /// this example, we use [`Cursor`] to read all the bytes in a byte slice |
56 | /// in hyphen delimited segments: |
57 | /// |
58 | /// [`Cursor`]: std::io::Cursor |
59 | /// |
60 | /// ``` |
61 | /// use tokio::io::AsyncBufReadExt; |
62 | /// |
63 | /// use std::io::Cursor; |
64 | /// |
65 | /// #[tokio::main] |
66 | /// async fn main() { |
67 | /// let mut cursor = Cursor::new(b"lorem-ipsum"); |
68 | /// let mut buf = vec![]; |
69 | /// |
70 | /// // cursor is at 'l' |
71 | /// let num_bytes = cursor.read_until(b'-', &mut buf) |
72 | /// .await |
73 | /// .expect("reading from cursor won't fail"); |
74 | /// |
75 | /// assert_eq!(num_bytes, 6); |
76 | /// assert_eq!(buf, b"lorem-"); |
77 | /// buf.clear(); |
78 | /// |
79 | /// // cursor is at 'i' |
80 | /// let num_bytes = cursor.read_until(b'-', &mut buf) |
81 | /// .await |
82 | /// .expect("reading from cursor won't fail"); |
83 | /// |
84 | /// assert_eq!(num_bytes, 5); |
85 | /// assert_eq!(buf, b"ipsum"); |
86 | /// buf.clear(); |
87 | /// |
88 | /// // cursor is at EOF |
89 | /// let num_bytes = cursor.read_until(b'-', &mut buf) |
90 | /// .await |
91 | /// .expect("reading from cursor won't fail"); |
92 | /// assert_eq!(num_bytes, 0); |
93 | /// assert_eq!(buf, b""); |
94 | /// } |
95 | /// ``` |
96 | fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self> |
97 | where |
98 | Self: Unpin, |
99 | { |
100 | read_until(self, byte, buf) |
101 | } |
102 | |
103 | /// Reads all bytes until a newline (the 0xA byte) is reached, and append |
104 | /// them to the provided buffer. |
105 | /// |
106 | /// Equivalent to: |
107 | /// |
108 | /// ```ignore |
109 | /// async fn read_line(&mut self, buf: &mut String) -> io::Result<usize>; |
110 | /// ``` |
111 | /// |
112 | /// This function will read bytes from the underlying stream until the |
113 | /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes |
114 | /// up to, and including, the delimiter (if found) will be appended to |
115 | /// `buf`. |
116 | /// |
117 | /// If successful, this function will return the total number of bytes read. |
118 | /// |
119 | /// If this function returns `Ok(0)`, the stream has reached EOF. |
120 | /// |
121 | /// # Errors |
122 | /// |
123 | /// This function has the same error semantics as [`read_until`] and will |
124 | /// also return an error if the read bytes are not valid UTF-8. If an I/O |
125 | /// error is encountered then `buf` may contain some bytes already read in |
126 | /// the event that all data read so far was valid UTF-8. |
127 | /// |
128 | /// [`read_until`]: AsyncBufReadExt::read_until |
129 | /// |
130 | /// # Cancel safety |
131 | /// |
132 | /// This method is not cancellation safe. If the method is used as the |
133 | /// event in a [`tokio::select!`](crate::select) statement and some |
134 | /// other branch completes first, then some data may have been partially |
135 | /// read, and this data is lost. There are no guarantees regarding the |
136 | /// contents of `buf` when the call is cancelled. The current |
137 | /// implementation replaces `buf` with the empty string, but this may |
138 | /// change in the future. |
139 | /// |
140 | /// This function does not behave like [`read_until`] because of the |
141 | /// requirement that a string contains only valid utf-8. If you need a |
142 | /// cancellation safe `read_line`, there are three options: |
143 | /// |
144 | /// * Call [`read_until`] with a newline character and manually perform the utf-8 check. |
145 | /// * The stream returned by [`lines`] has a cancellation safe |
146 | /// [`next_line`] method. |
147 | /// * Use [`tokio_util::codec::LinesCodec`][LinesCodec]. |
148 | /// |
149 | /// [LinesCodec]: https://docs.rs/tokio-util/latest/tokio_util/codec/struct.LinesCodec.html |
150 | /// [`read_until`]: Self::read_until |
151 | /// [`lines`]: Self::lines |
152 | /// [`next_line`]: crate::io::Lines::next_line |
153 | /// |
154 | /// # Examples |
155 | /// |
156 | /// [`std::io::Cursor`][`Cursor`] is a type that implements |
157 | /// `AsyncBufRead`. In this example, we use [`Cursor`] to read all the |
158 | /// lines in a byte slice: |
159 | /// |
160 | /// [`Cursor`]: std::io::Cursor |
161 | /// |
162 | /// ``` |
163 | /// use tokio::io::AsyncBufReadExt; |
164 | /// |
165 | /// use std::io::Cursor; |
166 | /// |
167 | /// #[tokio::main] |
168 | /// async fn main() { |
169 | /// let mut cursor = Cursor::new(b"foo\nbar"); |
170 | /// let mut buf = String::new(); |
171 | /// |
172 | /// // cursor is at 'f' |
173 | /// let num_bytes = cursor.read_line(&mut buf) |
174 | /// .await |
175 | /// .expect("reading from cursor won't fail"); |
176 | /// |
177 | /// assert_eq!(num_bytes, 4); |
178 | /// assert_eq!(buf, "foo\n"); |
179 | /// buf.clear(); |
180 | /// |
181 | /// // cursor is at 'b' |
182 | /// let num_bytes = cursor.read_line(&mut buf) |
183 | /// .await |
184 | /// .expect("reading from cursor won't fail"); |
185 | /// |
186 | /// assert_eq!(num_bytes, 3); |
187 | /// assert_eq!(buf, "bar"); |
188 | /// buf.clear(); |
189 | /// |
190 | /// // cursor is at EOF |
191 | /// let num_bytes = cursor.read_line(&mut buf) |
192 | /// .await |
193 | /// .expect("reading from cursor won't fail"); |
194 | /// |
195 | /// assert_eq!(num_bytes, 0); |
196 | /// assert_eq!(buf, ""); |
197 | /// } |
198 | /// ``` |
199 | fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> |
200 | where |
201 | Self: Unpin, |
202 | { |
203 | read_line(self, buf) |
204 | } |
205 | |
206 | /// Returns a stream of the contents of this reader split on the byte |
207 | /// `byte`. |
208 | /// |
209 | /// This method is the asynchronous equivalent to |
210 | /// [`BufRead::split`](std::io::BufRead::split). |
211 | /// |
212 | /// The stream returned from this function will yield instances of |
213 | /// [`io::Result`]`<`[`Option`]`<`[`Vec<u8>`]`>>`. Each vector returned will *not* have |
214 | /// the delimiter byte at the end. |
215 | /// |
216 | /// [`io::Result`]: std::io::Result |
217 | /// [`Option`]: core::option::Option |
218 | /// [`Vec<u8>`]: std::vec::Vec |
219 | /// |
220 | /// # Errors |
221 | /// |
222 | /// Each item of the stream has the same error semantics as |
223 | /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until). |
224 | /// |
225 | /// # Examples |
226 | /// |
227 | /// ``` |
228 | /// # use tokio::io::AsyncBufRead; |
229 | /// use tokio::io::AsyncBufReadExt; |
230 | /// |
231 | /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> { |
232 | /// let mut segments = my_buf_read.split(b'f'); |
233 | /// |
234 | /// while let Some(segment) = segments.next_segment().await? { |
235 | /// println!("length = {}", segment.len()) |
236 | /// } |
237 | /// # Ok(()) |
238 | /// # } |
239 | /// ``` |
240 | fn split(self, byte: u8) -> Split<Self> |
241 | where |
242 | Self: Sized + Unpin, |
243 | { |
244 | split(self, byte) |
245 | } |
246 | |
247 | /// Returns the contents of the internal buffer, filling it with more |
248 | /// data from the inner reader if it is empty. |
249 | /// |
250 | /// This function is a lower-level call. It needs to be paired with the |
251 | /// [`consume`] method to function properly. When calling this method, |
252 | /// none of the contents will be "read" in the sense that later calling |
253 | /// `read` may return the same contents. As such, [`consume`] must be |
254 | /// called with the number of bytes that are consumed from this buffer |
255 | /// to ensure that the bytes are never returned twice. |
256 | /// |
257 | /// An empty buffer returned indicates that the stream has reached EOF. |
258 | /// |
259 | /// Equivalent to: |
260 | /// |
261 | /// ```ignore |
262 | /// async fn fill_buf(&mut self) -> io::Result<&[u8]>; |
263 | /// ``` |
264 | /// |
265 | /// # Errors |
266 | /// |
267 | /// This function will return an I/O error if the underlying reader was |
268 | /// read, but returned an error. |
269 | /// |
270 | /// [`consume`]: crate::io::AsyncBufReadExt::consume |
271 | fn fill_buf(&mut self) -> FillBuf<'_, Self> |
272 | where |
273 | Self: Unpin, |
274 | { |
275 | fill_buf(self) |
276 | } |
277 | |
278 | /// Tells this buffer that `amt` bytes have been consumed from the |
279 | /// buffer, so they should no longer be returned in calls to [`read`]. |
280 | /// |
281 | /// This function is a lower-level call. It needs to be paired with the |
282 | /// [`fill_buf`] method to function properly. This function does not |
283 | /// perform any I/O, it simply informs this object that some amount of |
284 | /// its buffer, returned from [`fill_buf`], has been consumed and should |
285 | /// no longer be returned. As such, this function may do odd things if |
286 | /// [`fill_buf`] isn't called before calling it. |
287 | /// |
288 | /// The `amt` must be less than the number of bytes in the buffer |
289 | /// returned by [`fill_buf`]. |
290 | /// |
291 | /// [`read`]: crate::io::AsyncReadExt::read |
292 | /// [`fill_buf`]: crate::io::AsyncBufReadExt::fill_buf |
293 | fn consume(&mut self, amt: usize) |
294 | where |
295 | Self: Unpin, |
296 | { |
297 | std::pin::Pin::new(self).consume(amt) |
298 | } |
299 | |
300 | /// Returns a stream over the lines of this reader. |
301 | /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines). |
302 | /// |
303 | /// The stream returned from this function will yield instances of |
304 | /// [`io::Result`]`<`[`Option`]`<`[`String`]`>>`. Each string returned will *not* have a newline |
305 | /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end. |
306 | /// |
307 | /// [`io::Result`]: std::io::Result |
308 | /// [`Option`]: core::option::Option |
309 | /// [`String`]: String |
310 | /// |
311 | /// # Errors |
312 | /// |
313 | /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`]. |
314 | /// |
315 | /// # Examples |
316 | /// |
317 | /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In |
318 | /// this example, we use [`Cursor`] to iterate over all the lines in a byte |
319 | /// slice. |
320 | /// |
321 | /// [`Cursor`]: std::io::Cursor |
322 | /// |
323 | /// ``` |
324 | /// use tokio::io::AsyncBufReadExt; |
325 | /// |
326 | /// use std::io::Cursor; |
327 | /// |
328 | /// #[tokio::main] |
329 | /// async fn main() { |
330 | /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); |
331 | /// |
332 | /// let mut lines = cursor.lines(); |
333 | /// |
334 | /// assert_eq!(lines.next_line().await.unwrap(), Some(String::from("lorem"))); |
335 | /// assert_eq!(lines.next_line().await.unwrap(), Some(String::from("ipsum"))); |
336 | /// assert_eq!(lines.next_line().await.unwrap(), Some(String::from("dolor"))); |
337 | /// assert_eq!(lines.next_line().await.unwrap(), None); |
338 | /// } |
339 | /// ``` |
340 | /// |
341 | /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line |
342 | fn lines(self) -> Lines<Self> |
343 | where |
344 | Self: Sized, |
345 | { |
346 | lines(self) |
347 | } |
348 | } |
349 | } |
350 | |
351 | impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {} |
352 | |