• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 gRPC authors.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  */
17 
18 #include <grpcpp/server.h>
19 
20 #include <cstdlib>
21 #include <sstream>
22 #include <utility>
23 
24 #include <grpc/grpc.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpcpp/completion_queue.h>
28 #include <grpcpp/generic/async_generic_service.h>
29 #include <grpcpp/impl/codegen/async_unary_call.h>
30 #include <grpcpp/impl/codegen/completion_queue_tag.h>
31 #include <grpcpp/impl/grpc_library.h>
32 #include <grpcpp/impl/method_handler_impl.h>
33 #include <grpcpp/impl/rpc_service_method.h>
34 #include <grpcpp/impl/server_initializer.h>
35 #include <grpcpp/impl/service_type.h>
36 #include <grpcpp/security/server_credentials.h>
37 #include <grpcpp/server_context.h>
38 #include <grpcpp/support/time.h>
39 
40 #include "src/core/ext/transport/inproc/inproc_transport.h"
41 #include "src/core/lib/profiling/timers.h"
42 #include "src/core/lib/surface/call.h"
43 #include "src/cpp/client/create_channel_internal.h"
44 #include "src/cpp/server/health/default_health_check_service.h"
45 #include "src/cpp/thread_manager/thread_manager.h"
46 
47 namespace grpc {
48 namespace {
49 
// The default value for the maximum number of threads that can be created in
// the sync server. This value of INT_MAX is chosen to match the default
// behavior if no ResourceQuota is set. To modify the max number of threads in a
// sync server, pass a custom ResourceQuota object (with the desired number of
// max-threads set) to the server builder.
55 #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
56 
57 class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
58  public:
~DefaultGlobalCallbacks()59   ~DefaultGlobalCallbacks() override {}
PreSynchronousRequest(ServerContext * context)60   void PreSynchronousRequest(ServerContext* context) override {}
PostSynchronousRequest(ServerContext * context)61   void PostSynchronousRequest(ServerContext* context) override {}
62 };
63 
64 std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
65 gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
66 
InitGlobalCallbacks()67 void InitGlobalCallbacks() {
68   if (!g_callbacks) {
69     g_callbacks.reset(new DefaultGlobalCallbacks());
70   }
71 }
72 
73 class ShutdownTag : public internal::CompletionQueueTag {
74  public:
FinalizeResult(void ** tag,bool * status)75   bool FinalizeResult(void** tag, bool* status) { return false; }
76 };
77 
78 class DummyTag : public internal::CompletionQueueTag {
79  public:
FinalizeResult(void ** tag,bool * status)80   bool FinalizeResult(void** tag, bool* status) {
81     *status = true;
82     return true;
83   }
84 };
85 
86 class UnimplementedAsyncRequestContext {
87  protected:
UnimplementedAsyncRequestContext()88   UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
89 
90   GenericServerContext server_context_;
91   GenericServerAsyncReaderWriter generic_stream_;
92 };
93 
94 }  // namespace
95 
96 /// Use private inheritance rather than composition only to establish order
97 /// of construction, since the public base class should be constructed after the
98 /// elements belonging to the private base class are constructed. This is not
99 /// possible using true composition.
100 class Server::UnimplementedAsyncRequest final
101     : private UnimplementedAsyncRequestContext,
102       public GenericAsyncRequest {
103  public:
UnimplementedAsyncRequest(Server * server,ServerCompletionQueue * cq)104   UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
105       : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
106                             nullptr, false),
107         server_(server),
108         cq_(cq) {}
109 
110   bool FinalizeResult(void** tag, bool* status) override;
111 
context()112   ServerContext* context() { return &server_context_; }
stream()113   GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
114 
115  private:
116   Server* const server_;
117   ServerCompletionQueue* const cq_;
118 };
119 
120 /// UnimplementedAsyncResponse should not post user-visible completions to the
121 /// C++ completion queue, but is generated as a CQ event by the core
122 class Server::UnimplementedAsyncResponse final
123     : public internal::CallOpSet<internal::CallOpSendInitialMetadata,
124                                  internal::CallOpServerSendStatus> {
125  public:
126   UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
~UnimplementedAsyncResponse()127   ~UnimplementedAsyncResponse() { delete request_; }
128 
FinalizeResult(void ** tag,bool * status)129   bool FinalizeResult(void** tag, bool* status) override {
130     internal::CallOpSet<
131         internal::CallOpSendInitialMetadata,
132         internal::CallOpServerSendStatus>::FinalizeResult(tag, status);
133     delete this;
134     return false;
135   }
136 
137  private:
138   UnimplementedAsyncRequest* const request_;
139 };
140 
141 class Server::SyncRequest final : public internal::CompletionQueueTag {
142  public:
SyncRequest(internal::RpcServiceMethod * method,void * tag)143   SyncRequest(internal::RpcServiceMethod* method, void* tag)
144       : method_(method),
145         tag_(tag),
146         in_flight_(false),
147         has_request_payload_(
148             method->method_type() == internal::RpcMethod::NORMAL_RPC ||
149             method->method_type() == internal::RpcMethod::SERVER_STREAMING),
150         call_details_(nullptr),
151         cq_(nullptr) {
152     grpc_metadata_array_init(&request_metadata_);
153   }
154 
~SyncRequest()155   ~SyncRequest() {
156     if (call_details_) {
157       delete call_details_;
158     }
159     grpc_metadata_array_destroy(&request_metadata_);
160   }
161 
SetupRequest()162   void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }
163 
TeardownRequest()164   void TeardownRequest() {
165     grpc_completion_queue_destroy(cq_);
166     cq_ = nullptr;
167   }
168 
Request(grpc_server * server,grpc_completion_queue * notify_cq)169   void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
170     GPR_ASSERT(cq_ && !in_flight_);
171     in_flight_ = true;
172     if (tag_) {
173       if (GRPC_CALL_OK !=
174           grpc_server_request_registered_call(
175               server, tag_, &call_, &deadline_, &request_metadata_,
176               has_request_payload_ ? &request_payload_ : nullptr, cq_,
177               notify_cq, this)) {
178         TeardownRequest();
179         return;
180       }
181     } else {
182       if (!call_details_) {
183         call_details_ = new grpc_call_details;
184         grpc_call_details_init(call_details_);
185       }
186       if (grpc_server_request_call(server, &call_, call_details_,
187                                    &request_metadata_, cq_, notify_cq,
188                                    this) != GRPC_CALL_OK) {
189         TeardownRequest();
190         return;
191       }
192     }
193   }
194 
FinalizeResult(void ** tag,bool * status)195   bool FinalizeResult(void** tag, bool* status) override {
196     if (!*status) {
197       grpc_completion_queue_destroy(cq_);
198     }
199     if (call_details_) {
200       deadline_ = call_details_->deadline;
201       grpc_call_details_destroy(call_details_);
202       grpc_call_details_init(call_details_);
203     }
204     return true;
205   }
206 
207   class CallData final {
208    public:
CallData(Server * server,SyncRequest * mrd)209     explicit CallData(Server* server, SyncRequest* mrd)
210         : cq_(mrd->cq_),
211           call_(mrd->call_, server, &cq_, server->max_receive_message_size()),
212           ctx_(mrd->deadline_, &mrd->request_metadata_),
213           has_request_payload_(mrd->has_request_payload_),
214           request_payload_(has_request_payload_ ? mrd->request_payload_
215                                                 : nullptr),
216           method_(mrd->method_),
217           server_(server) {
218       ctx_.set_call(mrd->call_);
219       ctx_.cq_ = &cq_;
220       GPR_ASSERT(mrd->in_flight_);
221       mrd->in_flight_ = false;
222       mrd->request_metadata_.count = 0;
223     }
224 
~CallData()225     ~CallData() {
226       if (has_request_payload_ && request_payload_) {
227         grpc_byte_buffer_destroy(request_payload_);
228       }
229     }
230 
Run(const std::shared_ptr<GlobalCallbacks> & global_callbacks,bool resources)231     void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
232              bool resources) {
233       ctx_.BeginCompletionOp(&call_);
234       global_callbacks->PreSynchronousRequest(&ctx_);
235       auto* handler = resources ? method_->handler()
236                                 : server_->resource_exhausted_handler_.get();
237       handler->RunHandler(internal::MethodHandler::HandlerParameter(
238           &call_, &ctx_, request_payload_));
239       global_callbacks->PostSynchronousRequest(&ctx_);
240       request_payload_ = nullptr;
241 
242       cq_.Shutdown();
243 
244       internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
245       cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
246 
247       /* Ensure the cq_ is shutdown */
248       DummyTag ignored_tag;
249       GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
250     }
251 
252    private:
253     CompletionQueue cq_;
254     internal::Call call_;
255     ServerContext ctx_;
256     const bool has_request_payload_;
257     grpc_byte_buffer* request_payload_;
258     internal::RpcServiceMethod* const method_;
259     Server* server_;
260   };
261 
262  private:
263   internal::RpcServiceMethod* const method_;
264   void* const tag_;
265   bool in_flight_;
266   const bool has_request_payload_;
267   grpc_call* call_;
268   grpc_call_details* call_details_;
269   gpr_timespec deadline_;
270   grpc_metadata_array request_metadata_;
271   grpc_byte_buffer* request_payload_;
272   grpc_completion_queue* cq_;
273 };
274 
275 // Implementation of ThreadManager. Each instance of SyncRequestThreadManager
276 // manages a pool of threads that poll for incoming Sync RPCs and call the
277 // appropriate RPC handlers
278 class Server::SyncRequestThreadManager : public ThreadManager {
279  public:
SyncRequestThreadManager(Server * server,CompletionQueue * server_cq,std::shared_ptr<GlobalCallbacks> global_callbacks,grpc_resource_quota * rq,int min_pollers,int max_pollers,int cq_timeout_msec)280   SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
281                            std::shared_ptr<GlobalCallbacks> global_callbacks,
282                            grpc_resource_quota* rq, int min_pollers,
283                            int max_pollers, int cq_timeout_msec)
284       : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
285         server_(server),
286         server_cq_(server_cq),
287         cq_timeout_msec_(cq_timeout_msec),
288         global_callbacks_(std::move(global_callbacks)) {}
289 
PollForWork(void ** tag,bool * ok)290   WorkStatus PollForWork(void** tag, bool* ok) override {
291     *tag = nullptr;
292     // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working
293     // right now
294     gpr_timespec deadline =
295         gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
296                      gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN));
297 
298     switch (server_cq_->AsyncNext(tag, ok, deadline)) {
299       case CompletionQueue::TIMEOUT:
300         return TIMEOUT;
301       case CompletionQueue::SHUTDOWN:
302         return SHUTDOWN;
303       case CompletionQueue::GOT_EVENT:
304         return WORK_FOUND;
305     }
306 
307     GPR_UNREACHABLE_CODE(return TIMEOUT);
308   }
309 
DoWork(void * tag,bool ok,bool resources)310   void DoWork(void* tag, bool ok, bool resources) override {
311     SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
312 
313     if (!sync_req) {
314       // No tag. Nothing to work on. This is an unlikley scenario and possibly a
315       // bug in RPC Manager implementation.
316       gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag");
317       return;
318     }
319 
320     if (ok) {
321       // Calldata takes ownership of the completion queue inside sync_req
322       SyncRequest::CallData cd(server_, sync_req);
323       // Prepare for the next request
324       if (!IsShutdown()) {
325         sync_req->SetupRequest();  // Create new completion queue for sync_req
326         sync_req->Request(server_->c_server(), server_cq_->cq());
327       }
328 
329       GPR_TIMER_SCOPE("cd.Run()", 0);
330       cd.Run(global_callbacks_, resources);
331     }
332     // TODO (sreek) If ok is false here (which it isn't in case of
333     // grpc_request_registered_call), we should still re-queue the request
334     // object
335   }
336 
AddSyncMethod(internal::RpcServiceMethod * method,void * tag)337   void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) {
338     sync_requests_.emplace_back(new SyncRequest(method, tag));
339   }
340 
AddUnknownSyncMethod()341   void AddUnknownSyncMethod() {
342     if (!sync_requests_.empty()) {
343       unknown_method_.reset(new internal::RpcServiceMethod(
344           "unknown", internal::RpcMethod::BIDI_STREAMING,
345           new internal::UnknownMethodHandler));
346       sync_requests_.emplace_back(
347           new SyncRequest(unknown_method_.get(), nullptr));
348     }
349   }
350 
Shutdown()351   void Shutdown() override {
352     ThreadManager::Shutdown();
353     server_cq_->Shutdown();
354   }
355 
Wait()356   void Wait() override {
357     ThreadManager::Wait();
358     // Drain any pending items from the queue
359     void* tag;
360     bool ok;
361     while (server_cq_->Next(&tag, &ok)) {
362       // Do nothing
363     }
364   }
365 
Start()366   void Start() {
367     if (!sync_requests_.empty()) {
368       for (auto m = sync_requests_.begin(); m != sync_requests_.end(); m++) {
369         (*m)->SetupRequest();
370         (*m)->Request(server_->c_server(), server_cq_->cq());
371       }
372 
373       Initialize();  // ThreadManager's Initialize()
374     }
375   }
376 
377  private:
378   Server* server_;
379   CompletionQueue* server_cq_;
380   int cq_timeout_msec_;
381   std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
382   std::unique_ptr<internal::RpcServiceMethod> unknown_method_;
383   std::unique_ptr<internal::RpcServiceMethod> health_check_;
384   std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
385 };
386 
387 static internal::GrpcLibraryInitializer g_gli_initializer;
Server(int max_receive_message_size,ChannelArguments * args,std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> sync_server_cqs,int min_pollers,int max_pollers,int sync_cq_timeout_msec,grpc_resource_quota * server_rq)388 Server::Server(
389     int max_receive_message_size, ChannelArguments* args,
390     std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
391         sync_server_cqs,
392     int min_pollers, int max_pollers, int sync_cq_timeout_msec,
393     grpc_resource_quota* server_rq)
394     : max_receive_message_size_(max_receive_message_size),
395       sync_server_cqs_(std::move(sync_server_cqs)),
396       started_(false),
397       shutdown_(false),
398       shutdown_notified_(false),
399       has_generic_service_(false),
400       server_(nullptr),
401       server_initializer_(new ServerInitializer(this)),
402       health_check_service_disabled_(false) {
403   g_gli_initializer.summon();
404   gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
405   global_callbacks_ = g_callbacks;
406   global_callbacks_->UpdateArguments(args);
407 
408   if (sync_server_cqs_ != nullptr) {
409     bool default_rq_created = false;
410     if (server_rq == nullptr) {
411       server_rq = grpc_resource_quota_create("SyncServer-default-rq");
412       grpc_resource_quota_set_max_threads(server_rq,
413                                           DEFAULT_MAX_SYNC_SERVER_THREADS);
414       default_rq_created = true;
415     }
416 
417     for (const auto& it : *sync_server_cqs_) {
418       sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
419           this, it.get(), global_callbacks_, server_rq, min_pollers,
420           max_pollers, sync_cq_timeout_msec));
421     }
422 
423     if (default_rq_created) {
424       grpc_resource_quota_unref(server_rq);
425     }
426   }
427 
428   grpc_channel_args channel_args;
429   args->SetChannelArgs(&channel_args);
430 
431   for (size_t i = 0; i < channel_args.num_args; i++) {
432     if (0 ==
433         strcmp(channel_args.args[i].key, kHealthCheckServiceInterfaceArg)) {
434       if (channel_args.args[i].value.pointer.p == nullptr) {
435         health_check_service_disabled_ = true;
436       } else {
437         health_check_service_.reset(static_cast<HealthCheckServiceInterface*>(
438             channel_args.args[i].value.pointer.p));
439       }
440       break;
441     }
442   }
443 
444   server_ = grpc_server_create(&channel_args, nullptr);
445 }
446 
~Server()447 Server::~Server() {
448   {
449     std::unique_lock<std::mutex> lock(mu_);
450     if (started_ && !shutdown_) {
451       lock.unlock();
452       Shutdown();
453     } else if (!started_) {
454       // Shutdown the completion queues
455       for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
456         (*it)->Shutdown();
457       }
458     }
459   }
460 
461   grpc_server_destroy(server_);
462 }
463 
SetGlobalCallbacks(GlobalCallbacks * callbacks)464 void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
465   GPR_ASSERT(!g_callbacks);
466   GPR_ASSERT(callbacks);
467   g_callbacks.reset(callbacks);
468 }
469 
c_server()470 grpc_server* Server::c_server() { return server_; }
471 
InProcessChannel(const ChannelArguments & args)472 std::shared_ptr<Channel> Server::InProcessChannel(
473     const ChannelArguments& args) {
474   grpc_channel_args channel_args = args.c_channel_args();
475   return CreateChannelInternal(
476       "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr));
477 }
478 
PayloadHandlingForMethod(internal::RpcServiceMethod * method)479 static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
480     internal::RpcServiceMethod* method) {
481   switch (method->method_type()) {
482     case internal::RpcMethod::NORMAL_RPC:
483     case internal::RpcMethod::SERVER_STREAMING:
484       return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
485     case internal::RpcMethod::CLIENT_STREAMING:
486     case internal::RpcMethod::BIDI_STREAMING:
487       return GRPC_SRM_PAYLOAD_NONE;
488   }
489   GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
490 }
491 
RegisterService(const grpc::string * host,Service * service)492 bool Server::RegisterService(const grpc::string* host, Service* service) {
493   bool has_async_methods = service->has_async_methods();
494   if (has_async_methods) {
495     GPR_ASSERT(service->server_ == nullptr &&
496                "Can only register an asynchronous service against one server.");
497     service->server_ = this;
498   }
499 
500   const char* method_name = nullptr;
501   for (auto it = service->methods_.begin(); it != service->methods_.end();
502        ++it) {
503     if (it->get() == nullptr) {  // Handled by generic service if any.
504       continue;
505     }
506 
507     internal::RpcServiceMethod* method = it->get();
508     void* tag = grpc_server_register_method(
509         server_, method->name(), host ? host->c_str() : nullptr,
510         PayloadHandlingForMethod(method), 0);
511     if (tag == nullptr) {
512       gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
513               method->name());
514       return false;
515     }
516 
517     if (method->handler() == nullptr) {  // Async method
518       method->set_server_tag(tag);
519     } else {
520       for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
521         (*it)->AddSyncMethod(method, tag);
522       }
523     }
524 
525     method_name = method->name();
526   }
527 
528   // Parse service name.
529   if (method_name != nullptr) {
530     std::stringstream ss(method_name);
531     grpc::string service_name;
532     if (std::getline(ss, service_name, '/') &&
533         std::getline(ss, service_name, '/')) {
534       services_.push_back(service_name);
535     }
536   }
537   return true;
538 }
539 
RegisterAsyncGenericService(AsyncGenericService * service)540 void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
541   GPR_ASSERT(service->server_ == nullptr &&
542              "Can only register an async generic service against one server.");
543   service->server_ = this;
544   has_generic_service_ = true;
545 }
546 
AddListeningPort(const grpc::string & addr,ServerCredentials * creds)547 int Server::AddListeningPort(const grpc::string& addr,
548                              ServerCredentials* creds) {
549   GPR_ASSERT(!started_);
550   int port = creds->AddPortToServer(addr, server_);
551   global_callbacks_->AddPort(this, addr, creds, port);
552   return port;
553 }
554 
Start(ServerCompletionQueue ** cqs,size_t num_cqs)555 void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
556   GPR_ASSERT(!started_);
557   global_callbacks_->PreServerStart(this);
558   started_ = true;
559 
560   // Only create default health check service when user did not provide an
561   // explicit one.
562   if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
563       DefaultHealthCheckServiceEnabled()) {
564     if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) {
565       gpr_log(GPR_INFO,
566               "Default health check service disabled at async-only server.");
567     } else {
568       auto* default_hc_service = new DefaultHealthCheckService;
569       health_check_service_.reset(default_hc_service);
570       RegisterService(nullptr, default_hc_service->GetHealthCheckService());
571     }
572   }
573 
574   grpc_server_start(server_);
575 
576   if (!has_generic_service_) {
577     for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
578       (*it)->AddUnknownSyncMethod();
579     }
580 
581     for (size_t i = 0; i < num_cqs; i++) {
582       if (cqs[i]->IsFrequentlyPolled()) {
583         new UnimplementedAsyncRequest(this, cqs[i]);
584       }
585     }
586   }
587 
588   // If this server has any support for synchronous methods (has any sync
589   // server CQs), make sure that we have a ResourceExhausted handler
590   // to deal with the case of thread exhaustion
591   if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
592     resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler);
593   }
594 
595   for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
596     (*it)->Start();
597   }
598 }
599 
ShutdownInternal(gpr_timespec deadline)600 void Server::ShutdownInternal(gpr_timespec deadline) {
601   std::unique_lock<std::mutex> lock(mu_);
602   if (!shutdown_) {
603     shutdown_ = true;
604 
605     /// The completion queue to use for server shutdown completion notification
606     CompletionQueue shutdown_cq;
607     ShutdownTag shutdown_tag;  // Dummy shutdown tag
608     grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
609 
610     shutdown_cq.Shutdown();
611 
612     void* tag;
613     bool ok;
614     CompletionQueue::NextStatus status =
615         shutdown_cq.AsyncNext(&tag, &ok, deadline);
616 
617     // If this timed out, it means we are done with the grace period for a clean
618     // shutdown. We should force a shutdown now by cancelling all inflight calls
619     if (status == CompletionQueue::NextStatus::TIMEOUT) {
620       grpc_server_cancel_all_calls(server_);
621     }
622     // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
623     // successfully shutdown
624 
625     // Shutdown all ThreadManagers. This will try to gracefully stop all the
626     // threads in the ThreadManagers (once they process any inflight requests)
627     for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
628       (*it)->Shutdown();  // ThreadManager's Shutdown()
629     }
630 
631     // Wait for threads in all ThreadManagers to terminate
632     for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
633       (*it)->Wait();
634     }
635 
636     // Drain the shutdown queue (if the previous call to AsyncNext() timed out
637     // and we didn't remove the tag from the queue yet)
638     while (shutdown_cq.Next(&tag, &ok)) {
639       // Nothing to be done here. Just ignore ok and tag values
640     }
641 
642     shutdown_notified_ = true;
643     shutdown_cv_.notify_all();
644   }
645 }
646 
Wait()647 void Server::Wait() {
648   std::unique_lock<std::mutex> lock(mu_);
649   while (started_ && !shutdown_notified_) {
650     shutdown_cv_.wait(lock);
651   }
652 }
653 
PerformOpsOnCall(internal::CallOpSetInterface * ops,internal::Call * call)654 void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
655                               internal::Call* call) {
656   static const size_t MAX_OPS = 8;
657   size_t nops = 0;
658   grpc_op cops[MAX_OPS];
659   ops->FillOps(call->call(), cops, &nops);
660   // TODO(vjpai): Use ops->cq_tag once this case supports callbacks
661   auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
662   if (result != GRPC_CALL_OK) {
663     gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result);
664     grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR,
665                         call->call(), cops, nops, ops);
666     abort();
667   }
668 }
669 
BaseAsyncRequest(ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,void * tag,bool delete_on_finalize)670 ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
671     ServerInterface* server, ServerContext* context,
672     internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
673     void* tag, bool delete_on_finalize)
674     : server_(server),
675       context_(context),
676       stream_(stream),
677       call_cq_(call_cq),
678       tag_(tag),
679       delete_on_finalize_(delete_on_finalize),
680       call_(nullptr) {
681   call_cq_->RegisterAvalanching();  // This op will trigger more ops
682 }
683 
~BaseAsyncRequest()684 ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
685   call_cq_->CompleteAvalanching();
686 }
687 
FinalizeResult(void ** tag,bool * status)688 bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
689                                                        bool* status) {
690   context_->set_call(call_);
691   context_->cq_ = call_cq_;
692   internal::Call call(call_, server_, call_cq_,
693                       server_->max_receive_message_size());
694   if (*status && call_) {
695     context_->BeginCompletionOp(&call);
696   }
697   // just the pointers inside call are copied here
698   stream_->BindCall(&call);
699   *tag = tag_;
700   if (delete_on_finalize_) {
701     delete this;
702   }
703   return true;
704 }
705 
RegisteredAsyncRequest(ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,void * tag)706 ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
707     ServerInterface* server, ServerContext* context,
708     internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
709     void* tag)
710     : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
711 
IssueRequest(void * registered_method,grpc_byte_buffer ** payload,ServerCompletionQueue * notification_cq)712 void ServerInterface::RegisteredAsyncRequest::IssueRequest(
713     void* registered_method, grpc_byte_buffer** payload,
714     ServerCompletionQueue* notification_cq) {
715   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call(
716                                  server_->server(), registered_method, &call_,
717                                  &context_->deadline_,
718                                  context_->client_metadata_.arr(), payload,
719                                  call_cq_->cq(), notification_cq->cq(), this));
720 }
721 
GenericAsyncRequest(ServerInterface * server,GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,bool delete_on_finalize)722 ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
723     ServerInterface* server, GenericServerContext* context,
724     internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
725     ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
726     : BaseAsyncRequest(server, context, stream, call_cq, tag,
727                        delete_on_finalize) {
728   grpc_call_details_init(&call_details_);
729   GPR_ASSERT(notification_cq);
730   GPR_ASSERT(call_cq);
731   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
732                                  server->server(), &call_, &call_details_,
733                                  context->client_metadata_.arr(), call_cq->cq(),
734                                  notification_cq->cq(), this));
735 }
736 
FinalizeResult(void ** tag,bool * status)737 bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
738                                                           bool* status) {
739   // TODO(yangg) remove the copy here.
740   if (*status) {
741     static_cast<GenericServerContext*>(context_)->method_ =
742         StringFromCopiedSlice(call_details_.method);
743     static_cast<GenericServerContext*>(context_)->host_ =
744         StringFromCopiedSlice(call_details_.host);
745     context_->deadline_ = call_details_.deadline;
746   }
747   grpc_slice_unref(call_details_.method);
748   grpc_slice_unref(call_details_.host);
749   return BaseAsyncRequest::FinalizeResult(tag, status);
750 }
751 
FinalizeResult(void ** tag,bool * status)752 bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
753                                                        bool* status) {
754   if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
755     new UnimplementedAsyncRequest(server_, cq_);
756     new UnimplementedAsyncResponse(this);
757   } else {
758     delete this;
759   }
760   return false;
761 }
762 
UnimplementedAsyncResponse(UnimplementedAsyncRequest * request)763 Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
764     UnimplementedAsyncRequest* request)
765     : request_(request) {
766   Status status(StatusCode::UNIMPLEMENTED, "");
767   internal::UnknownMethodHandler::FillOps(request_->context(), this);
768   request_->stream()->call_.PerformOps(this);
769 }
770 
initializer()771 ServerInitializer* Server::initializer() { return server_initializer_.get(); }
772 
773 }  // namespace grpc
774