1use futures::channel::{mpsc, oneshot};
2use futures::executor::{block_on, block_on_stream};
3use futures::sink::SinkExt;
4use futures::stream::StreamExt;
5use std::sync::mpsc as std_mpsc;
6use std::thread;
7
8#[test]
9#[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790
10fn works() {
11 const N: usize = 4;
12
13 let (mut tx, rx) = mpsc::channel(1);
14
15 let (tx2, rx2) = std_mpsc::channel();
16 let (tx3, rx3) = std_mpsc::channel();
17 let t1 = thread::spawn(move || {
18 for _ in 0..=N {
19 let (mytx, myrx) = oneshot::channel();
20 block_on(tx.send(myrx)).unwrap();
21 tx3.send(mytx).unwrap();
22 }
23 rx2.recv().unwrap();
24 for _ in 0..N {
25 let (mytx, myrx) = oneshot::channel();
26 block_on(tx.send(myrx)).unwrap();
27 tx3.send(mytx).unwrap();
28 }
29 });
30
31 let (tx4, rx4) = std_mpsc::channel();
32 let t2 = thread::spawn(move || {
33 for item in block_on_stream(rx.buffer_unordered(N)) {
34 tx4.send(item.unwrap()).unwrap();
35 }
36 });
37
38 let o1 = rx3.recv().unwrap();
39 let o2 = rx3.recv().unwrap();
40 let o3 = rx3.recv().unwrap();
41 let o4 = rx3.recv().unwrap();
42 assert!(rx4.try_recv().is_err());
43
44 o1.send(1).unwrap();
45 assert_eq!(rx4.recv(), Ok(1));
46 o3.send(3).unwrap();
47 assert_eq!(rx4.recv(), Ok(3));
48 tx2.send(()).unwrap();
49 o2.send(2).unwrap();
50 assert_eq!(rx4.recv(), Ok(2));
51 o4.send(4).unwrap();
52 assert_eq!(rx4.recv(), Ok(4));
53
54 let o5 = rx3.recv().unwrap();
55 let o6 = rx3.recv().unwrap();
56 let o7 = rx3.recv().unwrap();
57 let o8 = rx3.recv().unwrap();
58 let o9 = rx3.recv().unwrap();
59
60 o5.send(5).unwrap();
61 assert_eq!(rx4.recv(), Ok(5));
62 o8.send(8).unwrap();
63 assert_eq!(rx4.recv(), Ok(8));
64 o9.send(9).unwrap();
65 assert_eq!(rx4.recv(), Ok(9));
66 o7.send(7).unwrap();
67 assert_eq!(rx4.recv(), Ok(7));
68 o6.send(6).unwrap();
69 assert_eq!(rx4.recv(), Ok(6));
70
71 t1.join().unwrap();
72 t2.join().unwrap();
73}
74