1use std::io;
2
3use crate::codec::UserError;
4use crate::frame::{self, Reason, StreamId};
5use crate::proto::{self, Error, Initiator, PollReset};
6
7use self::Inner::*;
8use self::Peer::*;
9
/// Represents the state of an H2 stream
///
/// The transitions implemented by this type follow the HTTP/2 stream state
/// machine:
///
/// ```not_rust
///                              +--------+
///                      send PP |        | recv PP
///                     ,--------|  idle  |--------.
///                    /         |        |         \
///                   v          +--------+          v
///            +----------+          |           +----------+
///            |          |          | send H /  |          |
///     ,------| reserved |          | recv H    | reserved |------.
///     |      | (local)  |          |           | (remote) |      |
///     |      +----------+          v           +----------+      |
///     |          |             +--------+             |          |
///     |          |     recv ES |        | send ES     |          |
///     |   send H |     ,-------|  open  |-------.     | recv H   |
///     |          |    /        |        |        \    |          |
///     |          v   v         +--------+         v   v          |
///     |      +----------+          |           +----------+      |
///     |      |   half   |          |           |   half   |      |
///     |      |  closed  |          | send R /  |  closed  |      |
///     |      | (remote) |          | recv R    | (local)  |      |
///     |      +----------+          |           +----------+      |
///     |           |                |                 |           |
///     |           | send ES /      |       recv ES / |           |
///     |           | send R /       v        send R / |           |
///     |           | recv R     +--------+   recv R   |           |
///     | send R /  `----------->|        |<-----------'  send R / |
///     | recv R                 | closed |               recv R   |
///     `----------------------->|        |<----------------------'
///                              +--------+
///
///        send:   endpoint sends this frame
///        recv:   endpoint receives this frame
///
///        H:  HEADERS frame (with implied CONTINUATIONs)
///        PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
///        ES: END_STREAM flag
///        R:  RST_STREAM frame
/// ```
#[derive(Debug, Clone)]
pub struct State {
    /// The current node of the state machine; kept private so that every
    /// transition goes through the methods on `State`.
    inner: Inner,
}
54
/// The concrete states a stream can be in; one variant per node of the
/// state diagram documented on `State`.
#[derive(Debug, Clone)]
enum Inner {
    /// No frame has been sent or received on the stream yet.
    Idle,
    // TODO: these states shouldn't count against concurrency limits:
    /// Reserved by `State::reserve_local`.
    ReservedLocal,
    /// Reserved by `State::reserve_remote`.
    ReservedRemote,
    /// Both halves are open; `local` tracks the send half and `remote` the
    /// receive half.
    Open { local: Peer, remote: Peer },
    /// The local side has finished sending. The payload is the state of the
    /// remote (receive) half, as carried over by `State::send_close`.
    HalfClosedLocal(Peer), // TODO: explicitly name this value
    /// The remote side has finished sending. The payload is the state of the
    /// local (send) half, as carried over by `State::recv_close`.
    HalfClosedRemote(Peer),
    /// The stream is closed; `Cause` records why.
    Closed(Cause),
}
66
/// Progress of one direction (send or receive) of a stream.
#[derive(Debug, Copy, Clone, Default)]
enum Peer {
    /// The final (non-informational) headers for this direction have not been
    /// processed yet. This is the default value.
    #[default]
    AwaitingHeaders,
    /// Headers have been processed for this direction.
    Streaming,
}
73
/// The reason a stream ended up in `Inner::Closed`.
#[derive(Debug, Clone)]
enum Cause {
    /// Both sides finished normally with END_STREAM.
    EndStream,
    /// The stream was terminated by an error (a reset, a connection-level
    /// error, or an I/O failure); the error is kept so it can be reported.
    Error(Error),

