1use crate::io::util::read_until::read_until_internal;
2use crate::io::AsyncBufRead;
3
4use pin_project_lite::pin_project;
5use std::io;
6use std::mem;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10pin_project! {
11 /// Splitter for the [`split`](crate::io::AsyncBufReadExt::split) method.
12 ///
13 /// A `Split` can be turned into a `Stream` with [`SplitStream`].
14 ///
15 /// [`SplitStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.SplitStream.html
16 #[derive(Debug)]
17 #[must_use = "streams do nothing unless polled"]
18 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
19 pub struct Split<R> {
20 #[pin]
21 reader: R,
22 buf: Vec<u8>,
23 delim: u8,
24 read: usize,
25 }
26}
27
28pub(crate) fn split<R>(reader: R, delim: u8) -> Split<R>
29where
30 R: AsyncBufRead,
31{
32 Split {
33 reader,
34 buf: Vec::new(),
35 delim,
36 read: 0,
37 }
38}
39
40impl<R> Split<R>
41where
42 R: AsyncBufRead + Unpin,
43{
44 /// Returns the next segment in the stream.
45 ///
46 /// # Examples
47 ///
48 /// ```
49 /// # use tokio::io::AsyncBufRead;
50 /// use tokio::io::AsyncBufReadExt;
51 ///
52 /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
53 /// let mut segments = my_buf_read.split(b'f');
54 ///
55 /// while let Some(segment) = segments.next_segment().await? {
56 /// println!("length = {}", segment.len())
57 /// }
58 /// # Ok(())
59 /// # }
60 /// ```
61 pub async fn next_segment(&mut self) -> io::Result<Option<Vec<u8>>> {
62 use crate::future::poll_fn;
63
64 poll_fn(|cx| Pin::new(&mut *self).poll_next_segment(cx)).await
65 }
66}
67
68impl<R> Split<R>
69where
70 R: AsyncBufRead,
71{
72 /// Polls for the next segment in the stream.
73 ///
74 /// This method returns:
75 ///
76 /// * `Poll::Pending` if the next segment is not yet available.
77 /// * `Poll::Ready(Ok(Some(segment)))` if the next segment is available.
78 /// * `Poll::Ready(Ok(None))` if there are no more segments in this stream.
79 /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the
80 /// next segment.
81 ///
82 /// When the method returns `Poll::Pending`, the `Waker` in the provided
83 /// `Context` is scheduled to receive a wakeup when more bytes become
84 /// available on the underlying IO resource.
85 ///
86 /// Note that on multiple calls to `poll_next_segment`, only the `Waker`
87 /// from the `Context` passed to the most recent call is scheduled to
88 /// receive a wakeup.
89 pub fn poll_next_segment(
90 self: Pin<&mut Self>,
91 cx: &mut Context<'_>,
92 ) -> Poll<io::Result<Option<Vec<u8>>>> {
93 let me = self.project();
94
95 let n = ready!(read_until_internal(
96 me.reader, cx, *me.delim, me.buf, me.read,
97 ))?;
98 // read_until_internal resets me.read to zero once it finds the delimiter
99 debug_assert_eq!(*me.read, 0);
100
101 if n == 0 && me.buf.is_empty() {
102 return Poll::Ready(Ok(None));
103 }
104
105 if me.buf.last() == Some(me.delim) {
106 me.buf.pop();
107 }
108
109 Poll::Ready(Ok(Some(mem::take(me.buf))))
110 }
111}
112
#[cfg(test)]
mod tests {
    use super::Split;

    // Compile-time check: `Split<R>` is `Unpin` when `R` is `Unpin`.
    #[test]
    fn assert_unpin() {
        crate::is_unpin::<Split<()>>();
    }
}
122