1//! A "tiny" example of HTTP request/response handling using transports.
2//!
3//! This example is intended for *learning purposes* to see how various pieces
4//! hook up together and how HTTP can get up and running. Note that this example
5//! is written with the restriction that it *can't* use any "big" library other
6//! than Tokio, if you'd like a "real world" HTTP library you likely want a
7//! crate like Hyper.
8//!
9//! Code here is based on the `echo-threads` example and implements two paths,
10//! the `/plaintext` and `/json` routes to respond with some text and json,
11//! respectively. By default this will run I/O on all the cores your system has
12//! available, and it doesn't support HTTP request bodies.
13
14#![warn(rust_2018_idioms)]
15
16use bytes::BytesMut;
17use futures::SinkExt;
18use http::{header::HeaderValue, Request, Response, StatusCode};
19#[macro_use]
20extern crate serde_derive;
21use std::{env, error::Error, fmt, io};
22use tokio::net::{TcpListener, TcpStream};
23use tokio_stream::StreamExt;
24use tokio_util::codec::{Decoder, Encoder, Framed};
25
26#[tokio::main]
27async fn main() -> Result<(), Box<dyn Error>> {
28 // Parse the arguments, bind the TCP socket we'll be listening to, spin up
29 // our worker threads, and start shipping sockets to those worker threads.
30 let addr = env::args()
31 .nth(1)
32 .unwrap_or_else(|| "127.0.0.1:8080".to_string());
33 let server = TcpListener::bind(&addr).await?;
34 println!("Listening on: {}", addr);
35
36 loop {
37 let (stream, _) = server.accept().await?;
38 tokio::spawn(async move {
39 if let Err(e) = process(stream).await {
40 println!("failed to process connection; error = {}", e);
41 }
42 });
43 }
44}
45
46async fn process(stream: TcpStream) -> Result<(), Box<dyn Error>> {
47 let mut transport = Framed::new(stream, Http);
48
49 while let Some(request) = transport.next().await {
50 match request {
51 Ok(request) => {
52 let response = respond(request).await?;
53 transport.send(response).await?;
54 }
55 Err(e) => return Err(e.into()),
56 }
57 }
58
59 Ok(())
60}
61
62async fn respond(req: Request<()>) -> Result<Response<String>, Box<dyn Error>> {
63 let mut response = Response::builder();
64 let body = match req.uri().path() {
65 "/plaintext" => {
66 response = response.header("Content-Type", "text/plain");
67 "Hello, World!".to_string()
68 }
69 "/json" => {
70 response = response.header("Content-Type", "application/json");
71
72 #[derive(Serialize)]
73 struct Message {
74 message: &'static str,
75 }
76 serde_json::to_string(&Message {
77 message: "Hello, World!",
78 })?
79 }
80 _ => {
81 response = response.status(StatusCode::NOT_FOUND);
82 String::new()
83 }
84 };
85 let response = response
86 .body(body)
87 .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
88
89 Ok(response)
90}
91
92struct Http;
93
94/// Implementation of encoding an HTTP response into a `BytesMut`, basically
95/// just writing out an HTTP/1.1 response.
96impl Encoder<Response<String>> for Http {
97 type Error = io::Error;
98
99 fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> {
100 use std::fmt::Write;
101
102 write!(
103 BytesWrite(dst),
104 "\
105 HTTP/1.1 {}\r\n\
106 Server: Example\r\n\
107 Content-Length: {}\r\n\
108 Date: {}\r\n\
109 ",
110 item.status(),
111 item.body().len(),
112 date::now()
113 )
114 .unwrap();
115
116 for (k, v) in item.headers() {
117 dst.extend_from_slice(k.as_str().as_bytes());
118 dst.extend_from_slice(b": ");
119 dst.extend_from_slice(v.as_bytes());
120 dst.extend_from_slice(b"\r\n");
121 }
122
123 dst.extend_from_slice(b"\r\n");
124 dst.extend_from_slice(item.body().as_bytes());
125
126 return Ok(());
127
128 // Right now `write!` on `Vec<u8>` goes through io::Write and is not
129 // super speedy, so inline a less-crufty implementation here which
130 // doesn't go through io::Error.
131 struct BytesWrite<'a>(&'a mut BytesMut);
132
133 impl fmt::Write for BytesWrite<'_> {
134 fn write_str(&mut self, s: &str) -> fmt::Result {
135 self.0.extend_from_slice(s.as_bytes());
136 Ok(())
137 }
138
139 fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result {
140 fmt::write(self, args)
141 }
142 }
143 }
144}
145
146/// Implementation of decoding an HTTP request from the bytes we've read so far.
147/// This leverages the `httparse` crate to do the actual parsing and then we use
148/// that information to construct an instance of a `http::Request` object,
149/// trying to avoid allocations where possible.
150impl Decoder for Http {
151 type Item = Request<()>;
152 type Error = io::Error;
153
154 fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Request<()>>> {
155 // TODO: we should grow this headers array if parsing fails and asks
156 // for more headers
157 let mut headers = [None; 16];
158 let (method, path, version, amt) = {
159 let mut parsed_headers = [httparse::EMPTY_HEADER; 16];
160 let mut r = httparse::Request::new(&mut parsed_headers);
161 let status = r.parse(src).map_err(|e| {
162 let msg = format!("failed to parse http request: {:?}", e);
163 io::Error::new(io::ErrorKind::Other, msg)
164 })?;
165
166 let amt = match status {
167 httparse::Status::Complete(amt) => amt,
168 httparse::Status::Partial => return Ok(None),
169 };
170
171 let toslice = |a: &[u8]| {
172 let start = a.as_ptr() as usize - src.as_ptr() as usize;
173 assert!(start < src.len());
174 (start, start + a.len())
175 };
176
177 for (i, header) in r.headers.iter().enumerate() {
178 let k = toslice(header.name.as_bytes());
179 let v = toslice(header.value);
180 headers[i] = Some((k, v));
181 }
182
183 let method = http::Method::try_from(r.method.unwrap())
184 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
185
186 (
187 method,
188 toslice(r.path.unwrap().as_bytes()),
189 r.version.unwrap(),
190 amt,
191 )
192 };
193 if version != 1 {
194 return Err(io::Error::new(
195 io::ErrorKind::Other,
196 "only HTTP/1.1 accepted",
197 ));
198 }
199 let data = src.split_to(amt).freeze();
200 let mut ret = Request::builder();
201 ret = ret.method(method);
202 let s = data.slice(path.0..path.1);
203 let s = unsafe { String::from_utf8_unchecked(Vec::from(s.as_ref())) };
204 ret = ret.uri(s);
205 ret = ret.version(http::Version::HTTP_11);
206 for header in headers.iter() {
207 let (k, v) = match *header {
208 Some((ref k, ref v)) => (k, v),
209 None => break,
210 };
211 let value = HeaderValue::from_bytes(data.slice(v.0..v.1).as_ref())
212 .map_err(|_| io::Error::new(io::ErrorKind::Other, "header decode error"))?;
213 ret = ret.header(&data[k.0..k.1], value);
214 }
215
216 let req = ret
217 .body(())
218 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
219 Ok(Some(req))
220 }
221}
222
223mod date {
224 use std::cell::RefCell;
225 use std::fmt::{self, Write};
226 use std::str;
227 use std::time::SystemTime;
228
229 use httpdate::HttpDate;
230
231 pub struct Now(());
232
233 /// Returns a struct, which when formatted, renders an appropriate `Date`
234 /// header value.
235 pub fn now() -> Now {
236 Now(())
237 }
238
239 // Gee Alex, doesn't this seem like premature optimization. Well you see
240 // there Billy, you're absolutely correct! If your server is *bottlenecked*
241 // on rendering the `Date` header, well then boy do I have news for you, you
242 // don't need this optimization.
243 //
244 // In all seriousness, though, a simple "hello world" benchmark which just
245 // sends back literally "hello world" with standard headers actually is
246 // bottlenecked on rendering a date into a byte buffer. Since it was at the
247 // top of a profile, and this was done for some competitive benchmarks, this
248 // module was written.
249 //
250 // Just to be clear, though, I was not intending on doing this because it
251 // really does seem kinda absurd, but it was done by someone else [1], so I
252 // blame them! :)
253 //
254 // [1]: https://github.com/rapidoid/rapidoid/blob/f1c55c0555007e986b5d069fe1086e6d09933f7b/rapidoid-commons/src/main/java/org/rapidoid/commons/Dates.java#L48-L66
255
256 struct LastRenderedNow {
257 bytes: [u8; 128],
258 amt: usize,
259 unix_date: u64,
260 }
261
262 thread_local!(static LAST: RefCell<LastRenderedNow> = RefCell::new(LastRenderedNow {
263 bytes: [0; 128],
264 amt: 0,
265 unix_date: 0,
266 }));
267
268 impl fmt::Display for Now {
269 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270 LAST.with(|cache| {
271 let mut cache = cache.borrow_mut();
272 let now = SystemTime::now();
273 let now_unix = now
274 .duration_since(SystemTime::UNIX_EPOCH)
275 .map(|since_epoch| since_epoch.as_secs())
276 .unwrap_or(0);
277 if cache.unix_date != now_unix {
278 cache.update(now, now_unix);
279 }
280 f.write_str(cache.buffer())
281 })
282 }
283 }
284
285 impl LastRenderedNow {
286 fn buffer(&self) -> &str {
287 str::from_utf8(&self.bytes[..self.amt]).unwrap()
288 }
289
290 fn update(&mut self, now: SystemTime, now_unix: u64) {
291 self.amt = 0;
292 self.unix_date = now_unix;
293 write!(LocalBuffer(self), "{}", HttpDate::from(now)).unwrap();
294 }
295 }
296
297 struct LocalBuffer<'a>(&'a mut LastRenderedNow);
298
299 impl fmt::Write for LocalBuffer<'_> {
300 fn write_str(&mut self, s: &str) -> fmt::Result {
301 let start = self.0.amt;
302 let end = start + s.len();
303 self.0.bytes[start..end].copy_from_slice(s.as_bytes());
304 self.0.amt += s.len();
305 Ok(())
306 }
307 }
308}
309