| 1 | use bytes::Buf; |
| 2 | use futures_core::{ready, stream::Stream}; |
| 3 | use http_body::{Body, Frame}; |
| 4 | use pin_project_lite::pin_project; |
| 5 | use std::{ |
| 6 | pin::Pin, |
| 7 | task::{Context, Poll}, |
| 8 | }; |
| 9 | |
| 10 | pin_project! { |
| 11 | /// A body created from a [`Stream`]. |
| 12 | #[derive (Clone, Copy, Debug)] |
| 13 | pub struct StreamBody<S> { |
| 14 | #[pin] |
| 15 | stream: S, |
| 16 | } |
| 17 | } |
| 18 | |
| 19 | impl<S> StreamBody<S> { |
| 20 | /// Create a new `StreamBody`. |
| 21 | pub fn new(stream: S) -> Self { |
| 22 | Self { stream } |
| 23 | } |
| 24 | } |
| 25 | |
| 26 | impl<S, D, E> Body for StreamBody<S> |
| 27 | where |
| 28 | S: Stream<Item = Result<Frame<D>, E>>, |
| 29 | D: Buf, |
| 30 | { |
| 31 | type Data = D; |
| 32 | type Error = E; |
| 33 | |
| 34 | fn poll_frame( |
| 35 | self: Pin<&mut Self>, |
| 36 | cx: &mut Context<'_>, |
| 37 | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { |
| 38 | match self.project().stream.poll_next(cx) { |
| 39 | Poll::Ready(Some(result: Result, E>)) => Poll::Ready(Some(result)), |
| 40 | Poll::Ready(None) => Poll::Ready(None), |
| 41 | Poll::Pending => Poll::Pending, |
| 42 | } |
| 43 | } |
| 44 | } |
| 45 | |
| 46 | impl<S: Stream> Stream for StreamBody<S> { |
| 47 | type Item = S::Item; |
| 48 | |
| 49 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 50 | self.project().stream.poll_next(cx) |
| 51 | } |
| 52 | |
| 53 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 54 | self.stream.size_hint() |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | pin_project! { |
| 59 | /// A stream created from a [`Body`]. |
| 60 | #[derive (Clone, Copy, Debug)] |
| 61 | pub struct BodyStream<B> { |
| 62 | #[pin] |
| 63 | body: B, |
| 64 | } |
| 65 | } |
| 66 | |
| 67 | impl<B> BodyStream<B> { |
| 68 | /// Create a new `BodyStream`. |
| 69 | pub fn new(body: B) -> Self { |
| 70 | Self { body } |
| 71 | } |
| 72 | } |
| 73 | |
| 74 | impl<B> Body for BodyStream<B> |
| 75 | where |
| 76 | B: Body, |
| 77 | { |
| 78 | type Data = B::Data; |
| 79 | type Error = B::Error; |
| 80 | |
| 81 | fn poll_frame( |
| 82 | self: Pin<&mut Self>, |
| 83 | cx: &mut Context<'_>, |
| 84 | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { |
| 85 | self.project().body.poll_frame(cx) |
| 86 | } |
| 87 | } |
| 88 | |
| 89 | impl<B> Stream for BodyStream<B> |
| 90 | where |
| 91 | B: Body, |
| 92 | { |
| 93 | type Item = Result<Frame<B::Data>, B::Error>; |
| 94 | |
| 95 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 96 | match self.project().body.poll_frame(cx) { |
| 97 | Poll::Ready(Some(frame: Result::Data>, …>)) => Poll::Ready(Some(frame)), |
| 98 | Poll::Ready(None) => Poll::Ready(None), |
| 99 | Poll::Pending => Poll::Pending, |
| 100 | } |
| 101 | } |
| 102 | } |
| 103 | |
| 104 | pin_project! { |
| 105 | /// A data stream created from a [`Body`]. |
| 106 | #[derive (Clone, Copy, Debug)] |
| 107 | pub struct BodyDataStream<B> { |
| 108 | #[pin] |
| 109 | body: B, |
| 110 | } |
| 111 | } |
| 112 | |
| 113 | impl<B> BodyDataStream<B> { |
| 114 | /// Create a new `BodyDataStream` |
| 115 | pub fn new(body: B) -> Self { |
| 116 | Self { body } |
| 117 | } |
| 118 | } |
| 119 | |
| 120 | impl<B> Stream for BodyDataStream<B> |
| 121 | where |
| 122 | B: Body, |
| 123 | { |
| 124 | type Item = Result<B::Data, B::Error>; |
| 125 | |
| 126 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 127 | loop { |
| 128 | return match ready!(self.as_mut().project().body.poll_frame(cx)) { |
| 129 | Some(Ok(frame: Frame<::Data>)) => match frame.into_data() { |
| 130 | Ok(bytes: ::Data) => Poll::Ready(Some(Ok(bytes))), |
| 131 | Err(_) => continue, |
| 132 | }, |
| 133 | Some(Err(err: ::Error)) => Poll::Ready(Some(Err(err))), |
| 134 | None => Poll::Ready(None), |
| 135 | }; |
| 136 | } |
| 137 | } |
| 138 | } |
| 139 | |
#[cfg(test)]
mod tests {
    use crate::{BodyExt, BodyStream, StreamBody};
    use bytes::Bytes;
    use futures_util::StreamExt;
    use http_body::Frame;
    use std::convert::Infallible;

    /// Builds three single-byte data frames containing 1, 2 and 3.
    fn sample_frames() -> Vec<Result<Frame<Bytes>, Infallible>> {
        (1u8..=3)
            .map(|byte| Ok(Frame::data(Bytes::from(vec![byte]))))
            .collect()
    }

    /// A `StreamBody` yields the stream's frames in order and then ends.
    #[tokio::test]
    async fn body_from_stream() {
        let stream = futures_util::stream::iter(sample_frames());
        let mut body = StreamBody::new(stream);

        for expected in 1u8..=3 {
            let frame = body.frame().await.unwrap().unwrap();
            assert_eq!(frame.into_data().unwrap().as_ref(), [expected]);
        }

        assert!(body.frame().await.is_none());
    }

    /// A `BodyStream` yields the body's frames in order and then ends.
    #[tokio::test]
    async fn stream_from_body() {
        let body = StreamBody::new(futures_util::stream::iter(sample_frames()));
        let mut stream = BodyStream::new(body);

        for expected in 1u8..=3 {
            let frame = stream.next().await.unwrap().unwrap();
            assert_eq!(frame.into_data().unwrap().as_ref(), [expected]);
        }

        assert!(stream.next().await.is_none());
    }
}
| 241 | |