| 1 | /* -*- C++ -*- |
| 2 | This file is part of ThreadWeaver. |
| 3 | |
| 4 | SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org> |
| 5 | |
| 6 | SPDX-License-Identifier: LGPL-2.0-or-later |
| 7 | */ |
| 8 | |
| 9 | #include "collection_p.h" |
| 10 | #include "collection.h" |
| 11 | #include "debuggingaids.h" |
| 12 | #include "managedjobpointer.h" |
| 13 | #include "queueapi.h" |
| 14 | |
| 15 | namespace ThreadWeaver |
| 16 | { |
| 17 | namespace Private |
| 18 | { |
| 19 | Collection_Private::Collection_Private() |
| 20 | : api(nullptr) |
| 21 | , jobCounter(0) |
| 22 | , selfIsExecuting(false) |
| 23 | { |
| 24 | } |
| 25 | |
| 26 | Collection_Private::~Collection_Private() |
| 27 | { |
| 28 | } |
| 29 | |
| 30 | void Collection_Private::finalCleanup(Collection *collection) |
| 31 | { |
| 32 | Q_ASSERT(!self.isNull()); |
| 33 | Q_ASSERT(!mutex.tryLock()); |
| 34 | if (collection->shouldAbort()) { |
| 35 | collection->setStatus(Job::Status_Aborted); |
| 36 | } else if (collection->status() < Job::Status_Success) { |
| 37 | collection->setStatus(Job::Status_Success); |
| 38 | } else { |
| 39 | // At this point we either should have been running |
| 40 | // in which case above would mark Success |
| 41 | // or otherwise we already should be in Failed or Aborted state |
| 42 | Q_ASSERT(collection->status() == Job::Status_Failed || collection->status() == Job::Status_Aborted); |
| 43 | } |
| 44 | freeQueuePolicyResources(self); |
| 45 | api = nullptr; |
| 46 | } |
| 47 | |
| 48 | void Collection_Private::enqueueElements() |
| 49 | { |
| 50 | Q_ASSERT(!mutex.tryLock()); |
| 51 | prepareToEnqueueElements(); |
| 52 | jobCounter.fetchAndStoreOrdered(newValue: elements.count() + 1); // including self |
| 53 | api->enqueue(jobs: elements); |
| 54 | } |
| 55 | |
| 56 | void Collection_Private::elementStarted(Collection *, JobPointer job, Thread *) |
| 57 | { |
| 58 | QMutexLocker l(&mutex); |
| 59 | Q_UNUSED(l); |
| 60 | Q_UNUSED(job) // except in Q_ASSERT |
| 61 | Q_ASSERT(!self.isNull()); |
| 62 | if (jobsStarted.fetchAndAddOrdered(valueToAdd: 1) == 0) { |
| 63 | // emit started() signal on beginning of first job execution |
| 64 | selfExecuteWrapper.callBegin(); |
| 65 | } |
| 66 | } |
| 67 | |
| 68 | namespace |
| 69 | { |
| 70 | struct MutexUnlocker { |
| 71 | QMutexLocker<QMutex> *locker; |
| 72 | |
| 73 | MutexUnlocker(QMutexLocker<QMutex> *l) |
| 74 | : locker(l) |
| 75 | { |
| 76 | locker->unlock(); |
| 77 | } |
| 78 | ~MutexUnlocker() |
| 79 | { |
| 80 | locker->relock(); |
| 81 | } |
| 82 | MutexUnlocker(const MutexUnlocker &) = delete; |
| 83 | MutexUnlocker &operator=(const MutexUnlocker &) = delete; |
| 84 | }; |
| 85 | } |
| 86 | |
| 87 | void Collection_Private::elementFinished(Collection *collection, JobPointer job, Thread *thread) |
| 88 | { |
| 89 | JobPointer saveYourSelf = self; |
| 90 | Q_UNUSED(saveYourSelf); |
| 91 | QMutexLocker l(&mutex); |
| 92 | Q_UNUSED(l); |
| 93 | Q_ASSERT(!self.isNull()); |
| 94 | Q_UNUSED(job) // except in Q_ASSERT |
| 95 | if (selfIsExecuting) { |
| 96 | // the element that is finished is the collection itself |
| 97 | // the collection is always executed first |
| 98 | // No need to queue elements if we were aborted |
| 99 | if (!collection->shouldAbort()) { |
| 100 | // queue the collection elements: |
| 101 | enqueueElements(); |
| 102 | } |
| 103 | selfIsExecuting = false; |
| 104 | } |
| 105 | const int started = jobsStarted.loadAcquire(); |
| 106 | Q_ASSERT(started >= 0); |
| 107 | Q_UNUSED(started); |
| 108 | processCompletedElement(collection, job, thread); |
| 109 | const int remainingJobs = jobCounter.fetchAndAddOrdered(valueToAdd: -1) - 1; |
| 110 | TWDEBUG(4, "Collection_Private::elementFinished: %i\n" , remainingJobs); |
| 111 | if (remainingJobs <= -1) { |
| 112 | // its no use to count, the elements have been dequeued, now the threads call back that have been processing jobs in the meantime |
| 113 | } else { |
| 114 | Q_ASSERT(remainingJobs >= 0); |
| 115 | if (remainingJobs == 0) { |
| 116 | // all elements can only be done if self has been executed: |
| 117 | // there is a small chance that (this) has been dequeued in the |
| 118 | // meantime, in this case, there is nothing left to clean up |
| 119 | finalCleanup(collection); |
| 120 | { |
| 121 | MutexUnlocker u(&l); |
| 122 | Q_UNUSED(u); |
| 123 | selfExecuteWrapper.callEnd(); |
| 124 | } |
| 125 | self.clear(); |
| 126 | } |
| 127 | } |
| 128 | } |
| 129 | |
| 130 | void Collection_Private::prepareToEnqueueElements() |
| 131 | { |
| 132 | // empty in Collection |
| 133 | } |
| 134 | |
| 135 | JobInterface::Status Collection_Private::updateStatus(Collection *collection, JobPointer job) |
| 136 | { |
| 137 | // Keep our collection status in running until all jobs have finished |
| 138 | // but make failures sticky so on first failed job we keep it counting as failure |
| 139 | auto newStatus = Job::Status_Running; |
| 140 | const auto jobStatus = job->status(); |
| 141 | if (jobStatus != JobInterface::Status_Success) { |
| 142 | newStatus = jobStatus; |
| 143 | } |
| 144 | collection->setStatus(newStatus); |
| 145 | return newStatus; |
| 146 | } |
| 147 | |
| 148 | void Collection_Private::processCompletedElement(Collection *collection, JobPointer job, Thread *) |
| 149 | { |
| 150 | updateStatus(collection, job); |
| 151 | } |
| 152 | |
| 153 | void Collection_Private::stop(Collection *collection) |
| 154 | { |
| 155 | QMutexLocker l(&mutex); |
| 156 | if (api != nullptr) { |
| 157 | // We can't dequeue ourselves while locked because we will |
| 158 | // get deadlock when our own aboutToBeDequeued will be invoked |
| 159 | // which will try to acquire this same lock |
| 160 | // and we need our own `api` because `finalCleanup` can be invoked in between |
| 161 | auto currentApi = api; |
| 162 | l.unlock(); |
| 163 | TWDEBUG(4, "Collection::stop: dequeueing %p.\n" , collection); |
| 164 | if (!currentApi->dequeue(job: ManagedJobPointer<Collection>(collection))) { |
| 165 | l.relock(); |
| 166 | dequeueElements(collection, queueApiIsLocked: false); |
| 167 | } |
| 168 | } |
| 169 | } |
| 170 | |
| 171 | void Collection_Private::dequeueElements(Collection *collection, bool queueApiIsLocked) |
| 172 | { |
| 173 | // dequeue everything: |
| 174 | Q_ASSERT(!mutex.tryLock()); |
| 175 | if (api == nullptr) { |
| 176 | return; // not queued |
| 177 | } |
| 178 | |
| 179 | for (const auto &job : elements) { |
| 180 | bool result; |
| 181 | if (queueApiIsLocked) { |
| 182 | result = api->dequeue_p(job); |
| 183 | } else { |
| 184 | result = api->dequeue(job); |
| 185 | } |
| 186 | if (result) { |
| 187 | jobCounter.fetchAndAddOrdered(valueToAdd: -1); |
| 188 | } |
| 189 | TWDEBUG(3, |
| 190 | "Collection::Private::dequeueElements: dequeueing %p (%s, %i jobs left).\n" , |
| 191 | (void *)job.data(), |
| 192 | result ? "found" : "not found" , |
| 193 | jobCounter.loadAcquire()); |
| 194 | elementDequeued(job); |
| 195 | } |
| 196 | |
| 197 | if (jobCounter.loadAcquire() == 1) { |
| 198 | finalCleanup(collection); |
| 199 | } |
| 200 | } |
| 201 | |
| 202 | void Collection_Private::requestAbort(Collection *collection) |
| 203 | { |
| 204 | stop(collection); |
| 205 | QMutexLocker l(&mutex); |
| 206 | for (auto job = elements.begin(); job != elements.end(); ++job) { |
| 207 | if ((*job)->status() <= JobInterface::Status_Running) { |
| 208 | (*job)->requestAbort(); |
| 209 | } |
| 210 | } |
| 211 | } |
| 212 | |
| 213 | void CollectionSelfExecuteWrapper::begin(const JobPointer &job, Thread *thread) |
| 214 | { |
| 215 | job_ = job; |
| 216 | thread_ = thread; |
| 217 | } |
| 218 | |
| 219 | void CollectionSelfExecuteWrapper::end(const JobPointer &job, Thread *thread) |
| 220 | { |
| 221 | Q_ASSERT(job_ == job && thread_ == thread); |
| 222 | Q_UNUSED(job); |
| 223 | Q_UNUSED(thread); // except in assert |
| 224 | } |
| 225 | |
| 226 | void CollectionSelfExecuteWrapper::callBegin() |
| 227 | { |
| 228 | ExecuteWrapper::begin(job: job_, thread_); |
| 229 | } |
| 230 | |
| 231 | void CollectionSelfExecuteWrapper::callEnd() |
| 232 | { |
| 233 | ExecuteWrapper::end(job: job_, thread_); |
| 234 | job_.clear(); |
| 235 | } |
| 236 | |
| 237 | } |
| 238 | |
| 239 | } |
| 240 | |