1#![warn(rust_2018_idioms)]
2
3use bytes::Bytes;
4use futures_util::SinkExt;
5use std::io::{self, Error, ErrorKind};
6use tokio::io::AsyncWriteExt;
7use tokio_util::codec::{Encoder, FramedWrite};
8use tokio_util::io::{CopyToBytes, SinkWriter};
9use tokio_util::sync::PollSender;
10
11#[tokio::test]
12async fn test_copied_sink_writer() -> Result<(), Error> {
13 // Construct a channel pair to send data across and wrap a pollable sink.
14 // Note that the sink must mimic a writable object, e.g. have `std::io::Error`
15 // as its error type.
16 // As `PollSender` requires an owned copy of the buffer, we wrap it additionally
17 // with a `CopyToBytes` helper.
18 let (tx, mut rx) = tokio::sync::mpsc::channel::<Bytes>(1);
19 let mut writer = SinkWriter::new(CopyToBytes::new(
20 PollSender::new(tx).sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe)),
21 ));
22
23 // Write data to our interface...
24 let data: [u8; 4] = [1, 2, 3, 4];
25 let _ = writer.write(&data).await;
26
27 // ... and receive it.
28 assert_eq!(data.to_vec(), rx.recv().await.unwrap().to_vec());
29
30 Ok(())
31}
32
/// A minimal encoder that carries no state of its own.
struct SliceEncoder;

impl SliceEncoder {
    /// Creates a new `SliceEncoder`.
    fn new() -> Self {
        Self
    }
}
41
42impl<'a> Encoder<&'a [u8]> for SliceEncoder {
43 type Error = Error;
44
45 fn encode(&mut self, item: &'a [u8], dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
46 // This is where we'd write packet headers, lengths, etc. in a real encoder.
47 // For simplicity and demonstration purposes, we just pack a copy of
48 // the slice at the end of a buffer.
49 dst.extend_from_slice(item);
50 Ok(())
51 }
52}
53
54#[tokio::test]
55async fn test_direct_sink_writer() -> Result<(), Error> {
56 // We define a framed writer which accepts byte slices
57 // and 'reverse' this construction immediately.
58 let framed_byte_lc = FramedWrite::new(Vec::new(), SliceEncoder::new());
59 let mut writer = SinkWriter::new(framed_byte_lc);
60
61 // Write multiple slices to the sink...
62 let _ = writer.write(&[1, 2, 3]).await;
63 let _ = writer.write(&[4, 5, 6]).await;
64
65 // ... and compare it with the buffer.
66 assert_eq!(
67 writer.into_inner().write_buffer().to_vec().as_slice(),
68 &[1, 2, 3, 4, 5, 6]
69 );
70
71 Ok(())
72}
73