1use futures::future::poll_fn;
2use tokio::sync::mpsc::channel;
3use tokio_test::task::spawn;
4use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
5use tokio_util::sync::PollSender;
6
7#[tokio::test]
8async fn simple() {
9 let (send, mut recv) = channel(3);
10 let mut send = PollSender::new(send);
11
12 for i in 1..=3i32 {
13 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
14 assert_ready_ok!(reserve.poll());
15 send.send_item(i).unwrap();
16 }
17
18 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
19 assert_pending!(reserve.poll());
20
21 assert_eq!(recv.recv().await.unwrap(), 1);
22 assert!(reserve.is_woken());
23 assert_ready_ok!(reserve.poll());
24
25 drop(recv);
26
27 send.send_item(42).unwrap();
28}
29
30#[tokio::test]
31async fn simple_ref() {
32 let v = vec![1, 2, 3i32];
33
34 let (send, mut recv) = channel(3);
35 let mut send = PollSender::new(send);
36
37 for vi in v.iter() {
38 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
39 assert_ready_ok!(reserve.poll());
40 send.send_item(vi).unwrap();
41 }
42
43 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
44 assert_pending!(reserve.poll());
45
46 assert_eq!(*recv.recv().await.unwrap(), 1);
47 assert!(reserve.is_woken());
48 assert_ready_ok!(reserve.poll());
49 drop(recv);
50 send.send_item(&42).unwrap();
51}
52
53#[tokio::test]
54async fn repeated_poll_reserve() {
55 let (send, mut recv) = channel::<i32>(1);
56 let mut send = PollSender::new(send);
57
58 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
59 assert_ready_ok!(reserve.poll());
60 assert_ready_ok!(reserve.poll());
61 send.send_item(1).unwrap();
62
63 assert_eq!(recv.recv().await.unwrap(), 1);
64}
65
66#[tokio::test]
67async fn abort_send() {
68 let (send, mut recv) = channel(3);
69 let mut send = PollSender::new(send);
70 let send2 = send.get_ref().cloned().unwrap();
71
72 for i in 1..=3i32 {
73 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
74 assert_ready_ok!(reserve.poll());
75 send.send_item(i).unwrap();
76 }
77
78 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
79 assert_pending!(reserve.poll());
80 assert_eq!(recv.recv().await.unwrap(), 1);
81 assert!(reserve.is_woken());
82 assert_ready_ok!(reserve.poll());
83
84 let mut send2_send = spawn(send2.send(5));
85 assert_pending!(send2_send.poll());
86 assert!(send.abort_send());
87 assert!(send2_send.is_woken());
88 assert_ready_ok!(send2_send.poll());
89
90 assert_eq!(recv.recv().await.unwrap(), 2);
91 assert_eq!(recv.recv().await.unwrap(), 3);
92 assert_eq!(recv.recv().await.unwrap(), 5);
93}
94
95#[tokio::test]
96async fn close_sender_last() {
97 let (send, mut recv) = channel::<i32>(3);
98 let mut send = PollSender::new(send);
99
100 let mut recv_task = spawn(recv.recv());
101 assert_pending!(recv_task.poll());
102
103 send.close();
104
105 assert!(recv_task.is_woken());
106 assert!(assert_ready!(recv_task.poll()).is_none());
107}
108
109#[tokio::test]
110async fn close_sender_not_last() {
111 let (send, mut recv) = channel::<i32>(3);
112 let mut send = PollSender::new(send);
113 let send2 = send.get_ref().cloned().unwrap();
114
115 let mut recv_task = spawn(recv.recv());
116 assert_pending!(recv_task.poll());
117
118 send.close();
119
120 assert!(!recv_task.is_woken());
121 assert_pending!(recv_task.poll());
122
123 drop(send2);
124
125 assert!(recv_task.is_woken());
126 assert!(assert_ready!(recv_task.poll()).is_none());
127}
128
129#[tokio::test]
130async fn close_sender_before_reserve() {
131 let (send, mut recv) = channel::<i32>(3);
132 let mut send = PollSender::new(send);
133
134 let mut recv_task = spawn(recv.recv());
135 assert_pending!(recv_task.poll());
136
137 send.close();
138
139 assert!(recv_task.is_woken());
140 assert!(assert_ready!(recv_task.poll()).is_none());
141
142 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
143 assert_ready_err!(reserve.poll());
144}
145
146#[tokio::test]
147async fn close_sender_after_pending_reserve() {
148 let (send, mut recv) = channel::<i32>(1);
149 let mut send = PollSender::new(send);
150
151 let mut recv_task = spawn(recv.recv());
152 assert_pending!(recv_task.poll());
153
154 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
155 assert_ready_ok!(reserve.poll());
156 send.send_item(1).unwrap();
157
158 assert!(recv_task.is_woken());
159
160 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
161 assert_pending!(reserve.poll());
162 drop(reserve);
163
164 send.close();
165
166 assert!(send.is_closed());
167 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
168 assert_ready_err!(reserve.poll());
169}
170
171#[tokio::test]
172async fn close_sender_after_successful_reserve() {
173 let (send, mut recv) = channel::<i32>(3);
174 let mut send = PollSender::new(send);
175
176 let mut recv_task = spawn(recv.recv());
177 assert_pending!(recv_task.poll());
178
179 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
180 assert_ready_ok!(reserve.poll());
181 drop(reserve);
182
183 send.close();
184 assert!(send.is_closed());
185 assert!(!recv_task.is_woken());
186 assert_pending!(recv_task.poll());
187
188 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
189 assert_ready_ok!(reserve.poll());
190}
191
192#[tokio::test]
193async fn abort_send_after_pending_reserve() {
194 let (send, mut recv) = channel::<i32>(1);
195 let mut send = PollSender::new(send);
196
197 let mut recv_task = spawn(recv.recv());
198 assert_pending!(recv_task.poll());
199
200 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
201 assert_ready_ok!(reserve.poll());
202 send.send_item(1).unwrap();
203
204 assert_eq!(send.get_ref().unwrap().capacity(), 0);
205 assert!(!send.abort_send());
206
207 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
208 assert_pending!(reserve.poll());
209
210 assert!(send.abort_send());
211 assert_eq!(send.get_ref().unwrap().capacity(), 0);
212}
213
214#[tokio::test]
215async fn abort_send_after_successful_reserve() {
216 let (send, mut recv) = channel::<i32>(1);
217 let mut send = PollSender::new(send);
218
219 let mut recv_task = spawn(recv.recv());
220 assert_pending!(recv_task.poll());
221
222 assert_eq!(send.get_ref().unwrap().capacity(), 1);
223 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
224 assert_ready_ok!(reserve.poll());
225 assert_eq!(send.get_ref().unwrap().capacity(), 0);
226
227 assert!(send.abort_send());
228 assert_eq!(send.get_ref().unwrap().capacity(), 1);
229}
230
231#[tokio::test]
232async fn closed_when_receiver_drops() {
233 let (send, _) = channel::<i32>(1);
234 let mut send = PollSender::new(send);
235
236 let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
237 assert_ready_err!(reserve.poll());
238}
239
/// The test expects a panic when `send_item` is called without any preceding
/// `poll_reserve`.
#[test]
#[should_panic]
fn start_send_panics_when_idle() {
    let (tx, _) = channel::<i32>(3);
    let mut sender = PollSender::new(tx);

    sender.send_item(1).unwrap();
}
248
/// Expects `send_item` to panic when a reservation has been started but has
/// not completed yet.
#[should_panic]
#[test]
fn start_send_panics_when_acquiring() {
    // Keep the receiver alive under a named binding. A bare `_` pattern would
    // drop it immediately and close the channel, making the first
    // `assert_ready_ok!` below panic — the test would then pass without ever
    // reaching the `send_item` call it is meant to exercise.
    let (send, _recv) = channel::<i32>(1);
    let mut send = PollSender::new(send);

    // Fill the single slot so the next reservation cannot complete.
    let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
    assert_ready_ok!(reserve.poll());
    send.send_item(1).unwrap();

    let mut reserve = spawn(poll_fn(|cx| send.poll_reserve(cx)));
    assert_pending!(reserve.poll());

    // The result is deliberately not unwrapped: only a panic raised inside
    // `send_item` itself may satisfy `#[should_panic]`, not an `Err` return.
    let _ = send.send_item(2);
}
263