1// Copyright (C) 2022 The Qt Company Ltd.
2// Copyright (C) 2019 Alexey Edelev <semlanik@gmail.com>, Viktor Kopp <vifactor@gmail.com>
3// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only
4
5#include <QtGrpc/private/qtgrpclogging_p.h>
6#include <QtGrpc/qgrpccalloptions.h>
7#include <QtGrpc/qgrpcchanneloptions.h>
8#include <QtGrpc/qgrpchttp2channel.h>
9#include <QtGrpc/qgrpcoperationcontext.h>
10#include <QtGrpc/qgrpcserializationformat.h>
11#include <QtGrpc/qgrpcstatus.h>
12
13#include <QtProtobuf/qprotobufjsonserializer.h>
14#include <QtProtobuf/qprotobufserializer.h>
15
16#include <QtNetwork/private/hpack_p.h>
17#include <QtNetwork/private/http2protocol_p.h>
18#include <QtNetwork/private/qhttp2connection_p.h>
19#if QT_CONFIG(localserver)
20# include <QtNetwork/qlocalsocket.h>
21#endif
22#include <QtNetwork/qtcpsocket.h>
23#if QT_CONFIG(ssl)
24# include <QtNetwork/qsslsocket.h>
25#endif
26
27#include <QtCore/private/qnoncontiguousbytedevice_p.h>
28#include <QtCore/qalgorithms.h>
29#include <QtCore/qbytearray.h>
30#include <QtCore/qendian.h>
31#include <QtCore/qiodevice.h>
32#include <QtCore/qlist.h>
33#include <QtCore/qmetaobject.h>
34#include <QtCore/qpointer.h>
35#include <QtCore/qqueue.h>
36#include <QtCore/qtimer.h>
37
38#include <functional>
39#include <optional>
40#include <utility>
41
42QT_BEGIN_NAMESPACE
43
44using namespace Qt::StringLiterals;
45using namespace QtGrpc;
46
47/*!
48 \class QGrpcHttp2Channel
49 \inmodule QtGrpc
50 \brief The QGrpcHttp2Channel class provides a HTTP/2 transport layer
51 for \gRPC communication.
52
53 The QGrpcHttp2Channel class implements QAbstractGrpcChannel, enabling \gRPC
54 communication carried over \l{https://datatracker.ietf.org/doc/html/rfc7540}
55 {HTTP/2 framing}.
56
57 HTTP/2 introduces several advantages over its predecessor, HTTP/1.1, making
58 QGrpcHttp2Channel well-suited for high-performance, real-time applications
59 that require efficient communication, without sacrificing security or
60 reliability, by using multiplexed TCP connections.
61
62 The channel can be customized with \l{Secure Sockets Layer (SSL)
63 Classes}{SSL} support, a custom \l{QGrpcChannelOptions::}
64 {serializationFormat}, or other options by constructing it with a
65 QGrpcChannelOptions containing the required customizations.
66
67 \section2 Transportation scheme
68
69 The QGrpcHttp2Channel implementation prefers different transportation
70 methods based on the provided \c{hostUri}, \l{QUrl::}{scheme} and options.
71 The following criteria applies:
72
73 \table
74 \header
75 \li Scheme
76 \li Description
77 \li Default Port
78 \li Requirements
79 \li Example
80 \row
81 \li \c{http}
82 \li Unencrypted HTTP/2 over TCP
83 \li 80
84 \li None
85 \li \c{http://localhost}
86 \row
87 \li \c{https}
88 \li TLS-encrypted HTTP/2 over TCP
89 \li 443
90 \li QSslSocket support \b{AND} (scheme \b{OR} \l{QGrpcChannelOptions::}{sslConfiguration})
91 \li \c{https://localhost}
92 \row
93 \li \c{unix}
94 \li Unix domain socket in filesystem path
95 \li
96 \li QLocalSocket support \b{AND} scheme
97 \li \c{unix:///tmp/grpc.socket}
98 \endtable
99
100 \section2 Content-Type
101
102 The \e{content-type} in \gRPC over HTTP/2 determines the message
103 serialization format. It must start with \c{application/grpc} and can
104 include a suffix. The format follows this scheme:
105
106 \code
107 "content-type": "application/grpc" [("+proto" / "+json" / {custom})]
108 \endcode
109
110 For example:
111 \list
112 \li \c{application/grpc+proto} specifies Protobuf encoding.
113 \li \c{application/grpc+json} specifies JSON encoding.
114 \endlist
115
116 The serialization format can be configured either by specifying the \c
117 {content-type} inside the metadata or by setting the \l{QGrpcChannelOptions::}
118 {serializationFormat} directly. By default, the \c {application/grpc}
119 content-type is used.
120
121 To configure QGrpcHttp2Channel with the JSON serialization format using
122 \c {content-type} metadata:
123
124 \code
125 auto jsonChannel = std::make_shared<QGrpcHttp2Channel>(
126 QUrl("http://localhost:50051"_L1),
127 QGrpcChannelOptions().setMetadata({
128 { "content-type"_ba, "application/grpc+json"_ba },
129 })
130 );
131 \endcode
132
133 For a custom serializer and \c {content-type}, you can directly set the
134 serialization format:
135
136 \include qgrpcserializationformat.cpp custom-serializer-code
137
138 \code
139 auto dummyChannel = std::make_shared<QGrpcHttp2Channel>(
140 QUrl("http://localhost:50051"_L1),
141 QGrpcChannelOptions().setSerializationFormat(dummyFormat)
142 );
143 \endcode
144
145 \include qgrpcserializationformat.cpp custom-serializer-desc
146
147 \sa QAbstractGrpcChannel, QGrpcChannelOptions, QGrpcSerializationFormat
148*/
149
150namespace {
151
152constexpr QByteArrayView AuthorityHeader(":authority");
153constexpr QByteArrayView MethodHeader(":method");
154constexpr QByteArrayView PathHeader(":path");
155constexpr QByteArrayView SchemeHeader(":scheme");
156
157constexpr QByteArrayView ContentTypeHeader("content-type");
158constexpr QByteArrayView AcceptEncodingHeader("accept-encoding");
159constexpr QByteArrayView TEHeader("te");
160
161constexpr QByteArrayView GrpcServiceNameHeader("service-name");
162constexpr QByteArrayView GrpcAcceptEncodingHeader("grpc-accept-encoding");
163constexpr QByteArrayView GrpcStatusHeader("grpc-status");
164constexpr QByteArrayView GrpcStatusMessageHeader("grpc-message");
165constexpr qsizetype GrpcMessageSizeHeaderSize = 5;
166constexpr QByteArrayView DefaultContentType = "application/grpc";
167
168// This HTTP/2 Error Codes to QGrpcStatus::StatusCode mapping should be kept in sync
169// with the following docs:
170// https://www.rfc-editor.org/rfc/rfc7540#section-7
171// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
172// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
173constexpr StatusCode http2ErrorToStatusCode(const quint32 http2Error)
174{
175 using Http2Error = Http2::Http2Error;
176
177 switch (http2Error) {
178 case Http2Error::HTTP2_NO_ERROR:
179 case Http2Error::PROTOCOL_ERROR:
180 case Http2Error::INTERNAL_ERROR:
181 case Http2Error::FLOW_CONTROL_ERROR:
182 case Http2Error::SETTINGS_TIMEOUT:
183 case Http2Error::STREAM_CLOSED:
184 case Http2Error::FRAME_SIZE_ERROR:
185 return StatusCode::Internal;
186 case Http2Error::REFUSE_STREAM:
187 return StatusCode::Unavailable;
188 case Http2Error::CANCEL:
189 return StatusCode::Cancelled;
190 case Http2Error::COMPRESSION_ERROR:
191 case Http2Error::CONNECT_ERROR:
192 return StatusCode::Internal;
193 case Http2Error::ENHANCE_YOUR_CALM:
194 return StatusCode::ResourceExhausted;
195 case Http2Error::INADEQUATE_SECURITY:
196 return StatusCode::PermissionDenied;
197 case Http2Error::HTTP_1_1_REQUIRED:
198 return StatusCode::Unknown;
199 }
200 return StatusCode::Internal;
201}
202
203// Sends the errorOccured and finished signals asynchronously to make sure user
204// connections work correctly.
205void operationContextAsyncError(QGrpcOperationContext *operationContext, const QGrpcStatus &status)
206{
207 Q_ASSERT_X(operationContext != nullptr, "QGrpcHttp2ChannelPrivate::operationContextAsyncError",
208 "operationContext is null");
209 QTimer::singleShot(interval: 0, receiver: operationContext,
210 slot: [operationContext, status]() { emit operationContext->finished(status); });
211}
212
213} // namespace
214
215struct ExpectedData
216{
217 qsizetype expectedSize = 0;
218 QByteArray container;
219
220 bool updateExpectedSize()
221 {
222 if (expectedSize == 0) {
223 if (container.size() < GrpcMessageSizeHeaderSize)
224 return false;
225 expectedSize = qFromBigEndian<quint32>(src: container.data() + 1)
226 + GrpcMessageSizeHeaderSize;
227 }
228 return true;
229 }
230};
231
232// The Http2Handler manages an individual RPC over the HTTP/2 channel.
233// Each instance corresponds to an RPC initiated by the user.
234class Http2Handler : public QObject
235{
236 // Q_OBJECT macro is not needed and adds unwanted overhead.
237
238public:
239 enum State : uint8_t { Active, Cancelled, Finished };
240
241 explicit Http2Handler(const std::shared_ptr<QGrpcOperationContext> &operation,
242 QGrpcHttp2ChannelPrivate *parent, bool endStream);
243 ~Http2Handler() override;
244
245 void sendInitialRequest();
246 void attachStream(QHttp2Stream *stream_);
247 void processQueue();
248
249 [[nodiscard]] QGrpcOperationContext *operation() const;
250 [[nodiscard]] bool expired() const { return m_operation.expired(); }
251
252 [[nodiscard]] bool isStreamClosedForSending() const
253 {
254 // If stream pointer is nullptr this means we never opened it and should collect
255 // the incoming messages in queue until the stream is opened or the error occurred.
256 return m_stream != nullptr
257 && (m_stream->state() == QHttp2Stream::State::HalfClosedLocal
258 || m_stream->state() == QHttp2Stream::State::Closed);
259 }
260
261// context slot handlers:
262 bool cancel();
263 void writesDone();
264 void writeMessage(QByteArrayView data);
265 void deadlineTimeout();
266
267private:
268 void prepareInitialRequest(QGrpcOperationContext *operationContext,
269 QGrpcHttp2ChannelPrivate *channel);
270
271 HPack::HttpHeader m_initialHeaders;
272 std::weak_ptr<QGrpcOperationContext> m_operation;
273 QQueue<QByteArray> m_queue;
274 QPointer<QHttp2Stream> m_stream;
275 ExpectedData m_expectedData;
276 State m_handlerState = Active;
277 const bool m_endStreamAtFirstData;
278 QTimer m_deadlineTimer;
279
280 Q_DISABLE_COPY_MOVE(Http2Handler)
281};
282
283class QGrpcHttp2ChannelPrivate : public QObject
284{
285 Q_OBJECT
286public:
287 explicit QGrpcHttp2ChannelPrivate(const QUrl &uri, QGrpcHttp2Channel *q);
288 ~QGrpcHttp2ChannelPrivate() override = default;
289
290 void processOperation(const std::shared_ptr<QGrpcOperationContext> &operationContext,
291 bool endStream = false);
292
293 void deleteHandler(Http2Handler *handler);
294 [[nodiscard]] bool isLocalSocket() const
295 {
296#if QT_CONFIG(localserver)
297 return m_isLocalSocket;
298#else
299 return false;
300#endif
301 }
302 [[nodiscard]] const QByteArray &contentType() const { return m_contentType; }
303
304 [[nodiscard]] const QByteArray &authorityHeader() const { return m_authorityHeader; }
305
306 std::shared_ptr<QAbstractProtobufSerializer> serializer;
307 QUrl hostUri;
308 QGrpcHttp2Channel *q_ptr = nullptr;
309
310private:
311 enum ConnectionState { Connecting = 0, Connected, Error };
312
313 template <typename T>
314 void connectErrorHandler(T *socket, QGrpcOperationContext *operationContext)
315 {
316 QObject::connect(socket, &T::errorOccurred, operationContext,
317 [operationContextPtr = QPointer(operationContext), this](auto error) {
318 if (m_isInsideSocketErrorOccurred) {
319 qGrpcCritical("Socket errorOccurred signal triggered while "
320 "already handling an error");
321 return;
322 }
323 m_isInsideSocketErrorOccurred = true;
324 auto reset = qScopeGuard([this]() {
325 m_isInsideSocketErrorOccurred = false;
326 });
327 emit operationContextPtr->finished(status: QGrpcStatus{
328 StatusCode::Unavailable,
329 QGrpcHttp2ChannelPrivate::tr(s: "Network error occurred: %1")
330 .arg(error) });
331 });
332 }
333
334 void sendInitialRequest(Http2Handler *handler);
335 void createHttp2Connection();
336 void handleSocketError();
337
338 template <typename T>
339 T *initSocket()
340 {
341 auto p = std::make_unique<T>();
342 T *typedSocket = p.get();
343 m_socket = std::move(p);
344 return typedSocket;
345 }
346
347 std::unique_ptr<QIODevice> m_socket = nullptr;
348 bool m_isInsideSocketErrorOccurred = false;
349 QHttp2Connection *m_connection = nullptr;
350 QList<Http2Handler *> m_activeHandlers;
351 QList<Http2Handler *> m_pendingHandlers;
352#if QT_CONFIG(localserver)
353 bool m_isLocalSocket = false;
354#endif
355 QByteArray m_contentType;
356 ConnectionState m_state = Connecting;
357 std::function<void()> m_reconnectFunction;
358
359 QByteArray m_authorityHeader;
360 Q_DISABLE_COPY_MOVE(QGrpcHttp2ChannelPrivate)
361};
362
363///
364/// ## Http2Handler Implementations
365///
366
367Http2Handler::Http2Handler(const std::shared_ptr<QGrpcOperationContext> &operation,
368 QGrpcHttp2ChannelPrivate *parent, bool endStream)
369 : QObject(parent), m_operation(operation), m_endStreamAtFirstData(endStream)
370{
371 auto *channelOpPtr = operation.get();
372 QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::cancelRequested, context: this,
373 slot: &Http2Handler::cancel);
374 QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::writesDoneRequested, context: this,
375 slot: &Http2Handler::writesDone);
376 if (!m_endStreamAtFirstData) {
377 QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::writeMessageRequested, context: this,
378 slot: &Http2Handler::writeMessage);
379 }
380 QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::finished, context: &m_deadlineTimer,
381 slot: &QTimer::stop);
382 prepareInitialRequest(operationContext: channelOpPtr, channel: parent);
383}
384
385Http2Handler::~Http2Handler()
386{
387 if (m_stream) {
388 QHttp2Stream *streamPtr = m_stream.get();
389 m_stream.clear();
390 delete streamPtr;
391 }
392}
393
394// Attaches the HTTP/2 stream and sets up the necessary connections and
395// preconditions. The HTTP/2 connection is established, and the transport
396// is now ready for communication.
397void Http2Handler::attachStream(QHttp2Stream *stream_)
398{
399 Q_ASSERT(m_stream == nullptr);
400 Q_ASSERT(stream_ != nullptr);
401
402 auto *channelOpPtr = operation();
403 m_stream = stream_;
404
405 auto *parentChannel = qobject_cast<QGrpcHttp2ChannelPrivate *>(object: parent());
406 QObject::connect(sender: m_stream.get(), signal: &QHttp2Stream::headersReceived, context: channelOpPtr,
407 slot: [channelOpPtr, parentChannel, this](const HPack::HttpHeader &headers,
408 bool endStream) {
409 auto md = channelOpPtr->serverMetadata();
410 QtGrpc::StatusCode statusCode = StatusCode::Ok;
411 QString statusMessage;
412 for (const auto &header : headers) {
413 md.insert(key: header.name, value: header.value);
414 if (header.name == GrpcStatusHeader) {
415 statusCode = static_cast<
416 StatusCode>(QString::fromLatin1(ba: header.value).toShort());
417 } else if (header.name == GrpcStatusMessageHeader) {
418 statusMessage = QString::fromUtf8(ba: header.value);
419 }
420 }
421
422 channelOpPtr->setServerMetadata(std::move(md));
423
424 if (endStream) {
425 if (m_handlerState != Cancelled) {
426 emit channelOpPtr->finished(
427 status: QGrpcStatus{ statusCode,statusMessage });
428 }
429 parentChannel->deleteHandler(handler: this);
430 }
431 });
432
433 Q_ASSERT(parentChannel != nullptr);
434 auto errorConnection = std::make_shared<QMetaObject::Connection>();
435 *errorConnection = QObject::connect(
436 sender: m_stream.get(), signal: &QHttp2Stream::errorOccurred, context: parentChannel,
437 slot: [parentChannel, errorConnection, this](quint32 http2ErrorCode, const QString &errorString) {
438 if (!m_operation.expired()) {
439 auto channelOp = m_operation.lock();
440 emit channelOp->finished(status: QGrpcStatus{ http2ErrorToStatusCode(http2Error: http2ErrorCode),
441 errorString });
442 }
443 parentChannel->deleteHandler(handler: this);
444 QObject::disconnect(*errorConnection);
445 });
446
447 QObject::connect(sender: m_stream.get(), signal: &QHttp2Stream::dataReceived, context: channelOpPtr,
448 slot: [channelOpPtr, parentChannel, this](const QByteArray &data, bool endStream) {
449 if (m_handlerState != Cancelled) {
450 m_expectedData.container.append(a: data);
451
452 if (!m_expectedData.updateExpectedSize())
453 return;
454
455 while (m_expectedData.container.size()
456 >= m_expectedData.expectedSize) {
457 qGrpcDebug() << "Full data received:" << data.size()
458 << "dataContainer:" << m_expectedData.container.size()
459 << "capacity:" << m_expectedData.expectedSize;
460 emit channelOpPtr
461 ->messageReceived(data: m_expectedData.container
462 .mid(index: GrpcMessageSizeHeaderSize,
463 len: m_expectedData.expectedSize
464 - GrpcMessageSizeHeaderSize));
465 m_expectedData.container.remove(index: 0, len: m_expectedData.expectedSize);
466 m_expectedData.expectedSize = 0;
467 if (!m_expectedData.updateExpectedSize())
468 return;
469 }
470 if (endStream) {
471 m_handlerState = Finished;
472 emit channelOpPtr->finished(status: {});
473 parentChannel->deleteHandler(handler: this);
474 }
475 }
476 });
477
478 QObject::connect(sender: m_stream.get(), signal: &QHttp2Stream::uploadFinished, context: this,
479 slot: &Http2Handler::processQueue);
480
481 std::optional<std::chrono::milliseconds> deadline;
482 if (channelOpPtr->callOptions().deadlineTimeout())
483 deadline = channelOpPtr->callOptions().deadlineTimeout();
484 else if (parentChannel->q_ptr->channelOptions().deadlineTimeout())
485 deadline = parentChannel->q_ptr->channelOptions().deadlineTimeout();
486 if (deadline) {
487 // We have an active stream and a deadline. It's time to start the timer.
488 QObject::connect(sender: &m_deadlineTimer, signal: &QTimer::timeout, context: this, slot: &Http2Handler::deadlineTimeout);
489 m_deadlineTimer.start(value: *deadline);
490 }
491}
492
493QGrpcOperationContext *Http2Handler::operation() const
494{
495 Q_ASSERT(!m_operation.expired());
496
497 return m_operation.lock().get();
498}
499
500// Prepares the initial headers and enqueues the initial message.
501// The headers are sent once the HTTP/2 connection is established.
502void Http2Handler::prepareInitialRequest(QGrpcOperationContext *operationContext,
503 QGrpcHttp2ChannelPrivate *channel)
504{
505 const auto &channelOptions = channel->q_ptr->channelOptions();
506 QByteArray service{ operationContext->service().data(), operationContext->service().size() };
507 QByteArray method{ operationContext->method().data(), operationContext->method().size() };
508 m_initialHeaders = HPack::HttpHeader{
509 { AuthorityHeader.toByteArray(), channel->authorityHeader() },
510 { MethodHeader.toByteArray(), "POST"_ba },
511 { PathHeader.toByteArray(), QByteArray('/' + service + '/' + method) },
512 { SchemeHeader.toByteArray(),
513 channel->isLocalSocket() ? "http"_ba : channel->hostUri.scheme().toLatin1() },
514 { ContentTypeHeader.toByteArray(), channel->contentType() },
515 { GrpcServiceNameHeader.toByteArray(), { service } },
516 { GrpcAcceptEncodingHeader.toByteArray(), "identity,deflate,gzip"_ba },
517 { AcceptEncodingHeader.toByteArray(), "identity,gzip"_ba },
518 { TEHeader.toByteArray(), "trailers"_ba },
519 };
520
521 auto iterateMetadata = [this](const auto &metadata) {
522 for (const auto &[key, value] : metadata.asKeyValueRange()) {
523 const auto lowerKey = key.toLower();
524 if (lowerKey == AuthorityHeader || lowerKey == MethodHeader || lowerKey == PathHeader
525 || lowerKey == SchemeHeader || lowerKey == ContentTypeHeader) {
526 continue;
527 }
528 m_initialHeaders.emplace_back(lowerKey, value);
529 }
530 };
531
532 iterateMetadata(channelOptions.metadata());
533 iterateMetadata(operationContext->callOptions().metadata());
534
535 writeMessage(data: operationContext->argument());
536}
537
538// Slot to enqueue a writeMessage request, either from the initial message
539// or from the user in client/bidirectional streaming RPCs.
540void Http2Handler::writeMessage(QByteArrayView data)
541{
542 if (m_handlerState != Active || isStreamClosedForSending()) {
543 qGrpcDebug("Attempt sending data to the ended stream");
544 return;
545 }
546
547 QByteArray msg(GrpcMessageSizeHeaderSize + data.size(), '\0');
548 // Args must be 4-byte unsigned int to fit into 4-byte big endian
549 qToBigEndian(src: static_cast<quint32>(data.size()), dest: msg.data() + 1);
550
551 // protect against nullptr data.
552 if (!data.isEmpty()) {
553 std::memcpy(dest: msg.begin() + GrpcMessageSizeHeaderSize, src: data.begin(),
554 n: static_cast<size_t>(data.size()));
555 }
556
557 m_queue.enqueue(t: msg);
558 processQueue();
559}
560
561// Sends the initial headers and processes the message queue containing the
562// initial message. At this point, the HTTP/2 connection is established, and
563// the stream is attached.
564void Http2Handler::sendInitialRequest()
565{
566 Q_ASSERT(!m_initialHeaders.empty());
567 Q_ASSERT(m_stream);
568
569 if (!m_stream->sendHEADERS(headers: m_initialHeaders, endStream: false)) {
570 operationContextAsyncError(operationContext: operation(),
571 status: QGrpcStatus{
572 StatusCode::Unavailable,
573 tr(s: "Unable to send initial headers to an HTTP/2 stream") });
574 return;
575 }
576 m_initialHeaders.clear();
577 processQueue();
578}
579
580// The core logic for sending the already serialized data through the HTTP/2 stream.
581// This function is invoked either by the user via writeMessageRequested() or
582// writesDoneRequested(), or it is continuously polled after the previous uploadFinished()
583void Http2Handler::processQueue()
584{
585 if (!m_stream)
586 return;
587
588 if (m_stream->isUploadingDATA())
589 return;
590
591 if (m_queue.isEmpty())
592 return;
593
594 // Take ownership of the byte device.
595 auto *device = QNonContiguousByteDeviceFactory::create(byteArray: m_queue.dequeue());
596 device->setParent(m_stream);
597
598 m_stream->sendDATA(device, endStream: device->atEnd() || m_endStreamAtFirstData);
599 // Manage the lifetime through the uploadFinished signal (or this
600 // Http2Handler). Don't use QObject::deleteLater here as this function is
601 // expensive and blocking. Delete the byte device directly.
602 // This is fine in this context.
603 connect(sender: m_stream.get(), signal: &QHttp2Stream::uploadFinished, context: device, slot: [device] { delete device; });
604}
605
606bool Http2Handler::cancel()
607{
608 if (m_handlerState != Active || !m_stream)
609 return false;
610 m_handlerState = Cancelled;
611
612 // Client cancelled the stream before the deadline exceeded.
613 m_deadlineTimer.stop();
614
615 // Immediate cancellation by sending the RST_STREAM frame.
616 return m_stream->sendRST_STREAM(errorCode: Http2::Http2Error::CANCEL);
617}
618
619void Http2Handler::writesDone()
620{
621 if (m_handlerState != Active)
622 return;
623
624 m_handlerState = Finished;
625
626 // Stream is already (half)closed, skip sending the DATA frame with the end-of-stream flag.
627 if (isStreamClosedForSending())
628 return;
629
630 m_queue.enqueue(t: {});
631 processQueue();
632}
633
634void Http2Handler::deadlineTimeout()
635{
636 Q_ASSERT_X(m_stream, "onDeadlineTimeout", "stream is not available");
637
638 if (m_operation.expired()) {
639 qGrpcWarning("Operation expired on deadline timeout");
640 return;
641 }
642 // cancel the stream by sending the RST_FRAME and report
643 // the status back to our user.
644 if (cancel()) {
645 emit m_operation.lock()->finished(status: { StatusCode::DeadlineExceeded,
646 "Deadline Exceeded" });
647 } else {
648 qGrpcWarning("Cancellation failed on deadline timeout.");
649 }
650}
651
652///
653/// ## QGrpcHttp2ChannelPrivate Implementations
654///
655
656QGrpcHttp2ChannelPrivate::QGrpcHttp2ChannelPrivate(const QUrl &uri, QGrpcHttp2Channel *q)
657 : hostUri(uri), q_ptr(q)
658{
659 auto channelOptions = q_ptr->channelOptions();
660 auto formatSuffix = channelOptions.serializationFormat().suffix();
661 const QByteArray defaultContentType = DefaultContentType.toByteArray();
662 const QByteArray contentTypeFromOptions = !formatSuffix.isEmpty()
663 ? defaultContentType + '+' + formatSuffix
664 : defaultContentType;
665 bool warnAboutFormatConflict = !formatSuffix.isEmpty();
666
667 const auto it = channelOptions.metadata().constFind(key: ContentTypeHeader.data());
668 if (it != channelOptions.metadata().cend()) {
669 if (formatSuffix.isEmpty() && it.value() != DefaultContentType) {
670 if (it.value() == "application/grpc+json") {
671 channelOptions.setSerializationFormat(SerializationFormat::Json);
672 } else if (it.value() == "application/grpc+proto" || it.value() == DefaultContentType) {
673 channelOptions.setSerializationFormat(SerializationFormat::Protobuf);
674 } else {
675 qGrpcWarning() << "Cannot choose the serializer for " << ContentTypeHeader
676 << it.value() << ". Using protobuf format as the default one.";
677 channelOptions.setSerializationFormat(SerializationFormat::Default);
678 }
679 q_ptr->setChannelOptions(channelOptions);
680 } else if (it.value() != contentTypeFromOptions) {
681 warnAboutFormatConflict = true;
682 } else {
683 warnAboutFormatConflict = false;
684 }
685 } else {
686 warnAboutFormatConflict = false;
687 }
688
689 if (formatSuffix == channelOptions.serializationFormat().suffix()) { // no change
690 m_contentType = contentTypeFromOptions;
691 } else { // format has changed, update content type
692 m_contentType = !channelOptions.serializationFormat().suffix().isEmpty()
693 ? defaultContentType + '+' + channelOptions.serializationFormat().suffix()
694 : defaultContentType;
695 }
696
697 if (warnAboutFormatConflict) {
698 qGrpcWarning()
699 << "Manually specified serialization format '%1' doesn't match the %2 header value "
700 "'%3'"_L1.arg(args: QString::fromLatin1(ba: contentTypeFromOptions),
701 args: QString::fromLatin1(ba: ContentTypeHeader),
702 args: QString::fromLatin1(ba: it.value()));
703 }
704
705 bool nonDefaultPort = false;
706#if QT_CONFIG(localserver)
707 if (hostUri.scheme() == "unix"_L1) {
708 auto *localSocket = initSocket<QLocalSocket>();
709 m_isLocalSocket = true;
710
711 QObject::connect(sender: localSocket, signal: &QLocalSocket::connected, context: this,
712 slot: &QGrpcHttp2ChannelPrivate::createHttp2Connection);
713 QObject::connect(sender: localSocket, signal: &QLocalSocket::errorOccurred, context: this,
714 slot: [this](QLocalSocket::LocalSocketError error) {
715 qGrpcDebug()
716 << "Error occurred(" << error << "):"
717 << static_cast<QLocalSocket *>(m_socket.get())->errorString()
718 << hostUri;
719 handleSocketError();
720 });
721 m_reconnectFunction = [localSocket, this] {
722 localSocket->connectToServer(name: hostUri.host() + hostUri.path());
723 };
724 } else
725#endif
726#if QT_CONFIG(ssl)
727 if (hostUri.scheme() == "https"_L1 || channelOptions.sslConfiguration()) {
728 auto *sslSocket = initSocket<QSslSocket>();
729 if (hostUri.port() < 0) {
730 hostUri.setPort(443);
731 } else {
732 nonDefaultPort = hostUri.port() != 443;
733 }
734
735 if (const auto userSslConfig = channelOptions.sslConfiguration(); userSslConfig) {
736 sslSocket->setSslConfiguration(*userSslConfig);
737 } else {
738 static const QByteArray h2NexProtocol = "h2"_ba;
739 auto defautlSslConfig = QSslConfiguration::defaultConfiguration();
740 auto allowedNextProtocols = defautlSslConfig.allowedNextProtocols();
741 if (!allowedNextProtocols.contains(t: h2NexProtocol))
742 allowedNextProtocols.append(t: h2NexProtocol);
743 defautlSslConfig.setAllowedNextProtocols(allowedNextProtocols);
744 sslSocket->setSslConfiguration(defautlSslConfig);
745 }
746
747 QObject::connect(sender: sslSocket, signal: &QSslSocket::encrypted, context: this,
748 slot: &QGrpcHttp2ChannelPrivate::createHttp2Connection);
749 QObject::connect(sender: sslSocket, signal: &QAbstractSocket::errorOccurred, context: this,
750 slot: [this](QAbstractSocket::SocketError error) {
751 qDebug()
752 << "Error occurred(" << error << "):"
753 << static_cast<QAbstractSocket *>(m_socket.get())->errorString()
754 << hostUri;
755 handleSocketError();
756 });
757 m_reconnectFunction = [sslSocket, this] {
758 sslSocket->connectToHostEncrypted(hostName: hostUri.host(), port: static_cast<quint16>(hostUri.port()));
759 };
760 } else
761#endif
762 {
763 if (hostUri.scheme() != "http"_L1) {
764 qGrpcWarning() << "Unsupported transport protocol scheme '" << hostUri.scheme()
765 << "'. Fall back to 'http'.";
766 }
767
768 auto *httpSocket = initSocket<QTcpSocket>();
769 if (hostUri.port() < 0) {
770 hostUri.setPort(80);
771 } else {
772 nonDefaultPort = hostUri.port() != 80;
773 }
774
775 QObject::connect(sender: httpSocket, signal: &QAbstractSocket::connected, context: this,
776 slot: &QGrpcHttp2ChannelPrivate::createHttp2Connection);
777 QObject::connect(sender: httpSocket, signal: &QAbstractSocket::errorOccurred, context: this,
778 slot: [this](QAbstractSocket::SocketError error) {
779 qGrpcDebug()
780 << "Error occurred(" << error << "):"
781 << static_cast<QAbstractSocket *>(m_socket.get())->errorString()
782 << hostUri;
783 handleSocketError();
784 });
785 m_reconnectFunction = [httpSocket, this] {
786 httpSocket->connectToHost(hostName: hostUri.host(), port: static_cast<quint16>(hostUri.port()));
787 };
788 }
789
790 m_authorityHeader = hostUri.host().toLatin1();
791 if (nonDefaultPort) {
792 m_authorityHeader += ':';
793 m_authorityHeader += QByteArray::number(hostUri.port());
794 }
795
796 m_reconnectFunction();
797}
798
799void QGrpcHttp2ChannelPrivate::processOperation(const std::shared_ptr<QGrpcOperationContext>
800 &operationContext,
801 bool endStream)
802{
803 auto *operationContextPtr = operationContext.get();
804 Q_ASSERT_X(operationContextPtr != nullptr, "QGrpcHttp2ChannelPrivate::processOperation",
805 "operation context is nullptr.");
806
807 if (!m_socket->isWritable() && m_state == ConnectionState::Connected) {
808 operationContextAsyncError(operationContext: operationContextPtr,
809 status: QGrpcStatus{ StatusCode::Unavailable,
810 m_socket->errorString() });
811 return;
812 }
813
814#if QT_CONFIG(localserver)
815 if (m_isLocalSocket) {
816 connectErrorHandler<QLocalSocket>(socket: static_cast<QLocalSocket *>(m_socket.get()),
817 operationContext: operationContextPtr);
818 } else
819#endif
820 {
821 connectErrorHandler<QAbstractSocket>(socket: static_cast<QAbstractSocket *>(m_socket.get()),
822 operationContext: operationContextPtr);
823 }
824
825 auto *handler = new Http2Handler(operationContext, this, endStream);
826 if (m_connection == nullptr) {
827 m_pendingHandlers.push_back(t: handler);
828 } else {
829 sendInitialRequest(handler);
830 m_activeHandlers.push_back(t: handler);
831 }
832
833 if (m_state == ConnectionState::Error) {
834 Q_ASSERT_X(m_reconnectFunction, "QGrpcHttp2ChannelPrivate::processOperation",
835 "Socket reconnection function is not defined.");
836 if (m_isInsideSocketErrorOccurred) {
837 qGrpcWarning("Inside socket error handler. Reconnect deferred to event loop.");
838 QTimer::singleShot(interval: 0, slot: [this]{ m_reconnectFunction(); });
839 } else {
840 m_reconnectFunction();
841 }
842 m_state = ConnectionState::Connecting;
843 }
844}
845
846void QGrpcHttp2ChannelPrivate::createHttp2Connection()
847{
848 Q_ASSERT_X(m_connection == nullptr, "QGrpcHttp2ChannelPrivate::createHttp2Connection",
849 "Attempt to create the HTTP/2 connection, but it already exists. This situation is "
850 "exceptional.");
851
852 // Nagle's algorithm slows down gRPC communication when frequently sending small utility
853 // HTTP/2 frames. Since an ACK is not sent until a predefined timeout if the TCP frame is
854 // not full enough, communication hangs. In our case, this results in a 40ms delay when
855 // WINDOW_UPDATE or PING frames are sent in a separate TCP frame.
856 //
857 // TODO: We should probably allow users to opt out of this using QGrpcChannelOptions,
858 // see QTBUG-134428.
859 if (QAbstractSocket *abstractSocket = qobject_cast<QAbstractSocket *>(object: m_socket.get()))
860 abstractSocket->setSocketOption(option: QAbstractSocket::LowDelayOption, value: 1);
861
862 m_connection = QHttp2Connection::createDirectConnection(socket: m_socket.get(), config: {});
863
864 if (m_connection) {
865 QObject::connect(sender: m_socket.get(), signal: &QAbstractSocket::readyRead, context: m_connection,
866 slot: &QHttp2Connection::handleReadyRead);
867 m_state = ConnectionState::Connected;
868 }
869
870 for (const auto &handler : m_pendingHandlers) {
871 if (handler->expired()) {
872 delete handler;
873 continue;
874 }
875 sendInitialRequest(handler);
876 }
877 m_activeHandlers.append(l: m_pendingHandlers);
878 m_pendingHandlers.clear();
879}
880
881void QGrpcHttp2ChannelPrivate::handleSocketError()
882{
883 qDeleteAll(c: m_activeHandlers);
884 m_activeHandlers.clear();
885 qDeleteAll(c: m_pendingHandlers);
886 m_pendingHandlers.clear();
887 delete m_connection;
888 m_connection = nullptr;
889 m_state = ConnectionState::Error;
890}
891
892void QGrpcHttp2ChannelPrivate::sendInitialRequest(Http2Handler *handler)
893{
894 Q_ASSERT(handler != nullptr);
895 auto *channelOpPtr = handler->operation();
896 if (!m_connection) {
897 operationContextAsyncError(operationContext: channelOpPtr,
898 status: QGrpcStatus{
899 StatusCode::Unavailable,
900 tr(s: "Unable to establish an HTTP/2 connection") });
901 return;
902 }
903
904 const auto streamAttempt = m_connection->createStream();
905 if (!streamAttempt.ok()) {
906 operationContextAsyncError(operationContext: channelOpPtr,
907 status: QGrpcStatus{
908 StatusCode::Unavailable,
909 tr(s: "Unable to create an HTTP/2 stream (%1)")
910 .arg(a: QDebug::toString(object: streamAttempt.error())) });
911 return;
912 }
913 handler->attachStream(stream_: streamAttempt.unwrap());
914 handler->sendInitialRequest();
915}
916
917void QGrpcHttp2ChannelPrivate::deleteHandler(Http2Handler *handler)
918{
919 const auto it = std::find(first: m_activeHandlers.constBegin(), last: m_activeHandlers.constEnd(), val: handler);
920 if (it == m_activeHandlers.constEnd())
921 return;
922 handler->deleteLater();
923 m_activeHandlers.erase(pos: it);
924}
925
926///
927/// ## QGrpcHttp2Channel Implementations
928///
929
930/*!
931 Constructs QGrpcHttp2Channel with \a hostUri. Please see the
932 \l{Transportation scheme} section for more information.
933*/
934QGrpcHttp2Channel::QGrpcHttp2Channel(const QUrl &hostUri)
935 : d_ptr(std::make_unique<QGrpcHttp2ChannelPrivate>(args: hostUri, args: this))
936{
937}
938
939/*!
940 Constructs QGrpcHttp2Channel with \a hostUri and \a options. Please see the
941 \l{Transportation scheme} section for more information.
942*/
943QGrpcHttp2Channel::QGrpcHttp2Channel(const QUrl &hostUri, const QGrpcChannelOptions &options)
944 : QAbstractGrpcChannel(options),
945 d_ptr(std::make_unique<QGrpcHttp2ChannelPrivate>(args: hostUri, args: this))
946{
947}
948
949/*!
950 Destroys the QGrpcHttp2Channel object.
951*/
952QGrpcHttp2Channel::~QGrpcHttp2Channel() = default;
953
954/*!
955 Returns the host URI for this channel.
956*/
957QUrl QGrpcHttp2Channel::hostUri() const
958{
959 return d_ptr->hostUri;
960}
961
962/*!
963 \internal
964 Initiates a unary \gRPC call.
965*/
966void QGrpcHttp2Channel::call(std::shared_ptr<QGrpcOperationContext> operationContext)
967{
968 d_ptr->processOperation(operationContext, endStream: true);
969}
970
971/*!
972 \internal
973 Initiates a server-side \gRPC stream.
974*/
975void QGrpcHttp2Channel::serverStream(std::shared_ptr<QGrpcOperationContext> operationContext)
976{
977 d_ptr->processOperation(operationContext, endStream: true);
978}
979
980/*!
981 \internal
982 Initiates a client-side \gRPC stream.
983*/
984void QGrpcHttp2Channel::clientStream(std::shared_ptr<QGrpcOperationContext> operationContext)
985{
986 d_ptr->processOperation(operationContext);
987}
988
989/*!
990 \internal
991 Initiates a bidirectional \gRPC stream.
992*/
993void QGrpcHttp2Channel::bidiStream(std::shared_ptr<QGrpcOperationContext> operationContext)
994{
995 d_ptr->processOperation(operationContext);
996}
997
998/*!
999 \internal
1000 Returns the serializer of the channel.
1001*/
1002std::shared_ptr<QAbstractProtobufSerializer> QGrpcHttp2Channel::serializer() const
1003{
1004 return channelOptions().serializationFormat().serializer();
1005}
1006
1007QT_END_NAMESPACE
1008
1009#include "qgrpchttp2channel.moc"
1010

Provided by KDAB

Privacy Policy
Learn Advanced QML with KDAB
Find out more

source code of qtgrpc/src/grpc/qgrpchttp2channel.cpp