1use crate::io::AsyncBufRead;
2use crate::util::memchr;
3
4use pin_project_lite::pin_project;
5use std::future::Future;
6use std::io;
7use std::marker::PhantomPinned;
8use std::mem;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
pin_project! {
    /// Future for the [`read_until`](crate::io::AsyncBufReadExt::read_until) method.
    /// The delimiter is included in the resulting vector.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadUntil<'a, R: ?Sized> {
        // Buffered reader the bytes are taken from.
        reader: &'a mut R,
        // Byte that ends the read once it is found in the reader's buffer.
        delimiter: u8,
        // Destination vector; bytes are appended, existing contents are kept.
        buf: &'a mut Vec<u8>,
        // The number of bytes appended to buf so far by this operation. This
        // can be less than buf.len() if the buffer was not empty when the
        // operation was started. The count persists across polls and is
        // handed back (and reset to zero) when the read completes.
        read: usize,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}
29
30pub(crate) fn read_until<'a, R>(
31 reader: &'a mut R,
32 delimiter: u8,
33 buf: &'a mut Vec<u8>,
34) -> ReadUntil<'a, R>
35where
36 R: AsyncBufRead + ?Sized + Unpin,
37{
38 ReadUntil {
39 reader,
40 delimiter,
41 buf,
42 read: 0,
43 _pin: PhantomPinned,
44 }
45}
46
47pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
48 mut reader: Pin<&mut R>,
49 cx: &mut Context<'_>,
50 delimiter: u8,
51 buf: &mut Vec<u8>,
52 read: &mut usize,
53) -> Poll<io::Result<usize>> {
54 loop {
55 let (done, used) = {
56 let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
57 if let Some(i) = memchr::memchr(delimiter, available) {
58 buf.extend_from_slice(&available[..=i]);
59 (true, i + 1)
60 } else {
61 buf.extend_from_slice(available);
62 (false, available.len())
63 }
64 };
65 reader.as_mut().consume(used);
66 *read += used;
67 if done || used == 0 {
68 return Poll::Ready(Ok(mem::replace(read, 0)));
69 }
70 }
71}
72
73impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
74 type Output = io::Result<usize>;
75
76 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
77 let me = self.project();
78 read_until_internal(Pin::new(*me.reader), cx, *me.delimiter, me.buf, me.read)
79 }
80}
81