1#![warn(rust_2018_idioms)]
2#![cfg(not(target_os = "wasi"))] // Wasi doesn't support UDP
3
4use tokio::net::UdpSocket;
5use tokio_stream::StreamExt;
6use tokio_util::codec::{Decoder, Encoder, LinesCodec};
7use tokio_util::udp::UdpFramed;
8
9use bytes::{BufMut, BytesMut};
10use futures::future::try_join;
11use futures::future::FutureExt;
12use futures::sink::SinkExt;
13use std::io;
14use std::sync::Arc;
15
16#[cfg_attr(
17 any(
18 target_os = "macos",
19 target_os = "ios",
20 target_os = "tvos",
21 target_os = "watchos"
22 ),
23 allow(unused_assignments)
24)]
25#[tokio::test]
26async fn send_framed_byte_codec() -> std::io::Result<()> {
27 let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
28 let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?;
29
30 let a_addr = a_soc.local_addr()?;
31 let b_addr = b_soc.local_addr()?;
32
33 // test sending & receiving bytes
34 {
35 let mut a = UdpFramed::new(a_soc, ByteCodec);
36 let mut b = UdpFramed::new(b_soc, ByteCodec);
37
38 let msg = b"4567";
39
40 let send = a.send((msg, b_addr));
41 let recv = b.next().map(|e| e.unwrap());
42 let (_, received) = try_join(send, recv).await.unwrap();
43
44 let (data, addr) = received;
45 assert_eq!(msg, &*data);
46 assert_eq!(a_addr, addr);
47
48 a_soc = a.into_inner();
49 b_soc = b.into_inner();
50 }
51
52 #[cfg(not(any(
53 target_os = "macos",
54 target_os = "ios",
55 target_os = "tvos",
56 target_os = "watchos"
57 )))]
58 // test sending & receiving an empty message
59 {
60 let mut a = UdpFramed::new(a_soc, ByteCodec);
61 let mut b = UdpFramed::new(b_soc, ByteCodec);
62
63 let msg = b"";
64
65 let send = a.send((msg, b_addr));
66 let recv = b.next().map(|e| e.unwrap());
67 let (_, received) = try_join(send, recv).await.unwrap();
68
69 let (data, addr) = received;
70 assert_eq!(msg, &*data);
71 assert_eq!(a_addr, addr);
72 }
73
74 Ok(())
75}
76
/// Stateless codec used by the tests in this file to move raw byte payloads
/// through `UdpFramed`; its `Decoder` and `Encoder<&[u8]>` impls follow below.
pub struct ByteCodec;
78
79impl Decoder for ByteCodec {
80 type Item = Vec<u8>;
81 type Error = io::Error;
82
83 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
84 let len = buf.len();
85 Ok(Some(buf.split_to(len).to_vec()))
86 }
87}
88
89impl Encoder<&[u8]> for ByteCodec {
90 type Error = io::Error;
91
92 fn encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> {
93 buf.reserve(data.len());
94 buf.put_slice(data);
95 Ok(())
96 }
97}
98
99#[tokio::test]
100async fn send_framed_lines_codec() -> std::io::Result<()> {
101 let a_soc = UdpSocket::bind("127.0.0.1:0").await?;
102 let b_soc = UdpSocket::bind("127.0.0.1:0").await?;
103
104 let a_addr = a_soc.local_addr()?;
105 let b_addr = b_soc.local_addr()?;
106
107 let mut a = UdpFramed::new(a_soc, ByteCodec);
108 let mut b = UdpFramed::new(b_soc, LinesCodec::new());
109
110 let msg = b"1\r\n2\r\n3\r\n".to_vec();
111 a.send((&msg, b_addr)).await?;
112
113 assert_eq!(b.next().await.unwrap().unwrap(), ("1".to_string(), a_addr));
114 assert_eq!(b.next().await.unwrap().unwrap(), ("2".to_string(), a_addr));
115 assert_eq!(b.next().await.unwrap().unwrap(), ("3".to_string(), a_addr));
116
117 Ok(())
118}
119
120#[tokio::test]
121async fn framed_half() -> std::io::Result<()> {
122 let a_soc = Arc::new(UdpSocket::bind("127.0.0.1:0").await?);
123 let b_soc = a_soc.clone();
124
125 let a_addr = a_soc.local_addr()?;
126 let b_addr = b_soc.local_addr()?;
127
128 let mut a = UdpFramed::new(a_soc, ByteCodec);
129 let mut b = UdpFramed::new(b_soc, LinesCodec::new());
130
131 let msg = b"1\r\n2\r\n3\r\n".to_vec();
132 a.send((&msg, b_addr)).await?;
133
134 let msg = b"4\r\n5\r\n6\r\n".to_vec();
135 a.send((&msg, b_addr)).await?;
136
137 assert_eq!(b.next().await.unwrap().unwrap(), ("1".to_string(), a_addr));
138 assert_eq!(b.next().await.unwrap().unwrap(), ("2".to_string(), a_addr));
139 assert_eq!(b.next().await.unwrap().unwrap(), ("3".to_string(), a_addr));
140
141 assert_eq!(b.next().await.unwrap().unwrap(), ("4".to_string(), a_addr));
142 assert_eq!(b.next().await.unwrap().unwrap(), ("5".to_string(), a_addr));
143 assert_eq!(b.next().await.unwrap().unwrap(), ("6".to_string(), a_addr));
144
145 Ok(())
146}
147