1 | #![warn (rust_2018_idioms)] |
2 | |
3 | use tokio_stream::StreamExt; |
4 | use tokio_test::assert_ok; |
5 | use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts}; |
6 | |
7 | use bytes::{Buf, BufMut, BytesMut}; |
8 | use std::io::{self, Read}; |
9 | use std::pin::Pin; |
10 | use std::task::{Context, Poll}; |
11 | |
/// Read-buffer capacity, in bytes (8 KiB), that the capacity tests in this
/// file compare against after a round trip through `Framed`.
// NOTE(review): presumably mirrors tokio-util's internal initial buffer
// capacity — confirm the two stay in sync.
const INITIAL_CAPACITY: usize = 8 * 1024;
13 | |
/// Codec for `u32` frames.
///
/// The decoder keeps a running count of consumed bytes so tests can check
/// how much of the read buffer has been decoded.
#[derive(Default)]
struct U32Codec {
    /// Total number of bytes consumed by successful `decode` calls.
    read_bytes: usize,
}
19 | |
20 | impl Decoder for U32Codec { |
21 | type Item = u32; |
22 | type Error = io::Error; |
23 | |
24 | fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { |
25 | if buf.len() < 4 { |
26 | return Ok(None); |
27 | } |
28 | |
29 | let n = buf.split_to(4).get_u32(); |
30 | self.read_bytes += 4; |
31 | Ok(Some(n)) |
32 | } |
33 | } |
34 | |
35 | impl Encoder<u32> for U32Codec { |
36 | type Error = io::Error; |
37 | |
38 | fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { |
39 | // Reserve space |
40 | dst.reserve(4); |
41 | dst.put_u32(item); |
42 | Ok(()) |
43 | } |
44 | } |
45 | |
/// Codec for `u64` frames.
///
/// The decoder keeps a running count of consumed bytes so tests can check
/// how much of the read buffer has been decoded.
#[derive(Default)]
struct U64Codec {
    /// Total number of bytes consumed by successful `decode` calls.
    read_bytes: usize,
}
51 | |
52 | impl Decoder for U64Codec { |
53 | type Item = u64; |
54 | type Error = io::Error; |
55 | |
56 | fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u64>> { |
57 | if buf.len() < 8 { |
58 | return Ok(None); |
59 | } |
60 | |
61 | let n = buf.split_to(8).get_u64(); |
62 | self.read_bytes += 8; |
63 | Ok(Some(n)) |
64 | } |
65 | } |
66 | |
67 | impl Encoder<u64> for U64Codec { |
68 | type Error = io::Error; |
69 | |
70 | fn encode(&mut self, item: u64, dst: &mut BytesMut) -> io::Result<()> { |
71 | // Reserve space |
72 | dst.reserve(8); |
73 | dst.put_u64(item); |
74 | Ok(()) |
75 | } |
76 | } |
77 | |
/// I/O placeholder that must never actually be read from.
///
/// The tests in this file hand it to `FramedParts` together with a pre-filled
/// read buffer, so any read attempt means data was pulled from the wrong
/// place.
struct DontReadIntoThis;

impl Read for DontReadIntoThis {
    /// Always fails with an `ErrorKind::Other` error; the destination buffer
    /// is ignored.
    fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
        let refusal = io::Error::new(
            io::ErrorKind::Other,
            "Read into something you weren't supposed to.",
        );
        Err(refusal)
    }
}
89 | |
90 | impl tokio::io::AsyncRead for DontReadIntoThis { |
91 | fn poll_read( |
92 | self: Pin<&mut Self>, |
93 | _cx: &mut Context<'_>, |
94 | _buf: &mut tokio::io::ReadBuf<'_>, |
95 | ) -> Poll<io::Result<()>> { |
96 | unreachable!() |
97 | } |
98 | } |
99 | |
100 | #[tokio::test ] |
101 | async fn can_read_from_existing_buf() { |
102 | let mut parts = FramedParts::new(DontReadIntoThis, U32Codec::default()); |
103 | parts.read_buf = BytesMut::from(&[0, 0, 0, 42][..]); |
104 | |
105 | let mut framed = Framed::from_parts(parts); |
106 | let num = assert_ok!(framed.next().await.unwrap()); |
107 | |
108 | assert_eq!(num, 42); |
109 | assert_eq!(framed.codec().read_bytes, 4); |
110 | } |
111 | |
112 | #[tokio::test ] |
113 | async fn can_read_from_existing_buf_after_codec_changed() { |
114 | let mut parts = FramedParts::new(DontReadIntoThis, U32Codec::default()); |
115 | parts.read_buf = BytesMut::from(&[0, 0, 0, 42, 0, 0, 0, 0, 0, 0, 0, 84][..]); |
116 | |
117 | let mut framed = Framed::from_parts(parts); |
118 | let num = assert_ok!(framed.next().await.unwrap()); |
119 | |
120 | assert_eq!(num, 42); |
121 | assert_eq!(framed.codec().read_bytes, 4); |
122 | |
123 | let mut framed = framed.map_codec(|codec| U64Codec { |
124 | read_bytes: codec.read_bytes, |
125 | }); |
126 | let num = assert_ok!(framed.next().await.unwrap()); |
127 | |
128 | assert_eq!(num, 84); |
129 | assert_eq!(framed.codec().read_bytes, 12); |
130 | } |
131 | |
/// A caller-supplied read buffer smaller than `INITIAL_CAPACITY` is expected
/// to come back from `into_parts` with capacity exactly `INITIAL_CAPACITY`.
#[test]
fn external_buf_grows_to_init() {
    // Four bytes: far below the expected initial capacity.
    let small: &[u8] = &[0, 0, 0, 42];

    let mut parts = FramedParts::new(DontReadIntoThis, U32Codec::default());
    parts.read_buf = BytesMut::from(small);

    // Round-trip through `Framed` and take the read buffer back out.
    let read_buf = Framed::from_parts(parts).into_parts().read_buf;

    assert_eq!(read_buf.capacity(), INITIAL_CAPACITY);
}
142 | |
/// A caller-supplied read buffer larger than `INITIAL_CAPACITY` is expected
/// to keep its capacity after a round trip through `Framed`.
#[test]
fn external_buf_does_not_shrink() {
    // Twice the expected initial capacity, zero-filled.
    let oversized = vec![0u8; INITIAL_CAPACITY * 2];

    let mut parts = FramedParts::new(DontReadIntoThis, U32Codec::default());
    parts.read_buf = BytesMut::from(oversized.as_slice());

    // Round-trip through `Framed` and take the read buffer back out.
    let read_buf = Framed::from_parts(parts).into_parts().read_buf;

    assert_eq!(read_buf.capacity(), INITIAL_CAPACITY * 2);
}
153 | |