1 | //! An example of hooking up stdin/stdout to either a TCP or UDP stream. |
2 | //! |
3 | //! This example will connect to a socket address specified in the argument list |
4 | //! and then forward all data read on stdin to the server, printing out all data |
5 | //! received on stdout. An optional `--udp` argument can be passed to specify |
6 | //! that the connection should be made over UDP instead of TCP, translating each |
7 | //! line entered on stdin to a UDP packet to be sent to the remote address. |
8 | //! |
9 | //! Note that this is not currently optimized for performance, especially |
10 | //! around buffer management. Rather it's intended to show an example of |
11 | //! working with a client. |
12 | //! |
13 | //! This example can be quite useful when interacting with the other examples in |
14 | //! this repository! Many of them recommend running this as a simple "hook up |
15 | //! stdin/stdout to a server" to get up and running. |
16 | |
17 | #![warn (rust_2018_idioms)] |
18 | |
19 | use futures::StreamExt; |
20 | use tokio::io; |
21 | use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite}; |
22 | |
23 | use std::env; |
24 | use std::error::Error; |
25 | use std::net::SocketAddr; |
26 | |
27 | #[tokio::main] |
28 | async fn main() -> Result<(), Box<dyn Error>> { |
29 | // Determine if we're going to run in TCP or UDP mode |
30 | let mut args = env::args().skip(1).collect::<Vec<_>>(); |
31 | let tcp = match args.iter().position(|a| a == "--udp" ) { |
32 | Some(i) => { |
33 | args.remove(i); |
34 | false |
35 | } |
36 | None => true, |
37 | }; |
38 | |
39 | // Parse what address we're going to connect to |
40 | let addr = args |
41 | .first() |
42 | .ok_or("this program requires at least one argument" )?; |
43 | let addr = addr.parse::<SocketAddr>()?; |
44 | |
45 | let stdin = FramedRead::new(io::stdin(), BytesCodec::new()); |
46 | let stdin = stdin.map(|i| i.map(|bytes| bytes.freeze())); |
47 | let stdout = FramedWrite::new(io::stdout(), BytesCodec::new()); |
48 | |
49 | if tcp { |
50 | tcp::connect(&addr, stdin, stdout).await?; |
51 | } else { |
52 | udp::connect(&addr, stdin, stdout).await?; |
53 | } |
54 | |
55 | Ok(()) |
56 | } |
57 | |
58 | mod tcp { |
59 | use bytes::Bytes; |
60 | use futures::{future, Sink, SinkExt, Stream, StreamExt}; |
61 | use std::{error::Error, io, net::SocketAddr}; |
62 | use tokio::net::TcpStream; |
63 | use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite}; |
64 | |
65 | pub async fn connect( |
66 | addr: &SocketAddr, |
67 | mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, |
68 | mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin, |
69 | ) -> Result<(), Box<dyn Error>> { |
70 | let mut stream = TcpStream::connect(addr).await?; |
71 | let (r, w) = stream.split(); |
72 | let mut sink = FramedWrite::new(w, BytesCodec::new()); |
73 | // filter map Result<BytesMut, Error> stream into just a Bytes stream to match stdout Sink |
74 | // on the event of an Error, log the error and end the stream |
75 | let mut stream = FramedRead::new(r, BytesCodec::new()) |
76 | .filter_map(|i| match i { |
77 | //BytesMut into Bytes |
78 | Ok(i) => future::ready(Some(i.freeze())), |
79 | Err(e) => { |
80 | println!("failed to read from socket; error={}" , e); |
81 | future::ready(None) |
82 | } |
83 | }) |
84 | .map(Ok); |
85 | |
86 | match future::join(sink.send_all(&mut stdin), stdout.send_all(&mut stream)).await { |
87 | (Err(e), _) | (_, Err(e)) => Err(e.into()), |
88 | _ => Ok(()), |
89 | } |
90 | } |
91 | } |
92 | |
93 | mod udp { |
94 | use bytes::Bytes; |
95 | use futures::{Sink, SinkExt, Stream, StreamExt}; |
96 | use std::error::Error; |
97 | use std::io; |
98 | use std::net::SocketAddr; |
99 | use tokio::net::UdpSocket; |
100 | |
101 | pub async fn connect( |
102 | addr: &SocketAddr, |
103 | stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, |
104 | stdout: impl Sink<Bytes, Error = io::Error> + Unpin, |
105 | ) -> Result<(), Box<dyn Error>> { |
106 | // We'll bind our UDP socket to a local IP/port, but for now we |
107 | // basically let the OS pick both of those. |
108 | let bind_addr = if addr.ip().is_ipv4() { |
109 | "0.0.0.0:0" |
110 | } else { |
111 | "[::]:0" |
112 | }; |
113 | |
114 | let socket = UdpSocket::bind(&bind_addr).await?; |
115 | socket.connect(addr).await?; |
116 | |
117 | tokio::try_join!(send(stdin, &socket), recv(stdout, &socket))?; |
118 | |
119 | Ok(()) |
120 | } |
121 | |
122 | async fn send( |
123 | mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, |
124 | writer: &UdpSocket, |
125 | ) -> Result<(), io::Error> { |
126 | while let Some(item) = stdin.next().await { |
127 | let buf = item?; |
128 | writer.send(&buf[..]).await?; |
129 | } |
130 | |
131 | Ok(()) |
132 | } |
133 | |
134 | async fn recv( |
135 | mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin, |
136 | reader: &UdpSocket, |
137 | ) -> Result<(), io::Error> { |
138 | loop { |
139 | let mut buf = vec![0; 1024]; |
140 | let n = reader.recv(&mut buf[..]).await?; |
141 | |
142 | if n > 0 { |
143 | stdout.send(Bytes::from(buf)).await?; |
144 | } |
145 | } |
146 | } |
147 | } |
148 | |