1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#ifndef GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H
20#define GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H
21
22// IWYU pragma: private
23
24#include <array>
25#include <functional>
26
27#include <grpc/impl/codegen/grpc_types.h>
28#include <grpcpp/impl/call_op_set_interface.h>
29#include <grpcpp/impl/codegen/call.h>
30#include <grpcpp/impl/codegen/client_interceptor.h>
31#include <grpcpp/impl/codegen/intercepted_channel.h>
32#include <grpcpp/impl/codegen/server_interceptor.h>
33
34namespace grpc {
35namespace internal {
36
37class InterceptorBatchMethodsImpl
38 : public experimental::InterceptorBatchMethods {
39 public:
40 InterceptorBatchMethodsImpl() {
41 for (auto i = static_cast<experimental::InterceptionHookPoints>(0);
42 i < experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS;
43 i = static_cast<experimental::InterceptionHookPoints>(
44 static_cast<size_t>(i) + 1)) {
45 hooks_[static_cast<size_t>(i)] = false;
46 }
47 }
48
49 ~InterceptorBatchMethodsImpl() override {}
50
51 bool QueryInterceptionHookPoint(
52 experimental::InterceptionHookPoints type) override {
53 return hooks_[static_cast<size_t>(type)];
54 }
55
56 void Proceed() override {
57 if (call_->client_rpc_info() != nullptr) {
58 return ProceedClient();
59 }
60 GPR_CODEGEN_ASSERT(call_->server_rpc_info() != nullptr);
61 ProceedServer();
62 }
63
64 void Hijack() override {
65 // Only the client can hijack when sending down initial metadata
66 GPR_CODEGEN_ASSERT(!reverse_ && ops_ != nullptr &&
67 call_->client_rpc_info() != nullptr);
68 // It is illegal to call Hijack twice
69 GPR_CODEGEN_ASSERT(!ran_hijacking_interceptor_);
70 auto* rpc_info = call_->client_rpc_info();
71 rpc_info->hijacked_ = true;
72 rpc_info->hijacked_interceptor_ = current_interceptor_index_;
73 ClearHookPoints();
74 ops_->SetHijackingState();
75 ran_hijacking_interceptor_ = true;
76 rpc_info->RunInterceptor(interceptor_methods: this, pos: current_interceptor_index_);
77 }
78
79 void AddInterceptionHookPoint(experimental::InterceptionHookPoints type) {
80 hooks_[static_cast<size_t>(type)] = true;
81 }
82
83 ByteBuffer* GetSerializedSendMessage() override {
84 GPR_CODEGEN_ASSERT(orig_send_message_ != nullptr);
85 if (*orig_send_message_ != nullptr) {
86 GPR_CODEGEN_ASSERT(serializer_(*orig_send_message_).ok());
87 *orig_send_message_ = nullptr;
88 }
89 return send_message_;
90 }
91
92 const void* GetSendMessage() override {
93 GPR_CODEGEN_ASSERT(orig_send_message_ != nullptr);
94 return *orig_send_message_;
95 }
96
97 void ModifySendMessage(const void* message) override {
98 GPR_CODEGEN_ASSERT(orig_send_message_ != nullptr);
99 *orig_send_message_ = message;
100 }
101
102 bool GetSendMessageStatus() override { return !*fail_send_message_; }
103
104 std::multimap<std::string, std::string>* GetSendInitialMetadata() override {
105 return send_initial_metadata_;
106 }
107
108 Status GetSendStatus() override {
109 return Status(static_cast<StatusCode>(*code_), *error_message_,
110 *error_details_);
111 }
112
113 void ModifySendStatus(const Status& status) override {
114 *code_ = static_cast<grpc_status_code>(status.error_code());
115 *error_details_ = status.error_details();
116 *error_message_ = status.error_message();
117 }
118
119 std::multimap<std::string, std::string>* GetSendTrailingMetadata() override {
120 return send_trailing_metadata_;
121 }
122
123 void* GetRecvMessage() override { return recv_message_; }
124
125 std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata()
126 override {
127 return recv_initial_metadata_->map();
128 }
129
130 Status* GetRecvStatus() override { return recv_status_; }
131
132 void FailHijackedSendMessage() override {
133 GPR_CODEGEN_ASSERT(hooks_[static_cast<size_t>(
134 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)]);
135 *fail_send_message_ = true;
136 }
137
138 std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata()
139 override {
140 return recv_trailing_metadata_->map();
141 }
142
143 void SetSendMessage(ByteBuffer* buf, const void** msg,
144 bool* fail_send_message,
145 std::function<Status(const void*)> serializer) {
146 send_message_ = buf;
147 orig_send_message_ = msg;
148 fail_send_message_ = fail_send_message;
149 serializer_ = serializer;
150 }
151
152 void SetSendInitialMetadata(
153 std::multimap<std::string, std::string>* metadata) {
154 send_initial_metadata_ = metadata;
155 }
156
157 void SetSendStatus(grpc_status_code* code, std::string* error_details,
158 std::string* error_message) {
159 code_ = code;
160 error_details_ = error_details;
161 error_message_ = error_message;
162 }
163
164 void SetSendTrailingMetadata(
165 std::multimap<std::string, std::string>* metadata) {
166 send_trailing_metadata_ = metadata;
167 }
168
169 void SetRecvMessage(void* message, bool* hijacked_recv_message_failed) {
170 recv_message_ = message;
171 hijacked_recv_message_failed_ = hijacked_recv_message_failed;
172 }
173
174 void SetRecvInitialMetadata(MetadataMap* map) {
175 recv_initial_metadata_ = map;
176 }
177
178 void SetRecvStatus(Status* status) { recv_status_ = status; }
179
180 void SetRecvTrailingMetadata(MetadataMap* map) {
181 recv_trailing_metadata_ = map;
182 }
183
184 std::unique_ptr<ChannelInterface> GetInterceptedChannel() override {
185 auto* info = call_->client_rpc_info();
186 if (info == nullptr) {
187 return std::unique_ptr<ChannelInterface>(nullptr);
188 }
189 // The intercepted channel starts from the interceptor just after the
190 // current interceptor
191 return std::unique_ptr<ChannelInterface>(new InterceptedChannel(
192 info->channel(), current_interceptor_index_ + 1));
193 }
194
195 void FailHijackedRecvMessage() override {
196 GPR_CODEGEN_ASSERT(hooks_[static_cast<size_t>(
197 experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)]);
198 *hijacked_recv_message_failed_ = true;
199 }
200
201 // Clears all state
202 void ClearState() {
203 reverse_ = false;
204 ran_hijacking_interceptor_ = false;
205 ClearHookPoints();
206 }
207
208 // Prepares for Post_recv operations
209 void SetReverse() {
210 reverse_ = true;
211 ran_hijacking_interceptor_ = false;
212 ClearHookPoints();
213 }
214
215 // This needs to be set before interceptors are run
216 void SetCall(Call* call) { call_ = call; }
217
218 // This needs to be set before interceptors are run using RunInterceptors().
219 // Alternatively, RunInterceptors(std::function<void(void)> f) can be used.
220 void SetCallOpSetInterface(CallOpSetInterface* ops) { ops_ = ops; }
221
222 // SetCall should have been called before this.
223 // Returns true if the interceptors list is empty
224 bool InterceptorsListEmpty() {
225 auto* client_rpc_info = call_->client_rpc_info();
226 if (client_rpc_info != nullptr) {
227 return client_rpc_info->interceptors_.empty();
228 }
229
230 auto* server_rpc_info = call_->server_rpc_info();
231 return server_rpc_info == nullptr || server_rpc_info->interceptors_.empty();
232 }
233
234 // This should be used only by subclasses of CallOpSetInterface. SetCall and
235 // SetCallOpSetInterface should have been called before this. After all the
236 // interceptors are done running, either ContinueFillOpsAfterInterception or
237 // ContinueFinalizeOpsAfterInterception will be called. Note that neither of
238 // them is invoked if there were no interceptors registered.
239 bool RunInterceptors() {
240 GPR_CODEGEN_ASSERT(ops_);
241 auto* client_rpc_info = call_->client_rpc_info();
242 if (client_rpc_info != nullptr) {
243 if (client_rpc_info->interceptors_.empty()) {
244 return true;
245 } else {
246 RunClientInterceptors();
247 return false;
248 }
249 }
250
251 auto* server_rpc_info = call_->server_rpc_info();
252 if (server_rpc_info == nullptr || server_rpc_info->interceptors_.empty()) {
253 return true;
254 }
255 RunServerInterceptors();
256 return false;
257 }
258
259 // Returns true if no interceptors are run. Returns false otherwise if there
260 // are interceptors registered. After the interceptors are done running \a f
261 // will be invoked. This is to be used only by BaseAsyncRequest and
262 // SyncRequest.
263 bool RunInterceptors(std::function<void(void)> f) {
264 // This is used only by the server for initial call request
265 GPR_CODEGEN_ASSERT(reverse_ == true);
266 GPR_CODEGEN_ASSERT(call_->client_rpc_info() == nullptr);
267 auto* server_rpc_info = call_->server_rpc_info();
268 if (server_rpc_info == nullptr || server_rpc_info->interceptors_.empty()) {
269 return true;
270 }
271 callback_ = std::move(f);
272 RunServerInterceptors();
273 return false;
274 }
275
276 private:
277 void RunClientInterceptors() {
278 auto* rpc_info = call_->client_rpc_info();
279 if (!reverse_) {
280 current_interceptor_index_ = 0;
281 } else {
282 if (rpc_info->hijacked_) {
283 current_interceptor_index_ = rpc_info->hijacked_interceptor_;
284 } else {
285 current_interceptor_index_ = rpc_info->interceptors_.size() - 1;
286 }
287 }
288 rpc_info->RunInterceptor(interceptor_methods: this, pos: current_interceptor_index_);
289 }
290
291 void RunServerInterceptors() {
292 auto* rpc_info = call_->server_rpc_info();
293 if (!reverse_) {
294 current_interceptor_index_ = 0;
295 } else {
296 current_interceptor_index_ = rpc_info->interceptors_.size() - 1;
297 }
298 rpc_info->RunInterceptor(interceptor_methods: this, pos: current_interceptor_index_);
299 }
300
301 void ProceedClient() {
302 auto* rpc_info = call_->client_rpc_info();
303 if (rpc_info->hijacked_ && !reverse_ &&
304 current_interceptor_index_ == rpc_info->hijacked_interceptor_ &&
305 !ran_hijacking_interceptor_) {
306 // We now need to provide hijacked recv ops to this interceptor
307 ClearHookPoints();
308 ops_->SetHijackingState();
309 ran_hijacking_interceptor_ = true;
310 rpc_info->RunInterceptor(interceptor_methods: this, pos: current_interceptor_index_);
311 return;
312 }
313 if (!reverse_) {
314 current_interceptor_index_++;
315 // We are going down the stack of interceptors
316 if (current_interceptor_index_ < rpc_info->interceptors_.size()) {
317 if (rpc_info->hijacked_ &&
318 current_interceptor_index_ > rpc_info->hijacked_interceptor_) {
319 // This is a hijacked RPC and we are done with hijacking
320 ops_->ContinueFillOpsAfterInterception();
321 } else {
322 rpc_info->RunInterceptor(interceptor_methods: this, pos: current_interceptor_index_);
323 }
324 } else {
325 // we are done running all the interceptors without any hijacking
326 ops_->ContinueFillOpsAfterInterception();
327 }
328 } else {
329 // We are going up the stack of interceptors
330 if (current_interceptor_index_ > 0) {
331 // Continue running interceptors
332 current_interceptor_index_--;
333 rpc_info->RunInterceptor(interceptor_methods: this, pos: current_interceptor_index_);
334 } else {
335 // we are done running all the interceptors without any hijacking
336 ops_->ContinueFinalizeResultAfterInterception();
337 }
338 }
339 }
340
341 void ProceedServer() {
342 auto* rpc_info = call_->server_rpc_info();
343 if (!reverse_) {
344 current_interceptor_index_++;
345 if (current_interceptor_index_ < rpc_info->interceptors_.size()) {
346 return rpc_info->RunInterceptor(interceptor_methods: this, pos: current_interceptor_index_);
347 } else if (ops_) {
348 return ops_->ContinueFillOpsAfterInterception();
349 }
350 } else {
351 // We are going up the stack of interceptors
352 if (current_interceptor_index_ > 0) {
353 // Continue running interceptors
354 current_interceptor_index_--;
355 return rpc_info->RunInterceptor(interceptor_methods: this, pos: current_interceptor_index_);
356 } else if (ops_) {
357 return ops_->ContinueFinalizeResultAfterInterception();
358 }
359 }
360 GPR_CODEGEN_ASSERT(callback_);
361 callback_();
362 }
363
364 void ClearHookPoints() {
365 for (auto i = static_cast<experimental::InterceptionHookPoints>(0);
366 i < experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS;
367 i = static_cast<experimental::InterceptionHookPoints>(
368 static_cast<size_t>(i) + 1)) {
369 hooks_[static_cast<size_t>(i)] = false;
370 }
371 }
372
373 std::array<bool,
374 static_cast<size_t>(
375 experimental::InterceptionHookPoints::NUM_INTERCEPTION_HOOKS)>
376 hooks_;
377
378 size_t current_interceptor_index_ = 0; // Current iterator
379 bool reverse_ = false;
380 bool ran_hijacking_interceptor_ = false;
381 Call* call_ = nullptr; // The Call object is present along with CallOpSet
382 // object/callback
383 CallOpSetInterface* ops_ = nullptr;
384 std::function<void(void)> callback_;
385
386 ByteBuffer* send_message_ = nullptr;
387 bool* fail_send_message_ = nullptr;
388 const void** orig_send_message_ = nullptr;
389 std::function<Status(const void*)> serializer_;
390
391 std::multimap<std::string, std::string>* send_initial_metadata_;
392
393 grpc_status_code* code_ = nullptr;
394 std::string* error_details_ = nullptr;
395 std::string* error_message_ = nullptr;
396
397 std::multimap<std::string, std::string>* send_trailing_metadata_ = nullptr;
398
399 void* recv_message_ = nullptr;
400 bool* hijacked_recv_message_failed_ = nullptr;
401
402 MetadataMap* recv_initial_metadata_ = nullptr;
403
404 Status* recv_status_ = nullptr;
405
406 MetadataMap* recv_trailing_metadata_ = nullptr;
407};
408
409// A special implementation of InterceptorBatchMethods to send a Cancel
410// notification down the interceptor stack
411class CancelInterceptorBatchMethods
412 : public experimental::InterceptorBatchMethods {
413 public:
414 bool QueryInterceptionHookPoint(
415 experimental::InterceptionHookPoints type) override {
416 return type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL;
417 }
418
419 void Proceed() override {
420 // This is a no-op. For actual continuation of the RPC simply needs to
421 // return from the Intercept method
422 }
423
424 void Hijack() override {
425 // Only the client can hijack when sending down initial metadata
426 GPR_CODEGEN_ASSERT(false &&
427 "It is illegal to call Hijack on a method which has a "
428 "Cancel notification");
429 }
430
431 ByteBuffer* GetSerializedSendMessage() override {
432 GPR_CODEGEN_ASSERT(false &&
433 "It is illegal to call GetSendMessage on a method which "
434 "has a Cancel notification");
435 return nullptr;
436 }
437
438 bool GetSendMessageStatus() override {
439 GPR_CODEGEN_ASSERT(
440 false &&
441 "It is illegal to call GetSendMessageStatus on a method which "
442 "has a Cancel notification");
443 return false;
444 }
445
446 const void* GetSendMessage() override {
447 GPR_CODEGEN_ASSERT(
448 false &&
449 "It is illegal to call GetOriginalSendMessage on a method which "
450 "has a Cancel notification");
451 return nullptr;
452 }
453
454 void ModifySendMessage(const void* /*message*/) override {
455 GPR_CODEGEN_ASSERT(
456 false &&
457 "It is illegal to call ModifySendMessage on a method which "
458 "has a Cancel notification");
459 }
460
461 std::multimap<std::string, std::string>* GetSendInitialMetadata() override {
462 GPR_CODEGEN_ASSERT(false &&
463 "It is illegal to call GetSendInitialMetadata on a "
464 "method which has a Cancel notification");
465 return nullptr;
466 }
467
468 Status GetSendStatus() override {
469 GPR_CODEGEN_ASSERT(false &&
470 "It is illegal to call GetSendStatus on a method which "
471 "has a Cancel notification");
472 return Status();
473 }
474
475 void ModifySendStatus(const Status& /*status*/) override {
476 GPR_CODEGEN_ASSERT(false &&
477 "It is illegal to call ModifySendStatus on a method "
478 "which has a Cancel notification");
479 }
480
481 std::multimap<std::string, std::string>* GetSendTrailingMetadata() override {
482 GPR_CODEGEN_ASSERT(false &&
483 "It is illegal to call GetSendTrailingMetadata on a "
484 "method which has a Cancel notification");
485 return nullptr;
486 }
487
488 void* GetRecvMessage() override {
489 GPR_CODEGEN_ASSERT(false &&
490 "It is illegal to call GetRecvMessage on a method which "
491 "has a Cancel notification");
492 return nullptr;
493 }
494
495 std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata()
496 override {
497 GPR_CODEGEN_ASSERT(false &&
498 "It is illegal to call GetRecvInitialMetadata on a "
499 "method which has a Cancel notification");
500 return nullptr;
501 }
502
503 Status* GetRecvStatus() override {
504 GPR_CODEGEN_ASSERT(false &&
505 "It is illegal to call GetRecvStatus on a method which "
506 "has a Cancel notification");
507 return nullptr;
508 }
509
510 std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata()
511 override {
512 GPR_CODEGEN_ASSERT(false &&
513 "It is illegal to call GetRecvTrailingMetadata on a "
514 "method which has a Cancel notification");
515 return nullptr;
516 }
517
518 std::unique_ptr<ChannelInterface> GetInterceptedChannel() override {
519 GPR_CODEGEN_ASSERT(false &&
520 "It is illegal to call GetInterceptedChannel on a "
521 "method which has a Cancel notification");
522 return std::unique_ptr<ChannelInterface>(nullptr);
523 }
524
525 void FailHijackedRecvMessage() override {
526 GPR_CODEGEN_ASSERT(false &&
527 "It is illegal to call FailHijackedRecvMessage on a "
528 "method which has a Cancel notification");
529 }
530
531 void FailHijackedSendMessage() override {
532 GPR_CODEGEN_ASSERT(false &&
533 "It is illegal to call FailHijackedSendMessage on a "
534 "method which has a Cancel notification");
535 }
536};
537} // namespace internal
538} // namespace grpc
539
540#endif // GRPCPP_IMPL_CODEGEN_INTERCEPTOR_COMMON_H
541

source code of include/grpcpp/impl/codegen/interceptor_common.h