1// Copyright 2009-2021 Intel Corporation
2// SPDX-License-Identifier: Apache-2.0
3
4#pragma once
5
6#include "../sys/platform.h"
7#include "../sys/alloc.h"
8#include "../sys/barrier.h"
9#include "../sys/thread.h"
10#include "../sys/mutex.h"
11#include "../sys/condition.h"
12#include "../sys/ref.h"
13#include "../sys/atomic.h"
14#include "../math/range.h"
15#include "../../include/embree3/rtcore.h"
16
17#include <exception>
18#include <list>
19
20namespace embree
21{
22
  /* The tasking system exports some symbols to be used by the tutorials. Thus we
     hide it also in the API namespace when requested. */
25 RTC_NAMESPACE_BEGIN
26
27 struct TaskScheduler : public RefCount
28 {
29 ALIGNED_STRUCT_(64);
30 friend class Device;
31
32 static const size_t TASK_STACK_SIZE = 4*1024; //!< task structure stack
33 static const size_t CLOSURE_STACK_SIZE = 512*1024; //!< stack for task closures
34
35 struct Thread;
36
/*! Virtual interface for all tasks. Implementations provide execute(),
    which runs the work of the task. */
struct TaskFunction {
  virtual void execute() = 0;

protected:
  /*! Protected and non-virtual on purpose: task functions are only used
      through this interface, never destroyed through it, so deleting via a
      TaskFunction pointer is rejected at compile time. */
  ~TaskFunction() = default;
};
41
42 /*! builds a task interface from a closure */
43 template<typename Closure>
44 struct ClosureTaskFunction : public TaskFunction
45 {
46 Closure closure;
47 __forceinline ClosureTaskFunction (const Closure& closure) : closure(closure) {}
48 void execute() { closure(); };
49 };
50
51 struct __aligned(64) Task
52 {
53 /*! states a task can be in */
54 enum { DONE, INITIALIZED };
55
56 /*! switch from one state to another */
57 __forceinline void switch_state(int from, int to)
58 {
59 __memory_barrier();
60 MAYBE_UNUSED bool success = state.compare_exchange_strong(i1&: from,i2: to);
61 assert(success);
62 }
63
64 /*! try to switch from one state to another */
65 __forceinline bool try_switch_state(int from, int to) {
66 __memory_barrier();
67 return state.compare_exchange_strong(i1&: from,i2: to);
68 }
69
70 /*! increment/decrement dependency counter */
71 void add_dependencies(int n) {
72 dependencies+=n;
73 }
74
75 /*! initialize all tasks to DONE state by default */
76 __forceinline Task()
77 : state(DONE) {}
78
79 /*! construction of new task */
80 __forceinline Task (TaskFunction* closure, Task* parent, size_t stackPtr, size_t N)
81 : dependencies(1), stealable(true), closure(closure), parent(parent), stackPtr(stackPtr), N(N)
82 {
83 if (parent) parent->add_dependencies(n: +1);
84 switch_state(from: DONE,to: INITIALIZED);
85 }
86
87 /*! construction of stolen task, stealing thread will decrement initial dependency */
88 __forceinline Task (TaskFunction* closure, Task* parent)
89 : dependencies(1), stealable(false), closure(closure), parent(parent), stackPtr(-1), N(1)
90 {
91 switch_state(from: DONE,to: INITIALIZED);
92 }
93
94 /*! try to steal this task */
95 bool try_steal(Task& child)
96 {
97 if (!stealable) return false;
98 if (!try_switch_state(from: INITIALIZED,to: DONE)) return false;
99 new (&child) Task(closure, this);
100 return true;
101 }
102
103 /*! run this task */
104 dll_export void run(Thread& thread);
105
106 void run_internal(Thread& thread);
107
108 public:
109 std::atomic<int> state; //!< state this task is in
110 std::atomic<int> dependencies; //!< dependencies to wait for
111 std::atomic<bool> stealable; //!< true if task can be stolen
112 TaskFunction* closure; //!< the closure to execute
113 Task* parent; //!< parent task to signal when we are finished
114 size_t stackPtr; //!< stack location where closure is stored
115 size_t N; //!< approximative size of task
116 };
117
118 struct TaskQueue
119 {
120 TaskQueue ()
121 : left(0), right(0), stackPtr(0) {}
122
123 __forceinline void* alloc(size_t bytes, size_t align = 64)
124 {
125 size_t ofs = bytes + ((align - stackPtr) & (align-1));
126 if (stackPtr + ofs > CLOSURE_STACK_SIZE)
127 throw std::runtime_error("closure stack overflow");
128 stackPtr += ofs;
129 return &stack[stackPtr-bytes];
130 }
131
132 template<typename Closure>
133 __forceinline void push_right(Thread& thread, const size_t size, const Closure& closure)
134 {
135 if (right >= TASK_STACK_SIZE)
136 throw std::runtime_error("task stack overflow");
137
138 /* allocate new task on right side of stack */
139 size_t oldStackPtr = stackPtr;
140 TaskFunction* func = new (alloc(bytes: sizeof(ClosureTaskFunction<Closure>))) ClosureTaskFunction<Closure>(closure);
141 new (&tasks[right]) Task(func,thread.task,oldStackPtr,size);
142 right++;
143
144 /* also move left pointer */
145 if (left >= right-1) left = right-1;
146 }
147
148 dll_export bool execute_local(Thread& thread, Task* parent);
149 bool execute_local_internal(Thread& thread, Task* parent);
150 bool steal(Thread& thread);
151 size_t getTaskSizeAtLeft();
152
153 bool empty() { return right == 0; }
154
155 public:
156
157 /* task stack */
158 Task tasks[TASK_STACK_SIZE];
159 __aligned(64) std::atomic<size_t> left; //!< threads steal from left
160 __aligned(64) std::atomic<size_t> right; //!< new tasks are added to the right
161
162 /* closure stack */
163 __aligned(64) char stack[CLOSURE_STACK_SIZE];
164 size_t stackPtr;
165 };
166
167 /*! thread local structure for each thread */
168 struct Thread
169 {
170 ALIGNED_STRUCT_(64);
171
172 Thread (size_t threadIndex, const Ref<TaskScheduler>& scheduler)
173 : threadIndex(threadIndex), task(nullptr), scheduler(scheduler) {}
174
175 __forceinline size_t threadCount() {
176 return scheduler->threadCounter;
177 }
178
179 size_t threadIndex; //!< ID of this thread
180 TaskQueue tasks; //!< local task queue
181 Task* task; //!< current active task
182 Ref<TaskScheduler> scheduler; //!< pointer to task scheduler
183 };
184
185 /*! pool of worker threads */
186 struct ThreadPool
187 {
188 ThreadPool (bool set_affinity);
189 ~ThreadPool ();
190
191 /*! starts the threads */
192 dll_export void startThreads();
193
194 /*! sets number of threads to use */
195 void setNumThreads(size_t numThreads, bool startThreads = false);
196
197 /*! adds a task scheduler object for scheduling */
198 dll_export void add(const Ref<TaskScheduler>& scheduler);
199
200 /*! remove the task scheduler object again */
201 dll_export void remove(const Ref<TaskScheduler>& scheduler);
202
203 /*! returns number of threads of the thread pool */
204 size_t size() const { return numThreads; }
205
206 /*! main loop for all threads */
207 void thread_loop(size_t threadIndex);
208
209 private:
210 std::atomic<size_t> numThreads;
211 std::atomic<size_t> numThreadsRunning;
212 bool set_affinity;
213 std::atomic<bool> running;
214 std::vector<thread_t> threads;
215
216 private:
217 MutexSys mutex;
218 ConditionSys condition;
219 std::list<Ref<TaskScheduler> > schedulers;
220 };
221
222 TaskScheduler ();
223 ~TaskScheduler ();
224
225 /*! initializes the task scheduler */
226 static void create(size_t numThreads, bool set_affinity, bool start_threads);
227
228 /*! destroys the task scheduler again */
229 static void destroy();
230
231 /*! lets new worker threads join the tasking system */
232 void join();
233 void reset();
234
235 /*! let a worker thread allocate a thread index */
236 dll_export ssize_t allocThreadIndex();
237
238 /*! wait for some number of threads available (threadCount includes main thread) */
239 void wait_for_threads(size_t threadCount);
240
241 /*! thread loop for all worker threads */
242 std::exception_ptr thread_loop(size_t threadIndex);
243
244 /*! steals a task from a different thread */
245 bool steal_from_other_threads(Thread& thread);
246
247 template<typename Predicate, typename Body>
248 static void steal_loop(Thread& thread, const Predicate& pred, const Body& body);
249
250 /* spawn a new task at the top of the threads task stack */
251 template<typename Closure>
252 void spawn_root(const Closure& closure, size_t size = 1, bool useThreadPool = true)
253 {
254 if (useThreadPool) startThreads();
255
256 size_t threadIndex = allocThreadIndex();
257 std::unique_ptr<Thread> mthread(new Thread(threadIndex,this)); // too large for stack allocation
258 Thread& thread = *mthread;
259 assert(threadLocal[threadIndex].load() == nullptr);
260 threadLocal[threadIndex] = &thread;
261 Thread* oldThread = swapThread(thread: &thread);
262 thread.tasks.push_right(thread,size,closure);
263 {
264 Lock<MutexSys> lock(mutex);
265 anyTasksRunning++;
266 hasRootTask = true;
267 condition.notify_all();
268 }
269
270 if (useThreadPool) addScheduler(scheduler: this);
271
272 while (thread.tasks.execute_local(thread,parent: nullptr));
273 anyTasksRunning--;
274 if (useThreadPool) removeScheduler(scheduler: this);
275
276 threadLocal[threadIndex] = nullptr;
277 swapThread(thread: oldThread);
278
279 /* remember exception to throw */
280 std::exception_ptr except = nullptr;
281 if (cancellingException != nullptr) except = cancellingException;
282
283 /* wait for all threads to terminate */
284 threadCounter--;
285 while (threadCounter > 0) yield();
286 cancellingException = nullptr;
287
288 /* re-throw proper exception */
289 if (except != nullptr)
290 std::rethrow_exception(except);
291 }
292
293 /* spawn a new task at the top of the threads task stack */
294 template<typename Closure>
295 static __forceinline void spawn(size_t size, const Closure& closure)
296 {
297 Thread* thread = TaskScheduler::thread();
298 if (likely(thread != nullptr)) thread->tasks.push_right(*thread,size,closure);
299 else instance()->spawn_root(closure,size);
300 }
301
302 /* spawn a new task at the top of the threads task stack */
303 template<typename Closure>
304 static __forceinline void spawn(const Closure& closure) {
305 spawn(1,closure);
306 }
307
308 /* spawn a new task set */
309 template<typename Index, typename Closure>
310 static void spawn(const Index begin, const Index end, const Index blockSize, const Closure& closure)
311 {
312 spawn(end-begin, [=]()
313 {
314 if (end-begin <= blockSize) {
315 return closure(range<Index>(begin,end));
316 }
317 const Index center = (begin+end)/2;
318 spawn(begin,center,blockSize,closure);
319 spawn(center,end ,blockSize,closure);
320 wait();
321 });
322 }
323
324 /* work on spawned subtasks and wait until all have finished */
325 dll_export static bool wait();
326
327 /* returns the ID of the current thread */
328 dll_export static size_t threadID();
329
330 /* returns the index (0..threadCount-1) of the current thread */
331 dll_export static size_t threadIndex();
332
333 /* returns the total number of threads */
334 dll_export static size_t threadCount();
335
336 private:
337
338 /* returns the thread local task list of this worker thread */
339 dll_export static Thread* thread();
340
341 /* sets the thread local task list of this worker thread */
342 dll_export static Thread* swapThread(Thread* thread);
343
344 /*! returns the taskscheduler object to be used by the master thread */
345 dll_export static TaskScheduler* instance();
346
347 /*! starts the threads */
348 dll_export static void startThreads();
349
350 /*! adds a task scheduler object for scheduling */
351 dll_export static void addScheduler(const Ref<TaskScheduler>& scheduler);
352
353 /*! remove the task scheduler object again */
354 dll_export static void removeScheduler(const Ref<TaskScheduler>& scheduler);
355
356 private:
357 std::vector<atomic<Thread*>> threadLocal;
358 std::atomic<size_t> threadCounter;
359 std::atomic<size_t> anyTasksRunning;
360 std::atomic<bool> hasRootTask;
361 std::exception_ptr cancellingException;
362 MutexSys mutex;
363 ConditionSys condition;
364
365 private:
366 static size_t g_numThreads;
367 static __thread TaskScheduler* g_instance;
368 static __thread Thread* thread_local_thread;
369 static ThreadPool* threadPool;
370 };
371
372 RTC_NAMESPACE_END
373
374#if defined(RTC_NAMESPACE)
375 using RTC_NAMESPACE::TaskScheduler;
376#endif
377}
378

// Source: qtquick3d/src/3rdparty/embree/common/tasking/taskschedulerinternal.h