1 | use futures_util::StreamExt; |
2 | use static_assertions::assert_impl_all; |
3 | use std::{convert::TryInto, sync::Arc}; |
4 | |
5 | use crate::{blocking::Connection, utils::block_on, MatchRule, Message, OwnedMatchRule, Result}; |
6 | |
/// A blocking wrapper of [`crate::MessageStream`].
///
/// Just like [`crate::MessageStream`] must be continuously polled, you must continuously iterate
/// over this type until it's consumed or dropped.
///
/// Dropping the iterator blocks the calling thread while the wrapped stream is dropped
/// asynchronously.
#[derive (derivative::Derivative, Clone)]
#[derivative(Debug)]
pub struct MessageIterator {
    // Wrap it in an `Option` to ensure the stream is dropped in a `block_on` call. This is needed
    // for tokio because the proxy spawns a task in its `Drop` impl and that needs a runtime
    // context in case of tokio. Moreover, we want to use `AsyncDrop::async_drop` to drop the
    // stream to ensure any associated match rule is deregistered before the iterator is
    // dropped.
    //
    // Invariant: every constructor in this file stores `Some`. The value only becomes `None`
    // once the stream has been moved out (`into_inner`, the owned conversion into
    // `Connection`, or `Drop`), which is why the accessors `expect` it to be present.
    pub(crate) azync: Option<crate::MessageStream>,
}
21 | |
// Compile-time guarantee that `MessageIterator` implements `Send`, `Sync` and `Unpin`.
assert_impl_all!(MessageIterator: Send, Sync, Unpin);
23 | |
24 | impl MessageIterator { |
25 | /// Get a reference to the underlying async message stream. |
26 | pub fn inner(&self) -> &crate::MessageStream { |
27 | self.azync.as_ref().expect("Inner stream is `None`" ) |
28 | } |
29 | |
30 | /// Get the underlying async message stream, consuming `self`. |
31 | pub fn into_inner(mut self) -> crate::MessageStream { |
32 | self.azync.take().expect("Inner stream is `None`" ) |
33 | } |
34 | |
35 | /// Create a message iterator for the given match rule. |
36 | /// |
37 | /// This is a wrapper around [`crate::MessageStream::for_match_rule`]. Unlike the underlying |
38 | /// `MessageStream`, the match rule is immediately deregistered when the iterator is dropped. |
39 | /// |
40 | /// # Example |
41 | /// |
42 | /// ``` |
43 | /// use zbus::{blocking::{Connection, MessageIterator}, MatchRule, fdo::NameOwnerChanged}; |
44 | /// |
45 | /// # fn main() -> Result<(), Box<dyn std::error::Error>> { |
46 | /// let conn = Connection::session()?; |
47 | /// let rule = MatchRule::builder() |
48 | /// .msg_type(zbus::MessageType::Signal) |
49 | /// .sender("org.freedesktop.DBus" )? |
50 | /// .interface("org.freedesktop.DBus" )? |
51 | /// .member("NameOwnerChanged" )? |
52 | /// .add_arg("org.freedesktop.zbus.MatchRuleIteratorTest42" )? |
53 | /// .build(); |
54 | /// let mut iter = MessageIterator::for_match_rule( |
55 | /// rule, |
56 | /// &conn, |
57 | /// // For such a specific match rule, we don't need a big queue. |
58 | /// Some(1), |
59 | /// )?; |
60 | /// |
61 | /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\ |
62 | /// interface='org.freedesktop.DBus',member='NameOwnerChanged',\ |
63 | /// arg0='org.freedesktop.zbus.MatchRuleIteratorTest42'" ; |
64 | /// assert_eq!( |
65 | /// iter.match_rule().map(|r| r.to_string()).as_deref(), |
66 | /// Some(rule_str), |
67 | /// ); |
68 | /// |
69 | /// // We register 2 names, starting with the uninteresting one. If `iter` wasn't filtering |
70 | /// // messages based on the match rule, we'd receive method return call for each of these 2 |
71 | /// // calls first. |
72 | /// // |
73 | /// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name |
74 | /// // we register since we setup an arg filter. |
75 | /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest44" )?; |
76 | /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest42" )?; |
77 | /// |
78 | /// let msg = iter.next().unwrap()?; |
79 | /// let signal = NameOwnerChanged::from_message(msg).unwrap(); |
80 | /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleIteratorTest42" ); |
81 | /// |
82 | /// # Ok(()) |
83 | /// # } |
84 | /// ``` |
85 | /// |
86 | /// # Caveats |
87 | /// |
88 | /// Since this method relies on [`MatchRule::matches`], it inherits its caveats. |
89 | pub fn for_match_rule<R>(rule: R, conn: &Connection, max_queued: Option<usize>) -> Result<Self> |
90 | where |
91 | R: TryInto<OwnedMatchRule>, |
92 | R::Error: Into<crate::Error>, |
93 | { |
94 | block_on(crate::MessageStream::for_match_rule( |
95 | rule, |
96 | conn.inner(), |
97 | max_queued, |
98 | )) |
99 | .map(Some) |
100 | .map(|s| Self { azync: s }) |
101 | } |
102 | |
103 | /// The associated match rule, if any. |
104 | pub fn match_rule(&self) -> Option<MatchRule<'_>> { |
105 | self.azync |
106 | .as_ref() |
107 | .expect("Inner stream is `None`" ) |
108 | .match_rule() |
109 | } |
110 | } |
111 | |
112 | impl Iterator for MessageIterator { |
113 | type Item = Result<Arc<Message>>; |
114 | |
115 | fn next(&mut self) -> Option<Self::Item> { |
116 | block_on(self.azync.as_mut().expect(msg:"Inner stream is `None`" ).next()) |
117 | } |
118 | } |
119 | |
120 | impl From<Connection> for MessageIterator { |
121 | fn from(conn: Connection) -> Self { |
122 | let azync: MessageStream = crate::MessageStream::from(conn.into_inner()); |
123 | |
124 | Self { azync: Some(azync) } |
125 | } |
126 | } |
127 | |
128 | impl From<&Connection> for MessageIterator { |
129 | fn from(conn: &Connection) -> Self { |
130 | Self::from(conn.clone()) |
131 | } |
132 | } |
133 | |
134 | impl From<MessageIterator> for Connection { |
135 | fn from(mut iter: MessageIterator) -> Connection { |
136 | Connection::from(crate::Connection::from( |
137 | iter.azync.take().expect(msg:"Inner stream is `None`" ), |
138 | )) |
139 | } |
140 | } |
141 | |
142 | impl From<&MessageIterator> for Connection { |
143 | fn from(iter: &MessageIterator) -> Connection { |
144 | Connection::from(crate::Connection::from( |
145 | iter.azync.as_ref().expect(msg:"Inner stream is `None`" ), |
146 | )) |
147 | } |
148 | } |
149 | |
150 | impl std::ops::Drop for MessageIterator { |
151 | fn drop(&mut self) { |
152 | block_on(future:async { |
153 | if let Some(azync: MessageStream) = self.azync.take() { |
154 | crate::AsyncDrop::async_drop(self:azync).await; |
155 | } |
156 | }); |
157 | } |
158 | } |
159 | |