1 | // Copyright (C) 2022 The Qt Company Ltd. |
---|---|
2 | // Copyright (C) 2019 Alexey Edelev <semlanik@gmail.com>, Viktor Kopp <vifactor@gmail.com> |
3 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only |
4 | |
5 | #include <QtGrpc/private/qtgrpclogging_p.h> |
6 | #include <QtGrpc/qgrpccalloptions.h> |
7 | #include <QtGrpc/qgrpcchanneloptions.h> |
8 | #include <QtGrpc/qgrpchttp2channel.h> |
9 | #include <QtGrpc/qgrpcoperationcontext.h> |
10 | #include <QtGrpc/qgrpcserializationformat.h> |
11 | #include <QtGrpc/qgrpcstatus.h> |
12 | |
13 | #include <QtProtobuf/qprotobufjsonserializer.h> |
14 | #include <QtProtobuf/qprotobufserializer.h> |
15 | |
16 | #include <QtNetwork/private/hpack_p.h> |
17 | #include <QtNetwork/private/http2protocol_p.h> |
18 | #include <QtNetwork/private/qhttp2connection_p.h> |
19 | #if QT_CONFIG(localserver) |
20 | # include <QtNetwork/qlocalsocket.h> |
21 | #endif |
22 | #include <QtNetwork/qtcpsocket.h> |
23 | #if QT_CONFIG(ssl) |
24 | # include <QtNetwork/qsslsocket.h> |
25 | #endif |
26 | |
27 | #include <QtCore/private/qnoncontiguousbytedevice_p.h> |
28 | #include <QtCore/qalgorithms.h> |
29 | #include <QtCore/qbytearray.h> |
30 | #include <QtCore/qendian.h> |
31 | #include <QtCore/qiodevice.h> |
32 | #include <QtCore/qlist.h> |
33 | #include <QtCore/qmetaobject.h> |
34 | #include <QtCore/qpointer.h> |
35 | #include <QtCore/qqueue.h> |
36 | #include <QtCore/qtimer.h> |
37 | |
38 | #include <functional> |
39 | #include <optional> |
40 | #include <utility> |
41 | |
42 | QT_BEGIN_NAMESPACE |
43 | |
44 | using namespace Qt::StringLiterals; |
45 | using namespace QtGrpc; |
46 | |
47 | /*! |
48 | \class QGrpcHttp2Channel |
49 | \inmodule QtGrpc |
50 | |
51 | \brief The QGrpcHttp2Channel class is an HTTP/2-based of |
52 | QAbstractGrpcChannel, based on \l {Qt Network} HTTP/2 implementation. |
53 | |
54 | Uses \l QGrpcChannelOptions and \l QGrpcCallOptions |
55 | to control the HTTP/2 communication with the server. |
56 | |
57 | Use \l QGrpcChannelOptions to set the SSL configuration, |
58 | application-specific HTTP/2 headers, and connection timeouts. |
59 | |
60 | \l QGrpcCallOptions control channel parameters for the |
61 | specific unary call or gRPC stream. |
62 | |
63 | QGrpcHttp2Channel uses QGrpcChannelOptions to select the serialization |
64 | format for the protobuf messages. The serialization format can be set |
65 | either using the \c {content-type} metadata or by setting the |
66 | \l QGrpcChannelOptions::serializationFormat directly. |
67 | |
68 | Using the following example you can create a QGrpcHttp2Channel with the |
69 | JSON serialization format using the \c {content-type} metadata: |
70 | \code |
71 | auto channelJson = std::make_shared< |
72 | QGrpcHttp2Channel>(QGrpcChannelOptions{ QUrl("http://localhost:50051", QUrl::StrictMode) } |
73 | .withMetadata({ { "content-type"_ba, |
74 | "application/grpc+json"_ba } })); |
75 | \endcode |
76 | |
77 | Also you can use own serializer and custom \c {content-type} as following: |
78 | \code |
79 | class DummySerializer : public QAbstractProtobufSerializer |
80 | { |
81 | ... |
82 | }; |
83 | auto channel = std::make_shared< |
84 | QGrpcHttp2Channel>(QUrl("http://localhost:50051", QUrl::StrictMode), QGrpcChannelOptions{} |
85 | .setSerializationFormat(QGrpcSerializationFormat{ "dummy", |
86 | std::make_shared<DummySerializer>() })); |
87 | \endcode |
88 | |
89 | QGrpcHttp2Channel will use the \c DummySerializer to serialize |
90 | and deserialize protobuf message and use the |
91 | \c { content-type: application/grpc+dummy } header when sending |
92 | HTTP/2 requests to server. |
93 | |
94 | \l QGrpcChannelOptions::serializationFormat has higher priority and |
95 | if the \c {content-type} metadata suffix doesn't match the |
96 | \l QGrpcSerializationFormat::suffix of the specified |
97 | \l QGrpcChannelOptions::serializationFormat QGrpcHttp2Channel produces |
98 | warning. |
99 | |
100 | \sa QGrpcChannelOptions, QGrpcCallOptions, QSslConfiguration |
101 | */ |
102 | |
103 | namespace { |
104 | |
105 | constexpr QByteArrayView AuthorityHeader(":authority"); |
106 | constexpr QByteArrayView MethodHeader(":method"); |
107 | constexpr QByteArrayView PathHeader(":path"); |
108 | constexpr QByteArrayView SchemeHeader(":scheme"); |
109 | |
110 | constexpr QByteArrayView ContentTypeHeader("content-type"); |
111 | constexpr QByteArrayView AcceptEncodingHeader("accept-encoding"); |
112 | constexpr QByteArrayView TEHeader("te"); |
113 | |
114 | constexpr QByteArrayView GrpcServiceNameHeader("service-name"); |
115 | constexpr QByteArrayView GrpcAcceptEncodingHeader("grpc-accept-encoding"); |
116 | constexpr QByteArrayView GrpcStatusHeader("grpc-status"); |
117 | constexpr QByteArrayView GrpcStatusMessageHeader("grpc-message"); |
118 | constexpr qsizetype GrpcMessageSizeHeaderSize = 5; |
119 | constexpr QByteArrayView DefaultContentType = "application/grpc"; |
120 | |
121 | // This HTTP/2 Error Codes to QGrpcStatus::StatusCode mapping should be kept in sync |
122 | // with the following docs: |
123 | // https://www.rfc-editor.org/rfc/rfc7540#section-7 |
124 | // https://github.com/grpc/grpc/blob/master/doc/statuscodes.md |
125 | // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md |
126 | constexpr StatusCode http2ErrorToStatusCode(const quint32 http2Error) |
127 | { |
128 | using Http2Error = Http2::Http2Error; |
129 | |
130 | switch (http2Error) { |
131 | case Http2Error::HTTP2_NO_ERROR: |
132 | case Http2Error::PROTOCOL_ERROR: |
133 | case Http2Error::INTERNAL_ERROR: |
134 | case Http2Error::FLOW_CONTROL_ERROR: |
135 | case Http2Error::SETTINGS_TIMEOUT: |
136 | case Http2Error::STREAM_CLOSED: |
137 | case Http2Error::FRAME_SIZE_ERROR: |
138 | return StatusCode::Internal; |
139 | case Http2Error::REFUSE_STREAM: |
140 | return StatusCode::Unavailable; |
141 | case Http2Error::CANCEL: |
142 | return StatusCode::Cancelled; |
143 | case Http2Error::COMPRESSION_ERROR: |
144 | case Http2Error::CONNECT_ERROR: |
145 | return StatusCode::Internal; |
146 | case Http2Error::ENHANCE_YOUR_CALM: |
147 | return StatusCode::ResourceExhausted; |
148 | case Http2Error::INADEQUATE_SECURITY: |
149 | return StatusCode::PermissionDenied; |
150 | case Http2Error::HTTP_1_1_REQUIRED: |
151 | return StatusCode::Unknown; |
152 | } |
153 | return StatusCode::Internal; |
154 | } |
155 | |
156 | // Sends the errorOccured and finished signals asynchronously to make sure user |
157 | // connections work correctly. |
158 | void operationContextAsyncError(QGrpcOperationContext *operationContext, const QGrpcStatus &status) |
159 | { |
160 | Q_ASSERT_X(operationContext != nullptr, "QGrpcHttp2ChannelPrivate::operationContextAsyncError", |
161 | "operationContext is null"); |
162 | QTimer::singleShot(interval: 0, receiver: operationContext, |
163 | slot: [operationContext, status]() { emit operationContext->finished(status); }); |
164 | } |
165 | |
166 | } // namespace |
167 | |
168 | struct ExpectedData |
169 | { |
170 | qsizetype expectedSize = 0; |
171 | QByteArray container; |
172 | |
173 | bool updateExpectedSize() |
174 | { |
175 | if (expectedSize == 0) { |
176 | if (container.size() < GrpcMessageSizeHeaderSize) |
177 | return false; |
178 | expectedSize = qFromBigEndian<quint32>(src: container.data() + 1) |
179 | + GrpcMessageSizeHeaderSize; |
180 | } |
181 | return true; |
182 | } |
183 | }; |
184 | |
185 | // The Http2Handler manages an individual RPC over the HTTP/2 channel. |
186 | // Each instance corresponds to an RPC initiated by the user. |
187 | class Http2Handler : public QObject |
188 | { |
189 | // Q_OBJECT macro is not needed and adds unwanted overhead. |
190 | |
191 | public: |
192 | enum State : uint8_t { Active, Cancelled, Finished }; |
193 | |
194 | explicit Http2Handler(const std::shared_ptr<QGrpcOperationContext> &operation, |
195 | QGrpcHttp2ChannelPrivate *parent, bool endStream); |
196 | ~Http2Handler() override; |
197 | |
198 | void sendInitialRequest(); |
199 | void attachStream(QHttp2Stream *stream_); |
200 | void processQueue(); |
201 | |
202 | [[nodiscard]] QGrpcOperationContext *operation() const; |
203 | [[nodiscard]] bool expired() const { return m_operation.expired(); } |
204 | |
205 | [[nodiscard]] bool isStreamClosedForSending() const |
206 | { |
207 | // If stream pointer is nullptr this means we never opened it and should collect |
208 | // the incoming messages in queue until the stream is opened or the error occurred. |
209 | return m_stream != nullptr |
210 | && (m_stream->state() == QHttp2Stream::State::HalfClosedLocal |
211 | || m_stream->state() == QHttp2Stream::State::Closed); |
212 | } |
213 | |
214 | // context slot handlers: |
215 | bool cancel(); |
216 | void writesDone(); |
217 | void writeMessage(QByteArrayView data); |
218 | void deadlineTimeout(); |
219 | |
220 | private: |
221 | void prepareInitialRequest(QGrpcOperationContext *operationContext, |
222 | QGrpcHttp2ChannelPrivate *channel); |
223 | |
224 | HPack::HttpHeader m_initialHeaders; |
225 | std::weak_ptr<QGrpcOperationContext> m_operation; |
226 | QQueue<QByteArray> m_queue; |
227 | QPointer<QHttp2Stream> m_stream; |
228 | ExpectedData m_expectedData; |
229 | State m_handlerState = Active; |
230 | const bool m_endStreamAtFirstData; |
231 | QTimer m_deadlineTimer; |
232 | |
233 | Q_DISABLE_COPY_MOVE(Http2Handler) |
234 | }; |
235 | |
236 | class QGrpcHttp2ChannelPrivate : public QObject |
237 | { |
238 | Q_OBJECT |
239 | public: |
240 | explicit QGrpcHttp2ChannelPrivate(const QUrl &uri, QGrpcHttp2Channel *q); |
241 | ~QGrpcHttp2ChannelPrivate() override = default; |
242 | |
243 | void processOperation(const std::shared_ptr<QGrpcOperationContext> &operationContext, |
244 | bool endStream = false); |
245 | |
246 | void deleteHandler(Http2Handler *handler); |
247 | [[nodiscard]] bool isLocalSocket() const |
248 | { |
249 | #if QT_CONFIG(localserver) |
250 | return m_isLocalSocket; |
251 | #else |
252 | return false; |
253 | #endif |
254 | } |
255 | [[nodiscard]] QByteArrayView contentType() const { return m_contentType; } |
256 | |
257 | std::shared_ptr<QAbstractProtobufSerializer> serializer; |
258 | QUrl hostUri; |
259 | QGrpcHttp2Channel *q_ptr = nullptr; |
260 | |
261 | private: |
262 | enum ConnectionState { Connecting = 0, Connected, Error }; |
263 | |
264 | template <typename T> |
265 | static void connectErrorHandler(T *socket, QGrpcOperationContext *operationContext) |
266 | { |
267 | QObject::connect(socket, &T::errorOccurred, operationContext, |
268 | [operationContextPtr = QPointer(operationContext)](auto error) { |
269 | emit operationContextPtr->finished(status: QGrpcStatus{ |
270 | StatusCode::Unavailable, |
271 | QGrpcHttp2ChannelPrivate::tr(s: "Network error occurred %1") |
272 | .arg(error) }); |
273 | }); |
274 | } |
275 | |
276 | void sendInitialRequest(Http2Handler *handler); |
277 | void createHttp2Connection(); |
278 | void handleSocketError(); |
279 | |
280 | template <typename T> |
281 | T *initSocket() |
282 | { |
283 | auto p = std::make_unique<T>(); |
284 | T *typedSocket = p.get(); |
285 | m_socket = std::move(p); |
286 | return typedSocket; |
287 | } |
288 | |
289 | std::unique_ptr<QIODevice> m_socket = nullptr; |
290 | QHttp2Connection *m_connection = nullptr; |
291 | QList<Http2Handler *> m_activeHandlers; |
292 | QList<Http2Handler *> m_pendingHandlers; |
293 | #if QT_CONFIG(localserver) |
294 | bool m_isLocalSocket = false; |
295 | #endif |
296 | QByteArray m_contentType; |
297 | ConnectionState m_state = Connecting; |
298 | std::function<void()> m_reconnectFunction; |
299 | |
300 | Q_DISABLE_COPY_MOVE(QGrpcHttp2ChannelPrivate) |
301 | }; |
302 | |
303 | /// |
304 | /// ## Http2Handler Implementations |
305 | /// |
306 | |
307 | Http2Handler::Http2Handler(const std::shared_ptr<QGrpcOperationContext> &operation, |
308 | QGrpcHttp2ChannelPrivate *parent, bool endStream) |
309 | : QObject(parent), m_operation(operation), m_endStreamAtFirstData(endStream) |
310 | { |
311 | auto *channelOpPtr = operation.get(); |
312 | QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::cancelRequested, context: this, |
313 | slot: &Http2Handler::cancel); |
314 | QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::writesDoneRequested, context: this, |
315 | slot: &Http2Handler::writesDone); |
316 | if (!m_endStreamAtFirstData) { |
317 | QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::writeMessageRequested, context: this, |
318 | slot: &Http2Handler::writeMessage); |
319 | } |
320 | QObject::connect(sender: channelOpPtr, signal: &QGrpcOperationContext::finished, context: &m_deadlineTimer, |
321 | slot: &QTimer::stop); |
322 | prepareInitialRequest(operationContext: channelOpPtr, channel: parent); |
323 | } |
324 | |
325 | Http2Handler::~Http2Handler() |
326 | { |
327 | if (m_stream) { |
328 | QHttp2Stream *streamPtr = m_stream.get(); |
329 | m_stream.clear(); |
330 | delete streamPtr; |
331 | } |
332 | } |
333 | |
334 | // Attaches the HTTP/2 stream and sets up the necessary connections and |
335 | // preconditions. The HTTP/2 connection is established, and the transport |
336 | // is now ready for communication. |
337 | void Http2Handler::attachStream(QHttp2Stream *stream_) |
338 | { |
339 | Q_ASSERT(m_stream == nullptr); |
340 | Q_ASSERT(stream_ != nullptr); |
341 | |
342 | auto *channelOpPtr = operation(); |
343 | m_stream = stream_; |
344 | |
345 | auto *parentChannel = qobject_cast<QGrpcHttp2ChannelPrivate *>(object: parent()); |
346 | QObject::connect(sender: m_stream.get(), signal: &QHttp2Stream::headersReceived, context: channelOpPtr, |
347 | slot: [channelOpPtr, parentChannel, this](const HPack::HttpHeader &headers, |
348 | bool endStream) { |
349 | auto md = channelOpPtr->serverMetadata(); |
350 | QtGrpc::StatusCode statusCode = StatusCode::Ok; |
351 | QString statusMessage; |
352 | for (const auto &header : headers) { |
353 | md.insert(key: header.name, value: header.value); |
354 | if (header.name == GrpcStatusHeader) { |
355 | statusCode = static_cast< |
356 | StatusCode>(QString::fromLatin1(ba: header.value).toShort()); |
357 | } else if (header.name == GrpcStatusMessageHeader) { |
358 | statusMessage = QString::fromUtf8(ba: header.value); |
359 | } |
360 | } |
361 | |
362 | channelOpPtr->setServerMetadata(std::move(md)); |
363 | |
364 | if (endStream) { |
365 | if (m_handlerState != Cancelled) { |
366 | emit channelOpPtr->finished( |
367 | status: QGrpcStatus{ statusCode,statusMessage }); |
368 | } |
369 | parentChannel->deleteHandler(handler: this); |
370 | } |
371 | }); |
372 | |
373 | Q_ASSERT(parentChannel != nullptr); |
374 | auto errorConnection = std::make_shared<QMetaObject::Connection>(); |
375 | *errorConnection = QObject::connect( |
376 | sender: m_stream.get(), signal: &QHttp2Stream::errorOccurred, context: parentChannel, |
377 | slot: [parentChannel, errorConnection, this](quint32 http2ErrorCode, const QString &errorString) { |
378 | if (!m_operation.expired()) { |
379 | auto channelOp = m_operation.lock(); |
380 | emit channelOp->finished(status: QGrpcStatus{ http2ErrorToStatusCode(http2Error: http2ErrorCode), |
381 | errorString }); |
382 | } |
383 | parentChannel->deleteHandler(handler: this); |
384 | QObject::disconnect(*errorConnection); |
385 | }); |
386 | |
387 | QObject::connect(sender: m_stream.get(), signal: &QHttp2Stream::dataReceived, context: channelOpPtr, |
388 | slot: [channelOpPtr, parentChannel, this](const QByteArray &data, bool endStream) { |
389 | if (m_handlerState != Cancelled) { |
390 | m_expectedData.container.append(a: data); |
391 | |
392 | if (!m_expectedData.updateExpectedSize()) |
393 | return; |
394 | |
395 | while (m_expectedData.container.size() |
396 | >= m_expectedData.expectedSize) { |
397 | qGrpcDebug() << "Full data received:"<< data.size() |
398 | << "dataContainer:"<< m_expectedData.container.size() |
399 | << "capacity:"<< m_expectedData.expectedSize; |
400 | emit channelOpPtr |
401 | ->messageReceived(data: m_expectedData.container |
402 | .mid(index: GrpcMessageSizeHeaderSize, |
403 | len: m_expectedData.expectedSize |
404 | - GrpcMessageSizeHeaderSize)); |
405 | m_expectedData.container.remove(index: 0, len: m_expectedData.expectedSize); |
406 | m_expectedData.expectedSize = 0; |
407 | if (!m_expectedData.updateExpectedSize()) |
408 | return; |
409 | } |
410 | if (endStream) { |
411 | m_handlerState = Finished; |
412 | emit channelOpPtr->finished(status: {}); |
413 | parentChannel->deleteHandler(handler: this); |
414 | } |
415 | } |
416 | }); |
417 | |
418 | QObject::connect(sender: m_stream.get(), signal: &QHttp2Stream::uploadFinished, context: this, |
419 | slot: &Http2Handler::processQueue); |
420 | |
421 | std::optional<std::chrono::milliseconds> deadline; |
422 | if (channelOpPtr->callOptions().deadlineTimeout()) |
423 | deadline = channelOpPtr->callOptions().deadlineTimeout(); |
424 | else if (parentChannel->q_ptr->channelOptions().deadlineTimeout()) |
425 | deadline = parentChannel->q_ptr->channelOptions().deadlineTimeout(); |
426 | if (deadline) { |
427 | // We have an active stream and a deadline. It's time to start the timer. |
428 | QObject::connect(sender: &m_deadlineTimer, signal: &QTimer::timeout, context: this, slot: &Http2Handler::deadlineTimeout); |
429 | m_deadlineTimer.start(value: *deadline); |
430 | } |
431 | } |
432 | |
433 | QGrpcOperationContext *Http2Handler::operation() const |
434 | { |
435 | Q_ASSERT(!m_operation.expired()); |
436 | |
437 | return m_operation.lock().get(); |
438 | } |
439 | |
440 | // Prepares the initial headers and enqueues the initial message. |
441 | // The headers are sent once the HTTP/2 connection is established. |
442 | void Http2Handler::prepareInitialRequest(QGrpcOperationContext *operationContext, |
443 | QGrpcHttp2ChannelPrivate *channel) |
444 | { |
445 | const auto &channelOptions = channel->q_ptr->channelOptions(); |
446 | QByteArray service{ operationContext->service().data(), operationContext->service().size() }; |
447 | QByteArray method{ operationContext->method().data(), operationContext->method().size() }; |
448 | m_initialHeaders = HPack::HttpHeader{ |
449 | { AuthorityHeader.toByteArray(), channel->hostUri.host().toLatin1() }, |
450 | { MethodHeader.toByteArray(), "POST"_ba}, |
451 | { PathHeader.toByteArray(), QByteArray('/' + service + '/' + method) }, |
452 | { SchemeHeader.toByteArray(), |
453 | channel->isLocalSocket() ? "http"_ba: channel->hostUri.scheme().toLatin1() }, |
454 | { ContentTypeHeader.toByteArray(), channel->contentType().toByteArray() }, |
455 | { GrpcServiceNameHeader.toByteArray(), { service } }, |
456 | { GrpcAcceptEncodingHeader.toByteArray(), "identity,deflate,gzip"_ba}, |
457 | { AcceptEncodingHeader.toByteArray(), "identity,gzip"_ba}, |
458 | { TEHeader.toByteArray(), "trailers"_ba}, |
459 | }; |
460 | |
461 | auto iterateMetadata = [this](const auto &metadata) { |
462 | for (const auto &[key, value] : metadata.asKeyValueRange()) { |
463 | const auto lowerKey = key.toLower(); |
464 | if (lowerKey == AuthorityHeader || lowerKey == MethodHeader || lowerKey == PathHeader |
465 | || lowerKey == SchemeHeader || lowerKey == ContentTypeHeader) { |
466 | continue; |
467 | } |
468 | m_initialHeaders.emplace_back(lowerKey, value); |
469 | } |
470 | }; |
471 | |
472 | iterateMetadata(channelOptions.metadata()); |
473 | iterateMetadata(operationContext->callOptions().metadata()); |
474 | |
475 | writeMessage(data: operationContext->argument()); |
476 | } |
477 | |
478 | // Slot to enqueue a writeMessage request, either from the initial message |
479 | // or from the user in client/bidirectional streaming RPCs. |
480 | void Http2Handler::writeMessage(QByteArrayView data) |
481 | { |
482 | if (m_handlerState != Active || isStreamClosedForSending()) { |
483 | qGrpcDebug("Attempt sending data to the ended stream"); |
484 | return; |
485 | } |
486 | |
487 | QByteArray msg(GrpcMessageSizeHeaderSize + data.size(), '\0'); |
488 | // Args must be 4-byte unsigned int to fit into 4-byte big endian |
489 | qToBigEndian(src: static_cast<quint32>(data.size()), dest: msg.data() + 1); |
490 | |
491 | // protect against nullptr data. |
492 | if (!data.isEmpty()) { |
493 | std::memcpy(dest: msg.begin() + GrpcMessageSizeHeaderSize, src: data.begin(), |
494 | n: static_cast<size_t>(data.size())); |
495 | } |
496 | |
497 | m_queue.enqueue(t: msg); |
498 | processQueue(); |
499 | } |
500 | |
501 | // Sends the initial headers and processes the message queue containing the |
502 | // initial message. At this point, the HTTP/2 connection is established, and |
503 | // the stream is attached. |
504 | void Http2Handler::sendInitialRequest() |
505 | { |
506 | Q_ASSERT(!m_initialHeaders.empty()); |
507 | Q_ASSERT(m_stream); |
508 | |
509 | if (!m_stream->sendHEADERS(headers: m_initialHeaders, endStream: false)) { |
510 | operationContextAsyncError(operationContext: operation(), |
511 | status: QGrpcStatus{ |
512 | StatusCode::Unavailable, |
513 | tr(s: "Unable to send initial headers to an HTTP/2 stream") }); |
514 | return; |
515 | } |
516 | m_initialHeaders.clear(); |
517 | processQueue(); |
518 | } |
519 | |
520 | // The core logic for sending the already serialized data through the HTTP/2 stream. |
521 | // This function is invoked either by the user via writeMessageRequested() or |
522 | // writesDoneRequested(), or it is continuously polled after the previous uploadFinished() |
523 | void Http2Handler::processQueue() |
524 | { |
525 | if (!m_stream) |
526 | return; |
527 | |
528 | if (m_stream->isUploadingDATA()) |
529 | return; |
530 | |
531 | if (m_queue.isEmpty()) |
532 | return; |
533 | |
534 | // Take ownership of the byte device. |
535 | auto *device = QNonContiguousByteDeviceFactory::create(byteArray: m_queue.dequeue()); |
536 | device->setParent(m_stream); |
537 | |
538 | m_stream->sendDATA(device, endStream: device->atEnd() || m_endStreamAtFirstData); |
539 | // Manage the lifetime through the uploadFinished signal (or this |
540 | // Http2Handler). Don't use QObject::deleteLater here as this function is |
541 | // expensive and blocking. Delete the byte device directly. |
542 | // This is fine in this context. |
543 | connect(sender: m_stream.get(), signal: &QHttp2Stream::uploadFinished, context: device, slot: [device] { delete device; }); |
544 | } |
545 | |
546 | bool Http2Handler::cancel() |
547 | { |
548 | if (m_handlerState != Active || !m_stream) |
549 | return false; |
550 | m_handlerState = Cancelled; |
551 | |
552 | // Client cancelled the stream before the deadline exceeded. |
553 | m_deadlineTimer.stop(); |
554 | |
555 | // Immediate cancellation by sending the RST_STREAM frame. |
556 | return m_stream->sendRST_STREAM(errorCode: Http2::Http2Error::CANCEL); |
557 | } |
558 | |
559 | void Http2Handler::writesDone() |
560 | { |
561 | if (m_handlerState != Active) |
562 | return; |
563 | |
564 | m_handlerState = Finished; |
565 | |
566 | // Stream is already (half)closed, skip sending the DATA frame with the end-of-stream flag. |
567 | if (isStreamClosedForSending()) |
568 | return; |
569 | |
570 | m_queue.enqueue(t: {}); |
571 | processQueue(); |
572 | } |
573 | |
574 | void Http2Handler::deadlineTimeout() |
575 | { |
576 | Q_ASSERT_X(m_stream, "onDeadlineTimeout", "stream is not available"); |
577 | |
578 | if (m_operation.expired()) { |
579 | qGrpcWarning("Operation expired on deadline timeout"); |
580 | return; |
581 | } |
582 | // cancel the stream by sending the RST_FRAME and report |
583 | // the status back to our user. |
584 | if (cancel()) { |
585 | emit m_operation.lock()->finished(status: { StatusCode::DeadlineExceeded, |
586 | "Deadline Exceeded"}); |
587 | } else { |
588 | qGrpcWarning("Cancellation failed on deadline timeout."); |
589 | } |
590 | } |
591 | |
592 | /// |
593 | /// ## QGrpcHttp2ChannelPrivate Implementations |
594 | /// |
595 | |
596 | QGrpcHttp2ChannelPrivate::QGrpcHttp2ChannelPrivate(const QUrl &uri, QGrpcHttp2Channel *q) |
597 | : hostUri(uri), q_ptr(q) |
598 | { |
599 | auto channelOptions = q_ptr->channelOptions(); |
600 | auto formatSuffix = channelOptions.serializationFormat().suffix(); |
601 | const QByteArray defaultContentType = DefaultContentType.toByteArray(); |
602 | const QByteArray contentTypeFromOptions = !formatSuffix.isEmpty() |
603 | ? defaultContentType + '+' + formatSuffix |
604 | : defaultContentType; |
605 | bool warnAboutFormatConflict = !formatSuffix.isEmpty(); |
606 | |
607 | const auto it = channelOptions.metadata().constFind(key: ContentTypeHeader.data()); |
608 | if (it != channelOptions.metadata().cend()) { |
609 | if (formatSuffix.isEmpty() && it.value() != DefaultContentType) { |
610 | if (it.value() == "application/grpc+json") { |
611 | channelOptions.setSerializationFormat(SerializationFormat::Json); |
612 | } else if (it.value() == "application/grpc+proto"|| it.value() == DefaultContentType) { |
613 | channelOptions.setSerializationFormat(SerializationFormat::Protobuf); |
614 | } else { |
615 | qGrpcWarning() << "Cannot choose the serializer for "<< ContentTypeHeader |
616 | << it.value() << ". Using protobuf format as the default one."; |
617 | channelOptions.setSerializationFormat(SerializationFormat::Default); |
618 | } |
619 | q_ptr->setChannelOptions(channelOptions); |
620 | } else if (it.value() != contentTypeFromOptions) { |
621 | warnAboutFormatConflict = true; |
622 | } else { |
623 | warnAboutFormatConflict = false; |
624 | } |
625 | } else { |
626 | warnAboutFormatConflict = false; |
627 | } |
628 | |
629 | if (formatSuffix == channelOptions.serializationFormat().suffix()) { // no change |
630 | m_contentType = contentTypeFromOptions; |
631 | } else { // format has changed, update content type |
632 | m_contentType = !channelOptions.serializationFormat().suffix().isEmpty() |
633 | ? defaultContentType + '+' + channelOptions.serializationFormat().suffix() |
634 | : defaultContentType; |
635 | } |
636 | |
637 | if (warnAboutFormatConflict) { |
638 | qGrpcWarning() |
639 | << "Manually specified serialization format '%1' doesn't match the %2 header value " |
640 | "'%3'"_L1.arg(args: QString::fromLatin1(ba: contentTypeFromOptions), |
641 | args: QString::fromLatin1(ba: ContentTypeHeader), |
642 | args: QString::fromLatin1(ba: it.value())); |
643 | } |
644 | |
645 | #if QT_CONFIG(localserver) |
646 | if (hostUri.scheme() == "unix"_L1) { |
647 | auto *localSocket = initSocket<QLocalSocket>(); |
648 | m_isLocalSocket = true; |
649 | |
650 | QObject::connect(sender: localSocket, signal: &QLocalSocket::connected, context: this, |
651 | slot: &QGrpcHttp2ChannelPrivate::createHttp2Connection); |
652 | QObject::connect(sender: localSocket, signal: &QLocalSocket::errorOccurred, context: this, |
653 | slot: [this](QLocalSocket::LocalSocketError error) { |
654 | qGrpcDebug() |
655 | << "Error occurred("<< error << "):" |
656 | << static_cast<QLocalSocket *>(m_socket.get())->errorString() |
657 | << hostUri; |
658 | handleSocketError(); |
659 | }); |
660 | m_reconnectFunction = [localSocket, this] { |
661 | localSocket->connectToServer(name: hostUri.host() + hostUri.path()); |
662 | }; |
663 | } else |
664 | #endif |
665 | #if QT_CONFIG(ssl) |
666 | if (hostUri.scheme() == "https"_L1|| channelOptions.sslConfiguration()) { |
667 | auto *sslSocket = initSocket<QSslSocket>(); |
668 | if (hostUri.port() < 0) { |
669 | hostUri.setPort(443); |
670 | } |
671 | |
672 | if (const auto userSslConfig = channelOptions.sslConfiguration(); userSslConfig) { |
673 | sslSocket->setSslConfiguration(*userSslConfig); |
674 | } else { |
675 | static const QByteArray h2NexProtocol = "h2"_ba; |
676 | auto defautlSslConfig = QSslConfiguration::defaultConfiguration(); |
677 | auto allowedNextProtocols = defautlSslConfig.allowedNextProtocols(); |
678 | if (!allowedNextProtocols.contains(t: h2NexProtocol)) |
679 | allowedNextProtocols.append(t: h2NexProtocol); |
680 | defautlSslConfig.setAllowedNextProtocols(allowedNextProtocols); |
681 | sslSocket->setSslConfiguration(defautlSslConfig); |
682 | } |
683 | |
684 | QObject::connect(sender: sslSocket, signal: &QSslSocket::encrypted, context: this, |
685 | slot: &QGrpcHttp2ChannelPrivate::createHttp2Connection); |
686 | QObject::connect(sender: sslSocket, signal: &QAbstractSocket::errorOccurred, context: this, |
687 | slot: [this](QAbstractSocket::SocketError error) { |
688 | qDebug() |
689 | << "Error occurred("<< error << "):" |
690 | << static_cast<QAbstractSocket *>(m_socket.get())->errorString() |
691 | << hostUri; |
692 | handleSocketError(); |
693 | }); |
694 | m_reconnectFunction = [sslSocket, this] { |
695 | sslSocket->connectToHostEncrypted(hostName: hostUri.host(), port: static_cast<quint16>(hostUri.port())); |
696 | }; |
697 | } else |
698 | #endif |
699 | { |
700 | if (hostUri.scheme() != "http"_L1) { |
701 | qGrpcWarning() << "Unsupported transport protocol scheme '"<< hostUri.scheme() |
702 | << "'. Fall back to 'http'."; |
703 | } |
704 | |
705 | auto *httpSocket = initSocket<QTcpSocket>(); |
706 | if (hostUri.port() < 0) { |
707 | hostUri.setPort(80); |
708 | } |
709 | |
710 | QObject::connect(sender: httpSocket, signal: &QAbstractSocket::connected, context: this, |
711 | slot: &QGrpcHttp2ChannelPrivate::createHttp2Connection); |
712 | QObject::connect(sender: httpSocket, signal: &QAbstractSocket::errorOccurred, context: this, |
713 | slot: [this](QAbstractSocket::SocketError error) { |
714 | qGrpcDebug() |
715 | << "Error occurred("<< error << "):" |
716 | << static_cast<QAbstractSocket *>(m_socket.get())->errorString() |
717 | << hostUri; |
718 | handleSocketError(); |
719 | }); |
720 | m_reconnectFunction = [httpSocket, this] { |
721 | httpSocket->connectToHost(hostName: hostUri.host(), port: static_cast<quint16>(hostUri.port())); |
722 | }; |
723 | } |
724 | m_reconnectFunction(); |
725 | } |
726 | |
727 | void QGrpcHttp2ChannelPrivate::processOperation(const std::shared_ptr<QGrpcOperationContext> |
728 | &operationContext, |
729 | bool endStream) |
730 | { |
731 | auto *operationContextPtr = operationContext.get(); |
732 | Q_ASSERT_X(operationContextPtr != nullptr, "QGrpcHttp2ChannelPrivate::processOperation", |
733 | "operation context is nullptr."); |
734 | |
735 | if (!m_socket->isWritable()) { |
736 | operationContextAsyncError(operationContext: operationContextPtr, |
737 | status: QGrpcStatus{ StatusCode::Unavailable, |
738 | m_socket->errorString() }); |
739 | return; |
740 | } |
741 | |
742 | #if QT_CONFIG(localserver) |
743 | if (m_isLocalSocket) { |
744 | connectErrorHandler<QLocalSocket>(socket: static_cast<QLocalSocket *>(m_socket.get()), |
745 | operationContext: operationContextPtr); |
746 | } else |
747 | #endif |
748 | { |
749 | connectErrorHandler<QAbstractSocket>(socket: static_cast<QAbstractSocket *>(m_socket.get()), |
750 | operationContext: operationContextPtr); |
751 | } |
752 | |
753 | auto *handler = new Http2Handler(operationContext, this, endStream); |
754 | if (m_connection == nullptr) { |
755 | m_pendingHandlers.push_back(t: handler); |
756 | } else { |
757 | sendInitialRequest(handler); |
758 | m_activeHandlers.push_back(t: handler); |
759 | } |
760 | |
761 | if (m_state == ConnectionState::Error) { |
762 | Q_ASSERT_X(m_reconnectFunction, "QGrpcHttp2ChannelPrivate::processOperation", |
763 | "Socket reconnection function is not defined."); |
764 | m_reconnectFunction(); |
765 | m_state = ConnectionState::Connecting; |
766 | } |
767 | } |
768 | |
769 | void QGrpcHttp2ChannelPrivate::createHttp2Connection() |
770 | { |
771 | Q_ASSERT_X(m_connection == nullptr, "QGrpcHttp2ChannelPrivate::createHttp2Connection", |
772 | "Attempt to create the HTTP/2 connection, but it already exists. This situation is " |
773 | "exceptional."); |
774 | m_connection = QHttp2Connection::createDirectConnection(socket: m_socket.get(), config: {}); |
775 | |
776 | if (m_connection) { |
777 | QObject::connect(sender: m_socket.get(), signal: &QAbstractSocket::readyRead, context: m_connection, |
778 | slot: &QHttp2Connection::handleReadyRead); |
779 | m_state = ConnectionState::Connected; |
780 | } |
781 | |
782 | for (const auto &handler : m_pendingHandlers) { |
783 | if (handler->expired()) { |
784 | delete handler; |
785 | continue; |
786 | } |
787 | sendInitialRequest(handler); |
788 | } |
789 | m_activeHandlers.append(l: m_pendingHandlers); |
790 | m_pendingHandlers.clear(); |
791 | } |
792 | |
793 | void QGrpcHttp2ChannelPrivate::handleSocketError() |
794 | { |
795 | qDeleteAll(c: m_activeHandlers); |
796 | m_activeHandlers.clear(); |
797 | qDeleteAll(c: m_pendingHandlers); |
798 | m_pendingHandlers.clear(); |
799 | delete m_connection; |
800 | m_connection = nullptr; |
801 | m_state = ConnectionState::Error; |
802 | } |
803 | |
804 | void QGrpcHttp2ChannelPrivate::sendInitialRequest(Http2Handler *handler) |
805 | { |
806 | Q_ASSERT(handler != nullptr); |
807 | auto *channelOpPtr = handler->operation(); |
808 | if (!m_connection) { |
809 | operationContextAsyncError(operationContext: channelOpPtr, |
810 | status: QGrpcStatus{ |
811 | StatusCode::Unavailable, |
812 | tr(s: "Unable to establish an HTTP/2 connection") }); |
813 | return; |
814 | } |
815 | |
816 | const auto streamAttempt = m_connection->createStream(); |
817 | if (!streamAttempt.ok()) { |
818 | operationContextAsyncError(operationContext: channelOpPtr, |
819 | status: QGrpcStatus{ |
820 | StatusCode::Unavailable, |
821 | tr(s: "Unable to create an HTTP/2 stream (%1)") |
822 | .arg(a: QDebug::toString(object: streamAttempt.error())) }); |
823 | return; |
824 | } |
825 | handler->attachStream(stream_: streamAttempt.unwrap()); |
826 | handler->sendInitialRequest(); |
827 | } |
828 | |
829 | void QGrpcHttp2ChannelPrivate::deleteHandler(Http2Handler *handler) |
830 | { |
831 | const auto it = std::find(first: m_activeHandlers.constBegin(), last: m_activeHandlers.constEnd(), val: handler); |
832 | if (it == m_activeHandlers.constEnd()) |
833 | return; |
834 | handler->deleteLater(); |
835 | m_activeHandlers.erase(pos: it); |
836 | } |
837 | |
838 | /// |
839 | /// ## QGrpcHttp2Channel Implementations |
840 | /// |
841 | |
842 | /*! |
843 | Constructs QGrpcHttp2Channel with \a hostUri. |
844 | */ |
845 | QGrpcHttp2Channel::QGrpcHttp2Channel(const QUrl &hostUri) |
846 | : d_ptr(std::make_unique<QGrpcHttp2ChannelPrivate>(args: hostUri, args: this)) |
847 | { |
848 | } |
849 | |
850 | /*! |
851 | Constructs QGrpcHttp2Channel with \a hostUri and \a options. |
852 | */ |
853 | QGrpcHttp2Channel::QGrpcHttp2Channel(const QUrl &hostUri, const QGrpcChannelOptions &options) |
854 | : QAbstractGrpcChannel(options), |
855 | d_ptr(std::make_unique<QGrpcHttp2ChannelPrivate>(args: hostUri, args: this)) |
856 | { |
857 | } |
858 | |
859 | /*! |
860 | Destroys the QGrpcHttp2Channel object. |
861 | */ |
862 | QGrpcHttp2Channel::~QGrpcHttp2Channel() = default; |
863 | |
864 | /*! |
865 | Returns the host URI for this channel. |
866 | */ |
867 | QUrl QGrpcHttp2Channel::hostUri() const |
868 | { |
869 | return d_ptr->hostUri; |
870 | } |
871 | |
872 | /*! |
873 | \internal |
874 | Implementation of unary gRPC call based on \l QNetworkAccessManager. |
875 | */ |
876 | void QGrpcHttp2Channel::call(std::shared_ptr<QGrpcOperationContext> operationContext) |
877 | { |
878 | d_ptr->processOperation(operationContext, endStream: true); |
879 | } |
880 | |
881 | /*! |
882 | \internal |
883 | Implementation of server-side gRPC stream based on \l QNetworkAccessManager. |
884 | */ |
885 | void QGrpcHttp2Channel::serverStream(std::shared_ptr<QGrpcOperationContext> operationContext) |
886 | { |
887 | d_ptr->processOperation(operationContext, endStream: true); |
888 | } |
889 | |
890 | /*! |
891 | \internal |
892 | Implementation of client-side gRPC stream based on \l QNetworkAccessManager. |
893 | */ |
894 | void QGrpcHttp2Channel::clientStream(std::shared_ptr<QGrpcOperationContext> operationContext) |
895 | { |
896 | d_ptr->processOperation(operationContext); |
897 | } |
898 | |
899 | /*! |
900 | \internal |
901 | Implementation of bidirectional gRPC stream based on \l QNetworkAccessManager. |
902 | */ |
903 | void QGrpcHttp2Channel::bidiStream(std::shared_ptr<QGrpcOperationContext> operationContext) |
904 | { |
905 | d_ptr->processOperation(operationContext); |
906 | } |
907 | |
908 | /*! |
909 | Returns the newly created QProtobufSerializer shared pointer. |
910 | */ |
911 | std::shared_ptr<QAbstractProtobufSerializer> QGrpcHttp2Channel::serializer() const |
912 | { |
913 | return channelOptions().serializationFormat().serializer(); |
914 | } |
915 | |
916 | QT_END_NAMESPACE |
917 | |
918 | #include "qgrpchttp2channel.moc" |
919 |
Definitions
- AuthorityHeader
- MethodHeader
- PathHeader
- SchemeHeader
- ContentTypeHeader
- AcceptEncodingHeader
- TEHeader
- GrpcServiceNameHeader
- GrpcAcceptEncodingHeader
- GrpcStatusHeader
- GrpcStatusMessageHeader
- GrpcMessageSizeHeaderSize
- DefaultContentType
- http2ErrorToStatusCode
- operationContextAsyncError
- ExpectedData
- updateExpectedSize
- Http2Handler
- State
- expired
- isStreamClosedForSending
- Http2Handler
- QGrpcHttp2ChannelPrivate
- ~QGrpcHttp2ChannelPrivate
- isLocalSocket
- contentType
- ConnectionState
- connectErrorHandler
- initSocket
- QGrpcHttp2ChannelPrivate
- Http2Handler
- ~Http2Handler
- attachStream
- operation
- prepareInitialRequest
- writeMessage
- sendInitialRequest
- processQueue
- cancel
- writesDone
- deadlineTimeout
- QGrpcHttp2ChannelPrivate
- processOperation
- createHttp2Connection
- handleSocketError
- sendInitialRequest
- deleteHandler
- QGrpcHttp2Channel
- QGrpcHttp2Channel
- ~QGrpcHttp2Channel
- hostUri
- call
- serverStream
- clientStream
- bidiStream
Start learning QML with our Intro Training
Find out more