| 1 | #![ cfg_attr(loom, allow(dead_code, unreachable_pub, unused_imports))] | 
| 2 |  | 
|---|
| 3 | //! Synchronization primitives for use in asynchronous contexts. | 
|---|
| 4 | //! | 
|---|
| 5 | //! Tokio programs tend to be organized as a set of [tasks] where each task | 
|---|
| 6 | //! operates independently and may be executed on separate physical threads. The | 
|---|
| 7 | //! synchronization primitives provided in this module permit these independent | 
|---|
| 8 | //! tasks to communicate together. | 
|---|
| 9 | //! | 
|---|
| 10 | //! [tasks]: crate::task | 
|---|
| 11 | //! | 
|---|
| 12 | //! # Message passing | 
|---|
| 13 | //! | 
|---|
| 14 | //! The most common form of synchronization in a Tokio program is message | 
|---|
| 15 | //! passing. Two tasks operate independently and send messages to each other to | 
|---|
| 16 | //! synchronize. Doing so has the advantage of avoiding shared state. | 
|---|
| 17 | //! | 
|---|
| 18 | //! Message passing is implemented using channels. A channel supports sending a | 
|---|
| 19 | //! message from one producer task to one or more consumer tasks. There are a | 
|---|
| 20 | //! few flavors of channels provided by Tokio. Each channel flavor supports | 
|---|
| 21 | //! different message passing patterns. When a channel supports multiple | 
|---|
| 22 | //! producers, many separate tasks may **send** messages. When a channel | 
|---|
| 23 | //! supports multiple consumers, many different separate tasks may **receive** | 
|---|
| 24 | //! messages. | 
|---|
| 25 | //! | 
|---|
| 26 | //! Tokio provides many different channel flavors as different message passing | 
|---|
| 27 | //! patterns are best handled with different implementations. | 
|---|
| 28 | //! | 
|---|
| 29 | //! ## `oneshot` channel | 
|---|
| 30 | //! | 
|---|
| 31 | //! The [`oneshot` channel][oneshot] supports sending a **single** value from a | 
|---|
| 32 | //! single producer to a single consumer. This channel is usually used to send | 
|---|
| 33 | //! the result of a computation to a waiter. | 
|---|
| 34 | //! | 
|---|
| 35 | //! **Example:** using a [`oneshot` channel][oneshot] to receive the result of a | 
|---|
| 36 | //! computation. | 
|---|
| 37 | //! | 
|---|
| 38 | //! ``` | 
|---|
| 39 | //! use tokio::sync::oneshot; | 
|---|
| 40 | //! | 
|---|
| 41 | //! async fn some_computation() -> String { | 
|---|
| 42 | //! "represents the result of the computation".to_string() | 
|---|
| 43 | //! } | 
|---|
| 44 | //! | 
|---|
| 45 | //! #[tokio::main] | 
|---|
| 46 | //! async fn main() { | 
|---|
| 47 | //!     let (tx, rx) = oneshot::channel(); | 
|---|
| 48 | //! | 
|---|
| 49 | //!     tokio::spawn(async move { | 
|---|
| 50 | //!         let res = some_computation().await; | 
|---|
| 51 | //!         tx.send(res).unwrap(); | 
|---|
| 52 | //!     }); | 
|---|
| 53 | //! | 
|---|
| 54 | //!     // Do other work while the computation is happening in the background | 
|---|
| 55 | //! | 
|---|
| 56 | //!     // Wait for the computation result | 
|---|
| 57 | //!     let res = rx.await.unwrap(); | 
|---|
| 58 | //! } | 
|---|
| 59 | //! ``` | 
|---|
| 60 | //! | 
|---|
| 61 | //! Note, if the task produces a computation result as its final | 
|---|
| 62 | //! action before terminating, the [`JoinHandle`] can be used to | 
|---|
| 63 | //! receive that value instead of allocating resources for the | 
|---|
| 64 | //! `oneshot` channel. Awaiting on [`JoinHandle`] returns `Result`. If | 
|---|
| 65 | //! the task panics, the `JoinHandle` yields `Err` with the panic | 
|---|
| 66 | //! cause. | 
|---|
| 67 | //! | 
|---|
| 68 | //! **Example:** | 
|---|
| 69 | //! | 
|---|
| 70 | //! ``` | 
|---|
| 71 | //! async fn some_computation() -> String { | 
|---|
| 72 | //! "the result of the computation".to_string() | 
|---|
| 73 | //! } | 
|---|
| 74 | //! | 
|---|
| 75 | //! #[tokio::main] | 
|---|
| 76 | //! async fn main() { | 
|---|
| 77 | //!     let join_handle = tokio::spawn(async move { | 
|---|
| 78 | //!         some_computation().await | 
|---|
| 79 | //!     }); | 
|---|
| 80 | //! | 
|---|
| 81 | //!     // Do other work while the computation is happening in the background | 
|---|
| 82 | //! | 
|---|
| 83 | //!     // Wait for the computation result | 
|---|
| 84 | //!     let res = join_handle.await.unwrap(); | 
|---|
| 85 | //! } | 
|---|
| 86 | //! ``` | 
|---|
| 87 | //! | 
|---|
| 88 | //! [`JoinHandle`]: crate::task::JoinHandle | 
|---|
| 89 | //! | 
|---|
| 90 | //! ## `mpsc` channel | 
|---|
| 91 | //! | 
|---|
| 92 | //! The [`mpsc` channel][mpsc] supports sending **many** values from **many** | 
|---|
| 93 | //! producers to a single consumer. This channel is often used to send work to a | 
|---|
| 94 | //! task or to receive the result of many computations. | 
|---|
| 95 | //! | 
|---|
| 96 | //! This is also the channel you should use if you want to send many messages | 
|---|
| 97 | //! from a single producer to a single consumer. There is no dedicated spsc | 
|---|
| 98 | //! channel. | 
|---|
| 99 | //! | 
|---|
| 100 | //! **Example:** using an mpsc to incrementally stream the results of a series | 
|---|
| 101 | //! of computations. | 
|---|
| 102 | //! | 
|---|
| 103 | //! ``` | 
|---|
| 104 | //! use tokio::sync::mpsc; | 
|---|
| 105 | //! | 
|---|
| 106 | //! async fn some_computation(input: u32) -> String { | 
|---|
| 107 | //!     format!( "the result of computation {}", input) | 
|---|
| 108 | //! } | 
|---|
| 109 | //! | 
|---|
| 110 | //! #[tokio::main] | 
|---|
| 111 | //! async fn main() { | 
|---|
| 112 | //!     let (tx, mut rx) = mpsc::channel(100); | 
|---|
| 113 | //! | 
|---|
| 114 | //!     tokio::spawn(async move { | 
|---|
| 115 | //!         for i in 0..10 { | 
|---|
| 116 | //!             let res = some_computation(i).await; | 
|---|
| 117 | //!             tx.send(res).await.unwrap(); | 
|---|
| 118 | //!         } | 
|---|
| 119 | //!     }); | 
|---|
| 120 | //! | 
|---|
| 121 | //!     while let Some(res) = rx.recv().await { | 
|---|
| 122 | //!         println!( "got = {}", res); | 
|---|
| 123 | //!     } | 
|---|
| 124 | //! } | 
|---|
| 125 | //! ``` | 
|---|
| 126 | //! | 
|---|
| 127 | //! The argument to `mpsc::channel` is the channel capacity. This is the maximum | 
|---|
| 128 | //! number of values that can be stored in the channel pending receipt at any | 
|---|
| 129 | //! given time. Properly setting this value is key in implementing robust | 
|---|
| 130 | //! programs as the channel capacity plays a critical part in handling back | 
|---|
| 131 | //! pressure. | 
|---|
| 132 | //! | 
|---|
| 133 | //! A common concurrency pattern for resource management is to spawn a task | 
|---|
| 134 | //! dedicated to managing that resource and using message passing between other | 
|---|
| 135 | //! tasks to interact with the resource. The resource may be anything that may | 
|---|
| 136 | //! not be concurrently used. Some examples include a socket and program state. | 
|---|
| 137 | //! For example, if multiple tasks need to send data over a single socket, spawn | 
|---|
| 138 | //! a task to manage the socket and use a channel to synchronize. | 
|---|
| 139 | //! | 
|---|
| 140 | //! **Example:** sending data from many tasks over a single socket using message | 
|---|
| 141 | //! passing. | 
|---|
| 142 | //! | 
|---|
| 143 | //! ```no_run | 
|---|
| 144 | //! use tokio::io::{self, AsyncWriteExt}; | 
|---|
| 145 | //! use tokio::net::TcpStream; | 
|---|
| 146 | //! use tokio::sync::mpsc; | 
|---|
| 147 | //! | 
|---|
| 148 | //! #[tokio::main] | 
|---|
| 149 | //! async fn main() -> io::Result<()> { | 
|---|
| 150 | //!     let mut socket = TcpStream::connect( "www.example.com:1234").await?; | 
|---|
| 151 | //!     let (tx, mut rx) = mpsc::channel(100); | 
|---|
| 152 | //! | 
|---|
| 153 | //!     for _ in 0..10 { | 
|---|
| 154 | //!         // Each task needs its own `tx` handle. This is done by cloning the | 
|---|
| 155 | //!         // original handle. | 
|---|
| 156 | //!         let tx = tx.clone(); | 
|---|
| 157 | //! | 
|---|
| 158 | //!         tokio::spawn(async move { | 
|---|
| 159 | //!             tx.send(& b"data to write"[..]).await.unwrap(); | 
|---|
| 160 | //!         }); | 
|---|
| 161 | //!     } | 
|---|
| 162 | //! | 
|---|
| 163 | //!     // The `rx` half of the channel returns `None` once **all** `tx` clones | 
|---|
| 164 | //!     // drop. To ensure `None` is returned, drop the handle owned by the | 
|---|
| 165 | //!     // current task. If this `tx` handle is not dropped, there will always | 
|---|
| 166 | //!     // be a single outstanding `tx` handle. | 
|---|
| 167 | //!     drop(tx); | 
|---|
| 168 | //! | 
|---|
| 169 | //!     while let Some(res) = rx.recv().await { | 
|---|
| 170 | //!         socket.write_all(res).await?; | 
|---|
| 171 | //!     } | 
|---|
| 172 | //! | 
|---|
| 173 | //!     Ok(()) | 
|---|
| 174 | //! } | 
|---|
| 175 | //! ``` | 
|---|
| 176 | //! | 
|---|
| 177 | //! The [`mpsc`] and [`oneshot`] channels can be combined to provide a request / | 
|---|
| 178 | //! response type synchronization pattern with a shared resource. A task is | 
|---|
| 179 | //! spawned to synchronize a resource and waits on commands received on a | 
|---|
| 180 | //! [`mpsc`] channel. Each command includes a [`oneshot`] `Sender` on which the | 
|---|
| 181 | //! result of the command is sent. | 
|---|
| 182 | //! | 
|---|
| 183 | //! **Example:** use a task to synchronize a `u64` counter. Each task sends a | 
|---|
| 184 | //! "fetch and increment" command. The counter value **before** the increment is | 
|---|
| 185 | //! sent over the provided `oneshot` channel. | 
|---|
| 186 | //! | 
|---|
| 187 | //! ``` | 
|---|
| 188 | //! use tokio::sync::{oneshot, mpsc}; | 
|---|
| 189 | //! use Command::Increment; | 
|---|
| 190 | //! | 
|---|
| 191 | //! enum Command { | 
|---|
| 192 | //!     Increment, | 
|---|
| 193 | //!     // Other commands can be added here | 
|---|
| 194 | //! } | 
|---|
| 195 | //! | 
|---|
| 196 | //! #[tokio::main] | 
|---|
| 197 | //! async fn main() { | 
|---|
| 198 | //!     let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100); | 
|---|
| 199 | //! | 
|---|
| 200 | //!     // Spawn a task to manage the counter | 
|---|
| 201 | //!     tokio::spawn(async move { | 
|---|
| 202 | //!         let mut counter: u64 = 0; | 
|---|
| 203 | //! | 
|---|
| 204 | //!         while let Some((cmd, response)) = cmd_rx.recv().await { | 
|---|
| 205 | //!             match cmd { | 
|---|
| 206 | //!                 Increment => { | 
|---|
| 207 | //!                     let prev = counter; | 
|---|
| 208 | //!                     counter += 1; | 
|---|
| 209 | //!                     response.send(prev).unwrap(); | 
|---|
| 210 | //!                 } | 
|---|
| 211 | //!             } | 
|---|
| 212 | //!         } | 
|---|
| 213 | //!     }); | 
|---|
| 214 | //! | 
|---|
| 215 | //!     let mut join_handles = vec![]; | 
|---|
| 216 | //! | 
|---|
| 217 | //!     // Spawn tasks that will send the increment command. | 
|---|
| 218 | //!     for _ in 0..10 { | 
|---|
| 219 | //!         let cmd_tx = cmd_tx.clone(); | 
|---|
| 220 | //! | 
|---|
| 221 | //!         join_handles.push(tokio::spawn(async move { | 
|---|
| 222 | //!             let (resp_tx, resp_rx) = oneshot::channel(); | 
|---|
| 223 | //! | 
|---|
| 224 | //!             cmd_tx.send((Increment, resp_tx)).await.ok().unwrap(); | 
|---|
| 225 | //!             let res = resp_rx.await.unwrap(); | 
|---|
| 226 | //! | 
|---|
| 227 | //!             println!( "previous value = {}", res); | 
|---|
| 228 | //!         })); | 
|---|
| 229 | //!     } | 
|---|
| 230 | //! | 
|---|
| 231 | //!     // Wait for all tasks to complete | 
|---|
| 232 | //!     for join_handle in join_handles.drain(..) { | 
|---|
| 233 | //!         join_handle.await.unwrap(); | 
|---|
| 234 | //!     } | 
|---|
| 235 | //! } | 
|---|
| 236 | //! ``` | 
|---|
| 237 | //! | 
|---|
| 238 | //! ## `broadcast` channel | 
|---|
| 239 | //! | 
|---|
| 240 | //! The [`broadcast` channel] supports sending **many** values from | 
|---|
| 241 | //! **many** producers to **many** consumers. Each consumer will receive | 
|---|
| 242 | //! **each** value. This channel can be used to implement "fan out" style | 
|---|
| 243 | //! patterns common with pub / sub or "chat" systems. | 
|---|
| 244 | //! | 
|---|
| 245 | //! This channel tends to be used less often than `oneshot` and `mpsc` but still | 
|---|
| 246 | //! has its use cases. | 
|---|
| 247 | //! | 
|---|
| 248 | //! This is also the channel you should use if you want to broadcast values from | 
|---|
| 249 | //! a single producer to many consumers. There is no dedicated spmc broadcast | 
|---|
| 250 | //! channel. | 
|---|
| 251 | //! | 
|---|
| 252 | //! **Example:** basic usage. | 
|---|
| 253 | //! | 
|---|
| 254 | //! ``` | 
|---|
| 255 | //! use tokio::sync::broadcast; | 
|---|
| 256 | //! | 
|---|
| 257 | //! #[tokio::main] | 
|---|
| 258 | //! async fn main() { | 
|---|
| 259 | //!     let (tx, mut rx1) = broadcast::channel(16); | 
|---|
| 260 | //!     let mut rx2 = tx.subscribe(); | 
|---|
| 261 | //! | 
|---|
| 262 | //!     tokio::spawn(async move { | 
|---|
| 263 | //!         assert_eq!(rx1.recv().await.unwrap(), 10); | 
|---|
| 264 | //!         assert_eq!(rx1.recv().await.unwrap(), 20); | 
|---|
| 265 | //!     }); | 
|---|
| 266 | //! | 
|---|
| 267 | //!     tokio::spawn(async move { | 
|---|
| 268 | //!         assert_eq!(rx2.recv().await.unwrap(), 10); | 
|---|
| 269 | //!         assert_eq!(rx2.recv().await.unwrap(), 20); | 
|---|
| 270 | //!     }); | 
|---|
| 271 | //! | 
|---|
| 272 | //!     tx.send(10).unwrap(); | 
|---|
| 273 | //!     tx.send(20).unwrap(); | 
|---|
| 274 | //! } | 
|---|
| 275 | //! ``` | 
|---|
| 276 | //! | 
|---|
| 277 | //! [`broadcast` channel]: crate::sync::broadcast | 
|---|
| 278 | //! | 
|---|
| 279 | //! ## `watch` channel | 
|---|
| 280 | //! | 
|---|
| 281 | //! The [`watch` channel] supports sending **many** values from **many** | 
|---|
| 282 | //! producers to **many** consumers. However, only the **most recent** value is | 
|---|
| 283 | //! stored in the channel. Consumers are notified when a new value is sent, but | 
|---|
| 284 | //! there is no guarantee that consumers will see **all** values. | 
|---|
| 285 | //! | 
|---|
| 286 | //! The [`watch` channel] is similar to a [`broadcast` channel] with capacity 1. | 
|---|
| 287 | //! | 
|---|
| 288 | //! Use cases for the [`watch` channel] include broadcasting configuration | 
|---|
| 289 | //! changes or signalling program state changes, such as transitioning to | 
|---|
| 290 | //! shutdown. | 
|---|
| 291 | //! | 
|---|
| 292 | //! **Example:** use a [`watch` channel] to notify tasks of configuration | 
|---|
| 293 | //! changes. In this example, a configuration file is checked periodically. When | 
|---|
| 294 | //! the file changes, the configuration changes are signalled to consumers. | 
|---|
| 295 | //! | 
|---|
| 296 | //! ``` | 
|---|
| 297 | //! use tokio::sync::watch; | 
|---|
| 298 | //! use tokio::time::{self, Duration, Instant}; | 
|---|
| 299 | //! | 
|---|
| 300 | //! use std::io; | 
|---|
| 301 | //! | 
|---|
| 302 | //! #[derive(Debug, Clone, Eq, PartialEq)] | 
|---|
| 303 | //! struct Config { | 
|---|
| 304 | //!     timeout: Duration, | 
|---|
| 305 | //! } | 
|---|
| 306 | //! | 
|---|
| 307 | //! impl Config { | 
|---|
| 308 | //!     async fn load_from_file() -> io::Result<Config> { | 
|---|
| 309 | //!         // file loading and deserialization logic here | 
|---|
| 310 | //! # Ok(Config { timeout: Duration::from_secs(1) }) | 
|---|
| 311 | //!     } | 
|---|
| 312 | //! } | 
|---|
| 313 | //! | 
|---|
| 314 | //! async fn my_async_operation() { | 
|---|
| 315 | //!     // Do something here | 
|---|
| 316 | //! } | 
|---|
| 317 | //! | 
|---|
| 318 | //! #[tokio::main] | 
|---|
| 319 | //! async fn main() { | 
|---|
| 320 | //!     // Load initial configuration value | 
|---|
| 321 | //!     let mut config = Config::load_from_file().await.unwrap(); | 
|---|
| 322 | //! | 
|---|
| 323 | //!     // Create the watch channel, initialized with the loaded configuration | 
|---|
| 324 | //!     let (tx, rx) = watch::channel(config.clone()); | 
|---|
| 325 | //! | 
|---|
| 326 | //!     // Spawn a task to monitor the file. | 
|---|
| 327 | //!     tokio::spawn(async move { | 
|---|
| 328 | //!         loop { | 
|---|
| 329 | //!             // Wait 10 seconds between checks | 
|---|
| 330 | //!             time::sleep(Duration::from_secs(10)).await; | 
|---|
| 331 | //! | 
|---|
| 332 | //!             // Load the configuration file | 
|---|
| 333 | //!             let new_config = Config::load_from_file().await.unwrap(); | 
|---|
| 334 | //! | 
|---|
| 335 | //!             // If the configuration changed, send the new config value | 
|---|
| 336 | //!             // on the watch channel. | 
|---|
| 337 | //!             if new_config != config { | 
|---|
| 338 | //!                 tx.send(new_config.clone()).unwrap(); | 
|---|
| 339 | //!                 config = new_config; | 
|---|
| 340 | //!             } | 
|---|
| 341 | //!         } | 
|---|
| 342 | //!     }); | 
|---|
| 343 | //! | 
|---|
| 344 | //!     let mut handles = vec![]; | 
|---|
| 345 | //! | 
|---|
| 346 | //!     // Spawn tasks that run the async operation for at most `timeout`. If | 
|---|
| 347 | //!     // the timeout elapses, restart the operation. | 
|---|
| 348 | //!     // | 
|---|
| 349 | //!     // The task simultaneously watches the `Config` for changes. When the | 
|---|
| 350 | //!     // timeout duration changes, the timeout is updated without restarting | 
|---|
| 351 | //!     // the in-flight operation. | 
|---|
| 352 | //!     for _ in 0..5 { | 
|---|
| 353 | //!         // Clone a config watch handle for use in this task | 
|---|
| 354 | //!         let mut rx = rx.clone(); | 
|---|
| 355 | //! | 
|---|
| 356 | //!         let handle = tokio::spawn(async move { | 
|---|
| 357 | //!             // Start the initial operation and pin the future to the stack. | 
|---|
| 358 | //!             // Pinning to the stack is required to resume the operation | 
|---|
| 359 | //!             // across multiple calls to `select!` | 
|---|
| 360 | //!             let op = my_async_operation(); | 
|---|
| 361 | //!             tokio::pin!(op); | 
|---|
| 362 | //! | 
|---|
| 363 | //!             // Get the initial config value | 
|---|
| 364 | //!             let mut conf = rx.borrow().clone(); | 
|---|
| 365 | //! | 
|---|
| 366 | //!             let mut op_start = Instant::now(); | 
|---|
| 367 | //!             let sleep = time::sleep_until(op_start + conf.timeout); | 
|---|
| 368 | //!             tokio::pin!(sleep); | 
|---|
| 369 | //! | 
|---|
| 370 | //!             loop { | 
|---|
| 371 | //!                 tokio::select! { | 
|---|
| 372 | //!                     _ = &mut sleep => { | 
|---|
| 373 | //!                         // The timeout elapsed. Restart the operation. | 
|---|
| 374 | //!                         op.set(my_async_operation()); | 
|---|
| 375 | //! | 
|---|
| 376 | //!                         // Track the new start time | 
|---|
| 377 | //!                         op_start = Instant::now(); | 
|---|
| 378 | //! | 
|---|
| 379 | //!                         // Restart the timeout | 
|---|
| 380 | //!                         sleep.set(time::sleep_until(op_start + conf.timeout)); | 
|---|
| 381 | //!                     } | 
|---|
| 382 | //!                     _ = rx.changed() => { | 
|---|
| 383 | //!                         conf = rx.borrow_and_update().clone(); | 
|---|
| 384 | //! | 
|---|
| 385 | //!                         // The configuration has been updated. Update the | 
|---|
| 386 | //!                         // `sleep` using the new `timeout` value. | 
|---|
| 387 | //!                         sleep.as_mut().reset(op_start + conf.timeout); | 
|---|
| 388 | //!                     } | 
|---|
| 389 | //!                     _ = &mut op => { | 
|---|
| 390 | //!                         // The operation completed! | 
|---|
| 391 | //!                         return | 
|---|
| 392 | //!                     } | 
|---|
| 393 | //!                 } | 
|---|
| 394 | //!             } | 
|---|
| 395 | //!         }); | 
|---|
| 396 | //! | 
|---|
| 397 | //!         handles.push(handle); | 
|---|
| 398 | //!     } | 
|---|
| 399 | //! | 
|---|
| 400 | //!     for handle in handles.drain(..) { | 
|---|
| 401 | //!         handle.await.unwrap(); | 
|---|
| 402 | //!     } | 
|---|
| 403 | //! } | 
|---|
| 404 | //! ``` | 
|---|
| 405 | //! | 
|---|
| 406 | //! [`watch` channel]: mod@crate::sync::watch | 
|---|
| 407 | //! [`broadcast` channel]: mod@crate::sync::broadcast | 
|---|
| 408 | //! | 
|---|
| 409 | //! # State synchronization | 
|---|
| 410 | //! | 
|---|
| 411 | //! The remaining synchronization primitives focus on synchronizing state. | 
|---|
| 412 | //! These are asynchronous equivalents to versions provided by `std`. They | 
|---|
| 413 | //! operate in a similar way as their `std` counterparts but will wait | 
|---|
| 414 | //! asynchronously instead of blocking the thread. | 
|---|
| 415 | //! | 
|---|
| 416 | //! * [`Barrier`] Ensures multiple tasks will wait for each other to reach a | 
|---|
| 417 | //!   point in the program, before continuing execution all together. | 
|---|
| 418 | //! | 
|---|
| 419 | //! * [`Mutex`] Mutual Exclusion mechanism, which ensures that at most one | 
|---|
| 420 | //!   thread at a time is able to access some data. | 
|---|
| 421 | //! | 
|---|
| 422 | //! * [`Notify`] Basic task notification. `Notify` supports notifying a | 
|---|
| 423 | //!   receiving task without sending data. In this case, the task wakes up and | 
|---|
| 424 | //!   resumes processing. | 
|---|
| 425 | //! | 
|---|
| 426 | //! * [`RwLock`] Provides a mutual exclusion mechanism which allows multiple | 
|---|
| 427 | //!   readers at the same time, while allowing only one writer at a time. In | 
|---|
| 428 | //!   some cases, this can be more efficient than a mutex. | 
|---|
| 429 | //! | 
|---|
| 430 | //! * [`Semaphore`] Limits the amount of concurrency. A semaphore holds a | 
|---|
| 431 | //!   number of permits, which tasks may request in order to enter a critical | 
|---|
| 432 | //!   section. Semaphores are useful for implementing limiting or bounding of | 
|---|
| 433 | //!   any kind. | 
|---|
| 434 | //! | 
|---|
| 435 | //! # Runtime compatibility | 
|---|
| 436 | //! | 
|---|
| 437 | //! All synchronization primitives provided in this module are runtime agnostic. | 
|---|
| 438 | //! You can freely move them between different instances of the Tokio runtime | 
|---|
| 439 | //! or even use them from non-Tokio runtimes. | 
|---|
| 440 | //! | 
|---|
| 441 | //! When used in a Tokio runtime, the synchronization primitives participate in | 
|---|
| 442 | //! [cooperative scheduling](crate::task#cooperative-scheduling) to avoid | 
|---|
| 443 | //! starvation. This feature does not apply when used from non-Tokio runtimes. | 
|---|
| 444 | //! | 
|---|
| 445 | //! As an exception, methods ending in `_timeout` are not runtime agnostic | 
|---|
| 446 | //! because they require access to the Tokio timer. See the documentation of | 
|---|
| 447 | //! each `*_timeout` method for more information on its use. | 
|---|
| 448 |  | 
|---|
| 449 | cfg_sync! { | 
|---|
| 450 | /// Named future types. | 
|---|
| 451 | pub mod futures { | 
|---|
| 452 | pub use super::notify::Notified; | 
|---|
| 453 | } | 
|---|
| 454 |  | 
|---|
| 455 | mod barrier; | 
|---|
| 456 | pub use barrier::{Barrier, BarrierWaitResult}; | 
|---|
| 457 |  | 
|---|
| 458 | pub mod broadcast; | 
|---|
| 459 |  | 
|---|
| 460 | pub mod mpsc; | 
|---|
| 461 |  | 
|---|
| 462 | mod mutex; | 
|---|
| 463 | pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard, OwnedMappedMutexGuard}; | 
|---|
| 464 |  | 
|---|
| 465 | pub(crate) mod notify; | 
|---|
| 466 | pub use notify::Notify; | 
|---|
| 467 |  | 
|---|
| 468 | pub mod oneshot; | 
|---|
| 469 |  | 
|---|
| 470 | pub(crate) mod batch_semaphore; | 
|---|
| 471 | pub use batch_semaphore::{AcquireError, TryAcquireError}; | 
|---|
| 472 |  | 
|---|
| 473 | mod semaphore; | 
|---|
| 474 | pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit}; | 
|---|
| 475 |  | 
|---|
| 476 | mod rwlock; | 
|---|
| 477 | pub use rwlock::RwLock; | 
|---|
| 478 | pub use rwlock::owned_read_guard::OwnedRwLockReadGuard; | 
|---|
| 479 | pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard; | 
|---|
| 480 | pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard; | 
|---|
| 481 | pub use rwlock::read_guard::RwLockReadGuard; | 
|---|
| 482 | pub use rwlock::write_guard::RwLockWriteGuard; | 
|---|
| 483 | pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard; | 
|---|
| 484 |  | 
|---|
| 485 | mod task; | 
|---|
| 486 | pub(crate) use task::AtomicWaker; | 
|---|
| 487 |  | 
|---|
| 488 | mod once_cell; | 
|---|
| 489 | pub use self::once_cell::{OnceCell, SetError}; | 
|---|
| 490 |  | 
|---|
| 491 | pub mod watch; | 
|---|
| 492 | } | 
|---|
| 493 |  | 
|---|
| 494 | cfg_not_sync! { | 
|---|
| 495 | cfg_fs! { | 
|---|
| 496 | pub(crate) mod batch_semaphore; | 
|---|
| 497 | mod mutex; | 
|---|
| 498 | pub(crate) use mutex::Mutex; | 
|---|
| 499 | } | 
|---|
| 500 |  | 
|---|
| 501 | #[ cfg(any(feature = "rt", feature = "signal", all(unix, feature = "process")))] | 
|---|
| 502 | pub(crate) mod notify; | 
|---|
| 503 |  | 
|---|
| 504 | #[ cfg(any(feature = "rt", all(windows, feature = "process")))] | 
|---|
| 505 | pub(crate) mod oneshot; | 
|---|
| 506 |  | 
|---|
| 507 | cfg_atomic_waker_impl! { | 
|---|
| 508 | mod task; | 
|---|
| 509 | pub(crate) use task::AtomicWaker; | 
|---|
| 510 | } | 
|---|
| 511 |  | 
|---|
| 512 | #[ cfg(any(feature = "signal", all(unix, feature = "process")))] | 
|---|
| 513 | pub(crate) mod watch; | 
|---|
| 514 | } | 
|---|
| 515 |  | 
|---|
| 516 | /// Unit tests | 
|---|
| 517 | #[ cfg(test)] | 
|---|
| 518 | mod tests; | 
|---|
| 519 |  | 
|---|