| 1 | #![no_std ] |
| 2 | #![doc = include_str!("../README.md" )] |
| 3 | |
| 4 | use core::pin::Pin; |
| 5 | use core::task::{Context, Poll}; |
| 6 | |
| 7 | /// A stream that produces items that are ordered according to some token. |
| 8 | /// |
| 9 | /// The main advantage of this trait over the standard `Stream` trait is the ability to implement a |
| 10 | /// [`join`](join()) function that does not either block until both source streams produce an item |
| 11 | /// or contain a race condition when rejoining streams that originated from a common well-ordered |
| 12 | /// source. |
| 13 | pub trait OrderedStream { |
| 14 | /// The type ordered by this stream. |
| 15 | /// |
| 16 | /// Each stream must produce values that are in ascending order according to this function, |
| 17 | /// although there is no requirement that the values be strictly ascending. |
| 18 | type Ordering: Ord; |
| 19 | |
| 20 | /// The unordered data carried by this stream |
| 21 | /// |
| 22 | /// This is split from the `Ordering` type to allow specifying a smaller or cheaper-to-generate |
| 23 | /// type as the ordering key. This is especially useful if you generate values to pass in to |
| 24 | /// `before`. |
| 25 | type Data; |
| 26 | |
| 27 | /// Attempt to pull out the next value of this stream, registering the current task for wakeup |
| 28 | /// if needed, and returning `NoneBefore` if it is known that the stream will not produce any |
| 29 | /// more values ordered before the given point. |
| 30 | /// |
| 31 | /// # Return value |
| 32 | /// |
| 33 | /// There are several possible return values, each indicating a distinct stream state depending |
| 34 | /// on the value passed in `before`: |
| 35 | /// |
| 36 | /// - If `before` was `None`, `Poll::Pending` means that this stream's next value is not ready |
| 37 | /// yet. Implementations will ensure that the current task is notified when the next value may |
| 38 | /// be ready. |
| 39 | /// |
| 40 | /// - If `before` was `Some`, `Poll::Pending` means that this stream's next value is not ready |
| 41 | /// and that it is not yet known if the stream will produce a value ordered prior to the given |
| 42 | /// ordering value. Implementations will ensure that the current task is notified when either |
| 43 | /// the next value is ready or once it is known that no such value will be produced. |
| 44 | /// |
| 45 | /// - `Poll::Ready(PollResult::Item)` means that the stream has successfully produced |
| 46 | /// an item. The stream may produce further values on subsequent `poll_next_before` calls. |
| 47 | /// The returned ordering value must not be less than any prior ordering value returned by this |
| 48 | /// stream. The returned ordering value **may** be greater than the value passed to `before`. |
| 49 | /// |
| 50 | /// - `Poll::Ready(PollResult::Terminated)` means that the stream has terminated, and |
| 51 | /// `poll_next_before` should not be invoked again. |
| 52 | /// |
| 53 | /// - `Poll::Ready(PollResult::NoneBefore)` means that the stream will not produce |
| 54 | /// any further ordering tokens less than the given token. Subsequent `poll_next_before` calls |
| 55 | /// may still produce additional items, but their tokens will be greater than or equal to the |
| 56 | /// given token. It does not make sense to return this value if `before` was `None`. |
| 57 | fn poll_next_before( |
| 58 | self: Pin<&mut Self>, |
| 59 | cx: &mut Context<'_>, |
| 60 | before: Option<&Self::Ordering>, |
| 61 | ) -> Poll<PollResult<Self::Ordering, Self::Data>>; |
| 62 | |
| 63 | /// The minimum value of the ordering for any future items. |
| 64 | /// |
| 65 | /// If this does not return `None`, the returned ordering must be less than or equal to the |
| 66 | /// ordering of any future item returned from [`Self::poll_next_before`]. This value should |
| 67 | /// (but is not required to) be greater than or equal to the ordering of the most recent item |
| 68 | /// returned. |
| 69 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
| 70 | None |
| 71 | } |
| 72 | |
| 73 | /// Returns the bounds on the remaining length of the stream. |
| 74 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 75 | (0, None) |
| 76 | } |
| 77 | } |
| 78 | |
| 79 | /// A value that is either borrowed or owned. |
| 80 | /// |
| 81 | /// This is similar to `std::borrow::Cow`, but does not require the ability to convert from |
| 82 | /// borrowed to owned. |
| 83 | #[derive (Debug)] |
| 84 | pub enum MaybeBorrowed<'a, T> { |
| 85 | Borrowed(&'a T), |
| 86 | Owned(T), |
| 87 | } |
| 88 | |
| 89 | impl<'a, T> AsRef<T> for MaybeBorrowed<'a, T> { |
| 90 | fn as_ref(&self) -> &T { |
| 91 | match self { |
| 92 | Self::Borrowed(t: &&'a T) => t, |
| 93 | Self::Owned(t: &T) => t, |
| 94 | } |
| 95 | } |
| 96 | } |
| 97 | |
| 98 | impl<'a, T> core::ops::Deref for MaybeBorrowed<'a, T> { |
| 99 | type Target = T; |
| 100 | |
| 101 | fn deref(&self) -> &T { |
| 102 | match self { |
| 103 | Self::Borrowed(t: &&'a T) => t, |
| 104 | Self::Owned(t: &T) => t, |
| 105 | } |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | impl<P> OrderedStream for Pin<P> |
| 110 | where |
| 111 | P: core::ops::DerefMut + Unpin, |
| 112 | P::Target: OrderedStream, |
| 113 | { |
| 114 | type Data = <P::Target as OrderedStream>::Data; |
| 115 | type Ordering = <P::Target as OrderedStream>::Ordering; |
| 116 | |
| 117 | fn poll_next_before( |
| 118 | self: Pin<&mut Self>, |
| 119 | cx: &mut Context<'_>, |
| 120 | before: Option<&Self::Ordering>, |
| 121 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
| 122 | self.get_mut().as_mut().poll_next_before(cx, before) |
| 123 | } |
| 124 | |
| 125 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
| 126 | (**self).position_hint() |
| 127 | } |
| 128 | |
| 129 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 130 | (**self).size_hint() |
| 131 | } |
| 132 | } |
| 133 | |
| 134 | impl<S> OrderedStream for Option<S> |
| 135 | where |
| 136 | S: OrderedStream, |
| 137 | { |
| 138 | type Data = S::Data; |
| 139 | type Ordering = S::Ordering; |
| 140 | |
| 141 | fn poll_next_before( |
| 142 | self: Pin<&mut Self>, |
| 143 | cx: &mut Context<'_>, |
| 144 | before: Option<&Self::Ordering>, |
| 145 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
| 146 | match self.as_pin_mut() { |
| 147 | Some(s: Pin<&mut S>) => s.poll_next_before(cx, before), |
| 148 | None => Poll::Ready(PollResult::Terminated), |
| 149 | } |
| 150 | } |
| 151 | |
| 152 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
| 153 | self.as_ref().and_then(|s: &S| s.position_hint()) |
| 154 | } |
| 155 | |
| 156 | fn size_hint(&self) -> (usize, Option<usize>) { |
| 157 | self.as_ref().map_or((0, Some(0)), |s: &S| s.size_hint()) |
| 158 | } |
| 159 | } |
| 160 | |
| 161 | /// An [`OrderedStream`] that tracks if the underlying stream should be polled. |
| 162 | pub trait FusedOrderedStream: OrderedStream { |
| 163 | /// Returns `true` if the stream should no longer be polled. |
| 164 | fn is_terminated(&self) -> bool; |
| 165 | } |
| 166 | |
| 167 | impl<P> FusedOrderedStream for Pin<P> |
| 168 | where |
| 169 | P: core::ops::DerefMut + Unpin, |
| 170 | P::Target: FusedOrderedStream, |
| 171 | { |
| 172 | fn is_terminated(&self) -> bool { |
| 173 | (**self).is_terminated() |
| 174 | } |
| 175 | } |
| 176 | |
| 177 | impl<S> FusedOrderedStream for Option<S> |
| 178 | where |
| 179 | S: FusedOrderedStream, |
| 180 | { |
| 181 | fn is_terminated(&self) -> bool { |
| 182 | self.as_ref().map_or(default:true, |s: &S| s.is_terminated()) |
| 183 | } |
| 184 | } |
| 185 | |
| 186 | /// The result of a [`OrderedStream::poll_next_before`] operation. |
| 187 | #[derive (Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] |
| 188 | pub enum PollResult<Ordering, Data> { |
| 189 | /// An item with a corresponding ordering token. |
| 190 | Item { data: Data, ordering: Ordering }, |
| 191 | /// This stream will not return any items prior to the given point. |
| 192 | NoneBefore, |
| 193 | /// This stream is terminated and should not be polled again. |
| 194 | Terminated, |
| 195 | } |
| 196 | |
| 197 | impl<D, T> PollResult<T, D> { |
| 198 | /// Extract the data from the result. |
| 199 | pub fn into_data(self) -> Option<D> { |
| 200 | match self { |
| 201 | Self::Item { data, .. } => Some(data), |
| 202 | _ => None, |
| 203 | } |
| 204 | } |
| 205 | |
| 206 | /// Extract the item from the result. |
| 207 | pub fn into_tuple(self) -> Option<(T, D)> { |
| 208 | match self { |
| 209 | Self::Item { data, ordering } => Some((ordering, data)), |
| 210 | _ => None, |
| 211 | } |
| 212 | } |
| 213 | |
| 214 | /// Apply a closure to the data. |
| 215 | pub fn map_data<R>(self, f: impl FnOnce(D) -> R) -> PollResult<T, R> { |
| 216 | match self { |
| 217 | Self::Item { data, ordering } => PollResult::Item { |
| 218 | data: f(data), |
| 219 | ordering, |
| 220 | }, |
| 221 | Self::NoneBefore => PollResult::NoneBefore, |
| 222 | Self::Terminated => PollResult::Terminated, |
| 223 | } |
| 224 | } |
| 225 | } |
| 226 | |
| 227 | impl<T, D, E> PollResult<T, Result<D, E>> { |
| 228 | /// Extract the error of a [`Result`] item. |
| 229 | pub fn transpose_result(self) -> Result<PollResult<T, D>, E> { |
| 230 | self.transpose_result_item().map_err(|(_, e: E)| e) |
| 231 | } |
| 232 | |
| 233 | /// Extract the error and ordering from a [`Result`] item. |
| 234 | pub fn transpose_result_item(self) -> Result<PollResult<T, D>, (T, E)> { |
| 235 | match self { |
| 236 | Self::Item { |
| 237 | data: Ok(data: D), |
| 238 | ordering: T, |
| 239 | } => Ok(PollResult::Item { data, ordering }), |
| 240 | Self::Item { |
| 241 | data: Err(data: E), |
| 242 | ordering: T, |
| 243 | } => Err((ordering, data)), |
| 244 | Self::NoneBefore => Ok(PollResult::NoneBefore), |
| 245 | Self::Terminated => Ok(PollResult::Terminated), |
| 246 | } |
| 247 | } |
| 248 | } |
| 249 | |
| 250 | /// A [`Future`](core::future::Future) that produces an item with an associated ordering. |
| 251 | /// |
| 252 | /// This is equivalent to an [`OrderedStream`] that always produces exactly one item. This trait |
| 253 | /// is not very useful on its own; see [`FromFuture`] to convert it to a stream. |
| 254 | /// |
| 255 | /// It is valid to implement both [`Future`](core::future::Future) and [`OrderedFuture`] on the |
| 256 | /// same type. In this case, unless otherwise documented by the implementing type, neither poll |
| 257 | /// function should be invoked after either returns an output value. |
| 258 | pub trait OrderedFuture { |
| 259 | /// See [`OrderedStream::Ordering`]. |
| 260 | type Ordering: Ord; |
| 261 | |
| 262 | /// See [`OrderedStream::Data`]. |
| 263 | type Output; |
| 264 | |
| 265 | /// Attempt to pull out the value of this future, registering the current task for wakeup if |
| 266 | /// needed, and returning `None` if it is known that the future will not produce a value |
| 267 | /// ordered before the given point. |
| 268 | /// |
| 269 | /// # Return value |
| 270 | /// |
| 271 | /// There are several possible return values, each indicating a distinct state depending on the |
| 272 | /// value passed in `before`: |
| 273 | /// |
| 274 | /// - If `before` was `None`, `Poll::Pending` means that this future's value is not ready yet. |
| 275 | /// Implementations will ensure that the current task is notified when the next value may be |
| 276 | /// ready. |
| 277 | /// |
| 278 | /// - If `before` was `Some`, `Poll::Pending` means that this future's value is not ready and |
| 279 | /// that it is not yet known if the value will be ordered prior to the given ordering value. |
| 280 | /// Implementations will ensure that the current task is notified when either the next value is |
| 281 | /// ready or once it is known that no such value will be produced. |
| 282 | /// |
| 283 | /// - `Poll::Ready(Some(Data))` means that the future has successfully terminated. The |
| 284 | /// returned ordering value **may** be greater than the value passed to `before`. The |
| 285 | /// `poll_before` function should not be invoked again. |
| 286 | /// |
| 287 | /// - `Poll::Ready(None)` means that this future will not produce an ordering token less than |
| 288 | /// the given token. It is an error to return `None` if `before` was `None`. |
| 289 | fn poll_before( |
| 290 | self: Pin<&mut Self>, |
| 291 | cx: &mut Context<'_>, |
| 292 | before: Option<&Self::Ordering>, |
| 293 | ) -> Poll<Option<(Self::Ordering, Self::Output)>>; |
| 294 | |
| 295 | /// The minimum value of the ordering of the item. |
| 296 | /// |
| 297 | /// See [`OrderedStream::position_hint`] for details. |
| 298 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
| 299 | None |
| 300 | } |
| 301 | } |
| 302 | |
| 303 | mod adapters; |
| 304 | pub use adapters::*; |
| 305 | mod join; |
| 306 | pub use join::*; |
| 307 | mod multi; |
| 308 | pub use multi::*; |
| 309 | |