use super::store::Resolve;
use super::*;

use crate::frame::{Reason, StreamId};

use crate::codec::UserError;
use crate::codec::UserError::*;

use bytes::buf::{Buf, Take};
use std::{
    cmp::{self, Ordering},
    fmt, io, mem,
    task::{Context, Poll, Waker},
};

/// # Warning
///
/// Queued streams are ordered by stream ID, as we need to ensure that
/// lower-numbered streams are sent headers before higher-numbered ones.
/// This is because "idle" stream IDs – those which have been initiated but
/// have yet to receive frames – will be implicitly closed on receipt of a
/// frame on a higher stream ID. If these queues were not ordered by stream
/// IDs, some mechanism would be necessary to ensure that the lowest-numbered
/// idle stream is opened first.
#[derive(Debug)]
pub(super) struct Prioritize {
    /// Queue of streams waiting for socket capacity to send a frame.
    pending_send: store::Queue<stream::NextSend>,

    /// Queue of streams waiting for window capacity to produce data.
    pending_capacity: store::Queue<stream::NextSendCapacity>,

    /// Streams waiting for capacity due to max concurrency
    ///
    /// The `SendRequest` handle is `Clone`. This enables initiating requests
    /// from many tasks. However, offering this capability while supporting
    /// backpressure at some level is tricky. If there are many `SendRequest`
    /// handles and a single stream becomes available, which handle gets
    /// assigned that stream? Maybe that handle is no longer ready to send a
    /// request.
    ///
    /// The strategy used is to allow each `SendRequest` handle one buffered
    /// request. A `SendRequest` handle is ready to send a request if it has no
    /// associated buffered requests. This is the same strategy as `mpsc` in the
    /// futures library.
    pending_open: store::Queue<stream::NextOpen>,

    /// Connection level flow control governing sent data
    flow: FlowControl,

    /// Stream ID of the last stream opened.
    last_opened_id: StreamId,

    /// What `DATA` frame is currently being sent in the codec.
    in_flight_data_frame: InFlightData,

    /// The maximum amount of bytes a stream should buffer.
    max_buffer_size: usize,
}

#[derive(Debug, Eq, PartialEq)]
enum InFlightData {
    /// There is no `DATA` frame in flight.
    Nothing,
    /// There is a `DATA` frame in flight belonging to the given stream.
    DataFrame(store::Key),
    /// There was a `DATA` frame, but the stream's queue was since cleared.
    Drop,
}

pub(crate) struct Prioritized<B> {
    // The buffer
    inner: Take<B>,

    end_of_stream: bool,

    // The stream that this is associated with
    stream: store::Key,
}

// ===== impl Prioritize =====

impl Prioritize {
    pub fn new(config: &Config) -> Prioritize {
        let mut flow = FlowControl::new();

        flow.inc_window(config.remote_init_window_sz)
            .expect("invalid initial window size");

        // TODO: proper error handling
        let _res = flow.assign_capacity(config.remote_init_window_sz);
        debug_assert!(_res.is_ok());

        tracing::trace!("Prioritize::new; flow={:?}", flow);

        Prioritize {
            pending_send: store::Queue::new(),
            pending_capacity: store::Queue::new(),
            pending_open: store::Queue::new(),
            flow,
            last_opened_id: StreamId::ZERO,
            in_flight_data_frame: InFlightData::Nothing,
            max_buffer_size: config.local_max_buffer_size,
        }
    }

    pub(crate) fn max_buffer_size(&self) -> usize {
        self.max_buffer_size
    }

    /// Queue a frame to be sent to the remote
    pub fn queue_frame<B>(
        &mut self,
        frame: Frame<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        task: &mut Option<Waker>,
    ) {
        let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
        let _e = span.enter();
        // Queue the frame in the buffer
        stream.pending_send.push_back(buffer, frame);
        self.schedule_send(stream, task);
    }

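    /// Queue the stream in `pending_send` and wake the connection task so its
    /// frames get written out. Streams that are still waiting to be opened are
    /// skipped.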
    pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
        // If the stream is waiting to be opened, nothing more to do.
        if stream.is_send_ready() {
            tracing::trace!(?stream.id, "schedule_send");
            // Queue the stream
            self.pending_send.push(stream);

            // Notify the connection.
            if let Some(task) = task.take() {
                task.wake();
            }
        }
    }

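    /// Queue the stream to be opened once the concurrency limit allows another
    /// send stream.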
    pub fn queue_open(&mut self, stream: &mut store::Ptr) {
        self.pending_open.push(stream);
    }

    /// Send a data frame
    pub fn send_data<B>(
        &mut self,
        frame: frame::Data<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        counts: &mut Counts,
        task: &mut Option<Waker>,
    ) -> Result<(), UserError>
    where
        B: Buf,
    {
        let sz = frame.payload().remaining();

        if sz > MAX_WINDOW_SIZE as usize {
            return Err(UserError::PayloadTooBig);
        }

        let sz = sz as WindowSize;

        if !stream.state.is_send_streaming() {
            if stream.state.is_closed() {
                return Err(InactiveStreamId);
            } else {
                return Err(UnexpectedFrameType);
            }
        }

        // Update the buffered data counter
        stream.buffered_send_data += sz as usize;

        let span =
            tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
        let _e = span.enter();
        tracing::trace!(buffered = stream.buffered_send_data);

        // Implicitly request more send capacity if not enough has been
        // requested yet.
        if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
            // Update the target requested capacity
            stream.requested_send_capacity =
                cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;

            self.try_assign_capacity(stream);
        }

        if frame.is_end_stream() {
            stream.state.send_close();
            self.reserve_capacity(0, stream, counts);
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            buffered = stream.buffered_send_data,
        );

        // The `stream.buffered_send_data == 0` check is here so that, if a zero
        // length data frame is queued to the front (there is no previously
        // queued data), it gets sent out immediately even if there is no
        // available send window.
        //
        // Sending out zero length data frames can be done to signal
        // end-of-stream.
        //
        if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
            // The stream currently has capacity to send the data frame, so
            // queue it up and notify the connection task.
            self.queue_frame(frame.into(), buffer, stream, task);
        } else {
            // The stream has no capacity to send the frame now, save it but
            // don't notify the connection task. Once additional capacity
            // becomes available, the frame will be flushed.
            stream.pending_send.push_back(buffer, frame.into());
        }

        Ok(())
    }

    /// Request capacity to send data
    pub fn reserve_capacity(
        &mut self,
        capacity: WindowSize,
        stream: &mut store::Ptr,
        counts: &mut Counts,
    ) {
        let span = tracing::trace_span!(
            "reserve_capacity",
            ?stream.id,
            requested = capacity,
            effective = (capacity as usize) + stream.buffered_send_data,
            curr = stream.requested_send_capacity
        );
        let _e = span.enter();

        // Actual capacity is `capacity` + the current amount of buffered data.
        // If it were less, then we could never send out the buffered data.
        let capacity = (capacity as usize) + stream.buffered_send_data;

        match capacity.cmp(&(stream.requested_send_capacity as usize)) {
            Ordering::Equal => {
                // Nothing to do
            }
            Ordering::Less => {
                // Update the target requested capacity
                stream.requested_send_capacity = capacity as WindowSize;

                // Currently available capacity assigned to the stream
                let available = stream.send_flow.available().as_size();

                // If the stream has more assigned capacity than requested, reclaim
                // some for the connection
                if available as usize > capacity {
                    let diff = available - capacity as WindowSize;

                    // TODO: proper error handling
                    let _res = stream.send_flow.claim_capacity(diff);
                    debug_assert!(_res.is_ok());

                    self.assign_connection_capacity(diff, stream, counts);
                }
            }
            Ordering::Greater => {
                // If trying to *add* capacity, but the stream send side is closed,
                // there's nothing to be done.
                if stream.state.is_send_closed() {
                    return;
                }

                // Update the target requested capacity
                stream.requested_send_capacity =
                    cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;

                // Try to assign additional capacity to the stream. If none is
                // currently available, the stream will be queued to receive some
                // when more becomes available.
                self.try_assign_capacity(stream);
            }
        }
    }

    pub fn recv_stream_window_update(
        &mut self,
        inc: WindowSize,
        stream: &mut store::Ptr,
    ) -> Result<(), Reason> {
        let span = tracing::trace_span!(
            "recv_stream_window_update",
            ?stream.id,
            ?stream.state,
            inc,
            flow = ?stream.send_flow
        );
        let _e = span.enter();

        if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
            // We can't send any data, so don't bother doing anything else.
            return Ok(());
        }

        // Update the stream level flow control.
        stream.send_flow.inc_window(inc)?;

        // If the stream is waiting on additional capacity, then this will
        // assign it (if available on the connection) and notify the producer
        self.try_assign_capacity(stream);

        Ok(())
    }

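    /// Handle a connection-level `WINDOW_UPDATE`: grow the connection send
    /// window and distribute the newly available capacity to waiting streams.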
    pub fn recv_connection_window_update(
        &mut self,
        inc: WindowSize,
        store: &mut Store,
        counts: &mut Counts,
    ) -> Result<(), Reason> {
        // Update the connection's window
        self.flow.inc_window(inc)?;

        self.assign_connection_capacity(inc, store, counts);
        Ok(())
    }

    /// Reclaim all capacity assigned to the stream and re-assign it to the
    /// connection
    pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        let available = stream.send_flow.available().as_size();
        if available > 0 {
            // TODO: proper error handling
            let _res = stream.send_flow.claim_capacity(available);
            debug_assert!(_res.is_ok());
            // Re-assign all capacity to the connection
            self.assign_connection_capacity(available, stream, counts);
        }
    }

    /// Reclaim just reserved capacity, not buffered capacity, and re-assign
    /// it to the connection
    pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
        // only reclaim requested capacity that isn't already buffered
        if stream.requested_send_capacity as usize > stream.buffered_send_data {
            let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;

            // TODO: proper error handling
            let _res = stream.send_flow.claim_capacity(reserved);
            debug_assert!(_res.is_ok());
            self.assign_connection_capacity(reserved, stream, counts);
        }
    }

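    /// Drop every stream currently waiting in the `pending_capacity` queue.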
    pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
        let span = tracing::trace_span!("clear_pending_capacity");
        let _e = span.enter();
        while let Some(stream) = self.pending_capacity.pop(store) {
            counts.transition(stream, |_, stream| {
                tracing::trace!(?stream.id, "clear_pending_capacity");
            })
        }
    }

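    /// Add `inc` bytes to the connection-level send capacity and hand the new
    /// capacity out to streams waiting in `pending_capacity`.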
    pub fn assign_connection_capacity<R>(
        &mut self,
        inc: WindowSize,
        store: &mut R,
        counts: &mut Counts,
    ) where
        R: Resolve,
    {
        let span = tracing::trace_span!("assign_connection_capacity", inc);
        let _e = span.enter();

        // TODO: proper error handling
        let _res = self.flow.assign_capacity(inc);
        debug_assert!(_res.is_ok());

        // Assign newly acquired capacity to streams pending capacity.
        while self.flow.available() > 0 {
            let stream = match self.pending_capacity.pop(store) {
                Some(stream) => stream,
                None => return,
            };

            // Streams pending capacity may have been reset before capacity
            // became available. In that case, the stream won't want any
            // capacity, and so we shouldn't "transition" on it, but just evict
            // it and continue the loop.
            if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
                continue;
            }

            counts.transition(stream, |_, stream| {
                // Try to assign capacity to the stream. This will also re-queue the
                // stream if there isn't enough connection level capacity to fulfill
                // the capacity request.
                self.try_assign_capacity(stream);
            })
        }
    }

    /// Try to assign available connection capacity toward the stream's
    /// outstanding capacity request.
    fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
        let total_requested = stream.requested_send_capacity;

        // Total requested should never go below actual assigned
        // (Note: the window size can go lower than assigned)
        debug_assert!(stream.send_flow.available() <= total_requested as usize);

        // The amount of additional capacity that the stream requests.
        // Don't assign more than the window has available!
        let additional = cmp::min(
            total_requested - stream.send_flow.available().as_size(),
            // Can't assign more than what is available
            stream.send_flow.window_size() - stream.send_flow.available().as_size(),
        );
        let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
        let _e = span.enter();
        tracing::trace!(
            requested = total_requested,
            additional,
            buffered = stream.buffered_send_data,
            window = stream.send_flow.window_size(),
            conn = %self.flow.available()
        );

        if additional == 0 {
            // Nothing more to do
            return;
        }

        // If the stream has requested capacity, then it must be in the
        // streaming state (more data could be sent) or there is buffered data
        // waiting to be sent.
        debug_assert!(
            stream.state.is_send_streaming() || stream.buffered_send_data > 0,
            "state={:?}",
            stream.state
        );

        // The amount of currently available capacity on the connection
        let conn_available = self.flow.available().as_size();

        // First check if capacity is immediately available
        if conn_available > 0 {
            // The amount of capacity to assign to the stream
            // TODO: Should prioritization factor into this?
            let assign = cmp::min(conn_available, additional);

            tracing::trace!(capacity = assign, "assigning");

            // Assign the capacity to the stream
            stream.assign_capacity(assign, self.max_buffer_size);

            // Claim the capacity from the connection
            // TODO: proper error handling
            let _res = self.flow.claim_capacity(assign);
            debug_assert!(_res.is_ok());
        }

        tracing::trace!(
            available = %stream.send_flow.available(),
            requested = stream.requested_send_capacity,
            buffered = stream.buffered_send_data,
            has_unavailable = %stream.send_flow.has_unavailable()
        );

        if stream.send_flow.available() < stream.requested_send_capacity as usize
            && stream.send_flow.has_unavailable()
        {
            // The stream requires additional capacity and the stream's
            // window has available capacity, but the connection window
            // does not.
            //
            // In this case, the stream needs to be queued up for when the
            // connection has more capacity.
            self.pending_capacity.push(stream);
        }

        // If data is buffered and the stream is send ready, then
        // schedule the stream for execution
        if stream.buffered_send_data > 0 && stream.is_send_ready() {
            // TODO: This assertion isn't *exactly* correct. There can still be
            // buffered send data while the stream's pending send queue is
            // empty. This can happen when a large data frame is in the process
            // of being **partially** sent. Once the window has been sent, the
            // data frame will be returned to the prioritization layer to be
            // re-scheduled.
            //
            // That said, it would be nice to figure out how to make this
            // assertion correctly.
            //
            // debug_assert!(!stream.pending_send.is_empty());

            self.pending_send.push(stream);
        }
    }

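    /// Drive the send side: open pending streams, pop prioritized frames, and
    /// write them to the codec until the codec is not ready or there is
    /// nothing left to send.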
    pub fn poll_complete<T, B>(
        &mut self,
        cx: &mut Context,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        counts: &mut Counts,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> Poll<io::Result<()>>
    where
        T: AsyncWrite + Unpin,
        B: Buf,
    {
        // Ensure codec is ready
        ready!(dst.poll_ready(cx))?;

        // Reclaim any frame that has previously been written
        self.reclaim_frame(buffer, store, dst);

        // The max frame length
        let max_frame_len = dst.max_send_frame_size();

        tracing::trace!("poll_complete");

        loop {
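            // Open a stream that was waiting on the concurrency limit, if any,
            // and move it to the front of the send queue so its frames are
            // written next.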
            if let Some(mut stream) = self.pop_pending_open(store, counts) {
                self.pending_send.push_front(&mut stream);
            }

            match self.pop_frame(buffer, store, max_frame_len, counts) {
                Some(frame) => {
                    tracing::trace!(?frame, "writing");

                    debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
                    if let Frame::Data(ref frame) = frame {
                        self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
                    }
                    dst.buffer(frame).expect("invalid frame");

                    // Ensure the codec is ready to try the loop again.
                    ready!(dst.poll_ready(cx))?;

                    // Because, always try to reclaim...
                    self.reclaim_frame(buffer, store, dst);
                }
                None => {
                    // Try to flush the codec.
                    ready!(dst.flush(cx))?;

                    // This might release a data frame...
                    if !self.reclaim_frame(buffer, store, dst) {
                        return Poll::Ready(Ok(()));
                    }

                    // No need to poll ready as poll_complete() does this for
                    // us...
                }
            }
        }
    }

    /// Tries to reclaim a pending data frame from the codec.
    ///
    /// Returns true if a frame was reclaimed.
    ///
    /// When a data frame is written to the codec, it may not be written in its
    /// entirety (large chunks are split up into potentially many data frames).
    /// In this case, the stream needs to be reprioritized.
    fn reclaim_frame<T, B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        dst: &mut Codec<T, Prioritized<B>>,
    ) -> bool
    where
        B: Buf,
    {
        let span = tracing::trace_span!("try_reclaim_frame");
        let _e = span.enter();

        // First check if there are any data chunks to take back
        if let Some(frame) = dst.take_last_data_frame() {
            self.reclaim_frame_inner(buffer, store, frame)
        } else {
            false
        }
    }

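    /// Reclaim a partially written `DATA` frame: if any payload remains, push
    /// the frame back to the front of the stream's queue so the remainder is
    /// sent later.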
    fn reclaim_frame_inner<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        frame: frame::Data<Prioritized<B>>,
    ) -> bool
    where
        B: Buf,
    {
        tracing::trace!(
            ?frame,
            sz = frame.payload().inner.get_ref().remaining(),
            "reclaimed"
        );

        let mut eos = false;
        let key = frame.payload().stream;

        match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
            InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
            InFlightData::Drop => {
                tracing::trace!("not reclaiming frame for cancelled stream");
                return false;
            }
            InFlightData::DataFrame(k) => {
                debug_assert_eq!(k, key);
            }
        }

        let mut frame = frame.map(|prioritized| {
            // TODO: Ensure fully written
            eos = prioritized.end_of_stream;
            prioritized.inner.into_inner()
        });

        if frame.payload().has_remaining() {
            let mut stream = store.resolve(key);

            if eos {
                frame.set_end_stream(true);
            }

            self.push_back_frame(frame.into(), buffer, &mut stream);

            return true;
        }

        false
    }

    /// Push the frame back onto the front of the stream's deque, scheduling
    /// the stream if needed.
    fn push_back_frame<B>(
        &mut self,
        frame: Frame<B>,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
    ) {
        // Push the frame to the front of the stream's deque
        stream.pending_send.push_front(buffer, frame);

        // If needed, schedule the sender
        if stream.send_flow.available() > 0 {
            debug_assert!(!stream.pending_send.is_empty());
            self.pending_send.push(stream);
        }
    }

    pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
        let span = tracing::trace_span!("clear_queue", ?stream.id);
        let _e = span.enter();

        // TODO: make this more efficient?
        while let Some(frame) = stream.pending_send.pop_front(buffer) {
            tracing::trace!(?frame, "dropping");
        }

        stream.buffered_send_data = 0;
        stream.requested_send_capacity = 0;
        if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
            if stream.key() == key {
                // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
                self.in_flight_data_frame = InFlightData::Drop;
            }
        }
    }

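    /// Drop every stream currently waiting in the `pending_send` queue.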
    pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_send.pop(store) {
            let is_pending_reset = stream.is_pending_reset_expiration();
            counts.transition_after(stream, is_pending_reset);
        }
    }

    pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
        while let Some(stream) = self.pending_open.pop(store) {
            let is_pending_reset = stream.is_pending_reset_expiration();
            counts.transition_after(stream, is_pending_reset);
        }
    }

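    /// Pop the next frame to write from the first stream in `pending_send`,
    /// splitting `DATA` frames so they fit within the flow-control windows and
    /// the maximum frame size.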
    fn pop_frame<B>(
        &mut self,
        buffer: &mut Buffer<Frame<B>>,
        store: &mut Store,
        max_len: usize,
        counts: &mut Counts,
    ) -> Option<Frame<Prioritized<B>>>
    where
        B: Buf,
    {
        let span = tracing::trace_span!("pop_frame");
        let _e = span.enter();

        loop {
            match self.pending_send.pop(store) {
                Some(mut stream) => {
                    let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
                    let _e = span.enter();

                    // It's possible that this stream, besides having data to send,
                    // is also queued to send a reset, and thus is already in the queue
                    // to wait for "some time" after a reset.
                    //
                    // To be safe, we just always ask the stream.
                    let is_pending_reset = stream.is_pending_reset_expiration();

                    tracing::trace!(is_pending_reset);

                    let frame = match stream.pending_send.pop_front(buffer) {
                        Some(Frame::Data(mut frame)) => {
                            // Get the amount of capacity remaining for stream's
                            // window.
                            let stream_capacity = stream.send_flow.available();
                            let sz = frame.payload().remaining();

                            tracing::trace!(
                                sz,
                                eos = frame.is_end_stream(),
                                window = %stream_capacity,
                                available = %stream.send_flow.available(),
                                requested = stream.requested_send_capacity,
                                buffered = stream.buffered_send_data,
                                "data frame"
                            );

                            // Zero length data frames always have capacity to
                            // be sent.
                            if sz > 0 && stream_capacity == 0 {
                                tracing::trace!("stream capacity is 0");

                                // Ensure that the stream is waiting for
                                // connection level capacity
                                //
                                // TODO: uncomment
                                // debug_assert!(stream.is_pending_send_capacity);

                                // The stream has no more capacity, this can
                                // happen if the remote reduced the stream
                                // window. In this case, we need to buffer the
                                // frame and wait for a window update...
                                stream.pending_send.push_front(buffer, frame.into());

                                continue;
                            }

                            // Only send up to the max frame length
                            let len = cmp::min(sz, max_len);

                            // Only send up to the stream's window capacity
                            let len =
                                cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;

                            // There *must* be enough connection level
                            // capacity at this point.
                            debug_assert!(len <= self.flow.window_size());

                            // Check the stream-level window the peer is aware of: the
                            // window we track locally may have capacity even though
                            // the peer's view of it does not.
                            if len > 0 && len > stream.send_flow.window_size() {
                                stream.pending_send.push_front(buffer, frame.into());
                                continue;
                            }

                            tracing::trace!(len, "sending data frame");

                            // Update the flow control
                            tracing::trace_span!("updating stream flow").in_scope(|| {
                                stream.send_data(len, self.max_buffer_size);

                                // Assign the capacity back to the connection that
                                // was just consumed from the stream in the previous
                                // line.
                                // TODO: proper error handling
                                let _res = self.flow.assign_capacity(len);
                                debug_assert!(_res.is_ok());
                            });

                            let (eos, len) = tracing::trace_span!("updating connection flow")
                                .in_scope(|| {
                                    // TODO: proper error handling
                                    let _res = self.flow.send_data(len);
                                    debug_assert!(_res.is_ok());

                                    // Wrap the frame's data payload to ensure that the
                                    // correct amount of data gets written.

                                    let eos = frame.is_end_stream();
                                    let len = len as usize;

                                    if frame.payload().remaining() > len {
                                        frame.set_end_stream(false);
                                    }
                                    (eos, len)
                                });

                            Frame::Data(frame.map(|buf| Prioritized {
                                inner: buf.take(len),
                                end_of_stream: eos,
                                stream: stream.key(),
                            }))
                        }
                        Some(Frame::PushPromise(pp)) => {
                            let mut pushed =
                                stream.store_mut().find_mut(&pp.promised_id()).unwrap();
                            pushed.is_pending_push = false;
                            // Transition stream from pending_push to pending_open
                            // if possible
                            if !pushed.pending_send.is_empty() {
                                if counts.can_inc_num_send_streams() {
                                    counts.inc_num_send_streams(&mut pushed);
                                    self.pending_send.push(&mut pushed);
                                } else {
                                    self.queue_open(&mut pushed);
                                }
                            }
                            Frame::PushPromise(pp)
                        }
                        Some(frame) => frame.map(|_| {
                            unreachable!(
                                "Frame::map closure will only be called \
                                 on DATA frames."
                            )
                        }),
                        None => {
                            if let Some(reason) = stream.state.get_scheduled_reset() {
                                let stream_id = stream.id;
                                stream
                                    .state
                                    .set_reset(stream_id, reason, Initiator::Library);

                                let frame = frame::Reset::new(stream.id, reason);
                                Frame::Reset(frame)
                            } else {
                                // If the stream receives a RESET from the peer, it may have
                                // had data buffered to be sent, but all the frames are cleared
                                // in clear_queue(). Instead of doing an O(N) traversal through
                                // the queue to remove it, let's just ignore the stream here.
                                tracing::trace!("removing dangling stream from pending_send");
                                // Since this should only happen as a consequence of `clear_queue`,
                                // we must be in a closed state of some kind.
                                debug_assert!(stream.state.is_closed());
                                counts.transition_after(stream, is_pending_reset);
                                continue;
                            }
                        }
                    };

                    tracing::trace!("pop_frame; frame={:?}", frame);

                    if cfg!(debug_assertions) && stream.state.is_idle() {
                        debug_assert!(stream.id > self.last_opened_id);
                        self.last_opened_id = stream.id;
                    }

                    if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
                        // TODO: Only requeue the sender IF it is ready to send
                        // the next frame. i.e. don't requeue it if the next
                        // frame is a data frame and the stream does not have
                        // any more capacity.
                        self.pending_send.push(&mut stream);
                    }

                    counts.transition_after(stream, is_pending_reset);

                    return Some(frame);
                }
                None => return None,
            }
        }
    }

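    /// Pop a stream from `pending_open` if the concurrency limit allows
    /// another send stream to be opened, reserving a concurrency slot for it.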
    fn pop_pending_open<'s>(
        &mut self,
        store: &'s mut Store,
        counts: &mut Counts,
    ) -> Option<store::Ptr<'s>> {
        tracing::trace!("schedule_pending_open");
        // check for any pending open streams
        if counts.can_inc_num_send_streams() {
            if let Some(mut stream) = self.pending_open.pop(store) {
                tracing::trace!("schedule_pending_open; stream={:?}", stream.id);

                counts.inc_num_send_streams(&mut stream);
                stream.notify_send();
                return Some(stream);
            }
        }

        None
    }
}

// ===== impl Prioritized =====

impl<B> Buf for Prioritized<B>
where
    B: Buf,
{
    fn remaining(&self) -> usize {
        self.inner.remaining()
    }

    fn chunk(&self) -> &[u8] {
        self.inner.chunk()
    }

    fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
        self.inner.chunks_vectored(dst)
    }

    fn advance(&mut self, cnt: usize) {
        self.inner.advance(cnt)
    }
}

impl<B: Buf> fmt::Debug for Prioritized<B> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Prioritized")
            .field("remaining", &self.inner.get_ref().remaining())
            .field("end_of_stream", &self.end_of_stream)
            .field("stream", &self.stream)
            .finish()
    }
}