1 | /* |
2 | * |
3 | * Copyright 2018 gRPC authors. |
4 | * |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | * you may not use this file except in compliance with the License. |
7 | * You may obtain a copy of the License at |
8 | * |
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | * |
11 | * Unless required by applicable law or agreed to in writing, software |
12 | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | * See the License for the specific language governing permissions and |
15 | * limitations under the License. |
16 | * |
17 | */ |
18 | |
19 | #ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H |
20 | #define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H |
21 | |
22 | #include <cstring> |
23 | #include <map> |
24 | #include <memory> |
25 | |
26 | #include <grpc/impl/codegen/compression_types.h> |
27 | #include <grpc/impl/codegen/grpc_types.h> |
28 | #include <grpcpp/impl/codegen/byte_buffer.h> |
29 | #include <grpcpp/impl/codegen/call.h> |
30 | #include <grpcpp/impl/codegen/call_hook.h> |
31 | #include <grpcpp/impl/codegen/call_op_set_interface.h> |
32 | #include <grpcpp/impl/codegen/client_context_impl.h> |
33 | #include <grpcpp/impl/codegen/completion_queue_impl.h> |
34 | #include <grpcpp/impl/codegen/completion_queue_tag.h> |
35 | #include <grpcpp/impl/codegen/config.h> |
36 | #include <grpcpp/impl/codegen/core_codegen_interface.h> |
37 | #include <grpcpp/impl/codegen/intercepted_channel.h> |
38 | #include <grpcpp/impl/codegen/interceptor_common.h> |
39 | #include <grpcpp/impl/codegen/serialization_traits.h> |
40 | #include <grpcpp/impl/codegen/slice.h> |
41 | #include <grpcpp/impl/codegen/string_ref.h> |
42 | |
43 | namespace grpc { |
44 | |
45 | extern CoreCodegenInterface* g_core_codegen_interface; |
46 | |
47 | namespace internal { |
48 | class Call; |
49 | class CallHook; |
50 | |
51 | // TODO(yangg) if the map is changed before we send, the pointers will be a |
52 | // mess. Make sure it does not happen. |
53 | inline grpc_metadata* FillMetadataArray( |
54 | const std::multimap<grpc::string, grpc::string>& metadata, |
55 | size_t* metadata_count, const grpc::string& optional_error_details) { |
56 | *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1); |
57 | if (*metadata_count == 0) { |
58 | return nullptr; |
59 | } |
60 | grpc_metadata* metadata_array = |
61 | (grpc_metadata*)(g_core_codegen_interface->gpr_malloc( |
62 | size: (*metadata_count) * sizeof(grpc_metadata))); |
63 | size_t i = 0; |
64 | for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) { |
65 | metadata_array[i].key = SliceReferencingString(str: iter->first); |
66 | metadata_array[i].value = SliceReferencingString(str: iter->second); |
67 | } |
68 | if (!optional_error_details.empty()) { |
69 | metadata_array[i].key = |
70 | g_core_codegen_interface->grpc_slice_from_static_buffer( |
71 | buffer: kBinaryErrorDetailsKey, length: sizeof(kBinaryErrorDetailsKey) - 1); |
72 | metadata_array[i].value = SliceReferencingString(str: optional_error_details); |
73 | } |
74 | return metadata_array; |
75 | } |
76 | } // namespace internal |
77 | |
78 | /// Per-message write options. |
79 | class WriteOptions { |
80 | public: |
81 | WriteOptions() : flags_(0), last_message_(false) {} |
82 | WriteOptions(const WriteOptions& other) |
83 | : flags_(other.flags_), last_message_(other.last_message_) {} |
84 | |
85 | /// Default assignment operator |
86 | WriteOptions& operator=(const WriteOptions& other) = default; |
87 | |
88 | /// Clear all flags. |
89 | inline void Clear() { flags_ = 0; } |
90 | |
91 | /// Returns raw flags bitset. |
92 | inline uint32_t flags() const { return flags_; } |
93 | |
94 | /// Sets flag for the disabling of compression for the next message write. |
95 | /// |
96 | /// \sa GRPC_WRITE_NO_COMPRESS |
97 | inline WriteOptions& set_no_compression() { |
98 | SetBit(GRPC_WRITE_NO_COMPRESS); |
99 | return *this; |
100 | } |
101 | |
102 | /// Clears flag for the disabling of compression for the next message write. |
103 | /// |
104 | /// \sa GRPC_WRITE_NO_COMPRESS |
105 | inline WriteOptions& clear_no_compression() { |
106 | ClearBit(GRPC_WRITE_NO_COMPRESS); |
107 | return *this; |
108 | } |
109 | |
110 | /// Get value for the flag indicating whether compression for the next |
111 | /// message write is forcefully disabled. |
112 | /// |
113 | /// \sa GRPC_WRITE_NO_COMPRESS |
114 | inline bool get_no_compression() const { |
115 | return GetBit(GRPC_WRITE_NO_COMPRESS); |
116 | } |
117 | |
118 | /// Sets flag indicating that the write may be buffered and need not go out on |
119 | /// the wire immediately. |
120 | /// |
121 | /// \sa GRPC_WRITE_BUFFER_HINT |
122 | inline WriteOptions& set_buffer_hint() { |
123 | SetBit(GRPC_WRITE_BUFFER_HINT); |
124 | return *this; |
125 | } |
126 | |
127 | /// Clears flag indicating that the write may be buffered and need not go out |
128 | /// on the wire immediately. |
129 | /// |
130 | /// \sa GRPC_WRITE_BUFFER_HINT |
131 | inline WriteOptions& clear_buffer_hint() { |
132 | ClearBit(GRPC_WRITE_BUFFER_HINT); |
133 | return *this; |
134 | } |
135 | |
136 | /// Get value for the flag indicating that the write may be buffered and need |
137 | /// not go out on the wire immediately. |
138 | /// |
139 | /// \sa GRPC_WRITE_BUFFER_HINT |
140 | inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); } |
141 | |
142 | /// corked bit: aliases set_buffer_hint currently, with the intent that |
143 | /// set_buffer_hint will be removed in the future |
144 | inline WriteOptions& set_corked() { |
145 | SetBit(GRPC_WRITE_BUFFER_HINT); |
146 | return *this; |
147 | } |
148 | |
149 | inline WriteOptions& clear_corked() { |
150 | ClearBit(GRPC_WRITE_BUFFER_HINT); |
151 | return *this; |
152 | } |
153 | |
154 | inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); } |
155 | |
156 | /// last-message bit: indicates this is the last message in a stream |
157 | /// client-side: makes Write the equivalent of performing Write, WritesDone |
158 | /// in a single step |
159 | /// server-side: hold the Write until the service handler returns (sync api) |
160 | /// or until Finish is called (async api) |
161 | inline WriteOptions& set_last_message() { |
162 | last_message_ = true; |
163 | return *this; |
164 | } |
165 | |
166 | /// Clears flag indicating that this is the last message in a stream, |
167 | /// disabling coalescing. |
168 | inline WriteOptions& clear_last_message() { |
169 | last_message_ = false; |
170 | return *this; |
171 | } |
172 | |
173 | /// Guarantee that all bytes have been written to the socket before completing |
174 | /// this write (usually writes are completed when they pass flow control). |
175 | inline WriteOptions& set_write_through() { |
176 | SetBit(GRPC_WRITE_THROUGH); |
177 | return *this; |
178 | } |
179 | |
180 | inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); } |
181 | |
182 | /// Get value for the flag indicating that this is the last message, and |
183 | /// should be coalesced with trailing metadata. |
184 | /// |
185 | /// \sa GRPC_WRITE_LAST_MESSAGE |
186 | bool is_last_message() const { return last_message_; } |
187 | |
188 | private: |
189 | void SetBit(const uint32_t mask) { flags_ |= mask; } |
190 | |
191 | void ClearBit(const uint32_t mask) { flags_ &= ~mask; } |
192 | |
193 | bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; } |
194 | |
195 | uint32_t flags_; |
196 | bool last_message_; |
197 | }; |
198 | |
199 | namespace internal { |
200 | |
201 | /// Default argument for CallOpSet. I is unused by the class, but can be |
202 | /// used for generating multiple names for the same thing. |
203 | template <int I> |
204 | class CallNoOp { |
205 | protected: |
206 | void AddOp(grpc_op* /*ops*/, size_t* /*nops*/) {} |
207 | void FinishOp(bool* /*status*/) {} |
208 | void SetInterceptionHookPoint( |
209 | InterceptorBatchMethodsImpl* /*interceptor_methods*/) {} |
210 | void SetFinishInterceptionHookPoint( |
211 | InterceptorBatchMethodsImpl* /*interceptor_methods*/) {} |
212 | void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) { |
213 | } |
214 | }; |
215 | |
216 | class CallOpSendInitialMetadata { |
217 | public: |
218 | CallOpSendInitialMetadata() : send_(false) { |
219 | maybe_compression_level_.is_set = false; |
220 | } |
221 | |
222 | void SendInitialMetadata(std::multimap<grpc::string, grpc::string>* metadata, |
223 | uint32_t flags) { |
224 | maybe_compression_level_.is_set = false; |
225 | send_ = true; |
226 | flags_ = flags; |
227 | metadata_map_ = metadata; |
228 | } |
229 | |
230 | void set_compression_level(grpc_compression_level level) { |
231 | maybe_compression_level_.is_set = true; |
232 | maybe_compression_level_.level = level; |
233 | } |
234 | |
235 | protected: |
236 | void AddOp(grpc_op* ops, size_t* nops) { |
237 | if (!send_ || hijacked_) return; |
238 | grpc_op* op = &ops[(*nops)++]; |
239 | op->op = GRPC_OP_SEND_INITIAL_METADATA; |
240 | op->flags = flags_; |
241 | op->reserved = NULL; |
242 | initial_metadata_ = |
243 | FillMetadataArray(metadata: *metadata_map_, metadata_count: &initial_metadata_count_, optional_error_details: "" ); |
244 | op->data.send_initial_metadata.count = initial_metadata_count_; |
245 | op->data.send_initial_metadata.metadata = initial_metadata_; |
246 | op->data.send_initial_metadata.maybe_compression_level.is_set = |
247 | maybe_compression_level_.is_set; |
248 | if (maybe_compression_level_.is_set) { |
249 | op->data.send_initial_metadata.maybe_compression_level.level = |
250 | maybe_compression_level_.level; |
251 | } |
252 | } |
253 | void FinishOp(bool* /*status*/) { |
254 | if (!send_ || hijacked_) return; |
255 | g_core_codegen_interface->gpr_free(p: initial_metadata_); |
256 | send_ = false; |
257 | } |
258 | |
259 | void SetInterceptionHookPoint( |
260 | InterceptorBatchMethodsImpl* interceptor_methods) { |
261 | if (!send_) return; |
262 | interceptor_methods->AddInterceptionHookPoint( |
263 | type: experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA); |
264 | interceptor_methods->SetSendInitialMetadata(metadata_map_); |
265 | } |
266 | |
267 | void SetFinishInterceptionHookPoint( |
268 | InterceptorBatchMethodsImpl* /*interceptor_methods*/) {} |
269 | |
270 | void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) { |
271 | hijacked_ = true; |
272 | } |
273 | |
274 | bool hijacked_ = false; |
275 | bool send_; |
276 | uint32_t flags_; |
277 | size_t initial_metadata_count_; |
278 | std::multimap<grpc::string, grpc::string>* metadata_map_; |
279 | grpc_metadata* initial_metadata_; |
280 | struct { |
281 | bool is_set; |
282 | grpc_compression_level level; |
283 | } maybe_compression_level_; |
284 | }; |
285 | |
286 | class CallOpSendMessage { |
287 | public: |
288 | CallOpSendMessage() : send_buf_() {} |
289 | |
290 | /// Send \a message using \a options for the write. The \a options are cleared |
291 | /// after use. |
292 | template <class M> |
293 | Status SendMessage(const M& message, |
294 | WriteOptions options) GRPC_MUST_USE_RESULT; |
295 | |
296 | template <class M> |
297 | Status SendMessage(const M& message) GRPC_MUST_USE_RESULT; |
298 | |
299 | /// Send \a message using \a options for the write. The \a options are cleared |
300 | /// after use. This form of SendMessage allows gRPC to reference \a message |
301 | /// beyond the lifetime of SendMessage. |
302 | template <class M> |
303 | Status SendMessagePtr(const M* message, |
304 | WriteOptions options) GRPC_MUST_USE_RESULT; |
305 | |
306 | /// This form of SendMessage allows gRPC to reference \a message beyond the |
307 | /// lifetime of SendMessage. |
308 | template <class M> |
309 | Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT; |
310 | |
311 | protected: |
312 | void AddOp(grpc_op* ops, size_t* nops) { |
313 | if (msg_ == nullptr && !send_buf_.Valid()) return; |
314 | if (hijacked_) { |
315 | serializer_ = nullptr; |
316 | return; |
317 | } |
318 | if (msg_ != nullptr) { |
319 | GPR_CODEGEN_ASSERT(serializer_(msg_).ok()); |
320 | } |
321 | serializer_ = nullptr; |
322 | grpc_op* op = &ops[(*nops)++]; |
323 | op->op = GRPC_OP_SEND_MESSAGE; |
324 | op->flags = write_options_.flags(); |
325 | op->reserved = NULL; |
326 | op->data.send_message.send_message = send_buf_.c_buffer(); |
327 | // Flags are per-message: clear them after use. |
328 | write_options_.Clear(); |
329 | } |
330 | void FinishOp(bool* status) { |
331 | if (msg_ == nullptr && !send_buf_.Valid()) return; |
332 | if (hijacked_ && failed_send_) { |
333 | // Hijacking interceptor failed this Op |
334 | *status = false; |
335 | } else if (!*status) { |
336 | // This Op was passed down to core and the Op failed |
337 | failed_send_ = true; |
338 | } |
339 | } |
340 | |
341 | void SetInterceptionHookPoint( |
342 | InterceptorBatchMethodsImpl* interceptor_methods) { |
343 | if (msg_ == nullptr && !send_buf_.Valid()) return; |
344 | interceptor_methods->AddInterceptionHookPoint( |
345 | type: experimental::InterceptionHookPoints::PRE_SEND_MESSAGE); |
346 | interceptor_methods->SetSendMessage(buf: &send_buf_, msg: &msg_, fail_send_message: &failed_send_, |
347 | serializer: serializer_); |
348 | } |
349 | |
350 | void SetFinishInterceptionHookPoint( |
351 | InterceptorBatchMethodsImpl* interceptor_methods) { |
352 | if (msg_ != nullptr || send_buf_.Valid()) { |
353 | interceptor_methods->AddInterceptionHookPoint( |
354 | type: experimental::InterceptionHookPoints::POST_SEND_MESSAGE); |
355 | } |
356 | send_buf_.Clear(); |
357 | msg_ = nullptr; |
358 | // The contents of the SendMessage value that was previously set |
359 | // has had its references stolen by core's operations |
360 | interceptor_methods->SetSendMessage(buf: nullptr, msg: nullptr, fail_send_message: &failed_send_, |
361 | serializer: nullptr); |
362 | } |
363 | |
364 | void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) { |
365 | hijacked_ = true; |
366 | } |
367 | |
368 | private: |
369 | const void* msg_ = nullptr; // The original non-serialized message |
370 | bool hijacked_ = false; |
371 | bool failed_send_ = false; |
372 | ByteBuffer send_buf_; |
373 | WriteOptions write_options_; |
374 | std::function<Status(const void*)> serializer_; |
375 | }; |
376 | |
377 | template <class M> |
378 | Status CallOpSendMessage::SendMessage(const M& message, WriteOptions options) { |
379 | write_options_ = options; |
380 | serializer_ = [this](const void* message) { |
381 | bool own_buf; |
382 | send_buf_.Clear(); |
383 | // TODO(vjpai): Remove the void below when possible |
384 | // The void in the template parameter below should not be needed |
385 | // (since it should be implicit) but is needed due to an observed |
386 | // difference in behavior between clang and gcc for certain internal users |
387 | Status result = SerializationTraits<M, void>::Serialize( |
388 | *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf); |
389 | if (!own_buf) { |
390 | send_buf_.Duplicate(); |
391 | } |
392 | return result; |
393 | }; |
394 | // Serialize immediately only if we do not have access to the message pointer |
395 | if (msg_ == nullptr) { |
396 | Status result = serializer_(&message); |
397 | serializer_ = nullptr; |
398 | return result; |
399 | } |
400 | return Status(); |
401 | } |
402 | |
403 | template <class M> |
404 | Status CallOpSendMessage::SendMessage(const M& message) { |
405 | return SendMessage(message, WriteOptions()); |
406 | } |
407 | |
408 | template <class M> |
409 | Status CallOpSendMessage::SendMessagePtr(const M* message, |
410 | WriteOptions options) { |
411 | msg_ = message; |
412 | return SendMessage(*message, options); |
413 | } |
414 | |
415 | template <class M> |
416 | Status CallOpSendMessage::SendMessagePtr(const M* message) { |
417 | msg_ = message; |
418 | return SendMessage(*message, WriteOptions()); |
419 | } |
420 | |
421 | template <class R> |
422 | class CallOpRecvMessage { |
423 | public: |
424 | void RecvMessage(R* message) { message_ = message; } |
425 | |
426 | // Do not change status if no message is received. |
427 | void AllowNoMessage() { allow_not_getting_message_ = true; } |
428 | |
429 | bool got_message = false; |
430 | |
431 | protected: |
432 | void AddOp(grpc_op* ops, size_t* nops) { |
433 | if (message_ == nullptr || hijacked_) return; |
434 | grpc_op* op = &ops[(*nops)++]; |
435 | op->op = GRPC_OP_RECV_MESSAGE; |
436 | op->flags = 0; |
437 | op->reserved = NULL; |
438 | op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr(); |
439 | } |
440 | |
441 | void FinishOp(bool* status) { |
442 | if (message_ == nullptr) return; |
443 | if (recv_buf_.Valid()) { |
444 | if (*status) { |
445 | got_message = *status = |
446 | SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_) |
447 | .ok(); |
448 | recv_buf_.Release(); |
449 | } else { |
450 | got_message = false; |
451 | recv_buf_.Clear(); |
452 | } |
453 | } else if (hijacked_) { |
454 | if (hijacked_recv_message_failed_) { |
455 | FinishOpRecvMessageFailureHandler(status); |
456 | } else { |
457 | // The op was hijacked and it was successful. There is no further action |
458 | // to be performed since the message is already in its non-serialized |
459 | // form. |
460 | } |
461 | } else { |
462 | FinishOpRecvMessageFailureHandler(status); |
463 | } |
464 | } |
465 | |
466 | void SetInterceptionHookPoint( |
467 | InterceptorBatchMethodsImpl* interceptor_methods) { |
468 | if (message_ == nullptr) return; |
469 | interceptor_methods->SetRecvMessage(message: message_, |
470 | hijacked_recv_message_failed: &hijacked_recv_message_failed_); |
471 | } |
472 | |
473 | void SetFinishInterceptionHookPoint( |
474 | InterceptorBatchMethodsImpl* interceptor_methods) { |
475 | if (message_ == nullptr) return; |
476 | interceptor_methods->AddInterceptionHookPoint( |
477 | type: experimental::InterceptionHookPoints::POST_RECV_MESSAGE); |
478 | if (!got_message) interceptor_methods->SetRecvMessage(message: nullptr, hijacked_recv_message_failed: nullptr); |
479 | } |
480 | void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { |
481 | hijacked_ = true; |
482 | if (message_ == nullptr) return; |
483 | interceptor_methods->AddInterceptionHookPoint( |
484 | type: experimental::InterceptionHookPoints::PRE_RECV_MESSAGE); |
485 | got_message = true; |
486 | } |
487 | |
488 | private: |
489 | // Sets got_message and \a status for a failed recv message op |
490 | void FinishOpRecvMessageFailureHandler(bool* status) { |
491 | got_message = false; |
492 | if (!allow_not_getting_message_) { |
493 | *status = false; |
494 | } |
495 | } |
496 | |
497 | R* message_ = nullptr; |
498 | ByteBuffer recv_buf_; |
499 | bool allow_not_getting_message_ = false; |
500 | bool hijacked_ = false; |
501 | bool hijacked_recv_message_failed_ = false; |
502 | }; |
503 | |
504 | class DeserializeFunc { |
505 | public: |
506 | virtual Status Deserialize(ByteBuffer* buf) = 0; |
507 | virtual ~DeserializeFunc() {} |
508 | }; |
509 | |
510 | template <class R> |
511 | class DeserializeFuncType final : public DeserializeFunc { |
512 | public: |
513 | DeserializeFuncType(R* message) : message_(message) {} |
514 | Status Deserialize(ByteBuffer* buf) override { |
515 | return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_); |
516 | } |
517 | |
518 | ~DeserializeFuncType() override {} |
519 | |
520 | private: |
521 | R* message_; // Not a managed pointer because management is external to this |
522 | }; |
523 | |
524 | class CallOpGenericRecvMessage { |
525 | public: |
526 | template <class R> |
527 | void RecvMessage(R* message) { |
528 | // Use an explicit base class pointer to avoid resolution error in the |
529 | // following unique_ptr::reset for some old implementations. |
530 | DeserializeFunc* func = new DeserializeFuncType<R>(message); |
531 | deserialize_.reset(p: func); |
532 | message_ = message; |
533 | } |
534 | |
535 | // Do not change status if no message is received. |
536 | void AllowNoMessage() { allow_not_getting_message_ = true; } |
537 | |
538 | bool got_message = false; |
539 | |
540 | protected: |
541 | void AddOp(grpc_op* ops, size_t* nops) { |
542 | if (!deserialize_ || hijacked_) return; |
543 | grpc_op* op = &ops[(*nops)++]; |
544 | op->op = GRPC_OP_RECV_MESSAGE; |
545 | op->flags = 0; |
546 | op->reserved = NULL; |
547 | op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr(); |
548 | } |
549 | |
550 | void FinishOp(bool* status) { |
551 | if (!deserialize_ || hijacked_) return; |
552 | if (recv_buf_.Valid()) { |
553 | if (*status) { |
554 | got_message = true; |
555 | *status = deserialize_->Deserialize(buf: &recv_buf_).ok(); |
556 | recv_buf_.Release(); |
557 | } else { |
558 | got_message = false; |
559 | recv_buf_.Clear(); |
560 | } |
561 | } else if (hijacked_) { |
562 | if (hijacked_recv_message_failed_) { |
563 | FinishOpRecvMessageFailureHandler(status); |
564 | } else { |
565 | // The op was hijacked and it was successful. There is no further action |
566 | // to be performed since the message is already in its non-serialized |
567 | // form. |
568 | } |
569 | } else { |
570 | got_message = false; |
571 | if (!allow_not_getting_message_) { |
572 | *status = false; |
573 | } |
574 | } |
575 | } |
576 | |
577 | void SetInterceptionHookPoint( |
578 | InterceptorBatchMethodsImpl* interceptor_methods) { |
579 | if (!deserialize_) return; |
580 | interceptor_methods->SetRecvMessage(message: message_, |
581 | hijacked_recv_message_failed: &hijacked_recv_message_failed_); |
582 | } |
583 | |
584 | void SetFinishInterceptionHookPoint( |
585 | InterceptorBatchMethodsImpl* interceptor_methods) { |
586 | if (!deserialize_) return; |
587 | interceptor_methods->AddInterceptionHookPoint( |
588 | type: experimental::InterceptionHookPoints::POST_RECV_MESSAGE); |
589 | if (!got_message) interceptor_methods->SetRecvMessage(message: nullptr, hijacked_recv_message_failed: nullptr); |
590 | deserialize_.reset(); |
591 | } |
592 | void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { |
593 | hijacked_ = true; |
594 | if (!deserialize_) return; |
595 | interceptor_methods->AddInterceptionHookPoint( |
596 | type: experimental::InterceptionHookPoints::PRE_RECV_MESSAGE); |
597 | got_message = true; |
598 | } |
599 | |
600 | private: |
601 | // Sets got_message and \a status for a failed recv message op |
602 | void FinishOpRecvMessageFailureHandler(bool* status) { |
603 | got_message = false; |
604 | if (!allow_not_getting_message_) { |
605 | *status = false; |
606 | } |
607 | } |
608 | |
609 | void* message_ = nullptr; |
610 | std::unique_ptr<DeserializeFunc> deserialize_; |
611 | ByteBuffer recv_buf_; |
612 | bool allow_not_getting_message_ = false; |
613 | bool hijacked_ = false; |
614 | bool hijacked_recv_message_failed_ = false; |
615 | }; |
616 | |
617 | class CallOpClientSendClose { |
618 | public: |
619 | CallOpClientSendClose() : send_(false) {} |
620 | |
621 | void ClientSendClose() { send_ = true; } |
622 | |
623 | protected: |
624 | void AddOp(grpc_op* ops, size_t* nops) { |
625 | if (!send_ || hijacked_) return; |
626 | grpc_op* op = &ops[(*nops)++]; |
627 | op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; |
628 | op->flags = 0; |
629 | op->reserved = NULL; |
630 | } |
631 | void FinishOp(bool* /*status*/) { send_ = false; } |
632 | |
633 | void SetInterceptionHookPoint( |
634 | InterceptorBatchMethodsImpl* interceptor_methods) { |
635 | if (!send_) return; |
636 | interceptor_methods->AddInterceptionHookPoint( |
637 | type: experimental::InterceptionHookPoints::PRE_SEND_CLOSE); |
638 | } |
639 | |
640 | void SetFinishInterceptionHookPoint( |
641 | InterceptorBatchMethodsImpl* /*interceptor_methods*/) {} |
642 | |
643 | void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) { |
644 | hijacked_ = true; |
645 | } |
646 | |
647 | private: |
648 | bool hijacked_ = false; |
649 | bool send_; |
650 | }; |
651 | |
652 | class CallOpServerSendStatus { |
653 | public: |
654 | CallOpServerSendStatus() : send_status_available_(false) {} |
655 | |
656 | void ServerSendStatus( |
657 | std::multimap<grpc::string, grpc::string>* trailing_metadata, |
658 | const Status& status) { |
659 | send_error_details_ = status.error_details(); |
660 | metadata_map_ = trailing_metadata; |
661 | send_status_available_ = true; |
662 | send_status_code_ = static_cast<grpc_status_code>(status.error_code()); |
663 | send_error_message_ = status.error_message(); |
664 | } |
665 | |
666 | protected: |
667 | void AddOp(grpc_op* ops, size_t* nops) { |
668 | if (!send_status_available_ || hijacked_) return; |
669 | trailing_metadata_ = FillMetadataArray( |
670 | metadata: *metadata_map_, metadata_count: &trailing_metadata_count_, optional_error_details: send_error_details_); |
671 | grpc_op* op = &ops[(*nops)++]; |
672 | op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; |
673 | op->data.send_status_from_server.trailing_metadata_count = |
674 | trailing_metadata_count_; |
675 | op->data.send_status_from_server.trailing_metadata = trailing_metadata_; |
676 | op->data.send_status_from_server.status = send_status_code_; |
677 | error_message_slice_ = SliceReferencingString(str: send_error_message_); |
678 | op->data.send_status_from_server.status_details = |
679 | send_error_message_.empty() ? nullptr : &error_message_slice_; |
680 | op->flags = 0; |
681 | op->reserved = NULL; |
682 | } |
683 | |
684 | void FinishOp(bool* /*status*/) { |
685 | if (!send_status_available_ || hijacked_) return; |
686 | g_core_codegen_interface->gpr_free(p: trailing_metadata_); |
687 | send_status_available_ = false; |
688 | } |
689 | |
690 | void SetInterceptionHookPoint( |
691 | InterceptorBatchMethodsImpl* interceptor_methods) { |
692 | if (!send_status_available_) return; |
693 | interceptor_methods->AddInterceptionHookPoint( |
694 | type: experimental::InterceptionHookPoints::PRE_SEND_STATUS); |
695 | interceptor_methods->SetSendTrailingMetadata(metadata_map_); |
696 | interceptor_methods->SetSendStatus(code: &send_status_code_, error_details: &send_error_details_, |
697 | error_message: &send_error_message_); |
698 | } |
699 | |
700 | void SetFinishInterceptionHookPoint( |
701 | InterceptorBatchMethodsImpl* /*interceptor_methods*/) {} |
702 | |
703 | void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) { |
704 | hijacked_ = true; |
705 | } |
706 | |
707 | private: |
708 | bool hijacked_ = false; |
709 | bool send_status_available_; |
710 | grpc_status_code send_status_code_; |
711 | grpc::string send_error_details_; |
712 | grpc::string send_error_message_; |
713 | size_t trailing_metadata_count_; |
714 | std::multimap<grpc::string, grpc::string>* metadata_map_; |
715 | grpc_metadata* trailing_metadata_; |
716 | grpc_slice error_message_slice_; |
717 | }; |
718 | |
719 | class CallOpRecvInitialMetadata { |
720 | public: |
721 | CallOpRecvInitialMetadata() : metadata_map_(nullptr) {} |
722 | |
723 | void RecvInitialMetadata(::grpc_impl::ClientContext* context) { |
724 | context->initial_metadata_received_ = true; |
725 | metadata_map_ = &context->recv_initial_metadata_; |
726 | } |
727 | |
728 | protected: |
729 | void AddOp(grpc_op* ops, size_t* nops) { |
730 | if (metadata_map_ == nullptr || hijacked_) return; |
731 | grpc_op* op = &ops[(*nops)++]; |
732 | op->op = GRPC_OP_RECV_INITIAL_METADATA; |
733 | op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr(); |
734 | op->flags = 0; |
735 | op->reserved = NULL; |
736 | } |
737 | |
738 | void FinishOp(bool* /*status*/) { |
739 | if (metadata_map_ == nullptr || hijacked_) return; |
740 | } |
741 | |
742 | void SetInterceptionHookPoint( |
743 | InterceptorBatchMethodsImpl* interceptor_methods) { |
744 | interceptor_methods->SetRecvInitialMetadata(metadata_map_); |
745 | } |
746 | |
747 | void SetFinishInterceptionHookPoint( |
748 | InterceptorBatchMethodsImpl* interceptor_methods) { |
749 | if (metadata_map_ == nullptr) return; |
750 | interceptor_methods->AddInterceptionHookPoint( |
751 | type: experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA); |
752 | metadata_map_ = nullptr; |
753 | } |
754 | |
755 | void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { |
756 | hijacked_ = true; |
757 | if (metadata_map_ == nullptr) return; |
758 | interceptor_methods->AddInterceptionHookPoint( |
759 | type: experimental::InterceptionHookPoints::PRE_RECV_INITIAL_METADATA); |
760 | } |
761 | |
762 | private: |
763 | bool hijacked_ = false; |
764 | MetadataMap* metadata_map_; |
765 | }; |
766 | |
767 | class CallOpClientRecvStatus { |
768 | public: |
769 | CallOpClientRecvStatus() |
770 | : recv_status_(nullptr), debug_error_string_(nullptr) {} |
771 | |
772 | void ClientRecvStatus(::grpc_impl::ClientContext* context, Status* status) { |
773 | client_context_ = context; |
774 | metadata_map_ = &client_context_->trailing_metadata_; |
775 | recv_status_ = status; |
776 | error_message_ = g_core_codegen_interface->grpc_empty_slice(); |
777 | } |
778 | |
779 | protected: |
780 | void AddOp(grpc_op* ops, size_t* nops) { |
781 | if (recv_status_ == nullptr || hijacked_) return; |
782 | grpc_op* op = &ops[(*nops)++]; |
783 | op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; |
784 | op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr(); |
785 | op->data.recv_status_on_client.status = &status_code_; |
786 | op->data.recv_status_on_client.status_details = &error_message_; |
787 | op->data.recv_status_on_client.error_string = &debug_error_string_; |
788 | op->flags = 0; |
789 | op->reserved = NULL; |
790 | } |
791 | |
792 | void FinishOp(bool* /*status*/) { |
793 | if (recv_status_ == nullptr || hijacked_) return; |
794 | if (static_cast<StatusCode>(status_code_) == StatusCode::OK) { |
795 | *recv_status_ = Status(); |
796 | GPR_CODEGEN_DEBUG_ASSERT(debug_error_string_ == nullptr); |
797 | } else { |
798 | *recv_status_ = |
799 | Status(static_cast<StatusCode>(status_code_), |
800 | GRPC_SLICE_IS_EMPTY(error_message_) |
801 | ? grpc::string() |
802 | : grpc::string(GRPC_SLICE_START_PTR(error_message_), |
803 | GRPC_SLICE_END_PTR(error_message_)), |
804 | metadata_map_->GetBinaryErrorDetails()); |
805 | if (debug_error_string_ != nullptr) { |
806 | client_context_->set_debug_error_string(debug_error_string_); |
807 | g_core_codegen_interface->gpr_free(p: (void*)debug_error_string_); |
808 | } |
809 | } |
810 | // TODO(soheil): Find callers that set debug string even for status OK, |
811 | // and fix them. |
812 | g_core_codegen_interface->grpc_slice_unref(slice: error_message_); |
813 | } |
814 | |
815 | void SetInterceptionHookPoint( |
816 | InterceptorBatchMethodsImpl* interceptor_methods) { |
817 | interceptor_methods->SetRecvStatus(recv_status_); |
818 | interceptor_methods->SetRecvTrailingMetadata(metadata_map_); |
819 | } |
820 | |
821 | void SetFinishInterceptionHookPoint( |
822 | InterceptorBatchMethodsImpl* interceptor_methods) { |
823 | if (recv_status_ == nullptr) return; |
824 | interceptor_methods->AddInterceptionHookPoint( |
825 | type: experimental::InterceptionHookPoints::POST_RECV_STATUS); |
826 | recv_status_ = nullptr; |
827 | } |
828 | |
829 | void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) { |
830 | hijacked_ = true; |
831 | if (recv_status_ == nullptr) return; |
832 | interceptor_methods->AddInterceptionHookPoint( |
833 | type: experimental::InterceptionHookPoints::PRE_RECV_STATUS); |
834 | } |
835 | |
836 | private: |
837 | bool hijacked_ = false; |
838 | ::grpc_impl::ClientContext* client_context_; |
839 | MetadataMap* metadata_map_; |
840 | Status* recv_status_; |
841 | const char* debug_error_string_; |
842 | grpc_status_code status_code_; |
843 | grpc_slice error_message_; |
844 | }; |
845 | |
846 | template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>, |
847 | class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>, |
848 | class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>> |
849 | class CallOpSet; |
850 | |
851 | /// Primary implementation of CallOpSetInterface. |
852 | /// Since we cannot use variadic templates, we declare slots up to |
853 | /// the maximum count of ops we'll need in a set. We leverage the |
854 | /// empty base class optimization to slim this class (especially |
855 | /// when there are many unused slots used). To avoid duplicate base classes, |
856 | /// the template parameter for CallNoOp is varied by argument position. |
857 | template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6> |
858 | class CallOpSet : public CallOpSetInterface, |
859 | public Op1, |
860 | public Op2, |
861 | public Op3, |
862 | public Op4, |
863 | public Op5, |
864 | public Op6 { |
865 | public: |
866 | CallOpSet() : core_cq_tag_(this), return_tag_(this) {} |
867 | // The copy constructor and assignment operator reset the value of |
868 | // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_ |
869 | // since those are only meaningful on a specific object, not across objects. |
870 | CallOpSet(const CallOpSet& other) |
871 | : core_cq_tag_(this), |
872 | return_tag_(this), |
873 | call_(other.call_), |
874 | done_intercepting_(false), |
875 | interceptor_methods_(InterceptorBatchMethodsImpl()) {} |
876 | |
877 | CallOpSet& operator=(const CallOpSet& other) { |
878 | core_cq_tag_ = this; |
879 | return_tag_ = this; |
880 | call_ = other.call_; |
881 | done_intercepting_ = false; |
882 | interceptor_methods_ = InterceptorBatchMethodsImpl(); |
883 | return *this; |
884 | } |
885 | |
886 | void FillOps(Call* call) override { |
887 | done_intercepting_ = false; |
888 | g_core_codegen_interface->grpc_call_ref(call: call->call()); |
889 | call_ = |
890 | *call; // It's fine to create a copy of call since it's just pointers |
891 | |
892 | if (RunInterceptors()) { |
893 | ContinueFillOpsAfterInterception(); |
894 | } else { |
895 | // After the interceptors are run, ContinueFillOpsAfterInterception will |
896 | // be run |
897 | } |
898 | } |
899 | |
900 | bool FinalizeResult(void** tag, bool* status) override { |
901 | if (done_intercepting_) { |
902 | // Complete the avalanching since we are done with this batch of ops |
903 | call_.cq()->CompleteAvalanching(); |
904 | // We have already finished intercepting and filling in the results. This |
905 | // round trip from the core needed to be made because interceptors were |
906 | // run |
907 | *tag = return_tag_; |
908 | *status = saved_status_; |
909 | g_core_codegen_interface->grpc_call_unref(call: call_.call()); |
910 | return true; |
911 | } |
912 | |
913 | this->Op1::FinishOp(status); |
914 | this->Op2::FinishOp(status); |
915 | this->Op3::FinishOp(status); |
916 | this->Op4::FinishOp(status); |
917 | this->Op5::FinishOp(status); |
918 | this->Op6::FinishOp(status); |
919 | saved_status_ = *status; |
920 | if (RunInterceptorsPostRecv()) { |
921 | *tag = return_tag_; |
922 | g_core_codegen_interface->grpc_call_unref(call: call_.call()); |
923 | return true; |
924 | } |
925 | // Interceptors are going to be run, so we can't return the tag just yet. |
926 | // After the interceptors are run, ContinueFinalizeResultAfterInterception |
927 | return false; |
928 | } |
929 | |
930 | void set_output_tag(void* return_tag) { return_tag_ = return_tag; } |
931 | |
932 | void* core_cq_tag() override { return core_cq_tag_; } |
933 | |
934 | /// set_core_cq_tag is used to provide a different core CQ tag than "this". |
935 | /// This is used for callback-based tags, where the core tag is the core |
936 | /// callback function. It does not change the use or behavior of any other |
937 | /// function (such as FinalizeResult) |
938 | void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; } |
939 | |
940 | // This will be called while interceptors are run if the RPC is a hijacked |
941 | // RPC. This should set hijacking state for each of the ops. |
942 | void SetHijackingState() override { |
943 | this->Op1::SetHijackingState(&interceptor_methods_); |
944 | this->Op2::SetHijackingState(&interceptor_methods_); |
945 | this->Op3::SetHijackingState(&interceptor_methods_); |
946 | this->Op4::SetHijackingState(&interceptor_methods_); |
947 | this->Op5::SetHijackingState(&interceptor_methods_); |
948 | this->Op6::SetHijackingState(&interceptor_methods_); |
949 | } |
950 | |
951 | // Should be called after interceptors are done running |
952 | void ContinueFillOpsAfterInterception() override { |
953 | static const size_t MAX_OPS = 6; |
954 | grpc_op ops[MAX_OPS]; |
955 | size_t nops = 0; |
956 | this->Op1::AddOp(ops, &nops); |
957 | this->Op2::AddOp(ops, &nops); |
958 | this->Op3::AddOp(ops, &nops); |
959 | this->Op4::AddOp(ops, &nops); |
960 | this->Op5::AddOp(ops, &nops); |
961 | this->Op6::AddOp(ops, &nops); |
962 | |
963 | grpc_call_error err = g_core_codegen_interface->grpc_call_start_batch( |
964 | call: call_.call(), ops, nops, tag: core_cq_tag(), reserved: nullptr); |
965 | |
966 | if (err != GRPC_CALL_OK) { |
967 | // A failure here indicates an API misuse; for example, doing a Write |
968 | // while another Write is already pending on the same RPC or invoking |
969 | // WritesDone multiple times |
970 | // gpr_log(GPR_ERROR, "API misuse of type %s observed", |
971 | // g_core_codegen_interface->grpc_call_error_to_string(err)); |
972 | GPR_CODEGEN_ASSERT(false); |
973 | } |
974 | } |
975 | |
976 | // Should be called after interceptors are done running on the finalize result |
977 | // path |
978 | void ContinueFinalizeResultAfterInterception() override { |
979 | done_intercepting_ = true; |
980 | // The following call_start_batch is internally-generated so no need for an |
981 | // explanatory log on failure. |
982 | GPR_CODEGEN_ASSERT(g_core_codegen_interface->grpc_call_start_batch( |
983 | call_.call(), nullptr, 0, core_cq_tag(), nullptr) == |
984 | GRPC_CALL_OK); |
985 | } |
986 | |
987 | private: |
988 | // Returns true if no interceptors need to be run |
989 | bool RunInterceptors() { |
990 | interceptor_methods_.ClearState(); |
991 | interceptor_methods_.SetCallOpSetInterface(this); |
992 | interceptor_methods_.SetCall(&call_); |
993 | this->Op1::SetInterceptionHookPoint(&interceptor_methods_); |
994 | this->Op2::SetInterceptionHookPoint(&interceptor_methods_); |
995 | this->Op3::SetInterceptionHookPoint(&interceptor_methods_); |
996 | this->Op4::SetInterceptionHookPoint(&interceptor_methods_); |
997 | this->Op5::SetInterceptionHookPoint(&interceptor_methods_); |
998 | this->Op6::SetInterceptionHookPoint(&interceptor_methods_); |
999 | if (interceptor_methods_.InterceptorsListEmpty()) { |
1000 | return true; |
1001 | } |
1002 | // This call will go through interceptors and would need to |
1003 | // schedule new batches, so delay completion queue shutdown |
1004 | call_.cq()->RegisterAvalanching(); |
1005 | return interceptor_methods_.RunInterceptors(); |
1006 | } |
1007 | // Returns true if no interceptors need to be run |
1008 | bool RunInterceptorsPostRecv() { |
1009 | // Call and OpSet had already been set on the set state. |
1010 | // SetReverse also clears previously set hook points |
1011 | interceptor_methods_.SetReverse(); |
1012 | this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_); |
1013 | this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_); |
1014 | this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_); |
1015 | this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_); |
1016 | this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_); |
1017 | this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_); |
1018 | return interceptor_methods_.RunInterceptors(); |
1019 | } |
1020 | |
1021 | void* core_cq_tag_; |
1022 | void* return_tag_; |
1023 | Call call_; |
1024 | bool done_intercepting_ = false; |
1025 | InterceptorBatchMethodsImpl interceptor_methods_; |
1026 | bool saved_status_; |
1027 | }; |
1028 | |
1029 | } // namespace internal |
1030 | } // namespace grpc |
1031 | |
1032 | #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H |
1033 | |