1use crate::codec::encoder::Encoder;
2use crate::codec::framed_impl::{FramedImpl, WriteFrame};
3
4use futures_core::Stream;
5use tokio::io::AsyncWrite;
6
7use bytes::BytesMut;
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10use std::fmt;
11use std::io;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
pin_project! {
    /// A [`Sink`] of frames encoded to an `AsyncWrite`.
    ///
    /// `T` is the type of the wrapped I/O object and `E` is the type of the
    /// encoder used to turn frames into bytes.
    ///
    /// [`Sink`]: futures_sink::Sink
    pub struct FramedWrite<T, E> {
        // The I/O object, the encoder, and the write-side state are all stored
        // in the shared `FramedImpl`. The field is marked `#[pin]` so that the
        // wrapped I/O object can be reached through pin projections.
        #[pin]
        inner: FramedImpl<T, E, WriteFrame>,
    }
}
24
25impl<T, E> FramedWrite<T, E>
26where
27 T: AsyncWrite,
28{
29 /// Creates a new `FramedWrite` with the given `encoder`.
30 pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
31 FramedWrite {
32 inner: FramedImpl {
33 inner,
34 codec: encoder,
35 state: WriteFrame::default(),
36 },
37 }
38 }
39}
40
41impl<T, E> FramedWrite<T, E> {
42 /// Returns a reference to the underlying I/O stream wrapped by
43 /// `FramedWrite`.
44 ///
45 /// Note that care should be taken to not tamper with the underlying stream
46 /// of data coming in as it may corrupt the stream of frames otherwise
47 /// being worked with.
48 pub fn get_ref(&self) -> &T {
49 &self.inner.inner
50 }
51
52 /// Returns a mutable reference to the underlying I/O stream wrapped by
53 /// `FramedWrite`.
54 ///
55 /// Note that care should be taken to not tamper with the underlying stream
56 /// of data coming in as it may corrupt the stream of frames otherwise
57 /// being worked with.
58 pub fn get_mut(&mut self) -> &mut T {
59 &mut self.inner.inner
60 }
61
62 /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
63 /// `FramedWrite`.
64 ///
65 /// Note that care should be taken to not tamper with the underlying stream
66 /// of data coming in as it may corrupt the stream of frames otherwise
67 /// being worked with.
68 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
69 self.project().inner.project().inner
70 }
71
72 /// Consumes the `FramedWrite`, returning its underlying I/O stream.
73 ///
74 /// Note that care should be taken to not tamper with the underlying stream
75 /// of data coming in as it may corrupt the stream of frames otherwise
76 /// being worked with.
77 pub fn into_inner(self) -> T {
78 self.inner.inner
79 }
80
81 /// Returns a reference to the underlying encoder.
82 pub fn encoder(&self) -> &E {
83 &self.inner.codec
84 }
85
86 /// Returns a mutable reference to the underlying encoder.
87 pub fn encoder_mut(&mut self) -> &mut E {
88 &mut self.inner.codec
89 }
90
91 /// Maps the encoder `E` to `C`, preserving the write buffer
92 /// wrapped by `Framed`.
93 pub fn map_encoder<C, F>(self, map: F) -> FramedWrite<T, C>
94 where
95 F: FnOnce(E) -> C,
96 {
97 // This could be potentially simplified once rust-lang/rust#86555 hits stable
98 let FramedImpl {
99 inner,
100 state,
101 codec,
102 } = self.inner;
103 FramedWrite {
104 inner: FramedImpl {
105 inner,
106 state,
107 codec: map(codec),
108 },
109 }
110 }
111
112 /// Returns a mutable reference to the underlying encoder.
113 pub fn encoder_pin_mut(self: Pin<&mut Self>) -> &mut E {
114 self.project().inner.project().codec
115 }
116
117 /// Returns a reference to the write buffer.
118 pub fn write_buffer(&self) -> &BytesMut {
119 &self.inner.state.buffer
120 }
121
122 /// Returns a mutable reference to the write buffer.
123 pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
124 &mut self.inner.state.buffer
125 }
126}
127
128// This impl just defers to the underlying FramedImpl
129impl<T, I, E> Sink<I> for FramedWrite<T, E>
130where
131 T: AsyncWrite,
132 E: Encoder<I>,
133 E::Error: From<io::Error>,
134{
135 type Error = E::Error;
136
137 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 self.project().inner.poll_ready(cx)
139 }
140
141 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
142 self.project().inner.start_send(item)
143 }
144
145 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
146 self.project().inner.poll_flush(cx)
147 }
148
149 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150 self.project().inner.poll_close(cx)
151 }
152}
153
154// This impl just defers to the underlying T: Stream
155impl<T, D> Stream for FramedWrite<T, D>
156where
157 T: Stream,
158{
159 type Item = T::Item;
160
161 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
162 self.project().inner.project().inner.poll_next(cx)
163 }
164}
165
166impl<T, U> fmt::Debug for FramedWrite<T, U>
167where
168 T: fmt::Debug,
169 U: fmt::Debug,
170{
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 f&mut DebugStruct<'_, '_>.debug_struct("FramedWrite")
173 .field("inner", &self.get_ref())
174 .field("encoder", &self.encoder())
175 .field(name:"buffer", &self.inner.state.buffer)
176 .finish()
177 }
178}
179