1use std::pin::Pin;
2
3use crate::io::{self, Read};
4use crate::stream::stream::Stream;
5use crate::task::{Context, Poll};
6
/// A stream over the `u8` values of a reader.
///
/// Each item produced by the stream is an `io::Result<u8>`: either the next
/// byte obtained from the wrapped reader or the I/O error that the read
/// reported.
///
/// This struct is generally created by calling [`bytes`] on a reader.
/// Please see the documentation of [`bytes`] for more details.
///
/// [`bytes`]: trait.Read.html#method.bytes
#[derive(Debug)]
pub struct Bytes<T> {
    /// The wrapped reader that bytes are pulled from.
    pub(crate) inner: T,
}
17
18impl<T: Read + Unpin> Stream for Bytes<T> {
19 type Item = io::Result<u8>;
20
21 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
22 let mut byte: u8 = 0;
23
24 let rd: Pin<&mut T> = Pin::new(&mut self.inner);
25
26 match futures_core::ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
27 Ok(0) => Poll::Ready(None),
28 Ok(..) => Poll::Ready(Some(Ok(byte))),
29 Err(ref e: &Error) if e.kind() == io::ErrorKind::Interrupted => Poll::Pending,
30 Err(e: Error) => Poll::Ready(Some(Err(e))),
31 }
32 }
33}
34
#[cfg(all(test, feature = "default", not(target_arch = "wasm32")))]
mod tests {
    use crate::io;
    use crate::prelude::*;
    use crate::task;

    /// Streams every byte out of an in-memory cursor and checks that the
    /// collected bytes equal the original buffer, in order.
    #[test]
    fn test_bytes_basics() -> std::io::Result<()> {
        task::block_on(async move {
            let raw: Vec<u8> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8];
            let source: io::Cursor<Vec<u8>> = io::Cursor::new(raw.clone());

            let mut s = source.bytes();

            // TODO(@dignifiedquire): Use collect, once it is stable.
            let mut result = Vec::new();
            while let Some(byte) = s.next().await {
                // Each item is an `io::Result<u8>`; bail out on the first error.
                result.push(byte?);
            }

            assert_eq!(result, raw);

            Ok(())
        })
    }
}
61