1use crate::codec::encoder::Encoder;
2use crate::codec::framed_impl::{FramedImpl, WriteFrame};
3
4use futures_core::Stream;
5use tokio::io::AsyncWrite;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::io;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15pin_project! {
16 /// A [`Sink`] of frames encoded to an `AsyncWrite`.
17 ///
18 /// [`Sink`]: futures_sink::Sink
19 pub struct FramedWrite<T, E> {
20 #[pin]
21 inner: FramedImpl<T, E, WriteFrame>,
22 }
23}
24
25impl<T, E> FramedWrite<T, E>
26where
27 T: AsyncWrite,
28{
29 /// Creates a new `FramedWrite` with the given `encoder`.
30 pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
31 FramedWrite {
32 inner: FramedImpl {
33 inner,
34 codec: encoder,
35 state: WriteFrame::default(),
36 },
37 }
38 }
39}
40
41impl<T, E> FramedWrite<T, E> {
42 /// Returns a reference to the underlying I/O stream wrapped by
43 /// `FramedWrite`.
44 ///
45 /// Note that care should be taken to not tamper with the underlying stream
46 /// of data coming in as it may corrupt the stream of frames otherwise
47 /// being worked with.
48 pub fn get_ref(&self) -> &T {
49 &self.inner.inner
50 }
51
52 /// Returns a mutable reference to the underlying I/O stream wrapped by
53 /// `FramedWrite`.
54 ///
55 /// Note that care should be taken to not tamper with the underlying stream
56 /// of data coming in as it may corrupt the stream of frames otherwise
57 /// being worked with.
58 pub fn get_mut(&mut self) -> &mut T {
59 &mut self.inner.inner
60 }
61
62 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
63 /// `FramedWrite`.
64 ///
65 /// Note that care should be taken to not tamper with the underlying stream
66 /// of data coming in as it may corrupt the stream of frames otherwise
67 /// being worked with.
68 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
69 self.project().inner.project().inner
70 }
71
72 /// Consumes the `FramedWrite`, returning its underlying I/O stream.
73 ///
74 /// Note that care should be taken to not tamper with the underlying stream
75 /// of data coming in as it may corrupt the stream of frames otherwise
76 /// being worked with.
77 pub fn into_inner(self) -> T {
78 self.inner.inner
79 }
80
81 /// Returns a reference to the underlying encoder.
82 pub fn encoder(&self) -> &E {
83 &self.inner.codec
84 }
85
86 /// Returns a mutable reference to the underlying encoder.
87 pub fn encoder_mut(&mut self) -> &mut E {
88 &mut self.inner.codec
89 }
90
91 /// Maps the encoder `E` to `C`, preserving the write buffer
92 /// wrapped by `Framed`.
93 pub fn map_encoder<C, F>(self, map: F) -> FramedWrite<T, C>
94 where
95 F: FnOnce(E) -> C,
96 {
97 // This could be potentially simplified once rust-lang/rust#86555 hits stable
98 let FramedImpl {
99 inner,
100 state,
101 codec,
102 } = self.inner;
103 FramedWrite {
104 inner: FramedImpl {
105 inner,
106 state,
107 codec: map(codec),
108 },
109 }
110 }
111
112 /// Returns a mutable reference to the underlying encoder.
113 pub fn encoder_pin_mut(self: Pin<&mut Self>) -> &mut E {
114 self.project().inner.project().codec
115 }
116
117 /// Returns a reference to the write buffer.
118 pub fn write_buffer(&self) -> &BytesMut {
119 &self.inner.state.buffer
120 }
121
122 /// Returns a mutable reference to the write buffer.
123 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
124 &mut self.inner.state.buffer
125 }
126
127 /// Returns backpressure boundary
128 pub fn backpressure_boundary(&self) -> usize {
129 self.inner.state.backpressure_boundary
130 }
131
132 /// Updates backpressure boundary
133 pub fn set_backpressure_boundary(&mut self, boundary: usize) {
134 self.inner.state.backpressure_boundary = boundary;
135 }
136}
137
138// This impl just defers to the underlying FramedImpl
139impl<T, I, E> Sink<I> for FramedWrite<T, E>
140where
141 T: AsyncWrite,
142 E: Encoder<I>,
143 E::Error: From<io::Error>,
144{
145 type Error = E::Error;
146
147 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148 self.project().inner.poll_ready(cx)
149 }
150
151 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
152 self.project().inner.start_send(item)
153 }
154
155 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156 self.project().inner.poll_flush(cx)
157 }
158
159 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
160 self.project().inner.poll_close(cx)
161 }
162}
163
164// This impl just defers to the underlying T: Stream
165impl<T, D> Stream for FramedWrite<T, D>
166where
167 T: Stream,
168{
169 type Item = T::Item;
170
171 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172 self.project().inner.project().inner.poll_next(cx)
173 }
174}
175
176impl<T, U> fmt::Debug for FramedWrite<T, U>
177where
178 T: fmt::Debug,
179 U: fmt::Debug,
180{
181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182 f.debug_struct("FramedWrite")
183 .field("inner", &self.get_ref())
184 .field("encoder", &self.encoder())
185 .field("buffer", &self.inner.state.buffer)
186 .finish()
187 }
188}
189