1//! A pure-rust implementation of a connection to an X11 server.
2
3use std::io::IoSlice;
4use std::sync::{Condvar, Mutex, MutexGuard, TryLockError};
5use std::time::Instant;
6
7use crate::connection::{
8 compute_length_field, Connection, ReplyOrError, RequestConnection, RequestKind,
9};
10use crate::cookie::{Cookie, CookieWithFds, VoidCookie};
11use crate::errors::DisplayParsingError;
12pub use crate::errors::{ConnectError, ConnectionError, ParseError, ReplyError, ReplyOrIdError};
13use crate::extension_manager::ExtensionManager;
14use crate::protocol::bigreq::{ConnectionExt as _, EnableReply};
15use crate::protocol::xproto::{Setup, GET_INPUT_FOCUS_REQUEST, QUERY_EXTENSION_REQUEST};
16use crate::utils::RawFdContainer;
17use crate::x11_utils::{ExtensionInformation, TryParse, TryParseFd};
18use x11rb_protocol::connect::Connect;
19use x11rb_protocol::connection::{Connection as ProtoConnection, PollReply, ReplyFdKind};
20use x11rb_protocol::id_allocator::IdAllocator;
21use x11rb_protocol::{xauth::get_auth, DiscardMode, RawEventAndSeqNumber, SequenceNumber};
22
23mod packet_reader;
24mod stream;
25mod write_buffer;
26
27use packet_reader::PacketReader;
28pub use stream::{DefaultStream, PollMode, Stream};
29use write_buffer::WriteBuffer;
30
// Shorthand for the `Buf` associated type of `RustConnection`'s `RequestConnection` impl.
type Buffer = <RustConnection as RequestConnection>::Buf;
/// A combination of a buffer and a list of file descriptors for use by [`RustConnection`].
pub type BufWithFds = crate::connection::BufWithFds<Buffer>;
34
/// Cached state of the "maximum request size" negotiation with the server.
#[derive(Debug)]
enum MaxRequestBytes {
    /// Nothing has been requested yet.
    Unknown,
    /// A BIG-REQUESTS `Enable` request was attempted. Holds the sequence number of that request,
    /// or `None` if sending it failed.
    Requested(Option<SequenceNumber>),
    /// The limit has been determined; the value is a number of bytes.
    Known(usize),
}
41
/// The state that is protected by the `inner` mutex of [`RustConnection`].
#[derive(Debug)]
struct ConnectionInner {
    /// Protocol-level bookkeeping: sent requests and packets that were read but not yet handed
    /// out to a caller.
    inner: ProtoConnection,
    /// Buffer through which all outgoing data is written to the stream.
    write_buffer: WriteBuffer,
}
47
// A held lock on `RustConnection::inner`. Helper functions take and return this guard by value.
type MutexGuardInner<'a> = MutexGuard<'a, ConnectionInner>;
49
/// Selects whether `read_packet_and_enqueue` may wait for data to arrive.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum BlockingMode {
    /// Wait until the stream is readable (or until another reader has finished) before returning.
    Blocking,
    /// Only read what is available right now and return immediately otherwise.
    NonBlocking,
}
55
/// A connection to an X11 server implemented in pure rust
///
/// This type is generic over `S`, which allows to use a generic stream to communicate with the
/// server. This stream can be written to and read from, but it can also be polled, meaning that
/// one checks if new data can be read or written.
///
/// `RustConnection` always uses an internal buffer for reading, so `S` does not need
/// to be buffered.
#[derive(Debug)]
pub struct RustConnection<S: Stream = DefaultStream> {
    // Protocol state and the write buffer; see the locking rules further down in this file.
    inner: Mutex<ConnectionInner>,
    // The stream that is used to talk to the X11 server.
    stream: S,
    // This mutex is only locked with `try_lock` (never blocks), so a simpler
    // lock based only on an atomic variable would be more efficient.
    packet_reader: Mutex<PacketReader>,
    // Threads that want to read while another thread holds `packet_reader` sleep on this.
    reader_condition: Condvar,
    // The setup information that the server sent when the connection was established.
    setup: Setup,
    // Cache of information about X11 extensions.
    extension_manager: Mutex<ExtensionManager>,
    // Cached result of the maximum request size negotiation.
    maximum_request_bytes: Mutex<MaxRequestBytes>,
    // Hands out resource IDs for `generate_id`.
    id_allocator: Mutex<IdAllocator>,
}
77
78// Locking rules
79// =============
80//
81// To avoid deadlocks, it is important to have a defined ordering about mutexes:
82//
83// Mutexes that may be locked when no other mutex is held:
84// - maximum_request_bytes
85// - extension_manager
86// - id_allocator
87//
88// Then comes `inner`. This mutex protects the information about in-flight requests and packets
89// that were already read from the connection but not given out to callers. This mutex also
90// contains the write buffer and has to be locked in order to write something to the X11 server.
91// In this case, the mutex has to be kept locked until writing the request has finished. This is
92// necessary to ensure correct sync insertion without threads interfering with each other. When
93// this mutex is locked for operations other than writing, the lock should be kept only for a
94// short time.
95//
// The innermost level is `packet_reader`. This mutex is only locked when `inner` is already held
// and only with `try_lock()`. This ensures that there is only one reader. While actually reading,
// the lock on `inner` is released so that other threads can make progress. If more threads want
// to read while `packet_reader` is already locked, they sleep on `reader_condition`. The actual
// reader will then notify this condition variable once it is done reading.
101//
102// n.b. notgull: write_buffer follows the same rules
103//
104// The condition variable is necessary since one thread may read packets that another thread waits
105// for. Thus, after reading something from the connection, all threads that wait for something have
106// to check if they are the intended recipient.
107
108impl RustConnection<DefaultStream> {
109 /// Establish a new connection.
110 ///
111 /// If no `dpy_name` is provided, the value from `$DISPLAY` is used.
112 pub fn connect(dpy_name: Option<&str>) -> Result<(Self, usize), ConnectError> {
113 // Parse display information
114 let parsed_display = x11rb_protocol::parse_display::parse_display(dpy_name)?;
115 let screen = parsed_display.screen.into();
116
117 // Establish connection by iterating over ConnectAddresses until we find one that
118 // works.
119 let mut error = None;
120 for addr in parsed_display.connect_instruction() {
121 let start = Instant::now();
122 match DefaultStream::connect(&addr) {
123 Ok((stream, (family, address))) => {
124 crate::trace!(
125 "Connected to X11 server via {:?} in {:?}",
126 addr,
127 start.elapsed()
128 );
129
130 // we found a stream, get auth information
131 let (auth_name, auth_data) = get_auth(family, &address, parsed_display.display)
132 // Ignore all errors while determining auth; instead we just try without auth info.
133 .unwrap_or(None)
134 .unwrap_or_else(|| (Vec::new(), Vec::new()));
135 crate::trace!("Picked authentication via auth mechanism {:?}", auth_name);
136
137 // finish connecting to server
138 return Ok((
139 Self::connect_to_stream_with_auth_info(
140 stream, screen, auth_name, auth_data,
141 )?,
142 screen,
143 ));
144 }
145 Err(e) => {
146 crate::debug!("Failed to connect to X11 server via {:?}: {:?}", addr, e);
147 error = Some(e);
148 continue;
149 }
150 }
151 }
152
153 // none of the addresses worked
154 Err(match error {
155 Some(e) => ConnectError::IoError(e),
156 None => DisplayParsingError::Unknown.into(),
157 })
158 }
159}
160
161impl<S: Stream> RustConnection<S> {
    /// Establish a new connection over the given stream without sending authorization data.
    ///
    /// `stream` is used for both reading from and writing to the X11 server. `screen` is the
    /// number of the screen that should be used.
    ///
    /// This simply forwards to
    /// [`connect_to_stream_with_auth_info`](Self::connect_to_stream_with_auth_info) with an empty
    /// authorization name and empty authorization data; see there for the errors that can occur.
    pub fn connect_to_stream(stream: S, screen: usize) -> Result<Self, ConnectError> {
        Self::connect_to_stream_with_auth_info(stream, screen, Vec::new(), Vec::new())
    }
170
171 /// Establish a new connection to the given streams.
172 ///
173 /// `read` is used for reading data from the X11 server and `write` is used for writing.
174 /// `screen` is the number of the screen that should be used. This function checks that a
175 /// screen with that number exists.
176 ///
177 /// The parameters `auth_name` and `auth_data` are used for the members
178 /// `authorization_protocol_name` and `authorization_protocol_data` of the `SetupRequest` that
179 /// is sent to the X11 server.
180 pub fn connect_to_stream_with_auth_info(
181 stream: S,
182 screen: usize,
183 auth_name: Vec<u8>,
184 auth_data: Vec<u8>,
185 ) -> Result<Self, ConnectError> {
186 let (mut connect, setup_request) = Connect::with_authorization(auth_name, auth_data);
187
188 // write the connect() setup request
189 let mut nwritten = 0;
190 let mut fds = vec![];
191
192 crate::trace!(
193 "Writing connection setup with {} bytes",
194 setup_request.len()
195 );
196 while nwritten != setup_request.len() {
197 stream.poll(PollMode::Writable)?;
198 // poll returned successfully, so the stream is writable.
199 match stream.write(&setup_request[nwritten..], &mut fds) {
200 Ok(0) => {
201 return Err(std::io::Error::new(
202 std::io::ErrorKind::WriteZero,
203 "failed to write whole buffer",
204 )
205 .into())
206 }
207 Ok(n) => nwritten += n,
208 // Spurious wakeup from poll, try again
209 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
210 Err(e) => return Err(e.into()),
211 }
212 }
213
214 // read in the setup
215 loop {
216 stream.poll(PollMode::Readable)?;
217 crate::trace!(
218 "Reading connection setup with at least {} bytes remaining",
219 connect.buffer().len()
220 );
221 let adv = match stream.read(connect.buffer(), &mut fds) {
222 Ok(0) => {
223 return Err(std::io::Error::new(
224 std::io::ErrorKind::UnexpectedEof,
225 "failed to read whole buffer",
226 )
227 .into())
228 }
229 Ok(n) => n,
230 // Spurious wakeup from poll, try again
231 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
232 Err(e) => return Err(e.into()),
233 };
234 crate::trace!("Read {} bytes", adv);
235
236 // advance the internal buffer
237 if connect.advance(adv) {
238 break;
239 }
240 }
241
242 // resolve the setup
243 let setup = connect.into_setup()?;
244
245 // Check that we got a valid screen number
246 if screen >= setup.roots.len() {
247 return Err(ConnectError::InvalidScreen);
248 }
249
250 // Success! Set up our state
251 Self::for_connected_stream(stream, setup)
252 }
253
254 /// Establish a new connection for an already connected stream.
255 ///
256 /// The given `stream` is used for communicating with the X11 server.
257 /// It is assumed that `setup` was just received from the server. Thus, the first reply to a
258 /// request that is sent will have sequence number one.
259 pub fn for_connected_stream(stream: S, setup: Setup) -> Result<Self, ConnectError> {
260 let id_allocator = IdAllocator::new(setup.resource_id_base, setup.resource_id_mask)?;
261
262 Ok(RustConnection {
263 inner: Mutex::new(ConnectionInner {
264 inner: ProtoConnection::new(),
265 write_buffer: WriteBuffer::new(),
266 }),
267 stream,
268 packet_reader: Mutex::new(PacketReader::new()),
269 reader_condition: Condvar::new(),
270 setup,
271 extension_manager: Default::default(),
272 maximum_request_bytes: Mutex::new(MaxRequestBytes::Unknown),
273 id_allocator: Mutex::new(id_allocator),
274 })
275 }
276
277 /// Internal function for actually sending a request.
278 ///
279 /// This function "does the actual work" for `send_request_with_reply()` and
280 /// `send_request_without_reply()`.
281 fn send_request(
282 &self,
283 bufs: &[IoSlice<'_>],
284 fds: Vec<RawFdContainer>,
285 kind: ReplyFdKind,
286 ) -> Result<SequenceNumber, ConnectionError> {
287 let _guard = crate::debug_span!("send_request").entered();
288
289 let request_info = RequestInfo {
290 extension_manager: &self.extension_manager,
291 major_opcode: bufs[0][0],
292 minor_opcode: bufs[0][1],
293 };
294 crate::debug!("Sending {}", request_info);
295
296 let mut storage = Default::default();
297 let bufs = compute_length_field(self, bufs, &mut storage)?;
298
299 // Note: `inner` must be kept blocked until the request has been completely written
300 // or buffered to avoid sending the data of different requests interleaved. For this
301 // reason, `read_packet_and_enqueue` must always be called with `BlockingMode::NonBlocking`
302 // during a write, otherwise `inner` would be temporarily released.
303 let mut inner = self.inner.lock().unwrap();
304
305 loop {
306 let send_result = inner.inner.send_request(kind);
307 match send_result {
308 Some(seqno) => {
309 // Now actually send the buffers
310 let _inner = self.write_all_vectored(inner, bufs, fds)?;
311 return Ok(seqno);
312 }
313 None => {
314 crate::trace!("Syncing with the X11 server since there are too many outstanding void requests");
315 inner = self.send_sync(inner)?;
316 }
317 }
318 }
319 }
320
321 /// Send a synchronisation packet to the X11 server.
322 ///
323 /// This function sends a `GetInputFocus` request to the X11 server and arranges for its reply
324 /// to be ignored. This ensures that a reply is expected (`ConnectionInner.next_reply_expected`
325 /// increases).
326 fn send_sync<'a>(
327 &'a self,
328 mut inner: MutexGuardInner<'a>,
329 ) -> Result<MutexGuardInner<'a>, std::io::Error> {
330 let length = 1u16.to_ne_bytes();
331 let request = [
332 GET_INPUT_FOCUS_REQUEST,
333 0, /* pad */
334 length[0],
335 length[1],
336 ];
337
338 let seqno = inner
339 .inner
340 .send_request(ReplyFdKind::ReplyWithoutFDs)
341 .expect("Sending a HasResponse request should not be blocked by syncs");
342 inner
343 .inner
344 .discard_reply(seqno, DiscardMode::DiscardReplyAndError);
345 let inner = self.write_all_vectored(inner, &[IoSlice::new(&request)], Vec::new())?;
346
347 Ok(inner)
348 }
349
    /// Write all of `bufs` (and `fds`) to the server through the write buffer.
    ///
    /// The lock on `inner` is passed in and returned again on success. Whenever writing would
    /// block, packets are read from the server in non-blocking mode instead, so `inner` is never
    /// released while a request is partially written.
    ///
    /// # Errors
    ///
    /// Returns an error if polling, writing or reading fails, if a write reports that zero bytes
    /// were written, or if some of `fds` are still left over after all data was written.
    fn write_all_vectored<'a>(
        &'a self,
        mut inner: MutexGuardInner<'a>,
        mut bufs: &[IoSlice<'_>],
        mut fds: Vec<RawFdContainer>,
    ) -> std::io::Result<MutexGuardInner<'a>> {
        // `partial_buf` is the unwritten tail of a slice that was only written in part; `bufs`
        // are the slices after it that have not been touched yet.
        let mut partial_buf: &[u8] = &[];
        while !partial_buf.is_empty() || !bufs.is_empty() {
            self.stream.poll(PollMode::ReadAndWritable)?;
            let write_result = if !partial_buf.is_empty() {
                // Finish the partially written slice first. The write buffer lives inside
                // `inner`, whose lock was passed into this function.
                inner
                    .write_buffer
                    .write(&self.stream, partial_buf, &mut fds)
            } else {
                // No partial slice pending: write the remaining slices in one vectored call.
                inner
                    .write_buffer
                    .write_vectored(&self.stream, bufs, &mut fds)
            };
            match write_result {
                Ok(0) => {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::WriteZero,
                        "failed to write anything",
                    ));
                }
                Ok(mut count) => {
                    // Successful write: first account the written bytes against `partial_buf`...
                    if count >= partial_buf.len() {
                        count -= partial_buf.len();
                        partial_buf = &[];
                    } else {
                        partial_buf = &partial_buf[count..];
                        count = 0;
                    }
                    // ...then against the slices in `bufs`, front to back.
                    while count > 0 {
                        if count >= bufs[0].len() {
                            count -= bufs[0].len();
                        } else {
                            // This slice was only written in part; remember its tail.
                            partial_buf = &bufs[0][count..];
                            count = 0;
                        }
                        // Either way the first slice is done with (fully written, or now
                        // tracked through `partial_buf`).
                        bufs = &bufs[1..];
                        // Skip empty slices
                        while bufs.first().map(|s| s.len()) == Some(0) {
                            bufs = &bufs[1..];
                        }
                    }
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    crate::trace!("Writing more data would block for now");
                    // Writing would block, try to read instead because the
                    // server might not accept new requests after its
                    // buffered replies have been read.
                    inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?;
                }
                Err(e) => return Err(e),
            }
        }
        // All bytes are written; every FD must have been consumed along the way.
        if !fds.is_empty() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "Left over FDs after sending the request",
            ));
        }
        Ok(inner)
    }
