1 | // Copyright (C) 2016 The Qt Company Ltd. |
---|---|
2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only |
3 | |
4 | #include "qthreadpooler_p.h" |
5 | #include "qaspectjobmanager_p.h" |
6 | #include <QtCore/QDebug> |
7 | |
8 | QT_BEGIN_NAMESPACE |
9 | |
10 | namespace Qt3DCore { |
11 | |
12 | QThreadPooler::QThreadPooler(QObject *parent) |
13 | : QObject(parent) |
14 | , m_futureInterface(nullptr) |
15 | , m_mutex() |
16 | , m_taskCount(0) |
17 | , m_threadPool(new QThreadPool(this)) |
18 | , m_totalRunJobs(0) |
19 | { |
20 | m_threadPool->setMaxThreadCount(QAspectJobManager::idealThreadCount()); |
21 | // Ensures that threads will never be recycled |
22 | m_threadPool->setExpiryTimeout(-1); |
23 | } |
24 | |
25 | QThreadPooler::~QThreadPooler() |
26 | { |
27 | // Wait till all tasks are finished before deleting mutex |
28 | QMutexLocker locker(&m_mutex); |
29 | locker.unlock(); |
30 | } |
31 | |
32 | void QThreadPooler::enqueueTasks(const QList<RunnableInterface *> &tasks) |
33 | { |
34 | // The caller have to set the mutex |
35 | const QList<RunnableInterface *>::const_iterator end = tasks.cend(); |
36 | |
37 | m_totalRunJobs = 0; |
38 | for (QList<RunnableInterface *>::const_iterator it = tasks.cbegin(); |
39 | it != end; ++it) { |
40 | |
41 | // Only AspectTaskRunnables are checked for dependencies. |
42 | static const auto hasDependencies = [](RunnableInterface *task) -> bool { |
43 | return (task->type() == RunnableInterface::RunnableType::AspectTask) |
44 | && (static_cast<AspectTaskRunnable *>(task)->m_dependerCount > 0); |
45 | }; |
46 | |
47 | if (!hasDependencies(*it) && !(*it)->reserved()) { |
48 | (*it)->setReserved(true); |
49 | if ((*it)->isRequired()) { |
50 | (*it)->setPooler(this); |
51 | m_threadPool->start(runnable: (*it)); |
52 | } else { |
53 | skipTask(task: *it); |
54 | } |
55 | } |
56 | } |
57 | } |
58 | |
59 | void QThreadPooler::skipTask(RunnableInterface *task) |
60 | { |
61 | enqueueDepencies(task); |
62 | |
63 | if (currentCount() == 0) { |
64 | if (m_futureInterface) { |
65 | m_futureInterface->reportFinished(); |
66 | delete m_futureInterface; |
67 | } |
68 | m_futureInterface = nullptr; |
69 | } |
70 | |
71 | delete task; // normally gets deleted by threadpool |
72 | } |
73 | |
74 | void QThreadPooler::enqueueDepencies(RunnableInterface *task) |
75 | { |
76 | release(); |
77 | |
78 | if (task->type() == RunnableInterface::RunnableType::AspectTask) { |
79 | AspectTaskRunnable *aspectTask = static_cast<AspectTaskRunnable *>(task); |
80 | const auto &dependers = aspectTask->m_dependers; |
81 | for (auto it = dependers.begin(); it != dependers.end(); ++it) { |
82 | AspectTaskRunnable *dependerTask = static_cast<AspectTaskRunnable *>(*it); |
83 | if (--dependerTask->m_dependerCount == 0) { |
84 | if (!dependerTask->reserved()) { |
85 | dependerTask->setReserved(true); |
86 | if ((*it)->isRequired()) { |
87 | dependerTask->setPooler(this); |
88 | m_threadPool->start(runnable: dependerTask); |
89 | } else { |
90 | skipTask(task: *it); |
91 | } |
92 | } |
93 | } |
94 | } |
95 | } |
96 | } |
97 | |
98 | void QThreadPooler::taskFinished(RunnableInterface *task) |
99 | { |
100 | const QMutexLocker locker(&m_mutex); |
101 | |
102 | m_totalRunJobs++; |
103 | |
104 | enqueueDepencies(task); |
105 | |
106 | if (currentCount() == 0) { |
107 | if (m_futureInterface) { |
108 | m_futureInterface->reportFinished(); |
109 | delete m_futureInterface; |
110 | } |
111 | m_futureInterface = nullptr; |
112 | } |
113 | } |
114 | |
115 | QFuture<void> QThreadPooler::mapDependables(QList<RunnableInterface *> &taskQueue) |
116 | { |
117 | const QMutexLocker locker(&m_mutex); |
118 | |
119 | if (!m_futureInterface) |
120 | m_futureInterface = new QFutureInterface<void>(); |
121 | if (!taskQueue.empty()) |
122 | m_futureInterface->reportStarted(); |
123 | |
124 | acquire(add: taskQueue.size()); |
125 | enqueueTasks(tasks: taskQueue); |
126 | |
127 | return QFuture<void>(m_futureInterface); |
128 | } |
129 | |
130 | int QThreadPooler::waitForAllJobs() |
131 | { |
132 | future().waitForFinished(); |
133 | return m_totalRunJobs; |
134 | } |
135 | |
136 | QFuture<void> QThreadPooler::future() |
137 | { |
138 | const QMutexLocker locker(&m_mutex); |
139 | |
140 | if (!m_futureInterface) |
141 | return QFuture<void>(); |
142 | else |
143 | return QFuture<void>(m_futureInterface); |
144 | } |
145 | |
146 | void QThreadPooler::acquire(int add) |
147 | { |
148 | // The caller have to set the mutex |
149 | |
150 | m_taskCount.fetchAndAddOrdered(valueToAdd: add); |
151 | } |
152 | |
153 | void QThreadPooler::release() |
154 | { |
155 | // The caller have to set the mutex |
156 | |
157 | m_taskCount.fetchAndAddOrdered(valueToAdd: -1); |
158 | } |
159 | |
160 | int QThreadPooler::currentCount() const |
161 | { |
162 | // The caller have to set the mutex |
163 | |
164 | return m_taskCount.loadRelaxed(); |
165 | } |
166 | |
167 | } // namespace Qt3DCore |
168 | |
169 | QT_END_NAMESPACE |
170 | |
171 | #include "moc_qthreadpooler_p.cpp" |
172 |