1 | //! A pure-rust implementation of a connection to an X11 server. |
2 | |
3 | use std::io::IoSlice; |
4 | use std::sync::{Condvar, Mutex, MutexGuard, TryLockError}; |
5 | use std::time::Instant; |
6 | |
7 | use crate::connection::{ |
8 | compute_length_field, Connection, ReplyOrError, RequestConnection, RequestKind, |
9 | }; |
10 | use crate::cookie::{Cookie, CookieWithFds, VoidCookie}; |
11 | use crate::errors::DisplayParsingError; |
12 | pub use crate::errors::{ConnectError, ConnectionError, ParseError, ReplyError, ReplyOrIdError}; |
13 | use crate::extension_manager::ExtensionManager; |
14 | use crate::protocol::bigreq::{ConnectionExt as _, EnableReply}; |
15 | use crate::protocol::xproto::{Setup, GET_INPUT_FOCUS_REQUEST, QUERY_EXTENSION_REQUEST}; |
16 | use crate::utils::RawFdContainer; |
17 | use crate::x11_utils::{ExtensionInformation, TryParse, TryParseFd}; |
18 | use x11rb_protocol::connect::Connect; |
19 | use x11rb_protocol::connection::{Connection as ProtoConnection, PollReply, ReplyFdKind}; |
20 | use x11rb_protocol::id_allocator::IdAllocator; |
21 | use x11rb_protocol::{xauth::get_auth, DiscardMode, RawEventAndSeqNumber, SequenceNumber}; |
22 | |
23 | mod packet_reader; |
24 | mod stream; |
25 | mod write_buffer; |
26 | |
27 | use packet_reader::PacketReader; |
28 | pub use stream::{DefaultStream, PollMode, Stream}; |
29 | use write_buffer::WriteBuffer; |
30 | |
31 | type Buffer = <RustConnection as RequestConnection>::Buf; |
32 | /// A combination of a buffer and a list of file descriptors for use by [`RustConnection`]. |
33 | pub type BufWithFds = crate::connection::BufWithFds<Buffer>; |
34 | |
35 | #[derive (Debug)] |
36 | enum MaxRequestBytes { |
37 | Unknown, |
38 | Requested(Option<SequenceNumber>), |
39 | Known(usize), |
40 | } |
41 | |
42 | #[derive (Debug)] |
43 | struct ConnectionInner { |
44 | inner: ProtoConnection, |
45 | write_buffer: WriteBuffer, |
46 | } |
47 | |
48 | type MutexGuardInner<'a> = MutexGuard<'a, ConnectionInner>; |
49 | |
50 | #[derive (Debug, Copy, Clone, PartialEq, Eq)] |
51 | pub(crate) enum BlockingMode { |
52 | Blocking, |
53 | NonBlocking, |
54 | } |
55 | |
56 | /// A connection to an X11 server implemented in pure rust |
57 | /// |
58 | /// This type is generic over `S`, which allows to use a generic stream to communicate with the |
59 | /// server. This stream can written to and read from, but it can also be polled, meaning that one |
60 | /// checks if new data can be read or written. |
61 | /// |
62 | /// `RustConnection` always used an internal buffer for reading, so `R` does not need |
63 | /// to be buffered. |
64 | #[derive (Debug)] |
65 | pub struct RustConnection<S: Stream = DefaultStream> { |
66 | inner: Mutex<ConnectionInner>, |
67 | stream: S, |
68 | // This mutex is only locked with `try_lock` (never blocks), so a simpler |
69 | // lock based only on a atomic variable would be more efficient. |
70 | packet_reader: Mutex<PacketReader>, |
71 | reader_condition: Condvar, |
72 | setup: Setup, |
73 | extension_manager: Mutex<ExtensionManager>, |
74 | maximum_request_bytes: Mutex<MaxRequestBytes>, |
75 | id_allocator: Mutex<IdAllocator>, |
76 | } |
77 | |
78 | // Locking rules |
79 | // ============= |
80 | // |
81 | // To avoid deadlocks, it is important to have a defined ordering about mutexes: |
82 | // |
83 | // Mutexes that may be locked when no other mutex is held: |
84 | // - maximum_request_bytes |
85 | // - extension_manager |
86 | // - id_allocator |
87 | // |
88 | // Then comes `inner`. This mutex protects the information about in-flight requests and packets |
89 | // that were already read from the connection but not given out to callers. This mutex also |
90 | // contains the write buffer and has to be locked in order to write something to the X11 server. |
91 | // In this case, the mutex has to be kept locked until writing the request has finished. This is |
92 | // necessary to ensure correct sync insertion without threads interfering with each other. When |
93 | // this mutex is locked for operations other than writing, the lock should be kept only for a |
94 | // short time. |
95 | // |
96 | // The inner level is `packet_reader`. This mutex is only locked when `inner` is already held and |
97 | // only with `try_lock()`. This ensures that there is only one reader. While actually reading, the |
98 | // lock on `inner` is released so that other threads can make progress. If more threads want to |
99 | // read while `read` is already locked, they sleep on `reader_condition`. The actual reader will |
100 | // then notify this condition variable once it is done reading. |
101 | // |
102 | // n.b. notgull: write_buffer follows the same rules |
103 | // |
104 | // The condition variable is necessary since one thread may read packets that another thread waits |
105 | // for. Thus, after reading something from the connection, all threads that wait for something have |
106 | // to check if they are the intended recipient. |
107 | |
108 | impl RustConnection<DefaultStream> { |
109 | /// Establish a new connection. |
110 | /// |
111 | /// If no `dpy_name` is provided, the value from `$DISPLAY` is used. |
112 | pub fn connect(dpy_name: Option<&str>) -> Result<(Self, usize), ConnectError> { |
113 | // Parse display information |
114 | let parsed_display = x11rb_protocol::parse_display::parse_display(dpy_name)?; |
115 | let screen = parsed_display.screen.into(); |
116 | |
117 | // Establish connection by iterating over ConnectAddresses until we find one that |
118 | // works. |
119 | let mut error = None; |
120 | for addr in parsed_display.connect_instruction() { |
121 | let start = Instant::now(); |
122 | match DefaultStream::connect(&addr) { |
123 | Ok((stream, (family, address))) => { |
124 | crate::trace!( |
125 | "Connected to X11 server via {:?} in {:?}" , |
126 | addr, |
127 | start.elapsed() |
128 | ); |
129 | |
130 | // we found a stream, get auth information |
131 | let (auth_name, auth_data) = get_auth(family, &address, parsed_display.display) |
132 | // Ignore all errors while determining auth; instead we just try without auth info. |
133 | .unwrap_or(None) |
134 | .unwrap_or_else(|| (Vec::new(), Vec::new())); |
135 | crate::trace!("Picked authentication via auth mechanism {:?}" , auth_name); |
136 | |
137 | // finish connecting to server |
138 | return Ok(( |
139 | Self::connect_to_stream_with_auth_info( |
140 | stream, screen, auth_name, auth_data, |
141 | )?, |
142 | screen, |
143 | )); |
144 | } |
145 | Err(e) => { |
146 | crate::debug!("Failed to connect to X11 server via {:?}: {:?}" , addr, e); |
147 | error = Some(e); |
148 | continue; |
149 | } |
150 | } |
151 | } |
152 | |
153 | // none of the addresses worked |
154 | Err(match error { |
155 | Some(e) => ConnectError::IoError(e), |
156 | None => DisplayParsingError::Unknown.into(), |
157 | }) |
158 | } |
159 | } |
160 | |
161 | impl<S: Stream> RustConnection<S> { |
162 | /// Establish a new connection to the given streams. |
163 | /// |
164 | /// `read` is used for reading data from the X11 server and `write` is used for writing. |
165 | /// `screen` is the number of the screen that should be used. This function checks that a |
166 | /// screen with that number exists. |
167 | pub fn connect_to_stream(stream: S, screen: usize) -> Result<Self, ConnectError> { |
168 | Self::connect_to_stream_with_auth_info(stream, screen, Vec::new(), Vec::new()) |
169 | } |
170 | |
171 | /// Establish a new connection to the given streams. |
172 | /// |
173 | /// `read` is used for reading data from the X11 server and `write` is used for writing. |
174 | /// `screen` is the number of the screen that should be used. This function checks that a |
175 | /// screen with that number exists. |
176 | /// |
177 | /// The parameters `auth_name` and `auth_data` are used for the members |
178 | /// `authorization_protocol_name` and `authorization_protocol_data` of the `SetupRequest` that |
179 | /// is sent to the X11 server. |
180 | pub fn connect_to_stream_with_auth_info( |
181 | stream: S, |
182 | screen: usize, |
183 | auth_name: Vec<u8>, |
184 | auth_data: Vec<u8>, |
185 | ) -> Result<Self, ConnectError> { |
186 | let (mut connect, setup_request) = Connect::with_authorization(auth_name, auth_data); |
187 | |
188 | // write the connect() setup request |
189 | let mut nwritten = 0; |
190 | let mut fds = vec![]; |
191 | |
192 | crate::trace!( |
193 | "Writing connection setup with {} bytes" , |
194 | setup_request.len() |
195 | ); |
196 | while nwritten != setup_request.len() { |
197 | stream.poll(PollMode::Writable)?; |
198 | // poll returned successfully, so the stream is writable. |
199 | match stream.write(&setup_request[nwritten..], &mut fds) { |
200 | Ok(0) => { |
201 | return Err(std::io::Error::new( |
202 | std::io::ErrorKind::WriteZero, |
203 | "failed to write whole buffer" , |
204 | ) |
205 | .into()) |
206 | } |
207 | Ok(n) => nwritten += n, |
208 | // Spurious wakeup from poll, try again |
209 | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {} |
210 | Err(e) => return Err(e.into()), |
211 | } |
212 | } |
213 | |
214 | // read in the setup |
215 | loop { |
216 | stream.poll(PollMode::Readable)?; |
217 | crate::trace!( |
218 | "Reading connection setup with at least {} bytes remaining" , |
219 | connect.buffer().len() |
220 | ); |
221 | let adv = match stream.read(connect.buffer(), &mut fds) { |
222 | Ok(0) => { |
223 | return Err(std::io::Error::new( |
224 | std::io::ErrorKind::UnexpectedEof, |
225 | "failed to read whole buffer" , |
226 | ) |
227 | .into()) |
228 | } |
229 | Ok(n) => n, |
230 | // Spurious wakeup from poll, try again |
231 | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, |
232 | Err(e) => return Err(e.into()), |
233 | }; |
234 | crate::trace!("Read {} bytes" , adv); |
235 | |
236 | // advance the internal buffer |
237 | if connect.advance(adv) { |
238 | break; |
239 | } |
240 | } |
241 | |
242 | // resolve the setup |
243 | let setup = connect.into_setup()?; |
244 | |
245 | // Check that we got a valid screen number |
246 | if screen >= setup.roots.len() { |
247 | return Err(ConnectError::InvalidScreen); |
248 | } |
249 | |
250 | // Success! Set up our state |
251 | Self::for_connected_stream(stream, setup) |
252 | } |
253 | |
254 | /// Establish a new connection for an already connected stream. |
255 | /// |
256 | /// The given `stream` is used for communicating with the X11 server. |
257 | /// It is assumed that `setup` was just received from the server. Thus, the first reply to a |
258 | /// request that is sent will have sequence number one. |
259 | pub fn for_connected_stream(stream: S, setup: Setup) -> Result<Self, ConnectError> { |
260 | let id_allocator = IdAllocator::new(setup.resource_id_base, setup.resource_id_mask)?; |
261 | |
262 | Ok(RustConnection { |
263 | inner: Mutex::new(ConnectionInner { |
264 | inner: ProtoConnection::new(), |
265 | write_buffer: WriteBuffer::new(), |
266 | }), |
267 | stream, |
268 | packet_reader: Mutex::new(PacketReader::new()), |
269 | reader_condition: Condvar::new(), |
270 | setup, |
271 | extension_manager: Default::default(), |
272 | maximum_request_bytes: Mutex::new(MaxRequestBytes::Unknown), |
273 | id_allocator: Mutex::new(id_allocator), |
274 | }) |
275 | } |
276 | |
277 | /// Internal function for actually sending a request. |
278 | /// |
279 | /// This function "does the actual work" for `send_request_with_reply()` and |
280 | /// `send_request_without_reply()`. |
281 | fn send_request( |
282 | &self, |
283 | bufs: &[IoSlice<'_>], |
284 | fds: Vec<RawFdContainer>, |
285 | kind: ReplyFdKind, |
286 | ) -> Result<SequenceNumber, ConnectionError> { |
287 | let _guard = crate::debug_span!("send_request" ).entered(); |
288 | |
289 | let request_info = RequestInfo { |
290 | extension_manager: &self.extension_manager, |
291 | major_opcode: bufs[0][0], |
292 | minor_opcode: bufs[0][1], |
293 | }; |
294 | crate::debug!("Sending {}" , request_info); |
295 | |
296 | let mut storage = Default::default(); |
297 | let bufs = compute_length_field(self, bufs, &mut storage)?; |
298 | |
299 | // Note: `inner` must be kept blocked until the request has been completely written |
300 | // or buffered to avoid sending the data of different requests interleaved. For this |
301 | // reason, `read_packet_and_enqueue` must always be called with `BlockingMode::NonBlocking` |
302 | // during a write, otherwise `inner` would be temporarily released. |
303 | let mut inner = self.inner.lock().unwrap(); |
304 | |
305 | loop { |
306 | let send_result = inner.inner.send_request(kind); |
307 | match send_result { |
308 | Some(seqno) => { |
309 | // Now actually send the buffers |
310 | let _inner = self.write_all_vectored(inner, bufs, fds)?; |
311 | return Ok(seqno); |
312 | } |
313 | None => { |
314 | crate::trace!("Syncing with the X11 server since there are too many outstanding void requests" ); |
315 | inner = self.send_sync(inner)?; |
316 | } |
317 | } |
318 | } |
319 | } |
320 | |
321 | /// Send a synchronisation packet to the X11 server. |
322 | /// |
323 | /// This function sends a `GetInputFocus` request to the X11 server and arranges for its reply |
324 | /// to be ignored. This ensures that a reply is expected (`ConnectionInner.next_reply_expected` |
325 | /// increases). |
326 | fn send_sync<'a>( |
327 | &'a self, |
328 | mut inner: MutexGuardInner<'a>, |
329 | ) -> Result<MutexGuardInner<'a>, std::io::Error> { |
330 | let length = 1u16.to_ne_bytes(); |
331 | let request = [ |
332 | GET_INPUT_FOCUS_REQUEST, |
333 | 0, /* pad */ |
334 | length[0], |
335 | length[1], |
336 | ]; |
337 | |
338 | let seqno = inner |
339 | .inner |
340 | .send_request(ReplyFdKind::ReplyWithoutFDs) |
341 | .expect("Sending a HasResponse request should not be blocked by syncs" ); |
342 | inner |
343 | .inner |
344 | .discard_reply(seqno, DiscardMode::DiscardReplyAndError); |
345 | let inner = self.write_all_vectored(inner, &[IoSlice::new(&request)], Vec::new())?; |
346 | |
347 | Ok(inner) |
348 | } |
349 | |
350 | /// Write a set of buffers on a `writer`. May also read packets |
351 | /// from the server. |
352 | fn write_all_vectored<'a>( |
353 | &'a self, |
354 | mut inner: MutexGuardInner<'a>, |
355 | mut bufs: &[IoSlice<'_>], |
356 | mut fds: Vec<RawFdContainer>, |
357 | ) -> std::io::Result<MutexGuardInner<'a>> { |
358 | let mut partial_buf: &[u8] = &[]; |
359 | while !partial_buf.is_empty() || !bufs.is_empty() { |
360 | self.stream.poll(PollMode::ReadAndWritable)?; |
361 | let write_result = if !partial_buf.is_empty() { |
362 | // "inner" is held, passed into this function, so this should never be held |
363 | inner |
364 | .write_buffer |
365 | .write(&self.stream, partial_buf, &mut fds) |
366 | } else { |
367 | // same as above |
368 | inner |
369 | .write_buffer |
370 | .write_vectored(&self.stream, bufs, &mut fds) |
371 | }; |
372 | match write_result { |
373 | Ok(0) => { |
374 | return Err(std::io::Error::new( |
375 | std::io::ErrorKind::WriteZero, |
376 | "failed to write anything" , |
377 | )); |
378 | } |
379 | Ok(mut count) => { |
380 | // Successful write |
381 | if count >= partial_buf.len() { |
382 | count -= partial_buf.len(); |
383 | partial_buf = &[]; |
384 | } else { |
385 | partial_buf = &partial_buf[count..]; |
386 | count = 0; |
387 | } |
388 | while count > 0 { |
389 | if count >= bufs[0].len() { |
390 | count -= bufs[0].len(); |
391 | } else { |
392 | partial_buf = &bufs[0][count..]; |
393 | count = 0; |
394 | } |
395 | bufs = &bufs[1..]; |
396 | // Skip empty slices |
397 | while bufs.first().map(|s| s.len()) == Some(0) { |
398 | bufs = &bufs[1..]; |
399 | } |
400 | } |
401 | } |
402 | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { |
403 | crate::trace!("Writing more data would block for now" ); |
404 | // Writing would block, try to read instead because the |
405 | // server might not accept new requests after its |
406 | // buffered replies have been read. |
407 | inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?; |
408 | } |
409 | Err(e) => return Err(e), |
410 | } |
411 | } |
412 | if !fds.is_empty() { |
413 | return Err(std::io::Error::new( |
414 | std::io::ErrorKind::Other, |
415 | "Left over FDs after sending the request" , |
416 | )); |
417 | } |
418 | Ok(inner) |
419 | } |
420 | |
421 | fn flush_impl<'a>( |
422 | &'a self, |
423 | mut inner: MutexGuardInner<'a>, |
424 | ) -> std::io::Result<MutexGuardInner<'a>> { |
425 | // n.b. notgull: inner guard is held |
426 | while inner.write_buffer.needs_flush() { |
427 | self.stream.poll(PollMode::ReadAndWritable)?; |
428 | let flush_result = inner.write_buffer.flush(&self.stream); |
429 | match flush_result { |
430 | // Flush completed |
431 | Ok(()) => break, |
432 | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { |
433 | crate::trace!("Flushing more data would block for now" ); |
434 | // Writing would block, try to read instead because the |
435 | // server might not accept new requests after its |
436 | // buffered replies have been read. |
437 | inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?; |
438 | } |
439 | Err(e) => return Err(e), |
440 | } |
441 | } |
442 | Ok(inner) |
443 | } |
444 | |
445 | /// Read a packet from the connection. |
446 | /// |
447 | /// This function waits for an X11 packet to be received. It drops the mutex protecting the |
448 | /// inner data while waiting for a packet so that other threads can make progress. For this |
449 | /// reason, you need to pass in a `MutexGuard` to be dropped. This function locks the mutex |
450 | /// again and returns a new `MutexGuard`. |
451 | /// |
452 | /// Note: If `mode` is `BlockingMode::Blocking`, the lock on `inner` will be temporarily |
453 | /// released. While sending a request, `inner` must be kept locked to avoid sending the data |
454 | /// of different requests interleaved. So, when `read_packet_and_enqueue` is called as part |
455 | /// of a write, it must always be done with `mode` set to `BlockingMode::NonBlocking`. |
456 | fn read_packet_and_enqueue<'a>( |
457 | &'a self, |
458 | mut inner: MutexGuardInner<'a>, |
459 | mode: BlockingMode, |
460 | ) -> Result<MutexGuardInner<'a>, std::io::Error> { |
461 | // 0.1. Try to lock the `packet_reader` mutex. |
462 | match self.packet_reader.try_lock() { |
463 | Err(TryLockError::WouldBlock) => { |
464 | // In non-blocking mode, we just return immediately |
465 | match mode { |
466 | BlockingMode::NonBlocking => { |
467 | crate::trace!("read_packet_and_enqueue in NonBlocking mode doing nothing since reader is already locked" ); |
468 | return Ok(inner); |
469 | } |
470 | BlockingMode::Blocking => { |
471 | crate::trace!("read_packet_and_enqueue in Blocking mode waiting for pre-existing reader" ); |
472 | } |
473 | } |
474 | |
475 | // 1.1. Someone else is reading (other thread is at 2.2); |
476 | // wait for it. `Condvar::wait` will unlock `inner`, so |
477 | // the other thread can relock `inner` at 2.1.3 (and to allow |
478 | // other threads to arrive 0.1). |
479 | // |
480 | // When `wait` finishes, other thread has enqueued a packet, |
481 | // so the purpose of this function has been fulfilled. `wait` |
482 | // will relock `inner` when it returns. |
483 | Ok(self.reader_condition.wait(inner).unwrap()) |
484 | } |
485 | Err(TryLockError::Poisoned(e)) => panic!("{}" , e), |
486 | Ok(mut packet_reader) => { |
487 | // Make sure sleeping readers are woken up when we return |
488 | // (Even in case of errors) |
489 | let notify_on_drop = NotifyOnDrop(&self.reader_condition); |
490 | |
491 | // 2.1. Poll for read if mode is blocking. |
492 | if mode == BlockingMode::Blocking { |
493 | // 2.1.1. Unlock `inner`, so other threads can use it while |
494 | // during the poll. |
495 | drop(inner); |
496 | // 2.1.2. Do the actual poll |
497 | self.stream.poll(PollMode::Readable)?; |
498 | // 2.1.3. Relock inner |
499 | inner = self.inner.lock().unwrap(); |
500 | } |
501 | |
502 | // 2.2. Try to read as many packets as possible without blocking. |
503 | let mut fds = Vec::new(); |
504 | let mut packets = Vec::new(); |
505 | packet_reader.try_read_packets(&self.stream, &mut packets, &mut fds)?; |
506 | |
507 | // 2.3. Once `inner` has been relocked, drop the |
508 | // lock on `packet_reader`. While inner is locked, other |
509 | // threads cannot arrive at 0.1 anyways. |
510 | // |
511 | // `packet_reader` must be unlocked with `inner` is locked, |
512 | // otherwise it could let another thread wait on 2.1 |
513 | // for a reply that has been read but not enqueued yet. |
514 | drop(packet_reader); |
515 | |
516 | // 2.4. Actually enqueue the read packets. |
517 | inner.inner.enqueue_fds(fds); |
518 | packets |
519 | .into_iter() |
520 | .for_each(|packet| inner.inner.enqueue_packet(packet)); |
521 | |
522 | // 2.5. Notify the condvar by dropping the `notify_on_drop` object. |
523 | // The object would have been dropped when the function returns, so |
524 | // the explicit drop is not really needed. The purpose of having a |
525 | // explicit drop is to... make it explicit. |
526 | drop(notify_on_drop); |
527 | |
528 | // 2.6. Return the locked `inner` back to the caller. |
529 | Ok(inner) |
530 | } |
531 | } |
532 | } |
533 | |
534 | fn prefetch_maximum_request_bytes_impl(&self, max_bytes: &mut MutexGuard<'_, MaxRequestBytes>) { |
535 | if let MaxRequestBytes::Unknown = **max_bytes { |
536 | crate::info!("Prefetching maximum request length" ); |
537 | let request = self |
538 | .bigreq_enable() |
539 | .map(|cookie| cookie.into_sequence_number()) |
540 | .ok(); |
541 | **max_bytes = MaxRequestBytes::Requested(request); |
542 | } |
543 | } |
544 | |
545 | /// Returns a reference to the contained stream. |
546 | pub fn stream(&self) -> &S { |
547 | &self.stream |
548 | } |
549 | } |
550 | |
551 | impl<S: Stream> RequestConnection for RustConnection<S> { |
552 | type Buf = Vec<u8>; |
553 | |
554 | fn send_request_with_reply<Reply>( |
555 | &self, |
556 | bufs: &[IoSlice<'_>], |
557 | fds: Vec<RawFdContainer>, |
558 | ) -> Result<Cookie<'_, Self, Reply>, ConnectionError> |
559 | where |
560 | Reply: TryParse, |
561 | { |
562 | Ok(Cookie::new( |
563 | self, |
564 | self.send_request(bufs, fds, ReplyFdKind::ReplyWithoutFDs)?, |
565 | )) |
566 | } |
567 | |
568 | fn send_request_with_reply_with_fds<Reply>( |
569 | &self, |
570 | bufs: &[IoSlice<'_>], |
571 | fds: Vec<RawFdContainer>, |
572 | ) -> Result<CookieWithFds<'_, Self, Reply>, ConnectionError> |
573 | where |
574 | Reply: TryParseFd, |
575 | { |
576 | Ok(CookieWithFds::new( |
577 | self, |
578 | self.send_request(bufs, fds, ReplyFdKind::ReplyWithFDs)?, |
579 | )) |
580 | } |
581 | |
582 | fn send_request_without_reply( |
583 | &self, |
584 | bufs: &[IoSlice<'_>], |
585 | fds: Vec<RawFdContainer>, |
586 | ) -> Result<VoidCookie<'_, Self>, ConnectionError> { |
587 | Ok(VoidCookie::new( |
588 | self, |
589 | self.send_request(bufs, fds, ReplyFdKind::NoReply)?, |
590 | )) |
591 | } |
592 | |
593 | fn discard_reply(&self, sequence: SequenceNumber, _kind: RequestKind, mode: DiscardMode) { |
594 | crate::debug!( |
595 | "Discarding reply to request {} in mode {:?}" , |
596 | sequence, |
597 | mode |
598 | ); |
599 | self.inner |
600 | .lock() |
601 | .unwrap() |
602 | .inner |
603 | .discard_reply(sequence, mode); |
604 | } |
605 | |
606 | fn prefetch_extension_information( |
607 | &self, |
608 | extension_name: &'static str, |
609 | ) -> Result<(), ConnectionError> { |
610 | self.extension_manager |
611 | .lock() |
612 | .unwrap() |
613 | .prefetch_extension_information(self, extension_name) |
614 | } |
615 | |
616 | fn extension_information( |
617 | &self, |
618 | extension_name: &'static str, |
619 | ) -> Result<Option<ExtensionInformation>, ConnectionError> { |
620 | self.extension_manager |
621 | .lock() |
622 | .unwrap() |
623 | .extension_information(self, extension_name) |
624 | } |
625 | |
626 | fn wait_for_reply_or_raw_error( |
627 | &self, |
628 | sequence: SequenceNumber, |
629 | ) -> Result<ReplyOrError<Vec<u8>>, ConnectionError> { |
630 | match self.wait_for_reply_with_fds_raw(sequence)? { |
631 | ReplyOrError::Reply((reply, _fds)) => Ok(ReplyOrError::Reply(reply)), |
632 | ReplyOrError::Error(e) => Ok(ReplyOrError::Error(e)), |
633 | } |
634 | } |
635 | |
636 | fn wait_for_reply(&self, sequence: SequenceNumber) -> Result<Option<Vec<u8>>, ConnectionError> { |
637 | let _guard = crate::debug_span!("wait_for_reply" , sequence).entered(); |
638 | |
639 | let mut inner = self.inner.lock().unwrap(); |
640 | inner = self.flush_impl(inner)?; |
641 | loop { |
642 | crate::trace!({ sequence }, "Polling for reply" ); |
643 | let poll_result = inner.inner.poll_for_reply(sequence); |
644 | match poll_result { |
645 | PollReply::TryAgain => {} |
646 | PollReply::NoReply => return Ok(None), |
647 | PollReply::Reply(buffer) => return Ok(Some(buffer)), |
648 | } |
649 | inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?; |
650 | } |
651 | } |
652 | |
653 | fn check_for_raw_error( |
654 | &self, |
655 | sequence: SequenceNumber, |
656 | ) -> Result<Option<Buffer>, ConnectionError> { |
657 | let _guard = crate::debug_span!("check_for_raw_error" , sequence).entered(); |
658 | |
659 | let mut inner = self.inner.lock().unwrap(); |
660 | if inner.inner.prepare_check_for_reply_or_error(sequence) { |
661 | crate::trace!("Inserting sync with the X11 server" ); |
662 | inner = self.send_sync(inner)?; |
663 | assert!(!inner.inner.prepare_check_for_reply_or_error(sequence)); |
664 | } |
665 | // Ensure the request is sent |
666 | inner = self.flush_impl(inner)?; |
667 | loop { |
668 | crate::trace!({ sequence }, "Polling for reply or error" ); |
669 | let poll_result = inner.inner.poll_check_for_reply_or_error(sequence); |
670 | match poll_result { |
671 | PollReply::TryAgain => {} |
672 | PollReply::NoReply => return Ok(None), |
673 | PollReply::Reply(buffer) => return Ok(Some(buffer)), |
674 | } |
675 | inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?; |
676 | } |
677 | } |
678 | |
679 | fn wait_for_reply_with_fds_raw( |
680 | &self, |
681 | sequence: SequenceNumber, |
682 | ) -> Result<ReplyOrError<BufWithFds, Buffer>, ConnectionError> { |
683 | let _guard = crate::debug_span!("wait_for_reply_with_fds_raw" , sequence).entered(); |
684 | |
685 | let mut inner = self.inner.lock().unwrap(); |
686 | // Ensure the request is sent |
687 | inner = self.flush_impl(inner)?; |
688 | loop { |
689 | crate::trace!({ sequence }, "Polling for reply or error" ); |
690 | if let Some(reply) = inner.inner.poll_for_reply_or_error(sequence) { |
691 | if reply.0[0] == 0 { |
692 | crate::trace!("Got error" ); |
693 | return Ok(ReplyOrError::Error(reply.0)); |
694 | } else { |
695 | crate::trace!("Got reply" ); |
696 | return Ok(ReplyOrError::Reply(reply)); |
697 | } |
698 | } |
699 | inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?; |
700 | } |
701 | } |
702 | |
703 | fn maximum_request_bytes(&self) -> usize { |
704 | let mut max_bytes = self.maximum_request_bytes.lock().unwrap(); |
705 | self.prefetch_maximum_request_bytes_impl(&mut max_bytes); |
706 | use MaxRequestBytes::*; |
707 | let max_bytes = &mut *max_bytes; |
708 | match max_bytes { |
709 | Unknown => unreachable!("We just prefetched this" ), |
710 | Requested(seqno) => { |
711 | let _guard = crate::info_span!("maximum_request_bytes" ).entered(); |
712 | |
713 | let length = seqno |
714 | // If prefetching the request succeeded, get a cookie |
715 | .and_then(|seqno| { |
716 | Cookie::<_, EnableReply>::new(self, seqno) |
717 | // and then get the reply to the request |
718 | .reply() |
719 | .map(|reply| reply.maximum_request_length) |
720 | .ok() |
721 | }) |
722 | // If anything failed (sending the request, getting the reply), use Setup |
723 | .unwrap_or_else(|| self.setup.maximum_request_length.into()) |
724 | // Turn the u32 into usize, using the max value in case of overflow |
725 | .try_into() |
726 | .unwrap_or(usize::max_value()); |
727 | let length = length * 4; |
728 | *max_bytes = Known(length); |
729 | crate::info!("Maximum request length is {} bytes" , length); |
730 | length |
731 | } |
732 | Known(length) => *length, |
733 | } |
734 | } |
735 | |
736 | fn prefetch_maximum_request_bytes(&self) { |
737 | let mut max_bytes = self.maximum_request_bytes.lock().unwrap(); |
738 | self.prefetch_maximum_request_bytes_impl(&mut max_bytes); |
739 | } |
740 | |
741 | fn parse_error(&self, error: &[u8]) -> Result<crate::x11_utils::X11Error, ParseError> { |
742 | let ext_mgr = self.extension_manager.lock().unwrap(); |
743 | crate::x11_utils::X11Error::try_parse(error, &*ext_mgr) |
744 | } |
745 | |
746 | fn parse_event(&self, event: &[u8]) -> Result<crate::protocol::Event, ParseError> { |
747 | let ext_mgr = self.extension_manager.lock().unwrap(); |
748 | crate::protocol::Event::parse(event, &*ext_mgr) |
749 | } |
750 | } |
751 | |
752 | impl<S: Stream> Connection for RustConnection<S> { |
753 | fn wait_for_raw_event_with_sequence( |
754 | &self, |
755 | ) -> Result<RawEventAndSeqNumber<Vec<u8>>, ConnectionError> { |
756 | let _guard = crate::trace_span!("wait_for_raw_event_with_sequence" ).entered(); |
757 | |
758 | let mut inner = self.inner.lock().unwrap(); |
759 | loop { |
760 | if let Some(event) = inner.inner.poll_for_event_with_sequence() { |
761 | return Ok(event); |
762 | } |
763 | inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?; |
764 | } |
765 | } |
766 | |
767 | fn poll_for_raw_event_with_sequence( |
768 | &self, |
769 | ) -> Result<Option<RawEventAndSeqNumber<Vec<u8>>>, ConnectionError> { |
770 | let _guard = crate::trace_span!("poll_for_raw_event_with_sequence" ).entered(); |
771 | |
772 | let mut inner = self.inner.lock().unwrap(); |
773 | if let Some(event) = inner.inner.poll_for_event_with_sequence() { |
774 | Ok(Some(event)) |
775 | } else { |
776 | inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?; |
777 | Ok(inner.inner.poll_for_event_with_sequence()) |
778 | } |
779 | } |
780 | |
781 | fn flush(&self) -> Result<(), ConnectionError> { |
782 | let inner = self.inner.lock().unwrap(); |
783 | let _inner = self.flush_impl(inner)?; |
784 | Ok(()) |
785 | } |
786 | |
787 | fn setup(&self) -> &Setup { |
788 | &self.setup |
789 | } |
790 | |
791 | fn generate_id(&self) -> Result<u32, ReplyOrIdError> { |
792 | let mut id_allocator = self.id_allocator.lock().unwrap(); |
793 | if let Some(id) = id_allocator.generate_id() { |
794 | Ok(id) |
795 | } else { |
796 | use crate::protocol::xc_misc::{self, ConnectionExt as _}; |
797 | |
798 | if self |
799 | .extension_information(xc_misc::X11_EXTENSION_NAME)? |
800 | .is_none() |
801 | { |
802 | crate::error!("XIDs are exhausted and XC-MISC extension is not available" ); |
803 | Err(ReplyOrIdError::IdsExhausted) |
804 | } else { |
805 | crate::info!("XIDs are exhausted; fetching free range via XC-MISC" ); |
806 | id_allocator.update_xid_range(&self.xc_misc_get_xid_range()?.reply()?)?; |
807 | id_allocator |
808 | .generate_id() |
809 | .ok_or(ReplyOrIdError::IdsExhausted) |
810 | } |
811 | } |
812 | } |
813 | } |
814 | |
815 | /// Call `notify_all` on a condition variable when dropped. |
816 | #[derive (Debug)] |
817 | struct NotifyOnDrop<'a>(&'a Condvar); |
818 | |
819 | impl Drop for NotifyOnDrop<'_> { |
820 | fn drop(&mut self) { |
821 | self.0.notify_all(); |
822 | } |
823 | } |
824 | |
825 | /// Format information about a request in a Display impl |
826 | struct RequestInfo<'a> { |
827 | extension_manager: &'a Mutex<ExtensionManager>, |
828 | major_opcode: u8, |
829 | minor_opcode: u8, |
830 | } |
831 | |
832 | impl std::fmt::Display for RequestInfo<'_> { |
833 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
834 | // QueryExtension is used by the extension manager. We would deadlock if we |
835 | // tried to lock it again. Hence, this case is hardcoded here. |
836 | if self.major_opcode == QUERY_EXTENSION_REQUEST { |
837 | write!(f, "QueryExtension request" ) |
838 | } else { |
839 | let guard: MutexGuard<'_, ExtensionManager> = self.extension_manager.lock().unwrap(); |
840 | write!( |
841 | f, |
842 | " {} request" , |
843 | x11rb_protocol::protocol::get_request_name( |
844 | &*guard, |
845 | self.major_opcode, |
846 | self.minor_opcode |
847 | ) |
848 | ) |
849 | } |
850 | } |
851 | } |
852 | |