1 | use futures::channel::oneshot; |
2 | use futures::executor::ThreadPool; |
3 | use futures::future::{self, ok, Future, FutureExt, TryFutureExt}; |
4 | use futures::task::SpawnExt; |
5 | use std::sync::mpsc; |
6 | use std::thread; |
7 | |
8 | fn run<F: Future + Send + 'static>(future: F) { |
9 | let tp = ThreadPool::new().unwrap(); |
10 | tp.spawn(future.map(drop)).unwrap(); |
11 | } |
12 | |
/// `try_join` over two already-completed `ok` futures reports both values as a tuple.
#[test]
fn join1() {
    let (result_tx, result_rx) = mpsc::channel();
    let both_ready = future::try_join(ok::<i32, i32>(1), ok(2));
    run(both_ready.map_ok(move |pair| result_tx.send(pair).unwrap()));

    // Exactly one message is expected; the second `recv` must fail, which for
    // `mpsc` means the sending half has been dropped.
    assert_eq!(result_rx.recv(), Ok((1, 2)));
    assert!(result_rx.recv().is_err());

    // Give the background threads time to shut down:
    // https://github.com/rust-lang/miri/issues/1371
    thread::sleep(std::time::Duration::from_millis(500));
}
22 | |
/// `try_join` over two oneshot receivers reports nothing until both senders have fired.
#[test]
fn join2() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (result_tx, result_rx) = mpsc::channel();
    let joined = future::try_join(first_rx, second_rx);
    run(joined.map_ok(move |pair| result_tx.send(pair).unwrap()));

    // No result before any input, and still none after only the first input.
    assert!(result_rx.try_recv().is_err());
    first_tx.send(1).unwrap();
    assert!(result_rx.try_recv().is_err());

    // The second input completes the join.
    second_tx.send(2).unwrap();
    assert_eq!(result_rx.recv(), Ok((1, 2)));
    assert!(result_rx.recv().is_err());

    // Give the background threads time to shut down:
    // https://github.com/rust-lang/miri/issues/1371
    thread::sleep(std::time::Duration::from_millis(500));
}
38 | |
/// Dropping the first sender makes the joined future take its error path,
/// without the second sender having been touched.
#[test]
fn join3() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (signal_tx, signal_rx) = mpsc::channel();
    let joined = future::try_join(first_rx, second_rx);
    // The error value itself is ignored; a fixed marker `1` is reported instead.
    run(joined.map_err(move |_canceled| signal_tx.send(1).unwrap()));

    assert!(signal_rx.try_recv().is_err());
    drop(first_tx);
    assert_eq!(signal_rx.recv(), Ok(1));
    assert!(signal_rx.recv().is_err());

    // The second sender is kept alive until after the assertions above.
    drop(second_tx);

    // Give the background threads time to shut down:
    // https://github.com/rust-lang/miri/issues/1371
    thread::sleep(std::time::Duration::from_millis(500));
}
53 | |
/// Same scenario as a dropped first sender, but the error value itself is
/// forwarded through the channel and the second sender is dropped before the
/// final disconnect check.
#[test]
fn join4() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (error_tx, error_rx) = mpsc::channel();
    let joined = future::try_join(first_rx, second_rx);
    run(joined.map_err(move |err| error_tx.send(err).unwrap()));

    assert!(error_rx.try_recv().is_err());
    drop(first_tx);
    // Only the arrival of an error is checked, not its value.
    assert!(error_rx.recv().is_ok());
    drop(second_tx);
    assert!(error_rx.recv().is_err());

    // Give the background threads time to shut down:
    // https://github.com/rust-lang/miri/issues/1371
    thread::sleep(std::time::Duration::from_millis(500));
}
68 | |
/// A `try_join` nested inside another `try_join` reports only once all three
/// inputs have arrived, and the result keeps the nesting: `((a, b), c)`.
#[test]
fn join5() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (third_tx, third_rx) = oneshot::channel::<i32>();
    let (result_tx, result_rx) = mpsc::channel();
    let inner = future::try_join(first_rx, second_rx);
    let outer = future::try_join(inner, third_rx);
    run(outer.map_ok(move |nested| result_tx.send(nested).unwrap()));

    // Nothing is reported while any of the three inputs is still missing.
    assert!(result_rx.try_recv().is_err());
    first_tx.send(1).unwrap();
    assert!(result_rx.try_recv().is_err());
    second_tx.send(2).unwrap();
    assert!(result_rx.try_recv().is_err());

    third_tx.send(3).unwrap();
    assert_eq!(result_rx.recv(), Ok(((1, 2), 3)));
    assert!(result_rx.recv().is_err());

    // Give the background threads time to shut down:
    // https://github.com/rust-lang/miri/issues/1371
    thread::sleep(std::time::Duration::from_millis(500));
}
87 | |
/// `try_select` reports the first value to arrive together with the future
/// that has not finished yet; that leftover future can then be driven to
/// completion on its own.
#[test]
fn select1() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (result_tx, result_rx) = mpsc::channel();
    let race = future::try_select(first_rx, second_rx);
    run(race.map_ok(move |winner| result_tx.send(winner).unwrap()));

    assert!(result_rx.try_recv().is_err());
    first_tx.send(1).unwrap();
    let winner = result_rx.recv().unwrap();
    let (value, leftover) = winner.into_inner();
    assert_eq!(value, 1);
    assert!(result_rx.recv().is_err());

    // Second stage: run the leftover receiver and feed it through its sender.
    let (follow_tx, follow_rx) = mpsc::channel();
    run(leftover.map_ok(move |value| follow_tx.send(value).unwrap()));
    second_tx.send(2).unwrap();
    assert_eq!(follow_rx.recv(), Ok(2));
    assert!(follow_rx.recv().is_err());

    // Give the background threads time to shut down:
    // https://github.com/rust-lang/miri/issues/1371
    thread::sleep(std::time::Duration::from_millis(500));
}
108 | |
/// When the first sender is dropped, `try_select` takes its error path and the
/// error carries the other future, which afterwards still completes
/// successfully.
#[test]
fn select2() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (result_tx, result_rx) = mpsc::channel();
    let race = future::try_select(first_rx, second_rx);
    // Report a fixed marker `1` plus the future extracted from the error.
    run(race.map_err(move |failure| {
        let (_canceled, leftover) = failure.into_inner();
        result_tx.send((1, leftover)).unwrap()
    }));

    assert!(result_rx.try_recv().is_err());
    drop(first_tx);
    let (marker, leftover) = result_rx.recv().unwrap();
    assert_eq!(marker, 1);
    assert!(result_rx.recv().is_err());

    // Second stage: the leftover receiver succeeds once its sender fires.
    let (follow_tx, follow_rx) = mpsc::channel();
    run(leftover.map_ok(move |value| follow_tx.send(value).unwrap()));
    second_tx.send(2).unwrap();
    assert_eq!(follow_rx.recv(), Ok(2));
    assert!(follow_rx.recv().is_err());

    // Give the background threads time to shut down:
    // https://github.com/rust-lang/miri/issues/1371
    thread::sleep(std::time::Duration::from_millis(500));
}
129 | |
/// When the first sender is dropped, `try_select` takes its error path; the
/// future recovered from that error then fails as well once its own sender
/// is dropped.
#[test]
fn select3() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    let (result_tx, result_rx) = mpsc::channel();
    let race = future::try_select(first_rx, second_rx);
    // Report a fixed marker `1` plus the future extracted from the error.
    run(race.map_err(move |failure| {
        let (_canceled, leftover) = failure.into_inner();
        result_tx.send((1, leftover)).unwrap()
    }));

    assert!(result_rx.try_recv().is_err());
    drop(first_tx);
    let (marker, leftover) = result_rx.recv().unwrap();
    assert_eq!(marker, 1);
    assert!(result_rx.recv().is_err());

    // Second stage: the leftover receiver errors too; a fixed marker `2` is reported.
    let (follow_tx, follow_rx) = mpsc::channel();
    run(leftover.map_err(move |_canceled| follow_tx.send(2).unwrap()));
    drop(second_tx);
    assert_eq!(follow_rx.recv(), Ok(2));
    assert!(follow_rx.recv().is_err());

    // Give the background threads time to shut down:
    // https://github.com/rust-lang/miri/issues/1371
    thread::sleep(std::time::Duration::from_millis(500));
}
150 | |
/// Repeats a `try_select` race many times while a separate thread completes
/// the first input of each round.
#[test]
fn select4() {
    // Fewer rounds under Miri.
    const ROUNDS: usize = if cfg!(miri) { 100 } else { 10000 };

    let (sender_tx, sender_rx) = mpsc::channel::<oneshot::Sender<i32>>();

    // Helper thread: fire every oneshot sender handed to it until the channel closes.
    let completer = thread::spawn(move || {
        sender_rx
            .into_iter()
            .for_each(|pending| pending.send(1).unwrap());
    });

    let (done_tx, done_rx) = mpsc::channel();
    for _ in 0..ROUNDS {
        let (first_tx, first_rx) = oneshot::channel::<i32>();
        let (second_tx, second_rx) = oneshot::channel::<i32>();

        let round_done = done_tx.clone();
        let race = future::try_select(first_rx, second_rx);
        run(race.map_ok(move |_| round_done.send(()).unwrap()));

        // Hand the first sender to the helper thread and wait for the race to report.
        sender_tx.send(first_tx).unwrap();
        done_rx.recv().unwrap();
        // The second sender is only released after the round has reported.
        drop(second_tx);
    }
    // Closing the channel ends the helper thread's iteration.
    drop(sender_tx);

    completer.join().unwrap();

    // Give the background threads time to shut down:
    // https://github.com/rust-lang/miri/issues/1371
    thread::sleep(std::time::Duration::from_millis(500));
}
180 | |