1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A single-producer, multi-consumer channel that only retains the *last* sent
4//! value.
5//!
6//! This channel is useful for watching for changes to a value from multiple
7//! points in the code base, for example, changes to configuration values.
8//!
9//! # Usage
10//!
11//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12//! and consumer halves of the channel. The channel is created with an initial
13//! value. The **latest** value stored in the channel is accessed with
14//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new
15//! value to be sent by the [`Sender`] half.
16//!
17//! # Examples
18//!
19//! ```
20//! use tokio::sync::watch;
21//!
22//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
23//! let (tx, mut rx) = watch::channel("hello");
24//!
25//! tokio::spawn(async move {
26//! while rx.changed().await.is_ok() {
27//! println!("received = {:?}", *rx.borrow());
28//! }
29//! });
30//!
31//! tx.send("world")?;
32//! # Ok(())
33//! # }
34//! ```
35//!
36//! # Closing
37//!
38//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
39//! when all [`Receiver`] handles have been dropped. This indicates that there
40//! is no further interest in the values being produced and work can be stopped.
41//!
42//! The value in the channel will not be dropped until the sender and all receivers
43//! have been dropped.
44//!
45//! # Thread safety
46//!
47//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
48//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
49//! handles may be moved to separate threads and also used concurrently.
50//!
51//! [`Sender`]: crate::sync::watch::Sender
52//! [`Receiver`]: crate::sync::watch::Receiver
53//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
54//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
55//! [`channel`]: crate::sync::watch::channel
56//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
57//! [`Sender::closed`]: crate::sync::watch::Sender::closed
58
59use crate::sync::notify::Notify;
60
61use crate::loom::sync::atomic::AtomicUsize;
62use crate::loom::sync::atomic::Ordering::Relaxed;
63use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
64use std::fmt;
65use std::mem;
66use std::ops;
67use std::panic;
68
69/// Receives values from the associated [`Sender`](struct@Sender).
70///
71/// Instances are created by the [`channel`](fn@channel) function.
72///
73/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
74/// wrapper.
75///
76/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
77#[derive(Debug)]
78pub struct Receiver<T> {
79 /// Pointer to the shared state
80 shared: Arc<Shared<T>>,
81
82 /// Last observed version
83 version: Version,
84}
85
86/// Sends values to the associated [`Receiver`](struct@Receiver).
87///
88/// Instances are created by the [`channel`](fn@channel) function.
89#[derive(Debug)]
90pub struct Sender<T> {
91 shared: Arc<Shared<T>>,
92}
93
94/// Returns a reference to the inner value.
95///
96/// Outstanding borrows hold a read lock on the inner value. This means that
97/// long-lived borrows could cause the producer half to block. It is recommended
98/// to keep the borrow as short-lived as possible. Additionally, if you are
99/// running in an environment that allows `!Send` futures, you must ensure that
100/// the returned `Ref` type is never held alive across an `.await` point,
101/// otherwise, it can lead to a deadlock.
102///
103/// The priority policy of the lock is dependent on the underlying lock
104/// implementation, and this type does not guarantee that any particular policy
105/// will be used. In particular, a producer which is waiting to acquire the lock
106/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
107///
108/// <details><summary>Potential deadlock example</summary>
109///
110/// ```text
111/// // Task 1 (on thread A) | // Task 2 (on thread B)
112/// let _ref1 = rx.borrow(); |
113/// | // will block
114/// | let _ = tx.send(());
115/// // may deadlock |
116/// let _ref2 = rx.borrow(); |
117/// ```
118/// </details>
#[derive(Debug)]
pub struct Ref<'a, T> {
    /// Read guard on the channel's value; the read lock stays held for as
    /// long as this `Ref` is alive.
    inner: RwLockReadGuard<'a, T>,
    /// Whether the value was considered changed (not yet marked as seen) by
    /// the borrower when this `Ref` was created.
    has_changed: bool,
}
124
125impl<'a, T> Ref<'a, T> {
126 /// Indicates if the borrowed value is considered as _changed_ since the last
127 /// time it has been marked as seen.
128 ///
129 /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
130 ///
131 /// When borrowed from the [`Sender`] this function will always return `false`.
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use tokio::sync::watch;
137 ///
138 /// #[tokio::main]
139 /// async fn main() {
140 /// let (tx, mut rx) = watch::channel("hello");
141 ///
142 /// tx.send("goodbye").unwrap();
143 /// // The sender does never consider the value as changed.
144 /// assert!(!tx.borrow().has_changed());
145 ///
146 /// // Drop the sender immediately, just for testing purposes.
147 /// drop(tx);
148 ///
149 /// // Even if the sender has already been dropped...
150 /// assert!(rx.has_changed().is_err());
151 /// // ...the modified value is still readable and detected as changed.
152 /// assert_eq!(*rx.borrow(), "goodbye");
153 /// assert!(rx.borrow().has_changed());
154 ///
155 /// // Read the changed value and mark it as seen.
156 /// {
157 /// let received = rx.borrow_and_update();
158 /// assert_eq!(*received, "goodbye");
159 /// assert!(received.has_changed());
160 /// // Release the read lock when leaving this scope.
161 /// }
162 ///
163 /// // Now the value has already been marked as seen and could
164 /// // never be modified again (after the sender has been dropped).
165 /// assert!(!rx.borrow().has_changed());
166 /// }
167 /// ```
168 pub fn has_changed(&self) -> bool {
169 self.has_changed
170 }
171}
172
173struct Shared<T> {
174 /// The most recent value.
175 value: RwLock<T>,
176
177 /// The current version.
178 ///
179 /// The lowest bit represents a "closed" state. The rest of the bits
180 /// represent the current version.
181 state: AtomicState,
182
183 /// Tracks the number of `Receiver` instances.
184 ref_count_rx: AtomicUsize,
185
186 /// Notifies waiting receivers that the value changed.
187 notify_rx: big_notify::BigNotify,
188
189 /// Notifies any task listening for `Receiver` dropped events.
190 notify_tx: Notify,
191}
192
193impl<T: fmt::Debug> fmt::Debug for Shared<T> {
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 let state: StateSnapshot = self.state.load();
196 f&mut DebugStruct<'_, '_>.debug_struct("Shared")
197 .field("value", &self.value)
198 .field("version", &state.version())
199 .field("is_closed", &state.is_closed())
200 .field(name:"ref_count_rx", &self.ref_count_rx)
201 .finish()
202 }
203}
204
pub mod error {
    //! Watch error types.

