1 | use core::pin::Pin; |
2 | use futures_core::stream::{FusedStream, Stream}; |
3 | use futures_core::task::{Context, Poll}; |
4 | use futures_sink::Sink; |
5 | use pin_project_lite::pin_project; |
6 | |
7 | pin_project! { |
8 | /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method. |
9 | #[derive(Debug, Clone)] |
10 | #[must_use = "sinks do nothing unless polled" ] |
11 | pub struct SinkMapErr<Si, F> { |
12 | #[pin] |
13 | sink: Si, |
14 | f: Option<F>, |
15 | } |
16 | } |
17 | |
18 | impl<Si, F> SinkMapErr<Si, F> { |
19 | pub(super) fn new(sink: Si, f: F) -> Self { |
20 | Self { sink, f: Some(f) } |
21 | } |
22 | |
23 | delegate_access_inner!(sink, Si, ()); |
24 | |
25 | fn take_f(self: Pin<&mut Self>) -> F { |
26 | self.project().f.take().expect(msg:"polled MapErr after completion" ) |
27 | } |
28 | } |
29 | |
30 | impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F> |
31 | where |
32 | Si: Sink<Item>, |
33 | F: FnOnce(Si::Error) -> E, |
34 | { |
35 | type Error = E; |
36 | |
37 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
38 | self.as_mut().project().sink.poll_ready(cx).map_err(|e: >::Error| self.as_mut().take_f()(e)) |
39 | } |
40 | |
41 | fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
42 | self.as_mut().project().sink.start_send(item).map_err(|e: >::Error| self.as_mut().take_f()(e)) |
43 | } |
44 | |
45 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
46 | self.as_mut().project().sink.poll_flush(cx).map_err(|e: >::Error| self.as_mut().take_f()(e)) |
47 | } |
48 | |
49 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
50 | self.as_mut().project().sink.poll_close(cx).map_err(|e: >::Error| self.as_mut().take_f()(e)) |
51 | } |
52 | } |
53 | |
54 | // Forwarding impl of Stream from the underlying sink |
55 | impl<S: Stream, F> Stream for SinkMapErr<S, F> { |
56 | type Item = S::Item; |
57 | |
58 | delegate_stream!(sink); |
59 | } |
60 | |
61 | impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> { |
62 | fn is_terminated(&self) -> bool { |
63 | self.sink.is_terminated() |
64 | } |
65 | } |
66 | |