1 | use std::fmt; |
2 | use std::pin::Pin; |
3 | |
4 | use pin_project_lite::pin_project; |
5 | |
6 | use crate::io::{self, BufRead, IoSliceMut, Read}; |
7 | use crate::task::{Context, Poll}; |
8 | |
9 | pin_project! { |
10 | /// Adaptor to chain together two readers. |
11 | /// |
12 | /// This struct is generally created by calling [`chain`] on a reader. |
13 | /// Please see the documentation of [`chain`] for more details. |
14 | /// |
15 | /// [`chain`]: trait.Read.html#method.chain |
16 | pub struct Chain<T, U> { |
17 | #[pin] |
18 | pub(crate) first: T, |
19 | #[pin] |
20 | pub(crate) second: U, |
21 | pub(crate) done_first: bool, |
22 | } |
23 | } |
24 | |
25 | impl<T, U> Chain<T, U> { |
26 | /// Consumes the `Chain`, returning the wrapped readers. |
27 | /// |
28 | /// # Examples |
29 | /// |
30 | /// ```no_run |
31 | /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { |
32 | /// # |
33 | /// use async_std::prelude::*; |
34 | /// use async_std::fs::File; |
35 | /// |
36 | /// let foo_file = File::open("foo.txt" ).await?; |
37 | /// let bar_file = File::open("bar.txt" ).await?; |
38 | /// |
39 | /// let chain = foo_file.chain(bar_file); |
40 | /// let (foo_file, bar_file) = chain.into_inner(); |
41 | /// # |
42 | /// # Ok(()) }) } |
43 | /// ``` |
44 | pub fn into_inner(self) -> (T, U) { |
45 | (self.first, self.second) |
46 | } |
47 | |
48 | /// Gets references to the underlying readers in this `Chain`. |
49 | /// |
50 | /// # Examples |
51 | /// |
52 | /// ```no_run |
53 | /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { |
54 | /// # |
55 | /// use async_std::prelude::*; |
56 | /// use async_std::fs::File; |
57 | /// |
58 | /// let foo_file = File::open("foo.txt" ).await?; |
59 | /// let bar_file = File::open("bar.txt" ).await?; |
60 | /// |
61 | /// let chain = foo_file.chain(bar_file); |
62 | /// let (foo_file, bar_file) = chain.get_ref(); |
63 | /// # |
64 | /// # Ok(()) }) } |
65 | /// ``` |
66 | pub fn get_ref(&self) -> (&T, &U) { |
67 | (&self.first, &self.second) |
68 | } |
69 | |
70 | /// Gets mutable references to the underlying readers in this `Chain`. |
71 | /// |
72 | /// Care should be taken to avoid modifying the internal I/O state of the |
73 | /// underlying readers as doing so may corrupt the internal state of this |
74 | /// `Chain`. |
75 | /// |
76 | /// # Examples |
77 | /// |
78 | /// ```no_run |
79 | /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { |
80 | /// # |
81 | /// use async_std::prelude::*; |
82 | /// use async_std::fs::File; |
83 | /// |
84 | /// let foo_file = File::open("foo.txt" ).await?; |
85 | /// let bar_file = File::open("bar.txt" ).await?; |
86 | /// |
87 | /// let mut chain = foo_file.chain(bar_file); |
88 | /// let (foo_file, bar_file) = chain.get_mut(); |
89 | /// # |
90 | /// # Ok(()) }) } |
91 | /// ``` |
92 | pub fn get_mut(&mut self) -> (&mut T, &mut U) { |
93 | (&mut self.first, &mut self.second) |
94 | } |
95 | } |
96 | |
97 | impl<T: fmt::Debug, U: fmt::Debug> fmt::Debug for Chain<T, U> { |
98 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
99 | f&mut DebugStruct<'_, '_>.debug_struct("Chain" ) |
100 | .field("t" , &self.first) |
101 | .field(name:"u" , &self.second) |
102 | .finish() |
103 | } |
104 | } |
105 | |
106 | impl<T: Read, U: Read> Read for Chain<T, U> { |
107 | fn poll_read( |
108 | self: Pin<&mut Self>, |
109 | cx: &mut Context<'_>, |
110 | buf: &mut [u8], |
111 | ) -> Poll<io::Result<usize>> { |
112 | let this = self.project(); |
113 | if !*this.done_first { |
114 | match futures_core::ready!(this.first.poll_read(cx, buf)) { |
115 | Ok(0) if !buf.is_empty() => *this.done_first = true, |
116 | Ok(n) => return Poll::Ready(Ok(n)), |
117 | Err(err) => return Poll::Ready(Err(err)), |
118 | } |
119 | } |
120 | |
121 | this.second.poll_read(cx, buf) |
122 | } |
123 | |
124 | fn poll_read_vectored( |
125 | self: Pin<&mut Self>, |
126 | cx: &mut Context<'_>, |
127 | bufs: &mut [IoSliceMut<'_>], |
128 | ) -> Poll<io::Result<usize>> { |
129 | let this = self.project(); |
130 | if !*this.done_first { |
131 | match futures_core::ready!(this.first.poll_read_vectored(cx, bufs)) { |
132 | Ok(0) if !bufs.is_empty() => *this.done_first = true, |
133 | Ok(n) => return Poll::Ready(Ok(n)), |
134 | Err(err) => return Poll::Ready(Err(err)), |
135 | } |
136 | } |
137 | |
138 | this.second.poll_read_vectored(cx, bufs) |
139 | } |
140 | } |
141 | |
142 | impl<T: BufRead, U: BufRead> BufRead for Chain<T, U> { |
143 | fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { |
144 | let this = self.project(); |
145 | if !*this.done_first { |
146 | match futures_core::ready!(this.first.poll_fill_buf(cx)) { |
147 | Ok(buf) if buf.is_empty() => { |
148 | *this.done_first = true; |
149 | } |
150 | Ok(buf) => return Poll::Ready(Ok(buf)), |
151 | Err(err) => return Poll::Ready(Err(err)), |
152 | } |
153 | } |
154 | |
155 | this.second.poll_fill_buf(cx) |
156 | } |
157 | |
158 | fn consume(self: Pin<&mut Self>, amt: usize) { |
159 | let this = self.project(); |
160 | if !*this.done_first { |
161 | this.first.consume(amt) |
162 | } else { |
163 | this.second.consume(amt) |
164 | } |
165 | } |
166 | } |
167 | |
168 | #[cfg (all(test, feature = "default" , not(target_arch = "wasm32" )))] |
169 | mod tests { |
170 | use crate::io; |
171 | use crate::prelude::*; |
172 | use crate::task; |
173 | |
174 | #[test ] |
175 | fn test_chain_basics() -> std::io::Result<()> { |
176 | let source1: io::Cursor<Vec<u8>> = io::Cursor::new(vec![0, 1, 2]); |
177 | let source2: io::Cursor<Vec<u8>> = io::Cursor::new(vec![3, 4, 5]); |
178 | |
179 | task::block_on(async move { |
180 | let mut buffer = Vec::new(); |
181 | |
182 | let mut source = source1.chain(source2); |
183 | |
184 | assert_eq!(6, source.read_to_end(&mut buffer).await?); |
185 | assert_eq!(buffer, vec![0, 1, 2, 3, 4, 5]); |
186 | |
187 | Ok(()) |
188 | }) |
189 | } |
190 | } |
191 | |