// Copyright 2009-2021 Intel Corporation
// SPDX-License-Identifier: Apache-2.0

#include "taskschedulerinternal.h"
#include "../math/math.h"
#include "../sys/sysinfo.h"
#include <algorithm>

namespace embree
{
  RTC_NAMESPACE_BEGIN

  static MutexSys g_mutex;
  size_t TaskScheduler::g_numThreads = 0;
  __thread TaskScheduler* TaskScheduler::g_instance = nullptr;
  std::vector<Ref<TaskScheduler>> g_instance_vector;
  __thread TaskScheduler::Thread* TaskScheduler::thread_local_thread = nullptr;
  TaskScheduler::ThreadPool* TaskScheduler::threadPool = nullptr;

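  /* Generic work-stealing back-off loop: spin over the other threads' queues
     for a while, yield after every 32 spin rounds, and restart the back-off
     whenever a steal succeeds. pred() decides when to leave the loop, and
     body() drains the work that a successful steal placed in our own queue. */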
  template<typename Predicate, typename Body>
  __forceinline void TaskScheduler::steal_loop(Thread& thread, const Predicate& pred, const Body& body)
  {
    while (true)
    {
      /*! some rounds that yield */
      for (size_t i=0; i<32; i++)
      {
        /*! some spinning rounds */
        const size_t threadCount = thread.threadCount();
        for (size_t j=0; j<1024; j+=threadCount)
        {
          if (!pred()) return;
          if (thread.scheduler->steal_from_other_threads(thread)) {
            i=j=0;
            body();
          }
        }
        yield();
      }
    }
  }

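  /* A task runs at most once: the owner only executes it if the state is
     still INITIALIZED, i.e. if no other thread has stolen it in the meantime.
     An exception thrown by the closure is captured once into
     cancellingException; while it is set, further task closures of this
     scheduler are skipped, and the exception is rethrown later on the thread
     that entered the scheduler. */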
  /*! run this task */
  void TaskScheduler::Task::run_internal (Thread& thread) // FIXME: avoid as many dll_exports as possible
  {
    /* try to run if not already stolen */
    if (try_switch_state(INITIALIZED,DONE))
    {
      Task* prevTask = thread.task;
      thread.task = this;
      try {
        if (thread.scheduler->cancellingException == nullptr)
          closure->execute();
      } catch (...) {
        if (thread.scheduler->cancellingException == nullptr)
          thread.scheduler->cancellingException = std::current_exception();
      }
      thread.task = prevTask;
      add_dependencies(-1);
    }

    /* steal until all dependencies have completed */
    steal_loop(thread,
               [&] () { return dependencies>0; },
               [&] () { while (thread.tasks.execute_local_internal(thread,this)); });

    /* now signal our parent task that we are finished */
    if (parent)
      parent->add_dependencies(-1);
  }

  /*! run this task */
  dll_export void TaskScheduler::Task::run (Thread& thread) {
    run_internal(thread);
  }

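  /* Each thread owns a work-stealing deque: the owner pushes and pops tasks
     at the right end, while thieves take from the left end. This routine pops
     and runs tasks from the right until the queue is empty or the task we are
     waiting for (parent) is the next one on top. */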
  bool TaskScheduler::TaskQueue::execute_local_internal(Thread& thread, Task* parent)
  {
    /* stop if we run out of local tasks or reach the waiting task */
    if (right == 0 || &tasks[right-1] == parent)
      return false;

    /* execute task */
    size_t oldRight = right;
    tasks[right-1].run_internal(thread);
    if (right != oldRight) {
      THROW_RUNTIME_ERROR("you have to wait for spawned subtasks");
    }

    /* pop task and closure from stack */
    right--;
    if (tasks[right].stackPtr != size_t(-1))
      stackPtr = tasks[right].stackPtr;

    /* also move left pointer */
    if (left >= right) left.store(right.load());

    return right != 0;
  }

  dll_export bool TaskScheduler::TaskQueue::execute_local(Thread& thread, Task* parent) {
    return execute_local_internal(thread,parent);
  }

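  /* Thieves claim the leftmost task by atomically bumping the left index and
     re-checking it against right; losing that race just means there is
     nothing to steal. try_steal then pushes the stolen work onto the thief's
     own queue. */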
  bool TaskScheduler::TaskQueue::steal(Thread& thread)
  {
    size_t l = left;
    size_t r = right;
    if (l < r)
    {
      l = left++;
      if (l >= r)
        return false;
    }
    else
      return false;

    if (!tasks[l].try_steal(thread.tasks.tasks[thread.tasks.right]))
      return false;

    thread.tasks.right++;
    return true;
  }

  /* we steal from the left */
  size_t TaskScheduler::TaskQueue::getTaskSizeAtLeft()
  {
    if (left >= right) return 0;
    return tasks[left].N;
  }

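  /* Entry point of the pool threads. The (pool,index) pair is heap-allocated
     by setNumThreads so it stays valid until the new thread has read it, and
     it is freed here. */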
  void threadPoolFunction(std::pair<TaskScheduler::ThreadPool*,size_t>* pair)
  {
    TaskScheduler::ThreadPool* pool = pair->first;
    size_t threadIndex = pair->second;
    delete pair;
    pool->thread_loop(threadIndex);
  }

  TaskScheduler::ThreadPool::ThreadPool(bool set_affinity)
    : numThreads(0), numThreadsRunning(0), set_affinity(set_affinity), running(false) {}

  dll_export void TaskScheduler::ThreadPool::startThreads()
  {
    if (running) return;
    setNumThreads(numThreads,true);
  }

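  /* Grows or shrinks the pool to newNumThreads (clamped to the number of
     logical cores). Index 0 stands for the calling thread and never gets a
     pool thread of its own. Surplus threads observe the lowered
     numThreadsRunning, leave their loop, and are joined below. */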
  void TaskScheduler::ThreadPool::setNumThreads(size_t newNumThreads, bool startThreads)
  {
    Lock<MutexSys> lock(g_mutex);
    assert(newNumThreads);
    newNumThreads = min(newNumThreads, (size_t) getNumberOfLogicalThreads());

    numThreads = newNumThreads;
    if (!startThreads && !running) return;
    running = true;
    size_t numThreadsActive = numThreadsRunning;

    mutex.lock();
    numThreadsRunning = newNumThreads;
    mutex.unlock();
    condition.notify_all();

    /* start new threads */
    for (size_t t=numThreadsActive; t<numThreads; t++)
    {
      if (t == 0) continue;
      auto pair = new std::pair<TaskScheduler::ThreadPool*,size_t>(this,t);
      threads.push_back(createThread((thread_func)threadPoolFunction,pair,4*1024*1024,set_affinity ? t : -1));
    }

    /* stop some threads if we reduce the number of threads */
    for (ssize_t t=numThreadsActive-1; t>=ssize_t(numThreadsRunning); t--) {
      if (t == 0) continue;
      embree::join(threads.back());
      threads.pop_back();
    }
  }

  TaskScheduler::ThreadPool::~ThreadPool()
  {
    /* leave all taskschedulers */
    mutex.lock();
    numThreadsRunning = 0;
    mutex.unlock();
    condition.notify_all();

    /* wait for threads to terminate */
    for (size_t i=0; i<threads.size(); i++)
      embree::join(threads[i]);
  }

  dll_export void TaskScheduler::ThreadPool::add(const Ref<TaskScheduler>& scheduler)
  {
    mutex.lock();
    schedulers.push_back(scheduler);
    mutex.unlock();
    condition.notify_all();
  }

  dll_export void TaskScheduler::ThreadPool::remove(const Ref<TaskScheduler>& scheduler)
  {
    Lock<MutexSys> lock(mutex);
    for (std::list<Ref<TaskScheduler> >::iterator it = schedulers.begin(); it != schedulers.end(); it++) {
      if (scheduler == *it) {
        schedulers.erase(it);
        return;
      }
    }
  }

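  /* Worker loop of a pool thread: block until a scheduler is queued (or the
     pool shuts down), attach to the front scheduler under the lock, and serve
     it until it runs out of work; then go back to waiting. */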
  void TaskScheduler::ThreadPool::thread_loop(size_t globalThreadIndex)
  {
    while (globalThreadIndex < numThreadsRunning)
    {
      Ref<TaskScheduler> scheduler = nullptr;
      ssize_t threadIndex = -1;
      {
        Lock<MutexSys> lock(mutex);
        condition.wait(mutex, [&] () { return globalThreadIndex >= numThreadsRunning || !schedulers.empty(); });
        if (globalThreadIndex >= numThreadsRunning) break;
        scheduler = schedulers.front();
        threadIndex = scheduler->allocThreadIndex();
      }
      scheduler->thread_loop(threadIndex);
    }
  }

  TaskScheduler::TaskScheduler()
    : threadCounter(0), anyTasksRunning(0), hasRootTask(false)
  {
    threadLocal.resize(2*getNumberOfLogicalThreads()); // FIXME: this has to be 2x as in the compatibility join mode with rtcCommitScene the worker threads also join. When disallowing rtcCommitScene to join a build we can remove the 2x.
    for (size_t i=0; i<threadLocal.size(); i++)
      threadLocal[i].store(nullptr);
  }

  TaskScheduler::~TaskScheduler()
  {
    assert(threadCounter == 0);
  }

  dll_export size_t TaskScheduler::threadID()
  {
    Thread* thread = TaskScheduler::thread();
    if (thread) return thread->threadIndex;
    else        return 0;
  }

  dll_export size_t TaskScheduler::threadIndex()
  {
    Thread* thread = TaskScheduler::thread();
    if (thread) return thread->threadIndex;
    else        return 0;
  }

  dll_export size_t TaskScheduler::threadCount() {
    return threadPool->size();
  }

  dll_export TaskScheduler* TaskScheduler::instance()
  {
    if (g_instance == nullptr) {
      Lock<MutexSys> lock(g_mutex);
      g_instance = new TaskScheduler;
      g_instance_vector.push_back(g_instance);
    }
    return g_instance;
  }

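  /* Typical setup, as a rough sketch only (spawn_root is declared in
     taskschedulerinternal.h):

       TaskScheduler::create(numThreads,set_affinity,start_threads);
       TaskScheduler::instance()->spawn_root([&] () { ... parallel work ... });
       TaskScheduler::destroy();
  */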
  void TaskScheduler::create(size_t numThreads, bool set_affinity, bool start_threads)
  {
    if (!threadPool) threadPool = new TaskScheduler::ThreadPool(set_affinity);
    threadPool->setNumThreads(numThreads,start_threads);
  }

  void TaskScheduler::destroy() {
    delete threadPool; threadPool = nullptr;
  }

  dll_export ssize_t TaskScheduler::allocThreadIndex()
  {
    size_t threadIndex = threadCounter++;
    assert(threadIndex < threadLocal.size());
    return threadIndex;
  }

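  /* Lets an additional thread join an ongoing build (the compatibility mode
     in which rtcCommitScene worker threads also join, see the FIXME in the
     constructor): allocate a thread index, wait until a root task exists,
     run the regular thread loop, and rethrow any exception captured during
     task execution. */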
  void TaskScheduler::join()
  {
    mutex.lock();
    size_t threadIndex = allocThreadIndex();
    condition.wait(mutex, [&] () { return hasRootTask.load(); });
    mutex.unlock();
    std::exception_ptr except = thread_loop(threadIndex);
    if (except != nullptr) std::rethrow_exception(except);
  }

  void TaskScheduler::reset() {
    hasRootTask = false;
  }

  void TaskScheduler::wait_for_threads(size_t threadCount)
  {
    while (threadCounter < threadCount-1)
      pause_cpu();
  }

  dll_export TaskScheduler::Thread* TaskScheduler::thread() {
    return thread_local_thread;
  }

  dll_export TaskScheduler::Thread* TaskScheduler::swapThread(Thread* thread)
  {
    Thread* old = thread_local_thread;
    thread_local_thread = thread;
    return old;
  }

  dll_export bool TaskScheduler::wait()
  {
    Thread* thread = TaskScheduler::thread();
    if (thread == nullptr) return true;
    while (thread->tasks.execute_local_internal(*thread,thread->task)) {};
    return thread->scheduler->cancellingException == nullptr;
  }

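  /* Per-thread loop while attached to this scheduler: steal and execute tasks
     as long as any are in flight. On exit the thread deregisters itself and
     spins until all threads have left the scheduler, since others may still
     try to steal from this Thread structure until then. */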
  std::exception_ptr TaskScheduler::thread_loop(size_t threadIndex)
  {
    /* allocate thread structure */
    std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation
    Thread& thread = *mthread;
    threadLocal[threadIndex].store(&thread);
    Thread* oldThread = swapThread(&thread);

    /* main thread loop */
    while (anyTasksRunning)
    {
      steal_loop(thread,
                 [&] () { return anyTasksRunning > 0; },
                 [&] () {
                   anyTasksRunning++;
                   while (thread.tasks.execute_local_internal(thread,nullptr));
                   anyTasksRunning--;
                 });
    }
    threadLocal[threadIndex].store(nullptr);
    swapThread(oldThread);

    /* remember exception to throw */
    std::exception_ptr except = nullptr;
    if (cancellingException != nullptr) except = cancellingException;

    /* wait for all threads to terminate */
    threadCounter--;
#if defined(__WIN32__)
    size_t loopIndex = 1;
#endif
#define LOOP_YIELD_THRESHOLD (4096)
    while (threadCounter > 0) {
#if defined(__WIN32__)
      if ((loopIndex % LOOP_YIELD_THRESHOLD) == 0)
        yield();
      else
        _mm_pause();
      loopIndex++;
#else
      yield();
#endif
    }
    return except;
  }

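  /* Round-robin victim selection: starting at the next thread index, probe
     every other thread's queue once per round, with a short pause between
     probes to reduce contention. */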
  bool TaskScheduler::steal_from_other_threads(Thread& thread)
  {
    const size_t threadIndex = thread.threadIndex;
    const size_t threadCount = this->threadCounter;

    for (size_t i=1; i<threadCount; i++)
    {
      pause_cpu(32);
      size_t otherThreadIndex = threadIndex+i;
      if (otherThreadIndex >= threadCount) otherThreadIndex -= threadCount;

      Thread* othread = threadLocal[otherThreadIndex].load();
      if (!othread)
        continue;

      if (othread->tasks.steal(thread))
        return true;
    }

    return false;
  }

  dll_export void TaskScheduler::startThreads() {
    threadPool->startThreads();
  }

  dll_export void TaskScheduler::addScheduler(const Ref<TaskScheduler>& scheduler) {
    threadPool->add(scheduler);
  }

  dll_export void TaskScheduler::removeScheduler(const Ref<TaskScheduler>& scheduler) {
    threadPool->remove(scheduler);
  }

  RTC_NAMESPACE_END
}