1 | use std::{ |
2 | collections::VecDeque, |
3 | io, |
4 | sync::Arc, |
5 | task::{Context, Poll}, |
6 | }; |
7 | |
8 | use event_listener::{Event, EventListener}; |
9 | |
10 | #[cfg (unix)] |
11 | use crate::OwnedFd; |
12 | use crate::{ |
13 | message_header::{MAX_MESSAGE_SIZE, MIN_MESSAGE_SIZE}, |
14 | raw::Socket, |
15 | utils::padding_for_8_bytes, |
16 | Message, MessagePrimaryHeader, |
17 | }; |
18 | |
19 | use futures_core::ready; |
20 | |
21 | /// A low-level representation of a D-Bus connection |
22 | /// |
23 | /// This wrapper is agnostic on the actual transport, using the `Socket` trait |
24 | /// to abstract it. It is compatible with sockets both in blocking or non-blocking |
25 | /// mode. |
26 | /// |
27 | /// This wrapper abstracts away the serialization & buffering considerations of the |
28 | /// protocol, and allows interaction based on messages, rather than bytes. |
29 | #[derive (derivative::Derivative)] |
30 | #[derivative(Debug)] |
31 | pub struct Connection<S> { |
32 | #[derivative(Debug = "ignore" )] |
33 | socket: S, |
34 | event: Event, |
35 | raw_in_buffer: Vec<u8>, |
36 | #[cfg (unix)] |
37 | raw_in_fds: Vec<OwnedFd>, |
38 | raw_in_pos: usize, |
39 | out_pos: usize, |
40 | out_msgs: VecDeque<Arc<Message>>, |
41 | prev_seq: u64, |
42 | } |
43 | |
44 | impl<S: Socket> Connection<S> { |
45 | pub(crate) fn new(socket: S, raw_in_buffer: Vec<u8>) -> Connection<S> { |
46 | Connection { |
47 | socket, |
48 | event: Event::new(), |
49 | raw_in_pos: raw_in_buffer.len(), |
50 | raw_in_buffer, |
51 | #[cfg (unix)] |
52 | raw_in_fds: vec![], |
53 | out_pos: 0, |
54 | out_msgs: VecDeque::new(), |
55 | prev_seq: 0, |
56 | } |
57 | } |
58 | |
59 | /// Attempt to flush the outgoing buffer |
60 | /// |
61 | /// This will try to write as many messages as possible from the |
62 | /// outgoing buffer into the socket, until an error is encountered. |
63 | /// |
64 | /// This method will thus only block if the socket is in blocking mode. |
65 | pub fn try_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
66 | self.event.notify(usize::MAX); |
67 | while let Some(msg) = self.out_msgs.front() { |
68 | loop { |
69 | let data = &msg.as_bytes()[self.out_pos..]; |
70 | if data.is_empty() { |
71 | self.out_pos = 0; |
72 | self.out_msgs.pop_front(); |
73 | break; |
74 | } |
75 | #[cfg (unix)] |
76 | let fds = if self.out_pos == 0 { msg.fds() } else { vec![] }; |
77 | self.out_pos += ready!(self.socket.poll_sendmsg( |
78 | cx, |
79 | data, |
80 | #[cfg (unix)] |
81 | &fds, |
82 | ))?; |
83 | } |
84 | } |
85 | Poll::Ready(Ok(())) |
86 | } |
87 | |
88 | /// Enqueue a message to be sent out to the socket |
89 | /// |
90 | /// This method will *not* write anything to the socket, you need to call |
91 | /// `try_flush()` afterwards so that your message is actually sent out. |
92 | pub fn enqueue_message(&mut self, msg: Arc<Message>) { |
93 | self.out_msgs.push_back(msg); |
94 | } |
95 | |
96 | /// Attempt to read a message from the socket |
97 | /// |
98 | /// This methods will read from the socket until either a full D-Bus message is |
99 | /// read or an error is encountered. |
100 | /// |
101 | /// If the socket is in non-blocking mode, it may read a partial message. In such case it |
102 | /// will buffer it internally and try to complete it the next time you call |
103 | /// `try_receive_message`. |
104 | pub fn try_receive_message(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<Message>> { |
105 | self.event.notify(usize::MAX); |
106 | if self.raw_in_pos < MIN_MESSAGE_SIZE { |
107 | self.raw_in_buffer.resize(MIN_MESSAGE_SIZE, 0); |
108 | // We don't have enough data to make a proper message header yet. |
109 | // Some partial read may be in raw_in_buffer, so we try to complete it |
110 | // until we have MIN_MESSAGE_SIZE bytes |
111 | // |
112 | // Given that MIN_MESSAGE_SIZE is 16, this codepath is actually extremely unlikely |
113 | // to be taken more than once |
114 | while self.raw_in_pos < MIN_MESSAGE_SIZE { |
115 | let res = ready!(self |
116 | .socket |
117 | .poll_recvmsg(cx, &mut self.raw_in_buffer[self.raw_in_pos..]))?; |
118 | let len = { |
119 | #[cfg (unix)] |
120 | { |
121 | let (len, fds) = res; |
122 | self.raw_in_fds.extend(fds); |
123 | len |
124 | } |
125 | #[cfg (not(unix))] |
126 | { |
127 | res |
128 | } |
129 | }; |
130 | self.raw_in_pos += len; |
131 | if len == 0 { |
132 | return Poll::Ready(Err(crate::Error::InputOutput( |
133 | std::io::Error::new( |
134 | std::io::ErrorKind::UnexpectedEof, |
135 | "failed to receive message" , |
136 | ) |
137 | .into(), |
138 | ))); |
139 | } |
140 | } |
141 | } |
142 | |
143 | let (primary_header, fields_len) = MessagePrimaryHeader::read(&self.raw_in_buffer)?; |
144 | let header_len = MIN_MESSAGE_SIZE + fields_len as usize; |
145 | let body_padding = padding_for_8_bytes(header_len); |
146 | let body_len = primary_header.body_len() as usize; |
147 | let total_len = header_len + body_padding + body_len; |
148 | if total_len > MAX_MESSAGE_SIZE { |
149 | return Poll::Ready(Err(crate::Error::ExcessData)); |
150 | } |
151 | |
152 | // By this point we have a full primary header, so we know the exact length of the complete |
153 | // message. |
154 | self.raw_in_buffer.resize(total_len, 0); |
155 | |
156 | // Now we have an incomplete message; read the rest |
157 | while self.raw_in_buffer.len() > self.raw_in_pos { |
158 | let res = ready!(self |
159 | .socket |
160 | .poll_recvmsg(cx, &mut self.raw_in_buffer[self.raw_in_pos..]))?; |
161 | let read = { |
162 | #[cfg (unix)] |
163 | { |
164 | let (read, fds) = res; |
165 | self.raw_in_fds.extend(fds); |
166 | read |
167 | } |
168 | #[cfg (not(unix))] |
169 | { |
170 | res |
171 | } |
172 | }; |
173 | self.raw_in_pos += read; |
174 | } |
175 | |
176 | // If we reach here, the message is complete; return it |
177 | self.raw_in_pos = 0; |
178 | let bytes = std::mem::take(&mut self.raw_in_buffer); |
179 | #[cfg (unix)] |
180 | let fds = std::mem::take(&mut self.raw_in_fds); |
181 | let seq = self.prev_seq + 1; |
182 | self.prev_seq = seq; |
183 | Poll::Ready(Message::from_raw_parts( |
184 | bytes, |
185 | #[cfg (unix)] |
186 | fds, |
187 | seq, |
188 | )) |
189 | } |
190 | |
191 | /// Close the connection. |
192 | /// |
193 | /// After this call, all reading and writing operations will fail. |
194 | pub fn close(&self) -> crate::Result<()> { |
195 | self.event.notify(usize::MAX); |
196 | self.socket().close().map_err(|e| e.into()) |
197 | } |
198 | |
199 | /// Access the underlying socket |
200 | /// |
201 | /// This method is intended to provide access to the socket in order to access certain |
202 | /// properties (e.g peer credentials). |
203 | /// |
204 | /// You should not try to read or write from it directly, as it may corrupt the internal state |
205 | /// of this wrapper. |
206 | pub fn socket(&self) -> &S { |
207 | &self.socket |
208 | } |
209 | |
210 | pub(crate) fn monitor_activity(&self) -> EventListener { |
211 | self.event.listen() |
212 | } |
213 | } |
214 | |
215 | impl Connection<Box<dyn Socket>> { |
216 | /// Same as `try_flush` above, except it wraps the method for use in [`std::future::Future`] |
217 | /// impls. |
218 | pub(crate) fn flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> { |
219 | self.try_flush(cx).map_err(Into::into) |
220 | } |
221 | } |
222 | |
223 | #[cfg (unix)] |
224 | #[cfg (test)] |
225 | mod tests { |
226 | use super::{Arc, Connection}; |
227 | use crate::message::Message; |
228 | use futures_util::future::poll_fn; |
229 | use test_log::test; |
230 | |
231 | #[test ] |
232 | fn raw_send_receive() { |
233 | crate::block_on(raw_send_receive_async()); |
234 | } |
235 | |
236 | async fn raw_send_receive_async() { |
237 | #[cfg (not(feature = "tokio" ))] |
238 | let (p0, p1) = std::os::unix::net::UnixStream::pair() |
239 | .map(|(p0, p1)| { |
240 | ( |
241 | async_io::Async::new(p0).unwrap(), |
242 | async_io::Async::new(p1).unwrap(), |
243 | ) |
244 | }) |
245 | .unwrap(); |
246 | #[cfg (feature = "tokio" )] |
247 | let (p0, p1) = tokio::net::UnixStream::pair().unwrap(); |
248 | |
249 | let mut conn0 = Connection::new(p0, vec![]); |
250 | let mut conn1 = Connection::new(p1, vec![]); |
251 | |
252 | let msg = Message::method( |
253 | None::<()>, |
254 | None::<()>, |
255 | "/" , |
256 | Some("org.zbus.p2p" ), |
257 | "Test" , |
258 | &(), |
259 | ) |
260 | .unwrap(); |
261 | |
262 | conn0.enqueue_message(Arc::new(msg)); |
263 | poll_fn(|cx| conn0.try_flush(cx)).await.unwrap(); |
264 | |
265 | let ret = poll_fn(|cx| conn1.try_receive_message(cx)).await.unwrap(); |
266 | assert_eq!(ret.to_string(), "Method call Test" ); |
267 | } |
268 | } |
269 | |