| 1 | #![cfg_attr (loom, allow(dead_code, unreachable_pub, unused_imports))] |
| 2 | |
| 3 | //! Synchronization primitives for use in asynchronous contexts. |
| 4 | //! |
| 5 | //! Tokio programs tend to be organized as a set of [tasks] where each task |
| 6 | //! operates independently and may be executed on separate physical threads. The |
| 7 | //! synchronization primitives provided in this module permit these independent |
| 8 | //! tasks to communicate together. |
| 9 | //! |
| 10 | //! [tasks]: crate::task |
| 11 | //! |
| 12 | //! # Message passing |
| 13 | //! |
| 14 | //! The most common form of synchronization in a Tokio program is message |
| 15 | //! passing. Two tasks operate independently and send messages to each other to |
| 16 | //! synchronize. Doing so has the advantage of avoiding shared state. |
| 17 | //! |
| 18 | //! Message passing is implemented using channels. A channel supports sending a |
| 19 | //! message from one producer task to one or more consumer tasks. There are a |
| 20 | //! few flavors of channels provided by Tokio. Each channel flavor supports |
| 21 | //! different message passing patterns. When a channel supports multiple |
| 22 | //! producers, many separate tasks may **send** messages. When a channel |
| 23 | //! supports multiple consumers, many different separate tasks may **receive** |
| 24 | //! messages. |
| 25 | //! |
| 26 | //! Tokio provides many different channel flavors as different message passing |
| 27 | //! patterns are best handled with different implementations. |
| 28 | //! |
| 29 | //! ## `oneshot` channel |
| 30 | //! |
| 31 | //! The [`oneshot` channel][oneshot] supports sending a **single** value from a |
| 32 | //! single producer to a single consumer. This channel is usually used to send |
| 33 | //! the result of a computation to a waiter. |
| 34 | //! |
| 35 | //! **Example:** using a [`oneshot` channel][oneshot] to receive the result of a |
| 36 | //! computation. |
| 37 | //! |
| 38 | //! ``` |
| 39 | //! use tokio::sync::oneshot; |
| 40 | //! |
| 41 | //! async fn some_computation() -> String { |
| 42 | //! "represents the result of the computation" .to_string() |
| 43 | //! } |
| 44 | //! |
| 45 | //! #[tokio::main] |
| 46 | //! async fn main() { |
| 47 | //! let (tx, rx) = oneshot::channel(); |
| 48 | //! |
| 49 | //! tokio::spawn(async move { |
| 50 | //! let res = some_computation().await; |
| 51 | //! tx.send(res).unwrap(); |
| 52 | //! }); |
| 53 | //! |
| 54 | //! // Do other work while the computation is happening in the background |
| 55 | //! |
| 56 | //! // Wait for the computation result |
| 57 | //! let res = rx.await.unwrap(); |
| 58 | //! } |
| 59 | //! ``` |
| 60 | //! |
| 61 | //! Note, if the task produces a computation result as its final |
| 62 | //! action before terminating, the [`JoinHandle`] can be used to |
| 63 | //! receive that value instead of allocating resources for the |
| 64 | //! `oneshot` channel. Awaiting on [`JoinHandle`] returns `Result`. If |
| 65 | //! the task panics, the [`JoinHandle`] yields `Err` with the panic |
| 66 | //! cause. |
| 67 | //! |
| 68 | //! **Example:** |
| 69 | //! |
| 70 | //! ``` |
| 71 | //! async fn some_computation() -> String { |
| 72 | //! "the result of the computation" .to_string() |
| 73 | //! } |
| 74 | //! |
| 75 | //! #[tokio::main] |
| 76 | //! async fn main() { |
| 77 | //! let join_handle = tokio::spawn(async move { |
| 78 | //! some_computation().await |
| 79 | //! }); |
| 80 | //! |
| 81 | //! // Do other work while the computation is happening in the background |
| 82 | //! |
| 83 | //! // Wait for the computation result |
| 84 | //! let res = join_handle.await.unwrap(); |
| 85 | //! } |
| 86 | //! ``` |
| 87 | //! |
| 88 | //! [`JoinHandle`]: crate::task::JoinHandle |
| 89 | //! |
| 90 | //! ## `mpsc` channel |
| 91 | //! |
| 92 | //! The [`mpsc` channel][mpsc] supports sending **many** values from **many** |
| 93 | //! producers to a single consumer. This channel is often used to send work to a |
| 94 | //! task or to receive the result of many computations. |
| 95 | //! |
| 96 | //! This is also the channel you should use if you want to send many messages |
| 97 | //! from a single producer to a single consumer. There is no dedicated spsc |
| 98 | //! channel. |
| 99 | //! |
| 100 | //! **Example:** using an mpsc to incrementally stream the results of a series |
| 101 | //! of computations. |
| 102 | //! |
| 103 | //! ``` |
| 104 | //! use tokio::sync::mpsc; |
| 105 | //! |
| 106 | //! async fn some_computation(input: u32) -> String { |
| 107 | //! format!("the result of computation {}" , input) |
| 108 | //! } |
| 109 | //! |
| 110 | //! #[tokio::main] |
| 111 | //! async fn main() { |
| 112 | //! let (tx, mut rx) = mpsc::channel(100); |
| 113 | //! |
| 114 | //! tokio::spawn(async move { |
| 115 | //! for i in 0..10 { |
| 116 | //! let res = some_computation(i).await; |
| 117 | //! tx.send(res).await.unwrap(); |
| 118 | //! } |
| 119 | //! }); |
| 120 | //! |
| 121 | //! while let Some(res) = rx.recv().await { |
| 122 | //! println!("got = {}" , res); |
| 123 | //! } |
| 124 | //! } |
| 125 | //! ``` |
| 126 | //! |
| 127 | //! The argument to `mpsc::channel` is the channel capacity. This is the maximum |
| 128 | //! number of values that can be stored in the channel pending receipt at any |
| 129 | //! given time. Properly setting this value is key in implementing robust |
| 130 | //! programs as the channel capacity plays a critical part in handling back |
| 131 | //! pressure. |
| 132 | //! |
| 133 | //! A common concurrency pattern for resource management is to spawn a task |
| 134 | //! dedicated to managing that resource and using message passing between other |
| 135 | //! tasks to interact with the resource. The resource may be anything that may |
| 136 | //! not be concurrently used. Some examples include a socket and program state. |
| 137 | //! For example, if multiple tasks need to send data over a single socket, spawn |
| 138 | //! a task to manage the socket and use a channel to synchronize. |
| 139 | //! |
| 140 | //! **Example:** sending data from many tasks over a single socket using message |
| 141 | //! passing. |
| 142 | //! |
| 143 | //! ```no_run |
| 144 | //! use tokio::io::{self, AsyncWriteExt}; |
| 145 | //! use tokio::net::TcpStream; |
| 146 | //! use tokio::sync::mpsc; |
| 147 | //! |
| 148 | //! #[tokio::main] |
| 149 | //! async fn main() -> io::Result<()> { |
| 150 | //! let mut socket = TcpStream::connect("www.example.com:1234" ).await?; |
| 151 | //! let (tx, mut rx) = mpsc::channel(100); |
| 152 | //! |
| 153 | //! for _ in 0..10 { |
| 154 | //! // Each task needs its own `tx` handle. This is done by cloning the |
| 155 | //! // original handle. |
| 156 | //! let tx = tx.clone(); |
| 157 | //! |
| 158 | //! tokio::spawn(async move { |
| 159 | //! tx.send(&b"data to write" [..]).await.unwrap(); |
| 160 | //! }); |
| 161 | //! } |
| 162 | //! |
| 163 | //! // The `rx` half of the channel returns `None` once **all** `tx` clones |
| 164 | //! // drop. To ensure `None` is returned, drop the handle owned by the |
| 165 | //! // current task. If this `tx` handle is not dropped, there will always |
| 166 | //! // be a single outstanding `tx` handle. |
| 167 | //! drop(tx); |
| 168 | //! |
| 169 | //! while let Some(res) = rx.recv().await { |
| 170 | //! socket.write_all(res).await?; |
| 171 | //! } |
| 172 | //! |
| 173 | //! Ok(()) |
| 174 | //! } |
| 175 | //! ``` |
| 176 | //! |
| 177 | //! The [`mpsc`] and [`oneshot`] channels can be combined to provide a request / |
| 178 | //! response type synchronization pattern with a shared resource. A task is |
| 179 | //! spawned to synchronize a resource and waits on commands received on a |
| 180 | //! [`mpsc`] channel. Each command includes a [`oneshot`] `Sender` on which the |
| 181 | //! result of the command is sent. |
| 182 | //! |
| 183 | //! **Example:** use a task to synchronize a `u64` counter. Each task sends a |
| 184 | //! "fetch and increment" command. The counter value **before** the increment is |
| 185 | //! sent over the provided `oneshot` channel. |
| 186 | //! |
| 187 | //! ``` |
| 188 | //! use tokio::sync::{oneshot, mpsc}; |
| 189 | //! use Command::Increment; |
| 190 | //! |
| 191 | //! enum Command { |
| 192 | //! Increment, |
| 193 | //! // Other commands can be added here |
| 194 | //! } |
| 195 | //! |
| 196 | //! #[tokio::main] |
| 197 | //! async fn main() { |
| 198 | //! let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100); |
| 199 | //! |
| 200 | //! // Spawn a task to manage the counter |
| 201 | //! tokio::spawn(async move { |
| 202 | //! let mut counter: u64 = 0; |
| 203 | //! |
| 204 | //! while let Some((cmd, response)) = cmd_rx.recv().await { |
| 205 | //! match cmd { |
| 206 | //! Increment => { |
| 207 | //! let prev = counter; |
| 208 | //! counter += 1; |
| 209 | //! response.send(prev).unwrap(); |
| 210 | //! } |
| 211 | //! } |
| 212 | //! } |
| 213 | //! }); |
| 214 | //! |
| 215 | //! let mut join_handles = vec![]; |
| 216 | //! |
| 217 | //! // Spawn tasks that will send the increment command. |
| 218 | //! for _ in 0..10 { |
| 219 | //! let cmd_tx = cmd_tx.clone(); |
| 220 | //! |
| 221 | //! join_handles.push(tokio::spawn(async move { |
| 222 | //! let (resp_tx, resp_rx) = oneshot::channel(); |
| 223 | //! |
| 224 | //! cmd_tx.send((Increment, resp_tx)).await.ok().unwrap(); |
| 225 | //! let res = resp_rx.await.unwrap(); |
| 226 | //! |
| 227 | //! println!("previous value = {}" , res); |
| 228 | //! })); |
| 229 | //! } |
| 230 | //! |
| 231 | //! // Wait for all tasks to complete |
| 232 | //! for join_handle in join_handles.drain(..) { |
| 233 | //! join_handle.await.unwrap(); |
| 234 | //! } |
| 235 | //! } |
| 236 | //! ``` |
| 237 | //! |
| 238 | //! ## `broadcast` channel |
| 239 | //! |
| 240 | //! The [`broadcast` channel] supports sending **many** values from |
| 241 | //! **many** producers to **many** consumers. Each consumer will receive |
| 242 | //! **each** value. This channel can be used to implement "fan out" style |
| 243 | //! patterns common with pub / sub or "chat" systems. |
| 244 | //! |
| 245 | //! This channel tends to be used less often than `oneshot` and `mpsc` but still |
| 246 | //! has its use cases. |
| 247 | //! |
| 248 | //! This is also the channel you should use if you want to broadcast values from |
| 249 | //! a single producer to many consumers. There is no dedicated spmc broadcast |
| 250 | //! channel. |
| 251 | //! |
| 252 | //! **Example:** basic usage of a [`broadcast` channel]. |
| 253 | //! |
| 254 | //! ``` |
| 255 | //! use tokio::sync::broadcast; |
| 256 | //! |
| 257 | //! #[tokio::main] |
| 258 | //! async fn main() { |
| 259 | //! let (tx, mut rx1) = broadcast::channel(16); |
| 260 | //! let mut rx2 = tx.subscribe(); |
| 261 | //! |
| 262 | //! tokio::spawn(async move { |
| 263 | //! assert_eq!(rx1.recv().await.unwrap(), 10); |
| 264 | //! assert_eq!(rx1.recv().await.unwrap(), 20); |
| 265 | //! }); |
| 266 | //! |
| 267 | //! tokio::spawn(async move { |
| 268 | //! assert_eq!(rx2.recv().await.unwrap(), 10); |
| 269 | //! assert_eq!(rx2.recv().await.unwrap(), 20); |
| 270 | //! }); |
| 271 | //! |
| 272 | //! tx.send(10).unwrap(); |
| 273 | //! tx.send(20).unwrap(); |
| 274 | //! } |
| 275 | //! ``` |
| 276 | //! |
| 277 | //! [`broadcast` channel]: crate::sync::broadcast |
| 278 | //! |
| 279 | //! ## `watch` channel |
| 280 | //! |
| 281 | //! The [`watch` channel] supports sending **many** values from **many** |
| 282 | //! producers to **many** consumers. However, only the **most recent** value is |
| 283 | //! stored in the channel. Consumers are notified when a new value is sent, but |
| 284 | //! there is no guarantee that consumers will see **all** values. |
| 285 | //! |
| 286 | //! The [`watch` channel] is similar to a [`broadcast` channel] with capacity 1. |
| 287 | //! |
| 288 | //! Use cases for the [`watch` channel] include broadcasting configuration |
| 289 | //! changes or signalling program state changes, such as transitioning to |
| 290 | //! shutdown. |
| 291 | //! |
| 292 | //! **Example:** use a [`watch` channel] to notify tasks of configuration |
| 293 | //! changes. In this example, a configuration file is checked periodically. When |
| 294 | //! the file changes, the configuration changes are signalled to consumers. |
| 295 | //! |
| 296 | //! ``` |
| 297 | //! use tokio::sync::watch; |
| 298 | //! use tokio::time::{self, Duration, Instant}; |
| 299 | //! |
| 300 | //! use std::io; |
| 301 | //! |
| 302 | //! #[derive(Debug, Clone, Eq, PartialEq)] |
| 303 | //! struct Config { |
| 304 | //! timeout: Duration, |
| 305 | //! } |
| 306 | //! |
| 307 | //! impl Config { |
| 308 | //! async fn load_from_file() -> io::Result<Config> { |
| 309 | //! // file loading and deserialization logic here |
| 310 | //! # Ok(Config { timeout: Duration::from_secs(1) }) |
| 311 | //! } |
| 312 | //! } |
| 313 | //! |
| 314 | //! async fn my_async_operation() { |
| 315 | //! // Do something here |
| 316 | //! } |
| 317 | //! |
| 318 | //! #[tokio::main] |
| 319 | //! async fn main() { |
| 320 | //! // Load initial configuration value |
| 321 | //! let mut config = Config::load_from_file().await.unwrap(); |
| 322 | //! |
| 323 | //! // Create the watch channel, initialized with the loaded configuration |
| 324 | //! let (tx, rx) = watch::channel(config.clone()); |
| 325 | //! |
| 326 | //! // Spawn a task to monitor the file. |
| 327 | //! tokio::spawn(async move { |
| 328 | //! loop { |
| 329 | //! // Wait 10 seconds between checks |
| 330 | //! time::sleep(Duration::from_secs(10)).await; |
| 331 | //! |
| 332 | //! // Load the configuration file |
| 333 | //! let new_config = Config::load_from_file().await.unwrap(); |
| 334 | //! |
| 335 | //! // If the configuration changed, send the new config value |
| 336 | //! // on the watch channel. |
| 337 | //! if new_config != config { |
| 338 | //! tx.send(new_config.clone()).unwrap(); |
| 339 | //! config = new_config; |
| 340 | //! } |
| 341 | //! } |
| 342 | //! }); |
| 343 | //! |
| 344 | //! let mut handles = vec![]; |
| 345 | //! |
| 346 | //! // Spawn tasks that run the async operation for at most `timeout`. If |
| 347 | //! // the timeout elapses, restart the operation. |
| 348 | //! // |
| 349 | //! // The task simultaneously watches the `Config` for changes. When the |
| 350 | //! // timeout duration changes, the timeout is updated without restarting |
| 351 | //! // the in-flight operation. |
| 352 | //! for _ in 0..5 { |
| 353 | //! // Clone a config watch handle for use in this task |
| 354 | //! let mut rx = rx.clone(); |
| 355 | //! |
| 356 | //! let handle = tokio::spawn(async move { |
| 357 | //! // Start the initial operation and pin the future to the stack. |
| 358 | //! // Pinning to the stack is required to resume the operation |
| 359 | //! // across multiple calls to `select!` |
| 360 | //! let op = my_async_operation(); |
| 361 | //! tokio::pin!(op); |
| 362 | //! |
| 363 | //! // Get the initial config value |
| 364 | //! let mut conf = rx.borrow().clone(); |
| 365 | //! |
| 366 | //! let mut op_start = Instant::now(); |
| 367 | //! let sleep = time::sleep_until(op_start + conf.timeout); |
| 368 | //! tokio::pin!(sleep); |
| 369 | //! |
| 370 | //! loop { |
| 371 | //! tokio::select! { |
| 372 | //! _ = &mut sleep => { |
| 373 | //! // The timeout elapsed before the operation completed. Restart it |
| 374 | //! op.set(my_async_operation()); |
| 375 | //! |
| 376 | //! // Track the new start time |
| 377 | //! op_start = Instant::now(); |
| 378 | //! |
| 379 | //! // Restart the timeout |
| 380 | //! sleep.set(time::sleep_until(op_start + conf.timeout)); |
| 381 | //! } |
| 382 | //! _ = rx.changed() => { |
| 383 | //! conf = rx.borrow_and_update().clone(); |
| 384 | //! |
| 385 | //! // The configuration has been updated. Update the |
| 386 | //! // `sleep` using the new `timeout` value. |
| 387 | //! sleep.as_mut().reset(op_start + conf.timeout); |
| 388 | //! } |
| 389 | //! _ = &mut op => { |
| 390 | //! // The operation completed! |
| 391 | //! return |
| 392 | //! } |
| 393 | //! } |
| 394 | //! } |
| 395 | //! }); |
| 396 | //! |
| 397 | //! handles.push(handle); |
| 398 | //! } |
| 399 | //! |
| 400 | //! for handle in handles.drain(..) { |
| 401 | //! handle.await.unwrap(); |
| 402 | //! } |
| 403 | //! } |
| 404 | //! ``` |
| 405 | //! |
| 406 | //! [`watch` channel]: mod@crate::sync::watch |
| 407 | //! [`broadcast` channel]: mod@crate::sync::broadcast |
| 408 | //! |
| 409 | //! # State synchronization |
| 410 | //! |
| 411 | //! The remaining synchronization primitives focus on synchronizing state. |
| 412 | //! These are asynchronous equivalents to versions provided by `std`. They |
| 413 | //! operate in a similar way as their `std` counterparts but will wait |
| 414 | //! asynchronously instead of blocking the thread. |
| 415 | //! |
| 416 | //! * [`Barrier`] Ensures multiple tasks will wait for each other to reach a |
| 417 | //! point in the program, before continuing execution all together. |
| 418 | //! |
| 419 | //! * [`Mutex`] Mutual Exclusion mechanism, which ensures that at most one |
| 420 | //! thread at a time is able to access some data. |
| 421 | //! |
| 422 | //! * [`Notify`] Basic task notification. `Notify` supports notifying a |
| 423 | //! receiving task without sending data. In this case, the task wakes up and |
| 424 | //! resumes processing. |
| 425 | //! |
| 426 | //! * [`RwLock`] Provides a mutual exclusion mechanism which allows multiple |
| 427 | //! readers at the same time, while allowing only one writer at a time. In |
| 428 | //! some cases, this can be more efficient than a mutex. |
| 429 | //! |
| 430 | //! * [`Semaphore`] Limits the amount of concurrency. A semaphore holds a |
| 431 | //! number of permits, which tasks may request in order to enter a critical |
| 432 | //! section. Semaphores are useful for implementing limiting or bounding of |
| 433 | //! any kind. |
| 434 | //! |
| 435 | //! # Runtime compatibility |
| 436 | //! |
| 437 | //! All synchronization primitives provided in this module are runtime agnostic. |
| 438 | //! You can freely move them between different instances of the Tokio runtime |
| 439 | //! or even use them from non-Tokio runtimes. |
| 440 | //! |
| 441 | //! When used in a Tokio runtime, the synchronization primitives participate in |
| 442 | //! [cooperative scheduling](crate::task::coop#cooperative-scheduling) to avoid |
| 443 | //! starvation. This feature does not apply when used from non-Tokio runtimes. |
| 444 | //! |
| 445 | //! As an exception, methods ending in `_timeout` are not runtime agnostic |
| 446 | //! because they require access to the Tokio timer. See the documentation of |
| 447 | //! each `*_timeout` method for more information on its use. |
| 448 | |
| 449 | cfg_sync! { |
| 450 | /// Named future types. |
| 451 | pub mod futures { |
| 452 | pub use super::notify::Notified; |
| 453 | } |
| 454 | |
| 455 | mod barrier; |
| 456 | pub use barrier::{Barrier, BarrierWaitResult}; |
| 457 | |
| 458 | pub mod broadcast; |
| 459 | |
| 460 | pub mod mpsc; |
| 461 | |
| 462 | mod mutex; |
| 463 | pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard, OwnedMappedMutexGuard}; |
| 464 | |
| 465 | pub(crate) mod notify; |
| 466 | pub use notify::Notify; |
| 467 | |
| 468 | pub mod oneshot; |
| 469 | |
| 470 | pub(crate) mod batch_semaphore; |
| 471 | pub use batch_semaphore::{AcquireError, TryAcquireError}; |
| 472 | |
| 473 | mod semaphore; |
| 474 | pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit}; |
| 475 | |
| 476 | mod rwlock; |
| 477 | pub use rwlock::RwLock; |
| 478 | pub use rwlock::owned_read_guard::OwnedRwLockReadGuard; |
| 479 | pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard; |
| 480 | pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard; |
| 481 | pub use rwlock::read_guard::RwLockReadGuard; |
| 482 | pub use rwlock::write_guard::RwLockWriteGuard; |
| 483 | pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard; |
| 484 | |
| 485 | mod task; |
| 486 | pub(crate) use task::AtomicWaker; |
| 487 | |
| 488 | mod once_cell; |
| 489 | pub use self::once_cell::{OnceCell, SetError}; |
| 490 | |
| 491 | pub mod watch; |
| 492 | } |
| 493 | |
| 494 | cfg_not_sync! { |
| 495 | cfg_fs! { |
| 496 | pub(crate) mod batch_semaphore; |
| 497 | mod mutex; |
| 498 | pub(crate) use mutex::Mutex; |
| 499 | } |
| 500 | |
| 501 | #[cfg (any(feature = "rt" , feature = "signal" , all(unix, feature = "process" )))] |
| 502 | pub(crate) mod notify; |
| 503 | |
| 504 | #[cfg (any(feature = "rt" , all(windows, feature = "process" )))] |
| 505 | pub(crate) mod oneshot; |
| 506 | |
| 507 | cfg_atomic_waker_impl! { |
| 508 | mod task; |
| 509 | pub(crate) use task::AtomicWaker; |
| 510 | } |
| 511 | |
| 512 | #[cfg (any(feature = "signal" , all(unix, feature = "process" )))] |
| 513 | pub(crate) mod watch; |
| 514 | } |
| 515 | |
/// Unit tests; compiled only under `cfg(test)`.
#[cfg(test)]
mod tests;
| 519 | |