1use super::store::Resolve;
2use super::*;
3
4use crate::frame::{Reason, StreamId};
5
6use crate::codec::UserError;
7use crate::codec::UserError::*;
8
9use bytes::buf::{Buf, Take};
10use std::{
11 cmp::{self, Ordering},
12 fmt, io, mem,
13 task::{Context, Poll, Waker},
14};
15
16/// # Warning
17///
18/// Queued streams are ordered by stream ID, as we need to ensure that
19/// lower-numbered streams are sent headers before higher-numbered ones.
20/// This is because "idle" stream IDs – those which have been initiated but
21/// have yet to receive frames – will be implicitly closed on receipt of a
22/// frame on a higher stream ID. If these queues was not ordered by stream
23/// IDs, some mechanism would be necessary to ensure that the lowest-numbered]
24/// idle stream is opened first.
25#[derive(Debug)]
26pub(super) struct Prioritize {
27 /// Queue of streams waiting for socket capacity to send a frame.
28 pending_send: store::Queue<stream::NextSend>,
29
30 /// Queue of streams waiting for window capacity to produce data.
31 pending_capacity: store::Queue<stream::NextSendCapacity>,
32
33 /// Streams waiting for capacity due to max concurrency
34 ///
35 /// The `SendRequest` handle is `Clone`. This enables initiating requests
36 /// from many tasks. However, offering this capability while supporting
37 /// backpressure at some level is tricky. If there are many `SendRequest`
38 /// handles and a single stream becomes available, which handle gets
39 /// assigned that stream? Maybe that handle is no longer ready to send a
40 /// request.
41 ///
42 /// The strategy used is to allow each `SendRequest` handle one buffered
43 /// request. A `SendRequest` handle is ready to send a request if it has no
44 /// associated buffered requests. This is the same strategy as `mpsc` in the
45 /// futures library.
46 pending_open: store::Queue<stream::NextOpen>,
47
48 /// Connection level flow control governing sent data
49 flow: FlowControl,
50
51 /// Stream ID of the last stream opened.
52 last_opened_id: StreamId,
53
54 /// What `DATA` frame is currently being sent in the codec.
55 in_flight_data_frame: InFlightData,
56
57 /// The maximum amount of bytes a stream should buffer.
58 max_buffer_size: usize,
59}
60
61#[derive(Debug, Eq, PartialEq)]
62enum InFlightData {
63 /// There is no `DATA` frame in flight.
64 Nothing,
65 /// There is a `DATA` frame in flight belonging to the given stream.
66 DataFrame(store::Key),
67 /// There was a `DATA` frame, but the stream's queue was since cleared.
68 Drop,
69}
70
71pub(crate) struct Prioritized<B> {
72 // The buffer
73 inner: Take<B>,
74
75 end_of_stream: bool,
76
77 // The stream that this is associated with
78 stream: store::Key,
79}
80
81// ===== impl Prioritize =====
82
83impl Prioritize {
84 pub fn new(config: &Config) -> Prioritize {
85 let mut flow = FlowControl::new();
86
87 flow.inc_window(config.remote_init_window_sz)
88 .expect("invalid initial window size");
89
90 // TODO: proper error handling
91 let _res = flow.assign_capacity(config.remote_init_window_sz);
92 debug_assert!(_res.is_ok());
93
94 tracing::trace!("Prioritize::new; flow={:?}", flow);
95
96 Prioritize {
97 pending_send: store::Queue::new(),
98 pending_capacity: store::Queue::new(),
99 pending_open: store::Queue::new(),
100 flow,
101 last_opened_id: StreamId::ZERO,
102 in_flight_data_frame: InFlightData::Nothing,
103 max_buffer_size: config.local_max_buffer_size,
104 }
105 }
106
107 pub(crate) fn max_buffer_size(&self) -> usize {
108 self.max_buffer_size
109 }
110
111 /// Queue a frame to be sent to the remote
112 pub fn queue_frame<B>(
113 &mut self,
114 frame: Frame<B>,
115 buffer: &mut Buffer<Frame<B>>,
116 stream: &mut store::Ptr,
117 task: &mut Option<Waker>,
118 ) {
119 let span = tracing::trace_span!("Prioritize::queue_frame", ?stream.id);
120 let _e = span.enter();
121 // Queue the frame in the buffer
122 stream.pending_send.push_back(buffer, frame);
123 self.schedule_send(stream, task);
124 }
125
126 pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
127 // If the stream is waiting to be opened, nothing more to do.
128 if stream.is_send_ready() {
129 tracing::trace!(?stream.id, "schedule_send");
130 // Queue the stream
131 self.pending_send.push(stream);
132
133 // Notify the connection.
134 if let Some(task) = task.take() {
135 task.wake();
136 }
137 }
138 }
139
140 pub fn queue_open(&mut self, stream: &mut store::Ptr) {
141 self.pending_open.push(stream);
142 }
143
144 /// Send a data frame
145 pub fn send_data<B>(
146 &mut self,
147 frame: frame::Data<B>,
148 buffer: &mut Buffer<Frame<B>>,
149 stream: &mut store::Ptr,
150 counts: &mut Counts,
151 task: &mut Option<Waker>,
152 ) -> Result<(), UserError>
153 where
154 B: Buf,
155 {
156 let sz = frame.payload().remaining();
157
158 if sz > MAX_WINDOW_SIZE as usize {
159 return Err(UserError::PayloadTooBig);
160 }
161
162 let sz = sz as WindowSize;
163
164 if !stream.state.is_send_streaming() {
165 if stream.state.is_closed() {
166 return Err(InactiveStreamId);
167 } else {
168 return Err(UnexpectedFrameType);
169 }
170 }
171
172 // Update the buffered data counter
173 stream.buffered_send_data += sz as usize;
174
175 let span =
176 tracing::trace_span!("send_data", sz, requested = stream.requested_send_capacity);
177 let _e = span.enter();
178 tracing::trace!(buffered = stream.buffered_send_data);
179
180 // Implicitly request more send capacity if not enough has been
181 // requested yet.
182 if (stream.requested_send_capacity as usize) < stream.buffered_send_data {
183 // Update the target requested capacity
184 stream.requested_send_capacity =
185 cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize;
186
187 // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity
188 // cannot be assigned at the time it is called.
189 //
190 // Streams over the max concurrent count will still call `send_data` so we should be
191 // careful not to put it into `pending_capacity` as it will starve the connection
192 // capacity for other streams
193 if !stream.is_pending_open {
194 self.try_assign_capacity(stream);
195 }
196 }
197
198 if frame.is_end_stream() {
199 stream.state.send_close();
200 self.reserve_capacity(0, stream, counts);
201 }
202
203 tracing::trace!(
204 available = %stream.send_flow.available(),
205 buffered = stream.buffered_send_data,
206 );
207
208 // The `stream.buffered_send_data == 0` check is here so that, if a zero
209 // length data frame is queued to the front (there is no previously
210 // queued data), it gets sent out immediately even if there is no
211 // available send window.
212 //
213 // Sending out zero length data frames can be done to signal
214 // end-of-stream.
215 //
216 if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 {
217 // The stream currently has capacity to send the data frame, so
218 // queue it up and notify the connection task.
219 self.queue_frame(frame.into(), buffer, stream, task);
220 } else {
221 // The stream has no capacity to send the frame now, save it but
222 // don't notify the connection task. Once additional capacity
223 // becomes available, the frame will be flushed.
224 stream.pending_send.push_back(buffer, frame.into());
225 }
226
227 Ok(())
228 }
229
230 /// Request capacity to send data
231 pub fn reserve_capacity(
232 &mut self,
233 capacity: WindowSize,
234 stream: &mut store::Ptr,
235 counts: &mut Counts,
236 ) {
237 let span = tracing::trace_span!(
238 "reserve_capacity",
239 ?stream.id,
240 requested = capacity,
241 effective = (capacity as usize) + stream.buffered_send_data,
242 curr = stream.requested_send_capacity
243 );
244 let _e = span.enter();
245
246 // Actual capacity is `capacity` + the current amount of buffered data.
247 // If it were less, then we could never send out the buffered data.
248 let capacity = (capacity as usize) + stream.buffered_send_data;
249
250 match capacity.cmp(&(stream.requested_send_capacity as usize)) {
251 Ordering::Equal => {
252 // Nothing to do
253 }
254 Ordering::Less => {
255 // Update the target requested capacity
256 stream.requested_send_capacity = capacity as WindowSize;
257
258 // Currently available capacity assigned to the stream
259 let available = stream.send_flow.available().as_size();
260
261 // If the stream has more assigned capacity than requested, reclaim
262 // some for the connection
263 if available as usize > capacity {
264 let diff = available - capacity as WindowSize;
265
266 // TODO: proper error handling
267 let _res = stream.send_flow.claim_capacity(diff);
268 debug_assert!(_res.is_ok());
269
270 self.assign_connection_capacity(diff, stream, counts);
271 }
272 }
273 Ordering::Greater => {
274 // If trying to *add* capacity, but the stream send side is closed,
275 // there's nothing to be done.
276 if stream.state.is_send_closed() {
277 return;
278 }
279
280 // Update the target requested capacity
281 stream.requested_send_capacity =
282 cmp::min(capacity, WindowSize::MAX as usize) as WindowSize;
283
284 // Try to assign additional capacity to the stream. If none is
285 // currently available, the stream will be queued to receive some
286 // when more becomes available.
287 self.try_assign_capacity(stream);
288 }
289 }
290 }
291
292 pub fn recv_stream_window_update(
293 &mut self,
294 inc: WindowSize,
295 stream: &mut store::Ptr,
296 ) -> Result<(), Reason> {
297 let span = tracing::trace_span!(
298 "recv_stream_window_update",
299 ?stream.id,
300 ?stream.state,
301 inc,
302 flow = ?stream.send_flow
303 );
304 let _e = span.enter();
305
306 if stream.state.is_send_closed() && stream.buffered_send_data == 0 {
307 // We can't send any data, so don't bother doing anything else.
308 return Ok(());
309 }
310
311 // Update the stream level flow control.
312 stream.send_flow.inc_window(inc)?;
313
314 // If the stream is waiting on additional capacity, then this will
315 // assign it (if available on the connection) and notify the producer
316 self.try_assign_capacity(stream);
317
318 Ok(())
319 }
320
321 pub fn recv_connection_window_update(
322 &mut self,
323 inc: WindowSize,
324 store: &mut Store,
325 counts: &mut Counts,
326 ) -> Result<(), Reason> {
327 // Update the connection's window
328 self.flow.inc_window(inc)?;
329
330 self.assign_connection_capacity(inc, store, counts);
331 Ok(())
332 }
333
334 /// Reclaim all capacity assigned to the stream and re-assign it to the
335 /// connection
336 pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
337 let available = stream.send_flow.available().as_size();
338 if available > 0 {
339 // TODO: proper error handling
340 let _res = stream.send_flow.claim_capacity(available);
341 debug_assert!(_res.is_ok());
342 // Re-assign all capacity to the connection
343 self.assign_connection_capacity(available, stream, counts);
344 }
345 }
346
347 /// Reclaim just reserved capacity, not buffered capacity, and re-assign
348 /// it to the connection
349 pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
350 // only reclaim requested capacity that isn't already buffered
351 if stream.requested_send_capacity as usize > stream.buffered_send_data {
352 let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;
353
354 // TODO: proper error handling
355 let _res = stream.send_flow.claim_capacity(reserved);
356 debug_assert!(_res.is_ok());
357 self.assign_connection_capacity(reserved, stream, counts);
358 }
359 }
360
361 pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
362 let span = tracing::trace_span!("clear_pending_capacity");
363 let _e = span.enter();
364 while let Some(stream) = self.pending_capacity.pop(store) {
365 counts.transition(stream, |_, stream| {
366 tracing::trace!(?stream.id, "clear_pending_capacity");
367 })
368 }
369 }
370
371 pub fn assign_connection_capacity<R>(
372 &mut self,
373 inc: WindowSize,
374 store: &mut R,
375 counts: &mut Counts,
376 ) where
377 R: Resolve,
378 {
379 let span = tracing::trace_span!("assign_connection_capacity", inc);
380 let _e = span.enter();
381
382 // TODO: proper error handling
383 let _res = self.flow.assign_capacity(inc);
384 debug_assert!(_res.is_ok());
385
386 // Assign newly acquired capacity to streams pending capacity.
387 while self.flow.available() > 0 {
388 let stream = match self.pending_capacity.pop(store) {
389 Some(stream) => stream,
390 None => return,
391 };
392
393 // Streams pending capacity may have been reset before capacity
394 // became available. In that case, the stream won't want any
395 // capacity, and so we shouldn't "transition" on it, but just evict
396 // it and continue the loop.
397 if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
398 continue;
399 }
400
401 counts.transition(stream, |_, stream| {
402 // Try to assign capacity to the stream. This will also re-queue the
403 // stream if there isn't enough connection level capacity to fulfill
404 // the capacity request.
405 self.try_assign_capacity(stream);
406 })
407 }
408 }
409
410 /// Request capacity to send data
411 fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
412 let total_requested = stream.requested_send_capacity;
413
414 // Total requested should never go below actual assigned
415 // (Note: the window size can go lower than assigned)
416 debug_assert!(stream.send_flow.available() <= total_requested as usize);
417
418 // The amount of additional capacity that the stream requests.
419 // Don't assign more than the window has available!
420 let additional = cmp::min(
421 total_requested - stream.send_flow.available().as_size(),
422 // Can't assign more than what is available
423 stream.send_flow.window_size() - stream.send_flow.available().as_size(),
424 );
425 let span = tracing::trace_span!("try_assign_capacity", ?stream.id);
426 let _e = span.enter();
427 tracing::trace!(
428 requested = total_requested,
429 additional,
430 buffered = stream.buffered_send_data,
431 window = stream.send_flow.window_size(),
432 conn = %self.flow.available()
433 );
434
435 if additional == 0 {
436 // Nothing more to do
437 return;
438 }
439
440 // If the stream has requested capacity, then it must be in the
441 // streaming state (more data could be sent) or there is buffered data
442 // waiting to be sent.
443 debug_assert!(
444 stream.state.is_send_streaming() || stream.buffered_send_data > 0,
445 "state={:?}",
446 stream.state
447 );
448
449 // The amount of currently available capacity on the connection
450 let conn_available = self.flow.available().as_size();
451
452 // First check if capacity is immediately available
453 if conn_available > 0 {
454 // The amount of capacity to assign to the stream
455 // TODO: Should prioritization factor into this?
456 let assign = cmp::min(conn_available, additional);
457
458 tracing::trace!(capacity = assign, "assigning");
459
460 // Assign the capacity to the stream
461 stream.assign_capacity(assign, self.max_buffer_size);
462
463 // Claim the capacity from the connection
464 // TODO: proper error handling
465 let _res = self.flow.claim_capacity(assign);
466 debug_assert!(_res.is_ok());
467 }
468
469 tracing::trace!(
470 available = %stream.send_flow.available(),
471 requested = stream.requested_send_capacity,
472 buffered = stream.buffered_send_data,
473 has_unavailable = %stream.send_flow.has_unavailable()
474 );
475
476 if stream.send_flow.available() < stream.requested_send_capacity as usize
477 && stream.send_flow.has_unavailable()
478 {
479 // The stream requires additional capacity and the stream's
480 // window has available capacity, but the connection window
481 // does not.
482 //
483 // In this case, the stream needs to be queued up for when the
484 // connection has more capacity.
485 self.pending_capacity.push(stream);
486 }
487
488 // If data is buffered and the stream is send ready, then
489 // schedule the stream for execution
490 if stream.buffered_send_data > 0 && stream.is_send_ready() {
491 // TODO: This assertion isn't *exactly* correct. There can still be
492 // buffered send data while the stream's pending send queue is
493 // empty. This can happen when a large data frame is in the process
494 // of being **partially** sent. Once the window has been sent, the
495 // data frame will be returned to the prioritization layer to be
496 // re-scheduled.
497 //
498 // That said, it would be nice to figure out how to make this
499 // assertion correctly.
500 //
501 // debug_assert!(!stream.pending_send.is_empty());
502
503 self.pending_send.push(stream);
504 }
505 }
506
507 pub fn poll_complete<T, B>(
508 &mut self,
509 cx: &mut Context,
510 buffer: &mut Buffer<Frame<B>>,
511 store: &mut Store,
512 counts: &mut Counts,
513 dst: &mut Codec<T, Prioritized<B>>,
514 ) -> Poll<io::Result<()>>
515 where
516 T: AsyncWrite + Unpin,
517 B: Buf,
518 {
519 // Ensure codec is ready
520 ready!(dst.poll_ready(cx))?;
521
522 // Reclaim any frame that has previously been written
523 self.reclaim_frame(buffer, store, dst);
524
525 // The max frame length
526 let max_frame_len = dst.max_send_frame_size();
527
528 tracing::trace!("poll_complete");
529
530 loop {
531 if let Some(mut stream) = self.pop_pending_open(store, counts) {
532 self.pending_send.push_front(&mut stream);
533 self.try_assign_capacity(&mut stream);
534 }
535
536 match self.pop_frame(buffer, store, max_frame_len, counts) {
537 Some(frame) => {
538 tracing::trace!(?frame, "writing");
539
540 debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing);
541 if let Frame::Data(ref frame) = frame {
542 self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream);
543 }
544 dst.buffer(frame).expect("invalid frame");
545
546 // Ensure the codec is ready to try the loop again.
547 ready!(dst.poll_ready(cx))?;
548
549 // Because, always try to reclaim...
550 self.reclaim_frame(buffer, store, dst);
551 }
552 None => {
553 // Try to flush the codec.
554 ready!(dst.flush(cx))?;
555
556 // This might release a data frame...
557 if !self.reclaim_frame(buffer, store, dst) {
558 return Poll::Ready(Ok(()));
559 }
560
561 // No need to poll ready as poll_complete() does this for
562 // us...
563 }
564 }
565 }
566 }
567
568 /// Tries to reclaim a pending data frame from the codec.
569 ///
570 /// Returns true if a frame was reclaimed.
571 ///
572 /// When a data frame is written to the codec, it may not be written in its
573 /// entirety (large chunks are split up into potentially many data frames).
574 /// In this case, the stream needs to be reprioritized.
575 fn reclaim_frame<T, B>(
576 &mut self,
577 buffer: &mut Buffer<Frame<B>>,
578 store: &mut Store,
579 dst: &mut Codec<T, Prioritized<B>>,
580 ) -> bool
581 where
582 B: Buf,
583 {
584 let span = tracing::trace_span!("try_reclaim_frame");
585 let _e = span.enter();
586
587 // First check if there are any data chunks to take back
588 if let Some(frame) = dst.take_last_data_frame() {
589 self.reclaim_frame_inner(buffer, store, frame)
590 } else {
591 false
592 }
593 }
594
595 fn reclaim_frame_inner<B>(
596 &mut self,
597 buffer: &mut Buffer<Frame<B>>,
598 store: &mut Store,
599 frame: frame::Data<Prioritized<B>>,
600 ) -> bool
601 where
602 B: Buf,
603 {
604 tracing::trace!(
605 ?frame,
606 sz = frame.payload().inner.get_ref().remaining(),
607 "reclaimed"
608 );
609
610 let mut eos = false;
611 let key = frame.payload().stream;
612
613 match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) {
614 InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim"),
615 InFlightData::Drop => {
616 tracing::trace!("not reclaiming frame for cancelled stream");
617 return false;
618 }
619 InFlightData::DataFrame(k) => {
620 debug_assert_eq!(k, key);
621 }
622 }
623
624 let mut frame = frame.map(|prioritized| {
625 // TODO: Ensure fully written
626 eos = prioritized.end_of_stream;
627 prioritized.inner.into_inner()
628 });
629
630 if frame.payload().has_remaining() {
631 let mut stream = store.resolve(key);
632
633 if eos {
634 frame.set_end_stream(true);
635 }
636
637 self.push_back_frame(frame.into(), buffer, &mut stream);
638
639 return true;
640 }
641
642 false
643 }
644
645 /// Push the frame to the front of the stream's deque, scheduling the
646 /// stream if needed.
647 fn push_back_frame<B>(
648 &mut self,
649 frame: Frame<B>,
650 buffer: &mut Buffer<Frame<B>>,
651 stream: &mut store::Ptr,
652 ) {
653 // Push the frame to the front of the stream's deque
654 stream.pending_send.push_front(buffer, frame);
655
656 // If needed, schedule the sender
657 if stream.send_flow.available() > 0 {
658 debug_assert!(!stream.pending_send.is_empty());
659 self.pending_send.push(stream);
660 }
661 }
662
663 pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
664 let span = tracing::trace_span!("clear_queue", ?stream.id);
665 let _e = span.enter();
666
667 // TODO: make this more efficient?
668 while let Some(frame) = stream.pending_send.pop_front(buffer) {
669 tracing::trace!(?frame, "dropping");
670 }
671
672 stream.buffered_send_data = 0;
673 stream.requested_send_capacity = 0;
674 if let InFlightData::DataFrame(key) = self.in_flight_data_frame {
675 if stream.key() == key {
676 // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed.
677 self.in_flight_data_frame = InFlightData::Drop;
678 }
679 }
680 }
681
682 pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
683 while let Some(stream) = self.pending_send.pop(store) {
684 let is_pending_reset = stream.is_pending_reset_expiration();
685 counts.transition_after(stream, is_pending_reset);
686 }
687 }
688
689 pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
690 while let Some(stream) = self.pending_open.pop(store) {
691 let is_pending_reset = stream.is_pending_reset_expiration();
692 counts.transition_after(stream, is_pending_reset);
693 }
694 }
695
696 fn pop_frame<B>(
697 &mut self,
698 buffer: &mut Buffer<Frame<B>>,
699 store: &mut Store,
700 max_len: usize,
701 counts: &mut Counts,
702 ) -> Option<Frame<Prioritized<B>>>
703 where
704 B: Buf,
705 {
706 let span = tracing::trace_span!("pop_frame");
707 let _e = span.enter();
708
709 loop {
710 match self.pending_send.pop(store) {
711 Some(mut stream) => {
712 let span = tracing::trace_span!("popped", ?stream.id, ?stream.state);
713 let _e = span.enter();
714
715 // It's possible that this stream, besides having data to send,
716 // is also queued to send a reset, and thus is already in the queue
717 // to wait for "some time" after a reset.
718 //
719 // To be safe, we just always ask the stream.
720 let is_pending_reset = stream.is_pending_reset_expiration();
721
722 tracing::trace!(is_pending_reset);
723
724 let frame = match stream.pending_send.pop_front(buffer) {
725 Some(Frame::Data(mut frame)) => {
726 // Get the amount of capacity remaining for stream's
727 // window.
728 let stream_capacity = stream.send_flow.available();
729 let sz = frame.payload().remaining();
730
731 tracing::trace!(
732 sz,
733 eos = frame.is_end_stream(),
734 window = %stream_capacity,
735 available = %stream.send_flow.available(),
736 requested = stream.requested_send_capacity,
737 buffered = stream.buffered_send_data,
738 "data frame"
739 );
740
741 // Zero length data frames always have capacity to
742 // be sent.
743 if sz > 0 && stream_capacity == 0 {
744 tracing::trace!("stream capacity is 0");
745
746 // Ensure that the stream is waiting for
747 // connection level capacity
748 //
749 // TODO: uncomment
750 // debug_assert!(stream.is_pending_send_capacity);
751
752 // The stream has no more capacity, this can
753 // happen if the remote reduced the stream
754 // window. In this case, we need to buffer the
755 // frame and wait for a window update...
756 stream.pending_send.push_front(buffer, frame.into());
757
758 continue;
759 }
760
761 // Only send up to the max frame length
762 let len = cmp::min(sz, max_len);
763
764 // Only send up to the stream's window capacity
765 let len =
766 cmp::min(len, stream_capacity.as_size() as usize) as WindowSize;
767
768 // There *must* be be enough connection level
769 // capacity at this point.
770 debug_assert!(len <= self.flow.window_size());
771
772 // Check if the stream level window the peer knows is available. In some
773 // scenarios, maybe the window we know is available but the window which
774 // peer knows is not.
775 if len > 0 && len > stream.send_flow.window_size() {
776 stream.pending_send.push_front(buffer, frame.into());
777 continue;
778 }
779
780 tracing::trace!(len, "sending data frame");
781
782 // Update the flow control
783 tracing::trace_span!("updating stream flow").in_scope(|| {
784 stream.send_data(len, self.max_buffer_size);
785
786 // Assign the capacity back to the connection that
787 // was just consumed from the stream in the previous
788 // line.
789 // TODO: proper error handling
790 let _res = self.flow.assign_capacity(len);
791 debug_assert!(_res.is_ok());
792 });
793
794 let (eos, len) = tracing::trace_span!("updating connection flow")
795 .in_scope(|| {
796 // TODO: proper error handling
797 let _res = self.flow.send_data(len);
798 debug_assert!(_res.is_ok());
799
800 // Wrap the frame's data payload to ensure that the
801 // correct amount of data gets written.
802
803 let eos = frame.is_end_stream();
804 let len = len as usize;
805
806 if frame.payload().remaining() > len {
807 frame.set_end_stream(false);
808 }
809 (eos, len)
810 });
811
812 Frame::Data(frame.map(|buf| Prioritized {
813 inner: buf.take(len),
814 end_of_stream: eos,
815 stream: stream.key(),
816 }))
817 }
818 Some(Frame::PushPromise(pp)) => {
819 let mut pushed =
820 stream.store_mut().find_mut(&pp.promised_id()).unwrap();
821 pushed.is_pending_push = false;
822 // Transition stream from pending_push to pending_open
823 // if possible
824 if !pushed.pending_send.is_empty() {
825 if counts.can_inc_num_send_streams() {
826 counts.inc_num_send_streams(&mut pushed);
827 self.pending_send.push(&mut pushed);
828 } else {
829 self.queue_open(&mut pushed);
830 }
831 }
832 Frame::PushPromise(pp)
833 }
834 Some(frame) => frame.map(|_| {
835 unreachable!(
836 "Frame::map closure will only be called \
837 on DATA frames."
838 )
839 }),
840 None => {
841 if let Some(reason) = stream.state.get_scheduled_reset() {
842 let stream_id = stream.id;
843 stream
844 .state
845 .set_reset(stream_id, reason, Initiator::Library);
846
847 let frame = frame::Reset::new(stream.id, reason);
848 Frame::Reset(frame)
849 } else {
850 // If the stream receives a RESET from the peer, it may have
851 // had data buffered to be sent, but all the frames are cleared
852 // in clear_queue(). Instead of doing O(N) traversal through queue
853 // to remove, lets just ignore the stream here.
854 tracing::trace!("removing dangling stream from pending_send");
855 // Since this should only happen as a consequence of `clear_queue`,
856 // we must be in a closed state of some kind.
857 debug_assert!(stream.state.is_closed());
858 counts.transition_after(stream, is_pending_reset);
859 continue;
860 }
861 }
862 };
863
864 tracing::trace!("pop_frame; frame={:?}", frame);
865
866 if cfg!(debug_assertions) && stream.state.is_idle() {
867 debug_assert!(stream.id > self.last_opened_id);
868 self.last_opened_id = stream.id;
869 }
870
871 if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() {
872 // TODO: Only requeue the sender IF it is ready to send
873 // the next frame. i.e. don't requeue it if the next
874 // frame is a data frame and the stream does not have
875 // any more capacity.
876 self.pending_send.push(&mut stream);
877 }
878
879 counts.transition_after(stream, is_pending_reset);
880
881 return Some(frame);
882 }
883 None => return None,
884 }
885 }
886 }
887
888 fn pop_pending_open<'s>(
889 &mut self,
890 store: &'s mut Store,
891 counts: &mut Counts,
892 ) -> Option<store::Ptr<'s>> {
893 tracing::trace!("schedule_pending_open");
894 // check for any pending open streams
895 if counts.can_inc_num_send_streams() {
896 if let Some(mut stream) = self.pending_open.pop(store) {
897 tracing::trace!("schedule_pending_open; stream={:?}", stream.id);
898
899 counts.inc_num_send_streams(&mut stream);
900 stream.notify_send();
901 return Some(stream);
902 }
903 }
904
905 None
906 }
907}
908
909// ===== impl Prioritized =====
910
911impl<B> Buf for Prioritized<B>
912where
913 B: Buf,
914{
915 fn remaining(&self) -> usize {
916 self.inner.remaining()
917 }
918
919 fn chunk(&self) -> &[u8] {
920 self.inner.chunk()
921 }
922
923 fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize {
924 self.inner.chunks_vectored(dst)
925 }
926
927 fn advance(&mut self, cnt: usize) {
928 self.inner.advance(cnt)
929 }
930}
931
932impl<B: Buf> fmt::Debug for Prioritized<B> {
933 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
934 fmt&mut DebugStruct<'_, '_>.debug_struct("Prioritized")
935 .field("remaining", &self.inner.get_ref().remaining())
936 .field("end_of_stream", &self.end_of_stream)
937 .field(name:"stream", &self.stream)
938 .finish()
939 }
940}
941