1#![cfg_attr(loom, allow(dead_code, unreachable_pub, unused_imports))]
2
3//! Synchronization primitives for use in asynchronous contexts.
4//!
5//! Tokio programs tend to be organized as a set of [tasks] where each task
6//! operates independently and may be executed on separate physical threads. The
7//! synchronization primitives provided in this module permit these independent
8//! tasks to communicate together.
9//!
10//! [tasks]: crate::task
11//!
12//! # Message passing
13//!
14//! The most common form of synchronization in a Tokio program is message
15//! passing. Two tasks operate independently and send messages to each other to
16//! synchronize. Doing so has the advantage of avoiding shared state.
17//!
18//! Message passing is implemented using channels. A channel supports sending a
19//! message from one producer task to one or more consumer tasks. There are a
20//! few flavors of channels provided by Tokio. Each channel flavor supports
21//! different message passing patterns. When a channel supports multiple
22//! producers, many separate tasks may **send** messages. When a channel
23//! supports multiple consumers, many different separate tasks may **receive**
24//! messages.
25//!
26//! Tokio provides many different channel flavors as different message passing
27//! patterns are best handled with different implementations.
28//!
29//! ## `oneshot` channel
30//!
31//! The [`oneshot` channel][oneshot] supports sending a **single** value from a
32//! single producer to a single consumer. This channel is usually used to send
33//! the result of a computation to a waiter.
34//!
35//! **Example:** using a [`oneshot` channel][oneshot] to receive the result of a
36//! computation.
37//!
38//! ```
39//! use tokio::sync::oneshot;
40//!
41//! async fn some_computation() -> String {
42//! "represents the result of the computation".to_string()
43//! }
44//!
45//! #[tokio::main]
46//! async fn main() {
47//! let (tx, rx) = oneshot::channel();
48//!
49//! tokio::spawn(async move {
50//! let res = some_computation().await;
51//! tx.send(res).unwrap();
52//! });
53//!
54//! // Do other work while the computation is happening in the background
55//!
56//! // Wait for the computation result
57//! let res = rx.await.unwrap();
58//! }
59//! ```
60//!
61//! Note, if the task produces a computation result as its final
62//! action before terminating, the [`JoinHandle`] can be used to
63//! receive that value instead of allocating resources for the
64//! `oneshot` channel. Awaiting on [`JoinHandle`] returns `Result`. If
65//! the task panics, the `Joinhandle` yields `Err` with the panic
66//! cause.
67//!
68//! **Example:**
69//!
70//! ```
71//! async fn some_computation() -> String {
72//! "the result of the computation".to_string()
73//! }
74//!
75//! #[tokio::main]
76//! async fn main() {
77//! let join_handle = tokio::spawn(async move {
78//! some_computation().await
79//! });
80//!
81//! // Do other work while the computation is happening in the background
82//!
83//! // Wait for the computation result
84//! let res = join_handle.await.unwrap();
85//! }
86//! ```
87//!
88//! [oneshot]: oneshot
89//! [`JoinHandle`]: crate::task::JoinHandle
90//!
91//! ## `mpsc` channel
92//!
93//! The [`mpsc` channel][mpsc] supports sending **many** values from **many**
94//! producers to a single consumer. This channel is often used to send work to a
95//! task or to receive the result of many computations.
96//!
97//! This is also the channel you should use if you want to send many messages
98//! from a single producer to a single consumer. There is no dedicated spsc
99//! channel.
100//!
101//! **Example:** using an mpsc to incrementally stream the results of a series
102//! of computations.
103//!
104//! ```
105//! use tokio::sync::mpsc;
106//!
107//! async fn some_computation(input: u32) -> String {
108//! format!("the result of computation {}", input)
109//! }
110//!
111//! #[tokio::main]
112//! async fn main() {
113//! let (tx, mut rx) = mpsc::channel(100);
114//!
115//! tokio::spawn(async move {
116//! for i in 0..10 {
117//! let res = some_computation(i).await;
118//! tx.send(res).await.unwrap();
119//! }
120//! });
121//!
122//! while let Some(res) = rx.recv().await {
123//! println!("got = {}", res);
124//! }
125//! }
126//! ```
127//!
128//! The argument to `mpsc::channel` is the channel capacity. This is the maximum
129//! number of values that can be stored in the channel pending receipt at any
130//! given time. Properly setting this value is key in implementing robust
131//! programs as the channel capacity plays a critical part in handling back
132//! pressure.
133//!
134//! A common concurrency pattern for resource management is to spawn a task
135//! dedicated to managing that resource and using message passing between other
136//! tasks to interact with the resource. The resource may be anything that may
137//! not be concurrently used. Some examples include a socket and program state.
138//! For example, if multiple tasks need to send data over a single socket, spawn
139//! a task to manage the socket and use a channel to synchronize.
140//!
141//! **Example:** sending data from many tasks over a single socket using message
142//! passing.
143//!
144//! ```no_run
145//! use tokio::io::{self, AsyncWriteExt};
146//! use tokio::net::TcpStream;
147//! use tokio::sync::mpsc;
148//!
149//! #[tokio::main]
150//! async fn main() -> io::Result<()> {
151//! let mut socket = TcpStream::connect("www.example.com:1234").await?;
152//! let (tx, mut rx) = mpsc::channel(100);
153//!
154//! for _ in 0..10 {
155//! // Each task needs its own `tx` handle. This is done by cloning the
156//! // original handle.
157//! let tx = tx.clone();
158//!
159//! tokio::spawn(async move {
160//! tx.send(&b"data to write"[..]).await.unwrap();
161//! });
162//! }
163//!
164//! // The `rx` half of the channel returns `None` once **all** `tx` clones
165//! // drop. To ensure `None` is returned, drop the handle owned by the
166//! // current task. If this `tx` handle is not dropped, there will always
167//! // be a single outstanding `tx` handle.
168//! drop(tx);
169//!
170//! while let Some(res) = rx.recv().await {
171//! socket.write_all(res).await?;
172//! }
173//!
174//! Ok(())
175//! }
176//! ```
177//!
178//! The [`mpsc`][mpsc] and [`oneshot`][oneshot] channels can be combined to
179//! provide a request / response type synchronization pattern with a shared
180//! resource. A task is spawned to synchronize a resource and waits on commands
181//! received on a [`mpsc`][mpsc] channel. Each command includes a
182//! [`oneshot`][oneshot] `Sender` on which the result of the command is sent.
183//!
184//! **Example:** use a task to synchronize a `u64` counter. Each task sends an
185//! "fetch and increment" command. The counter value **before** the increment is
186//! sent over the provided `oneshot` channel.
187//!
188//! ```
189//! use tokio::sync::{oneshot, mpsc};
190//! use Command::Increment;
191//!
192//! enum Command {
193//! Increment,
194//! // Other commands can be added here
195//! }
196//!
197//! #[tokio::main]
198//! async fn main() {
199//! let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100);
200//!
201//! // Spawn a task to manage the counter
202//! tokio::spawn(async move {
203//! let mut counter: u64 = 0;
204//!
205//! while let Some((cmd, response)) = cmd_rx.recv().await {
206//! match cmd {
207//! Increment => {
208//! let prev = counter;
209//! counter += 1;
210//! response.send(prev).unwrap();
211//! }
212//! }
213//! }
214//! });
215//!
216//! let mut join_handles = vec![];
217//!
218//! // Spawn tasks that will send the increment command.
219//! for _ in 0..10 {
220//! let cmd_tx = cmd_tx.clone();
221//!
222//! join_handles.push(tokio::spawn(async move {
223//! let (resp_tx, resp_rx) = oneshot::channel();
224//!
225//! cmd_tx.send((Increment, resp_tx)).await.ok().unwrap();
226//! let res = resp_rx.await.unwrap();
227//!
228//! println!("previous value = {}", res);
229//! }));
230//! }
231//!
232//! // Wait for all tasks to complete
233//! for join_handle in join_handles.drain(..) {
234//! join_handle.await.unwrap();
235//! }
236//! }
237//! ```
238//!
239//! [mpsc]: mpsc
240//!
241//! ## `broadcast` channel
242//!
243//! The [`broadcast` channel] supports sending **many** values from
244//! **many** producers to **many** consumers. Each consumer will receive
245//! **each** value. This channel can be used to implement "fan out" style
246//! patterns common with pub / sub or "chat" systems.
247//!
248//! This channel tends to be used less often than `oneshot` and `mpsc` but still
249//! has its use cases.
250//!
251//! This is also the channel you should use if you want to broadcast values from
252//! a single producer to many consumers. There is no dedicated spmc broadcast
253//! channel.
254//!
255//! Basic usage
256//!
257//! ```
258//! use tokio::sync::broadcast;
259//!
260//! #[tokio::main]
261//! async fn main() {
262//! let (tx, mut rx1) = broadcast::channel(16);
263//! let mut rx2 = tx.subscribe();
264//!
265//! tokio::spawn(async move {
266//! assert_eq!(rx1.recv().await.unwrap(), 10);
267//! assert_eq!(rx1.recv().await.unwrap(), 20);
268//! });
269//!
270//! tokio::spawn(async move {
271//! assert_eq!(rx2.recv().await.unwrap(), 10);
272//! assert_eq!(rx2.recv().await.unwrap(), 20);
273//! });
274//!
275//! tx.send(10).unwrap();
276//! tx.send(20).unwrap();
277//! }
278//! ```
279//!
280//! [`broadcast` channel]: crate::sync::broadcast
281//!
282//! ## `watch` channel
283//!
284//! The [`watch` channel] supports sending **many** values from a **single**
285//! producer to **many** consumers. However, only the **most recent** value is
286//! stored in the channel. Consumers are notified when a new value is sent, but
287//! there is no guarantee that consumers will see **all** values.
288//!
289//! The [`watch` channel] is similar to a [`broadcast` channel] with capacity 1.
290//!
291//! Use cases for the [`watch` channel] include broadcasting configuration
292//! changes or signalling program state changes, such as transitioning to
293//! shutdown.
294//!
295//! **Example:** use a [`watch` channel] to notify tasks of configuration
296//! changes. In this example, a configuration file is checked periodically. When
297//! the file changes, the configuration changes are signalled to consumers.
298//!
299//! ```
300//! use tokio::sync::watch;
301//! use tokio::time::{self, Duration, Instant};
302//!
303//! use std::io;
304//!
305//! #[derive(Debug, Clone, Eq, PartialEq)]
306//! struct Config {
307//! timeout: Duration,
308//! }
309//!
310//! impl Config {
311//! async fn load_from_file() -> io::Result<Config> {
312//! // file loading and deserialization logic here
313//! # Ok(Config { timeout: Duration::from_secs(1) })
314//! }
315//! }
316//!
317//! async fn my_async_operation() {
318//! // Do something here
319//! }
320//!
321//! #[tokio::main]
322//! async fn main() {
323//! // Load initial configuration value
324//! let mut config = Config::load_from_file().await.unwrap();
325//!
326//! // Create the watch channel, initialized with the loaded configuration
327//! let (tx, rx) = watch::channel(config.clone());
328//!
329//! // Spawn a task to monitor the file.
330//! tokio::spawn(async move {
331//! loop {
332//! // Wait 10 seconds between checks
333//! time::sleep(Duration::from_secs(10)).await;
334//!
335//! // Load the configuration file
336//! let new_config = Config::load_from_file().await.unwrap();
337//!
338//! // If the configuration changed, send the new config value
339//! // on the watch channel.
340//! if new_config != config {
341//! tx.send(new_config.clone()).unwrap();
342//! config = new_config;
343//! }
344//! }
345//! });
346//!
347//! let mut handles = vec![];
348//!
349//! // Spawn tasks that runs the async operation for at most `timeout`. If
350//! // the timeout elapses, restart the operation.
351//! //
352//! // The task simultaneously watches the `Config` for changes. When the
353//! // timeout duration changes, the timeout is updated without restarting
354//! // the in-flight operation.
355//! for _ in 0..5 {
356//! // Clone a config watch handle for use in this task
357//! let mut rx = rx.clone();
358//!
359//! let handle = tokio::spawn(async move {
360//! // Start the initial operation and pin the future to the stack.
361//! // Pinning to the stack is required to resume the operation
362//! // across multiple calls to `select!`
363//! let op = my_async_operation();
364//! tokio::pin!(op);
365//!
366//! // Get the initial config value
367//! let mut conf = rx.borrow().clone();
368//!
369//! let mut op_start = Instant::now();
370//! let sleep = time::sleep_until(op_start + conf.timeout);
371//! tokio::pin!(sleep);
372//!
373//! loop {
374//! tokio::select! {
375//! _ = &mut sleep => {
376//! // The operation elapsed. Restart it
377//! op.set(my_async_operation());
378//!
379//! // Track the new start time
380//! op_start = Instant::now();
381//!
382//! // Restart the timeout
383//! sleep.set(time::sleep_until(op_start + conf.timeout));
384//! }
385//! _ = rx.changed() => {
386//! conf = rx.borrow().clone();
387//!
388//! // The configuration has been updated. Update the
389//! // `sleep` using the new `timeout` value.
390//! sleep.as_mut().reset(op_start + conf.timeout);
391//! }
392//! _ = &mut op => {
393//! // The operation completed!
394//! return
395//! }
396//! }
397//! }
398//! });
399//!
400//! handles.push(handle);
401//! }
402//!
403//! for handle in handles.drain(..) {
404//! handle.await.unwrap();
405//! }
406//! }
407//! ```
408//!
409//! [`watch` channel]: mod@crate::sync::watch
410//! [`broadcast` channel]: mod@crate::sync::broadcast
411//!
412//! # State synchronization
413//!
414//! The remaining synchronization primitives focus on synchronizing state.
415//! These are asynchronous equivalents to versions provided by `std`. They
416//! operate in a similar way as their `std` counterparts but will wait
417//! asynchronously instead of blocking the thread.
418//!
419//! * [`Barrier`](Barrier) Ensures multiple tasks will wait for each other to
420//! reach a point in the program, before continuing execution all together.
421//!
422//! * [`Mutex`](Mutex) Mutual Exclusion mechanism, which ensures that at most
423//! one thread at a time is able to access some data.
424//!
425//! * [`Notify`](Notify) Basic task notification. `Notify` supports notifying a
426//! receiving task without sending data. In this case, the task wakes up and
427//! resumes processing.
428//!
429//! * [`RwLock`](RwLock) Provides a mutual exclusion mechanism which allows
430//! multiple readers at the same time, while allowing only one writer at a
431//! time. In some cases, this can be more efficient than a mutex.
432//!
433//! * [`Semaphore`](Semaphore) Limits the amount of concurrency. A semaphore
434//! holds a number of permits, which tasks may request in order to enter a
435//! critical section. Semaphores are useful for implementing limiting or
436//! bounding of any kind.
437
438cfg_sync! {
439 /// Named future types.
440 pub mod futures {
441 pub use super::notify::Notified;
442 }
443
444 mod barrier;
445 pub use barrier::{Barrier, BarrierWaitResult};
446
447 pub mod broadcast;
448
449 pub mod mpsc;
450
451 mod mutex;
452 pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard, OwnedMappedMutexGuard};
453
454 pub(crate) mod notify;
455 pub use notify::Notify;
456
457 pub mod oneshot;
458
459 pub(crate) mod batch_semaphore;
460 pub use batch_semaphore::{AcquireError, TryAcquireError};
461
462 mod semaphore;
463 pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};
464
465 mod rwlock;
466 pub use rwlock::RwLock;
467 pub use rwlock::owned_read_guard::OwnedRwLockReadGuard;
468 pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard;
469 pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard;
470 pub use rwlock::read_guard::RwLockReadGuard;
471 pub use rwlock::write_guard::RwLockWriteGuard;
472 pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard;
473
474 mod task;
475 pub(crate) use task::AtomicWaker;
476
477 mod once_cell;
478 pub use self::once_cell::{OnceCell, SetError};
479
480 pub mod watch;
481}
482
483cfg_not_sync! {
484 cfg_fs! {
485 pub(crate) mod batch_semaphore;
486 mod mutex;
487 pub(crate) use mutex::Mutex;
488 }
489
490 #[cfg(any(feature = "rt", feature = "signal", all(unix, feature = "process")))]
491 pub(crate) mod notify;
492
493 #[cfg(any(feature = "rt", all(windows, feature = "process")))]
494 pub(crate) mod oneshot;
495
496 cfg_atomic_waker_impl! {
497 mod task;
498 pub(crate) use task::AtomicWaker;
499 }
500
501 #[cfg(any(feature = "signal", all(unix, feature = "process")))]
502 pub(crate) mod watch;
503}
504
505/// Unit tests
506#[cfg(test)]
507mod tests;
508