| 1 | #![cfg (feature = "io-util" )] |
| 2 | #![cfg (not(target_os = "wasi" ))] // Wasi doesn't support threads |
| 3 | |
| 4 | use std::error::Error; |
| 5 | use std::io::{Cursor, Read, Result as IoResult, Write}; |
| 6 | use tokio::io::{AsyncRead, AsyncReadExt}; |
| 7 | use tokio_util::io::SyncIoBridge; |
| 8 | |
| 9 | async fn test_reader_len( |
| 10 | r: impl AsyncRead + Unpin + Send + 'static, |
| 11 | expected_len: usize, |
| 12 | ) -> IoResult<()> { |
| 13 | let mut r = SyncIoBridge::new(r); |
| 14 | let res = tokio::task::spawn_blocking(move || { |
| 15 | let mut buf = Vec::new(); |
| 16 | r.read_to_end(&mut buf)?; |
| 17 | Ok::<_, std::io::Error>(buf) |
| 18 | }) |
| 19 | .await?; |
| 20 | assert_eq!(res?.len(), expected_len); |
| 21 | Ok(()) |
| 22 | } |
| 23 | |
| 24 | #[tokio::test ] |
| 25 | async fn test_async_read_to_sync() -> Result<(), Box<dyn Error>> { |
| 26 | test_reader_len(tokio::io::empty(), 0).await?; |
| 27 | let buf = b"hello world" ; |
| 28 | test_reader_len(Cursor::new(buf), buf.len()).await?; |
| 29 | Ok(()) |
| 30 | } |
| 31 | |
| 32 | #[tokio::test ] |
| 33 | async fn test_async_write_to_sync() -> Result<(), Box<dyn Error>> { |
| 34 | let mut dest = Vec::new(); |
| 35 | let src = b"hello world" ; |
| 36 | let dest = tokio::task::spawn_blocking(move || -> Result<_, String> { |
| 37 | let mut w = SyncIoBridge::new(Cursor::new(&mut dest)); |
| 38 | std::io::copy(&mut Cursor::new(src), &mut w).map_err(|e| e.to_string())?; |
| 39 | Ok(dest) |
| 40 | }) |
| 41 | .await??; |
| 42 | assert_eq!(dest.as_slice(), src); |
| 43 | Ok(()) |
| 44 | } |
| 45 | |
| 46 | #[tokio::test ] |
| 47 | async fn test_into_inner() -> Result<(), Box<dyn Error>> { |
| 48 | let mut buf = Vec::new(); |
| 49 | SyncIoBridge::new(tokio::io::empty()) |
| 50 | .into_inner() |
| 51 | .read_to_end(&mut buf) |
| 52 | .await |
| 53 | .unwrap(); |
| 54 | assert_eq!(buf.len(), 0); |
| 55 | Ok(()) |
| 56 | } |
| 57 | |
| 58 | #[tokio::test ] |
| 59 | async fn test_shutdown() -> Result<(), Box<dyn Error>> { |
| 60 | let (s1, mut s2) = tokio::io::duplex(1024); |
| 61 | let (_rh, wh) = tokio::io::split(s1); |
| 62 | tokio::task::spawn_blocking(move || -> std::io::Result<_> { |
| 63 | let mut wh = SyncIoBridge::new(wh); |
| 64 | wh.write_all(b"hello" )?; |
| 65 | wh.shutdown()?; |
| 66 | assert!(wh.write_all(b" world" ).is_err()); |
| 67 | Ok(()) |
| 68 | }) |
| 69 | .await??; |
| 70 | let mut buf = vec![]; |
| 71 | s2.read_to_end(&mut buf).await?; |
| 72 | assert_eq!(buf, b"hello" ); |
| 73 | Ok(()) |
| 74 | } |
| 75 | |