1 | //! A "tiny database" and accompanying protocol |
2 | //! |
3 | //! This example shows the usage of shared state amongst all connected clients, |
4 | //! namely a database of key/value pairs. Each connected client can send a |
5 | //! series of GET/SET commands to query the current value of a key or set the |
6 | //! value of a key. |
7 | //! |
8 | //! This example has a simple protocol you can use to interact with the server. |
9 | //! To run, first run this in one terminal window: |
10 | //! |
11 | //! cargo run --example tinydb |
12 | //! |
13 | //! and next in another windows run: |
14 | //! |
15 | //! cargo run --example connect 127.0.0.1:8080 |
16 | //! |
17 | //! In the `connect` window you can type in commands where when you hit enter |
18 | //! you'll get a response from the server for that command. An example session |
19 | //! is: |
20 | //! |
21 | //! |
22 | //! $ cargo run --example connect 127.0.0.1:8080 |
23 | //! GET foo |
24 | //! foo = bar |
25 | //! GET FOOBAR |
26 | //! error: no key FOOBAR |
27 | //! SET FOOBAR my awesome string |
28 | //! set FOOBAR = `my awesome string`, previous: None |
29 | //! SET foo tokio |
30 | //! set foo = `tokio`, previous: Some("bar") |
31 | //! GET foo |
32 | //! foo = tokio |
33 | //! |
34 | //! Namely you can issue two forms of commands: |
35 | //! |
36 | //! * `GET $key` - this will fetch the value of `$key` from the database and |
37 | //! return it. The server's database is initially populated with the key `foo` |
38 | //! set to the value `bar` |
39 | //! * `SET $key $value` - this will set the value of `$key` to `$value`, |
40 | //! returning the previous value, if any. |
41 | |
42 | #![warn (rust_2018_idioms)] |
43 | |
44 | use tokio::net::TcpListener; |
45 | use tokio_stream::StreamExt; |
46 | use tokio_util::codec::{Framed, LinesCodec}; |
47 | |
48 | use futures::SinkExt; |
49 | use std::collections::HashMap; |
50 | use std::env; |
51 | use std::error::Error; |
52 | use std::sync::{Arc, Mutex}; |
53 | |
54 | /// The in-memory database shared amongst all clients. |
55 | /// |
56 | /// This database will be shared via `Arc`, so to mutate the internal map we're |
57 | /// going to use a `Mutex` for interior mutability. |
58 | struct Database { |
59 | map: Mutex<HashMap<String, String>>, |
60 | } |
61 | |
62 | /// Possible requests our clients can send us |
63 | enum Request { |
64 | Get { key: String }, |
65 | Set { key: String, value: String }, |
66 | } |
67 | |
68 | /// Responses to the `Request` commands above |
69 | enum Response { |
70 | Value { |
71 | key: String, |
72 | value: String, |
73 | }, |
74 | Set { |
75 | key: String, |
76 | value: String, |
77 | previous: Option<String>, |
78 | }, |
79 | Error { |
80 | msg: String, |
81 | }, |
82 | } |
83 | |
84 | #[tokio::main] |
85 | async fn main() -> Result<(), Box<dyn Error>> { |
86 | // Parse the address we're going to run this server on |
87 | // and set up our TCP listener to accept connections. |
88 | let addr = env::args() |
89 | .nth(1) |
90 | .unwrap_or_else(|| "127.0.0.1:8080" .to_string()); |
91 | |
92 | let listener = TcpListener::bind(&addr).await?; |
93 | println!("Listening on: {}" , addr); |
94 | |
95 | // Create the shared state of this server that will be shared amongst all |
96 | // clients. We populate the initial database and then create the `Database` |
97 | // structure. Note the usage of `Arc` here which will be used to ensure that |
98 | // each independently spawned client will have a reference to the in-memory |
99 | // database. |
100 | let mut initial_db = HashMap::new(); |
101 | initial_db.insert("foo" .to_string(), "bar" .to_string()); |
102 | let db = Arc::new(Database { |
103 | map: Mutex::new(initial_db), |
104 | }); |
105 | |
106 | loop { |
107 | match listener.accept().await { |
108 | Ok((socket, _)) => { |
109 | // After getting a new connection first we see a clone of the database |
110 | // being created, which is creating a new reference for this connected |
111 | // client to use. |
112 | let db = db.clone(); |
113 | |
114 | // Like with other small servers, we'll `spawn` this client to ensure it |
115 | // runs concurrently with all other clients. The `move` keyword is used |
116 | // here to move ownership of our db handle into the async closure. |
117 | tokio::spawn(async move { |
118 | // Since our protocol is line-based we use `tokio_codecs`'s `LineCodec` |
119 | // to convert our stream of bytes, `socket`, into a `Stream` of lines |
120 | // as well as convert our line based responses into a stream of bytes. |
121 | let mut lines = Framed::new(socket, LinesCodec::new()); |
122 | |
123 | // Here for every line we get back from the `Framed` decoder, |
124 | // we parse the request, and if it's valid we generate a response |
125 | // based on the values in the database. |
126 | while let Some(result) = lines.next().await { |
127 | match result { |
128 | Ok(line) => { |
129 | let response = handle_request(&line, &db); |
130 | |
131 | let response = response.serialize(); |
132 | |
133 | if let Err(e) = lines.send(response.as_str()).await { |
134 | println!("error on sending response; error = {:?}" , e); |
135 | } |
136 | } |
137 | Err(e) => { |
138 | println!("error on decoding from socket; error = {:?}" , e); |
139 | } |
140 | } |
141 | } |
142 | |
143 | // The connection will be closed at this point as `lines.next()` has returned `None`. |
144 | }); |
145 | } |
146 | Err(e) => println!("error accepting socket; error = {:?}" , e), |
147 | } |
148 | } |
149 | } |
150 | |
151 | fn handle_request(line: &str, db: &Arc<Database>) -> Response { |
152 | let request = match Request::parse(line) { |
153 | Ok(req) => req, |
154 | Err(e) => return Response::Error { msg: e }, |
155 | }; |
156 | |
157 | let mut db = db.map.lock().unwrap(); |
158 | match request { |
159 | Request::Get { key } => match db.get(&key) { |
160 | Some(value) => Response::Value { |
161 | key, |
162 | value: value.clone(), |
163 | }, |
164 | None => Response::Error { |
165 | msg: format!("no key {}" , key), |
166 | }, |
167 | }, |
168 | Request::Set { key, value } => { |
169 | let previous = db.insert(key.clone(), value.clone()); |
170 | Response::Set { |
171 | key, |
172 | value, |
173 | previous, |
174 | } |
175 | } |
176 | } |
177 | } |
178 | |
179 | impl Request { |
180 | fn parse(input: &str) -> Result<Request, String> { |
181 | let mut parts = input.splitn(3, ' ' ); |
182 | match parts.next() { |
183 | Some("GET" ) => { |
184 | let key = parts.next().ok_or("GET must be followed by a key" )?; |
185 | if parts.next().is_some() { |
186 | return Err("GET's key must not be followed by anything" .into()); |
187 | } |
188 | Ok(Request::Get { |
189 | key: key.to_string(), |
190 | }) |
191 | } |
192 | Some("SET" ) => { |
193 | let key = match parts.next() { |
194 | Some(key) => key, |
195 | None => return Err("SET must be followed by a key" .into()), |
196 | }; |
197 | let value = match parts.next() { |
198 | Some(value) => value, |
199 | None => return Err("SET needs a value" .into()), |
200 | }; |
201 | Ok(Request::Set { |
202 | key: key.to_string(), |
203 | value: value.to_string(), |
204 | }) |
205 | } |
206 | Some(cmd) => Err(format!("unknown command: {}" , cmd)), |
207 | None => Err("empty input" .into()), |
208 | } |
209 | } |
210 | } |
211 | |
212 | impl Response { |
213 | fn serialize(&self) -> String { |
214 | match *self { |
215 | Response::Value { ref key, ref value } => format!("{} = {}" , key, value), |
216 | Response::Set { |
217 | ref key, |
218 | ref value, |
219 | ref previous, |
220 | } => format!("set {} = `{}`, previous: {:?}" , key, value, previous), |
221 | Response::Error { ref msg } => format!("error: {}" , msg), |
222 | } |
223 | } |
224 | } |
225 | |