| 1 | //! Asynchronous sinks. | 
| 2 | //! | 
|---|
| 3 | //! This module contains: | 
|---|
| 4 | //! | 
|---|
| 5 | //! - The [`Sink`] trait, which allows you to asynchronously write data. | 
|---|
| 6 | //! - The [`SinkExt`] trait, which provides adapters for chaining and composing | 
|---|
| 7 | //!   sinks. | 
|---|
| 8 |  | 
|---|
| 9 | use crate::future::{assert_future, Either}; | 
|---|
| 10 | use core::pin::Pin; | 
|---|
| 11 | use futures_core::future::Future; | 
|---|
| 12 | use futures_core::stream::{Stream, TryStream}; | 
|---|
| 13 | use futures_core::task::{Context, Poll}; | 
|---|
| 14 |  | 
|---|
| 15 | #[ cfg(feature = "compat")] | 
|---|
| 16 | use crate::compat::CompatSink; | 
|---|
| 17 |  | 
|---|
| 18 | pub use futures_sink::Sink; | 
|---|
| 19 |  | 
|---|
| 20 | mod close; | 
|---|
| 21 | pub use self::close::Close; | 
|---|
| 22 |  | 
|---|
| 23 | mod drain; | 
|---|
| 24 | pub use self::drain::{drain, Drain}; | 
|---|
| 25 |  | 
|---|
| 26 | mod fanout; | 
|---|
| 27 | pub use self::fanout::Fanout; | 
|---|
| 28 |  | 
|---|
| 29 | mod feed; | 
|---|
| 30 | pub use self::feed::Feed; | 
|---|
| 31 |  | 
|---|
| 32 | mod flush; | 
|---|
| 33 | pub use self::flush::Flush; | 
|---|
| 34 |  | 
|---|
| 35 | mod err_into; | 
|---|
| 36 | pub use self::err_into::SinkErrInto; | 
|---|
| 37 |  | 
|---|
| 38 | mod map_err; | 
|---|
| 39 | pub use self::map_err::SinkMapErr; | 
|---|
| 40 |  | 
|---|
| 41 | mod send; | 
|---|
| 42 | pub use self::send::Send; | 
|---|
| 43 |  | 
|---|
| 44 | mod send_all; | 
|---|
| 45 | pub use self::send_all::SendAll; | 
|---|
| 46 |  | 
|---|
| 47 | mod unfold; | 
|---|
| 48 | pub use self::unfold::{unfold, Unfold}; | 
|---|
| 49 |  | 
|---|
| 50 | mod with; | 
|---|
| 51 | pub use self::with::With; | 
|---|
| 52 |  | 
|---|
| 53 | mod with_flat_map; | 
|---|
| 54 | pub use self::with_flat_map::WithFlatMap; | 
|---|
| 55 |  | 
|---|
| 56 | #[ cfg(feature = "alloc")] | 
|---|
| 57 | mod buffer; | 
|---|
| 58 | #[ cfg(feature = "alloc")] | 
|---|
| 59 | pub use self::buffer::Buffer; | 
|---|
| 60 |  | 
|---|
| 61 | impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {} | 
|---|
| 62 |  | 
|---|
| 63 | /// An extension trait for `Sink`s that provides a variety of convenient | 
|---|
| 64 | /// combinator functions. | 
|---|
| 65 | pub trait SinkExt<Item>: Sink<Item> { | 
|---|
| 66 | /// Composes a function *in front of* the sink. | 
|---|
| 67 | /// | 
|---|
| 68 | /// This adapter produces a new sink that passes each value through the | 
|---|
| 69 | /// given function `f` before sending it to `self`. | 
|---|
| 70 | /// | 
|---|
| 71 | /// To process each value, `f` produces a *future*, which is then polled to | 
|---|
| 72 | /// completion before passing its result down to the underlying sink. If the | 
|---|
| 73 | /// future produces an error, that error is returned by the new sink. | 
|---|
| 74 | /// | 
|---|
| 75 | /// Note that this function consumes the given sink, returning a wrapped | 
|---|
| 76 | /// version, much like `Iterator::map`. | 
|---|
| 77 | fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F> | 
|---|
| 78 | where | 
|---|
| 79 | F: FnMut(U) -> Fut, | 
|---|
| 80 | Fut: Future<Output = Result<Item, E>>, | 
|---|
| 81 | E: From<Self::Error>, | 
|---|
| 82 | Self: Sized, | 
|---|
| 83 | { | 
|---|
| 84 | assert_sink::<U, E, _>(With::new(self, f)) | 
|---|
| 85 | } | 
|---|
| 86 |  | 
|---|
| 87 | /// Composes a function *in front of* the sink. | 
|---|
| 88 | /// | 
|---|
| 89 | /// This adapter produces a new sink that passes each value through the | 
|---|
| 90 | /// given function `f` before sending it to `self`. | 
|---|
| 91 | /// | 
|---|
| 92 | /// To process each value, `f` produces a *stream*, of which each value | 
|---|
| 93 | /// is passed to the underlying sink. A new value will not be accepted until | 
|---|
| 94 | /// the stream has been drained | 
|---|
| 95 | /// | 
|---|
| 96 | /// Note that this function consumes the given sink, returning a wrapped | 
|---|
| 97 | /// version, much like `Iterator::flat_map`. | 
|---|
| 98 | /// | 
|---|
| 99 | /// # Examples | 
|---|
| 100 | /// | 
|---|
| 101 | /// ``` | 
|---|
| 102 | /// # futures::executor::block_on(async { | 
|---|
| 103 | /// use futures::channel::mpsc; | 
|---|
| 104 | /// use futures::sink::SinkExt; | 
|---|
| 105 | /// use futures::stream::{self, StreamExt}; | 
|---|
| 106 | /// | 
|---|
| 107 | /// let (tx, rx) = mpsc::channel(5); | 
|---|
| 108 | /// | 
|---|
| 109 | /// let mut tx = tx.with_flat_map(|x| { | 
|---|
| 110 | ///     stream::iter(vec![Ok(42); x]) | 
|---|
| 111 | /// }); | 
|---|
| 112 | /// | 
|---|
| 113 | /// tx.send(5).await.unwrap(); | 
|---|
| 114 | /// drop(tx); | 
|---|
| 115 | /// let received: Vec<i32> = rx.collect().await; | 
|---|
| 116 | /// assert_eq!(received, vec![42, 42, 42, 42, 42]); | 
|---|
| 117 | /// # }); | 
|---|
| 118 | /// ``` | 
|---|
| 119 | fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F> | 
|---|
| 120 | where | 
|---|
| 121 | F: FnMut(U) -> St, | 
|---|
| 122 | St: Stream<Item = Result<Item, Self::Error>>, | 
|---|
| 123 | Self: Sized, | 
|---|
| 124 | { | 
|---|
| 125 | assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f)) | 
|---|
| 126 | } | 
|---|
| 127 |  | 
|---|
| 128 | /* | 
|---|
| 129 | fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F> | 
|---|
| 130 | where F: FnMut(U) -> Self::SinkItem, | 
|---|
| 131 | Self: Sized; | 
|---|
| 132 |  | 
|---|
| 133 | fn with_filter<F>(self, f: F) -> WithFilter<Self, F> | 
|---|
| 134 | where F: FnMut(Self::SinkItem) -> bool, | 
|---|
| 135 | Self: Sized; | 
|---|
| 136 |  | 
|---|
| 137 | fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F> | 
|---|
| 138 | where F: FnMut(U) -> Option<Self::SinkItem>, | 
|---|
| 139 | Self: Sized; | 
|---|
| 140 | */ | 
|---|
| 141 |  | 
|---|
| 142 | /// Transforms the error returned by the sink. | 
|---|
| 143 | fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F> | 
|---|
| 144 | where | 
|---|
| 145 | F: FnOnce(Self::Error) -> E, | 
|---|
| 146 | Self: Sized, | 
|---|
| 147 | { | 
|---|
| 148 | assert_sink::<Item, E, _>(SinkMapErr::new(self, f)) | 
|---|
| 149 | } | 
|---|
| 150 |  | 
|---|
| 151 | /// Map this sink's error to a different error type using the `Into` trait. | 
|---|
| 152 | /// | 
|---|
| 153 | /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`. | 
|---|
| 154 | fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E> | 
|---|
| 155 | where | 
|---|
| 156 | Self: Sized, | 
|---|
| 157 | Self::Error: Into<E>, | 
|---|
| 158 | { | 
|---|
| 159 | assert_sink::<Item, E, _>(SinkErrInto::new(self)) | 
|---|
| 160 | } | 
|---|
| 161 |  | 
|---|
| 162 | /// Adds a fixed-size buffer to the current sink. | 
|---|
| 163 | /// | 
|---|
| 164 | /// The resulting sink will buffer up to `capacity` items when the | 
|---|
| 165 | /// underlying sink is unwilling to accept additional items. Calling `flush` | 
|---|
| 166 | /// on the buffered sink will attempt to both empty the buffer and complete | 
|---|
| 167 | /// processing on the underlying sink. | 
|---|
| 168 | /// | 
|---|
| 169 | /// Note that this function consumes the given sink, returning a wrapped | 
|---|
| 170 | /// version, much like `Iterator::map`. | 
|---|
| 171 | /// | 
|---|
| 172 | /// This method is only available when the `std` or `alloc` feature of this | 
|---|
| 173 | /// library is activated, and it is activated by default. | 
|---|
| 174 | #[ cfg(feature = "alloc")] | 
|---|
| 175 | fn buffer(self, capacity: usize) -> Buffer<Self, Item> | 
|---|
| 176 | where | 
|---|
| 177 | Self: Sized, | 
|---|
| 178 | { | 
|---|
| 179 | assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity)) | 
|---|
| 180 | } | 
|---|
| 181 |  | 
|---|
| 182 | /// Close the sink. | 
|---|
| 183 | fn close(&mut self) -> Close<'_, Self, Item> | 
|---|
| 184 | where | 
|---|
| 185 | Self: Unpin, | 
|---|
| 186 | { | 
|---|
| 187 | assert_future::<Result<(), Self::Error>, _>(Close::new(self)) | 
|---|
| 188 | } | 
|---|
| 189 |  | 
|---|
| 190 | /// Fanout items to multiple sinks. | 
|---|
| 191 | /// | 
|---|
| 192 | /// This adapter clones each incoming item and forwards it to both this as well as | 
|---|
| 193 | /// the other sink at the same time. | 
|---|
| 194 | fn fanout<Si>(self, other: Si) -> Fanout<Self, Si> | 
|---|
| 195 | where | 
|---|
| 196 | Self: Sized, | 
|---|
| 197 | Item: Clone, | 
|---|
| 198 | Si: Sink<Item, Error = Self::Error>, | 
|---|
| 199 | { | 
|---|
| 200 | assert_sink::<Item, Self::Error, _>(Fanout::new(self, other)) | 
|---|
| 201 | } | 
|---|
| 202 |  | 
|---|
| 203 | /// Flush the sink, processing all pending items. | 
|---|
| 204 | /// | 
|---|
| 205 | /// This adapter is intended to be used when you want to stop sending to the sink | 
|---|
| 206 | /// until all current requests are processed. | 
|---|
| 207 | fn flush(&mut self) -> Flush<'_, Self, Item> | 
|---|
| 208 | where | 
|---|
| 209 | Self: Unpin, | 
|---|
| 210 | { | 
|---|
| 211 | assert_future::<Result<(), Self::Error>, _>(Flush::new(self)) | 
|---|
| 212 | } | 
|---|
| 213 |  | 
|---|
| 214 | /// A future that completes after the given item has been fully processed | 
|---|
| 215 | /// into the sink, including flushing. | 
|---|
| 216 | /// | 
|---|
| 217 | /// Note that, **because of the flushing requirement, it is usually better | 
|---|
| 218 | /// to batch together items to send via `feed` or `send_all`, | 
|---|
| 219 | /// rather than flushing between each item.** | 
|---|
| 220 | fn send(&mut self, item: Item) -> Send<'_, Self, Item> | 
|---|
| 221 | where | 
|---|
| 222 | Self: Unpin, | 
|---|
| 223 | { | 
|---|
| 224 | assert_future::<Result<(), Self::Error>, _>(Send::new(self, item)) | 
|---|
| 225 | } | 
|---|
| 226 |  | 
|---|
| 227 | /// A future that completes after the given item has been received | 
|---|
| 228 | /// by the sink. | 
|---|
| 229 | /// | 
|---|
| 230 | /// Unlike `send`, the returned future does not flush the sink. | 
|---|
| 231 | /// It is the caller's responsibility to ensure all pending items | 
|---|
| 232 | /// are processed, which can be done via `flush` or `close`. | 
|---|
| 233 | fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> | 
|---|
| 234 | where | 
|---|
| 235 | Self: Unpin, | 
|---|
| 236 | { | 
|---|
| 237 | assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item)) | 
|---|
| 238 | } | 
|---|
| 239 |  | 
|---|
| 240 | /// A future that completes after the given stream has been fully processed | 
|---|
| 241 | /// into the sink, including flushing. | 
|---|
| 242 | /// | 
|---|
| 243 | /// This future will drive the stream to keep producing items until it is | 
|---|
| 244 | /// exhausted, sending each item to the sink. It will complete once both the | 
|---|
| 245 | /// stream is exhausted, the sink has received all items, and the sink has | 
|---|
| 246 | /// been flushed. Note that the sink is **not** closed. If the stream produces | 
|---|
| 247 | /// an error, that error will be returned by this future without flushing the sink. | 
|---|
| 248 | /// | 
|---|
| 249 | /// Doing `sink.send_all(stream)` is roughly equivalent to | 
|---|
| 250 | /// `stream.forward(sink)`. The returned future will exhaust all items from | 
|---|
| 251 | /// `stream` and send them to `self`. | 
|---|
| 252 | fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St> | 
|---|
| 253 | where | 
|---|
| 254 | St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized, | 
|---|
| 255 | // St: Stream<Item = Result<Item, Self::Error>> + Unpin + ?Sized, | 
|---|
| 256 | Self: Unpin, | 
|---|
| 257 | { | 
|---|
| 258 | // TODO: type mismatch resolving `<St as Stream>::Item == std::result::Result<Item, <Self as futures_sink::Sink<Item>>::Error>` | 
|---|
| 259 | // assert_future::<Result<(), Self::Error>, _>(SendAll::new(self, stream)) | 
|---|
| 260 | SendAll::new(self, stream) | 
|---|
| 261 | } | 
|---|
| 262 |  | 
|---|
| 263 | /// Wrap this sink in an `Either` sink, making it the left-hand variant | 
|---|
| 264 | /// of that `Either`. | 
|---|
| 265 | /// | 
|---|
| 266 | /// This can be used in combination with the `right_sink` method to write `if` | 
|---|
| 267 | /// statements that evaluate to different streams in different branches. | 
|---|
| 268 | fn left_sink<Si2>(self) -> Either<Self, Si2> | 
|---|
| 269 | where | 
|---|
| 270 | Si2: Sink<Item, Error = Self::Error>, | 
|---|
| 271 | Self: Sized, | 
|---|
| 272 | { | 
|---|
| 273 | assert_sink::<Item, Self::Error, _>(Either::Left(self)) | 
|---|
| 274 | } | 
|---|
| 275 |  | 
|---|
| 276 | /// Wrap this stream in an `Either` stream, making it the right-hand variant | 
|---|
| 277 | /// of that `Either`. | 
|---|
| 278 | /// | 
|---|
| 279 | /// This can be used in combination with the `left_sink` method to write `if` | 
|---|
| 280 | /// statements that evaluate to different streams in different branches. | 
|---|
| 281 | fn right_sink<Si1>(self) -> Either<Si1, Self> | 
|---|
| 282 | where | 
|---|
| 283 | Si1: Sink<Item, Error = Self::Error>, | 
|---|
| 284 | Self: Sized, | 
|---|
| 285 | { | 
|---|
| 286 | assert_sink::<Item, Self::Error, _>(Either::Right(self)) | 
|---|
| 287 | } | 
|---|
| 288 |  | 
|---|
| 289 | /// Wraps a [`Sink`] into a sink compatible with libraries using | 
|---|
| 290 | /// futures 0.1 `Sink`. Requires the `compat` feature to be enabled. | 
|---|
| 291 | #[ cfg(feature = "compat")] | 
|---|
| 292 | #[ cfg_attr(docsrs, doc(cfg(feature = "compat")))] | 
|---|
| 293 | fn compat(self) -> CompatSink<Self, Item> | 
|---|
| 294 | where | 
|---|
| 295 | Self: Sized + Unpin, | 
|---|
| 296 | { | 
|---|
| 297 | CompatSink::new(self) | 
|---|
| 298 | } | 
|---|
| 299 |  | 
|---|
| 300 | /// A convenience method for calling [`Sink::poll_ready`] on [`Unpin`] | 
|---|
| 301 | /// sink types. | 
|---|
| 302 | fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> | 
|---|
| 303 | where | 
|---|
| 304 | Self: Unpin, | 
|---|
| 305 | { | 
|---|
| 306 | Pin::new(self).poll_ready(cx) | 
|---|
| 307 | } | 
|---|
| 308 |  | 
|---|
| 309 | /// A convenience method for calling [`Sink::start_send`] on [`Unpin`] | 
|---|
| 310 | /// sink types. | 
|---|
| 311 | fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> | 
|---|
| 312 | where | 
|---|
| 313 | Self: Unpin, | 
|---|
| 314 | { | 
|---|
| 315 | Pin::new(self).start_send(item) | 
|---|
| 316 | } | 
|---|
| 317 |  | 
|---|
| 318 | /// A convenience method for calling [`Sink::poll_flush`] on [`Unpin`] | 
|---|
| 319 | /// sink types. | 
|---|
| 320 | fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> | 
|---|
| 321 | where | 
|---|
| 322 | Self: Unpin, | 
|---|
| 323 | { | 
|---|
| 324 | Pin::new(self).poll_flush(cx) | 
|---|
| 325 | } | 
|---|
| 326 |  | 
|---|
| 327 | /// A convenience method for calling [`Sink::poll_close`] on [`Unpin`] | 
|---|
| 328 | /// sink types. | 
|---|
| 329 | fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> | 
|---|
| 330 | where | 
|---|
| 331 | Self: Unpin, | 
|---|
| 332 | { | 
|---|
| 333 | Pin::new(self).poll_close(cx) | 
|---|
| 334 | } | 
|---|
| 335 | } | 
|---|
| 336 |  | 
|---|
| 337 | // Just a helper function to ensure the sinks we're returning all have the | 
|---|
| 338 | // right implementations. | 
|---|
| 339 | pub(crate) fn assert_sink<T, E, S>(sink: S) -> S | 
|---|
| 340 | where | 
|---|
| 341 | S: Sink<T, Error = E>, | 
|---|
| 342 | { | 
|---|
| 343 | sink | 
|---|
| 344 | } | 
|---|
| 345 |  | 
|---|