1 | //! A pure-rust implementation of a connection to an X11 server. |
2 | |
3 | use std::convert::TryInto; |
4 | use std::io::IoSlice; |
5 | use std::mem::drop; |
6 | use std::sync::{Condvar, Mutex, MutexGuard, TryLockError}; |
7 | use std::time::Instant; |
8 | |
9 | use crate::connection::{ |
10 | compute_length_field, Connection, ReplyOrError, RequestConnection, RequestKind, |
11 | }; |
12 | use crate::cookie::{Cookie, CookieWithFds, VoidCookie}; |
13 | use crate::errors::DisplayParsingError; |
14 | pub use crate::errors::{ConnectError, ConnectionError, ParseError, ReplyError, ReplyOrIdError}; |
15 | use crate::extension_manager::ExtensionManager; |
16 | use crate::protocol::bigreq::{ConnectionExt as _, EnableReply}; |
17 | use crate::protocol::xproto::{Setup, GET_INPUT_FOCUS_REQUEST, QUERY_EXTENSION_REQUEST}; |
18 | use crate::utils::RawFdContainer; |
19 | use crate::x11_utils::{ExtensionInformation, TryParse, TryParseFd}; |
20 | use x11rb_protocol::connect::Connect; |
21 | use x11rb_protocol::connection::{Connection as ProtoConnection, PollReply, ReplyFdKind}; |
22 | use x11rb_protocol::id_allocator::IdAllocator; |
23 | use x11rb_protocol::{xauth::get_auth, DiscardMode, RawEventAndSeqNumber, SequenceNumber}; |
24 | |
25 | mod packet_reader; |
26 | mod stream; |
27 | mod write_buffer; |
28 | |
29 | use packet_reader::PacketReader; |
30 | pub use stream::{DefaultStream, PollMode, Stream}; |
31 | use write_buffer::WriteBuffer; |
32 | |
33 | type Buffer = <RustConnection as RequestConnection>::Buf; |
34 | /// A combination of a buffer and a list of file descriptors for use by [`RustConnection`]. |
35 | pub type BufWithFds = crate::connection::BufWithFds<Buffer>; |
36 | |
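/// What is known about the server's maximum request length.
///
/// `Requested` holds the sequence number of an in-flight BigRequests `Enable`
/// request, or `None` if sending that request failed. `Known` caches the
/// resolved value in bytes.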
#[derive(Debug)]
38 | enum MaxRequestBytes { |
39 | Unknown, |
40 | Requested(Option<SequenceNumber>), |
41 | Known(usize), |
42 | } |
43 | |
#[derive(Debug)]
45 | struct ConnectionInner { |
46 | inner: ProtoConnection, |
47 | write_buffer: WriteBuffer, |
48 | } |
49 | |
50 | type MutexGuardInner<'a> = MutexGuard<'a, ConnectionInner>; |
51 | |
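/// Whether an operation should block until it can make progress or return
/// immediately.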
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
53 | pub(crate) enum BlockingMode { |
54 | Blocking, |
55 | NonBlocking, |
56 | } |
57 | |
/// A connection to an X11 server implemented in pure Rust
///
/// This type is generic over `S`, which allows using a generic stream to communicate with the
/// server. This stream can be written to and read from, but it can also be polled, meaning that
/// one checks if new data can be read or written.
///
/// `RustConnection` always uses an internal buffer for reading, so `S` does not need
/// to be buffered.
#[derive(Debug)]
67 | pub struct RustConnection<S: Stream = DefaultStream> { |
68 | inner: Mutex<ConnectionInner>, |
69 | stream: S, |
    // This mutex is only locked with `try_lock` (never blocks), so a simpler
    // lock based only on an atomic variable would be more efficient.
72 | packet_reader: Mutex<PacketReader>, |
73 | reader_condition: Condvar, |
74 | setup: Setup, |
75 | extension_manager: Mutex<ExtensionManager>, |
76 | maximum_request_bytes: Mutex<MaxRequestBytes>, |
77 | id_allocator: Mutex<IdAllocator>, |
78 | } |
79 | |
80 | // Locking rules |
81 | // ============= |
82 | // |
83 | // To avoid deadlocks, it is important to have a defined ordering about mutexes: |
84 | // |
85 | // Mutexes that may be locked when no other mutex is held: |
86 | // - maximum_request_bytes |
87 | // - extension_manager |
88 | // - id_allocator |
89 | // |
90 | // Then comes `inner`. This mutex protects the information about in-flight requests and packets |
91 | // that were already read from the connection but not given out to callers. This mutex also |
92 | // contains the write buffer and has to be locked in order to write something to the X11 server. |
93 | // In this case, the mutex has to be kept locked until writing the request has finished. This is |
94 | // necessary to ensure correct sync insertion without threads interfering with each other. When |
95 | // this mutex is locked for operations other than writing, the lock should be kept only for a |
96 | // short time. |
97 | // |
98 | // The inner level is `packet_reader`. This mutex is only locked when `inner` is already held and |
99 | // only with `try_lock()`. This ensures that there is only one reader. While actually reading, the |
100 | // lock on `inner` is released so that other threads can make progress. If more threads want to |
// read while `packet_reader` is already locked, they sleep on `reader_condition`. The actual
// reader will then notify this condition variable once it is done reading.
103 | // |
104 | // n.b. notgull: write_buffer follows the same rules |
105 | // |
106 | // The condition variable is necessary since one thread may read packets that another thread waits |
107 | // for. Thus, after reading something from the connection, all threads that wait for something have |
108 | // to check if they are the intended recipient. |
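//
// As an illustration (a sketch, not a verbatim code path), sending a request
// roughly follows this locking pattern:
//
//     let inner = self.inner.lock().unwrap(); // protocol state + write buffer
//     // ...write all request bytes while `inner` stays locked...
//     // If writing would block, read_packet_and_enqueue(inner,
//     // BlockingMode::NonBlocking) is called, which only try_lock()s
//     // `packet_reader` and never waits for it.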
109 | |
110 | impl RustConnection<DefaultStream> { |
111 | /// Establish a new connection. |
112 | /// |
113 | /// If no `dpy_name` is provided, the value from `$DISPLAY` is used. |
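    ///
    /// A minimal usage sketch (assuming this module is reachable as
    /// `x11rb::rust_connection`, as in the `x11rb` crate):
    ///
    /// ```no_run
    /// use x11rb::rust_connection::RustConnection;
    ///
    /// // Connect to the server named by the $DISPLAY environment variable.
    /// let (conn, screen_num) = RustConnection::connect(None)?;
    /// # Ok::<(), x11rb::errors::ConnectError>(())
    /// ```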
114 | pub fn connect(dpy_name: Option<&str>) -> Result<(Self, usize), ConnectError> { |
115 | // Parse display information |
116 | let parsed_display = x11rb_protocol::parse_display::parse_display(dpy_name)?; |
117 | let screen = parsed_display.screen.into(); |
118 | |
119 | // Establish connection by iterating over ConnectAddresses until we find one that |
120 | // works. |
121 | let mut error = None; |
122 | for addr in parsed_display.connect_instruction() { |
123 | let start = Instant::now(); |
124 | match DefaultStream::connect(&addr) { |
125 | Ok((stream, (family, address))) => { |
126 | crate::trace!( |
127 | "Connected to X11 server via {:?} in {:?}" , |
128 | addr, |
129 | start.elapsed() |
130 | ); |
131 | |
132 | // we found a stream, get auth information |
133 | let (auth_name, auth_data) = get_auth(family, &address, parsed_display.display) |
134 | // Ignore all errors while determining auth; instead we just try without auth info. |
135 | .unwrap_or(None) |
136 | .unwrap_or_else(|| (Vec::new(), Vec::new())); |
137 | crate::trace!("Picked authentication via auth mechanism {:?}" , auth_name); |
138 | |
139 | // finish connecting to server |
140 | return Ok(( |
141 | Self::connect_to_stream_with_auth_info( |
142 | stream, screen, auth_name, auth_data, |
143 | )?, |
144 | screen, |
145 | )); |
146 | } |
147 | Err(e) => { |
148 | crate::debug!("Failed to connect to X11 server via {:?}: {:?}" , addr, e); |
149 | error = Some(e); |
150 | continue; |
151 | } |
152 | } |
153 | } |
154 | |
155 | // none of the addresses worked |
156 | Err(match error { |
157 | Some(e) => ConnectError::IoError(e), |
158 | None => DisplayParsingError::Unknown.into(), |
159 | }) |
160 | } |
161 | } |
162 | |
163 | impl<S: Stream> RustConnection<S> { |
    /// Establish a new connection on the given stream.
    ///
    /// `stream` is used for reading data from and writing data to the X11 server.
    /// `screen` is the number of the screen that should be used. This function checks that a
    /// screen with that number exists.
169 | pub fn connect_to_stream(stream: S, screen: usize) -> Result<Self, ConnectError> { |
170 | Self::connect_to_stream_with_auth_info(stream, screen, Vec::new(), Vec::new()) |
171 | } |
172 | |
    /// Establish a new connection on the given stream.
    ///
    /// `stream` is used for reading data from and writing data to the X11 server.
    /// `screen` is the number of the screen that should be used. This function checks that a
    /// screen with that number exists.
178 | /// |
179 | /// The parameters `auth_name` and `auth_data` are used for the members |
180 | /// `authorization_protocol_name` and `authorization_protocol_data` of the `SetupRequest` that |
181 | /// is sent to the X11 server. |
182 | pub fn connect_to_stream_with_auth_info( |
183 | stream: S, |
184 | screen: usize, |
185 | auth_name: Vec<u8>, |
186 | auth_data: Vec<u8>, |
187 | ) -> Result<Self, ConnectError> { |
188 | let (mut connect, setup_request) = Connect::with_authorization(auth_name, auth_data); |
189 | |
190 | // write the connect() setup request |
191 | let mut nwritten = 0; |
192 | let mut fds = vec![]; |
193 | |
194 | crate::trace!( |
195 | "Writing connection setup with {} bytes" , |
196 | setup_request.len() |
197 | ); |
198 | while nwritten != setup_request.len() { |
199 | stream.poll(PollMode::Writable)?; |
200 | // poll returned successfully, so the stream is writable. |
201 | match stream.write(&setup_request[nwritten..], &mut fds) { |
202 | Ok(0) => { |
203 | return Err(std::io::Error::new( |
204 | std::io::ErrorKind::WriteZero, |
205 | "failed to write whole buffer" , |
206 | ) |
207 | .into()) |
208 | } |
209 | Ok(n) => nwritten += n, |
210 | // Spurious wakeup from poll, try again |
211 | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {} |
212 | Err(e) => return Err(e.into()), |
213 | } |
214 | } |
215 | |
216 | // read in the setup |
217 | loop { |
218 | stream.poll(PollMode::Readable)?; |
219 | crate::trace!( |
220 | "Reading connection setup with at least {} bytes remaining" , |
221 | connect.buffer().len() |
222 | ); |
223 | let adv = match stream.read(connect.buffer(), &mut fds) { |
224 | Ok(0) => { |
225 | return Err(std::io::Error::new( |
226 | std::io::ErrorKind::UnexpectedEof, |
227 | "failed to read whole buffer" , |
228 | ) |
229 | .into()) |
230 | } |
231 | Ok(n) => n, |
232 | // Spurious wakeup from poll, try again |
233 | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, |
234 | Err(e) => return Err(e.into()), |
235 | }; |
236 | crate::trace!("Read {} bytes" , adv); |
237 | |
238 | // advance the internal buffer |
239 | if connect.advance(adv) { |
240 | break; |
241 | } |
242 | } |
243 | |
244 | // resolve the setup |
245 | let setup = connect.into_setup()?; |
246 | |
247 | // Check that we got a valid screen number |
248 | if screen >= setup.roots.len() { |
249 | return Err(ConnectError::InvalidScreen); |
250 | } |
251 | |
252 | // Success! Set up our state |
253 | Self::for_connected_stream(stream, setup) |
254 | } |
255 | |
256 | /// Establish a new connection for an already connected stream. |
257 | /// |
258 | /// The given `stream` is used for communicating with the X11 server. |
259 | /// It is assumed that `setup` was just received from the server. Thus, the first reply to a |
260 | /// request that is sent will have sequence number one. |
261 | pub fn for_connected_stream(stream: S, setup: Setup) -> Result<Self, ConnectError> { |
262 | let id_allocator = IdAllocator::new(setup.resource_id_base, setup.resource_id_mask)?; |
263 | |
264 | Ok(RustConnection { |
265 | inner: Mutex::new(ConnectionInner { |
266 | inner: ProtoConnection::new(), |
267 | write_buffer: WriteBuffer::new(), |
268 | }), |
269 | stream, |
270 | packet_reader: Mutex::new(PacketReader::new()), |
271 | reader_condition: Condvar::new(), |
272 | setup, |
273 | extension_manager: Default::default(), |
274 | maximum_request_bytes: Mutex::new(MaxRequestBytes::Unknown), |
275 | id_allocator: Mutex::new(id_allocator), |
276 | }) |
277 | } |
278 | |
279 | /// Internal function for actually sending a request. |
280 | /// |
281 | /// This function "does the actual work" for `send_request_with_reply()` and |
282 | /// `send_request_without_reply()`. |
283 | fn send_request( |
284 | &self, |
285 | bufs: &[IoSlice<'_>], |
286 | fds: Vec<RawFdContainer>, |
287 | kind: ReplyFdKind, |
288 | ) -> Result<SequenceNumber, ConnectionError> { |
289 | let _guard = crate::debug_span!("send_request" ).entered(); |
290 | |
291 | let request_info = RequestInfo { |
292 | extension_manager: &self.extension_manager, |
293 | major_opcode: bufs[0][0], |
294 | minor_opcode: bufs[0][1], |
295 | }; |
296 | crate::debug!("Sending {}" , request_info); |
297 | |
298 | let mut storage = Default::default(); |
299 | let bufs = compute_length_field(self, bufs, &mut storage)?; |
300 | |
        // Note: `inner` must be kept locked until the request has been completely written
302 | // or buffered to avoid sending the data of different requests interleaved. For this |
303 | // reason, `read_packet_and_enqueue` must always be called with `BlockingMode::NonBlocking` |
304 | // during a write, otherwise `inner` would be temporarily released. |
305 | let mut inner = self.inner.lock().unwrap(); |
306 | |
307 | loop { |
308 | let send_result = inner.inner.send_request(kind); |
309 | match send_result { |
310 | Some(seqno) => { |
311 | // Now actually send the buffers |
312 | let _inner = self.write_all_vectored(inner, bufs, fds)?; |
313 | return Ok(seqno); |
314 | } |
315 | None => { |
316 | crate::trace!("Syncing with the X11 server since there are too many outstanding void requests" ); |
317 | inner = self.send_sync(inner)?; |
318 | } |
319 | } |
320 | } |
321 | } |
322 | |
323 | /// Send a synchronisation packet to the X11 server. |
324 | /// |
325 | /// This function sends a `GetInputFocus` request to the X11 server and arranges for its reply |
326 | /// to be ignored. This ensures that a reply is expected (`ConnectionInner.next_reply_expected` |
327 | /// increases). |
328 | fn send_sync<'a>( |
329 | &'a self, |
330 | mut inner: MutexGuardInner<'a>, |
331 | ) -> Result<MutexGuardInner<'a>, std::io::Error> { |
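        // Wire format of an X11 core request: one byte major opcode, one
        // request-specific data byte (padding here), then the total request
        // length in 4-byte units as a native-endian u16. GetInputFocus is the
        // smallest possible request: a single 4-byte unit.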
332 | let length = 1u16.to_ne_bytes(); |
333 | let request = [ |
334 | GET_INPUT_FOCUS_REQUEST, |
335 | 0, /* pad */ |
336 | length[0], |
337 | length[1], |
338 | ]; |
339 | |
340 | let seqno = inner |
341 | .inner |
342 | .send_request(ReplyFdKind::ReplyWithoutFDs) |
343 | .expect("Sending a HasResponse request should not be blocked by syncs" ); |
344 | inner |
345 | .inner |
346 | .discard_reply(seqno, DiscardMode::DiscardReplyAndError); |
347 | let inner = self.write_all_vectored(inner, &[IoSlice::new(&request)], Vec::new())?; |
348 | |
349 | Ok(inner) |
350 | } |
351 | |
    /// Write a set of buffers to the stream. May also read packets
    /// from the server.
354 | fn write_all_vectored<'a>( |
355 | &'a self, |
356 | mut inner: MutexGuardInner<'a>, |
357 | mut bufs: &[IoSlice<'_>], |
358 | mut fds: Vec<RawFdContainer>, |
359 | ) -> std::io::Result<MutexGuardInner<'a>> { |
360 | let mut partial_buf: &[u8] = &[]; |
361 | while !partial_buf.is_empty() || !bufs.is_empty() { |
362 | self.stream.poll(PollMode::ReadAndWritable)?; |
363 | let write_result = if !partial_buf.is_empty() { |
364 | // "inner" is held, passed into this function, so this should never be held |
365 | inner |
366 | .write_buffer |
367 | .write(&self.stream, partial_buf, &mut fds) |
368 | } else { |
369 | // same as above |
370 | inner |
371 | .write_buffer |
372 | .write_vectored(&self.stream, bufs, &mut fds) |
373 | }; |
374 | match write_result { |
375 | Ok(0) => { |
376 | return Err(std::io::Error::new( |
377 | std::io::ErrorKind::WriteZero, |
378 | "failed to write anything" , |
379 | )); |
380 | } |
381 | Ok(mut count) => { |
382 | // Successful write |
383 | if count >= partial_buf.len() { |
384 | count -= partial_buf.len(); |
385 | partial_buf = &[]; |
386 | } else { |
387 | partial_buf = &partial_buf[count..]; |
388 | count = 0; |
389 | } |
390 | while count > 0 { |
391 | if count >= bufs[0].len() { |
392 | count -= bufs[0].len(); |
393 | } else { |
394 | partial_buf = &bufs[0][count..]; |
395 | count = 0; |
396 | } |
397 | bufs = &bufs[1..]; |
398 | // Skip empty slices |
399 | while bufs.first().map(|s| s.len()) == Some(0) { |
400 | bufs = &bufs[1..]; |
401 | } |
402 | } |
403 | } |
404 | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { |
405 | crate::trace!("Writing more data would block for now" ); |
406 | // Writing would block, try to read instead because the |
407 | // server might not accept new requests after its |
408 | // buffered replies have been read. |
409 | inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?; |
410 | } |
411 | Err(e) => return Err(e), |
412 | } |
413 | } |
414 | if !fds.is_empty() { |
415 | return Err(std::io::Error::new( |
416 | std::io::ErrorKind::Other, |
417 | "Left over FDs after sending the request" , |
418 | )); |
419 | } |
420 | Ok(inner) |
421 | } |
422 | |
423 | fn flush_impl<'a>( |
424 | &'a self, |
425 | mut inner: MutexGuardInner<'a>, |
426 | ) -> std::io::Result<MutexGuardInner<'a>> { |
427 | // n.b. notgull: inner guard is held |
428 | while inner.write_buffer.needs_flush() { |
429 | self.stream.poll(PollMode::ReadAndWritable)?; |
430 | let flush_result = inner.write_buffer.flush(&self.stream); |
431 | match flush_result { |
432 | // Flush completed |
433 | Ok(()) => break, |
434 | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { |
435 | crate::trace!("Flushing more data would block for now" ); |
436 | // Writing would block, try to read instead because the |
437 | // server might not accept new requests after its |
438 | // buffered replies have been read. |
439 | inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?; |
440 | } |
441 | Err(e) => return Err(e), |
442 | } |
443 | } |
444 | Ok(inner) |
445 | } |
446 | |
447 | /// Read a packet from the connection. |
448 | /// |
449 | /// This function waits for an X11 packet to be received. It drops the mutex protecting the |
450 | /// inner data while waiting for a packet so that other threads can make progress. For this |
451 | /// reason, you need to pass in a `MutexGuard` to be dropped. This function locks the mutex |
452 | /// again and returns a new `MutexGuard`. |
453 | /// |
454 | /// Note: If `mode` is `BlockingMode::Blocking`, the lock on `inner` will be temporarily |
455 | /// released. While sending a request, `inner` must be kept locked to avoid sending the data |
456 | /// of different requests interleaved. So, when `read_packet_and_enqueue` is called as part |
457 | /// of a write, it must always be done with `mode` set to `BlockingMode::NonBlocking`. |
458 | fn read_packet_and_enqueue<'a>( |
459 | &'a self, |
460 | mut inner: MutexGuardInner<'a>, |
461 | mode: BlockingMode, |
462 | ) -> Result<MutexGuardInner<'a>, std::io::Error> { |
463 | // 0.1. Try to lock the `packet_reader` mutex. |
464 | match self.packet_reader.try_lock() { |
465 | Err(TryLockError::WouldBlock) => { |
466 | // In non-blocking mode, we just return immediately |
467 | match mode { |
468 | BlockingMode::NonBlocking => { |
469 | crate::trace!("read_packet_and_enqueue in NonBlocking mode doing nothing since reader is already locked" ); |
470 | return Ok(inner); |
471 | } |
472 | BlockingMode::Blocking => { |
473 | crate::trace!("read_packet_and_enqueue in Blocking mode waiting for pre-existing reader" ); |
474 | } |
475 | } |
476 | |
477 | // 1.1. Someone else is reading (other thread is at 2.2); |
478 | // wait for it. `Condvar::wait` will unlock `inner`, so |
                // the other thread can relock `inner` at 2.1.3 (and allow
                // other threads to arrive at 0.1).
                //
                // When `wait` finishes, the other thread has enqueued a packet,
483 | // so the purpose of this function has been fulfilled. `wait` |
484 | // will relock `inner` when it returns. |
485 | Ok(self.reader_condition.wait(inner).unwrap()) |
486 | } |
487 | Err(TryLockError::Poisoned(e)) => panic!("{}" , e), |
488 | Ok(mut packet_reader) => { |
489 | // Make sure sleeping readers are woken up when we return |
490 | // (Even in case of errors) |
491 | let notify_on_drop = NotifyOnDrop(&self.reader_condition); |
492 | |
493 | // 2.1. Poll for read if mode is blocking. |
494 | if mode == BlockingMode::Blocking { |
                    // 2.1.1. Unlock `inner`, so other threads can use it
                    // during the poll.
497 | drop(inner); |
498 | // 2.1.2. Do the actual poll |
499 | self.stream.poll(PollMode::Readable)?; |
500 | // 2.1.3. Relock inner |
501 | inner = self.inner.lock().unwrap(); |
502 | } |
503 | |
504 | // 2.2. Try to read as many packets as possible without blocking. |
505 | let mut fds = Vec::new(); |
506 | let mut packets = Vec::new(); |
507 | packet_reader.try_read_packets(&self.stream, &mut packets, &mut fds)?; |
508 | |
509 | // 2.3. Once `inner` has been relocked, drop the |
                // lock on `packet_reader`. While `inner` is locked, other
                // threads cannot arrive at 0.1 anyway.
512 | // |
                // `packet_reader` must be unlocked while `inner` is locked,
514 | // otherwise it could let another thread wait on 2.1 |
515 | // for a reply that has been read but not enqueued yet. |
516 | drop(packet_reader); |
517 | |
518 | // 2.4. Actually enqueue the read packets. |
519 | inner.inner.enqueue_fds(fds); |
520 | packets |
521 | .into_iter() |
522 | .for_each(|packet| inner.inner.enqueue_packet(packet)); |
523 | |
524 | // 2.5. Notify the condvar by dropping the `notify_on_drop` object. |
525 | // The object would have been dropped when the function returns, so |
                // the explicit drop is not really needed. The purpose of having an
                // explicit drop is to... make it explicit.
528 | drop(notify_on_drop); |
529 | |
530 | // 2.6. Return the locked `inner` back to the caller. |
531 | Ok(inner) |
532 | } |
533 | } |
534 | } |
535 | |
536 | fn prefetch_maximum_request_bytes_impl(&self, max_bytes: &mut MutexGuard<'_, MaxRequestBytes>) { |
537 | if let MaxRequestBytes::Unknown = **max_bytes { |
538 | crate::info!("Prefetching maximum request length" ); |
539 | let request = self |
540 | .bigreq_enable() |
541 | .map(|cookie| cookie.into_sequence_number()) |
542 | .ok(); |
543 | **max_bytes = MaxRequestBytes::Requested(request); |
544 | } |
545 | } |
546 | |
547 | /// Returns a reference to the contained stream. |
548 | pub fn stream(&self) -> &S { |
549 | &self.stream |
550 | } |
551 | } |
552 | |
553 | impl<S: Stream> RequestConnection for RustConnection<S> { |
554 | type Buf = Vec<u8>; |
555 | |
556 | fn send_request_with_reply<Reply>( |
557 | &self, |
558 | bufs: &[IoSlice<'_>], |
559 | fds: Vec<RawFdContainer>, |
560 | ) -> Result<Cookie<'_, Self, Reply>, ConnectionError> |
561 | where |
562 | Reply: TryParse, |
563 | { |
564 | Ok(Cookie::new( |
565 | self, |
566 | self.send_request(bufs, fds, ReplyFdKind::ReplyWithoutFDs)?, |
567 | )) |
568 | } |
569 | |
570 | fn send_request_with_reply_with_fds<Reply>( |
571 | &self, |
572 | bufs: &[IoSlice<'_>], |
573 | fds: Vec<RawFdContainer>, |
574 | ) -> Result<CookieWithFds<'_, Self, Reply>, ConnectionError> |
575 | where |
576 | Reply: TryParseFd, |
577 | { |
578 | Ok(CookieWithFds::new( |
579 | self, |
580 | self.send_request(bufs, fds, ReplyFdKind::ReplyWithFDs)?, |
581 | )) |
582 | } |
583 | |
584 | fn send_request_without_reply( |
585 | &self, |
586 | bufs: &[IoSlice<'_>], |
587 | fds: Vec<RawFdContainer>, |
588 | ) -> Result<VoidCookie<'_, Self>, ConnectionError> { |
589 | Ok(VoidCookie::new( |
590 | self, |
591 | self.send_request(bufs, fds, ReplyFdKind::NoReply)?, |
592 | )) |
593 | } |
594 | |
595 | fn discard_reply(&self, sequence: SequenceNumber, _kind: RequestKind, mode: DiscardMode) { |
596 | crate::debug!( |
597 | "Discarding reply to request {} in mode {:?}" , |
598 | sequence, |
599 | mode |
600 | ); |
601 | self.inner |
602 | .lock() |
603 | .unwrap() |
604 | .inner |
605 | .discard_reply(sequence, mode); |
606 | } |
607 | |
608 | fn prefetch_extension_information( |
609 | &self, |
610 | extension_name: &'static str, |
611 | ) -> Result<(), ConnectionError> { |
612 | self.extension_manager |
613 | .lock() |
614 | .unwrap() |
615 | .prefetch_extension_information(self, extension_name) |
616 | } |
617 | |
618 | fn extension_information( |
619 | &self, |
620 | extension_name: &'static str, |
621 | ) -> Result<Option<ExtensionInformation>, ConnectionError> { |
622 | self.extension_manager |
623 | .lock() |
624 | .unwrap() |
625 | .extension_information(self, extension_name) |
626 | } |
627 | |
628 | fn wait_for_reply_or_raw_error( |
629 | &self, |
630 | sequence: SequenceNumber, |
631 | ) -> Result<ReplyOrError<Vec<u8>>, ConnectionError> { |
632 | match self.wait_for_reply_with_fds_raw(sequence)? { |
633 | ReplyOrError::Reply((reply, _fds)) => Ok(ReplyOrError::Reply(reply)), |
634 | ReplyOrError::Error(e) => Ok(ReplyOrError::Error(e)), |
635 | } |
636 | } |
637 | |
638 | fn wait_for_reply(&self, sequence: SequenceNumber) -> Result<Option<Vec<u8>>, ConnectionError> { |
639 | let _guard = crate::debug_span!("wait_for_reply" , sequence).entered(); |
640 | |
641 | let mut inner = self.inner.lock().unwrap(); |
642 | inner = self.flush_impl(inner)?; |
643 | loop { |
644 | crate::trace!({ sequence }, "Polling for reply" ); |
645 | let poll_result = inner.inner.poll_for_reply(sequence); |
646 | match poll_result { |
647 | PollReply::TryAgain => {} |
648 | PollReply::NoReply => return Ok(None), |
649 | PollReply::Reply(buffer) => return Ok(Some(buffer)), |
650 | } |
651 | inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?; |
652 | } |
653 | } |
654 | |
655 | fn check_for_raw_error( |
656 | &self, |
657 | sequence: SequenceNumber, |
658 | ) -> Result<Option<Buffer>, ConnectionError> { |
659 | let _guard = crate::debug_span!("check_for_raw_error" , sequence).entered(); |
660 | |
661 | let mut inner = self.inner.lock().unwrap(); |
662 | if inner.inner.prepare_check_for_reply_or_error(sequence) { |
663 | crate::trace!("Inserting sync with the X11 server" ); |
664 | inner = self.send_sync(inner)?; |
665 | assert!(!inner.inner.prepare_check_for_reply_or_error(sequence)); |
666 | } |
667 | // Ensure the request is sent |
668 | inner = self.flush_impl(inner)?; |
669 | loop { |
670 | crate::trace!({ sequence }, "Polling for reply or error" ); |
671 | let poll_result = inner.inner.poll_check_for_reply_or_error(sequence); |
672 | match poll_result { |
673 | PollReply::TryAgain => {} |
674 | PollReply::NoReply => return Ok(None), |
675 | PollReply::Reply(buffer) => return Ok(Some(buffer)), |
676 | } |
677 | inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?; |
678 | } |
679 | } |
680 | |
681 | fn wait_for_reply_with_fds_raw( |
682 | &self, |
683 | sequence: SequenceNumber, |
684 | ) -> Result<ReplyOrError<BufWithFds, Buffer>, ConnectionError> { |
685 | let _guard = crate::debug_span!("wait_for_reply_with_fds_raw" , sequence).entered(); |
686 | |
687 | let mut inner = self.inner.lock().unwrap(); |
688 | // Ensure the request is sent |
689 | inner = self.flush_impl(inner)?; |
690 | loop { |
691 | crate::trace!({ sequence }, "Polling for reply or error" ); |
692 | if let Some(reply) = inner.inner.poll_for_reply_or_error(sequence) { |
693 | if reply.0[0] == 0 { |
694 | crate::trace!("Got error" ); |
695 | return Ok(ReplyOrError::Error(reply.0)); |
696 | } else { |
697 | crate::trace!("Got reply" ); |
698 | return Ok(ReplyOrError::Reply(reply)); |
699 | } |
700 | } |
701 | inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?; |
702 | } |
703 | } |
704 | |
705 | fn maximum_request_bytes(&self) -> usize { |
706 | let mut max_bytes = self.maximum_request_bytes.lock().unwrap(); |
707 | self.prefetch_maximum_request_bytes_impl(&mut max_bytes); |
708 | use MaxRequestBytes::*; |
709 | let max_bytes = &mut *max_bytes; |
710 | match max_bytes { |
            Unknown => unreachable!("We just prefetched this"),
712 | Requested(seqno) => { |
                let _guard = crate::info_span!("maximum_request_bytes").entered();
714 | |
715 | let length = seqno |
716 | // If prefetching the request succeeded, get a cookie |
717 | .and_then(|seqno| { |
718 | Cookie::<_, EnableReply>::new(self, seqno) |
719 | // and then get the reply to the request |
720 | .reply() |
721 | .map(|reply| reply.maximum_request_length) |
722 | .ok() |
723 | }) |
724 | // If anything failed (sending the request, getting the reply), use Setup |
725 | .unwrap_or_else(|| self.setup.maximum_request_length.into()) |
726 | // Turn the u32 into usize, using the max value in case of overflow |
727 | .try_into() |
728 | .unwrap_or(usize::max_value()); |
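                // Both the BigRequests reply and the Setup report the length
                // in units of 4 bytes, so convert to bytes.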
729 | let length = length * 4; |
730 | *max_bytes = Known(length); |
731 | crate::info!("Maximum request length is {} bytes" , length); |
732 | length |
733 | } |
734 | Known(length) => *length, |
735 | } |
736 | } |
737 | |
738 | fn prefetch_maximum_request_bytes(&self) { |
739 | let mut max_bytes = self.maximum_request_bytes.lock().unwrap(); |
740 | self.prefetch_maximum_request_bytes_impl(&mut max_bytes); |
741 | } |
742 | |
743 | fn parse_error(&self, error: &[u8]) -> Result<crate::x11_utils::X11Error, ParseError> { |
744 | let ext_mgr = self.extension_manager.lock().unwrap(); |
745 | crate::x11_utils::X11Error::try_parse(error, &*ext_mgr) |
746 | } |
747 | |
748 | fn parse_event(&self, event: &[u8]) -> Result<crate::protocol::Event, ParseError> { |
749 | let ext_mgr = self.extension_manager.lock().unwrap(); |
750 | crate::protocol::Event::parse(event, &*ext_mgr) |
751 | } |
752 | } |
753 | |
754 | impl<S: Stream> Connection for RustConnection<S> { |
755 | fn wait_for_raw_event_with_sequence( |
756 | &self, |
757 | ) -> Result<RawEventAndSeqNumber<Vec<u8>>, ConnectionError> { |
758 | let _guard = crate::trace_span!("wait_for_raw_event_with_sequence" ).entered(); |
759 | |
760 | let mut inner = self.inner.lock().unwrap(); |
761 | loop { |
762 | if let Some(event) = inner.inner.poll_for_event_with_sequence() { |
763 | return Ok(event); |
764 | } |
765 | inner = self.read_packet_and_enqueue(inner, BlockingMode::Blocking)?; |
766 | } |
767 | } |
768 | |
769 | fn poll_for_raw_event_with_sequence( |
770 | &self, |
771 | ) -> Result<Option<RawEventAndSeqNumber<Vec<u8>>>, ConnectionError> { |
772 | let _guard = crate::trace_span!("poll_for_raw_event_with_sequence" ).entered(); |
773 | |
774 | let mut inner = self.inner.lock().unwrap(); |
775 | if let Some(event) = inner.inner.poll_for_event_with_sequence() { |
776 | Ok(Some(event)) |
777 | } else { |
778 | inner = self.read_packet_and_enqueue(inner, BlockingMode::NonBlocking)?; |
779 | Ok(inner.inner.poll_for_event_with_sequence()) |
780 | } |
781 | } |
782 | |
783 | fn flush(&self) -> Result<(), ConnectionError> { |
784 | let inner = self.inner.lock().unwrap(); |
785 | let _inner = self.flush_impl(inner)?; |
786 | Ok(()) |
787 | } |
788 | |
789 | fn setup(&self) -> &Setup { |
790 | &self.setup |
791 | } |
792 | |
793 | fn generate_id(&self) -> Result<u32, ReplyOrIdError> { |
794 | let mut id_allocator = self.id_allocator.lock().unwrap(); |
795 | if let Some(id) = id_allocator.generate_id() { |
796 | Ok(id) |
797 | } else { |
798 | use crate::protocol::xc_misc::{self, ConnectionExt as _}; |
799 | |
800 | if self |
801 | .extension_information(xc_misc::X11_EXTENSION_NAME)? |
802 | .is_none() |
803 | { |
804 | crate::error!("XIDs are exhausted and XC-MISC extension is not available" ); |
805 | Err(ReplyOrIdError::IdsExhausted) |
806 | } else { |
807 | crate::info!("XIDs are exhausted; fetching free range via XC-MISC" ); |
808 | id_allocator.update_xid_range(&self.xc_misc_get_xid_range()?.reply()?)?; |
809 | id_allocator |
810 | .generate_id() |
811 | .ok_or(ReplyOrIdError::IdsExhausted) |
812 | } |
813 | } |
814 | } |
815 | } |
816 | |
817 | /// Call `notify_all` on a condition variable when dropped. |
#[derive(Debug)]
819 | struct NotifyOnDrop<'a>(&'a Condvar); |
820 | |
821 | impl Drop for NotifyOnDrop<'_> { |
822 | fn drop(&mut self) { |
823 | self.0.notify_all(); |
824 | } |
825 | } |
826 | |
827 | /// Format information about a request in a Display impl |
828 | struct RequestInfo<'a> { |
829 | extension_manager: &'a Mutex<ExtensionManager>, |
830 | major_opcode: u8, |
831 | minor_opcode: u8, |
832 | } |
833 | |
834 | impl std::fmt::Display for RequestInfo<'_> { |
835 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
836 | // QueryExtension is used by the extension manager. We would deadlock if we |
837 | // tried to lock it again. Hence, this case is hardcoded here. |
838 | if self.major_opcode == QUERY_EXTENSION_REQUEST { |
839 | write!(f, "QueryExtension request" ) |
840 | } else { |
841 | let guard: MutexGuard<'_, ExtensionManager> = self.extension_manager.lock().unwrap(); |
842 | write!( |
843 | f, |
844 | " {} request" , |
845 | x11rb_protocol::protocol::get_request_name( |
846 | &*guard, |
847 | self.major_opcode, |
848 | self.minor_opcode |
849 | ) |
850 | ) |
851 | } |
852 | } |
853 | } |
854 | |