| 1 | use futures::channel::{mpsc, oneshot}; |
| 2 | use futures::executor::{block_on, block_on_stream}; |
| 3 | use futures::sink::SinkExt; |
| 4 | use futures::stream::StreamExt; |
| 5 | use std::sync::mpsc as std_mpsc; |
| 6 | use std::thread; |
| 7 | |
| 8 | #[test] |
| 9 | #[ignore ] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790 |
| 10 | fn works() { |
| 11 | const N: usize = 4; |
| 12 | |
| 13 | let (mut tx, rx) = mpsc::channel(1); |
| 14 | |
| 15 | let (tx2, rx2) = std_mpsc::channel(); |
| 16 | let (tx3, rx3) = std_mpsc::channel(); |
| 17 | let t1 = thread::spawn(move || { |
| 18 | for _ in 0..=N { |
| 19 | let (mytx, myrx) = oneshot::channel(); |
| 20 | block_on(tx.send(myrx)).unwrap(); |
| 21 | tx3.send(mytx).unwrap(); |
| 22 | } |
| 23 | rx2.recv().unwrap(); |
| 24 | for _ in 0..N { |
| 25 | let (mytx, myrx) = oneshot::channel(); |
| 26 | block_on(tx.send(myrx)).unwrap(); |
| 27 | tx3.send(mytx).unwrap(); |
| 28 | } |
| 29 | }); |
| 30 | |
| 31 | let (tx4, rx4) = std_mpsc::channel(); |
| 32 | let t2 = thread::spawn(move || { |
| 33 | for item in block_on_stream(rx.buffer_unordered(N)) { |
| 34 | tx4.send(item.unwrap()).unwrap(); |
| 35 | } |
| 36 | }); |
| 37 | |
| 38 | let o1 = rx3.recv().unwrap(); |
| 39 | let o2 = rx3.recv().unwrap(); |
| 40 | let o3 = rx3.recv().unwrap(); |
| 41 | let o4 = rx3.recv().unwrap(); |
| 42 | assert!(rx4.try_recv().is_err()); |
| 43 | |
| 44 | o1.send(1).unwrap(); |
| 45 | assert_eq!(rx4.recv(), Ok(1)); |
| 46 | o3.send(3).unwrap(); |
| 47 | assert_eq!(rx4.recv(), Ok(3)); |
| 48 | tx2.send(()).unwrap(); |
| 49 | o2.send(2).unwrap(); |
| 50 | assert_eq!(rx4.recv(), Ok(2)); |
| 51 | o4.send(4).unwrap(); |
| 52 | assert_eq!(rx4.recv(), Ok(4)); |
| 53 | |
| 54 | let o5 = rx3.recv().unwrap(); |
| 55 | let o6 = rx3.recv().unwrap(); |
| 56 | let o7 = rx3.recv().unwrap(); |
| 57 | let o8 = rx3.recv().unwrap(); |
| 58 | let o9 = rx3.recv().unwrap(); |
| 59 | |
| 60 | o5.send(5).unwrap(); |
| 61 | assert_eq!(rx4.recv(), Ok(5)); |
| 62 | o8.send(8).unwrap(); |
| 63 | assert_eq!(rx4.recv(), Ok(8)); |
| 64 | o9.send(9).unwrap(); |
| 65 | assert_eq!(rx4.recv(), Ok(9)); |
| 66 | o7.send(7).unwrap(); |
| 67 | assert_eq!(rx4.recv(), Ok(7)); |
| 68 | o6.send(6).unwrap(); |
| 69 | assert_eq!(rx4.recv(), Ok(6)); |
| 70 | |
| 71 | t1.join().unwrap(); |
| 72 | t2.join().unwrap(); |
| 73 | } |
| 74 | |