//! A "tiny database" and accompanying protocol
//!
//! This example shows the usage of shared state amongst all connected clients,
//! namely a database of key/value pairs. Each connected client can send a
//! series of GET/SET commands to query the current value of a key or set the
//! value of a key.
//!
//! This example has a simple protocol you can use to interact with the server.
//! To run, first run this in one terminal window:
//!
//!     cargo run --example tinydb
//!
//! and next, in another window, run:
//!
//!     cargo run --example connect 127.0.0.1:8080
//!
//! In the `connect` window you can type in commands, and when you hit enter
//! you'll get a response from the server for that command. An example session
//! is:
//!
//!
//!     $ cargo run --example connect 127.0.0.1:8080
//!     GET foo
//!     foo = bar
//!     GET FOOBAR
//!     error: no key FOOBAR
//!     SET FOOBAR my awesome string
//!     set FOOBAR = `my awesome string`, previous: None
//!     SET foo tokio
//!     set foo = `tokio`, previous: Some("bar")
//!     GET foo
//!     foo = tokio
//!
//! Namely you can issue two forms of commands:
//!
//! * `GET $key` - this will fetch the value of `$key` from the database and
//!   return it. The server's database is initially populated with the key `foo`
//!   set to the value `bar`.
//! * `SET $key $value` - this will set the value of `$key` to `$value`,
//!   returning the previous value, if any.
41
42#![warn(rust_2018_idioms)]
43
44use tokio::net::TcpListener;
45use tokio_stream::StreamExt;
46use tokio_util::codec::{Framed, LinesCodec};
47
48use futures::SinkExt;
49use std::collections::HashMap;
50use std::env;
51use std::error::Error;
52use std::sync::{Arc, Mutex};
53
/// The in-memory key/value store shared amongst all clients.
///
/// A single instance is handed to every client task behind an `Arc`, which
/// only gives out shared references, so the map itself lives inside a `Mutex`
/// to provide the interior mutability needed for `SET`.
///
/// `Debug` is derived so the store can be printed while diagnosing problems,
/// and `Default` yields an empty store.
#[derive(Debug, Default)]
struct Database {
    /// Current key/value pairs; lock the mutex to read or modify them.
    map: Mutex<HashMap<String, String>>,
}
61
/// Possible requests our clients can send us.
///
/// The common traits are derived so that parsed requests can be logged,
/// duplicated and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Request {
    /// Look up the value currently stored under `key`.
    Get { key: String },
    /// Store `value` under `key`, replacing any existing value.
    Set { key: String, value: String },
}
67
/// Responses to the `Request` commands above.
///
/// The common traits are derived so that responses can be logged, duplicated
/// and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Response {
    /// Answer to a `GET` whose key was present.
    Value {
        key: String,
        value: String,
    },
    /// Answer to a `SET`; `previous` is the value that was replaced, if any.
    Set {
        key: String,
        value: String,
        previous: Option<String>,
    },
    /// The request could not be parsed or could not be satisfied.
    Error {
        msg: String,
    },
}
83
84#[tokio::main]
85async fn main() -> Result<(), Box<dyn Error>> {
86 // Parse the address we're going to run this server on
87 // and set up our TCP listener to accept connections.
88 let addr = env::args()
89 .nth(1)
90 .unwrap_or_else(|| "127.0.0.1:8080".to_string());
91
92 let listener = TcpListener::bind(&addr).await?;
93 println!("Listening on: {}", addr);
94
95 // Create the shared state of this server that will be shared amongst all
96 // clients. We populate the initial database and then create the `Database`
97 // structure. Note the usage of `Arc` here which will be used to ensure that
98 // each independently spawned client will have a reference to the in-memory
99 // database.
100 let mut initial_db = HashMap::new();
101 initial_db.insert("foo".to_string(), "bar".to_string());
102 let db = Arc::new(Database {
103 map: Mutex::new(initial_db),
104 });
105
106 loop {
107 match listener.accept().await {
108 Ok((socket, _)) => {
109 // After getting a new connection first we see a clone of the database
110 // being created, which is creating a new reference for this connected
111 // client to use.
112 let db = db.clone();
113
114 // Like with other small servers, we'll `spawn` this client to ensure it
115 // runs concurrently with all other clients. The `move` keyword is used
116 // here to move ownership of our db handle into the async closure.
117 tokio::spawn(async move {
118 // Since our protocol is line-based we use `tokio_codecs`'s `LineCodec`
119 // to convert our stream of bytes, `socket`, into a `Stream` of lines
120 // as well as convert our line based responses into a stream of bytes.
121 let mut lines = Framed::new(socket, LinesCodec::new());
122
123 // Here for every line we get back from the `Framed` decoder,
124 // we parse the request, and if it's valid we generate a response
125 // based on the values in the database.
126 while let Some(result) = lines.next().await {
127 match result {
128 Ok(line) => {
129 let response = handle_request(&line, &db);
130
131 let response = response.serialize();
132
133 if let Err(e) = lines.send(response.as_str()).await {
134 println!("error on sending response; error = {:?}", e);
135 }
136 }
137 Err(e) => {
138 println!("error on decoding from socket; error = {:?}", e);
139 }
140 }
141 }
142
143 // The connection will be closed at this point as `lines.next()` has returned `None`.
144 });
145 }
146 Err(e) => println!("error accepting socket; error = {:?}", e),
147 }
148 }
149}
150
151fn handle_request(line: &str, db: &Arc<Database>) -> Response {
152 let request = match Request::parse(line) {
153 Ok(req) => req,
154 Err(e) => return Response::Error { msg: e },
155 };
156
157 let mut db = db.map.lock().unwrap();
158 match request {
159 Request::Get { key } => match db.get(&key) {
160 Some(value) => Response::Value {
161 key,
162 value: value.clone(),
163 },
164 None => Response::Error {
165 msg: format!("no key {}", key),
166 },
167 },
168 Request::Set { key, value } => {
169 let previous = db.insert(key.clone(), value.clone());
170 Response::Set {
171 key,
172 value,
173 previous,
174 }
175 }
176 }
177}
178
179impl Request {
180 fn parse(input: &str) -> Result<Request, String> {
181 let mut parts = input.splitn(3, ' ');
182 match parts.next() {
183 Some("GET") => {
184 let key = parts.next().ok_or("GET must be followed by a key")?;
185 if parts.next().is_some() {
186 return Err("GET's key must not be followed by anything".into());
187 }
188 Ok(Request::Get {
189 key: key.to_string(),
190 })
191 }
192 Some("SET") => {
193 let key = match parts.next() {
194 Some(key) => key,
195 None => return Err("SET must be followed by a key".into()),
196 };
197 let value = match parts.next() {
198 Some(value) => value,
199 None => return Err("SET needs a value".into()),
200 };
201 Ok(Request::Set {
202 key: key.to_string(),
203 value: value.to_string(),
204 })
205 }
206 Some(cmd) => Err(format!("unknown command: {}", cmd)),
207 None => Err("empty input".into()),
208 }
209 }
210}
211
212impl Response {
213 fn serialize(&self) -> String {
214 match *self {
215 Response::Value { ref key, ref value } => format!("{} = {}", key, value),
216 Response::Set {
217 ref key,
218 ref value,
219 ref previous,
220 } => format!("set {} = `{}`, previous: {:?}", key, value, previous),
221 Response::Error { ref msg } => format!("error: {}", msg),
222 }
223 }
224}
225