1 | #![warn (rust_2018_idioms)] |
2 | |
3 | use tokio::io::{AsyncRead, ReadBuf}; |
4 | use tokio_test::assert_ready; |
5 | use tokio_test::task; |
6 | use tokio_util::codec::{Decoder, FramedRead}; |
7 | |
8 | use bytes::{Buf, BytesMut}; |
9 | use futures::Stream; |
10 | use std::collections::VecDeque; |
11 | use std::io; |
12 | use std::pin::Pin; |
13 | use std::task::Poll::{Pending, Ready}; |
14 | use std::task::{Context, Poll}; |
15 | |
16 | macro_rules! mock { |
17 | ($($x:expr,)*) => {{ |
18 | let mut v = VecDeque::new(); |
19 | v.extend(vec![$($x),*]); |
20 | Mock { calls: v } |
21 | }}; |
22 | } |
23 | |
24 | macro_rules! assert_read { |
25 | ($e:expr, $n:expr) => {{ |
26 | let val = assert_ready!($e); |
27 | assert_eq!(val.unwrap().unwrap(), $n); |
28 | }}; |
29 | } |
30 | |
31 | macro_rules! pin { |
32 | ($id:ident) => { |
33 | Pin::new(&mut $id) |
34 | }; |
35 | } |
36 | |
37 | struct U32Decoder; |
38 | |
39 | impl Decoder for U32Decoder { |
40 | type Item = u32; |
41 | type Error = io::Error; |
42 | |
43 | fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { |
44 | if buf.len() < 4 { |
45 | return Ok(None); |
46 | } |
47 | |
48 | let n = buf.split_to(4).get_u32(); |
49 | Ok(Some(n)) |
50 | } |
51 | } |
52 | |
53 | struct U64Decoder; |
54 | |
55 | impl Decoder for U64Decoder { |
56 | type Item = u64; |
57 | type Error = io::Error; |
58 | |
59 | fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>> { |
60 | if buf.len() < 8 { |
61 | return Ok(None); |
62 | } |
63 | |
64 | let n = buf.split_to(8).get_u64(); |
65 | Ok(Some(n)) |
66 | } |
67 | } |
68 | |
69 | #[test] |
70 | fn read_multi_frame_in_packet() { |
71 | let mut task = task::spawn(()); |
72 | let mock = mock! { |
73 | Ok(b" \x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02" .to_vec()), |
74 | }; |
75 | let mut framed = FramedRead::new(mock, U32Decoder); |
76 | |
77 | task.enter(|cx, _| { |
78 | assert_read!(pin!(framed).poll_next(cx), 0); |
79 | assert_read!(pin!(framed).poll_next(cx), 1); |
80 | assert_read!(pin!(framed).poll_next(cx), 2); |
81 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
82 | }); |
83 | } |
84 | |
85 | #[test] |
86 | fn read_multi_frame_across_packets() { |
87 | let mut task = task::spawn(()); |
88 | let mock = mock! { |
89 | Ok(b" \x00\x00\x00\x00" .to_vec()), |
90 | Ok(b" \x00\x00\x00\x01" .to_vec()), |
91 | Ok(b" \x00\x00\x00\x02" .to_vec()), |
92 | }; |
93 | let mut framed = FramedRead::new(mock, U32Decoder); |
94 | |
95 | task.enter(|cx, _| { |
96 | assert_read!(pin!(framed).poll_next(cx), 0); |
97 | assert_read!(pin!(framed).poll_next(cx), 1); |
98 | assert_read!(pin!(framed).poll_next(cx), 2); |
99 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
100 | }); |
101 | } |
102 | |
103 | #[test] |
104 | fn read_multi_frame_in_packet_after_codec_changed() { |
105 | let mut task = task::spawn(()); |
106 | let mock = mock! { |
107 | Ok(b" \x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x08" .to_vec()), |
108 | }; |
109 | let mut framed = FramedRead::new(mock, U32Decoder); |
110 | |
111 | task.enter(|cx, _| { |
112 | assert_read!(pin!(framed).poll_next(cx), 0x04); |
113 | |
114 | let mut framed = framed.map_decoder(|_| U64Decoder); |
115 | assert_read!(pin!(framed).poll_next(cx), 0x08); |
116 | |
117 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
118 | }); |
119 | } |
120 | |
121 | #[test] |
122 | fn read_not_ready() { |
123 | let mut task = task::spawn(()); |
124 | let mock = mock! { |
125 | Err(io::Error::new(io::ErrorKind::WouldBlock, "" )), |
126 | Ok(b" \x00\x00\x00\x00" .to_vec()), |
127 | Ok(b" \x00\x00\x00\x01" .to_vec()), |
128 | }; |
129 | let mut framed = FramedRead::new(mock, U32Decoder); |
130 | |
131 | task.enter(|cx, _| { |
132 | assert!(pin!(framed).poll_next(cx).is_pending()); |
133 | assert_read!(pin!(framed).poll_next(cx), 0); |
134 | assert_read!(pin!(framed).poll_next(cx), 1); |
135 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
136 | }); |
137 | } |
138 | |
139 | #[test] |
140 | fn read_partial_then_not_ready() { |
141 | let mut task = task::spawn(()); |
142 | let mock = mock! { |
143 | Ok(b" \x00\x00" .to_vec()), |
144 | Err(io::Error::new(io::ErrorKind::WouldBlock, "" )), |
145 | Ok(b" \x00\x00\x00\x00\x00\x01\x00\x00\x00\x02" .to_vec()), |
146 | }; |
147 | let mut framed = FramedRead::new(mock, U32Decoder); |
148 | |
149 | task.enter(|cx, _| { |
150 | assert!(pin!(framed).poll_next(cx).is_pending()); |
151 | assert_read!(pin!(framed).poll_next(cx), 0); |
152 | assert_read!(pin!(framed).poll_next(cx), 1); |
153 | assert_read!(pin!(framed).poll_next(cx), 2); |
154 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
155 | }); |
156 | } |
157 | |
158 | #[test] |
159 | fn read_err() { |
160 | let mut task = task::spawn(()); |
161 | let mock = mock! { |
162 | Err(io::Error::new(io::ErrorKind::Other, "" )), |
163 | }; |
164 | let mut framed = FramedRead::new(mock, U32Decoder); |
165 | |
166 | task.enter(|cx, _| { |
167 | assert_eq!( |
168 | io::ErrorKind::Other, |
169 | assert_ready!(pin!(framed).poll_next(cx)) |
170 | .unwrap() |
171 | .unwrap_err() |
172 | .kind() |
173 | ) |
174 | }); |
175 | } |
176 | |
177 | #[test] |
178 | fn read_partial_then_err() { |
179 | let mut task = task::spawn(()); |
180 | let mock = mock! { |
181 | Ok(b" \x00\x00" .to_vec()), |
182 | Err(io::Error::new(io::ErrorKind::Other, "" )), |
183 | }; |
184 | let mut framed = FramedRead::new(mock, U32Decoder); |
185 | |
186 | task.enter(|cx, _| { |
187 | assert_eq!( |
188 | io::ErrorKind::Other, |
189 | assert_ready!(pin!(framed).poll_next(cx)) |
190 | .unwrap() |
191 | .unwrap_err() |
192 | .kind() |
193 | ) |
194 | }); |
195 | } |
196 | |
197 | #[test] |
198 | fn read_partial_would_block_then_err() { |
199 | let mut task = task::spawn(()); |
200 | let mock = mock! { |
201 | Ok(b" \x00\x00" .to_vec()), |
202 | Err(io::Error::new(io::ErrorKind::WouldBlock, "" )), |
203 | Err(io::Error::new(io::ErrorKind::Other, "" )), |
204 | }; |
205 | let mut framed = FramedRead::new(mock, U32Decoder); |
206 | |
207 | task.enter(|cx, _| { |
208 | assert!(pin!(framed).poll_next(cx).is_pending()); |
209 | assert_eq!( |
210 | io::ErrorKind::Other, |
211 | assert_ready!(pin!(framed).poll_next(cx)) |
212 | .unwrap() |
213 | .unwrap_err() |
214 | .kind() |
215 | ) |
216 | }); |
217 | } |
218 | |
219 | #[test] |
220 | fn huge_size() { |
221 | let mut task = task::spawn(()); |
222 | let data = &[0; 32 * 1024][..]; |
223 | let mut framed = FramedRead::new(data, BigDecoder); |
224 | |
225 | task.enter(|cx, _| { |
226 | assert_read!(pin!(framed).poll_next(cx), 0); |
227 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
228 | }); |
229 | |
230 | struct BigDecoder; |
231 | |
232 | impl Decoder for BigDecoder { |
233 | type Item = u32; |
234 | type Error = io::Error; |
235 | |
236 | fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { |
237 | if buf.len() < 32 * 1024 { |
238 | return Ok(None); |
239 | } |
240 | buf.advance(32 * 1024); |
241 | Ok(Some(0)) |
242 | } |
243 | } |
244 | } |
245 | |
246 | #[test] |
247 | fn data_remaining_is_error() { |
248 | let mut task = task::spawn(()); |
249 | let slice = &[0; 5][..]; |
250 | let mut framed = FramedRead::new(slice, U32Decoder); |
251 | |
252 | task.enter(|cx, _| { |
253 | assert_read!(pin!(framed).poll_next(cx), 0); |
254 | assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err()); |
255 | }); |
256 | } |
257 | |
258 | #[test] |
259 | fn multi_frames_on_eof() { |
260 | let mut task = task::spawn(()); |
261 | struct MyDecoder(Vec<u32>); |
262 | |
263 | impl Decoder for MyDecoder { |
264 | type Item = u32; |
265 | type Error = io::Error; |
266 | |
267 | fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> { |
268 | unreachable!(); |
269 | } |
270 | |
271 | fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> { |
272 | if self.0.is_empty() { |
273 | return Ok(None); |
274 | } |
275 | |
276 | Ok(Some(self.0.remove(0))) |
277 | } |
278 | } |
279 | |
280 | let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3])); |
281 | |
282 | task.enter(|cx, _| { |
283 | assert_read!(pin!(framed).poll_next(cx), 0); |
284 | assert_read!(pin!(framed).poll_next(cx), 1); |
285 | assert_read!(pin!(framed).poll_next(cx), 2); |
286 | assert_read!(pin!(framed).poll_next(cx), 3); |
287 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
288 | }); |
289 | } |
290 | |
291 | #[test] |
292 | fn read_eof_then_resume() { |
293 | let mut task = task::spawn(()); |
294 | let mock = mock! { |
295 | Ok(b" \x00\x00\x00\x01" .to_vec()), |
296 | Ok(b"" .to_vec()), |
297 | Ok(b" \x00\x00\x00\x02" .to_vec()), |
298 | Ok(b"" .to_vec()), |
299 | Ok(b" \x00\x00\x00\x03" .to_vec()), |
300 | }; |
301 | let mut framed = FramedRead::new(mock, U32Decoder); |
302 | |
303 | task.enter(|cx, _| { |
304 | assert_read!(pin!(framed).poll_next(cx), 1); |
305 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
306 | assert_read!(pin!(framed).poll_next(cx), 2); |
307 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
308 | assert_read!(pin!(framed).poll_next(cx), 3); |
309 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
310 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
311 | }); |
312 | } |
313 | |
314 | // ===== Mock ====== |
315 | |
316 | struct Mock { |
317 | calls: VecDeque<io::Result<Vec<u8>>>, |
318 | } |
319 | |
320 | impl AsyncRead for Mock { |
321 | fn poll_read( |
322 | mut self: Pin<&mut Self>, |
323 | _cx: &mut Context<'_>, |
324 | buf: &mut ReadBuf<'_>, |
325 | ) -> Poll<io::Result<()>> { |
326 | use io::ErrorKind::WouldBlock; |
327 | |
328 | match self.calls.pop_front() { |
329 | Some(Ok(data)) => { |
330 | debug_assert!(buf.remaining() >= data.len()); |
331 | buf.put_slice(&data); |
332 | Ready(Ok(())) |
333 | } |
334 | Some(Err(ref e)) if e.kind() == WouldBlock => Pending, |
335 | Some(Err(e)) => Ready(Err(e)), |
336 | None => Ready(Ok(())), |
337 | } |
338 | } |
339 | } |
340 | |