1/*
2 This file is part of the KDE libraries
3 SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org>
4 SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org>
5 SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com>
6
7 SPDX-License-Identifier: LGPL-2.0-only
8*/
9
10#include "scheduler.h"
11#include "scheduler_p.h"
12
13#include "connection_p.h"
14#include "job_p.h"
15#include "kprotocolmanager_p.h"
16#include "sessiondata_p.h"
17#include "worker_p.h"
18#include "workerconfig.h"
19
20#include <kprotocolinfo.h>
21#include <kprotocolmanager.h>
22
23#ifndef KIO_ANDROID_STUB
24#include <QDBusConnection>
25#include <QDBusMessage>
26#endif
27#include <QHash>
28#include <QThread>
29#include <QThreadStorage>
30
// How long (in seconds) a worker may sit idle before the grim reaper kills it: 3 minutes.
static constexpr int s_idleWorkerLifetime = 3 * 60;
33
34using namespace KIO;
35
36static inline Worker *jobSWorker(SimpleJob *job)
37{
38 return SimpleJobPrivate::get(job)->m_worker;
39}
40
41static inline int jobCommand(SimpleJob *job)
42{
43 return SimpleJobPrivate::get(job)->m_command;
44}
45
46static inline void startJob(SimpleJob *job, Worker *worker)
47{
48 SimpleJobPrivate::get(job)->start(worker);
49}
50
51class KIO::SchedulerPrivate
52{
53public:
54 SchedulerPrivate()
55 : q(new Scheduler())
56 {
57 }
58
59 ~SchedulerPrivate()
60 {
61 removeWorkerOnHold();
62 delete q;
63 q = nullptr;
64 qDeleteAll(c: m_protocols); // ~ProtoQueue will kill and delete all workers
65 }
66
67 SchedulerPrivate(const SchedulerPrivate &) = delete;
68 SchedulerPrivate &operator=(const SchedulerPrivate &) = delete;
69
70 Scheduler *q;
71
72 Worker *m_workerOnHold = nullptr;
73 QUrl m_urlOnHold;
74 bool m_ignoreConfigReparse = false;
75
76 SessionData sessionData;
77
78 void doJob(SimpleJob *job);
79 void setJobPriority(SimpleJob *job, int priority);
80 void cancelJob(SimpleJob *job);
81 void jobFinished(KIO::SimpleJob *job, KIO::Worker *worker);
82 void putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url);
83 void removeWorkerOnHold();
84 Worker *heldWorkerForJob(KIO::SimpleJob *job);
85 bool isWorkerOnHoldFor(const QUrl &url);
86 void updateInternalMetaData(SimpleJob *job);
87
88 MetaData metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url);
89 void setupWorker(KIO::Worker *worker,
90 const QUrl &url,
91 const QString &protocol,
92 const QStringList &proxyList,
93 bool newWorker,
94 const KIO::MetaData *config = nullptr);
95
96 void slotWorkerDied(KIO::Worker *worker);
97
98#ifndef KIO_ANDROID_STUB
99 void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &);
100#endif
101
102 ProtoQueue *protoQ(const QString &protocol, const QString &host);
103
104private:
105 QHash<QString, ProtoQueue *> m_protocols;
106};
107
108static QThreadStorage<SchedulerPrivate *> s_storage;
109static SchedulerPrivate *schedulerPrivate()
110{
111 if (!s_storage.hasLocalData()) {
112 s_storage.setLocalData(new SchedulerPrivate);
113 }
114 return s_storage.localData();
115}
116
117Scheduler *Scheduler::self()
118{
119 return schedulerPrivate()->q;
120}
121
122SchedulerPrivate *Scheduler::d_func()
123{
124 return schedulerPrivate();
125}
126
127// static
128Scheduler *scheduler()
129{
130 return schedulerPrivate()->q;
131}
132
133////////////////////////////
134
135int SerialPicker::changedPrioritySerial(int oldSerial, int newPriority) const
136{
137 Q_ASSERT(newPriority >= -10 && newPriority <= 10);
138 newPriority = qBound(min: -10, val: newPriority, max: 10);
139 int unbiasedSerial = oldSerial % m_jobsPerPriority;
140 return unbiasedSerial + newPriority * m_jobsPerPriority;
141}
142
143WorkerManager::WorkerManager()
144{
145 m_grimTimer.setSingleShot(true);
146 connect(sender: &m_grimTimer, signal: &QTimer::timeout, context: this, slot: &WorkerManager::grimReaper);
147}
148
149WorkerManager::~WorkerManager()
150{
151 grimReaper();
152}
153
154void WorkerManager::returnWorker(Worker *worker)
155{
156 Q_ASSERT(worker);
157 worker->setIdle();
158 m_idleWorkers.insert(key: worker->host(), value: worker);
159 scheduleGrimReaper();
160}
161
162Worker *WorkerManager::takeWorkerForJob(SimpleJob *job)
163{
164 Worker *worker = schedulerPrivate()->heldWorkerForJob(job);
165 if (worker) {
166 return worker;
167 }
168
169 QUrl url = SimpleJobPrivate::get(job)->m_url;
170 // TODO take port, username and password into account
171 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.find(key: url.host());
172 if (it == m_idleWorkers.end()) {
173 it = m_idleWorkers.begin();
174 }
175 if (it == m_idleWorkers.end()) {
176 return nullptr;
177 }
178 worker = it.value();
179 m_idleWorkers.erase(it);
180 return worker;
181}
182
183bool WorkerManager::removeWorker(Worker *worker)
184{
185 // ### performance not so great
186 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin();
187 for (; it != m_idleWorkers.end(); ++it) {
188 if (it.value() == worker) {
189 m_idleWorkers.erase(it);
190 return true;
191 }
192 }
193 return false;
194}
195
196void WorkerManager::clear()
197{
198 m_idleWorkers.clear();
199}
200
201QList<Worker *> WorkerManager::allWorkers() const
202{
203 return m_idleWorkers.values();
204}
205
206void WorkerManager::scheduleGrimReaper()
207{
208 if (!m_grimTimer.isActive()) {
209 m_grimTimer.start(msec: (s_idleWorkerLifetime / 2) * 1000);
210 }
211}
212
213// private slot
214void WorkerManager::grimReaper()
215{
216 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin();
217 while (it != m_idleWorkers.end()) {
218 Worker *worker = it.value();
219 if (worker->idleTime() >= s_idleWorkerLifetime) {
220 it = m_idleWorkers.erase(it);
221 if (worker->job()) {
222 // qDebug() << "Idle worker" << worker << "still has job" << worker->job();
223 }
224 // avoid invoking slotWorkerDied() because its cleanup services are not needed
225 worker->kill();
226 } else {
227 ++it;
228 }
229 }
230 if (!m_idleWorkers.isEmpty()) {
231 scheduleGrimReaper();
232 }
233}
234
235int HostQueue::lowestSerial() const
236{
237 QMap<int, SimpleJob *>::ConstIterator first = m_queuedJobs.constBegin();
238 if (first != m_queuedJobs.constEnd()) {
239 return first.key();
240 }
241 return SerialPicker::maxSerial;
242}
243
244void HostQueue::queueJob(SimpleJob *job)
245{
246 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
247 Q_ASSERT(serial != 0);
248 Q_ASSERT(!m_queuedJobs.contains(serial));
249 Q_ASSERT(!m_runningJobs.contains(job));
250 m_queuedJobs.insert(key: serial, value: job);
251}
252
253SimpleJob *HostQueue::takeFirstInQueue()
254{
255 Q_ASSERT(!m_queuedJobs.isEmpty());
256 QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin();
257 SimpleJob *job = first.value();
258 m_queuedJobs.erase(it: first);
259 m_runningJobs.insert(value: job);
260 return job;
261}
262
263bool HostQueue::removeJob(SimpleJob *job)
264{
265 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
266 if (m_runningJobs.remove(value: job)) {
267 Q_ASSERT(!m_queuedJobs.contains(serial));
268 return true;
269 }
270 if (m_queuedJobs.remove(key: serial)) {
271 return true;
272 }
273 return false;
274}
275
276QList<Worker *> HostQueue::allWorkers() const
277{
278 QList<Worker *> ret;
279 ret.reserve(asize: m_runningJobs.size());
280 for (SimpleJob *job : m_runningJobs) {
281 Worker *worker = jobSWorker(job);
282 Q_ASSERT(worker);
283 ret.append(t: worker);
284 }
285 return ret;
286}
287
288static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial)
289{
290 Q_UNUSED(queuesBySerial);
291#ifdef SCHEDULER_DEBUG
292 // a host queue may *never* be in queuesBySerial twice.
293 QSet<HostQueue *> seen;
294 auto it = queuesBySerial->cbegin();
295 for (; it != queuesBySerial->cend(); ++it) {
296 Q_ASSERT(!seen.contains(it.value()));
297 seen.insert(it.value());
298 }
299#endif
300}
301
302static void verifyRunningJobsCount(QHash<QString, HostQueue> *queues, int runningJobsCount)
303{
304 Q_UNUSED(queues);
305 Q_UNUSED(runningJobsCount);
306#ifdef SCHEDULER_DEBUG
307 int realRunningJobsCount = 0;
308 auto it = queues->cbegin();
309 for (; it != queues->cend(); ++it) {
310 realRunningJobsCount += it.value().runningJobsCount();
311 }
312 Q_ASSERT(realRunningJobsCount == runningJobsCount);
313
314 // ...and of course we may never run the same job twice!
315 QSet<SimpleJob *> seenJobs;
316 auto it2 = queues->cbegin();
317 for (; it2 != queues->cend(); ++it2) {
318 for (SimpleJob *job : it2.value().runningJobs()) {
319 Q_ASSERT(!seenJobs.contains(job));
320 seenJobs.insert(job);
321 }
322 }
323#endif
324}
325
326ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost)
327 : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
328 , m_maxConnectionsTotal(qMax(a: maxWorkers, b: maxWorkersPerHost))
329 , m_runningJobsCount(0)
330
331{
332 /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal
333 << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/
334 Q_ASSERT(m_maxConnectionsPerHost >= 1);
335 Q_ASSERT(maxWorkers >= maxWorkersPerHost);
336 m_startJobTimer.setSingleShot(true);
337 connect(sender: &m_startJobTimer, signal: &QTimer::timeout, context: this, slot: &ProtoQueue::startAJob);
338}
339
340ProtoQueue::~ProtoQueue()
341{
342 // Gather list of all workers first
343 const QList<Worker *> workers = allWorkers();
344 // Clear the idle workers in the manager to avoid dangling pointers
345 m_workerManager.clear();
346 for (Worker *worker : workers) {
347 // kill the worker process and remove the interface in our process
348 worker->kill();
349 }
350}
351
352void ProtoQueue::queueJob(SimpleJob *job)
353{
354 QString hostname = SimpleJobPrivate::get(job)->m_url.host();
355 HostQueue &hq = m_queuesByHostname[hostname];
356 const int prevLowestSerial = hq.lowestSerial();
357 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
358
359 // nevert insert a job twice
360 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0);
361 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next();
362
363 const bool wasQueueEmpty = hq.isQueueEmpty();
364 hq.queueJob(job);
365 // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too...
366 // the queue's lowest serial job may have changed, so update the ordered list of queues.
367 // however, we ignore all jobs that would cause more connections to a host than allowed.
368 if (prevLowestSerial != hq.lowestSerial()) {
369 if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
370 // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs
371 if (m_queuesBySerial.remove(key: prevLowestSerial) == 0) {
372 Q_UNUSED(wasQueueEmpty);
373 Q_ASSERT(wasQueueEmpty);
374 }
375 m_queuesBySerial.insert(key: hq.lowestSerial(), value: &hq);
376 } else {
377#ifdef SCHEDULER_DEBUG
378 // ### this assertion may fail if the limits were modified at runtime!
379 // if the per-host connection limit is already reached the host queue's lowest serial
380 // should not be queued.
381 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
382#endif
383 }
384 }
385 // just in case; startAJob() will refuse to start a job if it shouldn't.
386 m_startJobTimer.start();
387
388 ensureNoDuplicates(queuesBySerial: &m_queuesBySerial);
389}
390
391void ProtoQueue::changeJobPriority(SimpleJob *job, int newPrio)
392{
393 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
394 QHash<QString, HostQueue>::Iterator it = m_queuesByHostname.find(key: jobPriv->m_url.host());
395 if (it == m_queuesByHostname.end()) {
396 return;
397 }
398 HostQueue &hq = it.value();
399 const int prevLowestSerial = hq.lowestSerial();
400 if (hq.isJobRunning(job) || !hq.removeJob(job)) {
401 return;
402 }
403 jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(oldSerial: jobPriv->m_schedSerial, newPriority: newPrio);
404 hq.queueJob(job);
405 const bool needReinsert = hq.lowestSerial() != prevLowestSerial;
406 // the host queue might be absent from m_queuesBySerial because the connections per host limit
407 // for that host has been reached.
408 if (needReinsert && m_queuesBySerial.remove(key: prevLowestSerial)) {
409 m_queuesBySerial.insert(key: hq.lowestSerial(), value: &hq);
410 }
411 ensureNoDuplicates(queuesBySerial: &m_queuesBySerial);
412}
413
414void ProtoQueue::removeJob(SimpleJob *job)
415{
416 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
417 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
418 const int prevLowestSerial = hq.lowestSerial();
419 const int prevRunningJobs = hq.runningJobsCount();
420
421 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
422
423 if (hq.removeJob(job)) {
424 if (hq.lowestSerial() != prevLowestSerial) {
425 // we have dequeued the not yet running job with the lowest serial
426 Q_ASSERT(!jobPriv->m_worker);
427 Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
428 if (m_queuesBySerial.remove(key: prevLowestSerial) == 0) {
429 // make sure that the queue was not scheduled for a good reason
430 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
431 }
432 } else {
433 if (prevRunningJobs != hq.runningJobsCount()) {
434 // we have dequeued a previously running job
435 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
436 m_runningJobsCount--;
437 Q_ASSERT(m_runningJobsCount >= 0);
438 }
439 }
440 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
441 // this may be a no-op, but it's faster than first checking if it's already in.
442 m_queuesBySerial.insert(key: hq.lowestSerial(), value: &hq);
443 }
444
445 if (hq.isEmpty()) {
446 // no queued jobs, no running jobs. this destroys hq from above.
447 m_queuesByHostname.remove(key: jobPriv->m_url.host());
448 }
449
450 if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) {
451 m_workerManager.returnWorker(worker: jobPriv->m_worker);
452 }
453 // just in case; startAJob() will refuse to start a job if it shouldn't.
454 m_startJobTimer.start();
455 }
456
457 ensureNoDuplicates(queuesBySerial: &m_queuesBySerial);
458}
459
460Worker *ProtoQueue::createWorker(const QString &protocol, SimpleJob *job, const QUrl &url)
461{
462 int error;
463 QString errortext;
464 Worker *worker = Worker::createWorker(protocol, url, error, error_text&: errortext);
465 if (worker) {
466 connect(sender: worker, signal: &Worker::workerDied, context: scheduler(), slot: [](KIO::Worker *worker) {
467 schedulerPrivate()->slotWorkerDied(worker);
468 });
469 } else {
470 qCWarning(KIO_CORE) << "couldn't create worker:" << errortext;
471 if (job) {
472 job->slotError(error, errortext);
473 }
474 }
475 return worker;
476}
477
478bool ProtoQueue::removeWorker(KIO::Worker *worker)
479{
480 const bool removed = m_workerManager.removeWorker(worker);
481 return removed;
482}
483
484QList<Worker *> ProtoQueue::allWorkers() const
485{
486 QList<Worker *> ret(m_workerManager.allWorkers());
487 auto it = m_queuesByHostname.cbegin();
488 for (; it != m_queuesByHostname.cend(); ++it) {
489 ret.append(other: it.value().allWorkers());
490 }
491
492 return ret;
493}
494
495// private slot
496void ProtoQueue::startAJob()
497{
498 ensureNoDuplicates(queuesBySerial: &m_queuesBySerial);
499 verifyRunningJobsCount(queues: &m_queuesByHostname, runningJobsCount: m_runningJobsCount);
500
501#ifdef SCHEDULER_DEBUG
502 // qDebug() << "m_runningJobsCount:" << m_runningJobsCount;
503 auto it = m_queuesByHostname.cbegin();
504 for (; it != m_queuesByHostname.cend(); ++it) {
505 const QList<KIO::SimpleJob *> list = it.value().runningJobs();
506 for (SimpleJob *job : list) {
507 // qDebug() << SimpleJobPrivate::get(job)->m_url;
508 }
509 }
510#endif
511 if (m_runningJobsCount >= m_maxConnectionsTotal) {
512#ifdef SCHEDULER_DEBUG
513 // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached.";
514#endif
515 return;
516 }
517
518 QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin();
519 if (first != m_queuesBySerial.end()) {
520 // pick a job and maintain the queue invariant: lower serials first
521 HostQueue *hq = first.value();
522 const int prevLowestSerial = first.key();
523 Q_UNUSED(prevLowestSerial);
524 Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
525 // the following assertions should hold due to queueJob(), takeFirstInQueue() and
526 // removeJob() being correct
527 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
528 SimpleJob *startingJob = hq->takeFirstInQueue();
529 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
530 Q_ASSERT(hq->lowestSerial() != prevLowestSerial);
531
532 m_queuesBySerial.erase(it: first);
533 // we've increased hq's runningJobsCount() by calling nexStartingJob()
534 // so we need to check again.
535 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
536 m_queuesBySerial.insert(key: hq->lowestSerial(), value: hq);
537 }
538
539 // always increase m_runningJobsCount because it's correct if there is a worker and if there
540 // is no worker, removeJob() will balance the number again. removeJob() would decrease the
541 // number too much otherwise.
542 // Note that createWorker() can call slotError() on a job which in turn calls removeJob(),
543 // so increase the count here already.
544 m_runningJobsCount++;
545
546 bool isNewWorker = false;
547 Worker *worker = m_workerManager.takeWorkerForJob(job: startingJob);
548 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job: startingJob);
549 if (!worker) {
550 isNewWorker = true;
551 worker = createWorker(protocol: jobPriv->m_protocol, job: startingJob, url: jobPriv->m_url);
552 }
553
554 if (worker) {
555 jobPriv->m_worker = worker;
556 schedulerPrivate()->setupWorker(worker, url: jobPriv->m_url, protocol: jobPriv->m_protocol, proxyList: jobPriv->m_proxyList, newWorker: isNewWorker);
557 startJob(job: startingJob, worker);
558 } else {
559 // dispose of our records about the job and mark the job as unknown
560 // (to prevent crashes later)
561 // note that the job's slotError() can have called removeJob() first, so check that
562 // it's not a ghost job with null serial already.
563 if (jobPriv->m_schedSerial) {
564 removeJob(job: startingJob);
565 jobPriv->m_schedSerial = 0;
566 }
567 }
568 } else {
569#ifdef SCHEDULER_DEBUG
570 // qDebug() << "not starting any jobs because there is no queued job.";
571#endif
572 }
573
574 if (!m_queuesBySerial.isEmpty()) {
575 m_startJobTimer.start();
576 }
577}
578
579Scheduler::Scheduler()
580{
581 setObjectName(QStringLiteral("scheduler"));
582
583#ifndef KIO_ANDROID_STUB
584 const QString dbusPath = QStringLiteral("/KIO/Scheduler");
585 const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler");
586 QDBusConnection dbus = QDBusConnection::sessionBus();
587 // Not needed, right? We just want to emit two signals.
588 // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots |
589 // QDBusConnection::ExportScriptableSignals);
590 dbus.connect(service: QString(),
591 path: dbusPath,
592 interface: dbusInterface,
593 QStringLiteral("reparseSlaveConfiguration"),
594 receiver: this,
595 SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage)));
596#endif
597}
598
599Scheduler::~Scheduler()
600{
601}
602
603void Scheduler::doJob(SimpleJob *job)
604{
605 schedulerPrivate()->doJob(job);
606}
607
608// static
609void Scheduler::cancelJob(SimpleJob *job)
610{
611 schedulerPrivate()->cancelJob(job);
612}
613
614void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Worker *worker)
615{
616 schedulerPrivate()->jobFinished(job, worker);
617}
618
619void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
620{
621 schedulerPrivate()->putWorkerOnHold(job, url);
622}
623
624void Scheduler::removeWorkerOnHold()
625{
626 schedulerPrivate()->removeWorkerOnHold();
627}
628
629bool Scheduler::isWorkerOnHoldFor(const QUrl &url)
630{
631 return schedulerPrivate()->isWorkerOnHoldFor(url);
632}
633
634void Scheduler::updateInternalMetaData(SimpleJob *job)
635{
636 schedulerPrivate()->updateInternalMetaData(job);
637}
638
639void Scheduler::emitReparseSlaveConfiguration()
640{
641#ifndef KIO_ANDROID_STUB
642 // Do it immediately in this process, otherwise we might send a request before reparsing
643 // (e.g. when changing useragent in the plugin)
644 schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage());
645#endif
646
647 schedulerPrivate()->m_ignoreConfigReparse = true;
648 Q_EMIT self()->reparseSlaveConfiguration(QString());
649}
650
651#ifndef KIO_ANDROID_STUB
652void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &)
653{
654 if (m_ignoreConfigReparse) {
655 // qDebug() << "Ignoring signal sent by myself";
656 m_ignoreConfigReparse = false;
657 return;
658 }
659
660 // qDebug() << "proto=" << proto;
661 KProtocolManager::reparseConfiguration();
662 WorkerConfig::self()->reset();
663 sessionData.reset();
664
665 QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(key: proto);
666 QHash<QString, ProtoQueue *>::ConstIterator endIt = m_protocols.constEnd();
667
668 // not found?
669 if (it == endIt) {
670 return;
671 }
672
673 if (!proto.isEmpty()) {
674 endIt = it;
675 ++endIt;
676 }
677
678 for (; it != endIt; ++it) {
679 const QList<KIO::Worker *> list = it.value()->allWorkers();
680 for (Worker *worker : list) {
681 worker->send(cmd: CMD_REPARSECONFIGURATION);
682 worker->resetHost();
683 }
684 }
685}
686#endif
687
688void SchedulerPrivate::doJob(SimpleJob *job)
689{
690 // qDebug() << job;
691 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
692 jobPriv->m_proxyList.clear();
693 jobPriv->m_protocol = KProtocolManagerPrivate::workerProtocol(url: job->url(), proxy&: jobPriv->m_proxyList);
694
695 ProtoQueue *proto = protoQ(protocol: jobPriv->m_protocol, host: job->url().host());
696 proto->queueJob(job);
697}
698
699void SchedulerPrivate::setJobPriority(SimpleJob *job, int priority)
700{
701 // qDebug() << job << priority;
702 const QString protocol = SimpleJobPrivate::get(job)->m_protocol;
703 if (!protocol.isEmpty()) {
704 ProtoQueue *proto = protoQ(protocol, host: job->url().host());
705 proto->changeJobPriority(job, newPrio: priority);
706 }
707}
708
709void SchedulerPrivate::cancelJob(SimpleJob *job)
710{
711 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
712 // this method is called all over the place in job.cpp, so just do this check here to avoid
713 // much boilerplate in job code.
714 if (jobPriv->m_schedSerial == 0) {
715 // qDebug() << "Doing nothing because I don't know job" << job;
716 return;
717 }
718 Worker *worker = jobSWorker(job);
719 // qDebug() << job << worker;
720 jobFinished(job, worker);
721 if (worker) {
722 ProtoQueue *pq = m_protocols.value(key: jobPriv->m_protocol);
723 if (pq) {
724 pq->removeWorker(worker);
725 }
726 worker->kill(); // don't use worker after this!
727 }
728}
729
730void SchedulerPrivate::jobFinished(SimpleJob *job, Worker *worker)
731{
732 // qDebug() << job << worker;
733 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
734
735 // make sure that we knew about the job!
736 Q_ASSERT(jobPriv->m_schedSerial);
737
738 ProtoQueue *pq = m_protocols.value(key: jobPriv->m_protocol);
739 if (pq) {
740 pq->removeJob(job);
741 }
742
743 if (worker) {
744 // If we have internal meta-data, tell existing KIO workers to reload
745 // their configuration.
746 if (jobPriv->m_internalMetaData.count()) {
747 // qDebug() << "Updating KIO workers with new internal metadata information";
748 ProtoQueue *queue = m_protocols.value(key: worker->protocol());
749 if (queue) {
750 const QList<Worker *> workers = queue->allWorkers();
751 for (auto *runningWorker : workers) {
752 if (worker->host() == runningWorker->host()) {
753 worker->setConfig(metaDataFor(protocol: worker->protocol(), proxyList: jobPriv->m_proxyList, url: job->url()));
754 /*qDebug() << "Updated configuration of" << worker->protocol()
755 << "KIO worker, pid=" << worker->worker_pid();*/
756 }
757 }
758 }
759 }
760 worker->setJob(nullptr);
761 worker->disconnect(receiver: job);
762 }
763 jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again
764 jobPriv->m_worker = nullptr;
765 // Clear the values in the internal metadata container since they have
766 // already been taken care of above...
767 jobPriv->m_internalMetaData.clear();
768}
769
770MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QStringList &proxyList, const QUrl &url)
771{
772 const QString host = url.host();
773 MetaData configData = WorkerConfig::self()->configData(protocol, host);
774 sessionData.configDataFor(configData, proto: protocol, host);
775 if (proxyList.isEmpty()) {
776 configData.remove(QStringLiteral("UseProxy"));
777 configData.remove(QStringLiteral("ProxyUrls"));
778 } else {
779 configData[QStringLiteral("UseProxy")] = proxyList.first();
780 configData[QStringLiteral("ProxyUrls")] = proxyList.join(sep: QLatin1Char(','));
781 }
782
783 return configData;
784}
785
786void SchedulerPrivate::setupWorker(KIO::Worker *worker,
787 const QUrl &url,
788 const QString &protocol,
789 const QStringList &proxyList,
790 bool newWorker,
791 const KIO::MetaData *config)
792{
793 int port = url.port();
794 if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that.
795 port = 0;
796 }
797 const QString host = url.host();
798 const QString user = url.userName();
799 const QString passwd = url.password();
800
801 if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) {
802 MetaData configData = metaDataFor(protocol, proxyList, url);
803 if (config) {
804 configData += *config;
805 }
806
807 worker->setConfig(configData);
808 worker->setProtocol(url.scheme());
809 worker->setHost(host, port, user, passwd);
810 }
811}
812
813void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker)
814{
815 // qDebug() << worker;
816 Q_ASSERT(worker);
817 Q_ASSERT(!worker->isAlive());
818 ProtoQueue *pq = m_protocols.value(key: worker->protocol());
819 if (pq) {
820 if (worker->job()) {
821 pq->removeJob(job: worker->job());
822 }
823 // in case this was a connected worker...
824 pq->removeWorker(worker);
825 }
826 if (worker == m_workerOnHold) {
827 m_workerOnHold = nullptr;
828 m_urlOnHold.clear();
829 }
830 // can't use worker->deref() here because we need to use deleteLater
831 worker->aboutToDelete();
832 worker->deleteLater();
833}
834
835void SchedulerPrivate::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
836{
837 Worker *worker = jobSWorker(job);
838 // qDebug() << job << url << worker;
839 worker->disconnect(receiver: job);
840 // prevent the fake death of the worker from trying to kill the job again;
841 // cf. Worker::hold(const QUrl &url) called in SchedulerPrivate::publishWorkerOnHold().
842 worker->setJob(nullptr);
843 SimpleJobPrivate::get(job)->m_worker = nullptr;
844
845 if (m_workerOnHold) {
846 m_workerOnHold->kill();
847 }
848 m_workerOnHold = worker;
849 m_urlOnHold = url;
850 m_workerOnHold->suspend();
851}
852
853bool SchedulerPrivate::isWorkerOnHoldFor(const QUrl &url)
854{
855 if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) {
856 return true;
857 }
858
859 return false;
860}
861
862Worker *SchedulerPrivate::heldWorkerForJob(SimpleJob *job)
863{
864 Worker *worker = nullptr;
865 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
866
867 if (m_workerOnHold) {
868 // Make sure that the job wants to do a GET or a POST, and with no offset
869 const int cmd = jobPriv->m_command;
870 bool canJobReuse = (cmd == CMD_GET);
871
872 if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(object: job)) {
873 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
874 if (canJobReuse) {
875 KIO::MetaData outgoing = tJob->outgoingMetaData();
876 const QString resume = outgoing.value(QStringLiteral("resume"));
877 const QString rangeStart = outgoing.value(QStringLiteral("range-start"));
878 // qDebug() << "Resume metadata is" << resume;
879 canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0'));
880 }
881 }
882
883 if (job->url() == m_urlOnHold) {
884 if (canJobReuse) {
885 // qDebug() << "HOLD: Reusing held worker (" << m_workerOnHold << ")";
886 worker = m_workerOnHold;
887 } else {
888 // qDebug() << "HOLD: Discarding held worker (" << m_workerOnHold << ")";
889 m_workerOnHold->kill();
890 }
891 m_workerOnHold = nullptr;
892 m_urlOnHold.clear();
893 }
894 }
895
896 return worker;
897}
898
899void SchedulerPrivate::removeWorkerOnHold()
900{
901 // qDebug() << m_workerOnHold;
902 if (m_workerOnHold) {
903 m_workerOnHold->kill();
904 }
905 m_workerOnHold = nullptr;
906 m_urlOnHold.clear();
907}
908
909ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host)
910{
911 ProtoQueue *pq = m_protocols.value(key: protocol, defaultValue: nullptr);
912 if (!pq) {
913 // qDebug() << "creating ProtoQueue instance for" << protocol;
914
915 const int maxWorkers = KProtocolInfo::maxWorkers(protocol);
916 int maxWorkersPerHost = -1;
917 if (!host.isEmpty()) {
918 bool ok = false;
919 const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections")).toInt(ok: &ok);
920 if (ok) {
921 maxWorkersPerHost = value;
922 }
923 }
924 if (maxWorkersPerHost == -1) {
925 maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol);
926 }
927 // Never allow maxWorkersPerHost to exceed maxWorkers.
928 pq = new ProtoQueue(maxWorkers, qMin(a: maxWorkers, b: maxWorkersPerHost));
929 m_protocols.insert(key: protocol, value: pq);
930 }
931 return pq;
932}
933
934void SchedulerPrivate::updateInternalMetaData(SimpleJob *job)
935{
936 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
937 // Preserve all internal meta-data so they can be sent back to the
938 // KIO workers as needed...
939 const QUrl jobUrl = job->url();
940
941 const QLatin1String currHostToken("{internal~currenthost}");
942 const QLatin1String allHostsToken("{internal~allhosts}");
943 // qDebug() << job << jobPriv->m_internalMetaData;
944 QMapIterator<QString, QString> it(jobPriv->m_internalMetaData);
945 while (it.hasNext()) {
946 it.next();
947 if (it.key().startsWith(s: currHostToken, cs: Qt::CaseInsensitive)) {
948 WorkerConfig::self()->setConfigData(protocol: jobUrl.scheme(), host: jobUrl.host(), key: it.key().mid(position: currHostToken.size()), value: it.value());
949 } else if (it.key().startsWith(s: allHostsToken, cs: Qt::CaseInsensitive)) {
950 WorkerConfig::self()->setConfigData(protocol: jobUrl.scheme(), host: QString(), key: it.key().mid(position: allHostsToken.size()), value: it.value());
951 }
952 }
953}
954
955#include "moc_scheduler.cpp"
956#include "moc_scheduler_p.cpp"
957

source code of kio/src/core/scheduler.cpp