| 1 | //! An example of hooking up stdin/stdout to either a TCP or UDP stream. |
| 2 | //! |
| 3 | //! This example will connect to a socket address specified in the argument list |
| 4 | //! and then forward all data read on stdin to the server, printing out all data |
| 5 | //! received on stdout. An optional `--udp` argument can be passed to specify |
| 6 | //! that the connection should be made over UDP instead of TCP, translating each |
| 7 | //! line entered on stdin to a UDP packet to be sent to the remote address. |
| 8 | //! |
| 9 | //! Note that this is not currently optimized for performance, especially |
| 10 | //! around buffer management. Rather it's intended to show an example of |
| 11 | //! working with a client. |
| 12 | //! |
| 13 | //! This example can be quite useful when interacting with the other examples in |
| 14 | //! this repository! Many of them recommend running this as a simple "hook up |
| 15 | //! stdin/stdout to a server" to get up and running. |
| 16 | |
| 17 | #![warn (rust_2018_idioms)] |
| 18 | |
| 19 | use futures::StreamExt; |
| 20 | use tokio::io; |
| 21 | use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite}; |
| 22 | |
| 23 | use std::env; |
| 24 | use std::error::Error; |
| 25 | use std::net::SocketAddr; |
| 26 | |
| 27 | #[tokio::main] |
| 28 | async fn main() -> Result<(), Box<dyn Error>> { |
| 29 | // Determine if we're going to run in TCP or UDP mode |
| 30 | let mut args = env::args().skip(1).collect::<Vec<_>>(); |
| 31 | let tcp = match args.iter().position(|a| a == "--udp" ) { |
| 32 | Some(i) => { |
| 33 | args.remove(i); |
| 34 | false |
| 35 | } |
| 36 | None => true, |
| 37 | }; |
| 38 | |
| 39 | // Parse what address we're going to connect to |
| 40 | let addr = args |
| 41 | .first() |
| 42 | .ok_or("this program requires at least one argument" )?; |
| 43 | let addr = addr.parse::<SocketAddr>()?; |
| 44 | |
| 45 | let stdin = FramedRead::new(io::stdin(), BytesCodec::new()); |
| 46 | let stdin = stdin.map(|i| i.map(|bytes| bytes.freeze())); |
| 47 | let stdout = FramedWrite::new(io::stdout(), BytesCodec::new()); |
| 48 | |
| 49 | if tcp { |
| 50 | tcp::connect(&addr, stdin, stdout).await?; |
| 51 | } else { |
| 52 | udp::connect(&addr, stdin, stdout).await?; |
| 53 | } |
| 54 | |
| 55 | Ok(()) |
| 56 | } |
| 57 | |
| 58 | mod tcp { |
| 59 | use bytes::Bytes; |
| 60 | use futures::{future, Sink, SinkExt, Stream, StreamExt}; |
| 61 | use std::{error::Error, io, net::SocketAddr}; |
| 62 | use tokio::net::TcpStream; |
| 63 | use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite}; |
| 64 | |
| 65 | pub async fn connect( |
| 66 | addr: &SocketAddr, |
| 67 | mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, |
| 68 | mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin, |
| 69 | ) -> Result<(), Box<dyn Error>> { |
| 70 | let mut stream = TcpStream::connect(addr).await?; |
| 71 | let (r, w) = stream.split(); |
| 72 | let mut sink = FramedWrite::new(w, BytesCodec::new()); |
| 73 | // filter map Result<BytesMut, Error> stream into just a Bytes stream to match stdout Sink |
| 74 | // on the event of an Error, log the error and end the stream |
| 75 | let mut stream = FramedRead::new(r, BytesCodec::new()) |
| 76 | .filter_map(|i| match i { |
| 77 | //BytesMut into Bytes |
| 78 | Ok(i) => future::ready(Some(i.freeze())), |
| 79 | Err(e) => { |
| 80 | println!("failed to read from socket; error={}" , e); |
| 81 | future::ready(None) |
| 82 | } |
| 83 | }) |
| 84 | .map(Ok); |
| 85 | |
| 86 | match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await { |
| 87 | (Err(e), _) | (_, Err(e)) => Err(e.into()), |
| 88 | _ => Ok(()), |
| 89 | } |
| 90 | } |
| 91 | } |
| 92 | |
| 93 | mod udp { |
| 94 | use bytes::Bytes; |
| 95 | use futures::{Sink, SinkExt, Stream, StreamExt}; |
| 96 | use std::error::Error; |
| 97 | use std::io; |
| 98 | use std::net::SocketAddr; |
| 99 | use tokio::net::UdpSocket; |
| 100 | |
| 101 | pub async fn connect( |
| 102 | addr: &SocketAddr, |
| 103 | stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, |
| 104 | stdout: impl Sink<Bytes, Error = io::Error> + Unpin, |
| 105 | ) -> Result<(), Box<dyn Error>> { |
| 106 | // We'll bind our UDP socket to a local IP/port, but for now we |
| 107 | // basically let the OS pick both of those. |
| 108 | let bind_addr = if addr.ip().is_ipv4() { |
| 109 | "0.0.0.0:0" |
| 110 | } else { |
| 111 | "[::]:0" |
| 112 | }; |
| 113 | |
| 114 | let socket = UdpSocket::bind(&bind_addr).await?; |
| 115 | socket.connect(addr).await?; |
| 116 | |
| 117 | tokio::try_join!(send(stdin, &socket), recv(stdout, &socket))?; |
| 118 | |
| 119 | Ok(()) |
| 120 | } |
| 121 | |
| 122 | async fn send( |
| 123 | mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, |
| 124 | writer: &UdpSocket, |
| 125 | ) -> Result<(), io::Error> { |
| 126 | while let Some(item) = stdin.next().await { |
| 127 | let buf = item?; |
| 128 | writer.send(&buf[..]).await?; |
| 129 | } |
| 130 | |
| 131 | Ok(()) |
| 132 | } |
| 133 | |
| 134 | async fn recv( |
| 135 | mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin, |
| 136 | reader: &UdpSocket, |
| 137 | ) -> Result<(), io::Error> { |
| 138 | loop { |
| 139 | let mut buf = vec![0; 1024]; |
| 140 | let n = reader.recv(&mut buf[..]).await?; |
| 141 | |
| 142 | if n > 0 { |
| 143 | stdout.send(Bytes::from(buf)).await?; |
| 144 | } |
| 145 | } |
| 146 | } |
| 147 | } |
| 148 | |