1 | #![warn (rust_2018_idioms)] |
2 | |
3 | use bytes::Bytes; |
4 | use futures_util::SinkExt; |
5 | use std::io::{self, Error, ErrorKind}; |
6 | use tokio::io::AsyncWriteExt; |
7 | use tokio_util::codec::{Encoder, FramedWrite}; |
8 | use tokio_util::io::{CopyToBytes, SinkWriter}; |
9 | use tokio_util::sync::PollSender; |
10 | |
11 | #[tokio::test ] |
12 | async fn test_copied_sink_writer() -> Result<(), Error> { |
13 | // Construct a channel pair to send data across and wrap a pollable sink. |
14 | // Note that the sink must mimic a writable object, e.g. have `std::io::Error` |
15 | // as its error type. |
16 | // As `PollSender` requires an owned copy of the buffer, we wrap it additionally |
17 | // with a `CopyToBytes` helper. |
18 | let (tx, mut rx) = tokio::sync::mpsc::channel::<Bytes>(1); |
19 | let mut writer = SinkWriter::new(CopyToBytes::new( |
20 | PollSender::new(tx).sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe)), |
21 | )); |
22 | |
23 | // Write data to our interface... |
24 | let data: [u8; 4] = [1, 2, 3, 4]; |
25 | let _ = writer.write(&data).await; |
26 | |
27 | // ... and receive it. |
28 | assert_eq!(data.to_vec(), rx.recv().await.unwrap().to_vec()); |
29 | |
30 | Ok(()) |
31 | } |
32 | |
/// A minimal, stateless encoder used by the tests in this file.
struct SliceEncoder;

impl SliceEncoder {
    /// Creates a new encoder; the type carries no fields to initialise.
    fn new() -> SliceEncoder {
        SliceEncoder
    }
}
41 | |
42 | impl<'a> Encoder<&'a [u8]> for SliceEncoder { |
43 | type Error = Error; |
44 | |
45 | fn encode(&mut self, item: &'a [u8], dst: &mut bytes::BytesMut) -> Result<(), Self::Error> { |
46 | // This is where we'd write packet headers, lengths, etc. in a real encoder. |
47 | // For simplicity and demonstration purposes, we just pack a copy of |
48 | // the slice at the end of a buffer. |
49 | dst.extend_from_slice(item); |
50 | Ok(()) |
51 | } |
52 | } |
53 | |
54 | #[tokio::test ] |
55 | async fn test_direct_sink_writer() -> Result<(), Error> { |
56 | // We define a framed writer which accepts byte slices |
57 | // and 'reverse' this construction immediately. |
58 | let framed_byte_lc = FramedWrite::new(Vec::new(), SliceEncoder::new()); |
59 | let mut writer = SinkWriter::new(framed_byte_lc); |
60 | |
61 | // Write multiple slices to the sink... |
62 | let _ = writer.write(&[1, 2, 3]).await; |
63 | let _ = writer.write(&[4, 5, 6]).await; |
64 | |
65 | // ... and compare it with the buffer. |
66 | assert_eq!( |
67 | writer.into_inner().write_buffer().to_vec().as_slice(), |
68 | &[1, 2, 3, 4, 5, 6] |
69 | ); |
70 | |
71 | Ok(()) |
72 | } |
73 | |