1//! Threads that can borrow variables from the stack.
2//!
3//! Create a scope when spawned threads need to access variables on the stack:
4//!
5//! ```
6//! use crossbeam_utils::thread;
7//!
8//! let people = vec![
9//! "Alice".to_string(),
10//! "Bob".to_string(),
11//! "Carol".to_string(),
12//! ];
13//!
14//! thread::scope(|s| {
15//! for person in &people {
16//! s.spawn(move |_| {
17//! println!("Hello, {}!", person);
18//! });
19//! }
20//! }).unwrap();
21//! ```
22//!
23//! # Why scoped threads?
24//!
25//! Suppose we wanted to re-write the previous example using plain threads:
26//!
27//! ```compile_fail,E0597
28//! use std::thread;
29//!
30//! let people = vec![
31//! "Alice".to_string(),
32//! "Bob".to_string(),
33//! "Carol".to_string(),
34//! ];
35//!
36//! let mut threads = Vec::new();
37//!
38//! for person in &people {
39//! threads.push(thread::spawn(move || {
40//! println!("Hello, {}!", person);
41//! }));
42//! }
43//!
44//! for thread in threads {
45//! thread.join().unwrap();
46//! }
47//! ```
48//!
49//! This doesn't work because the borrow checker complains about `people` not living long enough:
50//!
51//! ```text
52//! error[E0597]: `people` does not live long enough
53//! --> src/main.rs:12:20
54//! |
55//! 12 | for person in &people {
56//! | ^^^^^^ borrowed value does not live long enough
57//! ...
58//! 21 | }
59//! | - borrowed value only lives until here
60//! |
61//! = note: borrowed value must be valid for the static lifetime...
62//! ```
63//!
//! The problem here is that spawned threads are not allowed to borrow variables on the stack
//! because the compiler cannot prove they will be joined before `people` is destroyed.
66//!
67//! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined
68//! before the scope ends.
69//!
70//! # How scoped threads work
71//!
72//! If a variable is borrowed by a thread, the thread must complete before the variable is
73//! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the
74//! `'static` lifetime because the borrow checker cannot be sure when the thread will complete.
75//!
76//! A scope creates a clear boundary between variables outside the scope and threads inside the
77//! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends.
78//! This way we guarantee to the borrow checker that scoped threads only live within the scope and
79//! can safely access variables outside it.
80//!
81//! # Nesting scoped threads
82//!
//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little
//! tricky because the argument `s` lives *inside* the invocation of `thread::scope()` and as such
//! cannot be borrowed by scoped threads:
86//!
87//! ```compile_fail,E0373,E0521
88//! use crossbeam_utils::thread;
89//!
90//! thread::scope(|s| {
91//! s.spawn(|_| {
92//! // Not going to compile because we're trying to borrow `s`,
93//! // which lives *inside* the scope! :(
94//! s.spawn(|_| println!("nested thread"));
95//! });
96//! });
97//! ```
98//!
99//! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an
100//! argument, which can be used for spawning nested threads:
101//!
102//! ```
103//! use crossbeam_utils::thread;
104//!
105//! thread::scope(|s| {
106//! // Note the `|s|` here.
107//! s.spawn(|s| {
108//! // Yay, this works because we're using a fresh argument `s`! :)
109//! s.spawn(|_| println!("nested thread"));
110//! });
111//! }).unwrap();
112//! ```
113
114use std::fmt;
115use std::io;
116use std::marker::PhantomData;
117use std::mem;
118use std::panic;
119use std::sync::{Arc, Mutex};
120use std::thread;
121
122use crate::sync::WaitGroup;
123use cfg_if::cfg_if;
124
/// A growable list shared between threads behind an `Arc<Mutex<_>>`.
type SharedVec<T> = Arc<Mutex<Vec<T>>>;
/// An optional value shared between threads behind an `Arc<Mutex<_>>`.
///
/// Users in this file empty the slot with `Option::take`, so `None` means "not set yet" or
/// "already taken".
type SharedOption<T> = Arc<Mutex<Option<T>>>;
127
128/// Creates a new scope for spawning threads.
129///
130/// All child threads that haven't been manually joined will be automatically joined just before
131/// this function invocation ends. If all joined threads have successfully completed, `Ok` is
132/// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is
133/// returned containing errors from panicked threads. Note that if panics are implemented by
134/// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
135///
136/// **Note:** Since Rust 1.63, this function is soft-deprecated in favor of the more efficient [`std::thread::scope`].
137///
138/// # Examples
139///
140/// ```
141/// use crossbeam_utils::thread;
142///
143/// let var = vec![1, 2, 3];
144///
145/// thread::scope(|s| {
146/// s.spawn(|_| {
147/// println!("A child thread borrowing `var`: {:?}", var);
148/// });
149/// }).unwrap();
150/// ```
151pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
152where
153 F: FnOnce(&Scope<'env>) -> R,
154{
155 let wg = WaitGroup::new();
156 let scope = Scope::<'env> {
157 handles: SharedVec::default(),
158 wait_group: wg.clone(),
159 _marker: PhantomData,
160 };
161
162 // Execute the scoped function, but catch any panics.
163 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));
164
165 // Wait until all nested scopes are dropped.
166 drop(scope.wait_group);
167 wg.wait();
168
169 // Join all remaining spawned threads.
170 let panics: Vec<_> = scope
171 .handles
172 .lock()
173 .unwrap()
174 // Filter handles that haven't been joined, join them, and collect errors.
175 .drain(..)
176 .filter_map(|handle| handle.lock().unwrap().take())
177 .filter_map(|handle| handle.join().err())
178 .collect();
179
180 // If `f` has panicked, resume unwinding.
181 // If any of the child threads have panicked, return the panic errors.
182 // Otherwise, everything is OK and return the result of `f`.
183 match result {
184 Err(err) => panic::resume_unwind(err),
185 Ok(res) => {
186 if panics.is_empty() {
187 Ok(res)
188 } else {
189 Err(Box::new(panics))
190 }
191 }
192 }
193}
194
/// A scope for spawning threads.
///
/// The root value is created by [`scope`] and passed by reference to the user closure. Each
/// spawned thread receives its own `Scope` value that shares the same list of join handles.
pub struct Scope<'env> {
    /// The list of the thread join handles.
    ///
    /// Shared between the root scope and the scopes handed to spawned threads. Each entry is
    /// emptied once its thread has been joined.
    handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,

    /// Used to wait until all subscopes are dropped.
    wait_group: WaitGroup,

    /// Borrows data with invariant lifetime `'env`.
    _marker: PhantomData<&'env mut &'env ()>,
}
206
// NOTE(review): no justification is recorded for this manual impl. Every field of `Scope` is an
// `Arc<Mutex<..>>`, a `WaitGroup`, or a `PhantomData`; soundness presumably rests on `WaitGroup`
// being safe to share across threads — confirm against its definition.
unsafe impl Sync for Scope<'_> {}
208
209impl<'env> Scope<'env> {
210 /// Spawns a scoped thread.
211 ///
212 /// This method is similar to the [`spawn`] function in Rust's standard library. The difference
213 /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits,
214 /// allowing it to reference variables outside the scope.
215 ///
216 /// The scoped thread is passed a reference to this scope as an argument, which can be used for
217 /// spawning nested threads.
218 ///
219 /// The returned [handle](ScopedJoinHandle) can be used to manually
220 /// [join](ScopedJoinHandle::join) the thread before the scope exits.
221 ///
222 /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the
223 /// stack size or the name of the thread, use this API instead.
224 ///
225 /// [`spawn`]: std::thread::spawn
226 ///
227 /// # Panics
228 ///
229 /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`]
230 /// to recover from such errors.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// use crossbeam_utils::thread;
236 ///
237 /// thread::scope(|s| {
238 /// let handle = s.spawn(|_| {
239 /// println!("A child thread is running");
240 /// 42
241 /// });
242 ///
243 /// // Join the thread and retrieve its result.
244 /// let res = handle.join().unwrap();
245 /// assert_eq!(res, 42);
246 /// }).unwrap();
247 /// ```
248 pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T>
249 where
250 F: FnOnce(&Scope<'env>) -> T,
251 F: Send + 'env,
252 T: Send + 'env,
253 {
254 self.builder()
255 .spawn(f)
256 .expect("failed to spawn scoped thread")
257 }
258
259 /// Creates a builder that can configure a thread before spawning.
260 ///
261 /// # Examples
262 ///
263 /// ```
264 /// use crossbeam_utils::thread;
265 ///
266 /// thread::scope(|s| {
267 /// s.builder()
268 /// .spawn(|_| println!("A child thread is running"))
269 /// .unwrap();
270 /// }).unwrap();
271 /// ```
272 pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> {
273 ScopedThreadBuilder {
274 scope: self,
275 builder: thread::Builder::new(),
276 }
277 }
278}
279
280impl fmt::Debug for Scope<'_> {
281 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
282 f.pad("Scope { .. }")
283 }
284}
285
/// Configures the properties of a new thread.
///
/// The two configurable properties are:
///
/// - [`name`]: Specifies an [associated name for the thread][naming-threads].
/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size].
///
/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the
/// thread handle with the given configuration.
///
/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return
/// value. You may want to use this builder when you want to recover from a failure to launch a
/// thread.
///
/// # Examples
///
/// ```
/// use crossbeam_utils::thread;
///
/// thread::scope(|s| {
///     s.builder()
///         .spawn(|_| println!("Running a child thread"))
///         .unwrap();
/// }).unwrap();
/// ```
///
/// [`name`]: ScopedThreadBuilder::name
/// [`stack_size`]: ScopedThreadBuilder::stack_size
/// [`spawn`]: ScopedThreadBuilder::spawn
/// [`io::Result`]: std::io::Result
/// [naming-threads]: std::thread#naming-threads
/// [stack-size]: std::thread#stack-size
#[derive(Debug)]
pub struct ScopedThreadBuilder<'scope, 'env> {
    /// The scope the new thread is spawned into; its join-handle list receives the new thread.
    scope: &'scope Scope<'env>,
    /// The standard-library builder that accumulates the configured name and stack size.
    builder: thread::Builder,
}
323
324impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> {
325 /// Sets the name for the new thread.
326 ///
327 /// The name must not contain null bytes (`\0`).
328 ///
329 /// For more information about named threads, see [here][naming-threads].
330 ///
331 /// # Examples
332 ///
333 /// ```
334 /// use crossbeam_utils::thread;
335 /// use std::thread::current;
336 ///
337 /// thread::scope(|s| {
338 /// s.builder()
339 /// .name("my thread".to_string())
340 /// .spawn(|_| assert_eq!(current().name(), Some("my thread")))
341 /// .unwrap();
342 /// }).unwrap();
343 /// ```
344 ///
345 /// [naming-threads]: std::thread#naming-threads
346 pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> {
347 self.builder = self.builder.name(name);
348 self
349 }
350
351 /// Sets the size of the stack for the new thread.
352 ///
353 /// The stack size is measured in bytes.
354 ///
355 /// For more information about the stack size for threads, see [here][stack-size].
356 ///
357 /// # Examples
358 ///
359 /// ```
360 /// use crossbeam_utils::thread;
361 ///
362 /// thread::scope(|s| {
363 /// s.builder()
364 /// .stack_size(32 * 1024)
365 /// .spawn(|_| println!("Running a child thread"))
366 /// .unwrap();
367 /// }).unwrap();
368 /// ```
369 ///
370 /// [stack-size]: std::thread#stack-size
371 pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> {
372 self.builder = self.builder.stack_size(size);
373 self
374 }
375
376 /// Spawns a scoped thread with this configuration.
377 ///
378 /// The scoped thread is passed a reference to this scope as an argument, which can be used for
379 /// spawning nested threads.
380 ///
381 /// The returned handle can be used to manually join the thread before the scope exits.
382 ///
383 /// # Errors
384 ///
385 /// Unlike the [`Scope::spawn`] method, this method yields an
386 /// [`io::Result`] to capture any failure to create the thread at
387 /// the OS level.
388 ///
389 /// [`io::Result`]: std::io::Result
390 ///
391 /// # Panics
392 ///
393 /// Panics if a thread name was set and it contained null bytes.
394 ///
395 /// # Examples
396 ///
397 /// ```
398 /// use crossbeam_utils::thread;
399 ///
400 /// thread::scope(|s| {
401 /// let handle = s.builder()
402 /// .spawn(|_| {
403 /// println!("A child thread is running");
404 /// 42
405 /// })
406 /// .unwrap();
407 ///
408 /// // Join the thread and retrieve its result.
409 /// let res = handle.join().unwrap();
410 /// assert_eq!(res, 42);
411 /// }).unwrap();
412 /// ```
413 pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>>
414 where
415 F: FnOnce(&Scope<'env>) -> T,
416 F: Send + 'env,
417 T: Send + 'env,
418 {
419 // The result of `f` will be stored here.
420 let result = SharedOption::default();
421
422 // Spawn the thread and grab its join handle and thread handle.
423 let (handle, thread) = {
424 let result = Arc::clone(&result);
425
426 // A clone of the scope that will be moved into the new thread.
427 let scope = Scope::<'env> {
428 handles: Arc::clone(&self.scope.handles),
429 wait_group: self.scope.wait_group.clone(),
430 _marker: PhantomData,
431 };
432
433 // Spawn the thread.
434 let handle = {
435 let closure = move || {
436 // Make sure the scope is inside the closure with the proper `'env` lifetime.
437 let scope: Scope<'env> = scope;
438
439 // Run the closure.
440 let res = f(&scope);
441
442 // Store the result if the closure didn't panic.
443 *result.lock().unwrap() = Some(res);
444 };
445
446 // Allocate `closure` on the heap and erase the `'env` bound.
447 let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure);
448 let closure: Box<dyn FnOnce() + Send + 'static> =
449 unsafe { mem::transmute(closure) };
450
451 // Finally, spawn the closure.
452 self.builder.spawn(closure)?
453 };
454
455 let thread = handle.thread().clone();
456 let handle = Arc::new(Mutex::new(Some(handle)));
457 (handle, thread)
458 };
459
460 // Add the handle to the shared list of join handles.
461 self.scope.handles.lock().unwrap().push(Arc::clone(&handle));
462
463 Ok(ScopedJoinHandle {
464 handle,
465 result,
466 thread,
467 _marker: PhantomData,
468 })
469 }
470}
471
// NOTE(review): these impls carry no `T: Send` bound. The only place in this file that builds a
// `ScopedJoinHandle` is `ScopedThreadBuilder::spawn`, which requires `T: Send`; soundness
// presumably relies on that being the sole constructor — confirm before adding another one.
unsafe impl<T> Send for ScopedJoinHandle<'_, T> {}
unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {}
474
/// A handle that can be used to join its scoped thread.
///
/// This struct is created by the [`Scope::spawn`] method and the
/// [`ScopedThreadBuilder::spawn`] method.
pub struct ScopedJoinHandle<'scope, T> {
    /// A join handle to the spawned thread.
    ///
    /// The same slot is also registered in the scope's handle list; it becomes `None` once the
    /// thread has been joined.
    handle: SharedOption<thread::JoinHandle<()>>,

    /// Holds the result of the inner closure.
    ///
    /// Written by the spawned thread only if the closure returned without panicking.
    result: SharedOption<T>,

    /// A handle to the spawned thread.
    thread: thread::Thread,

    /// Borrows the parent scope with lifetime `'scope`.
    _marker: PhantomData<&'scope ()>,
}
492
493impl<T> ScopedJoinHandle<'_, T> {
494 /// Waits for the thread to finish and returns its result.
495 ///
496 /// If the child thread panics, an error is returned. Note that if panics are implemented by
497 /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind].
498 ///
499 /// # Panics
500 ///
501 /// This function may panic on some platforms if a thread attempts to join itself or otherwise
502 /// may create a deadlock with joining threads.
503 ///
504 /// # Examples
505 ///
506 /// ```
507 /// use crossbeam_utils::thread;
508 ///
509 /// thread::scope(|s| {
510 /// let handle1 = s.spawn(|_| println!("I'm a happy thread :)"));
511 /// let handle2 = s.spawn(|_| panic!("I'm a sad thread :("));
512 ///
513 /// // Join the first thread and verify that it succeeded.
514 /// let res = handle1.join();
515 /// assert!(res.is_ok());
516 ///
517 /// // Join the second thread and verify that it panicked.
518 /// let res = handle2.join();
519 /// assert!(res.is_err());
520 /// }).unwrap();
521 /// ```
522 pub fn join(self) -> thread::Result<T> {
523 // Take out the handle. The handle will surely be available because the root scope waits
524 // for nested scopes before joining remaining threads.
525 let handle = self.handle.lock().unwrap().take().unwrap();
526
527 // Join the thread and then take the result out of its inner closure.
528 handle
529 .join()
530 .map(|()| self.result.lock().unwrap().take().unwrap())
531 }
532
533 /// Returns a handle to the underlying thread.
534 ///
535 /// # Examples
536 ///
537 /// ```
538 /// use crossbeam_utils::thread;
539 ///
540 /// thread::scope(|s| {
541 /// let handle = s.spawn(|_| println!("A child thread is running"));
542 /// println!("The child thread ID: {:?}", handle.thread().id());
543 /// }).unwrap();
544 /// ```
545 pub fn thread(&self) -> &thread::Thread {
546 &self.thread
547 }
548}
549
550cfg_if! {
551 if #[cfg(unix)] {
552 use std::os::unix::thread::{JoinHandleExt, RawPthread};
553
554 impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> {
555 fn as_pthread_t(&self) -> RawPthread {
556 // Borrow the handle. The handle will surely be available because the root scope waits
557 // for nested scopes before joining remaining threads.
558 let handle = self.handle.lock().unwrap();
559 handle.as_ref().unwrap().as_pthread_t()
560 }
561 fn into_pthread_t(self) -> RawPthread {
562 self.as_pthread_t()
563 }
564 }
565 } else if #[cfg(windows)] {
566 use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle};
567
568 impl<T> AsRawHandle for ScopedJoinHandle<'_, T> {
569 fn as_raw_handle(&self) -> RawHandle {
570 // Borrow the handle. The handle will surely be available because the root scope waits
571 // for nested scopes before joining remaining threads.
572 let handle = self.handle.lock().unwrap();
573 handle.as_ref().unwrap().as_raw_handle()
574 }
575 }
576
577 impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> {
578 fn into_raw_handle(self) -> RawHandle {
579 self.as_raw_handle()
580 }
581 }
582 }
583}
584
585impl<T> fmt::Debug for ScopedJoinHandle<'_, T> {
586 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
587 f.pad("ScopedJoinHandle { .. }")
588 }
589}
590