1use crate::io::util::read_line::read_line_internal;
2use crate::io::AsyncBufRead;
3
4use pin_project_lite::pin_project;
5use std::io;
6use std::mem;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10pin_project! {
11 /// Reads lines from an [`AsyncBufRead`].
12 ///
13 /// A `Lines` can be turned into a `Stream` with [`LinesStream`].
14 ///
15 /// This type is usually created using the [`lines`] method.
16 ///
17 /// [`AsyncBufRead`]: crate::io::AsyncBufRead
18 /// [`LinesStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.LinesStream.html
19 /// [`lines`]: crate::io::AsyncBufReadExt::lines
20 #[derive(Debug)]
21 #[must_use = "streams do nothing unless polled"]
22 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
23 pub struct Lines<R> {
24 #[pin]
25 reader: R,
26 buf: String,
27 bytes: Vec<u8>,
28 read: usize,
29 }
30}
31
32pub(crate) fn lines<R>(reader: R) -> Lines<R>
33where
34 R: AsyncBufRead,
35{
36 Lines {
37 reader,
38 buf: String::new(),
39 bytes: Vec::new(),
40 read: 0,
41 }
42}
43
44impl<R> Lines<R>
45where
46 R: AsyncBufRead + Unpin,
47{
48 /// Returns the next line in the stream.
49 ///
50 /// # Cancel safety
51 ///
52 /// This method is cancellation safe.
53 ///
54 /// # Examples
55 ///
56 /// ```
57 /// # use tokio::io::AsyncBufRead;
58 /// use tokio::io::AsyncBufReadExt;
59 ///
60 /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
61 /// let mut lines = my_buf_read.lines();
62 ///
63 /// while let Some(line) = lines.next_line().await? {
64 /// println!("length = {}", line.len())
65 /// }
66 /// # Ok(())
67 /// # }
68 /// ```
69 pub async fn next_line(&mut self) -> io::Result<Option<String>> {
70 use crate::future::poll_fn;
71
72 poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
73 }
74
75 /// Obtains a mutable reference to the underlying reader.
76 pub fn get_mut(&mut self) -> &mut R {
77 &mut self.reader
78 }
79
80 /// Obtains a reference to the underlying reader.
81 pub fn get_ref(&mut self) -> &R {
82 &self.reader
83 }
84
85 /// Unwraps this `Lines<R>`, returning the underlying reader.
86 ///
87 /// Note that any leftover data in the internal buffer is lost.
88 /// Therefore, a following read from the underlying reader may lead to data loss.
89 pub fn into_inner(self) -> R {
90 self.reader
91 }
92}
93
94impl<R> Lines<R>
95where
96 R: AsyncBufRead,
97{
98 /// Polls for the next line in the stream.
99 ///
100 /// This method returns:
101 ///
102 /// * `Poll::Pending` if the next line is not yet available.
103 /// * `Poll::Ready(Ok(Some(line)))` if the next line is available.
104 /// * `Poll::Ready(Ok(None))` if there are no more lines in this stream.
105 /// * `Poll::Ready(Err(err))` if an IO error occurred while reading the next line.
106 ///
107 /// When the method returns `Poll::Pending`, the `Waker` in the provided
108 /// `Context` is scheduled to receive a wakeup when more bytes become
109 /// available on the underlying IO resource. Note that on multiple calls to
110 /// `poll_next_line`, only the `Waker` from the `Context` passed to the most
111 /// recent call is scheduled to receive a wakeup.
112 pub fn poll_next_line(
113 self: Pin<&mut Self>,
114 cx: &mut Context<'_>,
115 ) -> Poll<io::Result<Option<String>>> {
116 let me = self.project();
117
118 let n = ready!(read_line_internal(me.reader, cx, me.buf, me.bytes, me.read))?;
119 debug_assert_eq!(*me.read, 0);
120
121 if n == 0 && me.buf.is_empty() {
122 return Poll::Ready(Ok(None));
123 }
124
125 if me.buf.ends_with('\n') {
126 me.buf.pop();
127
128 if me.buf.ends_with('\r') {
129 me.buf.pop();
130 }
131 }
132
133 Poll::Ready(Ok(Some(mem::take(me.buf))))
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::Lines;

    // Checks that `Lines<R>` satisfies `crate::is_unpin` for an `Unpin`
    // reader type (`()` stands in for the reader here).
    #[test]
    fn assert_unpin() {
        crate::is_unpin::<Lines<()>>();
    }
}
146