| 1 | #![warn (rust_2018_idioms)] |
| 2 | |
| 3 | use tokio::io::{AsyncRead, ReadBuf}; |
| 4 | use tokio_test::assert_ready; |
| 5 | use tokio_test::task; |
| 6 | use tokio_util::codec::{Decoder, FramedRead}; |
| 7 | |
| 8 | use bytes::{Buf, BytesMut}; |
| 9 | use futures::Stream; |
| 10 | use std::collections::VecDeque; |
| 11 | use std::io; |
| 12 | use std::pin::Pin; |
| 13 | use std::task::Poll::{Pending, Ready}; |
| 14 | use std::task::{Context, Poll}; |
| 15 | |
| 16 | macro_rules! mock { |
| 17 | ($($x:expr,)*) => {{ |
| 18 | let mut v = VecDeque::new(); |
| 19 | v.extend(vec![$($x),*]); |
| 20 | Mock { calls: v } |
| 21 | }}; |
| 22 | } |
| 23 | |
| 24 | macro_rules! assert_read { |
| 25 | ($e:expr, $n:expr) => {{ |
| 26 | let val = assert_ready!($e); |
| 27 | assert_eq!(val.unwrap().unwrap(), $n); |
| 28 | }}; |
| 29 | } |
| 30 | |
| 31 | macro_rules! pin { |
| 32 | ($id:ident) => { |
| 33 | Pin::new(&mut $id) |
| 34 | }; |
| 35 | } |
| 36 | |
| 37 | struct U32Decoder; |
| 38 | |
| 39 | impl Decoder for U32Decoder { |
| 40 | type Item = u32; |
| 41 | type Error = io::Error; |
| 42 | |
| 43 | fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { |
| 44 | if buf.len() < 4 { |
| 45 | return Ok(None); |
| 46 | } |
| 47 | |
| 48 | let n = buf.split_to(4).get_u32(); |
| 49 | Ok(Some(n)) |
| 50 | } |
| 51 | } |
| 52 | |
| 53 | struct U64Decoder; |
| 54 | |
| 55 | impl Decoder for U64Decoder { |
| 56 | type Item = u64; |
| 57 | type Error = io::Error; |
| 58 | |
| 59 | fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>> { |
| 60 | if buf.len() < 8 { |
| 61 | return Ok(None); |
| 62 | } |
| 63 | |
| 64 | let n = buf.split_to(8).get_u64(); |
| 65 | Ok(Some(n)) |
| 66 | } |
| 67 | } |
| 68 | |
| 69 | #[test] |
| 70 | fn read_multi_frame_in_packet() { |
| 71 | let mut task = task::spawn(()); |
| 72 | let mock = mock! { |
| 73 | Ok(b" \x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02" .to_vec()), |
| 74 | }; |
| 75 | let mut framed = FramedRead::new(mock, U32Decoder); |
| 76 | |
| 77 | task.enter(|cx, _| { |
| 78 | assert_read!(pin!(framed).poll_next(cx), 0); |
| 79 | assert_read!(pin!(framed).poll_next(cx), 1); |
| 80 | assert_read!(pin!(framed).poll_next(cx), 2); |
| 81 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 82 | }); |
| 83 | } |
| 84 | |
| 85 | #[test] |
| 86 | fn read_multi_frame_across_packets() { |
| 87 | let mut task = task::spawn(()); |
| 88 | let mock = mock! { |
| 89 | Ok(b" \x00\x00\x00\x00" .to_vec()), |
| 90 | Ok(b" \x00\x00\x00\x01" .to_vec()), |
| 91 | Ok(b" \x00\x00\x00\x02" .to_vec()), |
| 92 | }; |
| 93 | let mut framed = FramedRead::new(mock, U32Decoder); |
| 94 | |
| 95 | task.enter(|cx, _| { |
| 96 | assert_read!(pin!(framed).poll_next(cx), 0); |
| 97 | assert_read!(pin!(framed).poll_next(cx), 1); |
| 98 | assert_read!(pin!(framed).poll_next(cx), 2); |
| 99 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 100 | }); |
| 101 | } |
| 102 | |
| 103 | #[test] |
| 104 | fn read_multi_frame_in_packet_after_codec_changed() { |
| 105 | let mut task = task::spawn(()); |
| 106 | let mock = mock! { |
| 107 | Ok(b" \x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x08" .to_vec()), |
| 108 | }; |
| 109 | let mut framed = FramedRead::new(mock, U32Decoder); |
| 110 | |
| 111 | task.enter(|cx, _| { |
| 112 | assert_read!(pin!(framed).poll_next(cx), 0x04); |
| 113 | |
| 114 | let mut framed = framed.map_decoder(|_| U64Decoder); |
| 115 | assert_read!(pin!(framed).poll_next(cx), 0x08); |
| 116 | |
| 117 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 118 | }); |
| 119 | } |
| 120 | |
| 121 | #[test] |
| 122 | fn read_not_ready() { |
| 123 | let mut task = task::spawn(()); |
| 124 | let mock = mock! { |
| 125 | Err(io::Error::new(io::ErrorKind::WouldBlock, "" )), |
| 126 | Ok(b" \x00\x00\x00\x00" .to_vec()), |
| 127 | Ok(b" \x00\x00\x00\x01" .to_vec()), |
| 128 | }; |
| 129 | let mut framed = FramedRead::new(mock, U32Decoder); |
| 130 | |
| 131 | task.enter(|cx, _| { |
| 132 | assert!(pin!(framed).poll_next(cx).is_pending()); |
| 133 | assert_read!(pin!(framed).poll_next(cx), 0); |
| 134 | assert_read!(pin!(framed).poll_next(cx), 1); |
| 135 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 136 | }); |
| 137 | } |
| 138 | |
| 139 | #[test] |
| 140 | fn read_partial_then_not_ready() { |
| 141 | let mut task = task::spawn(()); |
| 142 | let mock = mock! { |
| 143 | Ok(b" \x00\x00" .to_vec()), |
| 144 | Err(io::Error::new(io::ErrorKind::WouldBlock, "" )), |
| 145 | Ok(b" \x00\x00\x00\x00\x00\x01\x00\x00\x00\x02" .to_vec()), |
| 146 | }; |
| 147 | let mut framed = FramedRead::new(mock, U32Decoder); |
| 148 | |
| 149 | task.enter(|cx, _| { |
| 150 | assert!(pin!(framed).poll_next(cx).is_pending()); |
| 151 | assert_read!(pin!(framed).poll_next(cx), 0); |
| 152 | assert_read!(pin!(framed).poll_next(cx), 1); |
| 153 | assert_read!(pin!(framed).poll_next(cx), 2); |
| 154 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 155 | }); |
| 156 | } |
| 157 | |
| 158 | #[test] |
| 159 | fn read_err() { |
| 160 | let mut task = task::spawn(()); |
| 161 | let mock = mock! { |
| 162 | Err(io::Error::new(io::ErrorKind::Other, "" )), |
| 163 | }; |
| 164 | let mut framed = FramedRead::new(mock, U32Decoder); |
| 165 | |
| 166 | task.enter(|cx, _| { |
| 167 | assert_eq!( |
| 168 | io::ErrorKind::Other, |
| 169 | assert_ready!(pin!(framed).poll_next(cx)) |
| 170 | .unwrap() |
| 171 | .unwrap_err() |
| 172 | .kind() |
| 173 | ) |
| 174 | }); |
| 175 | } |
| 176 | |
| 177 | #[test] |
| 178 | fn read_partial_then_err() { |
| 179 | let mut task = task::spawn(()); |
| 180 | let mock = mock! { |
| 181 | Ok(b" \x00\x00" .to_vec()), |
| 182 | Err(io::Error::new(io::ErrorKind::Other, "" )), |
| 183 | }; |
| 184 | let mut framed = FramedRead::new(mock, U32Decoder); |
| 185 | |
| 186 | task.enter(|cx, _| { |
| 187 | assert_eq!( |
| 188 | io::ErrorKind::Other, |
| 189 | assert_ready!(pin!(framed).poll_next(cx)) |
| 190 | .unwrap() |
| 191 | .unwrap_err() |
| 192 | .kind() |
| 193 | ) |
| 194 | }); |
| 195 | } |
| 196 | |
| 197 | #[test] |
| 198 | fn read_partial_would_block_then_err() { |
| 199 | let mut task = task::spawn(()); |
| 200 | let mock = mock! { |
| 201 | Ok(b" \x00\x00" .to_vec()), |
| 202 | Err(io::Error::new(io::ErrorKind::WouldBlock, "" )), |
| 203 | Err(io::Error::new(io::ErrorKind::Other, "" )), |
| 204 | }; |
| 205 | let mut framed = FramedRead::new(mock, U32Decoder); |
| 206 | |
| 207 | task.enter(|cx, _| { |
| 208 | assert!(pin!(framed).poll_next(cx).is_pending()); |
| 209 | assert_eq!( |
| 210 | io::ErrorKind::Other, |
| 211 | assert_ready!(pin!(framed).poll_next(cx)) |
| 212 | .unwrap() |
| 213 | .unwrap_err() |
| 214 | .kind() |
| 215 | ) |
| 216 | }); |
| 217 | } |
| 218 | |
| 219 | #[test] |
| 220 | fn huge_size() { |
| 221 | let mut task = task::spawn(()); |
| 222 | let data = &[0; 32 * 1024][..]; |
| 223 | let mut framed = FramedRead::new(data, BigDecoder); |
| 224 | |
| 225 | task.enter(|cx, _| { |
| 226 | assert_read!(pin!(framed).poll_next(cx), 0); |
| 227 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 228 | }); |
| 229 | |
| 230 | struct BigDecoder; |
| 231 | |
| 232 | impl Decoder for BigDecoder { |
| 233 | type Item = u32; |
| 234 | type Error = io::Error; |
| 235 | |
| 236 | fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { |
| 237 | if buf.len() < 32 * 1024 { |
| 238 | return Ok(None); |
| 239 | } |
| 240 | buf.advance(32 * 1024); |
| 241 | Ok(Some(0)) |
| 242 | } |
| 243 | } |
| 244 | } |
| 245 | |
| 246 | #[test] |
| 247 | fn data_remaining_is_error() { |
| 248 | let mut task = task::spawn(()); |
| 249 | let slice = &[0; 5][..]; |
| 250 | let mut framed = FramedRead::new(slice, U32Decoder); |
| 251 | |
| 252 | task.enter(|cx, _| { |
| 253 | assert_read!(pin!(framed).poll_next(cx), 0); |
| 254 | assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err()); |
| 255 | }); |
| 256 | } |
| 257 | |
| 258 | #[test] |
| 259 | fn multi_frames_on_eof() { |
| 260 | let mut task = task::spawn(()); |
| 261 | struct MyDecoder(Vec<u32>); |
| 262 | |
| 263 | impl Decoder for MyDecoder { |
| 264 | type Item = u32; |
| 265 | type Error = io::Error; |
| 266 | |
| 267 | fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> { |
| 268 | unreachable!(); |
| 269 | } |
| 270 | |
| 271 | fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> { |
| 272 | if self.0.is_empty() { |
| 273 | return Ok(None); |
| 274 | } |
| 275 | |
| 276 | Ok(Some(self.0.remove(0))) |
| 277 | } |
| 278 | } |
| 279 | |
| 280 | let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3])); |
| 281 | |
| 282 | task.enter(|cx, _| { |
| 283 | assert_read!(pin!(framed).poll_next(cx), 0); |
| 284 | assert_read!(pin!(framed).poll_next(cx), 1); |
| 285 | assert_read!(pin!(framed).poll_next(cx), 2); |
| 286 | assert_read!(pin!(framed).poll_next(cx), 3); |
| 287 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 288 | }); |
| 289 | } |
| 290 | |
| 291 | #[test] |
| 292 | fn read_eof_then_resume() { |
| 293 | let mut task = task::spawn(()); |
| 294 | let mock = mock! { |
| 295 | Ok(b" \x00\x00\x00\x01" .to_vec()), |
| 296 | Ok(b"" .to_vec()), |
| 297 | Ok(b" \x00\x00\x00\x02" .to_vec()), |
| 298 | Ok(b"" .to_vec()), |
| 299 | Ok(b" \x00\x00\x00\x03" .to_vec()), |
| 300 | }; |
| 301 | let mut framed = FramedRead::new(mock, U32Decoder); |
| 302 | |
| 303 | task.enter(|cx, _| { |
| 304 | assert_read!(pin!(framed).poll_next(cx), 1); |
| 305 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 306 | assert_read!(pin!(framed).poll_next(cx), 2); |
| 307 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 308 | assert_read!(pin!(framed).poll_next(cx), 3); |
| 309 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 310 | assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none()); |
| 311 | }); |
| 312 | } |
| 313 | |
| 314 | // ===== Mock ====== |
| 315 | |
| 316 | struct Mock { |
| 317 | calls: VecDeque<io::Result<Vec<u8>>>, |
| 318 | } |
| 319 | |
| 320 | impl AsyncRead for Mock { |
| 321 | fn poll_read( |
| 322 | mut self: Pin<&mut Self>, |
| 323 | _cx: &mut Context<'_>, |
| 324 | buf: &mut ReadBuf<'_>, |
| 325 | ) -> Poll<io::Result<()>> { |
| 326 | use io::ErrorKind::WouldBlock; |
| 327 | |
| 328 | match self.calls.pop_front() { |
| 329 | Some(Ok(data)) => { |
| 330 | debug_assert!(buf.remaining() >= data.len()); |
| 331 | buf.put_slice(&data); |
| 332 | Ready(Ok(())) |
| 333 | } |
| 334 | Some(Err(ref e)) if e.kind() == WouldBlock => Pending, |
| 335 | Some(Err(e)) => Ready(Err(e)), |
| 336 | None => Ready(Ok(())), |
| 337 | } |
| 338 | } |
| 339 | } |
| 340 | |