1#![cfg(feature = "sync")]
2
3use tokio::sync::watch;
4use tokio_stream::wrappers::WatchStream;
5use tokio_stream::StreamExt;
6use tokio_test::assert_pending;
7use tokio_test::task::spawn;
8
9#[tokio::test]
10async fn watch_stream_message_not_twice() {
11 let (tx, rx) = watch::channel("hello");
12
13 let mut counter = 0;
14 let mut stream = WatchStream::new(rx).map(move |payload| {
15 println!("{}", payload);
16 if payload == "goodbye" {
17 counter += 1;
18 }
19 if counter >= 2 {
20 panic!("too many goodbyes");
21 }
22 });
23
24 let task = tokio::spawn(async move { while stream.next().await.is_some() {} });
25
26 // Send goodbye just once
27 tx.send("goodbye").unwrap();
28
29 drop(tx);
30 task.await.unwrap();
31}
32
33#[tokio::test]
34async fn watch_stream_from_rx() {
35 let (tx, rx) = watch::channel("hello");
36
37 let mut stream = WatchStream::from(rx);
38
39 assert_eq!(stream.next().await.unwrap(), "hello");
40
41 tx.send("bye").unwrap();
42
43 assert_eq!(stream.next().await.unwrap(), "bye");
44}
45
46#[tokio::test]
47async fn watch_stream_from_changes() {
48 let (tx, rx) = watch::channel("hello");
49
50 let mut stream = WatchStream::from_changes(rx);
51
52 assert_pending!(spawn(&mut stream).poll_next());
53
54 tx.send("bye").unwrap();
55
56 assert_eq!(stream.next().await.unwrap(), "bye");
57}
58