1use crate::yielder::Receiver;
2
3use futures_core::{FusedStream, Stream};
4use pin_project_lite::pin_project;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9pin_project! {
10 #[doc(hidden)]
11 #[derive(Debug)]
12 pub struct AsyncStream<T, U> {
13 rx: Receiver<T>,
14 done: bool,
15 #[pin]
16 generator: U,
17 }
18}
19
20impl<T, U> AsyncStream<T, U> {
21 #[doc(hidden)]
22 pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
23 AsyncStream {
24 rx,
25 done: false,
26 generator,
27 }
28 }
29}
30
31impl<T, U> FusedStream for AsyncStream<T, U>
32where
33 U: Future<Output = ()>,
34{
35 fn is_terminated(&self) -> bool {
36 self.done
37 }
38}
39
40impl<T, U> Stream for AsyncStream<T, U>
41where
42 U: Future<Output = ()>,
43{
44 type Item = T;
45
46 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
47 let me = self.project();
48
49 if *me.done {
50 return Poll::Ready(None);
51 }
52
53 let mut dst = None;
54 let res = {
55 let _enter = me.rx.enter(&mut dst);
56 me.generator.poll(cx)
57 };
58
59 *me.done = res.is_ready();
60
61 if dst.is_some() {
62 return Poll::Ready(dst.take());
63 }
64
65 if *me.done {
66 Poll::Ready(None)
67 } else {
68 Poll::Pending
69 }
70 }
71
72 fn size_hint(&self) -> (usize, Option<usize>) {
73 if self.done {
74 (0, Some(0))
75 } else {
76 (0, None)
77 }
78 }
79}
80