| 1 | use futures_core::ready; |
| 2 | use futures_core::task::{Context, Poll}; |
| 3 | use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut}; |
| 4 | use pin_project_lite::pin_project; |
| 5 | use std::fmt; |
| 6 | use std::io; |
| 7 | use std::pin::Pin; |
| 8 | |
| 9 | pin_project! { |
| 10 | /// Reader for the [`chain`](super::AsyncReadExt::chain) method. |
| 11 | #[must_use = "readers do nothing unless polled" ] |
| 12 | pub struct Chain<T, U> { |
| 13 | #[pin] |
| 14 | first: T, |
| 15 | #[pin] |
| 16 | second: U, |
| 17 | done_first: bool, |
| 18 | } |
| 19 | } |
| 20 | |
| 21 | impl<T, U> Chain<T, U> |
| 22 | where |
| 23 | T: AsyncRead, |
| 24 | U: AsyncRead, |
| 25 | { |
| 26 | pub(super) fn new(first: T, second: U) -> Self { |
| 27 | Self { first, second, done_first: false } |
| 28 | } |
| 29 | |
| 30 | /// Gets references to the underlying readers in this `Chain`. |
| 31 | pub fn get_ref(&self) -> (&T, &U) { |
| 32 | (&self.first, &self.second) |
| 33 | } |
| 34 | |
| 35 | /// Gets mutable references to the underlying readers in this `Chain`. |
| 36 | /// |
| 37 | /// Care should be taken to avoid modifying the internal I/O state of the |
| 38 | /// underlying readers as doing so may corrupt the internal state of this |
| 39 | /// `Chain`. |
| 40 | pub fn get_mut(&mut self) -> (&mut T, &mut U) { |
| 41 | (&mut self.first, &mut self.second) |
| 42 | } |
| 43 | |
| 44 | /// Gets pinned mutable references to the underlying readers in this `Chain`. |
| 45 | /// |
| 46 | /// Care should be taken to avoid modifying the internal I/O state of the |
| 47 | /// underlying readers as doing so may corrupt the internal state of this |
| 48 | /// `Chain`. |
| 49 | pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut T>, Pin<&mut U>) { |
| 50 | let this = self.project(); |
| 51 | (this.first, this.second) |
| 52 | } |
| 53 | |
| 54 | /// Consumes the `Chain`, returning the wrapped readers. |
| 55 | pub fn into_inner(self) -> (T, U) { |
| 56 | (self.first, self.second) |
| 57 | } |
| 58 | } |
| 59 | |
| 60 | impl<T, U> fmt::Debug for Chain<T, U> |
| 61 | where |
| 62 | T: fmt::Debug, |
| 63 | U: fmt::Debug, |
| 64 | { |
| 65 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 66 | f&mut DebugStruct<'_, '_>.debug_struct("Chain" ) |
| 67 | .field("t" , &self.first) |
| 68 | .field("u" , &self.second) |
| 69 | .field(name:"done_first" , &self.done_first) |
| 70 | .finish() |
| 71 | } |
| 72 | } |
| 73 | |
| 74 | impl<T, U> AsyncRead for Chain<T, U> |
| 75 | where |
| 76 | T: AsyncRead, |
| 77 | U: AsyncRead, |
| 78 | { |
| 79 | fn poll_read( |
| 80 | self: Pin<&mut Self>, |
| 81 | cx: &mut Context<'_>, |
| 82 | buf: &mut [u8], |
| 83 | ) -> Poll<io::Result<usize>> { |
| 84 | let this = self.project(); |
| 85 | |
| 86 | if !*this.done_first { |
| 87 | match ready!(this.first.poll_read(cx, buf)?) { |
| 88 | 0 if !buf.is_empty() => *this.done_first = true, |
| 89 | n => return Poll::Ready(Ok(n)), |
| 90 | } |
| 91 | } |
| 92 | this.second.poll_read(cx, buf) |
| 93 | } |
| 94 | |
| 95 | fn poll_read_vectored( |
| 96 | self: Pin<&mut Self>, |
| 97 | cx: &mut Context<'_>, |
| 98 | bufs: &mut [IoSliceMut<'_>], |
| 99 | ) -> Poll<io::Result<usize>> { |
| 100 | let this = self.project(); |
| 101 | |
| 102 | if !*this.done_first { |
| 103 | let n = ready!(this.first.poll_read_vectored(cx, bufs)?); |
| 104 | if n == 0 && bufs.iter().any(|b| !b.is_empty()) { |
| 105 | *this.done_first = true |
| 106 | } else { |
| 107 | return Poll::Ready(Ok(n)); |
| 108 | } |
| 109 | } |
| 110 | this.second.poll_read_vectored(cx, bufs) |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | impl<T, U> AsyncBufRead for Chain<T, U> |
| 115 | where |
| 116 | T: AsyncBufRead, |
| 117 | U: AsyncBufRead, |
| 118 | { |
| 119 | fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { |
| 120 | let this = self.project(); |
| 121 | |
| 122 | if !*this.done_first { |
| 123 | match ready!(this.first.poll_fill_buf(cx)?) { |
| 124 | buf if buf.is_empty() => { |
| 125 | *this.done_first = true; |
| 126 | } |
| 127 | buf => return Poll::Ready(Ok(buf)), |
| 128 | } |
| 129 | } |
| 130 | this.second.poll_fill_buf(cx) |
| 131 | } |
| 132 | |
| 133 | fn consume(self: Pin<&mut Self>, amt: usize) { |
| 134 | let this = self.project(); |
| 135 | |
| 136 | if !*this.done_first { |
| 137 | this.first.consume(amt) |
| 138 | } else { |
| 139 | this.second.consume(amt) |
| 140 | } |
| 141 | } |
| 142 | } |
| 143 | |