1 | use crate::*; |
2 | use core::future::Future; |
3 | use core::pin::Pin; |
4 | use core::task::{Context, Poll}; |
5 | use futures_core::{FusedStream, Stream}; |
6 | |
7 | /// Helpers for chaining [`OrderedStream`]s. |
8 | pub trait OrderedStreamExt: OrderedStream { |
9 | /// Apply a closure to the data. |
10 | /// |
11 | /// This does not change the ordering. |
12 | fn map<F, R>(self, f: F) -> Map<Self, F> |
13 | where |
14 | Self: Sized, |
15 | F: FnMut(Self::Data) -> R, |
16 | { |
17 | Map { stream: self, f } |
18 | } |
19 | |
20 | /// Apply a closure to the items that has access to the ordering data. |
21 | fn map_item<F, R>(self, f: F) -> MapItem<Self, F> |
22 | where |
23 | Self: Sized, |
24 | F: FnMut(&Self::Ordering, Self::Data) -> R, |
25 | { |
26 | MapItem { stream: self, f } |
27 | } |
28 | |
29 | /// Apply a closure to the items that can change the type of the ordering value. |
30 | /// |
31 | /// A bidirectional mapping for ordering values is required in order to remap `before` values. |
32 | /// It is the caller's responsibility to ensure that the items in the mapped stream still meet |
33 | /// the ordering requirements that [`OrderedStream`] expects. |
34 | fn map_ordering<NewOrdering, NewData, MapInto, MapFrom>( |
35 | self, |
36 | map_into: MapInto, |
37 | map_from: MapFrom, |
38 | ) -> MapOrdering<Self, MapInto, MapFrom> |
39 | where |
40 | Self: Sized, |
41 | MapInto: FnMut(Self::Ordering, Self::Data) -> (NewOrdering, NewData), |
42 | MapFrom: FnMut(&NewOrdering) -> Option<Self::Ordering>, |
43 | NewOrdering: Ord, |
44 | { |
45 | MapOrdering { |
46 | stream: self, |
47 | map_into, |
48 | map_from, |
49 | } |
50 | } |
51 | |
52 | fn filter<F>(self, filter: F) -> Filter<Self, F> |
53 | where |
54 | Self: Sized, |
55 | F: FnMut(&Self::Data) -> bool, |
56 | { |
57 | Filter { |
58 | stream: self, |
59 | filter, |
60 | } |
61 | } |
62 | |
63 | fn filter_map<F, R>(self, filter: F) -> FilterMap<Self, F> |
64 | where |
65 | Self: Sized, |
66 | F: FnMut(Self::Data) -> Option<R>, |
67 | { |
68 | FilterMap { |
69 | stream: self, |
70 | filter, |
71 | } |
72 | } |
73 | |
74 | /// Apply a closure that produces a [`Future`] to items, running the future on each item in |
75 | /// sequence before processing the next. |
76 | /// |
77 | /// This has the side effect of buffering items that are not before the requested ordering |
78 | /// point; you can use [`ready`](core::future::ready) as the closure to take advantage of this |
79 | /// behavior if you don't want to buffer items yourself. |
80 | fn then<F, Fut>(self, then: F) -> Then<Self, F, Fut> |
81 | where |
82 | Self: Sized, |
83 | F: FnMut(Self::Data) -> Fut, |
84 | Fut: Future, |
85 | { |
86 | Then { |
87 | stream: self, |
88 | then, |
89 | future: ThenItem::Idle, |
90 | } |
91 | } |
92 | |
93 | /// Convert this into a [`Stream`], discarding the ordering information. |
94 | fn into_stream(self) -> IntoStream<Self> |
95 | where |
96 | Self: Sized, |
97 | { |
98 | IntoStream { stream: self } |
99 | } |
100 | |
101 | /// Convert this into a [`Stream`], keeping the ordering objects. |
102 | fn into_tuple_stream(self) -> IntoTupleStream<Self> |
103 | where |
104 | Self: Sized, |
105 | { |
106 | IntoTupleStream { stream: self } |
107 | } |
108 | |
109 | /// Convert this into a [`Stream`], keeping only the ordering objects. |
110 | fn into_ordering(self) -> IntoOrdering<Self> |
111 | where |
112 | Self: Sized, |
113 | { |
114 | IntoOrdering { stream: self } |
115 | } |
116 | |
117 | /// Return the next item in this stream. |
118 | fn next(&mut self) -> Next<'_, Self> |
119 | where |
120 | Self: Unpin, |
121 | { |
122 | Next { |
123 | stream: Pin::new(self), |
124 | } |
125 | } |
126 | |
127 | /// Return a [`PollResult`] corresponding to the next item in the stream. |
128 | fn next_before<'a>(&'a mut self, before: Option<&'a Self::Ordering>) -> NextBefore<'a, Self> |
129 | where |
130 | Self: Unpin, |
131 | { |
132 | NextBefore { |
133 | stream: Pin::new(self), |
134 | before, |
135 | } |
136 | } |
137 | |
138 | fn peekable(self) -> Peekable<Self> |
139 | where |
140 | Self: Sized, |
141 | { |
142 | Peekable { |
143 | stream: self, |
144 | item: None, |
145 | is_terminated: false, |
146 | } |
147 | } |
148 | } |
149 | |
150 | impl<T: ?Sized + OrderedStream> OrderedStreamExt for T {} |
151 | |
152 | pin_project_lite::pin_project! { |
153 | /// An [`OrderedStream`] wrapper around a [`Stream`]. |
154 | /// |
155 | /// This does not use any future or past knowledge of elements, and so is suitable if the |
156 | /// stream rarely or never blocks. Prefer using [`FromStream`] if you plan to filter or join |
157 | /// this stream and want other streams to be able to make progress while this one blocks. |
158 | #[derive(Debug)] |
159 | pub struct FromStreamDirect<S, F> { |
160 | #[pin] |
161 | stream: S, |
162 | split_item: F, |
163 | } |
164 | } |
165 | |
166 | impl<S, F> FromStreamDirect<S, F> { |
167 | /// Create a new [`OrderedStream`] by applying a `split_item` closure to each element |
168 | /// produced by the original stream. |
169 | pub fn new<Ordering, Data>(stream: S, split_item: F) -> Self |
170 | where |
171 | S: Stream, |
172 | F: FnMut(S::Item) -> (Ordering, Data), |
173 | Ordering: Ord, |
174 | { |
175 | Self { stream, split_item } |
176 | } |
177 | |
178 | /// Helper function to simplify the creation of a stream when you have a get_ordering function. |
179 | pub fn with_ordering<Ordering>( |
180 | stream: S, |
181 | mut get_ordering: F, |
182 | ) -> FromStreamDirect<S, impl FnMut(S::Item) -> (Ordering, S::Item)> |
183 | where |
184 | S: Stream, |
185 | F: FnMut(&S::Item) -> Ordering, |
186 | Ordering: Ord, |
187 | { |
188 | FromStreamDirect::new(stream, move |data| { |
189 | let ordering = get_ordering(&data); |
190 | (ordering, data) |
191 | }) |
192 | } |
193 | } |
194 | |
195 | impl<S, F, Ordering, Data> OrderedStream for FromStreamDirect<S, F> |
196 | where |
197 | S: Stream, |
198 | F: FnMut(S::Item) -> (Ordering, Data), |
199 | Ordering: Ord, |
200 | { |
201 | type Data = Data; |
202 | type Ordering = Ordering; |
203 | |
204 | fn poll_next_before( |
205 | self: Pin<&mut Self>, |
206 | cx: &mut Context<'_>, |
207 | _: Option<&Self::Ordering>, |
208 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
209 | let this: Projection<'_, S, F> = self.project(); |
210 | let split_item: &mut F = this.split_item; |
211 | this.stream.poll_next(cx).map(|opt: Option<::Item>| match opt { |
212 | None => PollResult::Terminated, |
213 | Some(data: ::Item) => { |
214 | let (ordering: Ordering, data: Data) = split_item(data); |
215 | PollResult::Item { data, ordering } |
216 | } |
217 | }) |
218 | } |
219 | |
220 | fn size_hint(&self) -> (usize, Option<usize>) { |
221 | self.stream.size_hint() |
222 | } |
223 | } |
224 | |
225 | impl<S, F, Ordering, Data> FusedOrderedStream for FromStreamDirect<S, F> |
226 | where |
227 | S: FusedStream, |
228 | F: FnMut(S::Item) -> (Ordering, Data), |
229 | Ordering: Ord, |
230 | { |
231 | fn is_terminated(&self) -> bool { |
232 | self.stream.is_terminated() |
233 | } |
234 | } |
235 | |
236 | pin_project_lite::pin_project! { |
237 | /// An [`OrderedStream`] wrapper around a [`Stream`]. |
238 | /// |
239 | /// Unlike [`FromStream`], the items in the [`Stream`] are themselves ordered with no |
240 | /// additional data. |
241 | #[derive(Debug)] |
242 | pub struct FromSortedStream<S> { |
243 | #[pin] |
244 | pub stream: S, |
245 | } |
246 | } |
247 | |
248 | impl<S> FromSortedStream<S> { |
249 | /// Create a new [`OrderedStream`] by applying a `split_item` closure to each element |
250 | /// produced by the original stream. |
251 | pub fn new(stream: S) -> Self |
252 | where |
253 | S: Stream, |
254 | S::Item: Ord, |
255 | { |
256 | Self { stream } |
257 | } |
258 | } |
259 | |
260 | impl<S> OrderedStream for FromSortedStream<S> |
261 | where |
262 | S: Stream, |
263 | S::Item: Ord, |
264 | { |
265 | type Data = (); |
266 | type Ordering = S::Item; |
267 | |
268 | fn poll_next_before( |
269 | self: Pin<&mut Self>, |
270 | cx: &mut Context<'_>, |
271 | _: Option<&Self::Ordering>, |
272 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
273 | let this: Projection<'_, S> = self.project(); |
274 | this.stream.poll_next(cx).map(|opt: Option<::Item>| match opt { |
275 | None => PollResult::Terminated, |
276 | Some(ordering: ::Item) => PollResult::Item { data: (), ordering }, |
277 | }) |
278 | } |
279 | |
280 | fn size_hint(&self) -> (usize, Option<usize>) { |
281 | self.stream.size_hint() |
282 | } |
283 | } |
284 | |
285 | impl<S> FusedOrderedStream for FromSortedStream<S> |
286 | where |
287 | S: FusedStream, |
288 | S::Item: Ord, |
289 | { |
290 | fn is_terminated(&self) -> bool { |
291 | self.stream.is_terminated() |
292 | } |
293 | } |
294 | |
295 | pin_project_lite::pin_project! { |
296 | /// An [`OrderedStream`] wrapper around a [`Stream`]. |
297 | /// |
298 | /// This caches the last-used ordering point returned by the stream and uses it to produce |
299 | /// NoneBefore results. This makes it suitable for using to adapt streams that are filtered |
300 | /// or mapped before joining. It still relies on the original stream producing a later-ordered |
301 | /// element to allow other streams to progress, however. |
302 | #[derive(Debug)] |
303 | pub struct FromStream<S, F, Ordering> { |
304 | #[pin] |
305 | stream: S, |
306 | split_item: F, |
307 | last: Option<Ordering>, |
308 | } |
309 | } |
310 | |
311 | impl<S, F, Ordering> FromStream<S, F, Ordering> |
312 | where |
313 | S: Stream, |
314 | Ordering: Ord + Clone, |
315 | { |
316 | /// Create a new [`OrderedStream`] by applying a `split_item` closure to each element |
317 | /// produced by the original stream. |
318 | pub fn new<Data>(stream: S, split_item: F) -> Self |
319 | where |
320 | F: FnMut(S::Item) -> (Ordering, Data), |
321 | { |
322 | FromStream { |
323 | stream, |
324 | split_item, |
325 | last: None, |
326 | } |
327 | } |
328 | |
329 | /// Helper function to simplify the creation of a stream when you have a get_ordering function. |
330 | pub fn with_ordering( |
331 | stream: S, |
332 | mut get_ordering: F, |
333 | ) -> FromStream<S, impl FnMut(S::Item) -> (Ordering, S::Item), Ordering> |
334 | where |
335 | F: FnMut(&S::Item) -> Ordering, |
336 | { |
337 | FromStream::new(stream, move |data| { |
338 | let ordering = get_ordering(&data); |
339 | (ordering, data) |
340 | }) |
341 | } |
342 | } |
343 | |
344 | impl<S, F, Ordering, Data> OrderedStream for FromStream<S, F, Ordering> |
345 | where |
346 | S: Stream, |
347 | F: FnMut(S::Item) -> (Ordering, Data), |
348 | Ordering: Ord + Clone, |
349 | { |
350 | type Data = Data; |
351 | type Ordering = Ordering; |
352 | |
353 | fn poll_next_before( |
354 | self: Pin<&mut Self>, |
355 | cx: &mut Context<'_>, |
356 | before: Option<&Self::Ordering>, |
357 | ) -> Poll<PollResult<Ordering, Data>> { |
358 | let this = self.project(); |
359 | let split_item = this.split_item; |
360 | let last = this.last; |
361 | if let (Some(last), Some(before)) = (last.as_ref(), before) { |
362 | if last >= before { |
363 | return Poll::Ready(PollResult::NoneBefore); |
364 | } |
365 | } |
366 | this.stream.poll_next(cx).map(|opt| match opt { |
367 | None => PollResult::Terminated, |
368 | Some(item) => { |
369 | let (ordering, data) = split_item(item); |
370 | *last = Some(ordering.clone()); |
371 | PollResult::Item { data, ordering } |
372 | } |
373 | }) |
374 | } |
375 | |
376 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
377 | self.last.as_ref().map(MaybeBorrowed::Borrowed) |
378 | } |
379 | |
380 | fn size_hint(&self) -> (usize, Option<usize>) { |
381 | self.stream.size_hint() |
382 | } |
383 | } |
384 | |
385 | impl<S, F, Ordering, Data> FusedOrderedStream for FromStream<S, F, Ordering> |
386 | where |
387 | S: FusedStream, |
388 | F: FnMut(S::Item) -> (Ordering, Data), |
389 | Ordering: Ord + Clone, |
390 | { |
391 | fn is_terminated(&self) -> bool { |
392 | self.stream.is_terminated() |
393 | } |
394 | } |
395 | |
396 | pin_project_lite::pin_project! { |
397 | /// A [`Stream`] for the [`into_stream`](OrderedStreamExt::into_stream) function. |
398 | #[derive(Debug)] |
399 | pub struct IntoStream<S> { |
400 | #[pin] |
401 | stream: S, |
402 | } |
403 | } |
404 | |
405 | impl<S: OrderedStream> Stream for IntoStream<S> { |
406 | type Item = S::Data; |
407 | |
408 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
409 | self.project() |
410 | .stream |
411 | .poll_next_before(cx, before:None) |
412 | .map(|r: PollResult<::Ordering, …>| r.into_data()) |
413 | } |
414 | |
415 | fn size_hint(&self) -> (usize, Option<usize>) { |
416 | self.stream.size_hint() |
417 | } |
418 | } |
419 | |
420 | impl<S> FusedStream for IntoStream<S> |
421 | where |
422 | S: FusedOrderedStream, |
423 | { |
424 | fn is_terminated(&self) -> bool { |
425 | self.stream.is_terminated() |
426 | } |
427 | } |
428 | |
429 | pin_project_lite::pin_project! { |
430 | /// A [`Stream`] for the [`into_tuple_stream`](OrderedStreamExt::into_tuple_stream) function. |
431 | #[derive(Debug)] |
432 | pub struct IntoTupleStream<S> { |
433 | #[pin] |
434 | stream: S, |
435 | } |
436 | } |
437 | |
438 | impl<S: OrderedStream> Stream for IntoTupleStream<S> { |
439 | type Item = (S::Ordering, S::Data); |
440 | |
441 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
442 | self.project() |
443 | .stream |
444 | .poll_next_before(cx, before:None) |
445 | .map(|r: PollResult<::Ordering, …>| r.into_tuple()) |
446 | } |
447 | |
448 | fn size_hint(&self) -> (usize, Option<usize>) { |
449 | self.stream.size_hint() |
450 | } |
451 | } |
452 | |
453 | impl<S> FusedStream for IntoTupleStream<S> |
454 | where |
455 | S: FusedOrderedStream, |
456 | { |
457 | fn is_terminated(&self) -> bool { |
458 | self.stream.is_terminated() |
459 | } |
460 | } |
461 | |
462 | pin_project_lite::pin_project! { |
463 | /// A [`Stream`] for the [`into_ordering`](OrderedStreamExt::into_ordering) function. |
464 | #[derive(Debug)] |
465 | pub struct IntoOrdering<S> { |
466 | #[pin] |
467 | stream: S, |
468 | } |
469 | } |
470 | |
471 | impl<S: OrderedStream> Stream for IntoOrdering<S> { |
472 | type Item = S::Ordering; |
473 | |
474 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
475 | self.project() |
476 | .stream |
477 | .poll_next_before(cx, before:None) |
478 | .map(|r: PollResult<::Ordering, …>| r.into_tuple().map(|t: (::Ordering, …)| t.0)) |
479 | } |
480 | |
481 | fn size_hint(&self) -> (usize, Option<usize>) { |
482 | self.stream.size_hint() |
483 | } |
484 | } |
485 | |
486 | impl<S> FusedStream for IntoOrdering<S> |
487 | where |
488 | S: FusedOrderedStream, |
489 | { |
490 | fn is_terminated(&self) -> bool { |
491 | self.stream.is_terminated() |
492 | } |
493 | } |
494 | |
495 | pin_project_lite::pin_project! { |
496 | /// An [`OrderedStream`] wrapper around an [`OrderedFuture`]. |
497 | #[derive(Debug)] |
498 | pub struct FromFuture<F> { |
499 | #[pin] |
500 | future: Option<F>, |
501 | } |
502 | } |
503 | |
504 | impl<F: OrderedFuture> From<F> for FromFuture<F> { |
505 | fn from(future: F) -> Self { |
506 | Self { |
507 | future: Some(future), |
508 | } |
509 | } |
510 | } |
511 | |
512 | impl<F: OrderedFuture> OrderedStream for FromFuture<F> { |
513 | type Data = F::Output; |
514 | type Ordering = F::Ordering; |
515 | |
516 | fn poll_next_before( |
517 | self: Pin<&mut Self>, |
518 | cx: &mut Context<'_>, |
519 | before: Option<&Self::Ordering>, |
520 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
521 | let mut this = self.project(); |
522 | match this.future.as_mut().as_pin_mut() { |
523 | Some(future) => match future.poll_before(cx, before) { |
524 | Poll::Ready(Some((ordering, data))) => { |
525 | this.future.set(None); |
526 | Poll::Ready(PollResult::Item { data, ordering }) |
527 | } |
528 | Poll::Ready(None) => Poll::Ready(PollResult::NoneBefore), |
529 | Poll::Pending => Poll::Pending, |
530 | }, |
531 | None => Poll::Ready(PollResult::Terminated), |
532 | } |
533 | } |
534 | |
535 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
536 | self.future.as_ref().and_then(|f| f.position_hint()) |
537 | } |
538 | |
539 | fn size_hint(&self) -> (usize, Option<usize>) { |
540 | if self.future.is_some() { |
541 | (1, Some(1)) |
542 | } else { |
543 | (0, Some(0)) |
544 | } |
545 | } |
546 | } |
547 | |
548 | impl<F: OrderedFuture> FusedOrderedStream for FromFuture<F> { |
549 | fn is_terminated(&self) -> bool { |
550 | self.future.is_none() |
551 | } |
552 | } |
553 | |
554 | pin_project_lite::pin_project! { |
555 | /// A stream for the [`map`](OrderedStreamExt::map) function. |
556 | #[derive(Debug)] |
557 | pub struct Map<S, F> { |
558 | #[pin] |
559 | stream: S, |
560 | f: F, |
561 | } |
562 | } |
563 | |
564 | impl<S, F> Map<S, F> { |
565 | /// Convert to source stream. |
566 | pub fn into_inner(self) -> S { |
567 | self.stream |
568 | } |
569 | } |
570 | |
571 | impl<S, F, R> OrderedStream for Map<S, F> |
572 | where |
573 | S: OrderedStream, |
574 | F: FnMut(S::Data) -> R, |
575 | { |
576 | type Data = R; |
577 | type Ordering = S::Ordering; |
578 | |
579 | fn poll_next_before( |
580 | self: Pin<&mut Self>, |
581 | cx: &mut Context<'_>, |
582 | before: Option<&Self::Ordering>, |
583 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
584 | let this: Projection<'_, S, F> = self.project(); |
585 | let f: &mut F = this.f; |
586 | thisPoll::Ordering, …>>.stream |
587 | .poll_next_before(cx, before) |
588 | .map(|res: PollResult<::Ordering, …>| res.map_data(f)) |
589 | } |
590 | |
591 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
592 | self.stream.position_hint() |
593 | } |
594 | |
595 | fn size_hint(&self) -> (usize, Option<usize>) { |
596 | self.stream.size_hint() |
597 | } |
598 | } |
599 | |
600 | pin_project_lite::pin_project! { |
601 | /// A stream for the [`map_item`](OrderedStreamExt::map_item) function. |
602 | #[derive(Debug)] |
603 | pub struct MapItem<S, F> { |
604 | #[pin] |
605 | stream: S, |
606 | f: F, |
607 | } |
608 | } |
609 | |
610 | impl<S, F> MapItem<S, F> { |
611 | /// Convert to source stream. |
612 | pub fn into_inner(self) -> S { |
613 | self.stream |
614 | } |
615 | } |
616 | |
617 | impl<S, F, R> OrderedStream for MapItem<S, F> |
618 | where |
619 | S: OrderedStream, |
620 | F: FnMut(&S::Ordering, S::Data) -> R, |
621 | { |
622 | type Data = R; |
623 | type Ordering = S::Ordering; |
624 | |
625 | fn poll_next_before( |
626 | self: Pin<&mut Self>, |
627 | cx: &mut Context<'_>, |
628 | before: Option<&Self::Ordering>, |
629 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
630 | let this = self.project(); |
631 | let f = this.f; |
632 | this.stream |
633 | .poll_next_before(cx, before) |
634 | .map(|res| match res { |
635 | PollResult::Item { data, ordering } => { |
636 | let data = f(&ordering, data); |
637 | PollResult::Item { data, ordering } |
638 | } |
639 | PollResult::NoneBefore => PollResult::NoneBefore, |
640 | PollResult::Terminated => PollResult::Terminated, |
641 | }) |
642 | } |
643 | |
644 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
645 | self.stream.position_hint() |
646 | } |
647 | |
648 | fn size_hint(&self) -> (usize, Option<usize>) { |
649 | self.stream.size_hint() |
650 | } |
651 | } |
652 | |
653 | pin_project_lite::pin_project! { |
654 | /// A stream for the [`map_ordering`](OrderedStreamExt::map_ordering) function. |
655 | #[derive(Debug)] |
656 | pub struct MapOrdering<S, MapInto, MapFrom> { |
657 | #[pin] |
658 | stream: S, |
659 | map_into: MapInto, map_from: MapFrom, |
660 | } |
661 | } |
662 | |
663 | impl<S, I, F> MapOrdering<S, I, F> { |
664 | /// Convert to source stream. |
665 | pub fn into_inner(self) -> S { |
666 | self.stream |
667 | } |
668 | } |
669 | |
670 | impl<S, MapInto, MapFrom, NewOrdering, NewData> OrderedStream for MapOrdering<S, MapInto, MapFrom> |
671 | where |
672 | S: OrderedStream, |
673 | MapInto: FnMut(S::Ordering, S::Data) -> (NewOrdering, NewData), |
674 | MapFrom: FnMut(&NewOrdering) -> Option<S::Ordering>, |
675 | NewOrdering: Ord, |
676 | { |
677 | type Data = NewData; |
678 | type Ordering = NewOrdering; |
679 | |
680 | fn poll_next_before( |
681 | self: Pin<&mut Self>, |
682 | cx: &mut Context<'_>, |
683 | before: Option<&Self::Ordering>, |
684 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
685 | let this = self.project(); |
686 | let map_into = this.map_into; |
687 | let before = before.and_then(this.map_from); |
688 | this.stream |
689 | .poll_next_before(cx, before.as_ref()) |
690 | .map(|res| match res { |
691 | PollResult::Item { data, ordering } => { |
692 | let (ordering, data) = map_into(ordering, data); |
693 | PollResult::Item { data, ordering } |
694 | } |
695 | PollResult::NoneBefore => PollResult::NoneBefore, |
696 | PollResult::Terminated => PollResult::Terminated, |
697 | }) |
698 | } |
699 | |
700 | fn size_hint(&self) -> (usize, Option<usize>) { |
701 | self.stream.size_hint() |
702 | } |
703 | } |
704 | |
705 | pin_project_lite::pin_project! { |
706 | /// A stream for the [`filter`](OrderedStreamExt::filter) function. |
707 | #[derive(Debug)] |
708 | pub struct Filter<S, F> { |
709 | #[pin] |
710 | stream: S, |
711 | filter: F, |
712 | } |
713 | } |
714 | |
715 | impl<S, F> Filter<S, F> { |
716 | /// Convert to source stream. |
717 | pub fn into_inner(self) -> S { |
718 | self.stream |
719 | } |
720 | } |
721 | |
722 | impl<S, F> OrderedStream for Filter<S, F> |
723 | where |
724 | S: OrderedStream, |
725 | F: FnMut(&S::Data) -> bool, |
726 | { |
727 | type Data = S::Data; |
728 | type Ordering = S::Ordering; |
729 | |
730 | fn poll_next_before( |
731 | self: Pin<&mut Self>, |
732 | cx: &mut Context<'_>, |
733 | before: Option<&Self::Ordering>, |
734 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
735 | let mut this = self.project(); |
736 | loop { |
737 | match this.stream.as_mut().poll_next_before(cx, before).into() { |
738 | PollState::Pending => return Poll::Pending, |
739 | PollState::Terminated => return Poll::Ready(PollResult::Terminated), |
740 | PollState::NoneBefore => return Poll::Ready(PollResult::NoneBefore), |
741 | PollState::Item(data, ordering) => { |
742 | if (this.filter)(&data) { |
743 | return Poll::Ready(PollResult::Item { data, ordering }); |
744 | } |
745 | } |
746 | } |
747 | } |
748 | } |
749 | |
750 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
751 | self.stream.position_hint() |
752 | } |
753 | |
754 | fn size_hint(&self) -> (usize, Option<usize>) { |
755 | (0, self.stream.size_hint().1) |
756 | } |
757 | } |
758 | |
759 | pin_project_lite::pin_project! { |
760 | /// A stream for the [`filter_map`](OrderedStreamExt::filter_map) function. |
761 | #[derive(Debug)] |
762 | pub struct FilterMap<S, F> { |
763 | #[pin] |
764 | stream: S, |
765 | filter: F, |
766 | } |
767 | } |
768 | |
769 | impl<S, F> FilterMap<S, F> { |
770 | /// Convert to source stream. |
771 | pub fn into_inner(self) -> S { |
772 | self.stream |
773 | } |
774 | } |
775 | |
776 | impl<S, F, R> OrderedStream for FilterMap<S, F> |
777 | where |
778 | S: OrderedStream, |
779 | F: FnMut(S::Data) -> Option<R>, |
780 | { |
781 | type Data = R; |
782 | type Ordering = S::Ordering; |
783 | |
784 | fn poll_next_before( |
785 | self: Pin<&mut Self>, |
786 | cx: &mut Context<'_>, |
787 | before: Option<&Self::Ordering>, |
788 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
789 | let mut this = self.project(); |
790 | loop { |
791 | match this.stream.as_mut().poll_next_before(cx, before).into() { |
792 | PollState::Pending => return Poll::Pending, |
793 | PollState::Terminated => return Poll::Ready(PollResult::Terminated), |
794 | PollState::NoneBefore => return Poll::Ready(PollResult::NoneBefore), |
795 | PollState::Item(data, ordering) => match (this.filter)(data) { |
796 | Some(data) => return Poll::Ready(PollResult::Item { data, ordering }), |
797 | None => continue, |
798 | }, |
799 | } |
800 | } |
801 | } |
802 | |
803 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
804 | self.stream.position_hint() |
805 | } |
806 | |
807 | fn size_hint(&self) -> (usize, Option<usize>) { |
808 | (0, self.stream.size_hint().1) |
809 | } |
810 | } |
811 | |
812 | pin_project_lite::pin_project! { |
813 | #[project = ThenProj] |
814 | #[project_replace = ThenDone] |
815 | #[derive(Debug)] |
816 | enum ThenItem<Fut, T> { |
817 | Running { #[pin] future: Fut, ordering: T }, |
818 | Idle, |
819 | } |
820 | } |
821 | |
822 | pin_project_lite::pin_project! { |
823 | /// A stream for the [`then`](OrderedStreamExt::then) function. |
824 | #[derive(Debug)] |
825 | pub struct Then<S, F, Fut> |
826 | where S: OrderedStream |
827 | { |
828 | #[pin] |
829 | stream: S, |
830 | then: F, |
831 | #[pin] |
832 | future: ThenItem<Fut, S::Ordering>, |
833 | } |
834 | } |
835 | |
836 | impl<S, F, Fut> OrderedStream for Then<S, F, Fut> |
837 | where |
838 | S: OrderedStream, |
839 | F: FnMut(S::Data) -> Fut, |
840 | Fut: Future, |
841 | { |
842 | type Data = Fut::Output; |
843 | type Ordering = S::Ordering; |
844 | |
845 | fn poll_next_before( |
846 | self: Pin<&mut Self>, |
847 | cx: &mut Context<'_>, |
848 | before: Option<&Self::Ordering>, |
849 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
850 | let mut this = self.project(); |
851 | loop { |
852 | if let ThenProj::Running { future, ordering } = this.future.as_mut().project() { |
853 | // Because we know the next ordering, we can answer questions about it now. |
854 | if let Some(before) = before { |
855 | if *ordering >= *before { |
856 | return Poll::Ready(PollResult::NoneBefore); |
857 | } |
858 | } |
859 | |
860 | if let Poll::Ready(data) = future.poll(cx) { |
861 | if let ThenDone::Running { ordering, .. } = |
862 | this.future.as_mut().project_replace(ThenItem::Idle) |
863 | { |
864 | return Poll::Ready(PollResult::Item { data, ordering }); |
865 | } |
866 | } else { |
867 | return Poll::Pending; |
868 | } |
869 | } |
870 | match this.stream.as_mut().poll_next_before(cx, before).into() { |
871 | PollState::Pending => return Poll::Pending, |
872 | PollState::Terminated => return Poll::Ready(PollResult::Terminated), |
873 | PollState::NoneBefore => return Poll::Ready(PollResult::NoneBefore), |
874 | PollState::Item(data, ordering) => { |
875 | this.future.set(ThenItem::Running { |
876 | future: (this.then)(data), |
877 | ordering, |
878 | }); |
879 | } |
880 | } |
881 | } |
882 | } |
883 | |
884 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
885 | match &self.future { |
886 | ThenItem::Running { ordering, .. } => Some(MaybeBorrowed::Borrowed(ordering)), |
887 | ThenItem::Idle => self.stream.position_hint(), |
888 | } |
889 | } |
890 | |
891 | fn size_hint(&self) -> (usize, Option<usize>) { |
892 | let (min, max) = self.stream.size_hint(); |
893 | match self.future { |
894 | ThenItem::Running { .. } => (min.saturating_add(1), max.and_then(|v| v.checked_add(1))), |
895 | ThenItem::Idle => (min, max), |
896 | } |
897 | } |
898 | } |
899 | |
900 | /// A future for the [`next`](OrderedStreamExt::next) function. |
901 | #[derive (Debug)] |
902 | pub struct Next<'a, S: ?Sized> { |
903 | stream: Pin<&'a mut S>, |
904 | } |
905 | |
906 | impl<'a, S: ?Sized> Unpin for Next<'a, S> {} |
907 | |
908 | impl<'a, S> Future for Next<'a, S> |
909 | where |
910 | S: OrderedStream + ?Sized, |
911 | { |
912 | type Output = Option<S::Data>; |
913 | |
914 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Data>> { |
915 | self.stream |
916 | .as_mut() |
917 | .poll_next_before(cx, before:None) |
918 | .map(PollResult::into_data) |
919 | } |
920 | } |
921 | |
922 | /// A future for the [`next_before`](OrderedStreamExt::next_before) function. |
923 | #[derive (Debug)] |
924 | pub struct NextBefore<'a, S> |
925 | where |
926 | S: OrderedStream + ?Sized, |
927 | { |
928 | stream: Pin<&'a mut S>, |
929 | before: Option<&'a S::Ordering>, |
930 | } |
931 | |
932 | impl<'a, S: OrderedStream + ?Sized> Unpin for NextBefore<'a, S> {} |
933 | |
934 | impl<'a, S> Future for NextBefore<'a, S> |
935 | where |
936 | S: OrderedStream + ?Sized, |
937 | { |
938 | type Output = PollResult<S::Ordering, S::Data>; |
939 | |
940 | fn poll( |
941 | mut self: Pin<&mut Self>, |
942 | cx: &mut Context<'_>, |
943 | ) -> Poll<PollResult<S::Ordering, S::Data>> { |
944 | let before: Option<&::Ordering> = self.before; |
945 | self.stream.as_mut().poll_next_before(cx, before) |
946 | } |
947 | } |
948 | |
949 | pin_project_lite::pin_project! { |
950 | /// A stream for the [`peekable`](OrderedStreamExt::peekable) function. |
951 | #[derive(Debug)] |
952 | pub struct Peekable<S: OrderedStream> { |
953 | #[pin] |
954 | stream: S, |
955 | is_terminated: bool, |
956 | item: Option<(S::Ordering, S::Data)>, |
957 | } |
958 | } |
959 | |
960 | impl<S: OrderedStream> Peekable<S> { |
961 | /// Convert into the source stream. |
962 | /// |
963 | /// This method returns the source stream along with any buffered item and its |
964 | /// ordering. |
965 | pub fn into_inner(self) -> (S, Option<(S::Data, S::Ordering)>) { |
966 | (self.stream, self.item.map(|(o, d)| (d, o))) |
967 | } |
968 | |
969 | /// The current item, without polling |
970 | pub(crate) fn item(&self) -> Option<&(S::Ordering, S::Data)> { |
971 | self.item.as_ref() |
972 | } |
973 | |
974 | /// Peek on the next item in the stream |
975 | pub fn poll_peek_before( |
976 | self: Pin<&mut Self>, |
977 | cx: &mut Context<'_>, |
978 | before: Option<&S::Ordering>, |
979 | ) -> Poll<PollResult<&S::Ordering, &mut S::Data>> { |
980 | let mut this = self.project(); |
981 | if *this.is_terminated { |
982 | return Poll::Ready(PollResult::Terminated); |
983 | } |
984 | let stream = this.stream.as_mut(); |
985 | if this.item.is_none() { |
986 | match stream.poll_next_before(cx, before) { |
987 | Poll::Ready(PollResult::Item { ordering, data }) => { |
988 | *this.item = Some((ordering, data)); |
989 | } |
990 | Poll::Ready(PollResult::NoneBefore) => return Poll::Ready(PollResult::NoneBefore), |
991 | Poll::Ready(PollResult::Terminated) => { |
992 | *this.is_terminated = true; |
993 | return Poll::Ready(PollResult::Terminated); |
994 | } |
995 | Poll::Pending => return Poll::Pending, |
996 | } |
997 | } |
998 | let item = this.item.as_mut().unwrap(); |
999 | Poll::Ready(PollResult::Item { |
1000 | ordering: &item.0, |
1001 | data: &mut item.1, |
1002 | }) |
1003 | } |
1004 | } |
1005 | |
1006 | impl<S: OrderedStream> OrderedStream for Peekable<S> { |
1007 | type Ordering = S::Ordering; |
1008 | type Data = S::Data; |
1009 | |
1010 | fn poll_next_before( |
1011 | mut self: Pin<&mut Self>, |
1012 | cx: &mut Context<'_>, |
1013 | before: Option<&S::Ordering>, |
1014 | ) -> Poll<PollResult<S::Ordering, S::Data>> { |
1015 | match self.as_mut().poll_peek_before(cx, before) { |
1016 | Poll::Ready(PollResult::Item { .. }) => { |
1017 | let (ordering, data) = self.project().item.take().unwrap(); |
1018 | Poll::Ready(PollResult::Item { ordering, data }) |
1019 | } |
1020 | Poll::Ready(PollResult::NoneBefore) => Poll::Ready(PollResult::NoneBefore), |
1021 | Poll::Ready(PollResult::Terminated) => Poll::Ready(PollResult::Terminated), |
1022 | Poll::Pending => Poll::Pending, |
1023 | } |
1024 | } |
1025 | |
1026 | fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> { |
1027 | match &self.item { |
1028 | Some((ordering, _)) => Some(MaybeBorrowed::Borrowed(ordering)), |
1029 | None => self.stream.position_hint(), |
1030 | } |
1031 | } |
1032 | |
1033 | fn size_hint(&self) -> (usize, Option<usize>) { |
1034 | let (min, max) = if self.is_terminated { |
1035 | (0, Some(0)) |
1036 | } else { |
1037 | self.stream.size_hint() |
1038 | }; |
1039 | if self.item.is_some() { |
1040 | (min.saturating_add(1), max.and_then(|v| v.checked_add(1))) |
1041 | } else { |
1042 | (min, max) |
1043 | } |
1044 | } |
1045 | } |
1046 | |
1047 | impl<S: OrderedStream> FusedOrderedStream for Peekable<S> { |
1048 | fn is_terminated(&self) -> bool { |
1049 | self.is_terminated |
1050 | } |
1051 | } |
1052 | |