/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
| 18 | |
| 19 | #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H |
| 20 | #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H |
| 21 | |
| 22 | #include <grpc/impl/codegen/port_platform.h> |
| 23 | |
| 24 | #include <grpc/impl/codegen/grpc_types.h> |
| 25 | #include <grpcpp/impl/codegen/byte_buffer.h> |
| 26 | #include <grpcpp/impl/codegen/call.h> |
| 27 | #include <grpcpp/impl/codegen/call_hook.h> |
| 28 | #include <grpcpp/impl/codegen/completion_queue_tag.h> |
| 29 | #include <grpcpp/impl/codegen/core_codegen_interface.h> |
| 30 | #include <grpcpp/impl/codegen/interceptor_common.h> |
| 31 | #include <grpcpp/impl/codegen/rpc_service_method.h> |
| 32 | #include <grpcpp/impl/codegen/server_context_impl.h> |
| 33 | |
// Forward declarations of types that live in the grpc_impl namespace, so
// that this header can name them without pulling in their full definitions.
namespace grpc_impl {

class Channel;
class CompletionQueue;
class ServerCompletionQueue;
class ServerCredentials;
}  // namespace grpc_impl
| 41 | namespace grpc { |
| 42 | |
| 43 | class AsyncGenericService; |
| 44 | class GenericServerContext; |
| 45 | class Service; |
| 46 | |
| 47 | extern CoreCodegenInterface* g_core_codegen_interface; |
| 48 | |
| 49 | /// Models a gRPC server. |
| 50 | /// |
| 51 | /// Servers are configured and started via \a grpc::ServerBuilder. |
| 52 | namespace internal { |
| 53 | class ServerAsyncStreamingInterface; |
| 54 | } // namespace internal |
| 55 | |
| 56 | #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL |
| 57 | namespace experimental { |
| 58 | #endif |
| 59 | class CallbackGenericService; |
| 60 | #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL |
| 61 | } // namespace experimental |
| 62 | #endif |
| 63 | |
| 64 | namespace experimental { |
| 65 | class ServerInterceptorFactoryInterface; |
| 66 | } // namespace experimental |
| 67 | |
| 68 | class ServerInterface : public internal::CallHook { |
| 69 | public: |
| 70 | virtual ~ServerInterface() {} |
| 71 | |
| 72 | /// \a Shutdown does the following things: |
| 73 | /// |
| 74 | /// 1. Shutdown the server: deactivate all listening ports, mark it in |
| 75 | /// "shutdown mode" so that further call Request's or incoming RPC matches |
| 76 | /// are no longer allowed. Also return all Request'ed-but-not-yet-active |
| 77 | /// calls as failed (!ok). This refers to calls that have been requested |
| 78 | /// at the server by the server-side library or application code but that |
| 79 | /// have not yet been matched to incoming RPCs from the client. Note that |
| 80 | /// this would even include default calls added automatically by the gRPC |
| 81 | /// C++ API without the user's input (e.g., "Unimplemented RPC method") |
| 82 | /// |
| 83 | /// 2. Block until all rpc method handlers invoked automatically by the sync |
| 84 | /// API finish. |
| 85 | /// |
| 86 | /// 3. If all pending calls complete (and all their operations are |
| 87 | /// retrieved by Next) before \a deadline expires, this finishes |
| 88 | /// gracefully. Otherwise, forcefully cancel all pending calls associated |
| 89 | /// with the server after \a deadline expires. In the case of the sync API, |
| 90 | /// if the RPC function for a streaming call has already been started and |
| 91 | /// takes a week to complete, the RPC function won't be forcefully |
| 92 | /// terminated (since that would leave state corrupt and incomplete) and |
| 93 | /// the method handler will just keep running (which will prevent the |
| 94 | /// server from completing the "join" operation that it needs to do at |
| 95 | /// shutdown time). |
| 96 | /// |
| 97 | /// All completion queue associated with the server (for example, for async |
| 98 | /// serving) must be shutdown *after* this method has returned: |
| 99 | /// See \a ServerBuilder::AddCompletionQueue for details. |
| 100 | /// They must also be drained (by repeated Next) after being shutdown. |
| 101 | /// |
| 102 | /// \param deadline How long to wait until pending rpcs are forcefully |
| 103 | /// terminated. |
| 104 | template <class T> |
| 105 | void Shutdown(const T& deadline) { |
| 106 | ShutdownInternal(deadline: TimePoint<T>(deadline).raw_time()); |
| 107 | } |
| 108 | |
| 109 | /// Shutdown the server without a deadline and forced cancellation. |
| 110 | /// |
| 111 | /// All completion queue associated with the server (for example, for async |
| 112 | /// serving) must be shutdown *after* this method has returned: |
| 113 | /// See \a ServerBuilder::AddCompletionQueue for details. |
| 114 | void Shutdown() { |
| 115 | ShutdownInternal( |
| 116 | deadline: g_core_codegen_interface->gpr_inf_future(type: GPR_CLOCK_MONOTONIC)); |
| 117 | } |
| 118 | |
| 119 | /// Block waiting for all work to complete. |
| 120 | /// |
| 121 | /// \warning The server must be either shutting down or some other thread must |
| 122 | /// call \a Shutdown for this function to ever return. |
| 123 | virtual void Wait() = 0; |
| 124 | |
| 125 | protected: |
| 126 | friend class ::grpc::Service; |
| 127 | |
| 128 | /// Register a service. This call does not take ownership of the service. |
| 129 | /// The service must exist for the lifetime of the Server instance. |
| 130 | virtual bool RegisterService(const grpc::string* host, Service* service) = 0; |
| 131 | |
| 132 | /// Register a generic service. This call does not take ownership of the |
| 133 | /// service. The service must exist for the lifetime of the Server instance. |
| 134 | virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0; |
| 135 | |
| 136 | #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL |
| 137 | /// Register a callback generic service. This call does not take ownership of |
| 138 | /// the service. The service must exist for the lifetime of the Server |
| 139 | /// instance. May not be abstract since this is a post-1.0 API addition. |
| 140 | |
| 141 | virtual void RegisterCallbackGenericService(CallbackGenericService* |
| 142 | /*service*/) {} |
| 143 | #else |
| 144 | /// NOTE: class experimental_registration_interface is not part of the public |
| 145 | /// API of this class |
| 146 | /// TODO(vjpai): Move these contents to public API when no longer experimental |
| 147 | class experimental_registration_interface { |
| 148 | public: |
| 149 | virtual ~experimental_registration_interface() {} |
| 150 | /// May not be abstract since this is a post-1.0 API addition |
| 151 | virtual void RegisterCallbackGenericService( |
| 152 | experimental::CallbackGenericService* /*service*/) {} |
| 153 | }; |
| 154 | |
| 155 | /// NOTE: The function experimental_registration() is not stable public API. |
| 156 | /// It is a view to the experimental components of this class. It may be |
| 157 | /// changed or removed at any time. May not be abstract since this is a |
| 158 | /// post-1.0 API addition |
| 159 | virtual experimental_registration_interface* experimental_registration() { |
| 160 | return nullptr; |
| 161 | } |
| 162 | #endif |
| 163 | |
| 164 | /// Tries to bind \a server to the given \a addr. |
| 165 | /// |
| 166 | /// It can be invoked multiple times. |
| 167 | /// |
| 168 | /// \param addr The address to try to bind to the server (eg, localhost:1234, |
| 169 | /// 192.168.1.1:31416, [::1]:27182, etc.). |
| 170 | /// \params creds The credentials associated with the server. |
| 171 | /// |
| 172 | /// \return bound port number on success, 0 on failure. |
| 173 | /// |
| 174 | /// \warning It's an error to call this method on an already started server. |
| 175 | virtual int AddListeningPort(const grpc::string& addr, |
| 176 | grpc_impl::ServerCredentials* creds) = 0; |
| 177 | |
| 178 | /// Start the server. |
| 179 | /// |
| 180 | /// \param cqs Completion queues for handling asynchronous services. The |
| 181 | /// caller is required to keep all completion queues live until the server is |
| 182 | /// destroyed. |
| 183 | /// \param num_cqs How many completion queues does \a cqs hold. |
| 184 | virtual void Start(::grpc_impl::ServerCompletionQueue** cqs, |
| 185 | size_t num_cqs) = 0; |
| 186 | |
| 187 | virtual void ShutdownInternal(gpr_timespec deadline) = 0; |
| 188 | |
| 189 | virtual int max_receive_message_size() const = 0; |
| 190 | |
| 191 | virtual grpc_server* server() = 0; |
| 192 | |
| 193 | virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, |
| 194 | internal::Call* call) = 0; |
| 195 | |
| 196 | class BaseAsyncRequest : public internal::CompletionQueueTag { |
| 197 | public: |
| 198 | BaseAsyncRequest(ServerInterface* server, |
| 199 | ::grpc_impl::ServerContext* context, |
| 200 | internal::ServerAsyncStreamingInterface* stream, |
| 201 | ::grpc_impl::CompletionQueue* call_cq, |
| 202 | ::grpc_impl::ServerCompletionQueue* notification_cq, |
| 203 | void* tag, bool delete_on_finalize); |
| 204 | virtual ~BaseAsyncRequest(); |
| 205 | |
| 206 | bool FinalizeResult(void** tag, bool* status) override; |
| 207 | |
| 208 | private: |
| 209 | void ContinueFinalizeResultAfterInterception(); |
| 210 | |
| 211 | protected: |
| 212 | ServerInterface* const server_; |
| 213 | ::grpc_impl::ServerContext* const context_; |
| 214 | internal::ServerAsyncStreamingInterface* const stream_; |
| 215 | ::grpc_impl::CompletionQueue* const call_cq_; |
| 216 | ::grpc_impl::ServerCompletionQueue* const notification_cq_; |
| 217 | void* const tag_; |
| 218 | const bool delete_on_finalize_; |
| 219 | grpc_call* call_; |
| 220 | internal::Call call_wrapper_; |
| 221 | internal::InterceptorBatchMethodsImpl interceptor_methods_; |
| 222 | bool done_intercepting_; |
| 223 | }; |
| 224 | |
| 225 | /// RegisteredAsyncRequest is not part of the C++ API |
| 226 | class RegisteredAsyncRequest : public BaseAsyncRequest { |
| 227 | public: |
| 228 | RegisteredAsyncRequest(ServerInterface* server, |
| 229 | ::grpc_impl::ServerContext* context, |
| 230 | internal::ServerAsyncStreamingInterface* stream, |
| 231 | ::grpc_impl::CompletionQueue* call_cq, |
| 232 | ::grpc_impl::ServerCompletionQueue* notification_cq, |
| 233 | void* tag, const char* name, |
| 234 | internal::RpcMethod::RpcType type); |
| 235 | |
| 236 | virtual bool FinalizeResult(void** tag, bool* status) override { |
| 237 | /* If we are done intercepting, then there is nothing more for us to do */ |
| 238 | if (done_intercepting_) { |
| 239 | return BaseAsyncRequest::FinalizeResult(tag, status); |
| 240 | } |
| 241 | call_wrapper_ = ::grpc::internal::Call( |
| 242 | call_, server_, call_cq_, server_->max_receive_message_size(), |
| 243 | context_->set_server_rpc_info(method: name_, type: type_, |
| 244 | creators: *server_->interceptor_creators())); |
| 245 | return BaseAsyncRequest::FinalizeResult(tag, status); |
| 246 | } |
| 247 | |
| 248 | protected: |
| 249 | void IssueRequest(void* registered_method, grpc_byte_buffer** payload, |
| 250 | ::grpc_impl::ServerCompletionQueue* notification_cq); |
| 251 | const char* name_; |
| 252 | const internal::RpcMethod::RpcType type_; |
| 253 | }; |
| 254 | |
| 255 | class NoPayloadAsyncRequest final : public RegisteredAsyncRequest { |
| 256 | public: |
| 257 | NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method, |
| 258 | ServerInterface* server, |
| 259 | ::grpc_impl::ServerContext* context, |
| 260 | internal::ServerAsyncStreamingInterface* stream, |
| 261 | ::grpc_impl::CompletionQueue* call_cq, |
| 262 | ::grpc_impl::ServerCompletionQueue* notification_cq, |
| 263 | void* tag) |
| 264 | : RegisteredAsyncRequest( |
| 265 | server, context, stream, call_cq, notification_cq, tag, |
| 266 | registered_method->name(), registered_method->method_type()) { |
| 267 | IssueRequest(registered_method: registered_method->server_tag(), payload: nullptr, notification_cq); |
| 268 | } |
| 269 | |
| 270 | // uses RegisteredAsyncRequest::FinalizeResult |
| 271 | }; |
| 272 | |
| 273 | template <class Message> |
| 274 | class PayloadAsyncRequest final : public RegisteredAsyncRequest { |
| 275 | public: |
| 276 | PayloadAsyncRequest(internal::RpcServiceMethod* registered_method, |
| 277 | ServerInterface* server, |
| 278 | ::grpc_impl::ServerContext* context, |
| 279 | internal::ServerAsyncStreamingInterface* stream, |
| 280 | ::grpc_impl::CompletionQueue* call_cq, |
| 281 | ::grpc_impl::ServerCompletionQueue* notification_cq, |
| 282 | void* tag, Message* request) |
| 283 | : RegisteredAsyncRequest( |
| 284 | server, context, stream, call_cq, notification_cq, tag, |
| 285 | registered_method->name(), registered_method->method_type()), |
| 286 | registered_method_(registered_method), |
| 287 | request_(request) { |
| 288 | IssueRequest(registered_method: registered_method->server_tag(), payload: payload_.bbuf_ptr(), |
| 289 | notification_cq); |
| 290 | } |
| 291 | |
| 292 | ~PayloadAsyncRequest() { |
| 293 | payload_.Release(); // We do not own the payload_ |
| 294 | } |
| 295 | |
| 296 | bool FinalizeResult(void** tag, bool* status) override { |
| 297 | /* If we are done intercepting, then there is nothing more for us to do */ |
| 298 | if (done_intercepting_) { |
| 299 | return RegisteredAsyncRequest::FinalizeResult(tag, status); |
| 300 | } |
| 301 | if (*status) { |
| 302 | if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize( |
| 303 | payload_.bbuf_ptr(), request_) |
| 304 | .ok()) { |
| 305 | // If deserialization fails, we cancel the call and instantiate |
| 306 | // a new instance of ourselves to request another call. We then |
| 307 | // return false, which prevents the call from being returned to |
| 308 | // the application. |
| 309 | g_core_codegen_interface->grpc_call_cancel_with_status( |
| 310 | call: call_, status: GRPC_STATUS_INTERNAL, description: "Unable to parse request" , reserved: nullptr); |
| 311 | g_core_codegen_interface->grpc_call_unref(call: call_); |
| 312 | new PayloadAsyncRequest(registered_method_, server_, context_, |
| 313 | stream_, call_cq_, notification_cq_, tag_, |
| 314 | request_); |
| 315 | delete this; |
| 316 | return false; |
| 317 | } |
| 318 | } |
| 319 | /* Set interception point for recv message */ |
| 320 | interceptor_methods_.AddInterceptionHookPoint( |
| 321 | type: experimental::InterceptionHookPoints::POST_RECV_MESSAGE); |
| 322 | interceptor_methods_.SetRecvMessage(message: request_, hijacked_recv_message_failed: nullptr); |
| 323 | return RegisteredAsyncRequest::FinalizeResult(tag, status); |
| 324 | } |
| 325 | |
| 326 | private: |
| 327 | internal::RpcServiceMethod* const registered_method_; |
| 328 | Message* const request_; |
| 329 | ByteBuffer payload_; |
| 330 | }; |
| 331 | |
| 332 | class GenericAsyncRequest : public BaseAsyncRequest { |
| 333 | public: |
| 334 | GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, |
| 335 | internal::ServerAsyncStreamingInterface* stream, |
| 336 | ::grpc_impl::CompletionQueue* call_cq, |
| 337 | ::grpc_impl::ServerCompletionQueue* notification_cq, |
| 338 | void* tag, bool delete_on_finalize); |
| 339 | |
| 340 | bool FinalizeResult(void** tag, bool* status) override; |
| 341 | |
| 342 | private: |
| 343 | grpc_call_details call_details_; |
| 344 | }; |
| 345 | |
| 346 | template <class Message> |
| 347 | void RequestAsyncCall(internal::RpcServiceMethod* method, |
| 348 | ::grpc_impl::ServerContext* context, |
| 349 | internal::ServerAsyncStreamingInterface* stream, |
| 350 | ::grpc_impl::CompletionQueue* call_cq, |
| 351 | ::grpc_impl::ServerCompletionQueue* notification_cq, |
| 352 | void* tag, Message* message) { |
| 353 | GPR_CODEGEN_ASSERT(method); |
| 354 | new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq, |
| 355 | notification_cq, tag, message); |
| 356 | } |
| 357 | |
| 358 | void RequestAsyncCall(internal::RpcServiceMethod* method, |
| 359 | ::grpc_impl::ServerContext* context, |
| 360 | internal::ServerAsyncStreamingInterface* stream, |
| 361 | ::grpc_impl::CompletionQueue* call_cq, |
| 362 | ::grpc_impl::ServerCompletionQueue* notification_cq, |
| 363 | void* tag) { |
| 364 | GPR_CODEGEN_ASSERT(method); |
| 365 | new NoPayloadAsyncRequest(method, this, context, stream, call_cq, |
| 366 | notification_cq, tag); |
| 367 | } |
| 368 | |
| 369 | void RequestAsyncGenericCall( |
| 370 | GenericServerContext* context, |
| 371 | internal::ServerAsyncStreamingInterface* stream, |
| 372 | ::grpc_impl::CompletionQueue* call_cq, |
| 373 | ::grpc_impl::ServerCompletionQueue* notification_cq, void* tag) { |
| 374 | new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, |
| 375 | tag, true); |
| 376 | } |
| 377 | |
| 378 | private: |
| 379 | // EXPERIMENTAL |
| 380 | // Getter method for the vector of interceptor factory objects. |
| 381 | // Returns a nullptr (rather than being pure) since this is a post-1.0 method |
| 382 | // and adding a new pure method to an interface would be a breaking change |
| 383 | // (even though this is private and non-API) |
| 384 | virtual std::vector< |
| 385 | std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* |
| 386 | interceptor_creators() { |
| 387 | return nullptr; |
| 388 | } |
| 389 | |
| 390 | // EXPERIMENTAL |
| 391 | // A method to get the callbackable completion queue associated with this |
| 392 | // server. If the return value is nullptr, this server doesn't support |
| 393 | // callback operations. |
| 394 | // TODO(vjpai): Consider a better default like using a global CQ |
| 395 | // Returns nullptr (rather than being pure) since this is a post-1.0 method |
| 396 | // and adding a new pure method to an interface would be a breaking change |
| 397 | // (even though this is private and non-API) |
| 398 | virtual ::grpc_impl::CompletionQueue* CallbackCQ() { return nullptr; } |
| 399 | }; |
| 400 | |
| 401 | } // namespace grpc |
| 402 | |
| 403 | #endif // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H |
| 404 | |