1use crate::codec::Framed;
2
3use tokio::io::{AsyncRead, AsyncWrite};
4
5use bytes::BytesMut;
6use std::io;
7
8/// Decoding of frames via buffers.
9///
10/// This trait is used when constructing an instance of [`Framed`] or
11/// [`FramedRead`]. An implementation of `Decoder` takes a byte stream that has
12/// already been buffered in `src` and decodes the data into a stream of
13/// `Self::Item` frames.
14///
15/// Implementations are able to track state on `self`, which enables
16/// implementing stateful streaming parsers. In many cases, though, this type
17/// will simply be a unit struct (e.g. `struct HttpDecoder`).
18///
19/// For some underlying data-sources, namely files and FIFOs,
20/// it's possible to temporarily read 0 bytes by reaching EOF.
21///
22/// In these cases `decode_eof` will be called until it signals
23/// fullfillment of all closing frames by returning `Ok(None)`.
24/// After that, repeated attempts to read from the [`Framed`] or [`FramedRead`]
25/// will not invoke `decode` or `decode_eof` again, until data can be read
26/// during a retry.
27///
28/// It is up to the Decoder to keep track of a restart after an EOF,
29/// and to decide how to handle such an event by, for example,
30/// allowing frames to cross EOF boundaries, re-emitting opening frames, or
31/// resetting the entire internal state.
32///
33/// [`Framed`]: crate::codec::Framed
34/// [`FramedRead`]: crate::codec::FramedRead
35pub trait Decoder {
36 /// The type of decoded frames.
37 type Item;
38
39 /// The type of unrecoverable frame decoding errors.
40 ///
41 /// If an individual message is ill-formed but can be ignored without
42 /// interfering with the processing of future messages, it may be more
43 /// useful to report the failure as an `Item`.
44 ///
45 /// `From<io::Error>` is required in the interest of making `Error` suitable
46 /// for returning directly from a [`FramedRead`], and to enable the default
47 /// implementation of `decode_eof` to yield an `io::Error` when the decoder
48 /// fails to consume all available data.
49 ///
50 /// Note that implementors of this trait can simply indicate `type Error =
51 /// io::Error` to use I/O errors as this type.
52 ///
53 /// [`FramedRead`]: crate::codec::FramedRead
54 type Error: From<io::Error>;
55
56 /// Attempts to decode a frame from the provided buffer of bytes.
57 ///
58 /// This method is called by [`FramedRead`] whenever bytes are ready to be
59 /// parsed. The provided buffer of bytes is what's been read so far, and
60 /// this instance of `Decode` can determine whether an entire frame is in
61 /// the buffer and is ready to be returned.
62 ///
63 /// If an entire frame is available, then this instance will remove those
64 /// bytes from the buffer provided and return them as a decoded
65 /// frame. Note that removing bytes from the provided buffer doesn't always
66 /// necessarily copy the bytes, so this should be an efficient operation in
67 /// most circumstances.
68 ///
69 /// If the bytes look valid, but a frame isn't fully available yet, then
70 /// `Ok(None)` is returned. This indicates to the [`Framed`] instance that
71 /// it needs to read some more bytes before calling this method again.
72 ///
73 /// Note that the bytes provided may be empty. If a previous call to
74 /// `decode` consumed all the bytes in the buffer then `decode` will be
75 /// called again until it returns `Ok(None)`, indicating that more bytes need to
76 /// be read.
77 ///
78 /// Finally, if the bytes in the buffer are malformed then an error is
79 /// returned indicating why. This informs [`Framed`] that the stream is now
80 /// corrupt and should be terminated.
81 ///
82 /// [`Framed`]: crate::codec::Framed
83 /// [`FramedRead`]: crate::codec::FramedRead
84 ///
85 /// # Buffer management
86 ///
87 /// Before returning from the function, implementations should ensure that
88 /// the buffer has appropriate capacity in anticipation of future calls to
89 /// `decode`. Failing to do so leads to inefficiency.
90 ///
91 /// For example, if frames have a fixed length, or if the length of the
92 /// current frame is known from a header, a possible buffer management
93 /// strategy is:
94 ///
95 /// ```no_run
96 /// # use std::io;
97 /// #
98 /// # use bytes::BytesMut;
99 /// # use tokio_util::codec::Decoder;
100 /// #
101 /// # struct MyCodec;
102 /// #
103 /// impl Decoder for MyCodec {
104 /// // ...
105 /// # type Item = BytesMut;
106 /// # type Error = io::Error;
107 ///
108 /// fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
109 /// // ...
110 ///
111 /// // Reserve enough to complete decoding of the current frame.
112 /// let current_frame_len: usize = 1000; // Example.
113 /// // And to start decoding the next frame.
114 /// let next_frame_header_len: usize = 10; // Example.
115 /// src.reserve(current_frame_len + next_frame_header_len);
116 ///
117 /// return Ok(None);
118 /// }
119 /// }
120 /// ```
121 ///
122 /// An optimal buffer management strategy minimizes reallocations and
123 /// over-allocations.
124 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
125
126 /// A default method available to be called when there are no more bytes
127 /// available to be read from the underlying I/O.
128 ///
129 /// This method defaults to calling `decode` and returns an error if
130 /// `Ok(None)` is returned while there is unconsumed data in `buf`.
131 /// Typically this doesn't need to be implemented unless the framing
132 /// protocol differs near the end of the stream, or if you need to construct
133 /// frames _across_ eof boundaries on sources that can be resumed.
134 ///
135 /// Note that the `buf` argument may be empty. If a previous call to
136 /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
137 /// called again until it returns `None`, indicating that there are no more
138 /// frames to yield. This behavior enables returning finalization frames
139 /// that may not be based on inbound data.
140 ///
141 /// Once `None` has been returned, `decode_eof` won't be called again until
142 /// an attempt to resume the stream has been made, where the underlying stream
143 /// actually returned more data.
144 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
145 match self.decode(buf)? {
146 Some(frame) => Ok(Some(frame)),
147 None => {
148 if buf.is_empty() {
149 Ok(None)
150 } else {
151 Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream").into())
152 }
153 }
154 }
155 }
156
157 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
158 /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
159 ///
160 /// Raw I/O objects work with byte sequences, but higher-level code usually
161 /// wants to batch these into meaningful chunks, called "frames". This
162 /// method layers framing on top of an I/O object, by using the `Codec`
163 /// traits to handle encoding and decoding of messages frames. Note that
164 /// the incoming and outgoing frame types may be distinct.
165 ///
166 /// This function returns a *single* object that is both `Stream` and
167 /// `Sink`; grouping this into a single object is often useful for layering
168 /// things like gzip or TLS, which require both read and write access to the
169 /// underlying object.
170 ///
171 /// If you want to work more directly with the streams and sink, consider
172 /// calling `split` on the [`Framed`] returned by this method, which will
173 /// break them into separate objects, allowing them to interact more easily.
174 ///
175 /// [`Stream`]: futures_core::Stream
176 /// [`Sink`]: futures_sink::Sink
177 /// [`Framed`]: crate::codec::Framed
178 fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self>
179 where
180 Self: Sized,
181 {
182 Framed::new(io, self)
183 }
184}
185