| 1 | //! A "tiny" example of HTTP request/response handling using transports. |
| 2 | //! |
| 3 | //! This example is intended for *learning purposes* to see how various pieces |
| 4 | //! hook up together and how HTTP can get up and running. Note that this example |
| 5 | //! is written with the restriction that it *can't* use any "big" library other |
| 6 | //! than Tokio, if you'd like a "real world" HTTP library you likely want a |
| 7 | //! crate like Hyper. |
| 8 | //! |
| 9 | //! Code here is based on the `echo-threads` example and implements two paths, |
| 10 | //! the `/plaintext` and `/json` routes to respond with some text and json, |
| 11 | //! respectively. By default this will run I/O on all the cores your system has |
| 12 | //! available, and it doesn't support HTTP request bodies. |
| 13 | |
| 14 | #![warn (rust_2018_idioms)] |
| 15 | |
| 16 | use bytes::BytesMut; |
| 17 | use futures::SinkExt; |
| 18 | use http::{header::HeaderValue, Request, Response, StatusCode}; |
| 19 | #[macro_use ] |
| 20 | extern crate serde_derive; |
| 21 | use std::{env, error::Error, fmt, io}; |
| 22 | use tokio::net::{TcpListener, TcpStream}; |
| 23 | use tokio_stream::StreamExt; |
| 24 | use tokio_util::codec::{Decoder, Encoder, Framed}; |
| 25 | |
| 26 | #[tokio::main] |
| 27 | async fn main() -> Result<(), Box<dyn Error>> { |
| 28 | // Parse the arguments, bind the TCP socket we'll be listening to, spin up |
| 29 | // our worker threads, and start shipping sockets to those worker threads. |
| 30 | let addr = env::args() |
| 31 | .nth(1) |
| 32 | .unwrap_or_else(|| "127.0.0.1:8080" .to_string()); |
| 33 | let server = TcpListener::bind(&addr).await?; |
| 34 | println!("Listening on: {}" , addr); |
| 35 | |
| 36 | loop { |
| 37 | let (stream, _) = server.accept().await?; |
| 38 | tokio::spawn(async move { |
| 39 | if let Err(e) = process(stream).await { |
| 40 | println!("failed to process connection; error = {}" , e); |
| 41 | } |
| 42 | }); |
| 43 | } |
| 44 | } |
| 45 | |
| 46 | async fn process(stream: TcpStream) -> Result<(), Box<dyn Error>> { |
| 47 | let mut transport = Framed::new(stream, Http); |
| 48 | |
| 49 | while let Some(request) = transport.next().await { |
| 50 | match request { |
| 51 | Ok(request) => { |
| 52 | let response = respond(request).await?; |
| 53 | transport.send(response).await?; |
| 54 | } |
| 55 | Err(e) => return Err(e.into()), |
| 56 | } |
| 57 | } |
| 58 | |
| 59 | Ok(()) |
| 60 | } |
| 61 | |
| 62 | async fn respond(req: Request<()>) -> Result<Response<String>, Box<dyn Error>> { |
| 63 | let mut response = Response::builder(); |
| 64 | let body = match req.uri().path() { |
| 65 | "/plaintext" => { |
| 66 | response = response.header("Content-Type" , "text/plain" ); |
| 67 | "Hello, World!" .to_string() |
| 68 | } |
| 69 | "/json" => { |
| 70 | response = response.header("Content-Type" , "application/json" ); |
| 71 | |
| 72 | #[derive(Serialize)] |
| 73 | struct Message { |
| 74 | message: &'static str, |
| 75 | } |
| 76 | serde_json::to_string(&Message { |
| 77 | message: "Hello, World!" , |
| 78 | })? |
| 79 | } |
| 80 | _ => { |
| 81 | response = response.status(StatusCode::NOT_FOUND); |
| 82 | String::new() |
| 83 | } |
| 84 | }; |
| 85 | let response = response |
| 86 | .body(body) |
| 87 | .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; |
| 88 | |
| 89 | Ok(response) |
| 90 | } |
| 91 | |
/// Stateless codec handed to `Framed`: its `Decoder` impl turns incoming bytes
/// into `Request<()>` values and its `Encoder` impl serializes
/// `Response<String>` values.
struct Http;
| 93 | |
| 94 | /// Implementation of encoding an HTTP response into a `BytesMut`, basically |
| 95 | /// just writing out an HTTP/1.1 response. |
| 96 | impl Encoder<Response<String>> for Http { |
| 97 | type Error = io::Error; |
| 98 | |
| 99 | fn encode(&mut self, item: Response<String>, dst: &mut BytesMut) -> io::Result<()> { |
| 100 | use std::fmt::Write; |
| 101 | |
| 102 | write!( |
| 103 | BytesWrite(dst), |
| 104 | "\ |
| 105 | HTTP/1.1 {} \r\n\ |
| 106 | Server: Example \r\n\ |
| 107 | Content-Length: {} \r\n\ |
| 108 | Date: {} \r\n\ |
| 109 | " , |
| 110 | item.status(), |
| 111 | item.body().len(), |
| 112 | date::now() |
| 113 | ) |
| 114 | .unwrap(); |
| 115 | |
| 116 | for (k, v) in item.headers() { |
| 117 | dst.extend_from_slice(k.as_str().as_bytes()); |
| 118 | dst.extend_from_slice(b": " ); |
| 119 | dst.extend_from_slice(v.as_bytes()); |
| 120 | dst.extend_from_slice(b" \r\n" ); |
| 121 | } |
| 122 | |
| 123 | dst.extend_from_slice(b" \r\n" ); |
| 124 | dst.extend_from_slice(item.body().as_bytes()); |
| 125 | |
| 126 | return Ok(()); |
| 127 | |
| 128 | // Right now `write!` on `Vec<u8>` goes through io::Write and is not |
| 129 | // super speedy, so inline a less-crufty implementation here which |
| 130 | // doesn't go through io::Error. |
| 131 | struct BytesWrite<'a>(&'a mut BytesMut); |
| 132 | |
| 133 | impl fmt::Write for BytesWrite<'_> { |
| 134 | fn write_str(&mut self, s: &str) -> fmt::Result { |
| 135 | self.0.extend_from_slice(s.as_bytes()); |
| 136 | Ok(()) |
| 137 | } |
| 138 | |
| 139 | fn write_fmt(&mut self, args: fmt::Arguments<'_>) -> fmt::Result { |
| 140 | fmt::write(self, args) |
| 141 | } |
| 142 | } |
| 143 | } |
| 144 | } |
| 145 | |
| 146 | /// Implementation of decoding an HTTP request from the bytes we've read so far. |
| 147 | /// This leverages the `httparse` crate to do the actual parsing and then we use |
| 148 | /// that information to construct an instance of a `http::Request` object, |
| 149 | /// trying to avoid allocations where possible. |
| 150 | impl Decoder for Http { |
| 151 | type Item = Request<()>; |
| 152 | type Error = io::Error; |
| 153 | |
| 154 | fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Request<()>>> { |
| 155 | // TODO: we should grow this headers array if parsing fails and asks |
| 156 | // for more headers |
| 157 | let mut headers = [None; 16]; |
| 158 | let (method, path, version, amt) = { |
| 159 | let mut parsed_headers = [httparse::EMPTY_HEADER; 16]; |
| 160 | let mut r = httparse::Request::new(&mut parsed_headers); |
| 161 | let status = r.parse(src).map_err(|e| { |
| 162 | let msg = format!("failed to parse http request: {:?}" , e); |
| 163 | io::Error::new(io::ErrorKind::Other, msg) |
| 164 | })?; |
| 165 | |
| 166 | let amt = match status { |
| 167 | httparse::Status::Complete(amt) => amt, |
| 168 | httparse::Status::Partial => return Ok(None), |
| 169 | }; |
| 170 | |
| 171 | let toslice = |a: &[u8]| { |
| 172 | let start = a.as_ptr() as usize - src.as_ptr() as usize; |
| 173 | assert!(start < src.len()); |
| 174 | (start, start + a.len()) |
| 175 | }; |
| 176 | |
| 177 | for (i, header) in r.headers.iter().enumerate() { |
| 178 | let k = toslice(header.name.as_bytes()); |
| 179 | let v = toslice(header.value); |
| 180 | headers[i] = Some((k, v)); |
| 181 | } |
| 182 | |
| 183 | let method = http::Method::try_from(r.method.unwrap()) |
| 184 | .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; |
| 185 | |
| 186 | ( |
| 187 | method, |
| 188 | toslice(r.path.unwrap().as_bytes()), |
| 189 | r.version.unwrap(), |
| 190 | amt, |
| 191 | ) |
| 192 | }; |
| 193 | if version != 1 { |
| 194 | return Err(io::Error::new( |
| 195 | io::ErrorKind::Other, |
| 196 | "only HTTP/1.1 accepted" , |
| 197 | )); |
| 198 | } |
| 199 | let data = src.split_to(amt).freeze(); |
| 200 | let mut ret = Request::builder(); |
| 201 | ret = ret.method(method); |
| 202 | let s = data.slice(path.0..path.1); |
| 203 | let s = unsafe { String::from_utf8_unchecked(Vec::from(s.as_ref())) }; |
| 204 | ret = ret.uri(s); |
| 205 | ret = ret.version(http::Version::HTTP_11); |
| 206 | for header in headers.iter() { |
| 207 | let (k, v) = match *header { |
| 208 | Some((ref k, ref v)) => (k, v), |
| 209 | None => break, |
| 210 | }; |
| 211 | let value = HeaderValue::from_bytes(data.slice(v.0..v.1).as_ref()) |
| 212 | .map_err(|_| io::Error::new(io::ErrorKind::Other, "header decode error" ))?; |
| 213 | ret = ret.header(&data[k.0..k.1], value); |
| 214 | } |
| 215 | |
| 216 | let req = ret |
| 217 | .body(()) |
| 218 | .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; |
| 219 | Ok(Some(req)) |
| 220 | } |
| 221 | } |
| 222 | |
| 223 | mod date { |
| 224 | use std::cell::RefCell; |
| 225 | use std::fmt::{self, Write}; |
| 226 | use std::str; |
| 227 | use std::time::SystemTime; |
| 228 | |
| 229 | use httpdate::HttpDate; |
| 230 | |
| 231 | pub struct Now(()); |
| 232 | |
| 233 | /// Returns a struct, which when formatted, renders an appropriate `Date` |
| 234 | /// header value. |
| 235 | pub fn now() -> Now { |
| 236 | Now(()) |
| 237 | } |
| 238 | |
| 239 | // Gee Alex, doesn't this seem like premature optimization. Well you see |
| 240 | // there Billy, you're absolutely correct! If your server is *bottlenecked* |
| 241 | // on rendering the `Date` header, well then boy do I have news for you, you |
| 242 | // don't need this optimization. |
| 243 | // |
| 244 | // In all seriousness, though, a simple "hello world" benchmark which just |
| 245 | // sends back literally "hello world" with standard headers actually is |
| 246 | // bottlenecked on rendering a date into a byte buffer. Since it was at the |
| 247 | // top of a profile, and this was done for some competitive benchmarks, this |
| 248 | // module was written. |
| 249 | // |
| 250 | // Just to be clear, though, I was not intending on doing this because it |
| 251 | // really does seem kinda absurd, but it was done by someone else [1], so I |
| 252 | // blame them! :) |
| 253 | // |
| 254 | // [1]: https://github.com/rapidoid/rapidoid/blob/f1c55c0555007e986b5d069fe1086e6d09933f7b/rapidoid-commons/src/main/java/org/rapidoid/commons/Dates.java#L48-L66 |
| 255 | |
| 256 | struct LastRenderedNow { |
| 257 | bytes: [u8; 128], |
| 258 | amt: usize, |
| 259 | unix_date: u64, |
| 260 | } |
| 261 | |
| 262 | thread_local!(static LAST: RefCell<LastRenderedNow> = RefCell::new(LastRenderedNow { |
| 263 | bytes: [0; 128], |
| 264 | amt: 0, |
| 265 | unix_date: 0, |
| 266 | })); |
| 267 | |
| 268 | impl fmt::Display for Now { |
| 269 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 270 | LAST.with(|cache| { |
| 271 | let mut cache = cache.borrow_mut(); |
| 272 | let now = SystemTime::now(); |
| 273 | let now_unix = now |
| 274 | .duration_since(SystemTime::UNIX_EPOCH) |
| 275 | .map(|since_epoch| since_epoch.as_secs()) |
| 276 | .unwrap_or(0); |
| 277 | if cache.unix_date != now_unix { |
| 278 | cache.update(now, now_unix); |
| 279 | } |
| 280 | f.write_str(cache.buffer()) |
| 281 | }) |
| 282 | } |
| 283 | } |
| 284 | |
| 285 | impl LastRenderedNow { |
| 286 | fn buffer(&self) -> &str { |
| 287 | str::from_utf8(&self.bytes[..self.amt]).unwrap() |
| 288 | } |
| 289 | |
| 290 | fn update(&mut self, now: SystemTime, now_unix: u64) { |
| 291 | self.amt = 0; |
| 292 | self.unix_date = now_unix; |
| 293 | write!(LocalBuffer(self), "{}" , HttpDate::from(now)).unwrap(); |
| 294 | } |
| 295 | } |
| 296 | |
| 297 | struct LocalBuffer<'a>(&'a mut LastRenderedNow); |
| 298 | |
| 299 | impl fmt::Write for LocalBuffer<'_> { |
| 300 | fn write_str(&mut self, s: &str) -> fmt::Result { |
| 301 | let start = self.0.amt; |
| 302 | let end = start + s.len(); |
| 303 | self.0.bytes[start..end].copy_from_slice(s.as_bytes()); |
| 304 | self.0.amt += s.len(); |
| 305 | Ok(()) |
| 306 | } |
| 307 | } |
| 308 | } |
| 309 | |