1 | use crate::Stream; |
2 | use std::pin::Pin; |
3 | use std::task::{Context, Poll}; |
4 | use tokio::sync::mpsc::Receiver; |
5 | |
6 | /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. |
7 | /// |
8 | /// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver |
9 | /// [`Stream`]: trait@crate::Stream |
10 | #[derive(Debug)] |
11 | pub struct ReceiverStream<T> { |
12 | inner: Receiver<T>, |
13 | } |
14 | |
15 | impl<T> ReceiverStream<T> { |
16 | /// Create a new `ReceiverStream`. |
17 | pub fn new(recv: Receiver<T>) -> Self { |
18 | Self { inner: recv } |
19 | } |
20 | |
21 | /// Get back the inner `Receiver`. |
22 | pub fn into_inner(self) -> Receiver<T> { |
23 | self.inner |
24 | } |
25 | |
26 | /// Closes the receiving half of a channel without dropping it. |
27 | /// |
28 | /// This prevents any further messages from being sent on the channel while |
29 | /// still enabling the receiver to drain messages that are buffered. Any |
30 | /// outstanding [`Permit`] values will still be able to send messages. |
31 | /// |
32 | /// To guarantee no messages are dropped, after calling `close()`, you must |
33 | /// receive all items from the stream until `None` is returned. |
34 | /// |
35 | /// [`Permit`]: struct@tokio::sync::mpsc::Permit |
36 | pub fn close(&mut self) { |
37 | self.inner.close(); |
38 | } |
39 | } |
40 | |
41 | impl<T> Stream for ReceiverStream<T> { |
42 | type Item = T; |
43 | |
44 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
45 | self.inner.poll_recv(cx) |
46 | } |
47 | } |
48 | |
49 | impl<T> AsRef<Receiver<T>> for ReceiverStream<T> { |
50 | fn as_ref(&self) -> &Receiver<T> { |
51 | &self.inner |
52 | } |
53 | } |
54 | |
55 | impl<T> AsMut<Receiver<T>> for ReceiverStream<T> { |
56 | fn as_mut(&mut self) -> &mut Receiver<T> { |
57 | &mut self.inner |
58 | } |
59 | } |
60 | |
61 | impl<T> From<Receiver<T>> for ReceiverStream<T> { |
62 | fn from(recv: Receiver<T>) -> Self { |
63 | Self::new(recv) |
64 | } |
65 | } |
66 | |