1#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
2
3//! A single-producer, multi-consumer channel that only retains the *last* sent
4//! value.
5//!
6//! This channel is useful for watching for changes to a value from multiple
7//! points in the code base, for example, changes to configuration values.
8//!
9//! # Usage
10//!
11//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer
12//! and consumer halves of the channel. The channel is created with an initial
13//! value.
14//!
15//! Each [`Receiver`] independently tracks the last value *seen* by its caller.
16//!
17//! To access the **current** value stored in the channel and mark it as *seen*
18//! by a given [`Receiver`], use [`Receiver::borrow_and_update()`].
19//!
20//! To access the current value **without** marking it as *seen*, use
21//! [`Receiver::borrow()`]. (If the value has already been marked *seen*,
22//! [`Receiver::borrow()`] is equivalent to [`Receiver::borrow_and_update()`].)
23//!
24//! For more information on when to use these methods, see
25//! [here](#borrow_and_update-versus-borrow).
26//!
27//! ## Change notifications
28//!
29//! The [`Receiver`] half provides an asynchronous [`changed`] method. This
30//! method is ready when a new, *unseen* value is sent via the [`Sender`] half.
31//!
32//! * [`Receiver::changed()`] returns `Ok(())` on receiving a new value, or
33//! `Err(`[`error::RecvError`]`)` if the [`Sender`] has been dropped.
34//! * If the current value is *unseen* when calling [`changed`], then
35//! [`changed`] will return immediately. If the current value is *seen*, then
36//! it will sleep until either a new message is sent via the [`Sender`] half,
37//! or the [`Sender`] is dropped.
38//! * On completion, the [`changed`] method marks the new value as *seen*.
39//! * At creation, the initial value is considered *seen*. In other words,
40//! [`Receiver::changed()`] will not return until a subsequent value is sent.
41//! * New [`Receiver`] instances can be created with [`Sender::subscribe()`].
42//! The current value at the time the [`Receiver`] is created is considered
43//! *seen*.
44//!
45//! ## `borrow_and_update` versus `borrow`
46//!
47//! If the receiver intends to await notifications from [`changed`] in a loop,
48//! [`Receiver::borrow_and_update()`] should be preferred over
49//! [`Receiver::borrow()`]. This avoids a potential race where a new value is
50//! sent between [`changed`] being ready and the value being read. (If
51//! [`Receiver::borrow()`] is used, the loop may run twice with the same value.)
52//!
53//! If the receiver is only interested in the current value, and does not intend
54//! to wait for changes, then [`Receiver::borrow()`] can be used. It may be more
55//! convenient to use [`borrow`](Receiver::borrow) since it's an `&self`
56//! method---[`borrow_and_update`](Receiver::borrow_and_update) requires `&mut
57//! self`.
58//!
59//! # Examples
60//!
61//! The following example prints `hello! world! `.
62//!
63//! ```
64//! use tokio::sync::watch;
65//! use tokio::time::{Duration, sleep};
66//!
67//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
68//! let (tx, mut rx) = watch::channel("hello");
69//!
70//! tokio::spawn(async move {
71//! // Use the equivalent of a "do-while" loop so the initial value is
72//! // processed before awaiting the `changed()` future.
73//! loop {
74//! println!("{}! ", *rx.borrow_and_update());
75//! if rx.changed().await.is_err() {
76//! break;
77//! }
78//! }
79//! });
80//!
81//! sleep(Duration::from_millis(100)).await;
82//! tx.send("world")?;
83//! # Ok(())
84//! # }
85//! ```
86//!
87//! # Closing
88//!
89//! [`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect
90//! when all [`Receiver`] handles have been dropped. This indicates that there
91//! is no further interest in the values being produced and work can be stopped.
92//!
93//! The value in the channel will not be dropped until the sender and all
94//! receivers have been dropped.
95//!
96//! # Thread safety
97//!
98//! Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other
99//! threads and can be used in a concurrent environment. Clones of [`Receiver`]
100//! handles may be moved to separate threads and also used concurrently.
101//!
102//! [`Sender`]: crate::sync::watch::Sender
103//! [`Receiver`]: crate::sync::watch::Receiver
104//! [`changed`]: crate::sync::watch::Receiver::changed
105//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
106//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
107//! [`Receiver::borrow_and_update()`]:
108//! crate::sync::watch::Receiver::borrow_and_update
109//! [`channel`]: crate::sync::watch::channel
110//! [`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
111//! [`Sender::closed`]: crate::sync::watch::Sender::closed
112//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
113
114use crate::sync::notify::Notify;
115
116use crate::loom::sync::atomic::AtomicUsize;
117use crate::loom::sync::atomic::Ordering::Relaxed;
118use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
119use std::fmt;
120use std::mem;
121use std::ops;
122use std::panic;
123
124/// Receives values from the associated [`Sender`](struct@Sender).
125///
126/// Instances are created by the [`channel`](fn@channel) function.
127///
128/// To turn this receiver into a `Stream`, you can use the [`WatchStream`]
129/// wrapper.
130///
131/// [`WatchStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.WatchStream.html
132#[derive(Debug)]
133pub struct Receiver<T> {
134 /// Pointer to the shared state
135 shared: Arc<Shared<T>>,
136
137 /// Last observed version
138 version: Version,
139}
140
141/// Sends values to the associated [`Receiver`](struct@Receiver).
142///
143/// Instances are created by the [`channel`](fn@channel) function.
144#[derive(Debug)]
145pub struct Sender<T> {
146 shared: Arc<Shared<T>>,
147}
148
149/// Returns a reference to the inner value.
150///
151/// Outstanding borrows hold a read lock on the inner value. This means that
152/// long-lived borrows could cause the producer half to block. It is recommended
153/// to keep the borrow as short-lived as possible. Additionally, if you are
154/// running in an environment that allows `!Send` futures, you must ensure that
155/// the returned `Ref` type is never held alive across an `.await` point,
156/// otherwise, it can lead to a deadlock.
157///
158/// The priority policy of the lock is dependent on the underlying lock
159/// implementation, and this type does not guarantee that any particular policy
160/// will be used. In particular, a producer which is waiting to acquire the lock
161/// in `send` might or might not block concurrent calls to `borrow`, e.g.:
162///
163/// <details><summary>Potential deadlock example</summary>
164///
165/// ```text
166/// // Task 1 (on thread A) | // Task 2 (on thread B)
167/// let _ref1 = rx.borrow(); |
168/// | // will block
169/// | let _ = tx.send(());
170/// // may deadlock |
171/// let _ref2 = rx.borrow(); |
172/// ```
173/// </details>
174#[derive(Debug)]
175pub struct Ref<'a, T> {
176 inner: RwLockReadGuard<'a, T>,
177 has_changed: bool,
178}
179
180impl<'a, T> Ref<'a, T> {
181 /// Indicates if the borrowed value is considered as _changed_ since the last
182 /// time it has been marked as seen.
183 ///
184 /// Unlike [`Receiver::has_changed()`], this method does not fail if the channel is closed.
185 ///
186 /// When borrowed from the [`Sender`] this function will always return `false`.
187 ///
188 /// # Examples
189 ///
190 /// ```
191 /// use tokio::sync::watch;
192 ///
193 /// #[tokio::main]
194 /// async fn main() {
195 /// let (tx, mut rx) = watch::channel("hello");
196 ///
197 /// tx.send("goodbye").unwrap();
198 /// // The sender does never consider the value as changed.
199 /// assert!(!tx.borrow().has_changed());
200 ///
201 /// // Drop the sender immediately, just for testing purposes.
202 /// drop(tx);
203 ///
204 /// // Even if the sender has already been dropped...
205 /// assert!(rx.has_changed().is_err());
206 /// // ...the modified value is still readable and detected as changed.
207 /// assert_eq!(*rx.borrow(), "goodbye");
208 /// assert!(rx.borrow().has_changed());
209 ///
210 /// // Read the changed value and mark it as seen.
211 /// {
212 /// let received = rx.borrow_and_update();
213 /// assert_eq!(*received, "goodbye");
214 /// assert!(received.has_changed());
215 /// // Release the read lock when leaving this scope.
216 /// }
217 ///
218 /// // Now the value has already been marked as seen and could
219 /// // never be modified again (after the sender has been dropped).
220 /// assert!(!rx.borrow().has_changed());
221 /// }
222 /// ```
223 pub fn has_changed(&self) -> bool {
224 self.has_changed
225 }
226}
227
228struct Shared<T> {
229 /// The most recent value.
230 value: RwLock<T>,
231
232 /// The current version.
233 ///
234 /// The lowest bit represents a "closed" state. The rest of the bits
235 /// represent the current version.
236 state: AtomicState,
237
238 /// Tracks the number of `Receiver` instances.
239 ref_count_rx: AtomicUsize,
240
241 /// Notifies waiting receivers that the value changed.
242 notify_rx: big_notify::BigNotify,
243
244 /// Notifies any task listening for `Receiver` dropped events.
245 notify_tx: Notify,
246}
247
248impl<T: fmt::Debug> fmt::Debug for Shared<T> {
249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250 let state: StateSnapshot = self.state.load();
251 f&mut DebugStruct<'_, '_>.debug_struct("Shared")
252 .field("value", &self.value)
253 .field("version", &state.version())
254 .field("is_closed", &state.is_closed())
255 .field(name:"ref_count_rx", &self.ref_count_rx)
256 .finish()
257 }
258}
259
260pub mod error {
261 //! Watch error types.
262
263 use std::error::Error;
264 use std::fmt;
265
266 /// Error produced when sending a value fails.
267 #[derive(PartialEq, Eq, Clone, Copy)]
268 pub struct SendError<T>(pub T);
269
270 // ===== impl SendError =====
271
272 impl<T> fmt::Debug for SendError<T> {
273 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
274 f.debug_struct("SendError").finish_non_exhaustive()
275 }
276 }
277
278 impl<T> fmt::Display for SendError<T> {
279 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
280 write!(fmt, "channel closed")
281 }
282 }
283
284 impl<T> Error for SendError<T> {}
285
286 /// Error produced when receiving a change notification.
287 #[derive(Debug, Clone)]
288 pub struct RecvError(pub(super) ());
289
290 // ===== impl RecvError =====
291
292 impl fmt::Display for RecvError {
293 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
294 write!(fmt, "channel closed")
295 }
296 }
297
298 impl Error for RecvError {}
299}
300
301mod big_notify {
302 use super::Notify;
303 use crate::sync::notify::Notified;
304
305 // To avoid contention on the lock inside the `Notify`, we store multiple
306 // copies of it. Then, we use either circular access or randomness to spread
307 // out threads over different `Notify` objects.
308 //
309 // Some simple benchmarks show that randomness performs slightly better than
310 // circular access (probably due to contention on `next`), so we prefer to
311 // use randomness when Tokio is compiled with a random number generator.
312 //
313 // When the random number generator is not available, we fall back to
314 // circular access.
315
316 pub(super) struct BigNotify {
317 #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
318 next: std::sync::atomic::AtomicUsize,
319 inner: [Notify; 8],
320 }
321
322 impl BigNotify {
323 pub(super) fn new() -> Self {
324 Self {
325 #[cfg(not(all(
326 not(loom),
327 feature = "sync",
328 any(feature = "rt", feature = "macros")
329 )))]
330 next: std::sync::atomic::AtomicUsize::new(0),
331 inner: Default::default(),
332 }
333 }
334
335 pub(super) fn notify_waiters(&self) {
336 for notify in &self.inner {
337 notify.notify_waiters();
338 }
339 }
340
341 /// This function implements the case where randomness is not available.
342 #[cfg(not(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros"))))]
343 pub(super) fn notified(&self) -> Notified<'_> {
344 let i = self.next.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % 8;
345 self.inner[i].notified()
346 }
347
348 /// This function implements the case where randomness is available.
349 #[cfg(all(not(loom), feature = "sync", any(feature = "rt", feature = "macros")))]
350 pub(super) fn notified(&self) -> Notified<'_> {
351 let i = crate::runtime::context::thread_rng_n(8) as usize;
352 self.inner[i].notified()
353 }
354 }
355}
356
357use self::state::{AtomicState, Version};
358mod state {
359 use crate::loom::sync::atomic::AtomicUsize;
360 use crate::loom::sync::atomic::Ordering;
361
362 const CLOSED_BIT: usize = 1;
363
364 // Using 2 as the step size preserves the `CLOSED_BIT`.
365 const STEP_SIZE: usize = 2;
366
367 /// The version part of the state. The lowest bit is always zero.
368 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
369 pub(super) struct Version(usize);
370
371 /// Snapshot of the state. The first bit is used as the CLOSED bit.
372 /// The remaining bits are used as the version.
373 ///
374 /// The CLOSED bit tracks whether the Sender has been dropped. Dropping all
375 /// receivers does not set it.
376 #[derive(Copy, Clone, Debug)]
377 pub(super) struct StateSnapshot(usize);
378
379 /// The state stored in an atomic integer.
380 ///
381 /// The `Sender` uses `Release` ordering for storing a new state
382 /// and the `Receiver`s use `Acquire` ordering for loading the
383 /// current state. This ensures that written values are seen by
384 /// the `Receiver`s for a proper handover.
385 #[derive(Debug)]
386 pub(super) struct AtomicState(AtomicUsize);
387
388 impl Version {
389 /// Decrements the version.
390 pub(super) fn decrement(&mut self) {
391 // Using a wrapping decrement here is required to ensure that the
392 // operation is consistent with `std::sync::atomic::AtomicUsize::fetch_add()`
393 // which wraps on overflow.
394 self.0 = self.0.wrapping_sub(STEP_SIZE);
395 }
396
397 pub(super) const INITIAL: Self = Version(0);
398 }
399
400 impl StateSnapshot {
401 /// Extract the version from the state.
402 pub(super) fn version(self) -> Version {
403 Version(self.0 & !CLOSED_BIT)
404 }
405
406 /// Is the closed bit set?
407 pub(super) fn is_closed(self) -> bool {
408 (self.0 & CLOSED_BIT) == CLOSED_BIT
409 }
410 }
411
412 impl AtomicState {
413 /// Create a new `AtomicState` that is not closed and which has the
414 /// version set to `Version::INITIAL`.
415 pub(super) fn new() -> Self {
416 AtomicState(AtomicUsize::new(Version::INITIAL.0))
417 }
418
419 /// Load the current value of the state.
420 ///
421 /// Only used by the receiver and for debugging purposes.
422 ///
423 /// The receiver side (read-only) uses `Acquire` ordering for a proper handover
424 /// of the shared value with the sender side (single writer). The state is always
425 /// updated after modifying and before releasing the (exclusive) lock on the
426 /// shared value.
427 pub(super) fn load(&self) -> StateSnapshot {
428 StateSnapshot(self.0.load(Ordering::Acquire))
429 }
430
431 /// Increment the version counter.
432 pub(super) fn increment_version_while_locked(&self) {
433 // Use `Release` ordering to ensure that the shared value
434 // has been written before updating the version. The shared
435 // value is still protected by an exclusive lock during this
436 // method.
437 self.0.fetch_add(STEP_SIZE, Ordering::Release);
438 }
439
440 /// Set the closed bit in the state.
441 pub(super) fn set_closed(&self) {
442 self.0.fetch_or(CLOSED_BIT, Ordering::Release);
443 }
444 }
445}
446
447/// Creates a new watch channel, returning the "send" and "receive" handles.
448///
449/// All values sent by [`Sender`] will become visible to the [`Receiver`] handles.
450/// Only the last value sent is made available to the [`Receiver`] half. All
451/// intermediate values are dropped.
452///
453/// # Examples
454///
455/// The following example prints `hello! world! `.
456///
457/// ```
458/// use tokio::sync::watch;
459/// use tokio::time::{Duration, sleep};
460///
461/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
462/// let (tx, mut rx) = watch::channel("hello");
463///
464/// tokio::spawn(async move {
465/// // Use the equivalent of a "do-while" loop so the initial value is
466/// // processed before awaiting the `changed()` future.
467/// loop {
468/// println!("{}! ", *rx.borrow_and_update());
469/// if rx.changed().await.is_err() {
470/// break;
471/// }
472/// }
473/// });
474///
475/// sleep(Duration::from_millis(100)).await;
476/// tx.send("world")?;
477/// # Ok(())
478/// # }
479/// ```
480///
481/// [`Sender`]: struct@Sender
482/// [`Receiver`]: struct@Receiver
483pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
484 let shared: Arc> = Arc::new(data:Shared {
485 value: RwLock::new(init),
486 state: AtomicState::new(),
487 ref_count_rx: AtomicUsize::new(val:1),
488 notify_rx: big_notify::BigNotify::new(),
489 notify_tx: Notify::new(),
490 });
491
492 let tx: Sender = Sender {
493 shared: shared.clone(),
494 };
495
496 let rx: Receiver = Receiver {
497 shared,
498 version: Version::INITIAL,
499 };
500
501 (tx, rx)
502}
503
504impl<T> Receiver<T> {
505 fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
506 // No synchronization necessary as this is only used as a counter and
507 // not memory access.
508 shared.ref_count_rx.fetch_add(1, Relaxed);
509
510 Self { shared, version }
511 }
512
513 /// Returns a reference to the most recently sent value.
514 ///
515 /// This method does not mark the returned value as seen, so future calls to
516 /// [`changed`] may return immediately even if you have already seen the
517 /// value with a call to `borrow`.
518 ///
519 /// Outstanding borrows hold a read lock on the inner value. This means that
520 /// long-lived borrows could cause the producer half to block. It is recommended
521 /// to keep the borrow as short-lived as possible. Additionally, if you are
522 /// running in an environment that allows `!Send` futures, you must ensure that
523 /// the returned `Ref` type is never held alive across an `.await` point,
524 /// otherwise, it can lead to a deadlock.
525 ///
526 /// The priority policy of the lock is dependent on the underlying lock
527 /// implementation, and this type does not guarantee that any particular policy
528 /// will be used. In particular, a producer which is waiting to acquire the lock
529 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
530 ///
531 /// <details><summary>Potential deadlock example</summary>
532 ///
533 /// ```text
534 /// // Task 1 (on thread A) | // Task 2 (on thread B)
535 /// let _ref1 = rx.borrow(); |
536 /// | // will block
537 /// | let _ = tx.send(());
538 /// // may deadlock |
539 /// let _ref2 = rx.borrow(); |
540 /// ```
541 /// </details>
542 ///
543 /// For more information on when to use this method versus
544 /// [`borrow_and_update`], see [here](self#borrow_and_update-versus-borrow).
545 ///
546 /// [`changed`]: Receiver::changed
547 /// [`borrow_and_update`]: Receiver::borrow_and_update
548 ///
549 /// # Examples
550 ///
551 /// ```
552 /// use tokio::sync::watch;
553 ///
554 /// let (_, rx) = watch::channel("hello");
555 /// assert_eq!(*rx.borrow(), "hello");
556 /// ```
557 pub fn borrow(&self) -> Ref<'_, T> {
558 let inner = self.shared.value.read().unwrap();
559
560 // After obtaining a read-lock no concurrent writes could occur
561 // and the loaded version matches that of the borrowed reference.
562 let new_version = self.shared.state.load().version();
563 let has_changed = self.version != new_version;
564
565 Ref { inner, has_changed }
566 }
567
568 /// Returns a reference to the most recently sent value and marks that value
569 /// as seen.
570 ///
571 /// This method marks the current value as seen. Subsequent calls to [`changed`]
572 /// will not return immediately until the [`Sender`] has modified the shared
573 /// value again.
574 ///
575 /// Outstanding borrows hold a read lock on the inner value. This means that
576 /// long-lived borrows could cause the producer half to block. It is recommended
577 /// to keep the borrow as short-lived as possible. Additionally, if you are
578 /// running in an environment that allows `!Send` futures, you must ensure that
579 /// the returned `Ref` type is never held alive across an `.await` point,
580 /// otherwise, it can lead to a deadlock.
581 ///
582 /// The priority policy of the lock is dependent on the underlying lock
583 /// implementation, and this type does not guarantee that any particular policy
584 /// will be used. In particular, a producer which is waiting to acquire the lock
585 /// in `send` might or might not block concurrent calls to `borrow`, e.g.:
586 ///
587 /// <details><summary>Potential deadlock example</summary>
588 ///
589 /// ```text
590 /// // Task 1 (on thread A) | // Task 2 (on thread B)
591 /// let _ref1 = rx1.borrow_and_update(); |
592 /// | // will block
593 /// | let _ = tx.send(());
594 /// // may deadlock |
595 /// let _ref2 = rx2.borrow_and_update(); |
596 /// ```
597 /// </details>
598 ///
599 /// For more information on when to use this method versus [`borrow`], see
600 /// [here](self#borrow_and_update-versus-borrow).
601 ///
602 /// [`changed`]: Receiver::changed
603 /// [`borrow`]: Receiver::borrow
604 pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
605 let inner = self.shared.value.read().unwrap();
606
607 // After obtaining a read-lock no concurrent writes could occur
608 // and the loaded version matches that of the borrowed reference.
609 let new_version = self.shared.state.load().version();
610 let has_changed = self.version != new_version;
611
612 // Mark the shared value as seen by updating the version
613 self.version = new_version;
614
615 Ref { inner, has_changed }
616 }
617
618 /// Checks if this channel contains a message that this receiver has not yet
619 /// seen. The new value is not marked as seen.
620 ///
621 /// Although this method is called `has_changed`, it does not check new
622 /// messages for equality, so this call will return true even if the new
623 /// message is equal to the old message.
624 ///
625 /// Returns an error if the channel has been closed.
626 /// # Examples
627 ///
628 /// ```
629 /// use tokio::sync::watch;
630 ///
631 /// #[tokio::main]
632 /// async fn main() {
633 /// let (tx, mut rx) = watch::channel("hello");
634 ///
635 /// tx.send("goodbye").unwrap();
636 ///
637 /// assert!(rx.has_changed().unwrap());
638 /// assert_eq!(*rx.borrow_and_update(), "goodbye");
639 ///
640 /// // The value has been marked as seen
641 /// assert!(!rx.has_changed().unwrap());
642 ///
643 /// drop(tx);
644 /// // The `tx` handle has been dropped
645 /// assert!(rx.has_changed().is_err());
646 /// }
647 /// ```
648 pub fn has_changed(&self) -> Result<bool, error::RecvError> {
649 // Load the version from the state
650 let state = self.shared.state.load();
651 if state.is_closed() {
652 // The sender has dropped.
653 return Err(error::RecvError(()));
654 }
655 let new_version = state.version();
656
657 Ok(self.version != new_version)
658 }
659
660 /// Marks the state as changed.
661 ///
662 /// After invoking this method [`has_changed()`](Self::has_changed)
663 /// returns `true` and [`changed()`](Self::changed) returns
664 /// immediately, regardless of whether a new value has been sent.
665 ///
666 /// This is useful for triggering an initial change notification after
667 /// subscribing to synchronize new receivers.
668 pub fn mark_changed(&mut self) {
669 self.version.decrement();
670 }
671
672 /// Marks the state as unchanged.
673 ///
674 /// The current value will be considered seen by the receiver.
675 ///
676 /// This is useful if you are not interested in the current value
677 /// visible in the receiver.
678 pub fn mark_unchanged(&mut self) {
679 let current_version = self.shared.state.load().version();
680 self.version = current_version;
681 }
682
683 /// Waits for a change notification, then marks the newest value as seen.
684 ///
685 /// If the newest value in the channel has not yet been marked seen when
686 /// this method is called, the method marks that value seen and returns
687 /// immediately. If the newest value has already been marked seen, then the
688 /// method sleeps until a new message is sent by the [`Sender`] connected to
689 /// this `Receiver`, or until the [`Sender`] is dropped.
690 ///
691 /// This method returns an error if and only if the [`Sender`] is dropped.
692 ///
693 /// For more information, see
694 /// [*Change notifications*](self#change-notifications) in the module-level documentation.
695 ///
696 /// # Cancel safety
697 ///
698 /// This method is cancel safe. If you use it as the event in a
699 /// [`tokio::select!`](crate::select) statement and some other branch
700 /// completes first, then it is guaranteed that no values have been marked
701 /// seen by this call to `changed`.
702 ///
703 /// [`Sender`]: struct@Sender
704 ///
705 /// # Examples
706 ///
707 /// ```
708 /// use tokio::sync::watch;
709 ///
710 /// #[tokio::main]
711 /// async fn main() {
712 /// let (tx, mut rx) = watch::channel("hello");
713 ///
714 /// tokio::spawn(async move {
715 /// tx.send("goodbye").unwrap();
716 /// });
717 ///
718 /// assert!(rx.changed().await.is_ok());
719 /// assert_eq!(*rx.borrow_and_update(), "goodbye");
720 ///
721 /// // The `tx` handle has been dropped
722 /// assert!(rx.changed().await.is_err());
723 /// }
724 /// ```
725 pub async fn changed(&mut self) -> Result<(), error::RecvError> {
726 changed_impl(&self.shared, &mut self.version).await
727 }
728
729 /// Waits for a value that satisfies the provided condition.
730 ///
731 /// This method will call the provided closure whenever something is sent on
732 /// the channel. Once the closure returns `true`, this method will return a
733 /// reference to the value that was passed to the closure.
734 ///
735 /// Before `wait_for` starts waiting for changes, it will call the closure
736 /// on the current value. If the closure returns `true` when given the
737 /// current value, then `wait_for` will immediately return a reference to
738 /// the current value. This is the case even if the current value is already
739 /// considered seen.
740 ///
741 /// The watch channel only keeps track of the most recent value, so if
742 /// several messages are sent faster than `wait_for` is able to call the
743 /// closure, then it may skip some updates. Whenever the closure is called,
744 /// it will be called with the most recent value.
745 ///
746 /// When this function returns, the value that was passed to the closure
747 /// when it returned `true` will be considered seen.
748 ///
749 /// If the channel is closed, then `wait_for` will return a `RecvError`.
750 /// Once this happens, no more messages can ever be sent on the channel.
751 /// When an error is returned, it is guaranteed that the closure has been
752 /// called on the last value, and that it returned `false` for that value.
753 /// (If the closure returned `true`, then the last value would have been
754 /// returned instead of the error.)
755 ///
756 /// Like the `borrow` method, the returned borrow holds a read lock on the
757 /// inner value. This means that long-lived borrows could cause the producer
758 /// half to block. It is recommended to keep the borrow as short-lived as
759 /// possible. See the documentation of `borrow` for more information on
760 /// this.
761 ///
762 /// [`Receiver::changed()`]: crate::sync::watch::Receiver::changed
763 ///
764 /// # Examples
765 ///
766 /// ```
767 /// use tokio::sync::watch;
768 ///
769 /// #[tokio::main]
770 ///
771 /// async fn main() {
772 /// let (tx, _rx) = watch::channel("hello");
773 ///
774 /// tx.send("goodbye").unwrap();
775 ///
776 /// // here we subscribe to a second receiver
777 /// // now in case of using `changed` we would have
778 /// // to first check the current value and then wait
779 /// // for changes or else `changed` would hang.
780 /// let mut rx2 = tx.subscribe();
781 ///
782 /// // in place of changed we have use `wait_for`
783 /// // which would automatically check the current value
784 /// // and wait for changes until the closure returns true.
785 /// assert!(rx2.wait_for(|val| *val == "goodbye").await.is_ok());
786 /// assert_eq!(*rx2.borrow(), "goodbye");
787 /// }
788 /// ```
789 pub async fn wait_for(
790 &mut self,
791 mut f: impl FnMut(&T) -> bool,
792 ) -> Result<Ref<'_, T>, error::RecvError> {
793 let mut closed = false;
794 loop {
795 {
796 let inner = self.shared.value.read().unwrap();
797
798 let new_version = self.shared.state.load().version();
799 let has_changed = self.version != new_version;
800 self.version = new_version;
801
802 if !closed || has_changed {
803 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&inner)));
804 match result {
805 Ok(true) => {
806 return Ok(Ref { inner, has_changed });
807 }
808 Ok(false) => {
809 // Skip the value.
810 }
811 Err(panicked) => {
812 // Drop the read-lock to avoid poisoning it.
813 drop(inner);
814 // Forward the panic to the caller.
815 panic::resume_unwind(panicked);
816 // Unreachable
817 }
818 };
819 }
820 }
821
822 if closed {
823 return Err(error::RecvError(()));
824 }
825
826 // Wait for the value to change.
827 closed = changed_impl(&self.shared, &mut self.version).await.is_err();
828 }
829 }
830
831 /// Returns `true` if receivers belong to the same channel.
832 ///
833 /// # Examples
834 ///
835 /// ```
836 /// let (tx, rx) = tokio::sync::watch::channel(true);
837 /// let rx2 = rx.clone();
838 /// assert!(rx.same_channel(&rx2));
839 ///
840 /// let (tx3, rx3) = tokio::sync::watch::channel(true);
841 /// assert!(!rx3.same_channel(&rx2));
842 /// ```
843 pub fn same_channel(&self, other: &Self) -> bool {
844 Arc::ptr_eq(&self.shared, &other.shared)
845 }
846
847 cfg_process_driver! {
848 pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
849 maybe_changed(&self.shared, &mut self.version)
850 }
851 }
852}
853
854fn maybe_changed<T>(
855 shared: &Shared<T>,
856 version: &mut Version,
857) -> Option<Result<(), error::RecvError>> {
858 // Load the version from the state
859 let state: StateSnapshot = shared.state.load();
860 let new_version: Version = state.version();
861
862 if *version != new_version {
863 // Observe the new version and return
864 *version = new_version;
865 return Some(Ok(()));
866 }
867
868 if state.is_closed() {
869 // The sender has been dropped.
870 return Some(Err(error::RecvError(())));
871 }
872
873 None
874}
875
876async fn changed_impl<T>(
877 shared: &Shared<T>,
878 version: &mut Version,
879) -> Result<(), error::RecvError> {
880 crate::trace::async_trace_leaf().await;
881
882 loop {
883 // In order to avoid a race condition, we first request a notification,
884 // **then** check the current value's version. If a new version exists,
885 // the notification request is dropped.
886 let notified: Notified<'_> = shared.notify_rx.notified();
887
888 if let Some(ret: Result<(), RecvError>) = maybe_changed(shared, version) {
889 return ret;
890 }
891
892 notified.await;
893 // loop around again in case the wake-up was spurious
894 }
895}
896
897impl<T> Clone for Receiver<T> {
898 fn clone(&self) -> Self {
899 let version: Version = self.version;
900 let shared: Arc> = self.shared.clone();
901
902 Self::from_shared(version, shared)
903 }
904}
905
906impl<T> Drop for Receiver<T> {
907 fn drop(&mut self) {
908 // No synchronization necessary as this is only used as a counter and
909 // not memory access.
910 if 1 == self.shared.ref_count_rx.fetch_sub(val:1, order:Relaxed) {
911 // This is the last `Receiver` handle, tasks waiting on `Sender::closed()`
912 self.shared.notify_tx.notify_waiters();
913 }
914 }
915}
916
917impl<T> Sender<T> {
918 /// Creates the sending-half of the [`watch`] channel.
919 ///
920 /// See documentation of [`watch::channel`] for errors when calling this function.
921 /// Beware that attempting to send a value when there are no receivers will
922 /// return an error.
923 ///
924 /// [`watch`]: crate::sync::watch
925 /// [`watch::channel`]: crate::sync::watch
926 ///
927 /// # Examples
928 /// ```
929 /// let sender = tokio::sync::watch::Sender::new(0u8);
930 /// assert!(sender.send(3).is_err());
931 /// let _rec = sender.subscribe();
932 /// assert!(sender.send(4).is_ok());
933 /// ```
934 pub fn new(init: T) -> Self {
935 let (tx, _) = channel(init);
936 tx
937 }
938
939 /// Sends a new value via the channel, notifying all receivers.
940 ///
941 /// This method fails if the channel is closed, which is the case when
942 /// every receiver has been dropped. It is possible to reopen the channel
943 /// using the [`subscribe`] method. However, when `send` fails, the value
944 /// isn't made available for future receivers (but returned with the
945 /// [`SendError`]).
946 ///
947 /// To always make a new value available for future receivers, even if no
948 /// receiver currently exists, one of the other send methods
949 /// ([`send_if_modified`], [`send_modify`], or [`send_replace`]) can be
950 /// used instead.
951 ///
952 /// [`subscribe`]: Sender::subscribe
953 /// [`SendError`]: error::SendError
954 /// [`send_if_modified`]: Sender::send_if_modified
955 /// [`send_modify`]: Sender::send_modify
956 /// [`send_replace`]: Sender::send_replace
957 pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
958 // This is pretty much only useful as a hint anyway, so synchronization isn't critical.
959 if 0 == self.receiver_count() {
960 return Err(error::SendError(value));
961 }
962
963 self.send_replace(value);
964 Ok(())
965 }
966
967 /// Modifies the watched value **unconditionally** in-place,
968 /// notifying all receivers.
969 ///
970 /// This can be useful for modifying the watched value, without
971 /// having to allocate a new instance. Additionally, this
972 /// method permits sending values even when there are no receivers.
973 ///
974 /// Prefer to use the more versatile function [`Self::send_if_modified()`]
975 /// if the value is only modified conditionally during the mutable borrow
976 /// to prevent unneeded change notifications for unmodified values.
977 ///
978 /// # Panics
979 ///
980 /// This function panics when the invocation of the `modify` closure panics.
981 /// No receivers are notified when panicking. All changes of the watched
982 /// value applied by the closure before panicking will be visible in
983 /// subsequent calls to `borrow`.
984 ///
985 /// # Examples
986 ///
987 /// ```
988 /// use tokio::sync::watch;
989 ///
990 /// struct State {
991 /// counter: usize,
992 /// }
993 /// let (state_tx, state_rx) = watch::channel(State { counter: 0 });
994 /// state_tx.send_modify(|state| state.counter += 1);
995 /// assert_eq!(state_rx.borrow().counter, 1);
996 /// ```
997 pub fn send_modify<F>(&self, modify: F)
998 where
999 F: FnOnce(&mut T),
1000 {
1001 self.send_if_modified(|value| {
1002 modify(value);
1003 true
1004 });
1005 }
1006
1007 /// Modifies the watched value **conditionally** in-place,
1008 /// notifying all receivers only if modified.
1009 ///
1010 /// This can be useful for modifying the watched value, without
1011 /// having to allocate a new instance. Additionally, this
1012 /// method permits sending values even when there are no receivers.
1013 ///
1014 /// The `modify` closure must return `true` if the value has actually
1015 /// been modified during the mutable borrow. It should only return `false`
1016 /// if the value is guaranteed to be unmodified despite the mutable
1017 /// borrow.
1018 ///
1019 /// Receivers are only notified if the closure returned `true`. If the
1020 /// closure has modified the value but returned `false` this results
1021 /// in a *silent modification*, i.e. the modified value will be visible
1022 /// in subsequent calls to `borrow`, but receivers will not receive
1023 /// a change notification.
1024 ///
1025 /// Returns the result of the closure, i.e. `true` if the value has
1026 /// been modified and `false` otherwise.
1027 ///
1028 /// # Panics
1029 ///
1030 /// This function panics when the invocation of the `modify` closure panics.
1031 /// No receivers are notified when panicking. All changes of the watched
1032 /// value applied by the closure before panicking will be visible in
1033 /// subsequent calls to `borrow`.
1034 ///
1035 /// # Examples
1036 ///
1037 /// ```
1038 /// use tokio::sync::watch;
1039 ///
1040 /// struct State {
1041 /// counter: usize,
1042 /// }
1043 /// let (state_tx, mut state_rx) = watch::channel(State { counter: 1 });
1044 /// let inc_counter_if_odd = |state: &mut State| {
1045 /// if state.counter % 2 == 1 {
1046 /// state.counter += 1;
1047 /// return true;
1048 /// }
1049 /// false
1050 /// };
1051 ///
1052 /// assert_eq!(state_rx.borrow().counter, 1);
1053 ///
1054 /// assert!(!state_rx.has_changed().unwrap());
1055 /// assert!(state_tx.send_if_modified(inc_counter_if_odd));
1056 /// assert!(state_rx.has_changed().unwrap());
1057 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1058 ///
1059 /// assert!(!state_rx.has_changed().unwrap());
1060 /// assert!(!state_tx.send_if_modified(inc_counter_if_odd));
1061 /// assert!(!state_rx.has_changed().unwrap());
1062 /// assert_eq!(state_rx.borrow_and_update().counter, 2);
1063 /// ```
1064 pub fn send_if_modified<F>(&self, modify: F) -> bool
1065 where
1066 F: FnOnce(&mut T) -> bool,
1067 {
1068 {
1069 // Acquire the write lock and update the value.
1070 let mut lock = self.shared.value.write().unwrap();
1071
1072 // Update the value and catch possible panic inside func.
1073 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
1074 match result {
1075 Ok(modified) => {
1076 if !modified {
1077 // Abort, i.e. don't notify receivers if unmodified
1078 return false;
1079 }
1080 // Continue if modified
1081 }
1082 Err(panicked) => {
1083 // Drop the lock to avoid poisoning it.
1084 drop(lock);
1085 // Forward the panic to the caller.
1086 panic::resume_unwind(panicked);
1087 // Unreachable
1088 }
1089 };
1090
1091 self.shared.state.increment_version_while_locked();
1092
1093 // Release the write lock.
1094 //
1095 // Incrementing the version counter while holding the lock ensures
1096 // that receivers are able to figure out the version number of the
1097 // value they are currently looking at.
1098 drop(lock);
1099 }
1100
1101 self.shared.notify_rx.notify_waiters();
1102
1103 true
1104 }
1105
1106 /// Sends a new value via the channel, notifying all receivers and returning
1107 /// the previous value in the channel.
1108 ///
1109 /// This can be useful for reusing the buffers inside a watched value.
1110 /// Additionally, this method permits sending values even when there are no
1111 /// receivers.
1112 ///
1113 /// # Examples
1114 ///
1115 /// ```
1116 /// use tokio::sync::watch;
1117 ///
1118 /// let (tx, _rx) = watch::channel(1);
1119 /// assert_eq!(tx.send_replace(2), 1);
1120 /// assert_eq!(tx.send_replace(3), 2);
1121 /// ```
1122 pub fn send_replace(&self, mut value: T) -> T {
1123 // swap old watched value with the new one
1124 self.send_modify(|old| mem::swap(old, &mut value));
1125
1126 value
1127 }
1128
1129 /// Returns a reference to the most recently sent value
1130 ///
1131 /// Outstanding borrows hold a read lock on the inner value. This means that
1132 /// long-lived borrows could cause the producer half to block. It is recommended
1133 /// to keep the borrow as short-lived as possible. Additionally, if you are
1134 /// running in an environment that allows `!Send` futures, you must ensure that
1135 /// the returned `Ref` type is never held alive across an `.await` point,
1136 /// otherwise, it can lead to a deadlock.
1137 ///
1138 /// # Examples
1139 ///
1140 /// ```
1141 /// use tokio::sync::watch;
1142 ///
1143 /// let (tx, _) = watch::channel("hello");
1144 /// assert_eq!(*tx.borrow(), "hello");
1145 /// ```
1146 pub fn borrow(&self) -> Ref<'_, T> {
1147 let inner = self.shared.value.read().unwrap();
1148
1149 // The sender/producer always sees the current version
1150 let has_changed = false;
1151
1152 Ref { inner, has_changed }
1153 }
1154
1155 /// Checks if the channel has been closed. This happens when all receivers
1156 /// have dropped.
1157 ///
1158 /// # Examples
1159 ///
1160 /// ```
1161 /// let (tx, rx) = tokio::sync::watch::channel(());
1162 /// assert!(!tx.is_closed());
1163 ///
1164 /// drop(rx);
1165 /// assert!(tx.is_closed());
1166 /// ```
1167 pub fn is_closed(&self) -> bool {
1168 self.receiver_count() == 0
1169 }
1170
1171 /// Completes when all receivers have dropped.
1172 ///
1173 /// This allows the producer to get notified when interest in the produced
1174 /// values is canceled and immediately stop doing work.
1175 ///
1176 /// # Cancel safety
1177 ///
1178 /// This method is cancel safe. Once the channel is closed, it stays closed
1179 /// forever and all future calls to `closed` will return immediately.
1180 ///
1181 /// # Examples
1182 ///
1183 /// ```
1184 /// use tokio::sync::watch;
1185 ///
1186 /// #[tokio::main]
1187 /// async fn main() {
1188 /// let (tx, rx) = watch::channel("hello");
1189 ///
1190 /// tokio::spawn(async move {
1191 /// // use `rx`
1192 /// drop(rx);
1193 /// });
1194 ///
1195 /// // Waits for `rx` to drop
1196 /// tx.closed().await;
1197 /// println!("the `rx` handles dropped")
1198 /// }
1199 /// ```
1200 pub async fn closed(&self) {
1201 crate::trace::async_trace_leaf().await;
1202
1203 while self.receiver_count() > 0 {
1204 let notified = self.shared.notify_tx.notified();
1205
1206 if self.receiver_count() == 0 {
1207 return;
1208 }
1209
1210 notified.await;
1211 // The channel could have been reopened in the meantime by calling
1212 // `subscribe`, so we loop again.
1213 }
1214 }
1215
1216 /// Creates a new [`Receiver`] connected to this `Sender`.
1217 ///
1218 /// All messages sent before this call to `subscribe` are initially marked
1219 /// as seen by the new `Receiver`.
1220 ///
1221 /// This method can be called even if there are no other receivers. In this
1222 /// case, the channel is reopened.
1223 ///
1224 /// # Examples
1225 ///
1226 /// The new channel will receive messages sent on this `Sender`.
1227 ///
1228 /// ```
1229 /// use tokio::sync::watch;
1230 ///
1231 /// #[tokio::main]
1232 /// async fn main() {
1233 /// let (tx, _rx) = watch::channel(0u64);
1234 ///
1235 /// tx.send(5).unwrap();
1236 ///
1237 /// let rx = tx.subscribe();
1238 /// assert_eq!(5, *rx.borrow());
1239 ///
1240 /// tx.send(10).unwrap();
1241 /// assert_eq!(10, *rx.borrow());
1242 /// }
1243 /// ```
1244 ///
1245 /// The most recent message is considered seen by the channel, so this test
1246 /// is guaranteed to pass.
1247 ///
1248 /// ```
1249 /// use tokio::sync::watch;
1250 /// use tokio::time::Duration;
1251 ///
1252 /// #[tokio::main]
1253 /// async fn main() {
1254 /// let (tx, _rx) = watch::channel(0u64);
1255 /// tx.send(5).unwrap();
1256 /// let mut rx = tx.subscribe();
1257 ///
1258 /// tokio::spawn(async move {
1259 /// // by spawning and sleeping, the message is sent after `main`
1260 /// // hits the call to `changed`.
1261 /// # if false {
1262 /// tokio::time::sleep(Duration::from_millis(10)).await;
1263 /// # }
1264 /// tx.send(100).unwrap();
1265 /// });
1266 ///
1267 /// rx.changed().await.unwrap();
1268 /// assert_eq!(100, *rx.borrow());
1269 /// }
1270 /// ```
1271 pub fn subscribe(&self) -> Receiver<T> {
1272 let shared = self.shared.clone();
1273 let version = shared.state.load().version();
1274
1275 // The CLOSED bit in the state tracks only whether the sender is
1276 // dropped, so we do not need to unset it if this reopens the channel.
1277 Receiver::from_shared(version, shared)
1278 }
1279
1280 /// Returns the number of receivers that currently exist.
1281 ///
1282 /// # Examples
1283 ///
1284 /// ```
1285 /// use tokio::sync::watch;
1286 ///
1287 /// #[tokio::main]
1288 /// async fn main() {
1289 /// let (tx, rx1) = watch::channel("hello");
1290 ///
1291 /// assert_eq!(1, tx.receiver_count());
1292 ///
1293 /// let mut _rx2 = rx1.clone();
1294 ///
1295 /// assert_eq!(2, tx.receiver_count());
1296 /// }
1297 /// ```
1298 pub fn receiver_count(&self) -> usize {
1299 self.shared.ref_count_rx.load(Relaxed)
1300 }
1301}
1302
1303impl<T> Drop for Sender<T> {
1304 fn drop(&mut self) {
1305 self.shared.state.set_closed();
1306 self.shared.notify_rx.notify_waiters();
1307 }
1308}
1309
1310// ===== impl Ref =====
1311
1312impl<T> ops::Deref for Ref<'_, T> {
1313 type Target = T;
1314
1315 fn deref(&self) -> &T {
1316 self.inner.deref()
1317 }
1318}
1319
1320#[cfg(all(test, loom))]
1321mod tests {
1322 use futures::future::FutureExt;
1323 use loom::thread;
1324
1325 // test for https://github.com/tokio-rs/tokio/issues/3168
1326 #[test]
1327 fn watch_spurious_wakeup() {
1328 loom::model(|| {
1329 let (send, mut recv) = crate::sync::watch::channel(0i32);
1330
1331 send.send(1).unwrap();
1332
1333 let send_thread = thread::spawn(move || {
1334 send.send(2).unwrap();
1335 send
1336 });
1337
1338 recv.changed().now_or_never();
1339
1340 let send = send_thread.join().unwrap();
1341 let recv_thread = thread::spawn(move || {
1342 recv.changed().now_or_never();
1343 recv.changed().now_or_never();
1344 recv
1345 });
1346
1347 send.send(3).unwrap();
1348
1349 let mut recv = recv_thread.join().unwrap();
1350 let send_thread = thread::spawn(move || {
1351 send.send(2).unwrap();
1352 });
1353
1354 recv.changed().now_or_never();
1355
1356 send_thread.join().unwrap();
1357 });
1358 }
1359
1360 #[test]
1361 fn watch_borrow() {
1362 loom::model(|| {
1363 let (send, mut recv) = crate::sync::watch::channel(0i32);
1364
1365 assert!(send.borrow().eq(&0));
1366 assert!(recv.borrow().eq(&0));
1367
1368 send.send(1).unwrap();
1369 assert!(send.borrow().eq(&1));
1370
1371 let send_thread = thread::spawn(move || {
1372 send.send(2).unwrap();
1373 send
1374 });
1375
1376 recv.changed().now_or_never();
1377
1378 let send = send_thread.join().unwrap();
1379 let recv_thread = thread::spawn(move || {
1380 recv.changed().now_or_never();
1381 recv.changed().now_or_never();
1382 recv
1383 });
1384
1385 send.send(3).unwrap();
1386
1387 let recv = recv_thread.join().unwrap();
1388 assert!(recv.borrow().eq(&3));
1389 assert!(send.borrow().eq(&3));
1390
1391 send.send(2).unwrap();
1392
1393 thread::spawn(move || {
1394 assert!(recv.borrow().eq(&2));
1395 });
1396 assert!(send.borrow().eq(&2));
1397 });
1398 }
1399}
1400