1 | // Copyright (C) 2022 The Qt Company Ltd. |
2 | // Copyright (C) 2019 Alexey Edelev <semlanik@gmail.com> |
3 | // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only |
4 | |
5 | #include <QtCore/QThread> |
6 | #include <QtCore/QTimer> |
7 | #include <QtGrpc/private/qabstractgrpcchannel_p.h> |
8 | #include <QtGrpc/qgrpccallreply.h> |
9 | #include <QtGrpc/qgrpcstream.h> |
10 | #include <QtProtobuf/qprotobufserializer.h> |
11 | |
12 | #include <qtgrpcglobal_p.h> |
13 | |
14 | #include <private/qobject_p.h> |
15 | |
16 | #include "qabstractgrpcclient.h" |
17 | |
18 | QT_BEGIN_NAMESPACE |
19 | |
20 | using namespace Qt::StringLiterals; |
21 | |
22 | namespace { |
23 | static QString threadSafetyWarning(QLatin1StringView methodName) |
24 | { |
25 | return "%1 is called from a different thread.\n" |
26 | "Qt GRPC doesn't guarantee thread safety on the channel level.\n" |
27 | "You have to be confident that channel routines are working in " |
28 | "the same thread as QAbstractGrpcClient."_L1 .arg(args&: methodName); |
29 | } |
30 | } // namespace |
31 | |
32 | /*! |
33 | \class QAbstractGrpcClient |
34 | \inmodule QtGrpc |
35 | \brief The QAbstractGrpcClient class is bridge between gRPC clients |
36 | and channels. |
37 | |
38 | QAbstractGrpcClient provides a set of functions for client classes |
39 | generated out of protobuf services. |
40 | QAbstractGrpcClient enforces thread safety for startStream() and call() methods |
41 | of generated clients. |
42 | The methods QAbstractGrpcClient::call() and QAbstractGrpcClient::startStream() |
43 | should only be called by the generated client classes. |
44 | */ |
45 | |
46 | /*! |
47 | \fn template <typename ParamType> QGrpcStatus QAbstractGrpcClient::call(QLatin1StringView method, |
48 | const QProtobufMessage &arg, const QGrpcCallOptions &options); |
49 | |
50 | Synchronously calls the given \a method of this service client, |
51 | with argument \a arg. |
52 | Uses \a options argument to set additional parameter for the call. |
53 | */ |
54 | |
55 | /*! |
56 | \fn template <typename ParamType, typename ReturnType> QGrpcStatus QAbstractGrpcClient::call(QLatin1StringView method, |
57 | const QProtobufMessage &arg, ReturnType &ret, const QGrpcCallOptions &options); |
58 | |
59 | Synchronously calls the given \a method of this service client, |
60 | with argument \a arg and fills \a ret with gRPC reply. |
61 | Uses \a options argument to set additional parameter for the call. |
62 | */ |
63 | |
64 | /*! |
65 | \fn template <typename ParamType> QSharedPointer<QGrpcStream> QAbstractGrpcClient::startStream(QLatin1StringView method, |
66 | const QProtobufMessage &arg, const QGrpcCallOptions &options); |
67 | |
68 | Streams messages from the server stream \a method with the message |
69 | argument \a arg to the attached channel. |
70 | Uses \a options argument to set additional parameter for the call. |
71 | */ |
72 | |
73 | class QAbstractGrpcClientPrivate : public QObjectPrivate |
74 | { |
75 | Q_DECLARE_PUBLIC(QAbstractGrpcClient) |
76 | public: |
77 | QAbstractGrpcClientPrivate(QLatin1StringView service) : service(service.data(), service.size()) |
78 | { |
79 | } |
80 | |
81 | QGrpcStatus checkThread(QLatin1StringView warningPreamble); |
82 | |
83 | std::shared_ptr<QAbstractGrpcChannel> channel; |
84 | const std::string service; |
85 | std::vector<std::shared_ptr<QGrpcStream>> activeStreams; |
86 | }; |
87 | |
88 | QGrpcStatus QAbstractGrpcClientPrivate::checkThread(QLatin1StringView warningPreamble) |
89 | { |
90 | Q_Q(QAbstractGrpcClient); |
91 | |
92 | QGrpcStatus status; |
93 | if (q->thread() != QThread::currentThread()) { |
94 | status = { QGrpcStatus::Unknown, threadSafetyWarning(methodName: warningPreamble) }; |
95 | qGrpcCritical() << status.message(); |
96 | emit q->errorOccurred(status); |
97 | } |
98 | return status; |
99 | } |
100 | |
101 | QAbstractGrpcClient::QAbstractGrpcClient(QLatin1StringView service, QObject *parent) |
102 | : QObject(*new QAbstractGrpcClientPrivate(service), parent) |
103 | { |
104 | } |
105 | |
106 | QAbstractGrpcClient::~QAbstractGrpcClient() = default; |
107 | |
108 | /*! |
109 | Attaches \a channel to client as transport layer for gRPC. |
110 | |
111 | Parameters and return values will be serialized to the channel |
112 | in a format it supports. |
113 | |
114 | \note \b Warning: Qt GRPC doesn't guarantee thread safety on the channel level. |
115 | You have to invoke the channel-related functions on the same thread as |
116 | QAbstractGrpcClient. |
117 | */ |
118 | void QAbstractGrpcClient::attachChannel(const std::shared_ptr<QAbstractGrpcChannel> &channel) |
119 | { |
120 | if (channel->dPtr->threadId != QThread::currentThreadId()) { |
121 | const QString status = threadSafetyWarning(methodName: "QAbstractGrpcClient::attachChannel"_L1 ); |
122 | qGrpcCritical() << status; |
123 | emit errorOccurred(status: { QGrpcStatus::Unknown, status }); |
124 | return; |
125 | } |
126 | Q_D(QAbstractGrpcClient); |
127 | for (auto &stream : d->activeStreams) |
128 | stream->abort(); |
129 | |
130 | d->channel = channel; |
131 | } |
132 | |
133 | QGrpcStatus QAbstractGrpcClient::call(QLatin1StringView method, QByteArrayView arg, QByteArray &ret, |
134 | const QGrpcCallOptions &options) |
135 | { |
136 | Q_D(QAbstractGrpcClient); |
137 | |
138 | QGrpcStatus callStatus = d->checkThread(warningPreamble: "QAbstractGrpcClient::call"_L1 ); |
139 | if (callStatus != QGrpcStatus::Ok) |
140 | return callStatus; |
141 | |
142 | callStatus = d->channel |
143 | ? d->channel->call(method, service: QLatin1StringView(d->service), args: arg, ret, options) |
144 | : QGrpcStatus{ QGrpcStatus::Unknown, "No channel(s) attached."_L1 }; |
145 | |
146 | if (callStatus != QGrpcStatus::Ok) |
147 | emit errorOccurred(status: callStatus); |
148 | |
149 | return callStatus; |
150 | } |
151 | |
152 | std::shared_ptr<QGrpcCallReply> QAbstractGrpcClient::call(QLatin1StringView method, |
153 | QByteArrayView arg, |
154 | const QGrpcCallOptions &options) |
155 | { |
156 | std::shared_ptr<QGrpcCallReply> reply; |
157 | Q_D(QAbstractGrpcClient); |
158 | if (d->checkThread(warningPreamble: "QAbstractGrpcClient::call"_L1 ) != QGrpcStatus::Ok) |
159 | return reply; |
160 | |
161 | if (d->channel) { |
162 | reply = d->channel->call(method, service: QLatin1StringView(d->service), args: arg, options); |
163 | |
164 | auto errorConnection = std::make_shared<QMetaObject::Connection>(); |
165 | *errorConnection = connect(sender: reply.get(), signal: &QGrpcCallReply::errorOccurred, context: this, |
166 | slot: [this](const QGrpcStatus &status) { |
167 | emit errorOccurred(status); |
168 | }); |
169 | } else { |
170 | emit errorOccurred(status: { QGrpcStatus::Unknown, "No channel(s) attached."_L1 }); |
171 | } |
172 | |
173 | return reply; |
174 | } |
175 | |
176 | std::shared_ptr<QGrpcStream> QAbstractGrpcClient::startStream(QLatin1StringView method, |
177 | QByteArrayView arg, |
178 | const QGrpcCallOptions &options) |
179 | { |
180 | Q_D(QAbstractGrpcClient); |
181 | |
182 | std::shared_ptr<QGrpcStream> grpcStream; |
183 | if (d->checkThread(warningPreamble: "QAbstractGrpcClient::startStream"_L1 ) != QGrpcStatus::Ok) |
184 | return grpcStream; |
185 | |
186 | if (d->channel) { |
187 | grpcStream = d->channel->startStream(method, service: QLatin1StringView(d->service), arg, options); |
188 | |
189 | auto errorConnection = std::make_shared<QMetaObject::Connection>(); |
190 | *errorConnection = connect(sender: grpcStream.get(), signal: &QGrpcStream::errorOccurred, context: this, |
191 | slot: [this, grpcStream](const QGrpcStatus &status) { |
192 | Q_D(QAbstractGrpcClient); |
193 | qGrpcWarning() |
194 | << grpcStream->method() << "call" << d->service |
195 | << "stream error: " << status.message(); |
196 | errorOccurred(status); |
197 | }); |
198 | |
199 | auto finishedConnection = std::make_shared<QMetaObject::Connection>(); |
200 | *finishedConnection = connect( |
201 | sender: grpcStream.get(), signal: &QGrpcStream::finished, context: this, |
202 | slot: [this, grpcStream, errorConnection, finishedConnection]() mutable { |
203 | Q_D(QAbstractGrpcClient); |
204 | qGrpcWarning() |
205 | << grpcStream->method() << "call" << d->service << "stream finished." ; |
206 | |
207 | auto it = |
208 | std::find(first: d->activeStreams.begin(), last: d->activeStreams.end(), val: grpcStream); |
209 | if (it != d->activeStreams.end()) |
210 | d->activeStreams.erase(position: it); |
211 | |
212 | QObject::disconnect(*errorConnection); |
213 | QObject::disconnect(*finishedConnection); |
214 | grpcStream.reset(); |
215 | }); |
216 | |
217 | d->activeStreams.push_back(x: grpcStream); |
218 | } else { |
219 | emit errorOccurred(status: { QGrpcStatus::Unknown, "No channel(s) attached."_L1 }); |
220 | } |
221 | return grpcStream; |
222 | } |
223 | |
224 | /*! |
225 | Serializer provides assigned to client serializer. |
226 | Returns pointer to serializerowned by QProtobufSerializerRegistry. |
227 | */ |
228 | std::shared_ptr<QAbstractProtobufSerializer> QAbstractGrpcClient::serializer() const |
229 | { |
230 | Q_D(const QAbstractGrpcClient); |
231 | if (const auto &c = d->channel) |
232 | return c->serializer(); |
233 | return nullptr; |
234 | } |
235 | |
236 | QGrpcStatus QAbstractGrpcClient::handleDeserializationError( |
237 | const QAbstractProtobufSerializer::DeserializationError &err) |
238 | { |
239 | QGrpcStatus status{ QGrpcStatus::Ok }; |
240 | switch (err) { |
241 | case QAbstractProtobufSerializer::InvalidHeaderError: { |
242 | const QLatin1StringView errStr("Response deserialization failed: invalid field found." ); |
243 | status = { QGrpcStatus::InvalidArgument, errStr }; |
244 | qGrpcCritical() << errStr; |
245 | emit errorOccurred(status); |
246 | } break; |
247 | case QAbstractProtobufSerializer::NoDeserializerError: { |
248 | const QLatin1StringView errStr("No deserializer was found for a given type." ); |
249 | status = { QGrpcStatus::InvalidArgument, errStr }; |
250 | qGrpcCritical() << errStr; |
251 | emit errorOccurred(status); |
252 | } break; |
253 | case QAbstractProtobufSerializer::UnexpectedEndOfStreamError: { |
254 | const QLatin1StringView errStr("Invalid size of received buffer." ); |
255 | status = { QGrpcStatus::OutOfRange, errStr }; |
256 | qGrpcCritical() << errStr; |
257 | emit errorOccurred(status); |
258 | } break; |
259 | case QAbstractProtobufSerializer::NoError: |
260 | Q_FALLTHROUGH(); |
261 | default: |
262 | const QLatin1StringView errStr("Deserializing failed, but no error was set." ); |
263 | status = { QGrpcStatus::InvalidArgument, errStr }; |
264 | qGrpcCritical() << errStr; |
265 | emit errorOccurred(status); |
266 | } |
267 | return status; |
268 | } |
269 | |
270 | QT_END_NAMESPACE |
271 | |
272 | #include "moc_qabstractgrpcclient.cpp" |
273 | |