    use std::fmt;

    /// Error produced when sending a value fails.
    ///
    /// The wrapped value is publicly accessible through field `0`.
    #[derive(PartialEq, Eq, Clone, Copy)]
    pub struct SendError<T>(pub T);

    // ===== impl SendError =====

    impl<T> fmt::Debug for SendError<T> {
        // Written by hand so that `T` does not have to implement `Debug`;
        // the payload is intentionally left out of the output.
        fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
            formatter.debug_struct("SendError").finish_non_exhaustive()
        }
    }

    impl<T> fmt::Display for SendError<T> {
        fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
            formatter.write_str("channel closed")
        }
    }

    impl<T> std::error::Error for SendError<T> {}

    /// Error produced when receiving a change notification.
    ///
    /// The private unit field keeps construction restricted to the parent
    /// module.
    #[derive(Debug, Clone)]
    pub struct RecvError(pub(super) ());

    // ===== impl RecvError =====

    impl fmt::Display for RecvError {
        fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
            formatter.write_str("channel closed")
        }
    }

    impl std::error::Error for RecvError {}
}
244
245mod big_notify {
246 use super::*;
247 use crate::sync::notify::Notified;
248
249 // To avoid contention on the lock inside the `Notify`, we store multiple
250 // copies of it. Then, we use either circular access or randomness to spread
251 // out threads over different `Notify` objects.
252 //
253 // Some simple benchmarks show that randomness performs slightly better than
254 // circular access (probably due to contention on `next`), so we prefer to
255 // use randomness when Tokio is compiled with a random number generator.
256 //
257 // When the random number generator is not available, we fall back to
258 // circular access.
259
260 pub(super) struct BigNotify {
261 #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
262 next: AtomicUsize,
263 inner: [Notify; 8],
264 }
265
266 impl BigNotify {
267 pub(super) fn new() -> Self {
268 Self {
269 #[cfg(not(all(
270 not(loom),
271 feature = "sync",
272 any(feature = "rt", feature = "macros")
273 )))]
274 next: AtomicUsize::new(0),
275 inner: Default::default(),
276 }
277 }
278
279 pub(super) fn notify_waiters(&self) {
280 for notify in &self.inner {
281 notify.notify_waiters();
282 }
283 }
284
285 /// This function implements the case where randomness is not available.
286 #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
287 pub(super) fn notified(&self) -> Notified<'_> {
288 let i = self.next.fetch_add(1, Relaxed) % 8;
289 self.inner[i].notified()
290 }
291
292 /// This function implements the case where randomness is available.
293 #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
294 pub(super) fn notified(&self) -> Notified<'_> {
295 let i = crate::runtime::context::thread_rng_n(8) as usize;
296 self.inner[i].notified()
297 }
298 }
299}
300
301use self::state::{AtomicState, Version};
302mod state {
303 use crate::loom::sync::atomic::AtomicUsize;
304 use crate::loom::sync::atomic::Ordering::SeqCst;
305
306 const CLOSED: usize = 1;
307
308 /// The version part of the state. The lowest bit is always zero.
309 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
310 pub(super) struct Version(usize);
311
312 /// Snapshot of the state. The first bit is used as the CLOSED bit.
313 /// The remaining bits are used as the version.
314 ///
315 /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
316 /// receivers does not set it.
317 #[derive(Copy, Clone, Debug)]
318 pub(super) struct StateSnapshot(usize);
319
320 /// The state stored in an atomic integer.
321 #[derive(Debug)]
322 pub(super) struct AtomicState(AtomicUsize);
323
324 impl Version {
325 /// Get the initial version when creating the channel.
326 pub(super) fn initial() -> Self {
327 Version(0)
328 }
329 }
330
331 impl StateSnapshot {
332 /// Extract the version from the state.
333 pub(super) fn version(self) -> Version {
334 Version(self.0 & !CLOSED)
335 }
336
337 /// Is the closed bit set?
338 pub(super) fn is_closed(self) -> bool {
339 (self.0 & CLOSED) == CLOSED
340 }
341 }
342
343 impl AtomicState {
344 /// Create a new `AtomicState` that is not closed and which has the
345 /// version set to `Version::initial()`.
346 pub(super) fn new() -> Self {
347 AtomicState(AtomicUsize::new(0))
348 }
349
350 /// Load the current value of the state.
351 pub(super) fn load(&self) -> StateSnapshot {
352 StateSnapshot(self.0.load(SeqCst))
353 }
354
355 /// Increment the version counter.
356 pub(super) fn increment_version(&self) {
357 // Increment by two to avoid touching the CLOSED bit.
358 self.0.fetch_add(2, SeqCst);
359 }
360
361 /// Set the closed bit in the state.
362 pub(super) fn set_closed(&self) {
363 self.0.fetch_or(CLOSED, SeqCst);
364 }
365 }
366}
367
368/// Creates a new watch channel, returning the "send" and "receive" handles.
369///
370/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
371/// Only the last value sent is made available to the [`Receiver`] half. All
372/// intermediate values are dropped.
373///
374/// # Examples
375///
376/// ```
377/// use tokio::sync::watch;
378///
379/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
380/// let (tx, mut rx) = watch::channel("hello");
381///
382/// tokio::spawn(async move {
383/// while rx.changed().await.is_ok() {
384/// println!("received = {:?}", *rx.borrow());
385/// }
386/// });
387///
388/// tx.send("world")?;
389/// # Ok(())
390/// # }
391/// ```
392///
393/// [`Sender`]: struct@Sender
394/// [`Receiver`]: struct@Receiver
395pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
396 let shared: Arc> = Arc::new(data:Shared {
397 value: RwLock::new(init),
398 state: AtomicState::new(),
399 ref_count_rx: AtomicUsize::new(val:1),
400 notify_rx: big_notify::BigNotify::new(),
401 notify_tx: Notify::new(),
402 });
403
404 let tx: Sender = Sender {
405 shared: shared.clone(),
406 };
407
408 let rx: Receiver = Receiver {
409 shared,
410 version: Version::initial(),
411 };
412
413 (tx, rx)
414}
415
416impl<T> Receiver<T> {
417 fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
418 // No synchronization necessary as this is only used as a counter and
419 // not memory access.
420 shared.ref_count_rx.fetch_add(1, Relaxed);
421
422 Self { shared, version }
423 }
424
425 /// Returns a reference to the most recently sent value.
426 ///
427 /// This method does not mark the returned value as seen, so future calls to
428 /// [`changed`] may return immediately even if you have already seen the
429 /// value with a call to `borrow`.
430 ///
431 /// Outstanding borrows hold a read lock on the inner value. This means that
432 /// long-lived borrows could cause the producer half to block. It is recommended
433 /// to keep the borrow as short-lived as possible. Additionally, if you are
434 /// running in an environment that allows `!Send` futures, you must ensure that
435 /// the returned `Ref` type is never held alive across an `.await` point,
436 /// otherwise, it can lead to a deadlock.
437 ///
438 /// The priority policy of the lock is dependent on the underlying lock
439 /// implementation, and this type does not guarantee that any particular policy
440 /// will be used. In particular, a producer which is waiting to acquire the lock
441 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
442 ///
443 /// <details><summary>Potential deadlock example</summary>
444 ///
445 /// ```text
446 /// // Task 1 (on thread A) | // Task 2 (on thread B)
447 /// let _ref1 = rx.borrow(); |
448 /// | // will block
449 /// | let _ = tx.send(());
450 /// // may deadlock |
451 /// let _ref2 = rx.borrow(); |
452 /// ```
453 /// </details>
454 ///
455 /// [`changed`]: Receiver::changed
456 ///
457 /// # Examples
458 ///
459 /// ```
460 /// use tokio::sync::watch;
461 ///
462 /// let (_, rx) = watch::channel("hello");
463 /// assert_eq!(*rx.borrow(), "hello");
464 /// ```
465 pub fn borrow(&self) -> Ref<'_, T> {
466 let inner = self.shared.value.read().unwrap();
467
468 // After obtaining a read-lock no concurrent writes could occur
469 // and the loaded version matches that of the borrowed reference.
470 let new_version = self.shared.state.load().version();
471 let has_changed = self.version != new_version;
472
473 Ref { inner, has_changed }
474 }
475
476 /// Returns a reference to the most recently sent value and marks that value
477 /// as seen.
478 ///
479 /// This method marks the current value as seen. Subsequent calls to [`changed`]
480 /// will not return immediately until the [`Sender`] has modified the shared
481 /// value again.
482 ///
483 /// Outstanding borrows hold a read lock on the inner value. This means that
484 /// long-lived borrows could cause the producer half to block. It is recommended
485 /// to keep the borrow as short-lived as possible. Additionally, if you are
486 /// running in an environment that allows `!Send` futures, you must ensure that
487 /// the returned `Ref` type is never held alive across an `.await` point,
488 /// otherwise, it can lead to a deadlock.
489 ///
490 /// The priority policy of the lock is dependent on the underlying lock
491 /// implementation, and this type does not guarantee that any particular policy
492 /// will be used. In particular, a producer which is waiting to acquire the lock
493 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
494 ///
495 /// <details><summary>Potential deadlock example</summary>
496 ///
497 /// ```text
498 /// // Task 1 (on thread A) | // Task 2 (on thread B)
499 /// let _ref1 = rx1.borrow_and_update(); |
500 /// | // will block
501 /// | let _ = tx.send(());
502 /// // may deadlock |
503 /// let _ref2 = rx2.borrow_and_update(); |
504 /// ```
505 /// </details>
506 ///
507 /// [`changed`]: Receiver::changed
508 pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
509 let inner = self.shared.value.read().unwrap();
510
511 // After obtaining a read-lock no concurrent writes could occur
512 // and the loaded version matches that of the borrowed reference.
513 let new_version = self.shared.state.load().version();
514 let has_changed = self.version != new_version;
515
516 // Mark the shared value as seen by updating the version
517 self.version = new_version;
518
519 Ref { inner, has_changed }
520 }
521
522 /// Checks if this channel contains a message that this receiver has not yet
523 /// seen. The new value is not marked as seen.
524 ///
525 /// Although this method is called `has_changed`, it does not check new
526 /// messages for equality, so this call will return true even if the new
527 /// message is equal to the old message.
528 ///
529 /// Returns an error if the channel has been closed.
530 /// # Examples
531 ///
532 /// ```
533 /// use tokio::sync::watch;
534 ///
535 /// #[tokio::main]
536 /// async fn main() {
537 /// let (tx, mut rx) = watch::channel("hello");
538 ///
539 /// tx.send("goodbye").unwrap();
540 ///
541 /// assert!(rx.has_changed().unwrap());
542 /// assert_eq!(*rx.borrow_and_update(), "goodbye");
543 ///
544 /// // The value has been marked as seen
545 /// assert!(!rx.has_changed().unwrap());
546 ///
547 /// drop(tx);
548 /// // The `tx` handle has been dropped
549 /// assert!(rx.has_changed().is_err());
550 /// }
551 /// ```
552 pub fn has_changed(&self) -> Result<bool, error::RecvError> {
553 // Load the version from the state
554 let state = self.shared.state.load();
555 if state.is_closed() {
556 // The sender has dropped.
557 return Err(error::RecvError(()));
558 }
559 let new_version = state.version();
560
561 Ok(self.version != new_version)
562 }
563
564 /// Waits for a change notification, then marks the newest value as seen.
565 ///
566 /// If the newest value in the channel has not yet been marked seen when
567 /// this method is called, the method marks that value seen and returns
568 /// immediately. If the newest value has already been marked seen, then the
569 /// method sleeps until a new message is sent by the [`Sender`] connected to
570 /// this `Receiver`, or until the [`Sender`] is dropped.
571 ///
572 /// This method returns an error if and only if the [`Sender`] is dropped.
573 ///
574 /// # Cancel safety
575 ///
576 /// This method is cancel safe. If you use it as the event in a
577 /// [`tokio::select!`](crate::select) statement and some other branch
578 /// completes first, then it is guaranteed that no values have been marked
579 /// seen by this call to `changed`.
580 ///
581 /// [`Sender`]: struct@Sender
582 ///
583 /// # Examples
584 ///
585 /// ```
586 /// use tokio::sync::watch;
587 ///
588 /// #[tokio::main]
589 /// async fn main() {
590 /// let (tx, mut rx) = watch::channel("hello");
591 ///
592 /// tokio::spawn(async move {
593 /// tx.send("goodbye").unwrap();
594 /// });
595 ///
596 /// assert!(rx.changed().await.is_ok());
597 /// assert_eq!(*rx.borrow(), "goodbye");
598 ///
599 /// // The `tx` handle has been dropped
600 /// assert!(rx.changed().await.is_err());
601 /// }
602 /// ```
603 pub async fn changed(&mut self) -> Result<(), error::RecvError> {
604 changed_impl(&self.shared, &mut self.version).await
605 }
606
    /// Waits for a value that satisfies the provided condition.
