1 | use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender as Broadcaster}; |
2 | use enumflags2::BitFlags; |
3 | use event_listener::{Event, EventListener}; |
4 | use once_cell::sync::OnceCell; |
5 | use ordered_stream::{OrderedFuture, OrderedStream, PollResult}; |
6 | use static_assertions::assert_impl_all; |
7 | use std::{ |
8 | collections::HashMap, |
9 | convert::TryInto, |
10 | io::{self, ErrorKind}, |
11 | ops::Deref, |
12 | pin::Pin, |
13 | sync::{ |
14 | self, |
15 | atomic::{AtomicU32, Ordering::SeqCst}, |
16 | Arc, Weak, |
17 | }, |
18 | task::{Context, Poll}, |
19 | }; |
20 | use tracing::{debug, info_span, instrument, trace, trace_span, warn, Instrument}; |
21 | use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName}; |
22 | use zvariant::ObjectPath; |
23 | |
24 | use futures_core::{ready, Future}; |
25 | use futures_sink::Sink; |
26 | use futures_util::{sink::SinkExt, StreamExt}; |
27 | |
28 | use crate::{ |
29 | async_lock::Mutex, |
30 | blocking, |
31 | fdo::{self, ConnectionCredentials, RequestNameFlags, RequestNameReply}, |
32 | raw::{Connection as RawConnection, Socket}, |
33 | socket_reader::SocketReader, |
34 | Authenticated, CacheProperties, ConnectionBuilder, DBusError, Error, Executor, Guid, MatchRule, |
35 | Message, MessageBuilder, MessageFlags, MessageStream, MessageType, ObjectServer, |
36 | OwnedMatchRule, Result, Task, |
37 | }; |
38 | |
// Default capacity of the main (unfiltered) incoming-message queue; see the
// `Connection` docs and `Connection::{max_queued, set_max_queued}`.
const DEFAULT_MAX_QUEUED: usize = 64;
// Default capacity for the method-return/error message queue (presumably backing
// `ConnectionInner::method_return_receiver` — confirm at the channel's creation site).
const DEFAULT_MAX_METHOD_RETURN_QUEUED: usize = 8;
41 | |
/// Inner state shared by Connection and WeakConnection
#[derive (Debug)]
pub(crate) struct ConnectionInner {
    // GUID of the server, as established during connection setup.
    server_guid: Guid,
    // Presumably whether FD passing was negotiated on this connection — confirm at use sites.
    #[cfg (unix)]
    cap_unix_fd: bool,
    // `true` for a message-bus connection, `false` for a direct p2p peer (see `is_bus`).
    bus_conn: bool,
    // Unique name assigned by the bus (or set via `set_unique_name`); set at most once.
    unique_name: OnceCell<OwnedUniqueName>,
    // Well-known names registered through this connection, with their ownership status
    // (owner vs. still queued); see `request_name_with_flags` / `release_name`.
    registered_names: Mutex<HashMap<WellKnownName<'static>, NameStatus>>,

    // The low-level connection wrapping the underlying socket.
    raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,

    // Serial number for next outgoing message
    serial: AtomicU32,

    // Our executor
    executor: Executor<'static>,

    // Socket reader task
    #[allow (unused)]
    socket_reader_task: OnceCell<Task<()>>,

    // Inactive handle to the main (unfiltered) incoming-message channel.
    pub(crate) msg_receiver: InactiveReceiver<Result<Arc<Message>>>,
    // Inactive handle to the channel carrying method returns/errors (used by
    // `call_method_raw` to wait for replies).
    pub(crate) method_return_receiver: InactiveReceiver<Result<Arc<Message>>>,
    // Broadcast senders keyed by match rule; the `None` key is the unfiltered channel.
    msg_senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,

    // Per-match-rule subscription bookkeeping; see the `Subscriptions` alias below.
    subscriptions: Mutex<Subscriptions>,

    // Lazily-initialized object server and the task dispatching incoming calls to it.
    object_server: OnceCell<blocking::ObjectServer>,
    object_server_dispatch_task: OnceCell<Task<()>>,
}
73 | |
// Match-rule subscriptions: rule -> (u64 — presumably a subscriber refcount, confirm at
// use sites — plus the inactive receiver to clone/activate for new subscribers).
type Subscriptions = HashMap<OwnedMatchRule, (u64, InactiveReceiver<Result<Arc<Message>>>)>;

// Sender side of the broadcast channel distributing incoming messages to subscribers.
pub(crate) type MsgBroadcaster = Broadcaster<Result<Arc<Message>>>;
77 | |
78 | /// A D-Bus connection. |
79 | /// |
80 | /// A connection to a D-Bus bus, or a direct peer. |
81 | /// |
82 | /// Once created, the connection is authenticated and negotiated and messages can be sent or |
83 | /// received, such as [method calls] or [signals]. |
84 | /// |
85 | /// For higher-level message handling (typed functions, introspection, documentation reasons etc), |
86 | /// it is recommended to wrap the low-level D-Bus messages into Rust functions with the |
87 | /// [`dbus_proxy`] and [`dbus_interface`] macros instead of doing it directly on a `Connection`. |
88 | /// |
89 | /// Typically, a connection is made to the session bus with [`Connection::session`], or to the |
90 | /// system bus with [`Connection::system`]. Then the connection is used with [`crate::Proxy`] |
91 | /// instances or the on-demand [`ObjectServer`] instance that can be accessed through |
92 | /// [`Connection::object_server`]. |
93 | /// |
94 | /// `Connection` implements [`Clone`] and cloning it is a very cheap operation, as the underlying |
95 | /// data is not cloned. This makes it very convenient to share the connection between different |
96 | /// parts of your code. `Connection` also implements [`std::marker::Sync`] and [`std::marker::Send`] |
97 | /// so you can send and share a connection instance across threads as well. |
98 | /// |
99 | /// `Connection` keeps internal queues of incoming message. The default capacity of each of these is |
100 | /// 64. The capacity of the main (unfiltered) queue is configurable through the [`set_max_queued`] |
101 | /// method. When the queue is full, no more messages can be received until room is created for more. |
102 | /// This is why it's important to ensure that all [`crate::MessageStream`] and |
103 | /// [`crate::blocking::MessageIterator`] instances are continuously polled and iterated on, |
104 | /// respectively. |
105 | /// |
106 | /// For sending messages you can either use [`Connection::send_message`] method or make use of the |
107 | /// [`Sink`] implementation. For latter, you might find [`SinkExt`] API very useful. Keep in mind |
108 | /// that [`Connection`] will not manage the serial numbers (cookies) on the messages for you when |
109 | /// they are sent through the [`Sink`] implementation. You can manually assign unique serial numbers |
110 | /// to them using the [`Connection::assign_serial_num`] method before sending them off, if needed. |
111 | /// Having said that, the [`Sink`] is mainly useful for sending out signals, as they do not expect |
112 | /// a reply, and serial numbers are not very useful for signals either for the same reason. |
113 | /// |
114 | /// Since you do not need exclusive access to a `zbus::Connection` to send messages on the bus, |
115 | /// [`Sink`] is also implemented on `&Connection`. |
116 | /// |
117 | /// # Caveats |
118 | /// |
119 | /// At the moment, a simultaneous [flush request] from multiple tasks/threads could |
120 | /// potentially create a busy loop, thus wasting CPU time. This limitation may be removed in the |
121 | /// future. |
122 | /// |
123 | /// [flush request]: https://docs.rs/futures/0.3.15/futures/sink/trait.SinkExt.html#method.flush |
124 | /// |
125 | /// [method calls]: struct.Connection.html#method.call_method |
126 | /// [signals]: struct.Connection.html#method.emit_signal |
127 | /// [`dbus_proxy`]: attr.dbus_proxy.html |
128 | /// [`dbus_interface`]: attr.dbus_interface.html |
129 | /// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html |
130 | /// [`set_max_queued`]: struct.Connection.html#method.set_max_queued |
131 | /// |
132 | /// ### Examples |
133 | /// |
134 | /// #### Get the session bus ID |
135 | /// |
136 | /// ``` |
137 | /// # zbus::block_on(async { |
138 | /// use zbus::Connection; |
139 | /// |
140 | /// let connection = Connection::session().await?; |
141 | /// |
142 | /// let reply = connection |
143 | /// .call_method( |
144 | /// Some("org.freedesktop.DBus" ), |
145 | /// "/org/freedesktop/DBus" , |
146 | /// Some("org.freedesktop.DBus" ), |
147 | /// "GetId" , |
148 | /// &(), |
149 | /// ) |
150 | /// .await?; |
151 | /// |
152 | /// let id: &str = reply.body()?; |
153 | /// println!("Unique ID of the bus: {}" , id); |
154 | /// # Ok::<(), zbus::Error>(()) |
155 | /// # }).unwrap(); |
156 | /// ``` |
157 | /// |
158 | /// #### Monitoring all messages |
159 | /// |
160 | /// Let's eavesdrop on the session bus 😈 using the [Monitor] interface: |
161 | /// |
162 | /// ```rust,no_run |
163 | /// # zbus::block_on(async { |
164 | /// use futures_util::stream::TryStreamExt; |
165 | /// use zbus::{Connection, MessageStream}; |
166 | /// |
167 | /// let connection = Connection::session().await?; |
168 | /// |
169 | /// connection |
170 | /// .call_method( |
171 | /// Some("org.freedesktop.DBus" ), |
172 | /// "/org/freedesktop/DBus" , |
173 | /// Some("org.freedesktop.DBus.Monitoring" ), |
174 | /// "BecomeMonitor" , |
175 | /// &(&[] as &[&str], 0u32), |
176 | /// ) |
177 | /// .await?; |
178 | /// |
179 | /// let mut stream = MessageStream::from(connection); |
180 | /// while let Some(msg) = stream.try_next().await? { |
181 | /// println!("Got message: {}" , msg); |
182 | /// } |
183 | /// |
184 | /// # Ok::<(), zbus::Error>(()) |
185 | /// # }).unwrap(); |
186 | /// ``` |
187 | /// |
188 | /// This should print something like: |
189 | /// |
190 | /// ```console |
191 | /// Got message: Signal NameAcquired from org.freedesktop.DBus |
192 | /// Got message: Signal NameLost from org.freedesktop.DBus |
193 | /// Got message: Method call GetConnectionUnixProcessID from :1.1324 |
194 | /// Got message: Error org.freedesktop.DBus.Error.NameHasNoOwner: |
195 | /// Could not get PID of name ':1.1332': no such name from org.freedesktop.DBus |
196 | /// Got message: Method call AddMatch from :1.918 |
197 | /// Got message: Method return from org.freedesktop.DBus |
198 | /// ``` |
199 | /// |
200 | /// [Monitor]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-become-monitor |
#[derive (Clone, Debug)]
#[must_use = "Dropping a `Connection` will close the underlying socket." ]
pub struct Connection {
    // All state lives behind this `Arc`, which is why cloning a `Connection` is cheap:
    // a clone only bumps the refcount.
    pub(crate) inner: Arc<ConnectionInner>,
}

