1 | use crate::Stream; |
2 | |
3 | use std::borrow::Borrow; |
4 | use std::hash::Hash; |
5 | use std::pin::Pin; |
6 | use std::task::{Context, Poll}; |
7 | |
8 | /// Combine many streams into one, indexing each source stream with a unique |
9 | /// key. |
10 | /// |
11 | /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source |
12 | /// streams into a single merged stream that yields values in the order that |
13 | /// they arrive from the source streams. However, `StreamMap` has a lot more |
14 | /// flexibility in usage patterns. |
15 | /// |
16 | /// `StreamMap` can: |
17 | /// |
18 | /// * Merge an arbitrary number of streams. |
19 | /// * Track which source stream the value was received from. |
20 | /// * Handle inserting and removing streams from the set of managed streams at |
21 | /// any point during iteration. |
22 | /// |
23 | /// All source streams held by `StreamMap` are indexed using a key. This key is |
24 | /// included with the value when a source stream yields a value. The key is also |
25 | /// used to remove the stream from the `StreamMap` before the stream has |
26 | /// completed streaming. |
27 | /// |
28 | /// # `Unpin` |
29 | /// |
30 | /// Because the `StreamMap` API moves streams during runtime, both streams and |
31 | /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a |
32 | /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to |
33 | /// pin the stream in the heap. |
34 | /// |
35 | /// # Implementation |
36 | /// |
37 | /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this |
38 | /// internal implementation detail will persist in future versions, but it is |
39 | /// important to know the runtime implications. In general, `StreamMap` works |
40 | /// best with a "smallish" number of streams as all entries are scanned on |
41 | /// insert, remove, and polling. In cases where a large number of streams need |
42 | /// to be merged, it may be advisable to use tasks sending values on a shared |
43 | /// [`mpsc`] channel. |
44 | /// |
45 | /// # Notes |
46 | /// |
47 | /// `StreamMap` removes finished streams automatically, without alerting the user. |
48 | /// In some scenarios, the caller would want to know on closed streams. |
49 | /// To do this, use [`StreamNotifyClose`] as a wrapper to your stream. |
50 | /// It will return None when the stream is closed. |
51 | /// |
52 | /// [`StreamExt::merge`]: crate::StreamExt::merge |
53 | /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html |
54 | /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html |
55 | /// [`Box::pin`]: std::boxed::Box::pin |
56 | /// [`StreamNotifyClose`]: crate::StreamNotifyClose |
57 | /// |
58 | /// # Examples |
59 | /// |
60 | /// Merging two streams, then remove them after receiving the first value |
61 | /// |
62 | /// ``` |
63 | /// use tokio_stream::{StreamExt, StreamMap, Stream}; |
64 | /// use tokio::sync::mpsc; |
65 | /// use std::pin::Pin; |
66 | /// |
67 | /// #[tokio::main] |
68 | /// async fn main() { |
69 | /// let (tx1, mut rx1) = mpsc::channel::<usize>(10); |
70 | /// let (tx2, mut rx2) = mpsc::channel::<usize>(10); |
71 | /// |
72 | /// // Convert the channels to a `Stream`. |
73 | /// let rx1 = Box::pin(async_stream::stream! { |
74 | /// while let Some(item) = rx1.recv().await { |
75 | /// yield item; |
76 | /// } |
77 | /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; |
78 | /// |
79 | /// let rx2 = Box::pin(async_stream::stream! { |
80 | /// while let Some(item) = rx2.recv().await { |
81 | /// yield item; |
82 | /// } |
83 | /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; |
84 | /// |
85 | /// tokio::spawn(async move { |
86 | /// tx1.send(1).await.unwrap(); |
87 | /// |
88 | /// // This value will never be received. The send may or may not return |
89 | /// // `Err` depending on if the remote end closed first or not. |
90 | /// let _ = tx1.send(2).await; |
91 | /// }); |
92 | /// |
93 | /// tokio::spawn(async move { |
94 | /// tx2.send(3).await.unwrap(); |
95 | /// let _ = tx2.send(4).await; |
96 | /// }); |
97 | /// |
98 | /// let mut map = StreamMap::new(); |
99 | /// |
100 | /// // Insert both streams |
101 | /// map.insert("one" , rx1); |
102 | /// map.insert("two" , rx2); |
103 | /// |
104 | /// // Read twice |
105 | /// for _ in 0..2 { |
106 | /// let (key, val) = map.next().await.unwrap(); |
107 | /// |
108 | /// if key == "one" { |
109 | /// assert_eq!(val, 1); |
110 | /// } else { |
111 | /// assert_eq!(val, 3); |
112 | /// } |
113 | /// |
114 | /// // Remove the stream to prevent reading the next value |
115 | /// map.remove(key); |
116 | /// } |
117 | /// } |
118 | /// ``` |
119 | /// |
120 | /// This example models a read-only client to a chat system with channels. The |
121 | /// client sends commands to join and leave channels. `StreamMap` is used to |
122 | /// manage active channel subscriptions. |
123 | /// |
124 | /// For simplicity, messages are displayed with `println!`, but they could be |
125 | /// sent to the client over a socket. |
126 | /// |
127 | /// ```no_run |
128 | /// use tokio_stream::{Stream, StreamExt, StreamMap}; |
129 | /// |
130 | /// enum Command { |
131 | /// Join(String), |
132 | /// Leave(String), |
133 | /// } |
134 | /// |
135 | /// fn commands() -> impl Stream<Item = Command> { |
136 | /// // Streams in user commands by parsing `stdin`. |
137 | /// # tokio_stream::pending() |
138 | /// } |
139 | /// |
140 | /// // Join a channel, returns a stream of messages received on the channel. |
141 | /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin { |
142 | /// // left as an exercise to the reader |
143 | /// # tokio_stream::pending() |
144 | /// } |
145 | /// |
146 | /// #[tokio::main] |
147 | /// async fn main() { |
148 | /// let mut channels = StreamMap::new(); |
149 | /// |
150 | /// // Input commands (join / leave channels). |
151 | /// let cmds = commands(); |
152 | /// tokio::pin!(cmds); |
153 | /// |
154 | /// loop { |
155 | /// tokio::select! { |
156 | /// Some(cmd) = cmds.next() => { |
157 | /// match cmd { |
158 | /// Command::Join(chan) => { |
159 | /// // Join the channel and add it to the `channels` |
160 | /// // stream map |
161 | /// let msgs = join(&chan); |
162 | /// channels.insert(chan, msgs); |
163 | /// } |
164 | /// Command::Leave(chan) => { |
165 | /// channels.remove(&chan); |
166 | /// } |
167 | /// } |
168 | /// } |
169 | /// Some((chan, msg)) = channels.next() => { |
170 | /// // Received a message, display it on stdout with the channel |
171 | /// // it originated from. |
172 | /// println!("{}: {}" , chan, msg); |
173 | /// } |
174 | /// // Both the `commands` stream and the `channels` stream are |
175 | /// // complete. There is no more work to do, so leave the loop. |
176 | /// else => break, |
177 | /// } |
178 | /// } |
179 | /// } |
180 | /// ``` |
181 | /// |
182 | /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`. |
183 | /// |
184 | /// ``` |
185 | /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose}; |
186 | /// |
187 | /// #[tokio::main] |
188 | /// async fn main() { |
189 | /// let mut map = StreamMap::new(); |
190 | /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); |
191 | /// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); |
192 | /// map.insert(0, stream); |
193 | /// map.insert(1, stream2); |
194 | /// while let Some((key, val)) = map.next().await { |
195 | /// match val { |
196 | /// Some(val) => println!("got {val:?} from stream {key:?}" ), |
197 | /// None => println!("stream {key:?} closed" ), |
198 | /// } |
199 | /// } |
200 | /// } |
201 | /// ``` |
202 | |
203 | #[derive(Debug)] |
204 | pub struct StreamMap<K, V> { |
205 | /// Streams stored in the map |
206 | entries: Vec<(K, V)>, |
207 | } |
208 | |
209 | impl<K, V> StreamMap<K, V> { |
210 | /// An iterator visiting all key-value pairs in arbitrary order. |
211 | /// |
212 | /// The iterator element type is &'a (K, V). |
213 | /// |
214 | /// # Examples |
215 | /// |
216 | /// ``` |
217 | /// use tokio_stream::{StreamMap, pending}; |
218 | /// |
219 | /// let mut map = StreamMap::new(); |
220 | /// |
221 | /// map.insert("a" , pending::<i32>()); |
222 | /// map.insert("b" , pending()); |
223 | /// map.insert("c" , pending()); |
224 | /// |
225 | /// for (key, stream) in map.iter() { |
226 | /// println!("({}, {:?})" , key, stream); |
227 | /// } |
228 | /// ``` |
229 | pub fn iter(&self) -> impl Iterator<Item = &(K, V)> { |
230 | self.entries.iter() |
231 | } |
232 | |
233 | /// An iterator visiting all key-value pairs mutably in arbitrary order. |
234 | /// |
235 | /// The iterator element type is &'a mut (K, V). |
236 | /// |
237 | /// # Examples |
238 | /// |
239 | /// ``` |
240 | /// use tokio_stream::{StreamMap, pending}; |
241 | /// |
242 | /// let mut map = StreamMap::new(); |
243 | /// |
244 | /// map.insert("a" , pending::<i32>()); |
245 | /// map.insert("b" , pending()); |
246 | /// map.insert("c" , pending()); |
247 | /// |
248 | /// for (key, stream) in map.iter_mut() { |
249 | /// println!("({}, {:?})" , key, stream); |
250 | /// } |
251 | /// ``` |
252 | pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> { |
253 | self.entries.iter_mut() |
254 | } |
255 | |
256 | /// Creates an empty `StreamMap`. |
257 | /// |
258 | /// The stream map is initially created with a capacity of `0`, so it will |
259 | /// not allocate until it is first inserted into. |
260 | /// |
261 | /// # Examples |
262 | /// |
263 | /// ``` |
264 | /// use tokio_stream::{StreamMap, Pending}; |
265 | /// |
266 | /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); |
267 | /// ``` |
268 | pub fn new() -> StreamMap<K, V> { |
269 | StreamMap { entries: vec![] } |
270 | } |
271 | |
272 | /// Creates an empty `StreamMap` with the specified capacity. |
273 | /// |
274 | /// The stream map will be able to hold at least `capacity` elements without |
275 | /// reallocating. If `capacity` is 0, the stream map will not allocate. |
276 | /// |
277 | /// # Examples |
278 | /// |
279 | /// ``` |
280 | /// use tokio_stream::{StreamMap, Pending}; |
281 | /// |
282 | /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); |
283 | /// ``` |
284 | pub fn with_capacity(capacity: usize) -> StreamMap<K, V> { |
285 | StreamMap { |
286 | entries: Vec::with_capacity(capacity), |
287 | } |
288 | } |
289 | |
290 | /// Returns an iterator visiting all keys in arbitrary order. |
291 | /// |
292 | /// The iterator element type is &'a K. |
293 | /// |
294 | /// # Examples |
295 | /// |
296 | /// ``` |
297 | /// use tokio_stream::{StreamMap, pending}; |
298 | /// |
299 | /// let mut map = StreamMap::new(); |
300 | /// |
301 | /// map.insert("a" , pending::<i32>()); |
302 | /// map.insert("b" , pending()); |
303 | /// map.insert("c" , pending()); |
304 | /// |
305 | /// for key in map.keys() { |
306 | /// println!("{}" , key); |
307 | /// } |
308 | /// ``` |
309 | pub fn keys(&self) -> impl Iterator<Item = &K> { |
310 | self.iter().map(|(k, _)| k) |
311 | } |
312 | |
313 | /// An iterator visiting all values in arbitrary order. |
314 | /// |
315 | /// The iterator element type is &'a V. |
316 | /// |
317 | /// # Examples |
318 | /// |
319 | /// ``` |
320 | /// use tokio_stream::{StreamMap, pending}; |
321 | /// |
322 | /// let mut map = StreamMap::new(); |
323 | /// |
324 | /// map.insert("a" , pending::<i32>()); |
325 | /// map.insert("b" , pending()); |
326 | /// map.insert("c" , pending()); |
327 | /// |
328 | /// for stream in map.values() { |
329 | /// println!("{:?}" , stream); |
330 | /// } |
331 | /// ``` |
332 | pub fn values(&self) -> impl Iterator<Item = &V> { |
333 | self.iter().map(|(_, v)| v) |
334 | } |
335 | |
336 | /// An iterator visiting all values mutably in arbitrary order. |
337 | /// |
338 | /// The iterator element type is &'a mut V. |
339 | /// |
340 | /// # Examples |
341 | /// |
342 | /// ``` |
343 | /// use tokio_stream::{StreamMap, pending}; |
344 | /// |
345 | /// let mut map = StreamMap::new(); |
346 | /// |
347 | /// map.insert("a" , pending::<i32>()); |
348 | /// map.insert("b" , pending()); |
349 | /// map.insert("c" , pending()); |
350 | /// |
351 | /// for stream in map.values_mut() { |
352 | /// println!("{:?}" , stream); |
353 | /// } |
354 | /// ``` |
355 | pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> { |
356 | self.iter_mut().map(|(_, v)| v) |
357 | } |
358 | |
359 | /// Returns the number of streams the map can hold without reallocating. |
360 | /// |
361 | /// This number is a lower bound; the `StreamMap` might be able to hold |
362 | /// more, but is guaranteed to be able to hold at least this many. |
363 | /// |
364 | /// # Examples |
365 | /// |
366 | /// ``` |
367 | /// use tokio_stream::{StreamMap, Pending}; |
368 | /// |
369 | /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100); |
370 | /// assert!(map.capacity() >= 100); |
371 | /// ``` |
372 | pub fn capacity(&self) -> usize { |
373 | self.entries.capacity() |
374 | } |
375 | |
376 | /// Returns the number of streams in the map. |
377 | /// |
378 | /// # Examples |
379 | /// |
380 | /// ``` |
381 | /// use tokio_stream::{StreamMap, pending}; |
382 | /// |
383 | /// let mut a = StreamMap::new(); |
384 | /// assert_eq!(a.len(), 0); |
385 | /// a.insert(1, pending::<i32>()); |
386 | /// assert_eq!(a.len(), 1); |
387 | /// ``` |
388 | pub fn len(&self) -> usize { |
389 | self.entries.len() |
390 | } |
391 | |
392 | /// Returns `true` if the map contains no elements. |
393 | /// |
394 | /// # Examples |
395 | /// |
396 | /// ``` |
397 | /// use tokio_stream::{StreamMap, pending}; |
398 | /// |
399 | /// let mut a = StreamMap::new(); |
400 | /// assert!(a.is_empty()); |
401 | /// a.insert(1, pending::<i32>()); |
402 | /// assert!(!a.is_empty()); |
403 | /// ``` |
404 | pub fn is_empty(&self) -> bool { |
405 | self.entries.is_empty() |
406 | } |
407 | |
408 | /// Clears the map, removing all key-stream pairs. Keeps the allocated |
409 | /// memory for reuse. |
410 | /// |
411 | /// # Examples |
412 | /// |
413 | /// ``` |
414 | /// use tokio_stream::{StreamMap, pending}; |
415 | /// |
416 | /// let mut a = StreamMap::new(); |
417 | /// a.insert(1, pending::<i32>()); |
418 | /// a.clear(); |
419 | /// assert!(a.is_empty()); |
420 | /// ``` |
421 | pub fn clear(&mut self) { |
422 | self.entries.clear(); |
423 | } |
424 | |
425 | /// Insert a key-stream pair into the map. |
426 | /// |
427 | /// If the map did not have this key present, `None` is returned. |
428 | /// |
429 | /// If the map did have this key present, the new `stream` replaces the old |
430 | /// one and the old stream is returned. |
431 | /// |
432 | /// # Examples |
433 | /// |
434 | /// ``` |
435 | /// use tokio_stream::{StreamMap, pending}; |
436 | /// |
437 | /// let mut map = StreamMap::new(); |
438 | /// |
439 | /// assert!(map.insert(37, pending::<i32>()).is_none()); |
440 | /// assert!(!map.is_empty()); |
441 | /// |
442 | /// map.insert(37, pending()); |
443 | /// assert!(map.insert(37, pending()).is_some()); |
444 | /// ``` |
445 | pub fn insert(&mut self, k: K, stream: V) -> Option<V> |
446 | where |
447 | K: Hash + Eq, |
448 | { |
449 | let ret = self.remove(&k); |
450 | self.entries.push((k, stream)); |
451 | |
452 | ret |
453 | } |
454 | |
455 | /// Removes a key from the map, returning the stream at the key if the key was previously in the map. |
456 | /// |
457 | /// The key may be any borrowed form of the map's key type, but `Hash` and |
458 | /// `Eq` on the borrowed form must match those for the key type. |
459 | /// |
460 | /// # Examples |
461 | /// |
462 | /// ``` |
463 | /// use tokio_stream::{StreamMap, pending}; |
464 | /// |
465 | /// let mut map = StreamMap::new(); |
466 | /// map.insert(1, pending::<i32>()); |
467 | /// assert!(map.remove(&1).is_some()); |
468 | /// assert!(map.remove(&1).is_none()); |
469 | /// ``` |
470 | pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> |
471 | where |
472 | K: Borrow<Q>, |
473 | Q: Hash + Eq, |
474 | { |
475 | for i in 0..self.entries.len() { |
476 | if self.entries[i].0.borrow() == k { |
477 | return Some(self.entries.swap_remove(i).1); |
478 | } |
479 | } |
480 | |
481 | None |
482 | } |
483 | |
484 | /// Returns `true` if the map contains a stream for the specified key. |
485 | /// |
486 | /// The key may be any borrowed form of the map's key type, but `Hash` and |
487 | /// `Eq` on the borrowed form must match those for the key type. |
488 | /// |
489 | /// # Examples |
490 | /// |
491 | /// ``` |
492 | /// use tokio_stream::{StreamMap, pending}; |
493 | /// |
494 | /// let mut map = StreamMap::new(); |
495 | /// map.insert(1, pending::<i32>()); |
496 | /// assert_eq!(map.contains_key(&1), true); |
497 | /// assert_eq!(map.contains_key(&2), false); |
498 | /// ``` |
499 | pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool |
500 | where |
501 | K: Borrow<Q>, |
502 | Q: Hash + Eq, |
503 | { |
504 | for i in 0..self.entries.len() { |
505 | if self.entries[i].0.borrow() == k { |
506 | return true; |
507 | } |
508 | } |
509 | |
510 | false |
511 | } |
512 | } |
513 | |
514 | impl<K, V> StreamMap<K, V> |
515 | where |
516 | K: Unpin, |
517 | V: Stream + Unpin, |
518 | { |
519 | /// Polls the next value, includes the vec entry index |
520 | fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { |
521 | let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; |
522 | let mut idx = start; |
523 | |
524 | for _ in 0..self.entries.len() { |
525 | let (_, stream) = &mut self.entries[idx]; |
526 | |
527 | match Pin::new(stream).poll_next(cx) { |
528 | Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))), |
529 | Poll::Ready(None) => { |
530 | // Remove the entry |
531 | self.entries.swap_remove(idx); |
532 | |
533 | // Check if this was the last entry, if so the cursor needs |
534 | // to wrap |
535 | if idx == self.entries.len() { |
536 | idx = 0; |
537 | } else if idx < start && start <= self.entries.len() { |
538 | // The stream being swapped into the current index has |
539 | // already been polled, so skip it. |
540 | idx = idx.wrapping_add(1) % self.entries.len(); |
541 | } |
542 | } |
543 | Poll::Pending => { |
544 | idx = idx.wrapping_add(1) % self.entries.len(); |
545 | } |
546 | } |
547 | } |
548 | |
549 | // If the map is empty, then the stream is complete. |
550 | if self.entries.is_empty() { |
551 | Poll::Ready(None) |
552 | } else { |
553 | Poll::Pending |
554 | } |
555 | } |
556 | } |
557 | |
558 | impl<K, V> Default for StreamMap<K, V> { |
559 | fn default() -> Self { |
560 | Self::new() |
561 | } |
562 | } |
563 | |
564 | impl<K, V> Stream for StreamMap<K, V> |
565 | where |
566 | K: Clone + Unpin, |
567 | V: Stream + Unpin, |
568 | { |
569 | type Item = (K, V::Item); |
570 | |
571 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
572 | if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { |
573 | let key = self.entries[idx].0.clone(); |
574 | Poll::Ready(Some((key, val))) |
575 | } else { |
576 | Poll::Ready(None) |
577 | } |
578 | } |
579 | |
580 | fn size_hint(&self) -> (usize, Option<usize>) { |
581 | let mut ret = (0, Some(0)); |
582 | |
583 | for (_, stream) in &self.entries { |
584 | let hint = stream.size_hint(); |
585 | |
586 | ret.0 += hint.0; |
587 | |
588 | match (ret.1, hint.1) { |
589 | (Some(a), Some(b)) => ret.1 = Some(a + b), |
590 | (Some(_), None) => ret.1 = None, |
591 | _ => {} |
592 | } |
593 | } |
594 | |
595 | ret |
596 | } |
597 | } |
598 | |
599 | impl<K, V> FromIterator<(K, V)> for StreamMap<K, V> |
600 | where |
601 | K: Hash + Eq, |
602 | { |
603 | fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self { |
604 | let iterator = iter.into_iter(); |
605 | let (lower_bound, _) = iterator.size_hint(); |
606 | let mut stream_map = Self::with_capacity(lower_bound); |
607 | |
608 | for (key, value) in iterator { |
609 | stream_map.insert(key, value); |
610 | } |
611 | |
612 | stream_map |
613 | } |
614 | } |
615 | |
616 | impl<K, V> Extend<(K, V)> for StreamMap<K, V> { |
617 | fn extend<T>(&mut self, iter: T) |
618 | where |
619 | T: IntoIterator<Item = (K, V)>, |
620 | { |
621 | self.entries.extend(iter); |
622 | } |
623 | } |
624 | |
625 | mod rand { |
626 | use std::cell::Cell; |
627 | |
628 | mod loom { |
629 | #[cfg (not(loom))] |
630 | pub(crate) mod rand { |
631 | use std::collections::hash_map::RandomState; |
632 | use std::hash::{BuildHasher, Hash, Hasher}; |
633 | use std::sync::atomic::AtomicU32; |
634 | use std::sync::atomic::Ordering::Relaxed; |
635 | |
636 | static COUNTER: AtomicU32 = AtomicU32::new(1); |
637 | |
638 | pub(crate) fn seed() -> u64 { |
639 | let rand_state = RandomState::new(); |
640 | |
641 | let mut hasher = rand_state.build_hasher(); |
642 | |
643 | // Hash some unique-ish data to generate some new state |
644 | COUNTER.fetch_add(1, Relaxed).hash(&mut hasher); |
645 | |
646 | // Get the seed |
647 | hasher.finish() |
648 | } |
649 | } |
650 | |
651 | #[cfg (loom)] |
652 | pub(crate) mod rand { |
653 | pub(crate) fn seed() -> u64 { |
654 | 1 |
655 | } |
656 | } |
657 | } |
658 | |
659 | /// Fast random number generate |
660 | /// |
661 | /// Implement xorshift64+: 2 32-bit xorshift sequences added together. |
662 | /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's |
663 | /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> |
664 | /// This generator passes the SmallCrush suite, part of TestU01 framework: |
665 | /// <http://simul.iro.umontreal.ca/testu01/tu01.html> |
666 | #[derive(Debug)] |
667 | pub(crate) struct FastRand { |
668 | one: Cell<u32>, |
669 | two: Cell<u32>, |
670 | } |
671 | |
672 | impl FastRand { |
673 | /// Initialize a new, thread-local, fast random number generator. |
674 | pub(crate) fn new(seed: u64) -> FastRand { |
675 | let one = (seed >> 32) as u32; |
676 | let mut two = seed as u32; |
677 | |
678 | if two == 0 { |
679 | // This value cannot be zero |
680 | two = 1; |
681 | } |
682 | |
683 | FastRand { |
684 | one: Cell::new(one), |
685 | two: Cell::new(two), |
686 | } |
687 | } |
688 | |
689 | pub(crate) fn fastrand_n(&self, n: u32) -> u32 { |
690 | // This is similar to fastrand() % n, but faster. |
691 | // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ |
692 | let mul = (self.fastrand() as u64).wrapping_mul(n as u64); |
693 | (mul >> 32) as u32 |
694 | } |
695 | |
696 | fn fastrand(&self) -> u32 { |
697 | let mut s1 = self.one.get(); |
698 | let s0 = self.two.get(); |
699 | |
700 | s1 ^= s1 << 17; |
701 | s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; |
702 | |
703 | self.one.set(s0); |
704 | self.two.set(s1); |
705 | |
706 | s0.wrapping_add(s1) |
707 | } |
708 | } |
709 | |
710 | // Used by `StreamMap` |
711 | pub(crate) fn thread_rng_n(n: u32) -> u32 { |
712 | thread_local! { |
713 | static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed()); |
714 | } |
715 | |
716 | THREAD_RNG.with(|rng| rng.fastrand_n(n)) |
717 | } |
718 | } |
719 | |