| 1 | #[cfg (feature = "p2p" )] |
| 2 | pub mod channel; |
| 3 | #[cfg (feature = "p2p" )] |
| 4 | pub use channel::Channel; |
| 5 | |
| 6 | mod split; |
| 7 | pub use split::{BoxedSplit, Split}; |
| 8 | |
| 9 | mod tcp; |
| 10 | mod unix; |
| 11 | mod vsock; |
| 12 | |
| 13 | #[cfg (not(feature = "tokio" ))] |
| 14 | use async_io::Async; |
| 15 | #[cfg (not(feature = "tokio" ))] |
| 16 | use std::sync::Arc; |
| 17 | use std::{io, mem}; |
| 18 | use tracing::trace; |
| 19 | |
| 20 | use crate::{ |
| 21 | fdo::ConnectionCredentials, |
| 22 | message::{ |
| 23 | header::{MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE}, |
| 24 | PrimaryHeader, |
| 25 | }, |
| 26 | padding_for_8_bytes, Message, |
| 27 | }; |
| 28 | #[cfg (unix)] |
| 29 | use std::os::fd::{AsFd, BorrowedFd, OwnedFd}; |
| 30 | use zvariant::{ |
| 31 | serialized::{self, Context}, |
| 32 | Endian, |
| 33 | }; |
| 34 | |
| 35 | #[cfg (unix)] |
| 36 | type RecvmsgResult = io::Result<(usize, Vec<OwnedFd>)>; |
| 37 | |
| 38 | #[cfg (not(unix))] |
| 39 | type RecvmsgResult = io::Result<usize>; |
| 40 | |
| 41 | /// Trait representing some transport layer over which the DBus protocol can be used |
| 42 | /// |
| 43 | /// In order to allow simultaneous reading and writing, this trait requires you to split the socket |
| 44 | /// into a read half and a write half. The reader and writer halves can be any types that implement |
| 45 | /// [`ReadHalf`] and [`WriteHalf`] respectively. |
| 46 | /// |
| 47 | /// The crate provides implementations for `async_io` and `tokio`'s `UnixStream` wrappers if you |
| 48 | /// enable the corresponding crate features (`async_io` is enabled by default). |
| 49 | /// |
| 50 | /// You can implement it manually to integrate with other runtimes or other dbus transports. Feel |
| 51 | /// free to submit pull requests to add support for more runtimes to zbus itself so rust's orphan |
| 52 | /// rules don't force the use of a wrapper struct (and to avoid duplicating the work across many |
| 53 | /// projects). |
| 54 | pub trait Socket { |
| 55 | type ReadHalf: ReadHalf; |
| 56 | type WriteHalf: WriteHalf; |
| 57 | |
| 58 | /// Split the socket into a read half and a write half. |
| 59 | fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> |
| 60 | where |
| 61 | Self: Sized; |
| 62 | } |
| 63 | |
| 64 | /// The read half of a socket. |
| 65 | /// |
| 66 | /// See [`Socket`] for more details. |
| 67 | #[async_trait::async_trait ] |
| 68 | pub trait ReadHalf: std::fmt::Debug + Send + Sync + 'static { |
| 69 | /// Receive a message on the socket. |
| 70 | /// |
| 71 | /// This is the higher-level method to receive a full D-Bus message. |
| 72 | /// |
| 73 | /// The default implementation uses `recvmsg` to receive the message. Implementers should |
| 74 | /// override either this or `recvmsg`. Note that if you override this method, zbus will not be |
| 75 | /// able perform an authentication handshake and hence will skip the handshake. Therefore your |
| 76 | /// implementation will only be useful for pre-authenticated connections or connections that do |
| 77 | /// not require authentication. |
| 78 | /// |
| 79 | /// # Parameters |
| 80 | /// |
| 81 | /// - `seq`: The sequence number of the message. The returned message should have this sequence. |
| 82 | /// - `already_received_bytes`: Sometimes, zbus already received some bytes from the socket |
| 83 | /// belonging to the first message(s) (as part of the connection handshake process). This is |
| 84 | /// the buffer containing those bytes (if any). If you're implementing this method, most |
| 85 | /// likely you can safely ignore this parameter. |
| 86 | /// - `already_received_fds`: Same goes for file descriptors belonging to first messages. |
| 87 | async fn receive_message( |
| 88 | &mut self, |
| 89 | seq: u64, |
| 90 | already_received_bytes: &mut Vec<u8>, |
| 91 | #[cfg (unix)] already_received_fds: &mut Vec<std::os::fd::OwnedFd>, |
| 92 | ) -> crate::Result<Message> { |
| 93 | #[cfg (unix)] |
| 94 | let mut fds = vec![]; |
| 95 | let mut bytes = if already_received_bytes.len() < MIN_MESSAGE_SIZE { |
| 96 | let mut bytes = vec![]; |
| 97 | if !already_received_bytes.is_empty() { |
| 98 | mem::swap(already_received_bytes, &mut bytes); |
| 99 | } |
| 100 | let mut pos = bytes.len(); |
| 101 | bytes.resize(MIN_MESSAGE_SIZE, 0); |
| 102 | // We don't have enough data to make a proper message header yet. |
| 103 | // Some partial read may be in raw_in_buffer, so we try to complete it |
| 104 | // until we have MIN_MESSAGE_SIZE bytes |
| 105 | // |
| 106 | // Given that MIN_MESSAGE_SIZE is 16, this codepath is actually extremely unlikely |
| 107 | // to be taken more than once |
| 108 | while pos < MIN_MESSAGE_SIZE { |
| 109 | let res = self.recvmsg(&mut bytes[pos..]).await?; |
| 110 | let len = { |
| 111 | #[cfg (unix)] |
| 112 | { |
| 113 | fds.extend(res.1); |
| 114 | res.0 |
| 115 | } |
| 116 | #[cfg (not(unix))] |
| 117 | { |
| 118 | res |
| 119 | } |
| 120 | }; |
| 121 | pos += len; |
| 122 | if len == 0 { |
| 123 | return Err(std::io::Error::new( |
| 124 | std::io::ErrorKind::UnexpectedEof, |
| 125 | "failed to receive message" , |
| 126 | ) |
| 127 | .into()); |
| 128 | } |
| 129 | } |
| 130 | |
| 131 | bytes |
| 132 | } else { |
| 133 | already_received_bytes.drain(..MIN_MESSAGE_SIZE).collect() |
| 134 | }; |
| 135 | |
| 136 | let (primary_header, fields_len) = PrimaryHeader::read(&bytes)?; |
| 137 | let header_len = MIN_MESSAGE_SIZE + fields_len as usize; |
| 138 | let body_padding = padding_for_8_bytes(header_len); |
| 139 | let body_len = primary_header.body_len() as usize; |
| 140 | let total_len = header_len + body_padding + body_len; |
| 141 | if total_len > MAX_MESSAGE_SIZE { |
| 142 | return Err(crate::Error::ExcessData); |
| 143 | } |
| 144 | |
| 145 | // By this point we have a full primary header, so we know the exact length of the complete |
| 146 | // message. |
| 147 | if !already_received_bytes.is_empty() { |
| 148 | // still have some bytes buffered. |
| 149 | let pending = total_len - bytes.len(); |
| 150 | let to_take = std::cmp::min(pending, already_received_bytes.len()); |
| 151 | bytes.extend(already_received_bytes.drain(..to_take)); |
| 152 | } |
| 153 | let mut pos = bytes.len(); |
| 154 | bytes.resize(total_len, 0); |
| 155 | |
| 156 | // Read the rest, if any |
| 157 | while pos < total_len { |
| 158 | let res = self.recvmsg(&mut bytes[pos..]).await?; |
| 159 | let read = { |
| 160 | #[cfg (unix)] |
| 161 | { |
| 162 | fds.extend(res.1); |
| 163 | res.0 |
| 164 | } |
| 165 | #[cfg (not(unix))] |
| 166 | { |
| 167 | res |
| 168 | } |
| 169 | }; |
| 170 | pos += read; |
| 171 | if read == 0 { |
| 172 | return Err(crate::Error::InputOutput( |
| 173 | std::io::Error::new( |
| 174 | std::io::ErrorKind::UnexpectedEof, |
| 175 | "failed to receive message" , |
| 176 | ) |
| 177 | .into(), |
| 178 | )); |
| 179 | } |
| 180 | } |
| 181 | |
| 182 | // If we reach here, the message is complete; return it |
| 183 | let endian = Endian::from(primary_header.endian_sig()); |
| 184 | |
| 185 | #[cfg (unix)] |
| 186 | if !already_received_fds.is_empty() { |
| 187 | use crate::message::{header::PRIMARY_HEADER_SIZE, Field}; |
| 188 | |
| 189 | let ctxt = Context::new_dbus(endian, PRIMARY_HEADER_SIZE); |
| 190 | let encoded_fields = |
| 191 | serialized::Data::new(&bytes[PRIMARY_HEADER_SIZE..header_len], ctxt); |
| 192 | let fields: crate::message::Fields<'_> = encoded_fields.deserialize()?.0; |
| 193 | let num_required_fds = match fields.get_field(crate::message::FieldCode::UnixFDs) { |
| 194 | Some(Field::UnixFDs(num_fds)) => *num_fds as usize, |
| 195 | _ => 0, |
| 196 | }; |
| 197 | let num_pending = num_required_fds |
| 198 | .checked_sub(fds.len()) |
| 199 | .ok_or_else(|| crate::Error::ExcessData)?; |
| 200 | // If we had previously received FDs, `num_pending` has to be > 0 |
| 201 | if num_pending == 0 { |
| 202 | return Err(crate::Error::MissingParameter("Missing file descriptors" )); |
| 203 | } |
| 204 | // All previously received FDs must go first in the list. |
| 205 | let mut already_received: Vec<_> = already_received_fds.drain(..num_pending).collect(); |
| 206 | mem::swap(&mut already_received, &mut fds); |
| 207 | fds.extend(already_received); |
| 208 | } |
| 209 | |
| 210 | let ctxt = Context::new_dbus(endian, 0); |
| 211 | #[cfg (unix)] |
| 212 | let bytes = serialized::Data::new_fds(bytes, ctxt, fds); |
| 213 | #[cfg (not(unix))] |
| 214 | let bytes = serialized::Data::new(bytes, ctxt); |
| 215 | Message::from_raw_parts(bytes, seq) |
| 216 | } |
| 217 | |
| 218 | /// Attempt to receive bytes from the socket. |
| 219 | /// |
| 220 | /// On success, returns the number of bytes read as well as a `Vec` containing |
| 221 | /// any associated file descriptors. |
| 222 | /// |
| 223 | /// The default implementation simply panics. Implementers must override either `read_message` |
| 224 | /// or this method. |
| 225 | async fn recvmsg(&mut self, _buf: &mut [u8]) -> RecvmsgResult { |
| 226 | unimplemented!("`ReadHalf` implementers must either override `read_message` or `recvmsg`" ); |
| 227 | } |
| 228 | |
| 229 | /// Supports passing file descriptors. |
| 230 | /// |
| 231 | /// Default implementation returns `false`. |
| 232 | fn can_pass_unix_fd(&self) -> bool { |
| 233 | false |
| 234 | } |
| 235 | |
| 236 | /// Return the peer credentials. |
| 237 | async fn peer_credentials(&mut self) -> io::Result<ConnectionCredentials> { |
| 238 | Ok(ConnectionCredentials::default()) |
| 239 | } |
| 240 | } |
| 241 | |
| 242 | /// The write half of a socket. |
| 243 | /// |
| 244 | /// See [`Socket`] for more details. |
| 245 | #[async_trait::async_trait ] |
| 246 | pub trait WriteHalf: std::fmt::Debug + Send + Sync + 'static { |
| 247 | /// Send a message on the socket. |
| 248 | /// |
| 249 | /// This is the higher-level method to send a full D-Bus message. |
| 250 | /// |
| 251 | /// The default implementation uses `sendmsg` to send the message. Implementers should override |
| 252 | /// either this or `sendmsg`. |
| 253 | async fn send_message(&mut self, msg: &Message) -> crate::Result<()> { |
| 254 | let data = msg.data(); |
| 255 | let serial = msg.primary_header().serial_num(); |
| 256 | |
| 257 | trace!("Sending message: {:?}" , msg); |
| 258 | let mut pos = 0; |
| 259 | while pos < data.len() { |
| 260 | #[cfg (unix)] |
| 261 | let fds = if pos == 0 { |
| 262 | data.fds().iter().map(|f| f.as_fd()).collect() |
| 263 | } else { |
| 264 | vec![] |
| 265 | }; |
| 266 | pos += self |
| 267 | .sendmsg( |
| 268 | &data[pos..], |
| 269 | #[cfg (unix)] |
| 270 | &fds, |
| 271 | ) |
| 272 | .await?; |
| 273 | } |
| 274 | trace!("Sent message with serial: {}" , serial); |
| 275 | |
| 276 | Ok(()) |
| 277 | } |
| 278 | |
| 279 | /// Attempt to send a message on the socket |
| 280 | /// |
| 281 | /// On success, return the number of bytes written. There may be a partial write, in |
| 282 | /// which case the caller is responsible of sending the remaining data by calling this |
| 283 | /// method again until everything is written or it returns an error of kind `WouldBlock`. |
| 284 | /// |
| 285 | /// If at least one byte has been written, then all the provided file descriptors will |
| 286 | /// have been sent as well, and should not be provided again in subsequent calls. |
| 287 | /// |
| 288 | /// If the underlying transport does not support transmitting file descriptors, this |
| 289 | /// will return `Err(ErrorKind::InvalidInput)`. |
| 290 | /// |
| 291 | /// The default implementation simply panics. Implementers must override either `send_message` |
| 292 | /// or this method. |
| 293 | async fn sendmsg( |
| 294 | &mut self, |
| 295 | _buffer: &[u8], |
| 296 | #[cfg (unix)] _fds: &[BorrowedFd<'_>], |
| 297 | ) -> io::Result<usize> { |
| 298 | unimplemented!("`WriteHalf` implementers must either override `send_message` or `sendmsg`" ); |
| 299 | } |
| 300 | |
| 301 | /// The dbus daemon on `freebsd` and `dragonfly` currently requires sending the zero byte |
| 302 | /// as a separate message with SCM_CREDS, as part of the `EXTERNAL` authentication on unix |
| 303 | /// sockets. This method is used by the authentication machinery in zbus to send this |
| 304 | /// zero byte. Socket implementations based on unix sockets should implement this method. |
| 305 | #[cfg (any(target_os = "freebsd" , target_os = "dragonfly" ))] |
| 306 | async fn send_zero_byte(&mut self) -> io::Result<Option<usize>> { |
| 307 | Ok(None) |
| 308 | } |
| 309 | |
| 310 | /// Close the socket. |
| 311 | /// |
| 312 | /// After this call, it is valid for all reading and writing operations to fail. |
| 313 | async fn close(&mut self) -> io::Result<()>; |
| 314 | |
| 315 | /// Supports passing file descriptors. |
| 316 | /// |
| 317 | /// Default implementation returns `false`. |
| 318 | fn can_pass_unix_fd(&self) -> bool { |
| 319 | false |
| 320 | } |
| 321 | |
| 322 | /// Return the peer credentials. |
| 323 | async fn peer_credentials(&mut self) -> io::Result<ConnectionCredentials> { |
| 324 | Ok(ConnectionCredentials::default()) |
| 325 | } |
| 326 | } |
| 327 | |
| 328 | #[async_trait::async_trait ] |
| 329 | impl ReadHalf for Box<dyn ReadHalf> { |
| 330 | fn can_pass_unix_fd(&self) -> bool { |
| 331 | (**self).can_pass_unix_fd() |
| 332 | } |
| 333 | |
| 334 | async fn receive_message( |
| 335 | &mut self, |
| 336 | seq: u64, |
| 337 | already_received_bytes: &mut Vec<u8>, |
| 338 | #[cfg (unix)] already_received_fds: &mut Vec<std::os::fd::OwnedFd>, |
| 339 | ) -> crate::Result<Message> { |
| 340 | (**self) |
| 341 | .receive_message( |
| 342 | seq, |
| 343 | already_received_bytes, |
| 344 | #[cfg (unix)] |
| 345 | already_received_fds, |
| 346 | ) |
| 347 | .await |
| 348 | } |
| 349 | |
| 350 | async fn recvmsg(&mut self, buf: &mut [u8]) -> RecvmsgResult { |
| 351 | (**self).recvmsg(buf).await |
| 352 | } |
| 353 | |
| 354 | async fn peer_credentials(&mut self) -> io::Result<ConnectionCredentials> { |
| 355 | (**self).peer_credentials().await |
| 356 | } |
| 357 | } |
| 358 | |
| 359 | #[async_trait::async_trait ] |
| 360 | impl WriteHalf for Box<dyn WriteHalf> { |
| 361 | async fn send_message(&mut self, msg: &Message) -> crate::Result<()> { |
| 362 | (**self).send_message(msg).await |
| 363 | } |
| 364 | |
| 365 | async fn sendmsg( |
| 366 | &mut self, |
| 367 | buffer: &[u8], |
| 368 | #[cfg (unix)] fds: &[BorrowedFd<'_>], |
| 369 | ) -> io::Result<usize> { |
| 370 | (**self) |
| 371 | .sendmsg( |
| 372 | buffer, |
| 373 | #[cfg (unix)] |
| 374 | fds, |
| 375 | ) |
| 376 | .await |
| 377 | } |
| 378 | |
| 379 | #[cfg (any(target_os = "freebsd" , target_os = "dragonfly" ))] |
| 380 | async fn send_zero_byte(&mut self) -> io::Result<Option<usize>> { |
| 381 | (**self).send_zero_byte().await |
| 382 | } |
| 383 | |
| 384 | async fn close(&mut self) -> io::Result<()> { |
| 385 | (**self).close().await |
| 386 | } |
| 387 | |
| 388 | fn can_pass_unix_fd(&self) -> bool { |
| 389 | (**self).can_pass_unix_fd() |
| 390 | } |
| 391 | |
| 392 | async fn peer_credentials(&mut self) -> io::Result<ConnectionCredentials> { |
| 393 | (**self).peer_credentials().await |
| 394 | } |
| 395 | } |
| 396 | |
| 397 | #[cfg (not(feature = "tokio" ))] |
| 398 | impl<T> Socket for Async<T> |
| 399 | where |
| 400 | T: std::fmt::Debug + Send + Sync, |
| 401 | Arc<Async<T>>: ReadHalf + WriteHalf, |
| 402 | { |
| 403 | type ReadHalf = Arc<Async<T>>; |
| 404 | type WriteHalf = Arc<Async<T>>; |
| 405 | |
| 406 | fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> { |
| 407 | let arc: Arc> = Arc::new(self); |
| 408 | |
| 409 | Split { |
| 410 | read: arc.clone(), |
| 411 | write: arc, |
| 412 | } |
| 413 | } |
| 414 | } |
| 415 | |