| 1 | /* |
| 2 | * |
| 3 | * Copyright 2015 gRPC authors. |
| 4 | * |
| 5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | * you may not use this file except in compliance with the License. |
| 7 | * You may obtain a copy of the License at |
| 8 | * |
| 9 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | * |
| 11 | * Unless required by applicable law or agreed to in writing, software |
| 12 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | * See the License for the specific language governing permissions and |
| 15 | * limitations under the License. |
| 16 | * |
| 17 | */ |
| 18 | |
| 19 | #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H |
| 20 | #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H |
| 21 | |
| 22 | // IWYU pragma: private |
| 23 | |
| 24 | #include <grpc/impl/codegen/port_platform.h> |
| 25 | |
| 26 | #include <grpc/impl/codegen/grpc_types.h> |
| 27 | #include <grpcpp/impl/codegen/call.h> |
| 28 | #include <grpcpp/impl/codegen/call_hook.h> |
| 29 | #include <grpcpp/impl/codegen/completion_queue_tag.h> |
| 30 | #include <grpcpp/impl/codegen/core_codegen_interface.h> |
| 31 | #include <grpcpp/impl/codegen/interceptor_common.h> |
| 32 | #include <grpcpp/impl/codegen/rpc_service_method.h> |
| 33 | #include <grpcpp/impl/codegen/server_context.h> |
| 34 | #include <grpcpp/support/byte_buffer.h> |
| 35 | |
| 36 | namespace grpc { |
| 37 | |
| 38 | class AsyncGenericService; |
| 39 | class Channel; |
| 40 | class CompletionQueue; |
| 41 | class GenericServerContext; |
| 42 | class ServerCompletionQueue; |
| 43 | class ServerCredentials; |
| 44 | class Service; |
| 45 | |
| 46 | extern CoreCodegenInterface* g_core_codegen_interface; |
| 47 | |
| 48 | /// Models a gRPC server. |
| 49 | /// |
| 50 | /// Servers are configured and started via \a grpc::ServerBuilder. |
| 51 | namespace internal { |
| 52 | class ServerAsyncStreamingInterface; |
| 53 | } // namespace internal |
| 54 | |
| 55 | class CallbackGenericService; |
| 56 | |
| 57 | namespace experimental { |
| 58 | class ServerInterceptorFactoryInterface; |
| 59 | } // namespace experimental |
| 60 | |
| 61 | class ServerInterface : public internal::CallHook { |
| 62 | public: |
| 63 | ~ServerInterface() override {} |
| 64 | |
| 65 | /// \a Shutdown does the following things: |
| 66 | /// |
| 67 | /// 1. Shutdown the server: deactivate all listening ports, mark it in |
| 68 | /// "shutdown mode" so that further call Request's or incoming RPC matches |
| 69 | /// are no longer allowed. Also return all Request'ed-but-not-yet-active |
| 70 | /// calls as failed (!ok). This refers to calls that have been requested |
| 71 | /// at the server by the server-side library or application code but that |
| 72 | /// have not yet been matched to incoming RPCs from the client. Note that |
| 73 | /// this would even include default calls added automatically by the gRPC |
| 74 | /// C++ API without the user's input (e.g., "Unimplemented RPC method") |
| 75 | /// |
| 76 | /// 2. Block until all rpc method handlers invoked automatically by the sync |
| 77 | /// API finish. |
| 78 | /// |
| 79 | /// 3. If all pending calls complete (and all their operations are |
| 80 | /// retrieved by Next) before \a deadline expires, this finishes |
| 81 | /// gracefully. Otherwise, forcefully cancel all pending calls associated |
| 82 | /// with the server after \a deadline expires. In the case of the sync API, |
| 83 | /// if the RPC function for a streaming call has already been started and |
| 84 | /// takes a week to complete, the RPC function won't be forcefully |
| 85 | /// terminated (since that would leave state corrupt and incomplete) and |
| 86 | /// the method handler will just keep running (which will prevent the |
| 87 | /// server from completing the "join" operation that it needs to do at |
| 88 | /// shutdown time). |
| 89 | /// |
| 90 | /// All completion queue associated with the server (for example, for async |
| 91 | /// serving) must be shutdown *after* this method has returned: |
| 92 | /// See \a ServerBuilder::AddCompletionQueue for details. |
| 93 | /// They must also be drained (by repeated Next) after being shutdown. |
| 94 | /// |
| 95 | /// \param deadline How long to wait until pending rpcs are forcefully |
| 96 | /// terminated. |
| 97 | template <class T> |
| 98 | void Shutdown(const T& deadline) { |
| 99 | ShutdownInternal(deadline: TimePoint<T>(deadline).raw_time()); |
| 100 | } |
| 101 | |
| 102 | /// Shutdown the server without a deadline and forced cancellation. |
| 103 | /// |
| 104 | /// All completion queue associated with the server (for example, for async |
| 105 | /// serving) must be shutdown *after* this method has returned: |
| 106 | /// See \a ServerBuilder::AddCompletionQueue for details. |
| 107 | void Shutdown() { |
| 108 | ShutdownInternal( |
| 109 | deadline: g_core_codegen_interface->gpr_inf_future(type: GPR_CLOCK_MONOTONIC)); |
| 110 | } |
| 111 | |
| 112 | /// Block waiting for all work to complete. |
| 113 | /// |
| 114 | /// \warning The server must be either shutting down or some other thread must |
| 115 | /// call \a Shutdown for this function to ever return. |
| 116 | virtual void Wait() = 0; |
| 117 | |
| 118 | protected: |
| 119 | friend class grpc::Service; |
| 120 | |
| 121 | /// Register a service. This call does not take ownership of the service. |
| 122 | /// The service must exist for the lifetime of the Server instance. |
| 123 | virtual bool RegisterService(const std::string* host, Service* service) = 0; |
| 124 | |
| 125 | /// Register a generic service. This call does not take ownership of the |
| 126 | /// service. The service must exist for the lifetime of the Server instance. |
| 127 | virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0; |
| 128 | |
| 129 | /// Register a callback generic service. This call does not take ownership of |
| 130 | /// the service. The service must exist for the lifetime of the Server |
| 131 | /// instance. May not be abstract since this is a post-1.0 API addition. |
| 132 | |
| 133 | virtual void RegisterCallbackGenericService(CallbackGenericService* |
| 134 | /*service*/) {} |
| 135 | |
| 136 | /// Tries to bind \a server to the given \a addr. |
| 137 | /// |
| 138 | /// It can be invoked multiple times. |
| 139 | /// |
| 140 | /// \param addr The address to try to bind to the server (eg, localhost:1234, |
| 141 | /// 192.168.1.1:31416, [::1]:27182, etc.). |
| 142 | /// \params creds The credentials associated with the server. |
| 143 | /// |
| 144 | /// \return bound port number on success, 0 on failure. |
| 145 | /// |
| 146 | /// \warning It's an error to call this method on an already started server. |
| 147 | virtual int AddListeningPort(const std::string& addr, |
| 148 | ServerCredentials* creds) = 0; |
| 149 | |
| 150 | /// Start the server. |
| 151 | /// |
| 152 | /// \param cqs Completion queues for handling asynchronous services. The |
| 153 | /// caller is required to keep all completion queues live until the server is |
| 154 | /// destroyed. |
| 155 | /// \param num_cqs How many completion queues does \a cqs hold. |
| 156 | virtual void Start(grpc::ServerCompletionQueue** cqs, size_t num_cqs) = 0; |
| 157 | |
| 158 | virtual void ShutdownInternal(gpr_timespec deadline) = 0; |
| 159 | |
| 160 | virtual int max_receive_message_size() const = 0; |
| 161 | |
| 162 | virtual grpc_server* server() = 0; |
| 163 | |
| 164 | void PerformOpsOnCall(internal::CallOpSetInterface* ops, |
| 165 | internal::Call* call) override = 0; |
| 166 | |
| 167 | class BaseAsyncRequest : public internal::CompletionQueueTag { |
| 168 | public: |
| 169 | BaseAsyncRequest(ServerInterface* server, grpc::ServerContext* context, |
| 170 | internal::ServerAsyncStreamingInterface* stream, |
| 171 | grpc::CompletionQueue* call_cq, |
| 172 | grpc::ServerCompletionQueue* notification_cq, void* tag, |
| 173 | bool delete_on_finalize); |
| 174 | ~BaseAsyncRequest() override; |
| 175 | |
| 176 | bool FinalizeResult(void** tag, bool* status) override; |
| 177 | |
| 178 | private: |
| 179 | void ContinueFinalizeResultAfterInterception(); |
| 180 | |
| 181 | protected: |
| 182 | ServerInterface* const server_; |
| 183 | grpc::ServerContext* const context_; |
| 184 | internal::ServerAsyncStreamingInterface* const stream_; |
| 185 | grpc::CompletionQueue* const call_cq_; |
| 186 | grpc::ServerCompletionQueue* const notification_cq_; |
| 187 | void* const tag_; |
| 188 | const bool delete_on_finalize_; |
| 189 | grpc_call* call_; |
| 190 | internal::Call call_wrapper_; |
| 191 | internal::InterceptorBatchMethodsImpl interceptor_methods_; |
| 192 | bool done_intercepting_; |
| 193 | }; |
| 194 | |
| 195 | /// RegisteredAsyncRequest is not part of the C++ API |
| 196 | class RegisteredAsyncRequest : public BaseAsyncRequest { |
| 197 | public: |
| 198 | RegisteredAsyncRequest(ServerInterface* server, |
| 199 | grpc::ServerContext* context, |
| 200 | internal::ServerAsyncStreamingInterface* stream, |
| 201 | grpc::CompletionQueue* call_cq, |
| 202 | grpc::ServerCompletionQueue* notification_cq, |
| 203 | void* tag, const char* name, |
| 204 | internal::RpcMethod::RpcType type); |
| 205 | |
| 206 | bool FinalizeResult(void** tag, bool* status) override { |
| 207 | /* If we are done intercepting, then there is nothing more for us to do */ |
| 208 | if (done_intercepting_) { |
| 209 | return BaseAsyncRequest::FinalizeResult(tag, status); |
| 210 | } |
| 211 | call_wrapper_ = grpc::internal::Call( |
| 212 | call_, server_, call_cq_, server_->max_receive_message_size(), |
| 213 | context_->set_server_rpc_info(method: name_, type: type_, |
| 214 | creators: *server_->interceptor_creators())); |
| 215 | return BaseAsyncRequest::FinalizeResult(tag, status); |
| 216 | } |
| 217 | |
| 218 | protected: |
| 219 | void IssueRequest(void* registered_method, grpc_byte_buffer** payload, |
| 220 | grpc::ServerCompletionQueue* notification_cq); |
| 221 | const char* name_; |
| 222 | const internal::RpcMethod::RpcType type_; |
| 223 | }; |
| 224 | |
| 225 | class NoPayloadAsyncRequest final : public RegisteredAsyncRequest { |
| 226 | public: |
| 227 | NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method, |
| 228 | ServerInterface* server, grpc::ServerContext* context, |
| 229 | internal::ServerAsyncStreamingInterface* stream, |
| 230 | grpc::CompletionQueue* call_cq, |
| 231 | grpc::ServerCompletionQueue* notification_cq, |
| 232 | void* tag) |
| 233 | : RegisteredAsyncRequest( |
| 234 | server, context, stream, call_cq, notification_cq, tag, |
| 235 | registered_method->name(), registered_method->method_type()) { |
| 236 | IssueRequest(registered_method: registered_method->server_tag(), payload: nullptr, notification_cq); |
| 237 | } |
| 238 | |
| 239 | // uses RegisteredAsyncRequest::FinalizeResult |
| 240 | }; |
| 241 | |
| 242 | template <class Message> |
| 243 | class PayloadAsyncRequest final : public RegisteredAsyncRequest { |
| 244 | public: |
| 245 | PayloadAsyncRequest(internal::RpcServiceMethod* registered_method, |
| 246 | ServerInterface* server, grpc::ServerContext* context, |
| 247 | internal::ServerAsyncStreamingInterface* stream, |
| 248 | grpc::CompletionQueue* call_cq, |
| 249 | grpc::ServerCompletionQueue* notification_cq, void* tag, |
| 250 | Message* request) |
| 251 | : RegisteredAsyncRequest( |
| 252 | server, context, stream, call_cq, notification_cq, tag, |
| 253 | registered_method->name(), registered_method->method_type()), |
| 254 | registered_method_(registered_method), |
| 255 | request_(request) { |
| 256 | IssueRequest(registered_method: registered_method->server_tag(), payload: payload_.bbuf_ptr(), |
| 257 | notification_cq); |
| 258 | } |
| 259 | |
| 260 | ~PayloadAsyncRequest() override { |
| 261 | payload_.Release(); // We do not own the payload_ |
| 262 | } |
| 263 | |
| 264 | bool FinalizeResult(void** tag, bool* status) override { |
| 265 | /* If we are done intercepting, then there is nothing more for us to do */ |
| 266 | if (done_intercepting_) { |
| 267 | return RegisteredAsyncRequest::FinalizeResult(tag, status); |
| 268 | } |
| 269 | if (*status) { |
| 270 | if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize( |
| 271 | payload_.bbuf_ptr(), request_) |
| 272 | .ok()) { |
| 273 | // If deserialization fails, we cancel the call and instantiate |
| 274 | // a new instance of ourselves to request another call. We then |
| 275 | // return false, which prevents the call from being returned to |
| 276 | // the application. |
| 277 | g_core_codegen_interface->grpc_call_cancel_with_status( |
| 278 | call: call_, status: GRPC_STATUS_INTERNAL, description: "Unable to parse request" , reserved: nullptr); |
| 279 | g_core_codegen_interface->grpc_call_unref(call: call_); |
| 280 | new PayloadAsyncRequest(registered_method_, server_, context_, |
| 281 | stream_, call_cq_, notification_cq_, tag_, |
| 282 | request_); |
| 283 | delete this; |
| 284 | return false; |
| 285 | } |
| 286 | } |
| 287 | /* Set interception point for recv message */ |
| 288 | interceptor_methods_.AddInterceptionHookPoint( |
| 289 | type: experimental::InterceptionHookPoints::POST_RECV_MESSAGE); |
| 290 | interceptor_methods_.SetRecvMessage(message: request_, hijacked_recv_message_failed: nullptr); |
| 291 | return RegisteredAsyncRequest::FinalizeResult(tag, status); |
| 292 | } |
| 293 | |
| 294 | private: |
| 295 | internal::RpcServiceMethod* const registered_method_; |
| 296 | Message* const request_; |
| 297 | ByteBuffer payload_; |
| 298 | }; |
| 299 | |
| 300 | class GenericAsyncRequest : public BaseAsyncRequest { |
| 301 | public: |
| 302 | GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, |
| 303 | internal::ServerAsyncStreamingInterface* stream, |
| 304 | grpc::CompletionQueue* call_cq, |
| 305 | grpc::ServerCompletionQueue* notification_cq, void* tag, |
| 306 | bool delete_on_finalize); |
| 307 | |
| 308 | bool FinalizeResult(void** tag, bool* status) override; |
| 309 | |
| 310 | private: |
| 311 | grpc_call_details call_details_; |
| 312 | }; |
| 313 | |
| 314 | template <class Message> |
| 315 | void RequestAsyncCall(internal::RpcServiceMethod* method, |
| 316 | grpc::ServerContext* context, |
| 317 | internal::ServerAsyncStreamingInterface* stream, |
| 318 | grpc::CompletionQueue* call_cq, |
| 319 | grpc::ServerCompletionQueue* notification_cq, void* tag, |
| 320 | Message* message) { |
| 321 | GPR_CODEGEN_ASSERT(method); |
| 322 | new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq, |
| 323 | notification_cq, tag, message); |
| 324 | } |
| 325 | |
| 326 | void RequestAsyncCall(internal::RpcServiceMethod* method, |
| 327 | grpc::ServerContext* context, |
| 328 | internal::ServerAsyncStreamingInterface* stream, |
| 329 | grpc::CompletionQueue* call_cq, |
| 330 | grpc::ServerCompletionQueue* notification_cq, |
| 331 | void* tag) { |
| 332 | GPR_CODEGEN_ASSERT(method); |
| 333 | new NoPayloadAsyncRequest(method, this, context, stream, call_cq, |
| 334 | notification_cq, tag); |
| 335 | } |
| 336 | |
| 337 | void RequestAsyncGenericCall(GenericServerContext* context, |
| 338 | internal::ServerAsyncStreamingInterface* stream, |
| 339 | grpc::CompletionQueue* call_cq, |
| 340 | grpc::ServerCompletionQueue* notification_cq, |
| 341 | void* tag) { |
| 342 | new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, |
| 343 | tag, true); |
| 344 | } |
| 345 | |
| 346 | private: |
| 347 | // EXPERIMENTAL |
| 348 | // Getter method for the vector of interceptor factory objects. |
| 349 | // Returns a nullptr (rather than being pure) since this is a post-1.0 method |
| 350 | // and adding a new pure method to an interface would be a breaking change |
| 351 | // (even though this is private and non-API) |
| 352 | virtual std::vector< |
| 353 | std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* |
| 354 | interceptor_creators() { |
| 355 | return nullptr; |
| 356 | } |
| 357 | |
| 358 | // A method to get the callbackable completion queue associated with this |
| 359 | // server. If the return value is nullptr, this server doesn't support |
| 360 | // callback operations. |
| 361 | // TODO(vjpai): Consider a better default like using a global CQ |
| 362 | // Returns nullptr (rather than being pure) since this is a post-1.0 method |
| 363 | // and adding a new pure method to an interface would be a breaking change |
| 364 | // (even though this is private and non-API) |
| 365 | virtual grpc::CompletionQueue* CallbackCQ() { return nullptr; } |
| 366 | }; |
| 367 | |
| 368 | } // namespace grpc |
| 369 | |
| 370 | #endif // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H |
| 371 | |