| 1 | use super::{Builder, JoinInner, Result, Thread, current_or_unnamed}; | 
| 2 | use crate::marker::PhantomData; | 
|---|
| 3 | use crate::panic::{AssertUnwindSafe, catch_unwind, resume_unwind}; | 
|---|
| 4 | use crate::sync::Arc; | 
|---|
| 5 | use crate::sync::atomic::{Atomic, AtomicBool, AtomicUsize, Ordering}; | 
|---|
| 6 | use crate::{fmt, io}; | 
|---|
| 7 |  | 
|---|
| 8 | /// A scope to spawn scoped threads in. | 
|---|
| 9 | /// | 
|---|
| 10 | /// See [`scope`] for details. | 
|---|
| 11 | #[ stable(feature = "scoped_threads", since = "1.63.0")] | 
|---|
| 12 | pub struct Scope<'scope, 'env: 'scope> { | 
|---|
| 13 | data: Arc<ScopeData>, | 
|---|
| 14 | /// Invariance over 'scope, to make sure 'scope cannot shrink, | 
|---|
| 15 | /// which is necessary for soundness. | 
|---|
| 16 | /// | 
|---|
| 17 | /// Without invariance, this would compile fine but be unsound: | 
|---|
| 18 | /// | 
|---|
| 19 | /// ```compile_fail,E0373 | 
|---|
| 20 | /// std::thread::scope(|s| { | 
|---|
| 21 | ///     s.spawn(|| { | 
|---|
| 22 | ///         let a = String::from( "abcd"); | 
|---|
| 23 | ///         s.spawn(|| println!( "{a:?}")); // might run after `a` is dropped | 
|---|
| 24 | ///     }); | 
|---|
| 25 | /// }); | 
|---|
| 26 | /// ``` | 
|---|
| 27 | scope: PhantomData<&'scope mut &'scope ()>, | 
|---|
| 28 | env: PhantomData<&'env mut &'env ()>, | 
|---|
| 29 | } | 
|---|
| 30 |  | 
|---|
| 31 | /// An owned permission to join on a scoped thread (block on its termination). | 
|---|
| 32 | /// | 
|---|
| 33 | /// See [`Scope::spawn`] for details. | 
|---|
| 34 | #[ stable(feature = "scoped_threads", since = "1.63.0")] | 
|---|
| 35 | pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>); | 
|---|
| 36 |  | 
|---|
| 37 | pub(super) struct ScopeData { | 
|---|
| 38 | num_running_threads: Atomic<usize>, | 
|---|
| 39 | a_thread_panicked: Atomic<bool>, | 
|---|
| 40 | main_thread: Thread, | 
|---|
| 41 | } | 
|---|
| 42 |  | 
|---|
| 43 | impl ScopeData { | 
|---|
| 44 | pub(super) fn increment_num_running_threads(&self) { | 
|---|
| 45 | // We check for 'overflow' with usize::MAX / 2, to make sure there's no | 
|---|
| 46 | // chance it overflows to 0, which would result in unsoundness. | 
|---|
| 47 | if self.num_running_threads.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 { | 
|---|
| 48 | // This can only reasonably happen by mem::forget()'ing a lot of ScopedJoinHandles. | 
|---|
| 49 | self.overflow(); | 
|---|
| 50 | } | 
|---|
| 51 | } | 
|---|
| 52 |  | 
|---|
| 53 | #[ cold] | 
|---|
| 54 | fn overflow(&self) { | 
|---|
| 55 | self.decrement_num_running_threads(false); | 
|---|
| 56 | panic!( "too many running threads in thread scope"); | 
|---|
| 57 | } | 
|---|
| 58 |  | 
|---|
| 59 | pub(super) fn decrement_num_running_threads(&self, panic: bool) { | 
|---|
| 60 | if panic { | 
|---|
| 61 | self.a_thread_panicked.store(true, Ordering::Relaxed); | 
|---|
| 62 | } | 
|---|
| 63 | if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 { | 
|---|
| 64 | self.main_thread.unpark(); | 
|---|
| 65 | } | 
|---|
| 66 | } | 
|---|
| 67 | } | 
|---|
| 68 |  | 
|---|
| 69 | /// Creates a scope for spawning scoped threads. | 
|---|
| 70 | /// | 
|---|
| 71 | /// The function passed to `scope` will be provided a [`Scope`] object, | 
|---|
| 72 | /// through which scoped threads can be [spawned][`Scope::spawn`]. | 
|---|
| 73 | /// | 
|---|
| 74 | /// Unlike non-scoped threads, scoped threads can borrow non-`'static` data, | 
|---|
| 75 | /// as the scope guarantees all threads will be joined at the end of the scope. | 
|---|
| 76 | /// | 
|---|
| 77 | /// All threads spawned within the scope that haven't been manually joined | 
|---|
| 78 | /// will be automatically joined before this function returns. | 
|---|
| 79 | /// | 
|---|
| 80 | /// # Panics | 
|---|
| 81 | /// | 
|---|
| 82 | /// If any of the automatically joined threads panicked, this function will panic. | 
|---|
| 83 | /// | 
|---|
| 84 | /// If you want to handle panics from spawned threads, | 
|---|
| 85 | /// [`join`][ScopedJoinHandle::join] them before the end of the scope. | 
|---|
| 86 | /// | 
|---|
| 87 | /// # Example | 
|---|
| 88 | /// | 
|---|
| 89 | /// ``` | 
|---|
| 90 | /// use std::thread; | 
|---|
| 91 | /// | 
|---|
| 92 | /// let mut a = vec![1, 2, 3]; | 
|---|
| 93 | /// let mut x = 0; | 
|---|
| 94 | /// | 
|---|
| 95 | /// thread::scope(|s| { | 
|---|
| 96 | ///     s.spawn(|| { | 
|---|
| 97 | ///         println!( "hello from the first scoped thread"); | 
|---|
| 98 | ///         // We can borrow `a` here. | 
|---|
| 99 | ///         dbg!(&a); | 
|---|
| 100 | ///     }); | 
|---|
| 101 | ///     s.spawn(|| { | 
|---|
| 102 | ///         println!( "hello from the second scoped thread"); | 
|---|
| 103 | ///         // We can even mutably borrow `x` here, | 
|---|
| 104 | ///         // because no other threads are using it. | 
|---|
| 105 | ///         x += a[0] + a[2]; | 
|---|
| 106 | ///     }); | 
|---|
| 107 | ///     println!( "hello from the main thread"); | 
|---|
| 108 | /// }); | 
|---|
| 109 | /// | 
|---|
| 110 | /// // After the scope, we can modify and access our variables again: | 
|---|
| 111 | /// a.push(4); | 
|---|
| 112 | /// assert_eq!(x, a.len()); | 
|---|
| 113 | /// ``` | 
|---|
| 114 | /// | 
|---|
| 115 | /// # Lifetimes | 
|---|
| 116 | /// | 
|---|
| 117 | /// Scoped threads involve two lifetimes: `'scope` and `'env`. | 
|---|
| 118 | /// | 
|---|
| 119 | /// The `'scope` lifetime represents the lifetime of the scope itself. | 
|---|
| 120 | /// That is: the time during which new scoped threads may be spawned, | 
|---|
| 121 | /// and also the time during which they might still be running. | 
|---|
| 122 | /// Once this lifetime ends, all scoped threads are joined. | 
|---|
| 123 | /// This lifetime starts within the `scope` function, before `f` (the argument to `scope`) starts. | 
|---|
| 124 | /// It ends after `f` returns and all scoped threads have been joined, but before `scope` returns. | 
|---|
| 125 | /// | 
|---|
| 126 | /// The `'env` lifetime represents the lifetime of whatever is borrowed by the scoped threads. | 
|---|
| 127 | /// This lifetime must outlast the call to `scope`, and thus cannot be smaller than `'scope`. | 
|---|
| 128 | /// It can be as small as the call to `scope`, meaning that anything that outlives this call, | 
|---|
| 129 | /// such as local variables defined right before the scope, can be borrowed by the scoped threads. | 
|---|
| 130 | /// | 
|---|
| 131 | /// The `'env: 'scope` bound is part of the definition of the `Scope` type. | 
|---|
| 132 | #[ track_caller] | 
|---|
| 133 | #[ stable(feature = "scoped_threads", since = "1.63.0")] | 
|---|
| 134 | pub fn scope<'env, F, T>(f: F) -> T | 
|---|
| 135 | where | 
|---|
| 136 | F: for<'scope> dynFnOnce(&'scope Scope<'scope, 'env>) -> T, | 
|---|
| 137 | { | 
|---|
| 138 | // We put the `ScopeData` into an `Arc` so that other threads can finish their | 
|---|
| 139 | // `decrement_num_running_threads` even after this function returns. | 
|---|
| 140 | let scope = Scope { | 
|---|
| 141 | data: Arc::new(ScopeData { | 
|---|
| 142 | num_running_threads: AtomicUsize::new(0), | 
|---|
| 143 | main_thread: current_or_unnamed(), | 
|---|
| 144 | a_thread_panicked: AtomicBool::new(false), | 
|---|
| 145 | }), | 
|---|
| 146 | env: PhantomData, | 
|---|
| 147 | scope: PhantomData, | 
|---|
| 148 | }; | 
|---|
| 149 |  | 
|---|
| 150 | // Run `f`, but catch panics so we can make sure to wait for all the threads to join. | 
|---|
| 151 | let result = catch_unwind(AssertUnwindSafe(|| f(&scope))); | 
|---|
| 152 |  | 
|---|
| 153 | // Wait until all the threads are finished. | 
|---|
| 154 | while scope.data.num_running_threads.load(Ordering::Acquire) != 0 { | 
|---|
| 155 | // SAFETY: this is the main thread, the handle belongs to us. | 
|---|
| 156 | unsafe { scope.data.main_thread.park() }; | 
|---|
| 157 | } | 
|---|
| 158 |  | 
|---|
| 159 | // Throw any panic from `f`, or the return value of `f` if no thread panicked. | 
|---|
| 160 | match result { | 
|---|
| 161 | Err(e) => resume_unwind(e), | 
|---|
| 162 | Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => { | 
|---|
| 163 | panic!( "a scoped thread panicked") | 
|---|
| 164 | } | 
|---|
| 165 | Ok(result) => result, | 
|---|
| 166 | } | 
|---|
| 167 | } | 
|---|
| 168 |  | 
|---|
| 169 | impl<'scope, 'env> Scope<'scope, 'env> { | 
|---|
| 170 | /// Spawns a new thread within a scope, returning a [`ScopedJoinHandle`] for it. | 
|---|
| 171 | /// | 
|---|
| 172 | /// Unlike non-scoped threads, threads spawned with this function may | 
|---|
| 173 | /// borrow non-`'static` data from the outside the scope. See [`scope`] for | 
|---|
| 174 | /// details. | 
|---|
| 175 | /// | 
|---|
| 176 | /// The join handle provides a [`join`] method that can be used to join the spawned | 
|---|
| 177 | /// thread. If the spawned thread panics, [`join`] will return an [`Err`] containing | 
|---|
| 178 | /// the panic payload. | 
|---|
| 179 | /// | 
|---|
| 180 | /// If the join handle is dropped, the spawned thread will be implicitly joined at the | 
|---|
| 181 | /// end of the scope. In that case, if the spawned thread panics, [`scope`] will | 
|---|
| 182 | /// panic after all threads are joined. | 
|---|
| 183 | /// | 
|---|
| 184 | /// This call will create a thread using default parameters of [`Builder`]. | 
|---|
| 185 | /// If you want to specify the stack size or the name of the thread, use | 
|---|
| 186 | /// [`Builder::spawn_scoped`] instead. | 
|---|
| 187 | /// | 
|---|
| 188 | /// # Panics | 
|---|
| 189 | /// | 
|---|
| 190 | /// Panics if the OS fails to create a thread; use [`Builder::spawn_scoped`] | 
|---|
| 191 | /// to recover from such errors. | 
|---|
| 192 | /// | 
|---|
| 193 | /// [`join`]: ScopedJoinHandle::join | 
|---|
| 194 | #[ stable(feature = "scoped_threads", since = "1.63.0")] | 
|---|
| 195 | pub fn spawn<F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> | 
|---|
| 196 | where | 
|---|
| 197 | F: FnOnce() -> T + Send + 'scope, | 
|---|
| 198 | T: Send + 'scope, | 
|---|
| 199 | { | 
|---|
| 200 | Builder::new().spawn_scoped(self, f).expect( "failed to spawn thread") | 
|---|
| 201 | } | 
|---|
| 202 | } | 
|---|
| 203 |  | 
|---|
| 204 | impl Builder { | 
|---|
| 205 | /// Spawns a new scoped thread using the settings set through this `Builder`. | 
|---|
| 206 | /// | 
|---|
| 207 | /// Unlike [`Scope::spawn`], this method yields an [`io::Result`] to | 
|---|
| 208 | /// capture any failure to create the thread at the OS level. | 
|---|
| 209 | /// | 
|---|
| 210 | /// [`io::Result`]: crate::io::Result | 
|---|
| 211 | /// | 
|---|
| 212 | /// # Panics | 
|---|
| 213 | /// | 
|---|
| 214 | /// Panics if a thread name was set and it contained null bytes. | 
|---|
| 215 | /// | 
|---|
| 216 | /// # Example | 
|---|
| 217 | /// | 
|---|
| 218 | /// ``` | 
|---|
| 219 | /// use std::thread; | 
|---|
| 220 | /// | 
|---|
| 221 | /// let mut a = vec![1, 2, 3]; | 
|---|
| 222 | /// let mut x = 0; | 
|---|
| 223 | /// | 
|---|
| 224 | /// thread::scope(|s| { | 
|---|
| 225 | ///     thread::Builder::new() | 
|---|
| 226 | ///         .name( "first".to_string()) | 
|---|
| 227 | ///         .spawn_scoped(s, || | 
|---|
| 228 | ///     { | 
|---|
| 229 | ///         println!( "hello from the {:?} scoped thread", thread::current().name()); | 
|---|
| 230 | ///         // We can borrow `a` here. | 
|---|
| 231 | ///         dbg!(&a); | 
|---|
| 232 | ///     }) | 
|---|
| 233 | ///     .unwrap(); | 
|---|
| 234 | ///     thread::Builder::new() | 
|---|
| 235 | ///         .name( "second".to_string()) | 
|---|
| 236 | ///         .spawn_scoped(s, || | 
|---|
| 237 | ///     { | 
|---|
| 238 | ///         println!( "hello from the {:?} scoped thread", thread::current().name()); | 
|---|
| 239 | ///         // We can even mutably borrow `x` here, | 
|---|
| 240 | ///         // because no other threads are using it. | 
|---|
| 241 | ///         x += a[0] + a[2]; | 
|---|
| 242 | ///     }) | 
|---|
| 243 | ///     .unwrap(); | 
|---|
| 244 | ///     println!( "hello from the main thread"); | 
|---|
| 245 | /// }); | 
|---|
| 246 | /// | 
|---|
| 247 | /// // After the scope, we can modify and access our variables again: | 
|---|
| 248 | /// a.push(4); | 
|---|
| 249 | /// assert_eq!(x, a.len()); | 
|---|
| 250 | /// ``` | 
|---|
| 251 | #[ stable(feature = "scoped_threads", since = "1.63.0")] | 
|---|
| 252 | pub fn spawn_scoped<'scope, 'env, F, T>( | 
|---|
| 253 | self, | 
|---|
| 254 | scope: &'scope Scope<'scope, 'env>, | 
|---|
| 255 | f: F, | 
|---|
| 256 | ) -> io::Result<ScopedJoinHandle<'scope, T>> | 
|---|
| 257 | where | 
|---|
| 258 | F: FnOnce() -> T + Send + 'scope, | 
|---|
| 259 | T: Send + 'scope, | 
|---|
| 260 | { | 
|---|
| 261 | Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(scope.data.clone())) }?)) | 
|---|
| 262 | } | 
|---|
| 263 | } | 
|---|
| 264 |  | 
|---|
| 265 | impl<'scope, T> ScopedJoinHandle<'scope, T> { | 
|---|
| 266 | /// Extracts a handle to the underlying thread. | 
|---|
| 267 | /// | 
|---|
| 268 | /// # Examples | 
|---|
| 269 | /// | 
|---|
| 270 | /// ``` | 
|---|
| 271 | /// use std::thread; | 
|---|
| 272 | /// | 
|---|
| 273 | /// thread::scope(|s| { | 
|---|
| 274 | ///     let t = s.spawn(|| { | 
|---|
| 275 | ///         println!( "hello"); | 
|---|
| 276 | ///     }); | 
|---|
| 277 | ///     println!( "thread id: {:?}", t.thread().id()); | 
|---|
| 278 | /// }); | 
|---|
| 279 | /// ``` | 
|---|
| 280 | #[ must_use] | 
|---|
| 281 | #[ stable(feature = "scoped_threads", since = "1.63.0")] | 
|---|
| 282 | pub fn thread(&self) -> &Thread { | 
|---|
| 283 | &self.0.thread | 
|---|
| 284 | } | 
|---|
| 285 |  | 
|---|
| 286 | /// Waits for the associated thread to finish. | 
|---|
| 287 | /// | 
|---|
| 288 | /// This function will return immediately if the associated thread has already finished. | 
|---|
| 289 | /// | 
|---|
| 290 | /// In terms of [atomic memory orderings], the completion of the associated | 
|---|
| 291 | /// thread synchronizes with this function returning. | 
|---|
| 292 | /// In other words, all operations performed by that thread | 
|---|
| 293 | /// [happen before](https://doc.rust-lang.org/nomicon/atomics.html#data-accesses) | 
|---|
| 294 | /// all operations that happen after `join` returns. | 
|---|
| 295 | /// | 
|---|
| 296 | /// If the associated thread panics, [`Err`] is returned with the panic payload. | 
|---|
| 297 | /// | 
|---|
| 298 | /// [atomic memory orderings]: crate::sync::atomic | 
|---|
| 299 | /// | 
|---|
| 300 | /// # Examples | 
|---|
| 301 | /// | 
|---|
| 302 | /// ``` | 
|---|
| 303 | /// use std::thread; | 
|---|
| 304 | /// | 
|---|
| 305 | /// thread::scope(|s| { | 
|---|
| 306 | ///     let t = s.spawn(|| { | 
|---|
| 307 | ///         panic!( "oh no"); | 
|---|
| 308 | ///     }); | 
|---|
| 309 | ///     assert!(t.join().is_err()); | 
|---|
| 310 | /// }); | 
|---|
| 311 | /// ``` | 
|---|
| 312 | #[ stable(feature = "scoped_threads", since = "1.63.0")] | 
|---|
| 313 | pub fn join(self) -> Result<T> { | 
|---|
| 314 | self.0.join() | 
|---|
| 315 | } | 
|---|
| 316 |  | 
|---|
| 317 | /// Checks if the associated thread has finished running its main function. | 
|---|
| 318 | /// | 
|---|
| 319 | /// `is_finished` supports implementing a non-blocking join operation, by checking | 
|---|
| 320 | /// `is_finished`, and calling `join` if it returns `true`. This function does not block. To | 
|---|
| 321 | /// block while waiting on the thread to finish, use [`join`][Self::join]. | 
|---|
| 322 | /// | 
|---|
| 323 | /// This might return `true` for a brief moment after the thread's main | 
|---|
| 324 | /// function has returned, but before the thread itself has stopped running. | 
|---|
| 325 | /// However, once this returns `true`, [`join`][Self::join] can be expected | 
|---|
| 326 | /// to return quickly, without blocking for any significant amount of time. | 
|---|
| 327 | #[ stable(feature = "scoped_threads", since = "1.63.0")] | 
|---|
| 328 | pub fn is_finished(&self) -> bool { | 
|---|
| 329 | Arc::strong_count(&self.0.packet) == 1 | 
|---|
| 330 | } | 
|---|
| 331 | } | 
|---|
| 332 |  | 
|---|
| 333 | #[ stable(feature = "scoped_threads", since = "1.63.0")] | 
|---|
| 334 | impl fmt::Debug for Scope<'_, '_> { | 
|---|
| 335 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|---|
| 336 | f&mut DebugStruct<'_, '_>.debug_struct( "Scope") | 
|---|
| 337 | .field( "num_running_threads", &self.data.num_running_threads.load(Ordering::Relaxed)) | 
|---|
| 338 | .field( "a_thread_panicked", &self.data.a_thread_panicked.load(Ordering::Relaxed)) | 
|---|
| 339 | .field(name: "main_thread", &self.data.main_thread) | 
|---|
| 340 | .finish_non_exhaustive() | 
|---|
| 341 | } | 
|---|
| 342 | } | 
|---|
| 343 |  | 
|---|
| 344 | #[ stable(feature = "scoped_threads", since = "1.63.0")] | 
|---|
| 345 | impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> { | 
|---|
| 346 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | 
|---|
| 347 | f.debug_struct(name: "ScopedJoinHandle").finish_non_exhaustive() | 
|---|
| 348 | } | 
|---|
| 349 | } | 
|---|
| 350 |  | 
|---|