1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (feature = "full" )] |
3 | |
4 | use bytes::BytesMut; |
5 | use futures::ready; |
6 | use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; |
7 | use tokio_test::assert_ok; |
8 | |
9 | use std::pin::Pin; |
10 | use std::task::{Context, Poll}; |
11 | |
12 | #[tokio::test ] |
13 | async fn copy() { |
14 | struct Rd(bool); |
15 | |
16 | impl AsyncRead for Rd { |
17 | fn poll_read( |
18 | mut self: Pin<&mut Self>, |
19 | _cx: &mut Context<'_>, |
20 | buf: &mut ReadBuf<'_>, |
21 | ) -> Poll<io::Result<()>> { |
22 | if self.0 { |
23 | buf.put_slice(b"hello world" ); |
24 | self.0 = false; |
25 | Poll::Ready(Ok(())) |
26 | } else { |
27 | Poll::Ready(Ok(())) |
28 | } |
29 | } |
30 | } |
31 | |
32 | let mut rd = Rd(true); |
33 | let mut wr = Vec::new(); |
34 | |
35 | let n = assert_ok!(io::copy(&mut rd, &mut wr).await); |
36 | assert_eq!(n, 11); |
37 | assert_eq!(wr, b"hello world" ); |
38 | } |
39 | |
40 | #[tokio::test ] |
41 | async fn proxy() { |
42 | struct BufferedWd { |
43 | buf: BytesMut, |
44 | writer: io::DuplexStream, |
45 | } |
46 | |
47 | impl AsyncWrite for BufferedWd { |
48 | fn poll_write( |
49 | self: Pin<&mut Self>, |
50 | _cx: &mut Context<'_>, |
51 | buf: &[u8], |
52 | ) -> Poll<io::Result<usize>> { |
53 | self.get_mut().buf.extend_from_slice(buf); |
54 | Poll::Ready(Ok(buf.len())) |
55 | } |
56 | |
57 | fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
58 | let this = self.get_mut(); |
59 | |
60 | while !this.buf.is_empty() { |
61 | let n = ready!(Pin::new(&mut this.writer).poll_write(cx, &this.buf))?; |
62 | let _ = this.buf.split_to(n); |
63 | } |
64 | |
65 | Pin::new(&mut this.writer).poll_flush(cx) |
66 | } |
67 | |
68 | fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { |
69 | Pin::new(&mut self.writer).poll_shutdown(cx) |
70 | } |
71 | } |
72 | |
73 | let (rd, wd) = io::duplex(1024); |
74 | let mut rd = rd.take(1024); |
75 | let mut wd = BufferedWd { |
76 | buf: BytesMut::new(), |
77 | writer: wd, |
78 | }; |
79 | |
80 | // write start bytes |
81 | assert_ok!(wd.write_all(&[0x42; 512]).await); |
82 | assert_ok!(wd.flush().await); |
83 | |
84 | let n = assert_ok!(io::copy(&mut rd, &mut wd).await); |
85 | |
86 | assert_eq!(n, 1024); |
87 | } |
88 | |