1 | use std::task::{Context, Poll}; |
2 | #[cfg (feature = "http2" )] |
3 | use std::{future::Future, pin::Pin}; |
4 | |
5 | #[cfg (feature = "http2" )] |
6 | use http::{Request, Response}; |
7 | #[cfg (feature = "http2" )] |
8 | use http_body::Body; |
9 | #[cfg (feature = "http2" )] |
10 | use pin_project_lite::pin_project; |
11 | use tokio::sync::{mpsc, oneshot}; |
12 | |
13 | #[cfg (feature = "http2" )] |
14 | use crate::{body::Incoming, proto::h2::client::ResponseFutMap}; |
15 | |
16 | pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>; |
17 | pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>; |
18 | |
19 | /// An error when calling `try_send_request`. |
20 | /// |
21 | /// There is a possibility of an error occurring on a connection in-between the |
22 | /// time that a request is queued and when it is actually written to the IO |
23 | /// transport. If that happens, it is safe to return the request back to the |
24 | /// caller, as it was never fully sent. |
25 | #[derive (Debug)] |
26 | pub struct TrySendError<T> { |
27 | pub(crate) error: crate::Error, |
28 | pub(crate) message: Option<T>, |
29 | } |
30 | |
31 | pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) { |
32 | let (tx: UnboundedSender>, rx: UnboundedReceiver>) = mpsc::unbounded_channel(); |
33 | let (giver: Giver, taker: Taker) = want::new(); |
34 | let tx: Sender = Sender { |
35 | #[cfg (feature = "http1" )] |
36 | buffered_once: false, |
37 | giver, |
38 | inner: tx, |
39 | }; |
40 | let rx: Receiver = Receiver { inner: rx, taker }; |
41 | (tx, rx) |
42 | } |
43 | |
44 | /// A bounded sender of requests and callbacks for when responses are ready. |
45 | /// |
46 | /// While the inner sender is unbounded, the Giver is used to determine |
47 | /// if the Receiver is ready for another request. |
48 | pub(crate) struct Sender<T, U> { |
49 | /// One message is always allowed, even if the Receiver hasn't asked |
50 | /// for it yet. This boolean keeps track of whether we've sent one |
51 | /// without notice. |
52 | #[cfg (feature = "http1" )] |
53 | buffered_once: bool, |
54 | /// The Giver helps watch that the Receiver side has been polled |
55 | /// when the queue is empty. This helps us know when a request and |
56 | /// response have been fully processed, and a connection is ready |
57 | /// for more. |
58 | giver: want::Giver, |
59 | /// Actually bounded by the Giver, plus `buffered_once`. |
60 | inner: mpsc::UnboundedSender<Envelope<T, U>>, |
61 | } |
62 | |
63 | /// An unbounded version. |
64 | /// |
65 | /// Cannot poll the Giver, but can still use it to determine if the Receiver |
66 | /// has been dropped. However, this version can be cloned. |
67 | #[cfg (feature = "http2" )] |
68 | pub(crate) struct UnboundedSender<T, U> { |
69 | /// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked. |
70 | giver: want::SharedGiver, |
71 | inner: mpsc::UnboundedSender<Envelope<T, U>>, |
72 | } |
73 | |
74 | impl<T, U> Sender<T, U> { |
75 | #[cfg (feature = "http1" )] |
76 | pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
77 | self.giver |
78 | .poll_want(cx) |
79 | .map_err(|_| crate::Error::new_closed()) |
80 | } |
81 | |
82 | #[cfg (feature = "http1" )] |
83 | pub(crate) fn is_ready(&self) -> bool { |
84 | self.giver.is_wanting() |
85 | } |
86 | |
87 | #[cfg (feature = "http1" )] |
88 | pub(crate) fn is_closed(&self) -> bool { |
89 | self.giver.is_canceled() |
90 | } |
91 | |
92 | #[cfg (feature = "http1" )] |
93 | fn can_send(&mut self) -> bool { |
94 | if self.giver.give() || !self.buffered_once { |
95 | // If the receiver is ready *now*, then of course we can send. |
96 | // |
97 | // If the receiver isn't ready yet, but we don't have anything |
98 | // in the channel yet, then allow one message. |
99 | self.buffered_once = true; |
100 | true |
101 | } else { |
102 | false |
103 | } |
104 | } |
105 | |
106 | #[cfg (feature = "http1" )] |
107 | pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> { |
108 | if !self.can_send() { |
109 | return Err(val); |
110 | } |
111 | let (tx, rx) = oneshot::channel(); |
112 | self.inner |
113 | .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) |
114 | .map(move |_| rx) |
115 | .map_err(|mut e| (e.0).0.take().expect("envelope not dropped" ).0) |
116 | } |
117 | |
118 | #[cfg (feature = "http1" )] |
119 | pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> { |
120 | if !self.can_send() { |
121 | return Err(val); |
122 | } |
123 | let (tx, rx) = oneshot::channel(); |
124 | self.inner |
125 | .send(Envelope(Some((val, Callback::NoRetry(Some(tx)))))) |
126 | .map(move |_| rx) |
127 | .map_err(|mut e| (e.0).0.take().expect("envelope not dropped" ).0) |
128 | } |
129 | |
130 | #[cfg (feature = "http2" )] |
131 | pub(crate) fn unbound(self) -> UnboundedSender<T, U> { |
132 | UnboundedSender { |
133 | giver: self.giver.shared(), |
134 | inner: self.inner, |
135 | } |
136 | } |
137 | } |
138 | |
139 | #[cfg (feature = "http2" )] |
140 | impl<T, U> UnboundedSender<T, U> { |
141 | pub(crate) fn is_ready(&self) -> bool { |
142 | !self.giver.is_canceled() |
143 | } |
144 | |
145 | pub(crate) fn is_closed(&self) -> bool { |
146 | self.giver.is_canceled() |
147 | } |
148 | |
149 | pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> { |
150 | let (tx, rx) = oneshot::channel(); |
151 | self.inner |
152 | .send(Envelope(Some((val, Callback::Retry(Some(tx)))))) |
153 | .map(move |_| rx) |
154 | .map_err(|mut e| (e.0).0.take().expect("envelope not dropped" ).0) |
155 | } |
156 | |
157 | pub(crate) fn send(&mut self, val: T) -> Result<Promise<U>, T> { |
158 | let (tx, rx) = oneshot::channel(); |
159 | self.inner |
160 | .send(Envelope(Some((val, Callback::NoRetry(Some(tx)))))) |
161 | .map(move |_| rx) |
162 | .map_err(|mut e| (e.0).0.take().expect("envelope not dropped" ).0) |
163 | } |
164 | } |
165 | |
166 | #[cfg (feature = "http2" )] |
167 | impl<T, U> Clone for UnboundedSender<T, U> { |
168 | fn clone(&self) -> Self { |
169 | UnboundedSender { |
170 | giver: self.giver.clone(), |
171 | inner: self.inner.clone(), |
172 | } |
173 | } |
174 | } |
175 | |
176 | pub(crate) struct Receiver<T, U> { |
177 | inner: mpsc::UnboundedReceiver<Envelope<T, U>>, |
178 | taker: want::Taker, |
179 | } |
180 | |
181 | impl<T, U> Receiver<T, U> { |
182 | pub(crate) fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<(T, Callback<T, U>)>> { |
183 | match self.inner.poll_recv(cx) { |
184 | Poll::Ready(item) => { |
185 | Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped" ))) |
186 | } |
187 | Poll::Pending => { |
188 | self.taker.want(); |
189 | Poll::Pending |
190 | } |
191 | } |
192 | } |
193 | |
194 | #[cfg (feature = "http1" )] |
195 | pub(crate) fn close(&mut self) { |
196 | self.taker.cancel(); |
197 | self.inner.close(); |
198 | } |
199 | |
200 | #[cfg (feature = "http1" )] |
201 | pub(crate) fn try_recv(&mut self) -> Option<(T, Callback<T, U>)> { |
202 | use futures_util::FutureExt; |
203 | match self.inner.recv().now_or_never() { |
204 | Some(Some(mut env)) => env.0.take(), |
205 | _ => None, |
206 | } |
207 | } |
208 | } |
209 | |
210 | impl<T, U> Drop for Receiver<T, U> { |
211 | fn drop(&mut self) { |
212 | // Notify the giver about the closure first, before dropping |
213 | // the mpsc::Receiver. |
214 | self.taker.cancel(); |
215 | } |
216 | } |
217 | |
218 | struct Envelope<T, U>(Option<(T, Callback<T, U>)>); |
219 | |
220 | impl<T, U> Drop for Envelope<T, U> { |
221 | fn drop(&mut self) { |
222 | if let Some((val: T, cb: Callback)) = self.0.take() { |
223 | cb.send(val:Err(TrySendError { |
224 | error: crate::Error::new_canceled().with(cause:"connection closed" ), |
225 | message: Some(val), |
226 | })); |
227 | } |
228 | } |
229 | } |
230 | |
231 | pub(crate) enum Callback<T, U> { |
232 | #[allow (unused)] |
233 | Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>), |
234 | NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>), |
235 | } |
236 | |
237 | impl<T, U> Drop for Callback<T, U> { |
238 | fn drop(&mut self) { |
239 | match self { |
240 | Callback::Retry(tx: &mut Option>>>) => { |
241 | if let Some(tx: Sender>>) = tx.take() { |
242 | let _ = tx.send(Err(TrySendError { |
243 | error: dispatch_gone(), |
244 | message: None, |
245 | })); |
246 | } |
247 | } |
248 | Callback::NoRetry(tx: &mut Option>>) => { |
249 | if let Some(tx: Sender>) = tx.take() { |
250 | let _ = tx.send(Err(dispatch_gone())); |
251 | } |
252 | } |
253 | } |
254 | } |
255 | } |
256 | |
257 | #[cold ] |
258 | fn dispatch_gone() -> crate::Error { |
259 | // FIXME(nox): What errors do we want here? |
260 | crate::Error::new_user_dispatch_gone().with(cause:if std::thread::panicking() { |
261 | "user code panicked" |
262 | } else { |
263 | "runtime dropped the dispatch task" |
264 | }) |
265 | } |
266 | |
267 | impl<T, U> Callback<T, U> { |
268 | #[cfg (feature = "http2" )] |
269 | pub(crate) fn is_canceled(&self) -> bool { |
270 | match *self { |
271 | Callback::Retry(Some(ref tx)) => tx.is_closed(), |
272 | Callback::NoRetry(Some(ref tx)) => tx.is_closed(), |
273 | _ => unreachable!(), |
274 | } |
275 | } |
276 | |
277 | pub(crate) fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
278 | match *self { |
279 | Callback::Retry(Some(ref mut tx)) => tx.poll_closed(cx), |
280 | Callback::NoRetry(Some(ref mut tx)) => tx.poll_closed(cx), |
281 | _ => unreachable!(), |
282 | } |
283 | } |
284 | |
285 | pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) { |
286 | match self { |
287 | Callback::Retry(ref mut tx) => { |
288 | let _ = tx.take().unwrap().send(val); |
289 | } |
290 | Callback::NoRetry(ref mut tx) => { |
291 | let _ = tx.take().unwrap().send(val.map_err(|e| e.error)); |
292 | } |
293 | } |
294 | } |
295 | } |
296 | |
297 | impl<T> TrySendError<T> { |
298 | /// Take the message from this error. |
299 | /// |
300 | /// The message will not always have been recovered. If an error occurs |
301 | /// after the message has been serialized onto the connection, it will not |
302 | /// be available here. |
303 | pub fn take_message(&mut self) -> Option<T> { |
304 | self.message.take() |
305 | } |
306 | |
307 | /// Consumes this to return the inner error. |
308 | pub fn into_error(self) -> crate::Error { |
309 | self.error |
310 | } |
311 | } |
312 | |
313 | #[cfg (feature = "http2" )] |
314 | pin_project! { |
315 | pub struct SendWhen<B> |
316 | where |
317 | B: Body, |
318 | B: 'static, |
319 | { |
320 | #[pin] |
321 | pub(crate) when: ResponseFutMap<B>, |
322 | #[pin] |
323 | pub(crate) call_back: Option<Callback<Request<B>, Response<Incoming>>>, |
324 | } |
325 | } |
326 | |
327 | #[cfg (feature = "http2" )] |
328 | impl<B> Future for SendWhen<B> |
329 | where |
330 | B: Body + 'static, |
331 | { |
332 | type Output = (); |
333 | |
334 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
335 | let mut this = self.project(); |
336 | |
337 | let mut call_back = this.call_back.take().expect("polled after complete" ); |
338 | |
339 | match Pin::new(&mut this.when).poll(cx) { |
340 | Poll::Ready(Ok(res)) => { |
341 | call_back.send(Ok(res)); |
342 | Poll::Ready(()) |
343 | } |
344 | Poll::Pending => { |
345 | // check if the callback is canceled |
346 | match call_back.poll_canceled(cx) { |
347 | Poll::Ready(v) => v, |
348 | Poll::Pending => { |
349 | // Move call_back back to struct before return |
350 | this.call_back.set(Some(call_back)); |
351 | return Poll::Pending; |
352 | } |
353 | }; |
354 | trace!("send_when canceled" ); |
355 | Poll::Ready(()) |
356 | } |
357 | Poll::Ready(Err((error, message))) => { |
358 | call_back.send(Err(TrySendError { error, message })); |
359 | Poll::Ready(()) |
360 | } |
361 | } |
362 | } |
363 | } |
364 | |
365 | #[cfg (test)] |
366 | mod tests { |
367 | #[cfg (feature = "nightly" )] |
368 | extern crate test; |
369 | |
370 | use std::future::Future; |
371 | use std::pin::Pin; |
372 | use std::task::{Context, Poll}; |
373 | |
374 | use super::{channel, Callback, Receiver}; |
375 | |
376 | #[derive (Debug)] |
377 | struct Custom(#[allow (dead_code)] i32); |
378 | |
379 | impl<T, U> Future for Receiver<T, U> { |
380 | type Output = Option<(T, Callback<T, U>)>; |
381 | |
382 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
383 | self.poll_recv(cx) |
384 | } |
385 | } |
386 | |
387 | /// Helper to check if the future is ready after polling once. |
388 | struct PollOnce<'a, F>(&'a mut F); |
389 | |
390 | impl<F, T> Future for PollOnce<'_, F> |
391 | where |
392 | F: Future<Output = T> + Unpin, |
393 | { |
394 | type Output = Option<()>; |
395 | |
396 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
397 | match Pin::new(&mut self.0).poll(cx) { |
398 | Poll::Ready(_) => Poll::Ready(Some(())), |
399 | Poll::Pending => Poll::Ready(None), |
400 | } |
401 | } |
402 | } |
403 | |
404 | #[cfg (not(miri))] |
405 | #[tokio::test ] |
406 | async fn drop_receiver_sends_cancel_errors() { |
407 | let _ = pretty_env_logger::try_init(); |
408 | |
409 | let (mut tx, mut rx) = channel::<Custom, ()>(); |
410 | |
411 | // must poll once for try_send to succeed |
412 | assert!(PollOnce(&mut rx).await.is_none(), "rx empty" ); |
413 | |
414 | let promise = tx.try_send(Custom(43)).unwrap(); |
415 | drop(rx); |
416 | |
417 | let fulfilled = promise.await; |
418 | let err = fulfilled |
419 | .expect("fulfilled" ) |
420 | .expect_err("promise should error" ); |
421 | match (err.error.is_canceled(), err.message) { |
422 | (true, Some(_)) => (), |
423 | e => panic!("expected Error::Cancel(_), found {:?}" , e), |
424 | } |
425 | } |
426 | |
427 | #[cfg (not(miri))] |
428 | #[tokio::test ] |
429 | async fn sender_checks_for_want_on_send() { |
430 | let (mut tx, mut rx) = channel::<Custom, ()>(); |
431 | |
432 | // one is allowed to buffer, second is rejected |
433 | let _ = tx.try_send(Custom(1)).expect("1 buffered" ); |
434 | tx.try_send(Custom(2)).expect_err("2 not ready" ); |
435 | |
436 | assert!(PollOnce(&mut rx).await.is_some(), "rx once" ); |
437 | |
438 | // Even though 1 has been popped, only 1 could be buffered for the |
439 | // lifetime of the channel. |
440 | tx.try_send(Custom(2)).expect_err("2 still not ready" ); |
441 | |
442 | assert!(PollOnce(&mut rx).await.is_none(), "rx empty" ); |
443 | |
444 | let _ = tx.try_send(Custom(2)).expect("2 ready" ); |
445 | } |
446 | |
447 | #[cfg (feature = "http2" )] |
448 | #[test ] |
449 | fn unbounded_sender_doesnt_bound_on_want() { |
450 | let (tx, rx) = channel::<Custom, ()>(); |
451 | let mut tx = tx.unbound(); |
452 | |
453 | let _ = tx.try_send(Custom(1)).unwrap(); |
454 | let _ = tx.try_send(Custom(2)).unwrap(); |
455 | let _ = tx.try_send(Custom(3)).unwrap(); |
456 | |
457 | drop(rx); |
458 | |
459 | let _ = tx.try_send(Custom(4)).unwrap_err(); |
460 | } |
461 | |
462 | #[cfg (feature = "nightly" )] |
463 | #[bench ] |
464 | fn giver_queue_throughput(b: &mut test::Bencher) { |
465 | use crate::{body::Incoming, Request, Response}; |
466 | |
467 | let rt = tokio::runtime::Builder::new_current_thread() |
468 | .build() |
469 | .unwrap(); |
470 | let (mut tx, mut rx) = channel::<Request<Incoming>, Response<Incoming>>(); |
471 | |
472 | b.iter(move || { |
473 | let _ = tx.send(Request::new(Incoming::empty())).unwrap(); |
474 | rt.block_on(async { |
475 | loop { |
476 | let poll_once = PollOnce(&mut rx); |
477 | let opt = poll_once.await; |
478 | if opt.is_none() { |
479 | break; |
480 | } |
481 | } |
482 | }); |
483 | }) |
484 | } |
485 | |
486 | #[cfg (feature = "nightly" )] |
487 | #[bench ] |
488 | fn giver_queue_not_ready(b: &mut test::Bencher) { |
489 | let rt = tokio::runtime::Builder::new_current_thread() |
490 | .build() |
491 | .unwrap(); |
492 | let (_tx, mut rx) = channel::<i32, ()>(); |
493 | b.iter(move || { |
494 | rt.block_on(async { |
495 | let poll_once = PollOnce(&mut rx); |
496 | assert!(poll_once.await.is_none()); |
497 | }); |
498 | }) |
499 | } |
500 | |
501 | #[cfg (feature = "nightly" )] |
502 | #[bench ] |
503 | fn giver_queue_cancel(b: &mut test::Bencher) { |
504 | let (_tx, mut rx) = channel::<i32, ()>(); |
505 | |
506 | b.iter(move || { |
507 | rx.taker.cancel(); |
508 | }) |
509 | } |
510 | } |
511 | |