1 | /* |
2 | This file is part of the KDE libraries |
3 | SPDX-FileCopyrightText: 2000 Stephan Kulow <coolo@kde.org> |
4 | SPDX-FileCopyrightText: 2000 David Faure <faure@kde.org> |
5 | SPDX-FileCopyrightText: 2007 Thiago Macieira <thiago@kde.org> |
6 | SPDX-FileCopyrightText: 2024 Harald Sitter <sitter@kde.org> |
7 | |
8 | SPDX-License-Identifier: LGPL-2.0-or-later |
9 | */ |
10 | |
11 | #include "connection_p.h" |
12 | #include "connectionbackend_p.h" |
13 | #include "kiocoredebug.h" |
14 | #include <QDebug> |
15 | |
16 | #include <cerrno> |
17 | |
18 | using namespace KIO; |
19 | |
20 | void ConnectionPrivate::dequeue() |
21 | { |
22 | if (!backend || suspended) { |
23 | return; |
24 | } |
25 | |
26 | for (const Task &task : std::as_const(t&: outgoingTasks)) { |
27 | q->sendnow(cmd: task.cmd, data: task.data); |
28 | } |
29 | outgoingTasks.clear(); |
30 | |
31 | if (!incomingTasks.isEmpty()) { |
32 | Q_EMIT q->readyRead(); |
33 | } |
34 | } |
35 | |
36 | void ConnectionPrivate::commandReceived(const Task &task) |
37 | { |
38 | // qDebug() << this << "Command" << task.cmd << "added to the queue"; |
39 | if (!suspended && incomingTasks.isEmpty() && readMode == Connection::ReadMode::EventDriven) { |
40 | auto dequeueFunc = [this]() { |
41 | dequeue(); |
42 | }; |
43 | QMetaObject::invokeMethod(object: q, function&: dequeueFunc, type: Qt::QueuedConnection); |
44 | } |
45 | incomingTasks.append(t: task); |
46 | } |
47 | |
48 | void ConnectionPrivate::disconnected() |
49 | { |
50 | q->close(); |
51 | if (readMode == Connection::ReadMode::EventDriven) { |
52 | QMetaObject::invokeMethod(object: q, function: &Connection::readyRead, type: Qt::QueuedConnection); |
53 | } |
54 | } |
55 | |
56 | void ConnectionPrivate::setBackend(ConnectionBackend *b) |
57 | { |
58 | delete backend; |
59 | backend = b; |
60 | if (backend) { |
61 | q->connect(sender: backend, signal: &ConnectionBackend::commandReceived, context: q, slot: [this](const Task &task) { |
62 | commandReceived(task); |
63 | }); |
64 | q->connect(sender: backend, signal: &ConnectionBackend::disconnected, context: q, slot: [this]() { |
65 | disconnected(); |
66 | }); |
67 | backend->setSuspended(suspended); |
68 | } |
69 | } |
70 | |
71 | Connection::Connection(Type type, QObject *parent) |
72 | : QObject(parent) |
73 | , d(new ConnectionPrivate) |
74 | , m_type(type) |
75 | { |
76 | d->q = this; |
77 | } |
78 | |
79 | Connection::~Connection() |
80 | { |
81 | close(); |
82 | } |
83 | |
84 | void Connection::suspend() |
85 | { |
86 | // qDebug() << this << "Suspended"; |
87 | d->suspended = true; |
88 | if (d->backend) { |
89 | d->backend->setSuspended(true); |
90 | } |
91 | } |
92 | |
93 | void Connection::resume() |
94 | { |
95 | // send any outgoing or incoming commands that may be in queue |
96 | if (d->readMode == Connection::ReadMode::EventDriven) { |
97 | auto dequeueFunc = [this]() { |
98 | d->dequeue(); |
99 | }; |
100 | QMetaObject::invokeMethod(object: this, function&: dequeueFunc, type: Qt::QueuedConnection); |
101 | } |
102 | |
103 | // qDebug() << this << "Resumed"; |
104 | d->suspended = false; |
105 | if (d->backend) { |
106 | d->backend->setSuspended(false); |
107 | } |
108 | } |
109 | |
110 | void Connection::close() |
111 | { |
112 | if (d->backend) { |
113 | d->backend->disconnect(receiver: this); |
114 | d->backend->deleteLater(); |
115 | d->backend = nullptr; |
116 | } |
117 | d->outgoingTasks.clear(); |
118 | d->incomingTasks.clear(); |
119 | } |
120 | |
121 | bool Connection::isConnected() const |
122 | { |
123 | return d->backend && d->backend->state == ConnectionBackend::Connected; |
124 | } |
125 | |
126 | bool Connection::inited() const |
127 | { |
128 | return d->backend; |
129 | } |
130 | |
131 | bool Connection::suspended() const |
132 | { |
133 | return d->suspended; |
134 | } |
135 | |
136 | void Connection::connectToRemote(const QUrl &address) |
137 | { |
138 | // qDebug() << "Connection requested to" << address; |
139 | const QString scheme = address.scheme(); |
140 | |
141 | if (scheme == QLatin1String("local" )) { |
142 | d->setBackend(new ConnectionBackend(this)); |
143 | } else { |
144 | qCWarning(KIO_CORE) << "Unknown protocol requested:" << scheme << "(" << address << ")" ; |
145 | Q_ASSERT(0); |
146 | return; |
147 | } |
148 | |
149 | // connection succeeded |
150 | if (!d->backend->connectToRemote(url: address)) { |
151 | // qCWarning(KIO_CORE) << "could not connect to" << address << "using scheme" << scheme; |
152 | delete d->backend; |
153 | d->backend = nullptr; |
154 | return; |
155 | } |
156 | |
157 | d->dequeue(); |
158 | } |
159 | |
160 | QString Connection::errorString() const |
161 | { |
162 | if (d->backend) { |
163 | return d->backend->errorString; |
164 | } |
165 | return QString(); |
166 | } |
167 | |
168 | bool Connection::send(int cmd, const QByteArray &data) |
169 | { |
170 | // Remember that a Connection instance exists in the Application and the Worker. If the application terminates |
171 | // we potentially get disconnected while looping on data to send in the worker, terminate the worker when this |
172 | // happens. Specifically while reading a possible answer from the Application we may get socketDisconnected() |
173 | // we'll never get an answer in that case. |
174 | if (m_type == Type::Worker && !inited()) { |
175 | return false; |
176 | } |
177 | if (!inited() || !d->outgoingTasks.isEmpty()) { |
178 | Task task; |
179 | task.cmd = cmd; |
180 | task.data = data; |
181 | d->outgoingTasks.append(t: std::move(task)); |
182 | return true; |
183 | } else { |
184 | return sendnow(cmd: cmd, data); |
185 | } |
186 | } |
187 | |
188 | bool Connection::sendnow(int cmd, const QByteArray &data) |
189 | { |
190 | if (!d->backend || data.size() > 0xffffff || !isConnected()) { |
191 | return false; |
192 | } |
193 | |
194 | // qDebug() << this << "Sending command" << cmd << "of size" << data.size(); |
195 | return d->backend->sendCommand(command: cmd, data); |
196 | } |
197 | |
198 | bool Connection::hasTaskAvailable() const |
199 | { |
200 | return !d->incomingTasks.isEmpty(); |
201 | } |
202 | |
203 | bool Connection::waitForIncomingTask(int ms) |
204 | { |
205 | if (!isConnected()) { |
206 | return false; |
207 | } |
208 | |
209 | if (d->backend) { |
210 | return d->backend->waitForIncomingTask(ms); |
211 | } |
212 | return false; |
213 | } |
214 | |
215 | int Connection::read(int *_cmd, QByteArray &data) |
216 | { |
217 | // if it's still empty, then it's an error |
218 | if (d->incomingTasks.isEmpty()) { |
219 | // qCWarning(KIO_CORE) << this << "Task list is empty!"; |
220 | return -1; |
221 | } |
222 | const Task &task = d->incomingTasks.constFirst(); |
223 | // qDebug() << this << "Command" << task.cmd << "removed from the queue (size" << task.data.size() << ")"; |
224 | *_cmd = task.cmd; |
225 | data = task.data; |
226 | |
227 | d->incomingTasks.removeFirst(); |
228 | |
229 | // if we didn't empty our reading queue, emit again |
230 | if (!d->suspended && !d->incomingTasks.isEmpty() && d->readMode == Connection::ReadMode::EventDriven) { |
231 | auto dequeueFunc = [this]() { |
232 | d->dequeue(); |
233 | }; |
234 | QMetaObject::invokeMethod(object: this, function&: dequeueFunc, type: Qt::QueuedConnection); |
235 | } |
236 | |
237 | return data.size(); |
238 | } |
239 | |
240 | void Connection::setReadMode(ReadMode readMode) |
241 | { |
242 | d->readMode = readMode; |
243 | } |
244 | |
245 | #include "moc_connection_p.cpp" |
246 | |