1use crate::Stream;
2
3use std::borrow::Borrow;
4use std::hash::Hash;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8/// Combine many streams into one, indexing each source stream with a unique
9/// key.
10///
11/// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
12/// streams into a single merged stream that yields values in the order that
13/// they arrive from the source streams. However, `StreamMap` has a lot more
14/// flexibility in usage patterns.
15///
16/// `StreamMap` can:
17///
18/// * Merge an arbitrary number of streams.
19/// * Track which source stream the value was received from.
20/// * Handle inserting and removing streams from the set of managed streams at
21/// any point during iteration.
22///
23/// All source streams held by `StreamMap` are indexed using a key. This key is
24/// included with the value when a source stream yields a value. The key is also
25/// used to remove the stream from the `StreamMap` before the stream has
26/// completed streaming.
27///
28/// # `Unpin`
29///
30/// Because the `StreamMap` API moves streams during runtime, both streams and
31/// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
32/// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
33/// pin the stream in the heap.
34///
35/// # Implementation
36///
37/// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
38/// internal implementation detail will persist in future versions, but it is
39/// important to know the runtime implications. In general, `StreamMap` works
40/// best with a "smallish" number of streams as all entries are scanned on
41/// insert, remove, and polling. In cases where a large number of streams need
42/// to be merged, it may be advisable to use tasks sending values on a shared
43/// [`mpsc`] channel.
44///
45/// # Notes
46///
47/// `StreamMap` removes finished streams automatically, without alerting the user.
48/// In some scenarios, the caller would want to know on closed streams.
49/// To do this, use [`StreamNotifyClose`] as a wrapper to your stream.
50/// It will return None when the stream is closed.
51///
52/// [`StreamExt::merge`]: crate::StreamExt::merge
53/// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
54/// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
55/// [`Box::pin`]: std::boxed::Box::pin
56/// [`StreamNotifyClose`]: crate::StreamNotifyClose
57///
58/// # Examples
59///
60/// Merging two streams, then remove them after receiving the first value
61///
62/// ```
63/// use tokio_stream::{StreamExt, StreamMap, Stream};
64/// use tokio::sync::mpsc;
65/// use std::pin::Pin;
66///
67/// #[tokio::main]
68/// async fn main() {
69/// let (tx1, mut rx1) = mpsc::channel::<usize>(10);
70/// let (tx2, mut rx2) = mpsc::channel::<usize>(10);
71///
72/// // Convert the channels to a `Stream`.
73/// let rx1 = Box::pin(async_stream::stream! {
74/// while let Some(item) = rx1.recv().await {
75/// yield item;
76/// }
77/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
78///
79/// let rx2 = Box::pin(async_stream::stream! {
80/// while let Some(item) = rx2.recv().await {
81/// yield item;
82/// }
83/// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
84///
85/// tokio::spawn(async move {
86/// tx1.send(1).await.unwrap();
87///
88/// // This value will never be received. The send may or may not return
89/// // `Err` depending on if the remote end closed first or not.
90/// let _ = tx1.send(2).await;
91/// });
92///
93/// tokio::spawn(async move {
94/// tx2.send(3).await.unwrap();
95/// let _ = tx2.send(4).await;
96/// });
97///
98/// let mut map = StreamMap::new();
99///
100/// // Insert both streams
101/// map.insert("one", rx1);
102/// map.insert("two", rx2);
103///
104/// // Read twice
105/// for _ in 0..2 {
106/// let (key, val) = map.next().await.unwrap();
107///
108/// if key == "one" {
109/// assert_eq!(val, 1);
110/// } else {
111/// assert_eq!(val, 3);
112/// }
113///
114/// // Remove the stream to prevent reading the next value
115/// map.remove(key);
116/// }
117/// }
118/// ```
119///
120/// This example models a read-only client to a chat system with channels. The
121/// client sends commands to join and leave channels. `StreamMap` is used to
122/// manage active channel subscriptions.
123///
124/// For simplicity, messages are displayed with `println!`, but they could be
125/// sent to the client over a socket.
126///
127/// ```no_run
128/// use tokio_stream::{Stream, StreamExt, StreamMap};
129///
130/// enum Command {
131/// Join(String),
132/// Leave(String),
133/// }
134///
135/// fn commands() -> impl Stream<Item = Command> {
136/// // Streams in user commands by parsing `stdin`.
137/// # tokio_stream::pending()
138/// }
139///
140/// // Join a channel, returns a stream of messages received on the channel.
141/// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
142/// // left as an exercise to the reader
143/// # tokio_stream::pending()
144/// }
145///
146/// #[tokio::main]
147/// async fn main() {
148/// let mut channels = StreamMap::new();
149///
150/// // Input commands (join / leave channels).
151/// let cmds = commands();
152/// tokio::pin!(cmds);
153///
154/// loop {
155/// tokio::select! {
156/// Some(cmd) = cmds.next() => {
157/// match cmd {
158/// Command::Join(chan) => {
159/// // Join the channel and add it to the `channels`
160/// // stream map
161/// let msgs = join(&chan);
162/// channels.insert(chan, msgs);
163/// }
164/// Command::Leave(chan) => {
165/// channels.remove(&chan);
166/// }
167/// }
168/// }
169/// Some((chan, msg)) = channels.next() => {
170/// // Received a message, display it on stdout with the channel
171/// // it originated from.
172/// println!("{}: {}", chan, msg);
173/// }
174/// // Both the `commands` stream and the `channels` stream are
175/// // complete. There is no more work to do, so leave the loop.
176/// else => break,
177/// }
178/// }
179/// }
180/// ```
181///
182/// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
183///
184/// ```
185/// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
186///
187/// #[tokio::main]
188/// async fn main() {
189/// let mut map = StreamMap::new();
190/// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
191/// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
192/// map.insert(0, stream);
193/// map.insert(1, stream2);
194/// while let Some((key, val)) = map.next().await {
195/// match val {
196/// Some(val) => println!("got {val:?} from stream {key:?}"),
197/// None => println!("stream {key:?} closed"),
198/// }
199/// }
200/// }
201/// ```
202
203#[derive(Debug)]
204pub struct StreamMap<K, V> {
205 /// Streams stored in the map
206 entries: Vec<(K, V)>,
207}
208
209impl<K, V> StreamMap<K, V> {
210 /// An iterator visiting all key-value pairs in arbitrary order.
211 ///
212 /// The iterator element type is &'a (K, V).
213 ///
214 /// # Examples
215 ///
216 /// ```
217 /// use tokio_stream::{StreamMap, pending};
218 ///
219 /// let mut map = StreamMap::new();
220 ///
221 /// map.insert("a", pending::<i32>());
222 /// map.insert("b", pending());
223 /// map.insert("c", pending());
224 ///
225 /// for (key, stream) in map.iter() {
226 /// println!("({}, {:?})", key, stream);
227 /// }
228 /// ```
229 pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
230 self.entries.iter()
231 }
232
233 /// An iterator visiting all key-value pairs mutably in arbitrary order.
234 ///
235 /// The iterator element type is &'a mut (K, V).
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// use tokio_stream::{StreamMap, pending};
241 ///
242 /// let mut map = StreamMap::new();
243 ///
244 /// map.insert("a", pending::<i32>());
245 /// map.insert("b", pending());
246 /// map.insert("c", pending());
247 ///
248 /// for (key, stream) in map.iter_mut() {
249 /// println!("({}, {:?})", key, stream);
250 /// }
251 /// ```
252 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
253 self.entries.iter_mut()
254 }
255
256 /// Creates an empty `StreamMap`.
257 ///
258 /// The stream map is initially created with a capacity of `0`, so it will
259 /// not allocate until it is first inserted into.
260 ///
261 /// # Examples
262 ///
263 /// ```
264 /// use tokio_stream::{StreamMap, Pending};
265 ///
266 /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
267 /// ```
268 pub fn new() -> StreamMap<K, V> {
269 StreamMap { entries: vec![] }
270 }
271
272 /// Creates an empty `StreamMap` with the specified capacity.
273 ///
274 /// The stream map will be able to hold at least `capacity` elements without
275 /// reallocating. If `capacity` is 0, the stream map will not allocate.
276 ///
277 /// # Examples
278 ///
279 /// ```
280 /// use tokio_stream::{StreamMap, Pending};
281 ///
282 /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
283 /// ```
284 pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
285 StreamMap {
286 entries: Vec::with_capacity(capacity),
287 }
288 }
289
290 /// Returns an iterator visiting all keys in arbitrary order.
291 ///
292 /// The iterator element type is &'a K.
293 ///
294 /// # Examples
295 ///
296 /// ```
297 /// use tokio_stream::{StreamMap, pending};
298 ///
299 /// let mut map = StreamMap::new();
300 ///
301 /// map.insert("a", pending::<i32>());
302 /// map.insert("b", pending());
303 /// map.insert("c", pending());
304 ///
305 /// for key in map.keys() {
306 /// println!("{}", key);
307 /// }
308 /// ```
309 pub fn keys(&self) -> impl Iterator<Item = &K> {
310 self.iter().map(|(k, _)| k)
311 }
312
313 /// An iterator visiting all values in arbitrary order.
314 ///
315 /// The iterator element type is &'a V.
316 ///
317 /// # Examples
318 ///
319 /// ```
320 /// use tokio_stream::{StreamMap, pending};
321 ///
322 /// let mut map = StreamMap::new();
323 ///
324 /// map.insert("a", pending::<i32>());
325 /// map.insert("b", pending());
326 /// map.insert("c", pending());
327 ///
328 /// for stream in map.values() {
329 /// println!("{:?}", stream);
330 /// }
331 /// ```
332 pub fn values(&self) -> impl Iterator<Item = &V> {
333 self.iter().map(|(_, v)| v)
334 }
335
336 /// An iterator visiting all values mutably in arbitrary order.
337 ///
338 /// The iterator element type is &'a mut V.
339 ///
340 /// # Examples
341 ///
342 /// ```
343 /// use tokio_stream::{StreamMap, pending};
344 ///
345 /// let mut map = StreamMap::new();
346 ///
347 /// map.insert("a", pending::<i32>());
348 /// map.insert("b", pending());
349 /// map.insert("c", pending());
350 ///
351 /// for stream in map.values_mut() {
352 /// println!("{:?}", stream);
353 /// }
354 /// ```
355 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
356 self.iter_mut().map(|(_, v)| v)
357 }
358
359 /// Returns the number of streams the map can hold without reallocating.
360 ///
361 /// This number is a lower bound; the `StreamMap` might be able to hold
362 /// more, but is guaranteed to be able to hold at least this many.
363 ///
364 /// # Examples
365 ///
366 /// ```
367 /// use tokio_stream::{StreamMap, Pending};
368 ///
369 /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
370 /// assert!(map.capacity() >= 100);
371 /// ```
372 pub fn capacity(&self) -> usize {
373 self.entries.capacity()
374 }
375
376 /// Returns the number of streams in the map.
377 ///
378 /// # Examples
379 ///
380 /// ```
381 /// use tokio_stream::{StreamMap, pending};
382 ///
383 /// let mut a = StreamMap::new();
384 /// assert_eq!(a.len(), 0);
385 /// a.insert(1, pending::<i32>());
386 /// assert_eq!(a.len(), 1);
387 /// ```
388 pub fn len(&self) -> usize {
389 self.entries.len()
390 }
391
392 /// Returns `true` if the map contains no elements.
393 ///
394 /// # Examples
395 ///
396 /// ```
397 /// use tokio_stream::{StreamMap, pending};
398 ///
399 /// let mut a = StreamMap::new();
400 /// assert!(a.is_empty());
401 /// a.insert(1, pending::<i32>());
402 /// assert!(!a.is_empty());
403 /// ```
404 pub fn is_empty(&self) -> bool {
405 self.entries.is_empty()
406 }
407
408 /// Clears the map, removing all key-stream pairs. Keeps the allocated
409 /// memory for reuse.
410 ///
411 /// # Examples
412 ///
413 /// ```
414 /// use tokio_stream::{StreamMap, pending};
415 ///
416 /// let mut a = StreamMap::new();
417 /// a.insert(1, pending::<i32>());
418 /// a.clear();
419 /// assert!(a.is_empty());
420 /// ```
421 pub fn clear(&mut self) {
422 self.entries.clear();
423 }
424
425 /// Insert a key-stream pair into the map.
426 ///
427 /// If the map did not have this key present, `None` is returned.
428 ///
429 /// If the map did have this key present, the new `stream` replaces the old
430 /// one and the old stream is returned.
431 ///
432 /// # Examples
433 ///
434 /// ```
435 /// use tokio_stream::{StreamMap, pending};
436 ///
437 /// let mut map = StreamMap::new();
438 ///
439 /// assert!(map.insert(37, pending::<i32>()).is_none());
440 /// assert!(!map.is_empty());
441 ///
442 /// map.insert(37, pending());
443 /// assert!(map.insert(37, pending()).is_some());
444 /// ```
445 pub fn insert(&mut self, k: K, stream: V) -> Option<V>
446 where
447 K: Hash + Eq,
448 {
449 let ret = self.remove(&k);
450 self.entries.push((k, stream));
451
452 ret
453 }
454
455 /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
456 ///
457 /// The key may be any borrowed form of the map's key type, but `Hash` and
458 /// `Eq` on the borrowed form must match those for the key type.
459 ///
460 /// # Examples
461 ///
462 /// ```
463 /// use tokio_stream::{StreamMap, pending};
464 ///
465 /// let mut map = StreamMap::new();
466 /// map.insert(1, pending::<i32>());
467 /// assert!(map.remove(&1).is_some());
468 /// assert!(map.remove(&1).is_none());
469 /// ```
470 pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
471 where
472 K: Borrow<Q>,
473 Q: Hash + Eq,
474 {
475 for i in 0..self.entries.len() {
476 if self.entries[i].0.borrow() == k {
477 return Some(self.entries.swap_remove(i).1);
478 }
479 }
480
481 None
482 }
483
484 /// Returns `true` if the map contains a stream for the specified key.
485 ///
486 /// The key may be any borrowed form of the map's key type, but `Hash` and
487 /// `Eq` on the borrowed form must match those for the key type.
488 ///
489 /// # Examples
490 ///
491 /// ```
492 /// use tokio_stream::{StreamMap, pending};
493 ///
494 /// let mut map = StreamMap::new();
495 /// map.insert(1, pending::<i32>());
496 /// assert_eq!(map.contains_key(&1), true);
497 /// assert_eq!(map.contains_key(&2), false);
498 /// ```
499 pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool
500 where
501 K: Borrow<Q>,
502 Q: Hash + Eq,
503 {
504 for i in 0..self.entries.len() {
505 if self.entries[i].0.borrow() == k {
506 return true;
507 }
508 }
509
510 false
511 }
512}
513
514impl<K, V> StreamMap<K, V>
515where
516 K: Unpin,
517 V: Stream + Unpin,
518{
519 /// Polls the next value, includes the vec entry index
520 fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
521 let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
522 let mut idx = start;
523
524 for _ in 0..self.entries.len() {
525 let (_, stream) = &mut self.entries[idx];
526
527 match Pin::new(stream).poll_next(cx) {
528 Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))),
529 Poll::Ready(None) => {
530 // Remove the entry
531 self.entries.swap_remove(idx);
532
533 // Check if this was the last entry, if so the cursor needs
534 // to wrap
535 if idx == self.entries.len() {
536 idx = 0;
537 } else if idx < start && start <= self.entries.len() {
538 // The stream being swapped into the current index has
539 // already been polled, so skip it.
540 idx = idx.wrapping_add(1) % self.entries.len();
541 }
542 }
543 Poll::Pending => {
544 idx = idx.wrapping_add(1) % self.entries.len();
545 }
546 }
547 }
548
549 // If the map is empty, then the stream is complete.
550 if self.entries.is_empty() {
551 Poll::Ready(None)
552 } else {
553 Poll::Pending
554 }
555 }
556}
557
558impl<K, V> Default for StreamMap<K, V> {
559 fn default() -> Self {
560 Self::new()
561 }
562}
563
564impl<K, V> Stream for StreamMap<K, V>
565where
566 K: Clone + Unpin,
567 V: Stream + Unpin,
568{
569 type Item = (K, V::Item);
570
571 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
572 if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
573 let key = self.entries[idx].0.clone();
574 Poll::Ready(Some((key, val)))
575 } else {
576 Poll::Ready(None)
577 }
578 }
579
580 fn size_hint(&self) -> (usize, Option<usize>) {
581 let mut ret = (0, Some(0));
582
583 for (_, stream) in &self.entries {
584 let hint = stream.size_hint();
585
586 ret.0 += hint.0;
587
588 match (ret.1, hint.1) {
589 (Some(a), Some(b)) => ret.1 = Some(a + b),
590 (Some(_), None) => ret.1 = None,
591 _ => {}
592 }
593 }
594
595 ret
596 }
597}
598
599impl<K, V> FromIterator<(K, V)> for StreamMap<K, V>
600where
601 K: Hash + Eq,
602{
603 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
604 let iterator = iter.into_iter();
605 let (lower_bound, _) = iterator.size_hint();
606 let mut stream_map = Self::with_capacity(lower_bound);
607
608 for (key, value) in iterator {
609 stream_map.insert(key, value);
610 }
611
612 stream_map
613 }
614}
615
616impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
617 fn extend<T>(&mut self, iter: T)
618 where
619 T: IntoIterator<Item = (K, V)>,
620 {
621 self.entries.extend(iter);
622 }
623}
624
625mod rand {
626 use std::cell::Cell;
627
628 mod loom {
629 #[cfg(not(loom))]
630 pub(crate) mod rand {
631 use std::collections::hash_map::RandomState;
632 use std::hash::{BuildHasher, Hash, Hasher};
633 use std::sync::atomic::AtomicU32;
634 use std::sync::atomic::Ordering::Relaxed;
635
636 static COUNTER: AtomicU32 = AtomicU32::new(1);
637
638 pub(crate) fn seed() -> u64 {
639 let rand_state = RandomState::new();
640
641 let mut hasher = rand_state.build_hasher();
642
643 // Hash some unique-ish data to generate some new state
644 COUNTER.fetch_add(1, Relaxed).hash(&mut hasher);
645
646 // Get the seed
647 hasher.finish()
648 }
649 }
650
651 #[cfg(loom)]
652 pub(crate) mod rand {
653 pub(crate) fn seed() -> u64 {
654 1
655 }
656 }
657 }
658
659 /// Fast random number generate
660 ///
661 /// Implement xorshift64+: 2 32-bit xorshift sequences added together.
662 /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
663 /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
664 /// This generator passes the SmallCrush suite, part of TestU01 framework:
665 /// <http://simul.iro.umontreal.ca/testu01/tu01.html>
666 #[derive(Debug)]
667 pub(crate) struct FastRand {
668 one: Cell<u32>,
669 two: Cell<u32>,
670 }
671
672 impl FastRand {
673 /// Initialize a new, thread-local, fast random number generator.
674 pub(crate) fn new(seed: u64) -> FastRand {
675 let one = (seed >> 32) as u32;
676 let mut two = seed as u32;
677
678 if two == 0 {
679 // This value cannot be zero
680 two = 1;
681 }
682
683 FastRand {
684 one: Cell::new(one),
685 two: Cell::new(two),
686 }
687 }
688
689 pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
690 // This is similar to fastrand() % n, but faster.
691 // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
692 let mul = (self.fastrand() as u64).wrapping_mul(n as u64);
693 (mul >> 32) as u32
694 }
695
696 fn fastrand(&self) -> u32 {
697 let mut s1 = self.one.get();
698 let s0 = self.two.get();
699
700 s1 ^= s1 << 17;
701 s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16;
702
703 self.one.set(s0);
704 self.two.set(s1);
705
706 s0.wrapping_add(s1)
707 }
708 }
709
710 // Used by `StreamMap`
711 pub(crate) fn thread_rng_n(n: u32) -> u32 {
712 thread_local! {
713 static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
714 }
715
716 THREAD_RNG.with(|rng| rng.fastrand_n(n))
717 }
718}
719