1use log::debug;
2use std::io::{self, BufRead, BufReader, Read, Write};
3use std::net::SocketAddr;
4use std::net::TcpStream;
5use std::ops::Div;
6use std::time::Duration;
7use std::time::Instant;
8use std::{fmt, io::Cursor};
9
10#[cfg(feature = "socks-proxy")]
11use socks::{TargetAddr, ToTargetAddr};
12
13use crate::chunked::Decoder as ChunkDecoder;
14use crate::error::ErrorKind;
15use crate::pool::{PoolKey, PoolReturner};
16use crate::proxy::Proxy;
17use crate::unit::Unit;
18use crate::Response;
19use crate::{error::Error, proxy::Proto};
20
/// A bidirectional transport: anything implementing both [std::io::Read] and
/// [std::io::Write]. This is the I/O type passed to and returned by [TlsConnector].
pub trait ReadWrite:
    Read + Write + Send + Sync + fmt::Debug + 'static
{
    /// The [TcpStream] behind this transport, or `None` when there is none.
    fn socket(&self) -> Option<&TcpStream>;
}
25
26impl ReadWrite for TcpStream {
27 fn socket(&self) -> Option<&TcpStream> {
28 Some(self)
29 }
30}
31
32pub trait TlsConnector: Send + Sync {
33 fn connect(
34 &self,
35 dns_name: &str,
36 io: Box<dyn ReadWrite>,
37 ) -> Result<Box<dyn ReadWrite>, crate::error::Error>;
38}
39
// A buffered connection: a boxed `ReadWrite` transport behind a `BufReader`,
// together with the peer address and the handle used to hand the stream
// back to the connection pool.
pub(crate) struct Stream {
    // Buffered reader over the boxed transport. Writes bypass the buffer and
    // go straight to the transport via `get_mut()`.
    inner: BufReader<Box<dyn ReadWrite>>,
    /// The remote address the stream is connected to.
    pub(crate) remote_addr: SocketAddr,
    // Used by `return_to_pool`; replaced with `PoolReturner::none()` by
    // `set_unpoolable`.
    pool_returner: PoolReturner,
}
46
47impl<T: ReadWrite + ?Sized> ReadWrite for Box<T> {
48 fn socket(&self) -> Option<&TcpStream> {
49 ReadWrite::socket(self.as_ref())
50 }
51}
52
// DeadlineStream wraps a stream such that read() will return an error
// after the provided deadline, and sets timeouts on the underlying
// TcpStream to ensure read() doesn't block beyond the deadline.
// Turning a DeadlineStream back into a Stream via the From trait (used by
// PoolReturnRead) only unwraps it; the socket timeouts themselves are
// cleared by Stream::reset(), which Stream::return_to_pool() calls.
pub(crate) struct DeadlineStream {
    // The wrapped stream that all reads are delegated to.
    stream: Stream,
    // Instant after which reads fail with TimedOut; None means no deadline
    // is enforced and no socket timeouts are touched.
    deadline: Option<Instant>,
}
62
63impl DeadlineStream {
64 pub(crate) fn new(stream: Stream, deadline: Option<Instant>) -> Self {
65 DeadlineStream { stream, deadline }
66 }
67
68 pub(crate) fn inner_ref(&self) -> &Stream {
69 &self.stream
70 }
71
72 pub(crate) fn inner_mut(&mut self) -> &mut Stream {
73 &mut self.stream
74 }
75}
76
77impl From<DeadlineStream> for Stream {
78 fn from(deadline_stream: DeadlineStream) -> Stream {
79 deadline_stream.stream
80 }
81}
82
83impl BufRead for DeadlineStream {
84 fn fill_buf(&mut self) -> io::Result<&[u8]> {
85 if let Some(deadline) = self.deadline {
86 let timeout = time_until_deadline(deadline)?;
87 if let Some(socket) = self.stream.socket() {
88 socket.set_read_timeout(Some(timeout))?;
89 socket.set_write_timeout(Some(timeout))?;
90 }
91 }
92 self.stream.fill_buf().map_err(|e| {
93 // On unix-y platforms set_read_timeout and set_write_timeout
94 // causes ErrorKind::WouldBlock instead of ErrorKind::TimedOut.
95 // Since the socket most definitely not set_nonblocking(true),
96 // we can safely normalize WouldBlock to TimedOut
97 if e.kind() == io::ErrorKind::WouldBlock {
98 return io_err_timeout("timed out reading response".to_string());
99 }
100 e
101 })
102 }
103
104 fn consume(&mut self, amt: usize) {
105 self.stream.consume(amt)
106 }
107}
108
109impl Read for DeadlineStream {
110 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
111 // If the stream's BufReader has any buffered bytes, return those first.
112 // This avoids calling `fill_buf()` on DeadlineStream unnecessarily,
113 // since that call always does a syscall. This ensures DeadlineStream
114 // can pass through the efficiency we gain by using a BufReader in Stream.
115 if !self.stream.inner.buffer().is_empty() {
116 let n: usize = self.stream.inner.buffer().read(buf)?;
117 self.stream.inner.consume(amt:n);
118 return Ok(n);
119 }
120 // All reads on a DeadlineStream use the BufRead impl. This ensures
121 // that we have a chance to set the correct timeout before each recv
122 // syscall.
123 // Copied from the BufReader implementation of `read()`.
124 let nread: usize = {
125 let mut rem: &[u8] = self.fill_buf()?;
126 rem.read(buf)?
127 };
128 self.consume(amt:nread);
129 Ok(nread)
130 }
131}
132
133// If the deadline is in the future, return the remaining time until
134// then. Otherwise return a TimedOut error.
135fn time_until_deadline(deadline: Instant) -> io::Result<Duration> {
136 let now: Instant = Instant::now();
137 match deadline.checked_duration_since(earlier:now) {
138 None => Err(io_err_timeout(error:"timed out reading response".to_string())),
139 Some(duration: Duration) => Ok(duration),
140 }
141}
142
/// Builds an [io::Error] of kind [io::ErrorKind::TimedOut] carrying `error`
/// as its message.
pub(crate) fn io_err_timeout(error: String) -> io::Error {
    io::Error::new(io::ErrorKind::TimedOut, error)
}
146
/// An in-memory stream backed by a byte vector. Reads come from the vector;
/// see the `Write` impl for how writes are treated.
#[derive(Debug)]
pub(crate) struct ReadOnlyStream(Cursor<Vec<u8>>);

