| 1 | #![cfg (not(loom))] |
| 2 | |
| 3 | //! A mock stream implementing [`Stream`]. |
| 4 | //! |
| 5 | //! # Overview |
//! This module provides a `StreamMock` that can be used to test code that interacts with streams.
//! It allows you to mock the behavior of a stream and control the items it yields and the waiting
//! intervals between items.
| 9 | //! |
| 10 | //! # Usage |
//! To use the `StreamMock`, you need to create a builder using [`StreamMockBuilder`]. The builder
//! allows you to enqueue actions such as returning items or waiting for a certain duration.
| 13 | //! |
| 14 | //! # Example |
| 15 | //! ```rust |
| 16 | //! |
| 17 | //! use futures_util::StreamExt; |
| 18 | //! use std::time::Duration; |
| 19 | //! use tokio_test::stream_mock::StreamMockBuilder; |
| 20 | //! |
| 21 | //! async fn test_stream_mock_wait() { |
| 22 | //! let mut stream_mock = StreamMockBuilder::new() |
| 23 | //! .next(1) |
| 24 | //! .wait(Duration::from_millis(300)) |
| 25 | //! .next(2) |
| 26 | //! .build(); |
| 27 | //! |
| 28 | //! assert_eq!(stream_mock.next().await, Some(1)); |
| 29 | //! let start = std::time::Instant::now(); |
| 30 | //! assert_eq!(stream_mock.next().await, Some(2)); |
| 31 | //! let elapsed = start.elapsed(); |
| 32 | //! assert!(elapsed >= Duration::from_millis(300)); |
| 33 | //! assert_eq!(stream_mock.next().await, None); |
| 34 | //! } |
| 35 | //! ``` |
| 36 | |
| 37 | use std::collections::VecDeque; |
| 38 | use std::pin::Pin; |
| 39 | use std::task::Poll; |
| 40 | use std::time::Duration; |
| 41 | |
| 42 | use futures_core::{ready, Stream}; |
| 43 | use std::future::Future; |
| 44 | use tokio::time::{sleep_until, Instant, Sleep}; |
| 45 | |
/// A single scripted step of a mock stream.
#[derive(Debug, Clone)]
enum Action<T>
where
    T: Unpin,
{
    /// Yield the contained item from the stream.
    Next(T),
    /// Pause for the given duration before moving on to the following step.
    Wait(Duration),
}
| 51 | |
| 52 | /// A builder for [`StreamMock`] |
| 53 | #[derive(Debug, Clone)] |
| 54 | pub struct StreamMockBuilder<T: Unpin> { |
| 55 | actions: VecDeque<Action<T>>, |
| 56 | } |
| 57 | |
| 58 | impl<T: Unpin> StreamMockBuilder<T> { |
| 59 | /// Create a new empty [`StreamMockBuilder`] |
| 60 | pub fn new() -> Self { |
| 61 | StreamMockBuilder::default() |
| 62 | } |
| 63 | |
| 64 | /// Queue an item to be returned by the stream |
| 65 | pub fn next(mut self, value: T) -> Self { |
| 66 | self.actions.push_back(Action::Next(value)); |
| 67 | self |
| 68 | } |
| 69 | |
| 70 | // Queue an item to be consumed by the sink, |
| 71 | // commented out until Sink is implemented. |
| 72 | // |
| 73 | // pub fn consume(mut self, value: T) -> Self { |
| 74 | // self.actions.push_back(Action::Consume(value)); |
| 75 | // self |
| 76 | // } |
| 77 | |
| 78 | /// Queue the stream to wait for a duration |
| 79 | pub fn wait(mut self, duration: Duration) -> Self { |
| 80 | self.actions.push_back(Action::Wait(duration)); |
| 81 | self |
| 82 | } |
| 83 | |
| 84 | /// Build the [`StreamMock`] |
| 85 | pub fn build(self) -> StreamMock<T> { |
| 86 | StreamMock { |
| 87 | actions: self.actions, |
| 88 | sleep: None, |
| 89 | } |
| 90 | } |
| 91 | } |
| 92 | |
| 93 | impl<T: Unpin> Default for StreamMockBuilder<T> { |
| 94 | fn default() -> Self { |
| 95 | StreamMockBuilder { |
| 96 | actions: VecDeque::new(), |
| 97 | } |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | /// A mock stream implementing [`Stream`] |
| 102 | /// |
| 103 | /// See [`StreamMockBuilder`] for more information. |
| 104 | #[derive(Debug)] |
| 105 | pub struct StreamMock<T: Unpin> { |
| 106 | actions: VecDeque<Action<T>>, |
| 107 | sleep: Option<Pin<Box<Sleep>>>, |
| 108 | } |
| 109 | |
| 110 | impl<T: Unpin> StreamMock<T> { |
| 111 | fn next_action(&mut self) -> Option<Action<T>> { |
| 112 | self.actions.pop_front() |
| 113 | } |
| 114 | } |
| 115 | |
| 116 | impl<T: Unpin> Stream for StreamMock<T> { |
| 117 | type Item = T; |
| 118 | |
| 119 | fn poll_next( |
| 120 | mut self: std::pin::Pin<&mut Self>, |
| 121 | cx: &mut std::task::Context<'_>, |
| 122 | ) -> std::task::Poll<Option<Self::Item>> { |
| 123 | // Try polling the sleep future first |
| 124 | if let Some(ref mut sleep) = self.sleep { |
| 125 | ready!(Pin::new(sleep).poll(cx)); |
| 126 | // Since we're ready, discard the sleep future |
| 127 | self.sleep.take(); |
| 128 | } |
| 129 | |
| 130 | match self.next_action() { |
| 131 | Some(action) => match action { |
| 132 | Action::Next(item) => Poll::Ready(Some(item)), |
| 133 | Action::Wait(duration) => { |
| 134 | // Set up a sleep future and schedule this future to be polled again for it. |
| 135 | self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration))); |
| 136 | cx.waker().wake_by_ref(); |
| 137 | |
| 138 | Poll::Pending |
| 139 | } |
| 140 | }, |
| 141 | None => Poll::Ready(None), |
| 142 | } |
| 143 | } |
| 144 | } |
| 145 | |
| 146 | impl<T: Unpin> Drop for StreamMock<T> { |
| 147 | fn drop(&mut self) { |
| 148 | // Avoid double panicking to make debugging easier. |
| 149 | if std::thread::panicking() { |
| 150 | return; |
| 151 | } |
| 152 | |
| 153 | let undropped_count = self |
| 154 | .actions |
| 155 | .iter() |
| 156 | .filter(|action| match action { |
| 157 | Action::Next(_) => true, |
| 158 | Action::Wait(_) => false, |
| 159 | }) |
| 160 | .count(); |
| 161 | |
| 162 | assert!( |
| 163 | undropped_count == 0, |
| 164 | "StreamMock was dropped before all actions were consumed, {} actions were not consumed" , |
| 165 | undropped_count |
| 166 | ); |
| 167 | } |
| 168 | } |
| 169 | |