1 | #![allow (clippy::cognitive_complexity)] |
2 | #![warn (rust_2018_idioms)] |
3 | #![cfg (feature = "sync" )] |
4 | |
5 | #[cfg (all(target_family = "wasm" , not(target_os = "wasi" )))] |
6 | use wasm_bindgen_test::wasm_bindgen_test as test; |
7 | |
8 | use tokio::sync::broadcast; |
9 | use tokio_test::task; |
10 | use tokio_test::{ |
11 | assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, |
12 | }; |
13 | |
14 | use std::sync::Arc; |
15 | |
/// Calls `try_recv()` on the given receiver expression and evaluates to the
/// received value, panicking with the error if nothing could be received.
macro_rules! assert_recv {
    ($rx:expr) => {
        match $rx.try_recv() {
            Err(err) => panic!("expected recv; got = {:?}", err),
            Ok(received) => received,
        }
    };
}
24 | |
/// Calls `try_recv()` on the given receiver expression and panics unless the
/// result is `TryRecvError::Empty`.
macro_rules! assert_empty {
    ($rx:expr) => {
        match $rx.try_recv() {
            Err(broadcast::error::TryRecvError::Empty) => {}
            Ok(unexpected) => panic!("expected empty; got = {:?}", unexpected),
            Err(other) => panic!("expected empty; got = {:?}", other),
        }
    };
}
34 | |
/// Asserts that the given `try_recv()` result is `Err(TryRecvError::Lagged(n))`
/// with `n` equal to the expected count.
macro_rules! assert_lagged {
    ($res:expr, $expected:expr) => {
        if let broadcast::error::TryRecvError::Lagged(skipped) = assert_err!($res) {
            assert_eq!(skipped, $expected);
        } else {
            panic!("did not lag");
        }
    };
}
45 | |
/// Asserts that the given `try_recv()` result is `Err(TryRecvError::Closed)`.
macro_rules! assert_closed {
    ($res:expr) => {
        if !matches!(assert_err!($res), broadcast::error::TryRecvError::Closed) {
            panic!("is not closed");
        }
    };
}
54 | |
55 | trait AssertSend: Send + Sync {} |
56 | impl AssertSend for broadcast::Sender<i32> {} |
57 | impl AssertSend for broadcast::Receiver<i32> {} |
58 | |
/// One receiver: a sent value is observed exactly once through `try_recv`.
#[test]
fn send_try_recv_bounded() {
    let (sender, mut receiver) = broadcast::channel(16);

    // Nothing has been sent yet.
    assert_empty!(receiver);

    // A successful send is expected to report a single receiver.
    let receivers = assert_ok!(sender.send("hello"));
    assert_eq!(receivers, 1);

    let received = assert_recv!(receiver);
    assert_eq!(received, "hello");

    // The value was consumed; the channel is empty again.
    assert_empty!(receiver);
}
73 | |
/// Two receivers: each one observes the same sent value through `try_recv`.
#[test]
fn send_two_recv() {
    let (sender, mut first_rx) = broadcast::channel(16);
    let mut second_rx = sender.subscribe();

    assert_empty!(first_rx);
    assert_empty!(second_rx);

    // A successful send is expected to report both receivers.
    let receivers = assert_ok!(sender.send("hello"));
    assert_eq!(receivers, 2);

    let first_val = assert_recv!(first_rx);
    assert_eq!(first_val, "hello");

    let second_val = assert_recv!(second_rx);
    assert_eq!(second_val, "hello");

    assert_empty!(first_rx);
    assert_empty!(second_rx);
}
94 | |
/// A pending `recv()` future is woken by a send and then resolves to the value.
#[test]
fn send_recv_bounded() {
    let (sender, mut receiver) = broadcast::channel(16);

    let mut recv_task = task::spawn(receiver.recv());

    // No value yet, so the future must park.
    assert_pending!(recv_task.poll());

    assert_ok!(sender.send("hello"));

    assert!(recv_task.is_woken());
    let received = assert_ready_ok!(recv_task.poll());
    assert_eq!(received, "hello");
}
109 | |
/// Two receivers awaiting `recv()`: both are woken when polled beforehand, and
/// a receiver that was never polled is not woken but still gets the value.
#[test]
fn send_two_recv_bounded() {
    let (sender, mut first_rx) = broadcast::channel(16);
    let mut second_rx = sender.subscribe();

    let mut first_recv = task::spawn(first_rx.recv());
    let mut second_recv = task::spawn(second_rx.recv());

    assert_pending!(first_recv.poll());
    assert_pending!(second_recv.poll());

    assert_ok!(sender.send("hello"));

    assert!(first_recv.is_woken());
    assert!(second_recv.is_woken());

    let first_val = assert_ready_ok!(first_recv.poll());
    let second_val = assert_ready_ok!(second_recv.poll());
    assert_eq!(first_val, "hello");
    assert_eq!(second_val, "hello");

    // Release the mutable borrows of the receivers before spawning new futures.
    drop(first_recv);
    drop(second_recv);

    let mut first_recv = task::spawn(first_rx.recv());
    let mut second_recv = task::spawn(second_rx.recv());

    // Only the first future is polled (and thus registered) this round.
    assert_pending!(first_recv.poll());

    assert_ok!(sender.send("world"));

    assert!(first_recv.is_woken());
    assert!(!second_recv.is_woken());

    let first_val = assert_ready_ok!(first_recv.poll());
    let second_val = assert_ready_ok!(second_recv.poll());
    assert_eq!(first_val, "world");
    assert_eq!(second_val, "world");
}
148 | |
/// Polls the same `recv()` future from two different tasks in turn and checks
/// that a send wakes the task that polled it most recently.
#[test]
fn change_tasks() {
    let (sender, mut receiver) = broadcast::channel(1);

    let mut recv_fut = Box::pin(receiver.recv());

    let mut first_task = task::spawn(&mut recv_fut);
    assert_pending!(first_task.poll());

    // Re-poll the very same future from a second task.
    let mut second_task = task::spawn(&mut recv_fut);
    assert_pending!(second_task.poll());

    sender.send("hello").unwrap();

    assert!(second_task.is_woken());
}
165 | |
/// A fast receiver drains three values while a slow receiver holds an
/// unresolved `recv()` future; the slow receiver still sees all three values
/// in order afterwards.
#[test]
fn send_slow_rx() {
    let (sender, mut fast_rx) = broadcast::channel(16);
    let mut slow_rx = sender.subscribe();

    {
        let mut slow_recv = task::spawn(slow_rx.recv());

        {
            let mut fast_recv = task::spawn(fast_rx.recv());

            assert_pending!(fast_recv.poll());
            assert_pending!(slow_recv.poll());

            assert_ok!(sender.send("one"));

            assert!(fast_recv.is_woken());
            assert!(slow_recv.is_woken());

            assert_ok!(sender.send("two"));

            let got = assert_ready_ok!(fast_recv.poll());
            assert_eq!(got, "one");
        }

        // "two" is already buffered, so a fresh future resolves immediately.
        let got = assert_ready_ok!(task::spawn(fast_rx.recv()).poll());
        assert_eq!(got, "two");

        let mut fast_recv = task::spawn(fast_rx.recv());

        assert_pending!(fast_recv.poll());

        assert_ok!(sender.send("three"));

        assert!(fast_recv.is_woken());

        let got = assert_ready_ok!(fast_recv.poll());
        assert_eq!(got, "three");

        // The slow future finally resolves with the oldest value.
        let got = assert_ready_ok!(slow_recv.poll());
        assert_eq!(got, "one");
    }

    let got = assert_recv!(slow_rx);
    assert_eq!(got, "two");

    let got = assert_recv!(slow_rx);
    assert_eq!(got, "three");
}
215 | |
/// Dropping receivers that still have an unread value buffered must not panic.
#[test]
fn drop_rx_while_values_remain() {
    let (sender, mut first_rx) = broadcast::channel(16);
    let mut second_rx = sender.subscribe();

    assert_ok!(sender.send("one"));
    assert_ok!(sender.send("two"));

    // Each receiver consumes only the first of the two values.
    let _ = assert_recv!(first_rx);
    let _ = assert_recv!(second_rx);

    drop(second_rx);
    drop(first_rx);
}
230 | |
/// Lag reporting on a capacity-2 channel with one receiver that keeps up
/// (`rx1`) and one that falls behind (`rx2`).
#[test]
fn lagging_rx() {
    let (tx, mut rx1) = broadcast::channel(2);
    let mut rx2 = tx.subscribe();

    assert_ok!(tx.send("one"));
    assert_ok!(tx.send("two"));

    assert_eq!("one", assert_recv!(rx1));

    assert_ok!(tx.send("three"));

    // Lagged too far: `rx2` has not read anything and one value was overwritten.
    assert_lagged!(rx2.try_recv(), 1);

    // Calling again gets the next value
    assert_eq!("two", assert_recv!(rx2));

    assert_eq!("two", assert_recv!(rx1));
    assert_eq!("three", assert_recv!(rx1));

    assert_ok!(tx.send("four"));
    assert_ok!(tx.send("five"));

    assert_lagged!(rx2.try_recv(), 1);

    assert_ok!(tx.send("six"));

    assert_lagged!(rx2.try_recv(), 1);
}
262 | |
/// Sending with no receivers is an error; after subscribing, sends succeed and
/// the new receiver only sees values sent after it subscribed.
#[test]
fn send_no_rx() {
    // The initial receiver is discarded immediately by the `_` pattern.
    let (sender, _) = broadcast::channel(16);

    assert_err!(sender.send("hello"));

    let mut receiver = sender.subscribe();

    assert_ok!(sender.send("world"));

    let received = assert_recv!(receiver);
    assert_eq!("world", received);
}
276 | |
/// Creating a channel with capacity zero is expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn zero_capacity() {
    let _ = broadcast::channel::<()>(0);
}
283 | |
/// Creating a channel with capacity `1 + (usize::MAX >> 1)` is expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn capacity_too_big() {
    // `usize::MAX` is an associated constant of the primitive type, so the
    // legacy `std::usize` module import is not needed.
    broadcast::channel::<()>(1 + (usize::MAX >> 1));
}
292 | |
/// A panic inside the value's `Clone` impl during `try_recv` must leave the
/// receiver usable: the next receive yields the following value.
#[test]
#[cfg(panic = "unwind")]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn panic_in_clone() {
    use std::panic::{self, AssertUnwindSafe};

    #[derive(Eq, PartialEq, Debug)]
    struct MyVal(usize);

    // Cloning a `MyVal(0)` panics; any other value clones normally.
    impl Clone for MyVal {
        fn clone(&self) -> Self {
            assert_ne!(0, self.0);
            Self(self.0)
        }
    }

    let (sender, mut receiver) = broadcast::channel(16);

    assert_ok!(sender.send(MyVal(0)));
    assert_ok!(sender.send(MyVal(1)));

    // Receiving the first value triggers the panicking clone.
    let outcome = panic::catch_unwind(AssertUnwindSafe(|| {
        let _ = receiver.try_recv();
    }));

    assert_err!(outcome);

    let received = assert_recv!(receiver);
    assert_eq!(received, MyVal(1));
}
323 | |
/// Pending receivers are woken with `Closed` only once the last sender clone
/// is dropped; dropping one of two senders leaves them pending.
#[test]
fn dropping_tx_notifies_rx() {
    let (sender, mut first_rx) = broadcast::channel::<()>(16);
    let mut second_rx = sender.subscribe();

    let sender_clone = sender.clone();

    let mut first_recv = task::spawn(first_rx.recv());
    let mut second_recv = task::spawn(second_rx.recv());

    assert_pending!(first_recv.poll());
    assert_pending!(second_recv.poll());

    // One sender remains, so the channel stays open.
    drop(sender);

    assert_pending!(first_recv.poll());
    assert_pending!(second_recv.poll());

    // The last sender is gone.
    drop(sender_clone);

    assert!(first_recv.is_woken());
    assert!(second_recv.is_woken());

    let first_err = assert_ready_err!(first_recv.poll());
    assert!(is_closed(first_err));

    let second_err = assert_ready_err!(second_recv.poll());
    assert!(is_closed(second_err));
}
353 | |
/// A buffered message that was never received is released when the only
/// receiver is dropped, observed through the `Arc` strong count.
#[test]
fn unconsumed_messages_are_dropped() {
    let (sender, receiver) = broadcast::channel(16);

    let message = Arc::new(());

    assert_ok!(sender.send(message.clone()));

    // One reference held here, one held by the channel.
    assert_eq!(2, Arc::strong_count(&message));

    drop(receiver);

    assert_eq!(1, Arc::strong_count(&message));
}
368 | |
/// A capacity-1 channel delivers a single value and is then empty.
#[test]
fn single_capacity_recvs() {
    let (sender, mut receiver) = broadcast::channel(1);

    assert_ok!(sender.send(1));

    let received = assert_recv!(receiver);
    assert_eq!(received, 1);
    assert_empty!(receiver);
}
378 | |
/// Capacity-1 channel: a value sent before the sender is dropped is still
/// received, after which the receiver reports `Closed`.
#[test]
fn single_capacity_recvs_after_drop_1() {
    let (sender, mut receiver) = broadcast::channel(1);

    assert_ok!(sender.send(1));
    drop(sender);

    let received = assert_recv!(receiver);
    assert_eq!(received, 1);
    assert_closed!(receiver.try_recv());
}
389 | |
/// Capacity-1 channel with two sends before the sender is dropped: the
/// receiver reports a lag of 1, then the second value, then `Closed`.
#[test]
fn single_capacity_recvs_after_drop_2() {
    let (sender, mut receiver) = broadcast::channel(1);

    assert_ok!(sender.send(1));
    assert_ok!(sender.send(2));
    drop(sender);

    assert_lagged!(receiver.try_recv(), 1);
    let received = assert_recv!(receiver);
    assert_eq!(received, 2);
    assert_closed!(receiver.try_recv());
}
402 | |
/// Capacity-2 channel filled exactly: dropping the sender must not clobber a
/// buffered value; both values arrive before `Closed` is reported.
#[test]
fn dropping_sender_does_not_overwrite() {
    let (sender, mut receiver) = broadcast::channel(2);

    assert_ok!(sender.send(1));
    assert_ok!(sender.send(2));
    drop(sender);

    let first = assert_recv!(receiver);
    assert_eq!(first, 1);
    let second = assert_recv!(receiver);
    assert_eq!(second, 2);
    assert_closed!(receiver.try_recv());
}
415 | |
/// Capacity-2 channel, three sends, sender dropped: the receiver reports a lag
/// of 1, then receives the two newest values, then sees `Closed`.
#[test]
fn lagging_receiver_recovers_after_wrap_closed_1() {
    let (sender, mut receiver) = broadcast::channel(2);

    assert_ok!(sender.send(1));
    assert_ok!(sender.send(2));
    assert_ok!(sender.send(3));
    drop(sender);

    assert_lagged!(receiver.try_recv(), 1);
    let first = assert_recv!(receiver);
    assert_eq!(first, 2);
    let second = assert_recv!(receiver);
    assert_eq!(second, 3);
    assert_closed!(receiver.try_recv());
}
430 | |
/// Capacity-2 channel, four sends, sender dropped: the receiver reports a lag
/// of 2, then receives the two newest values, then sees `Closed`.
#[test]
fn lagging_receiver_recovers_after_wrap_closed_2() {
    let (sender, mut receiver) = broadcast::channel(2);

    assert_ok!(sender.send(1));
    assert_ok!(sender.send(2));
    assert_ok!(sender.send(3));
    assert_ok!(sender.send(4));
    drop(sender);

    assert_lagged!(receiver.try_recv(), 2);
    let first = assert_recv!(receiver);
    assert_eq!(first, 3);
    let second = assert_recv!(receiver);
    assert_eq!(second, 4);
    assert_closed!(receiver.try_recv());
}
446 | |
/// Capacity-2 channel, three sends, sender kept alive: the receiver reports a
/// lag of 1, then receives the two newest values, then sees `Empty`.
#[test]
fn lagging_receiver_recovers_after_wrap_open() {
    let (sender, mut receiver) = broadcast::channel(2);

    assert_ok!(sender.send(1));
    assert_ok!(sender.send(2));
    assert_ok!(sender.send(3));

    assert_lagged!(receiver.try_recv(), 1);
    let first = assert_recv!(receiver);
    assert_eq!(first, 2);
    let second = assert_recv!(receiver);
    assert_eq!(second, 3);
    assert_empty!(receiver);
}
460 | |
/// Checks `Receiver::len()` before and after the receiver falls behind.
///
/// NOTE(review): the channel is created with capacity 3 yet four values are
/// readable without lag; presumably the capacity is rounded up internally.
/// Confirm against the channel documentation.
#[test]
fn receiver_len_with_lagged() {
    let (sender, mut receiver) = broadcast::channel(3);

    for value in [10, 20, 30, 40] {
        sender.send(value).unwrap();
    }

    assert_eq!(receiver.len(), 4);
    let received = assert_recv!(receiver);
    assert_eq!(received, 10);

    for value in [50, 60] {
        sender.send(value).unwrap();
    }

    // `len()` counts every unseen value, including the one that was overwritten.
    assert_eq!(receiver.len(), 5);
    assert_lagged!(receiver.try_recv(), 1);
}
479 | |
480 | fn is_closed(err: broadcast::error::RecvError) -> bool { |
481 | matches!(err, broadcast::error::RecvError::Closed) |
482 | } |
483 | |
/// A receiver made with `resubscribe()` skips values sent before it was
/// created, and reading from it does not affect the original receiver.
#[test]
fn resubscribe_points_to_tail() {
    let (sender, mut original) = broadcast::channel(3);
    sender.send(1).unwrap();

    let mut resubscribed = original.resubscribe();

    // verify we're one behind at the start
    assert_empty!(resubscribed);
    let received = assert_recv!(original);
    assert_eq!(received, 1);

    // verify we do not affect rx
    sender.send(2).unwrap();
    let received = assert_recv!(resubscribed);
    assert_eq!(received, 2);
    sender.send(3).unwrap();
    let received = assert_recv!(original);
    assert_eq!(received, 2);
    let received = assert_recv!(original);
    assert_eq!(received, 3);
    assert_empty!(original);

    let received = assert_recv!(resubscribed);
    assert_eq!(received, 3);
    assert_empty!(resubscribed);
}
506 | |
/// Resubscribing from a receiver that has already lagged yields a receiver
/// that is simply empty, while the original still reports its lag.
#[test]
fn resubscribe_lagged() {
    let (sender, mut original) = broadcast::channel(1);
    sender.send(1).unwrap();
    sender.send(2).unwrap();

    let mut resubscribed = original.resubscribe();
    assert_lagged!(original.try_recv(), 1);
    assert_empty!(resubscribed);

    let received = assert_recv!(original);
    assert_eq!(received, 2);
    assert_empty!(original);
    assert_empty!(resubscribed);
}
521 | |
/// Resubscribing after the only sender was dropped yields a receiver that
/// immediately reports `Closed`.
#[test]
fn resubscribe_to_closed_channel() {
    let (sender, original) = broadcast::channel::<u32>(2);
    drop(sender);

    let mut resubscribed = original.resubscribe();
    assert_closed!(resubscribed.try_recv());
}
530 | |
/// Checks `Sender::len()` / `Sender::is_empty()` as two receivers consume at
/// different speeds on a capacity-4 channel.
#[test]
fn sender_len() {
    let (sender, mut first_rx) = broadcast::channel(4);
    let mut second_rx = sender.subscribe();

    assert_eq!(sender.len(), 0);
    assert!(sender.is_empty());

    for value in 1..=3 {
        sender.send(value).unwrap();
    }

    assert_eq!(sender.len(), 3);
    assert!(!sender.is_empty());

    let _ = assert_recv!(first_rx);
    let _ = assert_recv!(first_rx);

    // The second receiver has read nothing, so the length is unchanged.
    assert_eq!(sender.len(), 3);
    assert!(!sender.is_empty());

    let _ = assert_recv!(second_rx);

    assert_eq!(sender.len(), 2);
    assert!(!sender.is_empty());

    for value in 4..=6 {
        sender.send(value).unwrap();
    }

    // The reported length is expected to equal the capacity here.
    assert_eq!(sender.len(), 4);
    assert!(!sender.is_empty());
}
564 | |
/// Randomised check that `Sender::len()` always equals the larger of the two
/// receivers' lengths, capped at 16 (the capacity passed to `channel`).
#[test]
#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
fn sender_len_random() {
    use rand::Rng;

    let (sender, mut first_rx) = broadcast::channel(16);
    let mut second_rx = sender.subscribe();

    for _ in 0..1000 {
        // 0 and 1 each read from one receiver; 2 and 3 both send.
        match rand::thread_rng().gen_range(0..4) {
            0 => {
                let _ = first_rx.try_recv();
            }
            1 => {
                let _ = second_rx.try_recv();
            }
            _ => {
                sender.send(0).unwrap();
            }
        }

        let slowest = first_rx.len().max(second_rx.len());
        let expected_len = slowest.min(16);
        assert_eq!(sender.len(), expected_len);
    }
}
590 | |
/// Uses a waker whose `Drop` impl sends on the channel to check that neither
/// replacing a stored waker nor sending to a parked receiver deadlocks.
#[test]
fn send_in_waker_drop() {
    use futures::task::ArcWake;
    use std::future::Future;
    use std::task::Context;

    // Sends on the wrapped channel when dropped; the send result is ignored.
    struct SendOnDrop(broadcast::Sender<()>);

    impl Drop for SendOnDrop {
        fn drop(&mut self) {
            let _ = self.0.send(());
        }
    }

    // Waking is a no-op; only the drop side effect matters here.
    impl ArcWake for SendOnDrop {
        fn wake_by_ref(_arc_self: &Arc<Self>) {}
    }

    // Test if there is no deadlock when replacing the old waker.

    let (sender, mut receiver) = broadcast::channel(16);

    let mut recv_fut = Box::pin(async {
        let _ = receiver.recv().await;
    });

    // Store our special waker in the receiving future.
    let special_waker = futures::task::waker(Arc::new(SendOnDrop(sender)));
    let mut ctx = Context::from_waker(&special_waker);
    assert!(recv_fut.as_mut().poll(&mut ctx).is_pending());
    drop(special_waker);

    // Second poll shouldn't deadlock.
    let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
    let _ = recv_fut.as_mut().poll(&mut ctx);

    // Test if there is no deadlock when calling waker.wake().

    let (sender, mut receiver) = broadcast::channel(16);

    let mut recv_fut = Box::pin(async {
        let _ = receiver.recv().await;
    });

    // Store our special waker in the receiving future.
    let special_waker = futures::task::waker(Arc::new(SendOnDrop(sender.clone())));
    let mut ctx = Context::from_waker(&special_waker);
    assert!(recv_fut.as_mut().poll(&mut ctx).is_pending());
    drop(special_waker);

    // Shouldn't deadlock.
    let _ = sender.send(());
}
644 | |