1 | #![cfg_attr (loom, allow(dead_code, unreachable_pub, unused_imports))] |
2 | |
3 | //! Synchronization primitives for use in asynchronous contexts. |
4 | //! |
5 | //! Tokio programs tend to be organized as a set of [tasks] where each task |
6 | //! operates independently and may be executed on separate physical threads. The |
7 | //! synchronization primitives provided in this module permit these independent |
8 | //! tasks to communicate together. |
9 | //! |
10 | //! [tasks]: crate::task |
11 | //! |
12 | //! # Message passing |
13 | //! |
14 | //! The most common form of synchronization in a Tokio program is message |
15 | //! passing. Two tasks operate independently and send messages to each other to |
16 | //! synchronize. Doing so has the advantage of avoiding shared state. |
17 | //! |
18 | //! Message passing is implemented using channels. A channel supports sending a |
19 | //! message from one producer task to one or more consumer tasks. There are a |
20 | //! few flavors of channels provided by Tokio. Each channel flavor supports |
21 | //! different message passing patterns. When a channel supports multiple |
22 | //! producers, many separate tasks may **send** messages. When a channel |
23 | //! supports multiple consumers, many different separate tasks may **receive** |
24 | //! messages. |
25 | //! |
26 | //! Tokio provides many different channel flavors as different message passing |
27 | //! patterns are best handled with different implementations. |
28 | //! |
29 | //! ## `oneshot` channel |
30 | //! |
31 | //! The [`oneshot` channel][oneshot] supports sending a **single** value from a |
32 | //! single producer to a single consumer. This channel is usually used to send |
33 | //! the result of a computation to a waiter. |
34 | //! |
35 | //! **Example:** using a [`oneshot` channel][oneshot] to receive the result of a |
36 | //! computation. |
37 | //! |
38 | //! ``` |
39 | //! use tokio::sync::oneshot; |
40 | //! |
41 | //! async fn some_computation() -> String { |
42 | //! "represents the result of the computation" .to_string() |
43 | //! } |
44 | //! |
45 | //! #[tokio::main] |
46 | //! async fn main() { |
47 | //! let (tx, rx) = oneshot::channel(); |
48 | //! |
49 | //! tokio::spawn(async move { |
50 | //! let res = some_computation().await; |
51 | //! tx.send(res).unwrap(); |
52 | //! }); |
53 | //! |
54 | //! // Do other work while the computation is happening in the background |
55 | //! |
56 | //! // Wait for the computation result |
57 | //! let res = rx.await.unwrap(); |
58 | //! } |
59 | //! ``` |
60 | //! |
61 | //! Note, if the task produces a computation result as its final |
62 | //! action before terminating, the [`JoinHandle`] can be used to |
63 | //! receive that value instead of allocating resources for the |
64 | //! `oneshot` channel. Awaiting on [`JoinHandle`] returns `Result`. If |
65 | //! the task panics, the [`JoinHandle`] yields `Err` with the panic |
66 | //! cause. |
67 | //! |
68 | //! **Example:** |
69 | //! |
70 | //! ``` |
71 | //! async fn some_computation() -> String { |
72 | //! "the result of the computation" .to_string() |
73 | //! } |
74 | //! |
75 | //! #[tokio::main] |
76 | //! async fn main() { |
77 | //! let join_handle = tokio::spawn(async move { |
78 | //! some_computation().await |
79 | //! }); |
80 | //! |
81 | //! // Do other work while the computation is happening in the background |
82 | //! |
83 | //! // Wait for the computation result |
84 | //! let res = join_handle.await.unwrap(); |
85 | //! } |
86 | //! ``` |
87 | //! |
88 | //! [oneshot]: oneshot |
89 | //! [`JoinHandle`]: crate::task::JoinHandle |
90 | //! |
91 | //! ## `mpsc` channel |
92 | //! |
93 | //! The [`mpsc` channel][mpsc] supports sending **many** values from **many** |
94 | //! producers to a single consumer. This channel is often used to send work to a |
95 | //! task or to receive the result of many computations. |
96 | //! |
97 | //! This is also the channel you should use if you want to send many messages |
98 | //! from a single producer to a single consumer. There is no dedicated spsc |
99 | //! channel. |
100 | //! |
101 | //! **Example:** using an mpsc to incrementally stream the results of a series |
102 | //! of computations. |
103 | //! |
104 | //! ``` |
105 | //! use tokio::sync::mpsc; |
106 | //! |
107 | //! async fn some_computation(input: u32) -> String { |
108 | //! format!("the result of computation {}" , input) |
109 | //! } |
110 | //! |
111 | //! #[tokio::main] |
112 | //! async fn main() { |
113 | //! let (tx, mut rx) = mpsc::channel(100); |
114 | //! |
115 | //! tokio::spawn(async move { |
116 | //! for i in 0..10 { |
117 | //! let res = some_computation(i).await; |
118 | //! tx.send(res).await.unwrap(); |
119 | //! } |
120 | //! }); |
121 | //! |
122 | //! while let Some(res) = rx.recv().await { |
123 | //! println!("got = {}" , res); |
124 | //! } |
125 | //! } |
126 | //! ``` |
127 | //! |
128 | //! The argument to `mpsc::channel` is the channel capacity. This is the maximum |
129 | //! number of values that can be stored in the channel pending receipt at any |
130 | //! given time. Properly setting this value is key in implementing robust |
131 | //! programs as the channel capacity plays a critical part in handling back |
132 | //! pressure. |
133 | //! |
134 | //! A common concurrency pattern for resource management is to spawn a task |
135 | //! dedicated to managing that resource and using message passing between other |
136 | //! tasks to interact with the resource. The resource may be anything that may |
137 | //! not be concurrently used. Some examples include a socket and program state. |
138 | //! For example, if multiple tasks need to send data over a single socket, spawn |
139 | //! a task to manage the socket and use a channel to synchronize. |
140 | //! |
141 | //! **Example:** sending data from many tasks over a single socket using message |
142 | //! passing. |
143 | //! |
144 | //! ```no_run |
145 | //! use tokio::io::{self, AsyncWriteExt}; |
146 | //! use tokio::net::TcpStream; |
147 | //! use tokio::sync::mpsc; |
148 | //! |
149 | //! #[tokio::main] |
150 | //! async fn main() -> io::Result<()> { |
151 | //! let mut socket = TcpStream::connect("www.example.com:1234" ).await?; |
152 | //! let (tx, mut rx) = mpsc::channel(100); |
153 | //! |
154 | //! for _ in 0..10 { |
155 | //! // Each task needs its own `tx` handle. This is done by cloning the |
156 | //! // original handle. |
157 | //! let tx = tx.clone(); |
158 | //! |
159 | //! tokio::spawn(async move { |
160 | //! tx.send(&b"data to write" [..]).await.unwrap(); |
161 | //! }); |
162 | //! } |
163 | //! |
164 | //! // The `rx` half of the channel returns `None` once **all** `tx` clones |
165 | //! // drop. To ensure `None` is returned, drop the handle owned by the |
166 | //! // current task. If this `tx` handle is not dropped, there will always |
167 | //! // be a single outstanding `tx` handle. |
168 | //! drop(tx); |
169 | //! |
170 | //! while let Some(res) = rx.recv().await { |
171 | //! socket.write_all(res).await?; |
172 | //! } |
173 | //! |
174 | //! Ok(()) |
175 | //! } |
176 | //! ``` |
177 | //! |
178 | //! The [`mpsc`][mpsc] and [`oneshot`][oneshot] channels can be combined to |
179 | //! provide a request / response type synchronization pattern with a shared |
180 | //! resource. A task is spawned to synchronize a resource and waits on commands |
181 | //! received on a [`mpsc`][mpsc] channel. Each command includes a |
182 | //! [`oneshot`][oneshot] `Sender` on which the result of the command is sent. |
183 | //! |
184 | //! **Example:** use a task to synchronize a `u64` counter. Each task sends a |
185 | //! "fetch and increment" command. The counter value **before** the increment is |
186 | //! sent over the provided `oneshot` channel. |
187 | //! |
188 | //! ``` |
189 | //! use tokio::sync::{oneshot, mpsc}; |
190 | //! use Command::Increment; |
191 | //! |
192 | //! enum Command { |
193 | //! Increment, |
194 | //! // Other commands can be added here |
195 | //! } |
196 | //! |
197 | //! #[tokio::main] |
198 | //! async fn main() { |
199 | //! let (cmd_tx, mut cmd_rx) = mpsc::channel::<(Command, oneshot::Sender<u64>)>(100); |
200 | //! |
201 | //! // Spawn a task to manage the counter |
202 | //! tokio::spawn(async move { |
203 | //! let mut counter: u64 = 0; |
204 | //! |
205 | //! while let Some((cmd, response)) = cmd_rx.recv().await { |
206 | //! match cmd { |
207 | //! Increment => { |
208 | //! let prev = counter; |
209 | //! counter += 1; |
210 | //! response.send(prev).unwrap(); |
211 | //! } |
212 | //! } |
213 | //! } |
214 | //! }); |
215 | //! |
216 | //! let mut join_handles = vec![]; |
217 | //! |
218 | //! // Spawn tasks that will send the increment command. |
219 | //! for _ in 0..10 { |
220 | //! let cmd_tx = cmd_tx.clone(); |
221 | //! |
222 | //! join_handles.push(tokio::spawn(async move { |
223 | //! let (resp_tx, resp_rx) = oneshot::channel(); |
224 | //! |
225 | //! cmd_tx.send((Increment, resp_tx)).await.ok().unwrap(); |
226 | //! let res = resp_rx.await.unwrap(); |
227 | //! |
228 | //! println!("previous value = {}" , res); |
229 | //! })); |
230 | //! } |
231 | //! |
232 | //! // Wait for all tasks to complete |
233 | //! for join_handle in join_handles.drain(..) { |
234 | //! join_handle.await.unwrap(); |
235 | //! } |
236 | //! } |
237 | //! ``` |
238 | //! |
239 | //! [mpsc]: mpsc |
240 | //! |
241 | //! ## `broadcast` channel |
242 | //! |
243 | //! The [`broadcast` channel] supports sending **many** values from |
244 | //! **many** producers to **many** consumers. Each consumer will receive |
245 | //! **each** value. This channel can be used to implement "fan out" style |
246 | //! patterns common with pub / sub or "chat" systems. |
247 | //! |
248 | //! This channel tends to be used less often than `oneshot` and `mpsc` but still |
249 | //! has its use cases. |
250 | //! |
251 | //! This is also the channel you should use if you want to broadcast values from |
252 | //! a single producer to many consumers. There is no dedicated spmc broadcast |
253 | //! channel. |
254 | //! |
255 | //! **Example:** basic usage of a [`broadcast` channel]. |
256 | //! |
257 | //! ``` |
258 | //! use tokio::sync::broadcast; |
259 | //! |
260 | //! #[tokio::main] |
261 | //! async fn main() { |
262 | //! let (tx, mut rx1) = broadcast::channel(16); |
263 | //! let mut rx2 = tx.subscribe(); |
264 | //! |
265 | //! tokio::spawn(async move { |
266 | //! assert_eq!(rx1.recv().await.unwrap(), 10); |
267 | //! assert_eq!(rx1.recv().await.unwrap(), 20); |
268 | //! }); |
269 | //! |
270 | //! tokio::spawn(async move { |
271 | //! assert_eq!(rx2.recv().await.unwrap(), 10); |
272 | //! assert_eq!(rx2.recv().await.unwrap(), 20); |
273 | //! }); |
274 | //! |
275 | //! tx.send(10).unwrap(); |
276 | //! tx.send(20).unwrap(); |
277 | //! } |
278 | //! ``` |
279 | //! |
280 | //! [`broadcast` channel]: mod@crate::sync::broadcast |
281 | //! |
282 | //! ## `watch` channel |
283 | //! |
284 | //! The [`watch` channel] supports sending **many** values from a **single** |
285 | //! producer to **many** consumers. However, only the **most recent** value is |
286 | //! stored in the channel. Consumers are notified when a new value is sent, but |
287 | //! there is no guarantee that consumers will see **all** values. |
288 | //! |
289 | //! The [`watch` channel] is similar to a [`broadcast` channel] with capacity 1. |
290 | //! |
291 | //! Use cases for the [`watch` channel] include broadcasting configuration |
292 | //! changes or signalling program state changes, such as transitioning to |
293 | //! shutdown. |
294 | //! |
295 | //! **Example:** use a [`watch` channel] to notify tasks of configuration |
296 | //! changes. In this example, a configuration file is checked periodically. When |
297 | //! the file changes, the configuration changes are signalled to consumers. |
298 | //! |
299 | //! ``` |
300 | //! use tokio::sync::watch; |
301 | //! use tokio::time::{self, Duration, Instant}; |
302 | //! |
303 | //! use std::io; |
304 | //! |
305 | //! #[derive(Debug, Clone, Eq, PartialEq)] |
306 | //! struct Config { |
307 | //! timeout: Duration, |
308 | //! } |
309 | //! |
310 | //! impl Config { |
311 | //! async fn load_from_file() -> io::Result<Config> { |
312 | //! // file loading and deserialization logic here |
313 | //! # Ok(Config { timeout: Duration::from_secs(1) }) |
314 | //! } |
315 | //! } |
316 | //! |
317 | //! async fn my_async_operation() { |
318 | //! // Do something here |
319 | //! } |
320 | //! |
321 | //! #[tokio::main] |
322 | //! async fn main() { |
323 | //! // Load initial configuration value |
324 | //! let mut config = Config::load_from_file().await.unwrap(); |
325 | //! |
326 | //! // Create the watch channel, initialized with the loaded configuration |
327 | //! let (tx, rx) = watch::channel(config.clone()); |
328 | //! |
329 | //! // Spawn a task to monitor the file. |
330 | //! tokio::spawn(async move { |
331 | //! loop { |
332 | //! // Wait 10 seconds between checks |
333 | //! time::sleep(Duration::from_secs(10)).await; |
334 | //! |
335 | //! // Load the configuration file |
336 | //! let new_config = Config::load_from_file().await.unwrap(); |
337 | //! |
338 | //! // If the configuration changed, send the new config value |
339 | //! // on the watch channel. |
340 | //! if new_config != config { |
341 | //! tx.send(new_config.clone()).unwrap(); |
342 | //! config = new_config; |
343 | //! } |
344 | //! } |
345 | //! }); |
346 | //! |
347 | //! let mut handles = vec![]; |
348 | //! |
349 | //! // Spawn tasks that run the async operation for at most `timeout`. If |
350 | //! // the timeout elapses, restart the operation. |
351 | //! // |
352 | //! // The task simultaneously watches the `Config` for changes. When the |
353 | //! // timeout duration changes, the timeout is updated without restarting |
354 | //! // the in-flight operation. |
355 | //! for _ in 0..5 { |
356 | //! // Clone a config watch handle for use in this task |
357 | //! let mut rx = rx.clone(); |
358 | //! |
359 | //! let handle = tokio::spawn(async move { |
360 | //! // Start the initial operation and pin the future to the stack. |
361 | //! // Pinning to the stack is required to resume the operation |
362 | //! // across multiple calls to `select!` |
363 | //! let op = my_async_operation(); |
364 | //! tokio::pin!(op); |
365 | //! |
366 | //! // Get the initial config value |
367 | //! let mut conf = rx.borrow().clone(); |
368 | //! |
369 | //! let mut op_start = Instant::now(); |
370 | //! let sleep = time::sleep_until(op_start + conf.timeout); |
371 | //! tokio::pin!(sleep); |
372 | //! |
373 | //! loop { |
374 | //! tokio::select! { |
375 | //! _ = &mut sleep => { |
376 | //! // The timeout elapsed. Restart the operation |
377 | //! op.set(my_async_operation()); |
378 | //! |
379 | //! // Track the new start time |
380 | //! op_start = Instant::now(); |
381 | //! |
382 | //! // Restart the timeout |
383 | //! sleep.set(time::sleep_until(op_start + conf.timeout)); |
384 | //! } |
385 | //! _ = rx.changed() => { |
386 | //! conf = rx.borrow().clone(); |
387 | //! |
388 | //! // The configuration has been updated. Update the |
389 | //! // `sleep` using the new `timeout` value. |
390 | //! sleep.as_mut().reset(op_start + conf.timeout); |
391 | //! } |
392 | //! _ = &mut op => { |
393 | //! // The operation completed! |
394 | //! return |
395 | //! } |
396 | //! } |
397 | //! } |
398 | //! }); |
399 | //! |
400 | //! handles.push(handle); |
401 | //! } |
402 | //! |
403 | //! for handle in handles.drain(..) { |
404 | //! handle.await.unwrap(); |
405 | //! } |
406 | //! } |
407 | //! ``` |
408 | //! |
409 | //! [`watch` channel]: mod@crate::sync::watch |
410 | //! [`broadcast` channel]: mod@crate::sync::broadcast |
411 | //! |
412 | //! # State synchronization |
413 | //! |
414 | //! The remaining synchronization primitives focus on synchronizing state. |
415 | //! These are asynchronous equivalents to versions provided by `std`. They |
416 | //! operate in a similar way as their `std` counterparts but will wait |
417 | //! asynchronously instead of blocking the thread. |
418 | //! |
419 | //! * [`Barrier`](Barrier) Ensures multiple tasks will wait for each other to |
420 | //! reach a point in the program, before continuing execution all together. |
421 | //! |
422 | //! * [`Mutex`](Mutex) Mutual Exclusion mechanism, which ensures that at most |
423 | //! one thread at a time is able to access some data. |
424 | //! |
425 | //! * [`Notify`](Notify) Basic task notification. `Notify` supports notifying a |
426 | //! receiving task without sending data. In this case, the task wakes up and |
427 | //! resumes processing. |
428 | //! |
429 | //! * [`RwLock`](RwLock) Provides a mutual exclusion mechanism which allows |
430 | //! multiple readers at the same time, while allowing only one writer at a |
431 | //! time. In some cases, this can be more efficient than a mutex. |
432 | //! |
433 | //! * [`Semaphore`](Semaphore) Limits the amount of concurrency. A semaphore |
434 | //! holds a number of permits, which tasks may request in order to enter a |
435 | //! critical section. Semaphores are useful for implementing limiting or |
436 | //! bounding of any kind. |
437 | |
// Full public surface of this module. `cfg_sync!` is defined elsewhere in the
// crate. NOTE(review): it presumably gates every item below on the `sync`
// Cargo feature -- confirm against the macro definition.
438 | cfg_sync! { |
439 | /// Named future types. |
440 | pub mod futures { |
// `notify` is declared `pub(crate)` below, so this re-export is what gives
// `Notified` a publicly reachable path from this module.
441 | pub use super::notify::Notified; |
442 | } |
443 | |
// Private implementation module; only the two named types are exported.
444 | mod barrier; |
445 | pub use barrier::{Barrier, BarrierWaitResult}; |
446 | |
// Channel flavors are exposed as public submodules, so callers go through the
// module path (e.g. `broadcast::channel(..)`, as in the module docs above).
447 | pub mod broadcast; |
448 | |
449 | pub mod mpsc; |
450 | |
// Private implementation module; the lock, its guard types and its error type
// are flattened into this module's namespace.
451 | mod mutex; |
452 | pub use mutex::{Mutex, MutexGuard, TryLockError, OwnedMutexGuard, MappedMutexGuard, OwnedMappedMutexGuard}; |
453 | |
// Crate-visible module. Only `Notify` is re-exported publicly here; `Notified`
// is re-exported through `futures` above.
454 | pub(crate) mod notify; |
455 | pub use notify::Notify; |
456 | |
457 | pub mod oneshot; |
458 | |
// Crate-visible module; its two error types are made public by the re-export.
459 | pub(crate) mod batch_semaphore; |
460 | pub use batch_semaphore::{AcquireError, TryAcquireError}; |
461 | |
462 | mod semaphore; |
463 | pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit}; |
464 | |
// The lock and each guard type are re-exported individually; every guard comes
// from its own submodule of `rwlock`.
465 | mod rwlock; |
466 | pub use rwlock::RwLock; |
467 | pub use rwlock::owned_read_guard::OwnedRwLockReadGuard; |
468 | pub use rwlock::owned_write_guard::OwnedRwLockWriteGuard; |
469 | pub use rwlock::owned_write_guard_mapped::OwnedRwLockMappedWriteGuard; |
470 | pub use rwlock::read_guard::RwLockReadGuard; |
471 | pub use rwlock::write_guard::RwLockWriteGuard; |
472 | pub use rwlock::write_guard_mapped::RwLockMappedWriteGuard; |
473 | |
// `AtomicWaker` is re-exported with crate visibility only; it is not part of
// the public API of this module.
474 | mod task; |
475 | pub(crate) use task::AtomicWaker; |
476 | |
// NOTE(review): the explicit `self::` prefix presumably avoids ambiguity with an
// external crate named `once_cell` -- confirm before simplifying the path.
477 | mod once_cell; |
478 | pub use self::once_cell::{OnceCell, SetError}; |
479 | |
480 | pub mod watch; |
481 | } |
482 | |
// Crate-internal declarations only: every item below is private or `pub(crate)`,
// so this block adds nothing to the public API.
// NOTE(review): `cfg_not_sync!` is defined elsewhere; presumably it is the
// negation of `cfg_sync!`, so these declarations never coexist with the ones
// in the `cfg_sync!` block -- confirm against the macro definition.
483 | cfg_not_sync! { |
// NOTE(review): `cfg_fs!` is defined elsewhere; presumably keyed on the `fs`
// feature -- confirm. `Mutex` is exposed to the rest of the crate;
// `batch_semaphore` is declared alongside it (assumes `mutex` is built on
// it -- TODO confirm).
484 | cfg_fs! { |
485 | pub(crate) mod batch_semaphore; |
486 | mod mutex; |
487 | pub(crate) use mutex::Mutex; |
488 | } |
489 | |
// `notify` is compiled when `rt` or `signal` is enabled, or when `process` is
// enabled on a unix target.
490 | #[cfg (any(feature = "rt" , feature = "signal" , all(unix, feature = "process" )))] |
491 | pub(crate) mod notify; |
492 | |
// `oneshot` is compiled when `rt` is enabled, or when `process` is enabled on
// a windows target.
493 | #[cfg (any(feature = "rt" , all(windows, feature = "process" )))] |
494 | pub(crate) mod oneshot; |
495 | |
// `cfg_atomic_waker_impl!` is defined elsewhere; the condition it applies is
// not visible from this file.
496 | cfg_atomic_waker_impl! { |
497 | mod task; |
498 | pub(crate) use task::AtomicWaker; |
499 | } |
500 | |
// `watch` is compiled when `signal` is enabled, or when `process` is enabled
// on a unix target.
501 | #[cfg (any(feature = "signal" , all(unix, feature = "process" )))] |
502 | pub(crate) mod watch; |
503 | } |
504 | |
// Compiled only when `cfg(test)` is set. The declaration has no inline body,
// so the module's contents live in a separate file.
505 | /// Unit tests |
506 | #[cfg (test)] |
507 | mod tests; |
508 | |