1 | //! Threads that can borrow variables from the stack. |
2 | //! |
3 | //! Create a scope when spawned threads need to access variables on the stack: |
4 | //! |
5 | //! ``` |
6 | //! use crossbeam_utils::thread; |
7 | //! |
8 | //! let people = vec![ |
9 | //! "Alice" .to_string(), |
10 | //! "Bob" .to_string(), |
11 | //! "Carol" .to_string(), |
12 | //! ]; |
13 | //! |
14 | //! thread::scope(|s| { |
15 | //! for person in &people { |
16 | //! s.spawn(move |_| { |
17 | //! println!("Hello, {}!" , person); |
18 | //! }); |
19 | //! } |
20 | //! }).unwrap(); |
21 | //! ``` |
22 | //! |
23 | //! # Why scoped threads? |
24 | //! |
25 | //! Suppose we wanted to re-write the previous example using plain threads: |
26 | //! |
27 | //! ```compile_fail,E0597 |
28 | //! use std::thread; |
29 | //! |
30 | //! let people = vec![ |
31 | //! "Alice" .to_string(), |
32 | //! "Bob" .to_string(), |
33 | //! "Carol" .to_string(), |
34 | //! ]; |
35 | //! |
36 | //! let mut threads = Vec::new(); |
37 | //! |
38 | //! for person in &people { |
39 | //! threads.push(thread::spawn(move || { |
40 | //! println!("Hello, {}!" , person); |
41 | //! })); |
42 | //! } |
43 | //! |
44 | //! for thread in threads { |
45 | //! thread.join().unwrap(); |
46 | //! } |
47 | //! ``` |
48 | //! |
49 | //! This doesn't work because the borrow checker complains about `people` not living long enough: |
50 | //! |
51 | //! ```text |
52 | //! error[E0597]: `people` does not live long enough |
53 | //! --> src/main.rs:12:20 |
54 | //! | |
55 | //! 12 | for person in &people { |
56 | //! | ^^^^^^ borrowed value does not live long enough |
57 | //! ... |
58 | //! 21 | } |
59 | //! | - borrowed value only lives until here |
60 | //! | |
61 | //! = note: borrowed value must be valid for the static lifetime... |
62 | //! ``` |
63 | //! |
//! The problem here is that spawned threads are not allowed to borrow variables on the stack
//! because the compiler cannot prove they will be joined before `people` is destroyed.
66 | //! |
67 | //! Scoped threads are a mechanism to guarantee to the compiler that spawned threads will be joined |
68 | //! before the scope ends. |
69 | //! |
70 | //! # How scoped threads work |
71 | //! |
72 | //! If a variable is borrowed by a thread, the thread must complete before the variable is |
73 | //! destroyed. Threads spawned using [`std::thread::spawn`] can only borrow variables with the |
74 | //! `'static` lifetime because the borrow checker cannot be sure when the thread will complete. |
75 | //! |
76 | //! A scope creates a clear boundary between variables outside the scope and threads inside the |
77 | //! scope. Whenever a scope spawns a thread, it promises to join the thread before the scope ends. |
78 | //! This way we guarantee to the borrow checker that scoped threads only live within the scope and |
79 | //! can safely access variables outside it. |
80 | //! |
81 | //! # Nesting scoped threads |
82 | //! |
//! Sometimes scoped threads need to spawn more threads within the same scope. This is a little
//! tricky because the argument `s` lives *inside* the invocation of `thread::scope()` and as such
//! cannot be borrowed by scoped threads:
86 | //! |
87 | //! ```compile_fail,E0373,E0521 |
88 | //! use crossbeam_utils::thread; |
89 | //! |
90 | //! thread::scope(|s| { |
91 | //! s.spawn(|_| { |
92 | //! // Not going to compile because we're trying to borrow `s`, |
93 | //! // which lives *inside* the scope! :( |
94 | //! s.spawn(|_| println!("nested thread" )); |
95 | //! }); |
96 | //! }); |
97 | //! ``` |
98 | //! |
99 | //! Fortunately, there is a solution. Every scoped thread is passed a reference to its scope as an |
100 | //! argument, which can be used for spawning nested threads: |
101 | //! |
102 | //! ``` |
103 | //! use crossbeam_utils::thread; |
104 | //! |
105 | //! thread::scope(|s| { |
106 | //! // Note the `|s|` here. |
107 | //! s.spawn(|s| { |
108 | //! // Yay, this works because we're using a fresh argument `s`! :) |
109 | //! s.spawn(|_| println!("nested thread" )); |
110 | //! }); |
111 | //! }).unwrap(); |
112 | //! ``` |
113 | |
114 | use std::fmt; |
115 | use std::io; |
116 | use std::marker::PhantomData; |
117 | use std::mem; |
118 | use std::panic; |
119 | use std::sync::{Arc, Mutex}; |
120 | use std::thread; |
121 | |
122 | use crate::sync::WaitGroup; |
123 | use cfg_if::cfg_if; |
124 | |
/// A `Vec<T>` guarded by a `Mutex` and reference-counted with `Arc`, so that several owners can
/// push to and drain the same list.
type SharedVec<T> = Arc<Mutex<Vec<T>>>;
/// An `Option<T>` guarded by a `Mutex` and reference-counted with `Arc`, so that one owner can
/// fill the slot and another can take the value out.
type SharedOption<T> = Arc<Mutex<Option<T>>>;
127 | |
128 | /// Creates a new scope for spawning threads. |
129 | /// |
130 | /// All child threads that haven't been manually joined will be automatically joined just before |
131 | /// this function invocation ends. If all joined threads have successfully completed, `Ok` is |
132 | /// returned with the return value of `f`. If any of the joined threads has panicked, an `Err` is |
133 | /// returned containing errors from panicked threads. Note that if panics are implemented by |
134 | /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind]. |
135 | /// |
136 | /// **Note:** Since Rust 1.63, this function is soft-deprecated in favor of the more efficient [`std::thread::scope`]. |
137 | /// |
138 | /// # Examples |
139 | /// |
140 | /// ``` |
141 | /// use crossbeam_utils::thread; |
142 | /// |
143 | /// let var = vec![1, 2, 3]; |
144 | /// |
145 | /// thread::scope(|s| { |
146 | /// s.spawn(|_| { |
147 | /// println!("A child thread borrowing `var`: {:?}" , var); |
148 | /// }); |
149 | /// }).unwrap(); |
150 | /// ``` |
151 | pub fn scope<'env, F, R>(f: F) -> thread::Result<R> |
152 | where |
153 | F: FnOnce(&Scope<'env>) -> R, |
154 | { |
155 | let wg = WaitGroup::new(); |
156 | let scope = Scope::<'env> { |
157 | handles: SharedVec::default(), |
158 | wait_group: wg.clone(), |
159 | _marker: PhantomData, |
160 | }; |
161 | |
162 | // Execute the scoped function, but catch any panics. |
163 | let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope))); |
164 | |
165 | // Wait until all nested scopes are dropped. |
166 | drop(scope.wait_group); |
167 | wg.wait(); |
168 | |
169 | // Join all remaining spawned threads. |
170 | let panics: Vec<_> = scope |
171 | .handles |
172 | .lock() |
173 | .unwrap() |
174 | // Filter handles that haven't been joined, join them, and collect errors. |
175 | .drain(..) |
176 | .filter_map(|handle| handle.lock().unwrap().take()) |
177 | .filter_map(|handle| handle.join().err()) |
178 | .collect(); |
179 | |
180 | // If `f` has panicked, resume unwinding. |
181 | // If any of the child threads have panicked, return the panic errors. |
182 | // Otherwise, everything is OK and return the result of `f`. |
183 | match result { |
184 | Err(err) => panic::resume_unwind(err), |
185 | Ok(res) => { |
186 | if panics.is_empty() { |
187 | Ok(res) |
188 | } else { |
189 | Err(Box::new(panics)) |
190 | } |
191 | } |
192 | } |
193 | } |
194 | |
/// A scope for spawning threads.
///
/// A reference to a `Scope` is passed to the closure given to [`scope`] and to every closure
/// spawned through [`Scope::spawn`] or [`ScopedThreadBuilder::spawn`].
pub struct Scope<'env> {
    /// The list of the thread join handles.
    ///
    /// Each entry is an optional handle: it is emptied when the thread is joined, either manually
    /// or by [`scope`] itself.
    handles: SharedVec<SharedOption<thread::JoinHandle<()>>>,

    /// Used to wait until all subscopes are dropped.
    wait_group: WaitGroup,

    /// Borrows data with invariant lifetime `'env`.
    _marker: PhantomData<&'env mut &'env ()>,
}
206 | |
// NOTE(review): this manually asserts that `&Scope` may be shared between threads. The fields are
// an `Arc<Mutex<..>>`, a `WaitGroup`, and a `PhantomData` marker; presumably one of them is not
// automatically `Sync`, which is why the impl is written by hand. Confirm which one and record
// the soundness argument in a `SAFETY:` comment.
unsafe impl Sync for Scope<'_> {}
208 | |
209 | impl<'env> Scope<'env> { |
210 | /// Spawns a scoped thread. |
211 | /// |
212 | /// This method is similar to the [`spawn`] function in Rust's standard library. The difference |
213 | /// is that this thread is scoped, meaning it's guaranteed to terminate before the scope exits, |
214 | /// allowing it to reference variables outside the scope. |
215 | /// |
216 | /// The scoped thread is passed a reference to this scope as an argument, which can be used for |
217 | /// spawning nested threads. |
218 | /// |
219 | /// The returned [handle](ScopedJoinHandle) can be used to manually |
220 | /// [join](ScopedJoinHandle::join) the thread before the scope exits. |
221 | /// |
222 | /// This will create a thread using default parameters of [`ScopedThreadBuilder`], if you want to specify the |
223 | /// stack size or the name of the thread, use this API instead. |
224 | /// |
225 | /// [`spawn`]: std::thread::spawn |
226 | /// |
227 | /// # Panics |
228 | /// |
229 | /// Panics if the OS fails to create a thread; use [`ScopedThreadBuilder::spawn`] |
230 | /// to recover from such errors. |
231 | /// |
232 | /// # Examples |
233 | /// |
234 | /// ``` |
235 | /// use crossbeam_utils::thread; |
236 | /// |
237 | /// thread::scope(|s| { |
238 | /// let handle = s.spawn(|_| { |
239 | /// println!("A child thread is running" ); |
240 | /// 42 |
241 | /// }); |
242 | /// |
243 | /// // Join the thread and retrieve its result. |
244 | /// let res = handle.join().unwrap(); |
245 | /// assert_eq!(res, 42); |
246 | /// }).unwrap(); |
247 | /// ``` |
248 | pub fn spawn<'scope, F, T>(&'scope self, f: F) -> ScopedJoinHandle<'scope, T> |
249 | where |
250 | F: FnOnce(&Scope<'env>) -> T, |
251 | F: Send + 'env, |
252 | T: Send + 'env, |
253 | { |
254 | self.builder() |
255 | .spawn(f) |
256 | .expect("failed to spawn scoped thread" ) |
257 | } |
258 | |
259 | /// Creates a builder that can configure a thread before spawning. |
260 | /// |
261 | /// # Examples |
262 | /// |
263 | /// ``` |
264 | /// use crossbeam_utils::thread; |
265 | /// |
266 | /// thread::scope(|s| { |
267 | /// s.builder() |
268 | /// .spawn(|_| println!("A child thread is running" )) |
269 | /// .unwrap(); |
270 | /// }).unwrap(); |
271 | /// ``` |
272 | pub fn builder<'scope>(&'scope self) -> ScopedThreadBuilder<'scope, 'env> { |
273 | ScopedThreadBuilder { |
274 | scope: self, |
275 | builder: thread::Builder::new(), |
276 | } |
277 | } |
278 | } |
279 | |
280 | impl fmt::Debug for Scope<'_> { |
281 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
282 | f.pad("Scope { .. }" ) |
283 | } |
284 | } |
285 | |
/// Configures the properties of a new thread.
///
/// The two configurable properties are:
///
/// - [`name`]: Specifies an [associated name for the thread][naming-threads].
/// - [`stack_size`]: Specifies the [desired stack size for the thread][stack-size].
///
/// The [`spawn`] method will take ownership of the builder and return an [`io::Result`] of the
/// thread handle with the given configuration.
///
/// The [`Scope::spawn`] method uses a builder with default configuration and unwraps its return
/// value. You may want to use this builder when you want to recover from a failure to launch a
/// thread.
///
/// # Examples
///
/// ```
/// use crossbeam_utils::thread;
///
/// thread::scope(|s| {
///     s.builder()
///         .spawn(|_| println!("Running a child thread"))
///         .unwrap();
/// }).unwrap();
/// ```
///
/// [`name`]: ScopedThreadBuilder::name
/// [`stack_size`]: ScopedThreadBuilder::stack_size
/// [`spawn`]: ScopedThreadBuilder::spawn
/// [`io::Result`]: std::io::Result
/// [naming-threads]: std::thread#naming-threads
/// [stack-size]: std::thread#stack-size
#[derive(Debug)]
pub struct ScopedThreadBuilder<'scope, 'env> {
    /// The scope in which the thread will be spawned.
    scope: &'scope Scope<'env>,
    /// The standard-library builder that accumulates the name and stack size.
    builder: thread::Builder,
}
323 | |
324 | impl<'scope, 'env> ScopedThreadBuilder<'scope, 'env> { |
325 | /// Sets the name for the new thread. |
326 | /// |
327 | /// The name must not contain null bytes (`\0`). |
328 | /// |
329 | /// For more information about named threads, see [here][naming-threads]. |
330 | /// |
331 | /// # Examples |
332 | /// |
333 | /// ``` |
334 | /// use crossbeam_utils::thread; |
335 | /// use std::thread::current; |
336 | /// |
337 | /// thread::scope(|s| { |
338 | /// s.builder() |
339 | /// .name("my thread" .to_string()) |
340 | /// .spawn(|_| assert_eq!(current().name(), Some("my thread" ))) |
341 | /// .unwrap(); |
342 | /// }).unwrap(); |
343 | /// ``` |
344 | /// |
345 | /// [naming-threads]: std::thread#naming-threads |
346 | pub fn name(mut self, name: String) -> ScopedThreadBuilder<'scope, 'env> { |
347 | self.builder = self.builder.name(name); |
348 | self |
349 | } |
350 | |
351 | /// Sets the size of the stack for the new thread. |
352 | /// |
353 | /// The stack size is measured in bytes. |
354 | /// |
355 | /// For more information about the stack size for threads, see [here][stack-size]. |
356 | /// |
357 | /// # Examples |
358 | /// |
359 | /// ``` |
360 | /// use crossbeam_utils::thread; |
361 | /// |
362 | /// thread::scope(|s| { |
363 | /// s.builder() |
364 | /// .stack_size(32 * 1024) |
365 | /// .spawn(|_| println!("Running a child thread" )) |
366 | /// .unwrap(); |
367 | /// }).unwrap(); |
368 | /// ``` |
369 | /// |
370 | /// [stack-size]: std::thread#stack-size |
371 | pub fn stack_size(mut self, size: usize) -> ScopedThreadBuilder<'scope, 'env> { |
372 | self.builder = self.builder.stack_size(size); |
373 | self |
374 | } |
375 | |
376 | /// Spawns a scoped thread with this configuration. |
377 | /// |
378 | /// The scoped thread is passed a reference to this scope as an argument, which can be used for |
379 | /// spawning nested threads. |
380 | /// |
381 | /// The returned handle can be used to manually join the thread before the scope exits. |
382 | /// |
383 | /// # Errors |
384 | /// |
385 | /// Unlike the [`Scope::spawn`] method, this method yields an |
386 | /// [`io::Result`] to capture any failure to create the thread at |
387 | /// the OS level. |
388 | /// |
389 | /// [`io::Result`]: std::io::Result |
390 | /// |
391 | /// # Panics |
392 | /// |
393 | /// Panics if a thread name was set and it contained null bytes. |
394 | /// |
395 | /// # Examples |
396 | /// |
397 | /// ``` |
398 | /// use crossbeam_utils::thread; |
399 | /// |
400 | /// thread::scope(|s| { |
401 | /// let handle = s.builder() |
402 | /// .spawn(|_| { |
403 | /// println!("A child thread is running" ); |
404 | /// 42 |
405 | /// }) |
406 | /// .unwrap(); |
407 | /// |
408 | /// // Join the thread and retrieve its result. |
409 | /// let res = handle.join().unwrap(); |
410 | /// assert_eq!(res, 42); |
411 | /// }).unwrap(); |
412 | /// ``` |
413 | pub fn spawn<F, T>(self, f: F) -> io::Result<ScopedJoinHandle<'scope, T>> |
414 | where |
415 | F: FnOnce(&Scope<'env>) -> T, |
416 | F: Send + 'env, |
417 | T: Send + 'env, |
418 | { |
419 | // The result of `f` will be stored here. |
420 | let result = SharedOption::default(); |
421 | |
422 | // Spawn the thread and grab its join handle and thread handle. |
423 | let (handle, thread) = { |
424 | let result = Arc::clone(&result); |
425 | |
426 | // A clone of the scope that will be moved into the new thread. |
427 | let scope = Scope::<'env> { |
428 | handles: Arc::clone(&self.scope.handles), |
429 | wait_group: self.scope.wait_group.clone(), |
430 | _marker: PhantomData, |
431 | }; |
432 | |
433 | // Spawn the thread. |
434 | let handle = { |
435 | let closure = move || { |
436 | // Make sure the scope is inside the closure with the proper `'env` lifetime. |
437 | let scope: Scope<'env> = scope; |
438 | |
439 | // Run the closure. |
440 | let res = f(&scope); |
441 | |
442 | // Store the result if the closure didn't panic. |
443 | *result.lock().unwrap() = Some(res); |
444 | }; |
445 | |
446 | // Allocate `closure` on the heap and erase the `'env` bound. |
447 | let closure: Box<dyn FnOnce() + Send + 'env> = Box::new(closure); |
448 | let closure: Box<dyn FnOnce() + Send + 'static> = |
449 | unsafe { mem::transmute(closure) }; |
450 | |
451 | // Finally, spawn the closure. |
452 | self.builder.spawn(closure)? |
453 | }; |
454 | |
455 | let thread = handle.thread().clone(); |
456 | let handle = Arc::new(Mutex::new(Some(handle))); |
457 | (handle, thread) |
458 | }; |
459 | |
460 | // Add the handle to the shared list of join handles. |
461 | self.scope.handles.lock().unwrap().push(Arc::clone(&handle)); |
462 | |
463 | Ok(ScopedJoinHandle { |
464 | handle, |
465 | result, |
466 | thread, |
467 | _marker: PhantomData, |
468 | }) |
469 | } |
470 | } |
471 | |
// NOTE(review): these impls place no bound on `T`. The only constructor visible in this file,
// `ScopedThreadBuilder::spawn`, requires `T: Send`, and the stored value is only reachable through
// a `Mutex`; presumably that is what makes the unconditional impls acceptable. Confirm, and record
// the soundness argument in a `SAFETY:` comment.
unsafe impl<T> Send for ScopedJoinHandle<'_, T> {}
unsafe impl<T> Sync for ScopedJoinHandle<'_, T> {}
474 | |
/// A handle that can be used to join its scoped thread.
///
/// This struct is created by the [`Scope::spawn`] method and the
/// [`ScopedThreadBuilder::spawn`] method.
pub struct ScopedJoinHandle<'scope, T> {
    /// A join handle to the spawned thread.
    ///
    /// The slot is shared with the scope's list of handles and becomes empty once the thread has
    /// been joined.
    handle: SharedOption<thread::JoinHandle<()>>,

    /// Holds the result of the inner closure.
    result: SharedOption<T>,

    /// A handle to the spawned thread.
    thread: thread::Thread,

    /// Borrows the parent scope with lifetime `'scope`.
    _marker: PhantomData<&'scope ()>,
}
492 | |
493 | impl<T> ScopedJoinHandle<'_, T> { |
494 | /// Waits for the thread to finish and returns its result. |
495 | /// |
496 | /// If the child thread panics, an error is returned. Note that if panics are implemented by |
497 | /// aborting the process, no error is returned; see the notes of [std::panic::catch_unwind]. |
498 | /// |
499 | /// # Panics |
500 | /// |
501 | /// This function may panic on some platforms if a thread attempts to join itself or otherwise |
502 | /// may create a deadlock with joining threads. |
503 | /// |
504 | /// # Examples |
505 | /// |
506 | /// ``` |
507 | /// use crossbeam_utils::thread; |
508 | /// |
509 | /// thread::scope(|s| { |
510 | /// let handle1 = s.spawn(|_| println!("I'm a happy thread :)" )); |
511 | /// let handle2 = s.spawn(|_| panic!("I'm a sad thread :(" )); |
512 | /// |
513 | /// // Join the first thread and verify that it succeeded. |
514 | /// let res = handle1.join(); |
515 | /// assert!(res.is_ok()); |
516 | /// |
517 | /// // Join the second thread and verify that it panicked. |
518 | /// let res = handle2.join(); |
519 | /// assert!(res.is_err()); |
520 | /// }).unwrap(); |
521 | /// ``` |
522 | pub fn join(self) -> thread::Result<T> { |
523 | // Take out the handle. The handle will surely be available because the root scope waits |
524 | // for nested scopes before joining remaining threads. |
525 | let handle = self.handle.lock().unwrap().take().unwrap(); |
526 | |
527 | // Join the thread and then take the result out of its inner closure. |
528 | handle |
529 | .join() |
530 | .map(|()| self.result.lock().unwrap().take().unwrap()) |
531 | } |
532 | |
533 | /// Returns a handle to the underlying thread. |
534 | /// |
535 | /// # Examples |
536 | /// |
537 | /// ``` |
538 | /// use crossbeam_utils::thread; |
539 | /// |
540 | /// thread::scope(|s| { |
541 | /// let handle = s.spawn(|_| println!("A child thread is running" )); |
542 | /// println!("The child thread ID: {:?}" , handle.thread().id()); |
543 | /// }).unwrap(); |
544 | /// ``` |
545 | pub fn thread(&self) -> &thread::Thread { |
546 | &self.thread |
547 | } |
548 | } |
549 | |
550 | cfg_if! { |
551 | if #[cfg(unix)] { |
552 | use std::os::unix::thread::{JoinHandleExt, RawPthread}; |
553 | |
554 | impl<T> JoinHandleExt for ScopedJoinHandle<'_, T> { |
555 | fn as_pthread_t(&self) -> RawPthread { |
556 | // Borrow the handle. The handle will surely be available because the root scope waits |
557 | // for nested scopes before joining remaining threads. |
558 | let handle = self.handle.lock().unwrap(); |
559 | handle.as_ref().unwrap().as_pthread_t() |
560 | } |
561 | fn into_pthread_t(self) -> RawPthread { |
562 | self.as_pthread_t() |
563 | } |
564 | } |
565 | } else if #[cfg(windows)] { |
566 | use std::os::windows::io::{AsRawHandle, IntoRawHandle, RawHandle}; |
567 | |
568 | impl<T> AsRawHandle for ScopedJoinHandle<'_, T> { |
569 | fn as_raw_handle(&self) -> RawHandle { |
570 | // Borrow the handle. The handle will surely be available because the root scope waits |
571 | // for nested scopes before joining remaining threads. |
572 | let handle = self.handle.lock().unwrap(); |
573 | handle.as_ref().unwrap().as_raw_handle() |
574 | } |
575 | } |
576 | |
577 | impl<T> IntoRawHandle for ScopedJoinHandle<'_, T> { |
578 | fn into_raw_handle(self) -> RawHandle { |
579 | self.as_raw_handle() |
580 | } |
581 | } |
582 | } |
583 | } |
584 | |
585 | impl<T> fmt::Debug for ScopedJoinHandle<'_, T> { |
586 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
587 | f.pad("ScopedJoinHandle { .. }" ) |
588 | } |
589 | } |
590 | |