use crate::codec::UserError;
use crate::frame::{Reason, StreamId};
use crate::{client, server};

use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE;
use crate::proto::*;

use bytes::Bytes;
use futures_core::Stream;
use std::io;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::AsyncRead;

/// An H2 connection
#[derive(Debug)]
pub(crate) struct Connection<T, P, B: Buf = Bytes>
where
    P: Peer,
{
    /// Read / write frame values
    codec: Codec<T, Prioritized<B>>,

    inner: ConnectionInner<P, B>,
}

// Extracted part of `Connection` which does not depend on `T`. Reduces the amount of duplicated
// method instantiations.
#[derive(Debug)]
struct ConnectionInner<P, B: Buf = Bytes>
where
    P: Peer,
{
    /// Tracks the connection level state transitions.
    state: State,

    /// An error to report back once complete.
    ///
    /// This exists separately from State in order to support
    /// graceful shutdown.
    error: Option<frame::GoAway>,

    /// Pending GOAWAY frames to write.
    go_away: GoAway,

    /// Ping/pong handler
    ping_pong: PingPong,

    /// Connection settings
    settings: Settings,

    /// Stream state handler
    streams: Streams<B, P>,

    /// A `tracing` span tracking the lifetime of the connection.
    span: tracing::Span,

    /// Client or server
    _phantom: PhantomData<P>,
}

struct DynConnection<'a, B: Buf = Bytes> {
    state: &'a mut State,

    go_away: &'a mut GoAway,

    streams: DynStreams<'a, B>,

    error: &'a mut Option<frame::GoAway>,

    ping_pong: &'a mut PingPong,
}

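/// Connection-level configuration assembled by the client/server builders and
/// handed to `Connection::new` during the handshake.
///
/// A rough, illustrative sketch of how such a value might be put together.
/// The concrete numbers below and the `u32 -> StreamId` conversion are
/// assumptions for the example, not the crate's actual defaults:
///
/// ```ignore
/// let config = Config {
///     next_stream_id: 1.into(),
///     initial_max_send_streams: usize::MAX,
///     max_send_buffer_size: 1024 * 400,
///     reset_stream_duration: Duration::from_secs(30),
///     reset_stream_max: 10,
///     remote_reset_stream_max: 20,
///     local_error_reset_streams_max: Some(1024),
///     settings: frame::Settings::default(),
/// };
/// ```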
#[derive(Debug, Clone)]
pub(crate) struct Config {
    pub next_stream_id: StreamId,
    pub initial_max_send_streams: usize,
    pub max_send_buffer_size: usize,
    pub reset_stream_duration: Duration,
    pub reset_stream_max: usize,
    pub remote_reset_stream_max: usize,
    pub local_error_reset_streams_max: Option<usize>,
    pub settings: frame::Settings,
}

#[derive(Debug)]
enum State {
    /// Currently open in a sane state
    Open,

    /// The codec must be flushed
    Closing(Reason, Initiator),

    /// In a closed state
    Closed(Reason, Initiator),
}

impl<T, P, B> Connection<T, P, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    P: Peer,
    B: Buf,
{
    pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
        fn streams_config(config: &Config) -> streams::Config {
            streams::Config {
                initial_max_send_streams: config.initial_max_send_streams,
                local_max_buffer_size: config.max_send_buffer_size,
                local_next_stream_id: config.next_stream_id,
                local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
                extended_connect_protocol_enabled: config
                    .settings
                    .is_extended_connect_protocol_enabled()
                    .unwrap_or(false),
                local_reset_duration: config.reset_stream_duration,
                local_reset_max: config.reset_stream_max,
                remote_reset_max: config.remote_reset_stream_max,
                remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
                remote_max_initiated: config
                    .settings
                    .max_concurrent_streams()
                    .map(|max| max as usize),
                local_max_error_reset_streams: config.local_error_reset_streams_max,
            }
        }
        let streams = Streams::new(streams_config(&config));
        Connection {
            codec,
            inner: ConnectionInner {
                state: State::Open,
                error: None,
                go_away: GoAway::new(),
                ping_pong: PingPong::new(),
                settings: Settings::new(config.settings),
                streams,
                span: tracing::debug_span!("Connection", peer = %P::NAME),
                _phantom: PhantomData,
            },
        }
    }

    /// Updates the target (connection-level) flow control window size.
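    ///
    /// The public client and server `Connection` handles expose an equivalent
    /// `set_target_window_size`. A minimal usage sketch, assuming `conn` is an
    /// already-established `h2::client` connection:
    ///
    /// ```ignore
    /// // Grow the connection-level receive window toward 1 MiB; the proto
    /// // layer emits WINDOW_UPDATE frames as needed to reach the target.
    /// conn.set_target_window_size(1024 * 1024);
    /// ```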
    pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
        let _res = self.inner.streams.set_target_connection_window_size(size);
        // TODO: proper error handling
        debug_assert!(_res.is_ok());
    }

    /// Send a new SETTINGS frame with an updated initial window size.
    pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
        let mut settings = frame::Settings::default();
        settings.set_initial_window_size(Some(size));
        self.inner.settings.send_settings(settings)
    }

    /// Send a new SETTINGS frame with extended CONNECT protocol enabled.
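    ///
    /// For reference, the public `h2::server` API advertises the same setting
    /// at handshake time. A hedged sketch, where `io` is assumed to be an
    /// already-established transport:
    ///
    /// ```ignore
    /// let mut connection = h2::server::Builder::new()
    ///     // Advertise RFC 8441 extended CONNECT support in our SETTINGS.
    ///     .enable_connect_protocol()
    ///     .handshake::<_, bytes::Bytes>(io)
    ///     .await?;
    /// ```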
    pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> {
        let mut settings = frame::Settings::default();
        settings.set_enable_connect_protocol(Some(1));
        self.inner.settings.send_settings(settings)
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by this peer.
    pub(crate) fn max_send_streams(&self) -> usize {
        self.inner.streams.max_send_streams()
    }

    /// Returns the maximum number of concurrent streams that may be initiated
    /// by the remote peer.
    pub(crate) fn max_recv_streams(&self) -> usize {
        self.inner.streams.max_recv_streams()
    }

    #[cfg(feature = "unstable")]
    pub fn num_wired_streams(&self) -> usize {
        self.inner.streams.num_wired_streams()
    }

    /// Returns `Ready` when the connection is ready to receive a frame.
    ///
    /// Returns `Error` as this may raise errors that are caused by delayed
    /// processing of received frames.
    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
        let _e = self.inner.span.enter();
        let span = tracing::trace_span!("poll_ready");
        let _e = span.enter();
        // The order of these calls doesn't really matter too much
        ready!(self.inner.ping_pong.send_pending_pong(cx, &mut self.codec))?;
        ready!(self.inner.ping_pong.send_pending_ping(cx, &mut self.codec))?;
        ready!(self
            .inner
            .settings
            .poll_send(cx, &mut self.codec, &mut self.inner.streams))?;
        ready!(self.inner.streams.send_pending_refusal(cx, &mut self.codec))?;

        Poll::Ready(Ok(()))
    }

    /// Send any pending GOAWAY frames.
    ///
    /// This will return `Some(reason)` if the connection should be closed
    /// afterwards. If this is a graceful shutdown, this returns `None`.
    fn poll_go_away(&mut self, cx: &mut Context) -> Poll<Option<io::Result<Reason>>> {
        self.inner.go_away.send_pending_go_away(cx, &mut self.codec)
    }

    pub fn go_away_from_user(&mut self, e: Reason) {
        self.inner.as_dyn().go_away_from_user(e)
    }

    fn take_error(&mut self, ours: Reason, initiator: Initiator) -> Result<(), Error> {
        let (debug_data, theirs) = self
            .inner
            .error
            .take()
            .as_ref()
            .map_or((Bytes::new(), Reason::NO_ERROR), |frame| {
                (frame.debug_data().clone(), frame.reason())
            });

        match (ours, theirs) {
            (Reason::NO_ERROR, Reason::NO_ERROR) => Ok(()),
            (ours, Reason::NO_ERROR) => Err(Error::GoAway(Bytes::new(), ours, initiator)),
            // If both sides reported an error, give their
            // error back to the user. We assume our error
            // was a consequence of their error, and less
            // important.
            (_, theirs) => Err(Error::remote_go_away(debug_data, theirs)),
        }
    }

    /// Closes the connection by transitioning to a GOAWAY state
    /// iff there are no streams or references
    pub fn maybe_close_connection_if_no_streams(&mut self) {
        // If we poll() and realize that there are no streams or references
        // then we can close the connection by transitioning to GOAWAY
        if !self.inner.streams.has_streams_or_other_references() {
            self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
        }
    }

    /// Checks if there are any streams
    pub fn has_streams(&self) -> bool {
        self.inner.streams.has_streams()
    }

    /// Checks if there are any streams or references left
    pub fn has_streams_or_other_references(&self) -> bool {
        // If we poll() and realize that there are no streams or references
        // then we can close the connection by transitioning to GOAWAY
        self.inner.streams.has_streams_or_other_references()
    }

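    /// Hands out the user-facing ping handle; subsequent calls return `None`.
    ///
    /// A hedged sketch of how this surfaces through the public API, where
    /// `connection` is assumed to be an established `h2::client` connection
    /// whose `ping_pong()` wraps this method:
    ///
    /// ```ignore
    /// let mut ping_pong = connection.ping_pong().expect("taken only once");
    /// tokio::spawn(async move {
    ///     let _ = connection.await;
    /// });
    /// // Measure liveness / RTT with an opaque PING.
    /// let _pong = ping_pong.ping(h2::Ping::opaque()).await?;
    /// ```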
    pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
        self.inner.ping_pong.take_user_pings()
    }

    /// Advances the internal state of the connection.
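    ///
    /// The public client and server `Connection` futures ultimately drive this
    /// method. A hedged sketch of the usual client-side setup, assuming `tcp`
    /// is an established `tokio::net::TcpStream`:
    ///
    /// ```ignore
    /// let (send_request, connection) = h2::client::handshake(tcp).await?;
    /// // The connection future must be polled for any stream to make progress.
    /// tokio::spawn(async move {
    ///     if let Err(e) = connection.await {
    ///         tracing::debug!("connection closed with error: {e}");
    ///     }
    /// });
    /// ```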
    pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
        // XXX(eliza): cloning the span is unfortunately necessary here in
        // order to placate the borrow checker — `self` is mutably borrowed by
        // `poll2`, which means that we can't borrow `self.span` to enter it.
        // The clone is just an atomic ref bump.
        let span = self.inner.span.clone();
        let _e = span.enter();
        let span = tracing::trace_span!("poll");
        let _e = span.enter();

        loop {
            tracing::trace!(connection.state = ?self.inner.state);
            // TODO: probably clean up this glob of code
            match self.inner.state {
                // When open, continue to poll a frame
                State::Open => {
                    let result = match self.poll2(cx) {
                        Poll::Ready(result) => result,
                        // The connection is not ready to make progress
                        Poll::Pending => {
                            // Ensure all window updates have been sent.
                            //
                            // This will also handle flushing `self.codec`
                            ready!(self.inner.streams.poll_complete(cx, &mut self.codec))?;

                            if (self.inner.error.is_some()
                                || self.inner.go_away.should_close_on_idle())
                                && !self.inner.streams.has_streams()
                            {
                                self.inner.as_dyn().go_away_now(Reason::NO_ERROR);
                                continue;
                            }

                            return Poll::Pending;
                        }
                    };

                    self.inner.as_dyn().handle_poll2_result(result)?
                }
                State::Closing(reason, initiator) => {
                    tracing::trace!("connection closing after flush");
                    // Flush/shutdown the codec
                    ready!(self.codec.shutdown(cx))?;

                    // Transition the state to error
                    self.inner.state = State::Closed(reason, initiator);
                }
                State::Closed(reason, initiator) => {
                    return Poll::Ready(self.take_error(reason, initiator));
                }
            }
        }
    }

    fn poll2(&mut self, cx: &mut Context) -> Poll<Result<(), Error>> {
        // This happens outside of the loop to prevent needing to do a clock
        // check and then comparison of the queue possibly multiple times a
        // second (and thus, the clock wouldn't have changed enough to matter).
        self.clear_expired_reset_streams();

        loop {
            // First, ensure that the `Connection` is able to receive a frame
            //
            // The order here matters:
            // - poll_go_away may buffer a graceful shutdown GOAWAY frame
            // - If it has, we've also added a PING to be sent in poll_ready
            if let Some(reason) = ready!(self.poll_go_away(cx)?) {
                if self.inner.go_away.should_close_now() {
                    if self.inner.go_away.is_user_initiated() {
                        // A user initiated abrupt shutdown shouldn't return
                        // the same error back to the user.
                        return Poll::Ready(Ok(()));
                    } else {
                        return Poll::Ready(Err(Error::library_go_away(reason)));
                    }
                }
                // Only NO_ERROR should be waiting for idle
                debug_assert_eq!(
                    reason,
                    Reason::NO_ERROR,
                    "graceful GOAWAY should be NO_ERROR"
                );
            }
            ready!(self.poll_ready(cx))?;

            match self
                .inner
                .as_dyn()
                .recv_frame(ready!(Pin::new(&mut self.codec).poll_next(cx)?))?
            {
                ReceivedFrame::Settings(frame) => {
                    self.inner.settings.recv_settings(
                        frame,
                        &mut self.codec,
                        &mut self.inner.streams,
                    )?;
                }
                ReceivedFrame::Continue => (),
                ReceivedFrame::Done => {
                    return Poll::Ready(Ok(()));
                }
            }
        }
    }

    fn clear_expired_reset_streams(&mut self) {
        self.inner.streams.clear_expired_reset_streams();
    }
}

impl<P, B> ConnectionInner<P, B>
where
    P: Peer,
    B: Buf,
{
    fn as_dyn(&mut self) -> DynConnection<'_, B> {
        let ConnectionInner {
            state,
            go_away,
            streams,
            error,
            ping_pong,
            ..
        } = self;
        let streams: DynStreams<'_, B> = streams.as_dyn();
        DynConnection {
            state,
            go_away,
            streams,
            error,
            ping_pong,
        }
    }
}

impl<B> DynConnection<'_, B>
where
    B: Buf,
{
    fn go_away(&mut self, id: StreamId, e: Reason) {
        let frame = frame::GoAway::new(id, e);
        self.streams.send_go_away(id);
        self.go_away.go_away(frame);
    }

    fn go_away_now(&mut self, e: Reason) {
        let last_processed_id = self.streams.last_processed_id();
        let frame = frame::GoAway::new(last_processed_id, e);
        self.go_away.go_away_now(frame);
    }

    fn go_away_now_data(&mut self, e: Reason, data: Bytes) {
        let last_processed_id = self.streams.last_processed_id();
        let frame = frame::GoAway::with_debug_data(last_processed_id, e, data);
        self.go_away.go_away_now(frame);
    }

    fn go_away_from_user(&mut self, e: Reason) {
        let last_processed_id = self.streams.last_processed_id();
        let frame = frame::GoAway::new(last_processed_id, e);
        self.go_away.go_away_from_user(frame);

        // Notify all streams of the reason we're abruptly closing.
        self.streams.handle_error(Error::user_go_away(e));
    }

    fn handle_poll2_result(&mut self, result: Result<(), Error>) -> Result<(), Error> {
        match result {
            // The connection has shutdown normally
            Ok(()) => {
                *self.state = State::Closing(Reason::NO_ERROR, Initiator::Library);
                Ok(())
            }
            // Attempting to read a frame resulted in a connection level
            // error. This is handled by setting a GOAWAY frame followed by
            // terminating the connection.
            Err(Error::GoAway(debug_data, reason, initiator)) => {
                let e = Error::GoAway(debug_data.clone(), reason, initiator);
                tracing::debug!(error = ?e, "Connection::poll; connection error");

                // We may have already sent a GOAWAY for this error,
                // if so, don't send another, just flush and close up.
                if self
                    .go_away
                    .going_away()
                    .map_or(false, |frame| frame.reason() == reason)
                {
                    tracing::trace!(" -> already going away");
                    *self.state = State::Closing(reason, initiator);
                    return Ok(());
                }

                // Reset all active streams
                self.streams.handle_error(e);
                self.go_away_now_data(reason, debug_data);
                Ok(())
            }
            // Attempting to read a frame resulted in a stream level error.
            // This is handled by resetting the stream then trying to read
            // another frame.
            Err(Error::Reset(id, reason, initiator)) => {
                debug_assert_eq!(initiator, Initiator::Library);
                tracing::trace!(?id, ?reason, "stream error");
                self.streams.send_reset(id, reason);
                Ok(())
            }
            // Attempting to read a frame resulted in an I/O error. All
            // active streams must be reset.
            //
            // TODO: Are I/O errors recoverable?
            Err(Error::Io(kind, inner)) => {
                tracing::debug!(error = ?kind, "Connection::poll; IO error");
                let e = Error::Io(kind, inner);

                // Reset all active streams
                self.streams.handle_error(e.clone());

                // Some client implementations drop the connection without notifying their peer.
                // Attempting to read after the client dropped the connection results in an
                // UnexpectedEof. If, as a server, we don't have anything more to send, just
                // close the connection without reporting an error.
                //
                // See https://github.com/hyperium/hyper/issues/3427
                if self.streams.is_server()
                    && self.streams.is_buffer_empty()
                    && matches!(kind, io::ErrorKind::UnexpectedEof)
                {
                    *self.state = State::Closed(Reason::NO_ERROR, Initiator::Library);
                    return Ok(());
                }

                // Return the error
                Err(e)
            }
        }
    }

    fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
        use crate::frame::Frame::*;
        match frame {
            Some(Headers(frame)) => {
                tracing::trace!(?frame, "recv HEADERS");
                self.streams.recv_headers(frame)?;
            }
            Some(Data(frame)) => {
                tracing::trace!(?frame, "recv DATA");
                self.streams.recv_data(frame)?;
            }
            Some(Reset(frame)) => {
                tracing::trace!(?frame, "recv RST_STREAM");
                self.streams.recv_reset(frame)?;
            }
            Some(PushPromise(frame)) => {
                tracing::trace!(?frame, "recv PUSH_PROMISE");
                self.streams.recv_push_promise(frame)?;
            }
            Some(Settings(frame)) => {
                tracing::trace!(?frame, "recv SETTINGS");
                return Ok(ReceivedFrame::Settings(frame));
            }
            Some(GoAway(frame)) => {
                tracing::trace!(?frame, "recv GOAWAY");
                // This should prevent starting new streams,
                // but should allow continuing to process current streams
                // until they are all EOS. Once they are, State should
                // transition to GoAway.
                self.streams.recv_go_away(&frame)?;
                *self.error = Some(frame);
            }
            Some(Ping(frame)) => {
                tracing::trace!(?frame, "recv PING");
                let status = self.ping_pong.recv_ping(frame);
                if status.is_shutdown() {
                    assert!(
                        self.go_away.is_going_away(),
                        "received unexpected shutdown ping"
                    );

                    let last_processed_id = self.streams.last_processed_id();
                    self.go_away(last_processed_id, Reason::NO_ERROR);
                }
            }
            Some(WindowUpdate(frame)) => {
                tracing::trace!(?frame, "recv WINDOW_UPDATE");
                self.streams.recv_window_update(frame)?;
            }
            Some(Priority(frame)) => {
                tracing::trace!(?frame, "recv PRIORITY");
                // TODO: handle
            }
            None => {
                tracing::trace!("codec closed");
                self.streams.recv_eof(false).expect("mutex poisoned");
                return Ok(ReceivedFrame::Done);
            }
        }
        Ok(ReceivedFrame::Continue)
    }
}

enum ReceivedFrame {
    Settings(frame::Settings),
    Continue,
    Done,
}

impl<T, B> Connection<T, client::Peer, B>
where
    T: AsyncRead + AsyncWrite,
    B: Buf,
{
    pub(crate) fn streams(&self) -> &Streams<B, client::Peer> {
        &self.inner.streams
    }
}

impl<T, B> Connection<T, server::Peer, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
        self.inner.streams.next_incoming()
    }

    // Graceful shutdown only makes sense for server peers.
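    /// Starts a graceful shutdown: a GOAWAY with the maximum stream ID plus a
    /// shutdown PING; the final GOAWAY goes out once the PING is acknowledged
    /// (see `recv_frame`).
    ///
    /// A hedged sketch of how this is reached through the public
    /// `h2::server::Connection` handle, assuming `connection` comes from a
    /// prior `handshake`:
    ///
    /// ```ignore
    /// use std::future::poll_fn;
    ///
    /// // Ask the peer to stop opening streams, then wait for in-flight
    /// // streams to finish and the final GOAWAY to be flushed.
    /// connection.graceful_shutdown();
    /// poll_fn(|cx| connection.poll_closed(cx)).await?;
    /// ```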
    pub fn go_away_gracefully(&mut self) {
        if self.inner.go_away.is_going_away() {
            // No reason to start a new one.
            return;
        }

        // According to http://httpwg.org/specs/rfc7540.html#GOAWAY:
        //
        // > A server that is attempting to gracefully shut down a connection
        // > SHOULD send an initial GOAWAY frame with the last stream
        // > identifier set to 2^31-1 and a NO_ERROR code. This signals to the
        // > client that a shutdown is imminent and that initiating further
        // > requests is prohibited. After allowing time for any in-flight
        // > stream creation (at least one round-trip time), the server can
        // > send another GOAWAY frame with an updated last stream identifier.
        // > This ensures that a connection can be cleanly shut down without
        // > losing requests.
        self.inner.as_dyn().go_away(StreamId::MAX, Reason::NO_ERROR);

        // We take the advice of waiting 1 RTT literally, and wait
        // for a pong before proceeding.
        self.inner.ping_pong.ping_shutdown();
    }
}

impl<T, P, B> Drop for Connection<T, P, B>
where
    P: Peer,
    B: Buf,
{
    fn drop(&mut self) {
        // Ignore errors; a failure here indicates that the mutex is poisoned.
        let _ = self.inner.streams.recv_eof(/* clear_pending_accept */ true);
    }
}