1 /*
2 *
3 * Copyright 2015-2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/surface/server.h"
22
23 #include <limits.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpc/support/string_util.h>
30
31 #include "src/core/lib/channel/channel_args.h"
32 #include "src/core/lib/channel/connected_channel.h"
33 #include "src/core/lib/debug/stats.h"
34 #include "src/core/lib/gpr/mpscq.h"
35 #include "src/core/lib/gpr/spinlock.h"
36 #include "src/core/lib/gpr/string.h"
37 #include "src/core/lib/iomgr/executor.h"
38 #include "src/core/lib/iomgr/iomgr.h"
39 #include "src/core/lib/slice/slice_internal.h"
40 #include "src/core/lib/surface/api_trace.h"
41 #include "src/core/lib/surface/call.h"
42 #include "src/core/lib/surface/channel.h"
43 #include "src/core/lib/surface/completion_queue.h"
44 #include "src/core/lib/surface/init.h"
45 #include "src/core/lib/transport/metadata.h"
46 #include "src/core/lib/transport/static_metadata.h"
47
48 grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");
49
namespace {
/* A listening endpoint (e.g. a bound TCP port). Listeners form a
   singly-linked list owned by the server. */
struct listener {
  void* arg;
  /* Invoked when the server starts, handing the listener the server's
     pollsets to poll on. */
  void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,
                size_t pollset_count);
  /* Invoked during shutdown; implementation must eventually run |closure|. */
  void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);
  struct listener* next;
  grpc_closure destroy_done;
};

/* Distinguishes grpc_server_request_call from
   grpc_server_request_registered_call. */
enum requested_call_type { BATCH_CALL, REGISTERED_CALL };

struct registered_method;

/* An application's pending request for an incoming RPC, parked on a
   request_matcher queue until a matching call arrives. */
struct requested_call {
  gpr_mpscq_node request_link; /* must be first */
  requested_call_type type;
  size_t cq_idx; /* index of the completion queue the request was made on */
  void* tag;     /* application-supplied completion tag */
  grpc_server* server;
  grpc_completion_queue* cq_bound_to_call;
  grpc_call** call; /* out-param: receives the matched call */
  grpc_cq_completion completion;
  grpc_metadata_array* initial_metadata;
  union {
    struct {
      grpc_call_details* details;
    } batch; /* valid when type == BATCH_CALL */
    struct {
      registered_method* method;
      gpr_timespec* deadline;
      grpc_byte_buffer** optional_payload;
    } registered; /* valid when type == REGISTERED_CALL */
  } data;
};

/* Per-channel entry in the open-addressed hash table of registered methods. */
struct channel_registered_method {
  registered_method* server_registered_method;
  uint32_t flags;
  bool has_host; /* false for wildcard (host-agnostic) registrations */
  grpc_slice method;
  grpc_slice host;
};

/* Per-channel state; all channels on a server are kept in a doubly-linked
   ring rooted at grpc_server::root_channel_data. */
struct channel_data {
  grpc_server* server;
  grpc_connectivity_state connectivity_state;
  grpc_channel* channel;
  size_t cq_idx; /* preferred cq to publish new calls to (round-robin base) */
  /* linked list of all channels on a server */
  channel_data* next;
  channel_data* prev;
  channel_registered_method* registered_methods;
  uint32_t registered_method_slots;      /* hash table size */
  uint32_t registered_method_max_probes; /* longest probe sequence inserted */
  grpc_closure finish_destroy_channel_closure;
  grpc_closure channel_connectivity_changed;
};

/* One (tag, cq) pair registered via grpc_server_shutdown_and_notify. */
typedef struct shutdown_tag {
  void* tag;
  grpc_completion_queue* cq;
  grpc_cq_completion completion;
} shutdown_tag;

/* Lifecycle states for an incoming call; stored in call_data::state as a
   gpr_atm. */
typedef enum {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
} call_state;

typedef struct request_matcher request_matcher;

/* Per-call state for the server top filter. */
struct call_data {
  grpc_call* call;

  gpr_atm state; /* a call_state value, accessed atomically */

  bool path_set;
  bool host_set;
  grpc_slice path;
  grpc_slice host;
  grpc_millis deadline;

  grpc_completion_queue* cq_new; /* cq the call was published to */

  grpc_metadata_batch* recv_initial_metadata;
  uint32_t recv_initial_metadata_flags;
  grpc_metadata_array initial_metadata;

  request_matcher* matcher;
  grpc_byte_buffer* payload; /* optional pre-read payload (registered calls) */

  grpc_closure got_initial_metadata;
  grpc_closure server_on_recv_initial_metadata;
  grpc_closure kill_zombie_closure;
  grpc_closure* on_done_recv_initial_metadata;
  grpc_closure recv_trailing_metadata_ready;
  grpc_error* error;
  grpc_closure* original_recv_trailing_metadata_ready;

  grpc_closure publish;

  call_data* pending_next; /* link in the matcher's slow-path pending list */
};

/* Matches incoming calls against application requests: one lock-free request
   queue per completion queue, plus a pending list for unmatched calls. */
struct request_matcher {
  grpc_server* server;
  call_data* pending_head;
  call_data* pending_tail;
  gpr_locked_mpscq* requests_per_cq; /* array of length server->cq_count */
};

/* A method registered via grpc_server_register_method. */
struct registered_method {
  char* method;
  char* host;
  grpc_server_register_method_payload_handling payload_handling;
  uint32_t flags;
  /* one request matcher per method */
  request_matcher matcher;
  registered_method* next;
};

/* Snapshot of all channels, used to broadcast shutdown ops. */
typedef struct {
  grpc_channel** channels;
  size_t num_channels;
} channel_broadcaster;
}  // namespace
183
struct grpc_server {
  grpc_channel_args* channel_args;

  grpc_completion_queue** cqs;
  grpc_pollset** pollsets;
  size_t cq_count;
  size_t pollset_count;
  bool started;

  /* The two following mutexes control access to server-state
     mu_global controls access to non-call-related state (e.g., channel state)
     mu_call controls access to call-related state (e.g., the call lists)

     If they are ever required to be nested, you must lock mu_global
     before mu_call. This is currently used in shutdown processing
     (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
  gpr_mu mu_global; /* mutex for server and channel state */
  gpr_mu mu_call;   /* mutex for call-specific state */

  /* startup synchronization: flag is protected by mu_global, signals whether
     we are doing the listener start routine or not */
  bool starting;
  gpr_cv starting_cv;

  registered_method* registered_methods;
  /** one request matcher for unregistered methods */
  request_matcher unregistered_request_matcher;

  /* set (via gpr_atm) once shutdown begins; read with acquire loads */
  gpr_atm shutdown_flag;
  uint8_t shutdown_published;
  size_t num_shutdown_tags;
  shutdown_tag* shutdown_tags;

  /* sentinel node of the doubly-linked ring of channels */
  channel_data root_channel_data;

  listener* listeners;
  int listeners_destroyed;
  /* counts internal references; server is deleted when it hits zero */
  gpr_refcount internal_refcount;

  /** when did we print the last shutdown progress message */
  gpr_timespec last_shutdown_message_time;

  grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
};
228
/* Recover the owning grpc_server from a call element's channel data. */
#define SERVER_FROM_CALL_ELEM(elem) \
  (((channel_data*)(elem)->channel_data)->server)

static void publish_new_rpc(void* calld, grpc_error* error);
static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
                      grpc_error* error);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
   hold mu_call */
static void maybe_finish_shutdown(grpc_server* server);
238
239 /*
240 * channel broadcaster
241 */
242
243 /* assumes server locked */
channel_broadcaster_init(grpc_server * s,channel_broadcaster * cb)244 static void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) {
245 channel_data* c;
246 size_t count = 0;
247 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
248 count++;
249 }
250 cb->num_channels = count;
251 cb->channels = static_cast<grpc_channel**>(
252 gpr_malloc(sizeof(*cb->channels) * cb->num_channels));
253 count = 0;
254 for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
255 cb->channels[count++] = c->channel;
256 GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
257 }
258 }
259
/* Heap-allocated context for send_shutdown(): keeps the goaway message slice
   alive until the transport op completes, then shutdown_cleanup frees it. */
struct shutdown_cleanup_args {
  grpc_closure closure;
  grpc_slice slice;
};
264
shutdown_cleanup(void * arg,grpc_error * error)265 static void shutdown_cleanup(void* arg, grpc_error* error) {
266 struct shutdown_cleanup_args* a =
267 static_cast<struct shutdown_cleanup_args*>(arg);
268 grpc_slice_unref_internal(a->slice);
269 gpr_free(a);
270 }
271
/* Send a shutdown transport op down |channel|: optionally a GOAWAY (with
   GRPC_STATUS_OK so clients treat it as graceful), always stop accepting new
   streams, and optionally force a disconnect via |send_disconnect|.
   Ownership of |send_disconnect| transfers to the transport op. */
static void send_shutdown(grpc_channel* channel, bool send_goaway,
                          grpc_error* send_disconnect) {
  struct shutdown_cleanup_args* sc =
      static_cast<struct shutdown_cleanup_args*>(gpr_malloc(sizeof(*sc)));
  GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
  grpc_channel_element* elem;

  op->goaway_error =
      send_goaway ? grpc_error_set_int(
                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"),
                        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK)
                  : GRPC_ERROR_NONE;
  /* set_accept_stream with no handler configured: stop accepting streams */
  op->set_accept_stream = true;
  sc->slice = grpc_slice_from_copied_string("Server shutdown");
  op->disconnect_with_error = send_disconnect;

  /* hand the op to the top element of the channel stack */
  elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
  elem->filter->start_transport_op(elem, op);
}
293
/* Broadcast a shutdown op to every channel captured by
   channel_broadcaster_init, dropping the "broadcast" refs taken there.
   Consumes |force_disconnect| (each channel gets its own ref; the original
   is unreffed at the end). */
static void channel_broadcaster_shutdown(channel_broadcaster* cb,
                                         bool send_goaway,
                                         grpc_error* force_disconnect) {
  size_t i;

  for (i = 0; i < cb->num_channels; i++) {
    send_shutdown(cb->channels[i], send_goaway,
                  GRPC_ERROR_REF(force_disconnect));
    GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");
  }
  gpr_free(cb->channels);
  GRPC_ERROR_UNREF(force_disconnect);
}
307
308 /*
309 * request_matcher
310 */
311
request_matcher_init(request_matcher * rm,grpc_server * server)312 static void request_matcher_init(request_matcher* rm, grpc_server* server) {
313 memset(rm, 0, sizeof(*rm));
314 rm->server = server;
315 rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(
316 gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));
317 for (size_t i = 0; i < server->cq_count; i++) {
318 gpr_locked_mpscq_init(&rm->requests_per_cq[i]);
319 }
320 }
321
request_matcher_destroy(request_matcher * rm)322 static void request_matcher_destroy(request_matcher* rm) {
323 for (size_t i = 0; i < rm->server->cq_count; i++) {
324 GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);
325 gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);
326 }
327 gpr_free(rm->requests_per_cq);
328 }
329
kill_zombie(void * elem,grpc_error * error)330 static void kill_zombie(void* elem, grpc_error* error) {
331 grpc_call_unref(
332 grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
333 }
334
request_matcher_zombify_all_pending_calls(request_matcher * rm)335 static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {
336 while (rm->pending_head) {
337 call_data* calld = rm->pending_head;
338 rm->pending_head = calld->pending_next;
339 gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
340 GRPC_CLOSURE_INIT(
341 &calld->kill_zombie_closure, kill_zombie,
342 grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
343 grpc_schedule_on_exec_ctx);
344 GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
345 }
346 }
347
request_matcher_kill_requests(grpc_server * server,request_matcher * rm,grpc_error * error)348 static void request_matcher_kill_requests(grpc_server* server,
349 request_matcher* rm,
350 grpc_error* error) {
351 requested_call* rc;
352 for (size_t i = 0; i < server->cq_count; i++) {
353 while ((rc = reinterpret_cast<requested_call*>(
354 gpr_locked_mpscq_pop(&rm->requests_per_cq[i]))) != nullptr) {
355 fail_call(server, i, rc, GRPC_ERROR_REF(error));
356 }
357 }
358 GRPC_ERROR_UNREF(error);
359 }
360
361 /*
362 * server proper
363 */
364
server_ref(grpc_server * server)365 static void server_ref(grpc_server* server) {
366 gpr_ref(&server->internal_refcount);
367 }
368
/* Final teardown, run when the last internal ref is dropped: releases
   channelz state, channel args, locks, registered methods, matchers, cq
   refs, and finally the server struct itself. */
static void server_delete(grpc_server* server) {
  registered_method* rm;
  size_t i;
  server->channelz_server.reset();
  grpc_channel_args_destroy(server->channel_args);
  gpr_mu_destroy(&server->mu_global);
  gpr_mu_destroy(&server->mu_call);
  gpr_cv_destroy(&server->starting_cv);
  while ((rm = server->registered_methods) != nullptr) {
    server->registered_methods = rm->next;
    /* matchers are only initialized once the server has started */
    if (server->started) {
      request_matcher_destroy(&rm->matcher);
    }
    gpr_free(rm->method);
    gpr_free(rm->host);
    gpr_free(rm);
  }
  if (server->started) {
    request_matcher_destroy(&server->unregistered_request_matcher);
  }
  for (i = 0; i < server->cq_count; i++) {
    GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
  }
  gpr_free(server->cqs);
  gpr_free(server->pollsets);
  gpr_free(server->shutdown_tags);
  gpr_free(server);
}
397
server_unref(grpc_server * server)398 static void server_unref(grpc_server* server) {
399 if (gpr_unref(&server->internal_refcount)) {
400 server_delete(server);
401 }
402 }
403
is_channel_orphaned(channel_data * chand)404 static int is_channel_orphaned(channel_data* chand) {
405 return chand->next == chand;
406 }
407
orphan_channel(channel_data * chand)408 static void orphan_channel(channel_data* chand) {
409 chand->next->prev = chand->prev;
410 chand->prev->next = chand->next;
411 chand->next = chand->prev = chand;
412 }
413
finish_destroy_channel(void * cd,grpc_error * error)414 static void finish_destroy_channel(void* cd, grpc_error* error) {
415 channel_data* chand = static_cast<channel_data*>(cd);
416 grpc_server* server = chand->server;
417 GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
418 server_unref(server);
419 }
420
/* Begin tearing down a channel: unlink it from the server's ring, possibly
   complete server shutdown, and send a final transport op whose completion
   (finish_destroy_channel) releases the refs. Idempotent: a second call on
   an already-orphaned channel is a no-op. Consumes |error| (used only for
   trace logging). */
static void destroy_channel(channel_data* chand, grpc_error* error) {
  if (is_channel_orphaned(chand)) return;
  GPR_ASSERT(chand->server != nullptr);
  orphan_channel(chand);
  /* ref held until finish_destroy_channel runs */
  server_ref(chand->server);
  maybe_finish_shutdown(chand->server);
  GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
                    finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);

  if (grpc_server_channel_trace.enabled() && error != GRPC_ERROR_NONE) {
    const char* msg = grpc_error_string(error);
    gpr_log(GPR_INFO, "Disconnected client: %s", msg);
  }
  GRPC_ERROR_UNREF(error);

  grpc_transport_op* op =
      grpc_make_transport_op(&chand->finish_destroy_channel_closure);
  /* stop accepting new streams on this channel */
  op->set_accept_stream = true;
  grpc_channel_next_op(grpc_channel_stack_element(
                           grpc_channel_get_channel_stack(chand->channel), 0),
                       op);
}
443
done_request_event(void * req,grpc_cq_completion * c)444 static void done_request_event(void* req, grpc_cq_completion* c) {
445 gpr_free(req);
446 }
447
/* Hand an incoming call to a waiting application request |rc|: bind the
   call to the request's cq, fill in the request's out-params (details or
   registered-method fields), and post the completion on the server cq at
   |cq_idx|. Ownership of |rc| passes to the cq (freed by
   done_request_event). */
