1use futures::channel::oneshot;
2use futures::executor::ThreadPool;
3use futures::future::{self, ok, Future, FutureExt, TryFutureExt};
4use futures::task::SpawnExt;
5use std::sync::mpsc;
6use std::thread;
7
8fn run<F: Future + Send + 'static>(future: F) {
9 let tp = ThreadPool::new().unwrap();
10 tp.spawn(future.map(drop)).unwrap();
11}
12
13#[test]
14fn join1() {
15 let (tx, rx) = mpsc::channel();
16 run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap()));
17 assert_eq!(rx.recv(), Ok((1, 2)));
18 assert!(rx.recv().is_err());
19
20 std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
21}
22
23#[test]
24fn join2() {
25 let (c1, p1) = oneshot::channel::<i32>();
26 let (c2, p2) = oneshot::channel::<i32>();
27 let (tx, rx) = mpsc::channel();
28 run(future::try_join(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
29 assert!(rx.try_recv().is_err());
30 c1.send(1).unwrap();
31 assert!(rx.try_recv().is_err());
32 c2.send(2).unwrap();
33 assert_eq!(rx.recv(), Ok((1, 2)));
34 assert!(rx.recv().is_err());
35
36 std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
37}
38
39#[test]
40fn join3() {
41 let (c1, p1) = oneshot::channel::<i32>();
42 let (c2, p2) = oneshot::channel::<i32>();
43 let (tx, rx) = mpsc::channel();
44 run(future::try_join(p1, p2).map_err(move |_v| tx.send(1).unwrap()));
45 assert!(rx.try_recv().is_err());
46 drop(c1);
47 assert_eq!(rx.recv(), Ok(1));
48 assert!(rx.recv().is_err());
49 drop(c2);
50
51 std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
52}
53
54#[test]
55fn join4() {
56 let (c1, p1) = oneshot::channel::<i32>();
57 let (c2, p2) = oneshot::channel::<i32>();
58 let (tx, rx) = mpsc::channel();
59 run(future::try_join(p1, p2).map_err(move |v| tx.send(v).unwrap()));
60 assert!(rx.try_recv().is_err());
61 drop(c1);
62 assert!(rx.recv().is_ok());
63 drop(c2);
64 assert!(rx.recv().is_err());
65
66 std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
67}
68
69#[test]
70fn join5() {
71 let (c1, p1) = oneshot::channel::<i32>();
72 let (c2, p2) = oneshot::channel::<i32>();
73 let (c3, p3) = oneshot::channel::<i32>();
74 let (tx, rx) = mpsc::channel();
75 run(future::try_join(future::try_join(p1, p2), p3).map_ok(move |v| tx.send(v).unwrap()));
76 assert!(rx.try_recv().is_err());
77 c1.send(1).unwrap();
78 assert!(rx.try_recv().is_err());
79 c2.send(2).unwrap();
80 assert!(rx.try_recv().is_err());
81 c3.send(3).unwrap();
82 assert_eq!(rx.recv(), Ok(((1, 2), 3)));
83 assert!(rx.recv().is_err());
84
85 std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
86}
87
88#[test]
89fn select1() {
90 let (c1, p1) = oneshot::channel::<i32>();
91 let (c2, p2) = oneshot::channel::<i32>();
92 let (tx, rx) = mpsc::channel();
93 run(future::try_select(p1, p2).map_ok(move |v| tx.send(v).unwrap()));
94 assert!(rx.try_recv().is_err());
95 c1.send(1).unwrap();
96 let (v, p2) = rx.recv().unwrap().into_inner();
97 assert_eq!(v, 1);
98 assert!(rx.recv().is_err());
99
100 let (tx, rx) = mpsc::channel();
101 run(p2.map_ok(move |v| tx.send(v).unwrap()));
102 c2.send(2).unwrap();
103 assert_eq!(rx.recv(), Ok(2));
104 assert!(rx.recv().is_err());
105
106 std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
107}
108
109#[test]
110fn select2() {
111 let (c1, p1) = oneshot::channel::<i32>();
112 let (c2, p2) = oneshot::channel::<i32>();
113 let (tx, rx) = mpsc::channel();
114 run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
115 assert!(rx.try_recv().is_err());
116 drop(c1);
117 let (v, p2) = rx.recv().unwrap();
118 assert_eq!(v, 1);
119 assert!(rx.recv().is_err());
120
121 let (tx, rx) = mpsc::channel();
122 run(p2.map_ok(move |v| tx.send(v).unwrap()));
123 c2.send(2).unwrap();
124 assert_eq!(rx.recv(), Ok(2));
125 assert!(rx.recv().is_err());
126
127 std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
128}
129
130#[test]
131fn select3() {
132 let (c1, p1) = oneshot::channel::<i32>();
133 let (c2, p2) = oneshot::channel::<i32>();
134 let (tx, rx) = mpsc::channel();
135 run(future::try_select(p1, p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()));
136 assert!(rx.try_recv().is_err());
137 drop(c1);
138 let (v, p2) = rx.recv().unwrap();
139 assert_eq!(v, 1);
140 assert!(rx.recv().is_err());
141
142 let (tx, rx) = mpsc::channel();
143 run(p2.map_err(move |_v| tx.send(2).unwrap()));
144 drop(c2);
145 assert_eq!(rx.recv(), Ok(2));
146 assert!(rx.recv().is_err());
147
148 std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
149}
150
151#[test]
152fn select4() {
153 const N: usize = if cfg!(miri) { 100 } else { 10000 };
154
155 let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>();
156
157 let t = thread::spawn(move || {
158 for c in rx {
159 c.send(1).unwrap();
160 }
161 });
162
163 let (tx2, rx2) = mpsc::channel();
164 for _ in 0..N {
165 let (c1, p1) = oneshot::channel::<i32>();
166 let (c2, p2) = oneshot::channel::<i32>();
167
168 let tx3 = tx2.clone();
169 run(future::try_select(p1, p2).map_ok(move |_| tx3.send(()).unwrap()));
170 tx.send(c1).unwrap();
171 rx2.recv().unwrap();
172 drop(c2);
173 }
174 drop(tx);
175
176 t.join().unwrap();
177
178 std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
179}
180