1/*
2 This file is part of the KDE libraries
3 SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org>
4 SPDX-FileCopyrightText: 2000 Waldo Bastian <bastian@kde.org>
5 SPDX-FileCopyrightText: 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com>
6
7 SPDX-License-Identifier: LGPL-2.0-only
8*/
9
10#include "scheduler.h"
11#include "scheduler_p.h"
12
13#include "job_p.h"
14#include "worker_p.h"
15#include "workerconfig.h"
16
17#include <kprotocolinfo.h>
18#include <kprotocolmanager.h>
19
20#ifdef WITH_QTDBUS
21#include <QDBusConnection>
22#include <QDBusMessage>
23#endif
24#include <QHash>
25#include <QThread>
26#include <QThreadStorage>
27
28// Workers may be idle for a certain time (3 minutes) before they are killed.
29static const int s_idleWorkerLifetime = 3 * 60;
30
31using namespace KIO;
32
33static inline Worker *jobSWorker(SimpleJob *job)
34{
35 return SimpleJobPrivate::get(job)->m_worker;
36}
37
38static inline void startJob(SimpleJob *job, Worker *worker)
39{
40 SimpleJobPrivate::get(job)->start(worker);
41}
42
43class KIO::SchedulerPrivate
44{
45public:
46 SchedulerPrivate()
47 : q(new Scheduler())
48 {
49 }
50
51 ~SchedulerPrivate()
52 {
53 removeWorkerOnHold();
54 delete q;
55 q = nullptr;
56 qDeleteAll(c: m_protocols); // ~ProtoQueue will kill and delete all workers
57 }
58
59 SchedulerPrivate(const SchedulerPrivate &) = delete;
60 SchedulerPrivate &operator=(const SchedulerPrivate &) = delete;
61
62 Scheduler *q;
63
64 Worker *m_workerOnHold = nullptr;
65 QUrl m_urlOnHold;
66 bool m_ignoreConfigReparse = false;
67
68 void doJob(SimpleJob *job);
69 void cancelJob(SimpleJob *job);
70 void jobFinished(KIO::SimpleJob *job, KIO::Worker *worker);
71 void putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url);
72 void removeWorkerOnHold();
73 Worker *heldWorkerForJob(KIO::SimpleJob *job);
74 bool isWorkerOnHoldFor(const QUrl &url);
75 void updateInternalMetaData(SimpleJob *job);
76
77 MetaData metaDataFor(const QString &protocol, const QUrl &url);
78 void setupWorker(KIO::Worker *worker, const QUrl &url, const QString &protocol, bool newWorker, const KIO::MetaData *config = nullptr);
79
80 void slotWorkerDied(KIO::Worker *worker);
81
82#ifdef WITH_QTDBUS
83 void slotReparseSlaveConfiguration(const QString &, const QDBusMessage &);
84#endif
85
86 ProtoQueue *protoQ(const QString &protocol, const QString &host);
87
88private:
89 QHash<QString, ProtoQueue *> m_protocols;
90};
91
92static QThreadStorage<SchedulerPrivate *> s_storage;
93static SchedulerPrivate *schedulerPrivate()
94{
95 if (!s_storage.hasLocalData()) {
96 s_storage.setLocalData(new SchedulerPrivate);
97 }
98 return s_storage.localData();
99}
100
101Scheduler *Scheduler::self()
102{
103 return schedulerPrivate()->q;
104}
105
106SchedulerPrivate *Scheduler::d_func()
107{
108 return schedulerPrivate();
109}
110
111// static
112Scheduler *scheduler()
113{
114 return schedulerPrivate()->q;
115}
116
117////////////////////////////
118
119WorkerManager::WorkerManager()
120{
121 m_grimTimer.setSingleShot(true);
122 connect(sender: &m_grimTimer, signal: &QTimer::timeout, context: this, slot: &WorkerManager::grimReaper);
123}
124
125WorkerManager::~WorkerManager()
126{
127 grimReaper();
128}
129
130void WorkerManager::returnWorker(Worker *worker)
131{
132 Q_ASSERT(worker);
133 worker->setIdle();
134 m_idleWorkers.insert(key: worker->host(), value: worker);
135 scheduleGrimReaper();
136}
137
138Worker *WorkerManager::takeWorkerForJob(SimpleJob *job)
139{
140 Worker *worker = schedulerPrivate()->heldWorkerForJob(job);
141 if (worker) {
142 return worker;
143 }
144
145 QUrl url = SimpleJobPrivate::get(job)->m_url;
146 // TODO take port, username and password into account
147 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.find(key: url.host());
148 if (it == m_idleWorkers.end()) {
149 it = m_idleWorkers.begin();
150 }
151 if (it == m_idleWorkers.end()) {
152 return nullptr;
153 }
154 worker = it.value();
155 m_idleWorkers.erase(it);
156 return worker;
157}
158
159bool WorkerManager::removeWorker(Worker *worker)
160{
161 // ### performance not so great
162 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin();
163 for (; it != m_idleWorkers.end(); ++it) {
164 if (it.value() == worker) {
165 m_idleWorkers.erase(it);
166 return true;
167 }
168 }
169 return false;
170}
171
172void WorkerManager::clear()
173{
174 m_idleWorkers.clear();
175}
176
177QList<Worker *> WorkerManager::allWorkers() const
178{
179 return m_idleWorkers.values();
180}
181
182void WorkerManager::scheduleGrimReaper()
183{
184 if (!m_grimTimer.isActive()) {
185 m_grimTimer.start(msec: (s_idleWorkerLifetime / 2) * 1000);
186 }
187}
188
189// private slot
190void WorkerManager::grimReaper()
191{
192 QMultiHash<QString, Worker *>::Iterator it = m_idleWorkers.begin();
193 while (it != m_idleWorkers.end()) {
194 Worker *worker = it.value();
195 if (worker->idleTime() >= s_idleWorkerLifetime) {
196 it = m_idleWorkers.erase(it);
197 if (worker->job()) {
198 // qDebug() << "Idle worker" << worker << "still has job" << worker->job();
199 }
200 // avoid invoking slotWorkerDied() because its cleanup services are not needed
201 worker->kill();
202 } else {
203 ++it;
204 }
205 }
206 if (!m_idleWorkers.isEmpty()) {
207 scheduleGrimReaper();
208 }
209}
210
211int HostQueue::lowestSerial() const
212{
213 QMap<int, SimpleJob *>::ConstIterator first = m_queuedJobs.constBegin();
214 if (first != m_queuedJobs.constEnd()) {
215 return first.key();
216 }
217 return SerialPicker::maxSerial;
218}
219
220void HostQueue::queueJob(SimpleJob *job)
221{
222 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
223 Q_ASSERT(serial != 0);
224 Q_ASSERT(!m_queuedJobs.contains(serial));
225 Q_ASSERT(!m_runningJobs.contains(job));
226 m_queuedJobs.insert(key: serial, value: job);
227}
228
229SimpleJob *HostQueue::takeFirstInQueue()
230{
231 Q_ASSERT(!m_queuedJobs.isEmpty());
232 QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin();
233 SimpleJob *job = first.value();
234 m_queuedJobs.erase(it: first);
235 m_runningJobs.insert(value: job);
236 return job;
237}
238
239bool HostQueue::removeJob(SimpleJob *job)
240{
241 const int serial = SimpleJobPrivate::get(job)->m_schedSerial;
242 if (m_runningJobs.remove(value: job)) {
243 Q_ASSERT(!m_queuedJobs.contains(serial));
244 return true;
245 }
246 if (m_queuedJobs.remove(key: serial)) {
247 return true;
248 }
249 return false;
250}
251
252QList<Worker *> HostQueue::allWorkers() const
253{
254 QList<Worker *> ret;
255 ret.reserve(asize: m_runningJobs.size());
256 for (SimpleJob *job : m_runningJobs) {
257 Worker *worker = jobSWorker(job);
258 Q_ASSERT(worker);
259 ret.append(t: worker);
260 }
261 return ret;
262}
263
264static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial)
265{
266 Q_UNUSED(queuesBySerial);
267#ifdef SCHEDULER_DEBUG
268 // a host queue may *never* be in queuesBySerial twice.
269 QSet<HostQueue *> seen;
270 auto it = queuesBySerial->cbegin();
271 for (; it != queuesBySerial->cend(); ++it) {
272 Q_ASSERT(!seen.contains(it.value()));
273 seen.insert(it.value());
274 }
275#endif
276}
277
278static void verifyRunningJobsCount(const std::unordered_map<QString, HostQueue> *const queues, int runningJobsCount)
279{
280 Q_UNUSED(queues);
281 Q_UNUSED(runningJobsCount);
282#ifdef SCHEDULER_DEBUG
283 int realRunningJobsCount = 0;
284 auto it = queues->cbegin();
285 for (; it != queues->cend(); ++it) {
286 realRunningJobsCount += it->second.runningJobsCount();
287 }
288 Q_ASSERT(realRunningJobsCount == runningJobsCount);
289
290 // ...and of course we may never run the same job twice!
291 QSet<SimpleJob *> seenJobs;
292 auto it2 = queues->cbegin();
293 for (; it2 != queues->cend(); ++it2) {
294 for (SimpleJob *job : it2->second.runningJobs()) {
295 Q_ASSERT(!seenJobs.contains(job));
296 seenJobs.insert(job);
297 }
298 }
299#endif
300}
301
302ProtoQueue::ProtoQueue(int maxWorkers, int maxWorkersPerHost)
303 : m_maxConnectionsPerHost(maxWorkersPerHost ? maxWorkersPerHost : maxWorkers)
304 , m_maxConnectionsTotal(qMax(a: maxWorkers, b: maxWorkersPerHost))
305 , m_runningJobsCount(0)
306
307{
308 /*qDebug() << "m_maxConnectionsTotal:" << m_maxConnectionsTotal
309 << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;*/
310 Q_ASSERT(m_maxConnectionsPerHost >= 1);
311 Q_ASSERT(maxWorkers >= maxWorkersPerHost);
312 m_startJobTimer.setSingleShot(true);
313 connect(sender: &m_startJobTimer, signal: &QTimer::timeout, context: this, slot: &ProtoQueue::startAJob);
314}
315
316ProtoQueue::~ProtoQueue()
317{
318 // Gather list of all workers first
319 const QList<Worker *> workers = allWorkers();
320 // Clear the idle workers in the manager to avoid dangling pointers
321 m_workerManager.clear();
322 for (Worker *worker : workers) {
323 // kill the worker process and remove the interface in our process
324 worker->kill();
325 }
326}
327
328void ProtoQueue::queueJob(SimpleJob *job)
329{
330 QString hostname = SimpleJobPrivate::get(job)->m_url.host();
331 HostQueue &hq = m_queuesByHostname[hostname];
332 const int prevLowestSerial = hq.lowestSerial();
333 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
334
335 // nevert insert a job twice
336 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0);
337 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next();
338
339 const bool wasQueueEmpty = hq.isQueueEmpty();
340 hq.queueJob(job);
341 // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too...
342 // the queue's lowest serial job may have changed, so update the ordered list of queues.
343 // however, we ignore all jobs that would cause more connections to a host than allowed.
344 if (prevLowestSerial != hq.lowestSerial()) {
345 if (hq.runningJobsCount() < m_maxConnectionsPerHost) {
346 // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs
347 if (m_queuesBySerial.remove(key: prevLowestSerial) == 0) {
348 Q_UNUSED(wasQueueEmpty);
349 Q_ASSERT(wasQueueEmpty);
350 }
351 m_queuesBySerial.insert(key: hq.lowestSerial(), value: &hq);
352 } else {
353#ifdef SCHEDULER_DEBUG
354 // ### this assertion may fail if the limits were modified at runtime!
355 // if the per-host connection limit is already reached the host queue's lowest serial
356 // should not be queued.
357 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
358#endif
359 }
360 }
361 // just in case; startAJob() will refuse to start a job if it shouldn't.
362 m_startJobTimer.start();
363
364 ensureNoDuplicates(queuesBySerial: &m_queuesBySerial);
365}
366
367void ProtoQueue::removeJob(SimpleJob *job)
368{
369 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job);
370 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()];
371 const int prevLowestSerial = hq.lowestSerial();
372 const int prevRunningJobs = hq.runningJobsCount();
373
374 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost);
375
376 if (hq.removeJob(job)) {
377 if (hq.lowestSerial() != prevLowestSerial) {
378 // we have dequeued the not yet running job with the lowest serial
379 Q_ASSERT(!jobPriv->m_worker);
380 Q_ASSERT(prevRunningJobs == hq.runningJobsCount());
381 if (m_queuesBySerial.remove(key: prevLowestSerial) == 0) {
382 // make sure that the queue was not scheduled for a good reason
383 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost);
384 }
385 } else {
386 if (prevRunningJobs != hq.runningJobsCount()) {
387 // we have dequeued a previously running job
388 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount());
389 m_runningJobsCount--;
390 Q_ASSERT(m_runningJobsCount >= 0);
391 }
392 }
393 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) {
394 // this may be a no-op, but it's faster than first checking if it's already in.
395 m_queuesBySerial.insert(key: hq.lowestSerial(), value: &hq);
396 }
397
398 if (hq.isEmpty()) {
399 // no queued jobs, no running jobs. this destroys hq from above.
400 m_queuesByHostname.erase(x: jobPriv->m_url.host());
401 }
402
403 if (jobPriv->m_worker && jobPriv->m_worker->isAlive()) {
404 m_workerManager.returnWorker(worker: jobPriv->m_worker);
405 }
406 // just in case; startAJob() will refuse to start a job if it shouldn't.
407 m_startJobTimer.start();
408 }
409
410 ensureNoDuplicates(queuesBySerial: &m_queuesBySerial);
411}
412
413Worker *ProtoQueue::createWorker(const QString &protocol, SimpleJob *job, const QUrl &url)
414{
415 int error;
416 QString errortext;
417 Worker *worker = Worker::createWorker(protocol, url, error, error_text&: errortext);
418 if (worker) {
419 connect(sender: worker, signal: &Worker::workerDied, context: scheduler(), slot: [](KIO::Worker *worker) {
420 schedulerPrivate()->slotWorkerDied(worker);
421 });
422 } else {
423 qCWarning(KIO_CORE) << "couldn't create worker:" << errortext;
424 if (job) {
425 job->slotError(error, errortext);
426 }
427 }
428 return worker;
429}
430
431bool ProtoQueue::removeWorker(KIO::Worker *worker)
432{
433 const bool removed = m_workerManager.removeWorker(worker);
434 return removed;
435}
436
437QList<Worker *> ProtoQueue::allWorkers() const
438{
439 QList<Worker *> ret(m_workerManager.allWorkers());
440 auto it = m_queuesByHostname.cbegin();
441 for (; it != m_queuesByHostname.cend(); ++it) {
442 ret.append(other: it->second.allWorkers());
443 }
444
445 return ret;
446}
447
448// private slot
449void ProtoQueue::startAJob()
450{
451 ensureNoDuplicates(queuesBySerial: &m_queuesBySerial);
452 verifyRunningJobsCount(queues: &m_queuesByHostname, runningJobsCount: m_runningJobsCount);
453
454#ifdef SCHEDULER_DEBUG
455 // qDebug() << "m_runningJobsCount:" << m_runningJobsCount;
456 auto it = m_queuesByHostname.cbegin();
457 for (; it != m_queuesByHostname.cend(); ++it) {
458 const QList<KIO::SimpleJob *> list = it->second.runningJobs();
459 for (SimpleJob *job : list) {
460 // qDebug() << SimpleJobPrivate::get(job)->m_url;
461 }
462 }
463#endif
464 if (m_runningJobsCount >= m_maxConnectionsTotal) {
465#ifdef SCHEDULER_DEBUG
466 // qDebug() << "not starting any jobs because maxConnectionsTotal has been reached.";
467#endif
468 return;
469 }
470
471 QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin();
472 if (first != m_queuesBySerial.end()) {
473 // pick a job and maintain the queue invariant: lower serials first
474 HostQueue *hq = first.value();
475 const int prevLowestSerial = first.key();
476 Q_UNUSED(prevLowestSerial);
477 Q_ASSERT(hq->lowestSerial() == prevLowestSerial);
478 // the following assertions should hold due to queueJob(), takeFirstInQueue() and
479 // removeJob() being correct
480 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost);
481 SimpleJob *startingJob = hq->takeFirstInQueue();
482 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost);
483 Q_ASSERT(hq->lowestSerial() != prevLowestSerial);
484
485 m_queuesBySerial.erase(it: first);
486 // we've increased hq's runningJobsCount() by calling nexStartingJob()
487 // so we need to check again.
488 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) {
489 m_queuesBySerial.insert(key: hq->lowestSerial(), value: hq);
490 }
491
492 // always increase m_runningJobsCount because it's correct if there is a worker and if there
493 // is no worker, removeJob() will balance the number again. removeJob() would decrease the
494 // number too much otherwise.
495 // Note that createWorker() can call slotError() on a job which in turn calls removeJob(),
496 // so increase the count here already.
497 m_runningJobsCount++;
498
499 bool isNewWorker = false;
500 Worker *worker = m_workerManager.takeWorkerForJob(job: startingJob);
501 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job: startingJob);
502 if (!worker) {
503 isNewWorker = true;
504 worker = createWorker(protocol: jobPriv->m_protocol, job: startingJob, url: jobPriv->m_url);
505 }
506
507 if (worker) {
508 jobPriv->m_worker = worker;
509 schedulerPrivate()->setupWorker(worker, url: jobPriv->m_url, protocol: jobPriv->m_protocol, newWorker: isNewWorker);
510 startJob(job: startingJob, worker);
511 } else {
512 // dispose of our records about the job and mark the job as unknown
513 // (to prevent crashes later)
514 // note that the job's slotError() can have called removeJob() first, so check that
515 // it's not a ghost job with null serial already.
516 if (jobPriv->m_schedSerial) {
517 removeJob(job: startingJob);
518 jobPriv->m_schedSerial = 0;
519 }
520 }
521 } else {
522#ifdef SCHEDULER_DEBUG
523 // qDebug() << "not starting any jobs because there is no queued job.";
524#endif
525 }
526
527 if (!m_queuesBySerial.isEmpty()) {
528 m_startJobTimer.start();
529 }
530}
531
532Scheduler::Scheduler()
533{
534 setObjectName(QStringLiteral("scheduler"));
535
536#ifdef WITH_QTDBUS
537 const QString dbusPath = QStringLiteral("/KIO/Scheduler");
538 const QString dbusInterface = QStringLiteral("org.kde.KIO.Scheduler");
539 QDBusConnection dbus = QDBusConnection::sessionBus();
540 // Not needed, right? We just want to emit two signals.
541 // dbus.registerObject(dbusPath, this, QDBusConnection::ExportScriptableSlots |
542 // QDBusConnection::ExportScriptableSignals);
543 dbus.connect(service: QString(),
544 path: dbusPath,
545 interface: dbusInterface,
546 QStringLiteral("reparseSlaveConfiguration"),
547 receiver: this,
548 SLOT(slotReparseSlaveConfiguration(QString, QDBusMessage)));
549#endif
550}
551
552Scheduler::~Scheduler()
553{
554}
555
556void Scheduler::doJob(SimpleJob *job)
557{
558 schedulerPrivate()->doJob(job);
559}
560
561// static
562void Scheduler::cancelJob(SimpleJob *job)
563{
564 schedulerPrivate()->cancelJob(job);
565}
566
567void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Worker *worker)
568{
569 schedulerPrivate()->jobFinished(job, worker);
570}
571
572void Scheduler::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
573{
574 schedulerPrivate()->putWorkerOnHold(job, url);
575}
576
577void Scheduler::removeWorkerOnHold()
578{
579 schedulerPrivate()->removeWorkerOnHold();
580}
581
582bool Scheduler::isWorkerOnHoldFor(const QUrl &url)
583{
584 return schedulerPrivate()->isWorkerOnHoldFor(url);
585}
586
587void Scheduler::updateInternalMetaData(SimpleJob *job)
588{
589 schedulerPrivate()->updateInternalMetaData(job);
590}
591
592void Scheduler::emitReparseSlaveConfiguration()
593{
594#ifdef WITH_QTDBUS
595 // Do it immediately in this process, otherwise we might send a request before reparsing
596 // (e.g. when changing useragent in the plugin)
597 schedulerPrivate()->slotReparseSlaveConfiguration(QString(), QDBusMessage());
598#endif
599
600 schedulerPrivate()->m_ignoreConfigReparse = true;
601 Q_EMIT self()->reparseSlaveConfiguration(QString());
602}
603
604#ifdef WITH_QTDBUS
605void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage &)
606{
607 if (m_ignoreConfigReparse) {
608 // qDebug() << "Ignoring signal sent by myself";
609 m_ignoreConfigReparse = false;
610 return;
611 }
612
613 // qDebug() << "proto=" << proto;
614 KProtocolManager::reparseConfiguration();
615 WorkerConfig::self()->reset();
616
617 QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : m_protocols.constFind(key: proto);
618 QHash<QString, ProtoQueue *>::ConstIterator endIt = m_protocols.constEnd();
619
620 // not found?
621 if (it == endIt) {
622 return;
623 }
624
625 if (!proto.isEmpty()) {
626 endIt = it;
627 ++endIt;
628 }
629
630 for (; it != endIt; ++it) {
631 const QList<KIO::Worker *> list = it.value()->allWorkers();
632 for (Worker *worker : list) {
633 worker->send(cmd: CMD_REPARSECONFIGURATION);
634 worker->resetHost();
635 }
636 }
637}
638#endif
639
640void SchedulerPrivate::doJob(SimpleJob *job)
641{
642 // qDebug() << job;
643 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
644 jobPriv->m_protocol = job->url().scheme();
645
646 ProtoQueue *proto = protoQ(protocol: jobPriv->m_protocol, host: job->url().host());
647 proto->queueJob(job);
648}
649
650void SchedulerPrivate::cancelJob(SimpleJob *job)
651{
652 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
653 // this method is called all over the place in job.cpp, so just do this check here to avoid
654 // much boilerplate in job code.
655 if (jobPriv->m_schedSerial == 0) {
656 // qDebug() << "Doing nothing because I don't know job" << job;
657 return;
658 }
659 Worker *worker = jobSWorker(job);
660 // qDebug() << job << worker;
661 jobFinished(job, worker);
662 if (worker) {
663 ProtoQueue *pq = m_protocols.value(key: jobPriv->m_protocol);
664 if (pq) {
665 pq->removeWorker(worker);
666 }
667 worker->kill(); // don't use worker after this!
668 }
669}
670
671void SchedulerPrivate::jobFinished(SimpleJob *job, Worker *worker)
672{
673 // qDebug() << job << worker;
674 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
675
676 // make sure that we knew about the job!
677 Q_ASSERT(jobPriv->m_schedSerial);
678
679 ProtoQueue *pq = m_protocols.value(key: jobPriv->m_protocol);
680 if (pq) {
681 pq->removeJob(job);
682 }
683
684 if (worker) {
685 // If we have internal meta-data, tell existing KIO workers to reload
686 // their configuration.
687 if (jobPriv->m_internalMetaData.count()) {
688 // qDebug() << "Updating KIO workers with new internal metadata information";
689 ProtoQueue *queue = m_protocols.value(key: worker->protocol());
690 if (queue) {
691 const QList<Worker *> workers = queue->allWorkers();
692 for (auto *runningWorker : workers) {
693 if (worker->host() == runningWorker->host()) {
694 worker->setConfig(metaDataFor(protocol: worker->protocol(), url: job->url()));
695 /*qDebug() << "Updated configuration of" << worker->protocol()
696 << "KIO worker, pid=" << worker->worker_pid();*/
697 }
698 }
699 }
700 }
701 worker->setJob(nullptr);
702 worker->disconnect(receiver: job);
703 }
704 jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again
705 jobPriv->m_worker = nullptr;
706 // Clear the values in the internal metadata container since they have
707 // already been taken care of above...
708 jobPriv->m_internalMetaData.clear();
709}
710
711MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QUrl &url)
712{
713 const QString host = url.host();
714 MetaData configData = WorkerConfig::self()->configData(protocol, host);
715
716 return configData;
717}
718
719void SchedulerPrivate::setupWorker(KIO::Worker *worker, const QUrl &url, const QString &protocol, bool newWorker, const KIO::MetaData *config)
720{
721 int port = url.port();
722 if (port == -1) { // no port is -1 in QUrl, but in kde3 we used 0 and the KIO workers assume that.
723 port = 0;
724 }
725 const QString host = url.host();
726 const QString user = url.userName();
727 const QString passwd = url.password();
728
729 if (newWorker || worker->host() != host || worker->port() != port || worker->user() != user || worker->passwd() != passwd) {
730 MetaData configData = metaDataFor(protocol, url);
731 if (config) {
732 configData += *config;
733 }
734
735 worker->setConfig(configData);
736 worker->setProtocol(url.scheme());
737 worker->setHost(host, port, user, passwd);
738 }
739}
740
741void SchedulerPrivate::slotWorkerDied(KIO::Worker *worker)
742{
743 // qDebug() << worker;
744 Q_ASSERT(worker);
745 Q_ASSERT(!worker->isAlive());
746 ProtoQueue *pq = m_protocols.value(key: worker->protocol());
747 if (pq) {
748 if (worker->job()) {
749 pq->removeJob(job: worker->job());
750 }
751 // in case this was a connected worker...
752 pq->removeWorker(worker);
753 }
754 if (worker == m_workerOnHold) {
755 m_workerOnHold = nullptr;
756 m_urlOnHold.clear();
757 }
758 // can't use worker->deref() here because we need to use deleteLater
759 worker->aboutToDelete();
760 worker->deleteLater();
761}
762
763void SchedulerPrivate::putWorkerOnHold(KIO::SimpleJob *job, const QUrl &url)
764{
765 Worker *worker = jobSWorker(job);
766 // qDebug() << job << url << worker;
767 worker->disconnect(receiver: job);
768 // prevent the fake death of the worker from trying to kill the job again;
769 // cf. Worker::hold(const QUrl &url) called in SchedulerPrivate::publishWorkerOnHold().
770 worker->setJob(nullptr);
771 SimpleJobPrivate::get(job)->m_worker = nullptr;
772
773 if (m_workerOnHold) {
774 m_workerOnHold->kill();
775 }
776 m_workerOnHold = worker;
777 m_urlOnHold = url;
778 m_workerOnHold->suspend();
779}
780
781bool SchedulerPrivate::isWorkerOnHoldFor(const QUrl &url)
782{
783 if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) {
784 return true;
785 }
786
787 return false;
788}
789
790Worker *SchedulerPrivate::heldWorkerForJob(SimpleJob *job)
791{
792 Worker *worker = nullptr;
793 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
794
795 if (m_workerOnHold) {
796 // Make sure that the job wants to do a GET or a POST, and with no offset
797 const int cmd = jobPriv->m_command;
798 bool canJobReuse = (cmd == CMD_GET);
799
800 if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(object: job)) {
801 canJobReuse = (canJobReuse || cmd == CMD_SPECIAL);
802 if (canJobReuse) {
803 KIO::MetaData outgoing = tJob->outgoingMetaData();
804 const QString resume = outgoing.value(QStringLiteral("resume"));
805 const QString rangeStart = outgoing.value(QStringLiteral("range-start"));
806 // qDebug() << "Resume metadata is" << resume;
807 canJobReuse = (resume.isEmpty() || resume == QLatin1Char('0')) && (rangeStart.isEmpty() || rangeStart == QLatin1Char('0'));
808 }
809 }
810
811 if (job->url() == m_urlOnHold) {
812 if (canJobReuse) {
813 // qDebug() << "HOLD: Reusing held worker (" << m_workerOnHold << ")";
814 worker = m_workerOnHold;
815 } else {
816 // qDebug() << "HOLD: Discarding held worker (" << m_workerOnHold << ")";
817 m_workerOnHold->kill();
818 }
819 m_workerOnHold = nullptr;
820 m_urlOnHold.clear();
821 }
822 }
823
824 return worker;
825}
826
827void SchedulerPrivate::removeWorkerOnHold()
828{
829 // qDebug() << m_workerOnHold;
830 if (m_workerOnHold) {
831 m_workerOnHold->kill();
832 }
833 m_workerOnHold = nullptr;
834 m_urlOnHold.clear();
835}
836
837ProtoQueue *SchedulerPrivate::protoQ(const QString &protocol, const QString &host)
838{
839 ProtoQueue *pq = m_protocols.value(key: protocol, defaultValue: nullptr);
840 if (!pq) {
841 // qDebug() << "creating ProtoQueue instance for" << protocol;
842
843 const int maxWorkers = KProtocolInfo::maxWorkers(protocol);
844 int maxWorkersPerHost = -1;
845 if (!host.isEmpty()) {
846 bool ok = false;
847 const int value = WorkerConfig::self()->configData(protocol, host, QStringLiteral("MaxConnections")).toInt(ok: &ok);
848 if (ok) {
849 maxWorkersPerHost = value;
850 }
851 }
852 if (maxWorkersPerHost == -1) {
853 maxWorkersPerHost = KProtocolInfo::maxWorkersPerHost(protocol);
854 }
855 // Never allow maxWorkersPerHost to exceed maxWorkers.
856 pq = new ProtoQueue(maxWorkers, qMin(a: maxWorkers, b: maxWorkersPerHost));
857 m_protocols.insert(key: protocol, value: pq);
858 }
859 return pq;
860}
861
862void SchedulerPrivate::updateInternalMetaData(SimpleJob *job)
863{
864 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job);
865 // Preserve all internal meta-data so they can be sent back to the
866 // KIO workers as needed...
867 const QUrl jobUrl = job->url();
868
869 const QLatin1String currHostToken("{internal~currenthost}");
870 const QLatin1String allHostsToken("{internal~allhosts}");
871 // qDebug() << job << jobPriv->m_internalMetaData;
872 QMapIterator<QString, QString> it(jobPriv->m_internalMetaData);
873 while (it.hasNext()) {
874 it.next();
875 if (it.key().startsWith(s: currHostToken, cs: Qt::CaseInsensitive)) {
876 WorkerConfig::self()->setConfigData(protocol: jobUrl.scheme(), host: jobUrl.host(), key: it.key().mid(position: currHostToken.size()), value: it.value());
877 } else if (it.key().startsWith(s: allHostsToken, cs: Qt::CaseInsensitive)) {
878 WorkerConfig::self()->setConfigData(protocol: jobUrl.scheme(), host: QString(), key: it.key().mid(position: allHostsToken.size()), value: it.value());
879 }
880 }
881}
882
883#include "moc_scheduler.cpp"
884#include "moc_scheduler_p.cpp"
885

source code of kio/src/core/scheduler.cpp