1 | #![cfg (not(loom))] |
2 | |
3 | //! A mock stream implementing [`Stream`]. |
4 | //! |
5 | //! # Overview |
6 | //! This crate provides a `StreamMock` that can be used to test code that interacts with streams. |
7 | //! It allows you to mock the behavior of a stream and control the items it yields and the waiting |
8 | //! intervals between items. |
9 | //! |
10 | //! # Usage |
11 | //! To use the `StreamMock`, you need to create a builder using[`StreamMockBuilder`]. The builder |
12 | //! allows you to enqueue actions such as returning items or waiting for a certain duration. |
13 | //! |
14 | //! # Example |
15 | //! ```rust |
16 | //! |
17 | //! use futures_util::StreamExt; |
18 | //! use std::time::Duration; |
19 | //! use tokio_test::stream_mock::StreamMockBuilder; |
20 | //! |
21 | //! async fn test_stream_mock_wait() { |
22 | //! let mut stream_mock = StreamMockBuilder::new() |
23 | //! .next(1) |
24 | //! .wait(Duration::from_millis(300)) |
25 | //! .next(2) |
26 | //! .build(); |
27 | //! |
28 | //! assert_eq!(stream_mock.next().await, Some(1)); |
29 | //! let start = std::time::Instant::now(); |
30 | //! assert_eq!(stream_mock.next().await, Some(2)); |
31 | //! let elapsed = start.elapsed(); |
32 | //! assert!(elapsed >= Duration::from_millis(300)); |
33 | //! assert_eq!(stream_mock.next().await, None); |
34 | //! } |
35 | //! ``` |
36 | |
37 | use std::collections::VecDeque; |
38 | use std::pin::Pin; |
39 | use std::task::Poll; |
40 | use std::time::Duration; |
41 | |
42 | use futures_core::{ready, Stream}; |
43 | use std::future::Future; |
44 | use tokio::time::{sleep_until, Instant, Sleep}; |
45 | |
46 | #[derive(Debug, Clone)] |
47 | enum Action<T: Unpin> { |
48 | Next(T), |
49 | Wait(Duration), |
50 | } |
51 | |
52 | /// A builder for [`StreamMock`] |
53 | #[derive(Debug, Clone)] |
54 | pub struct StreamMockBuilder<T: Unpin> { |
55 | actions: VecDeque<Action<T>>, |
56 | } |
57 | |
58 | impl<T: Unpin> StreamMockBuilder<T> { |
59 | /// Create a new empty [`StreamMockBuilder`] |
60 | pub fn new() -> Self { |
61 | StreamMockBuilder::default() |
62 | } |
63 | |
64 | /// Queue an item to be returned by the stream |
65 | pub fn next(mut self, value: T) -> Self { |
66 | self.actions.push_back(Action::Next(value)); |
67 | self |
68 | } |
69 | |
70 | // Queue an item to be consumed by the sink, |
71 | // commented out until Sink is implemented. |
72 | // |
73 | // pub fn consume(mut self, value: T) -> Self { |
74 | // self.actions.push_back(Action::Consume(value)); |
75 | // self |
76 | // } |
77 | |
78 | /// Queue the stream to wait for a duration |
79 | pub fn wait(mut self, duration: Duration) -> Self { |
80 | self.actions.push_back(Action::Wait(duration)); |
81 | self |
82 | } |
83 | |
84 | /// Build the [`StreamMock`] |
85 | pub fn build(self) -> StreamMock<T> { |
86 | StreamMock { |
87 | actions: self.actions, |
88 | sleep: None, |
89 | } |
90 | } |
91 | } |
92 | |
93 | impl<T: Unpin> Default for StreamMockBuilder<T> { |
94 | fn default() -> Self { |
95 | StreamMockBuilder { |
96 | actions: VecDeque::new(), |
97 | } |
98 | } |
99 | } |
100 | |
101 | /// A mock stream implementing [`Stream`] |
102 | /// |
103 | /// See [`StreamMockBuilder`] for more information. |
104 | #[derive(Debug)] |
105 | pub struct StreamMock<T: Unpin> { |
106 | actions: VecDeque<Action<T>>, |
107 | sleep: Option<Pin<Box<Sleep>>>, |
108 | } |
109 | |
110 | impl<T: Unpin> StreamMock<T> { |
111 | fn next_action(&mut self) -> Option<Action<T>> { |
112 | self.actions.pop_front() |
113 | } |
114 | } |
115 | |
116 | impl<T: Unpin> Stream for StreamMock<T> { |
117 | type Item = T; |
118 | |
119 | fn poll_next( |
120 | mut self: std::pin::Pin<&mut Self>, |
121 | cx: &mut std::task::Context<'_>, |
122 | ) -> std::task::Poll<Option<Self::Item>> { |
123 | // Try polling the sleep future first |
124 | if let Some(ref mut sleep) = self.sleep { |
125 | ready!(Pin::new(sleep).poll(cx)); |
126 | // Since we're ready, discard the sleep future |
127 | self.sleep.take(); |
128 | } |
129 | |
130 | match self.next_action() { |
131 | Some(action) => match action { |
132 | Action::Next(item) => Poll::Ready(Some(item)), |
133 | Action::Wait(duration) => { |
134 | // Set up a sleep future and schedule this future to be polled again for it. |
135 | self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration))); |
136 | cx.waker().wake_by_ref(); |
137 | |
138 | Poll::Pending |
139 | } |
140 | }, |
141 | None => Poll::Ready(None), |
142 | } |
143 | } |
144 | } |
145 | |
146 | impl<T: Unpin> Drop for StreamMock<T> { |
147 | fn drop(&mut self) { |
148 | // Avoid double panicking to make debugging easier. |
149 | if std::thread::panicking() { |
150 | return; |
151 | } |
152 | |
153 | let undropped_count = self |
154 | .actions |
155 | .iter() |
156 | .filter(|action| match action { |
157 | Action::Next(_) => true, |
158 | Action::Wait(_) => false, |
159 | }) |
160 | .count(); |
161 | |
162 | assert!( |
163 | undropped_count == 0, |
164 | "StreamMock was dropped before all actions were consumed, {} actions were not consumed" , |
165 | undropped_count |
166 | ); |
167 | } |
168 | } |
169 | |