1#![warn(rust_2018_idioms)]
2#![cfg(feature = "sync")]
3
4#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
5use wasm_bindgen_test::wasm_bindgen_test as test;
6#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
7use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
8
9#[cfg(not(all(target_family = "wasm", not(target_os = "wasi"))))]
10use tokio::test as maybe_tokio_test;
11
12use tokio::sync::oneshot;
13use tokio::sync::oneshot::error::TryRecvError;
14use tokio_test::*;
15
16use std::future::Future;
17use std::pin::Pin;
18use std::task::{Context, Poll};
19
20trait AssertSend: Send {}
21impl AssertSend for oneshot::Sender<i32> {}
22impl AssertSend for oneshot::Receiver<i32> {}
23
24trait SenderExt {
25 fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>;
26}
27impl<T> SenderExt for oneshot::Sender<T> {
28 fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
29 tokio::pin! {
30 let fut = self.closed();
31 }
32 fut.poll(cx)
33 }
34}
35
/// A receiver that polled before the send is woken by the send and then
/// yields the sent value.
#[test]
fn send_recv() {
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    // Nothing has been sent yet, so the first poll must not complete.
    assert_pending!(receiver.poll());

    assert_ok!(sender.send(1));

    // The send has to notify the task that polled above.
    assert!(receiver.is_woken());

    let received = assert_ready_ok!(receiver.poll());
    assert_eq!(received, 1);
}
50
51#[maybe_tokio_test]
52async fn async_send_recv() {
53 let (tx, rx) = oneshot::channel();
54
55 assert_ok!(tx.send(1));
56 assert_eq!(1, assert_ok!(rx.await));
57}
58
/// Dropping the sender without sending wakes a pending receiver, whose next
/// poll completes with an error.
#[test]
fn close_tx() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    assert_pending!(receiver.poll());

    // Closing the sending half is done simply by dropping it.
    drop(sender);

    assert!(receiver.is_woken());
    assert_ready_err!(receiver.poll());
}
71
/// Dropping the receiver makes `send` fail, both when the sender never polled
/// for closure and when it was waiting on `poll_closed`.
#[test]
fn close_rx() {
    // Case 1: the receiver is gone before the sender does anything at all.
    let (sender, receiver) = oneshot::channel();
    drop(receiver);

    assert_err!(sender.send(1));

    // Case 2: the sender first registers interest through `poll_closed`.
    let (sender, receiver) = oneshot::channel();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    drop(receiver);

    // The drop must wake the sender task and be visible as a closed channel.
    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    assert_err!(sender.into_inner().send(1));
}
95
/// `Sender::closed` resolves once a spawned task has dropped the receiver.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_rx_closed() {
    let (mut sender, receiver) = oneshot::channel::<()>();

    // The receiver is moved into, and dropped by, a separate task.
    tokio::spawn(async move { drop(receiver) });

    sender.closed().await;
}
107
/// Exercises `Receiver::close` followed by polling the receiver, in three
/// scenarios: value already sent, send attempted after close, and no send.
#[test]
fn explicit_close_poll() {
    // Scenario 1: the value was sent before the close, so it is still
    // delivered by the next poll.
    let (sender, receiver) = oneshot::channel();
    let mut receiver = task::spawn(receiver);

    assert_ok!(sender.send(1));

    receiver.close();

    let delivered = assert_ready_ok!(receiver.poll());
    assert_eq!(delivered, 1);

    // Scenario 2: the receiver closes while the sender waits for closure;
    // a send attempted afterwards is rejected and the receiver sees an error.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    assert_err!(sender.into_inner().send(1));
    assert_ready_err!(receiver.poll());

    // Scenario 3: same as scenario 2, except the sender never attempts to
    // send before the receiver is polled.
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);
    let mut receiver = task::spawn(receiver);

    assert_pending!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    assert_ready_err!(receiver.poll());
}
152
/// Exercises `Receiver::close` followed by `try_recv`, with and without a
/// value having been sent beforehand.
#[test]
fn explicit_close_try_recv() {
    // Scenario 1: a value sent before the close is still returned.
    let (sender, mut receiver) = oneshot::channel();

    assert_ok!(sender.send(1));

    receiver.close();

    let delivered = assert_ok!(receiver.try_recv());
    assert_eq!(1, delivered);

    // Scenario 2: nothing was sent; the waiting sender is woken by the close
    // and `try_recv` reports an error.
    let (sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender = task::spawn(sender);

    assert_pending!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    receiver.close();

    assert!(sender.is_woken());
    assert!(sender.is_closed());
    assert_ready!(sender.enter(|cx, mut inner| inner.poll_closed(cx)));

    assert_err!(receiver.try_recv());
}
179
/// Closes the receiver, observes an error from `try_recv`, then polls it.
/// The test as a whole is required to panic (`#[should_panic]`); presumably
/// the panic comes from the final poll.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn close_try_recv_poll() {
    // The sender is kept alive for the whole test; only the receiver closes.
    let (_sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    receiver.close();

    assert_err!(receiver.try_recv());

    // The outcome of this poll is deliberately discarded.
    let _ = receiver.poll();
}
193
/// Calling `close` after the value has already been taken with `try_recv`
/// must complete without panicking.
#[test]
fn close_after_recv() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let received = receiver.try_recv().unwrap();
    assert_eq!(17, received);

    receiver.close();
}
203
/// Once `try_recv` has returned the value, a second `try_recv` reports
/// `TryRecvError::Closed`, and a subsequent `close` is still allowed.
#[test]
fn try_recv_after_completion() {
    let (sender, mut receiver) = oneshot::channel::<i32>();

    sender.send(17).unwrap();

    let first = receiver.try_recv().unwrap();
    assert_eq!(17, first);

    let second = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Closed), second);

    receiver.close();
}
214
/// Once polling the receiver has yielded the value, `try_recv` reports
/// `TryRecvError::Closed`, and a subsequent `close` is still allowed.
#[test]
fn try_recv_after_completion_await() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let mut receiver = task::spawn(receiver);

    sender.send(17).unwrap();

    let polled = assert_ready!(receiver.poll());
    assert_eq!(Ok(17), polled);

    let afterwards = receiver.try_recv();
    assert_eq!(Err(TryRecvError::Closed), afterwards);

    receiver.close();
}
226
/// After both channel halves have polled with a mock task's waker and are
/// then dropped, each mock task reports a waker reference count of 1.
#[test]
fn drops_tasks() {
    let (mut sender, mut receiver) = oneshot::channel::<i32>();
    let mut sender_task = task::spawn(());
    let mut receiver_task = task::spawn(());

    // Each half is polled from its own mock task.
    assert_pending!(sender_task.enter(|cx, _| sender.poll_closed(cx)));
    assert_pending!(receiver_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    drop(sender);
    drop(receiver);

    // Presumably 1 is the reference held by the mock task itself.
    assert_eq!(1, sender_task.waker_ref_count());
    assert_eq!(1, receiver_task.waker_ref_count());
}
242
/// When the receiver is polled from a second task, the waker reference moves
/// from the first task to the second, and only the second is woken by a send.
#[test]
fn receiver_changes_task() {
    let (sender, mut receiver) = oneshot::channel();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // Poll from the first task: its waker count goes up, the other is untouched.
    assert_pending!(first_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Poll again from the second task: the counts swap.
    assert_pending!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    assert_ok!(sender.send(1));

    // Only the task that polled most recently is notified.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready_ok!(second_task.enter(|cx, _| Pin::new(&mut receiver).poll(cx)));
}
267
/// When the sender polls for closure from a second task, the waker reference
/// moves from the first task to the second, and only the second is woken when
/// the receiver is dropped.
#[test]
fn sender_changes_task() {
    let (mut sender, receiver) = oneshot::channel::<i32>();

    let mut first_task = task::spawn(());
    let mut second_task = task::spawn(());

    // Poll from the first task: its waker count goes up, the other is untouched.
    assert_pending!(first_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(2, first_task.waker_ref_count());
    assert_eq!(1, second_task.waker_ref_count());

    // Poll again from the second task: the counts swap.
    assert_pending!(second_task.enter(|cx, _| sender.poll_closed(cx)));

    assert_eq!(1, first_task.waker_ref_count());
    assert_eq!(2, second_task.waker_ref_count());

    drop(receiver);

    // Only the task that polled most recently is notified.
    assert!(!first_task.is_woken());
    assert!(second_task.is_woken());

    assert_ready!(second_task.enter(|cx, _| sender.poll_closed(cx)));
}
292