420
421 fn flush_impl<'a>(
422 &'a self,
423 mut inner: MutexGuardInner<'a>,
424 ) -> std::io::Result<MutexGuardInner<'a>> {
425 // n.b. notgull: inner guard is held
426 while inner.write_buffer.needs_flush() {
427 self.stream.poll(PollMode::ReadAndWritable)?;
428 let flush_result = inner.write_buffer.flush(&self.stream);
429 match flush_result {
430 // Flush completed
431 Ok(()) => break,
432 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
433 crate::trace!("Flushing more data would block for now");
434 // Writing would block, try to read instead because the
435 // server might not accept new requests after its
436 // buffered replies have been read.
437 inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?;
438 }
439 Err(e) => return Err(e),
440 }
441 }
442 Ok(inner)
443 }
444
    /// Read packets from the connection and enqueue them in the protocol state.
    ///
    /// In blocking mode, this function waits for the stream to become readable. It drops the
    /// mutex protecting the inner data while waiting so that other threads can make progress. For
    /// this reason, you need to pass in a `MutexGuard` to be dropped. This function locks the
    /// mutex again and returns a new `MutexGuard`.
    ///
    /// If another thread is already reading, this function does not read itself: in non-blocking
    /// mode it returns immediately, in blocking mode it sleeps on `reader_condition` until woken
    /// up. Callers therefore have to re-check whether what they are waiting for has arrived.
    ///
    /// Note: If `mode` is `BlockingMode::Blocking`, the lock on `inner` will be temporarily
    /// released. While sending a request, `inner` must be kept locked to avoid sending the data
    /// of different requests interleaved. So, when `read_packet_and_enqueue` is called as part
    /// of a write, it must always be done with `mode` set to `BlockingMode::NonBlocking`.
    ///
    /// # Panics
    ///
    /// Panics if the `packet_reader` or `inner` mutex is poisoned.
    fn read_packet_and_enqueue<'a>(
        &'a self,
        mut inner: MutexGuardInner<'a>,
        mode: BlockingMode,
    ) -> Result<MutexGuardInner<'a>, std::io::Error> {
        // 0.1. Try to lock the `packet_reader` mutex.
        match self.packet_reader.try_lock() {
            Err(TryLockError::WouldBlock) => {
                // In non-blocking mode, we just return immediately
                match mode {
                    BlockingMode::NonBlocking => {
                        crate::trace!("read_packet_and_enqueue in NonBlocking mode doing nothing since reader is already locked");
                        return Ok(inner);
                    }
                    BlockingMode::Blocking => {
                        crate::trace!("read_packet_and_enqueue in Blocking mode waiting for pre-existing reader");
                    }
                }

                // 1.1. Someone else is reading (other thread is at 2.2);
                // wait for it. `Condvar::wait` will unlock `inner`, so
                // the other thread can relock `inner` at 2.1.3 (and to allow
                // other threads to arrive at 0.1).
                //
                // When `wait` finishes, the other thread has finished its read
                // attempt, so the purpose of this function has been fulfilled.
                // `wait` will relock `inner` when it returns.
                Ok(self.reader_condition.wait(inner).unwrap())
            }
            Err(TryLockError::Poisoned(e)) => panic!("{}", e),
            Ok(mut packet_reader) => {
                // Make sure sleeping readers are woken up when we return
                // (Even in case of errors)
                let notify_on_drop = NotifyOnDrop(&self.reader_condition);

                // 2.1. Poll for read if mode is blocking.
                if mode == BlockingMode::Blocking {
                    // 2.1.1. Unlock `inner`, so other threads can use it
                    // during the poll.
                    drop(inner);
                    // 2.1.2. Do the actual poll
                    self.stream.poll(PollMode::Readable)?;
                    // 2.1.3. Relock inner
                    inner = self.inner.lock().unwrap();
                }

                // 2.2. Try to read as many packets as possible without blocking.
                let mut fds = Vec::new();
                let mut packets = Vec::new();
                packet_reader.try_read_packets(&self.stream, &mut packets, &mut fds)?;

                // 2.3. Once `inner` has been relocked, drop the
                // lock on `packet_reader`. While inner is locked, other
                // threads cannot arrive at 0.1 anyways.
                //
                // `packet_reader` must be unlocked while `inner` is locked,
                // otherwise it could let another thread wait on 2.1
                // for a reply that has been read but not enqueued yet.
                drop(packet_reader);

                // 2.4. Actually enqueue the read packets.
                inner.inner.enqueue_fds(fds);
                packets
                    .into_iter()
                    .for_each(|packet| inner.inner.enqueue_packet(packet));

                // 2.5. Notify the condvar by dropping the `notify_on_drop` object.
                // The object would have been dropped when the function returns, so
                // the explicit drop is not really needed. The purpose of having an
                // explicit drop is to... make it explicit.
                drop(notify_on_drop);

                // 2.6. Return the locked `inner` back to the caller.
                Ok(inner)
            }
        }
    }
