1 | /* |
2 | This file is part of the KDE libraries |
3 | SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org> |
4 | SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org> |
5 | SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com> |
6 | |
7 | SPDX-License-Identifier: LGPL-2.0-only |
8 | */ |
9 | |
10 | #include "scheduler.h" |
11 | #include "scheduler_p.h" |
12 | |
13 | #include "connection_p.h" |
14 | #include "job_p.h" |
15 | #include "kprotocolmanager_p.h" |
16 | #include "sessiondata_p.h" |
17 | #include "worker_p.h" |
18 | #include "workerconfig.h" |
19 | |
20 | #include <kprotocolinfo.h> |
21 | #include <kprotocolmanager.h> |
22 | |
23 | #ifndef KIO_ANDROID_STUB |
24 | #include <QDBusConnection> |
25 | #include <QDBusMessage> |
26 | #endif |
27 | #include <QHash> |
28 | #include <QThread> |
29 | #include <QThreadStorage> |
30 | |
31 | // Workers may be idle for a certain time (3 minutes) before they are killed. |
32 | static const int s_idleWorkerLifetime = 3 * 60; |
33 | |
34 | using namespace KIO; |
35 | |
36 | static inline Worker *jobSWorker(SimpleJob *job) |
37 | { |
38 | return SimpleJobPrivate::get(job)->m_worker; |
39 | } |
40 | |
41 | static inline int jobCommand(SimpleJob *job) |
42 | { |
43 | return SimpleJobPrivate::get(job)->m_command; |
44 | } |
45 | |
46 | static inline void startJob(SimpleJob *job, Worker *worker) |
47 | { |
48 | SimpleJobPrivate::get(job)->start(worker); |
49 | } |
50 | |
51 | class KIO::SchedulerPrivate |
52 | { |
53 | public: |
54 | SchedulerPrivate() |
55 | : q(new Scheduler()) |
56 | { |
57 | } |
58 | |
59 | ~SchedulerPrivate() |
60 | { |
61 | removeWorkerOnHold(); |
62 | delete q; |
63 | q = nullptr; |
64 | qDeleteAll(c: m_protocols); // ~ProtoQueue will kill and delete all workers |
65 | } |
66 | |
67 | SchedulerPrivate(const SchedulerPrivate &) = delete; |
68 | SchedulerPrivate &operator=(const SchedulerPrivate &) = delete; |
69 | |
70 | Scheduler *q; |
71 | |
72 | Worker *m_workerOnHold = nullptr; |
73 | QUrl m_urlOnHold; |
74 | bool m_ignoreConfigReparse = false; |
75 | |
76 | SessionData sessionData; |
77 | |
78 | void doJob(SimpleJob *job); |
79 | void setJobPriority(SimpleJob *job, int priority); |
80 | void cancelJob(SimpleJob *job); |
81 | void jobFinished(KIO::SimpleJob *job, KIO::Worker *worker); |
82 | void putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url); |
83 | void removeWorkerOnHold(); |
84 | Worker *heldWorkerForJob(KIO::SimpleJob *job); |
85 | bool isWorkerOnHoldFor(const QUrl &url); |
86 | void updateInternalMetaData(SimpleJob *job); |
87 | |
88 | MetaData metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url); |
89 | void setupWorker(KIO::Worker *worker, |
90 | const QUrl &url, |
91 | const QString &protocol, |
92 | const QStringList &proxyList, |
93 | bool newWorker, |
94 | const KIO::MetaData *config = nullptr); |
95 | |
96 | void slotWorkerDied(KIO::Worker *worker); |
97 | |
98 | #ifndef KIO_ANDROID_STUB |
99 | void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &); |
100 | #endif |
101 | |
102 | ProtoQueue *protoQ(const QString &protocol, const QString &host); |
103 | |
104 | private: |
105 | QHash<QString, ProtoQueue *> m_protocols; |
106 | }; |
107 | |
108 | static QThreadStorage<SchedulerPrivate *> s_storage; |
109 | static SchedulerPrivate *schedulerPrivate() |
110 | { |
111 | if (!s_storage.hasLocalData()) { |
112 | s_storage.setLocalData(new SchedulerPrivate); |
113 | } |
114 | return s_storage.localData(); |
115 | } |
116 | |
117 | Scheduler *Scheduler::self() |
118 | { |
119 | return schedulerPrivate()->q; |
120 | } |
121 | |
122 | SchedulerPrivate *Scheduler::d_func() |
123 | { |
124 | return schedulerPrivate(); |
125 | } |
126 | |
127 | // static |
128 | Scheduler *scheduler() |
129 | { |
130 | return schedulerPrivate()->q; |
131 | } |
132 | |
133 | //////////////////////////// |
134 | |
135 | int SerialPicker::changedPrioritySerial(int oldSerial, int newPriority) const |
136 | { |
137 | Q_ASSERT(newPriority >= -10 && newPriority <= 10); |
138 | newPriority = qBound(min: -10, val: newPriority, max: 10); |
139 | int unbiasedSerial = oldSerial % m_jobsPerPriority; |
140 | return unbiasedSerial + newPriority * m_jobsPerPriority; |
141 | } |
142 | |
143 | WorkerManager::WorkerManager() |
144 | { |
145 | m_grimTimer.setSingleShot(true); |
146 | connect(sender: &m_grimTimer, signal: &QTimer::timeout, context: this, slot: &WorkerManager::grimReaper); |
147 | } |
148 | |
149 | WorkerManager::~WorkerManager() |
150 | { |
151 | grimReaper(); |
152 | } |
153 | |
154 | void WorkerManager::returnWorker(Worker *worker) |
155 | { |
156 | Q_ASSERT(worker); |
157 | worker->setIdle(); |
158 | m_idleWorkers.insert(key: worker->host(), value: worker); |
159 | scheduleGrimReaper(); |
160 | } |
161 | |
162 | Worker *WorkerManager::takeWorkerForJob(SimpleJob *job) |
163 | { |
164 | Worker *worker = schedulerPrivate()->heldWorkerForJob(job); |
165 | if (worker) { |
166 | return worker; |
167 | } |
168 | |
169 | QUrl url = SimpleJobPrivate::get(job)->m_url; |
170 | // TODO take port, username and password into account |
171 | QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.find(key: url.host()); |
172 | if (it == m_idleWorkers.end()) { |
173 | it = m_idleWorkers.begin(); |
174 | } |
175 | if (it == m_idleWorkers.end()) { |
176 | return nullptr; |
177 | } |
178 | worker = it.value(); |
179 | m_idleWorkers.erase(it); |
180 | return worker; |
181 | } |
182 | |
183 | bool WorkerManager::removeWorker(Worker *worker) |
184 | { |
185 | // ### performance not so great |
186 | QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin(); |
187 | for (; it != m_idleWorkers.end(); ++it) { |
188 | if (it.value() == worker) { |
189 | m_idleWorkers.erase(it); |
190 | return true; |
191 | } |
192 | } |
193 | return false; |
194 | } |
195 | |
196 | void WorkerManager::clear() |
197 | { |
198 | m_idleWorkers.clear(); |
199 | } |
200 | |
201 | QList<Worker *> WorkerManager::allWorkers() const |
202 | { |
203 | return m_idleWorkers.values(); |
204 | } |
205 | |
206 | void WorkerManager::scheduleGrimReaper() |
207 | { |
208 | if (!m_grimTimer.isActive()) { |
209 | m_grimTimer.start(msec: (s_idleWorkerLifetime / 2) * 1000); |
210 | } |
211 | } |
212 | |
213 | // private slot |
214 | void WorkerManager::grimReaper() |
215 | { |
216 | QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin(); |
217 | while (it != m_idleWorkers.end()) { |
218 | Worker *worker = it.value(); |
219 | if (worker->idleTime() >= s_idleWorkerLifetime) { |
220 | it = m_idleWorkers.erase(it); |
221 | if (worker->job()) { |
222 | // qDebug() << "Idle worker" << worker << "still has job" << worker->job(); |
223 | } |
224 | // avoid invoking slotWorkerDied() because its cleanup services are not needed |
225 | worker->kill(); |
226 | } else { |
227 | ++it; |
228 | } |
229 | } |
230 | if (!m_idleWorkers.isEmpty()) { |
231 | scheduleGrimReaper(); |
232 | } |
233 | } |
234 | |
235 | int HostQueue::lowestSerial() const |
236 | { |
237 | QMap<int, SimpleJob *>::ConstIterator first = m_queuedJobs.constBegin(); |
238 | if (first != m_queuedJobs.constEnd()) { |
239 | return first.key(); |
240 | } |
241 | return SerialPicker::maxSerial; |
242 | } |
243 | |
244 | void HostQueue::queueJob(SimpleJob *job) |
245 | { |
246 | const int serial = SimpleJobPrivate::get(job)->m_schedSerial; |
247 | Q_ASSERT(serial != 0); |
248 | Q_ASSERT(!m_queuedJobs.contains(serial)); |
249 | Q_ASSERT(!m_runningJobs.contains(job)); |
250 | m_queuedJobs.insert(key: serial, value: job); |
251 | } |
252 | |
253 | SimpleJob *HostQueue::takeFirstInQueue() |
254 | { |
255 | Q_ASSERT(!m_queuedJobs.isEmpty()); |
256 | QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin(); |
257 | SimpleJob *job = first.value(); |
258 | m_queuedJobs.erase(it: first); |
259 | m_runningJobs.insert(value: job); |
260 | return job; |
261 | } |
262 | |
263 | bool HostQueue::removeJob(SimpleJob *job) |
264 | { |
265 | const int serial = SimpleJobPrivate::get(job)->m_schedSerial; |
266 | if (m_runningJobs.remove(value: job)) { |
267 | Q_ASSERT(!m_queuedJobs.contains(serial)); |
268 | return true; |
269 | } |
270 | if (m_queuedJobs.remove(key: serial)) { |
271 | return true; |
272 | } |
273 | return false; |
274 | } |
275 | |
276 | QList<Worker *> HostQueue::allWorkers() const |
277 | { |
278 | QList<Worker *> ret; |
279 | ret.reserve(asize: m_runningJobs.size()); |
280 | for (SimpleJob *job : m_runningJobs) { |
281 | Worker *worker = jobSWorker(job); |
282 | Q_ASSERT(worker); |
283 | ret.append(t: worker); |
284 | } |
285 | return ret; |
286 | } |
287 | |
288 | static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial) |
289 | { |
290 | Q_UNUSED(queuesBySerial); |
291 | #ifdef SCHEDULER_DEBUG |
292 | // a host queue may *never* be in queuesBySerial twice. |
293 | QSet<HostQueue *> seen; |
294 | auto it = queuesBySerial->cbegin(); |
295 | for (; it != queuesBySerial->cend(); ++it) { |
296 | Q_ASSERT(!seen.contains(it.value())); |
297 | seen.insert(it.value()); |
298 | } |
299 | #endif |
300 | } |
301 | |
302 | static void verifyRunningJobsCount(QHash<QString, HostQueue> *queues, int runningJobsCount) |
303 | { |
304 | Q_UNUSED(queues); |
305 | Q_UNUSED(runningJobsCount); |
306 | #ifdef SCHEDULER_DEBUG |
307 | int realRunningJobsCount = 0; |
308 | auto it = queues->cbegin(); |
309 | for (; it != queues->cend(); ++it) { |
310 | realRunningJobsCount += it.value().runningJobsCount(); |
311 | } |
312 | Q_ASSERT(realRunningJobsCount == runningJobsCount); |
313 | |
314 | // ...and of course we may never run the same job twice! |
315 | QSet<SimpleJob *> seenJobs; |
316 | auto it2 = queues->cbegin(); |
317 | for (; it2 != queues->cend(); ++it2) { |
318 | for (SimpleJob *job : it2.value().runningJobs()) { |
319 | Q_ASSERT(!seenJobs.contains(job)); |
320 | seenJobs.insert(job); |
321 | } |
322 | } |
323 | #endif |
324 | } |
325 | |
326 | ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost) |
327 | : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers) |
328 | , m_maxConnectionsTotal(qMax(a: maxWorkers, b: maxWorkersPerHost)) |
329 | , m_runningJobsCount(0) |
330 | |
331 | { |
332 | /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal |
333 | << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/ |
334 | Q_ASSERT(m_maxConnectionsPerHost >= 1); |
335 | Q_ASSERT(maxWorkers >= maxWorkersPerHost); |
336 | m_startJobTimer.setSingleShot(true); |
337 | connect(sender: &m_startJobTimer, signal: &QTimer::timeout, context: this, slot: &ProtoQueue::startAJob); |
338 | } |
339 | |
340 | ProtoQueue::~ProtoQueue() |
341 | { |
342 | // Gather list of all workers first |
343 | const QList<Worker *> workers = allWorkers(); |
344 | // Clear the idle workers in the manager to avoid dangling pointers |
345 | m_workerManager.clear(); |
346 | for (Worker *worker : workers) { |
347 | // kill the worker process and remove the interface in our process |
348 | worker->kill(); |
349 | } |
350 | } |
351 | |
352 | void ProtoQueue::queueJob(SimpleJob *job) |
353 | { |
354 | QString hostname = SimpleJobPrivate::get(job)->m_url.host(); |
355 | HostQueue &hq = m_queuesByHostname[hostname]; |
356 | const int prevLowestSerial = hq.lowestSerial(); |
357 | Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost); |
358 | |
359 | // nevert insert a job twice |
360 | Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0); |
361 | SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next(); |
362 | |
363 | const bool wasQueueEmpty = hq.isQueueEmpty(); |
364 | hq.queueJob(job); |
365 | // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too... |
366 | // the queue's lowest serial job may have changed, so update the ordered list of queues. |
367 | // however, we ignore all jobs that would cause more connections to a host than allowed. |
368 | if (prevLowestSerial != hq.lowestSerial()) { |
369 | if (hq.runningJobsCount() < m_maxConnectionsPerHost) { |
370 | // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs |
371 | if (m_queuesBySerial.remove(key: prevLowestSerial) == 0) { |
372 | Q_UNUSED(wasQueueEmpty); |
373 | Q_ASSERT(wasQueueEmpty); |
374 | } |
375 | m_queuesBySerial.insert(key: hq.lowestSerial(), value: &hq); |
376 | } else { |
377 | #ifdef SCHEDULER_DEBUG |
378 | // ### this assertion may fail if the limits were modified at runtime! |
379 | // if the per-host connection limit is already reached the host queue's lowest serial |
380 | // should not be queued. |
381 | Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial)); |
382 | #endif |
383 | } |
384 | } |
385 | // just in case; startAJob() will refuse to start a job if it shouldn't. |
386 | m_startJobTimer.start(); |
387 | |
388 | ensureNoDuplicates(queuesBySerial: &m_queuesBySerial); |
389 | } |
390 | |
391 | void ProtoQueue::changeJobPriority(SimpleJob *job, int newPrio) |
392 | { |
393 | SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job); |
394 | QHash<QString, HostQueue>::Iterator it = m_queuesByHostname.find(key: jobPriv->m_url.host()); |
395 | if (it == m_queuesByHostname.end()) { |
396 | return; |
397 | } |
398 | HostQueue &hq = it.value(); |
399 | const int prevLowestSerial = hq.lowestSerial(); |
400 | if (hq.isJobRunning(job) || !hq.removeJob(job)) { |
401 | return; |
402 | } |
403 | jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(oldSerial: jobPriv->m_schedSerial, newPriority: newPrio); |
404 | hq.queueJob(job); |
405 | const bool needReinsert = hq.lowestSerial() != prevLowestSerial; |
406 | // the host queue might be absent from m_queuesBySerial because the connections per host limit |
407 | // for that host has been reached. |
408 | if (needReinsert && m_queuesBySerial.remove(key: prevLowestSerial)) { |
409 | m_queuesBySerial.insert(key: hq.lowestSerial(), value: &hq); |
410 | } |
411 | ensureNoDuplicates(queuesBySerial: &m_queuesBySerial); |
412 | } |
413 | |
414 | void ProtoQueue::removeJob(SimpleJob *job) |
415 | { |
416 | SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job); |
417 | HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()]; |
418 | const int prevLowestSerial = hq.lowestSerial(); |
419 | const int prevRunningJobs = hq.runningJobsCount(); |
420 | |
421 | Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost); |
422 | |
423 | if (hq.removeJob(job)) { |
424 | if (hq.lowestSerial() != prevLowestSerial) { |
425 | // we have dequeued the not yet running job with the lowest serial |
426 | Q_ASSERT(!jobPriv->m_worker); |
427 | Q_ASSERT(prevRunningJobs == hq.runningJobsCount()); |
428 | if (m_queuesBySerial.remove(key: prevLowestSerial) == 0) { |
429 | // make sure that the queue was not scheduled for a good reason |
430 | Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost); |
431 | } |
432 | } else { |
433 | if (prevRunningJobs != hq.runningJobsCount()) { |
434 | // we have dequeued a previously running job |
435 | Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount()); |
436 | m_runningJobsCount--; |
437 | Q_ASSERT(m_runningJobsCount >= 0); |
438 | } |
439 | } |
440 | if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) { |
441 | // this may be a no-op, but it's faster than first checking if it's already in. |
442 | m_queuesBySerial.insert(key: hq.lowestSerial(), value: &hq); |
443 | } |
444 | |
445 | if (hq.isEmpty()) { |
446 | // no queued jobs, no running jobs. this destroys hq from above. |
447 | m_queuesByHostname.remove(key: jobPriv->m_url.host()); |
448 | } |
449 | |
450 | if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) { |
451 | m_workerManager.returnWorker(worker: jobPriv->m_worker); |
452 | } |
453 | // just in case; startAJob() will refuse to start a job if it shouldn't. |
454 | m_startJobTimer.start(); |
455 | } |
456 | |
457 | ensureNoDuplicates(queuesBySerial: &m_queuesBySerial); |
458 | } |
459 | |
460 | Worker *ProtoQueue::createWorker(const QString &protocol, SimpleJob *job, const QUrl &url) |
461 | { |
462 | int error; |
463 | QString errortext; |
464 | Worker *worker = Worker::createWorker(protocol, url, error, error_text&: errortext); |
465 | if (worker) { |
466 | connect(sender: worker, signal: &Worker::workerDied, context: scheduler(), slot: [](KIO::Worker *worker) { |
467 | schedulerPrivate()->slotWorkerDied(worker); |
468 | }); |
469 | } else { |
470 | qCWarning(KIO_CORE) << "couldn't create worker:" << errortext; |
471 | if (job) { |
472 | job->slotError(error, errortext); |
473 | } |
474 | } |
475 | return worker; |
476 | } |
477 | |
478 | bool ProtoQueue::removeWorker(KIO::Worker *worker) |
479 | { |
480 | const bool removed = m_workerManager.removeWorker(worker); |
481 | return removed; |
482 | } |
483 | |
484 | QList<Worker *> ProtoQueue::allWorkers() const |
485 | { |
486 | QList<Worker *> ret(m_workerManager.allWorkers()); |
487 | auto it = m_queuesByHostname.cbegin(); |
488 | for (; it != m_queuesByHostname.cend(); ++it) { |
489 | ret.append(other: it.value().allWorkers()); |
490 | } |
491 | |
492 | return ret; |
493 | } |
494 | |
495 | // private slot |
496 | void ProtoQueue::startAJob() |
497 | { |
498 | ensureNoDuplicates(queuesBySerial: &m_queuesBySerial); |
499 | verifyRunningJobsCount(queues: &m_queuesByHostname, runningJobsCount: m_runningJobsCount); |
500 | |
501 | #ifdef SCHEDULER_DEBUG |
502 | // qDebug() << "m_runningJobsCount:" << m_runningJobsCount; |
503 | auto it = m_queuesByHostname.cbegin(); |
504 | for (; it != m_queuesByHostname.cend(); ++it) { |
505 | const QList<KIO::SimpleJob *> list = it.value().runningJobs(); |
506 | for (SimpleJob *job : list) { |
507 | // qDebug() << SimpleJobPrivate::get(job)->m_url; |
508 | } |
509 | } |
510 | #endif |
511 | if (m_runningJobsCount >= m_maxConnectionsTotal) { |
512 | #ifdef SCHEDULER_DEBUG |
513 | // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached."; |
514 | #endif |
515 | return; |
516 | } |
517 | |
518 | QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin(); |
519 | if (first != m_queuesBySerial.end()) { |
520 | // pick a job and maintain the queue invariant: lower serials first |
521 | HostQueue *hq = first.value(); |
522 | const int prevLowestSerial = first.key(); |
523 | Q_UNUSED(prevLowestSerial); |
524 | Q_ASSERT(hq->lowestSerial() == prevLowestSerial); |
525 | // the following assertions should hold due to queueJob(), takeFirstInQueue() and |
526 | // removeJob() being correct |
527 | Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost); |
528 | SimpleJob *startingJob = hq->takeFirstInQueue(); |
529 | Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost); |
530 | Q_ASSERT(hq->lowestSerial() != prevLowestSerial); |
531 | |
532 | m_queuesBySerial.erase(it: first); |
533 | // we've increased hq's runningJobsCount() by calling nexStartingJob() |
534 | // so we need to check again. |
535 | if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) { |
536 | m_queuesBySerial.insert(key: hq->lowestSerial(), value: hq); |
537 | } |
538 | |
539 | // always increase m_runningJobsCount because it's correct if there is a worker and if there |
540 | // is no worker, removeJob() will balance the number again. removeJob() would decrease the |
541 | // number too much otherwise. |
542 | // Note that createWorker() can call slotError() on a job which in turn calls removeJob(), |
543 | // so increase the count here already. |
544 | m_runningJobsCount++; |
545 | |
546 | bool isNewWorker = false; |
547 | Worker *worker = m_workerManager.takeWorkerForJob(job: startingJob); |
548 | SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job: startingJob); |
549 | if (!worker) { |
550 | isNewWorker = true; |
551 | worker = createWorker(protocol: jobPriv->m_protocol, job: startingJob, url: jobPriv->m_url); |
552 | } |
553 | |
554 | if (worker) { |
555 | jobPriv->m_worker = worker; |
556 | schedulerPrivate()->setupWorker(worker, url: jobPriv->m_url, protocol: jobPriv->m_protocol, proxyList: jobPriv->m_proxyList, newWorker: isNewWorker); |
557 | startJob(job: startingJob, worker); |
558 | } else { |
559 | // dispose of our records about the job and mark the job as unknown |
560 | // (to prevent crashes later) |
561 | // note that the job's slotError() can have called removeJob() first, so check that |
562 | // it's not a ghost job with null serial already. |
563 | if (jobPriv->m_schedSerial) { |
564 | removeJob(job: startingJob); |
565 | jobPriv->m_schedSerial = 0; |
566 | } |
567 | } |
568 | } else { |
569 | #ifdef SCHEDULER_DEBUG |
570 | // qDebug() << "not starting any jobs because there is no queued job."; |
571 | #endif |
572 | } |
573 | |
574 | if (!m_queuesBySerial.isEmpty()) { |
575 | m_startJobTimer.start(); |
576 | } |
577 | } |
578 | |
579 | Scheduler::Scheduler() |
580 | { |
581 | setObjectName(QStringLiteral("scheduler" )); |
582 | |
583 | #ifndef KIO_ANDROID_STUB |
584 | const QString dbusPath = QStringLiteral("/KIO/Scheduler" ); |
585 | const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler" ); |
586 | QDBusConnection dbus = QDBusConnection::sessionBus(); |
587 | // Not needed, right? We just want to emit two signals. |
588 | // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots | |
589 | // QDBusConnection::ExportScriptableSignals); |
590 | dbus.connect(service: QString(), |
591 | path: dbusPath, |
592 | interface: dbusInterface, |
593 | QStringLiteral("reparseSlaveConfiguration" ), |
594 | receiver: this, |
595 | SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage))); |
596 | #endif |
597 | } |
598 | |
599 | Scheduler::~Scheduler() |
600 | { |
601 | } |
602 | |
603 | void Scheduler::doJob(SimpleJob *job) |
604 | { |
605 | schedulerPrivate()->doJob(job); |
606 | } |
607 | |
608 | // static |
609 | void Scheduler::cancelJob(SimpleJob *job) |
610 | { |
611 | schedulerPrivate()->cancelJob(job); |
612 | } |
613 | |
614 | void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Worker *worker) |
615 | { |
616 | schedulerPrivate()->jobFinished(job, worker); |
617 | } |
618 | |
619 | void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url) |
620 | { |
621 | schedulerPrivate()->putWorkerOnHold(job, url); |
622 | } |
623 | |
624 | void Scheduler::removeWorkerOnHold() |
625 | { |
626 | schedulerPrivate()->removeWorkerOnHold(); |
627 | } |
628 | |
629 | bool Scheduler::isWorkerOnHoldFor(const QUrl &url) |
630 | { |
631 | return schedulerPrivate()->isWorkerOnHoldFor(url); |
632 | } |
633 | |
634 | void Scheduler::updateInternalMetaData(SimpleJob *job) |
635 | { |
636 | schedulerPrivate()->updateInternalMetaData(job); |
637 | } |
638 | |
639 | void Scheduler::emitReparseSlaveConfiguration() |
640 | { |
641 | #ifndef KIO_ANDROID_STUB |
642 | // Do it immediately in this process, otherwise we might send a request before reparsing |
643 | // (e.g. when changing useragent in the plugin) |
644 | schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage()); |
645 | #endif |
646 | |
647 | schedulerPrivate()->m_ignoreConfigReparse = true; |
648 | Q_EMIT self()->reparseSlaveConfiguration(QString()); |
649 | } |
650 | |
651 | #ifndef KIO_ANDROID_STUB |
652 | void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &) |
653 | { |
654 | if (m_ignoreConfigReparse) { |
655 | // qDebug() << "Ignoring signal sent by myself"; |
656 | m_ignoreConfigReparse = false; |
657 | return; |
658 | } |
659 | |
660 | // qDebug() << "proto=" << proto; |
661 | KProtocolManager::reparseConfiguration(); |
662 | WorkerConfig::self()->reset(); |
663 | sessionData.reset(); |
664 | |
665 | QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(key: proto); |
666 | QHash<QString, ProtoQueue *>::ConstIterator endIt = m_protocols.constEnd(); |
667 | |
668 | // not found? |
669 | if (it == endIt) { |
670 | return; |
671 | } |
672 | |
673 | if (!proto.isEmpty()) { |
674 | endIt = it; |
675 | ++endIt; |
676 | } |
677 | |
678 | for (; it != endIt; ++it) { |
679 | const QList<KIO::Worker *> list = it.value()->allWorkers(); |
680 | for (Worker *worker : list) { |
681 | worker->send(cmd: CMD_REPARSECONFIGURATION); |
682 | worker->resetHost(); |
683 | } |
684 | } |
685 | } |
686 | #endif |
687 | |
688 | void SchedulerPrivate::doJob(SimpleJob *job) |
689 | { |
690 | // qDebug() << job; |
691 | KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); |
692 | jobPriv->m_proxyList.clear(); |
693 | jobPriv->m_protocol = KProtocolManagerPrivate::workerProtocol(url: job->url(), proxy&: jobPriv->m_proxyList); |
694 | |
695 | ProtoQueue *proto = protoQ(protocol: jobPriv->m_protocol, host: job->url().host()); |
696 | proto->queueJob(job); |
697 | } |
698 | |
699 | void SchedulerPrivate::setJobPriority(SimpleJob *job, int priority) |
700 | { |
701 | // qDebug() << job << priority; |
702 | const QString protocol = SimpleJobPrivate::get(job)->m_protocol; |
703 | if (!protocol.isEmpty()) { |
704 | ProtoQueue *proto = protoQ(protocol, host: job->url().host()); |
705 | proto->changeJobPriority(job, newPrio: priority); |
706 | } |
707 | } |
708 | |
709 | void SchedulerPrivate::cancelJob(SimpleJob *job) |
710 | { |
711 | KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); |
712 | // this method is called all over the place in job.cpp, so just do this check here to avoid |
713 | // much boilerplate in job code. |
714 | if (jobPriv->m_schedSerial == 0) { |
715 | // qDebug() << "Doing nothing because I don't know job" << job; |
716 | return; |
717 | } |
718 | Worker *worker = jobSWorker(job); |
719 | // qDebug() << job << worker; |
720 | jobFinished(job, worker); |
721 | if (worker) { |
722 | ProtoQueue *pq = m_protocols.value(key: jobPriv->m_protocol); |
723 | if (pq) { |
724 | pq->removeWorker(worker); |
725 | } |
726 | worker->kill(); // don't use worker after this! |
727 | } |
728 | } |
729 | |
730 | void SchedulerPrivate::jobFinished(SimpleJob *job, Worker *worker) |
731 | { |
732 | // qDebug() << job << worker; |
733 | KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); |
734 | |
735 | // make sure that we knew about the job! |
736 | Q_ASSERT(jobPriv->m_schedSerial); |
737 | |
738 | ProtoQueue *pq = m_protocols.value(key: jobPriv->m_protocol); |
739 | if (pq) { |
740 | pq->removeJob(job); |
741 | } |
742 | |
743 | if (worker) { |
744 | // If we have internal meta-data, tell existing KIO workers to reload |
745 | // their configuration. |
746 | if (jobPriv->m_internalMetaData.count()) { |
747 | // qDebug() << "Updating KIO workers with new internal metadata information"; |
748 | ProtoQueue *queue = m_protocols.value(key: worker->protocol()); |
749 | if (queue) { |
750 | const QList<Worker *> workers = queue->allWorkers(); |
751 | for (auto *runningWorker : workers) { |
752 | if (worker->host() == runningWorker->host()) { |
753 | worker->setConfig(metaDataFor(protocol: worker->protocol(), proxyList: jobPriv->m_proxyList, url: job->url())); |
754 | /*qDebug() << "Updated configuration of" << worker->protocol() |
755 | << "KIO worker, pid=" << worker->worker_pid();*/ |
756 | } |
757 | } |
758 | } |
759 | } |
760 | worker->setJob(nullptr); |
761 | worker->disconnect(receiver: job); |
762 | } |
763 | jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again |
764 | jobPriv->m_worker = nullptr; |
765 | // Clear the values in the internal metadata container since they have |
766 | // already been taken care of above... |
767 | jobPriv->m_internalMetaData.clear(); |
768 | } |
769 | |
770 | MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url) |
771 | { |
772 | const QString host = url.host(); |
773 | MetaData configData = WorkerConfig::self()->configData(protocol, host); |
774 | sessionData.configDataFor(configData, proto: protocol, host); |
775 | if (proxyList.isEmpty()) { |
776 | configData.remove(QStringLiteral("UseProxy" )); |
777 | configData.remove(QStringLiteral("ProxyUrls" )); |
778 | } else { |
779 | configData[QStringLiteral("UseProxy" )] = proxyList.first(); |
780 | configData[QStringLiteral("ProxyUrls" )] = proxyList.join(sep: QLatin1Char(',')); |
781 | } |
782 | |
783 | return configData; |
784 | } |
785 | |
786 | void SchedulerPrivate::setupWorker(KIO::Worker *worker, |
787 | const QUrl &url, |
788 | const QString &protocol, |
789 | const QStringList &proxyList, |
790 | bool newWorker, |
791 | const KIO::MetaData *config) |
792 | { |
793 | int port = url.port(); |
794 | if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that. |
795 | port = 0; |
796 | } |
797 | const QString host = url.host(); |
798 | const QString user = url.userName(); |
799 | const QString passwd = url.password(); |
800 | |
801 | if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) { |
802 | MetaData configData = metaDataFor(protocol, proxyList, url); |
803 | if (config) { |
804 | configData += *config; |
805 | } |
806 | |
807 | worker->setConfig(configData); |
808 | worker->setProtocol(url.scheme()); |
809 | worker->setHost(host, port, user, passwd); |
810 | } |
811 | } |
812 | |
813 | void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker) |
814 | { |
815 | // qDebug() << worker; |
816 | Q_ASSERT(worker); |
817 | Q_ASSERT(!worker->isAlive()); |
818 | ProtoQueue *pq = m_protocols.value(key: worker->protocol()); |
819 | if (pq) { |
820 | if (worker->job()) { |
821 | pq->removeJob(job: worker->job()); |
822 | } |
823 | // in case this was a connected worker... |
824 | pq->removeWorker(worker); |
825 | } |
826 | if (worker == m_workerOnHold) { |
827 | m_workerOnHold = nullptr; |
828 | m_urlOnHold.clear(); |
829 | } |
830 | // can't use worker->deref() here because we need to use deleteLater |
831 | worker->aboutToDelete(); |
832 | worker->deleteLater(); |
833 | } |
834 | |
835 | void SchedulerPrivate::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url) |
836 | { |
837 | Worker *worker = jobSWorker(job); |
838 | // qDebug() << job << url << worker; |
839 | worker->disconnect(receiver: job); |
840 | // prevent the fake death of the worker from trying to kill the job again; |
841 | // cf. Worker::hold(const QUrl &url) called in SchedulerPrivate::publishWorkerOnHold(). |
842 | worker->setJob(nullptr); |
843 | SimpleJobPrivate::get(job)->m_worker = nullptr; |
844 | |
845 | if (m_workerOnHold) { |
846 | m_workerOnHold->kill(); |
847 | } |
848 | m_workerOnHold = worker; |
849 | m_urlOnHold = url; |
850 | m_workerOnHold->suspend(); |
851 | } |
852 | |
853 | bool SchedulerPrivate::isWorkerOnHoldFor(const QUrl &url) |
854 | { |
855 | if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) { |
856 | return true; |
857 | } |
858 | |
859 | return false; |
860 | } |
861 | |
862 | Worker *SchedulerPrivate::heldWorkerForJob(SimpleJob *job) |
863 | { |
864 | Worker *worker = nullptr; |
865 | KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); |
866 | |
867 | if (m_workerOnHold) { |
868 | // Make sure that the job wants to do a GET or a POST, and with no offset |
869 | const int cmd = jobPriv->m_command; |
870 | bool canJobReuse = (cmd == CMD_GET); |
871 | |
872 | if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(object: job)) { |
873 | canJobReuse = (canJobReuse || cmd == CMD_SPECIAL); |
874 | if (canJobReuse) { |
875 | KIO::MetaData outgoing = tJob->outgoingMetaData(); |
876 | const QString resume = outgoing.value(QStringLiteral("resume" )); |
877 | const QString rangeStart = outgoing.value(QStringLiteral("range-start" )); |
878 | // qDebug() << "Resume metadata is" << resume; |
879 | canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0')); |
880 | } |
881 | } |
882 | |
883 | if (job->url() == m_urlOnHold) { |
884 | if (canJobReuse) { |
885 | // qDebug() << "HOLD: Reusing held worker (" << m_workerOnHold << ")"; |
886 | worker = m_workerOnHold; |
887 | } else { |
888 | // qDebug() << "HOLD: Discarding held worker (" << m_workerOnHold << ")"; |
889 | m_workerOnHold->kill(); |
890 | } |
891 | m_workerOnHold = nullptr; |
892 | m_urlOnHold.clear(); |
893 | } |
894 | } |
895 | |
896 | return worker; |
897 | } |
898 | |
899 | void SchedulerPrivate::removeWorkerOnHold() |
900 | { |
901 | // qDebug() << m_workerOnHold; |
902 | if (m_workerOnHold) { |
903 | m_workerOnHold->kill(); |
904 | } |
905 | m_workerOnHold = nullptr; |
906 | m_urlOnHold.clear(); |
907 | } |
908 | |
909 | ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host) |
910 | { |
911 | ProtoQueue *pq = m_protocols.value(key: protocol, defaultValue: nullptr); |
912 | if (!pq) { |
913 | // qDebug() << "creating ProtoQueue instance for" << protocol; |
914 | |
915 | const int maxWorkers = KProtocolInfo::maxWorkers(protocol); |
916 | int maxWorkersPerHost = -1; |
917 | if (!host.isEmpty()) { |
918 | bool ok = false; |
919 | const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections" )).toInt(ok: &ok); |
920 | if (ok) { |
921 | maxWorkersPerHost = value; |
922 | } |
923 | } |
924 | if (maxWorkersPerHost == -1) { |
925 | maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol); |
926 | } |
927 | // Never allow maxWorkersPerHost to exceed maxWorkers. |
928 | pq = new ProtoQueue(maxWorkers, qMin(a: maxWorkers, b: maxWorkersPerHost)); |
929 | m_protocols.insert(key: protocol, value: pq); |
930 | } |
931 | return pq; |
932 | } |
933 | |
934 | void SchedulerPrivate::updateInternalMetaData(SimpleJob *job) |
935 | { |
936 | KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); |
937 | // Preserve all internal meta-data so they can be sent back to the |
938 | // KIO workers as needed... |
939 | const QUrl jobUrl = job->url(); |
940 | |
941 | const QLatin1String currHostToken("{internal~currenthost}" ); |
942 | const QLatin1String allHostsToken("{internal~allhosts}" ); |
943 | // qDebug() << job << jobPriv->m_internalMetaData; |
944 | QMapIterator<QString, QString> it(jobPriv->m_internalMetaData); |
945 | while (it.hasNext()) { |
946 | it.next(); |
947 | if (it.key().startsWith(s: currHostToken, cs: Qt::CaseInsensitive)) { |
948 | WorkerConfig::self()->setConfigData(protocol: jobUrl.scheme(), host: jobUrl.host(), key: it.key().mid(position: currHostToken.size()), value: it.value()); |
949 | } else if (it.key().startsWith(s: allHostsToken, cs: Qt::CaseInsensitive)) { |
950 | WorkerConfig::self()->setConfigData(protocol: jobUrl.scheme(), host: QString(), key: it.key().mid(position: allHostsToken.size()), value: it.value()); |
951 | } |
952 | } |
953 | } |
954 | |
955 | #include "moc_scheduler.cpp" |
956 | #include "moc_scheduler_p.cpp" |
957 | |