1 | // Copyright (C) 2016 The Qt Company Ltd. |
---|---|
2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only |
3 | |
4 | #include "qtconcurrentthreadengine.h" |
5 | |
6 | #if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC) |
7 | |
8 | QT_BEGIN_NAMESPACE |
9 | |
10 | namespace QtConcurrent { |
11 | |
12 | /*! |
13 | \class QtConcurrent::ThreadEngineBarrier |
14 | \inmodule QtConcurrent |
15 | \internal |
16 | */ |
17 | |
18 | /*! |
19 | \enum QtConcurrent::ThreadFunctionResult |
20 | \internal |
21 | */ |
22 | |
23 | /*! |
24 | \class QtConcurrent::ThreadEngineBase |
25 | \inmodule QtConcurrent |
26 | \internal |
27 | */ |
28 | |
29 | /*! |
30 | \class QtConcurrent::ThreadEngine |
31 | \inmodule QtConcurrent |
32 | \internal |
33 | */ |
34 | |
35 | /*! |
36 | \class QtConcurrent::ThreadEngineStarterBase |
37 | \inmodule QtConcurrent |
38 | \internal |
39 | */ |
40 | |
41 | /*! |
42 | \class QtConcurrent::ThreadEngineStarter |
43 | \inmodule QtConcurrent |
44 | \internal |
45 | */ |
46 | |
47 | /*! |
48 | \fn [qtconcurrentthreadengine-1] template <typename ThreadEngine> ThreadEngineStarter<typename ThreadEngine::ResultType> QtConcurrent::startThreadEngine(ThreadEngine *threadEngine) |
49 | \internal |
50 | */ |
51 | |
52 | ThreadEngineBarrier::ThreadEngineBarrier() |
53 | :count(0) { } |
54 | |
55 | void ThreadEngineBarrier::acquire() |
56 | { |
57 | forever { |
58 | int localCount = count.loadRelaxed(); |
59 | if (localCount < 0) { |
60 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount -1)) |
61 | return; |
62 | } else { |
63 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1)) |
64 | return; |
65 | } |
66 | qYieldCpu(); |
67 | } |
68 | } |
69 | |
70 | int ThreadEngineBarrier::release() |
71 | { |
72 | forever { |
73 | int localCount = count.loadRelaxed(); |
74 | if (localCount == -1) { |
75 | if (count.testAndSetOrdered(expectedValue: -1, newValue: 0)) { |
76 | semaphore.release(); |
77 | return 0; |
78 | } |
79 | } else if (localCount < 0) { |
80 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1)) |
81 | return qAbs(t: localCount + 1); |
82 | } else { |
83 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount - 1)) |
84 | return localCount - 1; |
85 | } |
86 | qYieldCpu(); |
87 | } |
88 | } |
89 | |
90 | // Wait until all threads have been released |
91 | void ThreadEngineBarrier::wait() |
92 | { |
93 | forever { |
94 | int localCount = count.loadRelaxed(); |
95 | if (localCount == 0) |
96 | return; |
97 | |
98 | Q_ASSERT(localCount > 0); // multiple waiters are not allowed. |
99 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: -localCount)) { |
100 | semaphore.acquire(); |
101 | return; |
102 | } |
103 | qYieldCpu(); |
104 | } |
105 | } |
106 | |
107 | int ThreadEngineBarrier::currentCount() |
108 | { |
109 | return count.loadRelaxed(); |
110 | } |
111 | |
112 | // releases a thread, unless this is the last thread. |
113 | // returns true if the thread was released. |
114 | bool ThreadEngineBarrier::releaseUnlessLast() |
115 | { |
116 | forever { |
117 | int localCount = count.loadRelaxed(); |
118 | if (qAbs(t: localCount) == 1) { |
119 | return false; |
120 | } else if (localCount < 0) { |
121 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount + 1)) |
122 | return true; |
123 | } else { |
124 | if (count.testAndSetOrdered(expectedValue: localCount, newValue: localCount - 1)) |
125 | return true; |
126 | } |
127 | qYieldCpu(); |
128 | } |
129 | } |
130 | |
131 | ThreadEngineBase::ThreadEngineBase(QThreadPool *pool) |
132 | : futureInterface(nullptr), threadPool(pool) |
133 | { |
134 | setAutoDelete(false); |
135 | } |
136 | |
137 | ThreadEngineBase::~ThreadEngineBase() {} |
138 | |
139 | void ThreadEngineBase::startSingleThreaded() |
140 | { |
141 | start(); |
142 | while (threadFunction() != ThreadFinished) |
143 | ; |
144 | finish(); |
145 | } |
146 | |
147 | void ThreadEngineBase::startThread() |
148 | { |
149 | startThreadInternal(); |
150 | } |
151 | |
152 | void ThreadEngineBase::acquireBarrierSemaphore() |
153 | { |
154 | barrier.acquire(); |
155 | } |
156 | |
157 | void ThreadEngineBase::reportIfSuspensionDone() const |
158 | { |
159 | if (futureInterface && futureInterface->isSuspending()) |
160 | futureInterface->reportSuspended(); |
161 | } |
162 | |
163 | bool ThreadEngineBase::isCanceled() |
164 | { |
165 | if (futureInterface) |
166 | return futureInterface->isCanceled(); |
167 | else |
168 | return false; |
169 | } |
170 | |
171 | void ThreadEngineBase::waitForResume() |
172 | { |
173 | if (futureInterface) |
174 | futureInterface->waitForResume(); |
175 | } |
176 | |
177 | bool ThreadEngineBase::isProgressReportingEnabled() |
178 | { |
179 | // If we don't have a QFuture, there is no-one to report the progress to. |
180 | return (futureInterface != nullptr); |
181 | } |
182 | |
183 | void ThreadEngineBase::setProgressValue(int progress) |
184 | { |
185 | if (futureInterface) |
186 | futureInterface->setProgressValue(progress); |
187 | } |
188 | |
189 | void ThreadEngineBase::setProgressRange(int minimum, int maximum) |
190 | { |
191 | if (futureInterface) |
192 | futureInterface->setProgressRange(minimum, maximum); |
193 | } |
194 | |
195 | bool ThreadEngineBase::startThreadInternal() |
196 | { |
197 | if (this->isCanceled()) |
198 | return false; |
199 | |
200 | barrier.acquire(); |
201 | if (!threadPool->tryStart(runnable: this)) { |
202 | barrier.release(); |
203 | return false; |
204 | } |
205 | return true; |
206 | } |
207 | |
208 | void ThreadEngineBase::startThreads() |
209 | { |
210 | while (shouldStartThread() && startThreadInternal()) |
211 | ; |
212 | } |
213 | |
214 | void ThreadEngineBase::threadExit() |
215 | { |
216 | const bool asynchronous = (futureInterface != nullptr); |
217 | const int lastThread = (barrier.release() == 0); |
218 | |
219 | if (lastThread && asynchronous) |
220 | this->asynchronousFinish(); |
221 | } |
222 | |
223 | // Called by a worker thread that wants to be throttled. If the current number |
224 | // of running threads is larger than one the thread is allowed to exit and |
225 | // this function returns one. |
226 | bool ThreadEngineBase::threadThrottleExit() |
227 | { |
228 | return barrier.releaseUnlessLast(); |
229 | } |
230 | |
231 | void ThreadEngineBase::run() // implements QRunnable. |
232 | { |
233 | if (this->isCanceled()) { |
234 | threadExit(); |
235 | return; |
236 | } |
237 | |
238 | startThreads(); |
239 | |
240 | #ifndef QT_NO_EXCEPTIONS |
241 | try { |
242 | #endif |
243 | while (threadFunction() == ThrottleThread) { |
244 | // threadFunction returning ThrottleThread means it that the user |
245 | // struct wants to be throttled by making a worker thread exit. |
246 | // Respect that request unless this is the only worker thread left |
247 | // running, in which case it has to keep going. |
248 | if (threadThrottleExit()) { |
249 | return; |
250 | } else { |
251 | // If the last worker thread is throttled and the state is "suspending", |
252 | // it means that suspension has been requested, and it is already |
253 | // in effect (because all previous threads have already exited). |
254 | // Report the "Suspended" state. |
255 | reportIfSuspensionDone(); |
256 | } |
257 | } |
258 | |
259 | #ifndef QT_NO_EXCEPTIONS |
260 | } catch (QException &e) { |
261 | handleException(exception: e); |
262 | } catch (...) { |
263 | handleException(exception: QUnhandledException(std::current_exception())); |
264 | } |
265 | #endif |
266 | threadExit(); |
267 | } |
268 | |
269 | #ifndef QT_NO_EXCEPTIONS |
270 | |
271 | void ThreadEngineBase::handleException(const QException &exception) |
272 | { |
273 | if (futureInterface) { |
274 | futureInterface->reportException(e: exception); |
275 | } else { |
276 | QMutexLocker lock(&mutex); |
277 | if (!exceptionStore.hasException()) |
278 | exceptionStore.setException(exception); |
279 | } |
280 | } |
281 | #endif |
282 | |
283 | |
284 | } // namespace QtConcurrent |
285 | |
286 | QT_END_NAMESPACE |
287 | |
288 | #endif // QT_NO_CONCURRENT |
289 |
Definitions
- ThreadEngineBarrier
- acquire
- release
- wait
- currentCount
- releaseUnlessLast
- ThreadEngineBase
- ~ThreadEngineBase
- startSingleThreaded
- startThread
- acquireBarrierSemaphore
- reportIfSuspensionDone
- isCanceled
- waitForResume
- isProgressReportingEnabled
- setProgressValue
- setProgressRange
- startThreadInternal
- startThreads
- threadExit
- threadThrottleExit
- run
Learn Advanced QML with KDAB
Find out more