static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
                         requested_call* rc) {
  grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
  grpc_call* call = calld->call;
  *rc->call = call;
  calld->cq_new = server->cqs[cq_idx];
  /* transfer the received initial metadata to the application's array */
  GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
  switch (rc->type) {
    case BATCH_CALL:
      GPR_ASSERT(calld->host_set);
      GPR_ASSERT(calld->path_set);
      /* details take their own refs on the host/path slices */
      rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
      rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
      rc->data.batch.details->deadline =
          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
      rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
      break;
    case REGISTERED_CALL:
      *rc->data.registered.deadline =
          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
      if (rc->data.registered.optional_payload) {
        /* transfer payload ownership to the application */
        *rc->data.registered.optional_payload = calld->payload;
        calld->payload = nullptr;
      }
      break;
    default:
      GPR_UNREACHABLE_CODE(return );
  }

  grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
                 rc, &rc->completion);
}
480
/* Try to match a newly-arrived RPC against a queued application request.
   Fast path: lock-free try-pop on each cq's queue (starting at the channel's
   preferred cq). Slow path: retake the scan under mu_call so that a request
   enqueued concurrently cannot be missed, and if still nothing, park the
   call on the matcher's pending list. On error/shutdown the call is
   zombied. */
static void publish_new_rpc(void* arg, grpc_error* error) {
  grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(call_elem->call_data);
  channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
  request_matcher* rm = calld->matcher;
  grpc_server* server = rm->server;

  if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {
    gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
    GRPC_CLOSURE_INIT(
        &calld->kill_zombie_closure, kill_zombie,
        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
        grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error));
    return;
  }

  /* fast path: lock-free scan over all cq queues */
  for (size_t i = 0; i < server->cq_count; i++) {
    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
    requested_call* rc = reinterpret_cast<requested_call*>(
        gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]));
    if (rc == nullptr) {
      continue;
    } else {
      GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
      publish_call(server, calld, cq_idx, rc);
      return; /* early out */
    }
  }

  /* no cq to take the request found: queue it on the slow list */
  GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
  gpr_mu_lock(&server->mu_call);

  // We need to ensure that all the queues are empty. We do this under
  // the server mu_call lock to ensure that if something is added to
  // an empty request queue, it will block until the call is actually
  // added to the pending list.
  for (size_t i = 0; i < server->cq_count; i++) {
    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;
    requested_call* rc = reinterpret_cast<requested_call*>(
        gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
    if (rc == nullptr) {
      continue;
    } else {
      gpr_mu_unlock(&server->mu_call);
      GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count);
      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);
      publish_call(server, calld, cq_idx, rc);
      return; /* early out */
    }
  }

  /* park the call on the pending list until a request arrives */
  gpr_atm_no_barrier_store(&calld->state, PENDING);
  if (rm->pending_head == nullptr) {
    rm->pending_tail = rm->pending_head = calld;
  } else {
    rm->pending_tail->pending_next = calld;
    rm->pending_tail = calld;
  }
  calld->pending_next = nullptr;
  gpr_mu_unlock(&server->mu_call);
}
545
/* Second stage of starting an RPC once a matcher has been selected: zombify
   on shutdown, otherwise either publish immediately or first read the
   initial message (for registered methods that requested payload
   pre-reading) and publish from the completion closure. */
static void finish_start_new_rpc(
    grpc_server* server, grpc_call_element* elem, request_matcher* rm,
    grpc_server_register_method_payload_handling payload_handling) {
  call_data* calld = static_cast<call_data*>(elem->call_data);

  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    gpr_atm_no_barrier_store(&calld->state, ZOMBIED);
    GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
                      grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
    return;
  }

  calld->matcher = rm;

  switch (payload_handling) {
    case GRPC_SRM_PAYLOAD_NONE:
      publish_new_rpc(elem, GRPC_ERROR_NONE);
      break;
    case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
      /* read the first message before publishing; publish_new_rpc runs when
         the recv completes */
      grpc_op op;
      memset(&op, 0, sizeof(op));
      op.op = GRPC_OP_RECV_MESSAGE;
      op.data.recv_message.recv_message = &calld->payload;
      GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
                        grpc_schedule_on_exec_ctx);
      grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);
      break;
    }
  }
}
577
/* Route a new RPC to a matcher. Probes the channel's open-addressed hash
   table twice: first for an exact (host, method) registration, then for a
   host-wildcard registration; falls back to the unregistered matcher.
   Registrations flagged IDEMPOTENT_REQUEST only match calls that carry the
   idempotent flag. */
