1 | use super::copy::CopyBuffer; |
2 | |
3 | use crate::io::{AsyncRead, AsyncWrite}; |
4 | |
5 | use std::future::poll_fn; |
6 | use std::io; |
7 | use std::pin::Pin; |
8 | use std::task::{ready, Context, Poll}; |
9 | |
10 | enum TransferState { |
11 | Running(CopyBuffer), |
12 | ShuttingDown(u64), |
13 | Done(u64), |
14 | } |
15 | |
16 | fn transfer_one_direction<A, B>( |
17 | cx: &mut Context<'_>, |
18 | state: &mut TransferState, |
19 | r: &mut A, |
20 | w: &mut B, |
21 | ) -> Poll<io::Result<u64>> |
22 | where |
23 | A: AsyncRead + AsyncWrite + Unpin + ?Sized, |
24 | B: AsyncRead + AsyncWrite + Unpin + ?Sized, |
25 | { |
26 | let mut r: Pin<&mut A> = Pin::new(pointer:r); |
27 | let mut w: Pin<&mut B> = Pin::new(pointer:w); |
28 | |
29 | loop { |
30 | match state { |
31 | TransferState::Running(buf: &mut CopyBuffer) => { |
32 | let count: u64 = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?; |
33 | *state = TransferState::ShuttingDown(count); |
34 | } |
35 | TransferState::ShuttingDown(count: &mut u64) => { |
36 | ready!(w.as_mut().poll_shutdown(cx))?; |
37 | |
38 | *state = TransferState::Done(*count); |
39 | } |
40 | TransferState::Done(count: &mut u64) => return Poll::Ready(Ok(*count)), |
41 | } |
42 | } |
43 | } |
44 | /// Copies data in both directions between `a` and `b`. |
45 | /// |
46 | /// This function returns a future that will read from both streams, |
47 | /// writing any data read to the opposing stream. |
48 | /// This happens in both directions concurrently. |
49 | /// |
50 | /// If an EOF is observed on one stream, [`shutdown()`] will be invoked on |
51 | /// the other, and reading from that stream will stop. Copying of data in |
52 | /// the other direction will continue. |
53 | /// |
54 | /// The future will complete successfully once both directions of communication has been shut down. |
55 | /// A direction is shut down when the reader reports EOF, |
56 | /// at which point [`shutdown()`] is called on the corresponding writer. When finished, |
57 | /// it will return a tuple of the number of bytes copied from a to b |
58 | /// and the number of bytes copied from b to a, in that order. |
59 | /// |
60 | /// It uses two 8 KB buffers for transferring bytes between `a` and `b` by default. |
61 | /// To set your own buffers sizes use [`copy_bidirectional_with_sizes()`]. |
62 | /// |
63 | /// [`shutdown()`]: crate::io::AsyncWriteExt::shutdown |
64 | /// |
65 | /// # Errors |
66 | /// |
67 | /// The future will immediately return an error if any IO operation on `a` |
68 | /// or `b` returns an error. Some data read from either stream may be lost (not |
69 | /// written to the other stream) in this case. |
70 | /// |
71 | /// # Return value |
72 | /// |
73 | /// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`. |
74 | #[cfg_attr (docsrs, doc(cfg(feature = "io-util" )))] |
75 | pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> io::Result<(u64, u64)> |
76 | where |
77 | A: AsyncRead + AsyncWrite + Unpin + ?Sized, |
78 | B: AsyncRead + AsyncWrite + Unpin + ?Sized, |
79 | { |
80 | copy_bidirectional_implimpl Future( |
81 | a, |
82 | b, |
83 | a_to_b_buffer:CopyBuffer::new(super::DEFAULT_BUF_SIZE), |
84 | b_to_a_buffer:CopyBuffer::new(buf_size:super::DEFAULT_BUF_SIZE), |
85 | ) |
86 | .await |
87 | } |
88 | |
89 | /// Copies data in both directions between `a` and `b` using buffers of the specified size. |
90 | /// |
91 | /// This method is the same as the [`copy_bidirectional()`], except that it allows you to set the |
92 | /// size of the internal buffers used when copying data. |
93 | #[cfg_attr (docsrs, doc(cfg(feature = "io-util" )))] |
94 | pub async fn copy_bidirectional_with_sizes<A, B>( |
95 | a: &mut A, |
96 | b: &mut B, |
97 | a_to_b_buf_size: usize, |
98 | b_to_a_buf_size: usize, |
99 | ) -> io::Result<(u64, u64)> |
100 | where |
101 | A: AsyncRead + AsyncWrite + Unpin + ?Sized, |
102 | B: AsyncRead + AsyncWrite + Unpin + ?Sized, |
103 | { |
104 | copy_bidirectional_implimpl Future( |
105 | a, |
106 | b, |
107 | a_to_b_buffer:CopyBuffer::new(a_to_b_buf_size), |
108 | b_to_a_buffer:CopyBuffer::new(b_to_a_buf_size), |
109 | ) |
110 | .await |
111 | } |
112 | |
113 | async fn copy_bidirectional_impl<A, B>( |
114 | a: &mut A, |
115 | b: &mut B, |
116 | a_to_b_buffer: CopyBuffer, |
117 | b_to_a_buffer: CopyBuffer, |
118 | ) -> io::Result<(u64, u64)> |
119 | where |
120 | A: AsyncRead + AsyncWrite + Unpin + ?Sized, |
121 | B: AsyncRead + AsyncWrite + Unpin + ?Sized, |
122 | { |
123 | let mut a_to_b: TransferState = TransferState::Running(a_to_b_buffer); |
124 | let mut b_to_a: TransferState = TransferState::Running(b_to_a_buffer); |
125 | poll_fnPollFn) -> …>(|cx: &mut Context<'_>| { |
126 | let a_to_b: Poll = transfer_one_direction(cx, &mut a_to_b, r:a, w:b)?; |
127 | let b_to_a: Poll = transfer_one_direction(cx, &mut b_to_a, r:b, w:a)?; |
128 | |
129 | // It is not a problem if ready! returns early because transfer_one_direction for the |
130 | // other direction will keep returning TransferState::Done(count) in future calls to poll |
131 | let a_to_b: u64 = ready!(a_to_b); |
132 | let b_to_a: u64 = ready!(b_to_a); |
133 | |
134 | Poll::Ready(Ok((a_to_b, b_to_a))) |
135 | }) |
136 | .await |
137 | } |
138 | |