/*
 *
 * Copyright 2015-2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/surface/server.h"

#include <limits.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <iterator>
#include <list>
#include <utility>
#include <vector>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>

#include "absl/types/optional.h"

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/init.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/static_metadata.h"

namespace grpc_core {

TraceFlag grpc_server_channel_trace(false, "server_channel");

namespace {

void server_on_recv_initial_metadata(void* ptr, grpc_error* error);
void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error);

struct Listener {
  explicit Listener(OrphanablePtr<ServerListenerInterface> l)
      : listener(std::move(l)) {}

  OrphanablePtr<ServerListenerInterface> listener;
  grpc_closure destroy_done;
};

enum class RequestedCallType { BATCH_CALL, REGISTERED_CALL };

struct registered_method;

struct requested_call {
  requested_call(void* tag_arg, grpc_completion_queue* call_cq,
                 grpc_call** call_arg, grpc_metadata_array* initial_md,
                 grpc_call_details* details)
      : type(RequestedCallType::BATCH_CALL),
        tag(tag_arg),
        cq_bound_to_call(call_cq),
        call(call_arg),
        initial_metadata(initial_md) {
    details->reserved = nullptr;
    data.batch.details = details;
  }

  requested_call(void* tag_arg, grpc_completion_queue* call_cq,
                 grpc_call** call_arg, grpc_metadata_array* initial_md,
                 registered_method* rm, gpr_timespec* deadline,
                 grpc_byte_buffer** optional_payload)
      : type(RequestedCallType::REGISTERED_CALL),
        tag(tag_arg),
        cq_bound_to_call(call_cq),
        call(call_arg),
        initial_metadata(initial_md) {
    data.registered.method = rm;
    data.registered.deadline = deadline;
    data.registered.optional_payload = optional_payload;
  }

  MultiProducerSingleConsumerQueue::Node mpscq_node;
  const RequestedCallType type;
  void* const tag;
  grpc_completion_queue* const cq_bound_to_call;
  grpc_call** const call;
  grpc_cq_completion completion;
  grpc_metadata_array* const initial_metadata;
  union {
    struct {
      grpc_call_details* details;
    } batch;
    struct {
      registered_method* method;
      gpr_timespec* deadline;
      grpc_byte_buffer** optional_payload;
    } registered;
  } data;
};

struct channel_registered_method {
  registered_method* server_registered_method = nullptr;
  uint32_t flags;
  bool has_host;
  ExternallyManagedSlice method;
  ExternallyManagedSlice host;
};

struct channel_data {
  channel_data() = default;
  ~channel_data();

  grpc_server* server = nullptr;
  grpc_channel* channel;
  size_t cq_idx;
  absl::optional<std::list<channel_data*>::iterator> list_position;

  // registered_methods is a hash-table of the methods and hosts of the
  // registered methods.
  // TODO(vjpai): Convert this to an STL map type as opposed to a direct bucket
  // implementation. (Consider performance impact, hash function to use, etc.)
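  // Lookup (see start_new_rpc() below) uses open addressing with linear
  // probing: a method/host pair hashes to a bucket, and up to
  // registered_method_max_probes successive buckets ((hash + i) % size) are
  // examined before giving up and falling back to the unregistered-method
  // matcher.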
  std::unique_ptr<std::vector<channel_registered_method>> registered_methods;
  uint32_t registered_method_max_probes;

  grpc_closure finish_destroy_channel_closure;
  intptr_t channelz_socket_uuid;
};

struct shutdown_tag {
  shutdown_tag(void* tag_arg, grpc_completion_queue* cq_arg)
      : tag(tag_arg), cq(cq_arg) {}

  void* const tag;
  grpc_completion_queue* const cq;
  grpc_cq_completion completion;
};

enum class CallState {
  /* waiting for metadata */
  NOT_STARTED,
  /* initial metadata read, not flow controlled in yet */
  PENDING,
  /* flow controlled in, on completion queue */
  ACTIVATED,
  /* cancelled before being queued */
  ZOMBIED
};
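
// The usual progression is NOT_STARTED -> PENDING -> ACTIVATED; ZOMBIED is
// reached from NOT_STARTED or PENDING when a call fails or the server shuts
// down before the call can be matched (see the CompareExchangeStrong uses
// below, which enforce these transitions).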

struct call_data;

grpc_call_error ValidateServerRequest(
    grpc_completion_queue* cq_for_notification, void* tag,
    grpc_byte_buffer** optional_payload, registered_method* rm);

// RPCs that come in from the transport must be matched against RPC requests
// from the application. An incoming request from the application can be
// matched to an RPC that has already arrived or can be queued up for later
// use. Likewise, an RPC coming in from the transport can either be matched to
// a request that already arrived from the application or can be queued up for
// later use (marked pending). If there is a match, the request's tag is
// posted on the request's notification CQ.
//
// RequestMatcherInterface is the base class that provides this functionality.
class RequestMatcherInterface {
 public:
  virtual ~RequestMatcherInterface() {}

  // Unref the calls associated with any incoming RPCs in the pending queue
  // (not yet matched to an application-requested RPC).
  virtual void ZombifyPending() = 0;

  // Mark all application-requested RPCs failed if they have not been matched
  // to an incoming RPC. The error parameter indicates why the RPCs are being
  // failed (currently always server shutdown).
  virtual void KillRequests(grpc_error* error) = 0;

  // How many request queues are supported by this matcher. This is an
  // abstract concept that essentially maps to gRPC completion queues.
  virtual size_t request_queue_count() const = 0;

  // This function is invoked when the application requests a new RPC whose
  // information is in the call parameter. The request_queue_index marks the
  // queue onto which to place this RPC, and is typically associated with a
  // gRPC CQ. If there are pending RPCs waiting to be matched, publish one
  // (match it and notify the CQ).
  virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
                                              requested_call* call) = 0;

  // This function is invoked on an incoming RPC, represented by the calld
  // object. The RequestMatcher will try to match it against an
  // application-requested RPC if possible or will place it in the pending
  // queue otherwise. To enable some measure of fairness between server CQs,
  // the match is done starting at the start_request_queue_index parameter in
  // a cyclic order rather than always starting at 0.
  virtual void MatchOrQueue(size_t start_request_queue_index,
                            call_data* calld) = 0;

  // Returns the server associated with this request matcher.
  virtual grpc_server* server() const = 0;
};
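
// Rough lifecycle, as a sketch (details vary by matcher implementation): the
// application requests a call (e.g. via grpc_server_request_call()), which
// builds a requested_call and hands it to RequestCallWithPossiblePublish();
// when the transport surfaces a new RPC, start_new_rpc() below routes its
// call_data to the owning matcher's MatchOrQueue(). Whichever side arrives
// second completes the match, and publish_call() posts the request's tag on
// its notification CQ.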

struct call_data {
  call_data(grpc_call_element* elem, const grpc_call_element_args& args)
      : call(grpc_call_from_top_element(elem)),
        call_combiner(args.call_combiner) {
    GRPC_CLOSURE_INIT(&on_recv_initial_metadata,
                      server_on_recv_initial_metadata, elem,
                      grpc_schedule_on_exec_ctx);
    GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready,
                      server_recv_trailing_metadata_ready, elem,
                      grpc_schedule_on_exec_ctx);
  }
  ~call_data() {
    GPR_ASSERT(state.Load(grpc_core::MemoryOrder::RELAXED) !=
               CallState::PENDING);
    GRPC_ERROR_UNREF(recv_initial_metadata_error);
    if (host_set) {
      grpc_slice_unref_internal(host);
    }
    if (path_set) {
      grpc_slice_unref_internal(path);
    }
    grpc_metadata_array_destroy(&initial_metadata);
    grpc_byte_buffer_destroy(payload);
  }

  grpc_call* call;

  Atomic<CallState> state{CallState::NOT_STARTED};

  bool path_set = false;
  bool host_set = false;
  grpc_slice path;
  grpc_slice host;
  grpc_millis deadline = GRPC_MILLIS_INF_FUTURE;

  grpc_completion_queue* cq_new = nullptr;

  grpc_metadata_batch* recv_initial_metadata = nullptr;
  uint32_t recv_initial_metadata_flags = 0;
  grpc_metadata_array initial_metadata =
      grpc_metadata_array();  // Zero-initialize the C struct.

  RequestMatcherInterface* matcher = nullptr;
  grpc_byte_buffer* payload = nullptr;

  grpc_closure got_initial_metadata;
  grpc_closure on_recv_initial_metadata;
  grpc_closure kill_zombie_closure;
  grpc_closure* on_done_recv_initial_metadata;
  grpc_closure recv_trailing_metadata_ready;
  grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
  grpc_closure* original_recv_trailing_metadata_ready;
  grpc_error* recv_trailing_metadata_error = GRPC_ERROR_NONE;
  bool seen_recv_trailing_metadata_ready = false;

  grpc_closure publish;

  CallCombiner* call_combiner;
};

struct registered_method {
  registered_method(
      const char* method_arg, const char* host_arg,
      grpc_server_register_method_payload_handling payload_handling_arg,
      uint32_t flags_arg)
      : method(method_arg == nullptr ? "" : method_arg),
        host(host_arg == nullptr ? "" : host_arg),
        payload_handling(payload_handling_arg),
        flags(flags_arg) {}

  ~registered_method() = default;

  const std::string method;
  const std::string host;
  const grpc_server_register_method_payload_handling payload_handling;
  const uint32_t flags;
  /* one request matcher per method */
  std::unique_ptr<RequestMatcherInterface> matcher;
};

}  // namespace
}  // namespace grpc_core

struct grpc_server {
  explicit grpc_server(const grpc_channel_args* args)
      : channel_args(grpc_channel_args_copy(args)) {
    if (grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_CHANNELZ,
                                    GRPC_ENABLE_CHANNELZ_DEFAULT)) {
      size_t channel_tracer_max_memory = grpc_channel_args_find_integer(
          args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE,
          {GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});
      channelz_server =
          grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(
              this, channel_tracer_max_memory);
      channelz_server->AddTraceEvent(
          grpc_core::channelz::ChannelTrace::Severity::Info,
          grpc_slice_from_static_string("Server created"));
    }

    if (args != nullptr) {
      grpc_resource_quota* resource_quota =
          grpc_resource_quota_from_channel_args(args, false /* create */);
      if (resource_quota != nullptr) {
        default_resource_user =
            grpc_resource_user_create(resource_quota, "default");
      }
    }
  }

  ~grpc_server() {
    grpc_channel_args_destroy(channel_args);
    for (size_t i = 0; i < cqs.size(); i++) {
      GRPC_CQ_INTERNAL_UNREF(cqs[i], "server");
    }
  }

  grpc_channel_args* const channel_args;

  grpc_resource_user* default_resource_user = nullptr;

  std::vector<grpc_completion_queue*> cqs;
  std::vector<grpc_pollset*> pollsets;
  bool started = false;

  /* The two following mutexes control access to server-state:
     mu_global controls access to non-call-related state (e.g., channel state)
     mu_call controls access to call-related state (e.g., the call lists)

     If they are ever required to be nested, you must lock mu_global
     before mu_call. This is currently used in shutdown processing
     (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
  grpc_core::Mutex mu_global;  // mutex for server and channel state
  grpc_core::Mutex mu_call;    // mutex for call-specific state
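  // For example, grpc_server_shutdown_and_notify() below takes mu_global
  // first and then, in a nested scope, mu_call while collecting pending
  // requests:
  //
  //   grpc_core::MutexLock lock(&server->mu_global);
  //   ...
  //   {
  //     grpc_core::MutexLock lock(&server->mu_call);
  //     grpc_core::kill_pending_work_locked(server, ...);
  //   }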

  /* startup synchronization: flag is protected by mu_global, signals whether
     we are doing the listener start routine or not */
  bool starting = false;
  grpc_core::CondVar starting_cv;

  std::vector<std::unique_ptr<grpc_core::registered_method>>
      registered_methods;

  // one request matcher for unregistered methods
  std::unique_ptr<grpc_core::RequestMatcherInterface>
      unregistered_request_matcher;

  std::atomic_bool shutdown_flag{false};
  bool shutdown_published = false;
  std::vector<grpc_core::shutdown_tag> shutdown_tags;

  std::list<grpc_core::channel_data*> channels;

  std::list<grpc_core::Listener> listeners;
  size_t listeners_destroyed = 0;
  grpc_core::RefCount internal_refcount;

  /** when did we print the last shutdown progress message */
  gpr_timespec last_shutdown_message_time;

  grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;
};

// Non-API functions of the server that are only for gRPC core internal use.
// TODO(markdroth): Make these class member functions
void grpc_server_add_listener(
    grpc_server* server,
    grpc_core::OrphanablePtr<grpc_core::ServerListenerInterface> listener) {
  grpc_core::channelz::ListenSocketNode* listen_socket_node =
      listener->channelz_listen_socket_node();
  if (listen_socket_node != nullptr && server->channelz_server != nullptr) {
    server->channelz_server->AddChildListenSocket(listen_socket_node->Ref());
  }
  server->listeners.emplace_back(std::move(listener));
}

const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {
  return server->channel_args;
}

grpc_resource_user* grpc_server_get_default_resource_user(
    grpc_server* server) {
  return server->default_resource_user;
}

bool grpc_server_has_open_connections(grpc_server* server) {
  grpc_core::MutexLock lock(&server->mu_global);
  return !server->channels.empty();
}

grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(
    grpc_server* server) {
  if (server == nullptr) {
    return nullptr;
  }
  return server->channelz_server.get();
}

namespace grpc_core {
namespace {

void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
                  requested_call* rc);
void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
               grpc_error* error);
/* Before calling maybe_finish_shutdown, we must hold mu_global and not
   hold mu_call */
void maybe_finish_shutdown(grpc_server* server);

void kill_zombie(void* elem, grpc_error* /*error*/) {
  grpc_call_unref(
      grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));
}

// Validate a requested RPC for a server CQ and bind it to that CQ.
grpc_call_error ValidateServerRequest(
    grpc_completion_queue* cq_for_notification, void* tag,
    grpc_byte_buffer** optional_payload, registered_method* rm) {
  if ((rm == nullptr && optional_payload != nullptr) ||
      ((rm != nullptr) && ((optional_payload == nullptr) !=
                           (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) {
    return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
  }
  if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
    return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
  }
  return GRPC_CALL_OK;
}

// Validate that a requested RPC targets a CQ that is actually registered with
// this server, validate the request itself, and bind it to that CQ.
grpc_call_error ValidateServerRequestAndCq(
    size_t* cq_idx, grpc_server* server,
    grpc_completion_queue* cq_for_notification, void* tag,
    grpc_byte_buffer** optional_payload, registered_method* rm) {
  size_t idx;
  for (idx = 0; idx < server->cqs.size(); idx++) {
    if (server->cqs[idx] == cq_for_notification) {
      break;
    }
  }
  if (idx == server->cqs.size()) {
    return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
  }
  grpc_call_error error =
      ValidateServerRequest(cq_for_notification, tag, optional_payload, rm);
  if (error != GRPC_CALL_OK) {
    return error;
  }

  *cq_idx = idx;
  return GRPC_CALL_OK;
}

/*
 * channel broadcaster
 */

struct shutdown_cleanup_args {
  grpc_closure closure;
  grpc_slice slice;
};

void shutdown_cleanup(void* arg, grpc_error* /*error*/) {
  shutdown_cleanup_args* a = static_cast<shutdown_cleanup_args*>(arg);
  grpc_slice_unref_internal(a->slice);
  delete a;
}

void send_shutdown(grpc_channel* channel, bool send_goaway,
                   grpc_error* send_disconnect) {
  shutdown_cleanup_args* sc = new shutdown_cleanup_args;
  GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
  grpc_channel_element* elem;

  op->goaway_error =
      send_goaway ? grpc_error_set_int(
                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"),
                        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK)
                  : GRPC_ERROR_NONE;
  op->set_accept_stream = true;
  sc->slice = grpc_slice_from_copied_string("Server shutdown");
  op->disconnect_with_error = send_disconnect;

  elem =
      grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
  elem->filter->start_transport_op(elem, op);
}

class ChannelBroadcaster {
 public:
  // This can have an empty constructor and destructor since we want to
  // control when the actual setup and shutdown broadcast take place.

  // This function copies over the channels from the locked server.
  void FillChannelsLocked(const grpc_server* s) {
    GPR_DEBUG_ASSERT(channels_.empty());
    channels_.reserve(s->channels.size());
    for (const channel_data* chand : s->channels) {
      channels_.push_back(chand->channel);
      GRPC_CHANNEL_INTERNAL_REF(chand->channel, "broadcast");
    }
  }

  // Broadcast a shutdown on each channel.
  void BroadcastShutdown(bool send_goaway, grpc_error* force_disconnect) {
    for (grpc_channel* channel : channels_) {
      send_shutdown(channel, send_goaway, GRPC_ERROR_REF(force_disconnect));
      GRPC_CHANNEL_INTERNAL_UNREF(channel, "broadcast");
    }
    channels_.clear();  // just for safety against double broadcast
    GRPC_ERROR_UNREF(force_disconnect);
  }

 private:
  std::vector<grpc_channel*> channels_;
};

/*
 * request_matcher
 */

// The RealRequestMatcher is an implementation of RequestMatcherInterface that
// actually uses all the features of RequestMatcherInterface: expecting the
// application to explicitly request RPCs and then matching those to incoming
// RPCs, along with a slow path by which incoming RPCs are put on a locked
// pending list if they aren't able to be matched to an application request.
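// Two paths follow from MatchOrQueue(): a lock-free fast path that TryPop()s
// a previously requested call from one of the per-CQ queues, and a slow path
// that re-checks the queues under mu_call and, if they are all empty, parks
// the incoming RPC on the pending_ list until the application requests a
// call.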
class RealRequestMatcher : public RequestMatcherInterface {
 public:
  explicit RealRequestMatcher(grpc_server* server)
      : server_(server), requests_per_cq_(server->cqs.size()) {}

  ~RealRequestMatcher() override {
    for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
      GPR_ASSERT(queue.Pop() == nullptr);
    }
  }

  void ZombifyPending() override {
    for (call_data* calld : pending_) {
      calld->state.Store(CallState::ZOMBIED, grpc_core::MemoryOrder::RELAXED);
      GRPC_CLOSURE_INIT(
          &calld->kill_zombie_closure, kill_zombie,
          grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
          grpc_schedule_on_exec_ctx);
      ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
                   GRPC_ERROR_NONE);
    }
    pending_.clear();
  }

  void KillRequests(grpc_error* error) override {
    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
      requested_call* rc;
      while ((rc = reinterpret_cast<requested_call*>(
                  requests_per_cq_[i].Pop())) != nullptr) {
        fail_call(server_, i, rc, GRPC_ERROR_REF(error));
      }
    }
    GRPC_ERROR_UNREF(error);
  }

  size_t request_queue_count() const override {
    return requests_per_cq_.size();
  }

  void RequestCallWithPossiblePublish(size_t request_queue_index,
                                      requested_call* call) override {
    if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
      /* this was the first queued request: we need to lock and start
         matching calls */
      struct PendingCall {
        requested_call* rc = nullptr;
        call_data* calld;
      };
      auto pop_next_pending = [this, request_queue_index] {
        PendingCall pending;
        {
          MutexLock lock(&server_->mu_call);
          if (!pending_.empty()) {
            pending.rc = reinterpret_cast<requested_call*>(
                requests_per_cq_[request_queue_index].Pop());
            if (pending.rc != nullptr) {
              pending.calld = pending_.front();
              pending_.pop_front();
            }
          }
        }
        return pending;
      };
      while (true) {
        PendingCall next_pending = pop_next_pending();
        if (next_pending.rc == nullptr) break;
        CallState expect_pending = CallState::PENDING;
        if (!next_pending.calld->state.CompareExchangeStrong(
                &expect_pending, CallState::ACTIVATED,
                grpc_core::MemoryOrder::ACQ_REL,
                grpc_core::MemoryOrder::RELAXED)) {
          // Zombied call
          GRPC_CLOSURE_INIT(
              &next_pending.calld->kill_zombie_closure, kill_zombie,
              grpc_call_stack_element(
                  grpc_call_get_call_stack(next_pending.calld->call), 0),
              grpc_schedule_on_exec_ctx);
          ExecCtx::Run(DEBUG_LOCATION,
                       &next_pending.calld->kill_zombie_closure,
                       GRPC_ERROR_NONE);
        } else {
          publish_call(server_, next_pending.calld, request_queue_index,
                       next_pending.rc);
        }
      }
    }
  }

  void MatchOrQueue(size_t start_request_queue_index,
                    call_data* calld) override {
    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
      size_t cq_idx =
          (start_request_queue_index + i) % requests_per_cq_.size();
      requested_call* rc = reinterpret_cast<requested_call*>(
          requests_per_cq_[cq_idx].TryPop());
      if (rc == nullptr) {
        continue;
      } else {
        GRPC_STATS_INC_SERVER_CQS_CHECKED(i);
        calld->state.Store(CallState::ACTIVATED,
                           grpc_core::MemoryOrder::RELAXED);
        publish_call(server_, calld, cq_idx, rc);
        return; /* early out */
      }
    }

    /* no cq to take the request found: queue it on the slow list */
    GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();

    // We need to ensure that all the queues are empty. We do this under
    // the server mu_call lock to ensure that if something is added to
    // an empty request queue, it will block until the call is actually
    // added to the pending list.
    requested_call* rc = nullptr;
    size_t cq_idx = 0;
    size_t loop_count;
    {
      MutexLock lock(&server_->mu_call);
      for (loop_count = 0; loop_count < requests_per_cq_.size();
           loop_count++) {
        cq_idx =
            (start_request_queue_index + loop_count) % requests_per_cq_.size();
        rc = reinterpret_cast<requested_call*>(requests_per_cq_[cq_idx].Pop());
        if (rc != nullptr) {
          break;
        }
      }
      if (rc == nullptr) {
        calld->state.Store(CallState::PENDING,
                           grpc_core::MemoryOrder::RELAXED);
        pending_.push_back(calld);
        return;
      }
    }
    GRPC_STATS_INC_SERVER_CQS_CHECKED(loop_count + requests_per_cq_.size());
    calld->state.Store(CallState::ACTIVATED, grpc_core::MemoryOrder::RELAXED);
    publish_call(server_, calld, cq_idx, rc);
  }

  grpc_server* server() const override { return server_; }

 private:
  grpc_server* const server_;
  std::list<call_data*> pending_;
  std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
};

// AllocatingRequestMatchers don't allow the application to request an RPC in
// advance or queue up any incoming RPC for later match. Instead, MatchOrQueue
// will call out to an allocation function passed in at the construction of
// the object. These request matchers are designed for the C++ callback API,
// so they only support 1 completion queue (passed in at the constructor).
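// In other words, instead of queueing, MatchOrQueue() synchronously asks the
// allocator for a fresh tag/call slot and publishes the incoming RPC into it.
// See SetServerRegisteredMethodAllocator() and SetServerBatchMethodAllocator()
// below for where these matchers are installed.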
class AllocatingRequestMatcherBase : public RequestMatcherInterface {
 public:
  AllocatingRequestMatcherBase(grpc_server* server, grpc_completion_queue* cq)
      : server_(server), cq_(cq) {
    size_t idx;
    for (idx = 0; idx < server->cqs.size(); idx++) {
      if (server->cqs[idx] == cq) {
        break;
      }
    }
    GPR_ASSERT(idx < server->cqs.size());
    cq_idx_ = idx;
  }

  void ZombifyPending() override {}

  void KillRequests(grpc_error* error) override { GRPC_ERROR_UNREF(error); }

  size_t request_queue_count() const override { return 0; }

  void RequestCallWithPossiblePublish(size_t /*request_queue_index*/,
                                      requested_call* /*call*/) final {
    GPR_ASSERT(false);
  }

  grpc_server* server() const override { return server_; }

  // Supply the completion queue related to this request matcher.
  grpc_completion_queue* cq() const { return cq_; }

  // Supply the completion queue's index relative to the server.
  size_t cq_idx() const { return cq_idx_; }

 private:
  grpc_server* const server_;
  grpc_completion_queue* const cq_;
  size_t cq_idx_;
};

// An allocating request matcher for non-registered methods (used for generic
// API and unimplemented RPCs).
class AllocatingRequestMatcherBatch : public AllocatingRequestMatcherBase {
 public:
  AllocatingRequestMatcherBatch(
      grpc_server* server, grpc_completion_queue* cq,
      std::function<ServerBatchCallAllocation()> allocator)
      : AllocatingRequestMatcherBase(server, cq),
        allocator_(std::move(allocator)) {}
  void MatchOrQueue(size_t /*start_request_queue_index*/,
                    call_data* calld) override {
    ServerBatchCallAllocation call_info = allocator_();
    GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag),
                                     nullptr, nullptr) == GRPC_CALL_OK);
    requested_call* rc = new requested_call(
        static_cast<void*>(call_info.tag), cq(), call_info.call,
        call_info.initial_metadata, call_info.details);
    calld->state.Store(CallState::ACTIVATED, grpc_core::MemoryOrder::RELAXED);
    publish_call(server(), calld, cq_idx(), rc);
  }

 private:
  std::function<ServerBatchCallAllocation()> allocator_;
};

// An allocating request matcher for registered methods.
class AllocatingRequestMatcherRegistered
    : public AllocatingRequestMatcherBase {
 public:
  AllocatingRequestMatcherRegistered(
      grpc_server* server, grpc_completion_queue* cq, registered_method* rm,
      std::function<ServerRegisteredCallAllocation()> allocator)
      : AllocatingRequestMatcherBase(server, cq),
        registered_method_(rm),
        allocator_(std::move(allocator)) {}
  void MatchOrQueue(size_t /*start_request_queue_index*/,
                    call_data* calld) override {
    ServerRegisteredCallAllocation call_info = allocator_();
    GPR_ASSERT(ValidateServerRequest(cq(), static_cast<void*>(call_info.tag),
                                     call_info.optional_payload,
                                     registered_method_) == GRPC_CALL_OK);
    requested_call* rc = new requested_call(
        static_cast<void*>(call_info.tag), cq(), call_info.call,
        call_info.initial_metadata, registered_method_, call_info.deadline,
        call_info.optional_payload);
    calld->state.Store(CallState::ACTIVATED, grpc_core::MemoryOrder::RELAXED);
    publish_call(server(), calld, cq_idx(), rc);
  }

 private:
  registered_method* const registered_method_;
  std::function<ServerRegisteredCallAllocation()> allocator_;
};

/*
 * server proper
 */

void server_ref(grpc_server* server) { server->internal_refcount.Ref(); }

void server_unref(grpc_server* server) {
  if (GPR_UNLIKELY(server->internal_refcount.Unref())) {
    delete server;
  }
}

void finish_destroy_channel(void* cd, grpc_error* /*error*/) {
  channel_data* chand = static_cast<channel_data*>(cd);
  grpc_server* server = chand->server;
  GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");
  server_unref(server);
}

void destroy_channel(channel_data* chand) {
  if (!chand->list_position.has_value()) return;
  GPR_ASSERT(chand->server != nullptr);
  chand->server->channels.erase(*chand->list_position);
  chand->list_position.reset();
  server_ref(chand->server);
  maybe_finish_shutdown(chand->server);
  GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,
                    finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) {
    gpr_log(GPR_INFO, "Disconnected client");
  }

  grpc_transport_op* op =
      grpc_make_transport_op(&chand->finish_destroy_channel_closure);
  op->set_accept_stream = true;
  grpc_channel_next_op(grpc_channel_stack_element(
                           grpc_channel_get_channel_stack(chand->channel), 0),
                       op);
}

void done_request_event(void* req, grpc_cq_completion* /*c*/) {
  delete static_cast<requested_call*>(req);
}

void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,
                  requested_call* rc) {
  grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);
  grpc_call* call = calld->call;
  *rc->call = call;
  calld->cq_new = server->cqs[cq_idx];
  GPR_SWAP(grpc_metadata_array, *rc->initial_metadata,
           calld->initial_metadata);
  switch (rc->type) {
    case RequestedCallType::BATCH_CALL:
      GPR_ASSERT(calld->host_set);
      GPR_ASSERT(calld->path_set);
      rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);
      rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);
      rc->data.batch.details->deadline =
          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
      rc->data.batch.details->flags = calld->recv_initial_metadata_flags;
      break;
    case RequestedCallType::REGISTERED_CALL:
      *rc->data.registered.deadline =
          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);
      if (rc->data.registered.optional_payload) {
        *rc->data.registered.optional_payload = calld->payload;
        calld->payload = nullptr;
      }
      break;
    default:
      GPR_UNREACHABLE_CODE(return );
  }

  grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,
                 rc, &rc->completion, true);
}

void publish_new_rpc(void* arg, grpc_error* error) {
  grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(call_elem->call_data);
  channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);
  RequestMatcherInterface* rm = calld->matcher;
  grpc_server* server = rm->server();

  if (error != GRPC_ERROR_NONE ||
      server->shutdown_flag.load(std::memory_order_acquire)) {
    calld->state.Store(CallState::ZOMBIED, grpc_core::MemoryOrder::RELAXED);
    GRPC_CLOSURE_INIT(
        &calld->kill_zombie_closure, kill_zombie,
        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
        grpc_schedule_on_exec_ctx);
    ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
                 GRPC_ERROR_REF(error));
    return;
  }

  rm->MatchOrQueue(chand->cq_idx, calld);
}

void finish_start_new_rpc(
    grpc_server* server, grpc_call_element* elem, RequestMatcherInterface* rm,
    grpc_server_register_method_payload_handling payload_handling) {
  call_data* calld = static_cast<call_data*>(elem->call_data);

  if (server->shutdown_flag.load(std::memory_order_acquire)) {
    calld->state.Store(CallState::ZOMBIED, grpc_core::MemoryOrder::RELAXED);
    GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
                      grpc_schedule_on_exec_ctx);
    ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure, GRPC_ERROR_NONE);
    return;
  }

  calld->matcher = rm;

  switch (payload_handling) {
    case GRPC_SRM_PAYLOAD_NONE:
      publish_new_rpc(elem, GRPC_ERROR_NONE);
      break;
    case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
      grpc_op op;
      op.op = GRPC_OP_RECV_MESSAGE;
      op.flags = 0;
      op.reserved = nullptr;
      op.data.recv_message.recv_message = &calld->payload;
      GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,
                        grpc_schedule_on_exec_ctx);
      grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);
      break;
    }
  }
}

void start_new_rpc(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_server* server = chand->server;
  uint32_t i;
  uint32_t hash;
  channel_registered_method* rm;

  if (chand->registered_methods && calld->path_set && calld->host_set) {
    /* TODO(ctiller): unify these two searches */
    /* check for an exact match with host */
    hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash_internal(calld->host),
                              grpc_slice_hash_internal(calld->path));
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &(*chand->registered_methods)[(hash + i) %
                                         chand->registered_methods->size()];
      if (rm->server_registered_method == nullptr) break;
      if (!rm->has_host) continue;
      if (rm->host != calld->host) continue;
      if (rm->method != calld->path) continue;
      if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
          0 == (calld->recv_initial_metadata_flags &
                GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
        continue;
      }
      finish_start_new_rpc(server, elem,
                           rm->server_registered_method->matcher.get(),
                           rm->server_registered_method->payload_handling);
      return;
    }
    /* check for a wildcard method definition (no host set) */
    hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash_internal(calld->path));
    for (i = 0; i <= chand->registered_method_max_probes; i++) {
      rm = &(*chand->registered_methods)[(hash + i) %
                                         chand->registered_methods->size()];
      if (rm->server_registered_method == nullptr) break;
      if (rm->has_host) continue;
      if (rm->method != calld->path) continue;
      if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&
          0 == (calld->recv_initial_metadata_flags &
                GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {
        continue;
      }
      finish_start_new_rpc(server, elem,
                           rm->server_registered_method->matcher.get(),
                           rm->server_registered_method->payload_handling);
      return;
    }
  }
  finish_start_new_rpc(server, elem,
                       server->unregistered_request_matcher.get(),
                       GRPC_SRM_PAYLOAD_NONE);
}

void done_shutdown_event(void* server, grpc_cq_completion* /*completion*/) {
  server_unref(static_cast<grpc_server*>(server));
}

int num_channels(grpc_server* server) {
  return static_cast<int>(server->channels.size());
}

void kill_pending_work_locked(grpc_server* server, grpc_error* error) {
  if (server->started) {
    server->unregistered_request_matcher->KillRequests(GRPC_ERROR_REF(error));
    server->unregistered_request_matcher->ZombifyPending();
    for (std::unique_ptr<registered_method>& rm : server->registered_methods) {
      rm->matcher->KillRequests(GRPC_ERROR_REF(error));
      rm->matcher->ZombifyPending();
    }
  }
  GRPC_ERROR_UNREF(error);
}

void maybe_finish_shutdown(grpc_server* server) {
  size_t i;
  if (!server->shutdown_flag.load(std::memory_order_acquire) ||
      server->shutdown_published) {
    return;
  }

  {
    MutexLock lock(&server->mu_call);
    kill_pending_work_locked(
        server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
  }

  if (!server->channels.empty() ||
      server->listeners_destroyed < server->listeners.size()) {
    if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
                                  server->last_shutdown_message_time),
                     gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
      server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
      gpr_log(GPR_DEBUG,
              "Waiting for %d channels and %" PRIuPTR "/%" PRIuPTR
              " listeners to be destroyed before shutting down server",
              num_channels(server),
              server->listeners.size() - server->listeners_destroyed,
              server->listeners.size());
    }
    return;
  }
  server->shutdown_published = true;
  for (i = 0; i < server->shutdown_tags.size(); i++) {
    server_ref(server);
    grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,
                   GRPC_ERROR_NONE, done_shutdown_event, server,
                   &server->shutdown_tags[i].completion);
  }
}

void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_millis op_deadline;

  if (error == GRPC_ERROR_NONE) {
    GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);
    GPR_DEBUG_ASSERT(calld->recv_initial_metadata->idx.named.authority !=
                     nullptr);
    calld->path = grpc_slice_ref_internal(
        GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
    calld->host = grpc_slice_ref_internal(
        GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));
    calld->path_set = true;
    calld->host_set = true;
    grpc_metadata_batch_remove(calld->recv_initial_metadata, GRPC_BATCH_PATH);
    grpc_metadata_batch_remove(calld->recv_initial_metadata,
                               GRPC_BATCH_AUTHORITY);
  } else {
    GRPC_ERROR_REF(error);
  }
  op_deadline = calld->recv_initial_metadata->deadline;
  if (op_deadline != GRPC_MILLIS_INF_FUTURE) {
    calld->deadline = op_deadline;
  }
  if (calld->host_set && calld->path_set) {
    /* do nothing */
  } else {
    /* Pass the error reference to calld->recv_initial_metadata_error */
    grpc_error* src_error = error;
    error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
        "Missing :authority or :path", &src_error, 1);
    GRPC_ERROR_UNREF(src_error);
    calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
  }
  grpc_closure* closure = calld->on_done_recv_initial_metadata;
  calld->on_done_recv_initial_metadata = nullptr;
  if (calld->seen_recv_trailing_metadata_ready) {
    GRPC_CALL_COMBINER_START(calld->call_combiner,
                             &calld->recv_trailing_metadata_ready,
                             calld->recv_trailing_metadata_error,
                             "continue server_recv_trailing_metadata_ready");
  }
  Closure::Run(DEBUG_LOCATION, closure, error);
}

void server_recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->on_done_recv_initial_metadata != nullptr) {
    calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
    calld->seen_recv_trailing_metadata_ready = true;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                      server_recv_trailing_metadata_ready, elem,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "deferring server_recv_trailing_metadata_ready "
                            "until after server_on_recv_initial_metadata");
    return;
  }
  error = grpc_error_add_child(
      GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->recv_initial_metadata_error));
  Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready,
               error);
}

void server_mutate_op(grpc_call_element* elem,
                      grpc_transport_stream_op_batch* op) {
  call_data* calld = static_cast<call_data*>(elem->call_data);

  if (op->recv_initial_metadata) {
    GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == nullptr);
    calld->recv_initial_metadata =
        op->payload->recv_initial_metadata.recv_initial_metadata;
    calld->on_done_recv_initial_metadata =
        op->payload->recv_initial_metadata.recv_initial_metadata_ready;
    op->payload->recv_initial_metadata.recv_initial_metadata_ready =
        &calld->on_recv_initial_metadata;
    op->payload->recv_initial_metadata.recv_flags =
        &calld->recv_initial_metadata_flags;
  }
  if (op->recv_trailing_metadata) {
    calld->original_recv_trailing_metadata_ready =
        op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &calld->recv_trailing_metadata_ready;
  }
}

void server_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  server_mutate_op(elem, op);
  grpc_call_next_op(elem, op);
}

void got_initial_metadata(void* ptr, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (error == GRPC_ERROR_NONE) {
    start_new_rpc(elem);
  } else {
    CallState expect_not_started = CallState::NOT_STARTED;
    CallState expect_pending = CallState::PENDING;
    if (calld->state.CompareExchangeStrong(
            &expect_not_started, CallState::ZOMBIED,
            grpc_core::MemoryOrder::ACQ_REL,
            grpc_core::MemoryOrder::RELAXED)) {
      GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,
                        grpc_schedule_on_exec_ctx);
      ExecCtx::Run(DEBUG_LOCATION, &calld->kill_zombie_closure,
                   GRPC_ERROR_NONE);
    } else if (calld->state.CompareExchangeStrong(
                   &expect_pending, CallState::ZOMBIED,
                   grpc_core::MemoryOrder::ACQ_REL,
                   grpc_core::MemoryOrder::RELAXED)) {
      /* zombied call will be destroyed when it's removed from the pending
         queue... later */
    }
  }
}

void accept_stream(void* cd, grpc_transport* /*transport*/,
                   const void* transport_server_data) {
  channel_data* chand = static_cast<channel_data*>(cd);
  /* create a call */
  grpc_call_create_args args;
  args.channel = chand->channel;
  args.server = chand->server;
  args.parent = nullptr;
  args.propagation_mask = 0;
  args.cq = nullptr;
  args.pollset_set_alternative = nullptr;
  args.server_transport_data = transport_server_data;
  args.add_initial_metadata = nullptr;
  args.add_initial_metadata_count = 0;
  args.send_deadline = GRPC_MILLIS_INF_FUTURE;
  grpc_call* call;
  grpc_error* error = grpc_call_create(&args, &call);
  grpc_call_element* elem =
      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
  if (error != GRPC_ERROR_NONE) {
    got_initial_metadata(elem, error);
    GRPC_ERROR_UNREF(error);
    return;
  }
  call_data* calld = static_cast<call_data*>(elem->call_data);
  grpc_op op;
  op.op = GRPC_OP_RECV_INITIAL_METADATA;
  op.flags = 0;
  op.reserved = nullptr;
  op.data.recv_initial_metadata.recv_initial_metadata =
      &calld->initial_metadata;
  GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,
                    grpc_schedule_on_exec_ctx);
  grpc_call_start_batch_and_execute(call, &op, 1,
                                    &calld->got_initial_metadata);
}

grpc_error* server_init_call_elem(grpc_call_element* elem,
                                  const grpc_call_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  server_ref(chand->server);
  new (elem->call_data) call_data(elem, *args);
  return GRPC_ERROR_NONE;
}

void server_destroy_call_elem(grpc_call_element* elem,
                              const grpc_call_final_info* /*final_info*/,
                              grpc_closure* /*ignored*/) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->~call_data();
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  server_unref(chand->server);
}

grpc_error* server_init_channel_elem(grpc_channel_element* elem,
                                     grpc_channel_element_args* args) {
  GPR_ASSERT(args->is_first);
  GPR_ASSERT(!args->is_last);

  new (static_cast<channel_data*>(elem->channel_data)) channel_data;
  return GRPC_ERROR_NONE;
}

channel_data::~channel_data() {
  if (registered_methods) {
    for (const channel_registered_method& crm : *registered_methods) {
      // These slices are externally managed, so unref is a no-op; the asserts
      // merely verify that assumption.
      grpc_slice_unref_internal(crm.method);
      GPR_DEBUG_ASSERT(crm.method.refcount == &kNoopRefcount ||
                       crm.method.refcount == nullptr);
      if (crm.has_host) {
        grpc_slice_unref_internal(crm.host);
        GPR_DEBUG_ASSERT(crm.host.refcount == &kNoopRefcount ||
                         crm.host.refcount == nullptr);
      }
    }
  }
  if (server) {
    if (server->channelz_server != nullptr && channelz_socket_uuid != 0) {
      server->channelz_server->RemoveChildSocket(channelz_socket_uuid);
    }
    {
      MutexLock lock(&server->mu_global);
      if (list_position.has_value()) {
        server->channels.erase(*list_position);
      }
      maybe_finish_shutdown(server);
    }
    server_unref(server);
  }
}

void server_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  chand->~channel_data();
}

void register_completion_queue(grpc_server* server, grpc_completion_queue* cq,
                               void* reserved) {
  size_t i;
  GPR_ASSERT(!reserved);
  for (i = 0; i < server->cqs.size(); i++) {
    if (server->cqs[i] == cq) return;
  }

  GRPC_CQ_INTERNAL_REF(cq, "server");
  server->cqs.push_back(cq);
}

bool streq(const std::string& a, const char* b) {
  return (a.empty() && b == nullptr) ||
         ((b != nullptr) && !strcmp(a.c_str(), b));
}

class ConnectivityWatcher : public AsyncConnectivityStateWatcherInterface {
 public:
  explicit ConnectivityWatcher(channel_data* chand) : chand_(chand) {
    GRPC_CHANNEL_INTERNAL_REF(chand_->channel, "connectivity");
  }

  ~ConnectivityWatcher() {
    GRPC_CHANNEL_INTERNAL_UNREF(chand_->channel, "connectivity");
  }

 private:
  void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
    // Don't do anything until we are being shut down.
    if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
    // Shut down channel.
    grpc_server* server = chand_->server;
    MutexLock lock(&server->mu_global);
    destroy_channel(chand_);
  }

  channel_data* chand_;
};

void done_published_shutdown(void* /*done_arg*/, grpc_cq_completion* storage) {
  delete storage;
}

void listener_destroy_done(void* s, grpc_error* /*error*/) {
  grpc_server* server = static_cast<grpc_server*>(s);
  MutexLock lock(&server->mu_global);
  server->listeners_destroyed++;
  maybe_finish_shutdown(server);
}

grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,
                                   requested_call* rc) {
  if (server->shutdown_flag.load(std::memory_order_acquire)) {
    fail_call(server, cq_idx, rc,
              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
    return GRPC_CALL_OK;
  }
  RequestMatcherInterface* rm;
  switch (rc->type) {
    case RequestedCallType::BATCH_CALL:
      rm = server->unregistered_request_matcher.get();
      break;
    case RequestedCallType::REGISTERED_CALL:
      rm = rc->data.registered.method->matcher.get();
      break;
  }
  rm->RequestCallWithPossiblePublish(cq_idx, rc);
  return GRPC_CALL_OK;
}

void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,
               grpc_error* error) {
  *rc->call = nullptr;
  rc->initial_metadata->count = 0;
  GPR_ASSERT(error != GRPC_ERROR_NONE);

  grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,
                 &rc->completion);
}

}  // namespace

void SetServerRegisteredMethodAllocator(
    grpc_server* server, grpc_completion_queue* cq, void* method_tag,
    std::function<ServerRegisteredCallAllocation()> allocator) {
  registered_method* rm = static_cast<registered_method*>(method_tag);
  rm->matcher = absl::make_unique<AllocatingRequestMatcherRegistered>(
      server, cq, rm, std::move(allocator));
}

void SetServerBatchMethodAllocator(
    grpc_server* server, grpc_completion_queue* cq,
    std::function<ServerBatchCallAllocation()> allocator) {
  GPR_DEBUG_ASSERT(server->unregistered_request_matcher == nullptr);
  server->unregistered_request_matcher =
      absl::make_unique<AllocatingRequestMatcherBatch>(server, cq,
                                                       std::move(allocator));
}

}  // namespace grpc_core

const grpc_channel_filter grpc_server_top_filter = {
    grpc_core::server_start_transport_stream_op_batch,
    grpc_channel_next_op,                // start_transport_op
    sizeof(grpc_core::call_data),        // sizeof_call_data
    grpc_core::server_init_call_elem,    // init_call_elem
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    grpc_core::server_destroy_call_elem,     // destroy_call_elem
    sizeof(grpc_core::channel_data),         // sizeof_channel_data
    grpc_core::server_init_channel_elem,     // init_channel_elem
    grpc_core::server_destroy_channel_elem,  // destroy_channel_elem
    grpc_channel_next_get_info,              // get_channel_info
    "server",                                // name
};
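
// This filter sits at element 0 (the top) of every server channel stack,
// which is why the helpers above address the server filter via
// grpc_channel_stack_element(..., 0).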
1361
1362 // The following are core surface API functions.
1363
grpc_server_register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)1364 void grpc_server_register_completion_queue(grpc_server* server,
1365 grpc_completion_queue* cq,
1366 void* reserved) {
1367 GRPC_API_TRACE(
1368 "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
1369 (server, cq, reserved));
1370
1371 auto cq_type = grpc_get_cq_completion_type(cq);
1372 if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) {
1373 gpr_log(GPR_INFO,
1374 "Completion queue of type %d is being registered as a "
1375 "server-completion-queue",
1376 static_cast<int>(cq_type));
1377 /* Ideally we should log an error and abort but ruby-wrapped-language API
1378 calls grpc_completion_queue_pluck() on server completion queues */
1379 }
1380
1381 grpc_core::register_completion_queue(server, cq, reserved);
1382 }
1383
grpc_server_create(const grpc_channel_args * args,void * reserved)1384 grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
1385 grpc_core::ExecCtx exec_ctx;
1386 GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
1387
1388 return new grpc_server(args);
1389 }
1390
grpc_server_register_method(grpc_server * server,const char * method,const char * host,grpc_server_register_method_payload_handling payload_handling,uint32_t flags)1391 void* grpc_server_register_method(
1392 grpc_server* server, const char* method, const char* host,
1393 grpc_server_register_method_payload_handling payload_handling,
1394 uint32_t flags) {
1395 GRPC_API_TRACE(
1396 "grpc_server_register_method(server=%p, method=%s, host=%s, "
1397 "flags=0x%08x)",
1398 4, (server, method, host, flags));
1399 if (!method) {
1400 gpr_log(GPR_ERROR,
1401 "grpc_server_register_method method string cannot be NULL");
1402 return nullptr;
1403 }
1404 for (std::unique_ptr<grpc_core::registered_method>& m :
1405 server->registered_methods) {
1406 if (grpc_core::streq(m->method, method) &&
1407 grpc_core::streq(m->host, host)) {
1408 gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
1409 host ? host : "*");
1410 return nullptr;
1411 }
1412 }
1413 if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {
1414 gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
1415 flags);
1416 return nullptr;
1417 }
1418 server->registered_methods.emplace_back(
1419 new grpc_core::registered_method(method, host, payload_handling, flags));
1420 return static_cast<void*>(server->registered_methods.back().get());
1421 }

void grpc_server_start(grpc_server* server) {
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));

  server->started = true;
  for (size_t i = 0; i < server->cqs.size(); i++) {
    if (grpc_cq_can_listen(server->cqs[i])) {
      server->pollsets.push_back(grpc_cq_pollset(server->cqs[i]));
    }
  }
  if (server->unregistered_request_matcher == nullptr) {
    server->unregistered_request_matcher =
        absl::make_unique<grpc_core::RealRequestMatcher>(server);
  }
  for (std::unique_ptr<grpc_core::registered_method>& rm :
       server->registered_methods) {
    if (rm->matcher == nullptr) {
      rm->matcher = absl::make_unique<grpc_core::RealRequestMatcher>(server);
    }
  }

  {
    grpc_core::MutexLock lock(&server->mu_global);
    server->starting = true;
  }

  for (auto& listener : server->listeners) {
    listener.listener->Start(server, &server->pollsets);
  }

  grpc_core::MutexLock lock(&server->mu_global);
  server->starting = false;
  server->starting_cv.Signal();
}
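
/* Lifecycle sketch (illustrative only): grpc_server_start() is the last step
   of setup; completion queues, registered methods and ports must all be in
   place first. Uses the insecure port helper for brevity:

     grpc_server* server = grpc_server_create(nullptr, nullptr);
     grpc_completion_queue* cq =
         grpc_completion_queue_create_for_next(nullptr);
     grpc_server_register_completion_queue(server, cq, nullptr);
     GPR_ASSERT(grpc_server_add_insecure_http2_port(server, "0.0.0.0:50051"));
     grpc_server_start(server);
*/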

/*
  - Kills all pending requests-for-incoming-RPC-calls (i.e. the requests made
    via grpc_server_request_call and grpc_server_request_registered_call will
    now be cancelled). See 'kill_pending_work_locked()'.

  - Shuts down the listeners (i.e. the server will no longer listen on the
    port for new incoming channels).

  - Iterates through all channels on the server and sends a shutdown message
    (see 'ChannelBroadcaster::BroadcastShutdown' for details) to the clients
    via the transport layer. The transport layer then guarantees the
    following:
    -- Sends shutdown to the client (e.g. the HTTP2 transport sends a GOAWAY).
    -- If the server has outstanding calls that are in progress, the
       connection is NOT closed until the server is done with all those calls.
    -- Once there are no more calls in progress, the channel is closed.
*/
void grpc_server_shutdown_and_notify(grpc_server* server,
                                     grpc_completion_queue* cq, void* tag) {
  grpc_core::ChannelBroadcaster broadcaster;
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
                 (server, cq, tag));

  {
    /* wait for startup to be finished: locks mu_global */
    grpc_core::MutexLock lock(&server->mu_global);
    server->starting_cv.WaitUntil(&server->mu_global,
                                  [server] { return !server->starting; });

    /* stay locked, and gather up some stuff to do */
    GPR_ASSERT(grpc_cq_begin_op(cq, tag));
    if (server->shutdown_published) {
      grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE,
                     grpc_core::done_published_shutdown, nullptr,
                     new grpc_cq_completion);
      return;
    }
    server->shutdown_tags.emplace_back(tag, cq);
    if (server->shutdown_flag.load(std::memory_order_acquire)) {
      return;
    }

    server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);

    broadcaster.FillChannelsLocked(server);

    server->shutdown_flag.store(true, std::memory_order_release);

    /* collect all unregistered then registered calls */
    {
      grpc_core::MutexLock lock(&server->mu_call);
      grpc_core::kill_pending_work_locked(
          server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));
    }

    grpc_core::maybe_finish_shutdown(server);
  }

  /* Shutdown listeners */
  for (auto& listener : server->listeners) {
    grpc_core::channelz::ListenSocketNode* channelz_listen_socket_node =
        listener.listener->channelz_listen_socket_node();
    if (server->channelz_server != nullptr &&
        channelz_listen_socket_node != nullptr) {
      server->channelz_server->RemoveChildListenSocket(
          channelz_listen_socket_node->uuid());
    }
    GRPC_CLOSURE_INIT(&listener.destroy_done, grpc_core::listener_destroy_done,
                      server, grpc_schedule_on_exec_ctx);
    listener.listener->SetOnDestroyDone(&listener.destroy_done);
    listener.listener.reset();
  }

  broadcaster.BroadcastShutdown(/*send_goaway=*/true, GRPC_ERROR_NONE);
}
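
/* Teardown sketch (illustrative only): the tag passed here is delivered on
   'cq' once shutdown has been published; only then may the server and the
   queue be destroyed. A typical sequence, assuming a GRPC_CQ_NEXT queue:

     grpc_server_shutdown_and_notify(server, cq, /*tag=*/server);
     grpc_event ev;
     do {
       ev = grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
                                       nullptr);
     } while (ev.tag != server);  // wait for the shutdown tag
     grpc_server_destroy(server);
     grpc_completion_queue_shutdown(cq);
     do {
       ev = grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
                                       nullptr);
     } while (ev.type != GRPC_QUEUE_SHUTDOWN);  // drain remaining events
     grpc_completion_queue_destroy(cq);
*/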

void grpc_server_cancel_all_calls(grpc_server* server) {
  grpc_core::ChannelBroadcaster broadcaster;
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));

  {
    grpc_core::MutexLock lock(&server->mu_global);
    broadcaster.FillChannelsLocked(server);
  }

  broadcaster.BroadcastShutdown(
      /*send_goaway=*/false,
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));
}
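
/* Usage sketch (illustrative only): this does not stop new calls from being
   accepted; it only force-fails calls already in flight, and the public API
   documents it as usable only after shutdown has begun. Typical hard
   teardown:

     grpc_server_shutdown_and_notify(server, cq, tag);
     grpc_server_cancel_all_calls(server);  // don't wait for calls to drain
*/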

void grpc_server_destroy(grpc_server* server) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));

  {
    grpc_core::MutexLock lock(&server->mu_global);
    GPR_ASSERT(server->shutdown_flag.load(std::memory_order_acquire) ||
               server->listeners.empty());
    GPR_ASSERT(server->listeners_destroyed == server->listeners.size());
  }

  if (server->default_resource_user != nullptr) {
    grpc_resource_quota_unref(
        grpc_resource_user_quota(server->default_resource_user));
    grpc_resource_user_shutdown(server->default_resource_user);
    grpc_resource_user_unref(server->default_resource_user);
  }
  grpc_core::server_unref(server);
}

const std::vector<grpc_pollset*>& grpc_server_get_pollsets(
    grpc_server* server) {
  return server->pollsets;
}

void grpc_server_setup_transport(
    grpc_server* s, grpc_transport* transport, grpc_pollset* accepting_pollset,
    const grpc_channel_args* args,
    const grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>&
        socket_node,
    grpc_resource_user* resource_user) {
  size_t num_registered_methods;
  grpc_core::channel_registered_method* crm;
  grpc_channel* channel;
  grpc_core::channel_data* chand;
  uint32_t hash;
  size_t slots;
  uint32_t probes;
  uint32_t max_probes = 0;
  grpc_transport_op* op = nullptr;

  channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport,
                                resource_user);
  chand = static_cast<grpc_core::channel_data*>(
      grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)
          ->channel_data);
  chand->server = s;
  grpc_core::server_ref(s);
  chand->channel = channel;
  if (socket_node != nullptr) {
    chand->channelz_socket_uuid = socket_node->uuid();
    s->channelz_server->AddChildSocket(socket_node);
  } else {
    chand->channelz_socket_uuid = 0;
  }

  size_t cq_idx;
  for (cq_idx = 0; cq_idx < s->cqs.size(); cq_idx++) {
    if (grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;
  }
  if (cq_idx == s->cqs.size()) {
    /* completion queue not found: pick a random one to publish new calls to */
    cq_idx = static_cast<size_t>(rand()) % s->cqs.size();
  }
  chand->cq_idx = cq_idx;

  num_registered_methods = s->registered_methods.size();
  /* Build a lookup table phrased in terms of mdstr's in this channel's
     context to quickly find registered methods. The table is open-addressed
     with linear probing and is sized at twice the number of methods so that
     probe sequences stay short. */
  if (num_registered_methods > 0) {
    slots = 2 * num_registered_methods;
    chand->registered_methods.reset(
        new std::vector<grpc_core::channel_registered_method>(slots));
    for (std::unique_ptr<grpc_core::registered_method>& rm :
         s->registered_methods) {
      grpc_core::ExternallyManagedSlice host;
      grpc_core::ExternallyManagedSlice method(rm->method.c_str());
      const bool has_host = !rm->host.empty();
      if (has_host) {
        host = grpc_core::ExternallyManagedSlice(rm->host.c_str());
      }
      hash = GRPC_MDSTR_KV_HASH(has_host ? host.Hash() : 0, method.Hash());
      /* Linear probing: scan forward from the hash bucket until a free slot
         is found, tracking the longest probe sequence seen so lookups know
         when to stop. */
      for (probes = 0; (*chand->registered_methods)[(hash + probes) % slots]
                           .server_registered_method != nullptr;
           probes++) {
      }
      if (probes > max_probes) max_probes = probes;
      crm = &(*chand->registered_methods)[(hash + probes) % slots];
      crm->server_registered_method = rm.get();
      crm->flags = rm->flags;
      crm->has_host = has_host;
      if (has_host) {
        crm->host = host;
      }
      crm->method = method;
    }
    GPR_ASSERT(slots <= UINT32_MAX);
    chand->registered_method_max_probes = max_probes;
  }

  {
    grpc_core::MutexLock lock(&s->mu_global);
    s->channels.push_front(chand);
    chand->list_position = s->channels.begin();
  }

  op = grpc_make_transport_op(nullptr);
  op->set_accept_stream = true;
  op->set_accept_stream_fn = grpc_core::accept_stream;
  op->set_accept_stream_user_data = chand;
  op->start_connectivity_watch.reset(new grpc_core::ConnectivityWatcher(chand));
  if (s->shutdown_flag.load(std::memory_order_acquire)) {
    op->disconnect_with_error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");
  }
  grpc_transport_perform_op(transport, op);
}
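
/* Call-site sketch (illustrative only, simplified from the chttp2 server
   listener; the exact transport-creation signature may differ by version):
   a listener hands each newly accepted transport to the server and then
   starts reading on it:

     grpc_transport* transport =
         grpc_create_chttp2_transport(args, endpoint, /*is_client=*/false);
     grpc_server_setup_transport(server, transport, accepting_pollset, args,
                                 /*socket_node=*/nullptr,
                                 /*resource_user=*/nullptr);
     grpc_chttp2_transport_start_reading(transport, /*read_buffer=*/nullptr,
                                         /*notify_on_receive_settings=*/
                                         nullptr);
*/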

grpc_call_error grpc_server_request_call(
    grpc_server* server, grpc_call** call, grpc_call_details* details,
    grpc_metadata_array* initial_metadata,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
  GRPC_API_TRACE(
      "grpc_server_request_call("
      "server=%p, call=%p, details=%p, initial_metadata=%p, "
      "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
      7,
      (server, call, details, initial_metadata, cq_bound_to_call,
       cq_for_notification, tag));

  size_t cq_idx;
  grpc_call_error error = grpc_core::ValidateServerRequestAndCq(
      &cq_idx, server, cq_for_notification, tag, nullptr, nullptr);
  if (error != GRPC_CALL_OK) {
    return error;
  }

  grpc_core::requested_call* rc = new grpc_core::requested_call(
      tag, cq_bound_to_call, call, initial_metadata, details);
  return queue_call_request(server, cq_idx, rc);
}
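
/* Usage sketch (illustrative only): the caller provides storage for the call
   handle, details and metadata, and the request completes on
   cq_for_notification when an unregistered RPC arrives:

     grpc_call* call = nullptr;
     grpc_call_details details;
     grpc_metadata_array request_metadata;
     grpc_call_details_init(&details);
     grpc_metadata_array_init(&request_metadata);
     GPR_ASSERT(GRPC_CALL_OK ==
                grpc_server_request_call(server, &call, &details,
                                         &request_metadata, cq, cq,
                                         /*tag=*/&call));
     // A GRPC_OP_COMPLETE event for &call on 'cq' signals the new RPC.
*/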

grpc_call_error grpc_server_request_registered_call(
    grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,
    grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag_new) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_STATS_INC_SERVER_REQUESTED_CALLS();
  grpc_core::registered_method* rm =
      static_cast<grpc_core::registered_method*>(rmp);
  GRPC_API_TRACE(
      "grpc_server_request_registered_call("
      "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
      "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
      "tag=%p)",
      9,
      (server, rmp, call, deadline, initial_metadata, optional_payload,
       cq_bound_to_call, cq_for_notification, tag_new));

  size_t cq_idx;
  grpc_call_error error = grpc_core::ValidateServerRequestAndCq(
      &cq_idx, server, cq_for_notification, tag_new, optional_payload, rm);
  if (error != GRPC_CALL_OK) {
    return error;
  }

  grpc_core::requested_call* rc = new grpc_core::requested_call(
      tag_new, cq_bound_to_call, call, initial_metadata, rm, deadline,
      optional_payload);
  return queue_call_request(server, cq_idx, rc);
}
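
/* Usage sketch (illustrative only): 'method_tag' is the pointer returned by
   grpc_server_register_method() for this method. Because the method was
   registered with a payload-handling mode, the deadline and (optionally) the
   request payload are delivered pre-parsed:

     grpc_call* call = nullptr;
     gpr_timespec deadline;
     grpc_metadata_array request_metadata;
     grpc_byte_buffer* payload = nullptr;
     grpc_metadata_array_init(&request_metadata);
     GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_registered_call(
                                    server, method_tag, &call, &deadline,
                                    &request_metadata, &payload, cq, cq,
                                    /*tag=*/&call));
*/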