1use futures_util::StreamExt;
2use static_assertions::assert_impl_all;
3use std::{convert::TryInto, sync::Arc};
4
5use crate::{blocking::Connection, utils::block_on, MatchRule, Message, OwnedMatchRule, Result};
6
7/// A blocking wrapper of [`crate::MessageStream`].
8///
9/// Just like [`crate::MessageStream`] must be continuously polled, you must continuously iterate
10/// over this type until it's consumed or dropped.
11#[derive(derivative::Derivative, Clone)]
12#[derivative(Debug)]
13pub struct MessageIterator {
14 // Wrap it in an `Option` to ensure the stream is dropped in a `block_on` call. This is needed
15 // for tokio because the proxy spawns a task in its `Drop` impl and that needs a runtime
16 // context in case of tokio. Moreover, we want to use `AsyncDrop::async_drop` to drop the
17 // stream to ensure any associated match rule is deregistered before the iterator is
18 // dropped.
19 pub(crate) azync: Option<crate::MessageStream>,
20}
21
22assert_impl_all!(MessageIterator: Send, Sync, Unpin);
23
24impl MessageIterator {
25 /// Get a reference to the underlying async message stream.
26 pub fn inner(&self) -> &crate::MessageStream {
27 self.azync.as_ref().expect("Inner stream is `None`")
28 }
29
30 /// Get the underlying async message stream, consuming `self`.
31 pub fn into_inner(mut self) -> crate::MessageStream {
32 self.azync.take().expect("Inner stream is `None`")
33 }
34
35 /// Create a message iterator for the given match rule.
36 ///
37 /// This is a wrapper around [`crate::MessageStream::for_match_rule`]. Unlike the underlying
38 /// `MessageStream`, the match rule is immediately deregistered when the iterator is dropped.
39 ///
40 /// # Example
41 ///
42 /// ```
43 /// use zbus::{blocking::{Connection, MessageIterator}, MatchRule, fdo::NameOwnerChanged};
44 ///
45 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
46 /// let conn = Connection::session()?;
47 /// let rule = MatchRule::builder()
48 /// .msg_type(zbus::MessageType::Signal)
49 /// .sender("org.freedesktop.DBus")?
50 /// .interface("org.freedesktop.DBus")?
51 /// .member("NameOwnerChanged")?
52 /// .add_arg("org.freedesktop.zbus.MatchRuleIteratorTest42")?
53 /// .build();
54 /// let mut iter = MessageIterator::for_match_rule(
55 /// rule,
56 /// &conn,
57 /// // For such a specific match rule, we don't need a big queue.
58 /// Some(1),
59 /// )?;
60 ///
61 /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\
62 /// interface='org.freedesktop.DBus',member='NameOwnerChanged',\
63 /// arg0='org.freedesktop.zbus.MatchRuleIteratorTest42'";
64 /// assert_eq!(
65 /// iter.match_rule().map(|r| r.to_string()).as_deref(),
66 /// Some(rule_str),
67 /// );
68 ///
69 /// // We register 2 names, starting with the uninteresting one. If `iter` wasn't filtering
70 /// // messages based on the match rule, we'd receive method return call for each of these 2
71 /// // calls first.
72 /// //
73 /// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name
74 /// // we register since we setup an arg filter.
75 /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest44")?;
76 /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest42")?;
77 ///
78 /// let msg = iter.next().unwrap()?;
79 /// let signal = NameOwnerChanged::from_message(msg).unwrap();
80 /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleIteratorTest42");
81 ///
82 /// # Ok(())
83 /// # }
84 /// ```
85 ///
86 /// # Caveats
87 ///
88 /// Since this method relies on [`MatchRule::matches`], it inherits its caveats.
89 pub fn for_match_rule<R>(rule: R, conn: &Connection, max_queued: Option<usize>) -> Result<Self>
90 where
91 R: TryInto<OwnedMatchRule>,
92 R::Error: Into<crate::Error>,
93 {
94 block_on(crate::MessageStream::for_match_rule(
95 rule,
96 conn.inner(),
97 max_queued,
98 ))
99 .map(Some)
100 .map(|s| Self { azync: s })
101 }
102
103 /// The associated match rule, if any.
104 pub fn match_rule(&self) -> Option<MatchRule<'_>> {
105 self.azync
106 .as_ref()
107 .expect("Inner stream is `None`")
108 .match_rule()
109 }
110}
111
112impl Iterator for MessageIterator {
113 type Item = Result<Arc<Message>>;
114
115 fn next(&mut self) -> Option<Self::Item> {
116 block_on(self.azync.as_mut().expect(msg:"Inner stream is `None`").next())
117 }
118}
119
120impl From<Connection> for MessageIterator {
121 fn from(conn: Connection) -> Self {
122 let azync: MessageStream = crate::MessageStream::from(conn.into_inner());
123
124 Self { azync: Some(azync) }
125 }
126}
127
128impl From<&Connection> for MessageIterator {
129 fn from(conn: &Connection) -> Self {
130 Self::from(conn.clone())
131 }
132}
133
134impl From<MessageIterator> for Connection {
135 fn from(mut iter: MessageIterator) -> Connection {
136 Connection::from(crate::Connection::from(
137 iter.azync.take().expect(msg:"Inner stream is `None`"),
138 ))
139 }
140}
141
142impl From<&MessageIterator> for Connection {
143 fn from(iter: &MessageIterator) -> Connection {
144 Connection::from(crate::Connection::from(
145 iter.azync.as_ref().expect(msg:"Inner stream is `None`"),
146 ))
147 }
148}
149
150impl std::ops::Drop for MessageIterator {
151 fn drop(&mut self) {
152 block_on(future:async {
153 if let Some(azync: MessageStream) = self.azync.take() {
154 crate::AsyncDrop::async_drop(self:azync).await;
155 }
156 });
157 }
158}
159