1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (feature = "sync" )] |
3 | |
4 | #[cfg (all(target_family = "wasm" , not(target_os = "wasi" )))] |
5 | use wasm_bindgen_test::wasm_bindgen_test as test; |
6 | #[cfg (all(target_family = "wasm" , not(target_os = "wasi" )))] |
7 | use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; |
8 | |
9 | #[cfg (not(all(target_family = "wasm" , not(target_os = "wasi" ))))] |
10 | use tokio::test as maybe_tokio_test ; |
11 | |
12 | use tokio::sync::oneshot; |
13 | use tokio::sync::oneshot::error::TryRecvError; |
14 | use tokio_test::*; |
15 | |
16 | use std::future::Future; |
17 | use std::pin::Pin; |
18 | use std::task::{Context, Poll}; |
19 | |
20 | trait AssertSend: Send {} |
21 | impl AssertSend for oneshot::Sender<i32> {} |
22 | impl AssertSend for oneshot::Receiver<i32> {} |
23 | |
24 | trait SenderExt { |
25 | fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>; |
26 | } |
27 | impl<T> SenderExt for oneshot::Sender<T> { |
28 | fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
29 | tokio::pin! { |
30 | let fut = self.closed(); |
31 | } |
32 | fut.poll(cx) |
33 | } |
34 | } |
35 | |
36 | #[test] |
37 | fn send_recv() { |
38 | let (tx, rx) = oneshot::channel(); |
39 | let mut rx = task::spawn(rx); |
40 | |
41 | assert_pending!(rx.poll()); |
42 | |
43 | assert_ok!(tx.send(1)); |
44 | |
45 | assert!(rx.is_woken()); |
46 | |
47 | let val = assert_ready_ok!(rx.poll()); |
48 | assert_eq!(val, 1); |
49 | } |
50 | |
51 | #[maybe_tokio_test ] |
52 | async fn async_send_recv() { |
53 | let (tx, rx) = oneshot::channel(); |
54 | |
55 | assert_ok!(tx.send(1)); |
56 | assert_eq!(1, assert_ok!(rx.await)); |
57 | } |
58 | |
59 | #[test] |
60 | fn close_tx() { |
61 | let (tx, rx) = oneshot::channel::<i32>(); |
62 | let mut rx = task::spawn(rx); |
63 | |
64 | assert_pending!(rx.poll()); |
65 | |
66 | drop(tx); |
67 | |
68 | assert!(rx.is_woken()); |
69 | assert_ready_err!(rx.poll()); |
70 | } |
71 | |
72 | #[test] |
73 | fn close_rx() { |
74 | // First, without checking poll_closed() |
75 | // |
76 | let (tx, _) = oneshot::channel(); |
77 | |
78 | assert_err!(tx.send(1)); |
79 | |
80 | // Second, via poll_closed(); |
81 | |
82 | let (tx, rx) = oneshot::channel(); |
83 | let mut tx = task::spawn(tx); |
84 | |
85 | assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
86 | |
87 | drop(rx); |
88 | |
89 | assert!(tx.is_woken()); |
90 | assert!(tx.is_closed()); |
91 | assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
92 | |
93 | assert_err!(tx.into_inner().send(1)); |
94 | } |
95 | |
96 | #[tokio::test ] |
97 | #[cfg (feature = "full" )] |
98 | async fn async_rx_closed() { |
99 | let (mut tx, rx) = oneshot::channel::<()>(); |
100 | |
101 | tokio::spawn(async move { |
102 | drop(rx); |
103 | }); |
104 | |
105 | tx.closed().await; |
106 | } |
107 | |
108 | #[test] |
109 | fn explicit_close_poll() { |
110 | // First, with message sent |
111 | let (tx, rx) = oneshot::channel(); |
112 | let mut rx = task::spawn(rx); |
113 | |
114 | assert_ok!(tx.send(1)); |
115 | |
116 | rx.close(); |
117 | |
118 | let value = assert_ready_ok!(rx.poll()); |
119 | assert_eq!(value, 1); |
120 | |
121 | // Second, without the message sent |
122 | let (tx, rx) = oneshot::channel::<i32>(); |
123 | let mut tx = task::spawn(tx); |
124 | let mut rx = task::spawn(rx); |
125 | |
126 | assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
127 | |
128 | rx.close(); |
129 | |
130 | assert!(tx.is_woken()); |
131 | assert!(tx.is_closed()); |
132 | assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
133 | |
134 | assert_err!(tx.into_inner().send(1)); |
135 | assert_ready_err!(rx.poll()); |
136 | |
137 | // Again, but without sending the value this time |
138 | let (tx, rx) = oneshot::channel::<i32>(); |
139 | let mut tx = task::spawn(tx); |
140 | let mut rx = task::spawn(rx); |
141 | |
142 | assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
143 | |
144 | rx.close(); |
145 | |
146 | assert!(tx.is_woken()); |
147 | assert!(tx.is_closed()); |
148 | assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
149 | |
150 | assert_ready_err!(rx.poll()); |
151 | } |
152 | |
153 | #[test] |
154 | fn explicit_close_try_recv() { |
155 | // First, with message sent |
156 | let (tx, mut rx) = oneshot::channel(); |
157 | |
158 | assert_ok!(tx.send(1)); |
159 | |
160 | rx.close(); |
161 | |
162 | let val = assert_ok!(rx.try_recv()); |
163 | assert_eq!(1, val); |
164 | |
165 | // Second, without the message sent |
166 | let (tx, mut rx) = oneshot::channel::<i32>(); |
167 | let mut tx = task::spawn(tx); |
168 | |
169 | assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
170 | |
171 | rx.close(); |
172 | |
173 | assert!(tx.is_woken()); |
174 | assert!(tx.is_closed()); |
175 | assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx))); |
176 | |
177 | assert_err!(rx.try_recv()); |
178 | } |
179 | |
180 | #[test] |
181 | #[should_panic ] |
182 | #[cfg (not(target_family = "wasm" ))] // wasm currently doesn't support unwinding |
183 | fn close_try_recv_poll() { |
184 | let (_tx, rx) = oneshot::channel::<i32>(); |
185 | let mut rx = task::spawn(rx); |
186 | |
187 | rx.close(); |
188 | |
189 | assert_err!(rx.try_recv()); |
190 | |
191 | let _ = rx.poll(); |
192 | } |
193 | |
194 | #[test] |
195 | fn close_after_recv() { |
196 | let (tx, mut rx) = oneshot::channel::<i32>(); |
197 | |
198 | tx.send(17).unwrap(); |
199 | |
200 | assert_eq!(17, rx.try_recv().unwrap()); |
201 | rx.close(); |
202 | } |
203 | |
204 | #[test] |
205 | fn try_recv_after_completion() { |
206 | let (tx, mut rx) = oneshot::channel::<i32>(); |
207 | |
208 | tx.send(17).unwrap(); |
209 | |
210 | assert_eq!(17, rx.try_recv().unwrap()); |
211 | assert_eq!(Err(TryRecvError::Closed), rx.try_recv()); |
212 | rx.close(); |
213 | } |
214 | |
215 | #[test] |
216 | fn try_recv_after_completion_await() { |
217 | let (tx, rx) = oneshot::channel::<i32>(); |
218 | let mut rx = task::spawn(rx); |
219 | |
220 | tx.send(17).unwrap(); |
221 | |
222 | assert_eq!(Ok(17), assert_ready!(rx.poll())); |
223 | assert_eq!(Err(TryRecvError::Closed), rx.try_recv()); |
224 | rx.close(); |
225 | } |
226 | |
227 | #[test] |
228 | fn drops_tasks() { |
229 | let (mut tx, mut rx) = oneshot::channel::<i32>(); |
230 | let mut tx_task = task::spawn(()); |
231 | let mut rx_task = task::spawn(()); |
232 | |
233 | assert_pending!(tx_task.enter(|cx, _| tx.poll_closed(cx))); |
234 | assert_pending!(rx_task.enter(|cx, _| Pin::new(&mut rx).poll(cx))); |
235 | |
236 | drop(tx); |
237 | drop(rx); |
238 | |
239 | assert_eq!(1, tx_task.waker_ref_count()); |
240 | assert_eq!(1, rx_task.waker_ref_count()); |
241 | } |
242 | |
243 | #[test] |
244 | fn receiver_changes_task() { |
245 | let (tx, mut rx) = oneshot::channel(); |
246 | |
247 | let mut task1 = task::spawn(()); |
248 | let mut task2 = task::spawn(()); |
249 | |
250 | assert_pending!(task1.enter(|cx, _| Pin::new(&mut rx).poll(cx))); |
251 | |
252 | assert_eq!(2, task1.waker_ref_count()); |
253 | assert_eq!(1, task2.waker_ref_count()); |
254 | |
255 | assert_pending!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx))); |
256 | |
257 | assert_eq!(1, task1.waker_ref_count()); |
258 | assert_eq!(2, task2.waker_ref_count()); |
259 | |
260 | assert_ok!(tx.send(1)); |
261 | |
262 | assert!(!task1.is_woken()); |
263 | assert!(task2.is_woken()); |
264 | |
265 | assert_ready_ok!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx))); |
266 | } |
267 | |
268 | #[test] |
269 | fn sender_changes_task() { |
270 | let (mut tx, rx) = oneshot::channel::<i32>(); |
271 | |
272 | let mut task1 = task::spawn(()); |
273 | let mut task2 = task::spawn(()); |
274 | |
275 | assert_pending!(task1.enter(|cx, _| tx.poll_closed(cx))); |
276 | |
277 | assert_eq!(2, task1.waker_ref_count()); |
278 | assert_eq!(1, task2.waker_ref_count()); |
279 | |
280 | assert_pending!(task2.enter(|cx, _| tx.poll_closed(cx))); |
281 | |
282 | assert_eq!(1, task1.waker_ref_count()); |
283 | assert_eq!(2, task2.waker_ref_count()); |
284 | |
285 | drop(rx); |
286 | |
287 | assert!(!task1.is_woken()); |
288 | assert!(task2.is_woken()); |
289 | |
290 | assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx))); |
291 | } |
292 | |