use std::collections::{HashMap, HashSet, VecDeque};
use std::error::Error as StdError;
use std::fmt;
use std::future::Future;
use std::marker::Unpin;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::{Arc, Mutex, Weak};
use std::task::{ready, Context, Poll};

#[cfg(not(feature = "runtime"))]
use std::time::{Duration, Instant};

use futures_channel::oneshot;
#[cfg(feature = "runtime")]
use tokio::time::{Duration, Instant, Interval};
use tracing::{debug, trace};

use super::client::Ver;
use crate::common::exec::Exec;

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Pool<T> {
    // If the pool is disabled, this is None.
    inner: Option<Arc<Mutex<PoolInner<T>>>>,
}

// Before using a pooled connection, make sure the sender is not dead.
//
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
pub(super) trait Poolable: Unpin + Send + Sized + 'static {
    fn is_open(&self) -> bool;
    /// Reserve this connection.
    ///
    /// Allows for HTTP/2 to return a shared reservation.
    fn reserve(self) -> Reservation<Self>;
    fn can_share(&self) -> bool;
}
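
// As a minimal sketch, a unique-access implementor in the spirit of an
// HTTP/1 connection would look like the hypothetical `MockConn` below
// (`tests::Uniq` at the bottom of this file is the real test double):
//
//     struct MockConn;
//
//     impl Poolable for MockConn {
//         fn is_open(&self) -> bool {
//             true // a real connection would check its transport here
//         }
//         fn reserve(self) -> Reservation<Self> {
//             Reservation::Unique(self) // HTTP/1: one request at a time
//         }
//         fn can_share(&self) -> bool {
//             false
//         }
//     }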

/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) enum Reservation<T> {
    /// This connection could be used multiple times, the first one will be
    /// reinserted into the `idle` pool, and the second will be given to
    /// the `Checkout`.
    #[cfg(feature = "http2")]
    Shared(T, T),
    /// This connection requires unique access. It will be returned after
    /// use is complete.
    Unique(T),
}
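
// For illustration, a cloneable HTTP/2-style handle (a hypothetical
// `Self: Clone` implementor, not defined in this module) could satisfy a
// reservation by handing back two halves of itself:
//
//     fn reserve(self) -> Reservation<Self> {
//         let shared = self.clone();
//         Reservation::Shared(shared, self)
//     }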

/// Simple type alias in case the key type needs to be adjusted.
pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
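
// For example, a request to `https://hyper.rs/guides` yields the key
// (Scheme::HTTPS, Authority "hyper.rs"): idle connections are looked up
// and reused per scheme + authority pair.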

struct PoolInner<T> {
    // A flag that a connection is being established, and the connection
    // should be shared. This prevents making multiple HTTP/2 connections
    // to the same host.
    connecting: HashSet<Key>,
    // These are internal Conns sitting in the event loop in the KeepAlive
    // state, waiting to receive a new Request to send on the socket.
    idle: HashMap<Key, Vec<Idle<T>>>,
    max_idle_per_host: usize,
    // These are outstanding Checkouts that are waiting for a socket to be
    // able to send a Request on. This is used when "racing" for a new
    // connection.
    //
    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
    // for the Pool to receive an idle Conn. When a Conn becomes idle,
    // this list is checked for any parked Checkouts, and tries to notify
    // them that the Conn could be used instead of waiting for a brand new
    // connection.
    waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
    // A oneshot channel is used to allow the interval to be notified when
    // the Pool completely drops. That way, the interval can cancel immediately.
    #[cfg(feature = "runtime")]
    idle_interval_ref: Option<oneshot::Sender<std::convert::Infallible>>,
    #[cfg(feature = "runtime")]
    exec: Exec,
    timeout: Option<Duration>,
}

// `WeakOpt` is used instead of a bare `Weak` because `Weak::new()`
// *allocates* space for `T`, even if it doesn't need it!
struct WeakOpt<T>(Option<Weak<T>>);

#[derive(Clone, Copy, Debug)]
pub(super) struct Config {
    pub(super) idle_timeout: Option<Duration>,
    pub(super) max_idle_per_host: usize,
}

impl Config {
    pub(super) fn is_enabled(&self) -> bool {
        self.max_idle_per_host > 0
    }
}
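
// As a usage sketch, a pool that keeps at most 5 idle connections per host,
// each for up to 90 seconds, would be configured as:
//
//     Config {
//         idle_timeout: Some(Duration::from_secs(90)),
//         max_idle_per_host: 5,
//     }
//
// while `max_idle_per_host: 0` disables pooling entirely (see `is_enabled`).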

impl<T> Pool<T> {
    pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> {
        let inner = if config.is_enabled() {
            Some(Arc::new(Mutex::new(PoolInner {
                connecting: HashSet::new(),
                idle: HashMap::new(),
                #[cfg(feature = "runtime")]
                idle_interval_ref: None,
                max_idle_per_host: config.max_idle_per_host,
                waiters: HashMap::new(),
                #[cfg(feature = "runtime")]
                exec: __exec.clone(),
                timeout: config.idle_timeout.filter(|&t| t > Duration::ZERO),
            })))
        } else {
            None
        };

        Pool { inner }
    }

    fn is_enabled(&self) -> bool {
        self.inner.is_some()
    }

    #[cfg(test)]
    pub(super) fn no_timer(&self) {
        // Prevent an actual interval from being created for this pool...
        #[cfg(feature = "runtime")]
        {
            let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
            assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
            let (tx, _) = oneshot::channel();
            inner.idle_interval_ref = Some(tx);
        }
    }
}

impl<T: Poolable> Pool<T> {
    /// Returns a `Checkout` which is a future that resolves if an idle
    /// connection becomes available.
    pub(super) fn checkout(&self, key: Key) -> Checkout<T> {
        Checkout {
            key,
            pool: self.clone(),
            waiter: None,
        }
    }

    /// Ensure that there is only ever 1 connecting task for HTTP/2
    /// connections. This does nothing for HTTP/1.
    pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
        if ver == Ver::Http2 {
            if let Some(ref enabled) = self.inner {
                let mut inner = enabled.lock().unwrap();
                return if inner.connecting.insert(key.clone()) {
                    let connecting = Connecting {
                        key: key.clone(),
                        pool: WeakOpt::downgrade(enabled),
                    };
                    Some(connecting)
                } else {
                    trace!("HTTP/2 connecting already in progress for {:?}", key);
                    None
                };
            }
        }

        // else
        Some(Connecting {
            key: key.clone(),
            // in HTTP/1's case, there is never a lock, so we don't
            // need to do anything in Drop.
            pool: WeakOpt::none(),
        })
    }

    #[cfg(test)]
    fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
        self.inner.as_ref().expect("enabled").lock().expect("lock")
    }

    /* Used in client/tests.rs...
    #[cfg(feature = "runtime")]
    #[cfg(test)]
    pub(super) fn h1_key(&self, s: &str) -> Key {
        Arc::new(s.to_string())
    }

    #[cfg(feature = "runtime")]
    #[cfg(test)]
    pub(super) fn idle_count(&self, key: &Key) -> usize {
        self
            .locked()
            .idle
            .get(key)
            .map(|list| list.len())
            .unwrap_or(0)
    }
    */

    pub(super) fn pooled(
        &self,
        #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
        value: T,
    ) -> Pooled<T> {
        let (value, pool_ref) = if let Some(ref enabled) = self.inner {
            match value.reserve() {
                #[cfg(feature = "http2")]
                Reservation::Shared(to_insert, to_return) => {
                    let mut inner = enabled.lock().unwrap();
                    inner.put(connecting.key.clone(), to_insert, enabled);
                    // Do this here instead of Drop for Connecting because we
                    // already have a lock, no need to lock the mutex twice.
                    inner.connected(&connecting.key);
                    // prevent the Drop of Connecting from repeating inner.connected()
                    connecting.pool = WeakOpt::none();

                    // Shared reservations don't need a reference to the pool,
                    // since the pool always keeps a copy.
                    (to_return, WeakOpt::none())
                }
                Reservation::Unique(value) => {
                    // Unique reservations must take a reference to the pool
                    // since they hope to reinsert once the reservation is
                    // completed
                    (value, WeakOpt::downgrade(enabled))
                }
            }
        } else {
            // If pool is not enabled, skip all the things...

            // The Connecting should have had no pool ref
            debug_assert!(connecting.pool.upgrade().is_none());

            (value, WeakOpt::none())
        };
        Pooled {
            key: connecting.key.clone(),
            is_reused: false,
            pool: pool_ref,
            value: Some(value),
        }
    }

    fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
        debug!("reuse idle connection for {:?}", key);
        // TODO: unhack this
        // In Pool::pooled(), which is used for inserting brand new connections,
        // there's some code that adjusts the pool reference taken depending
        // on if the Reservation can be shared or is unique. By the time
        // reuse() is called, the reservation has already been made, and
        // we just have the final value, without knowledge of if this is
        // unique or shared. So, the hack is to just assume Ver::Http2 means
        // shared... :(
        let mut pool_ref = WeakOpt::none();
        if !value.can_share() {
            if let Some(ref enabled) = self.inner {
                pool_ref = WeakOpt::downgrade(enabled);
            }
        }

        Pooled {
            is_reused: true,
            key: key.clone(),
            pool: pool_ref,
            value: Some(value),
        }
    }
}
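
// Taken together: `checkout()` races reuse of an idle connection against a
// fresh connect guarded by `connecting()`, and the winner is wrapped into a
// `Pooled<T>` via `pooled()` or `reuse()`, which reinserts itself on drop.
// A rough sketch of the caller's side (hypothetical `connect` helper, not
// part of this module):
//
//     let checkout = pool.checkout(key.clone());
//     if let Some(connecting) = pool.connecting(&key, ver) {
//         // spawn: let conn = connect(&key).await;
//         //        let pooled = pool.pooled(connecting, conn);
//     }
//     // await whichever side finishes first; the loser is cleaned up
//     // by the `Drop` impls of `Checkout` and `Connecting`.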

/// Pop off this list, looking for a usable connection that hasn't expired.
struct IdlePopper<'a, T> {
    key: &'a Key,
    list: &'a mut Vec<Idle<T>>,
}

impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
    fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
        while let Some(entry) = self.list.pop() {
            // If the connection has been closed, or is older than our idle
            // timeout, simply drop it and keep looking...
            if !entry.value.is_open() {
                trace!("removing closed connection for {:?}", self.key);
                continue;
            }
            // TODO: Actually, since the `idle` list is pushed to the end always,
            // that would imply that if *this* entry is expired, then anything
            // "earlier" in the list would *have* to be expired also... Right?
            //
            // In that case, we could just break out of the loop and drop the
            // whole list...
            if expiration.expires(entry.idle_at) {
                trace!("removing expired connection for {:?}", self.key);
                continue;
            }

            let value = match entry.value.reserve() {
                #[cfg(feature = "http2")]
                Reservation::Shared(to_reinsert, to_checkout) => {
                    self.list.push(Idle {
                        idle_at: Instant::now(),
                        value: to_reinsert,
                    });
                    to_checkout
                }
                Reservation::Unique(unique) => unique,
            };

            return Some(Idle {
                idle_at: entry.idle_at,
                value,
            });
        }

        None
    }
}

impl<T: Poolable> PoolInner<T> {
    fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
        if value.can_share() && self.idle.contains_key(&key) {
            trace!("put; existing idle HTTP/2 connection for {:?}", key);
            return;
        }
        trace!("put; add idle connection for {:?}", key);
        let mut remove_waiters = false;
        let mut value = Some(value);
        if let Some(waiters) = self.waiters.get_mut(&key) {
            while let Some(tx) = waiters.pop_front() {
                if !tx.is_canceled() {
                    let reserved = value.take().expect("value already sent");
                    let reserved = match reserved.reserve() {
                        #[cfg(feature = "http2")]
                        Reservation::Shared(to_keep, to_send) => {
                            value = Some(to_keep);
                            to_send
                        }
                        Reservation::Unique(uniq) => uniq,
                    };
                    match tx.send(reserved) {
                        Ok(()) => {
                            if value.is_none() {
                                break;
                            } else {
                                continue;
                            }
                        }
                        Err(e) => {
                            value = Some(e);
                        }
                    }
                }

                trace!("put; removing canceled waiter for {:?}", key);
            }
            remove_waiters = waiters.is_empty();
        }
        if remove_waiters {
            self.waiters.remove(&key);
        }

        match value {
            Some(value) => {
                // borrow-check scope...
                {
                    let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
                    if self.max_idle_per_host <= idle_list.len() {
                        trace!("max idle per host for {:?}, dropping connection", key);
                        return;
                    }

                    debug!("pooling idle connection for {:?}", key);
                    idle_list.push(Idle {
                        value,
                        idle_at: Instant::now(),
                    });
                }

                #[cfg(feature = "runtime")]
                {
                    self.spawn_idle_interval(__pool_ref);
                }
            }
            None => trace!("put; found waiter for {:?}", key),
        }
    }

    /// A `Connecting` task is complete. Not necessarily successfully,
    /// but the lock is going away, so clean up.
    fn connected(&mut self, key: &Key) {
        let existed = self.connecting.remove(key);
        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
        // cancel any waiters. if there are any, it's because
        // this Connecting task didn't complete successfully.
        // those waiters would never receive a connection.
        self.waiters.remove(key);
    }

    #[cfg(feature = "runtime")]
    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
        let (dur, rx) = {
            if self.idle_interval_ref.is_some() {
                return;
            }

            if let Some(dur) = self.timeout {
                let (tx, rx) = oneshot::channel();
                self.idle_interval_ref = Some(tx);
                (dur, rx)
            } else {
                return;
            }
        };

        let interval = IdleTask {
            interval: tokio::time::interval(dur),
            pool: WeakOpt::downgrade(pool_ref),
            pool_drop_notifier: rx,
        };

        self.exec.execute(interval);
    }
}

impl<T> PoolInner<T> {
    /// Any `FutureResponse`s that were created will have made a `Checkout`,
    /// and possibly inserted a waiter into the pool, waiting for an idle
    /// connection. If a user ever dropped that future, we need to clean out
    /// those parked senders.
    fn clean_waiters(&mut self, key: &Key) {
        let mut remove_waiters = false;
        if let Some(waiters) = self.waiters.get_mut(key) {
            waiters.retain(|tx| !tx.is_canceled());
            remove_waiters = waiters.is_empty();
        }
        if remove_waiters {
            self.waiters.remove(key);
        }
    }
}

#[cfg(feature = "runtime")]
impl<T: Poolable> PoolInner<T> {
    /// This should *only* be called by the IdleTask
    fn clear_expired(&mut self) {
        let dur = self.timeout.expect("interval assumes timeout");

        let now = Instant::now();
        //self.last_idle_check_at = now;

        self.idle.retain(|key, values| {
            values.retain(|entry| {
                if !entry.value.is_open() {
                    trace!("idle interval evicting closed for {:?}", key);
                    return false;
                }

                // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
                if now.saturating_duration_since(entry.idle_at) > dur {
                    trace!("idle interval evicting expired for {:?}", key);
                    return false;
                }

                // Otherwise, keep this value...
                true
            });

            // returning false evicts this key/val
            !values.is_empty()
        });
    }
}

impl<T> Clone for Pool<T> {
    fn clone(&self) -> Pool<T> {
        Pool {
            inner: self.inner.clone(),
        }
    }
}

/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bound `T: Poolable` is needed for the Drop impl.
pub(super) struct Pooled<T: Poolable> {
    value: Option<T>,
    is_reused: bool,
    key: Key,
    pool: WeakOpt<Mutex<PoolInner<T>>>,
}

impl<T: Poolable> Pooled<T> {
    pub(super) fn is_reused(&self) -> bool {
        self.is_reused
    }

    pub(super) fn is_pool_enabled(&self) -> bool {
        self.pool.0.is_some()
    }

    fn as_ref(&self) -> &T {
        self.value.as_ref().expect("not dropped")
    }

    fn as_mut(&mut self) -> &mut T {
        self.value.as_mut().expect("not dropped")
    }
}

impl<T: Poolable> Deref for Pooled<T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.as_ref()
    }
}

impl<T: Poolable> DerefMut for Pooled<T> {
    fn deref_mut(&mut self) -> &mut T {
        self.as_mut()
    }
}

impl<T: Poolable> Drop for Pooled<T> {
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            if !value.is_open() {
                // If we *already* know the connection is done here,
                // it shouldn't be re-inserted back into the pool.
                return;
            }

            if let Some(pool) = self.pool.upgrade() {
                if let Ok(mut inner) = pool.lock() {
                    inner.put(self.key.clone(), value, &pool);
                }
            } else if !value.can_share() {
                trace!("pool dropped, dropping pooled ({:?})", self.key);
            }
            // Ver::Http2 is already in the Pool (or dead), so we wouldn't
            // have an actual reference to the Pool.
        }
    }
}

impl<T: Poolable> fmt::Debug for Pooled<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Pooled").field("key", &self.key).finish()
    }
}

struct Idle<T> {
    idle_at: Instant,
    value: T,
}

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Checkout<T> {
    key: Key,
    pool: Pool<T>,
    waiter: Option<oneshot::Receiver<T>>,
}

#[derive(Debug)]
pub(super) struct CheckoutIsClosedError;

impl StdError for CheckoutIsClosedError {}

impl fmt::Display for CheckoutIsClosedError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("checked out connection was closed")
    }
}

impl<T: Poolable> Checkout<T> {
    fn poll_waiter(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Pooled<T>>>> {
        if let Some(mut rx) = self.waiter.take() {
            match Pin::new(&mut rx).poll(cx) {
                Poll::Ready(Ok(value)) => {
                    if value.is_open() {
                        Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
                    } else {
                        Poll::Ready(Some(Err(
                            crate::Error::new_canceled().with(CheckoutIsClosedError)
                        )))
                    }
                }
                Poll::Pending => {
                    self.waiter = Some(rx);
                    Poll::Pending
                }
                Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
                    crate::Error::new_canceled().with("request has been canceled")
                ))),
            }
        } else {
            Poll::Ready(None)
        }
    }

    fn checkout(&mut self, cx: &mut Context<'_>) -> Option<Pooled<T>> {
        let entry = {
            let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
            let expiration = Expiration::new(inner.timeout);
            let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
                trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
                // A block to end the mutable borrow on list,
                // so the map below can check is_empty()
                {
                    let popper = IdlePopper {
                        key: &self.key,
                        list,
                    };
                    popper.pop(&expiration)
                }
                .map(|e| (e, list.is_empty()))
            });

            let (entry, empty) = if let Some((e, empty)) = maybe_entry {
                (Some(e), empty)
            } else {
                // No entry found means nuke the list for sure.
                (None, true)
            };
            if empty {
                //TODO: This could be done with the HashMap::entry API instead.
                inner.idle.remove(&self.key);
            }

            if entry.is_none() && self.waiter.is_none() {
                let (tx, mut rx) = oneshot::channel();
                trace!("checkout waiting for idle connection: {:?}", self.key);
                inner
                    .waiters
                    .entry(self.key.clone())
                    .or_insert_with(VecDeque::new)
                    .push_back(tx);

                // register the waker with this oneshot
                assert!(Pin::new(&mut rx).poll(cx).is_pending());
                self.waiter = Some(rx);
            }

            entry
        };

        entry.map(|e| self.pool.reuse(&self.key, e.value))
    }
}

impl<T: Poolable> Future for Checkout<T> {
    type Output = crate::Result<Pooled<T>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
            return Poll::Ready(Ok(pooled));
        }

        if let Some(pooled) = self.checkout(cx) {
            Poll::Ready(Ok(pooled))
        } else if !self.pool.is_enabled() {
            Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
        } else {
            // There's a new waiter, already registered in self.checkout()
            debug_assert!(self.waiter.is_some());
            Poll::Pending
        }
    }
}

impl<T> Drop for Checkout<T> {
    fn drop(&mut self) {
        if self.waiter.take().is_some() {
            trace!("checkout dropped for {:?}", self.key);
            if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
                inner.clean_waiters(&self.key);
            }
        }
    }
}

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub(super) struct Connecting<T: Poolable> {
    key: Key,
    pool: WeakOpt<Mutex<PoolInner<T>>>,
}

impl<T: Poolable> Connecting<T> {
    pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
        debug_assert!(
            self.pool.0.is_none(),
            "Connecting::alpn_h2 but already Http2"
        );

        pool.connecting(&self.key, Ver::Http2)
    }
}

impl<T: Poolable> Drop for Connecting<T> {
    fn drop(&mut self) {
        if let Some(pool) = self.pool.upgrade() {
            // No need to panic on drop, that could abort!
            if let Ok(mut inner) = pool.lock() {
                inner.connected(&self.key);
            }
        }
    }
}

struct Expiration(Option<Duration>);

impl Expiration {
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    fn expires(&self, instant: Instant) -> bool {
        match self.0 {
            // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
            Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
            None => false,
        }
    }
}
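
// A worked example: with `Expiration::new(Some(Duration::from_secs(90)))`,
// an entry whose `idle_at` is two minutes old reports `expires(..) == true`,
// so both `IdlePopper::pop` and `clear_expired` will drop it; with
// `Expiration::new(None)`, nothing ever expires.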

#[cfg(feature = "runtime")]
pin_project_lite::pin_project! {
    struct IdleTask<T> {
        #[pin]
        interval: Interval,
        pool: WeakOpt<Mutex<PoolInner<T>>>,
        // This allows the IdleTask to be notified as soon as the entire
        // Pool is fully dropped, and shut down. This channel is never sent on,
        // but Err(Canceled) will be received when the Pool is dropped.
        #[pin]
        pool_drop_notifier: oneshot::Receiver<std::convert::Infallible>,
    }
}

#[cfg(feature = "runtime")]
impl<T: Poolable + 'static> Future for IdleTask<T> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        loop {
            match this.pool_drop_notifier.as_mut().poll(cx) {
                Poll::Ready(Ok(n)) => match n {},
                Poll::Pending => (),
                Poll::Ready(Err(_canceled)) => {
                    trace!("pool closed, canceling idle interval");
                    return Poll::Ready(());
                }
            }

            ready!(this.interval.as_mut().poll_tick(cx));

            if let Some(inner) = this.pool.upgrade() {
                if let Ok(mut inner) = inner.lock() {
                    trace!("idle interval checking for expired");
                    inner.clear_expired();
                    continue;
                }
            }
            return Poll::Ready(());
        }
    }
}

impl<T> WeakOpt<T> {
    fn none() -> Self {
        WeakOpt(None)
    }

    fn downgrade(arc: &Arc<T>) -> Self {
        WeakOpt(Some(Arc::downgrade(arc)))
    }

    fn upgrade(&self) -> Option<Arc<T>> {
        self.0.as_ref().and_then(Weak::upgrade)
    }
}

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
    use crate::common::exec::Exec;

    /// Test unique reservations.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        fn is_open(&self) -> bool {
            true
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    fn c<T: Poolable>(key: Key) -> Connecting<T> {
        Connecting {
            key,
            pool: WeakOpt::none(),
        }
    }

    fn host_key(s: &str) -> Key {
        (http::uri::Scheme::HTTP, s.parse().expect("host key"))
    }

    fn pool_no_timer<T>() -> Pool<T> {
        pool_max_idle_no_timer(::std::usize::MAX)
    }

    fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(100)),
                max_idle_per_host: max_idle,
            },
            &Exec::Default,
        );
        pool.no_timer();
        pool
    }

    #[tokio::test]
    async fn test_pool_checkout_smoke() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);

        match pool.checkout(key).await {
            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
            Err(_) => panic!("not ready"),
        };
    }

    /// Helper to check if the future is ready after polling once.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T, U> Future for PollOnce<'_, F>
    where
        F: Future<Output = Result<T, U>> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            match Pin::new(&mut self.0).poll(cx) {
                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
                Poll::Pending => Poll::Ready(None),
            }
        }
    }

    #[tokio::test]
    async fn test_pool_checkout_returns_none_if_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        drop(pooled);
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
        let mut checkout = pool.checkout(key);
        let poll_once = PollOnce(&mut checkout);
        let is_not_ready = poll_once.await.is_none();
        assert!(is_not_ready);
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_checkout_removes_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );
        tokio::time::sleep(pool.locked().timeout.unwrap()).await;

        let mut checkout = pool.checkout(key.clone());
        let poll_once = PollOnce(&mut checkout);
        // checkout.await should clean out the expired
        poll_once.await;
        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[test]
    fn test_pool_max_idle_per_host() {
        let pool = pool_max_idle_no_timer(2);
        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        // pooled and dropped 3, max_idle should only allow 2
        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(2)
        );
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_timer_removes_expired() {
        let _ = pretty_env_logger::try_init();
        tokio::time::pause();

        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(10)),
                max_idle_per_host: std::usize::MAX,
            },
            &Exec::Default,
        );

        let key = host_key("foo");

        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );

        // Let the timer tick past the expiration...
        tokio::time::advance(Duration::from_millis(30)).await;
        // Yield so the Interval can reap...
        tokio::task::yield_now().await;

        assert!(pool.locked().idle.get(&key).is_none());
    }

    #[tokio::test]
    async fn test_pool_checkout_task_unparked() {
        use futures_util::future::join;
        use futures_util::FutureExt;

        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        let checkout = join(pool.checkout(key), async {
            // the checkout future will park first,
            // and then this lazy future will be polled, which will insert
            // the pooled back into the pool
            //
            // this test makes sure that doing so will unpark the checkout
            drop(pooled);
        })
        .map(|(entry, _)| entry);

        assert_eq!(*checkout.await.unwrap(), Uniq(41));
    }

    #[tokio::test]
    async fn test_pool_checkout_drop_cleans_up_waiters() {
        let pool = pool_no_timer::<Uniq<i32>>();
        let key = host_key("foo");

        let mut checkout1 = pool.checkout(key.clone());
        let mut checkout2 = pool.checkout(key.clone());

        let poll_once1 = PollOnce(&mut checkout1);
        let poll_once2 = PollOnce(&mut checkout2);

        // first poll needed to get into Pool's parked
        poll_once1.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
        poll_once2.await;
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);

        // on drop, clean up Pool
        drop(checkout1);
        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);

        drop(checkout2);
        assert!(pool.locked().waiters.get(&key).is_none());
    }

    #[derive(Debug)]
    struct CanClose {
        #[allow(unused)]
        val: i32,
        closed: bool,
    }

    impl Poolable for CanClose {
        fn is_open(&self) -> bool {
            !self.closed
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    #[test]
    fn pooled_drop_if_closed_doesnt_reinsert() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        pool.pooled(
            c(key.clone()),
            CanClose {
                val: 57,
                closed: true,
            },
        );

        assert!(!pool.locked().idle.contains_key(&key));
    }
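
    // A small sanity check of the `Expiration` helper that drives both
    // `IdlePopper::pop` and the idle interval: a `None` timeout never
    // expires, and a fresh instant is not expired even with a timeout set.
    #[test]
    fn test_expiration_checks() {
        let now = super::Instant::now();

        // No timeout configured: entries never expire.
        assert!(!super::Expiration::new(None).expires(now));

        // A generous timeout: a just-created entry is still fresh.
        assert!(!super::Expiration::new(Some(Duration::from_secs(60))).expires(now));
    }

    // `WeakOpt` is a thin wrapper over `Option<Weak<T>>`: `none()` never
    // upgrades, while `downgrade()` upgrades only while the `Arc` is alive.
    #[test]
    fn test_weak_opt_upgrade() {
        use std::sync::Arc;

        assert!(WeakOpt::<()>::none().upgrade().is_none());

        let arc = Arc::new(5);
        let weak = WeakOpt::downgrade(&arc);
        assert!(weak.upgrade().is_some(), "upgrades while Arc is alive");

        drop(arc);
        assert!(weak.upgrade().is_none(), "dead after Arc drops");
    }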
}