1 | /* |
2 | * |
3 | * Copyright 2019 gRPC authors. |
4 | * |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | * you may not use this file except in compliance with the License. |
7 | * You may obtain a copy of the License at |
8 | * |
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | * |
11 | * Unless required by applicable law or agreed to in writing, software |
12 | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | * See the License for the specific language governing permissions and |
15 | * limitations under the License. |
16 | */ |
17 | |
18 | #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H |
19 | #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H |
20 | |
21 | #include <atomic> |
22 | #include <functional> |
23 | #include <type_traits> |
24 | |
25 | #include <grpcpp/impl/codegen/call.h> |
26 | #include <grpcpp/impl/codegen/call_op_set.h> |
27 | #include <grpcpp/impl/codegen/callback_common.h> |
28 | #include <grpcpp/impl/codegen/config.h> |
29 | #include <grpcpp/impl/codegen/core_codegen_interface.h> |
30 | #include <grpcpp/impl/codegen/message_allocator.h> |
31 | #include <grpcpp/impl/codegen/status.h> |
32 | |
33 | namespace grpc_impl { |
34 | |
35 | // Declare base class of all reactors as internal |
36 | namespace internal { |
37 | |
// Forward declarations of the internal method-handler templates; they are
// declared here so that ServerReactor (below) can friend them.
template <class Request, class Response>
class CallbackUnaryHandler;
template <class Request, class Response>
class CallbackClientStreamingHandler;
template <class Request, class Response>
class CallbackServerStreamingHandler;
template <class Request, class Response>
class CallbackBidiHandler;
47 | |
/// Internal base class of all server-side reactors. User code interacts with
/// the public subclasses (ServerUnaryReactor, ServerReadReactor, etc.); this
/// base only carries the lifecycle notifications shared by all of them.
class ServerReactor {
 public:
  virtual ~ServerReactor() = default;

  /// Called when all outstanding operations of the RPC have completed; the
  /// reactor may be destroyed once this is invoked.
  virtual void OnDone() = 0;

  /// Called if the RPC is cancelled before it completes normally.
  virtual void OnCancel() = 0;

  // The following is not API. It is for internal use only and specifies whether
  // all reactions of this Reactor can be run without an extra executor
  // scheduling. This should only be used for internally-defined reactors with
  // trivial reactions.
  virtual bool InternalInlineable() { return false; }

 private:
  // The handler templates need access to InternalInlineable and other
  // internals; no other code should rely on them.
  template <class Request, class Response>
  friend class CallbackUnaryHandler;
  template <class Request, class Response>
  friend class CallbackClientStreamingHandler;
  template <class Request, class Response>
  friend class CallbackServerStreamingHandler;
  template <class Request, class Response>
  friend class CallbackBidiHandler;
};
70 | |
71 | /// The base class of ServerCallbackUnary etc. |
72 | class ServerCallbackCall { |
73 | public: |
74 | virtual ~ServerCallbackCall() {} |
75 | |
76 | // This object is responsible for tracking when it is safe to call OnDone and |
77 | // OnCancel. OnDone should not be called until the method handler is complete, |
78 | // Finish has been called, the ServerContext CompletionOp (which tracks |
79 | // cancellation or successful completion) has completed, and all outstanding |
80 | // Read/Write actions have seen their reactions. OnCancel should not be called |
81 | // until after the method handler is done and the RPC has completed with a |
82 | // cancellation. This is tracked by counting how many of these conditions have |
83 | // been met and calling OnCancel when none remain unmet. |
84 | |
85 | // Public versions of MaybeDone: one where we don't know the reactor in |
86 | // advance (used for the ServerContext CompletionOp), and one for where we |
87 | // know the inlineability of the OnDone reaction. You should set the inline |
88 | // flag to true if either the Reactor is InternalInlineable() or if this |
89 | // callback is already being forced to run dispatched to an executor |
90 | // (typically because it contains additional work than just the MaybeDone). |
91 | |
92 | void MaybeDone() { |
93 | if (GPR_UNLIKELY(Unref() == 1)) { |
94 | ScheduleOnDone(inline_ondone: reactor()->InternalInlineable()); |
95 | } |
96 | } |
97 | |
98 | void MaybeDone(bool inline_ondone) { |
99 | if (GPR_UNLIKELY(Unref() == 1)) { |
100 | ScheduleOnDone(inline_ondone); |
101 | } |
102 | } |
103 | |
104 | // Fast version called with known reactor passed in, used from derived |
105 | // classes, typically in non-cancel case |
106 | void MaybeCallOnCancel(ServerReactor* reactor) { |
107 | if (GPR_UNLIKELY(UnblockCancellation())) { |
108 | CallOnCancel(reactor); |
109 | } |
110 | } |
111 | |
112 | // Slower version called from object that doesn't know the reactor a priori |
113 | // (such as the ServerContext CompletionOp which is formed before the |
114 | // reactor). This is used in cancel cases only, so it's ok to be slower and |
115 | // invoke a virtual function. |
116 | void MaybeCallOnCancel() { |
117 | if (GPR_UNLIKELY(UnblockCancellation())) { |
118 | CallOnCancel(reactor: reactor()); |
119 | } |
120 | } |
121 | |
122 | protected: |
123 | /// Increases the reference count |
124 | void Ref() { callbacks_outstanding_.fetch_add(i: 1, m: std::memory_order_relaxed); } |
125 | |
126 | private: |
127 | virtual ServerReactor* reactor() = 0; |
128 | |
129 | // CallOnDone performs the work required at completion of the RPC: invoking |
130 | // the OnDone function and doing all necessary cleanup. This function is only |
131 | // ever invoked on a fully-Unref'fed ServerCallbackCall. |
132 | virtual void CallOnDone() = 0; |
133 | |
134 | // If the OnDone reaction is inlineable, execute it inline. Otherwise send it |
135 | // to an executor. |
136 | void ScheduleOnDone(bool inline_ondone); |
137 | |
138 | // If the OnCancel reaction is inlineable, execute it inline. Otherwise send |
139 | // it to an executor. |
140 | void CallOnCancel(ServerReactor* reactor); |
141 | |
142 | // Implement the cancellation constraint counter. Return true if OnCancel |
143 | // should be called, false otherwise. |
144 | bool UnblockCancellation() { |
145 | return on_cancel_conditions_remaining_.fetch_sub( |
146 | i: 1, m: std::memory_order_acq_rel) == 1; |
147 | } |
148 | |
149 | /// Decreases the reference count and returns the previous value |
150 | int Unref() { |
151 | return callbacks_outstanding_.fetch_sub(i: 1, m: std::memory_order_acq_rel); |
152 | } |
153 | |
154 | std::atomic_int on_cancel_conditions_remaining_{2}; |
155 | std::atomic_int callbacks_outstanding_{ |
156 | 3}; // reserve for start, Finish, and CompletionOp |
157 | }; |
158 | |
159 | template <class Request, class Response> |
160 | class DefaultMessageHolder |
161 | : public ::grpc::experimental::MessageHolder<Request, Response> { |
162 | public: |
163 | DefaultMessageHolder() { |
164 | this->set_request(&request_obj_); |
165 | this->set_response(&response_obj_); |
166 | } |
167 | void Release() override { |
168 | // the object is allocated in the call arena. |
169 | this->~DefaultMessageHolder<Request, Response>(); |
170 | } |
171 | |
172 | private: |
173 | Request request_obj_; |
174 | Response response_obj_; |
175 | }; |
176 | |
177 | } // namespace internal |
178 | |
// Forward declarations of the user-facing reactor classes defined later in
// this header; needed by the ServerCallback* stream classes below.
class ServerUnaryReactor;
template <class Request>
class ServerReadReactor;
template <class Response>
class ServerWriteReactor;
template <class Request, class Response>
class ServerBidiReactor;
187 | |
188 | // NOTE: The actual call/stream object classes are provided as API only to |
189 | // support mocking. There are no implementations of these class interfaces in |
190 | // the API. |
// NOTE: The actual call/stream object classes are provided as API only to
// support mocking. There are no implementations of these class interfaces in
// the API.
class ServerCallbackUnary : public internal::ServerCallbackCall {
 public:
  virtual ~ServerCallbackUnary() {}
  /// Complete the RPC with status \a s (sends trailing metadata).
  virtual void Finish(::grpc::Status s) = 0;
  /// Explicitly send the initial metadata stored in the ServerContext.
  virtual void SendInitialMetadata() = 0;

 protected:
  // Use a template rather than explicitly specifying ServerUnaryReactor to
  // delay binding and avoid a circular forward declaration issue
  template <class Reactor>
  void BindReactor(Reactor* reactor) {
    reactor->InternalBindCall(this);
  }
};
205 | |
/// Call object for a client-streaming RPC; API-only, provided for mocking.
template <class Request>
class ServerCallbackReader : public internal::ServerCallbackCall {
 public:
  virtual ~ServerCallbackReader() {}
  /// Complete the RPC with status \a s (sends trailing metadata).
  virtual void Finish(::grpc::Status s) = 0;
  /// Explicitly send the initial metadata stored in the ServerContext.
  virtual void SendInitialMetadata() = 0;
  /// Initiate a read into \a msg; completion is reported to the reactor.
  virtual void Read(Request* msg) = 0;

 protected:
  // Hand this call object to the reactor so backlogged operations can drain.
  void BindReactor(ServerReadReactor<Request>* reactor) {
    reactor->InternalBindReader(this);
  }
};
219 | |
/// Call object for a server-streaming RPC; API-only, provided for mocking.
template <class Response>
class ServerCallbackWriter : public internal::ServerCallbackCall {
 public:
  virtual ~ServerCallbackWriter() {}

  /// Complete the RPC with status \a s (sends trailing metadata).
  virtual void Finish(::grpc::Status s) = 0;
  /// Explicitly send the initial metadata stored in the ServerContext.
  virtual void SendInitialMetadata() = 0;
  /// Initiate a write of \a msg with the given \a options.
  virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
  /// Coalesce a final write with the trailing metadata/status.
  virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
                              ::grpc::Status s) = 0;

 protected:
  // Hand this call object to the reactor so backlogged operations can drain.
  void BindReactor(ServerWriteReactor<Response>* reactor) {
    reactor->InternalBindWriter(this);
  }
};
236 | |
/// Call object for a bidirectional-streaming RPC; API-only, for mocking.
template <class Request, class Response>
class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
 public:
  virtual ~ServerCallbackReaderWriter() {}

  /// Complete the RPC with status \a s (sends trailing metadata).
  virtual void Finish(::grpc::Status s) = 0;
  /// Explicitly send the initial metadata stored in the ServerContext.
  virtual void SendInitialMetadata() = 0;
  /// Initiate a read into \a msg; completion is reported to the reactor.
  virtual void Read(Request* msg) = 0;
  /// Initiate a write of \a msg with the given \a options.
  virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
  /// Coalesce a final write with the trailing metadata/status.
  virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
                              ::grpc::Status s) = 0;

 protected:
  // Hand this call object to the reactor so backlogged operations can drain.
  void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
    reactor->InternalBindStream(this);
  }
};
254 | |
255 | // The following classes are the reactor interfaces that are to be implemented |
256 | // by the user, returned as the output parameter of the method handler for a |
257 | // callback method. Note that none of the classes are pure; all reactions have a |
258 | // default empty reaction so that the user class only needs to override those |
259 | // classes that it cares about. |
260 | |
261 | /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC. |
262 | template <class Request, class Response> |
263 | class ServerBidiReactor : public internal::ServerReactor { |
264 | public: |
265 | // NOTE: Initializing stream_ as a constructor initializer rather than a |
266 | // default initializer because gcc-4.x requires a copy constructor for |
267 | // default initializing a templated member, which isn't ok for atomic. |
268 | // TODO(vjpai): Switch to default constructor and default initializer when |
269 | // gcc-4.x is no longer supported |
270 | ServerBidiReactor() : stream_(nullptr) {} |
271 | ~ServerBidiReactor() = default; |
272 | |
273 | /// Send any initial metadata stored in the RPC context. If not invoked, |
274 | /// any initial metadata will be passed along with the first Write or the |
275 | /// Finish (if there are no writes). |
276 | void StartSendInitialMetadata() { |
277 | ServerCallbackReaderWriter<Request, Response>* stream = |
278 | stream_.load(std::memory_order_acquire); |
279 | if (stream == nullptr) { |
280 | grpc::internal::MutexLock l(&stream_mu_); |
281 | stream = stream_.load(std::memory_order_relaxed); |
282 | if (stream == nullptr) { |
283 | backlog_.send_initial_metadata_wanted = true; |
284 | return; |
285 | } |
286 | } |
287 | stream->SendInitialMetadata(); |
288 | } |
289 | |
290 | /// Initiate a read operation. |
291 | /// |
292 | /// \param[out] req Where to eventually store the read message. Valid when |
293 | /// the library calls OnReadDone |
294 | void StartRead(Request* req) { |
295 | ServerCallbackReaderWriter<Request, Response>* stream = |
296 | stream_.load(std::memory_order_acquire); |
297 | if (stream == nullptr) { |
298 | grpc::internal::MutexLock l(&stream_mu_); |
299 | stream = stream_.load(std::memory_order_relaxed); |
300 | if (stream == nullptr) { |
301 | backlog_.read_wanted = req; |
302 | return; |
303 | } |
304 | } |
305 | stream->Read(req); |
306 | } |
307 | |
308 | /// Initiate a write operation. |
309 | /// |
310 | /// \param[in] resp The message to be written. The library does not take |
311 | /// ownership but the caller must ensure that the message is |
312 | /// not deleted or modified until OnWriteDone is called. |
313 | void StartWrite(const Response* resp) { |
314 | StartWrite(resp, ::grpc::WriteOptions()); |
315 | } |
316 | |
317 | /// Initiate a write operation with specified options. |
318 | /// |
319 | /// \param[in] resp The message to be written. The library does not take |
320 | /// ownership but the caller must ensure that the message is |
321 | /// not deleted or modified until OnWriteDone is called. |
322 | /// \param[in] options The WriteOptions to use for writing this message |
323 | void StartWrite(const Response* resp, ::grpc::WriteOptions options) { |
324 | ServerCallbackReaderWriter<Request, Response>* stream = |
325 | stream_.load(std::memory_order_acquire); |
326 | if (stream == nullptr) { |
327 | grpc::internal::MutexLock l(&stream_mu_); |
328 | stream = stream_.load(std::memory_order_relaxed); |
329 | if (stream == nullptr) { |
330 | backlog_.write_wanted = resp; |
331 | backlog_.write_options_wanted = std::move(options); |
332 | return; |
333 | } |
334 | } |
335 | stream->Write(resp, std::move(options)); |
336 | } |
337 | |
338 | /// Initiate a write operation with specified options and final RPC Status, |
339 | /// which also causes any trailing metadata for this RPC to be sent out. |
340 | /// StartWriteAndFinish is like merging StartWriteLast and Finish into a |
341 | /// single step. A key difference, though, is that this operation doesn't have |
342 | /// an OnWriteDone reaction - it is considered complete only when OnDone is |
343 | /// available. An RPC can either have StartWriteAndFinish or Finish, but not |
344 | /// both. |
345 | /// |
346 | /// \param[in] resp The message to be written. The library does not take |
347 | /// ownership but the caller must ensure that the message is |
348 | /// not deleted or modified until OnDone is called. |
349 | /// \param[in] options The WriteOptions to use for writing this message |
350 | /// \param[in] s The status outcome of this RPC |
351 | void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options, |
352 | ::grpc::Status s) { |
353 | ServerCallbackReaderWriter<Request, Response>* stream = |
354 | stream_.load(std::memory_order_acquire); |
355 | if (stream == nullptr) { |
356 | grpc::internal::MutexLock l(&stream_mu_); |
357 | stream = stream_.load(std::memory_order_relaxed); |
358 | if (stream == nullptr) { |
359 | backlog_.write_and_finish_wanted = true; |
360 | backlog_.write_wanted = resp; |
361 | backlog_.write_options_wanted = std::move(options); |
362 | backlog_.status_wanted = std::move(s); |
363 | return; |
364 | } |
365 | } |
366 | stream->WriteAndFinish(resp, std::move(options), std::move(s)); |
367 | } |
368 | |
369 | /// Inform system of a planned write operation with specified options, but |
370 | /// allow the library to schedule the actual write coalesced with the writing |
371 | /// of trailing metadata (which takes place on a Finish call). |
372 | /// |
373 | /// \param[in] resp The message to be written. The library does not take |
374 | /// ownership but the caller must ensure that the message is |
375 | /// not deleted or modified until OnWriteDone is called. |
376 | /// \param[in] options The WriteOptions to use for writing this message |
377 | void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) { |
378 | StartWrite(resp, std::move(options.set_last_message())); |
379 | } |
380 | |
381 | /// Indicate that the stream is to be finished and the trailing metadata and |
382 | /// RPC status are to be sent. Every RPC MUST be finished using either Finish |
383 | /// or StartWriteAndFinish (but not both), even if the RPC is already |
384 | /// cancelled. |
385 | /// |
386 | /// \param[in] s The status outcome of this RPC |
387 | void Finish(::grpc::Status s) { |
388 | ServerCallbackReaderWriter<Request, Response>* stream = |
389 | stream_.load(std::memory_order_acquire); |
390 | if (stream == nullptr) { |
391 | grpc::internal::MutexLock l(&stream_mu_); |
392 | stream = stream_.load(std::memory_order_relaxed); |
393 | if (stream == nullptr) { |
394 | backlog_.finish_wanted = true; |
395 | backlog_.status_wanted = std::move(s); |
396 | return; |
397 | } |
398 | } |
399 | stream->Finish(std::move(s)); |
400 | } |
401 | |
402 | /// Notifies the application that an explicit StartSendInitialMetadata |
403 | /// operation completed. Not used when the sending of initial metadata |
404 | /// piggybacks onto the first write. |
405 | /// |
406 | /// \param[in] ok Was it successful? If false, no further write-side operation |
407 | /// will succeed. |
408 | virtual void OnSendInitialMetadataDone(bool /*ok*/) {} |
409 | |
410 | /// Notifies the application that a StartRead operation completed. |
411 | /// |
412 | /// \param[in] ok Was it successful? If false, no further read-side operation |
413 | /// will succeed. |
414 | virtual void OnReadDone(bool /*ok*/) {} |
415 | |
416 | /// Notifies the application that a StartWrite (or StartWriteLast) operation |
417 | /// completed. |
418 | /// |
419 | /// \param[in] ok Was it successful? If false, no further write-side operation |
420 | /// will succeed. |
421 | virtual void OnWriteDone(bool /*ok*/) {} |
422 | |
423 | /// Notifies the application that all operations associated with this RPC |
424 | /// have completed. This is an override (from the internal base class) but |
425 | /// still abstract, so derived classes MUST override it to be instantiated. |
426 | void OnDone() override = 0; |
427 | |
428 | /// Notifies the application that this RPC has been cancelled. This is an |
429 | /// override (from the internal base class) but not final, so derived classes |
430 | /// should override it if they want to take action. |
431 | void OnCancel() override {} |
432 | |
433 | private: |
434 | friend class ServerCallbackReaderWriter<Request, Response>; |
435 | // May be overridden by internal implementation details. This is not a public |
436 | // customization point. |
437 | virtual void InternalBindStream( |
438 | ServerCallbackReaderWriter<Request, Response>* stream) { |
439 | // TODO(vjpai): When stream_or_backlog_ becomes a variant (see below), use |
440 | // a scoped MutexLock and std::swap stream_or_backlog_ with a variant that |
441 | // has stream, then std::get<PreBindBacklog> out of that after the lock. |
442 | // Do likewise with the remaining InternalBind* functions as well. |
443 | grpc::internal::ReleasableMutexLock l(&stream_mu_); |
444 | PreBindBacklog ops(std::move(backlog_)); |
445 | stream_.store(stream, std::memory_order_release); |
446 | l.Unlock(); |
447 | |
448 | if (ops.send_initial_metadata_wanted) { |
449 | stream->SendInitialMetadata(); |
450 | } |
451 | if (ops.read_wanted != nullptr) { |
452 | stream->Read(ops.read_wanted); |
453 | } |
454 | if (ops.write_and_finish_wanted) { |
455 | stream->WriteAndFinish(ops.write_wanted, |
456 | std::move(ops.write_options_wanted), |
457 | std::move(ops.status_wanted)); |
458 | } else { |
459 | if (ops.write_wanted != nullptr) { |
460 | stream->Write(ops.write_wanted, std::move(ops.write_options_wanted)); |
461 | } |
462 | if (ops.finish_wanted) { |
463 | stream->Finish(std::move(ops.status_wanted)); |
464 | } |
465 | } |
466 | } |
467 | |
468 | grpc::internal::Mutex stream_mu_; |
469 | // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant |
470 | // once C++17 or ABSL is supported since stream and backlog are |
471 | // mutually exclusive in this class. Do likewise with the |
472 | // remaining reactor classes and their backlogs as well. |
473 | std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr}; |
474 | struct PreBindBacklog { |
475 | bool send_initial_metadata_wanted = false; |
476 | bool write_and_finish_wanted = false; |
477 | bool finish_wanted = false; |
478 | Request* read_wanted = nullptr; |
479 | const Response* write_wanted = nullptr; |
480 | ::grpc::WriteOptions write_options_wanted; |
481 | ::grpc::Status status_wanted; |
482 | }; |
483 | PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */; |
484 | }; |
485 | |
486 | /// \a ServerReadReactor is the interface for a client-streaming RPC. |
/// \a ServerReadReactor is the interface for a client-streaming RPC.
/// Uses the same bind/backlog double-checked pattern as ServerBidiReactor:
/// operations requested before the reader is bound are recorded in backlog_
/// and replayed by InternalBindReader.
template <class Request>
class ServerReadReactor : public internal::ServerReactor {
 public:
  ServerReadReactor() : reader_(nullptr) {}
  ~ServerReadReactor() = default;

