1use super::*;
2
3use std::task::{Context, Waker};
4use std::time::Instant;
5use std::usize;
6
7/// Tracks Stream related state
8///
9/// # Reference counting
10///
11/// There can be a number of outstanding handles to a single Stream. These are
12/// tracked using reference counting. The `ref_count` field represents the
13/// number of outstanding userspace handles that can reach this stream.
14///
15/// It's important to note that when the stream is placed in an internal queue
16/// (such as an accept queue), this is **not** tracked by a reference count.
17/// Thus, `ref_count` can be zero and the stream still has to be kept around.
18#[derive(Debug)]
19pub(super) struct Stream {
20 /// The h2 stream identifier
21 pub id: StreamId,
22
23 /// Current state of the stream
24 pub state: State,
25
26 /// Set to `true` when the stream is counted against the connection's max
27 /// concurrent streams.
28 pub is_counted: bool,
29
30 /// Number of outstanding handles pointing to this stream
31 pub ref_count: usize,
32
33 // ===== Fields related to sending =====
34 /// Next node in the accept linked list
35 pub next_pending_send: Option<store::Key>,
36
37 /// Set to true when the stream is pending accept
38 pub is_pending_send: bool,
39
40 /// Send data flow control
41 pub send_flow: FlowControl,
42
43 /// Amount of send capacity that has been requested, but not yet allocated.
44 pub requested_send_capacity: WindowSize,
45
46 /// Amount of data buffered at the prioritization layer.
47 /// TODO: Technically this could be greater than the window size...
48 pub buffered_send_data: usize,
49
50 /// Task tracking additional send capacity (i.e. window updates).
51 send_task: Option<Waker>,
52
53 /// Frames pending for this stream being sent to the socket
54 pub pending_send: buffer::Deque,
55
56 /// Next node in the linked list of streams waiting for additional
57 /// connection level capacity.
58 pub next_pending_send_capacity: Option<store::Key>,
59
60 /// True if the stream is waiting for outbound connection capacity
61 pub is_pending_send_capacity: bool,
62
63 /// Set to true when the send capacity has been incremented
64 pub send_capacity_inc: bool,
65
66 /// Next node in the open linked list
67 pub next_open: Option<store::Key>,
68
69 /// Set to true when the stream is pending to be opened
70 pub is_pending_open: bool,
71
72 /// Set to true when a push is pending for this stream
73 pub is_pending_push: bool,
74
75 // ===== Fields related to receiving =====
76 /// Next node in the accept linked list
77 pub next_pending_accept: Option<store::Key>,
78
79 /// Set to true when the stream is pending accept
80 pub is_pending_accept: bool,
81
82 /// Receive data flow control
83 pub recv_flow: FlowControl,
84
85 pub in_flight_recv_data: WindowSize,
86
87 /// Next node in the linked list of streams waiting to send window updates.
88 pub next_window_update: Option<store::Key>,
89
90 /// True if the stream is waiting to send a window update
91 pub is_pending_window_update: bool,
92
93 /// The time when this stream may have been locally reset.
94 pub reset_at: Option<Instant>,
95
96 /// Next node in list of reset streams that should expire eventually
97 pub next_reset_expire: Option<store::Key>,
98
99 /// Frames pending for this stream to read
100 pub pending_recv: buffer::Deque,
101
102 /// When the RecvStream drop occurs, no data should be received.
103 pub is_recv: bool,
104
105 /// Task tracking receiving frames
106 pub recv_task: Option<Waker>,
107
108 /// The stream's pending push promises
109 pub pending_push_promises: store::Queue<NextAccept>,
110
111 /// Validate content-length headers
112 pub content_length: ContentLength,
113}
114
115/// State related to validating a stream's content-length
116#[derive(Debug)]
117pub enum ContentLength {
118 Omitted,
119 Head,
120 Remaining(u64),
121}
122
123#[derive(Debug)]
124pub(super) struct NextAccept;
125
126#[derive(Debug)]
127pub(super) struct NextSend;
128
129#[derive(Debug)]
130pub(super) struct NextSendCapacity;
131
132#[derive(Debug)]
133pub(super) struct NextWindowUpdate;
134
135#[derive(Debug)]
136pub(super) struct NextOpen;
137
138#[derive(Debug)]
139pub(super) struct NextResetExpire;
140
141impl Stream {
142 pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream {
143 let mut send_flow = FlowControl::new();
144 let mut recv_flow = FlowControl::new();
145
146 recv_flow
147 .inc_window(init_recv_window)
148 .expect("invalid initial receive window");
149 // TODO: proper error handling?
150 let _res = recv_flow.assign_capacity(init_recv_window);
151 debug_assert!(_res.is_ok());
152
153 send_flow
154 .inc_window(init_send_window)
155 .expect("invalid initial send window size");
156
157 Stream {
158 id,
159 state: State::default(),
160 ref_count: 0,
161 is_counted: false,
162
163 // ===== Fields related to sending =====
164 next_pending_send: None,
165 is_pending_send: false,
166 send_flow,
167 requested_send_capacity: 0,
168 buffered_send_data: 0,
169 send_task: None,
170 pending_send: buffer::Deque::new(),
171 is_pending_send_capacity: false,
172 next_pending_send_capacity: None,
173 send_capacity_inc: false,
174 is_pending_open: false,
175 next_open: None,
176 is_pending_push: false,
177
178 // ===== Fields related to receiving =====
179 next_pending_accept: None,
180 is_pending_accept: false,
181 recv_flow,
182 in_flight_recv_data: 0,
183 next_window_update: None,
184 is_pending_window_update: false,
185 reset_at: None,
186 next_reset_expire: None,
187 pending_recv: buffer::Deque::new(),
188 is_recv: true,
189 recv_task: None,
190 pending_push_promises: store::Queue::new(),
191 content_length: ContentLength::Omitted,
192 }
193 }
194
195 /// Increment the stream's ref count
196 pub fn ref_inc(&mut self) {
197 assert!(self.ref_count < usize::MAX);
198 self.ref_count += 1;
199 }
200
201 /// Decrements the stream's ref count
202 pub fn ref_dec(&mut self) {
203 assert!(self.ref_count > 0);
204 self.ref_count -= 1;
205 }
206
207 /// Returns true if stream is currently being held for some time because of
208 /// a local reset.
209 pub fn is_pending_reset_expiration(&self) -> bool {
210 self.reset_at.is_some()
211 }
212
213 /// Returns true if frames for this stream are ready to be sent over the wire
214 pub fn is_send_ready(&self) -> bool {
215 // Why do we check pending_open?
216 //
217 // We allow users to call send_request() which schedules a stream to be pending_open
218 // if there is no room according to the concurrency limit (max_send_streams), and we
219 // also allow data to be buffered for send with send_data() if there is no capacity for
220 // the stream to send the data, which attempts to place the stream in pending_send.
221 // If the stream is not open, we don't want the stream to be scheduled for
222 // execution (pending_send). Note that if the stream is in pending_open, it will be
223 // pushed to pending_send when there is room for an open stream.
224 //
225 // In pending_push we track whether a PushPromise still needs to be sent
226 // from a different stream before we can start sending frames on this one.
227 // This is different from the "open" check because reserved streams don't count
228 // toward the concurrency limit.
229 // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
230 !self.is_pending_open && !self.is_pending_push
231 }
232
233 /// Returns true if the stream is closed
234 pub fn is_closed(&self) -> bool {
235 // The state has fully transitioned to closed.
236 self.state.is_closed() &&
237 // Because outbound frames transition the stream state before being
238 // buffered, we have to ensure that all frames have been flushed.
239 self.pending_send.is_empty() &&
240 // Sometimes large data frames are sent out in chunks. After a chunk
241 // of the frame is sent, the remainder is pushed back onto the send
242 // queue to be rescheduled.
243 //
244 // Checking for additional buffered data lets us catch this case.
245 self.buffered_send_data == 0
246 }
247
248 /// Returns true if the stream is no longer in use
249 pub fn is_released(&self) -> bool {
250 // The stream is closed and fully flushed
251 self.is_closed() &&
252 // There are no more outstanding references to the stream
253 self.ref_count == 0 &&
254 // The stream is not in any queue
255 !self.is_pending_send && !self.is_pending_send_capacity &&
256 !self.is_pending_accept && !self.is_pending_window_update &&
257 !self.is_pending_open && self.reset_at.is_none()
258 }
259
260 /// Returns true when the consumer of the stream has dropped all handles
261 /// (indicating no further interest in the stream) and the stream state is
262 /// not actually closed.
263 ///
264 /// In this case, a reset should be sent.
265 pub fn is_canceled_interest(&self) -> bool {
266 self.ref_count == 0 && !self.state.is_closed()
267 }
268
269 /// Current available stream send capacity
270 pub fn capacity(&self, max_buffer_size: usize) -> WindowSize {
271 let available = self.send_flow.available().as_size() as usize;
272 let buffered = self.buffered_send_data;
273
274 available.min(max_buffer_size).saturating_sub(buffered) as WindowSize
275 }
276
277 pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) {
278 let prev_capacity = self.capacity(max_buffer_size);
279 debug_assert!(capacity > 0);
280 // TODO: proper error handling
281 let _res = self.send_flow.assign_capacity(capacity);
282 debug_assert!(_res.is_ok());
283
284 tracing::trace!(
285 " assigned capacity to stream; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
286 self.send_flow.available(),
287 self.buffered_send_data,
288 self.id,
289 max_buffer_size,
290 prev_capacity,
291 );
292
293 if prev_capacity < self.capacity(max_buffer_size) {
294 self.notify_capacity();
295 }
296 }
297
298 pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) {
299 let prev_capacity = self.capacity(max_buffer_size);
300
301 // TODO: proper error handling
302 let _res = self.send_flow.send_data(len);
303 debug_assert!(_res.is_ok());
304
305 // Decrement the stream's buffered data counter
306 debug_assert!(self.buffered_send_data >= len as usize);
307 self.buffered_send_data -= len as usize;
308 self.requested_send_capacity -= len;
309
310 tracing::trace!(
311 " sent stream data; available={}; buffered={}; id={:?}; max_buffer_size={} prev={}",
312 self.send_flow.available(),
313 self.buffered_send_data,
314 self.id,
315 max_buffer_size,
316 prev_capacity,
317 );
318
319 if prev_capacity < self.capacity(max_buffer_size) {
320 self.notify_capacity();
321 }
322 }
323
324 /// If the capacity was limited because of the max_send_buffer_size,
325 /// then consider waking the send task again...
326 pub fn notify_capacity(&mut self) {
327 self.send_capacity_inc = true;
328 tracing::trace!(" notifying task");
329 self.notify_send();
330 }
331
332 /// Returns `Err` when the decrement cannot be completed due to overflow.
333 pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> {
334 match self.content_length {
335 ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) {
336 Some(val) => *rem = val,
337 None => return Err(()),
338 },
339 ContentLength::Head => {
340 if len != 0 {
341 return Err(());
342 }
343 }
344 _ => {}
345 }
346
347 Ok(())
348 }
349
350 pub fn ensure_content_length_zero(&self) -> Result<(), ()> {
351 match self.content_length {
352 ContentLength::Remaining(0) => Ok(()),
353 ContentLength::Remaining(_) => Err(()),
354 _ => Ok(()),
355 }
356 }
357
358 pub fn notify_send(&mut self) {
359 if let Some(task) = self.send_task.take() {
360 task.wake();
361 }
362 }
363
364 pub fn wait_send(&mut self, cx: &Context) {
365 self.send_task = Some(cx.waker().clone());
366 }
367
368 pub fn notify_recv(&mut self) {
369 if let Some(task) = self.recv_task.take() {
370 task.wake();
371 }
372 }
373}
374
375impl store::Next for NextAccept {
376 fn next(stream: &Stream) -> Option<store::Key> {
377 stream.next_pending_accept
378 }
379
380 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
381 stream.next_pending_accept = key;
382 }
383
384 fn take_next(stream: &mut Stream) -> Option<store::Key> {
385 stream.next_pending_accept.take()
386 }
387
388 fn is_queued(stream: &Stream) -> bool {
389 stream.is_pending_accept
390 }
391
392 fn set_queued(stream: &mut Stream, val: bool) {
393 stream.is_pending_accept = val;
394 }
395}
396
397impl store::Next for NextSend {
398 fn next(stream: &Stream) -> Option<store::Key> {
399 stream.next_pending_send
400 }
401
402 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
403 stream.next_pending_send = key;
404 }
405
406 fn take_next(stream: &mut Stream) -> Option<store::Key> {
407 stream.next_pending_send.take()
408 }
409
410 fn is_queued(stream: &Stream) -> bool {
411 stream.is_pending_send
412 }
413
414 fn set_queued(stream: &mut Stream, val: bool) {
415 if val {
416 // ensure that stream is not queued for being opened
417 // if it's being put into queue for sending data
418 debug_assert!(!stream.is_pending_open);
419 }
420 stream.is_pending_send = val;
421 }
422}
423
424impl store::Next for NextSendCapacity {
425 fn next(stream: &Stream) -> Option<store::Key> {
426 stream.next_pending_send_capacity
427 }
428
429 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
430 stream.next_pending_send_capacity = key;
431 }
432
433 fn take_next(stream: &mut Stream) -> Option<store::Key> {
434 stream.next_pending_send_capacity.take()
435 }
436
437 fn is_queued(stream: &Stream) -> bool {
438 stream.is_pending_send_capacity
439 }
440
441 fn set_queued(stream: &mut Stream, val: bool) {
442 stream.is_pending_send_capacity = val;
443 }
444}
445
446impl store::Next for NextWindowUpdate {
447 fn next(stream: &Stream) -> Option<store::Key> {
448 stream.next_window_update
449 }
450
451 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
452 stream.next_window_update = key;
453 }
454
455 fn take_next(stream: &mut Stream) -> Option<store::Key> {
456 stream.next_window_update.take()
457 }
458
459 fn is_queued(stream: &Stream) -> bool {
460 stream.is_pending_window_update
461 }
462
463 fn set_queued(stream: &mut Stream, val: bool) {
464 stream.is_pending_window_update = val;
465 }
466}
467
468impl store::Next for NextOpen {
469 fn next(stream: &Stream) -> Option<store::Key> {
470 stream.next_open
471 }
472
473 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
474 stream.next_open = key;
475 }
476
477 fn take_next(stream: &mut Stream) -> Option<store::Key> {
478 stream.next_open.take()
479 }
480
481 fn is_queued(stream: &Stream) -> bool {
482 stream.is_pending_open
483 }
484
485 fn set_queued(stream: &mut Stream, val: bool) {
486 if val {
487 // ensure that stream is not queued for being sent
488 // if it's being put into queue for opening the stream
489 debug_assert!(!stream.is_pending_send);
490 }
491 stream.is_pending_open = val;
492 }
493}
494
495impl store::Next for NextResetExpire {
496 fn next(stream: &Stream) -> Option<store::Key> {
497 stream.next_reset_expire
498 }
499
500 fn set_next(stream: &mut Stream, key: Option<store::Key>) {
501 stream.next_reset_expire = key;
502 }
503
504 fn take_next(stream: &mut Stream) -> Option<store::Key> {
505 stream.next_reset_expire.take()
506 }
507
508 fn is_queued(stream: &Stream) -> bool {
509 stream.reset_at.is_some()
510 }
511
512 fn set_queued(stream: &mut Stream, val: bool) {
513 if val {
514 stream.reset_at = Some(Instant::now());
515 } else {
516 stream.reset_at = None;
517 }
518 }
519}
520
521// ===== impl ContentLength =====
522
523impl ContentLength {
524 pub fn is_head(&self) -> bool {
525 matches!(*self, Self::Head)
526 }
527}
528