1use std::{
2 convert::TryInto,
3 pin::Pin,
4 sync::Arc,
5 task::{Context, Poll},
6};
7
8use async_broadcast::Receiver as ActiveReceiver;
9use futures_core::stream;
10use futures_util::stream::FusedStream;
11use ordered_stream::{OrderedStream, PollResult};
12use static_assertions::assert_impl_all;
13use tracing::warn;
14
15use crate::{
16 AsyncDrop, Connection, ConnectionInner, MatchRule, Message, MessageSequence, OwnedMatchRule,
17 Result,
18};
19
20/// A [`stream::Stream`] implementation that yields [`Message`] items.
21///
22/// You can convert a [`Connection`] to this type and back to [`Connection`].
23///
24/// **NOTE**: You must ensure a `MessageStream` is continuously polled or you will experience hangs.
25/// If you don't need to continuously poll the `MessageStream` but need to keep it around for later
26/// use, keep the connection around and convert it into a `MessageStream` when needed. The
27/// conversion is not an expensive operation so you don't need to worry about performance, unless
28/// you do it very frequently. If you need to convert back and forth frequently, you may want to
29/// consider keeping both a connection and stream around.
30#[derive(Clone, Debug)]
31#[must_use = "streams do nothing unless polled"]
32pub struct MessageStream {
33 inner: Inner,
34}
35
36assert_impl_all!(MessageStream: Send, Sync, Unpin);
37
38impl MessageStream {
39 /// Create a message stream for the given match rule.
40 ///
41 /// If `conn` is a bus connection and match rule is for a signal, the match rule will be
42 /// registered with the bus and queued for deregistration when the stream is dropped. If you'd
43 /// like immediate deregistration, use [`AsyncDrop::async_drop`]. The reason match rules are
44 /// only registered with the bus for signals is that D-Bus specification only allows signals to
45 /// be broadcasted and unicast messages are always sent to their destination (regardless of any
46 /// match rules registered by the destination) by the bus. Hence there is no need to register
47 /// match rules for non-signal messages with the bus.
48 ///
49 /// Having said that, stream created by this method can still very useful as it allows you to
50 /// avoid needless task wakeups and simplify your stream consuming code.
51 ///
52 /// You can optionally also request the capacity of the underlying message queue through
53 /// `max_queued`. If specified, the capacity is guaranteed to be at least `max_queued`. If not
54 /// specified, the default of 64 is assumed. The capacity can also be changed later through
55 /// [`MessageStream::set_max_queued`].
56 ///
57 /// # Example
58 ///
59 /// ```
60 /// use async_io::Timer;
61 /// use zbus::{AsyncDrop, Connection, MatchRule, MessageStream, fdo::NameOwnerChanged};
62 /// use futures_util::{TryStreamExt, future::select, future::Either::{Left, Right}, pin_mut};
63 ///
64 /// # zbus::block_on(async {
65 /// let conn = Connection::session().await?;
66 /// let rule = MatchRule::builder()
67 /// .msg_type(zbus::MessageType::Signal)
68 /// .sender("org.freedesktop.DBus")?
69 /// .interface("org.freedesktop.DBus")?
70 /// .member("NameOwnerChanged")?
71 /// .add_arg("org.freedesktop.zbus.MatchRuleStreamTest42")?
72 /// .build();
73 /// let mut stream = MessageStream::for_match_rule(
74 /// rule,
75 /// &conn,
76 /// // For such a specific match rule, we don't need a big queue.
77 /// Some(1),
78 /// ).await?;
79 ///
80 /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\
81 /// interface='org.freedesktop.DBus',member='NameOwnerChanged',\
82 /// arg0='org.freedesktop.zbus.MatchRuleStreamTest42'";
83 /// assert_eq!(
84 /// stream.match_rule().map(|r| r.to_string()).as_deref(),
85 /// Some(rule_str),
86 /// );
87 ///
88 /// // We register 2 names, starting with the uninteresting one. If `stream` wasn't filtering
89 /// // messages based on the match rule, we'd receive method return call for each of these 2
90 /// // calls first.
91 /// //
92 /// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name
93 /// // we register since we setup an arg filter.
94 /// conn.request_name("org.freedesktop.zbus.MatchRuleStreamTest44")
95 /// .await?;
96 /// conn.request_name("org.freedesktop.zbus.MatchRuleStreamTest42")
97 /// .await?;
98 ///
99 /// let msg = stream.try_next().await?.unwrap();
100 /// let signal = NameOwnerChanged::from_message(msg).unwrap();
101 /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleStreamTest42");
102 /// stream.async_drop().await;
103 ///
104 /// // Ensure the match rule is deregistered and this connection doesn't receive
105 /// // `NameOwnerChanged` signals.
106 /// let stream = MessageStream::from(&conn).try_filter_map(|msg| async move {
107 /// Ok(NameOwnerChanged::from_message(msg))
108 /// });
109 /// conn.release_name("org.freedesktop.zbus.MatchRuleStreamTest42").await?;
110 ///
111 /// pin_mut!(stream);
112 /// let next = stream.try_next();
113 /// pin_mut!(next);
114 /// let timeout = Timer::after(std::time::Duration::from_millis(50));
115 /// pin_mut!(timeout);
116 /// match select(next, timeout).await {
117 /// Left((msg, _)) => unreachable!("unexpected message: {:?}", msg),
118 /// Right((_, _)) => (),
119 /// }
120 ///
121 /// # Ok::<(), zbus::Error>(())
122 /// # }).unwrap();
123 /// ```
124 ///
125 /// # Caveats
126 ///
127 /// Since this method relies on [`MatchRule::matches`], it inherits its caveats.
128 pub async fn for_match_rule<R>(
129 rule: R,
130 conn: &Connection,
131 max_queued: Option<usize>,
132 ) -> Result<Self>
133 where
134 R: TryInto<OwnedMatchRule>,
135 R::Error: Into<crate::Error>,
136 {
137 let rule = rule.try_into().map_err(Into::into)?;
138 let msg_receiver = conn.add_match(rule.clone(), max_queued).await?;
139
140 Ok(Self::for_subscription_channel(
141 msg_receiver,
142 Some(rule),
143 conn,
144 ))
145 }
146
147 /// The associated match rule, if any.
148 pub fn match_rule(&self) -> Option<MatchRule<'_>> {
149 self.inner.match_rule.as_deref().cloned()
150 }
151
152 /// The maximum number of messages to queue for this stream.
153 pub fn max_queued(&self) -> usize {
154 self.inner.msg_receiver.capacity()
155 }
156
157 /// Set maximum number of messages to queue for this stream.
158 ///
159 /// After this call, the capacity is guaranteed to be at least `max_queued`.
160 pub fn set_max_queued(&mut self, max_queued: usize) {
161 if max_queued <= self.max_queued() {
162 return;
163 }
164 self.inner.msg_receiver.set_capacity(max_queued);
165 }
166
167 pub(crate) fn for_subscription_channel(
168 msg_receiver: ActiveReceiver<Result<Arc<Message>>>,
169 rule: Option<OwnedMatchRule>,
170 conn: &Connection,
171 ) -> Self {
172 let conn_inner = conn.inner.clone();
173
174 Self {
175 inner: Inner {
176 conn_inner,
177 msg_receiver,
178 match_rule: rule,
179 },
180 }
181 }
182}
183
184impl stream::Stream for MessageStream {
185 type Item = Result<Arc<Message>>;
186
187 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188 let this: &mut MessageStream = self.get_mut();
189
190 Pin::new(&mut this.inner.msg_receiver).poll_next(cx)
191 }
192}
193
194impl OrderedStream for MessageStream {
195 type Data = Result<Arc<Message>>;
196 type Ordering = MessageSequence;
197
198 fn poll_next_before(
199 self: Pin<&mut Self>,
200 cx: &mut Context<'_>,
201 before: Option<&Self::Ordering>,
202 ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
203 let this = self.get_mut();
204
205 match stream::Stream::poll_next(Pin::new(this), cx) {
206 Poll::Pending if before.is_some() => {
207 // Assume the provided MessageSequence in before was obtained from a Message
208 // associated with our Connection (because that's the only supported use case).
209 // Because there is only one socket-reader task, any messages that would have been
210 // ordered before that message would have already been sitting in the broadcast
211 // queue (and we would have seen Ready in our poll). Because we didn't, we can
212 // guarantee that we won't ever produce a message whose sequence is before that
213 // provided value, and so we can return NoneBefore.
214 //
215 // This ensures that ordered_stream::Join will never return Pending while it
216 // has a message buffered.
217 Poll::Ready(PollResult::NoneBefore)
218 }
219 Poll::Pending => Poll::Pending,
220 Poll::Ready(Some(Ok(msg))) => Poll::Ready(PollResult::Item {
221 ordering: msg.recv_position(),
222 data: Ok(msg),
223 }),
224 Poll::Ready(Some(Err(e))) => Poll::Ready(PollResult::Item {
225 ordering: MessageSequence::LAST,
226 data: Err(e),
227 }),
228 Poll::Ready(None) => Poll::Ready(PollResult::Terminated),
229 }
230 }
231}
232
233impl FusedStream for MessageStream {
234 fn is_terminated(&self) -> bool {
235 self.inner.msg_receiver.is_terminated()
236 }
237}
238
239impl From<Connection> for MessageStream {
240 fn from(conn: Connection) -> Self {
241 let conn_inner: Arc = conn.inner;
242 let msg_receiver: Receiver, …>> = conn_inner.msg_receiver.activate_cloned();
243
244 Self {
245 inner: Inner {
246 conn_inner,
247 msg_receiver,
248 match_rule: None,
249 },
250 }
251 }
252}
253
254impl From<&Connection> for MessageStream {
255 fn from(conn: &Connection) -> Self {
256 Self::from(conn.clone())
257 }
258}
259
260impl From<MessageStream> for Connection {
261 fn from(stream: MessageStream) -> Connection {
262 Connection::from(&stream)
263 }
264}
265
266impl From<&MessageStream> for Connection {
267 fn from(stream: &MessageStream) -> Connection {
268 Connection {
269 inner: stream.inner.conn_inner.clone(),
270 }
271 }
272}
273
274#[derive(Clone, Debug)]
275struct Inner {
276 conn_inner: Arc<ConnectionInner>,
277 msg_receiver: ActiveReceiver<Result<Arc<Message>>>,
278 match_rule: Option<OwnedMatchRule>,
279}
280
281impl Drop for Inner {
282 fn drop(&mut self) {
283 let conn: Connection = Connection {
284 inner: self.conn_inner.clone(),
285 };
286
287 if let Some(rule: OwnedMatchRule) = self.match_rule.take() {
288 conn.queue_remove_match(rule);
289 }
290 }
291}
292
293#[async_trait::async_trait]
294impl AsyncDrop for MessageStream {
295 async fn async_drop(mut self) {
296 let conn: Connection = Connection {
297 inner: self.inner.conn_inner.clone(),
298 };
299
300 if let Some(rule: OwnedMatchRule) = self.inner.match_rule.take() {
301 if let Err(e: Error) = conn.remove_match(rule).await {
302 warn!("Failed to remove match rule: {}", e);
303 }
304 }
305 }
306}
307