1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (all(feature = "full" , not(target_os = "wasi" )))] // Wasi doesn't support bind |
3 | |
4 | use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest}; |
5 | use tokio::net::{TcpListener, TcpStream}; |
6 | use tokio::try_join; |
7 | use tokio_test::task; |
8 | use tokio_test::{assert_ok, assert_pending, assert_ready_ok}; |
9 | |
10 | use std::io; |
11 | use std::task::Poll; |
12 | use std::time::Duration; |
13 | |
14 | use futures::future::poll_fn; |
15 | |
16 | #[tokio::test ] |
17 | async fn set_linger() { |
18 | let listener = TcpListener::bind("127.0.0.1:0" ).await.unwrap(); |
19 | |
20 | let stream = TcpStream::connect(listener.local_addr().unwrap()) |
21 | .await |
22 | .unwrap(); |
23 | |
24 | assert_ok!(stream.set_linger(Some(Duration::from_secs(1)))); |
25 | assert_eq!(stream.linger().unwrap().unwrap().as_secs(), 1); |
26 | |
27 | assert_ok!(stream.set_linger(None)); |
28 | assert!(stream.linger().unwrap().is_none()); |
29 | } |
30 | |
31 | #[tokio::test ] |
32 | async fn try_read_write() { |
33 | const DATA: &[u8] = b"this is some data to write to the socket" ; |
34 | |
35 | // Create listener |
36 | let listener = TcpListener::bind("127.0.0.1:0" ).await.unwrap(); |
37 | |
38 | // Create socket pair |
39 | let client = TcpStream::connect(listener.local_addr().unwrap()) |
40 | .await |
41 | .unwrap(); |
42 | let (server, _) = listener.accept().await.unwrap(); |
43 | let mut written = DATA.to_vec(); |
44 | |
45 | // Track the server receiving data |
46 | let mut readable = task::spawn(server.readable()); |
47 | assert_pending!(readable.poll()); |
48 | |
49 | // Write data. |
50 | client.writable().await.unwrap(); |
51 | assert_eq!(DATA.len(), client.try_write(DATA).unwrap()); |
52 | |
53 | // The task should be notified |
54 | while !readable.is_woken() { |
55 | tokio::task::yield_now().await; |
56 | } |
57 | |
58 | // Fill the write buffer using non-vectored I/O |
59 | loop { |
60 | // Still ready |
61 | let mut writable = task::spawn(client.writable()); |
62 | assert_ready_ok!(writable.poll()); |
63 | |
64 | match client.try_write(DATA) { |
65 | Ok(n) => written.extend(&DATA[..n]), |
66 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
67 | break; |
68 | } |
69 | Err(e) => panic!("error = {:?}" , e), |
70 | } |
71 | } |
72 | |
73 | { |
74 | // Write buffer full |
75 | let mut writable = task::spawn(client.writable()); |
76 | assert_pending!(writable.poll()); |
77 | |
78 | // Drain the socket from the server end using non-vectored I/O |
79 | let mut read = vec![0; written.len()]; |
80 | let mut i = 0; |
81 | |
82 | while i < read.len() { |
83 | server.readable().await.unwrap(); |
84 | |
85 | match server.try_read(&mut read[i..]) { |
86 | Ok(n) => i += n, |
87 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
88 | Err(e) => panic!("error = {:?}" , e), |
89 | } |
90 | } |
91 | |
92 | assert_eq!(read, written); |
93 | } |
94 | |
95 | written.clear(); |
96 | client.writable().await.unwrap(); |
97 | |
98 | // Fill the write buffer using vectored I/O |
99 | let data_bufs: Vec<_> = DATA.chunks(10).map(io::IoSlice::new).collect(); |
100 | loop { |
101 | // Still ready |
102 | let mut writable = task::spawn(client.writable()); |
103 | assert_ready_ok!(writable.poll()); |
104 | |
105 | match client.try_write_vectored(&data_bufs) { |
106 | Ok(n) => written.extend(&DATA[..n]), |
107 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
108 | break; |
109 | } |
110 | Err(e) => panic!("error = {:?}" , e), |
111 | } |
112 | } |
113 | |
114 | { |
115 | // Write buffer full |
116 | let mut writable = task::spawn(client.writable()); |
117 | assert_pending!(writable.poll()); |
118 | |
119 | // Drain the socket from the server end using vectored I/O |
120 | let mut read = vec![0; written.len()]; |
121 | let mut i = 0; |
122 | |
123 | while i < read.len() { |
124 | server.readable().await.unwrap(); |
125 | |
126 | let mut bufs: Vec<_> = read[i..] |
127 | .chunks_mut(0x10000) |
128 | .map(io::IoSliceMut::new) |
129 | .collect(); |
130 | match server.try_read_vectored(&mut bufs) { |
131 | Ok(n) => i += n, |
132 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
133 | Err(e) => panic!("error = {:?}" , e), |
134 | } |
135 | } |
136 | |
137 | assert_eq!(read, written); |
138 | } |
139 | |
140 | // Now, we listen for shutdown |
141 | drop(client); |
142 | |
143 | loop { |
144 | let ready = server.ready(Interest::READABLE).await.unwrap(); |
145 | |
146 | if ready.is_read_closed() { |
147 | return; |
148 | } else { |
149 | tokio::task::yield_now().await; |
150 | } |
151 | } |
152 | } |
153 | |
// Size check only: the future below is never polled. It asserts that the
// `N`-byte read buffer, which is created after the await inside the loop,
// does not end up stored in the future itself.
#[test]
fn buffer_not_included_in_future() {
    const N: usize = 4096;

    let fut = async {
        let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();

        loop {
            stream.readable().await.unwrap();

            // Declared after the await on purpose: the buffer is not held across it.
            let mut buf = [0u8; N];
            if stream.try_read(&mut buf[..]).unwrap() == 0 {
                break;
            }
        }
    };

    let future_size = std::mem::size_of_val(&fut);
    assert!(future_size < 1000);
}
178 | |
// Awaits `poll_read_ready` on the given stream and asserts that it resolves to `Ok`.
// Must be invoked from an async context.
macro_rules! assert_readable_by_polling {
    ($io:expr) => {
        let read_ready = poll_fn(|ctx| $io.poll_read_ready(ctx)).await;
        assert_ok!(read_ready);
    };
}
184 | |
// Polls `poll_read_ready` on the given stream exactly once and asserts that the
// result is `Poll::Pending`. The wrapping future completes immediately, so the
// caller never actually suspends. Must be invoked from an async context.
macro_rules! assert_not_readable_by_polling {
    ($io:expr) => {
        poll_fn(|ctx| {
            let read_ready = $io.poll_read_ready(ctx);
            assert_pending!(read_ready);
            Poll::Ready(())
        })
        .await;
    };
}
194 | |
// Awaits `poll_write_ready` on the given stream and asserts that it resolves to `Ok`.
// Must be invoked from an async context.
macro_rules! assert_writable_by_polling {
    ($io:expr) => {
        let write_ready = poll_fn(|ctx| $io.poll_write_ready(ctx)).await;
        assert_ok!(write_ready);
    };
}
200 | |
// Polls `poll_write_ready` on the given stream exactly once and asserts that the
// result is `Poll::Pending`. The wrapping future completes immediately, so the
// caller never actually suspends. Must be invoked from an async context.
macro_rules! assert_not_writable_by_polling {
    ($io:expr) => {
        poll_fn(|ctx| {
            let write_ready = $io.poll_write_ready(ctx);
            assert_pending!(write_ready);
            Poll::Ready(())
        })
        .await;
    };
}
210 | |
211 | #[tokio::test ] |
212 | async fn poll_read_ready() { |
213 | let (mut client, mut server) = create_pair().await; |
214 | |
215 | // Initial state - not readable. |
216 | assert_not_readable_by_polling!(server); |
217 | |
218 | // There is data in the buffer - readable. |
219 | assert_ok!(client.write_all(b"ping" ).await); |
220 | assert_readable_by_polling!(server); |
221 | |
222 | // Readable until calls to `poll_read` return `Poll::Pending`. |
223 | let mut buf = [0u8; 4]; |
224 | assert_ok!(server.read_exact(&mut buf).await); |
225 | assert_readable_by_polling!(server); |
226 | read_until_pending(&mut server); |
227 | assert_not_readable_by_polling!(server); |
228 | |
229 | // Detect the client disconnect. |
230 | drop(client); |
231 | assert_readable_by_polling!(server); |
232 | } |
233 | |
234 | #[tokio::test ] |
235 | async fn poll_write_ready() { |
236 | let (mut client, server) = create_pair().await; |
237 | |
238 | // Initial state - writable. |
239 | assert_writable_by_polling!(client); |
240 | |
241 | // No space to write - not writable. |
242 | write_until_pending(&mut client); |
243 | assert_not_writable_by_polling!(client); |
244 | |
245 | // Detect the server disconnect. |
246 | drop(server); |
247 | assert_writable_by_polling!(client); |
248 | } |
249 | |
250 | async fn create_pair() -> (TcpStream, TcpStream) { |
251 | let listener = assert_ok!(TcpListener::bind("127.0.0.1:0" ).await); |
252 | let addr = assert_ok!(listener.local_addr()); |
253 | let (client, (server, _)) = assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept())); |
254 | (client, server) |
255 | } |
256 | |
257 | fn read_until_pending(stream: &mut TcpStream) -> usize { |
258 | let mut buf = vec![0u8; 1024 * 1024]; |
259 | let mut total = 0; |
260 | loop { |
261 | match stream.try_read(&mut buf) { |
262 | Ok(n) => total += n, |
263 | Err(err) => { |
264 | assert_eq!(err.kind(), io::ErrorKind::WouldBlock); |
265 | break; |
266 | } |
267 | } |
268 | } |
269 | total |
270 | } |
271 | |
272 | fn write_until_pending(stream: &mut TcpStream) -> usize { |
273 | let buf = vec![0u8; 1024 * 1024]; |
274 | let mut total = 0; |
275 | loop { |
276 | match stream.try_write(&buf) { |
277 | Ok(n) => total += n, |
278 | Err(err) => { |
279 | assert_eq!(err.kind(), io::ErrorKind::WouldBlock); |
280 | break; |
281 | } |
282 | } |
283 | } |
284 | total |
285 | } |
286 | |
287 | #[tokio::test ] |
288 | async fn try_read_buf() { |
289 | const DATA: &[u8] = b"this is some data to write to the socket" ; |
290 | |
291 | // Create listener |
292 | let listener = TcpListener::bind("127.0.0.1:0" ).await.unwrap(); |
293 | |
294 | // Create socket pair |
295 | let client = TcpStream::connect(listener.local_addr().unwrap()) |
296 | .await |
297 | .unwrap(); |
298 | let (server, _) = listener.accept().await.unwrap(); |
299 | let mut written = DATA.to_vec(); |
300 | |
301 | // Track the server receiving data |
302 | let mut readable = task::spawn(server.readable()); |
303 | assert_pending!(readable.poll()); |
304 | |
305 | // Write data. |
306 | client.writable().await.unwrap(); |
307 | assert_eq!(DATA.len(), client.try_write(DATA).unwrap()); |
308 | |
309 | // The task should be notified |
310 | while !readable.is_woken() { |
311 | tokio::task::yield_now().await; |
312 | } |
313 | |
314 | // Fill the write buffer |
315 | loop { |
316 | // Still ready |
317 | let mut writable = task::spawn(client.writable()); |
318 | assert_ready_ok!(writable.poll()); |
319 | |
320 | match client.try_write(DATA) { |
321 | Ok(n) => written.extend(&DATA[..n]), |
322 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { |
323 | break; |
324 | } |
325 | Err(e) => panic!("error = {:?}" , e), |
326 | } |
327 | } |
328 | |
329 | { |
330 | // Write buffer full |
331 | let mut writable = task::spawn(client.writable()); |
332 | assert_pending!(writable.poll()); |
333 | |
334 | // Drain the socket from the server end |
335 | let mut read = Vec::with_capacity(written.len()); |
336 | let mut i = 0; |
337 | |
338 | while i < read.capacity() { |
339 | server.readable().await.unwrap(); |
340 | |
341 | match server.try_read_buf(&mut read) { |
342 | Ok(n) => i += n, |
343 | Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, |
344 | Err(e) => panic!("error = {:?}" , e), |
345 | } |
346 | } |
347 | |
348 | assert_eq!(read, written); |
349 | } |
350 | |
351 | // Now, we listen for shutdown |
352 | drop(client); |
353 | |
354 | loop { |
355 | let ready = server.ready(Interest::READABLE).await.unwrap(); |
356 | |
357 | if ready.is_read_closed() { |
358 | return; |
359 | } else { |
360 | tokio::task::yield_now().await; |
361 | } |
362 | } |
363 | } |
364 | |
365 | // read_closed is a best effort event, so test only for no false positives. |
366 | #[tokio::test ] |
367 | async fn read_closed() { |
368 | let (client, mut server) = create_pair().await; |
369 | |
370 | let mut ready_fut = task::spawn(client.ready(Interest::READABLE)); |
371 | assert_pending!(ready_fut.poll()); |
372 | |
373 | assert_ok!(server.write_all(b"ping" ).await); |
374 | |
375 | let ready_event = assert_ok!(ready_fut.await); |
376 | |
377 | assert!(!ready_event.is_read_closed()); |
378 | } |
379 | |
380 | // write_closed is a best effort event, so test only for no false positives. |
381 | #[tokio::test ] |
382 | async fn write_closed() { |
383 | let (mut client, mut server) = create_pair().await; |
384 | |
385 | // Fill the write buffer. |
386 | let write_size = write_until_pending(&mut client); |
387 | let mut ready_fut = task::spawn(client.ready(Interest::WRITABLE)); |
388 | assert_pending!(ready_fut.poll()); |
389 | |
390 | // Drain the socket to make client writable. |
391 | let mut read_size = 0; |
392 | while read_size < write_size { |
393 | server.readable().await.unwrap(); |
394 | read_size += read_until_pending(&mut server); |
395 | } |
396 | |
397 | let ready_event = assert_ok!(ready_fut.await); |
398 | |
399 | assert!(!ready_event.is_write_closed()); |
400 | } |
401 | |