1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include "qtconcurrentthreadengine.h"
5
6#include <QtCore/private/qsimd_p.h>
7
8#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
9
10QT_BEGIN_NAMESPACE
11
12namespace QtConcurrent {
13
14/*!
15 \class QtConcurrent::ThreadEngineBarrier
16 \inmodule QtConcurrent
17 \internal
18*/
19
20/*!
21 \enum QtConcurrent::ThreadFunctionResult
22 \internal
23*/
24
25/*!
26 \class QtConcurrent::ThreadEngineBase
27 \inmodule QtConcurrent
28 \internal
29*/
30
31/*!
32 \class QtConcurrent::ThreadEngine
33 \inmodule QtConcurrent
34 \internal
35*/
36
37/*!
38 \class QtConcurrent::ThreadEngineStarterBase
39 \inmodule QtConcurrent
40 \internal
41*/
42
43/*!
44 \class QtConcurrent::ThreadEngineStarter
45 \inmodule QtConcurrent
46 \internal
47*/
48
49/*!
50 \fn [qtconcurrentthreadengine-1] template <typename ThreadEngine> ThreadEngineStarter<typename ThreadEngine::ResultType> QtConcurrent::startThreadEngine(ThreadEngine *threadEngine)
51 \internal
52*/
53
54ThreadEngineBarrier::ThreadEngineBarrier()
55:count(0) { }
56
57void ThreadEngineBarrier::acquire()
58{
59 forever {
60 int localCount = count.loadRelaxed();
61 if (localCount < 0) {
62 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount -1))
63 return;
64 } else {
65 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1))
66 return;
67 }
68 qYieldCpu();
69 }
70}
71
72int ThreadEngineBarrier::release()
73{
74 forever {
75 int localCount = count.loadRelaxed();
76 if (localCount == -1) {
77 if (count.testAndSetOrdered(expectedValue: -1, newValue: 0)) {
78 semaphore.release();
79 return 0;
80 }
81 } else if (localCount < 0) {
82 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1))
83 return qAbs(t: localCount + 1);
84 } else {
85 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount - 1))
86 return localCount - 1;
87 }
88 qYieldCpu();
89 }
90}
91
92// Wait until all threads have been released
93void ThreadEngineBarrier::wait()
94{
95 forever {
96 int localCount = count.loadRelaxed();
97 if (localCount == 0)
98 return;
99
100 Q_ASSERT(localCount > 0); // multiple waiters are not allowed.
101 if (count.testAndSetOrdered(expectedValue: localCount, newValue: -localCount)) {
102 semaphore.acquire();
103 return;
104 }
105 qYieldCpu();
106 }
107}
108
109int ThreadEngineBarrier::currentCount()
110{
111 return count.loadRelaxed();
112}
113
114// releases a thread, unless this is the last thread.
115// returns true if the thread was released.
116bool ThreadEngineBarrier::releaseUnlessLast()
117{
118 forever {
119 int localCount = count.loadRelaxed();
120 if (qAbs(t: localCount) == 1) {
121 return false;
122 } else if (localCount < 0) {
123 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1))
124 return true;
125 } else {
126 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount - 1))
127 return true;
128 }
129 qYieldCpu();
130 }
131}
132
133ThreadEngineBase::ThreadEngineBase(QThreadPool *pool)
134 : futureInterface(nullptr), threadPool(pool)
135{
136 setAutoDelete(false);
137}
138
139ThreadEngineBase::~ThreadEngineBase() {}
140
141void ThreadEngineBase::startSingleThreaded()
142{
143 start();
144 while (threadFunction() != ThreadFinished)
145 ;
146 finish();
147}
148
149void ThreadEngineBase::startThread()
150{
151 startThreadInternal();
152}
153
154void ThreadEngineBase::acquireBarrierSemaphore()
155{
156 barrier.acquire();
157}
158
159void ThreadEngineBase::reportIfSuspensionDone() const
160{
161 if (futureInterface && futureInterface->isSuspending())
162 futureInterface->reportSuspended();
163}
164
165bool ThreadEngineBase::isCanceled()
166{
167 if (futureInterface)
168 return futureInterface->isCanceled();
169 else
170 return false;
171}
172
173void ThreadEngineBase::waitForResume()
174{
175 if (futureInterface)
176 futureInterface->waitForResume();
177}
178
179bool ThreadEngineBase::isProgressReportingEnabled()
180{
181 // If we don't have a QFuture, there is no-one to report the progress to.
182 return (futureInterface != nullptr);
183}
184
185void ThreadEngineBase::setProgressValue(int progress)
186{
187 if (futureInterface)
188 futureInterface->setProgressValue(progress);
189}
190
191void ThreadEngineBase::setProgressRange(int minimum, int maximum)
192{
193 if (futureInterface)
194 futureInterface->setProgressRange(minimum, maximum);
195}
196
197bool ThreadEngineBase::startThreadInternal()
198{
199 if (this->isCanceled())
200 return false;
201
202 barrier.acquire();
203 if (!threadPool->tryStart(runnable: this)) {
204 barrier.release();
205 return false;
206 }
207 return true;
208}
209
210void ThreadEngineBase::startThreads()
211{
212 while (shouldStartThread() && startThreadInternal())
213 ;
214}
215
216void ThreadEngineBase::threadExit()
217{
218 const bool asynchronous = (futureInterface != nullptr);
219 const int lastThread = (barrier.release() == 0);
220
221 if (lastThread && asynchronous)
222 this->asynchronousFinish();
223}
224
225// Called by a worker thread that wants to be throttled. If the current number
226// of running threads is larger than one the thread is allowed to exit and
227// this function returns one.
228bool ThreadEngineBase::threadThrottleExit()
229{
230 return barrier.releaseUnlessLast();
231}
232
233void ThreadEngineBase::run() // implements QRunnable.
234{
235 if (this->isCanceled()) {
236 threadExit();
237 return;
238 }
239
240 startThreads();
241
242#ifndef QT_NO_EXCEPTIONS
243 try {
244#endif
245 while (threadFunction() == ThrottleThread) {
246 // threadFunction returning ThrottleThread means it that the user
247 // struct wants to be throttled by making a worker thread exit.
248 // Respect that request unless this is the only worker thread left
249 // running, in which case it has to keep going.
250 if (threadThrottleExit()) {
251 return;
252 } else {
253 // If the last worker thread is throttled and the state is "suspending",
254 // it means that suspension has been requested, and it is already
255 // in effect (because all previous threads have already exited).
256 // Report the "Suspended" state.
257 reportIfSuspensionDone();
258 }
259 }
260
261#ifndef QT_NO_EXCEPTIONS
262 } catch (QException &e) {
263 handleException(exception: e);
264 } catch (...) {
265 handleException(exception: QUnhandledException(std::current_exception()));
266 }
267#endif
268 threadExit();
269}
270
271#ifndef QT_NO_EXCEPTIONS
272
273void ThreadEngineBase::handleException(const QException &exception)
274{
275 if (futureInterface) {
276 futureInterface->reportException(e: exception);
277 } else {
278 QMutexLocker lock(&mutex);
279 if (!exceptionStore.hasException())
280 exceptionStore.setException(exception);
281 }
282}
283#endif
284
285
286} // namespace QtConcurrent
287
288QT_END_NAMESPACE
289
290#endif // QT_NO_CONCURRENT
291

source code of qtbase/src/concurrent/qtconcurrentthreadengine.cpp