1 | #![cfg (feature = "sync" )] |
2 | |
3 | use tokio::sync::watch; |
4 | use tokio_stream::wrappers::WatchStream; |
5 | use tokio_stream::StreamExt; |
6 | use tokio_test::assert_pending; |
7 | use tokio_test::task::spawn; |
8 | |
9 | #[tokio::test ] |
10 | async fn watch_stream_message_not_twice() { |
11 | let (tx, rx) = watch::channel("hello" ); |
12 | |
13 | let mut counter = 0; |
14 | let mut stream = WatchStream::new(rx).map(move |payload| { |
15 | println!("{}" , payload); |
16 | if payload == "goodbye" { |
17 | counter += 1; |
18 | } |
19 | if counter >= 2 { |
20 | panic!("too many goodbyes" ); |
21 | } |
22 | }); |
23 | |
24 | let task = tokio::spawn(async move { while stream.next().await.is_some() {} }); |
25 | |
26 | // Send goodbye just once |
27 | tx.send("goodbye" ).unwrap(); |
28 | |
29 | drop(tx); |
30 | task.await.unwrap(); |
31 | } |
32 | |
33 | #[tokio::test ] |
34 | async fn watch_stream_from_rx() { |
35 | let (tx, rx) = watch::channel("hello" ); |
36 | |
37 | let mut stream = WatchStream::from(rx); |
38 | |
39 | assert_eq!(stream.next().await.unwrap(), "hello" ); |
40 | |
41 | tx.send("bye" ).unwrap(); |
42 | |
43 | assert_eq!(stream.next().await.unwrap(), "bye" ); |
44 | } |
45 | |
46 | #[tokio::test ] |
47 | async fn watch_stream_from_changes() { |
48 | let (tx, rx) = watch::channel("hello" ); |
49 | |
50 | let mut stream = WatchStream::from_changes(rx); |
51 | |
52 | assert_pending!(spawn(&mut stream).poll_next()); |
53 | |
54 | tx.send("bye" ).unwrap(); |
55 | |
56 | assert_eq!(stream.next().await.unwrap(), "bye" ); |
57 | } |
58 | |