1 | #![cfg (not(target_os = "wasi" ))] |
2 | |
3 | use std::{task::Context, time::Duration}; |
4 | |
5 | #[cfg (not(loom))] |
6 | use futures::task::noop_waker_ref; |
7 | |
8 | use crate::loom::sync::atomic::{AtomicBool, Ordering}; |
9 | use crate::loom::sync::Arc; |
10 | use crate::loom::thread; |
11 | |
12 | use super::TimerEntry; |
13 | |
14 | fn block_on<T>(f: impl std::future::Future<Output = T>) -> T { |
15 | #[cfg (loom)] |
16 | return loom::future::block_on(f); |
17 | |
18 | #[cfg (not(loom))] |
19 | { |
20 | let rt = crate::runtime::Builder::new_current_thread() |
21 | .build() |
22 | .unwrap(); |
23 | rt.block_on(f) |
24 | } |
25 | } |
26 | |
27 | fn model(f: impl Fn() + Send + Sync + 'static) { |
28 | #[cfg (loom)] |
29 | loom::model(f); |
30 | |
31 | #[cfg (not(loom))] |
32 | f(); |
33 | } |
34 | |
35 | fn rt(start_paused: bool) -> crate::runtime::Runtime { |
36 | crate::runtime::Builder::new_current_thread() |
37 | .enable_time() |
38 | .start_paused(start_paused) |
39 | .build() |
40 | .unwrap() |
41 | } |
42 | |
43 | #[test] |
44 | fn single_timer() { |
45 | model(|| { |
46 | let rt = rt(false); |
47 | let handle = rt.handle(); |
48 | |
49 | let handle_ = handle.clone(); |
50 | let jh = thread::spawn(move || { |
51 | let entry = TimerEntry::new( |
52 | &handle_.inner, |
53 | handle_.inner.driver().clock().now() + Duration::from_secs(1), |
54 | ); |
55 | pin!(entry); |
56 | |
57 | block_on(futures::future::poll_fn(|cx| { |
58 | entry.as_mut().poll_elapsed(cx) |
59 | })) |
60 | .unwrap(); |
61 | }); |
62 | |
63 | thread::yield_now(); |
64 | |
65 | let time = handle.inner.driver().time(); |
66 | let clock = handle.inner.driver().clock(); |
67 | |
68 | // This may or may not return Some (depending on how it races with the |
69 | // thread). If it does return None, however, the timer should complete |
70 | // synchronously. |
71 | time.process_at_time(time.time_source().now(clock) + 2_000_000_000); |
72 | |
73 | jh.join().unwrap(); |
74 | }) |
75 | } |
76 | |
77 | #[test] |
78 | fn drop_timer() { |
79 | model(|| { |
80 | let rt = rt(false); |
81 | let handle = rt.handle(); |
82 | |
83 | let handle_ = handle.clone(); |
84 | let jh = thread::spawn(move || { |
85 | let entry = TimerEntry::new( |
86 | &handle_.inner, |
87 | handle_.inner.driver().clock().now() + Duration::from_secs(1), |
88 | ); |
89 | pin!(entry); |
90 | |
91 | let _ = entry |
92 | .as_mut() |
93 | .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); |
94 | let _ = entry |
95 | .as_mut() |
96 | .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); |
97 | }); |
98 | |
99 | thread::yield_now(); |
100 | |
101 | let time = handle.inner.driver().time(); |
102 | let clock = handle.inner.driver().clock(); |
103 | |
104 | // advance 2s in the future. |
105 | time.process_at_time(time.time_source().now(clock) + 2_000_000_000); |
106 | |
107 | jh.join().unwrap(); |
108 | }) |
109 | } |
110 | |
111 | #[test] |
112 | fn change_waker() { |
113 | model(|| { |
114 | let rt = rt(false); |
115 | let handle = rt.handle(); |
116 | |
117 | let handle_ = handle.clone(); |
118 | let jh = thread::spawn(move || { |
119 | let entry = TimerEntry::new( |
120 | &handle_.inner, |
121 | handle_.inner.driver().clock().now() + Duration::from_secs(1), |
122 | ); |
123 | pin!(entry); |
124 | |
125 | let _ = entry |
126 | .as_mut() |
127 | .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); |
128 | |
129 | block_on(futures::future::poll_fn(|cx| { |
130 | entry.as_mut().poll_elapsed(cx) |
131 | })) |
132 | .unwrap(); |
133 | }); |
134 | |
135 | thread::yield_now(); |
136 | |
137 | let time = handle.inner.driver().time(); |
138 | let clock = handle.inner.driver().clock(); |
139 | |
140 | // advance 2s |
141 | time.process_at_time(time.time_source().now(clock) + 2_000_000_000); |
142 | |
143 | jh.join().unwrap(); |
144 | }) |
145 | } |
146 | |
147 | #[test] |
148 | fn reset_future() { |
149 | model(|| { |
150 | let finished_early = Arc::new(AtomicBool::new(false)); |
151 | |
152 | let rt = rt(false); |
153 | let handle = rt.handle(); |
154 | |
155 | let handle_ = handle.clone(); |
156 | let finished_early_ = finished_early.clone(); |
157 | let start = handle.inner.driver().clock().now(); |
158 | |
159 | let jh = thread::spawn(move || { |
160 | let entry = TimerEntry::new(&handle_.inner, start + Duration::from_secs(1)); |
161 | pin!(entry); |
162 | |
163 | let _ = entry |
164 | .as_mut() |
165 | .poll_elapsed(&mut Context::from_waker(futures::task::noop_waker_ref())); |
166 | |
167 | entry.as_mut().reset(start + Duration::from_secs(2), true); |
168 | |
169 | // shouldn't complete before 2s |
170 | block_on(futures::future::poll_fn(|cx| { |
171 | entry.as_mut().poll_elapsed(cx) |
172 | })) |
173 | .unwrap(); |
174 | |
175 | finished_early_.store(true, Ordering::Relaxed); |
176 | }); |
177 | |
178 | thread::yield_now(); |
179 | |
180 | let handle = handle.inner.driver().time(); |
181 | |
182 | // This may or may not return a wakeup time. |
183 | handle.process_at_time( |
184 | handle |
185 | .time_source() |
186 | .instant_to_tick(start + Duration::from_millis(1500)), |
187 | ); |
188 | |
189 | assert!(!finished_early.load(Ordering::Relaxed)); |
190 | |
191 | handle.process_at_time( |
192 | handle |
193 | .time_source() |
194 | .instant_to_tick(start + Duration::from_millis(2500)), |
195 | ); |
196 | |
197 | jh.join().unwrap(); |
198 | |
199 | assert!(finished_early.load(Ordering::Relaxed)); |
200 | }) |
201 | } |
202 | |
203 | #[cfg (not(loom))] |
204 | fn normal_or_miri<T>(normal: T, miri: T) -> T { |
205 | if cfg!(miri) { |
206 | miri |
207 | } else { |
208 | normal |
209 | } |
210 | } |
211 | |
212 | #[test] |
213 | #[cfg (not(loom))] |
214 | fn poll_process_levels() { |
215 | let rt = rt(true); |
216 | let handle = rt.handle(); |
217 | |
218 | let mut entries = vec![]; |
219 | |
220 | for i in 0..normal_or_miri(1024, 64) { |
221 | let mut entry = Box::pin(TimerEntry::new( |
222 | &handle.inner, |
223 | handle.inner.driver().clock().now() + Duration::from_millis(i), |
224 | )); |
225 | |
226 | let _ = entry |
227 | .as_mut() |
228 | .poll_elapsed(&mut Context::from_waker(noop_waker_ref())); |
229 | |
230 | entries.push(entry); |
231 | } |
232 | |
233 | for t in 1..normal_or_miri(1024, 64) { |
234 | handle.inner.driver().time().process_at_time(t as u64); |
235 | |
236 | for (deadline, future) in entries.iter_mut().enumerate() { |
237 | let mut context = Context::from_waker(noop_waker_ref()); |
238 | if deadline <= t { |
239 | assert!(future.as_mut().poll_elapsed(&mut context).is_ready()); |
240 | } else { |
241 | assert!(future.as_mut().poll_elapsed(&mut context).is_pending()); |
242 | } |
243 | } |
244 | } |
245 | } |
246 | |
247 | #[test] |
248 | #[cfg (not(loom))] |
249 | fn poll_process_levels_targeted() { |
250 | let mut context = Context::from_waker(noop_waker_ref()); |
251 | |
252 | let rt = rt(true); |
253 | let handle = rt.handle(); |
254 | |
255 | let e1 = TimerEntry::new( |
256 | &handle.inner, |
257 | handle.inner.driver().clock().now() + Duration::from_millis(193), |
258 | ); |
259 | pin!(e1); |
260 | |
261 | let handle = handle.inner.driver().time(); |
262 | |
263 | handle.process_at_time(62); |
264 | assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); |
265 | handle.process_at_time(192); |
266 | handle.process_at_time(192); |
267 | } |
268 | |