1use std::collections::VecDeque;
2use std::io::IoSlice;
3
4use super::Stream;
5use crate::utils::RawFdContainer;
6
7#[derive(Debug)]
8pub(super) struct WriteBuffer {
9 data_buf: VecDeque<u8>,
10 fd_buf: Vec<RawFdContainer>,
11}
12
13impl WriteBuffer {
14 pub(super) fn new() -> Self {
15 // Buffer size chosen by checking what libxcb does
16 Self::with_capacity(16384)
17 }
18
19 fn with_capacity(capacity: usize) -> Self {
20 Self {
21 data_buf: VecDeque::with_capacity(capacity),
22 fd_buf: Vec::new(),
23 }
24 }
25
26 fn flush_buffer(&mut self, stream: &impl Stream) -> std::io::Result<()> {
27 while self.needs_flush() {
28 crate::trace!(
29 "Trying to flush {} bytes of data and {} FDs",
30 self.data_buf.len(),
31 self.fd_buf.len()
32 );
33 let (data_buf_1, data_buf_2) = self.data_buf.as_slices();
34 let data_bufs = [IoSlice::new(data_buf_1), IoSlice::new(data_buf_2)];
35 match stream.write_vectored(&data_bufs, &mut self.fd_buf) {
36 Ok(0) => {
37 if self.data_buf.is_empty() {
38 assert!(!self.fd_buf.is_empty());
39 return Err(std::io::Error::new(
40 std::io::ErrorKind::WriteZero,
41 "failed to write the buffered FDs",
42 ));
43 } else {
44 return Err(std::io::Error::new(
45 std::io::ErrorKind::WriteZero,
46 "failed to write the buffered data",
47 ));
48 }
49 }
50 Ok(n) => {
51 crate::trace!("Flushing wrote {} bytes of data", n);
52 let _ = self.data_buf.drain(..n);
53 }
54 Err(e) => return Err(e),
55 }
56 }
57 Ok(())
58 }
59
60 fn write_helper<W: Stream, F, G>(
61 &mut self,
62 stream: &W,
63 fds: &mut Vec<RawFdContainer>,
64 write_buffer: F,
65 write_inner: G,
66 first_buffer: &[u8],
67 to_write_length: usize,
68 ) -> std::io::Result<usize>
69 where
70 F: FnOnce(&mut VecDeque<u8>),
71 G: FnOnce(&W, &mut Vec<RawFdContainer>) -> std::io::Result<usize>,
72 {
73 crate::trace!(
74 "Writing {} FDs and {} bytes of data",
75 fds.len(),
76 to_write_length
77 );
78 self.fd_buf.append(fds);
79
80 // Is there enough buffer space left for this write?
81 if (self.data_buf.capacity() - self.data_buf.len()) < to_write_length {
82 // Not enough space, try to flush
83 match self.flush_buffer(stream) {
84 Ok(_) => {}
85 Err(e) => {
86 if e.kind() == std::io::ErrorKind::WouldBlock {
87 let available_buf = self.data_buf.capacity() - self.data_buf.len();
88 if available_buf == 0 {
89 // Buffer filled and cannot flush anything without
90 // blocking, so return `WouldBlock`.
91 crate::trace!("Writing failed due to full buffer: {:?}", e);
92 return Err(e);
93 } else {
94 let n_to_write = first_buffer.len().min(available_buf);
95 self.data_buf.extend(&first_buffer[..n_to_write]);
96 // Return `Ok` because some or all data has been buffered,
97 // so from the outside it is seen as a successful write.
98 crate::trace!("Writing appended {} bytes to the buffer", n_to_write);
99 return Ok(n_to_write);
100 }
101 } else {
102 return Err(e);
103 }
104 }
105 }
106 }
107
108 if to_write_length >= self.data_buf.capacity() {
109 // Write is larger than the buffer capacity, thus we just flushed the buffer. This
110 // means that at this point the buffer is empty. Write directly to self.inner. No data
111 // is copied into the buffer, since that would just mean that the large write gets
112 // split into multiple smaller ones.
113 assert!(self.data_buf.is_empty());
114 crate::trace!("Large write is written directly to the stream");
115 write_inner(stream, &mut self.fd_buf)
116 } else {
117 // At this point there is enough space available in the buffer.
118 crate::trace!("Data to write is appended to the buffer");
119 write_buffer(&mut self.data_buf);
120 Ok(to_write_length)
121 }
122 }
123
124 pub(super) fn write(
125 &mut self,
126 stream: &impl Stream,
127 buf: &[u8],
128 fds: &mut Vec<RawFdContainer>,
129 ) -> std::io::Result<usize> {
130 self.write_helper(
131 stream,
132 fds,
133 |w| w.extend(buf),
134 |w, fd| w.write(buf, fd),
135 buf,
136 buf.len(),
137 )
138 }
139
140 pub(super) fn write_vectored(
141 &mut self,
142 stream: &impl Stream,
143 bufs: &[IoSlice<'_>],
144 fds: &mut Vec<RawFdContainer>,
145 ) -> std::io::Result<usize> {
146 let first_nonempty = bufs
147 .iter()
148 .find(|b| !b.is_empty())
149 .map_or(&[][..], |b| &**b);
150 let total_len = bufs.iter().map(|b| b.len()).sum();
151 self.write_helper(
152 stream,
153 fds,
154 |w| {
155 for buf in bufs.iter() {
156 w.extend(&**buf);
157 }
158 },
159 |w, fd| w.write_vectored(bufs, fd),
160 first_nonempty,
161 total_len,
162 )
163 }
164
165 /// Returns `true` if there is buffered data or FDs.
166 pub(super) fn needs_flush(&self) -> bool {
167 !self.data_buf.is_empty() || !self.fd_buf.is_empty()
168 }
169
170 pub(super) fn flush(&mut self, stream: &impl Stream) -> std::io::Result<()> {
171 self.flush_buffer(stream)
172 }
173}
174
#[cfg(test)]
mod test {
    use std::io::{Error, ErrorKind, IoSlice, Result};

    use super::super::{PollMode, Stream};
    use super::WriteBuffer;
    use crate::utils::RawFdContainer;

    /// Stream stub whose `write` always fails with `WouldBlock`. Polling and
    /// reading are not needed by these tests and are left unimplemented.
    struct WouldBlockWriter;

    impl Stream for WouldBlockWriter {
        fn poll(&self, _mode: PollMode) -> Result<()> {
            unimplemented!()
        }

        fn read(&self, _buf: &mut [u8], _fd_storage: &mut Vec<RawFdContainer>) -> Result<usize> {
            unimplemented!()
        }

        fn write(&self, _buf: &[u8], _fds: &mut Vec<RawFdContainer>) -> Result<usize> {
            Err(Error::new(ErrorKind::WouldBlock, "would block"))
        }
    }

    // Regression test: a vectored write without any slices used to panic
    // because the code indexed `bufs[0]`.
    #[test]
    fn empty_write() {
        let mut write_buffer = WriteBuffer::new();
        let mut no_fds = Vec::new();
        let outcome = write_buffer.write_vectored(&WouldBlockWriter, &[], &mut no_fds);
        let _ = outcome.unwrap();
    }

    // Regression test: BufWriteFD used to fall back to writing only the first
    // buffer. With an empty first buffer that produced `Ok(0)`, which could be
    // mistaken for EOF.
    #[test]
    fn incorrect_eof() {
        let stream = WouldBlockWriter;
        let mut write_buffer = WriteBuffer::with_capacity(1);
        let bufs = [IoSlice::new(&[]), IoSlice::new(b"fooo")];
        let outcome = write_buffer.write_vectored(&stream, &bufs, &mut Vec::new());
        match outcome {
            Ok(0) => panic!("This looks like EOF!?"),
            Ok(_) => {}
            Err(e) => assert!(
                e.kind() == ErrorKind::WouldBlock,
                "Unexpected error: {:?}",
                e
            ),
        }
    }
}
225