1 | use crate::Reason; |
2 | |
3 | use super::*; |
4 | |
5 | use std::fmt; |
6 | use std::task::{Context, Waker}; |
7 | use std::time::Instant; |
8 | |
9 | /// Tracks Stream related state |
10 | /// |
11 | /// # Reference counting |
12 | /// |
13 | /// There can be a number of outstanding handles to a single Stream. These are |
14 | /// tracked using reference counting. The `ref_count` field represents the |
15 | /// number of outstanding userspace handles that can reach this stream. |
16 | /// |
17 | /// It's important to note that when the stream is placed in an internal queue |
18 | /// (such as an accept queue), this is **not** tracked by a reference count. |
19 | /// Thus, `ref_count` can be zero and the stream still has to be kept around. |
20 | pub(super) struct Stream { |
21 | /// The h2 stream identifier |
22 | pub id: StreamId, |
23 | |
24 | /// Current state of the stream |
25 | pub state: State, |
26 | |
27 | /// Set to `true` when the stream is counted against the connection's max |
28 | /// concurrent streams. |
29 | pub is_counted: bool, |
30 | |
31 | /// Number of outstanding handles pointing to this stream |
32 | pub ref_count: usize, |
33 | |
34 | // ===== Fields related to sending ===== |
35 | /// Next node in the accept linked list |
36 | pub next_pending_send: Option<store::Key>, |
37 | |
38 | /// Set to true when the stream is pending accept |
39 | pub is_pending_send: bool, |
40 | |
41 | /// Send data flow control |
42 | pub send_flow: FlowControl, |
43 | |
44 | /// Amount of send capacity that has been requested, but not yet allocated. |
45 | pub requested_send_capacity: WindowSize, |
46 | |
47 | /// Amount of data buffered at the prioritization layer. |
48 | /// TODO: Technically this could be greater than the window size... |
49 | pub buffered_send_data: usize, |
50 | |
51 | /// Task tracking additional send capacity (i.e. window updates). |
52 | send_task: Option<Waker>, |
53 | |
54 | /// Frames pending for this stream being sent to the socket |
55 | pub pending_send: buffer::Deque, |
56 | |
57 | /// Next node in the linked list of streams waiting for additional |
58 | /// connection level capacity. |
59 | pub next_pending_send_capacity: Option<store::Key>, |
60 | |
61 | /// True if the stream is waiting for outbound connection capacity |
62 | pub is_pending_send_capacity: bool, |
63 | |
64 | /// Set to true when the send capacity has been incremented |
65 | pub send_capacity_inc: bool, |
66 | |
67 | /// Next node in the open linked list |
68 | pub next_open: Option<store::Key>, |
69 | |
70 | /// Set to true when the stream is pending to be opened |
71 | pub is_pending_open: bool, |
72 | |
73 | /// Set to true when a push is pending for this stream |
74 | pub is_pending_push: bool, |
75 | |
76 | // ===== Fields related to receiving ===== |
77 | /// Next node in the accept linked list |
78 | pub next_pending_accept: Option<store::Key>, |
79 | |
80 | /// Set to true when the stream is pending accept |
81 | pub is_pending_accept: bool, |
82 | |
83 | /// Receive data flow control |
84 | pub recv_flow: FlowControl, |
85 | |
86 | pub in_flight_recv_data: WindowSize, |
87 | |
88 | /// Next node in the linked list of streams waiting to send window updates. |
89 | pub next_window_update: Option<store::Key>, |
90 | |
91 | /// True if the stream is waiting to send a window update |
92 | pub is_pending_window_update: bool, |
93 | |
94 | /// The time when this stream may have been locally reset. |
95 | pub reset_at: Option<Instant>, |
96 | |
97 | /// Next node in list of reset streams that should expire eventually |
98 | pub next_reset_expire: Option<store::Key>, |
99 | |
100 | /// Frames pending for this stream to read |
101 | pub pending_recv: buffer::Deque, |
102 | |
103 | /// When the RecvStream drop occurs, no data should be received. |
104 | pub is_recv: bool, |
105 | |
106 | /// Task tracking receiving frames |
107 | pub recv_task: Option<Waker>, |
108 | |
109 | /// Task tracking pushed promises. |
110 | pub push_task: Option<Waker>, |
111 | |
112 | /// The stream's pending push promises |
113 | pub pending_push_promises: store::Queue<NextAccept>, |
114 | |
115 | /// Validate content-length headers |
116 | pub content_length: ContentLength, |
117 | } |
118 | |
119 | /// State related to validating a stream's content-length |
120 | #[derive (Debug)] |
121 | pub enum ContentLength { |
122 | Omitted, |
123 | Head, |
124 | Remaining(u64), |
125 | } |
126 | |
127 | #[derive (Debug)] |
128 | pub(super) struct NextAccept; |
129 | |
130 | #[derive (Debug)] |
131 | pub(super) struct NextSend; |
132 | |
133 | #[derive (Debug)] |
134 | pub(super) struct NextSendCapacity; |
135 | |
136 | #[derive (Debug)] |
137 | pub(super) struct NextWindowUpdate; |
138 | |
139 | #[derive (Debug)] |
140 | pub(super) struct NextOpen; |
141 | |
142 | #[derive (Debug)] |
143 | pub(super) struct NextResetExpire; |
144 | |
145 | impl Stream { |
146 | pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { |
147 | let mut send_flow = FlowControl::new(); |
148 | let mut recv_flow = FlowControl::new(); |
149 | |
150 | recv_flow |
151 | .inc_window(init_recv_window) |
152 | .expect("invalid initial receive window" ); |
153 | // TODO: proper error handling? |
154 | let _res = recv_flow.assign_capacity(init_recv_window); |
155 | debug_assert!(_res.is_ok()); |
156 | |
157 | send_flow |
158 | .inc_window(init_send_window) |
159 | .expect("invalid initial send window size" ); |
160 | |
161 | Stream { |
162 | id, |
163 | state: State::default(), |
164 | ref_count: 0, |
165 | is_counted: false, |
166 | |
167 | // ===== Fields related to sending ===== |
168 | next_pending_send: None, |
169 | is_pending_send: false, |
170 | send_flow, |
171 | requested_send_capacity: 0, |
172 | buffered_send_data: 0, |
173 | send_task: None, |
174 | pending_send: buffer::Deque::new(), |
175 | is_pending_send_capacity: false, |
176 | next_pending_send_capacity: None, |
177 | send_capacity_inc: false, |
178 | is_pending_open: false, |
179 | next_open: None, |
180 | is_pending_push: false, |
181 | |
182 | // ===== Fields related to receiving ===== |
183 | next_pending_accept: None, |
184 | is_pending_accept: false, |
185 | recv_flow, |
186 | in_flight_recv_data: 0, |
187 | next_window_update: None, |
188 | is_pending_window_update: false, |
189 | reset_at: None, |
190 | next_reset_expire: None, |
191 | pending_recv: buffer::Deque::new(), |
192 | is_recv: true, |
193 | recv_task: None, |
194 | push_task: None, |
195 | pending_push_promises: store::Queue::new(), |
196 | content_length: ContentLength::Omitted, |
197 | } |
198 | } |
199 | |
200 | /// Increment the stream's ref count |
201 | pub fn ref_inc(&mut self) { |
202 | assert!(self.ref_count < usize::MAX); |
203 | self.ref_count += 1; |
204 | } |
205 | |
206 | /// Decrements the stream's ref count |
207 | pub fn ref_dec(&mut self) { |
208 | assert!(self.ref_count > 0); |
209 | self.ref_count -= 1; |
210 | } |
211 | |
212 | /// Returns true if stream is currently being held for some time because of |
213 | /// a local reset. |
214 | pub fn is_pending_reset_expiration(&self) -> bool { |
215 | self.reset_at.is_some() |
216 | } |
217 | |
218 | /// Returns true if frames for this stream are ready to be sent over the wire |
219 | pub fn is_send_ready(&self) -> bool { |
220 | // Why do we check pending_open? |
221 | // |
222 | // We allow users to call send_request() which schedules a stream to be pending_open |
223 | // if there is no room according to the concurrency limit (max_send_streams), and we |
224 | // also allow data to be buffered for send with send_data() if there is no capacity for |
225 | // the stream to send the data, which attempts to place the stream in pending_send. |
226 | // If the stream is not open, we don't want the stream to be scheduled for |
227 | // execution (pending_send). Note that if the stream is in pending_open, it will be |
228 | // pushed to pending_send when there is room for an open stream. |
229 | // |
230 | // In pending_push we track whether a PushPromise still needs to be sent |
231 | // from a different stream before we can start sending frames on this one. |
232 | // This is different from the "open" check because reserved streams don't count |
233 | // toward the concurrency limit. |
234 | // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 |
235 | !self.is_pending_open && !self.is_pending_push |
236 | } |
237 | |
238 | /// Returns true if the stream is closed |
239 | pub fn is_closed(&self) -> bool { |
240 | // The state has fully transitioned to closed. |
241 | self.state.is_closed() && |
242 | // Because outbound frames transition the stream state before being |
243 | // buffered, we have to ensure that all frames have been flushed. |
244 | self.pending_send.is_empty() && |
245 | // Sometimes large data frames are sent out in chunks. After a chunk |
246 | // of the frame is sent, the remainder is pushed back onto the send |
247 | // queue to be rescheduled. |
248 | // |
249 | // Checking for additional buffered data lets us catch this case. |
250 | self.buffered_send_data == 0 |
251 | } |
252 | |
253 | /// Returns true if the stream is no longer in use |
254 | pub fn is_released(&self) -> bool { |
255 | // The stream is closed and fully flushed |
256 | self.is_closed() && |
257 | // There are no more outstanding references to the stream |
258 | self.ref_count == 0 && |
259 | // The stream is not in any queue |
260 | !self.is_pending_send && !self.is_pending_send_capacity && |
261 | !self.is_pending_accept && !self.is_pending_window_update && |
262 | !self.is_pending_open && self.reset_at.is_none() |
263 | } |
264 | |
265 | /// Returns true when the consumer of the stream has dropped all handles |
266 | /// (indicating no further interest in the stream) and the stream state is |
267 | /// not actually closed. |
268 | /// |
269 | /// In this case, a reset should be sent. |
270 | pub fn is_canceled_interest(&self) -> bool { |
271 | self.ref_count == 0 && !self.state.is_closed() |
272 | } |
273 | |
274 | /// Current available stream send capacity |
275 | pub fn capacity(&self, max_buffer_size: usize) -> WindowSize { |
276 | let available = self.send_flow.available().as_size() as usize; |
277 | let buffered = self.buffered_send_data; |
278 | |
279 | available.min(max_buffer_size).saturating_sub(buffered) as WindowSize |
280 | } |
281 | |
282 | pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { |
283 | let prev_capacity = self.capacity(max_buffer_size); |
284 | debug_assert!(capacity > 0); |
285 | // TODO: proper error handling |
286 | let _res = self.send_flow.assign_capacity(capacity); |
287 | debug_assert!(_res.is_ok()); |
288 | |
289 | tracing::trace!( |
290 | " assigned capacity to stream; available= {}; buffered= {}; id= {:?}; max_buffer_size= {} prev= {}" , |
291 | self.send_flow.available(), |
292 | self.buffered_send_data, |
293 | self.id, |
294 | max_buffer_size, |
295 | prev_capacity, |
296 | ); |
297 | |
298 | if prev_capacity < self.capacity(max_buffer_size) { |
299 | self.notify_capacity(); |
300 | } |
301 | } |
302 | |
303 | pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { |
304 | let prev_capacity = self.capacity(max_buffer_size); |
305 | |
306 | // TODO: proper error handling |
307 | let _res = self.send_flow.send_data(len); |
308 | debug_assert!(_res.is_ok()); |
309 | |
310 | // Decrement the stream's buffered data counter |
311 | debug_assert!(self.buffered_send_data >= len as usize); |
312 | self.buffered_send_data -= len as usize; |
313 | self.requested_send_capacity -= len; |
314 | |
315 | tracing::trace!( |
316 | " sent stream data; available= {}; buffered= {}; id= {:?}; max_buffer_size= {} prev= {}" , |
317 | self.send_flow.available(), |
318 | self.buffered_send_data, |
319 | self.id, |
320 | max_buffer_size, |
321 | prev_capacity, |
322 | ); |
323 | |
324 | if prev_capacity < self.capacity(max_buffer_size) { |
325 | self.notify_capacity(); |
326 | } |
327 | } |
328 | |
329 | /// If the capacity was limited because of the max_send_buffer_size, |
330 | /// then consider waking the send task again... |
331 | pub fn notify_capacity(&mut self) { |
332 | self.send_capacity_inc = true; |
333 | tracing::trace!(" notifying task" ); |
334 | self.notify_send(); |
335 | } |
336 | |
337 | /// Returns `Err` when the decrement cannot be completed due to overflow. |
338 | pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { |
339 | match self.content_length { |
340 | ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { |
341 | Some(val) => *rem = val, |
342 | None => return Err(()), |
343 | }, |
344 | ContentLength::Head => { |
345 | if len != 0 { |
346 | return Err(()); |
347 | } |
348 | } |
349 | _ => {} |
350 | } |
351 | |
352 | Ok(()) |
353 | } |
354 | |
355 | pub fn ensure_content_length_zero(&self) -> Result<(), ()> { |
356 | match self.content_length { |
357 | ContentLength::Remaining(0) => Ok(()), |
358 | ContentLength::Remaining(_) => Err(()), |
359 | _ => Ok(()), |
360 | } |
361 | } |
362 | |
363 | pub fn notify_send(&mut self) { |
364 | if let Some(task) = self.send_task.take() { |
365 | task.wake(); |
366 | } |
367 | } |
368 | |
369 | pub fn wait_send(&mut self, cx: &Context) { |
370 | self.send_task = Some(cx.waker().clone()); |
371 | } |
372 | |
373 | pub fn notify_recv(&mut self) { |
374 | if let Some(task) = self.recv_task.take() { |
375 | task.wake(); |
376 | } |
377 | } |
378 | |
379 | pub(super) fn notify_push(&mut self) { |
380 | if let Some(task) = self.push_task.take() { |
381 | task.wake(); |
382 | } |
383 | } |
384 | |
385 | /// Set the stream's state to `Closed` with the given reason and initiator. |
386 | /// Notify the send and receive tasks, if they exist. |
387 | pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) { |
388 | self.state.set_reset(self.id, reason, initiator); |
389 | self.notify_push(); |
390 | self.notify_recv(); |
391 | } |
392 | } |
393 | |
394 | impl fmt::Debug for Stream { |
395 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
396 | f.debug_struct("Stream" ) |
397 | .field("id" , &self.id) |
398 | .field("state" , &self.state) |
399 | .field("is_counted" , &self.is_counted) |
400 | .field("ref_count" , &self.ref_count) |
401 | .field("next_pending_send" , &self.next_pending_send) |
402 | .field("is_pending_send" , &self.is_pending_send) |
403 | .field("send_flow" , &self.send_flow) |
404 | .field("requested_send_capacity" , &self.requested_send_capacity) |
405 | .field("buffered_send_data" , &self.buffered_send_data) |
406 | .field("send_task" , &self.send_task.as_ref().map(|_| ())) |
407 | .field("pending_send" , &self.pending_send) |
408 | .field( |
409 | "next_pending_send_capacity" , |
410 | &self.next_pending_send_capacity, |
411 | ) |
412 | .field("is_pending_send_capacity" , &self.is_pending_send_capacity) |
413 | .field("send_capacity_inc" , &self.send_capacity_inc) |
414 | .field("next_open" , &self.next_open) |
415 | .field("is_pending_open" , &self.is_pending_open) |
416 | .field("is_pending_push" , &self.is_pending_push) |
417 | .field("next_pending_accept" , &self.next_pending_accept) |
418 | .field("is_pending_accept" , &self.is_pending_accept) |
419 | .field("recv_flow" , &self.recv_flow) |
420 | .field("in_flight_recv_data" , &self.in_flight_recv_data) |
421 | .field("next_window_update" , &self.next_window_update) |
422 | .field("is_pending_window_update" , &self.is_pending_window_update) |
423 | .field("reset_at" , &self.reset_at) |
424 | .field("next_reset_expire" , &self.next_reset_expire) |
425 | .field("pending_recv" , &self.pending_recv) |
426 | .field("is_recv" , &self.is_recv) |
427 | .field("recv_task" , &self.recv_task.as_ref().map(|_| ())) |
428 | .field("push_task" , &self.push_task.as_ref().map(|_| ())) |
429 | .field("pending_push_promises" , &self.pending_push_promises) |
430 | .field("content_length" , &self.content_length) |
431 | .finish() |
432 | } |
433 | } |
434 | |
435 | impl store::Next for NextAccept { |
436 | fn next(stream: &Stream) -> Option<store::Key> { |
437 | stream.next_pending_accept |
438 | } |
439 | |
440 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
441 | stream.next_pending_accept = key; |
442 | } |
443 | |
444 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
445 | stream.next_pending_accept.take() |
446 | } |
447 | |
448 | fn is_queued(stream: &Stream) -> bool { |
449 | stream.is_pending_accept |
450 | } |
451 | |
452 | fn set_queued(stream: &mut Stream, val: bool) { |
453 | stream.is_pending_accept = val; |
454 | } |
455 | } |
456 | |
457 | impl store::Next for NextSend { |
458 | fn next(stream: &Stream) -> Option<store::Key> { |
459 | stream.next_pending_send |
460 | } |
461 | |
462 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
463 | stream.next_pending_send = key; |
464 | } |
465 | |
466 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
467 | stream.next_pending_send.take() |
468 | } |
469 | |
470 | fn is_queued(stream: &Stream) -> bool { |
471 | stream.is_pending_send |
472 | } |
473 | |
474 | fn set_queued(stream: &mut Stream, val: bool) { |
475 | if val { |
476 | // ensure that stream is not queued for being opened |
477 | // if it's being put into queue for sending data |
478 | debug_assert!(!stream.is_pending_open); |
479 | } |
480 | stream.is_pending_send = val; |
481 | } |
482 | } |
483 | |
484 | impl store::Next for NextSendCapacity { |
485 | fn next(stream: &Stream) -> Option<store::Key> { |
486 | stream.next_pending_send_capacity |
487 | } |
488 | |
489 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
490 | stream.next_pending_send_capacity = key; |
491 | } |
492 | |
493 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
494 | stream.next_pending_send_capacity.take() |
495 | } |
496 | |
497 | fn is_queued(stream: &Stream) -> bool { |
498 | stream.is_pending_send_capacity |
499 | } |
500 | |
501 | fn set_queued(stream: &mut Stream, val: bool) { |
502 | stream.is_pending_send_capacity = val; |
503 | } |
504 | } |
505 | |
506 | impl store::Next for NextWindowUpdate { |
507 | fn next(stream: &Stream) -> Option<store::Key> { |
508 | stream.next_window_update |
509 | } |
510 | |
511 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
512 | stream.next_window_update = key; |
513 | } |
514 | |
515 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
516 | stream.next_window_update.take() |
517 | } |
518 | |
519 | fn is_queued(stream: &Stream) -> bool { |
520 | stream.is_pending_window_update |
521 | } |
522 | |
523 | fn set_queued(stream: &mut Stream, val: bool) { |
524 | stream.is_pending_window_update = val; |
525 | } |
526 | } |
527 | |
528 | impl store::Next for NextOpen { |
529 | fn next(stream: &Stream) -> Option<store::Key> { |
530 | stream.next_open |
531 | } |
532 | |
533 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
534 | stream.next_open = key; |
535 | } |
536 | |
537 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
538 | stream.next_open.take() |
539 | } |
540 | |
541 | fn is_queued(stream: &Stream) -> bool { |
542 | stream.is_pending_open |
543 | } |
544 | |
545 | fn set_queued(stream: &mut Stream, val: bool) { |
546 | if val { |
547 | // ensure that stream is not queued for being sent |
548 | // if it's being put into queue for opening the stream |
549 | debug_assert!(!stream.is_pending_send); |
550 | } |
551 | stream.is_pending_open = val; |
552 | } |
553 | } |
554 | |
555 | impl store::Next for NextResetExpire { |
556 | fn next(stream: &Stream) -> Option<store::Key> { |
557 | stream.next_reset_expire |
558 | } |
559 | |
560 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
561 | stream.next_reset_expire = key; |
562 | } |
563 | |
564 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
565 | stream.next_reset_expire.take() |
566 | } |
567 | |
568 | fn is_queued(stream: &Stream) -> bool { |
569 | stream.reset_at.is_some() |
570 | } |
571 | |
572 | fn set_queued(stream: &mut Stream, val: bool) { |
573 | if val { |
574 | stream.reset_at = Some(Instant::now()); |
575 | } else { |
576 | stream.reset_at = None; |
577 | } |
578 | } |
579 | } |
580 | |
581 | // ===== impl ContentLength ===== |
582 | |
583 | impl ContentLength { |
584 | pub fn is_head(&self) -> bool { |
585 | matches!(*self, Self::Head) |
586 | } |
587 | } |
588 | |