1#![no_std]
2#![doc = include_str!("../README.md")]
3
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7/// A stream that produces items that are ordered according to some token.
8///
9/// The main advantage of this trait over the standard `Stream` trait is the ability to implement a
10/// [`join`](join()) function that does not either block until both source streams produce an item
11/// or contain a race condition when rejoining streams that originated from a common well-ordered
12/// source.
13pub trait OrderedStream {
14 /// The type ordered by this stream.
15 ///
16 /// Each stream must produce values that are in ascending order according to this function,
17 /// although there is no requirement that the values be strictly ascending.
18 type Ordering: Ord;
19
20 /// The unordered data carried by this stream
21 ///
22 /// This is split from the `Ordering` type to allow specifying a smaller or cheaper-to-generate
23 /// type as the ordering key. This is especially useful if you generate values to pass in to
24 /// `before`.
25 type Data;
26
27 /// Attempt to pull out the next value of this stream, registering the current task for wakeup
28 /// if needed, and returning `NoneBefore` if it is known that the stream will not produce any
29 /// more values ordered before the given point.
30 ///
31 /// # Return value
32 ///
33 /// There are several possible return values, each indicating a distinct stream state depending
34 /// on the value passed in `before`:
35 ///
36 /// - If `before` was `None`, `Poll::Pending` means that this stream's next value is not ready
37 /// yet. Implementations will ensure that the current task is notified when the next value may
38 /// be ready.
39 ///
40 /// - If `before` was `Some`, `Poll::Pending` means that this stream's next value is not ready
41 /// and that it is not yet known if the stream will produce a value ordered prior to the given
42 /// ordering value. Implementations will ensure that the current task is notified when either
43 /// the next value is ready or once it is known that no such value will be produced.
44 ///
45 /// - `Poll::Ready(PollResult::Item)` means that the stream has successfully produced
46 /// an item. The stream may produce further values on subsequent `poll_next_before` calls.
47 /// The returned ordering value must not be less than any prior ordering value returned by this
48 /// stream. The returned ordering value **may** be greater than the value passed to `before`.
49 ///
50 /// - `Poll::Ready(PollResult::Terminated)` means that the stream has terminated, and
51 /// `poll_next_before` should not be invoked again.
52 ///
53 /// - `Poll::Ready(PollResult::NoneBefore)` means that the stream will not produce
54 /// any further ordering tokens less than the given token. Subsequent `poll_next_before` calls
55 /// may still produce additional items, but their tokens will be greater than or equal to the
56 /// given token. It does not make sense to return this value if `before` was `None`.
57 fn poll_next_before(
58 self: Pin<&mut Self>,
59 cx: &mut Context<'_>,
60 before: Option<&Self::Ordering>,
61 ) -> Poll<PollResult<Self::Ordering, Self::Data>>;
62
63 /// The minimum value of the ordering for any future items.
64 ///
65 /// If this does not return `None`, the returned ordering must be less than or equal to the
66 /// ordering of any future item returned from [`Self::poll_next_before`]. This value should
67 /// (but is not required to) be greater than or equal to the ordering of the most recent item
68 /// returned.
69 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
70 None
71 }
72
73 /// Returns the bounds on the remaining length of the stream.
74 fn size_hint(&self) -> (usize, Option<usize>) {
75 (0, None)
76 }
77}
78
79/// A value that is either borrowed or owned.
80///
81/// This is similar to `std::borrow::Cow`, but does not require the ability to convert from
82/// borrowed to owned.
83#[derive(Debug)]
84pub enum MaybeBorrowed<'a, T> {
85 Borrowed(&'a T),
86 Owned(T),
87}
88
89impl<'a, T> AsRef<T> for MaybeBorrowed<'a, T> {
90 fn as_ref(&self) -> &T {
91 match self {
92 Self::Borrowed(t: &&T) => t,
93 Self::Owned(t: &T) => t,
94 }
95 }
96}
97
98impl<'a, T> core::ops::Deref for MaybeBorrowed<'a, T> {
99 type Target = T;
100
101 fn deref(&self) -> &T {
102 match self {
103 Self::Borrowed(t: &&T) => t,
104 Self::Owned(t: &T) => t,
105 }
106 }
107}
108
109impl<P> OrderedStream for Pin<P>
110where
111 P: core::ops::DerefMut + Unpin,
112 P::Target: OrderedStream,
113{
114 type Data = <P::Target as OrderedStream>::Data;
115 type Ordering = <P::Target as OrderedStream>::Ordering;
116
117 fn poll_next_before(
118 self: Pin<&mut Self>,
119 cx: &mut Context<'_>,
120 before: Option<&Self::Ordering>,
121 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
122 self.get_mut().as_mut().poll_next_before(cx, before)
123 }
124
125 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
126 (**self).position_hint()
127 }
128
129 fn size_hint(&self) -> (usize, Option<usize>) {
130 (**self).size_hint()
131 }
132}
133
134impl<S> OrderedStream for Option<S>
135where
136 S: OrderedStream,
137{
138 type Data = S::Data;
139 type Ordering = S::Ordering;
140
141 fn poll_next_before(
142 self: Pin<&mut Self>,
143 cx: &mut Context<'_>,
144 before: Option<&Self::Ordering>,
145 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
146 match self.as_pin_mut() {
147 Some(s: Pin<&mut S>) => s.poll_next_before(cx, before),
148 None => Poll::Ready(PollResult::Terminated),
149 }
150 }
151
152 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
153 self.as_ref().and_then(|s: &S| s.position_hint())
154 }
155
156 fn size_hint(&self) -> (usize, Option<usize>) {
157 self.as_ref().map_or((0, Some(0)), |s: &S| s.size_hint())
158 }
159}
160
161/// An [`OrderedStream`] that tracks if the underlying stream should be polled.
162pub trait FusedOrderedStream: OrderedStream {
163 /// Returns `true` if the stream should no longer be polled.
164 fn is_terminated(&self) -> bool;
165}
166
167impl<P> FusedOrderedStream for Pin<P>
168where
169 P: core::ops::DerefMut + Unpin,
170 P::Target: FusedOrderedStream,
171{
172 fn is_terminated(&self) -> bool {
173 (**self).is_terminated()
174 }
175}
176
177impl<S> FusedOrderedStream for Option<S>
178where
179 S: FusedOrderedStream,
180{
181 fn is_terminated(&self) -> bool {
182 self.as_ref().map_or(default:true, |s: &S| s.is_terminated())
183 }
184}
185
186/// The result of a [`OrderedStream::poll_next_before`] operation.
187#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
188pub enum PollResult<Ordering, Data> {
189 /// An item with a corresponding ordering token.
190 Item { data: Data, ordering: Ordering },
191 /// This stream will not return any items prior to the given point.
192 NoneBefore,
193 /// This stream is terminated and should not be polled again.
194 Terminated,
195}
196
197impl<D, T> PollResult<T, D> {
198 /// Extract the data from the result.
199 pub fn into_data(self) -> Option<D> {
200 match self {
201 Self::Item { data, .. } => Some(data),
202 _ => None,
203 }
204 }
205
206 /// Extract the item from the result.
207 pub fn into_tuple(self) -> Option<(T, D)> {
208 match self {
209 Self::Item { data, ordering } => Some((ordering, data)),
210 _ => None,
211 }
212 }
213
214 /// Apply a closure to the data.
215 pub fn map_data<R>(self, f: impl FnOnce(D) -> R) -> PollResult<T, R> {
216 match self {
217 Self::Item { data, ordering } => PollResult::Item {
218 data: f(data),
219 ordering,
220 },
221 Self::NoneBefore => PollResult::NoneBefore,
222 Self::Terminated => PollResult::Terminated,
223 }
224 }
225}
226
227impl<T, D, E> PollResult<T, Result<D, E>> {
228 /// Extract the error of a [`Result`] item.
229 pub fn transpose_result(self) -> Result<PollResult<T, D>, E> {
230 self.transpose_result_item().map_err(|(_, e: E)| e)
231 }
232
233 /// Extract the error and ordering from a [`Result`] item.
234 pub fn transpose_result_item(self) -> Result<PollResult<T, D>, (T, E)> {
235 match self {
236 Self::Item {
237 data: Ok(data: D),
238 ordering: T,
239 } => Ok(PollResult::Item { data, ordering }),
240 Self::Item {
241 data: Err(data: E),
242 ordering: T,
243 } => Err((ordering, data)),
244 Self::NoneBefore => Ok(PollResult::NoneBefore),
245 Self::Terminated => Ok(PollResult::Terminated),
246 }
247 }
248}
249
250/// A [`Future`](core::future::Future) that produces an item with an associated ordering.
251///
252/// This is equivalent to an [`OrderedStream`] that always produces exactly one item. This trait
253/// is not very useful on its own; see [`FromFuture`] to convert it to a stream.
254///
255/// It is valid to implement both [`Future`](core::future::Future) and [`OrderedFuture`] on the
256/// same type. In this case, unless otherwise documented by the implementing type, neither poll
257/// function should be invoked after either returns an output value.
258pub trait OrderedFuture {
259 /// See [`OrderedStream::Ordering`].
260 type Ordering: Ord;
261
262 /// See [`OrderedStream::Data`].
263 type Output;
264
265 /// Attempt to pull out the value of this future, registering the current task for wakeup if
266 /// needed, and returning `None` if it is known that the future will not produce a value
267 /// ordered before the given point.
268 ///
269 /// # Return value
270 ///
271 /// There are several possible return values, each indicating a distinct state depending on the
272 /// value passed in `before`:
273 ///
274 /// - If `before` was `None`, `Poll::Pending` means that this future's value is not ready yet.
275 /// Implementations will ensure that the current task is notified when the next value may be
276 /// ready.
277 ///
278 /// - If `before` was `Some`, `Poll::Pending` means that this future's value is not ready and
279 /// that it is not yet known if the value will be ordered prior to the given ordering value.
280 /// Implementations will ensure that the current task is notified when either the next value is
281 /// ready or once it is known that no such value will be produced.
282 ///
283 /// - `Poll::Ready(Some(Data))` means that the future has successfully terminated. The
284 /// returned ordering value **may** be greater than the value passed to `before`. The
285 /// `poll_before` function should not be invoked again.
286 ///
287 /// - `Poll::Ready(None)` means that this future will not produce an ordering token less than
288 /// the given token. It is an error to return `None` if `before` was `None`.
289 fn poll_before(
290 self: Pin<&mut Self>,
291 cx: &mut Context<'_>,
292 before: Option<&Self::Ordering>,
293 ) -> Poll<Option<(Self::Ordering, Self::Output)>>;
294
295 /// The minimum value of the ordering of the item.
296 ///
297 /// See [`OrderedStream::position_hint`] for details.
298 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
299 None
300 }
301}
302
303mod adapters;
304pub use adapters::*;
305mod join;
306pub use join::*;
307mod multi;
308pub use multi::*;
309