1 | #![no_std ] |
2 | #![doc = include_str!("../README.md" )] |
3 | |
4 | use core::pin::Pin; |
5 | use core::task::{Context, Poll}; |
6 | |
7 | /// A stream that produces items that are ordered according to some token. |
8 | /// |
9 | /// The main advantage of this trait over the standard `Stream` trait is the ability to implement a |
10 | /// [`join`](join()) function that does not either block until both source streams produce an item |
11 | /// or contain a race condition when rejoining streams that originated from a common well-ordered |
12 | /// source. |
13 | pub trait OrderedStream { |
14 | /// The type ordered by this stream. |
15 | /// |
16 | /// Each stream must produce values that are in ascending order according to this function, |
17 | /// although there is no requirement that the values be strictly ascending. |
18 | type Ordering: Ord; |
19 | |
20 | /// The unordered data carried by this stream |
21 | /// |
22 | /// This is split from the `Ordering` type to allow specifying a smaller or cheaper-to-generate |
23 | /// type as the ordering key. This is especially useful if you generate values to pass in to |
24 | /// `before`. |
25 | type Data; |
26 | |
27 | /// Attempt to pull out the next value of this stream, registering the current task for wakeup |
28 | /// if needed, and returning `NoneBefore` if it is known that the stream will not produce any |
29 | /// more values ordered before the given point. |
30 | /// |
31 | /// # Return value |
32 | /// |
33 | /// There are several possible return values, each indicating a distinct stream state depending |
34 | /// on the value passed in `before`: |
35 | /// |
36 | /// - If `before` was `None`, `Poll::Pending` means that this stream's next value is not ready |
37 | /// yet. Implementations will ensure that the current task is notified when the next value may |
38 | /// be ready. |
39 | /// |
40 | /// - If `before` was `Some`, `Poll::Pending` means that this stream's next value is not ready |
41 | /// and that it is not yet known if the stream will produce a value ordered prior to the given |
42 | /// ordering value. Implementations will ensure that the current task is notified when either |
43 | /// the next value is ready or once it is known that no such value will be produced. |
44 | /// |
45 | /// - `Poll::Ready(PollResult::Item)` means that the stream has successfully produced |
46 | /// an item. The stream may produce further values on subsequent `poll_next_before` calls. |
47 | /// The returned ordering value must not be less than any prior ordering value returned by this |
48 | /// stream. The returned ordering value **may** be greater than the value passed to `before`. |
49 | /// |
50 | /// - `Poll::Ready(PollResult::Terminated)` means that the stream has terminated, and |
51 | /// `poll_next_before` should not be invoked again. |
52 | /// |
53 | /// - `Poll::Ready(PollResult::NoneBefore)` means that the stream will not produce |
54 | /// any further ordering tokens less than the given token. Subsequent `poll_next_before` calls |
55 | /// may still produce additional items, but their tokens will be greater than or equal to the |
56 | /// given token. It does not make sense to return this value if `before` was `None`. |
57 | fn poll_next_before( |
58 | self: Pin<&mut Self>, |
59 | cx: &mut Context<'_>, |
60 | before: Option<&Self::Ordering>, |
61 | ) -> Poll<PollResult<Self::Ordering, Self::Data>>; |
62 | |
63 | /// The minimum value of the ordering for any future items. |
64 | /// |
65 | /// If this does not return `None`, the returned ordering must be less than or equal to the |
66 | /// ordering of any future item returned from [`Self::poll_next_before`]. This value should |
67 | /// (but is not required to) be greater than or equal to the ordering of the most recent item |
68 | /// returned. |
69 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
70 | None |
71 | } |
72 | |
73 | /// Returns the bounds on the remaining length of the stream. |
74 | fn size_hint(&self) -> (usize, Option<usize>) { |
75 | (0, None) |
76 | } |
77 | } |
78 | |
79 | /// A value that is either borrowed or owned. |
80 | /// |
81 | /// This is similar to `std::borrow::Cow`, but does not require the ability to convert from |
82 | /// borrowed to owned. |
83 | #[derive (Debug)] |
84 | pub enum MaybeBorrowed<'a, T> { |
85 | Borrowed(&'a T), |
86 | Owned(T), |
87 | } |
88 | |
89 | impl<'a, T> AsRef<T> for MaybeBorrowed<'a, T> { |
90 | fn as_ref(&self) -> &T { |
91 | match self { |
92 | Self::Borrowed(t: &&T) => t, |
93 | Self::Owned(t: &T) => t, |
94 | } |
95 | } |
96 | } |
97 | |
98 | impl<'a, T> core::ops::Deref for MaybeBorrowed<'a, T> { |
99 | type Target = T; |
100 | |
101 | fn deref(&self) -> &T { |
102 | match self { |
103 | Self::Borrowed(t: &&T) => t, |
104 | Self::Owned(t: &T) => t, |
105 | } |
106 | } |
107 | } |
108 | |
109 | impl<P> OrderedStream for Pin<P> |
110 | where |
111 | P: core::ops::DerefMut + Unpin, |
112 | P::Target: OrderedStream, |
113 | { |
114 | type Data = <P::Target as OrderedStream>::Data; |
115 | type Ordering = <P::Target as OrderedStream>::Ordering; |
116 | |
117 | fn poll_next_before( |
118 | self: Pin<&mut Self>, |
119 | cx: &mut Context<'_>, |
120 | before: Option<&Self::Ordering>, |
121 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
122 | self.get_mut().as_mut().poll_next_before(cx, before) |
123 | } |
124 | |
125 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
126 | (**self).position_hint() |
127 | } |
128 | |
129 | fn size_hint(&self) -> (usize, Option<usize>) { |
130 | (**self).size_hint() |
131 | } |
132 | } |
133 | |
134 | impl<S> OrderedStream for Option<S> |
135 | where |
136 | S: OrderedStream, |
137 | { |
138 | type Data = S::Data; |
139 | type Ordering = S::Ordering; |
140 | |
141 | fn poll_next_before( |
142 | self: Pin<&mut Self>, |
143 | cx: &mut Context<'_>, |
144 | before: Option<&Self::Ordering>, |
145 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
146 | match self.as_pin_mut() { |
147 | Some(s: Pin<&mut S>) => s.poll_next_before(cx, before), |
148 | None => Poll::Ready(PollResult::Terminated), |
149 | } |
150 | } |
151 | |
152 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
153 | self.as_ref().and_then(|s: &S| s.position_hint()) |
154 | } |
155 | |
156 | fn size_hint(&self) -> (usize, Option<usize>) { |
157 | self.as_ref().map_or((0, Some(0)), |s: &S| s.size_hint()) |
158 | } |
159 | } |
160 | |
161 | /// An [`OrderedStream`] that tracks if the underlying stream should be polled. |
162 | pub trait FusedOrderedStream: OrderedStream { |
163 | /// Returns `true` if the stream should no longer be polled. |
164 | fn is_terminated(&self) -> bool; |
165 | } |
166 | |
167 | impl<P> FusedOrderedStream for Pin<P> |
168 | where |
169 | P: core::ops::DerefMut + Unpin, |
170 | P::Target: FusedOrderedStream, |
171 | { |
172 | fn is_terminated(&self) -> bool { |
173 | (**self).is_terminated() |
174 | } |
175 | } |
176 | |
177 | impl<S> FusedOrderedStream for Option<S> |
178 | where |
179 | S: FusedOrderedStream, |
180 | { |
181 | fn is_terminated(&self) -> bool { |
182 | self.as_ref().map_or(default:true, |s: &S| s.is_terminated()) |
183 | } |
184 | } |
185 | |
186 | /// The result of a [`OrderedStream::poll_next_before`] operation. |
187 | #[derive (Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] |
188 | pub enum PollResult<Ordering, Data> { |
189 | /// An item with a corresponding ordering token. |
190 | Item { data: Data, ordering: Ordering }, |
191 | /// This stream will not return any items prior to the given point. |
192 | NoneBefore, |
193 | /// This stream is terminated and should not be polled again. |
194 | Terminated, |
195 | } |
196 | |
197 | impl<D, T> PollResult<T, D> { |
198 | /// Extract the data from the result. |
199 | pub fn into_data(self) -> Option<D> { |
200 | match self { |
201 | Self::Item { data, .. } => Some(data), |
202 | _ => None, |
203 | } |
204 | } |
205 | |
206 | /// Extract the item from the result. |
207 | pub fn into_tuple(self) -> Option<(T, D)> { |
208 | match self { |
209 | Self::Item { data, ordering } => Some((ordering, data)), |
210 | _ => None, |
211 | } |
212 | } |
213 | |
214 | /// Apply a closure to the data. |
215 | pub fn map_data<R>(self, f: impl FnOnce(D) -> R) -> PollResult<T, R> { |
216 | match self { |
217 | Self::Item { data, ordering } => PollResult::Item { |
218 | data: f(data), |
219 | ordering, |
220 | }, |
221 | Self::NoneBefore => PollResult::NoneBefore, |
222 | Self::Terminated => PollResult::Terminated, |
223 | } |
224 | } |
225 | } |
226 | |
227 | impl<T, D, E> PollResult<T, Result<D, E>> { |
228 | /// Extract the error of a [`Result`] item. |
229 | pub fn transpose_result(self) -> Result<PollResult<T, D>, E> { |
230 | self.transpose_result_item().map_err(|(_, e: E)| e) |
231 | } |
232 | |
233 | /// Extract the error and ordering from a [`Result`] item. |
234 | pub fn transpose_result_item(self) -> Result<PollResult<T, D>, (T, E)> { |
235 | match self { |
236 | Self::Item { |
237 | data: Ok(data: D), |
238 | ordering: T, |
239 | } => Ok(PollResult::Item { data, ordering }), |
240 | Self::Item { |
241 | data: Err(data: E), |
242 | ordering: T, |
243 | } => Err((ordering, data)), |
244 | Self::NoneBefore => Ok(PollResult::NoneBefore), |
245 | Self::Terminated => Ok(PollResult::Terminated), |
246 | } |
247 | } |
248 | } |
249 | |
250 | /// A [`Future`](core::future::Future) that produces an item with an associated ordering. |
251 | /// |
252 | /// This is equivalent to an [`OrderedStream`] that always produces exactly one item. This trait |
253 | /// is not very useful on its own; see [`FromFuture`] to convert it to a stream. |
254 | /// |
255 | /// It is valid to implement both [`Future`](core::future::Future) and [`OrderedFuture`] on the |
256 | /// same type. In this case, unless otherwise documented by the implementing type, neither poll |
257 | /// function should be invoked after either returns an output value. |
258 | pub trait OrderedFuture { |
259 | /// See [`OrderedStream::Ordering`]. |
260 | type Ordering: Ord; |
261 | |
262 | /// See [`OrderedStream::Data`]. |
263 | type Output; |
264 | |
265 | /// Attempt to pull out the value of this future, registering the current task for wakeup if |
266 | /// needed, and returning `None` if it is known that the future will not produce a value |
267 | /// ordered before the given point. |
268 | /// |
269 | /// # Return value |
270 | /// |
271 | /// There are several possible return values, each indicating a distinct state depending on the |
272 | /// value passed in `before`: |
273 | /// |
274 | /// - If `before` was `None`, `Poll::Pending` means that this future's value is not ready yet. |
275 | /// Implementations will ensure that the current task is notified when the next value may be |
276 | /// ready. |
277 | /// |
278 | /// - If `before` was `Some`, `Poll::Pending` means that this future's value is not ready and |
279 | /// that it is not yet known if the value will be ordered prior to the given ordering value. |
280 | /// Implementations will ensure that the current task is notified when either the next value is |
281 | /// ready or once it is known that no such value will be produced. |
282 | /// |
283 | /// - `Poll::Ready(Some(Data))` means that the future has successfully terminated. The |
284 | /// returned ordering value **may** be greater than the value passed to `before`. The |
285 | /// `poll_before` function should not be invoked again. |
286 | /// |
287 | /// - `Poll::Ready(None)` means that this future will not produce an ordering token less than |
288 | /// the given token. It is an error to return `None` if `before` was `None`. |
289 | fn poll_before( |
290 | self: Pin<&mut Self>, |
291 | cx: &mut Context<'_>, |
292 | before: Option<&Self::Ordering>, |
293 | ) -> Poll<Option<(Self::Ordering, Self::Output)>>; |
294 | |
295 | /// The minimum value of the ordering of the item. |
296 | /// |
297 | /// See [`OrderedStream::position_hint`] for details. |
298 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
299 | None |
300 | } |
301 | } |
302 | |
303 | mod adapters; |
304 | pub use adapters::*; |
305 | mod join; |
306 | pub use join::*; |
307 | mod multi; |
308 | pub use multi::*; |
309 | |