static void start_new_rpc(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_server* server = chand->server;
  uint32_t i;
  uint32_t hash;
  channel_registered_method* rm;

  if (chand->registered_methods && calld->path_set && calld->host_set) {
    /* TODO(ctiller): unify these two searches */
    /* check for an exact match with host */
    hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash(calld->host),
                              grpc_slice_hash(calld->path));
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &chand->registered_methods[(hash + i) %
                                      chand->registered_method_slots];
      /* NOTE(review): rm points into an array and cannot be null; this check
         looks vestigial — confirm before removing */
      if (!rm) break;
      if (!rm->has_host) continue;
      if (!grpc_slice_eq(rm->host, calld->host)) continue;
      if (!grpc_slice_eq(rm->method, calld->path)) continue;
      if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
          0 == (calld->recv_initial_metadata_flags &
                GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
        continue;
      }
      finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
                           rm->server_registered_method->payload_handling);
      return;
    }
    /* check for a wildcard method definition (no host set) */
    hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash(calld->path));
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &chand->registered_methods[(hash + i) %
                                      chand->registered_method_slots];
      if (!rm) break;
      if (rm->has_host) continue;
      if (!grpc_slice_eq(rm->method, calld->path)) continue;
      if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
          0 == (calld->recv_initial_metadata_flags &
                GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
        continue;
      }
      finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,
                           rm->server_registered_method->payload_handling);
      return;
    }
  }
  finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,
                       GRPC_SRM_PAYLOAD_NONE);
}
628
num_listeners(grpc_server * server)629 static int num_listeners(grpc_server* server) {
630 listener* l;
631 int n = 0;
632 for (l = server->listeners; l; l = l->next) {
633 n++;
634 }
635 return n;
636 }
637
done_shutdown_event(void * server,grpc_cq_completion * completion)638 static void done_shutdown_event(void* server, grpc_cq_completion* completion) {
639 server_unref(static_cast<grpc_server*>(server));
640 }
641
num_channels(grpc_server * server)642 static int num_channels(grpc_server* server) {
643 channel_data* chand;
644 int n = 0;
645 for (chand = server->root_channel_data.next;
646 chand != &server->root_channel_data; chand = chand->next) {
647 n++;
648 }
649 return n;
650 }
651
/* Fail all queued application requests and zombify all parked calls, for the
   unregistered matcher and every registered method. Consumes |error|.
   No-op before the server has started (matchers not yet initialized). */
static void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
  if (server->started) {
    request_matcher_kill_requests(server, &server->unregistered_request_matcher,
                                  GRPC_ERROR_REF(error));
    request_matcher_zombify_all_pending_calls(
        &server->unregistered_request_matcher);
    for (registered_method* rm = server->registered_methods; rm;
         rm = rm->next) {
      request_matcher_kill_requests(server, &rm->matcher,
                                    GRPC_ERROR_REF(error));
      request_matcher_zombify_all_pending_calls(&rm->matcher);
    }
  }
  GRPC_ERROR_UNREF(error);
}
667
/* If shutdown has been requested and all channels and listeners are gone,
   publish the shutdown completion on every registered shutdown tag (once).
   Otherwise log progress at most once per second. Caller must hold
   mu_global and must NOT hold mu_call (see forward declaration comment). */
static void maybe_finish_shutdown(grpc_server* server) {
  size_t i;
  if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
    return;
  }

  kill_pending_work_locked(
      server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));

  if (server->root_channel_data.next != &server->root_channel_data ||
      server->listeners_destroyed < num_listeners(server)) {
    /* still waiting; rate-limit the progress message to one per second */
    if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
                                  server->last_shutdown_message_time),
                     gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
      server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
      gpr_log(GPR_DEBUG,
              "Waiting for %d channels and %d/%d listeners to be destroyed"
              " before shutting down server",
              num_channels(server),
              num_listeners(server) - server->listeners_destroyed,
              num_listeners(server));
    }
    return;
  }
  server->shutdown_published = 1;
  for (i = 0; i < server->num_shutdown_tags; i++) {
    /* each completion holds a server ref, released in done_shutdown_event */
    server_ref(server);
    grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
                   GRPC_ERROR_NONE, done_shutdown_event, server,
                   &server->shutdown_tags[i].completion);
  }
}
700
/* Interposed recv_initial_metadata callback: extract :path and :authority
   into call_data (taking slice refs), strip them from the batch, capture the
   deadline, and synthesize an error if either header is missing. Finally
   chains to the original callback with the (possibly augmented) error. */
static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_millis op_deadline;

  if (error == GRPC_ERROR_NONE) {
    GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
    GPR_ASSERT(calld->recv_initial_metadata->idx.named.authority != nullptr);
    calld->path = grpc_slice_ref_internal(
        GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
    calld->host = grpc_slice_ref_internal(
        GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
    calld->path_set = true;
    calld->host_set = true;
    grpc_metadata_batch_remove(calld->recv_initial_metadata,
                               calld->recv_initial_metadata->idx.named.path);
    grpc_metadata_batch_remove(
        calld->recv_initial_metadata,
        calld->recv_initial_metadata->idx.named.authority);
  } else {
    /* take a ref so the RUN below owns one regardless of branch */
    GRPC_ERROR_REF(error);
  }
  op_deadline = calld->recv_initial_metadata->deadline;
  if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
    calld->deadline = op_deadline;
  }
  if (calld->host_set && calld->path_set) {
    /* do nothing */
  } else {
    /* wrap the original error (referenced as a child) and drop our ref on it */
    grpc_error* src_error = error;
    error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
        "Missing :authority or :path", &error, 1);
    GRPC_ERROR_UNREF(src_error);
  }

  GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error);
}
738
server_recv_trailing_metadata_ready(void * user_data,grpc_error * err)739 static void server_recv_trailing_metadata_ready(void* user_data,
740 grpc_error* err) {
741 grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
742 call_data* calld = static_cast<call_data*>(elem->call_data);
743 err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
744 GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
745 }
746
/* Rewrite a transport stream op so the server filter can observe metadata:
   swaps in our recv_initial/trailing metadata callbacks, saving the
   originals so they can be chained to afterwards. */
static void server_mutate_op(grpc_call_element* elem,
                             grpc_transport_stream_op_batch* op) {
  call_data* calld = static_cast<call_data*>(elem->call_data);

  if (op->recv_initial_metadata) {
    /* no other filter below us should have claimed recv_flags */
    GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == nullptr);
    calld->recv_initial_metadata =
        op->payload->recv_initial_metadata.recv_initial_metadata;
    calld->on_done_recv_initial_metadata =
        op->payload->recv_initial_metadata.recv_initial_metadata_ready;
    op->payload->recv_initial_metadata.recv_initial_metadata_ready =
        &calld->server_on_recv_initial_metadata;
    op->payload->recv_initial_metadata.recv_flags =
        &calld->recv_initial_metadata_flags;
  }
  if (op->recv_trailing_metadata) {
    calld->original_recv_trailing_metadata_ready =
        op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &calld->recv_trailing_metadata_ready;
  }
}
769
server_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)770 static void server_start_transport_stream_op_batch(
771 grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
772 server_mutate_op(elem, op);
773 grpc_call_next_op(elem, op);
774 }
775
got_initial_metadata(void * ptr,grpc_error * error)776 static void got_initial_metadata(void* ptr, grpc_error* error) {
777 grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
778 call_data* calld = static_cast<call_data*>(elem->call_data);
779 if (error == GRPC_ERROR_NONE) {
780 start_new_rpc(elem);
781 } else {
782 if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {
783 GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
784 grpc_schedule_on_exec_ctx);
785 GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
786 } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {
787 /* zombied call will be destroyed when it's removed from the pending
788 queue... later */
789 }
790 }
791 }
792
/* Transport callback for a new incoming stream: create a server call and
   kick off a batch to read its initial metadata; got_initial_metadata
   continues from there (and also handles call-creation failure). */
