| 1 | use std::io; |
| 2 | |
| 3 | use crate::codec::UserError; |
| 4 | use crate::frame::{self, Reason, StreamId}; |
| 5 | use crate::proto::{self, Error, Initiator, PollReset}; |
| 6 | |
| 7 | use self::Inner::*; |
| 8 | use self::Peer::*; |
| 9 | |
| 10 | /// Represents the state of an H2 stream |
| 11 | /// |
| 12 | /// ```not_rust |
| 13 | /// +--------+ |
| 14 | /// send PP | | recv PP |
| 15 | /// ,--------| idle |--------. |
| 16 | /// / | | \ |
| 17 | /// v +--------+ v |
| 18 | /// +----------+ | +----------+ |
| 19 | /// | | | send H / | | |
| 20 | /// ,------| reserved | | recv H | reserved |------. |
| 21 | /// | | (local) | | | (remote) | | |
| 22 | /// | +----------+ v +----------+ | |
| 23 | /// | | +--------+ | | |
| 24 | /// | | recv ES | | send ES | | |
| 25 | /// | send H | ,-------| open |-------. | recv H | |
| 26 | /// | | / | | \ | | |
| 27 | /// | v v +--------+ v v | |
| 28 | /// | +----------+ | +----------+ | |
| 29 | /// | | half | | | half | | |
| 30 | /// | | closed | | send R / | closed | | |
| 31 | /// | | (remote) | | recv R | (local) | | |
| 32 | /// | +----------+ | +----------+ | |
| 33 | /// | | | | | |
| 34 | /// | | send ES / | recv ES / | | |
| 35 | /// | | send R / v send R / | | |
| 36 | /// | | recv R +--------+ recv R | | |
| 37 | /// | send R / `----------->| |<-----------' send R / | |
| 38 | /// | recv R | closed | recv R | |
| 39 | /// `----------------------->| |<----------------------' |
| 40 | /// +--------+ |
| 41 | /// |
| 42 | /// send: endpoint sends this frame |
| 43 | /// recv: endpoint receives this frame |
| 44 | /// |
| 45 | /// H: HEADERS frame (with implied CONTINUATIONs) |
| 46 | /// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) |
| 47 | /// ES: END_STREAM flag |
| 48 | /// R: RST_STREAM frame |
| 49 | /// ``` |
| 50 | #[derive (Debug, Clone)] |
| 51 | pub struct State { |
| 52 | inner: Inner, |
| 53 | } |
| 54 | |
| 55 | #[derive (Debug, Clone)] |
| 56 | enum Inner { |
| 57 | Idle, |
| 58 | // TODO: these states shouldn't count against concurrency limits: |
| 59 | ReservedLocal, |
| 60 | ReservedRemote, |
| 61 | Open { local: Peer, remote: Peer }, |
| 62 | HalfClosedLocal(Peer), // TODO: explicitly name this value |
| 63 | HalfClosedRemote(Peer), |
| 64 | Closed(Cause), |
| 65 | } |
| 66 | |
/// Header progress of one side (local or remote) of a stream.
#[derive(Clone, Copy, Debug, Default)]
enum Peer {
    /// Nothing but (at most) informational headers has been exchanged for
    /// this side yet. This is the default.
    #[default]
    AwaitingHeaders,
    /// This side is past its headers and is streaming.
    Streaming,
}
| 73 | |
| 74 | #[derive (Debug, Clone)] |
| 75 | enum Cause { |
| 76 | EndStream, |
| 77 | Error(Error), |
| 78 | |
| 79 | /// This indicates to the connection that a reset frame must be sent out |
| 80 | /// once the send queue has been flushed. |
| 81 | /// |
| 82 | /// Examples of when this could happen: |
| 83 | /// - User drops all references to a stream, so we want to CANCEL the it. |
| 84 | /// - Header block size was too large, so we want to REFUSE, possibly |
| 85 | /// after sending a 431 response frame. |
| 86 | ScheduledLibraryReset(Reason), |
| 87 | } |
| 88 | |
| 89 | impl State { |
| 90 | /// Opens the send-half of a stream if it is not already open. |
| 91 | pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> { |
| 92 | let local = Streaming; |
| 93 | |
| 94 | self.inner = match self.inner { |
| 95 | Idle => { |
| 96 | if eos { |
| 97 | HalfClosedLocal(AwaitingHeaders) |
| 98 | } else { |
| 99 | Open { |
| 100 | local, |
| 101 | remote: AwaitingHeaders, |
| 102 | } |
| 103 | } |
| 104 | } |
| 105 | Open { |
| 106 | local: AwaitingHeaders, |
| 107 | remote, |
| 108 | } => { |
| 109 | if eos { |
| 110 | HalfClosedLocal(remote) |
| 111 | } else { |
| 112 | Open { local, remote } |
| 113 | } |
| 114 | } |
| 115 | HalfClosedRemote(AwaitingHeaders) | ReservedLocal => { |
| 116 | if eos { |
| 117 | Closed(Cause::EndStream) |
| 118 | } else { |
| 119 | HalfClosedRemote(local) |
| 120 | } |
| 121 | } |
| 122 | _ => { |
| 123 | // All other transitions result in a protocol error |
| 124 | return Err(UserError::UnexpectedFrameType); |
| 125 | } |
| 126 | }; |
| 127 | |
| 128 | Ok(()) |
| 129 | } |
| 130 | |
| 131 | /// Opens the receive-half of the stream when a HEADERS frame is received. |
| 132 | /// |
| 133 | /// Returns true if this transitions the state to Open. |
| 134 | pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> { |
| 135 | let mut initial = false; |
| 136 | let eos = frame.is_end_stream(); |
| 137 | |
| 138 | self.inner = match self.inner { |
| 139 | Idle => { |
| 140 | initial = true; |
| 141 | |
| 142 | if eos { |
| 143 | HalfClosedRemote(AwaitingHeaders) |
| 144 | } else { |
| 145 | Open { |
| 146 | local: AwaitingHeaders, |
| 147 | remote: if frame.is_informational() { |
| 148 | tracing::trace!("skipping 1xx response headers" ); |
| 149 | AwaitingHeaders |
| 150 | } else { |
| 151 | Streaming |
| 152 | }, |
| 153 | } |
| 154 | } |
| 155 | } |
| 156 | ReservedRemote => { |
| 157 | initial = true; |
| 158 | |
| 159 | if eos { |
| 160 | Closed(Cause::EndStream) |
| 161 | } else if frame.is_informational() { |
| 162 | tracing::trace!("skipping 1xx response headers" ); |
| 163 | ReservedRemote |
| 164 | } else { |
| 165 | HalfClosedLocal(Streaming) |
| 166 | } |
| 167 | } |
| 168 | Open { |
| 169 | local, |
| 170 | remote: AwaitingHeaders, |
| 171 | } => { |
| 172 | if eos { |
| 173 | HalfClosedRemote(local) |
| 174 | } else { |
| 175 | Open { |
| 176 | local, |
| 177 | remote: if frame.is_informational() { |
| 178 | tracing::trace!("skipping 1xx response headers" ); |
| 179 | AwaitingHeaders |
| 180 | } else { |
| 181 | Streaming |
| 182 | }, |
| 183 | } |
| 184 | } |
| 185 | } |
| 186 | HalfClosedLocal(AwaitingHeaders) => { |
| 187 | if eos { |
| 188 | Closed(Cause::EndStream) |
| 189 | } else if frame.is_informational() { |
| 190 | tracing::trace!("skipping 1xx response headers" ); |
| 191 | HalfClosedLocal(AwaitingHeaders) |
| 192 | } else { |
| 193 | HalfClosedLocal(Streaming) |
| 194 | } |
| 195 | } |
| 196 | ref state => { |
| 197 | // All other transitions result in a protocol error |
| 198 | proto_err!(conn: "recv_open: in unexpected state {:?}" , state); |
| 199 | return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); |
| 200 | } |
| 201 | }; |
| 202 | |
| 203 | Ok(initial) |
| 204 | } |
| 205 | |
| 206 | /// Transition from Idle -> ReservedRemote |
| 207 | pub fn reserve_remote(&mut self) -> Result<(), Error> { |
| 208 | match self.inner { |
| 209 | Idle => { |
| 210 | self.inner = ReservedRemote; |
| 211 | Ok(()) |
| 212 | } |
| 213 | ref state => { |
| 214 | proto_err!(conn: "reserve_remote: in unexpected state {:?}" , state); |
| 215 | Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) |
| 216 | } |
| 217 | } |
| 218 | } |
| 219 | |
| 220 | /// Transition from Idle -> ReservedLocal |
| 221 | pub fn reserve_local(&mut self) -> Result<(), UserError> { |
| 222 | match self.inner { |
| 223 | Idle => { |
| 224 | self.inner = ReservedLocal; |
| 225 | Ok(()) |
| 226 | } |
| 227 | _ => Err(UserError::UnexpectedFrameType), |
| 228 | } |
| 229 | } |
| 230 | |
| 231 | /// Indicates that the remote side will not send more data to the local. |
| 232 | pub fn recv_close(&mut self) -> Result<(), Error> { |
| 233 | match self.inner { |
| 234 | Open { local, .. } => { |
| 235 | // The remote side will continue to receive data. |
| 236 | tracing::trace!("recv_close: Open => HalfClosedRemote( {:?})" , local); |
| 237 | self.inner = HalfClosedRemote(local); |
| 238 | Ok(()) |
| 239 | } |
| 240 | HalfClosedLocal(..) => { |
| 241 | tracing::trace!("recv_close: HalfClosedLocal => Closed" ); |
| 242 | self.inner = Closed(Cause::EndStream); |
| 243 | Ok(()) |
| 244 | } |
| 245 | ref state => { |
| 246 | proto_err!(conn: "recv_close: in unexpected state {:?}" , state); |
| 247 | Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) |
| 248 | } |
| 249 | } |
| 250 | } |
| 251 | |
| 252 | /// The remote explicitly sent a RST_STREAM. |
| 253 | /// |
| 254 | /// # Arguments |
| 255 | /// - `frame`: the received RST_STREAM frame. |
| 256 | /// - `queued`: true if this stream has frames in the pending send queue. |
| 257 | pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) { |
| 258 | match self.inner { |
| 259 | // If the stream is already in a `Closed` state, do nothing, |
| 260 | // provided that there are no frames still in the send queue. |
| 261 | Closed(..) if !queued => {} |
| 262 | // A notionally `Closed` stream may still have queued frames in |
| 263 | // the following cases: |
| 264 | // |
| 265 | // - if the cause is `Cause::Scheduled(..)` (i.e. we have not |
| 266 | // actually closed the stream yet). |
| 267 | // - if the cause is `Cause::EndStream`: we transition to this |
| 268 | // state when an EOS frame is *enqueued* (so that it's invalid |
| 269 | // to enqueue more frames), not when the EOS frame is *sent*; |
| 270 | // therefore, there may still be frames ahead of the EOS frame |
| 271 | // in the send queue. |
| 272 | // |
| 273 | // In either of these cases, we want to overwrite the stream's |
| 274 | // previous state with the received RST_STREAM, so that the queue |
| 275 | // will be cleared by `Prioritize::pop_frame`. |
| 276 | ref state => { |
| 277 | tracing::trace!( |
| 278 | "recv_reset; frame= {:?}; state= {:?}; queued= {:?}" , |
| 279 | frame, |
| 280 | state, |
| 281 | queued |
| 282 | ); |
| 283 | self.inner = Closed(Cause::Error(Error::remote_reset( |
| 284 | frame.stream_id(), |
| 285 | frame.reason(), |
| 286 | ))); |
| 287 | } |
| 288 | } |
| 289 | } |
| 290 | |
| 291 | /// Handle a connection-level error. |
| 292 | pub fn handle_error(&mut self, err: &proto::Error) { |
| 293 | match self.inner { |
| 294 | Closed(..) => {} |
| 295 | _ => { |
| 296 | tracing::trace!("handle_error; err= {:?}" , err); |
| 297 | self.inner = Closed(Cause::Error(err.clone())); |
| 298 | } |
| 299 | } |
| 300 | } |
| 301 | |
| 302 | pub fn recv_eof(&mut self) { |
| 303 | match self.inner { |
| 304 | Closed(..) => {} |
| 305 | ref state => { |
| 306 | tracing::trace!("recv_eof; state= {:?}" , state); |
| 307 | self.inner = Closed(Cause::Error( |
| 308 | io::Error::new( |
| 309 | io::ErrorKind::BrokenPipe, |
| 310 | "stream closed because of a broken pipe" , |
| 311 | ) |
| 312 | .into(), |
| 313 | )); |
| 314 | } |
| 315 | } |
| 316 | } |
| 317 | |
| 318 | /// Indicates that the local side will not send more data to the local. |
| 319 | pub fn send_close(&mut self) { |
| 320 | match self.inner { |
| 321 | Open { remote, .. } => { |
| 322 | // The remote side will continue to receive data. |
| 323 | tracing::trace!("send_close: Open => HalfClosedLocal( {:?})" , remote); |
| 324 | self.inner = HalfClosedLocal(remote); |
| 325 | } |
| 326 | HalfClosedRemote(..) => { |
| 327 | tracing::trace!("send_close: HalfClosedRemote => Closed" ); |
| 328 | self.inner = Closed(Cause::EndStream); |
| 329 | } |
| 330 | ref state => panic!("send_close: unexpected state {:?}" , state), |
| 331 | } |
| 332 | } |
| 333 | |
| 334 | /// Set the stream state to reset locally. |
| 335 | pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) { |
| 336 | self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator))); |
| 337 | } |
| 338 | |
| 339 | /// Set the stream state to a scheduled reset. |
| 340 | pub fn set_scheduled_reset(&mut self, reason: Reason) { |
| 341 | debug_assert!(!self.is_closed()); |
| 342 | self.inner = Closed(Cause::ScheduledLibraryReset(reason)); |
| 343 | } |
| 344 | |
| 345 | pub fn get_scheduled_reset(&self) -> Option<Reason> { |
| 346 | match self.inner { |
| 347 | Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason), |
| 348 | _ => None, |
| 349 | } |
| 350 | } |
| 351 | |
| 352 | pub fn is_scheduled_reset(&self) -> bool { |
| 353 | matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..))) |
| 354 | } |
| 355 | |
| 356 | pub fn is_local_error(&self) -> bool { |
| 357 | match self.inner { |
| 358 | Closed(Cause::Error(ref e)) => e.is_local(), |
| 359 | Closed(Cause::ScheduledLibraryReset(..)) => true, |
| 360 | _ => false, |
| 361 | } |
| 362 | } |
| 363 | |
| 364 | pub fn is_remote_reset(&self) -> bool { |
| 365 | matches!( |
| 366 | self.inner, |
| 367 | Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote))) |
| 368 | ) |
| 369 | } |
| 370 | |
| 371 | /// Returns true if the stream is already reset. |
| 372 | pub fn is_reset(&self) -> bool { |
| 373 | match self.inner { |
| 374 | Closed(Cause::EndStream) => false, |
| 375 | Closed(_) => true, |
| 376 | _ => false, |
| 377 | } |
| 378 | } |
| 379 | |
| 380 | pub fn is_send_streaming(&self) -> bool { |
| 381 | matches!( |
| 382 | self.inner, |
| 383 | Open { |
| 384 | local: Streaming, |
| 385 | .. |
| 386 | } | HalfClosedRemote(Streaming) |
| 387 | ) |
| 388 | } |
| 389 | |
| 390 | /// Returns true when the stream is in a state to receive headers |
| 391 | pub fn is_recv_headers(&self) -> bool { |
| 392 | matches!( |
| 393 | self.inner, |
| 394 | Idle | Open { |
| 395 | remote: AwaitingHeaders, |
| 396 | .. |
| 397 | } | HalfClosedLocal(AwaitingHeaders) |
| 398 | | ReservedRemote |
| 399 | ) |
| 400 | } |
| 401 | |
| 402 | pub fn is_recv_streaming(&self) -> bool { |
| 403 | matches!( |
| 404 | self.inner, |
| 405 | Open { |
| 406 | remote: Streaming, |
| 407 | .. |
| 408 | } | HalfClosedLocal(Streaming) |
| 409 | ) |
| 410 | } |
| 411 | |
| 412 | pub fn is_closed(&self) -> bool { |
| 413 | matches!(self.inner, Closed(_)) |
| 414 | } |
| 415 | |
| 416 | pub fn is_recv_closed(&self) -> bool { |
| 417 | matches!( |
| 418 | self.inner, |
| 419 | Closed(..) | HalfClosedRemote(..) | ReservedLocal |
| 420 | ) |
| 421 | } |
| 422 | |
| 423 | pub fn is_send_closed(&self) -> bool { |
| 424 | matches!( |
| 425 | self.inner, |
| 426 | Closed(..) | HalfClosedLocal(..) | ReservedRemote |
| 427 | ) |
| 428 | } |
| 429 | |
| 430 | pub fn is_idle(&self) -> bool { |
| 431 | matches!(self.inner, Idle) |
| 432 | } |
| 433 | |
| 434 | pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { |
| 435 | // TODO: Is this correct? |
| 436 | match self.inner { |
| 437 | Closed(Cause::Error(ref e)) => Err(e.clone()), |
| 438 | Closed(Cause::ScheduledLibraryReset(reason)) => { |
| 439 | Err(proto::Error::library_go_away(reason)) |
| 440 | } |
| 441 | Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), |
| 442 | _ => Ok(true), |
| 443 | } |
| 444 | } |
| 445 | |
| 446 | /// Returns a reason if the stream has been reset. |
| 447 | pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> { |
| 448 | match self.inner { |
| 449 | Closed(Cause::Error(Error::Reset(_, reason, _))) |
| 450 | | Closed(Cause::Error(Error::GoAway(_, reason, _))) |
| 451 | | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)), |
| 452 | Closed(Cause::Error(ref e)) => Err(e.clone().into()), |
| 453 | Open { |
| 454 | local: Streaming, .. |
| 455 | } |
| 456 | | HalfClosedRemote(Streaming) => match mode { |
| 457 | PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()), |
| 458 | PollReset::Streaming => Ok(None), |
| 459 | }, |
| 460 | _ => Ok(None), |
| 461 | } |
| 462 | } |
| 463 | } |
| 464 | |
| 465 | impl Default for State { |
| 466 | fn default() -> State { |
| 467 | State { inner: Inner::Idle } |
| 468 | } |
| 469 | } |
| 470 | |