1use crate::*;
2use core::future::Future;
3use core::pin::Pin;
4use core::task::{Context, Poll};
5use futures_core::{FusedStream, Stream};
6
7/// Helpers for chaining [`OrderedStream`]s.
8pub trait OrderedStreamExt: OrderedStream {
9 /// Apply a closure to the data.
10 ///
11 /// This does not change the ordering.
12 fn map<F, R>(self, f: F) -> Map<Self, F>
13 where
14 Self: Sized,
15 F: FnMut(Self::Data) -> R,
16 {
17 Map { stream: self, f }
18 }
19
20 /// Apply a closure to the items that has access to the ordering data.
21 fn map_item<F, R>(self, f: F) -> MapItem<Self, F>
22 where
23 Self: Sized,
24 F: FnMut(&Self::Ordering, Self::Data) -> R,
25 {
26 MapItem { stream: self, f }
27 }
28
29 /// Apply a closure to the items that can change the type of the ordering value.
30 ///
31 /// A bidirectional mapping for ordering values is required in order to remap `before` values.
32 /// It is the caller's responsibility to ensure that the items in the mapped stream still meet
33 /// the ordering requirements that [`OrderedStream`] expects.
34 fn map_ordering<NewOrdering, NewData, MapInto, MapFrom>(
35 self,
36 map_into: MapInto,
37 map_from: MapFrom,
38 ) -> MapOrdering<Self, MapInto, MapFrom>
39 where
40 Self: Sized,
41 MapInto: FnMut(Self::Ordering, Self::Data) -> (NewOrdering, NewData),
42 MapFrom: FnMut(&NewOrdering) -> Option<Self::Ordering>,
43 NewOrdering: Ord,
44 {
45 MapOrdering {
46 stream: self,
47 map_into,
48 map_from,
49 }
50 }
51
52 fn filter<F>(self, filter: F) -> Filter<Self, F>
53 where
54 Self: Sized,
55 F: FnMut(&Self::Data) -> bool,
56 {
57 Filter {
58 stream: self,
59 filter,
60 }
61 }
62
63 fn filter_map<F, R>(self, filter: F) -> FilterMap<Self, F>
64 where
65 Self: Sized,
66 F: FnMut(Self::Data) -> Option<R>,
67 {
68 FilterMap {
69 stream: self,
70 filter,
71 }
72 }
73
74 /// Apply a closure that produces a [`Future`] to items, running the future on each item in
75 /// sequence before processing the next.
76 ///
77 /// This has the side effect of buffering items that are not before the requested ordering
78 /// point; you can use [`ready`](core::future::ready) as the closure to take advantage of this
79 /// behavior if you don't want to buffer items yourself.
80 fn then<F, Fut>(self, then: F) -> Then<Self, F, Fut>
81 where
82 Self: Sized,
83 F: FnMut(Self::Data) -> Fut,
84 Fut: Future,
85 {
86 Then {
87 stream: self,
88 then,
89 future: ThenItem::Idle,
90 }
91 }
92
93 /// Convert this into a [`Stream`], discarding the ordering information.
94 fn into_stream(self) -> IntoStream<Self>
95 where
96 Self: Sized,
97 {
98 IntoStream { stream: self }
99 }
100
101 /// Convert this into a [`Stream`], keeping the ordering objects.
102 fn into_tuple_stream(self) -> IntoTupleStream<Self>
103 where
104 Self: Sized,
105 {
106 IntoTupleStream { stream: self }
107 }
108
109 /// Convert this into a [`Stream`], keeping only the ordering objects.
110 fn into_ordering(self) -> IntoOrdering<Self>
111 where
112 Self: Sized,
113 {
114 IntoOrdering { stream: self }
115 }
116
117 /// Return the next item in this stream.
118 fn next(&mut self) -> Next<'_, Self>
119 where
120 Self: Unpin,
121 {
122 Next {
123 stream: Pin::new(self),
124 }
125 }
126
127 /// Return a [`PollResult`] corresponding to the next item in the stream.
128 fn next_before<'a>(&'a mut self, before: Option<&'a Self::Ordering>) -> NextBefore<'a, Self>
129 where
130 Self: Unpin,
131 {
132 NextBefore {
133 stream: Pin::new(self),
134 before,
135 }
136 }
137
138 fn peekable(self) -> Peekable<Self>
139 where
140 Self: Sized,
141 {
142 Peekable {
143 stream: self,
144 item: None,
145 is_terminated: false,
146 }
147 }
148}
149
// Blanket implementation: every `OrderedStream` (sized or not) gets the extension methods.
impl<T: ?Sized + OrderedStream> OrderedStreamExt for T {}
151
pin_project_lite::pin_project! {
    /// An [`OrderedStream`] wrapper around a [`Stream`].
    ///
    /// This does not use any future or past knowledge of elements, and so is suitable if the
    /// stream rarely or never blocks.  Prefer using [`FromStream`] if you plan to filter or join
    /// this stream and want other streams to be able to make progress while this one blocks.
    #[derive(Debug)]
    pub struct FromStreamDirect<S, F> {
        // The wrapped stream; pinned so it can be polled through the projection.
        #[pin]
        stream: S,
        // Closure that splits each stream element into an `(ordering, data)` pair.
        split_item: F,
    }
}
165
166impl<S, F> FromStreamDirect<S, F> {
167 /// Create a new [`OrderedStream`] by applying a `split_item` closure to each element
168 /// produced by the original stream.
169 pub fn new<Ordering, Data>(stream: S, split_item: F) -> Self
170 where
171 S: Stream,
172 F: FnMut(S::Item) -> (Ordering, Data),
173 Ordering: Ord,
174 {
175 Self { stream, split_item }
176 }
177
178 /// Helper function to simplify the creation of a stream when you have a get_ordering function.
179 pub fn with_ordering<Ordering>(
180 stream: S,
181 mut get_ordering: F,
182 ) -> FromStreamDirect<S, impl FnMut(S::Item) -> (Ordering, S::Item)>
183 where
184 S: Stream,
185 F: FnMut(&S::Item) -> Ordering,
186 Ordering: Ord,
187 {
188 FromStreamDirect::new(stream, move |data| {
189 let ordering = get_ordering(&data);
190 (ordering, data)
191 })
192 }
193}
194
195impl<S, F, Ordering, Data> OrderedStream for FromStreamDirect<S, F>
196where
197 S: Stream,
198 F: FnMut(S::Item) -> (Ordering, Data),
199 Ordering: Ord,
200{
201 type Data = Data;
202 type Ordering = Ordering;
203
204 fn poll_next_before(
205 self: Pin<&mut Self>,
206 cx: &mut Context<'_>,
207 _: Option<&Self::Ordering>,
208 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
209 let this: Projection<'_, S, F> = self.project();
210 let split_item: &mut F = this.split_item;
211 this.stream.poll_next(cx).map(|opt: Option<::Item>| match opt {
212 None => PollResult::Terminated,
213 Some(data: ::Item) => {
214 let (ordering: Ordering, data: Data) = split_item(data);
215 PollResult::Item { data, ordering }
216 }
217 })
218 }
219
220 fn size_hint(&self) -> (usize, Option<usize>) {
221 self.stream.size_hint()
222 }
223}
224
225impl<S, F, Ordering, Data> FusedOrderedStream for FromStreamDirect<S, F>
226where
227 S: FusedStream,
228 F: FnMut(S::Item) -> (Ordering, Data),
229 Ordering: Ord,
230{
231 fn is_terminated(&self) -> bool {
232 self.stream.is_terminated()
233 }
234}
235
pin_project_lite::pin_project! {
    /// An [`OrderedStream`] wrapper around a [`Stream`].
    ///
    /// Unlike [`FromStream`], the items in the [`Stream`] are themselves ordered with no
    /// additional data.
    #[derive(Debug)]
    pub struct FromSortedStream<S> {
        // The wrapped stream; each element it yields is used directly as the ordering value.
        #[pin]
        pub stream: S,
    }
}
247
248impl<S> FromSortedStream<S> {
249 /// Create a new [`OrderedStream`] by applying a `split_item` closure to each element
250 /// produced by the original stream.
251 pub fn new(stream: S) -> Self
252 where
253 S: Stream,
254 S::Item: Ord,
255 {
256 Self { stream }
257 }
258}
259
260impl<S> OrderedStream for FromSortedStream<S>
261where
262 S: Stream,
263 S::Item: Ord,
264{
265 type Data = ();
266 type Ordering = S::Item;
267
268 fn poll_next_before(
269 self: Pin<&mut Self>,
270 cx: &mut Context<'_>,
271 _: Option<&Self::Ordering>,
272 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
273 let this: Projection<'_, S> = self.project();
274 this.stream.poll_next(cx).map(|opt: Option<::Item>| match opt {
275 None => PollResult::Terminated,
276 Some(ordering: ::Item) => PollResult::Item { data: (), ordering },
277 })
278 }
279
280 fn size_hint(&self) -> (usize, Option<usize>) {
281 self.stream.size_hint()
282 }
283}
284
285impl<S> FusedOrderedStream for FromSortedStream<S>
286where
287 S: FusedStream,
288 S::Item: Ord,
289{
290 fn is_terminated(&self) -> bool {
291 self.stream.is_terminated()
292 }
293}
294
pin_project_lite::pin_project! {
    /// An [`OrderedStream`] wrapper around a [`Stream`].
    ///
    /// This caches the last-used ordering point returned by the stream and uses it to produce
    /// NoneBefore results.  This makes it suitable for using to adapt streams that are filtered
    /// or mapped before joining.  It still relies on the original stream producing a later-ordered
    /// element to allow other streams to progress, however.
    #[derive(Debug)]
    pub struct FromStream<S, F, Ordering> {
        // The wrapped stream; pinned so it can be polled through the projection.
        #[pin]
        stream: S,
        // Closure that splits each stream element into an `(ordering, data)` pair.
        split_item: F,
        // Ordering of the most recently produced item; `None` until the first item is seen.
        last: Option<Ordering>,
    }
}
310
311impl<S, F, Ordering> FromStream<S, F, Ordering>
312where
313 S: Stream,
314 Ordering: Ord + Clone,
315{
316 /// Create a new [`OrderedStream`] by applying a `split_item` closure to each element
317 /// produced by the original stream.
318 pub fn new<Data>(stream: S, split_item: F) -> Self
319 where
320 F: FnMut(S::Item) -> (Ordering, Data),
321 {
322 FromStream {
323 stream,
324 split_item,
325 last: None,
326 }
327 }
328
329 /// Helper function to simplify the creation of a stream when you have a get_ordering function.
330 pub fn with_ordering(
331 stream: S,
332 mut get_ordering: F,
333 ) -> FromStream<S, impl FnMut(S::Item) -> (Ordering, S::Item), Ordering>
334 where
335 F: FnMut(&S::Item) -> Ordering,
336 {
337 FromStream::new(stream, move |data| {
338 let ordering = get_ordering(&data);
339 (ordering, data)
340 })
341 }
342}
343
344impl<S, F, Ordering, Data> OrderedStream for FromStream<S, F, Ordering>
345where
346 S: Stream,
347 F: FnMut(S::Item) -> (Ordering, Data),
348 Ordering: Ord + Clone,
349{
350 type Data = Data;
351 type Ordering = Ordering;
352
353 fn poll_next_before(
354 self: Pin<&mut Self>,
355 cx: &mut Context<'_>,
356 before: Option<&Self::Ordering>,
357 ) -> Poll<PollResult<Ordering, Data>> {
358 let this = self.project();
359 let split_item = this.split_item;
360 let last = this.last;
361 if let (Some(last), Some(before)) = (last.as_ref(), before) {
362 if last >= before {
363 return Poll::Ready(PollResult::NoneBefore);
364 }
365 }
366 this.stream.poll_next(cx).map(|opt| match opt {
367 None => PollResult::Terminated,
368 Some(item) => {
369 let (ordering, data) = split_item(item);
370 *last = Some(ordering.clone());
371 PollResult::Item { data, ordering }
372 }
373 })
374 }
375
376 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
377 self.last.as_ref().map(MaybeBorrowed::Borrowed)
378 }
379
380 fn size_hint(&self) -> (usize, Option<usize>) {
381 self.stream.size_hint()
382 }
383}
384
385impl<S, F, Ordering, Data> FusedOrderedStream for FromStream<S, F, Ordering>
386where
387 S: FusedStream,
388 F: FnMut(S::Item) -> (Ordering, Data),
389 Ordering: Ord + Clone,
390{
391 fn is_terminated(&self) -> bool {
392 self.stream.is_terminated()
393 }
394}
395
pin_project_lite::pin_project! {
    /// A [`Stream`] for the [`into_stream`](OrderedStreamExt::into_stream) function.
    #[derive(Debug)]
    pub struct IntoStream<S> {
        // The wrapped ordered stream; pinned so it can be polled through the projection.
        #[pin]
        stream: S,
    }
}
404
405impl<S: OrderedStream> Stream for IntoStream<S> {
406 type Item = S::Data;
407
408 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
409 self.project()
410 .stream
411 .poll_next_before(cx, before:None)
412 .map(|r: PollResult<::Ordering, …>| r.into_data())
413 }
414
415 fn size_hint(&self) -> (usize, Option<usize>) {
416 self.stream.size_hint()
417 }
418}
419
420impl<S> FusedStream for IntoStream<S>
421where
422 S: FusedOrderedStream,
423{
424 fn is_terminated(&self) -> bool {
425 self.stream.is_terminated()
426 }
427}
428
pin_project_lite::pin_project! {
    /// A [`Stream`] for the [`into_tuple_stream`](OrderedStreamExt::into_tuple_stream) function.
    #[derive(Debug)]
    pub struct IntoTupleStream<S> {
        // The wrapped ordered stream; pinned so it can be polled through the projection.
        #[pin]
        stream: S,
    }
}
437
438impl<S: OrderedStream> Stream for IntoTupleStream<S> {
439 type Item = (S::Ordering, S::Data);
440
441 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
442 self.project()
443 .stream
444 .poll_next_before(cx, before:None)
445 .map(|r: PollResult<::Ordering, …>| r.into_tuple())
446 }
447
448 fn size_hint(&self) -> (usize, Option<usize>) {
449 self.stream.size_hint()
450 }
451}
452
453impl<S> FusedStream for IntoTupleStream<S>
454where
455 S: FusedOrderedStream,
456{
457 fn is_terminated(&self) -> bool {
458 self.stream.is_terminated()
459 }
460}
461
pin_project_lite::pin_project! {
    /// A [`Stream`] for the [`into_ordering`](OrderedStreamExt::into_ordering) function.
    #[derive(Debug)]
    pub struct IntoOrdering<S> {
        // The wrapped ordered stream; pinned so it can be polled through the projection.
        #[pin]
        stream: S,
    }
}
470
471impl<S: OrderedStream> Stream for IntoOrdering<S> {
472 type Item = S::Ordering;
473
474 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
475 self.project()
476 .stream
477 .poll_next_before(cx, before:None)
478 .map(|r: PollResult<::Ordering, …>| r.into_tuple().map(|t: (::Ordering, …)| t.0))
479 }
480
481 fn size_hint(&self) -> (usize, Option<usize>) {
482 self.stream.size_hint()
483 }
484}
485
486impl<S> FusedStream for IntoOrdering<S>
487where
488 S: FusedOrderedStream,
489{
490 fn is_terminated(&self) -> bool {
491 self.stream.is_terminated()
492 }
493}
494
pin_project_lite::pin_project! {
    /// An [`OrderedStream`] wrapper around an [`OrderedFuture`].
    #[derive(Debug)]
    pub struct FromFuture<F> {
        // The wrapped future; reset to `None` once it has produced its item.
        #[pin]
        future: Option<F>,
    }
}
503
504impl<F: OrderedFuture> From<F> for FromFuture<F> {
505 fn from(future: F) -> Self {
506 Self {
507 future: Some(future),
508 }
509 }
510}
511
512impl<F: OrderedFuture> OrderedStream for FromFuture<F> {
513 type Data = F::Output;
514 type Ordering = F::Ordering;
515
516 fn poll_next_before(
517 self: Pin<&mut Self>,
518 cx: &mut Context<'_>,
519 before: Option<&Self::Ordering>,
520 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
521 let mut this = self.project();
522 match this.future.as_mut().as_pin_mut() {
523 Some(future) => match future.poll_before(cx, before) {
524 Poll::Ready(Some((ordering, data))) => {
525 this.future.set(None);
526 Poll::Ready(PollResult::Item { data, ordering })
527 }
528 Poll::Ready(None) => Poll::Ready(PollResult::NoneBefore),
529 Poll::Pending => Poll::Pending,
530 },
531 None => Poll::Ready(PollResult::Terminated),
532 }
533 }
534
535 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
536 self.future.as_ref().and_then(|f| f.position_hint())
537 }
538
539 fn size_hint(&self) -> (usize, Option<usize>) {
540 if self.future.is_some() {
541 (1, Some(1))
542 } else {
543 (0, Some(0))
544 }
545 }
546}
547
548impl<F: OrderedFuture> FusedOrderedStream for FromFuture<F> {
549 fn is_terminated(&self) -> bool {
550 self.future.is_none()
551 }
552}
553
554pin_project_lite::pin_project! {
555 /// A stream for the [`map`](OrderedStreamExt::map) function.
556 #[derive(Debug)]
557 pub struct Map<S, F> {
558 #[pin]
559 stream: S,
560 f: F,
561 }
562}
563
564impl<S, F> Map<S, F> {
565 /// Convert to source stream.
566 pub fn into_inner(self) -> S {
567 self.stream
568 }
569}
570
571impl<S, F, R> OrderedStream for Map<S, F>
572where
573 S: OrderedStream,
574 F: FnMut(S::Data) -> R,
575{
576 type Data = R;
577 type Ordering = S::Ordering;
578
579 fn poll_next_before(
580 self: Pin<&mut Self>,
581 cx: &mut Context<'_>,
582 before: Option<&Self::Ordering>,
583 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
584 let this: Projection<'_, S, F> = self.project();
585 let f: &mut F = this.f;
586 thisPoll::Ordering, …>>.stream
587 .poll_next_before(cx, before)
588 .map(|res: PollResult<::Ordering, …>| res.map_data(f))
589 }
590
591 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
592 self.stream.position_hint()
593 }
594
595 fn size_hint(&self) -> (usize, Option<usize>) {
596 self.stream.size_hint()
597 }
598}
599
600pin_project_lite::pin_project! {
601 /// A stream for the [`map_item`](OrderedStreamExt::map_item) function.
602 #[derive(Debug)]
603 pub struct MapItem<S, F> {
604 #[pin]
605 stream: S,
606 f: F,
607 }
608}
609
610impl<S, F> MapItem<S, F> {
611 /// Convert to source stream.
612 pub fn into_inner(self) -> S {
613 self.stream
614 }
615}
616
617impl<S, F, R> OrderedStream for MapItem<S, F>
618where
619 S: OrderedStream,
620 F: FnMut(&S::Ordering, S::Data) -> R,
621{
622 type Data = R;
623 type Ordering = S::Ordering;
624
625 fn poll_next_before(
626 self: Pin<&mut Self>,
627 cx: &mut Context<'_>,
628 before: Option<&Self::Ordering>,
629 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
630 let this = self.project();
631 let f = this.f;
632 this.stream
633 .poll_next_before(cx, before)
634 .map(|res| match res {
635 PollResult::Item { data, ordering } => {
636 let data = f(&ordering, data);
637 PollResult::Item { data, ordering }
638 }
639 PollResult::NoneBefore => PollResult::NoneBefore,
640 PollResult::Terminated => PollResult::Terminated,
641 })
642 }
643
644 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
645 self.stream.position_hint()
646 }
647
648 fn size_hint(&self) -> (usize, Option<usize>) {
649 self.stream.size_hint()
650 }
651}
652
653pin_project_lite::pin_project! {
654 /// A stream for the [`map_ordering`](OrderedStreamExt::map_ordering) function.
655 #[derive(Debug)]
656 pub struct MapOrdering<S, MapInto, MapFrom> {
657 #[pin]
658 stream: S,
659 map_into: MapInto, map_from: MapFrom,
660 }
661}
662
663impl<S, I, F> MapOrdering<S, I, F> {
664 /// Convert to source stream.
665 pub fn into_inner(self) -> S {
666 self.stream
667 }
668}
669
670impl<S, MapInto, MapFrom, NewOrdering, NewData> OrderedStream for MapOrdering<S, MapInto, MapFrom>
671where
672 S: OrderedStream,
673 MapInto: FnMut(S::Ordering, S::Data) -> (NewOrdering, NewData),
674 MapFrom: FnMut(&NewOrdering) -> Option<S::Ordering>,
675 NewOrdering: Ord,
676{
677 type Data = NewData;
678 type Ordering = NewOrdering;
679
680 fn poll_next_before(
681 self: Pin<&mut Self>,
682 cx: &mut Context<'_>,
683 before: Option<&Self::Ordering>,
684 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
685 let this = self.project();
686 let map_into = this.map_into;
687 let before = before.and_then(this.map_from);
688 this.stream
689 .poll_next_before(cx, before.as_ref())
690 .map(|res| match res {
691 PollResult::Item { data, ordering } => {
692 let (ordering, data) = map_into(ordering, data);
693 PollResult::Item { data, ordering }
694 }
695 PollResult::NoneBefore => PollResult::NoneBefore,
696 PollResult::Terminated => PollResult::Terminated,
697 })
698 }
699
700 fn size_hint(&self) -> (usize, Option<usize>) {
701 self.stream.size_hint()
702 }
703}
704
705pin_project_lite::pin_project! {
706 /// A stream for the [`filter`](OrderedStreamExt::filter) function.
707 #[derive(Debug)]
708 pub struct Filter<S, F> {
709 #[pin]
710 stream: S,
711 filter: F,
712 }
713}
714
715impl<S, F> Filter<S, F> {
716 /// Convert to source stream.
717 pub fn into_inner(self) -> S {
718 self.stream
719 }
720}
721
722impl<S, F> OrderedStream for Filter<S, F>
723where
724 S: OrderedStream,
725 F: FnMut(&S::Data) -> bool,
726{
727 type Data = S::Data;
728 type Ordering = S::Ordering;
729
730 fn poll_next_before(
731 self: Pin<&mut Self>,
732 cx: &mut Context<'_>,
733 before: Option<&Self::Ordering>,
734 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
735 let mut this = self.project();
736 loop {
737 match this.stream.as_mut().poll_next_before(cx, before).into() {
738 PollState::Pending => return Poll::Pending,
739 PollState::Terminated => return Poll::Ready(PollResult::Terminated),
740 PollState::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
741 PollState::Item(data, ordering) => {
742 if (this.filter)(&data) {
743 return Poll::Ready(PollResult::Item { data, ordering });
744 }
745 }
746 }
747 }
748 }
749
750 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
751 self.stream.position_hint()
752 }
753
754 fn size_hint(&self) -> (usize, Option<usize>) {
755 (0, self.stream.size_hint().1)
756 }
757}
758
759pin_project_lite::pin_project! {
760 /// A stream for the [`filter_map`](OrderedStreamExt::filter_map) function.
761 #[derive(Debug)]
762 pub struct FilterMap<S, F> {
763 #[pin]
764 stream: S,
765 filter: F,
766 }
767}
768
769impl<S, F> FilterMap<S, F> {
770 /// Convert to source stream.
771 pub fn into_inner(self) -> S {
772 self.stream
773 }
774}
775
776impl<S, F, R> OrderedStream for FilterMap<S, F>
777where
778 S: OrderedStream,
779 F: FnMut(S::Data) -> Option<R>,
780{
781 type Data = R;
782 type Ordering = S::Ordering;
783
784 fn poll_next_before(
785 self: Pin<&mut Self>,
786 cx: &mut Context<'_>,
787 before: Option<&Self::Ordering>,
788 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
789 let mut this = self.project();
790 loop {
791 match this.stream.as_mut().poll_next_before(cx, before).into() {
792 PollState::Pending => return Poll::Pending,
793 PollState::Terminated => return Poll::Ready(PollResult::Terminated),
794 PollState::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
795 PollState::Item(data, ordering) => match (this.filter)(data) {
796 Some(data) => return Poll::Ready(PollResult::Item { data, ordering }),
797 None => continue,
798 },
799 }
800 }
801 }
802
803 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
804 self.stream.position_hint()
805 }
806
807 fn size_hint(&self) -> (usize, Option<usize>) {
808 (0, self.stream.size_hint().1)
809 }
810}
811
pin_project_lite::pin_project! {
    // State of the `Then` adapter: either a future is in flight for an item whose ordering is
    // already known, or nothing is in flight.
    #[project = ThenProj]
    #[project_replace = ThenDone]
    #[derive(Debug)]
    enum ThenItem<Fut, T> {
        // A future is being driven; `ordering` belongs to the item it was created from.
        Running { #[pin] future: Fut, ordering: T },
        // No future is in flight; the next item must come from the wrapped stream.
        Idle,
    }
}

pin_project_lite::pin_project! {
    /// A stream for the [`then`](OrderedStreamExt::then) function.
    #[derive(Debug)]
    pub struct Then<S, F, Fut>
        where S: OrderedStream
    {
        // The wrapped ordered stream that supplies items.
        #[pin]
        stream: S,
        // Closure that turns the data of an item into a future.
        then: F,
        // The in-flight future (with its item's ordering), or `Idle`.
        #[pin]
        future: ThenItem<Fut, S::Ordering>,
    }
}
835
836impl<S, F, Fut> OrderedStream for Then<S, F, Fut>
837where
838 S: OrderedStream,
839 F: FnMut(S::Data) -> Fut,
840 Fut: Future,
841{
842 type Data = Fut::Output;
843 type Ordering = S::Ordering;
844
845 fn poll_next_before(
846 self: Pin<&mut Self>,
847 cx: &mut Context<'_>,
848 before: Option<&Self::Ordering>,
849 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
850 let mut this = self.project();
851 loop {
852 if let ThenProj::Running { future, ordering } = this.future.as_mut().project() {
853 // Because we know the next ordering, we can answer questions about it now.
854 if let Some(before) = before {
855 if *ordering >= *before {
856 return Poll::Ready(PollResult::NoneBefore);
857 }
858 }
859
860 if let Poll::Ready(data) = future.poll(cx) {
861 if let ThenDone::Running { ordering, .. } =
862 this.future.as_mut().project_replace(ThenItem::Idle)
863 {
864 return Poll::Ready(PollResult::Item { data, ordering });
865 }
866 } else {
867 return Poll::Pending;
868 }
869 }
870 match this.stream.as_mut().poll_next_before(cx, before).into() {
871 PollState::Pending => return Poll::Pending,
872 PollState::Terminated => return Poll::Ready(PollResult::Terminated),
873 PollState::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
874 PollState::Item(data, ordering) => {
875 this.future.set(ThenItem::Running {
876 future: (this.then)(data),
877 ordering,
878 });
879 }
880 }
881 }
882 }
883
884 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
885 match &self.future {
886 ThenItem::Running { ordering, .. } => Some(MaybeBorrowed::Borrowed(ordering)),
887 ThenItem::Idle => self.stream.position_hint(),
888 }
889 }
890
891 fn size_hint(&self) -> (usize, Option<usize>) {
892 let (min, max) = self.stream.size_hint();
893 match self.future {
894 ThenItem::Running { .. } => (min.saturating_add(1), max.and_then(|v| v.checked_add(1))),
895 ThenItem::Idle => (min, max),
896 }
897 }
898}
899
/// A future for the [`next`](OrderedStreamExt::next) function.
#[derive(Debug)]
pub struct Next<'a, S: ?Sized> {
    // Pinned mutable borrow of the stream being polled.
    stream: Pin<&'a mut S>,
}

