1 | /* |
2 | * |
3 | * Copyright 2015 gRPC authors. |
4 | * |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | * you may not use this file except in compliance with the License. |
7 | * You may obtain a copy of the License at |
8 | * |
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | * |
11 | * Unless required by applicable law or agreed to in writing, software |
12 | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | * See the License for the specific language governing permissions and |
15 | * limitations under the License. |
16 | * |
17 | */ |
18 | |
19 | #ifndef GRPCPP_SERVER_IMPL_H |
20 | #define GRPCPP_SERVER_IMPL_H |
21 | |
#include <atomic>
#include <list>
#include <memory>
#include <utility>
#include <vector>

#include <grpc/impl/codegen/port_platform.h>

#include <grpc/compression.h>
#include <grpc/support/atm.h>
#include <grpcpp/channel_impl.h>
#include <grpcpp/completion_queue_impl.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/impl/call.h>
#include <grpcpp/impl/codegen/client_interceptor.h>
#include <grpcpp/impl/codegen/completion_queue_impl.h>
#include <grpcpp/impl/codegen/grpc_library.h>
#include <grpcpp/impl/codegen/server_interface.h>
#include <grpcpp/impl/rpc_service_method.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/support/channel_arguments_impl.h>
#include <grpcpp/support/config.h>
#include <grpcpp/support/status.h>
43 | |
// Forward declaration of the core server type. This header only handles it
// through grpc_server* pointers, so the full definition is not needed here.
struct grpc_server;

