| 1 | use futures::channel::mpsc; |
| 2 | use futures::executor::{block_on, block_on_stream}; |
| 3 | use futures::future::{self, FutureExt}; |
| 4 | use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; |
| 5 | use futures::task::Poll; |
| 6 | use futures_test::task::noop_context; |
| 7 | |
| 8 | #[test] |
| 9 | fn is_terminated() { |
| 10 | let mut cx = noop_context(); |
| 11 | let mut tasks = SelectAll::new(); |
| 12 | |
| 13 | assert_eq!(tasks.is_terminated(), false); |
| 14 | assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); |
| 15 | assert_eq!(tasks.is_terminated(), true); |
| 16 | |
| 17 | // Test that the sentinel value doesn't leak |
| 18 | assert_eq!(tasks.is_empty(), true); |
| 19 | assert_eq!(tasks.len(), 0); |
| 20 | |
| 21 | tasks.push(future::ready(1).into_stream()); |
| 22 | |
| 23 | assert_eq!(tasks.is_empty(), false); |
| 24 | assert_eq!(tasks.len(), 1); |
| 25 | |
| 26 | assert_eq!(tasks.is_terminated(), false); |
| 27 | assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1))); |
| 28 | assert_eq!(tasks.is_terminated(), false); |
| 29 | assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None)); |
| 30 | assert_eq!(tasks.is_terminated(), true); |
| 31 | } |
| 32 | |
| 33 | #[test] |
| 34 | fn issue_1626() { |
| 35 | let a = stream::iter(0..=2); |
| 36 | let b = stream::iter(10..=14); |
| 37 | |
| 38 | let mut s = block_on_stream(stream::select_all(vec![a, b])); |
| 39 | |
| 40 | assert_eq!(s.next(), Some(0)); |
| 41 | assert_eq!(s.next(), Some(10)); |
| 42 | assert_eq!(s.next(), Some(1)); |
| 43 | assert_eq!(s.next(), Some(11)); |
| 44 | assert_eq!(s.next(), Some(2)); |
| 45 | assert_eq!(s.next(), Some(12)); |
| 46 | assert_eq!(s.next(), Some(13)); |
| 47 | assert_eq!(s.next(), Some(14)); |
| 48 | assert_eq!(s.next(), None); |
| 49 | } |
| 50 | |
| 51 | #[test] |
| 52 | fn works_1() { |
| 53 | let (a_tx, a_rx) = mpsc::unbounded::<u32>(); |
| 54 | let (b_tx, b_rx) = mpsc::unbounded::<u32>(); |
| 55 | let (c_tx, c_rx) = mpsc::unbounded::<u32>(); |
| 56 | |
| 57 | let streams = vec![a_rx, b_rx, c_rx]; |
| 58 | |
| 59 | let mut stream = block_on_stream(select_all(streams)); |
| 60 | |
| 61 | b_tx.unbounded_send(99).unwrap(); |
| 62 | a_tx.unbounded_send(33).unwrap(); |
| 63 | assert_eq!(Some(33), stream.next()); |
| 64 | assert_eq!(Some(99), stream.next()); |
| 65 | |
| 66 | b_tx.unbounded_send(99).unwrap(); |
| 67 | a_tx.unbounded_send(33).unwrap(); |
| 68 | assert_eq!(Some(33), stream.next()); |
| 69 | assert_eq!(Some(99), stream.next()); |
| 70 | |
| 71 | c_tx.unbounded_send(42).unwrap(); |
| 72 | assert_eq!(Some(42), stream.next()); |
| 73 | a_tx.unbounded_send(43).unwrap(); |
| 74 | assert_eq!(Some(43), stream.next()); |
| 75 | |
| 76 | drop((a_tx, b_tx, c_tx)); |
| 77 | assert_eq!(None, stream.next()); |
| 78 | } |
| 79 | |
| 80 | #[test] |
| 81 | fn clear() { |
| 82 | let mut tasks = |
| 83 | select_all(vec![stream::iter(vec![1].into_iter()), stream::iter(vec![2].into_iter())]); |
| 84 | |
| 85 | assert_eq!(block_on(tasks.next()), Some(1)); |
| 86 | assert!(!tasks.is_empty()); |
| 87 | |
| 88 | tasks.clear(); |
| 89 | assert!(tasks.is_empty()); |
| 90 | |
| 91 | tasks.push(stream::iter(vec![3].into_iter())); |
| 92 | assert!(!tasks.is_empty()); |
| 93 | |
| 94 | tasks.clear(); |
| 95 | assert!(tasks.is_empty()); |
| 96 | |
| 97 | assert_eq!(block_on(tasks.next()), None); |
| 98 | assert!(tasks.is_terminated()); |
| 99 | tasks.clear(); |
| 100 | assert!(!tasks.is_terminated()); |
| 101 | } |
| 102 | |
| 103 | #[test] |
| 104 | fn iter_mut() { |
| 105 | let mut stream = |
| 106 | vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] |
| 107 | .into_iter() |
| 108 | .collect::<SelectAll<_>>(); |
| 109 | |
| 110 | let mut iter = stream.iter_mut(); |
| 111 | assert_eq!(iter.len(), 3); |
| 112 | assert!(iter.next().is_some()); |
| 113 | assert_eq!(iter.len(), 2); |
| 114 | assert!(iter.next().is_some()); |
| 115 | assert_eq!(iter.len(), 1); |
| 116 | assert!(iter.next().is_some()); |
| 117 | assert_eq!(iter.len(), 0); |
| 118 | assert!(iter.next().is_none()); |
| 119 | |
| 120 | let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] |
| 121 | .into_iter() |
| 122 | .collect::<SelectAll<_>>(); |
| 123 | |
| 124 | assert_eq!(stream.len(), 3); |
| 125 | assert_eq!(block_on(stream.next()), Some(1)); |
| 126 | assert_eq!(stream.len(), 2); |
| 127 | let mut iter = stream.iter_mut(); |
| 128 | assert_eq!(iter.len(), 2); |
| 129 | assert!(iter.next().is_some()); |
| 130 | assert_eq!(iter.len(), 1); |
| 131 | assert!(iter.next().is_some()); |
| 132 | assert_eq!(iter.len(), 0); |
| 133 | assert!(iter.next().is_none()); |
| 134 | |
| 135 | assert_eq!(block_on(stream.next()), Some(2)); |
| 136 | assert_eq!(stream.len(), 2); |
| 137 | assert_eq!(block_on(stream.next()), None); |
| 138 | let mut iter = stream.iter_mut(); |
| 139 | assert_eq!(iter.len(), 0); |
| 140 | assert!(iter.next().is_none()); |
| 141 | } |
| 142 | |
| 143 | #[test] |
| 144 | fn iter() { |
| 145 | let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] |
| 146 | .into_iter() |
| 147 | .collect::<SelectAll<_>>(); |
| 148 | |
| 149 | let mut iter = stream.iter(); |
| 150 | assert_eq!(iter.len(), 3); |
| 151 | assert!(iter.next().is_some()); |
| 152 | assert_eq!(iter.len(), 2); |
| 153 | assert!(iter.next().is_some()); |
| 154 | assert_eq!(iter.len(), 1); |
| 155 | assert!(iter.next().is_some()); |
| 156 | assert_eq!(iter.len(), 0); |
| 157 | assert!(iter.next().is_none()); |
| 158 | |
| 159 | let mut stream = vec![stream::iter(vec![]), stream::iter(vec![1]), stream::iter(vec![2])] |
| 160 | .into_iter() |
| 161 | .collect::<SelectAll<_>>(); |
| 162 | |
| 163 | assert_eq!(stream.len(), 3); |
| 164 | assert_eq!(block_on(stream.next()), Some(1)); |
| 165 | assert_eq!(stream.len(), 2); |
| 166 | let mut iter = stream.iter(); |
| 167 | assert_eq!(iter.len(), 2); |
| 168 | assert!(iter.next().is_some()); |
| 169 | assert_eq!(iter.len(), 1); |
| 170 | assert!(iter.next().is_some()); |
| 171 | assert_eq!(iter.len(), 0); |
| 172 | assert!(iter.next().is_none()); |
| 173 | |
| 174 | assert_eq!(block_on(stream.next()), Some(2)); |
| 175 | assert_eq!(stream.len(), 2); |
| 176 | assert_eq!(block_on(stream.next()), None); |
| 177 | let mut iter = stream.iter(); |
| 178 | assert_eq!(iter.len(), 0); |
| 179 | assert!(iter.next().is_none()); |
| 180 | } |
| 181 | |
| 182 | #[test] |
| 183 | fn into_iter() { |
| 184 | let stream = vec![stream::pending::<()>(), stream::pending::<()>(), stream::pending::<()>()] |
| 185 | .into_iter() |
| 186 | .collect::<SelectAll<_>>(); |
| 187 | |
| 188 | let mut iter = stream.into_iter(); |
| 189 | assert_eq!(iter.len(), 3); |
| 190 | assert!(iter.next().is_some()); |
| 191 | assert_eq!(iter.len(), 2); |
| 192 | assert!(iter.next().is_some()); |
| 193 | assert_eq!(iter.len(), 1); |
| 194 | assert!(iter.next().is_some()); |
| 195 | assert_eq!(iter.len(), 0); |
| 196 | assert!(iter.next().is_none()); |
| 197 | } |
| 198 | |