1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#ifndef QTCONCURRENT_ITERATEKERNEL_H
5#define QTCONCURRENT_ITERATEKERNEL_H
6
7#include <QtConcurrent/qtconcurrent_global.h>
8
9#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
10
11#include <QtCore/qatomic.h>
12#include <QtConcurrent/qtconcurrentmedian.h>
13#include <QtConcurrent/qtconcurrentthreadengine.h>
14
15#include <iterator>
16
17QT_BEGIN_NAMESPACE
18
19
20
21namespace QtConcurrent {
22
23/*
24 The BlockSizeManager class manages how many iterations a thread should
25 reserve and process at a time. This is done by measuring the time spent
26 in the user code versus the control part code, and then increasing
27 the block size if the ratio between them is to small. The block size
28 management is done on the basis of the median of several timing measurements,
29 and it is done individually for each thread.
30*/
31class Q_CONCURRENT_EXPORT BlockSizeManager
32{
33public:
34 explicit BlockSizeManager(QThreadPool *pool, int iterationCount);
35
36 void timeBeforeUser();
37 void timeAfterUser();
38 int blockSize();
39
40private:
41 inline bool blockSizeMaxed()
42 {
43 return (m_blockSize >= maxBlockSize);
44 }
45
46 const int maxBlockSize;
47 qint64 beforeUser;
48 qint64 afterUser;
49 Median controlPartElapsed;
50 Median userPartElapsed;
51 int m_blockSize;
52
53 Q_DISABLE_COPY(BlockSizeManager)
54};
55
56template <typename T>
57class ResultReporter
58{
59public:
60 ResultReporter(ThreadEngine<T> *_threadEngine, T &_defaultValue)
61 : threadEngine(_threadEngine), defaultValue(_defaultValue)
62 {
63 }
64
65 void reserveSpace(int resultCount)
66 {
67 currentResultCount = resultCount;
68 resizeList(size: qMax(resultCount, vector.size()));
69 }
70
71 void reportResults(int begin)
72 {
73 const int useVectorThreshold = 4; // Tunable parameter.
74 if (currentResultCount > useVectorThreshold) {
75 resizeList(size: currentResultCount);
76 threadEngine->reportResults(vector, begin);
77 } else {
78 for (int i = 0; i < currentResultCount; ++i)
79 threadEngine->reportResult(&vector.at(i), begin + i);
80 }
81 }
82
83 inline T * getPointer()
84 {
85 return vector.data();
86 }
87
88 int currentResultCount;
89 ThreadEngine<T> *threadEngine;
90 QList<T> vector;
91
92private:
93 void resizeList(qsizetype size)
94 {
95 if constexpr (std::is_default_constructible_v<T>)
96 vector.resize(size);
97 else
98 vector.resize(size, defaultValue);
99 }
100
101 T &defaultValue;
102};
103
104template <>
105class ResultReporter<void>
106{
107public:
108 inline ResultReporter(ThreadEngine<void> *) { }
109 inline void reserveSpace(int) { }
110 inline void reportResults(int) { }
111 inline void * getPointer() { return nullptr; }
112};
113
114template<typename T>
115struct DefaultValueContainer
116{
117 template<typename U = T>
118 DefaultValueContainer(U &&_value) : value(std::forward<U>(_value))
119 {
120 }
121
122 T value;
123};
124
125template<>
126struct DefaultValueContainer<void>
127{
128};
129
130inline bool selectIteration(std::bidirectional_iterator_tag)
131{
132 return false; // while
133}
134
135inline bool selectIteration(std::forward_iterator_tag)
136{
137 return false; // while
138}
139
140inline bool selectIteration(std::random_access_iterator_tag)
141{
142 return true; // for
143}
144
145template <typename Iterator, typename T>
146class IterateKernel : public ThreadEngine<T>
147{
148 using IteratorCategory = typename std::iterator_traits<Iterator>::iterator_category;
149
150public:
151 typedef T ResultType;
152
153 template<typename U = T, std::enable_if_t<std::is_same_v<U, void>, bool> = true>
154 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
155 : ThreadEngine<U>(pool),
156 begin(_begin),
157 end(_end),
158 current(_begin),
159 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
160 forIteration(selectIteration(IteratorCategory())),
161 progressReportingEnabled(true)
162 {
163 }
164
165 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
166 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
167 : ThreadEngine<U>(pool),
168 begin(_begin),
169 end(_end),
170 current(_begin),
171 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
172 forIteration(selectIteration(IteratorCategory())),
173 progressReportingEnabled(true),
174 defaultValue(U())
175 {
176 }
177
178 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
179 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end, U &&_defaultValue)
180 : ThreadEngine<U>(pool),
181 begin(_begin),
182 end(_end),
183 current(_begin),
184 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
185 forIteration(selectIteration(IteratorCategory())),
186 progressReportingEnabled(true),
187 defaultValue(std::forward<U>(_defaultValue))
188 {
189 }
190
191 virtual ~IterateKernel() { }
192
193 virtual bool runIteration(Iterator, int , T *) { return false; }
194 virtual bool runIterations(Iterator, int, int, T *) { return false; }
195
196 void start() override
197 {
198 progressReportingEnabled = this->isProgressReportingEnabled();
199 if (progressReportingEnabled && iterationCount > 0)
200 this->setProgressRange(0, iterationCount);
201 }
202
203 bool shouldStartThread() override
204 {
205 if (forIteration)
206 return (currentIndex.loadRelaxed() < iterationCount) && !this->shouldThrottleThread();
207 else // whileIteration
208 return (iteratorThreads.loadRelaxed() == 0);
209 }
210
211 ThreadFunctionResult threadFunction() override
212 {
213 if (forIteration)
214 return this->forThreadFunction();
215 else // whileIteration
216 return this->whileThreadFunction();
217 }
218
219 ThreadFunctionResult forThreadFunction()
220 {
221 BlockSizeManager blockSizeManager(ThreadEngineBase::threadPool, iterationCount);
222 ResultReporter<T> resultReporter = createResultsReporter();
223
224 for(;;) {
225 if (this->isCanceled())
226 break;
227
228 const int currentBlockSize = blockSizeManager.blockSize();
229
230 if (currentIndex.loadRelaxed() >= iterationCount)
231 break;
232
233 // Atomically reserve a block of iterationCount for this thread.
234 const int beginIndex = currentIndex.fetchAndAddRelease(valueToAdd: currentBlockSize);
235 const int endIndex = qMin(a: beginIndex + currentBlockSize, b: iterationCount);
236
237 if (beginIndex >= endIndex) {
238 // No more work
239 break;
240 }
241
242 this->waitForResume(); // (only waits if the qfuture is paused.)
243
244 if (shouldStartThread())
245 this->startThread();
246
247 const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
248 resultReporter.reserveSpace(finalBlockSize);
249
250 // Call user code with the current iteration range.
251 blockSizeManager.timeBeforeUser();
252 const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
253 blockSizeManager.timeAfterUser();
254
255 if (resultsAvailable)
256 resultReporter.reportResults(beginIndex);
257
258 // Report progress if progress reporting enabled.
259 if (progressReportingEnabled) {
260 completed.fetchAndAddAcquire(valueToAdd: finalBlockSize);
261 this->setProgressValue(this->completed.loadRelaxed());
262 }
263
264 if (this->shouldThrottleThread())
265 return ThrottleThread;
266 }
267 return ThreadFinished;
268 }
269
270 ThreadFunctionResult whileThreadFunction()
271 {
272 if (iteratorThreads.testAndSetAcquire(expectedValue: 0, newValue: 1) == false)
273 return ThreadFinished;
274
275 ResultReporter<T> resultReporter = createResultsReporter();
276 resultReporter.reserveSpace(1);
277
278 while (current != end) {
279 // The following two lines breaks support for input iterators according to
280 // the sgi docs: dereferencing prev after calling ++current is not allowed
281 // on input iterators. (prev is dereferenced inside user.runIteration())
282 Iterator prev = current;
283 ++current;
284 int index = currentIndex.fetchAndAddRelaxed(valueToAdd: 1);
285 iteratorThreads.testAndSetRelease(expectedValue: 1, newValue: 0);
286
287 this->waitForResume(); // (only waits if the qfuture is paused.)
288
289 if (shouldStartThread())
290 this->startThread();
291
292 const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
293 if (resultAavailable)
294 resultReporter.reportResults(index);
295
296 if (this->shouldThrottleThread())
297 return ThrottleThread;
298
299 if (iteratorThreads.testAndSetAcquire(expectedValue: 0, newValue: 1) == false)
300 return ThreadFinished;
301 }
302
303 return ThreadFinished;
304 }
305
306private:
307 ResultReporter<T> createResultsReporter()
308 {
309 if constexpr (!std::is_same_v<T, void>)
310 return ResultReporter<T>(this, defaultValue.value);
311 else
312 return ResultReporter<T>(this);
313 }
314
315public:
316 const Iterator begin;
317 const Iterator end;
318 Iterator current;
319 QAtomicInt currentIndex;
320 QAtomicInt iteratorThreads;
321 QAtomicInt completed;
322 const int iterationCount;
323 const bool forIteration;
324 bool progressReportingEnabled;
325 DefaultValueContainer<ResultType> defaultValue;
326};
327
328} // namespace QtConcurrent
329
330
331QT_END_NAMESPACE
332
333#endif // QT_NO_CONCURRENT
334
335#endif
336

source code of qtbase/src/concurrent/qtconcurrentiteratekernel.h