1use futures::executor::block_on;
2use futures::sink::{Sink, SinkExt};
3use futures::stream::{self, Stream, StreamExt};
4use futures::task::{Context, Poll};
5use pin_project::pin_project;
6use std::pin::Pin;
7
#[test]
fn test_split() {
    // Test-only adapter: bundles an independent stream and sink into a single
    // value that implements both `Stream` and `Sink`, so it can be `split`.
    #[pin_project]
    struct Join<T, U> {
        #[pin]
        stream: T,
        #[pin]
        sink: U,
    }

    // Stream side: every poll is forwarded to the pinned `stream` field.
    impl<T, U> Stream for Join<T, U>
    where
        T: Stream,
    {
        type Item = T::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.project();
            this.stream.poll_next(cx)
        }
    }

    // Sink side: every operation is forwarded to the pinned `sink` field.
    impl<T, U, Item> Sink<Item> for Join<T, U>
    where
        U: Sink<Item>,
    {
        type Error = U::Error;

        fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.sink.poll_ready(cx)
        }

        fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
            let this = self.project();
            this.sink.start_send(item)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.sink.poll_flush(cx)
        }

        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let this = self.project();
            this.sink.poll_close(cx)
        }
    }

    let mut collected = Vec::<i32>::new();
    {
        // This inner scope bounds the mutable borrow of `collected` held by the
        // sink, so the vector can be inspected by the assertion afterwards.
        let source = stream::iter(vec![10, 20, 30]);
        let joined = Join { stream: source, sink: &mut collected };

        // Split, reunite the two halves, then split again before use.
        let (tx, rx) = joined.split();
        let rejoined = tx.reunite(rx).expect("test_split: reunite error");
        let (mut tx, rx) = rejoined.split();

        // `send_all` is fed a stream of `Result`s, so wrap each item in `Ok`.
        let mut items = rx.map(Ok);
        block_on(tx.send_all(&mut items)).unwrap();
    }

    // Everything the stream half produced must have reached the sink half.
    assert_eq!(collected, vec![10, 20, 30]);
}
58