1#![warn(rust_2018_idioms)]
2#![cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support bind
3
4use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
5use tokio::net::{TcpListener, TcpStream};
6use tokio::try_join;
7use tokio_test::task;
8use tokio_test::{assert_ok, assert_pending, assert_ready_ok};
9
10use std::io;
11use std::task::Poll;
12use std::time::Duration;
13
14use futures::future::poll_fn;
15
16#[tokio::test]
17async fn set_linger() {
18 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
19
20 let stream = TcpStream::connect(listener.local_addr().unwrap())
21 .await
22 .unwrap();
23
24 assert_ok!(stream.set_linger(Some(Duration::from_secs(1))));
25 assert_eq!(stream.linger().unwrap().unwrap().as_secs(), 1);
26
27 assert_ok!(stream.set_linger(None));
28 assert!(stream.linger().unwrap().is_none());
29}
30
31#[tokio::test]
32async fn try_read_write() {
33 const DATA: &[u8] = b"this is some data to write to the socket";
34
35 // Create listener
36 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
37
38 // Create socket pair
39 let client = TcpStream::connect(listener.local_addr().unwrap())
40 .await
41 .unwrap();
42 let (server, _) = listener.accept().await.unwrap();
43 let mut written = DATA.to_vec();
44
45 // Track the server receiving data
46 let mut readable = task::spawn(server.readable());
47 assert_pending!(readable.poll());
48
49 // Write data.
50 client.writable().await.unwrap();
51 assert_eq!(DATA.len(), client.try_write(DATA).unwrap());
52
53 // The task should be notified
54 while !readable.is_woken() {
55 tokio::task::yield_now().await;
56 }
57
58 // Fill the write buffer using non-vectored I/O
59 loop {
60 // Still ready
61 let mut writable = task::spawn(client.writable());
62 assert_ready_ok!(writable.poll());
63
64 match client.try_write(DATA) {
65 Ok(n) => written.extend(&DATA[..n]),
66 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
67 break;
68 }
69 Err(e) => panic!("error = {:?}", e),
70 }
71 }
72
73 {
74 // Write buffer full
75 let mut writable = task::spawn(client.writable());
76 assert_pending!(writable.poll());
77
78 // Drain the socket from the server end using non-vectored I/O
79 let mut read = vec![0; written.len()];
80 let mut i = 0;
81
82 while i < read.len() {
83 server.readable().await.unwrap();
84
85 match server.try_read(&mut read[i..]) {
86 Ok(n) => i += n,
87 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
88 Err(e) => panic!("error = {:?}", e),
89 }
90 }
91
92 assert_eq!(read, written);
93 }
94
95 written.clear();
96 client.writable().await.unwrap();
97
98 // Fill the write buffer using vectored I/O
99 let data_bufs: Vec<_> = DATA.chunks(10).map(io::IoSlice::new).collect();
100 loop {
101 // Still ready
102 let mut writable = task::spawn(client.writable());
103 assert_ready_ok!(writable.poll());
104
105 match client.try_write_vectored(&data_bufs) {
106 Ok(n) => written.extend(&DATA[..n]),
107 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
108 break;
109 }
110 Err(e) => panic!("error = {:?}", e),
111 }
112 }
113
114 {
115 // Write buffer full
116 let mut writable = task::spawn(client.writable());
117 assert_pending!(writable.poll());
118
119 // Drain the socket from the server end using vectored I/O
120 let mut read = vec![0; written.len()];
121 let mut i = 0;
122
123 while i < read.len() {
124 server.readable().await.unwrap();
125
126 let mut bufs: Vec<_> = read[i..]
127 .chunks_mut(0x10000)
128 .map(io::IoSliceMut::new)
129 .collect();
130 match server.try_read_vectored(&mut bufs) {
131 Ok(n) => i += n,
132 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
133 Err(e) => panic!("error = {:?}", e),
134 }
135 }
136
137 assert_eq!(read, written);
138 }
139
140 // Now, we listen for shutdown
141 drop(client);
142
143 loop {
144 let ready = server.ready(Interest::READABLE).await.unwrap();
145
146 if ready.is_read_closed() {
147 return;
148 } else {
149 tokio::task::yield_now().await;
150 }
151 }
152}
153
154#[test]
155fn buffer_not_included_in_future() {
156 use std::mem;
157
158 const N: usize = 4096;
159
160 let fut = async {
161 let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
162
163 loop {
164 stream.readable().await.unwrap();
165
166 let mut buf = [0; N];
167 let n = stream.try_read(&mut buf[..]).unwrap();
168
169 if n == 0 {
170 break;
171 }
172 }
173 };
174
175 let n = mem::size_of_val(&fut);
176 assert!(n < 1000);
177}
178
179macro_rules! assert_readable_by_polling {
180 ($stream:expr) => {
181 assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx)).await);
182 };
183}
184
185macro_rules! assert_not_readable_by_polling {
186 ($stream:expr) => {
187 poll_fn(|cx| {
188 assert_pending!($stream.poll_read_ready(cx));
189 Poll::Ready(())
190 })
191 .await;
192 };
193}
194
195macro_rules! assert_writable_by_polling {
196 ($stream:expr) => {
197 assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await);
198 };
199}
200
201macro_rules! assert_not_writable_by_polling {
202 ($stream:expr) => {
203 poll_fn(|cx| {
204 assert_pending!($stream.poll_write_ready(cx));
205 Poll::Ready(())
206 })
207 .await;
208 };
209}
210
211#[tokio::test]
212async fn poll_read_ready() {
213 let (mut client, mut server) = create_pair().await;
214
215 // Initial state - not readable.
216 assert_not_readable_by_polling!(server);
217
218 // There is data in the buffer - readable.
219 assert_ok!(client.write_all(b"ping").await);
220 assert_readable_by_polling!(server);
221
222 // Readable until calls to `poll_read` return `Poll::Pending`.
223 let mut buf = [0u8; 4];
224 assert_ok!(server.read_exact(&mut buf).await);
225 assert_readable_by_polling!(server);
226 read_until_pending(&mut server);
227 assert_not_readable_by_polling!(server);
228
229 // Detect the client disconnect.
230 drop(client);
231 assert_readable_by_polling!(server);
232}
233
234#[tokio::test]
235async fn poll_write_ready() {
236 let (mut client, server) = create_pair().await;
237
238 // Initial state - writable.
239 assert_writable_by_polling!(client);
240
241 // No space to write - not writable.
242 write_until_pending(&mut client);
243 assert_not_writable_by_polling!(client);
244
245 // Detect the server disconnect.
246 drop(server);
247 assert_writable_by_polling!(client);
248}
249
250async fn create_pair() -> (TcpStream, TcpStream) {
251 let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
252 let addr = assert_ok!(listener.local_addr());
253 let (client, (server, _)) = assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept()));
254 (client, server)
255}
256
257fn read_until_pending(stream: &mut TcpStream) -> usize {
258 let mut buf = vec![0u8; 1024 * 1024];
259 let mut total = 0;
260 loop {
261 match stream.try_read(&mut buf) {
262 Ok(n) => total += n,
263 Err(err) => {
264 assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
265 break;
266 }
267 }
268 }
269 total
270}
271
272fn write_until_pending(stream: &mut TcpStream) -> usize {
273 let buf = vec![0u8; 1024 * 1024];
274 let mut total = 0;
275 loop {
276 match stream.try_write(&buf) {
277 Ok(n) => total += n,
278 Err(err) => {
279 assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
280 break;
281 }
282 }
283 }
284 total
285}
286
287#[tokio::test]
288async fn try_read_buf() {
289 const DATA: &[u8] = b"this is some data to write to the socket";
290
291 // Create listener
292 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
293
294 // Create socket pair
295 let client = TcpStream::connect(listener.local_addr().unwrap())
296 .await
297 .unwrap();
298 let (server, _) = listener.accept().await.unwrap();
299 let mut written = DATA.to_vec();
300
301 // Track the server receiving data
302 let mut readable = task::spawn(server.readable());
303 assert_pending!(readable.poll());
304
305 // Write data.
306 client.writable().await.unwrap();
307 assert_eq!(DATA.len(), client.try_write(DATA).unwrap());
308
309 // The task should be notified
310 while !readable.is_woken() {
311 tokio::task::yield_now().await;
312 }
313
314 // Fill the write buffer
315 loop {
316 // Still ready
317 let mut writable = task::spawn(client.writable());
318 assert_ready_ok!(writable.poll());
319
320 match client.try_write(DATA) {
321 Ok(n) => written.extend(&DATA[..n]),
322 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
323 break;
324 }
325 Err(e) => panic!("error = {:?}", e),
326 }
327 }
328
329 {
330 // Write buffer full
331 let mut writable = task::spawn(client.writable());
332 assert_pending!(writable.poll());
333
334 // Drain the socket from the server end
335 let mut read = Vec::with_capacity(written.len());
336 let mut i = 0;
337
338 while i < read.capacity() {
339 server.readable().await.unwrap();
340
341 match server.try_read_buf(&mut read) {
342 Ok(n) => i += n,
343 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
344 Err(e) => panic!("error = {:?}", e),
345 }
346 }
347
348 assert_eq!(read, written);
349 }
350
351 // Now, we listen for shutdown
352 drop(client);
353
354 loop {
355 let ready = server.ready(Interest::READABLE).await.unwrap();
356
357 if ready.is_read_closed() {
358 return;
359 } else {
360 tokio::task::yield_now().await;
361 }
362 }
363}
364
365// read_closed is a best effort event, so test only for no false positives.
366#[tokio::test]
367async fn read_closed() {
368 let (client, mut server) = create_pair().await;
369
370 let mut ready_fut = task::spawn(client.ready(Interest::READABLE));
371 assert_pending!(ready_fut.poll());
372
373 assert_ok!(server.write_all(b"ping").await);
374
375 let ready_event = assert_ok!(ready_fut.await);
376
377 assert!(!ready_event.is_read_closed());
378}
379
380// write_closed is a best effort event, so test only for no false positives.
381#[tokio::test]
382async fn write_closed() {
383 let (mut client, mut server) = create_pair().await;
384
385 // Fill the write buffer.
386 let write_size = write_until_pending(&mut client);
387 let mut ready_fut = task::spawn(client.ready(Interest::WRITABLE));
388 assert_pending!(ready_fut.poll());
389
390 // Drain the socket to make client writable.
391 let mut read_size = 0;
392 while read_size < write_size {
393 server.readable().await.unwrap();
394 read_size += read_until_pending(&mut server);
395 }
396
397 let ready_event = assert_ok!(ready_fut.await);
398
399 assert!(!ready_event.is_write_closed());
400}
401