1use crate::io::{AsyncRead, ReadBuf};
2
3use bytes::Buf;
4use pin_project_lite::pin_project;
5use std::future::Future;
6use std::io;
7use std::io::ErrorKind::UnexpectedEof;
8use std::marker::PhantomPinned;
9use std::mem::size_of;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
// Generates a future type `$name<R>` that fills a fixed-size byte buffer from
// an `AsyncRead` source and then decodes it into a `$ty` via `Buf::$reader`.
//
// Arguments:
// * `$name`   - name of the generated future struct.
// * `$ty`     - numeric type the future resolves to.
// * `$reader` - `Buf` method used to decode the buffered bytes.
// * `$bytes`  - buffer length; defaults to `size_of::<$ty>()` when omitted.
macro_rules! reader {
    // Three-argument form: derive the buffer length from the target type.
    ($name:ident, $ty:ty, $reader:ident) => {
        reader!($name, $ty, $reader, size_of::<$ty>());
    };
    // Four-argument form: the buffer length is given explicitly.
    ($name:ident, $ty:ty, $reader:ident, $bytes:expr) => {
        pin_project! {
            #[doc(hidden)]
            #[must_use = "futures do nothing unless you `.await` or poll them"]
            pub struct $name<R> {
                #[pin]
                src: R,
                // Bytes accumulated so far; decoded once completely filled.
                buf: [u8; $bytes],
                // Number of leading bytes of `buf` that hold data already read.
                filled: u8,
                // Make this future `!Unpin` for compatibility with async trait methods.
                #[pin]
                _pin: PhantomPinned,
            }
        }

        impl<R> $name<R> {
            // Wraps `src` with an empty, zero-initialised buffer.
            pub(crate) fn new(src: R) -> Self {
                Self {
                    src,
                    buf: [0; $bytes],
                    filled: 0,
                    _pin: PhantomPinned,
                }
            }
        }

        impl<R> Future for $name<R>
        where
            R: AsyncRead,
        {
            type Output = io::Result<$ty>;

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                let mut this = self.project();
                let wanted = $bytes as u8;

                // Keep reading into the unfilled tail of the buffer. When the
                // buffer is already full on entry the loop body never runs and
                // the stored bytes are simply decoded below.
                while *this.filled < wanted {
                    let offset = *this.filled as usize;
                    let mut window = ReadBuf::new(&mut this.buf[offset..]);

                    match this.src.as_mut().poll_read(cx, &mut window) {
                        Poll::Pending => return Poll::Pending,
                        Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
                        Poll::Ready(Ok(())) => {}
                    }

                    // A successful read that filled nothing means the source
                    // ended before the value was complete.
                    let got = window.filled().len();
                    if got == 0 {
                        return Poll::Ready(Err(io::Error::from(UnexpectedEof)));
                    }

                    // `got` is bounded by the length of the remaining tail of `buf`.
                    *this.filled += got as u8;
                }

                Poll::Ready(Ok(Buf::$reader(&mut &this.buf[..])))
            }
        }
    };
}
80
// Generates a future type `$name<R>` that reads one byte from an `AsyncRead`
// source and resolves to that byte cast to `$ty`.
//
// Arguments:
// * `$name` - name of the generated future struct.
// * `$ty`   - type the byte is cast to with `as` before being returned.
macro_rules! reader8 {
    ($name:ident, $ty:ty) => {
        pin_project! {
            /// Future that reads a single byte from the wrapped reader and
            /// resolves to it, cast to the integer type this future was
            /// generated for.
            #[doc(hidden)]
            #[must_use = "futures do nothing unless you `.await` or poll them"]
            pub struct $name<R> {
                #[pin]
                reader: R,
                // Make this future `!Unpin` for compatibility with async trait methods.
                #[pin]
                _pin: PhantomPinned,
            }
        }

        impl<R> $name<R> {
            // Wraps `reader`; nothing is read until the future is polled.
            pub(crate) fn new(reader: R) -> $name<R> {
                $name {
                    reader,
                    _pin: PhantomPinned,
                }
            }
        }

        impl<R> Future for $name<R>
        where
            R: AsyncRead,
        {
            type Output = io::Result<$ty>;

            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                let me = self.project();

                // One-byte scratch buffer, recreated on every poll.
                let mut storage = [0; 1];
                let mut buf = ReadBuf::new(&mut storage);
                match me.reader.poll_read(cx, &mut buf) {
                    Poll::Pending => Poll::Pending,
                    Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                    // `first()` covers both outcomes of a successful read
                    // without a separate length check and index: a filled
                    // byte is the result, an empty fill means the source ended.
                    Poll::Ready(Ok(())) => match buf.filled().first() {
                        Some(&byte) => Poll::Ready(Ok(byte as $ty)),
                        None => Poll::Ready(Err(UnexpectedEof.into())),
                    },
                }
            }
        }
    };
}
131
132reader8!(ReadU8, u8);
133reader8!(ReadI8, i8);
134
135reader!(ReadU16, u16, get_u16);
136reader!(ReadU32, u32, get_u32);
137reader!(ReadU64, u64, get_u64);
138reader!(ReadU128, u128, get_u128);
139
140reader!(ReadI16, i16, get_i16);
141reader!(ReadI32, i32, get_i32);
142reader!(ReadI64, i64, get_i64);
143reader!(ReadI128, i128, get_i128);
144
145reader!(ReadF32, f32, get_f32);
146reader!(ReadF64, f64, get_f64);
147
148reader!(ReadU16Le, u16, get_u16_le);
149reader!(ReadU32Le, u32, get_u32_le);
150reader!(ReadU64Le, u64, get_u64_le);
151reader!(ReadU128Le, u128, get_u128_le);
152
153reader!(ReadI16Le, i16, get_i16_le);
154reader!(ReadI32Le, i32, get_i32_le);
155reader!(ReadI64Le, i64, get_i64_le);
156reader!(ReadI128Le, i128, get_i128_le);
157
158reader!(ReadF32Le, f32, get_f32_le);
159reader!(ReadF64Le, f64, get_f64_le);
160