1use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
2use crate::runtime::tests::NoopSchedule;
3
4use std::collections::VecDeque;
5use std::future::Future;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, Mutex};
8
/// Observer half of an [`AssertDrop`] pair.
///
/// It shares one atomic flag with the paired `AssertDrop` and lets a test
/// check whether that value has been dropped yet.
struct AssertDropHandle {
    is_dropped: Arc<AtomicBool>,
}

impl AssertDropHandle {
    /// Panics unless the paired `AssertDrop` has already been dropped.
    ///
    /// `#[track_caller]` makes the panic location point at the calling test.
    #[track_caller]
    fn assert_dropped(&self) {
        assert!(
            self.is_dropped.load(Ordering::SeqCst),
            "expected the value to have been dropped, but it is still alive"
        );
    }

    /// Panics if the paired `AssertDrop` has already been dropped.
    ///
    /// `#[track_caller]` makes the panic location point at the calling test.
    #[track_caller]
    fn assert_not_dropped(&self) {
        assert!(
            !self.is_dropped.load(Ordering::SeqCst),
            "expected the value to still be alive, but it has been dropped"
        );
    }
}

/// Value that records its own destruction in a flag shared with an
/// [`AssertDropHandle`].
struct AssertDrop {
    is_dropped: Arc<AtomicBool>,
}

impl AssertDrop {
    /// Creates a tracked value together with the handle that observes it.
    ///
    /// The flag starts out `false` and is only set by `AssertDrop`'s `Drop`
    /// implementation.
    fn new() -> (Self, AssertDropHandle) {
        let is_dropped = Arc::new(AtomicBool::new(false));
        let handle = AssertDropHandle {
            is_dropped: Arc::clone(&is_dropped),
        };
        // The original `Arc` moves into the tracked value; only the handle
        // needs a clone.
        (AssertDrop { is_dropped }, handle)
    }
}