608 ///
609 /// This method will call the provided closure whenever something is sent on
610 /// the channel. Once the closure returns `true`, this method will return a
611 /// reference to the value that was passed to the closure.
612 ///
613 /// Before `wait_for` starts waiting for changes, it will call the closure
614 /// on the current value. If the closure returns `true` when given the
615 /// current value, then `wait_for` will immediately return a reference to
616 /// the current value. This is the case even if the current value is already
617 /// considered seen.
618 ///
619 /// The watch channel only keeps track of the most recent value, so if
620 /// several messages are sent faster than `wait_for` is able to call the
621 /// closure, then it may skip some updates. Whenever the closure is called,
622 /// it will be called with the most recent value.
623 ///
624 /// When this function returns, the value that was passed to the closure
625 /// when it returned `true` will be considered seen.
626 ///
627 /// If the channel is closed, then `wait_for` will return a `RecvError`.
628 /// Once this happens, no more messages can ever be sent on the channel.
629 /// When an error is returned, it is guaranteed that the closure has been
630 /// called on the last value, and that it returned `false` for that value.
631 /// (If the closure returned `true`, then the last value would have been
632 /// returned instead of the error.)
633 ///
634 /// Like the `borrow` method, the returned borrow holds a read lock on the
635 /// inner value. This means that long-lived borrows could cause the producer
636 /// half to block. It is recommended to keep the borrow as short-lived as
637 /// possible. See the documentation of `borrow` for more information on
638 /// this.
639 ///
640 /// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
641 ///
642 /// # Examples
643 ///
644 /// ```
645 /// use tokio::sync::watch;
646 ///
647 /// #[tokio::main]
648 ///
649 /// async fn main() {
650 /// let (tx, _rx) = watch::channel("hello");
651 ///
652 /// tx.send("goodbye").unwrap();
653 ///
654 /// // here we subscribe to a second receiver
655 /// // now in case of using `changed` we would have
656 /// // to first check the current value and then wait
657 /// // for changes or else `changed` would hang.
658 /// let mut rx2 = tx.subscribe();
659 ///
    /// // in place of `changed` we use `wait_for`
