1// Copyright 2009-2021 Intel Corporation
2// SPDX-License-Identifier: Apache-2.0
3
4#include "taskschedulerinternal.h"
5#include "../math/math.h"
6#include "../sys/sysinfo.h"
7#include <algorithm>
8
9namespace embree
10{
11 RTC_NAMESPACE_BEGIN
12
  /* file-scope scheduler state; g_mutex serializes lazy creation of
     per-thread scheduler instances (see TaskScheduler::instance) */
  static MutexSys g_mutex;
  size_t TaskScheduler::g_numThreads = 0;
  __thread TaskScheduler* TaskScheduler::g_instance = nullptr;                  // scheduler of the calling thread (lazily created)
  std::vector<Ref<TaskScheduler>> g_instance_vector;                            // keeps lazily created schedulers alive for process lifetime
  __thread TaskScheduler::Thread* TaskScheduler::thread_local_thread = nullptr; // per-thread worker context while inside a scheduler
  TaskScheduler::ThreadPool* TaskScheduler::threadPool = nullptr;               // global pool shared by all schedulers
19
  /*! Spin/steal loop executed while a thread waits for a condition.
      Polls pred(); while it holds, repeatedly tries to steal work from
      other threads and runs body() after each successful steal.
      Returns as soon as pred() becomes false. */
  template<typename Predicate, typename Body>
  __forceinline void TaskScheduler::steal_loop(Thread& thread, const Predicate& pred, const Body& body)
  {
    while (true)
    {
      /*! some rounds that yield */
      for (size_t i=0; i<32; i++)
      {
        /*! some spinning rounds */
        const size_t threadCount = thread.threadCount();
        for (size_t j=0; j<1024; j+=threadCount)  // spin less per thread when many threads are polling
        {
          if (!pred()) return;
          if (thread.scheduler->steal_from_other_threads(thread)) {
            i=j=0;   // work was found: restart both spin phases before yielding again
            body();
          }
        }
        yield();  // give the OS a chance after a full spin round
      }
    }
  }
42
  /*! Runs this task on the given thread (unless it was stolen already),
      then steals work until all sub-task dependencies have completed, and
      finally signals the parent task. The first exception thrown by any
      closure is recorded and cancels further execution on this scheduler. */
  void TaskScheduler::Task::run_internal (Thread& thread) // FIXME: avoid as many dll_exports as possible
  {
    /* try to run if not already stolen */
    if (try_switch_state(INITIALIZED,DONE))
    {
      Task* prevTask = thread.task;
      thread.task = this;
      try {
        if (thread.scheduler->cancellingException == nullptr)
          closure->execute();
      } catch (...) {
        if (thread.scheduler->cancellingException == nullptr)
          thread.scheduler->cancellingException = std::current_exception();  // keep only the first exception
      }
      thread.task = prevTask;
      add_dependencies(-1);
    }

    /* steal until all dependencies have completed */
    steal_loop(thread,
               [&] () { return dependencies>0; },
               [&] () { while (thread.tasks.execute_local_internal(thread,this)); });

    /* now signal our parent task that we are finished */
    if (parent)
      parent->add_dependencies(-1);
  }
71
72 /*! run this task */
73 dll_export void TaskScheduler::Task::run (Thread& thread) {
74 run_internal(thread);
75 }
76
  /*! Executes the task at the top (right end) of this thread-local queue.
      Returns false when the queue is empty or the waiting 'parent' task is
      reached; otherwise pops the finished task and returns whether more
      local tasks remain. */
  bool TaskScheduler::TaskQueue::execute_local_internal(Thread& thread, Task* parent)
  {
    /* stop if we run out of local tasks or reach the waiting task */
    if (right == 0 || &tasks[right-1] == parent)
      return false;

    /* execute task */
    size_t oldRight = right;
    tasks[right-1].run_internal(thread);
    if (right != oldRight) {  // a task must wait for every sub-task it spawned before returning
      THROW_RUNTIME_ERROR("you have to wait for spawned subtasks");
    }

    /* pop task and closure from stack */
    right--;
    if (tasks[right].stackPtr != size_t(-1))
      stackPtr = tasks[right].stackPtr;  // release the closure storage that task had allocated

    /* also move left pointer */
    if (left >= right) left.store(right.load());  // stealers consumed everything up to 'right'

    return right != 0;
  }
100
101 dll_export bool TaskScheduler::TaskQueue::execute_local(Thread& thread, Task* parent) {
102 return execute_local_internal(thread,parent);
103 }
104
  /*! Tries to steal the task at the left end of this queue into the
      stealing thread's own queue. Returns true on success. */
  bool TaskScheduler::TaskQueue::steal(Thread& thread)
  {
    size_t l = left;
    size_t r = right;
    if (l < r)        // queue appears non-empty
    {
      l = left++;     // atomically claim the leftmost slot
      if (l >= r)     // lost the race against the owner or another thief
        return false;
    }
    else
      return false;

    if (!tasks[l].try_steal(thread.tasks.tasks[thread.tasks.right]))
      return false;   // task was no longer in a stealable state

    thread.tasks.right++;  // publish the stolen task in the thief's queue
    return true;
  }
124
125 /* we steal from the left */
126 size_t TaskScheduler::TaskQueue::getTaskSizeAtLeft()
127 {
128 if (left >= right) return 0;
129 return tasks[left].N;
130 }
131
132 void threadPoolFunction(std::pair<TaskScheduler::ThreadPool*,size_t>* pair)
133 {
134 TaskScheduler::ThreadPool* pool = pair->first;
135 size_t threadIndex = pair->second;
136 delete pair;
137 pool->thread_loop(threadIndex);
138 }
139
  /*! creates an empty thread pool; threads are created lazily via startThreads()/setNumThreads() */
  TaskScheduler::ThreadPool::ThreadPool(bool set_affinity)
    : numThreads(0), numThreadsRunning(0), set_affinity(set_affinity), running(false) {}
142
143 dll_export void TaskScheduler::ThreadPool::startThreads()
144 {
145 if (running) return;
146 setNumThreads(numThreads,startThreads: true);
147 }
148
  /*! Resizes the pool to newNumThreads (clamped to the number of logical
      cores). When startThreads is set, or threads are already running,
      missing threads are created and surplus threads are joined. Index 0
      is reserved for the application thread and is never created here. */
  void TaskScheduler::ThreadPool::setNumThreads(size_t newNumThreads, bool startThreads)
  {
    Lock<MutexSys> lock(g_mutex);  // serialize concurrent resize requests
    assert(newNumThreads);
    newNumThreads = min(newNumThreads, (size_t) getNumberOfLogicalThreads());

    numThreads = newNumThreads;
    if (!startThreads && !running) return;  // just remember the size; threads start later
    running = true;
    size_t numThreadsActive = numThreadsRunning;

    mutex.lock();
    numThreadsRunning = newNumThreads;
    mutex.unlock();
    condition.notify_all();  // let sleeping pool threads re-check numThreadsRunning

    /* start new threads */
    for (size_t t=numThreadsActive; t<numThreads; t++)
    {
      if (t == 0) continue;  // slot 0 belongs to the application thread
      auto pair = new std::pair<TaskScheduler::ThreadPool*,size_t>(this,t);  // freed by threadPoolFunction
      threads.push_back(createThread((thread_func)threadPoolFunction,pair,4*1024*1024,set_affinity ? t : -1));
    }

    /* stop some threads if we reduce the number of threads */
    for (ssize_t t=numThreadsActive-1; t>=ssize_t(numThreadsRunning); t--) {
      if (t == 0) continue;
      embree::join(threads.back());  // threads leave thread_loop once their index >= numThreadsRunning
      threads.pop_back();
    }
  }
180
  /*! shuts the pool down: signals all pool threads to terminate and joins them */
  TaskScheduler::ThreadPool::~ThreadPool()
  {
    /* leave all taskschedulers */
    mutex.lock();
    numThreadsRunning = 0;  // makes every thread_loop() exit its wait/loop
    mutex.unlock();
    condition.notify_all();

    /* wait for threads to terminate */
    for (size_t i=0; i<threads.size(); i++)
      embree::join(threads[i]);
  }
193
194 dll_export void TaskScheduler::ThreadPool::add(const Ref<TaskScheduler>& scheduler)
195 {
196 mutex.lock();
197 schedulers.push_back(x: scheduler);
198 mutex.unlock();
199 condition.notify_all();
200 }
201
202 dll_export void TaskScheduler::ThreadPool::remove(const Ref<TaskScheduler>& scheduler)
203 {
204 Lock<MutexSys> lock(mutex);
205 for (std::list<Ref<TaskScheduler> >::iterator it = schedulers.begin(); it != schedulers.end(); it++) {
206 if (scheduler == *it) {
207 schedulers.erase(position: it);
208 return;
209 }
210 }
211 }
212
  /*! Main loop of one pool thread: sleeps until some scheduler needs
      workers, then serves the front scheduler until it is done with this
      thread; exits once globalThreadIndex is no longer within
      numThreadsRunning (pool shrink or shutdown). */
  void TaskScheduler::ThreadPool::thread_loop(size_t globalThreadIndex)
  {
    while (globalThreadIndex < numThreadsRunning)
    {
      Ref<TaskScheduler> scheduler = NULL;
      ssize_t threadIndex = -1;
      {
        Lock<MutexSys> lock(mutex);
        condition.wait(mutex, [&] () { return globalThreadIndex >= numThreadsRunning || !schedulers.empty(); });
        if (globalThreadIndex >= numThreadsRunning) break;  // told to terminate
        scheduler = schedulers.front();
        threadIndex = scheduler->allocThreadIndex();
      }
      scheduler->thread_loop(threadIndex);  // returns when the scheduler releases this worker
    }
  }
229
230 TaskScheduler::TaskScheduler()
231 : threadCounter(0), anyTasksRunning(0), hasRootTask(false)
232 {
233 threadLocal.resize(new_size: 2*getNumberOfLogicalThreads()); // FIXME: this has to be 2x as in the compatibility join mode with rtcCommitScene the worker threads also join. When disallowing rtcCommitScene to join a build we can remove the 2x.
234 for (size_t i=0; i<threadLocal.size(); i++)
235 threadLocal[i].store(p: nullptr);
236 }
237
  TaskScheduler::~TaskScheduler()
  {
    assert(threadCounter == 0);  // all worker threads must have left thread_loop() by now
  }
242
243 dll_export size_t TaskScheduler::threadID()
244 {
245 Thread* thread = TaskScheduler::thread();
246 if (thread) return thread->threadIndex;
247 else return 0;
248 }
249
250 dll_export size_t TaskScheduler::threadIndex()
251 {
252 Thread* thread = TaskScheduler::thread();
253 if (thread) return thread->threadIndex;
254 else return 0;
255 }
256
  /*! number of threads managed by the global thread pool */
  dll_export size_t TaskScheduler::threadCount() {
    return threadPool->size();
  }
260
261 dll_export TaskScheduler* TaskScheduler::instance()
262 {
263 if (g_instance == NULL) {
264 Lock<MutexSys> lock(g_mutex);
265 g_instance = new TaskScheduler;
266 g_instance_vector.push_back(x: g_instance);
267 }
268 return g_instance;
269 }
270
271 void TaskScheduler::create(size_t numThreads, bool set_affinity, bool start_threads)
272 {
273 if (!threadPool) threadPool = new TaskScheduler::ThreadPool(set_affinity);
274 threadPool->setNumThreads(newNumThreads: numThreads,startThreads: start_threads);
275 }
276
277 void TaskScheduler::destroy() {
278 delete threadPool; threadPool = nullptr;
279 }
280
  /*! reserves the next free thread slot for a thread joining this scheduler */
  dll_export ssize_t TaskScheduler::allocThreadIndex()
  {
    size_t threadIndex = threadCounter++;  // atomic fetch-and-increment
    assert(threadIndex < threadLocal.size());
    return threadIndex;
  }
287
  /*! Lets the calling (non-pool) thread join this scheduler as a worker.
      Blocks until a root task exists, works until all tasks are done, then
      rethrows any exception recorded while running tasks. */
  void TaskScheduler::join()
  {
    mutex.lock();
    size_t threadIndex = allocThreadIndex();
    condition.wait(mutex, [&] () { return hasRootTask.load(); });  // wait for a root task to be spawned
    mutex.unlock();
    std::exception_ptr except = thread_loop(threadIndex);
    if (except != nullptr) std::rethrow_exception(except);
  }
297
  /*! clears the root-task flag so the scheduler can be reused for a new task tree */
  void TaskScheduler::reset() {
    hasRootTask = false;
  }
301
302 void TaskScheduler::wait_for_threads(size_t threadCount)
303 {
304 while (threadCounter < threadCount-1)
305 pause_cpu();
306 }
307
  /*! returns the calling thread's worker context, or null outside a scheduler */
  dll_export TaskScheduler::Thread* TaskScheduler::thread() {
    return thread_local_thread;
  }
311
312 dll_export TaskScheduler::Thread* TaskScheduler::swapThread(Thread* thread)
313 {
314 Thread* old = thread_local_thread;
315 thread_local_thread = thread;
316 return old;
317 }
318
  /*! Processes local tasks until the current task's spawned sub-tasks have
      completed. Returns false when the scheduler is being cancelled by an
      exception; true otherwise (including when called outside a scheduler). */
  dll_export bool TaskScheduler::wait()
  {
    Thread* thread = TaskScheduler::thread();
    if (thread == nullptr) return true;  // not inside a scheduler: nothing to wait for
    while (thread->tasks.execute_local_internal(*thread,thread->task)) {};
    return thread->scheduler->cancellingException == nullptr;
  }
326
  /*! Worker loop of one thread inside this scheduler: steals and executes
      tasks while any tasks are running, then spins until all sibling
      threads have left as well. Returns the first exception recorded while
      running tasks (or null). */
  std::exception_ptr TaskScheduler::thread_loop(size_t threadIndex)
  {
    /* allocate thread structure */
    std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation
    Thread& thread = *mthread;
    threadLocal[threadIndex].store(&thread);  // make this thread visible to stealers
    Thread* oldThread = swapThread(&thread);

    /* main thread loop */
    while (anyTasksRunning)
    {
      steal_loop(thread,
                 [&] () { return anyTasksRunning > 0; },
                 [&] () {
                   anyTasksRunning++;  // announce we are executing tasks (keeps other threads looking for work)
                   while (thread.tasks.execute_local_internal(thread,nullptr));
                   anyTasksRunning--;
                 });
    }
    threadLocal[threadIndex].store(nullptr);  // unpublish before the Thread structure dies
    swapThread(oldThread);

    /* remember exception to throw */
    std::exception_ptr except = nullptr;
    if (cancellingException != nullptr) except = cancellingException;

    /* wait for all threads to terminate */
    threadCounter--;
#if defined(__WIN32__)
    size_t loopIndex = 1;
#endif
#define LOOP_YIELD_THRESHOLD (4096)
    while (threadCounter > 0) {
#if defined(__WIN32__)
      if ((loopIndex % LOOP_YIELD_THRESHOLD) == 0)
        yield();     // on Windows, yield only occasionally; _mm_pause is much cheaper
      else
        _mm_pause();
      loopIndex++;
#else
      yield();
#endif
    }
    return except;
  }
372
373 bool TaskScheduler::steal_from_other_threads(Thread& thread)
374 {
375 const size_t threadIndex = thread.threadIndex;
376 const size_t threadCount = this->threadCounter;
377
378 for (size_t i=1; i<threadCount; i++)
379 {
380 pause_cpu(N: 32);
381 size_t otherThreadIndex = threadIndex+i;
382 if (otherThreadIndex >= threadCount) otherThreadIndex -= threadCount;
383
384 Thread* othread = threadLocal[otherThreadIndex].load();
385 if (!othread)
386 continue;
387
388 if (othread->tasks.steal(thread))
389 return true;
390 }
391
392 return false;
393 }
394
  /*! forwards to the global thread pool */
  dll_export void TaskScheduler::startThreads() {
    threadPool->startThreads();
  }
398
  /*! registers a scheduler with the global thread pool */
  dll_export void TaskScheduler::addScheduler(const Ref<TaskScheduler>& scheduler) {
    threadPool->add(scheduler);
  }
402
  /*! unregisters a scheduler from the global thread pool */
  dll_export void TaskScheduler::removeScheduler(const Ref<TaskScheduler>& scheduler) {
    threadPool->remove(scheduler);
  }
406
407 RTC_NAMESPACE_END
408}
409

source code of qtquick3d/src/3rdparty/embree/common/tasking/taskschedulerinternal.cpp