impl Drop for AssertDrop {
    /// Marks the shared flag so the paired handle can see the drop.
    fn drop(&mut self) {
        self.is_dropped.store(true, Ordering::SeqCst);
    }
}
45
46// A Notified does not shut down on drop, but it is dropped once the ref-count
47// hits zero.
#[test]
fn create_drop1() {
    let (guard, dropped) = AssertDrop::new();

    // The body is not expected to run (it ends in `unreachable!()`); it only
    // takes ownership of `guard` so the future's lifetime can be observed.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());

    // Releasing the notification first must leave the future alive.
    drop(notified);
    dropped.assert_not_dropped();

    // Once the join handle is released as well, the future must be gone.
    drop(join);
    dropped.assert_dropped();
}
64
#[test]
fn create_drop2() {
    let (guard, dropped) = AssertDrop::new();

    // The body is not expected to run (it ends in `unreachable!()`); it only
    // takes ownership of `guard` so the future's lifetime can be observed.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());

    // Same check as `create_drop1` with the release order reversed: the join
    // handle goes first and must leave the future alive.
    drop(join);
    dropped.assert_not_dropped();

    // Releasing the notification afterwards must destroy the future.
    drop(notified);
    dropped.assert_dropped();
}
81
#[test]
fn drop_abort_handle1() {
    let (guard, dropped) = AssertDrop::new();

    // The body is not expected to run (it ends in `unreachable!()`); it only
    // takes ownership of `guard` so the future's lifetime can be observed.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());
    let abort = join.abort_handle();

    // Release order under test: join handle, notification, abort handle.
    // The future must survive until the last of the three is released.
    drop(join);
    dropped.assert_not_dropped();

    drop(notified);
    dropped.assert_not_dropped();

    drop(abort);
    dropped.assert_dropped();
}
101
#[test]
fn drop_abort_handle2() {
    let (guard, dropped) = AssertDrop::new();

    // The body is not expected to run (it ends in `unreachable!()`); it only
    // takes ownership of `guard` so the future's lifetime can be observed.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());
    let abort = join.abort_handle();

    // Release order under test: notification, abort handle, join handle.
    // The future must survive until the last of the three is released.
    drop(notified);
    dropped.assert_not_dropped();

    drop(abort);
    dropped.assert_not_dropped();

    drop(join);
    dropped.assert_dropped();
}
121
122// Shutting down through Notified works
#[test]
fn create_shutdown1() {
    let (guard, dropped) = AssertDrop::new();

    // The body is not expected to run (it ends in `unreachable!()`); it only
    // takes ownership of `guard` so the future's lifetime can be observed.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());

    // With the join handle already released, the future is still alive...
    drop(join);
    dropped.assert_not_dropped();

    // ...and shutting down through the notification must destroy it.
    notified.shutdown();
    dropped.assert_dropped();
}
139
#[test]
fn create_shutdown2() {
    let (guard, dropped) = AssertDrop::new();

    // The body is not expected to run (it ends in `unreachable!()`); it only
    // takes ownership of `guard` so the future's lifetime can be observed.
    let future = async {
        drop(guard);
        unreachable!()
    };
    let (notified, join) = unowned(future, NoopSchedule, Id::next());
    dropped.assert_not_dropped();

    // Shutdown must destroy the future even though the join handle is still
    // held at this point.
    notified.shutdown();
    dropped.assert_dropped();

    // The join handle is released only after the assertion above.
    drop(join);
}
156
#[test]
fn unowned_poll() {
    // The `_` pattern is deliberate: it does not bind the join handle, so the
    // handle is dropped at the end of this statement, before the task runs.
    let (notified, _) = unowned(async {}, NoopSchedule, Id::next());

    // Running an unowned task with an empty body must not panic.
    notified.run();
}
162
#[test]
fn schedule() {
    with(|runtime| {
        runtime.spawn(async {
            crate::task::yield_now().await;
        });

        // The test expects exactly two task runs to drain the queue
        // (presumably one up to the yield and one to finish the task).
        let runs = runtime.tick();
        assert_eq!(2, runs);

        runtime.shutdown();
    })
}
174
#[test]
fn shutdown() {
    with(|runtime| {
        // A task that never completes: it yields in an endless loop.
        runtime.spawn(async {
            loop {
                crate::task::yield_now().await;
            }
        });

        // Run the task once so shutdown happens after it has been started.
        runtime.tick_max(1);

        runtime.shutdown();
    })
}
189
#[test]
fn shutdown_immediately() {
    with(|runtime| {
        // A task that never completes: it yields in an endless loop.
        runtime.spawn(async {
            loop {
                crate::task::yield_now().await;
            }
        });

        // Unlike the `shutdown` test, the task is never run before the
        // runtime is shut down.
        runtime.shutdown();
    })
}
202
#[test]
fn spawn_during_shutdown() {
    // Set by `SpawnOnDrop::drop` so the test can prove the destructor ran.
    static DID_SPAWN: AtomicBool = AtomicBool::new(false);

    /// Spawns a fresh task on the wrapped runtime when it is dropped.
    struct SpawnOnDrop(Runtime);

    impl Drop for SpawnOnDrop {
        fn drop(&mut self) {
            DID_SPAWN.store(true, Ordering::SeqCst);

            let SpawnOnDrop(runtime) = self;
            runtime.spawn(async {});
        }
    }

    with(|runtime| {
        let task_runtime = runtime.clone();

        // The task owns a `SpawnOnDrop`, so destroying the task's future
        // triggers a spawn on the same runtime.
        runtime.spawn(async move {
            let _spawn_on_drop = SpawnOnDrop(task_runtime);

            loop {
                crate::task::yield_now().await;
            }
        });

        // Run the task once so `_spawn_on_drop` exists inside the future
        // before shutdown destroys it.
        runtime.tick_max(1);
        runtime.shutdown();
    });

    assert!(DID_SPAWN.load(Ordering::SeqCst));
}
231
232fn with(f: impl FnOnce(Runtime)) {
233 struct Reset;
234
235 impl Drop for Reset {
236 fn drop(&mut self) {
237 let _rt = CURRENT.try_lock().unwrap().take();
238 }
239 }
240
241 let _reset = Reset;
242
243 let rt = Runtime(Arc::new(Inner {
244 owned: OwnedTasks::new(16),
245 core: Mutex::new(Core {
246 queue: VecDeque::new(),
247 }),
248 }));
249
250 *CURRENT.try_lock().unwrap() = Some(rt.clone());
251 f(rt)
252}
253
/// Handle to the mock runtime used by the tests in this file.
///
/// Cloning the handle only clones the `Arc`, so every clone refers to the same
/// `Inner` state.
#[derive(Clone)]
struct Runtime(Arc<Inner>);
256
/// State shared by all clones of a `Runtime`.
struct Inner {
    /// Run queue, guarded by a mutex.
    core: Mutex<Core>,
    /// Collection of tasks bound to this runtime; `spawn` binds into it and
    /// `Schedule::release` removes from it.
    owned: OwnedTasks<Runtime>,
}
261
/// Mutable part of the mock runtime that lives behind `Inner::core`.
struct Core {
    /// Notified tasks waiting to run: `schedule` pushes to the back and
    /// `next_task` pops from the front.
    queue: VecDeque<task::Notified<Runtime>>,
}
265
// Slot that `with` fills with the runtime it creates and empties again when
// it exits.
// NOTE(review): this is one process-wide static accessed via
// `try_lock().unwrap()`, so tests running on parallel threads share it;
// confirm that is intended.
static CURRENT: Mutex<Option<Runtime>> = Mutex::new(None);
267
268impl Runtime {
269 fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
270 where
271 T: 'static + Send + Future,
272 T::Output: 'static + Send,
273 {
274 let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next());
275
276 if let Some(notified) = notified {
277 self.schedule(notified);
278 }
279
280 handle
281 }
282
283 fn tick(&self) -> usize {
284 self.tick_max(usize::MAX)
285 }
286
287 fn tick_max(&self, max: usize) -> usize {
288 let mut n = 0;
289
290 while !self.is_empty() && n < max {
291 let task = self.next_task();
292 n += 1;
293 let task = self.0.owned.assert_owner(task);
294 task.run();
295 }
296
297 n
298 }
299
300 fn is_empty(&self) -> bool {
301 self.0.core.try_lock().unwrap().queue.is_empty()
302 }
303
304 fn next_task(&self) -> task::Notified<Runtime> {
305 self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
306 }
307
308 fn shutdown(&self) {
309 let mut core = self.0.core.try_lock().unwrap();
310
311 self.0.owned.close_and_shutdown_all(0);
312
313 while let Some(task) = core.queue.pop_back() {
314 drop(task);
315 }
316
317 drop(core);
318 assert!(self.0.owned.is_empty());
319 }
320}
321
322impl Schedule for Runtime {
323 fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
324 self.0.owned.remove(task)
325 }
326
327 fn schedule(&self, task: task::Notified<Self>) {
328 self.0.core.try_lock().unwrap().queue.push_back(task);
329 }
330}
331