661 /// // which would automatically check the current value
662 /// // and wait for changes until the closure returns true.
663 /// assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok());
664 /// assert_eq!(*rx2.borrow(), "goodbye");
665 /// }
666 /// ```
667 pub async fn wait_for(
668 &mut self,
669 mut f: impl FnMut(&T) -> bool,
670 ) -> Result<Ref<'_, T>, error::RecvError> {
671 let mut closed = false;
672 loop {
673 {
674 let inner = self.shared.value.read().unwrap();
675
676 let new_version = self.shared.state.load().version();
677 let has_changed = self.version != new_version;
678 self.version = new_version;
679
680 if (!closed || has_changed) && f(&inner) {
681 return Ok(Ref { inner, has_changed });
682 }
683 }
684
685 if closed {
686 return Err(error::RecvError(()));
687 }
688
689 // Wait for the value to change.
690 closed = changed_impl(&self.shared, &mut self.version).await.is_err();
691 }
692 }
693
694 /// Returns `true` if receivers belong to the same channel.
695 ///
696 /// # Examples
697 ///
698 /// ```
699 /// let (tx, rx) = tokio::sync::watch::channel(true);
700 /// let rx2 = rx.clone();
701 /// assert!(rx.same_channel(&rx2));
702 ///
703 /// let (tx3, rx3) = tokio::sync::watch::channel(true);
704 /// assert!(!rx3.same_channel(&rx2));
705 /// ```
706 pub fn same_channel(&self, other: &Self) -> bool {
707 Arc::ptr_eq(&self.shared, &other.shared)
708 }
709
710 cfg_process_driver! {
711 pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
712 maybe_changed(&self.shared, &mut self.version)
713 }
714 }
715}
716
717fn maybe_changed<T>(
718 shared: &Shared<T>,
719 version: &mut Version,
720) -> Option<Result<(), error::RecvError>> {
721 // Load the version from the state
722 let state: StateSnapshot = shared.state.load();
723 let new_version: Version = state.version();
724
725 if *version != new_version {
726 // Observe the new version and return
727 *version = new_version;
728 return Some(Ok(()));
729 }
730
731 if state.is_closed() {
732 // All receivers have dropped.
733 return Some(Err(error::RecvError(())));
734 }
735
736 None
737}
738
739async fn changed_impl<T>(
740 shared: &Shared<T>,
741 version: &mut Version,
742) -> Result<(), error::RecvError> {
743 crate::trace::async_trace_leaf().await;
744
745 loop {
746 // In order to avoid a race condition, we first request a notification,
747 // **then** check the current value's version. If a new version exists,
748 // the notification request is dropped.
749 let notified: Notified<'_> = shared.notify_rx.notified();
750
751 if let Some(ret: Result<(), RecvError>) = maybe_changed(shared, version) {
752 return ret;
753 }
754
755 notified.await;
756 // loop around again in case the wake-up was spurious
757 }
758}
759
760impl<T> Clone for Receiver<T> {
761 fn clone(&self) -> Self {
762 let version: Version = self.version;
763 let shared: Arc> = self.shared.clone();
764
765 Self::from_shared(version, shared)
766 }
767}
768
769impl<T> Drop for Receiver<T> {
770 fn drop(&mut self) {
771 // No synchronization necessary as this is only used as a counter and
772 // not memory access.
773 if 1 == self.shared.ref_count_rx.fetch_sub(val:1, order:Relaxed) {
774 // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
775 self.shared.notify_tx.notify_waiters();
776 }
777 }
778}
779
780impl<T> Sender<T> {
781 /// Sends a new value via the channel, notifying all receivers.
782 ///
783 /// This method fails if the channel is closed, which is the case when
784 /// every receiver has been dropped. It is possible to reopen the channel
785 /// using the [`subscribe`] method. However, when `send` fails, the value
786 /// isn't made available for future receivers (but returned with the
787 /// [`SendError`]).
788 ///
789 /// To always make a new value available for future receivers, even if no
790 /// receiver currently exists, one of the other send methods
791 /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
792 /// used instead.
793 ///
794 /// [`subscribe`]: Sender::subscribe
795 /// [`SendError`]: error::SendError
796 /// [`send_if_modified`]: Sender::send_if_modified
797 /// [`send_modify`]: Sender::send_modify
798 /// [`send_replace`]: Sender::send_replace
799 pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
800 // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
801 if 0 == self.receiver_count() {
802 return Err(error::SendError(value));
803 }
804
805 self.send_replace(value);
806 Ok(())
807 }
808
809 /// Modifies the watched value **unconditionally** in-place,
810 /// notifying all receivers.
811 ///
812 /// This can be useful for modifying the watched value, without
813 /// having to allocate a new instance. Additionally, this
814 /// method permits sending values even when there are no receivers.
815 ///
816 /// Prefer to use the more versatile function [`Self::send_if_modified()`]
817 /// if the value is only modified conditionally during the mutable borrow
818 /// to prevent unneeded change notifications for unmodified values.
819 ///
820 /// # Panics
821 ///
822 /// This function panics when the invocation of the `modify` closure panics.
823 /// No receivers are notified when panicking. All changes of the watched
824 /// value applied by the closure before panicking will be visible in
825 /// subsequent calls to `borrow`.
826 ///
827 /// # Examples
828 ///
829 /// ```
830 /// use tokio::sync::watch;
831 ///
832 /// struct State {
833 /// counter: usize,
834 /// }
835 /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
836 /// state_tx.send_modify(|state| state.counter += 1);
837 /// assert_eq!(state_rx.borrow().counter, 1);
838 /// ```
839 pub fn send_modify<F>(&self, modify: F)
840 where
841 F: FnOnce(&mut T),
842 {
843 self.send_if_modified(|value| {
844 modify(value);
845 true
846 });
847 }
848
849 /// Modifies the watched value **conditionally** in-place,
850 /// notifying all receivers only if modified.
851 ///
852 /// This can be useful for modifying the watched value, without
853 /// having to allocate a new instance. Additionally, this
854 /// method permits sending values even when there are no receivers.
855 ///
856 /// The `modify` closure must return `true` if the value has actually
857 /// been modified during the mutable borrow. It should only return `false`
858 /// if the value is guaranteed to be unmodified despite the mutable
859 /// borrow.
860 ///
861 /// Receivers are only notified if the closure returned `true`. If the
862 /// closure has modified the value but returned `false` this results
863 /// in a *silent modification*, i.e. the modified value will be visible
864 /// in subsequent calls to `borrow`, but receivers will not receive
865 /// a change notification.
866 ///
867 /// Returns the result of the closure, i.e. `true` if the value has
868 /// been modified and `false` otherwise.
869 ///
870 /// # Panics
871 ///
872 /// This function panics when the invocation of the `modify` closure panics.
873 /// No receivers are notified when panicking. All changes of the watched
874 /// value applied by the closure before panicking will be visible in
875 /// subsequent calls to `borrow`.
876 ///
877 /// # Examples
878 ///
879 /// ```
880 /// use tokio::sync::watch;
881 ///
882 /// struct State {
883 /// counter: usize,
884 /// }
885 /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
886 /// let inc_counter_if_odd = |state: &mut State| {
887 /// if state.counter % 2 == 1 {
888 /// state.counter += 1;
889 /// return true;
890 /// }
891 /// false
892 /// };
893 ///
894 /// assert_eq!(state_rx.borrow().counter, 1);
895 ///
896 /// assert!(!state_rx.has_changed().unwrap());
897 /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
898 /// assert!(state_rx.has_changed().unwrap());
899 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
900 ///
901 /// assert!(!state_rx.has_changed().unwrap());
902 /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
903 /// assert!(!state_rx.has_changed().unwrap());
904 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
905 /// ```
906 pub fn send_if_modified<F>(&self, modify: F) -> bool
907 where
908 F: FnOnce(&mut T) -> bool,
909 {
910 {
911 // Acquire the write lock and update the value.
912 let mut lock = self.shared.value.write().unwrap();
913
914 // Update the value and catch possible panic inside func.
915 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
916 match result {
917 Ok(modified) => {
918 if !modified {
919 // Abort, i.e. don't notify receivers if unmodified
920 return false;
921 }
922 // Continue if modified
923 }
924 Err(panicked) => {
925 // Drop the lock to avoid poisoning it.
926 drop(lock);
927 // Forward the panic to the caller.
928 panic::resume_unwind(panicked);
929 // Unreachable
930 }
931 };
932
933 self.shared.state.increment_version();
934
935 // Release the write lock.
936 //
937 // Incrementing the version counter while holding the lock ensures
938 // that receivers are able to figure out the version number of the
939 // value they are currently looking at.
940 drop(lock);
941 }
942
943 self.shared.notify_rx.notify_waiters();
944
945 true
946 }
947
948 /// Sends a new value via the channel, notifying all receivers and returning
949 /// the previous value in the channel.
950 ///
951 /// This can be useful for reusing the buffers inside a watched value.
952 /// Additionally, this method permits sending values even when there are no
953 /// receivers.
954 ///
955 /// # Examples
956 ///
957 /// ```
958 /// use tokio::sync::watch;
959 ///
960 /// let (tx, _rx) = watch::channel(1);
961 /// assert_eq!(tx.send_replace(2), 1);
962 /// assert_eq!(tx.send_replace(3), 2);
963 /// ```
964 pub fn send_replace(&self, mut value: T) -> T {
965 // swap old watched value with the new one
966 self.send_modify(|old| mem::swap(old, &mut value));
967
968 value
969 }
970
971 /// Returns a reference to the most recently sent value
972 ///
973 /// Outstanding borrows hold a read lock on the inner value. This means that
974 /// long-lived borrows could cause the producer half to block. It is recommended
975 /// to keep the borrow as short-lived as possible. Additionally, if you are
976 /// running in an environment that allows `!Send` futures, you must ensure that
977 /// the returned `Ref` type is never held alive across an `.await` point,
978 /// otherwise, it can lead to a deadlock.
979 ///
980 /// # Examples
981 ///
982 /// ```
983 /// use tokio::sync::watch;
984 ///
985 /// let (tx, _) = watch::channel("hello");
986 /// assert_eq!(*tx.borrow(), "hello");
987 /// ```
988 pub fn borrow(&self) -> Ref<'_, T> {
989 let inner = self.shared.value.read().unwrap();
990
991 // The sender/producer always sees the current version
992 let has_changed = false;
993
994 Ref { inner, has_changed }
995 }
996
997 /// Checks if the channel has been closed. This happens when all receivers
998 /// have dropped.
999 ///
1000 /// # Examples
1001 ///
1002 /// ```
1003 /// let (tx, rx) = tokio::sync::watch::channel(());
1004 /// assert!(!tx.is_closed());
1005 ///
1006 /// drop(rx);
1007 /// assert!(tx.is_closed());
1008 /// ```
1009 pub fn is_closed(&self) -> bool {
1010 self.receiver_count() == 0
1011 }
1012
1013 /// Completes when all receivers have dropped.
1014 ///
1015 /// This allows the producer to get notified when interest in the produced
1016 /// values is canceled and immediately stop doing work.
1017 ///
1018 /// # Cancel safety
1019 ///
1020 /// This method is cancel safe. Once the channel is closed, it stays closed
1021 /// forever and all future calls to `closed` will return immediately.
1022 ///
1023 /// # Examples
1024 ///
1025 /// ```
1026 /// use tokio::sync::watch;
1027 ///
1028 /// #[tokio::main]
1029 /// async fn main() {
1030 /// let (tx, rx) = watch::channel("hello");
1031 ///
1032 /// tokio::spawn(async move {
1033 /// // use `rx`
1034 /// drop(rx);
1035 /// });
1036 ///
1037 /// // Waits for `rx` to drop
1038 /// tx.closed().await;
1039 /// println!("the `rx` handles dropped")
1040 /// }
1041 /// ```
1042 pub async fn closed(&self) {
1043 crate::trace::async_trace_leaf().await;
1044
1045 while self.receiver_count() > 0 {
1046 let notified = self.shared.notify_tx.notified();
1047
1048 if self.receiver_count() == 0 {
1049 return;
1050 }
1051
1052 notified.await;
1053 // The channel could have been reopened in the meantime by calling
1054 // `subscribe`, so we loop again.
1055 }
1056 }
1057
1058 /// Creates a new [`Receiver`] connected to this `Sender`.
1059 ///
1060 /// All messages sent before this call to `subscribe` are initially marked
1061 /// as seen by the new `Receiver`.
1062 ///
1063 /// This method can be called even if there are no other receivers. In this
1064 /// case, the channel is reopened.
1065 ///
1066 /// # Examples
1067 ///
1068 /// The new channel will receive messages sent on this `Sender`.
1069 ///
1070 /// ```
1071 /// use tokio::sync::watch;
1072 ///
1073 /// #[tokio::main]
1074 /// async fn main() {
1075 /// let (tx, _rx) = watch::channel(0u64);
1076 ///
1077 /// tx.send(5).unwrap();
1078 ///
1079 /// let rx = tx.subscribe();
1080 /// assert_eq!(5, *rx.borrow());
1081 ///
1082 /// tx.send(10).unwrap();
1083 /// assert_eq!(10, *rx.borrow());
1084 /// }
1085 /// ```
1086 ///
1087 /// The most recent message is considered seen by the channel, so this test
1088 /// is guaranteed to pass.
1089 ///
1090 /// ```
1091 /// use tokio::sync::watch;
1092 /// use tokio::time::Duration;
1093 ///
1094 /// #[tokio::main]
1095 /// async fn main() {
1096 /// let (tx, _rx) = watch::channel(0u64);
1097 /// tx.send(5).unwrap();
1098 /// let mut rx = tx.subscribe();
1099 ///
1100 /// tokio::spawn(async move {
1101 /// // by spawning and sleeping, the message is sent after `main`
1102 /// // hits the call to `changed`.
1103 /// # if false {
1104 /// tokio::time::sleep(Duration::from_millis(10)).await;
1105 /// # }
1106 /// tx.send(100).unwrap();
1107 /// });
1108 ///
1109 /// rx.changed().await.unwrap();
1110 /// assert_eq!(100, *rx.borrow());
1111 /// }
1112 /// ```
1113 pub fn subscribe(&self) -> Receiver<T> {
1114 let shared = self.shared.clone();
1115 let version = shared.state.load().version();
1116
1117 // The CLOSED bit in the state tracks only whether the sender is
1118 // dropped, so we do not need to unset it if this reopens the channel.
1119 Receiver::from_shared(version, shared)
1120 }
1121
1122 /// Returns the number of receivers that currently exist.
1123 ///
1124 /// # Examples
1125 ///
1126 /// ```
1127 /// use tokio::sync::watch;
1128 ///
1129 /// #[tokio::main]
1130 /// async fn main() {
1131 /// let (tx, rx1) = watch::channel("hello");
1132 ///
1133 /// assert_eq!(1, tx.receiver_count());
1134 ///
1135 /// let mut _rx2 = rx1.clone();
1136 ///
1137 /// assert_eq!(2, tx.receiver_count());
1138 /// }
1139 /// ```
1140 pub fn receiver_count(&self) -> usize {
1141 self.shared.ref_count_rx.load(Relaxed)
1142 }
1143}
1144
1145impl<T> Drop for Sender<T> {
1146 fn drop(&mut self) {
1147 self.shared.state.set_closed();
1148 self.shared.notify_rx.notify_waiters();
1149 }
1150}
1151
1152// ===== impl Ref =====
1153
1154impl<T> ops::Deref for Ref<'_, T> {
1155 type Target = T;
1156
1157 fn deref(&self) -> &T {
1158 self.inner.deref()
1159 }
1160}
1161
#[cfg(all(test, loom))]
mod tests {
    use futures::future::FutureExt;
    use loom::thread;

