1use crate::Stream;
2
3use core::future::Future;
4use core::marker::PhantomPinned;
5use core::mem;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use pin_project_lite::pin_project;
9
10// Do not export this struct until `FromStream` can be unsealed.
11pin_project! {
12 /// Future returned by the [`collect`](super::StreamExt::collect) method.
13 #[must_use = "futures do nothing unless you `.await` or poll them"]
14 #[derive(Debug)]
15 pub struct Collect<T, U>
16 where
17 T: Stream,
18 U: FromStream<T::Item>,
19 {
20 #[pin]
21 stream: T,
22 collection: U::InternalCollection,
23 // Make this future `!Unpin` for compatibility with async trait methods.
24 #[pin]
25 _pin: PhantomPinned,
26 }
27}
28
29/// Convert from a [`Stream`].
30///
31/// This trait is not intended to be used directly. Instead, call
32/// [`StreamExt::collect()`](super::StreamExt::collect).
33///
34/// # Implementing
35///
36/// Currently, this trait may not be implemented by third parties. The trait is
37/// sealed in order to make changes in the future. Stabilization is pending
38/// enhancements to the Rust language.
39pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
40
41impl<T, U> Collect<T, U>
42where
43 T: Stream,
44 U: FromStream<T::Item>,
45{
46 pub(super) fn new(stream: T) -> Collect<T, U> {
47 let (lower, upper) = stream.size_hint();
48 let collection = U::initialize(sealed::Internal, lower, upper);
49
50 Collect {
51 stream,
52 collection,
53 _pin: PhantomPinned,
54 }
55 }
56}
57
58impl<T, U> Future for Collect<T, U>
59where
60 T: Stream,
61 U: FromStream<T::Item>,
62{
63 type Output = U;
64
65 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
66 use Poll::Ready;
67
68 loop {
69 let me = self.as_mut().project();
70
71 let item = match ready!(me.stream.poll_next(cx)) {
72 Some(item) => item,
73 None => {
74 return Ready(U::finalize(sealed::Internal, me.collection));
75 }
76 };
77
78 if !U::extend(sealed::Internal, me.collection, item) {
79 return Ready(U::finalize(sealed::Internal, me.collection));
80 }
81 }
82 }
83}
84
85// ===== FromStream implementations
86
87impl FromStream<()> for () {}
88
89impl sealed::FromStreamPriv<()> for () {
90 type InternalCollection = ();
91
92 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}
93
94 fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
95 true
96 }
97
98 fn finalize(_: sealed::Internal, _collection: &mut ()) {}
99}
100
101impl<T: AsRef<str>> FromStream<T> for String {}
102
103impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
104 type InternalCollection = String;
105
106 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
107 String::new()
108 }
109
110 fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
111 collection.push_str(item.as_ref());
112 true
113 }
114
115 fn finalize(_: sealed::Internal, collection: &mut String) -> String {
116 mem::take(collection)
117 }
118}
119
120impl<T> FromStream<T> for Vec<T> {}
121
122impl<T> sealed::FromStreamPriv<T> for Vec<T> {
123 type InternalCollection = Vec<T>;
124
125 fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
126 Vec::with_capacity(lower)
127 }
128
129 fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
130 collection.push(item);
131 true
132 }
133
134 fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
135 mem::take(collection)
136 }
137}
138
139impl<T> FromStream<T> for Box<[T]> {}
140
141impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
142 type InternalCollection = Vec<T>;
143
144 fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
145 <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
146 }
147
148 fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
149 <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
150 }
151
152 fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
153 <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
154 .into_boxed_slice()
155 }
156}
157
158impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
159
160impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
161where
162 U: FromStream<T>,
163{
164 type InternalCollection = Result<U::InternalCollection, E>;
165
166 fn initialize(
167 _: sealed::Internal,
168 lower: usize,
169 upper: Option<usize>,
170 ) -> Result<U::InternalCollection, E> {
171 Ok(U::initialize(sealed::Internal, lower, upper))
172 }
173
174 fn extend(
175 _: sealed::Internal,
176 collection: &mut Self::InternalCollection,
177 item: Result<T, E>,
178 ) -> bool {
179 assert!(collection.is_ok());
180 match item {
181 Ok(item) => {
182 let collection = collection.as_mut().ok().expect("invalid state");
183 U::extend(sealed::Internal, collection, item)
184 }
185 Err(err) => {
186 *collection = Err(err);
187 false
188 }
189 }
190 }
191
192 fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
193 if let Ok(collection) = collection.as_mut() {
194 Ok(U::finalize(sealed::Internal, collection))
195 } else {
196 let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
197
198 Err(res.map(drop).unwrap_err())
199 }
200 }
201}
202
203pub(crate) mod sealed {
204 #[doc(hidden)]
205 pub trait FromStreamPriv<T> {
206 /// Intermediate type used during collection process
207 ///
208 /// The name of this type is internal and cannot be relied upon.
209 type InternalCollection;
210
211 /// Initialize the collection
212 fn initialize(
213 internal: Internal,
214 lower: usize,
215 upper: Option<usize>,
216 ) -> Self::InternalCollection;
217
218 /// Extend the collection with the received item
219 ///
220 /// Return `true` to continue streaming, `false` complete collection.
221 fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;
222
223 /// Finalize collection into target type.
224 fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
225 }
226
227 #[allow(missing_debug_implementations)]
228 pub struct Internal;
229}
230