use super::*;
use crate::codec::UserError;
use crate::frame::{self, PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use crate::proto::{self, Error};

use http::{HeaderMap, Request, Response};

use std::cmp::Ordering;
use std::io;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};

#[derive(Debug)]
pub(super) struct Recv {
    /// Initial window size of remote initiated streams
    init_window_sz: WindowSize,

    /// Connection level flow control governing received data
    flow: FlowControl,

    /// Amount of connection window capacity currently used by outstanding streams.
    in_flight_data: WindowSize,

    /// The lowest stream ID that is still idle
    next_stream_id: Result<StreamId, StreamIdOverflow>,

    /// The stream ID of the last processed stream
    last_processed_id: StreamId,

    /// Any streams with a higher ID are ignored.
    ///
    /// This starts as MAX, but is lowered when a GOAWAY is received.
    ///
    /// > After sending a GOAWAY frame, the sender can discard frames for
    /// > streams initiated by the receiver with identifiers higher than
    /// > the identified last stream.
    max_stream_id: StreamId,

    /// Streams that have pending window updates
    pending_window_updates: store::Queue<stream::NextWindowUpdate>,

    /// New streams to be accepted
    pending_accept: store::Queue<stream::NextAccept>,

    /// Locally reset streams that should be reaped when they expire
    pending_reset_expired: store::Queue<stream::NextResetExpire>,

    /// How long locally reset streams should ignore received frames
    reset_duration: Duration,

    /// Holds frames that are waiting to be read
    buffer: Buffer<Event>,

    /// Refused StreamId, this represents a frame that must be sent out.
    refused: Option<StreamId>,

    /// If push promises are allowed to be received.
    is_push_enabled: bool,

    /// If extended connect protocol is enabled.
    is_extended_connect_protocol_enabled: bool,
}

#[derive(Debug)]
pub(super) enum Event {
    Headers(peer::PollMessage),
    Data(Bytes),
    Trailers(HeaderMap),
}

#[derive(Debug)]
pub(super) enum RecvHeaderBlockError<T> {
    Oversize(T),
    State(Error),
}

#[derive(Debug)]
pub(crate) enum Open {
    PushPromise,
    Headers,
}

impl Recv {
    pub fn new(peer: peer::Dyn, config: &Config) -> Self {
        let next_stream_id = if peer.is_server() { 1 } else { 2 };

        let mut flow = FlowControl::new();

        // connections always have the default window size, regardless of
        // settings
        flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
            .expect("invalid initial remote window size");
        flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();

        Recv {
            init_window_sz: config.local_init_window_sz,
            flow,
            in_flight_data: 0 as WindowSize,
            next_stream_id: Ok(next_stream_id.into()),
            pending_window_updates: store::Queue::new(),
            last_processed_id: StreamId::ZERO,
            max_stream_id: StreamId::MAX,
            pending_accept: store::Queue::new(),
            pending_reset_expired: store::Queue::new(),
            reset_duration: config.local_reset_duration,
            buffer: Buffer::new(),
            refused: None,
            is_push_enabled: config.local_push_enabled,
            is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
        }
    }

    /// Returns the initial receive window size
    pub fn init_window_sz(&self) -> WindowSize {
        self.init_window_sz
    }

    /// Returns the ID of the last processed stream
    pub fn last_processed_id(&self) -> StreamId {
        self.last_processed_id
    }

    /// Update state reflecting a new, remotely opened stream
    ///
    /// Returns the stream ID if the stream is accepted, or `None` if it is
    /// refused.
    pub fn open(
        &mut self,
        id: StreamId,
        mode: Open,
        counts: &mut Counts,
    ) -> Result<Option<StreamId>, Error> {
        assert!(self.refused.is_none());

        counts.peer().ensure_can_open(id, mode)?;

        let next_id = self.next_stream_id()?;
        if id < next_id {
            proto_err!(conn: "id ({:?}) < next_id ({:?})", id, next_id);
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
        }

        self.next_stream_id = id.next_id();

        if !counts.can_inc_num_recv_streams() {
            self.refused = Some(id);
            return Ok(None);
        }

        Ok(Some(id))
    }

    /// Transition the stream state based on receiving headers
    ///
    /// The caller ensures that the frame represents headers and not trailers.
    pub fn recv_headers(
        &mut self,
        frame: frame::Headers,
        stream: &mut store::Ptr,
        counts: &mut Counts,
    ) -> Result<(), RecvHeaderBlockError<Option<frame::Headers>>> {
        tracing::trace!("opening stream; init_window={}", self.init_window_sz);
        let is_initial = stream.state.recv_open(&frame)?;

        if is_initial {
            // TODO: be smarter about this logic
            if frame.stream_id() > self.last_processed_id {
                self.last_processed_id = frame.stream_id();
            }

            // Increment the number of concurrent streams
            counts.inc_num_recv_streams(stream);
        }

        if !stream.content_length.is_head() {
            use super::stream::ContentLength;
            use http::header;

            if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
                let content_length = match frame::parse_u64(content_length.as_bytes()) {
                    Ok(v) => v,
                    Err(_) => {
                        proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
                        return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
                    }
                };

                stream.content_length = ContentLength::Remaining(content_length);
            }
        }

        if frame.is_over_size() {
            // A frame is over size if the decoded header block was bigger than
            // SETTINGS_MAX_HEADER_LIST_SIZE.
            //
            // > A server that receives a larger header block than it is willing
            // > to handle can send an HTTP 431 (Request Header Fields Too
            // > Large) status code [RFC6585]. A client can discard responses
            // > that it cannot process.
            //
            // So, if peer is a server, we'll send a 431. In either case,
            // an error is recorded, which will send a REFUSED_STREAM,
            // since we don't want any of the data frames either.
            tracing::debug!(
                "stream error REQUEST_HEADER_FIELDS_TOO_LARGE -- \
                 recv_headers: frame is over size; stream={:?}",
                stream.id
            );
            return if counts.peer().is_server() && is_initial {
                let mut res = frame::Headers::new(
                    stream.id,
                    frame::Pseudo::response(::http::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE),
                    HeaderMap::new(),
                );
                res.set_end_stream();
                Err(RecvHeaderBlockError::Oversize(Some(res)))
            } else {
                Err(RecvHeaderBlockError::Oversize(None))
            };
        }

        let stream_id = frame.stream_id();
        let (pseudo, fields) = frame.into_parts();

        if pseudo.protocol.is_some()
            && counts.peer().is_server()
            && !self.is_extended_connect_protocol_enabled
        {
            proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
        }

        if pseudo.status.is_some() && counts.peer().is_server() {
            proto_err!(stream: "cannot use :status header for requests; stream={:?}", stream.id);
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
        }

        if !pseudo.is_informational() {
            let message = counts
                .peer()
                .convert_poll_message(pseudo, fields, stream_id)?;

            // Push the frame onto the stream's recv buffer
            stream
                .pending_recv
                .push_back(&mut self.buffer, Event::Headers(message));
            stream.notify_recv();

            // Only servers can receive a headers frame that initiates the stream.
            // This is verified in `Streams` before calling this function.
            if counts.peer().is_server() {
                // Correctness: never push a stream to `pending_accept` without having the
                // corresponding headers frame pushed to `stream.pending_recv`.
                self.pending_accept.push(stream);
            }
        }

        Ok(())
    }

    /// Called by the server to get the request
    ///
    /// # Panics
    ///
    /// Panics if `stream.pending_recv` has no `Event::Headers` queued.
    ///
    pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> {
        use super::peer::PollMessage::*;

        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Headers(Server(request))) => request,
            _ => unreachable!("server stream queue must start with Headers"),
        }
    }

    /// Called by the client to get pushed response
    pub fn poll_pushed(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Option<Result<(Request<()>, store::Key), proto::Error>>> {
        use super::peer::PollMessage::*;

        let mut ppp = stream.pending_push_promises.take();
        let pushed = ppp.pop(stream.store_mut()).map(|mut pushed| {
            match pushed.pending_recv.pop_front(&mut self.buffer) {
                Some(Event::Headers(Server(headers))) => (headers, pushed.key()),
                // When frames are pushed into the queue, it is verified that
                // the first frame is a HEADERS frame.
                _ => panic!("Headers not set on pushed stream"),
            }
        });
        stream.pending_push_promises = ppp;
        if let Some(p) = pushed {
            Poll::Ready(Some(Ok(p)))
        } else {
            let is_open = stream.state.ensure_recv_open()?;

            if is_open {
                stream.recv_task = Some(cx.waker().clone());
                Poll::Pending
            } else {
                Poll::Ready(None)
            }
        }
    }

    /// Called by the client to get the response
    pub fn poll_response(
        &mut self,
        cx: &Context,
        stream: &mut store::Ptr,
    ) -> Poll<Result<Response<()>, proto::Error>> {
        use super::peer::PollMessage::*;

        // If the buffer is not empty, then the first frame must be a HEADERS
        // frame or the user violated the contract.
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
            Some(_) => panic!("poll_response called after response returned"),
            None => {
                if !stream.state.ensure_recv_open()? {
                    proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
                    return Poll::Ready(Err(Error::library_reset(
                        stream.id,
                        Reason::PROTOCOL_ERROR,
                    )));
                }

                stream.recv_task = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    /// Transition the stream based on receiving trailers
    pub fn recv_trailers(
        &mut self,
        frame: frame::Headers,
        stream: &mut store::Ptr,
    ) -> Result<(), Error> {
        // Transition the state
        stream.state.recv_close()?;

        if stream.ensure_content_length_zero().is_err() {
            proto_err!(stream: "recv_trailers: content-length is not zero; stream={:?};", stream.id);
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
        }

        let trailers = frame.into_fields();

        // Push the frame onto the stream's recv buffer
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Trailers(trailers));
        stream.notify_recv();

        Ok(())
    }

    /// Releases capacity of the connection
    pub fn release_connection_capacity(&mut self, capacity: WindowSize, task: &mut Option<Waker>) {
        tracing::trace!(
            "release_connection_capacity; size={}, connection in_flight_data={}",
            capacity,
            self.in_flight_data,
        );

        // Decrement in-flight data
        self.in_flight_data -= capacity;

        // Assign capacity to connection
        // TODO: proper error handling
        let _res = self.flow.assign_capacity(capacity);
        debug_assert!(_res.is_ok());

        if self.flow.unclaimed_capacity().is_some() {
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

    /// Releases capacity back to the connection & stream
    pub fn release_capacity(
        &mut self,
        capacity: WindowSize,
        stream: &mut store::Ptr,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError> {
        tracing::trace!("release_capacity; size={}", capacity);

        if capacity > stream.in_flight_recv_data {
            return Err(UserError::ReleaseCapacityTooBig);
        }

        self.release_connection_capacity(capacity, task);

        // Decrement in-flight data
        stream.in_flight_recv_data -= capacity;

        // Assign capacity to stream
        // TODO: proper error handling
        let _res = stream.recv_flow.assign_capacity(capacity);
        debug_assert!(_res.is_ok());

        if stream.recv_flow.unclaimed_capacity().is_some() {
            // Queue the stream for sending the WINDOW_UPDATE frame.
            self.pending_window_updates.push(stream);

            if let Some(task) = task.take() {
                task.wake();
            }
        }

        Ok(())
    }

    /// Release any unclaimed capacity for a closed stream.
    pub fn release_closed_capacity(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
        debug_assert_eq!(stream.ref_count, 0);

        if stream.in_flight_recv_data == 0 {
            return;
        }

        tracing::trace!(
            "auto-release closed stream ({:?}) capacity: {:?}",
            stream.id,
            stream.in_flight_recv_data,
        );

        self.release_connection_capacity(stream.in_flight_recv_data, task);
        stream.in_flight_recv_data = 0;

        self.clear_recv_buffer(stream);
    }

    /// Set the "target" connection window size.
    ///
    /// By default, all new connections start with the default window size of
    /// 65,535 bytes. As streams use and release capacity, we will send
    /// WINDOW_UPDATEs for the connection to bring it back up to the initial
    /// "target".
    ///
    /// Setting a target means that we will try to tell the peer about
    /// WINDOW_UPDATEs so the peer knows it has about `target` window to use
    /// for the whole connection.
    ///
    /// The `task` is an optional parked task for the `Connection` that might
    /// be blocked on needing more window capacity.
    pub fn set_target_connection_window(
        &mut self,
        target: WindowSize,
        task: &mut Option<Waker>,
    ) -> Result<(), Reason> {
        tracing::trace!(
            "set_target_connection_window; target={}; available={}, reserved={}",
            target,
            self.flow.available(),
            self.in_flight_data,
        );

        // The current target connection window is our `available` plus any
        // in-flight data reserved by streams.
        //
        // Update the flow controller with the difference between the new
        // target and the current target.
        let current = self
            .flow
            .available()
            .add(self.in_flight_data)?
            .checked_size();
        if target > current {
            self.flow.assign_capacity(target - current)?;
        } else {
            self.flow.claim_capacity(current - target)?;
        }

        // If changing the target capacity means we gained a bunch of capacity,
        // enough that we went over the update threshold, then schedule sending
        // a connection WINDOW_UPDATE.
        if self.flow.unclaimed_capacity().is_some() {
            if let Some(task) = task.take() {
                task.wake();
            }
        }
        Ok(())
    }

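    /// Applies our local SETTINGS to the receive side.
    ///
    /// Updates the extended CONNECT flag and, when
    /// `SETTINGS_INITIAL_WINDOW_SIZE` changes, adjusts the receive window of
    /// every stream in the store by the difference between the new and old
    /// values, per RFC 7540 §6.9.2.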
    pub(crate) fn apply_local_settings(
        &mut self,
        settings: &frame::Settings,
        store: &mut Store,
    ) -> Result<(), proto::Error> {
        if let Some(val) = settings.is_extended_connect_protocol_enabled() {
            self.is_extended_connect_protocol_enabled = val;
        }

        if let Some(target) = settings.initial_window_size() {
            let old_sz = self.init_window_sz;
            self.init_window_sz = target;

            tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);

            // Per RFC 7540 §6.9.2:
            //
            // In addition to changing the flow-control window for streams that are
            // not yet active, a SETTINGS frame can alter the initial flow-control
            // window size for streams with active flow-control windows (that is,
            // streams in the "open" or "half-closed (remote)" state). When the
            // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
            // the size of all stream flow-control windows that it maintains by the
            // difference between the new value and the old value.
            //
            // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
            // space in a flow-control window to become negative. A sender MUST
            // track the negative flow-control window and MUST NOT send new
            // flow-controlled frames until it receives WINDOW_UPDATE frames that
            // cause the flow-control window to become positive.

            match target.cmp(&old_sz) {
                Ordering::Less => {
                    // We must decrease the (local) window on every open stream.
                    let dec = old_sz - target;
                    tracing::trace!("decrementing all windows; dec={}", dec);

                    store.try_for_each(|mut stream| {
                        stream
                            .recv_flow
                            .dec_recv_window(dec)
                            .map_err(proto::Error::library_go_away)?;
                        Ok::<_, proto::Error>(())
                    })?;
                }
                Ordering::Greater => {
                    // We must increase the (local) window on every open stream.
                    let inc = target - old_sz;
                    tracing::trace!("incrementing all windows; inc={}", inc);
                    store.try_for_each(|mut stream| {
                        // XXX: Shouldn't the peer have already noticed our
                        // overflow and sent us a GOAWAY?
                        stream
                            .recv_flow
                            .inc_window(inc)
                            .map_err(proto::Error::library_go_away)?;
                        stream
                            .recv_flow
                            .assign_capacity(inc)
                            .map_err(proto::Error::library_go_away)?;
                        Ok::<_, proto::Error>(())
                    })?;
                }
                Ordering::Equal => (),
            }
        }

        Ok(())
    }

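    /// Returns `true` if the receive half of the stream is closed and all
    /// buffered frames have already been consumed.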
    pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
        if !stream.state.is_recv_closed() {
            return false;
        }

        stream.pending_recv.is_empty()
    }

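    /// Handles a received DATA frame.
    ///
    /// Verifies the stream may receive data, consumes connection and stream
    /// flow-control windows, checks the declared content-length, and buffers
    /// the payload for the user to read.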
    pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) -> Result<(), Error> {
        let sz = frame.payload().len();

        // This should have been enforced at the codec::FramedRead layer, so
        // this is just a sanity check.
        assert!(sz <= MAX_WINDOW_SIZE as usize);

        let sz = sz as WindowSize;

        let is_ignoring_frame = stream.state.is_local_error();

        if !is_ignoring_frame && !stream.state.is_recv_streaming() {
            // TODO: There are cases where this can be a stream error of
            // STREAM_CLOSED instead...

            // Receiving a DATA frame when not expecting one is a protocol
            // error.
            proto_err!(conn: "unexpected DATA frame; stream={:?}", stream.id);
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
        }

        tracing::trace!(
            "recv_data; size={}; connection={}; stream={}",
            sz,
            self.flow.window_size(),
            stream.recv_flow.window_size()
        );

        if is_ignoring_frame {
            tracing::trace!(
                "recv_data; frame ignored on locally reset {:?} for some time",
                stream.id,
            );
            return self.ignore_data(sz);
        }

        // Ensure that there is enough capacity on the connection before acting
        // on the stream.
        self.consume_connection_window(sz)?;

        if stream.recv_flow.window_size() < sz {
            // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
            // > A receiver MAY respond with a stream error (Section 5.4.2) or
            // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
            // > it is unable to accept a frame.
            //
            // So, for violating the **stream** window, we can send either a
            // stream or connection error. We've opted to send a stream
            // error.
            return Err(Error::library_reset(stream.id, Reason::FLOW_CONTROL_ERROR));
        }

        if stream.dec_content_length(frame.payload().len()).is_err() {
            proto_err!(stream:
                "recv_data: content-length overflow; stream={:?}; len={:?}",
                stream.id,
                frame.payload().len(),
            );
            return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
        }

        if frame.is_end_stream() {
            if stream.ensure_content_length_zero().is_err() {
                proto_err!(stream:
                    "recv_data: content-length underflow; stream={:?}; len={:?}",
                    stream.id,
                    frame.payload().len(),
                );
                return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR));
            }

            if stream.state.recv_close().is_err() {
                proto_err!(conn: "recv_data: failed to transition to closed state; stream={:?}", stream.id);
                return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
            }
        }

        // Received a frame, but nothing will read it; release the connection
        // capacity immediately instead of waiting on the user (see issue #648).
        if !stream.is_recv {
            tracing::trace!(
                "recv_data; frame ignored on stream release {:?} for some time",
                stream.id,
            );
            self.release_connection_capacity(sz, &mut None);
            return Ok(());
        }

        // Update stream level flow control
        stream
            .recv_flow
            .send_data(sz)
            .map_err(proto::Error::library_go_away)?;

        // Track the data as in-flight
        stream.in_flight_recv_data += sz;

        let event = Event::Data(frame.into_payload());

        // Push the frame onto the recv buffer
        stream.pending_recv.push_back(&mut self.buffer, event);
        stream.notify_recv();

        Ok(())
    }

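    /// Accounts for a DATA frame that is being discarded.
    ///
    /// The payload still counts against the connection window, so the
    /// capacity is consumed and then immediately released so a WINDOW_UPDATE
    /// can eventually be sent.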
    pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), Error> {
        // Ensure that there is enough capacity on the connection...
        self.consume_connection_window(sz)?;

        // Since we are ignoring this frame,
        // we aren't returning the frame to the user. That means they
        // have no way to release the capacity back to the connection. So
        // we have to release it automatically.
        //
        // This call doesn't send a WINDOW_UPDATE immediately, just marks
        // the capacity as available to be reclaimed. When the available
        // capacity meets a threshold, a WINDOW_UPDATE is then sent.
        self.release_connection_capacity(sz, &mut None);
        Ok(())
    }

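    /// Consumes `sz` bytes of the connection-level flow-control window.
    ///
    /// Returns a `FLOW_CONTROL_ERROR` connection error if the peer sent more
    /// data than the current connection window allows.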
    pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), Error> {
        if self.flow.window_size() < sz {
            tracing::debug!(
                "connection error FLOW_CONTROL_ERROR -- window_size ({:?}) < sz ({:?});",
                self.flow.window_size(),
                sz,
            );
            return Err(Error::library_go_away(Reason::FLOW_CONTROL_ERROR));
        }

        // Update connection level flow control
        self.flow.send_data(sz).map_err(Error::library_go_away)?;

        // Track the data as in-flight
        self.in_flight_data += sz;
        Ok(())
    }

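    /// Handles a received PUSH_PROMISE frame for the reserved stream.
    ///
    /// Reserves the remote stream, rejects oversize header blocks and invalid
    /// promised requests, and otherwise queues the promised request headers
    /// so the client can pick them up via `poll_pushed`.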
    pub fn recv_push_promise(
        &mut self,
        frame: frame::PushPromise,
        stream: &mut store::Ptr,
    ) -> Result<(), Error> {
        stream.state.reserve_remote()?;
        if frame.is_over_size() {
            // A frame is over size if the decoded header block was bigger than
            // SETTINGS_MAX_HEADER_LIST_SIZE.
            //
            // > A server that receives a larger header block than it is willing
            // > to handle can send an HTTP 431 (Request Header Fields Too
            // > Large) status code [RFC6585]. A client can discard responses
            // > that it cannot process.
            //
            // So, if peer is a server, we'll send a 431. In either case,
            // an error is recorded, which will send a REFUSED_STREAM,
            // since we don't want any of the data frames either.
            tracing::debug!(
                "stream error REFUSED_STREAM -- recv_push_promise: \
                 headers frame is over size; promised_id={:?};",
                frame.promised_id(),
            );
            return Err(Error::library_reset(
                frame.promised_id(),
                Reason::REFUSED_STREAM,
            ));
        }

        let promised_id = frame.promised_id();
        let (pseudo, fields) = frame.into_parts();
        let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;

        if let Err(e) = frame::PushPromise::validate_request(&req) {
            use PushPromiseHeaderError::*;
            match e {
                NotSafeAndCacheable => proto_err!(
                    stream:
                    "recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
                    req.method(),
                    promised_id,
                ),
                InvalidContentLength(e) => proto_err!(
                    stream:
                    "recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
                    e,
                    promised_id,
                ),
            }
            return Err(Error::library_reset(promised_id, Reason::PROTOCOL_ERROR));
        }

        use super::peer::PollMessage::*;
        stream
            .pending_recv
            .push_back(&mut self.buffer, Event::Headers(Server(req)));
        stream.notify_recv();
        Ok(())
    }

    /// Ensures that `id` is not in the `Idle` state.
    pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
        if let Ok(next) = self.next_stream_id {
            if id >= next {
                tracing::debug!(
                    "stream ID implicitly closed, PROTOCOL_ERROR; stream={:?}",
                    id
                );
                return Err(Reason::PROTOCOL_ERROR);
            }
        }
        // if next_stream_id is overflowed, that's ok.

        Ok(())
    }

    /// Handle remote sending an explicit RST_STREAM.
    pub fn recv_reset(
        &mut self,
        frame: frame::Reset,
        stream: &mut Stream,
        counts: &mut Counts,
    ) -> Result<(), Error> {
        // Resetting a stream that the user hasn't accepted is possible,
        // but should be done with care. These streams will continue
        // to take up memory in the accept queue, but will no longer be
        // counted as "concurrent" streams.
        //
        // So, we have a separate limit for these.
        //
        // See https://github.com/hyperium/hyper/issues/2877
        if stream.is_pending_accept {
            if counts.can_inc_num_remote_reset_streams() {
                counts.inc_num_remote_reset_streams();
            } else {
                tracing::warn!(
                    "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
                    counts.max_remote_reset_streams(),
                );
                return Err(Error::library_go_away_data(
                    Reason::ENHANCE_YOUR_CALM,
                    "too_many_resets",
                ));
            }
        }

        // Notify the stream
        stream.state.recv_reset(frame, stream.is_pending_send);

        stream.notify_send();
        stream.notify_recv();

        Ok(())
    }

    /// Handle a connection-level error
    pub fn handle_error(&mut self, err: &proto::Error, stream: &mut Stream) {
        // Receive an error
        stream.state.handle_error(err);

        // If a receiver is waiting, notify it
        stream.notify_send();
        stream.notify_recv();
    }

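    /// Lowers the maximum stream ID that will be processed to
    /// `last_processed_id`; frames for higher-numbered streams are ignored.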
    pub fn go_away(&mut self, last_processed_id: StreamId) {
        assert!(self.max_stream_id >= last_processed_id);
        self.max_stream_id = last_processed_id;
    }

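    /// Handles the connection reaching EOF: updates the stream state and
    /// wakes any tasks blocked on sending or receiving.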
    pub fn recv_eof(&mut self, stream: &mut Stream) {
        stream.state.recv_eof();
        stream.notify_send();
        stream.notify_recv();
    }

    pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
        while stream.pending_recv.pop_front(&mut self.buffer).is_some() {
            // drop it
        }
    }

    /// Get the max ID of streams we can receive.
    ///
    /// This gets lowered if we send a GOAWAY frame.
    pub fn max_stream_id(&self) -> StreamId {
        self.max_stream_id
    }

    pub fn next_stream_id(&self) -> Result<StreamId, Error> {
        if let Ok(id) = self.next_stream_id {
            Ok(id)
        } else {
            Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
        }
    }

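    /// Returns `true` if the remote peer could have already created a stream
    /// with the given ID, i.e. the ID is below the next expected remote
    /// stream ID (or the ID space has overflowed).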
    pub fn may_have_created_stream(&self, id: StreamId) -> bool {
        if let Ok(next_id) = self.next_stream_id {
            // Peer::is_local_init should have been called beforehand
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
            id < next_id
        } else {
            true
        }
    }

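    /// Advances `next_stream_id` past `id` if `id` is at or beyond it, so the
    /// ID is no longer considered idle.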
    pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
        if let Ok(next_id) = self.next_stream_id {
            // !Peer::is_local_init should have been called beforehand
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
            if id >= next_id {
                self.next_stream_id = id.next_id();
            }
        }
    }

    /// Ensures that the remote peer is allowed to reserve streams via
    /// PUSH_PROMISE; errors if push is disabled locally.
    pub fn ensure_can_reserve(&self) -> Result<(), Error> {
        if !self.is_push_enabled {
            proto_err!(conn: "recv_push_promise: push is disabled");
            return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
        }

        Ok(())
    }

    /// Add a locally reset stream to the queue to be eventually reaped.
    pub fn enqueue_reset_expiration(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        if !stream.state.is_local_error() || stream.is_pending_reset_expiration() {
            return;
        }

        tracing::trace!("enqueue_reset_expiration; {:?}", stream.id);

        if counts.can_inc_num_reset_streams() {
            counts.inc_num_reset_streams();
            self.pending_reset_expired.push(stream);
        }
    }

    /// Send any pending refusals.
    pub fn send_pending_refusal<T, B>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        if let Some(stream_id) = self.refused {
            ready!(dst.poll_ready(cx))?;

            // Create the RST_STREAM frame
            let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM);

            // Buffer the frame
            dst.buffer(frame.into()).expect("invalid RST_STREAM frame");
        }

        self.refused = None;

        Poll::Ready(Ok(()))
    }

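    /// Reaps locally reset streams whose `reset_duration` grace period has
    /// elapsed, transitioning them out of the expiration queue.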
    pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
        if !self.pending_reset_expired.is_empty() {
            let now = Instant::now();
            let reset_duration = self.reset_duration;
            while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
                let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
                // rust-lang/rust#86470 tracks a bug in the standard library where `Instant`
                // subtraction can panic (because, on some platforms, `Instant` isn't actually
                // monotonic). We use a saturating operation to avoid this panic here.
                now.saturating_duration_since(reset_at) > reset_duration
            }) {
                counts.transition_after(stream, true);
            }
        }
    }

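    /// Clears the pending window update and reset-expiration queues, and
    /// optionally the pending accept queue as well.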
    pub fn clear_queues(
        &mut self,
        clear_pending_accept: bool,
        store: &mut Store,
        counts: &mut Counts,
    ) {
        self.clear_stream_window_update_queue(store, counts);
        self.clear_all_reset_streams(store, counts);

        if clear_pending_accept {
            self.clear_all_pending_accept(store, counts);
        }
    }

    fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_window_updates.pop(store) {
            counts.transition(stream, |_, stream| {
                tracing::trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
            })
        }
    }

    /// Called on EOF
    fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_reset_expired.pop(store) {
            counts.transition_after(stream, true);
        }
    }

    fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_accept.pop(store) {
            counts.transition_after(stream, false);
        }
    }

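    /// Flushes any pending connection-level and stream-level WINDOW_UPDATE
    /// frames to the codec, returning `Pending` if the codec is not ready.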
    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Send any pending connection level window updates
        ready!(self.send_connection_window_update(cx, dst))?;

        // Send any pending stream level window updates
        ready!(self.send_stream_window_updates(cx, store, counts, dst))?;

        Poll::Ready(Ok(()))
    }

    /// Send connection level window update
    fn send_connection_window_update<T, B>(
        &mut self,
        cx: &mut Context,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        if let Some(incr) = self.flow.unclaimed_capacity() {
            let frame = frame::WindowUpdate::new(StreamId::zero(), incr);

            // Ensure the codec has capacity
            ready!(dst.poll_ready(cx))?;

            // Buffer the WINDOW_UPDATE frame
            dst.buffer(frame.into())
                .expect("invalid WINDOW_UPDATE frame");

            // Update flow control
            self.flow
                .inc_window(incr)
                .expect("unexpected flow control state");
        }

        Poll::Ready(Ok(()))
    }

    /// Send stream level window update
    pub fn send_stream_window_updates<T, B>(
        &mut self,
        cx: &mut Context,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        loop {
            // Ensure the codec has capacity
            ready!(dst.poll_ready(cx))?;

            // Get the next stream
            let stream = match self.pending_window_updates.pop(store) {
                Some(stream) => stream,
                None => return Poll::Ready(Ok(())),
            };

            counts.transition(stream, |_, stream| {
                tracing::trace!("pending_window_updates -- pop; stream={:?}", stream.id);
                debug_assert!(!stream.is_pending_window_update);

                if !stream.state.is_recv_streaming() {
                    // No need to send window updates on the stream if the stream is
                    // no longer receiving data.
                    //
                    // TODO: is this correct? We could possibly send a window
                    // update on a ReservedRemote stream if we already know
                    // we want to stream the data faster...
                    return;
                }

                // TODO: de-dup
                if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
                    // Create the WINDOW_UPDATE frame
                    let frame = frame::WindowUpdate::new(stream.id, incr);

                    // Buffer it
                    dst.buffer(frame.into())
                        .expect("invalid WINDOW_UPDATE frame");

                    // Update flow control
                    stream
                        .recv_flow
                        .inc_window(incr)
                        .expect("unexpected flow control state");
                }
            })
        }
    }

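    /// Pops the next stream waiting to be accepted by the server, returning
    /// its store key if one is queued.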
    pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
        self.pending_accept.pop(store).map(|ptr| ptr.key())
    }

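    /// Polls the next DATA payload buffered for the stream.
    ///
    /// Returns `Ready(None)` once trailers are reached or the stream is
    /// closed, and registers the task for wakeup when no frame is buffered.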
    pub fn poll_data(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<Bytes, proto::Error>>> {
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
            Some(event) => {
                // Frame is trailers; push it back so `poll_trailers` can
                // return it.
                stream.pending_recv.push_front(&mut self.buffer, event);

                // Notify the recv task. This is done just in case
                // `poll_trailers` was called.
                //
                // It is very likely that `notify_recv` will just be a no-op (as
                // the task will be None), so this isn't really much of a
                // performance concern. It also means we don't have to track
                // state to see if `poll_trailers` was called before `poll_data`
                // returned `None`.
                stream.notify_recv();

                // No more data frames
                Poll::Ready(None)
            }
            None => self.schedule_recv(cx, stream),
        }
    }

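    /// Polls the trailers buffered for the stream, if any.
    ///
    /// Returns `Pending` while data frames are still queued ahead of the
    /// trailers, and `Ready(None)` if the stream ends without trailers.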
    pub fn poll_trailers(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<HeaderMap, proto::Error>>> {
        match stream.pending_recv.pop_front(&mut self.buffer) {
            Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))),
            Some(event) => {
                // Frame is not trailers; not ready to poll trailers yet.
                stream.pending_recv.push_front(&mut self.buffer, event);

                Poll::Pending
            }
            None => self.schedule_recv(cx, stream),
        }
    }

    fn schedule_recv<T>(
        &mut self,
        cx: &Context,
        stream: &mut Stream,
    ) -> Poll<Option<Result<T, proto::Error>>> {
        if stream.state.ensure_recv_open()? {
            // Request to get notified once more frames arrive
            stream.recv_task = Some(cx.waker().clone());
            Poll::Pending
        } else {
            // No more frames will be received
            Poll::Ready(None)
        }
    }
}

// ===== impl Open =====

impl Open {
    pub fn is_push_promise(&self) -> bool {
        matches!(*self, Self::PushPromise)
    }
}

// ===== impl RecvHeaderBlockError =====

impl<T> From<Error> for RecvHeaderBlockError<T> {
    fn from(err: Error) -> Self {
        RecvHeaderBlockError::State(err)
    }
}