| 1 | /* |
| 2 | * |
| 3 | * Copyright 2015 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| 19 | #ifndef GRPCPP_SERVER_IMPL_H |
| 20 | #define GRPCPP_SERVER_IMPL_H |
| 21 | |
#include <atomic>
#include <list>
#include <memory>
#include <utility>
#include <vector>

#include <grpc/impl/codegen/port_platform.h>

#include <grpc/compression.h>
#include <grpc/support/atm.h>
#include <grpcpp/channel_impl.h>
#include <grpcpp/completion_queue_impl.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/codegen/client_interceptor.h>
#include <grpcpp/impl/codegen/completion_queue_impl.h>
#include <grpcpp/impl/codegen/grpc_library.h>
#include <grpcpp/impl/codegen/server_interface.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/support/channel_arguments_impl.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/status.h>
| 43 | |
// Forward declaration of the core server type. This header only handles it
// through pointers (see Server::c_server() and Server::server()).
struct grpc_server;

namespace grpc {

namespace internal {
// Only referenced through std::shared_ptr in this header.
class ExternalConnectionAcceptorImpl;
}  // namespace internal

// Befriended by grpc_impl::Server and taken by pointer in
// Server::RegisterAsyncGenericService().
class AsyncGenericService;

}  // namespace grpc
| 54 | |
| 55 | namespace grpc_impl { |
// Forward declarations of grpc_impl types that this header only names
// (pointer parameters, smart-pointer members and friend declarations).
class HealthCheckServiceInterface;
// Server befriends this class through a qualified name, which requires the
// class to be declared beforehand; declare it here instead of relying on a
// transitive include.
class ServerBuilder;
class ServerContext;
class ServerInitializer;
| 59 | |
| 60 | /// Represents a gRPC server. |
| 61 | /// |
| 62 | /// Use a \a grpc::ServerBuilder to create, configure, and start |
| 63 | /// \a Server instances. |
| 64 | class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { |
| 65 | public: |
| 66 | ~Server(); |
| 67 | |
| 68 | /// Block until the server shuts down. |
| 69 | /// |
| 70 | /// \warning The server must be either shutting down or some other thread must |
| 71 | /// call \a Shutdown for this function to ever return. |
| 72 | void Wait() override; |
| 73 | |
| 74 | /// Global callbacks are a set of hooks that are called when server |
| 75 | /// events occur. \a SetGlobalCallbacks method is used to register |
| 76 | /// the hooks with gRPC. Note that |
| 77 | /// the \a GlobalCallbacks instance will be shared among all |
| 78 | /// \a Server instances in an application and can be set exactly |
| 79 | /// once per application. |
| 80 | class GlobalCallbacks { |
| 81 | public: |
| 82 | virtual ~GlobalCallbacks() {} |
| 83 | /// Called before server is created. |
| 84 | virtual void UpdateArguments(ChannelArguments* /*args*/) {} |
| 85 | /// Called before application callback for each synchronous server request |
| 86 | virtual void PreSynchronousRequest(grpc_impl::ServerContext* context) = 0; |
| 87 | /// Called after application callback for each synchronous server request |
| 88 | virtual void PostSynchronousRequest(grpc_impl::ServerContext* context) = 0; |
| 89 | /// Called before server is started. |
| 90 | virtual void PreServerStart(Server* /*server*/) {} |
| 91 | /// Called after a server port is added. |
| 92 | virtual void AddPort(Server* /*server*/, const grpc::string& /*addr*/, |
| 93 | grpc::ServerCredentials* /*creds*/, int /*port*/) {} |
| 94 | }; |
| 95 | /// Set the global callback object. Can only be called once per application. |
| 96 | /// Does not take ownership of callbacks, and expects the pointed to object |
| 97 | /// to be alive until all server objects in the process have been destroyed. |
| 98 | /// The same \a GlobalCallbacks object will be used throughout the |
| 99 | /// application and is shared among all \a Server objects. |
| 100 | static void SetGlobalCallbacks(GlobalCallbacks* callbacks); |
| 101 | |
| 102 | /// Returns a \em raw pointer to the underlying \a grpc_server instance. |
| 103 | /// EXPERIMENTAL: for internal/test use only |
| 104 | grpc_server* c_server(); |
| 105 | |
| 106 | /// Returns the health check service. |
| 107 | grpc::HealthCheckServiceInterface* GetHealthCheckService() const { |
| 108 | return health_check_service_.get(); |
| 109 | } |
| 110 | |
| 111 | /// Establish a channel for in-process communication |
| 112 | std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args); |
| 113 | |
| 114 | /// NOTE: class experimental_type is not part of the public API of this class. |
| 115 | /// TODO(yashykt): Integrate into public API when this is no longer |
| 116 | /// experimental. |
| 117 | class experimental_type { |
| 118 | public: |
| 119 | explicit experimental_type(Server* server) : server_(server) {} |
| 120 | |
| 121 | /// Establish a channel for in-process communication with client |
| 122 | /// interceptors |
| 123 | std::shared_ptr<Channel> InProcessChannelWithInterceptors( |
| 124 | const ChannelArguments& args, |
| 125 | std::vector<std::unique_ptr< |
| 126 | grpc::experimental::ClientInterceptorFactoryInterface>> |
| 127 | interceptor_creators); |
| 128 | |
| 129 | private: |
| 130 | Server* server_; |
| 131 | }; |
| 132 | |
| 133 | /// NOTE: The function experimental() is not stable public API. It is a view |
| 134 | /// to the experimental components of this class. It may be changed or removed |
| 135 | /// at any time. |
| 136 | experimental_type experimental() { return experimental_type(this); } |
| 137 | |
| 138 | protected: |
| 139 | /// Register a service. This call does not take ownership of the service. |
| 140 | /// The service must exist for the lifetime of the Server instance. |
| 141 | bool RegisterService(const grpc::string* host, |
| 142 | grpc::Service* service) override; |
| 143 | |
| 144 | /// Try binding the server to the given \a addr endpoint |
| 145 | /// (port, and optionally including IP address to bind to). |
| 146 | /// |
| 147 | /// It can be invoked multiple times. Should be used before |
| 148 | /// starting the server. |
| 149 | /// |
| 150 | /// \param addr The address to try to bind to the server (eg, localhost:1234, |
| 151 | /// 192.168.1.1:31416, [::1]:27182, etc.). |
| 152 | /// \param creds The credentials associated with the server. |
| 153 | /// |
| 154 | /// \return bound port number on success, 0 on failure. |
| 155 | /// |
| 156 | /// \warning It is an error to call this method on an already started server. |
| 157 | int AddListeningPort(const grpc::string& addr, |
| 158 | grpc::ServerCredentials* creds) override; |
| 159 | |
| 160 | /// NOTE: This is *NOT* a public API. The server constructors are supposed to |
| 161 | /// be used by \a ServerBuilder class only. The constructor will be made |
| 162 | /// 'private' very soon. |
| 163 | /// |
| 164 | /// Server constructors. To be used by \a ServerBuilder only. |
| 165 | /// |
| 166 | /// \param args The channel args |
| 167 | /// |
| 168 | /// \param sync_server_cqs The completion queues to use if the server is a |
| 169 | /// synchronous server (or a hybrid server). The server polls for new RPCs on |
| 170 | /// these queues |
| 171 | /// |
| 172 | /// \param min_pollers The minimum number of polling threads per server |
| 173 | /// completion queue (in param sync_server_cqs) to use for listening to |
| 174 | /// incoming requests (used only in case of sync server) |
| 175 | /// |
| 176 | /// \param max_pollers The maximum number of polling threads per server |
| 177 | /// completion queue (in param sync_server_cqs) to use for listening to |
| 178 | /// incoming requests (used only in case of sync server) |
| 179 | /// |
| 180 | /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on |
| 181 | /// server completion queues passed via sync_server_cqs param. |
| 182 | Server(ChannelArguments* args, |
| 183 | std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> |
| 184 | sync_server_cqs, |
| 185 | int min_pollers, int max_pollers, int sync_cq_timeout_msec, |
| 186 | std::vector< |
| 187 | std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> |
| 188 | acceptors, |
| 189 | grpc_resource_quota* server_rq = nullptr, |
| 190 | std::vector<std::unique_ptr< |
| 191 | grpc::experimental::ServerInterceptorFactoryInterface>> |
| 192 | interceptor_creators = std::vector<std::unique_ptr< |
| 193 | grpc::experimental::ServerInterceptorFactoryInterface>>()); |
| 194 | |
| 195 | /// Start the server. |
| 196 | /// |
| 197 | /// \param cqs Completion queues for handling asynchronous services. The |
| 198 | /// caller is required to keep all completion queues live until the server is |
| 199 | /// destroyed. |
| 200 | /// \param num_cqs How many completion queues does \a cqs hold. |
| 201 | void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; |
| 202 | |
| 203 | grpc_server* server() override { return server_; } |
| 204 | |
| 205 | protected: |
| 206 | /// NOTE: This method is not part of the public API for this class. |
| 207 | void set_health_check_service( |
| 208 | std::unique_ptr<grpc::HealthCheckServiceInterface> service) { |
| 209 | health_check_service_ = std::move(service); |
| 210 | } |
| 211 | |
| 212 | /// NOTE: This method is not part of the public API for this class. |
| 213 | bool health_check_service_disabled() const { |
| 214 | return health_check_service_disabled_; |
| 215 | } |
| 216 | |
| 217 | private: |
| 218 | std::vector< |
| 219 | std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>* |
| 220 | interceptor_creators() override { |
| 221 | return &interceptor_creators_; |
| 222 | } |
| 223 | |
| 224 | friend class grpc::AsyncGenericService; |
| 225 | friend class grpc_impl::ServerBuilder; |
| 226 | friend class grpc_impl::ServerInitializer; |
| 227 | |
| 228 | class SyncRequest; |
| 229 | class CallbackRequestBase; |
| 230 | template <class ServerContextType> |
| 231 | class CallbackRequest; |
| 232 | class UnimplementedAsyncRequest; |
| 233 | class UnimplementedAsyncResponse; |
| 234 | |
| 235 | /// SyncRequestThreadManager is an implementation of ThreadManager. This class |
| 236 | /// is responsible for polling for incoming RPCs and calling the RPC handlers. |
| 237 | /// This is only used in case of a Sync server (i.e a server exposing a sync |
| 238 | /// interface) |
| 239 | class SyncRequestThreadManager; |
| 240 | |
| 241 | /// Register a generic service. This call does not take ownership of the |
| 242 | /// service. The service must exist for the lifetime of the Server instance. |
| 243 | void RegisterAsyncGenericService(grpc::AsyncGenericService* service) override; |
| 244 | |
| 245 | #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL |
| 246 | /// Register a callback-based generic service. This call does not take |
| 247 | /// ownership of theservice. The service must exist for the lifetime of the |
| 248 | /// Server instance. |
| 249 | void RegisterCallbackGenericService( |
| 250 | grpc::CallbackGenericService* service) override; |
| 251 | #else |
| 252 | /// NOTE: class experimental_registration_type is not part of the public API |
| 253 | /// of this class |
| 254 | /// TODO(vjpai): Move these contents to the public API of Server when |
| 255 | /// they are no longer experimental |
| 256 | class experimental_registration_type final |
| 257 | : public experimental_registration_interface { |
| 258 | public: |
| 259 | explicit experimental_registration_type(Server* server) : server_(server) {} |
| 260 | void RegisterCallbackGenericService( |
| 261 | grpc::experimental::CallbackGenericService* service) override { |
| 262 | server_->RegisterCallbackGenericService(service); |
| 263 | } |
| 264 | |
| 265 | private: |
| 266 | Server* server_; |
| 267 | }; |
| 268 | |
| 269 | /// TODO(vjpai): Mark this override when experimental type above is deleted |
| 270 | void RegisterCallbackGenericService( |
| 271 | grpc::experimental::CallbackGenericService* service); |
| 272 | |
| 273 | /// NOTE: The function experimental_registration() is not stable public API. |
| 274 | /// It is a view to the experimental components of this class. It may be |
| 275 | /// changed or removed at any time. |
| 276 | experimental_registration_interface* experimental_registration() override { |
| 277 | return &experimental_registration_; |
| 278 | } |
| 279 | #endif |
| 280 | |
| 281 | void PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, |
| 282 | grpc::internal::Call* call) override; |
| 283 | |
| 284 | void ShutdownInternal(gpr_timespec deadline) override; |
| 285 | |
| 286 | int max_receive_message_size() const override { |
| 287 | return max_receive_message_size_; |
| 288 | } |
| 289 | |
| 290 | CompletionQueue* CallbackCQ() override; |
| 291 | |
| 292 | grpc_impl::ServerInitializer* initializer(); |
| 293 | |
| 294 | // Functions to manage the server shutdown ref count. Things that increase |
| 295 | // the ref count are the running state of the server (take a ref at start and |
| 296 | // drop it at shutdown) and each running callback RPC. |
| 297 | void Ref(); |
| 298 | void UnrefWithPossibleNotify() /* LOCKS_EXCLUDED(mu_) */; |
| 299 | void UnrefAndWaitLocked() /* EXCLUSIVE_LOCKS_REQUIRED(mu_) */; |
| 300 | |
| 301 | std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> |
| 302 | acceptors_; |
| 303 | |
| 304 | // A vector of interceptor factory objects. |
| 305 | // This should be destroyed after health_check_service_ and this requirement |
| 306 | // is satisfied by declaring interceptor_creators_ before |
| 307 | // health_check_service_. (C++ mandates that member objects be destroyed in |
| 308 | // the reverse order of initialization.) |
| 309 | std::vector< |
| 310 | std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> |
| 311 | interceptor_creators_; |
| 312 | |
| 313 | int max_receive_message_size_; |
| 314 | |
| 315 | /// The following completion queues are ONLY used in case of Sync API |
| 316 | /// i.e. if the server has any services with sync methods. The server uses |
| 317 | /// these completion queues to poll for new RPCs |
| 318 | std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> |
| 319 | sync_server_cqs_; |
| 320 | |
| 321 | /// List of \a ThreadManager instances (one for each cq in |
| 322 | /// the \a sync_server_cqs) |
| 323 | std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; |
| 324 | |
| 325 | #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL |
| 326 | // For registering experimental callback generic service; remove when that |
| 327 | // method longer experimental |
| 328 | experimental_registration_type experimental_registration_{this}; |
| 329 | #endif |
| 330 | |
| 331 | // Server status |
| 332 | grpc::internal::Mutex mu_; |
| 333 | bool started_; |
| 334 | bool shutdown_; |
| 335 | bool shutdown_notified_; // Was notify called on the shutdown_cv_ |
| 336 | grpc::internal::CondVar shutdown_done_cv_; |
| 337 | bool shutdown_done_ = false; |
| 338 | std::atomic_int shutdown_refs_outstanding_{1}; |
| 339 | |
| 340 | grpc::internal::CondVar shutdown_cv_; |
| 341 | |
| 342 | std::shared_ptr<GlobalCallbacks> global_callbacks_; |
| 343 | |
| 344 | std::vector<grpc::string> services_; |
| 345 | bool has_async_generic_service_ = false; |
| 346 | bool has_callback_generic_service_ = false; |
| 347 | bool has_callback_methods_ = false; |
| 348 | |
| 349 | // Pointer to the wrapped grpc_server. |
| 350 | grpc_server* server_; |
| 351 | |
| 352 | std::unique_ptr<grpc_impl::ServerInitializer> server_initializer_; |
| 353 | |
| 354 | std::unique_ptr<grpc::HealthCheckServiceInterface> health_check_service_; |
| 355 | bool health_check_service_disabled_; |
| 356 | |
| 357 | // When appropriate, use a default callback generic service to handle |
| 358 | // unimplemented methods |
| 359 | #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL |
| 360 | std::unique_ptr<grpc::CallbackGenericService> unimplemented_service_; |
| 361 | #else |
| 362 | std::unique_ptr<grpc::experimental::CallbackGenericService> |
| 363 | unimplemented_service_; |
| 364 | #endif |
| 365 | |
| 366 | // A special handler for resource exhausted in sync case |
| 367 | std::unique_ptr<grpc::internal::MethodHandler> resource_exhausted_handler_; |
| 368 | |
| 369 | // Handler for callback generic service, if any |
| 370 | std::unique_ptr<grpc::internal::MethodHandler> generic_handler_; |
| 371 | |
| 372 | // callback_cq_ references the callbackable completion queue associated |
| 373 | // with this server (if any). It is set on the first call to CallbackCQ(). |
| 374 | // It is _not owned_ by the server; ownership belongs with its internal |
| 375 | // shutdown callback tag (invoked when the CQ is fully shutdown). |
| 376 | CompletionQueue* callback_cq_ /* GUARDED_BY(mu_) */ = nullptr; |
| 377 | |
| 378 | // List of CQs passed in by user that must be Shutdown only after Server is |
| 379 | // Shutdown. Even though this is only used with NDEBUG, instantiate it in all |
| 380 | // cases since otherwise the size will be inconsistent. |
| 381 | std::vector<CompletionQueue*> cq_list_; |
| 382 | }; |
| 383 | |
| 384 | } // namespace grpc_impl |
| 385 | |
| 386 | #endif // GRPCPP_SERVER_IMPL_H |
| 387 | |