1 | use crate::codec::decoder::Decoder; |
2 | use crate::codec::encoder::Encoder; |
3 | use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; |
4 | |
5 | use futures_core::Stream; |
6 | use tokio::io::{AsyncRead, AsyncWrite}; |
7 | |
8 | use bytes::BytesMut; |
9 | use futures_sink::Sink; |
10 | use pin_project_lite::pin_project; |
11 | use std::fmt; |
12 | use std::io; |
13 | use std::pin::Pin; |
14 | use std::task::{Context, Poll}; |
15 | |
16 | pin_project! { |
17 | /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using |
18 | /// the `Encoder` and `Decoder` traits to encode and decode frames. |
19 | /// |
20 | /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or |
21 | /// by using the `new` function seen below. |
22 | /// |
23 | /// # Cancellation safety |
24 | /// |
25 | /// * [`futures_util::sink::SinkExt::send`]: if send is used as the event in a |
26 | /// `tokio::select!` statement and some other branch completes first, then it is |
27 | /// guaranteed that the message was not sent, but the message itself is lost. |
28 | /// * [`tokio_stream::StreamExt::next`]: This method is cancel safe. The returned |
29 | /// future only holds onto a reference to the underlying stream, so dropping it will |
30 | /// never lose a value. |
31 | /// |
32 | /// [`Stream`]: futures_core::Stream |
33 | /// [`Sink`]: futures_sink::Sink |
34 | /// [`AsyncRead`]: tokio::io::AsyncRead |
35 | /// [`Decoder::framed`]: crate::codec::Decoder::framed() |
36 | /// [`futures_util::sink::SinkExt::send`]: futures_util::sink::SinkExt::send |
37 | /// [`tokio_stream::StreamExt::next`]: https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html#method.next |
38 | pub struct Framed<T, U> { |
39 | #[pin] |
40 | inner: FramedImpl<T, U, RWFrames> |
41 | } |
42 | } |
43 | |
44 | impl<T, U> Framed<T, U> |
45 | where |
46 | T: AsyncRead + AsyncWrite, |
47 | { |
48 | /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this |
49 | /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data. |
50 | /// |
51 | /// Raw I/O objects work with byte sequences, but higher-level code usually |
52 | /// wants to batch these into meaningful chunks, called "frames". This |
53 | /// method layers framing on top of an I/O object, by using the codec |
54 | /// traits to handle encoding and decoding of messages frames. Note that |
55 | /// the incoming and outgoing frame types may be distinct. |
56 | /// |
57 | /// This function returns a *single* object that is both [`Stream`] and |
58 | /// [`Sink`]; grouping this into a single object is often useful for layering |
59 | /// things like gzip or TLS, which require both read and write access to the |
60 | /// underlying object. |
61 | /// |
62 | /// If you want to work more directly with the streams and sink, consider |
63 | /// calling [`split`] on the `Framed` returned by this method, which will |
64 | /// break them into separate objects, allowing them to interact more easily. |
65 | /// |
66 | /// Note that, for some byte sources, the stream can be resumed after an EOF |
67 | /// by reading from it, even after it has returned `None`. Repeated attempts |
68 | /// to do so, without new data available, continue to return `None` without |
69 | /// creating more (closing) frames. |
70 | /// |
71 | /// [`Stream`]: futures_core::Stream |
72 | /// [`Sink`]: futures_sink::Sink |
73 | /// [`Decode`]: crate::codec::Decoder |
74 | /// [`Encoder`]: crate::codec::Encoder |
75 | /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split |
76 | pub fn new(inner: T, codec: U) -> Framed<T, U> { |
77 | Framed { |
78 | inner: FramedImpl { |
79 | inner, |
80 | codec, |
81 | state: Default::default(), |
82 | }, |
83 | } |
84 | } |
85 | |
86 | /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this |
87 | /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data, |
88 | /// with a specific read buffer initial capacity. |
89 | /// |
90 | /// Raw I/O objects work with byte sequences, but higher-level code usually |
91 | /// wants to batch these into meaningful chunks, called "frames". This |
92 | /// method layers framing on top of an I/O object, by using the codec |
93 | /// traits to handle encoding and decoding of messages frames. Note that |
94 | /// the incoming and outgoing frame types may be distinct. |
95 | /// |
96 | /// This function returns a *single* object that is both [`Stream`] and |
97 | /// [`Sink`]; grouping this into a single object is often useful for layering |
98 | /// things like gzip or TLS, which require both read and write access to the |
99 | /// underlying object. |
100 | /// |
101 | /// If you want to work more directly with the streams and sink, consider |
102 | /// calling [`split`] on the `Framed` returned by this method, which will |
103 | /// break them into separate objects, allowing them to interact more easily. |
104 | /// |
105 | /// [`Stream`]: futures_core::Stream |
106 | /// [`Sink`]: futures_sink::Sink |
107 | /// [`Decode`]: crate::codec::Decoder |
108 | /// [`Encoder`]: crate::codec::Encoder |
109 | /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split |
110 | pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> { |
111 | Framed { |
112 | inner: FramedImpl { |
113 | inner, |
114 | codec, |
115 | state: RWFrames { |
116 | read: ReadFrame { |
117 | eof: false, |
118 | is_readable: false, |
119 | buffer: BytesMut::with_capacity(capacity), |
120 | has_errored: false, |
121 | }, |
122 | write: WriteFrame::default(), |
123 | }, |
124 | }, |
125 | } |
126 | } |
127 | } |
128 | |
129 | impl<T, U> Framed<T, U> { |
130 | /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this |
131 | /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data. |
132 | /// |
133 | /// Raw I/O objects work with byte sequences, but higher-level code usually |
134 | /// wants to batch these into meaningful chunks, called "frames". This |
135 | /// method layers framing on top of an I/O object, by using the `Codec` |
136 | /// traits to handle encoding and decoding of messages frames. Note that |
137 | /// the incoming and outgoing frame types may be distinct. |
138 | /// |
139 | /// This function returns a *single* object that is both [`Stream`] and |
140 | /// [`Sink`]; grouping this into a single object is often useful for layering |
141 | /// things like gzip or TLS, which require both read and write access to the |
142 | /// underlying object. |
143 | /// |
144 | /// This objects takes a stream and a `readbuffer` and a `writebuffer`. These field |
145 | /// can be obtained from an existing `Framed` with the [`into_parts`] method. |
146 | /// |
147 | /// If you want to work more directly with the streams and sink, consider |
148 | /// calling [`split`] on the `Framed` returned by this method, which will |
149 | /// break them into separate objects, allowing them to interact more easily. |
150 | /// |
151 | /// [`Stream`]: futures_core::Stream |
152 | /// [`Sink`]: futures_sink::Sink |
153 | /// [`Decoder`]: crate::codec::Decoder |
154 | /// [`Encoder`]: crate::codec::Encoder |
155 | /// [`into_parts`]: crate::codec::Framed::into_parts() |
156 | /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split |
157 | pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> { |
158 | Framed { |
159 | inner: FramedImpl { |
160 | inner: parts.io, |
161 | codec: parts.codec, |
162 | state: RWFrames { |
163 | read: parts.read_buf.into(), |
164 | write: parts.write_buf.into(), |
165 | }, |
166 | }, |
167 | } |
168 | } |
169 | |
170 | /// Returns a reference to the underlying I/O stream wrapped by |
171 | /// `Framed`. |
172 | /// |
173 | /// Note that care should be taken to not tamper with the underlying stream |
174 | /// of data coming in as it may corrupt the stream of frames otherwise |
175 | /// being worked with. |
176 | pub fn get_ref(&self) -> &T { |
177 | &self.inner.inner |
178 | } |
179 | |
180 | /// Returns a mutable reference to the underlying I/O stream wrapped by |
181 | /// `Framed`. |
182 | /// |
183 | /// Note that care should be taken to not tamper with the underlying stream |
184 | /// of data coming in as it may corrupt the stream of frames otherwise |
185 | /// being worked with. |
186 | pub fn get_mut(&mut self) -> &mut T { |
187 | &mut self.inner.inner |
188 | } |
189 | |
190 | /// Returns a pinned mutable reference to the underlying I/O stream wrapped by |
191 | /// `Framed`. |
192 | /// |
193 | /// Note that care should be taken to not tamper with the underlying stream |
194 | /// of data coming in as it may corrupt the stream of frames otherwise |
195 | /// being worked with. |
196 | pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> { |
197 | self.project().inner.project().inner |
198 | } |
199 | |
200 | /// Returns a reference to the underlying codec wrapped by |
201 | /// `Framed`. |
202 | /// |
203 | /// Note that care should be taken to not tamper with the underlying codec |
204 | /// as it may corrupt the stream of frames otherwise being worked with. |
205 | pub fn codec(&self) -> &U { |
206 | &self.inner.codec |
207 | } |
208 | |
209 | /// Returns a mutable reference to the underlying codec wrapped by |
210 | /// `Framed`. |
211 | /// |
212 | /// Note that care should be taken to not tamper with the underlying codec |
213 | /// as it may corrupt the stream of frames otherwise being worked with. |
214 | pub fn codec_mut(&mut self) -> &mut U { |
215 | &mut self.inner.codec |
216 | } |
217 | |
218 | /// Maps the codec `U` to `C`, preserving the read and write buffers |
219 | /// wrapped by `Framed`. |
220 | /// |
221 | /// Note that care should be taken to not tamper with the underlying codec |
222 | /// as it may corrupt the stream of frames otherwise being worked with. |
223 | pub fn map_codec<C, F>(self, map: F) -> Framed<T, C> |
224 | where |
225 | F: FnOnce(U) -> C, |
226 | { |
227 | // This could be potentially simplified once rust-lang/rust#86555 hits stable |
228 | let parts = self.into_parts(); |
229 | Framed::from_parts(FramedParts { |
230 | io: parts.io, |
231 | codec: map(parts.codec), |
232 | read_buf: parts.read_buf, |
233 | write_buf: parts.write_buf, |
234 | _priv: (), |
235 | }) |
236 | } |
237 | |
238 | /// Returns a mutable reference to the underlying codec wrapped by |
239 | /// `Framed`. |
240 | /// |
241 | /// Note that care should be taken to not tamper with the underlying codec |
242 | /// as it may corrupt the stream of frames otherwise being worked with. |
243 | pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U { |
244 | self.project().inner.project().codec |
245 | } |
246 | |
247 | /// Returns a reference to the read buffer. |
248 | pub fn read_buffer(&self) -> &BytesMut { |
249 | &self.inner.state.read.buffer |
250 | } |
251 | |
252 | /// Returns a mutable reference to the read buffer. |
253 | pub fn read_buffer_mut(&mut self) -> &mut BytesMut { |
254 | &mut self.inner.state.read.buffer |
255 | } |
256 | |
257 | /// Returns a reference to the write buffer. |
258 | pub fn write_buffer(&self) -> &BytesMut { |
259 | &self.inner.state.write.buffer |
260 | } |
261 | |
262 | /// Returns a mutable reference to the write buffer. |
263 | pub fn write_buffer_mut(&mut self) -> &mut BytesMut { |
264 | &mut self.inner.state.write.buffer |
265 | } |
266 | |
267 | /// Returns backpressure boundary |
268 | pub fn backpressure_boundary(&self) -> usize { |
269 | self.inner.state.write.backpressure_boundary |
270 | } |
271 | |
272 | /// Updates backpressure boundary |
273 | pub fn set_backpressure_boundary(&mut self, boundary: usize) { |
274 | self.inner.state.write.backpressure_boundary = boundary; |
275 | } |
276 | |
277 | /// Consumes the `Framed`, returning its underlying I/O stream. |
278 | /// |
279 | /// Note that care should be taken to not tamper with the underlying stream |
280 | /// of data coming in as it may corrupt the stream of frames otherwise |
281 | /// being worked with. |
282 | pub fn into_inner(self) -> T { |
283 | self.inner.inner |
284 | } |
285 | |
286 | /// Consumes the `Framed`, returning its underlying I/O stream, the buffer |
287 | /// with unprocessed data, and the codec. |
288 | /// |
289 | /// Note that care should be taken to not tamper with the underlying stream |
290 | /// of data coming in as it may corrupt the stream of frames otherwise |
291 | /// being worked with. |
292 | pub fn into_parts(self) -> FramedParts<T, U> { |
293 | FramedParts { |
294 | io: self.inner.inner, |
295 | codec: self.inner.codec, |
296 | read_buf: self.inner.state.read.buffer, |
297 | write_buf: self.inner.state.write.buffer, |
298 | _priv: (), |
299 | } |
300 | } |
301 | } |
302 | |
303 | // This impl just defers to the underlying FramedImpl |
304 | impl<T, U> Stream for Framed<T, U> |
305 | where |
306 | T: AsyncRead, |
307 | U: Decoder, |
308 | { |
309 | type Item = Result<U::Item, U::Error>; |
310 | |
311 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
312 | self.project().inner.poll_next(cx) |
313 | } |
314 | } |
315 | |
316 | // This impl just defers to the underlying FramedImpl |
317 | impl<T, I, U> Sink<I> for Framed<T, U> |
318 | where |
319 | T: AsyncWrite, |
320 | U: Encoder<I>, |
321 | U::Error: From<io::Error>, |
322 | { |
323 | type Error = U::Error; |
324 | |
325 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
326 | self.project().inner.poll_ready(cx) |
327 | } |
328 | |
329 | fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { |
330 | self.project().inner.start_send(item) |
331 | } |
332 | |
333 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
334 | self.project().inner.poll_flush(cx) |
335 | } |
336 | |
337 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
338 | self.project().inner.poll_close(cx) |
339 | } |
340 | } |
341 | |
342 | impl<T, U> fmt::Debug for Framed<T, U> |
343 | where |
344 | T: fmt::Debug, |
345 | U: fmt::Debug, |
346 | { |
347 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
348 | f&mut DebugStruct<'_, '_>.debug_struct("Framed" ) |
349 | .field("io" , self.get_ref()) |
350 | .field(name:"codec" , self.codec()) |
351 | .finish() |
352 | } |
353 | } |
354 | |
355 | /// `FramedParts` contains an export of the data of a Framed transport. |
356 | /// It can be used to construct a new [`Framed`] with a different codec. |
357 | /// It contains all current buffers and the inner transport. |
358 | /// |
359 | /// [`Framed`]: crate::codec::Framed |
360 | #[derive (Debug)] |
361 | #[allow (clippy::manual_non_exhaustive)] |
362 | pub struct FramedParts<T, U> { |
363 | /// The inner transport used to read bytes to and write bytes to |
364 | pub io: T, |
365 | |
366 | /// The codec |
367 | pub codec: U, |
368 | |
369 | /// The buffer with read but unprocessed data. |
370 | pub read_buf: BytesMut, |
371 | |
372 | /// A buffer with unprocessed data which are not written yet. |
373 | pub write_buf: BytesMut, |
374 | |
375 | /// This private field allows us to add additional fields in the future in a |
376 | /// backwards compatible way. |
377 | _priv: (), |
378 | } |
379 | |
380 | impl<T, U> FramedParts<T, U> { |
381 | /// Create a new, default, `FramedParts` |
382 | pub fn new<I>(io: T, codec: U) -> FramedParts<T, U> |
383 | where |
384 | U: Encoder<I>, |
385 | { |
386 | FramedParts { |
387 | io, |
388 | codec, |
389 | read_buf: BytesMut::new(), |
390 | write_buf: BytesMut::new(), |
391 | _priv: (), |
392 | } |
393 | } |
394 | } |
395 | |