1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (feature = "full" )] |
3 | |
4 | use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt}; |
5 | |
6 | #[tokio::test ] |
7 | async fn ping_pong() { |
8 | let (mut a, mut b) = duplex(32); |
9 | |
10 | let mut buf = [0u8; 4]; |
11 | |
12 | a.write_all(b"ping" ).await.unwrap(); |
13 | b.read_exact(&mut buf).await.unwrap(); |
14 | assert_eq!(&buf, b"ping" ); |
15 | |
16 | b.write_all(b"pong" ).await.unwrap(); |
17 | a.read_exact(&mut buf).await.unwrap(); |
18 | assert_eq!(&buf, b"pong" ); |
19 | } |
20 | |
21 | #[tokio::test ] |
22 | async fn across_tasks() { |
23 | let (mut a, mut b) = duplex(32); |
24 | |
25 | let t1 = tokio::spawn(async move { |
26 | a.write_all(b"ping" ).await.unwrap(); |
27 | let mut buf = [0u8; 4]; |
28 | a.read_exact(&mut buf).await.unwrap(); |
29 | assert_eq!(&buf, b"pong" ); |
30 | }); |
31 | |
32 | let t2 = tokio::spawn(async move { |
33 | let mut buf = [0u8; 4]; |
34 | b.read_exact(&mut buf).await.unwrap(); |
35 | assert_eq!(&buf, b"ping" ); |
36 | b.write_all(b"pong" ).await.unwrap(); |
37 | }); |
38 | |
39 | t1.await.unwrap(); |
40 | t2.await.unwrap(); |
41 | } |
42 | |
43 | #[tokio::test ] |
44 | async fn disconnect() { |
45 | let (mut a, mut b) = duplex(32); |
46 | |
47 | let t1 = tokio::spawn(async move { |
48 | a.write_all(b"ping" ).await.unwrap(); |
49 | // and dropped |
50 | }); |
51 | |
52 | let t2 = tokio::spawn(async move { |
53 | let mut buf = [0u8; 32]; |
54 | let n = b.read(&mut buf).await.unwrap(); |
55 | assert_eq!(&buf[..n], b"ping" ); |
56 | |
57 | let n = b.read(&mut buf).await.unwrap(); |
58 | assert_eq!(n, 0); |
59 | }); |
60 | |
61 | t1.await.unwrap(); |
62 | t2.await.unwrap(); |
63 | } |
64 | |
65 | #[tokio::test ] |
66 | async fn disconnect_reader() { |
67 | let (a, mut b) = duplex(2); |
68 | |
69 | let t1 = tokio::spawn(async move { |
70 | // this will block, as not all data fits into duplex |
71 | b.write_all(b"ping" ).await.unwrap_err(); |
72 | }); |
73 | |
74 | let t2 = tokio::spawn(async move { |
75 | // here we drop the reader side, and we expect the writer in the other |
76 | // task to exit with an error |
77 | drop(a); |
78 | }); |
79 | |
80 | t2.await.unwrap(); |
81 | t1.await.unwrap(); |
82 | } |
83 | |
84 | #[tokio::test ] |
85 | async fn max_write_size() { |
86 | let (mut a, mut b) = duplex(32); |
87 | |
88 | let t1 = tokio::spawn(async move { |
89 | let n = a.write(&[0u8; 64]).await.unwrap(); |
90 | assert_eq!(n, 32); |
91 | let n = a.write(&[0u8; 64]).await.unwrap(); |
92 | assert_eq!(n, 4); |
93 | }); |
94 | |
95 | let mut buf = [0u8; 4]; |
96 | b.read_exact(&mut buf).await.unwrap(); |
97 | |
98 | t1.await.unwrap(); |
99 | |
100 | // drop b only after task t1 finishes writing |
101 | drop(b); |
102 | } |
103 | |
104 | #[tokio::test ] |
105 | async fn duplex_is_cooperative() { |
106 | let (mut tx, mut rx) = tokio::io::duplex(1024 * 8); |
107 | |
108 | tokio::select! { |
109 | biased; |
110 | |
111 | _ = async { |
112 | loop { |
113 | let buf = [3u8; 4096]; |
114 | tx.write_all(&buf).await.unwrap(); |
115 | let mut buf = [0u8; 4096]; |
116 | let _ = rx.read(&mut buf).await.unwrap(); |
117 | } |
118 | } => {}, |
119 | _ = tokio::task::yield_now() => {} |
120 | } |
121 | } |
122 | |