1// Copyright (C) 2017 The Qt Company Ltd.
2// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only
3
4#include "qmqttconnection_p.h"
5#include "qmqttconnectionproperties_p.h"
6#include "qmqttcontrolpacket_p.h"
7#include "qmqttmessage_p.h"
8#include "qmqttpublishproperties_p.h"
9#include "qmqttsubscription_p.h"
10#include "qmqttclient_p.h"
11
12#include <QtCore/QLoggingCategory>
13#include <QtNetwork/QSslSocket>
14#include <QtNetwork/QTcpSocket>
15
16#include <limits>
17#include <cstdint>
18
19QT_BEGIN_NAMESPACE
20
21Q_LOGGING_CATEGORY(lcMqttConnection, "qt.mqtt.connection")
22Q_LOGGING_CATEGORY(lcMqttConnectionVerbose, "qt.mqtt.connection.verbose");
23
24template <typename T>
25T QMqttConnection::readBufferTyped(qint64 *dataSize)
26{
27 Q_STATIC_ASSERT(std::is_integral<T>::value);
28
29 T result = 0;
30 if (Q_UNLIKELY(dataSize != nullptr && *dataSize < qint64(sizeof(result)))) {
31 qCWarning(lcMqttConnection) << "Attempt to read past the data";
32 return result;
33 }
34 if (readBuffer(data: reinterpret_cast<char *>(&result), size: sizeof(result)) && dataSize != nullptr)
35 *dataSize -= sizeof(result);
36 return qFromBigEndian(result);
37}
38
39template<>
40QByteArray QMqttConnection::readBufferTyped(qint64 *dataSize)
41{
42 const quint16 size = readBufferTyped<quint16>(dataSize);
43 if (Q_UNLIKELY(dataSize != nullptr && *dataSize < qint64(size))) {
44 qCWarning(lcMqttConnection) << "Attempt to read past the data";
45 return QByteArray();
46 }
47 QByteArray ba(int(size), Qt::Uninitialized);
48 if (readBuffer(data: ba.data(), size) && dataSize != nullptr)
49 *dataSize -= size;
50 return ba;
51}
52
53template<>
54QString QMqttConnection::readBufferTyped(qint64 *dataSize)
55{
56 return QString::fromUtf8(ba: readBufferTyped<QByteArray>(dataSize));
57}
58
59QMqttConnection::QMqttConnection(QObject *parent) : QObject(parent)
60{
61}
62
63QMqttConnection::~QMqttConnection()
64{
65 if (m_internalState == BrokerConnected)
66 sendControlDisconnect();
67
68 if (m_ownTransport && m_transport)
69 delete m_transport;
70}
71
72void QMqttConnection::timerEvent(QTimerEvent *event)
73{
74 if (Q_LIKELY(event->timerId() == m_pingTimer.timerId())) {
75 sendControlPingRequest();
76 return;
77 }
78
79 QObject::timerEvent(event);
80}
81
82void QMqttConnection::setTransport(QIODevice *device, QMqttClient::TransportType transport)
83{
84 qCDebug(lcMqttConnection) << Q_FUNC_INFO << device << " Type:" << transport;
85
86 if (m_transport) {
87 disconnect(sender: m_transport, signal: &QIODevice::aboutToClose, receiver: this, slot: &QMqttConnection::transportConnectionClosed);
88 disconnect(sender: m_transport, signal: &QIODevice::readyRead, receiver: this, slot: &QMqttConnection::transportReadyRead);
89 if (m_ownTransport)
90 delete m_transport;
91 }
92
93 m_transport = device;
94 m_transportType = transport;
95 m_ownTransport = false;
96
97 connect(sender: m_transport, signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed);
98 connect(sender: m_transport, signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead);
99}
100
101QIODevice *QMqttConnection::transport() const
102{
103 return m_transport;
104}
105
106bool QMqttConnection::ensureTransport(bool createSecureIfNeeded)
107{
108 Q_UNUSED(createSecureIfNeeded); // QT_NO_SSL
109 qCDebug(lcMqttConnection) << Q_FUNC_INFO << m_transport;
110
111 if (m_transport) {
112 if (m_ownTransport)
113 delete m_transport;
114 else
115 return true;
116 }
117
118 // We are asked to create a transport layer
119 if (m_clientPrivate->m_hostname.isEmpty() || m_clientPrivate->m_port == 0) {
120 qCDebug(lcMqttConnection) << "No hostname specified, not able to create a transport layer.";
121 return false;
122 }
123 auto socket =
124#ifndef QT_NO_SSL
125 createSecureIfNeeded ? new QSslSocket() :
126#endif
127 new QTcpSocket();
128 m_transport = socket;
129 m_ownTransport = true;
130 m_transportType =
131#ifndef QT_NO_SSL
132 createSecureIfNeeded ? QMqttClient::SecureSocket :
133#endif
134 QMqttClient::AbstractSocket;
135
136#ifndef QT_NO_SSL
137 if (QSslSocket *sslSocket = qobject_cast<QSslSocket *>(object: socket))
138 QObject::connect(sender: sslSocket, signal: &QSslSocket::encrypted, context: this, slot: &QMqttConnection::transportConnectionEstablished);
139 else
140#endif
141 connect(sender: socket, signal: &QAbstractSocket::connected, context: this, slot: &QMqttConnection::transportConnectionEstablished);
142 connect(sender: socket, signal: &QAbstractSocket::disconnected, context: this, slot: &QMqttConnection::transportConnectionClosed);
143 connect(sender: socket, signal: &QAbstractSocket::errorOccurred, context: this, slot: &QMqttConnection::transportError);
144
145 connect(sender: m_transport, signal: &QIODevice::aboutToClose, context: this, slot: &QMqttConnection::transportConnectionClosed);
146 connect(sender: m_transport, signal: &QIODevice::readyRead, context: this, slot: &QMqttConnection::transportReadyRead);
147 return true;
148}
149
150bool QMqttConnection::ensureTransportOpen(const QString &sslPeerName)
151{
152 qCDebug(lcMqttConnection) << Q_FUNC_INFO << m_transportType;
153
154 if (m_transportType == QMqttClient::IODevice) {
155 if (m_transport->isOpen())
156 return sendControlConnect();
157
158 if (!m_transport->open(mode: QIODevice::ReadWrite)) {
159 qCDebug(lcMqttConnection) << "Could not open Transport IO device.";
160 m_internalState = BrokerDisconnected;
161 return false;
162 }
163 return sendControlConnect();
164 }
165
166 if (m_transportType == QMqttClient::AbstractSocket) {
167 auto socket = qobject_cast<QTcpSocket *>(object: m_transport);
168 Q_ASSERT(socket);
169 if (socket->state() == QAbstractSocket::ConnectedState)
170 return sendControlConnect();
171
172 m_internalState = BrokerConnecting;
173 socket->connectToHost(hostName: m_clientPrivate->m_hostname, port: m_clientPrivate->m_port);
174 }
175#ifndef QT_NO_SSL
176 else if (m_transportType == QMqttClient::SecureSocket) {
177 auto socket = qobject_cast<QSslSocket *>(object: m_transport);
178 Q_ASSERT(socket);
179 if (socket->state() == QAbstractSocket::ConnectedState)
180 return sendControlConnect();
181
182 m_internalState = BrokerConnecting;
183 if (!m_sslConfiguration.isNull())
184 socket->setSslConfiguration(m_sslConfiguration);
185 socket->connectToHostEncrypted(hostName: m_clientPrivate->m_hostname, port: m_clientPrivate->m_port, sslPeerName);
186 }
187#else
188 Q_UNUSED(sslPeerName);
189#endif
190
191 return true;
192}
193
194bool QMqttConnection::sendControlConnect()
195{
196 qCDebug(lcMqttConnection) << Q_FUNC_INFO;
197
198 QMqttControlPacket packet(QMqttControlPacket::CONNECT);
199
200 // Variable header
201 // 3.1.2.1 Protocol Name
202 // 3.1.2.2 Protocol Level
203 switch (m_clientPrivate->m_protocolVersion) {
204 case QMqttClient::MQTT_3_1:
205 packet.append(data: "MQIsdp");
206 packet.append(value: char(3)); // Version 3.1
207 break;
208 case QMqttClient::MQTT_3_1_1:
209 packet.append(data: "MQTT");
210 packet.append(value: char(4)); // Version 3.1.1
211 break;
212 case QMqttClient::MQTT_5_0:
213 packet.append(data: "MQTT");
214 packet.append(value: char(5)); // Version 5.0
215 break;
216 }
217
218 // 3.1.2.3 Connect Flags
219 quint8 flags = 0;
220 // Clean session
221 if (m_clientPrivate->m_cleanSession)
222 flags |= 1 << 1;
223
224 if (!m_clientPrivate->m_willTopic.isEmpty()) {
225 flags |= 1 << 2;
226 if (m_clientPrivate->m_willQoS > 2) {
227 qCDebug(lcMqttConnection) << "Invalid Will QoS specified.";
228 return false;
229 }
230 if (m_clientPrivate->m_willQoS == 1)
231 flags |= 1 << 3;
232 else if (m_clientPrivate->m_willQoS == 2)
233 flags |= 1 << 4;
234 if (m_clientPrivate->m_willRetain)
235 flags |= 1 << 5;
236 }
237 if (m_clientPrivate->m_username.size())
238 flags |= 1 << 7;
239
240 if (m_clientPrivate->m_password.size())
241 flags |= 1 << 6;
242
243 packet.append(value: char(flags));
244
245 // 3.1.2.10 Keep Alive
246 packet.append(value: m_clientPrivate->m_keepAlive);
247
248 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
249 packet.appendRaw(data: writeConnectProperties());
250
251 // 3.1.3 Payload
252 // 3.1.3.1 Client Identifier
253 const QByteArray clientStringArray = m_clientPrivate->m_clientId.toUtf8();
254 if (clientStringArray.size()) {
255 packet.append(data: clientStringArray);
256 } else {
257 packet.append(value: char(0));
258 packet.append(value: char(0));
259 }
260
261 if (!m_clientPrivate->m_willTopic.isEmpty()) {
262 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
263 packet.appendRaw(data: writeLastWillProperties());
264
265 packet.append(data: m_clientPrivate->m_willTopic.toUtf8());
266 packet.append(data: m_clientPrivate->m_willMessage);
267 }
268
269 if (m_clientPrivate->m_username.size())
270 packet.append(data: m_clientPrivate->m_username.toUtf8());
271
272 if (m_clientPrivate->m_password.size())
273 packet.append(data: m_clientPrivate->m_password.toUtf8());
274
275 m_internalState = BrokerWaitForConnectAck;
276 m_missingData = 0;
277
278 if (!writePacketToTransport(p: packet)) {
279 qCDebug(lcMqttConnection) << "Could not write CONNECT frame to transport.";
280 return false;
281 }
282 return true;
283}
284
285bool QMqttConnection::sendControlAuthenticate(const QMqttAuthenticationProperties &properties)
286{
287 qCDebug(lcMqttConnection) << Q_FUNC_INFO;
288
289 QMqttControlPacket packet(QMqttControlPacket::AUTH);
290
291 switch (m_internalState) {
292 case BrokerDisconnected:
293 case BrokerConnecting:
294 case ClientDestruction:
295 qCDebug(lcMqttConnection) << "Using AUTH while disconnected.";
296 return false;
297 case BrokerWaitForConnectAck:
298 qCDebug(lcMqttConnection) << "AUTH while connecting, set continuation flag.";
299 packet.append(value: char(QMqtt::ReasonCode::ContinueAuthentication));
300 break;
301 case BrokerConnected:
302 qCDebug(lcMqttConnection) << "AUTH while connected, initiate re-authentication.";
303 packet.append(value: char(QMqtt::ReasonCode::ReAuthenticate));
304 break;
305 }
306
307 packet.appendRaw(data: writeAuthenticationProperties(properties));
308
309 if (!writePacketToTransport(p: packet)) {
310 qCDebug(lcMqttConnection) << "Could not write AUTH frame to transport.";
311 return false;
312 }
313
314 return true;
315}
316
317qint32 QMqttConnection::sendControlPublish(const QMqttTopicName &topic,
318 const QByteArray &message,
319 quint8 qos,
320 bool retain,
321 const QMqttPublishProperties &properties)
322{
323 qCDebug(lcMqttConnection) << Q_FUNC_INFO << topic << " Size:" << message.size() << " bytes."
324 << "QoS:" << qos << " Retain:" << retain;
325
326 if (!topic.isValid())
327 return -1;
328
329 quint8 header = QMqttControlPacket::PUBLISH;
330 if (qos == 1)
331 header |= 0x02;
332 else if (qos == 2)
333 header |= 0x04;
334
335 if (retain)
336 header |= 0x01;
337
338 QSharedPointer<QMqttControlPacket> packet(new QMqttControlPacket(header));
339 // topic alias
340 QMqttPublishProperties publishProperties(properties);
341 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
342 // 3.3.4 A PUBLISH packet sent from a Client to a Server MUST NOT contain a Subscription Identifier
343 if (publishProperties.availableProperties() & QMqttPublishProperties::SubscriptionIdentifier) {
344 qCWarning(lcMqttConnection) << "SubscriptionIdentifier must not be specified for publish.";
345 return -1;
346 }
347
348 const quint16 topicAlias = publishProperties.topicAlias();
349 if (topicAlias > 0) { // User specified topic Alias
350 if (topicAlias > m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()) {
351 qCDebug(lcMqttConnection) << "TopicAlias publish: overflow.";
352 return -1;
353 }
354 if (m_publishAliases.at(i: topicAlias - 1) != topic) {
355 qCDebug(lcMqttConnection) << "TopicAlias publish: Assign:" << topicAlias << ":" << topic;
356 m_publishAliases[topicAlias - 1] = topic;
357 packet->append(data: topic.name().toUtf8());
358 } else {
359 qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: Reuse:" << topicAlias;
360 packet->append(value: quint16(0));
361 }
362 } else if (m_publishAliases.size() > 0) { // Automatic module alias assignment
363 int autoAlias = m_publishAliases.indexOf(t: topic);
364 if (autoAlias != -1) {
365 qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: Use auto alias:" << autoAlias;
366 packet->append(value: quint16(0));
367 publishProperties.setTopicAlias(quint16(autoAlias + 1));
368 } else {
369 autoAlias = m_publishAliases.indexOf(t: QMqttTopicName());
370 if (autoAlias != -1) {
371 qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: auto alias assignment:" << autoAlias;
372 m_publishAliases[autoAlias] = topic;
373 publishProperties.setTopicAlias(quint16(autoAlias) + 1);
374 } else
375 qCDebug(lcMqttConnectionVerbose) << "TopicAlias publish: alias storage full, using full topic";
376 packet->append(data: topic.name().toUtf8());
377 }
378 } else {
379 packet->append(data: topic.name().toUtf8());
380 }
381 } else { // ! MQTT_5_0
382 packet->append(data: topic.name().toUtf8());
383 }
384 quint16 identifier = 0;
385 if (qos > 0) {
386 identifier = unusedPacketIdentifier();
387 packet->append(value: identifier);
388 m_pendingMessages.insert(key: identifier, value: packet);
389 }
390
391 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
392 packet->appendRaw(data: writePublishProperties(properties: publishProperties));
393
394 packet->appendRaw(data: message);
395
396 const bool written = writePacketToTransport(p: *packet.data());
397
398 if (!written && qos > 0)
399 m_pendingMessages.remove(key: identifier);
400 return written ? identifier : -1;
401}
402
403bool QMqttConnection::sendControlPublishAcknowledge(quint16 id)
404{
405 qCDebug(lcMqttConnection) << Q_FUNC_INFO << id;
406 QMqttControlPacket packet(QMqttControlPacket::PUBACK);
407 packet.append(value: id);
408 return writePacketToTransport(p: packet);
409}
410
411bool QMqttConnection::sendControlPublishRelease(quint16 id)
412{
413 qCDebug(lcMqttConnection) << Q_FUNC_INFO << id;
414 quint8 header = QMqttControlPacket::PUBREL;
415 header |= 0x02; // MQTT-3.6.1-1
416
417 QMqttControlPacket packet(header);
418 packet.append(value: id);
419 return writePacketToTransport(p: packet);
420}
421
422bool QMqttConnection::sendControlPublishReceive(quint16 id)
423{
424 qCDebug(lcMqttConnection) << Q_FUNC_INFO << id;
425 QMqttControlPacket packet(QMqttControlPacket::PUBREC);
426 packet.append(value: id);
427 return writePacketToTransport(p: packet);
428}
429
430bool QMqttConnection::sendControlPublishComp(quint16 id)
431{
432 qCDebug(lcMqttConnection) << Q_FUNC_INFO << id;
433 QMqttControlPacket packet(QMqttControlPacket::PUBCOMP);
434 packet.append(value: id);
435 return writePacketToTransport(p: packet);
436}
437
438QMqttSubscription *QMqttConnection::sendControlSubscribe(const QMqttTopicFilter &topic,
439 quint8 qos,
440 const QMqttSubscriptionProperties &properties)
441{
442 qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic << " qos:" << qos;
443
444 // Overflow protection
445 if (Q_UNLIKELY(!topic.isValid())) {
446 qCWarning(lcMqttConnection) << "Invalid subscription topic filter.";
447 return nullptr;
448 }
449
450 if (Q_UNLIKELY(qos > 2)) {
451 qCWarning(lcMqttConnection) << "Invalid subscription QoS.";
452 return nullptr;
453 }
454
455 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
456 const QString sharedSubscriptionName = topic.sharedSubscriptionName();
457 if (!sharedSubscriptionName.isEmpty()) {
458 const QMqttTopicFilter filter(topic.filter().section(asep: QLatin1Char('/'), astart: 2));
459 auto it = m_activeSubscriptions.constFind(key: filter);
460 if (it != m_activeSubscriptions.cend() && (*it)->sharedSubscriptionName() == sharedSubscriptionName)
461 return *it;
462 } else {
463 auto it = m_activeSubscriptions.constFind(key: topic);
464 if (it != m_activeSubscriptions.cend() && !(*it)->isSharedSubscription())
465 return *it;
466 }
467 } else {
468 auto it = m_activeSubscriptions.constFind(key: topic);
469 if (it != m_activeSubscriptions.cend())
470 return *it;
471 }
472
473 // has to have 0010 as bits 3-0, maybe update SUBSCRIBE instead?
474 // MQTT-3.8.1-1
475 const quint8 header = QMqttControlPacket::SUBSCRIBE + 0x02;
476 QMqttControlPacket packet(header);
477
478 // Add Packet Identifier
479 const quint16 identifier = unusedPacketIdentifier();
480
481 packet.append(value: identifier);
482
483 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
484 packet.appendRaw(data: writeSubscriptionProperties(properties));
485
486 packet.append(data: topic.filter().toUtf8());
487 char options = char(qos);
488 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && properties.noLocal())
489 options |= 1 << 2;
490 packet.append(value: options);
491
492 auto result = new QMqttSubscription(this);
493 result->setTopic(topic);
494 result->setClient(m_clientPrivate->m_client);
495 result->setQos(qos);
496 result->setState(QMqttSubscription::SubscriptionPending);
497 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && !topic.sharedSubscriptionName().isEmpty()) {
498 result->setSharedSubscriptionName(topic.sharedSubscriptionName());
499 result->setSharedSubscription(true);
500 result->setTopic(topic.filter().section(asep: QLatin1Char('/'), astart: 2));
501 }
502
503 if (!writePacketToTransport(p: packet)) {
504 delete result;
505 return nullptr;
506 }
507
508 // SUBACK must contain identifier MQTT-3.8.4-2
509 m_pendingSubscriptionAck.insert(key: identifier, value: result);
510 m_activeSubscriptions.insert(key: result->topic(), value: result);
511 return result;
512}
513
514bool QMqttConnection::sendControlUnsubscribe(const QMqttTopicFilter &topic, const QMqttUnsubscriptionProperties &properties)
515{
516 qCDebug(lcMqttConnection) << Q_FUNC_INFO << " Topic:" << topic;
517
518 // MQTT-3.10.3-2
519 if (!topic.isValid())
520 return false;
521
522 if (!m_activeSubscriptions.contains(key: topic))
523 return false;
524
525 if (m_internalState != QMqttConnection::BrokerConnected) {
526 m_activeSubscriptions.remove(key: topic);
527 return true;
528 }
529
530 // has to have 0010 as bits 3-0, maybe update UNSUBSCRIBE instead?
531 // MQTT-3.10.1-1
532 const quint8 header = QMqttControlPacket::UNSUBSCRIBE + 0x02;
533 QMqttControlPacket packet(header);
534
535 // Add Packet Identifier
536 const quint16 identifier = unusedPacketIdentifier();
537
538 packet.append(value: identifier);
539
540 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
541 packet.appendRaw(data: writeUnsubscriptionProperties(properties));
542 }
543
544 packet.append(data: topic.filter().toUtf8());
545 auto sub = m_activeSubscriptions[topic];
546 sub->setState(QMqttSubscription::UnsubscriptionPending);
547
548 if (!writePacketToTransport(p: packet))
549 return false;
550
551 // Do not remove from m_activeSubscriptions as there might be QoS1/2 messages to still
552 // be sent before UNSUBSCRIBE is acknowledged.
553 m_pendingUnsubscriptions.insert(key: identifier, value: sub);
554
555 return true;
556}
557
558bool QMqttConnection::sendControlPingRequest(bool isAuto)
559{
560 qCDebug(lcMqttConnection) << Q_FUNC_INFO;
561
562 if (m_internalState != QMqttConnection::BrokerConnected)
563 return false;
564
565
566 if (!isAuto && m_clientPrivate->m_autoKeepAlive) {
567 qCDebug(lcMqttConnection) << "Requesting a manual ping while autoKeepAlive is enabled "
568 << "is not allowed.";
569 return false;
570 }
571
572 // 3.1.2.10 If a Client does not receive a PINGRESP packet within a reasonable amount of time
573 // after it has sent a PINGREQ, it SHOULD close the Network Connection to the Server
574 if (m_pingTimeout > 1) {
575 closeConnection(error: QMqttClient::ServerUnavailable);
576 return false;
577 }
578
579 const QMqttControlPacket packet(QMqttControlPacket::PINGREQ);
580 if (!writePacketToTransport(p: packet)) {
581 qCDebug(lcMqttConnection) << "Failed to write PINGREQ to transport.";
582 return false;
583 }
584 m_pingTimeout++;
585 return true;
586}
587
588bool QMqttConnection::sendControlDisconnect()
589{
590 qCDebug(lcMqttConnection) << Q_FUNC_INFO;
591
592 m_pingTimer.stop();
593 m_pingTimeout = 0;
594
595 m_activeSubscriptions.clear();
596
597 m_receiveAliases.clear();
598 m_publishAliases.clear();
599
600 const QMqttControlPacket packet(QMqttControlPacket::DISCONNECT);
601 if (!writePacketToTransport(p: packet)) {
602 qCDebug(lcMqttConnection) << "Failed to write DISCONNECT to transport.";
603 return false;
604 }
605 if (m_internalState != ClientDestruction)
606 m_internalState = BrokerDisconnected;
607
608 if (m_transport->waitForBytesWritten(msecs: 30000)) {
609 // MQTT-3.14.4-1 must disconnect
610 m_transport->close();
611 return true;
612 }
613 return false;
614}
615
616void QMqttConnection::setClientPrivate(QMqttClientPrivate *clientPrivate)
617{
618 m_clientPrivate = clientPrivate;
619}
620
621quint16 QMqttConnection::unusedPacketIdentifier() const
622{
623 // MQTT-2.3.1-1 Control Packets MUST contain a non-zero 16-bit Packet Identifier
624 static quint16 packetIdentifierCounter = 1;
625 const std::uint16_t u16max = std::numeric_limits<std::uint16_t>::max();
626
627 // MQTT-2.3.1-2 ...it MUST assign it a currently unused Packet Identifier
628 const quint16 lastIdentifier = packetIdentifierCounter;
629 do {
630 if (packetIdentifierCounter == u16max)
631 packetIdentifierCounter = 1;
632 else
633 packetIdentifierCounter++;
634
635 if (lastIdentifier == packetIdentifierCounter) {
636 qCDebug(lcMqttConnection) << "Could not generate unique packet identifier.";
637 break;
638 }
639 } while (m_pendingSubscriptionAck.contains(key: packetIdentifierCounter)
640 || m_pendingUnsubscriptions.contains(key: packetIdentifierCounter)
641 || m_pendingMessages.contains(key: packetIdentifierCounter)
642 || m_pendingReleaseMessages.contains(key: packetIdentifierCounter));
643 return packetIdentifierCounter;
644}
645
646void QMqttConnection::cleanSubscriptions()
647{
648 for (auto item : m_pendingSubscriptionAck)
649 item->setState(QMqttSubscription::Unsubscribed);
650 m_pendingSubscriptionAck.clear();
651
652 for (auto item : m_pendingUnsubscriptions)
653 item->setState(QMqttSubscription::Unsubscribed);
654 m_pendingUnsubscriptions.clear();
655
656 for (auto item : m_activeSubscriptions)
657 item->setState(QMqttSubscription::Unsubscribed);
658 m_activeSubscriptions.clear();
659}
660
661void QMqttConnection::transportConnectionEstablished()
662{
663 if (m_internalState != BrokerConnecting) {
664 qCWarning(lcMqttConnection) << "Connection established at an unexpected time";
665 return;
666 }
667
668 if (!sendControlConnect()) {
669 qCDebug(lcMqttConnection) << "Failed to write CONNECT to transport.";
670 // ### Who disconnects now? Connection or client?
671 m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::TransportInvalid);
672 }
673}
674
675void QMqttConnection::transportConnectionClosed()
676{
677 m_readBuffer.clear();
678 m_readPosition = 0;
679 m_pingTimer.stop();
680 m_pingTimeout = 0;
681 if (m_internalState == ClientDestruction)
682 return;
683 if (m_internalState == BrokerDisconnected) // We manually disconnected
684 m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::NoError);
685 else
686 m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: QMqttClient::TransportInvalid);
687}
688
689void QMqttConnection::transportReadyRead()
690{
691 qCDebug(lcMqttConnectionVerbose) << Q_FUNC_INFO;
692 m_readBuffer.append(a: m_transport->readAll());
693 processData();
694}
695
696void QMqttConnection::transportError(QAbstractSocket::SocketError e)
697{
698 qCDebug(lcMqttConnection) << Q_FUNC_INFO << e;
699 closeConnection(error: QMqttClient::TransportInvalid);
700}
701
702bool QMqttConnection::readBuffer(char *data, quint64 size)
703{
704 if (Q_UNLIKELY(quint64(m_readBuffer.size() - m_readPosition) < size)) {
705 qCDebug(lcMqttConnection) << "Reaching out of buffer, protocol violation";
706 closeConnection(error: QMqttClient::ProtocolViolation);
707 return false;
708 }
709 memcpy(dest: data, src: m_readBuffer.constData() + m_readPosition, n: size);
710 m_readPosition += size;
711 return true;
712}
713
714qint32 QMqttConnection::readVariableByteInteger(qint64 *dataSize)
715{
716 quint32 multiplier = 1;
717 qint32 msgLength = 0;
718 quint8 b = 0;
719 quint8 iteration = 0;
720 do {
721 b = readBufferTyped<quint8>(dataSize);
722 msgLength += (b & 127) * multiplier;
723 multiplier *= 128;
724 iteration++;
725 if (iteration > 4) {
726 qCDebug(lcMqttConnection) << "Overflow trying to read variable integer.";
727 closeConnection(error: QMqttClient::ProtocolViolation);
728 return -1;
729 }
730 } while ((b & 128) != 0);
731 return msgLength;
732}
733
734void QMqttConnection::closeConnection(QMqttClient::ClientError error)
735{
736 m_readBuffer.clear();
737 m_readPosition = 0;
738 m_pingTimer.stop();
739 m_pingTimeout = 0;
740 m_activeSubscriptions.clear();
741 m_internalState = BrokerDisconnected;
742 m_transport->disconnect();
743 m_transport->close();
744 m_clientPrivate->setStateAndError(s: QMqttClient::Disconnected, e: error);
745}
746
747QByteArray QMqttConnection::readBuffer(quint64 size)
748{
749 if (Q_UNLIKELY(quint64(m_readBuffer.size() - m_readPosition) < size)) {
750 qCDebug(lcMqttConnection) << "Reaching out of buffer, protocol violation";
751 closeConnection(error: QMqttClient::ProtocolViolation);
752 return QByteArray();
753 }
754 QByteArray res(m_readBuffer.constData() + m_readPosition, int(size));
755 m_readPosition += size;
756 return res;
757}
758
759void QMqttConnection::readAuthProperties(QMqttAuthenticationProperties &properties)
760{
761 qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData);
762 m_missingData -= propertyLength;
763
764 QMqttUserProperties userProperties;
765 while (propertyLength > 0) {
766 quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength);
767
768 switch (propertyId) {
769 case 0x15: { //3.15.2.2.2 Authentication Method
770 const QString method = readBufferTyped<QString>(dataSize: &propertyLength);
771 properties.setAuthenticationMethod(method);
772 break;
773 }
774 case 0x16: { // 3.15.2.2.3 Authentication Data
775 const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength);
776 properties.setAuthenticationData(data);
777 break;
778 }
779 case 0x1F: { // 3.15.2.2.4 Reason String
780 const QString reasonString = readBufferTyped<QString>(dataSize: &propertyLength);
781 properties.setReason(reasonString);
782 break;
783 }
784 case 0x26: { // 3.15.2.2.5 User property
785 const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength);
786 const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength);
787
788 userProperties.append(t: QMqttStringPair(propertyName, propertyValue));
789 break;
790 }
791 default:
792 qCDebug(lcMqttConnection) << "Unknown property id in AUTH:" << propertyId;
793 break;
794 }
795 }
796 if (!userProperties.isEmpty())
797 properties.setUserProperties(userProperties);
798}
799
800void QMqttConnection::readConnackProperties(QMqttServerConnectionProperties &properties)
801{
802 qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData);
803 m_missingData -= propertyLength;
804
805 properties.serverData->valid = true;
806
807 while (propertyLength > 0) {
808 quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength);
809 switch (propertyId) {
810 case 0x11: { // 3.2.2.3.2 Session Expiry Interval
811 const quint32 expiryInterval = readBufferTyped<quint32>(dataSize: &propertyLength);
812 properties.serverData->details |= QMqttServerConnectionProperties::SessionExpiryInterval;
813 properties.setSessionExpiryInterval(expiryInterval);
814 break;
815 }
816 case 0x21: { // 3.2.2.3.3 Receive Maximum
817 const quint16 receiveMaximum = readBufferTyped<quint16>(dataSize: &propertyLength);
818 properties.serverData->details |= QMqttServerConnectionProperties::MaximumReceive;
819 properties.setMaximumReceive(receiveMaximum);
820 break;
821 }
822 case 0x24: { // 3.2.2.3.4 Maximum QoS Level
823 const quint8 maxQoS = readBufferTyped<quint8>(dataSize: &propertyLength);
824 properties.serverData->details |= QMqttServerConnectionProperties::MaximumQoS;
825 properties.serverData->maximumQoS = maxQoS;
826 break;
827 }
828 case 0x25: { // 3.2.2.3.5 Retain available
829 const quint8 retainAvailable = readBufferTyped<quint8>(dataSize: &propertyLength);
830 properties.serverData->details |= QMqttServerConnectionProperties::RetainAvailable;
831 properties.serverData->retainAvailable = retainAvailable == 1;
832 break;
833 }
834 case 0x27: { // 3.2.2.3.6 Maximum packet size
835 const quint32 maxPacketSize = readBufferTyped<quint32>(dataSize: &propertyLength);
836 properties.serverData->details |= QMqttServerConnectionProperties::MaximumPacketSize;
837 properties.setMaximumPacketSize(maxPacketSize);
838 break;
839 }
840 case 0x12: { // 3.2.2.3.7 Assigned clientId
841 const QString assignedClientId = readBufferTyped<QString>(dataSize: &propertyLength);
842 properties.serverData->details |= QMqttServerConnectionProperties::AssignedClientId;
843 m_clientPrivate->setClientId(assignedClientId);
844 break;
845 }
846 case 0x22: { // 3.2.2.3.8 Topic Alias Maximum
847 const quint16 topicAliasMaximum = readBufferTyped<quint16>(dataSize: &propertyLength);
848 properties.serverData->details |= QMqttServerConnectionProperties::MaximumTopicAlias;
849 properties.setMaximumTopicAlias(topicAliasMaximum);
850 break;
851 }
852 case 0x1F: { // 3.2.2.3.9 Reason String
853 const QString reasonString = readBufferTyped<QString>(dataSize: &propertyLength);
854 properties.serverData->details |= QMqttServerConnectionProperties::ReasonString;
855 properties.serverData->reasonString = reasonString;
856 break;
857 }
858 case 0x26: { // 3.2.2.3.10 User property
859 const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength);
860 const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength);
861
862 properties.serverData->details |= QMqttServerConnectionProperties::UserProperty;
863 properties.data->userProperties.append(t: QMqttStringPair(propertyName, propertyValue));
864 break;
865 }
866 case 0x28: { // 3.2.2.3.11 Wildcard subscriptions available
867 const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength);
868 properties.serverData->details |= QMqttServerConnectionProperties::WildCardSupported;
869 properties.serverData->wildcardSupported = available == 1;
870 break;
871 }
872 case 0x29: { // 3.2.2.3.12 Subscription identifiers available
873 const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength);
874 properties.serverData->details |= QMqttServerConnectionProperties::SubscriptionIdentifierSupport;
875 properties.serverData->subscriptionIdentifierSupported = available == 1;
876 break;
877 }
878 case 0x2A: { // 3.2.2.3.13 Shared subscriptions available
879 const quint8 available = readBufferTyped<quint8>(dataSize: &propertyLength);
880 properties.serverData->details |= QMqttServerConnectionProperties::SharedSubscriptionSupport;
881 properties.serverData->sharedSubscriptionSupported = available == 1;
882 break;
883 }
884 case 0x13: { // 3.2.2.3.14 Server Keep Alive
885 const quint16 serverKeepAlive = readBufferTyped<quint16>(dataSize: &propertyLength);
886 properties.serverData->details |= QMqttServerConnectionProperties::ServerKeepAlive;
887 m_clientPrivate->m_client->setKeepAlive(serverKeepAlive);
888 break;
889 }
890 case 0x1A: { // 3.2.2.3.15 Response information
891 const QString responseInfo = readBufferTyped<QString>(dataSize: &propertyLength);
892 properties.serverData->details |= QMqttServerConnectionProperties::ResponseInformation;
893 properties.serverData->responseInformation = responseInfo;
894 break;
895 }
896 case 0x1C: { // 3.2.2.3.16 Server reference
897 const QString serverReference = readBufferTyped<QString>(dataSize: &propertyLength);
898 properties.serverData->details |= QMqttServerConnectionProperties::ServerReference;
899 properties.serverData->serverReference = serverReference;
900 break;
901 }
902 case 0x15: { // 3.2.2.3.17 Authentication method
903 const QString method = readBufferTyped<QString>(dataSize: &propertyLength);
904 properties.serverData->details |= QMqttServerConnectionProperties::AuthenticationMethod;
905 properties.data->authenticationMethod = method;
906 break;
907 }
908 case 0x16: { // 3.2.2.3.18 Authentication data
909 const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength);
910 properties.serverData->details |= QMqttServerConnectionProperties::AuthenticationData;
911 properties.data->authenticationData = data;
912 break;
913 }
914 default:
915 qCDebug(lcMqttConnection) << "Unknown property id in CONNACK:" << int(propertyId);
916 break;
917 }
918 }
919}
920
921void QMqttConnection::readMessageStatusProperties(QMqttMessageStatusProperties &properties)
922{
923 qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData);
924 m_missingData -= propertyLength;
925
926 while (propertyLength > 0) {
927 const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength);
928 switch (propertyId) {
929 case 0x1f: { // 3.4.2.2.2 Reason String
930 const QString content = readBufferTyped<QString>(dataSize: &propertyLength);
931 properties.data->reasonString = content;
932 break;
933 }
934 case 0x26: { // 3.4.2.2.3 User Properites
935 const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength);
936 const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength);
937 properties.data->userProperties.append(t: QMqttStringPair(propertyName, propertyValue));
938 break;
939 }
940 default:
941 qCDebug(lcMqttConnection) << "Unknown subscription property received.";
942 break;
943 }
944 }
945}
946
947void QMqttConnection::readPublishProperties(QMqttPublishProperties &properties)
948{
949 qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData);
950 m_missingData -= propertyLength;
951
952 QMqttUserProperties userProperties;
953 QList<quint32> subscriptionIds;
954
955 while (propertyLength > 0) {
956 const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength);
957 switch (propertyId) {
958 case 0x01: { // 3.3.2.3.2 Payload Format Indicator
959 const quint8 format = readBufferTyped<quint8>(dataSize: &propertyLength);
960 if (format == 1)
961 properties.setPayloadFormatIndicator(QMqtt::PayloadFormatIndicator::UTF8Encoded);
962 break;
963 }
964 case 0x02: { // 3.3.2.3.3 Message Expiry Interval
965 const quint32 interval = readBufferTyped<quint32>(dataSize: &propertyLength);
966 properties.setMessageExpiryInterval(interval);
967 break;
968 }
969 case 0x23: { // 3.3.2.3.4 Topic alias
970 const quint16 alias = readBufferTyped<quint16>(dataSize: &propertyLength);
971 properties.setTopicAlias(alias);
972 break;
973 }
974 case 0x08: { // 3.3.2.3.5 Response Topic
975 const QString responseTopic = readBufferTyped<QString>(dataSize: &propertyLength);
976 properties.setResponseTopic(responseTopic);
977 break;
978 }
979 case 0x09: { // 3.3.2.3.6 Correlation Data
980 const QByteArray data = readBufferTyped<QByteArray>(dataSize: &propertyLength);
981 properties.setCorrelationData(data);
982 break;
983 }
984 case 0x26: { // 3.3.2.3.7 User property
985 const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength);
986 const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength);
987 userProperties.append(t: QMqttStringPair(propertyName, propertyValue));
988 break;
989 }
990 case 0x0b: { // 3.3.2.3.8 Subscription Identifier
991 qint32 id = readVariableByteInteger(dataSize: &propertyLength);
992 if (id < 0)
993 return; // readVariableByteInteger closes connection
994 subscriptionIds.append(t: quint32(id));
995 break;
996 }
997 case 0x03: { // 3.3.2.3.9 Content Type
998 const QString content = readBufferTyped<QString>(dataSize: &propertyLength);
999 properties.setContentType(content);
1000 break;
1001 }
1002 default:
1003 qCDebug(lcMqttConnection) << "Unknown publish property received.";
1004 break;
1005 }
1006 }
1007 if (!userProperties.isEmpty())
1008 properties.setUserProperties(userProperties);
1009
1010 if (!subscriptionIds.isEmpty())
1011 properties.setSubscriptionIdentifiers(subscriptionIds);
1012}
1013
1014void QMqttConnection::readSubscriptionProperties(QMqttSubscription *sub)
1015{
1016 qint64 propertyLength = readVariableByteInteger(dataSize: &m_missingData);
1017 m_missingData -= propertyLength;
1018
1019 while (propertyLength > 0) {
1020 const quint8 propertyId = readBufferTyped<quint8>(dataSize: &propertyLength);
1021 switch (propertyId) {
1022 case 0x1f: { // 3.9.2.1.2 Reason String
1023 const QString content = readBufferTyped<QString>(dataSize: &propertyLength);
1024 sub->d_func()->m_reasonString = content;
1025 break;
1026 }
1027 case 0x26: { // 3.9.2.1.3
1028 const QString propertyName = readBufferTyped<QString>(dataSize: &propertyLength);
1029 const QString propertyValue = readBufferTyped<QString>(dataSize: &propertyLength);
1030
1031 sub->d_func()->m_userProperties.append(t: QMqttStringPair(propertyName, propertyValue));
1032 break;
1033 }
1034 default:
1035 qCDebug(lcMqttConnection) << "Unknown subscription property received.";
1036 break;
1037 }
1038 }
1039}
1040
1041QByteArray QMqttConnection::writeConnectProperties()
1042{
1043 QMqttControlPacket properties;
1044
1045 // According to MQTT5 3.1.2.11 default values do not need to be included in the
1046 // connect statement.
1047
1048 // 3.1.2.11.2
1049 if (m_clientPrivate->m_connectionProperties.sessionExpiryInterval() != 0) {
1050 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify sessionExpiryInterval";
1051 properties.append(value: char(0x11));
1052 properties.append(value: m_clientPrivate->m_connectionProperties.sessionExpiryInterval());
1053 }
1054
1055 // 3.1.2.11.3
1056 if (m_clientPrivate->m_connectionProperties.maximumReceive() != 65535) {
1057 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumReceive";
1058 properties.append(value: char(0x21));
1059 properties.append(value: m_clientPrivate->m_connectionProperties.maximumReceive());
1060 }
1061
1062 // 3.1.2.11.4
1063 if (m_clientPrivate->m_connectionProperties.maximumPacketSize() != std::numeric_limits<quint32>::max()) {
1064 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumPacketSize";
1065 properties.append(value: char(0x27));
1066 properties.append(value: m_clientPrivate->m_connectionProperties.maximumPacketSize());
1067 }
1068
1069 // 3.1.2.11.5
1070 if (m_clientPrivate->m_connectionProperties.maximumTopicAlias() != 0) {
1071 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify maximumTopicAlias";
1072 properties.append(value: char(0x22));
1073 properties.append(value: m_clientPrivate->m_connectionProperties.maximumTopicAlias());
1074 }
1075
1076 // 3.1.2.11.6
1077 if (m_clientPrivate->m_connectionProperties.requestResponseInformation()) {
1078 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify requestResponseInformation";
1079 properties.append(value: char(0x19));
1080 properties.append(value: char(1));
1081 }
1082
1083 // 3.1.2.11.7
1084 if (!m_clientPrivate->m_connectionProperties.requestProblemInformation()) {
1085 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify requestProblemInformation";
1086 properties.append(value: char(0x17));
1087 properties.append(value: char(0));
1088 }
1089
1090 // 3.1.2.11.8 Add User properties
1091 auto userProperties = m_clientPrivate->m_connectionProperties.userProperties();
1092 if (!userProperties.isEmpty()) {
1093 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify user properties";
1094 for (const auto &prop : userProperties) {
1095 properties.append(value: char(0x26));
1096 properties.append(data: prop.name().toUtf8());
1097 properties.append(data: prop.value().toUtf8());
1098 }
1099 }
1100
1101 // 3.1.2.11.9 Add Authentication
1102 const QString authenticationMethod = m_clientPrivate->m_connectionProperties.authenticationMethod();
1103 if (!authenticationMethod.isEmpty()) {
1104 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: specify AuthenticationMethod:";
1105 qCDebug(lcMqttConnectionVerbose) << " " << authenticationMethod;
1106 properties.append(value: char(0x15));
1107 properties.append(data: authenticationMethod.toUtf8());
1108 // 3.1.2.11.10
1109 const QByteArray authenticationData = m_clientPrivate->m_connectionProperties.authenticationData();
1110 if (!authenticationData.isEmpty()) {
1111 qCDebug(lcMqttConnectionVerbose) << "Connection Properties: Authentication Data:";
1112 qCDebug(lcMqttConnectionVerbose) << " " << authenticationData;
1113 properties.append(value: char(0x16));
1114 properties.append(data: authenticationData);
1115 }
1116 }
1117
1118 return properties.serializePayload();
1119}
1120
1121QByteArray QMqttConnection::writeLastWillProperties() const
1122{
1123 QMqttControlPacket properties;
1124 const QMqttLastWillProperties &lastWillProperties = m_clientPrivate->m_lastWillProperties;
1125 // Will Delay interval 3.1.3.2.2
1126 if (lastWillProperties.willDelayInterval() > 0) {
1127 const quint32 delay = lastWillProperties.willDelayInterval();
1128 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: specify will delay interval:"
1129 << delay;
1130 properties.append(value: char(0x18));
1131 properties.append(value: delay);
1132 }
1133
1134 // Payload Format Indicator 3.1.3.2.3
1135 if (lastWillProperties.payloadFormatIndicator() != QMqtt::PayloadFormatIndicator::Unspecified) {
1136 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: payload format indicator specified";
1137 properties.append(value: char(0x01));
1138 properties.append(value: char(0x01)); // UTF8
1139 }
1140
1141 // Message Expiry Interval 3.1.3.2.4
1142 if (lastWillProperties.messageExpiryInterval() > 0) {
1143 const quint32 interval = lastWillProperties.messageExpiryInterval();
1144 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Message Expiry interval:"
1145 << interval;
1146 properties.append(value: char(0x02));
1147 properties.append(value: interval);
1148 }
1149
1150 // Content Type 3.1.3.2.5
1151 if (!lastWillProperties.contentType().isEmpty()) {
1152 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Content Type:"
1153 << lastWillProperties.contentType();
1154 properties.append(value: char(0x03));
1155 properties.append(data: lastWillProperties.contentType().toUtf8());
1156 }
1157
1158 // Response Topic 3.1.3.2.6
1159 if (!lastWillProperties.responseTopic().isEmpty()) {
1160 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Response Topic:"
1161 << lastWillProperties.responseTopic();
1162 properties.append(value: char(0x08));
1163 properties.append(data: lastWillProperties.responseTopic().toUtf8());
1164 }
1165
1166 // Correlation Data 3.1.3.2.7
1167 if (!lastWillProperties.correlationData().isEmpty()) {
1168 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: Correlation Data:"
1169 << lastWillProperties.correlationData();
1170 properties.append(value: char(0x09));
1171 properties.append(data: lastWillProperties.correlationData());
1172 }
1173
1174 // User Properties 3.1.3.2.8
1175 if (!lastWillProperties.userProperties().isEmpty()) {
1176 auto userProperties = lastWillProperties.userProperties();
1177 qCDebug(lcMqttConnectionVerbose) << "Last Will Properties: specify user properties";
1178 for (const auto &prop : userProperties) {
1179 properties.append(value: char(0x26));
1180 properties.append(data: prop.name().toUtf8());
1181 properties.append(data: prop.value().toUtf8());
1182 }
1183 }
1184
1185 return properties.serializePayload();
1186}
1187
1188QByteArray QMqttConnection::writePublishProperties(const QMqttPublishProperties &properties)
1189{
1190 QMqttControlPacket packet;
1191
1192 // 3.3.2.3.2 Payload Indicator
1193 if (properties.availableProperties() & QMqttPublishProperties::PayloadFormatIndicator &&
1194 properties.payloadFormatIndicator() != QMqtt::PayloadFormatIndicator::Unspecified) {
1195 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Payload Indicator:"
1196 << (properties.payloadFormatIndicator() == QMqtt::PayloadFormatIndicator::UTF8Encoded ? 1 : 0);
1197 packet.append(value: char(0x01));
1198 switch (properties.payloadFormatIndicator()) {
1199 case QMqtt::PayloadFormatIndicator::UTF8Encoded:
1200 packet.append(value: char(0x01));
1201 break;
1202 default:
1203 qCDebug(lcMqttConnection) << "Unknown payload indicator.";
1204 break;
1205 }
1206 }
1207
1208 // 3.3.2.3.3 Message Expiry
1209 if (properties.availableProperties() & QMqttPublishProperties::MessageExpiryInterval &&
1210 properties.messageExpiryInterval() > 0) {
1211 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Message Expiry :"
1212 << properties.messageExpiryInterval();
1213 packet.append(value: char(0x02));
1214 packet.append(value: properties.messageExpiryInterval());
1215 }
1216
1217 // 3.3.2.3.4 Topic alias
1218 if (properties.availableProperties() & QMqttPublishProperties::TopicAlias &&
1219 properties.topicAlias() > 0) {
1220 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Topic Alias :"
1221 << properties.topicAlias();
1222 if (m_clientPrivate->m_serverConnectionProperties.availableProperties() & QMqttServerConnectionProperties::MaximumTopicAlias
1223 && properties.topicAlias() > m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias()) {
1224 qCDebug(lcMqttConnection) << "Invalid topic alias specified: " << properties.topicAlias()
1225 << " Maximum by server is:"
1226 << m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias();
1227
1228 } else {
1229 packet.append(value: char(0x23));
1230 packet.append(value: properties.topicAlias());
1231 }
1232 }
1233
1234 // 3.3.2.3.5 Response Topic
1235 if (properties.availableProperties() & QMqttPublishProperties::ResponseTopic &&
1236 !properties.responseTopic().isEmpty()) {
1237 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Response Topic :"
1238 << properties.responseTopic();
1239 packet.append(value: char(0x08));
1240 packet.append(data: properties.responseTopic().toUtf8());
1241 }
1242
1243 // 3.3.2.3.6 Correlation Data
1244 if (properties.availableProperties() & QMqttPublishProperties::CorrelationData &&
1245 !properties.correlationData().isEmpty()) {
1246 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Correlation Data :"
1247 << properties.correlationData();
1248 packet.append(value: char(0x09));
1249 packet.append(data: properties.correlationData());
1250 }
1251
1252 // 3.3.2.3.7 User Property
1253 if (properties.availableProperties() & QMqttPublishProperties::UserProperty) {
1254 auto userProperties = properties.userProperties();
1255 if (!userProperties.isEmpty()) {
1256 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: specify user properties";
1257 for (const auto &prop : userProperties) {
1258 packet.append(value: char(0x26));
1259 packet.append(data: prop.name().toUtf8());
1260 packet.append(data: prop.value().toUtf8());
1261 }
1262 }
1263 }
1264
1265 // 3.3.2.3.8 Subscription Identifier
1266 if (properties.availableProperties() & QMqttPublishProperties::SubscriptionIdentifier) {
1267 for (auto id : properties.subscriptionIdentifiers()) {
1268 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Subscription ID:" << id;
1269 packet.append(value: char(0x0b));
1270 packet.appendRawVariableInteger(value: id);
1271 }
1272 }
1273
1274 // 3.3.2.3.9 Content Type
1275 if (properties.availableProperties() & QMqttPublishProperties::ContentType &&
1276 !properties.contentType().isEmpty()) {
1277 qCDebug(lcMqttConnectionVerbose) << "Publish Properties: Content Type :"
1278 << properties.contentType();
1279 packet.append(value: char(0x03));
1280 packet.append(data: properties.contentType().toUtf8());
1281 }
1282
1283 return packet.serializePayload();
1284}
1285
1286QByteArray QMqttConnection::writeSubscriptionProperties(const QMqttSubscriptionProperties &properties)
1287{
1288 QMqttControlPacket packet;
1289
1290 // 3.8.2.1.2 Subscription Identifier
1291 if (properties.subscriptionIdentifier() > 0) {
1292 qCDebug(lcMqttConnectionVerbose) << "Subscription Properties: Subscription Identifier";
1293 packet.append(value: char(0x0b));
1294 packet.appendRawVariableInteger(value: properties.subscriptionIdentifier());
1295 }
1296
1297 // 3.8.2.1.3 User Property
1298 auto userProperties = properties.userProperties();
1299 if (!userProperties.isEmpty()) {
1300 qCDebug(lcMqttConnectionVerbose) << "Subscription Properties: specify user properties";
1301 for (const auto &prop : userProperties) {
1302 packet.append(value: char(0x26));
1303 packet.append(data: prop.name().toUtf8());
1304 packet.append(data: prop.value().toUtf8());
1305 }
1306 }
1307
1308 return packet.serializePayload();
1309}
1310
1311QByteArray QMqttConnection::writeUnsubscriptionProperties(const QMqttUnsubscriptionProperties &properties)
1312{
1313 QMqttControlPacket packet;
1314
1315 // 3.10.2.1.2
1316 auto userProperties = properties.userProperties();
1317 if (!userProperties.isEmpty()) {
1318 qCDebug(lcMqttConnectionVerbose) << "Unsubscription Properties: specify user properties";
1319 for (const auto &prop : userProperties) {
1320 packet.append(value: char(0x26));
1321 packet.append(data: prop.name().toUtf8());
1322 packet.append(data: prop.value().toUtf8());
1323 }
1324 }
1325
1326 return packet.serializePayload();
1327}
1328
1329QByteArray QMqttConnection::writeAuthenticationProperties(const QMqttAuthenticationProperties &properties)
1330{
1331 QMqttControlPacket packet;
1332
1333 // 3.15.2.2.2
1334 if (!properties.authenticationMethod().isEmpty()) {
1335 packet.append(value: char(0x15));
1336 packet.append(data: properties.authenticationMethod().toUtf8());
1337 }
1338 // 3.15.2.2.3
1339 if (!properties.authenticationData().isEmpty()) {
1340 packet.append(value: char(0x16));
1341 packet.append(data: properties.authenticationData());
1342 }
1343
1344 // 3.15.2.2.4
1345 if (!properties.reason().isEmpty()) {
1346 packet.append(value: char(0x1F));
1347 packet.append(data: properties.reason().toUtf8());
1348 }
1349
1350 // 3.15.2.2.5
1351 auto userProperties = properties.userProperties();
1352 if (!userProperties.isEmpty()) {
1353 qCDebug(lcMqttConnectionVerbose) << "Unsubscription Properties: specify user properties";
1354 for (const auto &prop : userProperties) {
1355 packet.append(value: char(0x26));
1356 packet.append(data: prop.name().toUtf8());
1357 packet.append(data: prop.value().toUtf8());
1358 }
1359 }
1360
1361 return packet.serializePayload();
1362}
1363
1364void QMqttConnection::finalize_auth()
1365{
1366 qCDebug(lcMqttConnectionVerbose) << "Finalize AUTH";
1367
1368 quint8 authReason = 0;
1369 QMqttAuthenticationProperties authProperties;
1370 // 3.15.2.1 - The Reason Code and Property Length can be omitted if the Reason Code
1371 // is 0x00 (Success) and there are no Properties. In this case the AUTH has a
1372 // Remaining Length of 0.
1373 if (m_missingData > 0) {
1374 authReason = readBufferTyped<quint8>(dataSize: &m_missingData);
1375 readAuthProperties(properties&: authProperties);
1376 }
1377
1378 // 3.15.2.1
1379 switch (QMqtt::ReasonCode(authReason)) {
1380 case QMqtt::ReasonCode::Success:
1381 emit m_clientPrivate->m_client->authenticationFinished(p: authProperties);
1382 break;
1383 case QMqtt::ReasonCode::ContinueAuthentication:
1384 case QMqtt::ReasonCode::ReAuthenticate:
1385 emit m_clientPrivate->m_client->authenticationRequested(p: authProperties);
1386 break;
1387 default:
1388 qCDebug(lcMqttConnection) << "Received illegal AUTH reason code:" << authReason;
1389 closeConnection(error: QMqttClient::ProtocolViolation);
1390 break;
1391 }
1392}
1393
1394void QMqttConnection::finalize_connack()
1395{
1396 qCDebug(lcMqttConnectionVerbose) << "Finalize CONNACK";
1397
1398 const quint8 ackFlags = readBufferTyped<quint8>(dataSize: &m_missingData);
1399
1400 if (ackFlags > 1) { // MQTT-3.2.2.1
1401 qCDebug(lcMqttConnection) << "Unexpected CONNACK Flags specified:" << QString::number(ackFlags);
1402 readBuffer(size: quint64(m_missingData));
1403 m_missingData = 0;
1404 closeConnection(error: QMqttClient::ProtocolViolation);
1405 return;
1406 }
1407 bool sessionPresent = ackFlags == 1;
1408
1409 // MQTT-3.2.2-1 & MQTT-3.2.2-2
1410 if (sessionPresent) {
1411 emit m_clientPrivate->m_client->brokerSessionRestored();
1412 if (m_clientPrivate->m_cleanSession)
1413 qCDebug(lcMqttConnection) << "Connected with a clean session, ack contains session present.";
1414 } else {
1415 // MQTT-4.1.0.-1 MQTT-4.1.0-2 Session not stored on broker side
1416 // regardless whether cleanSession is false
1417 cleanSubscriptions();
1418 }
1419
1420 quint8 connectResultValue = readBufferTyped<quint8>(dataSize: &m_missingData);
1421 QMqttServerConnectionProperties serverProp;
1422 serverProp.serverData->reasonCode = QMqtt::ReasonCode(connectResultValue);
1423 m_clientPrivate->m_serverConnectionProperties = serverProp;
1424 if (connectResultValue != 0 && m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) {
1425 qCDebug(lcMqttConnection) << "Connection has been rejected.";
1426 closeConnection(error: static_cast<QMqttClient::ClientError>(connectResultValue));
1427 return;
1428 }
1429
1430 // MQTT 5.0 has variable part != 2 in the header
1431 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
1432 readConnackProperties(properties&: m_clientPrivate->m_serverConnectionProperties);
1433 m_receiveAliases.resize(size: m_clientPrivate->m_serverConnectionProperties.maximumTopicAlias());
1434 m_publishAliases.resize(size: m_clientPrivate->m_connectionProperties.maximumTopicAlias());
1435
1436 // 3.2.2.2
1437 switch (QMqtt::ReasonCode(connectResultValue)) {
1438 case QMqtt::ReasonCode::Success:
1439 break;
1440 case QMqtt::ReasonCode::MalformedPacket:
1441 case QMqtt::ReasonCode::ProtocolError:
1442 closeConnection(error: QMqttClient::ProtocolViolation);
1443 return;
1444 case QMqtt::ReasonCode::UnsupportedProtocolVersion:
1445 closeConnection(error: QMqttClient::InvalidProtocolVersion);
1446 return;
1447 case QMqtt::ReasonCode::InvalidClientId:
1448 closeConnection(error: QMqttClient::IdRejected);
1449 return;
1450 case QMqtt::ReasonCode::ServerNotAvailable:
1451 case QMqtt::ReasonCode::ServerBusy:
1452 case QMqtt::ReasonCode::UseAnotherServer:
1453 case QMqtt::ReasonCode::ServerMoved:
1454 closeConnection(error: QMqttClient::ServerUnavailable);
1455 return;
1456 case QMqtt::ReasonCode::InvalidUserNameOrPassword:
1457 closeConnection(error: QMqttClient::BadUsernameOrPassword);
1458 return;
1459 case QMqtt::ReasonCode::NotAuthorized:
1460 closeConnection(error: QMqttClient::NotAuthorized);
1461 return;
1462 case QMqtt::ReasonCode::UnspecifiedError:
1463 closeConnection(error: QMqttClient::UnknownError);
1464 return;
1465 case QMqtt::ReasonCode::ImplementationSpecificError:
1466 case QMqtt::ReasonCode::ClientBanned:
1467 case QMqtt::ReasonCode::InvalidAuthenticationMethod:
1468 case QMqtt::ReasonCode::InvalidTopicName:
1469 case QMqtt::ReasonCode::PacketTooLarge:
1470 case QMqtt::ReasonCode::QuotaExceeded:
1471 case QMqtt::ReasonCode::InvalidPayloadFormat:
1472 case QMqtt::ReasonCode::RetainNotSupported:
1473 case QMqtt::ReasonCode::QoSNotSupported:
1474 case QMqtt::ReasonCode::ExceededConnectionRate:
1475 closeConnection(error: QMqttClient::Mqtt5SpecificError);
1476 return;
1477 default:
1478 qCDebug(lcMqttConnection) << "Received illegal CONNACK reason code:" << connectResultValue;
1479 closeConnection(error: QMqttClient::ProtocolViolation);
1480 return;
1481 }
1482 }
1483
1484 m_internalState = BrokerConnected;
1485 m_clientPrivate->setStateAndError(s: QMqttClient::Connected);
1486
1487 if (m_clientPrivate->m_autoKeepAlive)
1488 m_pingTimer.start(msec: m_clientPrivate->m_keepAlive * 1000, obj: this);
1489}
1490
1491void QMqttConnection::finalize_suback()
1492{
1493 const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData);
1494
1495 auto sub = m_pendingSubscriptionAck.take(key: id);
1496 if (Q_UNLIKELY(sub == nullptr)) {
1497 qCDebug(lcMqttConnection) << "Received SUBACK for unknown subscription request.";
1498 return;
1499 }
1500
1501 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
1502 readSubscriptionProperties(sub);
1503
1504 // 3.9.3 - The Payload contains a list of Reason Codes. Each Reason Code corresponds to a Topic Filter in the SUBSCRIBE packet being acknowledged.
1505 // Whereas 3.8.3 states "The Payload MUST contain at least one Topic Filter and Subscription Options pair. A SUBSCRIBE packet with no Payload is a Protocol Error."
1506 do {
1507 quint8 reason = readBufferTyped<quint8>(dataSize: &m_missingData);
1508
1509 sub->d_func()->m_reasonCode = QMqtt::ReasonCode(reason);
1510
1511 // 3.9.3
1512 switch (QMqtt::ReasonCode(reason)) {
1513 case QMqtt::ReasonCode::SubscriptionQoSLevel0:
1514 case QMqtt::ReasonCode::SubscriptionQoSLevel1:
1515 case QMqtt::ReasonCode::SubscriptionQoSLevel2:
1516 qCDebug(lcMqttConnectionVerbose) << "Finalize SUBACK: id:" << id << "qos:" << reason;
1517 // The broker might have a different support level for QoS than what
1518 // the client requested
1519 if (reason != sub->qos()) {
1520 sub->setQos(reason);
1521 emit sub->qosChanged(reason);
1522 }
1523 sub->setState(QMqttSubscription::Subscribed);
1524 break;
1525 case QMqtt::ReasonCode::UnspecifiedError:
1526 qCWarning(lcMqttConnection) << "Subscription for id " << id << " failed. Reason Code:" << reason;
1527 sub->setState(QMqttSubscription::Error);
1528 break;
1529 case QMqtt::ReasonCode::ImplementationSpecificError:
1530 case QMqtt::ReasonCode::NotAuthorized:
1531 case QMqtt::ReasonCode::InvalidTopicFilter:
1532 case QMqtt::ReasonCode::MessageIdInUse:
1533 case QMqtt::ReasonCode::QuotaExceeded:
1534 case QMqtt::ReasonCode::SharedSubscriptionsNotSupported:
1535 case QMqtt::ReasonCode::SubscriptionIdsNotSupported:
1536 case QMqtt::ReasonCode::WildCardSubscriptionsNotSupported:
1537 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
1538 qCWarning(lcMqttConnection) << "Subscription for id " << id << " failed. Reason Code:" << reason;
1539 sub->setState(QMqttSubscription::Error);
1540 break;
1541 }
1542 Q_FALLTHROUGH();
1543 default:
1544 qCWarning(lcMqttConnection) << "Received illegal SUBACK reason code:" << reason;
1545 closeConnection(error: QMqttClient::ProtocolViolation);
1546 break;
1547 }
1548 } while (m_missingData > 0);
1549}
1550
1551void QMqttConnection::finalize_unsuback()
1552{
1553 const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData);
1554 qCDebug(lcMqttConnectionVerbose) << "Finalize UNSUBACK: " << id;
1555
1556 auto sub = m_pendingUnsubscriptions.take(key: id);
1557 if (Q_UNLIKELY(sub == nullptr)) {
1558 qCDebug(lcMqttConnection) << "Received UNSUBACK for unknown request.";
1559 return;
1560 }
1561
1562 m_activeSubscriptions.remove(key: sub->topic());
1563
1564 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0) {
1565 readSubscriptionProperties(sub);
1566 } else {
1567 // 3.11.3 - The UNSUBACK Packet has no payload.
1568 // emulate successful unsubscription
1569 sub->d_func()->m_reasonCode = QMqtt::ReasonCode::Success;
1570 sub->setState(QMqttSubscription::Unsubscribed);
1571 return;
1572 }
1573
1574 // 3.1.3 - The Payload contains a list of Reason Codes. Each Reason Code corresponds to a Topic Filter in the UNSUBSCRIBE packet being acknowledged.
1575 // Whereas 3.10.3 states "The Payload of an UNSUBSCRIBE packet MUST contain at least one Topic Filter. An UNSUBSCRIBE packet with no Payload is a Protocol Error."
1576 do {
1577 const quint8 reasonCode = readBufferTyped<quint8>(dataSize: &m_missingData);
1578 sub->d_func()->m_reasonCode = QMqtt::ReasonCode(reasonCode);
1579
1580 // 3.11.3
1581 switch (QMqtt::ReasonCode(reasonCode)) {
1582 case QMqtt::ReasonCode::Success:
1583 sub->setState(QMqttSubscription::Unsubscribed);
1584 break;
1585 case QMqtt::ReasonCode::NoSubscriptionExisted:
1586 case QMqtt::ReasonCode::ImplementationSpecificError:
1587 case QMqtt::ReasonCode::NotAuthorized:
1588 case QMqtt::ReasonCode::InvalidTopicFilter:
1589 case QMqtt::ReasonCode::MessageIdInUse:
1590 case QMqtt::ReasonCode::UnspecifiedError:
1591 qCWarning(lcMqttConnection) << "Unsubscription for id " << id << " failed. Reason Code:" << reasonCode;
1592 sub->setState(QMqttSubscription::Error);
1593 break;
1594 default:
1595 qCWarning(lcMqttConnection) << "Received illegal UNSUBACK reason code:" << reasonCode;
1596 closeConnection(error: QMqttClient::ProtocolViolation);
1597 break;
1598 }
1599 } while (m_missingData > 0);
1600}
1601
1602void QMqttConnection::finalize_publish()
1603{
1604 // String topic
1605 QMqttTopicName topic = readBufferTyped<QString>(dataSize: &m_missingData);
1606 const int topicLength = topic.name().size();
1607
1608 quint16 id = 0;
1609 if (m_currentPublish.qos > 0)
1610 id = readBufferTyped<quint16>(dataSize: &m_missingData);
1611
1612 QMqttPublishProperties publishProperties;
1613 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0)
1614 readPublishProperties(properties&: publishProperties);
1615
1616 if (publishProperties.availableProperties() & QMqttPublishProperties::TopicAlias) {
1617 const quint16 topicAlias = publishProperties.topicAlias();
1618 if (topicAlias == 0 || topicAlias > m_clientPrivate->m_connectionProperties.maximumTopicAlias()) {
1619 qCDebug(lcMqttConnection) << "TopicAlias receive: overflow.";
1620 closeConnection(error: QMqttClient::ProtocolViolation);
1621 return;
1622 }
1623 if (topicLength == 0) { // New message on existing topic alias
1624 topic = m_receiveAliases.at(i: topicAlias - 1);
1625 if (topic.name().isEmpty()) {
1626 qCDebug(lcMqttConnection) << "TopicAlias receive: alias for unknown topic.";
1627 closeConnection(error: QMqttClient::ProtocolViolation);
1628 return;
1629 }
1630 qCDebug(lcMqttConnectionVerbose) << "TopicAlias receive: Using " << topicAlias;
1631 } else { // Resetting a topic alias
1632 qCDebug(lcMqttConnection) << "TopicAlias receive: Resetting:" << topic.name() << " : " << topicAlias;
1633 m_receiveAliases[topicAlias - 1] = topic;
1634 }
1635 }
1636
1637 // message
1638 const quint64 payloadLength = quint64(m_missingData);
1639 const QByteArray message = readBuffer(size: payloadLength);
1640 m_missingData -= payloadLength;
1641
1642 qCDebug(lcMqttConnectionVerbose) << "Finalize PUBLISH: topic:" << topic
1643 << " payloadLength:" << payloadLength;;
1644
1645 emit m_clientPrivate->m_client->messageReceived(message, topic);
1646
1647 QMqttMessage qmsg(topic, message, id, m_currentPublish.qos,
1648 m_currentPublish.dup, m_currentPublish.retain);
1649 qmsg.d->m_publishProperties = publishProperties;
1650
1651 if (id != 0) {
1652 QMqttMessageStatusProperties statusProp;
1653 statusProp.data->userProperties = publishProperties.userProperties();
1654 emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Published, properties: statusProp);
1655 }
1656
1657 // Store subscriptions in a temporary container as each messageReceived is allowed to subscribe
1658 // again and thus invalid the iterator of the loop.
1659 QList<QMqttSubscription *> subscribers;
1660 for (const auto [key, value] : m_activeSubscriptions.asKeyValueRange()) {
1661 if (key.match(name: topic))
1662 subscribers.append(t: value);
1663 }
1664 for (const auto &s : subscribers)
1665 emit s->messageReceived(msg: qmsg);
1666
1667 if (m_currentPublish.qos == 1)
1668 sendControlPublishAcknowledge(id);
1669 else if (m_currentPublish.qos == 2)
1670 sendControlPublishReceive(id);
1671}
1672
1673void QMqttConnection::finalize_pubAckRecRelComp()
1674{
1675 qCDebug(lcMqttConnectionVerbose) << "Finalize PUBACK/REC/REL/COMP";
1676 const quint16 id = readBufferTyped<quint16>(dataSize: &m_missingData);
1677
1678 QMqttMessageStatusProperties properties;
1679 if (m_clientPrivate->m_protocolVersion == QMqttClient::MQTT_5_0 && m_missingData > 0) {
1680 // Reason Code (1byte)
1681 const quint8 reasonCode = readBufferTyped<quint8>(dataSize: &m_missingData);
1682 properties.data->reasonCode = QMqtt::ReasonCode(reasonCode);
1683
1684 if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBACK || (m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) {
1685 // 3.4.2.1, 3.5.2.1
1686 switch (QMqtt::ReasonCode(reasonCode)) {
1687 case QMqtt::ReasonCode::Success:
1688 case QMqtt::ReasonCode::NoMatchingSubscriber:
1689 case QMqtt::ReasonCode::UnspecifiedError:
1690 case QMqtt::ReasonCode::ImplementationSpecificError:
1691 case QMqtt::ReasonCode::NotAuthorized:
1692 case QMqtt::ReasonCode::InvalidTopicName:
1693 case QMqtt::ReasonCode::MessageIdInUse:
1694 case QMqtt::ReasonCode::QuotaExceeded:
1695 case QMqtt::ReasonCode::InvalidPayloadFormat:
1696 break;
1697 default:
1698 qCWarning(lcMqttConnection) << "Received illegal PUBACK/REC reason code:" << reasonCode;
1699 closeConnection(error: QMqttClient::ProtocolViolation);
1700 return;
1701 }
1702 } else {
1703 // 3.6.2.1, 3.7.2.1
1704 switch (QMqtt::ReasonCode(reasonCode)) {
1705 case QMqtt::ReasonCode::Success:
1706 case QMqtt::ReasonCode::MessageIdNotFound:
1707 break;
1708 default:
1709 qCWarning(lcMqttConnection) << "Received illegal PUBREL/COMP reason code:" << reasonCode;
1710 closeConnection(error: QMqttClient::ProtocolViolation);
1711 return;
1712 }
1713 }
1714
1715 readMessageStatusProperties(properties);
1716 }
1717
1718 if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREL) {
1719 qCDebug(lcMqttConnectionVerbose) << " PUBREL:" << id;
1720 emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Released, properties);
1721 sendControlPublishComp(id);
1722 return;
1723 }
1724
1725 if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBCOMP) {
1726 qCDebug(lcMqttConnectionVerbose) << " PUBCOMP:" << id;
1727 auto pendingRelease = m_pendingReleaseMessages.take(key: id);
1728 if (!pendingRelease)
1729 qCDebug(lcMqttConnection) << "Received PUBCOMP for unknown released message.";
1730 emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Completed, properties);
1731 emit m_clientPrivate->m_client->messageSent(id);
1732 return;
1733 }
1734
1735 auto pendingMsg = m_pendingMessages.take(key: id);
1736 if (!pendingMsg) {
1737 qCDebug(lcMqttConnection) << "Received PUBACK for unknown message: " << id;
1738 return;
1739 }
1740 if ((m_currentPacket & 0xF0) == QMqttControlPacket::PUBREC) {
1741 qCDebug(lcMqttConnectionVerbose) << " PUBREC:" << id;
1742 m_pendingReleaseMessages.insert(key: id, value: pendingMsg);
1743 emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Received, properties);
1744 sendControlPublishRelease(id);
1745 } else {
1746 qCDebug(lcMqttConnectionVerbose) << " PUBACK:" << id;
1747 emit m_clientPrivate->m_client->messageStatusChanged(id, s: QMqtt::MessageStatus::Acknowledged, properties);
1748 emit m_clientPrivate->m_client->messageSent(id);
1749 }
1750}
1751
1752void QMqttConnection::finalize_pingresp()
1753{
1754 qCDebug(lcMqttConnectionVerbose) << "Finalize PINGRESP";
1755 const quint8 v = readBufferTyped<quint8>(dataSize: &m_missingData);
1756
1757 if (v != 0) {
1758 qCDebug(lcMqttConnection) << "Received a PINGRESP including payload.";
1759 closeConnection(error: QMqttClient::ProtocolViolation);
1760 return;
1761 }
1762 m_pingTimeout--;
1763 emit m_clientPrivate->m_client->pingResponseReceived();
1764}
1765
1766bool QMqttConnection::processDataHelper()
1767{
1768 if (m_missingData > 0) {
1769 if ((m_readBuffer.size() - m_readPosition) < m_missingData)
1770 return false;
1771
1772 switch (m_currentPacket & 0xF0) {
1773 case QMqttControlPacket::AUTH:
1774 finalize_auth();
1775 break;
1776 case QMqttControlPacket::CONNACK:
1777 finalize_connack();
1778 break;
1779 case QMqttControlPacket::SUBACK:
1780 finalize_suback();
1781 break;
1782 case QMqttControlPacket::UNSUBACK:
1783 finalize_unsuback();
1784 break;
1785 case QMqttControlPacket::PUBLISH:
1786 finalize_publish();
1787 break;
1788 case QMqttControlPacket::PUBACK:
1789 case QMqttControlPacket::PUBREC:
1790 case QMqttControlPacket::PUBREL:
1791 case QMqttControlPacket::PUBCOMP:
1792 finalize_pubAckRecRelComp();
1793 break;
1794 case QMqttControlPacket::PINGRESP:
1795 finalize_pingresp();
1796 break;
1797 default:
1798 qCDebug(lcMqttConnection) << "Unknown packet to finalize.";
1799 closeConnection(error: QMqttClient::ProtocolViolation);
1800 break;
1801 }
1802
1803 if (m_internalState == BrokerDisconnected)
1804 return false;
1805
1806 Q_ASSERT(m_missingData == 0);
1807
1808 m_readBuffer = m_readBuffer.mid(index: m_readPosition);
1809 m_readPosition = 0;
1810 }
1811
1812 // MQTT-2.2 A fixed header of a control packet must be at least 2 bytes. If the payload is
1813 // longer than 127 bytes the header can be up to 5 bytes long.
1814 switch (m_readBuffer.size()) {
1815 case 0:
1816 case 1:
1817 return false;
1818 case 2:
1819 if ((m_readBuffer.at(i: 1) & 128) != 0)
1820 return false;
1821 break;
1822 case 3:
1823 if ((m_readBuffer.at(i: 1) & 128) != 0 && (m_readBuffer.at(i: 2) & 128) != 0)
1824 return false;
1825 break;
1826 case 4:
1827 if ((m_readBuffer.at(i: 1) & 128) != 0 && (m_readBuffer.at(i: 2) & 128) != 0 && (m_readBuffer.at(i: 3) & 128) != 0)
1828 return false;
1829 break;
1830 default:
1831 break;
1832 }
1833
1834 readBuffer(data: reinterpret_cast<char *>(&m_currentPacket), size: 1);
1835
1836 switch (m_currentPacket & 0xF0) {
1837 case QMqttControlPacket::CONNACK: {
1838 qCDebug(lcMqttConnectionVerbose) << "Received CONNACK";
1839 if (m_internalState != BrokerWaitForConnectAck) {
1840 qCDebug(lcMqttConnection) << "Received CONNACK at unexpected time.";
1841 closeConnection(error: QMqttClient::ProtocolViolation);
1842 return false;
1843 }
1844
1845 qint32 payloadSize = readVariableByteInteger();
1846 if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) {
1847 if (payloadSize != 2) {
1848 qCDebug(lcMqttConnection) << "Unexpected FRAME size in CONNACK.";
1849 closeConnection(error: QMqttClient::ProtocolViolation);
1850 return false;
1851 }
1852 }
1853 m_missingData = payloadSize;
1854 break;
1855 }
1856 case QMqttControlPacket::SUBACK: {
1857 qCDebug(lcMqttConnectionVerbose) << "Received SUBACK";
1858 const quint8 remaining = readBufferTyped<quint8>();
1859 m_missingData = remaining;
1860 break;
1861 }
1862 case QMqttControlPacket::PUBLISH: {
1863 qCDebug(lcMqttConnectionVerbose) << "Received PUBLISH";
1864 m_currentPublish.dup = m_currentPacket & 0x08;
1865 m_currentPublish.qos = (m_currentPacket & 0x06) >> 1;
1866 m_currentPublish.retain = m_currentPacket & 0x01;
1867 if ((m_currentPublish.qos == 0 && m_currentPublish.dup != 0)
1868 || m_currentPublish.qos > 2) {
1869 closeConnection(error: QMqttClient::ProtocolViolation);
1870 return false;
1871 }
1872
1873 m_missingData = readVariableByteInteger();
1874 if (m_missingData == -1)
1875 return false; // Connection closed inside readVariableByteInteger
1876 break;
1877 }
1878 case QMqttControlPacket::PINGRESP:
1879 qCDebug(lcMqttConnectionVerbose) << "Received PINGRESP";
1880 m_missingData = 1;
1881 break;
1882
1883
1884 case QMqttControlPacket::PUBREL: {
1885 qCDebug(lcMqttConnectionVerbose) << "Received PUBREL";
1886 const quint8 remaining = readBufferTyped<quint8>();
1887 if (remaining != 0x02) {
1888 qCDebug(lcMqttConnection) << "Received 2 byte message with invalid remaining length.";
1889 closeConnection(error: QMqttClient::ProtocolViolation);
1890 return false;
1891 }
1892 if ((m_currentPacket & 0x0F) != 0x02) {
1893 qCDebug(lcMqttConnection) << "Malformed fixed header for PUBREL.";
1894 closeConnection(error: QMqttClient::ProtocolViolation);
1895 return false;
1896 }
1897 m_missingData = 2;
1898 break;
1899 }
1900
1901 case QMqttControlPacket::UNSUBACK:
1902 case QMqttControlPacket::PUBACK:
1903 case QMqttControlPacket::PUBREC:
1904 case QMqttControlPacket::PUBCOMP: {
1905 qCDebug(lcMqttConnectionVerbose) << "Received UNSUBACK/PUBACK/PUBREC/PUBCOMP";
1906 if ((m_currentPacket & 0x0F) != 0) {
1907 qCDebug(lcMqttConnection) << "Malformed fixed header for UNSUBACK/PUBACK/PUBREC/PUBCOMP.";
1908 closeConnection(error: QMqttClient::ProtocolViolation);
1909 return false;
1910 }
1911 const quint8 remaining = readBufferTyped<quint8>();
1912 if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0 && remaining != 0x02) {
1913 qCDebug(lcMqttConnection) << "Received 2 byte message with invalid remaining length.";
1914 closeConnection(error: QMqttClient::ProtocolViolation);
1915 return false;
1916 }
1917 m_missingData = remaining;
1918 break;
1919 }
1920 case QMqttControlPacket::AUTH:
1921 if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) {
1922 qCDebug(lcMqttConnection) << "Received unknown command.";
1923 closeConnection(error: QMqttClient::ProtocolViolation);
1924 return false;
1925 }
1926 qCDebug(lcMqttConnectionVerbose) << "Received AUTH";
1927 if ((m_currentPacket & 0x0F) != 0) {
1928 qCDebug(lcMqttConnection) << "Malformed fixed header for AUTH.";
1929 closeConnection(error: QMqttClient::ProtocolViolation);
1930 return false;
1931 }
1932 m_missingData = readVariableByteInteger();
1933 if (m_missingData == -1)
1934 return false; // Connection closed inside readVariableByteInteger
1935 break;
1936 case QMqttControlPacket::DISCONNECT:
1937 if (m_clientPrivate->m_protocolVersion != QMqttClient::MQTT_5_0) {
1938 qCDebug(lcMqttConnection) << "Received unknown command.";
1939 closeConnection(error: QMqttClient::ProtocolViolation);
1940 return false;
1941 }
1942 qCDebug(lcMqttConnectionVerbose) << "Received DISCONNECT";
1943 if ((m_currentPacket & 0x0F) != 0) {
1944 qCDebug(lcMqttConnection) << "Malformed fixed header for DISCONNECT.";
1945 closeConnection(error: QMqttClient::ProtocolViolation);
1946 return false;
1947 }
1948 if (m_internalState != BrokerConnected) {
1949 qCDebug(lcMqttConnection) << "Received DISCONNECT at unexpected time.";
1950 closeConnection(error: QMqttClient::ProtocolViolation);
1951 return false;
1952 }
1953 closeConnection(error: QMqttClient::NoError);
1954 return false;
1955 default:
1956 qCDebug(lcMqttConnection) << "Received unknown command.";
1957 closeConnection(error: QMqttClient::ProtocolViolation);
1958 return false;
1959 }
1960
1961 /* set current command CONNACK - PINGRESP */
1962 /* read command size */
1963 /* calculate missing_data */
1964 return true; // reiterate. implicitly finishes and enqueues
1965}
1966
1967void QMqttConnection::processData()
1968{
1969 while (processDataHelper())
1970 ;
1971}
1972
1973bool QMqttConnection::writePacketToTransport(const QMqttControlPacket &p)
1974{
1975 const QByteArray writeData = p.serialize();
1976 qCDebug(lcMqttConnectionVerbose) << Q_FUNC_INFO << " DataSize:" << writeData.size();
1977 const qint64 res = m_transport->write(data: writeData.constData(), len: writeData.size());
1978 if (Q_UNLIKELY(res == -1)) {
1979 qCDebug(lcMqttConnection) << "Could not write frame to transport.";
1980 return false;
1981 }
1982 return true;
1983}
1984
1985QT_END_NAMESPACE
1986

source code of qtmqtt/src/mqtt/qmqttconnection.cpp