#![allow(dead_code)]

use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::Infallible;
use std::error::Error as StdError;
use std::fmt::{self, Debug};
use std::future::Future;
use std::hash::Hash;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::{Arc, Mutex, Weak};
use std::task::{self, Poll};

use std::time::{Duration, Instant};

use futures_channel::oneshot;
use futures_util::ready;
use tracing::{debug, trace};

use hyper::rt::Sleep;
use hyper::rt::Timer as _;

use crate::common::{exec, exec::Exec, timer::Timer};

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub struct Pool<T, K: Key> {
    // If the pool is disabled, this is None.
    inner: Option<Arc<Mutex<PoolInner<T, K>>>>,
}

// Before using a pooled connection, make sure the sender is not dead.
//
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
pub trait Poolable: Unpin + Send + Sized + 'static {
    fn is_open(&self) -> bool;
    /// Reserve this connection.
    ///
    /// Allows for HTTP/2 to return a shared reservation.
    fn reserve(self) -> Reservation<Self>;
    fn can_share(&self) -> bool;
}
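
// A minimal sketch (hypothetical `MyConn`, not part of this module) of a
// `Poolable` impl for a unique, HTTP/1-style connection. Shown as a comment
// so it stays out of the build; real impls live on the client's connection
// types.
//
//     struct MyConn { open: bool }
//
//     impl Poolable for MyConn {
//         fn is_open(&self) -> bool {
//             self.open
//         }
//         fn reserve(self) -> Reservation<Self> {
//             // An HTTP/1 connection cannot be shared concurrently.
//             Reservation::Unique(self)
//         }
//         fn can_share(&self) -> bool {
//             false
//         }
//     }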

pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}

impl<T> Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}

/// A marker to identify what version a pooled connection is.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[allow(dead_code)]
pub enum Ver {
    Auto,
    Http2,
}

/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub enum Reservation<T> {
    /// This connection could be used multiple times, the first one will be
    /// reinserted into the `idle` pool, and the second will be given to
    /// the `Checkout`.
    #[cfg(feature = "http2")]
    Shared(T, T),
    /// This connection requires unique access. It will be returned after
    /// use is complete.
    Unique(T),
}
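
// Sketch of the shared case, assuming a cloneable HTTP/2-style handle
// (`H2Handle` is hypothetical): both halves point at the same underlying
// connection, so one copy can sit idle in the pool while the other is
// handed to the `Checkout`.
//
//     impl Poolable for H2Handle {
//         fn reserve(self) -> Reservation<Self> {
//             let to_insert = self.clone();
//             Reservation::Shared(to_insert, self)
//         }
//         fn can_share(&self) -> bool {
//             true
//         }
//         // ...
//     }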

// Simple type alias in case the key type needs to be adjusted.
// pub type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;

struct PoolInner<T, K: Eq + Hash> {
    // A flag that a connection is being established, and the connection
    // should be shared. This prevents making multiple HTTP/2 connections
    // to the same host.
    connecting: HashSet<K>,
    // These are internal Conns sitting in the event loop in the KeepAlive
    // state, waiting to receive a new Request to send on the socket.
    idle: HashMap<K, Vec<Idle<T>>>,
    max_idle_per_host: usize,
    // These are outstanding Checkouts that are waiting for a socket to be
    // able to send a Request on. This is used when "racing" for a new
    // connection.
    //
    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
    // for the Pool to receive an idle Conn. When a Conn becomes idle,
    // this list is checked for any parked Checkouts, and tries to notify
    // them that the Conn could be used instead of waiting for a brand new
    // connection.
    waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>,
    // A oneshot channel is used to allow the interval to be notified when
    // the Pool completely drops. That way, the interval can cancel immediately.
    idle_interval_ref: Option<oneshot::Sender<Infallible>>,
    exec: Exec,
    timer: Option<Timer>,
    timeout: Option<Duration>,
}

// This is because `Weak::new()` *allocates* space for `T`, even if it
// doesn't need it!
struct WeakOpt<T>(Option<Weak<T>>);

#[derive(Clone, Copy, Debug)]
pub struct Config {
    pub idle_timeout: Option<Duration>,
    pub max_idle_per_host: usize,
}

impl Config {
    pub fn is_enabled(&self) -> bool {
        self.max_idle_per_host > 0
    }
}
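
// Construction sketch: `max_idle_per_host: 0` disables pooling entirely, so
// `Pool::new` keeps no inner state and every checkout fails with
// `Error::PoolDisabled`.
//
//     let config = Config {
//         idle_timeout: Some(Duration::from_secs(90)),
//         max_idle_per_host: 32,
//     };
//     assert!(config.is_enabled());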

impl<T, K: Key> Pool<T, K> {
    pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
    where
        E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
        M: hyper::rt::Timer + Send + Sync + Clone + 'static,
    {
        let exec = Exec::new(executor);
        let timer = timer.map(|t| Timer::new(t));
        let inner = if config.is_enabled() {
            Some(Arc::new(Mutex::new(PoolInner {
                connecting: HashSet::new(),
                idle: HashMap::new(),
                idle_interval_ref: None,
                max_idle_per_host: config.max_idle_per_host,
                waiters: HashMap::new(),
                exec,
                timer,
                timeout: config.idle_timeout,
            })))
        } else {
            None
        };

        Pool { inner }
    }

    pub(crate) fn is_enabled(&self) -> bool {
        self.inner.is_some()
    }

    #[cfg(test)]
    pub(super) fn no_timer(&self) {
        // Prevent an actual interval from being created for this pool...
        {
            let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
            assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
            let (tx, _) = oneshot::channel();
            inner.idle_interval_ref = Some(tx);
        }
    }
}

impl<T: Poolable, K: Key> Pool<T, K> {
    /// Returns a `Checkout` which is a future that resolves if an idle
    /// connection becomes available.
    pub fn checkout(&self, key: K) -> Checkout<T, K> {
        Checkout {
            key,
            pool: self.clone(),
            waiter: None,
        }
    }
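
    // Usage sketch (hypothetical `key`): awaiting the returned future either
    // yields a reused idle connection, or parks this task as a waiter until
    // a connection is returned to the pool.
    //
    //     let pooled = pool.checkout(key).await?;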

    /// Ensure that there is only ever 1 connecting task for HTTP/2
    /// connections. This does nothing for HTTP/1.
    pub fn connecting(&self, key: &K, ver: Ver) -> Option<Connecting<T, K>> {
        if ver == Ver::Http2 {
            if let Some(ref enabled) = self.inner {
                let mut inner = enabled.lock().unwrap();
                return if inner.connecting.insert(key.clone()) {
                    let connecting = Connecting {
                        key: key.clone(),
                        pool: WeakOpt::downgrade(enabled),
                    };
                    Some(connecting)
                } else {
                    trace!("HTTP/2 connecting already in progress for {:?}", key);
                    None
                };
            }
        }

        // else
        Some(Connecting {
            key: key.clone(),
            // in HTTP/1's case, there is never a lock, so we don't
            // need to do anything in Drop.
            pool: WeakOpt::none(),
        })
    }
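
    // Call-pattern sketch (hypothetical `key` and connection setup): for
    // HTTP/2, only the first caller gets a `Connecting` guard; concurrent
    // callers get `None` and should wait for the shared connection to land
    // in the pool instead.
    //
    //     if let Some(connecting) = pool.connecting(&key, Ver::Http2) {
    //         let conn = connect().await?;
    //         let pooled = pool.pooled(connecting, conn);
    //     }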

    #[cfg(test)]
    fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T, K>> {
        self.inner.as_ref().expect("enabled").lock().expect("lock")
    }

    /* Used in client/tests.rs...
    #[cfg(test)]
    pub(super) fn h1_key(&self, s: &str) -> Key {
        Arc::new(s.to_string())
    }

    #[cfg(test)]
    pub(super) fn idle_count(&self, key: &Key) -> usize {
        self
            .locked()
            .idle
            .get(key)
            .map(|list| list.len())
            .unwrap_or(0)
    }
    */

    pub fn pooled(
        &self,
        #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T, K>,
        value: T,
    ) -> Pooled<T, K> {
        let (value, pool_ref) = if let Some(ref enabled) = self.inner {
            match value.reserve() {
                #[cfg(feature = "http2")]
                Reservation::Shared(to_insert, to_return) => {
                    let mut inner = enabled.lock().unwrap();
                    inner.put(connecting.key.clone(), to_insert, enabled);
                    // Do this here instead of Drop for Connecting because we
                    // already have a lock, no need to lock the mutex twice.
                    inner.connected(&connecting.key);
                    // prevent the Drop of Connecting from repeating inner.connected()
                    connecting.pool = WeakOpt::none();

                    // Shared reservations don't need a reference to the pool,
                    // since the pool always keeps a copy.
                    (to_return, WeakOpt::none())
                }
                Reservation::Unique(value) => {
                    // Unique reservations must take a reference to the pool
                    // since they hope to reinsert once the reservation is
                    // completed.
                    (value, WeakOpt::downgrade(enabled))
                }
            }
        } else {
            // If pool is not enabled, skip all the things...

            // The Connecting should have had no pool ref
            debug_assert!(connecting.pool.upgrade().is_none());

            (value, WeakOpt::none())
        };
        Pooled {
            key: connecting.key.clone(),
            is_reused: false,
            pool: pool_ref,
            value: Some(value),
        }
    }

    fn reuse(&self, key: &K, value: T) -> Pooled<T, K> {
        debug!("reuse idle connection for {:?}", key);
        // TODO: unhack this
        // In Pool::pooled(), which is used for inserting brand new connections,
        // there's some code that adjusts the pool reference taken depending
        // on if the Reservation can be shared or is unique. By the time
        // reuse() is called, the reservation has already been made, and
        // we just have the final value, without knowledge of if this is
        // unique or shared. So, the hack is to just assume Ver::Http2 means
        // shared... :(
        let mut pool_ref = WeakOpt::none();
        if !value.can_share() {
            if let Some(ref enabled) = self.inner {
                pool_ref = WeakOpt::downgrade(enabled);
            }
        }

        Pooled {
            is_reused: true,
            key: key.clone(),
            pool: pool_ref,
            value: Some(value),
        }
    }
}

/// Pop off this list, looking for a usable connection that hasn't expired.
struct IdlePopper<'a, T, K> {
    key: &'a K,
    list: &'a mut Vec<Idle<T>>,
}

impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
    fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
        while let Some(entry) = self.list.pop() {
            // If the connection has been closed, or is older than our idle
            // timeout, simply drop it and keep looking...
            if !entry.value.is_open() {
                trace!("removing closed connection for {:?}", self.key);
                continue;
            }
            // TODO: Actually, since the `idle` list is pushed to the end always,
            // that would imply that if *this* entry is expired, then anything
            // "earlier" in the list would *have* to be expired also... Right?
            //
            // In that case, we could just break out of the loop and drop the
            // whole list...
            if expiration.expires(entry.idle_at) {
                trace!("removing expired connection for {:?}", self.key);
                continue;
            }

            let value = match entry.value.reserve() {
                #[cfg(feature = "http2")]
                Reservation::Shared(to_reinsert, to_checkout) => {
                    self.list.push(Idle {
                        idle_at: Instant::now(),
                        value: to_reinsert,
                    });
                    to_checkout
                }
                Reservation::Unique(unique) => unique,
            };

            return Some(Idle {
                idle_at: entry.idle_at,
                value,
            });
        }

        None
    }
}

impl<T: Poolable, K: Key> PoolInner<T, K> {
    fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
        if value.can_share() && self.idle.contains_key(&key) {
            trace!("put; existing idle HTTP/2 connection for {:?}", key);
            return;
        }
        trace!("put; add idle connection for {:?}", key);
        let mut remove_waiters = false;
        let mut value = Some(value);
        if let Some(waiters) = self.waiters.get_mut(&key) {
            while let Some(tx) = waiters.pop_front() {
                if !tx.is_canceled() {
                    let reserved = value.take().expect("value already sent");
                    let reserved = match reserved.reserve() {
                        #[cfg(feature = "http2")]
                        Reservation::Shared(to_keep, to_send) => {
                            value = Some(to_keep);
                            to_send
                        }
                        Reservation::Unique(uniq) => uniq,
                    };
                    match tx.send(reserved) {
                        Ok(()) => {
                            if value.is_none() {
                                break;
                            } else {
                                continue;
                            }
                        }
                        Err(e) => {
                            value = Some(e);
                        }
                    }
                }

                trace!("put; removing canceled waiter for {:?}", key);
            }
            remove_waiters = waiters.is_empty();
        }
        if remove_waiters {
            self.waiters.remove(&key);
        }

        match value {
            Some(value) => {
                // borrow-check scope...
                {
                    let idle_list = self.idle.entry(key.clone()).or_default();
                    if self.max_idle_per_host <= idle_list.len() {
                        trace!("max idle per host for {:?}, dropping connection", key);
                        return;
                    }

                    debug!("pooling idle connection for {:?}", key);
                    idle_list.push(Idle {
                        value,
                        idle_at: Instant::now(),
                    });
                }

                self.spawn_idle_interval(__pool_ref);
            }
            None => trace!("put; found waiter for {:?}", key),
        }
    }

    /// A `Connecting` task is complete. Not necessarily successfully,
    /// but the lock is going away, so clean up.
    fn connected(&mut self, key: &K) {
        let existed = self.connecting.remove(key);
        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
        // cancel any waiters. if there are any, it's because
        // this Connecting task didn't complete successfully.
        // those waiters would never receive a connection.
        self.waiters.remove(key);
    }

    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
        if self.idle_interval_ref.is_some() {
            return;
        }
        let dur = if let Some(dur) = self.timeout {
            dur
        } else {
            return;
        };
        let timer = if let Some(timer) = self.timer.clone() {
            timer
        } else {
            return;
        };
        let (tx, rx) = oneshot::channel();
        self.idle_interval_ref = Some(tx);

        let interval = IdleTask {
            timer: timer.clone(),
            duration: dur,
            deadline: Instant::now(),
            fut: timer.sleep_until(Instant::now()), // ready at first tick
            pool: WeakOpt::downgrade(pool_ref),
            pool_drop_notifier: rx,
        };

        self.exec.execute(interval);
    }
}

impl<T, K: Eq + Hash> PoolInner<T, K> {
    /// Any `FutureResponse`s that were created will have made a `Checkout`,
    /// and possibly inserted a waiter into the pool, waiting for an idle
    /// connection. If a user ever dropped that future, we need to clean out
    /// those parked senders.
    fn clean_waiters(&mut self, key: &K) {
        let mut remove_waiters = false;
        if let Some(waiters) = self.waiters.get_mut(key) {
            waiters.retain(|tx| !tx.is_canceled());
            remove_waiters = waiters.is_empty();
        }
        if remove_waiters {
            self.waiters.remove(key);
        }
    }
}

impl<T: Poolable, K: Key> PoolInner<T, K> {
    /// This should *only* be called by the IdleTask
    fn clear_expired(&mut self) {
        let dur = self.timeout.expect("interval assumes timeout");

        let now = Instant::now();
        //self.last_idle_check_at = now;

        self.idle.retain(|key, values| {
            values.retain(|entry| {
                if !entry.value.is_open() {
                    trace!("idle interval evicting closed for {:?}", key);
                    return false;
                }

                // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
                if now.saturating_duration_since(entry.idle_at) > dur {
                    trace!("idle interval evicting expired for {:?}", key);
                    return false;
                }

                // Otherwise, keep this value...
                true
            });

            // returning false evicts this key/val
            !values.is_empty()
        });
    }
}

impl<T, K: Key> Clone for Pool<T, K> {
    fn clone(&self) -> Pool<T, K> {
        Pool {
            inner: self.inner.clone(),
        }
    }
}

/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bound `T: Poolable` is needed for the Drop impl.
pub struct Pooled<T: Poolable, K: Key> {
    value: Option<T>,
    is_reused: bool,
    key: K,
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}

impl<T: Poolable, K: Key> Pooled<T, K> {
    pub fn is_reused(&self) -> bool {
        self.is_reused
    }

    pub fn is_pool_enabled(&self) -> bool {
        self.pool.0.is_some()
    }

    fn as_ref(&self) -> &T {
        self.value.as_ref().expect("not dropped")
    }

    fn as_mut(&mut self) -> &mut T {
        self.value.as_mut().expect("not dropped")
    }
}

impl<T: Poolable, K: Key> Deref for Pooled<T, K> {
    type Target = T;
    fn deref(&self) -> &T {
        self.as_ref()
    }
}

impl<T: Poolable, K: Key> DerefMut for Pooled<T, K> {
    fn deref_mut(&mut self) -> &mut T {
        self.as_mut()
    }
}
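
// Lifecycle sketch (hypothetical connection and request types): the guard
// derefs to the inner value, and dropping it hands an open, unique
// connection back to the pool's idle list.
//
//     let mut pooled = pool.pooled(connecting, conn);
//     pooled.send_request(req);   // via DerefMut
//     drop(pooled);               // open + unique => reinserted as idle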

impl<T: Poolable, K: Key> Drop for Pooled<T, K> {
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            if !value.is_open() {
                // If we *already* know the connection is done here,
                // it shouldn't be re-inserted back into the pool.
                return;
            }

            if let Some(pool) = self.pool.upgrade() {
                if let Ok(mut inner) = pool.lock() {
                    inner.put(self.key.clone(), value, &pool);
                }
            } else if !value.can_share() {
                trace!("pool dropped, dropping pooled ({:?})", self.key);
            }
            // Ver::Http2 is already in the Pool (or dead), so we wouldn't
            // have an actual reference to the Pool.
        }
    }
}

impl<T: Poolable, K: Key> fmt::Debug for Pooled<T, K> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Pooled").field("key", &self.key).finish()
    }
}

struct Idle<T> {
    idle_at: Instant,
    value: T,
}

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub struct Checkout<T, K: Key> {
    key: K,
    pool: Pool<T, K>,
    waiter: Option<oneshot::Receiver<T>>,
}

#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
    PoolDisabled,
    CheckoutNoLongerWanted,
    CheckedOutClosedValue,
}

impl Error {
    pub(super) fn is_canceled(&self) -> bool {
        matches!(self, Error::CheckedOutClosedValue)
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Error::PoolDisabled => "pool is disabled",
            Error::CheckedOutClosedValue => "checked out connection was closed",
            Error::CheckoutNoLongerWanted => "request was canceled",
        })
    }
}

impl StdError for Error {}

impl<T: Poolable, K: Key> Checkout<T, K> {
    fn poll_waiter(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<Result<Pooled<T, K>, Error>>> {
        if let Some(mut rx) = self.waiter.take() {
            match Pin::new(&mut rx).poll(cx) {
                Poll::Ready(Ok(value)) => {
                    if value.is_open() {
                        Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
                    } else {
                        Poll::Ready(Some(Err(Error::CheckedOutClosedValue)))
                    }
                }
                Poll::Pending => {
                    self.waiter = Some(rx);
                    Poll::Pending
                }
                Poll::Ready(Err(_canceled)) => {
                    Poll::Ready(Some(Err(Error::CheckoutNoLongerWanted)))
                }
            }
        } else {
            Poll::Ready(None)
        }
    }

    fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T, K>> {
        let entry = {
            let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
            let expiration = Expiration::new(inner.timeout);
            let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
                trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
                // A block to end the mutable borrow on list,
                // so the map below can check is_empty()
                {
                    let popper = IdlePopper {
                        key: &self.key,
                        list,
                    };
                    popper.pop(&expiration)
                }
                .map(|e| (e, list.is_empty()))
            });

            let (entry, empty) = if let Some((e, empty)) = maybe_entry {
                (Some(e), empty)
            } else {
                // No entry found means nuke the list for sure.
                (None, true)
            };
            if empty {
                //TODO: This could be done with the HashMap::entry API instead.
                inner.idle.remove(&self.key);
            }

            if entry.is_none() && self.waiter.is_none() {
                let (tx, mut rx) = oneshot::channel();
                trace!("checkout waiting for idle connection: {:?}", self.key);
                inner
                    .waiters
                    .entry(self.key.clone())
                    .or_insert_with(VecDeque::new)
                    .push_back(tx);

                // register the waker with this oneshot
                assert!(Pin::new(&mut rx).poll(cx).is_pending());
                self.waiter = Some(rx);
            }

            entry
        };

        entry.map(|e| self.pool.reuse(&self.key, e.value))
    }
}

impl<T: Poolable, K: Key> Future for Checkout<T, K> {
    type Output = Result<Pooled<T, K>, Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
            return Poll::Ready(Ok(pooled));
        }

        if let Some(pooled) = self.checkout(cx) {
            Poll::Ready(Ok(pooled))
        } else if !self.pool.is_enabled() {
            Poll::Ready(Err(Error::PoolDisabled))
        } else {
            // There's a new waiter, already registered in self.checkout()
            debug_assert!(self.waiter.is_some());
            Poll::Pending
        }
    }
}

impl<T, K: Key> Drop for Checkout<T, K> {
    fn drop(&mut self) {
        if self.waiter.take().is_some() {
            trace!("checkout dropped for {:?}", self.key);
            if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
                inner.clean_waiters(&self.key);
            }
        }
    }
}

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub struct Connecting<T: Poolable, K: Key> {
    key: K,
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}

impl<T: Poolable, K: Key> Connecting<T, K> {
    pub fn alpn_h2(self, pool: &Pool<T, K>) -> Option<Self> {
        debug_assert!(
            self.pool.0.is_none(),
            "Connecting::alpn_h2 but already Http2"
        );

        pool.connecting(&self.key, Ver::Http2)
    }
}

impl<T: Poolable, K: Key> Drop for Connecting<T, K> {
    fn drop(&mut self) {
        if let Some(pool) = self.pool.upgrade() {
            // No need to panic on drop, that could abort!
            if let Ok(mut inner) = pool.lock() {
                inner.connected(&self.key);
            }
        }
    }
}

struct Expiration(Option<Duration>);

impl Expiration {
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    fn expires(&self, instant: Instant) -> bool {
        match self.0 {
            // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
            Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
            None => false,
        }
    }
}

pin_project_lite::pin_project! {
    struct IdleTask<T, K: Key> {
        timer: Timer,
        duration: Duration,
        deadline: Instant,
        fut: Pin<Box<dyn Sleep>>,
        pool: WeakOpt<Mutex<PoolInner<T, K>>>,
        // This allows the IdleTask to be notified as soon as the entire
        // Pool is fully dropped, and shut down. This channel is never sent on,
        // but Err(Canceled) will be received when the Pool is dropped.
        #[pin]
        pool_drop_notifier: oneshot::Receiver<Infallible>,
    }
}

impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        loop {
            match this.pool_drop_notifier.as_mut().poll(cx) {
                Poll::Ready(Ok(n)) => match n {},
                Poll::Pending => (),
                Poll::Ready(Err(_canceled)) => {
                    trace!("pool closed, canceling idle interval");
                    return Poll::Ready(());
                }
            }

            ready!(Pin::new(&mut this.fut).poll(cx));
            // Set this task to run after the next deadline.
            // If the poll missed the deadline by a lot, set the deadline
            // from the current time instead.
            *this.deadline += *this.duration;
            if *this.deadline < Instant::now() - Duration::from_millis(5) {
                *this.deadline = Instant::now() + *this.duration;
            }
            *this.fut = this.timer.sleep_until(*this.deadline);

            if let Some(inner) = this.pool.upgrade() {
                if let Ok(mut inner) = inner.lock() {
                    trace!("idle interval checking for expired");
                    inner.clear_expired();
                    continue;
                }
            }
            return Poll::Ready(());
        }
    }
}

impl<T> WeakOpt<T> {
    fn none() -> Self {
        WeakOpt(None)
    }

    fn downgrade(arc: &Arc<T>) -> Self {
        WeakOpt(Some(Arc::downgrade(arc)))
    }

    fn upgrade(&self) -> Option<Arc<T>> {
        self.0.as_ref().and_then(Weak::upgrade)
    }
}

#[cfg(all(test, not(miri)))]
mod tests {
    use std::fmt::Debug;
    use std::future::Future;
    use std::hash::Hash;
    use std::pin::Pin;
    use std::task::{self, Poll};
    use std::time::Duration;

    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
    use crate::rt::{TokioExecutor, TokioTimer};

    use crate::common::timer;

    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    struct KeyImpl(http::uri::Scheme, http::uri::Authority);

    type KeyTuple = (http::uri::Scheme, http::uri::Authority);

    /// Test unique reservations.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        fn is_open(&self) -> bool {
            true
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    fn c<T: Poolable, K: Key>(key: K) -> Connecting<T, K> {
        Connecting {
            key,
            pool: WeakOpt::none(),
        }
    }

    fn host_key(s: &str) -> KeyImpl {
        KeyImpl(http::uri::Scheme::HTTP, s.parse().expect("host key"))
    }

    fn pool_no_timer<T, K: Key>() -> Pool<T, K> {
        pool_max_idle_no_timer(::std::usize::MAX)
    }

    fn pool_max_idle_no_timer<T, K: Key>(max_idle: usize) -> Pool<T, K> {
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(100)),
                max_idle_per_host: max_idle,
            },
            TokioExecutor::new(),
            Option::<timer::Timer>::None,
        );
        pool.no_timer();
        pool
    }

    #[tokio::test]
    async fn test_pool_checkout_smoke() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);

        match pool.checkout(key).await {
            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
            Err(_) => panic!("not ready"),
        };
    }

    /// Helper to check if the future is ready after polling once.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T, U> Future for PollOnce<'_, F>
    where
        F: Future<Output = Result<T, U>> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            match Pin::new(&mut self.0).poll(cx) {
                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
                Poll::Pending => Poll::Ready(None),
            }
        }
    }

    #[tokio::test]
    async fn test_pool_checkout_returns_none_if_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
        let mut checkout = pool.checkout(key);
        let poll_once = PollOnce(&mut checkout);
        let is_not_ready = poll_once.await.is_none();
        assert!(is_not_ready);
    }

    #[tokio::test]
    async fn test_pool_checkout_removes_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;

        let mut checkout = pool.checkout(key.clone());
        let poll_once = PollOnce(&mut checkout);
        // checkout.await should clean out the expired
        poll_once.await;
        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[test]
    fn test_pool_max_idle_per_host() {
        let pool = pool_max_idle_no_timer(2);
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        // pooled and dropped 3, max_idle should only allow 2
        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(2)
        );
    }

    #[tokio::test]
    async fn test_pool_timer_removes_expired() {
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(10)),
                max_idle_per_host: std::usize::MAX,
            },
            TokioExecutor::new(),
            Some(TokioTimer::new()),
        );

        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );

        // Let the timer tick past the expiration...
        tokio::time::sleep(Duration::from_millis(30)).await;
        // Yield so the Interval can reap...
        tokio::task::yield_now().await;

        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[tokio::test]
    async fn test_pool_checkout_task_unparked() {
        use futures_util::future::join;
        use futures_util::FutureExt;

        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        let checkout = join(pool.checkout(key), async {
            // the checkout future will park first,
            // and then this lazy future will be polled, which will insert
            // the pooled back into the pool
            //
            // this test makes sure that doing so will unpark the checkout
            drop(pooled);
        })
        .map(|(entry, _)| entry);

        assert_eq!(*checkout.await.unwrap(), Uniq(41));
    }

    #[tokio::test]
    async fn test_pool_checkout_drop_cleans_up_waiters() {
        let pool = pool_no_timer::<Uniq<i32>, KeyImpl>();
        let key = host_key("foo");

        let mut checkout1 = pool.checkout(key.clone());
        let mut checkout2 = pool.checkout(key.clone());

        let poll_once1 = PollOnce(&mut checkout1);
        let poll_once2 = PollOnce(&mut checkout2);

        // first poll needed to get into Pool's parked
        poll_once1.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
        poll_once2.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);

        // on drop, clean up Pool
        drop(checkout1);
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);

        drop(checkout2);
        assert!(pool.locked().waiters.get(&key).is_none());
    }

    #[derive(Debug)]
    struct CanClose {
        #[allow(unused)]
        val: i32,
        closed: bool,
    }

    impl Poolable for CanClose {
        fn is_open(&self) -> bool {
            !self.closed
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    #[test]
    fn pooled_drop_if_closed_doesnt_reinsert() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        pool.pooled(
            c(key.clone()),
            CanClose {
                val: 57,
                closed: true,
            },
        );

        assert!(!pool.locked().idle.contains_key(&key));
    }
}