// Compile-time guarantee that `Connection` can be shared and sent across threads.
assert_impl_all!(Connection: Send, Sync, Unpin);
208 | |
/// A method call whose completion can be awaited or joined with other streams.
///
/// This is useful for cache population method calls, where joining this future with
/// an update signal stream can be used to ensure that cache updates are not overwritten by a cache
/// population whose task is scheduled later.
#[derive (Debug)]
pub(crate) struct PendingMethodCall {
    // Stream of method-return/error messages; set to `None` once the reply has arrived
    // (see the `OrderedFuture` impl below).
    stream: Option<MessageStream>,
    // Serial number of the outgoing call; replies are matched against `reply_serial`.
    serial: u32,
}
219 | |
220 | impl Future for PendingMethodCall { |
221 | type Output = Result<Arc<Message>>; |
222 | |
223 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
224 | self.poll_before(cx, before:None).map(|ret: Option<(MessageSequence, …)>| { |
225 | ret.map(|(_, r: Result, Error>)| r).unwrap_or_else(|| { |
226 | Err(crate::Error::InputOutput( |
227 | io::Error::new(kind:ErrorKind::BrokenPipe, error:"socket closed" ).into(), |
228 | )) |
229 | }) |
230 | }) |
231 | } |
232 | } |
233 | |
impl OrderedFuture for PendingMethodCall {
    type Output = Result<Arc<Message>>;
    type Ordering = zbus::MessageSequence;

