• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpcpp/server_builder.h>
20 
21 #include <grpc/support/cpu.h>
22 #include <grpc/support/log.h>
23 #include <grpcpp/impl/service_type.h>
24 #include <grpcpp/resource_quota.h>
25 #include <grpcpp/server.h>
26 
27 #include <utility>
28 
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/gpr/string.h"
31 #include "src/core/lib/gpr/useful.h"
32 #include "src/cpp/server/external_connection_acceptor_impl.h"
33 #include "src/cpp/server/thread_pool_interface.h"
34 
35 namespace grpc {
36 
37 static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
38     g_plugin_factory_list;
39 static gpr_once once_init_plugin_list = GPR_ONCE_INIT;
40 
do_plugin_list_init(void)41 static void do_plugin_list_init(void) {
42   g_plugin_factory_list =
43       new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>();
44 }
45 
ServerBuilder()46 ServerBuilder::ServerBuilder()
47     : max_receive_message_size_(INT_MIN),
48       max_send_message_size_(INT_MIN),
49       sync_server_settings_(SyncServerSettings()),
50       resource_quota_(nullptr) {
51   gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
52   for (const auto& value : *g_plugin_factory_list) {
53     plugins_.emplace_back(value());
54   }
55 
56   // all compression algorithms enabled by default.
57   enabled_compression_algorithms_bitset_ =
58       (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
59   memset(&maybe_default_compression_level_, 0,
60          sizeof(maybe_default_compression_level_));
61   memset(&maybe_default_compression_algorithm_, 0,
62          sizeof(maybe_default_compression_algorithm_));
63 }
64 
~ServerBuilder()65 ServerBuilder::~ServerBuilder() {
66   if (resource_quota_ != nullptr) {
67     grpc_resource_quota_unref(resource_quota_);
68   }
69 }
70 
71 std::unique_ptr<grpc_impl::ServerCompletionQueue>
AddCompletionQueue(bool is_frequently_polled)72 ServerBuilder::AddCompletionQueue(bool is_frequently_polled) {
73   grpc_impl::ServerCompletionQueue* cq = new grpc_impl::ServerCompletionQueue(
74       GRPC_CQ_NEXT,
75       is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING,
76       nullptr);
77   cqs_.push_back(cq);
78   return std::unique_ptr<grpc_impl::ServerCompletionQueue>(cq);
79 }
80 
RegisterService(Service * service)81 ServerBuilder& ServerBuilder::RegisterService(Service* service) {
82   services_.emplace_back(new NamedService(service));
83   return *this;
84 }
85 
RegisterService(const std::string & addr,Service * service)86 ServerBuilder& ServerBuilder::RegisterService(const std::string& addr,
87                                               Service* service) {
88   services_.emplace_back(new NamedService(addr, service));
89   return *this;
90 }
91 
RegisterAsyncGenericService(AsyncGenericService * service)92 ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
93     AsyncGenericService* service) {
94   if (generic_service_ || callback_generic_service_) {
95     gpr_log(GPR_ERROR,
96             "Adding multiple generic services is unsupported for now. "
97             "Dropping the service %p",
98             (void*)service);
99   } else {
100     generic_service_ = service;
101   }
102   return *this;
103 }
104 
105 #ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
RegisterCallbackGenericService(CallbackGenericService * service)106 ServerBuilder& ServerBuilder::RegisterCallbackGenericService(
107     CallbackGenericService* service) {
108   if (generic_service_ || callback_generic_service_) {
109     gpr_log(GPR_ERROR,
110             "Adding multiple generic services is unsupported for now. "
111             "Dropping the service %p",
112             (void*)service);
113   } else {
114     callback_generic_service_ = service;
115   }
116   return *this;
117 }
118 #else
RegisterCallbackGenericService(experimental::CallbackGenericService * service)119 ServerBuilder& ServerBuilder::experimental_type::RegisterCallbackGenericService(
120     experimental::CallbackGenericService* service) {
121   if (builder_->generic_service_ || builder_->callback_generic_service_) {
122     gpr_log(GPR_ERROR,
123             "Adding multiple generic services is unsupported for now. "
124             "Dropping the service %p",
125             (void*)service);
126   } else {
127     builder_->callback_generic_service_ = service;
128   }
129   return *builder_;
130 }
131 #endif
132 
133 std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor>
AddExternalConnectionAcceptor(experimental_type::ExternalConnectionType type,std::shared_ptr<ServerCredentials> creds)134 ServerBuilder::experimental_type::AddExternalConnectionAcceptor(
135     experimental_type::ExternalConnectionType type,
136     std::shared_ptr<ServerCredentials> creds) {
137   std::string name_prefix("external:");
138   char count_str[GPR_LTOA_MIN_BUFSIZE];
139   gpr_ltoa(static_cast<long>(builder_->acceptors_.size()), count_str);
140   builder_->acceptors_.emplace_back(
141       std::make_shared<grpc::internal::ExternalConnectionAcceptorImpl>(
142           name_prefix.append(count_str), type, creds));
143   return builder_->acceptors_.back()->GetAcceptor();
144 }
145 
SetOption(std::unique_ptr<ServerBuilderOption> option)146 ServerBuilder& ServerBuilder::SetOption(
147     std::unique_ptr<ServerBuilderOption> option) {
148   options_.push_back(std::move(option));
149   return *this;
150 }
151 
SetSyncServerOption(ServerBuilder::SyncServerOption option,int val)152 ServerBuilder& ServerBuilder::SetSyncServerOption(
153     ServerBuilder::SyncServerOption option, int val) {
154   switch (option) {
155     case NUM_CQS:
156       sync_server_settings_.num_cqs = val;
157       break;
158     case MIN_POLLERS:
159       sync_server_settings_.min_pollers = val;
160       break;
161     case MAX_POLLERS:
162       sync_server_settings_.max_pollers = val;
163       break;
164     case CQ_TIMEOUT_MSEC:
165       sync_server_settings_.cq_timeout_msec = val;
166       break;
167   }
168   return *this;
169 }
170 
SetCompressionAlgorithmSupportStatus(grpc_compression_algorithm algorithm,bool enabled)171 ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus(
172     grpc_compression_algorithm algorithm, bool enabled) {
173   if (enabled) {
174     GPR_BITSET(&enabled_compression_algorithms_bitset_, algorithm);
175   } else {
176     GPR_BITCLEAR(&enabled_compression_algorithms_bitset_, algorithm);
177   }
178   return *this;
179 }
180 
SetDefaultCompressionLevel(grpc_compression_level level)181 ServerBuilder& ServerBuilder::SetDefaultCompressionLevel(
182     grpc_compression_level level) {
183   maybe_default_compression_level_.is_set = true;
184   maybe_default_compression_level_.level = level;
185   return *this;
186 }
187 
SetDefaultCompressionAlgorithm(grpc_compression_algorithm algorithm)188 ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm(
189     grpc_compression_algorithm algorithm) {
190   maybe_default_compression_algorithm_.is_set = true;
191   maybe_default_compression_algorithm_.algorithm = algorithm;
192   return *this;
193 }
194 
SetResourceQuota(const grpc::ResourceQuota & resource_quota)195 ServerBuilder& ServerBuilder::SetResourceQuota(
196     const grpc::ResourceQuota& resource_quota) {
197   if (resource_quota_ != nullptr) {
198     grpc_resource_quota_unref(resource_quota_);
199   }
200   resource_quota_ = resource_quota.c_resource_quota();
201   grpc_resource_quota_ref(resource_quota_);
202   return *this;
203 }
204 
AddListeningPort(const std::string & addr_uri,std::shared_ptr<ServerCredentials> creds,int * selected_port)205 ServerBuilder& ServerBuilder::AddListeningPort(
206     const std::string& addr_uri, std::shared_ptr<ServerCredentials> creds,
207     int* selected_port) {
208   const std::string uri_scheme = "dns:";
209   std::string addr = addr_uri;
210   if (addr_uri.compare(0, uri_scheme.size(), uri_scheme) == 0) {
211     size_t pos = uri_scheme.size();
212     while (addr_uri[pos] == '/') ++pos;  // Skip slashes.
213     addr = addr_uri.substr(pos);
214   }
215   Port port = {addr, std::move(creds), selected_port};
216   ports_.push_back(port);
217   return *this;
218 }
219 
BuildAndStart()220 std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
221   grpc::ChannelArguments args;
222   if (max_receive_message_size_ >= -1) {
223     args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
224   }
225   if (max_send_message_size_ >= -1) {
226     args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
227   }
228   for (const auto& option : options_) {
229     option->UpdateArguments(&args);
230     option->UpdatePlugins(&plugins_);
231   }
232   args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
233               enabled_compression_algorithms_bitset_);
234   if (maybe_default_compression_level_.is_set) {
235     args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL,
236                 maybe_default_compression_level_.level);
237   }
238   if (maybe_default_compression_algorithm_.is_set) {
239     args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
240                 maybe_default_compression_algorithm_.algorithm);
241   }
242 
243   if (resource_quota_ != nullptr) {
244     args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_,
245                               grpc_resource_quota_arg_vtable());
246   }
247 
248   for (const auto& plugin : plugins_) {
249     plugin->UpdateServerBuilder(this);
250     plugin->UpdateChannelArguments(&args);
251   }
252 
253   // == Determine if the server has any syncrhonous methods ==
254   bool has_sync_methods = false;
255   for (const auto& value : services_) {
256     if (value->service->has_synchronous_methods()) {
257       has_sync_methods = true;
258       break;
259     }
260   }
261 
262   if (!has_sync_methods) {
263     for (const auto& value : plugins_) {
264       if (value->has_sync_methods()) {
265         has_sync_methods = true;
266         break;
267       }
268     }
269   }
270 
271   // If this is a Sync server, i.e a server expositing sync API, then the server
272   // needs to create some completion queues to listen for incoming requests.
273   // 'sync_server_cqs' are those internal completion queues.
274   //
275   // This is different from the completion queues added to the server via
276   // ServerBuilder's AddCompletionQueue() method (those completion queues
277   // are in 'cqs_' member variable of ServerBuilder object)
278   std::shared_ptr<
279       std::vector<std::unique_ptr<grpc_impl::ServerCompletionQueue>>>
280       sync_server_cqs(
281           std::make_shared<std::vector<
282               std::unique_ptr<grpc_impl::ServerCompletionQueue>>>());
283 
284   bool has_frequently_polled_cqs = false;
285   for (const auto& cq : cqs_) {
286     if (cq->IsFrequentlyPolled()) {
287       has_frequently_polled_cqs = true;
288       break;
289     }
290   }
291 
292   // == Determine if the server has any callback methods ==
293   bool has_callback_methods = false;
294   for (const auto& service : services_) {
295     if (service->service->has_callback_methods()) {
296       has_callback_methods = true;
297       has_frequently_polled_cqs = true;
298       break;
299     }
300   }
301 
302   const bool is_hybrid_server = has_sync_methods && has_frequently_polled_cqs;
303 
304   if (has_sync_methods) {
305     grpc_cq_polling_type polling_type =
306         is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
307 
308     // Create completion queues to listen to incoming rpc requests
309     for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
310       sync_server_cqs->emplace_back(new grpc_impl::ServerCompletionQueue(
311           GRPC_CQ_NEXT, polling_type, nullptr));
312     }
313   }
314 
315   // TODO(vjpai): Add a section here for plugins once they can support callback
316   // methods
317 
318   if (has_sync_methods) {
319     // This is a Sync server
320     gpr_log(GPR_INFO,
321             "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
322             "%d, CQ timeout (msec): %d",
323             sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
324             sync_server_settings_.max_pollers,
325             sync_server_settings_.cq_timeout_msec);
326   }
327 
328   if (has_callback_methods) {
329     gpr_log(GPR_INFO, "Callback server.");
330   }
331 
332   std::unique_ptr<grpc::Server> server(new grpc::Server(
333       &args, sync_server_cqs, sync_server_settings_.min_pollers,
334       sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec,
335       std::move(acceptors_), resource_quota_,
336       std::move(interceptor_creators_)));
337 
338   grpc_impl::ServerInitializer* initializer = server->initializer();
339 
340   // Register all the completion queues with the server. i.e
341   //  1. sync_server_cqs: internal completion queues created IF this is a sync
342   //     server
343   //  2. cqs_: Completion queues added via AddCompletionQueue() call
344 
345   for (const auto& cq : *sync_server_cqs) {
346     grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
347     has_frequently_polled_cqs = true;
348   }
349 
350   if (has_callback_methods || callback_generic_service_ != nullptr) {
351     auto* cq = server->CallbackCQ();
352     grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
353   }
354 
355   // cqs_ contains the completion queue added by calling the ServerBuilder's
356   // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
357   // calling Next() or AsyncNext()) and hence are not safe to be used for
358   // listening to incoming channels. Such completion queues must be registered
359   // as non-listening queues. In debug mode, these should have their server list
360   // tracked since these are provided the user and must be Shutdown by the user
361   // after the server is shutdown.
362   for (const auto& cq : cqs_) {
363     grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
364     cq->RegisterServer(server.get());
365   }
366 
367   if (!has_frequently_polled_cqs) {
368     gpr_log(GPR_ERROR,
369             "At least one of the completion queues must be frequently polled");
370     return nullptr;
371   }
372 
373   for (const auto& value : services_) {
374     if (!server->RegisterService(value->host.get(), value->service)) {
375       return nullptr;
376     }
377   }
378 
379   for (const auto& value : plugins_) {
380     value->InitServer(initializer);
381   }
382 
383   if (generic_service_) {
384     server->RegisterAsyncGenericService(generic_service_);
385   } else if (callback_generic_service_) {
386     server->RegisterCallbackGenericService(callback_generic_service_);
387   } else {
388     for (const auto& value : services_) {
389       if (value->service->has_generic_methods()) {
390         gpr_log(GPR_ERROR,
391                 "Some methods were marked generic but there is no "
392                 "generic service registered.");
393         return nullptr;
394       }
395     }
396   }
397 
398   bool added_port = false;
399   for (auto& port : ports_) {
400     int r = server->AddListeningPort(port.addr, port.creds.get());
401     if (!r) {
402       if (added_port) server->Shutdown();
403       return nullptr;
404     }
405     added_port = true;
406     if (port.selected_port != nullptr) {
407       *port.selected_port = r;
408     }
409   }
410 
411   auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
412   server->Start(cqs_data, cqs_.size());
413 
414   for (const auto& value : plugins_) {
415     value->Finish(initializer);
416   }
417 
418   return server;
419 }
420 
InternalAddPluginFactory(std::unique_ptr<ServerBuilderPlugin> (* CreatePlugin)())421 void ServerBuilder::InternalAddPluginFactory(
422     std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) {
423   gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
424   (*g_plugin_factory_list).push_back(CreatePlugin);
425 }
426 
EnableWorkaround(grpc_workaround_list id)427 ServerBuilder& ServerBuilder::EnableWorkaround(grpc_workaround_list id) {
428   switch (id) {
429     case GRPC_WORKAROUND_ID_CRONET_COMPRESSION:
430       return AddChannelArgument(GRPC_ARG_WORKAROUND_CRONET_COMPRESSION, 1);
431     default:
432       gpr_log(GPR_ERROR, "Workaround %u does not exist or is obsolete.", id);
433       return *this;
434   }
435 }
436 
437 }  // namespace grpc
438