| 1 | #![warn (rust_2018_idioms)] |
| 2 | #![cfg (not(target_os = "wasi" ))] // Wasi doesn't support UDP |
| 3 | |
| 4 | use tokio::net::UdpSocket; |
| 5 | use tokio_stream::StreamExt; |
| 6 | use tokio_util::codec::{Decoder, Encoder, LinesCodec}; |
| 7 | use tokio_util::udp::UdpFramed; |
| 8 | |
| 9 | use bytes::{BufMut, BytesMut}; |
| 10 | use futures::future::try_join; |
| 11 | use futures::future::FutureExt; |
| 12 | use futures::sink::SinkExt; |
| 13 | use std::io; |
| 14 | use std::sync::Arc; |
| 15 | |
| 16 | #[cfg_attr ( |
| 17 | any( |
| 18 | target_os = "macos" , |
| 19 | target_os = "ios" , |
| 20 | target_os = "tvos" , |
| 21 | target_os = "watchos" |
| 22 | ), |
| 23 | allow(unused_assignments) |
| 24 | )] |
| 25 | #[tokio::test ] |
| 26 | async fn send_framed_byte_codec() -> std::io::Result<()> { |
| 27 | let mut a_soc = UdpSocket::bind("127.0.0.1:0" ).await?; |
| 28 | let mut b_soc = UdpSocket::bind("127.0.0.1:0" ).await?; |
| 29 | |
| 30 | let a_addr = a_soc.local_addr()?; |
| 31 | let b_addr = b_soc.local_addr()?; |
| 32 | |
| 33 | // test sending & receiving bytes |
| 34 | { |
| 35 | let mut a = UdpFramed::new(a_soc, ByteCodec); |
| 36 | let mut b = UdpFramed::new(b_soc, ByteCodec); |
| 37 | |
| 38 | let msg = b"4567" ; |
| 39 | |
| 40 | let send = a.send((msg, b_addr)); |
| 41 | let recv = b.next().map(|e| e.unwrap()); |
| 42 | let (_, received) = try_join(send, recv).await.unwrap(); |
| 43 | |
| 44 | let (data, addr) = received; |
| 45 | assert_eq!(msg, &*data); |
| 46 | assert_eq!(a_addr, addr); |
| 47 | |
| 48 | a_soc = a.into_inner(); |
| 49 | b_soc = b.into_inner(); |
| 50 | } |
| 51 | |
| 52 | #[cfg (not(any( |
| 53 | target_os = "macos" , |
| 54 | target_os = "ios" , |
| 55 | target_os = "tvos" , |
| 56 | target_os = "watchos" |
| 57 | )))] |
| 58 | // test sending & receiving an empty message |
| 59 | { |
| 60 | let mut a = UdpFramed::new(a_soc, ByteCodec); |
| 61 | let mut b = UdpFramed::new(b_soc, ByteCodec); |
| 62 | |
| 63 | let msg = b"" ; |
| 64 | |
| 65 | let send = a.send((msg, b_addr)); |
| 66 | let recv = b.next().map(|e| e.unwrap()); |
| 67 | let (_, received) = try_join(send, recv).await.unwrap(); |
| 68 | |
| 69 | let (data, addr) = received; |
| 70 | assert_eq!(msg, &*data); |
| 71 | assert_eq!(a_addr, addr); |
| 72 | } |
| 73 | |
| 74 | Ok(()) |
| 75 | } |
| 76 | |
| 77 | pub struct ByteCodec; |
| 78 | |
| 79 | impl Decoder for ByteCodec { |
| 80 | type Item = Vec<u8>; |
| 81 | type Error = io::Error; |
| 82 | |
| 83 | fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> { |
| 84 | let len = buf.len(); |
| 85 | Ok(Some(buf.split_to(len).to_vec())) |
| 86 | } |
| 87 | } |
| 88 | |
| 89 | impl Encoder<&[u8]> for ByteCodec { |
| 90 | type Error = io::Error; |
| 91 | |
| 92 | fn encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> { |
| 93 | buf.reserve(data.len()); |
| 94 | buf.put_slice(data); |
| 95 | Ok(()) |
| 96 | } |
| 97 | } |
| 98 | |
| 99 | #[tokio::test ] |
| 100 | async fn send_framed_lines_codec() -> std::io::Result<()> { |
| 101 | let a_soc = UdpSocket::bind("127.0.0.1:0" ).await?; |
| 102 | let b_soc = UdpSocket::bind("127.0.0.1:0" ).await?; |
| 103 | |
| 104 | let a_addr = a_soc.local_addr()?; |
| 105 | let b_addr = b_soc.local_addr()?; |
| 106 | |
| 107 | let mut a = UdpFramed::new(a_soc, ByteCodec); |
| 108 | let mut b = UdpFramed::new(b_soc, LinesCodec::new()); |
| 109 | |
| 110 | let msg = b"1 \r\n2 \r\n3 \r\n" .to_vec(); |
| 111 | a.send((&msg, b_addr)).await?; |
| 112 | |
| 113 | assert_eq!(b.next().await.unwrap().unwrap(), ("1" .to_string(), a_addr)); |
| 114 | assert_eq!(b.next().await.unwrap().unwrap(), ("2" .to_string(), a_addr)); |
| 115 | assert_eq!(b.next().await.unwrap().unwrap(), ("3" .to_string(), a_addr)); |
| 116 | |
| 117 | Ok(()) |
| 118 | } |
| 119 | |
| 120 | #[tokio::test ] |
| 121 | async fn framed_half() -> std::io::Result<()> { |
| 122 | let a_soc = Arc::new(UdpSocket::bind("127.0.0.1:0" ).await?); |
| 123 | let b_soc = a_soc.clone(); |
| 124 | |
| 125 | let a_addr = a_soc.local_addr()?; |
| 126 | let b_addr = b_soc.local_addr()?; |
| 127 | |
| 128 | let mut a = UdpFramed::new(a_soc, ByteCodec); |
| 129 | let mut b = UdpFramed::new(b_soc, LinesCodec::new()); |
| 130 | |
| 131 | let msg = b"1 \r\n2 \r\n3 \r\n" .to_vec(); |
| 132 | a.send((&msg, b_addr)).await?; |
| 133 | |
| 134 | let msg = b"4 \r\n5 \r\n6 \r\n" .to_vec(); |
| 135 | a.send((&msg, b_addr)).await?; |
| 136 | |
| 137 | assert_eq!(b.next().await.unwrap().unwrap(), ("1" .to_string(), a_addr)); |
| 138 | assert_eq!(b.next().await.unwrap().unwrap(), ("2" .to_string(), a_addr)); |
| 139 | assert_eq!(b.next().await.unwrap().unwrap(), ("3" .to_string(), a_addr)); |
| 140 | |
| 141 | assert_eq!(b.next().await.unwrap().unwrap(), ("4" .to_string(), a_addr)); |
| 142 | assert_eq!(b.next().await.unwrap().unwrap(), ("5" .to_string(), a_addr)); |
| 143 | assert_eq!(b.next().await.unwrap().unwrap(), ("6" .to_string(), a_addr)); |
| 144 | |
| 145 | Ok(()) |
| 146 | } |
| 147 | |