1 | use crate::stream::Fuse; |
2 | use core::pin::Pin; |
3 | use futures_core::future::{FusedFuture, Future}; |
4 | use futures_core::ready; |
5 | use futures_core::stream::Stream; |
6 | use futures_core::task::{Context, Poll}; |
7 | use futures_sink::Sink; |
8 | use pin_project_lite::pin_project; |
9 | |
pin_project! {
    /// Future for the [`forward`](super::StreamExt::forward) method.
    ///
    /// Pulls `Result<Item, E>` values out of a stream and pushes each `Ok` item
    /// into a sink. It resolves to `Ok(())` once the stream has ended and the
    /// sink has been closed, or to the first error produced by either side.
    #[project = ForwardProj]
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct Forward<St, Si, Item> {
        // Destination sink. Starts as `Some` and is reset to `None` after the
        // sink has been closed successfully, which marks the future as done.
        #[pin]
        sink: Option<Si>,
        // Source stream, wrapped in `Fuse`.
        #[pin]
        stream: Fuse<St>,
        // Item already taken from the stream but not yet handed to the sink.
        buffered_item: Option<Item>,
    }
}
23 | |
24 | impl<St, Si, Item> Forward<St, Si, Item> { |
25 | pub(crate) fn new(stream: St, sink: Si) -> Self { |
26 | Self { sink: Some(sink), stream: Fuse::new(stream), buffered_item: None } |
27 | } |
28 | } |
29 | |
30 | impl<St, Si, Item, E> FusedFuture for Forward<St, Si, Item> |
31 | where |
32 | Si: Sink<Item, Error = E>, |
33 | St: Stream<Item = Result<Item, E>>, |
34 | { |
35 | fn is_terminated(&self) -> bool { |
36 | self.sink.is_none() |
37 | } |
38 | } |
39 | |
40 | impl<St, Si, Item, E> Future for Forward<St, Si, Item> |
41 | where |
42 | Si: Sink<Item, Error = E>, |
43 | St: Stream<Item = Result<Item, E>>, |
44 | { |
45 | type Output = Result<(), E>; |
46 | |
47 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
48 | let ForwardProj { mut sink, mut stream, buffered_item } = self.project(); |
49 | let mut si = sink.as_mut().as_pin_mut().expect("polled `Forward` after completion" ); |
50 | |
51 | loop { |
52 | // If we've got an item buffered already, we need to write it to the |
53 | // sink before we can do anything else |
54 | if buffered_item.is_some() { |
55 | ready!(si.as_mut().poll_ready(cx))?; |
56 | si.as_mut().start_send(buffered_item.take().unwrap())?; |
57 | } |
58 | |
59 | match stream.as_mut().poll_next(cx)? { |
60 | Poll::Ready(Some(item)) => { |
61 | *buffered_item = Some(item); |
62 | } |
63 | Poll::Ready(None) => { |
64 | ready!(si.poll_close(cx))?; |
65 | sink.set(None); |
66 | return Poll::Ready(Ok(())); |
67 | } |
68 | Poll::Pending => { |
69 | ready!(si.poll_flush(cx))?; |
70 | return Poll::Pending; |
71 | } |
72 | } |
73 | } |
74 | } |
75 | } |
76 | |