1 | use super::*; |
2 | |
3 | use std::task::{Context, Waker}; |
4 | use std::time::Instant; |
5 | use std::usize; |
6 | |
/// Tracks Stream related state
///
/// # Reference counting
///
/// There can be a number of outstanding handles to a single Stream. These are
/// tracked using reference counting. The `ref_count` field represents the
/// number of outstanding userspace handles that can reach this stream.
///
/// It's important to note that when the stream is placed in an internal queue
/// (such as an accept queue), this is **not** tracked by a reference count.
/// Thus, `ref_count` can be zero and the stream still has to be kept around.
///
/// # Intrusive queue membership
///
/// Each `next_*` field is the link for one intrusive linked list of streams.
/// Except for the reset-expiration list, which uses `reset_at` as its
/// "queued" marker, each link is paired with an `is_pending_*` flag that
/// records whether the stream is currently on that list. The marker types
/// that implement `store::Next` in this file select which link/flag pair a
/// given queue operates on.
#[derive (Debug)]
pub(super) struct Stream {
    /// The h2 stream identifier
    pub id: StreamId,

    /// Current state of the stream
    pub state: State,

    /// Set to `true` when the stream is counted against the connection's max
    /// concurrent streams.
    pub is_counted: bool,

    /// Number of outstanding handles pointing to this stream
    pub ref_count: usize,

    // ===== Fields related to sending =====
    /// Next node in the pending-send linked list
    pub next_pending_send: Option<store::Key>,

    /// Set to true when the stream is queued in the pending-send list
    pub is_pending_send: bool,

    /// Send data flow control
    pub send_flow: FlowControl,

    /// Amount of send capacity that has been requested, but not yet allocated.
    pub requested_send_capacity: WindowSize,

    /// Amount of data buffered at the prioritization layer.
    /// TODO: Technically this could be greater than the window size...
    pub buffered_send_data: usize,

    /// Task tracking additional send capacity (i.e. window updates).
    ///
    /// Stored by `wait_send` and taken (then woken) by `notify_send`.
    send_task: Option<Waker>,

    /// Frames pending for this stream being sent to the socket
    pub pending_send: buffer::Deque,

    /// Next node in the linked list of streams waiting for additional
    /// connection level capacity.
    pub next_pending_send_capacity: Option<store::Key>,

    /// True if the stream is waiting for outbound connection capacity
    pub is_pending_send_capacity: bool,

    /// Set to true when the send capacity has been incremented
    /// (see `notify_capacity`).
    pub send_capacity_inc: bool,

    /// Next node in the open linked list
    pub next_open: Option<store::Key>,

    /// Set to true when the stream is pending to be opened
    pub is_pending_open: bool,

    /// Set to true when a push is pending for this stream
    pub is_pending_push: bool,

    // ===== Fields related to receiving =====
    /// Next node in the accept linked list
    pub next_pending_accept: Option<store::Key>,

    /// Set to true when the stream is pending accept
    pub is_pending_accept: bool,

    /// Receive data flow control
    pub recv_flow: FlowControl,

    /// Receive-side byte counter; starts at zero in `Stream::new`.
    ///
    /// NOTE(review): presumably the amount of received data that has not yet
    /// been released back to the peer's window -- confirm against the recv
    /// code that updates it.
    pub in_flight_recv_data: WindowSize,

    /// Next node in the linked list of streams waiting to send window updates.
    pub next_window_update: Option<store::Key>,

    /// True if the stream is waiting to send a window update
    pub is_pending_window_update: bool,

    /// The time when this stream may have been locally reset.
    ///
    /// Doubles as the "queued" marker of the reset-expiration list: it is
    /// `Some` exactly while the stream is on that list.
    pub reset_at: Option<Instant>,

    /// Next node in list of reset streams that should expire eventually
    pub next_reset_expire: Option<store::Key>,

    /// Frames pending for this stream to read
    pub pending_recv: buffer::Deque,

