1/* -*- C++ -*-
2 This file implements the Job class.
3
4 SPDX-FileCopyrightText: 2004-2013 Mirko Boehm <mirko@kde.org>
5
6 SPDX-License-Identifier: LGPL-2.0-or-later
7
8 $Id: Job.cpp 20 2005-08-08 21:02:51Z mirko $
9*/
10
11#include "job.h"
12#include "job_p.h"
13
14#include "debuggingaids.h"
15#include "thread.h"
16#include <QAtomicInt>
17#include <QAtomicPointer>
18#include <QList>
19#include <QMutex>
20
21#include "dependencypolicy.h"
22#include "exception.h"
23#include "executewrapper_p.h"
24#include "executor_p.h"
25#include "managedjobpointer.h"
26#include "queuepolicy.h"
27
28namespace ThreadWeaver
29{
30Job::Job()
31 : d_(new Private::Job_Private())
32{
33#if !defined(NDEBUG)
34 d()->debugExecuteWrapper.wrap(previous: setExecutor(&(d()->debugExecuteWrapper)));
35#endif
36 d()->status.storeRelease(newValue: Status_New);
37}
38
39Job::Job(Private::Job_Private *d__)
40 : d_(d__)
41{
42#if !defined(NDEBUG)
43 d()->debugExecuteWrapper.wrap(previous: setExecutor(&(d()->debugExecuteWrapper)));
44#endif
45 d()->status.storeRelease(newValue: Status_New);
46}
47
48Job::~Job()
49{
50 for (int index = 0; index < d()->queuePolicies.size(); ++index) {
51 d()->queuePolicies.at(i: index)->destructed(job: this);
52 }
53 delete d_;
54}
55
56void Job::execute(const JobPointer &self, Thread *th)
57{
58 Executor *executor = d()->executor.loadAcquire();
59 Q_ASSERT(executor); // may never be unset!
60 Q_ASSERT(self);
61 executor->begin(self, th);
62 self->setStatus(Status_Running);
63 try {
64 executor->execute(self, th);
65 if (self->status() == Status_Running) {
66 self->setStatus(Status_Success);
67 }
68 } catch (JobAborted &) {
69 self->setStatus(Status_Aborted);
70 } catch (JobFailed &) {
71 self->setStatus(Status_Failed);
72 } catch (AbortThread &) {
73 throw;
74 } catch (...) {
75 TWDEBUG(0, "Uncaught exception in Job %p, aborting.", self.data());
76 throw;
77 }
78 Q_ASSERT(self->status() > Status_Running);
79 executor->end(self, th);
80}
81
82void Job::blockingExecute()
83{
84 execute(self: ManagedJobPointer<Job>(this), th: nullptr);
85}
86
87Executor *Job::setExecutor(Executor *executor)
88{
89 return d()->executor.fetchAndStoreOrdered(newValue: executor == nullptr ? &Private::defaultExecutor : executor);
90}
91
92Executor *Job::executor() const
93{
94 return d()->executor.loadAcquire();
95}
96
97int Job::priority() const
98{
99 return 0;
100}
101
102void Job::setStatus(JobInterface::Status status)
103{
104 d()->status.storeRelease(newValue: status);
105}
106
107JobInterface::Status Job::status() const
108{
109 // since status is set only through setStatus, this should be safe:
110 return static_cast<Status>(d()->status.loadAcquire());
111}
112
113bool Job::success() const
114{
115 return d()->status.loadAcquire() == Status_Success;
116}
117
118void Job::requestAbort()
119{
120 d()->shouldAbort = true;
121}
122
123void Job::defaultBegin(const JobPointer &, Thread *)
124{
125}
126
127void Job::defaultEnd(const JobPointer &job, Thread *)
128{
129 d()->handleFinish(job);
130 d()->freeQueuePolicyResources(job);
131}
132
133void Job::aboutToBeQueued(QueueAPI *api)
134{
135 QMutexLocker l(mutex());
136 Q_UNUSED(l);
137 aboutToBeQueued_locked(api);
138}
139
140void Job::aboutToBeQueued_locked(QueueAPI *)
141{
142}
143
144void Job::aboutToBeDequeued(QueueAPI *api)
145{
146 QMutexLocker l(mutex());
147 Q_UNUSED(l);
148 aboutToBeDequeued_locked(api);
149}
150
151void Job::aboutToBeDequeued_locked(QueueAPI *)
152{
153}
154
155void Job::assignQueuePolicy(QueuePolicy *policy)
156{
157 Q_ASSERT(!mutex()->tryLock());
158 if (!d()->queuePolicies.contains(t: policy)) {
159 d()->queuePolicies.append(t: policy);
160 }
161}
162
163void Job::removeQueuePolicy(QueuePolicy *policy)
164{
165 Q_ASSERT(!mutex()->tryLock());
166 int index = d()->queuePolicies.indexOf(t: policy);
167 if (index != -1) {
168 d()->queuePolicies.removeAt(i: index);
169 }
170}
171
172QList<QueuePolicy *> Job::queuePolicies() const
173{
174 Q_ASSERT(!mutex()->tryLock());
175 return d()->queuePolicies;
176}
177
178Private::Job_Private *Job::d()
179{
180 return d_;
181}
182
183const Private::Job_Private *Job::d() const
184{
185 return d_;
186}
187
188bool Job::isFinished() const
189{
190 const Status s = status();
191 return s == Status_Success || s == Status_Failed || s == Status_Aborted;
192}
193
194QMutex *Job::mutex() const
195{
196 return &(d()->mutex);
197}
198
199bool Job::shouldAbort() const
200{
201 return d()->shouldAbort;
202}
203
204void Job::onFinish(const std::function<void(const JobInterface &job)> &lambda)
205{
206 QMutexLocker l(mutex());
207 d()->finishHandlers << lambda;
208}
209
210}
211
212#include "managedjobpointer.h"
213

source code of threadweaver/src/job.cpp