use super::copy::CopyBuffer;

use crate::io::{AsyncRead, AsyncWrite};

use std::future::poll_fn;
use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

| 10 | enum TransferState { |
| 11 | Running(CopyBuffer), |
| 12 | ShuttingDown(u64), |
| 13 | Done(u64), |
| 14 | } |
| 15 | |
| 16 | fn transfer_one_direction<A, B>( |
| 17 | cx: &mut Context<'_>, |
| 18 | state: &mut TransferState, |
| 19 | r: &mut A, |
| 20 | w: &mut B, |
| 21 | ) -> Poll<io::Result<u64>> |
| 22 | where |
| 23 | A: AsyncRead + AsyncWrite + Unpin + ?Sized, |
| 24 | B: AsyncRead + AsyncWrite + Unpin + ?Sized, |
| 25 | { |
| 26 | let mut r: Pin<&mut A> = Pin::new(pointer:r); |
| 27 | let mut w: Pin<&mut B> = Pin::new(pointer:w); |
| 28 | |
| 29 | loop { |
| 30 | match state { |
| 31 | TransferState::Running(buf: &mut CopyBuffer) => { |
| 32 | let count: u64 = ready!(buf.poll_copy(cx, r.as_mut(), w.as_mut()))?; |
| 33 | *state = TransferState::ShuttingDown(count); |
| 34 | } |
| 35 | TransferState::ShuttingDown(count: &mut u64) => { |
| 36 | ready!(w.as_mut().poll_shutdown(cx))?; |
| 37 | |
| 38 | *state = TransferState::Done(*count); |
| 39 | } |
| 40 | TransferState::Done(count: &mut u64) => return Poll::Ready(Ok(*count)), |
| 41 | } |
| 42 | } |
| 43 | } |
| 44 | /// Copies data in both directions between `a` and `b`. |
| 45 | /// |
| 46 | /// This function returns a future that will read from both streams, |
| 47 | /// writing any data read to the opposing stream. |
| 48 | /// This happens in both directions concurrently. |
| 49 | /// |
| 50 | /// If an EOF is observed on one stream, [`shutdown()`] will be invoked on |
| 51 | /// the other, and reading from that stream will stop. Copying of data in |
| 52 | /// the other direction will continue. |
| 53 | /// |
| 54 | /// The future will complete successfully once both directions of communication has been shut down. |
| 55 | /// A direction is shut down when the reader reports EOF, |
| 56 | /// at which point [`shutdown()`] is called on the corresponding writer. When finished, |
| 57 | /// it will return a tuple of the number of bytes copied from a to b |
| 58 | /// and the number of bytes copied from b to a, in that order. |
| 59 | /// |
| 60 | /// It uses two 8 KB buffers for transferring bytes between `a` and `b` by default. |
| 61 | /// To set your own buffers sizes use [`copy_bidirectional_with_sizes()`]. |
| 62 | /// |
| 63 | /// [`shutdown()`]: crate::io::AsyncWriteExt::shutdown |
| 64 | /// |
| 65 | /// # Errors |
| 66 | /// |
| 67 | /// The future will immediately return an error if any IO operation on `a` |
| 68 | /// or `b` returns an error. Some data read from either stream may be lost (not |
| 69 | /// written to the other stream) in this case. |
| 70 | /// |
| 71 | /// # Return value |
| 72 | /// |
| 73 | /// Returns a tuple of bytes copied `a` to `b` and bytes copied `b` to `a`. |
| 74 | #[cfg_attr (docsrs, doc(cfg(feature = "io-util" )))] |
| 75 | pub async fn copy_bidirectional<A, B>(a: &mut A, b: &mut B) -> io::Result<(u64, u64)> |
| 76 | where |
| 77 | A: AsyncRead + AsyncWrite + Unpin + ?Sized, |
| 78 | B: AsyncRead + AsyncWrite + Unpin + ?Sized, |
| 79 | { |
| 80 | copy_bidirectional_implimpl Future( |
| 81 | a, |
| 82 | b, |
| 83 | a_to_b_buffer:CopyBuffer::new(super::DEFAULT_BUF_SIZE), |
| 84 | b_to_a_buffer:CopyBuffer::new(buf_size:super::DEFAULT_BUF_SIZE), |
| 85 | ) |
| 86 | .await |
| 87 | } |
| 88 | |
| 89 | /// Copies data in both directions between `a` and `b` using buffers of the specified size. |
| 90 | /// |
| 91 | /// This method is the same as the [`copy_bidirectional()`], except that it allows you to set the |
| 92 | /// size of the internal buffers used when copying data. |
| 93 | #[cfg_attr (docsrs, doc(cfg(feature = "io-util" )))] |
| 94 | pub async fn copy_bidirectional_with_sizes<A, B>( |
| 95 | a: &mut A, |
| 96 | b: &mut B, |
| 97 | a_to_b_buf_size: usize, |
| 98 | b_to_a_buf_size: usize, |
| 99 | ) -> io::Result<(u64, u64)> |
| 100 | where |
| 101 | A: AsyncRead + AsyncWrite + Unpin + ?Sized, |
| 102 | B: AsyncRead + AsyncWrite + Unpin + ?Sized, |
| 103 | { |
| 104 | copy_bidirectional_implimpl Future( |
| 105 | a, |
| 106 | b, |
| 107 | a_to_b_buffer:CopyBuffer::new(a_to_b_buf_size), |
| 108 | b_to_a_buffer:CopyBuffer::new(b_to_a_buf_size), |
| 109 | ) |
| 110 | .await |
| 111 | } |
| 112 | |
| 113 | async fn copy_bidirectional_impl<A, B>( |
| 114 | a: &mut A, |
| 115 | b: &mut B, |
| 116 | a_to_b_buffer: CopyBuffer, |
| 117 | b_to_a_buffer: CopyBuffer, |
| 118 | ) -> io::Result<(u64, u64)> |
| 119 | where |
| 120 | A: AsyncRead + AsyncWrite + Unpin + ?Sized, |
| 121 | B: AsyncRead + AsyncWrite + Unpin + ?Sized, |
| 122 | { |
| 123 | let mut a_to_b: TransferState = TransferState::Running(a_to_b_buffer); |
| 124 | let mut b_to_a: TransferState = TransferState::Running(b_to_a_buffer); |
| 125 | poll_fnPollFn) -> …>(|cx: &mut Context<'_>| { |
| 126 | let a_to_b: Poll = transfer_one_direction(cx, &mut a_to_b, r:a, w:b)?; |
| 127 | let b_to_a: Poll = transfer_one_direction(cx, &mut b_to_a, r:b, w:a)?; |
| 128 | |
| 129 | // It is not a problem if ready! returns early because transfer_one_direction for the |
| 130 | // other direction will keep returning TransferState::Done(count) in future calls to poll |
| 131 | let a_to_b: u64 = ready!(a_to_b); |
| 132 | let b_to_a: u64 = ready!(b_to_a); |
| 133 | |
| 134 | Poll::Ready(Ok((a_to_b, b_to_a))) |
| 135 | }) |
| 136 | .await |
| 137 | } |
| 138 | |