1use core::pin::Pin;
2use futures_core::stream::{FusedStream, Stream};
3use futures_core::task::{Context, Poll};
4use futures_sink::Sink;
5use pin_project_lite::pin_project;
6
7pin_project! {
8 /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method.
9 #[derive(Debug, Clone)]
10 #[must_use = "sinks do nothing unless polled"]
11 pub struct SinkMapErr<Si, F> {
12 #[pin]
13 sink: Si,
14 f: Option<F>,
15 }
16}
17
18impl<Si, F> SinkMapErr<Si, F> {
19 pub(super) fn new(sink: Si, f: F) -> Self {
20 Self { sink, f: Some(f) }
21 }
22
23 delegate_access_inner!(sink, Si, ());
24
25 fn take_f(self: Pin<&mut Self>) -> F {
26 self.project().f.take().expect(msg:"polled MapErr after completion")
27 }
28}
29
30impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F>
31where
32 Si: Sink<Item>,
33 F: FnOnce(Si::Error) -> E,
34{
35 type Error = E;
36
37 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
38 self.as_mut().project().sink.poll_ready(cx).map_err(|e: >::Error| self.as_mut().take_f()(e))
39 }
40
41 fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
42 self.as_mut().project().sink.start_send(item).map_err(|e: >::Error| self.as_mut().take_f()(e))
43 }
44
45 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
46 self.as_mut().project().sink.poll_flush(cx).map_err(|e: >::Error| self.as_mut().take_f()(e))
47 }
48
49 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
50 self.as_mut().project().sink.poll_close(cx).map_err(|e: >::Error| self.as_mut().take_f()(e))
51 }
52}
53
54// Forwarding impl of Stream from the underlying sink
55impl<S: Stream, F> Stream for SinkMapErr<S, F> {
56 type Item = S::Item;
57
58 delegate_stream!(sink);
59}
60
61impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> {
62 fn is_terminated(&self) -> bool {
63 self.sink.is_terminated()
64 }
65}
66