1use futures::channel::{mpsc, oneshot};
2use futures::executor::{block_on, block_on_stream};
3use futures::future::{poll_fn, FutureExt};
4use futures::pin_mut;
5use futures::sink::{Sink, SinkExt};
6use futures::stream::{Stream, StreamExt};
7use futures::task::{Context, Poll};
8use futures_test::task::{new_count_waker, noop_context};
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::{Arc, Mutex};
11use std::thread;
12
13trait AssertSend: Send {}
14impl AssertSend for mpsc::Sender<i32> {}
15impl AssertSend for mpsc::Receiver<i32> {}
16
17#[test]
18fn send_recv() {
19 let (mut tx, rx) = mpsc::channel::<i32>(16);
20
21 block_on(tx.send(1)).unwrap();
22 drop(tx);
23 let v: Vec<_> = block_on(rx.collect());
24 assert_eq!(v, vec![1]);
25}
26
27#[test]
28fn send_recv_no_buffer() {
29 // Run on a task context
30 block_on(poll_fn(move |cx| {
31 let (tx, rx) = mpsc::channel::<i32>(0);
32 pin_mut!(tx, rx);
33
34 assert!(tx.as_mut().poll_flush(cx).is_ready());
35 assert!(tx.as_mut().poll_ready(cx).is_ready());
36
37 // Send first message
38 assert!(tx.as_mut().start_send(1).is_ok());
39 assert!(tx.as_mut().poll_ready(cx).is_pending());
40
41 // poll_ready said Pending, so no room in buffer, therefore new sends
42 // should get rejected with is_full.
43 assert!(tx.as_mut().start_send(0).unwrap_err().is_full());
44 assert!(tx.as_mut().poll_ready(cx).is_pending());
45
46 // Take the value
47 assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
48 assert!(tx.as_mut().poll_ready(cx).is_ready());
49
50 // Send second message
51 assert!(tx.as_mut().poll_ready(cx).is_ready());
52 assert!(tx.as_mut().start_send(2).is_ok());
53 assert!(tx.as_mut().poll_ready(cx).is_pending());
54
55 // Take the value
56 assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
57 assert!(tx.as_mut().poll_ready(cx).is_ready());
58
59 Poll::Ready(())
60 }));
61}
62
63#[test]
64fn send_shared_recv() {
65 let (mut tx1, rx) = mpsc::channel::<i32>(16);
66 let mut rx = block_on_stream(rx);
67 let mut tx2 = tx1.clone();
68
69 block_on(tx1.send(1)).unwrap();
70 assert_eq!(rx.next(), Some(1));
71
72 block_on(tx2.send(2)).unwrap();
73 assert_eq!(rx.next(), Some(2));
74}
75
76#[test]
77fn send_recv_threads() {
78 let (mut tx, rx) = mpsc::channel::<i32>(16);
79
80 let t = thread::spawn(move || {
81 block_on(tx.send(1)).unwrap();
82 });
83
84 let v: Vec<_> = block_on(rx.take(1).collect());
85 assert_eq!(v, vec![1]);
86
87 t.join().unwrap();
88}
89
90#[test]
91fn send_recv_threads_no_capacity() {
92 let (mut tx, rx) = mpsc::channel::<i32>(0);
93
94 let t = thread::spawn(move || {
95 block_on(tx.send(1)).unwrap();
96 block_on(tx.send(2)).unwrap();
97 });
98
99 let v: Vec<_> = block_on(rx.collect());
100 assert_eq!(v, vec![1, 2]);
101
102 t.join().unwrap();
103}
104
105#[test]
106fn recv_close_gets_none() {
107 let (mut tx, mut rx) = mpsc::channel::<i32>(10);
108
109 // Run on a task context
110 block_on(poll_fn(move |cx| {
111 rx.close();
112
113 assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
114 match tx.poll_ready(cx) {
115 Poll::Pending | Poll::Ready(Ok(_)) => panic!(),
116 Poll::Ready(Err(e)) => assert!(e.is_disconnected()),
117 };
118
119 Poll::Ready(())
120 }));
121}
122
123#[test]
124fn tx_close_gets_none() {
125 let (_, mut rx) = mpsc::channel::<i32>(10);
126
127 // Run on a task context
128 block_on(poll_fn(move |cx| {
129 assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
130 Poll::Ready(())
131 }));
132}
133
134// #[test]
135// fn spawn_sends_items() {
136// let core = local_executor::Core::new();
137// let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1))));
138// let rx = mpsc::spawn(stream, &core, 1);
139// assert_eq!(core.run(rx.take(4).collect()).unwrap(),
140// [0, 1, 2, 3]);
141// }
142
143// #[test]
144// fn spawn_kill_dead_stream() {
145// use std::thread;
146// use std::time::Duration;
147// use futures::future::Either;
148// use futures::sync::oneshot;
149//
150// // a stream which never returns anything (maybe a remote end isn't
151// // responding), but dropping it leads to observable side effects
152// // (like closing connections, releasing limited resources, ...)
153// #[derive(Debug)]
154// struct Dead {
155// // when dropped you should get Err(oneshot::Canceled) on the
156// // receiving end
157// done: oneshot::Sender<()>,
158// }
159// impl Stream for Dead {
160// type Item = ();
161// type Error = ();
162//
163// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
164// Ok(Poll::Pending)
165// }
166// }
167//
168// // need to implement a timeout for the test, as it would hang
169// // forever right now
170// let (timeout_tx, timeout_rx) = oneshot::channel();
171// thread::spawn(move || {
172// thread::sleep(Duration::from_millis(1000));
173// let _ = timeout_tx.send(());
174// });
175//
176// let core = local_executor::Core::new();
177// let (done_tx, done_rx) = oneshot::channel();
178// let stream = Dead{done: done_tx};
179// let rx = mpsc::spawn(stream, &core, 1);
180// let res = core.run(
181// Ok::<_, ()>(())
182// .into_future()
183// .then(move |_| {
184// // now drop the spawned stream: maybe some timeout exceeded,
185// // or some connection on this end was closed by the remote
186// // end.
187// drop(rx);
188// // and wait for the spawned stream to release its resources
189// done_rx
190// })
191// .select2(timeout_rx)
192// );
193// match res {
194// Err(Either::A((oneshot::Canceled, _))) => (),
195// _ => {
196// panic!("dead stream wasn't canceled");
197// },
198// }
199// }
200
201#[test]
202fn stress_shared_unbounded() {
203 const AMT: u32 = if cfg!(miri) { 100 } else { 10000 };
204 const NTHREADS: u32 = 8;
205 let (tx, rx) = mpsc::unbounded::<i32>();
206
207 let t = thread::spawn(move || {
208 let result: Vec<_> = block_on(rx.collect());
209 assert_eq!(result.len(), (AMT * NTHREADS) as usize);
210 for item in result {
211 assert_eq!(item, 1);
212 }
213 });
214
215 for _ in 0..NTHREADS {
216 let tx = tx.clone();
217
218 thread::spawn(move || {
219 for _ in 0..AMT {
220 tx.unbounded_send(1).unwrap();
221 }
222 });
223 }
224
225 drop(tx);
226
227 t.join().ok().unwrap();
228}
229
230#[test]
231fn stress_shared_bounded_hard() {
232 const AMT: u32 = if cfg!(miri) { 100 } else { 10000 };
233 const NTHREADS: u32 = 8;
234 let (tx, rx) = mpsc::channel::<i32>(0);
235
236 let t = thread::spawn(move || {
237 let result: Vec<_> = block_on(rx.collect());
238 assert_eq!(result.len(), (AMT * NTHREADS) as usize);
239 for item in result {
240 assert_eq!(item, 1);
241 }
242 });
243
244 for _ in 0..NTHREADS {
245 let mut tx = tx.clone();
246
247 thread::spawn(move || {
248 for _ in 0..AMT {
249 block_on(tx.send(1)).unwrap();
250 }
251 });
252 }
253
254 drop(tx);
255
256 t.join().unwrap();
257}
258
259#[allow(clippy::same_item_push)]
260#[test]
261fn stress_receiver_multi_task_bounded_hard() {
262 const AMT: usize = if cfg!(miri) { 100 } else { 10_000 };
263 const NTHREADS: u32 = 2;
264
265 let (mut tx, rx) = mpsc::channel::<usize>(0);
266 let rx = Arc::new(Mutex::new(Some(rx)));
267 let n = Arc::new(AtomicUsize::new(0));
268
269 let mut th = vec![];
270
271 for _ in 0..NTHREADS {
272 let rx = rx.clone();
273 let n = n.clone();
274
275 let t = thread::spawn(move || {
276 let mut i = 0;
277
278 loop {
279 i += 1;
280 let mut rx_opt = rx.lock().unwrap();
281 if let Some(rx) = &mut *rx_opt {
282 if i % 5 == 0 {
283 let item = block_on(rx.next());
284
285 if item.is_none() {
286 *rx_opt = None;
287 break;
288 }
289
290 n.fetch_add(1, Ordering::Relaxed);
291 } else {
292 // Just poll
293 let n = n.clone();
294 match rx.poll_next_unpin(&mut noop_context()) {
295 Poll::Ready(Some(_)) => {
296 n.fetch_add(1, Ordering::Relaxed);
297 }
298 Poll::Ready(None) => {
299 *rx_opt = None;
300 break;
301 }
302 Poll::Pending => {}
303 }
304 }
305 } else {
306 break;
307 }
308 }
309 });
310
311 th.push(t);
312 }
313
314 for i in 0..AMT {
315 block_on(tx.send(i)).unwrap();
316 }
317 drop(tx);
318
319 for t in th {
320 t.join().unwrap();
321 }
322
323 assert_eq!(AMT, n.load(Ordering::Relaxed));
324}
325
326/// Stress test that receiver properly receives all the messages
327/// after sender dropped.
328#[test]
329fn stress_drop_sender() {
330 const ITER: usize = if cfg!(miri) { 100 } else { 10000 };
331
332 fn list() -> impl Stream<Item = i32> {
333 let (tx, rx) = mpsc::channel(1);
334 thread::spawn(move || {
335 block_on(send_one_two_three(tx));
336 });
337 rx
338 }
339
340 for _ in 0..ITER {
341 let v: Vec<_> = block_on(list().collect());
342 assert_eq!(v, vec![1, 2, 3]);
343 }
344}
345
346async fn send_one_two_three(mut tx: mpsc::Sender<i32>) {
347 for i in 1..=3 {
348 tx.send(i).await.unwrap();
349 }
350}
351
352/// Stress test that after receiver dropped,
353/// no messages are lost.
354fn stress_close_receiver_iter() {
355 let (tx, rx) = mpsc::unbounded();
356 let mut rx = block_on_stream(rx);
357 let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel();
358 let th = thread::spawn(move || {
359 for i in 1.. {
360 if tx.unbounded_send(i).is_err() {
361 unwritten_tx.send(i).expect("unwritten_tx");
362 return;
363 }
364 }
365 });
366
367 // Read one message to make sure thread effectively started
368 assert_eq!(Some(1), rx.next());
369
370 rx.close();
371
372 for i in 2.. {
373 match rx.next() {
374 Some(r) => assert!(i == r),
375 None => {
376 let unwritten = unwritten_rx.recv().expect("unwritten_rx");
377 assert_eq!(unwritten, i);
378 th.join().unwrap();
379 return;
380 }
381 }
382 }
383}
384
385#[test]
386fn stress_close_receiver() {
387 const ITER: usize = if cfg!(miri) { 50 } else { 10000 };
388
389 for _ in 0..ITER {
390 stress_close_receiver_iter();
391 }
392}
393
394async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
395 for i in (1..=count).rev() {
396 sender.send(i).await.unwrap();
397 }
398}
399
400/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting.
401#[allow(clippy::same_item_push)]
402#[test]
403fn stress_poll_ready() {
404 const AMT: u32 = if cfg!(miri) { 100 } else { 1000 };
405 const NTHREADS: u32 = 8;
406
407 /// Run a stress test using the specified channel capacity.
408 fn stress(capacity: usize) {
409 let (tx, rx) = mpsc::channel(capacity);
410 let mut threads = Vec::new();
411 for _ in 0..NTHREADS {
412 let sender = tx.clone();
413 threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
414 }
415 drop(tx);
416
417 let result: Vec<_> = block_on(rx.collect());
418 assert_eq!(result.len() as u32, AMT * NTHREADS);
419
420 for thread in threads {
421 thread.join().unwrap();
422 }
423 }
424
425 stress(0);
426 stress(1);
427 stress(8);
428 stress(16);
429}
430
431#[test]
432fn try_send_1() {
433 const N: usize = if cfg!(miri) { 100 } else { 3000 };
434 let (mut tx, rx) = mpsc::channel(0);
435
436 let t = thread::spawn(move || {
437 for i in 0..N {
438 loop {
439 if tx.try_send(i).is_ok() {
440 break;
441 }
442 }
443 }
444 });
445
446 let result: Vec<_> = block_on(rx.collect());
447 for (i, j) in result.into_iter().enumerate() {
448 assert_eq!(i, j);
449 }
450
451 t.join().unwrap();
452}
453
454#[test]
455fn try_send_2() {
456 let (mut tx, rx) = mpsc::channel(0);
457 let mut rx = block_on_stream(rx);
458
459 tx.try_send("hello").unwrap();
460
461 let (readytx, readyrx) = oneshot::channel::<()>();
462
463 let th = thread::spawn(move || {
464 block_on(poll_fn(|cx| {
465 assert!(tx.poll_ready(cx).is_pending());
466 Poll::Ready(())
467 }));
468
469 drop(readytx);
470 block_on(tx.send("goodbye")).unwrap();
471 });
472
473 let _ = block_on(readyrx);
474 assert_eq!(rx.next(), Some("hello"));
475 assert_eq!(rx.next(), Some("goodbye"));
476 assert_eq!(rx.next(), None);
477
478 th.join().unwrap();
479}
480
481#[test]
482fn try_send_fail() {
483 let (mut tx, rx) = mpsc::channel(0);
484 let mut rx = block_on_stream(rx);
485
486 tx.try_send("hello").unwrap();
487
488 // This should fail
489 assert!(tx.try_send("fail").is_err());
490
491 assert_eq!(rx.next(), Some("hello"));
492
493 tx.try_send("goodbye").unwrap();
494 drop(tx);
495
496 assert_eq!(rx.next(), Some("goodbye"));
497 assert_eq!(rx.next(), None);
498}
499
500#[test]
501fn try_send_recv() {
502 let (mut tx, mut rx) = mpsc::channel(1);
503 tx.try_send("hello").unwrap();
504 tx.try_send("hello").unwrap();
505 tx.try_send("hello").unwrap_err(); // should be full
506 rx.try_next().unwrap();
507 rx.try_next().unwrap();
508 rx.try_next().unwrap_err(); // should be empty
509 tx.try_send("hello").unwrap();
510 rx.try_next().unwrap();
511 rx.try_next().unwrap_err(); // should be empty
512}
513
514#[test]
515fn same_receiver() {
516 let (mut txa1, _) = mpsc::channel::<i32>(1);
517 let txa2 = txa1.clone();
518
519 let (mut txb1, _) = mpsc::channel::<i32>(1);
520 let txb2 = txb1.clone();
521
522 assert!(txa1.same_receiver(&txa2));
523 assert!(txb1.same_receiver(&txb2));
524 assert!(!txa1.same_receiver(&txb1));
525
526 txa1.disconnect();
527 txb1.close_channel();
528
529 assert!(!txa1.same_receiver(&txa2));
530 assert!(txb1.same_receiver(&txb2));
531}
532
533#[test]
534fn is_connected_to() {
535 let (txa, rxa) = mpsc::channel::<i32>(1);
536 let (txb, rxb) = mpsc::channel::<i32>(1);
537
538 assert!(txa.is_connected_to(&rxa));
539 assert!(txb.is_connected_to(&rxb));
540 assert!(!txa.is_connected_to(&rxb));
541 assert!(!txb.is_connected_to(&rxa));
542}
543
544#[test]
545fn hash_receiver() {
546 use std::collections::hash_map::DefaultHasher;
547 use std::hash::Hasher;
548
549 let mut hasher_a1 = DefaultHasher::new();
550 let mut hasher_a2 = DefaultHasher::new();
551 let mut hasher_b1 = DefaultHasher::new();
552 let mut hasher_b2 = DefaultHasher::new();
553 let (mut txa1, _) = mpsc::channel::<i32>(1);
554 let txa2 = txa1.clone();
555
556 let (mut txb1, _) = mpsc::channel::<i32>(1);
557 let txb2 = txb1.clone();
558
559 txa1.hash_receiver(&mut hasher_a1);
560 let hash_a1 = hasher_a1.finish();
561 txa2.hash_receiver(&mut hasher_a2);
562 let hash_a2 = hasher_a2.finish();
563 txb1.hash_receiver(&mut hasher_b1);
564 let hash_b1 = hasher_b1.finish();
565 txb2.hash_receiver(&mut hasher_b2);
566 let hash_b2 = hasher_b2.finish();
567
568 assert_eq!(hash_a1, hash_a2);
569 assert_eq!(hash_b1, hash_b2);
570 assert!(hash_a1 != hash_b1);
571
572 txa1.disconnect();
573 txb1.close_channel();
574
575 let mut hasher_a1 = DefaultHasher::new();
576 let mut hasher_a2 = DefaultHasher::new();
577 let mut hasher_b1 = DefaultHasher::new();
578 let mut hasher_b2 = DefaultHasher::new();
579
580 txa1.hash_receiver(&mut hasher_a1);
581 let hash_a1 = hasher_a1.finish();
582 txa2.hash_receiver(&mut hasher_a2);
583 let hash_a2 = hasher_a2.finish();
584 txb1.hash_receiver(&mut hasher_b1);
585 let hash_b1 = hasher_b1.finish();
586 txb2.hash_receiver(&mut hasher_b2);
587 let hash_b2 = hasher_b2.finish();
588
589 assert!(hash_a1 != hash_a2);
590 assert_eq!(hash_b1, hash_b2);
591}
592
593#[test]
594fn send_backpressure() {
595 let (waker, counter) = new_count_waker();
596 let mut cx = Context::from_waker(&waker);
597
598 let (mut tx, mut rx) = mpsc::channel(1);
599 block_on(tx.send(1)).unwrap();
600
601 let mut task = tx.send(2);
602 assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
603 assert_eq!(counter, 0);
604
605 let item = block_on(rx.next()).unwrap();
606 assert_eq!(item, 1);
607 assert_eq!(counter, 1);
608 assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
609
610 let item = block_on(rx.next()).unwrap();
611 assert_eq!(item, 2);
612}
613
614#[test]
615fn send_backpressure_multi_senders() {
616 let (waker, counter) = new_count_waker();
617 let mut cx = Context::from_waker(&waker);
618
619 let (mut tx1, mut rx) = mpsc::channel(1);
620 let mut tx2 = tx1.clone();
621 block_on(tx1.send(1)).unwrap();
622
623 let mut task = tx2.send(2);
624 assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
625 assert_eq!(counter, 0);
626
627 let item = block_on(rx.next()).unwrap();
628 assert_eq!(item, 1);
629 assert_eq!(counter, 1);
630 assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
631
632 let item = block_on(rx.next()).unwrap();
633 assert_eq!(item, 2);
634}
635
636/// Test that empty channel has zero length and that non-empty channel has length equal to number
637/// of enqueued items
638#[test]
639fn unbounded_len() {
640 let (tx, mut rx) = mpsc::unbounded();
641 assert_eq!(tx.len(), 0);
642 assert!(tx.is_empty());
643 tx.unbounded_send(1).unwrap();
644 assert_eq!(tx.len(), 1);
645 assert!(!tx.is_empty());
646 tx.unbounded_send(2).unwrap();
647 assert_eq!(tx.len(), 2);
648 assert!(!tx.is_empty());
649 let item = block_on(rx.next()).unwrap();
650 assert_eq!(item, 1);
651 assert_eq!(tx.len(), 1);
652 assert!(!tx.is_empty());
653 let item = block_on(rx.next()).unwrap();
654 assert_eq!(item, 2);
655 assert_eq!(tx.len(), 0);
656 assert!(tx.is_empty());
657}
658