namespace grpc {
// Forward-declared because this header only names it in a friend declaration
// and as a pointer parameter.
class AsyncGenericService;

namespace internal {
// Forward-declared because this header only names it as the element type of
// std::shared_ptr.
class ExternalConnectionAcceptorImpl;
}  // namespace internal

}  // namespace grpc
54 | |
55 | namespace grpc_impl { |
56 | class HealthCheckServiceInterface; |
57 | class ServerContext; |
58 | class ServerInitializer; |
59 | |
60 | /// Represents a gRPC server. |
61 | /// |
62 | /// Use a \a grpc::ServerBuilder to create, configure, and start |
63 | /// \a Server instances. |
64 | class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen { |
65 | public: |
66 | ~Server(); |
67 | |
68 | /// Block until the server shuts down. |
69 | /// |
70 | /// \warning The server must be either shutting down or some other thread must |
71 | /// call \a Shutdown for this function to ever return. |
72 | void Wait() override; |
73 | |
74 | /// Global callbacks are a set of hooks that are called when server |
75 | /// events occur. \a SetGlobalCallbacks method is used to register |
76 | /// the hooks with gRPC. Note that |
77 | /// the \a GlobalCallbacks instance will be shared among all |
78 | /// \a Server instances in an application and can be set exactly |
79 | /// once per application. |
80 | class GlobalCallbacks { |
81 | public: |
82 | virtual ~GlobalCallbacks() {} |
83 | /// Called before server is created. |
84 | virtual void UpdateArguments(ChannelArguments* /*args*/) {} |
85 | /// Called before application callback for each synchronous server request |
86 | virtual void PreSynchronousRequest(grpc_impl::ServerContext* context) = 0; |
87 | /// Called after application callback for each synchronous server request |
88 | virtual void PostSynchronousRequest(grpc_impl::ServerContext* context) = 0; |
89 | /// Called before server is started. |
90 | virtual void PreServerStart(Server* /*server*/) {} |
91 | /// Called after a server port is added. |
92 | virtual void AddPort(Server* /*server*/, const grpc::string& /*addr*/, |
93 | grpc::ServerCredentials* /*creds*/, int /*port*/) {} |
94 | }; |
95 | /// Set the global callback object. Can only be called once per application. |
96 | /// Does not take ownership of callbacks, and expects the pointed to object |
97 | /// to be alive until all server objects in the process have been destroyed. |
98 | /// The same \a GlobalCallbacks object will be used throughout the |
99 | /// application and is shared among all \a Server objects. |
100 | static void SetGlobalCallbacks(GlobalCallbacks* callbacks); |
101 | |
102 | /// Returns a \em raw pointer to the underlying \a grpc_server instance. |
103 | /// EXPERIMENTAL: for internal/test use only |
104 | grpc_server* c_server(); |
105 | |
106 | /// Returns the health check service. |
107 | grpc::HealthCheckServiceInterface* GetHealthCheckService() const { |
108 | return health_check_service_.get(); |
109 | } |
110 | |
111 | /// Establish a channel for in-process communication |
112 | std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args); |
113 | |
114 | /// NOTE: class experimental_type is not part of the public API of this class. |
115 | /// TODO(yashykt): Integrate into public API when this is no longer |
116 | /// experimental. |
117 | class experimental_type { |
118 | public: |
119 | explicit experimental_type(Server* server) : server_(server) {} |
120 | |
121 | /// Establish a channel for in-process communication with client |
122 | /// interceptors |
123 | std::shared_ptr<Channel> InProcessChannelWithInterceptors( |
124 | const ChannelArguments& args, |
125 | std::vector<std::unique_ptr< |
126 | grpc::experimental::ClientInterceptorFactoryInterface>> |
127 | interceptor_creators); |
128 | |
129 | private: |
130 | Server* server_; |
131 | }; |
132 | |
133 | /// NOTE: The function experimental() is not stable public API. It is a view |
134 | /// to the experimental components of this class. It may be changed or removed |
135 | /// at any time. |
136 | experimental_type experimental() { return experimental_type(this); } |
137 | |
138 | protected: |
139 | /// Register a service. This call does not take ownership of the service. |
140 | /// The service must exist for the lifetime of the Server instance. |
141 | bool RegisterService(const grpc::string* host, |
142 | grpc::Service* service) override; |
143 | |
144 | /// Try binding the server to the given \a addr endpoint |
145 | /// (port, and optionally including IP address to bind to). |
146 | /// |
147 | /// It can be invoked multiple times. Should be used before |
148 | /// starting the server. |
149 | /// |
150 | /// \param addr The address to try to bind to the server (eg, localhost:1234, |
151 | /// 192.168.1.1:31416, [::1]:27182, etc.). |
152 | /// \param creds The credentials associated with the server. |
153 | /// |
154 | /// \return bound port number on success, 0 on failure. |
155 | /// |
156 | /// \warning It is an error to call this method on an already started server. |
157 | int AddListeningPort(const grpc::string& addr, |
158 | grpc::ServerCredentials* creds) override; |
159 | |
160 | /// NOTE: This is *NOT* a public API. The server constructors are supposed to |
161 | /// be used by \a ServerBuilder class only. The constructor will be made |
162 | /// 'private' very soon. |
163 | /// |
164 | /// Server constructors. To be used by \a ServerBuilder only. |
165 | /// |
166 | /// \param args The channel args |
167 | /// |
168 | /// \param sync_server_cqs The completion queues to use if the server is a |
169 | /// synchronous server (or a hybrid server). The server polls for new RPCs on |
170 | /// these queues |
171 | /// |
172 | /// \param min_pollers The minimum number of polling threads per server |
173 | /// completion queue (in param sync_server_cqs) to use for listening to |
174 | /// incoming requests (used only in case of sync server) |
175 | /// |
176 | /// \param max_pollers The maximum number of polling threads per server |
177 | /// completion queue (in param sync_server_cqs) to use for listening to |
178 | /// incoming requests (used only in case of sync server) |
179 | /// |
180 | /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on |
181 | /// server completion queues passed via sync_server_cqs param. |
182 | Server(ChannelArguments* args, |
183 | std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> |
184 | sync_server_cqs, |
185 | int min_pollers, int max_pollers, int sync_cq_timeout_msec, |
186 | std::vector< |
187 | std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> |
188 | acceptors, |
189 | grpc_resource_quota* server_rq = nullptr, |
190 | std::vector<std::unique_ptr< |
191 | grpc::experimental::ServerInterceptorFactoryInterface>> |
192 | interceptor_creators = std::vector<std::unique_ptr< |
193 | grpc::experimental::ServerInterceptorFactoryInterface>>()); |
194 | |
195 | /// Start the server. |
196 | /// |
197 | /// \param cqs Completion queues for handling asynchronous services. The |
198 | /// caller is required to keep all completion queues live until the server is |
199 | /// destroyed. |
200 | /// \param num_cqs How many completion queues does \a cqs hold. |
201 | void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; |
202 | |
203 | grpc_server* server() override { return server_; } |
204 | |
205 | protected: |
206 | /// NOTE: This method is not part of the public API for this class. |
207 | void set_health_check_service( |
208 | std::unique_ptr<grpc::HealthCheckServiceInterface> service) { |
209 | health_check_service_ = std::move(service); |
210 | } |
211 | |
212 | /// NOTE: This method is not part of the public API for this class. |
213 | bool health_check_service_disabled() const { |
214 | return health_check_service_disabled_; |
215 | } |
216 | |
217 | private: |
218 | std::vector< |
219 | std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>* |
220 | interceptor_creators() override { |
221 | return &interceptor_creators_; |
222 | } |
223 | |
224 | friend class grpc::AsyncGenericService; |
225 | friend class grpc_impl::ServerBuilder; |
226 | friend class grpc_impl::ServerInitializer; |
227 | |
228 | class SyncRequest; |
229 | class CallbackRequestBase; |
230 | template <class ServerContextType> |
231 | class CallbackRequest; |
232 | class UnimplementedAsyncRequest; |
233 | class UnimplementedAsyncResponse; |
234 | |
235 | /// SyncRequestThreadManager is an implementation of ThreadManager. This class |
236 | /// is responsible for polling for incoming RPCs and calling the RPC handlers. |
237 | /// This is only used in case of a Sync server (i.e a server exposing a sync |
238 | /// interface) |
239 | class SyncRequestThreadManager; |
240 | |
241 | /// Register a generic service. This call does not take ownership of the |
242 | /// service. The service must exist for the lifetime of the Server instance. |
243 | void RegisterAsyncGenericService(grpc::AsyncGenericService* service) override; |
244 | |
245 | #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL |
246 | /// Register a callback-based generic service. This call does not take |
247 | /// ownership of theservice. The service must exist for the lifetime of the |
248 | /// Server instance. |
249 | void RegisterCallbackGenericService( |
250 | grpc::CallbackGenericService* service) override; |
251 | #else |
252 | /// NOTE: class experimental_registration_type is not part of the public API |
253 | /// of this class |
254 | /// TODO(vjpai): Move these contents to the public API of Server when |
255 | /// they are no longer experimental |
256 | class experimental_registration_type final |
257 | : public experimental_registration_interface { |
258 | public: |
259 | explicit experimental_registration_type(Server* server) : server_(server) {} |
260 | void RegisterCallbackGenericService( |
261 | grpc::experimental::CallbackGenericService* service) override { |
262 | server_->RegisterCallbackGenericService(service); |
263 | } |
264 | |
265 | private: |
266 | Server* server_; |
267 | }; |
268 | |
269 | /// TODO(vjpai): Mark this override when experimental type above is deleted |
270 | void RegisterCallbackGenericService( |
271 | grpc::experimental::CallbackGenericService* service); |
272 | |
273 | /// NOTE: The function experimental_registration() is not stable public API. |
274 | /// It is a view to the experimental components of this class. It may be |
275 | /// changed or removed at any time. |
276 | experimental_registration_interface* experimental_registration() override { |
277 | return &experimental_registration_; |
278 | } |
279 | #endif |
280 | |
281 | void PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops, |
282 | grpc::internal::Call* call) override; |
283 | |
284 | void ShutdownInternal(gpr_timespec deadline) override; |
285 | |
286 | int max_receive_message_size() const override { |
287 | return max_receive_message_size_; |
288 | } |
289 | |
290 | CompletionQueue* CallbackCQ() override; |
291 | |
292 | grpc_impl::ServerInitializer* initializer(); |
293 | |
294 | // Functions to manage the server shutdown ref count. Things that increase |
295 | // the ref count are the running state of the server (take a ref at start and |
296 | // drop it at shutdown) and each running callback RPC. |
297 | void Ref(); |
298 | void UnrefWithPossibleNotify() /* LOCKS_EXCLUDED(mu_) */; |
299 | void UnrefAndWaitLocked() /* EXCLUSIVE_LOCKS_REQUIRED(mu_) */; |
300 | |
301 | std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>> |
302 | acceptors_; |
303 | |
304 | // A vector of interceptor factory objects. |
305 | // This should be destroyed after health_check_service_ and this requirement |
306 | // is satisfied by declaring interceptor_creators_ before |
307 | // health_check_service_. (C++ mandates that member objects be destroyed in |
308 | // the reverse order of initialization.) |
309 | std::vector< |
310 | std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>> |
311 | interceptor_creators_; |
312 | |
313 | int max_receive_message_size_; |
314 | |
315 | /// The following completion queues are ONLY used in case of Sync API |
316 | /// i.e. if the server has any services with sync methods. The server uses |
317 | /// these completion queues to poll for new RPCs |
318 | std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> |
319 | sync_server_cqs_; |
320 | |
321 | /// List of \a ThreadManager instances (one for each cq in |
322 | /// the \a sync_server_cqs) |
323 | std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; |
324 | |
325 | #ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL |
326 | // For registering experimental callback generic service; remove when that |
327 | // method longer experimental |
328 | experimental_registration_type experimental_registration_{this}; |
329 | #endif |
330 | |
331 | // Server status |
332 | grpc::internal::Mutex mu_; |
333 | bool started_; |
334 | bool shutdown_; |
335 | bool shutdown_notified_; // Was notify called on the shutdown_cv_ |
336 | grpc::internal::CondVar shutdown_done_cv_; |
337 | bool shutdown_done_ = false; |
338 | std::atomic_int shutdown_refs_outstanding_{1}; |
339 | |
340 | grpc::internal::CondVar shutdown_cv_; |
341 | |
342 | std::shared_ptr<GlobalCallbacks> global_callbacks_; |
343 | |
344 | std::vector<grpc::string> services_; |
345 | bool has_async_generic_service_ = false; |
346 | bool has_callback_generic_service_ = false; |
347 | bool has_callback_methods_ = false; |
348 | |
349 | // Pointer to the wrapped grpc_server. |
350 | grpc_server* server_; |
351 | |
352 | std::unique_ptr<grpc_impl::ServerInitializer> server_initializer_; |
353 | |
354 | std::unique_ptr<grpc::HealthCheckServiceInterface> health_check_service_; |
355 | bool health_check_service_disabled_; |
356 | |
357 | // When appropriate, use a default callback generic service to handle |
358 | // unimplemented methods |
359 | #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL |
360 | std::unique_ptr<grpc::CallbackGenericService> unimplemented_service_; |
361 | #else |
362 | std::unique_ptr<grpc::experimental::CallbackGenericService> |
363 | unimplemented_service_; |
364 | #endif |
365 | |
366 | // A special handler for resource exhausted in sync case |
367 | std::unique_ptr<grpc::internal::MethodHandler> resource_exhausted_handler_; |
368 | |
369 | // Handler for callback generic service, if any |
370 | std::unique_ptr<grpc::internal::MethodHandler> generic_handler_; |
371 | |
372 | // callback_cq_ references the callbackable completion queue associated |
373 | // with this server (if any). It is set on the first call to CallbackCQ(). |
374 | // It is _not owned_ by the server; ownership belongs with its internal |
375 | // shutdown callback tag (invoked when the CQ is fully shutdown). |
376 | CompletionQueue* callback_cq_ /* GUARDED_BY(mu_) */ = nullptr; |
377 | |
378 | // List of CQs passed in by user that must be Shutdown only after Server is |
379 | // Shutdown. Even though this is only used with NDEBUG, instantiate it in all |
380 | // cases since otherwise the size will be inconsistent. |
381 | std::vector<CompletionQueue*> cq_list_; |
382 | }; |
383 | |
384 | } // namespace grpc_impl |
385 | |
386 | #endif // GRPCPP_SERVER_IMPL_H |
387 | |