1 | //! Tokio context aware futures utilities. |
2 | //! |
3 | //! This module includes utilities around integrating tokio with other runtimes |
4 | //! by allowing the context to be attached to futures. This allows spawning |
5 | //! futures on other executors while still using tokio to drive them. This |
6 | //! can be useful if you need to use a tokio based library in an executor/runtime |
7 | //! that does not provide a tokio context. |
8 | |
9 | use pin_project_lite::pin_project; |
10 | use std::{ |
11 | future::Future, |
12 | pin::Pin, |
13 | task::{Context, Poll}, |
14 | }; |
15 | use tokio::runtime::{Handle, Runtime}; |
16 | |
17 | pin_project! { |
18 | /// `TokioContext` allows running futures that must be inside Tokio's |
19 | /// context on a non-Tokio runtime. |
20 | /// |
21 | /// It contains a [`Handle`] to the runtime. A handle to the runtime can be |
22 | /// obtain by calling the [`Runtime::handle()`] method. |
23 | /// |
24 | /// Note that the `TokioContext` wrapper only works if the `Runtime` it is |
25 | /// connected to has not yet been destroyed. You must keep the `Runtime` |
26 | /// alive until the future has finished executing. |
27 | /// |
28 | /// **Warning:** If `TokioContext` is used together with a [current thread] |
29 | /// runtime, that runtime must be inside a call to `block_on` for the |
30 | /// wrapped future to work. For this reason, it is recommended to use a |
31 | /// [multi thread] runtime, even if you configure it to only spawn one |
32 | /// worker thread. |
33 | /// |
34 | /// # Examples |
35 | /// |
36 | /// This example creates two runtimes, but only [enables time] on one of |
37 | /// them. It then uses the context of the runtime with the timer enabled to |
38 | /// execute a [`sleep`] future on the runtime with timing disabled. |
39 | /// ``` |
40 | /// use tokio::time::{sleep, Duration}; |
41 | /// use tokio_util::context::RuntimeExt; |
42 | /// |
43 | /// // This runtime has timers enabled. |
44 | /// let rt = tokio::runtime::Builder::new_multi_thread() |
45 | /// .enable_all() |
46 | /// .build() |
47 | /// .unwrap(); |
48 | /// |
49 | /// // This runtime has timers disabled. |
50 | /// let rt2 = tokio::runtime::Builder::new_multi_thread() |
51 | /// .build() |
52 | /// .unwrap(); |
53 | /// |
54 | /// // Wrap the sleep future in the context of rt. |
55 | /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await }); |
56 | /// |
57 | /// // Execute the future on rt2. |
58 | /// rt2.block_on(fut); |
59 | /// ``` |
60 | /// |
61 | /// [`Handle`]: struct@tokio::runtime::Handle |
62 | /// [`Runtime::handle()`]: fn@tokio::runtime::Runtime::handle |
63 | /// [`RuntimeExt`]: trait@crate::context::RuntimeExt |
64 | /// [`new_static`]: fn@Self::new_static |
65 | /// [`sleep`]: fn@tokio::time::sleep |
66 | /// [current thread]: fn@tokio::runtime::Builder::new_current_thread |
67 | /// [enables time]: fn@tokio::runtime::Builder::enable_time |
68 | /// [multi thread]: fn@tokio::runtime::Builder::new_multi_thread |
69 | pub struct TokioContext<F> { |
70 | #[pin] |
71 | inner: F, |
72 | handle: Handle, |
73 | } |
74 | } |
75 | |
76 | impl<F> TokioContext<F> { |
77 | /// Associate the provided future with the context of the runtime behind |
78 | /// the provided `Handle`. |
79 | /// |
80 | /// This constructor uses a `'static` lifetime to opt-out of checking that |
81 | /// the runtime still exists. |
82 | /// |
83 | /// # Examples |
84 | /// |
85 | /// This is the same as the example above, but uses the `new` constructor |
86 | /// rather than [`RuntimeExt::wrap`]. |
87 | /// |
88 | /// [`RuntimeExt::wrap`]: fn@RuntimeExt::wrap |
89 | /// |
90 | /// ``` |
91 | /// use tokio::time::{sleep, Duration}; |
92 | /// use tokio_util::context::TokioContext; |
93 | /// |
94 | /// // This runtime has timers enabled. |
95 | /// let rt = tokio::runtime::Builder::new_multi_thread() |
96 | /// .enable_all() |
97 | /// .build() |
98 | /// .unwrap(); |
99 | /// |
100 | /// // This runtime has timers disabled. |
101 | /// let rt2 = tokio::runtime::Builder::new_multi_thread() |
102 | /// .build() |
103 | /// .unwrap(); |
104 | /// |
105 | /// let fut = TokioContext::new( |
106 | /// async { sleep(Duration::from_millis(2)).await }, |
107 | /// rt.handle().clone(), |
108 | /// ); |
109 | /// |
110 | /// // Execute the future on rt2. |
111 | /// rt2.block_on(fut); |
112 | /// ``` |
113 | pub fn new(future: F, handle: Handle) -> TokioContext<F> { |
114 | TokioContext { |
115 | inner: future, |
116 | handle, |
117 | } |
118 | } |
119 | |
120 | /// Obtain a reference to the handle inside this `TokioContext`. |
121 | pub fn handle(&self) -> &Handle { |
122 | &self.handle |
123 | } |
124 | |
125 | /// Remove the association between the Tokio runtime and the wrapped future. |
126 | pub fn into_inner(self) -> F { |
127 | self.inner |
128 | } |
129 | } |
130 | |
131 | impl<F: Future> Future for TokioContext<F> { |
132 | type Output = F::Output; |
133 | |
134 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
135 | let me = self.project(); |
136 | let handle = me.handle; |
137 | let fut = me.inner; |
138 | |
139 | let _enter = handle.enter(); |
140 | fut.poll(cx) |
141 | } |
142 | } |
143 | |
144 | /// Extension trait that simplifies bundling a `Handle` with a `Future`. |
145 | pub trait RuntimeExt { |
146 | /// Create a [`TokioContext`] that wraps the provided future and runs it in |
147 | /// this runtime's context. |
148 | /// |
149 | /// # Examples |
150 | /// |
151 | /// This example creates two runtimes, but only [enables time] on one of |
152 | /// them. It then uses the context of the runtime with the timer enabled to |
153 | /// execute a [`sleep`] future on the runtime with timing disabled. |
154 | /// |
155 | /// ``` |
156 | /// use tokio::time::{sleep, Duration}; |
157 | /// use tokio_util::context::RuntimeExt; |
158 | /// |
159 | /// // This runtime has timers enabled. |
160 | /// let rt = tokio::runtime::Builder::new_multi_thread() |
161 | /// .enable_all() |
162 | /// .build() |
163 | /// .unwrap(); |
164 | /// |
165 | /// // This runtime has timers disabled. |
166 | /// let rt2 = tokio::runtime::Builder::new_multi_thread() |
167 | /// .build() |
168 | /// .unwrap(); |
169 | /// |
170 | /// // Wrap the sleep future in the context of rt. |
171 | /// let fut = rt.wrap(async { sleep(Duration::from_millis(2)).await }); |
172 | /// |
173 | /// // Execute the future on rt2. |
174 | /// rt2.block_on(fut); |
175 | /// ``` |
176 | /// |
177 | /// [`TokioContext`]: struct@crate::context::TokioContext |
178 | /// [`sleep`]: fn@tokio::time::sleep |
179 | /// [enables time]: fn@tokio::runtime::Builder::enable_time |
180 | fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>; |
181 | } |
182 | |
183 | impl RuntimeExt for Runtime { |
184 | fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> { |
185 | TokioContext { |
186 | inner: fut, |
187 | handle: self.handle().clone(), |
188 | } |
189 | } |
190 | } |
191 | |