1 | use futures::channel::{mpsc, oneshot}; |
2 | use futures::executor::{block_on, block_on_stream}; |
3 | use futures::sink::SinkExt; |
4 | use futures::stream::StreamExt; |
5 | use std::sync::mpsc as std_mpsc; |
6 | use std::thread; |
7 | |
8 | #[test] |
9 | #[ignore ] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790 |
10 | fn works() { |
11 | const N: usize = 4; |
12 | |
13 | let (mut tx, rx) = mpsc::channel(1); |
14 | |
15 | let (tx2, rx2) = std_mpsc::channel(); |
16 | let (tx3, rx3) = std_mpsc::channel(); |
17 | let t1 = thread::spawn(move || { |
18 | for _ in 0..=N { |
19 | let (mytx, myrx) = oneshot::channel(); |
20 | block_on(tx.send(myrx)).unwrap(); |
21 | tx3.send(mytx).unwrap(); |
22 | } |
23 | rx2.recv().unwrap(); |
24 | for _ in 0..N { |
25 | let (mytx, myrx) = oneshot::channel(); |
26 | block_on(tx.send(myrx)).unwrap(); |
27 | tx3.send(mytx).unwrap(); |
28 | } |
29 | }); |
30 | |
31 | let (tx4, rx4) = std_mpsc::channel(); |
32 | let t2 = thread::spawn(move || { |
33 | for item in block_on_stream(rx.buffer_unordered(N)) { |
34 | tx4.send(item.unwrap()).unwrap(); |
35 | } |
36 | }); |
37 | |
38 | let o1 = rx3.recv().unwrap(); |
39 | let o2 = rx3.recv().unwrap(); |
40 | let o3 = rx3.recv().unwrap(); |
41 | let o4 = rx3.recv().unwrap(); |
42 | assert!(rx4.try_recv().is_err()); |
43 | |
44 | o1.send(1).unwrap(); |
45 | assert_eq!(rx4.recv(), Ok(1)); |
46 | o3.send(3).unwrap(); |
47 | assert_eq!(rx4.recv(), Ok(3)); |
48 | tx2.send(()).unwrap(); |
49 | o2.send(2).unwrap(); |
50 | assert_eq!(rx4.recv(), Ok(2)); |
51 | o4.send(4).unwrap(); |
52 | assert_eq!(rx4.recv(), Ok(4)); |
53 | |
54 | let o5 = rx3.recv().unwrap(); |
55 | let o6 = rx3.recv().unwrap(); |
56 | let o7 = rx3.recv().unwrap(); |
57 | let o8 = rx3.recv().unwrap(); |
58 | let o9 = rx3.recv().unwrap(); |
59 | |
60 | o5.send(5).unwrap(); |
61 | assert_eq!(rx4.recv(), Ok(5)); |
62 | o8.send(8).unwrap(); |
63 | assert_eq!(rx4.recv(), Ok(8)); |
64 | o9.send(9).unwrap(); |
65 | assert_eq!(rx4.recv(), Ok(9)); |
66 | o7.send(7).unwrap(); |
67 | assert_eq!(rx4.recv(), Ok(7)); |
68 | o6.send(6).unwrap(); |
69 | assert_eq!(rx4.recv(), Ok(6)); |
70 | |
71 | t1.join().unwrap(); |
72 | t2.join().unwrap(); |
73 | } |
74 | |