1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (all(feature = "full" , not(target_os = "wasi" )))] // Wasi doesn't support bind |
3 | |
4 | use std::io::{Error, ErrorKind, Result}; |
5 | use std::io::{Read, Write}; |
6 | use std::{net, thread}; |
7 | |
8 | use tokio::io::{AsyncReadExt, AsyncWriteExt}; |
9 | use tokio::net::{TcpListener, TcpStream}; |
10 | use tokio::try_join; |
11 | |
12 | #[tokio::test ] |
13 | async fn split() -> Result<()> { |
14 | const MSG: &[u8] = b"split" ; |
15 | |
16 | let listener = TcpListener::bind("127.0.0.1:0" ).await?; |
17 | let addr = listener.local_addr()?; |
18 | |
19 | let (stream1, (mut stream2, _)) = try_join! { |
20 | TcpStream::connect(&addr), |
21 | listener.accept(), |
22 | }?; |
23 | let (mut read_half, mut write_half) = stream1.into_split(); |
24 | |
25 | let ((), (), ()) = try_join! { |
26 | async { |
27 | let len = stream2.write(MSG).await?; |
28 | assert_eq!(len, MSG.len()); |
29 | |
30 | let mut read_buf = vec![0u8; 32]; |
31 | let read_len = stream2.read(&mut read_buf).await?; |
32 | assert_eq!(&read_buf[..read_len], MSG); |
33 | Result::Ok(()) |
34 | }, |
35 | async { |
36 | let len = write_half.write(MSG).await?; |
37 | assert_eq!(len, MSG.len()); |
38 | Ok(()) |
39 | }, |
40 | async { |
41 | let mut read_buf = vec![0u8; 32]; |
42 | let peek_len1 = read_half.peek(&mut read_buf[..]).await?; |
43 | let peek_len2 = read_half.peek(&mut read_buf[..]).await?; |
44 | assert_eq!(peek_len1, peek_len2); |
45 | |
46 | let read_len = read_half.read(&mut read_buf[..]).await?; |
47 | assert_eq!(peek_len1, read_len); |
48 | assert_eq!(&read_buf[..read_len], MSG); |
49 | Ok(()) |
50 | }, |
51 | }?; |
52 | |
53 | Ok(()) |
54 | } |
55 | |
56 | #[tokio::test ] |
57 | async fn reunite() -> Result<()> { |
58 | let listener = net::TcpListener::bind("127.0.0.1:0" )?; |
59 | let addr = listener.local_addr()?; |
60 | |
61 | let handle = thread::spawn(move || { |
62 | drop(listener.accept().unwrap()); |
63 | drop(listener.accept().unwrap()); |
64 | }); |
65 | |
66 | let stream1 = TcpStream::connect(&addr).await?; |
67 | let (read1, write1) = stream1.into_split(); |
68 | |
69 | let stream2 = TcpStream::connect(&addr).await?; |
70 | let (_, write2) = stream2.into_split(); |
71 | |
72 | let read1 = match read1.reunite(write2) { |
73 | Ok(_) => panic!("Reunite should not succeed" ), |
74 | Err(err) => err.0, |
75 | }; |
76 | |
77 | read1.reunite(write1).expect("Reunite should succeed" ); |
78 | |
79 | handle.join().unwrap(); |
80 | Ok(()) |
81 | } |
82 | |
83 | /// Test that dropping the write half actually closes the stream. |
84 | #[tokio::test ] |
85 | async fn drop_write() -> Result<()> { |
86 | const MSG: &[u8] = b"split" ; |
87 | |
88 | let listener = net::TcpListener::bind("127.0.0.1:0" )?; |
89 | let addr = listener.local_addr()?; |
90 | |
91 | let handle = thread::spawn(move || { |
92 | let (mut stream, _) = listener.accept().unwrap(); |
93 | stream.write_all(MSG).unwrap(); |
94 | |
95 | let mut read_buf = [0u8; 32]; |
96 | let res = match stream.read(&mut read_buf) { |
97 | Ok(0) => Ok(()), |
98 | Ok(len) => Err(Error::new( |
99 | ErrorKind::Other, |
100 | format!("Unexpected read: {} bytes." , len), |
101 | )), |
102 | Err(err) => Err(err), |
103 | }; |
104 | |
105 | drop(stream); |
106 | |
107 | res |
108 | }); |
109 | |
110 | let stream = TcpStream::connect(&addr).await?; |
111 | let (mut read_half, write_half) = stream.into_split(); |
112 | |
113 | let mut read_buf = [0u8; 32]; |
114 | let read_len = read_half.read(&mut read_buf[..]).await?; |
115 | assert_eq!(&read_buf[..read_len], MSG); |
116 | |
117 | // drop it while the read is in progress |
118 | std::thread::spawn(move || { |
119 | thread::sleep(std::time::Duration::from_millis(10)); |
120 | drop(write_half); |
121 | }); |
122 | |
123 | match read_half.read(&mut read_buf[..]).await { |
124 | Ok(0) => {} |
125 | Ok(len) => panic!("Unexpected read: {} bytes." , len), |
126 | Err(err) => panic!("Unexpected error: {}." , err), |
127 | } |
128 | |
129 | handle.join().unwrap().unwrap(); |
130 | Ok(()) |
131 | } |
132 | |