1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3// Qt-Security score:significant reason:default
4
5#ifndef QTCONCURRENT_ITERATEKERNEL_H
6#define QTCONCURRENT_ITERATEKERNEL_H
7
8#include <QtConcurrent/qtconcurrent_global.h>
9
10#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
11
12#include <QtCore/qatomic.h>
13#include <QtConcurrent/qtconcurrentmedian.h>
14#include <QtConcurrent/qtconcurrentthreadengine.h>
15
16#include <iterator>
17
18QT_BEGIN_NAMESPACE
19
20
21
22namespace QtConcurrent {
23
24/*
25 The BlockSizeManager class manages how many iterations a thread should
26 reserve and process at a time. This is done by measuring the time spent
27 in the user code versus the control part code, and then increasing
28 the block size if the ratio between them is to small. The block size
29 management is done on the basis of the median of several timing measurements,
30 and it is done individually for each thread.
31*/
32class Q_CONCURRENT_EXPORT BlockSizeManager
33{
34public:
35 explicit BlockSizeManager(QThreadPool *pool, int iterationCount);
36
37 void timeBeforeUser();
38 void timeAfterUser();
39 int blockSize();
40
41private:
42 inline bool blockSizeMaxed()
43 {
44 return (m_blockSize >= maxBlockSize);
45 }
46
47 const int maxBlockSize;
48 qint64 beforeUser;
49 qint64 afterUser;
50 Median controlPartElapsed;
51 Median userPartElapsed;
52 int m_blockSize;
53
54 Q_DISABLE_COPY(BlockSizeManager)
55};
56
57template <typename T>
58class ResultReporter
59{
60public:
61 ResultReporter(ThreadEngine<T> *_threadEngine, T &_defaultValue)
62 : threadEngine(_threadEngine), defaultValue(_defaultValue)
63 {
64 }
65
66 void reserveSpace(int resultCount)
67 {
68 currentResultCount = resultCount;
69 resizeList(size: qMax(resultCount, vector.size()));
70 }
71
72 void reportResults(int begin)
73 {
74 const int useVectorThreshold = 4; // Tunable parameter.
75 if (currentResultCount > useVectorThreshold) {
76 resizeList(size: currentResultCount);
77 threadEngine->reportResults(vector, begin);
78 } else {
79 for (int i = 0; i < currentResultCount; ++i)
80 threadEngine->reportResult(&vector.at(i), begin + i);
81 }
82 }
83
84 inline T * getPointer()
85 {
86 return vector.data();
87 }
88
89 int currentResultCount = 0;
90 ThreadEngine<T> *threadEngine;
91 QList<T> vector;
92
93private:
94 void resizeList(qsizetype size)
95 {
96 if constexpr (std::is_default_constructible_v<T>)
97 vector.resize(size);
98 else
99 vector.resize(size, defaultValue);
100 }
101
102 T &defaultValue;
103};
104
105template <>
106class ResultReporter<void>
107{
108public:
109 inline ResultReporter(ThreadEngine<void> *) { }
110 inline void reserveSpace(int) { }
111 inline void reportResults(int) { }
112 inline void * getPointer() { return nullptr; }
113};
114
115template<typename T>
116struct DefaultValueContainer
117{
118 template<typename U = T>
119 DefaultValueContainer(U &&_value) : value(std::forward<U>(_value))
120 {
121 }
122
123 T value;
124};
125
126template<>
127struct DefaultValueContainer<void>
128{
129};
130
131inline bool selectIteration(std::bidirectional_iterator_tag)
132{
133 return false; // while
134}
135
136inline bool selectIteration(std::forward_iterator_tag)
137{
138 return false; // while
139}
140
141inline bool selectIteration(std::random_access_iterator_tag)
142{
143 return true; // for
144}
145
146template <typename Iterator, typename T>
147class IterateKernel : public ThreadEngine<T>
148{
149 using IteratorCategory = typename std::iterator_traits<Iterator>::iterator_category;
150
151public:
152 typedef T ResultType;
153
154 template<typename U = T, std::enable_if_t<std::is_same_v<U, void>, bool> = true>
155 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
156 : ThreadEngine<U>(pool),
157 begin(_begin),
158 end(_end),
159 current(_begin),
160 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
161 forIteration(selectIteration(IteratorCategory())),
162 progressReportingEnabled(true)
163 {
164 }
165
166 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
167 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
168 : ThreadEngine<U>(pool),
169 begin(_begin),
170 end(_end),
171 current(_begin),
172 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
173 forIteration(selectIteration(IteratorCategory())),
174 progressReportingEnabled(true),
175 defaultValue(U())
176 {
177 }
178
179 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
180 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end, U &&_defaultValue)
181 : ThreadEngine<U>(pool),
182 begin(_begin),
183 end(_end),
184 current(_begin),
185 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
186 forIteration(selectIteration(IteratorCategory())),
187 progressReportingEnabled(true),
188 defaultValue(std::forward<U>(_defaultValue))
189 {
190 }
191
192 virtual ~IterateKernel() { }
193
194 virtual bool runIteration(Iterator, int , T *) { return false; }
195 virtual bool runIterations(Iterator, int, int, T *) { return false; }
196
197 void start() override
198 {
199 progressReportingEnabled = this->isProgressReportingEnabled();
200 if (progressReportingEnabled && iterationCount > 0)
201 this->setProgressRange(0, iterationCount);
202 }
203
204 bool shouldStartThread() override
205 {
206 if (forIteration)
207 return (currentIndex.loadRelaxed() < iterationCount) && !this->shouldThrottleThread();
208 else // whileIteration
209 return (iteratorThreads.loadRelaxed() == 0);
210 }
211
212 ThreadFunctionResult threadFunction() override
213 {
214 if (forIteration)
215 return this->forThreadFunction();
216 else // whileIteration
217 return this->whileThreadFunction();
218 }
219
220 ThreadFunctionResult forThreadFunction()
221 {
222 BlockSizeManager blockSizeManager(ThreadEngineBase::threadPool, iterationCount);
223 ResultReporter<T> resultReporter = createResultsReporter();
224
225 for(;;) {
226 if (this->isCanceled())
227 break;
228
229 const int currentBlockSize = blockSizeManager.blockSize();
230
231 if (currentIndex.loadRelaxed() >= iterationCount)
232 break;
233
234 // Atomically reserve a block of iterationCount for this thread.
235 const int beginIndex = currentIndex.fetchAndAddRelease(valueToAdd: currentBlockSize);
236 const int endIndex = qMin(a: beginIndex + currentBlockSize, b: iterationCount);
237
238 if (beginIndex >= endIndex) {
239 // No more work
240 break;
241 }
242
243 this->waitForResume(); // (only waits if the qfuture is paused.)
244
245 if (shouldStartThread())
246 this->startThread();
247
248 const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
249 resultReporter.reserveSpace(finalBlockSize);
250
251 // Call user code with the current iteration range.
252 blockSizeManager.timeBeforeUser();
253 const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
254 blockSizeManager.timeAfterUser();
255
256 if (resultsAvailable)
257 resultReporter.reportResults(beginIndex);
258
259 // Report progress if progress reporting enabled.
260 if (progressReportingEnabled) {
261 completed.fetchAndAddAcquire(valueToAdd: finalBlockSize);
262 this->setProgressValue(this->completed.loadRelaxed());
263 }
264
265 if (this->shouldThrottleThread())
266 return ThrottleThread;
267 }
268 return ThreadFinished;
269 }
270
271 ThreadFunctionResult whileThreadFunction()
272 {
273 if (iteratorThreads.testAndSetAcquire(expectedValue: 0, newValue: 1) == false)
274 return ThreadFinished;
275
276 ResultReporter<T> resultReporter = createResultsReporter();
277 resultReporter.reserveSpace(1);
278
279 while (current != end) {
280 // The following two lines breaks support for input iterators according to
281 // the sgi docs: dereferencing prev after calling ++current is not allowed
282 // on input iterators. (prev is dereferenced inside user.runIteration())
283 Iterator prev = current;
284 ++current;
285 int index = currentIndex.fetchAndAddRelaxed(valueToAdd: 1);
286 iteratorThreads.testAndSetRelease(expectedValue: 1, newValue: 0);
287
288 this->waitForResume(); // (only waits if the qfuture is paused.)
289
290 if (shouldStartThread())
291 this->startThread();
292
293 const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
294 if (resultAavailable)
295 resultReporter.reportResults(index);
296
297 if (this->shouldThrottleThread())
298 return ThrottleThread;
299
300 if (iteratorThreads.testAndSetAcquire(expectedValue: 0, newValue: 1) == false)
301 return ThreadFinished;
302 }
303
304 return ThreadFinished;
305 }
306
307private:
308 ResultReporter<T> createResultsReporter()
309 {
310 if constexpr (!std::is_same_v<T, void>)
311 return ResultReporter<T>(this, defaultValue.value);
312 else
313 return ResultReporter<T>(this);
314 }
315
316public:
317 const Iterator begin;
318 const Iterator end;
319 Iterator current;
320 QAtomicInt currentIndex;
321 QAtomicInt iteratorThreads;
322 QAtomicInt completed;
323 const int iterationCount;
324 const bool forIteration;
325 bool progressReportingEnabled;
326 DefaultValueContainer<ResultType> defaultValue;
327};
328
329} // namespace QtConcurrent
330
331
332QT_END_NAMESPACE
333
334#endif // QT_NO_CONCURRENT
335
336#endif
337

source code of qtbase/src/concurrent/qtconcurrentiteratekernel.h