| 1 | //! The inner logic for thread spawning and joining. |
| 2 | |
| 3 | use super::current::set_current; |
| 4 | use super::id::ThreadId; |
| 5 | use super::scoped::ScopeData; |
| 6 | use super::thread::Thread; |
| 7 | use super::{Result, spawnhook}; |
| 8 | use crate::cell::UnsafeCell; |
| 9 | use crate::marker::PhantomData; |
| 10 | use crate::mem::{ManuallyDrop, MaybeUninit}; |
| 11 | use crate::sync::Arc; |
| 12 | use crate::sync::atomic::{Atomic, AtomicUsize, Ordering}; |
| 13 | use crate::sys::{AsInner, IntoInner, thread as imp}; |
| 14 | use crate::{env, io, panic}; |
| 15 | |
/// Spawns a new thread that runs `f` and returns the `JoinInner` used to
/// join it and collect its result.
///
/// # Arguments
///
/// * `name` - optional name handed to `Thread::new` for the new thread's handle.
/// * `stack_size` - stack size passed to `imp::Thread::new`. When `None`, the
///   `RUST_MIN_STACK` environment variable is used if it parses as a `usize`,
///   falling back to `imp::DEFAULT_MIN_STACK_SIZE`; that default is cached in
///   a static after the first lookup.
/// * `no_hooks` - when `true`, `spawnhook::run_spawn_hooks` is skipped and a
///   default `ChildSpawnHooks` is used instead.
/// * `scope_data` - stored in the thread's `Packet`; when `Some`, its
///   running-thread count is incremented before the thread is started.
/// * `f` - the closure to run on the new thread; its return value (or the
///   panic caught while running it) is stored in the packet.
///
/// # Errors
///
/// Propagates the `io::Error` returned by `imp::Thread::new`.
///
/// # Safety
///
/// The closure's lifetime is erased to `'static` before it is handed to the
/// platform layer. The caller must guarantee that the passed-in lifetime is
/// sufficiently long for the lifetime of the thread.
#[cfg_attr (miri, track_caller)] // even without panics, this helps for Miri backtraces
pub(super) unsafe fn spawn_unchecked<'scope, F, T>(
    name: Option<String>,
    stack_size: Option<usize>,
    no_hooks: bool,
    scope_data: Option<Arc<ScopeData>>,
    f: F,
) -> io::Result<JoinInner<'scope, T>>
where
    F: FnOnce() -> T,
    F: Send,
    T: Send,
{
    let stack_size = stack_size.unwrap_or_else(|| {
        // Cache of the default stack size, stored as `size + 1` so that 0 can
        // mean "not computed yet".
        static MIN: Atomic<usize> = AtomicUsize::new(0);

        match MIN.load(Ordering::Relaxed) {
            0 => {}
            // Already initialized: undo the `+ 1` applied when storing.
            // (`return` here leaves the closure, not `spawn_unchecked`.)
            n => return n - 1,
        }

        // Unset, non-UTF-8 and non-numeric values all fall back to the
        // platform default.
        let amt = env::var_os("RUST_MIN_STACK" )
            .and_then(|s| s.to_str().and_then(|s| s.parse().ok()))
            .unwrap_or(imp::DEFAULT_MIN_STACK_SIZE);

        // 0 is our sentinel value, so ensure that we'll never see 0 after
        // initialization has run
        MIN.store(amt + 1, Ordering::Relaxed);
        amt
    });

    let id = ThreadId::new();
    let thread = Thread::new(id, name);

    // Spawn hooks run here, on the spawning thread; the value they produce is
    // moved into the new thread and run there (see `hooks.run()` below).
    let hooks = if no_hooks {
        spawnhook::ChildSpawnHooks::default()
    } else {
        spawnhook::run_spawn_hooks(&thread)
    };

    // Two handles to the same packet: `my_packet` ends up in the returned
    // `JoinInner`, `their_packet` is moved into the new thread's closure.
    let my_packet: Arc<Packet<'scope, T>> =
        Arc::new(Packet { scope: scope_data, result: UnsafeCell::new(None), _marker: PhantomData });
    let their_packet = my_packet.clone();

    // Pass `f` in `MaybeUninit` because actually that closure might *run longer than the lifetime of `F`*.
    // See <https://github.com/rust-lang/rust/issues/101983> for more details.
    // To prevent leaks we use a wrapper that drops its contents.
    #[repr (transparent)]
    struct MaybeDangling<T>(MaybeUninit<T>);
    impl<T> MaybeDangling<T> {
        fn new(x: T) -> Self {
            MaybeDangling(MaybeUninit::new(x))
        }
        fn into_inner(self) -> T {
            // Make sure we don't drop.
            let this = ManuallyDrop::new(self);
            // SAFETY: we are always initialized.
            unsafe { this.0.assume_init_read() }
        }
    }
    impl<T> Drop for MaybeDangling<T> {
        fn drop(&mut self) {
            // SAFETY: we are always initialized.
            unsafe { self.0.assume_init_drop() };
        }
    }

    let f = MaybeDangling::new(f);

    // The entrypoint of the Rust thread, after platform-specific thread
    // initialization is done.
    let rust_start = move || {
        let f = f.into_inner();
        // Run the child spawn hooks first, then the user closure. A panic in
        // either is caught here and becomes the thread's `Err` result.
        let try_result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            crate::sys::backtrace::__rust_begin_short_backtrace(|| hooks.run());
            crate::sys::backtrace::__rust_begin_short_backtrace(f)
        }));
        // SAFETY: `their_packet` has been built just above and moved by the
        // closure (it is an Arc<...>) and `my_packet` will be stored in the
        // same `JoinInner` as this closure meaning the mutation will be
        // safe (not modify it and affect a value far away).
        unsafe { *their_packet.result.get() = Some(try_result) };
        // Here `their_packet` gets dropped, and if this is the last `Arc` for that packet that
        // will call `decrement_num_running_threads` and therefore signal that this thread is
        // done.
        drop(their_packet);
        // Here, the lifetime `'scope` can end. `main` keeps running for a bit
        // after that before returning itself.
    };

    // Count the thread as running before it is actually started; the matching
    // decrement happens in `Packet`'s `Drop` impl.
    if let Some(scope_data) = &my_packet.scope {
        scope_data.increment_num_running_threads();
    }

    // SAFETY: dynamic size and alignment of the Box remain the same. See below for why the
    // lifetime change is justified.
    let rust_start = unsafe {
        let ptr = Box::into_raw(Box::new(rust_start));
        let ptr = crate::mem::transmute::<
            *mut (dyn FnOnce() + Send + '_),
            *mut (dyn FnOnce() + Send + 'static),
        >(ptr);
        Box::from_raw(ptr)
    };

    let init = Box::new(ThreadInit { handle: thread.clone(), rust_start });

    Ok(JoinInner {
        // SAFETY:
        //
        // `imp::Thread::new` takes a closure with a `'static` lifetime, since it's passed
        // through FFI or otherwise used with low-level threading primitives that have no
        // notion of or way to enforce lifetimes.
        //
        // As mentioned in the `Safety` section of this function's documentation, the caller of
        // this function needs to guarantee that the passed-in lifetime is sufficiently long
        // for the lifetime of the thread.
        //
        // Similarly, the `sys` implementation must guarantee that no references to the closure
        // exist after the thread has terminated, which is signaled by `Thread::join`
        // returning.
        native: unsafe { imp::Thread::new(stack_size, init)? },
        thread,
        packet: my_packet,
    })
}
| 142 | |
/// The data passed to the spawned thread for thread initialization. Any thread
/// implementation should start a new thread by calling `.init()` on this before
/// doing anything else to ensure the current thread is properly initialized and
/// the global allocator works.
pub(crate) struct ThreadInit {
    /// Handle of the thread being started; `init` registers it as the current
    /// thread and applies its name, if it has one.
    pub handle: Thread,
    /// The Rust entry point of the thread, handed back by `init`.
    pub rust_start: Box<dyn FnOnce() + Send>,
}
| 151 | |
| 152 | impl ThreadInit { |
| 153 | /// Initialize the 'current thread' mechanism on this thread, returning the |
| 154 | /// Rust entry point. |
| 155 | pub fn init(self: Box<Self>) -> Box<dyn FnOnce() + Send> { |
| 156 | // Set the current thread before any (de)allocations on the global allocator occur, |
| 157 | // so that it may call std::thread::current() in its implementation. This is also |
| 158 | // why we take Box<Self>, to ensure the Box is not destroyed until after this point. |
| 159 | // Cloning the handle does not invoke the global allocator, it is an Arc. |
| 160 | if let Err(_thread) = set_current(self.handle.clone()) { |
| 161 | // The current thread should not have set yet. Use an abort to save binary size (see #123356). |
| 162 | rtabort!("current thread handle already set during thread spawn" ); |
| 163 | } |
| 164 | |
| 165 | if let Some(name) = self.handle.cname() { |
| 166 | imp::set_name(name); |
| 167 | } |
| 168 | |
| 169 | self.rust_start |
| 170 | } |
| 171 | } |
| 172 | |
// This packet is used to communicate the return value between the spawned
// thread and the rest of the program. It is shared through an `Arc` and
// there's no need for a mutex here because synchronization happens with `join()`
// (the caller will never read this packet until the thread has exited).
//
// An `Arc` to the packet is stored in a `JoinInner`, which in turn is placed
// in a `JoinHandle`.
struct Packet<'scope, T> {
    // `Some` when the thread was spawned with scope data; used on drop to
    // tell the scope that this thread is done.
    scope: Option<Arc<ScopeData>>,
    // `None` until the spawned thread stores the outcome of its closure
    // (`Err` holds the caught panic); `join()` takes the value back out.
    result: UnsafeCell<Option<Result<T>>>,
    // Carries the `'scope` lifetime in the type without storing a reference.
    _marker: PhantomData<Option<&'scope ScopeData>>,
}
| 185 | |
// Due to the usage of `UnsafeCell` we need to manually implement Sync.
// The type `T` should already always be Send (otherwise the thread could not
// have been created) and the Packet is Sync because all accesses to the
// `UnsafeCell` are synchronized (by the `join()` boundary), and `ScopeData` is Sync.
unsafe impl<'scope, T: Send> Sync for Packet<'scope, T> {}
| 191 | |
// Dropping the packet drops any result that was never taken by `join()` and,
// for packets that carry scope data, reports to the scope that the thread is
// finished.
impl<'scope, T> Drop for Packet<'scope, T> {
    fn drop(&mut self) {
        // If this packet was for a thread that ran in a scope, the thread
        // panicked, and nobody consumed the panic payload, we make sure
        // the scope function will panic.
        // (This has to be computed before the result is cleared below.)
        let unhandled_panic = matches!(self.result.get_mut(), Some(Err(_)));
        // Drop the result without causing unwinding.
        // This is only relevant for threads that aren't join()ed, as
        // join() will take the `result` and set it to None, such that
        // there is nothing left to drop here.
        // If this panics, we should handle that, because we're outside the
        // outermost `catch_unwind` of our thread.
        // We just abort in that case, since there's nothing else we can do.
        // (And even if we tried to handle it somehow, we'd also need to handle
        // the case where the panic payload we get out of it also panics on
        // drop, and so on. See issue #86027.)
        if let Err(_) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            *self.result.get_mut() = None;
        })) {
            rtabort!("thread result panicked on drop" );
        }
        // Book-keeping so the scope knows when it's done.
        if let Some(scope) = &self.scope {
            // Now that there will be no more user code running on this thread
            // that can use 'scope, mark the thread as 'finished'.
            // It's important we only do this after the `result` has been dropped,
            // since dropping it might still use things it borrowed from 'scope.
            scope.decrement_num_running_threads(unhandled_panic);
        }
    }
}
| 223 | |
/// Inner representation for JoinHandle
pub(super) struct JoinInner<'scope, T> {
    /// Platform-specific thread handle; `join()` waits on it.
    native: imp::Thread,
    /// Handle to the spawned thread, exposed through `thread()`.
    thread: Thread,
    /// Packet shared with the spawned thread, from which `join()` takes the
    /// thread's result.
    packet: Arc<Packet<'scope, T>>,
}
| 230 | |
| 231 | impl<'scope, T> JoinInner<'scope, T> { |
| 232 | pub(super) fn is_finished(&self) -> bool { |
| 233 | Arc::strong_count(&self.packet) == 1 |
| 234 | } |
| 235 | |
| 236 | pub(super) fn thread(&self) -> &Thread { |
| 237 | &self.thread |
| 238 | } |
| 239 | |
| 240 | pub(super) fn join(mut self) -> Result<T> { |
| 241 | self.native.join(); |
| 242 | Arc::get_mut(&mut self.packet) |
| 243 | // FIXME(fuzzypixelz): returning an error instead of panicking here |
| 244 | // would require updating the documentation of |
| 245 | // `std::thread::Result`; currently we can return `Err` if and only |
| 246 | // if the thread had panicked. |
| 247 | .expect("threads should not terminate unexpectedly" ) |
| 248 | .result |
| 249 | .get_mut() |
| 250 | .take() |
| 251 | .unwrap() |
| 252 | } |
| 253 | } |
| 254 | |
| 255 | impl<T> AsInner<imp::Thread> for JoinInner<'static, T> { |
| 256 | fn as_inner(&self) -> &imp::Thread { |
| 257 | &self.native |
| 258 | } |
| 259 | } |
| 260 | |
| 261 | impl<T> IntoInner<imp::Thread> for JoinInner<'static, T> { |
| 262 | fn into_inner(self) -> imp::Thread { |
| 263 | self.native |
| 264 | } |
| 265 | } |
| 266 | |