| 1 | use tokio_stream::{self as stream, StreamExt}; |
| 2 | use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; |
| 3 | |
// Test-support code. `mpsc` is declared without a body, so its contents live
// in a separate file (presumably `support/mpsc.rs` next to this test — confirm).
mod support {
    pub(crate) mod mpsc;
}
| 7 | |
| 8 | use support::mpsc; |
| 9 | |
| 10 | #[allow (clippy::let_unit_value)] |
| 11 | #[tokio::test ] |
| 12 | async fn empty_unit() { |
| 13 | // Drains the stream. |
| 14 | let mut iter = vec![(), (), ()].into_iter(); |
| 15 | let _: () = stream::iter(&mut iter).collect().await; |
| 16 | assert!(iter.next().is_none()); |
| 17 | } |
| 18 | |
| 19 | #[tokio::test ] |
| 20 | async fn empty_vec() { |
| 21 | let coll: Vec<u32> = stream::empty().collect().await; |
| 22 | assert!(coll.is_empty()); |
| 23 | } |
| 24 | |
| 25 | #[tokio::test ] |
| 26 | async fn empty_box_slice() { |
| 27 | let coll: Box<[u32]> = stream::empty().collect().await; |
| 28 | assert!(coll.is_empty()); |
| 29 | } |
| 30 | |
| 31 | #[tokio::test ] |
| 32 | async fn empty_string() { |
| 33 | let coll: String = stream::empty::<&str>().collect().await; |
| 34 | assert!(coll.is_empty()); |
| 35 | } |
| 36 | |
| 37 | #[tokio::test ] |
| 38 | async fn empty_result() { |
| 39 | let coll: Result<Vec<u32>, &str> = stream::empty().collect().await; |
| 40 | assert_eq!(Ok(vec![]), coll); |
| 41 | } |
| 42 | |
| 43 | #[tokio::test ] |
| 44 | async fn collect_vec_items() { |
| 45 | let (tx, rx) = mpsc::unbounded_channel_stream(); |
| 46 | let mut fut = task::spawn(rx.collect::<Vec<i32>>()); |
| 47 | |
| 48 | assert_pending!(fut.poll()); |
| 49 | |
| 50 | tx.send(1).unwrap(); |
| 51 | assert!(fut.is_woken()); |
| 52 | assert_pending!(fut.poll()); |
| 53 | |
| 54 | tx.send(2).unwrap(); |
| 55 | assert!(fut.is_woken()); |
| 56 | assert_pending!(fut.poll()); |
| 57 | |
| 58 | drop(tx); |
| 59 | assert!(fut.is_woken()); |
| 60 | let coll = assert_ready!(fut.poll()); |
| 61 | assert_eq!(vec![1, 2], coll); |
| 62 | } |
| 63 | |
| 64 | #[tokio::test ] |
| 65 | async fn collect_string_items() { |
| 66 | let (tx, rx) = mpsc::unbounded_channel_stream(); |
| 67 | |
| 68 | let mut fut = task::spawn(rx.collect::<String>()); |
| 69 | |
| 70 | assert_pending!(fut.poll()); |
| 71 | |
| 72 | tx.send("hello " .to_string()).unwrap(); |
| 73 | assert!(fut.is_woken()); |
| 74 | assert_pending!(fut.poll()); |
| 75 | |
| 76 | tx.send("world" .to_string()).unwrap(); |
| 77 | assert!(fut.is_woken()); |
| 78 | assert_pending!(fut.poll()); |
| 79 | |
| 80 | drop(tx); |
| 81 | assert!(fut.is_woken()); |
| 82 | let coll = assert_ready!(fut.poll()); |
| 83 | assert_eq!("hello world" , coll); |
| 84 | } |
| 85 | |
| 86 | #[tokio::test ] |
| 87 | async fn collect_str_items() { |
| 88 | let (tx, rx) = mpsc::unbounded_channel_stream(); |
| 89 | |
| 90 | let mut fut = task::spawn(rx.collect::<String>()); |
| 91 | |
| 92 | assert_pending!(fut.poll()); |
| 93 | |
| 94 | tx.send("hello " ).unwrap(); |
| 95 | assert!(fut.is_woken()); |
| 96 | assert_pending!(fut.poll()); |
| 97 | |
| 98 | tx.send("world" ).unwrap(); |
| 99 | assert!(fut.is_woken()); |
| 100 | assert_pending!(fut.poll()); |
| 101 | |
| 102 | drop(tx); |
| 103 | assert!(fut.is_woken()); |
| 104 | let coll = assert_ready!(fut.poll()); |
| 105 | assert_eq!("hello world" , coll); |
| 106 | } |
| 107 | |
| 108 | #[tokio::test ] |
| 109 | async fn collect_results_ok() { |
| 110 | let (tx, rx) = mpsc::unbounded_channel_stream(); |
| 111 | |
| 112 | let mut fut = task::spawn(rx.collect::<Result<String, &str>>()); |
| 113 | |
| 114 | assert_pending!(fut.poll()); |
| 115 | |
| 116 | tx.send(Ok("hello " )).unwrap(); |
| 117 | assert!(fut.is_woken()); |
| 118 | assert_pending!(fut.poll()); |
| 119 | |
| 120 | tx.send(Ok("world" )).unwrap(); |
| 121 | assert!(fut.is_woken()); |
| 122 | assert_pending!(fut.poll()); |
| 123 | |
| 124 | drop(tx); |
| 125 | assert!(fut.is_woken()); |
| 126 | let coll = assert_ready_ok!(fut.poll()); |
| 127 | assert_eq!("hello world" , coll); |
| 128 | } |
| 129 | |
| 130 | #[tokio::test ] |
| 131 | async fn collect_results_err() { |
| 132 | let (tx, rx) = mpsc::unbounded_channel_stream(); |
| 133 | |
| 134 | let mut fut = task::spawn(rx.collect::<Result<String, &str>>()); |
| 135 | |
| 136 | assert_pending!(fut.poll()); |
| 137 | |
| 138 | tx.send(Ok("hello " )).unwrap(); |
| 139 | assert!(fut.is_woken()); |
| 140 | assert_pending!(fut.poll()); |
| 141 | |
| 142 | tx.send(Err("oh no" )).unwrap(); |
| 143 | assert!(fut.is_woken()); |
| 144 | let err = assert_ready_err!(fut.poll()); |
| 145 | assert_eq!("oh no" , err); |
| 146 | } |
| 147 | |