1 | use std::{cmp, io}; |
2 | |
3 | use bytes::{Buf, Bytes}; |
4 | use hyper::rt::{Read, ReadBufCursor, Write}; |
5 | |
6 | use std::{ |
7 | pin::Pin, |
8 | task::{self, Poll}, |
9 | }; |
10 | |
11 | /// Combine a buffer with an IO, rewinding reads to use the buffer. |
12 | #[derive (Debug)] |
13 | pub(crate) struct Rewind<T> { |
14 | pub(crate) pre: Option<Bytes>, |
15 | pub(crate) inner: T, |
16 | } |
17 | |
18 | impl<T> Rewind<T> { |
19 | #[cfg (all(feature = "server" , any(feature = "http1" , feature = "http2" )))] |
20 | pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self { |
21 | Rewind { |
22 | pre: Some(buf), |
23 | inner: io, |
24 | } |
25 | } |
26 | } |
27 | |
28 | impl<T> Read for Rewind<T> |
29 | where |
30 | T: Read + Unpin, |
31 | { |
32 | fn poll_read( |
33 | mut self: Pin<&mut Self>, |
34 | cx: &mut task::Context<'_>, |
35 | mut buf: ReadBufCursor<'_>, |
36 | ) -> Poll<io::Result<()>> { |
37 | if let Some(mut prefix: Bytes) = self.pre.take() { |
38 | // If there are no remaining bytes, let the bytes get dropped. |
39 | if !prefix.is_empty() { |
40 | let copy_len: usize = cmp::min(v1:prefix.len(), v2:remaining(&mut buf)); |
41 | // TODO: There should be a way to do following two lines cleaner... |
42 | put_slice(&mut buf, &prefix[..copy_len]); |
43 | prefix.advance(cnt:copy_len); |
44 | // Put back what's left |
45 | if !prefix.is_empty() { |
46 | self.pre = Some(prefix); |
47 | } |
48 | |
49 | return Poll::Ready(Ok(())); |
50 | } |
51 | } |
52 | Pin::new(&mut self.inner).poll_read(cx, buf) |
53 | } |
54 | } |
55 | |
56 | fn remaining(cursor: &mut ReadBufCursor<'_>) -> usize { |
57 | // SAFETY: |
58 | // We do not uninitialize any set bytes. |
59 | unsafe { cursor.as_mut().len() } |
60 | } |
61 | |
62 | // Copied from `ReadBufCursor::put_slice`. |
63 | // If that becomes public, we could ditch this. |
64 | fn put_slice(cursor: &mut ReadBufCursor<'_>, slice: &[u8]) { |
65 | assert!( |
66 | remaining(cursor) >= slice.len(), |
67 | "buf.len() must fit in remaining()" |
68 | ); |
69 | |
70 | let amt: usize = slice.len(); |
71 | |
72 | // SAFETY: |
73 | // the length is asserted above |
74 | unsafe { |
75 | cursor.as_mut()[..amt] |
76 | .as_mut_ptr() |
77 | .cast::<u8>() |
78 | .copy_from_nonoverlapping(src:slice.as_ptr(), count:amt); |
79 | cursor.advance(amt); |
80 | } |
81 | } |
82 | |
83 | impl<T> Write for Rewind<T> |
84 | where |
85 | T: Write + Unpin, |
86 | { |
87 | fn poll_write( |
88 | mut self: Pin<&mut Self>, |
89 | cx: &mut task::Context<'_>, |
90 | buf: &[u8], |
91 | ) -> Poll<io::Result<usize>> { |
92 | Pin::new(&mut self.inner).poll_write(cx, buf) |
93 | } |
94 | |
95 | fn poll_write_vectored( |
96 | mut self: Pin<&mut Self>, |
97 | cx: &mut task::Context<'_>, |
98 | bufs: &[io::IoSlice<'_>], |
99 | ) -> Poll<io::Result<usize>> { |
100 | Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) |
101 | } |
102 | |
103 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { |
104 | Pin::new(&mut self.inner).poll_flush(cx) |
105 | } |
106 | |
107 | fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { |
108 | Pin::new(&mut self.inner).poll_shutdown(cx) |
109 | } |
110 | |
111 | fn is_write_vectored(&self) -> bool { |
112 | self.inner.is_write_vectored() |
113 | } |
114 | } |
115 | |
116 | /* |
117 | #[cfg(test)] |
118 | mod tests { |
119 | use super::Rewind; |
120 | use bytes::Bytes; |
121 | use tokio::io::AsyncReadExt; |
122 | |
123 | #[cfg(not(miri))] |
124 | #[tokio::test] |
125 | async fn partial_rewind() { |
126 | let underlying = [104, 101, 108, 108, 111]; |
127 | |
128 | let mock = tokio_test::io::Builder::new().read(&underlying).build(); |
129 | |
130 | let mut stream = Rewind::new(mock); |
131 | |
132 | // Read off some bytes, ensure we filled o1 |
133 | let mut buf = [0; 2]; |
134 | stream.read_exact(&mut buf).await.expect("read1"); |
135 | |
136 | // Rewind the stream so that it is as if we never read in the first place. |
137 | stream.rewind(Bytes::copy_from_slice(&buf[..])); |
138 | |
139 | let mut buf = [0; 5]; |
140 | stream.read_exact(&mut buf).await.expect("read1"); |
141 | |
142 | // At this point we should have read everything that was in the MockStream |
143 | assert_eq!(&buf, &underlying); |
144 | } |
145 | |
146 | #[cfg(not(miri))] |
147 | #[tokio::test] |
148 | async fn full_rewind() { |
149 | let underlying = [104, 101, 108, 108, 111]; |
150 | |
151 | let mock = tokio_test::io::Builder::new().read(&underlying).build(); |
152 | |
153 | let mut stream = Rewind::new(mock); |
154 | |
155 | let mut buf = [0; 5]; |
156 | stream.read_exact(&mut buf).await.expect("read1"); |
157 | |
158 | // Rewind the stream so that it is as if we never read in the first place. |
159 | stream.rewind(Bytes::copy_from_slice(&buf[..])); |
160 | |
161 | let mut buf = [0; 5]; |
162 | stream.read_exact(&mut buf).await.expect("read1"); |
163 | } |
164 | } |
165 | */ |
166 | |