    /// Poll the underlying message stream for the reply to `self.serial`, but only
    /// consider messages ordered before `before` (per the `ordered-stream` contract).
    fn poll_before(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        before: Option<&Self::Ordering>,
    ) -> Poll<Option<(Self::Ordering, Self::Output)>> {
        let this = self.get_mut();
        // `stream` is `None` once a reply was already delivered (or never set); in that
        // case the future is exhausted and we fall through to `Ready(None)` below.
        if let Some(stream) = &mut this.stream {
            loop {
                match Pin::new(&mut *stream).poll_next_before(cx, before) {
                    Poll::Ready(PollResult::Item {
                        data: Ok(msg),
                        ordering,
                    }) => {
                        // Skip anything that isn't a reply to *our* call.
                        if msg.reply_serial() != Some(this.serial) {
                            continue;
                        }
                        let res = match msg.message_type() {
                            // D-Bus error reply: convert the message into an `Error`.
                            MessageType::Error => Err(msg.into()),
                            MessageType::MethodReturn => Ok(msg),
                            // Any other type with a matching reply-serial is ignored.
                            _ => continue,
                        };
                        // Drop the stream: a call gets exactly one reply.
                        this.stream = None;
                        return Poll::Ready(Some((ordering, res)));
                    }
                    Poll::Ready(PollResult::Item {
                        data: Err(e),
                        ordering,
                    }) => {
                        // Stream-level error (e.g. socket error); propagate as-is.
                        return Poll::Ready(Some((ordering, Err(e))));
                    }

                    // No item can arrive before the requested ordering point.
                    Poll::Ready(PollResult::NoneBefore) => {
                        return Poll::Ready(None);
                    }
                    // Stream ended without a reply; the `Future` impl maps this to
                    // a broken-pipe error.
                    Poll::Ready(PollResult::Terminated) => {
                        return Poll::Ready(None);
                    }
                    Poll::Pending => return Poll::Pending,
                }
            }
        }
        Poll::Ready(None)
    }
}
282 | |
283 | impl Connection { |
284 | /// Send `msg` to the peer. |
285 | /// |
286 | /// Unlike our [`Sink`] implementation, this method sets a unique (to this connection) serial |
287 | /// number on the message before sending it off, for you. |
288 | /// |
289 | /// On successfully sending off `msg`, the assigned serial number is returned. |
290 | pub async fn send_message(&self, mut msg: Message) -> Result<u32> { |
291 | let serial = self.assign_serial_num(&mut msg)?; |
292 | |
293 | trace!("Sending message: {:?}" , msg); |
294 | (&mut &*self).send(msg).await?; |
295 | trace!("Sent message with serial: {}" , serial); |
296 | |
297 | Ok(serial) |
298 | } |
299 | |
300 | /// Send a method call. |
301 | /// |
302 | /// Create a method-call message, send it over the connection, then wait for the reply. |
303 | /// |
304 | /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus |
305 | /// error replies are returned as [`Error::MethodError`]. |
306 | pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>( |
307 | &self, |
308 | destination: Option<D>, |
309 | path: P, |
310 | interface: Option<I>, |
311 | method_name: M, |
312 | body: &B, |
313 | ) -> Result<Arc<Message>> |
314 | where |
315 | D: TryInto<BusName<'d>>, |
316 | P: TryInto<ObjectPath<'p>>, |
317 | I: TryInto<InterfaceName<'i>>, |
318 | M: TryInto<MemberName<'m>>, |
319 | D::Error: Into<Error>, |
320 | P::Error: Into<Error>, |
321 | I::Error: Into<Error>, |
322 | M::Error: Into<Error>, |
323 | B: serde::ser::Serialize + zvariant::DynamicType, |
324 | { |
325 | self.call_method_raw( |
326 | destination, |
327 | path, |
328 | interface, |
329 | method_name, |
330 | BitFlags::empty(), |
331 | body, |
332 | ) |
333 | .await? |
334 | .expect("no reply" ) |
335 | .await |
336 | } |
337 | |
338 | /// Send a method call. |
339 | /// |
340 | /// Send the given message, which must be a method call, over the connection and return an |
341 | /// object that allows the reply to be retrieved. Typically you'd want to use |
342 | /// [`Connection::call_method`] instead. |
343 | /// |
344 | /// If the `flags` do not contain `MethodFlags::NoReplyExpected`, the return value is |
345 | /// guaranteed to be `Ok(Some(_))`, if there was no error encountered. |
346 | /// |
347 | /// INTERNAL NOTE: If this method is ever made pub, flags should become `BitFlags<MethodFlags>`. |
348 | pub(crate) async fn call_method_raw<'d, 'p, 'i, 'm, D, P, I, M, B>( |
349 | &self, |
350 | destination: Option<D>, |
351 | path: P, |
352 | interface: Option<I>, |
353 | method_name: M, |
354 | flags: BitFlags<MessageFlags>, |
355 | body: &B, |
356 | ) -> Result<Option<PendingMethodCall>> |
357 | where |
358 | D: TryInto<BusName<'d>>, |
359 | P: TryInto<ObjectPath<'p>>, |
360 | I: TryInto<InterfaceName<'i>>, |
361 | M: TryInto<MemberName<'m>>, |
362 | D::Error: Into<Error>, |
363 | P::Error: Into<Error>, |
364 | I::Error: Into<Error>, |
365 | M::Error: Into<Error>, |
366 | B: serde::ser::Serialize + zvariant::DynamicType, |
367 | { |
368 | let mut builder = MessageBuilder::method_call(path, method_name)?; |
369 | if let Some(sender) = self.unique_name() { |
370 | builder = builder.sender(sender)? |
371 | } |
372 | if let Some(destination) = destination { |
373 | builder = builder.destination(destination)? |
374 | } |
375 | if let Some(interface) = interface { |
376 | builder = builder.interface(interface)? |
377 | } |
378 | for flag in flags { |
379 | builder = builder.with_flags(flag)?; |
380 | } |
381 | let msg = builder.build(body)?; |
382 | |
383 | let msg_receiver = self.inner.method_return_receiver.activate_cloned(); |
384 | let stream = Some(MessageStream::for_subscription_channel( |
385 | msg_receiver, |
386 | // This is a lie but we only use the stream internally so it's fine. |
387 | None, |
388 | self, |
389 | )); |
390 | let serial = self.send_message(msg).await?; |
391 | if flags.contains(MessageFlags::NoReplyExpected) { |
392 | Ok(None) |
393 | } else { |
394 | Ok(Some(PendingMethodCall { stream, serial })) |
395 | } |
396 | } |
397 | |
398 | /// Emit a signal. |
399 | /// |
400 | /// Create a signal message, and send it over the connection. |
401 | pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>( |
402 | &self, |
403 | destination: Option<D>, |
404 | path: P, |
405 | interface: I, |
406 | signal_name: M, |
407 | body: &B, |
408 | ) -> Result<()> |
409 | where |
410 | D: TryInto<BusName<'d>>, |
411 | P: TryInto<ObjectPath<'p>>, |
412 | I: TryInto<InterfaceName<'i>>, |
413 | M: TryInto<MemberName<'m>>, |
414 | D::Error: Into<Error>, |
415 | P::Error: Into<Error>, |
416 | I::Error: Into<Error>, |
417 | M::Error: Into<Error>, |
418 | B: serde::ser::Serialize + zvariant::DynamicType, |
419 | { |
420 | let m = Message::signal( |
421 | self.unique_name(), |
422 | destination, |
423 | path, |
424 | interface, |
425 | signal_name, |
426 | body, |
427 | )?; |
428 | |
429 | self.send_message(m).await.map(|_| ()) |
430 | } |
431 | |
432 | /// Reply to a message. |
433 | /// |
434 | /// Given an existing message (likely a method call), send a reply back to the caller with the |
435 | /// given `body`. |
436 | /// |
437 | /// Returns the message serial number. |
438 | pub async fn reply<B>(&self, call: &Message, body: &B) -> Result<u32> |
439 | where |
440 | B: serde::ser::Serialize + zvariant::DynamicType, |
441 | { |
442 | let m = Message::method_reply(self.unique_name(), call, body)?; |
443 | self.send_message(m).await |
444 | } |
445 | |
446 | /// Reply an error to a message. |
447 | /// |
448 | /// Given an existing message (likely a method call), send an error reply back to the caller |
449 | /// with the given `error_name` and `body`. |
450 | /// |
451 | /// Returns the message serial number. |
452 | pub async fn reply_error<'e, E, B>( |
453 | &self, |
454 | call: &Message, |
455 | error_name: E, |
456 | body: &B, |
457 | ) -> Result<u32> |
458 | where |
459 | B: serde::ser::Serialize + zvariant::DynamicType, |
460 | E: TryInto<ErrorName<'e>>, |
461 | E::Error: Into<Error>, |
462 | { |
463 | let m = Message::method_error(self.unique_name(), call, error_name, body)?; |
464 | self.send_message(m).await |
465 | } |
466 | |
467 | /// Reply an error to a message. |
468 | /// |
469 | /// Given an existing message (likely a method call), send an error reply back to the caller |
470 | /// using one of the standard interface reply types. |
471 | /// |
472 | /// Returns the message serial number. |
473 | pub async fn reply_dbus_error( |
474 | &self, |
475 | call: &zbus::MessageHeader<'_>, |
476 | err: impl DBusError, |
477 | ) -> Result<u32> { |
478 | let m = err.create_reply(call); |
479 | self.send_message(m?).await |
480 | } |
481 | |
482 | /// Register a well-known name for this connection. |
483 | /// |
484 | /// When connecting to a bus, the name is requested from the bus. In case of p2p connection, the |
485 | /// name (if requested) is used of self-identification. |
486 | /// |
487 | /// You can request multiple names for the same connection. Use [`Connection::release_name`] for |
488 | /// deregistering names registered through this method. |
489 | /// |
490 | /// Note that exclusive ownership without queueing is requested (using |
491 | /// [`RequestNameFlags::ReplaceExisting`] and [`RequestNameFlags::DoNotQueue`] flags) since that |
492 | /// is the most typical case. If that is not what you want, you should use |
493 | /// [`Connection::request_name_with_flags`] instead (but make sure then that name is requested |
494 | /// **after** you've setup your service implementation with the `ObjectServer`). |
495 | /// |
496 | /// # Caveats |
497 | /// |
498 | /// The associated `ObjectServer` will only handle method calls destined for the unique name of |
499 | /// this connection or any of the registered well-known names. If no well-known name is |
500 | /// registered, the method calls destined to all well-known names will be handled. |
501 | /// |
502 | /// Since names registered through any other means than `Connection` or [`ConnectionBuilder`] |
503 | /// API are not known to the connection, method calls destined to those names will only be |
504 | /// handled by the associated `ObjectServer` if none of the names are registered through |
505 | /// `Connection*` API. Simply put, either register all the names through `Connection*` API or |
506 | /// none of them. |
507 | /// |
508 | /// # Errors |
509 | /// |
510 | /// Fails with `zbus::Error::NameTaken` if the name is already owned by another peer. |
511 | pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()> |
512 | where |
513 | W: TryInto<WellKnownName<'w>>, |
514 | W::Error: Into<Error>, |
515 | { |
516 | self.request_name_with_flags( |
517 | well_known_name, |
518 | RequestNameFlags::ReplaceExisting | RequestNameFlags::DoNotQueue, |
519 | ) |
520 | .await |
521 | .map(|_| ()) |
522 | } |
523 | |
524 | /// Register a well-known name for this connection. |
525 | /// |
526 | /// This is the same as [`Connection::request_name`] but allows to specify the flags to use when |
527 | /// requesting the name. |
528 | /// |
529 | /// If the [`RequestNameFlags::DoNotQueue`] flag is not specified and request ends up in the |
530 | /// queue, you can use [`fdo::NameAcquiredStream`] to be notified when the name is acquired. A |
531 | /// queued name request can be cancelled using [`Connection::release_name`]. |
532 | /// |
533 | /// If the [`RequestNameFlags::AllowReplacement`] flag is specified, the requested name can be |
534 | /// lost if another peer requests the same name. You can use [`fdo::NameLostStream`] to be |
535 | /// notified when the name is lost |
536 | /// |
537 | /// # Example |
538 | /// |
539 | /// ``` |
540 | /// # |
541 | /// # zbus::block_on(async { |
542 | /// use zbus::{Connection, fdo::{DBusProxy, RequestNameFlags, RequestNameReply}}; |
543 | /// use enumflags2::BitFlags; |
544 | /// use futures_util::stream::StreamExt; |
545 | /// |
546 | /// let name = "org.freedesktop.zbus.QueuedNameTest" ; |
547 | /// let conn1 = Connection::session().await?; |
548 | /// // This should just work right away. |
549 | /// conn1.request_name(name).await?; |
550 | /// |
551 | /// let conn2 = Connection::session().await?; |
552 | /// // A second request from the another connection will fail with `DoNotQueue` flag, which is |
553 | /// // implicit with `request_name` method. |
554 | /// assert!(conn2.request_name(name).await.is_err()); |
555 | /// |
556 | /// // Now let's try w/o `DoNotQueue` and we should be queued. |
557 | /// let reply = conn2 |
558 | /// .request_name_with_flags(name, RequestNameFlags::AllowReplacement.into()) |
559 | /// .await?; |
560 | /// assert_eq!(reply, RequestNameReply::InQueue); |
561 | /// // Another request should just give us the same response. |
562 | /// let reply = conn2 |
563 | /// // The flags on subsequent requests will however be ignored. |
564 | /// .request_name_with_flags(name, BitFlags::empty()) |
565 | /// .await?; |
566 | /// assert_eq!(reply, RequestNameReply::InQueue); |
567 | /// let mut acquired_stream = DBusProxy::new(&conn2) |
568 | /// .await? |
569 | /// .receive_name_acquired() |
570 | /// .await?; |
571 | /// assert!(conn1.release_name(name).await?); |
572 | /// // This would have waited forever if `conn1` hadn't just release the name. |
573 | /// let acquired = acquired_stream.next().await.unwrap(); |
574 | /// assert_eq!(acquired.args().unwrap().name, name); |
575 | /// |
576 | /// // conn2 made the mistake of being too nice and allowed name replacemnt, so conn1 should be |
577 | /// // able to take it back. |
578 | /// let mut lost_stream = DBusProxy::new(&conn2) |
579 | /// .await? |
580 | /// .receive_name_lost() |
581 | /// .await?; |
582 | /// conn1.request_name(name).await?; |
583 | /// let lost = lost_stream.next().await.unwrap(); |
584 | /// assert_eq!(lost.args().unwrap().name, name); |
585 | /// |
586 | /// # Ok::<(), zbus::Error>(()) |
587 | /// # }).unwrap(); |
588 | /// ``` |
589 | /// |
590 | /// # Caveats |
591 | /// |
592 | /// * Same as that of [`Connection::request_name`]. |
593 | /// * If you wish to track changes to name ownership after this call, make sure that the |
594 | /// [`fdo::NameAcquired`] and/or [`fdo::NameLostStream`] instance(s) are created **before** |
595 | /// calling this method. Otherwise, you may loose the signal if it's emitted after this call but |
596 | /// just before the stream instance get created. |
pub async fn request_name_with_flags<'w, W>(
    &self,
    well_known_name: W,
    flags: BitFlags<RequestNameFlags>,
) -> Result<RequestNameReply>
where
    W: TryInto<WellKnownName<'w>>,
    W::Error: Into<Error>,
{
    let well_known_name = well_known_name.try_into().map_err(Into::into)?;
    // We keep the lock until the end of this function so that the (possibly) spawned task
    // doesn't end up accessing the name entry before it's inserted.
    let mut names = self.inner.registered_names.lock().await;

    // Idempotency: a repeat request reports the existing status without touching the bus.
    match names.get(&well_known_name) {
        Some(NameStatus::Owner(_)) => return Ok(RequestNameReply::AlreadyOwner),
        Some(NameStatus::Queued(_)) => return Ok(RequestNameReply::InQueue),
        None => (),
    }

    // p2p connection: there is no bus to ask, so the name is recorded locally and
    // ownership is immediate.
    if !self.is_bus() {
        names.insert(well_known_name.to_owned(), NameStatus::Owner(None));

        return Ok(RequestNameReply::PrimaryOwner);
    }

    let dbus_proxy = fdo::DBusProxy::builder(self)
        .cache_properties(CacheProperties::No)
        .build()
        .await?;
    // Subscribe to `NameAcquired`/`NameLost` *before* issuing the request so that
    // signals emitted right after the request cannot be missed.
    let mut acquired_stream = dbus_proxy.receive_name_acquired().await?;
    let mut lost_stream = dbus_proxy.receive_name_lost().await?;
    let reply = dbus_proxy
        .request_name(well_known_name.clone(), flags)
        .await?;
    let lost_task_name = format!("monitor name {well_known_name} lost" );
    // Only when replacement is allowed can the name be lost later, so only then do we
    // need a watcher to evict the entry from `registered_names`.
    let name_lost_fut = if flags.contains(RequestNameFlags::AllowReplacement) {
        // A weak ref so this background task doesn't keep the connection alive.
        let weak_conn = WeakConnection::from(self);
        let well_known_name = well_known_name.to_owned();
        Some(
            async move {
                loop {
                    let signal = lost_stream.next().await;
                    let inner = match weak_conn.upgrade() {
                        Some(conn) => conn.inner.clone(),
                        None => break,
                    };

                    match signal {
                        Some(signal) => match signal.args() {
                            Ok(args) if args.name == well_known_name => {
                                tracing::info!(
                                    "Connection ` {}` lost name ` {}`" ,
                                    // SAFETY: This is bus connection so unique name can't be
                                    // None.
                                    inner.unique_name.get().unwrap(),
                                    well_known_name
                                );
                                inner.registered_names.lock().await.remove(&well_known_name);

                                break;
                            }
                            // `NameLost` for some other name; keep watching.
                            Ok(_) => (),
                            Err(e) => warn!("Failed to parse `NameLost` signal: {}" , e),
                        },
                        None => {
                            trace!("`NameLost` signal stream closed" );
                            // This is a very strange state we end up in. Now the name is
                            // question remains in the queue
                            // forever. Maybe we can do better here but I
                            // think it's a very unlikely scenario anyway.
                            //
                            // Can happen if the connection is lost/dropped but then the whole
                            // `Connection` instance will go away soon anyway and hence this
                            // strange state along with it.
                            break;
                        }
                    }
                }
            }
            .instrument(info_span!("{}" , lost_task_name)),
        )
    } else {
        None
    };
    let status = match reply {
        RequestNameReply::InQueue => {
            // Queued: spawn a task that waits for `NameAcquired` and, once the name is
            // ours, flips the entry to `Owner` (attaching the `NameLost` watcher, if any).
            let weak_conn = WeakConnection::from(self);
            let well_known_name = well_known_name.to_owned();
            let task_name = format!("monitor name {well_known_name} acquired" );
            let task = self.executor().spawn(
                async move {
                    loop {
                        let signal = acquired_stream.next().await;
                        let inner = match weak_conn.upgrade() {
                            Some(conn) => conn.inner.clone(),
                            None => break,
                        };
                        match signal {
                            Some(signal) => match signal.args() {
                                Ok(args) if args.name == well_known_name => {
                                    let mut names = inner.registered_names.lock().await;
                                    if let Some(status) = names.get_mut(&well_known_name) {
                                        // Only start the `NameLost` watcher once we
                                        // actually own the name.
                                        let task = name_lost_fut.map(|fut| {
                                            inner.executor.spawn(fut, &lost_task_name)
                                        });
                                        *status = NameStatus::Owner(task);

                                        break;
                                    }
                                    // else the name was released in the meantime. :shrug:
                                }
                                Ok(_) => (),
                                Err(e) => warn!("Failed to parse `NameAcquired` signal: {}" , e),
                            },
                            None => {
                                trace!("`NameAcquired` signal stream closed" );
                                // See comment above for similar state in case of `NameLost`
                                // stream.
                                break;
                            }
                        }
                    }
                }
                .instrument(info_span!("{}" , task_name)),
                &task_name,
            );

            NameStatus::Queued(task)
        }
        RequestNameReply::PrimaryOwner | RequestNameReply::AlreadyOwner => {
            // Owned right away: attach the `NameLost` watcher (if replacement is allowed).
            let task = name_lost_fut.map(|fut| self.executor().spawn(fut, &lost_task_name));

            NameStatus::Owner(task)
        }
        RequestNameReply::Exists => return Err(Error::NameTaken),
    };

    // Insert while still holding the `names` lock taken at the top (see comment there).
    names.insert(well_known_name.to_owned(), status);

    Ok(reply)
}
739 | |
740 | /// Deregister a previously registered well-known name for this service on the bus. |
741 | /// |
742 | /// Use this method to deregister a well-known name, registered through |
743 | /// [`Connection::request_name`]. |
744 | /// |
745 | /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with |
746 | /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name |
747 | /// was not previously registered or already deregistered. |
748 | pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool> |
749 | where |
750 | W: TryInto<WellKnownName<'w>>, |
751 | W::Error: Into<Error>, |
752 | { |
753 | let well_known_name: WellKnownName<'w> = well_known_name.try_into().map_err(Into::into)?; |
754 | let mut names = self.inner.registered_names.lock().await; |
755 | // FIXME: Should be possible to avoid cloning/allocation here |
756 | if names.remove(&well_known_name.to_owned()).is_none() { |
757 | return Ok(false); |
758 | }; |
759 | |
760 | if !self.is_bus() { |
761 | return Ok(true); |
762 | } |
763 | |
764 | fdo::DBusProxy::builder(self) |
765 | .cache_properties(CacheProperties::No) |
766 | .build() |
767 | .await? |
768 | .release_name(well_known_name) |
769 | .await |
770 | .map(|_| true) |
771 | .map_err(Into::into) |
772 | } |
773 | |
/// Checks if `self` is a connection to a message bus.
///
/// This will return `false` for p2p connections.
pub fn is_bus(&self) -> bool {
    // Set once at connection-construction time; never changes afterwards.
    self.inner.bus_conn
}
780 | |
781 | /// Assigns a serial number to `msg` that is unique to this connection. |
782 | /// |
783 | /// This method can fail if `msg` is corrupted. |
784 | pub fn assign_serial_num(&self, msg: &mut Message) -> Result<u32> { |
785 | let mut serial = 0; |
786 | msg.modify_primary_header(|primary| { |
787 | serial = *primary.serial_num_or_init(|| self.next_serial()); |
788 | Ok(()) |
789 | })?; |
790 | |
791 | Ok(serial) |
792 | } |
793 | |
/// The unique name of the connection, if set/applicable.
///
/// The unique name is assigned by the message bus or set manually using
/// [`Connection::set_unique_name`].
pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
    // `None` until the bus assigns a name or `set_unique_name` is called.
    self.inner.unique_name.get()
}
801 | |
802 | /// Sets the unique name of the connection (if not already set). |
803 | /// |
804 | /// # Panics |
805 | /// |
806 | /// This method panics if the unique name is already set. It will always panic if the connection |
807 | /// is to a message bus as it's the bus that assigns peers their unique names. This is mainly |
808 | /// provided for bus implementations. All other users should not need to use this method. |
809 | pub fn set_unique_name<U>(&self, unique_name: U) -> Result<()> |
810 | where |
811 | U: TryInto<OwnedUniqueName>, |
812 | U::Error: Into<Error>, |
813 | { |
814 | let name = unique_name.try_into().map_err(Into::into)?; |
815 | self.inner |
816 | .unique_name |
817 | .set(name) |
818 | .expect("unique name already set" ); |
819 | |
820 | Ok(()) |
821 | } |
822 | |
    /// The capacity of the main (unfiltered) queue.
    ///
    /// Reports the capacity of the underlying broadcast channel backing the unfiltered
    /// message receiver.
    pub fn max_queued(&self) -> usize {
        self.inner.msg_receiver.capacity()
    }
