//! This example leverages `BytesCodec` to create a UDP client and server which
//! speak a custom protocol.
//!
//! Here we're using the codec from `tokio-util` to convert a UDP socket into a
//! stream of client messages. Each message is processed and sent back as a new
//! message with a new destination. Overall, we use this to construct a
//! "ping pong" pair in which two sockets send messages back and forth.
8
9#![warn(rust_2018_idioms)]
10
11use tokio::net::UdpSocket;
12use tokio::{io, time};
13use tokio_stream::StreamExt;
14use tokio_util::codec::BytesCodec;
15use tokio_util::udp::UdpFramed;
16
17use bytes::Bytes;
18use futures::{FutureExt, SinkExt};
19use std::env;
20use std::error::Error;
21use std::net::SocketAddr;
22use std::time::Duration;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn Error>> {
26 let addr = env::args()
27 .nth(1)
28 .unwrap_or_else(|| "127.0.0.1:0".to_string());
29
30 // Bind both our sockets and then figure out what ports we got.
31 let a = UdpSocket::bind(&addr).await?;
32 let b = UdpSocket::bind(&addr).await?;
33
34 let b_addr = b.local_addr()?;
35
36 let mut a = UdpFramed::new(a, BytesCodec::new());
37 let mut b = UdpFramed::new(b, BytesCodec::new());
38
39 // Start off by sending a ping from a to b, afterwards we just print out
40 // what they send us and continually send pings
41 let a = ping(&mut a, b_addr);
42
43 // The second client we have will receive the pings from `a` and then send
44 // back pongs.
45 let b = pong(&mut b);
46
47 // Run both futures simultaneously of `a` and `b` sending messages back and forth.
48 match tokio::try_join!(a, b) {
49 Err(e) => println!("an error occurred; error = {:?}", e),
50 _ => println!("done!"),
51 }
52
53 Ok(())
54}
55
56async fn ping(socket: &mut UdpFramed<BytesCodec>, b_addr: SocketAddr) -> Result<(), io::Error> {
57 socket.send((Bytes::from(&b"PING"[..]), b_addr)).await?;
58
59 for _ in 0..4usize {
60 let (bytes, addr) = socket.next().map(|e| e.unwrap()).await?;
61
62 println!("[a] recv: {}", String::from_utf8_lossy(&bytes));
63
64 socket.send((Bytes::from(&b"PING"[..]), addr)).await?;
65 }
66
67 Ok(())
68}
69
70async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> {
71 let timeout = Duration::from_millis(200);
72
73 while let Ok(Some(Ok((bytes, addr)))) = time::timeout(timeout, socket.next()).await {
74 println!("[b] recv: {}", String::from_utf8_lossy(&bytes));
75
76 socket.send((Bytes::from(&b"PONG"[..]), addr)).await?;
77 }
78
79 Ok(())
80}
81