1 | //! Asynchronous sinks |
2 | //! |
3 | //! This crate contains the `Sink` trait which allows values to be sent |
4 | //! asynchronously. |
5 | |
6 | #![cfg_attr (not(feature = "std" ), no_std)] |
7 | #![warn (missing_debug_implementations, missing_docs, rust_2018_idioms, unreachable_pub)] |
8 | // It cannot be included in the published code because this lints have false positives in the minimum required version. |
9 | #![cfg_attr (test, warn(single_use_lifetimes))] |
10 | #![doc (test( |
11 | no_crate_inject, |
12 | attr( |
13 | deny(warnings, rust_2018_idioms, single_use_lifetimes), |
14 | allow(dead_code, unused_assignments, unused_variables) |
15 | ) |
16 | ))] |
17 | |
18 | #[cfg (feature = "alloc" )] |
19 | extern crate alloc; |
20 | |
21 | use core::ops::DerefMut; |
22 | use core::pin::Pin; |
23 | use core::task::{Context, Poll}; |
24 | |
25 | /// A `Sink` is a value into which other values can be sent, asynchronously. |
26 | /// |
27 | /// Basic examples of sinks include the sending side of: |
28 | /// |
29 | /// - Channels |
30 | /// - Sockets |
31 | /// - Pipes |
32 | /// |
33 | /// In addition to such "primitive" sinks, it's typical to layer additional |
34 | /// functionality, such as buffering, on top of an existing sink. |
35 | /// |
36 | /// Sending to a sink is "asynchronous" in the sense that the value may not be |
37 | /// sent in its entirety immediately. Instead, values are sent in a two-phase |
38 | /// way: first by initiating a send, and then by polling for completion. This |
39 | /// two-phase setup is analogous to buffered writing in synchronous code, where |
40 | /// writes often succeed immediately, but internally are buffered and are |
41 | /// *actually* written only upon flushing. |
42 | /// |
43 | /// In addition, the `Sink` may be *full*, in which case it is not even possible |
44 | /// to start the sending process. |
45 | /// |
46 | /// As with `Future` and `Stream`, the `Sink` trait is built from a few core |
47 | /// required methods, and a host of default methods for working in a |
48 | /// higher-level way. The `Sink::send_all` combinator is of particular |
49 | /// importance: you can use it to send an entire stream to a sink, which is |
50 | /// the simplest way to ultimately consume a stream. |
51 | #[must_use = "sinks do nothing unless polled" ] |
52 | pub trait Sink<Item> { |
53 | /// The type of value produced by the sink when an error occurs. |
54 | type Error; |
55 | |
56 | /// Attempts to prepare the `Sink` to receive a value. |
57 | /// |
58 | /// This method must be called and return `Poll::Ready(Ok(()))` prior to |
59 | /// each call to `start_send`. |
60 | /// |
61 | /// This method returns `Poll::Ready` once the underlying sink is ready to |
62 | /// receive data. If this method returns `Poll::Pending`, the current task |
63 | /// is registered to be notified (via `cx.waker().wake_by_ref()`) when `poll_ready` |
64 | /// should be called again. |
65 | /// |
66 | /// In most cases, if the sink encounters an error, the sink will |
67 | /// permanently be unable to receive items. |
68 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; |
69 | |
70 | /// Begin the process of sending a value to the sink. |
71 | /// Each call to this function must be preceded by a successful call to |
72 | /// `poll_ready` which returned `Poll::Ready(Ok(()))`. |
73 | /// |
74 | /// As the name suggests, this method only *begins* the process of sending |
75 | /// the item. If the sink employs buffering, the item isn't fully processed |
76 | /// until the buffer is fully flushed. Since sinks are designed to work with |
77 | /// asynchronous I/O, the process of actually writing out the data to an |
78 | /// underlying object takes place asynchronously. **You *must* use |
79 | /// `poll_flush` or `poll_close` in order to guarantee completion of a |
80 | /// send**. |
81 | /// |
82 | /// Implementations of `poll_ready` and `start_send` will usually involve |
83 | /// flushing behind the scenes in order to make room for new messages. |
84 | /// It is only necessary to call `poll_flush` if you need to guarantee that |
85 | /// *all* of the items placed into the `Sink` have been sent. |
86 | /// |
87 | /// In most cases, if the sink encounters an error, the sink will |
88 | /// permanently be unable to receive items. |
89 | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>; |
90 | |
91 | /// Flush any remaining output from this sink. |
92 | /// |
93 | /// Returns `Poll::Ready(Ok(()))` when no buffered items remain. If this |
94 | /// value is returned then it is guaranteed that all previous values sent |
95 | /// via `start_send` have been flushed. |
96 | /// |
97 | /// Returns `Poll::Pending` if there is more work left to do, in which |
98 | /// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when |
99 | /// `poll_flush` should be called again. |
100 | /// |
101 | /// In most cases, if the sink encounters an error, the sink will |
102 | /// permanently be unable to receive items. |
103 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; |
104 | |
105 | /// Flush any remaining output and close this sink, if necessary. |
106 | /// |
107 | /// Returns `Poll::Ready(Ok(()))` when no buffered items remain and the sink |
108 | /// has been successfully closed. |
109 | /// |
110 | /// Returns `Poll::Pending` if there is more work left to do, in which |
111 | /// case the current task is scheduled (via `cx.waker().wake_by_ref()`) to wake up when |
112 | /// `poll_close` should be called again. |
113 | /// |
114 | /// If this function encounters an error, the sink should be considered to |
115 | /// have failed permanently, and no more `Sink` methods should be called. |
116 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>; |
117 | } |
118 | |
119 | impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for &mut S { |
120 | type Error = S::Error; |
121 | |
122 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
123 | Pin::new(&mut **self).poll_ready(cx) |
124 | } |
125 | |
126 | fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
127 | Pin::new(&mut **self).start_send(item) |
128 | } |
129 | |
130 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
131 | Pin::new(&mut **self).poll_flush(cx) |
132 | } |
133 | |
134 | fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
135 | Pin::new(&mut **self).poll_close(cx) |
136 | } |
137 | } |
138 | |
139 | impl<P, Item> Sink<Item> for Pin<P> |
140 | where |
141 | P: DerefMut + Unpin, |
142 | P::Target: Sink<Item>, |
143 | { |
144 | type Error = <P::Target as Sink<Item>>::Error; |
145 | |
146 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
147 | self.get_mut().as_mut().poll_ready(cx) |
148 | } |
149 | |
150 | fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
151 | self.get_mut().as_mut().start_send(item) |
152 | } |
153 | |
154 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
155 | self.get_mut().as_mut().poll_flush(cx) |
156 | } |
157 | |
158 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
159 | self.get_mut().as_mut().poll_close(cx) |
160 | } |
161 | } |
162 | |
163 | #[cfg (feature = "alloc" )] |
164 | mod if_alloc { |
165 | use super::*; |
166 | use core::convert::Infallible as Never; |
167 | |
168 | impl<T> Sink<T> for alloc::vec::Vec<T> { |
169 | type Error = Never; |
170 | |
171 | fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
172 | Poll::Ready(Ok(())) |
173 | } |
174 | |
175 | fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { |
176 | // TODO: impl<T> Unpin for Vec<T> {} |
177 | unsafe { self.get_unchecked_mut() }.push(item); |
178 | Ok(()) |
179 | } |
180 | |
181 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
182 | Poll::Ready(Ok(())) |
183 | } |
184 | |
185 | fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
186 | Poll::Ready(Ok(())) |
187 | } |
188 | } |
189 | |
190 | impl<T> Sink<T> for alloc::collections::VecDeque<T> { |
191 | type Error = Never; |
192 | |
193 | fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
194 | Poll::Ready(Ok(())) |
195 | } |
196 | |
197 | fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { |
198 | // TODO: impl<T> Unpin for Vec<T> {} |
199 | unsafe { self.get_unchecked_mut() }.push_back(item); |
200 | Ok(()) |
201 | } |
202 | |
203 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
204 | Poll::Ready(Ok(())) |
205 | } |
206 | |
207 | fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
208 | Poll::Ready(Ok(())) |
209 | } |
210 | } |
211 | |
212 | impl<S: ?Sized + Sink<Item> + Unpin, Item> Sink<Item> for alloc::boxed::Box<S> { |
213 | type Error = S::Error; |
214 | |
215 | fn poll_ready( |
216 | mut self: Pin<&mut Self>, |
217 | cx: &mut Context<'_>, |
218 | ) -> Poll<Result<(), Self::Error>> { |
219 | Pin::new(&mut **self).poll_ready(cx) |
220 | } |
221 | |
222 | fn start_send(mut self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { |
223 | Pin::new(&mut **self).start_send(item) |
224 | } |
225 | |
226 | fn poll_flush( |
227 | mut self: Pin<&mut Self>, |
228 | cx: &mut Context<'_>, |
229 | ) -> Poll<Result<(), Self::Error>> { |
230 | Pin::new(&mut **self).poll_flush(cx) |
231 | } |
232 | |
233 | fn poll_close( |
234 | mut self: Pin<&mut Self>, |
235 | cx: &mut Context<'_>, |
236 | ) -> Poll<Result<(), Self::Error>> { |
237 | Pin::new(&mut **self).poll_close(cx) |
238 | } |
239 | } |
240 | } |
241 | |