1// You can run this example from the root of the mio repo:
2// cargo run --example tcp_server --features="os-poll net"
3use mio::event::Event;
4use mio::net::{TcpListener, TcpStream};
5use mio::{Events, Interest, Poll, Registry, Token};
6use std::collections::HashMap;
7use std::io::{self, Read, Write};
8use std::str::from_utf8;
9
10// Setup some tokens to allow us to identify which event is for which socket.
11const SERVER: Token = Token(0);
12
13// Some data we'll send over the connection.
14const DATA: &[u8] = b"Hello world!\n";
15
16#[cfg(not(target_os = "wasi"))]
17fn main() -> io::Result<()> {
18 env_logger::init();
19
20 // Create a poll instance.
21 let mut poll = Poll::new()?;
22 // Create storage for events.
23 let mut events = Events::with_capacity(128);
24
25 // Setup the TCP server socket.
26 let addr = "127.0.0.1:9000".parse().unwrap();
27 let mut server = TcpListener::bind(addr)?;
28
29 // Register the server with poll we can receive events for it.
30 poll.registry()
31 .register(&mut server, SERVER, Interest::READABLE)?;
32
33 // Map of `Token` -> `TcpStream`.
34 let mut connections = HashMap::new();
35 // Unique token for each incoming connection.
36 let mut unique_token = Token(SERVER.0 + 1);
37
38 println!("You can connect to the server using `nc`:");
39 println!(" $ nc 127.0.0.1 9000");
40 println!("You'll see our welcome message and anything you type will be printed here.");
41
42 loop {
43 if let Err(err) = poll.poll(&mut events, None) {
44 if interrupted(&err) {
45 continue;
46 }
47 return Err(err);
48 }
49
50 for event in events.iter() {
51 match event.token() {
52 SERVER => loop {
53 // Received an event for the TCP server socket, which
54 // indicates we can accept an connection.
55 let (mut connection, address) = match server.accept() {
56 Ok((connection, address)) => (connection, address),
57 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
58 // If we get a `WouldBlock` error we know our
59 // listener has no more incoming connections queued,
60 // so we can return to polling and wait for some
61 // more.
62 break;
63 }
64 Err(e) => {
65 // If it was any other kind of error, something went
66 // wrong and we terminate with an error.
67 return Err(e);
68 }
69 };
70
71 println!("Accepted connection from: {}", address);
72
73 let token = next(&mut unique_token);
74 poll.registry().register(
75 &mut connection,
76 token,
77 Interest::READABLE.add(Interest::WRITABLE),
78 )?;
79
80 connections.insert(token, connection);
81 },
82 token => {
83 // Maybe received an event for a TCP connection.
84 let done = if let Some(connection) = connections.get_mut(&token) {
85 handle_connection_event(poll.registry(), connection, event)?
86 } else {
87 // Sporadic events happen, we can safely ignore them.
88 false
89 };
90 if done {
91 if let Some(mut connection) = connections.remove(&token) {
92 poll.registry().deregister(&mut connection)?;
93 }
94 }
95 }
96 }
97 }
98 }
99}
100
101fn next(current: &mut Token) -> Token {
102 let next = current.0;
103 current.0 += 1;
104 Token(next)
105}
106
107/// Returns `true` if the connection is done.
108fn handle_connection_event(
109 registry: &Registry,
110 connection: &mut TcpStream,
111 event: &Event,
112) -> io::Result<bool> {
113 if event.is_writable() {
114 // We can (maybe) write to the connection.
115 match connection.write(DATA) {
116 // We want to write the entire `DATA` buffer in a single go. If we
117 // write less we'll return a short write error (same as
118 // `io::Write::write_all` does).
119 Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
120 Ok(_) => {
121 // After we've written something we'll reregister the connection
122 // to only respond to readable events.
123 registry.reregister(connection, event.token(), Interest::READABLE)?
124 }
125 // Would block "errors" are the OS's way of saying that the
126 // connection is not actually ready to perform this I/O operation.
127 Err(ref err) if would_block(err) => {}
128 // Got interrupted (how rude!), we'll try again.
129 Err(ref err) if interrupted(err) => {
130 return handle_connection_event(registry, connection, event)
131 }
132 // Other errors we'll consider fatal.
133 Err(err) => return Err(err),
134 }
135 }
136
137 if event.is_readable() {
138 let mut connection_closed = false;
139 let mut received_data = vec![0; 4096];
140 let mut bytes_read = 0;
141 // We can (maybe) read from the connection.
142 loop {
143 match connection.read(&mut received_data[bytes_read..]) {
144 Ok(0) => {
145 // Reading 0 bytes means the other side has closed the
146 // connection or is done writing, then so are we.
147 connection_closed = true;
148 break;
149 }
150 Ok(n) => {
151 bytes_read += n;
152 if bytes_read == received_data.len() {
153 received_data.resize(received_data.len() + 1024, 0);
154 }
155 }
156 // Would block "errors" are the OS's way of saying that the
157 // connection is not actually ready to perform this I/O operation.
158 Err(ref err) if would_block(err) => break,
159 Err(ref err) if interrupted(err) => continue,
160 // Other errors we'll consider fatal.
161 Err(err) => return Err(err),
162 }
163 }
164
165 if bytes_read != 0 {
166 let received_data = &received_data[..bytes_read];
167 if let Ok(str_buf) = from_utf8(received_data) {
168 println!("Received data: {}", str_buf.trim_end());
169 } else {
170 println!("Received (none UTF-8) data: {:?}", received_data);
171 }
172 }
173
174 if connection_closed {
175 println!("Connection closed");
176 return Ok(true);
177 }
178 }
179
180 Ok(false)
181}
182
183fn would_block(err: &io::Error) -> bool {
184 err.kind() == io::ErrorKind::WouldBlock
185}
186
187fn interrupted(err: &io::Error) -> bool {
188 err.kind() == io::ErrorKind::Interrupted
189}
190
191#[cfg(target_os = "wasi")]
192fn main() {
193 panic!("can't bind to an address with wasi")
194}
195