1use std::collections::{HashMap, HashSet, VecDeque};
2use std::error::Error as StdError;
3use std::fmt;
4use std::future::Future;
5use std::marker::Unpin;
6use std::ops::{Deref, DerefMut};
7use std::pin::Pin;
8use std::sync::{Arc, Mutex, Weak};
9use std::task::{Context, Poll};
10
11#[cfg(not(feature = "runtime"))]
12use std::time::{Duration, Instant};
13
14use futures_channel::oneshot;
15#[cfg(feature = "runtime")]
16use tokio::time::{Duration, Instant, Interval};
17use tracing::{debug, trace};
18
19use super::client::Ver;
20use crate::common::exec::Exec;
21
// FIXME: allow() required due to `impl Trait` leaking types to this lint
/// Handle to the connection pool.
///
/// Cloning a `Pool` clones the inner `Option<Arc<..>>`, so all clones of an
/// enabled pool share the same `PoolInner` state.
#[allow(missing_debug_implementations)]
pub(super) struct Pool<T> {
    // If the pool is disabled, this is None.
    inner: Option<Arc<Mutex<PoolInner<T>>>>,
}
28
// Before using a pooled connection, make sure the sender is not dead.
//
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
/// A value that can be stored in, and handed out by, the [`Pool`].
pub(super) trait Poolable: Unpin + Send + Sized + 'static {
    /// Reports whether the connection is still usable. The pool drops
    /// values for which this returns `false` instead of reusing them.
    fn is_open(&self) -> bool;
    /// Reserve this connection.
    ///
    /// Allows for HTTP/2 to return a shared reservation.
    fn reserve(self) -> Reservation<Self>;
    /// Reports whether the connection can serve several reservations at
    /// once. The pool uses this to decide whether a `Pooled` handle needs a
    /// reference back to the pool for re-insertion.
    fn can_share(&self) -> bool;
}
42
/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) enum Reservation<T> {
    /// This connection could be used multiple times, the first one will be
    /// reinserted into the `idle` pool, and the second will be given to
    /// the `Checkout`.
    ///
    /// Only compiled in when the `http2` feature is enabled.
    #[cfg(feature = "http2")]
    Shared(T, T),
    /// This connection requires unique access. It will be returned after
    /// use is complete.
    Unique(T),
}
60
/// Simple type alias in case the key type needs to be adjusted.
///
/// Connections are grouped in the pool by their URI scheme and authority.
pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
63
/// The shared, mutex-protected state behind an enabled [`Pool`].
struct PoolInner<T> {
    // A flag that a connection is being established, and the connection
    // should be shared. This prevents making multiple HTTP/2 connections
    // to the same host.
    connecting: HashSet<Key>,
    // These are internal Conns sitting in the event loop in the KeepAlive
    // state, waiting to receive a new Request to send on the socket.
    idle: HashMap<Key, Vec<Idle<T>>>,
    // Upper bound on the length of each per-key `idle` list; `put` drops a
    // connection instead of pooling it once the list has reached this size.
    max_idle_per_host: usize,
    // These are outstanding Checkouts that are waiting for a socket to be
    // able to send a Request on. This is used when "racing" for a new
    // connection.
    //
    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
    // for the Pool to receive an idle Conn. When a Conn becomes idle,
    // this list is checked for any parked Checkouts, and tries to notify
    // them that the Conn could be used instead of waiting for a brand new
    // connection.
    waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
    // A oneshot channel is used to allow the interval to be notified when
    // the Pool completely drops. That way, the interval can cancel immediately.
    #[cfg(feature = "runtime")]
    idle_interval_ref: Option<oneshot::Sender<std::convert::Infallible>>,
    // Executor used to spawn the `IdleTask` that evicts expired connections.
    #[cfg(feature = "runtime")]
    exec: Exec,
    // Idle timeout for pooled connections. `None` means idle entries never
    // expire; `Pool::new` maps a zero duration to `None`.
    timeout: Option<Duration>,
}
91
// This is because `Weak::new()` *allocates* space for `T`, even if it
// doesn't need it!
/// An optional `Weak` pointer: `None` stands for "no pool reference".
struct WeakOpt<T>(Option<Weak<T>>);
95
96#[derive(Clone, Copy, Debug)]
97pub(super) struct Config {
98 pub(super) idle_timeout: Option<Duration>,
99 pub(super) max_idle_per_host: usize,
100}
101
102impl Config {
103 pub(super) fn is_enabled(&self) -> bool {
104 self.max_idle_per_host > 0
105 }
106}
107
108impl<T> Pool<T> {
109 pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> {
110 let inner = if config.is_enabled() {
111 Some(Arc::new(Mutex::new(PoolInner {
112 connecting: HashSet::new(),
113 idle: HashMap::new(),
114 #[cfg(feature = "runtime")]
115 idle_interval_ref: None,
116 max_idle_per_host: config.max_idle_per_host,
117 waiters: HashMap::new(),
118 #[cfg(feature = "runtime")]
119 exec: __exec.clone(),
120 timeout: config.idle_timeout.filter(|&t| t > Duration::ZERO),
121 })))
122 } else {
123 None
124 };
125
126 Pool { inner }
127 }
128
129 fn is_enabled(&self) -> bool {
130 self.inner.is_some()
131 }
132
133 #[cfg(test)]
134 pub(super) fn no_timer(&self) {
135 // Prevent an actual interval from being created for this pool...
136 #[cfg(feature = "runtime")]
137 {
138 let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
139 assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
140 let (tx, _) = oneshot::channel();
141 inner.idle_interval_ref = Some(tx);
142 }
143 }
144}
145
146impl<T: Poolable> Pool<T> {
147 /// Returns a `Checkout` which is a future that resolves if an idle
148 /// connection becomes available.
149 pub(super) fn checkout(&self, key: Key) -> Checkout<T> {
150 Checkout {
151 key,
152 pool: self.clone(),
153 waiter: None,
154 }
155 }
156
157 /// Ensure that there is only ever 1 connecting task for HTTP/2
158 /// connections. This does nothing for HTTP/1.
159 pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
160 if ver == Ver::Http2 {
161 if let Some(ref enabled) = self.inner {
162 let mut inner = enabled.lock().unwrap();
163 return if inner.connecting.insert(key.clone()) {
164 let connecting = Connecting {
165 key: key.clone(),
166 pool: WeakOpt::downgrade(enabled),
167 };
168 Some(connecting)
169 } else {
170 trace!("HTTP/2 connecting already in progress for {:?}", key);
171 None
172 };
173 }
174 }
175
176 // else
177 Some(Connecting {
178 key: key.clone(),
179 // in HTTP/1's case, there is never a lock, so we don't
180 // need to do anything in Drop.
181 pool: WeakOpt::none(),
182 })
183 }
184
185 #[cfg(test)]
186 fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
187 self.inner.as_ref().expect("enabled").lock().expect("lock")
188 }
189
190 /* Used in client/tests.rs...
191 #[cfg(feature = "runtime")]
192 #[cfg(test)]
193 pub(super) fn h1_key(&self, s: &str) -> Key {
194 Arc::new(s.to_string())
195 }
196
197 #[cfg(feature = "runtime")]
198 #[cfg(test)]
199 pub(super) fn idle_count(&self, key: &Key) -> usize {
200 self
201 .locked()
202 .idle
203 .get(key)
204 .map(|list| list.len())
205 .unwrap_or(0)
206 }
207 */
208
209 pub(super) fn pooled(
210 &self,
211 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
212 value: T,
213 ) -> Pooled<T> {
214 let (value, pool_ref) = if let Some(ref enabled) = self.inner {
215 match value.reserve() {
216 #[cfg(feature = "http2")]
217 Reservation::Shared(to_insert, to_return) => {
218 let mut inner = enabled.lock().unwrap();
219 inner.put(connecting.key.clone(), to_insert, enabled);
220 // Do this here instead of Drop for Connecting because we
221 // already have a lock, no need to lock the mutex twice.
222 inner.connected(&connecting.key);
223 // prevent the Drop of Connecting from repeating inner.connected()
224 connecting.pool = WeakOpt::none();
225
226 // Shared reservations don't need a reference to the pool,
227 // since the pool always keeps a copy.
228 (to_return, WeakOpt::none())
229 }
230 Reservation::Unique(value) => {
231 // Unique reservations must take a reference to the pool
232 // since they hope to reinsert once the reservation is
233 // completed
234 (value, WeakOpt::downgrade(enabled))
235 }
236 }
237 } else {
238 // If pool is not enabled, skip all the things...
239
240 // The Connecting should have had no pool ref
241 debug_assert!(connecting.pool.upgrade().is_none());
242
243 (value, WeakOpt::none())
244 };
245 Pooled {
246 key: connecting.key.clone(),
247 is_reused: false,
248 pool: pool_ref,
249 value: Some(value),
250 }
251 }
252
253 fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
254 debug!("reuse idle connection for {:?}", key);
255 // TODO: unhack this
256 // In Pool::pooled(), which is used for inserting brand new connections,
257 // there's some code that adjusts the pool reference taken depending
258 // on if the Reservation can be shared or is unique. By the time
259 // reuse() is called, the reservation has already been made, and
260 // we just have the final value, without knowledge of if this is
261 // unique or shared. So, the hack is to just assume Ver::Http2 means
262 // shared... :(
263 let mut pool_ref = WeakOpt::none();
264 if !value.can_share() {
265 if let Some(ref enabled) = self.inner {
266 pool_ref = WeakOpt::downgrade(enabled);
267 }
268 }
269
270 Pooled {
271 is_reused: true,
272 key: key.clone(),
273 pool: pool_ref,
274 value: Some(value),
275 }
276 }
277}
278
279/// Pop off this list, looking for a usable connection that hasn't expired.
280struct IdlePopper<'a, T> {
281 key: &'a Key,
282 list: &'a mut Vec<Idle<T>>,
283}
284
285impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
286 fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
287 while let Some(entry) = self.list.pop() {
288 // If the connection has been closed, or is older than our idle
289 // timeout, simply drop it and keep looking...
290 if !entry.value.is_open() {
291 trace!("removing closed connection for {:?}", self.key);
292 continue;
293 }
294 // TODO: Actually, since the `idle` list is pushed to the end always,
295 // that would imply that if *this* entry is expired, then anything
296 // "earlier" in the list would *have* to be expired also... Right?
297 //
298 // In that case, we could just break out of the loop and drop the
299 // whole list...
300 if expiration.expires(entry.idle_at) {
301 trace!("removing expired connection for {:?}", self.key);
302 continue;
303 }
304
305 let value = match entry.value.reserve() {
306 #[cfg(feature = "http2")]
307 Reservation::Shared(to_reinsert, to_checkout) => {
308 self.list.push(Idle {
309 idle_at: Instant::now(),
310 value: to_reinsert,
311 });
312 to_checkout
313 }
314 Reservation::Unique(unique) => unique,
315 };
316
317 return Some(Idle {
318 idle_at: entry.idle_at,
319 value,
320 });
321 }
322
323 None
324 }
325}
326
impl<T: Poolable> PoolInner<T> {
    /// Offers `value` to the pool under `key`.
    ///
    /// Parked waiters for the key are served first. If no waiter takes the
    /// value, it is pushed onto the key's idle list, unless that list
    /// already holds `max_idle_per_host` entries, in which case the value is
    /// dropped. With the `runtime` feature, pooling a value also makes sure
    /// the idle interval task has been spawned.
    fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
        // A shareable connection is not added while the key still has an
        // idle list.
        if value.can_share() && self.idle.contains_key(&key) {
            trace!("put; existing idle HTTP/2 connection for {:?}", key);
            return;
        }
        trace!("put; add idle connection for {:?}", key);
        let mut remove_waiters = false;
        // `value` is `None` once a waiter has taken a unique reservation.
        let mut value = Some(value);
        if let Some(waiters) = self.waiters.get_mut(&key) {
            while let Some(tx) = waiters.pop_front() {
                if !tx.is_canceled() {
                    let reserved = value.take().expect("value already sent");
                    let reserved = match reserved.reserve() {
                        #[cfg(feature = "http2")]
                        Reservation::Shared(to_keep, to_send) => {
                            // Keep one half so further waiters can be served.
                            value = Some(to_keep);
                            to_send
                        }
                        Reservation::Unique(uniq) => uniq,
                    };
                    match tx.send(reserved) {
                        Ok(()) => {
                            // Nothing left to hand out: stop. Otherwise keep
                            // serving the remaining waiters.
                            if value.is_none() {
                                break;
                            } else {
                                continue;
                            }
                        }
                        Err(e) => {
                            // The send failed; take the value back and try
                            // the next waiter.
                            value = Some(e);
                        }
                    }
                }

                trace!("put; removing canceled waiter for {:?}", key);
            }
            remove_waiters = waiters.is_empty();
        }
        // Done outside of the `if let` because `waiters` borrows the map.
        if remove_waiters {
            self.waiters.remove(&key);
        }

