1#![warn(rust_2018_idioms)]
2#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi does not support bind or UDP
3
4use futures::future::poll_fn;
5use std::io;
6use std::sync::Arc;
7use tokio::{io::ReadBuf, net::UdpSocket};
8use tokio_test::assert_ok;
9
10const MSG: &[u8] = b"hello";
11const MSG_LEN: usize = MSG.len();
12
13#[tokio::test]
14async fn send_recv() -> std::io::Result<()> {
15 let sender = UdpSocket::bind("127.0.0.1:0").await?;
16 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
17
18 sender.connect(receiver.local_addr()?).await?;
19 receiver.connect(sender.local_addr()?).await?;
20
21 sender.send(MSG).await?;
22
23 let mut recv_buf = [0u8; 32];
24 let len = receiver.recv(&mut recv_buf[..]).await?;
25
26 assert_eq!(&recv_buf[..len], MSG);
27 Ok(())
28}
29
30#[tokio::test]
31async fn send_recv_poll() -> std::io::Result<()> {
32 let sender = UdpSocket::bind("127.0.0.1:0").await?;
33 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
34
35 sender.connect(receiver.local_addr()?).await?;
36 receiver.connect(sender.local_addr()?).await?;
37
38 poll_fn(|cx| sender.poll_send(cx, MSG)).await?;
39
40 let mut recv_buf = [0u8; 32];
41 let mut read = ReadBuf::new(&mut recv_buf);
42 poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
43
44 assert_eq!(read.filled(), MSG);
45 Ok(())
46}
47
48#[tokio::test]
49async fn send_to_recv_from() -> std::io::Result<()> {
50 let sender = UdpSocket::bind("127.0.0.1:0").await?;
51 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
52
53 let receiver_addr = receiver.local_addr()?;
54 sender.send_to(MSG, &receiver_addr).await?;
55
56 let mut recv_buf = [0u8; 32];
57 let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?;
58
59 assert_eq!(&recv_buf[..len], MSG);
60 assert_eq!(addr, sender.local_addr()?);
61 Ok(())
62}
63
64#[tokio::test]
65async fn send_to_recv_from_poll() -> std::io::Result<()> {
66 let sender = UdpSocket::bind("127.0.0.1:0").await?;
67 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
68
69 let receiver_addr = receiver.local_addr()?;
70 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
71
72 let mut recv_buf = [0u8; 32];
73 let mut read = ReadBuf::new(&mut recv_buf);
74 let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
75
76 assert_eq!(read.filled(), MSG);
77 assert_eq!(addr, sender.local_addr()?);
78 Ok(())
79}
80
81#[tokio::test]
82async fn send_to_peek_from() -> std::io::Result<()> {
83 let sender = UdpSocket::bind("127.0.0.1:0").await?;
84 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
85
86 let receiver_addr = receiver.local_addr()?;
87 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
88
89 // peek
90 let mut recv_buf = [0u8; 32];
91 let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
92 assert_eq!(&recv_buf[..n], MSG);
93 assert_eq!(addr, sender.local_addr()?);
94
95 // peek
96 let mut recv_buf = [0u8; 32];
97 let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
98 assert_eq!(&recv_buf[..n], MSG);
99 assert_eq!(addr, sender.local_addr()?);
100
101 let mut recv_buf = [0u8; 32];
102 let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
103 assert_eq!(&recv_buf[..n], MSG);
104 assert_eq!(addr, sender.local_addr()?);
105
106 Ok(())
107}
108
109#[tokio::test]
110async fn send_to_try_peek_from() -> std::io::Result<()> {
111 let sender = UdpSocket::bind("127.0.0.1:0").await?;
112 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
113
114 let receiver_addr = receiver.local_addr()?;
115 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
116
117 // peek
118 let mut recv_buf = [0u8; 32];
119
120 loop {
121 match receiver.try_peek_from(&mut recv_buf) {
122 Ok((n, addr)) => {
123 assert_eq!(&recv_buf[..n], MSG);
124 assert_eq!(addr, sender.local_addr()?);
125 break;
126 }
127 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
128 receiver.readable().await?;
129 }
130 Err(e) => return Err(e),
131 }
132 }
133
134 // peek
135 let mut recv_buf = [0u8; 32];
136 let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
137 assert_eq!(&recv_buf[..n], MSG);
138 assert_eq!(addr, sender.local_addr()?);
139
140 let mut recv_buf = [0u8; 32];
141 let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
142 assert_eq!(&recv_buf[..n], MSG);
143 assert_eq!(addr, sender.local_addr()?);
144
145 Ok(())
146}
147
148#[tokio::test]
149async fn send_to_peek_from_poll() -> std::io::Result<()> {
150 let sender = UdpSocket::bind("127.0.0.1:0").await?;
151 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
152
153 let receiver_addr = receiver.local_addr()?;
154 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
155
156 let mut recv_buf = [0u8; 32];
157 let mut read = ReadBuf::new(&mut recv_buf);
158 let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
159
160 assert_eq!(read.filled(), MSG);
161 assert_eq!(addr, sender.local_addr()?);
162
163 let mut recv_buf = [0u8; 32];
164 let mut read = ReadBuf::new(&mut recv_buf);
165 poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
166
167 assert_eq!(read.filled(), MSG);
168 let mut recv_buf = [0u8; 32];
169 let mut read = ReadBuf::new(&mut recv_buf);
170
171 poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
172 assert_eq!(read.filled(), MSG);
173 Ok(())
174}
175
176#[tokio::test]
177async fn peek_sender() -> std::io::Result<()> {
178 let sender = UdpSocket::bind("127.0.0.1:0").await?;
179 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
180
181 let sender_addr = sender.local_addr()?;
182 let receiver_addr = receiver.local_addr()?;
183
184 let msg = b"Hello, world!";
185 sender.send_to(msg, receiver_addr).await?;
186
187 let peeked_sender = receiver.peek_sender().await?;
188 assert_eq!(peeked_sender, sender_addr);
189
190 // Assert that `peek_sender()` returns the right sender but
191 // doesn't remove from the receive queue.
192 let mut recv_buf = [0u8; 32];
193 let (read, received_sender) = receiver.recv_from(&mut recv_buf).await?;
194
195 assert_eq!(&recv_buf[..read], msg);
196 assert_eq!(received_sender, peeked_sender);
197
198 Ok(())
199}
200
201#[tokio::test]
202async fn poll_peek_sender() -> std::io::Result<()> {
203 let sender = UdpSocket::bind("127.0.0.1:0").await?;
204 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
205
206 let sender_addr = sender.local_addr()?;
207 let receiver_addr = receiver.local_addr()?;
208
209 let msg = b"Hello, world!";
210 poll_fn(|cx| sender.poll_send_to(cx, msg, receiver_addr)).await?;
211
212 let peeked_sender = poll_fn(|cx| receiver.poll_peek_sender(cx)).await?;
213 assert_eq!(peeked_sender, sender_addr);
214
215 // Assert that `poll_peek_sender()` returns the right sender but
216 // doesn't remove from the receive queue.
217 let mut recv_buf = [0u8; 32];
218 let mut read = ReadBuf::new(&mut recv_buf);
219 let received_sender = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
220
221 assert_eq!(read.filled(), msg);
222 assert_eq!(received_sender, peeked_sender);
223
224 Ok(())
225}
226
227#[tokio::test]
228async fn try_peek_sender() -> std::io::Result<()> {
229 let sender = UdpSocket::bind("127.0.0.1:0").await?;
230 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
231
232 let sender_addr = sender.local_addr()?;
233 let receiver_addr = receiver.local_addr()?;
234
235 let msg = b"Hello, world!";
236 sender.send_to(msg, receiver_addr).await?;
237
238 let peeked_sender = loop {
239 match receiver.try_peek_sender() {
240 Ok(peeked_sender) => break peeked_sender,
241 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
242 receiver.readable().await?;
243 }
244 Err(e) => return Err(e),
245 }
246 };
247
248 assert_eq!(peeked_sender, sender_addr);
249
250 // Assert that `try_peek_sender()` returns the right sender but
251 // didn't remove from the receive queue.
252 let mut recv_buf = [0u8; 32];
253 // We already peeked the sender so there must be data in the receive queue.
254 let (read, received_sender) = receiver.try_recv_from(&mut recv_buf).unwrap();
255
256 assert_eq!(&recv_buf[..read], msg);
257 assert_eq!(received_sender, peeked_sender);
258
259 Ok(())
260}
261
262#[tokio::test]
263async fn split() -> std::io::Result<()> {
264 let socket = UdpSocket::bind("127.0.0.1:0").await?;
265 let s = Arc::new(socket);
266 let r = s.clone();
267
268 let addr = s.local_addr()?;
269 tokio::spawn(async move {
270 s.send_to(MSG, &addr).await.unwrap();
271 });
272 let mut recv_buf = [0u8; 32];
273 let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
274 assert_eq!(&recv_buf[..len], MSG);
275 Ok(())
276}
277
278#[tokio::test]
279async fn split_chan() -> std::io::Result<()> {
280 // setup UdpSocket that will echo all sent items
281 let socket = UdpSocket::bind("127.0.0.1:0").await?;
282 let addr = socket.local_addr().unwrap();
283 let s = Arc::new(socket);
284 let r = s.clone();
285
286 let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
287 tokio::spawn(async move {
288 while let Some((bytes, addr)) = rx.recv().await {
289 s.send_to(&bytes, &addr).await.unwrap();
290 }
291 });
292
293 tokio::spawn(async move {
294 let mut buf = [0u8; 32];
295 loop {
296 let (len, addr) = r.recv_from(&mut buf).await.unwrap();
297 tx.send((buf[..len].to_vec(), addr)).await.unwrap();
298 }
299 });
300
301 // test that we can send a value and get back some response
302 let sender = UdpSocket::bind("127.0.0.1:0").await?;
303 sender.send_to(MSG, addr).await?;
304 let mut recv_buf = [0u8; 32];
305 let (len, _) = sender.recv_from(&mut recv_buf).await?;
306 assert_eq!(&recv_buf[..len], MSG);
307 Ok(())
308}
309
310#[tokio::test]
311async fn split_chan_poll() -> std::io::Result<()> {
312 // setup UdpSocket that will echo all sent items
313 let socket = UdpSocket::bind("127.0.0.1:0").await?;
314 let addr = socket.local_addr().unwrap();
315 let s = Arc::new(socket);
316 let r = s.clone();
317
318 let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
319 tokio::spawn(async move {
320 while let Some((bytes, addr)) = rx.recv().await {
321 poll_fn(|cx| s.poll_send_to(cx, &bytes, addr))
322 .await
323 .unwrap();
324 }
325 });
326
327 tokio::spawn(async move {
328 let mut recv_buf = [0u8; 32];
329 let mut read = ReadBuf::new(&mut recv_buf);
330 loop {
331 let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap();
332 tx.send((read.filled().to_vec(), addr)).await.unwrap();
333 }
334 });
335
336 // test that we can send a value and get back some response
337 let sender = UdpSocket::bind("127.0.0.1:0").await?;
338 poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?;
339
340 let mut recv_buf = [0u8; 32];
341 let mut read = ReadBuf::new(&mut recv_buf);
342 let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?;
343 assert_eq!(read.filled(), MSG);
344 Ok(())
345}
346
347// # Note
348//
349// This test is purposely written such that each time `sender` sends data on
350// the socket, `receiver` awaits the data. On Unix, it would be okay waiting
351// until the end of the test to receive all the data. On Windows, this would
// **not** be okay because its resources are completion-based (via IOCP).
353// If data is sent and not yet received, attempting to send more data will
354// result in `ErrorKind::WouldBlock` until the first operation completes.
355#[tokio::test]
356async fn try_send_spawn() {
357 const MSG2: &[u8] = b"world!";
358 const MSG2_LEN: usize = MSG2.len();
359
360 let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
361 let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
362
363 receiver
364 .connect(sender.local_addr().unwrap())
365 .await
366 .unwrap();
367
368 sender.writable().await.unwrap();
369
370 let sent = &sender
371 .try_send_to(MSG, receiver.local_addr().unwrap())
372 .unwrap();
373 assert_eq!(sent, &MSG_LEN);
374 let mut buf = [0u8; 32];
375 let mut received = receiver.recv(&mut buf[..]).await.unwrap();
376
377 sender
378 .connect(receiver.local_addr().unwrap())
379 .await
380 .unwrap();
381 let sent = &sender.try_send(MSG2).unwrap();
382 assert_eq!(sent, &MSG2_LEN);
383 received += receiver.recv(&mut buf[..]).await.unwrap();
384
385 std::thread::spawn(move || {
386 let sent = &sender.try_send(MSG).unwrap();
387 assert_eq!(sent, &MSG_LEN);
388 })
389 .join()
390 .unwrap();
391 received += receiver.recv(&mut buf[..]).await.unwrap();
392
393 assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
394}
395
396#[tokio::test]
397async fn try_send_recv() {
398 // Create listener
399 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
400
401 // Create socket pair
402 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
403
404 // Connect the two
405 client.connect(server.local_addr().unwrap()).await.unwrap();
406 server.connect(client.local_addr().unwrap()).await.unwrap();
407
408 for _ in 0..5 {
409 loop {
410 client.writable().await.unwrap();
411
412 match client.try_send(b"hello world") {
413 Ok(n) => {
414 assert_eq!(n, 11);
415 break;
416 }
417 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
418 Err(e) => panic!("{:?}", e),
419 }
420 }
421
422 loop {
423 server.readable().await.unwrap();
424
425 let mut buf = [0; 512];
426
427 match server.try_recv(&mut buf) {
428 Ok(n) => {
429 assert_eq!(n, 11);
430 assert_eq!(&buf[0..11], &b"hello world"[..]);
431 break;
432 }
433 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
434 Err(e) => panic!("{:?}", e),
435 }
436 }
437 }
438}
439
440#[tokio::test]
441async fn try_send_to_recv_from() {
442 // Create listener
443 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
444 let saddr = server.local_addr().unwrap();
445
446 // Create socket pair
447 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
448 let caddr = client.local_addr().unwrap();
449
450 for _ in 0..5 {
451 loop {
452 client.writable().await.unwrap();
453
454 match client.try_send_to(b"hello world", saddr) {
455 Ok(n) => {
456 assert_eq!(n, 11);
457 break;
458 }
459 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
460 Err(e) => panic!("{:?}", e),
461 }
462 }
463
464 loop {
465 server.readable().await.unwrap();
466
467 let mut buf = [0; 512];
468
469 match server.try_recv_from(&mut buf) {
470 Ok((n, addr)) => {
471 assert_eq!(n, 11);
472 assert_eq!(addr, caddr);
473 assert_eq!(&buf[0..11], &b"hello world"[..]);
474 break;
475 }
476 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
477 Err(e) => panic!("{:?}", e),
478 }
479 }
480 }
481}
482
483#[tokio::test]
484async fn try_recv_buf() {
485 // Create listener
486 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
487
488 // Create socket pair
489 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
490
491 // Connect the two
492 client.connect(server.local_addr().unwrap()).await.unwrap();
493 server.connect(client.local_addr().unwrap()).await.unwrap();
494
495 for _ in 0..5 {
496 loop {
497 client.writable().await.unwrap();
498
499 match client.try_send(b"hello world") {
500 Ok(n) => {
501 assert_eq!(n, 11);
502 break;
503 }
504 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
505 Err(e) => panic!("{:?}", e),
506 }
507 }
508
509 loop {
510 server.readable().await.unwrap();
511
512 let mut buf = Vec::with_capacity(512);
513
514 match server.try_recv_buf(&mut buf) {
515 Ok(n) => {
516 assert_eq!(n, 11);
517 assert_eq!(&buf[0..11], &b"hello world"[..]);
518 break;
519 }
520 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
521 Err(e) => panic!("{:?}", e),
522 }
523 }
524 }
525}
526
527#[tokio::test]
528async fn recv_buf() -> std::io::Result<()> {
529 let sender = UdpSocket::bind("127.0.0.1:0").await?;
530 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
531
532 sender.connect(receiver.local_addr()?).await?;
533 receiver.connect(sender.local_addr()?).await?;
534
535 sender.send(MSG).await?;
536 let mut recv_buf = Vec::with_capacity(32);
537 let len = receiver.recv_buf(&mut recv_buf).await?;
538
539 assert_eq!(len, MSG_LEN);
540 assert_eq!(&recv_buf[..len], MSG);
541 Ok(())
542}
543
544#[tokio::test]
545async fn try_recv_buf_from() {
546 // Create listener
547 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
548 let saddr = server.local_addr().unwrap();
549
550 // Create socket pair
551 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
552 let caddr = client.local_addr().unwrap();
553
554 for _ in 0..5 {
555 loop {
556 client.writable().await.unwrap();
557
558 match client.try_send_to(b"hello world", saddr) {
559 Ok(n) => {
560 assert_eq!(n, 11);
561 break;
562 }
563 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
564 Err(e) => panic!("{:?}", e),
565 }
566 }
567
568 loop {
569 server.readable().await.unwrap();
570
571 let mut buf = Vec::with_capacity(512);
572
573 match server.try_recv_buf_from(&mut buf) {
574 Ok((n, addr)) => {
575 assert_eq!(n, 11);
576 assert_eq!(addr, caddr);
577 assert_eq!(&buf[0..11], &b"hello world"[..]);
578 break;
579 }
580 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
581 Err(e) => panic!("{:?}", e),
582 }
583 }
584 }
585}
586
587#[tokio::test]
588async fn recv_buf_from() -> std::io::Result<()> {
589 let sender = UdpSocket::bind("127.0.0.1:0").await?;
590 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
591
592 sender.connect(receiver.local_addr()?).await?;
593
594 sender.send(MSG).await?;
595 let mut recv_buf = Vec::with_capacity(32);
596 let (len, caddr) = receiver.recv_buf_from(&mut recv_buf).await?;
597
598 assert_eq!(len, MSG_LEN);
599 assert_eq!(&recv_buf[..len], MSG);
600 assert_eq!(caddr, sender.local_addr()?);
601 Ok(())
602}
603
604#[tokio::test]
605async fn poll_ready() {
606 // Create listener
607 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
608 let saddr = server.local_addr().unwrap();
609
610 // Create socket pair
611 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
612 let caddr = client.local_addr().unwrap();
613
614 for _ in 0..5 {
615 loop {
616 assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await);
617
618 match client.try_send_to(b"hello world", saddr) {
619 Ok(n) => {
620 assert_eq!(n, 11);
621 break;
622 }
623 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
624 Err(e) => panic!("{:?}", e),
625 }
626 }
627
628 loop {
629 assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await);
630
631 let mut buf = Vec::with_capacity(512);
632
633 match server.try_recv_buf_from(&mut buf) {
634 Ok((n, addr)) => {
635 assert_eq!(n, 11);
636 assert_eq!(addr, caddr);
637 assert_eq!(&buf[0..11], &b"hello world"[..]);
638 break;
639 }
640 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
641 Err(e) => panic!("{:?}", e),
642 }
643 }
644 }
645}
646