1 | cfg_rt! { |
2 | pub(crate) mod current_thread; |
3 | pub(crate) use current_thread::CurrentThread; |
4 | |
5 | mod defer; |
6 | use defer::Defer; |
7 | |
8 | pub(crate) mod inject; |
9 | pub(crate) use inject::Inject; |
10 | |
11 | use crate::runtime::TaskHooks; |
12 | } |
13 | |
14 | cfg_rt_multi_thread! { |
15 | mod block_in_place; |
16 | pub(crate) use block_in_place::block_in_place; |
17 | |
18 | mod lock; |
19 | use lock::Lock; |
20 | |
21 | pub(crate) mod multi_thread; |
22 | pub(crate) use multi_thread::MultiThread; |
23 | |
24 | cfg_unstable! { |
25 | pub(crate) mod multi_thread_alt; |
26 | pub(crate) use multi_thread_alt::MultiThread as MultiThreadAlt; |
27 | } |
28 | } |
29 | |
30 | use crate::runtime::driver; |
31 | |
/// Handle to a scheduler, with one variant per scheduler flavor that is
/// compiled in.
///
/// Every runtime-backed variant wraps an `Arc` of that flavor's own handle
/// type, so cloning this enum clones the `Arc`.
#[derive(Debug, Clone)]
pub(crate) enum Handle {
    /// Handle to the `current_thread` scheduler.
    #[cfg(feature = "rt")]
    CurrentThread(Arc<current_thread::Handle>),

    /// Handle to the `multi_thread` scheduler.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(Arc<multi_thread::Handle>),

    /// Handle to the `multi_thread_alt` scheduler; only exists when
    /// `tokio_unstable` is set together with `rt-multi-thread`.
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    MultiThreadAlt(Arc<multi_thread_alt::Handle>),

    // TODO: This variant only exists to avoid triggering "dead code" warnings
    // in many other places in the codebase when `rt` is disabled. Remove it
    // during a later cleanup.
    #[cfg(not(feature = "rt"))]
    #[allow(dead_code)]
    Disabled,
}
49 | |
/// Scheduler context, with one variant per scheduler flavor that is compiled
/// in. Only available with the `rt` feature.
#[cfg(feature = "rt")]
pub(super) enum Context {
    /// Context of the `current_thread` scheduler.
    CurrentThread(current_thread::Context),

    /// Context of the `multi_thread` scheduler.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(multi_thread::Context),

