| 1 | use core::pin::Pin; |
| 2 | use futures_core::stream::{FusedStream, Stream}; |
| 3 | use futures_core::task::{Context, Poll}; |
| 4 | use futures_sink::Sink; |
| 5 | use pin_project_lite::pin_project; |
| 6 | |
| 7 | pin_project! { |
| 8 | /// Sink for the [`sink_map_err`](super::SinkExt::sink_map_err) method. |
| 9 | #[derive (Debug, Clone)] |
| 10 | #[must_use = "sinks do nothing unless polled" ] |
| 11 | pub struct SinkMapErr<Si, F> { |
| 12 | #[pin] |
| 13 | sink: Si, |
| 14 | f: Option<F>, |
| 15 | } |
| 16 | } |
| 17 | |
| 18 | impl<Si, F> SinkMapErr<Si, F> { |
| 19 | pub(super) fn new(sink: Si, f: F) -> Self { |
| 20 | Self { sink, f: Some(f) } |
| 21 | } |
| 22 | |
| 23 | delegate_access_inner!(sink, Si, ()); |
| 24 | |
| 25 | fn take_f(self: Pin<&mut Self>) -> F { |
| 26 | self.project().f.take().expect(msg:"polled MapErr after completion" ) |
| 27 | } |
| 28 | } |
| 29 | |
| 30 | impl<Si, F, E, Item> Sink<Item> for SinkMapErr<Si, F> |
| 31 | where |
| 32 | Si: Sink<Item>, |
| 33 | F: FnOnce(Si::Error) -> E, |
| 34 | { |
| 35 | type Error = E; |
| 36 | |
| 37 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 38 | self.as_mut().project().sink.poll_ready(cx).map_err(|e: >::Error| self.as_mut().take_f()(e)) |
| 39 | } |
| 40 | |
| 41 | fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
| 42 | self.as_mut().project().sink.start_send(item).map_err(|e: >::Error| self.as_mut().take_f()(e)) |
| 43 | } |
| 44 | |
| 45 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 46 | self.as_mut().project().sink.poll_flush(cx).map_err(|e: >::Error| self.as_mut().take_f()(e)) |
| 47 | } |
| 48 | |
| 49 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 50 | self.as_mut().project().sink.poll_close(cx).map_err(|e: >::Error| self.as_mut().take_f()(e)) |
| 51 | } |
| 52 | } |
| 53 | |
| 54 | // Forwarding impl of Stream from the underlying sink |
| 55 | impl<S: Stream, F> Stream for SinkMapErr<S, F> { |
| 56 | type Item = S::Item; |
| 57 | |
| 58 | delegate_stream!(sink); |
| 59 | } |
| 60 | |
| 61 | impl<S: FusedStream, F> FusedStream for SinkMapErr<S, F> { |
| 62 | fn is_terminated(&self) -> bool { |
| 63 | self.sink.is_terminated() |
| 64 | } |
| 65 | } |
| 66 | |