1 | // Copyright (C) 2016 The Qt Company Ltd. |
2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only |
3 | |
4 | #ifndef QTCONCURRENT_REDUCEKERNEL_H |
5 | #define QTCONCURRENT_REDUCEKERNEL_H |
6 | |
7 | #include <QtConcurrent/qtconcurrent_global.h> |
8 | |
9 | #if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC) |
10 | |
11 | #include <QtCore/qatomic.h> |
12 | #include <QtCore/qlist.h> |
13 | #include <QtCore/qmap.h> |
14 | #include <QtCore/qmutex.h> |
15 | #include <QtCore/qthread.h> |
16 | #include <QtCore/qthreadpool.h> |
17 | |
18 | #include <mutex> |
19 | |
20 | QT_BEGIN_NAMESPACE |
21 | |
22 | namespace QtPrivate { |
23 | |
// Owns the input sequence of a QtConcurrent algorithm, either as a copy of a
// caller-provided lvalue or by taking over an rvalue, so that iterators into
// it stay valid for as long as the holder lives.
template <typename Sequence>
struct SequenceHolder
{
    Sequence sequence; // the held (copied or moved-in) sequence

    // Copies the caller's sequence.
    SequenceHolder(const Sequence &s)
        : sequence(s)
    {
    }

    // Takes over the caller's temporary sequence.
    SequenceHolder(Sequence &&s)
        : sequence(std::move(s))
    {
    }
};
31 | |
32 | } |
33 | |
34 | namespace QtConcurrent { |
35 | |
/*
    The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
    limit the size of the reduce queue for MapReduce. Both limits are
    per thread: they are multiplied by the thread count before being
    compared with the number of queued reduce blocks. When the queue
    holds more than ReduceQueueStartLimit blocks per thread, MapReduce
    won't start any new threads; when it holds more than
    ReduceQueueThrottleLimit blocks per thread, running threads will be
    throttled.
*/
#ifdef Q_QDOC
enum ReduceQueueLimits {
#else
enum {
#endif
    ReduceQueueStartLimit = 20,
    ReduceQueueThrottleLimit = 30
};
54 | |
55 | // IntermediateResults holds a block of intermediate results from a |
56 | // map or filter functor. The begin/end offsets indicates the origin |
57 | // and range of the block. |
58 | template <typename T> |
59 | class IntermediateResults |
60 | { |
61 | public: |
62 | int begin, end; |
63 | QList<T> vector; |
64 | }; |
65 | |
66 | enum ReduceOption { |
67 | UnorderedReduce = 0x1, |
68 | OrderedReduce = 0x2, |
69 | SequentialReduce = 0x4 |
70 | // ParallelReduce = 0x8 |
71 | }; |
72 | Q_DECLARE_FLAGS(ReduceOptions, ReduceOption) |
73 | #ifndef Q_QDOC |
74 | Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions) |
75 | #endif |
76 | // supports both ordered and out-of-order reduction |
77 | template <typename ReduceFunctor, typename ReduceResultType, typename T> |
78 | class ReduceKernel |
79 | { |
80 | typedef QMap<int, IntermediateResults<T> > ResultsMap; |
81 | |
82 | const ReduceOptions reduceOptions; |
83 | |
84 | QMutex mutex; |
85 | int progress, resultsMapSize; |
86 | const int threadCount; |
87 | ResultsMap resultsMap; |
88 | |
89 | bool canReduce(int begin) const |
90 | { |
91 | return (((reduceOptions & UnorderedReduce) |
92 | && progress == 0) |
93 | || ((reduceOptions & OrderedReduce) |
94 | && progress == begin)); |
95 | } |
96 | |
97 | void reduceResult(ReduceFunctor &reduce, |
98 | ReduceResultType &r, |
99 | const IntermediateResults<T> &result) |
100 | { |
101 | for (int i = 0; i < result.vector.size(); ++i) { |
102 | std::invoke(reduce, r, result.vector.at(i)); |
103 | } |
104 | } |
105 | |
106 | void reduceResults(ReduceFunctor &reduce, |
107 | ReduceResultType &r, |
108 | ResultsMap &map) |
109 | { |
110 | typename ResultsMap::iterator it = map.begin(); |
111 | while (it != map.end()) { |
112 | reduceResult(reduce, r, result: it.value()); |
113 | ++it; |
114 | } |
115 | } |
116 | |
117 | public: |
118 | ReduceKernel(QThreadPool *pool, ReduceOptions _reduceOptions) |
119 | : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0), |
120 | threadCount(pool->maxThreadCount()) |
121 | { } |
122 | |
123 | void runReduce(ReduceFunctor &reduce, |
124 | ReduceResultType &r, |
125 | const IntermediateResults<T> &result) |
126 | { |
127 | std::unique_lock<QMutex> locker(mutex); |
128 | if (!canReduce(begin: result.begin)) { |
129 | ++resultsMapSize; |
130 | resultsMap.insert(result.begin, result); |
131 | return; |
132 | } |
133 | |
134 | if (reduceOptions & UnorderedReduce) { |
135 | // UnorderedReduce |
136 | progress = -1; |
137 | |
138 | // reduce this result |
139 | locker.unlock(); |
140 | reduceResult(reduce, r, result); |
141 | locker.lock(); |
142 | |
143 | // reduce all stored results as well |
144 | while (!resultsMap.isEmpty()) { |
145 | ResultsMap resultsMapCopy = resultsMap; |
146 | resultsMap.clear(); |
147 | |
148 | locker.unlock(); |
149 | reduceResults(reduce, r, map&: resultsMapCopy); |
150 | locker.lock(); |
151 | |
152 | resultsMapSize -= resultsMapCopy.size(); |
153 | } |
154 | |
155 | progress = 0; |
156 | } else { |
157 | // reduce this result |
158 | locker.unlock(); |
159 | reduceResult(reduce, r, result); |
160 | locker.lock(); |
161 | |
162 | // OrderedReduce |
163 | progress += result.end - result.begin; |
164 | |
165 | // reduce as many other results as possible |
166 | typename ResultsMap::iterator it = resultsMap.begin(); |
167 | while (it != resultsMap.end()) { |
168 | if (it.value().begin != progress) |
169 | break; |
170 | |
171 | locker.unlock(); |
172 | reduceResult(reduce, r, result: it.value()); |
173 | locker.lock(); |
174 | |
175 | --resultsMapSize; |
176 | progress += it.value().end - it.value().begin; |
177 | it = resultsMap.erase(it); |
178 | } |
179 | } |
180 | } |
181 | |
182 | // final reduction |
183 | void finish(ReduceFunctor &reduce, ReduceResultType &r) |
184 | { |
185 | reduceResults(reduce, r, map&: resultsMap); |
186 | } |
187 | |
188 | inline bool shouldThrottle() |
189 | { |
190 | std::lock_guard<QMutex> locker(mutex); |
191 | return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount)); |
192 | } |
193 | |
194 | inline bool shouldStartThread() |
195 | { |
196 | std::lock_guard<QMutex> locker(mutex); |
197 | return (resultsMapSize <= (ReduceQueueStartLimit * threadCount)); |
198 | } |
199 | }; |
200 | |
201 | template <typename Sequence, typename Base, typename Functor1, typename Functor2> |
202 | struct SequenceHolder2 : private QtPrivate::SequenceHolder<Sequence>, public Base |
203 | { |
204 | template<typename S = Sequence, typename F1 = Functor1, typename F2 = Functor2> |
205 | SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2, |
206 | ReduceOptions reduceOptions) |
207 | : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)), |
208 | Base(pool, this->sequence.cbegin(), this->sequence.cend(), |
209 | std::forward<F1>(functor1), std::forward<F2>(functor2), reduceOptions) |
210 | { } |
211 | |
212 | template<typename InitialValueType, typename S = Sequence, |
213 | typename F1 = Functor1, typename F2 = Functor2> |
214 | SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2, |
215 | InitialValueType &&initialValue, ReduceOptions reduceOptions) |
216 | : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)), |
217 | Base(pool, this->sequence.cbegin(), this->sequence.cend(), |
218 | std::forward<F1>(functor1), std::forward<F2>(functor2), |
219 | std::forward<InitialValueType>(initialValue), reduceOptions) |
220 | { } |
221 | |
222 | void finish() override |
223 | { |
224 | Base::finish(); |
225 | // Clear the sequence to make sure all temporaries are destroyed |
226 | // before finished is signaled. |
227 | this->sequence = Sequence(); |
228 | } |
229 | }; |
230 | |
231 | } // namespace QtConcurrent |
232 | |
233 | QT_END_NAMESPACE |
234 | |
235 | #endif // QT_NO_CONCURRENT |
236 | |
237 | #endif |
238 | |