1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (not(target_os = "wasi" ))] // Wasi doesn't support UDP |
3 | |
4 | use tokio::net::UdpSocket; |
5 | use tokio_stream::StreamExt; |
6 | use tokio_util::codec::{Decoder, Encoder, LinesCodec}; |
7 | use tokio_util::udp::UdpFramed; |
8 | |
9 | use bytes::{BufMut, BytesMut}; |
10 | use futures::future::try_join; |
11 | use futures::future::FutureExt; |
12 | use futures::sink::SinkExt; |
13 | use std::io; |
14 | use std::sync::Arc; |
15 | |
16 | #[cfg_attr ( |
17 | any( |
18 | target_os = "macos" , |
19 | target_os = "ios" , |
20 | target_os = "tvos" , |
21 | target_os = "watchos" |
22 | ), |
23 | allow(unused_assignments) |
24 | )] |
25 | #[tokio::test ] |
26 | async fn send_framed_byte_codec() -> std::io::Result<()> { |
27 | let mut a_soc = UdpSocket::bind("127.0.0.1:0" ).await?; |
28 | let mut b_soc = UdpSocket::bind("127.0.0.1:0" ).await?; |
29 | |
30 | let a_addr = a_soc.local_addr()?; |
31 | let b_addr = b_soc.local_addr()?; |
32 | |
33 | // test sending & receiving bytes |
34 | { |
35 | let mut a = UdpFramed::new(a_soc, ByteCodec); |
36 | let mut b = UdpFramed::new(b_soc, ByteCodec); |
37 | |
38 | let msg = b"4567" ; |
39 | |
40 | let send = a.send((msg, b_addr)); |
41 | let recv = b.next().map(|e| e.unwrap()); |
42 | let (_, received) = try_join(send, recv).await.unwrap(); |
43 | |
44 | let (data, addr) = received; |
45 | assert_eq!(msg, &*data); |
46 | assert_eq!(a_addr, addr); |
47 | |
48 | a_soc = a.into_inner(); |
49 | b_soc = b.into_inner(); |
50 | } |
51 | |
52 | #[cfg (not(any( |
53 | target_os = "macos" , |
54 | target_os = "ios" , |
55 | target_os = "tvos" , |
56 | target_os = "watchos" |
57 | )))] |
58 | // test sending & receiving an empty message |
59 | { |
60 | let mut a = UdpFramed::new(a_soc, ByteCodec); |
61 | let mut b = UdpFramed::new(b_soc, ByteCodec); |
62 | |
63 | let msg = b"" ; |
64 | |
65 | let send = a.send((msg, b_addr)); |
66 | let recv = b.next().map(|e| e.unwrap()); |
67 | let (_, received) = try_join(send, recv).await.unwrap(); |
68 | |
69 | let (data, addr) = received; |
70 | assert_eq!(msg, &*data); |
71 | assert_eq!(a_addr, addr); |
72 | } |
73 | |
74 | Ok(()) |
75 | } |
76 | |
77 | pub struct ByteCodec; |
78 | |
79 | impl Decoder for ByteCodec { |
80 | type Item = Vec<u8>; |
81 | type Error = io::Error; |
82 | |
83 | fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> { |
84 | let len = buf.len(); |
85 | Ok(Some(buf.split_to(len).to_vec())) |
86 | } |
87 | } |
88 | |
89 | impl Encoder<&[u8]> for ByteCodec { |
90 | type Error = io::Error; |
91 | |
92 | fn encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> { |
93 | buf.reserve(data.len()); |
94 | buf.put_slice(data); |
95 | Ok(()) |
96 | } |
97 | } |
98 | |
99 | #[tokio::test ] |
100 | async fn send_framed_lines_codec() -> std::io::Result<()> { |
101 | let a_soc = UdpSocket::bind("127.0.0.1:0" ).await?; |
102 | let b_soc = UdpSocket::bind("127.0.0.1:0" ).await?; |
103 | |
104 | let a_addr = a_soc.local_addr()?; |
105 | let b_addr = b_soc.local_addr()?; |
106 | |
107 | let mut a = UdpFramed::new(a_soc, ByteCodec); |
108 | let mut b = UdpFramed::new(b_soc, LinesCodec::new()); |
109 | |
110 | let msg = b"1 \r\n2 \r\n3 \r\n" .to_vec(); |
111 | a.send((&msg, b_addr)).await?; |
112 | |
113 | assert_eq!(b.next().await.unwrap().unwrap(), ("1" .to_string(), a_addr)); |
114 | assert_eq!(b.next().await.unwrap().unwrap(), ("2" .to_string(), a_addr)); |
115 | assert_eq!(b.next().await.unwrap().unwrap(), ("3" .to_string(), a_addr)); |
116 | |
117 | Ok(()) |
118 | } |
119 | |
120 | #[tokio::test ] |
121 | async fn framed_half() -> std::io::Result<()> { |
122 | let a_soc = Arc::new(UdpSocket::bind("127.0.0.1:0" ).await?); |
123 | let b_soc = a_soc.clone(); |
124 | |
125 | let a_addr = a_soc.local_addr()?; |
126 | let b_addr = b_soc.local_addr()?; |
127 | |
128 | let mut a = UdpFramed::new(a_soc, ByteCodec); |
129 | let mut b = UdpFramed::new(b_soc, LinesCodec::new()); |
130 | |
131 | let msg = b"1 \r\n2 \r\n3 \r\n" .to_vec(); |
132 | a.send((&msg, b_addr)).await?; |
133 | |
134 | let msg = b"4 \r\n5 \r\n6 \r\n" .to_vec(); |
135 | a.send((&msg, b_addr)).await?; |
136 | |
137 | assert_eq!(b.next().await.unwrap().unwrap(), ("1" .to_string(), a_addr)); |
138 | assert_eq!(b.next().await.unwrap().unwrap(), ("2" .to_string(), a_addr)); |
139 | assert_eq!(b.next().await.unwrap().unwrap(), ("3" .to_string(), a_addr)); |
140 | |
141 | assert_eq!(b.next().await.unwrap().unwrap(), ("4" .to_string(), a_addr)); |
142 | assert_eq!(b.next().await.unwrap().unwrap(), ("5" .to_string(), a_addr)); |
143 | assert_eq!(b.next().await.unwrap().unwrap(), ("6" .to_string(), a_addr)); |
144 | |
145 | Ok(()) |
146 | } |
147 | |