1use futures::channel::oneshot::{self, Sender};
2use futures::executor::block_on;
3use futures::future::{poll_fn, FutureExt};
4use futures::task::{Context, Poll};
5use futures_test::task::panic_waker_ref;
6use std::sync::mpsc;
7use std::thread;
8
/// `poll_canceled` reports `Pending` while the receiver is alive and `Ready`
/// once it has been dropped; polling twice in each state gives the same answer.
#[test]
fn smoke_poll() {
    let (mut sender, receiver) = oneshot::channel::<u32>();
    // Kept in an `Option` so the `FnMut` closure can drop it from inside.
    let mut receiver_slot = Some(receiver);

    block_on(poll_fn(|cx| {
        // Receiver still alive: cancellation has not happened yet.
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_pending());
        }

        drop(receiver_slot.take());

        // Receiver gone: cancellation is observed on every subsequent poll.
        for _ in 0..2 {
            assert!(sender.poll_canceled(cx).is_ready());
        }

        Poll::Ready(())
    }));
}
24
/// A thread blocked on `Sender::cancellation` finishes once the receiver is
/// dropped on the main thread.
#[test]
fn cancel_notifies() {
    let (mut sender, receiver) = oneshot::channel::<u32>();

    // The worker blocks until the sender observes cancellation.
    let waiter = thread::spawn(move || {
        let canceled = sender.cancellation();
        block_on(canceled);
    });

    drop(receiver);
    waiter.join().unwrap();
}
35
/// Repeats the cancel-notification round trip many times: each oneshot sender
/// is handed to a worker thread, the receiver is dropped, and the worker
/// acknowledges once it has observed the cancellation.
#[test]
fn cancel_lots() {
    // Fewer rounds under Miri.
    const ROUNDS: usize = if cfg!(miri) { 100 } else { 20000 };

    let (job_tx, job_rx) = mpsc::channel::<(Sender<u32>, mpsc::Sender<()>)>();

    let worker = thread::spawn(move || {
        // Runs until `job_tx` is dropped and the job queue is drained.
        while let Ok((mut pending, ack)) = job_rx.recv() {
            block_on(pending.cancellation());
            ack.send(()).unwrap();
        }
    });

    for _ in 0..ROUNDS {
        let (oneshot_tx, oneshot_rx) = oneshot::channel::<u32>();
        let (ack_tx, ack_rx) = mpsc::channel();

        job_tx.send((oneshot_tx, ack_tx)).unwrap();
        drop(oneshot_rx);

        // Wait for the worker to confirm it saw the cancellation.
        ack_rx.recv().unwrap();
    }

    // Closing the job queue lets the worker loop end.
    drop(job_tx);
    worker.join().unwrap();
}
59
/// Dropping the receiver after the sender is already gone must not wake the
/// task that the sender registered earlier.
#[test]
fn cancel_after_sender_drop_doesnt_notify() {
    // The context is built from `panic_waker_ref()`, so any wake-up would
    // fail the test by panicking.
    let mut panicking_cx = Context::from_waker(panic_waker_ref());
    let (mut sender, receiver) = oneshot::channel::<u32>();

    // Register interest in cancellation using that waker.
    assert_eq!(sender.poll_canceled(&mut panicking_cx), Poll::Pending);

    // Order matters: the sender goes first, then the receiver.
    drop(sender);
    drop(receiver);
}
68
/// After `Receiver::close`, polling the receiver yields an error and the
/// sender sees the channel as canceled.
#[test]
fn close() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    receiver.close();

    let check = poll_fn(|cx| {
        // Anything other than a ready error is a failure.
        if !matches!(receiver.poll_unpin(cx), Poll::Ready(Err(_))) {
            panic!();
        }
        assert!(sender.poll_canceled(cx).is_ready());
        Poll::Ready(())
    });

    block_on(check);
}
82
/// Closing the receiver from another thread unblocks a sender waiting on
/// `cancellation`, even while the receiver itself is still alive.
#[test]
fn close_wakes() {
    let (mut sender, mut receiver) = oneshot::channel::<u32>();
    let (release_tx, release_rx) = mpsc::channel::<()>();

    let closer = thread::spawn(move || {
        receiver.close();
        // The closure still owns `receiver` while it waits here, so the
        // receiver is not dropped until the main thread sends the release.
        release_rx.recv().unwrap();
    });

    block_on(sender.cancellation());

    release_tx.send(()).unwrap();
    closer.join().unwrap();
}
95
/// `Sender::is_canceled` is false while the receiver exists and true once it
/// has been dropped.
#[test]
fn is_canceled() {
    let (sender, receiver) = oneshot::channel::<u32>();

    let canceled_before_drop = sender.is_canceled();
    assert!(!canceled_before_drop);

    drop(receiver);

    let canceled_after_drop = sender.is_canceled();
    assert!(canceled_after_drop);
}
103
/// Runs `Sender::send` on a worker thread concurrently with `Receiver::close`
/// on the main thread, many times over. The outcome of each send and each
/// receive is deliberately ignored; the test only requires that every round
/// completes.
#[test]
fn cancel_sends() {
    // Fewer rounds under Miri.
    const ROUNDS: usize = if cfg!(miri) { 100 } else { 20000 };

    let (handoff_tx, handoff_rx) = mpsc::channel::<Sender<u32>>();

    let worker = thread::spawn(move || {
        handoff_rx.into_iter().for_each(|oneshot_tx| {
            // The send may fail if the receiver was closed first.
            let _ = oneshot_tx.send(42);
        });
    });

    for _ in 0..ROUNDS {
        let (oneshot_tx, mut oneshot_rx) = oneshot::channel::<u32>();
        handoff_tx.send(oneshot_tx).unwrap();

        oneshot_rx.close();
        // Either a value or a cancellation error is acceptable here.
        let _ = block_on(oneshot_rx);
    }

    // Closing the hand-off queue ends the worker's iteration.
    drop(handoff_tx);
    worker.join().unwrap();
}
126
127// #[test]
128// fn spawn_sends_items() {
129// let core = local_executor::Core::new();
130// let future = ok::<_, ()>(1);
131// let rx = spawn(future, &core);
132// assert_eq!(core.run(rx).unwrap(), 1);
133// }
134//
135// #[test]
136// fn spawn_kill_dead_stream() {
137// use std::thread;
138// use std::time::Duration;
139// use futures::future::Either;
140// use futures::sync::oneshot;
141//
142// // a future which never returns anything (forever accepting incoming
143// // connections), but dropping it leads to observable side effects
144// // (like closing listening sockets, releasing limited resources,
145// // ...)
146// #[derive(Debug)]
147// struct Dead {
148// // when dropped you should get Err(oneshot::Canceled) on the
149// // receiving end
150// done: oneshot::Sender<()>,
151// }
152// impl Future for Dead {
153// type Item = ();
154// type Error = ();
155//
156// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
157// Ok(Poll::Pending)
158// }
159// }
160//
161// // need to implement a timeout for the test, as it would hang
162// // forever right now
163// let (timeout_tx, timeout_rx) = oneshot::channel();
164// thread::spawn(move || {
165// thread::sleep(Duration::from_millis(1000));
166// let _ = timeout_tx.send(());
167// });
168//
169// let core = local_executor::Core::new();
170// let (done_tx, done_rx) = oneshot::channel();
171// let future = Dead{done: done_tx};
172// let rx = spawn(future, &core);
173// let res = core.run(
174// Ok::<_, ()>(())
175// .into_future()
176// .then(move |_| {
177// // now drop the spawned future: maybe some timeout exceeded,
178// // or some connection on this end was closed by the remote
179// // end.
180// drop(rx);
181// // and wait for the spawned future to release its resources
182// done_rx
183// })
184// .select2(timeout_rx)
185// );
186// match res {
187// Err(Either::A((oneshot::Canceled, _))) => (),
188// Ok(Either::B(((), _))) => {
189// panic!("dead future wasn't canceled (timeout)");
190// },
191// _ => {
192// panic!("dead future wasn't canceled (unexpected result)");
193// },
194// }
195// }
196//
197// #[test]
198// fn spawn_dont_kill_forgot_dead_stream() {
199// use std::thread;
200// use std::time::Duration;
201// use futures::future::Either;
202// use futures::sync::oneshot;
203//
204// // a future which never returns anything (forever accepting incoming
205// // connections), but dropping it leads to observable side effects
206// // (like closing listening sockets, releasing limited resources,
207// // ...)
208// #[derive(Debug)]
209// struct Dead {
210// // when dropped you should get Err(oneshot::Canceled) on the
211// // receiving end
212// done: oneshot::Sender<()>,
213// }
214// impl Future for Dead {
215// type Item = ();
216// type Error = ();
217//
218// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
219// Ok(Poll::Pending)
220// }
221// }
222//
223// // need to implement a timeout for the test, as it would hang
224// // forever right now
225// let (timeout_tx, timeout_rx) = oneshot::channel();
226// thread::spawn(move || {
227// thread::sleep(Duration::from_millis(1000));
228// let _ = timeout_tx.send(());
229// });
230//
231// let core = local_executor::Core::new();
232// let (done_tx, done_rx) = oneshot::channel();
233// let future = Dead{done: done_tx};
234// let rx = spawn(future, &core);
235// let res = core.run(
236// Ok::<_, ()>(())
237// .into_future()
238// .then(move |_| {
239// // forget the spawned future: should keep running, i.e. hit
240// // the timeout below.
241// rx.forget();
242// // and wait for the spawned future to release its resources
243// done_rx
244// })
245// .select2(timeout_rx)
246// );
247// match res {
248// Err(Either::A((oneshot::Canceled, _))) => {
249// panic!("forgotten dead future was canceled");
250// },
251// Ok(Either::B(((), _))) => (), // reached timeout
252// _ => {
253// panic!("forgotten dead future was canceled (unexpected result)");
254// },
255// }
256// }
257