use super::{current, park, Builder, JoinInner, Result, Thread};
use crate::fmt;
use crate::io;
use crate::marker::PhantomData;
use crate::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use crate::sync::Arc;

/// A scope to spawn scoped threads in.
///
/// See [`scope`] for details.
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub struct Scope<'scope, 'env: 'scope> {
    data: Arc<ScopeData>,
    /// Invariance over 'scope, to make sure 'scope cannot shrink,
    /// which is necessary for soundness.
    ///
    /// Without invariance, this would compile fine but be unsound:
    ///
    /// ```compile_fail,E0373
    /// std::thread::scope(|s| {
    ///     s.spawn(|| {
    ///         let a = String::from("abcd");
    ///         s.spawn(|| println!("{a:?}")); // might run after `a` is dropped
    ///     });
    /// });
    /// ```
    scope: PhantomData<&'scope mut &'scope ()>,
    env: PhantomData<&'env mut &'env ()>,
}

/// An owned permission to join on a scoped thread (block on its termination).
///
/// See [`Scope::spawn`] for details.
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>);

pub(super) struct ScopeData {
    num_running_threads: AtomicUsize,
    a_thread_panicked: AtomicBool,
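    /// The thread that created the scope. `scope()` parks it until
    /// `num_running_threads` drops to zero; `decrement_num_running_threads`
    /// unparks it.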
    main_thread: Thread,
}

impl ScopeData {
    pub(super) fn increment_num_running_threads(&self) {
        // We check for 'overflow' with usize::MAX / 2, to make sure there's no
        // chance it overflows to 0, which would result in unsoundness.
        if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
            // This can only reasonably happen by mem::forget()'ing a lot of ScopedJoinHandles.
            self.decrement_num_running_threads(false);
            panic!("too many running threads in thread scope");
        }
    }
    pub(super) fn decrement_num_running_threads(&self, panic: bool) {
        if panic {
            self.a_thread_panicked.store(true, Ordering::Relaxed);
        }
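        // Release pairs with the Acquire load of `num_running_threads` in
        // `scope()`, so everything the scoped thread did is visible to the
        // scope's owner once it observes the count reaching zero.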
        if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 {
            self.main_thread.unpark();
        }
    }
}

/// Create a scope for spawning scoped threads.
///
/// The function passed to `scope` will be provided a [`Scope`] object,
/// through which scoped threads can be [spawned][`Scope::spawn`].
///
/// Unlike non-scoped threads, scoped threads can borrow non-`'static` data,
/// as the scope guarantees all threads will be joined at the end of the scope.
///
/// All threads spawned within the scope that haven't been manually joined
/// will be automatically joined before this function returns.
///
/// # Panics
///
/// If any of the automatically joined threads panicked, this function will panic.
///
/// If you want to handle panics from spawned threads,
/// [`join`][ScopedJoinHandle::join] them before the end of the scope.
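///
/// For example, joining inside the scope lets a panic be observed as an [`Err`]
/// instead of causing `scope` itself to panic:
///
/// ```
/// use std::thread;
///
/// thread::scope(|s| {
///     let handle = s.spawn(|| {
///         panic!("oops");
///     });
///     // The panic was handled here, so `scope` will not re-panic.
///     assert!(handle.join().is_err());
/// });
/// ```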
///
/// # Example
///
/// ```
/// use std::thread;
///
/// let mut a = vec![1, 2, 3];
/// let mut x = 0;
///
/// thread::scope(|s| {
///     s.spawn(|| {
///         println!("hello from the first scoped thread");
///         // We can borrow `a` here.
///         dbg!(&a);
///     });
///     s.spawn(|| {
///         println!("hello from the second scoped thread");
///         // We can even mutably borrow `x` here,
///         // because no other threads are using it.
///         x += a[0] + a[2];
///     });
///     println!("hello from the main thread");
/// });
///
/// // After the scope, we can modify and access our variables again:
/// a.push(4);
/// assert_eq!(x, a.len());
/// ```
///
/// # Lifetimes
///
/// Scoped threads involve two lifetimes: `'scope` and `'env`.
///
/// The `'scope` lifetime represents the lifetime of the scope itself.
/// That is: the time during which new scoped threads may be spawned,
/// and also the time during which they might still be running.
/// Once this lifetime ends, all scoped threads are joined.
/// This lifetime starts within the `scope` function, before `f` (the argument to `scope`) starts.
/// It ends after `f` returns and all scoped threads have been joined, but before `scope` returns.
///
/// The `'env` lifetime represents the lifetime of whatever is borrowed by the scoped threads.
/// This lifetime must outlast the call to `scope`, and thus cannot be smaller than `'scope`.
/// It can be as small as the call to `scope`, meaning that anything that outlives this call,
/// such as local variables defined right before the scope, can be borrowed by the scoped threads.
///
/// The `'env: 'scope` bound is part of the definition of the `Scope` type.
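///
/// For example, a helper function (purely illustrative, not part of this API)
/// that spawns threads on a caller's scope names both lifetimes explicitly:
///
/// ```
/// use std::thread::{self, Scope};
///
/// // Illustrative helper: `items` is borrowed for `'env`, and the threads it
/// // spawns may not outlive `'scope`.
/// fn spawn_printers<'scope, 'env>(s: &'scope Scope<'scope, 'env>, items: &'env [i32]) {
///     for item in items {
///         s.spawn(move || println!("{item}"));
///     }
/// }
///
/// let items = [1, 2, 3];
/// thread::scope(|s| spawn_printers(s, &items));
/// ```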
#[track_caller]
#[stable(feature = "scoped_threads", since = "1.63.0")]
pub fn scope<'env, F, T>(f: F) -> T
where
    F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
{
    // We put the `ScopeData` into an `Arc` so that other threads can finish their
    // `decrement_num_running_threads` even after this function returns.
    let scope = Scope {
        data: Arc::new(ScopeData {
            num_running_threads: AtomicUsize::new(0),
            main_thread: current(),
            a_thread_panicked: AtomicBool::new(false),
        }),
        env: PhantomData,
        scope: PhantomData,
    };

    // Run `f`, but catch panics so we can make sure to wait for all the threads to join.
    let result = catch_unwind(AssertUnwindSafe(|| f(&scope)));

    // Wait until all the threads are finished.
    while scope.data.num_running_threads.load(Ordering::Acquire) != 0 {
        park();
    }

    // Throw any panic from `f`, or the return value of `f` if no thread panicked.
    match result {
        Err(e) => resume_unwind(e),
        Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => {
            panic!("a scoped thread panicked")
        }
        Ok(result) => result,
    }
}

impl<'scope, 'env> Scope<'scope, 'env> {
    /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it.
    ///
    /// Unlike non-scoped threads, threads spawned with this function may
    /// borrow non-`'static` data from outside the scope. See [`scope`] for
    /// details.
    ///
    /// The join handle provides a [`join`] method that can be used to join the spawned
    /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing
    /// the panic payload.
    ///
    /// If the join handle is dropped, the spawned thread will be implicitly joined at the
    /// end of the scope. In that case, if the spawned thread panics, [`scope`] will
    /// panic after all threads are joined.
    ///
    /// This call will create a thread using default parameters of [`Builder`].
    /// If you want to specify the stack size or the name of the thread, use
    /// [`Builder::spawn_scoped`] instead.
    ///
    /// # Panics
    ///
    /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`]
    /// to recover from such errors.
    ///
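    /// # Example
    ///
    /// Retrieving the closure's return value through the returned handle:
    ///
    /// ```
    /// use std::thread;
    ///
    /// thread::scope(|s| {
    ///     let handle = s.spawn(|| 1 + 2);
    ///     assert_eq!(handle.join().unwrap(), 3);
    /// });
    /// ```
    ///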
    /// [`join`]: ScopedJoinHandle::join
    #[stable(feature = "scoped_threads", since = "1.63.0")]
    pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
    where
        F: FnOnce() -> T + Send + 'scope,
        T: Send + 'scope,
    {
        Builder::new().spawn_scoped(self, f).expect("failed to spawn thread")
    }
}

impl Builder {
    /// Spawns a new scoped thread using the settings set through this `Builder`.
    ///
    /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to
    /// capture any failure to create the thread at the OS level.
    ///
    /// [`io::Result`]: crate::io::Result
    ///
    /// # Panics
    ///
    /// Panics if a thread name was set and it contained null bytes.
    ///
    /// # Example
    ///
    /// ```
    /// use std::thread;
    ///
    /// let mut a = vec![1, 2, 3];
    /// let mut x = 0;
    ///
    /// thread::scope(|s| {
    ///     thread::Builder::new()
    ///         .name("first".to_string())
    ///         .spawn_scoped(s, || {
    ///             println!("hello from the {:?} scoped thread", thread::current().name());
    ///             // We can borrow `a` here.
    ///             dbg!(&a);
    ///         })
    ///         .unwrap();
    ///     thread::Builder::new()
    ///         .name("second".to_string())
    ///         .spawn_scoped(s, || {
    ///             println!("hello from the {:?} scoped thread", thread::current().name());
    ///             // We can even mutably borrow `x` here,
    ///             // because no other threads are using it.
    ///             x += a[0] + a[2];
    ///         })
    ///         .unwrap();
    ///     println!("hello from the main thread");
    /// });
    ///
    /// // After the scope, we can modify and access our variables again:
    /// a.push(4);
    /// assert_eq!(x, a.len());
    /// ```
    #[stable(feature = "scoped_threads", since = "1.63.0")]
    pub fn spawn_scoped<'scope, 'env, F, T>(
        self,
        scope: &'scope Scope<'scope, 'env>,
        f: F,
    ) -> io::Result<ScopedJoinHandle<'scope, T>>
    where
        F: FnOnce() -> T + Send + 'scope,
        T: Send + 'scope,
    {
        Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(scope.data.clone())) }?))
    }
}

impl<'scope, T> ScopedJoinHandle<'scope, T> {
    /// Extracts a handle to the underlying thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    ///
    /// thread::scope(|s| {
    ///     let t = s.spawn(|| {
    ///         println!("hello");
    ///     });
    ///     println!("thread id: {:?}", t.thread().id());
    /// });
    /// ```
    #[must_use]
    #[stable(feature = "scoped_threads", since = "1.63.0")]
    pub fn thread(&self) -> &Thread {
        &self.0.thread
    }

    /// Waits for the associated thread to finish.
    ///
    /// This function will return immediately if the associated thread has already finished.
    ///
    /// In terms of [atomic memory orderings], the completion of the associated
    /// thread synchronizes with this function returning.
    /// In other words, all operations performed by that thread
    /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses)
    /// all operations that happen after `join` returns.
    ///
    /// If the associated thread panics, [`Err`] is returned with the panic payload.
    ///
    /// [atomic memory orderings]: crate::sync::atomic
    ///
    /// # Examples
    ///
    /// ```
    /// use std::thread;
    ///
    /// thread::scope(|s| {
    ///     let t = s.spawn(|| {
    ///         panic!("oh no");
    ///     });
    ///     assert!(t.join().is_err());
    /// });
    /// ```
    #[stable(feature = "scoped_threads", since = "1.63.0")]
    pub fn join(self) -> Result<T> {
        self.0.join()
    }

    /// Checks if the associated thread has finished running its main function.
    ///
    /// `is_finished` supports implementing a non-blocking join operation, by checking
    /// `is_finished`, and calling `join` only once it returns `true`. This function does not
    /// block. To block while waiting on the thread to finish, use [`join`][Self::join].
    ///
    /// This might return `true` for a brief moment after the thread's main
    /// function has returned, but before the thread itself has stopped running.
    /// However, once this returns `true`, [`join`][Self::join] can be expected
    /// to return quickly, without blocking for any significant amount of time.
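    ///
    /// # Example
    ///
    /// Polling a scoped thread without blocking:
    ///
    /// ```
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// thread::scope(|s| {
    ///     let handle = s.spawn(|| thread::sleep(Duration::from_millis(10)));
    ///     // Do other work while the thread runs, checking in occasionally.
    ///     while !handle.is_finished() {
    ///         thread::sleep(Duration::from_millis(1));
    ///     }
    ///     // Now `join` will return promptly.
    ///     handle.join().unwrap();
    /// });
    /// ```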
    #[stable(feature = "scoped_threads", since = "1.63.0")]
    pub fn is_finished(&self) -> bool {
        Arc::strong_count(&self.0.packet) == 1
    }
}

#[stable(feature = "scoped_threads", since = "1.63.0")]
impl fmt::Debug for Scope<'_, '_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Scope")
            .field("num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed))
            .field("a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed))
            .field("main_thread", &self.data.main_thread)
            .finish_non_exhaustive()
    }
}

#[stable(feature = "scoped_threads", since = "1.63.0")]
impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ScopedJoinHandle").finish_non_exhaustive()
    }
}