1 /*
2 *
3 * Copyright 2015-2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpcpp/server_builder.h>
20
21 #include <grpc/support/cpu.h>
22 #include <grpc/support/log.h>
23 #include <grpcpp/impl/service_type.h>
24 #include <grpcpp/resource_quota.h>
25 #include <grpcpp/server.h>
26
27 #include <utility>
28
29 #include "src/core/lib/gpr/useful.h"
30 #include "src/cpp/server/thread_pool_interface.h"
31
32 namespace grpc {
33
34 static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
35 g_plugin_factory_list;
36 static gpr_once once_init_plugin_list = GPR_ONCE_INIT;
37
do_plugin_list_init(void)38 static void do_plugin_list_init(void) {
39 g_plugin_factory_list =
40 new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>();
41 }
42
ServerBuilder()43 ServerBuilder::ServerBuilder()
44 : max_receive_message_size_(INT_MIN),
45 max_send_message_size_(INT_MIN),
46 sync_server_settings_(SyncServerSettings()),
47 resource_quota_(nullptr),
48 generic_service_(nullptr) {
49 gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
50 for (auto it = g_plugin_factory_list->begin();
51 it != g_plugin_factory_list->end(); it++) {
52 auto& factory = *it;
53 plugins_.emplace_back(factory());
54 }
55
56 // all compression algorithms enabled by default.
57 enabled_compression_algorithms_bitset_ =
58 (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
59 memset(&maybe_default_compression_level_, 0,
60 sizeof(maybe_default_compression_level_));
61 memset(&maybe_default_compression_algorithm_, 0,
62 sizeof(maybe_default_compression_algorithm_));
63 }
64
~ServerBuilder()65 ServerBuilder::~ServerBuilder() {
66 if (resource_quota_ != nullptr) {
67 grpc_resource_quota_unref(resource_quota_);
68 }
69 }
70
AddCompletionQueue(bool is_frequently_polled)71 std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue(
72 bool is_frequently_polled) {
73 ServerCompletionQueue* cq = new ServerCompletionQueue(
74 is_frequently_polled ? GRPC_CQ_DEFAULT_POLLING : GRPC_CQ_NON_LISTENING);
75 cqs_.push_back(cq);
76 return std::unique_ptr<ServerCompletionQueue>(cq);
77 }
78
RegisterService(Service * service)79 ServerBuilder& ServerBuilder::RegisterService(Service* service) {
80 services_.emplace_back(new NamedService(service));
81 return *this;
82 }
83
RegisterService(const grpc::string & addr,Service * service)84 ServerBuilder& ServerBuilder::RegisterService(const grpc::string& addr,
85 Service* service) {
86 services_.emplace_back(new NamedService(addr, service));
87 return *this;
88 }
89
RegisterAsyncGenericService(AsyncGenericService * service)90 ServerBuilder& ServerBuilder::RegisterAsyncGenericService(
91 AsyncGenericService* service) {
92 if (generic_service_) {
93 gpr_log(GPR_ERROR,
94 "Adding multiple AsyncGenericService is unsupported for now. "
95 "Dropping the service %p",
96 (void*)service);
97 } else {
98 generic_service_ = service;
99 }
100 return *this;
101 }
102
SetOption(std::unique_ptr<ServerBuilderOption> option)103 ServerBuilder& ServerBuilder::SetOption(
104 std::unique_ptr<ServerBuilderOption> option) {
105 options_.push_back(std::move(option));
106 return *this;
107 }
108
SetSyncServerOption(ServerBuilder::SyncServerOption option,int val)109 ServerBuilder& ServerBuilder::SetSyncServerOption(
110 ServerBuilder::SyncServerOption option, int val) {
111 switch (option) {
112 case NUM_CQS:
113 sync_server_settings_.num_cqs = val;
114 break;
115 case MIN_POLLERS:
116 sync_server_settings_.min_pollers = val;
117 break;
118 case MAX_POLLERS:
119 sync_server_settings_.max_pollers = val;
120 break;
121 case CQ_TIMEOUT_MSEC:
122 sync_server_settings_.cq_timeout_msec = val;
123 break;
124 }
125 return *this;
126 }
127
SetCompressionAlgorithmSupportStatus(grpc_compression_algorithm algorithm,bool enabled)128 ServerBuilder& ServerBuilder::SetCompressionAlgorithmSupportStatus(
129 grpc_compression_algorithm algorithm, bool enabled) {
130 if (enabled) {
131 GPR_BITSET(&enabled_compression_algorithms_bitset_, algorithm);
132 } else {
133 GPR_BITCLEAR(&enabled_compression_algorithms_bitset_, algorithm);
134 }
135 return *this;
136 }
137
SetDefaultCompressionLevel(grpc_compression_level level)138 ServerBuilder& ServerBuilder::SetDefaultCompressionLevel(
139 grpc_compression_level level) {
140 maybe_default_compression_level_.level = level;
141 return *this;
142 }
143
SetDefaultCompressionAlgorithm(grpc_compression_algorithm algorithm)144 ServerBuilder& ServerBuilder::SetDefaultCompressionAlgorithm(
145 grpc_compression_algorithm algorithm) {
146 maybe_default_compression_algorithm_.is_set = true;
147 maybe_default_compression_algorithm_.algorithm = algorithm;
148 return *this;
149 }
150
SetResourceQuota(const grpc::ResourceQuota & resource_quota)151 ServerBuilder& ServerBuilder::SetResourceQuota(
152 const grpc::ResourceQuota& resource_quota) {
153 if (resource_quota_ != nullptr) {
154 grpc_resource_quota_unref(resource_quota_);
155 }
156 resource_quota_ = resource_quota.c_resource_quota();
157 grpc_resource_quota_ref(resource_quota_);
158 return *this;
159 }
160
AddListeningPort(const grpc::string & addr_uri,std::shared_ptr<ServerCredentials> creds,int * selected_port)161 ServerBuilder& ServerBuilder::AddListeningPort(
162 const grpc::string& addr_uri, std::shared_ptr<ServerCredentials> creds,
163 int* selected_port) {
164 const grpc::string uri_scheme = "dns:";
165 grpc::string addr = addr_uri;
166 if (addr_uri.compare(0, uri_scheme.size(), uri_scheme) == 0) {
167 size_t pos = uri_scheme.size();
168 while (addr_uri[pos] == '/') ++pos; // Skip slashes.
169 addr = addr_uri.substr(pos);
170 }
171 Port port = {addr, std::move(creds), selected_port};
172 ports_.push_back(port);
173 return *this;
174 }
175
BuildAndStart()176 std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
177 ChannelArguments args;
178 for (auto option = options_.begin(); option != options_.end(); ++option) {
179 (*option)->UpdateArguments(&args);
180 (*option)->UpdatePlugins(&plugins_);
181 }
182
183 for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
184 (*plugin)->UpdateServerBuilder(this);
185 (*plugin)->UpdateChannelArguments(&args);
186 }
187
188 if (max_receive_message_size_ >= -1) {
189 args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
190 }
191
192 // The default message size is -1 (max), so no need to explicitly set it for
193 // -1.
194 if (max_send_message_size_ >= 0) {
195 args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
196 }
197
198 args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
199 enabled_compression_algorithms_bitset_);
200 if (maybe_default_compression_level_.is_set) {
201 args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL,
202 maybe_default_compression_level_.level);
203 }
204 if (maybe_default_compression_algorithm_.is_set) {
205 args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
206 maybe_default_compression_algorithm_.algorithm);
207 }
208
209 if (resource_quota_ != nullptr) {
210 args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_,
211 grpc_resource_quota_arg_vtable());
212 }
213
214 // == Determine if the server has any syncrhonous methods ==
215 bool has_sync_methods = false;
216 for (auto it = services_.begin(); it != services_.end(); ++it) {
217 if ((*it)->service->has_synchronous_methods()) {
218 has_sync_methods = true;
219 break;
220 }
221 }
222
223 if (!has_sync_methods) {
224 for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
225 if ((*plugin)->has_sync_methods()) {
226 has_sync_methods = true;
227 break;
228 }
229 }
230 }
231
232 // If this is a Sync server, i.e a server expositing sync API, then the server
233 // needs to create some completion queues to listen for incoming requests.
234 // 'sync_server_cqs' are those internal completion queues.
235 //
236 // This is different from the completion queues added to the server via
237 // ServerBuilder's AddCompletionQueue() method (those completion queues
238 // are in 'cqs_' member variable of ServerBuilder object)
239 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
240 sync_server_cqs(std::make_shared<
241 std::vector<std::unique_ptr<ServerCompletionQueue>>>());
242
243 int num_frequently_polled_cqs = 0;
244 for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
245 if ((*it)->IsFrequentlyPolled()) {
246 num_frequently_polled_cqs++;
247 }
248 }
249
250 const bool is_hybrid_server =
251 has_sync_methods && num_frequently_polled_cqs > 0;
252
253 if (has_sync_methods) {
254 grpc_cq_polling_type polling_type =
255 is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
256
257 // Create completion queues to listen to incoming rpc requests
258 for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
259 sync_server_cqs->emplace_back(new ServerCompletionQueue(polling_type));
260 }
261 }
262
263 std::unique_ptr<Server> server(new Server(
264 max_receive_message_size_, &args, sync_server_cqs,
265 sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
266 sync_server_settings_.cq_timeout_msec, resource_quota_));
267
268 if (has_sync_methods) {
269 // This is a Sync server
270 gpr_log(GPR_INFO,
271 "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
272 "%d, CQ timeout (msec): %d",
273 sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
274 sync_server_settings_.max_pollers,
275 sync_server_settings_.cq_timeout_msec);
276 }
277
278 ServerInitializer* initializer = server->initializer();
279
280 // Register all the completion queues with the server. i.e
281 // 1. sync_server_cqs: internal completion queues created IF this is a sync
282 // server
283 // 2. cqs_: Completion queues added via AddCompletionQueue() call
284
285 for (auto it = sync_server_cqs->begin(); it != sync_server_cqs->end(); ++it) {
286 grpc_server_register_completion_queue(server->server_, (*it)->cq(),
287 nullptr);
288 num_frequently_polled_cqs++;
289 }
290
291 // cqs_ contains the completion queue added by calling the ServerBuilder's
292 // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
293 // calling Next() or AsyncNext()) and hence are not safe to be used for
294 // listening to incoming channels. Such completion queues must be registered
295 // as non-listening queues
296 for (auto it = cqs_.begin(); it != cqs_.end(); ++it) {
297 grpc_server_register_completion_queue(server->server_, (*it)->cq(),
298 nullptr);
299 }
300
301 if (num_frequently_polled_cqs == 0) {
302 gpr_log(GPR_ERROR,
303 "At least one of the completion queues must be frequently polled");
304 return nullptr;
305 }
306
307 for (auto service = services_.begin(); service != services_.end();
308 service++) {
309 if (!server->RegisterService((*service)->host.get(), (*service)->service)) {
310 return nullptr;
311 }
312 }
313
314 for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
315 (*plugin)->InitServer(initializer);
316 }
317
318 if (generic_service_) {
319 server->RegisterAsyncGenericService(generic_service_);
320 } else {
321 for (auto it = services_.begin(); it != services_.end(); ++it) {
322 if ((*it)->service->has_generic_methods()) {
323 gpr_log(GPR_ERROR,
324 "Some methods were marked generic but there is no "
325 "generic service registered.");
326 return nullptr;
327 }
328 }
329 }
330
331 bool added_port = false;
332 for (auto port = ports_.begin(); port != ports_.end(); port++) {
333 int r = server->AddListeningPort(port->addr, port->creds.get());
334 if (!r) {
335 if (added_port) server->Shutdown();
336 return nullptr;
337 }
338 added_port = true;
339 if (port->selected_port != nullptr) {
340 *port->selected_port = r;
341 }
342 }
343
344 auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
345 server->Start(cqs_data, cqs_.size());
346
347 for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
348 (*plugin)->Finish(initializer);
349 }
350
351 return server;
352 }
353
InternalAddPluginFactory(std::unique_ptr<ServerBuilderPlugin> (* CreatePlugin)())354 void ServerBuilder::InternalAddPluginFactory(
355 std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) {
356 gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
357 (*g_plugin_factory_list).push_back(CreatePlugin);
358 }
359
EnableWorkaround(grpc_workaround_list id)360 ServerBuilder& ServerBuilder::EnableWorkaround(grpc_workaround_list id) {
361 switch (id) {
362 case GRPC_WORKAROUND_ID_CRONET_COMPRESSION:
363 return AddChannelArgument(GRPC_ARG_WORKAROUND_CRONET_COMPRESSION, 1);
364 default:
365 gpr_log(GPR_ERROR, "Workaround %u does not exist or is obsolete.", id);
366 return *this;
367 }
368 }
369
370 } // namespace grpc
371