1 | use futures::channel::mpsc; |
2 | use futures::executor::block_on; |
3 | use futures::future::poll_fn; |
4 | use futures::sink::SinkExt; |
5 | use futures::stream::StreamExt; |
6 | use std::sync::atomic::{AtomicUsize, Ordering}; |
7 | use std::thread; |
8 | |
/// Sends a descending run of integers from a background thread and checks
/// that the receiving side collects them in exactly the order they were sent.
#[test]
fn sequence() {
    let amt = 20;
    let (tx, rx) = mpsc::channel(1);

    // The producer drives `send_sequence` to completion on its own thread
    // while this thread drains the receiver.
    let producer = thread::spawn(move || block_on(send_sequence(amt, tx)));

    let mut received = block_on(rx.collect::<Vec<_>>()).into_iter();

    // `send_sequence` emits amt, amt - 1, ..., 1, so walk the same range backwards.
    (1..=amt)
        .rev()
        .for_each(|expected| assert_eq!(received.next(), Some(expected)));
    // Nothing beyond the expected items may have been delivered.
    assert_eq!(received.next(), None);

    producer.join().unwrap();
}
24 | |
25 | async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) { |
26 | for x in 0..n { |
27 | sender.send(n - x).await.unwrap(); |
28 | } |
29 | } |
30 | |
/// After the only sender is dropped without sending anything, polling the
/// receiver for its next item is expected to resolve to `None`.
#[test]
fn drop_sender() {
    let (sender, mut receiver) = mpsc::channel::<u32>(1);
    drop(sender);

    let next_item = block_on(poll_fn(|cx| receiver.poll_next_unpin(cx)));
    assert_eq!(next_item, None);
}
38 | |
/// A send made while the receiver is alive must succeed; once the receiver
/// has been dropped, a further send is expected to return an error.
#[test]
fn drop_rx() {
    let (mut sender, receiver) = mpsc::channel::<u32>(1);

    let while_open = block_on(sender.send(1));
    while_open.unwrap();

    drop(receiver);

    let after_close = block_on(sender.send(1));
    assert!(after_close.is_err());
}
46 | |
/// Tracks when values handed to the channel are destroyed, using a counter
/// that is bumped from `A`'s `Drop` impl.
///
/// The expected sequence is: a successfully sent value is not dropped while
/// the receiver is alive, it has been dropped once the receiver is dropped,
/// and a value whose send fails has been dropped by the time the send returns.
#[test]
fn drop_order() {
    static DROPS: AtomicUsize = AtomicUsize::new(0);

    struct A;

    impl Drop for A {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Snapshot of how many `A` values have been dropped so far.
    let dropped = || DROPS.load(Ordering::SeqCst);

    let (mut tx, rx) = mpsc::channel(1);

    block_on(tx.send(A)).unwrap();
    assert_eq!(dropped(), 0);

    drop(rx);
    assert_eq!(dropped(), 1);

    // Kept as a single expression so the send result is a temporary that is
    // gone before the counter is read again.
    assert!(block_on(tx.send(A)).is_err());
    assert_eq!(dropped(), 2);
}
67 | |