| 1 | use super::{ |
| 2 | store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId, |
| 3 | StreamIdOverflow, WindowSize, |
| 4 | }; |
| 5 | use crate::codec::UserError; |
| 6 | use crate::frame::{self, Reason}; |
| 7 | use crate::proto::{self, Error, Initiator}; |
| 8 | |
| 9 | use bytes::Buf; |
| 10 | use tokio::io::AsyncWrite; |
| 11 | |
| 12 | use std::cmp::Ordering; |
| 13 | use std::io; |
| 14 | use std::task::{Context, Poll, Waker}; |
| 15 | |
| 16 | /// Manages state transitions related to outbound frames. |
| 17 | #[derive (Debug)] |
| 18 | pub(super) struct Send { |
| 19 | /// Stream identifier to use for next initialized stream. |
| 20 | next_stream_id: Result<StreamId, StreamIdOverflow>, |
| 21 | |
| 22 | /// Any streams with a higher ID are ignored. |
| 23 | /// |
| 24 | /// This starts as MAX, but is lowered when a GOAWAY is received. |
| 25 | /// |
| 26 | /// > After sending a GOAWAY frame, the sender can discard frames for |
| 27 | /// > streams initiated by the receiver with identifiers higher than |
| 28 | /// > the identified last stream. |
| 29 | max_stream_id: StreamId, |
| 30 | |
| 31 | /// Initial window size of locally initiated streams |
| 32 | init_window_sz: WindowSize, |
| 33 | |
| 34 | /// Prioritization layer |
| 35 | prioritize: Prioritize, |
| 36 | |
| 37 | is_push_enabled: bool, |
| 38 | |
| 39 | /// If extended connect protocol is enabled. |
| 40 | is_extended_connect_protocol_enabled: bool, |
| 41 | } |
| 42 | |
| 43 | /// A value to detect which public API has called `poll_reset`. |
| 44 | #[derive (Debug)] |
| 45 | pub(crate) enum PollReset { |
| 46 | AwaitingHeaders, |
| 47 | Streaming, |
| 48 | } |
| 49 | |
| 50 | impl Send { |
| 51 | /// Create a new `Send` |
| 52 | pub fn new(config: &Config) -> Self { |
| 53 | Send { |
| 54 | init_window_sz: config.remote_init_window_sz, |
| 55 | max_stream_id: StreamId::MAX, |
| 56 | next_stream_id: Ok(config.local_next_stream_id), |
| 57 | prioritize: Prioritize::new(config), |
| 58 | is_push_enabled: true, |
| 59 | is_extended_connect_protocol_enabled: false, |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | /// Returns the initial send window size |
| 64 | pub fn init_window_sz(&self) -> WindowSize { |
| 65 | self.init_window_sz |
| 66 | } |
| 67 | |
| 68 | pub fn open(&mut self) -> Result<StreamId, UserError> { |
| 69 | let stream_id = self.ensure_next_stream_id()?; |
| 70 | self.next_stream_id = stream_id.next_id(); |
| 71 | Ok(stream_id) |
| 72 | } |
| 73 | |
| 74 | pub fn reserve_local(&mut self) -> Result<StreamId, UserError> { |
| 75 | let stream_id = self.ensure_next_stream_id()?; |
| 76 | self.next_stream_id = stream_id.next_id(); |
| 77 | Ok(stream_id) |
| 78 | } |
| 79 | |
| 80 | fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> { |
| 81 | // 8.1.2.2. Connection-Specific Header Fields |
| 82 | if fields.contains_key(http::header::CONNECTION) |
| 83 | || fields.contains_key(http::header::TRANSFER_ENCODING) |
| 84 | || fields.contains_key(http::header::UPGRADE) |
| 85 | || fields.contains_key("keep-alive" ) |
| 86 | || fields.contains_key("proxy-connection" ) |
| 87 | { |
| 88 | tracing::debug!("illegal connection-specific headers found" ); |
| 89 | return Err(UserError::MalformedHeaders); |
| 90 | } else if let Some(te) = fields.get(http::header::TE) { |
| 91 | if te != "trailers" { |
| 92 | tracing::debug!("illegal connection-specific headers found" ); |
| 93 | return Err(UserError::MalformedHeaders); |
| 94 | } |
| 95 | } |
| 96 | Ok(()) |
| 97 | } |
| 98 | |
| 99 | pub fn send_push_promise<B>( |
| 100 | &mut self, |
| 101 | frame: frame::PushPromise, |
| 102 | buffer: &mut Buffer<Frame<B>>, |
| 103 | stream: &mut store::Ptr, |
| 104 | task: &mut Option<Waker>, |
| 105 | ) -> Result<(), UserError> { |
| 106 | if !self.is_push_enabled { |
| 107 | return Err(UserError::PeerDisabledServerPush); |
| 108 | } |
| 109 | |
| 110 | tracing::trace!( |
| 111 | "send_push_promise; frame= {:?}; init_window= {:?}" , |
| 112 | frame, |
| 113 | self.init_window_sz |
| 114 | ); |
| 115 | |
| 116 | Self::check_headers(frame.fields())?; |
| 117 | |
| 118 | // Queue the frame for sending |
| 119 | self.prioritize |
| 120 | .queue_frame(frame.into(), buffer, stream, task); |
| 121 | |
| 122 | Ok(()) |
| 123 | } |
| 124 | |
| 125 | pub fn send_headers<B>( |
| 126 | &mut self, |
| 127 | frame: frame::Headers, |
| 128 | buffer: &mut Buffer<Frame<B>>, |
| 129 | stream: &mut store::Ptr, |
| 130 | counts: &mut Counts, |
| 131 | task: &mut Option<Waker>, |
| 132 | ) -> Result<(), UserError> { |
| 133 | tracing::trace!( |
| 134 | "send_headers; frame= {:?}; init_window= {:?}" , |
| 135 | frame, |
| 136 | self.init_window_sz |
| 137 | ); |
| 138 | |
| 139 | Self::check_headers(frame.fields())?; |
| 140 | |
| 141 | let end_stream = frame.is_end_stream(); |
| 142 | |
| 143 | // Update the state |
| 144 | stream.state.send_open(end_stream)?; |
| 145 | |
| 146 | let mut pending_open = false; |
| 147 | if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push { |
| 148 | self.prioritize.queue_open(stream); |
| 149 | pending_open = true; |
| 150 | } |
| 151 | |
| 152 | // Queue the frame for sending |
| 153 | // |
| 154 | // This call expects that, since new streams are in the open queue, new |
| 155 | // streams won't be pushed on pending_send. |
| 156 | self.prioritize |
| 157 | .queue_frame(frame.into(), buffer, stream, task); |
| 158 | |
| 159 | // Need to notify the connection when pushing onto pending_open since |
| 160 | // queue_frame only notifies for pending_send. |
| 161 | if pending_open { |
| 162 | if let Some(task) = task.take() { |
| 163 | task.wake(); |
| 164 | } |
| 165 | } |
| 166 | |
| 167 | Ok(()) |
| 168 | } |
| 169 | |
| 170 | /// Send an explicit RST_STREAM frame |
| 171 | pub fn send_reset<B>( |
| 172 | &mut self, |
| 173 | reason: Reason, |
| 174 | initiator: Initiator, |
| 175 | buffer: &mut Buffer<Frame<B>>, |
| 176 | stream: &mut store::Ptr, |
| 177 | counts: &mut Counts, |
| 178 | task: &mut Option<Waker>, |
| 179 | ) { |
| 180 | let is_reset = stream.state.is_reset(); |
| 181 | let is_closed = stream.state.is_closed(); |
| 182 | let is_empty = stream.pending_send.is_empty(); |
| 183 | let stream_id = stream.id; |
| 184 | |
| 185 | tracing::trace!( |
| 186 | "send_reset(..., reason= {:?}, initiator= {:?}, stream= {:?}, ..., \ |
| 187 | is_reset= {:?}; is_closed= {:?}; pending_send.is_empty= {:?}; \ |
| 188 | state= {:?} \ |
| 189 | " , |
| 190 | reason, |
| 191 | initiator, |
| 192 | stream_id, |
| 193 | is_reset, |
| 194 | is_closed, |
| 195 | is_empty, |
| 196 | stream.state |
| 197 | ); |
| 198 | |
| 199 | if is_reset { |
| 200 | // Don't double reset |
| 201 | tracing::trace!( |
| 202 | " -> not sending RST_STREAM ( {:?} is already reset)" , |
| 203 | stream_id |
| 204 | ); |
| 205 | return; |
| 206 | } |
| 207 | |
| 208 | // Transition the state to reset no matter what. |
| 209 | stream.set_reset(reason, initiator); |
| 210 | |
| 211 | // If closed AND the send queue is flushed, then the stream cannot be |
| 212 | // reset explicitly, either. Implicit resets can still be queued. |
| 213 | if is_closed && is_empty { |
| 214 | tracing::trace!( |
| 215 | " -> not sending explicit RST_STREAM ( {:?} was closed \ |
| 216 | and send queue was flushed)" , |
| 217 | stream_id |
| 218 | ); |
| 219 | return; |
| 220 | } |
| 221 | |
| 222 | // Clear all pending outbound frames. |
| 223 | // Note that we don't call `self.recv_err` because we want to enqueue |
| 224 | // the reset frame before transitioning the stream inside |
| 225 | // `reclaim_all_capacity`. |
| 226 | self.prioritize.clear_queue(buffer, stream); |
| 227 | |
| 228 | let frame = frame::Reset::new(stream.id, reason); |
| 229 | |
| 230 | tracing::trace!("send_reset -- queueing; frame= {:?}" , frame); |
| 231 | self.prioritize |
| 232 | .queue_frame(frame.into(), buffer, stream, task); |
| 233 | self.prioritize.reclaim_all_capacity(stream, counts); |
| 234 | } |
| 235 | |
| 236 | pub fn schedule_implicit_reset( |
| 237 | &mut self, |
| 238 | stream: &mut store::Ptr, |
| 239 | reason: Reason, |
| 240 | counts: &mut Counts, |
| 241 | task: &mut Option<Waker>, |
| 242 | ) { |
| 243 | if stream.state.is_closed() { |
| 244 | // Stream is already closed, nothing more to do |
| 245 | return; |
| 246 | } |
| 247 | |
| 248 | stream.state.set_scheduled_reset(reason); |
| 249 | |
| 250 | self.prioritize.reclaim_reserved_capacity(stream, counts); |
| 251 | self.prioritize.schedule_send(stream, task); |
| 252 | } |
| 253 | |
| 254 | pub fn send_data<B>( |
| 255 | &mut self, |
| 256 | frame: frame::Data<B>, |
| 257 | buffer: &mut Buffer<Frame<B>>, |
| 258 | stream: &mut store::Ptr, |
| 259 | counts: &mut Counts, |
| 260 | task: &mut Option<Waker>, |
| 261 | ) -> Result<(), UserError> |
| 262 | where |
| 263 | B: Buf, |
| 264 | { |
| 265 | self.prioritize |
| 266 | .send_data(frame, buffer, stream, counts, task) |
| 267 | } |
| 268 | |
| 269 | pub fn send_trailers<B>( |
| 270 | &mut self, |
| 271 | frame: frame::Headers, |
| 272 | buffer: &mut Buffer<Frame<B>>, |
| 273 | stream: &mut store::Ptr, |
| 274 | counts: &mut Counts, |
| 275 | task: &mut Option<Waker>, |
| 276 | ) -> Result<(), UserError> { |
| 277 | // TODO: Should this logic be moved into state.rs? |
| 278 | if !stream.state.is_send_streaming() { |
| 279 | return Err(UserError::UnexpectedFrameType); |
| 280 | } |
| 281 | |
| 282 | stream.state.send_close(); |
| 283 | |
| 284 | tracing::trace!("send_trailers -- queuing; frame= {:?}" , frame); |
| 285 | self.prioritize |
| 286 | .queue_frame(frame.into(), buffer, stream, task); |
| 287 | |
| 288 | // Release any excess capacity |
| 289 | self.prioritize.reserve_capacity(0, stream, counts); |
| 290 | |
| 291 | Ok(()) |
| 292 | } |
| 293 | |
| 294 | pub fn poll_complete<T, B>( |
| 295 | &mut self, |
| 296 | cx: &mut Context, |
| 297 | buffer: &mut Buffer<Frame<B>>, |
| 298 | store: &mut Store, |
| 299 | counts: &mut Counts, |
| 300 | dst: &mut Codec<T, Prioritized<B>>, |
| 301 | ) -> Poll<io::Result<()>> |
| 302 | where |
| 303 | T: AsyncWrite + Unpin, |
| 304 | B: Buf, |
| 305 | { |
| 306 | self.prioritize |
| 307 | .poll_complete(cx, buffer, store, counts, dst) |
| 308 | } |
| 309 | |
| 310 | /// Request capacity to send data |
| 311 | pub fn reserve_capacity( |
| 312 | &mut self, |
| 313 | capacity: WindowSize, |
| 314 | stream: &mut store::Ptr, |
| 315 | counts: &mut Counts, |
| 316 | ) { |
| 317 | self.prioritize.reserve_capacity(capacity, stream, counts) |
| 318 | } |
| 319 | |
| 320 | pub fn poll_capacity( |
| 321 | &mut self, |
| 322 | cx: &Context, |
| 323 | stream: &mut store::Ptr, |
| 324 | ) -> Poll<Option<Result<WindowSize, UserError>>> { |
| 325 | if !stream.state.is_send_streaming() { |
| 326 | return Poll::Ready(None); |
| 327 | } |
| 328 | |
| 329 | if !stream.send_capacity_inc { |
| 330 | stream.wait_send(cx); |
| 331 | return Poll::Pending; |
| 332 | } |
| 333 | |
| 334 | stream.send_capacity_inc = false; |
| 335 | |
| 336 | Poll::Ready(Some(Ok(self.capacity(stream)))) |
| 337 | } |
| 338 | |
| 339 | /// Current available stream send capacity |
| 340 | pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { |
| 341 | stream.capacity(self.prioritize.max_buffer_size()) |
| 342 | } |
| 343 | |
| 344 | pub fn poll_reset( |
| 345 | &self, |
| 346 | cx: &Context, |
| 347 | stream: &mut Stream, |
| 348 | mode: PollReset, |
| 349 | ) -> Poll<Result<Reason, crate::Error>> { |
| 350 | match stream.state.ensure_reason(mode)? { |
| 351 | Some(reason) => Poll::Ready(Ok(reason)), |
| 352 | None => { |
| 353 | stream.wait_send(cx); |
| 354 | Poll::Pending |
| 355 | } |
| 356 | } |
| 357 | } |
| 358 | |
| 359 | pub fn recv_connection_window_update( |
| 360 | &mut self, |
| 361 | frame: frame::WindowUpdate, |
| 362 | store: &mut Store, |
| 363 | counts: &mut Counts, |
| 364 | ) -> Result<(), Reason> { |
| 365 | self.prioritize |
| 366 | .recv_connection_window_update(frame.size_increment(), store, counts) |
| 367 | } |
| 368 | |
| 369 | pub fn recv_stream_window_update<B>( |
| 370 | &mut self, |
| 371 | sz: WindowSize, |
| 372 | buffer: &mut Buffer<Frame<B>>, |
| 373 | stream: &mut store::Ptr, |
| 374 | counts: &mut Counts, |
| 375 | task: &mut Option<Waker>, |
| 376 | ) -> Result<(), Reason> { |
| 377 | if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { |
| 378 | tracing::debug!("recv_stream_window_update !!; err= {:?}" , e); |
| 379 | |
| 380 | self.send_reset( |
| 381 | Reason::FLOW_CONTROL_ERROR, |
| 382 | Initiator::Library, |
| 383 | buffer, |
| 384 | stream, |
| 385 | counts, |
| 386 | task, |
| 387 | ); |
| 388 | |
| 389 | return Err(e); |
| 390 | } |
| 391 | |
| 392 | Ok(()) |
| 393 | } |
| 394 | |
| 395 | pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> { |
| 396 | if last_stream_id > self.max_stream_id { |
| 397 | // The remote endpoint sent a `GOAWAY` frame indicating a stream |
| 398 | // that we never sent, or that we have already terminated on account |
| 399 | // of previous `GOAWAY` frame. In either case, that is illegal. |
| 400 | // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase |
| 401 | // the value they send in the last stream identifier, since the |
| 402 | // peers might already have retried unprocessed requests on another |
| 403 | // connection.") |
| 404 | proto_err!(conn: |
| 405 | "recv_go_away: last_stream_id ( {:?}) > max_stream_id ( {:?})" , |
| 406 | last_stream_id, self.max_stream_id, |
| 407 | ); |
| 408 | return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); |
| 409 | } |
| 410 | |
| 411 | self.max_stream_id = last_stream_id; |
| 412 | Ok(()) |
| 413 | } |
| 414 | |
| 415 | pub fn handle_error<B>( |
| 416 | &mut self, |
| 417 | buffer: &mut Buffer<Frame<B>>, |
| 418 | stream: &mut store::Ptr, |
| 419 | counts: &mut Counts, |
| 420 | ) { |
| 421 | // Clear all pending outbound frames |
| 422 | self.prioritize.clear_queue(buffer, stream); |
| 423 | self.prioritize.reclaim_all_capacity(stream, counts); |
| 424 | } |
| 425 | |
| 426 | pub fn apply_remote_settings<B>( |
| 427 | &mut self, |
| 428 | settings: &frame::Settings, |
| 429 | buffer: &mut Buffer<Frame<B>>, |
| 430 | store: &mut Store, |
| 431 | counts: &mut Counts, |
| 432 | task: &mut Option<Waker>, |
| 433 | ) -> Result<(), Error> { |
| 434 | if let Some(val) = settings.is_extended_connect_protocol_enabled() { |
| 435 | self.is_extended_connect_protocol_enabled = val; |
| 436 | } |
| 437 | |
| 438 | // Applies an update to the remote endpoint's initial window size. |
| 439 | // |
| 440 | // Per RFC 7540 ยง6.9.2: |
| 441 | // |
| 442 | // In addition to changing the flow-control window for streams that are |
| 443 | // not yet active, a SETTINGS frame can alter the initial flow-control |
| 444 | // window size for streams with active flow-control windows (that is, |
| 445 | // streams in the "open" or "half-closed (remote)" state). When the |
| 446 | // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust |
| 447 | // the size of all stream flow-control windows that it maintains by the |
| 448 | // difference between the new value and the old value. |
| 449 | // |
| 450 | // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available |
| 451 | // space in a flow-control window to become negative. A sender MUST |
| 452 | // track the negative flow-control window and MUST NOT send new |
| 453 | // flow-controlled frames until it receives WINDOW_UPDATE frames that |
| 454 | // cause the flow-control window to become positive. |
| 455 | if let Some(val) = settings.initial_window_size() { |
| 456 | let old_val = self.init_window_sz; |
| 457 | self.init_window_sz = val; |
| 458 | |
| 459 | match val.cmp(&old_val) { |
| 460 | Ordering::Less => { |
| 461 | // We must decrease the (remote) window on every open stream. |
| 462 | let dec = old_val - val; |
| 463 | tracing::trace!("decrementing all windows; dec= {}" , dec); |
| 464 | |
| 465 | let mut total_reclaimed = 0; |
| 466 | store.try_for_each(|mut stream| { |
| 467 | let stream = &mut *stream; |
| 468 | |
| 469 | if stream.state.is_send_closed() && stream.buffered_send_data == 0 { |
| 470 | tracing::trace!( |
| 471 | "skipping send-closed stream; id= {:?}; flow= {:?}" , |
| 472 | stream.id, |
| 473 | stream.send_flow |
| 474 | ); |
| 475 | |
| 476 | return Ok(()); |
| 477 | } |
| 478 | |
| 479 | tracing::trace!( |
| 480 | "decrementing stream window; id= {:?}; decr= {}; flow= {:?}" , |
| 481 | stream.id, |
| 482 | dec, |
| 483 | stream.send_flow |
| 484 | ); |
| 485 | |
| 486 | // TODO: this decrement can underflow based on received frames! |
| 487 | stream |
| 488 | .send_flow |
| 489 | .dec_send_window(dec) |
| 490 | .map_err(proto::Error::library_go_away)?; |
| 491 | |
| 492 | // It's possible that decreasing the window causes |
| 493 | // `window_size` (the stream-specific window) to fall below |
| 494 | // `available` (the portion of the connection-level window |
| 495 | // that we have allocated to the stream). |
| 496 | // In this case, we should take that excess allocation away |
| 497 | // and reassign it to other streams. |
| 498 | let window_size = stream.send_flow.window_size(); |
| 499 | let available = stream.send_flow.available().as_size(); |
| 500 | let reclaimed = if available > window_size { |
| 501 | // Drop down to `window_size`. |
| 502 | let reclaim = available - window_size; |
| 503 | stream |
| 504 | .send_flow |
| 505 | .claim_capacity(reclaim) |
| 506 | .map_err(proto::Error::library_go_away)?; |
| 507 | total_reclaimed += reclaim; |
| 508 | reclaim |
| 509 | } else { |
| 510 | 0 |
| 511 | }; |
| 512 | |
| 513 | tracing::trace!( |
| 514 | "decremented stream window; id= {:?}; decr= {}; reclaimed= {}; flow= {:?}" , |
| 515 | stream.id, |
| 516 | dec, |
| 517 | reclaimed, |
| 518 | stream.send_flow |
| 519 | ); |
| 520 | |
| 521 | // TODO: Should this notify the producer when the capacity |
| 522 | // of a stream is reduced? Maybe it should if the capacity |
| 523 | // is reduced to zero, allowing the producer to stop work. |
| 524 | |
| 525 | Ok::<_, proto::Error>(()) |
| 526 | })?; |
| 527 | |
| 528 | self.prioritize |
| 529 | .assign_connection_capacity(total_reclaimed, store, counts); |
| 530 | } |
| 531 | Ordering::Greater => { |
| 532 | let inc = val - old_val; |
| 533 | |
| 534 | store.try_for_each(|mut stream| { |
| 535 | self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) |
| 536 | .map_err(Error::library_go_away) |
| 537 | })?; |
| 538 | } |
| 539 | Ordering::Equal => (), |
| 540 | } |
| 541 | } |
| 542 | |
| 543 | if let Some(val) = settings.is_push_enabled() { |
| 544 | self.is_push_enabled = val |
| 545 | } |
| 546 | |
| 547 | Ok(()) |
| 548 | } |
| 549 | |
| 550 | pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) { |
| 551 | self.prioritize.clear_pending_capacity(store, counts); |
| 552 | self.prioritize.clear_pending_send(store, counts); |
| 553 | self.prioritize.clear_pending_open(store, counts); |
| 554 | } |
| 555 | |
| 556 | pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { |
| 557 | if let Ok(next) = self.next_stream_id { |
| 558 | if id >= next { |
| 559 | return Err(Reason::PROTOCOL_ERROR); |
| 560 | } |
| 561 | } |
| 562 | // if next_stream_id is overflowed, that's ok. |
| 563 | |
| 564 | Ok(()) |
| 565 | } |
| 566 | |
| 567 | pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> { |
| 568 | self.next_stream_id |
| 569 | .map_err(|_| UserError::OverflowedStreamId) |
| 570 | } |
| 571 | |
| 572 | pub fn may_have_created_stream(&self, id: StreamId) -> bool { |
| 573 | if let Ok(next_id) = self.next_stream_id { |
| 574 | // Peer::is_local_init should have been called beforehand |
| 575 | debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); |
| 576 | id < next_id |
| 577 | } else { |
| 578 | true |
| 579 | } |
| 580 | } |
| 581 | |
| 582 | pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) { |
| 583 | if let Ok(next_id) = self.next_stream_id { |
| 584 | // Peer::is_local_init should have been called beforehand |
| 585 | debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated()); |
| 586 | if id >= next_id { |
| 587 | self.next_stream_id = id.next_id(); |
| 588 | } |
| 589 | } |
| 590 | } |
| 591 | |
| 592 | pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { |
| 593 | self.is_extended_connect_protocol_enabled |
| 594 | } |
| 595 | } |
| 596 | |