1 /*
2 * Copyright 2015 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 */
17
18 #include <grpcpp/server.h>
19
20 #include <cstdlib>
21 #include <sstream>
22 #include <utility>
23
24 #include <grpc/grpc.h>
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpcpp/completion_queue.h>
28 #include <grpcpp/generic/async_generic_service.h>
29 #include <grpcpp/impl/codegen/async_unary_call.h>
30 #include <grpcpp/impl/codegen/completion_queue_tag.h>
31 #include <grpcpp/impl/grpc_library.h>
32 #include <grpcpp/impl/method_handler_impl.h>
33 #include <grpcpp/impl/rpc_service_method.h>
34 #include <grpcpp/impl/server_initializer.h>
35 #include <grpcpp/impl/service_type.h>
36 #include <grpcpp/security/server_credentials.h>
37 #include <grpcpp/server_context.h>
38 #include <grpcpp/support/time.h>
39
40 #include "src/core/ext/transport/inproc/inproc_transport.h"
41 #include "src/core/lib/profiling/timers.h"
42 #include "src/core/lib/surface/call.h"
43 #include "src/cpp/client/create_channel_internal.h"
44 #include "src/cpp/server/health/default_health_check_service.h"
45 #include "src/cpp/thread_manager/thread_manager.h"
46
47 namespace grpc {
48 namespace {
49
50 // The default value for maximum number of threads that can be created in the
51 // sync server. This value of INT_MAX is chosen to match the default behavior if
52 // no ResourceQuota is set. To modify the max number of threads in a sync
53 // server, pass a custom ResourceQuota object (with the desired number of
54 // max-threads set) to the server builder.
55 #define DEFAULT_MAX_SYNC_SERVER_THREADS INT_MAX
56
57 class DefaultGlobalCallbacks final : public Server::GlobalCallbacks {
58 public:
~DefaultGlobalCallbacks()59 ~DefaultGlobalCallbacks() override {}
PreSynchronousRequest(ServerContext * context)60 void PreSynchronousRequest(ServerContext* context) override {}
PostSynchronousRequest(ServerContext * context)61 void PostSynchronousRequest(ServerContext* context) override {}
62 };
63
64 std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
65 gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
66
InitGlobalCallbacks()67 void InitGlobalCallbacks() {
68 if (!g_callbacks) {
69 g_callbacks.reset(new DefaultGlobalCallbacks());
70 }
71 }
72
73 class ShutdownTag : public internal::CompletionQueueTag {
74 public:
FinalizeResult(void ** tag,bool * status)75 bool FinalizeResult(void** tag, bool* status) { return false; }
76 };
77
78 class DummyTag : public internal::CompletionQueueTag {
79 public:
FinalizeResult(void ** tag,bool * status)80 bool FinalizeResult(void** tag, bool* status) {
81 *status = true;
82 return true;
83 }
84 };
85
86 class UnimplementedAsyncRequestContext {
87 protected:
UnimplementedAsyncRequestContext()88 UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
89
90 GenericServerContext server_context_;
91 GenericServerAsyncReaderWriter generic_stream_;
92 };
93
94 } // namespace
95
96 /// Use private inheritance rather than composition only to establish order
97 /// of construction, since the public base class should be constructed after the
98 /// elements belonging to the private base class are constructed. This is not
99 /// possible using true composition.
100 class Server::UnimplementedAsyncRequest final
101 : private UnimplementedAsyncRequestContext,
102 public GenericAsyncRequest {
103 public:
UnimplementedAsyncRequest(Server * server,ServerCompletionQueue * cq)104 UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
105 : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
106 nullptr, false),
107 server_(server),
108 cq_(cq) {}
109
110 bool FinalizeResult(void** tag, bool* status) override;
111
context()112 ServerContext* context() { return &server_context_; }
stream()113 GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
114
115 private:
116 Server* const server_;
117 ServerCompletionQueue* const cq_;
118 };
119
120 /// UnimplementedAsyncResponse should not post user-visible completions to the
121 /// C++ completion queue, but is generated as a CQ event by the core
122 class Server::UnimplementedAsyncResponse final
123 : public internal::CallOpSet<internal::CallOpSendInitialMetadata,
124 internal::CallOpServerSendStatus> {
125 public:
126 UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
~UnimplementedAsyncResponse()127 ~UnimplementedAsyncResponse() { delete request_; }
128
FinalizeResult(void ** tag,bool * status)129 bool FinalizeResult(void** tag, bool* status) override {
130 internal::CallOpSet<
131 internal::CallOpSendInitialMetadata,
132 internal::CallOpServerSendStatus>::FinalizeResult(tag, status);
133 delete this;
134 return false;
135 }
136
137 private:
138 UnimplementedAsyncRequest* const request_;
139 };
140
141 class Server::SyncRequest final : public internal::CompletionQueueTag {
142 public:
SyncRequest(internal::RpcServiceMethod * method,void * tag)143 SyncRequest(internal::RpcServiceMethod* method, void* tag)
144 : method_(method),
145 tag_(tag),
146 in_flight_(false),
147 has_request_payload_(
148 method->method_type() == internal::RpcMethod::NORMAL_RPC ||
149 method->method_type() == internal::RpcMethod::SERVER_STREAMING),
150 call_details_(nullptr),
151 cq_(nullptr) {
152 grpc_metadata_array_init(&request_metadata_);
153 }
154
~SyncRequest()155 ~SyncRequest() {
156 if (call_details_) {
157 delete call_details_;
158 }
159 grpc_metadata_array_destroy(&request_metadata_);
160 }
161
SetupRequest()162 void SetupRequest() { cq_ = grpc_completion_queue_create_for_pluck(nullptr); }
163
TeardownRequest()164 void TeardownRequest() {
165 grpc_completion_queue_destroy(cq_);
166 cq_ = nullptr;
167 }
168
Request(grpc_server * server,grpc_completion_queue * notify_cq)169 void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
170 GPR_ASSERT(cq_ && !in_flight_);
171 in_flight_ = true;
172 if (tag_) {
173 if (GRPC_CALL_OK !=
174 grpc_server_request_registered_call(
175 server, tag_, &call_, &deadline_, &request_metadata_,
176 has_request_payload_ ? &request_payload_ : nullptr, cq_,
177 notify_cq, this)) {
178 TeardownRequest();
179 return;
180 }
181 } else {
182 if (!call_details_) {
183 call_details_ = new grpc_call_details;
184 grpc_call_details_init(call_details_);
185 }
186 if (grpc_server_request_call(server, &call_, call_details_,
187 &request_metadata_, cq_, notify_cq,
188 this) != GRPC_CALL_OK) {
189 TeardownRequest();
190 return;
191 }
192 }
193 }
194
FinalizeResult(void ** tag,bool * status)195 bool FinalizeResult(void** tag, bool* status) override {
196 if (!*status) {
197 grpc_completion_queue_destroy(cq_);
198 }
199 if (call_details_) {
200 deadline_ = call_details_->deadline;
201 grpc_call_details_destroy(call_details_);
202 grpc_call_details_init(call_details_);
203 }
204 return true;
205 }
206
207 class CallData final {
208 public:
CallData(Server * server,SyncRequest * mrd)209 explicit CallData(Server* server, SyncRequest* mrd)
210 : cq_(mrd->cq_),
211 call_(mrd->call_, server, &cq_, server->max_receive_message_size()),
212 ctx_(mrd->deadline_, &mrd->request_metadata_),
213 has_request_payload_(mrd->has_request_payload_),
214 request_payload_(has_request_payload_ ? mrd->request_payload_
215 : nullptr),
216 method_(mrd->method_),
217 server_(server) {
218 ctx_.set_call(mrd->call_);
219 ctx_.cq_ = &cq_;
220 GPR_ASSERT(mrd->in_flight_);
221 mrd->in_flight_ = false;
222 mrd->request_metadata_.count = 0;
223 }
224
~CallData()225 ~CallData() {
226 if (has_request_payload_ && request_payload_) {
227 grpc_byte_buffer_destroy(request_payload_);
228 }
229 }
230
Run(const std::shared_ptr<GlobalCallbacks> & global_callbacks,bool resources)231 void Run(const std::shared_ptr<GlobalCallbacks>& global_callbacks,
232 bool resources) {
233 ctx_.BeginCompletionOp(&call_);
234 global_callbacks->PreSynchronousRequest(&ctx_);
235 auto* handler = resources ? method_->handler()
236 : server_->resource_exhausted_handler_.get();
237 handler->RunHandler(internal::MethodHandler::HandlerParameter(
238 &call_, &ctx_, request_payload_));
239 global_callbacks->PostSynchronousRequest(&ctx_);
240 request_payload_ = nullptr;
241
242 cq_.Shutdown();
243
244 internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
245 cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
246
247 /* Ensure the cq_ is shutdown */
248 DummyTag ignored_tag;
249 GPR_ASSERT(cq_.Pluck(&ignored_tag) == false);
250 }
251
252 private:
253 CompletionQueue cq_;
254 internal::Call call_;
255 ServerContext ctx_;
256 const bool has_request_payload_;
257 grpc_byte_buffer* request_payload_;
258 internal::RpcServiceMethod* const method_;
259 Server* server_;
260 };
261
262 private:
263 internal::RpcServiceMethod* const method_;
264 void* const tag_;
265 bool in_flight_;
266 const bool has_request_payload_;
267 grpc_call* call_;
268 grpc_call_details* call_details_;
269 gpr_timespec deadline_;
270 grpc_metadata_array request_metadata_;
271 grpc_byte_buffer* request_payload_;
272 grpc_completion_queue* cq_;
273 };
274
275 // Implementation of ThreadManager. Each instance of SyncRequestThreadManager
276 // manages a pool of threads that poll for incoming Sync RPCs and call the
277 // appropriate RPC handlers
278 class Server::SyncRequestThreadManager : public ThreadManager {
279 public:
SyncRequestThreadManager(Server * server,CompletionQueue * server_cq,std::shared_ptr<GlobalCallbacks> global_callbacks,grpc_resource_quota * rq,int min_pollers,int max_pollers,int cq_timeout_msec)280 SyncRequestThreadManager(Server* server, CompletionQueue* server_cq,
281 std::shared_ptr<GlobalCallbacks> global_callbacks,
282 grpc_resource_quota* rq, int min_pollers,
283 int max_pollers, int cq_timeout_msec)
284 : ThreadManager("SyncServer", rq, min_pollers, max_pollers),
285 server_(server),
286 server_cq_(server_cq),
287 cq_timeout_msec_(cq_timeout_msec),
288 global_callbacks_(std::move(global_callbacks)) {}
289
PollForWork(void ** tag,bool * ok)290 WorkStatus PollForWork(void** tag, bool* ok) override {
291 *tag = nullptr;
292 // TODO(ctiller): workaround for GPR_TIMESPAN based deadlines not working
293 // right now
294 gpr_timespec deadline =
295 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
296 gpr_time_from_millis(cq_timeout_msec_, GPR_TIMESPAN));
297
298 switch (server_cq_->AsyncNext(tag, ok, deadline)) {
299 case CompletionQueue::TIMEOUT:
300 return TIMEOUT;
301 case CompletionQueue::SHUTDOWN:
302 return SHUTDOWN;
303 case CompletionQueue::GOT_EVENT:
304 return WORK_FOUND;
305 }
306
307 GPR_UNREACHABLE_CODE(return TIMEOUT);
308 }
309
DoWork(void * tag,bool ok,bool resources)310 void DoWork(void* tag, bool ok, bool resources) override {
311 SyncRequest* sync_req = static_cast<SyncRequest*>(tag);
312
313 if (!sync_req) {
314 // No tag. Nothing to work on. This is an unlikley scenario and possibly a
315 // bug in RPC Manager implementation.
316 gpr_log(GPR_ERROR, "Sync server. DoWork() was called with NULL tag");
317 return;
318 }
319
320 if (ok) {
321 // Calldata takes ownership of the completion queue inside sync_req
322 SyncRequest::CallData cd(server_, sync_req);
323 // Prepare for the next request
324 if (!IsShutdown()) {
325 sync_req->SetupRequest(); // Create new completion queue for sync_req
326 sync_req->Request(server_->c_server(), server_cq_->cq());
327 }
328
329 GPR_TIMER_SCOPE("cd.Run()", 0);
330 cd.Run(global_callbacks_, resources);
331 }
332 // TODO (sreek) If ok is false here (which it isn't in case of
333 // grpc_request_registered_call), we should still re-queue the request
334 // object
335 }
336
AddSyncMethod(internal::RpcServiceMethod * method,void * tag)337 void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) {
338 sync_requests_.emplace_back(new SyncRequest(method, tag));
339 }
340
AddUnknownSyncMethod()341 void AddUnknownSyncMethod() {
342 if (!sync_requests_.empty()) {
343 unknown_method_.reset(new internal::RpcServiceMethod(
344 "unknown", internal::RpcMethod::BIDI_STREAMING,
345 new internal::UnknownMethodHandler));
346 sync_requests_.emplace_back(
347 new SyncRequest(unknown_method_.get(), nullptr));
348 }
349 }
350
Shutdown()351 void Shutdown() override {
352 ThreadManager::Shutdown();
353 server_cq_->Shutdown();
354 }
355
Wait()356 void Wait() override {
357 ThreadManager::Wait();
358 // Drain any pending items from the queue
359 void* tag;
360 bool ok;
361 while (server_cq_->Next(&tag, &ok)) {
362 // Do nothing
363 }
364 }
365
Start()366 void Start() {
367 if (!sync_requests_.empty()) {
368 for (auto m = sync_requests_.begin(); m != sync_requests_.end(); m++) {
369 (*m)->SetupRequest();
370 (*m)->Request(server_->c_server(), server_cq_->cq());
371 }
372
373 Initialize(); // ThreadManager's Initialize()
374 }
375 }
376
377 private:
378 Server* server_;
379 CompletionQueue* server_cq_;
380 int cq_timeout_msec_;
381 std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
382 std::unique_ptr<internal::RpcServiceMethod> unknown_method_;
383 std::unique_ptr<internal::RpcServiceMethod> health_check_;
384 std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
385 };
386
387 static internal::GrpcLibraryInitializer g_gli_initializer;
Server(int max_receive_message_size,ChannelArguments * args,std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> sync_server_cqs,int min_pollers,int max_pollers,int sync_cq_timeout_msec,grpc_resource_quota * server_rq)388 Server::Server(
389 int max_receive_message_size, ChannelArguments* args,
390 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
391 sync_server_cqs,
392 int min_pollers, int max_pollers, int sync_cq_timeout_msec,
393 grpc_resource_quota* server_rq)
394 : max_receive_message_size_(max_receive_message_size),
395 sync_server_cqs_(std::move(sync_server_cqs)),
396 started_(false),
397 shutdown_(false),
398 shutdown_notified_(false),
399 has_generic_service_(false),
400 server_(nullptr),
401 server_initializer_(new ServerInitializer(this)),
402 health_check_service_disabled_(false) {
403 g_gli_initializer.summon();
404 gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
405 global_callbacks_ = g_callbacks;
406 global_callbacks_->UpdateArguments(args);
407
408 if (sync_server_cqs_ != nullptr) {
409 bool default_rq_created = false;
410 if (server_rq == nullptr) {
411 server_rq = grpc_resource_quota_create("SyncServer-default-rq");
412 grpc_resource_quota_set_max_threads(server_rq,
413 DEFAULT_MAX_SYNC_SERVER_THREADS);
414 default_rq_created = true;
415 }
416
417 for (const auto& it : *sync_server_cqs_) {
418 sync_req_mgrs_.emplace_back(new SyncRequestThreadManager(
419 this, it.get(), global_callbacks_, server_rq, min_pollers,
420 max_pollers, sync_cq_timeout_msec));
421 }
422
423 if (default_rq_created) {
424 grpc_resource_quota_unref(server_rq);
425 }
426 }
427
428 grpc_channel_args channel_args;
429 args->SetChannelArgs(&channel_args);
430
431 for (size_t i = 0; i < channel_args.num_args; i++) {
432 if (0 ==
433 strcmp(channel_args.args[i].key, kHealthCheckServiceInterfaceArg)) {
434 if (channel_args.args[i].value.pointer.p == nullptr) {
435 health_check_service_disabled_ = true;
436 } else {
437 health_check_service_.reset(static_cast<HealthCheckServiceInterface*>(
438 channel_args.args[i].value.pointer.p));
439 }
440 break;
441 }
442 }
443
444 server_ = grpc_server_create(&channel_args, nullptr);
445 }
446
~Server()447 Server::~Server() {
448 {
449 std::unique_lock<std::mutex> lock(mu_);
450 if (started_ && !shutdown_) {
451 lock.unlock();
452 Shutdown();
453 } else if (!started_) {
454 // Shutdown the completion queues
455 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
456 (*it)->Shutdown();
457 }
458 }
459 }
460
461 grpc_server_destroy(server_);
462 }
463
SetGlobalCallbacks(GlobalCallbacks * callbacks)464 void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
465 GPR_ASSERT(!g_callbacks);
466 GPR_ASSERT(callbacks);
467 g_callbacks.reset(callbacks);
468 }
469
c_server()470 grpc_server* Server::c_server() { return server_; }
471
InProcessChannel(const ChannelArguments & args)472 std::shared_ptr<Channel> Server::InProcessChannel(
473 const ChannelArguments& args) {
474 grpc_channel_args channel_args = args.c_channel_args();
475 return CreateChannelInternal(
476 "inproc", grpc_inproc_channel_create(server_, &channel_args, nullptr));
477 }
478
PayloadHandlingForMethod(internal::RpcServiceMethod * method)479 static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
480 internal::RpcServiceMethod* method) {
481 switch (method->method_type()) {
482 case internal::RpcMethod::NORMAL_RPC:
483 case internal::RpcMethod::SERVER_STREAMING:
484 return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
485 case internal::RpcMethod::CLIENT_STREAMING:
486 case internal::RpcMethod::BIDI_STREAMING:
487 return GRPC_SRM_PAYLOAD_NONE;
488 }
489 GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
490 }
491
RegisterService(const grpc::string * host,Service * service)492 bool Server::RegisterService(const grpc::string* host, Service* service) {
493 bool has_async_methods = service->has_async_methods();
494 if (has_async_methods) {
495 GPR_ASSERT(service->server_ == nullptr &&
496 "Can only register an asynchronous service against one server.");
497 service->server_ = this;
498 }
499
500 const char* method_name = nullptr;
501 for (auto it = service->methods_.begin(); it != service->methods_.end();
502 ++it) {
503 if (it->get() == nullptr) { // Handled by generic service if any.
504 continue;
505 }
506
507 internal::RpcServiceMethod* method = it->get();
508 void* tag = grpc_server_register_method(
509 server_, method->name(), host ? host->c_str() : nullptr,
510 PayloadHandlingForMethod(method), 0);
511 if (tag == nullptr) {
512 gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
513 method->name());
514 return false;
515 }
516
517 if (method->handler() == nullptr) { // Async method
518 method->set_server_tag(tag);
519 } else {
520 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
521 (*it)->AddSyncMethod(method, tag);
522 }
523 }
524
525 method_name = method->name();
526 }
527
528 // Parse service name.
529 if (method_name != nullptr) {
530 std::stringstream ss(method_name);
531 grpc::string service_name;
532 if (std::getline(ss, service_name, '/') &&
533 std::getline(ss, service_name, '/')) {
534 services_.push_back(service_name);
535 }
536 }
537 return true;
538 }
539
RegisterAsyncGenericService(AsyncGenericService * service)540 void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
541 GPR_ASSERT(service->server_ == nullptr &&
542 "Can only register an async generic service against one server.");
543 service->server_ = this;
544 has_generic_service_ = true;
545 }
546
AddListeningPort(const grpc::string & addr,ServerCredentials * creds)547 int Server::AddListeningPort(const grpc::string& addr,
548 ServerCredentials* creds) {
549 GPR_ASSERT(!started_);
550 int port = creds->AddPortToServer(addr, server_);
551 global_callbacks_->AddPort(this, addr, creds, port);
552 return port;
553 }
554
Start(ServerCompletionQueue ** cqs,size_t num_cqs)555 void Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
556 GPR_ASSERT(!started_);
557 global_callbacks_->PreServerStart(this);
558 started_ = true;
559
560 // Only create default health check service when user did not provide an
561 // explicit one.
562 if (health_check_service_ == nullptr && !health_check_service_disabled_ &&
563 DefaultHealthCheckServiceEnabled()) {
564 if (sync_server_cqs_ == nullptr || sync_server_cqs_->empty()) {
565 gpr_log(GPR_INFO,
566 "Default health check service disabled at async-only server.");
567 } else {
568 auto* default_hc_service = new DefaultHealthCheckService;
569 health_check_service_.reset(default_hc_service);
570 RegisterService(nullptr, default_hc_service->GetHealthCheckService());
571 }
572 }
573
574 grpc_server_start(server_);
575
576 if (!has_generic_service_) {
577 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
578 (*it)->AddUnknownSyncMethod();
579 }
580
581 for (size_t i = 0; i < num_cqs; i++) {
582 if (cqs[i]->IsFrequentlyPolled()) {
583 new UnimplementedAsyncRequest(this, cqs[i]);
584 }
585 }
586 }
587
588 // If this server has any support for synchronous methods (has any sync
589 // server CQs), make sure that we have a ResourceExhausted handler
590 // to deal with the case of thread exhaustion
591 if (sync_server_cqs_ != nullptr && !sync_server_cqs_->empty()) {
592 resource_exhausted_handler_.reset(new internal::ResourceExhaustedHandler);
593 }
594
595 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
596 (*it)->Start();
597 }
598 }
599
ShutdownInternal(gpr_timespec deadline)600 void Server::ShutdownInternal(gpr_timespec deadline) {
601 std::unique_lock<std::mutex> lock(mu_);
602 if (!shutdown_) {
603 shutdown_ = true;
604
605 /// The completion queue to use for server shutdown completion notification
606 CompletionQueue shutdown_cq;
607 ShutdownTag shutdown_tag; // Dummy shutdown tag
608 grpc_server_shutdown_and_notify(server_, shutdown_cq.cq(), &shutdown_tag);
609
610 shutdown_cq.Shutdown();
611
612 void* tag;
613 bool ok;
614 CompletionQueue::NextStatus status =
615 shutdown_cq.AsyncNext(&tag, &ok, deadline);
616
617 // If this timed out, it means we are done with the grace period for a clean
618 // shutdown. We should force a shutdown now by cancelling all inflight calls
619 if (status == CompletionQueue::NextStatus::TIMEOUT) {
620 grpc_server_cancel_all_calls(server_);
621 }
622 // Else in case of SHUTDOWN or GOT_EVENT, it means that the server has
623 // successfully shutdown
624
625 // Shutdown all ThreadManagers. This will try to gracefully stop all the
626 // threads in the ThreadManagers (once they process any inflight requests)
627 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
628 (*it)->Shutdown(); // ThreadManager's Shutdown()
629 }
630
631 // Wait for threads in all ThreadManagers to terminate
632 for (auto it = sync_req_mgrs_.begin(); it != sync_req_mgrs_.end(); it++) {
633 (*it)->Wait();
634 }
635
636 // Drain the shutdown queue (if the previous call to AsyncNext() timed out
637 // and we didn't remove the tag from the queue yet)
638 while (shutdown_cq.Next(&tag, &ok)) {
639 // Nothing to be done here. Just ignore ok and tag values
640 }
641
642 shutdown_notified_ = true;
643 shutdown_cv_.notify_all();
644 }
645 }
646
Wait()647 void Server::Wait() {
648 std::unique_lock<std::mutex> lock(mu_);
649 while (started_ && !shutdown_notified_) {
650 shutdown_cv_.wait(lock);
651 }
652 }
653
PerformOpsOnCall(internal::CallOpSetInterface * ops,internal::Call * call)654 void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
655 internal::Call* call) {
656 static const size_t MAX_OPS = 8;
657 size_t nops = 0;
658 grpc_op cops[MAX_OPS];
659 ops->FillOps(call->call(), cops, &nops);
660 // TODO(vjpai): Use ops->cq_tag once this case supports callbacks
661 auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
662 if (result != GRPC_CALL_OK) {
663 gpr_log(GPR_ERROR, "Fatal: grpc_call_start_batch returned %d", result);
664 grpc_call_log_batch(__FILE__, __LINE__, GPR_LOG_SEVERITY_ERROR,
665 call->call(), cops, nops, ops);
666 abort();
667 }
668 }
669
BaseAsyncRequest(ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,void * tag,bool delete_on_finalize)670 ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
671 ServerInterface* server, ServerContext* context,
672 internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
673 void* tag, bool delete_on_finalize)
674 : server_(server),
675 context_(context),
676 stream_(stream),
677 call_cq_(call_cq),
678 tag_(tag),
679 delete_on_finalize_(delete_on_finalize),
680 call_(nullptr) {
681 call_cq_->RegisterAvalanching(); // This op will trigger more ops
682 }
683
~BaseAsyncRequest()684 ServerInterface::BaseAsyncRequest::~BaseAsyncRequest() {
685 call_cq_->CompleteAvalanching();
686 }
687
FinalizeResult(void ** tag,bool * status)688 bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
689 bool* status) {
690 context_->set_call(call_);
691 context_->cq_ = call_cq_;
692 internal::Call call(call_, server_, call_cq_,
693 server_->max_receive_message_size());
694 if (*status && call_) {
695 context_->BeginCompletionOp(&call);
696 }
697 // just the pointers inside call are copied here
698 stream_->BindCall(&call);
699 *tag = tag_;
700 if (delete_on_finalize_) {
701 delete this;
702 }
703 return true;
704 }
705
RegisteredAsyncRequest(ServerInterface * server,ServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,void * tag)706 ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
707 ServerInterface* server, ServerContext* context,
708 internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
709 void* tag)
710 : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
711
IssueRequest(void * registered_method,grpc_byte_buffer ** payload,ServerCompletionQueue * notification_cq)712 void ServerInterface::RegisteredAsyncRequest::IssueRequest(
713 void* registered_method, grpc_byte_buffer** payload,
714 ServerCompletionQueue* notification_cq) {
715 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call(
716 server_->server(), registered_method, &call_,
717 &context_->deadline_,
718 context_->client_metadata_.arr(), payload,
719 call_cq_->cq(), notification_cq->cq(), this));
720 }
721
GenericAsyncRequest(ServerInterface * server,GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,CompletionQueue * call_cq,ServerCompletionQueue * notification_cq,void * tag,bool delete_on_finalize)722 ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
723 ServerInterface* server, GenericServerContext* context,
724 internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
725 ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
726 : BaseAsyncRequest(server, context, stream, call_cq, tag,
727 delete_on_finalize) {
728 grpc_call_details_init(&call_details_);
729 GPR_ASSERT(notification_cq);
730 GPR_ASSERT(call_cq);
731 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
732 server->server(), &call_, &call_details_,
733 context->client_metadata_.arr(), call_cq->cq(),
734 notification_cq->cq(), this));
735 }
736
FinalizeResult(void ** tag,bool * status)737 bool ServerInterface::GenericAsyncRequest::FinalizeResult(void** tag,
738 bool* status) {
739 // TODO(yangg) remove the copy here.
740 if (*status) {
741 static_cast<GenericServerContext*>(context_)->method_ =
742 StringFromCopiedSlice(call_details_.method);
743 static_cast<GenericServerContext*>(context_)->host_ =
744 StringFromCopiedSlice(call_details_.host);
745 context_->deadline_ = call_details_.deadline;
746 }
747 grpc_slice_unref(call_details_.method);
748 grpc_slice_unref(call_details_.host);
749 return BaseAsyncRequest::FinalizeResult(tag, status);
750 }
751
FinalizeResult(void ** tag,bool * status)752 bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
753 bool* status) {
754 if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
755 new UnimplementedAsyncRequest(server_, cq_);
756 new UnimplementedAsyncResponse(this);
757 } else {
758 delete this;
759 }
760 return false;
761 }
762
UnimplementedAsyncResponse(UnimplementedAsyncRequest * request)763 Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
764 UnimplementedAsyncRequest* request)
765 : request_(request) {
766 Status status(StatusCode::UNIMPLEMENTED, "");
767 internal::UnknownMethodHandler::FillOps(request_->context(), this);
768 request_->stream()->call_.PerformOps(this);
769 }
770
initializer()771 ServerInitializer* Server::initializer() { return server_initializer_.get(); }
772
773 } // namespace grpc
774