1 | use futures::executor::block_on; |
2 | use futures::sink::{Sink, SinkExt}; |
3 | use futures::stream::{self, Stream, StreamExt}; |
4 | use futures::task::{Context, Poll}; |
5 | use pin_project::pin_project ; |
6 | use std::pin::Pin; |
7 | |
/// Checks that a value implementing both `Stream` and `Sink` can be split
/// into halves, reunited, split again, and still forward every stream item
/// into the sink.
#[test]
fn test_split() {
    /// Glues an independent stream and sink into one value that implements
    /// both traits by delegating to the matching field.
    #[pin_project]
    struct Join<T, U> {
        #[pin]
        stream: T,
        #[pin]
        sink: U,
    }

    // Stream side: every poll is forwarded to the pinned `stream` field.
    impl<T, U> Stream for Join<T, U>
    where
        T: Stream,
    {
        type Item = T::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.project();
            this.stream.poll_next(cx)
        }
    }

    // Sink side: every operation is forwarded to the pinned `sink` field.
    impl<T, U, Item> Sink<Item> for Join<T, U>
    where
        U: Sink<Item>,
    {
        type Error = U::Error;

        fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.sink.poll_ready(cx)
        }

        fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
            let this = self.project();
            this.sink.start_send(item)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.sink.poll_flush(cx)
        }

        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.sink.poll_close(cx)
        }
    }

    let mut collected: Vec<i32> = Vec::new();
    {
        // The mutable borrow of `collected` lives only inside this scope so
        // the vector can be inspected by the assertion afterwards.
        let source = stream::iter(vec![10, 20, 30]);
        let joined = Join { stream: source, sink: &mut collected };

        // The first split exists solely so the halves can be reunited.
        let (sink_half, stream_half) = joined.split();
        let rejoined = sink_half.reunite(stream_half).expect("test_split: reunite error");

        // Split again and drive the stream half into the sink half.
        let (mut sink_half, stream_half) = rejoined.split();
        // `send_all` is fed a stream of `Result`s, so wrap each item in `Ok`.
        let mut items = stream_half.map(Ok);
        block_on(sink_half.send_all(&mut items)).unwrap();
    }
    assert_eq!(collected, vec![10, 20, 30]);
}
58 | |