827 | |
    /// Set the capacity of the main (unfiltered) queue.
    ///
    /// The receiver lives inside the shared `ConnectionInner`, so we cannot get a `&mut`
    /// to it directly; instead we call `set_capacity` on a clone, which resizes the
    /// shared channel itself (per `async-broadcast` semantics) — TODO confirm against the
    /// pinned `async-broadcast` version.
    pub fn set_max_queued(&mut self, max: usize) {
        self.inner.msg_receiver.clone().set_capacity(max);
    }
832 | |
    /// The server's GUID.
    ///
    /// This is the GUID received during authentication (`Authenticated::server_guid`),
    /// returned as a string slice.
    pub fn server_guid(&self) -> &str {
        self.inner.server_guid.as_str()
    }
837 | |
    /// The underlying executor.
    ///
    /// When a connection is built with internal_executor set to false, zbus will not spawn a
    /// thread to run the executor. You're responsible to continuously [tick the executor][tte].
    /// Failure to do so will result in hangs.
    ///
    /// # Examples
    ///
    /// Here is how one would typically run the zbus executor through async-std's single-threaded
    /// scheduler:
    ///
    /// ```
    /// # // Disable on windows because somehow it triggers a stack overflow there:
    /// # // https://gitlab.freedesktop.org/zeenix/zbus/-/jobs/34023494
    /// # #[cfg (all(not(feature = "tokio" ), not(target_os = "windows" )))]
    /// # {
    /// use zbus::ConnectionBuilder;
    /// use async_std::task::{block_on, spawn};
    ///
    /// # struct SomeIface;
    /// #
    /// # #[zbus::dbus_interface]
    /// # impl SomeIface {
    /// # }
    /// #
    /// block_on(async {
    ///     let conn = ConnectionBuilder::session()
    ///         .unwrap()
    ///         .internal_executor(false)
    /// #       // This is only for testing a deadlock that used to happen with this combo.
    /// #       .serve_at("/some/iface" , SomeIface)
    /// #       .unwrap()
    ///         .build()
    ///         .await
    ///         .unwrap();
    ///     {
    ///         let conn = conn.clone();
    ///         spawn(async move {
    ///             loop {
    ///                 conn.executor().tick().await;
    ///             }
    ///         });
    ///     }
    ///
    ///     // All your other async code goes here.
    /// });
    /// # }
    /// ```
    ///
    /// **Note**: zbus 2.1 added support for tight integration with tokio. This means, if you use
    /// zbus with tokio, you do not need to worry about this at all. All you need to do is enable
    /// `tokio` feature. You should also disable the (default) `async-io` feature in your
    /// `Cargo.toml` to avoid unused dependencies. Also note that **prior** to zbus 3.0, disabling
    /// `async-io` was required to enable tight `tokio` integration.
    ///
    /// [tte]: https://docs.rs/async-executor/1.4.1/async_executor/struct.Executor.html#method.tick
    pub fn executor(&self) -> &Executor<'static> {
        // The executor lives in the shared `ConnectionInner`, so every clone of this
        // connection returns the same instance.
        &self.inner.executor
    }