  /// The following operation initiations are exactly like ServerBidiReactor.
  void StartSendInitialMetadata() {
    ServerCallbackReader<Request>* reader =
        reader_.load(std::memory_order_acquire);
    if (reader == nullptr) {
      grpc::internal::MutexLock l(&reader_mu_);
      reader = reader_.load(std::memory_order_relaxed);
      if (reader == nullptr) {
        backlog_.send_initial_metadata_wanted = true;
        return;
      }
    }
    reader->SendInitialMetadata();
  }
  void StartRead(Request* req) {
    ServerCallbackReader<Request>* reader =
        reader_.load(std::memory_order_acquire);
    if (reader == nullptr) {
      grpc::internal::MutexLock l(&reader_mu_);
      reader = reader_.load(std::memory_order_relaxed);
      if (reader == nullptr) {
        backlog_.read_wanted = req;
        return;
      }
    }
    reader->Read(req);
  }
  void Finish(::grpc::Status s) {
    ServerCallbackReader<Request>* reader =
        reader_.load(std::memory_order_acquire);
    if (reader == nullptr) {
      grpc::internal::MutexLock l(&reader_mu_);
      reader = reader_.load(std::memory_order_relaxed);
      if (reader == nullptr) {
        backlog_.finish_wanted = true;
        backlog_.status_wanted = std::move(s);
        return;
      }
    }
    reader->Finish(std::move(s));
  }

