1#![allow(dead_code)]
2
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender};
6use tokio_stream::Stream;
7
8struct UnboundedStream<T> {
9 recv: UnboundedReceiver<T>,
10}
11impl<T> Stream for UnboundedStream<T> {
12 type Item = T;
13 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
14 Pin::into_inner(self).recv.poll_recv(cx)
15 }
16}
17
18pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) {
19 let (tx, rx) = mpsc::unbounded_channel();
20
21 let stream = UnboundedStream { recv: rx };
22
23 (tx, stream)
24}
25
26struct BoundedStream<T> {
27 recv: Receiver<T>,
28}
29impl<T> Stream for BoundedStream<T> {
30 type Item = T;
31 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
32 Pin::into_inner(self).recv.poll_recv(cx)
33 }
34}
35
36pub fn channel_stream<T: Unpin>(size: usize) -> (Sender<T>, impl Stream<Item = T>) {
37 let (tx, rx) = mpsc::channel(size);
38
39 let stream = BoundedStream { recv: rx };
40
41 (tx, stream)
42}
43