1#![allow(clippy::redundant_clone)]
2#![warn(rust_2018_idioms)]
3#![cfg(feature = "sync")]
4
5#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6use wasm_bindgen_test::wasm_bindgen_test as test;
7#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
8use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
9
10#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
11use tokio::test as maybe_tokio_test;
12
13use std::fmt;
14use std::sync::Arc;
15use tokio::sync::mpsc;
16use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
17use tokio_test::*;
18
// Shared test helpers; this module is only compiled on non-wasm targets.
#[cfg(not(target_family = "wasm"))]
mod support {
    // Provides the `channel_stream` / `unbounded_channel_stream` constructors
    // that the stream-based tests in this file call.
    pub(crate) mod mpsc_stream;
}
23
24trait AssertSend: Send {}
25impl AssertSend for mpsc::Sender<i32> {}
26impl AssertSend for mpsc::Receiver<i32> {}
27
28#[maybe_tokio_test]
29async fn send_recv_with_buffer() {
30 let (tx, mut rx) = mpsc::channel::<i32>(16);
31
32 // Using poll_ready / try_send
33 // let permit assert_ready_ok!(tx.reserve());
34 let permit = tx.reserve().await.unwrap();
35 permit.send(1);
36
37 // Without poll_ready
38 tx.try_send(2).unwrap();
39
40 drop(tx);
41
42 let val = rx.recv().await;
43 assert_eq!(val, Some(1));
44
45 let val = rx.recv().await;
46 assert_eq!(val, Some(2));
47
48 let val = rx.recv().await;
49 assert!(val.is_none());
50}
51
// With a capacity of two, only two reservations can be outstanding at once.
// A waiting reservation is woken when a sent value is received or when an
// unused permit is dropped.
#[tokio::test]
#[cfg(feature = "full")]
async fn reserve_disarm() {
    let (tx_a, mut receiver) = mpsc::channel::<i32>(2);
    let tx_b = tx_a.clone();
    let tx_c = tx_a.clone();
    let tx_d = tx_a.clone();

    // Two handles can reserve a slot right away.
    let held_a = assert_ok!(tx_a.reserve().await);
    let held_b = assert_ok!(tx_b.reserve().await);

    // Both slots are taken, so a third and a fourth reservation must wait.
    let mut waiting_c = tokio_test::task::spawn(tx_c.reserve());
    assert_pending!(waiting_c.poll());

    let mut waiting_d = tokio_test::task::spawn(tx_d.reserve());
    assert_pending!(waiting_d.poll());

    // Sending through a permit is not enough to free its slot ...
    held_a.send(1);
    assert!(!waiting_c.is_woken());

    // ... the value also has to be received.
    receiver.recv().await.unwrap();
    assert!(waiting_c.is_woken());
    assert!(!waiting_d.is_woken());

    // Dropping an unused permit opens up a slot as well.
    drop(held_b);
    assert!(waiting_d.is_woken());

    let mut late = tokio_test::task::spawn(tx_a.reserve());
    assert_pending!(late.poll());
}
89
// Reads a bounded channel through the stream wrapper from `support`: two
// values sent from a spawned task arrive in order, followed by `None`.
#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_with_buffer() {
    use tokio_stream::StreamExt;

    let (sender, stream) = support::mpsc_stream::channel_stream::<i32>(16);
    let mut stream = Box::pin(stream);

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(sender.send(value).await);
        }
    });

    for expected in [Some(1), Some(2), None] {
        assert_eq!(expected, stream.next().await);
    }
}
107
// A spawned task sends two values over a bounded channel; the receiver gets
// them in order and then `None` once the task has dropped the sender.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_with_buffer() {
    let (sender, mut receiver) = mpsc::channel(16);

    tokio::spawn(async move {
        let first = sender.send(1).await;
        assert_ok!(first);
        let second = sender.send(2).await;
        assert_ok!(second);
    });

    let first = receiver.recv().await;
    assert_eq!(Some(1), first);

    let second = receiver.recv().await;
    assert_eq!(Some(2), second);

    let end = receiver.recv().await;
    assert_eq!(None, end);
}
122
// Drains four values sent by a spawned task using `recv_many` with a limit
// of three on a channel of capacity two.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_many_with_buffer() {
    let (sender, mut receiver) = mpsc::channel(2);
    let mut received = Vec::<i32>::with_capacity(3);

    // A limit of zero returns immediately instead of waiting for a message.
    assert_eq!(0, receiver.recv_many(&mut received, 0).await);

    let producer = tokio::spawn(async move {
        for value in [1, 2, 7, 0] {
            assert_ok!(sender.send(value).await);
        }
    });

    let limit = 3;
    let mut total = 0usize;
    loop {
        total += receiver.recv_many(&mut received, limit).await;
        // Every received message must have been appended to the buffer.
        assert_eq!(received.len(), total);
        if total >= 4 {
            break;
        }
    }

    assert_eq!(vec![1, 2, 7, 0], received);
    assert_eq!(0, receiver.recv_many(&mut received, limit).await);
    producer.await.unwrap();
}
150
// On a full channel of capacity one, two senders wait for a slot. The first
// waiter is dropped before a slot frees up, so only the second waiter's task
// may be woken when the queued value is received.
#[tokio::test]
#[cfg(feature = "full")]
async fn start_send_past_cap() {
    use std::future::Future;

    let (tx_a, mut receiver) = mpsc::channel(1);
    let tx_b = tx_a.clone();

    // Task context used only to poll the first reservation by hand.
    let mut first_task = tokio_test::task::spawn(());

    // Fill the single slot.
    assert_ok!(tx_a.try_send(()));

    let mut first_reserve = Box::pin(tx_a.reserve());
    first_task.enter(|cx, _| assert_pending!(first_reserve.as_mut().poll(cx)));

    let mut second_reserve = tokio_test::task::spawn(tx_b.reserve());
    assert_pending!(second_reserve.poll());

    // Give up on the first reservation before any capacity is available.
    drop(first_reserve);

    assert!(receiver.recv().await.is_some());

    assert!(second_reserve.is_woken());
    assert!(!first_task.is_woken());

    // Release the borrow of `tx_b` before dropping the senders.
    drop(second_reserve);

    drop(tx_a);
    drop(tx_b);

    assert!(receiver.recv().await.is_none());
}
183
// Creating a bounded channel with a capacity of zero is expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn buffer_gteq_one() {
    let _ = mpsc::channel::<i32>(0);
}
190
191#[maybe_tokio_test]
192async fn send_recv_unbounded() {
193 let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
194
195 // Using `try_send`
196 assert_ok!(tx.send(1));
197 assert_ok!(tx.send(2));
198
199 assert_eq!(rx.recv().await, Some(1));
200 assert_eq!(rx.recv().await, Some(2));
201
202 drop(tx);
203
204 assert!(rx.recv().await.is_none());
205}
206
207#[maybe_tokio_test]
208async fn send_recv_many_unbounded() {
209 let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
210
211 let mut buffer: Vec<i32> = Vec::new();
212
213 // With `limit=0` does not sleep, returns immediately
214 rx.recv_many(&mut buffer, 0).await;
215 assert_eq!(0, buffer.len());
216
217 assert_ok!(tx.send(7));
218 assert_ok!(tx.send(13));
219 assert_ok!(tx.send(100));
220 assert_ok!(tx.send(1002));
221
222 rx.recv_many(&mut buffer, 0).await;
223 assert_eq!(0, buffer.len());
224
225 let mut count = 0;
226 while count < 4 {
227 count += rx.recv_many(&mut buffer, 1).await;
228 }
229 assert_eq!(count, 4);
230 assert_eq!(vec![7, 13, 100, 1002], buffer);
231 let final_capacity = buffer.capacity();
232 assert!(final_capacity > 0);
233
234 buffer.clear();
235
236 assert_ok!(tx.send(5));
237 assert_ok!(tx.send(6));
238 assert_ok!(tx.send(7));
239 assert_ok!(tx.send(2));
240
241 // Re-use existing capacity
242 count = rx.recv_many(&mut buffer, 32).await;
243
244 assert_eq!(final_capacity, buffer.capacity());
245 assert_eq!(count, 4);
246 assert_eq!(vec![5, 6, 7, 2], buffer);
247
248 drop(tx);
249
250 // recv_many will immediately return zero if the channel
251 // is closed and no more messages are waiting
252 assert_eq!(0, rx.recv_many(&mut buffer, 4).await);
253 assert!(rx.recv().await.is_none());
254}
255
// Checks how `recv_many` interacts with the capacity of the target buffer on
// a bounded channel, and that it returns 0 only once the channel is closed.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_many_bounded_capacity() {
    let mut received: Vec<String> = Vec::with_capacity(9);
    let limit = received.capacity();
    let (sender, mut receiver) = mpsc::channel(100);

    let mut expected: Vec<String> = (0..limit).map(|n: usize| n.to_string()).collect();
    for item in expected.iter().cloned() {
        sender.send(item).await.unwrap();
    }
    sender.send("one more".to_string()).await.unwrap();

    // `recv_many` takes everything except the last value; the initial
    // capacity is sufficient, so the buffer does not grow.
    let capacity_before = received.capacity();
    let taken = receiver.recv_many(&mut received, limit).await;
    assert_eq!(capacity_before, taken);
    assert_eq!(expected, received);
    assert_eq!(limit, received.capacity());

    // Receiving one more value forces the buffer to grow.
    let taken = receiver.recv_many(&mut received, limit).await;
    assert_eq!(1, taken);
    assert!(received.capacity() > limit);
    expected.push("one more".to_string());
    assert_eq!(expected, received);

    // The sender moves into the task and is dropped when the task finishes.
    tokio::spawn(async move {
        sender.send("final".to_string()).await.unwrap();
    });

    // The channel only closes once that sender is gone, so this call must
    // deliver the final value rather than return 0.
    let taken = receiver.recv_many(&mut received, limit).await;
    assert_eq!(1, taken);
    expected.push("final".to_string());
    assert_eq!(expected, received);

    // The channel is now closed and `recv_many` returns 0.
    let taken = receiver.recv_many(&mut received, limit).await;
    assert_eq!(0, taken);
    assert_eq!(expected, received);
}
297
// Unbounded counterpart of the bounded capacity test: checks buffer growth
// under `recv_many` and that 0 is returned only after the channel closes.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_many_unbounded_capacity() {
    let mut sink: Vec<String> = Vec::with_capacity(9); // capacity >= 9
    let limit = sink.capacity();
    let (sender, mut receiver) = mpsc::unbounded_channel();

    let mut expected = (0..limit)
        .map(|n: usize| n.to_string())
        .collect::<Vec<String>>();
    expected
        .iter()
        .cloned()
        .for_each(|item| sender.send(item).unwrap());
    sender.send("one more".to_string()).unwrap();

    // All values but the last are received here; the initial capacity is
    // adequate, so the buffer does not increase in size.
    let initial_capacity = sink.capacity();
    assert_eq!(initial_capacity, receiver.recv_many(&mut sink, limit).await);
    assert_eq!(expected, sink);
    assert_eq!(limit, sink.capacity());

    // The remaining value no longer fits, so the buffer has to grow.
    assert_eq!(1, receiver.recv_many(&mut sink, limit).await);
    assert!(sink.capacity() > limit);
    expected.push("one more".to_string());
    assert_eq!(expected, sink);

    // The sender moves into the task and is dropped when the task finishes.
    tokio::spawn(async move {
        sender.send("final".to_string()).unwrap();
    });

    // Until that sender is dropped the channel stays open, so `recv_many`
    // must deliver the final value rather than return 0.
    assert_eq!(1, receiver.recv_many(&mut sink, limit).await);
    expected.push("final".to_string());
    assert_eq!(expected, sink);

    // The channel is now closed and `recv_many` returns 0.
    assert_eq!(0, receiver.recv_many(&mut sink, limit).await);
    assert_eq!(expected, sink);
}
339
// A spawned task sends two values over an unbounded channel; the receiver
// gets them in order and then `None` once the task has dropped the sender.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel();

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(sender.send(value));
        }
    });

    for expected in [Some(1), Some(2), None] {
        assert_eq!(expected, receiver.recv().await);
    }
}
354
// Reads an unbounded channel through the stream wrapper from `support`: two
// values sent from a spawned task arrive in order, followed by `None`.
#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_unbounded() {
    use tokio_stream::StreamExt;

    let (sender, stream) = support::mpsc_stream::unbounded_channel_stream::<i32>();
    let mut stream = Box::pin(stream);

    tokio::spawn(async move {
        let first = sender.send(1);
        assert_ok!(first);
        let second = sender.send(2);
        assert_ok!(second);
    });

    let first = stream.next().await;
    assert_eq!(Some(1), first);

    let second = stream.next().await;
    assert_eq!(Some(2), second);

    let end = stream.next().await;
    assert_eq!(None, end);
}
373
374#[maybe_tokio_test]
375async fn no_t_bounds_buffer() {
376 struct NoImpls;
377
378 let (tx, mut rx) = mpsc::channel(100);
379
380 // sender should be Debug even though T isn't Debug
381 is_debug(&tx);
382 // same with Receiver
383 is_debug(&rx);
384 // and sender should be Clone even though T isn't Clone
385 assert!(tx.clone().try_send(NoImpls).is_ok());
386
387 assert!(rx.recv().await.is_some());
388}
389
390#[maybe_tokio_test]
391async fn no_t_bounds_unbounded() {
392 struct NoImpls;
393
394 let (tx, mut rx) = mpsc::unbounded_channel();
395
396 // sender should be Debug even though T isn't Debug
397 is_debug(&tx);
398 // same with Receiver
399 is_debug(&rx);
400 // and sender should be Clone even though T isn't Clone
401 assert!(tx.clone().send(NoImpls).is_ok());
402
403 assert!(rx.recv().await.is_some());
404}
405
// With a capacity of one, a second reservation waits until the first value
// has been received; the freed slot then belongs to that waiting reservation
// and not to a concurrent `try_send`.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_buffer_limited() {
    let (sender, mut receiver) = mpsc::channel::<i32>(1);

    // Take the only slot and fill it.
    let first_permit = assert_ok!(sender.reserve().await);
    first_permit.send(1);

    // No capacity is left, so the next reservation is pending.
    let mut second_reserve = tokio_test::task::spawn(sender.reserve());
    assert_pending!(second_reserve.poll());

    // Receiving the value frees the slot and wakes the waiter.
    let first_value = receiver.recv().await;
    assert!(first_value.is_some());
    assert!(second_reserve.is_woken());

    // `try_send` still fails at this point.
    assert_err!(sender.try_send(1337));

    // The waiting reservation completes and sends the second value.
    let second_permit = assert_ready_ok!(second_reserve.poll());
    second_permit.send(2);

    let second_value = receiver.recv().await;
    assert!(second_value.is_some());
}
436
437#[maybe_tokio_test]
438async fn recv_close_gets_none_idle() {
439 let (tx, mut rx) = mpsc::channel::<i32>(10);
440
441 rx.close();
442
443 assert!(rx.recv().await.is_none());
444
445 assert_err!(tx.send(1).await);
446}
447
// After `close`, a pending reservation fails, but a permit acquired before
// the close can still deliver its value; only then does `recv` yield `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn recv_close_gets_none_reserved() {
    let (tx_a, mut receiver) = mpsc::channel::<i32>(1);
    let tx_b = tx_a.clone();

    let held = assert_ok!(tx_a.reserve().await);
    let mut blocked = tokio_test::task::spawn(tx_b.reserve());
    assert_pending!(blocked.poll());

    receiver.close();

    // The waiting reservation is woken and resolves to an error.
    assert!(blocked.is_woken());
    assert_ready_err!(blocked.poll());

    // A permit is still outstanding, so `recv` keeps waiting for it.
    let mut pending_recv = tokio_test::task::spawn(receiver.recv());
    assert_pending!(pending_recv.poll());

    held.send(123);
    assert!(pending_recv.is_woken());

    let delivered = assert_ready!(pending_recv.poll());
    assert_eq!(delivered, Some(123));

    // Release the mutable borrow of the receiver before using it again.
    drop(pending_recv);

    assert!(receiver.recv().await.is_none());
}
476
477#[maybe_tokio_test]
478async fn tx_close_gets_none() {
479 let (_, mut rx) = mpsc::channel::<i32>(10);
480 assert!(rx.recv().await.is_none());
481}
482
483#[maybe_tokio_test]
484async fn try_send_fail() {
485 let (tx, mut rx) = mpsc::channel(1);
486
487 tx.try_send("hello").unwrap();
488
489 // This should fail
490 match assert_err!(tx.try_send("fail")) {
491 TrySendError::Full(..) => {}
492 _ => panic!(),
493 }
494
495 assert_eq!(rx.recv().await, Some("hello"));
496
497 assert_ok!(tx.try_send("goodbye"));
498 drop(tx);
499
500 assert_eq!(rx.recv().await, Some("goodbye"));
501 assert!(rx.recv().await.is_none());
502}
503
504#[maybe_tokio_test]
505async fn try_send_fail_with_try_recv() {
506 let (tx, mut rx) = mpsc::channel(1);
507
508 tx.try_send("hello").unwrap();
509
510 // This should fail
511 match assert_err!(tx.try_send("fail")) {
512 TrySendError::Full(..) => {}
513 _ => panic!(),
514 }
515
516 assert_eq!(rx.try_recv(), Ok("hello"));
517
518 assert_ok!(tx.try_send("goodbye"));
519 drop(tx);
520
521 assert_eq!(rx.try_recv(), Ok("goodbye"));
522 assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
523}
524
525#[maybe_tokio_test]
526async fn try_reserve_fails() {
527 let (tx, mut rx) = mpsc::channel(1);
528
529 let permit = tx.try_reserve().unwrap();
530
531 // This should fail
532 match assert_err!(tx.try_reserve()) {
533 TrySendError::Full(()) => {}
534 _ => panic!(),
535 }
536
537 permit.send("foo");
538
539 assert_eq!(rx.recv().await, Some("foo"));
540
541 // Dropping permit releases the slot.
542 let permit = tx.try_reserve().unwrap();
543 drop(permit);
544
545 let _permit = tx.try_reserve().unwrap();
546}
547
// A permit reserves capacity; dropping it without sending must hand that
// capacity to a sender waiting in `reserve`.
#[tokio::test]
#[cfg(feature = "full")]
async fn drop_permit_releases_permit() {
    let (tx_a, _receiver) = mpsc::channel::<i32>(1);
    let tx_b = tx_a.clone();

    let held = assert_ok!(tx_a.reserve().await);

    // The single slot is reserved, so the second sender has to wait.
    let mut waiting = tokio_test::task::spawn(tx_b.reserve());
    assert_pending!(waiting.poll());

    drop(held);

    // The released slot goes to the waiting reservation.
    assert!(waiting.is_woken());
    assert_ready_ok!(waiting.poll());
}
566
567#[maybe_tokio_test]
568async fn dropping_rx_closes_channel() {
569 let (tx, rx) = mpsc::channel(100);
570
571 let msg = Arc::new(());
572 assert_ok!(tx.try_send(msg.clone()));
573
574 drop(rx);
575 assert_err!(tx.reserve().await);
576 assert_eq!(1, Arc::strong_count(&msg));
577}
578
// After the receiver is dropped, every non-blocking send or reserve call must
// report `Closed`, and no clone of the message may be left alive.
#[test]
fn dropping_rx_closes_channel_for_try() {
    let (sender, receiver) = mpsc::channel(100);

    let payload = Arc::new(());
    sender.try_send(Arc::clone(&payload)).unwrap();

    drop(receiver);

    // Each result is kept as a temporary inside `matches!` so that the
    // rejected clone is dropped before the reference count is checked.
    assert!(matches!(
        sender.try_send(Arc::clone(&payload)),
        Err(TrySendError::Closed(_))
    ));
    assert!(matches!(
        sender.try_reserve(),
        Err(TrySendError::Closed(_))
    ));
    assert!(matches!(
        sender.try_reserve_owned(),
        Err(TrySendError::Closed(_))
    ));

    assert_eq!(1, Arc::strong_count(&payload));
}
600
// A message that is never received is dropped together with the channel.
#[test]
fn unconsumed_messages_are_dropped() {
    let payload = Arc::new(());

    let (sender, receiver) = mpsc::channel(100);

    sender.try_send(Arc::clone(&payload)).unwrap();

    // One reference is held here, the other sits in the channel.
    assert_eq!(2, Arc::strong_count(&payload));

    drop(sender);
    drop(receiver);

    assert_eq!(1, Arc::strong_count(&payload));
}
615
// `blocking_recv` called from a plain thread receives a value that is sent
// from inside a runtime.
#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_recv() {
    let (sender, mut receiver) = mpsc::channel::<u8>(1);

    let consumer = std::thread::spawn(move || {
        let received = receiver.blocking_recv();
        assert_eq!(Some(10), received);
    });

    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async move {
        let _ = sender.send(10).await;
    });
    // Shut the runtime down before joining the consumer thread.
    drop(runtime);

    consumer.join().unwrap();
}
632
633#[tokio::test]
634#[should_panic]
635#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
636async fn blocking_recv_async() {
637 let (_tx, mut rx) = mpsc::channel::<()>(1);
638 let _ = rx.blocking_recv();
639}
640
// `blocking_send` called from a plain thread delivers a value to a receiver
// that is awaited inside a runtime.
#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_send() {
    let (sender, mut receiver) = mpsc::channel::<u8>(1);

    let producer = std::thread::spawn(move || {
        sender.blocking_send(10).unwrap();
    });

    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async move {
        let received = receiver.recv().await;
        assert_eq!(Some(10), received);
    });
    // Shut the runtime down before joining the producer thread.
    drop(runtime);

    producer.join().unwrap();
}
657
658#[tokio::test]
659#[should_panic]
660#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
661async fn blocking_send_async() {
662 let (tx, _rx) = mpsc::channel::<()>(1);
663 let _ = tx.blocking_send(());
664}
665
// After `close`, `recv` keeps waiting while a permit is outstanding; dropping
// that permit unused wakes the receiver, which then gets `None`.
#[tokio::test]
#[cfg(feature = "full")]
async fn ready_close_cancel_bounded() {
    let (sender, mut receiver) = mpsc::channel::<()>(100);
    // A second sender handle stays alive for the whole test.
    let _second_sender = sender.clone();

    let held = assert_ok!(sender.reserve().await);

    receiver.close();

    let mut pending_recv = tokio_test::task::spawn(receiver.recv());
    assert_pending!(pending_recv.poll());

    drop(held);

    assert!(pending_recv.is_woken());
    assert!(assert_ready!(pending_recv.poll()).is_none());
}
685
// A reservation that is woken after `close` but dropped without being polled
// again must not keep the receiver from observing the end of the channel.
#[tokio::test]
#[cfg(feature = "full")]
async fn permit_available_not_acquired_close() {
    let (tx_a, mut receiver) = mpsc::channel::<()>(1);
    let tx_b = tx_a.clone();

    let held = assert_ok!(tx_a.reserve().await);

    let mut waiting = tokio_test::task::spawn(tx_b.reserve());
    assert_pending!(waiting.poll());

    receiver.close();

    drop(held);
    assert!(waiting.is_woken());

    // Abandon the reservation without polling it to completion.
    drop(waiting);

    let end = receiver.recv().await;
    assert!(end.is_none());
}
705
// Drives a bounded channel of capacity five through fill/drain cycles with
// `try_send` / `try_recv`, checking `Empty` when drained and `Disconnected`
// after the sender is dropped.
#[test]
fn try_recv_bounded() {
    let (sender, mut receiver) = mpsc::channel(5);

    // Round one: fill to capacity, overflow once, drain completely.
    for _ in 0..5 {
        sender.try_send("hello").unwrap();
    }
    assert!(sender.try_send("hello").is_err());

    for _ in 0..5 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());

    // Round two: interleave sends and receives around the capacity limit.
    for _ in 0..4 {
        sender.try_send("hello").unwrap();
    }
    assert_eq!(Ok("hello"), receiver.try_recv());
    for _ in 0..2 {
        sender.try_send("hello").unwrap();
    }
    assert!(sender.try_send("hello").is_err());
    for _ in 0..5 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());

    // Round three: queued values survive the sender being dropped.
    for _ in 0..3 {
        sender.try_send("hello").unwrap();
    }
    drop(sender);
    for _ in 0..3 {
        assert_eq!(Ok("hello"), receiver.try_recv());
    }
    assert_eq!(Err(TryRecvError::Disconnected), receiver.try_recv());
}
748
// For every backlog size from 0 to 99, `try_recv` on an unbounded channel
// returns the values in order, then `Empty`, then `Disconnected` once the
// sender is dropped.
#[test]
fn try_recv_unbounded() {
    for backlog in 0..100 {
        let (sender, mut receiver) = mpsc::unbounded_channel();

        (0..backlog).for_each(|value| sender.send(value).unwrap());

        (0..backlog).for_each(|value| assert_eq!(receiver.try_recv(), Ok(value)));

        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
        drop(sender);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
767
// An empty bounded channel reports `Empty` while the sender is alive and
// `Disconnected` after it has been dropped.
#[test]
fn try_recv_close_while_empty_bounded() {
    let (sender, mut receiver) = mpsc::channel::<()>(5);

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_close = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_close);
}
776
// An empty unbounded channel reports `Empty` while the sender is alive and
// `Disconnected` after it has been dropped.
#[test]
fn try_recv_close_while_empty_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel::<()>();

    let while_open = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Empty), while_open);

    drop(sender);

    let after_close = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Disconnected), after_close);
}
785
// Despite its name, this test exercises `send_timeout`: five sends fit into
// the channel, the sixth times out and returns its value, and a send after
// the receiver is dropped reports `Closed` with its value.
#[tokio::test(start_paused = true)]
#[cfg(feature = "full")]
async fn recv_timeout() {
    use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout};
    use tokio::time::Duration;

    let (sender, receiver) = mpsc::channel(5);
    let one_second = Duration::from_secs(1);

    // Fill the channel up to its capacity of five.
    for value in [10, 20, 30, 40, 50] {
        let sent = sender.send_timeout(value, one_second).await;
        assert_eq!(sent, Ok(()));
    }

    // Nothing is ever received, so the next send runs into the timeout.
    let timed_out = sender.send_timeout(60, one_second).await;
    assert_eq!(timed_out, Err(Timeout(60)));

    drop(receiver);

    let closed = sender.send_timeout(70, one_second).await;
    assert_eq!(closed, Err(Closed(70)));
}
810
// Polling `send_timeout` outside of a runtime is expected to panic with the
// "no reactor running" message.
#[test]
#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn recv_timeout_panic() {
    use futures::future::FutureExt;
    use tokio::time::Duration;

    let (sender, _receiver) = mpsc::channel(5);
    let timeout = Duration::from_secs(1);

    // Poll the future exactly once, with no runtime in scope.
    let attempt = sender.send_timeout(10, timeout);
    attempt.now_or_never();
}
821
822// Tests that channel `capacity` changes and `max_capacity` stays the same
823#[tokio::test]
824async fn test_tx_capacity() {
825 let (tx, _rx) = mpsc::channel::<()>(10);
826 // both capacities are same before
827 assert_eq!(tx.capacity(), 10);
828 assert_eq!(tx.max_capacity(), 10);
829
830 let _permit = tx.reserve().await.unwrap();
831 // after reserve, only capacity should drop by one
832 assert_eq!(tx.capacity(), 9);
833 assert_eq!(tx.max_capacity(), 10);
834
835 tx.send(()).await.unwrap();
836 // after send, capacity should drop by one again
837 assert_eq!(tx.capacity(), 8);
838 assert_eq!(tx.max_capacity(), 10);
839}
840
// Compile-time helper: a call only type-checks when `T` implements
// `fmt::Debug`. The argument itself is never used.
fn is_debug<T>(_: &T)
where
    T: fmt::Debug,
{
}
842