1 | use futures::executor::block_on_stream; |
2 | use futures::future::{err, ok, TryFutureExt}; |
3 | use futures::sink::Sink; |
4 | use futures::stream::Stream; |
5 | use futures::stream::{self, StreamExt}; |
6 | use futures::task::{Context, Poll}; |
7 | use std::marker::PhantomData; |
8 | use std::pin::Pin; |
9 | |
10 | #[test] |
11 | fn successful_future() { |
12 | let stream_items = vec![17, 19]; |
13 | let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok)); |
14 | |
15 | let stream = future_of_a_stream.try_flatten_stream(); |
16 | |
17 | let mut iter = block_on_stream(stream); |
18 | assert_eq!(Ok(17), iter.next().unwrap()); |
19 | assert_eq!(Ok(19), iter.next().unwrap()); |
20 | assert_eq!(None, iter.next()); |
21 | } |
22 | |
23 | #[test] |
24 | fn failed_future() { |
25 | struct PanickingStream<T, E> { |
26 | _marker: PhantomData<(T, E)>, |
27 | } |
28 | |
29 | impl<T, E> Stream for PanickingStream<T, E> { |
30 | type Item = Result<T, E>; |
31 | |
32 | fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
33 | panic!() |
34 | } |
35 | } |
36 | |
37 | let future_of_a_stream = err::<PanickingStream<bool, u32>, _>(10); |
38 | let stream = future_of_a_stream.try_flatten_stream(); |
39 | let mut iter = block_on_stream(stream); |
40 | assert_eq!(Err(10), iter.next().unwrap()); |
41 | assert_eq!(None, iter.next()); |
42 | } |
43 | |
44 | #[test] |
45 | fn assert_impls() { |
46 | struct StreamSink<T, E, Item>(PhantomData<(T, E, Item)>); |
47 | |
48 | impl<T, E, Item> Stream for StreamSink<T, E, Item> { |
49 | type Item = Result<T, E>; |
50 | fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
51 | panic!() |
52 | } |
53 | } |
54 | |
55 | impl<T, E, Item> Sink<Item> for StreamSink<T, E, Item> { |
56 | type Error = E; |
57 | fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
58 | panic!() |
59 | } |
60 | fn start_send(self: Pin<&mut Self>, _: Item) -> Result<(), Self::Error> { |
61 | panic!() |
62 | } |
63 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
64 | panic!() |
65 | } |
66 | fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
67 | panic!() |
68 | } |
69 | } |
70 | |
71 | fn assert_stream<S: Stream>(_: &S) {} |
72 | fn assert_sink<S: Sink<Item>, Item>(_: &S) {} |
73 | fn assert_stream_sink<S: Stream + Sink<Item>, Item>(_: &S) {} |
74 | |
75 | let s = ok(StreamSink::<(), (), ()>(PhantomData)).try_flatten_stream(); |
76 | assert_stream(&s); |
77 | assert_sink(&s); |
78 | assert_stream_sink(&s); |
79 | let s = ok(StreamSink::<(), (), ()>(PhantomData)).flatten_sink(); |
80 | assert_stream(&s); |
81 | assert_sink(&s); |
82 | assert_stream_sink(&s); |
83 | } |
84 | |