1 | #![cfg_attr (loom, allow(dead_code, unreachable_pub, unused_imports))] |
2 | |
3 | //! Synchronization primitives for use in asynchronous contexts. |
4 | //! |
5 | //! Tokio programs tend to be organized as a set of [tasks] where each task |
6 | //! operates independently and may be executed on separate physical threads. The |
7 | //! synchronization primitives provided in this module permit these independent |
8 | //! tasks to communicate together. |
9 | //! |
10 | //! [tasks]: crate::task |
11 | //! |
12 | //! # Message passing |
13 | //! |
14 | //! The most common form of synchronization in a Tokio program is message |
15 | //! passing. Two tasks operate independently and send messages to each other to |
16 | //! synchronize. Doing so has the advantage of avoiding shared state. |
17 | //! |
18 | //! Message passing is implemented using channels. A channel supports sending a |
19 | //! message from one producer task to one or more consumer tasks. There are a |
20 | //! few flavors of channels provided by Tokio. Each channel flavor supports |
21 | //! different message passing patterns. When a channel supports multiple |
22 | //! producers, many separate tasks may **send** messages. When a channel |
23 | //! supports multiple consumers, many different separate tasks may **receive** |
24 | //! messages. |
25 | //! |
26 | //! Tokio provides many different channel flavors as different message passing |
27 | //! patterns are best handled with different implementations. |
28 | //! |
29 | //! ## `oneshot` channel |
30 | //! |
31 | //! The [`oneshot` channel][oneshot] supports sending a **single** value from a |
32 | //! single producer to a single consumer. This channel is usually used to send |
33 | //! the result of a computation to a waiter. |
34 | //! |
35 | //! **Example:** using a [`oneshot` channel][oneshot] to receive the result of a |
36 | //! computation. |
37 | //! |
38 | //! ``` |
39 | //! use tokio::sync::oneshot; |
40 | //! |
41 | //! async fn some_computation() -> String { |
42 | //! "represents the result of the computation" .to_string() |
43 | //! } |
44 | //! |
45 | //! #[tokio::main] |
46 | //! async fn main() { |
47 | //! let (tx, rx) = oneshot::channel(); |
48 | //! |
49 | //! tokio::spawn(async move { |
50 | //! let res = some_computation().await; |
51 | //! tx.send(res).unwrap(); |
52 | //! }); |
53 | //! |
54 | //! // Do other work while the computation is happening in the background |
55 | //! |
56 | //! // Wait for the computation result |
57 | //! let res = rx.await.unwrap(); |
58 | //! } |
59 | //! ``` |
60 | //! |
61 | //! Note, if the task produces a computation result as its final |
62 | //! action before terminating, the [`JoinHandle`] can be used to |
63 | //! receive that value instead of allocating resources for the |
64 | //! `oneshot` channel. Awaiting on [`JoinHandle`] returns `Result`. If |
65 | //! the task panics, the `JoinHandle` yields `Err` with the panic |
66 | //! cause. |
67 | //! |
68 | //! **Example:** |
69 | //! |
70 | //! ``` |
71 | //! async fn some_computation() -> String { |
72 | //! "the result of the computation" .to_string() |
73 | //! } |
74 | //! |
75 | //! #[tokio::main] |
76 | //! async fn main() { |
77 | //! let join_handle = tokio::spawn(async move { |
78 | //! some_computation().await |
79 | //! }); |
80 | //! |
81 | //! // Do other work while the computation is happening in the background |
82 | //! |
83 | //! // Wait for the computation result |
84 | //! let res = join_handle.await.unwrap(); |
85 | //! } |
86 | //! ``` |
87 | //! |
88 | //! [`JoinHandle`]: crate::task::JoinHandle |
89 | //! |
90 | //! ## `mpsc` channel |
91 | //! |
92 | //! The [`mpsc` channel][mpsc] supports sending **many** values from **many** |
93 | //! producers to a single consumer. This channel is often used to send work to a |
94 | //! task or to receive the result of many computations. |
95 | //! |
96 | //! This is also the channel you should use if you want to send many messages |
97 | //! from a single producer to a single consumer. There is no dedicated spsc |
98 | //! channel. |
99 | //! |
100 | //! **Example:** using an mpsc to incrementally stream the results of a series |
101 | //! of computations. |
102 | //! |
103 | //! ``` |
104 | //! use tokio::sync::mpsc; |
105 | //! |
106 | //! async fn some_computation(input: u32) -> String { |
107 | //! format!("the result of computation {}" , input) |
108 | //! } |
109 | //! |
110 | //! #[tokio::main] |
111 | //! async fn main() { |
112 | //! let (tx, mut rx) = mpsc::channel(100); |
113 | //! |
114 | //! tokio::spawn(async move { |
115 | //! for i in 0..10 { |
116 | //! let res = some_computation(i).await; |
117 | //! tx.send(res).await.unwrap(); |
118 | //! } |
119 | //! }); |
120 | //! |
121 | //! while let Some(res) = rx.recv().await { |
122 | //! println!("got = {}" , res); |
123 | //! } |
124 | //! } |
125 | //! ``` |
126 | //! |
127 | //! The argument to `mpsc::channel` is the channel capacity. This is the maximum |
128 | //! number of values that can be stored in the channel pending receipt at any |
129 | //! given time. Properly setting this value is key in implementing robust |
130 | //! programs as the channel capacity plays a critical part in handling back |
131 | //! pressure. |
132 | //! |
133 | //! A common concurrency pattern for resource management is to spawn a task |
134 | //! dedicated to managing that resource and using message passing between other |
135 | //! tasks to interact with the resource. The resource may be anything that may |
136 | //! not be concurrently used. Some examples include a socket and program state. |
137 | //! For example, if multiple tasks need to send data over a single socket, spawn |
138 | //! a task to manage the socket and use a channel to synchronize. |
139 | //! |
140 | //! **Example:** sending data from many tasks over a single socket using message |
141 | //! passing. |
142 | //! |
143 | //! ```no_run |
144 | //! use tokio::io::{self, AsyncWriteExt}; |
145 | //! use tokio::net::TcpStream; |
146 | //! use tokio::sync::mpsc; |
147 | //! |
148 | //! #[tokio::main] |
149 | //! async fn main() -> io::Result<()> { |
150 | //! let mut socket = TcpStream::connect("www.example.com:1234" ).await?; |
151 | //! let (tx, mut rx) = mpsc::channel(100); |
152 | //! |
153 | //! for _ in 0..10 { |
154 | //! // Each task needs its own `tx` handle. This is done by cloning the |
155 | //! // original handle. |
156 | //! let tx = tx.clone(); |
157 | //! |
158 | //! tokio::spawn(async move { |
159 | //! tx.send(&b"data to write" [..]).await.unwrap(); |
160 | //! }); |
161 | //! } |
162 | //! |
163 | //! // The `rx` half of the channel returns `None` once **all** `tx` clones |
164 | //! // drop. To ensure `None` is returned, drop the handle owned by the |
165 | //! // current task. If this `tx` handle is not dropped, there will always |
166 | //! // be a single outstanding `tx` handle. |
167 | //! drop(tx); |
168 | //! |
169 | //! while let Some(res) = rx.recv().await { |
170 | //! socket.write_all(res).await?; |
171 | //! } |
172 | //! |
173 | //! Ok(()) |
174 | //! } |
175 | //! ``` |
176 | //! |
177 | //! The [`mpsc`] and [`oneshot`] channels can be combined to provide a request / |
178 | //! response type synchronization pattern with a shared resource. A task is |
179 | //! spawned to synchronize a resource and waits on commands received on a |
180 | //! [`mpsc`] channel. Each command includes a [`oneshot`] `Sender` on which the |
181 | //! result of the command is sent. |
182 | //! |
183 | //! **Example:** use a task to synchronize a `u64` counter. Each task sends a |
184 | //! "fetch and increment" command. The counter value **before** the increment is |
185 | //! sent over the provided `oneshot` channel. |
186 | //! |
187 | //! ``` |
188 | //! use tokio::sync::{oneshot, mpsc}; |
189 | //! use Command::Increment; |
190 | //! |
191 | //! enum Command { |
192 | //! Increment, |
193 | //! // Other commands can be added here |
194 | //! } |
195 | //! |
196 | //! #[tokio::main] |
197 | //! async fn main() { |
198 | //! let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100); |
199 | //! |
200 | //! // Spawn a task to manage the counter |
201 | //! tokio::spawn(async move { |
202 | //! let mut counter: u64 = 0; |
203 | //! |
204 | //! while let Some((cmd, response)) = cmd_rx.recv().await { |
205 | //! match cmd { |
206 | //! Increment => { |
207 | //! let prev = counter; |
208 | //! counter += 1; |
209 | //! response.send(prev).unwrap(); |
210 | //! } |
211 | //! } |
212 | //! } |
213 | //! }); |
214 | //! |
215 | //! let mut join_handles = vec![]; |
216 | //! |
217 | //! // Spawn tasks that will send the increment command. |
218 | //! for _ in 0..10 { |
219 | //! let cmd_tx = cmd_tx.clone(); |
220 | //! |
221 | //! join_handles.push(tokio::spawn(async move { |
222 | //! let (resp_tx, resp_rx) = oneshot::channel(); |
223 | //! |
224 | //! cmd_tx.send((Increment, resp_tx)).await.ok().unwrap(); |
225 | //! let res = resp_rx.await.unwrap(); |
226 | //! |
227 | //! println!("previous value = {}" , res); |
228 | //! })); |
229 | //! } |
230 | //! |
231 | //! // Wait for all tasks to complete |
232 | //! for join_handle in join_handles.drain(..) { |
233 | //! join_handle.await.unwrap(); |
234 | //! } |
235 | //! } |
236 | //! ``` |
237 | //! |
238 | //! ## `broadcast` channel |
239 | //! |
240 | //! The [`broadcast` channel] supports sending **many** values from |
241 | //! **many** producers to **many** consumers. Each consumer will receive |
242 | //! **each** value. This channel can be used to implement "fan out" style |
243 | //! patterns common with pub / sub or "chat" systems. |
244 | //! |
245 | //! This channel tends to be used less often than `oneshot` and `mpsc` but still |
246 | //! has its use cases. |
247 | //! |
248 | //! This is also the channel you should use if you want to broadcast values from |
249 | //! a single producer to many consumers. There is no dedicated spmc broadcast |
250 | //! channel. |
251 | //! |
252 | //! **Example:** basic usage. |
253 | //! |
254 | //! ``` |
255 | //! use tokio::sync::broadcast; |
256 | //! |
257 | //! #[tokio::main] |
258 | //! async fn main() { |
259 | //! let (tx, mut rx1) = broadcast::channel(16); |
260 | //! let mut rx2 = tx.subscribe(); |
261 | //! |
262 | //! tokio::spawn(async move { |
263 | //! assert_eq!(rx1.recv().await.unwrap(), 10); |
264 | //! assert_eq!(rx1.recv().await.unwrap(), 20); |
265 | //! }); |
266 | //! |
267 | //! tokio::spawn(async move { |
268 | //! assert_eq!(rx2.recv().await.unwrap(), 10); |
269 | //! assert_eq!(rx2.recv().await.unwrap(), 20); |
270 | //! }); |
271 | //! |
272 | //! tx.send(10).unwrap(); |
273 | //! tx.send(20).unwrap(); |
274 | //! } |
275 | //! ``` |
276 | //! |
277 | //! [`broadcast` channel]: crate::sync::broadcast |
278 | //! |
279 | //! ## `watch` channel |
280 | //! |
281 | //! The [`watch` channel] supports sending **many** values from a **single** |
282 | //! producer to **many** consumers. However, only the **most recent** value is |
283 | //! stored in the channel. Consumers are notified when a new value is sent, but |
284 | //! there is no guarantee that consumers will see **all** values. |
285 | //! |
286 | //! The [`watch` channel] is similar to a [`broadcast` channel] with capacity 1. |
287 | //! |
288 | //! Use cases for the [`watch` channel] include broadcasting configuration |
289 | //! changes or signalling program state changes, such as transitioning to |
290 | //! shutdown. |
291 | //! |
292 | //! **Example:** use a [`watch` channel] to notify tasks of configuration |
293 | //! changes. In this example, a configuration file is checked periodically. When |
294 | //! the file changes, the configuration changes are signalled to consumers. |
295 | //! |
296 | //! ``` |
297 | //! use tokio::sync::watch; |
298 | //! use tokio::time::{self, Duration, Instant}; |
299 | //! |
300 | //! use std::io; |
301 | //! |
302 | //! #[derive(Debug, Clone, Eq, PartialEq)] |
303 | //! struct Config { |
304 | //! timeout: Duration, |
305 | //! } |
306 | //! |
307 | //! impl Config { |
308 | //! async fn load_from_file() -> io::Result<Config> { |
309 | //! // file loading and deserialization logic here |
310 | //! # Ok(Config { timeout: Duration::from_secs(1) }) |
311 | //! } |
312 | //! } |
313 | //! |
314 | //! async fn my_async_operation() { |
315 | //! // Do something here |
316 | //! } |
317 | //! |
318 | //! #[tokio::main] |
319 | //! async fn main() { |
320 | //! // Load initial configuration value |
321 | //! let mut config = Config::load_from_file().await.unwrap(); |
322 | //! |
323 | //! // Create the watch channel, initialized with the loaded configuration |
324 | //! let (tx, rx) = watch::channel(config.clone()); |
325 | //! |
326 | //! // Spawn a task to monitor the file. |
327 | //! tokio::spawn(async move { |
328 | //! loop { |
329 | //! // Wait 10 seconds between checks |
330 | //! time::sleep(Duration::from_secs(10)).await; |
331 | //! |
332 | //! // Load the configuration file |
333 | //! let new_config = Config::load_from_file().await.unwrap(); |
334 | //! |
335 | //! // If the configuration changed, send the new config value |
336 | //! // on the watch channel. |
337 | //! if new_config != config { |
338 | //! tx.send(new_config.clone()).unwrap(); |
339 | //! config = new_config; |
340 | //! } |
341 | //! } |
342 | //! }); |
343 | //! |
344 | //! let mut handles = vec![]; |
345 | //! |
346 | //! // Spawn tasks that run the async operation for at most `timeout`. If |
347 | //! // the timeout elapses, restart the operation. |
348 | //! // |
349 | //! // The task simultaneously watches the `Config` for changes. When the |
350 | //! // timeout duration changes, the timeout is updated without restarting |
351 | //! // the in-flight operation. |
352 | //! for _ in 0..5 { |
353 | //! // Clone a config watch handle for use in this task |
354 | //! let mut rx = rx.clone(); |
355 | //! |
356 | //! let handle = tokio::spawn(async move { |
357 | //! // Start the initial operation and pin the future to the stack. |
358 | //! // Pinning to the stack is required to resume the operation |
359 | //! // across multiple calls to `select!` |
360 | //! let op = my_async_operation(); |
361 | //! tokio::pin!(op); |
362 | //! |
363 | //! // Get the initial config value |
364 | //! let mut conf = rx.borrow().clone(); |
365 | //! |
366 | //! let mut op_start = Instant::now(); |
367 | //! let sleep = time::sleep_until(op_start + conf.timeout); |
368 | //! tokio::pin!(sleep); |
369 | //! |
370 | //! loop { |
371 | //! tokio::select! { |
372 | //! _ = &mut sleep => { |
373 | //! // The timeout elapsed. Restart the operation. |
374 | //! op.set(my_async_operation()); |
375 | //! |
376 | //! // Track the new start time |
377 | //! op_start = Instant::now(); |
378 | //! |
379 | //! // Restart the timeout |
380 | //! sleep.set(time::sleep_until(op_start + conf.timeout)); |
381 | //! } |
382 | //! _ = rx.changed() => { |
383 | //! conf = rx.borrow_and_update().clone(); |
384 | //! |
385 | //! // The configuration has been updated. Update the |
386 | //! // `sleep` using the new `timeout` value. |
387 | //! sleep.as_mut().reset(op_start + conf.timeout); |
388 | //! } |
389 | //! _ = &mut op => { |
390 | //! // The operation completed! |
391 | //! return |
392 | //! } |
393 | //! } |
394 | //! } |
395 | //! }); |
396 | //! |
397 | //! handles.push(handle); |
398 | //! } |
399 | //! |
400 | //! for handle in handles.drain(..) { |
401 | //! handle.await.unwrap(); |
402 | //! } |
403 | //! } |
404 | //! ``` |
405 | //! |
406 | //! [`watch` channel]: mod@crate::sync::watch |
407 | //! [`broadcast` channel]: mod@crate::sync::broadcast |
408 | //! |
409 | //! # State synchronization |
410 | //! |
411 | //! The remaining synchronization primitives focus on synchronizing state. |
412 | //! These are asynchronous equivalents to versions provided by `std`. They |
413 | //! operate in a similar way as their `std` counterparts but will wait |
414 | //! asynchronously instead of blocking the thread. |
415 | //! |
416 | //! * [`Barrier`] Ensures multiple tasks will wait for each other to reach a |
417 | //! point in the program, before continuing execution all together. |
418 | //! |
419 | //! * [`Mutex`] Mutual Exclusion mechanism, which ensures that at most one |
420 | //! thread at a time is able to access some data. |
421 | //! |
422 | //! * [`Notify`] Basic task notification. `Notify` supports notifying a |
423 | //! receiving task without sending data. In this case, the task wakes up and |
424 | //! resumes processing. |
425 | //! |
426 | //! * [`RwLock`] Provides a mutual exclusion mechanism which allows multiple |
427 | //! readers at the same time, while allowing only one writer at a time. In |
428 | //! some cases, this can be more efficient than a mutex. |
429 | //! |
430 | //! * [`Semaphore`] Limits the amount of concurrency. A semaphore holds a |
431 | //! number of permits, which tasks may request in order to enter a critical |
432 | //! section. Semaphores are useful for implementing limiting or bounding of |
433 | //! any kind. |
434 | |
435 | cfg_sync! { |
436 | /// Named future types. |
437 | pub mod futures { |
438 | pub use super::notify::Notified; |
439 | } |
440 | |
441 | mod barrier; |
442 | pub use barrier::{Barrier, BarrierWaitResult}; |
443 | |
444 | pub mod broadcast; |
445 | |
446 | pub mod mpsc; |
447 | |
448 | mod mutex; |
449 | pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard, OwnedMappedMutexGuard}; |
450 | |
451 | pub(crate) mod notify; |
452 | pub use notify::Notify; |
453 | |
454 | pub mod oneshot; |
455 | |
456 | pub(crate) mod batch_semaphore; |
457 | pub use batch_semaphore::{AcquireError, TryAcquireError}; |
458 | |
459 | mod semaphore; |
460 | pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit}; |
461 | |
462 | mod rwlock; |
463 | pub use rwlock::RwLock; |
464 | pub use rwlock::owned_read_guard::OwnedRwLockReadGuard; |
465 | pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard; |
466 | pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard; |
467 | pub use rwlock::read_guard::RwLockReadGuard; |
468 | pub use rwlock::write_guard::RwLockWriteGuard; |
469 | pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard; |
470 | |
471 | mod task; |
472 | pub(crate) use task::AtomicWaker; |
473 | |
474 | mod once_cell; |
475 | pub use self::once_cell::{OnceCell, SetError}; |
476 | |
477 | pub mod watch; |
478 | } |
479 | |
480 | cfg_not_sync! { |
481 | cfg_fs! { |
482 | pub(crate) mod batch_semaphore; |
483 | mod mutex; |
484 | pub(crate) use mutex::Mutex; |
485 | } |
486 | |
487 | #[cfg (any(feature = "rt" , feature = "signal" , all(unix, feature = "process" )))] |
488 | pub(crate) mod notify; |
489 | |
490 | #[cfg (any(feature = "rt" , all(windows, feature = "process" )))] |
491 | pub(crate) mod oneshot; |
492 | |
493 | cfg_atomic_waker_impl! { |
494 | mod task; |
495 | pub(crate) use task::AtomicWaker; |
496 | } |
497 | |
498 | #[cfg (any(feature = "signal" , all(unix, feature = "process" )))] |
499 | pub(crate) mod watch; |
500 | } |
501 | |
// Private submodule compiled only under `cfg(test)`; its contents live in a
// separate file.
/// Unit tests
#[cfg(test)]
mod tests;
505 | |