        match value {
            Some(value) => {
                // borrow-check scope...
                {
                    let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
                    if self.max_idle_per_host <= idle_list.len() {
                        trace!("max idle per host for {:?}, dropping connection", key);
                        return;
                    }

                    debug!("pooling idle connection for {:?}", key);
                    idle_list.push(Idle {
                        value,
                        idle_at: Instant::now(),
                    });
                }

                #[cfg(feature = "runtime")]
                {
                    self.spawn_idle_interval(__pool_ref);
                }
            }
            None => trace!("put; found waiter for {:?}", key),
        }
    }

    /// A `Connecting` task is complete. Not necessarily successfully,
    /// but the lock is going away, so clean up.
    fn connected(&mut self, key: &Key) {
        let existed = self.connecting.remove(key);
        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
        // cancel any waiters. if there are any, it's because
        // this Connecting task didn't complete successfully.
        // those waiters would never receive a connection.
        self.waiters.remove(key);
    }

    /// Spawns the `IdleTask` on the stored executor.
    ///
    /// Does nothing if `idle_interval_ref` is already set or if the pool has
    /// no idle timeout.
    #[cfg(feature = "runtime")]
    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
        let (dur, rx) = {
            if self.idle_interval_ref.is_some() {
                return;
            }

            if let Some(dur) = self.timeout {
                // The sender stays in the pool state; the task holds the
                // receiving end as its drop notifier.
                let (tx, rx) = oneshot::channel();
                self.idle_interval_ref = Some(tx);
                (dur, rx)
            } else {
                return;
            }
        };

        let interval = IdleTask {
            interval: tokio::time::interval(dur),
            // Weak reference, so the task does not keep the pool alive.
            pool: WeakOpt::downgrade(pool_ref),
            pool_drop_notifier: rx,
        };

        self.exec.execute(interval);
    }
}
432
433impl<T> PoolInner<T> {
434 /// Any `FutureResponse`s that were created will have made a `Checkout`,
435 /// and possibly inserted into the pool that it is waiting for an idle
436 /// connection. If a user ever dropped that future, we need to clean out
437 /// those parked senders.
438 fn clean_waiters(&mut self, key: &Key) {
439 let mut remove_waiters: bool = false;
440 if let Some(waiters: &mut VecDeque>) = self.waiters.get_mut(key) {
441 waiters.retain(|tx: &Sender| !tx.is_canceled());
442 remove_waiters = waiters.is_empty();
443 }
444 if remove_waiters {
445 self.waiters.remove(key);
446 }
447 }
448}
449
#[cfg(feature = "runtime")]
impl<T: Poolable> PoolInner<T> {
    /// Evicts every idle connection that is closed or has been idle for
    /// longer than the pool timeout, and removes keys whose idle list ends
    /// up empty.
    ///
    /// This should *only* be called by the IdleTask.
    fn clear_expired(&mut self) {
        let timeout = self.timeout.expect("interval assumes timeout");
        let now = Instant::now();

        self.idle.retain(|key, entries| {
            entries.retain(|idle| {
                if !idle.value.is_open() {
                    trace!("idle interval evicting closed for {:?}", key);
                    false
                } else if now.saturating_duration_since(idle.idle_at) > timeout {
                    // `saturating_duration_since` rather than `Instant::sub`
                    // to avoid issues like rust-lang/rust#86470.
                    trace!("idle interval evicting expired for {:?}", key);
                    false
                } else {
                    true
                }
            });

            // A key whose list is now empty is evicted as well.
            !entries.is_empty()
        });
    }
}
481
482impl<T> Clone for Pool<T> {
483 fn clone(&self) -> Pool<T> {
484 Pool {
485 inner: self.inner.clone(),
486 }
487 }
488}
489
/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bounds `T: Poolable` is needed for the Drop impl.
pub(super) struct Pooled<T: Poolable> {
    // `Some` until the value is taken out in `Drop`.
    value: Option<T>,
    // `true` when the value was taken from the pool (`Pool::reuse`),
    // `false` for a brand new connection (`Pool::pooled`).
    is_reused: bool,
    // Key under which the value is re-inserted on drop.
    key: Key,
    // Weak reference to the pool to re-insert into; none if no re-insertion
    // should be attempted.
    pool: WeakOpt<Mutex<PoolInner<T>>>,
}
498
499impl<T: Poolable> Pooled<T> {
500 pub(super) fn is_reused(&self) -> bool {
501 self.is_reused
502 }
503
504 pub(super) fn is_pool_enabled(&self) -> bool {
505 self.pool.0.is_some()
506 }
507
508 fn as_ref(&self) -> &T {
509 self.value.as_ref().expect(msg:"not dropped")
510 }
511
512 fn as_mut(&mut self) -> &mut T {
513 self.value.as_mut().expect(msg:"not dropped")
514 }
515}
516
517impl<T: Poolable> Deref for Pooled<T> {
518 type Target = T;
519 fn deref(&self) -> &T {
520 self.as_ref()
521 }
522}
523
524impl<T: Poolable> DerefMut for Pooled<T> {
525 fn deref_mut(&mut self) -> &mut T {
526 self.as_mut()
527 }
528}
529
530impl<T: Poolable> Drop for Pooled<T> {
531 fn drop(&mut self) {
532 if let Some(value: T) = self.value.take() {
533 if !value.is_open() {
534 // If we *already* know the connection is done here,
535 // it shouldn't be re-inserted back into the pool.
536 return;
537 }
538
539 if let Some(pool: Arc>>) = self.pool.upgrade() {
540 if let Ok(mut inner: MutexGuard<'_, PoolInner<…>>) = pool.lock() {
541 inner.put(self.key.clone(), value, &pool);
542 }
543 } else if !value.can_share() {
544 trace!("pool dropped, dropping pooled ({:?})", self.key);
545 }
546 // Ver::Http2 is already in the Pool (or dead), so we wouldn't
547 // have an actual reference to the Pool.
548 }
549 }
550}
551
552impl<T: Poolable> fmt::Debug for Pooled<T> {
553 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
554 f.debug_struct("Pooled").field(name:"key", &self.key).finish()
555 }
556}
557
/// An entry of the idle list: a connection plus the time it became idle.
struct Idle<T> {
    // Compared against the pool timeout to decide whether the entry expired.
    idle_at: Instant,
    value: T,
}
562
// FIXME: allow() required due to `impl Trait` leaking types to this lint
/// Future returned by `Pool::checkout`, resolving to a pooled connection
/// for `key`.
#[allow(missing_debug_implementations)]
pub(super) struct Checkout<T> {
    key: Key,
    pool: Pool<T>,
    // Receiving end of the waiter parked in the pool; `None` until the first
    // poll that finds no idle connection.
    waiter: Option<oneshot::Receiver<T>>,
}
570
/// Cause attached to the error returned when a `Checkout` receives a
/// connection from the pool that turns out to be closed already.
#[derive(Debug)]
pub(super) struct CheckoutIsClosedError;

