| 1 | #![warn (rust_2018_idioms)] |
| 2 | |
| 3 | use bytes::Bytes; |
| 4 | use futures_util::SinkExt; |
| 5 | use std::io::{self, Error, ErrorKind}; |
| 6 | use tokio::io::AsyncWriteExt; |
| 7 | use tokio_util::codec::{Encoder, FramedWrite}; |
| 8 | use tokio_util::io::{CopyToBytes, SinkWriter}; |
| 9 | use tokio_util::sync::PollSender; |
| 10 | |
| 11 | #[tokio::test ] |
| 12 | async fn test_copied_sink_writer() -> Result<(), Error> { |
| 13 | // Construct a channel pair to send data across and wrap a pollable sink. |
| 14 | // Note that the sink must mimic a writable object, e.g. have `std::io::Error` |
| 15 | // as its error type. |
| 16 | // As `PollSender` requires an owned copy of the buffer, we wrap it additionally |
| 17 | // with a `CopyToBytes` helper. |
| 18 | let (tx, mut rx) = tokio::sync::mpsc::channel::<Bytes>(1); |
| 19 | let mut writer = SinkWriter::new(CopyToBytes::new( |
| 20 | PollSender::new(tx).sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe)), |
| 21 | )); |
| 22 | |
| 23 | // Write data to our interface... |
| 24 | let data: [u8; 4] = [1, 2, 3, 4]; |
| 25 | let _ = writer.write(&data).await; |
| 26 | |
| 27 | // ... and receive it. |
| 28 | assert_eq!(data.to_vec(), rx.recv().await.unwrap().to_vec()); |
| 29 | |
| 30 | Ok(()) |
| 31 | } |
| 32 | |
/// A minimal, stateless encoder used by the tests in this file.
struct SliceEncoder;

impl SliceEncoder {
    /// Creates a new encoder; the type carries no state.
    fn new() -> SliceEncoder {
        SliceEncoder
    }
}
| 41 | |
| 42 | impl<'a> Encoder<&'a [u8]> for SliceEncoder { |
| 43 | type Error = Error; |
| 44 | |
| 45 | fn encode(&mut self, item: &'a [u8], dst: &mut bytes::BytesMut) -> Result<(), Self::Error> { |
| 46 | // This is where we'd write packet headers, lengths, etc. in a real encoder. |
| 47 | // For simplicity and demonstration purposes, we just pack a copy of |
| 48 | // the slice at the end of a buffer. |
| 49 | dst.extend_from_slice(item); |
| 50 | Ok(()) |
| 51 | } |
| 52 | } |
| 53 | |
| 54 | #[tokio::test ] |
| 55 | async fn test_direct_sink_writer() -> Result<(), Error> { |
| 56 | // We define a framed writer which accepts byte slices |
| 57 | // and 'reverse' this construction immediately. |
| 58 | let framed_byte_lc = FramedWrite::new(Vec::new(), SliceEncoder::new()); |
| 59 | let mut writer = SinkWriter::new(framed_byte_lc); |
| 60 | |
| 61 | // Write multiple slices to the sink... |
| 62 | let _ = writer.write(&[1, 2, 3]).await; |
| 63 | let _ = writer.write(&[4, 5, 6]).await; |
| 64 | |
| 65 | // ... and compare it with the buffer. |
| 66 | assert_eq!( |
| 67 | writer.into_inner().write_buffer().to_vec().as_slice(), |
| 68 | &[1, 2, 3, 4, 5, 6] |
| 69 | ); |
| 70 | |
| 71 | Ok(()) |
| 72 | } |
| 73 | |