1use super::{
2 store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId,
3 StreamIdOverflow, WindowSize,
4};
5use crate::codec::UserError;
6use crate::frame::{self, Reason};
7use crate::proto::{self, Error, Initiator};
8
9use bytes::Buf;
10use tokio::io::AsyncWrite;
11
12use std::cmp::Ordering;
13use std::io;
14use std::task::{Context, Poll, Waker};
15
16/// Manages state transitions related to outbound frames.
17#[derive(Debug)]
18pub(super) struct Send {
19 /// Stream identifier to use for next initialized stream.
20 next_stream_id: Result<StreamId, StreamIdOverflow>,
21
22 /// Any streams with a higher ID are ignored.
23 ///
24 /// This starts as MAX, but is lowered when a GOAWAY is received.
25 ///
26 /// > After sending a GOAWAY frame, the sender can discard frames for
27 /// > streams initiated by the receiver with identifiers higher than
28 /// > the identified last stream.
29 max_stream_id: StreamId,
30
31 /// Initial window size of locally initiated streams
32 init_window_sz: WindowSize,
33
34 /// Prioritization layer
35 prioritize: Prioritize,
36
37 is_push_enabled: bool,
38
39 /// If extended connect protocol is enabled.
40 is_extended_connect_protocol_enabled: bool,
41}
42
43/// A value to detect which public API has called `poll_reset`.
44#[derive(Debug)]
45pub(crate) enum PollReset {
46 AwaitingHeaders,
47 Streaming,
48}
49
50impl Send {
51 /// Create a new `Send`
52 pub fn new(config: &Config) -> Self {
53 Send {
54 init_window_sz: config.remote_init_window_sz,
55 max_stream_id: StreamId::MAX,
56 next_stream_id: Ok(config.local_next_stream_id),
57 prioritize: Prioritize::new(config),
58 is_push_enabled: true,
59 is_extended_connect_protocol_enabled: false,
60 }
61 }
62
63 /// Returns the initial send window size
64 pub fn init_window_sz(&self) -> WindowSize {
65 self.init_window_sz
66 }
67
68 pub fn open(&mut self) -> Result<StreamId, UserError> {
69 let stream_id = self.ensure_next_stream_id()?;
70 self.next_stream_id = stream_id.next_id();
71 Ok(stream_id)
72 }
73
74 pub fn reserve_local(&mut self) -> Result<StreamId, UserError> {
75 let stream_id = self.ensure_next_stream_id()?;
76 self.next_stream_id = stream_id.next_id();
77 Ok(stream_id)
78 }
79
80 fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> {
81 // 8.1.2.2. Connection-Specific Header Fields
82 if fields.contains_key(http::header::CONNECTION)
83 || fields.contains_key(http::header::TRANSFER_ENCODING)
84 || fields.contains_key(http::header::UPGRADE)
85 || fields.contains_key("keep-alive")
86 || fields.contains_key("proxy-connection")
87 {
88 tracing::debug!("illegal connection-specific headers found");
89 return Err(UserError::MalformedHeaders);
90 } else if let Some(te) = fields.get(http::header::TE) {
91 if te != "trailers" {
92 tracing::debug!("illegal connection-specific headers found");
93 return Err(UserError::MalformedHeaders);
94 }
95 }
96 Ok(())
97 }
98
99 pub fn send_push_promise<B>(
100 &mut self,
101 frame: frame::PushPromise,
102 buffer: &mut Buffer<Frame<B>>,
103 stream: &mut store::Ptr,
104 task: &mut Option<Waker>,
105 ) -> Result<(), UserError> {
106 if !self.is_push_enabled {
107 return Err(UserError::PeerDisabledServerPush);
108 }
109
110 tracing::trace!(
111 "send_push_promise; frame={:?}; init_window={:?}",
112 frame,
113 self.init_window_sz
114 );
115
116 Self::check_headers(frame.fields())?;
117
118 // Queue the frame for sending
119 self.prioritize
120 .queue_frame(frame.into(), buffer, stream, task);
121
122 Ok(())
123 }
124
125 pub fn send_headers<B>(
126 &mut self,
127 frame: frame::Headers,
128 buffer: &mut Buffer<Frame<B>>,
129 stream: &mut store::Ptr,
130 counts: &mut Counts,
131 task: &mut Option<Waker>,
132 ) -> Result<(), UserError> {
133 tracing::trace!(
134 "send_headers; frame={:?}; init_window={:?}",
135 frame,
136 self.init_window_sz
137 );
138
139 Self::check_headers(frame.fields())?;
140
141 let end_stream = frame.is_end_stream();
142
143 // Update the state
144 stream.state.send_open(end_stream)?;
145
146 let mut pending_open = false;
147 if counts.peer().is_local_init(frame.stream_id()) && !stream.is_pending_push {
148 self.prioritize.queue_open(stream);
149 pending_open = true;
150 }
151
152 // Queue the frame for sending
153 //
154 // This call expects that, since new streams are in the open queue, new
155 // streams won't be pushed on pending_send.
156 self.prioritize
157 .queue_frame(frame.into(), buffer, stream, task);
158
159 // Need to notify the connection when pushing onto pending_open since
160 // queue_frame only notifies for pending_send.
161 if pending_open {
162 if let Some(task) = task.take() {
163 task.wake();
164 }
165 }
166
167 Ok(())
168 }
169
170 /// Send an explicit RST_STREAM frame
171 pub fn send_reset<B>(
172 &mut self,
173 reason: Reason,
174 initiator: Initiator,
175 buffer: &mut Buffer<Frame<B>>,
176 stream: &mut store::Ptr,
177 counts: &mut Counts,
178 task: &mut Option<Waker>,
179 ) {
180 let is_reset = stream.state.is_reset();
181 let is_closed = stream.state.is_closed();
182 let is_empty = stream.pending_send.is_empty();
183 let stream_id = stream.id;
184
185 tracing::trace!(
186 "send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \
187 is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
188 state={:?} \
189 ",
190 reason,
191 initiator,
192 stream_id,
193 is_reset,
194 is_closed,
195 is_empty,
196 stream.state
197 );
198
199 if is_reset {
200 // Don't double reset
201 tracing::trace!(
202 " -> not sending RST_STREAM ({:?} is already reset)",
203 stream_id
204 );
205 return;
206 }
207
208 // Transition the state to reset no matter what.
209 stream.state.set_reset(stream_id, reason, initiator);
210
211 // If closed AND the send queue is flushed, then the stream cannot be
212 // reset explicitly, either. Implicit resets can still be queued.
213 if is_closed && is_empty {
214 tracing::trace!(
215 " -> not sending explicit RST_STREAM ({:?} was closed \
216 and send queue was flushed)",
217 stream_id
218 );
219 return;
220 }
221
222 // Clear all pending outbound frames.
223 // Note that we don't call `self.recv_err` because we want to enqueue
224 // the reset frame before transitioning the stream inside
225 // `reclaim_all_capacity`.
226 self.prioritize.clear_queue(buffer, stream);
227
228 let frame = frame::Reset::new(stream.id, reason);
229
230 tracing::trace!("send_reset -- queueing; frame={:?}", frame);
231 self.prioritize
232 .queue_frame(frame.into(), buffer, stream, task);
233 self.prioritize.reclaim_all_capacity(stream, counts);
234 }
235
236 pub fn schedule_implicit_reset(
237 &mut self,
238 stream: &mut store::Ptr,
239 reason: Reason,
240 counts: &mut Counts,
241 task: &mut Option<Waker>,
242 ) {
243 if stream.state.is_closed() {
244 // Stream is already closed, nothing more to do
245 return;
246 }
247
248 stream.state.set_scheduled_reset(reason);
249
250 self.prioritize.reclaim_reserved_capacity(stream, counts);
251 self.prioritize.schedule_send(stream, task);
252 }
253
254 pub fn send_data<B>(
255 &mut self,
256 frame: frame::Data<B>,
257 buffer: &mut Buffer<Frame<B>>,
258 stream: &mut store::Ptr,
259 counts: &mut Counts,
260 task: &mut Option<Waker>,
261 ) -> Result<(), UserError>
262 where
263 B: Buf,
264 {
265 self.prioritize
266 .send_data(frame, buffer, stream, counts, task)
267 }
268
269 pub fn send_trailers<B>(
270 &mut self,
271 frame: frame::Headers,
272 buffer: &mut Buffer<Frame<B>>,
273 stream: &mut store::Ptr,
274 counts: &mut Counts,
275 task: &mut Option<Waker>,
276 ) -> Result<(), UserError> {
277 // TODO: Should this logic be moved into state.rs?
278 if !stream.state.is_send_streaming() {
279 return Err(UserError::UnexpectedFrameType);
280 }
281
282 stream.state.send_close();
283
284 tracing::trace!("send_trailers -- queuing; frame={:?}", frame);
285 self.prioritize
286 .queue_frame(frame.into(), buffer, stream, task);
287
288 // Release any excess capacity
289 self.prioritize.reserve_capacity(0, stream, counts);
290
291 Ok(())
292 }
293
294 pub fn poll_complete<T, B>(
295 &mut self,
296 cx: &mut Context,
297 buffer: &mut Buffer<Frame<B>>,
298 store: &mut Store,
299 counts: &mut Counts,
300 dst: &mut Codec<T, Prioritized<B>>,
301 ) -> Poll<io::Result<()>>
302 where
303 T: AsyncWrite + Unpin,
304 B: Buf,
305 {
306 self.prioritize
307 .poll_complete(cx, buffer, store, counts, dst)
308 }
309
310 /// Request capacity to send data
311 pub fn reserve_capacity(
312 &mut self,
313 capacity: WindowSize,
314 stream: &mut store::Ptr,
315 counts: &mut Counts,
316 ) {
317 self.prioritize.reserve_capacity(capacity, stream, counts)
318 }
319
320 pub fn poll_capacity(
321 &mut self,
322 cx: &Context,
323 stream: &mut store::Ptr,
324 ) -> Poll<Option<Result<WindowSize, UserError>>> {
325 if !stream.state.is_send_streaming() {
326 return Poll::Ready(None);
327 }
328
329 if !stream.send_capacity_inc {
330 stream.wait_send(cx);
331 return Poll::Pending;
332 }
333
334 stream.send_capacity_inc = false;
335
336 Poll::Ready(Some(Ok(self.capacity(stream))))
337 }
338
339 /// Current available stream send capacity
340 pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
341 stream.capacity(self.prioritize.max_buffer_size())
342 }
343
344 pub fn poll_reset(
345 &self,
346 cx: &Context,
347 stream: &mut Stream,
348 mode: PollReset,
349 ) -> Poll<Result<Reason, crate::Error>> {
350 match stream.state.ensure_reason(mode)? {
351 Some(reason) => Poll::Ready(Ok(reason)),
352 None => {
353 stream.wait_send(cx);
354 Poll::Pending
355 }
356 }
357 }
358
359 pub fn recv_connection_window_update(
360 &mut self,
361 frame: frame::WindowUpdate,
362 store: &mut Store,
363 counts: &mut Counts,
364 ) -> Result<(), Reason> {
365 self.prioritize
366 .recv_connection_window_update(frame.size_increment(), store, counts)
367 }
368
369 pub fn recv_stream_window_update<B>(
370 &mut self,
371 sz: WindowSize,
372 buffer: &mut Buffer<Frame<B>>,
373 stream: &mut store::Ptr,
374 counts: &mut Counts,
375 task: &mut Option<Waker>,
376 ) -> Result<(), Reason> {
377 if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
378 tracing::debug!("recv_stream_window_update !!; err={:?}", e);
379
380 self.send_reset(
381 Reason::FLOW_CONTROL_ERROR,
382 Initiator::Library,
383 buffer,
384 stream,
385 counts,
386 task,
387 );
388
389 return Err(e);
390 }
391
392 Ok(())
393 }
394
395 pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> {
396 if last_stream_id > self.max_stream_id {
397 // The remote endpoint sent a `GOAWAY` frame indicating a stream
398 // that we never sent, or that we have already terminated on account
399 // of previous `GOAWAY` frame. In either case, that is illegal.
400 // (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase
401 // the value they send in the last stream identifier, since the
402 // peers might already have retried unprocessed requests on another
403 // connection.")
404 proto_err!(conn:
405 "recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})",
406 last_stream_id, self.max_stream_id,
407 );
408 return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
409 }
410
411 self.max_stream_id = last_stream_id;
412 Ok(())
413 }
414
415 pub fn handle_error<B>(
416 &mut self,
417 buffer: &mut Buffer<Frame<B>>,
418 stream: &mut store::Ptr,
419 counts: &mut Counts,
420 ) {
421 // Clear all pending outbound frames
422 self.prioritize.clear_queue(buffer, stream);
423 self.prioritize.reclaim_all_capacity(stream, counts);
424 }
425
426 pub fn apply_remote_settings<B>(
427 &mut self,
428 settings: &frame::Settings,
429 buffer: &mut Buffer<Frame<B>>,
430 store: &mut Store,
431 counts: &mut Counts,
432 task: &mut Option<Waker>,
433 ) -> Result<(), Error> {
434 if let Some(val) = settings.is_extended_connect_protocol_enabled() {
435 self.is_extended_connect_protocol_enabled = val;
436 }
437
438 // Applies an update to the remote endpoint's initial window size.
439 //
440 // Per RFC 7540 ยง6.9.2:
441 //
442 // In addition to changing the flow-control window for streams that are
443 // not yet active, a SETTINGS frame can alter the initial flow-control
444 // window size for streams with active flow-control windows (that is,
445 // streams in the "open" or "half-closed (remote)" state). When the
446 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
447 // the size of all stream flow-control windows that it maintains by the
448 // difference between the new value and the old value.
449 //
450 // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
451 // space in a flow-control window to become negative. A sender MUST
452 // track the negative flow-control window and MUST NOT send new
453 // flow-controlled frames until it receives WINDOW_UPDATE frames that
454 // cause the flow-control window to become positive.
455 if let Some(val) = settings.initial_window_size() {
456 let old_val = self.init_window_sz;
457 self.init_window_sz = val;
458
459 match val.cmp(&old_val) {
460 Ordering::Less => {
461 // We must decrease the (remote) window on every open stream.
462 let dec = old_val - val;
463 tracing::trace!("decrementing all windows; dec={}", dec);
464
465 let mut total_reclaimed = 0;
466 store.try_for_each(|mut stream| {
467 let stream = &mut *stream;
468
469 tracing::trace!(
470 "decrementing stream window; id={:?}; decr={}; flow={:?}",
471 stream.id,
472 dec,
473 stream.send_flow
474 );
475
476 // TODO: this decrement can underflow based on received frames!
477 stream
478 .send_flow
479 .dec_send_window(dec)
480 .map_err(proto::Error::library_go_away)?;
481
482 // It's possible that decreasing the window causes
483 // `window_size` (the stream-specific window) to fall below
484 // `available` (the portion of the connection-level window
485 // that we have allocated to the stream).
486 // In this case, we should take that excess allocation away
487 // and reassign it to other streams.
488 let window_size = stream.send_flow.window_size();
489 let available = stream.send_flow.available().as_size();
490 let reclaimed = if available > window_size {
491 // Drop down to `window_size`.
492 let reclaim = available - window_size;
493 stream
494 .send_flow
495 .claim_capacity(reclaim)
496 .map_err(proto::Error::library_go_away)?;
497 total_reclaimed += reclaim;
498 reclaim
499 } else {
500 0
501 };
502
503 tracing::trace!(
504 "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
505 stream.id,
506 dec,
507 reclaimed,
508 stream.send_flow
509 );
510
511 // TODO: Should this notify the producer when the capacity
512 // of a stream is reduced? Maybe it should if the capacity
513 // is reduced to zero, allowing the producer to stop work.
514
515 Ok::<_, proto::Error>(())
516 })?;
517
518 self.prioritize
519 .assign_connection_capacity(total_reclaimed, store, counts);
520 }
521 Ordering::Greater => {
522 let inc = val - old_val;
523
524 store.try_for_each(|mut stream| {
525 self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
526 .map_err(Error::library_go_away)
527 })?;
528 }
529 Ordering::Equal => (),
530 }
531 }
532
533 if let Some(val) = settings.is_push_enabled() {
534 self.is_push_enabled = val
535 }
536
537 Ok(())
538 }
539
540 pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) {
541 self.prioritize.clear_pending_capacity(store, counts);
542 self.prioritize.clear_pending_send(store, counts);
543 self.prioritize.clear_pending_open(store, counts);
544 }
545
546 pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
547 if let Ok(next) = self.next_stream_id {
548 if id >= next {
549 return Err(Reason::PROTOCOL_ERROR);
550 }
551 }
552 // if next_stream_id is overflowed, that's ok.
553
554 Ok(())
555 }
556
557 pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> {
558 self.next_stream_id
559 .map_err(|_| UserError::OverflowedStreamId)
560 }
561
562 pub fn may_have_created_stream(&self, id: StreamId) -> bool {
563 if let Ok(next_id) = self.next_stream_id {
564 // Peer::is_local_init should have been called beforehand
565 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
566 id < next_id
567 } else {
568 true
569 }
570 }
571
572 pub(super) fn maybe_reset_next_stream_id(&mut self, id: StreamId) {
573 if let Ok(next_id) = self.next_stream_id {
574 // Peer::is_local_init should have been called beforehand
575 debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated());
576 if id >= next_id {
577 self.next_stream_id = id.next_id();
578 }
579 }
580 }
581
582 pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
583 self.is_extended_connect_protocol_enabled
584 }
585}
586