1 | use futures::future::poll_fn; |
2 | use tokio::sync::mpsc::channel; |
3 | use tokio_test::task::spawn; |
4 | use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok}; |
5 | use tokio_util::sync::PollSender; |
6 | |
7 | #[tokio::test ] |
8 | async fn simple() { |
9 | let (send, mut recv) = channel(3); |
10 | let mut send = PollSender::new(send); |
11 | |
12 | for i in 1..=3i32 { |
13 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
14 | assert_ready_ok!(reserve.poll()); |
15 | send.send_item(i).unwrap(); |
16 | } |
17 | |
18 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
19 | assert_pending!(reserve.poll()); |
20 | |
21 | assert_eq!(recv.recv().await.unwrap(), 1); |
22 | assert!(reserve.is_woken()); |
23 | assert_ready_ok!(reserve.poll()); |
24 | |
25 | drop(recv); |
26 | |
27 | send.send_item(42).unwrap(); |
28 | } |
29 | |
30 | #[tokio::test ] |
31 | async fn simple_ref() { |
32 | let v = vec![1, 2, 3i32]; |
33 | |
34 | let (send, mut recv) = channel(3); |
35 | let mut send = PollSender::new(send); |
36 | |
37 | for vi in v.iter() { |
38 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
39 | assert_ready_ok!(reserve.poll()); |
40 | send.send_item(vi).unwrap(); |
41 | } |
42 | |
43 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
44 | assert_pending!(reserve.poll()); |
45 | |
46 | assert_eq!(*recv.recv().await.unwrap(), 1); |
47 | assert!(reserve.is_woken()); |
48 | assert_ready_ok!(reserve.poll()); |
49 | drop(recv); |
50 | send.send_item(&42).unwrap(); |
51 | } |
52 | |
53 | #[tokio::test ] |
54 | async fn repeated_poll_reserve() { |
55 | let (send, mut recv) = channel::<i32>(1); |
56 | let mut send = PollSender::new(send); |
57 | |
58 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
59 | assert_ready_ok!(reserve.poll()); |
60 | assert_ready_ok!(reserve.poll()); |
61 | send.send_item(1).unwrap(); |
62 | |
63 | assert_eq!(recv.recv().await.unwrap(), 1); |
64 | } |
65 | |
66 | #[tokio::test ] |
67 | async fn abort_send() { |
68 | let (send, mut recv) = channel(3); |
69 | let mut send = PollSender::new(send); |
70 | let send2 = send.get_ref().cloned().unwrap(); |
71 | |
72 | for i in 1..=3i32 { |
73 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
74 | assert_ready_ok!(reserve.poll()); |
75 | send.send_item(i).unwrap(); |
76 | } |
77 | |
78 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
79 | assert_pending!(reserve.poll()); |
80 | assert_eq!(recv.recv().await.unwrap(), 1); |
81 | assert!(reserve.is_woken()); |
82 | assert_ready_ok!(reserve.poll()); |
83 | |
84 | let mut send2_send = spawn(send2.send(5)); |
85 | assert_pending!(send2_send.poll()); |
86 | assert!(send.abort_send()); |
87 | assert!(send2_send.is_woken()); |
88 | assert_ready_ok!(send2_send.poll()); |
89 | |
90 | assert_eq!(recv.recv().await.unwrap(), 2); |
91 | assert_eq!(recv.recv().await.unwrap(), 3); |
92 | assert_eq!(recv.recv().await.unwrap(), 5); |
93 | } |
94 | |
95 | #[tokio::test ] |
96 | async fn close_sender_last() { |
97 | let (send, mut recv) = channel::<i32>(3); |
98 | let mut send = PollSender::new(send); |
99 | |
100 | let mut recv_task = spawn(recv.recv()); |
101 | assert_pending!(recv_task.poll()); |
102 | |
103 | send.close(); |
104 | |
105 | assert!(recv_task.is_woken()); |
106 | assert!(assert_ready!(recv_task.poll()).is_none()); |
107 | } |
108 | |
109 | #[tokio::test ] |
110 | async fn close_sender_not_last() { |
111 | let (send, mut recv) = channel::<i32>(3); |
112 | let mut send = PollSender::new(send); |
113 | let send2 = send.get_ref().cloned().unwrap(); |
114 | |
115 | let mut recv_task = spawn(recv.recv()); |
116 | assert_pending!(recv_task.poll()); |
117 | |
118 | send.close(); |
119 | |
120 | assert!(!recv_task.is_woken()); |
121 | assert_pending!(recv_task.poll()); |
122 | |
123 | drop(send2); |
124 | |
125 | assert!(recv_task.is_woken()); |
126 | assert!(assert_ready!(recv_task.poll()).is_none()); |
127 | } |
128 | |
129 | #[tokio::test ] |
130 | async fn close_sender_before_reserve() { |
131 | let (send, mut recv) = channel::<i32>(3); |
132 | let mut send = PollSender::new(send); |
133 | |
134 | let mut recv_task = spawn(recv.recv()); |
135 | assert_pending!(recv_task.poll()); |
136 | |
137 | send.close(); |
138 | |
139 | assert!(recv_task.is_woken()); |
140 | assert!(assert_ready!(recv_task.poll()).is_none()); |
141 | |
142 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
143 | assert_ready_err!(reserve.poll()); |
144 | } |
145 | |
146 | #[tokio::test ] |
147 | async fn close_sender_after_pending_reserve() { |
148 | let (send, mut recv) = channel::<i32>(1); |
149 | let mut send = PollSender::new(send); |
150 | |
151 | let mut recv_task = spawn(recv.recv()); |
152 | assert_pending!(recv_task.poll()); |
153 | |
154 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
155 | assert_ready_ok!(reserve.poll()); |
156 | send.send_item(1).unwrap(); |
157 | |
158 | assert!(recv_task.is_woken()); |
159 | |
160 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
161 | assert_pending!(reserve.poll()); |
162 | drop(reserve); |
163 | |
164 | send.close(); |
165 | |
166 | assert!(send.is_closed()); |
167 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
168 | assert_ready_err!(reserve.poll()); |
169 | } |
170 | |
171 | #[tokio::test ] |
172 | async fn close_sender_after_successful_reserve() { |
173 | let (send, mut recv) = channel::<i32>(3); |
174 | let mut send = PollSender::new(send); |
175 | |
176 | let mut recv_task = spawn(recv.recv()); |
177 | assert_pending!(recv_task.poll()); |
178 | |
179 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
180 | assert_ready_ok!(reserve.poll()); |
181 | drop(reserve); |
182 | |
183 | send.close(); |
184 | assert!(send.is_closed()); |
185 | assert!(!recv_task.is_woken()); |
186 | assert_pending!(recv_task.poll()); |
187 | |
188 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
189 | assert_ready_ok!(reserve.poll()); |
190 | } |
191 | |
192 | #[tokio::test ] |
193 | async fn abort_send_after_pending_reserve() { |
194 | let (send, mut recv) = channel::<i32>(1); |
195 | let mut send = PollSender::new(send); |
196 | |
197 | let mut recv_task = spawn(recv.recv()); |
198 | assert_pending!(recv_task.poll()); |
199 | |
200 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
201 | assert_ready_ok!(reserve.poll()); |
202 | send.send_item(1).unwrap(); |
203 | |
204 | assert_eq!(send.get_ref().unwrap().capacity(), 0); |
205 | assert!(!send.abort_send()); |
206 | |
207 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
208 | assert_pending!(reserve.poll()); |
209 | |
210 | assert!(send.abort_send()); |
211 | assert_eq!(send.get_ref().unwrap().capacity(), 0); |
212 | } |
213 | |
214 | #[tokio::test ] |
215 | async fn abort_send_after_successful_reserve() { |
216 | let (send, mut recv) = channel::<i32>(1); |
217 | let mut send = PollSender::new(send); |
218 | |
219 | let mut recv_task = spawn(recv.recv()); |
220 | assert_pending!(recv_task.poll()); |
221 | |
222 | assert_eq!(send.get_ref().unwrap().capacity(), 1); |
223 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
224 | assert_ready_ok!(reserve.poll()); |
225 | assert_eq!(send.get_ref().unwrap().capacity(), 0); |
226 | |
227 | assert!(send.abort_send()); |
228 | assert_eq!(send.get_ref().unwrap().capacity(), 1); |
229 | } |
230 | |
231 | #[tokio::test ] |
232 | async fn closed_when_receiver_drops() { |
233 | let (send, _) = channel::<i32>(1); |
234 | let mut send = PollSender::new(send); |
235 | |
236 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
237 | assert_ready_err!(reserve.poll()); |
238 | } |
239 | |
240 | #[should_panic ] |
241 | #[test] |
242 | fn start_send_panics_when_idle() { |
243 | let (send, _) = channel::<i32>(3); |
244 | let mut send = PollSender::new(send); |
245 | |
246 | send.send_item(1).unwrap(); |
247 | } |
248 | |
249 | #[should_panic ] |
250 | #[test] |
251 | fn start_send_panics_when_acquiring() { |
252 | let (send, _) = channel::<i32>(1); |
253 | let mut send = PollSender::new(send); |
254 | |
255 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
256 | assert_ready_ok!(reserve.poll()); |
257 | send.send_item(1).unwrap(); |
258 | |
259 | let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx))); |
260 | assert_pending!(reserve.poll()); |
261 | send.send_item(2).unwrap(); |
262 | } |
263 | |