1 | use bytes::Buf; |
2 | use futures_core::{ready, stream::Stream}; |
3 | use http_body::{Body, Frame}; |
4 | use pin_project_lite::pin_project; |
5 | use std::{ |
6 | pin::Pin, |
7 | task::{Context, Poll}, |
8 | }; |
9 | |
10 | pin_project! { |
11 | /// A body created from a [`Stream`]. |
12 | #[derive (Clone, Copy, Debug)] |
13 | pub struct StreamBody<S> { |
14 | #[pin] |
15 | stream: S, |
16 | } |
17 | } |
18 | |
19 | impl<S> StreamBody<S> { |
20 | /// Create a new `StreamBody`. |
21 | pub fn new(stream: S) -> Self { |
22 | Self { stream } |
23 | } |
24 | } |
25 | |
26 | impl<S, D, E> Body for StreamBody<S> |
27 | where |
28 | S: Stream<Item = Result<Frame<D>, E>>, |
29 | D: Buf, |
30 | { |
31 | type Data = D; |
32 | type Error = E; |
33 | |
34 | fn poll_frame( |
35 | self: Pin<&mut Self>, |
36 | cx: &mut Context<'_>, |
37 | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { |
38 | match self.project().stream.poll_next(cx) { |
39 | Poll::Ready(Some(result: Result, E>)) => Poll::Ready(Some(result)), |
40 | Poll::Ready(None) => Poll::Ready(None), |
41 | Poll::Pending => Poll::Pending, |
42 | } |
43 | } |
44 | } |
45 | |
46 | impl<S: Stream> Stream for StreamBody<S> { |
47 | type Item = S::Item; |
48 | |
49 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
50 | self.project().stream.poll_next(cx) |
51 | } |
52 | |
53 | fn size_hint(&self) -> (usize, Option<usize>) { |
54 | self.stream.size_hint() |
55 | } |
56 | } |
57 | |
58 | pin_project! { |
59 | /// A stream created from a [`Body`]. |
60 | #[derive (Clone, Copy, Debug)] |
61 | pub struct BodyStream<B> { |
62 | #[pin] |
63 | body: B, |
64 | } |
65 | } |
66 | |
67 | impl<B> BodyStream<B> { |
68 | /// Create a new `BodyStream`. |
69 | pub fn new(body: B) -> Self { |
70 | Self { body } |
71 | } |
72 | } |
73 | |
74 | impl<B> Body for BodyStream<B> |
75 | where |
76 | B: Body, |
77 | { |
78 | type Data = B::Data; |
79 | type Error = B::Error; |
80 | |
81 | fn poll_frame( |
82 | self: Pin<&mut Self>, |
83 | cx: &mut Context<'_>, |
84 | ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { |
85 | self.project().body.poll_frame(cx) |
86 | } |
87 | } |
88 | |
89 | impl<B> Stream for BodyStream<B> |
90 | where |
91 | B: Body, |
92 | { |
93 | type Item = Result<Frame<B::Data>, B::Error>; |
94 | |
95 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
96 | match self.project().body.poll_frame(cx) { |
97 | Poll::Ready(Some(frame: Result::Data>, …>)) => Poll::Ready(Some(frame)), |
98 | Poll::Ready(None) => Poll::Ready(None), |
99 | Poll::Pending => Poll::Pending, |
100 | } |
101 | } |
102 | } |
103 | |
104 | pin_project! { |
105 | /// A data stream created from a [`Body`]. |
106 | #[derive (Clone, Copy, Debug)] |
107 | pub struct BodyDataStream<B> { |
108 | #[pin] |
109 | body: B, |
110 | } |
111 | } |
112 | |
113 | impl<B> BodyDataStream<B> { |
114 | /// Create a new `BodyDataStream` |
115 | pub fn new(body: B) -> Self { |
116 | Self { body } |
117 | } |
118 | } |
119 | |
120 | impl<B> Stream for BodyDataStream<B> |
121 | where |
122 | B: Body, |
123 | { |
124 | type Item = Result<B::Data, B::Error>; |
125 | |
126 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
127 | loop { |
128 | return match ready!(self.as_mut().project().body.poll_frame(cx)) { |
129 | Some(Ok(frame: Frame<::Data>)) => match frame.into_data() { |
130 | Ok(bytes: ::Data) => Poll::Ready(Some(Ok(bytes))), |
131 | Err(_) => continue, |
132 | }, |
133 | Some(Err(err: ::Error)) => Poll::Ready(Some(Err(err))), |
134 | None => Poll::Ready(None), |
135 | }; |
136 | } |
137 | } |
138 | } |
139 | |
#[cfg(test)]
mod tests {
    use crate::{BodyExt, BodyStream, StreamBody};
    use bytes::Bytes;
    use futures_util::StreamExt;
    use http_body::Frame;
    use std::convert::Infallible;

    // Payload bytes of the single-byte data frames fed through the adapters.
    const PAYLOADS: [u8; 3] = [1, 2, 3];

    /// A `StreamBody` built from a stream of data frames returns those frames
    /// in order and then reports the end of the body.
    #[tokio::test]
    async fn body_from_stream() {
        let chunks: Vec<Result<_, Infallible>> = PAYLOADS
            .iter()
            .map(|&byte| Ok(Frame::data(Bytes::from(vec![byte]))))
            .collect();
        let mut body = StreamBody::new(futures_util::stream::iter(chunks));

        for &expected in PAYLOADS.iter() {
            let frame = body.frame().await.unwrap().unwrap();
            let data = frame.into_data().unwrap();
            assert_eq!(data.as_ref(), [expected]);
        }

        assert!(body.frame().await.is_none());
    }

    /// A `BodyStream` wrapped around a body yields that body's frames in
    /// order and then terminates.
    #[tokio::test]
    async fn stream_from_body() {
        let chunks: Vec<Result<_, Infallible>> = PAYLOADS
            .iter()
            .map(|&byte| Ok(Frame::data(Bytes::from(vec![byte]))))
            .collect();
        let body = StreamBody::new(futures_util::stream::iter(chunks));

        let mut stream = BodyStream::new(body);

        for &expected in PAYLOADS.iter() {
            let frame = stream.next().await.unwrap().unwrap();
            let data = frame.into_data().unwrap();
            assert_eq!(data.as_ref(), [expected]);
        }

        assert!(stream.next().await.is_none());
    }
}
241 | |