1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#include "qtconcurrentthreadengine.h"
5
6#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
7
8QT_BEGIN_NAMESPACE
9
10namespace QtConcurrent {
11
12/*!
13 \class QtConcurrent::ThreadEngineBarrier
14 \inmodule QtConcurrent
15 \internal
16*/
17
18/*!
19 \enum QtConcurrent::ThreadFunctionResult
20 \internal
21*/
22
23/*!
24 \class QtConcurrent::ThreadEngineBase
25 \inmodule QtConcurrent
26 \internal
27*/
28
29/*!
30 \class QtConcurrent::ThreadEngine
31 \inmodule QtConcurrent
32 \internal
33*/
34
35/*!
36 \class QtConcurrent::ThreadEngineStarterBase
37 \inmodule QtConcurrent
38 \internal
39*/
40
41/*!
42 \class QtConcurrent::ThreadEngineStarter
43 \inmodule QtConcurrent
44 \internal
45*/
46
47/*!
48 \fn [qtconcurrentthreadengine-1] template <typename ThreadEngine> ThreadEngineStarter<typename ThreadEngine::ResultType> QtConcurrent::startThreadEngine(ThreadEngine *threadEngine)
49 \internal
50*/
51
52ThreadEngineBarrier::ThreadEngineBarrier()
53:count(0) { }
54
55void ThreadEngineBarrier::acquire()
56{
57 forever {
58 int localCount = count.loadRelaxed();
59 if (localCount < 0) {
60 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount -1))
61 return;
62 } else {
63 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1))
64 return;
65 }
66 qYieldCpu();
67 }
68}
69
70int ThreadEngineBarrier::release()
71{
72 forever {
73 int localCount = count.loadRelaxed();
74 if (localCount == -1) {
75 if (count.testAndSetOrdered(expectedValue: -1, newValue: 0)) {
76 semaphore.release();
77 return 0;
78 }
79 } else if (localCount < 0) {
80 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1))
81 return qAbs(t: localCount + 1);
82 } else {
83 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount - 1))
84 return localCount - 1;
85 }
86 qYieldCpu();
87 }
88}
89
90// Wait until all threads have been released
91void ThreadEngineBarrier::wait()
92{
93 forever {
94 int localCount = count.loadRelaxed();
95 if (localCount == 0)
96 return;
97
98 Q_ASSERT(localCount > 0); // multiple waiters are not allowed.
99 if (count.testAndSetOrdered(expectedValue: localCount, newValue: -localCount)) {
100 semaphore.acquire();
101 return;
102 }
103 qYieldCpu();
104 }
105}
106
107int ThreadEngineBarrier::currentCount()
108{
109 return count.loadRelaxed();
110}
111
112// releases a thread, unless this is the last thread.
113// returns true if the thread was released.
114bool ThreadEngineBarrier::releaseUnlessLast()
115{
116 forever {
117 int localCount = count.loadRelaxed();
118 if (qAbs(t: localCount) == 1) {
119 return false;
120 } else if (localCount < 0) {
121 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1))
122 return true;
123 } else {
124 if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount - 1))
125 return true;
126 }
127 qYieldCpu();
128 }
129}
130
131ThreadEngineBase::ThreadEngineBase(QThreadPool *pool)
132 : futureInterface(nullptr), threadPool(pool)
133{
134 setAutoDelete(false);
135}
136
137ThreadEngineBase::~ThreadEngineBase() {}
138
139void ThreadEngineBase::startSingleThreaded()
140{
141 start();
142 while (threadFunction() != ThreadFinished)
143 ;
144 finish();
145}
146
147void ThreadEngineBase::startThread()
148{
149 startThreadInternal();
150}
151
152void ThreadEngineBase::acquireBarrierSemaphore()
153{
154 barrier.acquire();
155}
156
157void ThreadEngineBase::reportIfSuspensionDone() const
158{
159 if (futureInterface && futureInterface->isSuspending())
160 futureInterface->reportSuspended();
161}
162
163bool ThreadEngineBase::isCanceled()
164{
165 if (futureInterface)
166 return futureInterface->isCanceled();
167 else
168 return false;
169}
170
171void ThreadEngineBase::waitForResume()
172{
173 if (futureInterface)
174 futureInterface->waitForResume();
175}
176
177bool ThreadEngineBase::isProgressReportingEnabled()
178{
179 // If we don't have a QFuture, there is no-one to report the progress to.
180 return (futureInterface != nullptr);
181}
182
183void ThreadEngineBase::setProgressValue(int progress)
184{
185 if (futureInterface)
186 futureInterface->setProgressValue(progress);
187}
188
189void ThreadEngineBase::setProgressRange(int minimum, int maximum)
190{
191 if (futureInterface)
192 futureInterface->setProgressRange(minimum, maximum);
193}
194
195bool ThreadEngineBase::startThreadInternal()
196{
197 if (this->isCanceled())
198 return false;
199
200 barrier.acquire();
201 if (!threadPool->tryStart(runnable: this)) {
202 barrier.release();
203 return false;
204 }
205 return true;
206}
207
208void ThreadEngineBase::startThreads()
209{
210 while (shouldStartThread() && startThreadInternal())
211 ;
212}
213
214void ThreadEngineBase::threadExit()
215{
216 const bool asynchronous = (futureInterface != nullptr);
217 const int lastThread = (barrier.release() == 0);
218
219 if (lastThread && asynchronous)
220 this->asynchronousFinish();
221}
222
223// Called by a worker thread that wants to be throttled. If the current number
224// of running threads is larger than one the thread is allowed to exit and
225// this function returns one.
226bool ThreadEngineBase::threadThrottleExit()
227{
228 return barrier.releaseUnlessLast();
229}
230
231void ThreadEngineBase::run() // implements QRunnable.
232{
233 if (this->isCanceled()) {
234 threadExit();
235 return;
236 }
237
238 startThreads();
239
240#ifndef QT_NO_EXCEPTIONS
241 try {
242#endif
243 while (threadFunction() == ThrottleThread) {
244 // threadFunction returning ThrottleThread means it that the user
245 // struct wants to be throttled by making a worker thread exit.
246 // Respect that request unless this is the only worker thread left
247 // running, in which case it has to keep going.
248 if (threadThrottleExit()) {
249 return;
250 } else {
251 // If the last worker thread is throttled and the state is "suspending",
252 // it means that suspension has been requested, and it is already
253 // in effect (because all previous threads have already exited).
254 // Report the "Suspended" state.
255 reportIfSuspensionDone();
256 }
257 }
258
259#ifndef QT_NO_EXCEPTIONS
260 } catch (QException &e) {
261 handleException(exception: e);
262 } catch (...) {
263 handleException(exception: QUnhandledException(std::current_exception()));
264 }
265#endif
266 threadExit();
267}
268
269#ifndef QT_NO_EXCEPTIONS
270
271void ThreadEngineBase::handleException(const QException &exception)
272{
273 if (futureInterface) {
274 futureInterface->reportException(e: exception);
275 } else {
276 QMutexLocker lock(&mutex);
277 if (!exceptionStore.hasException())
278 exceptionStore.setException(exception);
279 }
280}
281#endif
282
283
284} // namespace QtConcurrent
285
286QT_END_NAMESPACE
287
288#endif // QT_NO_CONCURRENT
289

Provided by KDAB

Privacy Policy
Learn Advanced QML with KDAB
Find out more

source code of qtbase/src/concurrent/qtconcurrentthreadengine.cpp