    /// This indicates to the connection that a reset frame must be sent out
    /// once the send queue has been flushed.
    ///
    /// Examples of when this could happen:
    /// - User drops all references to a stream, so we want to CANCEL it.
    /// - Header block size was too large, so we want to REFUSE, possibly
    ///   after sending a 431 response frame.
    ScheduledLibraryReset(Reason),
}
88
89impl State {
90 /// Opens the send-half of a stream if it is not already open.
91 pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> {
92 let local = Streaming;
93
94 self.inner = match self.inner {
95 Idle => {
96 if eos {
97 HalfClosedLocal(AwaitingHeaders)
98 } else {
99 Open {
100 local,
101 remote: AwaitingHeaders,
102 }
103 }
104 }
105 Open {
106 local: AwaitingHeaders,
107 remote,
108 } => {
109 if eos {
110 HalfClosedLocal(remote)
111 } else {
112 Open { local, remote }
113 }
114 }
115 HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
116 if eos {
117 Closed(Cause::EndStream)
118 } else {
119 HalfClosedRemote(local)
120 }
121 }
122 _ => {
123 // All other transitions result in a protocol error
124 return Err(UserError::UnexpectedFrameType);
125 }
126 };
127
128 Ok(())
129 }
130
131 /// Opens the receive-half of the stream when a HEADERS frame is received.
132 ///
133 /// Returns true if this transitions the state to Open.
134 pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> {
135 let mut initial = false;
136 let eos = frame.is_end_stream();
137
138 self.inner = match self.inner {
139 Idle => {
140 initial = true;
141
142 if eos {
143 HalfClosedRemote(AwaitingHeaders)
144 } else {
145 Open {
146 local: AwaitingHeaders,
147 remote: if frame.is_informational() {
148 tracing::trace!("skipping 1xx response headers");
149 AwaitingHeaders
150 } else {
151 Streaming
152 },
153 }
154 }
155 }
156 ReservedRemote => {
157 initial = true;
158
159 if eos {
160 Closed(Cause::EndStream)
161 } else if frame.is_informational() {
162 tracing::trace!("skipping 1xx response headers");
163 ReservedRemote
164 } else {
165 HalfClosedLocal(Streaming)
166 }
167 }
168 Open {
169 local,
170 remote: AwaitingHeaders,
171 } => {
172 if eos {
173 HalfClosedRemote(local)
174 } else {
175 Open {
176 local,
177 remote: if frame.is_informational() {
178 tracing::trace!("skipping 1xx response headers");
179 AwaitingHeaders
180 } else {
181 Streaming
182 },
183 }
184 }
185 }
186 HalfClosedLocal(AwaitingHeaders) => {
187 if eos {
188 Closed(Cause::EndStream)
189 } else if frame.is_informational() {
190 tracing::trace!("skipping 1xx response headers");
191 HalfClosedLocal(AwaitingHeaders)
192 } else {
193 HalfClosedLocal(Streaming)
194 }
195 }
196 ref state => {
197 // All other transitions result in a protocol error
198 proto_err!(conn: "recv_open: in unexpected state {:?}", state);
199 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
200 }
201 };
202
203 Ok(initial)
204 }
205
206 /// Transition from Idle -> ReservedRemote
207 pub fn reserve_remote(&mut self) -> Result<(), Error> {
208 match self.inner {
209 Idle => {
210 self.inner = ReservedRemote;
211 Ok(())
212 }
213 ref state => {
214 proto_err!(conn: "reserve_remote: in unexpected state {:?}", state);
215 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
216 }
217 }
218 }
219
220 /// Transition from Idle -> ReservedLocal
221 pub fn reserve_local(&mut self) -> Result<(), UserError> {
222 match self.inner {
223 Idle => {
224 self.inner = ReservedLocal;
225 Ok(())
226 }
227 _ => Err(UserError::UnexpectedFrameType),
228 }
229 }
230
231 /// Indicates that the remote side will not send more data to the local.
232 pub fn recv_close(&mut self) -> Result<(), Error> {
233 match self.inner {
234 Open { local, .. } => {
235 // The remote side will continue to receive data.
236 tracing::trace!("recv_close: Open => HalfClosedRemote({:?})", local);
237 self.inner = HalfClosedRemote(local);
238 Ok(())
239 }
240 HalfClosedLocal(..) => {
241 tracing::trace!("recv_close: HalfClosedLocal => Closed");
242 self.inner = Closed(Cause::EndStream);
243 Ok(())
244 }
245 ref state => {
246 proto_err!(conn: "recv_close: in unexpected state {:?}", state);
247 Err(Error::library_go_away(Reason::PROTOCOL_ERROR))
248 }
249 }
250 }
251
252 /// The remote explicitly sent a RST_STREAM.
253 ///
254 /// # Arguments
255 /// - `frame`: the received RST_STREAM frame.
256 /// - `queued`: true if this stream has frames in the pending send queue.
257 pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) {
258 match self.inner {
259 // If the stream is already in a `Closed` state, do nothing,
260 // provided that there are no frames still in the send queue.
261 Closed(..) if !queued => {}
262 // A notionally `Closed` stream may still have queued frames in
263 // the following cases:
264 //
265 // - if the cause is `Cause::Scheduled(..)` (i.e. we have not
266 // actually closed the stream yet).
267 // - if the cause is `Cause::EndStream`: we transition to this
268 // state when an EOS frame is *enqueued* (so that it's invalid
269 // to enqueue more frames), not when the EOS frame is *sent*;
270 // therefore, there may still be frames ahead of the EOS frame
271 // in the send queue.
272 //
273 // In either of these cases, we want to overwrite the stream's
274 // previous state with the received RST_STREAM, so that the queue
275 // will be cleared by `Prioritize::pop_frame`.
276 ref state => {
277 tracing::trace!(
278 "recv_reset; frame={:?}; state={:?}; queued={:?}",
279 frame,
280 state,
281 queued
282 );
283 self.inner = Closed(Cause::Error(Error::remote_reset(
284 frame.stream_id(),
285 frame.reason(),
286 )));
287 }
288 }
289 }
290
291 /// Handle a connection-level error.
292 pub fn handle_error(&mut self, err: &proto::Error) {
293 match self.inner {
294 Closed(..) => {}
295 _ => {
296 tracing::trace!("handle_error; err={:?}", err);
297 self.inner = Closed(Cause::Error(err.clone()));
298 }
299 }
300 }
301
302 pub fn recv_eof(&mut self) {
303 match self.inner {
304 Closed(..) => {}
305 ref state => {
306 tracing::trace!("recv_eof; state={:?}", state);
307 self.inner = Closed(Cause::Error(
308 io::Error::new(
309 io::ErrorKind::BrokenPipe,
310 "stream closed because of a broken pipe",
311 )
312 .into(),
313 ));
314 }
315 }
316 }
317
318 /// Indicates that the local side will not send more data to the local.
319 pub fn send_close(&mut self) {
320 match self.inner {
321 Open { remote, .. } => {
322 // The remote side will continue to receive data.
323 tracing::trace!("send_close: Open => HalfClosedLocal({:?})", remote);
324 self.inner = HalfClosedLocal(remote);
325 }
326 HalfClosedRemote(..) => {
327 tracing::trace!("send_close: HalfClosedRemote => Closed");
328 self.inner = Closed(Cause::EndStream);
329 }
330 ref state => panic!("send_close: unexpected state {:?}", state),
331 }
332 }
333
334 /// Set the stream state to reset locally.
335 pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) {
336 self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator)));
337 }
338
339 /// Set the stream state to a scheduled reset.
340 pub fn set_scheduled_reset(&mut self, reason: Reason) {
341 debug_assert!(!self.is_closed());
342 self.inner = Closed(Cause::ScheduledLibraryReset(reason));
343 }
344
345 pub fn get_scheduled_reset(&self) -> Option<Reason> {
346 match self.inner {
347 Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason),
348 _ => None,
349 }
350 }
351
352 pub fn is_scheduled_reset(&self) -> bool {
353 matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..)))
354 }
355
356 pub fn is_local_error(&self) -> bool {
357 match self.inner {
358 Closed(Cause::Error(ref e)) => e.is_local(),
359 Closed(Cause::ScheduledLibraryReset(..)) => true,
360 _ => false,
361 }
362 }
363
364 pub fn is_remote_reset(&self) -> bool {
365 matches!(
366 self.inner,
367 Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote)))
368 )
369 }
370
371 /// Returns true if the stream is already reset.
372 pub fn is_reset(&self) -> bool {
373 match self.inner {
374 Closed(Cause::EndStream) => false,
375 Closed(_) => true,
376 _ => false,
377 }
378 }
379
380 pub fn is_send_streaming(&self) -> bool {
381 matches!(
382 self.inner,
383 Open {
384 local: Streaming,
385 ..
386 } | HalfClosedRemote(Streaming)
387 )
388 }
389
390 /// Returns true when the stream is in a state to receive headers
391 pub fn is_recv_headers(&self) -> bool {
392 matches!(
393 self.inner,
394 Idle | Open {
395 remote: AwaitingHeaders,
396 ..
397 } | HalfClosedLocal(AwaitingHeaders)
398 | ReservedRemote
399 )
400 }
401
402 pub fn is_recv_streaming(&self) -> bool {
403 matches!(
404 self.inner,
405 Open {
406 remote: Streaming,
407 ..
408 } | HalfClosedLocal(Streaming)
409 )
410 }
411
412 pub fn is_closed(&self) -> bool {
413 matches!(self.inner, Closed(_))
414 }
415
416 pub fn is_recv_closed(&self) -> bool {
417 matches!(
418 self.inner,
419 Closed(..) | HalfClosedRemote(..) | ReservedLocal
420 )
421 }
422
423 pub fn is_send_closed(&self) -> bool {
424 matches!(
425 self.inner,
426 Closed(..) | HalfClosedLocal(..) | ReservedRemote
427 )
428 }
429
430 pub fn is_idle(&self) -> bool {
431 matches!(self.inner, Idle)
432 }
433
434 pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> {
435 // TODO: Is this correct?
436 match self.inner {
437 Closed(Cause::Error(ref e)) => Err(e.clone()),
438 Closed(Cause::ScheduledLibraryReset(reason)) => {
439 Err(proto::Error::library_go_away(reason))
440 }
441 Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
442 _ => Ok(true),
443 }
444 }
445
446 /// Returns a reason if the stream has been reset.
447 pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
448 match self.inner {
449 Closed(Cause::Error(Error::Reset(_, reason, _)))
450 | Closed(Cause::Error(Error::GoAway(_, reason, _)))
451 | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
452 Closed(Cause::Error(ref e)) => Err(e.clone().into()),
453 Open {
454 local: Streaming, ..
455 }
456 | HalfClosedRemote(Streaming) => match mode {
457 PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()),
458 PollReset::Streaming => Ok(None),
459 },
460 _ => Ok(None),
461 }
462 }
463}
464
465impl Default for State {
466 fn default() -> State {
467 State { inner: Inner::Idle }
468 }
469}
470