impl StdError for CheckoutIsClosedError {}
575
576impl fmt::Display for CheckoutIsClosedError {
577 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578 f.write_str(data:"checked out connection was closed")
579 }
580}
581
582impl<T: Poolable> Checkout<T> {
583 fn poll_waiter(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Pooled<T>>>> {
584 if let Some(mut rx) = self.waiter.take() {
585 match Pin::new(&mut rx).poll(cx) {
586 Poll::Ready(Ok(value)) => {
587 if value.is_open() {
588 Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
589 } else {
590 Poll::Ready(Some(Err(
591 crate::Error::new_canceled().with(CheckoutIsClosedError)
592 )))
593 }
594 }
595 Poll::Pending => {
596 self.waiter = Some(rx);
597 Poll::Pending
598 }
599 Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
600 crate::Error::new_canceled().with("request has been canceled")
601 ))),
602 }
603 } else {
604 Poll::Ready(None)
605 }
606 }
607
608 fn checkout(&mut self, cx: &mut Context<'_>) -> Option<Pooled<T>> {
609 let entry = {
610 let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
611 let expiration = Expiration::new(inner.timeout);
612 let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
613 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
614 // A block to end the mutable borrow on list,
615 // so the map below can check is_empty()
616 {
617 let popper = IdlePopper {
618 key: &self.key,
619 list,
620 };
621 popper.pop(&expiration)
622 }
623 .map(|e| (e, list.is_empty()))
624 });
625
626 let (entry, empty) = if let Some((e, empty)) = maybe_entry {
627 (Some(e), empty)
628 } else {
629 // No entry found means nuke the list for sure.
630 (None, true)
631 };
632 if empty {
633 //TODO: This could be done with the HashMap::entry API instead.
634 inner.idle.remove(&self.key);
635 }
636
637 if entry.is_none() && self.waiter.is_none() {
638 let (tx, mut rx) = oneshot::channel();
639 trace!("checkout waiting for idle connection: {:?}", self.key);
640 inner
641 .waiters
642 .entry(self.key.clone())
643 .or_insert_with(VecDeque::new)
644 .push_back(tx);
645
646 // register the waker with this oneshot
647 assert!(Pin::new(&mut rx).poll(cx).is_pending());
648 self.waiter = Some(rx);
649 }
650
651 entry
652 };
653
654 entry.map(|e| self.pool.reuse(&self.key, e.value))
655 }
656}
657
658impl<T: Poolable> Future for Checkout<T> {
659 type Output = crate::Result<Pooled<T>>;
660
661 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
662 if let Some(pooled: Pooled) = ready!(self.poll_waiter(cx)?) {
663 return Poll::Ready(Ok(pooled));
664 }
665
666 if let Some(pooled: Pooled) = self.checkout(cx) {
667 Poll::Ready(Ok(pooled))
668 } else if !self.pool.is_enabled() {
669 Poll::Ready(Err(crate::Error::new_canceled().with(cause:"pool is disabled")))
670 } else {
671 // There's a new waiter, already registered in self.checkout()
672 debug_assert!(self.waiter.is_some());
673 Poll::Pending
674 }
675 }
676}
677
678impl<T> Drop for Checkout<T> {
679 fn drop(&mut self) {
680 if self.waiter.take().is_some() {
681 trace!("checkout dropped for {:?}", self.key);
682 if let Some(Ok(mut inner: MutexGuard<'_, PoolInner<…>>)) = self.pool.inner.as_ref().map(|i: &Arc>>| i.lock()) {
683 inner.clean_waiters(&self.key);
684 }
685 }
686 }
687}
688
// FIXME: allow() required due to `impl Trait` leaking types to this lint
/// Token representing an in-progress connection attempt for `key`.
///
/// When it holds a pool reference, dropping it removes `key` from the
/// pool's `connecting` set.
#[allow(missing_debug_implementations)]
pub(super) struct Connecting<T: Poolable> {
    key: Key,
    // Set only for HTTP/2 attempts registered in an enabled pool.
    pool: WeakOpt<Mutex<PoolInner<T>>>,
}
695
696impl<T: Poolable> Connecting<T> {
697 pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
698 debug_assert!(
699 self.pool.0.is_none(),
700 "Connecting::alpn_h2 but already Http2"
701 );
702
703 pool.connecting(&self.key, Ver::Http2)
704 }
705}
706
707impl<T: Poolable> Drop for Connecting<T> {
708 fn drop(&mut self) {
709 if let Some(pool: Arc>>) = self.pool.upgrade() {
710 // No need to panic on drop, that could abort!
711 if let Ok(mut inner: MutexGuard<'_, PoolInner<…>>) = pool.lock() {
712 inner.connected(&self.key);
713 }
714 }
715 }
716}
717
/// Optional idle timeout used to decide whether a pooled entry is too old.
struct Expiration(Option<Duration>);

