1 | use super::store::Resolve; |
2 | use super::*; |
3 | |
4 | use crate::frame::{Reason, StreamId}; |
5 | |
6 | use crate::codec::UserError; |
7 | use crate::codec::UserError::*; |
8 | |
9 | use bytes::buf::{Buf, Take}; |
10 | use std::{ |
11 | cmp::{self, Ordering}, |
12 | fmt, io, mem, |
13 | task::{Context, Poll, Waker}, |
14 | }; |
15 | |
16 | /// # Warning |
17 | /// |
18 | /// Queued streams are ordered by stream ID, as we need to ensure that |
19 | /// lower-numbered streams are sent headers before higher-numbered ones. |
20 | /// This is because "idle" stream IDs – those which have been initiated but |
21 | /// have yet to receive frames – will be implicitly closed on receipt of a |
22 | /// frame on a higher stream ID. If these queues was not ordered by stream |
23 | /// IDs, some mechanism would be necessary to ensure that the lowest-numbered] |
24 | /// idle stream is opened first. |
25 | #[derive (Debug)] |
26 | pub(super) struct Prioritize { |
27 | /// Queue of streams waiting for socket capacity to send a frame. |
28 | pending_send: store::Queue<stream::NextSend>, |
29 | |
30 | /// Queue of streams waiting for window capacity to produce data. |
31 | pending_capacity: store::Queue<stream::NextSendCapacity>, |
32 | |
33 | /// Streams waiting for capacity due to max concurrency |
34 | /// |
35 | /// The `SendRequest` handle is `Clone`. This enables initiating requests |
36 | /// from many tasks. However, offering this capability while supporting |
37 | /// backpressure at some level is tricky. If there are many `SendRequest` |
38 | /// handles and a single stream becomes available, which handle gets |
39 | /// assigned that stream? Maybe that handle is no longer ready to send a |
40 | /// request. |
41 | /// |
42 | /// The strategy used is to allow each `SendRequest` handle one buffered |
43 | /// request. A `SendRequest` handle is ready to send a request if it has no |
44 | /// associated buffered requests. This is the same strategy as `mpsc` in the |
45 | /// futures library. |
46 | pending_open: store::Queue<stream::NextOpen>, |
47 | |
48 | /// Connection level flow control governing sent data |
49 | flow: FlowControl, |
50 | |
51 | /// Stream ID of the last stream opened. |
52 | last_opened_id: StreamId, |
53 | |
54 | /// What `DATA` frame is currently being sent in the codec. |
55 | in_flight_data_frame: InFlightData, |
56 | |
57 | /// The maximum amount of bytes a stream should buffer. |
58 | max_buffer_size: usize, |
59 | } |
60 | |
61 | #[derive (Debug, Eq, PartialEq)] |
62 | enum InFlightData { |
63 | /// There is no `DATA` frame in flight. |
64 | Nothing, |
65 | /// There is a `DATA` frame in flight belonging to the given stream. |
66 | DataFrame(store::Key), |
67 | /// There was a `DATA` frame, but the stream's queue was since cleared. |
68 | Drop, |
69 | } |
70 | |
71 | pub(crate) struct Prioritized<B> { |
72 | // The buffer |
73 | inner: Take<B>, |
74 | |
75 | end_of_stream: bool, |
76 | |
77 | // The stream that this is associated with |
78 | stream: store::Key, |
79 | } |
80 | |
81 | // ===== impl Prioritize ===== |
82 | |
83 | impl Prioritize { |
84 | pub fn new(config: &Config) -> Prioritize { |
85 | let mut flow = FlowControl::new(); |
86 | |
87 | flow.inc_window(config.remote_init_window_sz) |
88 | .expect("invalid initial window size" ); |
89 | |
90 | // TODO: proper error handling |
91 | let _res = flow.assign_capacity(config.remote_init_window_sz); |
92 | debug_assert!(_res.is_ok()); |
93 | |
94 | tracing::trace!("Prioritize::new; flow= {:?}" , flow); |
95 | |
96 | Prioritize { |
97 | pending_send: store::Queue::new(), |
98 | pending_capacity: store::Queue::new(), |
99 | pending_open: store::Queue::new(), |
100 | flow, |
101 | last_opened_id: StreamId::ZERO, |
102 | in_flight_data_frame: InFlightData::Nothing, |
103 | max_buffer_size: config.local_max_buffer_size, |
104 | } |
105 | } |
106 | |
107 | pub(crate) fn max_buffer_size(&self) -> usize { |
108 | self.max_buffer_size |
109 | } |
110 | |
111 | /// Queue a frame to be sent to the remote |
112 | pub fn queue_frame<B>( |
113 | &mut self, |
114 | frame: Frame<B>, |
115 | buffer: &mut Buffer<Frame<B>>, |
116 | stream: &mut store::Ptr, |
117 | task: &mut Option<Waker>, |
118 | ) { |
119 | let span = tracing::trace_span!("Prioritize::queue_frame" , ?stream.id); |
120 | let _e = span.enter(); |
121 | // Queue the frame in the buffer |
122 | stream.pending_send.push_back(buffer, frame); |
123 | self.schedule_send(stream, task); |
124 | } |
125 | |
126 | pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) { |
127 | // If the stream is waiting to be opened, nothing more to do. |
128 | if stream.is_send_ready() { |
129 | tracing::trace!(?stream.id, "schedule_send" ); |
130 | // Queue the stream |
131 | self.pending_send.push(stream); |
132 | |
133 | // Notify the connection. |
134 | if let Some(task) = task.take() { |
135 | task.wake(); |
136 | } |
137 | } |
138 | } |
139 | |
140 | pub fn queue_open(&mut self, stream: &mut store::Ptr) { |
141 | self.pending_open.push(stream); |
142 | } |
143 | |
144 | /// Send a data frame |
145 | pub fn send_data<B>( |
146 | &mut self, |
147 | frame: frame::Data<B>, |
148 | buffer: &mut Buffer<Frame<B>>, |
149 | stream: &mut store::Ptr, |
150 | counts: &mut Counts, |
151 | task: &mut Option<Waker>, |
152 | ) -> Result<(), UserError> |
153 | where |
154 | B: Buf, |
155 | { |
156 | let sz = frame.payload().remaining(); |
157 | |
158 | if sz > MAX_WINDOW_SIZE as usize { |
159 | return Err(UserError::PayloadTooBig); |
160 | } |
161 | |
162 | let sz = sz as WindowSize; |
163 | |
164 | if !stream.state.is_send_streaming() { |
165 | if stream.state.is_closed() { |
166 | return Err(InactiveStreamId); |
167 | } else { |
168 | return Err(UnexpectedFrameType); |
169 | } |
170 | } |
171 | |
172 | // Update the buffered data counter |
173 | stream.buffered_send_data += sz as usize; |
174 | |
175 | let span = |
176 | tracing::trace_span!("send_data" , sz, requested = stream.requested_send_capacity); |
177 | let _e = span.enter(); |
178 | tracing::trace!(buffered = stream.buffered_send_data); |
179 | |
180 | // Implicitly request more send capacity if not enough has been |
181 | // requested yet. |
182 | if (stream.requested_send_capacity as usize) < stream.buffered_send_data { |
183 | // Update the target requested capacity |
184 | stream.requested_send_capacity = |
185 | cmp::min(stream.buffered_send_data, WindowSize::MAX as usize) as WindowSize; |
186 | |
187 | self.try_assign_capacity(stream); |
188 | } |
189 | |
190 | if frame.is_end_stream() { |
191 | stream.state.send_close(); |
192 | self.reserve_capacity(0, stream, counts); |
193 | } |
194 | |
195 | tracing::trace!( |
196 | available = %stream.send_flow.available(), |
197 | buffered = stream.buffered_send_data, |
198 | ); |
199 | |
200 | // The `stream.buffered_send_data == 0` check is here so that, if a zero |
201 | // length data frame is queued to the front (there is no previously |
202 | // queued data), it gets sent out immediately even if there is no |
203 | // available send window. |
204 | // |
205 | // Sending out zero length data frames can be done to signal |
206 | // end-of-stream. |
207 | // |
208 | if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { |
209 | // The stream currently has capacity to send the data frame, so |
210 | // queue it up and notify the connection task. |
211 | self.queue_frame(frame.into(), buffer, stream, task); |
212 | } else { |
213 | // The stream has no capacity to send the frame now, save it but |
214 | // don't notify the connection task. Once additional capacity |
215 | // becomes available, the frame will be flushed. |
216 | stream.pending_send.push_back(buffer, frame.into()); |
217 | } |
218 | |
219 | Ok(()) |
220 | } |
221 | |
222 | /// Request capacity to send data |
223 | pub fn reserve_capacity( |
224 | &mut self, |
225 | capacity: WindowSize, |
226 | stream: &mut store::Ptr, |
227 | counts: &mut Counts, |
228 | ) { |
229 | let span = tracing::trace_span!( |
230 | "reserve_capacity" , |
231 | ?stream.id, |
232 | requested = capacity, |
233 | effective = (capacity as usize) + stream.buffered_send_data, |
234 | curr = stream.requested_send_capacity |
235 | ); |
236 | let _e = span.enter(); |
237 | |
238 | // Actual capacity is `capacity` + the current amount of buffered data. |
239 | // If it were less, then we could never send out the buffered data. |
240 | let capacity = (capacity as usize) + stream.buffered_send_data; |
241 | |
242 | match capacity.cmp(&(stream.requested_send_capacity as usize)) { |
243 | Ordering::Equal => { |
244 | // Nothing to do |
245 | } |
246 | Ordering::Less => { |
247 | // Update the target requested capacity |
248 | stream.requested_send_capacity = capacity as WindowSize; |
249 | |
250 | // Currently available capacity assigned to the stream |
251 | let available = stream.send_flow.available().as_size(); |
252 | |
253 | // If the stream has more assigned capacity than requested, reclaim |
254 | // some for the connection |
255 | if available as usize > capacity { |
256 | let diff = available - capacity as WindowSize; |
257 | |
258 | // TODO: proper error handling |
259 | let _res = stream.send_flow.claim_capacity(diff); |
260 | debug_assert!(_res.is_ok()); |
261 | |
262 | self.assign_connection_capacity(diff, stream, counts); |
263 | } |
264 | } |
265 | Ordering::Greater => { |
266 | // If trying to *add* capacity, but the stream send side is closed, |
267 | // there's nothing to be done. |
268 | if stream.state.is_send_closed() { |
269 | return; |
270 | } |
271 | |
272 | // Update the target requested capacity |
273 | stream.requested_send_capacity = |
274 | cmp::min(capacity, WindowSize::MAX as usize) as WindowSize; |
275 | |
276 | // Try to assign additional capacity to the stream. If none is |
277 | // currently available, the stream will be queued to receive some |
278 | // when more becomes available. |
279 | self.try_assign_capacity(stream); |
280 | } |
281 | } |
282 | } |
283 | |
284 | pub fn recv_stream_window_update( |
285 | &mut self, |
286 | inc: WindowSize, |
287 | stream: &mut store::Ptr, |
288 | ) -> Result<(), Reason> { |
289 | let span = tracing::trace_span!( |
290 | "recv_stream_window_update" , |
291 | ?stream.id, |
292 | ?stream.state, |
293 | inc, |
294 | flow = ?stream.send_flow |
295 | ); |
296 | let _e = span.enter(); |
297 | |
298 | if stream.state.is_send_closed() && stream.buffered_send_data == 0 { |
299 | // We can't send any data, so don't bother doing anything else. |
300 | return Ok(()); |
301 | } |
302 | |
303 | // Update the stream level flow control. |
304 | stream.send_flow.inc_window(inc)?; |
305 | |
306 | // If the stream is waiting on additional capacity, then this will |
307 | // assign it (if available on the connection) and notify the producer |
308 | self.try_assign_capacity(stream); |
309 | |
310 | Ok(()) |
311 | } |
312 | |
313 | pub fn recv_connection_window_update( |
314 | &mut self, |
315 | inc: WindowSize, |
316 | store: &mut Store, |
317 | counts: &mut Counts, |
318 | ) -> Result<(), Reason> { |
319 | // Update the connection's window |
320 | self.flow.inc_window(inc)?; |
321 | |
322 | self.assign_connection_capacity(inc, store, counts); |
323 | Ok(()) |
324 | } |
325 | |
326 | /// Reclaim all capacity assigned to the stream and re-assign it to the |
327 | /// connection |
328 | pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { |
329 | let available = stream.send_flow.available().as_size(); |
330 | if available > 0 { |
331 | // TODO: proper error handling |
332 | let _res = stream.send_flow.claim_capacity(available); |
333 | debug_assert!(_res.is_ok()); |
334 | // Re-assign all capacity to the connection |
335 | self.assign_connection_capacity(available, stream, counts); |
336 | } |
337 | } |
338 | |
339 | /// Reclaim just reserved capacity, not buffered capacity, and re-assign |
340 | /// it to the connection |
341 | pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { |
342 | // only reclaim requested capacity that isn't already buffered |
343 | if stream.requested_send_capacity as usize > stream.buffered_send_data { |
344 | let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize; |
345 | |
346 | // TODO: proper error handling |
347 | let _res = stream.send_flow.claim_capacity(reserved); |
348 | debug_assert!(_res.is_ok()); |
349 | self.assign_connection_capacity(reserved, stream, counts); |
350 | } |
351 | } |
352 | |
353 | pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { |
354 | let span = tracing::trace_span!("clear_pending_capacity" ); |
355 | let _e = span.enter(); |
356 | while let Some(stream) = self.pending_capacity.pop(store) { |
357 | counts.transition(stream, |_, stream| { |
358 | tracing::trace!(?stream.id, "clear_pending_capacity" ); |
359 | }) |
360 | } |
361 | } |
362 | |
363 | pub fn assign_connection_capacity<R>( |
364 | &mut self, |
365 | inc: WindowSize, |
366 | store: &mut R, |
367 | counts: &mut Counts, |
368 | ) where |
369 | R: Resolve, |
370 | { |
371 | let span = tracing::trace_span!("assign_connection_capacity" , inc); |
372 | let _e = span.enter(); |
373 | |
374 | // TODO: proper error handling |
375 | let _res = self.flow.assign_capacity(inc); |
376 | debug_assert!(_res.is_ok()); |
377 | |
378 | // Assign newly acquired capacity to streams pending capacity. |
379 | while self.flow.available() > 0 { |
380 | let stream = match self.pending_capacity.pop(store) { |
381 | Some(stream) => stream, |
382 | None => return, |
383 | }; |
384 | |
385 | // Streams pending capacity may have been reset before capacity |
386 | // became available. In that case, the stream won't want any |
387 | // capacity, and so we shouldn't "transition" on it, but just evict |
388 | // it and continue the loop. |
389 | if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) { |
390 | continue; |
391 | } |
392 | |
393 | counts.transition(stream, |_, stream| { |
394 | // Try to assign capacity to the stream. This will also re-queue the |
395 | // stream if there isn't enough connection level capacity to fulfill |
396 | // the capacity request. |
397 | self.try_assign_capacity(stream); |
398 | }) |
399 | } |
400 | } |
401 | |
402 | /// Request capacity to send data |
403 | fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { |
404 | let total_requested = stream.requested_send_capacity; |
405 | |
406 | // Total requested should never go below actual assigned |
407 | // (Note: the window size can go lower than assigned) |
408 | debug_assert!(stream.send_flow.available() <= total_requested as usize); |
409 | |
410 | // The amount of additional capacity that the stream requests. |
411 | // Don't assign more than the window has available! |
412 | let additional = cmp::min( |
413 | total_requested - stream.send_flow.available().as_size(), |
414 | // Can't assign more than what is available |
415 | stream.send_flow.window_size() - stream.send_flow.available().as_size(), |
416 | ); |
417 | let span = tracing::trace_span!("try_assign_capacity" , ?stream.id); |
418 | let _e = span.enter(); |
419 | tracing::trace!( |
420 | requested = total_requested, |
421 | additional, |
422 | buffered = stream.buffered_send_data, |
423 | window = stream.send_flow.window_size(), |
424 | conn = %self.flow.available() |
425 | ); |
426 | |
427 | if additional == 0 { |
428 | // Nothing more to do |
429 | return; |
430 | } |
431 | |
432 | // If the stream has requested capacity, then it must be in the |
433 | // streaming state (more data could be sent) or there is buffered data |
434 | // waiting to be sent. |
435 | debug_assert!( |
436 | stream.state.is_send_streaming() || stream.buffered_send_data > 0, |
437 | "state= {:?}" , |
438 | stream.state |
439 | ); |
440 | |
441 | // The amount of currently available capacity on the connection |
442 | let conn_available = self.flow.available().as_size(); |
443 | |
444 | // First check if capacity is immediately available |
445 | if conn_available > 0 { |
446 | // The amount of capacity to assign to the stream |
447 | // TODO: Should prioritization factor into this? |
448 | let assign = cmp::min(conn_available, additional); |
449 | |
450 | tracing::trace!(capacity = assign, "assigning" ); |
451 | |
452 | // Assign the capacity to the stream |
453 | stream.assign_capacity(assign, self.max_buffer_size); |
454 | |
455 | // Claim the capacity from the connection |
456 | // TODO: proper error handling |
457 | let _res = self.flow.claim_capacity(assign); |
458 | debug_assert!(_res.is_ok()); |
459 | } |
460 | |
461 | tracing::trace!( |
462 | available = %stream.send_flow.available(), |
463 | requested = stream.requested_send_capacity, |
464 | buffered = stream.buffered_send_data, |
465 | has_unavailable = %stream.send_flow.has_unavailable() |
466 | ); |
467 | |
468 | if stream.send_flow.available() < stream.requested_send_capacity as usize |
469 | && stream.send_flow.has_unavailable() |
470 | { |
471 | // The stream requires additional capacity and the stream's |
472 | // window has available capacity, but the connection window |
473 | // does not. |
474 | // |
475 | // In this case, the stream needs to be queued up for when the |
476 | // connection has more capacity. |
477 | self.pending_capacity.push(stream); |
478 | } |
479 | |
480 | // If data is buffered and the stream is send ready, then |
481 | // schedule the stream for execution |
482 | if stream.buffered_send_data > 0 && stream.is_send_ready() { |
483 | // TODO: This assertion isn't *exactly* correct. There can still be |
484 | // buffered send data while the stream's pending send queue is |
485 | // empty. This can happen when a large data frame is in the process |
486 | // of being **partially** sent. Once the window has been sent, the |
487 | // data frame will be returned to the prioritization layer to be |
488 | // re-scheduled. |
489 | // |
490 | // That said, it would be nice to figure out how to make this |
491 | // assertion correctly. |
492 | // |
493 | // debug_assert!(!stream.pending_send.is_empty()); |
494 | |
495 | self.pending_send.push(stream); |
496 | } |
497 | } |
498 | |
499 | pub fn poll_complete<T, B>( |
500 | &mut self, |
501 | cx: &mut Context, |
502 | buffer: &mut Buffer<Frame<B>>, |
503 | store: &mut Store, |
504 | counts: &mut Counts, |
505 | dst: &mut Codec<T, Prioritized<B>>, |
506 | ) -> Poll<io::Result<()>> |
507 | where |
508 | T: AsyncWrite + Unpin, |
509 | B: Buf, |
510 | { |
511 | // Ensure codec is ready |
512 | ready!(dst.poll_ready(cx))?; |
513 | |
514 | // Reclaim any frame that has previously been written |
515 | self.reclaim_frame(buffer, store, dst); |
516 | |
517 | // The max frame length |
518 | let max_frame_len = dst.max_send_frame_size(); |
519 | |
520 | tracing::trace!("poll_complete" ); |
521 | |
522 | loop { |
523 | if let Some(mut stream) = self.pop_pending_open(store, counts) { |
524 | self.pending_send.push_front(&mut stream); |
525 | } |
526 | |
527 | match self.pop_frame(buffer, store, max_frame_len, counts) { |
528 | Some(frame) => { |
529 | tracing::trace!(?frame, "writing" ); |
530 | |
531 | debug_assert_eq!(self.in_flight_data_frame, InFlightData::Nothing); |
532 | if let Frame::Data(ref frame) = frame { |
533 | self.in_flight_data_frame = InFlightData::DataFrame(frame.payload().stream); |
534 | } |
535 | dst.buffer(frame).expect("invalid frame" ); |
536 | |
537 | // Ensure the codec is ready to try the loop again. |
538 | ready!(dst.poll_ready(cx))?; |
539 | |
540 | // Because, always try to reclaim... |
541 | self.reclaim_frame(buffer, store, dst); |
542 | } |
543 | None => { |
544 | // Try to flush the codec. |
545 | ready!(dst.flush(cx))?; |
546 | |
547 | // This might release a data frame... |
548 | if !self.reclaim_frame(buffer, store, dst) { |
549 | return Poll::Ready(Ok(())); |
550 | } |
551 | |
552 | // No need to poll ready as poll_complete() does this for |
553 | // us... |
554 | } |
555 | } |
556 | } |
557 | } |
558 | |
559 | /// Tries to reclaim a pending data frame from the codec. |
560 | /// |
561 | /// Returns true if a frame was reclaimed. |
562 | /// |
563 | /// When a data frame is written to the codec, it may not be written in its |
564 | /// entirety (large chunks are split up into potentially many data frames). |
565 | /// In this case, the stream needs to be reprioritized. |
566 | fn reclaim_frame<T, B>( |
567 | &mut self, |
568 | buffer: &mut Buffer<Frame<B>>, |
569 | store: &mut Store, |
570 | dst: &mut Codec<T, Prioritized<B>>, |
571 | ) -> bool |
572 | where |
573 | B: Buf, |
574 | { |
575 | let span = tracing::trace_span!("try_reclaim_frame" ); |
576 | let _e = span.enter(); |
577 | |
578 | // First check if there are any data chunks to take back |
579 | if let Some(frame) = dst.take_last_data_frame() { |
580 | self.reclaim_frame_inner(buffer, store, frame) |
581 | } else { |
582 | false |
583 | } |
584 | } |
585 | |
586 | fn reclaim_frame_inner<B>( |
587 | &mut self, |
588 | buffer: &mut Buffer<Frame<B>>, |
589 | store: &mut Store, |
590 | frame: frame::Data<Prioritized<B>>, |
591 | ) -> bool |
592 | where |
593 | B: Buf, |
594 | { |
595 | tracing::trace!( |
596 | ?frame, |
597 | sz = frame.payload().inner.get_ref().remaining(), |
598 | "reclaimed" |
599 | ); |
600 | |
601 | let mut eos = false; |
602 | let key = frame.payload().stream; |
603 | |
604 | match mem::replace(&mut self.in_flight_data_frame, InFlightData::Nothing) { |
605 | InFlightData::Nothing => panic!("wasn't expecting a frame to reclaim" ), |
606 | InFlightData::Drop => { |
607 | tracing::trace!("not reclaiming frame for cancelled stream" ); |
608 | return false; |
609 | } |
610 | InFlightData::DataFrame(k) => { |
611 | debug_assert_eq!(k, key); |
612 | } |
613 | } |
614 | |
615 | let mut frame = frame.map(|prioritized| { |
616 | // TODO: Ensure fully written |
617 | eos = prioritized.end_of_stream; |
618 | prioritized.inner.into_inner() |
619 | }); |
620 | |
621 | if frame.payload().has_remaining() { |
622 | let mut stream = store.resolve(key); |
623 | |
624 | if eos { |
625 | frame.set_end_stream(true); |
626 | } |
627 | |
628 | self.push_back_frame(frame.into(), buffer, &mut stream); |
629 | |
630 | return true; |
631 | } |
632 | |
633 | false |
634 | } |
635 | |
636 | /// Push the frame to the front of the stream's deque, scheduling the |
637 | /// stream if needed. |
638 | fn push_back_frame<B>( |
639 | &mut self, |
640 | frame: Frame<B>, |
641 | buffer: &mut Buffer<Frame<B>>, |
642 | stream: &mut store::Ptr, |
643 | ) { |
644 | // Push the frame to the front of the stream's deque |
645 | stream.pending_send.push_front(buffer, frame); |
646 | |
647 | // If needed, schedule the sender |
648 | if stream.send_flow.available() > 0 { |
649 | debug_assert!(!stream.pending_send.is_empty()); |
650 | self.pending_send.push(stream); |
651 | } |
652 | } |
653 | |
654 | pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { |
655 | let span = tracing::trace_span!("clear_queue" , ?stream.id); |
656 | let _e = span.enter(); |
657 | |
658 | // TODO: make this more efficient? |
659 | while let Some(frame) = stream.pending_send.pop_front(buffer) { |
660 | tracing::trace!(?frame, "dropping" ); |
661 | } |
662 | |
663 | stream.buffered_send_data = 0; |
664 | stream.requested_send_capacity = 0; |
665 | if let InFlightData::DataFrame(key) = self.in_flight_data_frame { |
666 | if stream.key() == key { |
667 | // This stream could get cleaned up now - don't allow the buffered frame to get reclaimed. |
668 | self.in_flight_data_frame = InFlightData::Drop; |
669 | } |
670 | } |
671 | } |
672 | |
673 | pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) { |
674 | while let Some(stream) = self.pending_send.pop(store) { |
675 | let is_pending_reset = stream.is_pending_reset_expiration(); |
676 | counts.transition_after(stream, is_pending_reset); |
677 | } |
678 | } |
679 | |
680 | pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { |
681 | while let Some(stream) = self.pending_open.pop(store) { |
682 | let is_pending_reset = stream.is_pending_reset_expiration(); |
683 | counts.transition_after(stream, is_pending_reset); |
684 | } |
685 | } |
686 | |
687 | fn pop_frame<B>( |
688 | &mut self, |
689 | buffer: &mut Buffer<Frame<B>>, |
690 | store: &mut Store, |
691 | max_len: usize, |
692 | counts: &mut Counts, |
693 | ) -> Option<Frame<Prioritized<B>>> |
694 | where |
695 | B: Buf, |
696 | { |
697 | let span = tracing::trace_span!("pop_frame" ); |
698 | let _e = span.enter(); |
699 | |
700 | loop { |
701 | match self.pending_send.pop(store) { |
702 | Some(mut stream) => { |
703 | let span = tracing::trace_span!("popped" , ?stream.id, ?stream.state); |
704 | let _e = span.enter(); |
705 | |
706 | // It's possible that this stream, besides having data to send, |
707 | // is also queued to send a reset, and thus is already in the queue |
708 | // to wait for "some time" after a reset. |
709 | // |
710 | // To be safe, we just always ask the stream. |
711 | let is_pending_reset = stream.is_pending_reset_expiration(); |
712 | |
713 | tracing::trace!(is_pending_reset); |
714 | |
715 | let frame = match stream.pending_send.pop_front(buffer) { |
716 | Some(Frame::Data(mut frame)) => { |
717 | // Get the amount of capacity remaining for stream's |
718 | // window. |
719 | let stream_capacity = stream.send_flow.available(); |
720 | let sz = frame.payload().remaining(); |
721 | |
722 | tracing::trace!( |
723 | sz, |
724 | eos = frame.is_end_stream(), |
725 | window = %stream_capacity, |
726 | available = %stream.send_flow.available(), |
727 | requested = stream.requested_send_capacity, |
728 | buffered = stream.buffered_send_data, |
729 | "data frame" |
730 | ); |
731 | |
732 | // Zero length data frames always have capacity to |
733 | // be sent. |
734 | if sz > 0 && stream_capacity == 0 { |
735 | tracing::trace!("stream capacity is 0" ); |
736 | |
737 | // Ensure that the stream is waiting for |
738 | // connection level capacity |
739 | // |
740 | // TODO: uncomment |
741 | // debug_assert!(stream.is_pending_send_capacity); |
742 | |
743 | // The stream has no more capacity, this can |
744 | // happen if the remote reduced the stream |
745 | // window. In this case, we need to buffer the |
746 | // frame and wait for a window update... |
747 | stream.pending_send.push_front(buffer, frame.into()); |
748 | |
749 | continue; |
750 | } |
751 | |
752 | // Only send up to the max frame length |
753 | let len = cmp::min(sz, max_len); |
754 | |
755 | // Only send up to the stream's window capacity |
756 | let len = |
757 | cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; |
758 | |
759 | // There *must* be be enough connection level |
760 | // capacity at this point. |
761 | debug_assert!(len <= self.flow.window_size()); |
762 | |
763 | // Check if the stream level window the peer knows is available. In some |
764 | // scenarios, maybe the window we know is available but the window which |
765 | // peer knows is not. |
766 | if len > 0 && len > stream.send_flow.window_size() { |
767 | stream.pending_send.push_front(buffer, frame.into()); |
768 | continue; |
769 | } |
770 | |
771 | tracing::trace!(len, "sending data frame" ); |
772 | |
773 | // Update the flow control |
774 | tracing::trace_span!("updating stream flow" ).in_scope(|| { |
775 | stream.send_data(len, self.max_buffer_size); |
776 | |
777 | // Assign the capacity back to the connection that |
778 | // was just consumed from the stream in the previous |
779 | // line. |
780 | // TODO: proper error handling |
781 | let _res = self.flow.assign_capacity(len); |
782 | debug_assert!(_res.is_ok()); |
783 | }); |
784 | |
785 | let (eos, len) = tracing::trace_span!("updating connection flow" ) |
786 | .in_scope(|| { |
787 | // TODO: proper error handling |
788 | let _res = self.flow.send_data(len); |
789 | debug_assert!(_res.is_ok()); |
790 | |
791 | // Wrap the frame's data payload to ensure that the |
792 | // correct amount of data gets written. |
793 | |
794 | let eos = frame.is_end_stream(); |
795 | let len = len as usize; |
796 | |
797 | if frame.payload().remaining() > len { |
798 | frame.set_end_stream(false); |
799 | } |
800 | (eos, len) |
801 | }); |
802 | |
803 | Frame::Data(frame.map(|buf| Prioritized { |
804 | inner: buf.take(len), |
805 | end_of_stream: eos, |
806 | stream: stream.key(), |
807 | })) |
808 | } |
809 | Some(Frame::PushPromise(pp)) => { |
810 | let mut pushed = |
811 | stream.store_mut().find_mut(&pp.promised_id()).unwrap(); |
812 | pushed.is_pending_push = false; |
813 | // Transition stream from pending_push to pending_open |
814 | // if possible |
815 | if !pushed.pending_send.is_empty() { |
816 | if counts.can_inc_num_send_streams() { |
817 | counts.inc_num_send_streams(&mut pushed); |
818 | self.pending_send.push(&mut pushed); |
819 | } else { |
820 | self.queue_open(&mut pushed); |
821 | } |
822 | } |
823 | Frame::PushPromise(pp) |
824 | } |
825 | Some(frame) => frame.map(|_| { |
826 | unreachable!( |
827 | "Frame::map closure will only be called \ |
828 | on DATA frames." |
829 | ) |
830 | }), |
831 | None => { |
832 | if let Some(reason) = stream.state.get_scheduled_reset() { |
833 | let stream_id = stream.id; |
834 | stream |
835 | .state |
836 | .set_reset(stream_id, reason, Initiator::Library); |
837 | |
838 | let frame = frame::Reset::new(stream.id, reason); |
839 | Frame::Reset(frame) |
840 | } else { |
841 | // If the stream receives a RESET from the peer, it may have |
842 | // had data buffered to be sent, but all the frames are cleared |
843 | // in clear_queue(). Instead of doing O(N) traversal through queue |
844 | // to remove, lets just ignore the stream here. |
845 | tracing::trace!("removing dangling stream from pending_send" ); |
846 | // Since this should only happen as a consequence of `clear_queue`, |
847 | // we must be in a closed state of some kind. |
848 | debug_assert!(stream.state.is_closed()); |
849 | counts.transition_after(stream, is_pending_reset); |
850 | continue; |
851 | } |
852 | } |
853 | }; |
854 | |
855 | tracing::trace!("pop_frame; frame= {:?}" , frame); |
856 | |
857 | if cfg!(debug_assertions) && stream.state.is_idle() { |
858 | debug_assert!(stream.id > self.last_opened_id); |
859 | self.last_opened_id = stream.id; |
860 | } |
861 | |
862 | if !stream.pending_send.is_empty() || stream.state.is_scheduled_reset() { |
863 | // TODO: Only requeue the sender IF it is ready to send |
864 | // the next frame. i.e. don't requeue it if the next |
865 | // frame is a data frame and the stream does not have |
866 | // any more capacity. |
867 | self.pending_send.push(&mut stream); |
868 | } |
869 | |
870 | counts.transition_after(stream, is_pending_reset); |
871 | |
872 | return Some(frame); |
873 | } |
874 | None => return None, |
875 | } |
876 | } |
877 | } |
878 | |
879 | fn pop_pending_open<'s>( |
880 | &mut self, |
881 | store: &'s mut Store, |
882 | counts: &mut Counts, |
883 | ) -> Option<store::Ptr<'s>> { |
884 | tracing::trace!("schedule_pending_open" ); |
885 | // check for any pending open streams |
886 | if counts.can_inc_num_send_streams() { |
887 | if let Some(mut stream) = self.pending_open.pop(store) { |
888 | tracing::trace!("schedule_pending_open; stream= {:?}" , stream.id); |
889 | |
890 | counts.inc_num_send_streams(&mut stream); |
891 | stream.notify_send(); |
892 | return Some(stream); |
893 | } |
894 | } |
895 | |
896 | None |
897 | } |
898 | } |
899 | |
900 | // ===== impl Prioritized ===== |
901 | |
902 | impl<B> Buf for Prioritized<B> |
903 | where |
904 | B: Buf, |
905 | { |
906 | fn remaining(&self) -> usize { |
907 | self.inner.remaining() |
908 | } |
909 | |
910 | fn chunk(&self) -> &[u8] { |
911 | self.inner.chunk() |
912 | } |
913 | |
914 | fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { |
915 | self.inner.chunks_vectored(dst) |
916 | } |
917 | |
918 | fn advance(&mut self, cnt: usize) { |
919 | self.inner.advance(cnt) |
920 | } |
921 | } |
922 | |
923 | impl<B: Buf> fmt::Debug for Prioritized<B> { |
924 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
925 | fmt&mut DebugStruct<'_, '_>.debug_struct("Prioritized" ) |
926 | .field("remaining" , &self.inner.get_ref().remaining()) |
927 | .field("end_of_stream" , &self.end_of_stream) |
928 | .field(name:"stream" , &self.stream) |
929 | .finish() |
930 | } |
931 | } |
932 | |