use std::collections::{HashMap, HashSet, VecDeque};
use std::error::Error as StdError;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};

#[cfg(not(feature = "runtime"))]
use std::time::{Duration, Instant};

use futures_channel::oneshot;
#[cfg(feature = "runtime")]
use tokio::time::{Duration, Instant, Interval};
use tracing::{debug, trace};

use super::client::Ver;
use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin};

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Pool<T> {
    // If the pool is disabled, this is None.
    inner: Option<Arc<Mutex<PoolInner<T>>>>,
}

// Before using a pooled connection, make sure the sender is not dead.
//
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
pub(super) trait Poolable: Unpin + Send + Sized + 'static {
    fn is_open(&self) -> bool;
    /// Reserve this connection.
    ///
    /// Allows for HTTP/2 to return a shared reservation.
    fn reserve(self) -> Reservation<Self>;
    fn can_share(&self) -> bool;
}
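
// An illustrative sketch of a minimal `Poolable` implementation; the tests
// module at the bottom of this file implements the same shape for its `Uniq`
// type. `MyConn` here is hypothetical: an always-open, non-shareable
// connection that hands out unique reservations.
//
//     impl Poolable for MyConn {
//         fn is_open(&self) -> bool { true }
//         fn reserve(self) -> Reservation<Self> { Reservation::Unique(self) }
//         fn can_share(&self) -> bool { false }
//     }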

/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) enum Reservation<T> {
    /// This connection could be used multiple times: the first value will be
    /// reinserted into the `idle` pool, and the second will be given to
    /// the `Checkout`.
    #[cfg(feature = "http2")]
    Shared(T, T),
    /// This connection requires unique access. It will be returned after
    /// use is complete.
    Unique(T),
}
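
// An illustrative sketch (not an API of this module) of how the two
// reservation kinds are handled; `conn` stands for any `Poolable` value,
// mirroring what `Pool::pooled` below actually does:
//
//     match conn.reserve() {
//         #[cfg(feature = "http2")]
//         Reservation::Shared(to_insert, to_return) => {
//             // `to_insert` goes back into the idle list,
//             // `to_return` is handed to the caller.
//         }
//         Reservation::Unique(conn) => {
//             // Single user; reinserted into the pool when the Pooled drops.
//         }
//     }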

/// Simple type alias in case the key type needs to be adjusted.
pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;

struct PoolInner<T> {
    // A flag that a connection is being established, and the connection
    // should be shared. This prevents making multiple HTTP/2 connections
    // to the same host.
    connecting: HashSet<Key>,
    // These are internal Conns sitting in the event loop in the KeepAlive
    // state, waiting to receive a new Request to send on the socket.
    idle: HashMap<Key, Vec<Idle<T>>>,
    max_idle_per_host: usize,
    // These are outstanding Checkouts that are waiting for a socket to be
    // able to send a Request on. This is used when "racing" for a new
    // connection.
    //
    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
    // for the Pool to receive an idle Conn. When a Conn becomes idle,
    // this list is checked for any parked Checkouts, which are then notified
    // that the Conn can be used instead of waiting for a brand new
    // connection. (See the sketch following this struct.)
    waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
    // A oneshot channel is used to allow the interval to be notified when
    // the Pool completely drops. That way, the interval can cancel immediately.
    #[cfg(feature = "runtime")]
    idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>,
    #[cfg(feature = "runtime")]
    exec: Exec,
    timeout: Option<Duration>,
}
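
// An illustrative sketch of the waiter handoff described above, using the
// same `futures_channel::oneshot` primitive (variable names are hypothetical):
//
//     let (tx, rx) = oneshot::channel();
//     waiters.entry(key.clone()).or_insert_with(VecDeque::new).push_back(tx);
//     // ...later, in `PoolInner::put`, when a connection becomes idle...
//     if let Some(tx) = waiters.get_mut(&key).and_then(|q| q.pop_front()) {
//         let _ = tx.send(idle_conn); // wakes the parked Checkout
//     }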

// This is because `Weak::new()` *allocates* space for `T`, even if it
// doesn't need it!
struct WeakOpt<T>(Option<Weak<T>>);

#[derive(Clone, Copy, Debug)]
pub(super) struct Config {
    pub(super) idle_timeout: Option<Duration>,
    pub(super) max_idle_per_host: usize,
}

impl Config {
    pub(super) fn is_enabled(&self) -> bool {
        self.max_idle_per_host > 0
    }
}

impl<T> Pool<T> {
    pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> {
        let inner = if config.is_enabled() {
            Some(Arc::new(Mutex::new(PoolInner {
                connecting: HashSet::new(),
                idle: HashMap::new(),
                #[cfg(feature = "runtime")]
                idle_interval_ref: None,
                max_idle_per_host: config.max_idle_per_host,
                waiters: HashMap::new(),
                #[cfg(feature = "runtime")]
                exec: __exec.clone(),
                timeout: config.idle_timeout,
            })))
        } else {
            None
        };

        Pool { inner }
    }

    fn is_enabled(&self) -> bool {
        self.inner.is_some()
    }

    #[cfg(test)]
    pub(super) fn no_timer(&self) {
        // Prevent an actual interval from being created for this pool...
        #[cfg(feature = "runtime")]
        {
            let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
            assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
            let (tx, _) = oneshot::channel();
            inner.idle_interval_ref = Some(tx);
        }
    }
}

impl<T: Poolable> Pool<T> {
    /// Returns a `Checkout`, a future that resolves once an idle
    /// connection becomes available.
    pub(super) fn checkout(&self, key: Key) -> Checkout<T> {
        Checkout {
            key,
            pool: self.clone(),
            waiter: None,
        }
    }
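
    // An illustrative sketch of how a caller races a checkout against a brand
    // new connection; `connect_to` is hypothetical and not part of this module:
    //
    //     let checkout = pool.checkout(key.clone());
    //     let connecting = connect_to(&key);
    //     // e.g. futures_util::future::select(checkout, connecting): use
    //     // whichever resolves first, and drop (or pool) the loser.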

    /// Ensure that there is only ever 1 connecting task for HTTP/2
    /// connections. This does nothing for HTTP/1.
    pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
        if ver == Ver::Http2 {
            if let Some(ref enabled) = self.inner {
                let mut inner = enabled.lock().unwrap();
                return if inner.connecting.insert(key.clone()) {
                    let connecting = Connecting {
                        key: key.clone(),
                        pool: WeakOpt::downgrade(enabled),
                    };
                    Some(connecting)
                } else {
                    trace!("HTTP/2 connecting already in progress for {:?}", key);
                    None
                };
            }
        }

        // else
        Some(Connecting {
            key: key.clone(),
            // in HTTP/1's case, there is never a lock, so we don't
            // need to do anything in Drop.
            pool: WeakOpt::none(),
        })
    }

    #[cfg(test)]
    fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
        self.inner.as_ref().expect("enabled").lock().expect("lock")
    }

    /* Used in client/tests.rs...
    #[cfg(feature = "runtime")]
    #[cfg(test)]
    pub(super) fn h1_key(&self, s: &str) -> Key {
        Arc::new(s.to_string())
    }

    #[cfg(feature = "runtime")]
    #[cfg(test)]
    pub(super) fn idle_count(&self, key: &Key) -> usize {
        self
            .locked()
            .idle
            .get(key)
            .map(|list| list.len())
            .unwrap_or(0)
    }
    */

    pub(super) fn pooled(
        &self,
        #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
        value: T,
    ) -> Pooled<T> {
        let (value, pool_ref) = if let Some(ref enabled) = self.inner {
            match value.reserve() {
                #[cfg(feature = "http2")]
                Reservation::Shared(to_insert, to_return) => {
                    let mut inner = enabled.lock().unwrap();
                    inner.put(connecting.key.clone(), to_insert, enabled);
                    // Do this here instead of Drop for Connecting because we
                    // already have a lock, no need to lock the mutex twice.
                    inner.connected(&connecting.key);
                    // prevent the Drop of Connecting from repeating inner.connected()
                    connecting.pool = WeakOpt::none();

                    // Shared reservations don't need a reference to the pool,
                    // since the pool always keeps a copy.
                    (to_return, WeakOpt::none())
                }
                Reservation::Unique(value) => {
                    // Unique reservations must take a reference to the pool
                    // since they hope to reinsert once the reservation is
                    // completed
                    (value, WeakOpt::downgrade(enabled))
                }
            }
        } else {
            // If pool is not enabled, skip all the things...

            // The Connecting should have had no pool ref
            debug_assert!(connecting.pool.upgrade().is_none());

            (value, WeakOpt::none())
        };
        Pooled {
            key: connecting.key.clone(),
            is_reused: false,
            pool: pool_ref,
            value: Some(value),
        }
    }

    fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
        debug!("reuse idle connection for {:?}", key);
        // TODO: unhack this
        // In Pool::pooled(), which is used for inserting brand new connections,
        // there's some code that adjusts the pool reference taken depending
        // on if the Reservation can be shared or is unique. By the time
        // reuse() is called, the reservation has already been made, and
        // we just have the final value, without knowledge of if this is
        // unique or shared. So, the hack is to just assume Ver::Http2 means
        // shared... :(
        let mut pool_ref = WeakOpt::none();
        if !value.can_share() {
            if let Some(ref enabled) = self.inner {
                pool_ref = WeakOpt::downgrade(enabled);
            }
        }

        Pooled {
            is_reused: true,
            key: key.clone(),
            pool: pool_ref,
            value: Some(value),
        }
    }
}

/// Pop off this list, looking for a usable connection that hasn't expired.
struct IdlePopper<'a, T> {
    key: &'a Key,
    list: &'a mut Vec<Idle<T>>,
}

impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
    fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
        while let Some(entry) = self.list.pop() {
            // If the connection has been closed, or is older than our idle
            // timeout, simply drop it and keep looking...
            if !entry.value.is_open() {
                trace!("removing closed connection for {:?}", self.key);
                continue;
            }
            // TODO: Actually, since the `idle` list is pushed to the end always,
            // that would imply that if *this* entry is expired, then anything
            // "earlier" in the list would *have* to be expired also... Right?
            //
            // In that case, we could just break out of the loop and drop the
            // whole list...
            if expiration.expires(entry.idle_at) {
                trace!("removing expired connection for {:?}", self.key);
                continue;
            }

            let value = match entry.value.reserve() {
                #[cfg(feature = "http2")]
                Reservation::Shared(to_reinsert, to_checkout) => {
                    self.list.push(Idle {
                        idle_at: Instant::now(),
                        value: to_reinsert,
                    });
                    to_checkout
                }
                Reservation::Unique(unique) => unique,
            };

            return Some(Idle {
                idle_at: entry.idle_at,
                value,
            });
        }

        None
    }
}

impl<T: Poolable> PoolInner<T> {
    fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
        if value.can_share() && self.idle.contains_key(&key) {
            trace!("put; existing idle HTTP/2 connection for {:?}", key);
            return;
        }
        trace!("put; add idle connection for {:?}", key);
        let mut remove_waiters = false;
        let mut value = Some(value);
        if let Some(waiters) = self.waiters.get_mut(&key) {
            while let Some(tx) = waiters.pop_front() {
                if !tx.is_canceled() {
                    let reserved = value.take().expect("value already sent");
                    let reserved = match reserved.reserve() {
                        #[cfg(feature = "http2")]
                        Reservation::Shared(to_keep, to_send) => {
                            value = Some(to_keep);
                            to_send
                        }
                        Reservation::Unique(uniq) => uniq,
                    };
                    match tx.send(reserved) {
                        Ok(()) => {
                            if value.is_none() {
                                break;
                            } else {
                                continue;
                            }
                        }
                        Err(e) => {
                            value = Some(e);
                        }
                    }
                }

                trace!("put; removing canceled waiter for {:?}", key);
            }
            remove_waiters = waiters.is_empty();
        }
        if remove_waiters {
            self.waiters.remove(&key);
        }

        match value {
            Some(value) => {
                // borrow-check scope...
                {
                    let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
                    if self.max_idle_per_host <= idle_list.len() {
                        trace!("max idle per host for {:?}, dropping connection", key);
                        return;
                    }

                    debug!("pooling idle connection for {:?}", key);
                    idle_list.push(Idle {
                        value,
                        idle_at: Instant::now(),
                    });
                }

                #[cfg(feature = "runtime")]
                {
                    self.spawn_idle_interval(__pool_ref);
                }
            }
            None => trace!("put; found waiter for {:?}", key),
        }
    }

    /// A `Connecting` task is complete. Not necessarily successfully,
    /// but the lock is going away, so clean up.
    fn connected(&mut self, key: &Key) {
        let existed = self.connecting.remove(key);
        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
        // cancel any waiters. if there are any, it's because
        // this Connecting task didn't complete successfully.
        // those waiters would never receive a connection.
        self.waiters.remove(key);
    }

    #[cfg(feature = "runtime")]
    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
        let (dur, rx) = {
            if self.idle_interval_ref.is_some() {
                return;
            }

            if let Some(dur) = self.timeout {
                let (tx, rx) = oneshot::channel();
                self.idle_interval_ref = Some(tx);
                (dur, rx)
            } else {
                return;
            }
        };

        let interval = IdleTask {
            interval: tokio::time::interval(dur),
            pool: WeakOpt::downgrade(pool_ref),
            pool_drop_notifier: rx,
        };

        self.exec.execute(interval);
    }
}

impl<T> PoolInner<T> {
    /// Any `FutureResponse`s that were created will have made a `Checkout`,
    /// and possibly registered themselves in the pool as waiting for an idle
    /// connection. If a user ever dropped that future, we need to clean out
    /// those parked senders.
    fn clean_waiters(&mut self, key: &Key) {
        let mut remove_waiters = false;
        if let Some(waiters) = self.waiters.get_mut(key) {
            waiters.retain(|tx| !tx.is_canceled());
            remove_waiters = waiters.is_empty();
        }
        if remove_waiters {
            self.waiters.remove(key);
        }
    }
}

#[cfg(feature = "runtime")]
impl<T: Poolable> PoolInner<T> {
    /// This should *only* be called by the IdleTask
    fn clear_expired(&mut self) {
        let dur = self.timeout.expect("interval assumes timeout");

        let now = Instant::now();
        //self.last_idle_check_at = now;

        self.idle.retain(|key, values| {
            values.retain(|entry| {
                if !entry.value.is_open() {
                    trace!("idle interval evicting closed for {:?}", key);
                    return false;
                }

                // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
                if now.saturating_duration_since(entry.idle_at) > dur {
                    trace!("idle interval evicting expired for {:?}", key);
                    return false;
                }

                // Otherwise, keep this value...
                true
            });

            // returning false evicts this key/val
            !values.is_empty()
        });
    }
}

impl<T> Clone for Pool<T> {
    fn clone(&self) -> Pool<T> {
        Pool {
            inner: self.inner.clone(),
        }
    }
}

/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bound `T: Poolable` is needed for the Drop impl.
pub(super) struct Pooled<T: Poolable> {
    value: Option<T>,
    is_reused: bool,
    key: Key,
    pool: WeakOpt<Mutex<PoolInner<T>>>,
}

impl<T: Poolable> Pooled<T> {
    pub(super) fn is_reused(&self) -> bool {
        self.is_reused
    }

    pub(super) fn is_pool_enabled(&self) -> bool {
        self.pool.0.is_some()
    }

    fn as_ref(&self) -> &T {
        self.value.as_ref().expect("not dropped")
    }

    fn as_mut(&mut self) -> &mut T {
        self.value.as_mut().expect("not dropped")
    }
}

impl<T: Poolable> Deref for Pooled<T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.as_ref()
    }
}

impl<T: Poolable> DerefMut for Pooled<T> {
    fn deref_mut(&mut self) -> &mut T {
        self.as_mut()
    }
}

impl<T: Poolable> Drop for Pooled<T> {
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            if !value.is_open() {
                // If we *already* know the connection is done here,
                // it shouldn't be re-inserted back into the pool.
                return;
            }

            if let Some(pool) = self.pool.upgrade() {
                if let Ok(mut inner) = pool.lock() {
                    inner.put(self.key.clone(), value, &pool);
                }
            } else if !value.can_share() {
                trace!("pool dropped, dropping pooled ({:?})", self.key);
            }
            // Ver::Http2 is already in the Pool (or dead), so we wouldn't
            // have an actual reference to the Pool.
        }
    }
}

impl<T: Poolable> fmt::Debug for Pooled<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Pooled").field("key", &self.key).finish()
    }
}

struct Idle<T> {
    idle_at: Instant,
    value: T,
}

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Checkout<T> {
    key: Key,
    pool: Pool<T>,
    waiter: Option<oneshot::Receiver<T>>,
}

#[derive(Debug)]
pub(super) struct CheckoutIsClosedError;

impl StdError for CheckoutIsClosedError {}

impl fmt::Display for CheckoutIsClosedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("checked out connection was closed")
    }
}

impl<T: Poolable> Checkout<T> {
    fn poll_waiter(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<crate::Result<Pooled<T>>>> {
        if let Some(mut rx) = self.waiter.take() {
            match Pin::new(&mut rx).poll(cx) {
                Poll::Ready(Ok(value)) => {
                    if value.is_open() {
                        Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
                    } else {
                        Poll::Ready(Some(Err(
                            crate::Error::new_canceled().with(CheckoutIsClosedError)
                        )))
                    }
                }
                Poll::Pending => {
                    self.waiter = Some(rx);
                    Poll::Pending
                }
                Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
                    crate::Error::new_canceled().with("request has been canceled")
                ))),
            }
        } else {
            Poll::Ready(None)
        }
    }

    fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>> {
        let entry = {
            let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
            let expiration = Expiration::new(inner.timeout);
            let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
                trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
                // A block to end the mutable borrow on list,
                // so the map below can check is_empty()
                {
                    let popper = IdlePopper {
                        key: &self.key,
                        list,
                    };
                    popper.pop(&expiration)
                }
                .map(|e| (e, list.is_empty()))
            });

            let (entry, empty) = if let Some((e, empty)) = maybe_entry {
                (Some(e), empty)
            } else {
                // No entry found means nuke the list for sure.
                (None, true)
            };
            if empty {
                //TODO: This could be done with the HashMap::entry API instead.
                inner.idle.remove(&self.key);
            }

            if entry.is_none() && self.waiter.is_none() {
                let (tx, mut rx) = oneshot::channel();
                trace!("checkout waiting for idle connection: {:?}", self.key);
                inner
                    .waiters
                    .entry(self.key.clone())
                    .or_insert_with(VecDeque::new)
                    .push_back(tx);

                // register the waker with this oneshot
                assert!(Pin::new(&mut rx).poll(cx).is_pending());
                self.waiter = Some(rx);
            }

            entry
        };

        entry.map(|e| self.pool.reuse(&self.key, e.value))
    }
}

impl<T: Poolable> Future for Checkout<T> {
    type Output = crate::Result<Pooled<T>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
            return Poll::Ready(Ok(pooled));
        }

        if let Some(pooled) = self.checkout(cx) {
            Poll::Ready(Ok(pooled))
        } else if !self.pool.is_enabled() {
            Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
        } else {
            // There's a new waiter, already registered in self.checkout()
            debug_assert!(self.waiter.is_some());
            Poll::Pending
        }
    }
}

impl<T> Drop for Checkout<T> {
    fn drop(&mut self) {
        if self.waiter.take().is_some() {
            trace!("checkout dropped for {:?}", self.key);
            if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
                inner.clean_waiters(&self.key);
            }
        }
    }
}

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Connecting<T: Poolable> {
    key: Key,
    pool: WeakOpt<Mutex<PoolInner<T>>>,
}

impl<T: Poolable> Connecting<T> {
    pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
        debug_assert!(
            self.pool.0.is_none(),
            "Connecting::alpn_h2 but already Http2"
        );

        pool.connecting(&self.key, Ver::Http2)
    }
}

impl<T: Poolable> Drop for Connecting<T> {
    fn drop(&mut self) {
        if let Some(pool) = self.pool.upgrade() {
            // No need to panic on drop, that could abort!
            if let Ok(mut inner) = pool.lock() {
                inner.connected(&self.key);
            }
        }
    }
}

struct Expiration(Option<Duration>);

impl Expiration {
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    fn expires(&self, instant: Instant) -> bool {
        match self.0 {
            // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
            Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
            None => false,
        }
    }
}
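
// A small worked example of the expiry rule above (durations are arbitrary):
// with `Expiration::new(Some(Duration::from_secs(90)))`, an entry idle for
// 120 seconds has expired, one idle for 30 seconds has not, and with
// `Expiration::new(None)` nothing ever expires.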

#[cfg(feature = "runtime")]
pin_project_lite::pin_project! {
    struct IdleTask<T> {
        #[pin]
        interval: Interval,
        pool: WeakOpt<Mutex<PoolInner<T>>>,
        // This allows the IdleTask to be notified as soon as the entire
        // Pool is fully dropped and shut down. This channel is never sent on,
        // but Err(Canceled) will be received when the Pool is dropped.
        #[pin]
        pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
    }
}

#[cfg(feature = "runtime")]
impl<T: Poolable + 'static> Future for IdleTask<T> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        loop {
            match this.pool_drop_notifier.as_mut().poll(cx) {
                Poll::Ready(Ok(n)) => match n {},
                Poll::Pending => (),
                Poll::Ready(Err(_canceled)) => {
                    trace!("pool closed, canceling idle interval");
                    return Poll::Ready(());
                }
            }

            ready!(this.interval.as_mut().poll_tick(cx));

            if let Some(inner) = this.pool.upgrade() {
                if let Ok(mut inner) = inner.lock() {
                    trace!("idle interval checking for expired");
                    inner.clear_expired();
                    continue;
                }
            }
            return Poll::Ready(());
        }
    }
}

impl<T> WeakOpt<T> {
    fn none() -> Self {
        WeakOpt(None)
    }

    fn downgrade(arc: &Arc<T>) -> Self {
        WeakOpt(Some(Arc::downgrade(arc)))
    }

    fn upgrade(&self) -> Option<Arc<T>> {
        self.0.as_ref().and_then(Weak::upgrade)
    }
}

#[cfg(test)]
mod tests {
    use std::task::Poll;
    use std::time::Duration;

    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
    use crate::common::{exec::Exec, task, Future, Pin};

    /// Test unique reservations.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        fn is_open(&self) -> bool {
            true
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    fn c<T: Poolable>(key: Key) -> Connecting<T> {
        Connecting {
            key,
            pool: WeakOpt::none(),
        }
    }

    fn host_key(s: &str) -> Key {
        (http::uri::Scheme::HTTP, s.parse().expect("host key"))
    }

    fn pool_no_timer<T>() -> Pool<T> {
        pool_max_idle_no_timer(::std::usize::MAX)
    }

    fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(100)),
                max_idle_per_host: max_idle,
            },
            &Exec::Default,
        );
        pool.no_timer();
        pool
    }

    #[tokio::test]
    async fn test_pool_checkout_smoke() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);

        match pool.checkout(key).await {
            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
            Err(_) => panic!("not ready"),
        };
    }

    /// Helper to check if the future is ready after polling once.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T, U> Future for PollOnce<'_, F>
    where
        F: Future<Output = Result<T, U>> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            match Pin::new(&mut self.0).poll(cx) {
                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
                Poll::Pending => Poll::Ready(None),
            }
        }
    }

    #[tokio::test]
    async fn test_pool_checkout_returns_none_if_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
        let mut checkout = pool.checkout(key);
        let poll_once = PollOnce(&mut checkout);
        let is_not_ready = poll_once.await.is_none();
        assert!(is_not_ready);
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_checkout_removes_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;

        let mut checkout = pool.checkout(key.clone());
        let poll_once = PollOnce(&mut checkout);
        // checkout.await should clean out the expired
        poll_once.await;
        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[test]
    fn test_pool_max_idle_per_host() {
        let pool = pool_max_idle_no_timer(2);
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        // pooled and dropped 3, max_idle should only allow 2
        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(2)
        );
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_timer_removes_expired() {
        let _ = pretty_env_logger::try_init();
        tokio::time::pause();

        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(10)),
                max_idle_per_host: std::usize::MAX,
            },
            &Exec::Default,
        );

        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );

        // Let the timer tick past the expiration...
        tokio::time::advance(Duration::from_millis(30)).await;
        // Yield so the Interval can reap...
        tokio::task::yield_now().await;

        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[tokio::test]
    async fn test_pool_checkout_task_unparked() {
        use futures_util::future::join;
        use futures_util::FutureExt;

        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        let checkout = join(pool.checkout(key), async {
            // the checkout future will park first,
            // and then this lazy future will be polled, which will insert
            // the pooled back into the pool
            //
            // this test makes sure that doing so will unpark the checkout
            drop(pooled);
        })
        .map(|(entry, _)| entry);

        assert_eq!(*checkout.await.unwrap(), Uniq(41));
    }

    #[tokio::test]
    async fn test_pool_checkout_drop_cleans_up_waiters() {
        let pool = pool_no_timer::<Uniq<i32>>();
        let key = host_key("foo");

        let mut checkout1 = pool.checkout(key.clone());
        let mut checkout2 = pool.checkout(key.clone());

        let poll_once1 = PollOnce(&mut checkout1);
        let poll_once2 = PollOnce(&mut checkout2);

        // first poll needed to get into Pool's parked
        poll_once1.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
        poll_once2.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);

        // on drop, clean up Pool
        drop(checkout1);
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);

        drop(checkout2);
        assert!(pool.locked().waiters.get(&key).is_none());
    }

    #[derive(Debug)]
    struct CanClose {
        #[allow(unused)]
        val: i32,
        closed: bool,
    }

    impl Poolable for CanClose {
        fn is_open(&self) -> bool {
            !self.closed
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    #[test]
    fn pooled_drop_if_closed_doesnt_reinsert() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        pool.pooled(
            c(key.clone()),
            CanClose {
                val: 57,
                closed: true,
            },
        );

        assert!(!pool.locked().idle.contains_key(&key));
    }
}