1//! Asynchronous sinks.
2//!
3//! This module contains:
4//!
5//! - The [`Sink`] trait, which allows you to asynchronously write data.
6//! - The [`SinkExt`] trait, which provides adapters for chaining and composing
7//! sinks.
8
9use crate::future::{assert_future, Either};
10use core::pin::Pin;
11use futures_core::future::Future;
12use futures_core::stream::{Stream, TryStream};
13use futures_core::task::{Context, Poll};
14
15#[cfg(feature = "compat")]
16use crate::compat::CompatSink;
17
18pub use futures_sink::Sink;
19
20mod close;
21pub use self::close::Close;
22
23mod drain;
24pub use self::drain::{drain, Drain};
25
26mod fanout;
27pub use self::fanout::Fanout;
28
29mod feed;
30pub use self::feed::Feed;
31
32mod flush;
33pub use self::flush::Flush;
34
35mod err_into;
36pub use self::err_into::SinkErrInto;
37
38mod map_err;
39pub use self::map_err::SinkMapErr;
40
41mod send;
42pub use self::send::Send;
43
44mod send_all;
45pub use self::send_all::SendAll;
46
47mod unfold;
48pub use self::unfold::{unfold, Unfold};
49
50mod with;
51pub use self::with::With;
52
53mod with_flat_map;
54pub use self::with_flat_map::WithFlatMap;
55
56#[cfg(feature = "alloc")]
57mod buffer;
58#[cfg(feature = "alloc")]
59pub use self::buffer::Buffer;
60
61impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}
62
63/// An extension trait for `Sink`s that provides a variety of convenient
64/// combinator functions.
65pub trait SinkExt<Item>: Sink<Item> {
66 /// Composes a function *in front of* the sink.
67 ///
68 /// This adapter produces a new sink that passes each value through the
69 /// given function `f` before sending it to `self`.
70 ///
71 /// To process each value, `f` produces a *future*, which is then polled to
72 /// completion before passing its result down to the underlying sink. If the
73 /// future produces an error, that error is returned by the new sink.
74 ///
75 /// Note that this function consumes the given sink, returning a wrapped
76 /// version, much like `Iterator::map`.
77 fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
78 where
79 F: FnMut(U) -> Fut,
80 Fut: Future<Output = Result<Item, E>>,
81 E: From<Self::Error>,
82 Self: Sized,
83 {
84 assert_sink::<U, E, _>(With::new(self, f))
85 }
86
87 /// Composes a function *in front of* the sink.
88 ///
89 /// This adapter produces a new sink that passes each value through the
90 /// given function `f` before sending it to `self`.
91 ///
92 /// To process each value, `f` produces a *stream*, of which each value
93 /// is passed to the underlying sink. A new value will not be accepted until
94 /// the stream has been drained
95 ///
96 /// Note that this function consumes the given sink, returning a wrapped
97 /// version, much like `Iterator::flat_map`.
98 ///
99 /// # Examples
100 ///
101 /// ```
102 /// # futures::executor::block_on(async {
103 /// use futures::channel::mpsc;
104 /// use futures::sink::SinkExt;
105 /// use futures::stream::{self, StreamExt};
106 ///
107 /// let (tx, rx) = mpsc::channel(5);
108 ///
109 /// let mut tx = tx.with_flat_map(|x| {
110 /// stream::iter(vec![Ok(42); x])
111 /// });
112 ///
113 /// tx.send(5).await.unwrap();
114 /// drop(tx);
115 /// let received: Vec<i32> = rx.collect().await;
116 /// assert_eq!(received, vec![42, 42, 42, 42, 42]);
117 /// # });
118 /// ```
119 fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
120 where
121 F: FnMut(U) -> St,
122 St: Stream<Item = Result<Item, Self::Error>>,
123 Self: Sized,
124 {
125 assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f))
126 }
127
128 /*
129 fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
130 where F: FnMut(U) -> Self::SinkItem,
131 Self: Sized;
132
133 fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
134 where F: FnMut(Self::SinkItem) -> bool,
135 Self: Sized;
136
137 fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
138 where F: FnMut(U) -> Option<Self::SinkItem>,
139 Self: Sized;
140 */
141
142 /// Transforms the error returned by the sink.
143 fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
144 where
145 F: FnOnce(Self::Error) -> E,
146 Self: Sized,
147 {
148 assert_sink::<Item, E, _>(SinkMapErr::new(self, f))
149 }
150
151 /// Map this sink's error to a different error type using the `Into` trait.
152 ///
153 /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
154 fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
155 where
156 Self: Sized,
157 Self::Error: Into<E>,
158 {
159 assert_sink::<Item, E, _>(SinkErrInto::new(self))
160 }
161
162 /// Adds a fixed-size buffer to the current sink.
163 ///
164 /// The resulting sink will buffer up to `capacity` items when the
165 /// underlying sink is unwilling to accept additional items. Calling `flush`
166 /// on the buffered sink will attempt to both empty the buffer and complete
167 /// processing on the underlying sink.
168 ///
169 /// Note that this function consumes the given sink, returning a wrapped
170 /// version, much like `Iterator::map`.
171 ///
172 /// This method is only available when the `std` or `alloc` feature of this
173 /// library is activated, and it is activated by default.
174 #[cfg(feature = "alloc")]
175 fn buffer(self, capacity: usize) -> Buffer<Self, Item>
176 where
177 Self: Sized,
178 {
179 assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity))
180 }
181
182 /// Close the sink.
183 fn close(&mut self) -> Close<'_, Self, Item>
184 where
185 Self: Unpin,
186 {
187 assert_future::<Result<(), Self::Error>, _>(Close::new(self))
188 }
189
190 /// Fanout items to multiple sinks.
191 ///
192 /// This adapter clones each incoming item and forwards it to both this as well as
193 /// the other sink at the same time.
194 fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
195 where
196 Self: Sized,
197 Item: Clone,
198 Si: Sink<Item, Error = Self::Error>,
199 {
200 assert_sink::<Item, Self::Error, _>(Fanout::new(self, other))
201 }
202
203 /// Flush the sink, processing all pending items.
204 ///
205 /// This adapter is intended to be used when you want to stop sending to the sink
206 /// until all current requests are processed.
207 fn flush(&mut self) -> Flush<'_, Self, Item>
208 where
209 Self: Unpin,
210 {
211 assert_future::<Result<(), Self::Error>, _>(Flush::new(self))
212 }
213
214 /// A future that completes after the given item has been fully processed
215 /// into the sink, including flushing.
216 ///
217 /// Note that, **because of the flushing requirement, it is usually better
218 /// to batch together items to send via `feed` or `send_all`,
219 /// rather than flushing between each item.**
220 fn send(&mut self, item: Item) -> Send<'_, Self, Item>
221 where
222 Self: Unpin,
223 {
224 assert_future::<Result<(), Self::Error>, _>(Send::new(self, item))
225 }
226
227 /// A future that completes after the given item has been received
228 /// by the sink.
229 ///
230 /// Unlike `send`, the returned future does not flush the sink.
231 /// It is the caller's responsibility to ensure all pending items
232 /// are processed, which can be done via `flush` or `close`.
233 fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
234 where
235 Self: Unpin,
236 {
237 assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item))
238 }
239
240 /// A future that completes after the given stream has been fully processed
241 /// into the sink, including flushing.
242 ///
243 /// This future will drive the stream to keep producing items until it is
244 /// exhausted, sending each item to the sink. It will complete once both the
245 /// stream is exhausted, the sink has received all items, and the sink has
246 /// been flushed. Note that the sink is **not** closed. If the stream produces
247 /// an error, that error will be returned by this future without flushing the sink.
248 ///
249 /// Doing `sink.send_all(stream)` is roughly equivalent to
250 /// `stream.forward(sink)`. The returned future will exhaust all items from
251 /// `stream` and send them to `self`.
252 fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
253 where
254 St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
255 // St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized,
256 Self: Unpin,
257 {
258 // TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>`
259 // assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream))
260 SendAll::new(self, stream)
261 }
262
263 /// Wrap this sink in an `Either` sink, making it the left-hand variant
264 /// of that `Either`.
265 ///
266 /// This can be used in combination with the `right_sink` method to write `if`
267 /// statements that evaluate to different streams in different branches.
268 fn left_sink<Si2>(self) -> Either<Self, Si2>
269 where
270 Si2: Sink<Item, Error = Self::Error>,
271 Self: Sized,
272 {
273 assert_sink::<Item, Self::Error, _>(Either::Left(self))
274 }
275
276 /// Wrap this stream in an `Either` stream, making it the right-hand variant
277 /// of that `Either`.
278 ///
279 /// This can be used in combination with the `left_sink` method to write `if`
280 /// statements that evaluate to different streams in different branches.
281 fn right_sink<Si1>(self) -> Either<Si1, Self>
282 where
283 Si1: Sink<Item, Error = Self::Error>,
284 Self: Sized,
285 {
286 assert_sink::<Item, Self::Error, _>(Either::Right(self))
287 }
288
289 /// Wraps a [`Sink`] into a sink compatible with libraries using
290 /// futures 0.1 `Sink`. Requires the `compat` feature to be enabled.
291 #[cfg(feature = "compat")]
292 #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
293 fn compat(self) -> CompatSink<Self, Item>
294 where
295 Self: Sized + Unpin,
296 {
297 CompatSink::new(self)
298 }
299
300 /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`]
301 /// sink types.
302 fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
303 where
304 Self: Unpin,
305 {
306 Pin::new(self).poll_ready(cx)
307 }
308
309 /// A convenience method for calling [`Sink::start_send`] on [`Unpin`]
310 /// sink types.
311 fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
312 where
313 Self: Unpin,
314 {
315 Pin::new(self).start_send(item)
316 }
317
318 /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`]
319 /// sink types.
320 fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
321 where
322 Self: Unpin,
323 {
324 Pin::new(self).poll_flush(cx)
325 }
326
327 /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`]
328 /// sink types.
329 fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
330 where
331 Self: Unpin,
332 {
333 Pin::new(self).poll_close(cx)
334 }
335}
336
337// Just a helper function to ensure the sinks we're returning all have the
338// right implementations.
339pub(crate) fn assert_sink<T, E, S>(sink: S) -> S
340where
341 S: Sink<T, Error = E>,
342{
343 sink
344}
345