1 | use alloc::collections::VecDeque; |
2 | use core::pin::Pin; |
3 | use futures_core::ready; |
4 | use futures_core::stream::{FusedStream, Stream}; |
5 | use futures_core::task::{Context, Poll}; |
6 | use futures_sink::Sink; |
7 | use pin_project_lite::pin_project; |
8 | |
9 | pin_project! { |
10 | /// Sink for the [`buffer`](super::SinkExt::buffer) method. |
11 | #[derive(Debug)] |
12 | #[must_use = "sinks do nothing unless polled" ] |
13 | pub struct Buffer<Si, Item> { |
14 | #[pin] |
15 | sink: Si, |
16 | buf: VecDeque<Item>, |
17 | |
18 | // Track capacity separately from the `VecDeque`, which may be rounded up |
19 | capacity: usize, |
20 | } |
21 | } |
22 | |
23 | impl<Si: Sink<Item>, Item> Buffer<Si, Item> { |
24 | pub(super) fn new(sink: Si, capacity: usize) -> Self { |
25 | Self { sink, buf: VecDeque::with_capacity(capacity), capacity } |
26 | } |
27 | |
28 | delegate_access_inner!(sink, Si, ()); |
29 | |
30 | fn try_empty_buffer(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Si::Error>> { |
31 | let mut this: Projection<'_, Si, Item> = self.project(); |
32 | ready!(this.sink.as_mut().poll_ready(cx))?; |
33 | while let Some(item: Item) = this.buf.pop_front() { |
34 | this.sink.as_mut().start_send(item)?; |
35 | if !this.buf.is_empty() { |
36 | ready!(this.sink.as_mut().poll_ready(cx))?; |
37 | } |
38 | } |
39 | Poll::Ready(Ok(())) |
40 | } |
41 | } |
42 | |
43 | // Forwarding impl of Stream from the underlying sink |
44 | impl<S, Item> Stream for Buffer<S, Item> |
45 | where |
46 | S: Sink<Item> + Stream, |
47 | { |
48 | type Item = S::Item; |
49 | |
50 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
51 | self.project().sink.poll_next(cx) |
52 | } |
53 | |
54 | fn size_hint(&self) -> (usize, Option<usize>) { |
55 | self.sink.size_hint() |
56 | } |
57 | } |
58 | |
59 | impl<S, Item> FusedStream for Buffer<S, Item> |
60 | where |
61 | S: Sink<Item> + FusedStream, |
62 | { |
63 | fn is_terminated(&self) -> bool { |
64 | self.sink.is_terminated() |
65 | } |
66 | } |
67 | |
68 | impl<Si: Sink<Item>, Item> Sink<Item> for Buffer<Si, Item> { |
69 | type Error = Si::Error; |
70 | |
71 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
72 | if self.capacity == 0 { |
73 | return self.project().sink.poll_ready(cx); |
74 | } |
75 | |
76 | let _ = self.as_mut().try_empty_buffer(cx)?; |
77 | |
78 | if self.buf.len() >= self.capacity { |
79 | Poll::Pending |
80 | } else { |
81 | Poll::Ready(Ok(())) |
82 | } |
83 | } |
84 | |
85 | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
86 | if self.capacity == 0 { |
87 | self.project().sink.start_send(item) |
88 | } else { |
89 | self.project().buf.push_back(item); |
90 | Ok(()) |
91 | } |
92 | } |
93 | |
94 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
95 | ready!(self.as_mut().try_empty_buffer(cx))?; |
96 | debug_assert!(self.buf.is_empty()); |
97 | self.project().sink.poll_flush(cx) |
98 | } |
99 | |
100 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
101 | ready!(self.as_mut().try_empty_buffer(cx))?; |
102 | debug_assert!(self.buf.is_empty()); |
103 | self.project().sink.poll_close(cx) |
104 | } |
105 | } |
106 | |