  /// The following notifications are exactly like ServerBidiReactor.
  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  virtual void OnReadDone(bool /*ok*/) {}
  void OnDone() override = 0;
  void OnCancel() override {}

 private:
  friend class ServerCallbackReader<Request>;

  // May be overridden by internal implementation details. This is not a public
  // customization point.
  // Binds the reader and replays the backlog; the store-release pairs with
  // the acquire loads in the Start*/Finish methods above.
  virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
    grpc::internal::ReleasableMutexLock l(&reader_mu_);
    PreBindBacklog ops(std::move(backlog_));
    reader_.store(reader, std::memory_order_release);
    l.Unlock();

    if (ops.send_initial_metadata_wanted) {
      reader->SendInitialMetadata();
    }
    if (ops.read_wanted != nullptr) {
      reader->Read(ops.read_wanted);
    }
    if (ops.finish_wanted) {
      reader->Finish(std::move(ops.status_wanted));
    }
  }

  grpc::internal::Mutex reader_mu_;
  std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
  // Operations requested before the reader was bound; replayed by
  // InternalBindReader.
  struct PreBindBacklog {
    bool send_initial_metadata_wanted = false;
    bool finish_wanted = false;
    Request* read_wanted = nullptr;
    ::grpc::Status status_wanted;
  };
  PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */;
};
573 | |
574 | /// \a ServerWriteReactor is the interface for a server-streaming RPC. |
/// \a ServerWriteReactor is the interface for a server-streaming RPC.
/// Uses the same bind/backlog double-checked pattern as ServerBidiReactor:
/// operations requested before the writer is bound are recorded in backlog_
/// and replayed by InternalBindWriter.
template <class Response>
class ServerWriteReactor : public internal::ServerReactor {
 public:
  ServerWriteReactor() : writer_(nullptr) {}
  ~ServerWriteReactor() = default;

