1use crate::codec::decoder::Decoder;
2use crate::codec::encoder::Encoder;
3use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
4
5use futures_core::Stream;
6use tokio::io::{AsyncRead, AsyncWrite};
7
8use bytes::BytesMut;
9use futures_sink::Sink;
10use pin_project_lite::pin_project;
11use std::fmt;
12use std::io;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pin_project! {
17 /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18 /// the `Encoder` and `Decoder` traits to encode and decode frames.
19 ///
20 /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
21 /// by using the `new` function seen below.
22 ///
23 /// [`Stream`]: futures_core::Stream
24 /// [`Sink`]: futures_sink::Sink
25 /// [`AsyncRead`]: tokio::io::AsyncRead
26 /// [`Decoder::framed`]: crate::codec::Decoder::framed()
27 pub struct Framed<T, U> {
28 #[pin]
29 inner: FramedImpl<T, U, RWFrames>
30 }
31}
32
33impl<T, U> Framed<T, U>
34where
35 T: AsyncRead + AsyncWrite,
36{
37 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
38 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
39 ///
40 /// Raw I/O objects work with byte sequences, but higher-level code usually
41 /// wants to batch these into meaningful chunks, called "frames". This
42 /// method layers framing on top of an I/O object, by using the codec
43 /// traits to handle encoding and decoding of messages frames. Note that
44 /// the incoming and outgoing frame types may be distinct.
45 ///
46 /// This function returns a *single* object that is both [`Stream`] and
47 /// [`Sink`]; grouping this into a single object is often useful for layering
48 /// things like gzip or TLS, which require both read and write access to the
49 /// underlying object.
50 ///
51 /// If you want to work more directly with the streams and sink, consider
52 /// calling [`split`] on the `Framed` returned by this method, which will
53 /// break them into separate objects, allowing them to interact more easily.
54 ///
55 /// Note that, for some byte sources, the stream can be resumed after an EOF
56 /// by reading from it, even after it has returned `None`. Repeated attempts
57 /// to do so, without new data available, continue to return `None` without
58 /// creating more (closing) frames.
59 ///
60 /// [`Stream`]: futures_core::Stream
61 /// [`Sink`]: futures_sink::Sink
62 /// [`Decode`]: crate::codec::Decoder
63 /// [`Encoder`]: crate::codec::Encoder
64 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
65 pub fn new(inner: T, codec: U) -> Framed<T, U> {
66 Framed {
67 inner: FramedImpl {
68 inner,
69 codec,
70 state: Default::default(),
71 },
72 }
73 }
74
75 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
76 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data,
77 /// with a specific read buffer initial capacity.
78 ///
79 /// Raw I/O objects work with byte sequences, but higher-level code usually
80 /// wants to batch these into meaningful chunks, called "frames". This
81 /// method layers framing on top of an I/O object, by using the codec
82 /// traits to handle encoding and decoding of messages frames. Note that
83 /// the incoming and outgoing frame types may be distinct.
84 ///
85 /// This function returns a *single* object that is both [`Stream`] and
86 /// [`Sink`]; grouping this into a single object is often useful for layering
87 /// things like gzip or TLS, which require both read and write access to the
88 /// underlying object.
89 ///
90 /// If you want to work more directly with the streams and sink, consider
91 /// calling [`split`] on the `Framed` returned by this method, which will
92 /// break them into separate objects, allowing them to interact more easily.
93 ///
94 /// [`Stream`]: futures_core::Stream
95 /// [`Sink`]: futures_sink::Sink
96 /// [`Decode`]: crate::codec::Decoder
97 /// [`Encoder`]: crate::codec::Encoder
98 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
99 pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
100 Framed {
101 inner: FramedImpl {
102 inner,
103 codec,
104 state: RWFrames {
105 read: ReadFrame {
106 eof: false,
107 is_readable: false,
108 buffer: BytesMut::with_capacity(capacity),
109 has_errored: false,
110 },
111 write: WriteFrame::default(),
112 },
113 },
114 }
115 }
116}
117
118impl<T, U> Framed<T, U> {
119 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
120 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
121 ///
122 /// Raw I/O objects work with byte sequences, but higher-level code usually
123 /// wants to batch these into meaningful chunks, called "frames". This
124 /// method layers framing on top of an I/O object, by using the `Codec`
125 /// traits to handle encoding and decoding of messages frames. Note that
126 /// the incoming and outgoing frame types may be distinct.
127 ///
128 /// This function returns a *single* object that is both [`Stream`] and
129 /// [`Sink`]; grouping this into a single object is often useful for layering
130 /// things like gzip or TLS, which require both read and write access to the
131 /// underlying object.
132 ///
133 /// This objects takes a stream and a readbuffer and a writebuffer. These field
134 /// can be obtained from an existing `Framed` with the [`into_parts`] method.
135 ///
136 /// If you want to work more directly with the streams and sink, consider
137 /// calling [`split`] on the `Framed` returned by this method, which will
138 /// break them into separate objects, allowing them to interact more easily.
139 ///
140 /// [`Stream`]: futures_core::Stream
141 /// [`Sink`]: futures_sink::Sink
142 /// [`Decoder`]: crate::codec::Decoder
143 /// [`Encoder`]: crate::codec::Encoder
144 /// [`into_parts`]: crate::codec::Framed::into_parts()
145 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
146 pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
147 Framed {
148 inner: FramedImpl {
149 inner: parts.io,
150 codec: parts.codec,
151 state: RWFrames {
152 read: parts.read_buf.into(),
153 write: parts.write_buf.into(),
154 },
155 },
156 }
157 }
158
159 /// Returns a reference to the underlying I/O stream wrapped by
160 /// `Framed`.
161 ///
162 /// Note that care should be taken to not tamper with the underlying stream
163 /// of data coming in as it may corrupt the stream of frames otherwise
164 /// being worked with.
165 pub fn get_ref(&self) -> &T {
166 &self.inner.inner
167 }
168
169 /// Returns a mutable reference to the underlying I/O stream wrapped by
170 /// `Framed`.
171 ///
172 /// Note that care should be taken to not tamper with the underlying stream
173 /// of data coming in as it may corrupt the stream of frames otherwise
174 /// being worked with.
175 pub fn get_mut(&mut self) -> &mut T {
176 &mut self.inner.inner
177 }
178
179 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
180 /// `Framed`.
181 ///
182 /// Note that care should be taken to not tamper with the underlying stream
183 /// of data coming in as it may corrupt the stream of frames otherwise
184 /// being worked with.
185 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
186 self.project().inner.project().inner
187 }
188
189 /// Returns a reference to the underlying codec wrapped by
190 /// `Framed`.
191 ///
192 /// Note that care should be taken to not tamper with the underlying codec
193 /// as it may corrupt the stream of frames otherwise being worked with.
194 pub fn codec(&self) -> &U {
195 &self.inner.codec
196 }
197
198 /// Returns a mutable reference to the underlying codec wrapped by
199 /// `Framed`.
200 ///
201 /// Note that care should be taken to not tamper with the underlying codec
202 /// as it may corrupt the stream of frames otherwise being worked with.
203 pub fn codec_mut(&mut self) -> &mut U {
204 &mut self.inner.codec
205 }
206
207 /// Maps the codec `U` to `C`, preserving the read and write buffers
208 /// wrapped by `Framed`.
209 ///
210 /// Note that care should be taken to not tamper with the underlying codec
211 /// as it may corrupt the stream of frames otherwise being worked with.
212 pub fn map_codec<C, F>(self, map: F) -> Framed<T, C>
213 where
214 F: FnOnce(U) -> C,
215 {
216 // This could be potentially simplified once rust-lang/rust#86555 hits stable
217 let parts = self.into_parts();
218 Framed::from_parts(FramedParts {
219 io: parts.io,
220 codec: map(parts.codec),
221 read_buf: parts.read_buf,
222 write_buf: parts.write_buf,
223 _priv: (),
224 })
225 }
226
227 /// Returns a mutable reference to the underlying codec wrapped by
228 /// `Framed`.
229 ///
230 /// Note that care should be taken to not tamper with the underlying codec
231 /// as it may corrupt the stream of frames otherwise being worked with.
232 pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U {
233 self.project().inner.project().codec
234 }
235
236 /// Returns a reference to the read buffer.
237 pub fn read_buffer(&self) -> &BytesMut {
238 &self.inner.state.read.buffer
239 }
240
241 /// Returns a mutable reference to the read buffer.
242 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
243 &mut self.inner.state.read.buffer
244 }
245
246 /// Returns a reference to the write buffer.
247 pub fn write_buffer(&self) -> &BytesMut {
248 &self.inner.state.write.buffer
249 }
250
251 /// Returns a mutable reference to the write buffer.
252 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
253 &mut self.inner.state.write.buffer
254 }
255
256 /// Returns backpressure boundary
257 pub fn backpressure_boundary(&self) -> usize {
258 self.inner.state.write.backpressure_boundary
259 }
260
261 /// Updates backpressure boundary
262 pub fn set_backpressure_boundary(&mut self, boundary: usize) {
263 self.inner.state.write.backpressure_boundary = boundary;
264 }
265
266 /// Consumes the `Framed`, returning its underlying I/O stream.
267 ///
268 /// Note that care should be taken to not tamper with the underlying stream
269 /// of data coming in as it may corrupt the stream of frames otherwise
270 /// being worked with.
271 pub fn into_inner(self) -> T {
272 self.inner.inner
273 }
274
275 /// Consumes the `Framed`, returning its underlying I/O stream, the buffer
276 /// with unprocessed data, and the codec.
277 ///
278 /// Note that care should be taken to not tamper with the underlying stream
279 /// of data coming in as it may corrupt the stream of frames otherwise
280 /// being worked with.
281 pub fn into_parts(self) -> FramedParts<T, U> {
282 FramedParts {
283 io: self.inner.inner,
284 codec: self.inner.codec,
285 read_buf: self.inner.state.read.buffer,
286 write_buf: self.inner.state.write.buffer,
287 _priv: (),
288 }
289 }
290}
291
292// This impl just defers to the underlying FramedImpl
293impl<T, U> Stream for Framed<T, U>
294where
295 T: AsyncRead,
296 U: Decoder,
297{
298 type Item = Result<U::Item, U::Error>;
299
300 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
301 self.project().inner.poll_next(cx)
302 }
303}
304
305// This impl just defers to the underlying FramedImpl
306impl<T, I, U> Sink<I> for Framed<T, U>
307where
308 T: AsyncWrite,
309 U: Encoder<I>,
310 U::Error: From<io::Error>,
311{
312 type Error = U::Error;
313
314 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
315 self.project().inner.poll_ready(cx)
316 }
317
318 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
319 self.project().inner.start_send(item)
320 }
321
322 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
323 self.project().inner.poll_flush(cx)
324 }
325
326 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
327 self.project().inner.poll_close(cx)
328 }
329}
330
331impl<T, U> fmt::Debug for Framed<T, U>
332where
333 T: fmt::Debug,
334 U: fmt::Debug,
335{
336 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
337 f.debug_struct("Framed")
338 .field("io", self.get_ref())
339 .field("codec", self.codec())
340 .finish()
341 }
342}
343
344/// `FramedParts` contains an export of the data of a Framed transport.
345/// It can be used to construct a new [`Framed`] with a different codec.
346/// It contains all current buffers and the inner transport.
347///
348/// [`Framed`]: crate::codec::Framed
349#[derive(Debug)]
350#[allow(clippy::manual_non_exhaustive)]
351pub struct FramedParts<T, U> {
352 /// The inner transport used to read bytes to and write bytes to
353 pub io: T,
354
355 /// The codec
356 pub codec: U,
357
358 /// The buffer with read but unprocessed data.
359 pub read_buf: BytesMut,
360
361 /// A buffer with unprocessed data which are not written yet.
362 pub write_buf: BytesMut,
363
364 /// This private field allows us to add additional fields in the future in a
365 /// backwards compatible way.
366 _priv: (),
367}
368
369impl<T, U> FramedParts<T, U> {
370 /// Create a new, default, `FramedParts`
371 pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
372 where
373 U: Encoder<I>,
374 {
375 FramedParts {
376 io,
377 codec,
378 read_buf: BytesMut::new(),
379 write_buf: BytesMut::new(),
380 _priv: (),
381 }
382 }
383}
384