1 | use std::io; |
2 | |
3 | use crate::codec::UserError; |
4 | use crate::frame::{self, Reason, StreamId}; |
5 | use crate::proto::{self, Error, Initiator, PollReset}; |
6 | |
7 | use self::Inner::*; |
8 | use self::Peer::*; |
9 | |
10 | /// Represents the state of an H2 stream |
11 | /// |
12 | /// ```not_rust |
13 | /// +--------+ |
14 | /// send PP | | recv PP |
15 | /// ,--------| idle |--------. |
16 | /// / | | \ |
17 | /// v +--------+ v |
18 | /// +----------+ | +----------+ |
19 | /// | | | send H / | | |
20 | /// ,------| reserved | | recv H | reserved |------. |
21 | /// | | (local) | | | (remote) | | |
22 | /// | +----------+ v +----------+ | |
23 | /// | | +--------+ | | |
24 | /// | | recv ES | | send ES | | |
25 | /// | send H | ,-------| open |-------. | recv H | |
26 | /// | | / | | \ | | |
27 | /// | v v +--------+ v v | |
28 | /// | +----------+ | +----------+ | |
29 | /// | | half | | | half | | |
30 | /// | | closed | | send R / | closed | | |
31 | /// | | (remote) | | recv R | (local) | | |
32 | /// | +----------+ | +----------+ | |
33 | /// | | | | | |
34 | /// | | send ES / | recv ES / | | |
35 | /// | | send R / v send R / | | |
36 | /// | | recv R +--------+ recv R | | |
37 | /// | send R / `----------->| |<-----------' send R / | |
38 | /// | recv R | closed | recv R | |
39 | /// `----------------------->| |<----------------------' |
40 | /// +--------+ |
41 | /// |
42 | /// send: endpoint sends this frame |
43 | /// recv: endpoint receives this frame |
44 | /// |
45 | /// H: HEADERS frame (with implied CONTINUATIONs) |
46 | /// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) |
47 | /// ES: END_STREAM flag |
48 | /// R: RST_STREAM frame |
49 | /// ``` |
50 | #[derive (Debug, Clone)] |
51 | pub struct State { |
52 | inner: Inner, |
53 | } |
54 | |
55 | #[derive (Debug, Clone)] |
56 | enum Inner { |
57 | Idle, |
58 | // TODO: these states shouldn't count against concurrency limits: |
59 | ReservedLocal, |
60 | ReservedRemote, |
61 | Open { local: Peer, remote: Peer }, |
62 | HalfClosedLocal(Peer), // TODO: explicitly name this value |
63 | HalfClosedRemote(Peer), |
64 | Closed(Cause), |
65 | } |
66 | |
/// Progress of one side (local or remote) of a stream.
#[derive(Clone, Copy, Debug, Default)]
enum Peer {
    /// That side has not produced its (non-informational) HEADERS yet.
    /// This is the default.
    #[default]
    AwaitingHeaders,
    /// That side's HEADERS have been seen; it is now streaming.
    Streaming,
}
73 | |
74 | #[derive (Debug, Clone)] |
75 | enum Cause { |
76 | EndStream, |
77 | Error(Error), |
78 | |
79 | /// This indicates to the connection that a reset frame must be sent out |
80 | /// once the send queue has been flushed. |
81 | /// |
82 | /// Examples of when this could happen: |
83 | /// - User drops all references to a stream, so we want to CANCEL the it. |
84 | /// - Header block size was too large, so we want to REFUSE, possibly |
85 | /// after sending a 431 response frame. |
86 | ScheduledLibraryReset(Reason), |
87 | } |
88 | |
89 | impl State { |
90 | /// Opens the send-half of a stream if it is not already open. |
91 | pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> { |
92 | let local = Streaming; |
93 | |
94 | self.inner = match self.inner { |
95 | Idle => { |
96 | if eos { |
97 | HalfClosedLocal(AwaitingHeaders) |
98 | } else { |
99 | Open { |
100 | local, |
101 | remote: AwaitingHeaders, |
102 | } |
103 | } |
104 | } |
105 | Open { |
106 | local: AwaitingHeaders, |
107 | remote, |
108 | } => { |
109 | if eos { |
110 | HalfClosedLocal(remote) |
111 | } else { |
112 | Open { local, remote } |
113 | } |
114 | } |
115 | HalfClosedRemote(AwaitingHeaders) | ReservedLocal => { |
116 | if eos { |
117 | Closed(Cause::EndStream) |
118 | } else { |
119 | HalfClosedRemote(local) |
120 | } |
121 | } |
122 | _ => { |
123 | // All other transitions result in a protocol error |
124 | return Err(UserError::UnexpectedFrameType); |
125 | } |
126 | }; |
127 | |
128 | Ok(()) |
129 | } |
130 | |
131 | /// Opens the receive-half of the stream when a HEADERS frame is received. |
132 | /// |
133 | /// Returns true if this transitions the state to Open. |
134 | pub fn recv_open(&mut self, frame: &frame::Headers) -> Result<bool, Error> { |
135 | let mut initial = false; |
136 | let eos = frame.is_end_stream(); |
137 | |
138 | self.inner = match self.inner { |
139 | Idle => { |
140 | initial = true; |
141 | |
142 | if eos { |
143 | HalfClosedRemote(AwaitingHeaders) |
144 | } else { |
145 | Open { |
146 | local: AwaitingHeaders, |
147 | remote: if frame.is_informational() { |
148 | tracing::trace!("skipping 1xx response headers" ); |
149 | AwaitingHeaders |
150 | } else { |
151 | Streaming |
152 | }, |
153 | } |
154 | } |
155 | } |
156 | ReservedRemote => { |
157 | initial = true; |
158 | |
159 | if eos { |
160 | Closed(Cause::EndStream) |
161 | } else if frame.is_informational() { |
162 | tracing::trace!("skipping 1xx response headers" ); |
163 | ReservedRemote |
164 | } else { |
165 | HalfClosedLocal(Streaming) |
166 | } |
167 | } |
168 | Open { |
169 | local, |
170 | remote: AwaitingHeaders, |
171 | } => { |
172 | if eos { |
173 | HalfClosedRemote(local) |
174 | } else { |
175 | Open { |
176 | local, |
177 | remote: if frame.is_informational() { |
178 | tracing::trace!("skipping 1xx response headers" ); |
179 | AwaitingHeaders |
180 | } else { |
181 | Streaming |
182 | }, |
183 | } |
184 | } |
185 | } |
186 | HalfClosedLocal(AwaitingHeaders) => { |
187 | if eos { |
188 | Closed(Cause::EndStream) |
189 | } else if frame.is_informational() { |
190 | tracing::trace!("skipping 1xx response headers" ); |
191 | HalfClosedLocal(AwaitingHeaders) |
192 | } else { |
193 | HalfClosedLocal(Streaming) |
194 | } |
195 | } |
196 | ref state => { |
197 | // All other transitions result in a protocol error |
198 | proto_err!(conn: "recv_open: in unexpected state {:?}" , state); |
199 | return Err(Error::library_go_away(Reason::PROTOCOL_ERROR)); |
200 | } |
201 | }; |
202 | |
203 | Ok(initial) |
204 | } |
205 | |
206 | /// Transition from Idle -> ReservedRemote |
207 | pub fn reserve_remote(&mut self) -> Result<(), Error> { |
208 | match self.inner { |
209 | Idle => { |
210 | self.inner = ReservedRemote; |
211 | Ok(()) |
212 | } |
213 | ref state => { |
214 | proto_err!(conn: "reserve_remote: in unexpected state {:?}" , state); |
215 | Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) |
216 | } |
217 | } |
218 | } |
219 | |
220 | /// Transition from Idle -> ReservedLocal |
221 | pub fn reserve_local(&mut self) -> Result<(), UserError> { |
222 | match self.inner { |
223 | Idle => { |
224 | self.inner = ReservedLocal; |
225 | Ok(()) |
226 | } |
227 | _ => Err(UserError::UnexpectedFrameType), |
228 | } |
229 | } |
230 | |
231 | /// Indicates that the remote side will not send more data to the local. |
232 | pub fn recv_close(&mut self) -> Result<(), Error> { |
233 | match self.inner { |
234 | Open { local, .. } => { |
235 | // The remote side will continue to receive data. |
236 | tracing::trace!("recv_close: Open => HalfClosedRemote( {:?})" , local); |
237 | self.inner = HalfClosedRemote(local); |
238 | Ok(()) |
239 | } |
240 | HalfClosedLocal(..) => { |
241 | tracing::trace!("recv_close: HalfClosedLocal => Closed" ); |
242 | self.inner = Closed(Cause::EndStream); |
243 | Ok(()) |
244 | } |
245 | ref state => { |
246 | proto_err!(conn: "recv_close: in unexpected state {:?}" , state); |
247 | Err(Error::library_go_away(Reason::PROTOCOL_ERROR)) |
248 | } |
249 | } |
250 | } |
251 | |
252 | /// The remote explicitly sent a RST_STREAM. |
253 | /// |
254 | /// # Arguments |
255 | /// - `frame`: the received RST_STREAM frame. |
256 | /// - `queued`: true if this stream has frames in the pending send queue. |
257 | pub fn recv_reset(&mut self, frame: frame::Reset, queued: bool) { |
258 | match self.inner { |
259 | // If the stream is already in a `Closed` state, do nothing, |
260 | // provided that there are no frames still in the send queue. |
261 | Closed(..) if !queued => {} |
262 | // A notionally `Closed` stream may still have queued frames in |
263 | // the following cases: |
264 | // |
265 | // - if the cause is `Cause::Scheduled(..)` (i.e. we have not |
266 | // actually closed the stream yet). |
267 | // - if the cause is `Cause::EndStream`: we transition to this |
268 | // state when an EOS frame is *enqueued* (so that it's invalid |
269 | // to enqueue more frames), not when the EOS frame is *sent*; |
270 | // therefore, there may still be frames ahead of the EOS frame |
271 | // in the send queue. |
272 | // |
273 | // In either of these cases, we want to overwrite the stream's |
274 | // previous state with the received RST_STREAM, so that the queue |
275 | // will be cleared by `Prioritize::pop_frame`. |
276 | ref state => { |
277 | tracing::trace!( |
278 | "recv_reset; frame= {:?}; state= {:?}; queued= {:?}" , |
279 | frame, |
280 | state, |
281 | queued |
282 | ); |
283 | self.inner = Closed(Cause::Error(Error::remote_reset( |
284 | frame.stream_id(), |
285 | frame.reason(), |
286 | ))); |
287 | } |
288 | } |
289 | } |
290 | |
291 | /// Handle a connection-level error. |
292 | pub fn handle_error(&mut self, err: &proto::Error) { |
293 | match self.inner { |
294 | Closed(..) => {} |
295 | _ => { |
296 | tracing::trace!("handle_error; err= {:?}" , err); |
297 | self.inner = Closed(Cause::Error(err.clone())); |
298 | } |
299 | } |
300 | } |
301 | |
302 | pub fn recv_eof(&mut self) { |
303 | match self.inner { |
304 | Closed(..) => {} |
305 | ref state => { |
306 | tracing::trace!("recv_eof; state= {:?}" , state); |
307 | self.inner = Closed(Cause::Error( |
308 | io::Error::new( |
309 | io::ErrorKind::BrokenPipe, |
310 | "stream closed because of a broken pipe" , |
311 | ) |
312 | .into(), |
313 | )); |
314 | } |
315 | } |
316 | } |
317 | |
318 | /// Indicates that the local side will not send more data to the local. |
319 | pub fn send_close(&mut self) { |
320 | match self.inner { |
321 | Open { remote, .. } => { |
322 | // The remote side will continue to receive data. |
323 | tracing::trace!("send_close: Open => HalfClosedLocal( {:?})" , remote); |
324 | self.inner = HalfClosedLocal(remote); |
325 | } |
326 | HalfClosedRemote(..) => { |
327 | tracing::trace!("send_close: HalfClosedRemote => Closed" ); |
328 | self.inner = Closed(Cause::EndStream); |
329 | } |
330 | ref state => panic!("send_close: unexpected state {:?}" , state), |
331 | } |
332 | } |
333 | |
334 | /// Set the stream state to reset locally. |
335 | pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) { |
336 | self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator))); |
337 | } |
338 | |
339 | /// Set the stream state to a scheduled reset. |
340 | pub fn set_scheduled_reset(&mut self, reason: Reason) { |
341 | debug_assert!(!self.is_closed()); |
342 | self.inner = Closed(Cause::ScheduledLibraryReset(reason)); |
343 | } |
344 | |
345 | pub fn get_scheduled_reset(&self) -> Option<Reason> { |
346 | match self.inner { |
347 | Closed(Cause::ScheduledLibraryReset(reason)) => Some(reason), |
348 | _ => None, |
349 | } |
350 | } |
351 | |
352 | pub fn is_scheduled_reset(&self) -> bool { |
353 | matches!(self.inner, Closed(Cause::ScheduledLibraryReset(..))) |
354 | } |
355 | |
356 | pub fn is_local_error(&self) -> bool { |
357 | match self.inner { |
358 | Closed(Cause::Error(ref e)) => e.is_local(), |
359 | Closed(Cause::ScheduledLibraryReset(..)) => true, |
360 | _ => false, |
361 | } |
362 | } |
363 | |
364 | pub fn is_remote_reset(&self) -> bool { |
365 | matches!( |
366 | self.inner, |
367 | Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote))) |
368 | ) |
369 | } |
370 | |
371 | /// Returns true if the stream is already reset. |
372 | pub fn is_reset(&self) -> bool { |
373 | match self.inner { |
374 | Closed(Cause::EndStream) => false, |
375 | Closed(_) => true, |
376 | _ => false, |
377 | } |
378 | } |
379 | |
380 | pub fn is_send_streaming(&self) -> bool { |
381 | matches!( |
382 | self.inner, |
383 | Open { |
384 | local: Streaming, |
385 | .. |
386 | } | HalfClosedRemote(Streaming) |
387 | ) |
388 | } |
389 | |
390 | /// Returns true when the stream is in a state to receive headers |
391 | pub fn is_recv_headers(&self) -> bool { |
392 | matches!( |
393 | self.inner, |
394 | Idle | Open { |
395 | remote: AwaitingHeaders, |
396 | .. |
397 | } | HalfClosedLocal(AwaitingHeaders) |
398 | | ReservedRemote |
399 | ) |
400 | } |
401 | |
402 | pub fn is_recv_streaming(&self) -> bool { |
403 | matches!( |
404 | self.inner, |
405 | Open { |
406 | remote: Streaming, |
407 | .. |
408 | } | HalfClosedLocal(Streaming) |
409 | ) |
410 | } |
411 | |
412 | pub fn is_closed(&self) -> bool { |
413 | matches!(self.inner, Closed(_)) |
414 | } |
415 | |
416 | pub fn is_recv_closed(&self) -> bool { |
417 | matches!( |
418 | self.inner, |
419 | Closed(..) | HalfClosedRemote(..) | ReservedLocal |
420 | ) |
421 | } |
422 | |
423 | pub fn is_send_closed(&self) -> bool { |
424 | matches!( |
425 | self.inner, |
426 | Closed(..) | HalfClosedLocal(..) | ReservedRemote |
427 | ) |
428 | } |
429 | |
430 | pub fn is_idle(&self) -> bool { |
431 | matches!(self.inner, Idle) |
432 | } |
433 | |
434 | pub fn ensure_recv_open(&self) -> Result<bool, proto::Error> { |
435 | // TODO: Is this correct? |
436 | match self.inner { |
437 | Closed(Cause::Error(ref e)) => Err(e.clone()), |
438 | Closed(Cause::ScheduledLibraryReset(reason)) => { |
439 | Err(proto::Error::library_go_away(reason)) |
440 | } |
441 | Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false), |
442 | _ => Ok(true), |
443 | } |
444 | } |
445 | |
446 | /// Returns a reason if the stream has been reset. |
447 | pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> { |
448 | match self.inner { |
449 | Closed(Cause::Error(Error::Reset(_, reason, _))) |
450 | | Closed(Cause::Error(Error::GoAway(_, reason, _))) |
451 | | Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)), |
452 | Closed(Cause::Error(ref e)) => Err(e.clone().into()), |
453 | Open { |
454 | local: Streaming, .. |
455 | } |
456 | | HalfClosedRemote(Streaming) => match mode { |
457 | PollReset::AwaitingHeaders => Err(UserError::PollResetAfterSendResponse.into()), |
458 | PollReset::Streaming => Ok(None), |
459 | }, |
460 | _ => Ok(None), |
461 | } |
462 | } |
463 | } |
464 | |
465 | impl Default for State { |
466 | fn default() -> State { |
467 | State { inner: Inner::Idle } |
468 | } |
469 | } |
470 | |