1 | use std::{ |
2 | convert::TryInto, |
3 | pin::Pin, |
4 | sync::Arc, |
5 | task::{Context, Poll}, |
6 | }; |
7 | |
8 | use async_broadcast::Receiver as ActiveReceiver; |
9 | use futures_core::stream; |
10 | use futures_util::stream::FusedStream; |
11 | use ordered_stream::{OrderedStream, PollResult}; |
12 | use static_assertions::assert_impl_all; |
13 | use tracing::warn; |
14 | |
15 | use crate::{ |
16 | AsyncDrop, Connection, ConnectionInner, MatchRule, Message, MessageSequence, OwnedMatchRule, |
17 | Result, |
18 | }; |
19 | |
20 | /// A [`stream::Stream`] implementation that yields [`Message`] items. |
21 | /// |
22 | /// You can convert a [`Connection`] to this type and back to [`Connection`]. |
23 | /// |
24 | /// **NOTE**: You must ensure a `MessageStream` is continuously polled or you will experience hangs. |
25 | /// If you don't need to continuously poll the `MessageStream` but need to keep it around for later |
26 | /// use, keep the connection around and convert it into a `MessageStream` when needed. The |
27 | /// conversion is not an expensive operation so you don't need to worry about performance, unless |
28 | /// you do it very frequently. If you need to convert back and forth frequently, you may want to |
29 | /// consider keeping both a connection and stream around. |
30 | #[derive (Clone, Debug)] |
31 | #[must_use = "streams do nothing unless polled" ] |
32 | pub struct MessageStream { |
33 | inner: Inner, |
34 | } |
35 | |
36 | assert_impl_all!(MessageStream: Send, Sync, Unpin); |
37 | |
38 | impl MessageStream { |
39 | /// Create a message stream for the given match rule. |
40 | /// |
41 | /// If `conn` is a bus connection and match rule is for a signal, the match rule will be |
42 | /// registered with the bus and queued for deregistration when the stream is dropped. If you'd |
43 | /// like immediate deregistration, use [`AsyncDrop::async_drop`]. The reason match rules are |
44 | /// only registered with the bus for signals is that D-Bus specification only allows signals to |
45 | /// be broadcasted and unicast messages are always sent to their destination (regardless of any |
46 | /// match rules registered by the destination) by the bus. Hence there is no need to register |
47 | /// match rules for non-signal messages with the bus. |
48 | /// |
49 | /// Having said that, stream created by this method can still very useful as it allows you to |
50 | /// avoid needless task wakeups and simplify your stream consuming code. |
51 | /// |
52 | /// You can optionally also request the capacity of the underlying message queue through |
53 | /// `max_queued`. If specified, the capacity is guaranteed to be at least `max_queued`. If not |
54 | /// specified, the default of 64 is assumed. The capacity can also be changed later through |
55 | /// [`MessageStream::set_max_queued`]. |
56 | /// |
57 | /// # Example |
58 | /// |
59 | /// ``` |
60 | /// use async_io::Timer; |
61 | /// use zbus::{AsyncDrop, Connection, MatchRule, MessageStream, fdo::NameOwnerChanged}; |
62 | /// use futures_util::{TryStreamExt, future::select, future::Either::{Left, Right}, pin_mut}; |
63 | /// |
64 | /// # zbus::block_on(async { |
65 | /// let conn = Connection::session().await?; |
66 | /// let rule = MatchRule::builder() |
67 | /// .msg_type(zbus::MessageType::Signal) |
68 | /// .sender("org.freedesktop.DBus" )? |
69 | /// .interface("org.freedesktop.DBus" )? |
70 | /// .member("NameOwnerChanged" )? |
71 | /// .add_arg("org.freedesktop.zbus.MatchRuleStreamTest42" )? |
72 | /// .build(); |
73 | /// let mut stream = MessageStream::for_match_rule( |
74 | /// rule, |
75 | /// &conn, |
76 | /// // For such a specific match rule, we don't need a big queue. |
77 | /// Some(1), |
78 | /// ).await?; |
79 | /// |
80 | /// let rule_str = "type='signal',sender='org.freedesktop.DBus',\ |
81 | /// interface='org.freedesktop.DBus',member='NameOwnerChanged',\ |
82 | /// arg0='org.freedesktop.zbus.MatchRuleStreamTest42'" ; |
83 | /// assert_eq!( |
84 | /// stream.match_rule().map(|r| r.to_string()).as_deref(), |
85 | /// Some(rule_str), |
86 | /// ); |
87 | /// |
88 | /// // We register 2 names, starting with the uninteresting one. If `stream` wasn't filtering |
89 | /// // messages based on the match rule, we'd receive method return call for each of these 2 |
90 | /// // calls first. |
91 | /// // |
92 | /// // Note that the `NameOwnerChanged` signal will not be sent by the bus for the first name |
93 | /// // we register since we setup an arg filter. |
94 | /// conn.request_name("org.freedesktop.zbus.MatchRuleStreamTest44" ) |
95 | /// .await?; |
96 | /// conn.request_name("org.freedesktop.zbus.MatchRuleStreamTest42" ) |
97 | /// .await?; |
98 | /// |
99 | /// let msg = stream.try_next().await?.unwrap(); |
100 | /// let signal = NameOwnerChanged::from_message(msg).unwrap(); |
101 | /// assert_eq!(signal.args()?.name(), "org.freedesktop.zbus.MatchRuleStreamTest42" ); |
102 | /// stream.async_drop().await; |
103 | /// |
104 | /// // Ensure the match rule is deregistered and this connection doesn't receive |
105 | /// // `NameOwnerChanged` signals. |
106 | /// let stream = MessageStream::from(&conn).try_filter_map(|msg| async move { |
107 | /// Ok(NameOwnerChanged::from_message(msg)) |
108 | /// }); |
109 | /// conn.release_name("org.freedesktop.zbus.MatchRuleStreamTest42" ).await?; |
110 | /// |
111 | /// pin_mut!(stream); |
112 | /// let next = stream.try_next(); |
113 | /// pin_mut!(next); |
114 | /// let timeout = Timer::after(std::time::Duration::from_millis(50)); |
115 | /// pin_mut!(timeout); |
116 | /// match select(next, timeout).await { |
117 | /// Left((msg, _)) => unreachable!("unexpected message: {:?}" , msg), |
118 | /// Right((_, _)) => (), |
119 | /// } |
120 | /// |
121 | /// # Ok::<(), zbus::Error>(()) |
122 | /// # }).unwrap(); |
123 | /// ``` |
124 | /// |
125 | /// # Caveats |
126 | /// |
127 | /// Since this method relies on [`MatchRule::matches`], it inherits its caveats. |
128 | pub async fn for_match_rule<R>( |
129 | rule: R, |
130 | conn: &Connection, |
131 | max_queued: Option<usize>, |
132 | ) -> Result<Self> |
133 | where |
134 | R: TryInto<OwnedMatchRule>, |
135 | R::Error: Into<crate::Error>, |
136 | { |
137 | let rule = rule.try_into().map_err(Into::into)?; |
138 | let msg_receiver = conn.add_match(rule.clone(), max_queued).await?; |
139 | |
140 | Ok(Self::for_subscription_channel( |
141 | msg_receiver, |
142 | Some(rule), |
143 | conn, |
144 | )) |
145 | } |
146 | |
147 | /// The associated match rule, if any. |
148 | pub fn match_rule(&self) -> Option<MatchRule<'_>> { |
149 | self.inner.match_rule.as_deref().cloned() |
150 | } |
151 | |
152 | /// The maximum number of messages to queue for this stream. |
153 | pub fn max_queued(&self) -> usize { |
154 | self.inner.msg_receiver.capacity() |
155 | } |
156 | |
157 | /// Set maximum number of messages to queue for this stream. |
158 | /// |
159 | /// After this call, the capacity is guaranteed to be at least `max_queued`. |
160 | pub fn set_max_queued(&mut self, max_queued: usize) { |
161 | if max_queued <= self.max_queued() { |
162 | return; |
163 | } |
164 | self.inner.msg_receiver.set_capacity(max_queued); |
165 | } |
166 | |
167 | pub(crate) fn for_subscription_channel( |
168 | msg_receiver: ActiveReceiver<Result<Arc<Message>>>, |
169 | rule: Option<OwnedMatchRule>, |
170 | conn: &Connection, |
171 | ) -> Self { |
172 | let conn_inner = conn.inner.clone(); |
173 | |
174 | Self { |
175 | inner: Inner { |
176 | conn_inner, |
177 | msg_receiver, |
178 | match_rule: rule, |
179 | }, |
180 | } |
181 | } |
182 | } |
183 | |
184 | impl stream::Stream for MessageStream { |
185 | type Item = Result<Arc<Message>>; |
186 | |
187 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
188 | let this: &mut MessageStream = self.get_mut(); |
189 | |
190 | Pin::new(&mut this.inner.msg_receiver).poll_next(cx) |
191 | } |
192 | } |
193 | |
194 | impl OrderedStream for MessageStream { |
195 | type Data = Result<Arc<Message>>; |
196 | type Ordering = MessageSequence; |
197 | |
198 | fn poll_next_before( |
199 | self: Pin<&mut Self>, |
200 | cx: &mut Context<'_>, |
201 | before: Option<&Self::Ordering>, |
202 | ) -> Poll<PollResult<Self::Ordering, Self::Data>> { |
203 | let this = self.get_mut(); |
204 | |
205 | match stream::Stream::poll_next(Pin::new(this), cx) { |
206 | Poll::Pending if before.is_some() => { |
207 | // Assume the provided MessageSequence in before was obtained from a Message |
208 | // associated with our Connection (because that's the only supported use case). |
209 | // Because there is only one socket-reader task, any messages that would have been |
210 | // ordered before that message would have already been sitting in the broadcast |
211 | // queue (and we would have seen Ready in our poll). Because we didn't, we can |
212 | // guarantee that we won't ever produce a message whose sequence is before that |
213 | // provided value, and so we can return NoneBefore. |
214 | // |
215 | // This ensures that ordered_stream::Join will never return Pending while it |
216 | // has a message buffered. |
217 | Poll::Ready(PollResult::NoneBefore) |
218 | } |
219 | Poll::Pending => Poll::Pending, |
220 | Poll::Ready(Some(Ok(msg))) => Poll::Ready(PollResult::Item { |
221 | ordering: msg.recv_position(), |
222 | data: Ok(msg), |
223 | }), |
224 | Poll::Ready(Some(Err(e))) => Poll::Ready(PollResult::Item { |
225 | ordering: MessageSequence::LAST, |
226 | data: Err(e), |
227 | }), |
228 | Poll::Ready(None) => Poll::Ready(PollResult::Terminated), |
229 | } |
230 | } |
231 | } |
232 | |
233 | impl FusedStream for MessageStream { |
234 | fn is_terminated(&self) -> bool { |
235 | self.inner.msg_receiver.is_terminated() |
236 | } |
237 | } |
238 | |
239 | impl From<Connection> for MessageStream { |
240 | fn from(conn: Connection) -> Self { |
241 | let conn_inner: Arc = conn.inner; |
242 | let msg_receiver: Receiver, …>> = conn_inner.msg_receiver.activate_cloned(); |
243 | |
244 | Self { |
245 | inner: Inner { |
246 | conn_inner, |
247 | msg_receiver, |
248 | match_rule: None, |
249 | }, |
250 | } |
251 | } |
252 | } |
253 | |
254 | impl From<&Connection> for MessageStream { |
255 | fn from(conn: &Connection) -> Self { |
256 | Self::from(conn.clone()) |
257 | } |
258 | } |
259 | |
260 | impl From<MessageStream> for Connection { |
261 | fn from(stream: MessageStream) -> Connection { |
262 | Connection::from(&stream) |
263 | } |
264 | } |
265 | |
266 | impl From<&MessageStream> for Connection { |
267 | fn from(stream: &MessageStream) -> Connection { |
268 | Connection { |
269 | inner: stream.inner.conn_inner.clone(), |
270 | } |
271 | } |
272 | } |
273 | |
274 | #[derive (Clone, Debug)] |
275 | struct Inner { |
276 | conn_inner: Arc<ConnectionInner>, |
277 | msg_receiver: ActiveReceiver<Result<Arc<Message>>>, |
278 | match_rule: Option<OwnedMatchRule>, |
279 | } |
280 | |
281 | impl Drop for Inner { |
282 | fn drop(&mut self) { |
283 | let conn: Connection = Connection { |
284 | inner: self.conn_inner.clone(), |
285 | }; |
286 | |
287 | if let Some(rule: OwnedMatchRule) = self.match_rule.take() { |
288 | conn.queue_remove_match(rule); |
289 | } |
290 | } |
291 | } |
292 | |
293 | #[async_trait::async_trait ] |
294 | impl AsyncDrop for MessageStream { |
295 | async fn async_drop(mut self) { |
296 | let conn: Connection = Connection { |
297 | inner: self.inner.conn_inner.clone(), |
298 | }; |
299 | |
300 | if let Some(rule: OwnedMatchRule) = self.inner.match_rule.take() { |
301 | if let Err(e: Error) = conn.remove_match(rule).await { |
302 | warn!("Failed to remove match rule: {}" , e); |
303 | } |
304 | } |
305 | } |
306 | } |
307 | |