897 | |
898 | /// Get a reference to the associated [`ObjectServer`]. |
899 | /// |
900 | /// The `ObjectServer` is created on-demand. |
901 | /// |
902 | /// **Note**: Once the `ObjectServer` is created, it will be replying to all method calls |
903 | /// received on `self`. If you want to manually reply to method calls, do not use this |
904 | /// method (or any of the `ObjectServer` related API). |
905 | pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ { |
906 | // FIXME: Maybe it makes sense after all to implement Deref<Target= ObjectServer> for |
907 | // crate::ObjectServer instead of this wrapper? |
908 | struct Wrapper<'a>(&'a blocking::ObjectServer); |
909 | impl<'a> Deref for Wrapper<'a> { |
910 | type Target = ObjectServer; |
911 | |
912 | fn deref(&self) -> &Self::Target { |
913 | self.0.inner() |
914 | } |
915 | } |
916 | |
917 | Wrapper(self.sync_object_server(true, None)) |
918 | } |
919 | |
    /// Returns the blocking object-server wrapper, creating it on first use.
    ///
    /// `start` and `started_event` are only consulted by the call that actually
    /// initializes the `OnceCell`; later calls just return the cached instance.
    pub(crate) fn sync_object_server(
        &self,
        start: bool,
        started_event: Option<Event>,
    ) -> &blocking::ObjectServer {
        self.inner
            .object_server
            .get_or_init(move || self.setup_object_server(start, started_event))
    }
929 | |
    /// Creates the blocking object server, optionally starting its dispatch task first.
    ///
    /// `started_event`, if given, is passed through to `start_object_server` so the
    /// caller can wait for the dispatch task to be ready.
    fn setup_object_server(
        &self,
        start: bool,
        started_event: Option<Event>,
    ) -> blocking::ObjectServer {
        if start {
            self.start_object_server(started_event);
        }

        blocking::ObjectServer::new(self)
    }
941 | |
    /// Spawns the background task that dispatches incoming method calls to the
    /// [`ObjectServer`], if it hasn't been spawned already (guarded by a `OnceCell`).
    ///
    /// `started_event`, when provided, is notified once the message stream is set up and
    /// the task is ready to receive method calls.
    #[instrument (skip(self))]
    pub(crate) fn start_object_server(&self, started_event: Option<Event>) {
        self.inner.object_server_dispatch_task.get_or_init(|| {
            trace!("starting ObjectServer task" );
            // Only keep a weak ref inside the task so the connection can still be
            // dropped while the task is alive.
            let weak_conn = WeakConnection::from(self);

            let obj_server_task_name = "ObjectServer task" ;
            self.inner.executor.spawn(
                async move {
                    // Subscribe to method calls; the destination filter is only added
                    // when our unique name is already known.
                    let mut stream = match weak_conn.upgrade() {
                        Some(conn) => {
                            let mut builder = MatchRule::builder().msg_type(MessageType::MethodCall);
                            if let Some(unique_name) = conn.unique_name() {
                                builder = builder.destination(&**unique_name).expect("unique name" );
                            }
                            let rule = builder.build();
                            match conn.add_match(rule.into(), None).await {
                                Ok(stream) => stream,
                                Err(e) => {
                                    // Very unlikely but can happen I guess if connection is closed.
                                    debug!("Failed to create message stream: {}" , e);

                                    return;
                                }
                            }
                        }
                        None => {
                            trace!("Connection is gone, stopping associated object server task" );

                            return;
                        }
                    };
                    if let Some(started_event) = started_event {
                        started_event.notify(1);
                    }

                    trace!("waiting for incoming method call messages.." );
                    while let Some(msg) = stream.next().await.and_then(|m| {
                        // Stream errors are logged and skipped, not fatal to the task.
                        if let Err(e) = &m {
                            debug!("Error while reading from object server stream: {:?}" , e);
                        }
                        m.ok()
                    }) {
                        if let Some(conn) = weak_conn.upgrade() {
                            let hdr = match msg.header() {
                                Ok(hdr) => hdr,
                                Err(e) => {
                                    warn!("Failed to parse header: {}" , e);

                                    continue;
                                }
                            };
                            match hdr.destination() {
                                // Unique name is already checked by the match rule.
                                Ok(Some(BusName::Unique(_))) | Ok(None) => (),
                                Ok(Some(BusName::WellKnown(dest))) => {
                                    let names = conn.inner.registered_names.lock().await;
                                    // destination doesn't matter if no name has been registered
                                    // (probably means name it's registered through external means).
                                    if !names.is_empty() && !names.contains_key(dest) {
                                        trace!("Got a method call for a different destination: {}" , dest);

                                        continue;
                                    }
                                }
                                Err(e) => {
                                    warn!("Failed to parse destination: {}" , e);

                                    continue;
                                }
                            }
                            let member = match msg.member() {
                                Some(member) => member,
                                None => {
                                    warn!("Got a method call with no `MEMBER` field: {}" , msg);

                                    continue;
                                }
                            };
                            trace!("Got `{}`. Will spawn a task for dispatch.." , msg);
                            // Each call is dispatched in its own task so a slow handler
                            // doesn't block this receive loop.
                            let executor = conn.inner.executor.clone();
                            let task_name = format!("`{member}` method dispatcher" );
                            executor
                                .spawn(
                                    async move {
                                        trace!("spawned a task to dispatch `{}`." , msg);
                                        let server = conn.object_server();
                                        if let Err(e) = server.dispatch_message(&msg).await {
                                            debug!(
                                                "Error dispatching message. Message: {:?}, error: {:?}" ,
                                                msg, e
                                            );
                                        }
                                    }
                                    .instrument(trace_span!("{}" , task_name)),
                                    &task_name,
                                )
                                .detach();
                        } else {
                            // If connection is completely gone, no reason to keep running the task anymore.
                            trace!("Connection is gone, stopping associated object server task" );
                            break;
                        }
                    }
                }
                .instrument(info_span!("{}" , obj_server_task_name)),
                obj_server_task_name,
            )
        });
    }
