1 | use std::{ |
2 | collections::VecDeque, |
3 | future::Future, |
4 | pin::Pin, |
5 | task::{Context, Poll}, |
6 | }; |
7 | |
8 | use super::Body; |
9 | |
10 | use bytes::{Buf, Bytes}; |
11 | use http::HeaderMap; |
12 | use pin_project_lite::pin_project; |
13 | |
14 | pin_project! { |
15 | /// Future that resolves into a [`Collected`]. |
16 | pub struct Collect<T> |
17 | where |
18 | T: Body, |
19 | { |
20 | #[pin] |
21 | body: T, |
22 | collected: Option<Collected<T::Data>>, |
23 | is_data_done: bool, |
24 | } |
25 | } |
26 | |
27 | impl<T: Body> Collect<T> { |
28 | pub(crate) fn new(body: T) -> Self { |
29 | Self { |
30 | body, |
31 | collected: Some(Collected::default()), |
32 | is_data_done: false, |
33 | } |
34 | } |
35 | } |
36 | |
37 | impl<T: Body> Future for Collect<T> { |
38 | type Output = Result<Collected<T::Data>, T::Error>; |
39 | |
40 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
41 | let mut me = self.project(); |
42 | |
43 | loop { |
44 | if !*me.is_data_done { |
45 | match me.body.as_mut().poll_data(cx) { |
46 | Poll::Ready(Some(Ok(data))) => { |
47 | me.collected.as_mut().unwrap().push_data(data); |
48 | } |
49 | Poll::Ready(Some(Err(err))) => { |
50 | return Poll::Ready(Err(err)); |
51 | } |
52 | Poll::Ready(None) => { |
53 | *me.is_data_done = true; |
54 | } |
55 | Poll::Pending => return Poll::Pending, |
56 | } |
57 | } else { |
58 | match me.body.as_mut().poll_trailers(cx) { |
59 | Poll::Ready(Ok(Some(trailers))) => { |
60 | me.collected.as_mut().unwrap().push_trailers(trailers); |
61 | break; |
62 | } |
63 | Poll::Ready(Err(err)) => { |
64 | return Poll::Ready(Err(err)); |
65 | } |
66 | Poll::Ready(Ok(None)) => break, |
67 | Poll::Pending => return Poll::Pending, |
68 | } |
69 | } |
70 | } |
71 | |
72 | Poll::Ready(Ok(me.collected.take().expect("polled after complete" ))) |
73 | } |
74 | } |
75 | |
76 | /// A collected body produced by [`Body::collect`] which collects all the DATA frames |
77 | /// and trailers. |
78 | #[derive (Debug)] |
79 | pub struct Collected<B> { |
80 | bufs: BufList<B>, |
81 | trailers: Option<HeaderMap>, |
82 | } |
83 | |
84 | impl<B: Buf> Collected<B> { |
85 | /// If there is a trailers frame buffered, returns a reference to it. |
86 | /// |
87 | /// Returns `None` if the body contained no trailers. |
88 | pub fn trailers(&self) -> Option<&HeaderMap> { |
89 | self.trailers.as_ref() |
90 | } |
91 | |
92 | /// Aggregate this buffered into a [`Buf`]. |
93 | pub fn aggregate(self) -> impl Buf { |
94 | self.bufs |
95 | } |
96 | |
97 | /// Convert this body into a [`Bytes`]. |
98 | pub fn to_bytes(mut self) -> Bytes { |
99 | self.bufs.copy_to_bytes(self.bufs.remaining()) |
100 | } |
101 | |
102 | fn push_data(&mut self, data: B) { |
103 | // Only push this frame if it has some data in it, to avoid crashing on |
104 | // `BufList::push`. |
105 | if data.has_remaining() { |
106 | self.bufs.push(data); |
107 | } |
108 | } |
109 | |
110 | fn push_trailers(&mut self, trailers: HeaderMap) { |
111 | if let Some(current) = &mut self.trailers { |
112 | current.extend(trailers); |
113 | } else { |
114 | self.trailers = Some(trailers); |
115 | } |
116 | } |
117 | } |
118 | |
119 | impl<B> Default for Collected<B> { |
120 | fn default() -> Self { |
121 | Self { |
122 | bufs: BufList::default(), |
123 | trailers: None, |
124 | } |
125 | } |
126 | } |
127 | |
128 | impl<B> Unpin for Collected<B> {} |
129 | |
/// A FIFO queue of buffers that is itself readable as one [`Buf`].
#[derive(Debug)]
struct BufList<T> {
    // Queued buffers, front first. `push` expects every entry to have bytes remaining.
    bufs: VecDeque<T>,
}
134 | |
135 | impl<T: Buf> BufList<T> { |
136 | #[inline ] |
137 | pub(crate) fn push(&mut self, buf: T) { |
138 | debug_assert!(buf.has_remaining()); |
139 | self.bufs.push_back(buf); |
140 | } |
141 | |
142 | /* |
143 | #[inline] |
144 | pub(crate) fn pop(&mut self) -> Option<T> { |
145 | self.bufs.pop_front() |
146 | } |
147 | */ |
148 | } |
149 | |
150 | impl<T: Buf> Buf for BufList<T> { |
151 | #[inline ] |
152 | fn remaining(&self) -> usize { |
153 | self.bufs.iter().map(|buf| buf.remaining()).sum() |
154 | } |
155 | |
156 | #[inline ] |
157 | fn chunk(&self) -> &[u8] { |
158 | self.bufs.front().map(Buf::chunk).unwrap_or_default() |
159 | } |
160 | |
161 | #[inline ] |
162 | fn advance(&mut self, mut cnt: usize) { |
163 | while cnt > 0 { |
164 | { |
165 | let front = &mut self.bufs[0]; |
166 | let rem = front.remaining(); |
167 | if rem > cnt { |
168 | front.advance(cnt); |
169 | return; |
170 | } else { |
171 | front.advance(rem); |
172 | cnt -= rem; |
173 | } |
174 | } |
175 | self.bufs.pop_front(); |
176 | } |
177 | } |
178 | |
179 | #[inline ] |
180 | fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize { |
181 | if dst.is_empty() { |
182 | return 0; |
183 | } |
184 | let mut vecs = 0; |
185 | for buf in &self.bufs { |
186 | vecs += buf.chunks_vectored(&mut dst[vecs..]); |
187 | if vecs == dst.len() { |
188 | break; |
189 | } |
190 | } |
191 | vecs |
192 | } |
193 | |
194 | #[inline ] |
195 | fn copy_to_bytes(&mut self, len: usize) -> Bytes { |
196 | use bytes::{BufMut, BytesMut}; |
197 | // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole |
198 | // request can be fulfilled by the front buffer, we can take advantage. |
199 | match self.bufs.front_mut() { |
200 | Some(front) if front.remaining() == len => { |
201 | let b = front.copy_to_bytes(len); |
202 | self.bufs.pop_front(); |
203 | b |
204 | } |
205 | Some(front) if front.remaining() > len => front.copy_to_bytes(len), |
206 | _ => { |
207 | assert!(len <= self.remaining(), "`len` greater than remaining" ); |
208 | let mut bm = BytesMut::with_capacity(len); |
209 | bm.put(self.take(len)); |
210 | bm.freeze() |
211 | } |
212 | } |
213 | } |
214 | } |
215 | |
216 | impl<T> Default for BufList<T> { |
217 | fn default() -> Self { |
218 | BufList { |
219 | bufs: VecDeque::new(), |
220 | } |
221 | } |
222 | } |
223 | |