1 | // Copyright (C) 2016 The Qt Company Ltd. |
2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only |
3 | |
4 | #include "qtconcurrentthreadengine.h" |
5 | |
6 | #include <QtCore/private/qsimd_p.h> |
7 | |
8 | #if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC) |
9 | |
10 | QT_BEGIN_NAMESPACE |
11 | |
12 | namespace QtConcurrent { |
13 | |
14 | /*! |
15 | \class QtConcurrent::ThreadEngineBarrier |
16 | \inmodule QtConcurrent |
17 | \internal |
18 | */ |
19 | |
20 | /*! |
21 | \enum QtConcurrent::ThreadFunctionResult |
22 | \internal |
23 | */ |
24 | |
25 | /*! |
26 | \class QtConcurrent::ThreadEngineBase |
27 | \inmodule QtConcurrent |
28 | \internal |
29 | */ |
30 | |
31 | /*! |
32 | \class QtConcurrent::ThreadEngine |
33 | \inmodule QtConcurrent |
34 | \internal |
35 | */ |
36 | |
37 | /*! |
38 | \class QtConcurrent::ThreadEngineStarterBase |
39 | \inmodule QtConcurrent |
40 | \internal |
41 | */ |
42 | |
43 | /*! |
44 | \class QtConcurrent::ThreadEngineStarter |
45 | \inmodule QtConcurrent |
46 | \internal |
47 | */ |
48 | |
49 | /*! |
50 | \fn [qtconcurrentthreadengine-1] template <typename ThreadEngine> ThreadEngineStarter<typename ThreadEngine::ResultType> QtConcurrent::startThreadEngine(ThreadEngine *threadEngine) |
51 | \internal |
52 | */ |
53 | |
54 | ThreadEngineBarrier::ThreadEngineBarrier() |
55 | :count(0) { } |
56 | |
57 | void ThreadEngineBarrier::acquire() |
58 | { |
59 | forever { |
60 | int localCount = count.loadRelaxed(); |
61 | if (localCount < 0) { |
62 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount -1)) |
63 | return; |
64 | } else { |
65 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1)) |
66 | return; |
67 | } |
68 | qYieldCpu(); |
69 | } |
70 | } |
71 | |
72 | int ThreadEngineBarrier::release() |
73 | { |
74 | forever { |
75 | int localCount = count.loadRelaxed(); |
76 | if (localCount == -1) { |
77 | if (count.testAndSetOrdered(expectedValue: -1, newValue: 0)) { |
78 | semaphore.release(); |
79 | return 0; |
80 | } |
81 | } else if (localCount < 0) { |
82 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1)) |
83 | return qAbs(t: localCount + 1); |
84 | } else { |
85 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount - 1)) |
86 | return localCount - 1; |
87 | } |
88 | qYieldCpu(); |
89 | } |
90 | } |
91 | |
92 | // Wait until all threads have been released |
93 | void ThreadEngineBarrier::wait() |
94 | { |
95 | forever { |
96 | int localCount = count.loadRelaxed(); |
97 | if (localCount == 0) |
98 | return; |
99 | |
100 | Q_ASSERT(localCount > 0); // multiple waiters are not allowed. |
101 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: -localCount)) { |
102 | semaphore.acquire(); |
103 | return; |
104 | } |
105 | qYieldCpu(); |
106 | } |
107 | } |
108 | |
109 | int ThreadEngineBarrier::currentCount() |
110 | { |
111 | return count.loadRelaxed(); |
112 | } |
113 | |
114 | // releases a thread, unless this is the last thread. |
115 | // returns true if the thread was released. |
116 | bool ThreadEngineBarrier::releaseUnlessLast() |
117 | { |
118 | forever { |
119 | int localCount = count.loadRelaxed(); |
120 | if (qAbs(t: localCount) == 1) { |
121 | return false; |
122 | } else if (localCount < 0) { |
123 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1)) |
124 | return true; |
125 | } else { |
126 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount - 1)) |
127 | return true; |
128 | } |
129 | qYieldCpu(); |
130 | } |
131 | } |
132 | |
133 | ThreadEngineBase::ThreadEngineBase(QThreadPool *pool) |
134 | : futureInterface(nullptr), threadPool(pool) |
135 | { |
136 | setAutoDelete(false); |
137 | } |
138 | |
139 | ThreadEngineBase::~ThreadEngineBase() {} |
140 | |
141 | void ThreadEngineBase::startSingleThreaded() |
142 | { |
143 | start(); |
144 | while (threadFunction() != ThreadFinished) |
145 | ; |
146 | finish(); |
147 | } |
148 | |
149 | void ThreadEngineBase::startThread() |
150 | { |
151 | startThreadInternal(); |
152 | } |
153 | |
154 | void ThreadEngineBase::acquireBarrierSemaphore() |
155 | { |
156 | barrier.acquire(); |
157 | } |
158 | |
159 | void ThreadEngineBase::reportIfSuspensionDone() const |
160 | { |
161 | if (futureInterface && futureInterface->isSuspending()) |
162 | futureInterface->reportSuspended(); |
163 | } |
164 | |
165 | bool ThreadEngineBase::isCanceled() |
166 | { |
167 | if (futureInterface) |
168 | return futureInterface->isCanceled(); |
169 | else |
170 | return false; |
171 | } |
172 | |
173 | void ThreadEngineBase::waitForResume() |
174 | { |
175 | if (futureInterface) |
176 | futureInterface->waitForResume(); |
177 | } |
178 | |
179 | bool ThreadEngineBase::isProgressReportingEnabled() |
180 | { |
181 | // If we don't have a QFuture, there is no-one to report the progress to. |
182 | return (futureInterface != nullptr); |
183 | } |
184 | |
185 | void ThreadEngineBase::setProgressValue(int progress) |
186 | { |
187 | if (futureInterface) |
188 | futureInterface->setProgressValue(progress); |
189 | } |
190 | |
191 | void ThreadEngineBase::setProgressRange(int minimum, int maximum) |
192 | { |
193 | if (futureInterface) |
194 | futureInterface->setProgressRange(minimum, maximum); |
195 | } |
196 | |
197 | bool ThreadEngineBase::startThreadInternal() |
198 | { |
199 | if (this->isCanceled()) |
200 | return false; |
201 | |
202 | barrier.acquire(); |
203 | if (!threadPool->tryStart(runnable: this)) { |
204 | barrier.release(); |
205 | return false; |
206 | } |
207 | return true; |
208 | } |
209 | |
210 | void ThreadEngineBase::startThreads() |
211 | { |
212 | while (shouldStartThread() && startThreadInternal()) |
213 | ; |
214 | } |
215 | |
216 | void ThreadEngineBase::threadExit() |
217 | { |
218 | const bool asynchronous = (futureInterface != nullptr); |
219 | const int lastThread = (barrier.release() == 0); |
220 | |
221 | if (lastThread && asynchronous) |
222 | this->asynchronousFinish(); |
223 | } |
224 | |
225 | // Called by a worker thread that wants to be throttled. If the current number |
226 | // of running threads is larger than one the thread is allowed to exit and |
227 | // this function returns one. |
228 | bool ThreadEngineBase::threadThrottleExit() |
229 | { |
230 | return barrier.releaseUnlessLast(); |
231 | } |
232 | |
233 | void ThreadEngineBase::run() // implements QRunnable. |
234 | { |
235 | if (this->isCanceled()) { |
236 | threadExit(); |
237 | return; |
238 | } |
239 | |
240 | startThreads(); |
241 | |
242 | #ifndef QT_NO_EXCEPTIONS |
243 | try { |
244 | #endif |
245 | while (threadFunction() == ThrottleThread) { |
246 | // threadFunction returning ThrottleThread means it that the user |
247 | // struct wants to be throttled by making a worker thread exit. |
248 | // Respect that request unless this is the only worker thread left |
249 | // running, in which case it has to keep going. |
250 | if (threadThrottleExit()) { |
251 | return; |
252 | } else { |
253 | // If the last worker thread is throttled and the state is "suspending", |
254 | // it means that suspension has been requested, and it is already |
255 | // in effect (because all previous threads have already exited). |
256 | // Report the "Suspended" state. |
257 | reportIfSuspensionDone(); |
258 | } |
259 | } |
260 | |
261 | #ifndef QT_NO_EXCEPTIONS |
262 | } catch (QException &e) { |
263 | handleException(exception: e); |
264 | } catch (...) { |
265 | handleException(exception: QUnhandledException(std::current_exception())); |
266 | } |
267 | #endif |
268 | threadExit(); |
269 | } |
270 | |
271 | #ifndef QT_NO_EXCEPTIONS |
272 | |
273 | void ThreadEngineBase::handleException(const QException &exception) |
274 | { |
275 | if (futureInterface) { |
276 | futureInterface->reportException(e: exception); |
277 | } else { |
278 | QMutexLocker lock(&mutex); |
279 | if (!exceptionStore.hasException()) |
280 | exceptionStore.setException(exception); |
281 | } |
282 | } |
283 | #endif |
284 | |
285 | |
286 | } // namespace QtConcurrent |
287 | |
288 | QT_END_NAMESPACE |
289 | |
290 | #endif // QT_NO_CONCURRENT |
291 | |