1use crate::codec::decoder::Decoder;
2use crate::codec::encoder::Encoder;
3use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
4
5use futures_core::Stream;
6use tokio::io::{AsyncRead, AsyncWrite};
7
8use bytes::BytesMut;
9use futures_sink::Sink;
10use pin_project_lite::pin_project;
11use std::fmt;
12use std::io;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pin_project! {
17 /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18 /// the `Encoder` and `Decoder` traits to encode and decode frames.
19 ///
20 /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
21 /// by using the `new` function seen below.
22 ///
23 /// [`Stream`]: futures_core::Stream
24 /// [`Sink`]: futures_sink::Sink
25 /// [`AsyncRead`]: tokio::io::AsyncRead
26 /// [`Decoder::framed`]: crate::codec::Decoder::framed()
27 pub struct Framed<T, U> {
28 #[pin]
29 inner: FramedImpl<T, U, RWFrames>
30 }
31}
32
33impl<T, U> Framed<T, U>
34where
35 T: AsyncRead + AsyncWrite,
36{
37 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
38 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
39 ///
40 /// Raw I/O objects work with byte sequences, but higher-level code usually
41 /// wants to batch these into meaningful chunks, called "frames". This
42 /// method layers framing on top of an I/O object, by using the codec
43 /// traits to handle encoding and decoding of messages frames. Note that
44 /// the incoming and outgoing frame types may be distinct.
45 ///
46 /// This function returns a *single* object that is both [`Stream`] and
47 /// [`Sink`]; grouping this into a single object is often useful for layering
48 /// things like gzip or TLS, which require both read and write access to the
49 /// underlying object.
50 ///
51 /// If you want to work more directly with the streams and sink, consider
52 /// calling [`split`] on the `Framed` returned by this method, which will
53 /// break them into separate objects, allowing them to interact more easily.
54 ///
55 /// Note that, for some byte sources, the stream can be resumed after an EOF
56 /// by reading from it, even after it has returned `None`. Repeated attempts
57 /// to do so, without new data available, continue to return `None` without
58 /// creating more (closing) frames.
59 ///
60 /// [`Stream`]: futures_core::Stream
61 /// [`Sink`]: futures_sink::Sink
62 /// [`Decode`]: crate::codec::Decoder
63 /// [`Encoder`]: crate::codec::Encoder
64 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
65 pub fn new(inner: T, codec: U) -> Framed<T, U> {
66 Framed {
67 inner: FramedImpl {
68 inner,
69 codec,
70 state: Default::default(),
71 },
72 }
73 }
74
75 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
76 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data,
77 /// with a specific read buffer initial capacity.
78 ///
79 /// Raw I/O objects work with byte sequences, but higher-level code usually
80 /// wants to batch these into meaningful chunks, called "frames". This
81 /// method layers framing on top of an I/O object, by using the codec
82 /// traits to handle encoding and decoding of messages frames. Note that
83 /// the incoming and outgoing frame types may be distinct.
84 ///
85 /// This function returns a *single* object that is both [`Stream`] and
86 /// [`Sink`]; grouping this into a single object is often useful for layering
87 /// things like gzip or TLS, which require both read and write access to the
88 /// underlying object.
89 ///
90 /// If you want to work more directly with the streams and sink, consider
91 /// calling [`split`] on the `Framed` returned by this method, which will
92 /// break them into separate objects, allowing them to interact more easily.
93 ///
94 /// [`Stream`]: futures_core::Stream
95 /// [`Sink`]: futures_sink::Sink
96 /// [`Decode`]: crate::codec::Decoder
97 /// [`Encoder`]: crate::codec::Encoder
98 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
99 pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
100 Framed {
101 inner: FramedImpl {
102 inner,
103 codec,
104 state: RWFrames {
105 read: ReadFrame {
106 eof: false,
107 is_readable: false,
108 buffer: BytesMut::with_capacity(capacity),
109 has_errored: false,
110 },
111 write: WriteFrame::default(),
112 },
113 },
114 }
115 }
116}
117
118impl<T, U> Framed<T, U> {
119 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
120 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
121 ///
122 /// Raw I/O objects work with byte sequences, but higher-level code usually
123 /// wants to batch these into meaningful chunks, called "frames". This
124 /// method layers framing on top of an I/O object, by using the `Codec`
125 /// traits to handle encoding and decoding of messages frames. Note that
126 /// the incoming and outgoing frame types may be distinct.
127 ///
128 /// This function returns a *single* object that is both [`Stream`] and
129 /// [`Sink`]; grouping this into a single object is often useful for layering
130 /// things like gzip or TLS, which require both read and write access to the
131 /// underlying object.
132 ///
133 /// This objects takes a stream and a readbuffer and a writebuffer. These field
134 /// can be obtained from an existing `Framed` with the [`into_parts`] method.
135 ///
136 /// If you want to work more directly with the streams and sink, consider
137 /// calling [`split`] on the `Framed` returned by this method, which will
138 /// break them into separate objects, allowing them to interact more easily.
139 ///
140 /// [`Stream`]: futures_core::Stream
141 /// [`Sink`]: futures_sink::Sink
142 /// [`Decoder`]: crate::codec::Decoder
143 /// [`Encoder`]: crate::codec::Encoder
144 /// [`into_parts`]: crate::codec::Framed::into_parts()
145 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
146 pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
147 Framed {
148 inner: FramedImpl {
149 inner: parts.io,
150 codec: parts.codec,
151 state: RWFrames {
152 read: parts.read_buf.into(),
153 write: parts.write_buf.into(),
154 },
155 },
156 }
157 }
158
159 /// Returns a reference to the underlying I/O stream wrapped by
160 /// `Framed`.
161 ///
162 /// Note that care should be taken to not tamper with the underlying stream
163 /// of data coming in as it may corrupt the stream of frames otherwise
164 /// being worked with.
165 pub fn get_ref(&self) -> &T {
166 &self.inner.inner
167 }
168
169 /// Returns a mutable reference to the underlying I/O stream wrapped by
170 /// `Framed`.
171 ///
172 /// Note that care should be taken to not tamper with the underlying stream
173 /// of data coming in as it may corrupt the stream of frames otherwise
174 /// being worked with.
175 pub fn get_mut(&mut self) -> &mut T {
176 &mut self.inner.inner
177 }
178
179 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
180 /// `Framed`.
181 ///
182 /// Note that care should be taken to not tamper with the underlying stream
183 /// of data coming in as it may corrupt the stream of frames otherwise
184 /// being worked with.
185 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
186 self.project().inner.project().inner
187 }
188
189 /// Returns a reference to the underlying codec wrapped by
190 /// `Framed`.
191 ///
192 /// Note that care should be taken to not tamper with the underlying codec
193 /// as it may corrupt the stream of frames otherwise being worked with.
194 pub fn codec(&self) -> &U {
195 &self.inner.codec
196 }
197
198 /// Returns a mutable reference to the underlying codec wrapped by
199 /// `Framed`.
200 ///
201 /// Note that care should be taken to not tamper with the underlying codec
202 /// as it may corrupt the stream of frames otherwise being worked with.
203 pub fn codec_mut(&mut self) -> &mut U {
204 &mut self.inner.codec
205 }
206
207 /// Maps the codec `U` to `C`, preserving the read and write buffers
208 /// wrapped by `Framed`.
209 ///
210 /// Note that care should be taken to not tamper with the underlying codec
211 /// as it may corrupt the stream of frames otherwise being worked with.
212 pub fn map_codec<C, F>(self, map: F) -> Framed<T, C>
213 where
214 F: FnOnce(U) -> C,
215 {
216 // This could be potentially simplified once rust-lang/rust#86555 hits stable
217 let parts = self.into_parts();
218 Framed::from_parts(FramedParts {
219 io: parts.io,
220 codec: map(parts.codec),
221 read_buf: parts.read_buf,
222 write_buf: parts.write_buf,
223 _priv: (),
224 })
225 }
226
227 /// Returns a mutable reference to the underlying codec wrapped by
228 /// `Framed`.
229 ///
230 /// Note that care should be taken to not tamper with the underlying codec
231 /// as it may corrupt the stream of frames otherwise being worked with.
232 pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U {
233 self.project().inner.project().codec
234 }
235
236 /// Returns a reference to the read buffer.
237 pub fn read_buffer(&self) -> &BytesMut {
238 &self.inner.state.read.buffer
239 }
240
241 /// Returns a mutable reference to the read buffer.
242 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
243 &mut self.inner.state.read.buffer
244 }
245
246 /// Returns a reference to the write buffer.
247 pub fn write_buffer(&self) -> &BytesMut {
248 &self.inner.state.write.buffer
249 }
250
251 /// Returns a mutable reference to the write buffer.
252 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
253 &mut self.inner.state.write.buffer
254 }
255
256 /// Consumes the `Framed`, returning its underlying I/O stream.
257 ///
258 /// Note that care should be taken to not tamper with the underlying stream
259 /// of data coming in as it may corrupt the stream of frames otherwise
260 /// being worked with.
261 pub fn into_inner(self) -> T {
262 self.inner.inner
263 }
264
265 /// Consumes the `Framed`, returning its underlying I/O stream, the buffer
266 /// with unprocessed data, and the codec.
267 ///
268 /// Note that care should be taken to not tamper with the underlying stream
269 /// of data coming in as it may corrupt the stream of frames otherwise
270 /// being worked with.
271 pub fn into_parts(self) -> FramedParts<T, U> {
272 FramedParts {
273 io: self.inner.inner,
274 codec: self.inner.codec,
275 read_buf: self.inner.state.read.buffer,
276 write_buf: self.inner.state.write.buffer,
277 _priv: (),
278 }
279 }
280}
281
282// This impl just defers to the underlying FramedImpl
283impl<T, U> Stream for Framed<T, U>
284where
285 T: AsyncRead,
286 U: Decoder,
287{
288 type Item = Result<U::Item, U::Error>;
289
290 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
291 self.project().inner.poll_next(cx)
292 }
293}
294
295// This impl just defers to the underlying FramedImpl
296impl<T, I, U> Sink<I> for Framed<T, U>
297where
298 T: AsyncWrite,
299 U: Encoder<I>,
300 U::Error: From<io::Error>,
301{
302 type Error = U::Error;
303
304 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
305 self.project().inner.poll_ready(cx)
306 }
307
308 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
309 self.project().inner.start_send(item)
310 }
311
312 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
313 self.project().inner.poll_flush(cx)
314 }
315
316 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
317 self.project().inner.poll_close(cx)
318 }
319}
320
321impl<T, U> fmt::Debug for Framed<T, U>
322where
323 T: fmt::Debug,
324 U: fmt::Debug,
325{
326 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
327 f&mut DebugStruct<'_, '_>.debug_struct("Framed")
328 .field("io", self.get_ref())
329 .field(name:"codec", self.codec())
330 .finish()
331 }
332}
333
334/// `FramedParts` contains an export of the data of a Framed transport.
335/// It can be used to construct a new [`Framed`] with a different codec.
336/// It contains all current buffers and the inner transport.
337///
338/// [`Framed`]: crate::codec::Framed
339#[derive(Debug)]
340#[allow(clippy::manual_non_exhaustive)]
341pub struct FramedParts<T, U> {
342 /// The inner transport used to read bytes to and write bytes to
343 pub io: T,
344
345 /// The codec
346 pub codec: U,
347
348 /// The buffer with read but unprocessed data.
349 pub read_buf: BytesMut,
350
351 /// A buffer with unprocessed data which are not written yet.
352 pub write_buf: BytesMut,
353
354 /// This private field allows us to add additional fields in the future in a
355 /// backwards compatible way.
356 _priv: (),
357}
358
359impl<T, U> FramedParts<T, U> {
360 /// Create a new, default, `FramedParts`
361 pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
362 where
363 U: Encoder<I>,
364 {
365 FramedParts {
366 io,
367 codec,
368 read_buf: BytesMut::new(),
369 write_buf: BytesMut::new(),
370 _priv: (),
371 }
372 }
373}
374