1use crate::Reason;
2
3use super::*;
4
5use std::fmt;
6use std::task::{Context, Waker};
7use std::time::Instant;
8
9/// Tracks Stream related state
10///
11/// # Reference counting
12///
13/// There can be a number of outstanding handles to a single Stream. These are
14/// tracked using reference counting. The `ref_count` field represents the
15/// number of outstanding userspace handles that can reach this stream.
16///
17/// It's important to note that when the stream is placed in an internal queue
18/// (such as an accept queue), this is **not** tracked by a reference count.
19/// Thus, `ref_count` can be zero and the stream still has to be kept around.
20pub(super) struct Stream {
21 /// The h2 stream identifier
22 pub id: StreamId,
23
24 /// Current state of the stream
25 pub state: State,
26
27 /// Set to `true` when the stream is counted against the connection's max
28 /// concurrent streams.
29 pub is_counted: bool,
30
31 /// Number of outstanding handles pointing to this stream
32 pub ref_count: usize,
33
34 // ===== Fields related to sending =====
35 /// Next node in the accept linked list
36 pub next_pending_send: Option<store::Key>,
37
38 /// Set to true when the stream is pending accept
39 pub is_pending_send: bool,
40
41 /// Send data flow control
42 pub send_flow: FlowControl,
43
44 /// Amount of send capacity that has been requested, but not yet allocated.
45 pub requested_send_capacity: WindowSize,
46
47 /// Amount of data buffered at the prioritization layer.
48 /// TODO: Technically this could be greater than the window size...
49 pub buffered_send_data: usize,
50
51 /// Task tracking additional send capacity (i.e. window updates).
52 send_task: Option<Waker>,
53
54 /// Frames pending for this stream being sent to the socket
55 pub pending_send: buffer::Deque,
56
57 /// Next node in the linked list of streams waiting for additional
58 /// connection level capacity.
59 pub next_pending_send_capacity: Option<store::Key>,
60
61 /// True if the stream is waiting for outbound connection capacity
62 pub is_pending_send_capacity: bool,
63
64 /// Set to true when the send capacity has been incremented
65 pub send_capacity_inc: bool,
66
67 /// Next node in the open linked list
68 pub next_open: Option<store::Key>,
69
70 /// Set to true when the stream is pending to be opened
71 pub is_pending_open: bool,
72
73 /// Set to true when a push is pending for this stream
74 pub is_pending_push: bool,
75
76 // ===== Fields related to receiving =====
77 /// Next node in the accept linked list
78 pub next_pending_accept: Option<store::Key>,
79
80 /// Set to true when the stream is pending accept
81 pub is_pending_accept: bool,
82
83 /// Receive data flow control
84 pub recv_flow: FlowControl,
85
86 pub in_flight_recv_data: WindowSize,
87
88 /// Next node in the linked list of streams waiting to send window updates.
89 pub next_window_update: Option<store::Key>,
90
91 /// True if the stream is waiting to send a window update
92 pub is_pending_window_update: bool,
93
94 /// The time when this stream may have been locally reset.
95 pub reset_at: Option<Instant>,
96
97 /// Next node in list of reset streams that should expire eventually
98 pub next_reset_expire: Option<store::Key>,
99
100 /// Frames pending for this stream to read
101 pub pending_recv: buffer::Deque,
102
103 /// When the RecvStream drop occurs, no data should be received.
104 pub is_recv: bool,
105
106 /// Task tracking receiving frames
107 pub recv_task: Option<Waker>,
108
109 /// Task tracking pushed promises.
110 pub push_task: Option<Waker>,
111
112 /// The stream's pending push promises
113 pub pending_push_promises: store::Queue<NextAccept>,
114
115 /// Validate content-length headers
116 pub content_length: ContentLength,
117}
118
119/// State related to validating a stream's content-length
120#[derive(Debug)]
121pub enum ContentLength {
122 Omitted,
123 Head,
124 Remaining(u64),
125}
126
127#[derive(Debug)]
128pub(super) struct NextAccept;
129
130#[derive(Debug)]
131pub(super) struct NextSend;
132
133#[derive(Debug)]
134pub(super) struct NextSendCapacity;
135
136#[derive(Debug)]
137pub(super) struct NextWindowUpdate;
138
139#[derive(Debug)]
140pub(super) struct NextOpen;
141
142#[derive(Debug)]
143pub(super) struct NextResetExpire;
144
145impl Stream {
146 pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream {
147 let mut send_flow = FlowControl::new();
148 let mut recv_flow = FlowControl::new();
149
150 recv_flow
151 .inc_window(init_recv_window)
152 .expect("invalid initial receive window");
153 // TODO: proper error handling?
154 let _res = recv_flow.assign_capacity(init_recv_window);
155 debug_assert!(_res.is_ok());
156
157 send_flow
158 .inc_window(init_send_window)
159 .expect("invalid initial send window size");
160
161 Stream {
162 id,
163 state: State::default(),
164 ref_count: 0,
165 is_counted: false,
166
167 // ===== Fields related to sending =====
168 next_pending_send: None,
169 is_pending_send: false,
170 send_flow,
171 requested_send_capacity: 0,
172 buffered_send_data: 0,
173 send_task: None,
174 pending_send: buffer::Deque::new(),
175 is_pending_send_capacity: false,
176 next_pending_send_capacity: None,
177 send_capacity_inc: false,
178 is_pending_open: false,
179 next_open: None,
180 is_pending_push: false,
181
182 // ===== Fields related to receiving =====
183 next_pending_accept: None,
184 is_pending_accept: false,
185 recv_flow,
186 in_flight_recv_data: 0,
187 next_window_update: None,
188 is_pending_window_update: false,
189 reset_at: None,
190 next_reset_expire: None,
191 pending_recv: buffer::Deque::new(),
192 is_recv: true,
193 recv_task: None,
194 push_task: None,
195 pending_push_promises: store::Queue::new(),
196 content_length: ContentLength::Omitted,
197 }
198 }
199
200 /// Increment the stream's ref count
201 pub fn ref_inc(&mut self) {
202 assert!(self.ref_count < usize::MAX);
203 self.ref_count += 1;
204 }
205
206 /// Decrements the stream's ref count
207 pub fn ref_dec(&mut self) {
208 assert!(self.ref_count > 0);
209 self.ref_count -= 1;
210 }
211
212 /// Returns true if stream is currently being held for some time because of
213 /// a local reset.
214 pub fn is_pending_reset_expiration(&self) -> bool {
215 self.reset_at.is_some()
216 }
217
218 /// Returns true if frames for this stream are ready to be sent over the wire
219 pub fn is_send_ready(&self) -> bool {
220 // Why do we check pending_open?
221 //
222 // We allow users to call send_request() which schedules a stream to be pending_open
223 // if there is no room according to the concurrency limit (max_send_streams), and we
224 // also allow data to be buffered for send with send_data() if there is no capacity for
225 // the stream to send the data, which attempts to place the stream in pending_send.
226 // If the stream is not open, we don't want the stream to be scheduled for
227 // execution (pending_send). Note that if the stream is in pending_open, it will be
228 // pushed to pending_send when there is room for an open stream.
229 //
230 // In pending_push we track whether a PushPromise still needs to be sent
231 // from a different stream before we can start sending frames on this one.
232 // This is different from the "open" check because reserved streams don't count
233 // toward the concurrency limit.
234 // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
235 !self.is_pending_open && !self.is_pending_push
236 }
237
238 /// Returns true if the stream is closed
239 pub fn is_closed(&self) -> bool {
240 // The state has fully transitioned to closed.
241 self.state.is_closed() &&
242 // Because outbound frames transition the stream state before being
243 // buffered, we have to ensure that all frames have been flushed.
244 self.pending_send.is_empty() &&
245 // Sometimes large data frames are sent out in chunks. After a chunk
246 // of the frame is sent, the remainder is pushed back onto the send
247 // queue to be rescheduled.
248 //
249 // Checking for additional buffered data lets us catch this case.
250 self.buffered_send_data == 0
251 }
252
253 /// Returns true if the stream is no longer in use
254 pub fn is_released(&self) -> bool {
255 // The stream is closed and fully flushed
256 self.is_closed() &&
257 // There are no more outstanding references to the stream
258 self.ref_count == 0 &&
259 // The stream is not in any queue
260 !self.is_pending_send && !self.is_pending_send_capacity &&
261 !self.is_pending_accept && !self.is_pending_window_update &&
262 !self.is_pending_open && self.reset_at.is_none()
263 }
264
265 /// Returns true when the consumer of the stream has dropped all handles
266 /// (indicating no further interest in the stream) and the stream state is
267 /// not actually closed.
268 ///
269 /// In this case, a reset should be sent.
270 pub fn is_canceled_interest(&self) -> bool {
271 self.ref_count == 0 && !self.state.is_closed()
272 }
273
274 /// Current available stream send capacity
275 pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
276 let available = self.send_flow.available().as_size() as usize;
277 let buffered = self.buffered_send_data;
278
279 available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
280 }
281
282 pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
283 let prev_capacity = self.capacity(max_buffer_size);
284 debug_assert!(capacity > 0);
285 // TODO: proper error handling
286 let _res = self.send_flow.assign_capacity(capacity);
287 debug_assert!(_res.is_ok());
288
289 tracing::trace!(
290 " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
291 self.send_flow.available(),
292 self.buffered_send_data,
293 self.id,
294 max_buffer_size,
295 prev_capacity,
296 );
297
298 if prev_capacity < self.capacity(max_buffer_size) {
299 self.notify_capacity();
300 }
301 }
302
303 pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
304 let prev_capacity = self.capacity(max_buffer_size);
305
306 // TODO: proper error handling
307 let _res = self.send_flow.send_data(len);
308 debug_assert!(_res.is_ok());
309
310 // Decrement the stream's buffered data counter
311 debug_assert!(self.buffered_send_data >= len as usize);
312 self.buffered_send_data -= len as usize;
313 self.requested_send_capacity -= len;
314
315 tracing::trace!(
316 " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
317 self.send_flow.available(),
318 self.buffered_send_data,
319 self.id,
320 max_buffer_size,
321 prev_capacity,
322 );
323
324 if prev_capacity < self.capacity(max_buffer_size) {
325 self.notify_capacity();
326 }
327 }
328
329 /// If the capacity was limited because of the max_send_buffer_size,
330 /// then consider waking the send task again...
331 pub fn notify_capacity(&mut self) {
332 self.send_capacity_inc = true;
333 tracing::trace!(" notifying task");
334 self.notify_send();
335 }
336
337 /// Returns `Err` when the decrement cannot be completed due to overflow.
338 pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
339 match self.content_length {
340 ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
341 Some(val) => *rem = val,
342 None => return Err(()),
343 },
344 ContentLength::Head => {
345 if len != 0 {
346 return Err(());
347 }
348 }
349 _ => {}
350 }
351
352 Ok(())
353 }
354
355 pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
356 match self.content_length {
357 ContentLength::Remaining(0) => Ok(()),
358 ContentLength::Remaining(_) => Err(()),
359 _ => Ok(()),
360 }
361 }
362
363 pub fn notify_send(&mut self) {
364 if let Some(task) = self.send_task.take() {
365 task.wake();
366 }
367 }
368
369 pub fn wait_send(&mut self, cx: &Context) {
370 self.send_task = Some(cx.waker().clone());
371 }
372
373 pub fn notify_recv(&mut self) {
374 if let Some(task) = self.recv_task.take() {
375 task.wake();
376 }
377 }
378
379 pub(super) fn notify_push(&mut self) {
380 if let Some(task) = self.push_task.take() {
381 task.wake();
382 }
383 }
384
385 /// Set the stream's state to `Closed` with the given reason and initiator.
386 /// Notify the send and receive tasks, if they exist.
387 pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) {
388 self.state.set_reset(self.id, reason, initiator);
389 self.notify_push();
390 self.notify_recv();
391 }
392}
393
394impl fmt::Debug for Stream {
395 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
396 f.debug_struct("Stream")
397 .field("id", &self.id)
398 .field("state", &self.state)
399 .field("is_counted", &self.is_counted)
400 .field("ref_count", &self.ref_count)
401 .field("next_pending_send", &self.next_pending_send)
402 .field("is_pending_send", &self.is_pending_send)
403 .field("send_flow", &self.send_flow)
404 .field("requested_send_capacity", &self.requested_send_capacity)
405 .field("buffered_send_data", &self.buffered_send_data)
406 .field("send_task", &self.send_task.as_ref().map(|_| ()))
407 .field("pending_send", &self.pending_send)
408 .field(
409 "next_pending_send_capacity",
410 &self.next_pending_send_capacity,
411 )
412 .field("is_pending_send_capacity", &self.is_pending_send_capacity)
413 .field("send_capacity_inc", &self.send_capacity_inc)
414 .field("next_open", &self.next_open)
415 .field("is_pending_open", &self.is_pending_open)
416 .field("is_pending_push", &self.is_pending_push)
417 .field("next_pending_accept", &self.next_pending_accept)
418 .field("is_pending_accept", &self.is_pending_accept)
419 .field("recv_flow", &self.recv_flow)
420 .field("in_flight_recv_data", &self.in_flight_recv_data)
421 .field("next_window_update", &self.next_window_update)
422 .field("is_pending_window_update", &self.is_pending_window_update)
423 .field("reset_at", &self.reset_at)
424 .field("next_reset_expire", &self.next_reset_expire)
425 .field("pending_recv", &self.pending_recv)
426 .field("is_recv", &self.is_recv)
427 .field("recv_task", &self.recv_task.as_ref().map(|_| ()))
428 .field("push_task", &self.push_task.as_ref().map(|_| ()))
429 .field("pending_push_promises", &self.pending_push_promises)
430 .field("content_length", &self.content_length)
431 .finish()
432 }
433}
434
435impl store::Next for NextAccept {
436 fn next(stream: &Stream) -> Option<store::Key> {
437 stream.next_pending_accept
438 }
439
440 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
441 stream.next_pending_accept = key;
442 }
443
444 fn take_next(stream: &mut Stream) -> Option<store::Key> {
445 stream.next_pending_accept.take()
446 }
447
448 fn is_queued(stream: &Stream) -> bool {
449 stream.is_pending_accept
450 }
451
452 fn set_queued(stream: &mut Stream, val: bool) {
453 stream.is_pending_accept = val;
454 }
455}
456
457impl store::Next for NextSend {
458 fn next(stream: &Stream) -> Option<store::Key> {
459 stream.next_pending_send
460 }
461
462 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
463 stream.next_pending_send = key;
464 }
465
466 fn take_next(stream: &mut Stream) -> Option<store::Key> {
467 stream.next_pending_send.take()
468 }
469
470 fn is_queued(stream: &Stream) -> bool {
471 stream.is_pending_send
472 }
473
474 fn set_queued(stream: &mut Stream, val: bool) {
475 if val {
476 // ensure that stream is not queued for being opened
477 // if it's being put into queue for sending data
478 debug_assert!(!stream.is_pending_open);
479 }
480 stream.is_pending_send = val;
481 }
482}
483
484impl store::Next for NextSendCapacity {
485 fn next(stream: &Stream) -> Option<store::Key> {
486 stream.next_pending_send_capacity
487 }
488
489 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
490 stream.next_pending_send_capacity = key;
491 }
492
493 fn take_next(stream: &mut Stream) -> Option<store::Key> {
494 stream.next_pending_send_capacity.take()
495 }
496
497 fn is_queued(stream: &Stream) -> bool {
498 stream.is_pending_send_capacity
499 }
500
501 fn set_queued(stream: &mut Stream, val: bool) {
502 stream.is_pending_send_capacity = val;
503 }
504}
505
506impl store::Next for NextWindowUpdate {
507 fn next(stream: &Stream) -> Option<store::Key> {
508 stream.next_window_update
509 }
510
511 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
512 stream.next_window_update = key;
513 }
514
515 fn take_next(stream: &mut Stream) -> Option<store::Key> {
516 stream.next_window_update.take()
517 }
518
519 fn is_queued(stream: &Stream) -> bool {
520 stream.is_pending_window_update
521 }
522
523 fn set_queued(stream: &mut Stream, val: bool) {
524 stream.is_pending_window_update = val;
525 }
526}
527
528impl store::Next for NextOpen {
529 fn next(stream: &Stream) -> Option<store::Key> {
530 stream.next_open
531 }
532
533 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
534 stream.next_open = key;
535 }
536
537 fn take_next(stream: &mut Stream) -> Option<store::Key> {
538 stream.next_open.take()
539 }
540
541 fn is_queued(stream: &Stream) -> bool {
542 stream.is_pending_open
543 }
544
545 fn set_queued(stream: &mut Stream, val: bool) {
546 if val {
547 // ensure that stream is not queued for being sent
548 // if it's being put into queue for opening the stream
549 debug_assert!(!stream.is_pending_send);
550 }
551 stream.is_pending_open = val;
552 }
553}
554
555impl store::Next for NextResetExpire {
556 fn next(stream: &Stream) -> Option<store::Key> {
557 stream.next_reset_expire
558 }
559
560 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
561 stream.next_reset_expire = key;
562 }
563
564 fn take_next(stream: &mut Stream) -> Option<store::Key> {
565 stream.next_reset_expire.take()
566 }
567
568 fn is_queued(stream: &Stream) -> bool {
569 stream.reset_at.is_some()
570 }
571
572 fn set_queued(stream: &mut Stream, val: bool) {
573 if val {
574 stream.reset_at = Some(Instant::now());
575 } else {
576 stream.reset_at = None;
577 }
578 }
579}
580
581// ===== impl ContentLength =====
582
583impl ContentLength {
584 pub fn is_head(&self) -> bool {
585 matches!(*self, Self::Head)
586 }
587}
588