| 1 | #[cfg (feature = "tokio-vsock" )] |
| 2 | use super::{Socket, Split}; |
| 3 | |
// Receiving side of a `vsock` stream wrapped in `async_io::Async`; only built when the
// `vsock` feature is enabled and the `tokio` feature is not.
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
#[async_trait::async_trait]
impl super::ReadHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
    /// Reads bytes from the stream into `buf`.
    ///
    /// On success the number of bytes read is returned; on unix it is paired with an
    /// empty vector as the second tuple element. Any I/O error is passed through
    /// unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
        let received = futures_util::AsyncReadExt::read(&mut self.as_ref(), buf).await?;

        #[cfg(unix)]
        let outcome = (received, vec![]);
        #[cfg(not(unix))]
        let outcome = received;

        Ok(outcome)
    }
}
| 20 | |
// Sending side of a `vsock` stream wrapped in `async_io::Async`; only built when the
// `vsock` feature is enabled and the `tokio` feature is not.
#[cfg(all(feature = "vsock", not(feature = "tokio")))]
#[async_trait::async_trait]
impl super::WriteHalf for std::sync::Arc<async_io::Async<vsock::VsockStream>> {
    /// Writes `buf` to the stream and returns the number of bytes written.
    ///
    /// # Errors
    ///
    /// On unix, a non-empty `fds` slice is rejected with
    /// [`std::io::ErrorKind::InvalidInput`] before anything is written. Errors from the
    /// write itself are returned as-is.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[std::os::fd::BorrowedFd<'_>],
    ) -> std::io::Result<usize> {
        #[cfg(unix)]
        if !fds.is_empty() {
            let kind = std::io::ErrorKind::InvalidInput;
            return Err(std::io::Error::new(
                kind,
                "fds cannot be sent with a vsock stream",
            ));
        }

        futures_util::AsyncWriteExt::write(&mut self.as_ref(), buf).await
    }

    /// Shuts down both directions of the underlying stream.
    ///
    /// The `shutdown` call is handed to `crate::Task::spawn_blocking` and its result
    /// is awaited.
    async fn close(&mut self) -> std::io::Result<()> {
        // The closure is `move`, so it gets its own handle to the shared stream.
        let handle = std::sync::Arc::clone(self);
        let shutdown_both = move || handle.get_ref().shutdown(std::net::Shutdown::Both);

        crate::Task::spawn_blocking(shutdown_both, "close socket").await
    }
}
| 51 | |
// `Socket` implementation for the tokio-based vsock stream.
#[cfg(feature = "tokio-vsock")]
impl Socket for tokio_vsock::VsockStream {
    type ReadHalf = tokio_vsock::ReadHalf;
    type WriteHalf = tokio_vsock::WriteHalf;

    /// Consumes the stream and returns its read and write halves wrapped in a [`Split`].
    fn split(self) -> Split<Self::ReadHalf, Self::WriteHalf> {
        // Calls the stream type's own `split`, which yields the `(read, write)` pair.
        let (reader, writer) = tokio_vsock::VsockStream::split(self);

        Split {
            read: reader,
            write: writer,
        }
    }
}
| 63 | |
// Receiving side of the tokio-based vsock stream.
#[cfg(feature = "tokio-vsock")]
#[async_trait::async_trait]
impl super::ReadHalf for tokio_vsock::ReadHalf {
    /// Reads bytes from the stream into `buf`.
    ///
    /// On success the length of the filled part of the buffer is returned; on unix it
    /// is paired with an empty vector as the second tuple element. Any I/O error is
    /// passed through unchanged.
    async fn recvmsg(&mut self, buf: &mut [u8]) -> super::RecvmsgResult {
        use tokio::io::{AsyncReadExt, ReadBuf};

        let mut target = ReadBuf::new(buf);
        self.read_buf(&mut target).await?;

        // `target` started out empty, so its filled length is what this call read.
        let received = target.filled().len();
        #[cfg(unix)]
        let received = (received, vec![]);

        Ok(received)
    }
}
| 80 | |
// Sending side of the tokio-based vsock stream.
#[cfg(feature = "tokio-vsock")]
#[async_trait::async_trait]
impl super::WriteHalf for tokio_vsock::WriteHalf {
    /// Writes `buf` to the stream and returns the number of bytes written.
    ///
    /// # Errors
    ///
    /// On unix, a non-empty `fds` slice is rejected with
    /// [`std::io::ErrorKind::InvalidInput`] before anything is written. Errors from the
    /// write itself are returned as-is.
    async fn sendmsg(
        &mut self,
        buf: &[u8],
        #[cfg(unix)] fds: &[std::os::fd::BorrowedFd<'_>],
    ) -> std::io::Result<usize> {
        use tokio::io::AsyncWriteExt;

        #[cfg(unix)]
        if !fds.is_empty() {
            let rejection = std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "fds cannot be sent with a vsock stream",
            );
            return Err(rejection);
        }

        self.write(buf).await
    }

    /// Shuts down the write half via `tokio::io::AsyncWriteExt::shutdown`.
    async fn close(&mut self) -> std::io::Result<()> {
        let outcome = tokio::io::AsyncWriteExt::shutdown(self).await;
        outcome
    }
}
| 107 | |