1 | use super::{current, park, Builder, JoinInner, Result, Thread}; |
2 | use crate::fmt; |
3 | use crate::io; |
4 | use crate::marker::PhantomData; |
5 | use crate::panic::{catch_unwind, resume_unwind, AssertUnwindSafe}; |
6 | use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; |
7 | use crate::sync::Arc; |
8 | |
9 | /// A scope to spawn scoped threads in. |
10 | /// |
11 | /// See [`scope`] for details. |
12 | #[stable (feature = "scoped_threads" , since = "1.63.0" )] |
13 | pub struct Scope<'scope, 'env: 'scope> { |
14 | data: Arc<ScopeData>, |
15 | /// Invariance over 'scope, to make sure 'scope cannot shrink, |
16 | /// which is necessary for soundness. |
17 | /// |
18 | /// Without invariance, this would compile fine but be unsound: |
19 | /// |
20 | /// ```compile_fail,E0373 |
21 | /// std::thread::scope(|s| { |
22 | /// s.spawn(|| { |
23 | /// let a = String::from("abcd" ); |
24 | /// s.spawn(|| println!("{a:?}" )); // might run after `a` is dropped |
25 | /// }); |
26 | /// }); |
27 | /// ``` |
28 | scope: PhantomData<&'scope mut &'scope ()>, |
29 | env: PhantomData<&'env mut &'env ()>, |
30 | } |
31 | |
32 | /// An owned permission to join on a scoped thread (block on its termination). |
33 | /// |
34 | /// See [`Scope::spawn`] for details. |
35 | #[stable (feature = "scoped_threads" , since = "1.63.0" )] |
36 | pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>); |
37 | |
38 | pub(super) struct ScopeData { |
39 | num_running_threads: AtomicUsize, |
40 | a_thread_panicked: AtomicBool, |
41 | main_thread: Thread, |
42 | } |
43 | |
44 | impl ScopeData { |
45 | pub(super) fn increment_num_running_threads(&self) { |
46 | // We check for 'overflow' with usize::MAX / 2, to make sure there's no |
47 | // chance it overflows to 0, which would result in unsoundness. |
48 | if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 { |
49 | // This can only reasonably happen by mem::forget()'ing a lot of ScopedJoinHandles. |
50 | self.overflow(); |
51 | } |
52 | } |
53 | |
54 | #[cold ] |
55 | fn overflow(&self) { |
56 | self.decrement_num_running_threads(false); |
57 | panic!("too many running threads in thread scope" ); |
58 | } |
59 | |
60 | pub(super) fn decrement_num_running_threads(&self, panic: bool) { |
61 | if panic { |
62 | self.a_thread_panicked.store(true, Ordering::Relaxed); |
63 | } |
64 | if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 { |
65 | self.main_thread.unpark(); |
66 | } |
67 | } |
68 | } |
69 | |
70 | /// Create a scope for spawning scoped threads. |
71 | /// |
72 | /// The function passed to `scope` will be provided a [`Scope`] object, |
73 | /// through which scoped threads can be [spawned][`Scope::spawn`]. |
74 | /// |
75 | /// Unlike non-scoped threads, scoped threads can borrow non-`'static` data, |
76 | /// as the scope guarantees all threads will be joined at the end of the scope. |
77 | /// |
78 | /// All threads spawned within the scope that haven't been manually joined |
79 | /// will be automatically joined before this function returns. |
80 | /// |
81 | /// # Panics |
82 | /// |
83 | /// If any of the automatically joined threads panicked, this function will panic. |
84 | /// |
85 | /// If you want to handle panics from spawned threads, |
86 | /// [`join`][ScopedJoinHandle::join] them before the end of the scope. |
87 | /// |
88 | /// # Example |
89 | /// |
90 | /// ``` |
91 | /// use std::thread; |
92 | /// |
93 | /// let mut a = vec![1, 2, 3]; |
94 | /// let mut x = 0; |
95 | /// |
96 | /// thread::scope(|s| { |
97 | /// s.spawn(|| { |
98 | /// println!("hello from the first scoped thread" ); |
99 | /// // We can borrow `a` here. |
100 | /// dbg!(&a); |
101 | /// }); |
102 | /// s.spawn(|| { |
103 | /// println!("hello from the second scoped thread" ); |
104 | /// // We can even mutably borrow `x` here, |
105 | /// // because no other threads are using it. |
106 | /// x += a[0] + a[2]; |
107 | /// }); |
108 | /// println!("hello from the main thread" ); |
109 | /// }); |
110 | /// |
111 | /// // After the scope, we can modify and access our variables again: |
112 | /// a.push(4); |
113 | /// assert_eq!(x, a.len()); |
114 | /// ``` |
115 | /// |
116 | /// # Lifetimes |
117 | /// |
118 | /// Scoped threads involve two lifetimes: `'scope` and `'env`. |
119 | /// |
120 | /// The `'scope` lifetime represents the lifetime of the scope itself. |
121 | /// That is: the time during which new scoped threads may be spawned, |
122 | /// and also the time during which they might still be running. |
123 | /// Once this lifetime ends, all scoped threads are joined. |
124 | /// This lifetime starts within the `scope` function, before `f` (the argument to `scope`) starts. |
125 | /// It ends after `f` returns and all scoped threads have been joined, but before `scope` returns. |
126 | /// |
127 | /// The `'env` lifetime represents the lifetime of whatever is borrowed by the scoped threads. |
128 | /// This lifetime must outlast the call to `scope`, and thus cannot be smaller than `'scope`. |
129 | /// It can be as small as the call to `scope`, meaning that anything that outlives this call, |
130 | /// such as local variables defined right before the scope, can be borrowed by the scoped threads. |
131 | /// |
132 | /// The `'env: 'scope` bound is part of the definition of the `Scope` type. |
133 | #[track_caller ] |
134 | #[stable (feature = "scoped_threads" , since = "1.63.0" )] |
135 | pub fn scope<'env, F, T>(f: F) -> T |
136 | where |
137 | F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T, |
138 | { |
139 | // We put the `ScopeData` into an `Arc` so that other threads can finish their |
140 | // `decrement_num_running_threads` even after this function returns. |
141 | let scope = Scope { |
142 | data: Arc::new(ScopeData { |
143 | num_running_threads: AtomicUsize::new(0), |
144 | main_thread: current(), |
145 | a_thread_panicked: AtomicBool::new(false), |
146 | }), |
147 | env: PhantomData, |
148 | scope: PhantomData, |
149 | }; |
150 | |
151 | // Run `f`, but catch panics so we can make sure to wait for all the threads to join. |
152 | let result = catch_unwind(AssertUnwindSafe(|| f(&scope))); |
153 | |
154 | // Wait until all the threads are finished. |
155 | while scope.data.num_running_threads.load(Ordering::Acquire) != 0 { |
156 | park(); |
157 | } |
158 | |
159 | // Throw any panic from `f`, or the return value of `f` if no thread panicked. |
160 | match result { |
161 | Err(e) => resume_unwind(e), |
162 | Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => { |
163 | panic!("a scoped thread panicked" ) |
164 | } |
165 | Ok(result) => result, |
166 | } |
167 | } |
168 | |
169 | impl<'scope, 'env> Scope<'scope, 'env> { |
170 | /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it. |
171 | /// |
172 | /// Unlike non-scoped threads, threads spawned with this function may |
173 | /// borrow non-`'static` data from the outside the scope. See [`scope`] for |
174 | /// details. |
175 | /// |
176 | /// The join handle provides a [`join`] method that can be used to join the spawned |
177 | /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing |
178 | /// the panic payload. |
179 | /// |
180 | /// If the join handle is dropped, the spawned thread will implicitly joined at the |
181 | /// end of the scope. In that case, if the spawned thread panics, [`scope`] will |
182 | /// panic after all threads are joined. |
183 | /// |
184 | /// This call will create a thread using default parameters of [`Builder`]. |
185 | /// If you want to specify the stack size or the name of the thread, use |
186 | /// [`Builder::spawn_scoped`] instead. |
187 | /// |
188 | /// # Panics |
189 | /// |
190 | /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`] |
191 | /// to recover from such errors. |
192 | /// |
193 | /// [`join`]: ScopedJoinHandle::join |
194 | #[stable (feature = "scoped_threads" , since = "1.63.0" )] |
195 | pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> |
196 | where |
197 | F: FnOnce() -> T + Send + 'scope, |
198 | T: Send + 'scope, |
199 | { |
200 | Builder::new().spawn_scoped(self, f).expect("failed to spawn thread" ) |
201 | } |
202 | } |
203 | |
204 | impl Builder { |
205 | /// Spawns a new scoped thread using the settings set through this `Builder`. |
206 | /// |
207 | /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to |
208 | /// capture any failure to create the thread at the OS level. |
209 | /// |
210 | /// [`io::Result`]: crate::io::Result |
211 | /// |
212 | /// # Panics |
213 | /// |
214 | /// Panics if a thread name was set and it contained null bytes. |
215 | /// |
216 | /// # Example |
217 | /// |
218 | /// ``` |
219 | /// use std::thread; |
220 | /// |
221 | /// let mut a = vec![1, 2, 3]; |
222 | /// let mut x = 0; |
223 | /// |
224 | /// thread::scope(|s| { |
225 | /// thread::Builder::new() |
226 | /// .name("first" .to_string()) |
227 | /// .spawn_scoped(s, || |
228 | /// { |
229 | /// println!("hello from the {:?} scoped thread" , thread::current().name()); |
230 | /// // We can borrow `a` here. |
231 | /// dbg!(&a); |
232 | /// }) |
233 | /// .unwrap(); |
234 | /// thread::Builder::new() |
235 | /// .name("second" .to_string()) |
236 | /// .spawn_scoped(s, || |
237 | /// { |
238 | /// println!("hello from the {:?} scoped thread" , thread::current().name()); |
239 | /// // We can even mutably borrow `x` here, |
240 | /// // because no other threads are using it. |
241 | /// x += a[0] + a[2]; |
242 | /// }) |
243 | /// .unwrap(); |
244 | /// println!("hello from the main thread" ); |
245 | /// }); |
246 | /// |
247 | /// // After the scope, we can modify and access our variables again: |
248 | /// a.push(4); |
249 | /// assert_eq!(x, a.len()); |
250 | /// ``` |
251 | #[stable (feature = "scoped_threads" , since = "1.63.0" )] |
252 | pub fn spawn_scoped<'scope, 'env, F, T>( |
253 | self, |
254 | scope: &'scope Scope<'scope, 'env>, |
255 | f: F, |
256 | ) -> io::Result<ScopedJoinHandle<'scope, T>> |
257 | where |
258 | F: FnOnce() -> T + Send + 'scope, |
259 | T: Send + 'scope, |
260 | { |
261 | Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(scope.data.clone())) }?)) |
262 | } |
263 | } |
264 | |
265 | impl<'scope, T> ScopedJoinHandle<'scope, T> { |
266 | /// Extracts a handle to the underlying thread. |
267 | /// |
268 | /// # Examples |
269 | /// |
270 | /// ``` |
271 | /// use std::thread; |
272 | /// |
273 | /// thread::scope(|s| { |
274 | /// let t = s.spawn(|| { |
275 | /// println!("hello" ); |
276 | /// }); |
277 | /// println!("thread id: {:?}" , t.thread().id()); |
278 | /// }); |
279 | /// ``` |
280 | #[must_use ] |
281 | #[stable (feature = "scoped_threads" , since = "1.63.0" )] |
282 | pub fn thread(&self) -> &Thread { |
283 | &self.0.thread |
284 | } |
285 | |
286 | /// Waits for the associated thread to finish. |
287 | /// |
288 | /// This function will return immediately if the associated thread has already finished. |
289 | /// |
290 | /// In terms of [atomic memory orderings], the completion of the associated |
291 | /// thread synchronizes with this function returning. |
292 | /// In other words, all operations performed by that thread |
293 | /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses) |
294 | /// all operations that happen after `join` returns. |
295 | /// |
296 | /// If the associated thread panics, [`Err`] is returned with the panic payload. |
297 | /// |
298 | /// [atomic memory orderings]: crate::sync::atomic |
299 | /// |
300 | /// # Examples |
301 | /// |
302 | /// ``` |
303 | /// use std::thread; |
304 | /// |
305 | /// thread::scope(|s| { |
306 | /// let t = s.spawn(|| { |
307 | /// panic!("oh no" ); |
308 | /// }); |
309 | /// assert!(t.join().is_err()); |
310 | /// }); |
311 | /// ``` |
312 | #[stable (feature = "scoped_threads" , since = "1.63.0" )] |
313 | pub fn join(self) -> Result<T> { |
314 | self.0.join() |
315 | } |
316 | |
317 | /// Checks if the associated thread has finished running its main function. |
318 | /// |
319 | /// `is_finished` supports implementing a non-blocking join operation, by checking |
320 | /// `is_finished`, and calling `join` if it returns `true`. This function does not block. To |
321 | /// block while waiting on the thread to finish, use [`join`][Self::join]. |
322 | /// |
323 | /// This might return `true` for a brief moment after the thread's main |
324 | /// function has returned, but before the thread itself has stopped running. |
325 | /// However, once this returns `true`, [`join`][Self::join] can be expected |
326 | /// to return quickly, without blocking for any significant amount of time. |
327 | #[stable (feature = "scoped_threads" , since = "1.63.0" )] |
328 | pub fn is_finished(&self) -> bool { |
329 | Arc::strong_count(&self.0.packet) == 1 |
330 | } |
331 | } |
332 | |
333 | #[stable (feature = "scoped_threads" , since = "1.63.0" )] |
334 | impl fmt::Debug for Scope<'_, '_> { |
335 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
336 | f&mut DebugStruct<'_, '_>.debug_struct("Scope" ) |
337 | .field("num_running_threads" , &self.data.num_running_threads.load(Ordering::Relaxed)) |
338 | .field("a_thread_panicked" , &self.data.a_thread_panicked.load(Ordering::Relaxed)) |
339 | .field(name:"main_thread" , &self.data.main_thread) |
340 | .finish_non_exhaustive() |
341 | } |
342 | } |
343 | |
344 | #[stable (feature = "scoped_threads" , since = "1.63.0" )] |
345 | impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> { |
346 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
347 | f.debug_struct(name:"ScopedJoinHandle" ).finish_non_exhaustive() |
348 | } |
349 | } |
350 | |