533
534 fn prefetch_maximum_request_bytes_impl(&self, max_bytes: &mut MutexGuard<'_, MaxRequestBytes>) {
535 if let MaxRequestBytes::Unknown = **max_bytes {
536 crate::info!("Prefetching maximum request length");
537 let request = self
538 .bigreq_enable()
539 .map(|cookie| cookie.into_sequence_number())
540 .ok();
541 **max_bytes = MaxRequestBytes::Requested(request);
542 }
543 }
544
    /// Returns a reference to the contained stream.
    ///
    /// This is the stream that was handed to this connection when it was created.
    pub fn stream(&self) -> &S {
        &self.stream
    }
549}
550
551impl<S: Stream> RequestConnection for RustConnection<S> {
    // Raw packets are handed out as owned byte vectors.
    type Buf = Vec<u8>;
553
554 fn send_request_with_reply<Reply>(
555 &self,
556 bufs: &[IoSlice<'_>],
557 fds: Vec<RawFdContainer>,
558 ) -> Result<Cookie<'_, Self, Reply>, ConnectionError>
559 where
560 Reply: TryParse,
561 {
562 Ok(Cookie::new(
563 self,
564 self.send_request(bufs, fds, ReplyFdKind::ReplyWithoutFDs)?,
565 ))
566 }
567
568 fn send_request_with_reply_with_fds<Reply>(
569 &self,
570 bufs: &[IoSlice<'_>],
571 fds: Vec<RawFdContainer>,
572 ) -> Result<CookieWithFds<'_, Self, Reply>, ConnectionError>
573 where
574 Reply: TryParseFd,
575 {
576 Ok(CookieWithFds::new(
577 self,
578 self.send_request(bufs, fds, ReplyFdKind::ReplyWithFDs)?,
579 ))
580 }
581
582 fn send_request_without_reply(
583 &self,
584 bufs: &[IoSlice<'_>],
585 fds: Vec<RawFdContainer>,
586 ) -> Result<VoidCookie<'_, Self>, ConnectionError> {
587 Ok(VoidCookie::new(
588 self,
589 self.send_request(bufs, fds, ReplyFdKind::NoReply)?,
590 ))
591 }
592
593 fn discard_reply(&self, sequence: SequenceNumber, _kind: RequestKind, mode: DiscardMode) {
594 crate::debug!(
595 "Discarding reply to request {} in mode {:?}",
596 sequence,
597 mode
598 );
599 self.inner
600 .lock()
601 .unwrap()
602 .inner
603 .discard_reply(sequence, mode);
604 }
605
606 fn prefetch_extension_information(
607 &self,
608 extension_name: &'static str,
609 ) -> Result<(), ConnectionError> {
610 self.extension_manager
611 .lock()
612 .unwrap()
613 .prefetch_extension_information(self, extension_name)
614 }
615
616 fn extension_information(
617 &self,
618 extension_name: &'static str,
619 ) -> Result<Option<ExtensionInformation>, ConnectionError> {
620 self.extension_manager
621 .lock()
622 .unwrap()
623 .extension_information(self, extension_name)
624 }
625
626 fn wait_for_reply_or_raw_error(
627 &self,
628 sequence: SequenceNumber,
629 ) -> Result<ReplyOrError<Vec<u8>>, ConnectionError> {
630 match self.wait_for_reply_with_fds_raw(sequence)? {
631 ReplyOrError::Reply((reply, _fds)) => Ok(ReplyOrError::Reply(reply)),
632 ReplyOrError::Error(e) => Ok(ReplyOrError::Error(e)),
633 }
634 }
635
636 fn wait_for_reply(&self, sequence: SequenceNumber) -> Result<Option<Vec<u8>>, ConnectionError> {
637 let _guard = crate::debug_span!("wait_for_reply", sequence).entered();
638
639 let mut inner = self.inner.lock().unwrap();
640 inner = self.flush_impl(inner)?;
641 loop {
642 crate::trace!({ sequence }, "Polling for reply");
643 let poll_result = inner.inner.poll_for_reply(sequence);
644 match poll_result {
645 PollReply::TryAgain => {}
646 PollReply::NoReply => return Ok(None),
647 PollReply::Reply(buffer) => return Ok(Some(buffer)),
648 }
649 inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
650 }
651 }
652
653 fn check_for_raw_error(
654 &self,
655 sequence: SequenceNumber,
656 ) -> Result<Option<Buffer>, ConnectionError> {
657 let _guard = crate::debug_span!("check_for_raw_error", sequence).entered();
658
659 let mut inner = self.inner.lock().unwrap();
660 if inner.inner.prepare_check_for_reply_or_error(sequence) {
661 crate::trace!("Inserting sync with the X11 server");
662 inner = self.send_sync(inner)?;
663 assert!(!inner.inner.prepare_check_for_reply_or_error(sequence));
664 }
665 // Ensure the request is sent
666 inner = self.flush_impl(inner)?;
667 loop {
668 crate::trace!({ sequence }, "Polling for reply or error");
669 let poll_result = inner.inner.poll_check_for_reply_or_error(sequence);
670 match poll_result {
671 PollReply::TryAgain => {}
672 PollReply::NoReply => return Ok(None),
673 PollReply::Reply(buffer) => return Ok(Some(buffer)),
674 }
675 inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
676 }
677 }
678
679 fn wait_for_reply_with_fds_raw(
680 &self,
681 sequence: SequenceNumber,
682 ) -> Result<ReplyOrError<BufWithFds, Buffer>, ConnectionError> {
683 let _guard = crate::debug_span!("wait_for_reply_with_fds_raw", sequence).entered();
684
685 let mut inner = self.inner.lock().unwrap();
686 // Ensure the request is sent
687 inner = self.flush_impl(inner)?;
688 loop {
689 crate::trace!({ sequence }, "Polling for reply or error");
690 if let Some(reply) = inner.inner.poll_for_reply_or_error(sequence) {
691 if reply.0[0] == 0 {
692 crate::trace!("Got error");
693 return Ok(ReplyOrError::Error(reply.0));
694 } else {
695 crate::trace!("Got reply");
696 return Ok(ReplyOrError::Reply(reply));
697 }
698 }
699 inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
700 }
701 }
702
703 fn maximum_request_bytes(&self) -> usize {
704 let mut max_bytes = self.maximum_request_bytes.lock().unwrap();
705 self.prefetch_maximum_request_bytes_impl(&mut max_bytes);
706 use MaxRequestBytes::*;
707 let max_bytes = &mut *max_bytes;
708 match max_bytes {
709 Unknown => unreachable!("We just prefetched this"),
710 Requested(seqno) => {
711 let _guard = crate::info_span!("maximum_request_bytes").entered();
712
713 let length = seqno
714 // If prefetching the request succeeded, get a cookie
715 .and_then(|seqno| {
716 Cookie::<_, EnableReply>::new(self, seqno)
717 // and then get the reply to the request
718 .reply()
719 .map(|reply| reply.maximum_request_length)
720 .ok()
721 })
722 // If anything failed (sending the request, getting the reply), use Setup
723 .unwrap_or_else(|| self.setup.maximum_request_length.into())
724 // Turn the u32 into usize, using the max value in case of overflow
725 .try_into()
726 .unwrap_or(usize::max_value());
727 let length = length * 4;
728 *max_bytes = Known(length);
729 crate::info!("Maximum request length is {} bytes", length);
730 length
731 }
732 Known(length) => *length,
733 }
734 }
735
736 fn prefetch_maximum_request_bytes(&self) {
737 let mut max_bytes = self.maximum_request_bytes.lock().unwrap();
738 self.prefetch_maximum_request_bytes_impl(&mut max_bytes);
739 }
740
741 fn parse_error(&self, error: &[u8]) -> Result<crate::x11_utils::X11Error, ParseError> {
742 let ext_mgr = self.extension_manager.lock().unwrap();
743 crate::x11_utils::X11Error::try_parse(error, &*ext_mgr)
744 }
745
746 fn parse_event(&self, event: &[u8]) -> Result<crate::protocol::Event, ParseError> {
747 let ext_mgr = self.extension_manager.lock().unwrap();
748 crate::protocol::Event::parse(event, &*ext_mgr)
749 }
750}
751
752impl<S: Stream> Connection for RustConnection<S> {
753 fn wait_for_raw_event_with_sequence(
754 &self,
755 ) -> Result<RawEventAndSeqNumber<Vec<u8>>, ConnectionError> {
756 let _guard = crate::trace_span!("wait_for_raw_event_with_sequence").entered();
757
758 let mut inner = self.inner.lock().unwrap();
759 loop {
760 if let Some(event) = inner.inner.poll_for_event_with_sequence() {
761 return Ok(event);
762 }
763 inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
764 }
765 }
766
767 fn poll_for_raw_event_with_sequence(
768 &self,
769 ) -> Result<Option<RawEventAndSeqNumber<Vec<u8>>>, ConnectionError> {
770 let _guard = crate::trace_span!("poll_for_raw_event_with_sequence").entered();
771
772 let mut inner = self.inner.lock().unwrap();
773 if let Some(event) = inner.inner.poll_for_event_with_sequence() {
774 Ok(Some(event))
775 } else {
776 inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?;
777 Ok(inner.inner.poll_for_event_with_sequence())
778 }
779 }
780
781 fn flush(&self) -> Result<(), ConnectionError> {
782 let inner = self.inner.lock().unwrap();
783 let _inner = self.flush_impl(inner)?;
784 Ok(())
785 }
786
    /// Returns the setup information that this connection was created with.
    fn setup(&self) -> &Setup {
        &self.setup
    }
790
791 fn generate_id(&self) -> Result<u32, ReplyOrIdError> {
792 let mut id_allocator = self.id_allocator.lock().unwrap();
793 if let Some(id) = id_allocator.generate_id() {
794 Ok(id)
795 } else {
796 use crate::protocol::xc_misc::{self, ConnectionExt as _};
797
798 if self
799 .extension_information(xc_misc::X11_EXTENSION_NAME)?
800 .is_none()
801 {
802 crate::error!("XIDs are exhausted and XC-MISC extension is not available");
803 Err(ReplyOrIdError::IdsExhausted)
804 } else {
805 crate::info!("XIDs are exhausted; fetching free range via XC-MISC");
806 id_allocator.update_xid_range(&self.xc_misc_get_xid_range()?.reply()?)?;
807 id_allocator
808 .generate_id()
809 .ok_or(ReplyOrIdError::IdsExhausted)
810 }
811 }
812 }
813}
814
/// Guard that wakes up every thread waiting on the wrapped condition variable when it is dropped.
///
/// Because the notification happens in `Drop`, it is also sent on early returns.
#[derive(Debug)]
struct NotifyOnDrop<'a>(&'a Condvar);

impl Drop for NotifyOnDrop<'_> {
    fn drop(&mut self) {
        let Self(condition) = self;
        condition.notify_all();
    }
}
824
/// Format information about a request in a Display impl
///
/// Formatting may lock `extension_manager` to look up the name of the request.
struct RequestInfo<'a> {
    /// Used to resolve the opcodes below into a request name.
    extension_manager: &'a Mutex<ExtensionManager>,
    /// First byte of the request.
    major_opcode: u8,
    /// Second byte of the request.
    minor_opcode: u8,
}
831
832impl std::fmt::Display for RequestInfo<'_> {
833 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
834 // QueryExtension is used by the extension manager. We would deadlock if we
835 // tried to lock it again. Hence, this case is hardcoded here.
836 if self.major_opcode == QUERY_EXTENSION_REQUEST {
837 write!(f, "QueryExtension request")
838 } else {
839 let guard: MutexGuard<'_, ExtensionManager> = self.extension_manager.lock().unwrap();
840 write!(
841 f,
842 "{} request",
843 x11rb_protocol::protocol::get_request_name(
844 &*guard,
845 self.major_opcode,
846 self.minor_opcode
847 )
848 )
849 }
850 }
851}
852