1 | use core::pin::Pin; |
2 | use futures_core::stream::{FusedStream, Stream, TryStream}; |
3 | use futures_core::task::{Context, Poll}; |
4 | #[cfg (feature = "sink" )] |
5 | use futures_sink::Sink; |
6 | use pin_project_lite::pin_project; |
7 | |
8 | pin_project! { |
9 | /// Stream for the [`into_stream`](super::TryStreamExt::into_stream) method. |
10 | #[derive(Debug)] |
11 | #[must_use = "streams do nothing unless polled" ] |
12 | pub struct IntoStream<St> { |
13 | #[pin] |
14 | stream: St, |
15 | } |
16 | } |
17 | |
18 | impl<St> IntoStream<St> { |
19 | #[inline ] |
20 | pub(super) fn new(stream: St) -> Self { |
21 | Self { stream } |
22 | } |
23 | |
24 | delegate_access_inner!(stream, St, ()); |
25 | } |
26 | |
27 | impl<St: TryStream + FusedStream> FusedStream for IntoStream<St> { |
28 | fn is_terminated(&self) -> bool { |
29 | self.stream.is_terminated() |
30 | } |
31 | } |
32 | |
33 | impl<St: TryStream> Stream for IntoStream<St> { |
34 | type Item = Result<St::Ok, St::Error>; |
35 | |
36 | #[inline ] |
37 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
38 | self.project().stream.try_poll_next(cx) |
39 | } |
40 | |
41 | fn size_hint(&self) -> (usize, Option<usize>) { |
42 | self.stream.size_hint() |
43 | } |
44 | } |
45 | |
46 | // Forwarding impl of Sink from the underlying stream |
47 | #[cfg (feature = "sink" )] |
48 | impl<S: Sink<Item>, Item> Sink<Item> for IntoStream<S> { |
49 | type Error = S::Error; |
50 | |
51 | delegate_sink!(stream, Item); |
52 | } |
53 | |