    /// Context of the `multi_thread_alt` scheduler; only exists when
    /// `tokio_unstable` is set together with `rt-multi-thread`.
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    MultiThreadAlt(multi_thread_alt::Context),
}
60 | |
61 | impl Handle { |
62 | #[cfg_attr (not(feature = "full" ), allow(dead_code))] |
63 | pub(crate) fn driver(&self) -> &driver::Handle { |
64 | match *self { |
65 | #[cfg (feature = "rt" )] |
66 | Handle::CurrentThread(ref h: &Arc) => &h.driver, |
67 | |
68 | #[cfg (feature = "rt-multi-thread" )] |
69 | Handle::MultiThread(ref h: &Arc) => &h.driver, |
70 | |
71 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
72 | Handle::MultiThreadAlt(ref h) => &h.driver, |
73 | |
74 | #[cfg (not(feature = "rt" ))] |
75 | Handle::Disabled => unreachable!(), |
76 | } |
77 | } |
78 | } |
79 | |
80 | cfg_rt! { |
81 | use crate::future::Future; |
82 | use crate::loom::sync::Arc; |
83 | use crate::runtime::{blocking, task::Id}; |
84 | use crate::runtime::context; |
85 | use crate::task::JoinHandle; |
86 | use crate::util::RngSeedGenerator; |
87 | use std::task::Waker; |
88 | |
// Expands to a `match` on `$target` with one arm per compiled-in scheduler
// flavor of the enum `$kind`. Every arm binds the variant's payload to
// `$bound` and evaluates the same expression `$body`.
//
// Usage: `match_flavor!(self, Handle(h) => &h.some_field)`
macro_rules! match_flavor {
    ($target:expr, $kind:ident($bound:ident) => $body:expr) => {
        match $target {
            $kind::CurrentThread($bound) => $body,

            #[cfg(feature = "rt-multi-thread")]
            $kind::MultiThread($bound) => $body,

            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
            $kind::MultiThreadAlt($bound) => $body,
        }
    }
}
102 | |
103 | impl Handle { |
104 | #[track_caller ] |
105 | pub(crate) fn current() -> Handle { |
106 | match context::with_current(Clone::clone) { |
107 | Ok(handle) => handle, |
108 | Err(e) => panic!("{}" , e), |
109 | } |
110 | } |
111 | |
112 | pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner { |
113 | match_flavor!(self, Handle(h) => &h.blocking_spawner) |
114 | } |
115 | |
116 | pub(crate) fn is_local(&self) -> bool { |
117 | match self { |
118 | Handle::CurrentThread(h) => h.local_tid.is_some(), |
119 | |
120 | #[cfg (feature = "rt-multi-thread" )] |
121 | Handle::MultiThread(_) => false, |
122 | |
123 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
124 | Handle::MultiThreadAlt(_) => false, |
125 | } |
126 | } |
127 | |
128 | /// Returns true if this is a local runtime and the runtime is owned by the current thread. |
129 | pub(crate) fn can_spawn_local_on_local_runtime(&self) -> bool { |
130 | match self { |
131 | Handle::CurrentThread(h) => h.local_tid.map(|x| std::thread::current().id() == x).unwrap_or(false), |
132 | |
133 | #[cfg (feature = "rt-multi-thread" )] |
134 | Handle::MultiThread(_) => false, |
135 | |
136 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
137 | Handle::MultiThreadAlt(_) => false, |
138 | } |
139 | } |
140 | |
141 | pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output> |
142 | where |
143 | F: Future + Send + 'static, |
144 | F::Output: Send + 'static, |
145 | { |
146 | match self { |
147 | Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id), |
148 | |
149 | #[cfg (feature = "rt-multi-thread" )] |
150 | Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id), |
151 | |
152 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
153 | Handle::MultiThreadAlt(h) => multi_thread_alt::Handle::spawn(h, future, id), |
154 | } |
155 | } |
156 | |
157 | /// Spawn a local task |
158 | /// |
159 | /// # Safety |
160 | /// This should only be called in `LocalRuntime` if the runtime has been verified to be owned |
161 | /// by the current thread. |
162 | #[allow (irrefutable_let_patterns)] |
163 | pub(crate) unsafe fn spawn_local<F>(&self, future: F, id: Id) -> JoinHandle<F::Output> |
164 | where |
165 | F: Future + 'static, |
166 | F::Output: 'static, |
167 | { |
168 | if let Handle::CurrentThread(h) = self { |
169 | current_thread::Handle::spawn_local(h, future, id) |
170 | } else { |
171 | panic!("Only current_thread and LocalSet have spawn_local internals implemented" ) |
172 | } |
173 | } |
174 | |
175 | pub(crate) fn shutdown(&self) { |
176 | match *self { |
177 | Handle::CurrentThread(_) => {}, |
178 | |
179 | #[cfg (feature = "rt-multi-thread" )] |
180 | Handle::MultiThread(ref h) => h.shutdown(), |
181 | |
182 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
183 | Handle::MultiThreadAlt(ref h) => h.shutdown(), |
184 | } |
185 | } |
186 | |
187 | pub(crate) fn seed_generator(&self) -> &RngSeedGenerator { |
188 | match_flavor!(self, Handle(h) => &h.seed_generator) |
189 | } |
190 | |
191 | pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> { |
192 | match self { |
193 | Handle::CurrentThread(handle) => handle, |
194 | #[cfg (feature = "rt-multi-thread" )] |
195 | _ => panic!("not a CurrentThread handle" ), |
196 | } |
197 | } |
198 | |
199 | pub(crate) fn hooks(&self) -> &TaskHooks { |
200 | match self { |
201 | Handle::CurrentThread(h) => &h.task_hooks, |
202 | #[cfg (feature = "rt-multi-thread" )] |
203 | Handle::MultiThread(h) => &h.task_hooks, |
204 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
205 | Handle::MultiThreadAlt(h) => &h.task_hooks, |
206 | } |
207 | } |
208 | |
209 | cfg_rt_multi_thread! { |
210 | cfg_unstable! { |
211 | pub(crate) fn expect_multi_thread_alt(&self) -> &Arc<multi_thread_alt::Handle> { |
212 | match self { |
213 | Handle::MultiThreadAlt(handle) => handle, |
214 | _ => panic!("not a `MultiThreadAlt` handle" ), |
215 | } |
216 | } |
217 | } |
218 | } |
219 | } |
220 | |
221 | impl Handle { |
222 | pub(crate) fn num_workers(&self) -> usize { |
223 | match self { |
224 | Handle::CurrentThread(_) => 1, |
225 | #[cfg (feature = "rt-multi-thread" )] |
226 | Handle::MultiThread(handle) => handle.num_workers(), |
227 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
228 | Handle::MultiThreadAlt(handle) => handle.num_workers(), |
229 | } |
230 | } |
231 | |
232 | pub(crate) fn num_alive_tasks(&self) -> usize { |
233 | match_flavor!(self, Handle(handle) => handle.num_alive_tasks()) |
234 | } |
235 | |
236 | pub(crate) fn injection_queue_depth(&self) -> usize { |
237 | match_flavor!(self, Handle(handle) => handle.injection_queue_depth()) |
238 | } |
239 | } |
240 | |
241 | cfg_unstable_metrics! { |
242 | use crate::runtime::{SchedulerMetrics, WorkerMetrics}; |
243 | |
244 | impl Handle { |
245 | cfg_64bit_metrics! { |
246 | pub(crate) fn spawned_tasks_count(&self) -> u64 { |
247 | match_flavor!(self, Handle(handle) => handle.spawned_tasks_count()) |
248 | } |
249 | } |
250 | |
251 | pub(crate) fn num_blocking_threads(&self) -> usize { |
252 | match_flavor!(self, Handle(handle) => handle.num_blocking_threads()) |
253 | } |
254 | |
255 | pub(crate) fn num_idle_blocking_threads(&self) -> usize { |
256 | match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads()) |
257 | } |
258 | |
259 | pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { |
260 | match_flavor!(self, Handle(handle) => handle.scheduler_metrics()) |
261 | } |
262 | |
263 | pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { |
264 | match_flavor!(self, Handle(handle) => handle.worker_metrics(worker)) |
265 | } |
266 | |
267 | pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { |
268 | match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker)) |
269 | } |
270 | |
271 | pub(crate) fn blocking_queue_depth(&self) -> usize { |
272 | match_flavor!(self, Handle(handle) => handle.blocking_queue_depth()) |
273 | } |
274 | } |
275 | } |
276 | |
277 | impl Context { |
278 | #[track_caller ] |
279 | pub(crate) fn expect_current_thread(&self) -> ¤t_thread::Context { |
280 | match self { |
281 | Context::CurrentThread(context) => context, |
282 | #[cfg (feature = "rt-multi-thread" )] |
283 | _ => panic!("expected `CurrentThread::Context`" ) |
284 | } |
285 | } |
286 | |
287 | pub(crate) fn defer(&self, waker: &Waker) { |
288 | match_flavor!(self, Context(context) => context.defer(waker)); |
289 | } |
290 | |
291 | cfg_rt_multi_thread! { |
292 | #[track_caller ] |
293 | pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context { |
294 | match self { |
295 | Context::MultiThread(context) => context, |
296 | _ => panic!("expected `MultiThread::Context`" ) |
297 | } |
298 | } |
299 | |
300 | cfg_unstable! { |
301 | #[track_caller ] |
302 | pub(crate) fn expect_multi_thread_alt(&self) -> &multi_thread_alt::Context { |
303 | match self { |
304 | Context::MultiThreadAlt(context) => context, |
305 | _ => panic!("expected `MultiThreadAlt::Context`" ) |
306 | } |
307 | } |
308 | } |
309 | } |
310 | } |
311 | } |
312 | |
313 | cfg_not_rt! { |
// Without the `rt` feature there is no runtime to return a handle to, so
// `current()` always panics. The impl is only compiled when at least one of
// the listed features is enabled.
#[cfg(any(
    feature = "net",
    all(unix, feature = "process"),
    all(unix, feature = "signal"),
    feature = "time",
))]
impl Handle {
    /// Always panics with `CONTEXT_MISSING_ERROR`.
    #[track_caller]
    pub(crate) fn current() -> Handle {
        use crate::util::error::CONTEXT_MISSING_ERROR;

        panic!("{}", CONTEXT_MISSING_ERROR)
    }
}
326 | } |
327 | |