1 | use crate::codec::decoder::Decoder; |
2 | use crate::codec::encoder::Encoder; |
3 | use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; |
4 | |
5 | use futures_core::Stream; |
6 | use tokio::io::{AsyncRead, AsyncWrite}; |
7 | |
8 | use bytes::BytesMut; |
9 | use futures_sink::Sink; |
10 | use pin_project_lite::pin_project; |
11 | use std::fmt; |
12 | use std::io; |
13 | use std::pin::Pin; |
14 | use std::task::{Context, Poll}; |
15 | |
16 | pin_project! { |
17 | /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using |
18 | /// the `Encoder` and `Decoder` traits to encode and decode frames. |
19 | /// |
20 | /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or |
21 | /// by using the `new` function seen below. |
22 | /// |
23 | /// [`Stream`]: futures_core::Stream |
24 | /// [`Sink`]: futures_sink::Sink |
25 | /// [`AsyncRead`]: tokio::io::AsyncRead |
26 | /// [`Decoder::framed`]: crate::codec::Decoder::framed() |
27 | pub struct Framed<T, U> { |
28 | #[pin] |
29 | inner: FramedImpl<T, U, RWFrames> |
30 | } |
31 | } |
32 | |
33 | impl<T, U> Framed<T, U> |
34 | where |
35 | T: AsyncRead + AsyncWrite, |
36 | { |
37 | /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this |
38 | /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data. |
39 | /// |
40 | /// Raw I/O objects work with byte sequences, but higher-level code usually |
41 | /// wants to batch these into meaningful chunks, called "frames". This |
42 | /// method layers framing on top of an I/O object, by using the codec |
43 | /// traits to handle encoding and decoding of messages frames. Note that |
44 | /// the incoming and outgoing frame types may be distinct. |
45 | /// |
46 | /// This function returns a *single* object that is both [`Stream`] and |
47 | /// [`Sink`]; grouping this into a single object is often useful for layering |
48 | /// things like gzip or TLS, which require both read and write access to the |
49 | /// underlying object. |
50 | /// |
51 | /// If you want to work more directly with the streams and sink, consider |
52 | /// calling [`split`] on the `Framed` returned by this method, which will |
53 | /// break them into separate objects, allowing them to interact more easily. |
54 | /// |
55 | /// Note that, for some byte sources, the stream can be resumed after an EOF |
56 | /// by reading from it, even after it has returned `None`. Repeated attempts |
57 | /// to do so, without new data available, continue to return `None` without |
58 | /// creating more (closing) frames. |
59 | /// |
60 | /// [`Stream`]: futures_core::Stream |
61 | /// [`Sink`]: futures_sink::Sink |
62 | /// [`Decode`]: crate::codec::Decoder |
63 | /// [`Encoder`]: crate::codec::Encoder |
64 | /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split |
65 | pub fn new(inner: T, codec: U) -> Framed<T, U> { |
66 | Framed { |
67 | inner: FramedImpl { |
68 | inner, |
69 | codec, |
70 | state: Default::default(), |
71 | }, |
72 | } |
73 | } |
74 | |
75 | /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this |
76 | /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data, |
77 | /// with a specific read buffer initial capacity. |
78 | /// |
79 | /// Raw I/O objects work with byte sequences, but higher-level code usually |
80 | /// wants to batch these into meaningful chunks, called "frames". This |
81 | /// method layers framing on top of an I/O object, by using the codec |
82 | /// traits to handle encoding and decoding of messages frames. Note that |
83 | /// the incoming and outgoing frame types may be distinct. |
84 | /// |
85 | /// This function returns a *single* object that is both [`Stream`] and |
86 | /// [`Sink`]; grouping this into a single object is often useful for layering |
87 | /// things like gzip or TLS, which require both read and write access to the |
88 | /// underlying object. |
89 | /// |
90 | /// If you want to work more directly with the streams and sink, consider |
91 | /// calling [`split`] on the `Framed` returned by this method, which will |
92 | /// break them into separate objects, allowing them to interact more easily. |
93 | /// |
94 | /// [`Stream`]: futures_core::Stream |
95 | /// [`Sink`]: futures_sink::Sink |
96 | /// [`Decode`]: crate::codec::Decoder |
97 | /// [`Encoder`]: crate::codec::Encoder |
98 | /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split |
99 | pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> { |
100 | Framed { |
101 | inner: FramedImpl { |
102 | inner, |
103 | codec, |
104 | state: RWFrames { |
105 | read: ReadFrame { |
106 | eof: false, |
107 | is_readable: false, |
108 | buffer: BytesMut::with_capacity(capacity), |
109 | has_errored: false, |
110 | }, |
111 | write: WriteFrame::default(), |
112 | }, |
113 | }, |
114 | } |
115 | } |
116 | } |
117 | |
118 | impl<T, U> Framed<T, U> { |
119 | /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this |
120 | /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data. |
121 | /// |
122 | /// Raw I/O objects work with byte sequences, but higher-level code usually |
123 | /// wants to batch these into meaningful chunks, called "frames". This |
124 | /// method layers framing on top of an I/O object, by using the `Codec` |
125 | /// traits to handle encoding and decoding of messages frames. Note that |
126 | /// the incoming and outgoing frame types may be distinct. |
127 | /// |
128 | /// This function returns a *single* object that is both [`Stream`] and |
129 | /// [`Sink`]; grouping this into a single object is often useful for layering |
130 | /// things like gzip or TLS, which require both read and write access to the |
131 | /// underlying object. |
132 | /// |
133 | /// This objects takes a stream and a readbuffer and a writebuffer. These field |
134 | /// can be obtained from an existing `Framed` with the [`into_parts`] method. |
135 | /// |
136 | /// If you want to work more directly with the streams and sink, consider |
137 | /// calling [`split`] on the `Framed` returned by this method, which will |
138 | /// break them into separate objects, allowing them to interact more easily. |
139 | /// |
140 | /// [`Stream`]: futures_core::Stream |
141 | /// [`Sink`]: futures_sink::Sink |
142 | /// [`Decoder`]: crate::codec::Decoder |
143 | /// [`Encoder`]: crate::codec::Encoder |
144 | /// [`into_parts`]: crate::codec::Framed::into_parts() |
145 | /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split |
146 | pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> { |
147 | Framed { |
148 | inner: FramedImpl { |
149 | inner: parts.io, |
150 | codec: parts.codec, |
151 | state: RWFrames { |
152 | read: parts.read_buf.into(), |
153 | write: parts.write_buf.into(), |
154 | }, |
155 | }, |
156 | } |
157 | } |
158 | |
159 | /// Returns a reference to the underlying I/O stream wrapped by |
160 | /// `Framed`. |
161 | /// |
162 | /// Note that care should be taken to not tamper with the underlying stream |
163 | /// of data coming in as it may corrupt the stream of frames otherwise |
164 | /// being worked with. |
165 | pub fn get_ref(&self) -> &T { |
166 | &self.inner.inner |
167 | } |
168 | |
169 | /// Returns a mutable reference to the underlying I/O stream wrapped by |
170 | /// `Framed`. |
171 | /// |
172 | /// Note that care should be taken to not tamper with the underlying stream |
173 | /// of data coming in as it may corrupt the stream of frames otherwise |
174 | /// being worked with. |
175 | pub fn get_mut(&mut self) -> &mut T { |
176 | &mut self.inner.inner |
177 | } |
178 | |
179 | /// Returns a pinned mutable reference to the underlying I/O stream wrapped by |
180 | /// `Framed`. |
181 | /// |
182 | /// Note that care should be taken to not tamper with the underlying stream |
183 | /// of data coming in as it may corrupt the stream of frames otherwise |
184 | /// being worked with. |
185 | pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { |
186 | self.project().inner.project().inner |
187 | } |
188 | |
189 | /// Returns a reference to the underlying codec wrapped by |
190 | /// `Framed`. |
191 | /// |
192 | /// Note that care should be taken to not tamper with the underlying codec |
193 | /// as it may corrupt the stream of frames otherwise being worked with. |
194 | pub fn codec(&self) -> &U { |
195 | &self.inner.codec |
196 | } |
197 | |
198 | /// Returns a mutable reference to the underlying codec wrapped by |
199 | /// `Framed`. |
200 | /// |
201 | /// Note that care should be taken to not tamper with the underlying codec |
202 | /// as it may corrupt the stream of frames otherwise being worked with. |
203 | pub fn codec_mut(&mut self) -> &mut U { |
204 | &mut self.inner.codec |
205 | } |
206 | |
207 | /// Maps the codec `U` to `C`, preserving the read and write buffers |
208 | /// wrapped by `Framed`. |
209 | /// |
210 | /// Note that care should be taken to not tamper with the underlying codec |
211 | /// as it may corrupt the stream of frames otherwise being worked with. |
212 | pub fn map_codec<C, F>(self, map: F) -> Framed<T, C> |
213 | where |
214 | F: FnOnce(U) -> C, |
215 | { |
216 | // This could be potentially simplified once rust-lang/rust#86555 hits stable |
217 | let parts = self.into_parts(); |
218 | Framed::from_parts(FramedParts { |
219 | io: parts.io, |
220 | codec: map(parts.codec), |
221 | read_buf: parts.read_buf, |
222 | write_buf: parts.write_buf, |
223 | _priv: (), |
224 | }) |
225 | } |
226 | |
227 | /// Returns a mutable reference to the underlying codec wrapped by |
228 | /// `Framed`. |
229 | /// |
230 | /// Note that care should be taken to not tamper with the underlying codec |
231 | /// as it may corrupt the stream of frames otherwise being worked with. |
232 | pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U { |
233 | self.project().inner.project().codec |
234 | } |
235 | |
236 | /// Returns a reference to the read buffer. |
237 | pub fn read_buffer(&self) -> &BytesMut { |
238 | &self.inner.state.read.buffer |
239 | } |
240 | |
241 | /// Returns a mutable reference to the read buffer. |
242 | pub fn read_buffer_mut(&mut self) -> &mut BytesMut { |
243 | &mut self.inner.state.read.buffer |
244 | } |
245 | |
246 | /// Returns a reference to the write buffer. |
247 | pub fn write_buffer(&self) -> &BytesMut { |
248 | &self.inner.state.write.buffer |
249 | } |
250 | |
251 | /// Returns a mutable reference to the write buffer. |
252 | pub fn write_buffer_mut(&mut self) -> &mut BytesMut { |
253 | &mut self.inner.state.write.buffer |
254 | } |
255 | |
256 | /// Returns backpressure boundary |
257 | pub fn backpressure_boundary(&self) -> usize { |
258 | self.inner.state.write.backpressure_boundary |
259 | } |
260 | |
261 | /// Updates backpressure boundary |
262 | pub fn set_backpressure_boundary(&mut self, boundary: usize) { |
263 | self.inner.state.write.backpressure_boundary = boundary; |
264 | } |
265 | |
266 | /// Consumes the `Framed`, returning its underlying I/O stream. |
267 | /// |
268 | /// Note that care should be taken to not tamper with the underlying stream |
269 | /// of data coming in as it may corrupt the stream of frames otherwise |
270 | /// being worked with. |
271 | pub fn into_inner(self) -> T { |
272 | self.inner.inner |
273 | } |
274 | |
275 | /// Consumes the `Framed`, returning its underlying I/O stream, the buffer |
276 | /// with unprocessed data, and the codec. |
277 | /// |
278 | /// Note that care should be taken to not tamper with the underlying stream |
279 | /// of data coming in as it may corrupt the stream of frames otherwise |
280 | /// being worked with. |
281 | pub fn into_parts(self) -> FramedParts<T, U> { |
282 | FramedParts { |
283 | io: self.inner.inner, |
284 | codec: self.inner.codec, |
285 | read_buf: self.inner.state.read.buffer, |
286 | write_buf: self.inner.state.write.buffer, |
287 | _priv: (), |
288 | } |
289 | } |
290 | } |
291 | |
292 | // This impl just defers to the underlying FramedImpl |
293 | impl<T, U> Stream for Framed<T, U> |
294 | where |
295 | T: AsyncRead, |
296 | U: Decoder, |
297 | { |
298 | type Item = Result<U::Item, U::Error>; |
299 | |
300 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
301 | self.project().inner.poll_next(cx) |
302 | } |
303 | } |
304 | |
305 | // This impl just defers to the underlying FramedImpl |
306 | impl<T, I, U> Sink<I> for Framed<T, U> |
307 | where |
308 | T: AsyncWrite, |
309 | U: Encoder<I>, |
310 | U::Error: From<io::Error>, |
311 | { |
312 | type Error = U::Error; |
313 | |
314 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
315 | self.project().inner.poll_ready(cx) |
316 | } |
317 | |
318 | fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { |
319 | self.project().inner.start_send(item) |
320 | } |
321 | |
322 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
323 | self.project().inner.poll_flush(cx) |
324 | } |
325 | |
326 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
327 | self.project().inner.poll_close(cx) |
328 | } |
329 | } |
330 | |
331 | impl<T, U> fmt::Debug for Framed<T, U> |
332 | where |
333 | T: fmt::Debug, |
334 | U: fmt::Debug, |
335 | { |
336 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
337 | f.debug_struct("Framed" ) |
338 | .field("io" , self.get_ref()) |
339 | .field("codec" , self.codec()) |
340 | .finish() |
341 | } |
342 | } |
343 | |
344 | /// `FramedParts` contains an export of the data of a Framed transport. |
345 | /// It can be used to construct a new [`Framed`] with a different codec. |
346 | /// It contains all current buffers and the inner transport. |
347 | /// |
348 | /// [`Framed`]: crate::codec::Framed |
349 | #[derive(Debug)] |
350 | #[allow (clippy::manual_non_exhaustive)] |
351 | pub struct FramedParts<T, U> { |
352 | /// The inner transport used to read bytes to and write bytes to |
353 | pub io: T, |
354 | |
355 | /// The codec |
356 | pub codec: U, |
357 | |
358 | /// The buffer with read but unprocessed data. |
359 | pub read_buf: BytesMut, |
360 | |
361 | /// A buffer with unprocessed data which are not written yet. |
362 | pub write_buf: BytesMut, |
363 | |
364 | /// This private field allows us to add additional fields in the future in a |
365 | /// backwards compatible way. |
366 | _priv: (), |
367 | } |
368 | |
369 | impl<T, U> FramedParts<T, U> { |
370 | /// Create a new, default, `FramedParts` |
371 | pub fn new<I>(io: T, codec: U) -> FramedParts<T, U> |
372 | where |
373 | U: Encoder<I>, |
374 | { |
375 | FramedParts { |
376 | io, |
377 | codec, |
378 | read_buf: BytesMut::new(), |
379 | write_buf: BytesMut::new(), |
380 | _priv: (), |
381 | } |
382 | } |
383 | } |
384 | |