1use std::{
2 collections::VecDeque,
3 io,
4 sync::Arc,
5 task::{Context, Poll},
6};
7
8use event_listener::{Event, EventListener};
9
10#[cfg(unix)]
11use crate::OwnedFd;
12use crate::{
13 message_header::{MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE},
14 raw::Socket,
15 utils::padding_for_8_bytes,
16 Message, MessagePrimaryHeader,
17};
18
19use futures_core::ready;
20
21/// A low-level representation of a D-Bus connection
22///
23/// This wrapper is agnostic on the actual transport, using the `Socket` trait
24/// to abstract it. It is compatible with sockets both in blocking or non-blocking
25/// mode.
26///
27/// This wrapper abstracts away the serialization & buffering considerations of the
28/// protocol, and allows interaction based on messages, rather than bytes.
29#[derive(derivative::Derivative)]
30#[derivative(Debug)]
31pub struct Connection<S> {
32 #[derivative(Debug = "ignore")]
33 socket: S,
34 event: Event,
35 raw_in_buffer: Vec<u8>,
36 #[cfg(unix)]
37 raw_in_fds: Vec<OwnedFd>,
38 raw_in_pos: usize,
39 out_pos: usize,
40 out_msgs: VecDeque<Arc<Message>>,
41 prev_seq: u64,
42}
43
44impl<S: Socket> Connection<S> {
45 pub(crate) fn new(socket: S, raw_in_buffer: Vec<u8>) -> Connection<S> {
46 Connection {
47 socket,
48 event: Event::new(),
49 raw_in_pos: raw_in_buffer.len(),
50 raw_in_buffer,
51 #[cfg(unix)]
52 raw_in_fds: vec![],
53 out_pos: 0,
54 out_msgs: VecDeque::new(),
55 prev_seq: 0,
56 }
57 }
58
59 /// Attempt to flush the outgoing buffer
60 ///
61 /// This will try to write as many messages as possible from the
62 /// outgoing buffer into the socket, until an error is encountered.
63 ///
64 /// This method will thus only block if the socket is in blocking mode.
65 pub fn try_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
66 self.event.notify(usize::MAX);
67 while let Some(msg) = self.out_msgs.front() {
68 loop {
69 let data = &msg.as_bytes()[self.out_pos..];
70 if data.is_empty() {
71 self.out_pos = 0;
72 self.out_msgs.pop_front();
73 break;
74 }
75 #[cfg(unix)]
76 let fds = if self.out_pos == 0 { msg.fds() } else { vec![] };
77 self.out_pos += ready!(self.socket.poll_sendmsg(
78 cx,
79 data,
80 #[cfg(unix)]
81 &fds,
82 ))?;
83 }
84 }
85 Poll::Ready(Ok(()))
86 }
87
88 /// Enqueue a message to be sent out to the socket
89 ///
90 /// This method will *not* write anything to the socket, you need to call
91 /// `try_flush()` afterwards so that your message is actually sent out.
92 pub fn enqueue_message(&mut self, msg: Arc<Message>) {
93 self.out_msgs.push_back(msg);
94 }
95
96 /// Attempt to read a message from the socket
97 ///
98 /// This methods will read from the socket until either a full D-Bus message is
99 /// read or an error is encountered.
100 ///
101 /// If the socket is in non-blocking mode, it may read a partial message. In such case it
102 /// will buffer it internally and try to complete it the next time you call
103 /// `try_receive_message`.
104 pub fn try_receive_message(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<Message>> {
105 self.event.notify(usize::MAX);
106 if self.raw_in_pos < MIN_MESSAGE_SIZE {
107 self.raw_in_buffer.resize(MIN_MESSAGE_SIZE, 0);
108 // We don't have enough data to make a proper message header yet.
109 // Some partial read may be in raw_in_buffer, so we try to complete it
110 // until we have MIN_MESSAGE_SIZE bytes
111 //
112 // Given that MIN_MESSAGE_SIZE is 16, this codepath is actually extremely unlikely
113 // to be taken more than once
114 while self.raw_in_pos < MIN_MESSAGE_SIZE {
115 let res = ready!(self
116 .socket
117 .poll_recvmsg(cx, &mut self.raw_in_buffer[self.raw_in_pos..]))?;
118 let len = {
119 #[cfg(unix)]
120 {
121 let (len, fds) = res;
122 self.raw_in_fds.extend(fds);
123 len
124 }
125 #[cfg(not(unix))]
126 {
127 res
128 }
129 };
130 self.raw_in_pos += len;
131 if len == 0 {
132 return Poll::Ready(Err(crate::Error::InputOutput(
133 std::io::Error::new(
134 std::io::ErrorKind::UnexpectedEof,
135 "failed to receive message",
136 )
137 .into(),
138 )));
139 }
140 }
141 }
142
143 let (primary_header, fields_len) = MessagePrimaryHeader::read(&self.raw_in_buffer)?;
144 let header_len = MIN_MESSAGE_SIZE + fields_len as usize;
145 let body_padding = padding_for_8_bytes(header_len);
146 let body_len = primary_header.body_len() as usize;
147 let total_len = header_len + body_padding + body_len;
148 if total_len > MAX_MESSAGE_SIZE {
149 return Poll::Ready(Err(crate::Error::ExcessData));
150 }
151
152 // By this point we have a full primary header, so we know the exact length of the complete
153 // message.
154 self.raw_in_buffer.resize(total_len, 0);
155
156 // Now we have an incomplete message; read the rest
157 while self.raw_in_buffer.len() > self.raw_in_pos {
158 let res = ready!(self
159 .socket
160 .poll_recvmsg(cx, &mut self.raw_in_buffer[self.raw_in_pos..]))?;
161 let read = {
162 #[cfg(unix)]
163 {
164 let (read, fds) = res;
165 self.raw_in_fds.extend(fds);
166 read
167 }
168 #[cfg(not(unix))]
169 {
170 res
171 }
172 };
173 self.raw_in_pos += read;
174 }
175
176 // If we reach here, the message is complete; return it
177 self.raw_in_pos = 0;
178 let bytes = std::mem::take(&mut self.raw_in_buffer);
179 #[cfg(unix)]
180 let fds = std::mem::take(&mut self.raw_in_fds);
181 let seq = self.prev_seq + 1;
182 self.prev_seq = seq;
183 Poll::Ready(Message::from_raw_parts(
184 bytes,
185 #[cfg(unix)]
186 fds,
187 seq,
188 ))
189 }
190
191 /// Close the connection.
192 ///
193 /// After this call, all reading and writing operations will fail.
194 pub fn close(&self) -> crate::Result<()> {
195 self.event.notify(usize::MAX);
196 self.socket().close().map_err(|e| e.into())
197 }
198
199 /// Access the underlying socket
200 ///
201 /// This method is intended to provide access to the socket in order to access certain
202 /// properties (e.g peer credentials).
203 ///
204 /// You should not try to read or write from it directly, as it may corrupt the internal state
205 /// of this wrapper.
206 pub fn socket(&self) -> &S {
207 &self.socket
208 }
209
210 pub(crate) fn monitor_activity(&self) -> EventListener {
211 self.event.listen()
212 }
213}
214
215impl Connection<Box<dyn Socket>> {
216 /// Same as `try_flush` above, except it wraps the method for use in [`std::future::Future`]
217 /// impls.
218 pub(crate) fn flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
219 self.try_flush(cx).map_err(Into::into)
220 }
221}
222
223#[cfg(unix)]
224#[cfg(test)]
225mod tests {
226 use super::{Arc, Connection};
227 use crate::message::Message;
228 use futures_util::future::poll_fn;
229 use test_log::test;
230
231 #[test]
232 fn raw_send_receive() {
233 crate::block_on(raw_send_receive_async());
234 }
235
236 async fn raw_send_receive_async() {
237 #[cfg(not(feature = "tokio"))]
238 let (p0, p1) = std::os::unix::net::UnixStream::pair()
239 .map(|(p0, p1)| {
240 (
241 async_io::Async::new(p0).unwrap(),
242 async_io::Async::new(p1).unwrap(),
243 )
244 })
245 .unwrap();
246 #[cfg(feature = "tokio")]
247 let (p0, p1) = tokio::net::UnixStream::pair().unwrap();
248
249 let mut conn0 = Connection::new(p0, vec![]);
250 let mut conn1 = Connection::new(p1, vec![]);
251
252 let msg = Message::method(
253 None::<()>,
254 None::<()>,
255 "/",
256 Some("org.zbus.p2p"),
257 "Test",
258 &(),
259 )
260 .unwrap();
261
262 conn0.enqueue_message(Arc::new(msg));
263 poll_fn(|cx| conn0.try_flush(cx)).await.unwrap();
264
265 let ret = poll_fn(|cx| conn1.try_receive_message(cx)).await.unwrap();
266 assert_eq!(ret.to_string(), "Method call Test");
267 }
268}
269