• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015-2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/surface/server.h"
22 
23 #include <limits.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 #include <algorithm>
28 #include <atomic>
29 #include <iterator>
30 #include <list>
31 #include <utility>
32 #include <vector>
33 
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/string_util.h>
37 
38 #include "absl/types/optional.h"
39 
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/channel/channelz.h"
42 #include "src/core/lib/channel/connected_channel.h"
43 #include "src/core/lib/debug/stats.h"
44 #include "src/core/lib/gpr/spinlock.h"
45 #include "src/core/lib/gpr/string.h"
46 #include "src/core/lib/gprpp/mpscq.h"
47 #include "src/core/lib/iomgr/executor.h"
48 #include "src/core/lib/iomgr/iomgr.h"
49 #include "src/core/lib/slice/slice_internal.h"
50 #include "src/core/lib/surface/api_trace.h"
51 #include "src/core/lib/surface/call.h"
52 #include "src/core/lib/surface/channel.h"
53 #include "src/core/lib/surface/completion_queue.h"
54 #include "src/core/lib/surface/init.h"
55 #include "src/core/lib/transport/metadata.h"
56 #include "src/core/lib/transport/static_metadata.h"
57 
58 namespace grpc_core {
59 
60 TraceFlag grpc_server_channel_trace(false, "server_channel");
61 
62 namespace {
63 
64 void server_on_recv_initial_metadata(void* ptr, grpc_error* error);
65 void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error);
66 
67 struct Listener {
Listenergrpc_core::__anonb082b28c0111::Listener68   explicit Listener(OrphanablePtr<ServerListenerInterface> l)
69       : listener(std::move(l)) {}
70 
71   OrphanablePtr<ServerListenerInterface> listener;
72   grpc_closure destroy_done;
73 };
74 
// Discriminates the two kinds of requested_call; the requested_call
// constructors set it together with the matching member of
// requested_call::data.
enum class RequestedCallType {
  BATCH_CALL,
  REGISTERED_CALL,
};
76 
77 struct registered_method;
78 
79 struct requested_call {
requested_callgrpc_core::__anonb082b28c0111::requested_call80   requested_call(void* tag_arg, grpc_completion_queue* call_cq,
81                  grpc_call** call_arg, grpc_metadata_array* initial_md,
82                  grpc_call_details* details)
83       : type(RequestedCallType::BATCH_CALL),
84         tag(tag_arg),
85         cq_bound_to_call(call_cq),
86         call(call_arg),
87         initial_metadata(initial_md) {
88     details->reserved = nullptr;
89     data.batch.details = details;
90   }
91 
requested_callgrpc_core::__anonb082b28c0111::requested_call92   requested_call(void* tag_arg, grpc_completion_queue* call_cq,
93                  grpc_call** call_arg, grpc_metadata_array* initial_md,
94                  registered_method* rm, gpr_timespec* deadline,
95                  grpc_byte_buffer** optional_payload)
96       : type(RequestedCallType::REGISTERED_CALL),
97         tag(tag_arg),
98         cq_bound_to_call(call_cq),
99         call(call_arg),
100         initial_metadata(initial_md) {
101     data.registered.method = rm;
102     data.registered.deadline = deadline;
103     data.registered.optional_payload = optional_payload;
104   }
105 
106   MultiProducerSingleConsumerQueue::Node mpscq_node;
107   const RequestedCallType type;
108   void* const tag;
109   grpc_completion_queue* const cq_bound_to_call;
110   grpc_call** const call;
111   grpc_cq_completion completion;
112   grpc_metadata_array* const initial_metadata;
113   union {
114     struct {
115       grpc_call_details* details;
116     } batch;
117     struct {
118       registered_method* method;
119       gpr_timespec* deadline;
120       grpc_byte_buffer** optional_payload;
121     } registered;
122   } data;
123 };
124 
125 struct channel_registered_method {
126   registered_method* server_registered_method = nullptr;
127   uint32_t flags;
128   bool has_host;
129   ExternallyManagedSlice method;
130   ExternallyManagedSlice host;
131 };
132 
133 struct channel_data {
134   channel_data() = default;
135   ~channel_data();
136 
137   grpc_server* server = nullptr;
138   grpc_channel* channel;
139   size_t cq_idx;
140   absl::optional<std::list<channel_data*>::iterator> list_position;
141 
142   // registered_methods is a hash-table of the methods and hosts of the
143   // registered methods.
144   // TODO(vjpai): Convert this to an STL map type as opposed to a direct bucket
145   // implementation. (Consider performance impact, hash function to use, etc.)
146   std::unique_ptr<std::vector<channel_registered_method>> registered_methods;
147   uint32_t registered_method_max_probes;
148 
149   grpc_closure finish_destroy_channel_closure;
150   intptr_t channelz_socket_uuid;
151 };
152 
153 struct shutdown_tag {
shutdown_taggrpc_core::__anonb082b28c0111::shutdown_tag154   shutdown_tag(void* tag_arg, grpc_completion_queue* cq_arg)
155       : tag(tag_arg), cq(cq_arg) {}
156 
157   void* const tag;
158   grpc_completion_queue* const cq;
159   grpc_cq_completion completion;
160 };
161 
// Lifecycle of an incoming call as tracked in call_data::state.
enum class CallState {
  // Waiting for metadata.
  NOT_STARTED,
  // Initial metadata read, not flow controlled in yet.
  PENDING,
  // Flow controlled in, on completion queue.
  ACTIVATED,
  // Cancelled before being queued.
  ZOMBIED
};
172 
173 struct call_data;
174 
175 grpc_call_error ValidateServerRequest(
176     grpc_completion_queue* cq_for_notification, void* tag,
177     grpc_byte_buffer** optional_payload, registered_method* rm);
178 
179 // RPCs that come in from the transport must be matched against RPC requests
180 // from the application. An incoming request from the application can be matched
181 // to an RPC that has already arrived or can be queued up for later use.
182 // Likewise, an RPC coming in from the transport can either be matched to a
183 // request that already arrived from the application or can be queued up for
184 // later use (marked pending). If there is a match, the request's tag is posted
185 // on the request's notification CQ.
186 //
187 // RequestMatcherInterface is the base class to provide this functionality.
188 class RequestMatcherInterface {
189  public:
~RequestMatcherInterface()190   virtual ~RequestMatcherInterface() {}
191 
192   // Unref the calls associated with any incoming RPCs in the pending queue (not
193   // yet matched to an application-requested RPC).
194   virtual void ZombifyPending() = 0;
195 
196   // Mark all application-requested RPCs failed if they have not been matched to
197   // an incoming RPC. The error parameter indicates why the RPCs are being
198   // failed (always server shutdown in all current implementations).
199   virtual void KillRequests(grpc_error* error) = 0;
200 
201   // How many request queues are supported by this matcher. This is an abstract
202   // concept that essentially maps to gRPC completion queues.
203   virtual size_t request_queue_count() const = 0;
204 
205   // This function is invoked when the application requests a new RPC whose
206   // information is in the call parameter. The request_queue_index marks the
207   // queue onto which to place this RPC, and is typically associated with a gRPC
208   // CQ. If there are pending RPCs waiting to be matched, publish one (match it
209   // and notify the CQ).
210   virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
211                                               requested_call* call) = 0;
212 
213   // This function is invoked on an incoming RPC, represented by the calld
214   // object. The RequestMatcher will try to match it against an
215   // application-requested RPC if possible or will place it in the pending queue
216   // otherwise. To enable some measure of fairness between server CQs, the match
217   // is done starting at the start_request_queue_index parameter in a cyclic
218   // order rather than always starting at 0.
219   virtual void MatchOrQueue(size_t start_request_queue_index,
220                             call_data* calld) = 0;
221 
222   // Returns the server associated with this request matcher
223   virtual grpc_server* server() const = 0;
224 };
225 
226 struct call_data {
call_datagrpc_core::__anonb082b28c0111::call_data227   call_data(grpc_call_element* elem, const grpc_call_element_args& args)
228       : call(grpc_call_from_top_element(elem)),
229         call_combiner(args.call_combiner) {
230     GRPC_CLOSURE_INIT(&on_recv_initial_metadata,
231                       server_on_recv_initial_metadata, elem,
232                       grpc_schedule_on_exec_ctx);
233     GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
234                       server_recv_trailing_metadata_ready, elem,
235                       grpc_schedule_on_exec_ctx);
236   }
~call_datagrpc_core::__anonb082b28c0111::call_data237   ~call_data() {
238     GPR_ASSERT(state.Load(grpc_core::MemoryOrder::RELAXED) !=
239                CallState::PENDING);
240     GRPC_ERROR_UNREF(recv_initial_metadata_error);
241     if (host_set) {
242       grpc_slice_unref_internal(host);
243     }
244     if (path_set) {
245       grpc_slice_unref_internal(path);
246     }
247     grpc_metadata_array_destroy(&initial_metadata);
248     grpc_byte_buffer_destroy(payload);
249   }
250 
251   grpc_call* call;
252 
253   Atomic<CallState> state{CallState::NOT_STARTED};
254 
255   bool path_set = false;
256   bool host_set = false;
257   grpc_slice path;
258   grpc_slice host;
259   grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;
260 
261   grpc_completion_queue* cq_new = nullptr;
262 
263   grpc_metadata_batch* recv_initial_metadata = nullptr;
264   uint32_t recv_initial_metadata_flags = 0;
265   grpc_metadata_array initial_metadata =
266       grpc_metadata_array();  // Zero-initialize the C struct.
267 
268   RequestMatcherInterface* matcher = nullptr;
269   grpc_byte_buffer* payload = nullptr;
270 
271   grpc_closure got_initial_metadata;
272   grpc_closure on_recv_initial_metadata;
273   grpc_closure kill_zombie_closure;
274   grpc_closure* on_done_recv_initial_metadata;
275   grpc_closure recv_trailing_metadata_ready;
276   grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
277   grpc_closure* original_recv_trailing_metadata_ready;
278   grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE;
279   bool seen_recv_trailing_metadata_ready = false;
280 
281   grpc_closure publish;
282 
283   CallCombiner* call_combiner;
284 };
285 
286 struct registered_method {
registered_methodgrpc_core::__anonb082b28c0111::registered_method287   registered_method(
288       const char* method_arg, const char* host_arg,
289       grpc_server_register_method_payload_handling payload_handling_arg,
290       uint32_t flags_arg)
291       : method(method_arg == nullptr ? "" : method_arg),
292         host(host_arg == nullptr ? "" : host_arg),
293         payload_handling(payload_handling_arg),
294         flags(flags_arg) {}
295 
296   ~registered_method() = default;
297 
298   const std::string method;
299   const std::string host;
300   const grpc_server_register_method_payload_handling payload_handling;
301   const uint32_t flags;
302   /* one request matcher per method */
303   std::unique_ptr<RequestMatcherInterface> matcher;
304 };
305 
306 }  // namespace
307 }  // namespace grpc_core
308 
309 struct grpc_server {
grpc_servergrpc_server310   explicit grpc_server(const grpc_channel_args* args)
311       : channel_args(grpc_channel_args_copy(args)) {
312     if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
313                                     GRPC_ENABLE_CHANNELZ_DEFAULT)) {
314       size_t channel_tracer_max_memory = grpc_channel_args_find_integer(
315           args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE,
316           {GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
317       channelz_server =
318           grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
319               this, channel_tracer_max_memory);
320       channelz_server->AddTraceEvent(
321           grpc_core::channelz::ChannelTrace::Severity::Info,
322           grpc_slice_from_static_string("Server created"));
323     }
324 
325     if (args != nullptr) {
326       grpc_resource_quota* resource_quota =
327           grpc_resource_quota_from_channel_args(args, false /* create */);
328       if (resource_quota != nullptr) {
329         default_resource_user =
330             grpc_resource_user_create(resource_quota, "default");
331       }
332     }
333   }
334 
~grpc_servergrpc_server335   ~grpc_server() {
336     grpc_channel_args_destroy(channel_args);
337     for (size_t i = 0; i < cqs.size(); i++) {
338       GRPC_CQ_INTERNAL_UNREF(cqs[i], "server");
339     }
340   }
341 
342   grpc_channel_args* const channel_args;
343 
344   grpc_resource_user* default_resource_user = nullptr;
345 
346   std::vector<grpc_completion_queue*> cqs;
347   std::vector<grpc_pollset*> pollsets;
348   bool started = false;
349 
350   /* The two following mutexes control access to server-state
351      mu_global controls access to non-call-related state (e.g., channel state)
352      mu_call controls access to call-related state (e.g., the call lists)
353 
354      If they are ever required to be nested, you must lock mu_global
355      before mu_call. This is currently used in shutdown processing
356      (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
357   grpc_core::Mutex mu_global;  // mutex for server and channel state
358   grpc_core::Mutex mu_call;    // mutex for call-specific state
359 
360   /* startup synchronization: flag is protected by mu_global, signals whether
361      we are doing the listener start routine or not */
362   bool starting = false;
363   grpc_core::CondVar starting_cv;
364 
365   std::vector<std::unique_ptr<grpc_core::registered_method>> registered_methods;
366 
367   // one request matcher for unregistered methods
368   std::unique_ptr<grpc_core::RequestMatcherInterface>
369       unregistered_request_matcher;
370 
371   std::atomic_bool shutdown_flag{false};
372   bool shutdown_published = false;
373   std::vector<grpc_core::shutdown_tag> shutdown_tags;
374 
375   std::list<grpc_core::channel_data*> channels;
376 
377   std::list<grpc_core::Listener> listeners;
378   size_t listeners_destroyed = 0;
379   grpc_core::RefCount internal_refcount;
380 
381   /** when did we print the last shutdown progress message */
382   gpr_timespec last_shutdown_message_time;
383 
384   grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
385 };
386 
387 // Non-API functions of the server that are only for gRPC core internal use.
388 // TODO(markdroth): Make these class member functions
grpc_server_add_listener(grpc_server * server,grpc_core::OrphanablePtr<grpc_core::ServerListenerInterface> listener)389 void grpc_server_add_listener(
390     grpc_server* server,
391     grpc_core::OrphanablePtr<grpc_core::ServerListenerInterface> listener) {
392   grpc_core::channelz::ListenSocketNode* listen_socket_node =
393       listener->channelz_listen_socket_node();
394   if (listen_socket_node != nullptr && server->channelz_server != nullptr) {
395     server->channelz_server->AddChildListenSocket(listen_socket_node->Ref());
396   }
397   server->listeners.emplace_back(std::move(listener));
398 }
399 
grpc_server_get_channel_args(grpc_server * server)400 const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
401   return server->channel_args;
402 }
403 
grpc_server_get_default_resource_user(grpc_server * server)404 grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) {
405   return server->default_resource_user;
406 }
407 
grpc_server_has_open_connections(grpc_server * server)408 bool grpc_server_has_open_connections(grpc_server* server) {
409   grpc_core::MutexLock lock(&server->mu_global);
410   return !server->channels.empty();
411 }
412 
grpc_server_get_channelz_node(grpc_server * server)413 grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
414     grpc_server* server) {
415   if (server == nullptr) {
416     return nullptr;
417   }
418   return server->channelz_server.get();
419 }
420 
421 namespace grpc_core {
422 namespace {
423 
424 void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
425                   requested_call* rc);
426 void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
427                grpc_error* error);
428 /* Before calling maybe_finish_shutdown, we must hold mu_global and not
429    hold mu_call */
430 void maybe_finish_shutdown(grpc_server* server);
431 
kill_zombie(void * elem,grpc_error *)432 void kill_zombie(void* elem, grpc_error* /*error*/) {
433   grpc_call_unref(
434       grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
435 }
436 
437 // Validate a requested RPC for a server CQ and bind it to that CQ
ValidateServerRequest(grpc_completion_queue * cq_for_notification,void * tag,grpc_byte_buffer ** optional_payload,registered_method * rm)438 grpc_call_error ValidateServerRequest(
439     grpc_completion_queue* cq_for_notification, void* tag,
440     grpc_byte_buffer** optional_payload, registered_method* rm) {
441   if ((rm == nullptr && optional_payload != nullptr) ||
442       ((rm != nullptr) && ((optional_payload == nullptr) !=
443                            (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) {
444     return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
445   }
446   if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
447     return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
448   }
449   return GRPC_CALL_OK;
450 }
451 
452 // Validate that a requested RPC has a valid server CQ and is valid, and bind it
ValidateServerRequestAndCq(size_t * cq_idx,grpc_server * server,grpc_completion_queue * cq_for_notification,void * tag,grpc_byte_buffer ** optional_payload,registered_method * rm)453 grpc_call_error ValidateServerRequestAndCq(
454     size_t* cq_idx, grpc_server* server,
455     grpc_completion_queue* cq_for_notification, void* tag,
456     grpc_byte_buffer** optional_payload, registered_method* rm) {
457   size_t idx;
458   for (idx = 0; idx < server->cqs.size(); idx++) {
459     if (server->cqs[idx] == cq_for_notification) {
460       break;
461     }
462   }
463   if (idx == server->cqs.size()) {
464     return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
465   }
466   grpc_call_error error =
467       ValidateServerRequest(cq_for_notification, tag, optional_payload, rm);
468   if (error != GRPC_CALL_OK) {
469     return error;
470   }
471 
472   *cq_idx = idx;
473   return GRPC_CALL_OK;
474 }
475 /*
476  * channel broadcaster
477  */
478 
479 struct shutdown_cleanup_args {
480   grpc_closure closure;
481   grpc_slice slice;
482 };
483 
shutdown_cleanup(void * arg,grpc_error *)484 void shutdown_cleanup(void* arg, grpc_error* /*error*/) {
485   shutdown_cleanup_args* a = static_cast<shutdown_cleanup_args*>(arg);
486   grpc_slice_unref_internal(a->slice);
487   delete a;
488 }
489 
send_shutdown(grpc_channel * channel,bool send_goaway,grpc_error * send_disconnect)490 void send_shutdown(grpc_channel* channel, bool send_goaway,
491                    grpc_error* send_disconnect) {
492   shutdown_cleanup_args* sc = new shutdown_cleanup_args;
493   GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
494                     grpc_schedule_on_exec_ctx);
495   grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
496   grpc_channel_element* elem;
497 
498   op->goaway_error =
499       send_goaway ? grpc_error_set_int(
500                         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"),
501                         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK)
502                   : GRPC_ERROR_NONE;
503   op->set_accept_stream = true;
504   sc->slice = grpc_slice_from_copied_string("Server shutdown");
505   op->disconnect_with_error = send_disconnect;
506 
507   elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
508   elem->filter->start_transport_op(elem, op);
509 }
510 
511 class ChannelBroadcaster {
512  public:
513   // This can have an empty constructor and destructor since we want to control
514   // when the actual setup and shutdown broadcast take place
515 
516   // This function copies over the channels from the locked server
FillChannelsLocked(const grpc_server * s)517   void FillChannelsLocked(const grpc_server* s) {
518     GPR_DEBUG_ASSERT(channels_.empty());
519     channels_.reserve(s->channels.size());
520     for (const channel_data* chand : s->channels) {
521       channels_.push_back(chand->channel);
522       GRPC_CHANNEL_INTERNAL_REF(chand->channel, "broadcast");
523     }
524   }
525 
526   // Broadcast a shutdown on each channel
BroadcastShutdown(bool send_goaway,grpc_error * force_disconnect)527   void BroadcastShutdown(bool send_goaway, grpc_error* force_disconnect) {
528     for (grpc_channel* channel : channels_) {
529       send_shutdown(channel, send_goaway, GRPC_ERROR_REF(force_disconnect));
530       GRPC_CHANNEL_INTERNAL_UNREF(channel, "broadcast");
531     }
532     channels_.clear();  // just for safety against double broadcast
533     GRPC_ERROR_UNREF(force_disconnect);
534   }
535 
536  private:
537   std::vector<grpc_channel*> channels_;
538 };
539 
540 /*
541  * request_matcher
542  */
543 
544 // The RealRequestMatcher is an implementation of RequestMatcherInterface that
545 // actually uses all the features of RequestMatcherInterface: expecting the
546 // application to explicitly request RPCs and then matching those to incoming
547 // RPCs, along with a slow path by which incoming RPCs are put on a locked
548 // pending list if they aren't able to be matched to an application request.
549 class RealRequestMatcher : public RequestMatcherInterface {
550  public:
RealRequestMatcher(grpc_server * server)551   explicit RealRequestMatcher(grpc_server* server)
552       : server_(server), requests_per_cq_(server->cqs.size()) {}
553 
~RealRequestMatcher()554   ~RealRequestMatcher() override {
555     for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
556       GPR_ASSERT(queue.Pop() == nullptr);
557     }
558   }
559 
ZombifyPending()560   void ZombifyPending() override {
561     for (call_data* calld : pending_) {
562       calld->state.Store(CallState::ZOMBIED, grpc_core::MemoryOrder::RELAXED);
563       GRPC_CLOSURE_INIT(
564           &calld->kill_zombie_closure, kill_zombie,
565           grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
566           grpc_schedule_on_exec_ctx);
567       ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
568                    GRPC_ERROR_NONE);
569     }
570     pending_.clear();
571   }
572 
KillRequests(grpc_error * error)573   void KillRequests(grpc_error* error) override {
574     for (size_t i = 0; i < requests_per_cq_.size(); i++) {
575       requested_call* rc;
576       while ((rc = reinterpret_cast<requested_call*>(
577                   requests_per_cq_[i].Pop())) != nullptr) {
578         fail_call(server_, i, rc, GRPC_ERROR_REF(error));
579       }
580     }
581     GRPC_ERROR_UNREF(error);
582   }
583 
request_queue_count() const584   size_t request_queue_count() const override {
585     return requests_per_cq_.size();
586   }
587 
RequestCallWithPossiblePublish(size_t request_queue_index,requested_call * call)588   void RequestCallWithPossiblePublish(size_t request_queue_index,
589                                       requested_call* call) override {
590     if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
591       /* this was the first queued request: we need to lock and start
592          matching calls */
593       struct PendingCall {
594         requested_call* rc = nullptr;
595         call_data* calld;
596       };
597       auto pop_next_pending = [this, request_queue_index] {
598         PendingCall pending;
599         {
600           MutexLock lock(&server_->mu_call);
601           if (!pending_.empty()) {
602             pending.rc = reinterpret_cast<requested_call*>(
603                 requests_per_cq_[request_queue_index].Pop());
604             if (pending.rc != nullptr) {
605               pending.calld = pending_.front();
606               pending_.pop_front();
607             }
608           }
609         }
610         return pending;
611       };
612       while (true) {
613         PendingCall next_pending = pop_next_pending();
614         if (next_pending.rc == nullptr) break;
615         CallState expect_pending = CallState::PENDING;
616         if (!next_pending.calld->state.CompareExchangeStrong(
617                 &expect_pending, CallState::ACTIVATED,
618                 grpc_core::MemoryOrder::ACQ_REL,
619                 grpc_core::MemoryOrder::RELAXED)) {
620           // Zombied Call
621           GRPC_CLOSURE_INIT(
622               &next_pending.calld->kill_zombie_closure, kill_zombie,
623               grpc_call_stack_element(
624                   grpc_call_get_call_stack(next_pending.calld->call), 0),
625               grpc_schedule_on_exec_ctx);
626           ExecCtx::Run(DEBUG_LOCATION, &next_pending.calld->kill_zombie_closure,
627                        GRPC_ERROR_NONE);
628         } else {
629           publish_call(server_, next_pending.calld, request_queue_index,
630                        next_pending.rc);
631         }
632       }
633     }
634   }
635 
MatchOrQueue(size_t start_request_queue_index,call_data * calld)636   void MatchOrQueue(size_t start_request_queue_index,
637                     call_data* calld) override {
638     for (size_t i = 0; i < requests_per_cq_.size(); i++) {
639       size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
640       requested_call* rc =
641           reinterpret_cast<requested_call*>(requests_per_cq_[cq_idx].TryPop());
642       if (rc == nullptr) {
643         continue;
644       } else {
645         GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
646         calld->state.Store(CallState::ACTIVATED,
647                            grpc_core::MemoryOrder::RELAXED);
648         publish_call(server_, calld, cq_idx, rc);
649         return; /* early out */
650       }
651     }
652 
653     /* no cq to take the request found: queue it on the slow list */
654     GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();
655 
656     // We need to ensure that all the queues are empty.  We do this under
657     // the server mu_call lock to ensure that if something is added to
658     // an empty request queue, it will block until the call is actually
659     // added to the pending list.
660     requested_call* rc = nullptr;
661     size_t cq_idx = 0;
662     size_t loop_count;
663     {
664       MutexLock lock(&server_->mu_call);
665       for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
666         cq_idx =
667             (start_request_queue_index + loop_count) % requests_per_cq_.size();
668         rc = reinterpret_cast<requested_call*>(requests_per_cq_[cq_idx].Pop());
669         if (rc != nullptr) {
670           break;
671         }
672       }
673       if (rc == nullptr) {
674         calld->state.Store(CallState::PENDING, grpc_core::MemoryOrder::RELAXED);
675         pending_.push_back(calld);
676         return;
677       }
678     }
679     GRPC_STATS_INC_SERVER_CQS_CHECKED(loop_count + requests_per_cq_.size());
680     calld->state.Store(CallState::ACTIVATED, grpc_core::MemoryOrder::RELAXED);
681     publish_call(server_, calld, cq_idx, rc);
682   }
683 
server() const684   grpc_server* server() const override { return server_; }
685 
686  private:
687   grpc_server* const server_;
688   std::list<call_data*> pending_;
689   std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
690 };
691 
692 // AllocatingRequestMatchers don't allow the application to request an RPC in
693 // advance or queue up any incoming RPC for later match. Instead, MatchOrQueue
694 // will call out to an allocation function passed in at the construction of the
695 // object. These request matchers are designed for the C++ callback API, so they
696 // only support 1 completion queue (passed in at the constructor).
697 class AllocatingRequestMatcherBase : public RequestMatcherInterface {
698  public:
AllocatingRequestMatcherBase(grpc_server * server,grpc_completion_queue * cq)699   AllocatingRequestMatcherBase(grpc_server* server, grpc_completion_queue* cq)
700       : server_(server), cq_(cq) {
701     size_t idx;
702     for (idx = 0; idx < server->cqs.size(); idx++) {
703       if (server->cqs[idx] == cq) {
704         break;
705       }
706     }
707     GPR_ASSERT(idx < server->cqs.size());
708     cq_idx_ = idx;
709   }
710 
ZombifyPending()711   void ZombifyPending() override {}
712 
KillRequests(grpc_error * error)713   void KillRequests(grpc_error* error) override { GRPC_ERROR_UNREF(error); }
714 
request_queue_count() const715   size_t request_queue_count() const override { return 0; }
716 
RequestCallWithPossiblePublish(size_t,requested_call *)717   void RequestCallWithPossiblePublish(size_t /*request_queue_index*/,
718                                       requested_call* /*call*/) final {
719     GPR_ASSERT(false);
720   }
721 
server() const722   grpc_server* server() const override { return server_; }
723 
724   // Supply the completion queue related to this request matcher
cq() const725   grpc_completion_queue* cq() const { return cq_; }
726 
727   // Supply the completion queue's index relative to the server.
cq_idx() const728   size_t cq_idx() const { return cq_idx_; }
729 
730  private:
731   grpc_server* const server_;
732   grpc_completion_queue* const cq_;
733   size_t cq_idx_;
734 };
735 
736 // An allocating request matcher for non-registered methods (used for generic
737 // API and unimplemented RPCs).
738 class AllocatingRequestMatcherBatch : public AllocatingRequestMatcherBase {
739  public:
AllocatingRequestMatcherBatch(grpc_server * server,grpc_completion_queue * cq,std::function<ServerBatchCallAllocation ()> allocator)740   AllocatingRequestMatcherBatch(
741       grpc_server* server, grpc_completion_queue* cq,
742       std::function<ServerBatchCallAllocation()> allocator)
743       : AllocatingRequestMatcherBase(server, cq),
744         allocator_(std::move(allocator)) {}
MatchOrQueue(size_t,call_data * calld)745   void MatchOrQueue(size_t /*start_request_queue_index*/,
746                     call_data* calld) override {
747     ServerBatchCallAllocation call_info = allocator_();
748     GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag),
749                                      nullptr, nullptr) == GRPC_CALL_OK);
750     requested_call* rc = new requested_call(
751         static_cast<void*>(call_info.tag), cq(), call_info.call,
752         call_info.initial_metadata, call_info.details);
753     calld->state.Store(CallState::ACTIVATED, grpc_core::MemoryOrder::RELAXED);
754     publish_call(server(), calld, cq_idx(), rc);
755   }
756 
757  private:
758   std::function<ServerBatchCallAllocation()> allocator_;
759 };
760 
761 // An allocating request matcher for registered methods.
762 class AllocatingRequestMatcherRegistered : public AllocatingRequestMatcherBase {
763  public:
AllocatingRequestMatcherRegistered(grpc_server * server,grpc_completion_queue * cq,registered_method * rm,std::function<ServerRegisteredCallAllocation ()> allocator)764   AllocatingRequestMatcherRegistered(
765       grpc_server* server, grpc_completion_queue* cq, registered_method* rm,
766       std::function<ServerRegisteredCallAllocation()> allocator)
767       : AllocatingRequestMatcherBase(server, cq),
768         registered_method_(rm),
769         allocator_(std::move(allocator)) {}
MatchOrQueue(size_t,call_data * calld)770   void MatchOrQueue(size_t /*start_request_queue_index*/,
771                     call_data* calld) override {
772     ServerRegisteredCallAllocation call_info = allocator_();
773     GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag),
774                                      call_info.optional_payload,
775                                      registered_method_) == GRPC_CALL_OK);
776     requested_call* rc = new requested_call(
777         static_cast<void*>(call_info.tag), cq(), call_info.call,
778         call_info.initial_metadata, registered_method_, call_info.deadline,
779         call_info.optional_payload);
780     calld->state.Store(CallState::ACTIVATED, grpc_core::MemoryOrder::RELAXED);
781     publish_call(server(), calld, cq_idx(), rc);
782   }
783 
784  private:
785   registered_method* const registered_method_;
786   std::function<ServerRegisteredCallAllocation()> allocator_;
787 };
788 
789 /*
790  * server proper
791  */
792 
server_ref(grpc_server * server)793 void server_ref(grpc_server* server) { server->internal_refcount.Ref(); }
794 
server_unref(grpc_server * server)795 void server_unref(grpc_server* server) {
796   if (GPR_UNLIKELY(server->internal_refcount.Unref())) {
797     delete server;
798   }
799 }
800 
finish_destroy_channel(void * cd,grpc_error *)801 void finish_destroy_channel(void* cd, grpc_error* /*error*/) {
802   channel_data* chand = static_cast<channel_data*>(cd);
803   grpc_server* server = chand->server;
804   GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
805   server_unref(server);
806 }
807 
destroy_channel(channel_data * chand)808 void destroy_channel(channel_data* chand) {
809   if (!chand->list_position.has_value()) return;
810   GPR_ASSERT(chand->server != nullptr);
811   chand->server->channels.erase(*chand->list_position);
812   chand->list_position.reset();
813   server_ref(chand->server);
814   maybe_finish_shutdown(chand->server);
815   GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
816                     finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);
817 
818   if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) {
819     gpr_log(GPR_INFO, "Disconnected client");
820   }
821 
822   grpc_transport_op* op =
823       grpc_make_transport_op(&chand->finish_destroy_channel_closure);
824   op->set_accept_stream = true;
825   grpc_channel_next_op(grpc_channel_stack_element(
826                            grpc_channel_get_channel_stack(chand->channel), 0),
827                        op);
828 }
829 
done_request_event(void * req,grpc_cq_completion *)830 void done_request_event(void* req, grpc_cq_completion* /*c*/) {
831   delete static_cast<requested_call*>(req);
832 }
833 
publish_call(grpc_server * server,call_data * calld,size_t cq_idx,requested_call * rc)834 void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
835                   requested_call* rc) {
836   grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
837   grpc_call* call = calld->call;
838   *rc->call = call;
839   calld->cq_new = server->cqs[cq_idx];
840   GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
841   switch (rc->type) {
842     case RequestedCallType::BATCH_CALL:
843       GPR_ASSERT(calld->host_set);
844       GPR_ASSERT(calld->path_set);
845       rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
846       rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
847       rc->data.batch.details->deadline =
848           grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
849       rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
850       break;
851     case RequestedCallType::REGISTERED_CALL:
852       *rc->data.registered.deadline =
853           grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
854       if (rc->data.registered.optional_payload) {
855         *rc->data.registered.optional_payload = calld->payload;
856         calld->payload = nullptr;
857       }
858       break;
859     default:
860       GPR_UNREACHABLE_CODE(return );
861   }
862 
863   grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
864                  rc, &rc->completion, true);
865 }
866 
publish_new_rpc(void * arg,grpc_error * error)867 void publish_new_rpc(void* arg, grpc_error* error) {
868   grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
869   call_data* calld = static_cast<call_data*>(call_elem->call_data);
870   channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
871   RequestMatcherInterface* rm = calld->matcher;
872   grpc_server* server = rm->server();
873 
874   if (error != GRPC_ERROR_NONE ||
875       server->shutdown_flag.load(std::memory_order_acquire)) {
876     calld->state.Store(CallState::ZOMBIED, grpc_core::MemoryOrder::RELAXED);
877     GRPC_CLOSURE_INIT(
878         &calld->kill_zombie_closure, kill_zombie,
879         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
880         grpc_schedule_on_exec_ctx);
881     ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
882                  GRPC_ERROR_REF(error));
883     return;
884   }
885 
886   rm->MatchOrQueue(chand->cq_idx, calld);
887 }
888 
finish_start_new_rpc(grpc_server * server,grpc_call_element * elem,RequestMatcherInterface * rm,grpc_server_register_method_payload_handling payload_handling)889 void finish_start_new_rpc(
890     grpc_server* server, grpc_call_element* elem, RequestMatcherInterface* rm,
891     grpc_server_register_method_payload_handling payload_handling) {
892   call_data* calld = static_cast<call_data*>(elem->call_data);
893 
894   if (server->shutdown_flag.load(std::memory_order_acquire)) {
895     calld->state.Store(CallState::ZOMBIED, grpc_core::MemoryOrder::RELAXED);
896     GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
897                       grpc_schedule_on_exec_ctx);
898     ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
899     return;
900   }
901 
902   calld->matcher = rm;
903 
904   switch (payload_handling) {
905     case GRPC_SRM_PAYLOAD_NONE:
906       publish_new_rpc(elem, GRPC_ERROR_NONE);
907       break;
908     case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
909       grpc_op op;
910       op.op = GRPC_OP_RECV_MESSAGE;
911       op.flags = 0;
912       op.reserved = nullptr;
913       op.data.recv_message.recv_message = &calld->payload;
914       GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
915                         grpc_schedule_on_exec_ctx);
916       grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);
917       break;
918     }
919   }
920 }
921 
start_new_rpc(grpc_call_element * elem)922 void start_new_rpc(grpc_call_element* elem) {
923   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
924   call_data* calld = static_cast<call_data*>(elem->call_data);
925   grpc_server* server = chand->server;
926   uint32_t i;
927   uint32_t hash;
928   channel_registered_method* rm;
929 
930   if (chand->registered_methods && calld->path_set && calld->host_set) {
931     /* TODO(ctiller): unify these two searches */
932     /* check for an exact match with host */
933     hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash_internal(calld->host),
934                               grpc_slice_hash_internal(calld->path));
935     for (i = 0; i <= chand->registered_method_max_probes; i++) {
936       rm = &(*chand->registered_methods)[(hash + i) %
937                                          chand->registered_methods->size()];
938       if (rm->server_registered_method == nullptr) break;
939       if (!rm->has_host) continue;
940       if (rm->host != calld->host) continue;
941       if (rm->method != calld->path) continue;
942       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
943           0 == (calld->recv_initial_metadata_flags &
944                 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
945         continue;
946       }
947       finish_start_new_rpc(server, elem,
948                            rm->server_registered_method->matcher.get(),
949                            rm->server_registered_method->payload_handling);
950       return;
951     }
952     /* check for a wildcard method definition (no host set) */
953     hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash_internal(calld->path));
954     for (i = 0; i <= chand->registered_method_max_probes; i++) {
955       rm = &(*chand->registered_methods)[(hash + i) %
956                                          chand->registered_methods->size()];
957       if (rm->server_registered_method == nullptr) break;
958       if (rm->has_host) continue;
959       if (rm->method != calld->path) continue;
960       if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
961           0 == (calld->recv_initial_metadata_flags &
962                 GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
963         continue;
964       }
965       finish_start_new_rpc(server, elem,
966                            rm->server_registered_method->matcher.get(),
967                            rm->server_registered_method->payload_handling);
968       return;
969     }
970   }
971   finish_start_new_rpc(server, elem, server->unregistered_request_matcher.get(),
972                        GRPC_SRM_PAYLOAD_NONE);
973 }
974 
done_shutdown_event(void * server,grpc_cq_completion *)975 void done_shutdown_event(void* server, grpc_cq_completion* /*completion*/) {
976   server_unref(static_cast<grpc_server*>(server));
977 }
978 
num_channels(grpc_server * server)979 int num_channels(grpc_server* server) { return server->channels.size(); }
980 
kill_pending_work_locked(grpc_server * server,grpc_error * error)981 void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
982   if (server->started) {
983     server->unregistered_request_matcher->KillRequests(GRPC_ERROR_REF(error));
984     server->unregistered_request_matcher->ZombifyPending();
985     for (std::unique_ptr<registered_method>& rm : server->registered_methods) {
986       rm->matcher->KillRequests(GRPC_ERROR_REF(error));
987       rm->matcher->ZombifyPending();
988     }
989   }
990   GRPC_ERROR_UNREF(error);
991 }
992 
maybe_finish_shutdown(grpc_server * server)993 void maybe_finish_shutdown(grpc_server* server) {
994   size_t i;
995   if (!server->shutdown_flag.load(std::memory_order_acquire) ||
996       server->shutdown_published) {
997     return;
998   }
999 
1000   {
1001     MutexLock lock(&server->mu_call);
1002     kill_pending_work_locked(
1003         server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
1004   }
1005 
1006   if (!server->channels.empty() ||
1007       server->listeners_destroyed < server->listeners.size()) {
1008     if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
1009                                   server->last_shutdown_message_time),
1010                      gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
1011       server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
1012       gpr_log(GPR_DEBUG,
1013               "Waiting for %d channels and %" PRIuPTR "/%" PRIuPTR
1014               " listeners to be destroyed before shutting down server",
1015               num_channels(server),
1016               server->listeners.size() - server->listeners_destroyed,
1017               server->listeners.size());
1018     }
1019     return;
1020   }
1021   server->shutdown_published = 1;
1022   for (i = 0; i < server->shutdown_tags.size(); i++) {
1023     server_ref(server);
1024     grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
1025                    GRPC_ERROR_NONE, done_shutdown_event, server,
1026                    &server->shutdown_tags[i].completion);
1027   }
1028 }
1029 
server_on_recv_initial_metadata(void * ptr,grpc_error * error)1030 void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
1031   grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
1032   call_data* calld = static_cast<call_data*>(elem->call_data);
1033   grpc_millis op_deadline;
1034 
1035   if (error == GRPC_ERROR_NONE) {
1036     GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
1037     GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.authority !=
1038                      nullptr);
1039     calld->path = grpc_slice_ref_internal(
1040         GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
1041     calld->host = grpc_slice_ref_internal(
1042         GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
1043     calld->path_set = true;
1044     calld->host_set = true;
1045     grpc_metadata_batch_remove(calld->recv_initial_metadata, GRPC_BATCH_PATH);
1046     grpc_metadata_batch_remove(calld->recv_initial_metadata,
1047                                GRPC_BATCH_AUTHORITY);
1048   } else {
1049     GRPC_ERROR_REF(error);
1050   }
1051   op_deadline = calld->recv_initial_metadata->deadline;
1052   if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
1053     calld->deadline = op_deadline;
1054   }
1055   if (calld->host_set && calld->path_set) {
1056     /* do nothing */
1057   } else {
1058     /* Pass the error reference to calld->recv_initial_metadata_error */
1059     grpc_error* src_error = error;
1060     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1061         "Missing :authority or :path", &src_error, 1);
1062     GRPC_ERROR_UNREF(src_error);
1063     calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
1064   }
1065   grpc_closure* closure = calld->on_done_recv_initial_metadata;
1066   calld->on_done_recv_initial_metadata = nullptr;
1067   if (calld->seen_recv_trailing_metadata_ready) {
1068     GRPC_CALL_COMBINER_START(calld->call_combiner,
1069                              &calld->recv_trailing_metadata_ready,
1070                              calld->recv_trailing_metadata_error,
1071                              "continue server_recv_trailing_metadata_ready");
1072   }
1073   Closure::Run(DEBUG_LOCATION, closure, error);
1074 }
1075 
server_recv_trailing_metadata_ready(void * user_data,grpc_error * error)1076 void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
1077   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
1078   call_data* calld = static_cast<call_data*>(elem->call_data);
1079   if (calld->on_done_recv_initial_metadata != nullptr) {
1080     calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
1081     calld->seen_recv_trailing_metadata_ready = true;
1082     GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
1083                       server_recv_trailing_metadata_ready, elem,
1084                       grpc_schedule_on_exec_ctx);
1085     GRPC_CALL_COMBINER_STOP(calld->call_combiner,
1086                             "deferring server_recv_trailing_metadata_ready "
1087                             "until after server_on_recv_initial_metadata");
1088     return;
1089   }
1090   error =
1091       grpc_error_add_child(GRPC_ERROR_REF(error),
1092                            GRPC_ERROR_REF(calld->recv_initial_metadata_error));
1093   Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready,
1094                error);
1095 }
1096 
server_mutate_op(grpc_call_element * elem,grpc_transport_stream_op_batch * op)1097 void server_mutate_op(grpc_call_element* elem,
1098                       grpc_transport_stream_op_batch* op) {
1099   call_data* calld = static_cast<call_data*>(elem->call_data);
1100 
1101   if (op->recv_initial_metadata) {
1102     GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == nullptr);
1103     calld->recv_initial_metadata =
1104         op->payload->recv_initial_metadata.recv_initial_metadata;
1105     calld->on_done_recv_initial_metadata =
1106         op->payload->recv_initial_metadata.recv_initial_metadata_ready;
1107     op->payload->recv_initial_metadata.recv_initial_metadata_ready =
1108         &calld->on_recv_initial_metadata;
1109     op->payload->recv_initial_metadata.recv_flags =
1110         &calld->recv_initial_metadata_flags;
1111   }
1112   if (op->recv_trailing_metadata) {
1113     calld->original_recv_trailing_metadata_ready =
1114         op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1115     op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1116         &calld->recv_trailing_metadata_ready;
1117   }
1118 }
1119 
server_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)1120 void server_start_transport_stream_op_batch(
1121     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
1122   server_mutate_op(elem, op);
1123   grpc_call_next_op(elem, op);
1124 }
1125 
got_initial_metadata(void * ptr,grpc_error * error)1126 void got_initial_metadata(void* ptr, grpc_error* error) {
1127   grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
1128   call_data* calld = static_cast<call_data*>(elem->call_data);
1129   if (error == GRPC_ERROR_NONE) {
1130     start_new_rpc(elem);
1131   } else {
1132     CallState expect_not_started = CallState::NOT_STARTED;
1133     CallState expect_pending = CallState::PENDING;
1134     if (calld->state.CompareExchangeStrong(
1135             &expect_not_started, CallState::ZOMBIED,
1136             grpc_core::MemoryOrder::ACQ_REL, grpc_core::MemoryOrder::RELAXED)) {
1137       GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
1138                         grpc_schedule_on_exec_ctx);
1139       ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
1140                    GRPC_ERROR_NONE);
1141     } else if (calld->state.CompareExchangeStrong(
1142                    &expect_pending, CallState::ZOMBIED,
1143                    grpc_core::MemoryOrder::ACQ_REL,
1144                    grpc_core::MemoryOrder::RELAXED)) {
1145       /* zombied call will be destroyed when it's removed from the pending
1146          queue... later */
1147     }
1148   }
1149 }
1150 
accept_stream(void * cd,grpc_transport *,const void * transport_server_data)1151 void accept_stream(void* cd, grpc_transport* /*transport*/,
1152                    const void* transport_server_data) {
1153   channel_data* chand = static_cast<channel_data*>(cd);
1154   /* create a call */
1155   grpc_call_create_args args;
1156   args.channel = chand->channel;
1157   args.server = chand->server;
1158   args.parent = nullptr;
1159   args.propagation_mask = 0;
1160   args.cq = nullptr;
1161   args.pollset_set_alternative = nullptr;
1162   args.server_transport_data = transport_server_data;
1163   args.add_initial_metadata = nullptr;
1164   args.add_initial_metadata_count = 0;
1165   args.send_deadline = GRPC_MILLIS_INF_FUTURE;
1166   grpc_call* call;
1167   grpc_error* error = grpc_call_create(&args, &call);
1168   grpc_call_element* elem =
1169       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
1170   if (error != GRPC_ERROR_NONE) {
1171     got_initial_metadata(elem, error);
1172     GRPC_ERROR_UNREF(error);
1173     return;
1174   }
1175   call_data* calld = static_cast<call_data*>(elem->call_data);
1176   grpc_op op;
1177   op.op = GRPC_OP_RECV_INITIAL_METADATA;
1178   op.flags = 0;
1179   op.reserved = nullptr;
1180   op.data.recv_initial_metadata.recv_initial_metadata =
1181       &calld->initial_metadata;
1182   GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
1183                     grpc_schedule_on_exec_ctx);
1184   grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);
1185 }
1186 
server_init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)1187 grpc_error* server_init_call_elem(grpc_call_element* elem,
1188                                   const grpc_call_element_args* args) {
1189   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1190   server_ref(chand->server);
1191   new (elem->call_data) call_data(elem, *args);
1192   return GRPC_ERROR_NONE;
1193 }
1194 
server_destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)1195 void server_destroy_call_elem(grpc_call_element* elem,
1196                               const grpc_call_final_info* /*final_info*/,
1197                               grpc_closure* /*ignored*/) {
1198   call_data* calld = static_cast<call_data*>(elem->call_data);
1199   calld->~call_data();
1200   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1201   server_unref(chand->server);
1202 }
1203 
server_init_channel_elem(grpc_channel_element * elem,grpc_channel_element_args * args)1204 grpc_error* server_init_channel_elem(grpc_channel_element* elem,
1205                                      grpc_channel_element_args* args) {
1206   GPR_ASSERT(args->is_first);
1207   GPR_ASSERT(!args->is_last);
1208 
1209   new (static_cast<channel_data*>(elem->channel_data)) channel_data;
1210   return GRPC_ERROR_NONE;
1211 }
1212 
~channel_data()1213 channel_data::~channel_data() {
1214   if (registered_methods) {
1215     for (const channel_registered_method& crm : *registered_methods) {
1216       grpc_slice_unref_internal(crm.method);
1217       GPR_DEBUG_ASSERT(crm.method.refcount == &kNoopRefcount ||
1218                        crm.method.refcount == nullptr);
1219       if (crm.has_host) {
1220         grpc_slice_unref_internal(crm.host);
1221         GPR_DEBUG_ASSERT(crm.host.refcount == &kNoopRefcount ||
1222                          crm.host.refcount == nullptr);
1223       }
1224     }
1225   }
1226   if (server) {
1227     if (server->channelz_server != nullptr && channelz_socket_uuid != 0) {
1228       server->channelz_server->RemoveChildSocket(channelz_socket_uuid);
1229     }
1230     {
1231       MutexLock lock(&server->mu_global);
1232       if (list_position.has_value()) {
1233         server->channels.erase(*list_position);
1234       }
1235       maybe_finish_shutdown(server);
1236     }
1237     server_unref(server);
1238   }
1239 }
1240 
server_destroy_channel_elem(grpc_channel_element * elem)1241 void server_destroy_channel_elem(grpc_channel_element* elem) {
1242   channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1243   chand->~channel_data();
1244 }
1245 
register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)1246 void register_completion_queue(grpc_server* server, grpc_completion_queue* cq,
1247                                void* reserved) {
1248   size_t i;
1249   GPR_ASSERT(!reserved);
1250   for (i = 0; i < server->cqs.size(); i++) {
1251     if (server->cqs[i] == cq) return;
1252   }
1253 
1254   GRPC_CQ_INTERNAL_REF(cq, "server");
1255   server->cqs.push_back(cq);
1256 }
1257 
// Compares a std::string against a possibly-null C string. A null \a b is
// considered equal only to the empty string; otherwise the two are compared
// as C strings.
bool streq(const std::string& a, const char* b) {
  if (b == nullptr) {
    return a.empty();
  }
  return strcmp(a.c_str(), b) == 0;
}
1262 
1263 class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface {
1264  public:
ConnectivityWatcher(channel_data * chand)1265   explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) {
1266     GRPC_CHANNEL_INTERNAL_REF(chand_->channel, "connectivity");
1267   }
1268 
~ConnectivityWatcher()1269   ~ConnectivityWatcher() {
1270     GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel, "connectivity");
1271   }
1272 
1273  private:
OnConnectivityStateChange(grpc_connectivity_state new_state)1274   void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
1275     // Don't do anything until we are being shut down.
1276     if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
1277     // Shut down channel.
1278     grpc_server* server = chand_->server;
1279     MutexLock lock(&server->mu_global);
1280     destroy_channel(chand_);
1281   }
1282 
1283   channel_data* chand_;
1284 };
1285 
done_published_shutdown(void *,grpc_cq_completion * storage)1286 void done_published_shutdown(void* /*done_arg*/, grpc_cq_completion* storage) {
1287   delete storage;
1288 }
1289 
listener_destroy_done(void * s,grpc_error *)1290 void listener_destroy_done(void* s, grpc_error* /*error*/) {
1291   grpc_server* server = static_cast<grpc_server*>(s);
1292   MutexLock lock(&server->mu_global);
1293   server->listeners_destroyed++;
1294   maybe_finish_shutdown(server);
1295 }
1296 
queue_call_request(grpc_server * server,size_t cq_idx,requested_call * rc)1297 grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
1298                                    requested_call* rc) {
1299   if (server->shutdown_flag.load(std::memory_order_acquire)) {
1300     fail_call(server, cq_idx, rc,
1301               GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
1302     return GRPC_CALL_OK;
1303   }
1304   RequestMatcherInterface* rm;
1305   switch (rc->type) {
1306     case RequestedCallType::BATCH_CALL:
1307       rm = server->unregistered_request_matcher.get();
1308       break;
1309     case RequestedCallType::REGISTERED_CALL:
1310       rm = rc->data.registered.method->matcher.get();
1311       break;
1312   }
1313   rm->RequestCallWithPossiblePublish(cq_idx, rc);
1314   return GRPC_CALL_OK;
1315 }
1316 
fail_call(grpc_server * server,size_t cq_idx,requested_call * rc,grpc_error * error)1317 void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
1318                grpc_error* error) {
1319   *rc->call = nullptr;
1320   rc->initial_metadata->count = 0;
1321   GPR_ASSERT(error != GRPC_ERROR_NONE);
1322 
1323   grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
1324                  &rc->completion);
1325 }
1326 
1327 }  // namespace
1328 
SetServerRegisteredMethodAllocator(grpc_server * server,grpc_completion_queue * cq,void * method_tag,std::function<ServerRegisteredCallAllocation ()> allocator)1329 void SetServerRegisteredMethodAllocator(
1330     grpc_server* server, grpc_completion_queue* cq, void* method_tag,
1331     std::function<ServerRegisteredCallAllocation()> allocator) {
1332   registered_method* rm = static_cast<registered_method*>(method_tag);
1333   rm->matcher = absl::make_unique<AllocatingRequestMatcherRegistered>(
1334       server, cq, rm, std::move(allocator));
1335 }
1336 
SetServerBatchMethodAllocator(grpc_server * server,grpc_completion_queue * cq,std::function<ServerBatchCallAllocation ()> allocator)1337 void SetServerBatchMethodAllocator(
1338     grpc_server* server, grpc_completion_queue* cq,
1339     std::function<ServerBatchCallAllocation()> allocator) {
1340   GPR_DEBUG_ASSERT(server->unregistered_request_matcher == nullptr);
1341   server->unregistered_request_matcher =
1342       absl::make_unique<AllocatingRequestMatcherBatch>(server, cq,
1343                                                        std::move(allocator));
1344 }
1345 
1346 }  // namespace grpc_core
1347 
// Channel filter table for the server filter. server_init_channel_elem()
// asserts that this filter is the first element of the channel stack. Stream
// op batches go through server_start_transport_stream_op_batch() so receive
// callbacks can be intercepted; the channel-op and get-info slots use the
// grpc_channel_next_* helpers (presumably plain pass-throughs to the next
// element -- confirm against their definitions). Per-call and per-channel
// storage is sized for grpc_core::call_data and grpc_core::channel_data.
const grpc_channel_filter grpc_server_top_filter = {
    grpc_core::server_start_transport_stream_op_batch,
    grpc_channel_next_op,
    sizeof(grpc_core::call_data),
    grpc_core::server_init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    grpc_core::server_destroy_call_elem,
    sizeof(grpc_core::channel_data),
    grpc_core::server_init_channel_elem,
    grpc_core::server_destroy_channel_elem,
    grpc_channel_next_get_info,
    "server",
};
1361 
1362 // The following are core surface API functions.
1363 
grpc_server_register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)1364 void grpc_server_register_completion_queue(grpc_server* server,
1365                                            grpc_completion_queue* cq,
1366                                            void* reserved) {
1367   GRPC_API_TRACE(
1368       "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
1369       (server, cq, reserved));
1370 
1371   auto cq_type = grpc_get_cq_completion_type(cq);
1372   if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) {
1373     gpr_log(GPR_INFO,
1374             "Completion queue of type %d is being registered as a "
1375             "server-completion-queue",
1376             static_cast<int>(cq_type));
1377     /* Ideally we should log an error and abort but ruby-wrapped-language API
1378        calls grpc_completion_queue_pluck() on server completion queues */
1379   }
1380 
1381   grpc_core::register_completion_queue(server, cq, reserved);
1382 }
1383 
grpc_server_create(const grpc_channel_args * args,void * reserved)1384 grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
1385   grpc_core::ExecCtx exec_ctx;
1386   GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
1387 
1388   return new grpc_server(args);
1389 }
1390 
grpc_server_register_method(grpc_server * server,const char * method,const char * host,grpc_server_register_method_payload_handling payload_handling,uint32_t flags)1391 void* grpc_server_register_method(
1392     grpc_server* server, const char* method, const char* host,
1393     grpc_server_register_method_payload_handling payload_handling,
1394     uint32_t flags) {
1395   GRPC_API_TRACE(
1396       "grpc_server_register_method(server=%p, method=%s, host=%s, "
1397       "flags=0x%08x)",
1398       4, (server, method, host, flags));
1399   if (!method) {
1400     gpr_log(GPR_ERROR,
1401             "grpc_server_register_method method string cannot be NULL");
1402     return nullptr;
1403   }
1404   for (std::unique_ptr<grpc_core::registered_method>& m :
1405        server->registered_methods) {
1406     if (grpc_core::streq(m->method, method) &&
1407         grpc_core::streq(m->host, host)) {
1408       gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
1409               host ? host : "*");
1410       return nullptr;
1411     }
1412   }
1413   if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {
1414     gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
1415             flags);
1416     return nullptr;
1417   }
1418   server->registered_methods.emplace_back(
1419       new grpc_core::registered_method(method, host, payload_handling, flags));
1420   return static_cast<void*>(server->registered_methods.back().get());
1421 }
1422 
grpc_server_start(grpc_server * server)1423 void grpc_server_start(grpc_server* server) {
1424   size_t i;
1425   grpc_core::ExecCtx exec_ctx;
1426 
1427   GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
1428 
1429   server->started = true;
1430   for (i = 0; i < server->cqs.size(); i++) {
1431     if (grpc_cq_can_listen(server->cqs[i])) {
1432       server->pollsets.push_back(grpc_cq_pollset(server->cqs[i]));
1433     }
1434   }
1435   if (server->unregistered_request_matcher == nullptr) {
1436     server->unregistered_request_matcher =
1437         absl::make_unique<grpc_core::RealRequestMatcher>(server);
1438   }
1439   for (std::unique_ptr<grpc_core::registered_method>& rm :
1440        server->registered_methods) {
1441     if (rm->matcher == nullptr) {
1442       rm->matcher = absl::make_unique<grpc_core::RealRequestMatcher>(server);
1443     }
1444   }
1445 
1446   {
1447     grpc_core::MutexLock lock(&server->mu_global);
1448     server->starting = true;
1449   }
1450 
1451   for (auto& listener : server->listeners) {
1452     listener.listener->Start(server, &server->pollsets);
1453   }
1454 
1455   grpc_core::MutexLock lock(&server->mu_global);
1456   server->starting = false;
1457   server->starting_cv.Signal();
1458 }
1459 
/*
  - Kills all pending requests-for-incoming-RPC-calls (i.e., the requests made
    via grpc_server_request_call and grpc_server_request_registered_call will
    now be cancelled). See 'kill_pending_work_locked()'.

  - Shuts down the listeners (i.e., the server will no longer listen on the
    port for new incoming channels).

  - Iterates through all channels on the server and sends a shutdown message
    (see 'ChannelBroadcaster::BroadcastShutdown' for details) to the clients
    via the transport layer. The transport layer then guarantees the following:
     -- Sends shutdown to the client (e.g., the HTTP2 transport sends GOAWAY).
     -- If the server has outstanding calls that are in progress, the
        connection is NOT closed until the server is done with all those calls.
     -- Once there are no more calls in progress, the channel is closed.
 */