1052 | |
    /// Subscribes to messages matching `rule`, returning a broadcast receiver for them.
    ///
    /// Subscriptions are reference-counted per rule: the first subscription creates the
    /// channel (and, on a bus connection, registers signal rules with the bus); later
    /// ones bump the count. `max_queued` can only grow an existing channel's capacity,
    /// never shrink it.
    pub(crate) async fn add_match(
        &self,
        rule: OwnedMatchRule,
        max_queued: Option<usize>,
    ) -> Result<Receiver<Result<Arc<Message>>>> {
        use std::collections::hash_map::Entry;

        if self.inner.msg_senders.lock().await.is_empty() {
            // This only happens if socket reader task has errored out.
            return Err(Error::InputOutput(Arc::new(io::Error::new(
                io::ErrorKind::BrokenPipe,
                "Socket reader task has errored out" ,
            ))));
        }

        let mut subscriptions = self.inner.subscriptions.lock().await;
        let msg_type = rule.msg_type().unwrap_or(MessageType::Signal);
        match subscriptions.entry(rule.clone()) {
            Entry::Vacant(e) => {
                let max_queued = max_queued.unwrap_or(DEFAULT_MAX_QUEUED);
                let (sender, mut receiver) = broadcast(max_queued);
                receiver.set_await_active(false);
                // Only signal rules need registering with the bus.
                if self.is_bus() && msg_type == MessageType::Signal {
                    fdo::DBusProxy::builder(self)
                        .cache_properties(CacheProperties::No)
                        .build()
                        .await?
                        .add_match_rule(e.key().inner().clone())
                        .await?;
                }
                // A deactivated clone is stored so the channel outlives the receivers
                // handed out to subscribers.
                e.insert((1, receiver.clone().deactivate()));
                self.inner
                    .msg_senders
                    .lock()
                    .await
                    .insert(Some(rule), sender);

                Ok(receiver)
            }
            Entry::Occupied(mut e) => {
                let (num_subscriptions, receiver) = e.get_mut();
                *num_subscriptions += 1;
                if let Some(max_queued) = max_queued {
                    if max_queued > receiver.capacity() {
                        receiver.set_capacity(max_queued);
                    }
                }

                Ok(receiver.activate_cloned())
            }
        }
    }
1105 | |
    /// Drops one subscription for `rule`; returns `false` if there was none.
    ///
    /// When the last subscription for a rule goes away, its sender/receiver pair is torn
    /// down and, for signal rules on a bus connection, the rule is deregistered from the
    /// bus.
    pub(crate) async fn remove_match(&self, rule: OwnedMatchRule) -> Result<bool> {
        use std::collections::hash_map::Entry;
        let mut subscriptions = self.inner.subscriptions.lock().await;
        // TODO when it becomes stable, use HashMap::raw_entry and only require expr: &str
        // (both here and in add_match)
        let msg_type = rule.msg_type().unwrap_or(MessageType::Signal);
        match subscriptions.entry(rule) {
            Entry::Vacant(_) => Ok(false),
            Entry::Occupied(mut e) => {
                let rule = e.key().inner().clone();
                e.get_mut().0 -= 1;
                // `.0` is the subscription refcount set up by `add_match`.
                if e.get().0 == 0 {
                    if self.is_bus() && msg_type == MessageType::Signal {
                        fdo::DBusProxy::builder(self)
                            .cache_properties(CacheProperties::No)
                            .build()
                            .await?
                            .remove_match_rule(rule.clone())
                            .await?;
                    }
                    e.remove();
                    self.inner
                        .msg_senders
                        .lock()
                        .await
                        .remove(&Some(rule.into()));
                }
                Ok(true)
            }
        }
    }
1137 | |
1138 | pub(crate) fn queue_remove_match(&self, rule: OwnedMatchRule) { |
1139 | let conn = self.clone(); |
1140 | let task_name = format!("Remove match ` {}`" , *rule); |
1141 | let remove_match = |
1142 | async move { conn.remove_match(rule).await }.instrument(trace_span!("{}" , task_name)); |
1143 | self.inner.executor.spawn(remove_match, &task_name).detach() |
1144 | } |
1145 | |
    /// Performs the `Hello()` call with the bus and records the unique name it assigns us.
    ///
    /// Must only run once per connection: the `OnceCell` set below panics on a second
    /// attempt.
    pub(crate) async fn hello_bus(&self) -> Result<()> {
        let dbus_proxy = fdo::DBusProxy::builder(self)
            .cache_properties(CacheProperties::No)
            .build()
            .await?;
        let name = dbus_proxy.hello().await?;

        self.inner
            .unique_name
            .set(name)
            // programmer (probably our) error if this fails.
            .expect("Attempted to set unique_name twice" );

        Ok(())
    }
1161 | |
    /// Creates a `Connection` from an authenticated socket.
    ///
    /// `bus_connection` records whether this talks to a message bus (vs. p2p). Sets up
    /// the broadcast channels: one unfiltered channel plus one shared by method-return
    /// and error messages. The socket reader task is *not* started here — see
    /// `init_socket_reader`.
    pub(crate) async fn new(
        auth: Authenticated<Box<dyn Socket>>,
        bus_connection: bool,
        executor: Executor<'static>,
    ) -> Result<Self> {
        #[cfg (unix)]
        let cap_unix_fd = auth.cap_unix_fd;

        // Channels are created with a deactivated, non-awaiting receiver so senders
        // aren't held up when nobody is listening yet.
        macro_rules! create_msg_broadcast_channel {
            ($size:expr) => {{
                let (msg_sender, msg_receiver) = broadcast($size);
                let mut msg_receiver = msg_receiver.deactivate();
                msg_receiver.set_await_active(false);

                (msg_sender, msg_receiver)
            }};
        }
        // The unfiltered message channel.
        let (msg_sender, msg_receiver) = create_msg_broadcast_channel!(DEFAULT_MAX_QUEUED);
        let mut msg_senders = HashMap::new();
        msg_senders.insert(None, msg_sender);

        // The special method return & error channel.
        let (method_return_sender, method_return_receiver) =
            create_msg_broadcast_channel!(DEFAULT_MAX_METHOD_RETURN_QUEUED);
        // Both method-return and error rules feed the same sender.
        let rule = MatchRule::builder()
            .msg_type(MessageType::MethodReturn)
            .build()
            .into();
        msg_senders.insert(Some(rule), method_return_sender.clone());
        let rule = MatchRule::builder()
            .msg_type(MessageType::Error)
            .build()
            .into();
        msg_senders.insert(Some(rule), method_return_sender);
        let msg_senders = Arc::new(Mutex::new(msg_senders));
        let subscriptions = Mutex::new(HashMap::new());

        let raw_conn = Arc::new(sync::Mutex::new(auth.conn));

        let connection = Self {
            inner: Arc::new(ConnectionInner {
                raw_conn,
                server_guid: auth.server_guid,
                #[cfg (unix)]
                cap_unix_fd,
                bus_conn: bus_connection,
                // Outgoing serial numbers start at 1.
                serial: AtomicU32::new(1),
                unique_name: OnceCell::new(),
                subscriptions,
                object_server: OnceCell::new(),
                object_server_dispatch_task: OnceCell::new(),
                executor,
                socket_reader_task: OnceCell::new(),
                msg_senders,
                msg_receiver,
                method_return_receiver,
                registered_names: Mutex::new(HashMap::new()),
            }),
        };

        Ok(connection)
    }
1225 | |
    /// Returns the next outgoing-message serial (post-incrementing the shared counter).
    ///
    /// Starts at 1 (see `Connection::new`); `fetch_add` wraps on `u32` overflow.
    fn next_serial(&self) -> u32 {
        self.inner.serial.fetch_add(1, SeqCst)
    }
1229 | |
    /// Create a `Connection` to the session/user message bus.
    ///
    /// Convenience shorthand for [`ConnectionBuilder::session`] with default settings.
    pub async fn session() -> Result<Self> {
        ConnectionBuilder::session()?.build().await
    }
1234 | |
    /// Create a `Connection` to the system-wide message bus.
    ///
    /// Convenience shorthand for [`ConnectionBuilder::system`] with default settings.
    pub async fn system() -> Result<Self> {
        ConnectionBuilder::system()?.build().await
    }
1239 | |
    /// Returns a listener, notified on various connection activity.
    ///
    /// This function is meant for the caller to implement idle or timeout on inactivity.
    pub fn monitor_activity(&self) -> EventListener {
        // Delegates to the raw connection's activity event; the std mutex is only held
        // for the duration of this call.
        self.inner
            .raw_conn
            .lock()
            .expect("poisoned lock" )
            .monitor_activity()
    }
1250 | |
    /// Returns the peer process ID, or Ok(None) if it cannot be returned for the associated socket.
    #[deprecated (
        since = "3.13.0" ,
        note = "Use `peer_credentials` instead, which returns `ConnectionCredentials` which includes
        the peer PID."
    )]
    pub fn peer_pid(&self) -> io::Result<Option<u32>> {
        // Queries the underlying socket directly; availability is platform/socket
        // dependent.
        self.inner
            .raw_conn
            .lock()
            .expect("poisoned lock" )
            .socket()
            .peer_pid()
    }
