1//! A chat server that broadcasts a message to all connections.
2//!
3//! This example is explicitly more verbose than it has to be. This is to
4//! illustrate more concepts.
5//!
6//! A chat server for telnet clients. After a telnet client connects, the first
7//! line should contain the client's name. After that, all lines sent by a
8//! client are broadcasted to all other connected clients.
9//!
10//! Because the client is telnet, lines are delimited by "\r\n".
11//!
12//! You can test this out by running:
13//!
14//! cargo run --example chat
15//!
16//! And then in another terminal run:
17//!
18//! telnet localhost 6142
19//!
20//! You can run the `telnet` command in any number of additional windows.
21//!
22//! You can run the second command in multiple windows and then chat between the
23//! two, seeing the messages from the other client as they're received. For all
24//! connected clients they'll all join the same room and see everyone else's
25//! messages.
26
27#![warn(rust_2018_idioms)]
28
29use tokio::net::{TcpListener, TcpStream};
30use tokio::sync::{mpsc, Mutex};
31use tokio_stream::StreamExt;
32use tokio_util::codec::{Framed, LinesCodec};
33
34use futures::SinkExt;
35use std::collections::HashMap;
36use std::env;
37use std::error::Error;
38use std::io;
39use std::net::SocketAddr;
40use std::sync::Arc;
41
42#[tokio::main]
43async fn main() -> Result<(), Box<dyn Error>> {
44 use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
45 // Configure a `tracing` subscriber that logs traces emitted by the chat
46 // server.
47 tracing_subscriber::fmt()
48 // Filter what traces are displayed based on the RUST_LOG environment
49 // variable.
50 //
51 // Traces emitted by the example code will always be displayed. You
52 // can set `RUST_LOG=tokio=trace` to enable additional traces emitted by
53 // Tokio itself.
54 .with_env_filter(EnvFilter::from_default_env().add_directive("chat=info".parse()?))
55 // Log events when `tracing` spans are created, entered, exited, or
56 // closed. When Tokio's internal tracing support is enabled (as
57 // described above), this can be used to track the lifecycle of spawned
58 // tasks on the Tokio runtime.
59 .with_span_events(FmtSpan::FULL)
60 // Set this subscriber as the default, to collect all traces emitted by
61 // the program.
62 .init();
63
64 // Create the shared state. This is how all the peers communicate.
65 //
66 // The server task will hold a handle to this. For every new client, the
67 // `state` handle is cloned and passed into the task that processes the
68 // client connection.
69 let state = Arc::new(Mutex::new(Shared::new()));
70
71 let addr = env::args()
72 .nth(1)
73 .unwrap_or_else(|| "127.0.0.1:6142".to_string());
74
75 // Bind a TCP listener to the socket address.
76 //
77 // Note that this is the Tokio TcpListener, which is fully async.
78 let listener = TcpListener::bind(&addr).await?;
79
80 tracing::info!("server running on {}", addr);
81
82 loop {
83 // Asynchronously wait for an inbound TcpStream.
84 let (stream, addr) = listener.accept().await?;
85
86 // Clone a handle to the `Shared` state for the new connection.
87 let state = Arc::clone(&state);
88
89 // Spawn our handler to be run asynchronously.
90 tokio::spawn(async move {
91 tracing::debug!("accepted connection");
92 if let Err(e) = process(state, stream, addr).await {
93 tracing::info!("an error occurred; error = {:?}", e);
94 }
95 });
96 }
97}
98
99/// Shorthand for the transmit half of the message channel.
100type Tx = mpsc::UnboundedSender<String>;
101
102/// Shorthand for the receive half of the message channel.
103type Rx = mpsc::UnboundedReceiver<String>;
104
105/// Data that is shared between all peers in the chat server.
106///
107/// This is the set of `Tx` handles for all connected clients. Whenever a
108/// message is received from a client, it is broadcasted to all peers by
109/// iterating over the `peers` entries and sending a copy of the message on each
110/// `Tx`.
111struct Shared {
112 peers: HashMap<SocketAddr, Tx>,
113}
114
115/// The state for each connected client.
116struct Peer {
117 /// The TCP socket wrapped with the `Lines` codec, defined below.
118 ///
119 /// This handles sending and receiving data on the socket. When using
120 /// `Lines`, we can work at the line level instead of having to manage the
121 /// raw byte operations.
122 lines: Framed<TcpStream, LinesCodec>,
123
124 /// Receive half of the message channel.
125 ///
126 /// This is used to receive messages from peers. When a message is received
127 /// off of this `Rx`, it will be written to the socket.
128 rx: Rx,
129}
130
131impl Shared {
132 /// Create a new, empty, instance of `Shared`.
133 fn new() -> Self {
134 Shared {
135 peers: HashMap::new(),
136 }
137 }
138
139 /// Send a `LineCodec` encoded message to every peer, except
140 /// for the sender.
141 async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
142 for peer in self.peers.iter_mut() {
143 if *peer.0 != sender {
144 let _ = peer.1.send(message.into());
145 }
146 }
147 }
148}
149
150impl Peer {
151 /// Create a new instance of `Peer`.
152 async fn new(
153 state: Arc<Mutex<Shared>>,
154 lines: Framed<TcpStream, LinesCodec>,
155 ) -> io::Result<Peer> {
156 // Get the client socket address
157 let addr = lines.get_ref().peer_addr()?;
158
159 // Create a channel for this peer
160 let (tx, rx) = mpsc::unbounded_channel();
161
162 // Add an entry for this `Peer` in the shared state map.
163 state.lock().await.peers.insert(addr, tx);
164
165 Ok(Peer { lines, rx })
166 }
167}
168
169/// Process an individual chat client
170async fn process(
171 state: Arc<Mutex<Shared>>,
172 stream: TcpStream,
173 addr: SocketAddr,
174) -> Result<(), Box<dyn Error>> {
175 let mut lines = Framed::new(stream, LinesCodec::new());
176
177 // Send a prompt to the client to enter their username.
178 lines.send("Please enter your username:").await?;
179
180 // Read the first line from the `LineCodec` stream to get the username.
181 let username = match lines.next().await {
182 Some(Ok(line)) => line,
183 // We didn't get a line so we return early here.
184 _ => {
185 tracing::error!("Failed to get username from {}. Client disconnected.", addr);
186 return Ok(());
187 }
188 };
189
190 // Register our peer with state which internally sets up some channels.
191 let mut peer = Peer::new(state.clone(), lines).await?;
192
193 // A client has connected, let's let everyone know.
194 {
195 let mut state = state.lock().await;
196 let msg = format!("{} has joined the chat", username);
197 tracing::info!("{}", msg);
198 state.broadcast(addr, &msg).await;
199 }
200
201 // Process incoming messages until our stream is exhausted by a disconnect.
202 loop {
203 tokio::select! {
204 // A message was received from a peer. Send it to the current user.
205 Some(msg) = peer.rx.recv() => {
206 peer.lines.send(&msg).await?;
207 }
208 result = peer.lines.next() => match result {
209 // A message was received from the current user, we should
210 // broadcast this message to the other users.
211 Some(Ok(msg)) => {
212 let mut state = state.lock().await;
213 let msg = format!("{}: {}", username, msg);
214
215 state.broadcast(addr, &msg).await;
216 }
217 // An error occurred.
218 Some(Err(e)) => {
219 tracing::error!(
220 "an error occurred while processing messages for {}; error = {:?}",
221 username,
222 e
223 );
224 }
225 // The stream has been exhausted.
226 None => break,
227 },
228 }
229 }
230
231 // If this section is reached it means that the client was disconnected!
232 // Let's let everyone still connected know about it.
233 {
234 let mut state = state.lock().await;
235 state.peers.remove(&addr);
236
237 let msg = format!("{} has left the chat", username);
238 tracing::info!("{}", msg);
239 state.broadcast(addr, &msg).await;
240 }
241
242 Ok(())
243}
244