1 | use crate::io::util::read_until::read_until_internal; |
2 | use crate::io::AsyncBufRead; |
3 | |
4 | use pin_project_lite::pin_project; |
5 | use std::io; |
6 | use std::mem; |
7 | use std::pin::Pin; |
8 | use std::task::{Context, Poll}; |
9 | |
10 | pin_project! { |
11 | /// Splitter for the [`split`](crate::io::AsyncBufReadExt::split) method. |
12 | /// |
13 | /// A `Split` can be turned into a `Stream` with [`SplitStream`]. |
14 | /// |
15 | /// [`SplitStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.SplitStream.html |
16 | #[derive(Debug)] |
17 | #[must_use = "streams do nothing unless polled" ] |
18 | #[cfg_attr(docsrs, doc(cfg(feature = "io-util" )))] |
19 | pub struct Split<R> { |
20 | #[pin] |
21 | reader: R, |
22 | buf: Vec<u8>, |
23 | delim: u8, |
24 | read: usize, |
25 | } |
26 | } |
27 | |
28 | pub(crate) fn split<R>(reader: R, delim: u8) -> Split<R> |
29 | where |
30 | R: AsyncBufRead, |
31 | { |
32 | Split { |
33 | reader, |
34 | buf: Vec::new(), |
35 | delim, |
36 | read: 0, |
37 | } |
38 | } |
39 | |
40 | impl<R> Split<R> |
41 | where |
42 | R: AsyncBufRead + Unpin, |
43 | { |
44 | /// Returns the next segment in the stream. |
45 | /// |
46 | /// # Examples |
47 | /// |
48 | /// ``` |
49 | /// # use tokio::io::AsyncBufRead; |
50 | /// use tokio::io::AsyncBufReadExt; |
51 | /// |
52 | /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> { |
53 | /// let mut segments = my_buf_read.split(b'f' ); |
54 | /// |
55 | /// while let Some(segment) = segments.next_segment().await? { |
56 | /// println!("length = {}" , segment.len()) |
57 | /// } |
58 | /// # Ok(()) |
59 | /// # } |
60 | /// ``` |
61 | pub async fn next_segment(&mut self) -> io::Result<Option<Vec<u8>>> { |
62 | use crate::future::poll_fn; |
63 | |
64 | poll_fn(|cx| Pin::new(&mut *self).poll_next_segment(cx)).await |
65 | } |
66 | } |
67 | |
68 | impl<R> Split<R> |
69 | where |
70 | R: AsyncBufRead, |
71 | { |
72 | /// Polls for the next segment in the stream. |
73 | /// |
74 | /// This method returns: |
75 | /// |
76 | /// * `Poll::Pending` if the next segment is not yet available. |
77 | /// * `Poll::Ready(Ok(Some(segment)))` if the next segment is available. |
78 | /// * `Poll::Ready(Ok(None))` if there are no more segments in this stream. |
79 | /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the |
80 | /// next segment. |
81 | /// |
82 | /// When the method returns `Poll::Pending`, the `Waker` in the provided |
83 | /// `Context` is scheduled to receive a wakeup when more bytes become |
84 | /// available on the underlying IO resource. |
85 | /// |
86 | /// Note that on multiple calls to `poll_next_segment`, only the `Waker` |
87 | /// from the `Context` passed to the most recent call is scheduled to |
88 | /// receive a wakeup. |
89 | pub fn poll_next_segment( |
90 | self: Pin<&mut Self>, |
91 | cx: &mut Context<'_>, |
92 | ) -> Poll<io::Result<Option<Vec<u8>>>> { |
93 | let me = self.project(); |
94 | |
95 | let n = ready!(read_until_internal( |
96 | me.reader, cx, *me.delim, me.buf, me.read, |
97 | ))?; |
98 | // read_until_internal resets me.read to zero once it finds the delimiter |
99 | debug_assert_eq!(*me.read, 0); |
100 | |
101 | if n == 0 && me.buf.is_empty() { |
102 | return Poll::Ready(Ok(None)); |
103 | } |
104 | |
105 | if me.buf.last() == Some(me.delim) { |
106 | me.buf.pop(); |
107 | } |
108 | |
109 | Poll::Ready(Ok(Some(mem::take(me.buf)))) |
110 | } |
111 | } |
112 | |
113 | #[cfg (test)] |
114 | mod tests { |
115 | use super::*; |
116 | |
117 | #[test] |
118 | fn assert_unpin() { |
119 | crate::is_unpin::<Split<()>>(); |
120 | } |
121 | } |
122 | |