1 | /* -*- C++ -*- |
2 | This file is part of ThreadWeaver. |
3 | |
4 | SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org> |
5 | |
6 | SPDX-License-Identifier: LGPL-2.0-or-later |
7 | */ |
8 | |
9 | #include "collection_p.h" |
10 | #include "collection.h" |
11 | #include "debuggingaids.h" |
12 | #include "managedjobpointer.h" |
13 | #include "queueapi.h" |
14 | |
15 | namespace ThreadWeaver |
16 | { |
17 | namespace Private |
18 | { |
// Initializes an unqueued collection: no queue API attached yet, no
// elements counted, and the collection job itself is not executing.
Collection_Private::Collection_Private()
    : api(nullptr) // set when the collection is enqueued, reset in finalCleanup()
    , jobCounter(0)
    , selfIsExecuting(false)
{
}
25 | |
26 | Collection_Private::~Collection_Private() |
27 | { |
28 | } |
29 | |
// Performs the final status transition once all work for the collection is
// accounted for, releases queue policy resources, and detaches from the
// queue API. Caller must hold `mutex`, and `self` must still be set.
void Collection_Private::finalCleanup(Collection *collection)
{
    Q_ASSERT(!self.isNull());
    Q_ASSERT(!mutex.tryLock()); // verifies the caller holds the lock (debug builds only)
    if (collection->shouldAbort()) {
        collection->setStatus(Job::Status_Aborted);
    } else if (collection->status() < Job::Status_Success) {
        collection->setStatus(Job::Status_Success);
    } else {
        // At this point we either should have been running
        // in which case above would mark Success
        // or otherwise we already should be in Failed or Aborted state
        Q_ASSERT(collection->status() == Job::Status_Failed || collection->status() == Job::Status_Aborted);
    }
    freeQueuePolicyResources(self);
    api = nullptr; // marks the collection as no longer queued (see dequeueElements/stop)
}
47 | |
48 | void Collection_Private::enqueueElements() |
49 | { |
50 | Q_ASSERT(!mutex.tryLock()); |
51 | prepareToEnqueueElements(); |
52 | jobCounter.fetchAndStoreOrdered(newValue: elements.count() + 1); // including self |
53 | api->enqueue(jobs: elements); |
54 | } |
55 | |
56 | void Collection_Private::elementStarted(Collection *, JobPointer job, Thread *) |
57 | { |
58 | QMutexLocker l(&mutex); |
59 | Q_UNUSED(l); |
60 | Q_UNUSED(job) // except in Q_ASSERT |
61 | Q_ASSERT(!self.isNull()); |
62 | if (jobsStarted.fetchAndAddOrdered(valueToAdd: 1) == 0) { |
63 | // emit started() signal on beginning of first job execution |
64 | selfExecuteWrapper.callBegin(); |
65 | } |
66 | } |
67 | |
68 | namespace |
69 | { |
70 | struct MutexUnlocker { |
71 | QMutexLocker<QMutex> *locker; |
72 | |
73 | MutexUnlocker(QMutexLocker<QMutex> *l) |
74 | : locker(l) |
75 | { |
76 | locker->unlock(); |
77 | } |
78 | ~MutexUnlocker() |
79 | { |
80 | locker->relock(); |
81 | } |
82 | MutexUnlocker(const MutexUnlocker &) = delete; |
83 | MutexUnlocker &operator=(const MutexUnlocker &) = delete; |
84 | }; |
85 | } |
86 | |
87 | void Collection_Private::elementFinished(Collection *collection, JobPointer job, Thread *thread) |
88 | { |
89 | JobPointer saveYourSelf = self; |
90 | Q_UNUSED(saveYourSelf); |
91 | QMutexLocker l(&mutex); |
92 | Q_UNUSED(l); |
93 | Q_ASSERT(!self.isNull()); |
94 | Q_UNUSED(job) // except in Q_ASSERT |
95 | if (selfIsExecuting) { |
96 | // the element that is finished is the collection itself |
97 | // the collection is always executed first |
98 | // No need to queue elements if we were aborted |
99 | if (!collection->shouldAbort()) { |
100 | // queue the collection elements: |
101 | enqueueElements(); |
102 | } |
103 | selfIsExecuting = false; |
104 | } |
105 | const int started = jobsStarted.loadAcquire(); |
106 | Q_ASSERT(started >= 0); |
107 | Q_UNUSED(started); |
108 | processCompletedElement(collection, job, thread); |
109 | const int remainingJobs = jobCounter.fetchAndAddOrdered(valueToAdd: -1) - 1; |
110 | TWDEBUG(4, "Collection_Private::elementFinished: %i\n" , remainingJobs); |
111 | if (remainingJobs <= -1) { |
112 | // its no use to count, the elements have been dequeued, now the threads call back that have been processing jobs in the meantime |
113 | } else { |
114 | Q_ASSERT(remainingJobs >= 0); |
115 | if (remainingJobs == 0) { |
116 | // all elements can only be done if self has been executed: |
117 | // there is a small chance that (this) has been dequeued in the |
118 | // meantime, in this case, there is nothing left to clean up |
119 | finalCleanup(collection); |
120 | { |
121 | MutexUnlocker u(&l); |
122 | Q_UNUSED(u); |
123 | selfExecuteWrapper.callEnd(); |
124 | } |
125 | self.clear(); |
126 | } |
127 | } |
128 | } |
129 | |
// Hook invoked from enqueueElements() just before the elements are queued;
// the base implementation does nothing. Subclasses may override to adjust
// state ahead of enqueueing.
void Collection_Private::prepareToEnqueueElements()
{
    // empty in Collection
}
134 | |
135 | JobInterface::Status Collection_Private::updateStatus(Collection *collection, JobPointer job) |
136 | { |
137 | // Keep our collection status in running until all jobs have finished |
138 | // but make failures sticky so on first failed job we keep it counting as failure |
139 | auto newStatus = Job::Status_Running; |
140 | const auto jobStatus = job->status(); |
141 | if (jobStatus != JobInterface::Status_Success) { |
142 | newStatus = jobStatus; |
143 | } |
144 | collection->setStatus(newStatus); |
145 | return newStatus; |
146 | } |
147 | |
// Per-element completion hook: folds the finished element's status into the
// collection via updateStatus(). The thread argument is unused here.
void Collection_Private::processCompletedElement(Collection *collection, JobPointer job, Thread *)
{
    updateStatus(collection, job);
}
152 | |
153 | void Collection_Private::stop(Collection *collection) |
154 | { |
155 | QMutexLocker l(&mutex); |
156 | if (api != nullptr) { |
157 | // We can't dequeue ourselves while locked because we will |
158 | // get deadlock when our own aboutToBeDequeued will be invoked |
159 | // which will try to acquire this same lock |
160 | // and we need our own `api` because `finalCleanup` can be invoked in between |
161 | auto currentApi = api; |
162 | l.unlock(); |
163 | TWDEBUG(4, "Collection::stop: dequeueing %p.\n" , collection); |
164 | if (!currentApi->dequeue(job: ManagedJobPointer<Collection>(collection))) { |
165 | l.relock(); |
166 | dequeueElements(collection, queueApiIsLocked: false); |
167 | } |
168 | } |
169 | } |
170 | |
171 | void Collection_Private::dequeueElements(Collection *collection, bool queueApiIsLocked) |
172 | { |
173 | // dequeue everything: |
174 | Q_ASSERT(!mutex.tryLock()); |
175 | if (api == nullptr) { |
176 | return; // not queued |
177 | } |
178 | |
179 | for (const auto &job : elements) { |
180 | bool result; |
181 | if (queueApiIsLocked) { |
182 | result = api->dequeue_p(job); |
183 | } else { |
184 | result = api->dequeue(job); |
185 | } |
186 | if (result) { |
187 | jobCounter.fetchAndAddOrdered(valueToAdd: -1); |
188 | } |
189 | TWDEBUG(3, |
190 | "Collection::Private::dequeueElements: dequeueing %p (%s, %i jobs left).\n" , |
191 | (void *)job.data(), |
192 | result ? "found" : "not found" , |
193 | jobCounter.loadAcquire()); |
194 | elementDequeued(job); |
195 | } |
196 | |
197 | if (jobCounter.loadAcquire() == 1) { |
198 | finalCleanup(collection); |
199 | } |
200 | } |
201 | |
202 | void Collection_Private::requestAbort(Collection *collection) |
203 | { |
204 | stop(collection); |
205 | QMutexLocker l(&mutex); |
206 | for (auto job = elements.begin(); job != elements.end(); ++job) { |
207 | if ((*job)->status() <= JobInterface::Status_Running) { |
208 | (*job)->requestAbort(); |
209 | } |
210 | } |
211 | } |
212 | |
// Captures the job/thread pair instead of forwarding immediately; the
// actual begin notification is deferred until callBegin() is invoked
// (when the first element starts, see elementStarted()).
void CollectionSelfExecuteWrapper::begin(const JobPointer &job, Thread *thread)
{
    job_ = job;
    thread_ = thread;
}
218 | |
// Intentionally does not forward the end notification: it is deferred
// until callEnd() (invoked when the last element finishes). Only verifies
// that the pair matches what begin() captured.
void CollectionSelfExecuteWrapper::end(const JobPointer &job, Thread *thread)
{
    Q_ASSERT(job_ == job && thread_ == thread);
    Q_UNUSED(job);
    Q_UNUSED(thread); // except in assert
}
225 | |
226 | void CollectionSelfExecuteWrapper::callBegin() |
227 | { |
228 | ExecuteWrapper::begin(job: job_, thread_); |
229 | } |
230 | |
231 | void CollectionSelfExecuteWrapper::callEnd() |
232 | { |
233 | ExecuteWrapper::end(job: job_, thread_); |
234 | job_.clear(); |
235 | } |
236 | |
237 | } |
238 | |
239 | } |
240 | |