1 | use futures::channel::mpsc; |
2 | use futures::executor::block_on; |
3 | use futures::stream::{abortable, Stream, StreamExt}; |
4 | use futures::task::{Context, Poll}; |
5 | use futures::SinkExt; |
6 | use futures_test::task::new_count_waker; |
7 | use std::pin::Pin; |
8 | |
9 | #[test] |
10 | fn abortable_works() { |
11 | let (_tx, a_rx) = mpsc::channel::<()>(1); |
12 | let (mut abortable_rx, abort_handle) = abortable(a_rx); |
13 | |
14 | abort_handle.abort(); |
15 | assert!(abortable_rx.is_aborted()); |
16 | assert_eq!(None, block_on(abortable_rx.next())); |
17 | } |
18 | |
19 | #[test] |
20 | fn abortable_awakens() { |
21 | let (_tx, a_rx) = mpsc::channel::<()>(1); |
22 | let (mut abortable_rx, abort_handle) = abortable(a_rx); |
23 | |
24 | let (waker, counter) = new_count_waker(); |
25 | let mut cx = Context::from_waker(&waker); |
26 | |
27 | assert_eq!(counter, 0); |
28 | assert_eq!(Poll::Pending, Pin::new(&mut abortable_rx).poll_next(&mut cx)); |
29 | assert_eq!(counter, 0); |
30 | |
31 | abort_handle.abort(); |
32 | assert_eq!(counter, 1); |
33 | assert!(abortable_rx.is_aborted()); |
34 | assert_eq!(Poll::Ready(None), Pin::new(&mut abortable_rx).poll_next(&mut cx)); |
35 | } |
36 | |
37 | #[test] |
38 | fn abortable_resolves() { |
39 | let (mut tx, a_rx) = mpsc::channel::<()>(1); |
40 | let (mut abortable_rx, _abort_handle) = abortable(a_rx); |
41 | |
42 | block_on(tx.send(())).unwrap(); |
43 | |
44 | assert!(!abortable_rx.is_aborted()); |
45 | assert_eq!(Some(()), block_on(abortable_rx.next())); |
46 | } |
47 | |