1//! Threads that can borrow variables from the stack.
2//!
3//! Create a scope when spawned threads need to access variables on the stack:
4//!
5//! ```
6//! use crossbeam_utils::thread;
7//!
8//! let people = vec![
9//! "Alice".to_string(),
10//! "Bob".to_string(),
11//! "Carol".to_string(),
12//! ];
13//!
14//! thread::scope(|s| {
15//! for person in &people {
16//! s.spawn(move |_| {
17//! println!("Hello, {}!", person);
18//! });
19//! }
20//! }).unwrap();
21//! ```
22//!
23//! # Why scoped threads?
24//!
25//! Suppose we wanted to re-write the previous example using plain threads:
26//!
27//! ```compile_fail,E0597
28//! use std::thread;
29//!
30//! let people = vec![
31//! "Alice".to_string(),
32//! "Bob".to_string(),
33//! "Carol".to_string(),
34//! ];
35//!
36//! let mut threads = Vec::new();
37//!
38//! for person in &people {
39//! threads.push(thread::spawn(move || {
40//! println!("Hello, {}!", person);
41//! }));
42//! }
43//!
44//! for thread in threads {
45//! thread.join().unwrap();
46//! }
47//! ```
48//!
49//! This doesn't work because the borrow checker complains about `people` not living long enough:
50//!
51//! ```text
52//! error[E0597]: `people` does not live long enough
53//! --> src/main.rs:12:20
54//! |
55//! 12 | for person in &people {
56//! | ^^^^^^ borrowed value does not live long enough
57//! ...
58//! 21 | }
59//! | - borrowed value only lives until here
60//! |
61//! = note: borrowed value must be valid for the static lifetime...
62//! ```
63//!
//! The problem here is that spawned threads are not allowed to borrow variables on the stack
//! because the compiler cannot prove they will be joined before `people` is destroyed.
66//!
67//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined
68//! before the scope ends.
69//!
70//! # How scoped threads work
71//!
72//! If a variable is borrowed by a thread, the thread must complete before the variable is
73//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the
74//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete.
75//!
76//! A scope creates a clear boundary between variables outside the scope and threads inside the
77//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends.
78//! This way we guarantee to the borrow checker that scoped threads only live within the scope and
79//! can safely access variables outside it.
80//!
81//! # Nesting scoped threads
82//!
83//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little
84//! tricky because argument `s` lives *inside* the invocation of `thread::scope()` and as such
85//! cannot be borrowed by scoped threads:
86//!
87//! ```compile_fail,E0521
88//! use crossbeam_utils::thread;
89//!
90//! thread::scope(|s| {
91//! s.spawn(|_| {
92//! // Not going to compile because we're trying to borrow `s`,
93//! // which lives *inside* the scope! :(
94//! s.spawn(|_| println!("nested thread"));
95//! });
96//! });
97//! ```
98//!
99//! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an
100//! argument, which can be used for spawning nested threads:
101//!
102//! ```
103//! use crossbeam_utils::thread;
104//!
105//! thread::scope(|s| {
106//! // Note the `|s|` here.
107//! s.spawn(|s| {
108//! // Yay, this works because we're using a fresh argument `s`! :)
109//! s.spawn(|_| println!("nested thread"));
110//! });
111//! }).unwrap();
112//! ```
113
114use std::fmt;
115use std::io;
116use std::marker::PhantomData;
117use std::mem;
118use std::panic;
119use std::sync::{Arc, Mutex};
120use std::thread;
121
122use crate::sync::WaitGroup;
123
/// A value shared between threads behind a mutex.
type Shared<T> = Arc<Mutex<T>>;
/// A shared, mutex-protected growable list.
type SharedVec<T> = Shared<Vec<T>>;
/// A shared, mutex-protected slot that may or may not hold a value.
type SharedOption<T> = Shared<Option<T>>;
126
127/// Creates a new scope for spawning threads.
128///
129/// All child threads that haven't been manually joined will be automatically joined just before
130/// this function invocation ends. If all joined threads have successfully completed, `Ok` is
131/// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is
132/// returned containing errors from panicked threads. Note that if panics are implemented by
133/// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
134///
135/// **Note:** Since Rust 1.63, this function is soft-deprecated in favor of the more efficient [`std::thread::scope`].
136///
137/// # Examples
138///
139/// ```
140/// use crossbeam_utils::thread;
141///
142/// let var = vec![1, 2, 3];
143///
144/// thread::scope(|s| {
145/// s.spawn(|_| {
146/// println!("A child thread borrowing `var`: {:?}", var);
147/// });
148/// }).unwrap();
149/// ```
150pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
151where
152 F: FnOnce(&Scope<'env>) -> R,
153{
154 struct AbortOnPanic;
155 impl Drop for AbortOnPanic {
156 fn drop(&mut self) {
157 if thread::panicking() {
158 std::process::abort();
159 }
160 }
161 }
162
163 let wg = WaitGroup::new();
164 let scope = Scope::<'env> {
165 handles: SharedVec::default(),
166 wait_group: wg.clone(),
167 _marker: PhantomData,
168 };
169
170 // Execute the scoped function, but catch any panics.
171 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));
172
173 // If an unwinding panic occurs before all threads are joined
174 // promote it to an aborting panic to prevent any threads from escaping the scope.
175 let guard = AbortOnPanic;
176
177 // Wait until all nested scopes are dropped.
178 drop(scope.wait_group);
179 wg.wait();
180
181 // Join all remaining spawned threads.
182 let panics: Vec<_> = scope
183 .handles
184 .lock()
185 .unwrap()
186 // Filter handles that haven't been joined, join them, and collect errors.
187 .drain(..)
188 .filter_map(|handle| handle.lock().unwrap().take())
189 .filter_map(|handle| handle.join().err())
190 .collect();
191
192 mem::forget(guard);
193
194 // If `f` has panicked, resume unwinding.
195 // If any of the child threads have panicked, return the panic errors.
196 // Otherwise, everything is OK and return the result of `f`.
197 match result {
198 Err(err) => panic::resume_unwind(err),
199 Ok(res) => {
200 if panics.is_empty() {
201 Ok(res)
202 } else {
203 Err(Box::new(panics))
204 }
205 }
206 }
207}
208
209/// A scope for spawning threads.
210pub struct Scope<'env> {
211 /// The list of the thread join handles.
212 handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,
213
214 /// Used to wait until all subscopes all dropped.
215 wait_group: WaitGroup,
216
217 /// Borrows data with invariant lifetime `'env`.
218 _marker: PhantomData<&'env mut &'env ()>,
219}
220
221unsafe impl Sync for Scope<'_> {}
222
223impl<'env> Scope<'env> {
224 /// Spawns a scoped thread.
225 ///
226 /// This method is similar to the [`spawn`] function in Rust's standard library. The difference
227 /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits,
228 /// allowing it to reference variables outside the scope.
229 ///
230 /// The scoped thread is passed a reference to this scope as an argument, which can be used for
231 /// spawning nested threads.
232 ///
233 /// The returned [handle](ScopedJoinHandle) can be used to manually
234 /// [join](ScopedJoinHandle::join) the thread before the scope exits.
235 ///
236 /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the
237 /// stack size or the name of the thread, use this API instead.
238 ///
239 /// [`spawn`]: std::thread::spawn
240 ///
241 /// # Panics
242 ///
243 /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`]
244 /// to recover from such errors.
245 ///
246 /// # Examples
247 ///
248 /// ```
249 /// use crossbeam_utils::thread;
250 ///
251 /// thread::scope(|s| {
252 /// let handle = s.spawn(|_| {
253 /// println!("A child thread is running");
254 /// 42
255 /// });
256 ///
257 /// // Join the thread and retrieve its result.
258 /// let res = handle.join().unwrap();
259 /// assert_eq!(res, 42);
260 /// }).unwrap();
261 /// ```
262 pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
263 where
264 F: FnOnce(&Scope<'env>) -> T,
265 F: Send + 'env,
266 T: Send + 'env,
267 {
268 self.builder()
269 .spawn(f)
270 .expect("failed to spawn scoped thread")
271 }
272
273 /// Creates a builder that can configure a thread before spawning.
274 ///
275 /// # Examples
276 ///
277 /// ```
278 /// use crossbeam_utils::thread;
279 ///
280 /// thread::scope(|s| {
281 /// s.builder()
282 /// .spawn(|_| println!("A child thread is running"))
283 /// .unwrap();
284 /// }).unwrap();
285 /// ```
286 pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> {
287 ScopedThreadBuilder {
288 scope: self,
289 builder: thread::Builder::new(),
290 }
291 }
292}
293
294impl fmt::Debug for Scope<'_> {
295 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
296 f.pad("Scope { .. }")
297 }
298}
299
300/// Configures the properties of a new thread.
301///
302/// The two configurable properties are:
303///
304/// - [`name`]: Specifies an [associated name for the thread][naming-threads].
305/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size].
306///
307/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the
308/// thread handle with the given configuration.
309///
310/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return
311/// value. You may want to use this builder when you want to recover from a failure to launch a
312/// thread.
313///
314/// # Examples
315///
316/// ```
317/// use crossbeam_utils::thread;
318///
319/// thread::scope(|s| {
320/// s.builder()
321/// .spawn(|_| println!("Running a child thread"))
322/// .unwrap();
323/// }).unwrap();
324/// ```
325///
326/// [`name`]: ScopedThreadBuilder::name
327/// [`stack_size`]: ScopedThreadBuilder::stack_size
328/// [`spawn`]: ScopedThreadBuilder::spawn
329/// [`io::Result`]: std::io::Result
330/// [naming-threads]: std::thread#naming-threads
331/// [stack-size]: std::thread#stack-size
332#[derive(Debug)]
333pub struct ScopedThreadBuilder<'scope, 'env> {
334 scope: &'scope Scope<'env>,
335 builder: thread::Builder,
336}
337
338impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> {
339 /// Sets the name for the new thread.
340 ///
341 /// The name must not contain null bytes (`\0`).
342 ///
343 /// For more information about named threads, see [here][naming-threads].
344 ///
345 /// # Examples
346 ///
347 /// ```
348 /// use crossbeam_utils::thread;
349 /// use std::thread::current;
350 ///
351 /// thread::scope(|s| {
352 /// s.builder()
353 /// .name("my thread".to_string())
354 /// .spawn(|_| assert_eq!(current().name(), Some("my thread")))
355 /// .unwrap();
356 /// }).unwrap();
357 /// ```
358 ///
359 /// [naming-threads]: std::thread#naming-threads
360 pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> {
361 self.builder = self.builder.name(name);
362 self
363 }
364
365 /// Sets the size of the stack for the new thread.
366 ///
367 /// The stack size is measured in bytes.
368 ///
369 /// For more information about the stack size for threads, see [here][stack-size].
370 ///
371 /// # Examples
372 ///
373 /// ```
374 /// use crossbeam_utils::thread;
375 ///
376 /// thread::scope(|s| {
377 /// s.builder()
378 /// .stack_size(32 * 1024)
379 /// .spawn(|_| println!("Running a child thread"))
380 /// .unwrap();
381 /// }).unwrap();
382 /// ```
383 ///
384 /// [stack-size]: std::thread#stack-size
385 pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> {
386 self.builder = self.builder.stack_size(size);
387 self
388 }
389
390 /// Spawns a scoped thread with this configuration.
391 ///
392 /// The scoped thread is passed a reference to this scope as an argument, which can be used for
393 /// spawning nested threads.
394 ///
395 /// The returned handle can be used to manually join the thread before the scope exits.
396 ///
397 /// # Errors
398 ///
399 /// Unlike the [`Scope::spawn`] method, this method yields an
400 /// [`io::Result`] to capture any failure to create the thread at
401 /// the OS level.
402 ///
403 /// [`io::Result`]: std::io::Result
404 ///
405 /// # Panics
406 ///
407 /// Panics if a thread name was set and it contained null bytes.
408 ///
409 /// # Examples
410 ///
411 /// ```
412 /// use crossbeam_utils::thread;
413 ///
414 /// thread::scope(|s| {
415 /// let handle = s.builder()
416 /// .spawn(|_| {
417 /// println!("A child thread is running");
418 /// 42
419 /// })
420 /// .unwrap();
421 ///
422 /// // Join the thread and retrieve its result.
423 /// let res = handle.join().unwrap();
424 /// assert_eq!(res, 42);
425 /// }).unwrap();
426 /// ```
427 pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>>
428 where
429 F: FnOnce(&Scope<'env>) -> T,
430 F: Send + 'env,
431 T: Send + 'env,
432 {
433 // The result of `f` will be stored here.
434 let result = SharedOption::default();
435
436 // Spawn the thread and grab its join handle and thread handle.
437 let (handle, thread) = {
438 let result = Arc::clone(&result);
439
440 // A clone of the scope that will be moved into the new thread.
441 let scope = Scope::<'env> {
442 handles: Arc::clone(&self.scope.handles),
443 wait_group: self.scope.wait_group.clone(),
444 _marker: PhantomData,
445 };
446
447 // Spawn the thread.
448 let handle = {
449 let closure = move || {
450 // Make sure the scope is inside the closure with the proper `'env` lifetime.
451 let scope: Scope<'env> = scope;
452
453 // Run the closure.
454 let res = f(&scope);
455
456 // Store the result if the closure didn't panic.
457 *result.lock().unwrap() = Some(res);
458 };
459
460 // Allocate `closure` on the heap and erase the `'env` bound.
461 let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure);
462 let closure: Box<dyn FnOnce() + Send + 'static> =
463 unsafe { mem::transmute(closure) };
464
465 // Finally, spawn the closure.
466 self.builder.spawn(closure)?
467 };
468
469 let thread = handle.thread().clone();
470 let handle = Arc::new(Mutex::new(Some(handle)));
471 (handle, thread)
472 };
473
474 // Add the handle to the shared list of join handles.
475 self.scope.handles.lock().unwrap().push(Arc::clone(&handle));
476
477 Ok(ScopedJoinHandle {
478 handle,
479 result,
480 thread,
481 _marker: PhantomData,
482 })
483 }
484}
485
486unsafe impl<T> Send for ScopedJoinHandle<'_, T> {}
487unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {}
488
489/// A handle that can be used to join its scoped thread.
490///
491/// This struct is created by the [`Scope::spawn`] method and the
492/// [`ScopedThreadBuilder::spawn`] method.
493pub struct ScopedJoinHandle<'scope, T> {
494 /// A join handle to the spawned thread.
495 handle: SharedOption<thread::JoinHandle<()>>,
496
497 /// Holds the result of the inner closure.
498 result: SharedOption<T>,
499
500 /// A handle to the the spawned thread.
501 thread: thread::Thread,
502
503 /// Borrows the parent scope with lifetime `'scope`.
504 _marker: PhantomData<&'scope ()>,
505}
506
507impl<T> ScopedJoinHandle<'_, T> {
508 /// Waits for the thread to finish and returns its result.
509 ///
510 /// If the child thread panics, an error is returned. Note that if panics are implemented by
511 /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
512 ///
513 /// # Panics
514 ///
515 /// This function may panic on some platforms if a thread attempts to join itself or otherwise
516 /// may create a deadlock with joining threads.
517 ///
518 /// # Examples
519 ///
520 /// ```
521 /// use crossbeam_utils::thread;
522 ///
523 /// thread::scope(|s| {
524 /// let handle1 = s.spawn(|_| println!("I'm a happy thread :)"));
525 /// let handle2 = s.spawn(|_| panic!("I'm a sad thread :("));
526 ///
527 /// // Join the first thread and verify that it succeeded.
528 /// let res = handle1.join();
529 /// assert!(res.is_ok());
530 ///
531 /// // Join the second thread and verify that it panicked.
532 /// let res = handle2.join();
533 /// assert!(res.is_err());
534 /// }).unwrap();
535 /// ```
536 pub fn join(self) -> thread::Result<T> {
537 // Take out the handle. The handle will surely be available because the root scope waits
538 // for nested scopes before joining remaining threads.
539 let handle = self.handle.lock().unwrap().take().unwrap();
540
541 // Join the thread and then take the result out of its inner closure.
542 handle
543 .join()
544 .map(|()| self.result.lock().unwrap().take().unwrap())
545 }
546
547 /// Returns a handle to the underlying thread.
548 ///
549 /// # Examples
550 ///
551 /// ```
552 /// use crossbeam_utils::thread;
553 ///
554 /// thread::scope(|s| {
555 /// let handle = s.spawn(|_| println!("A child thread is running"));
556 /// println!("The child thread ID: {:?}", handle.thread().id());
557 /// }).unwrap();
558 /// ```
559 pub fn thread(&self) -> &thread::Thread {
560 &self.thread
561 }
562}
563
564/// Unix-specific extensions.
565#[cfg(unix)]
566mod unix {
567 use super::ScopedJoinHandle;
568 use std::os::unix::thread::{JoinHandleExt, RawPthread};
569
570 impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> {
571 fn as_pthread_t(&self) -> RawPthread {
572 // Borrow the handle. The handle will surely be available because the root scope waits
573 // for nested scopes before joining remaining threads.
574 let handle: MutexGuard<'_, Option>> = self.handle.lock().unwrap();
575 handle.as_ref().unwrap().as_pthread_t()
576 }
577 fn into_pthread_t(self) -> RawPthread {
578 self.as_pthread_t()
579 }
580 }
581}
/// Windows-specific extensions.
#[cfg(windows)]
mod windows {
    use super::ScopedJoinHandle;
    use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle};

    impl<T> AsRawHandle for ScopedJoinHandle<'_, T> {
        /// Returns the raw OS handle of the spawned thread.
        fn as_raw_handle(&self) -> RawHandle {
            // Borrow the handle. The handle will surely be available because the root scope waits
            // for nested scopes before joining remaining threads.
            let guard = self.handle.lock().unwrap();
            let join_handle = guard.as_ref().unwrap();
            join_handle.as_raw_handle()
        }
    }

    impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
        /// Consumes this handle and returns the raw OS handle of the spawned thread.
        fn into_raw_handle(self) -> RawHandle {
            self.as_raw_handle()
        }
    }
}
603
604impl<T> fmt::Debug for ScopedJoinHandle<'_, T> {
605 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
606 f.pad("ScopedJoinHandle { .. }")
607 }
608}
609