grpc_server_shutdown_and_notify(grpc_server * server,grpc_completion_queue * cq,void * tag)1476 void grpc_server_shutdown_and_notify(grpc_server* server,
1477                                      grpc_completion_queue* cq, void* tag) {
1478   grpc_core::ChannelBroadcaster broadcaster;
1479   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1480   grpc_core::ExecCtx exec_ctx;
1481 
1482   GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
1483                  (server, cq, tag));
1484 
1485   {
1486     /* wait for startup to be finished: locks mu_global */
1487     grpc_core::MutexLock lock(&server->mu_global);
1488     server->starting_cv.WaitUntil(&server->mu_global,
1489                                   [server] { return !server->starting; });
1490 
1491     /* stay locked, and gather up some stuff to do */
1492     GPR_ASSERT(grpc_cq_begin_op(cq, tag));
1493     if (server->shutdown_published) {
1494       grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE,
1495                      grpc_core::done_published_shutdown, nullptr,
1496                      new grpc_cq_completion);
1497       return;
1498     }
1499     server->shutdown_tags.emplace_back(tag, cq);
1500     if (server->shutdown_flag.load(std::memory_order_acquire)) {
1501       return;
1502     }
1503 
1504     server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
1505 
1506     broadcaster.FillChannelsLocked(server);
1507 
1508     server->shutdown_flag.store(true, std::memory_order_release);
1509 
1510     /* collect all unregistered then registered calls */
1511     {
1512       grpc_core::MutexLock lock(&server->mu_call);
1513       grpc_core::kill_pending_work_locked(
1514           server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
1515     }
1516 
1517     grpc_core::maybe_finish_shutdown(server);
1518   }
1519 
1520   /* Shutdown listeners */
1521   for (auto& listener : server->listeners) {
1522     grpc_core::channelz::ListenSocketNode* channelz_listen_socket_node =
1523         listener.listener->channelz_listen_socket_node();
1524     if (server->channelz_server != nullptr &&
1525         channelz_listen_socket_node != nullptr) {
1526       server->channelz_server->RemoveChildListenSocket(
1527           channelz_listen_socket_node->uuid());
1528     }
1529     GRPC_CLOSURE_INIT(&listener.destroy_done, grpc_core::listener_destroy_done,
1530                       server, grpc_schedule_on_exec_ctx);
1531     listener.listener->SetOnDestroyDone(&listener.destroy_done);
1532     listener.listener.reset();
1533   }
1534 
1535   broadcaster.BroadcastShutdown(/*send_goaway=*/true, GRPC_ERROR_NONE);
1536 }
1537 
grpc_server_cancel_all_calls(grpc_server * server)1538 void grpc_server_cancel_all_calls(grpc_server* server) {
1539   grpc_core::ChannelBroadcaster broadcaster;
1540   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1541   grpc_core::ExecCtx exec_ctx;
1542 
1543   GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
1544 
1545   {
1546     grpc_core::MutexLock lock(&server->mu_global);
1547     broadcaster.FillChannelsLocked(server);
1548   }
1549 
1550   broadcaster.BroadcastShutdown(
1551       /*send_goaway=*/false,
1552       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
1553 }
1554 
grpc_server_destroy(grpc_server * server)1555 void grpc_server_destroy(grpc_server* server) {
1556   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1557   grpc_core::ExecCtx exec_ctx;
1558 
1559   GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
1560 
1561   {
1562     grpc_core::MutexLock lock(&server->mu_global);
1563     GPR_ASSERT(server->shutdown_flag.load(std::memory_order_acquire) ||
1564                server->listeners.empty());
1565     GPR_ASSERT(server->listeners_destroyed == server->listeners.size());
1566   }
1567 
1568   if (server->default_resource_user != nullptr) {
1569     grpc_resource_quota_unref(
1570         grpc_resource_user_quota(server->default_resource_user));
1571     grpc_resource_user_shutdown(server->default_resource_user);
1572     grpc_resource_user_unref(server->default_resource_user);
1573   }
1574   grpc_core::server_unref(server);
1575 }
1576 
grpc_server_get_pollsets(grpc_server * server)1577 const std::vector<grpc_pollset*>& grpc_server_get_pollsets(
1578     grpc_server* server) {
1579   return server->pollsets;
1580 }
1581 
grpc_server_setup_transport(grpc_server * s,grpc_transport * transport,grpc_pollset * accepting_pollset,const grpc_channel_args * args,const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> & socket_node,grpc_resource_user * resource_user)1582 void grpc_server_setup_transport(
1583     grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
1584     const grpc_channel_args* args,
1585     const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
1586         socket_node,
1587     grpc_resource_user* resource_user) {
1588   size_t num_registered_methods;
1589   grpc_core::channel_registered_method* crm;
1590   grpc_channel* channel;
1591   grpc_core::channel_data* chand;
1592   uint32_t hash;
1593   size_t slots;
1594   uint32_t probes;
1595   uint32_t max_probes = 0;
1596   grpc_transport_op* op = nullptr;
1597 
1598   channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport,
1599                                 resource_user);
1600   chand = static_cast<grpc_core::channel_data*>(
1601       grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
1602           ->channel_data);
1603   chand->server = s;
1604   grpc_core::server_ref(s);
1605   chand->channel = channel;
1606   if (socket_node != nullptr) {
1607     chand->channelz_socket_uuid = socket_node->uuid();
1608     s->channelz_server->AddChildSocket(socket_node);
1609   } else {
1610     chand->channelz_socket_uuid = 0;
1611   }
1612 
1613   size_t cq_idx;
1614   for (cq_idx = 0; cq_idx < s->cqs.size(); cq_idx++) {
1615     if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
1616   }
1617   if (cq_idx == s->cqs.size()) {
1618     /* completion queue not found: pick a random one to publish new calls to */
1619     cq_idx = static_cast<size_t>(rand()) % s->cqs.size();
1620   }
1621   chand->cq_idx = cq_idx;
1622 
1623   num_registered_methods = s->registered_methods.size();
1624   /* build a lookup table phrased in terms of mdstr's in this channels context
1625      to quickly find registered methods */
1626   if (num_registered_methods > 0) {
1627     slots = 2 * num_registered_methods;
1628     chand->registered_methods.reset(
1629         new std::vector<grpc_core::channel_registered_method>(slots));
1630     for (std::unique_ptr<grpc_core::registered_method>& rm :
1631          s->registered_methods) {
1632       grpc_core::ExternallyManagedSlice host;
1633       grpc_core::ExternallyManagedSlice method(rm->method.c_str());
1634       const bool has_host = !rm->host.empty();
1635       if (has_host) {
1636         host = grpc_core::ExternallyManagedSlice(rm->host.c_str());
1637       }
1638       hash = GRPC_MDSTR_KV_HASH(has_host ? host.Hash() : 0, method.Hash());
1639       for (probes = 0; (*chand->registered_methods)[(hash + probes) % slots]
1640                            .server_registered_method != nullptr;
1641            probes++) {
1642       }
1643       if (probes > max_probes) max_probes = probes;
1644       crm = &(*chand->registered_methods)[(hash + probes) % slots];
1645       crm->server_registered_method = rm.get();
1646       crm->flags = rm->flags;
1647       crm->has_host = has_host;
1648       if (has_host) {
1649         crm->host = host;
1650       }
1651       crm->method = method;
1652     }
1653     GPR_ASSERT(slots <= UINT32_MAX);
1654     chand->registered_method_max_probes = max_probes;
1655   }
1656 
1657   {
1658     grpc_core::MutexLock lock(&s->mu_global);
1659     s->channels.push_front(chand);
1660     chand->list_position = s->channels.begin();
1661   }
1662 
1663   op = grpc_make_transport_op(nullptr);
1664   op->set_accept_stream = true;
1665   op->set_accept_stream_fn = grpc_core::accept_stream;
1666   op->set_accept_stream_user_data = chand;
1667   op->start_connectivity_watch.reset(new grpc_core::ConnectivityWatcher(chand));
1668   if (s->shutdown_flag.load(std::memory_order_acquire)) {
1669     op->disconnect_with_error =
1670         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
1671   }
1672   grpc_transport_perform_op(transport, op);
1673 }
1674 
grpc_server_request_call(grpc_server * server,grpc_call ** call,grpc_call_details * details,grpc_metadata_array * initial_metadata,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1675 grpc_call_error grpc_server_request_call(
1676     grpc_server* server, grpc_call** call, grpc_call_details* details,
1677     grpc_metadata_array* initial_metadata,
1678     grpc_completion_queue* cq_bound_to_call,
1679     grpc_completion_queue* cq_for_notification, void* tag) {
1680   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1681   grpc_core::ExecCtx exec_ctx;
1682   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1683   GRPC_API_TRACE(
1684       "grpc_server_request_call("
1685       "server=%p, call=%p, details=%p, initial_metadata=%p, "
1686       "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
1687       7,
1688       (server, call, details, initial_metadata, cq_bound_to_call,
1689        cq_for_notification, tag));
1690 
1691   size_t cq_idx;
1692   grpc_call_error error = grpc_core::ValidateServerRequestAndCq(
1693       &cq_idx, server, cq_for_notification, tag, nullptr, nullptr);
1694   if (error != GRPC_CALL_OK) {
1695     return error;
1696   }
1697 
1698   grpc_core::requested_call* rc = new grpc_core::requested_call(
1699       tag, cq_bound_to_call, call, initial_metadata, details);
1700   return queue_call_request(server, cq_idx, rc);
1701 }
1702 
grpc_server_request_registered_call(grpc_server * server,void * rmp,grpc_call ** call,gpr_timespec * deadline,grpc_metadata_array * initial_metadata,grpc_byte_buffer ** optional_payload,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag_new)1703 grpc_call_error grpc_server_request_registered_call(
1704     grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,
1705     grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
1706     grpc_completion_queue* cq_bound_to_call,
1707     grpc_completion_queue* cq_for_notification, void* tag_new) {
1708   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1709   grpc_core::ExecCtx exec_ctx;
1710   GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
1711   grpc_core::registered_method* rm =
1712       static_cast<grpc_core::registered_method*>(rmp);
1713   GRPC_API_TRACE(
1714       "grpc_server_request_registered_call("
1715       "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
1716       "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
1717       "tag=%p)",
1718       9,
1719       (server, rmp, call, deadline, initial_metadata, optional_payload,
1720        cq_bound_to_call, cq_for_notification, tag_new));
1721 
1722   size_t cq_idx;
1723   grpc_call_error error = ValidateServerRequestAndCq(
1724       &cq_idx, server, cq_for_notification, tag_new, optional_payload, rm);
1725   if (error != GRPC_CALL_OK) {
1726     return error;
1727   }
1728 
1729   grpc_core::requested_call* rc = new grpc_core::requested_call(
1730       tag_new, cq_bound_to_call, call, initial_metadata, rm, deadline,
1731       optional_payload);
1732   return queue_call_request(server, cq_idx, rc);
1733 }
1734