// `Next` only holds a `Pin<&mut S>`, so it is declared `Unpin` for every `S`.
impl<'a, S: ?Sized> Unpin for Next<'a, S> {}
907
908impl<'a, S> Future for Next<'a, S>
909where
910 S: OrderedStream + ?Sized,
911{
912 type Output = Option<S::Data>;
913
914 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Data>> {
915 self.stream
916 .as_mut()
917 .poll_next_before(cx, before:None)
918 .map(PollResult::into_data)
919 }
920}
921
/// A future for the [`next_before`](OrderedStreamExt::next_before) function.
#[derive(Debug)]
pub struct NextBefore<'a, S>
where
    S: OrderedStream + ?Sized,
{
    // Pinned mutable borrow of the stream being polled.
    stream: Pin<&'a mut S>,
    // Ordering bound forwarded to every poll of the stream.
    before: Option<&'a S::Ordering>,
}

// `NextBefore` only holds references, so it is declared `Unpin` for every `S`.
impl<'a, S: OrderedStream + ?Sized> Unpin for NextBefore<'a, S> {}
933
934impl<'a, S> Future for NextBefore<'a, S>
935where
936 S: OrderedStream + ?Sized,
937{
938 type Output = PollResult<S::Ordering, S::Data>;
939
940 fn poll(
941 mut self: Pin<&mut Self>,
942 cx: &mut Context<'_>,
943 ) -> Poll<PollResult<S::Ordering, S::Data>> {
944 let before: Option<&::Ordering> = self.before;
945 self.stream.as_mut().poll_next_before(cx, before)
946 }
947}
948
pin_project_lite::pin_project! {
    /// A stream for the [`peekable`](OrderedStreamExt::peekable) function.
    #[derive(Debug)]
    pub struct Peekable<S: OrderedStream> {
        // The wrapped ordered stream.
        #[pin]
        stream: S,
        // Set once the wrapped stream has reported `Terminated`.
        is_terminated: bool,
        // The item pulled from the stream by a peek and not yet consumed, if any.
        item: Option<(S::Ordering, S::Data)>,
    }
}
959
960impl<S: OrderedStream> Peekable<S> {
961 /// Convert into the source stream.
962 ///
963 /// This method returns the source stream along with any buffered item and its
964 /// ordering.
965 pub fn into_inner(self) -> (S, Option<(S::Data, S::Ordering)>) {
966 (self.stream, self.item.map(|(o, d)| (d, o)))
967 }
968
969 /// The current item, without polling
970 pub(crate) fn item(&self) -> Option<&(S::Ordering, S::Data)> {
971 self.item.as_ref()
972 }
973
974 /// Peek on the next item in the stream
975 pub fn poll_peek_before(
976 self: Pin<&mut Self>,
977 cx: &mut Context<'_>,
978 before: Option<&S::Ordering>,
979 ) -> Poll<PollResult<&S::Ordering, &mut S::Data>> {
980 let mut this = self.project();
981 if *this.is_terminated {
982 return Poll::Ready(PollResult::Terminated);
983 }
984 let stream = this.stream.as_mut();
985 if this.item.is_none() {
986 match stream.poll_next_before(cx, before) {
987 Poll::Ready(PollResult::Item { ordering, data }) => {
988 *this.item = Some((ordering, data));
989 }
990 Poll::Ready(PollResult::NoneBefore) => return Poll::Ready(PollResult::NoneBefore),
991 Poll::Ready(PollResult::Terminated) => {
992 *this.is_terminated = true;
993 return Poll::Ready(PollResult::Terminated);
994 }
995 Poll::Pending => return Poll::Pending,
996 }
997 }
998 let item = this.item.as_mut().unwrap();
999 Poll::Ready(PollResult::Item {
1000 ordering: &item.0,
1001 data: &mut item.1,
1002 })
1003 }
1004}
1005
1006impl<S: OrderedStream> OrderedStream for Peekable<S> {
1007 type Ordering = S::Ordering;
1008 type Data = S::Data;
1009
1010 fn poll_next_before(
1011 mut self: Pin<&mut Self>,
1012 cx: &mut Context<'_>,
1013 before: Option<&S::Ordering>,
1014 ) -> Poll<PollResult<S::Ordering, S::Data>> {
1015 match self.as_mut().poll_peek_before(cx, before) {
1016 Poll::Ready(PollResult::Item { .. }) => {
1017 let (ordering, data) = self.project().item.take().unwrap();
1018 Poll::Ready(PollResult::Item { ordering, data })
1019 }
1020 Poll::Ready(PollResult::NoneBefore) => Poll::Ready(PollResult::NoneBefore),
1021 Poll::Ready(PollResult::Terminated) => Poll::Ready(PollResult::Terminated),
1022 Poll::Pending => Poll::Pending,
1023 }
1024 }
1025
1026 fn position_hint(&self) -> Option<MaybeBorrowed<'_, Self::Ordering>> {
1027 match &self.item {
1028 Some((ordering, _)) => Some(MaybeBorrowed::Borrowed(ordering)),
1029 None => self.stream.position_hint(),
1030 }
1031 }
1032
1033 fn size_hint(&self) -> (usize, Option<usize>) {
1034 let (min, max) = if self.is_terminated {
1035 (0, Some(0))
1036 } else {
1037 self.stream.size_hint()
1038 };
1039 if self.item.is_some() {
1040 (min.saturating_add(1), max.and_then(|v| v.checked_add(1)))
1041 } else {
1042 (min, max)
1043 }
1044 }
1045}
1046
1047impl<S: OrderedStream> FusedOrderedStream for Peekable<S> {
1048 fn is_terminated(&self) -> bool {
1049 self.is_terminated
1050 }
1051}
1052