1 | use crate::Stream; |
2 | use std::pin::Pin; |
3 | use std::task::{Context, Poll}; |
4 | use tokio::sync::mpsc::UnboundedReceiver; |
5 | |
6 | /// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`]. |
7 | /// |
8 | /// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver |
9 | /// [`Stream`]: trait@crate::Stream |
10 | #[derive(Debug)] |
11 | pub struct UnboundedReceiverStream<T> { |
12 | inner: UnboundedReceiver<T>, |
13 | } |
14 | |
15 | impl<T> UnboundedReceiverStream<T> { |
16 | /// Create a new `UnboundedReceiverStream`. |
17 | pub fn new(recv: UnboundedReceiver<T>) -> Self { |
18 | Self { inner: recv } |
19 | } |
20 | |
21 | /// Get back the inner `UnboundedReceiver`. |
22 | pub fn into_inner(self) -> UnboundedReceiver<T> { |
23 | self.inner |
24 | } |
25 | |
26 | /// Closes the receiving half of a channel without dropping it. |
27 | /// |
28 | /// This prevents any further messages from being sent on the channel while |
29 | /// still enabling the receiver to drain messages that are buffered. |
30 | pub fn close(&mut self) { |
31 | self.inner.close(); |
32 | } |
33 | } |
34 | |
35 | impl<T> Stream for UnboundedReceiverStream<T> { |
36 | type Item = T; |
37 | |
38 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
39 | self.inner.poll_recv(cx) |
40 | } |
41 | } |
42 | |
43 | impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { |
44 | fn as_ref(&self) -> &UnboundedReceiver<T> { |
45 | &self.inner |
46 | } |
47 | } |
48 | |
49 | impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { |
50 | fn as_mut(&mut self) -> &mut UnboundedReceiver<T> { |
51 | &mut self.inner |
52 | } |
53 | } |
54 | |
55 | impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> { |
56 | fn from(recv: UnboundedReceiver<T>) -> Self { |
57 | Self::new(recv) |
58 | } |
59 | } |
60 | |