impl Expiration {
    /// Wraps the pool's idle timeout; `None` means entries never expire.
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    /// Returns `true` if more than the timeout has elapsed since `instant`.
    /// Always `false` when no timeout is set.
    fn expires(&self, instant: Instant) -> bool {
        match self.0 {
            // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
            Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
            None => false,
        }
    }
}
733
#[cfg(feature = "runtime")]
pin_project_lite::pin_project! {
    // Background future that evicts expired idle connections on every tick
    // of `interval`.
    struct IdleTask<T> {
        #[pin]
        interval: Interval,
        // Weak reference, so the task does not keep the pool alive.
        pool: WeakOpt<Mutex<PoolInner<T>>>,
        // This allows the IdleTask to be notified as soon as the entire
        // Pool is fully dropped, and shutdown. This channel is never sent on,
        // but Err(Canceled) will be received when the Pool is dropped.
        #[pin]
        pool_drop_notifier: oneshot::Receiver<std::convert::Infallible>,
    }
}
747
#[cfg(feature = "runtime")]
impl<T: Poolable + 'static> Future for IdleTask<T> {
    type Output = ();

    /// Runs until the pool is gone: on every interval tick the expired idle
    /// connections are cleared out.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        loop {
            // The notifier never carries a value; it only resolves (with an
            // error) once the sender stored in the pool has been dropped.
            if let Poll::Ready(signal) = this.pool_drop_notifier.as_mut().poll(cx) {
                match signal {
                    Ok(never) => match never {},
                    Err(_canceled) => {
                        trace!("pool closed, canceling idle interval");
                        return Poll::Ready(());
                    }
                }
            }

            ready!(this.interval.as_mut().poll_tick(cx));

            // Stop when the pool can no longer be reached or locked.
            let pool = match this.pool.upgrade() {
                Some(pool) => pool,
                None => return Poll::Ready(()),
            };
            let mut inner = match pool.lock() {
                Ok(guard) => guard,
                Err(_poisoned) => return Poll::Ready(()),
            };
            trace!("idle interval checking for expired");
            inner.clear_expired();
        }
    }
}
777
778impl<T> WeakOpt<T> {
779 fn none() -> Self {
780 WeakOpt(None)
781 }
782
783 fn downgrade(arc: &Arc<T>) -> Self {
784 WeakOpt(Some(Arc::downgrade(this:arc)))
785 }
786
787 fn upgrade(&self) -> Option<Arc<T>> {
788 self.0.as_ref().and_then(Weak::upgrade)
789 }
790}
791
#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
    use crate::common::exec::Exec;

    /// Test unique reservations.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        fn is_open(&self) -> bool {
            true
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    /// Builds a `Connecting` token for `key` that holds no pool reference.
    fn c<T: Poolable>(key: Key) -> Connecting<T> {
        Connecting {
            key,
            pool: WeakOpt::none(),
        }
    }

    /// Builds an `http` scheme pool key for the authority `s`.
    fn host_key(s: &str) -> Key {
        (http::uri::Scheme::HTTP, s.parse().expect("host key"))
    }

    /// Pool with no idle limit and no interval task.
    fn pool_no_timer<T>() -> Pool<T> {
        pool_max_idle_no_timer(::std::usize::MAX)
    }

    /// Pool with a 100ms idle timeout, the given idle limit, and no
    /// interval task.
    fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(100)),
                max_idle_per_host: max_idle,
            },
            &Exec::Default,
        );
        pool.no_timer();
        pool
    }

    // A dropped `Pooled` goes back into the pool and can be checked out again.
    #[tokio::test]
    async fn test_pool_checkout_smoke() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);

        match pool.checkout(key).await {
            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
            Err(_) => panic!("not ready"),
        };
    }

    /// Helper to check if the future is ready after polling once.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T, U> Future for PollOnce<'_, F>
    where
        F: Future<Output = Result<T, U>> + Unpin,
    {
        // `Some(())` if the inner future completed (with either result),
        // `None` if it was still pending.
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            match Pin::new(&mut self.0).poll(cx) {
                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
                Poll::Pending => Poll::Ready(None),
            }
        }
    }

    // After the idle timeout has elapsed, a checkout must not hand out the
    // stale connection.
    #[tokio::test]
    async fn test_pool_checkout_returns_none_if_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
        let mut checkout = pool.checkout(key);
        let poll_once = PollOnce(&mut checkout);
        let is_not_ready = poll_once.await.is_none();
        assert!(is_not_ready);
    }

    // Polling a checkout removes expired idle entries from the pool.
    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_checkout_removes_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        // Each `Pooled` is a temporary that is dropped right away, which
        // inserts its value into the idle list.
        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;

        let mut checkout = pool.checkout(key.clone());
        let poll_once = PollOnce(&mut checkout);
        // checkout.await should clean out the expired
        poll_once.await;
        assert!(pool.locked().idle.get(&key).is_none());
    }

    // The idle list never grows beyond `max_idle_per_host`.
    #[test]
    fn test_pool_max_idle_per_host() {
        let pool = pool_max_idle_no_timer(2);
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        // pooled and dropped 3, max_idle should only allow 2
        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(2)
        );
    }

    // With a real interval task, expired entries are removed without any
    // checkout taking place.
    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_timer_removes_expired() {
        let _ = pretty_env_logger::try_init();
        tokio::time::pause();

        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(10)),
                max_idle_per_host: std::usize::MAX,
            },
            &Exec::Default,
        );

        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );

        // Let the timer tick past the expiration...
        tokio::time::advance(Duration::from_millis(30)).await;
        // Yield so the Interval can reap...
        tokio::task::yield_now().await;

        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[tokio::test]
    async fn test_pool_checkout_task_unparked() {
        use futures_util::future::join;
        use futures_util::FutureExt;

        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        let checkout = join(pool.checkout(key), async {
            // the checkout future will park first,
            // and then this lazy future will be polled, which will insert
            // the pooled back into the pool
            //
            // this test makes sure that doing so will unpark the checkout
            drop(pooled);
        })
        .map(|(entry, _)| entry);

        assert_eq!(*checkout.await.unwrap(), Uniq(41));
    }

    // Dropping a parked checkout removes its waiter from the pool.
    #[tokio::test]
    async fn test_pool_checkout_drop_cleans_up_waiters() {
        let pool = pool_no_timer::<Uniq<i32>>();
        let key = host_key("foo");

        let mut checkout1 = pool.checkout(key.clone());
        let mut checkout2 = pool.checkout(key.clone());

        let poll_once1 = PollOnce(&mut checkout1);
        let poll_once2 = PollOnce(&mut checkout2);

        // first poll needed to get into Pool's parked
        poll_once1.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
        poll_once2.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);

        // on drop, clean up Pool
        drop(checkout1);
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);

        drop(checkout2);
        assert!(pool.locked().waiters.get(&key).is_none());
    }

    /// Poolable whose open/closed state is set by the test.
    #[derive(Debug)]
    struct CanClose {
        #[allow(unused)]
        val: i32,
        closed: bool,
    }

    impl Poolable for CanClose {
        fn is_open(&self) -> bool {
            !self.closed
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    // A connection that is already closed when its `Pooled` is dropped must
    // not end up in the idle list.
    #[test]
    fn pooled_drop_if_closed_doesnt_reinsert() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        pool.pooled(
            c(key.clone()),
            CanClose {
                val: 57,
                closed: true,
            },
        );

        assert!(!pool.locked().idle.contains_key(&key));
    }
}
1049