| 1 | use futures_util::StreamExt; |
| 2 | use static_assertions::assert_impl_all; |
| 3 | |
| 4 | use crate::{ |
| 5 | blocking::Connection, message::Message, utils::block_on, MatchRule, OwnedMatchRule, Result, |
| 6 | }; |
| 7 | |
| 8 | /// A blocking wrapper of [`crate::MessageStream`]. |
| 9 | /// |
| 10 | /// Just like [`crate::MessageStream`] must be continuously polled, you must continuously iterate |
| 11 | /// over this type until it's consumed or dropped. |
| 12 | #[derive (Debug, Clone)] |
| 13 | pub struct MessageIterator { |
| 14 | // Wrap it in an `Option` to ensure the stream is dropped in a `block_on` call. This is needed |
| 15 | // for tokio because the proxy spawns a task in its `Drop` impl and that needs a runtime |
| 16 | // context in case of tokio. Moreover, we want to use `AsyncDrop::async_drop` to drop the |
| 17 | // stream to ensure any associated match rule is deregistered before the iterator is |
| 18 | // dropped. |
| 19 | pub(crate) azync: Option<crate::MessageStream>, |
| 20 | } |
| 21 | |
| 22 | assert_impl_all!(MessageIterator: Send, Sync, Unpin); |
| 23 | |
| 24 | impl MessageIterator { |
| 25 | /// Get a reference to the underlying async message stream. |
| 26 | pub fn inner(&self) -> &crate::MessageStream { |
| 27 | self.azync.as_ref().expect("Inner stream is `None`" ) |
| 28 | } |
| 29 | |
| 30 | /// Get the underlying async message stream, consuming `self`. |
| 31 | pub fn into_inner(mut self) -> crate::MessageStream { |
| 32 | self.azync.take().expect("Inner stream is `None`" ) |
| 33 | } |
| 34 | |
| 35 | /// Create a message iterator for the given match rule. |
| 36 | /// |
| 37 | /// This is a wrapper around [`crate::MessageStream::for_match_rule`]. Unlike the underlying |
| 38 | /// `MessageStream`, the match rule is immediately deregistered when the iterator is dropped. |
| 39 | /// |
| 40 | /// # Example |
| 41 | /// |
| 42 | /// ``` |
| 43 | /// use zbus::{blocking::{Connection, MessageIterator}, MatchRule, fdo::NameOwnerChanged}; |
| 44 | /// |
| 45 | /// # fn main() -> Result<(), Box<dyn std::error::Error>> { |
| 46 | /// let conn = Connection::session()?; |
| 47 | /// let rule = MatchRule::builder() |
| 48 | /// .msg_type(zbus::message::Type::Signal) |
| 49 | /// .sender("org.freedesktop.DBus" )? |
| 50 | /// .interface("org.freedesktop.DBus" )? |
| 51 | /// .member("NameOwnerChanged" )? |
| 52 | /// .add_arg("org.freedesktop.zbus.MatchRuleIteratorTest42" )? |
| 53 | /// .build(); |
| 54 | /// let mut iter = MessageIterator::for_match_rule( |
| 55 | /// rule, |
| 56 | /// &conn, |
| 57 | /// // For such a specific match rule, we don't need a big queue. |
| 58 | /// Some(1), |
| 59 | /// )?; |
| 60 | /// |
| 61 | /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\ |
| 62 | /// interface='org.freedesktop.DBus',member='NameOwnerChanged',\ |
| 63 | /// arg0='org.freedesktop.zbus.MatchRuleIteratorTest42'" ; |
| 64 | /// assert_eq!( |
| 65 | /// iter.match_rule().map(|r| r.to_string()).as_deref(), |
| 66 | /// Some(rule_str), |
| 67 | /// ); |
| 68 | /// |
| 69 | /// // We register 2 names, starting with the uninteresting one. If `iter` wasn't filtering |
| 70 | /// // messages based on the match rule, we'd receive method return call for each of these 2 |
| 71 | /// // calls first. |
| 72 | /// // |
| 73 | /// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name |
| 74 | /// // we register since we setup an arg filter. |
| 75 | /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest44" )?; |
| 76 | /// conn.request_name("org.freedesktop.zbus.MatchRuleIteratorTest42" )?; |
| 77 | /// |
| 78 | /// let msg = iter.next().unwrap()?; |
| 79 | /// let signal = NameOwnerChanged::from_message(msg).unwrap(); |
| 80 | /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleIteratorTest42" ); |
| 81 | /// |
| 82 | /// # Ok(()) |
| 83 | /// # } |
| 84 | /// ``` |
| 85 | /// |
| 86 | /// # Caveats |
| 87 | /// |
| 88 | /// Since this method relies on [`MatchRule::matches`], it inherits its caveats. |
| 89 | pub fn for_match_rule<R>(rule: R, conn: &Connection, max_queued: Option<usize>) -> Result<Self> |
| 90 | where |
| 91 | R: TryInto<OwnedMatchRule>, |
| 92 | R::Error: Into<crate::Error>, |
| 93 | { |
| 94 | block_on(crate::MessageStream::for_match_rule( |
| 95 | rule, |
| 96 | conn.inner(), |
| 97 | max_queued, |
| 98 | )) |
| 99 | .map(Some) |
| 100 | .map(|s| Self { azync: s }) |
| 101 | } |
| 102 | |
| 103 | /// The associated match rule, if any. |
| 104 | pub fn match_rule(&self) -> Option<MatchRule<'_>> { |
| 105 | self.azync |
| 106 | .as_ref() |
| 107 | .expect("Inner stream is `None`" ) |
| 108 | .match_rule() |
| 109 | } |
| 110 | } |
| 111 | |
| 112 | impl Iterator for MessageIterator { |
| 113 | type Item = Result<Message>; |
| 114 | |
| 115 | fn next(&mut self) -> Option<Self::Item> { |
| 116 | block_on(self.azync.as_mut().expect(msg:"Inner stream is `None`" ).next()) |
| 117 | } |
| 118 | } |
| 119 | |
| 120 | impl From<Connection> for MessageIterator { |
| 121 | fn from(conn: Connection) -> Self { |
| 122 | let azync: MessageStream = crate::MessageStream::from(conn.into_inner()); |
| 123 | |
| 124 | Self { azync: Some(azync) } |
| 125 | } |
| 126 | } |
| 127 | |
| 128 | impl From<&Connection> for MessageIterator { |
| 129 | fn from(conn: &Connection) -> Self { |
| 130 | Self::from(conn.clone()) |
| 131 | } |
| 132 | } |
| 133 | |
| 134 | impl From<MessageIterator> for Connection { |
| 135 | fn from(mut iter: MessageIterator) -> Connection { |
| 136 | Connection::from(crate::Connection::from( |
| 137 | iter.azync.take().expect(msg:"Inner stream is `None`" ), |
| 138 | )) |
| 139 | } |
| 140 | } |
| 141 | |
| 142 | impl From<&MessageIterator> for Connection { |
| 143 | fn from(iter: &MessageIterator) -> Connection { |
| 144 | Connection::from(crate::Connection::from( |
| 145 | iter.azync.as_ref().expect(msg:"Inner stream is `None`" ), |
| 146 | )) |
| 147 | } |
| 148 | } |
| 149 | |
| 150 | impl std::ops::Drop for MessageIterator { |
| 151 | fn drop(&mut self) { |
| 152 | block_on(future:async { |
| 153 | if let Some(azync: MessageStream) = self.azync.take() { |
| 154 | crate::AsyncDrop::async_drop(self:azync).await; |
| 155 | } |
| 156 | }); |
| 157 | } |
| 158 | } |
| 159 | |