1use crate::codec::framed_impl::{FramedImpl, ReadFrame};
2use crate::codec::Decoder;
3
4use futures_core::Stream;
5use tokio::io::AsyncRead;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
pin_project! {
    /// A [`Stream`] of messages decoded from an [`AsyncRead`].
    ///
    /// `T` is the wrapped I/O object and `D` is the decoder used to turn the
    /// data read from it into frames. When `T: AsyncRead` and `D: Decoder`,
    /// the stream yields `Result<D::Item, D::Error>` items.
    ///
    /// [`Stream`]: futures_core::Stream
    /// [`AsyncRead`]: tokio::io::AsyncRead
    pub struct FramedRead<T, D> {
        // Framing implementation specialised to the `ReadFrame` state.
        // Structurally pinned so that `T` can be reached as `Pin<&mut T>`.
        #[pin]
        inner: FramedImpl<T, D, ReadFrame>,
    }
}
24
25// ===== impl FramedRead =====
26
27impl<T, D> FramedRead<T, D>
28where
29 T: AsyncRead,
30 D: Decoder,
31{
32 /// Creates a new `FramedRead` with the given `decoder`.
33 pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
34 FramedRead {
35 inner: FramedImpl {
36 inner,
37 codec: decoder,
38 state: Default::default(),
39 },
40 }
41 }
42
43 /// Creates a new `FramedRead` with the given `decoder` and a buffer of `capacity`
44 /// initial size.
45 pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
46 FramedRead {
47 inner: FramedImpl {
48 inner,
49 codec: decoder,
50 state: ReadFrame {
51 eof: false,
52 is_readable: false,
53 buffer: BytesMut::with_capacity(capacity),
54 has_errored: false,
55 },
56 },
57 }
58 }
59}
60
61impl<T, D> FramedRead<T, D> {
62 /// Returns a reference to the underlying I/O stream wrapped by
63 /// `FramedRead`.
64 ///
65 /// Note that care should be taken to not tamper with the underlying stream
66 /// of data coming in as it may corrupt the stream of frames otherwise
67 /// being worked with.
68 pub fn get_ref(&self) -> &T {
69 &self.inner.inner
70 }
71
72 /// Returns a mutable reference to the underlying I/O stream wrapped by
73 /// `FramedRead`.
74 ///
75 /// Note that care should be taken to not tamper with the underlying stream
76 /// of data coming in as it may corrupt the stream of frames otherwise
77 /// being worked with.
78 pub fn get_mut(&mut self) -> &mut T {
79 &mut self.inner.inner
80 }
81
82 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
83 /// `FramedRead`.
84 ///
85 /// Note that care should be taken to not tamper with the underlying stream
86 /// of data coming in as it may corrupt the stream of frames otherwise
87 /// being worked with.
88 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
89 self.project().inner.project().inner
90 }
91
92 /// Consumes the `FramedRead`, returning its underlying I/O stream.
93 ///
94 /// Note that care should be taken to not tamper with the underlying stream
95 /// of data coming in as it may corrupt the stream of frames otherwise
96 /// being worked with.
97 pub fn into_inner(self) -> T {
98 self.inner.inner
99 }
100
101 /// Returns a reference to the underlying decoder.
102 pub fn decoder(&self) -> &D {
103 &self.inner.codec
104 }
105
106 /// Returns a mutable reference to the underlying decoder.
107 pub fn decoder_mut(&mut self) -> &mut D {
108 &mut self.inner.codec
109 }
110
111 /// Maps the decoder `D` to `C`, preserving the read buffer
112 /// wrapped by `Framed`.
113 pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C>
114 where
115 F: FnOnce(D) -> C,
116 {
117 // This could be potentially simplified once rust-lang/rust#86555 hits stable
118 let FramedImpl {
119 inner,
120 state,
121 codec,
122 } = self.inner;
123 FramedRead {
124 inner: FramedImpl {
125 inner,
126 state,
127 codec: map(codec),
128 },
129 }
130 }
131
132 /// Returns a mutable reference to the underlying decoder.
133 pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
134 self.project().inner.project().codec
135 }
136
137 /// Returns a reference to the read buffer.
138 pub fn read_buffer(&self) -> &BytesMut {
139 &self.inner.state.buffer
140 }
141
142 /// Returns a mutable reference to the read buffer.
143 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
144 &mut self.inner.state.buffer
145 }
146}
147
148// This impl just defers to the underlying FramedImpl
149impl<T, D> Stream for FramedRead<T, D>
150where
151 T: AsyncRead,
152 D: Decoder,
153{
154 type Item = Result<D::Item, D::Error>;
155
156 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157 self.project().inner.poll_next(cx)
158 }
159}
160
161// This impl just defers to the underlying T: Sink
162impl<T, I, D> Sink<I> for FramedRead<T, D>
163where
164 T: Sink<I>,
165{
166 type Error = T::Error;
167
168 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
169 self.project().inner.project().inner.poll_ready(cx)
170 }
171
172 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
173 self.project().inner.project().inner.start_send(item)
174 }
175
176 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
177 self.project().inner.project().inner.poll_flush(cx)
178 }
179
180 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
181 self.project().inner.project().inner.poll_close(cx)
182 }
183}
184
185impl<T, D> fmt::Debug for FramedRead<T, D>
186where
187 T: fmt::Debug,
188 D: fmt::Debug,
189{
190 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
191 f.debug_struct("FramedRead")
192 .field("inner", &self.get_ref())
193 .field("decoder", &self.decoder())
194 .field("eof", &self.inner.state.eof)
195 .field("is_readable", &self.inner.state.is_readable)
196 .field("buffer", &self.read_buffer())
197 .finish()
198 }
199}
200