| 1 | use core::fmt; |
| 2 | use core::pin::Pin; |
| 3 | use futures_core::ready; |
| 4 | use futures_core::stream::Stream; |
| 5 | use futures_core::task::{Context, Poll}; |
| 6 | use futures_sink::Sink; |
| 7 | |
| 8 | use crate::lock::BiLock; |
| 9 | |
| 10 | /// A `Stream` part of the split pair |
| 11 | #[derive (Debug)] |
| 12 | #[must_use = "streams do nothing unless polled" ] |
| 13 | #[cfg_attr (docsrs, doc(cfg(feature = "sink" )))] |
| 14 | pub struct SplitStream<S>(BiLock<S>); |
| 15 | |
| 16 | impl<S> Unpin for SplitStream<S> {} |
| 17 | |
| 18 | impl<S> SplitStream<S> { |
| 19 | /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`. |
| 20 | pub fn is_pair_of<Item>(&self, other: &SplitSink<S, Item>) -> bool { |
| 21 | other.is_pair_of(&self) |
| 22 | } |
| 23 | } |
| 24 | |
| 25 | impl<S: Unpin> SplitStream<S> { |
| 26 | /// Attempts to put the two "halves" of a split `Stream + Sink` back |
| 27 | /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are |
| 28 | /// a matching pair originating from the same call to `StreamExt::split`. |
| 29 | pub fn reunite<Item>(self, other: SplitSink<S, Item>) -> Result<S, ReuniteError<S, Item>> |
| 30 | where |
| 31 | S: Sink<Item>, |
| 32 | { |
| 33 | other.reunite(self) |
| 34 | } |
| 35 | } |
| 36 | |
| 37 | impl<S: Stream> Stream for SplitStream<S> { |
| 38 | type Item = S::Item; |
| 39 | |
| 40 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
| 41 | ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx) |
| 42 | } |
| 43 | } |
| 44 | |
| 45 | #[allow (non_snake_case)] |
| 46 | fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> { |
| 47 | SplitSink { lock, slot: None } |
| 48 | } |
| 49 | |
| 50 | /// A `Sink` part of the split pair |
| 51 | #[derive (Debug)] |
| 52 | #[must_use = "sinks do nothing unless polled" ] |
| 53 | #[cfg_attr (docsrs, doc(cfg(feature = "sink" )))] |
| 54 | pub struct SplitSink<S, Item> { |
| 55 | lock: BiLock<S>, |
| 56 | slot: Option<Item>, |
| 57 | } |
| 58 | |
| 59 | impl<S, Item> Unpin for SplitSink<S, Item> {} |
| 60 | |
| 61 | impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> { |
| 62 | /// Attempts to put the two "halves" of a split `Stream + Sink` back |
| 63 | /// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are |
| 64 | /// a matching pair originating from the same call to `StreamExt::split`. |
| 65 | pub fn reunite(self, other: SplitStream<S>) -> Result<S, ReuniteError<S, Item>> { |
| 66 | self.lock.reunite(other.0).map_err(|err: ReuniteError| ReuniteError(SplitSink(lock:err.0), SplitStream(err.1))) |
| 67 | } |
| 68 | } |
| 69 | |
| 70 | impl<S, Item> SplitSink<S, Item> { |
| 71 | /// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`. |
| 72 | pub fn is_pair_of(&self, other: &SplitStream<S>) -> bool { |
| 73 | self.lock.is_pair_of(&other.0) |
| 74 | } |
| 75 | } |
| 76 | |
| 77 | impl<S: Sink<Item>, Item> SplitSink<S, Item> { |
| 78 | fn poll_flush_slot( |
| 79 | mut inner: Pin<&mut S>, |
| 80 | slot: &mut Option<Item>, |
| 81 | cx: &mut Context<'_>, |
| 82 | ) -> Poll<Result<(), S::Error>> { |
| 83 | if slot.is_some() { |
| 84 | ready!(inner.as_mut().poll_ready(cx))?; |
| 85 | Poll::Ready(inner.start_send(item:slot.take().unwrap())) |
| 86 | } else { |
| 87 | Poll::Ready(Ok(())) |
| 88 | } |
| 89 | } |
| 90 | |
| 91 | fn poll_lock_and_flush_slot( |
| 92 | mut self: Pin<&mut Self>, |
| 93 | cx: &mut Context<'_>, |
| 94 | ) -> Poll<Result<(), S::Error>> { |
| 95 | let this: &mut SplitSink = &mut *self; |
| 96 | let mut inner: BiLockGuard<'_, S> = ready!(this.lock.poll_lock(cx)); |
| 97 | Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx) |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> { |
| 102 | type Error = S::Error; |
| 103 | |
| 104 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> { |
| 105 | loop { |
| 106 | if self.slot.is_none() { |
| 107 | return Poll::Ready(Ok(())); |
| 108 | } |
| 109 | ready!(self.as_mut().poll_lock_and_flush_slot(cx))?; |
| 110 | } |
| 111 | } |
| 112 | |
| 113 | fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), S::Error> { |
| 114 | self.slot = Some(item); |
| 115 | Ok(()) |
| 116 | } |
| 117 | |
| 118 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> { |
| 119 | let this = &mut *self; |
| 120 | let mut inner = ready!(this.lock.poll_lock(cx)); |
| 121 | ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?; |
| 122 | inner.as_pin_mut().poll_flush(cx) |
| 123 | } |
| 124 | |
| 125 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> { |
| 126 | let this = &mut *self; |
| 127 | let mut inner = ready!(this.lock.poll_lock(cx)); |
| 128 | ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?; |
| 129 | inner.as_pin_mut().poll_close(cx) |
| 130 | } |
| 131 | } |
| 132 | |
| 133 | pub(super) fn split<S: Stream + Sink<Item>, Item>(s: S) -> (SplitSink<S, Item>, SplitStream<S>) { |
| 134 | let (a: BiLock, b: BiLock) = BiLock::new(s); |
| 135 | let read: SplitStream = SplitStream(a); |
| 136 | let write: SplitSink = SplitSink(lock:b); |
| 137 | (write, read) |
| 138 | } |
| 139 | |
| 140 | /// Error indicating a `SplitSink<S>` and `SplitStream<S>` were not two halves |
| 141 | /// of a `Stream + Split`, and thus could not be `reunite`d. |
| 142 | #[cfg_attr (docsrs, doc(cfg(feature = "sink" )))] |
| 143 | pub struct ReuniteError<T, Item>(pub SplitSink<T, Item>, pub SplitStream<T>); |
| 144 | |
| 145 | impl<T, Item> fmt::Debug for ReuniteError<T, Item> { |
| 146 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 147 | f.debug_tuple(name:"ReuniteError" ).field(&"..." ).finish() |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | impl<T, Item> fmt::Display for ReuniteError<T, Item> { |
| 152 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 153 | write!(f, "tried to reunite a SplitStream and SplitSink that don't form a pair" ) |
| 154 | } |
| 155 | } |
| 156 | |
| 157 | #[cfg (feature = "std" )] |
| 158 | impl<T: core::any::Any, Item> std::error::Error for ReuniteError<T, Item> {} |
| 159 | |
| 160 | #[cfg (test)] |
| 161 | mod tests { |
| 162 | use super::*; |
| 163 | use crate::stream::StreamExt; |
| 164 | use core::marker::PhantomData; |
| 165 | |
| 166 | struct NopStream<Item> { |
| 167 | phantom: PhantomData<Item>, |
| 168 | } |
| 169 | |
| 170 | impl<Item> Stream for NopStream<Item> { |
| 171 | type Item = Item; |
| 172 | |
| 173 | fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 174 | todo!() |
| 175 | } |
| 176 | } |
| 177 | |
| 178 | impl<Item> Sink<Item> for NopStream<Item> { |
| 179 | type Error = (); |
| 180 | |
| 181 | fn poll_ready( |
| 182 | self: Pin<&mut Self>, |
| 183 | _cx: &mut Context<'_>, |
| 184 | ) -> Poll<Result<(), Self::Error>> { |
| 185 | todo!() |
| 186 | } |
| 187 | |
| 188 | fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> { |
| 189 | todo!() |
| 190 | } |
| 191 | |
| 192 | fn poll_flush( |
| 193 | self: Pin<&mut Self>, |
| 194 | _cx: &mut Context<'_>, |
| 195 | ) -> Poll<Result<(), Self::Error>> { |
| 196 | todo!() |
| 197 | } |
| 198 | |
| 199 | fn poll_close( |
| 200 | self: Pin<&mut Self>, |
| 201 | _cx: &mut Context<'_>, |
| 202 | ) -> Poll<Result<(), Self::Error>> { |
| 203 | todo!() |
| 204 | } |
| 205 | } |
| 206 | |
| 207 | #[test ] |
| 208 | fn test_pairing() { |
| 209 | let s1 = NopStream::<()> { phantom: PhantomData }; |
| 210 | let (sink1, stream1) = s1.split(); |
| 211 | assert!(sink1.is_pair_of(&stream1)); |
| 212 | assert!(stream1.is_pair_of(&sink1)); |
| 213 | |
| 214 | let s2 = NopStream::<()> { phantom: PhantomData }; |
| 215 | let (sink2, stream2) = s2.split(); |
| 216 | assert!(sink2.is_pair_of(&stream2)); |
| 217 | assert!(stream2.is_pair_of(&sink2)); |
| 218 | |
| 219 | assert!(!sink1.is_pair_of(&stream2)); |
| 220 | assert!(!stream1.is_pair_of(&sink2)); |
| 221 | assert!(!sink2.is_pair_of(&stream1)); |
| 222 | assert!(!stream2.is_pair_of(&sink1)); |
| 223 | } |
| 224 | } |
| 225 | |