1// You can run this example from the root of the mio repo:
2// cargo run --example tcp_listenfd_server --features="os-poll net"
3// or with wasi:
4// cargo +nightly build --target wasm32-wasi --example tcp_listenfd_server --features="os-poll net"
5// wasmtime run --tcplisten 127.0.0.1:9000 --env 'LISTEN_FDS=1' target/wasm32-wasi/debug/examples/tcp_listenfd_server.wasm
6
7use mio::event::Event;
8use mio::net::{TcpListener, TcpStream};
9use mio::{Events, Interest, Poll, Registry, Token};
10use std::collections::HashMap;
11use std::io::{self, Read, Write};
12use std::str::from_utf8;
13
14// Setup some tokens to allow us to identify which event is for which socket.
15const SERVER: Token = Token(0);
16
17// Some data we'll send over the connection.
18const DATA: &[u8] = b"Hello world!\n";
19
/// Wraps file descriptor 3 in a non-blocking `std::net::TcpListener`.
///
/// Always returns `Some` on these platforms.
///
/// # Panics
///
/// Panics if the descriptor cannot be switched to non-blocking mode.
#[cfg(not(windows))]
fn get_first_listen_fd_listener() -> Option<std::net::TcpListener> {
    #[cfg(unix)]
    use std::os::unix::io::FromRawFd;
    #[cfg(target_os = "wasi")]
    use std::os::wasi::io::FromRawFd;

    // SAFETY: assumes the process was started with a listening TCP socket
    // preopened as fd 3 that nothing else in this process owns. Nothing here
    // verifies that -- the launcher must guarantee it.
    let inherited = unsafe { std::net::TcpListener::from_raw_fd(3) };
    // The listener is handed to mio afterwards, so it must not block.
    inherited.set_nonblocking(true).unwrap();
    Some(inherited)
}
31
/// Windows counterpart of the fd-based lookup: there is no inherited
/// listener to return, so the result is always `None`.
#[cfg(windows)]
fn get_first_listen_fd_listener() -> Option<std::net::TcpListener> {
    // Windows does not support `LISTEN_FDS`.
    Option::None
}
37
38fn main() -> io::Result<()> {
39 env_logger::init();
40
41 std::env::var("LISTEN_FDS").expect("LISTEN_FDS environment variable unset");
42
43 // Create a poll instance.
44 let mut poll = Poll::new()?;
45 // Create storage for events.
46 let mut events = Events::with_capacity(128);
47
48 // Setup the TCP server socket.
49 let mut server = {
50 let stdlistener = get_first_listen_fd_listener().unwrap();
51 println!("Using preopened socket FD 3");
52 println!("You can connect to the server using `nc`:");
53 match stdlistener.local_addr() {
54 Ok(a) => println!(" $ nc {} {}", a.ip(), a.port()),
55 Err(_) => println!(" $ nc <IP> <PORT>"),
56 }
57 println!("You'll see our welcome message and anything you type will be printed here.");
58 TcpListener::from_std(stdlistener)
59 };
60
61 // Register the server with poll we can receive events for it.
62 poll.registry()
63 .register(&mut server, SERVER, Interest::READABLE)?;
64
65 // Map of `Token` -> `TcpStream`.
66 let mut connections = HashMap::new();
67 // Unique token for each incoming connection.
68 let mut unique_token = Token(SERVER.0 + 1);
69
70 loop {
71 poll.poll(&mut events, None)?;
72
73 for event in events.iter() {
74 match event.token() {
75 SERVER => loop {
76 // Received an event for the TCP server socket, which
77 // indicates we can accept an connection.
78 let (mut connection, address) = match server.accept() {
79 Ok((connection, address)) => (connection, address),
80 Err(ref e) if would_block(e) => {
81 // If we get a `WouldBlock` error we know our
82 // listener has no more incoming connections queued,
83 // so we can return to polling and wait for some
84 // more.
85 break;
86 }
87 Err(e) => {
88 // If it was any other kind of error, something went
89 // wrong and we terminate with an error.
90 return Err(e);
91 }
92 };
93
94 println!("Accepted connection from: {}", address);
95
96 let token = next(&mut unique_token);
97 poll.registry()
98 .register(&mut connection, token, Interest::WRITABLE)?;
99
100 connections.insert(token, connection);
101 },
102 token => {
103 // Maybe received an event for a TCP connection.
104 let done = if let Some(connection) = connections.get_mut(&token) {
105 handle_connection_event(poll.registry(), connection, event)?
106 } else {
107 // Sporadic events happen, we can safely ignore them.
108 false
109 };
110 if done {
111 if let Some(mut connection) = connections.remove(&token) {
112 poll.registry().deregister(&mut connection)?;
113 }
114 }
115 }
116 }
117 }
118 }
119}
120
121fn next(current: &mut Token) -> Token {
122 let next = current.0;
123 current.0 += 1;
124 Token(next)
125}
126
127/// Returns `true` if the connection is done.
128fn handle_connection_event(
129 registry: &Registry,
130 connection: &mut TcpStream,
131 event: &Event,
132) -> io::Result<bool> {
133 if event.is_writable() {
134 // We can (maybe) write to the connection.
135 match connection.write(DATA) {
136 // We want to write the entire `DATA` buffer in a single go. If we
137 // write less we'll return a short write error (same as
138 // `io::Write::write_all` does).
139 Ok(n) if n < DATA.len() => return Err(io::ErrorKind::WriteZero.into()),
140 Ok(_) => {
141 // After we've written something we'll reregister the connection
142 // to only respond to readable events.
143 registry.reregister(connection, event.token(), Interest::READABLE)?
144 }
145 // Would block "errors" are the OS's way of saying that the
146 // connection is not actually ready to perform this I/O operation.
147 Err(ref err) if would_block(err) => {}
148 // Got interrupted (how rude!), we'll try again.
149 Err(ref err) if interrupted(err) => {
150 return handle_connection_event(registry, connection, event)
151 }
152 // Other errors we'll consider fatal.
153 Err(err) => return Err(err),
154 }
155 }
156
157 if event.is_readable() {
158 let mut connection_closed = false;
159 let mut received_data = vec![0; 4096];
160 let mut bytes_read = 0;
161 // We can (maybe) read from the connection.
162 loop {
163 match connection.read(&mut received_data[bytes_read..]) {
164 Ok(0) => {
165 // Reading 0 bytes means the other side has closed the
166 // connection or is done writing, then so are we.
167 connection_closed = true;
168 break;
169 }
170 Ok(n) => {
171 bytes_read += n;
172 if bytes_read == received_data.len() {
173 received_data.resize(received_data.len() + 1024, 0);
174 }
175 }
176 // Would block "errors" are the OS's way of saying that the
177 // connection is not actually ready to perform this I/O operation.
178 Err(ref err) if would_block(err) => break,
179 Err(ref err) if interrupted(err) => continue,
180 // Other errors we'll consider fatal.
181 Err(err) => return Err(err),
182 }
183 }
184
185 if bytes_read != 0 {
186 let received_data = &received_data[..bytes_read];
187 if let Ok(str_buf) = from_utf8(received_data) {
188 println!("Received data: {}", str_buf.trim_end());
189 } else {
190 println!("Received (none UTF-8) data: {:?}", received_data);
191 }
192 }
193
194 if connection_closed {
195 println!("Connection closed");
196 return Ok(true);
197 }
198 }
199
200 Ok(false)
201}
202
/// Reports whether `err` is of kind `io::ErrorKind::WouldBlock`.
fn would_block(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::WouldBlock)
}
206
/// Reports whether `err` is of kind `io::ErrorKind::Interrupted`.
fn interrupted(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::Interrupted)
}
210