1use std::{
2 convert::{TryFrom, TryInto},
3 fmt,
4 io::Cursor,
5};
6
7#[cfg(unix)]
8use std::{
9 os::unix::io::{AsRawFd, RawFd},
10 sync::{Arc, RwLock},
11};
12
13use static_assertions::assert_impl_all;
14use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, UniqueName};
15
16#[cfg(unix)]
17use crate::OwnedFd;
18use crate::{
19 utils::padding_for_8_bytes,
20 zvariant::{DynamicType, EncodingContext, ObjectPath, Signature, Type},
21 EndianSig, Error, MessageBuilder, MessageField, MessageFieldCode, MessageFields, MessageHeader,
22 MessagePrimaryHeader, MessageType, QuickMessageFields, Result, MIN_MESSAGE_SIZE,
23 NATIVE_ENDIAN_SIG,
24};
25
26#[cfg(unix)]
27const LOCK_PANIC_MSG: &str = "lock poisoned";
28
29macro_rules! dbus_context {
30 ($n_bytes_before: expr) => {
31 EncodingContext::<byteorder::NativeEndian>::new_dbus($n_bytes_before)
32 };
33}
34
35#[cfg(unix)]
36#[derive(Debug, Eq, PartialEq)]
37pub(crate) enum Fds {
38 Owned(Vec<OwnedFd>),
39 Raw(Vec<RawFd>),
40}
41
42/// A position in the stream of [`Message`] objects received by a single [`zbus::Connection`].
43///
44/// Note: the relative ordering of values obtained from distinct [`zbus::Connection`] objects is
45/// not specified; only sequence numbers originating from the same connection should be compared.
46#[derive(Debug, Default, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
47pub struct MessageSequence {
48 recv_seq: u64,
49}
50
51impl MessageSequence {
52 /// A sequence number that is higher than any other; used by errors that terminate a stream.
53 pub(crate) const LAST: Self = Self { recv_seq: u64::MAX };
54}
55
56/// A D-Bus Message.
57///
58/// The content of the message are stored in serialized format. To deserialize the body of the
59/// message, use the [`body`] method. You may also access the header and other details with the
60/// various other getters.
61///
62/// Also provided are constructors for messages of different types. These will mainly be useful for
63/// very advanced use cases as typically you will want to create a message for immediate dispatch
64/// and hence use the API provided by [`Connection`], even when using the low-level API.
65///
66/// **Note**: The message owns the received FDs and will close them when dropped. You can call
67/// [`take_fds`] after deserializing to `RawFD` using [`body`] if you want to take the ownership.
68///
69/// [`body`]: #method.body
70/// [`take_fds`]: #method.take_fds
71/// [`Connection`]: struct.Connection#method.call_method
72#[derive(Clone)]
73pub struct Message {
74 pub(crate) primary_header: MessagePrimaryHeader,
75 pub(crate) quick_fields: QuickMessageFields,
76 pub(crate) bytes: Vec<u8>,
77 pub(crate) body_offset: usize,
78 #[cfg(unix)]
79 pub(crate) fds: Arc<RwLock<Fds>>,
80 pub(crate) recv_seq: MessageSequence,
81}
82
83assert_impl_all!(Message: Send, Sync, Unpin);
84
85// TODO: Handle non-native byte order: https://github.com/dbus2/zbus/issues/19
86impl Message {
87 /// Create a message of type [`MessageType::MethodCall`].
88 ///
89 /// [`MessageType::MethodCall`]: enum.MessageType.html#variant.MethodCall
90 pub fn method<'s, 'd, 'p, 'i, 'm, S, D, P, I, M, B>(
91 sender: Option<S>,
92 destination: Option<D>,
93 path: P,
94 iface: Option<I>,
95 method_name: M,
96 body: &B,
97 ) -> Result<Self>
98 where
99 S: TryInto<UniqueName<'s>>,
100 D: TryInto<BusName<'d>>,
101 P: TryInto<ObjectPath<'p>>,
102 I: TryInto<InterfaceName<'i>>,
103 M: TryInto<MemberName<'m>>,
104 S::Error: Into<Error>,
105 D::Error: Into<Error>,
106 P::Error: Into<Error>,
107 I::Error: Into<Error>,
108 M::Error: Into<Error>,
109 B: serde::ser::Serialize + DynamicType,
110 {
111 let mut b = MessageBuilder::method_call(path, method_name)?;
112
113 if let Some(sender) = sender {
114 b = b.sender(sender)?;
115 }
116 if let Some(destination) = destination {
117 b = b.destination(destination)?;
118 }
119 if let Some(iface) = iface {
120 b = b.interface(iface)?;
121 }
122 b.build(body)
123 }
124
125 /// Create a message of type [`MessageType::Signal`].
126 ///
127 /// [`MessageType::Signal`]: enum.MessageType.html#variant.Signal
128 pub fn signal<'s, 'd, 'p, 'i, 'm, S, D, P, I, M, B>(
129 sender: Option<S>,
130 destination: Option<D>,
131 path: P,
132 iface: I,
133 signal_name: M,
134 body: &B,
135 ) -> Result<Self>
136 where
137 S: TryInto<UniqueName<'s>>,
138 D: TryInto<BusName<'d>>,
139 P: TryInto<ObjectPath<'p>>,
140 I: TryInto<InterfaceName<'i>>,
141 M: TryInto<MemberName<'m>>,
142 S::Error: Into<Error>,
143 D::Error: Into<Error>,
144 P::Error: Into<Error>,
145 I::Error: Into<Error>,
146 M::Error: Into<Error>,
147 B: serde::ser::Serialize + DynamicType,
148 {
149 let mut b = MessageBuilder::signal(path, iface, signal_name)?;
150
151 if let Some(sender) = sender {
152 b = b.sender(sender)?;
153 }
154 if let Some(destination) = destination {
155 b = b.destination(destination)?;
156 }
157 b.build(body)
158 }
159
160 /// Create a message of type [`MessageType::MethodReturn`].
161 ///
162 /// [`MessageType::MethodReturn`]: enum.MessageType.html#variant.MethodReturn
163 pub fn method_reply<'s, S, B>(sender: Option<S>, call: &Self, body: &B) -> Result<Self>
164 where
165 S: TryInto<UniqueName<'s>>,
166 S::Error: Into<Error>,
167 B: serde::ser::Serialize + DynamicType,
168 {
169 let mut b = MessageBuilder::method_return(&call.header()?)?;
170 if let Some(sender) = sender {
171 b = b.sender(sender)?;
172 }
173 b.build(body)
174 }
175
176 /// Create a message of type [`MessageType::MethodError`].
177 ///
178 /// [`MessageType::MethodError`]: enum.MessageType.html#variant.MethodError
179 pub fn method_error<'s, 'e, S, E, B>(
180 sender: Option<S>,
181 call: &Self,
182 name: E,
183 body: &B,
184 ) -> Result<Self>
185 where
186 S: TryInto<UniqueName<'s>>,
187 S::Error: Into<Error>,
188 E: TryInto<ErrorName<'e>>,
189 E::Error: Into<Error>,
190 B: serde::ser::Serialize + DynamicType,
191 {
192 let mut b = MessageBuilder::error(&call.header()?, name)?;
193 if let Some(sender) = sender {
194 b = b.sender(sender)?;
195 }
196 b.build(body)
197 }
198
199 /// Create a message from bytes.
200 ///
201 /// The `fds` parameter is only available on unix. It specifies the file descriptors that
202 /// accompany the message. On the wire, values of the UNIX_FD types store the index of the
203 /// corresponding file descriptor in this vector. Passing an empty vector on a message that
204 /// has UNIX_FD will result in an error.
205 ///
206 /// **Note:** Since the constructed message is not construct by zbus, the receive sequence,
207 /// which can be acquired from [`Message::recv_position`], is not applicable and hence set
208 /// to `0`.
209 ///
210 /// # Safety
211 ///
212 /// This method is unsafe as bytes may have an invalid encoding.
213 pub unsafe fn from_bytes(bytes: Vec<u8>, #[cfg(unix)] fds: Vec<OwnedFd>) -> Result<Self> {
214 Self::from_raw_parts(
215 bytes,
216 #[cfg(unix)]
217 fds,
218 0,
219 )
220 }
221
222 /// Create a message from its full contents
223 pub(crate) fn from_raw_parts(
224 bytes: Vec<u8>,
225 #[cfg(unix)] fds: Vec<OwnedFd>,
226 recv_seq: u64,
227 ) -> Result<Self> {
228 if EndianSig::try_from(bytes[0])? != NATIVE_ENDIAN_SIG {
229 return Err(Error::IncorrectEndian);
230 }
231
232 let (primary_header, fields_len) = MessagePrimaryHeader::read(&bytes)?;
233 let header = zvariant::from_slice(&bytes, dbus_context!(0))?;
234 #[cfg(unix)]
235 let fds = Arc::new(RwLock::new(Fds::Owned(fds)));
236
237 let header_len = MIN_MESSAGE_SIZE + fields_len as usize;
238 let body_offset = header_len + padding_for_8_bytes(header_len);
239 let quick_fields = QuickMessageFields::new(&bytes, &header)?;
240
241 Ok(Self {
242 primary_header,
243 quick_fields,
244 bytes,
245 body_offset,
246 #[cfg(unix)]
247 fds,
248 recv_seq: MessageSequence { recv_seq },
249 })
250 }
251
252 /// Take ownership of the associated file descriptors in the message.
253 ///
254 /// When a message is received over a AF_UNIX socket, it may contain associated FDs. To prevent
255 /// the message from closing those FDs on drop, call this method that returns all the received
256 /// FDs with their ownership.
257 ///
258 /// This function is Unix-specific.
259 ///
260 /// Note: the message will continue to reference the files, so you must keep them open for as
261 /// long as the message itself.
262 #[cfg(unix)]
263 pub fn take_fds(&self) -> Vec<OwnedFd> {
264 let mut fds_lock = self.fds.write().expect(LOCK_PANIC_MSG);
265 if let Fds::Owned(ref mut fds) = *fds_lock {
266 // From now on, it's the caller responsibility to close the fds
267 let fds = std::mem::take(&mut *fds);
268 *fds_lock = Fds::Raw(fds.iter().map(|fd| fd.as_raw_fd()).collect());
269 fds
270 } else {
271 vec![]
272 }
273 }
274
275 /// The signature of the body.
276 ///
277 /// **Note:** While zbus treats multiple arguments as a struct (to allow you to use the tuple
278 /// syntax), D-Bus does not. Since this method gives you the signature expected on the wire by
279 /// D-Bus, the trailing and leading STRUCT signature parenthesis will not be present in case of
280 /// multiple arguments.
281 pub fn body_signature(&self) -> Result<Signature<'_>> {
282 match self
283 .header()?
284 .into_fields()
285 .into_field(MessageFieldCode::Signature)
286 .ok_or(Error::NoBodySignature)?
287 {
288 MessageField::Signature(signature) => Ok(signature),
289 _ => Err(Error::InvalidField),
290 }
291 }
292
293 pub fn primary_header(&self) -> &MessagePrimaryHeader {
294 &self.primary_header
295 }
296
297 pub(crate) fn modify_primary_header<F>(&mut self, mut modifier: F) -> Result<()>
298 where
299 F: FnMut(&mut MessagePrimaryHeader) -> Result<()>,
300 {
301 modifier(&mut self.primary_header)?;
302
303 let mut cursor = Cursor::new(&mut self.bytes);
304 zvariant::to_writer(&mut cursor, dbus_context!(0), &self.primary_header)
305 .map(|_| ())
306 .map_err(Error::from)
307 }
308
309 /// Deserialize the header.
310 ///
311 /// Note: prefer using the direct access methods if possible; they are more efficient.
312 pub fn header(&self) -> Result<MessageHeader<'_>> {
313 zvariant::from_slice(&self.bytes, dbus_context!(0)).map_err(Error::from)
314 }
315
316 /// Deserialize the fields.
317 ///
318 /// Note: prefer using the direct access methods if possible; they are more efficient.
319 pub fn fields(&self) -> Result<MessageFields<'_>> {
320 let ctxt = dbus_context!(crate::PRIMARY_HEADER_SIZE);
321 zvariant::from_slice(&self.bytes[crate::PRIMARY_HEADER_SIZE..], ctxt).map_err(Error::from)
322 }
323
324 /// The message type.
325 pub fn message_type(&self) -> MessageType {
326 self.primary_header.msg_type()
327 }
328
329 /// The object to send a call to, or the object a signal is emitted from.
330 pub fn path(&self) -> Option<ObjectPath<'_>> {
331 self.quick_fields.path(self)
332 }
333
334 /// The interface to invoke a method call on, or that a signal is emitted from.
335 pub fn interface(&self) -> Option<InterfaceName<'_>> {
336 self.quick_fields.interface(self)
337 }
338
339 /// The member, either the method name or signal name.
340 pub fn member(&self) -> Option<MemberName<'_>> {
341 self.quick_fields.member(self)
342 }
343
344 /// The serial number of the message this message is a reply to.
345 pub fn reply_serial(&self) -> Option<u32> {
346 self.quick_fields.reply_serial()
347 }
348
349 /// Deserialize the body (without checking signature matching).
350 pub fn body_unchecked<'d, 'm: 'd, B>(&'m self) -> Result<B>
351 where
352 B: serde::de::Deserialize<'d> + Type,
353 {
354 {
355 #[cfg(unix)]
356 {
357 zvariant::from_slice_fds(
358 &self.bytes[self.body_offset..],
359 Some(&self.fds()),
360 dbus_context!(0),
361 )
362 }
363 #[cfg(not(unix))]
364 {
365 zvariant::from_slice(&self.bytes[self.body_offset..], dbus_context!(0))
366 }
367 }
368 .map_err(Error::from)
369 }
370
371 /// Deserialize the body using the contained signature.
372 ///
373 /// # Example
374 ///
375 /// ```
376 /// # use zbus::Message;
377 /// # (|| -> zbus::Result<()> {
378 /// let send_body = (7i32, (2i32, "foo"), vec!["bar"]);
379 /// let message = Message::method(None::<&str>, Some("zbus.test"), "/", Some("zbus.test"), "ping", &send_body)?;
380 /// let body : zbus::zvariant::Structure = message.body()?;
381 /// let fields = body.fields();
382 /// assert!(matches!(fields[0], zvariant::Value::I32(7)));
383 /// assert!(matches!(fields[1], zvariant::Value::Structure(_)));
384 /// assert!(matches!(fields[2], zvariant::Value::Array(_)));
385 ///
386 /// let reply_msg = Message::method_reply(None::<&str>, &message, &body)?;
387 /// let reply_value : (i32, (i32, &str), Vec<String>) = reply_msg.body()?;
388 ///
389 /// assert_eq!(reply_value.0, 7);
390 /// assert_eq!(reply_value.2.len(), 1);
391 /// # Ok(()) })().unwrap()
392 /// ```
393 pub fn body<'d, 'm: 'd, B>(&'m self) -> Result<B>
394 where
395 B: zvariant::DynamicDeserialize<'d>,
396 {
397 let body_sig = match self.body_signature() {
398 Ok(sig) => sig,
399 Err(Error::NoBodySignature) => Signature::from_static_str_unchecked(""),
400 Err(e) => return Err(e),
401 };
402
403 {
404 #[cfg(unix)]
405 {
406 zvariant::from_slice_fds_for_dynamic_signature(
407 &self.bytes[self.body_offset..],
408 Some(&self.fds()),
409 dbus_context!(0),
410 &body_sig,
411 )
412 }
413 #[cfg(not(unix))]
414 {
415 zvariant::from_slice_for_dynamic_signature(
416 &self.bytes[self.body_offset..],
417 dbus_context!(0),
418 &body_sig,
419 )
420 }
421 }
422 .map_err(Error::from)
423 }
424
425 #[cfg(unix)]
426 pub(crate) fn fds(&self) -> Vec<RawFd> {
427 match &*self.fds.read().expect(LOCK_PANIC_MSG) {
428 Fds::Raw(fds) => fds.clone(),
429 Fds::Owned(fds) => fds.iter().map(|f| f.as_raw_fd()).collect(),
430 }
431 }
432
433 /// Get a reference to the byte encoding of the message.
434 pub fn as_bytes(&self) -> &[u8] {
435 &self.bytes
436 }
437
438 /// Get a reference to the byte encoding of the body of the message.
439 pub fn body_as_bytes(&self) -> Result<&[u8]> {
440 Ok(&self.bytes[self.body_offset..])
441 }
442
443 /// Get the receive ordering of a message.
444 ///
445 /// This may be used to identify how two events were ordered on the bus. It only produces a
446 /// useful ordering for messages that were produced by the same [`zbus::Connection`].
447 ///
448 /// This is completely unrelated to the serial number on the message, which is set by the peer
449 /// and might not be ordered at all.
450 pub fn recv_position(&self) -> MessageSequence {
451 self.recv_seq
452 }
453}
454
455impl fmt::Debug for Message {
456 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
457 let mut msg = f.debug_struct("Msg");
458 let _ = self.header().map(|h| {
459 if let Ok(t) = h.message_type() {
460 msg.field("type", &t);
461 }
462 if let Ok(Some(sender)) = h.sender() {
463 msg.field("sender", &sender);
464 }
465 if let Ok(Some(serial)) = h.reply_serial() {
466 msg.field("reply-serial", &serial);
467 }
468 if let Ok(Some(path)) = h.path() {
469 msg.field("path", &path);
470 }
471 if let Ok(Some(iface)) = h.interface() {
472 msg.field("iface", &iface);
473 }
474 if let Ok(Some(member)) = h.member() {
475 msg.field("member", &member);
476 }
477 });
478 if let Ok(s) = self.body_signature() {
479 msg.field("body", &s);
480 }
481 #[cfg(unix)]
482 {
483 let fds = self.fds();
484 if !fds.is_empty() {
485 msg.field("fds", &fds);
486 }
487 }
488 msg.finish()
489 }
490}
491
492impl fmt::Display for Message {
493 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
494 let header = self.header();
495 let (ty, error_name, sender, member) = if let Ok(h) = header.as_ref() {
496 (
497 h.message_type().ok(),
498 h.error_name().ok().flatten(),
499 h.sender().ok().flatten(),
500 h.member().ok().flatten(),
501 )
502 } else {
503 (None, None, None, None)
504 };
505
506 match ty {
507 Some(MessageType::MethodCall) => {
508 write!(f, "Method call")?;
509 if let Some(m) = member {
510 write!(f, " {m}")?;
511 }
512 }
513 Some(MessageType::MethodReturn) => {
514 write!(f, "Method return")?;
515 }
516 Some(MessageType::Error) => {
517 write!(f, "Error")?;
518 if let Some(e) = error_name {
519 write!(f, " {e}")?;
520 }
521
522 let msg = self.body_unchecked::<&str>();
523 if let Ok(msg) = msg {
524 write!(f, ": {msg}")?;
525 }
526 }
527 Some(MessageType::Signal) => {
528 write!(f, "Signal")?;
529 if let Some(m) = member {
530 write!(f, " {m}")?;
531 }
532 }
533 _ => {
534 write!(f, "Unknown message")?;
535 }
536 }
537
538 if let Some(s) = sender {
539 write!(f, " from {s}")?;
540 }
541
542 Ok(())
543 }
544}
545
546#[cfg(test)]
547mod tests {
548 #[cfg(unix)]
549 use std::os::unix::io::AsRawFd;
550 use test_log::test;
551 #[cfg(unix)]
552 use zvariant::Fd;
553
554 #[cfg(unix)]
555 use super::Fds;
556 use super::Message;
557 use crate::Error;
558
559 #[test]
560 fn test() {
561 #[cfg(unix)]
562 let stdout = std::io::stdout();
563 let m = Message::method(
564 Some(":1.72"),
565 None::<()>,
566 "/",
567 None::<()>,
568 "do",
569 &(
570 #[cfg(unix)]
571 Fd::from(&stdout),
572 "foo",
573 ),
574 )
575 .unwrap();
576 assert_eq!(
577 m.body_signature().unwrap().to_string(),
578 if cfg!(unix) { "hs" } else { "s" }
579 );
580 #[cfg(unix)]
581 assert_eq!(*m.fds.read().unwrap(), Fds::Raw(vec![stdout.as_raw_fd()]));
582
583 let body: Result<u32, Error> = m.body();
584 assert!(matches!(
585 body.unwrap_err(),
586 Error::Variant(zvariant::Error::SignatureMismatch { .. })
587 ));
588
589 assert_eq!(m.to_string(), "Method call do from :1.72");
590 let r = Message::method_reply(None::<()>, &m, &("all fine!")).unwrap();
591 assert_eq!(r.to_string(), "Method return");
592 let e = Message::method_error(
593 None::<()>,
594 &m,
595 "org.freedesktop.zbus.Error",
596 &("kaboom!", 32),
597 )
598 .unwrap();
599 assert_eq!(e.to_string(), "Error org.freedesktop.zbus.Error: kaboom!");
600 }
601}
602