1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3// Qt-Security score:significant reason:default
4
5#ifndef QTCONCURRENT_REDUCEKERNEL_H
6#define QTCONCURRENT_REDUCEKERNEL_H
7
8#include <QtConcurrent/qtconcurrent_global.h>
9
10#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
11
12#include <QtCore/qatomic.h>
13#include <QtCore/qlist.h>
14#include <QtCore/qmap.h>
15#include <QtCore/qmutex.h>
16#include <QtCore/qthread.h>
17#include <QtCore/qthreadpool.h>
18
19#include <mutex>
20
21QT_BEGIN_NAMESPACE
22
23namespace QtPrivate {
24
// Owns the sequence handed to it, either as a copy of an lvalue or by
// taking over the contents of an rvalue.
template <typename Sequence>
struct SequenceHolder
{
    // The stored sequence.
    Sequence sequence;

    // Stores a copy of \a source.
    SequenceHolder(const Sequence &source) : sequence(source) {}

    // Moves \a source into the holder.
    SequenceHolder(Sequence &&source) : sequence(std::move(source)) {}
};
32
33}
34
35namespace QtConcurrent {
36
/*
    ReduceQueueStartLimit and ReduceQueueThrottleLimit bound the size of
    the reduce queue used by MapReduce. Once the number of reduce blocks
    waiting in the queue exceeds ReduceQueueStartLimit, MapReduce stops
    starting new threads; once it exceeds ReduceQueueThrottleLimit,
    running threads are stopped.

    ReduceKernel compares its queue size against these values multiplied
    by its thread count.

    The enum is anonymous in regular builds and is named
    ReduceQueueLimits only when Q_QDOC is defined.
*/
#ifndef Q_QDOC
enum {
    ReduceQueueStartLimit = 20,
    ReduceQueueThrottleLimit = 30
};
#else
enum ReduceQueueLimits {
    ReduceQueueStartLimit = 20,
    ReduceQueueThrottleLimit = 30
};
#endif
55
56// IntermediateResults holds a block of intermediate results from a
57// map or filter functor. The begin/end offsets indicates the origin
58// and range of the block.
59template <typename T>
60class IntermediateResults
61{
62public:
63 int begin, end;
64 QList<T> vector;
65};
66
67enum ReduceOption {
68 UnorderedReduce = 0x1,
69 OrderedReduce = 0x2,
70 SequentialReduce = 0x4
71 // ParallelReduce = 0x8
72};
73Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
74#ifndef Q_QDOC
75Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
76#endif
77// supports both ordered and out-of-order reduction
78template <typename ReduceFunctor, typename ReduceResultType, typename T>
79class ReduceKernel
80{
81 typedef QMap<int, IntermediateResults<T> > ResultsMap;
82
83 const ReduceOptions reduceOptions;
84
85 QMutex mutex;
86 int progress, resultsMapSize;
87 const int threadCount;
88 ResultsMap resultsMap;
89
90 bool canReduce(int begin) const
91 {
92 return (((reduceOptions & UnorderedReduce)
93 && progress == 0)
94 || ((reduceOptions & OrderedReduce)
95 && progress == begin));
96 }
97
98 void reduceResult(ReduceFunctor &reduce,
99 ReduceResultType &r,
100 const IntermediateResults<T> &result)
101 {
102 for (int i = 0; i < result.vector.size(); ++i) {
103 std::invoke(reduce, r, result.vector.at(i));
104 }
105 }
106
107 void reduceResults(ReduceFunctor &reduce,
108 ReduceResultType &r,
109 ResultsMap &map)
110 {
111 typename ResultsMap::iterator it = map.begin();
112 while (it != map.end()) {
113 reduceResult(reduce, r, result: it.value());
114 ++it;
115 }
116 }
117
118public:
119 ReduceKernel(QThreadPool *pool, ReduceOptions _reduceOptions)
120 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
121 threadCount(std::max(a: pool->maxThreadCount(), b: 1))
122 { }
123
124 void runReduce(ReduceFunctor &reduce,
125 ReduceResultType &r,
126 const IntermediateResults<T> &result)
127 {
128 std::unique_lock<QMutex> locker(mutex);
129 if (!canReduce(begin: result.begin)) {
130 ++resultsMapSize;
131 resultsMap.insert(result.begin, result);
132 return;
133 }
134
135 if (reduceOptions & UnorderedReduce) {
136 // UnorderedReduce
137 progress = -1;
138
139 // reduce this result
140 locker.unlock();
141 reduceResult(reduce, r, result);
142 locker.lock();
143
144 // reduce all stored results as well
145 while (!resultsMap.isEmpty()) {
146 ResultsMap resultsMapCopy = resultsMap;
147 resultsMap.clear();
148
149 locker.unlock();
150 reduceResults(reduce, r, map&: resultsMapCopy);
151 locker.lock();
152
153 resultsMapSize -= resultsMapCopy.size();
154 }
155
156 progress = 0;
157 } else {
158 // reduce this result
159 locker.unlock();
160 reduceResult(reduce, r, result);
161 locker.lock();
162
163 // OrderedReduce
164 progress += result.end - result.begin;
165
166 // reduce as many other results as possible
167 typename ResultsMap::iterator it = resultsMap.begin();
168 while (it != resultsMap.end()) {
169 if (it.value().begin != progress)
170 break;
171
172 locker.unlock();
173 reduceResult(reduce, r, result: it.value());
174 locker.lock();
175
176 --resultsMapSize;
177 progress += it.value().end - it.value().begin;
178 it = resultsMap.erase(it);
179 }
180 }
181 }
182
183 // final reduction
184 void finish(ReduceFunctor &reduce, ReduceResultType &r)
185 {
186 reduceResults(reduce, r, map&: resultsMap);
187 }
188
189 inline bool shouldThrottle()
190 {
191 std::lock_guard<QMutex> locker(mutex);
192 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
193 }
194
195 inline bool shouldStartThread()
196 {
197 std::lock_guard<QMutex> locker(mutex);
198 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
199 }
200};
201
202template <typename Sequence, typename Base, typename Functor1, typename Functor2>
203struct SequenceHolder2 : private QtPrivate::SequenceHolder<Sequence>, public Base
204{
205 template<typename S = Sequence, typename F1 = Functor1, typename F2 = Functor2>
206 SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2,
207 ReduceOptions reduceOptions)
208 : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)),
209 Base(pool, this->sequence.cbegin(), this->sequence.cend(),
210 std::forward<F1>(functor1), std::forward<F2>(functor2), reduceOptions)
211 { }
212
213 template<typename InitialValueType, typename S = Sequence,
214 typename F1 = Functor1, typename F2 = Functor2>
215 SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2,
216 InitialValueType &&initialValue, ReduceOptions reduceOptions)
217 : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)),
218 Base(pool, this->sequence.cbegin(), this->sequence.cend(),
219 std::forward<F1>(functor1), std::forward<F2>(functor2),
220 std::forward<InitialValueType>(initialValue), reduceOptions)
221 { }
222
223 void finish() override
224 {
225 Base::finish();
226 // Clear the sequence to make sure all temporaries are destroyed
227 // before finished is signaled.
228 this->sequence = Sequence();
229 }
230};
231
232} // namespace QtConcurrent
233
234QT_END_NAMESPACE
235
236#endif // QT_NO_CONCURRENT
237
238#endif
239

// source code of qtbase/src/concurrent/qtconcurrentreducekernel.h