1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#ifndef GRPCPP_SUPPORT_SERVER_CALLBACK_H
20#define GRPCPP_SUPPORT_SERVER_CALLBACK_H
21
22#include <atomic>
23#include <functional>
24#include <type_traits>
25
26#include <grpcpp/impl/call.h>
27#include <grpcpp/impl/call_op_set.h>
28#include <grpcpp/impl/codegen/core_codegen_interface.h>
29#include <grpcpp/impl/codegen/sync.h>
30#include <grpcpp/support/callback_common.h>
31#include <grpcpp/support/config.h>
32#include <grpcpp/support/message_allocator.h>
33#include <grpcpp/support/status.h>
34
35namespace grpc {
36
37// Declare base class of all reactors as internal
38namespace internal {
39
40// Forward declarations
41template <class Request, class Response>
42class CallbackUnaryHandler;
43template <class Request, class Response>
44class CallbackClientStreamingHandler;
45template <class Request, class Response>
46class CallbackServerStreamingHandler;
47template <class Request, class Response>
48class CallbackBidiHandler;
49
50class ServerReactor {
51 public:
52 virtual ~ServerReactor() = default;
53 virtual void OnDone() = 0;
54 virtual void OnCancel() = 0;
55
56 // The following is not API. It is for internal use only and specifies whether
57 // all reactions of this Reactor can be run without an extra executor
58 // scheduling. This should only be used for internally-defined reactors with
59 // trivial reactions.
60 virtual bool InternalInlineable() { return false; }
61
62 private:
63 template <class Request, class Response>
64 friend class CallbackUnaryHandler;
65 template <class Request, class Response>
66 friend class CallbackClientStreamingHandler;
67 template <class Request, class Response>
68 friend class CallbackServerStreamingHandler;
69 template <class Request, class Response>
70 friend class CallbackBidiHandler;
71};
72
73/// The base class of ServerCallbackUnary etc.
74class ServerCallbackCall {
75 public:
76 virtual ~ServerCallbackCall() {}
77
78 // This object is responsible for tracking when it is safe to call OnDone and
79 // OnCancel. OnDone should not be called until the method handler is complete,
80 // Finish has been called, the ServerContext CompletionOp (which tracks
81 // cancellation or successful completion) has completed, and all outstanding
82 // Read/Write actions have seen their reactions. OnCancel should not be called
83 // until after the method handler is done and the RPC has completed with a
84 // cancellation. This is tracked by counting how many of these conditions have
85 // been met and calling OnCancel when none remain unmet.
86
87 // Public versions of MaybeDone: one where we don't know the reactor in
88 // advance (used for the ServerContext CompletionOp), and one for where we
89 // know the inlineability of the OnDone reaction. You should set the inline
90 // flag to true if either the Reactor is InternalInlineable() or if this
91 // callback is already being forced to run dispatched to an executor
92 // (typically because it contains additional work than just the MaybeDone).
93
94 void MaybeDone() {
95 if (GPR_UNLIKELY(Unref() == 1)) {
96 ScheduleOnDone(inline_ondone: reactor()->InternalInlineable());
97 }
98 }
99
100 void MaybeDone(bool inline_ondone) {
101 if (GPR_UNLIKELY(Unref() == 1)) {
102 ScheduleOnDone(inline_ondone);
103 }
104 }
105
106 // Fast version called with known reactor passed in, used from derived
107 // classes, typically in non-cancel case
108 void MaybeCallOnCancel(ServerReactor* reactor) {
109 if (GPR_UNLIKELY(UnblockCancellation())) {
110 CallOnCancel(reactor);
111 }
112 }
113
114 // Slower version called from object that doesn't know the reactor a priori
115 // (such as the ServerContext CompletionOp which is formed before the
116 // reactor). This is used in cancel cases only, so it's ok to be slower and
117 // invoke a virtual function.
118 void MaybeCallOnCancel() {
119 if (GPR_UNLIKELY(UnblockCancellation())) {
120 CallOnCancel(reactor: reactor());
121 }
122 }
123
124 protected:
125 /// Increases the reference count
126 void Ref() { callbacks_outstanding_.fetch_add(i: 1, m: std::memory_order_relaxed); }
127
128 private:
129 virtual ServerReactor* reactor() = 0;
130
131 // CallOnDone performs the work required at completion of the RPC: invoking
132 // the OnDone function and doing all necessary cleanup. This function is only
133 // ever invoked on a fully-Unref'fed ServerCallbackCall.
134 virtual void CallOnDone() = 0;
135
136 // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
137 // to an executor.
138 void ScheduleOnDone(bool inline_ondone);
139
140 // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
141 // it to an executor.
142 void CallOnCancel(ServerReactor* reactor);
143
144 // Implement the cancellation constraint counter. Return true if OnCancel
145 // should be called, false otherwise.
146 bool UnblockCancellation() {
147 return on_cancel_conditions_remaining_.fetch_sub(
148 i: 1, m: std::memory_order_acq_rel) == 1;
149 }
150
151 /// Decreases the reference count and returns the previous value
152 int Unref() {
153 return callbacks_outstanding_.fetch_sub(i: 1, m: std::memory_order_acq_rel);
154 }
155
156 std::atomic_int on_cancel_conditions_remaining_{2};
157 std::atomic_int callbacks_outstanding_{
158 3}; // reserve for start, Finish, and CompletionOp
159};
160
161template <class Request, class Response>
162class DefaultMessageHolder : public MessageHolder<Request, Response> {
163 public:
164 DefaultMessageHolder() {
165 this->set_request(&request_obj_);
166 this->set_response(&response_obj_);
167 }
168 void Release() override {
169 // the object is allocated in the call arena.
170 this->~DefaultMessageHolder<Request, Response>();
171 }
172
173 private:
174 Request request_obj_;
175 Response response_obj_;
176};
177
178} // namespace internal
179
180// Forward declarations
181class ServerUnaryReactor;
182template <class Request>
183class ServerReadReactor;
184template <class Response>
185class ServerWriteReactor;
186template <class Request, class Response>
187class ServerBidiReactor;
188
189// NOTE: The actual call/stream object classes are provided as API only to
190// support mocking. There are no implementations of these class interfaces in
191// the API.
192class ServerCallbackUnary : public internal::ServerCallbackCall {
193 public:
194 ~ServerCallbackUnary() override {}
195 virtual void Finish(grpc::Status s) = 0;
196 virtual void SendInitialMetadata() = 0;
197
198 protected:
199 // Use a template rather than explicitly specifying ServerUnaryReactor to
200 // delay binding and avoid a circular forward declaration issue
201 template <class Reactor>
202 void BindReactor(Reactor* reactor) {
203 reactor->InternalBindCall(this);
204 }
205};
206
207template <class Request>
208class ServerCallbackReader : public internal::ServerCallbackCall {
209 public:
210 ~ServerCallbackReader() override {}
211 virtual void Finish(grpc::Status s) = 0;
212 virtual void SendInitialMetadata() = 0;
213 virtual void Read(Request* msg) = 0;
214
215 protected:
216 void BindReactor(ServerReadReactor<Request>* reactor) {
217 reactor->InternalBindReader(this);
218 }
219};
220
221template <class Response>
222class ServerCallbackWriter : public internal::ServerCallbackCall {
223 public:
224 ~ServerCallbackWriter() override {}
225
226 virtual void Finish(grpc::Status s) = 0;
227 virtual void SendInitialMetadata() = 0;
228 virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
229 virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
230 grpc::Status s) = 0;
231
232 protected:
233 void BindReactor(ServerWriteReactor<Response>* reactor) {
234 reactor->InternalBindWriter(this);
235 }
236};
237
238template <class Request, class Response>
239class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
240 public:
241 ~ServerCallbackReaderWriter() override {}
242
243 virtual void Finish(grpc::Status s) = 0;
244 virtual void SendInitialMetadata() = 0;
245 virtual void Read(Request* msg) = 0;
246 virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
247 virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options,
248 grpc::Status s) = 0;
249
250 protected:
251 void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
252 reactor->InternalBindStream(this);
253 }
254};
255
256// The following classes are the reactor interfaces that are to be implemented
257// by the user, returned as the output parameter of the method handler for a
258// callback method. Note that none of the classes are pure; all reactions have a
259// default empty reaction so that the user class only needs to override those
260// reactions that it cares about. The reaction methods will be invoked by the
261// library in response to the completion of various operations. Reactions must
262// not include blocking operations (such as blocking I/O, starting synchronous
263// RPCs, or waiting on condition variables). Reactions may be invoked
264// concurrently, except that OnDone is called after all others (assuming proper
265// API usage). The reactor may not be deleted until OnDone is called.
266
267/// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
268template <class Request, class Response>
269class ServerBidiReactor : public internal::ServerReactor {
270 public:
271 // NOTE: Initializing stream_ as a constructor initializer rather than a
272 // default initializer because gcc-4.x requires a copy constructor for
273 // default initializing a templated member, which isn't ok for atomic.
274 // TODO(vjpai): Switch to default constructor and default initializer when
275 // gcc-4.x is no longer supported
276 ServerBidiReactor() : stream_(nullptr) {}
277 ~ServerBidiReactor() override = default;
278
279 /// Send any initial metadata stored in the RPC context. If not invoked,
280 /// any initial metadata will be passed along with the first Write or the
281 /// Finish (if there are no writes).
282 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_) {
283 ServerCallbackReaderWriter<Request, Response>* stream =
284 stream_.load(std::memory_order_acquire);
285 if (stream == nullptr) {
286 grpc::internal::MutexLock l(&stream_mu_);
287 stream = stream_.load(std::memory_order_relaxed);
288 if (stream == nullptr) {
289 backlog_.send_initial_metadata_wanted = true;
290 return;
291 }
292 }
293 stream->SendInitialMetadata();
294 }
295
296 /// Initiate a read operation.
297 ///
298 /// \param[out] req Where to eventually store the read message. Valid when
299 /// the library calls OnReadDone
300 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_) {
301 ServerCallbackReaderWriter<Request, Response>* stream =
302 stream_.load(std::memory_order_acquire);
303 if (stream == nullptr) {
304 grpc::internal::MutexLock l(&stream_mu_);
305 stream = stream_.load(std::memory_order_relaxed);
306 if (stream == nullptr) {
307 backlog_.read_wanted = req;
308 return;
309 }
310 }
311 stream->Read(req);
312 }
313
314 /// Initiate a write operation.
315 ///
316 /// \param[in] resp The message to be written. The library does not take
317 /// ownership but the caller must ensure that the message is
318 /// not deleted or modified until OnWriteDone is called.
319 void StartWrite(const Response* resp) {
320 StartWrite(resp, grpc::WriteOptions());
321 }
322
323 /// Initiate a write operation with specified options.
324 ///
325 /// \param[in] resp The message to be written. The library does not take
326 /// ownership but the caller must ensure that the message is
327 /// not deleted or modified until OnWriteDone is called.
328 /// \param[in] options The WriteOptions to use for writing this message
329 void StartWrite(const Response* resp, grpc::WriteOptions options)
330 ABSL_LOCKS_EXCLUDED(stream_mu_) {
331 ServerCallbackReaderWriter<Request, Response>* stream =
332 stream_.load(std::memory_order_acquire);
333 if (stream == nullptr) {
334 grpc::internal::MutexLock l(&stream_mu_);
335 stream = stream_.load(std::memory_order_relaxed);
336 if (stream == nullptr) {
337 backlog_.write_wanted = resp;
338 backlog_.write_options_wanted = options;
339 return;
340 }
341 }
342 stream->Write(resp, options);
343 }
344
345 /// Initiate a write operation with specified options and final RPC Status,
346 /// which also causes any trailing metadata for this RPC to be sent out.
347 /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
348 /// single step. A key difference, though, is that this operation doesn't have
349 /// an OnWriteDone reaction - it is considered complete only when OnDone is
350 /// available. An RPC can either have StartWriteAndFinish or Finish, but not
351 /// both.
352 ///
353 /// \param[in] resp The message to be written. The library does not take
354 /// ownership but the caller must ensure that the message is
355 /// not deleted or modified until OnDone is called.
356 /// \param[in] options The WriteOptions to use for writing this message
357 /// \param[in] s The status outcome of this RPC
358 void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
359 grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
360 ServerCallbackReaderWriter<Request, Response>* stream =
361 stream_.load(std::memory_order_acquire);
362 if (stream == nullptr) {
363 grpc::internal::MutexLock l(&stream_mu_);
364 stream = stream_.load(std::memory_order_relaxed);
365 if (stream == nullptr) {
366 backlog_.write_and_finish_wanted = true;
367 backlog_.write_wanted = resp;
368 backlog_.write_options_wanted = options;
369 backlog_.status_wanted = std::move(s);
370 return;
371 }
372 }
373 stream->WriteAndFinish(resp, options, std::move(s));
374 }
375
376 /// Inform system of a planned write operation with specified options, but
377 /// allow the library to schedule the actual write coalesced with the writing
378 /// of trailing metadata (which takes place on a Finish call).
379 ///
380 /// \param[in] resp The message to be written. The library does not take
381 /// ownership but the caller must ensure that the message is
382 /// not deleted or modified until OnWriteDone is called.
383 /// \param[in] options The WriteOptions to use for writing this message
384 void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
385 StartWrite(resp, options.set_last_message());
386 }
387
388 /// Indicate that the stream is to be finished and the trailing metadata and
389 /// RPC status are to be sent. Every RPC MUST be finished using either Finish
390 /// or StartWriteAndFinish (but not both), even if the RPC is already
391 /// cancelled.
392 ///
393 /// \param[in] s The status outcome of this RPC
394 void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_) {
395 ServerCallbackReaderWriter<Request, Response>* stream =
396 stream_.load(std::memory_order_acquire);
397 if (stream == nullptr) {
398 grpc::internal::MutexLock l(&stream_mu_);
399 stream = stream_.load(std::memory_order_relaxed);
400 if (stream == nullptr) {
401 backlog_.finish_wanted = true;
402 backlog_.status_wanted = std::move(s);
403 return;
404 }
405 }
406 stream->Finish(std::move(s));
407 }
408
409 /// Notifies the application that an explicit StartSendInitialMetadata
410 /// operation completed. Not used when the sending of initial metadata
411 /// piggybacks onto the first write.
412 ///
413 /// \param[in] ok Was it successful? If false, no further write-side operation
414 /// will succeed.
415 virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
416
417 /// Notifies the application that a StartRead operation completed.
418 ///
419 /// \param[in] ok Was it successful? If false, no further read-side operation
420 /// will succeed.
421 virtual void OnReadDone(bool /*ok*/) {}
422
423 /// Notifies the application that a StartWrite (or StartWriteLast) operation
424 /// completed.
425 ///
426 /// \param[in] ok Was it successful? If false, no further write-side operation
427 /// will succeed.
428 virtual void OnWriteDone(bool /*ok*/) {}
429
430 /// Notifies the application that all operations associated with this RPC
431 /// have completed. This is an override (from the internal base class) but
432 /// still abstract, so derived classes MUST override it to be instantiated.
433 void OnDone() override = 0;
434
435 /// Notifies the application that this RPC has been cancelled. This is an
436 /// override (from the internal base class) but not final, so derived classes
437 /// should override it if they want to take action.
438 void OnCancel() override {}
439
440 private:
441 friend class ServerCallbackReaderWriter<Request, Response>;
442 // May be overridden by internal implementation details. This is not a public
443 // customization point.
444 virtual void InternalBindStream(
445 ServerCallbackReaderWriter<Request, Response>* stream) {
446 grpc::internal::MutexLock l(&stream_mu_);
447
448 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
449 stream->SendInitialMetadata();
450 }
451 if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
452 stream->Read(backlog_.read_wanted);
453 }
454 if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
455 stream->WriteAndFinish(backlog_.write_wanted,
456 std::move(backlog_.write_options_wanted),
457 std::move(backlog_.status_wanted));
458 } else {
459 if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
460 stream->Write(backlog_.write_wanted,
461 std::move(backlog_.write_options_wanted));
462 }
463 if (GPR_UNLIKELY(backlog_.finish_wanted)) {
464 stream->Finish(std::move(backlog_.status_wanted));
465 }
466 }
467 // Set stream_ last so that other functions can use it lock-free
468 stream_.store(stream, std::memory_order_release);
469 }
470
471 grpc::internal::Mutex stream_mu_;
472 // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
473 // once C++17 or ABSL is supported since stream and backlog are
474 // mutually exclusive in this class. Do likewise with the
475 // remaining reactor classes and their backlogs as well.
476 std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
477 struct PreBindBacklog {
478 bool send_initial_metadata_wanted = false;
479 bool write_and_finish_wanted = false;
480 bool finish_wanted = false;
481 Request* read_wanted = nullptr;
482 const Response* write_wanted = nullptr;
483 grpc::WriteOptions write_options_wanted;
484 grpc::Status status_wanted;
485 };
486 PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
487};
488
489/// \a ServerReadReactor is the interface for a client-streaming RPC.
490template <class Request>
491class ServerReadReactor : public internal::ServerReactor {
492 public:
493 ServerReadReactor() : reader_(nullptr) {}
494 ~ServerReadReactor() override = default;
495
496 /// The following operation initiations are exactly like ServerBidiReactor.
497 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_) {
498 ServerCallbackReader<Request>* reader =
499 reader_.load(std::memory_order_acquire);
500 if (reader == nullptr) {
501 grpc::internal::MutexLock l(&reader_mu_);
502 reader = reader_.load(std::memory_order_relaxed);
503 if (reader == nullptr) {
504 backlog_.send_initial_metadata_wanted = true;
505 return;
506 }
507 }
508 reader->SendInitialMetadata();
509 }
510 void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_) {
511 ServerCallbackReader<Request>* reader =
512 reader_.load(std::memory_order_acquire);
513 if (reader == nullptr) {
514 grpc::internal::MutexLock l(&reader_mu_);
515 reader = reader_.load(std::memory_order_relaxed);
516 if (reader == nullptr) {
517 backlog_.read_wanted = req;
518 return;
519 }
520 }
521 reader->Read(req);
522 }
523 void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_) {
524 ServerCallbackReader<Request>* reader =
525 reader_.load(std::memory_order_acquire);
526 if (reader == nullptr) {
527 grpc::internal::MutexLock l(&reader_mu_);
528 reader = reader_.load(std::memory_order_relaxed);
529 if (reader == nullptr) {
530 backlog_.finish_wanted = true;
531 backlog_.status_wanted = std::move(s);
532 return;
533 }
534 }
535 reader->Finish(std::move(s));
536 }
537
538 /// The following notifications are exactly like ServerBidiReactor.
539 virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
540 virtual void OnReadDone(bool /*ok*/) {}
541 void OnDone() override = 0;
542 void OnCancel() override {}
543
544 private:
545 friend class ServerCallbackReader<Request>;
546
547 // May be overridden by internal implementation details. This is not a public
548 // customization point.
549 virtual void InternalBindReader(ServerCallbackReader<Request>* reader)
550 ABSL_LOCKS_EXCLUDED(reader_mu_) {
551 grpc::internal::MutexLock l(&reader_mu_);
552
553 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
554 reader->SendInitialMetadata();
555 }
556 if (GPR_UNLIKELY(backlog_.read_wanted != nullptr)) {
557 reader->Read(backlog_.read_wanted);
558 }
559 if (GPR_UNLIKELY(backlog_.finish_wanted)) {
560 reader->Finish(std::move(backlog_.status_wanted));
561 }
562 // Set reader_ last so that other functions can use it lock-free
563 reader_.store(reader, std::memory_order_release);
564 }
565
566 grpc::internal::Mutex reader_mu_;
567 std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
568 struct PreBindBacklog {
569 bool send_initial_metadata_wanted = false;
570 bool finish_wanted = false;
571 Request* read_wanted = nullptr;
572 grpc::Status status_wanted;
573 };
574 PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
575};
576
577/// \a ServerWriteReactor is the interface for a server-streaming RPC.
578template <class Response>
579class ServerWriteReactor : public internal::ServerReactor {
580 public:
581 ServerWriteReactor() : writer_(nullptr) {}
582 ~ServerWriteReactor() override = default;
583
584 /// The following operation initiations are exactly like ServerBidiReactor.
585 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_) {
586 ServerCallbackWriter<Response>* writer =
587 writer_.load(std::memory_order_acquire);
588 if (writer == nullptr) {
589 grpc::internal::MutexLock l(&writer_mu_);
590 writer = writer_.load(std::memory_order_relaxed);
591 if (writer == nullptr) {
592 backlog_.send_initial_metadata_wanted = true;
593 return;
594 }
595 }
596 writer->SendInitialMetadata();
597 }
598 void StartWrite(const Response* resp) {
599 StartWrite(resp, grpc::WriteOptions());
600 }
601 void StartWrite(const Response* resp, grpc::WriteOptions options)
602 ABSL_LOCKS_EXCLUDED(writer_mu_) {
603 ServerCallbackWriter<Response>* writer =
604 writer_.load(std::memory_order_acquire);
605 if (writer == nullptr) {
606 grpc::internal::MutexLock l(&writer_mu_);
607 writer = writer_.load(std::memory_order_relaxed);
608 if (writer == nullptr) {
609 backlog_.write_wanted = resp;
610 backlog_.write_options_wanted = options;
611 return;
612 }
613 }
614 writer->Write(resp, options);
615 }
616 void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options,
617 grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
618 ServerCallbackWriter<Response>* writer =
619 writer_.load(std::memory_order_acquire);
620 if (writer == nullptr) {
621 grpc::internal::MutexLock l(&writer_mu_);
622 writer = writer_.load(std::memory_order_relaxed);
623 if (writer == nullptr) {
624 backlog_.write_and_finish_wanted = true;
625 backlog_.write_wanted = resp;
626 backlog_.write_options_wanted = options;
627 backlog_.status_wanted = std::move(s);
628 return;
629 }
630 }
631 writer->WriteAndFinish(resp, options, std::move(s));
632 }
633 void StartWriteLast(const Response* resp, grpc::WriteOptions options) {
634 StartWrite(resp, options.set_last_message());
635 }
636 void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_) {
637 ServerCallbackWriter<Response>* writer =
638 writer_.load(std::memory_order_acquire);
639 if (writer == nullptr) {
640 grpc::internal::MutexLock l(&writer_mu_);
641 writer = writer_.load(std::memory_order_relaxed);
642 if (writer == nullptr) {
643 backlog_.finish_wanted = true;
644 backlog_.status_wanted = std::move(s);
645 return;
646 }
647 }
648 writer->Finish(std::move(s));
649 }
650
651 /// The following notifications are exactly like ServerBidiReactor.
652 virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
653 virtual void OnWriteDone(bool /*ok*/) {}
654 void OnDone() override = 0;
655 void OnCancel() override {}
656
657 private:
658 friend class ServerCallbackWriter<Response>;
659 // May be overridden by internal implementation details. This is not a public
660 // customization point.
661 virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer)
662 ABSL_LOCKS_EXCLUDED(writer_mu_) {
663 grpc::internal::MutexLock l(&writer_mu_);
664
665 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
666 writer->SendInitialMetadata();
667 }
668 if (GPR_UNLIKELY(backlog_.write_and_finish_wanted)) {
669 writer->WriteAndFinish(backlog_.write_wanted,
670 std::move(backlog_.write_options_wanted),
671 std::move(backlog_.status_wanted));
672 } else {
673 if (GPR_UNLIKELY(backlog_.write_wanted != nullptr)) {
674 writer->Write(backlog_.write_wanted,
675 std::move(backlog_.write_options_wanted));
676 }
677 if (GPR_UNLIKELY(backlog_.finish_wanted)) {
678 writer->Finish(std::move(backlog_.status_wanted));
679 }
680 }
681 // Set writer_ last so that other functions can use it lock-free
682 writer_.store(writer, std::memory_order_release);
683 }
684
685 grpc::internal::Mutex writer_mu_;
686 std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
687 struct PreBindBacklog {
688 bool send_initial_metadata_wanted = false;
689 bool write_and_finish_wanted = false;
690 bool finish_wanted = false;
691 const Response* write_wanted = nullptr;
692 grpc::WriteOptions write_options_wanted;
693 grpc::Status status_wanted;
694 };
695 PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
696};
697
698class ServerUnaryReactor : public internal::ServerReactor {
699 public:
700 ServerUnaryReactor() : call_(nullptr) {}
701 ~ServerUnaryReactor() override = default;
702
703 /// StartSendInitialMetadata is exactly like ServerBidiReactor.
704 void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_) {
705 ServerCallbackUnary* call = call_.load(m: std::memory_order_acquire);
706 if (call == nullptr) {
707 grpc::internal::MutexLock l(&call_mu_);
708 call = call_.load(m: std::memory_order_relaxed);
709 if (call == nullptr) {
710 backlog_.send_initial_metadata_wanted = true;
711 return;
712 }
713 }
714 call->SendInitialMetadata();
715 }
716 /// Finish is similar to ServerBidiReactor except for one detail.
717 /// If the status is non-OK, any message will not be sent. Instead,
718 /// the client will only receive the status and any trailing metadata.
719 void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_) {
720 ServerCallbackUnary* call = call_.load(m: std::memory_order_acquire);
721 if (call == nullptr) {
722 grpc::internal::MutexLock l(&call_mu_);
723 call = call_.load(m: std::memory_order_relaxed);
724 if (call == nullptr) {
725 backlog_.finish_wanted = true;
726 backlog_.status_wanted = std::move(s);
727 return;
728 }
729 }
730 call->Finish(s: std::move(s));
731 }
732
733 /// The following notifications are exactly like ServerBidiReactor.
734 virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
735 void OnDone() override = 0;
736 void OnCancel() override {}
737
738 private:
739 friend class ServerCallbackUnary;
740 // May be overridden by internal implementation details. This is not a public
741 // customization point.
742 virtual void InternalBindCall(ServerCallbackUnary* call)
743 ABSL_LOCKS_EXCLUDED(call_mu_) {
744 grpc::internal::MutexLock l(&call_mu_);
745
746 if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted)) {
747 call->SendInitialMetadata();
748 }
749 if (GPR_UNLIKELY(backlog_.finish_wanted)) {
750 call->Finish(s: std::move(backlog_.status_wanted));
751 }
752 // Set call_ last so that other functions can use it lock-free
753 call_.store(p: call, m: std::memory_order_release);
754 }
755
756 grpc::internal::Mutex call_mu_;
757 std::atomic<ServerCallbackUnary*> call_{nullptr};
758 struct PreBindBacklog {
759 bool send_initial_metadata_wanted = false;
760 bool finish_wanted = false;
761 grpc::Status status_wanted;
762 };
763 PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
764};
765
766namespace internal {
767
768template <class Base>
769class FinishOnlyReactor : public Base {
770 public:
771 explicit FinishOnlyReactor(grpc::Status s) { this->Finish(std::move(s)); }
772 void OnDone() override { this->~FinishOnlyReactor(); }
773};
774
775using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
776template <class Request>
777using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
778template <class Response>
779using UnimplementedWriteReactor =
780 FinishOnlyReactor<ServerWriteReactor<Response>>;
781template <class Request, class Response>
782using UnimplementedBidiReactor =
783 FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
784
785} // namespace internal
786
787// TODO(vjpai): Remove namespace experimental when last known users are migrated
788// off.
789namespace experimental {
790
791template <class Request, class Response>
792using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>;
793
794} // namespace experimental
795
796} // namespace grpc
797
798#endif // GRPCPP_SUPPORT_SERVER_CALLBACK_H
799

source code of include/grpcpp/support/server_callback.h