1 | use std::collections::{HashMap, HashSet, VecDeque}; |
2 | use std::error::Error as StdError; |
3 | use std::fmt; |
4 | use std::ops::{Deref, DerefMut}; |
5 | use std::sync::{Arc, Mutex, Weak}; |
6 | |
7 | #[cfg (not(feature = "runtime" ))] |
8 | use std::time::{Duration, Instant}; |
9 | |
10 | use futures_channel::oneshot; |
11 | #[cfg (feature = "runtime" )] |
12 | use tokio::time::{Duration, Instant, Interval}; |
13 | use tracing::{debug, trace}; |
14 | |
15 | use super::client::Ver; |
16 | use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin}; |
17 | |
18 | // FIXME: allow() required due to `impl Trait` leaking types to this lint |
19 | #[allow (missing_debug_implementations)] |
20 | pub(super) struct Pool<T> { |
21 | // If the pool is disabled, this is None. |
22 | inner: Option<Arc<Mutex<PoolInner<T>>>>, |
23 | } |
24 | |
25 | // Before using a pooled connection, make sure the sender is not dead. |
26 | // |
27 | // This is a trait to allow the `client::pool::tests` to work for `i32`. |
28 | // |
29 | // See https://github.com/hyperium/hyper/issues/1429 |
30 | pub(super) trait Poolable: Unpin + Send + Sized + 'static { |
31 | fn is_open(&self) -> bool; |
32 | /// Reserve this connection. |
33 | /// |
34 | /// Allows for HTTP/2 to return a shared reservation. |
35 | fn reserve(self) -> Reservation<Self>; |
36 | fn can_share(&self) -> bool; |
37 | } |
38 | |
39 | /// When checking out a pooled connection, it might be that the connection |
40 | /// only supports a single reservation, or it might be usable for many. |
41 | /// |
42 | /// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be |
43 | /// used for multiple requests. |
44 | // FIXME: allow() required due to `impl Trait` leaking types to this lint |
45 | #[allow (missing_debug_implementations)] |
46 | pub(super) enum Reservation<T> { |
47 | /// This connection could be used multiple times, the first one will be |
48 | /// reinserted into the `idle` pool, and the second will be given to |
49 | /// the `Checkout`. |
50 | #[cfg (feature = "http2" )] |
51 | Shared(T, T), |
52 | /// This connection requires unique access. It will be returned after |
53 | /// use is complete. |
54 | Unique(T), |
55 | } |
56 | |
57 | /// Simple type alias in case the key type needs to be adjusted. |
58 | pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>; |
59 | |
60 | struct PoolInner<T> { |
61 | // A flag that a connection is being established, and the connection |
62 | // should be shared. This prevents making multiple HTTP/2 connections |
63 | // to the same host. |
64 | connecting: HashSet<Key>, |
65 | // These are internal Conns sitting in the event loop in the KeepAlive |
66 | // state, waiting to receive a new Request to send on the socket. |
67 | idle: HashMap<Key, Vec<Idle<T>>>, |
68 | max_idle_per_host: usize, |
69 | // These are outstanding Checkouts that are waiting for a socket to be |
70 | // able to send a Request one. This is used when "racing" for a new |
71 | // connection. |
72 | // |
73 | // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait |
74 | // for the Pool to receive an idle Conn. When a Conn becomes idle, |
75 | // this list is checked for any parked Checkouts, and tries to notify |
76 | // them that the Conn could be used instead of waiting for a brand new |
77 | // connection. |
78 | waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>, |
79 | // A oneshot channel is used to allow the interval to be notified when |
80 | // the Pool completely drops. That way, the interval can cancel immediately. |
81 | #[cfg (feature = "runtime" )] |
82 | idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>, |
83 | #[cfg (feature = "runtime" )] |
84 | exec: Exec, |
85 | timeout: Option<Duration>, |
86 | } |
87 | |
/// An optional `Weak` pointer.
// The `Option` exists because `Weak::new()` *allocates* space for `T`,
// even if it doesn't need it!
struct WeakOpt<T>(Option<Weak<T>>);
91 | |
92 | #[derive (Clone, Copy, Debug)] |
93 | pub(super) struct Config { |
94 | pub(super) idle_timeout: Option<Duration>, |
95 | pub(super) max_idle_per_host: usize, |
96 | } |
97 | |
98 | impl Config { |
99 | pub(super) fn is_enabled(&self) -> bool { |
100 | self.max_idle_per_host > 0 |
101 | } |
102 | } |
103 | |
104 | impl<T> Pool<T> { |
105 | pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> { |
106 | let inner = if config.is_enabled() { |
107 | Some(Arc::new(Mutex::new(PoolInner { |
108 | connecting: HashSet::new(), |
109 | idle: HashMap::new(), |
110 | #[cfg (feature = "runtime" )] |
111 | idle_interval_ref: None, |
112 | max_idle_per_host: config.max_idle_per_host, |
113 | waiters: HashMap::new(), |
114 | #[cfg (feature = "runtime" )] |
115 | exec: __exec.clone(), |
116 | timeout: config.idle_timeout, |
117 | }))) |
118 | } else { |
119 | None |
120 | }; |
121 | |
122 | Pool { inner } |
123 | } |
124 | |
125 | fn is_enabled(&self) -> bool { |
126 | self.inner.is_some() |
127 | } |
128 | |
129 | #[cfg (test)] |
130 | pub(super) fn no_timer(&self) { |
131 | // Prevent an actual interval from being created for this pool... |
132 | #[cfg (feature = "runtime" )] |
133 | { |
134 | let mut inner = self.inner.as_ref().unwrap().lock().unwrap(); |
135 | assert!(inner.idle_interval_ref.is_none(), "timer already spawned" ); |
136 | let (tx, _) = oneshot::channel(); |
137 | inner.idle_interval_ref = Some(tx); |
138 | } |
139 | } |
140 | } |
141 | |
142 | impl<T: Poolable> Pool<T> { |
143 | /// Returns a `Checkout` which is a future that resolves if an idle |
144 | /// connection becomes available. |
145 | pub(super) fn checkout(&self, key: Key) -> Checkout<T> { |
146 | Checkout { |
147 | key, |
148 | pool: self.clone(), |
149 | waiter: None, |
150 | } |
151 | } |
152 | |
153 | /// Ensure that there is only ever 1 connecting task for HTTP/2 |
154 | /// connections. This does nothing for HTTP/1. |
155 | pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> { |
156 | if ver == Ver::Http2 { |
157 | if let Some(ref enabled) = self.inner { |
158 | let mut inner = enabled.lock().unwrap(); |
159 | return if inner.connecting.insert(key.clone()) { |
160 | let connecting = Connecting { |
161 | key: key.clone(), |
162 | pool: WeakOpt::downgrade(enabled), |
163 | }; |
164 | Some(connecting) |
165 | } else { |
166 | trace!("HTTP/2 connecting already in progress for {:?}" , key); |
167 | None |
168 | }; |
169 | } |
170 | } |
171 | |
172 | // else |
173 | Some(Connecting { |
174 | key: key.clone(), |
175 | // in HTTP/1's case, there is never a lock, so we don't |
176 | // need to do anything in Drop. |
177 | pool: WeakOpt::none(), |
178 | }) |
179 | } |
180 | |
181 | #[cfg (test)] |
182 | fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> { |
183 | self.inner.as_ref().expect("enabled" ).lock().expect("lock" ) |
184 | } |
185 | |
186 | /* Used in client/tests.rs... |
187 | #[cfg(feature = "runtime")] |
188 | #[cfg(test)] |
189 | pub(super) fn h1_key(&self, s: &str) -> Key { |
190 | Arc::new(s.to_string()) |
191 | } |
192 | |
193 | #[cfg(feature = "runtime")] |
194 | #[cfg(test)] |
195 | pub(super) fn idle_count(&self, key: &Key) -> usize { |
196 | self |
197 | .locked() |
198 | .idle |
199 | .get(key) |
200 | .map(|list| list.len()) |
201 | .unwrap_or(0) |
202 | } |
203 | */ |
204 | |
205 | pub(super) fn pooled( |
206 | &self, |
207 | #[cfg_attr (not(feature = "http2" ), allow(unused_mut))] mut connecting: Connecting<T>, |
208 | value: T, |
209 | ) -> Pooled<T> { |
210 | let (value, pool_ref) = if let Some(ref enabled) = self.inner { |
211 | match value.reserve() { |
212 | #[cfg (feature = "http2" )] |
213 | Reservation::Shared(to_insert, to_return) => { |
214 | let mut inner = enabled.lock().unwrap(); |
215 | inner.put(connecting.key.clone(), to_insert, enabled); |
216 | // Do this here instead of Drop for Connecting because we |
217 | // already have a lock, no need to lock the mutex twice. |
218 | inner.connected(&connecting.key); |
219 | // prevent the Drop of Connecting from repeating inner.connected() |
220 | connecting.pool = WeakOpt::none(); |
221 | |
222 | // Shared reservations don't need a reference to the pool, |
223 | // since the pool always keeps a copy. |
224 | (to_return, WeakOpt::none()) |
225 | } |
226 | Reservation::Unique(value) => { |
227 | // Unique reservations must take a reference to the pool |
228 | // since they hope to reinsert once the reservation is |
229 | // completed |
230 | (value, WeakOpt::downgrade(enabled)) |
231 | } |
232 | } |
233 | } else { |
234 | // If pool is not enabled, skip all the things... |
235 | |
236 | // The Connecting should have had no pool ref |
237 | debug_assert!(connecting.pool.upgrade().is_none()); |
238 | |
239 | (value, WeakOpt::none()) |
240 | }; |
241 | Pooled { |
242 | key: connecting.key.clone(), |
243 | is_reused: false, |
244 | pool: pool_ref, |
245 | value: Some(value), |
246 | } |
247 | } |
248 | |
249 | fn reuse(&self, key: &Key, value: T) -> Pooled<T> { |
250 | debug!("reuse idle connection for {:?}" , key); |
251 | // TODO: unhack this |
252 | // In Pool::pooled(), which is used for inserting brand new connections, |
253 | // there's some code that adjusts the pool reference taken depending |
254 | // on if the Reservation can be shared or is unique. By the time |
255 | // reuse() is called, the reservation has already been made, and |
256 | // we just have the final value, without knowledge of if this is |
257 | // unique or shared. So, the hack is to just assume Ver::Http2 means |
258 | // shared... :( |
259 | let mut pool_ref = WeakOpt::none(); |
260 | if !value.can_share() { |
261 | if let Some(ref enabled) = self.inner { |
262 | pool_ref = WeakOpt::downgrade(enabled); |
263 | } |
264 | } |
265 | |
266 | Pooled { |
267 | is_reused: true, |
268 | key: key.clone(), |
269 | pool: pool_ref, |
270 | value: Some(value), |
271 | } |
272 | } |
273 | } |
274 | |
275 | /// Pop off this list, looking for a usable connection that hasn't expired. |
276 | struct IdlePopper<'a, T> { |
277 | key: &'a Key, |
278 | list: &'a mut Vec<Idle<T>>, |
279 | } |
280 | |
281 | impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { |
282 | fn pop(self, expiration: &Expiration) -> Option<Idle<T>> { |
283 | while let Some(entry) = self.list.pop() { |
284 | // If the connection has been closed, or is older than our idle |
285 | // timeout, simply drop it and keep looking... |
286 | if !entry.value.is_open() { |
287 | trace!("removing closed connection for {:?}" , self.key); |
288 | continue; |
289 | } |
290 | // TODO: Actually, since the `idle` list is pushed to the end always, |
291 | // that would imply that if *this* entry is expired, then anything |
292 | // "earlier" in the list would *have* to be expired also... Right? |
293 | // |
294 | // In that case, we could just break out of the loop and drop the |
295 | // whole list... |
296 | if expiration.expires(entry.idle_at) { |
297 | trace!("removing expired connection for {:?}" , self.key); |
298 | continue; |
299 | } |
300 | |
301 | let value = match entry.value.reserve() { |
302 | #[cfg (feature = "http2" )] |
303 | Reservation::Shared(to_reinsert, to_checkout) => { |
304 | self.list.push(Idle { |
305 | idle_at: Instant::now(), |
306 | value: to_reinsert, |
307 | }); |
308 | to_checkout |
309 | } |
310 | Reservation::Unique(unique) => unique, |
311 | }; |
312 | |
313 | return Some(Idle { |
314 | idle_at: entry.idle_at, |
315 | value, |
316 | }); |
317 | } |
318 | |
319 | None |
320 | } |
321 | } |
322 | |
323 | impl<T: Poolable> PoolInner<T> { |
324 | fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) { |
325 | if value.can_share() && self.idle.contains_key(&key) { |
326 | trace!("put; existing idle HTTP/2 connection for {:?}" , key); |
327 | return; |
328 | } |
329 | trace!("put; add idle connection for {:?}" , key); |
330 | let mut remove_waiters = false; |
331 | let mut value = Some(value); |
332 | if let Some(waiters) = self.waiters.get_mut(&key) { |
333 | while let Some(tx) = waiters.pop_front() { |
334 | if !tx.is_canceled() { |
335 | let reserved = value.take().expect("value already sent" ); |
336 | let reserved = match reserved.reserve() { |
337 | #[cfg (feature = "http2" )] |
338 | Reservation::Shared(to_keep, to_send) => { |
339 | value = Some(to_keep); |
340 | to_send |
341 | } |
342 | Reservation::Unique(uniq) => uniq, |
343 | }; |
344 | match tx.send(reserved) { |
345 | Ok(()) => { |
346 | if value.is_none() { |
347 | break; |
348 | } else { |
349 | continue; |
350 | } |
351 | } |
352 | Err(e) => { |
353 | value = Some(e); |
354 | } |
355 | } |
356 | } |
357 | |
358 | trace!("put; removing canceled waiter for {:?}" , key); |
359 | } |
360 | remove_waiters = waiters.is_empty(); |
361 | } |
362 | if remove_waiters { |
363 | self.waiters.remove(&key); |
364 | } |
365 | |
366 | match value { |
367 | Some(value) => { |
368 | // borrow-check scope... |
369 | { |
370 | let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new); |
371 | if self.max_idle_per_host <= idle_list.len() { |
372 | trace!("max idle per host for {:?}, dropping connection" , key); |
373 | return; |
374 | } |
375 | |
376 | debug!("pooling idle connection for {:?}" , key); |
377 | idle_list.push(Idle { |
378 | value, |
379 | idle_at: Instant::now(), |
380 | }); |
381 | } |
382 | |
383 | #[cfg (feature = "runtime" )] |
384 | { |
385 | self.spawn_idle_interval(__pool_ref); |
386 | } |
387 | } |
388 | None => trace!("put; found waiter for {:?}" , key), |
389 | } |
390 | } |
391 | |
392 | /// A `Connecting` task is complete. Not necessarily successfully, |
393 | /// but the lock is going away, so clean up. |
394 | fn connected(&mut self, key: &Key) { |
395 | let existed = self.connecting.remove(key); |
396 | debug_assert!(existed, "Connecting dropped, key not in pool.connecting" ); |
397 | // cancel any waiters. if there are any, it's because |
398 | // this Connecting task didn't complete successfully. |
399 | // those waiters would never receive a connection. |
400 | self.waiters.remove(key); |
401 | } |
402 | |
403 | #[cfg (feature = "runtime" )] |
404 | fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) { |
405 | let (dur, rx) = { |
406 | if self.idle_interval_ref.is_some() { |
407 | return; |
408 | } |
409 | |
410 | if let Some(dur) = self.timeout { |
411 | let (tx, rx) = oneshot::channel(); |
412 | self.idle_interval_ref = Some(tx); |
413 | (dur, rx) |
414 | } else { |
415 | return; |
416 | } |
417 | }; |
418 | |
419 | let interval = IdleTask { |
420 | interval: tokio::time::interval(dur), |
421 | pool: WeakOpt::downgrade(pool_ref), |
422 | pool_drop_notifier: rx, |
423 | }; |
424 | |
425 | self.exec.execute(interval); |
426 | } |
427 | } |
428 | |
429 | impl<T> PoolInner<T> { |
430 | /// Any `FutureResponse`s that were created will have made a `Checkout`, |
431 | /// and possibly inserted into the pool that it is waiting for an idle |
432 | /// connection. If a user ever dropped that future, we need to clean out |
433 | /// those parked senders. |
434 | fn clean_waiters(&mut self, key: &Key) { |
435 | let mut remove_waiters: bool = false; |
436 | if let Some(waiters: &mut VecDeque>) = self.waiters.get_mut(key) { |
437 | waiters.retain(|tx: &Sender| !tx.is_canceled()); |
438 | remove_waiters = waiters.is_empty(); |
439 | } |
440 | if remove_waiters { |
441 | self.waiters.remove(key); |
442 | } |
443 | } |
444 | } |
445 | |
#[cfg(feature = "runtime")]
impl<T: Poolable> PoolInner<T> {
    /// Evicts every idle entry that is closed or has been idle for longer
    /// than the configured timeout, and removes keys left with no entries.
    ///
    /// This should *only* be called by the IdleTask. Panics if no timeout
    /// is configured.
    fn clear_expired(&mut self) {
        let timeout = self.timeout.expect("interval assumes timeout");

        let now = Instant::now();
        //self.last_idle_check_at = now;

        self.idle.retain(|key, entries| {
            entries.retain(|entry| {
                if !entry.value.is_open() {
                    trace!("idle interval evicting closed for {:?}", key);
                    false
                // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
                } else if now.saturating_duration_since(entry.idle_at) > timeout {
                    trace!("idle interval evicting expired for {:?}", key);
                    false
                } else {
                    // Otherwise, keep this value...
                    true
                }
            });

            // returning false evicts this key/val
            !entries.is_empty()
        });
    }
}
477 | |
478 | impl<T> Clone for Pool<T> { |
479 | fn clone(&self) -> Pool<T> { |
480 | Pool { |
481 | inner: self.inner.clone(), |
482 | } |
483 | } |
484 | } |
485 | |
486 | /// A wrapped poolable value that tries to reinsert to the Pool on Drop. |
487 | // Note: The bounds `T: Poolable` is needed for the Drop impl. |
488 | pub(super) struct Pooled<T: Poolable> { |
489 | value: Option<T>, |
490 | is_reused: bool, |
491 | key: Key, |
492 | pool: WeakOpt<Mutex<PoolInner<T>>>, |
493 | } |
494 | |
495 | impl<T: Poolable> Pooled<T> { |
496 | pub(super) fn is_reused(&self) -> bool { |
497 | self.is_reused |
498 | } |
499 | |
500 | pub(super) fn is_pool_enabled(&self) -> bool { |
501 | self.pool.0.is_some() |
502 | } |
503 | |
504 | fn as_ref(&self) -> &T { |
505 | self.value.as_ref().expect(msg:"not dropped" ) |
506 | } |
507 | |
508 | fn as_mut(&mut self) -> &mut T { |
509 | self.value.as_mut().expect(msg:"not dropped" ) |
510 | } |
511 | } |
512 | |
513 | impl<T: Poolable> Deref for Pooled<T> { |
514 | type Target = T; |
515 | fn deref(&self) -> &T { |
516 | self.as_ref() |
517 | } |
518 | } |
519 | |
520 | impl<T: Poolable> DerefMut for Pooled<T> { |
521 | fn deref_mut(&mut self) -> &mut T { |
522 | self.as_mut() |
523 | } |
524 | } |
525 | |
526 | impl<T: Poolable> Drop for Pooled<T> { |
527 | fn drop(&mut self) { |
528 | if let Some(value: T) = self.value.take() { |
529 | if !value.is_open() { |
530 | // If we *already* know the connection is done here, |
531 | // it shouldn't be re-inserted back into the pool. |
532 | return; |
533 | } |
534 | |
535 | if let Some(pool: Arc>>) = self.pool.upgrade() { |
536 | if let Ok(mut inner: MutexGuard<'_, PoolInner<…>>) = pool.lock() { |
537 | inner.put(self.key.clone(), value, &pool); |
538 | } |
539 | } else if !value.can_share() { |
540 | trace!("pool dropped, dropping pooled ( {:?})" , self.key); |
541 | } |
542 | // Ver::Http2 is already in the Pool (or dead), so we wouldn't |
543 | // have an actual reference to the Pool. |
544 | } |
545 | } |
546 | } |
547 | |
548 | impl<T: Poolable> fmt::Debug for Pooled<T> { |
549 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
550 | f.debug_struct("Pooled" ).field(name:"key" , &self.key).finish() |
551 | } |
552 | } |
553 | |
/// An entry in the pool's idle list.
struct Idle<T> {
    /// When the value was placed in the idle list.
    idle_at: Instant,
    /// The idle value itself.
    value: T,
}
558 | |
559 | // FIXME: allow() required due to `impl Trait` leaking types to this lint |
560 | #[allow (missing_debug_implementations)] |
561 | pub(super) struct Checkout<T> { |
562 | key: Key, |
563 | pool: Pool<T>, |
564 | waiter: Option<oneshot::Receiver<T>>, |
565 | } |
566 | |
567 | #[derive (Debug)] |
568 | pub(super) struct CheckoutIsClosedError; |
569 | |
570 | impl StdError for CheckoutIsClosedError {} |
571 | |
572 | impl fmt::Display for CheckoutIsClosedError { |
573 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
574 | f.write_str(data:"checked out connection was closed" ) |
575 | } |
576 | } |
577 | |
578 | impl<T: Poolable> Checkout<T> { |
579 | fn poll_waiter( |
580 | &mut self, |
581 | cx: &mut task::Context<'_>, |
582 | ) -> Poll<Option<crate::Result<Pooled<T>>>> { |
583 | if let Some(mut rx) = self.waiter.take() { |
584 | match Pin::new(&mut rx).poll(cx) { |
585 | Poll::Ready(Ok(value)) => { |
586 | if value.is_open() { |
587 | Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value)))) |
588 | } else { |
589 | Poll::Ready(Some(Err( |
590 | crate::Error::new_canceled().with(CheckoutIsClosedError) |
591 | ))) |
592 | } |
593 | } |
594 | Poll::Pending => { |
595 | self.waiter = Some(rx); |
596 | Poll::Pending |
597 | } |
598 | Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err( |
599 | crate::Error::new_canceled().with("request has been canceled" ) |
600 | ))), |
601 | } |
602 | } else { |
603 | Poll::Ready(None) |
604 | } |
605 | } |
606 | |
607 | fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>> { |
608 | let entry = { |
609 | let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); |
610 | let expiration = Expiration::new(inner.timeout); |
611 | let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| { |
612 | trace!("take? {:?}: expiration = {:?}" , self.key, expiration.0); |
613 | // A block to end the mutable borrow on list, |
614 | // so the map below can check is_empty() |
615 | { |
616 | let popper = IdlePopper { |
617 | key: &self.key, |
618 | list, |
619 | }; |
620 | popper.pop(&expiration) |
621 | } |
622 | .map(|e| (e, list.is_empty())) |
623 | }); |
624 | |
625 | let (entry, empty) = if let Some((e, empty)) = maybe_entry { |
626 | (Some(e), empty) |
627 | } else { |
628 | // No entry found means nuke the list for sure. |
629 | (None, true) |
630 | }; |
631 | if empty { |
632 | //TODO: This could be done with the HashMap::entry API instead. |
633 | inner.idle.remove(&self.key); |
634 | } |
635 | |
636 | if entry.is_none() && self.waiter.is_none() { |
637 | let (tx, mut rx) = oneshot::channel(); |
638 | trace!("checkout waiting for idle connection: {:?}" , self.key); |
639 | inner |
640 | .waiters |
641 | .entry(self.key.clone()) |
642 | .or_insert_with(VecDeque::new) |
643 | .push_back(tx); |
644 | |
645 | // register the waker with this oneshot |
646 | assert!(Pin::new(&mut rx).poll(cx).is_pending()); |
647 | self.waiter = Some(rx); |
648 | } |
649 | |
650 | entry |
651 | }; |
652 | |
653 | entry.map(|e| self.pool.reuse(&self.key, e.value)) |
654 | } |
655 | } |
656 | |
657 | impl<T: Poolable> Future for Checkout<T> { |
658 | type Output = crate::Result<Pooled<T>>; |
659 | |
660 | fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { |
661 | if let Some(pooled: Pooled) = ready!(self.poll_waiter(cx)?) { |
662 | return Poll::Ready(Ok(pooled)); |
663 | } |
664 | |
665 | if let Some(pooled: Pooled) = self.checkout(cx) { |
666 | Poll::Ready(Ok(pooled)) |
667 | } else if !self.pool.is_enabled() { |
668 | Poll::Ready(Err(crate::Error::new_canceled().with(cause:"pool is disabled" ))) |
669 | } else { |
670 | // There's a new waiter, already registered in self.checkout() |
671 | debug_assert!(self.waiter.is_some()); |
672 | Poll::Pending |
673 | } |
674 | } |
675 | } |
676 | |
677 | impl<T> Drop for Checkout<T> { |
678 | fn drop(&mut self) { |
679 | if self.waiter.take().is_some() { |
680 | trace!("checkout dropped for {:?}" , self.key); |
681 | if let Some(Ok(mut inner: MutexGuard<'_, PoolInner<…>>)) = self.pool.inner.as_ref().map(|i: &Arc>>| i.lock()) { |
682 | inner.clean_waiters(&self.key); |
683 | } |
684 | } |
685 | } |
686 | } |
687 | |
688 | // FIXME: allow() required due to `impl Trait` leaking types to this lint |
689 | #[allow (missing_debug_implementations)] |
690 | pub(super) struct Connecting<T: Poolable> { |
691 | key: Key, |
692 | pool: WeakOpt<Mutex<PoolInner<T>>>, |
693 | } |
694 | |
695 | impl<T: Poolable> Connecting<T> { |
696 | pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> { |
697 | debug_assert!( |
698 | self.pool.0.is_none(), |
699 | "Connecting::alpn_h2 but already Http2" |
700 | ); |
701 | |
702 | pool.connecting(&self.key, Ver::Http2) |
703 | } |
704 | } |
705 | |
706 | impl<T: Poolable> Drop for Connecting<T> { |
707 | fn drop(&mut self) { |
708 | if let Some(pool: Arc>>) = self.pool.upgrade() { |
709 | // No need to panic on drop, that could abort! |
710 | if let Ok(mut inner: MutexGuard<'_, PoolInner<…>>) = pool.lock() { |
711 | inner.connected(&self.key); |
712 | } |
713 | } |
714 | } |
715 | } |
716 | |
/// An optional idle timeout, used to decide whether an idle entry is stale.
struct Expiration(Option<Duration>);

impl Expiration {
    /// Wraps the given timeout; `None` means entries never expire.
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    /// Returns `true` if more than the timeout has elapsed since `instant`.
    ///
    /// Always `false` when no timeout is set.
    fn expires(&self, instant: Instant) -> bool {
        match self.0 {
            // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
            Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
            None => false,
        }
    }
}
732 | |
// Background future that periodically clears expired idle entries.
#[cfg(feature = "runtime")]
pin_project_lite::pin_project! {
    struct IdleTask<T> {
        // Timer driving the periodic checks.
        #[pin]
        interval: Interval,
        // Weak reference to the pool state being cleaned.
        pool: WeakOpt<Mutex<PoolInner<T>>>,
        // This allows the IdleTask to be notified as soon as the entire
        // Pool is fully dropped, and shutdown. This channel is never sent on,
        // but Err(Canceled) will be received when the Pool is dropped.
        #[pin]
        pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
    }
}
746 | |
#[cfg(feature = "runtime")]
impl<T: Poolable + 'static> Future for IdleTask<T> {
    type Output = ();

    /// On every interval tick, clears expired entries from the pool.
    ///
    /// Completes when the drop notifier reports the pool is gone, when the
    /// pool can no longer be upgraded, or when its mutex is poisoned.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        loop {
            if let Poll::Ready(signal) = this.pool_drop_notifier.as_mut().poll(cx) {
                match signal {
                    // Nothing is ever sent on this channel.
                    Ok(never) => match never {},
                    Err(_canceled) => {
                        trace!("pool closed, canceling idle interval");
                        return Poll::Ready(());
                    }
                }
            }

            ready!(this.interval.as_mut().poll_tick(cx));

            let pool = match this.pool.upgrade() {
                Some(pool) => pool,
                None => return Poll::Ready(()),
            };
            let mut state = match pool.lock() {
                Ok(state) => state,
                Err(_poisoned) => return Poll::Ready(()),
            };
            trace!("idle interval checking for expired");
            state.clear_expired();
        }
    }
}
776 | |
777 | impl<T> WeakOpt<T> { |
778 | fn none() -> Self { |
779 | WeakOpt(None) |
780 | } |
781 | |
782 | fn downgrade(arc: &Arc<T>) -> Self { |
783 | WeakOpt(Some(Arc::downgrade(this:arc))) |
784 | } |
785 | |
786 | fn upgrade(&self) -> Option<Arc<T>> { |
787 | self.0.as_ref().and_then(Weak::upgrade) |
788 | } |
789 | } |
790 | |
#[cfg(test)]
mod tests {
    use std::task::Poll;
    use std::time::Duration;

    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
    use crate::common::{exec::Exec, task, Future, Pin};