    // test for https://github.com/tokio-rs/tokio/issues/3168
    //
    // Interleaves sends on one thread with non-blocking polls of `changed`
    // on another thread under the loom model checker.
    #[test]
    fn watch_spurious_wakeup() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            tx.send(1).unwrap();

            // Send from another thread while this thread polls `changed`.
            let first_sender = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            // Swap roles: poll on another thread, send from this one.
            let tx = first_sender.join().unwrap();
            let poller = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            // Swap back for one final send/poll pair.
            let mut rx = poller.join().unwrap();
            let second_sender = thread::spawn(move || {
                tx.send(2).unwrap();
            });

            rx.changed().now_or_never();

            second_sender.join().unwrap();
        });
    }

    // Checks the values observed through `borrow` on both halves while sends
    // and polls of `changed` are interleaved across threads.
    #[test]
    fn watch_borrow() {
        loom::model(|| {
            let (tx, mut rx) = crate::sync::watch::channel(0i32);

            // Both halves start out seeing the initial value.
            assert_eq!(*tx.borrow(), 0);
            assert_eq!(*rx.borrow(), 0);

            tx.send(1).unwrap();
            assert_eq!(*tx.borrow(), 1);

            let sender_thread = thread::spawn(move || {
                tx.send(2).unwrap();
                tx
            });

            rx.changed().now_or_never();

            let tx = sender_thread.join().unwrap();
            let poller = thread::spawn(move || {
                rx.changed().now_or_never();
                rx.changed().now_or_never();
                rx
            });

            tx.send(3).unwrap();

            // The poller has been joined, so both halves must now see `3`.
            let rx = poller.join().unwrap();
            assert_eq!(*rx.borrow(), 3);
            assert_eq!(*tx.borrow(), 3);

            tx.send(2).unwrap();

            // This thread is intentionally left unjoined.
            thread::spawn(move || {
                assert_eq!(*rx.borrow(), 2);
            });
            assert_eq!(*tx.borrow(), 2);
        });
    }
}
1242