  /// The following operation initiations are exactly like ServerBidiReactor.
  void StartSendInitialMetadata() {
    ServerCallbackWriter<Response>* writer =
        writer_.load(std::memory_order_acquire);
    if (writer == nullptr) {
      grpc::internal::MutexLock l(&writer_mu_);
      writer = writer_.load(std::memory_order_relaxed);
      if (writer == nullptr) {
        backlog_.send_initial_metadata_wanted = true;
        return;
      }
    }
    writer->SendInitialMetadata();
  }
  void StartWrite(const Response* resp) {
    StartWrite(resp, ::grpc::WriteOptions());
  }
  void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
    ServerCallbackWriter<Response>* writer =
        writer_.load(std::memory_order_acquire);
    if (writer == nullptr) {
      grpc::internal::MutexLock l(&writer_mu_);
      writer = writer_.load(std::memory_order_relaxed);
      if (writer == nullptr) {
        backlog_.write_wanted = resp;
        backlog_.write_options_wanted = std::move(options);
        return;
      }
    }
    writer->Write(resp, std::move(options));
  }
  void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
                           ::grpc::Status s) {
    ServerCallbackWriter<Response>* writer =
        writer_.load(std::memory_order_acquire);
    if (writer == nullptr) {
      grpc::internal::MutexLock l(&writer_mu_);
      writer = writer_.load(std::memory_order_relaxed);
      if (writer == nullptr) {
        backlog_.write_and_finish_wanted = true;
        backlog_.write_wanted = resp;
        backlog_.write_options_wanted = std::move(options);
        backlog_.status_wanted = std::move(s);
        return;
      }
    }
    writer->WriteAndFinish(resp, std::move(options), std::move(s));
  }
  void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
    StartWrite(resp, std::move(options.set_last_message()));
  }
  void Finish(::grpc::Status s) {
    ServerCallbackWriter<Response>* writer =
        writer_.load(std::memory_order_acquire);
    if (writer == nullptr) {
      grpc::internal::MutexLock l(&writer_mu_);
      writer = writer_.load(std::memory_order_relaxed);
      if (writer == nullptr) {
        backlog_.finish_wanted = true;
        backlog_.status_wanted = std::move(s);
        return;
      }
    }
    writer->Finish(std::move(s));
  }

  /// The following notifications are exactly like ServerBidiReactor.
  virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
  virtual void OnWriteDone(bool /*ok*/) {}
  void OnDone() override = 0;
  void OnCancel() override {}

 private:
  friend class ServerCallbackWriter<Response>;
  // May be overridden by internal implementation details. This is not a public
  // customization point.
  // Binds the writer and replays the backlog; the store-release pairs with
  // the acquire loads in the Start*/Finish methods above.
  virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
    grpc::internal::ReleasableMutexLock l(&writer_mu_);
    PreBindBacklog ops(std::move(backlog_));
    writer_.store(writer, std::memory_order_release);
    l.Unlock();

    if (ops.send_initial_metadata_wanted) {
      writer->SendInitialMetadata();
    }
    if (ops.write_and_finish_wanted) {
      writer->WriteAndFinish(ops.write_wanted,
                             std::move(ops.write_options_wanted),
                             std::move(ops.status_wanted));
    } else {
      if (ops.write_wanted != nullptr) {
        writer->Write(ops.write_wanted, std::move(ops.write_options_wanted));
      }
      if (ops.finish_wanted) {
        writer->Finish(std::move(ops.status_wanted));
      }
    }
  }

  grpc::internal::Mutex writer_mu_;
  std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
  // Operations requested before the writer was bound; replayed by
  // InternalBindWriter.
  struct PreBindBacklog {
    bool send_initial_metadata_wanted = false;
    bool write_and_finish_wanted = false;
    bool finish_wanted = false;
    const Response* write_wanted = nullptr;
    ::grpc::WriteOptions write_options_wanted;
    ::grpc::Status status_wanted;
  };
  PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */;
};
692 | |
693 | class ServerUnaryReactor : public internal::ServerReactor { |
694 | public: |
695 | ServerUnaryReactor() : call_(nullptr) {} |
696 | ~ServerUnaryReactor() = default; |
697 | |
698 | /// StartSendInitialMetadata is exactly like ServerBidiReactor. |
699 | void StartSendInitialMetadata() { |
700 | ServerCallbackUnary* call = call_.load(m: std::memory_order_acquire); |
701 | if (call == nullptr) { |
702 | grpc::internal::MutexLock l(&call_mu_); |
703 | call = call_.load(m: std::memory_order_relaxed); |
704 | if (call == nullptr) { |
705 | backlog_.send_initial_metadata_wanted = true; |
706 | return; |
707 | } |
708 | } |
709 | call->SendInitialMetadata(); |
710 | } |
711 | /// Finish is similar to ServerBidiReactor except for one detail. |
712 | /// If the status is non-OK, any message will not be sent. Instead, |
713 | /// the client will only receive the status and any trailing metadata. |
714 | void Finish(::grpc::Status s) { |
715 | ServerCallbackUnary* call = call_.load(m: std::memory_order_acquire); |
716 | if (call == nullptr) { |
717 | grpc::internal::MutexLock l(&call_mu_); |
718 | call = call_.load(m: std::memory_order_relaxed); |
719 | if (call == nullptr) { |
720 | backlog_.finish_wanted = true; |
721 | backlog_.status_wanted = std::move(s); |
722 | return; |
723 | } |
724 | } |
725 | call->Finish(s: std::move(s)); |
726 | } |
727 | |
728 | /// The following notifications are exactly like ServerBidiReactor. |
729 | virtual void OnSendInitialMetadataDone(bool /*ok*/) {} |
730 | void OnDone() override = 0; |
731 | void OnCancel() override {} |
732 | |
733 | private: |
734 | friend class ServerCallbackUnary; |
735 | // May be overridden by internal implementation details. This is not a public |
736 | // customization point. |
737 | virtual void InternalBindCall(ServerCallbackUnary* call) { |
738 | grpc::internal::ReleasableMutexLock l(&call_mu_); |
739 | PreBindBacklog ops(std::move(backlog_)); |
740 | call_.store(p: call, m: std::memory_order_release); |
741 | l.Unlock(); |
742 | |
743 | if (ops.send_initial_metadata_wanted) { |
744 | call->SendInitialMetadata(); |
745 | } |
746 | if (ops.finish_wanted) { |
747 | call->Finish(s: std::move(ops.status_wanted)); |
748 | } |
749 | } |
750 | |
751 | grpc::internal::Mutex call_mu_; |
752 | std::atomic<ServerCallbackUnary*> call_{nullptr}; |
753 | struct PreBindBacklog { |
754 | bool send_initial_metadata_wanted = false; |
755 | bool finish_wanted = false; |
756 | ::grpc::Status status_wanted; |
757 | }; |
758 | PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */; |
759 | }; |
760 | |
761 | namespace internal { |
762 | |
// Reactor that does nothing but immediately finish the RPC with the given
// status; used as the reactor for methods with no registered handler.
template <class Base>
class FinishOnlyReactor : public Base {
 public:
  explicit FinishOnlyReactor(::grpc::Status s) { this->Finish(std::move(s)); }
  // Destroys in place rather than deleting — presumably the object lives in
  // arena-managed storage (TODO confirm against the allocation site).
  void OnDone() override { this->~FinishOnlyReactor(); }
};

// Convenience aliases for the finish-only reactors used to reject calls to
// unimplemented methods, one per RPC shape.
using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
template <class Request>
using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
template <class Response>
using UnimplementedWriteReactor =
    FinishOnlyReactor<ServerWriteReactor<Response>>;
template <class Request, class Response>
using UnimplementedBidiReactor =
    FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
779 | |
780 | } // namespace internal |
781 | } // namespace grpc_impl |
782 | |
783 | #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H |
784 | |