| 1 | use crate::codec::UserError; |
| 2 | use crate::frame::Reason; |
| 3 | use crate::proto::{self, WindowSize}; |
| 4 | |
| 5 | use bytes::{Buf, Bytes}; |
| 6 | use http::HeaderMap; |
| 7 | |
| 8 | use std::fmt; |
| 9 | #[cfg (feature = "stream" )] |
| 10 | use std::pin::Pin; |
| 11 | use std::task::{Context, Poll}; |
| 12 | |
| 13 | /// Sends the body stream and trailers to the remote peer. |
| 14 | /// |
| 15 | /// # Overview |
| 16 | /// |
| 17 | /// A `SendStream` is provided by [`SendRequest`] and [`SendResponse`] once the |
| 18 | /// HTTP/2 message header has been sent sent. It is used to stream the message |
| 19 | /// body and send the message trailers. See method level documentation for more |
| 20 | /// details. |
| 21 | /// |
| 22 | /// The `SendStream` instance is also used to manage outbound flow control. |
| 23 | /// |
| 24 | /// If a `SendStream` is dropped without explicitly closing the send stream, a |
| 25 | /// `RST_STREAM` frame will be sent. This essentially cancels the request / |
| 26 | /// response exchange. |
| 27 | /// |
| 28 | /// The ways to explicitly close the send stream are: |
| 29 | /// |
| 30 | /// * Set `end_of_stream` to true when calling [`send_request`], |
| 31 | /// [`send_response`], or [`send_data`]. |
| 32 | /// * Send trailers with [`send_trailers`]. |
| 33 | /// * Explicitly reset the stream with [`send_reset`]. |
| 34 | /// |
| 35 | /// # Flow control |
| 36 | /// |
| 37 | /// In HTTP/2, data cannot be sent to the remote peer unless there is |
| 38 | /// available window capacity on both the stream and the connection. When a data |
| 39 | /// frame is sent, both the stream window and the connection window are |
| 40 | /// decremented. When the stream level window reaches zero, no further data can |
| 41 | /// be sent on that stream. When the connection level window reaches zero, no |
| 42 | /// further data can be sent on any stream for that connection. |
| 43 | /// |
| 44 | /// When the remote peer is ready to receive more data, it sends `WINDOW_UPDATE` |
| 45 | /// frames. These frames increment the windows. See the [specification] for more |
| 46 | /// details on the principles of HTTP/2 flow control. |
| 47 | /// |
| 48 | /// The implications for sending data are that the caller **should** ensure that |
| 49 | /// both the stream and the connection has available window capacity before |
| 50 | /// loading the data to send into memory. The `SendStream` instance provides the |
| 51 | /// necessary APIs to perform this logic. This, however, is not an obligation. |
| 52 | /// If the caller attempts to send data on a stream when there is no available |
| 53 | /// window capacity, the library will buffer the data until capacity becomes |
| 54 | /// available, at which point the buffer will be flushed to the connection. |
| 55 | /// |
| 56 | /// **NOTE**: There is no bound on the amount of data that the library will |
| 57 | /// buffer. If you are sending large amounts of data, you really should hook |
| 58 | /// into the flow control lifecycle. Otherwise, you risk using up significant |
| 59 | /// amounts of memory. |
| 60 | /// |
| 61 | /// To hook into the flow control lifecycle, the caller signals to the library |
| 62 | /// that it intends to send data by calling [`reserve_capacity`], specifying the |
| 63 | /// amount of data, in octets, that the caller intends to send. After this, |
| 64 | /// `poll_capacity` is used to be notified when the requested capacity is |
| 65 | /// assigned to the stream. Once [`poll_capacity`] returns `Ready` with the number |
| 66 | /// of octets available to the stream, the caller is able to actually send the |
| 67 | /// data using [`send_data`]. |
| 68 | /// |
| 69 | /// Because there is also a connection level window that applies to **all** |
| 70 | /// streams on a connection, when capacity is assigned to a stream (indicated by |
| 71 | /// `poll_capacity` returning `Ready`), this capacity is reserved on the |
| 72 | /// connection and will **not** be assigned to any other stream. If data is |
| 73 | /// never written to the stream, that capacity is effectively lost to other |
| 74 | /// streams and this introduces the risk of deadlocking a connection. |
| 75 | /// |
| 76 | /// To avoid throttling data on a connection, the caller should not reserve |
| 77 | /// capacity until ready to send data and once any capacity is assigned to the |
| 78 | /// stream, the caller should immediately send data consuming this capacity. |
| 79 | /// There is no guarantee as to when the full capacity requested will become |
| 80 | /// available. For example, if the caller requests 64 KB of data and 512 bytes |
| 81 | /// become available, the caller should immediately send 512 bytes of data. |
| 82 | /// |
| 83 | /// See [`reserve_capacity`] documentation for more details. |
| 84 | /// |
| 85 | /// [`SendRequest`]: client/struct.SendRequest.html |
| 86 | /// [`SendResponse`]: server/struct.SendResponse.html |
| 87 | /// [specification]: http://httpwg.org/specs/rfc7540.html#FlowControl |
| 88 | /// [`reserve_capacity`]: #method.reserve_capacity |
| 89 | /// [`poll_capacity`]: #method.poll_capacity |
| 90 | /// [`send_data`]: #method.send_data |
| 91 | /// [`send_request`]: client/struct.SendRequest.html#method.send_request |
| 92 | /// [`send_response`]: server/struct.SendResponse.html#method.send_response |
| 93 | /// [`send_data`]: #method.send_data |
| 94 | /// [`send_trailers`]: #method.send_trailers |
| 95 | /// [`send_reset`]: #method.send_reset |
| 96 | #[derive (Debug)] |
| 97 | pub struct SendStream<B> { |
| 98 | inner: proto::StreamRef<B>, |
| 99 | } |
| 100 | |
| 101 | /// A stream identifier, as described in [Section 5.1.1] of RFC 7540. |
| 102 | /// |
| 103 | /// Streams are identified with an unsigned 31-bit integer. Streams |
| 104 | /// initiated by a client MUST use odd-numbered stream identifiers; those |
| 105 | /// initiated by the server MUST use even-numbered stream identifiers. A |
| 106 | /// stream identifier of zero (0x0) is used for connection control |
| 107 | /// messages; the stream identifier of zero cannot be used to establish a |
| 108 | /// new stream. |
| 109 | /// |
| 110 | /// [Section 5.1.1]: https://tools.ietf.org/html/rfc7540#section-5.1.1 |
| 111 | #[derive (Debug, Clone, Copy, Eq, PartialEq, Hash)] |
| 112 | pub struct StreamId(u32); |
| 113 | |
| 114 | impl From<StreamId> for u32 { |
| 115 | fn from(src: StreamId) -> Self { |
| 116 | src.0 |
| 117 | } |
| 118 | } |
| 119 | |
| 120 | /// Receives the body stream and trailers from the remote peer. |
| 121 | /// |
| 122 | /// A `RecvStream` is provided by [`client::ResponseFuture`] and |
| 123 | /// [`server::Connection`] with the received HTTP/2 message head (the response |
| 124 | /// and request head respectively). |
| 125 | /// |
| 126 | /// A `RecvStream` instance is used to receive the streaming message body and |
| 127 | /// any trailers from the remote peer. It is also used to manage inbound flow |
| 128 | /// control. |
| 129 | /// |
| 130 | /// See method level documentation for more details on receiving data. See |
| 131 | /// [`FlowControl`] for more details on inbound flow control. |
| 132 | /// |
| 133 | /// [`client::ResponseFuture`]: client/struct.ResponseFuture.html |
| 134 | /// [`server::Connection`]: server/struct.Connection.html |
| 135 | /// [`FlowControl`]: struct.FlowControl.html |
| 136 | /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html |
| 137 | #[must_use = "streams do nothing unless polled" ] |
| 138 | pub struct RecvStream { |
| 139 | inner: FlowControl, |
| 140 | } |
| 141 | |
| 142 | /// A handle to release window capacity to a remote stream. |
| 143 | /// |
| 144 | /// This type allows the caller to manage inbound data [flow control]. The |
| 145 | /// caller is expected to call [`release_capacity`] after dropping data frames. |
| 146 | /// |
| 147 | /// # Overview |
| 148 | /// |
| 149 | /// Each stream has a window size. This window size is the maximum amount of |
| 150 | /// inbound data that can be in-flight. In-flight data is defined as data that |
| 151 | /// has been received, but not yet released. |
| 152 | /// |
| 153 | /// When a stream is created, the window size is set to the connection's initial |
| 154 | /// window size value. When a data frame is received, the window size is then |
| 155 | /// decremented by size of the data frame before the data is provided to the |
| 156 | /// caller. As the caller finishes using the data, [`release_capacity`] must be |
| 157 | /// called. This will then increment the window size again, allowing the peer to |
| 158 | /// send more data. |
| 159 | /// |
| 160 | /// There is also a connection level window as well as the stream level window. |
| 161 | /// Received data counts against the connection level window as well and calls |
| 162 | /// to [`release_capacity`] will also increment the connection level window. |
| 163 | /// |
| 164 | /// # Sending `WINDOW_UPDATE` frames |
| 165 | /// |
| 166 | /// `WINDOW_UPDATE` frames will not be sent out for **every** call to |
| 167 | /// `release_capacity`, as this would end up slowing down the protocol. Instead, |
| 168 | /// `h2` waits until the window size is increased to a certain threshold and |
| 169 | /// then sends out a single `WINDOW_UPDATE` frame representing all the calls to |
| 170 | /// `release_capacity` since the last `WINDOW_UPDATE` frame. |
| 171 | /// |
| 172 | /// This essentially batches window updating. |
| 173 | /// |
| 174 | /// # Scenarios |
| 175 | /// |
| 176 | /// Following is a basic scenario with an HTTP/2 connection containing a |
| 177 | /// single active stream. |
| 178 | /// |
| 179 | /// * A new stream is activated. The receive window is initialized to 1024 (the |
| 180 | /// value of the initial window size for this connection). |
| 181 | /// * A `DATA` frame is received containing a payload of 600 bytes. |
| 182 | /// * The receive window size is reduced to 424 bytes. |
| 183 | /// * [`release_capacity`] is called with 200. |
| 184 | /// * The receive window size is now 624 bytes. The peer may send no more than |
| 185 | /// this. |
| 186 | /// * A `DATA` frame is received with a payload of 624 bytes. |
| 187 | /// * The window size is now 0 bytes. The peer may not send any more data. |
| 188 | /// * [`release_capacity`] is called with 1024. |
| 189 | /// * The receive window size is now 1024 bytes. The peer may now send more |
| 190 | /// data. |
| 191 | /// |
| 192 | /// [flow control]: ../index.html#flow-control |
| 193 | /// [`release_capacity`]: struct.FlowControl.html#method.release_capacity |
| 194 | #[derive (Clone, Debug)] |
| 195 | pub struct FlowControl { |
| 196 | inner: proto::OpaqueStreamRef, |
| 197 | } |
| 198 | |
| 199 | /// A handle to send and receive PING frames with the peer. |
| 200 | // NOT Clone on purpose |
| 201 | pub struct PingPong { |
| 202 | inner: proto::UserPings, |
| 203 | } |
| 204 | |
| 205 | /// Sent via [`PingPong`][] to send a PING frame to a peer. |
| 206 | /// |
| 207 | /// [`PingPong`]: struct.PingPong.html |
| 208 | pub struct Ping { |
| 209 | _p: (), |
| 210 | } |
| 211 | |
| 212 | /// Received via [`PingPong`][] when a peer acknowledges a [`Ping`][]. |
| 213 | /// |
| 214 | /// [`PingPong`]: struct.PingPong.html |
| 215 | /// [`Ping`]: struct.Ping.html |
| 216 | pub struct Pong { |
| 217 | _p: (), |
| 218 | } |
| 219 | |
| 220 | // ===== impl SendStream ===== |
| 221 | |
| 222 | impl<B: Buf> SendStream<B> { |
| 223 | pub(crate) fn new(inner: proto::StreamRef<B>) -> Self { |
| 224 | SendStream { inner } |
| 225 | } |
| 226 | |
| 227 | /// Requests capacity to send data. |
| 228 | /// |
| 229 | /// This function is used to express intent to send data. This requests |
| 230 | /// connection level capacity. Once the capacity is available, it is |
| 231 | /// assigned to the stream and not reused by other streams. |
| 232 | /// |
| 233 | /// This function may be called repeatedly. The `capacity` argument is the |
| 234 | /// **total** amount of requested capacity. Sequential calls to |
| 235 | /// `reserve_capacity` are *not* additive. Given the following: |
| 236 | /// |
| 237 | /// ```rust |
| 238 | /// # use h2::*; |
| 239 | /// # fn doc(mut send_stream: SendStream<&'static [u8]>) { |
| 240 | /// send_stream.reserve_capacity(100); |
| 241 | /// send_stream.reserve_capacity(200); |
| 242 | /// # } |
| 243 | /// ``` |
| 244 | /// |
| 245 | /// After the second call to `reserve_capacity`, the *total* requested |
| 246 | /// capacity will be 200. |
| 247 | /// |
| 248 | /// `reserve_capacity` is also used to cancel previous capacity requests. |
| 249 | /// Given the following: |
| 250 | /// |
| 251 | /// ```rust |
| 252 | /// # use h2::*; |
| 253 | /// # fn doc(mut send_stream: SendStream<&'static [u8]>) { |
| 254 | /// send_stream.reserve_capacity(100); |
| 255 | /// send_stream.reserve_capacity(0); |
| 256 | /// # } |
| 257 | /// ``` |
| 258 | /// |
| 259 | /// After the second call to `reserve_capacity`, the *total* requested |
| 260 | /// capacity will be 0, i.e. there is no requested capacity for the stream. |
| 261 | /// |
| 262 | /// If `reserve_capacity` is called with a lower value than the amount of |
| 263 | /// capacity **currently** assigned to the stream, this capacity will be |
| 264 | /// returned to the connection to be re-assigned to other streams. |
| 265 | /// |
| 266 | /// Also, the amount of capacity that is reserved gets decremented as data |
| 267 | /// is sent. For example: |
| 268 | /// |
| 269 | /// ```rust |
| 270 | /// # use h2::*; |
| 271 | /// # async fn doc(mut send_stream: SendStream<&'static [u8]>) { |
| 272 | /// send_stream.reserve_capacity(100); |
| 273 | /// |
| 274 | /// send_stream.send_data(b"hello" , false).unwrap(); |
| 275 | /// // At this point, the total amount of requested capacity is 95 bytes. |
| 276 | /// |
| 277 | /// // Calling `reserve_capacity` with `100` again essentially requests an |
| 278 | /// // additional 5 bytes. |
| 279 | /// send_stream.reserve_capacity(100); |
| 280 | /// # } |
| 281 | /// ``` |
| 282 | /// |
| 283 | /// See [Flow control](struct.SendStream.html#flow-control) for an overview |
| 284 | /// of how send flow control works. |
| 285 | pub fn reserve_capacity(&mut self, capacity: usize) { |
| 286 | // TODO: Check for overflow |
| 287 | self.inner.reserve_capacity(capacity as WindowSize) |
| 288 | } |
| 289 | |
| 290 | /// Returns the stream's current send capacity. |
| 291 | /// |
| 292 | /// This allows the caller to check the current amount of available capacity |
| 293 | /// before sending data. |
| 294 | pub fn capacity(&self) -> usize { |
| 295 | self.inner.capacity() as usize |
| 296 | } |
| 297 | |
| 298 | /// Requests to be notified when the stream's capacity increases. |
| 299 | /// |
| 300 | /// Before calling this, capacity should be requested with |
| 301 | /// `reserve_capacity`. Once capacity is requested, the connection will |
| 302 | /// assign capacity to the stream **as it becomes available**. There is no |
| 303 | /// guarantee as to when and in what increments capacity gets assigned to |
| 304 | /// the stream. |
| 305 | /// |
| 306 | /// To get notified when the available capacity increases, the caller calls |
| 307 | /// `poll_capacity`, which returns `Ready(Some(n))` when `n` has been |
| 308 | /// increased by the connection. Note that `n` here represents the **total** |
| 309 | /// amount of assigned capacity at that point in time. It is also possible |
| 310 | /// that `n` is lower than the previous call if, since then, the caller has |
| 311 | /// sent data. |
| 312 | pub fn poll_capacity(&mut self, cx: &mut Context) -> Poll<Option<Result<usize, crate::Error>>> { |
| 313 | self.inner |
| 314 | .poll_capacity(cx) |
| 315 | .map_ok(|w| w as usize) |
| 316 | .map_err(Into::into) |
| 317 | } |
| 318 | |
| 319 | /// Sends a single data frame to the remote peer. |
| 320 | /// |
| 321 | /// This function may be called repeatedly as long as `end_of_stream` is set |
| 322 | /// to `false`. Setting `end_of_stream` to `true` sets the end stream flag |
| 323 | /// on the data frame. Any further calls to `send_data` or `send_trailers` |
| 324 | /// will return an [`Error`]. |
| 325 | /// |
| 326 | /// `send_data` can be called without reserving capacity. In this case, the |
| 327 | /// data is buffered and the capacity is implicitly requested. Once the |
| 328 | /// capacity becomes available, the data is flushed to the connection. |
| 329 | /// However, this buffering is unbounded. As such, sending large amounts of |
| 330 | /// data without reserving capacity before hand could result in large |
| 331 | /// amounts of data being buffered in memory. |
| 332 | /// |
| 333 | /// [`Error`]: struct.Error.html |
| 334 | pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), crate::Error> { |
| 335 | self.inner |
| 336 | .send_data(data, end_of_stream) |
| 337 | .map_err(Into::into) |
| 338 | } |
| 339 | |
| 340 | /// Sends trailers to the remote peer. |
| 341 | /// |
| 342 | /// Sending trailers implicitly closes the send stream. Once the send stream |
| 343 | /// is closed, no more data can be sent. |
| 344 | pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), crate::Error> { |
| 345 | self.inner.send_trailers(trailers).map_err(Into::into) |
| 346 | } |
| 347 | |
| 348 | /// Resets the stream. |
| 349 | /// |
| 350 | /// This cancels the request / response exchange. If the response has not |
| 351 | /// yet been received, the associated `ResponseFuture` will return an |
| 352 | /// [`Error`] to reflect the canceled exchange. |
| 353 | /// |
| 354 | /// [`Error`]: struct.Error.html |
| 355 | pub fn send_reset(&mut self, reason: Reason) { |
| 356 | self.inner.send_reset(reason) |
| 357 | } |
| 358 | |
| 359 | /// Polls to be notified when the client resets this stream. |
| 360 | /// |
| 361 | /// If stream is still open, this returns `Poll::Pending`, and |
| 362 | /// registers the task to be notified if a `RST_STREAM` is received. |
| 363 | /// |
| 364 | /// If a `RST_STREAM` frame is received for this stream, calling this |
| 365 | /// method will yield the `Reason` for the reset. |
| 366 | /// |
| 367 | /// # Error |
| 368 | /// |
| 369 | /// If connection sees an error, this returns that error instead of a |
| 370 | /// `Reason`. |
| 371 | pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> { |
| 372 | self.inner.poll_reset(cx, proto::PollReset::Streaming) |
| 373 | } |
| 374 | |
| 375 | /// Returns the stream ID of this `SendStream`. |
| 376 | /// |
| 377 | /// # Panics |
| 378 | /// |
| 379 | /// If the lock on the stream store has been poisoned. |
| 380 | pub fn stream_id(&self) -> StreamId { |
| 381 | StreamId::from_internal(self.inner.stream_id()) |
| 382 | } |
| 383 | } |
| 384 | |
| 385 | // ===== impl StreamId ===== |
| 386 | |
| 387 | impl StreamId { |
| 388 | pub(crate) fn from_internal(id: crate::frame::StreamId) -> Self { |
| 389 | StreamId(id.into()) |
| 390 | } |
| 391 | |
| 392 | /// Returns the `u32` corresponding to this `StreamId` |
| 393 | /// |
| 394 | /// # Note |
| 395 | /// |
| 396 | /// This is the same as the `From<StreamId>` implementation, but |
| 397 | /// included as an inherent method because that implementation doesn't |
| 398 | /// appear in rustdocs, as well as a way to force the type instead of |
| 399 | /// relying on inference. |
| 400 | pub fn as_u32(&self) -> u32 { |
| 401 | (*self).into() |
| 402 | } |
| 403 | } |
| 404 | // ===== impl RecvStream ===== |
| 405 | |
| 406 | impl RecvStream { |
| 407 | pub(crate) fn new(inner: FlowControl) -> Self { |
| 408 | RecvStream { inner } |
| 409 | } |
| 410 | |
| 411 | /// Get the next data frame. |
| 412 | pub async fn data(&mut self) -> Option<Result<Bytes, crate::Error>> { |
| 413 | crate::poll_fn(move |cx| self.poll_data(cx)).await |
| 414 | } |
| 415 | |
| 416 | /// Get optional trailers for this stream. |
| 417 | pub async fn trailers(&mut self) -> Result<Option<HeaderMap>, crate::Error> { |
| 418 | crate::poll_fn(move |cx| self.poll_trailers(cx)).await |
| 419 | } |
| 420 | |
| 421 | /// Poll for the next data frame. |
| 422 | pub fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, crate::Error>>> { |
| 423 | self.inner.inner.poll_data(cx).map_err(Into::into) |
| 424 | } |
| 425 | |
| 426 | #[doc (hidden)] |
| 427 | pub fn poll_trailers( |
| 428 | &mut self, |
| 429 | cx: &mut Context, |
| 430 | ) -> Poll<Result<Option<HeaderMap>, crate::Error>> { |
| 431 | match ready!(self.inner.inner.poll_trailers(cx)) { |
| 432 | Some(Ok(map)) => Poll::Ready(Ok(Some(map))), |
| 433 | Some(Err(e)) => Poll::Ready(Err(e.into())), |
| 434 | None => Poll::Ready(Ok(None)), |
| 435 | } |
| 436 | } |
| 437 | |
| 438 | /// Returns true if the receive half has reached the end of stream. |
| 439 | /// |
| 440 | /// A return value of `true` means that calls to `poll` and `poll_trailers` |
| 441 | /// will both return `None`. |
| 442 | pub fn is_end_stream(&self) -> bool { |
| 443 | self.inner.inner.is_end_stream() |
| 444 | } |
| 445 | |
| 446 | /// Get a mutable reference to this stream's `FlowControl`. |
| 447 | /// |
| 448 | /// It can be used immediately, or cloned to be used later. |
| 449 | pub fn flow_control(&mut self) -> &mut FlowControl { |
| 450 | &mut self.inner |
| 451 | } |
| 452 | |
| 453 | /// Returns the stream ID of this stream. |
| 454 | /// |
| 455 | /// # Panics |
| 456 | /// |
| 457 | /// If the lock on the stream store has been poisoned. |
| 458 | pub fn stream_id(&self) -> StreamId { |
| 459 | self.inner.stream_id() |
| 460 | } |
| 461 | } |
| 462 | |
| 463 | #[cfg (feature = "stream" )] |
| 464 | impl futures_core::Stream for RecvStream { |
| 465 | type Item = Result<Bytes, crate::Error>; |
| 466 | |
| 467 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 468 | self.poll_data(cx) |
| 469 | } |
| 470 | } |
| 471 | |
| 472 | impl fmt::Debug for RecvStream { |
| 473 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| 474 | fmt&mut DebugStruct<'_, '_>.debug_struct("RecvStream" ) |
| 475 | .field(name:"inner" , &self.inner) |
| 476 | .finish() |
| 477 | } |
| 478 | } |
| 479 | |
| 480 | impl Drop for RecvStream { |
| 481 | fn drop(&mut self) { |
| 482 | // Eagerly clear any received DATA frames now, since its no longer |
| 483 | // possible to retrieve them. However, this will be called |
| 484 | // again once *all* stream refs have been dropped, since |
| 485 | // this won't send a RST_STREAM frame, in case the user wishes to |
| 486 | // still *send* DATA. |
| 487 | self.inner.inner.clear_recv_buffer(); |
| 488 | } |
| 489 | } |
| 490 | |
| 491 | // ===== impl FlowControl ===== |
| 492 | |
| 493 | impl FlowControl { |
| 494 | pub(crate) fn new(inner: proto::OpaqueStreamRef) -> Self { |
| 495 | FlowControl { inner } |
| 496 | } |
| 497 | |
| 498 | /// Returns the stream ID of the stream whose capacity will |
| 499 | /// be released by this `FlowControl`. |
| 500 | pub fn stream_id(&self) -> StreamId { |
| 501 | StreamId::from_internal(self.inner.stream_id()) |
| 502 | } |
| 503 | |
| 504 | /// Get the current available capacity of data this stream *could* receive. |
| 505 | pub fn available_capacity(&self) -> isize { |
| 506 | self.inner.available_recv_capacity() |
| 507 | } |
| 508 | |
| 509 | /// Get the currently *used* capacity for this stream. |
| 510 | /// |
| 511 | /// This is the amount of bytes that can be released back to the remote. |
| 512 | pub fn used_capacity(&self) -> usize { |
| 513 | self.inner.used_recv_capacity() as usize |
| 514 | } |
| 515 | |
| 516 | /// Release window capacity back to remote stream. |
| 517 | /// |
| 518 | /// This releases capacity back to the stream level and the connection level |
| 519 | /// windows. Both window sizes will be increased by `sz`. |
| 520 | /// |
| 521 | /// See [struct level] documentation for more details. |
| 522 | /// |
| 523 | /// # Errors |
| 524 | /// |
| 525 | /// This function errors if increasing the receive window size by `sz` would |
| 526 | /// result in a window size greater than the target window size. In other |
| 527 | /// words, the caller cannot release more capacity than data has been |
| 528 | /// received. If 1024 bytes of data have been received, at most 1024 bytes |
| 529 | /// can be released. |
| 530 | /// |
| 531 | /// [struct level]: # |
| 532 | pub fn release_capacity(&mut self, sz: usize) -> Result<(), crate::Error> { |
| 533 | if sz > proto::MAX_WINDOW_SIZE as usize { |
| 534 | return Err(UserError::ReleaseCapacityTooBig.into()); |
| 535 | } |
| 536 | self.inner |
| 537 | .release_capacity(sz as proto::WindowSize) |
| 538 | .map_err(Into::into) |
| 539 | } |
| 540 | } |
| 541 | |
| 542 | // ===== impl PingPong ===== |
| 543 | |
| 544 | impl PingPong { |
| 545 | pub(crate) fn new(inner: proto::UserPings) -> Self { |
| 546 | PingPong { inner } |
| 547 | } |
| 548 | |
| 549 | /// Send a PING frame and wait for the peer to send the pong. |
| 550 | pub async fn ping(&mut self, ping: Ping) -> Result<Pong, crate::Error> { |
| 551 | self.send_ping(ping)?; |
| 552 | crate::poll_fn(|cx| self.poll_pong(cx)).await |
| 553 | } |
| 554 | |
| 555 | #[doc (hidden)] |
| 556 | pub fn send_ping(&mut self, ping: Ping) -> Result<(), crate::Error> { |
| 557 | // Passing a `Ping` here is just to be forwards-compatible with |
| 558 | // eventually allowing choosing a ping payload. For now, we can |
| 559 | // just ignore it. |
| 560 | let _ = ping; |
| 561 | |
| 562 | self.inner.send_ping().map_err(|err| match err { |
| 563 | Some(err) => err.into(), |
| 564 | None => UserError::SendPingWhilePending.into(), |
| 565 | }) |
| 566 | } |
| 567 | |
| 568 | #[doc (hidden)] |
| 569 | pub fn poll_pong(&mut self, cx: &mut Context) -> Poll<Result<Pong, crate::Error>> { |
| 570 | ready!(self.inner.poll_pong(cx))?; |
| 571 | Poll::Ready(Ok(Pong { _p: () })) |
| 572 | } |
| 573 | } |
| 574 | |
| 575 | impl fmt::Debug for PingPong { |
| 576 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| 577 | fmt.debug_struct(name:"PingPong" ).finish() |
| 578 | } |
| 579 | } |
| 580 | |
| 581 | // ===== impl Ping ===== |
| 582 | |
| 583 | impl Ping { |
| 584 | /// Creates a new opaque `Ping` to be sent via a [`PingPong`][]. |
| 585 | /// |
| 586 | /// The payload is "opaque", such that it shouldn't be depended on. |
| 587 | /// |
| 588 | /// [`PingPong`]: struct.PingPong.html |
| 589 | pub fn opaque() -> Ping { |
| 590 | Ping { _p: () } |
| 591 | } |
| 592 | } |
| 593 | |
| 594 | impl fmt::Debug for Ping { |
| 595 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| 596 | fmt.debug_struct(name:"Ping" ).finish() |
| 597 | } |
| 598 | } |
| 599 | |
| 600 | // ===== impl Pong ===== |
| 601 | |
| 602 | impl fmt::Debug for Pong { |
| 603 | fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { |
| 604 | fmt.debug_struct(name:"Pong" ).finish() |
| 605 | } |
| 606 | } |
| 607 | |