| 1 | //! A "tiny database" and accompanying protocol |
| 2 | //! |
| 3 | //! This example shows the usage of shared state amongst all connected clients, |
| 4 | //! namely a database of key/value pairs. Each connected client can send a |
| 5 | //! series of GET/SET commands to query the current value of a key or set the |
| 6 | //! value of a key. |
| 7 | //! |
| 8 | //! This example has a simple protocol you can use to interact with the server. |
| 9 | //! To run, first run this in one terminal window: |
| 10 | //! |
| 11 | //! cargo run --example tinydb |
| 12 | //! |
| 13 | //! and next in another window run: |
| 14 | //! |
| 15 | //! cargo run --example connect 127.0.0.1:8080 |
| 16 | //! |
| 17 | //! In the `connect` window you can type in commands; each time you hit enter |
| 18 | //! you'll get a response from the server for that command. An example session |
| 19 | //! is: |
| 20 | //! |
| 21 | //! |
| 22 | //! $ cargo run --example connect 127.0.0.1:8080 |
| 23 | //! GET foo |
| 24 | //! foo = bar |
| 25 | //! GET FOOBAR |
| 26 | //! error: no key FOOBAR |
| 27 | //! SET FOOBAR my awesome string |
| 28 | //! set FOOBAR = `my awesome string`, previous: None |
| 29 | //! SET foo tokio |
| 30 | //! set foo = `tokio`, previous: Some("bar") |
| 31 | //! GET foo |
| 32 | //! foo = tokio |
| 33 | //! |
| 34 | //! Namely you can issue two forms of commands: |
| 35 | //! |
| 36 | //! * `GET $key` - this will fetch the value of `$key` from the database and |
| 37 | //! return it. The server's database is initially populated with the key `foo` |
| 38 | //! set to the value `bar` |
| 39 | //! * `SET $key $value` - this will set the value of `$key` to `$value`, |
| 40 | //! returning the previous value, if any. |
| 41 | |
| 42 | #![warn (rust_2018_idioms)] |
| 43 | |
| 44 | use tokio::net::TcpListener; |
| 45 | use tokio_stream::StreamExt; |
| 46 | use tokio_util::codec::{Framed, LinesCodec}; |
| 47 | |
| 48 | use futures::SinkExt; |
| 49 | use std::collections::HashMap; |
| 50 | use std::env; |
| 51 | use std::error::Error; |
| 52 | use std::sync::{Arc, Mutex}; |
| 53 | |
/// Key/value store that every connected client reads from and writes to.
///
/// A single instance is created at startup and handed to each client task
/// behind an `Arc`. Because `Arc` only gives out shared references, the map
/// itself sits inside a `Mutex` so that it can still be mutated.
struct Database {
    /// The stored key/value pairs; lock the mutex to read or modify them.
    map: Mutex<HashMap<String, String>>,
}
| 61 | |
/// A command received from a client, after parsing.
enum Request {
    /// `GET $key`: look up the value currently stored under `key`.
    Get { key: String },
    /// `SET $key $value`: store `value` under `key`.
    Set { key: String, value: String },
}
| 67 | |
/// The server's answer to a single `Request`.
enum Response {
    /// Answer to a successful `GET`: the key and the value found for it.
    Value {
        key: String,
        value: String,
    },
    /// Answer to a `SET`: the key, the value just stored, and whatever value
    /// the key held before (`None` if the key was new).
    Set {
        key: String,
        value: String,
        previous: Option<String>,
    },
    /// The request could not be parsed or could not be satisfied; `msg`
    /// describes why.
    Error {
        msg: String,
    },
}
| 83 | |
| 84 | #[tokio::main] |
| 85 | async fn main() -> Result<(), Box<dyn Error>> { |
| 86 | // Parse the address we're going to run this server on |
| 87 | // and set up our TCP listener to accept connections. |
| 88 | let addr = env::args() |
| 89 | .nth(1) |
| 90 | .unwrap_or_else(|| "127.0.0.1:8080" .to_string()); |
| 91 | |
| 92 | let listener = TcpListener::bind(&addr).await?; |
| 93 | println!("Listening on: {}" , addr); |
| 94 | |
| 95 | // Create the shared state of this server that will be shared amongst all |
| 96 | // clients. We populate the initial database and then create the `Database` |
| 97 | // structure. Note the usage of `Arc` here which will be used to ensure that |
| 98 | // each independently spawned client will have a reference to the in-memory |
| 99 | // database. |
| 100 | let mut initial_db = HashMap::new(); |
| 101 | initial_db.insert("foo" .to_string(), "bar" .to_string()); |
| 102 | let db = Arc::new(Database { |
| 103 | map: Mutex::new(initial_db), |
| 104 | }); |
| 105 | |
| 106 | loop { |
| 107 | match listener.accept().await { |
| 108 | Ok((socket, _)) => { |
| 109 | // After getting a new connection first we see a clone of the database |
| 110 | // being created, which is creating a new reference for this connected |
| 111 | // client to use. |
| 112 | let db = db.clone(); |
| 113 | |
| 114 | // Like with other small servers, we'll `spawn` this client to ensure it |
| 115 | // runs concurrently with all other clients. The `move` keyword is used |
| 116 | // here to move ownership of our db handle into the async closure. |
| 117 | tokio::spawn(async move { |
| 118 | // Since our protocol is line-based we use `tokio_codecs`'s `LineCodec` |
| 119 | // to convert our stream of bytes, `socket`, into a `Stream` of lines |
| 120 | // as well as convert our line based responses into a stream of bytes. |
| 121 | let mut lines = Framed::new(socket, LinesCodec::new()); |
| 122 | |
| 123 | // Here for every line we get back from the `Framed` decoder, |
| 124 | // we parse the request, and if it's valid we generate a response |
| 125 | // based on the values in the database. |
| 126 | while let Some(result) = lines.next().await { |
| 127 | match result { |
| 128 | Ok(line) => { |
| 129 | let response = handle_request(&line, &db); |
| 130 | |
| 131 | let response = response.serialize(); |
| 132 | |
| 133 | if let Err(e) = lines.send(response.as_str()).await { |
| 134 | println!("error on sending response; error = {:?}" , e); |
| 135 | } |
| 136 | } |
| 137 | Err(e) => { |
| 138 | println!("error on decoding from socket; error = {:?}" , e); |
| 139 | } |
| 140 | } |
| 141 | } |
| 142 | |
| 143 | // The connection will be closed at this point as `lines.next()` has returned `None`. |
| 144 | }); |
| 145 | } |
| 146 | Err(e) => println!("error accepting socket; error = {:?}" , e), |
| 147 | } |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | fn handle_request(line: &str, db: &Arc<Database>) -> Response { |
| 152 | let request = match Request::parse(line) { |
| 153 | Ok(req) => req, |
| 154 | Err(e) => return Response::Error { msg: e }, |
| 155 | }; |
| 156 | |
| 157 | let mut db = db.map.lock().unwrap(); |
| 158 | match request { |
| 159 | Request::Get { key } => match db.get(&key) { |
| 160 | Some(value) => Response::Value { |
| 161 | key, |
| 162 | value: value.clone(), |
| 163 | }, |
| 164 | None => Response::Error { |
| 165 | msg: format!("no key {}" , key), |
| 166 | }, |
| 167 | }, |
| 168 | Request::Set { key, value } => { |
| 169 | let previous = db.insert(key.clone(), value.clone()); |
| 170 | Response::Set { |
| 171 | key, |
| 172 | value, |
| 173 | previous, |
| 174 | } |
| 175 | } |
| 176 | } |
| 177 | } |
| 178 | |
| 179 | impl Request { |
| 180 | fn parse(input: &str) -> Result<Request, String> { |
| 181 | let mut parts = input.splitn(3, ' ' ); |
| 182 | match parts.next() { |
| 183 | Some("GET" ) => { |
| 184 | let key = parts.next().ok_or("GET must be followed by a key" )?; |
| 185 | if parts.next().is_some() { |
| 186 | return Err("GET's key must not be followed by anything" .into()); |
| 187 | } |
| 188 | Ok(Request::Get { |
| 189 | key: key.to_string(), |
| 190 | }) |
| 191 | } |
| 192 | Some("SET" ) => { |
| 193 | let key = match parts.next() { |
| 194 | Some(key) => key, |
| 195 | None => return Err("SET must be followed by a key" .into()), |
| 196 | }; |
| 197 | let value = match parts.next() { |
| 198 | Some(value) => value, |
| 199 | None => return Err("SET needs a value" .into()), |
| 200 | }; |
| 201 | Ok(Request::Set { |
| 202 | key: key.to_string(), |
| 203 | value: value.to_string(), |
| 204 | }) |
| 205 | } |
| 206 | Some(cmd) => Err(format!("unknown command: {}" , cmd)), |
| 207 | None => Err("empty input" .into()), |
| 208 | } |
| 209 | } |
| 210 | } |
| 211 | |
| 212 | impl Response { |
| 213 | fn serialize(&self) -> String { |
| 214 | match *self { |
| 215 | Response::Value { ref key, ref value } => format!("{} = {}" , key, value), |
| 216 | Response::Set { |
| 217 | ref key, |
| 218 | ref value, |
| 219 | ref previous, |
| 220 | } => format!("set {} = `{}`, previous: {:?}" , key, value, previous), |
| 221 | Response::Error { ref msg } => format!("error: {}" , msg), |
| 222 | } |
| 223 | } |
| 224 | } |
| 225 | |