1 | use std::collections::VecDeque; |
2 | use std::io::IoSlice; |
3 | |
4 | use super::Stream; |
5 | use crate::utils::RawFdContainer; |
6 | |
7 | #[derive (Debug)] |
8 | pub(super) struct WriteBuffer { |
9 | data_buf: VecDeque<u8>, |
10 | fd_buf: Vec<RawFdContainer>, |
11 | } |
12 | |
13 | impl WriteBuffer { |
14 | pub(super) fn new() -> Self { |
15 | // Buffer size chosen by checking what libxcb does |
16 | Self::with_capacity(16384) |
17 | } |
18 | |
19 | fn with_capacity(capacity: usize) -> Self { |
20 | Self { |
21 | data_buf: VecDeque::with_capacity(capacity), |
22 | fd_buf: Vec::new(), |
23 | } |
24 | } |
25 | |
26 | fn flush_buffer(&mut self, stream: &impl Stream) -> std::io::Result<()> { |
27 | while self.needs_flush() { |
28 | crate::trace!( |
29 | "Trying to flush {} bytes of data and {} FDs" , |
30 | self.data_buf.len(), |
31 | self.fd_buf.len() |
32 | ); |
33 | let (data_buf_1, data_buf_2) = self.data_buf.as_slices(); |
34 | let data_bufs = [IoSlice::new(data_buf_1), IoSlice::new(data_buf_2)]; |
35 | match stream.write_vectored(&data_bufs, &mut self.fd_buf) { |
36 | Ok(0) => { |
37 | if self.data_buf.is_empty() { |
38 | assert!(!self.fd_buf.is_empty()); |
39 | return Err(std::io::Error::new( |
40 | std::io::ErrorKind::WriteZero, |
41 | "failed to write the buffered FDs" , |
42 | )); |
43 | } else { |
44 | return Err(std::io::Error::new( |
45 | std::io::ErrorKind::WriteZero, |
46 | "failed to write the buffered data" , |
47 | )); |
48 | } |
49 | } |
50 | Ok(n) => { |
51 | crate::trace!("Flushing wrote {} bytes of data" , n); |
52 | let _ = self.data_buf.drain(..n); |
53 | } |
54 | Err(e) => return Err(e), |
55 | } |
56 | } |
57 | Ok(()) |
58 | } |
59 | |
60 | fn write_helper<W: Stream, F, G>( |
61 | &mut self, |
62 | stream: &W, |
63 | fds: &mut Vec<RawFdContainer>, |
64 | write_buffer: F, |
65 | write_inner: G, |
66 | first_buffer: &[u8], |
67 | to_write_length: usize, |
68 | ) -> std::io::Result<usize> |
69 | where |
70 | F: FnOnce(&mut VecDeque<u8>), |
71 | G: FnOnce(&W, &mut Vec<RawFdContainer>) -> std::io::Result<usize>, |
72 | { |
73 | crate::trace!( |
74 | "Writing {} FDs and {} bytes of data" , |
75 | fds.len(), |
76 | to_write_length |
77 | ); |
78 | self.fd_buf.append(fds); |
79 | |
80 | // Is there enough buffer space left for this write? |
81 | if (self.data_buf.capacity() - self.data_buf.len()) < to_write_length { |
82 | // Not enough space, try to flush |
83 | match self.flush_buffer(stream) { |
84 | Ok(_) => {} |
85 | Err(e) => { |
86 | if e.kind() == std::io::ErrorKind::WouldBlock { |
87 | let available_buf = self.data_buf.capacity() - self.data_buf.len(); |
88 | if available_buf == 0 { |
89 | // Buffer filled and cannot flush anything without |
90 | // blocking, so return `WouldBlock`. |
91 | crate::trace!("Writing failed due to full buffer: {:?}" , e); |
92 | return Err(e); |
93 | } else { |
94 | let n_to_write = first_buffer.len().min(available_buf); |
95 | self.data_buf.extend(&first_buffer[..n_to_write]); |
96 | // Return `Ok` because some or all data has been buffered, |
97 | // so from the outside it is seen as a successful write. |
98 | crate::trace!("Writing appended {} bytes to the buffer" , n_to_write); |
99 | return Ok(n_to_write); |
100 | } |
101 | } else { |
102 | return Err(e); |
103 | } |
104 | } |
105 | } |
106 | } |
107 | |
108 | if to_write_length >= self.data_buf.capacity() { |
109 | // Write is larger than the buffer capacity, thus we just flushed the buffer. This |
110 | // means that at this point the buffer is empty. Write directly to self.inner. No data |
111 | // is copied into the buffer, since that would just mean that the large write gets |
112 | // split into multiple smaller ones. |
113 | assert!(self.data_buf.is_empty()); |
114 | crate::trace!("Large write is written directly to the stream" ); |
115 | write_inner(stream, &mut self.fd_buf) |
116 | } else { |
117 | // At this point there is enough space available in the buffer. |
118 | crate::trace!("Data to write is appended to the buffer" ); |
119 | write_buffer(&mut self.data_buf); |
120 | Ok(to_write_length) |
121 | } |
122 | } |
123 | |
124 | pub(super) fn write( |
125 | &mut self, |
126 | stream: &impl Stream, |
127 | buf: &[u8], |
128 | fds: &mut Vec<RawFdContainer>, |
129 | ) -> std::io::Result<usize> { |
130 | self.write_helper( |
131 | stream, |
132 | fds, |
133 | |w| w.extend(buf), |
134 | |w, fd| w.write(buf, fd), |
135 | buf, |
136 | buf.len(), |
137 | ) |
138 | } |
139 | |
140 | pub(super) fn write_vectored( |
141 | &mut self, |
142 | stream: &impl Stream, |
143 | bufs: &[IoSlice<'_>], |
144 | fds: &mut Vec<RawFdContainer>, |
145 | ) -> std::io::Result<usize> { |
146 | let first_nonempty = bufs |
147 | .iter() |
148 | .find(|b| !b.is_empty()) |
149 | .map_or(&[][..], |b| &**b); |
150 | let total_len = bufs.iter().map(|b| b.len()).sum(); |
151 | self.write_helper( |
152 | stream, |
153 | fds, |
154 | |w| { |
155 | for buf in bufs.iter() { |
156 | w.extend(&**buf); |
157 | } |
158 | }, |
159 | |w, fd| w.write_vectored(bufs, fd), |
160 | first_nonempty, |
161 | total_len, |
162 | ) |
163 | } |
164 | |
165 | /// Returns `true` if there is buffered data or FDs. |
166 | pub(super) fn needs_flush(&self) -> bool { |
167 | !self.data_buf.is_empty() || !self.fd_buf.is_empty() |
168 | } |
169 | |
170 | pub(super) fn flush(&mut self, stream: &impl Stream) -> std::io::Result<()> { |
171 | self.flush_buffer(stream) |
172 | } |
173 | } |
174 | |
#[cfg(test)]
mod test {
    use std::io::{Error, ErrorKind, IoSlice, Result};

    use super::super::{PollMode, Stream};
    use super::WriteBuffer;
    use crate::utils::RawFdContainer;

    /// A stream whose `write` always fails with `WouldBlock`; polling and reading are
    /// not implemented.
    struct WouldBlockWriter;

    impl Stream for WouldBlockWriter {
        fn poll(&self, _mode: PollMode) -> Result<()> {
            unimplemented!();
        }

        fn read(&self, _buf: &mut [u8], _fd_storage: &mut Vec<RawFdContainer>) -> Result<usize> {
            unimplemented!();
        }

        fn write(&self, _buf: &[u8], _fds: &mut Vec<RawFdContainer>) -> Result<usize> {
            let would_block = Error::new(ErrorKind::WouldBlock, "would block");
            Err(would_block)
        }
    }

    // Regression test: this used to panic because the code indexed `bufs[0]`.
    #[test]
    fn empty_write() {
        let stream = WouldBlockWriter;
        let mut write_buffer = WriteBuffer::new();
        let result = write_buffer.write_vectored(&stream, &[], &mut Vec::new());
        let _ = result.unwrap();
    }

    // Regression test: BufWriteFD used to fall back to writing only the first buffer,
    // which could be mistaken for EOF.
    #[test]
    fn incorrect_eof() {
        let stream = WouldBlockWriter;
        let mut write_buffer = WriteBuffer::with_capacity(1);
        let bufs = [IoSlice::new(&[]), IoSlice::new(b"fooo")];
        let outcome = write_buffer.write_vectored(&stream, &bufs, &mut Vec::new());
        match outcome {
            Ok(0) => panic!("This looks like EOF!?"),
            Ok(_) => {}
            Err(err) => {
                // Only `WouldBlock` is an acceptable failure here.
                if err.kind() != ErrorKind::WouldBlock {
                    panic!("Unexpected error: {:?}", err);
                }
            }
        }
    }
}
225 | |