1use std::{
2 collections::VecDeque,
3 future::Future,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use super::Body;
9
10use bytes::{Buf, Bytes};
11use http::HeaderMap;
12use pin_project_lite::pin_project;
13
pin_project! {
    /// Future that resolves into a [`Collected`].
    ///
    /// Each poll drains DATA frames from the wrapped body and, once the data stream has
    /// ended, polls its trailers, accumulating everything into a single [`Collected`].
    pub struct Collect<T>
    where
        T: Body,
    {
        // The body being drained; pinned because it is polled in place.
        #[pin]
        body: T,
        // Frames gathered so far. Starts as `Some` and is taken when the future resolves.
        collected: Option<Collected<T::Data>>,
        // Set once `poll_data` has yielded `None`; from then on only trailers are polled.
        is_data_done: bool,
    }
}
26
27impl<T: Body> Collect<T> {
28 pub(crate) fn new(body: T) -> Self {
29 Self {
30 body,
31 collected: Some(Collected::default()),
32 is_data_done: false,
33 }
34 }
35}
36
37impl<T: Body> Future for Collect<T> {
38 type Output = Result<Collected<T::Data>, T::Error>;
39
40 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41 let mut me = self.project();
42
43 loop {
44 if !*me.is_data_done {
45 match me.body.as_mut().poll_data(cx) {
46 Poll::Ready(Some(Ok(data))) => {
47 me.collected.as_mut().unwrap().push_data(data);
48 }
49 Poll::Ready(Some(Err(err))) => {
50 return Poll::Ready(Err(err));
51 }
52 Poll::Ready(None) => {
53 *me.is_data_done = true;
54 }
55 Poll::Pending => return Poll::Pending,
56 }
57 } else {
58 match me.body.as_mut().poll_trailers(cx) {
59 Poll::Ready(Ok(Some(trailers))) => {
60 me.collected.as_mut().unwrap().push_trailers(trailers);
61 break;
62 }
63 Poll::Ready(Err(err)) => {
64 return Poll::Ready(Err(err));
65 }
66 Poll::Ready(Ok(None)) => break,
67 Poll::Pending => return Poll::Pending,
68 }
69 }
70 }
71
72 Poll::Ready(Ok(me.collected.take().expect("polled after complete")))
73 }
74}
75
/// A collected body produced by [`Body::collect`] which collects all the DATA frames
/// and trailers.
#[derive(Debug)]
pub struct Collected<B> {
    /// Buffered DATA frames, in the order they were pushed.
    bufs: BufList<B>,
    /// Trailers received so far, or `None` if no trailers were pushed.
    trailers: Option<HeaderMap>,
}
83
84impl<B: Buf> Collected<B> {
85 /// If there is a trailers frame buffered, returns a reference to it.
86 ///
87 /// Returns `None` if the body contained no trailers.
88 pub fn trailers(&self) -> Option<&HeaderMap> {
89 self.trailers.as_ref()
90 }
91
92 /// Aggregate this buffered into a [`Buf`].
93 pub fn aggregate(self) -> impl Buf {
94 self.bufs
95 }
96
97 /// Convert this body into a [`Bytes`].
98 pub fn to_bytes(mut self) -> Bytes {
99 self.bufs.copy_to_bytes(self.bufs.remaining())
100 }
101
102 fn push_data(&mut self, data: B) {
103 // Only push this frame if it has some data in it, to avoid crashing on
104 // `BufList::push`.
105 if data.has_remaining() {
106 self.bufs.push(data);
107 }
108 }
109
110 fn push_trailers(&mut self, trailers: HeaderMap) {
111 if let Some(current) = &mut self.trailers {
112 current.extend(trailers);
113 } else {
114 self.trailers = Some(trailers);
115 }
116 }
117}
118
119impl<B> Default for Collected<B> {
120 fn default() -> Self {
121 Self {
122 bufs: BufList::default(),
123 trailers: None,
124 }
125 }
126}
127
// `Collected` is `Unpin` for every `B`: this impl deliberately carries no `B: Unpin` bound.
// NOTE(review): presumably fine because the stored buffers are never pinned — confirm.
impl<B> Unpin for Collected<B> {}
129
/// A queue of buffers that is consumed from the front and appended to at the back.
#[derive(Debug)]
struct BufList<T> {
    /// The queued buffers, oldest first.
    bufs: VecDeque<T>,
}
134
impl<T: Buf> BufList<T> {
    /// Appends `buf` to the back of the list.
    ///
    /// `buf` is expected to have bytes remaining. This is only checked by a
    /// `debug_assert!`, so builds without debug assertions will accept an empty buffer.
    #[inline]
    pub(crate) fn push(&mut self, buf: T) {
        debug_assert!(buf.has_remaining());
        self.bufs.push_back(buf);
    }
}
149
150impl<T: Buf> Buf for BufList<T> {
151 #[inline]
152 fn remaining(&self) -> usize {
153 self.bufs.iter().map(|buf| buf.remaining()).sum()
154 }
155
156 #[inline]
157 fn chunk(&self) -> &[u8] {
158 self.bufs.front().map(Buf::chunk).unwrap_or_default()
159 }
160
161 #[inline]
162 fn advance(&mut self, mut cnt: usize) {
163 while cnt > 0 {
164 {
165 let front = &mut self.bufs[0];
166 let rem = front.remaining();
167 if rem > cnt {
168 front.advance(cnt);
169 return;
170 } else {
171 front.advance(rem);
172 cnt -= rem;
173 }
174 }
175 self.bufs.pop_front();
176 }
177 }
178
179 #[inline]
180 fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize {
181 if dst.is_empty() {
182 return 0;
183 }
184 let mut vecs = 0;
185 for buf in &self.bufs {
186 vecs += buf.chunks_vectored(&mut dst[vecs..]);
187 if vecs == dst.len() {
188 break;
189 }
190 }
191 vecs
192 }
193
194 #[inline]
195 fn copy_to_bytes(&mut self, len: usize) -> Bytes {
196 use bytes::{BufMut, BytesMut};
197 // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole
198 // request can be fulfilled by the front buffer, we can take advantage.
199 match self.bufs.front_mut() {
200 Some(front) if front.remaining() == len => {
201 let b = front.copy_to_bytes(len);
202 self.bufs.pop_front();
203 b
204 }
205 Some(front) if front.remaining() > len => front.copy_to_bytes(len),
206 _ => {
207 assert!(len <= self.remaining(), "`len` greater than remaining");
208 let mut bm = BytesMut::with_capacity(len);
209 bm.put(self.take(len));
210 bm.freeze()
211 }
212 }
213 }
214}
215
216impl<T> Default for BufList<T> {
217 fn default() -> Self {
218 BufList {
219 bufs: VecDeque::new(),
220 }
221 }
222}
223