    /// Test unique reservations.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        fn is_open(&self) -> bool {
            true
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    /// Builds a `Connecting` for `key` that holds no pool reference.
    fn c<T: Poolable>(key: Key) -> Connecting<T> {
        Connecting {
            key,
            pool: WeakOpt::none(),
        }
    }

    /// Builds an `http` scheme key for the given authority string.
    fn host_key(s: &str) -> Key {
        (http::uri::Scheme::HTTP, s.parse().expect("host key"))
    }

    /// A pool with no idle limit and no idle timer task.
    fn pool_no_timer<T>() -> Pool<T> {
        pool_max_idle_no_timer(::std::usize::MAX)
    }

    /// A pool with a 100ms idle timeout, the given idle limit, and no idle
    /// timer task.
    fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
        let config = super::Config {
            idle_timeout: Some(Duration::from_millis(100)),
            max_idle_per_host: max_idle,
        };
        let pool = Pool::new(config, &Exec::Default);
        pool.no_timer();
        pool
    }

    /// Number of idle entries stored for `key`, or `None` if the key is absent.
    fn idle_len<T: Poolable>(pool: &Pool<T>, key: &Key) -> Option<usize> {
        pool.locked().idle.get(key).map(|entries| entries.len())
    }

    /// Number of parked waiters for `key`, or `None` if the key is absent.
    fn waiter_len<T: Poolable>(pool: &Pool<T>, key: &Key) -> Option<usize> {
        pool.locked().waiters.get(key).map(|parked| parked.len())
    }

    #[tokio::test]
    async fn test_pool_checkout_smoke() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        // Dropping the pooled value puts it back into the pool.
        drop(pool.pooled(c(key.clone()), Uniq(41)));

        let checked_out = match pool.checkout(key).await {
            Ok(pooled) => pooled,
            Err(_) => panic!("not ready"),
        };
        assert_eq!(*checked_out, Uniq(41));
    }

    /// Helper to check if the future is ready after polling once.
    struct PollOnce<'a, F>(&'a mut F);

    impl<F, T, U> Future for PollOnce<'_, F>
    where
        F: Future<Output = Result<T, U>> + Unpin,
    {
        type Output = Option<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            let readiness = match Pin::new(&mut self.0).poll(cx) {
                Poll::Ready(Ok(_)) | Poll::Ready(Err(_)) => Some(()),
                Poll::Pending => None,
            };
            Poll::Ready(readiness)
        }
    }

    #[tokio::test]
    async fn test_pool_checkout_returns_none_if_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        drop(pool.pooled(c(key.clone()), Uniq(41)));

        let timeout = pool.locked().timeout.unwrap();
        tokio::time::sleep(timeout).await;

        let mut checkout = pool.checkout(key);
        let is_not_ready = PollOnce(&mut checkout).await.is_none();
        assert!(is_not_ready);
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_checkout_removes_expired() {
        let pool = pool_no_timer();
        let key = host_key("foo");

        for value in [41, 5, 99].iter().copied() {
            drop(pool.pooled(c(key.clone()), Uniq(value)));
        }

        assert_eq!(idle_len(&pool, &key), Some(3));

        let timeout = pool.locked().timeout.unwrap();
        tokio::time::sleep(timeout).await;

        let mut checkout = pool.checkout(key.clone());
        // checkout.await should clean out the expired
        PollOnce(&mut checkout).await;
        assert!(idle_len(&pool, &key).is_none());
    }

    #[test]
    fn test_pool_max_idle_per_host() {
        let pool = pool_max_idle_no_timer(2);
        let key = host_key("foo");

        for value in [41, 5, 99].iter().copied() {
            drop(pool.pooled(c(key.clone()), Uniq(value)));
        }

        // pooled and dropped 3, max_idle should only allow 2
        assert_eq!(idle_len(&pool, &key), Some(2));
    }

    #[cfg(feature = "runtime")]
    #[tokio::test]
    async fn test_pool_timer_removes_expired() {
        let _ = pretty_env_logger::try_init();
        tokio::time::pause();

        let config = super::Config {
            idle_timeout: Some(Duration::from_millis(10)),
            max_idle_per_host: std::usize::MAX,
        };
        let pool = Pool::new(config, &Exec::Default);

        let key = host_key("foo");

        for value in [41, 5, 99].iter().copied() {
            drop(pool.pooled(c(key.clone()), Uniq(value)));
        }

        assert_eq!(idle_len(&pool, &key), Some(3));

        // Let the timer tick past the expiration...
        tokio::time::advance(Duration::from_millis(30)).await;
        // Yield so the Interval can reap...
        tokio::task::yield_now().await;

        assert!(idle_len(&pool, &key).is_none());
    }

    #[tokio::test]
    async fn test_pool_checkout_task_unparked() {
        use futures_util::future::join;

        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        let (entry, ()) = join(pool.checkout(key), async {
            // the checkout future will park first,
            // and then this lazy future will be polled, which will insert
            // the pooled back into the pool
            //
            // this test makes sure that doing so will unpark the checkout
            drop(pooled);
        })
        .await;

        assert_eq!(*entry.unwrap(), Uniq(41));
    }

    #[tokio::test]
    async fn test_pool_checkout_drop_cleans_up_waiters() {
        let pool = pool_no_timer::<Uniq<i32>>();
        let key = host_key("foo");

        let mut checkout1 = pool.checkout(key.clone());
        let mut checkout2 = pool.checkout(key.clone());

        // first poll needed to get into Pool's parked
        PollOnce(&mut checkout1).await;
        assert_eq!(waiter_len(&pool, &key).unwrap(), 1);
        PollOnce(&mut checkout2).await;
        assert_eq!(waiter_len(&pool, &key).unwrap(), 2);

        // on drop, clean up Pool
        drop(checkout1);
        assert_eq!(waiter_len(&pool, &key).unwrap(), 1);

        drop(checkout2);
        assert!(waiter_len(&pool, &key).is_none());
    }

    /// A poolable value whose open/closed state is set by the test.
    #[derive(Debug)]
    struct CanClose {
        #[allow(unused)]
        val: i32,
        closed: bool,
    }

    impl Poolable for CanClose {
        fn is_open(&self) -> bool {
            !self.closed
        }

        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }

    #[test]
    fn pooled_drop_if_closed_doesnt_reinsert() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let closed_conn = CanClose {
            val: 57,
            closed: true,
        };
        drop(pool.pooled(c(key.clone()), closed_conn));

        assert!(!pool.locked().idle.contains_key(&key));
    }
}
1045 | |