static void accept_stream(void* cd, grpc_transport* transport,
                          const void* transport_server_data) {
  channel_data* chand = static_cast<channel_data*>(cd);
  /* create a call */
  grpc_call_create_args args;
  memset(&args, 0, sizeof(args));
  args.channel = chand->channel;
  args.server_transport_data = transport_server_data;
  args.send_deadline = GRPC_MILLIS_INF_FUTURE;
  args.server = chand->server;
  grpc_call* call;
  grpc_error* error = grpc_call_create(&args, &call);
  grpc_call_element* elem =
      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
  if (error != GRPC_ERROR_NONE) {
    /* route the failure through the normal completion path to zombify */
    got_initial_metadata(elem, error);
    GRPC_ERROR_UNREF(error);
    return;
  }
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_op op;
  memset(&op, 0, sizeof(op));
  op.op = GRPC_OP_RECV_INITIAL_METADATA;
  op.data.recv_initial_metadata.recv_initial_metadata =
      &calld->initial_metadata;
  GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
                    grpc_schedule_on_exec_ctx);
  grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
}
822
/* Connectivity watcher: while the channel is not shut down, re-arm the
   watch; on SHUTDOWN, destroy the channel (under mu_global) and drop the
   "connectivity" ref that kept this watcher alive. */
static void channel_connectivity_changed(void* cd, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(cd);
  grpc_server* server = chand->server;
  if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {
    grpc_transport_op* op = grpc_make_transport_op(nullptr);
    op->on_connectivity_state_change = &chand->channel_connectivity_changed;
    op->connectivity_state = &chand->connectivity_state;
    grpc_channel_next_op(grpc_channel_stack_element(
                             grpc_channel_get_channel_stack(chand->channel), 0),
                         op);
  } else {
    gpr_mu_lock(&server->mu_global);
    destroy_channel(chand, GRPC_ERROR_REF(error));
    gpr_mu_unlock(&server->mu_global);
    GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");
  }
}
840
/* Call-element constructor: zero the call data, set an infinite deadline
   (overridden later from received metadata), wire up the interposed
   metadata closures, and hold a server ref for the call's lifetime
   (released in destroy_call_elem). */
static grpc_error* init_call_elem(grpc_call_element* elem,
                                  const grpc_call_element_args* args) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  memset(calld, 0, sizeof(call_data));
  calld->deadline = GRPC_MILLIS_INF_FUTURE;
  calld->call = grpc_call_from_top_element(elem);

  GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
                    server_on_recv_initial_metadata, elem,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                    server_recv_trailing_metadata_ready, elem,
                    grpc_schedule_on_exec_ctx);
  server_ref(chand->server);
  return GRPC_ERROR_NONE;
}
858
/* Call-element destructor: release slices, metadata, payload and error
   owned by the call, then drop the server ref taken in init_call_elem.
   A PENDING call must never be destroyed (it is still on a pending list). */
static void destroy_call_elem(grpc_call_element* elem,
                              const grpc_call_final_info* final_info,
                              grpc_closure* ignored) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);

  GPR_ASSERT(calld->state != PENDING);
  GRPC_ERROR_UNREF(calld->error);
  if (calld->host_set) {
    grpc_slice_unref_internal(calld->host);
  }
  if (calld->path_set) {
    grpc_slice_unref_internal(calld->path);
  }
  grpc_metadata_array_destroy(&calld->initial_metadata);
  /* payload may be nullptr if never read or already handed to the app */
  grpc_byte_buffer_destroy(calld->payload);

  server_unref(chand->server);
}
878
init_channel_elem(grpc_channel_element * elem,grpc_channel_element_args * args)879 static grpc_error* init_channel_elem(grpc_channel_element* elem,
880 grpc_channel_element_args* args) {
881 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
882 GPR_ASSERT(args->is_first);
883 GPR_ASSERT(!args->is_last);
884 chand->server = nullptr;
885 chand->channel = nullptr;
886 chand->next = chand->prev = chand;
887 chand->registered_methods = nullptr;
888 chand->connectivity_state = GRPC_CHANNEL_IDLE;
889 GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,
890 channel_connectivity_changed, chand,
891 grpc_schedule_on_exec_ctx);
892 return GRPC_ERROR_NONE;
893 }
894
destroy_channel_elem(grpc_channel_element * elem)895 static void destroy_channel_elem(grpc_channel_element* elem) {
896 size_t i;
897 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
898 if (chand->registered_methods) {
899 for (i = 0; i < chand->registered_method_slots; i++) {
900 grpc_slice_unref_internal(chand->registered_methods[i].method);
901 if (chand->registered_methods[i].has_host) {
902 grpc_slice_unref_internal(chand->registered_methods[i].host);
903 }
904 }
905 gpr_free(chand->registered_methods);
906 }
907 if (chand->server) {
908 gpr_mu_lock(&chand->server->mu_global);
909 chand->next->prev = chand->prev;
910 chand->prev->next = chand->next;
911 chand->next = chand->prev = chand;
912 maybe_finish_shutdown(chand->server);
913 gpr_mu_unlock(&chand->server->mu_global);
914 server_unref(chand->server);
915 }
916 }
917
/* Channel filter placed at the top of every server channel stack; it routes
   incoming streams into the server's request matchers. Field comments name
   the grpc_channel_filter members being initialized positionally. */
const grpc_channel_filter grpc_server_top_filter = {
    server_start_transport_stream_op_batch, /* start_transport_stream_op_batch */
    grpc_channel_next_op,                   /* start_transport_op */
    sizeof(call_data),                      /* sizeof_call_data */
    init_call_elem,                         /* init_call_elem */
    grpc_call_stack_ignore_set_pollset_or_pollset_set, /* set_pollset... */
    destroy_call_elem,                      /* destroy_call_elem */
    sizeof(channel_data),                   /* sizeof_channel_data */
    init_channel_elem,                      /* init_channel_elem */
    destroy_channel_elem,                   /* destroy_channel_elem */
    grpc_channel_next_get_info,             /* get_channel_info */
    "server",                               /* name */
};
931
register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)932 static void register_completion_queue(grpc_server* server,
933 grpc_completion_queue* cq,
934 void* reserved) {
935 size_t i, n;
936 GPR_ASSERT(!reserved);
937 for (i = 0; i < server->cq_count; i++) {
938 if (server->cqs[i] == cq) return;
939 }
940
941 GRPC_CQ_INTERNAL_REF(cq, "server");
942 n = server->cq_count++;
943 server->cqs = static_cast<grpc_completion_queue**>(gpr_realloc(
944 server->cqs, server->cq_count * sizeof(grpc_completion_queue*)));
945 server->cqs[n] = cq;
946 }
947
grpc_server_register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)948 void grpc_server_register_completion_queue(grpc_server* server,
949 grpc_completion_queue* cq,
950 void* reserved) {
951 GRPC_API_TRACE(
952 "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
953 (server, cq, reserved));
954
955 if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) {
956 gpr_log(GPR_INFO,
957 "Completion queue which is not of type GRPC_CQ_NEXT is being "
958 "registered as a server-completion-queue");
959 /* Ideally we should log an error and abort but ruby-wrapped-language API
960 calls grpc_completion_queue_pluck() on server completion queues */
961 }
962
963 register_completion_queue(server, cq, reserved);
964 }
965
/* Public API: allocate and initialize a new server. The returned server holds
   one internal ref which is dropped by grpc_server_destroy. Optionally
   enables channelz tracking when GRPC_ARG_ENABLE_CHANNELZ is set in args. */
grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));

  grpc_server* server =
      static_cast<grpc_server*>(gpr_zalloc(sizeof(grpc_server)));

  gpr_mu_init(&server->mu_global);
  gpr_mu_init(&server->mu_call);
  gpr_cv_init(&server->starting_cv);

  /* decremented by grpc_server_destroy */
  gpr_ref_init(&server->internal_refcount, 1);
  /* Empty channel list: the sentinel node links to itself. */
  server->root_channel_data.next = server->root_channel_data.prev =
      &server->root_channel_data;

  server->channel_args = grpc_channel_args_copy(args);

  const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);
  if (grpc_channel_arg_get_bool(arg, false)) {
    /* Channelz requested: create the server node, bounding its trace size by
       the configured max-events-per-node (default 0). */
    arg = grpc_channel_args_find(args,
                                 GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
    size_t trace_events_per_node =
        grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
    server->channelz_server =
        grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
            trace_events_per_node);
    server->channelz_server->AddTraceEvent(
        grpc_core::channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string("Server created"));
  }

  return server;
}
1000
/* NULL-tolerant string equality: two NULLs compare equal, and NULL never
   equals a non-NULL string. Returns 1 on equality, 0 otherwise. */
static int streq(const char* a, const char* b) {
  if (a == NULL || b == NULL) {
    return a == b;
  }
  return strcmp(a, b) == 0;
}
1007
grpc_server_register_method(grpc_server * server,const char * method,const char * host,grpc_server_register_method_payload_handling payload_handling,uint32_t flags)1008 void* grpc_server_register_method(
1009 grpc_server* server, const char* method, const char* host,
1010 grpc_server_register_method_payload_handling payload_handling,
1011 uint32_t flags) {
1012 registered_method* m;
1013 GRPC_API_TRACE(
1014 "grpc_server_register_method(server=%p, method=%s, host=%s, "
1015 "flags=0x%08x)",
1016 4, (server, method, host, flags));
1017 if (!method) {
1018 gpr_log(GPR_ERROR,
1019 "grpc_server_register_method method string cannot be NULL");
1020 return nullptr;
1021 }
1022 for (m = server->registered_methods; m; m = m->next) {
1023 if (streq(m->method, method) && streq(m->host, host)) {
1024 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
1025 host ? host : "*");
1026 return nullptr;
1027 }
1028 }
1029 if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {
1030 gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
1031 flags);
1032 return nullptr;
1033 }
1034 m = static_cast<registered_method*>(gpr_zalloc(sizeof(registered_method)));
1035 m->method = gpr_strdup(method);
1036 m->host = gpr_strdup(host);
1037 m->next = server->registered_methods;
1038 m->payload_handling = payload_handling;
1039 m->flags = flags;
1040 server->registered_methods = m;
1041 return m;
1042 }
1043
start_listeners(void * s,grpc_error * error)1044 static void start_listeners(void* s, grpc_error* error) {
1045 grpc_server* server = static_cast<grpc_server*>(s);
1046 for (listener* l = server->listeners; l; l = l->next) {
1047 l->start(server, l->arg, server->pollsets, server->pollset_count);
1048 }
1049
1050 gpr_mu_lock(&server->mu_global);
1051 server->starting = false;
1052 gpr_cv_signal(&server->starting_cv);
1053 gpr_mu_unlock(&server->mu_global);
1054
1055 server_unref(server);
1056 }
1057
grpc_server_start(grpc_server * server)1058 void grpc_server_start(grpc_server* server) {
1059 size_t i;
1060 grpc_core::ExecCtx exec_ctx;
1061
1062 GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
1063
1064 server->started = true;
1065 server->pollset_count = 0;
1066 server->pollsets = static_cast<grpc_pollset**>(
1067 gpr_malloc(sizeof(grpc_pollset*) * server->cq_count));
1068 for (i = 0; i < server->cq_count; i++) {
1069 if (grpc_cq_can_listen(server->cqs[i])) {
1070 server->pollsets[server->pollset_count++] =
1071 grpc_cq_pollset(server->cqs[i]);
1072 }
1073 }
1074 request_matcher_init(&server->unregistered_request_matcher, server);
1075 for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {
1076 request_matcher_init(&rm->matcher, server);
1077 }
1078
1079 server_ref(server);
1080 server->starting = true;
1081 GRPC_CLOSURE_SCHED(
1082 GRPC_CLOSURE_CREATE(start_listeners, server,
1083 grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),
1084 GRPC_ERROR_NONE);
1085 }
1086
grpc_server_get_pollsets(grpc_server * server,grpc_pollset *** pollsets,size_t * pollset_count)1087 void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,
1088 size_t* pollset_count) {
1089 *pollset_count = server->pollset_count;
1090 *pollsets = server->pollsets;
1091 }
1092
/* Attach a newly accepted transport to the server: wraps it in a server
   channel, picks the completion queue new calls will publish to, builds the
   per-channel registered-method lookup table, splices the channel into the
   server's channel list, and issues a transport op to accept streams and
   watch connectivity. */
