1use futures_core::ready;
2use futures_core::task::{Context, Poll};
3use futures_io::AsyncWrite;
4use futures_sink::Sink;
5use pin_project_lite::pin_project;
6use std::io;
7use std::pin::Pin;
8
9#[derive(Debug)]
10struct Block<Item> {
11 offset: usize,
12 bytes: Item,
13}
14
15pin_project! {
16 /// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
17 #[must_use = "sinks do nothing unless polled"]
18 #[derive(Debug)]
19 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
20 pub struct IntoSink<W, Item> {
21 #[pin]
22 writer: W,
23 // An outstanding block for us to push into the underlying writer, along with an offset of how
24 // far into this block we have written already.
25 buffer: Option<Block<Item>>,
26 }
27}
28
29impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> {
30 pub(super) fn new(writer: W) -> Self {
31 Self { writer, buffer: None }
32 }
33
34 /// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_
35 /// flush the writer after it succeeds in pushing the block into it.
36 fn poll_flush_buffer(
37 self: Pin<&mut Self>,
38 cx: &mut Context<'_>,
39 ) -> Poll<Result<(), io::Error>> {
40 let mut this = self.project();
41
42 if let Some(buffer) = this.buffer {
43 loop {
44 let bytes = buffer.bytes.as_ref();
45 let written = ready!(this.writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
46 buffer.offset += written;
47 if buffer.offset == bytes.len() {
48 break;
49 }
50 }
51 }
52 *this.buffer = None;
53 Poll::Ready(Ok(()))
54 }
55}
56
57impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> {
58 type Error = io::Error;
59
60 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61 ready!(self.poll_flush_buffer(cx))?;
62 Poll::Ready(Ok(()))
63 }
64
65 fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
66 debug_assert!(self.buffer.is_none());
67 *self.project().buffer = Some(Block { offset: 0, bytes: item });
68 Ok(())
69 }
70
71 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
72 ready!(self.as_mut().poll_flush_buffer(cx))?;
73 ready!(self.project().writer.poll_flush(cx))?;
74 Poll::Ready(Ok(()))
75 }
76
77 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
78 ready!(self.as_mut().poll_flush_buffer(cx))?;
79 ready!(self.project().writer.poll_close(cx))?;
80 Poll::Ready(Ok(()))
81 }
82}
83