1#![allow(clippy::cognitive_complexity)]
2#![warn(rust_2018_idioms)]
3#![cfg(feature = "sync")]
4
5#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6use wasm_bindgen_test::wasm_bindgen_test as test;
7
8use tokio::sync::broadcast;
9use tokio_test::task;
10use tokio_test::{
11 assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
12};
13
14use std::sync::Arc;
15
16macro_rules! assert_recv {
17 ($e:expr) => {
18 match $e.try_recv() {
19 Ok(value) => value,
20 Err(e) => panic!("expected recv; got = {:?}", e),
21 }
22 };
23}
24
25macro_rules! assert_empty {
26 ($e:expr) => {
27 match $e.try_recv() {
28 Ok(value) => panic!("expected empty; got = {:?}", value),
29 Err(broadcast::error::TryRecvError::Empty) => {}
30 Err(e) => panic!("expected empty; got = {:?}", e),
31 }
32 };
33}
34
35macro_rules! assert_lagged {
36 ($e:expr, $n:expr) => {
37 match assert_err!($e) {
38 broadcast::error::TryRecvError::Lagged(n) => {
39 assert_eq!(n, $n);
40 }
41 _ => panic!("did not lag"),
42 }
43 };
44}
45
46macro_rules! assert_closed {
47 ($e:expr) => {
48 match assert_err!($e) {
49 broadcast::error::TryRecvError::Closed => {}
50 _ => panic!("is not closed"),
51 }
52 };
53}
54
55trait AssertSend: Send + Sync {}
56impl AssertSend for broadcast::Sender<i32> {}
57impl AssertSend for broadcast::Receiver<i32> {}
58
59#[test]
60fn send_try_recv_bounded() {
61 let (tx, mut rx) = broadcast::channel(16);
62
63 assert_empty!(rx);
64
65 let n = assert_ok!(tx.send("hello"));
66 assert_eq!(n, 1);
67
68 let val = assert_recv!(rx);
69 assert_eq!(val, "hello");
70
71 assert_empty!(rx);
72}
73
74#[test]
75fn send_two_recv() {
76 let (tx, mut rx1) = broadcast::channel(16);
77 let mut rx2 = tx.subscribe();
78
79 assert_empty!(rx1);
80 assert_empty!(rx2);
81
82 let n = assert_ok!(tx.send("hello"));
83 assert_eq!(n, 2);
84
85 let val = assert_recv!(rx1);
86 assert_eq!(val, "hello");
87
88 let val = assert_recv!(rx2);
89 assert_eq!(val, "hello");
90
91 assert_empty!(rx1);
92 assert_empty!(rx2);
93}
94
95#[test]
96fn send_recv_bounded() {
97 let (tx, mut rx) = broadcast::channel(16);
98
99 let mut recv = task::spawn(rx.recv());
100
101 assert_pending!(recv.poll());
102
103 assert_ok!(tx.send("hello"));
104
105 assert!(recv.is_woken());
106 let val = assert_ready_ok!(recv.poll());
107 assert_eq!(val, "hello");
108}
109
110#[test]
111fn send_two_recv_bounded() {
112 let (tx, mut rx1) = broadcast::channel(16);
113 let mut rx2 = tx.subscribe();
114
115 let mut recv1 = task::spawn(rx1.recv());
116 let mut recv2 = task::spawn(rx2.recv());
117
118 assert_pending!(recv1.poll());
119 assert_pending!(recv2.poll());
120
121 assert_ok!(tx.send("hello"));
122
123 assert!(recv1.is_woken());
124 assert!(recv2.is_woken());
125
126 let val1 = assert_ready_ok!(recv1.poll());
127 let val2 = assert_ready_ok!(recv2.poll());
128 assert_eq!(val1, "hello");
129 assert_eq!(val2, "hello");
130
131 drop((recv1, recv2));
132
133 let mut recv1 = task::spawn(rx1.recv());
134 let mut recv2 = task::spawn(rx2.recv());
135
136 assert_pending!(recv1.poll());
137
138 assert_ok!(tx.send("world"));
139
140 assert!(recv1.is_woken());
141 assert!(!recv2.is_woken());
142
143 let val1 = assert_ready_ok!(recv1.poll());
144 let val2 = assert_ready_ok!(recv2.poll());
145 assert_eq!(val1, "world");
146 assert_eq!(val2, "world");
147}
148
149#[test]
150fn change_tasks() {
151 let (tx, mut rx) = broadcast::channel(1);
152
153 let mut recv = Box::pin(rx.recv());
154
155 let mut task1 = task::spawn(&mut recv);
156 assert_pending!(task1.poll());
157
158 let mut task2 = task::spawn(&mut recv);
159 assert_pending!(task2.poll());
160
161 tx.send("hello").unwrap();
162
163 assert!(task2.is_woken());
164}
165
166#[test]
167fn send_slow_rx() {
168 let (tx, mut rx1) = broadcast::channel(16);
169 let mut rx2 = tx.subscribe();
170
171 {
172 let mut recv2 = task::spawn(rx2.recv());
173
174 {
175 let mut recv1 = task::spawn(rx1.recv());
176
177 assert_pending!(recv1.poll());
178 assert_pending!(recv2.poll());
179
180 assert_ok!(tx.send("one"));
181
182 assert!(recv1.is_woken());
183 assert!(recv2.is_woken());
184
185 assert_ok!(tx.send("two"));
186
187 let val = assert_ready_ok!(recv1.poll());
188 assert_eq!(val, "one");
189 }
190
191 let val = assert_ready_ok!(task::spawn(rx1.recv()).poll());
192 assert_eq!(val, "two");
193
194 let mut recv1 = task::spawn(rx1.recv());
195
196 assert_pending!(recv1.poll());
197
198 assert_ok!(tx.send("three"));
199
200 assert!(recv1.is_woken());
201
202 let val = assert_ready_ok!(recv1.poll());
203 assert_eq!(val, "three");
204
205 let val = assert_ready_ok!(recv2.poll());
206 assert_eq!(val, "one");
207 }
208
209 let val = assert_recv!(rx2);
210 assert_eq!(val, "two");
211
212 let val = assert_recv!(rx2);
213 assert_eq!(val, "three");
214}
215
216#[test]
217fn drop_rx_while_values_remain() {
218 let (tx, mut rx1) = broadcast::channel(16);
219 let mut rx2 = tx.subscribe();
220
221 assert_ok!(tx.send("one"));
222 assert_ok!(tx.send("two"));
223
224 assert_recv!(rx1);
225 assert_recv!(rx2);
226
227 drop(rx2);
228 drop(rx1);
229}
230
231#[test]
232fn lagging_rx() {
233 let (tx, mut rx1) = broadcast::channel(2);
234 let mut rx2 = tx.subscribe();
235
236 assert_ok!(tx.send("one"));
237 assert_ok!(tx.send("two"));
238
239 assert_eq!("one", assert_recv!(rx1));
240
241 assert_ok!(tx.send("three"));
242
243 // Lagged too far
244 let x = dbg!(rx2.try_recv());
245 assert_lagged!(x, 1);
246
247 // Calling again gets the next value
248 assert_eq!("two", assert_recv!(rx2));
249
250 assert_eq!("two", assert_recv!(rx1));
251 assert_eq!("three", assert_recv!(rx1));
252
253 assert_ok!(tx.send("four"));
254 assert_ok!(tx.send("five"));
255
256 assert_lagged!(rx2.try_recv(), 1);
257
258 assert_ok!(tx.send("six"));
259
260 assert_lagged!(rx2.try_recv(), 1);
261}
262
263#[test]
264fn send_no_rx() {
265 let (tx, _) = broadcast::channel(16);
266
267 assert_err!(tx.send("hello"));
268
269 let mut rx = tx.subscribe();
270
271 assert_ok!(tx.send("world"));
272
273 let val = assert_recv!(rx);
274 assert_eq!("world", val);
275}
276
277#[test]
278#[should_panic]
279#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
280fn zero_capacity() {
281 broadcast::channel::<()>(0);
282}
283
284#[test]
285#[should_panic]
286#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
287fn capacity_too_big() {
288 use std::usize;
289
290 broadcast::channel::<()>(1 + (usize::MAX >> 1));
291}
292
293#[test]
294#[cfg(panic = "unwind")]
295#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
296fn panic_in_clone() {
297 use std::panic::{self, AssertUnwindSafe};
298
299 #[derive(Eq, PartialEq, Debug)]
300 struct MyVal(usize);
301
302 impl Clone for MyVal {
303 fn clone(&self) -> MyVal {
304 assert_ne!(0, self.0);
305 MyVal(self.0)
306 }
307 }
308
309 let (tx, mut rx) = broadcast::channel(16);
310
311 assert_ok!(tx.send(MyVal(0)));
312 assert_ok!(tx.send(MyVal(1)));
313
314 let res = panic::catch_unwind(AssertUnwindSafe(|| {
315 let _ = rx.try_recv();
316 }));
317
318 assert_err!(res);
319
320 let val = assert_recv!(rx);
321 assert_eq!(val, MyVal(1));
322}
323
324#[test]
325fn dropping_tx_notifies_rx() {
326 let (tx, mut rx1) = broadcast::channel::<()>(16);
327 let mut rx2 = tx.subscribe();
328
329 let tx2 = tx.clone();
330
331 let mut recv1 = task::spawn(rx1.recv());
332 let mut recv2 = task::spawn(rx2.recv());
333
334 assert_pending!(recv1.poll());
335 assert_pending!(recv2.poll());
336
337 drop(tx);
338
339 assert_pending!(recv1.poll());
340 assert_pending!(recv2.poll());
341
342 drop(tx2);
343
344 assert!(recv1.is_woken());
345 assert!(recv2.is_woken());
346
347 let err = assert_ready_err!(recv1.poll());
348 assert!(is_closed(err));
349
350 let err = assert_ready_err!(recv2.poll());
351 assert!(is_closed(err));
352}
353
354#[test]
355fn unconsumed_messages_are_dropped() {
356 let (tx, rx) = broadcast::channel(16);
357
358 let msg = Arc::new(());
359
360 assert_ok!(tx.send(msg.clone()));
361
362 assert_eq!(2, Arc::strong_count(&msg));
363
364 drop(rx);
365
366 assert_eq!(1, Arc::strong_count(&msg));
367}
368
369#[test]
370fn single_capacity_recvs() {
371 let (tx, mut rx) = broadcast::channel(1);
372
373 assert_ok!(tx.send(1));
374
375 assert_eq!(assert_recv!(rx), 1);
376 assert_empty!(rx);
377}
378
379#[test]
380fn single_capacity_recvs_after_drop_1() {
381 let (tx, mut rx) = broadcast::channel(1);
382
383 assert_ok!(tx.send(1));
384 drop(tx);
385
386 assert_eq!(assert_recv!(rx), 1);
387 assert_closed!(rx.try_recv());
388}
389
390#[test]
391fn single_capacity_recvs_after_drop_2() {
392 let (tx, mut rx) = broadcast::channel(1);
393
394 assert_ok!(tx.send(1));
395 assert_ok!(tx.send(2));
396 drop(tx);
397
398 assert_lagged!(rx.try_recv(), 1);
399 assert_eq!(assert_recv!(rx), 2);
400 assert_closed!(rx.try_recv());
401}
402
403#[test]
404fn dropping_sender_does_not_overwrite() {
405 let (tx, mut rx) = broadcast::channel(2);
406
407 assert_ok!(tx.send(1));
408 assert_ok!(tx.send(2));
409 drop(tx);
410
411 assert_eq!(assert_recv!(rx), 1);
412 assert_eq!(assert_recv!(rx), 2);
413 assert_closed!(rx.try_recv());
414}
415
416#[test]
417fn lagging_receiver_recovers_after_wrap_closed_1() {
418 let (tx, mut rx) = broadcast::channel(2);
419
420 assert_ok!(tx.send(1));
421 assert_ok!(tx.send(2));
422 assert_ok!(tx.send(3));
423 drop(tx);
424
425 assert_lagged!(rx.try_recv(), 1);
426 assert_eq!(assert_recv!(rx), 2);
427 assert_eq!(assert_recv!(rx), 3);
428 assert_closed!(rx.try_recv());
429}
430
431#[test]
432fn lagging_receiver_recovers_after_wrap_closed_2() {
433 let (tx, mut rx) = broadcast::channel(2);
434
435 assert_ok!(tx.send(1));
436 assert_ok!(tx.send(2));
437 assert_ok!(tx.send(3));
438 assert_ok!(tx.send(4));
439 drop(tx);
440
441 assert_lagged!(rx.try_recv(), 2);
442 assert_eq!(assert_recv!(rx), 3);
443 assert_eq!(assert_recv!(rx), 4);
444 assert_closed!(rx.try_recv());
445}
446
447#[test]
448fn lagging_receiver_recovers_after_wrap_open() {
449 let (tx, mut rx) = broadcast::channel(2);
450
451 assert_ok!(tx.send(1));
452 assert_ok!(tx.send(2));
453 assert_ok!(tx.send(3));
454
455 assert_lagged!(rx.try_recv(), 1);
456 assert_eq!(assert_recv!(rx), 2);
457 assert_eq!(assert_recv!(rx), 3);
458 assert_empty!(rx);
459}
460
461#[test]
462fn receiver_len_with_lagged() {
463 let (tx, mut rx) = broadcast::channel(3);
464
465 tx.send(10).unwrap();
466 tx.send(20).unwrap();
467 tx.send(30).unwrap();
468 tx.send(40).unwrap();
469
470 assert_eq!(rx.len(), 4);
471 assert_eq!(assert_recv!(rx), 10);
472
473 tx.send(50).unwrap();
474 tx.send(60).unwrap();
475
476 assert_eq!(rx.len(), 5);
477 assert_lagged!(rx.try_recv(), 1);
478}
479
480fn is_closed(err: broadcast::error::RecvError) -> bool {
481 matches!(err, broadcast::error::RecvError::Closed)
482}
483
484#[test]
485fn resubscribe_points_to_tail() {
486 let (tx, mut rx) = broadcast::channel(3);
487 tx.send(1).unwrap();
488
489 let mut rx_resub = rx.resubscribe();
490
491 // verify we're one behind at the start
492 assert_empty!(rx_resub);
493 assert_eq!(assert_recv!(rx), 1);
494
495 // verify we do not affect rx
496 tx.send(2).unwrap();
497 assert_eq!(assert_recv!(rx_resub), 2);
498 tx.send(3).unwrap();
499 assert_eq!(assert_recv!(rx), 2);
500 assert_eq!(assert_recv!(rx), 3);
501 assert_empty!(rx);
502
503 assert_eq!(assert_recv!(rx_resub), 3);
504 assert_empty!(rx_resub);
505}
506
507#[test]
508fn resubscribe_lagged() {
509 let (tx, mut rx) = broadcast::channel(1);
510 tx.send(1).unwrap();
511 tx.send(2).unwrap();
512
513 let mut rx_resub = rx.resubscribe();
514 assert_lagged!(rx.try_recv(), 1);
515 assert_empty!(rx_resub);
516
517 assert_eq!(assert_recv!(rx), 2);
518 assert_empty!(rx);
519 assert_empty!(rx_resub);
520}
521
522#[test]
523fn resubscribe_to_closed_channel() {
524 let (tx, rx) = tokio::sync::broadcast::channel::<u32>(2);
525 drop(tx);
526
527 let mut rx_resub = rx.resubscribe();
528 assert_closed!(rx_resub.try_recv());
529}
530
531#[test]
532fn sender_len() {
533 let (tx, mut rx1) = broadcast::channel(4);
534 let mut rx2 = tx.subscribe();
535
536 assert_eq!(tx.len(), 0);
537 assert!(tx.is_empty());
538
539 tx.send(1).unwrap();
540 tx.send(2).unwrap();
541 tx.send(3).unwrap();
542
543 assert_eq!(tx.len(), 3);
544 assert!(!tx.is_empty());
545
546 assert_recv!(rx1);
547 assert_recv!(rx1);
548
549 assert_eq!(tx.len(), 3);
550 assert!(!tx.is_empty());
551
552 assert_recv!(rx2);
553
554 assert_eq!(tx.len(), 2);
555 assert!(!tx.is_empty());
556
557 tx.send(4).unwrap();
558 tx.send(5).unwrap();
559 tx.send(6).unwrap();
560
561 assert_eq!(tx.len(), 4);
562 assert!(!tx.is_empty());
563}
564
565#[test]
566#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
567fn sender_len_random() {
568 use rand::Rng;
569
570 let (tx, mut rx1) = broadcast::channel(16);
571 let mut rx2 = tx.subscribe();
572
573 for _ in 0..1000 {
574 match rand::thread_rng().gen_range(0..4) {
575 0 => {
576 let _ = rx1.try_recv();
577 }
578 1 => {
579 let _ = rx2.try_recv();
580 }
581 _ => {
582 tx.send(0).unwrap();
583 }
584 }
585
586 let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16);
587 assert_eq!(tx.len(), expected_len);
588 }
589}
590
591#[test]
592fn send_in_waker_drop() {
593 use futures::task::ArcWake;
594 use std::future::Future;
595 use std::task::Context;
596
597 struct SendOnDrop(broadcast::Sender<()>);
598
599 impl Drop for SendOnDrop {
600 fn drop(&mut self) {
601 let _ = self.0.send(());
602 }
603 }
604
605 impl ArcWake for SendOnDrop {
606 fn wake_by_ref(_arc_self: &Arc<Self>) {}
607 }
608
609 // Test if there is no deadlock when replacing the old waker.
610
611 let (tx, mut rx) = broadcast::channel(16);
612
613 let mut fut = Box::pin(async {
614 let _ = rx.recv().await;
615 });
616
617 // Store our special waker in the receiving future.
618 let waker = futures::task::waker(Arc::new(SendOnDrop(tx)));
619 let mut cx = Context::from_waker(&waker);
620 assert!(fut.as_mut().poll(&mut cx).is_pending());
621 drop(waker);
622
623 // Second poll shouldn't deadlock.
624 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
625 let _ = fut.as_mut().poll(&mut cx);
626
627 // Test if there is no deadlock when calling waker.wake().
628
629 let (tx, mut rx) = broadcast::channel(16);
630
631 let mut fut = Box::pin(async {
632 let _ = rx.recv().await;
633 });
634
635 // Store our special waker in the receiving future.
636 let waker = futures::task::waker(Arc::new(SendOnDrop(tx.clone())));
637 let mut cx = Context::from_waker(&waker);
638 assert!(fut.as_mut().poll(&mut cx).is_pending());
639 drop(waker);
640
641 // Shouldn't deadlock.
642 let _ = tx.send(());
643}
644