1 | use core::fmt::{Debug, Formatter, Result as FmtResult}; |
2 | use core::pin::Pin; |
3 | use futures_core::task::{Context, Poll}; |
4 | use futures_sink::Sink; |
5 | use pin_project_lite::pin_project; |
6 | |
7 | pin_project! { |
8 | /// Sink that clones incoming items and forwards them to two sinks at the same time. |
9 | /// |
10 | /// Backpressure from any downstream sink propagates up, which means that this sink |
11 | /// can only process items as fast as its _slowest_ downstream sink. |
12 | #[must_use = "sinks do nothing unless polled" ] |
13 | pub struct Fanout<Si1, Si2> { |
14 | #[pin] |
15 | sink1: Si1, |
16 | #[pin] |
17 | sink2: Si2 |
18 | } |
19 | } |
20 | |
21 | impl<Si1, Si2> Fanout<Si1, Si2> { |
22 | pub(super) fn new(sink1: Si1, sink2: Si2) -> Self { |
23 | Self { sink1, sink2 } |
24 | } |
25 | |
26 | /// Get a shared reference to the inner sinks. |
27 | pub fn get_ref(&self) -> (&Si1, &Si2) { |
28 | (&self.sink1, &self.sink2) |
29 | } |
30 | |
31 | /// Get a mutable reference to the inner sinks. |
32 | pub fn get_mut(&mut self) -> (&mut Si1, &mut Si2) { |
33 | (&mut self.sink1, &mut self.sink2) |
34 | } |
35 | |
36 | /// Get a pinned mutable reference to the inner sinks. |
37 | pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut Si1>, Pin<&mut Si2>) { |
38 | let this = self.project(); |
39 | (this.sink1, this.sink2) |
40 | } |
41 | |
42 | /// Consumes this combinator, returning the underlying sinks. |
43 | /// |
44 | /// Note that this may discard intermediate state of this combinator, |
45 | /// so care should be taken to avoid losing resources when this is called. |
46 | pub fn into_inner(self) -> (Si1, Si2) { |
47 | (self.sink1, self.sink2) |
48 | } |
49 | } |
50 | |
51 | impl<Si1: Debug, Si2: Debug> Debug for Fanout<Si1, Si2> { |
52 | fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { |
53 | f.debug_struct("Fanout" ).field("sink1" , &self.sink1).field(name:"sink2" , &self.sink2).finish() |
54 | } |
55 | } |
56 | |
57 | impl<Si1, Si2, Item> Sink<Item> for Fanout<Si1, Si2> |
58 | where |
59 | Si1: Sink<Item>, |
60 | Item: Clone, |
61 | Si2: Sink<Item, Error = Si1::Error>, |
62 | { |
63 | type Error = Si1::Error; |
64 | |
65 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
66 | let this = self.project(); |
67 | |
68 | let sink1_ready = this.sink1.poll_ready(cx)?.is_ready(); |
69 | let sink2_ready = this.sink2.poll_ready(cx)?.is_ready(); |
70 | let ready = sink1_ready && sink2_ready; |
71 | if ready { |
72 | Poll::Ready(Ok(())) |
73 | } else { |
74 | Poll::Pending |
75 | } |
76 | } |
77 | |
78 | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
79 | let this = self.project(); |
80 | |
81 | this.sink1.start_send(item.clone())?; |
82 | this.sink2.start_send(item)?; |
83 | Ok(()) |
84 | } |
85 | |
86 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
87 | let this = self.project(); |
88 | |
89 | let sink1_ready = this.sink1.poll_flush(cx)?.is_ready(); |
90 | let sink2_ready = this.sink2.poll_flush(cx)?.is_ready(); |
91 | let ready = sink1_ready && sink2_ready; |
92 | if ready { |
93 | Poll::Ready(Ok(())) |
94 | } else { |
95 | Poll::Pending |
96 | } |
97 | } |
98 | |
99 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
100 | let this = self.project(); |
101 | |
102 | let sink1_ready = this.sink1.poll_close(cx)?.is_ready(); |
103 | let sink2_ready = this.sink2.poll_close(cx)?.is_ready(); |
104 | let ready = sink1_ready && sink2_ready; |
105 | if ready { |
106 | Poll::Ready(Ok(())) |
107 | } else { |
108 | Poll::Pending |
109 | } |
110 | } |
111 | } |
112 | |