1#![warn(rust_2018_idioms)]
2#![cfg(feature = "full")]
3#![cfg(unix)]
4
5use futures::future::poll_fn;
6use tokio::io::ReadBuf;
7use tokio::net::UnixDatagram;
8use tokio::try_join;
9
10use std::io;
11use std::sync::Arc;
12
13async fn echo_server(socket: UnixDatagram) -> io::Result<()> {
14 let mut recv_buf = vec![0u8; 1024];
15 loop {
16 let (len, peer_addr) = socket.recv_from(&mut recv_buf[..]).await?;
17 if let Some(path) = peer_addr.as_pathname() {
18 socket.send_to(&recv_buf[..len], path).await?;
19 }
20 }
21}
22
23#[tokio::test]
24async fn echo() -> io::Result<()> {
25 let dir = tempfile::tempdir().unwrap();
26 let server_path = dir.path().join("server.sock");
27 let client_path = dir.path().join("client.sock");
28
29 let server_socket = UnixDatagram::bind(server_path.clone())?;
30
31 tokio::spawn(async move {
32 let _ = echo_server(server_socket).await;
33 });
34
35 {
36 let socket = UnixDatagram::bind(&client_path).unwrap();
37 socket.connect(server_path)?;
38 socket.send(b"ECHO").await?;
39 let mut recv_buf = [0u8; 16];
40 let len = socket.recv(&mut recv_buf[..]).await?;
41 assert_eq!(&recv_buf[..len], b"ECHO");
42 }
43
44 Ok(())
45}
46
47#[tokio::test]
48async fn echo_from() -> io::Result<()> {
49 let dir = tempfile::tempdir().unwrap();
50 let server_path = dir.path().join("server.sock");
51 let client_path = dir.path().join("client.sock");
52
53 let server_socket = UnixDatagram::bind(server_path.clone())?;
54
55 tokio::spawn(async move {
56 let _ = echo_server(server_socket).await;
57 });
58
59 {
60 let socket = UnixDatagram::bind(&client_path).unwrap();
61 socket.connect(&server_path)?;
62 socket.send(b"ECHO").await?;
63 let mut recv_buf = [0u8; 16];
64 let (len, addr) = socket.recv_from(&mut recv_buf[..]).await?;
65 assert_eq!(&recv_buf[..len], b"ECHO");
66 assert_eq!(addr.as_pathname(), Some(server_path.as_path()));
67 }
68
69 Ok(())
70}
71
72// Even though we use sync non-blocking io we still need a reactor.
73#[tokio::test]
74async fn try_send_recv_never_block() -> io::Result<()> {
75 let mut recv_buf = [0u8; 16];
76 let payload = b"PAYLOAD";
77 let mut count = 0;
78
79 let (dgram1, dgram2) = UnixDatagram::pair()?;
80
81 // Send until we hit the OS `net.unix.max_dgram_qlen`.
82 loop {
83 dgram1.writable().await.unwrap();
84
85 match dgram1.try_send(payload) {
86 Err(err) => match (err.kind(), err.raw_os_error()) {
87 (io::ErrorKind::WouldBlock, _) => break,
88 (_, Some(libc::ENOBUFS)) => break,
89 _ => {
90 panic!("unexpected error {:?}", err);
91 }
92 },
93 Ok(len) => {
94 assert_eq!(len, payload.len());
95 }
96 }
97 count += 1;
98 }
99
100 // Read every dgram we sent.
101 while count > 0 {
102 dgram2.readable().await.unwrap();
103 let len = dgram2.try_recv(&mut recv_buf[..])?;
104 assert_eq!(len, payload.len());
105 assert_eq!(payload, &recv_buf[..len]);
106 count -= 1;
107 }
108
109 let err = dgram2.try_recv(&mut recv_buf[..]).unwrap_err();
110 match err.kind() {
111 io::ErrorKind::WouldBlock => (),
112 _ => unreachable!("unexpected error {:?}", err),
113 }
114
115 Ok(())
116}
117
118#[tokio::test]
119async fn split() -> std::io::Result<()> {
120 let dir = tempfile::tempdir().unwrap();
121 let path = dir.path().join("split.sock");
122 let s = Arc::new(UnixDatagram::bind(path.clone())?);
123 let r = s.clone();
124
125 let msg = b"hello";
126 let ((), ()) = try_join! {
127 async {
128 s.send_to(msg, path).await?;
129 io::Result::Ok(())
130 },
131 async {
132 let mut recv_buf = [0u8; 32];
133 let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
134 assert_eq!(&recv_buf[..len], msg);
135 Ok(())
136 },
137 }?;
138
139 Ok(())
140}
141
142#[tokio::test]
143async fn send_to_recv_from_poll() -> std::io::Result<()> {
144 let dir = tempfile::tempdir().unwrap();
145 let sender_path = dir.path().join("sender.sock");
146 let receiver_path = dir.path().join("receiver.sock");
147
148 let sender = UnixDatagram::bind(&sender_path)?;
149 let receiver = UnixDatagram::bind(&receiver_path)?;
150
151 let msg = b"hello";
152 poll_fn(|cx| sender.poll_send_to(cx, msg, &receiver_path)).await?;
153
154 let mut recv_buf = [0u8; 32];
155 let mut read = ReadBuf::new(&mut recv_buf);
156 let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
157
158 assert_eq!(read.filled(), msg);
159 assert_eq!(addr.as_pathname(), Some(sender_path.as_ref()));
160 Ok(())
161}
162
163#[tokio::test]
164async fn send_recv_poll() -> std::io::Result<()> {
165 let dir = tempfile::tempdir().unwrap();
166 let sender_path = dir.path().join("sender.sock");
167 let receiver_path = dir.path().join("receiver.sock");
168
169 let sender = UnixDatagram::bind(&sender_path)?;
170 let receiver = UnixDatagram::bind(&receiver_path)?;
171
172 sender.connect(&receiver_path)?;
173 receiver.connect(&sender_path)?;
174
175 let msg = b"hello";
176 poll_fn(|cx| sender.poll_send(cx, msg)).await?;
177
178 let mut recv_buf = [0u8; 32];
179 let mut read = ReadBuf::new(&mut recv_buf);
180 poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
181
182 assert_eq!(read.filled(), msg);
183 Ok(())
184}
185
186#[tokio::test]
187async fn try_send_to_recv_from() -> std::io::Result<()> {
188 let dir = tempfile::tempdir().unwrap();
189 let server_path = dir.path().join("server.sock");
190 let client_path = dir.path().join("client.sock");
191
192 // Create listener
193 let server = UnixDatagram::bind(&server_path)?;
194
195 // Create socket pair
196 let client = UnixDatagram::bind(&client_path)?;
197
198 for _ in 0..5 {
199 loop {
200 client.writable().await?;
201
202 match client.try_send_to(b"hello world", &server_path) {
203 Ok(n) => {
204 assert_eq!(n, 11);
205 break;
206 }
207 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
208 Err(e) => panic!("{:?}", e),
209 }
210 }
211
212 loop {
213 server.readable().await?;
214
215 let mut buf = [0; 512];
216
217 match server.try_recv_from(&mut buf) {
218 Ok((n, addr)) => {
219 assert_eq!(n, 11);
220 assert_eq!(addr.as_pathname(), Some(client_path.as_ref()));
221 assert_eq!(&buf[0..11], &b"hello world"[..]);
222 break;
223 }
224 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
225 Err(e) => panic!("{:?}", e),
226 }
227 }
228 }
229
230 Ok(())
231}
232
233#[tokio::test]
234async fn try_recv_buf_from() -> std::io::Result<()> {
235 let dir = tempfile::tempdir().unwrap();
236 let server_path = dir.path().join("server.sock");
237 let client_path = dir.path().join("client.sock");
238
239 // Create listener
240 let server = UnixDatagram::bind(&server_path)?;
241
242 // Create socket pair
243 let client = UnixDatagram::bind(&client_path)?;
244
245 for _ in 0..5 {
246 loop {
247 client.writable().await?;
248
249 match client.try_send_to(b"hello world", &server_path) {
250 Ok(n) => {
251 assert_eq!(n, 11);
252 break;
253 }
254 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
255 Err(e) => panic!("{:?}", e),
256 }
257 }
258
259 loop {
260 server.readable().await?;
261
262 let mut buf = Vec::with_capacity(512);
263
264 match server.try_recv_buf_from(&mut buf) {
265 Ok((n, addr)) => {
266 assert_eq!(n, 11);
267 assert_eq!(addr.as_pathname(), Some(client_path.as_ref()));
268 assert_eq!(&buf[0..11], &b"hello world"[..]);
269 break;
270 }
271 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
272 Err(e) => panic!("{:?}", e),
273 }
274 }
275 }
276
277 Ok(())
278}
279
280#[tokio::test]
281async fn recv_buf_from() -> std::io::Result<()> {
282 let tmp = tempfile::tempdir()?;
283
284 // Bind each socket to a filesystem path
285 let tx_path = tmp.path().join("tx");
286 let tx = UnixDatagram::bind(&tx_path)?;
287 let rx_path = tmp.path().join("rx");
288 let rx = UnixDatagram::bind(&rx_path)?;
289
290 let bytes = b"hello world";
291 tx.send_to(bytes, &rx_path).await?;
292
293 let mut buf = Vec::with_capacity(24);
294 let (size, addr) = rx.recv_buf_from(&mut buf).await?;
295
296 let dgram = &buf[..size];
297 assert_eq!(dgram, bytes);
298 assert_eq!(addr.as_pathname().unwrap(), &tx_path);
299 Ok(())
300}
301
302// Even though we use sync non-blocking io we still need a reactor.
303#[tokio::test]
304async fn try_recv_buf_never_block() -> io::Result<()> {
305 let payload = b"PAYLOAD";
306 let mut count = 0;
307
308 let (dgram1, dgram2) = UnixDatagram::pair()?;
309
310 // Send until we hit the OS `net.unix.max_dgram_qlen`.
311 loop {
312 dgram1.writable().await.unwrap();
313
314 match dgram1.try_send(payload) {
315 Err(err) => match (err.kind(), err.raw_os_error()) {
316 (io::ErrorKind::WouldBlock, _) => break,
317 (_, Some(libc::ENOBUFS)) => break,
318 _ => {
319 panic!("unexpected error {:?}", err);
320 }
321 },
322 Ok(len) => {
323 assert_eq!(len, payload.len());
324 }
325 }
326 count += 1;
327 }
328
329 // Read every dgram we sent.
330 while count > 0 {
331 let mut recv_buf = Vec::with_capacity(16);
332
333 dgram2.readable().await.unwrap();
334 let len = dgram2.try_recv_buf(&mut recv_buf)?;
335 assert_eq!(len, payload.len());
336 assert_eq!(payload, &recv_buf[..len]);
337 count -= 1;
338 }
339
340 let mut recv_buf = vec![0; 16];
341 let err = dgram2.try_recv_from(&mut recv_buf).unwrap_err();
342 match err.kind() {
343 io::ErrorKind::WouldBlock => (),
344 _ => unreachable!("unexpected error {:?}", err),
345 }
346
347 Ok(())
348}
349
350#[tokio::test]
351async fn recv_buf() -> std::io::Result<()> {
352 // Create the pair of sockets
353 let (sock1, sock2) = UnixDatagram::pair()?;
354
355 // Since the sockets are paired, the paired send/recv
356 // functions can be used
357 let bytes = b"hello world";
358 sock1.send(bytes).await?;
359
360 let mut buff = Vec::with_capacity(24);
361 let size = sock2.recv_buf(&mut buff).await?;
362
363 let dgram = &buff[..size];
364 assert_eq!(dgram, bytes);
365 Ok(())
366}
367
368#[tokio::test]
369async fn poll_ready() -> io::Result<()> {
370 let dir = tempfile::tempdir().unwrap();
371 let server_path = dir.path().join("server.sock");
372 let client_path = dir.path().join("client.sock");
373
374 // Create listener
375 let server = UnixDatagram::bind(&server_path)?;
376
377 // Create socket pair
378 let client = UnixDatagram::bind(&client_path)?;
379
380 for _ in 0..5 {
381 loop {
382 poll_fn(|cx| client.poll_send_ready(cx)).await?;
383
384 match client.try_send_to(b"hello world", &server_path) {
385 Ok(n) => {
386 assert_eq!(n, 11);
387 break;
388 }
389 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
390 Err(e) => panic!("{:?}", e),
391 }
392 }
393
394 loop {
395 poll_fn(|cx| server.poll_recv_ready(cx)).await?;
396
397 let mut buf = Vec::with_capacity(512);
398
399 match server.try_recv_buf_from(&mut buf) {
400 Ok((n, addr)) => {
401 assert_eq!(n, 11);
402 assert_eq!(addr.as_pathname(), Some(client_path.as_ref()));
403 assert_eq!(&buf[0..11], &b"hello world"[..]);
404 break;
405 }
406 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
407 Err(e) => panic!("{:?}", e),
408 }
409 }
410 }
411
412 Ok(())
413}
414