1use std::fmt;
2use std::pin::Pin;
3
4use pin_project_lite::pin_project;
5
6use crate::io::{self, BufRead, IoSliceMut, Read};
7use crate::task::{Context, Poll};
8
9pin_project! {
10 /// Adaptor to chain together two readers.
11 ///
12 /// This struct is generally created by calling [`chain`] on a reader.
13 /// Please see the documentation of [`chain`] for more details.
14 ///
15 /// [`chain`]: trait.Read.html#method.chain
16 pub struct Chain<T, U> {
17 #[pin]
18 pub(crate) first: T,
19 #[pin]
20 pub(crate) second: U,
21 pub(crate) done_first: bool,
22 }
23}
24
25impl<T, U> Chain<T, U> {
26 /// Consumes the `Chain`, returning the wrapped readers.
27 ///
28 /// # Examples
29 ///
30 /// ```no_run
31 /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
32 /// #
33 /// use async_std::prelude::*;
34 /// use async_std::fs::File;
35 ///
36 /// let foo_file = File::open("foo.txt").await?;
37 /// let bar_file = File::open("bar.txt").await?;
38 ///
39 /// let chain = foo_file.chain(bar_file);
40 /// let (foo_file, bar_file) = chain.into_inner();
41 /// #
42 /// # Ok(()) }) }
43 /// ```
44 pub fn into_inner(self) -> (T, U) {
45 (self.first, self.second)
46 }
47
48 /// Gets references to the underlying readers in this `Chain`.
49 ///
50 /// # Examples
51 ///
52 /// ```no_run
53 /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
54 /// #
55 /// use async_std::prelude::*;
56 /// use async_std::fs::File;
57 ///
58 /// let foo_file = File::open("foo.txt").await?;
59 /// let bar_file = File::open("bar.txt").await?;
60 ///
61 /// let chain = foo_file.chain(bar_file);
62 /// let (foo_file, bar_file) = chain.get_ref();
63 /// #
64 /// # Ok(()) }) }
65 /// ```
66 pub fn get_ref(&self) -> (&T, &U) {
67 (&self.first, &self.second)
68 }
69
70 /// Gets mutable references to the underlying readers in this `Chain`.
71 ///
72 /// Care should be taken to avoid modifying the internal I/O state of the
73 /// underlying readers as doing so may corrupt the internal state of this
74 /// `Chain`.
75 ///
76 /// # Examples
77 ///
78 /// ```no_run
79 /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
80 /// #
81 /// use async_std::prelude::*;
82 /// use async_std::fs::File;
83 ///
84 /// let foo_file = File::open("foo.txt").await?;
85 /// let bar_file = File::open("bar.txt").await?;
86 ///
87 /// let mut chain = foo_file.chain(bar_file);
88 /// let (foo_file, bar_file) = chain.get_mut();
89 /// #
90 /// # Ok(()) }) }
91 /// ```
92 pub fn get_mut(&mut self) -> (&mut T, &mut U) {
93 (&mut self.first, &mut self.second)
94 }
95}
96
97impl<T: fmt::Debug, U: fmt::Debug> fmt::Debug for Chain<T, U> {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 f&mut DebugStruct<'_, '_>.debug_struct("Chain")
100 .field("t", &self.first)
101 .field(name:"u", &self.second)
102 .finish()
103 }
104}
105
106impl<T: Read, U: Read> Read for Chain<T, U> {
107 fn poll_read(
108 self: Pin<&mut Self>,
109 cx: &mut Context<'_>,
110 buf: &mut [u8],
111 ) -> Poll<io::Result<usize>> {
112 let this = self.project();
113 if !*this.done_first {
114 match futures_core::ready!(this.first.poll_read(cx, buf)) {
115 Ok(0) if !buf.is_empty() => *this.done_first = true,
116 Ok(n) => return Poll::Ready(Ok(n)),
117 Err(err) => return Poll::Ready(Err(err)),
118 }
119 }
120
121 this.second.poll_read(cx, buf)
122 }
123
124 fn poll_read_vectored(
125 self: Pin<&mut Self>,
126 cx: &mut Context<'_>,
127 bufs: &mut [IoSliceMut<'_>],
128 ) -> Poll<io::Result<usize>> {
129 let this = self.project();
130 if !*this.done_first {
131 match futures_core::ready!(this.first.poll_read_vectored(cx, bufs)) {
132 Ok(0) if !bufs.is_empty() => *this.done_first = true,
133 Ok(n) => return Poll::Ready(Ok(n)),
134 Err(err) => return Poll::Ready(Err(err)),
135 }
136 }
137
138 this.second.poll_read_vectored(cx, bufs)
139 }
140}
141
142impl<T: BufRead, U: BufRead> BufRead for Chain<T, U> {
143 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
144 let this = self.project();
145 if !*this.done_first {
146 match futures_core::ready!(this.first.poll_fill_buf(cx)) {
147 Ok(buf) if buf.is_empty() => {
148 *this.done_first = true;
149 }
150 Ok(buf) => return Poll::Ready(Ok(buf)),
151 Err(err) => return Poll::Ready(Err(err)),
152 }
153 }
154
155 this.second.poll_fill_buf(cx)
156 }
157
158 fn consume(self: Pin<&mut Self>, amt: usize) {
159 let this = self.project();
160 if !*this.done_first {
161 this.first.consume(amt)
162 } else {
163 this.second.consume(amt)
164 }
165 }
166}
167
168#[cfg(all(test, feature = "default", not(target_arch = "wasm32")))]
169mod tests {
170 use crate::io;
171 use crate::prelude::*;
172 use crate::task;
173
174 #[test]
175 fn test_chain_basics() -> std::io::Result<()> {
176 let source1: io::Cursor<Vec<u8>> = io::Cursor::new(vec![0, 1, 2]);
177 let source2: io::Cursor<Vec<u8>> = io::Cursor::new(vec![3, 4, 5]);
178
179 task::block_on(async move {
180 let mut buffer = Vec::new();
181
182 let mut source = source1.chain(source2);
183
184 assert_eq!(6, source.read_to_end(&mut buffer).await?);
185 assert_eq!(buffer, vec![0, 1, 2, 3, 4, 5]);
186
187 Ok(())
188 })
189 }
190}
191