    /// When the RecvStream drop occurs, no data should be received.
    ///
    /// Starts out `true` in `Stream::new`.
    pub is_recv: bool,

    /// Task tracking receiving frames
    pub recv_task: Option<Waker>,

    /// The stream's pending push promises
    pub pending_push_promises: store::Queue<NextAccept>,

    /// Validate content-length headers
    pub content_length: ContentLength,
}
114 | |
/// State related to validating a stream's content-length
///
/// Updated through `Stream::dec_content_length` as body data is accounted
/// for and checked with `Stream::ensure_content_length_zero`.
#[derive (Debug)]
pub enum ContentLength {
    /// No length is being tracked; decrementing is a no-op and the
    /// "is zero" check always passes. This is the value a new `Stream`
    /// starts with.
    Omitted,
    /// No body bytes are allowed: decrementing by anything other than zero
    /// is an error.
    ///
    /// NOTE(review): presumably used for responses to HEAD requests --
    /// confirm against the code that sets this variant.
    Head,
    /// Number of body bytes still expected. Decremented as data is counted
    /// and required to be exactly zero when the final check runs.
    Remaining(u64),
}
122 | |
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_pending_accept` / `Stream::is_pending_accept`.
#[derive (Debug)]
pub(super) struct NextAccept;
125 | |
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_pending_send` / `Stream::is_pending_send`.
#[derive (Debug)]
pub(super) struct NextSend;
128 | |
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_pending_send_capacity` / `Stream::is_pending_send_capacity`.
#[derive (Debug)]
pub(super) struct NextSendCapacity;
131 | |
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_window_update` / `Stream::is_pending_window_update`.
#[derive (Debug)]
pub(super) struct NextWindowUpdate;
134 | |
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_open` / `Stream::is_pending_open`.
#[derive (Debug)]
pub(super) struct NextOpen;
137 | |
/// Marker type whose `store::Next` impl links streams through
/// `Stream::next_reset_expire`, using `Stream::reset_at` as the queued
/// marker instead of a dedicated boolean flag.
#[derive (Debug)]
pub(super) struct NextResetExpire;
140 | |
141 | impl Stream { |
142 | pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { |
143 | let mut send_flow = FlowControl::new(); |
144 | let mut recv_flow = FlowControl::new(); |
145 | |
146 | recv_flow |
147 | .inc_window(init_recv_window) |
148 | .expect("invalid initial receive window" ); |
149 | // TODO: proper error handling? |
150 | let _res = recv_flow.assign_capacity(init_recv_window); |
151 | debug_assert!(_res.is_ok()); |
152 | |
153 | send_flow |
154 | .inc_window(init_send_window) |
155 | .expect("invalid initial send window size" ); |
156 | |
157 | Stream { |
158 | id, |
159 | state: State::default(), |
160 | ref_count: 0, |
161 | is_counted: false, |
162 | |
163 | // ===== Fields related to sending ===== |
164 | next_pending_send: None, |
165 | is_pending_send: false, |
166 | send_flow, |
167 | requested_send_capacity: 0, |
168 | buffered_send_data: 0, |
169 | send_task: None, |
170 | pending_send: buffer::Deque::new(), |
171 | is_pending_send_capacity: false, |
172 | next_pending_send_capacity: None, |
173 | send_capacity_inc: false, |
174 | is_pending_open: false, |
175 | next_open: None, |
176 | is_pending_push: false, |
177 | |
178 | // ===== Fields related to receiving ===== |
179 | next_pending_accept: None, |
180 | is_pending_accept: false, |
181 | recv_flow, |
182 | in_flight_recv_data: 0, |
183 | next_window_update: None, |
184 | is_pending_window_update: false, |
185 | reset_at: None, |
186 | next_reset_expire: None, |
187 | pending_recv: buffer::Deque::new(), |
188 | is_recv: true, |
189 | recv_task: None, |
190 | pending_push_promises: store::Queue::new(), |
191 | content_length: ContentLength::Omitted, |
192 | } |
193 | } |
194 | |
195 | /// Increment the stream's ref count |
196 | pub fn ref_inc(&mut self) { |
197 | assert!(self.ref_count < usize::MAX); |
198 | self.ref_count += 1; |
199 | } |
200 | |
201 | /// Decrements the stream's ref count |
202 | pub fn ref_dec(&mut self) { |
203 | assert!(self.ref_count > 0); |
204 | self.ref_count -= 1; |
205 | } |
206 | |
207 | /// Returns true if stream is currently being held for some time because of |
208 | /// a local reset. |
209 | pub fn is_pending_reset_expiration(&self) -> bool { |
210 | self.reset_at.is_some() |
211 | } |
212 | |
213 | /// Returns true if frames for this stream are ready to be sent over the wire |
214 | pub fn is_send_ready(&self) -> bool { |
215 | // Why do we check pending_open? |
216 | // |
217 | // We allow users to call send_request() which schedules a stream to be pending_open |
218 | // if there is no room according to the concurrency limit (max_send_streams), and we |
219 | // also allow data to be buffered for send with send_data() if there is no capacity for |
220 | // the stream to send the data, which attempts to place the stream in pending_send. |
221 | // If the stream is not open, we don't want the stream to be scheduled for |
222 | // execution (pending_send). Note that if the stream is in pending_open, it will be |
223 | // pushed to pending_send when there is room for an open stream. |
224 | // |
225 | // In pending_push we track whether a PushPromise still needs to be sent |
226 | // from a different stream before we can start sending frames on this one. |
227 | // This is different from the "open" check because reserved streams don't count |
228 | // toward the concurrency limit. |
229 | // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 |
230 | !self.is_pending_open && !self.is_pending_push |
231 | } |
232 | |
233 | /// Returns true if the stream is closed |
234 | pub fn is_closed(&self) -> bool { |
235 | // The state has fully transitioned to closed. |
236 | self.state.is_closed() && |
237 | // Because outbound frames transition the stream state before being |
238 | // buffered, we have to ensure that all frames have been flushed. |
239 | self.pending_send.is_empty() && |
240 | // Sometimes large data frames are sent out in chunks. After a chunk |
241 | // of the frame is sent, the remainder is pushed back onto the send |
242 | // queue to be rescheduled. |
243 | // |
244 | // Checking for additional buffered data lets us catch this case. |
245 | self.buffered_send_data == 0 |
246 | } |
247 | |
248 | /// Returns true if the stream is no longer in use |
249 | pub fn is_released(&self) -> bool { |
250 | // The stream is closed and fully flushed |
251 | self.is_closed() && |
252 | // There are no more outstanding references to the stream |
253 | self.ref_count == 0 && |
254 | // The stream is not in any queue |
255 | !self.is_pending_send && !self.is_pending_send_capacity && |
256 | !self.is_pending_accept && !self.is_pending_window_update && |
257 | !self.is_pending_open && self.reset_at.is_none() |
258 | } |
259 | |
260 | /// Returns true when the consumer of the stream has dropped all handles |
261 | /// (indicating no further interest in the stream) and the stream state is |
262 | /// not actually closed. |
263 | /// |
264 | /// In this case, a reset should be sent. |
265 | pub fn is_canceled_interest(&self) -> bool { |
266 | self.ref_count == 0 && !self.state.is_closed() |
267 | } |
268 | |
269 | /// Current available stream send capacity |
270 | pub fn capacity(&self, max_buffer_size: usize) -> WindowSize { |
271 | let available = self.send_flow.available().as_size() as usize; |
272 | let buffered = self.buffered_send_data; |
273 | |
274 | available.min(max_buffer_size).saturating_sub(buffered) as WindowSize |
275 | } |
276 | |
277 | pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { |
278 | let prev_capacity = self.capacity(max_buffer_size); |
279 | debug_assert!(capacity > 0); |
280 | // TODO: proper error handling |
281 | let _res = self.send_flow.assign_capacity(capacity); |
282 | debug_assert!(_res.is_ok()); |
283 | |
284 | tracing::trace!( |
285 | " assigned capacity to stream; available= {}; buffered= {}; id= {:?}; max_buffer_size= {} prev= {}" , |
286 | self.send_flow.available(), |
287 | self.buffered_send_data, |
288 | self.id, |
289 | max_buffer_size, |
290 | prev_capacity, |
291 | ); |
292 | |
293 | if prev_capacity < self.capacity(max_buffer_size) { |
294 | self.notify_capacity(); |
295 | } |
296 | } |
297 | |
298 | pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { |
299 | let prev_capacity = self.capacity(max_buffer_size); |
300 | |
301 | // TODO: proper error handling |
302 | let _res = self.send_flow.send_data(len); |
303 | debug_assert!(_res.is_ok()); |
304 | |
305 | // Decrement the stream's buffered data counter |
306 | debug_assert!(self.buffered_send_data >= len as usize); |
307 | self.buffered_send_data -= len as usize; |
308 | self.requested_send_capacity -= len; |
309 | |
310 | tracing::trace!( |
311 | " sent stream data; available= {}; buffered= {}; id= {:?}; max_buffer_size= {} prev= {}" , |
312 | self.send_flow.available(), |
313 | self.buffered_send_data, |
314 | self.id, |
315 | max_buffer_size, |
316 | prev_capacity, |
317 | ); |
318 | |
319 | if prev_capacity < self.capacity(max_buffer_size) { |
320 | self.notify_capacity(); |
321 | } |
322 | } |
323 | |
324 | /// If the capacity was limited because of the max_send_buffer_size, |
325 | /// then consider waking the send task again... |
326 | pub fn notify_capacity(&mut self) { |
327 | self.send_capacity_inc = true; |
328 | tracing::trace!(" notifying task" ); |
329 | self.notify_send(); |
330 | } |
331 | |
332 | /// Returns `Err` when the decrement cannot be completed due to overflow. |
333 | pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { |
334 | match self.content_length { |
335 | ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { |
336 | Some(val) => *rem = val, |
337 | None => return Err(()), |
338 | }, |
339 | ContentLength::Head => { |
340 | if len != 0 { |
341 | return Err(()); |
342 | } |
343 | } |
344 | _ => {} |
345 | } |
346 | |
347 | Ok(()) |
348 | } |
349 | |
350 | pub fn ensure_content_length_zero(&self) -> Result<(), ()> { |
351 | match self.content_length { |
352 | ContentLength::Remaining(0) => Ok(()), |
353 | ContentLength::Remaining(_) => Err(()), |
354 | _ => Ok(()), |
355 | } |
356 | } |
357 | |
358 | pub fn notify_send(&mut self) { |
359 | if let Some(task) = self.send_task.take() { |
360 | task.wake(); |
361 | } |
362 | } |
363 | |
364 | pub fn wait_send(&mut self, cx: &Context) { |
365 | self.send_task = Some(cx.waker().clone()); |
366 | } |
367 | |
368 | pub fn notify_recv(&mut self) { |
369 | if let Some(task) = self.recv_task.take() { |
370 | task.wake(); |
371 | } |
372 | } |
373 | } |
374 | |
375 | impl store::Next for NextAccept { |
376 | fn next(stream: &Stream) -> Option<store::Key> { |
377 | stream.next_pending_accept |
378 | } |
379 | |
380 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
381 | stream.next_pending_accept = key; |
382 | } |
383 | |
384 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
385 | stream.next_pending_accept.take() |
386 | } |
387 | |
388 | fn is_queued(stream: &Stream) -> bool { |
389 | stream.is_pending_accept |
390 | } |
391 | |
392 | fn set_queued(stream: &mut Stream, val: bool) { |
393 | stream.is_pending_accept = val; |
394 | } |
395 | } |
396 | |
397 | impl store::Next for NextSend { |
398 | fn next(stream: &Stream) -> Option<store::Key> { |
399 | stream.next_pending_send |
400 | } |
401 | |
402 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
403 | stream.next_pending_send = key; |
404 | } |
405 | |
406 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
407 | stream.next_pending_send.take() |
408 | } |
409 | |
410 | fn is_queued(stream: &Stream) -> bool { |
411 | stream.is_pending_send |
412 | } |
413 | |
414 | fn set_queued(stream: &mut Stream, val: bool) { |
415 | if val { |
416 | // ensure that stream is not queued for being opened |
417 | // if it's being put into queue for sending data |
418 | debug_assert!(!stream.is_pending_open); |
419 | } |
420 | stream.is_pending_send = val; |
421 | } |
422 | } |
423 | |
424 | impl store::Next for NextSendCapacity { |
425 | fn next(stream: &Stream) -> Option<store::Key> { |
426 | stream.next_pending_send_capacity |
427 | } |
428 | |
429 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
430 | stream.next_pending_send_capacity = key; |
431 | } |
432 | |
433 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
434 | stream.next_pending_send_capacity.take() |
435 | } |
436 | |
437 | fn is_queued(stream: &Stream) -> bool { |
438 | stream.is_pending_send_capacity |
439 | } |
440 | |
441 | fn set_queued(stream: &mut Stream, val: bool) { |
442 | stream.is_pending_send_capacity = val; |
443 | } |
444 | } |
445 | |
446 | impl store::Next for NextWindowUpdate { |
447 | fn next(stream: &Stream) -> Option<store::Key> { |
448 | stream.next_window_update |
449 | } |
450 | |
451 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
452 | stream.next_window_update = key; |
453 | } |
454 | |
455 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
456 | stream.next_window_update.take() |
457 | } |
458 | |
459 | fn is_queued(stream: &Stream) -> bool { |
460 | stream.is_pending_window_update |
461 | } |
462 | |
463 | fn set_queued(stream: &mut Stream, val: bool) { |
464 | stream.is_pending_window_update = val; |
465 | } |
466 | } |
467 | |
468 | impl store::Next for NextOpen { |
469 | fn next(stream: &Stream) -> Option<store::Key> { |
470 | stream.next_open |
471 | } |
472 | |
473 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
474 | stream.next_open = key; |
475 | } |
476 | |
477 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
478 | stream.next_open.take() |
479 | } |
480 | |
481 | fn is_queued(stream: &Stream) -> bool { |
482 | stream.is_pending_open |
483 | } |
484 | |
485 | fn set_queued(stream: &mut Stream, val: bool) { |
486 | if val { |
487 | // ensure that stream is not queued for being sent |
488 | // if it's being put into queue for opening the stream |
489 | debug_assert!(!stream.is_pending_send); |
490 | } |
491 | stream.is_pending_open = val; |
492 | } |
493 | } |
494 | |
495 | impl store::Next for NextResetExpire { |
496 | fn next(stream: &Stream) -> Option<store::Key> { |
497 | stream.next_reset_expire |
498 | } |
499 | |
500 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
501 | stream.next_reset_expire = key; |
502 | } |
503 | |
504 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
505 | stream.next_reset_expire.take() |
506 | } |
507 | |
508 | fn is_queued(stream: &Stream) -> bool { |
509 | stream.reset_at.is_some() |
510 | } |
511 | |
512 | fn set_queued(stream: &mut Stream, val: bool) { |
513 | if val { |
514 | stream.reset_at = Some(Instant::now()); |
515 | } else { |
516 | stream.reset_at = None; |
517 | } |
518 | } |
519 | } |
520 | |
521 | // ===== impl ContentLength ===== |
522 | |
523 | impl ContentLength { |
524 | pub fn is_head(&self) -> bool { |
525 | matches!(*self, Self::Head) |
526 | } |
527 | } |
528 | |