1 | use crate::sink::{SinkExt, SinkMapErr}; |
2 | use futures_core::stream::{FusedStream, Stream}; |
3 | use futures_sink::Sink; |
4 | use pin_project_lite::pin_project; |
5 | |
6 | pin_project! { |
7 | /// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method. |
8 | #[derive(Debug)] |
9 | #[must_use = "sinks do nothing unless polled" ] |
10 | pub struct SinkErrInto<Si: Sink<Item>, Item, E> { |
11 | #[pin] |
12 | sink: SinkMapErr<Si, fn(Si::Error) -> E>, |
13 | } |
14 | } |
15 | |
16 | impl<Si, E, Item> SinkErrInto<Si, Item, E> |
17 | where |
18 | Si: Sink<Item>, |
19 | Si::Error: Into<E>, |
20 | { |
21 | pub(super) fn new(sink: Si) -> Self { |
22 | Self { sink: SinkExt::sink_map_err(self:sink, f:Into::into) } |
23 | } |
24 | |
25 | delegate_access_inner!(sink, Si, (.)); |
26 | } |
27 | |
28 | impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E> |
29 | where |
30 | Si: Sink<Item>, |
31 | Si::Error: Into<E>, |
32 | { |
33 | type Error = E; |
34 | |
35 | delegate_sink!(sink, Item); |
36 | } |
37 | |
38 | // Forwarding impl of Stream from the underlying sink |
39 | impl<S, Item, E> Stream for SinkErrInto<S, Item, E> |
40 | where |
41 | S: Sink<Item> + Stream, |
42 | S::Error: Into<E>, |
43 | { |
44 | type Item = S::Item; |
45 | |
46 | delegate_stream!(sink); |
47 | } |
48 | |
49 | impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E> |
50 | where |
51 | S: Sink<Item> + FusedStream, |
52 | S::Error: Into<E>, |
53 | { |
54 | fn is_terminated(&self) -> bool { |
55 | self.sink.is_terminated() |
56 | } |
57 | } |
58 | |