1 | // Copyright (C) 2017 The Qt Company Ltd. |
2 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only |
3 | |
4 | #include "qmqttconnection_p.h" |
5 | #include "qmqttconnectionproperties_p.h" |
6 | #include "qmqttcontrolpacket_p.h" |
7 | #include "qmqttmessage_p.h" |
8 | #include "qmqttpublishproperties_p.h" |
9 | #include "qmqttsubscription_p.h" |
10 | #include "qmqttclient_p.h" |
11 | |
12 | #include <QtCore/QLoggingCategory> |
13 | #include <QtNetwork/QSslSocket> |
14 | #include <QtNetwork/QTcpSocket> |
15 | |
16 | #include <limits> |
17 | #include <cstdint> |
18 | |
19 | QT_BEGIN_NAMESPACE |
20 | |
21 | Q_LOGGING_CATEGORY(lcMqttConnection, "qt.mqtt.connection" ) |
22 | Q_LOGGING_CATEGORY(lcMqttConnectionVerbose, "qt.mqtt.connection.verbose" ); |
23 | |
24 | template <typename T> |
25 | T QMqttConnection::readBufferTyped(qint64 *dataSize) |
26 | { |
27 | Q_STATIC_ASSERT(std::is_integral<T>::value); |
28 | |
29 | T result = 0; |
30 | if (Q_UNLIKELY(dataSize != nullptr && *dataSize < qint64(sizeof(result)))) { |
31 | qCWarning(lcMqttConnection) << "Attempt to read past the data" ; |
32 | return result; |
33 | } |
34 | if (readBuffer(data: reinterpret_cast<char *>(&result), size: sizeof(result)) && dataSize != nullptr) |
35 | *dataSize -= sizeof(result); |
36 | return qFromBigEndian(result); |
37 | } |
38 | |
39 | template<> |
40 | QByteArray QMqttConnection::readBufferTyped(qint64 *dataSize) |
41 | { |
42 | const quint16 size = readBufferTyped<quint16>(dataSize); |
43 | if (Q_UNLIKELY(dataSize != nullptr && *dataSize < qint64(size))) { |
44 | qCWarning(lcMqttConnection) << "Attempt to read past the data" ; |
45 | return QByteArray(); |
46 | } |
47 | QByteArray ba(int(size), Qt::Uninitialized); |
48 | if (readBuffer(data: ba.data(), size) && dataSize != nullptr) |
49 | *dataSize -= size; |
50 | return ba; |
51 | } |
52 | |
53 | template<> |
54 | QString QMqttConnection::readBufferTyped(qint64 *dataSize) |
55 | { |
56 | return QString::fromUtf8(ba: readBufferTyped<QByteArray>(dataSize)); |
57 | } |
58 | |
59 | QMqttConnection::QMqttConnection(QObject *parent) : QObject(parent) |
60 | { |
61 | } |
62 | |
63 | QMqttConnection::~QMqttConnection() |
64 | { |
65 | if (m_internalState == BrokerConnected) |
66 | sendControlDisconnect(); |
67 | |
68 | if (m_ownTransport && m_transport) |
69 | delete m_transport; |
70 | } |
71 | |
72 | void QMqttConnection::timerEvent(QTimerEvent *event) |
73 | { |
74 | if (Q_LIKELY(event->timerId() == m_pingTimer.timerId())) { |
75 | sendControlPingRequest(); |
76 | return; |
77 | } |
78 | |
79 | QObject::timerEvent(event); |
80 | } |
81 | |
82 | void QMqttConnection::setTransport(QIODevice *device, QMqttClient::TransportType transport) |
83 | { |
84 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << device << " Type:" << transport; |
85 | |
86 | if (m_transport) { |
87 | disconnect(sender: m_transport, signal: &QIODevice::aboutToClose, receiver: this, slot: &QMqttConnection::transportConnectionClosed); |
88 | disconnect(sender: m_transport, signal: &QIODevice::readyRead, receiver: this, slot: &QMqttConnection::transportReadyRead); |
89 | if (m_ownTransport) |
90 | delete m_transport; |
91 | } |
92 | |
93 | m_transport = device; |
94 | m_transportType = transport; |
95 | m_ownTransport = false; |
96 | |
97 | connect(sender: m_transport, signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed); |
98 | connect(sender: m_transport, signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead); |
99 | } |
100 | |
101 | QIODevice *QMqttConnection::transport() const |
102 | { |
103 | return m_transport; |
104 | } |
105 | |
106 | bool QMqttConnection::ensureTransport(bool createSecureIfNeeded) |
107 | { |
108 | Q_UNUSED(createSecureIfNeeded); // QT_NO_SSL |
109 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << m_transport; |
110 | |
111 | if (m_transport) { |
112 | if (m_ownTransport) |
113 | delete m_transport; |
114 | else |
115 | return true; |
116 | } |
117 | |
118 | // We are asked to create a transport layer |
119 | if (m_clientPrivate->m_hostname.isEmpty() || m_clientPrivate->m_port == 0) { |
120 | qCDebug(lcMqttConnection) << "No hostname specified, not able to create a transport layer." ; |
121 | return false; |
122 | } |
123 | auto socket = |
124 | #ifndef QT_NO_SSL |
125 | createSecureIfNeeded ? new QSslSocket() : |
126 | #endif |
127 | new QTcpSocket(); |
128 | m_transport = socket; |
129 | m_ownTransport = true; |
130 | m_transportType = |
131 | #ifndef QT_NO_SSL |
132 | createSecureIfNeeded ? QMqttClient::SecureSocket : |
133 | #endif |
134 | QMqttClient::AbstractSocket; |
135 | |
136 | #ifndef QT_NO_SSL |
137 | if (QSslSocket *sslSocket = qobject_cast<QSslSocket *>(object: socket)) |
138 | QObject::connect(sender: sslSocket, signal: &QSslSocket::encrypted, context: this, slot: &QMqttConnection::transportConnectionEstablished); |
139 | else |
140 | #endif |
141 | connect(sender: socket, signal: &QAbstractSocket::connected, context: this, slot: &QMqttConnection::transportConnectionEstablished); |
142 | connect(sender: socket, signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed); |
143 | connect(sender: socket, signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError); |
144 | |
145 | connect(sender: m_transport, signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed); |
146 | connect(sender: m_transport, signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead); |
147 | return true; |
148 | } |
149 | |
150 | bool QMqttConnection::ensureTransportOpen(const QString &sslPeerName) |
151 | { |
152 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << m_transportType; |
153 | |
154 | if (m_transportType == QMqttClient::IODevice) { |
155 | if (m_transport->isOpen()) |
156 | return sendControlConnect(); |
157 | |
158 | if (!m_transport->open(mode: QIODevice::ReadWrite)) { |
159 | qCDebug(lcMqttConnection) << "Could not open Transport IO device." ; |
160 | m_internalState = BrokerDisconnected; |
161 | return false; |
162 | } |
163 | return sendControlConnect(); |
164 | } |
165 | |
166 | if (m_transportType == QMqttClient::AbstractSocket) { |
167 | auto socket = qobject_cast<QTcpSocket *>(object: m_transport); |
168 | Q_ASSERT(socket); |
169 | if (socket->state() == QAbstractSocket::ConnectedState) |
170 | return sendControlConnect(); |
171 | |
172 | m_internalState = BrokerConnecting; |
173 | socket->connectToHost(hostName: m_clientPrivate->m_hostname, port: m_clientPrivate->m_port); |
174 | } |
175 | #ifndef QT_NO_SSL |
176 | else if (m_transportType == QMqttClient::SecureSocket) { |
177 | auto socket = qobject_cast<QSslSocket *>(object: m_transport); |
178 | Q_ASSERT(socket); |
179 | if (socket->state() == QAbstractSocket::ConnectedState) |
180 | return sendControlConnect(); |
181 | |
182 | m_internalState = BrokerConnecting; |
183 | if (!m_sslConfiguration.isNull()) |
184 | socket->setSslConfiguration(m_sslConfiguration); |
185 | socket->connectToHostEncrypted(hostName: m_clientPrivate->m_hostname, port: m_clientPrivate->m_port, sslPeerName); |
186 | } |
187 | #else |
188 | Q_UNUSED(sslPeerName); |
189 | #endif |
190 | |
191 | return true; |
192 | } |
193 | |
194 | bool QMqttConnection::sendControlConnect() |
195 | { |
196 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
197 | |
198 | QMqttControlPacket packet(QMqttControlPacket::CONNECT); |
199 | |
200 | // Variable header |
201 | // 3.1.2.1 Protocol Name |
202 | // 3.1.2.2 Protocol Level |
203 | switch (m_clientPrivate->m_protocolVersion) { |
204 | case QMqttClient::MQTT_3_1: |
205 | packet.append(data: "MQIsdp" ); |
206 | packet.append(value: char(3)); // Version 3.1 |
207 | break; |
208 | case QMqttClient::MQTT_3_1_1: |
209 | packet.append(data: "MQTT" ); |
210 | packet.append(value: char(4)); // Version 3.1.1 |
211 | break; |
212 | case QMqttClient::MQTT_5_0: |
213 | packet.append(data: "MQTT" ); |
214 | packet.append(value: char(5)); // Version 5.0 |
215 | break; |
216 | } |
217 | |
218 | // 3.1.2.3 Connect Flags |
219 | quint8 flags = 0; |
220 | // Clean session |
221 | if (m_clientPrivate->m_cleanSession) |
222 | flags |= 1 << 1; |
223 | |
224 | if (!m_clientPrivate->m_willTopic.isEmpty()) { |
225 | flags |= 1 << 2; |
226 | if (m_clientPrivate->m_willQoS > 2) { |
227 | qCDebug(lcMqttConnection) << "Invalid Will QoS specified." ; |
228 | return false; |
229 | } |
230 | if (m_clientPrivate->m_willQoS == 1) |
231 | flags |= 1 << 3; |
232 | else if (m_clientPrivate->m_willQoS == 2) |
233 | flags |= 1 << 4; |
234 | if (m_clientPrivate->m_willRetain) |
235 | flags |= 1 << 5; |
236 | } |
237 | if (m_clientPrivate->m_username.size()) |
238 | flags |= 1 << 7; |
239 | |
240 | if (m_clientPrivate->m_password.size()) |
241 | flags |= 1 << 6; |
242 | |
243 | packet.append(value: char(flags)); |
244 | |
245 | // 3.1.2.10 Keep Alive |
246 | packet.append(value: m_clientPrivate->m_keepAlive); |
247 | |
248 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
249 | packet.appendRaw(data: writeConnectProperties()); |
250 | |
251 | // 3.1.3 Payload |
252 | // 3.1.3.1 Client Identifier |
253 | const QByteArray clientStringArray = m_clientPrivate->m_clientId.toUtf8(); |
254 | if (clientStringArray.size()) { |
255 | packet.append(data: clientStringArray); |
256 | } else { |
257 | packet.append(value: char(0)); |
258 | packet.append(value: char(0)); |
259 | } |
260 | |
261 | if (!m_clientPrivate->m_willTopic.isEmpty()) { |
262 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
263 | packet.appendRaw(data: writeLastWillProperties()); |
264 | |
265 | packet.append(data: m_clientPrivate->m_willTopic.toUtf8()); |
266 | packet.append(data: m_clientPrivate->m_willMessage); |
267 | } |
268 | |
269 | if (m_clientPrivate->m_username.size()) |
270 | packet.append(data: m_clientPrivate->m_username.toUtf8()); |
271 | |
272 | if (m_clientPrivate->m_password.size()) |
273 | packet.append(data: m_clientPrivate->m_password.toUtf8()); |
274 | |
275 | m_internalState = BrokerWaitForConnectAck; |
276 | m_missingData = 0; |
277 | |
278 | if (!writePacketToTransport(p: packet)) { |
279 | qCDebug(lcMqttConnection) << "Could not write CONNECT frame to transport." ; |
280 | return false; |
281 | } |
282 | return true; |
283 | } |
284 | |
285 | bool QMqttConnection::sendControlAuthenticate(const QMqttAuthenticationProperties &properties) |
286 | { |
287 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
288 | |
289 | QMqttControlPacket packet(QMqttControlPacket::AUTH); |
290 | |
291 | switch (m_internalState) { |
292 | case BrokerDisconnected: |
293 | case BrokerConnecting: |
294 | case ClientDestruction: |
295 | qCDebug(lcMqttConnection) << "Using AUTH while disconnected." ; |
296 | return false; |
297 | case BrokerWaitForConnectAck: |
298 | qCDebug(lcMqttConnection) << "AUTH while connecting, set continuation flag." ; |
299 | packet.append(value: char(QMqtt::ReasonCode::ContinueAuthentication)); |
300 | break; |
301 | case BrokerConnected: |
302 | qCDebug(lcMqttConnection) << "AUTH while connected, initiate re-authentication." ; |
303 | packet.append(value: char(QMqtt::ReasonCode::ReAuthenticate)); |
304 | break; |
305 | } |
306 | |
307 | packet.appendRaw(data: writeAuthenticationProperties(properties)); |
308 | |
309 | if (!writePacketToTransport(p: packet)) { |
310 | qCDebug(lcMqttConnection) << "Could not write AUTH frame to transport." ; |
311 | return false; |
312 | } |
313 | |
314 | return true; |
315 | } |
316 | |
317 | qint32 QMqttConnection::sendControlPublish(const QMqttTopicName &topic, |
318 | const QByteArray &message, |
319 | quint8 qos, |
320 | bool retain, |
321 | const QMqttPublishProperties &properties) |
322 | { |
323 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << topic << " Size:" << message.size() << " bytes." |
324 | << "QoS:" << qos << " Retain:" << retain; |
325 | |
326 | if (!topic.isValid()) |
327 | return -1; |
328 | |
329 | quint8 = QMqttControlPacket::PUBLISH; |
330 | if (qos == 1) |
331 | header |= 0x02; |
332 | else if (qos == 2) |
333 | header |= 0x04; |
334 | |
335 | if (retain) |
336 | header |= 0x01; |
337 | |
338 | QSharedPointer<QMqttControlPacket> packet(new QMqttControlPacket(header)); |
339 | // topic alias |
340 | QMqttPublishProperties publishProperties(properties); |
341 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
342 | // 3.3.4 A PUBLISH packet sent from a Client to a Server MUST NOT contain a Subscription Identifier |
343 | if (publishProperties.availableProperties() & QMqttPublishProperties::SubscriptionIdentifier) { |
344 | qCWarning(lcMqttConnection) << "SubscriptionIdentifier must not be specified for publish." ; |
345 | return -1; |
346 | } |
347 | |
348 | const quint16 topicAlias = publishProperties.topicAlias(); |
349 | if (topicAlias > 0) { // User specified topic Alias |
350 | if (topicAlias > m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()) { |
351 | qCDebug(lcMqttConnection) << "TopicAlias publish: overflow." ; |
352 | return -1; |
353 | } |
354 | if (m_publishAliases.at(i: topicAlias - 1) != topic) { |
355 | qCDebug(lcMqttConnection) << "TopicAlias publish: Assign:" << topicAlias << ":" << topic; |
356 | m_publishAliases[topicAlias - 1] = topic; |
357 | packet->append(data: topic.name().toUtf8()); |
358 | } else { |
359 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: Reuse:" << topicAlias; |
360 | packet->append(value: quint16(0)); |
361 | } |
362 | } else if (m_publishAliases.size() > 0) { // Automatic module alias assignment |
363 | int autoAlias = m_publishAliases.indexOf(t: topic); |
364 | if (autoAlias != -1) { |
365 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: Use auto alias:" << autoAlias; |
366 | packet->append(value: quint16(0)); |
367 | publishProperties.setTopicAlias(quint16(autoAlias + 1)); |
368 | } else { |
369 | autoAlias = m_publishAliases.indexOf(t: QMqttTopicName()); |
370 | if (autoAlias != -1) { |
371 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: auto alias assignment:" << autoAlias; |
372 | m_publishAliases[autoAlias] = topic; |
373 | publishProperties.setTopicAlias(quint16(autoAlias) + 1); |
374 | } else |
375 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: alias storage full, using full topic" ; |
376 | packet->append(data: topic.name().toUtf8()); |
377 | } |
378 | } else { |
379 | packet->append(data: topic.name().toUtf8()); |
380 | } |
381 | } else { // ! MQTT_5_0 |
382 | packet->append(data: topic.name().toUtf8()); |
383 | } |
384 | quint16 identifier = 0; |
385 | if (qos > 0) { |
386 | identifier = unusedPacketIdentifier(); |
387 | packet->append(value: identifier); |
388 | m_pendingMessages.insert(key: identifier, value: packet); |
389 | } |
390 | |
391 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
392 | packet->appendRaw(data: writePublishProperties(properties: publishProperties)); |
393 | |
394 | packet->appendRaw(data: message); |
395 | |
396 | const bool written = writePacketToTransport(p: *packet.data()); |
397 | |
398 | if (!written && qos > 0) |
399 | m_pendingMessages.remove(key: identifier); |
400 | return written ? identifier : -1; |
401 | } |
402 | |
403 | bool QMqttConnection::sendControlPublishAcknowledge(quint16 id) |
404 | { |
405 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
406 | QMqttControlPacket packet(QMqttControlPacket::PUBACK); |
407 | packet.append(value: id); |
408 | return writePacketToTransport(p: packet); |
409 | } |
410 | |
411 | bool QMqttConnection::sendControlPublishRelease(quint16 id) |
412 | { |
413 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
414 | quint8 = QMqttControlPacket::PUBREL; |
415 | header |= 0x02; // MQTT-3.6.1-1 |
416 | |
417 | QMqttControlPacket packet(header); |
418 | packet.append(value: id); |
419 | return writePacketToTransport(p: packet); |
420 | } |
421 | |
422 | bool QMqttConnection::sendControlPublishReceive(quint16 id) |
423 | { |
424 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
425 | QMqttControlPacket packet(QMqttControlPacket::PUBREC); |
426 | packet.append(value: id); |
427 | return writePacketToTransport(p: packet); |
428 | } |
429 | |
430 | bool QMqttConnection::sendControlPublishComp(quint16 id) |
431 | { |
432 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << id; |
433 | QMqttControlPacket packet(QMqttControlPacket::PUBCOMP); |
434 | packet.append(value: id); |
435 | return writePacketToTransport(p: packet); |
436 | } |
437 | |
438 | QMqttSubscription *QMqttConnection::sendControlSubscribe(const QMqttTopicFilter &topic, |
439 | quint8 qos, |
440 | const QMqttSubscriptionProperties &properties) |
441 | { |
442 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic << " qos:" << qos; |
443 | |
444 | // Overflow protection |
445 | if (Q_UNLIKELY(!topic.isValid())) { |
446 | qCWarning(lcMqttConnection) << "Invalid subscription topic filter." ; |
447 | return nullptr; |
448 | } |
449 | |
450 | if (Q_UNLIKELY(qos > 2)) { |
451 | qCWarning(lcMqttConnection) << "Invalid subscription QoS." ; |
452 | return nullptr; |
453 | } |
454 | |
455 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
456 | const QString sharedSubscriptionName = topic.sharedSubscriptionName(); |
457 | if (!sharedSubscriptionName.isEmpty()) { |
458 | const QMqttTopicFilter filter(topic.filter().section(asep: QLatin1Char('/'), astart: 2)); |
459 | auto it = m_activeSubscriptions.constFind(key: filter); |
460 | if (it != m_activeSubscriptions.cend() && (*it)->sharedSubscriptionName() == sharedSubscriptionName) |
461 | return *it; |
462 | } else { |
463 | auto it = m_activeSubscriptions.constFind(key: topic); |
464 | if (it != m_activeSubscriptions.cend() && !(*it)->isSharedSubscription()) |
465 | return *it; |
466 | } |
467 | } else { |
468 | auto it = m_activeSubscriptions.constFind(key: topic); |
469 | if (it != m_activeSubscriptions.cend()) |
470 | return *it; |
471 | } |
472 | |
473 | // has to have 0010 as bits 3-0, maybe update SUBSCRIBE instead? |
474 | // MQTT-3.8.1-1 |
475 | const quint8 = QMqttControlPacket::SUBSCRIBE + 0x02; |
476 | QMqttControlPacket packet(header); |
477 | |
478 | // Add Packet Identifier |
479 | const quint16 identifier = unusedPacketIdentifier(); |
480 | |
481 | packet.append(value: identifier); |
482 | |
483 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
484 | packet.appendRaw(data: writeSubscriptionProperties(properties)); |
485 | |
486 | packet.append(data: topic.filter().toUtf8()); |
487 | char options = char(qos); |
488 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && properties.noLocal()) |
489 | options |= 1 << 2; |
490 | packet.append(value: options); |
491 | |
492 | auto result = new QMqttSubscription(this); |
493 | result->setTopic(topic); |
494 | result->setClient(m_clientPrivate->m_client); |
495 | result->setQos(qos); |
496 | result->setState(QMqttSubscription::SubscriptionPending); |
497 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && !topic.sharedSubscriptionName().isEmpty()) { |
498 | result->setSharedSubscriptionName(topic.sharedSubscriptionName()); |
499 | result->setSharedSubscription(true); |
500 | result->setTopic(topic.filter().section(asep: QLatin1Char('/'), astart: 2)); |
501 | } |
502 | |
503 | if (!writePacketToTransport(p: packet)) { |
504 | delete result; |
505 | return nullptr; |
506 | } |
507 | |
508 | // SUBACK must contain identifier MQTT-3.8.4-2 |
509 | m_pendingSubscriptionAck.insert(key: identifier, value: result); |
510 | m_activeSubscriptions.insert(key: result->topic(), value: result); |
511 | return result; |
512 | } |
513 | |
514 | bool QMqttConnection::sendControlUnsubscribe(const QMqttTopicFilter &topic, const QMqttUnsubscriptionProperties &properties) |
515 | { |
516 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic; |
517 | |
518 | // MQTT-3.10.3-2 |
519 | if (!topic.isValid()) |
520 | return false; |
521 | |
522 | if (!m_activeSubscriptions.contains(key: topic)) |
523 | return false; |
524 | |
525 | if (m_internalState != QMqttConnection::BrokerConnected) { |
526 | m_activeSubscriptions.remove(key: topic); |
527 | return true; |
528 | } |
529 | |
530 | // has to have 0010 as bits 3-0, maybe update UNSUBSCRIBE instead? |
531 | // MQTT-3.10.1-1 |
532 | const quint8 = QMqttControlPacket::UNSUBSCRIBE + 0x02; |
533 | QMqttControlPacket packet(header); |
534 | |
535 | // Add Packet Identifier |
536 | const quint16 identifier = unusedPacketIdentifier(); |
537 | |
538 | packet.append(value: identifier); |
539 | |
540 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
541 | packet.appendRaw(data: writeUnsubscriptionProperties(properties)); |
542 | } |
543 | |
544 | packet.append(data: topic.filter().toUtf8()); |
545 | auto sub = m_activeSubscriptions[topic]; |
546 | sub->setState(QMqttSubscription::UnsubscriptionPending); |
547 | |
548 | if (!writePacketToTransport(p: packet)) |
549 | return false; |
550 | |
551 | // Do not remove from m_activeSubscriptions as there might be QoS1/2 messages to still |
552 | // be sent before UNSUBSCRIBE is acknowledged. |
553 | m_pendingUnsubscriptions.insert(key: identifier, value: sub); |
554 | |
555 | return true; |
556 | } |
557 | |
558 | bool QMqttConnection::sendControlPingRequest(bool isAuto) |
559 | { |
560 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
561 | |
562 | if (m_internalState != QMqttConnection::BrokerConnected) |
563 | return false; |
564 | |
565 | |
566 | if (!isAuto && m_clientPrivate->m_autoKeepAlive) { |
567 | qCDebug(lcMqttConnection) << "Requesting a manual ping while autoKeepAlive is enabled " |
568 | << "is not allowed." ; |
569 | return false; |
570 | } |
571 | |
572 | // 3.1.2.10 If a Client does not receive a PINGRESP packet within a reasonable amount of time |
573 | // after it has sent a PINGREQ, it SHOULD close the Network Connection to the Server |
574 | if (m_pingTimeout > 1) { |
575 | closeConnection(error: QMqttClient::ServerUnavailable); |
576 | return false; |
577 | } |
578 | |
579 | const QMqttControlPacket packet(QMqttControlPacket::PINGREQ); |
580 | if (!writePacketToTransport(p: packet)) { |
581 | qCDebug(lcMqttConnection) << "Failed to write PINGREQ to transport." ; |
582 | return false; |
583 | } |
584 | m_pingTimeout++; |
585 | return true; |
586 | } |
587 | |
588 | bool QMqttConnection::sendControlDisconnect() |
589 | { |
590 | qCDebug(lcMqttConnection) << Q_FUNC_INFO; |
591 | |
592 | m_pingTimer.stop(); |
593 | m_pingTimeout = 0; |
594 | |
595 | m_activeSubscriptions.clear(); |
596 | |
597 | m_receiveAliases.clear(); |
598 | m_publishAliases.clear(); |
599 | |
600 | const QMqttControlPacket packet(QMqttControlPacket::DISCONNECT); |
601 | if (!writePacketToTransport(p: packet)) { |
602 | qCDebug(lcMqttConnection) << "Failed to write DISCONNECT to transport." ; |
603 | return false; |
604 | } |
605 | if (m_internalState != ClientDestruction) |
606 | m_internalState = BrokerDisconnected; |
607 | |
608 | if (m_transport->waitForBytesWritten(msecs: 30000)) { |
609 | // MQTT-3.14.4-1 must disconnect |
610 | m_transport->close(); |
611 | return true; |
612 | } |
613 | return false; |
614 | } |
615 | |
616 | void QMqttConnection::setClientPrivate(QMqttClientPrivate *clientPrivate) |
617 | { |
618 | m_clientPrivate = clientPrivate; |
619 | } |
620 | |
621 | quint16 QMqttConnection::unusedPacketIdentifier() const |
622 | { |
623 | // MQTT-2.3.1-1 Control Packets MUST contain a non-zero 16-bit Packet Identifier |
624 | static quint16 packetIdentifierCounter = 1; |
625 | const std::uint16_t u16max = std::numeric_limits<std::uint16_t>::max(); |
626 | |
627 | // MQTT-2.3.1-2 ...it MUST assign it a currently unused Packet Identifier |
628 | const quint16 lastIdentifier = packetIdentifierCounter; |
629 | do { |
630 | if (packetIdentifierCounter == u16max) |
631 | packetIdentifierCounter = 1; |
632 | else |
633 | packetIdentifierCounter++; |
634 | |
635 | if (lastIdentifier == packetIdentifierCounter) { |
636 | qCDebug(lcMqttConnection) << "Could not generate unique packet identifier." ; |
637 | break; |
638 | } |
639 | } while (m_pendingSubscriptionAck.contains(key: packetIdentifierCounter) |
640 | || m_pendingUnsubscriptions.contains(key: packetIdentifierCounter) |
641 | || m_pendingMessages.contains(key: packetIdentifierCounter) |
642 | || m_pendingReleaseMessages.contains(key: packetIdentifierCounter)); |
643 | return packetIdentifierCounter; |
644 | } |
645 | |
646 | void QMqttConnection::cleanSubscriptions() |
647 | { |
648 | for (auto item : m_pendingSubscriptionAck) |
649 | item->setState(QMqttSubscription::Unsubscribed); |
650 | m_pendingSubscriptionAck.clear(); |
651 | |
652 | for (auto item : m_pendingUnsubscriptions) |
653 | item->setState(QMqttSubscription::Unsubscribed); |
654 | m_pendingUnsubscriptions.clear(); |
655 | |
656 | for (auto item : m_activeSubscriptions) |
657 | item->setState(QMqttSubscription::Unsubscribed); |
658 | m_activeSubscriptions.clear(); |
659 | } |
660 | |
661 | void QMqttConnection::transportConnectionEstablished() |
662 | { |
663 | if (m_internalState != BrokerConnecting) { |
664 | qCWarning(lcMqttConnection) << "Connection established at an unexpected time" ; |
665 | return; |
666 | } |
667 | |
668 | if (!sendControlConnect()) { |
669 | qCDebug(lcMqttConnection) << "Failed to write CONNECT to transport." ; |
670 | // ### Who disconnects now? Connection or client? |
671 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::TransportInvalid); |
672 | } |
673 | } |
674 | |
675 | void QMqttConnection::transportConnectionClosed() |
676 | { |
677 | m_readBuffer.clear(); |
678 | m_readPosition = 0; |
679 | m_pingTimer.stop(); |
680 | m_pingTimeout = 0; |
681 | if (m_internalState == ClientDestruction) |
682 | return; |
683 | if (m_internalState == BrokerDisconnected) // We manually disconnected |
684 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::NoError); |
685 | else |
686 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::TransportInvalid); |
687 | } |
688 | |
689 | void QMqttConnection::transportReadyRead() |
690 | { |
691 | qCDebug(lcMqttConnectionVerbose) << Q_FUNC_INFO; |
692 | m_readBuffer.append(a: m_transport->readAll()); |
693 | processData(); |
694 | } |
695 | |
696 | void QMqttConnection::transportError(QAbstractSocket::SocketError e) |
697 | { |
698 | qCDebug(lcMqttConnection) << Q_FUNC_INFO << e; |
699 | closeConnection(error: QMqttClient::TransportInvalid); |
700 | } |
701 | |
702 | bool QMqttConnection::readBuffer(char *data, quint64 size) |
703 | { |
704 | if (Q_UNLIKELY(quint64(m_readBuffer.size() - m_readPosition) < size)) { |
705 | qCDebug(lcMqttConnection) << "Reaching out of buffer, protocol violation" ; |
706 | closeConnection(error: QMqttClient::ProtocolViolation); |
707 | return false; |
708 | } |
709 | memcpy(dest: data, src: m_readBuffer.constData() + m_readPosition, n: size); |
710 | m_readPosition += size; |
711 | return true; |
712 | } |
713 | |
714 | qint32 QMqttConnection::readVariableByteInteger(qint64 *dataSize) |
715 | { |
716 | quint32 multiplier = 1; |
717 | qint32 msgLength = 0; |
718 | quint8 b = 0; |
719 | quint8 iteration = 0; |
720 | do { |
721 | b = readBufferTyped<quint8>(dataSize); |
722 | msgLength += (b & 127) * multiplier; |
723 | multiplier *= 128; |
724 | iteration++; |
725 | if (iteration > 4) { |
726 | qCDebug(lcMqttConnection) << "Overflow trying to read variable integer." ; |
727 | closeConnection(error: QMqttClient::ProtocolViolation); |
728 | return -1; |
729 | } |
730 | } while ((b & 128) != 0); |
731 | return msgLength; |
732 | } |
733 | |
734 | void QMqttConnection::closeConnection(QMqttClient::ClientError error) |
735 | { |
736 | m_readBuffer.clear(); |
737 | m_readPosition = 0; |
738 | m_pingTimer.stop(); |
739 | m_pingTimeout = 0; |
740 | m_activeSubscriptions.clear(); |
741 | m_internalState = BrokerDisconnected; |
742 | m_transport->disconnect(); |
743 | m_transport->close(); |
744 | m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: error); |
745 | } |
746 | |
747 | QByteArray QMqttConnection::readBuffer(quint64 size) |
748 | { |
749 | if (Q_UNLIKELY(quint64(m_readBuffer.size() - m_readPosition) < size)) { |
750 | qCDebug(lcMqttConnection) << "Reaching out of buffer, protocol violation" ; |
751 | closeConnection(error: QMqttClient::ProtocolViolation); |
752 | return QByteArray(); |
753 | } |
754 | QByteArray res(m_readBuffer.constData() + m_readPosition, int(size)); |
755 | m_readPosition += size; |
756 | return res; |
757 | } |
758 | |
759 | void QMqttConnection::readAuthProperties(QMqttAuthenticationProperties &properties) |
760 | { |
761 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
762 | m_missingData -= propertyLength; |
763 | |
764 | QMqttUserProperties userProperties; |
765 | while (propertyLength > 0) { |
766 | quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
767 | |
768 | switch (propertyId) { |
769 | case 0x15: { //3.15.2.2.2 Authentication Method |
770 | const QString method = readBufferTyped<QString>(dataSize: &propertyLength); |
771 | properties.setAuthenticationMethod(method); |
772 | break; |
773 | } |
774 | case 0x16: { // 3.15.2.2.3 Authentication Data |
775 | const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength); |
776 | properties.setAuthenticationData(data); |
777 | break; |
778 | } |
779 | case 0x1F: { // 3.15.2.2.4 Reason String |
780 | const QString reasonString = readBufferTyped<QString>(dataSize: &propertyLength); |
781 | properties.setReason(reasonString); |
782 | break; |
783 | } |
784 | case 0x26: { // 3.15.2.2.5 User property |
785 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
786 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
787 | |
788 | userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
789 | break; |
790 | } |
791 | default: |
792 | qCDebug(lcMqttConnection) << "Unknown property id in AUTH:" << propertyId; |
793 | break; |
794 | } |
795 | } |
796 | if (!userProperties.isEmpty()) |
797 | properties.setUserProperties(userProperties); |
798 | } |
799 | |
800 | void QMqttConnection::readConnackProperties(QMqttServerConnectionProperties &properties) |
801 | { |
802 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
803 | m_missingData -= propertyLength; |
804 | |
805 | properties.serverData->valid = true; |
806 | |
807 | while (propertyLength > 0) { |
808 | quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
809 | switch (propertyId) { |
810 | case 0x11: { // 3.2.2.3.2 Session Expiry Interval |
811 | const quint32 expiryInterval = readBufferTyped<quint32>(dataSize: &propertyLength); |
812 | properties.serverData->details |= QMqttServerConnectionProperties::SessionExpiryInterval; |
813 | properties.setSessionExpiryInterval(expiryInterval); |
814 | break; |
815 | } |
816 | case 0x21: { // 3.2.2.3.3 Receive Maximum |
817 | const quint16 receiveMaximum = readBufferTyped<quint16>(dataSize: &propertyLength); |
818 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumReceive; |
819 | properties.setMaximumReceive(receiveMaximum); |
820 | break; |
821 | } |
822 | case 0x24: { // 3.2.2.3.4 Maximum QoS Level |
823 | const quint8 maxQoS = readBufferTyped<quint8>(dataSize: &propertyLength); |
824 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumQoS; |
825 | properties.serverData->maximumQoS = maxQoS; |
826 | break; |
827 | } |
828 | case 0x25: { // 3.2.2.3.5 Retain available |
829 | const quint8 retainAvailable = readBufferTyped<quint8>(dataSize: &propertyLength); |
830 | properties.serverData->details |= QMqttServerConnectionProperties::RetainAvailable; |
831 | properties.serverData->retainAvailable = retainAvailable == 1; |
832 | break; |
833 | } |
834 | case 0x27: { // 3.2.2.3.6 Maximum packet size |
835 | const quint32 maxPacketSize = readBufferTyped<quint32>(dataSize: &propertyLength); |
836 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumPacketSize; |
837 | properties.setMaximumPacketSize(maxPacketSize); |
838 | break; |
839 | } |
840 | case 0x12: { // 3.2.2.3.7 Assigned clientId |
841 | const QString assignedClientId = readBufferTyped<QString>(dataSize: &propertyLength); |
842 | properties.serverData->details |= QMqttServerConnectionProperties::AssignedClientId; |
843 | m_clientPrivate->setClientId(assignedClientId); |
844 | break; |
845 | } |
846 | case 0x22: { // 3.2.2.3.8 Topic Alias Maximum |
847 | const quint16 topicAliasMaximum = readBufferTyped<quint16>(dataSize: &propertyLength); |
848 | properties.serverData->details |= QMqttServerConnectionProperties::MaximumTopicAlias; |
849 | properties.setMaximumTopicAlias(topicAliasMaximum); |
850 | break; |
851 | } |
852 | case 0x1F: { // 3.2.2.3.9 Reason String |
853 | const QString reasonString = readBufferTyped<QString>(dataSize: &propertyLength); |
854 | properties.serverData->details |= QMqttServerConnectionProperties::ReasonString; |
855 | properties.serverData->reasonString = reasonString; |
856 | break; |
857 | } |
858 | case 0x26: { // 3.2.2.3.10 User property |
859 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
860 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
861 | |
862 | properties.serverData->details |= QMqttServerConnectionProperties::UserProperty; |
863 | properties.data->userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
864 | break; |
865 | } |
866 | case 0x28: { // 3.2.2.3.11 Wildcard subscriptions available |
867 | const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength); |
868 | properties.serverData->details |= QMqttServerConnectionProperties::WildCardSupported; |
869 | properties.serverData->wildcardSupported = available == 1; |
870 | break; |
871 | } |
872 | case 0x29: { // 3.2.2.3.12 Subscription identifiers available |
873 | const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength); |
874 | properties.serverData->details |= QMqttServerConnectionProperties::SubscriptionIdentifierSupport; |
875 | properties.serverData->subscriptionIdentifierSupported = available == 1; |
876 | break; |
877 | } |
878 | case 0x2A: { // 3.2.2.3.13 Shared subscriptions available |
879 | const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength); |
880 | properties.serverData->details |= QMqttServerConnectionProperties::SharedSubscriptionSupport; |
881 | properties.serverData->sharedSubscriptionSupported = available == 1; |
882 | break; |
883 | } |
884 | case 0x13: { // 3.2.2.3.14 Server Keep Alive |
885 | const quint16 serverKeepAlive = readBufferTyped<quint16>(dataSize: &propertyLength); |
886 | properties.serverData->details |= QMqttServerConnectionProperties::ServerKeepAlive; |
887 | m_clientPrivate->m_client->setKeepAlive(serverKeepAlive); |
888 | break; |
889 | } |
890 | case 0x1A: { // 3.2.2.3.15 Response information |
891 | const QString responseInfo = readBufferTyped<QString>(dataSize: &propertyLength); |
892 | properties.serverData->details |= QMqttServerConnectionProperties::ResponseInformation; |
893 | properties.serverData->responseInformation = responseInfo; |
894 | break; |
895 | } |
896 | case 0x1C: { // 3.2.2.3.16 Server reference |
897 | const QString serverReference = readBufferTyped<QString>(dataSize: &propertyLength); |
898 | properties.serverData->details |= QMqttServerConnectionProperties::ServerReference; |
899 | properties.serverData->serverReference = serverReference; |
900 | break; |
901 | } |
902 | case 0x15: { // 3.2.2.3.17 Authentication method |
903 | const QString method = readBufferTyped<QString>(dataSize: &propertyLength); |
904 | properties.serverData->details |= QMqttServerConnectionProperties::AuthenticationMethod; |
905 | properties.data->authenticationMethod = method; |
906 | break; |
907 | } |
908 | case 0x16: { // 3.2.2.3.18 Authentication data |
909 | const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength); |
910 | properties.serverData->details |= QMqttServerConnectionProperties::AuthenticationData; |
911 | properties.data->authenticationData = data; |
912 | break; |
913 | } |
914 | default: |
915 | qCDebug(lcMqttConnection) << "Unknown property id in CONNACK:" << int(propertyId); |
916 | break; |
917 | } |
918 | } |
919 | } |
920 | |
921 | void QMqttConnection::readMessageStatusProperties(QMqttMessageStatusProperties &properties) |
922 | { |
923 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
924 | m_missingData -= propertyLength; |
925 | |
926 | while (propertyLength > 0) { |
927 | const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
928 | switch (propertyId) { |
929 | case 0x1f: { // 3.4.2.2.2 Reason String |
930 | const QString content = readBufferTyped<QString>(dataSize: &propertyLength); |
931 | properties.data->reasonString = content; |
932 | break; |
933 | } |
934 | case 0x26: { // 3.4.2.2.3 User Properites |
935 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
936 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
937 | properties.data->userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
938 | break; |
939 | } |
940 | default: |
941 | qCDebug(lcMqttConnection) << "Unknown subscription property received." ; |
942 | break; |
943 | } |
944 | } |
945 | } |
946 | |
947 | void QMqttConnection::readPublishProperties(QMqttPublishProperties &properties) |
948 | { |
949 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
950 | m_missingData -= propertyLength; |
951 | |
952 | QMqttUserProperties userProperties; |
953 | QList<quint32> subscriptionIds; |
954 | |
955 | while (propertyLength > 0) { |
956 | const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
957 | switch (propertyId) { |
958 | case 0x01: { // 3.3.2.3.2 Payload Format Indicator |
959 | const quint8 format = readBufferTyped<quint8>(dataSize: &propertyLength); |
960 | if (format == 1) |
961 | properties.setPayloadFormatIndicator(QMqtt::PayloadFormatIndicator::UTF8Encoded); |
962 | break; |
963 | } |
964 | case 0x02: { // 3.3.2.3.3 Message Expiry Interval |
965 | const quint32 interval = readBufferTyped<quint32>(dataSize: &propertyLength); |
966 | properties.setMessageExpiryInterval(interval); |
967 | break; |
968 | } |
969 | case 0x23: { // 3.3.2.3.4 Topic alias |
970 | const quint16 alias = readBufferTyped<quint16>(dataSize: &propertyLength); |
971 | properties.setTopicAlias(alias); |
972 | break; |
973 | } |
974 | case 0x08: { // 3.3.2.3.5 Response Topic |
975 | const QString responseTopic = readBufferTyped<QString>(dataSize: &propertyLength); |
976 | properties.setResponseTopic(responseTopic); |
977 | break; |
978 | } |
979 | case 0x09: { // 3.3.2.3.6 Correlation Data |
980 | const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength); |
981 | properties.setCorrelationData(data); |
982 | break; |
983 | } |
984 | case 0x26: { // 3.3.2.3.7 User property |
985 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
986 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
987 | userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
988 | break; |
989 | } |
990 | case 0x0b: { // 3.3.2.3.8 Subscription Identifier |
991 | qint32 id = readVariableByteInteger(dataSize: &propertyLength); |
992 | if (id < 0) |
993 | return; // readVariableByteInteger closes connection |
994 | subscriptionIds.append(t: quint32(id)); |
995 | break; |
996 | } |
997 | case 0x03: { // 3.3.2.3.9 Content Type |
998 | const QString content = readBufferTyped<QString>(dataSize: &propertyLength); |
999 | properties.setContentType(content); |
1000 | break; |
1001 | } |
1002 | default: |
1003 | qCDebug(lcMqttConnection) << "Unknown publish property received." ; |
1004 | break; |
1005 | } |
1006 | } |
1007 | if (!userProperties.isEmpty()) |
1008 | properties.setUserProperties(userProperties); |
1009 | |
1010 | if (!subscriptionIds.isEmpty()) |
1011 | properties.setSubscriptionIdentifiers(subscriptionIds); |
1012 | } |
1013 | |
1014 | void QMqttConnection::readSubscriptionProperties(QMqttSubscription *sub) |
1015 | { |
1016 | qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData); |
1017 | m_missingData -= propertyLength; |
1018 | |
1019 | while (propertyLength > 0) { |
1020 | const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength); |
1021 | switch (propertyId) { |
1022 | case 0x1f: { // 3.9.2.1.2 Reason String |
1023 | const QString content = readBufferTyped<QString>(dataSize: &propertyLength); |
1024 | sub->d_func()->m_reasonString = content; |
1025 | break; |
1026 | } |
1027 | case 0x26: { // 3.9.2.1.3 |
1028 | const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength); |
1029 | const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength); |
1030 | |
1031 | sub->d_func()->m_userProperties.append(t: QMqttStringPair(propertyName, propertyValue)); |
1032 | break; |
1033 | } |
1034 | default: |
1035 | qCDebug(lcMqttConnection) << "Unknown subscription property received." ; |
1036 | break; |
1037 | } |
1038 | } |
1039 | } |
1040 | |
1041 | QByteArray QMqttConnection::writeConnectProperties() |
1042 | { |
1043 | QMqttControlPacket properties; |
1044 | |
1045 | // According to MQTT5 3.1.2.11 default values do not need to be included in the |
1046 | // connect statement. |
1047 | |
1048 | // 3.1.2.11.2 |
1049 | if (m_clientPrivate->m_connectionProperties.sessionExpiryInterval() != 0) { |
1050 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify sessionExpiryInterval" ; |
1051 | properties.append(value: char(0x11)); |
1052 | properties.append(value: m_clientPrivate->m_connectionProperties.sessionExpiryInterval()); |
1053 | } |
1054 | |
1055 | // 3.1.2.11.3 |
1056 | if (m_clientPrivate->m_connectionProperties.maximumReceive() != 65535) { |
1057 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumReceive" ; |
1058 | properties.append(value: char(0x21)); |
1059 | properties.append(value: m_clientPrivate->m_connectionProperties.maximumReceive()); |
1060 | } |
1061 | |
1062 | // 3.1.2.11.4 |
1063 | if (m_clientPrivate->m_connectionProperties.maximumPacketSize() != std::numeric_limits<quint32>::max()) { |
1064 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumPacketSize" ; |
1065 | properties.append(value: char(0x27)); |
1066 | properties.append(value: m_clientPrivate->m_connectionProperties.maximumPacketSize()); |
1067 | } |
1068 | |
1069 | // 3.1.2.11.5 |
1070 | if (m_clientPrivate->m_connectionProperties.maximumTopicAlias() != 0) { |
1071 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumTopicAlias" ; |
1072 | properties.append(value: char(0x22)); |
1073 | properties.append(value: m_clientPrivate->m_connectionProperties.maximumTopicAlias()); |
1074 | } |
1075 | |
1076 | // 3.1.2.11.6 |
1077 | if (m_clientPrivate->m_connectionProperties.requestResponseInformation()) { |
1078 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify requestResponseInformation" ; |
1079 | properties.append(value: char(0x19)); |
1080 | properties.append(value: char(1)); |
1081 | } |
1082 | |
1083 | // 3.1.2.11.7 |
1084 | if (!m_clientPrivate->m_connectionProperties.requestProblemInformation()) { |
1085 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify requestProblemInformation" ; |
1086 | properties.append(value: char(0x17)); |
1087 | properties.append(value: char(0)); |
1088 | } |
1089 | |
1090 | // 3.1.2.11.8 Add User properties |
1091 | auto userProperties = m_clientPrivate->m_connectionProperties.userProperties(); |
1092 | if (!userProperties.isEmpty()) { |
1093 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify user properties" ; |
1094 | for (const auto &prop : userProperties) { |
1095 | properties.append(value: char(0x26)); |
1096 | properties.append(data: prop.name().toUtf8()); |
1097 | properties.append(data: prop.value().toUtf8()); |
1098 | } |
1099 | } |
1100 | |
1101 | // 3.1.2.11.9 Add Authentication |
1102 | const QString authenticationMethod = m_clientPrivate->m_connectionProperties.authenticationMethod(); |
1103 | if (!authenticationMethod.isEmpty()) { |
1104 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify AuthenticationMethod:" ; |
1105 | qCDebug(lcMqttConnectionVerbose) << " " << authenticationMethod; |
1106 | properties.append(value: char(0x15)); |
1107 | properties.append(data: authenticationMethod.toUtf8()); |
1108 | // 3.1.2.11.10 |
1109 | const QByteArray authenticationData = m_clientPrivate->m_connectionProperties.authenticationData(); |
1110 | if (!authenticationData.isEmpty()) { |
1111 | qCDebug(lcMqttConnectionVerbose) << "Connection Properties: Authentication Data:" ; |
1112 | qCDebug(lcMqttConnectionVerbose) << " " << authenticationData; |
1113 | properties.append(value: char(0x16)); |
1114 | properties.append(data: authenticationData); |
1115 | } |
1116 | } |
1117 | |
1118 | return properties.serializePayload(); |
1119 | } |
1120 | |
1121 | QByteArray QMqttConnection::writeLastWillProperties() const |
1122 | { |
1123 | QMqttControlPacket properties; |
1124 | const QMqttLastWillProperties &lastWillProperties = m_clientPrivate->m_lastWillProperties; |
1125 | // Will Delay interval 3.1.3.2.2 |
1126 | if (lastWillProperties.willDelayInterval() > 0) { |
1127 | const quint32 delay = lastWillProperties.willDelayInterval(); |
1128 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: specify will delay interval:" |
1129 | << delay; |
1130 | properties.append(value: char(0x18)); |
1131 | properties.append(value: delay); |
1132 | } |
1133 | |
1134 | // Payload Format Indicator 3.1.3.2.3 |
1135 | if (lastWillProperties.payloadFormatIndicator() != QMqtt::PayloadFormatIndicator::Unspecified) { |
1136 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: payload format indicator specified" ; |
1137 | properties.append(value: char(0x01)); |
1138 | properties.append(value: char(0x01)); // UTF8 |
1139 | } |
1140 | |
1141 | // Message Expiry Interval 3.1.3.2.4 |
1142 | if (lastWillProperties.messageExpiryInterval() > 0) { |
1143 | const quint32 interval = lastWillProperties.messageExpiryInterval(); |
1144 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Message Expiry interval:" |
1145 | << interval; |
1146 | properties.append(value: char(0x02)); |
1147 | properties.append(value: interval); |
1148 | } |
1149 | |
1150 | // Content Type 3.1.3.2.5 |
1151 | if (!lastWillProperties.contentType().isEmpty()) { |
1152 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Content Type:" |
1153 | << lastWillProperties.contentType(); |
1154 | properties.append(value: char(0x03)); |
1155 | properties.append(data: lastWillProperties.contentType().toUtf8()); |
1156 | } |
1157 | |
1158 | // Response Topic 3.1.3.2.6 |
1159 | if (!lastWillProperties.responseTopic().isEmpty()) { |
1160 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Response Topic:" |
1161 | << lastWillProperties.responseTopic(); |
1162 | properties.append(value: char(0x08)); |
1163 | properties.append(data: lastWillProperties.responseTopic().toUtf8()); |
1164 | } |
1165 | |
1166 | // Correlation Data 3.1.3.2.7 |
1167 | if (!lastWillProperties.correlationData().isEmpty()) { |
1168 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Correlation Data:" |
1169 | << lastWillProperties.correlationData(); |
1170 | properties.append(value: char(0x09)); |
1171 | properties.append(data: lastWillProperties.correlationData()); |
1172 | } |
1173 | |
1174 | // User Properties 3.1.3.2.8 |
1175 | if (!lastWillProperties.userProperties().isEmpty()) { |
1176 | auto userProperties = lastWillProperties.userProperties(); |
1177 | qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: specify user properties" ; |
1178 | for (const auto &prop : userProperties) { |
1179 | properties.append(value: char(0x26)); |
1180 | properties.append(data: prop.name().toUtf8()); |
1181 | properties.append(data: prop.value().toUtf8()); |
1182 | } |
1183 | } |
1184 | |
1185 | return properties.serializePayload(); |
1186 | } |
1187 | |
1188 | QByteArray QMqttConnection::writePublishProperties(const QMqttPublishProperties &properties) |
1189 | { |
1190 | QMqttControlPacket packet; |
1191 | |
1192 | // 3.3.2.3.2 Payload Indicator |
1193 | if (properties.availableProperties() & QMqttPublishProperties::PayloadFormatIndicator && |
1194 | properties.payloadFormatIndicator() != QMqtt::PayloadFormatIndicator::Unspecified) { |
1195 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Payload Indicator:" |
1196 | << (properties.payloadFormatIndicator() == QMqtt::PayloadFormatIndicator::UTF8Encoded ? 1 : 0); |
1197 | packet.append(value: char(0x01)); |
1198 | switch (properties.payloadFormatIndicator()) { |
1199 | case QMqtt::PayloadFormatIndicator::UTF8Encoded: |
1200 | packet.append(value: char(0x01)); |
1201 | break; |
1202 | default: |
1203 | qCDebug(lcMqttConnection) << "Unknown payload indicator." ; |
1204 | break; |
1205 | } |
1206 | } |
1207 | |
1208 | // 3.3.2.3.3 Message Expiry |
1209 | if (properties.availableProperties() & QMqttPublishProperties::MessageExpiryInterval && |
1210 | properties.messageExpiryInterval() > 0) { |
1211 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Message Expiry :" |
1212 | << properties.messageExpiryInterval(); |
1213 | packet.append(value: char(0x02)); |
1214 | packet.append(value: properties.messageExpiryInterval()); |
1215 | } |
1216 | |
1217 | // 3.3.2.3.4 Topic alias |
1218 | if (properties.availableProperties() & QMqttPublishProperties::TopicAlias && |
1219 | properties.topicAlias() > 0) { |
1220 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Topic Alias :" |
1221 | << properties.topicAlias(); |
1222 | if (m_clientPrivate->m_serverConnectionProperties.availableProperties() & QMqttServerConnectionProperties::MaximumTopicAlias |
1223 | && properties.topicAlias() > m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()) { |
1224 | qCDebug(lcMqttConnection) << "Invalid topic alias specified: " << properties.topicAlias() |
1225 | << " Maximum by server is:" |
1226 | << m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias(); |
1227 | |
1228 | } else { |
1229 | packet.append(value: char(0x23)); |
1230 | packet.append(value: properties.topicAlias()); |
1231 | } |
1232 | } |
1233 | |
1234 | // 3.3.2.3.5 Response Topic |
1235 | if (properties.availableProperties() & QMqttPublishProperties::ResponseTopic && |
1236 | !properties.responseTopic().isEmpty()) { |
1237 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Response Topic :" |
1238 | << properties.responseTopic(); |
1239 | packet.append(value: char(0x08)); |
1240 | packet.append(data: properties.responseTopic().toUtf8()); |
1241 | } |
1242 | |
1243 | // 3.3.2.3.6 Correlation Data |
1244 | if (properties.availableProperties() & QMqttPublishProperties::CorrelationData && |
1245 | !properties.correlationData().isEmpty()) { |
1246 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Correlation Data :" |
1247 | << properties.correlationData(); |
1248 | packet.append(value: char(0x09)); |
1249 | packet.append(data: properties.correlationData()); |
1250 | } |
1251 | |
1252 | // 3.3.2.3.7 User Property |
1253 | if (properties.availableProperties() & QMqttPublishProperties::UserProperty) { |
1254 | auto userProperties = properties.userProperties(); |
1255 | if (!userProperties.isEmpty()) { |
1256 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: specify user properties" ; |
1257 | for (const auto &prop : userProperties) { |
1258 | packet.append(value: char(0x26)); |
1259 | packet.append(data: prop.name().toUtf8()); |
1260 | packet.append(data: prop.value().toUtf8()); |
1261 | } |
1262 | } |
1263 | } |
1264 | |
1265 | // 3.3.2.3.8 Subscription Identifier |
1266 | if (properties.availableProperties() & QMqttPublishProperties::SubscriptionIdentifier) { |
1267 | for (auto id : properties.subscriptionIdentifiers()) { |
1268 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Subscription ID:" << id; |
1269 | packet.append(value: char(0x0b)); |
1270 | packet.appendRawVariableInteger(value: id); |
1271 | } |
1272 | } |
1273 | |
1274 | // 3.3.2.3.9 Content Type |
1275 | if (properties.availableProperties() & QMqttPublishProperties::ContentType && |
1276 | !properties.contentType().isEmpty()) { |
1277 | qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Content Type :" |
1278 | << properties.contentType(); |
1279 | packet.append(value: char(0x03)); |
1280 | packet.append(data: properties.contentType().toUtf8()); |
1281 | } |
1282 | |
1283 | return packet.serializePayload(); |
1284 | } |
1285 | |
1286 | QByteArray QMqttConnection::writeSubscriptionProperties(const QMqttSubscriptionProperties &properties) |
1287 | { |
1288 | QMqttControlPacket packet; |
1289 | |
1290 | // 3.8.2.1.2 Subscription Identifier |
1291 | if (properties.subscriptionIdentifier() > 0) { |
1292 | qCDebug(lcMqttConnectionVerbose) << "Subscription Properties: Subscription Identifier" ; |
1293 | packet.append(value: char(0x0b)); |
1294 | packet.appendRawVariableInteger(value: properties.subscriptionIdentifier()); |
1295 | } |
1296 | |
1297 | // 3.8.2.1.3 User Property |
1298 | auto userProperties = properties.userProperties(); |
1299 | if (!userProperties.isEmpty()) { |
1300 | qCDebug(lcMqttConnectionVerbose) << "Subscription Properties: specify user properties" ; |
1301 | for (const auto &prop : userProperties) { |
1302 | packet.append(value: char(0x26)); |
1303 | packet.append(data: prop.name().toUtf8()); |
1304 | packet.append(data: prop.value().toUtf8()); |
1305 | } |
1306 | } |
1307 | |
1308 | return packet.serializePayload(); |
1309 | } |
1310 | |
1311 | QByteArray QMqttConnection::writeUnsubscriptionProperties(const QMqttUnsubscriptionProperties &properties) |
1312 | { |
1313 | QMqttControlPacket packet; |
1314 | |
1315 | // 3.10.2.1.2 |
1316 | auto userProperties = properties.userProperties(); |
1317 | if (!userProperties.isEmpty()) { |
1318 | qCDebug(lcMqttConnectionVerbose) << "Unsubscription Properties: specify user properties" ; |
1319 | for (const auto &prop : userProperties) { |
1320 | packet.append(value: char(0x26)); |
1321 | packet.append(data: prop.name().toUtf8()); |
1322 | packet.append(data: prop.value().toUtf8()); |
1323 | } |
1324 | } |
1325 | |
1326 | return packet.serializePayload(); |
1327 | } |
1328 | |
1329 | QByteArray QMqttConnection::writeAuthenticationProperties(const QMqttAuthenticationProperties &properties) |
1330 | { |
1331 | QMqttControlPacket packet; |
1332 | |
1333 | // 3.15.2.2.2 |
1334 | if (!properties.authenticationMethod().isEmpty()) { |
1335 | packet.append(value: char(0x15)); |
1336 | packet.append(data: properties.authenticationMethod().toUtf8()); |
1337 | } |
1338 | // 3.15.2.2.3 |
1339 | if (!properties.authenticationData().isEmpty()) { |
1340 | packet.append(value: char(0x16)); |
1341 | packet.append(data: properties.authenticationData()); |
1342 | } |
1343 | |
1344 | // 3.15.2.2.4 |
1345 | if (!properties.reason().isEmpty()) { |
1346 | packet.append(value: char(0x1F)); |
1347 | packet.append(data: properties.reason().toUtf8()); |
1348 | } |
1349 | |
1350 | // 3.15.2.2.5 |
1351 | auto userProperties = properties.userProperties(); |
1352 | if (!userProperties.isEmpty()) { |
1353 | qCDebug(lcMqttConnectionVerbose) << "Unsubscription Properties: specify user properties" ; |
1354 | for (const auto &prop : userProperties) { |
1355 | packet.append(value: char(0x26)); |
1356 | packet.append(data: prop.name().toUtf8()); |
1357 | packet.append(data: prop.value().toUtf8()); |
1358 | } |
1359 | } |
1360 | |
1361 | return packet.serializePayload(); |
1362 | } |
1363 | |
1364 | void QMqttConnection::finalize_auth() |
1365 | { |
1366 | qCDebug(lcMqttConnectionVerbose) << "Finalize AUTH" ; |
1367 | |
1368 | quint8 authReason = 0; |
1369 | QMqttAuthenticationProperties authProperties; |
1370 | // 3.15.2.1 - The Reason Code and Property Length can be omitted if the Reason Code |
1371 | // is 0x00 (Success) and there are no Properties. In this case the AUTH has a |
1372 | // Remaining Length of 0. |
1373 | if (m_missingData > 0) { |
1374 | authReason = readBufferTyped<quint8>(dataSize: &m_missingData); |
1375 | readAuthProperties(properties&: authProperties); |
1376 | } |
1377 | |
1378 | // 3.15.2.1 |
1379 | switch (QMqtt::ReasonCode(authReason)) { |
1380 | case QMqtt::ReasonCode::Success: |
1381 | emit m_clientPrivate->m_client->authenticationFinished(p: authProperties); |
1382 | break; |
1383 | case QMqtt::ReasonCode::ContinueAuthentication: |
1384 | case QMqtt::ReasonCode::ReAuthenticate: |
1385 | emit m_clientPrivate->m_client->authenticationRequested(p: authProperties); |
1386 | break; |
1387 | default: |
1388 | qCDebug(lcMqttConnection) << "Received illegal AUTH reason code:" << authReason; |
1389 | closeConnection(error: QMqttClient::ProtocolViolation); |
1390 | break; |
1391 | } |
1392 | } |
1393 | |
1394 | void QMqttConnection::finalize_connack() |
1395 | { |
1396 | qCDebug(lcMqttConnectionVerbose) << "Finalize CONNACK" ; |
1397 | |
1398 | const quint8 ackFlags = readBufferTyped<quint8>(dataSize: &m_missingData); |
1399 | |
1400 | if (ackFlags > 1) { // MQTT-3.2.2.1 |
1401 | qCDebug(lcMqttConnection) << "Unexpected CONNACK Flags specified:" << QString::number(ackFlags); |
1402 | readBuffer(size: quint64(m_missingData)); |
1403 | m_missingData = 0; |
1404 | closeConnection(error: QMqttClient::ProtocolViolation); |
1405 | return; |
1406 | } |
1407 | bool sessionPresent = ackFlags == 1; |
1408 | |
1409 | // MQTT-3.2.2-1 & MQTT-3.2.2-2 |
1410 | if (sessionPresent) { |
1411 | emit m_clientPrivate->m_client->brokerSessionRestored(); |
1412 | if (m_clientPrivate->m_cleanSession) |
1413 | qCDebug(lcMqttConnection) << "Connected with a clean session, ack contains session present." ; |
1414 | } else { |
1415 | // MQTT-4.1.0.-1 MQTT-4.1.0-2 Session not stored on broker side |
1416 | // regardless whether cleanSession is false |
1417 | cleanSubscriptions(); |
1418 | } |
1419 | |
1420 | quint8 connectResultValue = readBufferTyped<quint8>(dataSize: &m_missingData); |
1421 | QMqttServerConnectionProperties serverProp; |
1422 | serverProp.serverData->reasonCode = QMqtt::ReasonCode(connectResultValue); |
1423 | m_clientPrivate->m_serverConnectionProperties = serverProp; |
1424 | if (connectResultValue != 0 && m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
1425 | qCDebug(lcMqttConnection) << "Connection has been rejected." ; |
1426 | closeConnection(error: static_cast<QMqttClient::ClientError>(connectResultValue)); |
1427 | return; |
1428 | } |
1429 | |
1430 | // MQTT 5.0 has variable part != 2 in the header |
1431 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
1432 | readConnackProperties(properties&: m_clientPrivate->m_serverConnectionProperties); |
1433 | m_receiveAliases.resize(size: m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()); |
1434 | m_publishAliases.resize(size: m_clientPrivate->m_connectionProperties.maximumTopicAlias()); |
1435 | |
1436 | // 3.2.2.2 |
1437 | switch (QMqtt::ReasonCode(connectResultValue)) { |
1438 | case QMqtt::ReasonCode::Success: |
1439 | break; |
1440 | case QMqtt::ReasonCode::MalformedPacket: |
1441 | case QMqtt::ReasonCode::ProtocolError: |
1442 | closeConnection(error: QMqttClient::ProtocolViolation); |
1443 | return; |
1444 | case QMqtt::ReasonCode::UnsupportedProtocolVersion: |
1445 | closeConnection(error: QMqttClient::InvalidProtocolVersion); |
1446 | return; |
1447 | case QMqtt::ReasonCode::InvalidClientId: |
1448 | closeConnection(error: QMqttClient::IdRejected); |
1449 | return; |
1450 | case QMqtt::ReasonCode::ServerNotAvailable: |
1451 | case QMqtt::ReasonCode::ServerBusy: |
1452 | case QMqtt::ReasonCode::UseAnotherServer: |
1453 | case QMqtt::ReasonCode::ServerMoved: |
1454 | closeConnection(error: QMqttClient::ServerUnavailable); |
1455 | return; |
1456 | case QMqtt::ReasonCode::InvalidUserNameOrPassword: |
1457 | closeConnection(error: QMqttClient::BadUsernameOrPassword); |
1458 | return; |
1459 | case QMqtt::ReasonCode::NotAuthorized: |
1460 | closeConnection(error: QMqttClient::NotAuthorized); |
1461 | return; |
1462 | case QMqtt::ReasonCode::UnspecifiedError: |
1463 | closeConnection(error: QMqttClient::UnknownError); |
1464 | return; |
1465 | case QMqtt::ReasonCode::ImplementationSpecificError: |
1466 | case QMqtt::ReasonCode::ClientBanned: |
1467 | case QMqtt::ReasonCode::InvalidAuthenticationMethod: |
1468 | case QMqtt::ReasonCode::InvalidTopicName: |
1469 | case QMqtt::ReasonCode::PacketTooLarge: |
1470 | case QMqtt::ReasonCode::QuotaExceeded: |
1471 | case QMqtt::ReasonCode::InvalidPayloadFormat: |
1472 | case QMqtt::ReasonCode::RetainNotSupported: |
1473 | case QMqtt::ReasonCode::QoSNotSupported: |
1474 | case QMqtt::ReasonCode::ExceededConnectionRate: |
1475 | closeConnection(error: QMqttClient::Mqtt5SpecificError); |
1476 | return; |
1477 | default: |
1478 | qCDebug(lcMqttConnection) << "Received illegal CONNACK reason code:" << connectResultValue; |
1479 | closeConnection(error: QMqttClient::ProtocolViolation); |
1480 | return; |
1481 | } |
1482 | } |
1483 | |
1484 | m_internalState = BrokerConnected; |
1485 | m_clientPrivate->setStateAndError(s: QMqttClient::Connected); |
1486 | |
1487 | if (m_clientPrivate->m_autoKeepAlive) |
1488 | m_pingTimer.start(msec: m_clientPrivate->m_keepAlive * 1000, obj: this); |
1489 | } |
1490 | |
1491 | void QMqttConnection::finalize_suback() |
1492 | { |
1493 | const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData); |
1494 | |
1495 | auto sub = m_pendingSubscriptionAck.take(key: id); |
1496 | if (Q_UNLIKELY(sub == nullptr)) { |
1497 | qCDebug(lcMqttConnection) << "Received SUBACK for unknown subscription request." ; |
1498 | return; |
1499 | } |
1500 | |
1501 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
1502 | readSubscriptionProperties(sub); |
1503 | |
1504 | // 3.9.3 - The Payload contains a list of Reason Codes. Each Reason Code corresponds to a Topic Filter in the SUBSCRIBE packet being acknowledged. |
1505 | // Whereas 3.8.3 states "The Payload MUST contain at least one Topic Filter and Subscription Options pair. A SUBSCRIBE packet with no Payload is a Protocol Error." |
1506 | do { |
1507 | quint8 reason = readBufferTyped<quint8>(dataSize: &m_missingData); |
1508 | |
1509 | sub->d_func()->m_reasonCode = QMqtt::ReasonCode(reason); |
1510 | |
1511 | // 3.9.3 |
1512 | switch (QMqtt::ReasonCode(reason)) { |
1513 | case QMqtt::ReasonCode::SubscriptionQoSLevel0: |
1514 | case QMqtt::ReasonCode::SubscriptionQoSLevel1: |
1515 | case QMqtt::ReasonCode::SubscriptionQoSLevel2: |
1516 | qCDebug(lcMqttConnectionVerbose) << "Finalize SUBACK: id:" << id << "qos:" << reason; |
1517 | // The broker might have a different support level for QoS than what |
1518 | // the client requested |
1519 | if (reason != sub->qos()) { |
1520 | sub->setQos(reason); |
1521 | emit sub->qosChanged(reason); |
1522 | } |
1523 | sub->setState(QMqttSubscription::Subscribed); |
1524 | break; |
1525 | case QMqtt::ReasonCode::UnspecifiedError: |
1526 | qCWarning(lcMqttConnection) << "Subscription for id " << id << " failed. Reason Code:" << reason; |
1527 | sub->setState(QMqttSubscription::Error); |
1528 | break; |
1529 | case QMqtt::ReasonCode::ImplementationSpecificError: |
1530 | case QMqtt::ReasonCode::NotAuthorized: |
1531 | case QMqtt::ReasonCode::InvalidTopicFilter: |
1532 | case QMqtt::ReasonCode::MessageIdInUse: |
1533 | case QMqtt::ReasonCode::QuotaExceeded: |
1534 | case QMqtt::ReasonCode::SharedSubscriptionsNotSupported: |
1535 | case QMqtt::ReasonCode::SubscriptionIdsNotSupported: |
1536 | case QMqtt::ReasonCode::WildCardSubscriptionsNotSupported: |
1537 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
1538 | qCWarning(lcMqttConnection) << "Subscription for id " << id << " failed. Reason Code:" << reason; |
1539 | sub->setState(QMqttSubscription::Error); |
1540 | break; |
1541 | } |
1542 | Q_FALLTHROUGH(); |
1543 | default: |
1544 | qCWarning(lcMqttConnection) << "Received illegal SUBACK reason code:" << reason; |
1545 | closeConnection(error: QMqttClient::ProtocolViolation); |
1546 | break; |
1547 | } |
1548 | } while (m_missingData > 0); |
1549 | } |
1550 | |
1551 | void QMqttConnection::finalize_unsuback() |
1552 | { |
1553 | const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData); |
1554 | qCDebug(lcMqttConnectionVerbose) << "Finalize UNSUBACK: " << id; |
1555 | |
1556 | auto sub = m_pendingUnsubscriptions.take(key: id); |
1557 | if (Q_UNLIKELY(sub == nullptr)) { |
1558 | qCDebug(lcMqttConnection) << "Received UNSUBACK for unknown request." ; |
1559 | return; |
1560 | } |
1561 | |
1562 | m_activeSubscriptions.remove(key: sub->topic()); |
1563 | |
1564 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) { |
1565 | readSubscriptionProperties(sub); |
1566 | } else { |
1567 | // 3.11.3 - The UNSUBACK Packet has no payload. |
1568 | // emulate successful unsubscription |
1569 | sub->d_func()->m_reasonCode = QMqtt::ReasonCode::Success; |
1570 | sub->setState(QMqttSubscription::Unsubscribed); |
1571 | return; |
1572 | } |
1573 | |
1574 | // 3.1.3 - The Payload contains a list of Reason Codes. Each Reason Code corresponds to a Topic Filter in the UNSUBSCRIBE packet being acknowledged. |
1575 | // Whereas 3.10.3 states "The Payload of an UNSUBSCRIBE packet MUST contain at least one Topic Filter. An UNSUBSCRIBE packet with no Payload is a Protocol Error." |
1576 | do { |
1577 | const quint8 reasonCode = readBufferTyped<quint8>(dataSize: &m_missingData); |
1578 | sub->d_func()->m_reasonCode = QMqtt::ReasonCode(reasonCode); |
1579 | |
1580 | // 3.11.3 |
1581 | switch (QMqtt::ReasonCode(reasonCode)) { |
1582 | case QMqtt::ReasonCode::Success: |
1583 | sub->setState(QMqttSubscription::Unsubscribed); |
1584 | break; |
1585 | case QMqtt::ReasonCode::NoSubscriptionExisted: |
1586 | case QMqtt::ReasonCode::ImplementationSpecificError: |
1587 | case QMqtt::ReasonCode::NotAuthorized: |
1588 | case QMqtt::ReasonCode::InvalidTopicFilter: |
1589 | case QMqtt::ReasonCode::MessageIdInUse: |
1590 | case QMqtt::ReasonCode::UnspecifiedError: |
1591 | qCWarning(lcMqttConnection) << "Unsubscription for id " << id << " failed. Reason Code:" << reasonCode; |
1592 | sub->setState(QMqttSubscription::Error); |
1593 | break; |
1594 | default: |
1595 | qCWarning(lcMqttConnection) << "Received illegal UNSUBACK reason code:" << reasonCode; |
1596 | closeConnection(error: QMqttClient::ProtocolViolation); |
1597 | break; |
1598 | } |
1599 | } while (m_missingData > 0); |
1600 | } |
1601 | |
1602 | void QMqttConnection::finalize_publish() |
1603 | { |
1604 | // String topic |
1605 | QMqttTopicName topic = readBufferTyped<QString>(dataSize: &m_missingData); |
1606 | const int topicLength = topic.name().size(); |
1607 | |
1608 | quint16 id = 0; |
1609 | if (m_currentPublish.qos > 0) |
1610 | id = readBufferTyped<quint16>(dataSize: &m_missingData); |
1611 | |
1612 | QMqttPublishProperties publishProperties; |
1613 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) |
1614 | readPublishProperties(properties&: publishProperties); |
1615 | |
1616 | if (publishProperties.availableProperties() & QMqttPublishProperties::TopicAlias) { |
1617 | const quint16 topicAlias = publishProperties.topicAlias(); |
1618 | if (topicAlias == 0 || topicAlias > m_clientPrivate->m_connectionProperties.maximumTopicAlias()) { |
1619 | qCDebug(lcMqttConnection) << "TopicAlias receive: overflow." ; |
1620 | closeConnection(error: QMqttClient::ProtocolViolation); |
1621 | return; |
1622 | } |
1623 | if (topicLength == 0) { // New message on existing topic alias |
1624 | topic = m_receiveAliases.at(i: topicAlias - 1); |
1625 | if (topic.name().isEmpty()) { |
1626 | qCDebug(lcMqttConnection) << "TopicAlias receive: alias for unknown topic." ; |
1627 | closeConnection(error: QMqttClient::ProtocolViolation); |
1628 | return; |
1629 | } |
1630 | qCDebug(lcMqttConnectionVerbose) << "TopicAlias receive: Using " << topicAlias; |
1631 | } else { // Resetting a topic alias |
1632 | qCDebug(lcMqttConnection) << "TopicAlias receive: Resetting:" << topic.name() << " : " << topicAlias; |
1633 | m_receiveAliases[topicAlias - 1] = topic; |
1634 | } |
1635 | } |
1636 | |
1637 | // message |
1638 | const quint64 payloadLength = quint64(m_missingData); |
1639 | const QByteArray message = readBuffer(size: payloadLength); |
1640 | m_missingData -= payloadLength; |
1641 | |
1642 | qCDebug(lcMqttConnectionVerbose) << "Finalize PUBLISH: topic:" << topic |
1643 | << " payloadLength:" << payloadLength;; |
1644 | |
1645 | emit m_clientPrivate->m_client->messageReceived(message, topic); |
1646 | |
1647 | QMqttMessage qmsg(topic, message, id, m_currentPublish.qos, |
1648 | m_currentPublish.dup, m_currentPublish.retain); |
1649 | qmsg.d->m_publishProperties = publishProperties; |
1650 | |
1651 | if (id != 0) { |
1652 | QMqttMessageStatusProperties statusProp; |
1653 | statusProp.data->userProperties = publishProperties.userProperties(); |
1654 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Published, properties: statusProp); |
1655 | } |
1656 | |
1657 | // Store subscriptions in a temporary container as each messageReceived is allowed to subscribe |
1658 | // again and thus invalid the iterator of the loop. |
1659 | QList<QMqttSubscription *> subscribers; |
1660 | for (const auto [key, value] : m_activeSubscriptions.asKeyValueRange()) { |
1661 | if (key.match(name: topic)) |
1662 | subscribers.append(t: value); |
1663 | } |
1664 | for (const auto &s : subscribers) |
1665 | emit s->messageReceived(msg: qmsg); |
1666 | |
1667 | if (m_currentPublish.qos == 1) |
1668 | sendControlPublishAcknowledge(id); |
1669 | else if (m_currentPublish.qos == 2) |
1670 | sendControlPublishReceive(id); |
1671 | } |
1672 | |
1673 | void QMqttConnection::finalize_pubAckRecRelComp() |
1674 | { |
1675 | qCDebug(lcMqttConnectionVerbose) << "Finalize PUBACK/REC/REL/COMP" ; |
1676 | const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData); |
1677 | |
1678 | QMqttMessageStatusProperties properties; |
1679 | if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) { |
1680 | // Reason Code (1byte) |
1681 | const quint8 reasonCode = readBufferTyped<quint8>(dataSize: &m_missingData); |
1682 | properties.data->reasonCode = QMqtt::ReasonCode(reasonCode); |
1683 | |
1684 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBACK || (m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) { |
1685 | // 3.4.2.1, 3.5.2.1 |
1686 | switch (QMqtt::ReasonCode(reasonCode)) { |
1687 | case QMqtt::ReasonCode::Success: |
1688 | case QMqtt::ReasonCode::NoMatchingSubscriber: |
1689 | case QMqtt::ReasonCode::UnspecifiedError: |
1690 | case QMqtt::ReasonCode::ImplementationSpecificError: |
1691 | case QMqtt::ReasonCode::NotAuthorized: |
1692 | case QMqtt::ReasonCode::InvalidTopicName: |
1693 | case QMqtt::ReasonCode::MessageIdInUse: |
1694 | case QMqtt::ReasonCode::QuotaExceeded: |
1695 | case QMqtt::ReasonCode::InvalidPayloadFormat: |
1696 | break; |
1697 | default: |
1698 | qCWarning(lcMqttConnection) << "Received illegal PUBACK/REC reason code:" << reasonCode; |
1699 | closeConnection(error: QMqttClient::ProtocolViolation); |
1700 | return; |
1701 | } |
1702 | } else { |
1703 | // 3.6.2.1, 3.7.2.1 |
1704 | switch (QMqtt::ReasonCode(reasonCode)) { |
1705 | case QMqtt::ReasonCode::Success: |
1706 | case QMqtt::ReasonCode::MessageIdNotFound: |
1707 | break; |
1708 | default: |
1709 | qCWarning(lcMqttConnection) << "Received illegal PUBREL/COMP reason code:" << reasonCode; |
1710 | closeConnection(error: QMqttClient::ProtocolViolation); |
1711 | return; |
1712 | } |
1713 | } |
1714 | |
1715 | readMessageStatusProperties(properties); |
1716 | } |
1717 | |
1718 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREL) { |
1719 | qCDebug(lcMqttConnectionVerbose) << " PUBREL:" << id; |
1720 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Released, properties); |
1721 | sendControlPublishComp(id); |
1722 | return; |
1723 | } |
1724 | |
1725 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBCOMP) { |
1726 | qCDebug(lcMqttConnectionVerbose) << " PUBCOMP:" << id; |
1727 | auto pendingRelease = m_pendingReleaseMessages.take(key: id); |
1728 | if (!pendingRelease) |
1729 | qCDebug(lcMqttConnection) << "Received PUBCOMP for unknown released message." ; |
1730 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Completed, properties); |
1731 | emit m_clientPrivate->m_client->messageSent(id); |
1732 | return; |
1733 | } |
1734 | |
1735 | auto pendingMsg = m_pendingMessages.take(key: id); |
1736 | if (!pendingMsg) { |
1737 | qCDebug(lcMqttConnection) << "Received PUBACK for unknown message: " << id; |
1738 | return; |
1739 | } |
1740 | if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) { |
1741 | qCDebug(lcMqttConnectionVerbose) << " PUBREC:" << id; |
1742 | m_pendingReleaseMessages.insert(key: id, value: pendingMsg); |
1743 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Received, properties); |
1744 | sendControlPublishRelease(id); |
1745 | } else { |
1746 | qCDebug(lcMqttConnectionVerbose) << " PUBACK:" << id; |
1747 | emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Acknowledged, properties); |
1748 | emit m_clientPrivate->m_client->messageSent(id); |
1749 | } |
1750 | } |
1751 | |
1752 | void QMqttConnection::finalize_pingresp() |
1753 | { |
1754 | qCDebug(lcMqttConnectionVerbose) << "Finalize PINGRESP" ; |
1755 | const quint8 v = readBufferTyped<quint8>(dataSize: &m_missingData); |
1756 | |
1757 | if (v != 0) { |
1758 | qCDebug(lcMqttConnection) << "Received a PINGRESP including payload." ; |
1759 | closeConnection(error: QMqttClient::ProtocolViolation); |
1760 | return; |
1761 | } |
1762 | m_pingTimeout--; |
1763 | emit m_clientPrivate->m_client->pingResponseReceived(); |
1764 | } |
1765 | |
1766 | bool QMqttConnection::processDataHelper() |
1767 | { |
1768 | if (m_missingData > 0) { |
1769 | if ((m_readBuffer.size() - m_readPosition) < m_missingData) |
1770 | return false; |
1771 | |
1772 | switch (m_currentPacket & 0xF0) { |
1773 | case QMqttControlPacket::AUTH: |
1774 | finalize_auth(); |
1775 | break; |
1776 | case QMqttControlPacket::CONNACK: |
1777 | finalize_connack(); |
1778 | break; |
1779 | case QMqttControlPacket::SUBACK: |
1780 | finalize_suback(); |
1781 | break; |
1782 | case QMqttControlPacket::UNSUBACK: |
1783 | finalize_unsuback(); |
1784 | break; |
1785 | case QMqttControlPacket::PUBLISH: |
1786 | finalize_publish(); |
1787 | break; |
1788 | case QMqttControlPacket::PUBACK: |
1789 | case QMqttControlPacket::PUBREC: |
1790 | case QMqttControlPacket::PUBREL: |
1791 | case QMqttControlPacket::PUBCOMP: |
1792 | finalize_pubAckRecRelComp(); |
1793 | break; |
1794 | case QMqttControlPacket::PINGRESP: |
1795 | finalize_pingresp(); |
1796 | break; |
1797 | default: |
1798 | qCDebug(lcMqttConnection) << "Unknown packet to finalize." ; |
1799 | closeConnection(error: QMqttClient::ProtocolViolation); |
1800 | break; |
1801 | } |
1802 | |
1803 | if (m_internalState == BrokerDisconnected) |
1804 | return false; |
1805 | |
1806 | Q_ASSERT(m_missingData == 0); |
1807 | |
1808 | m_readBuffer = m_readBuffer.mid(index: m_readPosition); |
1809 | m_readPosition = 0; |
1810 | } |
1811 | |
1812 | // MQTT-2.2 A fixed header of a control packet must be at least 2 bytes. If the payload is |
1813 | // longer than 127 bytes the header can be up to 5 bytes long. |
1814 | switch (m_readBuffer.size()) { |
1815 | case 0: |
1816 | case 1: |
1817 | return false; |
1818 | case 2: |
1819 | if ((m_readBuffer.at(i: 1) & 128) != 0) |
1820 | return false; |
1821 | break; |
1822 | case 3: |
1823 | if ((m_readBuffer.at(i: 1) & 128) != 0 && (m_readBuffer.at(i: 2) & 128) != 0) |
1824 | return false; |
1825 | break; |
1826 | case 4: |
1827 | if ((m_readBuffer.at(i: 1) & 128) != 0 && (m_readBuffer.at(i: 2) & 128) != 0 && (m_readBuffer.at(i: 3) & 128) != 0) |
1828 | return false; |
1829 | break; |
1830 | default: |
1831 | break; |
1832 | } |
1833 | |
1834 | readBuffer(data: reinterpret_cast<char *>(&m_currentPacket), size: 1); |
1835 | |
1836 | switch (m_currentPacket & 0xF0) { |
1837 | case QMqttControlPacket::CONNACK: { |
1838 | qCDebug(lcMqttConnectionVerbose) << "Received CONNACK" ; |
1839 | if (m_internalState != BrokerWaitForConnectAck) { |
1840 | qCDebug(lcMqttConnection) << "Received CONNACK at unexpected time." ; |
1841 | closeConnection(error: QMqttClient::ProtocolViolation); |
1842 | return false; |
1843 | } |
1844 | |
1845 | qint32 payloadSize = readVariableByteInteger(); |
1846 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
1847 | if (payloadSize != 2) { |
1848 | qCDebug(lcMqttConnection) << "Unexpected FRAME size in CONNACK." ; |
1849 | closeConnection(error: QMqttClient::ProtocolViolation); |
1850 | return false; |
1851 | } |
1852 | } |
1853 | m_missingData = payloadSize; |
1854 | break; |
1855 | } |
1856 | case QMqttControlPacket::SUBACK: { |
1857 | qCDebug(lcMqttConnectionVerbose) << "Received SUBACK" ; |
1858 | const quint8 remaining = readBufferTyped<quint8>(); |
1859 | m_missingData = remaining; |
1860 | break; |
1861 | } |
1862 | case QMqttControlPacket::PUBLISH: { |
1863 | qCDebug(lcMqttConnectionVerbose) << "Received PUBLISH" ; |
1864 | m_currentPublish.dup = m_currentPacket & 0x08; |
1865 | m_currentPublish.qos = (m_currentPacket & 0x06) >> 1; |
1866 | m_currentPublish.retain = m_currentPacket & 0x01; |
1867 | if ((m_currentPublish.qos == 0 && m_currentPublish.dup != 0) |
1868 | || m_currentPublish.qos > 2) { |
1869 | closeConnection(error: QMqttClient::ProtocolViolation); |
1870 | return false; |
1871 | } |
1872 | |
1873 | m_missingData = readVariableByteInteger(); |
1874 | if (m_missingData == -1) |
1875 | return false; // Connection closed inside readVariableByteInteger |
1876 | break; |
1877 | } |
1878 | case QMqttControlPacket::PINGRESP: |
1879 | qCDebug(lcMqttConnectionVerbose) << "Received PINGRESP" ; |
1880 | m_missingData = 1; |
1881 | break; |
1882 | |
1883 | |
1884 | case QMqttControlPacket::PUBREL: { |
1885 | qCDebug(lcMqttConnectionVerbose) << "Received PUBREL" ; |
1886 | const quint8 remaining = readBufferTyped<quint8>(); |
1887 | if (remaining != 0x02) { |
1888 | qCDebug(lcMqttConnection) << "Received 2 byte message with invalid remaining length." ; |
1889 | closeConnection(error: QMqttClient::ProtocolViolation); |
1890 | return false; |
1891 | } |
1892 | if ((m_currentPacket & 0x0F) != 0x02) { |
1893 | qCDebug(lcMqttConnection) << "Malformed fixed header for PUBREL." ; |
1894 | closeConnection(error: QMqttClient::ProtocolViolation); |
1895 | return false; |
1896 | } |
1897 | m_missingData = 2; |
1898 | break; |
1899 | } |
1900 | |
1901 | case QMqttControlPacket::UNSUBACK: |
1902 | case QMqttControlPacket::PUBACK: |
1903 | case QMqttControlPacket::PUBREC: |
1904 | case QMqttControlPacket::PUBCOMP: { |
1905 | qCDebug(lcMqttConnectionVerbose) << "Received UNSUBACK/PUBACK/PUBREC/PUBCOMP" ; |
1906 | if ((m_currentPacket & 0x0F) != 0) { |
1907 | qCDebug(lcMqttConnection) << "Malformed fixed header for UNSUBACK/PUBACK/PUBREC/PUBCOMP." ; |
1908 | closeConnection(error: QMqttClient::ProtocolViolation); |
1909 | return false; |
1910 | } |
1911 | const quint8 remaining = readBufferTyped<quint8>(); |
1912 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0 && remaining != 0x02) { |
1913 | qCDebug(lcMqttConnection) << "Received 2 byte message with invalid remaining length." ; |
1914 | closeConnection(error: QMqttClient::ProtocolViolation); |
1915 | return false; |
1916 | } |
1917 | m_missingData = remaining; |
1918 | break; |
1919 | } |
1920 | case QMqttControlPacket::AUTH: |
1921 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
1922 | qCDebug(lcMqttConnection) << "Received unknown command." ; |
1923 | closeConnection(error: QMqttClient::ProtocolViolation); |
1924 | return false; |
1925 | } |
1926 | qCDebug(lcMqttConnectionVerbose) << "Received AUTH" ; |
1927 | if ((m_currentPacket & 0x0F) != 0) { |
1928 | qCDebug(lcMqttConnection) << "Malformed fixed header for AUTH." ; |
1929 | closeConnection(error: QMqttClient::ProtocolViolation); |
1930 | return false; |
1931 | } |
1932 | m_missingData = readVariableByteInteger(); |
1933 | if (m_missingData == -1) |
1934 | return false; // Connection closed inside readVariableByteInteger |
1935 | break; |
1936 | case QMqttControlPacket::DISCONNECT: |
1937 | if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) { |
1938 | qCDebug(lcMqttConnection) << "Received unknown command." ; |
1939 | closeConnection(error: QMqttClient::ProtocolViolation); |
1940 | return false; |
1941 | } |
1942 | qCDebug(lcMqttConnectionVerbose) << "Received DISCONNECT" ; |
1943 | if ((m_currentPacket & 0x0F) != 0) { |
1944 | qCDebug(lcMqttConnection) << "Malformed fixed header for DISCONNECT." ; |
1945 | closeConnection(error: QMqttClient::ProtocolViolation); |
1946 | return false; |
1947 | } |
1948 | if (m_internalState != BrokerConnected) { |
1949 | qCDebug(lcMqttConnection) << "Received DISCONNECT at unexpected time." ; |
1950 | closeConnection(error: QMqttClient::ProtocolViolation); |
1951 | return false; |
1952 | } |
1953 | closeConnection(error: QMqttClient::NoError); |
1954 | return false; |
1955 | default: |
1956 | qCDebug(lcMqttConnection) << "Received unknown command." ; |
1957 | closeConnection(error: QMqttClient::ProtocolViolation); |
1958 | return false; |
1959 | } |
1960 | |
1961 | /* set current command CONNACK - PINGRESP */ |
1962 | /* read command size */ |
1963 | /* calculate missing_data */ |
1964 | return true; // reiterate. implicitly finishes and enqueues |
1965 | } |
1966 | |
1967 | void QMqttConnection::processData() |
1968 | { |
1969 | while (processDataHelper()) |
1970 | ; |
1971 | } |
1972 | |
1973 | bool QMqttConnection::writePacketToTransport(const QMqttControlPacket &p) |
1974 | { |
1975 | const QByteArray writeData = p.serialize(); |
1976 | qCDebug(lcMqttConnectionVerbose) << Q_FUNC_INFO << " DataSize:" << writeData.size(); |
1977 | const qint64 res = m_transport->write(data: writeData.constData(), len: writeData.size()); |
1978 | if (Q_UNLIKELY(res == -1)) { |
1979 | qCDebug(lcMqttConnection) << "Could not write frame to transport." ; |
1980 | return false; |
1981 | } |
1982 | return true; |
1983 | } |
1984 | |
1985 | QT_END_NAMESPACE |
1986 | |