1#![cfg(not(target_os = "wasi"))]
2
3use std::{task::Context, time::Duration};
4
5#[cfg(not(loom))]
6use futures::task::noop_waker_ref;
7
8use crate::loom::sync::atomic::{AtomicBool, Ordering};
9use crate::loom::sync::Arc;
10use crate::loom::thread;
11
12use super::TimerEntry;
13
14fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
15 #[cfg(loom)]
16 return loom::future::block_on(f);
17
18 #[cfg(not(loom))]
19 {
20 let rt = crate::runtime::Builder::new_current_thread()
21 .build()
22 .unwrap();
23 rt.block_on(f)
24 }
25}
26
27fn model(f: impl Fn() + Send + Sync + 'static) {
28 #[cfg(loom)]
29 loom::model(f);
30
31 #[cfg(not(loom))]
32 f();
33}
34
35fn rt(start_paused: bool) -> crate::runtime::Runtime {
36 crate::runtime::Builder::new_current_thread()
37 .enable_time()
38 .start_paused(start_paused)
39 .build()
40 .unwrap()
41}
42
43#[test]
44fn single_timer() {
45 model(|| {
46 let rt = rt(false);
47 let handle = rt.handle();
48
49 let handle_ = handle.clone();
50 let jh = thread::spawn(move || {
51 let entry = TimerEntry::new(
52 &handle_.inner,
53 handle_.inner.driver().clock().now() + Duration::from_secs(1),
54 );
55 pin!(entry);
56
57 block_on(futures::future::poll_fn(|cx| {
58 entry.as_mut().poll_elapsed(cx)
59 }))
60 .unwrap();
61 });
62
63 thread::yield_now();
64
65 let time = handle.inner.driver().time();
66 let clock = handle.inner.driver().clock();
67
68 // This may or may not return Some (depending on how it races with the
69 // thread). If it does return None, however, the timer should complete
70 // synchronously.
71 time.process_at_time(time.time_source().now(clock) + 2_000_000_000);
72
73 jh.join().unwrap();
74 })
75}
76
77#[test]
78fn drop_timer() {
79 model(|| {
80 let rt = rt(false);
81 let handle = rt.handle();
82
83 let handle_ = handle.clone();
84 let jh = thread::spawn(move || {
85 let entry = TimerEntry::new(
86 &handle_.inner,
87 handle_.inner.driver().clock().now() + Duration::from_secs(1),
88 );
89 pin!(entry);
90
91 let _ = entry
92 .as_mut()
93 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
94 let _ = entry
95 .as_mut()
96 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
97 });
98
99 thread::yield_now();
100
101 let time = handle.inner.driver().time();
102 let clock = handle.inner.driver().clock();
103
104 // advance 2s in the future.
105 time.process_at_time(time.time_source().now(clock) + 2_000_000_000);
106
107 jh.join().unwrap();
108 })
109}
110
111#[test]
112fn change_waker() {
113 model(|| {
114 let rt = rt(false);
115 let handle = rt.handle();
116
117 let handle_ = handle.clone();
118 let jh = thread::spawn(move || {
119 let entry = TimerEntry::new(
120 &handle_.inner,
121 handle_.inner.driver().clock().now() + Duration::from_secs(1),
122 );
123 pin!(entry);
124
125 let _ = entry
126 .as_mut()
127 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
128
129 block_on(futures::future::poll_fn(|cx| {
130 entry.as_mut().poll_elapsed(cx)
131 }))
132 .unwrap();
133 });
134
135 thread::yield_now();
136
137 let time = handle.inner.driver().time();
138 let clock = handle.inner.driver().clock();
139
140 // advance 2s
141 time.process_at_time(time.time_source().now(clock) + 2_000_000_000);
142
143 jh.join().unwrap();
144 })
145}
146
147#[test]
148fn reset_future() {
149 model(|| {
150 let finished_early = Arc::new(AtomicBool::new(false));
151
152 let rt = rt(false);
153 let handle = rt.handle();
154
155 let handle_ = handle.clone();
156 let finished_early_ = finished_early.clone();
157 let start = handle.inner.driver().clock().now();
158
159 let jh = thread::spawn(move || {
160 let entry = TimerEntry::new(&handle_.inner, start + Duration::from_secs(1));
161 pin!(entry);
162
163 let _ = entry
164 .as_mut()
165 .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref()));
166
167 entry.as_mut().reset(start + Duration::from_secs(2), true);
168
169 // shouldn't complete before 2s
170 block_on(futures::future::poll_fn(|cx| {
171 entry.as_mut().poll_elapsed(cx)
172 }))
173 .unwrap();
174
175 finished_early_.store(true, Ordering::Relaxed);
176 });
177
178 thread::yield_now();
179
180 let handle = handle.inner.driver().time();
181
182 // This may or may not return a wakeup time.
183 handle.process_at_time(
184 handle
185 .time_source()
186 .instant_to_tick(start + Duration::from_millis(1500)),
187 );
188
189 assert!(!finished_early.load(Ordering::Relaxed));
190
191 handle.process_at_time(
192 handle
193 .time_source()
194 .instant_to_tick(start + Duration::from_millis(2500)),
195 );
196
197 jh.join().unwrap();
198
199 assert!(finished_early.load(Ordering::Relaxed));
200 })
201}
202
203#[cfg(not(loom))]
204fn normal_or_miri<T>(normal: T, miri: T) -> T {
205 if cfg!(miri) {
206 miri
207 } else {
208 normal
209 }
210}
211
212#[test]
213#[cfg(not(loom))]
214fn poll_process_levels() {
215 let rt = rt(true);
216 let handle = rt.handle();
217
218 let mut entries = vec![];
219
220 for i in 0..normal_or_miri(1024, 64) {
221 let mut entry = Box::pin(TimerEntry::new(
222 &handle.inner,
223 handle.inner.driver().clock().now() + Duration::from_millis(i),
224 ));
225
226 let _ = entry
227 .as_mut()
228 .poll_elapsed(&mut Context::from_waker(noop_waker_ref()));
229
230 entries.push(entry);
231 }
232
233 for t in 1..normal_or_miri(1024, 64) {
234 handle.inner.driver().time().process_at_time(t as u64);
235
236 for (deadline, future) in entries.iter_mut().enumerate() {
237 let mut context = Context::from_waker(noop_waker_ref());
238 if deadline <= t {
239 assert!(future.as_mut().poll_elapsed(&mut context).is_ready());
240 } else {
241 assert!(future.as_mut().poll_elapsed(&mut context).is_pending());
242 }
243 }
244 }
245}
246
247#[test]
248#[cfg(not(loom))]
249fn poll_process_levels_targeted() {
250 let mut context = Context::from_waker(noop_waker_ref());
251
252 let rt = rt(true);
253 let handle = rt.handle();
254
255 let e1 = TimerEntry::new(
256 &handle.inner,
257 handle.inner.driver().clock().now() + Duration::from_millis(193),
258 );
259 pin!(e1);
260
261 let handle = handle.inner.driver().time();
262
263 handle.process_at_time(62);
264 assert!(e1.as_mut().poll_elapsed(&mut context).is_pending());
265 handle.process_at_time(192);
266 handle.process_at_time(192);
267}
268