1//! A pure-rust implementation of a connection to an X11 server.
2
3use std::convert::TryInto;
4use std::io::IoSlice;
5use std::mem::drop;
6use std::sync::{Condvar, Mutex, MutexGuard, TryLockError};
7use std::time::Instant;
8
9use crate::connection::{
10 compute_length_field, Connection, ReplyOrError, RequestConnection, RequestKind,
11};
12use crate::cookie::{Cookie, CookieWithFds, VoidCookie};
13use crate::errors::DisplayParsingError;
14pub use crate::errors::{ConnectError, ConnectionError, ParseError, ReplyError, ReplyOrIdError};
15use crate::extension_manager::ExtensionManager;
16use crate::protocol::bigreq::{ConnectionExt as _, EnableReply};
17use crate::protocol::xproto::{Setup, GET_INPUT_FOCUS_REQUEST, QUERY_EXTENSION_REQUEST};
18use crate::utils::RawFdContainer;
19use crate::x11_utils::{ExtensionInformation, TryParse, TryParseFd};
20use x11rb_protocol::connect::Connect;
21use x11rb_protocol::connection::{Connection as ProtoConnection, PollReply, ReplyFdKind};
22use x11rb_protocol::id_allocator::IdAllocator;
23use x11rb_protocol::{xauth::get_auth, DiscardMode, RawEventAndSeqNumber, SequenceNumber};
24
25mod packet_reader;
26mod stream;
27mod write_buffer;
28
29use packet_reader::PacketReader;
30pub use stream::{DefaultStream, PollMode, Stream};
31use write_buffer::WriteBuffer;
32
33type Buffer = <RustConnection as RequestConnection>::Buf;
34/// A combination of a buffer and a list of file descriptors for use by [`RustConnection`].
35pub type BufWithFds = crate::connection::BufWithFds<Buffer>;
36
37#[derive(Debug)]
38enum MaxRequestBytes {
39 Unknown,
40 Requested(Option<SequenceNumber>),
41 Known(usize),
42}
43
44#[derive(Debug)]
45struct ConnectionInner {
46 inner: ProtoConnection,
47 write_buffer: WriteBuffer,
48}
49
50type MutexGuardInner<'a> = MutexGuard<'a, ConnectionInner>;
51
52#[derive(Debug, Copy, Clone, PartialEq, Eq)]
53pub(crate) enum BlockingMode {
54 Blocking,
55 NonBlocking,
56}
57
58/// A connection to an X11 server implemented in pure rust
59///
60/// This type is generic over `S`, which allows to use a generic stream to communicate with the
61/// server. This stream can written to and read from, but it can also be polled, meaning that one
62/// checks if new data can be read or written.
63///
64/// `RustConnection` always used an internal buffer for reading, so `R` does not need
65/// to be buffered.
66#[derive(Debug)]
67pub struct RustConnection<S: Stream = DefaultStream> {
68 inner: Mutex<ConnectionInner>,
69 stream: S,
70 // This mutex is only locked with `try_lock` (never blocks), so a simpler
71 // lock based only on a atomic variable would be more efficient.
72 packet_reader: Mutex<PacketReader>,
73 reader_condition: Condvar,
74 setup: Setup,
75 extension_manager: Mutex<ExtensionManager>,
76 maximum_request_bytes: Mutex<MaxRequestBytes>,
77 id_allocator: Mutex<IdAllocator>,
78}
79
80// Locking rules
81// =============
82//
83// To avoid deadlocks, it is important to have a defined ordering about mutexes:
84//
85// Mutexes that may be locked when no other mutex is held:
86// - maximum_request_bytes
87// - extension_manager
88// - id_allocator
89//
90// Then comes `inner`. This mutex protects the information about in-flight requests and packets
91// that were already read from the connection but not given out to callers. This mutex also
92// contains the write buffer and has to be locked in order to write something to the X11 server.
93// In this case, the mutex has to be kept locked until writing the request has finished. This is
94// necessary to ensure correct sync insertion without threads interfering with each other. When
95// this mutex is locked for operations other than writing, the lock should be kept only for a
96// short time.
97//
98// The inner level is `packet_reader`. This mutex is only locked when `inner` is already held and
99// only with `try_lock()`. This ensures that there is only one reader. While actually reading, the
100// lock on `inner` is released so that other threads can make progress. If more threads want to
101// read while `read` is already locked, they sleep on `reader_condition`. The actual reader will
102// then notify this condition variable once it is done reading.
103//
104// n.b. notgull: write_buffer follows the same rules
105//
106// The condition variable is necessary since one thread may read packets that another thread waits
107// for. Thus, after reading something from the connection, all threads that wait for something have
108// to check if they are the intended recipient.
109
110impl RustConnection<DefaultStream> {
111 /// Establish a new connection.
112 ///
113 /// If no `dpy_name` is provided, the value from `$DISPLAY` is used.
114 pub fn connect(dpy_name: Option<&str>) -> Result<(Self, usize), ConnectError> {
115 // Parse display information
116 let parsed_display = x11rb_protocol::parse_display::parse_display(dpy_name)?;
117 let screen = parsed_display.screen.into();
118
119 // Establish connection by iterating over ConnectAddresses until we find one that
120 // works.
121 let mut error = None;
122 for addr in parsed_display.connect_instruction() {
123 let start = Instant::now();
124 match DefaultStream::connect(&addr) {
125 Ok((stream, (family, address))) => {
126 crate::trace!(
127 "Connected to X11 server via {:?} in {:?}",
128 addr,
129 start.elapsed()
130 );
131
132 // we found a stream, get auth information
133 let (auth_name, auth_data) = get_auth(family, &address, parsed_display.display)
134 // Ignore all errors while determining auth; instead we just try without auth info.
135 .unwrap_or(None)
136 .unwrap_or_else(|| (Vec::new(), Vec::new()));
137 crate::trace!("Picked authentication via auth mechanism {:?}", auth_name);
138
139 // finish connecting to server
140 return Ok((
141 Self::connect_to_stream_with_auth_info(
142 stream, screen, auth_name, auth_data,
143 )?,
144 screen,
145 ));
146 }
147 Err(e) => {
148 crate::debug!("Failed to connect to X11 server via {:?}: {:?}", addr, e);
149 error = Some(e);
150 continue;
151 }
152 }
153 }
154
155 // none of the addresses worked
156 Err(match error {
157 Some(e) => ConnectError::IoError(e),
158 None => DisplayParsingError::Unknown.into(),
159 })
160 }
161}
162
163impl<S: Stream> RustConnection<S> {
164 /// Establish a new connection to the given streams.
165 ///
166 /// `read` is used for reading data from the X11 server and `write` is used for writing.
167 /// `screen` is the number of the screen that should be used. This function checks that a
168 /// screen with that number exists.
169 pub fn connect_to_stream(stream: S, screen: usize) -> Result<Self, ConnectError> {
170 Self::connect_to_stream_with_auth_info(stream, screen, Vec::new(), Vec::new())
171 }
172
173 /// Establish a new connection to the given streams.
174 ///
175 /// `read` is used for reading data from the X11 server and `write` is used for writing.
176 /// `screen` is the number of the screen that should be used. This function checks that a
177 /// screen with that number exists.
178 ///
179 /// The parameters `auth_name` and `auth_data` are used for the members
180 /// `authorization_protocol_name` and `authorization_protocol_data` of the `SetupRequest` that
181 /// is sent to the X11 server.
182 pub fn connect_to_stream_with_auth_info(
183 stream: S,
184 screen: usize,
185 auth_name: Vec<u8>,
186 auth_data: Vec<u8>,
187 ) -> Result<Self, ConnectError> {
188 let (mut connect, setup_request) = Connect::with_authorization(auth_name, auth_data);
189
190 // write the connect() setup request
191 let mut nwritten = 0;
192 let mut fds = vec![];
193
194 crate::trace!(
195 "Writing connection setup with {} bytes",
196 setup_request.len()
197 );
198 while nwritten != setup_request.len() {
199 stream.poll(PollMode::Writable)?;
200 // poll returned successfully, so the stream is writable.
201 match stream.write(&setup_request[nwritten..], &mut fds) {
202 Ok(0) => {
203 return Err(std::io::Error::new(
204 std::io::ErrorKind::WriteZero,
205 "failed to write whole buffer",
206 )
207 .into())
208 }
209 Ok(n) => nwritten += n,
210 // Spurious wakeup from poll, try again
211 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
212 Err(e) => return Err(e.into()),
213 }
214 }
215
216 // read in the setup
217 loop {
218 stream.poll(PollMode::Readable)?;
219 crate::trace!(
220 "Reading connection setup with at least {} bytes remaining",
221 connect.buffer().len()
222 );
223 let adv = match stream.read(connect.buffer(), &mut fds) {
224 Ok(0) => {
225 return Err(std::io::Error::new(
226 std::io::ErrorKind::UnexpectedEof,
227 "failed to read whole buffer",
228 )
229 .into())
230 }
231 Ok(n) => n,
232 // Spurious wakeup from poll, try again
233 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
234 Err(e) => return Err(e.into()),
235 };
236 crate::trace!("Read {} bytes", adv);
237
238 // advance the internal buffer
239 if connect.advance(adv) {
240 break;
241 }
242 }
243
244 // resolve the setup
245 let setup = connect.into_setup()?;
246
247 // Check that we got a valid screen number
248 if screen >= setup.roots.len() {
249 return Err(ConnectError::InvalidScreen);
250 }
251
252 // Success! Set up our state
253 Self::for_connected_stream(stream, setup)
254 }
255
256 /// Establish a new connection for an already connected stream.
257 ///
258 /// The given `stream` is used for communicating with the X11 server.
259 /// It is assumed that `setup` was just received from the server. Thus, the first reply to a
260 /// request that is sent will have sequence number one.
261 pub fn for_connected_stream(stream: S, setup: Setup) -> Result<Self, ConnectError> {
262 let id_allocator = IdAllocator::new(setup.resource_id_base, setup.resource_id_mask)?;
263
264 Ok(RustConnection {
265 inner: Mutex::new(ConnectionInner {
266 inner: ProtoConnection::new(),
267 write_buffer: WriteBuffer::new(),
268 }),
269 stream,
270 packet_reader: Mutex::new(PacketReader::new()),
271 reader_condition: Condvar::new(),
272 setup,
273 extension_manager: Default::default(),
274 maximum_request_bytes: Mutex::new(MaxRequestBytes::Unknown),
275 id_allocator: Mutex::new(id_allocator),
276 })
277 }
278
279 /// Internal function for actually sending a request.
280 ///
281 /// This function "does the actual work" for `send_request_with_reply()` and
282 /// `send_request_without_reply()`.
283 fn send_request(
284 &self,
285 bufs: &[IoSlice<'_>],
286 fds: Vec<RawFdContainer>,
287 kind: ReplyFdKind,
288 ) -> Result<SequenceNumber, ConnectionError> {
289 let _guard = crate::debug_span!("send_request").entered();
290
291 let request_info = RequestInfo {
292 extension_manager: &self.extension_manager,
293 major_opcode: bufs[0][0],
294 minor_opcode: bufs[0][1],
295 };
296 crate::debug!("Sending {}", request_info);
297
298 let mut storage = Default::default();
299 let bufs = compute_length_field(self, bufs, &mut storage)?;
300
301 // Note: `inner` must be kept blocked until the request has been completely written
302 // or buffered to avoid sending the data of different requests interleaved. For this
303 // reason, `read_packet_and_enqueue` must always be called with `BlockingMode::NonBlocking`
304 // during a write, otherwise `inner` would be temporarily released.
305 let mut inner = self.inner.lock().unwrap();
306
307 loop {
308 let send_result = inner.inner.send_request(kind);
309 match send_result {
310 Some(seqno) => {
311 // Now actually send the buffers
312 let _inner = self.write_all_vectored(inner, bufs, fds)?;
313 return Ok(seqno);
314 }
315 None => {
316 crate::trace!("Syncing with the X11 server since there are too many outstanding void requests");
317 inner = self.send_sync(inner)?;
318 }
319 }
320 }
321 }
322
323 /// Send a synchronisation packet to the X11 server.
324 ///
325 /// This function sends a `GetInputFocus` request to the X11 server and arranges for its reply
326 /// to be ignored. This ensures that a reply is expected (`ConnectionInner.next_reply_expected`
327 /// increases).
328 fn send_sync<'a>(
329 &'a self,
330 mut inner: MutexGuardInner<'a>,
331 ) -> Result<MutexGuardInner<'a>, std::io::Error> {
332 let length = 1u16.to_ne_bytes();
333 let request = [
334 GET_INPUT_FOCUS_REQUEST,
335 0, /* pad */
336 length[0],
337 length[1],
338 ];
339
340 let seqno = inner
341 .inner
342 .send_request(ReplyFdKind::ReplyWithoutFDs)
343 .expect("Sending a HasResponse request should not be blocked by syncs");
344 inner
345 .inner
346 .discard_reply(seqno, DiscardMode::DiscardReplyAndError);
347 let inner = self.write_all_vectored(inner, &[IoSlice::new(&request)], Vec::new())?;
348
349 Ok(inner)
350 }
351
352 /// Write a set of buffers on a `writer`. May also read packets
353 /// from the server.
354 fn write_all_vectored<'a>(
355 &'a self,
356 mut inner: MutexGuardInner<'a>,
357 mut bufs: &[IoSlice<'_>],
358 mut fds: Vec<RawFdContainer>,
359 ) -> std::io::Result<MutexGuardInner<'a>> {
360 let mut partial_buf: &[u8] = &[];
361 while !partial_buf.is_empty() || !bufs.is_empty() {
362 self.stream.poll(PollMode::ReadAndWritable)?;
363 let write_result = if !partial_buf.is_empty() {
364 // "inner" is held, passed into this function, so this should never be held
365 inner
366 .write_buffer
367 .write(&self.stream, partial_buf, &mut fds)
368 } else {
369 // same as above
370 inner
371 .write_buffer
372 .write_vectored(&self.stream, bufs, &mut fds)
373 };
374 match write_result {
375 Ok(0) => {
376 return Err(std::io::Error::new(
377 std::io::ErrorKind::WriteZero,
378 "failed to write anything",
379 ));
380 }
381 Ok(mut count) => {
382 // Successful write
383 if count >= partial_buf.len() {
384 count -= partial_buf.len();
385 partial_buf = &[];
386 } else {
387 partial_buf = &partial_buf[count..];
388 count = 0;
389 }
390 while count > 0 {
391 if count >= bufs[0].len() {
392 count -= bufs[0].len();
393 } else {
394 partial_buf = &bufs[0][count..];
395 count = 0;
396 }
397 bufs = &bufs[1..];
398 // Skip empty slices
399 while bufs.first().map(|s| s.len()) == Some(0) {
400 bufs = &bufs[1..];
401 }
402 }
403 }
404 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
405 crate::trace!("Writing more data would block for now");
406 // Writing would block, try to read instead because the
407 // server might not accept new requests after its
408 // buffered replies have been read.
409 inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?;
410 }
411 Err(e) => return Err(e),
412 }
413 }
414 if !fds.is_empty() {
415 return Err(std::io::Error::new(
416 std::io::ErrorKind::Other,
417 "Left over FDs after sending the request",
418 ));
419 }
420 Ok(inner)
421 }
422
423 fn flush_impl<'a>(
424 &'a self,
425 mut inner: MutexGuardInner<'a>,
426 ) -> std::io::Result<MutexGuardInner<'a>> {
427 // n.b. notgull: inner guard is held
428 while inner.write_buffer.needs_flush() {
429 self.stream.poll(PollMode::ReadAndWritable)?;
430 let flush_result = inner.write_buffer.flush(&self.stream);
431 match flush_result {
432 // Flush completed
433 Ok(()) => break,
434 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
435 crate::trace!("Flushing more data would block for now");
436 // Writing would block, try to read instead because the
437 // server might not accept new requests after its
438 // buffered replies have been read.
439 inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?;
440 }
441 Err(e) => return Err(e),
442 }
443 }
444 Ok(inner)
445 }
446
447 /// Read a packet from the connection.
448 ///
449 /// This function waits for an X11 packet to be received. It drops the mutex protecting the
450 /// inner data while waiting for a packet so that other threads can make progress. For this
451 /// reason, you need to pass in a `MutexGuard` to be dropped. This function locks the mutex
452 /// again and returns a new `MutexGuard`.
453 ///
454 /// Note: If `mode` is `BlockingMode::Blocking`, the lock on `inner` will be temporarily
455 /// released. While sending a request, `inner` must be kept locked to avoid sending the data
456 /// of different requests interleaved. So, when `read_packet_and_enqueue` is called as part
457 /// of a write, it must always be done with `mode` set to `BlockingMode::NonBlocking`.
458 fn read_packet_and_enqueue<'a>(
459 &'a self,
460 mut inner: MutexGuardInner<'a>,
461 mode: BlockingMode,
462 ) -> Result<MutexGuardInner<'a>, std::io::Error> {
463 // 0.1. Try to lock the `packet_reader` mutex.
464 match self.packet_reader.try_lock() {
465 Err(TryLockError::WouldBlock) => {
466 // In non-blocking mode, we just return immediately
467 match mode {
468 BlockingMode::NonBlocking => {
469 crate::trace!("read_packet_and_enqueue in NonBlocking mode doing nothing since reader is already locked");
470 return Ok(inner);
471 }
472 BlockingMode::Blocking => {
473 crate::trace!("read_packet_and_enqueue in Blocking mode waiting for pre-existing reader");
474 }
475 }
476
477 // 1.1. Someone else is reading (other thread is at 2.2);
478 // wait for it. `Condvar::wait` will unlock `inner`, so
479 // the other thread can relock `inner` at 2.1.3 (and to allow
480 // other threads to arrive 0.1).
481 //
482 // When `wait` finishes, other thread has enqueued a packet,
483 // so the purpose of this function has been fulfilled. `wait`
484 // will relock `inner` when it returns.
485 Ok(self.reader_condition.wait(inner).unwrap())
486 }
487 Err(TryLockError::Poisoned(e)) => panic!("{}", e),
488 Ok(mut packet_reader) => {
489 // Make sure sleeping readers are woken up when we return
490 // (Even in case of errors)
491 let notify_on_drop = NotifyOnDrop(&self.reader_condition);
492
493 // 2.1. Poll for read if mode is blocking.
494 if mode == BlockingMode::Blocking {
495 // 2.1.1. Unlock `inner`, so other threads can use it while
496 // during the poll.
497 drop(inner);
498 // 2.1.2. Do the actual poll
499 self.stream.poll(PollMode::Readable)?;
500 // 2.1.3. Relock inner
501 inner = self.inner.lock().unwrap();
502 }
503
504 // 2.2. Try to read as many packets as possible without blocking.
505 let mut fds = Vec::new();
506 let mut packets = Vec::new();
507 packet_reader.try_read_packets(&self.stream, &mut packets, &mut fds)?;
508
509 // 2.3. Once `inner` has been relocked, drop the
510 // lock on `packet_reader`. While inner is locked, other
511 // threads cannot arrive at 0.1 anyways.
512 //
513 // `packet_reader` must be unlocked with `inner` is locked,
514 // otherwise it could let another thread wait on 2.1
515 // for a reply that has been read but not enqueued yet.
516 drop(packet_reader);
517
518 // 2.4. Actually enqueue the read packets.
519 inner.inner.enqueue_fds(fds);
520 packets
521 .into_iter()
522 .for_each(|packet| inner.inner.enqueue_packet(packet));
523
524 // 2.5. Notify the condvar by dropping the `notify_on_drop` object.
525 // The object would have been dropped when the function returns, so
526 // the explicit drop is not really needed. The purpose of having a
527 // explicit drop is to... make it explicit.
528 drop(notify_on_drop);
529
530 // 2.6. Return the locked `inner` back to the caller.
531 Ok(inner)
532 }
533 }
534 }
535
536 fn prefetch_maximum_request_bytes_impl(&self, max_bytes: &mut MutexGuard<'_, MaxRequestBytes>) {
537 if let MaxRequestBytes::Unknown = **max_bytes {
538 crate::info!("Prefetching maximum request length");
539 let request = self
540 .bigreq_enable()
541 .map(|cookie| cookie.into_sequence_number())
542 .ok();
543 **max_bytes = MaxRequestBytes::Requested(request);
544 }
545 }
546
547 /// Returns a reference to the contained stream.
548 pub fn stream(&self) -> &S {
549 &self.stream
550 }
551}
552
553impl<S: Stream> RequestConnection for RustConnection<S> {
554 type Buf = Vec<u8>;
555
556 fn send_request_with_reply<Reply>(
557 &self,
558 bufs: &[IoSlice<'_>],
559 fds: Vec<RawFdContainer>,
560 ) -> Result<Cookie<'_, Self, Reply>, ConnectionError>
561 where
562 Reply: TryParse,
563 {
564 Ok(Cookie::new(
565 self,
566 self.send_request(bufs, fds, ReplyFdKind::ReplyWithoutFDs)?,
567 ))
568 }
569
570 fn send_request_with_reply_with_fds<Reply>(
571 &self,
572 bufs: &[IoSlice<'_>],
573 fds: Vec<RawFdContainer>,
574 ) -> Result<CookieWithFds<'_, Self, Reply>, ConnectionError>
575 where
576 Reply: TryParseFd,
577 {
578 Ok(CookieWithFds::new(
579 self,
580 self.send_request(bufs, fds, ReplyFdKind::ReplyWithFDs)?,
581 ))
582 }
583
584 fn send_request_without_reply(
585 &self,
586 bufs: &[IoSlice<'_>],
587 fds: Vec<RawFdContainer>,
588 ) -> Result<VoidCookie<'_, Self>, ConnectionError> {
589 Ok(VoidCookie::new(
590 self,
591 self.send_request(bufs, fds, ReplyFdKind::NoReply)?,
592 ))
593 }
594
595 fn discard_reply(&self, sequence: SequenceNumber, _kind: RequestKind, mode: DiscardMode) {
596 crate::debug!(
597 "Discarding reply to request {} in mode {:?}",
598 sequence,
599 mode
600 );
601 self.inner
602 .lock()
603 .unwrap()
604 .inner
605 .discard_reply(sequence, mode);
606 }
607
608 fn prefetch_extension_information(
609 &self,
610 extension_name: &'static str,
611 ) -> Result<(), ConnectionError> {
612 self.extension_manager
613 .lock()
614 .unwrap()
615 .prefetch_extension_information(self, extension_name)
616 }
617
618 fn extension_information(
619 &self,
620 extension_name: &'static str,
621 ) -> Result<Option<ExtensionInformation>, ConnectionError> {
622 self.extension_manager
623 .lock()
624 .unwrap()
625 .extension_information(self, extension_name)
626 }
627
628 fn wait_for_reply_or_raw_error(
629 &self,
630 sequence: SequenceNumber,
631 ) -> Result<ReplyOrError<Vec<u8>>, ConnectionError> {
632 match self.wait_for_reply_with_fds_raw(sequence)? {
633 ReplyOrError::Reply((reply, _fds)) => Ok(ReplyOrError::Reply(reply)),
634 ReplyOrError::Error(e) => Ok(ReplyOrError::Error(e)),
635 }
636 }
637
638 fn wait_for_reply(&self, sequence: SequenceNumber) -> Result<Option<Vec<u8>>, ConnectionError> {
639 let _guard = crate::debug_span!("wait_for_reply", sequence).entered();
640
641 let mut inner = self.inner.lock().unwrap();
642 inner = self.flush_impl(inner)?;
643 loop {
644 crate::trace!({ sequence }, "Polling for reply");
645 let poll_result = inner.inner.poll_for_reply(sequence);
646 match poll_result {
647 PollReply::TryAgain => {}
648 PollReply::NoReply => return Ok(None),
649 PollReply::Reply(buffer) => return Ok(Some(buffer)),
650 }
651 inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
652 }
653 }
654
655 fn check_for_raw_error(
656 &self,
657 sequence: SequenceNumber,
658 ) -> Result<Option<Buffer>, ConnectionError> {
659 let _guard = crate::debug_span!("check_for_raw_error", sequence).entered();
660
661 let mut inner = self.inner.lock().unwrap();
662 if inner.inner.prepare_check_for_reply_or_error(sequence) {
663 crate::trace!("Inserting sync with the X11 server");
664 inner = self.send_sync(inner)?;
665 assert!(!inner.inner.prepare_check_for_reply_or_error(sequence));
666 }
667 // Ensure the request is sent
668 inner = self.flush_impl(inner)?;
669 loop {
670 crate::trace!({ sequence }, "Polling for reply or error");
671 let poll_result = inner.inner.poll_check_for_reply_or_error(sequence);
672 match poll_result {
673 PollReply::TryAgain => {}
674 PollReply::NoReply => return Ok(None),
675 PollReply::Reply(buffer) => return Ok(Some(buffer)),
676 }
677 inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
678 }
679 }
680
681 fn wait_for_reply_with_fds_raw(
682 &self,
683 sequence: SequenceNumber,
684 ) -> Result<ReplyOrError<BufWithFds, Buffer>, ConnectionError> {
685 let _guard = crate::debug_span!("wait_for_reply_with_fds_raw", sequence).entered();
686
687 let mut inner = self.inner.lock().unwrap();
688 // Ensure the request is sent
689 inner = self.flush_impl(inner)?;
690 loop {
691 crate::trace!({ sequence }, "Polling for reply or error");
692 if let Some(reply) = inner.inner.poll_for_reply_or_error(sequence) {
693 if reply.0[0] == 0 {
694 crate::trace!("Got error");
695 return Ok(ReplyOrError::Error(reply.0));
696 } else {
697 crate::trace!("Got reply");
698 return Ok(ReplyOrError::Reply(reply));
699 }
700 }
701 inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
702 }
703 }
704
705 fn maximum_request_bytes(&self) -> usize {
706 let mut max_bytes = self.maximum_request_bytes.lock().unwrap();
707 self.prefetch_maximum_request_bytes_impl(&mut max_bytes);
708 use MaxRequestBytes::*;
709 let max_bytes = &mut *max_bytes;
710 match max_bytes {
711 Unknown => unreachable!("We just prefetched this"),
712 Requested(seqno) => {
713 let _guard = crate::info_span!("maximum_request_bytes").entered();
714
715 let length = seqno
716 // If prefetching the request succeeded, get a cookie
717 .and_then(|seqno| {
718 Cookie::<_, EnableReply>::new(self, seqno)
719 // and then get the reply to the request
720 .reply()
721 .map(|reply| reply.maximum_request_length)
722 .ok()
723 })
724 // If anything failed (sending the request, getting the reply), use Setup
725 .unwrap_or_else(|| self.setup.maximum_request_length.into())
726 // Turn the u32 into usize, using the max value in case of overflow
727 .try_into()
728 .unwrap_or(usize::max_value());
729 let length = length * 4;
730 *max_bytes = Known(length);
731 crate::info!("Maximum request length is {} bytes", length);
732 length
733 }
734 Known(length) => *length,
735 }
736 }
737
738 fn prefetch_maximum_request_bytes(&self) {
739 let mut max_bytes = self.maximum_request_bytes.lock().unwrap();
740 self.prefetch_maximum_request_bytes_impl(&mut max_bytes);
741 }
742
743 fn parse_error(&self, error: &[u8]) -> Result<crate::x11_utils::X11Error, ParseError> {
744 let ext_mgr = self.extension_manager.lock().unwrap();
745 crate::x11_utils::X11Error::try_parse(error, &*ext_mgr)
746 }
747
748 fn parse_event(&self, event: &[u8]) -> Result<crate::protocol::Event, ParseError> {
749 let ext_mgr = self.extension_manager.lock().unwrap();
750 crate::protocol::Event::parse(event, &*ext_mgr)
751 }
752}
753
754impl<S: Stream> Connection for RustConnection<S> {
755 fn wait_for_raw_event_with_sequence(
756 &self,
757 ) -> Result<RawEventAndSeqNumber<Vec<u8>>, ConnectionError> {
758 let _guard = crate::trace_span!("wait_for_raw_event_with_sequence").entered();
759
760 let mut inner = self.inner.lock().unwrap();
761 loop {
762 if let Some(event) = inner.inner.poll_for_event_with_sequence() {
763 return Ok(event);
764 }
765 inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?;
766 }
767 }
768
769 fn poll_for_raw_event_with_sequence(
770 &self,
771 ) -> Result<Option<RawEventAndSeqNumber<Vec<u8>>>, ConnectionError> {
772 let _guard = crate::trace_span!("poll_for_raw_event_with_sequence").entered();
773
774 let mut inner = self.inner.lock().unwrap();
775 if let Some(event) = inner.inner.poll_for_event_with_sequence() {
776 Ok(Some(event))
777 } else {
778 inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?;
779 Ok(inner.inner.poll_for_event_with_sequence())
780 }
781 }
782
783 fn flush(&self) -> Result<(), ConnectionError> {
784 let inner = self.inner.lock().unwrap();
785 let _inner = self.flush_impl(inner)?;
786 Ok(())
787 }
788
789 fn setup(&self) -> &Setup {
790 &self.setup
791 }
792
793 fn generate_id(&self) -> Result<u32, ReplyOrIdError> {
794 let mut id_allocator = self.id_allocator.lock().unwrap();
795 if let Some(id) = id_allocator.generate_id() {
796 Ok(id)
797 } else {
798 use crate::protocol::xc_misc::{self, ConnectionExt as _};
799
800 if self
801 .extension_information(xc_misc::X11_EXTENSION_NAME)?
802 .is_none()
803 {
804 crate::error!("XIDs are exhausted and XC-MISC extension is not available");
805 Err(ReplyOrIdError::IdsExhausted)
806 } else {
807 crate::info!("XIDs are exhausted; fetching free range via XC-MISC");
808 id_allocator.update_xid_range(&self.xc_misc_get_xid_range()?.reply()?)?;
809 id_allocator
810 .generate_id()
811 .ok_or(ReplyOrIdError::IdsExhausted)
812 }
813 }
814 }
815}
816
817/// Call `notify_all` on a condition variable when dropped.
818#[derive(Debug)]
819struct NotifyOnDrop<'a>(&'a Condvar);
820
821impl Drop for NotifyOnDrop<'_> {
822 fn drop(&mut self) {
823 self.0.notify_all();
824 }
825}
826
827/// Format information about a request in a Display impl
828struct RequestInfo<'a> {
829 extension_manager: &'a Mutex<ExtensionManager>,
830 major_opcode: u8,
831 minor_opcode: u8,
832}
833
834impl std::fmt::Display for RequestInfo<'_> {
835 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
836 // QueryExtension is used by the extension manager. We would deadlock if we
837 // tried to lock it again. Hence, this case is hardcoded here.
838 if self.major_opcode == QUERY_EXTENSION_REQUEST {
839 write!(f, "QueryExtension request")
840 } else {
841 let guard: MutexGuard<'_, ExtensionManager> = self.extension_manager.lock().unwrap();
842 write!(
843 f,
844 "{} request",
845 x11rb_protocol::protocol::get_request_name(
846 &*guard,
847 self.major_opcode,
848 self.minor_opcode
849 )
850 )
851 }
852 }
853}
854