1 | use super::store::Resolve; |
2 | use super::*; |
3 | |
4 | use crate::frame::{Reason, StreamId}; |
5 | |
6 | use crate::codec::UserError; |
7 | use crate::codec::UserError::*; |
8 | |
9 | use bytes::buf::{Buf, Take}; |
10 | use std::{ |
11 | cmp::{self, Ordering}, |
12 | fmt, io, mem, |
13 | task::{Context, Poll, Waker}, |
14 | }; |
15 | |
16 | /// # Warning |
17 | /// |
18 | /// Queued streams are ordered by stream ID, as we need to ensure that |
19 | /// lower-numbered streams are sent headers before higher-numbered ones. |
20 | /// This is because "idle" stream IDs – those which have been initiated but |
21 | /// have yet to receive frames – will be implicitly closed on receipt of a |
22 | /// frame on a higher stream ID. If these queues was not ordered by stream |
23 | /// IDs, some mechanism would be necessary to ensure that the lowest-numbered] |
24 | /// idle stream is opened first. |
25 | #[derive (Debug)] |
26 | pub(super) struct Prioritize { |
27 | /// Queue of streams waiting for socket capacity to send a frame. |
28 | pending_send: store::Queue<stream::NextSend>, |
29 | |
30 | /// Queue of streams waiting for window capacity to produce data. |
31 | pending_capacity: store::Queue<stream::NextSendCapacity>, |
32 | |
33 | /// Streams waiting for capacity due to max concurrency |
34 | /// |
35 | /// The `SendRequest` handle is `Clone`. This enables initiating requests |
36 | /// from many tasks. However, offering this capability while supporting |
37 | /// backpressure at some level is tricky. If there are many `SendRequest` |
38 | /// handles and a single stream becomes available, which handle gets |
39 | /// assigned that stream? Maybe that handle is no longer ready to send a |
40 | /// request. |
41 | /// |
42 | /// The strategy used is to allow each `SendRequest` handle one buffered |
43 | /// request. A `SendRequest` handle is ready to send a request if it has no |
44 | /// associated buffered requests. This is the same strategy as `mpsc` in the |
45 | /// futures library. |
46 | pending_open: store::Queue<stream::NextOpen>, |
47 | |
48 | /// Connection level flow control governing sent data |
49 | flow: FlowControl, |
50 | |
51 | /// Stream ID of the last stream opened. |
52 | last_opened_id: StreamId, |
53 | |
54 | /// What `DATA` frame is currently being sent in the codec. |
55 | in_flight_data_frame: InFlightData, |
56 | |
57 | /// The maximum amount of bytes a stream should buffer. |
58 | max_buffer_size: usize, |
59 | } |
60 | |
61 | #[derive (Debug, Eq, PartialEq)] |
62 | enum InFlightData { |
63 | /// There is no `DATA` frame in flight. |
64 | Nothing, |
65 | /// There is a `DATA` frame in flight belonging to the given stream. |
66 | DataFrame(store::Key), |
67 | /// There was a `DATA` frame, but the stream's queue was since cleared. |
68 | Drop, |
69 | } |
70 | |
71 | pub(crate) struct Prioritized<B> { |
72 | // The buffer |
73 | inner: Take<B>, |
74 | |
75 | end_of_stream: bool, |
76 | |
77 | // The stream that this is associated with |
78 | stream: store::Key, |
79 | } |
80 | |
81 | // ===== impl Prioritize ===== |
82 | |
83 | impl Prioritize { |
84 | pub fn new(config: &Config) -> Prioritize { |
85 | let mut flow = FlowControl::new(); |
86 | |
87 | flow.inc_window(config.remote_init_window_sz) |
88 | .expect("invalid initial window size" ); |
89 | |
90 | // TODO: proper error handling |
91 | let _res = flow.assign_capacity(config.remote_init_window_sz); |
92 | debug_assert!(_res.is_ok()); |
93 | |
94 | tracing::trace!("Prioritize::new; flow= {:?}" , flow); |
95 | |
96 | Prioritize { |
97 | pending_send: store::Queue::new(), |
98 | pending_capacity: store::Queue::new(), |
99 | pending_open: store::Queue::new(), |
100 | flow, |
101 | last_opened_id: StreamId::ZERO, |
102 | in_flight_data_frame: InFlightData::Nothing, |
103 | max_buffer_size: config.local_max_buffer_size, |
104 | } |
105 | } |
106 | |
107 | pub(crate) fn max_buffer_size(&self) -> usize { |
108 | self.max_buffer_size |
109 | } |
110 | |
111 | /// Queue a frame to be sent to the remote |
112 | pub fn queue_frame<B>( |
113 | &mut self, |
114 | frame: Frame<B>, |
115 | buffer: &mut Buffer<Frame<B>>, |
116 | stream: &mut store::Ptr, |
117 | task: &mut Option<Waker>, |
118 | ) { |
119 | let span = tracing::trace_span!("Prioritize::queue_frame" , ?stream.id); |
120 | let _e = span.enter(); |
121 | // Queue the frame in the buffer |
122 | stream.pending_send.push_back(buffer, frame); |
123 | self.schedule_send(stream, task); |
124 | } |
125 | |
126 | pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { |
127 | // If the stream is waiting to be opened, nothing more to do. |
128 | if stream.is_send_ready() { |
129 | tracing::trace!(?stream.id, "schedule_send" ); |
130 | // Queue the stream |
131 | self.pending_send.push(stream); |
132 | |
133 | // Notify the connection. |
134 | if let Some(task) = task.take() { |
135 | task.wake(); |
136 | } |
137 | } |
138 | } |
139 | |
140 | pub fn queue_open(&mut self, stream: &mut store::Ptr) { |
141 | self.pending_open.push(stream); |
142 | } |
143 | |
144 | /// Send a data frame |
145 | pub fn send_data<B>( |
146 | &mut self, |
147 | frame: frame::Data<B>, |
148 | buffer: &mut Buffer<Frame<B>>, |
149 | stream: &mut store::Ptr, |
150 | counts: &mut Counts, |
151 | task: &mut Option<Waker>, |
152 | ) -> Result<(), UserError> |
153 | where |
154 | B: Buf, |
155 | { |
156 | let sz = frame.payload().remaining(); |
157 | |
158 | if sz > MAX_WINDOW_SIZE as usize { |
159 | return Err(UserError::PayloadTooBig); |
160 | } |
161 | |
162 | let sz = sz as WindowSize; |
163 | |
164 | if !stream.state.is_send_streaming() { |
165 | if stream.state.is_closed() { |
166 | return Err(InactiveStreamId); |
167 | } else { |
168 | return Err(UnexpectedFrameType); |
169 | } |
170 | } |
171 | |
172 | // Update the buffered data counter |
173 | stream.buffered_send_data += sz as usize; |
174 | |
175 | let span = |
176 | tracing::trace_span!("send_data" , sz, requested = stream.requested_send_capacity); |
177 | let _e = span.enter(); |
178 | tracing::trace!(buffered = stream.buffered_send_data); |
179 | |
180 | // Implicitly request more send capacity if not enough has been |
181 | // requested yet. |
182 | if (stream.requested_send_capacity as usize) < stream.buffered_send_data { |
183 | // Update the target requested capacity |
184 | stream.requested_send_capacity = |
185 | cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; |
186 | |
187 | // `try_assign_capacity` will queue the stream to `pending_capacity` if the capcaity |
188 | // cannot be assigned at the time it is called. |
189 | // |
190 | // Streams over the max concurrent count will still call `send_data` so we should be |
191 | // careful not to put it into `pending_capacity` as it will starve the connection |
192 | // capacity for other streams |
193 | if !stream.is_pending_open { |
194 | self.try_assign_capacity(stream); |
195 | } |
196 | } |
197 | |
198 | if frame.is_end_stream() { |
199 | stream.state.send_close(); |
200 | self.reserve_capacity(0, stream, counts); |
201 | } |
202 | |
203 | tracing::trace!( |
204 | available = %stream.send_flow.available(), |
205 | buffered = stream.buffered_send_data, |
206 | ); |
207 | |
208 | // The `stream.buffered_send_data == 0` check is here so that, if a zero |
209 | // length data frame is queued to the front (there is no previously |
210 | // queued data), it gets sent out immediately even if there is no |
211 | // available send window. |
212 | // |
213 | // Sending out zero length data frames can be done to signal |
214 | // end-of-stream. |
215 | // |
216 | if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { |
217 | // The stream currently has capacity to send the data frame, so |
218 | // queue it up and notify the connection task. |
219 | self.queue_frame(frame.into(), buffer, stream, task); |
220 | } else { |
221 | // The stream has no capacity to send the frame now, save it but |
222 | // don't notify the connection task. Once additional capacity |
223 | // becomes available, the frame will be flushed. |
224 | stream.pending_send.push_back(buffer, frame.into()); |
225 | } |
226 | |
227 | Ok(()) |
228 | } |
229 | |
230 | /// Request capacity to send data |
231 | pub fn reserve_capacity( |
232 | &mut self, |
233 | capacity: WindowSize, |
234 | stream: &mut store::Ptr, |
235 | counts: &mut Counts, |
236 | ) { |
237 | let span = tracing::trace_span!( |
238 | "reserve_capacity" , |
239 | ?stream.id, |
240 | requested = capacity, |
241 | effective = (capacity as usize) + stream.buffered_send_data, |
242 | curr = stream.requested_send_capacity |
243 | ); |
244 | let _e = span.enter(); |
245 | |
246 | // Actual capacity is `capacity` + the current amount of buffered data. |
247 | // If it were less, then we could never send out the buffered data. |
248 | let capacity = (capacity as usize) + stream.buffered_send_data; |
249 | |
250 | match capacity.cmp(&(stream.requested_send_capacity as usize)) { |
251 | Ordering::Equal => { |
252 | // Nothing to do |
253 | } |
254 | Ordering::Less => { |
255 | // Update the target requested capacity |
256 | stream.requested_send_capacity = capacity as WindowSize; |
257 | |
258 | // Currently available capacity assigned to the stream |
259 | let available = stream.send_flow.available().as_size(); |
260 | |
261 | // If the stream has more assigned capacity than requested, reclaim |
262 | // some for the connection |
263 | if available as usize > capacity { |
264 | let diff = available - capacity as WindowSize; |
265 | |
266 | // TODO: proper error handling |
267 | let _res = stream.send_flow.claim_capacity(diff); |
268 | debug_assert!(_res.is_ok()); |
269 | |
270 | self.assign_connection_capacity(diff, stream, counts); |
271 | } |
272 | } |
273 | Ordering::Greater => { |
274 | // If trying to *add* capacity, but the stream send side is closed, |
275 | // there's nothing to be done. |
276 | if stream.state.is_send_closed() { |
277 | return; |
278 | } |
279 | |
280 | // Update the target requested capacity |
281 | stream.requested_send_capacity = |
282 | cmp::min(capacity, WindowSize::MAX as usize) as WindowSize; |
283 | |
284 | // Try to assign additional capacity to the stream. If none is |
285 | // currently available, the stream will be queued to receive some |
286 | // when more becomes available. |
287 | self.try_assign_capacity(stream); |
288 | } |
289 | } |
290 | } |
291 | |
292 | pub fn recv_stream_window_update( |
293 | &mut self, |
294 | inc: WindowSize, |
295 | stream: &mut store::Ptr, |
296 | ) -> Result<(), Reason> { |
297 | let span = tracing::trace_span!( |
298 | "recv_stream_window_update" , |
299 | ?stream.id, |
300 | ?stream.state, |
301 | inc, |
302 | flow = ?stream.send_flow |
303 | ); |
304 | let _e = span.enter(); |
305 | |
306 | if stream.state.is_send_closed() && stream.buffered_send_data == 0 { |
307 | // We can't send any data, so don't bother doing anything else. |
308 | return Ok(()); |
309 | } |
310 | |
311 | // Update the stream level flow control. |
312 | stream.send_flow.inc_window(inc)?; |
313 | |
314 | // If the stream is waiting on additional capacity, then this will |
315 | // assign it (if available on the connection) and notify the producer |
316 | self.try_assign_capacity(stream); |
317 | |
318 | Ok(()) |
319 | } |
320 | |
321 | pub fn recv_connection_window_update( |
322 | &mut self, |
323 | inc: WindowSize, |
324 | store: &mut Store, |
325 | counts: &mut Counts, |
326 | ) -> Result<(), Reason> { |
327 | // Update the connection's window |
328 | self.flow.inc_window(inc)?; |
329 | |
330 | self.assign_connection_capacity(inc, store, counts); |
331 | Ok(()) |
332 | } |
333 | |
334 | /// Reclaim all capacity assigned to the stream and re-assign it to the |
335 | /// connection |
336 | pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { |
337 | let available = stream.send_flow.available().as_size(); |
338 | if available > 0 { |
339 | // TODO: proper error handling |
340 | let _res = stream.send_flow.claim_capacity(available); |
341 | debug_assert!(_res.is_ok()); |
342 | // Re-assign all capacity to the connection |
343 | self.assign_connection_capacity(available, stream, counts); |
344 | } |
345 | } |
346 | |
347 | /// Reclaim just reserved capacity, not buffered capacity, and re-assign |
348 | /// it to the connection |
349 | pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { |
350 | // only reclaim requested capacity that isn't already buffered |
351 | if stream.requested_send_capacity as usize > stream.buffered_send_data { |
352 | let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize; |
353 | |
354 | // TODO: proper error handling |
355 | let _res = stream.send_flow.claim_capacity(reserved); |
356 | debug_assert!(_res.is_ok()); |
357 | self.assign_connection_capacity(reserved, stream, counts); |
358 | } |
359 | } |
360 | |
361 | pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { |
362 | let span = tracing::trace_span!("clear_pending_capacity" ); |
363 | let _e = span.enter(); |
364 | while let Some(stream) = self.pending_capacity.pop(store) { |
365 | counts.transition(stream, |_, stream| { |
366 | tracing::trace!(?stream.id, "clear_pending_capacity" ); |
367 | }) |
368 | } |
369 | } |
370 | |
371 | pub fn assign_connection_capacity<R>( |
372 | &mut self, |
373 | inc: WindowSize, |
374 | store: &mut R, |
375 | counts: &mut Counts, |
376 | ) where |
377 | R: Resolve, |
378 | { |
379 | let span = tracing::trace_span!("assign_connection_capacity" , inc); |
380 | let _e = span.enter(); |
381 | |
382 | // TODO: proper error handling |
383 | let _res = self.flow.assign_capacity(inc); |
384 | debug_assert!(_res.is_ok()); |
385 | |
386 | // Assign newly acquired capacity to streams pending capacity. |
387 | while self.flow.available() > 0 { |
388 | let stream = match self.pending_capacity.pop(store) { |
389 | Some(stream) => stream, |
390 | None => return, |
391 | }; |
392 | |
393 | // Streams pending capacity may have been reset before capacity |
394 | // became available. In that case, the stream won't want any |
395 | // capacity, and so we shouldn't "transition" on it, but just evict |
396 | // it and continue the loop. |
397 | if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { |
398 | continue; |
399 | } |
400 | |
401 | counts.transition(stream, |_, stream| { |
402 | // Try to assign capacity to the stream. This will also re-queue the |
403 | // stream if there isn't enough connection level capacity to fulfill |
404 | // the capacity request. |
405 | self.try_assign_capacity(stream); |
406 | }) |
407 | } |
408 | } |
409 | |
410 | /// Request capacity to send data |
411 | fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { |
412 | let total_requested = stream.requested_send_capacity; |
413 | |
414 | // Total requested should never go below actual assigned |
415 | // (Note: the window size can go lower than assigned) |
416 | debug_assert!(stream.send_flow.available() <= total_requested as usize); |
417 | |
418 | // The amount of additional capacity that the stream requests. |
419 | // Don't assign more than the window has available! |
420 | let additional = cmp::min( |
421 | total_requested - stream.send_flow.available().as_size(), |
422 | // Can't assign more than what is available |
423 | stream.send_flow.window_size() - stream.send_flow.available().as_size(), |
424 | ); |
425 | let span = tracing::trace_span!("try_assign_capacity" , ?stream.id); |
426 | let _e = span.enter(); |
427 | tracing::trace!( |
428 | requested = total_requested, |
429 | additional, |
430 | buffered = stream.buffered_send_data, |
431 | window = stream.send_flow.window_size(), |
432 | conn = %self.flow.available() |
433 | ); |
434 | |
435 | if additional == 0 { |
436 | // Nothing more to do |
437 | return; |
438 | } |
439 | |
440 | // If the stream has requested capacity, then it must be in the |
441 | // streaming state (more data could be sent) or there is buffered data |
442 | // waiting to be sent. |
443 | debug_assert!( |
444 | stream.state.is_send_streaming() || stream.buffered_send_data > 0, |
445 | "state= {:?}" , |
446 | stream.state |
447 | ); |
448 | |
449 | // The amount of currently available capacity on the connection |
450 | let conn_available = self.flow.available().as_size(); |
451 | |
452 | // First check if capacity is immediately available |
453 | if conn_available > 0 { |
454 | // The amount of capacity to assign to the stream |
455 | // TODO: Should prioritization factor into this? |
456 | let assign = cmp::min(conn_available, additional); |
457 | |
458 | tracing::trace!(capacity = assign, "assigning" ); |
459 | |
460 | // Assign the capacity to the stream |
461 | stream.assign_capacity(assign, self.max_buffer_size); |
462 | |
463 | // Claim the capacity from the connection |
464 | // TODO: proper error handling |
465 | let _res = self.flow.claim_capacity(assign); |
466 | debug_assert!(_res.is_ok()); |
467 | } |
468 | |
469 | tracing::trace!( |
470 | available = %stream.send_flow.available(), |
471 | requested = stream.requested_send_capacity, |
472 | buffered = stream.buffered_send_data, |
473 | has_unavailable = %stream.send_flow.has_unavailable() |
474 | ); |
475 | |
476 | if stream.send_flow.available() < stream.requested_send_capacity as usize |
477 | && stream.send_flow.has_unavailable() |
478 | { |
479 | // The stream requires additional capacity and the stream's |
480 | // window has available capacity, but the connection window |
481 | // does not. |
482 | // |
483 | // In this case, the stream needs to be queued up for when the |
484 | // connection has more capacity. |
485 | self.pending_capacity.push(stream); |
486 | } |
487 | |
488 | // If data is buffered and the stream is send ready, then |
489 | // schedule the stream for execution |
490 | if stream.buffered_send_data > 0 && stream.is_send_ready() { |
491 | // TODO: This assertion isn't *exactly* correct. There can still be |
492 | // buffered send data while the stream's pending send queue is |
493 | // empty. This can happen when a large data frame is in the process |
494 | // of being **partially** sent. Once the window has been sent, the |
495 | // data frame will be returned to the prioritization layer to be |
496 | // re-scheduled. |
497 | // |
498 | // That said, it would be nice to figure out how to make this |
499 | // assertion correctly. |
500 | // |
501 | // debug_assert!(!stream.pending_send.is_empty()); |
502 | |
503 | self.pending_send.push(stream); |
504 | } |
505 | } |
506 | |
507 | pub fn poll_complete<T, B>( |
508 | &mut self, |
509 | cx: &mut Context, |
510 | buffer: &mut Buffer<Frame<B>>, |
511 | store: &mut Store, |
512 | counts: &mut Counts, |
513 | dst: &mut Codec<T, Prioritized<B>>, |
514 | ) -> Poll<io::Result<()>> |
515 | where |
516 | T: AsyncWrite + Unpin, |
517 | B: Buf, |
518 | { |
519 | // Ensure codec is ready |
520 | ready!(dst.poll_ready(cx))?; |
521 | |
522 | // Reclaim any frame that has previously been written |
523 | self.reclaim_frame(buffer, store, dst); |
524 | |
525 | // The max frame length |
526 | let max_frame_len = dst.max_send_frame_size(); |
527 | |
528 | tracing::trace!("poll_complete" ); |
529 | |
530 | loop { |
531 | if let Some(mut stream) = self.pop_pending_open(store, counts) { |
532 | self.pending_send.push_front(&mut stream); |
533 | self.try_assign_capacity(&mut stream); |
534 | } |
535 | |
536 | match self.pop_frame(buffer, store, max_frame_len, counts) { |
537 | Some(frame) => { |
538 | tracing::trace!(?frame, "writing" ); |
539 | |
540 | debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); |
541 | if let Frame::Data(ref frame) = frame { |
542 | self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); |
543 | } |
544 | dst.buffer(frame).expect("invalid frame" ); |
545 | |
546 | // Ensure the codec is ready to try the loop again. |
547 | ready!(dst.poll_ready(cx))?; |
548 | |
549 | // Because, always try to reclaim... |
550 | self.reclaim_frame(buffer, store, dst); |
551 | } |
552 | None => { |
553 | // Try to flush the codec. |
554 | ready!(dst.flush(cx))?; |
555 | |
556 | // This might release a data frame... |
557 | if !self.reclaim_frame(buffer, store, dst) { |
558 | return Poll::Ready(Ok(())); |
559 | } |
560 | |
561 | // No need to poll ready as poll_complete() does this for |
562 | // us... |
563 | } |
564 | } |
565 | } |
566 | } |
567 | |
568 | /// Tries to reclaim a pending data frame from the codec. |
569 | /// |
570 | /// Returns true if a frame was reclaimed. |
571 | /// |
572 | /// When a data frame is written to the codec, it may not be written in its |
573 | /// entirety (large chunks are split up into potentially many data frames). |
574 | /// In this case, the stream needs to be reprioritized. |
575 | fn reclaim_frame<T, B>( |
576 | &mut self, |
577 | buffer: &mut Buffer<Frame<B>>, |
578 | store: &mut Store, |
579 | dst: &mut Codec<T, Prioritized<B>>, |
580 | ) -> bool |
581 | where |
582 | B: Buf, |
583 | { |
584 | let span = tracing::trace_span!("try_reclaim_frame" ); |
585 | let _e = span.enter(); |
586 | |
587 | // First check if there are any data chunks to take back |
588 | if let Some(frame) = dst.take_last_data_frame() { |
589 | self.reclaim_frame_inner(buffer, store, frame) |
590 | } else { |
591 | false |
592 | } |
593 | } |
594 | |
595 | fn reclaim_frame_inner<B>( |
596 | &mut self, |
597 | buffer: &mut Buffer<Frame<B>>, |
598 | store: &mut Store, |
599 | frame: frame::Data<Prioritized<B>>, |
600 | ) -> bool |
601 | where |
602 | B: Buf, |
603 | { |
604 | tracing::trace!( |
605 | ?frame, |
606 | sz = frame.payload().inner.get_ref().remaining(), |
607 | "reclaimed" |
608 | ); |
609 | |
610 | let mut eos = false; |
611 | let key = frame.payload().stream; |
612 | |
613 | match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { |
614 | InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim" ), |
615 | InFlightData::Drop => { |
616 | tracing::trace!("not reclaiming frame for cancelled stream" ); |
617 | return false; |
618 | } |
619 | InFlightData::DataFrame(k) => { |
620 | debug_assert_eq!(k, key); |
621 | } |
622 | } |
623 | |
624 | let mut frame = frame.map(|prioritized| { |
625 | // TODO: Ensure fully written |
626 | eos = prioritized.end_of_stream; |
627 | prioritized.inner.into_inner() |
628 | }); |
629 | |
630 | if frame.payload().has_remaining() { |
631 | let mut stream = store.resolve(key); |
632 | |
633 | if eos { |
634 | frame.set_end_stream(true); |
635 | } |
636 | |
637 | self.push_back_frame(frame.into(), buffer, &mut stream); |
638 | |
639 | return true; |
640 | } |
641 | |
642 | false |
643 | } |
644 | |
645 | /// Push the frame to the front of the stream's deque, scheduling the |
646 | /// stream if needed. |
647 | fn push_back_frame<B>( |
648 | &mut self, |
649 | frame: Frame<B>, |
650 | buffer: &mut Buffer<Frame<B>>, |
651 | stream: &mut store::Ptr, |
652 | ) { |
653 | // Push the frame to the front of the stream's deque |
654 | stream.pending_send.push_front(buffer, frame); |
655 | |
656 | // If needed, schedule the sender |
657 | if stream.send_flow.available() > 0 { |
658 | debug_assert!(!stream.pending_send.is_empty()); |
659 | self.pending_send.push(stream); |
660 | } |
661 | } |
662 | |
663 | pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { |
664 | let span = tracing::trace_span!("clear_queue" , ?stream.id); |
665 | let _e = span.enter(); |
666 | |
667 | // TODO: make this more efficient? |
668 | while let Some(frame) = stream.pending_send.pop_front(buffer) { |
669 | tracing::trace!(?frame, "dropping" ); |
670 | } |
671 | |
672 | stream.buffered_send_data = 0; |
673 | stream.requested_send_capacity = 0; |
674 | if let InFlightData::DataFrame(key) = self.in_flight_data_frame { |
675 | if stream.key() == key { |
676 | // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. |
677 | self.in_flight_data_frame = InFlightData::Drop; |
678 | } |
679 | } |
680 | } |
681 | |
682 | pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) { |
683 | while let Some(stream) = self.pending_send.pop(store) { |
684 | let is_pending_reset = stream.is_pending_reset_expiration(); |
685 | counts.transition_after(stream, is_pending_reset); |
686 | } |
687 | } |
688 | |
689 | pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { |
690 | while let Some(stream) = self.pending_open.pop(store) { |
691 | let is_pending_reset = stream.is_pending_reset_expiration(); |
692 | counts.transition_after(stream, is_pending_reset); |
693 | } |
694 | } |
695 | |
696 | fn pop_frame<B>( |
697 | &mut self, |
698 | buffer: &mut Buffer<Frame<B>>, |
699 | store: &mut Store, |
700 | max_len: usize, |
701 | counts: &mut Counts, |
702 | ) -> Option<Frame<Prioritized<B>>> |
703 | where |
704 | B: Buf, |
705 | { |
706 | let span = tracing::trace_span!("pop_frame" ); |
707 | let _e = span.enter(); |
708 | |
709 | loop { |
710 | match self.pending_send.pop(store) { |
711 | Some(mut stream) => { |
712 | let span = tracing::trace_span!("popped" , ?stream.id, ?stream.state); |
713 | let _e = span.enter(); |
714 | |
715 | // It's possible that this stream, besides having data to send, |
716 | // is also queued to send a reset, and thus is already in the queue |
717 | // to wait for "some time" after a reset. |
718 | // |
719 | // To be safe, we just always ask the stream. |
720 | let is_pending_reset = stream.is_pending_reset_expiration(); |
721 | |
722 | tracing::trace!(is_pending_reset); |
723 | |
724 | let frame = match stream.pending_send.pop_front(buffer) { |
725 | Some(Frame::Data(mut frame)) => { |
726 | // Get the amount of capacity remaining for stream's |
727 | // window. |
728 | let stream_capacity = stream.send_flow.available(); |
729 | let sz = frame.payload().remaining(); |
730 | |
731 | tracing::trace!( |
732 | sz, |
733 | eos = frame.is_end_stream(), |
734 | window = %stream_capacity, |
735 | available = %stream.send_flow.available(), |
736 | requested = stream.requested_send_capacity, |
737 | buffered = stream.buffered_send_data, |
738 | "data frame" |
739 | ); |
740 | |
741 | // Zero length data frames always have capacity to |
742 | // be sent. |
743 | if sz > 0 && stream_capacity == 0 { |
744 | tracing::trace!("stream capacity is 0" ); |
745 | |
746 | // Ensure that the stream is waiting for |
747 | // connection level capacity |
748 | // |
749 | // TODO: uncomment |
750 | // debug_assert!(stream.is_pending_send_capacity); |
751 | |
752 | // The stream has no more capacity, this can |
753 | // happen if the remote reduced the stream |
754 | // window. In this case, we need to buffer the |
755 | // frame and wait for a window update... |
756 | stream.pending_send.push_front(buffer, frame.into()); |
757 | |
758 | continue; |
759 | } |
760 | |
761 | // Only send up to the max frame length |
762 | let len = cmp::min(sz, max_len); |
763 | |
764 | // Only send up to the stream's window capacity |
765 | let len = |
766 | cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; |
767 | |
768 | // There *must* be be enough connection level |
769 | // capacity at this point. |
770 | debug_assert!(len <= self.flow.window_size()); |
771 | |
772 | // Check if the stream level window the peer knows is available. In some |
773 | // scenarios, maybe the window we know is available but the window which |
774 | // peer knows is not. |
775 | if len > 0 && len > stream.send_flow.window_size() { |
776 | stream.pending_send.push_front(buffer, frame.into()); |
777 | continue; |
778 | } |
779 | |
780 | tracing::trace!(len, "sending data frame" ); |
781 | |
782 | // Update the flow control |
783 | tracing::trace_span!("updating stream flow" ).in_scope(|| { |
784 | stream.send_data(len, self.max_buffer_size); |
785 | |
786 | // Assign the capacity back to the connection that |
787 | // was just consumed from the stream in the previous |
788 | // line. |
789 | // TODO: proper error handling |
790 | let _res = self.flow.assign_capacity(len); |
791 | debug_assert!(_res.is_ok()); |
792 | }); |
793 | |
794 | let (eos, len) = tracing::trace_span!("updating connection flow" ) |
795 | .in_scope(|| { |
796 | // TODO: proper error handling |
797 | let _res = self.flow.send_data(len); |
798 | debug_assert!(_res.is_ok()); |
799 | |
800 | // Wrap the frame's data payload to ensure that the |
801 | // correct amount of data gets written. |
802 | |
803 | let eos = frame.is_end_stream(); |
804 | let len = len as usize; |
805 | |
806 | if frame.payload().remaining() > len { |
807 | frame.set_end_stream(false); |
808 | } |
809 | (eos, len) |
810 | }); |
811 | |
812 | Frame::Data(frame.map(|buf| Prioritized { |
813 | inner: buf.take(len), |
814 | end_of_stream: eos, |
815 | stream: stream.key(), |
816 | })) |
817 | } |
818 | Some(Frame::PushPromise(pp)) => { |
819 | let mut pushed = |
820 | stream.store_mut().find_mut(&pp.promised_id()).unwrap(); |
821 | pushed.is_pending_push = false; |
822 | // Transition stream from pending_push to pending_open |
823 | // if possible |
824 | if !pushed.pending_send.is_empty() { |
825 | if counts.can_inc_num_send_streams() { |
826 | counts.inc_num_send_streams(&mut pushed); |
827 | self.pending_send.push(&mut pushed); |
828 | } else { |
829 | self.queue_open(&mut pushed); |
830 | } |
831 | } |
832 | Frame::PushPromise(pp) |
833 | } |
834 | Some(frame) => frame.map(|_| { |
835 | unreachable!( |
836 | "Frame::map closure will only be called \ |
837 | on DATA frames." |
838 | ) |
839 | }), |
840 | None => { |
841 | if let Some(reason) = stream.state.get_scheduled_reset() { |
842 | let stream_id = stream.id; |
843 | stream |
844 | .state |
845 | .set_reset(stream_id, reason, Initiator::Library); |
846 | |
847 | let frame = frame::Reset::new(stream.id, reason); |
848 | Frame::Reset(frame) |
849 | } else { |
850 | // If the stream receives a RESET from the peer, it may have |
851 | // had data buffered to be sent, but all the frames are cleared |
852 | // in clear_queue(). Instead of doing O(N) traversal through queue |
853 | // to remove, lets just ignore the stream here. |
854 | tracing::trace!("removing dangling stream from pending_send" ); |
855 | // Since this should only happen as a consequence of `clear_queue`, |
856 | // we must be in a closed state of some kind. |
857 | debug_assert!(stream.state.is_closed()); |
858 | counts.transition_after(stream, is_pending_reset); |
859 | continue; |
860 | } |
861 | } |
862 | }; |
863 | |
864 | tracing::trace!("pop_frame; frame= {:?}" , frame); |
865 | |
866 | if cfg!(debug_assertions) && stream.state.is_idle() { |
867 | debug_assert!(stream.id > self.last_opened_id); |
868 | self.last_opened_id = stream.id; |
869 | } |
870 | |
871 | if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { |
872 | // TODO: Only requeue the sender IF it is ready to send |
873 | // the next frame. i.e. don't requeue it if the next |
874 | // frame is a data frame and the stream does not have |
875 | // any more capacity. |
876 | self.pending_send.push(&mut stream); |
877 | } |
878 | |
879 | counts.transition_after(stream, is_pending_reset); |
880 | |
881 | return Some(frame); |
882 | } |
883 | None => return None, |
884 | } |
885 | } |
886 | } |
887 | |
888 | fn pop_pending_open<'s>( |
889 | &mut self, |
890 | store: &'s mut Store, |
891 | counts: &mut Counts, |
892 | ) -> Option<store::Ptr<'s>> { |
893 | tracing::trace!("schedule_pending_open" ); |
894 | // check for any pending open streams |
895 | if counts.can_inc_num_send_streams() { |
896 | if let Some(mut stream) = self.pending_open.pop(store) { |
897 | tracing::trace!("schedule_pending_open; stream= {:?}" , stream.id); |
898 | |
899 | counts.inc_num_send_streams(&mut stream); |
900 | stream.notify_send(); |
901 | return Some(stream); |
902 | } |
903 | } |
904 | |
905 | None |
906 | } |
907 | } |
908 | |
909 | // ===== impl Prioritized ===== |
910 | |
911 | impl<B> Buf for Prioritized<B> |
912 | where |
913 | B: Buf, |
914 | { |
915 | fn remaining(&self) -> usize { |
916 | self.inner.remaining() |
917 | } |
918 | |
919 | fn chunk(&self) -> &[u8] { |
920 | self.inner.chunk() |
921 | } |
922 | |
923 | fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { |
924 | self.inner.chunks_vectored(dst) |
925 | } |
926 | |
927 | fn advance(&mut self, cnt: usize) { |
928 | self.inner.advance(cnt) |
929 | } |
930 | } |
931 | |
932 | impl<B: Buf> fmt::Debug for Prioritized<B> { |
933 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
934 | fmt&mut DebugStruct<'_, '_>.debug_struct("Prioritized" ) |
935 | .field("remaining" , &self.inner.get_ref().remaining()) |
936 | .field("end_of_stream" , &self.end_of_stream) |
937 | .field(name:"stream" , &self.stream) |
938 | .finish() |
939 | } |
940 | } |
941 | |