1#![cfg(not(loom))]
2
3//! A mock stream implementing [`Stream`].
4//!
5//! # Overview
6//! This crate provides a `StreamMock` that can be used to test code that interacts with streams.
7//! It allows you to mock the behavior of a stream and control the items it yields and the waiting
8//! intervals between items.
9//!
10//! # Usage
//! To use the `StreamMock`, you need to create a builder using [`StreamMockBuilder`]. The builder
//! allows you to enqueue actions such as returning items or waiting for a certain duration.
13//!
14//! # Example
//! ```rust
//!
//! use futures_util::StreamExt;
//! use std::time::Duration;
//! use tokio_test::stream_mock::StreamMockBuilder;
//!
//! async fn test_stream_mock_wait() {
//!     let mut stream_mock = StreamMockBuilder::new()
//!         .next(1)
//!         .wait(Duration::from_millis(300))
//!         .next(2)
//!         .build();
//!
//!     assert_eq!(stream_mock.next().await, Some(1));
//!     let start = std::time::Instant::now();
//!     assert_eq!(stream_mock.next().await, Some(2));
//!     let elapsed = start.elapsed();
//!     assert!(elapsed >= Duration::from_millis(300));
//!     assert_eq!(stream_mock.next().await, None);
//! }
//! ```
36
37use std::collections::VecDeque;
38use std::pin::Pin;
39use std::task::Poll;
40use std::time::Duration;
41
42use futures_core::{ready, Stream};
43use std::future::Future;
44use tokio::time::{sleep_until, Instant, Sleep};
45
/// A single scripted step that a mock stream replays when it is polled.
#[derive(Clone, Debug)]
enum Action<T: Unpin> {
    /// Yield the wrapped value as the next item of the stream.
    Next(T),
    /// Pause for the given duration before moving on to the following step.
    Wait(Duration),
}
51
52/// A builder for [`StreamMock`]
53#[derive(Debug, Clone)]
54pub struct StreamMockBuilder<T: Unpin> {
55 actions: VecDeque<Action<T>>,
56}
57
58impl<T: Unpin> StreamMockBuilder<T> {
59 /// Create a new empty [`StreamMockBuilder`]
60 pub fn new() -> Self {
61 StreamMockBuilder::default()
62 }
63
64 /// Queue an item to be returned by the stream
65 pub fn next(mut self, value: T) -> Self {
66 self.actions.push_back(Action::Next(value));
67 self
68 }
69
70 // Queue an item to be consumed by the sink,
71 // commented out until Sink is implemented.
72 //
73 // pub fn consume(mut self, value: T) -> Self {
74 // self.actions.push_back(Action::Consume(value));
75 // self
76 // }
77
78 /// Queue the stream to wait for a duration
79 pub fn wait(mut self, duration: Duration) -> Self {
80 self.actions.push_back(Action::Wait(duration));
81 self
82 }
83
84 /// Build the [`StreamMock`]
85 pub fn build(self) -> StreamMock<T> {
86 StreamMock {
87 actions: self.actions,
88 sleep: None,
89 }
90 }
91}
92
93impl<T: Unpin> Default for StreamMockBuilder<T> {
94 fn default() -> Self {
95 StreamMockBuilder {
96 actions: VecDeque::new(),
97 }
98 }
99}
100
101/// A mock stream implementing [`Stream`]
102///
103/// See [`StreamMockBuilder`] for more information.
104#[derive(Debug)]
105pub struct StreamMock<T: Unpin> {
106 actions: VecDeque<Action<T>>,
107 sleep: Option<Pin<Box<Sleep>>>,
108}
109
110impl<T: Unpin> StreamMock<T> {
111 fn next_action(&mut self) -> Option<Action<T>> {
112 self.actions.pop_front()
113 }
114}
115
116impl<T: Unpin> Stream for StreamMock<T> {
117 type Item = T;
118
119 fn poll_next(
120 mut self: std::pin::Pin<&mut Self>,
121 cx: &mut std::task::Context<'_>,
122 ) -> std::task::Poll<Option<Self::Item>> {
123 // Try polling the sleep future first
124 if let Some(ref mut sleep) = self.sleep {
125 ready!(Pin::new(sleep).poll(cx));
126 // Since we're ready, discard the sleep future
127 self.sleep.take();
128 }
129
130 match self.next_action() {
131 Some(action) => match action {
132 Action::Next(item) => Poll::Ready(Some(item)),
133 Action::Wait(duration) => {
134 // Set up a sleep future and schedule this future to be polled again for it.
135 self.sleep = Some(Box::pin(sleep_until(Instant::now() + duration)));
136 cx.waker().wake_by_ref();
137
138 Poll::Pending
139 }
140 },
141 None => Poll::Ready(None),
142 }
143 }
144}
145
146impl<T: Unpin> Drop for StreamMock<T> {
147 fn drop(&mut self) {
148 // Avoid double panicking to make debugging easier.
149 if std::thread::panicking() {
150 return;
151 }
152
153 let undropped_count = self
154 .actions
155 .iter()
156 .filter(|action| match action {
157 Action::Next(_) => true,
158 Action::Wait(_) => false,
159 })
160 .count();
161
162 assert!(
163 undropped_count == 0,
164 "StreamMock was dropped before all actions were consumed, {} actions were not consumed",
165 undropped_count
166 );
167 }
168}
169