1use crate::sink::{SinkExt, SinkMapErr};
2use futures_core::stream::{FusedStream, Stream};
3use futures_sink::Sink;
4use pin_project_lite::pin_project;
5
6pin_project! {
7 /// Sink for the [`sink_err_into`](super::SinkExt::sink_err_into) method.
8 #[derive(Debug)]
9 #[must_use = "sinks do nothing unless polled"]
10 pub struct SinkErrInto<Si: Sink<Item>, Item, E> {
11 #[pin]
12 sink: SinkMapErr<Si, fn(Si::Error) -> E>,
13 }
14}
15
16impl<Si, E, Item> SinkErrInto<Si, Item, E>
17where
18 Si: Sink<Item>,
19 Si::Error: Into<E>,
20{
21 pub(super) fn new(sink: Si) -> Self {
22 Self { sink: SinkExt::sink_map_err(self:sink, f:Into::into) }
23 }
24
25 delegate_access_inner!(sink, Si, (.));
26}
27
28impl<Si, Item, E> Sink<Item> for SinkErrInto<Si, Item, E>
29where
30 Si: Sink<Item>,
31 Si::Error: Into<E>,
32{
33 type Error = E;
34
35 delegate_sink!(sink, Item);
36}
37
38// Forwarding impl of Stream from the underlying sink
39impl<S, Item, E> Stream for SinkErrInto<S, Item, E>
40where
41 S: Sink<Item> + Stream,
42 S::Error: Into<E>,
43{
44 type Item = S::Item;
45
46 delegate_stream!(sink);
47}
48
49impl<S, Item, E> FusedStream for SinkErrInto<S, Item, E>
50where
51 S: Sink<Item> + FusedStream,
52 S::Error: Into<E>,
53{
54 fn is_terminated(&self) -> bool {
55 self.sink.is_terminated()
56 }
57}
58