| 1 | use crate::Reason; |
| 2 | |
| 3 | use super::*; |
| 4 | |
| 5 | use std::fmt; |
| 6 | use std::task::{Context, Waker}; |
| 7 | use std::time::Instant; |
| 8 | |
| 9 | /// Tracks Stream related state |
| 10 | /// |
| 11 | /// # Reference counting |
| 12 | /// |
| 13 | /// There can be a number of outstanding handles to a single Stream. These are |
| 14 | /// tracked using reference counting. The `ref_count` field represents the |
| 15 | /// number of outstanding userspace handles that can reach this stream. |
| 16 | /// |
| 17 | /// It's important to note that when the stream is placed in an internal queue |
| 18 | /// (such as an accept queue), this is **not** tracked by a reference count. |
| 19 | /// Thus, `ref_count` can be zero and the stream still has to be kept around. |
| 20 | pub(super) struct Stream { |
| 21 | /// The h2 stream identifier |
| 22 | pub id: StreamId, |
| 23 | |
| 24 | /// Current state of the stream |
| 25 | pub state: State, |
| 26 | |
| 27 | /// Set to `true` when the stream is counted against the connection's max |
| 28 | /// concurrent streams. |
| 29 | pub is_counted: bool, |
| 30 | |
| 31 | /// Number of outstanding handles pointing to this stream |
| 32 | pub ref_count: usize, |
| 33 | |
| 34 | // ===== Fields related to sending ===== |
| 35 | /// Next node in the accept linked list |
| 36 | pub next_pending_send: Option<store::Key>, |
| 37 | |
| 38 | /// Set to true when the stream is pending accept |
| 39 | pub is_pending_send: bool, |
| 40 | |
| 41 | /// Send data flow control |
| 42 | pub send_flow: FlowControl, |
| 43 | |
| 44 | /// Amount of send capacity that has been requested, but not yet allocated. |
| 45 | pub requested_send_capacity: WindowSize, |
| 46 | |
| 47 | /// Amount of data buffered at the prioritization layer. |
| 48 | /// TODO: Technically this could be greater than the window size... |
| 49 | pub buffered_send_data: usize, |
| 50 | |
| 51 | /// Task tracking additional send capacity (i.e. window updates). |
| 52 | send_task: Option<Waker>, |
| 53 | |
| 54 | /// Frames pending for this stream being sent to the socket |
| 55 | pub pending_send: buffer::Deque, |
| 56 | |
| 57 | /// Next node in the linked list of streams waiting for additional |
| 58 | /// connection level capacity. |
| 59 | pub next_pending_send_capacity: Option<store::Key>, |
| 60 | |
| 61 | /// True if the stream is waiting for outbound connection capacity |
| 62 | pub is_pending_send_capacity: bool, |
| 63 | |
| 64 | /// Set to true when the send capacity has been incremented |
| 65 | pub send_capacity_inc: bool, |
| 66 | |
| 67 | /// Next node in the open linked list |
| 68 | pub next_open: Option<store::Key>, |
| 69 | |
| 70 | /// Set to true when the stream is pending to be opened |
| 71 | pub is_pending_open: bool, |
| 72 | |
| 73 | /// Set to true when a push is pending for this stream |
| 74 | pub is_pending_push: bool, |
| 75 | |
| 76 | // ===== Fields related to receiving ===== |
| 77 | /// Next node in the accept linked list |
| 78 | pub next_pending_accept: Option<store::Key>, |
| 79 | |
| 80 | /// Set to true when the stream is pending accept |
| 81 | pub is_pending_accept: bool, |
| 82 | |
| 83 | /// Receive data flow control |
| 84 | pub recv_flow: FlowControl, |
| 85 | |
| 86 | pub in_flight_recv_data: WindowSize, |
| 87 | |
| 88 | /// Next node in the linked list of streams waiting to send window updates. |
| 89 | pub next_window_update: Option<store::Key>, |
| 90 | |
| 91 | /// True if the stream is waiting to send a window update |
| 92 | pub is_pending_window_update: bool, |
| 93 | |
| 94 | /// The time when this stream may have been locally reset. |
| 95 | pub reset_at: Option<Instant>, |
| 96 | |
| 97 | /// Next node in list of reset streams that should expire eventually |
| 98 | pub next_reset_expire: Option<store::Key>, |
| 99 | |
| 100 | /// Frames pending for this stream to read |
| 101 | pub pending_recv: buffer::Deque, |
| 102 | |
| 103 | /// When the RecvStream drop occurs, no data should be received. |
| 104 | pub is_recv: bool, |
| 105 | |
| 106 | /// Task tracking receiving frames |
| 107 | pub recv_task: Option<Waker>, |
| 108 | |
| 109 | /// Task tracking pushed promises. |
| 110 | pub push_task: Option<Waker>, |
| 111 | |
| 112 | /// The stream's pending push promises |
| 113 | pub pending_push_promises: store::Queue<NextAccept>, |
| 114 | |
| 115 | /// Validate content-length headers |
| 116 | pub content_length: ContentLength, |
| 117 | } |
| 118 | |
| 119 | /// State related to validating a stream's content-length |
| 120 | #[derive (Debug)] |
| 121 | pub enum ContentLength { |
| 122 | Omitted, |
| 123 | Head, |
| 124 | Remaining(u64), |
| 125 | } |
| 126 | |
| 127 | #[derive (Debug)] |
| 128 | pub(super) struct NextAccept; |
| 129 | |
| 130 | #[derive (Debug)] |
| 131 | pub(super) struct NextSend; |
| 132 | |
| 133 | #[derive (Debug)] |
| 134 | pub(super) struct NextSendCapacity; |
| 135 | |
| 136 | #[derive (Debug)] |
| 137 | pub(super) struct NextWindowUpdate; |
| 138 | |
| 139 | #[derive (Debug)] |
| 140 | pub(super) struct NextOpen; |
| 141 | |
| 142 | #[derive (Debug)] |
| 143 | pub(super) struct NextResetExpire; |
| 144 | |
| 145 | impl Stream { |
| 146 | pub fn new(id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize) -> Stream { |
| 147 | let mut send_flow = FlowControl::new(); |
| 148 | let mut recv_flow = FlowControl::new(); |
| 149 | |
| 150 | recv_flow |
| 151 | .inc_window(init_recv_window) |
| 152 | .expect("invalid initial receive window" ); |
| 153 | // TODO: proper error handling? |
| 154 | let _res = recv_flow.assign_capacity(init_recv_window); |
| 155 | debug_assert!(_res.is_ok()); |
| 156 | |
| 157 | send_flow |
| 158 | .inc_window(init_send_window) |
| 159 | .expect("invalid initial send window size" ); |
| 160 | |
| 161 | Stream { |
| 162 | id, |
| 163 | state: State::default(), |
| 164 | ref_count: 0, |
| 165 | is_counted: false, |
| 166 | |
| 167 | // ===== Fields related to sending ===== |
| 168 | next_pending_send: None, |
| 169 | is_pending_send: false, |
| 170 | send_flow, |
| 171 | requested_send_capacity: 0, |
| 172 | buffered_send_data: 0, |
| 173 | send_task: None, |
| 174 | pending_send: buffer::Deque::new(), |
| 175 | is_pending_send_capacity: false, |
| 176 | next_pending_send_capacity: None, |
| 177 | send_capacity_inc: false, |
| 178 | is_pending_open: false, |
| 179 | next_open: None, |
| 180 | is_pending_push: false, |
| 181 | |
| 182 | // ===== Fields related to receiving ===== |
| 183 | next_pending_accept: None, |
| 184 | is_pending_accept: false, |
| 185 | recv_flow, |
| 186 | in_flight_recv_data: 0, |
| 187 | next_window_update: None, |
| 188 | is_pending_window_update: false, |
| 189 | reset_at: None, |
| 190 | next_reset_expire: None, |
| 191 | pending_recv: buffer::Deque::new(), |
| 192 | is_recv: true, |
| 193 | recv_task: None, |
| 194 | push_task: None, |
| 195 | pending_push_promises: store::Queue::new(), |
| 196 | content_length: ContentLength::Omitted, |
| 197 | } |
| 198 | } |
| 199 | |
| 200 | /// Increment the stream's ref count |
| 201 | pub fn ref_inc(&mut self) { |
| 202 | assert!(self.ref_count < usize::MAX); |
| 203 | self.ref_count += 1; |
| 204 | } |
| 205 | |
| 206 | /// Decrements the stream's ref count |
| 207 | pub fn ref_dec(&mut self) { |
| 208 | assert!(self.ref_count > 0); |
| 209 | self.ref_count -= 1; |
| 210 | } |
| 211 | |
| 212 | /// Returns true if stream is currently being held for some time because of |
| 213 | /// a local reset. |
| 214 | pub fn is_pending_reset_expiration(&self) -> bool { |
| 215 | self.reset_at.is_some() |
| 216 | } |
| 217 | |
| 218 | /// Returns true if frames for this stream are ready to be sent over the wire |
| 219 | pub fn is_send_ready(&self) -> bool { |
| 220 | // Why do we check pending_open? |
| 221 | // |
| 222 | // We allow users to call send_request() which schedules a stream to be pending_open |
| 223 | // if there is no room according to the concurrency limit (max_send_streams), and we |
| 224 | // also allow data to be buffered for send with send_data() if there is no capacity for |
| 225 | // the stream to send the data, which attempts to place the stream in pending_send. |
| 226 | // If the stream is not open, we don't want the stream to be scheduled for |
| 227 | // execution (pending_send). Note that if the stream is in pending_open, it will be |
| 228 | // pushed to pending_send when there is room for an open stream. |
| 229 | // |
| 230 | // In pending_push we track whether a PushPromise still needs to be sent |
| 231 | // from a different stream before we can start sending frames on this one. |
| 232 | // This is different from the "open" check because reserved streams don't count |
| 233 | // toward the concurrency limit. |
| 234 | // See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2 |
| 235 | !self.is_pending_open && !self.is_pending_push |
| 236 | } |
| 237 | |
| 238 | /// Returns true if the stream is closed |
| 239 | pub fn is_closed(&self) -> bool { |
| 240 | // The state has fully transitioned to closed. |
| 241 | self.state.is_closed() && |
| 242 | // Because outbound frames transition the stream state before being |
| 243 | // buffered, we have to ensure that all frames have been flushed. |
| 244 | self.pending_send.is_empty() && |
| 245 | // Sometimes large data frames are sent out in chunks. After a chunk |
| 246 | // of the frame is sent, the remainder is pushed back onto the send |
| 247 | // queue to be rescheduled. |
| 248 | // |
| 249 | // Checking for additional buffered data lets us catch this case. |
| 250 | self.buffered_send_data == 0 |
| 251 | } |
| 252 | |
| 253 | /// Returns true if the stream is no longer in use |
| 254 | pub fn is_released(&self) -> bool { |
| 255 | // The stream is closed and fully flushed |
| 256 | self.is_closed() && |
| 257 | // There are no more outstanding references to the stream |
| 258 | self.ref_count == 0 && |
| 259 | // The stream is not in any queue |
| 260 | !self.is_pending_send && !self.is_pending_send_capacity && |
| 261 | !self.is_pending_accept && !self.is_pending_window_update && |
| 262 | !self.is_pending_open && self.reset_at.is_none() |
| 263 | } |
| 264 | |
| 265 | /// Returns true when the consumer of the stream has dropped all handles |
| 266 | /// (indicating no further interest in the stream) and the stream state is |
| 267 | /// not actually closed. |
| 268 | /// |
| 269 | /// In this case, a reset should be sent. |
| 270 | pub fn is_canceled_interest(&self) -> bool { |
| 271 | self.ref_count == 0 && !self.state.is_closed() |
| 272 | } |
| 273 | |
| 274 | /// Current available stream send capacity |
| 275 | pub fn capacity(&self, max_buffer_size: usize) -> WindowSize { |
| 276 | let available = self.send_flow.available().as_size() as usize; |
| 277 | let buffered = self.buffered_send_data; |
| 278 | |
| 279 | available.min(max_buffer_size).saturating_sub(buffered) as WindowSize |
| 280 | } |
| 281 | |
| 282 | pub fn assign_capacity(&mut self, capacity: WindowSize, max_buffer_size: usize) { |
| 283 | let prev_capacity = self.capacity(max_buffer_size); |
| 284 | debug_assert!(capacity > 0); |
| 285 | // TODO: proper error handling |
| 286 | let _res = self.send_flow.assign_capacity(capacity); |
| 287 | debug_assert!(_res.is_ok()); |
| 288 | |
| 289 | tracing::trace!( |
| 290 | " assigned capacity to stream; available= {}; buffered= {}; id= {:?}; max_buffer_size= {} prev= {}" , |
| 291 | self.send_flow.available(), |
| 292 | self.buffered_send_data, |
| 293 | self.id, |
| 294 | max_buffer_size, |
| 295 | prev_capacity, |
| 296 | ); |
| 297 | |
| 298 | if prev_capacity < self.capacity(max_buffer_size) { |
| 299 | self.notify_capacity(); |
| 300 | } |
| 301 | } |
| 302 | |
| 303 | pub fn send_data(&mut self, len: WindowSize, max_buffer_size: usize) { |
| 304 | let prev_capacity = self.capacity(max_buffer_size); |
| 305 | |
| 306 | // TODO: proper error handling |
| 307 | let _res = self.send_flow.send_data(len); |
| 308 | debug_assert!(_res.is_ok()); |
| 309 | |
| 310 | // Decrement the stream's buffered data counter |
| 311 | debug_assert!(self.buffered_send_data >= len as usize); |
| 312 | self.buffered_send_data -= len as usize; |
| 313 | self.requested_send_capacity -= len; |
| 314 | |
| 315 | tracing::trace!( |
| 316 | " sent stream data; available= {}; buffered= {}; id= {:?}; max_buffer_size= {} prev= {}" , |
| 317 | self.send_flow.available(), |
| 318 | self.buffered_send_data, |
| 319 | self.id, |
| 320 | max_buffer_size, |
| 321 | prev_capacity, |
| 322 | ); |
| 323 | |
| 324 | if prev_capacity < self.capacity(max_buffer_size) { |
| 325 | self.notify_capacity(); |
| 326 | } |
| 327 | } |
| 328 | |
| 329 | /// If the capacity was limited because of the max_send_buffer_size, |
| 330 | /// then consider waking the send task again... |
| 331 | pub fn notify_capacity(&mut self) { |
| 332 | self.send_capacity_inc = true; |
| 333 | tracing::trace!(" notifying task" ); |
| 334 | self.notify_send(); |
| 335 | } |
| 336 | |
| 337 | /// Returns `Err` when the decrement cannot be completed due to overflow. |
| 338 | pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { |
| 339 | match self.content_length { |
| 340 | ContentLength::Remaining(ref mut rem) => match rem.checked_sub(len as u64) { |
| 341 | Some(val) => *rem = val, |
| 342 | None => return Err(()), |
| 343 | }, |
| 344 | ContentLength::Head => { |
| 345 | if len != 0 { |
| 346 | return Err(()); |
| 347 | } |
| 348 | } |
| 349 | _ => {} |
| 350 | } |
| 351 | |
| 352 | Ok(()) |
| 353 | } |
| 354 | |
| 355 | pub fn ensure_content_length_zero(&self) -> Result<(), ()> { |
| 356 | match self.content_length { |
| 357 | ContentLength::Remaining(0) => Ok(()), |
| 358 | ContentLength::Remaining(_) => Err(()), |
| 359 | _ => Ok(()), |
| 360 | } |
| 361 | } |
| 362 | |
| 363 | pub fn notify_send(&mut self) { |
| 364 | if let Some(task) = self.send_task.take() { |
| 365 | task.wake(); |
| 366 | } |
| 367 | } |
| 368 | |
| 369 | pub fn wait_send(&mut self, cx: &Context) { |
| 370 | self.send_task = Some(cx.waker().clone()); |
| 371 | } |
| 372 | |
| 373 | pub fn notify_recv(&mut self) { |
| 374 | if let Some(task) = self.recv_task.take() { |
| 375 | task.wake(); |
| 376 | } |
| 377 | } |
| 378 | |
| 379 | pub(super) fn notify_push(&mut self) { |
| 380 | if let Some(task) = self.push_task.take() { |
| 381 | task.wake(); |
| 382 | } |
| 383 | } |
| 384 | |
| 385 | /// Set the stream's state to `Closed` with the given reason and initiator. |
| 386 | /// Notify the send and receive tasks, if they exist. |
| 387 | pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) { |
| 388 | self.state.set_reset(self.id, reason, initiator); |
| 389 | self.notify_push(); |
| 390 | self.notify_recv(); |
| 391 | } |
| 392 | } |
| 393 | |
| 394 | impl fmt::Debug for Stream { |
| 395 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 396 | f.debug_struct("Stream" ) |
| 397 | .field("id" , &self.id) |
| 398 | .field("state" , &self.state) |
| 399 | .field("is_counted" , &self.is_counted) |
| 400 | .field("ref_count" , &self.ref_count) |
| 401 | .field("next_pending_send" , &self.next_pending_send) |
| 402 | .field("is_pending_send" , &self.is_pending_send) |
| 403 | .field("send_flow" , &self.send_flow) |
| 404 | .field("requested_send_capacity" , &self.requested_send_capacity) |
| 405 | .field("buffered_send_data" , &self.buffered_send_data) |
| 406 | .field("send_task" , &self.send_task.as_ref().map(|_| ())) |
| 407 | .field("pending_send" , &self.pending_send) |
| 408 | .field( |
| 409 | "next_pending_send_capacity" , |
| 410 | &self.next_pending_send_capacity, |
| 411 | ) |
| 412 | .field("is_pending_send_capacity" , &self.is_pending_send_capacity) |
| 413 | .field("send_capacity_inc" , &self.send_capacity_inc) |
| 414 | .field("next_open" , &self.next_open) |
| 415 | .field("is_pending_open" , &self.is_pending_open) |
| 416 | .field("is_pending_push" , &self.is_pending_push) |
| 417 | .field("next_pending_accept" , &self.next_pending_accept) |
| 418 | .field("is_pending_accept" , &self.is_pending_accept) |
| 419 | .field("recv_flow" , &self.recv_flow) |
| 420 | .field("in_flight_recv_data" , &self.in_flight_recv_data) |
| 421 | .field("next_window_update" , &self.next_window_update) |
| 422 | .field("is_pending_window_update" , &self.is_pending_window_update) |
| 423 | .field("reset_at" , &self.reset_at) |
| 424 | .field("next_reset_expire" , &self.next_reset_expire) |
| 425 | .field("pending_recv" , &self.pending_recv) |
| 426 | .field("is_recv" , &self.is_recv) |
| 427 | .field("recv_task" , &self.recv_task.as_ref().map(|_| ())) |
| 428 | .field("push_task" , &self.push_task.as_ref().map(|_| ())) |
| 429 | .field("pending_push_promises" , &self.pending_push_promises) |
| 430 | .field("content_length" , &self.content_length) |
| 431 | .finish() |
| 432 | } |
| 433 | } |
| 434 | |
| 435 | impl store::Next for NextAccept { |
| 436 | fn next(stream: &Stream) -> Option<store::Key> { |
| 437 | stream.next_pending_accept |
| 438 | } |
| 439 | |
| 440 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| 441 | stream.next_pending_accept = key; |
| 442 | } |
| 443 | |
| 444 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| 445 | stream.next_pending_accept.take() |
| 446 | } |
| 447 | |
| 448 | fn is_queued(stream: &Stream) -> bool { |
| 449 | stream.is_pending_accept |
| 450 | } |
| 451 | |
| 452 | fn set_queued(stream: &mut Stream, val: bool) { |
| 453 | stream.is_pending_accept = val; |
| 454 | } |
| 455 | } |
| 456 | |
| 457 | impl store::Next for NextSend { |
| 458 | fn next(stream: &Stream) -> Option<store::Key> { |
| 459 | stream.next_pending_send |
| 460 | } |
| 461 | |
| 462 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| 463 | stream.next_pending_send = key; |
| 464 | } |
| 465 | |
| 466 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| 467 | stream.next_pending_send.take() |
| 468 | } |
| 469 | |
| 470 | fn is_queued(stream: &Stream) -> bool { |
| 471 | stream.is_pending_send |
| 472 | } |
| 473 | |
| 474 | fn set_queued(stream: &mut Stream, val: bool) { |
| 475 | if val { |
| 476 | // ensure that stream is not queued for being opened |
| 477 | // if it's being put into queue for sending data |
| 478 | debug_assert!(!stream.is_pending_open); |
| 479 | } |
| 480 | stream.is_pending_send = val; |
| 481 | } |
| 482 | } |
| 483 | |
| 484 | impl store::Next for NextSendCapacity { |
| 485 | fn next(stream: &Stream) -> Option<store::Key> { |
| 486 | stream.next_pending_send_capacity |
| 487 | } |
| 488 | |
| 489 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| 490 | stream.next_pending_send_capacity = key; |
| 491 | } |
| 492 | |
| 493 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| 494 | stream.next_pending_send_capacity.take() |
| 495 | } |
| 496 | |
| 497 | fn is_queued(stream: &Stream) -> bool { |
| 498 | stream.is_pending_send_capacity |
| 499 | } |
| 500 | |
| 501 | fn set_queued(stream: &mut Stream, val: bool) { |
| 502 | stream.is_pending_send_capacity = val; |
| 503 | } |
| 504 | } |
| 505 | |
| 506 | impl store::Next for NextWindowUpdate { |
| 507 | fn next(stream: &Stream) -> Option<store::Key> { |
| 508 | stream.next_window_update |
| 509 | } |
| 510 | |
| 511 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| 512 | stream.next_window_update = key; |
| 513 | } |
| 514 | |
| 515 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| 516 | stream.next_window_update.take() |
| 517 | } |
| 518 | |
| 519 | fn is_queued(stream: &Stream) -> bool { |
| 520 | stream.is_pending_window_update |
| 521 | } |
| 522 | |
| 523 | fn set_queued(stream: &mut Stream, val: bool) { |
| 524 | stream.is_pending_window_update = val; |
| 525 | } |
| 526 | } |
| 527 | |
| 528 | impl store::Next for NextOpen { |
| 529 | fn next(stream: &Stream) -> Option<store::Key> { |
| 530 | stream.next_open |
| 531 | } |
| 532 | |
| 533 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| 534 | stream.next_open = key; |
| 535 | } |
| 536 | |
| 537 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| 538 | stream.next_open.take() |
| 539 | } |
| 540 | |
| 541 | fn is_queued(stream: &Stream) -> bool { |
| 542 | stream.is_pending_open |
| 543 | } |
| 544 | |
| 545 | fn set_queued(stream: &mut Stream, val: bool) { |
| 546 | if val { |
| 547 | // ensure that stream is not queued for being sent |
| 548 | // if it's being put into queue for opening the stream |
| 549 | debug_assert!(!stream.is_pending_send); |
| 550 | } |
| 551 | stream.is_pending_open = val; |
| 552 | } |
| 553 | } |
| 554 | |
| 555 | impl store::Next for NextResetExpire { |
| 556 | fn next(stream: &Stream) -> Option<store::Key> { |
| 557 | stream.next_reset_expire |
| 558 | } |
| 559 | |
| 560 | fn set_next(stream: &mut Stream, key: Option<store::Key>) { |
| 561 | stream.next_reset_expire = key; |
| 562 | } |
| 563 | |
| 564 | fn take_next(stream: &mut Stream) -> Option<store::Key> { |
| 565 | stream.next_reset_expire.take() |
| 566 | } |
| 567 | |
| 568 | fn is_queued(stream: &Stream) -> bool { |
| 569 | stream.reset_at.is_some() |
| 570 | } |
| 571 | |
| 572 | fn set_queued(stream: &mut Stream, val: bool) { |
| 573 | if val { |
| 574 | stream.reset_at = Some(Instant::now()); |
| 575 | } else { |
| 576 | stream.reset_at = None; |
| 577 | } |
| 578 | } |
| 579 | } |
| 580 | |
| 581 | // ===== impl ContentLength ===== |
| 582 | |
| 583 | impl ContentLength { |
| 584 | pub fn is_head(&self) -> bool { |
| 585 | matches!(*self, Self::Head) |
| 586 | } |
| 587 | } |
| 588 | |