1265 | |
    /// Returns the peer credentials.
    ///
    /// The fields are populated on the best effort basis. Some or all fields may not even make
    /// sense for certain sockets or on certain platforms and hence will be set to `None`.
    ///
    /// # Caveats
    ///
    /// Currently `unix_group_ids` and `linux_security_label` fields are not populated.
    #[allow (deprecated)]
    pub async fn peer_credentials(&self) -> io::Result<ConnectionCredentials> {
        let raw_conn = self.inner.raw_conn.lock().expect("poisoned lock" );
        let socket = raw_conn.socket();

        Ok(ConnectionCredentials {
            process_id: socket.peer_pid()?,
            #[cfg (unix)]
            unix_user_id: socket.uid()?,
            #[cfg (not(unix))]
            unix_user_id: None,
            // Should we bother providing all the groups of user? What's the use case?
            unix_group_ids: None,
            #[cfg (windows)]
            windows_sid: socket.peer_sid(),
            #[cfg (not(windows))]
            windows_sid: None,
            // TODO: Populate this field (see the field docs for pointers).
            linux_security_label: None,
        })
    }
1295 | |
    /// Spawns the socket reader task that feeds incoming messages into the senders in
    /// `msg_senders`.
    ///
    /// # Panics
    ///
    /// Panics if called more than once for the same connection (`OnceCell::set`).
    pub(crate) fn init_socket_reader(&self) {
        let inner = &self.inner;
        inner
            .socket_reader_task
            .set(
                SocketReader::new(inner.raw_conn.clone(), inner.msg_senders.clone())
                    .spawn(&inner.executor),
            )
            .expect("Attempted to set `socket_reader_task` twice" );
    }
