1 | use futures::channel::oneshot::{self, Sender}; |
2 | use futures::executor::block_on; |
3 | use futures::future::{poll_fn, FutureExt}; |
4 | use futures::task::{Context, Poll}; |
5 | use futures_test::task::panic_waker_ref; |
6 | use std::sync::mpsc; |
7 | use std::thread; |
8 | |
9 | #[test] |
10 | fn smoke_poll() { |
11 | let (mut tx, rx) = oneshot::channel::<u32>(); |
12 | let mut rx = Some(rx); |
13 | let f = poll_fn(|cx| { |
14 | assert!(tx.poll_canceled(cx).is_pending()); |
15 | assert!(tx.poll_canceled(cx).is_pending()); |
16 | drop(rx.take()); |
17 | assert!(tx.poll_canceled(cx).is_ready()); |
18 | assert!(tx.poll_canceled(cx).is_ready()); |
19 | Poll::Ready(()) |
20 | }); |
21 | |
22 | block_on(f); |
23 | } |
24 | |
25 | #[test] |
26 | fn cancel_notifies() { |
27 | let (mut tx, rx) = oneshot::channel::<u32>(); |
28 | |
29 | let t = thread::spawn(move || { |
30 | block_on(tx.cancellation()); |
31 | }); |
32 | drop(rx); |
33 | t.join().unwrap(); |
34 | } |
35 | |
36 | #[test] |
37 | fn cancel_lots() { |
38 | const N: usize = if cfg!(miri) { 100 } else { 20000 }; |
39 | |
40 | let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>(); |
41 | let t = thread::spawn(move || { |
42 | for (mut tx, tx2) in rx { |
43 | block_on(tx.cancellation()); |
44 | tx2.send(()).unwrap(); |
45 | } |
46 | }); |
47 | |
48 | for _ in 0..N { |
49 | let (otx, orx) = oneshot::channel::<u32>(); |
50 | let (tx2, rx2) = mpsc::channel(); |
51 | tx.send((otx, tx2)).unwrap(); |
52 | drop(orx); |
53 | rx2.recv().unwrap(); |
54 | } |
55 | drop(tx); |
56 | |
57 | t.join().unwrap(); |
58 | } |
59 | |
60 | #[test] |
61 | fn cancel_after_sender_drop_doesnt_notify() { |
62 | let (mut tx, rx) = oneshot::channel::<u32>(); |
63 | let mut cx = Context::from_waker(panic_waker_ref()); |
64 | assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending); |
65 | drop(tx); |
66 | drop(rx); |
67 | } |
68 | |
69 | #[test] |
70 | fn close() { |
71 | let (mut tx, mut rx) = oneshot::channel::<u32>(); |
72 | rx.close(); |
73 | block_on(poll_fn(|cx| { |
74 | match rx.poll_unpin(cx) { |
75 | Poll::Ready(Err(_)) => {} |
76 | _ => panic!(), |
77 | }; |
78 | assert!(tx.poll_canceled(cx).is_ready()); |
79 | Poll::Ready(()) |
80 | })); |
81 | } |
82 | |
83 | #[test] |
84 | fn close_wakes() { |
85 | let (mut tx, mut rx) = oneshot::channel::<u32>(); |
86 | let (tx2, rx2) = mpsc::channel(); |
87 | let t = thread::spawn(move || { |
88 | rx.close(); |
89 | rx2.recv().unwrap(); |
90 | }); |
91 | block_on(tx.cancellation()); |
92 | tx2.send(()).unwrap(); |
93 | t.join().unwrap(); |
94 | } |
95 | |
96 | #[test] |
97 | fn is_canceled() { |
98 | let (tx, rx) = oneshot::channel::<u32>(); |
99 | assert!(!tx.is_canceled()); |
100 | drop(rx); |
101 | assert!(tx.is_canceled()); |
102 | } |
103 | |
104 | #[test] |
105 | fn cancel_sends() { |
106 | const N: usize = if cfg!(miri) { 100 } else { 20000 }; |
107 | |
108 | let (tx, rx) = mpsc::channel::<Sender<_>>(); |
109 | let t = thread::spawn(move || { |
110 | for otx in rx { |
111 | let _ = otx.send(42); |
112 | } |
113 | }); |
114 | |
115 | for _ in 0..N { |
116 | let (otx, mut orx) = oneshot::channel::<u32>(); |
117 | tx.send(otx).unwrap(); |
118 | |
119 | orx.close(); |
120 | let _ = block_on(orx); |
121 | } |
122 | |
123 | drop(tx); |
124 | t.join().unwrap(); |
125 | } |
126 | |
127 | // #[test] |
128 | // fn spawn_sends_items() { |
129 | // let core = local_executor::Core::new(); |
130 | // let future = ok::<_, ()>(1); |
131 | // let rx = spawn(future, &core); |
132 | // assert_eq!(core.run(rx).unwrap(), 1); |
133 | // } |
134 | // |
135 | // #[test] |
136 | // fn spawn_kill_dead_stream() { |
137 | // use std::thread; |
138 | // use std::time::Duration; |
139 | // use futures::future::Either; |
140 | // use futures::sync::oneshot; |
141 | // |
142 | // // a future which never returns anything (forever accepting incoming |
143 | // // connections), but dropping it leads to observable side effects |
144 | // // (like closing listening sockets, releasing limited resources, |
145 | // // ...) |
146 | // #[derive(Debug)] |
147 | // struct Dead { |
148 | // // when dropped you should get Err(oneshot::Canceled) on the |
149 | // // receiving end |
150 | // done: oneshot::Sender<()>, |
151 | // } |
152 | // impl Future for Dead { |
153 | // type Item = (); |
154 | // type Error = (); |
155 | // |
156 | // fn poll(&mut self) -> Poll<Self::Item, Self::Error> { |
157 | // Ok(Poll::Pending) |
158 | // } |
159 | // } |
160 | // |
161 | // // need to implement a timeout for the test, as it would hang |
162 | // // forever right now |
163 | // let (timeout_tx, timeout_rx) = oneshot::channel(); |
164 | // thread::spawn(move || { |
165 | // thread::sleep(Duration::from_millis(1000)); |
166 | // let _ = timeout_tx.send(()); |
167 | // }); |
168 | // |
169 | // let core = local_executor::Core::new(); |
170 | // let (done_tx, done_rx) = oneshot::channel(); |
171 | // let future = Dead{done: done_tx}; |
172 | // let rx = spawn(future, &core); |
173 | // let res = core.run( |
174 | // Ok::<_, ()>(()) |
175 | // .into_future() |
176 | // .then(move |_| { |
177 | // // now drop the spawned future: maybe some timeout exceeded, |
178 | // // or some connection on this end was closed by the remote |
179 | // // end. |
180 | // drop(rx); |
181 | // // and wait for the spawned future to release its resources |
182 | // done_rx |
183 | // }) |
184 | // .select2(timeout_rx) |
185 | // ); |
186 | // match res { |
187 | // Err(Either::A((oneshot::Canceled, _))) => (), |
188 | // Ok(Either::B(((), _))) => { |
189 | // panic!("dead future wasn't canceled (timeout)"); |
190 | // }, |
191 | // _ => { |
192 | // panic!("dead future wasn't canceled (unexpected result)"); |
193 | // }, |
194 | // } |
195 | // } |
196 | // |
197 | // #[test] |
198 | // fn spawn_dont_kill_forgot_dead_stream() { |
199 | // use std::thread; |
200 | // use std::time::Duration; |
201 | // use futures::future::Either; |
202 | // use futures::sync::oneshot; |
203 | // |
204 | // // a future which never returns anything (forever accepting incoming |
205 | // // connections), but dropping it leads to observable side effects |
206 | // // (like closing listening sockets, releasing limited resources, |
207 | // // ...) |
208 | // #[derive(Debug)] |
209 | // struct Dead { |
210 | // // when dropped you should get Err(oneshot::Canceled) on the |
211 | // // receiving end |
212 | // done: oneshot::Sender<()>, |
213 | // } |
214 | // impl Future for Dead { |
215 | // type Item = (); |
216 | // type Error = (); |
217 | // |
218 | // fn poll(&mut self) -> Poll<Self::Item, Self::Error> { |
219 | // Ok(Poll::Pending) |
220 | // } |
221 | // } |
222 | // |
223 | // // need to implement a timeout for the test, as it would hang |
224 | // // forever right now |
225 | // let (timeout_tx, timeout_rx) = oneshot::channel(); |
226 | // thread::spawn(move || { |
227 | // thread::sleep(Duration::from_millis(1000)); |
228 | // let _ = timeout_tx.send(()); |
229 | // }); |
230 | // |
231 | // let core = local_executor::Core::new(); |
232 | // let (done_tx, done_rx) = oneshot::channel(); |
233 | // let future = Dead{done: done_tx}; |
234 | // let rx = spawn(future, &core); |
235 | // let res = core.run( |
236 | // Ok::<_, ()>(()) |
237 | // .into_future() |
238 | // .then(move |_| { |
239 | // // forget the spawned future: should keep running, i.e. hit |
240 | // // the timeout below. |
241 | // rx.forget(); |
242 | // // and wait for the spawned future to release its resources |
243 | // done_rx |
244 | // }) |
245 | // .select2(timeout_rx) |
246 | // ); |
247 | // match res { |
248 | // Err(Either::A((oneshot::Canceled, _))) => { |
249 | // panic!("forgotten dead future was canceled"); |
250 | // }, |
251 | // Ok(Either::B(((), _))) => (), // reached timeout |
252 | // _ => { |
253 | // panic!("forgotten dead future was canceled (unexpected result)"); |
254 | // }, |
255 | // } |
256 | // } |
257 | |