void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,
                                 grpc_pollset* accepting_pollset,
                                 const grpc_channel_args* args) {
  size_t num_registered_methods;
  size_t alloc;
  registered_method* rm;
  channel_registered_method* crm;
  grpc_channel* channel;
  channel_data* chand;
  uint32_t hash;
  size_t slots;
  uint32_t probes;
  uint32_t max_probes = 0;
  grpc_transport_op* op = nullptr;

  channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport);
  chand = static_cast<channel_data*>(
      grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
          ->channel_data);
  chand->server = s;
  server_ref(s); /* released in destroy_channel_elem */
  chand->channel = channel;

  /* Prefer the CQ whose pollset accepted the connection. */
  size_t cq_idx;
  for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {
    if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
  }
  if (cq_idx == s->cq_count) {
    /* completion queue not found: pick a random one to publish new calls to */
    cq_idx = static_cast<size_t>(rand()) % s->cq_count;
  }
  chand->cq_idx = cq_idx;

  num_registered_methods = 0;
  for (rm = s->registered_methods; rm; rm = rm->next) {
    num_registered_methods++;
  }
  /* build a lookup table phrased in terms of mdstr's in this channels context
     to quickly find registered methods */
  if (num_registered_methods > 0) {
    /* Open-addressed hash table with linear probing, sized at 2x the entry
       count so probe sequences stay short; max_probes bounds later lookups. */
    slots = 2 * num_registered_methods;
    alloc = sizeof(channel_registered_method) * slots;
    chand->registered_methods =
        static_cast<channel_registered_method*>(gpr_zalloc(alloc));
    for (rm = s->registered_methods; rm; rm = rm->next) {
      grpc_slice host;
      bool has_host;
      grpc_slice method;
      if (rm->host != nullptr) {
        host = grpc_slice_intern(grpc_slice_from_static_string(rm->host));
        has_host = true;
      } else {
        has_host = false;
      }
      method = grpc_slice_intern(grpc_slice_from_static_string(rm->method));
      hash = GRPC_MDSTR_KV_HASH(has_host ? grpc_slice_hash(host) : 0,
                                grpc_slice_hash(method));
      /* Linear-probe to the first empty slot. */
      for (probes = 0; chand->registered_methods[(hash + probes) % slots]
                           .server_registered_method != nullptr;
           probes++)
        ;
      if (probes > max_probes) max_probes = probes;
      crm = &chand->registered_methods[(hash + probes) % slots];
      crm->server_registered_method = rm;
      crm->flags = rm->flags;
      crm->has_host = has_host;
      if (has_host) {
        crm->host = host;
      }
      crm->method = method;
    }
    GPR_ASSERT(slots <= UINT32_MAX);
    chand->registered_method_slots = static_cast<uint32_t>(slots);
    chand->registered_method_max_probes = max_probes;
  }

  /* Splice the channel into the server's doubly-linked channel list. */
  gpr_mu_lock(&s->mu_global);
  chand->next = &s->root_channel_data;
  chand->prev = chand->next->prev;
  chand->next->prev = chand->prev->next = chand;
  gpr_mu_unlock(&s->mu_global);

  /* Unreffed when the connectivity watcher sees SHUTDOWN. */
  GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
  op = grpc_make_transport_op(nullptr);
  op->set_accept_stream = true;
  op->set_accept_stream_fn = accept_stream;
  op->set_accept_stream_user_data = chand;
  op->on_connectivity_state_change = &chand->channel_connectivity_changed;
  op->connectivity_state = &chand->connectivity_state;
  if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {
    /* Server already shutting down: disconnect the new transport at once. */
    op->disconnect_with_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
  }
  grpc_transport_perform_op(transport, op);
}
1188
/* CQ done-callback for a shutdown notification published after shutdown had
   already completed: frees the heap-allocated completion record. */
void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {
  (void)done_arg; /* unused */
  gpr_free(storage);
}
1193
listener_destroy_done(void * s,grpc_error * error)1194 static void listener_destroy_done(void* s, grpc_error* error) {
1195 grpc_server* server = static_cast<grpc_server*>(s);
1196 gpr_mu_lock(&server->mu_global);
1197 server->listeners_destroyed++;
1198 maybe_finish_shutdown(server);
1199 gpr_mu_unlock(&server->mu_global);
1200 }
1201
/*
  - Kills all pending requests-for-incoming-RPC-calls (i.e. the requests made
    via grpc_server_request_call and grpc_server_request_registered_call will
    now be cancelled). See 'kill_pending_work_locked()'

  - Shuts down the listeners (i.e. the server will no longer listen on the
    port for new incoming channels).

  - Iterates through all channels on the server and sends shutdown msg (see
    'channel_broadcaster_shutdown()' for details) to the clients via the
    transport layer. The transport layer then guarantees the following:
    -- Sends shutdown to the client (for eg: HTTP2 transport sends GOAWAY)
    -- If the server has outstanding calls that are in progress, the
       connection is NOT closed until the server is done with all those calls
    -- Once there are no more calls in progress, the channel is closed
 */
/* Public API: begin server shutdown and notify (cq, tag) when it completes.
   Safe to call multiple times; each call adds another notification tag. */
