1// Copyright (C) 2016 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
3
4#ifndef QTCONCURRENT_REDUCEKERNEL_H
5#define QTCONCURRENT_REDUCEKERNEL_H
6
7#include <QtConcurrent/qtconcurrent_global.h>
8
9#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
10
11#include <QtCore/qatomic.h>
12#include <QtCore/qlist.h>
13#include <QtCore/qmap.h>
14#include <QtCore/qmutex.h>
15#include <QtCore/qthread.h>
16#include <QtCore/qthreadpool.h>
17
18#include <mutex>
19
20QT_BEGIN_NAMESPACE
21
22namespace QtPrivate {
23
24template<typename Sequence>
25struct SequenceHolder
26{
27 SequenceHolder(const Sequence &s) : sequence(s) { }
28 SequenceHolder(Sequence &&s) : sequence(std::move(s)) { }
29 Sequence sequence;
30};
31
32}
33
34namespace QtConcurrent {
35
36/*
37 The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
38 limit the reduce queue size for MapReduce. When the number of
39 reduce blocks in the queue exceeds ReduceQueueStartLimit,
40 MapReduce won't start any new threads, and when it exceeds
41 ReduceQueueThrottleLimit running threads will be stopped.
42*/
43#ifdef Q_QDOC
44enum ReduceQueueLimits {
45 ReduceQueueStartLimit = 20,
46 ReduceQueueThrottleLimit = 30
47};
48#else
49enum {
50 ReduceQueueStartLimit = 20,
51 ReduceQueueThrottleLimit = 30
52};
53#endif
54
55// IntermediateResults holds a block of intermediate results from a
56// map or filter functor. The begin/end offsets indicates the origin
57// and range of the block.
58template <typename T>
59class IntermediateResults
60{
61public:
62 int begin, end;
63 QList<T> vector;
64};
65
66enum ReduceOption {
67 UnorderedReduce = 0x1,
68 OrderedReduce = 0x2,
69 SequentialReduce = 0x4
70 // ParallelReduce = 0x8
71};
72Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
73#ifndef Q_QDOC
74Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
75#endif
76// supports both ordered and out-of-order reduction
77template <typename ReduceFunctor, typename ReduceResultType, typename T>
78class ReduceKernel
79{
80 typedef QMap<int, IntermediateResults<T> > ResultsMap;
81
82 const ReduceOptions reduceOptions;
83
84 QMutex mutex;
85 int progress, resultsMapSize;
86 const int threadCount;
87 ResultsMap resultsMap;
88
89 bool canReduce(int begin) const
90 {
91 return (((reduceOptions & UnorderedReduce)
92 && progress == 0)
93 || ((reduceOptions & OrderedReduce)
94 && progress == begin));
95 }
96
97 void reduceResult(ReduceFunctor &reduce,
98 ReduceResultType &r,
99 const IntermediateResults<T> &result)
100 {
101 for (int i = 0; i < result.vector.size(); ++i) {
102 std::invoke(reduce, r, result.vector.at(i));
103 }
104 }
105
106 void reduceResults(ReduceFunctor &reduce,
107 ReduceResultType &r,
108 ResultsMap &map)
109 {
110 typename ResultsMap::iterator it = map.begin();
111 while (it != map.end()) {
112 reduceResult(reduce, r, result: it.value());
113 ++it;
114 }
115 }
116
117public:
118 ReduceKernel(QThreadPool *pool, ReduceOptions _reduceOptions)
119 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
120 threadCount(pool->maxThreadCount())
121 { }
122
123 void runReduce(ReduceFunctor &reduce,
124 ReduceResultType &r,
125 const IntermediateResults<T> &result)
126 {
127 std::unique_lock<QMutex> locker(mutex);
128 if (!canReduce(begin: result.begin)) {
129 ++resultsMapSize;
130 resultsMap.insert(result.begin, result);
131 return;
132 }
133
134 if (reduceOptions & UnorderedReduce) {
135 // UnorderedReduce
136 progress = -1;
137
138 // reduce this result
139 locker.unlock();
140 reduceResult(reduce, r, result);
141 locker.lock();
142
143 // reduce all stored results as well
144 while (!resultsMap.isEmpty()) {
145 ResultsMap resultsMapCopy = resultsMap;
146 resultsMap.clear();
147
148 locker.unlock();
149 reduceResults(reduce, r, map&: resultsMapCopy);
150 locker.lock();
151
152 resultsMapSize -= resultsMapCopy.size();
153 }
154
155 progress = 0;
156 } else {
157 // reduce this result
158 locker.unlock();
159 reduceResult(reduce, r, result);
160 locker.lock();
161
162 // OrderedReduce
163 progress += result.end - result.begin;
164
165 // reduce as many other results as possible
166 typename ResultsMap::iterator it = resultsMap.begin();
167 while (it != resultsMap.end()) {
168 if (it.value().begin != progress)
169 break;
170
171 locker.unlock();
172 reduceResult(reduce, r, result: it.value());
173 locker.lock();
174
175 --resultsMapSize;
176 progress += it.value().end - it.value().begin;
177 it = resultsMap.erase(it);
178 }
179 }
180 }
181
182 // final reduction
183 void finish(ReduceFunctor &reduce, ReduceResultType &r)
184 {
185 reduceResults(reduce, r, map&: resultsMap);
186 }
187
188 inline bool shouldThrottle()
189 {
190 std::lock_guard<QMutex> locker(mutex);
191 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
192 }
193
194 inline bool shouldStartThread()
195 {
196 std::lock_guard<QMutex> locker(mutex);
197 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
198 }
199};
200
201template <typename Sequence, typename Base, typename Functor1, typename Functor2>
202struct SequenceHolder2 : private QtPrivate::SequenceHolder<Sequence>, public Base
203{
204 template<typename S = Sequence, typename F1 = Functor1, typename F2 = Functor2>
205 SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2,
206 ReduceOptions reduceOptions)
207 : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)),
208 Base(pool, this->sequence.cbegin(), this->sequence.cend(),
209 std::forward<F1>(functor1), std::forward<F2>(functor2), reduceOptions)
210 { }
211
212 template<typename InitialValueType, typename S = Sequence,
213 typename F1 = Functor1, typename F2 = Functor2>
214 SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2,
215 InitialValueType &&initialValue, ReduceOptions reduceOptions)
216 : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)),
217 Base(pool, this->sequence.cbegin(), this->sequence.cend(),
218 std::forward<F1>(functor1), std::forward<F2>(functor2),
219 std::forward<InitialValueType>(initialValue), reduceOptions)
220 { }
221
222 void finish() override
223 {
224 Base::finish();
225 // Clear the sequence to make sure all temporaries are destroyed
226 // before finished is signaled.
227 this->sequence = Sequence();
228 }
229};
230
231} // namespace QtConcurrent
232
233QT_END_NAMESPACE
234
235#endif // QT_NO_CONCURRENT
236
237#endif
238

source code of qtbase/src/concurrent/qtconcurrentreducekernel.h