1#![warn(rust_2018_idioms)]
2
3use tokio::io::{AsyncRead, ReadBuf};
4use tokio_test::assert_ready;
5use tokio_test::task;
6use tokio_util::codec::{Decoder, FramedRead};
7
8use bytes::{Buf, BytesMut};
9use futures::Stream;
10use std::collections::VecDeque;
11use std::io;
12use std::pin::Pin;
13use std::task::Poll::{Pending, Ready};
14use std::task::{Context, Poll};
15
16macro_rules! mock {
17 ($($x:expr,)*) => {{
18 let mut v = VecDeque::new();
19 v.extend(vec![$($x),*]);
20 Mock { calls: v }
21 }};
22}
23
24macro_rules! assert_read {
25 ($e:expr, $n:expr) => {{
26 let val = assert_ready!($e);
27 assert_eq!(val.unwrap().unwrap(), $n);
28 }};
29}
30
31macro_rules! pin {
32 ($id:ident) => {
33 Pin::new(&mut $id)
34 };
35}
36
37struct U32Decoder;
38
39impl Decoder for U32Decoder {
40 type Item = u32;
41 type Error = io::Error;
42
43 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
44 if buf.len() < 4 {
45 return Ok(None);
46 }
47
48 let n = buf.split_to(4).get_u32();
49 Ok(Some(n))
50 }
51}
52
53struct U64Decoder;
54
55impl Decoder for U64Decoder {
56 type Item = u64;
57 type Error = io::Error;
58
59 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>> {
60 if buf.len() < 8 {
61 return Ok(None);
62 }
63
64 let n = buf.split_to(8).get_u64();
65 Ok(Some(n))
66 }
67}
68
69#[test]
70fn read_multi_frame_in_packet() {
71 let mut task = task::spawn(());
72 let mock = mock! {
73 Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
74 };
75 let mut framed = FramedRead::new(mock, U32Decoder);
76
77 task.enter(|cx, _| {
78 assert_read!(pin!(framed).poll_next(cx), 0);
79 assert_read!(pin!(framed).poll_next(cx), 1);
80 assert_read!(pin!(framed).poll_next(cx), 2);
81 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
82 });
83}
84
85#[test]
86fn read_multi_frame_across_packets() {
87 let mut task = task::spawn(());
88 let mock = mock! {
89 Ok(b"\x00\x00\x00\x00".to_vec()),
90 Ok(b"\x00\x00\x00\x01".to_vec()),
91 Ok(b"\x00\x00\x00\x02".to_vec()),
92 };
93 let mut framed = FramedRead::new(mock, U32Decoder);
94
95 task.enter(|cx, _| {
96 assert_read!(pin!(framed).poll_next(cx), 0);
97 assert_read!(pin!(framed).poll_next(cx), 1);
98 assert_read!(pin!(framed).poll_next(cx), 2);
99 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
100 });
101}
102
103#[test]
104fn read_multi_frame_in_packet_after_codec_changed() {
105 let mut task = task::spawn(());
106 let mock = mock! {
107 Ok(b"\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x08".to_vec()),
108 };
109 let mut framed = FramedRead::new(mock, U32Decoder);
110
111 task.enter(|cx, _| {
112 assert_read!(pin!(framed).poll_next(cx), 0x04);
113
114 let mut framed = framed.map_decoder(|_| U64Decoder);
115 assert_read!(pin!(framed).poll_next(cx), 0x08);
116
117 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
118 });
119}
120
121#[test]
122fn read_not_ready() {
123 let mut task = task::spawn(());
124 let mock = mock! {
125 Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
126 Ok(b"\x00\x00\x00\x00".to_vec()),
127 Ok(b"\x00\x00\x00\x01".to_vec()),
128 };
129 let mut framed = FramedRead::new(mock, U32Decoder);
130
131 task.enter(|cx, _| {
132 assert!(pin!(framed).poll_next(cx).is_pending());
133 assert_read!(pin!(framed).poll_next(cx), 0);
134 assert_read!(pin!(framed).poll_next(cx), 1);
135 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
136 });
137}
138
139#[test]
140fn read_partial_then_not_ready() {
141 let mut task = task::spawn(());
142 let mock = mock! {
143 Ok(b"\x00\x00".to_vec()),
144 Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
145 Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
146 };
147 let mut framed = FramedRead::new(mock, U32Decoder);
148
149 task.enter(|cx, _| {
150 assert!(pin!(framed).poll_next(cx).is_pending());
151 assert_read!(pin!(framed).poll_next(cx), 0);
152 assert_read!(pin!(framed).poll_next(cx), 1);
153 assert_read!(pin!(framed).poll_next(cx), 2);
154 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
155 });
156}
157
158#[test]
159fn read_err() {
160 let mut task = task::spawn(());
161 let mock = mock! {
162 Err(io::Error::new(io::ErrorKind::Other, "")),
163 };
164 let mut framed = FramedRead::new(mock, U32Decoder);
165
166 task.enter(|cx, _| {
167 assert_eq!(
168 io::ErrorKind::Other,
169 assert_ready!(pin!(framed).poll_next(cx))
170 .unwrap()
171 .unwrap_err()
172 .kind()
173 )
174 });
175}
176
177#[test]
178fn read_partial_then_err() {
179 let mut task = task::spawn(());
180 let mock = mock! {
181 Ok(b"\x00\x00".to_vec()),
182 Err(io::Error::new(io::ErrorKind::Other, "")),
183 };
184 let mut framed = FramedRead::new(mock, U32Decoder);
185
186 task.enter(|cx, _| {
187 assert_eq!(
188 io::ErrorKind::Other,
189 assert_ready!(pin!(framed).poll_next(cx))
190 .unwrap()
191 .unwrap_err()
192 .kind()
193 )
194 });
195}
196
197#[test]
198fn read_partial_would_block_then_err() {
199 let mut task = task::spawn(());
200 let mock = mock! {
201 Ok(b"\x00\x00".to_vec()),
202 Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
203 Err(io::Error::new(io::ErrorKind::Other, "")),
204 };
205 let mut framed = FramedRead::new(mock, U32Decoder);
206
207 task.enter(|cx, _| {
208 assert!(pin!(framed).poll_next(cx).is_pending());
209 assert_eq!(
210 io::ErrorKind::Other,
211 assert_ready!(pin!(framed).poll_next(cx))
212 .unwrap()
213 .unwrap_err()
214 .kind()
215 )
216 });
217}
218
219#[test]
220fn huge_size() {
221 let mut task = task::spawn(());
222 let data = &[0; 32 * 1024][..];
223 let mut framed = FramedRead::new(data, BigDecoder);
224
225 task.enter(|cx, _| {
226 assert_read!(pin!(framed).poll_next(cx), 0);
227 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
228 });
229
230 struct BigDecoder;
231
232 impl Decoder for BigDecoder {
233 type Item = u32;
234 type Error = io::Error;
235
236 fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
237 if buf.len() < 32 * 1024 {
238 return Ok(None);
239 }
240 buf.advance(32 * 1024);
241 Ok(Some(0))
242 }
243 }
244}
245
246#[test]
247fn data_remaining_is_error() {
248 let mut task = task::spawn(());
249 let slice = &[0; 5][..];
250 let mut framed = FramedRead::new(slice, U32Decoder);
251
252 task.enter(|cx, _| {
253 assert_read!(pin!(framed).poll_next(cx), 0);
254 assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err());
255 });
256}
257
258#[test]
259fn multi_frames_on_eof() {
260 let mut task = task::spawn(());
261 struct MyDecoder(Vec<u32>);
262
263 impl Decoder for MyDecoder {
264 type Item = u32;
265 type Error = io::Error;
266
267 fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
268 unreachable!();
269 }
270
271 fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
272 if self.0.is_empty() {
273 return Ok(None);
274 }
275
276 Ok(Some(self.0.remove(0)))
277 }
278 }
279
280 let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
281
282 task.enter(|cx, _| {
283 assert_read!(pin!(framed).poll_next(cx), 0);
284 assert_read!(pin!(framed).poll_next(cx), 1);
285 assert_read!(pin!(framed).poll_next(cx), 2);
286 assert_read!(pin!(framed).poll_next(cx), 3);
287 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
288 });
289}
290
291#[test]
292fn read_eof_then_resume() {
293 let mut task = task::spawn(());
294 let mock = mock! {
295 Ok(b"\x00\x00\x00\x01".to_vec()),
296 Ok(b"".to_vec()),
297 Ok(b"\x00\x00\x00\x02".to_vec()),
298 Ok(b"".to_vec()),
299 Ok(b"\x00\x00\x00\x03".to_vec()),
300 };
301 let mut framed = FramedRead::new(mock, U32Decoder);
302
303 task.enter(|cx, _| {
304 assert_read!(pin!(framed).poll_next(cx), 1);
305 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
306 assert_read!(pin!(framed).poll_next(cx), 2);
307 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
308 assert_read!(pin!(framed).poll_next(cx), 3);
309 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
310 assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
311 });
312}
313
314// ===== Mock ======
315
316struct Mock {
317 calls: VecDeque<io::Result<Vec<u8>>>,
318}
319
320impl AsyncRead for Mock {
321 fn poll_read(
322 mut self: Pin<&mut Self>,
323 _cx: &mut Context<'_>,
324 buf: &mut ReadBuf<'_>,
325 ) -> Poll<io::Result<()>> {
326 use io::ErrorKind::WouldBlock;
327
328 match self.calls.pop_front() {
329 Some(Ok(data)) => {
330 debug_assert!(buf.remaining() >= data.len());
331 buf.put_slice(&data);
332 Ready(Ok(()))
333 }
334 Some(Err(ref e)) if e.kind() == WouldBlock => Pending,
335 Some(Err(e)) => Ready(Err(e)),
336 None => Ready(Ok(())),
337 }
338 }
339}
340