1use futures_core::future::Future;
2use futures_core::ready;
3use futures_core::task::{Context, Poll};
4use futures_io::AsyncBufRead;
5use std::io;
6use std::mem;
7use std::pin::Pin;
8
/// Future for the [`read_until`](super::AsyncBufReadExt::read_until) method.
///
/// Holds the borrowed reader and destination buffer together with the
/// delimiter byte and a running count of bytes copied so far, so that the
/// operation can be resumed across multiple polls.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadUntil<'a, R: ?Sized> {
    /// Reader that bytes are pulled from.
    reader: &'a mut R,
    /// Delimiter byte that terminates the read.
    byte: u8,
    /// Destination buffer that bytes are appended to.
    buf: &'a mut Vec<u8>,
    /// Number of bytes appended to `buf` so far by this future.
    read: usize,
}
18
19impl<R: ?Sized + Unpin> Unpin for ReadUntil<'_, R> {}
20
21impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
22 pub(super) fn new(reader: &'a mut R, byte: u8, buf: &'a mut Vec<u8>) -> Self {
23 Self { reader, byte, buf, read: 0 }
24 }
25}
26
27pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
28 mut reader: Pin<&mut R>,
29 cx: &mut Context<'_>,
30 byte: u8,
31 buf: &mut Vec<u8>,
32 read: &mut usize,
33) -> Poll<io::Result<usize>> {
34 loop {
35 let (done: bool, used: usize) = {
36 let available: &[u8] = ready!(reader.as_mut().poll_fill_buf(cx))?;
37 if let Some(i: usize) = memchr::memchr(needle:byte, haystack:available) {
38 buf.extend_from_slice(&available[..=i]);
39 (true, i + 1)
40 } else {
41 buf.extend_from_slice(available);
42 (false, available.len())
43 }
44 };
45 reader.as_mut().consume(amt:used);
46 *read += used;
47 if done || used == 0 {
48 return Poll::Ready(Ok(mem::replace(dest:read, src:0)));
49 }
50 }
51}
52
53impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {
54 type Output = io::Result<usize>;
55
56 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57 let Self { reader: &mut &mut R, byte: &mut u8, buf: &mut &mut Vec, read: &mut usize } = &mut *self;
58 read_until_internal(reader:Pin::new(pointer:reader), cx, *byte, buf, read)
59 }
60}
61