| 1 | use futures::executor::block_on; |
| 2 | use futures::sink::{Sink, SinkExt}; |
| 3 | use futures::stream::{self, Stream, StreamExt}; |
| 4 | use futures::task::{Context, Poll}; |
| 5 | use pin_project::pin_project ; |
| 6 | use std::pin::Pin; |
| 7 | |
/// Checks that a combined stream/sink value can go through a
/// `split` -> `reunite` -> `split` round trip and still forward every item
/// produced by its stream half into its sink half.
#[test]
fn test_split() {
    // Bundles an unrelated stream and sink into a single value. The trait
    // impls below only delegate to the matching pinned field.
    #[pin_project]
    struct Join<St, Si> {
        #[pin]
        stream: St,
        #[pin]
        sink: Si,
    }

    // Stream side: forwards polling to the `stream` field.
    impl<St, Si> Stream for Join<St, Si>
    where
        St: Stream,
    {
        type Item = St::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.project();
            this.stream.poll_next(cx)
        }
    }

    // Sink side: every method forwards to the `sink` field and reuses its
    // error type.
    impl<St, Si, Item> Sink<Item> for Join<St, Si>
    where
        Si: Sink<Item>,
    {
        type Error = Si::Error;

        fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.sink.poll_ready(cx)
        }

        fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
            let this = self.project();
            this.sink.start_send(item)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.sink.poll_flush(cx)
        }

        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.sink.poll_close(cx)
        }
    }

    let mut collected: Vec<i32> = Vec::new();
    {
        // Everything that borrows `collected` mutably lives inside this scope,
        // so the vector can be inspected again once the scope closes.
        let source = stream::iter(vec![10, 20, 30]);
        let joined = Join { stream: source, sink: &mut collected };

        // Split once and immediately put the two halves back together.
        let (tx, rx) = joined.split();
        let rejoined = tx.reunite(rx).expect("test_split: reunite error");

        // Split the reunited value again and pipe the stream half into the
        // sink half; items are wrapped in `Ok` before being sent.
        let (mut tx, rx) = rejoined.split();
        let mut items = rx.map(Ok);
        block_on(tx.send_all(&mut items)).unwrap();
    }
    assert_eq!(collected, vec![10, 20, 30]);
}
| 58 | |