| 1 | use crate::io::util::fill_buf::{fill_buf, FillBuf}; |
| 2 | use crate::io::util::lines::{lines, Lines}; |
| 3 | use crate::io::util::read_line::{read_line, ReadLine}; |
| 4 | use crate::io::util::read_until::{read_until, ReadUntil}; |
| 5 | use crate::io::util::split::{split, Split}; |
| 6 | use crate::io::AsyncBufRead; |
| 7 | |
| 8 | cfg_io_util! { |
| 9 | /// An extension trait which adds utility methods to [`AsyncBufRead`] types. |
| 10 | /// |
| 11 | /// [`AsyncBufRead`]: crate::io::AsyncBufRead |
| 12 | pub trait AsyncBufReadExt: AsyncBufRead { |
| 13 | /// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached. |
| 14 | /// |
| 15 | /// Equivalent to: |
| 16 | /// |
| 17 | /// ```ignore |
| 18 | /// async fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize>; |
| 19 | /// ``` |
| 20 | /// |
| 21 | /// This function will read bytes from the underlying stream until the |
| 22 | /// delimiter or EOF is found. Once found, all bytes up to, and including, |
| 23 | /// the delimiter (if found) will be appended to `buf`. |
| 24 | /// |
| 25 | /// If successful, this function will return the total number of bytes read. |
| 26 | /// |
| 27 | /// If this function returns `Ok(0)`, the stream has reached EOF. |
| 28 | /// |
| 29 | /// # Errors |
| 30 | /// |
| 31 | /// This function will ignore all instances of [`ErrorKind::Interrupted`] and |
| 32 | /// will otherwise return any errors returned by [`fill_buf`]. |
| 33 | /// |
| 34 | /// If an I/O error is encountered then all bytes read so far will be |
| 35 | /// present in `buf` and its length will have been adjusted appropriately. |
| 36 | /// |
| 37 | /// [`fill_buf`]: AsyncBufRead::poll_fill_buf |
| 38 | /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted |
| 39 | /// |
| 40 | /// # Cancel safety |
| 41 | /// |
| 42 | /// If the method is used as the event in a |
| 43 | /// [`tokio::select!`](crate::select) statement and some other branch |
| 44 | /// completes first, then some data may have been partially read. Any |
| 45 | /// partially read bytes are appended to `buf`, and the method can be |
| 46 | /// called again to continue reading until `byte`. |
| 47 | /// |
| 48 | /// This method returns the total number of bytes read. If you cancel |
| 49 | /// the call to `read_until` and then call it again to continue reading, |
| 50 | /// the counter is reset. |
| 51 | /// |
| 52 | /// # Examples |
| 53 | /// |
| 54 | /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In |
| 55 | /// this example, we use [`Cursor`] to read all the bytes in a byte slice |
| 56 | /// in hyphen delimited segments: |
| 57 | /// |
| 58 | /// [`Cursor`]: std::io::Cursor |
| 59 | /// |
| 60 | /// ``` |
| 61 | /// use tokio::io::AsyncBufReadExt; |
| 62 | /// |
| 63 | /// use std::io::Cursor; |
| 64 | /// |
| 65 | /// #[tokio::main] |
| 66 | /// async fn main() { |
| 67 | /// let mut cursor = Cursor::new(b"lorem-ipsum"); |
| 68 | /// let mut buf = vec![]; |
| 69 | /// |
| 70 | /// // cursor is at 'l' |
| 71 | /// let num_bytes = cursor.read_until(b'-', &mut buf) |
| 72 | /// .await |
| 73 | /// .expect("reading from cursor won't fail"); |
| 74 | /// |
| 75 | /// assert_eq!(num_bytes, 6); |
| 76 | /// assert_eq!(buf, b"lorem-"); |
| 77 | /// buf.clear(); |
| 78 | /// |
| 79 | /// // cursor is at 'i' |
| 80 | /// let num_bytes = cursor.read_until(b'-', &mut buf) |
| 81 | /// .await |
| 82 | /// .expect("reading from cursor won't fail"); |
| 83 | /// |
| 84 | /// assert_eq!(num_bytes, 5); |
| 85 | /// assert_eq!(buf, b"ipsum"); |
| 86 | /// buf.clear(); |
| 87 | /// |
| 88 | /// // cursor is at EOF |
| 89 | /// let num_bytes = cursor.read_until(b'-', &mut buf) |
| 90 | /// .await |
| 91 | /// .expect("reading from cursor won't fail"); |
| 92 | /// assert_eq!(num_bytes, 0); |
| 93 | /// assert_eq!(buf, b""); |
| 94 | /// } |
| 95 | /// ``` |
| 96 | fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self> |
| 97 | where |
| 98 | Self: Unpin, |
| 99 | { |
| 100 | read_until(self, byte, buf) |
| 101 | } |
| 102 | |
| 103 | /// Reads all bytes until a newline (the 0xA byte) is reached, and append |
| 104 | /// them to the provided buffer. |
| 105 | /// |
| 106 | /// Equivalent to: |
| 107 | /// |
| 108 | /// ```ignore |
| 109 | /// async fn read_line(&mut self, buf: &mut String) -> io::Result<usize>; |
| 110 | /// ``` |
| 111 | /// |
| 112 | /// This function will read bytes from the underlying stream until the |
| 113 | /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes |
| 114 | /// up to, and including, the delimiter (if found) will be appended to |
| 115 | /// `buf`. |
| 116 | /// |
| 117 | /// If successful, this function will return the total number of bytes read. |
| 118 | /// |
| 119 | /// If this function returns `Ok(0)`, the stream has reached EOF. |
| 120 | /// |
| 121 | /// # Errors |
| 122 | /// |
| 123 | /// This function has the same error semantics as [`read_until`] and will |
| 124 | /// also return an error if the read bytes are not valid UTF-8. If an I/O |
| 125 | /// error is encountered then `buf` may contain some bytes already read in |
| 126 | /// the event that all data read so far was valid UTF-8. |
| 127 | /// |
| 128 | /// [`read_until`]: AsyncBufReadExt::read_until |
| 129 | /// |
| 130 | /// # Cancel safety |
| 131 | /// |
| 132 | /// This method is not cancellation safe. If the method is used as the |
| 133 | /// event in a [`tokio::select!`](crate::select) statement and some |
| 134 | /// other branch completes first, then some data may have been partially |
| 135 | /// read, and this data is lost. There are no guarantees regarding the |
| 136 | /// contents of `buf` when the call is cancelled. The current |
| 137 | /// implementation replaces `buf` with the empty string, but this may |
| 138 | /// change in the future. |
| 139 | /// |
| 140 | /// This function does not behave like [`read_until`] because of the |
| 141 | /// requirement that a string contains only valid utf-8. If you need a |
| 142 | /// cancellation safe `read_line`, there are three options: |
| 143 | /// |
| 144 | /// * Call [`read_until`] with a newline character and manually perform the utf-8 check. |
| 145 | /// * The stream returned by [`lines`] has a cancellation safe |
| 146 | /// [`next_line`] method. |
| 147 | /// * Use [`tokio_util::codec::LinesCodec`][LinesCodec]. |
| 148 | /// |
| 149 | /// [LinesCodec]: https://docs.rs/tokio-util/latest/tokio_util/codec/struct.LinesCodec.html |
| 150 | /// [`read_until`]: Self::read_until |
| 151 | /// [`lines`]: Self::lines |
| 152 | /// [`next_line`]: crate::io::Lines::next_line |
| 153 | /// |
| 154 | /// # Examples |
| 155 | /// |
| 156 | /// [`std::io::Cursor`][`Cursor`] is a type that implements |
| 157 | /// `AsyncBufRead`. In this example, we use [`Cursor`] to read all the |
| 158 | /// lines in a byte slice: |
| 159 | /// |
| 160 | /// [`Cursor`]: std::io::Cursor |
| 161 | /// |
| 162 | /// ``` |
| 163 | /// use tokio::io::AsyncBufReadExt; |
| 164 | /// |
| 165 | /// use std::io::Cursor; |
| 166 | /// |
| 167 | /// #[tokio::main] |
| 168 | /// async fn main() { |
| 169 | /// let mut cursor = Cursor::new(b"foo\nbar"); |
| 170 | /// let mut buf = String::new(); |
| 171 | /// |
| 172 | /// // cursor is at 'f' |
| 173 | /// let num_bytes = cursor.read_line(&mut buf) |
| 174 | /// .await |
| 175 | /// .expect("reading from cursor won't fail"); |
| 176 | /// |
| 177 | /// assert_eq!(num_bytes, 4); |
| 178 | /// assert_eq!(buf, "foo\n"); |
| 179 | /// buf.clear(); |
| 180 | /// |
| 181 | /// // cursor is at 'b' |
| 182 | /// let num_bytes = cursor.read_line(&mut buf) |
| 183 | /// .await |
| 184 | /// .expect("reading from cursor won't fail"); |
| 185 | /// |
| 186 | /// assert_eq!(num_bytes, 3); |
| 187 | /// assert_eq!(buf, "bar"); |
| 188 | /// buf.clear(); |
| 189 | /// |
| 190 | /// // cursor is at EOF |
| 191 | /// let num_bytes = cursor.read_line(&mut buf) |
| 192 | /// .await |
| 193 | /// .expect("reading from cursor won't fail"); |
| 194 | /// |
| 195 | /// assert_eq!(num_bytes, 0); |
| 196 | /// assert_eq!(buf, ""); |
| 197 | /// } |
| 198 | /// ``` |
| 199 | fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> |
| 200 | where |
| 201 | Self: Unpin, |
| 202 | { |
| 203 | read_line(self, buf) |
| 204 | } |
| 205 | |
| 206 | /// Returns a stream of the contents of this reader split on the byte |
| 207 | /// `byte`. |
| 208 | /// |
| 209 | /// This method is the asynchronous equivalent to |
| 210 | /// [`BufRead::split`](std::io::BufRead::split). |
| 211 | /// |
| 212 | /// The stream returned from this function will yield instances of |
| 213 | /// [`io::Result`]`<`[`Option`]`<`[`Vec<u8>`]`>>`. Each vector returned will *not* have |
| 214 | /// the delimiter byte at the end. |
| 215 | /// |
| 216 | /// [`io::Result`]: std::io::Result |
| 217 | /// [`Option`]: core::option::Option |
| 218 | /// [`Vec<u8>`]: std::vec::Vec |
| 219 | /// |
| 220 | /// # Errors |
| 221 | /// |
| 222 | /// Each item of the stream has the same error semantics as |
| 223 | /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until). |
| 224 | /// |
| 225 | /// # Examples |
| 226 | /// |
| 227 | /// ``` |
| 228 | /// # use tokio::io::AsyncBufRead; |
| 229 | /// use tokio::io::AsyncBufReadExt; |
| 230 | /// |
| 231 | /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> { |
| 232 | /// let mut segments = my_buf_read.split(b'f'); |
| 233 | /// |
| 234 | /// while let Some(segment) = segments.next_segment().await? { |
| 235 | /// println!("length = {}", segment.len()) |
| 236 | /// } |
| 237 | /// # Ok(()) |
| 238 | /// # } |
| 239 | /// ``` |
| 240 | fn split(self, byte: u8) -> Split<Self> |
| 241 | where |
| 242 | Self: Sized + Unpin, |
| 243 | { |
| 244 | split(self, byte) |
| 245 | } |
| 246 | |
| 247 | /// Returns the contents of the internal buffer, filling it with more |
| 248 | /// data from the inner reader if it is empty. |
| 249 | /// |
| 250 | /// This function is a lower-level call. It needs to be paired with the |
| 251 | /// [`consume`] method to function properly. When calling this method, |
| 252 | /// none of the contents will be "read" in the sense that later calling |
| 253 | /// `read` may return the same contents. As such, [`consume`] must be |
| 254 | /// called with the number of bytes that are consumed from this buffer |
| 255 | /// to ensure that the bytes are never returned twice. |
| 256 | /// |
| 257 | /// An empty buffer returned indicates that the stream has reached EOF. |
| 258 | /// |
| 259 | /// Equivalent to: |
| 260 | /// |
| 261 | /// ```ignore |
| 262 | /// async fn fill_buf(&mut self) -> io::Result<&[u8]>; |
| 263 | /// ``` |
| 264 | /// |
| 265 | /// # Errors |
| 266 | /// |
| 267 | /// This function will return an I/O error if the underlying reader was |
| 268 | /// read, but returned an error. |
| 269 | /// |
| 270 | /// # Cancel safety |
| 271 | /// |
| 272 | /// This method is cancel safe. If you use it as the event in a |
| 273 | /// [`tokio::select!`](crate::select) statement and some other branch |
| 274 | /// completes first, then it is guaranteed that no data was read. |
| 275 | /// |
| 276 | /// [`consume`]: crate::io::AsyncBufReadExt::consume |
| 277 | fn fill_buf(&mut self) -> FillBuf<'_, Self> |
| 278 | where |
| 279 | Self: Unpin, |
| 280 | { |
| 281 | fill_buf(self) |
| 282 | } |
| 283 | |
| 284 | /// Tells this buffer that `amt` bytes have been consumed from the |
| 285 | /// buffer, so they should no longer be returned in calls to [`read`]. |
| 286 | /// |
| 287 | /// This function is a lower-level call. It needs to be paired with the |
| 288 | /// [`fill_buf`] method to function properly. This function does not |
| 289 | /// perform any I/O, it simply informs this object that some amount of |
| 290 | /// its buffer, returned from [`fill_buf`], has been consumed and should |
| 291 | /// no longer be returned. As such, this function may do odd things if |
| 292 | /// [`fill_buf`] isn't called before calling it. |
| 293 | /// |
| 294 | /// The `amt` must be less than the number of bytes in the buffer |
| 295 | /// returned by [`fill_buf`]. |
| 296 | /// |
| 297 | /// [`read`]: crate::io::AsyncReadExt::read |
| 298 | /// [`fill_buf`]: crate::io::AsyncBufReadExt::fill_buf |
| 299 | fn consume(&mut self, amt: usize) |
| 300 | where |
| 301 | Self: Unpin, |
| 302 | { |
| 303 | std::pin::Pin::new(self).consume(amt); |
| 304 | } |
| 305 | |
| 306 | /// Returns a stream over the lines of this reader. |
| 307 | /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines). |
| 308 | /// |
| 309 | /// The stream returned from this function will yield instances of |
| 310 | /// [`io::Result`]`<`[`Option`]`<`[`String`]`>>`. Each string returned will *not* have a newline |
| 311 | /// byte (the 0xA byte) or `CRLF` (0xD, 0xA bytes) at the end. |
| 312 | /// |
| 313 | /// [`io::Result`]: std::io::Result |
| 314 | /// [`Option`]: core::option::Option |
| 315 | /// [`String`]: String |
| 316 | /// |
| 317 | /// # Errors |
| 318 | /// |
| 319 | /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`]. |
| 320 | /// |
| 321 | /// # Examples |
| 322 | /// |
| 323 | /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In |
| 324 | /// this example, we use [`Cursor`] to iterate over all the lines in a byte |
| 325 | /// slice. |
| 326 | /// |
| 327 | /// [`Cursor`]: std::io::Cursor |
| 328 | /// |
| 329 | /// ``` |
| 330 | /// use tokio::io::AsyncBufReadExt; |
| 331 | /// |
| 332 | /// use std::io::Cursor; |
| 333 | /// |
| 334 | /// #[tokio::main] |
| 335 | /// async fn main() { |
| 336 | /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); |
| 337 | /// |
| 338 | /// let mut lines = cursor.lines(); |
| 339 | /// |
| 340 | /// assert_eq!(lines.next_line().await.unwrap(), Some(String::from("lorem"))); |
| 341 | /// assert_eq!(lines.next_line().await.unwrap(), Some(String::from("ipsum"))); |
| 342 | /// assert_eq!(lines.next_line().await.unwrap(), Some(String::from("dolor"))); |
| 343 | /// assert_eq!(lines.next_line().await.unwrap(), None); |
| 344 | /// } |
| 345 | /// ``` |
| 346 | /// |
| 347 | /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line |
| 348 | fn lines(self) -> Lines<Self> |
| 349 | where |
| 350 | Self: Sized, |
| 351 | { |
| 352 | lines(self) |
| 353 | } |
| 354 | } |
| 355 | } |
| 356 | |
| 357 | impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {} |
| 358 | |