1/* -*- C++ -*-
2 This file is part of ThreadWeaver.
3
4 SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org>
5
6 SPDX-License-Identifier: LGPL-2.0-or-later
7*/
8
9#include "collection_p.h"
10#include "collection.h"
11#include "debuggingaids.h"
12#include "managedjobpointer.h"
13#include "queueapi.h"
14
15namespace ThreadWeaver
16{
17namespace Private
18{
19Collection_Private::Collection_Private()
20 : api(nullptr)
21 , jobCounter(0)
22 , selfIsExecuting(false)
23{
24}
25
26Collection_Private::~Collection_Private()
27{
28}
29
30void Collection_Private::finalCleanup(Collection *collection)
31{
32 Q_ASSERT(!self.isNull());
33 Q_ASSERT(!mutex.tryLock());
34 if (collection->shouldAbort()) {
35 collection->setStatus(Job::Status_Aborted);
36 } else if (collection->status() < Job::Status_Success) {
37 collection->setStatus(Job::Status_Success);
38 } else {
39 // At this point we either should have been running
40 // in which case above would mark Success
41 // or otherwise we already should be in Failed or Aborted state
42 Q_ASSERT(collection->status() == Job::Status_Failed || collection->status() == Job::Status_Aborted);
43 }
44 freeQueuePolicyResources(self);
45 api = nullptr;
46}
47
48void Collection_Private::enqueueElements()
49{
50 Q_ASSERT(!mutex.tryLock());
51 prepareToEnqueueElements();
52 jobCounter.fetchAndStoreOrdered(newValue: elements.count() + 1); // including self
53 api->enqueue(jobs: elements);
54}
55
56void Collection_Private::elementStarted(Collection *, JobPointer job, Thread *)
57{
58 QMutexLocker l(&mutex);
59 Q_UNUSED(l);
60 Q_UNUSED(job) // except in Q_ASSERT
61 Q_ASSERT(!self.isNull());
62 if (jobsStarted.fetchAndAddOrdered(valueToAdd: 1) == 0) {
63 // emit started() signal on beginning of first job execution
64 selfExecuteWrapper.callBegin();
65 }
66}
67
68namespace
69{
70struct MutexUnlocker {
71 QMutexLocker<QMutex> *locker;
72
73 MutexUnlocker(QMutexLocker<QMutex> *l)
74 : locker(l)
75 {
76 locker->unlock();
77 }
78 ~MutexUnlocker()
79 {
80 locker->relock();
81 }
82 MutexUnlocker(const MutexUnlocker &) = delete;
83 MutexUnlocker &operator=(const MutexUnlocker &) = delete;
84};
85}
86
87void Collection_Private::elementFinished(Collection *collection, JobPointer job, Thread *thread)
88{
89 JobPointer saveYourSelf = self;
90 Q_UNUSED(saveYourSelf);
91 QMutexLocker l(&mutex);
92 Q_UNUSED(l);
93 Q_ASSERT(!self.isNull());
94 Q_UNUSED(job) // except in Q_ASSERT
95 if (selfIsExecuting) {
96 // the element that is finished is the collection itself
97 // the collection is always executed first
98 // No need to queue elements if we were aborted
99 if (!collection->shouldAbort()) {
100 // queue the collection elements:
101 enqueueElements();
102 }
103 selfIsExecuting = false;
104 }
105 const int started = jobsStarted.loadAcquire();
106 Q_ASSERT(started >= 0);
107 Q_UNUSED(started);
108 processCompletedElement(collection, job, thread);
109 const int remainingJobs = jobCounter.fetchAndAddOrdered(valueToAdd: -1) - 1;
110 TWDEBUG(4, "Collection_Private::elementFinished: %i\n", remainingJobs);
111 if (remainingJobs <= -1) {
112 // its no use to count, the elements have been dequeued, now the threads call back that have been processing jobs in the meantime
113 } else {
114 Q_ASSERT(remainingJobs >= 0);
115 if (remainingJobs == 0) {
116 // all elements can only be done if self has been executed:
117 // there is a small chance that (this) has been dequeued in the
118 // meantime, in this case, there is nothing left to clean up
119 finalCleanup(collection);
120 {
121 MutexUnlocker u(&l);
122 Q_UNUSED(u);
123 selfExecuteWrapper.callEnd();
124 }
125 self.clear();
126 }
127 }
128}
129
130void Collection_Private::prepareToEnqueueElements()
131{
132 // empty in Collection
133}
134
135JobInterface::Status Collection_Private::updateStatus(Collection *collection, JobPointer job)
136{
137 // Keep our collection status in running until all jobs have finished
138 // but make failures sticky so on first failed job we keep it counting as failure
139 auto newStatus = Job::Status_Running;
140 const auto jobStatus = job->status();
141 if (jobStatus != JobInterface::Status_Success) {
142 newStatus = jobStatus;
143 }
144 collection->setStatus(newStatus);
145 return newStatus;
146}
147
148void Collection_Private::processCompletedElement(Collection *collection, JobPointer job, Thread *)
149{
150 updateStatus(collection, job);
151}
152
153void Collection_Private::stop(Collection *collection)
154{
155 QMutexLocker l(&mutex);
156 if (api != nullptr) {
157 // We can't dequeue ourselves while locked because we will
158 // get deadlock when our own aboutToBeDequeued will be invoked
159 // which will try to acquire this same lock
160 // and we need our own `api` because `finalCleanup` can be invoked in between
161 auto currentApi = api;
162 l.unlock();
163 TWDEBUG(4, "Collection::stop: dequeueing %p.\n", collection);
164 if (!currentApi->dequeue(job: ManagedJobPointer<Collection>(collection))) {
165 l.relock();
166 dequeueElements(collection, queueApiIsLocked: false);
167 }
168 }
169}
170
171void Collection_Private::dequeueElements(Collection *collection, bool queueApiIsLocked)
172{
173 // dequeue everything:
174 Q_ASSERT(!mutex.tryLock());
175 if (api == nullptr) {
176 return; // not queued
177 }
178
179 for (const auto &job : elements) {
180 bool result;
181 if (queueApiIsLocked) {
182 result = api->dequeue_p(job);
183 } else {
184 result = api->dequeue(job);
185 }
186 if (result) {
187 jobCounter.fetchAndAddOrdered(valueToAdd: -1);
188 }
189 TWDEBUG(3,
190 "Collection::Private::dequeueElements: dequeueing %p (%s, %i jobs left).\n",
191 (void *)job.data(),
192 result ? "found" : "not found",
193 jobCounter.loadAcquire());
194 elementDequeued(job);
195 }
196
197 if (jobCounter.loadAcquire() == 1) {
198 finalCleanup(collection);
199 }
200}
201
202void Collection_Private::requestAbort(Collection *collection)
203{
204 stop(collection);
205 QMutexLocker l(&mutex);
206 for (auto job = elements.begin(); job != elements.end(); ++job) {
207 if ((*job)->status() <= JobInterface::Status_Running) {
208 (*job)->requestAbort();
209 }
210 }
211}
212
213void CollectionSelfExecuteWrapper::begin(const JobPointer &job, Thread *thread)
214{
215 job_ = job;
216 thread_ = thread;
217}
218
219void CollectionSelfExecuteWrapper::end(const JobPointer &job, Thread *thread)
220{
221 Q_ASSERT(job_ == job && thread_ == thread);
222 Q_UNUSED(job);
223 Q_UNUSED(thread); // except in assert
224}
225
226void CollectionSelfExecuteWrapper::callBegin()
227{
228 ExecuteWrapper::begin(job: job_, thread_);
229}
230
231void CollectionSelfExecuteWrapper::callEnd()
232{
233 ExecuteWrapper::end(job: job_, thread_);
234 job_.clear();
235}
236
237}
238
239}
240

// Source code of threadweaver/src/collection_p.cpp