impl ReadOnlyStream {
    /// Creates a stream that yields the bytes of `v` from the beginning.
    pub(crate) fn new(v: Vec<u8>) -> Self {
        Self(Cursor::new(v))
    }
}
155
156impl Read for ReadOnlyStream {
157 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
158 self.0.read(buf)
159 }
160}
161
162impl std::io::Write for ReadOnlyStream {
163 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
164 Ok(buf.len())
165 }
166
167 fn flush(&mut self) -> io::Result<()> {
168 Ok(())
169 }
170}
171
172impl ReadWrite for ReadOnlyStream {
173 fn socket(&self) -> Option<&std::net::TcpStream> {
174 None
175 }
176}
177
178impl fmt::Debug for Stream {
179 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
180 match self.inner.get_ref().socket() {
181 Some(_) => write!(f, "Stream({:?})", self.inner.get_ref()),
182 None => write!(f, "Stream(Test)"),
183 }
184 }
185}
186
187impl Stream {
188 pub(crate) fn new(
189 t: impl ReadWrite,
190 remote_addr: SocketAddr,
191 pool_returner: PoolReturner,
192 ) -> Stream {
193 Stream::logged_create(Stream {
194 inner: BufReader::new(Box::new(t)),
195 remote_addr,
196 pool_returner,
197 })
198 }
199
200 fn logged_create(stream: Stream) -> Stream {
201 debug!("created stream: {:?}", stream);
202 stream
203 }
204
205 pub(crate) fn buffer(&self) -> &[u8] {
206 self.inner.buffer()
207 }
208
209 // Check if the server has closed a stream by performing a one-byte
210 // non-blocking read. If this returns EOF, the server has closed the
211 // connection: return true. If this returns a successful read, there are
212 // some bytes on the connection even though there was no inflight request.
213 // For plain HTTP streams, that might mean an HTTP 408 was pushed; it
214 // could also mean a buggy server that sent more bytes than a response's
215 // Content-Length. For HTTPS streams, that might mean a close_notify alert,
216 // which is the proper way to shut down an idle stream.
217 // Either way, bytes available on the stream before we've made a request
218 // means the stream is not usable, so we should discard it.
219 // If this returns WouldBlock (aka EAGAIN),
220 // that means the connection is still open: return false. Otherwise
221 // return an error.
222 fn serverclosed_stream(stream: &std::net::TcpStream) -> io::Result<bool> {
223 let mut buf = [0; 1];
224 stream.set_nonblocking(true)?;
225
226 let result = match stream.peek(&mut buf) {
227 Ok(n) => {
228 debug!(
229 "peek on reused connection returned {}, not WouldBlock; discarding",
230 n
231 );
232 Ok(true)
233 }
234 Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(false),
235 Err(e) => Err(e),
236 };
237 stream.set_nonblocking(false)?;
238
239 result
240 }
241 // Return true if the server has closed this connection.
242 pub(crate) fn server_closed(&self) -> io::Result<bool> {
243 match self.socket() {
244 Some(socket) => Stream::serverclosed_stream(socket),
245 None => Ok(false),
246 }
247 }
248
249 pub(crate) fn set_unpoolable(&mut self) {
250 self.pool_returner = PoolReturner::none();
251 }
252
253 pub(crate) fn return_to_pool(mut self) -> io::Result<()> {
254 // ensure stream can be reused
255 self.reset()?;
256 self.pool_returner.clone().return_to_pool(self);
257 Ok(())
258 }
259
260 pub(crate) fn reset(&mut self) -> io::Result<()> {
261 // When we are turning this back into a regular, non-deadline Stream,
262 // remove any timeouts we set.
263 if let Some(socket) = self.socket() {
264 socket.set_read_timeout(None)?;
265 socket.set_write_timeout(None)?;
266 }
267
268 Ok(())
269 }
270
271 pub(crate) fn socket(&self) -> Option<&TcpStream> {
272 self.inner.get_ref().socket()
273 }
274
275 pub(crate) fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
276 if let Some(socket) = self.socket() {
277 socket.set_read_timeout(timeout)
278 } else {
279 Ok(())
280 }
281 }
282}
283
284impl Read for Stream {
285 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
286 self.inner.read(buf)
287 }
288}
289
290impl BufRead for Stream {
291 fn fill_buf(&mut self) -> io::Result<&[u8]> {
292 self.inner.fill_buf()
293 }
294
295 fn consume(&mut self, amt: usize) {
296 self.inner.consume(amt)
297 }
298}
299
300impl<R: Read> From<ChunkDecoder<R>> for Stream
301where
302 R: Read,
303 Stream: From<R>,
304{
305 fn from(chunk_decoder: ChunkDecoder<R>) -> Stream {
306 chunk_decoder.into_inner().into()
307 }
308}
309
310impl Write for Stream {
311 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
312 self.inner.get_mut().write(buf)
313 }
314 fn flush(&mut self) -> io::Result<()> {
315 self.inner.get_mut().flush()
316 }
317}
318
// Dropping a Stream only emits a debug log line with the stream's Debug
// representation; no explicit cleanup is performed here.
impl Drop for Stream {
    fn drop(&mut self) {
        debug!("dropping stream: {:?}", self);
    }
}
324
325pub(crate) fn connect_http(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
326 //
327 let port: u16 = unit.url.port().unwrap_or(default:80);
328 let pool_key: PoolKey = PoolKey::from_parts(scheme:"http", hostname, port);
329 let pool_returner: PoolReturner = PoolReturner::new(&unit.agent, pool_key);
330 connect_host(unit, hostname, port).map(|(t: TcpStream, r: SocketAddr)| Stream::new(t, remote_addr:r, pool_returner))
331}
332
333pub(crate) fn connect_https(unit: &Unit, hostname: &str) -> Result<Stream, Error> {
334 let port: u16 = unit.url.port().unwrap_or(default:443);
335
336 let (sock: TcpStream, remote_addr: SocketAddr) = connect_host(unit, hostname, port)?;
337
338 let tls_conf: &TlsConfig = &unit.agent.config.tls_config;
339 let https_stream: Box = tls_conf.connect(dns_name:hostname, io:Box::new(sock))?;
340 let pool_key: PoolKey = PoolKey::from_parts(scheme:"https", hostname, port);
341 let pool_returner: PoolReturner = PoolReturner::new(&unit.agent, pool_key);
342 Ok(Stream::new(t:https_stream, remote_addr, pool_returner))
343}
344
345/// If successful, returns a `TcpStream` and the remote address it is connected to.
346pub(crate) fn connect_host(
347 unit: &Unit,
348 hostname: &str,
349 port: u16,
350) -> Result<(TcpStream, SocketAddr), Error> {
351 let connect_deadline: Option<Instant> =
352 if let Some(timeout_connect) = unit.agent.config.timeout_connect {
353 Instant::now().checked_add(timeout_connect)
354 } else {
355 unit.deadline
356 };
357 let proxy: Option<Proxy> = unit.agent.config.proxy.clone();
358 let netloc = match proxy {
359 Some(ref proxy) => format!("{}:{}", proxy.server, proxy.port),
360 None => format!("{}:{}", hostname, port),
361 };
362
363 // TODO: Find a way to apply deadline to DNS lookup.
364 let sock_addrs = unit.resolver().resolve(&netloc).map_err(|e| {
365 ErrorKind::Dns
366 .msg(format!("resolve dns name '{}'", netloc))
367 .src(e)
368 })?;
369
370 if sock_addrs.is_empty() {
371 return Err(ErrorKind::Dns.msg(format!("No ip address for {}", hostname)));
372 }
373
374 let proto = proxy.as_ref().map(|proxy| proxy.proto);
375
376 let mut any_err = None;
377 let mut any_stream_and_addr = None;
378 // Find the first sock_addr that accepts a connection
379 let multiple_addrs = sock_addrs.len() > 1;
380
381 for sock_addr in sock_addrs {
382 // ensure connect timeout or overall timeout aren't yet hit.
383 let timeout = match connect_deadline {
384 Some(deadline) => {
385 let mut deadline = time_until_deadline(deadline)?;
386 if multiple_addrs {
387 deadline = deadline.div(2);
388 }
389 Some(deadline)
390 }
391 None => None,
392 };
393
394 debug!("connecting to {} at {}", netloc, &sock_addr);
395
396 // connect with a configured timeout.
397 #[allow(clippy::unnecessary_unwrap)]
398 let stream = if proto.is_some() && Some(Proto::HTTP) != proto {
399 connect_socks(
400 unit,
401 proxy.clone().unwrap(),
402 connect_deadline,
403 sock_addr,
404 hostname,
405 port,
406 proto.unwrap(),
407 )
408 } else if let Some(timeout) = timeout {
409 TcpStream::connect_timeout(&sock_addr, timeout)
410 } else {
411 TcpStream::connect(sock_addr)
412 };
413
414 if let Ok(stream) = stream {
415 any_stream_and_addr = Some((stream, sock_addr));
416 break;
417 } else if let Err(err) = stream {
418 any_err = Some(err);
419 }
420 }
421
422 let (mut stream, remote_addr) = if let Some(stream_and_addr) = any_stream_and_addr {
423 stream_and_addr
424 } else if let Some(e) = any_err {
425 return Err(ErrorKind::ConnectionFailed.msg("Connect error").src(e));
426 } else {
427 panic!("shouldn't happen: failed to connect to all IPs, but no error");
428 };
429
430 stream.set_nodelay(unit.agent.config.no_delay)?;
431
432 if let Some(deadline) = unit.deadline {
433 stream.set_read_timeout(Some(time_until_deadline(deadline)?))?;
434 } else {
435 stream.set_read_timeout(unit.agent.config.timeout_read)?;
436 }
437
438 if let Some(deadline) = unit.deadline {
439 stream.set_write_timeout(Some(time_until_deadline(deadline)?))?;
440 } else {
441 stream.set_write_timeout(unit.agent.config.timeout_write)?;
442 }
443
444 if proto == Some(Proto::HTTP) && unit.url.scheme() == "https" {
445 if let Some(ref proxy) = proxy {
446 write!(
447 stream,
448 "{}",
449 proxy.connect(hostname, port, &unit.agent.config.user_agent)
450 )
451 .unwrap();
452 stream.flush()?;
453
454 let s = stream.try_clone()?;
455 let pool_key = PoolKey::from_parts(unit.url.scheme(), hostname, port);
456 let pool_returner = PoolReturner::new(&unit.agent, pool_key);
457 let s = Stream::new(s, remote_addr, pool_returner);
458 let response = Response::do_from_stream(s, unit.clone())?;
459 Proxy::verify_response(&response)?;
460 }
461 }
462
463 Ok((stream, remote_addr))
464}
465
// Resolves `hostname:port` with the unit's resolver and converts the first
// resulting address into a SOCKS target address. Every failure (resolver
// error, empty result, conversion error) is reported as a NotFound I/O error
// whose message starts with "DNS failure".
#[cfg(feature = "socks-proxy")]
fn socks_local_nslookup(
    unit: &Unit,
    hostname: &str,
    port: u16,
) -> Result<TargetAddr, std::io::Error> {
    let netloc = format!("{}:{}", hostname, port);
    let addrs: Vec<SocketAddr> = match unit.resolver().resolve(&netloc) {
        Ok(found) => found,
        Err(e) => {
            return Err(std::io::Error::new(
                io::ErrorKind::NotFound,
                format!("DNS failure: {}.", e),
            ))
        }
    };

    if addrs.is_empty() {
        return Err(std::io::Error::new(
            io::ErrorKind::NotFound,
            "DNS failure: no socket addrs found.",
        ));
    }

    addrs[0].to_target_addr().map_err(|err| {
        std::io::Error::new(io::ErrorKind::NotFound, format!("DNS failure: {}.", err))
    })
}
496
// Connects to `host:port` through the SOCKS proxy listening at `proxy_addr`.
//
// The target is resolved locally when `host` is an IP literal or the proxy
// speaks SOCKS4; otherwise the domain name is handed to the proxy.
//
// With a `deadline`, the connect runs on a helper thread and this function
// waits for it until the deadline; without one, it connects directly on the
// calling thread. Returns the connected `TcpStream`, the connect error, or a
// TimedOut error when the deadline is reached first.
#[cfg(feature = "socks-proxy")]
fn connect_socks(
    unit: &Unit,
    proxy: Proxy,
    deadline: Option<Instant>,
    proxy_addr: SocketAddr,
    host: &str,
    port: u16,
    proto: Proto,
) -> Result<TcpStream, std::io::Error> {
    use socks::TargetAddr::Domain;
    use std::net::{Ipv4Addr, Ipv6Addr};
    use std::str::FromStr;

    let host_addr = if Ipv4Addr::from_str(host).is_ok()
        || Ipv6Addr::from_str(host).is_ok()
        || proto == Proto::SOCKS4
    {
        socks_local_nslookup(unit, host, port)?
    } else {
        Domain(String::from(host), port)
    };

    // Since SocksXStream doesn't support set_read_timeout, a suboptimal one is implemented via
    // thread::spawn.
    // # Happy Path
    // 1) thread spawns 2) get_socksX_stream returns ok 3) tx sends result ok
    // 4) slave_signal sets done and cvar notifies master_signal 5) the wait observes done == true
    // 6) rx receives the socks stream and the function exits
    // # Sad path
    // 1) get_socksX_stream hangs 2) slave_signal never sets done 3) the wait times out
    // 4) a TimedOut error is returned.
    // # Defects
    // 1) In the event of a timeout, a thread may be left running in the background.
    // TODO: explore supporting timeouts upstream in Socks5Proxy.
    #[allow(clippy::mutex_atomic)]
    let stream = if let Some(deadline) = deadline {
        use std::sync::mpsc::channel;
        use std::sync::{Arc, Condvar, Mutex};
        use std::thread;

        // Check the deadline before spawning, so an already-expired deadline
        // does not leave a connecting thread behind.
        let timeout_connect = time_until_deadline(deadline)?;

        let master_signal = Arc::new((Mutex::new(false), Condvar::new()));
        let slave_signal = master_signal.clone();
        let (tx, rx) = channel();
        thread::spawn(move || {
            let (lock, cvar) = &*slave_signal;
            if tx // try to get a socks stream and send it to the parent thread's rx
                .send(if proto == Proto::SOCKS5 {
                    get_socks5_stream(&proxy, &proxy_addr, host_addr)
                } else {
                    get_socks4_stream(&proxy_addr, host_addr)
                })
                .is_ok()
            {
                // if sending the stream has succeeded we need to notify the parent thread
                let mut done = lock.lock().unwrap();
                // set the done signal to true
                *done = true;
                // notify the parent thread
                cvar.notify_one();
            }
        });

        let (lock, cvar) = &*master_signal;
        let done = lock.lock().unwrap();

        // Wait on the predicate rather than on the bare notification: this
        // returns at once if the worker finished before we started waiting,
        // and keeps waiting through spurious wakeups.
        let (done, _timed_out) = cvar
            .wait_timeout_while(done, timeout_connect, |finished| !*finished)
            .unwrap();
        if *done {
            rx.recv().unwrap()?
        } else {
            return Err(io_err_timeout(format!(
                "SOCKS proxy: {}:{} timed out connecting after {}ms.",
                host,
                port,
                timeout_connect.as_millis()
            )));
        }
    } else if proto == Proto::SOCKS5 {
        get_socks5_stream(&proxy, &proxy_addr, host_addr)?
    } else {
        get_socks4_stream(&proxy_addr, host_addr)?
    };

    Ok(stream)
}
586
// Opens a SOCKS5 connection to `host_addr` through the proxy at `proxy_addr`
// and returns the underlying TcpStream. Username/password authentication is
// used when `proxy.use_authorization()` says so.
#[cfg(feature = "socks-proxy")]
fn get_socks5_stream(
    proxy: &Proxy,
    proxy_addr: &SocketAddr,
    host_addr: TargetAddr,
) -> Result<TcpStream, std::io::Error> {
    use socks::Socks5Stream;
    let socks_stream = if proxy.use_authorization() {
        Socks5Stream::connect_with_password(
            proxy_addr,
            host_addr,
            proxy.user.as_ref().unwrap(),
            proxy.password.as_ref().unwrap(),
        )?
    } else {
        Socks5Stream::connect(proxy_addr, host_addr)?
    };
    Ok(socks_stream.into_inner())
}
610
// Opens a SOCKS4 connection (with an empty user id) to `host_addr` through
// the proxy at `proxy_addr` and returns the underlying TcpStream.
#[cfg(feature = "socks-proxy")]
fn get_socks4_stream(
    proxy_addr: &SocketAddr,
    host_addr: TargetAddr,
) -> Result<TcpStream, std::io::Error> {
    let socks_stream = socks::Socks4Stream::connect(proxy_addr, host_addr, "")?;
    Ok(socks_stream.into_inner())
}
621
622#[cfg(not(feature = "socks-proxy"))]
623fn connect_socks(
624 _unit: &Unit,
625 _proxy: Proxy,
626 _deadline: Option<Instant>,
627 _proxy_addr: SocketAddr,
628 _hostname: &str,
629 _port: u16,
630 _proto: Proto,
631) -> Result<TcpStream, std::io::Error> {
632 Err(std::io::Error::new(
633 kind:io::ErrorKind::Other,
634 error:"SOCKS feature disabled.",
635 ))
636}
637
// Test builds only: hands the unit to the crate's test module, which
// produces the stream.
#[cfg(test)]
pub(crate) fn connect_test(unit: &Unit) -> Result<Stream, Error> {
    crate::test::resolve_handler(unit)
}
643
644#[cfg(not(test))]
645pub(crate) fn connect_test(unit: &Unit) -> Result<Stream, Error> {
646 Err(ErrorKind::UnknownScheme.msg(format!("unknown scheme '{}'", unit.url.scheme())))
647}
648
// Placeholder remote address for tests: the unspecified IPv4 address
// (0.0.0.0) with port 0.
#[cfg(test)]
pub(crate) fn remote_addr_for_test() -> SocketAddr {
    use std::net::{Ipv4Addr, SocketAddrV4};
    let unspecified = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0);
    SocketAddr::V4(unspecified)
}
654
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        io::Read,
        sync::{Arc, Mutex},
    };

    // Returns all zeroes to `.read()` and logs how many times it's called
    struct ReadRecorder {
        reads: Arc<Mutex<Vec<usize>>>,
    }

    impl Read for ReadRecorder {
        fn read(&mut self, buf: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
            self.reads.lock().unwrap().push(buf.len());
            buf.fill(0);
            Ok(buf.len())
        }
    }

    // The test never writes, so the write half is intentionally left out.
    impl Write for ReadRecorder {
        fn write(&mut self, _: &[u8]) -> io::Result<usize> {
            unimplemented!()
        }

        fn flush(&mut self) -> io::Result<()> {
            unimplemented!()
        }
    }

    // Stream logs itself with `{:?}` on creation and on drop, and Stream's
    // Debug impl calls `socket()`. Both must therefore work, or the test
    // panics whenever debug logging happens to be enabled.
    impl fmt::Debug for ReadRecorder {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("ReadRecorder")
        }
    }

    impl ReadWrite for ReadRecorder {
        fn socket(&self) -> Option<&TcpStream> {
            None
        }
    }

    // Test that when a DeadlineStream wraps a Stream, and the user performs a series of
    // tiny read_exacts, Stream's BufReader is used appropriately.
    #[test]
    fn test_deadline_stream_buffering() {
        let reads = Arc::new(Mutex::new(vec![]));
        let recorder = ReadRecorder {
            reads: reads.clone(),
        };
        let stream = Stream::new(recorder, remote_addr_for_test(), PoolReturner::none());
        let mut deadline_stream = DeadlineStream::new(stream, None);
        let mut buf = [0u8; 1];
        // 8193 one-byte reads: one more than a single 8192-byte buffer fill,
        // so exactly two reads must reach the recorder.
        for _ in 0..8193 {
            let _ = deadline_stream.read(&mut buf).unwrap();
        }
        let reads = reads.lock().unwrap();
        assert_eq!(reads.len(), 2);
        assert_eq!(reads[0], 8192);
        assert_eq!(reads[1], 8192);
    }
}
718