1#![allow(clippy::cognitive_complexity)]
2#![warn(rust_2018_idioms)]
3#![cfg(feature = "sync")]
4
5#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6use wasm_bindgen_test::wasm_bindgen_test as test;
7
8use tokio::sync::watch;
9use tokio_test::task::spawn;
10use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
11
12#[test]
13fn single_rx_recv() {
14 let (tx, mut rx) = watch::channel("one");
15
16 {
17 // Not initially notified
18 let mut t = spawn(rx.changed());
19 assert_pending!(t.poll());
20 }
21 assert_eq!(*rx.borrow(), "one");
22
23 {
24 let mut t = spawn(rx.changed());
25 assert_pending!(t.poll());
26
27 tx.send("two").unwrap();
28
29 assert!(t.is_woken());
30
31 assert_ready_ok!(t.poll());
32 }
33 assert_eq!(*rx.borrow(), "two");
34
35 {
36 let mut t = spawn(rx.changed());
37 assert_pending!(t.poll());
38
39 drop(tx);
40
41 assert!(t.is_woken());
42 assert_ready_err!(t.poll());
43 }
44 assert_eq!(*rx.borrow(), "two");
45}
46
47#[test]
48fn rx_version_underflow() {
49 let (_tx, mut rx) = watch::channel("one");
50
51 // Version starts at 2, validate we do not underflow
52 rx.mark_changed();
53 rx.mark_changed();
54}
55
56#[test]
57fn rx_mark_changed() {
58 let (tx, mut rx) = watch::channel("one");
59
60 let mut rx2 = rx.clone();
61 let mut rx3 = rx.clone();
62 let mut rx4 = rx.clone();
63 {
64 rx.mark_changed();
65 assert!(rx.has_changed().unwrap());
66
67 let mut t = spawn(rx.changed());
68 assert_ready_ok!(t.poll());
69 }
70
71 {
72 assert!(!rx2.has_changed().unwrap());
73
74 let mut t = spawn(rx2.changed());
75 assert_pending!(t.poll());
76 }
77
78 {
79 rx3.mark_changed();
80 assert_eq!(*rx3.borrow(), "one");
81
82 assert!(rx3.has_changed().unwrap());
83
84 assert_eq!(*rx3.borrow_and_update(), "one");
85
86 assert!(!rx3.has_changed().unwrap());
87
88 let mut t = spawn(rx3.changed());
89 assert_pending!(t.poll());
90 }
91
92 {
93 tx.send("two").unwrap();
94 assert!(rx4.has_changed().unwrap());
95 assert_eq!(*rx4.borrow_and_update(), "two");
96
97 rx4.mark_changed();
98 assert!(rx4.has_changed().unwrap());
99 assert_eq!(*rx4.borrow_and_update(), "two")
100 }
101
102 assert_eq!(*rx.borrow(), "two");
103}
104
105#[test]
106fn multi_rx() {
107 let (tx, mut rx1) = watch::channel("one");
108 let mut rx2 = rx1.clone();
109
110 {
111 let mut t1 = spawn(rx1.changed());
112 let mut t2 = spawn(rx2.changed());
113
114 assert_pending!(t1.poll());
115 assert_pending!(t2.poll());
116 }
117 assert_eq!(*rx1.borrow(), "one");
118 assert_eq!(*rx2.borrow(), "one");
119
120 let mut t2 = spawn(rx2.changed());
121
122 {
123 let mut t1 = spawn(rx1.changed());
124
125 assert_pending!(t1.poll());
126 assert_pending!(t2.poll());
127
128 tx.send("two").unwrap();
129
130 assert!(t1.is_woken());
131 assert!(t2.is_woken());
132
133 assert_ready_ok!(t1.poll());
134 }
135 assert_eq!(*rx1.borrow(), "two");
136
137 {
138 let mut t1 = spawn(rx1.changed());
139
140 assert_pending!(t1.poll());
141
142 tx.send("three").unwrap();
143
144 assert!(t1.is_woken());
145 assert!(t2.is_woken());
146
147 assert_ready_ok!(t1.poll());
148 assert_ready_ok!(t2.poll());
149 }
150 assert_eq!(*rx1.borrow(), "three");
151
152 drop(t2);
153
154 assert_eq!(*rx2.borrow(), "three");
155
156 {
157 let mut t1 = spawn(rx1.changed());
158 let mut t2 = spawn(rx2.changed());
159
160 assert_pending!(t1.poll());
161 assert_pending!(t2.poll());
162
163 tx.send("four").unwrap();
164
165 assert_ready_ok!(t1.poll());
166 assert_ready_ok!(t2.poll());
167 }
168 assert_eq!(*rx1.borrow(), "four");
169 assert_eq!(*rx2.borrow(), "four");
170}
171
172#[test]
173fn rx_observes_final_value() {
174 // Initial value
175
176 let (tx, mut rx) = watch::channel("one");
177 drop(tx);
178
179 {
180 let mut t1 = spawn(rx.changed());
181 assert_ready_err!(t1.poll());
182 }
183 assert_eq!(*rx.borrow(), "one");
184
185 // Sending a value
186
187 let (tx, mut rx) = watch::channel("one");
188
189 tx.send("two").unwrap();
190
191 {
192 let mut t1 = spawn(rx.changed());
193 assert_ready_ok!(t1.poll());
194 }
195 assert_eq!(*rx.borrow(), "two");
196
197 {
198 let mut t1 = spawn(rx.changed());
199 assert_pending!(t1.poll());
200
201 tx.send("three").unwrap();
202 drop(tx);
203
204 assert!(t1.is_woken());
205
206 assert_ready_ok!(t1.poll());
207 }
208 assert_eq!(*rx.borrow(), "three");
209
210 {
211 let mut t1 = spawn(rx.changed());
212 assert_ready_err!(t1.poll());
213 }
214 assert_eq!(*rx.borrow(), "three");
215}
216
217#[test]
218fn poll_close() {
219 let (tx, rx) = watch::channel("one");
220
221 {
222 let mut t = spawn(tx.closed());
223 assert_pending!(t.poll());
224
225 drop(rx);
226
227 assert!(t.is_woken());
228 assert_ready!(t.poll());
229 }
230
231 assert!(tx.send("two").is_err());
232}
233
234#[test]
235fn borrow_and_update() {
236 let (tx, mut rx) = watch::channel("one");
237
238 assert!(!rx.has_changed().unwrap());
239
240 tx.send("two").unwrap();
241 assert!(rx.has_changed().unwrap());
242 assert_ready!(spawn(rx.changed()).poll()).unwrap();
243 assert_pending!(spawn(rx.changed()).poll());
244 assert!(!rx.has_changed().unwrap());
245
246 tx.send("three").unwrap();
247 assert!(rx.has_changed().unwrap());
248 assert_eq!(*rx.borrow_and_update(), "three");
249 assert_pending!(spawn(rx.changed()).poll());
250 assert!(!rx.has_changed().unwrap());
251
252 drop(tx);
253 assert_eq!(*rx.borrow_and_update(), "three");
254 assert_ready!(spawn(rx.changed()).poll()).unwrap_err();
255 assert!(rx.has_changed().is_err());
256}
257
258#[test]
259fn reopened_after_subscribe() {
260 let (tx, rx) = watch::channel("one");
261 assert!(!tx.is_closed());
262
263 drop(rx);
264 assert!(tx.is_closed());
265
266 let rx = tx.subscribe();
267 assert!(!tx.is_closed());
268
269 drop(rx);
270 assert!(tx.is_closed());
271}
272
273#[test]
274#[cfg(panic = "unwind")]
275#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
276fn send_modify_panic() {
277 let (tx, mut rx) = watch::channel("one");
278
279 tx.send_modify(|old| *old = "two");
280 assert_eq!(*rx.borrow_and_update(), "two");
281
282 let mut rx2 = rx.clone();
283 assert_eq!(*rx2.borrow_and_update(), "two");
284
285 let mut task = spawn(rx2.changed());
286
287 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
288 tx.send_modify(|old| {
289 *old = "panicked";
290 panic!();
291 })
292 }));
293 assert!(result.is_err());
294
295 assert_pending!(task.poll());
296 assert_eq!(*rx.borrow(), "panicked");
297
298 tx.send_modify(|old| *old = "three");
299 assert_ready_ok!(task.poll());
300 assert_eq!(*rx.borrow_and_update(), "three");
301}
302