1use std::collections::hash_map::Entry;
2use std::collections::{HashMap, VecDeque};
3use std::io::{self, Read};
4use std::sync::Mutex;
5
6use crate::agent::AgentState;
7use crate::stream::Stream;
8use crate::{Agent, Proxy};
9
10use log::debug;
11use url::Url;
12
13/// Holder of recycled connections.
14///
15/// For each [`PoolKey`] (approximately hostname and port), there may be
16/// multiple connections stored in the `recycle` map. If so, they are stored in
17/// order from oldest at the front to freshest at the back.
18///
19/// The `lru` [`VecDeque`] is a companion struct to `recycle`, and is used to keep
20/// track of which connections to expire if the pool is full on the next insert.
21/// A given [`PoolKey`] can occur in `lru` multiple times. The first entry in `lru` for
22/// a key `K` represents the first entry in `recycle[K]`. The second entry in `lru`
23/// for `K` represents the second entry in `recycle[K]`, and so on. In other
24/// words, `lru` is ordered the same way as the [`VecDeque`] entries in `recycle`:
25/// oldest at the front, freshest at the back. This allows keeping track of which
26/// host should have its connection dropped next.
27///
28/// These invariants hold at the start and end of each method:
29/// - The length `lru` is equal to the sum of lengths of `recycle`'s [`VecDeque`]s.
30/// - Each PoolKey exists the same number of times in `lru` as it has entries in `recycle`.
31/// - If there is an entry in `recycle`, it has at least one element.
32/// - The length of `lru` is less than or equal to [`Self::max_idle_connections`].
33/// - The length of `recycle[K]` is less than or equal to [`Self::max_idle_connections_per_host`].
34///
35/// *Internal API*
36pub(crate) struct ConnectionPool {
37 inner: Mutex<Inner>,
38 max_idle_connections: usize,
39 max_idle_connections_per_host: usize,
40}
41
42struct Inner {
43 // the actual pooled connection. however only one per hostname:port.
44 recycle: HashMap<PoolKey, VecDeque<Stream>>,
45 // This is used to keep track of which streams to expire when the
46 // pool reaches MAX_IDLE_CONNECTIONS. The corresponding PoolKeys for
47 // recently used Streams are added to the back of the queue;
48 // old streams are removed from the front.
49 lru: VecDeque<PoolKey>,
50}
51
52impl fmt::Debug for ConnectionPool {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f&mut DebugStruct<'_, '_>.debug_struct("ConnectionPool")
55 .field("max_idle", &self.max_idle_connections)
56 .field("max_idle_per_host", &self.max_idle_connections_per_host)
57 .field(name:"connections", &self.inner.lock().unwrap().lru.len())
58 .finish()
59 }
60}
61fn remove_first_match(list: &mut VecDeque<PoolKey>, key: &PoolKey) -> Option<PoolKey> {
62 match list.iter().position(|x: &PoolKey| x == key) {
63 Some(i: usize) => list.remove(index:i),
64 None => None,
65 }
66}
67
68fn remove_last_match(list: &mut VecDeque<PoolKey>, key: &PoolKey) -> Option<PoolKey> {
69 match list.iter().rposition(|x: &PoolKey| x == key) {
70 Some(i: usize) => list.remove(index:i),
71 None => None,
72 }
73}
74
75impl ConnectionPool {
76 pub(crate) fn new_with_limits(
77 max_idle_connections: usize,
78 max_idle_connections_per_host: usize,
79 ) -> Self {
80 ConnectionPool {
81 inner: Mutex::new(Inner {
82 recycle: HashMap::new(),
83 lru: VecDeque::new(),
84 }),
85 max_idle_connections,
86 max_idle_connections_per_host,
87 }
88 }
89
90 /// Return true if either of the max_* settings is 0, meaning we should do no work.
91 fn noop(&self) -> bool {
92 self.max_idle_connections == 0 || self.max_idle_connections_per_host == 0
93 }
94
95 /// How the unit::connect tries to get a pooled connection.
96 pub fn try_get_connection(&self, url: &Url, proxy: Option<Proxy>) -> Option<Stream> {
97 let key = PoolKey::new(url, proxy);
98 self.remove(&key)
99 }
100
101 fn remove(&self, key: &PoolKey) -> Option<Stream> {
102 let mut inner = self.inner.lock().unwrap();
103 match inner.recycle.entry(key.clone()) {
104 Entry::Occupied(mut occupied_entry) => {
105 let streams = occupied_entry.get_mut();
106 // Take the newest stream.
107 let stream = streams.pop_back();
108 let stream = stream.expect("invariant failed: empty VecDeque in `recycle`");
109
110 if streams.is_empty() {
111 occupied_entry.remove();
112 }
113
114 // Remove the newest matching PoolKey from self.lru. That
115 // corresponds to the stream we just removed from `recycle`.
116 remove_last_match(&mut inner.lru, key)
117 .expect("invariant failed: key in recycle but not in lru");
118
119 debug!("pulling stream from pool: {:?} -> {:?}", key, stream);
120 Some(stream)
121 }
122 Entry::Vacant(_) => None,
123 }
124 }
125
126 pub(crate) fn add(&self, key: &PoolKey, stream: Stream) {
127 if self.noop() {
128 return;
129 }
130 debug!("adding stream to pool: {:?} -> {:?}", key, stream);
131
132 let mut inner = self.inner.lock().unwrap();
133 match inner.recycle.entry(key.clone()) {
134 Entry::Occupied(mut occupied_entry) => {
135 let streams = occupied_entry.get_mut();
136 streams.push_back(stream);
137 if streams.len() > self.max_idle_connections_per_host {
138 // Remove the oldest entry
139 let stream = streams.pop_front().expect("empty streams list");
140 debug!(
141 "host {:?} has {} conns, dropping oldest: {:?}",
142 key,
143 streams.len(),
144 stream
145 );
146 remove_first_match(&mut inner.lru, key)
147 .expect("invariant failed: key in recycle but not in lru");
148 }
149 }
150 Entry::Vacant(vacant_entry) => {
151 vacant_entry.insert(vec![stream].into());
152 }
153 }
154 inner.lru.push_back(key.clone());
155 if inner.lru.len() > self.max_idle_connections {
156 drop(inner);
157 self.remove_oldest()
158 }
159 }
160
161 /// Find the oldest stream in the pool. Remove its representation from lru,
162 /// and the stream itself from `recycle`. Drops the stream, which closes it.
163 fn remove_oldest(&self) {
164 assert!(!self.noop(), "remove_oldest called on Pool with max of 0");
165 let mut inner = self.inner.lock().unwrap();
166 let key = inner.lru.pop_front();
167 let key = key.expect("tried to remove oldest but no entries found!");
168 match inner.recycle.entry(key) {
169 Entry::Occupied(mut occupied_entry) => {
170 let streams = occupied_entry.get_mut();
171 let stream = streams
172 .pop_front()
173 .expect("invariant failed: key existed in recycle but no streams available");
174 debug!("dropping oldest stream in pool: {:?}", stream);
175 if streams.is_empty() {
176 occupied_entry.remove();
177 }
178 }
179 Entry::Vacant(_) => panic!("invariant failed: key existed in lru but not in recycle"),
180 }
181 }
182
183 #[cfg(test)]
184 pub fn len(&self) -> usize {
185 self.inner.lock().unwrap().lru.len()
186 }
187}
188
189#[derive(PartialEq, Clone, Eq, Hash)]
190pub(crate) struct PoolKey {
191 scheme: String,
192 hostname: String,
193 port: Option<u16>,
194 proxy: Option<Proxy>,
195}
196
197use std::fmt;
198
199impl fmt::Debug for PoolKey {
200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201 f.write_fmt(format_args!(
202 "{}|{}|{}",
203 self.scheme,
204 self.hostname,
205 self.port.unwrap_or(0)
206 ))
207 }
208}
209
210impl PoolKey {
211 fn new(url: &Url, proxy: Option<Proxy>) -> Self {
212 let port: Option = url.port_or_known_default();
213 PoolKey {
214 scheme: url.scheme().to_string(),
215 hostname: url.host_str().unwrap_or(default:"").to_string(),
216 port,
217 proxy,
218 }
219 }
220
221 pub(crate) fn from_parts(scheme: &str, hostname: &str, port: u16) -> Self {
222 PoolKey {
223 scheme: scheme.to_string(),
224 hostname: hostname.to_string(),
225 port: Some(port),
226 proxy: None,
227 }
228 }
229}
230
231#[derive(Clone, Debug)]
232pub(crate) struct PoolReturner {
233 // We store a weak reference to an agent state here to avoid creating
234 // a reference loop, since AgentState contains a ConnectionPool, which
235 // contains Streams, which contain PoolReturners.
236 inner: Option<(std::sync::Weak<AgentState>, PoolKey)>,
237}
238
239impl PoolReturner {
240 /// A PoolReturner that returns to the given Agent's Pool.
241 pub(crate) fn new(agent: &Agent, pool_key: PoolKey) -> Self {
242 Self {
243 inner: Some((agent.weak_state(), pool_key)),
244 }
245 }
246
247 /// A PoolReturner that does nothing
248 pub(crate) fn none() -> Self {
249 Self { inner: None }
250 }
251
252 pub(crate) fn return_to_pool(&self, stream: Stream) {
253 if let Some((weak_state: &Weak, pool_key: &PoolKey)) = &self.inner {
254 if let Some(state: Arc) = weak_state.upgrade() {
255 state.pool.add(pool_key, stream);
256 }
257 }
258 }
259}
260
261/// Read wrapper that returns a stream to the pool once the
262/// read is exhausted (reached a 0).
263///
264/// *Internal API*
265pub(crate) struct PoolReturnRead<R: Read + Sized + Into<Stream>> {
266 // wrapped reader around the same stream. It's an Option because we `take()` it
267 // upon returning the stream to the Agent.
268 reader: Option<R>,
269}
270
271impl<R: Read + Sized + Into<Stream>> PoolReturnRead<R> {
272 pub fn new(reader: R) -> Self {
273 PoolReturnRead {
274 reader: Some(reader),
275 }
276 }
277
278 fn return_connection(&mut self) -> io::Result<()> {
279 // guard we only do this once.
280 if let Some(reader) = self.reader.take() {
281 // bring back stream here to either go into pool or dealloc
282 let stream: Stream = reader.into();
283 stream.return_to_pool()?;
284 }
285
286 Ok(())
287 }
288
289 fn do_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
290 match self.reader.as_mut() {
291 None => Ok(0),
292 Some(reader) => reader.read(buf),
293 }
294 }
295}
296
297impl<R: Read + Sized + Into<Stream>> Read for PoolReturnRead<R> {
298 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
299 let amount: usize = self.do_read(buf)?;
300 // only if the underlying reader is exhausted can we send a new
301 // request to the same socket. hence, we only return it now.
302 if amount == 0 {
303 self.return_connection()?;
304 }
305 Ok(amount)
306 }
307}
308
309#[cfg(test)]
310mod tests {
311 use std::io;
312
313 use crate::stream::{remote_addr_for_test, Stream};
314 use crate::ReadWrite;
315
316 use super::*;
317
318 #[derive(Debug)]
319 struct NoopStream;
320
321 impl NoopStream {
322 fn stream(pool_returner: PoolReturner) -> Stream {
323 Stream::new(NoopStream, remote_addr_for_test(), pool_returner)
324 }
325 }
326
327 impl Read for NoopStream {
328 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
329 Ok(buf.len())
330 }
331 }
332
333 impl std::io::Write for NoopStream {
334 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
335 Ok(buf.len())
336 }
337
338 fn flush(&mut self) -> io::Result<()> {
339 Ok(())
340 }
341 }
342
343 impl ReadWrite for NoopStream {
344 fn socket(&self) -> Option<&std::net::TcpStream> {
345 None
346 }
347 }
348
349 #[test]
350 fn poolkey_new() {
351 // Test that PoolKey::new() does not panic on unrecognized schemes.
352 PoolKey::new(&Url::parse("zzz:///example.com").unwrap(), None);
353 }
354
355 #[test]
356 fn pool_connections_limit() {
357 // Test inserting connections with different keys into the pool,
358 // filling and draining it. The pool should evict earlier connections
359 // when the connection limit is reached.
360 let pool = ConnectionPool::new_with_limits(10, 1);
361 let hostnames = (0..pool.max_idle_connections * 2).map(|i| format!("{}.example", i));
362 let poolkeys = hostnames.map(|hostname| PoolKey {
363 scheme: "https".to_string(),
364 hostname,
365 port: Some(999),
366 proxy: None,
367 });
368 for key in poolkeys.clone() {
369 pool.add(&key, NoopStream::stream(PoolReturner::none()));
370 }
371 assert_eq!(pool.len(), pool.max_idle_connections);
372
373 for key in poolkeys.skip(pool.max_idle_connections) {
374 let result = pool.remove(&key);
375 assert!(result.is_some(), "expected key was not in pool");
376 }
377 assert_eq!(pool.len(), 0)
378 }
379
380 #[test]
381 fn pool_per_host_connections_limit() {
382 // Test inserting connections with the same key into the pool,
383 // filling and draining it. The pool should evict earlier connections
384 // when the per-host connection limit is reached.
385 let pool = ConnectionPool::new_with_limits(10, 2);
386 let poolkey = PoolKey {
387 scheme: "https".to_string(),
388 hostname: "example.com".to_string(),
389 port: Some(999),
390 proxy: None,
391 };
392
393 for _ in 0..pool.max_idle_connections_per_host * 2 {
394 pool.add(&poolkey, NoopStream::stream(PoolReturner::none()))
395 }
396 assert_eq!(pool.len(), pool.max_idle_connections_per_host);
397
398 for _ in 0..pool.max_idle_connections_per_host {
399 let result = pool.remove(&poolkey);
400 assert!(result.is_some(), "expected key was not in pool");
401 }
402 assert_eq!(pool.len(), 0);
403 }
404
405 #[test]
406 fn pool_checks_proxy() {
407 // Test inserting different poolkeys with same address but different proxies.
408 // Each insertion should result in an additional entry in the pool.
409 let pool = ConnectionPool::new_with_limits(10, 1);
410 let url = Url::parse("zzz:///example.com").unwrap();
411 let pool_key = PoolKey::new(&url, None);
412
413 pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
414 assert_eq!(pool.len(), 1);
415
416 let pool_key = PoolKey::new(&url, Some(Proxy::new("localhost:9999").unwrap()));
417
418 pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
419 assert_eq!(pool.len(), 2);
420
421 let pool_key = PoolKey::new(
422 &url,
423 Some(Proxy::new("user:password@localhost:9999").unwrap()),
424 );
425
426 pool.add(&pool_key, NoopStream::stream(PoolReturner::none()));
427 assert_eq!(pool.len(), 3);
428 }
429
430 // Test that a stream gets returned to the pool if it was wrapped in a LimitedRead, and
431 // user reads the exact right number of bytes (but never gets a read of 0 bytes).
432 #[test]
433 fn read_exact() {
434 use crate::response::LimitedRead;
435
436 let url = Url::parse("https:///example.com").unwrap();
437
438 let mut out_buf = [0u8; 500];
439
440 let agent = Agent::new();
441 let pool_key = PoolKey::new(&url, None);
442 let stream = NoopStream::stream(PoolReturner::new(&agent, pool_key));
443 let mut limited_read = LimitedRead::new(stream, std::num::NonZeroUsize::new(500).unwrap());
444
445 limited_read.read_exact(&mut out_buf).unwrap();
446
447 assert_eq!(agent.state.pool.len(), 1);
448 }
449
450 // Test that a stream gets returned to the pool if it is gzip encoded and the gzip
451 // decoder reads the exact amount from a chunked stream, not past the 0. This
452 // happens because gzip has built-in knowledge of the length to read.
453 #[test]
454 #[cfg(feature = "gzip")]
455 fn read_exact_chunked_gzip() {
456 use crate::response::Compression;
457 use std::io::Cursor;
458
459 let gz_body = vec![
460 b'E', b'\r', b'\n', // 14 first chunk
461 0x1F, 0x8B, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x03, 0xCB, 0x48, 0xCD, 0xC9,
462 b'\r', b'\n', //
463 b'E', b'\r', b'\n', // 14 second chunk
464 0xC9, 0x57, 0x28, 0xCF, 0x2F, 0xCA, 0x49, 0x51, 0xC8, 0x18, 0xBC, 0x6C, 0x00, 0xA5,
465 b'\r', b'\n', //
466 b'7', b'\r', b'\n', // 7 third chunk
467 0x5C, 0x7C, 0xEF, 0xA7, 0x00, 0x00, 0x00, //
468 b'\r', b'\n', //
469 // end
470 b'0', b'\r', b'\n', //
471 b'\r', b'\n', //
472 ];
473
474 let agent = Agent::new();
475 assert_eq!(agent.state.pool.len(), 0);
476
477 let ro = crate::test::TestStream::new(Cursor::new(gz_body), std::io::sink());
478 let stream = Stream::new(
479 ro,
480 "1.1.1.1:4343".parse().unwrap(),
481 PoolReturner::new(&agent, PoolKey::from_parts("http", "1.1.1.1", 8080)),
482 );
483
484 let chunked = crate::chunked::Decoder::new(stream);
485 let pool_return_read: Box<(dyn Read + Send + Sync + 'static)> =
486 Box::new(PoolReturnRead::new(chunked));
487
488 let compression = Compression::Gzip;
489 let mut stream = compression.wrap_reader(pool_return_read);
490
491 io::copy(&mut stream, &mut io::sink()).unwrap();
492
493 assert_eq!(agent.state.pool.len(), 1);
494 }
495}
496