1306 | } |
1307 | |
1308 | impl<T> Sink<T> for Connection |
1309 | where |
1310 | T: Into<Arc<Message>>, |
1311 | { |
1312 | type Error = Error; |
1313 | |
1314 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
1315 | <&Connection as Sink<Arc<Message>>>::poll_ready(self:Pin::new(&mut &*self), cx) |
1316 | } |
1317 | |
1318 | fn start_send(self: Pin<&mut Self>, msg: T) -> Result<()> { |
1319 | Pin::new(&mut &*self).start_send(item:msg) |
1320 | } |
1321 | |
1322 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
1323 | <&Connection as Sink<Arc<Message>>>::poll_flush(self:Pin::new(&mut &*self), cx) |
1324 | } |
1325 | |
1326 | fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { |
1327 | <&Connection as Sink<Arc<Message>>>::poll_close(self:Pin::new(&mut &*self), cx) |
1328 | } |
1329 | } |
1330 | |
// The `Sink` impl that does the actual work: queue on `start_send`, write out on
// `poll_flush`/`poll_close`.
impl<'a, T> Sink<T> for &'a Connection
where
    T: Into<Arc<Message>>,
{
    type Error = Error;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
        // TODO: We should have a max queue length in raw::Socket for outgoing messages.
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, msg: T) -> Result<()> {
        let msg = msg.into();

        // Refuse messages carrying FDs when FD passing wasn't negotiated during auth.
        #[cfg (unix)]
        if !msg.fds().is_empty() && !self.inner.cap_unix_fd {
            return Err(Error::Unsupported);
        }

        self.inner
            .raw_conn
            .lock()
            .expect("poisoned lock" )
            .enqueue_message(msg);

        Ok(())
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        self.inner.raw_conn.lock().expect("poisoned lock" ).flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        // Flush any queued messages before closing the raw connection.
        let mut raw_conn = self.inner.raw_conn.lock().expect("poisoned lock" );
        let res = raw_conn.flush(cx);
        match ready!(res) {
            Ok(_) => (),
            Err(e) => return Poll::Ready(Err(e)),
        }

        Poll::Ready(raw_conn.close())
    }
}
1374 | |
// A blocking connection is a wrapper around an async one; conversion just unwraps it.
impl From<crate::blocking::Connection> for Connection {
    fn from(conn: crate::blocking::Connection) -> Self {
        conn.into_inner()
    }
}
1380 | |
// Internal API that allows keeping a weak connection ref around.
//
// Unlike `Connection`, holding this does not keep the shared inner state (and the tasks
// owning a `Connection`) alive — it only wraps a `Weak` pointer.
#[derive (Debug)]
pub(crate) struct WeakConnection {
    inner: Weak<ConnectionInner>,
}
1386 | |
1387 | impl WeakConnection { |
1388 | /// Upgrade to a Connection. |
1389 | pub fn upgrade(&self) -> Option<Connection> { |
1390 | self.inner.upgrade().map(|inner: Arc| Connection { inner }) |
1391 | } |
1392 | } |
1393 | |
impl From<&Connection> for WeakConnection {
    // Downgrades the connection's shared state to a weak reference.
    fn from(conn: &Connection) -> Self {
        Self {
            inner: Arc::downgrade(&conn.inner),
        }
    }
}
1401 | |
// Our standing for a well-known name registered through this connection; the embedded
// tasks are kept only so they stay alive (hence the `unused` allows).
#[derive (Debug)]
enum NameStatus {
    // The task waits for name lost signal if owner allows replacement.
    Owner(#[allow (unused)] Option<Task<()>>),
    // The task waits for name acquisition signal.
    Queued(#[allow (unused)] Task<()>),
}
1409 | |
#[cfg (test)]
mod tests {
    use futures_util::stream::TryStreamExt;
    use ntest::timeout;
    use test_log::test;

    use crate::{fdo::DBusProxy, AuthMechanism};

    use super::*;

    // Same numbered client and server are already paired up. We make use of the
    // `futures_util::stream::Forward` to connect the two pipes and hence test one of the benefits
    // of our Stream and Sink impls.
    async fn test_p2p(
        server1: Connection,
        client1: Connection,
        server2: Connection,
        client2: Connection,
    ) -> Result<()> {
        // Cross-wire the two pipes: messages received on server1 are forwarded into
        // client2 and vice versa, so client1 effectively talks to server2.
        let forward1 = MessageStream::from(server1.clone()).forward(client2.clone());
        let forward2 = MessageStream::from(&client2).forward(server1);
        let _forward_task = client1.executor().spawn(
            async move { futures_util::try_join!(forward1, forward2) },
            "forward_task" ,
        );

        let server_ready = Event::new();
        let server_ready_listener = server_ready.listen();
        let client_done = Event::new();
        let client_done_listener = client_done.listen();

        let server_future = async move {
            let mut stream = MessageStream::from(&server2);
            server_ready.notify(1);
            let method = loop {
                let m = stream.try_next().await?.unwrap();
                if m.to_string() == "Method call Test" {
                    break m;
                }
            };

            // Send another message first to check the queueing function on client side.
            server2
                .emit_signal(None::<()>, "/" , "org.zbus.p2p" , "ASignalForYou" , &())
                .await?;
            server2.reply(&method, &("yay" )).await?;
            client_done_listener.await;

            Ok(())
        };

        let client_future = async move {
            let mut stream = MessageStream::from(&client1);
            server_ready_listener.await;
            let reply = client1
                .call_method(None::<()>, "/" , Some("org.zbus.p2p" ), "Test" , &())
                .await?;
            assert_eq!(reply.to_string(), "Method return" );
            // Check we didn't miss the signal that was sent during the call.
            let m = stream.try_next().await?.unwrap();
            client_done.notify(1);
            assert_eq!(m.to_string(), "Signal ASignalForYou" );
            reply.body::<String>()
        };

        let (val, _) = futures_util::try_join!(client_future, server_future,)?;
        assert_eq!(val, "yay" );

        Ok(())
    }

    #[test ]
    #[timeout(15000)]
    fn tcp_p2p() {
        crate::utils::block_on(test_tcp_p2p()).unwrap();
    }

    async fn test_tcp_p2p() -> Result<()> {
        let (server1, client1) = tcp_p2p_pipe().await?;
        let (server2, client2) = tcp_p2p_pipe().await?;

        test_p2p(server1, client1, server2, client2).await
    }

    // Builds one anonymous-auth p2p server/client pair over a loopback TCP socket.
    async fn tcp_p2p_pipe() -> Result<(Connection, Connection)> {
        let guid = Guid::generate();

        #[cfg (not(feature = "tokio" ))]
        let (server_conn_builder, client_conn_builder) = {
            let listener = std::net::TcpListener::bind("127.0.0.1:0" ).unwrap();
            let addr = listener.local_addr().unwrap();
            let p1 = std::net::TcpStream::connect(addr).unwrap();
            let p0 = listener.incoming().next().unwrap().unwrap();

            (
                ConnectionBuilder::tcp_stream(p0)
                    .server(&guid)
                    .p2p()
                    .auth_mechanisms(&[AuthMechanism::Anonymous]),
                ConnectionBuilder::tcp_stream(p1).p2p(),
            )
        };

        #[cfg (feature = "tokio" )]
        let (server_conn_builder, client_conn_builder) = {
            let listener = tokio::net::TcpListener::bind("127.0.0.1:0" ).await.unwrap();
            let addr = listener.local_addr().unwrap();
            let p1 = tokio::net::TcpStream::connect(addr).await.unwrap();
            let p0 = listener.accept().await.unwrap().0;

            (
                ConnectionBuilder::tcp_stream(p0)
                    .server(&guid)
                    .p2p()
                    .auth_mechanisms(&[AuthMechanism::Anonymous]),
                ConnectionBuilder::tcp_stream(p1).p2p(),
            )
        };

        futures_util::try_join!(server_conn_builder.build(), client_conn_builder.build())
    }

    #[cfg (unix)]
    #[test ]
    #[timeout(15000)]
    fn unix_p2p() {
        crate::utils::block_on(test_unix_p2p()).unwrap();
    }

    #[cfg (unix)]
    async fn test_unix_p2p() -> Result<()> {
        let (server1, client1) = unix_p2p_pipe().await?;
        let (server2, client2) = unix_p2p_pipe().await?;

        test_p2p(server1, client1, server2, client2).await
    }

    // Builds one p2p server/client pair over a socketpair.
    #[cfg (unix)]
    async fn unix_p2p_pipe() -> Result<(Connection, Connection)> {
        #[cfg (not(feature = "tokio" ))]
        use std::os::unix::net::UnixStream;
        #[cfg (feature = "tokio" )]
        use tokio::net::UnixStream;
        #[cfg (all(windows, not(feature = "tokio" )))]
        use uds_windows::UnixStream;

        let guid = Guid::generate();

        let (p0, p1) = UnixStream::pair().unwrap();

        futures_util::try_join!(
            ConnectionBuilder::unix_stream(p1).p2p().build(),
            ConnectionBuilder::unix_stream(p0)
                .server(&guid)
                .p2p()
                .build(),
        )
    }

    // Compile-test only since we don't have a VM setup to run this with/in.
    #[cfg (any(
        all(feature = "vsock" , not(feature = "tokio" )),
        feature = "tokio-vsock"
    ))]
    #[test ]
    #[timeout(15000)]
    #[ignore ]
    fn vsock_p2p() {
        crate::utils::block_on(test_vsock_p2p()).unwrap();
    }

    #[cfg (any(
        all(feature = "vsock" , not(feature = "tokio" )),
        feature = "tokio-vsock"
    ))]
    async fn test_vsock_p2p() -> Result<()> {
        let (server1, client1) = vsock_p2p_pipe().await?;
        let (server2, client2) = vsock_p2p_pipe().await?;

        test_p2p(server1, client1, server2, client2).await
    }

    #[cfg (all(feature = "vsock" , not(feature = "tokio" )))]
    async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
        let guid = Guid::generate();

        let listener = vsock::VsockListener::bind_with_cid_port(vsock::VMADDR_CID_ANY, 42).unwrap();
        let addr = listener.local_addr().unwrap();
        let client = vsock::VsockStream::connect(&addr).unwrap();
        let server = listener.incoming().next().unwrap().unwrap();

        futures_util::try_join!(
            ConnectionBuilder::vsock_stream(server)
                .server(&guid)
                .p2p()
                .auth_mechanisms(&[AuthMechanism::Anonymous])
                .build(),
            ConnectionBuilder::vsock_stream(client).p2p().build(),
        )
    }

    #[cfg (feature = "tokio-vsock" )]
    async fn vsock_p2p_pipe() -> Result<(Connection, Connection)> {
        let guid = Guid::generate();

        let listener = tokio_vsock::VsockListener::bind(2, 42).unwrap();
        let client = tokio_vsock::VsockStream::connect(3, 42).await.unwrap();
        let server = listener.incoming().next().await.unwrap().unwrap();

        futures_util::try_join!(
            ConnectionBuilder::vsock_stream(server)
                .server(&guid)
                .p2p()
                .auth_mechanisms(&[AuthMechanism::Anonymous])
                .build(),
            ConnectionBuilder::vsock_stream(client).p2p().build(),
        )
    }

    #[test ]
    #[timeout(15000)]
    fn serial_monotonically_increases() {
        crate::utils::block_on(test_serial_monotonically_increases());
    }

    async fn test_serial_monotonically_increases() {
        let c = Connection::session().await.unwrap();
        let serial = c.next_serial() + 1;

        for next in serial..serial + 10 {
            assert_eq!(next, c.next_serial());
        }
    }

    #[cfg (all(windows, feature = "windows-gdbus" ))]
    #[test ]
    fn connect_gdbus_session_bus() {
        let addr = crate::win32::windows_autolaunch_bus_address()
            .expect("Unable to get GDBus session bus address" );

        crate::block_on(async { addr.connect().await }).expect("Unable to connect to session bus" );
    }

    #[cfg (target_os = "macos" )]
    #[test ]
    fn connect_launchd_session_bus() {
        crate::block_on(async {
            let addr = crate::address::macos_launchd_bus_address("DBUS_LAUNCHD_SESSION_BUS_SOCKET" )
                .await
                .expect("Unable to get Launchd session bus address" );
            addr.connect().await
        })
        .expect("Unable to connect to session bus" );
    }

    #[test ]
    #[timeout(15000)]
    fn disconnect_on_drop() {
        // Reproducer for https://github.com/dbus2/zbus/issues/308 where setting up the
        // objectserver would cause the connection to not disconnect on drop.
        crate::utils::block_on(test_disconnect_on_drop());
    }

    async fn test_disconnect_on_drop() {
        #[derive (Default)]
        struct MyInterface {}

        #[crate::dbus_interface (name = "dev.peelz.FooBar.Baz" )]
        impl MyInterface {
            fn do_thing(&self) {}
        }
        let name = "dev.peelz.foobar" ;
        let connection = ConnectionBuilder::session()
            .unwrap()
            .name(name)
            .unwrap()
            .serve_at("/dev/peelz/FooBar" , MyInterface::default())
            .unwrap()
            .build()
            .await
            .unwrap();

        let connection2 = Connection::session().await.unwrap();
        let dbus = DBusProxy::new(&connection2).await.unwrap();
        // Watch for our name losing its owner (arg 0 = the name, arg 2 = new owner = none).
        let mut stream = dbus
            .receive_name_owner_changed_with_args(&[(0, name), (2, "" )])
            .await
            .unwrap();

        drop(connection);

        // If the connection is not dropped, this will hang forever.
        stream.next().await.unwrap();

        // Let's still make sure the name is gone.
        let name_has_owner = dbus.name_has_owner(name.try_into().unwrap()).await.unwrap();
        assert!(!name_has_owner);
    }

    #[cfg (any(unix, not(feature = "tokio" )))]
    #[test ]
    #[timeout(15000)]
    fn unix_p2p_cookie_auth() {
        use crate::utils::block_on;
        use std::{
            fs::{create_dir_all, remove_file, write},
            time::{SystemTime as Time, UNIX_EPOCH},
        };
        #[cfg (unix)]
        use std::{
            fs::{set_permissions, Permissions},
            os::unix::fs::PermissionsExt,
        };
        use xdg_home::home_dir;

        let cookie_context = "zbus-test-cookie-context" ;
        let cookie_id = 123456789;
        let cookie = hex::encode(b"our cookie" );

        // Ensure cookie directory exists.
        let cookie_dir = home_dir().unwrap().join(".dbus-keyrings" );
        create_dir_all(&cookie_dir).unwrap();
        #[cfg (unix)]
        set_permissions(&cookie_dir, Permissions::from_mode(0o700)).unwrap();

        // Create a cookie file.
        let cookie_file = cookie_dir.join(cookie_context);
        let ts = Time::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
        let cookie_entry = format!(" {cookie_id} {ts} {cookie}" );
        write(&cookie_file, cookie_entry).unwrap();

        // Explicit cookie ID.
        let res1 = block_on(test_unix_p2p_cookie_auth(cookie_context, Some(cookie_id)));
        // Implicit cookie ID (first one should be picked).
        let res2 = block_on(test_unix_p2p_cookie_auth(cookie_context, None));

        // Remove the cookie file.
        remove_file(&cookie_file).unwrap();

        res1.unwrap();
        res2.unwrap();
    }

    #[cfg (any(unix, not(feature = "tokio" )))]
    async fn test_unix_p2p_cookie_auth(
        cookie_context: &'static str,
        cookie_id: Option<usize>,
    ) -> Result<()> {
        #[cfg (all(unix, not(feature = "tokio" )))]
        use std::os::unix::net::UnixStream;
        #[cfg (all(unix, feature = "tokio" ))]
        use tokio::net::UnixStream;
        #[cfg (all(windows, not(feature = "tokio" )))]
        use uds_windows::UnixStream;

        let guid = Guid::generate();

        let (p0, p1) = UnixStream::pair().unwrap();
        let mut server_builder = ConnectionBuilder::unix_stream(p0)
            .server(&guid)
            .p2p()
            .auth_mechanisms(&[AuthMechanism::Cookie])
            .cookie_context(cookie_context)
            .unwrap();
        if let Some(cookie_id) = cookie_id {
            server_builder = server_builder.cookie_id(cookie_id);
        }

        futures_util::try_join!(
            ConnectionBuilder::unix_stream(p1).p2p().build(),
            server_builder.build(),
        )
        .map(|_| ())
    }
}
1785 | |