1#![warn(rust_2018_idioms)]
2#![cfg(feature = "full")]
3
4use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt};
5
6#[tokio::test]
7async fn ping_pong() {
8 let (mut a, mut b) = duplex(32);
9
10 let mut buf = [0u8; 4];
11
12 a.write_all(b"ping").await.unwrap();
13 b.read_exact(&mut buf).await.unwrap();
14 assert_eq!(&buf, b"ping");
15
16 b.write_all(b"pong").await.unwrap();
17 a.read_exact(&mut buf).await.unwrap();
18 assert_eq!(&buf, b"pong");
19}
20
21#[tokio::test]
22async fn across_tasks() {
23 let (mut a, mut b) = duplex(32);
24
25 let t1 = tokio::spawn(async move {
26 a.write_all(b"ping").await.unwrap();
27 let mut buf = [0u8; 4];
28 a.read_exact(&mut buf).await.unwrap();
29 assert_eq!(&buf, b"pong");
30 });
31
32 let t2 = tokio::spawn(async move {
33 let mut buf = [0u8; 4];
34 b.read_exact(&mut buf).await.unwrap();
35 assert_eq!(&buf, b"ping");
36 b.write_all(b"pong").await.unwrap();
37 });
38
39 t1.await.unwrap();
40 t2.await.unwrap();
41}
42
43#[tokio::test]
44async fn disconnect() {
45 let (mut a, mut b) = duplex(32);
46
47 let t1 = tokio::spawn(async move {
48 a.write_all(b"ping").await.unwrap();
49 // and dropped
50 });
51
52 let t2 = tokio::spawn(async move {
53 let mut buf = [0u8; 32];
54 let n = b.read(&mut buf).await.unwrap();
55 assert_eq!(&buf[..n], b"ping");
56
57 let n = b.read(&mut buf).await.unwrap();
58 assert_eq!(n, 0);
59 });
60
61 t1.await.unwrap();
62 t2.await.unwrap();
63}
64
65#[tokio::test]
66async fn disconnect_reader() {
67 let (a, mut b) = duplex(2);
68
69 let t1 = tokio::spawn(async move {
70 // this will block, as not all data fits into duplex
71 b.write_all(b"ping").await.unwrap_err();
72 });
73
74 let t2 = tokio::spawn(async move {
75 // here we drop the reader side, and we expect the writer in the other
76 // task to exit with an error
77 drop(a);
78 });
79
80 t2.await.unwrap();
81 t1.await.unwrap();
82}
83
84#[tokio::test]
85async fn max_write_size() {
86 let (mut a, mut b) = duplex(32);
87
88 let t1 = tokio::spawn(async move {
89 let n = a.write(&[0u8; 64]).await.unwrap();
90 assert_eq!(n, 32);
91 let n = a.write(&[0u8; 64]).await.unwrap();
92 assert_eq!(n, 4);
93 });
94
95 let mut buf = [0u8; 4];
96 b.read_exact(&mut buf).await.unwrap();
97
98 t1.await.unwrap();
99
100 // drop b only after task t1 finishes writing
101 drop(b);
102}
103
104#[tokio::test]
105async fn duplex_is_cooperative() {
106 let (mut tx, mut rx) = tokio::io::duplex(1024 * 8);
107
108 tokio::select! {
109 biased;
110
111 _ = async {
112 loop {
113 let buf = [3u8; 4096];
114 tx.write_all(&buf).await.unwrap();
115 let mut buf = [0u8; 4096];
116 let _ = rx.read(&mut buf).await.unwrap();
117 }
118 } => {},
119 _ = tokio::task::yield_now() => {}
120 }
121}
122