1 | use crate::runtime::blocking::BlockingPool; |
2 | use crate::runtime::scheduler::CurrentThread; |
3 | use crate::runtime::{context, EnterGuard, Handle}; |
4 | use crate::task::JoinHandle; |
5 | |
6 | use std::future::Future; |
7 | use std::time::Duration; |
8 | |
9 | cfg_rt_multi_thread! { |
10 | use crate::runtime::Builder; |
11 | use crate::runtime::scheduler::MultiThread; |
12 | |
13 | cfg_unstable! { |
14 | use crate::runtime::scheduler::MultiThreadAlt; |
15 | } |
16 | } |
17 | |
18 | /// The Tokio runtime. |
19 | /// |
20 | /// The runtime provides an I/O driver, task scheduler, [timer], and |
21 | /// blocking pool, necessary for running asynchronous tasks. |
22 | /// |
23 | /// Instances of `Runtime` can be created using [`new`], or [`Builder`]. |
24 | /// However, most users will use the `#[tokio::main]` annotation on their |
25 | /// entry point instead. |
26 | /// |
27 | /// See [module level][mod] documentation for more details. |
28 | /// |
29 | /// # Shutdown |
30 | /// |
31 | /// Shutting down the runtime is done by dropping the value, or calling |
32 | /// [`shutdown_background`] or [`shutdown_timeout`]. |
33 | /// |
34 | /// Tasks spawned through [`Runtime::spawn`] keep running until they yield. |
35 | /// Then they are dropped. They are not *guaranteed* to run to completion, but |
36 | /// *might* do so if they do not yield until completion. |
37 | /// |
38 | /// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running |
39 | /// until they return. |
40 | /// |
41 | /// The thread initiating the shutdown blocks until all spawned work has been |
42 | /// stopped. This can take an indefinite amount of time. The `Drop` |
43 | /// implementation waits forever for this. |
44 | /// |
45 | /// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if |
46 | /// waiting forever is undesired. When the timeout is reached, spawned work that |
47 | /// did not stop in time and threads running it are leaked. The work continues |
48 | /// to run until one of the stopping conditions is fulfilled, but the thread |
49 | /// initiating the shutdown is unblocked. |
50 | /// |
51 | /// Once the runtime has been dropped, any outstanding I/O resources bound to |
52 | /// it will no longer function. Calling any method on them will result in an |
53 | /// error. |
54 | /// |
55 | /// # Sharing |
56 | /// |
57 | /// There are several ways to establish shared access to a Tokio runtime: |
58 | /// |
59 | /// * Using an <code>[Arc]\<Runtime></code>. |
60 | /// * Using a [`Handle`]. |
61 | /// * Entering the runtime context. |
62 | /// |
63 | /// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various |
64 | /// things with the runtime such as spawning new tasks or entering the runtime |
65 | /// context. Both types can be cloned to create a new handle that allows access |
66 | /// to the same runtime. By passing clones into different tasks or threads, you |
67 | /// will be able to access the runtime from those tasks or threads. |
68 | /// |
69 | /// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that |
70 | /// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down, |
71 | /// whereas a [`Handle`] does not prevent that. This is because shutdown of the |
72 | /// runtime happens when the destructor of the `Runtime` object runs. |
73 | /// |
74 | /// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive |
75 | /// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>, |
76 | /// this can be achieved via [`Arc::try_unwrap`] when only one strong count |
77 | /// reference is left over. |
78 | /// |
79 | /// The runtime context is entered using the [`Runtime::enter`] or |
80 | /// [`Handle::enter`] methods, which use a thread-local variable to store the |
81 | /// current runtime. Whenever you are inside the runtime context, methods such |
82 | /// as [`tokio::spawn`] will use the runtime whose context you are inside. |
83 | /// |
84 | /// [timer]: crate::time |
85 | /// [mod]: index.html |
86 | /// [`new`]: method@Self::new |
87 | /// [`Builder`]: struct@Builder |
88 | /// [`Handle`]: struct@Handle |
89 | /// [`tokio::spawn`]: crate::spawn |
90 | /// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap |
91 | /// [Arc]: std::sync::Arc |
92 | /// [`shutdown_background`]: method@Runtime::shutdown_background |
93 | /// [`shutdown_timeout`]: method@Runtime::shutdown_timeout |
94 | #[derive(Debug)] |
95 | pub struct Runtime { |
96 | /// Task scheduler |
97 | scheduler: Scheduler, |
98 | |
99 | /// Handle to runtime, also contains driver handles |
100 | handle: Handle, |
101 | |
102 | /// Blocking pool handle, used to signal shutdown |
103 | blocking_pool: BlockingPool, |
104 | } |
105 | |
/// The flavor of a `Runtime`.
///
/// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()).
///
/// The enum is field-less, so it is `Copy` and can be compared, hashed and
/// passed around by value. It is also `#[non_exhaustive]`: code outside this
/// crate that matches on it must include a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum RuntimeFlavor {
    /// The flavor that executes all tasks on the current thread.
    CurrentThread,
    /// The flavor that executes tasks across multiple threads.
    MultiThread,
    /// The alternative flavor that executes tasks across multiple threads.
    ///
    /// This variant only exists when the `tokio_unstable` cfg is set.
    #[cfg(tokio_unstable)]
    MultiThreadAlt,
}
120 | |
121 | /// The runtime scheduler is either a multi-thread or a current-thread executor. |
122 | #[derive(Debug)] |
123 | pub(super) enum Scheduler { |
124 | /// Execute all tasks on the current-thread. |
125 | CurrentThread(CurrentThread), |
126 | |
127 | /// Execute tasks across multiple threads. |
128 | #[cfg (all(feature = "rt-multi-thread" , not(target_os = "wasi" )))] |
129 | MultiThread(MultiThread), |
130 | |
131 | /// Execute tasks across multiple threads. |
132 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" , not(target_os = "wasi" )))] |
133 | MultiThreadAlt(MultiThreadAlt), |
134 | } |
135 | |
136 | impl Runtime { |
137 | pub(super) fn from_parts( |
138 | scheduler: Scheduler, |
139 | handle: Handle, |
140 | blocking_pool: BlockingPool, |
141 | ) -> Runtime { |
142 | Runtime { |
143 | scheduler, |
144 | handle, |
145 | blocking_pool, |
146 | } |
147 | } |
148 | |
149 | cfg_not_wasi! { |
150 | /// Creates a new runtime instance with default configuration values. |
151 | /// |
152 | /// This results in the multi threaded scheduler, I/O driver, and time driver being |
153 | /// initialized. |
154 | /// |
155 | /// Most applications will not need to call this function directly. Instead, |
156 | /// they will use the [`#[tokio::main]` attribute][main]. When a more complex |
157 | /// configuration is necessary, the [runtime builder] may be used. |
158 | /// |
159 | /// See [module level][mod] documentation for more details. |
160 | /// |
161 | /// # Examples |
162 | /// |
163 | /// Creating a new `Runtime` with default configuration values. |
164 | /// |
165 | /// ``` |
166 | /// use tokio::runtime::Runtime; |
167 | /// |
168 | /// let rt = Runtime::new() |
169 | /// .unwrap(); |
170 | /// |
171 | /// // Use the runtime... |
172 | /// ``` |
173 | /// |
174 | /// [mod]: index.html |
175 | /// [main]: ../attr.main.html |
176 | /// [threaded scheduler]: index.html#threaded-scheduler |
177 | /// [runtime builder]: crate::runtime::Builder |
178 | #[cfg (feature = "rt-multi-thread" )] |
179 | #[cfg_attr (docsrs, doc(cfg(feature = "rt-multi-thread" )))] |
180 | pub fn new() -> std::io::Result<Runtime> { |
181 | Builder::new_multi_thread().enable_all().build() |
182 | } |
183 | } |
184 | |
185 | /// Returns a handle to the runtime's spawner. |
186 | /// |
187 | /// The returned handle can be used to spawn tasks that run on this runtime, and can |
188 | /// be cloned to allow moving the `Handle` to other threads. |
189 | /// |
190 | /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone. |
191 | /// Refer to the documentation of [`Handle::block_on`] for more. |
192 | /// |
193 | /// # Examples |
194 | /// |
195 | /// ``` |
196 | /// use tokio::runtime::Runtime; |
197 | /// |
198 | /// let rt = Runtime::new() |
199 | /// .unwrap(); |
200 | /// |
201 | /// let handle = rt.handle(); |
202 | /// |
203 | /// // Use the handle... |
204 | /// ``` |
205 | pub fn handle(&self) -> &Handle { |
206 | &self.handle |
207 | } |
208 | |
209 | /// Spawns a future onto the Tokio runtime. |
210 | /// |
211 | /// This spawns the given future onto the runtime's executor, usually a |
212 | /// thread pool. The thread pool is then responsible for polling the future |
213 | /// until it completes. |
214 | /// |
215 | /// The provided future will start running in the background immediately |
216 | /// when `spawn` is called, even if you don't await the returned |
217 | /// `JoinHandle`. |
218 | /// |
219 | /// See [module level][mod] documentation for more details. |
220 | /// |
221 | /// [mod]: index.html |
222 | /// |
223 | /// # Examples |
224 | /// |
225 | /// ``` |
226 | /// use tokio::runtime::Runtime; |
227 | /// |
228 | /// # fn dox() { |
229 | /// // Create the runtime |
230 | /// let rt = Runtime::new().unwrap(); |
231 | /// |
232 | /// // Spawn a future onto the runtime |
233 | /// rt.spawn(async { |
234 | /// println!("now running on a worker thread" ); |
235 | /// }); |
236 | /// # } |
237 | /// ``` |
238 | #[track_caller ] |
239 | pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> |
240 | where |
241 | F: Future + Send + 'static, |
242 | F::Output: Send + 'static, |
243 | { |
244 | self.handle.spawn(future) |
245 | } |
246 | |
247 | /// Runs the provided function on an executor dedicated to blocking operations. |
248 | /// |
249 | /// # Examples |
250 | /// |
251 | /// ``` |
252 | /// use tokio::runtime::Runtime; |
253 | /// |
254 | /// # fn dox() { |
255 | /// // Create the runtime |
256 | /// let rt = Runtime::new().unwrap(); |
257 | /// |
258 | /// // Spawn a blocking function onto the runtime |
259 | /// rt.spawn_blocking(|| { |
260 | /// println!("now running on a worker thread" ); |
261 | /// }); |
262 | /// # } |
263 | /// ``` |
264 | #[track_caller ] |
265 | pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> |
266 | where |
267 | F: FnOnce() -> R + Send + 'static, |
268 | R: Send + 'static, |
269 | { |
270 | self.handle.spawn_blocking(func) |
271 | } |
272 | |
273 | /// Runs a future to completion on the Tokio runtime. This is the |
274 | /// runtime's entry point. |
275 | /// |
276 | /// This runs the given future on the current thread, blocking until it is |
277 | /// complete, and yielding its resolved result. Any tasks or timers |
278 | /// which the future spawns internally will be executed on the runtime. |
279 | /// |
280 | /// # Non-worker future |
281 | /// |
282 | /// Note that the future required by this function does not run as a |
283 | /// worker. The expectation is that other tasks are spawned by the future here. |
284 | /// Awaiting on other futures from the future provided here will not |
285 | /// perform as fast as those spawned as workers. |
286 | /// |
287 | /// # Multi thread scheduler |
288 | /// |
289 | /// When the multi thread scheduler is used this will allow futures |
290 | /// to run within the io driver and timer context of the overall runtime. |
291 | /// |
292 | /// Any spawned tasks will continue running after `block_on` returns. |
293 | /// |
294 | /// # Current thread scheduler |
295 | /// |
296 | /// When the current thread scheduler is enabled `block_on` |
297 | /// can be called concurrently from multiple threads. The first call |
298 | /// will take ownership of the io and timer drivers. This means |
299 | /// other threads which do not own the drivers will hook into that one. |
300 | /// When the first `block_on` completes, other threads will be able to |
301 | /// "steal" the driver to allow continued execution of their futures. |
302 | /// |
303 | /// Any spawned tasks will be suspended after `block_on` returns. Calling |
304 | /// `block_on` again will resume previously spawned tasks. |
305 | /// |
306 | /// # Panics |
307 | /// |
308 | /// This function panics if the provided future panics, or if called within an |
309 | /// asynchronous execution context. |
310 | /// |
311 | /// # Examples |
312 | /// |
313 | /// ```no_run |
314 | /// use tokio::runtime::Runtime; |
315 | /// |
316 | /// // Create the runtime |
317 | /// let rt = Runtime::new().unwrap(); |
318 | /// |
319 | /// // Execute the future, blocking the current thread until completion |
320 | /// rt.block_on(async { |
321 | /// println!("hello" ); |
322 | /// }); |
323 | /// ``` |
324 | /// |
325 | /// [handle]: fn@Handle::block_on |
326 | #[track_caller ] |
327 | pub fn block_on<F: Future>(&self, future: F) -> F::Output { |
328 | #[cfg (all( |
329 | tokio_unstable, |
330 | tokio_taskdump, |
331 | feature = "rt" , |
332 | target_os = "linux" , |
333 | any(target_arch = "aarch64" , target_arch = "x86" , target_arch = "x86_64" ) |
334 | ))] |
335 | let future = super::task::trace::Trace::root(future); |
336 | |
337 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
338 | let future = crate::util::trace::task( |
339 | future, |
340 | "block_on" , |
341 | None, |
342 | crate::runtime::task::Id::next().as_u64(), |
343 | ); |
344 | |
345 | let _enter = self.enter(); |
346 | |
347 | match &self.scheduler { |
348 | Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future), |
349 | #[cfg (all(feature = "rt-multi-thread" , not(target_os = "wasi" )))] |
350 | Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future), |
351 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" , not(target_os = "wasi" )))] |
352 | Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future), |
353 | } |
354 | } |
355 | |
356 | /// Enters the runtime context. |
357 | /// |
358 | /// This allows you to construct types that must have an executor |
359 | /// available on creation such as [`Sleep`] or [`TcpStream`]. It will |
360 | /// also allow you to call methods such as [`tokio::spawn`]. |
361 | /// |
362 | /// [`Sleep`]: struct@crate::time::Sleep |
363 | /// [`TcpStream`]: struct@crate::net::TcpStream |
364 | /// [`tokio::spawn`]: fn@crate::spawn |
365 | /// |
366 | /// # Example |
367 | /// |
368 | /// ``` |
369 | /// use tokio::runtime::Runtime; |
370 | /// |
371 | /// fn function_that_spawns(msg: String) { |
372 | /// // Had we not used `rt.enter` below, this would panic. |
373 | /// tokio::spawn(async move { |
374 | /// println!("{}" , msg); |
375 | /// }); |
376 | /// } |
377 | /// |
378 | /// fn main() { |
379 | /// let rt = Runtime::new().unwrap(); |
380 | /// |
381 | /// let s = "Hello World!" .to_string(); |
382 | /// |
383 | /// // By entering the context, we tie `tokio::spawn` to this executor. |
384 | /// let _guard = rt.enter(); |
385 | /// function_that_spawns(s); |
386 | /// } |
387 | /// ``` |
388 | pub fn enter(&self) -> EnterGuard<'_> { |
389 | self.handle.enter() |
390 | } |
391 | |
392 | /// Shuts down the runtime, waiting for at most `duration` for all spawned |
393 | /// work to stop. |
394 | /// |
395 | /// See the [struct level documentation](Runtime#shutdown) for more details. |
396 | /// |
397 | /// # Examples |
398 | /// |
399 | /// ``` |
400 | /// use tokio::runtime::Runtime; |
401 | /// use tokio::task; |
402 | /// |
403 | /// use std::thread; |
404 | /// use std::time::Duration; |
405 | /// |
406 | /// fn main() { |
407 | /// let runtime = Runtime::new().unwrap(); |
408 | /// |
409 | /// runtime.block_on(async move { |
410 | /// task::spawn_blocking(move || { |
411 | /// thread::sleep(Duration::from_secs(10_000)); |
412 | /// }); |
413 | /// }); |
414 | /// |
415 | /// runtime.shutdown_timeout(Duration::from_millis(100)); |
416 | /// } |
417 | /// ``` |
418 | pub fn shutdown_timeout(mut self, duration: Duration) { |
419 | // Wakeup and shutdown all the worker threads |
420 | self.handle.inner.shutdown(); |
421 | self.blocking_pool.shutdown(Some(duration)); |
422 | } |
423 | |
424 | /// Shuts down the runtime, without waiting for any spawned work to stop. |
425 | /// |
426 | /// This can be useful if you want to drop a runtime from within another runtime. |
427 | /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks |
428 | /// to complete, which would normally not be permitted within an asynchronous context. |
429 | /// By calling `shutdown_background()`, you can drop the runtime from such a context. |
430 | /// |
431 | /// Note however, that because we do not wait for any blocking tasks to complete, this |
432 | /// may result in a resource leak (in that any blocking tasks are still running until they |
433 | /// return. |
434 | /// |
435 | /// See the [struct level documentation](Runtime#shutdown) for more details. |
436 | /// |
437 | /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`. |
438 | /// |
439 | /// ``` |
440 | /// use tokio::runtime::Runtime; |
441 | /// |
442 | /// fn main() { |
443 | /// let runtime = Runtime::new().unwrap(); |
444 | /// |
445 | /// runtime.block_on(async move { |
446 | /// let inner_runtime = Runtime::new().unwrap(); |
447 | /// // ... |
448 | /// inner_runtime.shutdown_background(); |
449 | /// }); |
450 | /// } |
451 | /// ``` |
452 | pub fn shutdown_background(self) { |
453 | self.shutdown_timeout(Duration::from_nanos(0)); |
454 | } |
455 | } |
456 | |
457 | #[allow (clippy::single_match)] // there are comments in the error branch, so we don't want if-let |
458 | impl Drop for Runtime { |
459 | fn drop(&mut self) { |
460 | match &mut self.scheduler { |
461 | Scheduler::CurrentThread(current_thread) => { |
462 | // This ensures that tasks spawned on the current-thread |
463 | // runtime are dropped inside the runtime's context. |
464 | let _guard = context::try_set_current(&self.handle.inner); |
465 | current_thread.shutdown(&self.handle.inner); |
466 | } |
467 | #[cfg (all(feature = "rt-multi-thread" , not(target_os = "wasi" )))] |
468 | Scheduler::MultiThread(multi_thread) => { |
469 | // The threaded scheduler drops its tasks on its worker threads, which is |
470 | // already in the runtime's context. |
471 | multi_thread.shutdown(&self.handle.inner); |
472 | } |
473 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" , not(target_os = "wasi" )))] |
474 | Scheduler::MultiThreadAlt(multi_thread) => { |
475 | // The threaded scheduler drops its tasks on its worker threads, which is |
476 | // already in the runtime's context. |
477 | multi_thread.shutdown(&self.handle.inner); |
478 | } |
479 | } |
480 | } |
481 | } |
482 | |
483 | impl std::panic::UnwindSafe for Runtime {} |
484 | |
485 | impl std::panic::RefUnwindSafe for Runtime {} |
486 | |
487 | cfg_metrics! { |
488 | impl Runtime { |
489 | /// TODO |
490 | pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { |
491 | self.handle.metrics() |
492 | } |
493 | } |
494 | } |
495 | |