| 1 | use core::fmt; |
| 2 | use core::marker::PhantomData; |
| 3 | use core::pin::Pin; |
| 4 | use futures_core::ready; |
| 5 | use futures_core::stream::{FusedStream, Stream}; |
| 6 | use futures_core::task::{Context, Poll}; |
| 7 | use futures_sink::Sink; |
| 8 | use pin_project_lite::pin_project; |
| 9 | |
| 10 | pin_project! { |
| 11 | /// Sink for the [`with_flat_map`](super::SinkExt::with_flat_map) method. |
| 12 | #[must_use = "sinks do nothing unless polled" ] |
| 13 | pub struct WithFlatMap<Si, Item, U, St, F> { |
| 14 | #[pin] |
| 15 | sink: Si, |
| 16 | f: F, |
| 17 | #[pin] |
| 18 | stream: Option<St>, |
| 19 | buffer: Option<Item>, |
| 20 | _marker: PhantomData<fn(U)>, |
| 21 | } |
| 22 | } |
| 23 | |
| 24 | impl<Si, Item, U, St, F> fmt::Debug for WithFlatMap<Si, Item, U, St, F> |
| 25 | where |
| 26 | Si: fmt::Debug, |
| 27 | St: fmt::Debug, |
| 28 | Item: fmt::Debug, |
| 29 | { |
| 30 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 31 | f&mut DebugStruct<'_, '_>.debug_struct("WithFlatMap" ) |
| 32 | .field("sink" , &self.sink) |
| 33 | .field("stream" , &self.stream) |
| 34 | .field(name:"buffer" , &self.buffer) |
| 35 | .finish() |
| 36 | } |
| 37 | } |
| 38 | |
| 39 | impl<Si, Item, U, St, F> WithFlatMap<Si, Item, U, St, F> |
| 40 | where |
| 41 | Si: Sink<Item>, |
| 42 | F: FnMut(U) -> St, |
| 43 | St: Stream<Item = Result<Item, Si::Error>>, |
| 44 | { |
| 45 | pub(super) fn new(sink: Si, f: F) -> Self { |
| 46 | Self { sink, f, stream: None, buffer: None, _marker: PhantomData } |
| 47 | } |
| 48 | |
| 49 | delegate_access_inner!(sink, Si, ()); |
| 50 | |
| 51 | fn try_empty_stream(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> { |
| 52 | let mut this = self.project(); |
| 53 | |
| 54 | if this.buffer.is_some() { |
| 55 | ready!(this.sink.as_mut().poll_ready(cx))?; |
| 56 | let item = this.buffer.take().unwrap(); |
| 57 | this.sink.as_mut().start_send(item)?; |
| 58 | } |
| 59 | if let Some(mut some_stream) = this.stream.as_mut().as_pin_mut() { |
| 60 | while let Some(item) = ready!(some_stream.as_mut().poll_next(cx)?) { |
| 61 | match this.sink.as_mut().poll_ready(cx)? { |
| 62 | Poll::Ready(()) => this.sink.as_mut().start_send(item)?, |
| 63 | Poll::Pending => { |
| 64 | *this.buffer = Some(item); |
| 65 | return Poll::Pending; |
| 66 | } |
| 67 | }; |
| 68 | } |
| 69 | } |
| 70 | this.stream.set(None); |
| 71 | Poll::Ready(Ok(())) |
| 72 | } |
| 73 | } |
| 74 | |
| 75 | // Forwarding impl of Stream from the underlying sink |
| 76 | impl<S, Item, U, St, F> Stream for WithFlatMap<S, Item, U, St, F> |
| 77 | where |
| 78 | S: Stream + Sink<Item>, |
| 79 | F: FnMut(U) -> St, |
| 80 | St: Stream<Item = Result<Item, S::Error>>, |
| 81 | { |
| 82 | type Item = S::Item; |
| 83 | |
| 84 | delegate_stream!(sink); |
| 85 | } |
| 86 | |
| 87 | impl<S, Item, U, St, F> FusedStream for WithFlatMap<S, Item, U, St, F> |
| 88 | where |
| 89 | S: FusedStream + Sink<Item>, |
| 90 | F: FnMut(U) -> St, |
| 91 | St: Stream<Item = Result<Item, S::Error>>, |
| 92 | { |
| 93 | fn is_terminated(&self) -> bool { |
| 94 | self.sink.is_terminated() |
| 95 | } |
| 96 | } |
| 97 | |
| 98 | impl<Si, Item, U, St, F> Sink<U> for WithFlatMap<Si, Item, U, St, F> |
| 99 | where |
| 100 | Si: Sink<Item>, |
| 101 | F: FnMut(U) -> St, |
| 102 | St: Stream<Item = Result<Item, Si::Error>>, |
| 103 | { |
| 104 | type Error = Si::Error; |
| 105 | |
| 106 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 107 | self.try_empty_stream(cx) |
| 108 | } |
| 109 | |
| 110 | fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> { |
| 111 | let mut this = self.project(); |
| 112 | |
| 113 | assert!(this.stream.is_none()); |
| 114 | this.stream.set(Some((this.f)(item))); |
| 115 | Ok(()) |
| 116 | } |
| 117 | |
| 118 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 119 | ready!(self.as_mut().try_empty_stream(cx)?); |
| 120 | self.project().sink.poll_flush(cx) |
| 121 | } |
| 122 | |
| 123 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
| 124 | ready!(self.as_mut().try_empty_stream(cx)?); |
| 125 | self.project().sink.poll_close(cx) |
| 126 | } |
| 127 | } |
| 128 | |