void grpc_server_shutdown_and_notify(grpc_server* server,
                                     grpc_completion_queue* cq, void* tag) {
  listener* l;
  shutdown_tag* sdt;
  channel_broadcaster broadcaster;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
                 (server, cq, tag));

  /* wait for startup to be finished: locks mu_global */
  gpr_mu_lock(&server->mu_global);
  while (server->starting) {
    gpr_cv_wait(&server->starting_cv, &server->mu_global,
                gpr_inf_future(GPR_CLOCK_MONOTONIC));
  }

  /* stay locked, and gather up some stuff to do */
  GPR_ASSERT(grpc_cq_begin_op(cq, tag));
  if (server->shutdown_published) {
    /* Shutdown already finished earlier: complete this tag immediately. */
    grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, nullptr,
                   static_cast<grpc_cq_completion*>(
                       gpr_malloc(sizeof(grpc_cq_completion))));
    gpr_mu_unlock(&server->mu_global);
    return;
  }
  /* Record the (cq, tag) pair; it is signalled by maybe_finish_shutdown. */
  server->shutdown_tags = static_cast<shutdown_tag*>(
      gpr_realloc(server->shutdown_tags,
                  sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)));
  sdt = &server->shutdown_tags[server->num_shutdown_tags++];
  sdt->tag = tag;
  sdt->cq = cq;
  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    /* Shutdown already in progress: the tag recorded above will be notified
       when it finishes. */
    gpr_mu_unlock(&server->mu_global);
    return;
  }

  server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);

  /* Snapshot the channel list so shutdown can be broadcast outside the lock. */
  channel_broadcaster_init(server, &broadcaster);

  gpr_atm_rel_store(&server->shutdown_flag, 1);

  /* collect all unregistered then registered calls */
  gpr_mu_lock(&server->mu_call);
  kill_pending_work_locked(
      server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
  gpr_mu_unlock(&server->mu_call);

  maybe_finish_shutdown(server);
  gpr_mu_unlock(&server->mu_global);

  /* Shutdown listeners */
  for (l = server->listeners; l; l = l->next) {
    GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,
                      grpc_schedule_on_exec_ctx);
    l->destroy(server, l->arg, &l->destroy_done);
  }

  channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,
                               GRPC_ERROR_NONE);
}
1280
grpc_server_cancel_all_calls(grpc_server * server)1281 void grpc_server_cancel_all_calls(grpc_server* server) {
1282 channel_broadcaster broadcaster;
1283 grpc_core::ExecCtx exec_ctx;
1284
1285 GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
1286
1287 gpr_mu_lock(&server->mu_global);
1288 channel_broadcaster_init(server, &broadcaster);
1289 gpr_mu_unlock(&server->mu_global);
1290
1291 channel_broadcaster_shutdown(
1292 &broadcaster, false /* send_goaway */,
1293 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
1294 }
1295
grpc_server_destroy(grpc_server * server)1296 void grpc_server_destroy(grpc_server* server) {
1297 listener* l;
1298 grpc_core::ExecCtx exec_ctx;
1299
1300 GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
1301
1302 gpr_mu_lock(&server->mu_global);
1303 GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
1304 GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
1305
1306 while (server->listeners) {
1307 l = server->listeners;
1308 server->listeners = l->next;
1309 gpr_free(l);
1310 }
1311
1312 gpr_mu_unlock(&server->mu_global);
1313
1314 server_unref(server);
1315 }
1316
/* Register a listener with the server. 'start' is invoked from
   start_listeners once the server starts; 'destroy' is invoked during
   shutdown and must eventually schedule the provided on_done closure. The
   listener is prepended to the server's singly-linked list. */
void grpc_server_add_listener(grpc_server* server, void* arg,
                              void (*start)(grpc_server* server, void* arg,
                                            grpc_pollset** pollsets,
                                            size_t pollset_count),
                              void (*destroy)(grpc_server* server, void* arg,
                                              grpc_closure* on_done)) {
  listener* l = static_cast<listener*>(gpr_malloc(sizeof(listener)));
  l->arg = arg;
  l->start = start;
  l->destroy = destroy;
  l->next = server->listeners;
  server->listeners = l;
}
1330
/* Queue a requested call on the cq_idx sub-queue of the appropriate request
   matcher, or fail it immediately if the server is shutting down. If this
   request is the first queued one, also drains the matcher's pending calls,
   pairing each with a queued request (zombied calls are killed instead of
   published). Always returns GRPC_CALL_OK. */
static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
                                          requested_call* rc) {
  call_data* calld = nullptr;
  request_matcher* rm = nullptr;
  if (gpr_atm_acq_load(&server->shutdown_flag)) {
    fail_call(server, cq_idx, rc,
              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
    return GRPC_CALL_OK;
  }
  switch (rc->type) {
    case BATCH_CALL:
      rm = &server->unregistered_request_matcher;
      break;
    case REGISTERED_CALL:
      rm = &rc->data.registered.method->matcher;
      break;
  }
  if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {
    /* this was the first queued request: we need to lock and start
       matching calls */
    gpr_mu_lock(&server->mu_call);
    while ((calld = rm->pending_head) != nullptr) {
      rc = reinterpret_cast<requested_call*>(
          gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));
      if (rc == nullptr) break; /* no request left to pair with */
      rm->pending_head = calld->pending_next;
      /* Drop mu_call while publishing/killing; re-taken before next pop. */
      gpr_mu_unlock(&server->mu_call);
      if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {
        // Zombied Call
        GRPC_CLOSURE_INIT(
            &calld->kill_zombie_closure, kill_zombie,
            grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
            grpc_schedule_on_exec_ctx);
        GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);
      } else {
        publish_call(server, calld, cq_idx, rc);
      }
      gpr_mu_lock(&server->mu_call);
    }
    gpr_mu_unlock(&server->mu_call);
  }
  return GRPC_CALL_OK;
}
1374
grpc_server_request_call(grpc_server * server,grpc_call ** call,grpc_call_details * details,grpc_metadata_array * initial_metadata,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1375 grpc_call_error grpc_server_request_call(
1376 grpc_server* server, grpc_call** call, grpc_call_details* details,
1377 grpc_metadata_array* initial_metadata,
1378 grpc_completion_queue* cq_bound_to_call,
1379 grpc_completion_queue* cq_for_notification, void* tag) {
1380 grpc_call_error error;
1381 grpc_core::ExecCtx exec_ctx;
1382 requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
1383 GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1384 GRPC_API_TRACE(
1385 "grpc_server_request_call("
1386 "server=%p, call=%p, details=%p, initial_metadata=%p, "
1387 "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
1388 7,
1389 (server, call, details, initial_metadata, cq_bound_to_call,
1390 cq_for_notification, tag));
1391 size_t cq_idx;
1392 for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
1393 if (server->cqs[cq_idx] == cq_for_notification) {
1394 break;
1395 }
1396 }
1397 if (cq_idx == server->cq_count) {
1398 gpr_free(rc);
1399 error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1400 goto done;
1401 }
1402 if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
1403 gpr_free(rc);
1404 error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
1405 goto done;
1406 }
1407 details->reserved = nullptr;
1408 rc->cq_idx = cq_idx;
1409 rc->type = BATCH_CALL;
1410 rc->server = server;
1411 rc->tag = tag;
1412 rc->cq_bound_to_call = cq_bound_to_call;
1413 rc->call = call;
1414 rc->data.batch.details = details;
1415 rc->initial_metadata = initial_metadata;
1416 error = queue_call_request(server, cq_idx, rc);
1417 done:
1418
1419 return error;
1420 }
1421
grpc_server_request_registered_call(grpc_server * server,void * rmp,grpc_call ** call,gpr_timespec * deadline,grpc_metadata_array * initial_metadata,grpc_byte_buffer ** optional_payload,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1422 grpc_call_error grpc_server_request_registered_call(
1423 grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,
1424 grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
1425 grpc_completion_queue* cq_bound_to_call,
1426 grpc_completion_queue* cq_for_notification, void* tag) {
1427 grpc_call_error error;
1428 grpc_core::ExecCtx exec_ctx;
1429 requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));
1430 registered_method* rm = static_cast<registered_method*>(rmp);
1431 GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1432 GRPC_API_TRACE(
1433 "grpc_server_request_registered_call("
1434 "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
1435 "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
1436 "tag=%p)",
1437 9,
1438 (server, rmp, call, deadline, initial_metadata, optional_payload,
1439 cq_bound_to_call, cq_for_notification, tag));
1440
1441 size_t cq_idx;
1442 for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {
1443 if (server->cqs[cq_idx] == cq_for_notification) {
1444 break;
1445 }
1446 }
1447 if (cq_idx == server->cq_count) {
1448 gpr_free(rc);
1449 error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1450 goto done;
1451 }
1452 if ((optional_payload == nullptr) !=
1453 (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {
1454 gpr_free(rc);
1455 error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
1456 goto done;
1457 }
1458 if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
1459 gpr_free(rc);
1460 error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
1461 goto done;
1462 }
1463 rc->cq_idx = cq_idx;
1464 rc->type = REGISTERED_CALL;
1465 rc->server = server;
1466 rc->tag = tag;
1467 rc->cq_bound_to_call = cq_bound_to_call;
1468 rc->call = call;
1469 rc->data.registered.method = rm;
1470 rc->data.registered.deadline = deadline;
1471 rc->initial_metadata = initial_metadata;
1472 rc->data.registered.optional_payload = optional_payload;
1473 error = queue_call_request(server, cq_idx, rc);
1474 done:
1475
1476 return error;
1477 }
1478
fail_call(grpc_server * server,size_t cq_idx,requested_call * rc,grpc_error * error)1479 static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
1480 grpc_error* error) {
1481 *rc->call = nullptr;
1482 rc->initial_metadata->count = 0;
1483 GPR_ASSERT(error != GRPC_ERROR_NONE);
1484
1485 grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
1486 &rc->completion);
1487 }
1488
/* Return the server's channel args (the copy made in grpc_server_create);
   the server retains ownership. */
const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
  return server->channel_args;
}
1492
grpc_server_has_open_connections(grpc_server * server)1493 int grpc_server_has_open_connections(grpc_server* server) {
1494 int r;
1495 gpr_mu_lock(&server->mu_global);
1496 r = server->root_channel_data.next != &server->root_channel_data;
1497 gpr_mu_unlock(&server->mu_global);
1498 return r;
1499 }
1500
grpc_server_get_channelz_node(grpc_server * server)1501 grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
1502 grpc_server* server) {
1503 if (server == nullptr) {
1504 return nullptr;
1505 }
1506 return server->channelz_server.get();
1507 }
1508