1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3// Qt-Security score:significant reason:default
4
5#include "qtconcurrentthreadengine.h"
6
7#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
8
9QT_BEGIN_NAMESPACE
10
11namespace QtConcurrent {
12
13/*!
14 \class QtConcurrent::ThreadEngineBarrier
15 \inmodule QtConcurrent
16 \internal
17*/
18
19/*!
20 \enum QtConcurrent::ThreadFunctionResult
21 \internal
22*/
23
24/*!
25 \class QtConcurrent::ThreadEngineBase
26 \inmodule QtConcurrent
27 \internal
28*/
29
30/*!
31 \class QtConcurrent::ThreadEngine
32 \inmodule QtConcurrent
33 \internal
34*/
35
36/*!
37 \class QtConcurrent::ThreadEngineStarterBase
38 \inmodule QtConcurrent
39 \internal
40*/
41
42/*!
43 \class QtConcurrent::ThreadEngineStarter
44 \inmodule QtConcurrent
45 \internal
46*/
47
48/*!
49 \fn [qtconcurrentthreadengine-1] template <typename ThreadEngine> ThreadEngineStarter<typename ThreadEngine::ResultType> QtConcurrent::startThreadEngine(ThreadEngine *threadEngine)
50 \internal
51*/
52
53ThreadEngineBarrier::ThreadEngineBarrier()
54:count(0) { }
55
56void ThreadEngineBarrier::acquire()
57{
58 forever {
59 int localCount = count.loadRelaxed();
60 if (localCount < 0) {
61 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount -1))
62 return;
63 } else {
64 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1))
65 return;
66 }
67 qYieldCpu();
68 }
69}
70
71int ThreadEngineBarrier::release()
72{
73 forever {
74 int localCount = count.loadRelaxed();
75 if (localCount == -1) {
76 if (count.testAndSetOrdered(expectedValue: -1, newValue: 0)) {
77 semaphore.release();
78 return 0;
79 }
80 } else if (localCount < 0) {
81 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1))
82 return qAbs(t: localCount + 1);
83 } else {
84 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount - 1))
85 return localCount - 1;
86 }
87 qYieldCpu();
88 }
89}
90
91// Wait until all threads have been released
92void ThreadEngineBarrier::wait()
93{
94 forever {
95 int localCount = count.loadRelaxed();
96 if (localCount == 0)
97 return;
98
99 Q_ASSERT(localCount > 0); // multiple waiters are not allowed.
100 if (count.testAndSetOrdered(expectedValue: localCount, newValue: -localCount)) {
101 semaphore.acquire();
102 return;
103 }
104 qYieldCpu();
105 }
106}
107
108int ThreadEngineBarrier::currentCount()
109{
110 return count.loadRelaxed();
111}
112
113// releases a thread, unless this is the last thread.
114// returns true if the thread was released.
115bool ThreadEngineBarrier::releaseUnlessLast()
116{
117 forever {
118 int localCount = count.loadRelaxed();
119 if (qAbs(t: localCount) == 1) {
120 return false;
121 } else if (localCount < 0) {
122 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1))
123 return true;
124 } else {
125 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount - 1))
126 return true;
127 }
128 qYieldCpu();
129 }
130}
131
132ThreadEngineBase::ThreadEngineBase(QThreadPool *pool)
133 : futureInterface(nullptr), threadPool(pool)
134{
135 setAutoDelete(false);
136}
137
138ThreadEngineBase::~ThreadEngineBase() {}
139
140void ThreadEngineBase::startSingleThreaded()
141{
142 start();
143 while (threadFunction() != ThreadFinished)
144 ;
145 finish();
146}
147
148void ThreadEngineBase::startThread()
149{
150 startThreadInternal();
151}
152
153void ThreadEngineBase::acquireBarrierSemaphore()
154{
155 barrier.acquire();
156}
157
158void ThreadEngineBase::reportIfSuspensionDone() const
159{
160 if (futureInterface && futureInterface->isSuspending())
161 futureInterface->reportSuspended();
162}
163
164bool ThreadEngineBase::isCanceled()
165{
166 if (futureInterface)
167 return futureInterface->isCanceled();
168 else
169 return false;
170}
171
172void ThreadEngineBase::waitForResume()
173{
174 if (futureInterface)
175 futureInterface->waitForResume();
176}
177
178bool ThreadEngineBase::isProgressReportingEnabled()
179{
180 // If we don't have a QFuture, there is no-one to report the progress to.
181 return (futureInterface != nullptr);
182}
183
184void ThreadEngineBase::setProgressValue(int progress)
185{
186 if (futureInterface)
187 futureInterface->setProgressValue(progress);
188}
189
190void ThreadEngineBase::setProgressRange(int minimum, int maximum)
191{
192 if (futureInterface)
193 futureInterface->setProgressRange(minimum, maximum);
194}
195
196bool ThreadEngineBase::startThreadInternal()
197{
198 if (this->isCanceled())
199 return false;
200
201 barrier.acquire();
202 if (!threadPool->tryStart(runnable: this)) {
203 barrier.release();
204 return false;
205 }
206 return true;
207}
208
209void ThreadEngineBase::startThreads()
210{
211 while (shouldStartThread() && startThreadInternal())
212 ;
213}
214
215void ThreadEngineBase::threadExit()
216{
217 const bool asynchronous = (futureInterface != nullptr);
218 const int lastThread = (barrier.release() == 0);
219
220 if (lastThread && asynchronous)
221 this->asynchronousFinish();
222}
223
224// Called by a worker thread that wants to be throttled. If the current number
225// of running threads is larger than one the thread is allowed to exit and
226// this function returns one.
227bool ThreadEngineBase::threadThrottleExit()
228{
229 return barrier.releaseUnlessLast();
230}
231
232void ThreadEngineBase::run() // implements QRunnable.
233{
234 if (this->isCanceled()) {
235 threadExit();
236 return;
237 }
238
239 startThreads();
240
241#ifndef QT_NO_EXCEPTIONS
242 try {
243#endif
244 while (threadFunction() == ThrottleThread) {
245 // threadFunction returning ThrottleThread means it that the user
246 // struct wants to be throttled by making a worker thread exit.
247 // Respect that request unless this is the only worker thread left
248 // running, in which case it has to keep going.
249 if (threadThrottleExit()) {
250 return;
251 } else {
252 // If the last worker thread is throttled and the state is "suspending",
253 // it means that suspension has been requested, and it is already
254 // in effect (because all previous threads have already exited).
255 // Report the "Suspended" state.
256 reportIfSuspensionDone();
257 }
258 }
259
260#ifndef QT_NO_EXCEPTIONS
261 } catch (QException &e) {
262 handleException(exception: e);
263 } catch (...) {
264 handleException(exception: QUnhandledException(std::current_exception()));
265 }
266#endif
267 threadExit();
268}
269
270#ifndef QT_NO_EXCEPTIONS
271
272void ThreadEngineBase::handleException(const QException &exception)
273{
274 if (futureInterface) {
275 futureInterface->reportException(e: exception);
276 } else {
277 QMutexLocker lock(&mutex);
278 if (!exceptionStore.hasException())
279 exceptionStore.setException(exception);
280 }
281}
282#endif
283
284
285} // namespace QtConcurrent
286
287QT_END_NAMESPACE
288
289#endif // QT_NO_CONCURRENT
290

source code of qtbase/src/concurrent/qtconcurrentthreadengine.cpp