1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
20#define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
21
22#include <cstring>
23#include <map>
24#include <memory>
25
26#include <grpc/impl/codegen/compression_types.h>
27#include <grpc/impl/codegen/grpc_types.h>
28#include <grpcpp/impl/codegen/byte_buffer.h>
29#include <grpcpp/impl/codegen/call.h>
30#include <grpcpp/impl/codegen/call_hook.h>
31#include <grpcpp/impl/codegen/call_op_set_interface.h>
32#include <grpcpp/impl/codegen/client_context_impl.h>
33#include <grpcpp/impl/codegen/completion_queue_impl.h>
34#include <grpcpp/impl/codegen/completion_queue_tag.h>
35#include <grpcpp/impl/codegen/config.h>
36#include <grpcpp/impl/codegen/core_codegen_interface.h>
37#include <grpcpp/impl/codegen/intercepted_channel.h>
38#include <grpcpp/impl/codegen/interceptor_common.h>
39#include <grpcpp/impl/codegen/serialization_traits.h>
40#include <grpcpp/impl/codegen/slice.h>
41#include <grpcpp/impl/codegen/string_ref.h>
42
43namespace grpc {
44
45extern CoreCodegenInterface* g_core_codegen_interface;
46
47namespace internal {
48class Call;
49class CallHook;
50
51// TODO(yangg) if the map is changed before we send, the pointers will be a
52// mess. Make sure it does not happen.
53inline grpc_metadata* FillMetadataArray(
54 const std::multimap<grpc::string, grpc::string>& metadata,
55 size_t* metadata_count, const grpc::string& optional_error_details) {
56 *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
57 if (*metadata_count == 0) {
58 return nullptr;
59 }
60 grpc_metadata* metadata_array =
61 (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
62 size: (*metadata_count) * sizeof(grpc_metadata)));
63 size_t i = 0;
64 for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
65 metadata_array[i].key = SliceReferencingString(str: iter->first);
66 metadata_array[i].value = SliceReferencingString(str: iter->second);
67 }
68 if (!optional_error_details.empty()) {
69 metadata_array[i].key =
70 g_core_codegen_interface->grpc_slice_from_static_buffer(
71 buffer: kBinaryErrorDetailsKey, length: sizeof(kBinaryErrorDetailsKey) - 1);
72 metadata_array[i].value = SliceReferencingString(str: optional_error_details);
73 }
74 return metadata_array;
75}
76} // namespace internal
77
78/// Per-message write options.
79class WriteOptions {
80 public:
81 WriteOptions() : flags_(0), last_message_(false) {}
82 WriteOptions(const WriteOptions& other)
83 : flags_(other.flags_), last_message_(other.last_message_) {}
84
85 /// Default assignment operator
86 WriteOptions& operator=(const WriteOptions& other) = default;
87
88 /// Clear all flags.
89 inline void Clear() { flags_ = 0; }
90
91 /// Returns raw flags bitset.
92 inline uint32_t flags() const { return flags_; }
93
94 /// Sets flag for the disabling of compression for the next message write.
95 ///
96 /// \sa GRPC_WRITE_NO_COMPRESS
97 inline WriteOptions& set_no_compression() {
98 SetBit(GRPC_WRITE_NO_COMPRESS);
99 return *this;
100 }
101
102 /// Clears flag for the disabling of compression for the next message write.
103 ///
104 /// \sa GRPC_WRITE_NO_COMPRESS
105 inline WriteOptions& clear_no_compression() {
106 ClearBit(GRPC_WRITE_NO_COMPRESS);
107 return *this;
108 }
109
110 /// Get value for the flag indicating whether compression for the next
111 /// message write is forcefully disabled.
112 ///
113 /// \sa GRPC_WRITE_NO_COMPRESS
114 inline bool get_no_compression() const {
115 return GetBit(GRPC_WRITE_NO_COMPRESS);
116 }
117
118 /// Sets flag indicating that the write may be buffered and need not go out on
119 /// the wire immediately.
120 ///
121 /// \sa GRPC_WRITE_BUFFER_HINT
122 inline WriteOptions& set_buffer_hint() {
123 SetBit(GRPC_WRITE_BUFFER_HINT);
124 return *this;
125 }
126
127 /// Clears flag indicating that the write may be buffered and need not go out
128 /// on the wire immediately.
129 ///
130 /// \sa GRPC_WRITE_BUFFER_HINT
131 inline WriteOptions& clear_buffer_hint() {
132 ClearBit(GRPC_WRITE_BUFFER_HINT);
133 return *this;
134 }
135
136 /// Get value for the flag indicating that the write may be buffered and need
137 /// not go out on the wire immediately.
138 ///
139 /// \sa GRPC_WRITE_BUFFER_HINT
140 inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
141
142 /// corked bit: aliases set_buffer_hint currently, with the intent that
143 /// set_buffer_hint will be removed in the future
144 inline WriteOptions& set_corked() {
145 SetBit(GRPC_WRITE_BUFFER_HINT);
146 return *this;
147 }
148
149 inline WriteOptions& clear_corked() {
150 ClearBit(GRPC_WRITE_BUFFER_HINT);
151 return *this;
152 }
153
154 inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
155
156 /// last-message bit: indicates this is the last message in a stream
157 /// client-side: makes Write the equivalent of performing Write, WritesDone
158 /// in a single step
159 /// server-side: hold the Write until the service handler returns (sync api)
160 /// or until Finish is called (async api)
161 inline WriteOptions& set_last_message() {
162 last_message_ = true;
163 return *this;
164 }
165
166 /// Clears flag indicating that this is the last message in a stream,
167 /// disabling coalescing.
168 inline WriteOptions& clear_last_message() {
169 last_message_ = false;
170 return *this;
171 }
172
173 /// Guarantee that all bytes have been written to the socket before completing
174 /// this write (usually writes are completed when they pass flow control).
175 inline WriteOptions& set_write_through() {
176 SetBit(GRPC_WRITE_THROUGH);
177 return *this;
178 }
179
180 inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
181
182 /// Get value for the flag indicating that this is the last message, and
183 /// should be coalesced with trailing metadata.
184 ///
185 /// \sa GRPC_WRITE_LAST_MESSAGE
186 bool is_last_message() const { return last_message_; }
187
188 private:
189 void SetBit(const uint32_t mask) { flags_ |= mask; }
190
191 void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
192
193 bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
194
195 uint32_t flags_;
196 bool last_message_;
197};
198
199namespace internal {
200
201/// Default argument for CallOpSet. I is unused by the class, but can be
202/// used for generating multiple names for the same thing.
203template <int I>
204class CallNoOp {
205 protected:
206 void AddOp(grpc_op* /*ops*/, size_t* /*nops*/) {}
207 void FinishOp(bool* /*status*/) {}
208 void SetInterceptionHookPoint(
209 InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
210 void SetFinishInterceptionHookPoint(
211 InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
212 void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
213 }
214};
215
216class CallOpSendInitialMetadata {
217 public:
218 CallOpSendInitialMetadata() : send_(false) {
219 maybe_compression_level_.is_set = false;
220 }
221
222 void SendInitialMetadata(std::multimap<grpc::string, grpc::string>* metadata,
223 uint32_t flags) {
224 maybe_compression_level_.is_set = false;
225 send_ = true;
226 flags_ = flags;
227 metadata_map_ = metadata;
228 }
229
230 void set_compression_level(grpc_compression_level level) {
231 maybe_compression_level_.is_set = true;
232 maybe_compression_level_.level = level;
233 }
234
235 protected:
236 void AddOp(grpc_op* ops, size_t* nops) {
237 if (!send_ || hijacked_) return;
238 grpc_op* op = &ops[(*nops)++];
239 op->op = GRPC_OP_SEND_INITIAL_METADATA;
240 op->flags = flags_;
241 op->reserved = NULL;
242 initial_metadata_ =
243 FillMetadataArray(metadata: *metadata_map_, metadata_count: &initial_metadata_count_, optional_error_details: "");
244 op->data.send_initial_metadata.count = initial_metadata_count_;
245 op->data.send_initial_metadata.metadata = initial_metadata_;
246 op->data.send_initial_metadata.maybe_compression_level.is_set =
247 maybe_compression_level_.is_set;
248 if (maybe_compression_level_.is_set) {
249 op->data.send_initial_metadata.maybe_compression_level.level =
250 maybe_compression_level_.level;
251 }
252 }
253 void FinishOp(bool* /*status*/) {
254 if (!send_ || hijacked_) return;
255 g_core_codegen_interface->gpr_free(p: initial_metadata_);
256 send_ = false;
257 }
258
259 void SetInterceptionHookPoint(
260 InterceptorBatchMethodsImpl* interceptor_methods) {
261 if (!send_) return;
262 interceptor_methods->AddInterceptionHookPoint(
263 type: experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA);
264 interceptor_methods->SetSendInitialMetadata(metadata_map_);
265 }
266
267 void SetFinishInterceptionHookPoint(
268 InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
269
270 void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
271 hijacked_ = true;
272 }
273
274 bool hijacked_ = false;
275 bool send_;
276 uint32_t flags_;
277 size_t initial_metadata_count_;
278 std::multimap<grpc::string, grpc::string>* metadata_map_;
279 grpc_metadata* initial_metadata_;
280 struct {
281 bool is_set;
282 grpc_compression_level level;
283 } maybe_compression_level_;
284};
285
286class CallOpSendMessage {
287 public:
288 CallOpSendMessage() : send_buf_() {}
289
290 /// Send \a message using \a options for the write. The \a options are cleared
291 /// after use.
292 template <class M>
293 Status SendMessage(const M& message,
294 WriteOptions options) GRPC_MUST_USE_RESULT;
295
296 template <class M>
297 Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
298
299 /// Send \a message using \a options for the write. The \a options are cleared
300 /// after use. This form of SendMessage allows gRPC to reference \a message
301 /// beyond the lifetime of SendMessage.
302 template <class M>
303 Status SendMessagePtr(const M* message,
304 WriteOptions options) GRPC_MUST_USE_RESULT;
305
306 /// This form of SendMessage allows gRPC to reference \a message beyond the
307 /// lifetime of SendMessage.
308 template <class M>
309 Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT;
310
311 protected:
312 void AddOp(grpc_op* ops, size_t* nops) {
313 if (msg_ == nullptr && !send_buf_.Valid()) return;
314 if (hijacked_) {
315 serializer_ = nullptr;
316 return;
317 }
318 if (msg_ != nullptr) {
319 GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
320 }
321 serializer_ = nullptr;
322 grpc_op* op = &ops[(*nops)++];
323 op->op = GRPC_OP_SEND_MESSAGE;
324 op->flags = write_options_.flags();
325 op->reserved = NULL;
326 op->data.send_message.send_message = send_buf_.c_buffer();
327 // Flags are per-message: clear them after use.
328 write_options_.Clear();
329 }
330 void FinishOp(bool* status) {
331 if (msg_ == nullptr && !send_buf_.Valid()) return;
332 if (hijacked_ && failed_send_) {
333 // Hijacking interceptor failed this Op
334 *status = false;
335 } else if (!*status) {
336 // This Op was passed down to core and the Op failed
337 failed_send_ = true;
338 }
339 }
340
341 void SetInterceptionHookPoint(
342 InterceptorBatchMethodsImpl* interceptor_methods) {
343 if (msg_ == nullptr && !send_buf_.Valid()) return;
344 interceptor_methods->AddInterceptionHookPoint(
345 type: experimental::InterceptionHookPoints::PRE_SEND_MESSAGE);
346 interceptor_methods->SetSendMessage(buf: &send_buf_, msg: &msg_, fail_send_message: &failed_send_,
347 serializer: serializer_);
348 }
349
350 void SetFinishInterceptionHookPoint(
351 InterceptorBatchMethodsImpl* interceptor_methods) {
352 if (msg_ != nullptr || send_buf_.Valid()) {
353 interceptor_methods->AddInterceptionHookPoint(
354 type: experimental::InterceptionHookPoints::POST_SEND_MESSAGE);
355 }
356 send_buf_.Clear();
357 msg_ = nullptr;
358 // The contents of the SendMessage value that was previously set
359 // has had its references stolen by core's operations
360 interceptor_methods->SetSendMessage(buf: nullptr, msg: nullptr, fail_send_message: &failed_send_,
361 serializer: nullptr);
362 }
363
364 void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
365 hijacked_ = true;
366 }
367
368 private:
369 const void* msg_ = nullptr; // The original non-serialized message
370 bool hijacked_ = false;
371 bool failed_send_ = false;
372 ByteBuffer send_buf_;
373 WriteOptions write_options_;
374 std::function<Status(const void*)> serializer_;
375};
376
377template <class M>
378Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) {
379 write_options_ = options;
380 serializer_ = [this](const void* message) {
381 bool own_buf;
382 send_buf_.Clear();
383 // TODO(vjpai): Remove the void below when possible
384 // The void in the template parameter below should not be needed
385 // (since it should be implicit) but is needed due to an observed
386 // difference in behavior between clang and gcc for certain internal users
387 Status result = SerializationTraits<M, void>::Serialize(
388 *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf);
389 if (!own_buf) {
390 send_buf_.Duplicate();
391 }
392 return result;
393 };
394 // Serialize immediately only if we do not have access to the message pointer
395 if (msg_ == nullptr) {
396 Status result = serializer_(&message);
397 serializer_ = nullptr;
398 return result;
399 }
400 return Status();
401}
402
403template <class M>
404Status CallOpSendMessage::SendMessage(const M& message) {
405 return SendMessage(message, WriteOptions());
406}
407
408template <class M>
409Status CallOpSendMessage::SendMessagePtr(const M* message,
410 WriteOptions options) {
411 msg_ = message;
412 return SendMessage(*message, options);
413}
414
415template <class M>
416Status CallOpSendMessage::SendMessagePtr(const M* message) {
417 msg_ = message;
418 return SendMessage(*message, WriteOptions());
419}
420
421template <class R>
422class CallOpRecvMessage {
423 public:
424 void RecvMessage(R* message) { message_ = message; }
425
426 // Do not change status if no message is received.
427 void AllowNoMessage() { allow_not_getting_message_ = true; }
428
429 bool got_message = false;
430
431 protected:
432 void AddOp(grpc_op* ops, size_t* nops) {
433 if (message_ == nullptr || hijacked_) return;
434 grpc_op* op = &ops[(*nops)++];
435 op->op = GRPC_OP_RECV_MESSAGE;
436 op->flags = 0;
437 op->reserved = NULL;
438 op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
439 }
440
441 void FinishOp(bool* status) {
442 if (message_ == nullptr) return;
443 if (recv_buf_.Valid()) {
444 if (*status) {
445 got_message = *status =
446 SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
447 .ok();
448 recv_buf_.Release();
449 } else {
450 got_message = false;
451 recv_buf_.Clear();
452 }
453 } else if (hijacked_) {
454 if (hijacked_recv_message_failed_) {
455 FinishOpRecvMessageFailureHandler(status);
456 } else {
457 // The op was hijacked and it was successful. There is no further action
458 // to be performed since the message is already in its non-serialized
459 // form.
460 }
461 } else {
462 FinishOpRecvMessageFailureHandler(status);
463 }
464 }
465
466 void SetInterceptionHookPoint(
467 InterceptorBatchMethodsImpl* interceptor_methods) {
468 if (message_ == nullptr) return;
469 interceptor_methods->SetRecvMessage(message: message_,
470 hijacked_recv_message_failed: &hijacked_recv_message_failed_);
471 }
472
473 void SetFinishInterceptionHookPoint(
474 InterceptorBatchMethodsImpl* interceptor_methods) {
475 if (message_ == nullptr) return;
476 interceptor_methods->AddInterceptionHookPoint(
477 type: experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
478 if (!got_message) interceptor_methods->SetRecvMessage(message: nullptr, hijacked_recv_message_failed: nullptr);
479 }
480 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
481 hijacked_ = true;
482 if (message_ == nullptr) return;
483 interceptor_methods->AddInterceptionHookPoint(
484 type: experimental::InterceptionHookPoints::PRE_RECV_MESSAGE);
485 got_message = true;
486 }
487
488 private:
489 // Sets got_message and \a status for a failed recv message op
490 void FinishOpRecvMessageFailureHandler(bool* status) {
491 got_message = false;
492 if (!allow_not_getting_message_) {
493 *status = false;
494 }
495 }
496
497 R* message_ = nullptr;
498 ByteBuffer recv_buf_;
499 bool allow_not_getting_message_ = false;
500 bool hijacked_ = false;
501 bool hijacked_recv_message_failed_ = false;
502};
503
504class DeserializeFunc {
505 public:
506 virtual Status Deserialize(ByteBuffer* buf) = 0;
507 virtual ~DeserializeFunc() {}
508};
509
510template <class R>
511class DeserializeFuncType final : public DeserializeFunc {
512 public:
513 DeserializeFuncType(R* message) : message_(message) {}
514 Status Deserialize(ByteBuffer* buf) override {
515 return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
516 }
517
518 ~DeserializeFuncType() override {}
519
520 private:
521 R* message_; // Not a managed pointer because management is external to this
522};
523
524class CallOpGenericRecvMessage {
525 public:
526 template <class R>
527 void RecvMessage(R* message) {
528 // Use an explicit base class pointer to avoid resolution error in the
529 // following unique_ptr::reset for some old implementations.
530 DeserializeFunc* func = new DeserializeFuncType<R>(message);
531 deserialize_.reset(p: func);
532 message_ = message;
533 }
534
535 // Do not change status if no message is received.
536 void AllowNoMessage() { allow_not_getting_message_ = true; }
537
538 bool got_message = false;
539
540 protected:
541 void AddOp(grpc_op* ops, size_t* nops) {
542 if (!deserialize_ || hijacked_) return;
543 grpc_op* op = &ops[(*nops)++];
544 op->op = GRPC_OP_RECV_MESSAGE;
545 op->flags = 0;
546 op->reserved = NULL;
547 op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
548 }
549
550 void FinishOp(bool* status) {
551 if (!deserialize_ || hijacked_) return;
552 if (recv_buf_.Valid()) {
553 if (*status) {
554 got_message = true;
555 *status = deserialize_->Deserialize(buf: &recv_buf_).ok();
556 recv_buf_.Release();
557 } else {
558 got_message = false;
559 recv_buf_.Clear();
560 }
561 } else if (hijacked_) {
562 if (hijacked_recv_message_failed_) {
563 FinishOpRecvMessageFailureHandler(status);
564 } else {
565 // The op was hijacked and it was successful. There is no further action
566 // to be performed since the message is already in its non-serialized
567 // form.
568 }
569 } else {
570 got_message = false;
571 if (!allow_not_getting_message_) {
572 *status = false;
573 }
574 }
575 }
576
577 void SetInterceptionHookPoint(
578 InterceptorBatchMethodsImpl* interceptor_methods) {
579 if (!deserialize_) return;
580 interceptor_methods->SetRecvMessage(message: message_,
581 hijacked_recv_message_failed: &hijacked_recv_message_failed_);
582 }
583
584 void SetFinishInterceptionHookPoint(
585 InterceptorBatchMethodsImpl* interceptor_methods) {
586 if (!deserialize_) return;
587 interceptor_methods->AddInterceptionHookPoint(
588 type: experimental::InterceptionHookPoints::POST_RECV_MESSAGE);
589 if (!got_message) interceptor_methods->SetRecvMessage(message: nullptr, hijacked_recv_message_failed: nullptr);
590 deserialize_.reset();
591 }
592 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
593 hijacked_ = true;
594 if (!deserialize_) return;
595 interceptor_methods->AddInterceptionHookPoint(
596 type: experimental::InterceptionHookPoints::PRE_RECV_MESSAGE);
597 got_message = true;
598 }
599
600 private:
601 // Sets got_message and \a status for a failed recv message op
602 void FinishOpRecvMessageFailureHandler(bool* status) {
603 got_message = false;
604 if (!allow_not_getting_message_) {
605 *status = false;
606 }
607 }
608
609 void* message_ = nullptr;
610 std::unique_ptr<DeserializeFunc> deserialize_;
611 ByteBuffer recv_buf_;
612 bool allow_not_getting_message_ = false;
613 bool hijacked_ = false;
614 bool hijacked_recv_message_failed_ = false;
615};
616
617class CallOpClientSendClose {
618 public:
619 CallOpClientSendClose() : send_(false) {}
620
621 void ClientSendClose() { send_ = true; }
622
623 protected:
624 void AddOp(grpc_op* ops, size_t* nops) {
625 if (!send_ || hijacked_) return;
626 grpc_op* op = &ops[(*nops)++];
627 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
628 op->flags = 0;
629 op->reserved = NULL;
630 }
631 void FinishOp(bool* /*status*/) { send_ = false; }
632
633 void SetInterceptionHookPoint(
634 InterceptorBatchMethodsImpl* interceptor_methods) {
635 if (!send_) return;
636 interceptor_methods->AddInterceptionHookPoint(
637 type: experimental::InterceptionHookPoints::PRE_SEND_CLOSE);
638 }
639
640 void SetFinishInterceptionHookPoint(
641 InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
642
643 void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
644 hijacked_ = true;
645 }
646
647 private:
648 bool hijacked_ = false;
649 bool send_;
650};
651
652class CallOpServerSendStatus {
653 public:
654 CallOpServerSendStatus() : send_status_available_(false) {}
655
656 void ServerSendStatus(
657 std::multimap<grpc::string, grpc::string>* trailing_metadata,
658 const Status& status) {
659 send_error_details_ = status.error_details();
660 metadata_map_ = trailing_metadata;
661 send_status_available_ = true;
662 send_status_code_ = static_cast<grpc_status_code>(status.error_code());
663 send_error_message_ = status.error_message();
664 }
665
666 protected:
667 void AddOp(grpc_op* ops, size_t* nops) {
668 if (!send_status_available_ || hijacked_) return;
669 trailing_metadata_ = FillMetadataArray(
670 metadata: *metadata_map_, metadata_count: &trailing_metadata_count_, optional_error_details: send_error_details_);
671 grpc_op* op = &ops[(*nops)++];
672 op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
673 op->data.send_status_from_server.trailing_metadata_count =
674 trailing_metadata_count_;
675 op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
676 op->data.send_status_from_server.status = send_status_code_;
677 error_message_slice_ = SliceReferencingString(str: send_error_message_);
678 op->data.send_status_from_server.status_details =
679 send_error_message_.empty() ? nullptr : &error_message_slice_;
680 op->flags = 0;
681 op->reserved = NULL;
682 }
683
684 void FinishOp(bool* /*status*/) {
685 if (!send_status_available_ || hijacked_) return;
686 g_core_codegen_interface->gpr_free(p: trailing_metadata_);
687 send_status_available_ = false;
688 }
689
690 void SetInterceptionHookPoint(
691 InterceptorBatchMethodsImpl* interceptor_methods) {
692 if (!send_status_available_) return;
693 interceptor_methods->AddInterceptionHookPoint(
694 type: experimental::InterceptionHookPoints::PRE_SEND_STATUS);
695 interceptor_methods->SetSendTrailingMetadata(metadata_map_);
696 interceptor_methods->SetSendStatus(code: &send_status_code_, error_details: &send_error_details_,
697 error_message: &send_error_message_);
698 }
699
700 void SetFinishInterceptionHookPoint(
701 InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
702
703 void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
704 hijacked_ = true;
705 }
706
707 private:
708 bool hijacked_ = false;
709 bool send_status_available_;
710 grpc_status_code send_status_code_;
711 grpc::string send_error_details_;
712 grpc::string send_error_message_;
713 size_t trailing_metadata_count_;
714 std::multimap<grpc::string, grpc::string>* metadata_map_;
715 grpc_metadata* trailing_metadata_;
716 grpc_slice error_message_slice_;
717};
718
719class CallOpRecvInitialMetadata {
720 public:
721 CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
722
723 void RecvInitialMetadata(::grpc_impl::ClientContext* context) {
724 context->initial_metadata_received_ = true;
725 metadata_map_ = &context->recv_initial_metadata_;
726 }
727
728 protected:
729 void AddOp(grpc_op* ops, size_t* nops) {
730 if (metadata_map_ == nullptr || hijacked_) return;
731 grpc_op* op = &ops[(*nops)++];
732 op->op = GRPC_OP_RECV_INITIAL_METADATA;
733 op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
734 op->flags = 0;
735 op->reserved = NULL;
736 }
737
738 void FinishOp(bool* /*status*/) {
739 if (metadata_map_ == nullptr || hijacked_) return;
740 }
741
742 void SetInterceptionHookPoint(
743 InterceptorBatchMethodsImpl* interceptor_methods) {
744 interceptor_methods->SetRecvInitialMetadata(metadata_map_);
745 }
746
747 void SetFinishInterceptionHookPoint(
748 InterceptorBatchMethodsImpl* interceptor_methods) {
749 if (metadata_map_ == nullptr) return;
750 interceptor_methods->AddInterceptionHookPoint(
751 type: experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA);
752 metadata_map_ = nullptr;
753 }
754
755 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
756 hijacked_ = true;
757 if (metadata_map_ == nullptr) return;
758 interceptor_methods->AddInterceptionHookPoint(
759 type: experimental::InterceptionHookPoints::PRE_RECV_INITIAL_METADATA);
760 }
761
762 private:
763 bool hijacked_ = false;
764 MetadataMap* metadata_map_;
765};
766
767class CallOpClientRecvStatus {
768 public:
769 CallOpClientRecvStatus()
770 : recv_status_(nullptr), debug_error_string_(nullptr) {}
771
772 void ClientRecvStatus(::grpc_impl::ClientContext* context, Status* status) {
773 client_context_ = context;
774 metadata_map_ = &client_context_->trailing_metadata_;
775 recv_status_ = status;
776 error_message_ = g_core_codegen_interface->grpc_empty_slice();
777 }
778
779 protected:
780 void AddOp(grpc_op* ops, size_t* nops) {
781 if (recv_status_ == nullptr || hijacked_) return;
782 grpc_op* op = &ops[(*nops)++];
783 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
784 op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
785 op->data.recv_status_on_client.status = &status_code_;
786 op->data.recv_status_on_client.status_details = &error_message_;
787 op->data.recv_status_on_client.error_string = &debug_error_string_;
788 op->flags = 0;
789 op->reserved = NULL;
790 }
791
792 void FinishOp(bool* /*status*/) {
793 if (recv_status_ == nullptr || hijacked_) return;
794 if (static_cast<StatusCode>(status_code_) == StatusCode::OK) {
795 *recv_status_ = Status();
796 GPR_CODEGEN_DEBUG_ASSERT(debug_error_string_ == nullptr);
797 } else {
798 *recv_status_ =
799 Status(static_cast<StatusCode>(status_code_),
800 GRPC_SLICE_IS_EMPTY(error_message_)
801 ? grpc::string()
802 : grpc::string(GRPC_SLICE_START_PTR(error_message_),
803 GRPC_SLICE_END_PTR(error_message_)),
804 metadata_map_->GetBinaryErrorDetails());
805 if (debug_error_string_ != nullptr) {
806 client_context_->set_debug_error_string(debug_error_string_);
807 g_core_codegen_interface->gpr_free(p: (void*)debug_error_string_);
808 }
809 }
810 // TODO(soheil): Find callers that set debug string even for status OK,
811 // and fix them.
812 g_core_codegen_interface->grpc_slice_unref(slice: error_message_);
813 }
814
815 void SetInterceptionHookPoint(
816 InterceptorBatchMethodsImpl* interceptor_methods) {
817 interceptor_methods->SetRecvStatus(recv_status_);
818 interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
819 }
820
821 void SetFinishInterceptionHookPoint(
822 InterceptorBatchMethodsImpl* interceptor_methods) {
823 if (recv_status_ == nullptr) return;
824 interceptor_methods->AddInterceptionHookPoint(
825 type: experimental::InterceptionHookPoints::POST_RECV_STATUS);
826 recv_status_ = nullptr;
827 }
828
829 void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
830 hijacked_ = true;
831 if (recv_status_ == nullptr) return;
832 interceptor_methods->AddInterceptionHookPoint(
833 type: experimental::InterceptionHookPoints::PRE_RECV_STATUS);
834 }
835
836 private:
837 bool hijacked_ = false;
838 ::grpc_impl::ClientContext* client_context_;
839 MetadataMap* metadata_map_;
840 Status* recv_status_;
841 const char* debug_error_string_;
842 grpc_status_code status_code_;
843 grpc_slice error_message_;
844};
845
846template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
847 class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
848 class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
849class CallOpSet;
850
851/// Primary implementation of CallOpSetInterface.
852/// Since we cannot use variadic templates, we declare slots up to
853/// the maximum count of ops we'll need in a set. We leverage the
854/// empty base class optimization to slim this class (especially
855/// when there are many unused slots used). To avoid duplicate base classes,
856/// the template parameter for CallNoOp is varied by argument position.
857template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
858class CallOpSet : public CallOpSetInterface,
859 public Op1,
860 public Op2,
861 public Op3,
862 public Op4,
863 public Op5,
864 public Op6 {
865 public:
866 CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
867 // The copy constructor and assignment operator reset the value of
868 // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
869 // since those are only meaningful on a specific object, not across objects.
870 CallOpSet(const CallOpSet& other)
871 : core_cq_tag_(this),
872 return_tag_(this),
873 call_(other.call_),
874 done_intercepting_(false),
875 interceptor_methods_(InterceptorBatchMethodsImpl()) {}
876
877 CallOpSet& operator=(const CallOpSet& other) {
878 core_cq_tag_ = this;
879 return_tag_ = this;
880 call_ = other.call_;
881 done_intercepting_ = false;
882 interceptor_methods_ = InterceptorBatchMethodsImpl();
883 return *this;
884 }
885
886 void FillOps(Call* call) override {
887 done_intercepting_ = false;
888 g_core_codegen_interface->grpc_call_ref(call: call->call());
889 call_ =
890 *call; // It's fine to create a copy of call since it's just pointers
891
892 if (RunInterceptors()) {
893 ContinueFillOpsAfterInterception();
894 } else {
895 // After the interceptors are run, ContinueFillOpsAfterInterception will
896 // be run
897 }
898 }
899
900 bool FinalizeResult(void** tag, bool* status) override {
901 if (done_intercepting_) {
902 // Complete the avalanching since we are done with this batch of ops
903 call_.cq()->CompleteAvalanching();
904 // We have already finished intercepting and filling in the results. This
905 // round trip from the core needed to be made because interceptors were
906 // run
907 *tag = return_tag_;
908 *status = saved_status_;
909 g_core_codegen_interface->grpc_call_unref(call: call_.call());
910 return true;
911 }
912
913 this->Op1::FinishOp(status);
914 this->Op2::FinishOp(status);
915 this->Op3::FinishOp(status);
916 this->Op4::FinishOp(status);
917 this->Op5::FinishOp(status);
918 this->Op6::FinishOp(status);
919 saved_status_ = *status;
920 if (RunInterceptorsPostRecv()) {
921 *tag = return_tag_;
922 g_core_codegen_interface->grpc_call_unref(call: call_.call());
923 return true;
924 }
925 // Interceptors are going to be run, so we can't return the tag just yet.
926 // After the interceptors are run, ContinueFinalizeResultAfterInterception
927 return false;
928 }
929
930 void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
931
932 void* core_cq_tag() override { return core_cq_tag_; }
933
934 /// set_core_cq_tag is used to provide a different core CQ tag than "this".
935 /// This is used for callback-based tags, where the core tag is the core
936 /// callback function. It does not change the use or behavior of any other
937 /// function (such as FinalizeResult)
938 void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
939
940 // This will be called while interceptors are run if the RPC is a hijacked
941 // RPC. This should set hijacking state for each of the ops.
942 void SetHijackingState() override {
943 this->Op1::SetHijackingState(&interceptor_methods_);
944 this->Op2::SetHijackingState(&interceptor_methods_);
945 this->Op3::SetHijackingState(&interceptor_methods_);
946 this->Op4::SetHijackingState(&interceptor_methods_);
947 this->Op5::SetHijackingState(&interceptor_methods_);
948 this->Op6::SetHijackingState(&interceptor_methods_);
949 }
950
951 // Should be called after interceptors are done running
952 void ContinueFillOpsAfterInterception() override {
953 static const size_t MAX_OPS = 6;
954 grpc_op ops[MAX_OPS];
955 size_t nops = 0;
956 this->Op1::AddOp(ops, &nops);
957 this->Op2::AddOp(ops, &nops);
958 this->Op3::AddOp(ops, &nops);
959 this->Op4::AddOp(ops, &nops);
960 this->Op5::AddOp(ops, &nops);
961 this->Op6::AddOp(ops, &nops);
962
963 grpc_call_error err = g_core_codegen_interface->grpc_call_start_batch(
964 call: call_.call(), ops, nops, tag: core_cq_tag(), reserved: nullptr);
965
966 if (err != GRPC_CALL_OK) {
967 // A failure here indicates an API misuse; for example, doing a Write
968 // while another Write is already pending on the same RPC or invoking
969 // WritesDone multiple times
970 // gpr_log(GPR_ERROR, "API misuse of type %s observed",
971 // g_core_codegen_interface->grpc_call_error_to_string(err));
972 GPR_CODEGEN_ASSERT(false);
973 }
974 }
975
976 // Should be called after interceptors are done running on the finalize result
977 // path
978 void ContinueFinalizeResultAfterInterception() override {
979 done_intercepting_ = true;
980 // The following call_start_batch is internally-generated so no need for an
981 // explanatory log on failure.
982 GPR_CODEGEN_ASSERT(g_core_codegen_interface->grpc_call_start_batch(
983 call_.call(), nullptr, 0, core_cq_tag(), nullptr) ==
984 GRPC_CALL_OK);
985 }
986
987 private:
988 // Returns true if no interceptors need to be run
989 bool RunInterceptors() {
990 interceptor_methods_.ClearState();
991 interceptor_methods_.SetCallOpSetInterface(this);
992 interceptor_methods_.SetCall(&call_);
993 this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
994 this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
995 this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
996 this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
997 this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
998 this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
999 if (interceptor_methods_.InterceptorsListEmpty()) {
1000 return true;
1001 }
1002 // This call will go through interceptors and would need to
1003 // schedule new batches, so delay completion queue shutdown
1004 call_.cq()->RegisterAvalanching();
1005 return interceptor_methods_.RunInterceptors();
1006 }
1007 // Returns true if no interceptors need to be run
1008 bool RunInterceptorsPostRecv() {
1009 // Call and OpSet had already been set on the set state.
1010 // SetReverse also clears previously set hook points
1011 interceptor_methods_.SetReverse();
1012 this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
1013 this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
1014 this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_);
1015 this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_);
1016 this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_);
1017 this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_);
1018 return interceptor_methods_.RunInterceptors();
1019 }
1020
1021 void* core_cq_tag_;
1022 void* return_tag_;
1023 Call call_;
1024 bool done_intercepting_ = false;
1025 InterceptorBatchMethodsImpl interceptor_methods_;
1026 bool saved_status_;
1027};
1028
1029} // namespace internal
1030} // namespace grpc
1031
1032#endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
1033

// Source: include/grpcpp/impl/codegen/call_op_set.h