1 //
2 // Copyright 2015-2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/server/server.h"
18
19 #include <grpc/byte_buffer.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/channel_arg_names.h>
22 #include <grpc/impl/connectivity_state.h>
23 #include <grpc/slice.h>
24 #include <grpc/status.h>
25 #include <grpc/support/port_platform.h>
26 #include <grpc/support/time.h>
27 #include <inttypes.h>
28 #include <stdlib.h>
29 #include <string.h>
30
31 #include <algorithm>
32 #include <atomic>
33 #include <list>
34 #include <memory>
35 #include <new>
36 #include <queue>
37 #include <type_traits>
38 #include <utility>
39 #include <vector>
40
41 #include "absl/cleanup/cleanup.h"
42 #include "absl/container/flat_hash_map.h"
43 #include "absl/log/check.h"
44 #include "absl/log/log.h"
45 #include "absl/status/status.h"
46 #include "absl/types/optional.h"
47 #include "src/core/channelz/channel_trace.h"
48 #include "src/core/channelz/channelz.h"
49 #include "src/core/config/core_configuration.h"
50 #include "src/core/lib/address_utils/sockaddr_utils.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/channel/channel_args_preconditioning.h"
53 #include "src/core/lib/experiments/experiments.h"
54 #include "src/core/lib/iomgr/exec_ctx.h"
55 #include "src/core/lib/iomgr/pollset_set.h"
56 #include "src/core/lib/promise/activity.h"
57 #include "src/core/lib/promise/cancel_callback.h"
58 #include "src/core/lib/promise/context.h"
59 #include "src/core/lib/promise/map.h"
60 #include "src/core/lib/promise/pipe.h"
61 #include "src/core/lib/promise/poll.h"
62 #include "src/core/lib/promise/promise.h"
63 #include "src/core/lib/promise/seq.h"
64 #include "src/core/lib/promise/try_join.h"
65 #include "src/core/lib/promise/try_seq.h"
66 #include "src/core/lib/security/credentials/credentials.h"
67 #include "src/core/lib/slice/slice_buffer.h"
68 #include "src/core/lib/slice/slice_internal.h"
69 #include "src/core/lib/surface/call.h"
70 #include "src/core/lib/surface/call_utils.h"
71 #include "src/core/lib/surface/channel.h"
72 #include "src/core/lib/surface/channel_stack_type.h"
73 #include "src/core/lib/surface/completion_queue.h"
74 #include "src/core/lib/surface/legacy_channel.h"
75 #include "src/core/lib/surface/server_call.h"
76 #include "src/core/lib/transport/connectivity_state.h"
77 #include "src/core/lib/transport/error_utils.h"
78 #include "src/core/lib/transport/interception_chain.h"
79 #include "src/core/telemetry/stats.h"
80 #include "src/core/util/crash.h"
81 #include "src/core/util/debug_location.h"
82 #include "src/core/util/mpscq.h"
83 #include "src/core/util/orphanable.h"
84 #include "src/core/util/status_helper.h"
85 #include "src/core/util/useful.h"
86
87 namespace grpc_core {
88
89 //
90 // Server::ListenerState::ConfigFetcherWatcher
91 //
92
93 void Server::ListenerState::ConfigFetcherWatcher::UpdateConnectionManager(
94 RefCountedPtr<ServerConfigFetcher::ConnectionManager> connection_manager) {
95 RefCountedPtr<ServerConfigFetcher::ConnectionManager>
96 connection_manager_to_destroy;
97 {
98 MutexLock lock(&listener_state_->mu_);
99 connection_manager_to_destroy = listener_state_->connection_manager_;
100 listener_state_->connection_manager_ = std::move(connection_manager);
101 listener_state_->DrainConnectionsLocked();
102 if (listener_state_->server_->ShutdownCalled()) {
103 return;
104 }
105 listener_state_->is_serving_ = true;
106 if (listener_state_->started_) return;
107 listener_state_->started_ = true;
108 }
109 listener_state_->listener_->Start();
110 }
111
112 void Server::ListenerState::ConfigFetcherWatcher::StopServing() {
113 MutexLock lock(&listener_state_->mu_);
114 listener_state_->is_serving_ = false;
115 listener_state_->DrainConnectionsLocked();
116 }
117
118 //
119 // Server::ListenerState
120 //
121
122 Server::ListenerState::ListenerState(RefCountedPtr<Server> server,
123 OrphanablePtr<ListenerInterface> l)
124 : server_(std::move(server)),
125 memory_quota_(
126 server_->channel_args().GetObject<ResourceQuota>()->memory_quota()),
127 connection_quota_(MakeRefCounted<ConnectionQuota>()),
128 event_engine_(
129 server_->channel_args()
130 .GetObject<grpc_event_engine::experimental::EventEngine>()),
131 listener_(std::move(l)) {
132 auto max_allowed_incoming_connections =
133 server_->channel_args().GetInt(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS);
134 if (max_allowed_incoming_connections.has_value()) {
135 connection_quota_->SetMaxIncomingConnections(
136 max_allowed_incoming_connections.value());
137 }
138 }
139
140 void Server::ListenerState::Start() {
141 if (IsServerListenerEnabled()) {
142 if (server_->config_fetcher() != nullptr) {
143 auto watcher = std::make_unique<ConfigFetcherWatcher>(this);
144 config_fetcher_watcher_ = watcher.get();
145 server_->config_fetcher()->StartWatch(
146 grpc_sockaddr_to_string(listener_->resolved_address(), false).value(),
147 std::move(watcher));
148 } else {
149 {
150 MutexLock lock(&mu_);
151 started_ = true;
152 is_serving_ = true;
153 }
154 listener_->Start();
155 }
156 } else {
157 listener_->Start();
158 }
159 }
160
161 void Server::ListenerState::Stop() {
162 if (IsServerListenerEnabled()) {
163 absl::flat_hash_set<OrphanablePtr<ListenerInterface::LogicalConnection>>
164 connections;
165 {
166 MutexLock lock(&mu_);
167 // Orphan the connections so that they can start cleaning up.
168 connections = std::move(connections_);
169 connections_.clear();
170 is_serving_ = false;
171 }
172 if (config_fetcher_watcher_ != nullptr) {
173 CHECK_NE(server_->config_fetcher(), nullptr);
174 server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
175 }
176 }
177 GRPC_CLOSURE_INIT(&destroy_done_, ListenerDestroyDone, server_.get(),
178 grpc_schedule_on_exec_ctx);
179 listener_->SetOnDestroyDone(&destroy_done_);
180 listener_.reset();
181 }
182
183 absl::optional<ChannelArgs> Server::ListenerState::AddLogicalConnection(
184 OrphanablePtr<ListenerInterface::LogicalConnection> connection,
185 const ChannelArgs& args, grpc_endpoint* endpoint) {
186 RefCountedPtr<ServerConfigFetcher::ConnectionManager> connection_manager;
187 {
188 MutexLock lock(&mu_);
189 if (!is_serving_) {
190 // Not serving
191 return absl::nullopt;
192 }
193 connection_manager = connection_manager_;
194 }
195 // The following section is intentionally outside the critical section. The
196 // operation to update channel args for a connection is heavy and complicated.
197 // For example, if using the xDS config fetcher, an involved matching process
198 // is performed to determine the filter chain to apply for this connection,
199 // prepare the filters, config selector and credentials. Subsequently, the
200 // credentials are used to create a security connector as well. Doing this
201 // outside the critical region allows us to get a larger degree of parallelism
202 // for the handling of incoming connections.
203 ChannelArgs new_args = args;
204 if (server_->config_fetcher() != nullptr) {
205 if (connection_manager == nullptr) {
206 // Connection manager not available
207 return absl::nullopt;
208 }
209 absl::StatusOr<ChannelArgs> args_result =
210 connection_manager->UpdateChannelArgsForConnection(new_args, endpoint);
211 if (!args_result.ok()) {
212 return absl::nullopt;
213 }
214 auto* server_credentials =
215 (*args_result).GetObject<grpc_server_credentials>();
216 if (server_credentials == nullptr) {
217 // Could not find server credentials
218 return absl::nullopt;
219 }
220 auto security_connector =
221 server_credentials->create_security_connector(*args_result);
222 if (security_connector == nullptr) {
223 // Unable to create secure server with credentials
224 return absl::nullopt;
225 }
226 new_args = (*args_result).SetObject(security_connector);
227 }
228 MutexLock lock(&mu_);
229 // Since we let go of the lock earlier, we need to protect ourselves against
230 // time-of-check-to-time-of-use cases. The server may have stopped serving
231 // or the connection manager may have changed before we add the connection
232 // to the list of tracked connections.
233 if (!is_serving_ || connection_manager != connection_manager_) {
234 // Not serving
235 return absl::nullopt;
236 }
237 connections_.emplace(std::move(connection));
238 return new_args;
239 }
240
241 void Server::ListenerState::OnHandshakeDone(
242 ListenerInterface::LogicalConnection* connection) {
243 if (server_->config_fetcher() != nullptr) {
244 return;
245 }
246 // There is no config fetcher, so we can remove this connection from being
247 // tracked immediately.
248 OrphanablePtr<ListenerInterface::LogicalConnection> connection_to_remove;
249 {
250 // Remove the connection if it wasn't already removed.
251 MutexLock lock(&mu_);
252 auto connection_handle = connections_.extract(connection);
253 if (!connection_handle.empty()) {
254 connection_to_remove = std::move(connection_handle.value());
255 }
256 // We do not need to check connections_to_be_drained_list_ since that only
257 // gets set if there is a config fetcher.
258 }
259 }
260
261 void Server::ListenerState::RemoveLogicalConnection(
262 ListenerInterface::LogicalConnection* connection) {
263 OrphanablePtr<ListenerInterface::LogicalConnection> connection_to_remove;
264 // Remove the connection if it wasn't already removed.
265 MutexLock lock(&mu_);
266 auto connection_handle = connections_.extract(connection);
267 if (!connection_handle.empty()) {
268 connection_to_remove = std::move(connection_handle.value());
269 return;
270 }
271 // The connection might be getting drained.
272 for (auto it = connections_to_be_drained_list_.begin();
273 it != connections_to_be_drained_list_.end(); ++it) {
274 auto connection_handle = it->connections.extract(connection);
275 if (!connection_handle.empty()) {
276 connection_to_remove = std::move(connection_handle.value());
277 RemoveConnectionsToBeDrainedOnEmptyLocked(it);
278 return;
279 }
280 }
281 }
282
283 void Server::ListenerState::DrainConnectionsLocked() {
284 if (connections_.empty()) {
285 return;
286 }
287 // Send GOAWAYs on the transports so that they disconnect when existing
288 // RPCs finish.
289 for (auto& connection : connections_) {
290 connection->SendGoAway();
291 }
292 connections_to_be_drained_list_.emplace_back();
293 auto& connections_to_be_drained = connections_to_be_drained_list_.back();
294 connections_to_be_drained.connections = std::move(connections_);
295 connections_.clear();
296 connections_to_be_drained.timestamp =
297 Timestamp::Now() +
298 std::max(Duration::Zero(),
299 server_->channel_args()
300 .GetDurationFromIntMillis(
301 GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS)
302 .value_or(Duration::Minutes(10)));
303 MaybeStartNewGraceTimerLocked();
304 }
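// Illustrative sketch (not part of this file): the drain grace period used
// above is controlled by GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS and
// defaults to 10 minutes. An application using the C core API could shorten it
// roughly like this; the 30-second value is an arbitrary example:
//
//   grpc_arg drain_arg = grpc_channel_arg_integer_create(
//       const_cast<char*>(GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS),
//       30 * 1000);
//   grpc_channel_args args = {1, &drain_arg};
//   grpc_server* server = grpc_server_create(&args, nullptr);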
305
306 void Server::ListenerState::OnDrainGraceTimer() {
307 absl::flat_hash_set<OrphanablePtr<ListenerInterface::LogicalConnection>>
308 connections_to_be_drained;
309 {
310 MutexLock lock(&mu_);
311 if (connections_to_be_drained_list_.empty()) {
312 return;
313 }
314 connections_to_be_drained =
315 std::move(connections_to_be_drained_list_.front().connections);
316 connections_to_be_drained_list_.pop_front();
317 MaybeStartNewGraceTimerLocked();
318 }
319 for (auto& connection : connections_to_be_drained) {
320 connection->DisconnectImmediately();
321 }
322 }
323
324 void Server::ListenerState::MaybeStartNewGraceTimerLocked() {
325 if (connections_to_be_drained_list_.empty()) {
326 return;
327 }
328 drain_grace_timer_handle_ = event_engine()->RunAfter(
329 connections_to_be_drained_list_.front().timestamp - Timestamp::Now(),
330 [self = Ref()]() mutable {
331 ApplicationCallbackExecCtx callback_exec_ctx;
332 ExecCtx exec_ctx;
333 self->OnDrainGraceTimer();
334 // resetting within an active ExecCtx
335 self.reset();
336 });
337 }
338
339 void Server::ListenerState::RemoveConnectionsToBeDrainedOnEmptyLocked(
340 std::deque<ConnectionsToBeDrained>::iterator it) {
341 if (it->connections.empty()) {
342 // Cancel the timer if the set of connections is now empty.
343 if (event_engine()->Cancel(drain_grace_timer_handle_)) {
344 // Only remove the entry from the list if the cancellation was
345 // actually successful. OnDrainGraceTimer() will remove it if the
346 // cancellation is not successful.
347 connections_to_be_drained_list_.erase(it);
348 MaybeStartNewGraceTimerLocked();
349 }
350 }
351 }
352
353 //
354 // Server::RequestMatcherInterface
355 //
356
357 // RPCs that come in from the transport must be matched against RPC requests
358 // from the application. An incoming request from the application can be matched
359 // to an RPC that has already arrived or can be queued up for later use.
360 // Likewise, an RPC coming in from the transport can either be matched to a
361 // request that already arrived from the application or can be queued up for
362 // later use (marked pending). If there is a match, the request's tag is posted
363 // on the request's notification CQ.
364 //
365 // RequestMatcherInterface is the base class to provide this functionality.
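//
// As an illustrative sketch (not part of this file), the application side of
// this matching for unregistered methods typically uses the public C core API;
// `server`, `cq` and `tag` below are assumed to be set up elsewhere:
//
//   grpc_call* call = nullptr;
//   grpc_call_details details;
//   grpc_metadata_array request_metadata;
//   grpc_call_details_init(&details);
//   grpc_metadata_array_init(&request_metadata);
//   grpc_server_request_call(server, &call, &details, &request_metadata,
//                            cq, cq, tag);
//   // Later, grpc_completion_queue_next(cq, deadline, nullptr) returns `tag`
//   // once an incoming RPC has been matched to this request.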
366 class Server::RequestMatcherInterface {
367 public:
368 virtual ~RequestMatcherInterface() {}
369
370 // Unref the calls associated with any incoming RPCs in the pending queue (not
371 // yet matched to an application-requested RPC).
372 virtual void ZombifyPending() = 0;
373
374 // Mark all application-requested RPCs failed if they have not been matched to
375 // an incoming RPC. The error parameter indicates why the RPCs are being
376 // failed (always server shutdown in all current implementations).
377 virtual void KillRequests(grpc_error_handle error) = 0;
378
379 // How many request queues are supported by this matcher. This is an abstract
380 // concept that essentially maps to gRPC completion queues.
381 virtual size_t request_queue_count() const = 0;
382
383 // This function is invoked when the application requests a new RPC whose
384 // information is in the call parameter. The request_queue_index marks the
385 // queue onto which to place this RPC, and is typically associated with a gRPC
386 // CQ. If there are pending RPCs waiting to be matched, publish one (match it
387 // and notify the CQ).
388 virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
389 RequestedCall* call) = 0;
390
391 class MatchResult {
392 public:
393 MatchResult(Server* server, size_t cq_idx, RequestedCall* requested_call)
394 : server_(server), cq_idx_(cq_idx), requested_call_(requested_call) {}
395 ~MatchResult() {
396 if (requested_call_ != nullptr) {
397 server_->FailCall(cq_idx_, requested_call_, absl::CancelledError());
398 }
399 }
400
401 MatchResult(const MatchResult&) = delete;
402 MatchResult& operator=(const MatchResult&) = delete;
403
404 MatchResult(MatchResult&& other) noexcept
405 : server_(other.server_),
406 cq_idx_(other.cq_idx_),
407 requested_call_(std::exchange(other.requested_call_, nullptr)) {}
408
409 RequestedCall* TakeCall() {
410 return std::exchange(requested_call_, nullptr);
411 }
412
413 grpc_completion_queue* cq() const { return server_->cqs_[cq_idx_]; }
414 size_t cq_idx() const { return cq_idx_; }
415
416 private:
417 Server* server_;
418 size_t cq_idx_;
419 RequestedCall* requested_call_;
420 };
421
422 // This function is invoked on an incoming promise based RPC.
423 // The RequestMatcher will try to match it against an application-requested
424 // RPC if possible or will place it in the pending queue otherwise. To enable
425 // some measure of fairness between server CQs, the match is done starting at
426 // the start_request_queue_index parameter in a cyclic order rather than
427 // always starting at 0.
428 virtual ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
429 size_t start_request_queue_index) = 0;
430
431 // This function is invoked on an incoming RPC, represented by the calld
432 // object. The RequestMatcher will try to match it against an
433 // application-requested RPC if possible or will place it in the pending queue
434 // otherwise. To enable some measure of fairness between server CQs, the match
435 // is done starting at the start_request_queue_index parameter in a cyclic
436 // order rather than always starting at 0.
437 virtual void MatchOrQueue(size_t start_request_queue_index,
438 CallData* calld) = 0;
439
440 // Returns the server associated with this request matcher
441 virtual Server* server() const = 0;
442 };
443
444 //
445 // Server::RegisteredMethod
446 //
447
448 struct Server::RegisteredMethod {
449 RegisteredMethod(
450 const char* method_arg, const char* host_arg,
451 grpc_server_register_method_payload_handling payload_handling_arg,
452 uint32_t flags_arg)
453 : method(method_arg == nullptr ? "" : method_arg),
454 host(host_arg == nullptr ? "" : host_arg),
455 payload_handling(payload_handling_arg),
456 flags(flags_arg) {}
457
458 ~RegisteredMethod() = default;
459
460 const std::string method;
461 const std::string host;
462 const grpc_server_register_method_payload_handling payload_handling;
463 const uint32_t flags;
464 // One request matcher per method.
465 std::unique_ptr<RequestMatcherInterface> matcher;
466 };
467
468 //
469 // Server::RequestedCall
470 //
471
472 struct Server::RequestedCall {
473 enum class Type { BATCH_CALL, REGISTERED_CALL };
474
475 RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
476 grpc_call** call_arg, grpc_metadata_array* initial_md,
477 grpc_call_details* details)
478 : type(Type::BATCH_CALL),
479 tag(tag_arg),
480 cq_bound_to_call(call_cq),
481 call(call_arg),
482 initial_metadata(initial_md) {
483 data.batch.details = details;
484 }
485
486 RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
487 grpc_call** call_arg, grpc_metadata_array* initial_md,
488 RegisteredMethod* rm, gpr_timespec* deadline,
489 grpc_byte_buffer** optional_payload)
490 : type(Type::REGISTERED_CALL),
491 tag(tag_arg),
492 cq_bound_to_call(call_cq),
493 call(call_arg),
494 initial_metadata(initial_md) {
495 data.registered.method = rm;
496 data.registered.deadline = deadline;
497 data.registered.optional_payload = optional_payload;
498 }
499
500 template <typename OptionalPayload>
501 void Complete(OptionalPayload payload, ClientMetadata& md) {
502 Timestamp deadline =
503 md.get(GrpcTimeoutMetadata()).value_or(Timestamp::InfFuture());
504 switch (type) {
505 case RequestedCall::Type::BATCH_CALL:
506 CHECK(!payload.has_value());
507 data.batch.details->host =
508 CSliceRef(md.get_pointer(HttpAuthorityMetadata())->c_slice());
509 data.batch.details->method =
510 CSliceRef(md.Take(HttpPathMetadata())->c_slice());
511 data.batch.details->deadline =
512 deadline.as_timespec(GPR_CLOCK_MONOTONIC);
513 break;
514 case RequestedCall::Type::REGISTERED_CALL:
515 md.Remove(HttpPathMetadata());
516 *data.registered.deadline = deadline.as_timespec(GPR_CLOCK_MONOTONIC);
517 if (data.registered.optional_payload != nullptr) {
518 if (payload.has_value()) {
519 auto* sb = payload.value()->payload()->c_slice_buffer();
520 *data.registered.optional_payload =
521 grpc_raw_byte_buffer_create(sb->slices, sb->count);
522 } else {
523 *data.registered.optional_payload = nullptr;
524 }
525 }
526 break;
527 default:
528 GPR_UNREACHABLE_CODE(abort());
529 }
530 }
531
532 MultiProducerSingleConsumerQueue::Node mpscq_node;
533 const Type type;
534 void* const tag;
535 grpc_completion_queue* const cq_bound_to_call;
536 grpc_call** const call;
537 grpc_cq_completion completion;
538 grpc_metadata_array* const initial_metadata;
539 union {
540 struct {
541 grpc_call_details* details;
542 } batch;
543 struct {
544 RegisteredMethod* method;
545 gpr_timespec* deadline;
546 grpc_byte_buffer** optional_payload;
547 } registered;
548 } data;
549 };
550
551 // The RealRequestMatcher is an implementation of RequestMatcherInterface that
552 // actually uses all the features of RequestMatcherInterface: expecting the
553 // application to explicitly request RPCs and then matching those to incoming
554 // RPCs, along with a slow path by which incoming RPCs are put on a locked
555 // pending list if they aren't able to be matched to an application request.
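//
// Note: two pending queues are kept below because incoming RPCs can arrive via
// either the legacy filter-stack path (pending_filter_stack_) or the
// promise-based path (pending_promises_); entries in either queue time out
// after max_time_in_pending_queue_.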
556 class Server::RealRequestMatcher : public RequestMatcherInterface {
557 public:
558 explicit RealRequestMatcher(Server* server)
559 : server_(server), requests_per_cq_(server->cqs_.size()) {}
560
561 ~RealRequestMatcher() override {
562 for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
563 CHECK_EQ(queue.Pop(), nullptr);
564 }
565 CHECK(pending_filter_stack_.empty());
566 CHECK(pending_promises_.empty());
567 }
568
569 void ZombifyPending() override {
570 while (!pending_filter_stack_.empty()) {
571 pending_filter_stack_.front().calld->SetState(
572 CallData::CallState::ZOMBIED);
573 pending_filter_stack_.front().calld->KillZombie();
574 pending_filter_stack_.pop();
575 }
576 while (!pending_promises_.empty()) {
577 pending_promises_.front()->Finish(absl::InternalError("Server closed"));
578 pending_promises_.pop();
579 }
580 zombified_ = true;
581 }
582
583 void KillRequests(grpc_error_handle error) override {
584 for (size_t i = 0; i < requests_per_cq_.size(); i++) {
585 RequestedCall* rc;
586 while ((rc = reinterpret_cast<RequestedCall*>(
587 requests_per_cq_[i].Pop())) != nullptr) {
588 server_->FailCall(i, rc, error);
589 }
590 }
591 }
592
593 size_t request_queue_count() const override {
594 return requests_per_cq_.size();
595 }
596
597 void RequestCallWithPossiblePublish(size_t request_queue_index,
598 RequestedCall* call) override {
599 if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
600 // this was the first queued request: we need to lock and start
601 // matching calls
602 struct NextPendingCall {
603 RequestedCall* rc = nullptr;
604 CallData* pending_filter_stack = nullptr;
605 PendingCallPromises pending_promise;
606 };
607 while (true) {
608 NextPendingCall pending_call;
609 {
610 MutexLock lock(&server_->mu_call_);
611 while (!pending_filter_stack_.empty() &&
612 pending_filter_stack_.front().Age() >
613 server_->max_time_in_pending_queue_) {
614 pending_filter_stack_.front().calld->SetState(
615 CallData::CallState::ZOMBIED);
616 pending_filter_stack_.front().calld->KillZombie();
617 pending_filter_stack_.pop();
618 }
619 if (!pending_promises_.empty()) {
620 pending_call.rc = reinterpret_cast<RequestedCall*>(
621 requests_per_cq_[request_queue_index].Pop());
622 if (pending_call.rc != nullptr) {
623 pending_call.pending_promise =
624 std::move(pending_promises_.front());
625 pending_promises_.pop();
626 }
627 } else if (!pending_filter_stack_.empty()) {
628 pending_call.rc = reinterpret_cast<RequestedCall*>(
629 requests_per_cq_[request_queue_index].Pop());
630 if (pending_call.rc != nullptr) {
631 pending_call.pending_filter_stack =
632 pending_filter_stack_.front().calld;
633 pending_filter_stack_.pop();
634 }
635 }
636 }
637 if (pending_call.rc == nullptr) break;
638 if (pending_call.pending_filter_stack != nullptr) {
639 if (!pending_call.pending_filter_stack->MaybeActivate()) {
640 // Zombied Call
641 pending_call.pending_filter_stack->KillZombie();
642 requests_per_cq_[request_queue_index].Push(
643 &pending_call.rc->mpscq_node);
644 } else {
645 pending_call.pending_filter_stack->Publish(request_queue_index,
646 pending_call.rc);
647 }
648 } else {
649 if (!pending_call.pending_promise->Finish(
650 server(), request_queue_index, pending_call.rc)) {
651 requests_per_cq_[request_queue_index].Push(
652 &pending_call.rc->mpscq_node);
653 }
654 }
655 }
656 }
657 }
658
659 void MatchOrQueue(size_t start_request_queue_index,
660 CallData* calld) override {
661 for (size_t i = 0; i < requests_per_cq_.size(); i++) {
662 size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
663 RequestedCall* rc =
664 reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].TryPop());
665 if (rc != nullptr) {
666 calld->SetState(CallData::CallState::ACTIVATED);
667 calld->Publish(cq_idx, rc);
668 return;
669 }
670 }
671 // No CQ was found to take the request; queue it on the slow list.
672 // We need to ensure that all the queues are empty. We do this under
673 // the server mu_call_ lock to ensure that if something is added to
674 // an empty request queue, it will block until the call is actually
675 // added to the pending list.
676 RequestedCall* rc = nullptr;
677 size_t cq_idx = 0;
678 size_t loop_count;
679 {
680 MutexLock lock(&server_->mu_call_);
681 for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
682 cq_idx =
683 (start_request_queue_index + loop_count) % requests_per_cq_.size();
684 rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop());
685 if (rc != nullptr) {
686 break;
687 }
688 }
689 if (rc == nullptr) {
690 calld->SetState(CallData::CallState::PENDING);
691 pending_filter_stack_.push(PendingCallFilterStack{calld});
692 return;
693 }
694 }
695 calld->SetState(CallData::CallState::ACTIVATED);
696 calld->Publish(cq_idx, rc);
697 }
698
699 ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
700 size_t start_request_queue_index) override {
701 for (size_t i = 0; i < requests_per_cq_.size(); i++) {
702 size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
703 RequestedCall* rc =
704 reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].TryPop());
705 if (rc != nullptr) {
706 return Immediate(MatchResult(server(), cq_idx, rc));
707 }
708 }
709 // No CQ was found to take the request; queue it on the slow list.
710 // We need to ensure that all the queues are empty. We do this under
711 // the server mu_call_ lock to ensure that if something is added to
712 // an empty request queue, it will block until the call is actually
713 // added to the pending list.
714 RequestedCall* rc = nullptr;
715 size_t cq_idx = 0;
716 size_t loop_count;
717 {
718 std::vector<std::shared_ptr<ActivityWaiter>> removed_pending;
719 MutexLock lock(&server_->mu_call_);
720 while (!pending_promises_.empty() &&
721 pending_promises_.front()->Age() >
722 server_->max_time_in_pending_queue_) {
723 removed_pending.push_back(std::move(pending_promises_.front()));
724 pending_promises_.pop();
725 }
726 for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
727 cq_idx =
728 (start_request_queue_index + loop_count) % requests_per_cq_.size();
729 rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop());
730 if (rc != nullptr) break;
731 }
732 if (rc == nullptr) {
733 if (server_->pending_backlog_protector_.Reject(pending_promises_.size(),
734 server_->bitgen_)) {
735 return Immediate(absl::ResourceExhaustedError(
736 "Too many pending requests for this server"));
737 }
738 if (zombified_) {
739 return Immediate(absl::InternalError("Server closed"));
740 }
741 auto w = std::make_shared<ActivityWaiter>(
742 GetContext<Activity>()->MakeOwningWaker());
743 pending_promises_.push(w);
744 return OnCancel(
745 [w]() -> Poll<absl::StatusOr<MatchResult>> {
746 std::unique_ptr<absl::StatusOr<MatchResult>> r(
747 w->result.exchange(nullptr, std::memory_order_acq_rel));
748 if (r == nullptr) return Pending{};
749 return std::move(*r);
750 },
751 [w]() { w->Finish(absl::CancelledError()); });
752 }
753 }
754 return Immediate(MatchResult(server(), cq_idx, rc));
755 }
756
757 Server* server() const final { return server_; }
758
759 private:
760 Server* const server_;
761 struct PendingCallFilterStack {
762 CallData* calld;
763 Timestamp created = Timestamp::Now();
764 Duration Age() { return Timestamp::Now() - created; }
765 };
766 struct ActivityWaiter {
767 using ResultType = absl::StatusOr<MatchResult>;
768 explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {}
769 ~ActivityWaiter() { delete result.load(std::memory_order_acquire); }
770 void Finish(absl::Status status) {
771 ResultType* expected = nullptr;
772 ResultType* new_value = new ResultType(std::move(status));
773 if (!result.compare_exchange_strong(expected, new_value,
774 std::memory_order_acq_rel,
775 std::memory_order_acquire)) {
776 delete new_value;
777 return;
778 }
779 waker.WakeupAsync();
780 }
781 // Returns true if requested_call consumed, false otherwise.
782 GRPC_MUST_USE_RESULT bool Finish(Server* server, size_t cq_idx,
783 RequestedCall* requested_call) {
784 ResultType* expected = nullptr;
785 ResultType* new_value =
786 new ResultType(MatchResult(server, cq_idx, requested_call));
787 if (!result.compare_exchange_strong(expected, new_value,
788 std::memory_order_acq_rel,
789 std::memory_order_acquire)) {
790 CHECK(new_value->value().TakeCall() == requested_call);
791 delete new_value;
792 return false;
793 }
794 waker.WakeupAsync();
795 return true;
796 }
797 Duration Age() { return Timestamp::Now() - created; }
798 Waker waker;
799 std::atomic<ResultType*> result{nullptr};
800 const Timestamp created = Timestamp::Now();
801 };
802 using PendingCallPromises = std::shared_ptr<ActivityWaiter>;
803 std::queue<PendingCallFilterStack> pending_filter_stack_;
804 std::queue<PendingCallPromises> pending_promises_;
805 std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
806 bool zombified_ = false;
807 };
808
809 // AllocatingRequestMatchers don't allow the application to request an RPC in
810 // advance or queue up any incoming RPC for later match. Instead, MatchOrQueue
811 // will call out to an allocation function passed in at the construction of the
812 // object. These request matchers are designed for the C++ callback API, so they
813 // only support 1 completion queue (passed in at the constructor). They are also
814 // used for the sync API.
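//
// Illustrative sketch (not part of this file): an allocator handed to
// AllocatingRequestMatcherBatch fills in a BatchCallAllocation whose storage
// is owned by the caller; `CallbackTag` and `slot` below are hypothetical
// caller-side types, and only the field names match this file:
//
//   server->SetBatchMethodAllocator(cq, [&]() {
//     Server::BatchCallAllocation alloc;
//     alloc.tag = new CallbackTag();               // hypothetical tag object
//     alloc.cq = cq;
//     alloc.call = &slot->call;                    // hypothetical storage
//     alloc.initial_metadata = &slot->initial_metadata;
//     alloc.details = &slot->details;
//     return alloc;
//   });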
815 class Server::AllocatingRequestMatcherBase : public RequestMatcherInterface {
816 public:
817 AllocatingRequestMatcherBase(Server* server, grpc_completion_queue* cq)
818 : server_(server), cq_(cq) {
819 size_t idx;
820 for (idx = 0; idx < server->cqs_.size(); idx++) {
821 if (server->cqs_[idx] == cq) {
822 break;
823 }
824 }
825 CHECK(idx < server->cqs_.size());
826 cq_idx_ = idx;
827 }
828
829 void ZombifyPending() override {}
830
831 void KillRequests(grpc_error_handle /*error*/) override {}
832
833 size_t request_queue_count() const override { return 0; }
834
835 void RequestCallWithPossiblePublish(size_t /*request_queue_index*/,
836 RequestedCall* /*call*/) final {
837 Crash("unreachable");
838 }
839
840 Server* server() const final { return server_; }
841
842 // Supply the completion queue related to this request matcher
843 grpc_completion_queue* cq() const { return cq_; }
844
845 // Supply the completion queue's index relative to the server.
846 size_t cq_idx() const { return cq_idx_; }
847
848 private:
849 Server* const server_;
850 grpc_completion_queue* const cq_;
851 size_t cq_idx_;
852 };
853
854 // An allocating request matcher for non-registered methods (used for generic
855 // API and unimplemented RPCs).
856 class Server::AllocatingRequestMatcherBatch
857 : public AllocatingRequestMatcherBase {
858 public:
859 AllocatingRequestMatcherBatch(Server* server, grpc_completion_queue* cq,
860 std::function<BatchCallAllocation()> allocator)
861 : AllocatingRequestMatcherBase(server, cq),
862 allocator_(std::move(allocator)) {}
863
864 void MatchOrQueue(size_t /*start_request_queue_index*/,
865 CallData* calld) override {
866 const bool still_running = server()->ShutdownRefOnRequest();
867 auto cleanup_ref =
868 absl::MakeCleanup([this] { server()->ShutdownUnrefOnRequest(); });
869 if (still_running) {
870 BatchCallAllocation call_info = allocator_();
871 CHECK(server()->ValidateServerRequest(cq(),
872 static_cast<void*>(call_info.tag),
873 nullptr, nullptr) == GRPC_CALL_OK);
874 RequestedCall* rc = new RequestedCall(
875 static_cast<void*>(call_info.tag), call_info.cq, call_info.call,
876 call_info.initial_metadata, call_info.details);
877 calld->SetState(CallData::CallState::ACTIVATED);
878 calld->Publish(cq_idx(), rc);
879 } else {
880 calld->FailCallCreation();
881 }
882 }
883
884 ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
885 size_t /*start_request_queue_index*/) override {
886 BatchCallAllocation call_info = allocator_();
887 CHECK(server()->ValidateServerRequest(cq(),
888 static_cast<void*>(call_info.tag),
889 nullptr, nullptr) == GRPC_CALL_OK);
890 RequestedCall* rc = new RequestedCall(
891 static_cast<void*>(call_info.tag), call_info.cq, call_info.call,
892 call_info.initial_metadata, call_info.details);
893 return Immediate(MatchResult(server(), cq_idx(), rc));
894 }
895
896 private:
897 std::function<BatchCallAllocation()> allocator_;
898 };
899
900 // An allocating request matcher for registered methods.
901 class Server::AllocatingRequestMatcherRegistered
902 : public AllocatingRequestMatcherBase {
903 public:
904 AllocatingRequestMatcherRegistered(
905 Server* server, grpc_completion_queue* cq, RegisteredMethod* rm,
906 std::function<RegisteredCallAllocation()> allocator)
907 : AllocatingRequestMatcherBase(server, cq),
908 registered_method_(rm),
909 allocator_(std::move(allocator)) {}
910
911 void MatchOrQueue(size_t /*start_request_queue_index*/,
912 CallData* calld) override {
913 auto cleanup_ref =
914 absl::MakeCleanup([this] { server()->ShutdownUnrefOnRequest(); });
915 if (server()->ShutdownRefOnRequest()) {
916 RegisteredCallAllocation call_info = allocator_();
917 CHECK(server()->ValidateServerRequest(
918 cq(), call_info.tag, call_info.optional_payload,
919 registered_method_) == GRPC_CALL_OK);
920 RequestedCall* rc =
921 new RequestedCall(call_info.tag, call_info.cq, call_info.call,
922 call_info.initial_metadata, registered_method_,
923 call_info.deadline, call_info.optional_payload);
924 calld->SetState(CallData::CallState::ACTIVATED);
925 calld->Publish(cq_idx(), rc);
926 } else {
927 calld->FailCallCreation();
928 }
929 }
930
931 ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
932 size_t /*start_request_queue_index*/) override {
933 RegisteredCallAllocation call_info = allocator_();
934 CHECK(server()->ValidateServerRequest(cq(), call_info.tag,
935 call_info.optional_payload,
936 registered_method_) == GRPC_CALL_OK);
937 RequestedCall* rc = new RequestedCall(
938 call_info.tag, call_info.cq, call_info.call, call_info.initial_metadata,
939 registered_method_, call_info.deadline, call_info.optional_payload);
940 return Immediate(MatchResult(server(), cq_idx(), rc));
941 }
942
943 private:
944 RegisteredMethod* const registered_method_;
945 std::function<RegisteredCallAllocation()> allocator_;
946 };
947
948 //
949 // ChannelBroadcaster
950 //
951
952 namespace {
953
954 class ChannelBroadcaster {
955 public:
956 // This can have an empty constructor and destructor since we want to control
957 // when the actual setup and shutdown broadcast take place.
958
959 // Copies over the channels from the locked server.
960 void FillChannelsLocked(std::vector<RefCountedPtr<Channel>> channels) {
961 DCHECK(channels_.empty());
962 channels_ = std::move(channels);
963 }
964
965 // Broadcasts a shutdown on each channel.
966 void BroadcastShutdown(bool send_goaway, grpc_error_handle force_disconnect) {
967 for (const RefCountedPtr<Channel>& channel : channels_) {
968 SendShutdown(channel.get(), send_goaway, force_disconnect);
969 }
970 channels_.clear(); // just for safety against double broadcast
971 }
972
973 private:
974 struct ShutdownCleanupArgs {
975 grpc_closure closure;
976 grpc_slice slice;
977 };
978
979 static void ShutdownCleanup(void* arg, grpc_error_handle /*error*/) {
980 ShutdownCleanupArgs* a = static_cast<ShutdownCleanupArgs*>(arg);
981 CSliceUnref(a->slice);
982 delete a;
983 }
984
985 static void SendShutdown(Channel* channel, bool send_goaway,
986 grpc_error_handle send_disconnect) {
987 ShutdownCleanupArgs* sc = new ShutdownCleanupArgs;
988 GRPC_CLOSURE_INIT(&sc->closure, ShutdownCleanup, sc,
989 grpc_schedule_on_exec_ctx);
990 grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
991 grpc_channel_element* elem;
992 op->goaway_error =
993 send_goaway
994 ? grpc_error_set_int(GRPC_ERROR_CREATE("Server shutdown"),
995 StatusIntProperty::kRpcStatus, GRPC_STATUS_OK)
996 : absl::OkStatus();
997 sc->slice = grpc_slice_from_copied_string("Server shutdown");
998 op->disconnect_with_error = send_disconnect;
999 elem = grpc_channel_stack_element(channel->channel_stack(), 0);
1000 elem->filter->start_transport_op(elem, op);
1001 }
1002
1003 std::vector<RefCountedPtr<Channel>> channels_;
1004 };
1005
1006 } // namespace
1007
1008 //
1009 // Server::TransportConnectivityWatcher
1010 //
1011
1012 class Server::TransportConnectivityWatcher
1013 : public AsyncConnectivityStateWatcherInterface {
1014 public:
1015 TransportConnectivityWatcher(RefCountedPtr<ServerTransport> transport,
1016 RefCountedPtr<Server> server)
1017 : transport_(std::move(transport)), server_(std::move(server)) {}
1018
1019 private:
1020 void OnConnectivityStateChange(grpc_connectivity_state new_state,
1021 const absl::Status& /*status*/) override {
1022 // Don't do anything until we are being shut down.
1023 if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
1024 // Shut down channel.
1025 MutexLock lock(&server_->mu_global_);
1026 server_->connections_.erase(transport_.get());
1027 --server_->connections_open_;
1028 server_->MaybeFinishShutdown();
1029 }
1030
1031 RefCountedPtr<ServerTransport> transport_;
1032 RefCountedPtr<Server> server_;
1033 };
1034
1035 //
1036 // Server
1037 //
1038
1039 const grpc_channel_filter Server::kServerTopFilter = {
1040 Server::CallData::StartTransportStreamOpBatch,
1041 grpc_channel_next_op,
1042 sizeof(Server::CallData),
1043 Server::CallData::InitCallElement,
1044 grpc_call_stack_ignore_set_pollset_or_pollset_set,
1045 Server::CallData::DestroyCallElement,
1046 sizeof(Server::ChannelData),
1047 Server::ChannelData::InitChannelElement,
1048 grpc_channel_stack_no_post_init,
1049 Server::ChannelData::DestroyChannelElement,
1050 grpc_channel_next_get_info,
1051 GRPC_UNIQUE_TYPE_NAME_HERE("server"),
1052 };
1053
1054 namespace {
1055
1056 RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
1057 const ChannelArgs& args) {
1058 RefCountedPtr<channelz::ServerNode> channelz_node;
1059 if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
1060 .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
1061 size_t channel_tracer_max_memory = std::max(
1062 0, args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
1063 .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT));
1064 channelz_node =
1065 MakeRefCounted<channelz::ServerNode>(channel_tracer_max_memory);
1066 channelz_node->AddTraceEvent(
1067 channelz::ChannelTrace::Severity::Info,
1068 grpc_slice_from_static_string("Server created"));
1069 }
1070 return channelz_node;
1071 }
1072
1073 absl::StatusOr<ClientMetadataHandle> CheckClientMetadata(
1074 ValueOrFailure<ClientMetadataHandle> md) {
1075 if (!md.ok()) {
1076 return absl::InternalError("Error reading metadata");
1077 }
1078 if (!md.value()->get_pointer(HttpPathMetadata())) {
1079 return absl::InternalError("Missing :path header");
1080 }
1081 if (!md.value()->get_pointer(HttpAuthorityMetadata())) {
1082 return absl::InternalError("Missing :authority header");
1083 }
1084 return std::move(*md);
1085 }
1086 } // namespace
1087
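// MatchAndPublishCall drives a single promise-based RPC: it waits for client
// initial metadata, optionally reads the first message (for registered methods
// that request GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER), matches the RPC
// against an application request via the appropriate RequestMatcherInterface,
// and finally publishes the resulting grpc_call on the matched completion
// queue.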
1088 auto Server::MatchAndPublishCall(CallHandler call_handler) {
1089 call_handler.SpawnGuardedUntilCallCompletes(
1090 "request_matcher", [this, call_handler]() mutable {
1091 return TrySeq(
1092 // Wait for initial metadata to pass through all filters
1093 Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata),
1094 // Match request with requested call
1095 [this, call_handler](ClientMetadataHandle md) mutable {
1096 auto* registered_method = static_cast<RegisteredMethod*>(
1097 md->get(GrpcRegisteredMethod()).value_or(nullptr));
1098 RequestMatcherInterface* rm;
1099 grpc_server_register_method_payload_handling payload_handling =
1100 GRPC_SRM_PAYLOAD_NONE;
1101 if (registered_method == nullptr) {
1102 rm = unregistered_request_matcher_.get();
1103 } else {
1104 payload_handling = registered_method->payload_handling;
1105 rm = registered_method->matcher.get();
1106 }
1107 using FirstMessageResult =
1108 ValueOrFailure<absl::optional<MessageHandle>>;
1109 auto maybe_read_first_message = If(
1110 payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
1111 [call_handler]() mutable {
1112 return Map(
1113 call_handler.PullMessage(),
1114 [](ClientToServerNextMessage next_msg)
1115 -> FirstMessageResult {
1116 if (!next_msg.ok()) return Failure{};
1117 if (!next_msg.has_value()) {
1118 return FirstMessageResult(absl::nullopt);
1119 }
1120 return FirstMessageResult(next_msg.TakeValue());
1121 });
1122 },
1123 []() -> FirstMessageResult {
1124 return FirstMessageResult(absl::nullopt);
1125 });
1126 return TryJoin<absl::StatusOr>(
1127 std::move(maybe_read_first_message), rm->MatchRequest(0),
1128 [md = std::move(md)]() mutable {
1129 return ValueOrFailure<ClientMetadataHandle>(std::move(md));
1130 });
1131 },
1132 // Publish call to cq
1133 [call_handler,
1134 this](std::tuple<absl::optional<MessageHandle>,
1135 RequestMatcherInterface::MatchResult,
1136 ClientMetadataHandle>
1137 r) {
1138 RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
1139 auto md = std::move(std::get<2>(r));
1140 auto* rc = mr.TakeCall();
1141 rc->Complete(std::move(std::get<0>(r)), *md);
1142 grpc_call* call =
1143 MakeServerCall(call_handler, std::move(md), this,
1144 rc->cq_bound_to_call, rc->initial_metadata);
1145 *rc->call = call;
1146 return Map(
1147 WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
1148 [rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
1149 return absl::OkStatus();
1150 });
1151 });
1152 });
1153 }
1154
1155 absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>
1156 Server::MakeCallDestination(const ChannelArgs& args) {
1157 InterceptionChainBuilder builder(args);
1158 // TODO(ctiller): find a way to avoid adding a server ref per call
1159 builder.AddOnClientInitialMetadata([self = Ref()](ClientMetadata& md) {
1160 self->SetRegisteredMethodOnMetadata(md);
1161 });
1162 CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
1163 GRPC_SERVER_CHANNEL, builder);
1164 return builder.Build(
1165 MakeCallDestinationFromHandlerFunction([this](CallHandler handler) {
1166 return MatchAndPublishCall(std::move(handler));
1167 }));
1168 }
1169
1170 Server::Server(const ChannelArgs& args)
1171 : channel_args_(args),
1172 channelz_node_(CreateChannelzNode(args)),
1173 server_call_tracer_factory_(ServerCallTracerFactory::Get(args)),
1174 compression_options_(CompressionOptionsFromChannelArgs(args)),
1175 max_time_in_pending_queue_(Duration::Seconds(
1176 channel_args_
1177 .GetInt(GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS)
1178 .value_or(30))) {}
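// Illustrative sketch (not part of this file): the 30-second pending-queue
// timeout defaulted above can be overridden through channel args when the
// server is created, e.g.:
//
//   grpc_arg pending_arg = grpc_channel_arg_integer_create(
//       const_cast<char*>(
//           GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS),
//       60);
//   grpc_channel_args args = {1, &pending_arg};
//   grpc_server* server = grpc_server_create(&args, nullptr);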
1179
1180 Server::~Server() {
1181 // Remove the cq pollsets from the config_fetcher.
1182 if (started_ && config_fetcher_ != nullptr &&
1183 config_fetcher_->interested_parties() != nullptr) {
1184 for (grpc_pollset* pollset : pollsets_) {
1185 grpc_pollset_set_del_pollset(config_fetcher_->interested_parties(),
1186 pollset);
1187 }
1188 }
1189 for (size_t i = 0; i < cqs_.size(); i++) {
1190 GRPC_CQ_INTERNAL_UNREF(cqs_[i], "server");
1191 }
1192 }
1193
1194 void Server::AddListener(OrphanablePtr<ListenerInterface> listener) {
1195 channelz::ListenSocketNode* listen_socket_node =
1196 listener->channelz_listen_socket_node();
1197 if (listen_socket_node != nullptr && channelz_node_ != nullptr) {
1198 channelz_node_->AddChildListenSocket(
1199 listen_socket_node->RefAsSubclass<channelz::ListenSocketNode>());
1200 }
1201 ListenerInterface* ptr = listener.get();
1202 listener_states_.emplace_back(
1203 MakeRefCounted<ListenerState>(Ref(), std::move(listener)));
1204 ptr->SetServerListenerState(listener_states_.back());
1205 }
1206
1207 void Server::Start() {
1208 started_ = true;
1209 for (grpc_completion_queue* cq : cqs_) {
1210 if (grpc_cq_can_listen(cq)) {
1211 pollsets_.push_back(grpc_cq_pollset(cq));
1212 }
1213 }
1214 if (unregistered_request_matcher_ == nullptr) {
1215 unregistered_request_matcher_ = std::make_unique<RealRequestMatcher>(this);
1216 }
1217 for (auto& rm : registered_methods_) {
1218 if (rm.second->matcher == nullptr) {
1219 rm.second->matcher = std::make_unique<RealRequestMatcher>(this);
1220 }
1221 }
1222 {
1223 MutexLock lock(&mu_global_);
1224 starting_ = true;
1225 }
1226 // Register the config fetcher's interested parties with the cq pollsets
1227 // before starting the listeners, so that the config fetcher is already being
1228 // polled by the time the listeners start watching it.
1229 if (config_fetcher_ != nullptr &&
1230 config_fetcher_->interested_parties() != nullptr) {
1231 for (grpc_pollset* pollset : pollsets_) {
1232 grpc_pollset_set_add_pollset(config_fetcher_->interested_parties(),
1233 pollset);
1234 }
1235 }
1236 for (auto& listener_state : listener_states_) {
1237 listener_state->Start();
1238 }
1239 MutexLock lock(&mu_global_);
1240 starting_ = false;
1241 starting_cv_.Signal();
1242 }
1243
1244 grpc_error_handle Server::SetupTransport(
1245 Transport* transport, grpc_pollset* accepting_pollset,
1246 const ChannelArgs& args,
1247 const RefCountedPtr<channelz::SocketNode>& socket_node) {
1248 GRPC_LATENT_SEE_INNER_SCOPE("Server::SetupTransport");
1249 // Create channel.
1250 global_stats().IncrementServerChannelsCreated();
1251 // Set up channelz node.
1252 if (transport->server_transport() != nullptr) {
1253 // Take ownership
1254 // TODO(ctiller): post-v3-transition make this method take an
1255 // OrphanablePtr<ServerTransport> directly.
1256 OrphanablePtr<ServerTransport> t(transport->server_transport());
1257 auto destination = MakeCallDestination(args.SetObject(transport));
1258 if (!destination.ok()) {
1259 return absl_status_to_grpc_error(destination.status());
1260 }
1261 // TODO(ctiller): add channelz node
1262 t->SetCallDestination(std::move(*destination));
1263 MutexLock lock(&mu_global_);
1264 if (ShutdownCalled()) {
1265 t->DisconnectWithError(GRPC_ERROR_CREATE("Server shutdown"));
1266 }
1267 t->StartConnectivityWatch(MakeOrphanable<TransportConnectivityWatcher>(
1268 t->RefAsSubclass<ServerTransport>(), Ref()));
1269 GRPC_TRACE_LOG(server_channel, INFO) << "Adding connection";
1270 connections_.emplace(std::move(t));
1271 ++connections_open_;
1272 } else {
1273 CHECK(transport->filter_stack_transport() != nullptr);
1274 absl::StatusOr<RefCountedPtr<Channel>> channel = LegacyChannel::Create(
1275 "", args.SetObject(transport), GRPC_SERVER_CHANNEL);
1276 if (!channel.ok()) {
1277 return absl_status_to_grpc_error(channel.status());
1278 }
1279 CHECK(*channel != nullptr);
1280 auto* channel_stack = (*channel)->channel_stack();
1281 CHECK(channel_stack != nullptr);
1282 ChannelData* chand = static_cast<ChannelData*>(
1283 grpc_channel_stack_element(channel_stack, 0)->channel_data);
1284 // Set up CQs.
1285 size_t cq_idx;
1286 for (cq_idx = 0; cq_idx < cqs_.size(); cq_idx++) {
1287 if (grpc_cq_pollset(cqs_[cq_idx]) == accepting_pollset) break;
1288 }
1289 if (cq_idx == cqs_.size()) {
1290 // Completion queue not found. Pick a random one to publish new calls to.
1291 cq_idx = static_cast<size_t>(rand()) % std::max<size_t>(1, cqs_.size());
1292 }
1293 intptr_t channelz_socket_uuid = 0;
1294 if (socket_node != nullptr) {
1295 channelz_socket_uuid = socket_node->uuid();
1296 channelz_node_->AddChildSocket(socket_node);
1297 }
1298 // Initialize chand.
1299 chand->InitTransport(Ref(), std::move(*channel), cq_idx, transport,
1300 channelz_socket_uuid);
1301 }
1302 return absl::OkStatus();
1303 }
1304
1305 bool Server::HasOpenConnections() {
1306 MutexLock lock(&mu_global_);
1307 return !channels_.empty() || !connections_.empty();
1308 }
1309
1310 void Server::SetRegisteredMethodAllocator(
1311 grpc_completion_queue* cq, void* method_tag,
1312 std::function<RegisteredCallAllocation()> allocator) {
1313 RegisteredMethod* rm = static_cast<RegisteredMethod*>(method_tag);
1314 rm->matcher = std::make_unique<AllocatingRequestMatcherRegistered>(
1315 this, cq, rm, std::move(allocator));
1316 }
1317
1318 void Server::SetBatchMethodAllocator(
1319 grpc_completion_queue* cq, std::function<BatchCallAllocation()> allocator) {
1320 DCHECK(unregistered_request_matcher_ == nullptr);
1321 unregistered_request_matcher_ =
1322 std::make_unique<AllocatingRequestMatcherBatch>(this, cq,
1323 std::move(allocator));
1324 }
1325
1326 void Server::RegisterCompletionQueue(grpc_completion_queue* cq) {
1327 for (grpc_completion_queue* queue : cqs_) {
1328 if (queue == cq) return;
1329 }
1330 GRPC_CQ_INTERNAL_REF(cq, "server");
1331 cqs_.push_back(cq);
1332 }
1333
1334 Server::RegisteredMethod* Server::RegisterMethod(
1335 const char* method, const char* host,
1336 grpc_server_register_method_payload_handling payload_handling,
1337 uint32_t flags) {
1338 if (started_) {
1339 Crash("Attempting to register method after server started");
1340 }
1341
1342 if (!method) {
1343 LOG(ERROR) << "grpc_server_register_method method string cannot be NULL";
1344 return nullptr;
1345 }
1346 auto key = std::make_pair(host ? host : "", method);
1347 if (registered_methods_.find(key) != registered_methods_.end()) {
1348 LOG(ERROR) << "duplicate registration for " << method << "@"
1349 << (host ? host : "*");
1350 return nullptr;
1351 }
1352 if (flags != 0) {
1353 LOG(ERROR) << "grpc_server_register_method invalid flags "
1354 << absl::StrFormat("0x%08x", flags);
1355 return nullptr;
1356 }
1357 auto it = registered_methods_.emplace(
1358 key, std::make_unique<RegisteredMethod>(method, host, payload_handling,
1359 flags));
1360 return it.first->second.get();
1361 }
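// Illustrative sketch (not part of this file): the public API that reaches
// RegisterMethod() and the registered-call request path looks roughly like the
// following; `server`, `call`, `deadline`, `request_metadata`,
// `optional_payload`, `cq` and `tag` are assumed to be declared by the caller:
//
//   void* method_handle = grpc_server_register_method(
//       server, "/pkg.Service/Method", nullptr /* any host */,
//       GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, 0);
//   // After the server is started:
//   grpc_server_request_registered_call(
//       server, method_handle, &call, &deadline, &request_metadata,
//       &optional_payload, cq, cq, tag);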
1362
1363 void Server::DoneRequestEvent(void* req, grpc_cq_completion* /*c*/) {
1364 delete static_cast<RequestedCall*>(req);
1365 }
1366
1367 void Server::FailCall(size_t cq_idx, RequestedCall* rc,
1368 grpc_error_handle error) {
1369 *rc->call = nullptr;
1370 rc->initial_metadata->count = 0;
1371 CHECK(!error.ok());
1372 grpc_cq_end_op(cqs_[cq_idx], rc->tag, error, DoneRequestEvent, rc,
1373 &rc->completion);
1374 }
1375
1376 // Before calling MaybeFinishShutdown(), we must hold mu_global_ and not
1377 // hold mu_call_.
1378 void Server::MaybeFinishShutdown() {
1379 if (!ShutdownReady() || shutdown_published_) {
1380 return;
1381 }
1382 {
1383 MutexLock lock(&mu_call_);
1384 KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
1385 }
1386 if (!channels_.empty() || connections_open_ > 0 ||
1387 listeners_destroyed_ < listener_states_.size()) {
1388 if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
1389 last_shutdown_message_time_),
1390 gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
1391 last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
1392 VLOG(2) << "Waiting for " << channels_.size() << " channels "
1393 << connections_open_ << " connections and "
1394 << listener_states_.size() - listeners_destroyed_ << "/"
1395 << listener_states_.size()
1396 << " listeners to be destroyed before shutting down server";
1397 }
1398 return;
1399 }
1400 shutdown_published_ = true;
1401 for (auto& shutdown_tag : shutdown_tags_) {
1402 Ref().release();
1403 grpc_cq_end_op(shutdown_tag.cq, shutdown_tag.tag, absl::OkStatus(),
1404 DoneShutdownEvent, this, &shutdown_tag.completion);
1405 }
1406 }
1407
1408 void Server::KillPendingWorkLocked(grpc_error_handle error) {
1409 if (started_) {
1410 unregistered_request_matcher_->KillRequests(error);
1411 unregistered_request_matcher_->ZombifyPending();
1412 for (auto& rm : registered_methods_) {
1413 rm.second->matcher->KillRequests(error);
1414 rm.second->matcher->ZombifyPending();
1415 }
1416 }
1417 }
1418
1419 std::vector<RefCountedPtr<Channel>> Server::GetChannelsLocked() const {
1420 std::vector<RefCountedPtr<Channel>> channels;
1421 channels.reserve(channels_.size());
1422 for (const ChannelData* chand : channels_) {
1423 channels.push_back(chand->channel()->RefAsSubclass<Channel>());
1424 }
1425 return channels;
1426 }
1427
1428 void Server::ListenerDestroyDone(void* arg, grpc_error_handle /*error*/) {
1429 Server* server = static_cast<Server*>(arg);
1430 MutexLock lock(&server->mu_global_);
1431 server->listeners_destroyed_++;
1432 server->MaybeFinishShutdown();
1433 }
1434
1435 namespace {
1436
1437 void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) {
1438 delete storage;
1439 }
1440
1441 } // namespace
1442
// Server::ShutdownAndNotify() does the following:
//
// - Kills all pending requests-for-incoming-RPC-calls (i.e., the requests made
//   via grpc_server_request_call() and grpc_server_request_registered_call()
//   will now be cancelled). See KillPendingWorkLocked().
//
// - Shuts down the listeners (i.e., the server will no longer listen on the
//   port for new incoming channels).
//
// - Iterates through all channels on the server and sends a shutdown message
//   (see ChannelBroadcaster::BroadcastShutdown() for details) to the clients
//   via the transport layer. The transport layer then guarantees the
//   following:
//   -- Sends shutdown to the client (e.g., the HTTP2 transport sends GOAWAY).
//   -- If the server has outstanding calls that are still in progress, the
//      connection is NOT closed until the server is done with all those calls.
//   -- Once there are no more calls in progress, the channel is closed.
void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
  ChannelBroadcaster broadcaster;
  absl::flat_hash_set<OrphanablePtr<ServerTransport>> removing_connections;
  {
    // Wait for startup to be finished. Locks mu_global_.
    MutexLock lock(&mu_global_);
    while (starting_) {
      starting_cv_.Wait(&mu_global_);
    }
    // Stay locked, and gather up the work to be done once the lock is
    // released.
    CHECK(grpc_cq_begin_op(cq, tag));
    if (shutdown_published_) {
      grpc_cq_end_op(cq, tag, absl::OkStatus(), DonePublishedShutdown, nullptr,
                     new grpc_cq_completion);
      return;
    }
    shutdown_tags_.emplace_back(tag, cq);
    if (ShutdownCalled()) {
      return;
    }
    last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
    broadcaster.FillChannelsLocked(GetChannelsLocked());
    removing_connections.swap(connections_);
    // Collect all unregistered then registered calls.
    {
      MutexLock lock(&mu_call_);
      KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
    }
    ShutdownUnrefOnShutdownCall();
  }
  StopListening();
  broadcaster.BroadcastShutdown(/*send_goaway=*/true, absl::OkStatus());
}
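
// Illustrative usage sketch (not part of this file): the C-core calling
// sequence that exercises ShutdownAndNotify() above. The names `server`, `cq`,
// and `shutdown_tag` are caller-provided and shown here as assumptions.
//
//   int shutdown_tag = 0;
//   grpc_server_shutdown_and_notify(server, cq, &shutdown_tag);
//   // Drain the completion queue until the shutdown tag is returned; events
//   // for outstanding operations may be delivered first and must be handled.
//   grpc_event ev;
//   do {
//     ev = grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
//                                     nullptr);
//   } while (ev.type != GRPC_OP_COMPLETE || ev.tag != &shutdown_tag);
//   grpc_server_destroy(server);
//   // The completion queue itself is shut down and drained separately.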

void Server::StopListening() {
  for (auto& listener_state : listener_states_) {
    if (listener_state->listener() == nullptr) continue;
    channelz::ListenSocketNode* channelz_listen_socket_node =
        listener_state->listener()->channelz_listen_socket_node();
    if (channelz_node_ != nullptr && channelz_listen_socket_node != nullptr) {
      channelz_node_->RemoveChildListenSocket(
          channelz_listen_socket_node->uuid());
    }
    listener_state->Stop();
  }
}

void Server::CancelAllCalls() {
  ChannelBroadcaster broadcaster;
  {
    MutexLock lock(&mu_global_);
    broadcaster.FillChannelsLocked(GetChannelsLocked());
  }
  broadcaster.BroadcastShutdown(
      /*send_goaway=*/false, GRPC_ERROR_CREATE("Cancelling all calls"));
}

void Server::SendGoaways() {
  ChannelBroadcaster broadcaster;
  {
    MutexLock lock(&mu_global_);
    broadcaster.FillChannelsLocked(GetChannelsLocked());
  }
  broadcaster.BroadcastShutdown(/*send_goaway=*/true, absl::OkStatus());
}

void Server::Orphan() {
  {
    MutexLock lock(&mu_global_);
    CHECK(ShutdownCalled() || listener_states_.empty());
    CHECK(listeners_destroyed_ == listener_states_.size());
  }
  listener_states_.clear();
  Unref();
}

grpc_call_error Server::ValidateServerRequest(
    grpc_completion_queue* cq_for_notification, void* tag,
    grpc_byte_buffer** optional_payload, RegisteredMethod* rm) {
  if ((rm == nullptr && optional_payload != nullptr) ||
      ((rm != nullptr) && ((optional_payload == nullptr) !=
                           (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) {
    return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
  }
  if (!grpc_cq_begin_op(cq_for_notification, tag)) {
    return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
  }
  return GRPC_CALL_OK;
}

grpc_call_error Server::ValidateServerRequestAndCq(
    size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
    grpc_byte_buffer** optional_payload, RegisteredMethod* rm) {
  size_t idx;
  for (idx = 0; idx < cqs_.size(); idx++) {
    if (cqs_[idx] == cq_for_notification) {
      break;
    }
  }
  if (idx == cqs_.size()) {
    return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
  }
  grpc_call_error error =
      ValidateServerRequest(cq_for_notification, tag, optional_payload, rm);
  if (error != GRPC_CALL_OK) {
    return error;
  }
  *cq_idx = idx;
  return GRPC_CALL_OK;
}

grpc_call_error Server::QueueRequestedCall(size_t cq_idx, RequestedCall* rc) {
  if (ShutdownCalled()) {
    FailCall(cq_idx, rc, GRPC_ERROR_CREATE("Server Shutdown"));
    return GRPC_CALL_OK;
  }
  RequestMatcherInterface* rm;
  switch (rc->type) {
    case RequestedCall::Type::BATCH_CALL:
      rm = unregistered_request_matcher_.get();
      break;
    case RequestedCall::Type::REGISTERED_CALL:
      rm = rc->data.registered.method->matcher.get();
      break;
  }
  rm->RequestCallWithPossiblePublish(cq_idx, rc);
  return GRPC_CALL_OK;
}

grpc_call_error Server::RequestCall(grpc_call** call,
                                    grpc_call_details* details,
                                    grpc_metadata_array* request_metadata,
                                    grpc_completion_queue* cq_bound_to_call,
                                    grpc_completion_queue* cq_for_notification,
                                    void* tag) {
  size_t cq_idx;
  grpc_call_error error = ValidateServerRequestAndCq(
      &cq_idx, cq_for_notification, tag, nullptr, nullptr);
  if (error != GRPC_CALL_OK) {
    return error;
  }
  RequestedCall* rc =
      new RequestedCall(tag, cq_bound_to_call, call, request_metadata, details);
  return QueueRequestedCall(cq_idx, rc);
}

grpc_call_error Server::RequestRegisteredCall(
    RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
    grpc_metadata_array* request_metadata, grpc_byte_buffer** optional_payload,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag_new) {
  size_t cq_idx;
  grpc_call_error error = ValidateServerRequestAndCq(
      &cq_idx, cq_for_notification, tag_new, optional_payload, rm);
  if (error != GRPC_CALL_OK) {
    return error;
  }
  RequestedCall* rc =
      new RequestedCall(tag_new, cq_bound_to_call, call, request_metadata, rm,
                        deadline, optional_payload);
  return QueueRequestedCall(cq_idx, rc);
}
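
// Illustrative usage sketch for the entry points above, seen from the C API
// side. Assumptions for the example: `registered_method` came from
// grpc_server_register_method() with GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
// `cq` was registered with this server, and `request_tag` is caller-provided.
//
//   grpc_call* call = nullptr;
//   gpr_timespec deadline;
//   grpc_metadata_array request_metadata;
//   grpc_metadata_array_init(&request_metadata);
//   grpc_byte_buffer* payload = nullptr;
//   grpc_call_error err = grpc_server_request_registered_call(
//       server, registered_method, &call, &deadline, &request_metadata,
//       &payload, cq, cq, /*tag_new=*/&request_tag);
//
// The tag completes on `cq` once a matching RPC arrives (or the server shuts
// down); only then are `call`, `deadline`, the metadata array and, because a
// payload was requested, `payload` populated.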

//
// Server::ChannelData::ConnectivityWatcher
//

class Server::ChannelData::ConnectivityWatcher
    : public AsyncConnectivityStateWatcherInterface {
 public:
  explicit ConnectivityWatcher(ChannelData* chand)
      : chand_(chand), channel_(chand_->channel_->RefAsSubclass<Channel>()) {}

 private:
  void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                 const absl::Status& /*status*/) override {
    // Don't do anything until we are being shut down.
    if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
    // Shut down channel.
    MutexLock lock(&chand_->server_->mu_global_);
    chand_->Destroy();
  }

  ChannelData* const chand_;
  const RefCountedPtr<Channel> channel_;
};

//
// Server::ChannelData
//

Server::ChannelData::~ChannelData() {
  if (server_ != nullptr) {
    if (server_->channelz_node_ != nullptr && channelz_socket_uuid_ != 0) {
      server_->channelz_node_->RemoveChildSocket(channelz_socket_uuid_);
    }
    {
      MutexLock lock(&server_->mu_global_);
      if (list_position_.has_value()) {
        server_->channels_.erase(*list_position_);
        list_position_.reset();
      }
      server_->MaybeFinishShutdown();
    }
  }
}

void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
                                        RefCountedPtr<Channel> channel,
                                        size_t cq_idx, Transport* transport,
                                        intptr_t channelz_socket_uuid) {
  server_ = std::move(server);
  channel_ = std::move(channel);
  cq_idx_ = cq_idx;
  channelz_socket_uuid_ = channelz_socket_uuid;
  // Publish channel.
  {
    MutexLock lock(&server_->mu_global_);
    server_->channels_.push_front(this);
    list_position_ = server_->channels_.begin();
  }
  // Start accept_stream transport op.
  grpc_transport_op* op = grpc_make_transport_op(nullptr);
  CHECK(transport->filter_stack_transport() != nullptr);
  op->set_accept_stream = true;
  op->set_accept_stream_fn = AcceptStream;
  op->set_registered_method_matcher_fn = [](void* arg,
                                            ClientMetadata* metadata) {
    static_cast<ChannelData*>(arg)->server_->SetRegisteredMethodOnMetadata(
        *metadata);
  };
  op->set_accept_stream_user_data = this;
  op->start_connectivity_watch = MakeOrphanable<ConnectivityWatcher>(this);
  if (server_->ShutdownCalled()) {
    op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
  }
  transport->PerformOp(op);
}

Server::RegisteredMethod* Server::GetRegisteredMethod(
    const absl::string_view& host, const absl::string_view& path) {
  if (registered_methods_.empty()) return nullptr;
  // Check for an exact match with host.
  auto it = registered_methods_.find(std::make_pair(host, path));
  if (it != registered_methods_.end()) {
    return it->second.get();
  }
  // Check for a wildcard method definition (no host set).
  it = registered_methods_.find(std::make_pair("", path));
  if (it != registered_methods_.end()) {
    return it->second.get();
  }
  return nullptr;
}
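
// Lookup-order sketch for the probes above: an exact {host, path} entry wins,
// otherwise a host-wildcard entry (registered with an empty host) is used.
// The method and host names here are examples only.
//
//   GetRegisteredMethod("example.com", "/pkg.Svc/Foo")
//     1. probe {"example.com", "/pkg.Svc/Foo"}  -> miss
//     2. probe {"",            "/pkg.Svc/Foo"}  -> hit; the method registered
//                                                  without a host is returned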

void Server::SetRegisteredMethodOnMetadata(ClientMetadata& metadata) {
  auto* authority = metadata.get_pointer(HttpAuthorityMetadata());
  if (authority == nullptr) {
    authority = metadata.get_pointer(HostMetadata());
    if (authority == nullptr) {
      // Authority not being set is an RPC error.
      return;
    }
  }
  auto* path = metadata.get_pointer(HttpPathMetadata());
  if (path == nullptr) {
    // Path not being set would result in an RPC error.
    return;
  }
  RegisteredMethod* method =
      GetRegisteredMethod(authority->as_string_view(), path->as_string_view());
  // Insert in metadata.
  metadata.Set(GrpcRegisteredMethod(), method);
}

void Server::ChannelData::AcceptStream(void* arg, Transport* /*transport*/,
                                       const void* transport_server_data) {
  auto* chand = static_cast<Server::ChannelData*>(arg);
  // Create a call.
  grpc_call_create_args args;
  args.channel = chand->channel_->RefAsSubclass<Channel>();
  args.server = chand->server_.get();
  args.parent = nullptr;
  args.propagation_mask = 0;
  args.cq = nullptr;
  args.pollset_set_alternative = nullptr;
  args.server_transport_data = transport_server_data;
  args.send_deadline = Timestamp::InfFuture();
  grpc_call* call;
  grpc_error_handle error = grpc_call_create(&args, &call);
  grpc_call_stack* call_stack = grpc_call_get_call_stack(call);
  CHECK_NE(call_stack, nullptr);
  grpc_call_element* elem = grpc_call_stack_element(call_stack, 0);
  auto* calld = static_cast<Server::CallData*>(elem->call_data);
  if (!error.ok()) {
    calld->FailCallCreation();
    return;
  }
  calld->Start(elem);
}

void Server::ChannelData::FinishDestroy(void* arg,
                                        grpc_error_handle /*error*/) {
  auto* chand = static_cast<Server::ChannelData*>(arg);
  Server* server = chand->server_.get();
  auto* channel_stack = chand->channel_->channel_stack();
  chand->channel_.reset();
  server->Unref();
  GRPC_CHANNEL_STACK_UNREF(channel_stack, "Server::ChannelData::Destroy");
}

void Server::ChannelData::Destroy() {
  if (!list_position_.has_value()) return;
  CHECK(server_ != nullptr);
  server_->channels_.erase(*list_position_);
  list_position_.reset();
  server_->Ref().release();
  server_->MaybeFinishShutdown();
  // Unreffed by FinishDestroy.
  GRPC_CHANNEL_STACK_REF(channel_->channel_stack(),
                         "Server::ChannelData::Destroy");
  GRPC_CLOSURE_INIT(&finish_destroy_channel_closure_, FinishDestroy, this,
                    grpc_schedule_on_exec_ctx);
  GRPC_TRACE_LOG(server_channel, INFO) << "Disconnected client";
  grpc_transport_op* op =
      grpc_make_transport_op(&finish_destroy_channel_closure_);
  op->set_accept_stream = true;
  grpc_channel_next_op(grpc_channel_stack_element(channel_->channel_stack(), 0),
                       op);
}

grpc_error_handle Server::ChannelData::InitChannelElement(
    grpc_channel_element* elem, grpc_channel_element_args* args) {
  CHECK(args->is_first);
  CHECK(!args->is_last);
  new (elem->channel_data) ChannelData();
  return absl::OkStatus();
}

void Server::ChannelData::DestroyChannelElement(grpc_channel_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  chand->~ChannelData();
}

//
// Server::CallData
//

Server::CallData::CallData(grpc_call_element* elem,
                           const grpc_call_element_args& args,
                           RefCountedPtr<Server> server)
    : server_(std::move(server)),
      call_(grpc_call_from_top_element(elem)),
      call_combiner_(args.call_combiner) {
  GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                    elem, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
                    elem, grpc_schedule_on_exec_ctx);
}

Server::CallData::~CallData() {
  CHECK(state_.load(std::memory_order_relaxed) != CallState::PENDING);
  grpc_metadata_array_destroy(&initial_metadata_);
  grpc_byte_buffer_destroy(payload_);
}

void Server::CallData::SetState(CallState state) {
  state_.store(state, std::memory_order_relaxed);
}

bool Server::CallData::MaybeActivate() {
  CallState expected = CallState::PENDING;
  return state_.compare_exchange_strong(expected, CallState::ACTIVATED,
                                        std::memory_order_acq_rel,
                                        std::memory_order_relaxed);
}

void Server::CallData::FailCallCreation() {
  CallState expected_not_started = CallState::NOT_STARTED;
  CallState expected_pending = CallState::PENDING;
  if (state_.compare_exchange_strong(expected_not_started, CallState::ZOMBIED,
                                     std::memory_order_acq_rel,
                                     std::memory_order_acquire)) {
    KillZombie();
  } else if (state_.compare_exchange_strong(
                 expected_pending, CallState::ZOMBIED,
                 std::memory_order_acq_rel, std::memory_order_relaxed)) {
    // Zombied call will be destroyed when it's removed from the pending
    // queue... later.
  }
}

void Server::CallData::Start(grpc_call_element* elem) {
  grpc_op op;
  op.op = GRPC_OP_RECV_INITIAL_METADATA;
  op.flags = 0;
  op.reserved = nullptr;
  op.data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_;
  GRPC_CLOSURE_INIT(&recv_initial_metadata_batch_complete_,
                    RecvInitialMetadataBatchComplete, elem,
                    grpc_schedule_on_exec_ctx);
  grpc_call_start_batch_and_execute(call_, &op, 1,
                                    &recv_initial_metadata_batch_complete_);
}

void Server::CallData::Publish(size_t cq_idx, RequestedCall* rc) {
  grpc_call_set_completion_queue(call_, rc->cq_bound_to_call);
  *rc->call = call_;
  cq_new_ = server_->cqs_[cq_idx];
  std::swap(*rc->initial_metadata, initial_metadata_);
  switch (rc->type) {
    case RequestedCall::Type::BATCH_CALL:
      CHECK(host_.has_value());
      CHECK(path_.has_value());
      rc->data.batch.details->host = CSliceRef(host_->c_slice());
      rc->data.batch.details->method = CSliceRef(path_->c_slice());
      rc->data.batch.details->deadline =
          deadline_.as_timespec(GPR_CLOCK_MONOTONIC);
      break;
    case RequestedCall::Type::REGISTERED_CALL:
      *rc->data.registered.deadline =
          deadline_.as_timespec(GPR_CLOCK_MONOTONIC);
      if (rc->data.registered.optional_payload != nullptr) {
        *rc->data.registered.optional_payload = payload_;
        payload_ = nullptr;
      }
      break;
    default:
      GPR_UNREACHABLE_CODE(return);
  }
  grpc_cq_end_op(cq_new_, rc->tag, absl::OkStatus(), Server::DoneRequestEvent,
                 rc, &rc->completion, true);
}

void Server::CallData::PublishNewRpc(void* arg, grpc_error_handle error) {
  grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
  auto* calld = static_cast<Server::CallData*>(call_elem->call_data);
  auto* chand = static_cast<Server::ChannelData*>(call_elem->channel_data);
  RequestMatcherInterface* rm = calld->matcher_;
  Server* server = rm->server();
  if (!error.ok() || server->ShutdownCalled()) {
    calld->state_.store(CallState::ZOMBIED, std::memory_order_relaxed);
    calld->KillZombie();
    return;
  }
  rm->MatchOrQueue(chand->cq_idx(), calld);
}

namespace {

void KillZombieClosure(void* call, grpc_error_handle /*error*/) {
  grpc_call_unref(static_cast<grpc_call*>(call));
}

}  // namespace

void Server::CallData::KillZombie() {
  GRPC_CLOSURE_INIT(&kill_zombie_closure_, KillZombieClosure, call_,
                    grpc_schedule_on_exec_ctx);
  ExecCtx::Run(DEBUG_LOCATION, &kill_zombie_closure_, absl::OkStatus());
}

// If this changes, change MakeCallPromise too.
void Server::CallData::StartNewRpc(grpc_call_element* elem) {
  if (server_->ShutdownCalled()) {
    state_.store(CallState::ZOMBIED, std::memory_order_relaxed);
    KillZombie();
    return;
  }
  // Find request matcher.
  matcher_ = server_->unregistered_request_matcher_.get();
  grpc_server_register_method_payload_handling payload_handling =
      GRPC_SRM_PAYLOAD_NONE;
  if (path_.has_value() && host_.has_value()) {
    RegisteredMethod* rm = static_cast<RegisteredMethod*>(
        recv_initial_metadata_->get(GrpcRegisteredMethod()).value_or(nullptr));
    if (rm != nullptr) {
      matcher_ = rm->matcher.get();
      payload_handling = rm->payload_handling;
    }
  }
  // Start recv_message op if needed.
  switch (payload_handling) {
    case GRPC_SRM_PAYLOAD_NONE:
      PublishNewRpc(elem, absl::OkStatus());
      break;
    case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
      grpc_op op;
      op.op = GRPC_OP_RECV_MESSAGE;
      op.flags = 0;
      op.reserved = nullptr;
      op.data.recv_message.recv_message = &payload_;
      GRPC_CLOSURE_INIT(&publish_, PublishNewRpc, elem,
                        grpc_schedule_on_exec_ctx);
      grpc_call_start_batch_and_execute(call_, &op, 1, &publish_);
      break;
    }
  }
}

void Server::CallData::RecvInitialMetadataBatchComplete(
    void* arg, grpc_error_handle error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  auto* calld = static_cast<Server::CallData*>(elem->call_data);
  if (!error.ok()) {
    VLOG(2) << "Failed call creation: " << StatusToString(error);
    calld->FailCallCreation();
    return;
  }
  calld->StartNewRpc(elem);
}

void Server::CallData::StartTransportStreamOpBatchImpl(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  if (batch->recv_initial_metadata) {
    recv_initial_metadata_ =
        batch->payload->recv_initial_metadata.recv_initial_metadata;
    original_recv_initial_metadata_ready_ =
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
    batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
        &recv_initial_metadata_ready_;
  }
  if (batch->recv_trailing_metadata) {
    original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &recv_trailing_metadata_ready_;
  }
  grpc_call_next_op(elem, batch);
}

void Server::CallData::RecvInitialMetadataReady(void* arg,
                                                grpc_error_handle error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (error.ok()) {
    calld->path_ = calld->recv_initial_metadata_->Take(HttpPathMetadata());
    auto* host =
        calld->recv_initial_metadata_->get_pointer(HttpAuthorityMetadata());
    if (host != nullptr) calld->host_.emplace(host->Ref());
  }
  auto op_deadline = calld->recv_initial_metadata_->get(GrpcTimeoutMetadata());
  if (op_deadline.has_value()) {
    calld->deadline_ = *op_deadline;
    Call::FromC(calld->call_)->UpdateDeadline(*op_deadline);
  }
  if (calld->host_.has_value() && calld->path_.has_value()) {
    // Do nothing.
  } else if (error.ok()) {
    // Pass the error reference to calld->recv_initial_metadata_error_.
    error = absl::UnknownError("Missing :authority or :path");
    calld->recv_initial_metadata_error_ = error;
  }
  grpc_closure* closure = calld->original_recv_initial_metadata_ready_;
  calld->original_recv_initial_metadata_ready_ = nullptr;
  if (calld->seen_recv_trailing_metadata_ready_) {
    GRPC_CALL_COMBINER_START(calld->call_combiner_,
                             &calld->recv_trailing_metadata_ready_,
                             calld->recv_trailing_metadata_error_,
                             "continue server recv_trailing_metadata_ready");
  }
  Closure::Run(DEBUG_LOCATION, closure, error);
}

void Server::CallData::RecvTrailingMetadataReady(void* arg,
                                                 grpc_error_handle error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (calld->original_recv_initial_metadata_ready_ != nullptr) {
    calld->recv_trailing_metadata_error_ = error;
    calld->seen_recv_trailing_metadata_ready_ = true;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_,
                      RecvTrailingMetadataReady, elem,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "deferring server recv_trailing_metadata_ready "
                            "until after recv_initial_metadata_ready");
    return;
  }
  error = grpc_error_add_child(error, calld->recv_initial_metadata_error_);
  Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
               error);
}

grpc_error_handle Server::CallData::InitCallElement(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  new (elem->call_data) Server::CallData(elem, *args, chand->server());
  return absl::OkStatus();
}

void Server::CallData::DestroyCallElement(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* /*ignored*/) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->~CallData();
}

void Server::CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->StartTransportStreamOpBatchImpl(elem, batch);
}

}  // namespace grpc_core

//
// C-core API
//

grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
  grpc_core::ExecCtx exec_ctx;
  GRPC_TRACE_LOG(api, INFO)
      << "grpc_server_create(" << args << ", " << reserved << ")";
  grpc_core::Server* server =
      new grpc_core::Server(grpc_core::CoreConfiguration::Get()
                                .channel_args_preconditioning()
                                .PreconditionChannelArgs(args));
  return server->c_ptr();
}
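
// Illustrative bring-up sketch for the C-core surface wrapped below. The
// address string and the choice of insecure credentials are assumptions for
// the example, not requirements.
//
//   grpc_server* server = grpc_server_create(/*args=*/nullptr, nullptr);
//   grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
//   grpc_server_register_completion_queue(server, cq, nullptr);
//   grpc_server_credentials* creds = grpc_insecure_server_credentials_create();
//   grpc_server_add_http2_port(server, "0.0.0.0:50051", creds);
//   grpc_server_credentials_release(creds);
//   grpc_server_start(server);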

void grpc_server_register_completion_queue(grpc_server* server,
                                           grpc_completion_queue* cq,
                                           void* reserved) {
  GRPC_TRACE_LOG(api, INFO)
      << "grpc_server_register_completion_queue(server=" << server
      << ", cq=" << cq << ", reserved=" << reserved << ")";
  CHECK(!reserved);
  auto cq_type = grpc_get_cq_completion_type(cq);
  if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) {
    VLOG(2) << "Completion queue of type " << static_cast<int>(cq_type)
            << " is being registered as a server-completion-queue";
    // Ideally we should log an error and abort, but the Ruby wrapped-language
    // API calls grpc_completion_queue_pluck() on server completion queues.
  }
  grpc_core::Server::FromC(server)->RegisterCompletionQueue(cq);
}

void* grpc_server_register_method(
    grpc_server* server, const char* method, const char* host,
    grpc_server_register_method_payload_handling payload_handling,
    uint32_t flags) {
  GRPC_TRACE_LOG(api, INFO) << "grpc_server_register_method(server=" << server
                            << ", method=" << method << ", host=" << host
                            << ", flags=" << absl::StrFormat("0x%08x", flags)
                            << ")";
  return grpc_core::Server::FromC(server)->RegisterMethod(
      method, host, payload_handling, flags);
}
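
// Illustrative usage sketch: methods must be registered before
// grpc_server_start() is called. The method and host strings are examples.
//
//   void* registered_method = grpc_server_register_method(
//       server, "/pkg.Svc/Foo", /*host=*/nullptr,
//       GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER, /*flags=*/0);
//   // The returned handle is later passed to
//   // grpc_server_request_registered_call().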

void grpc_server_start(grpc_server* server) {
  grpc_core::ExecCtx exec_ctx;
  GRPC_TRACE_LOG(api, INFO) << "grpc_server_start(server=" << server << ")";
  grpc_core::Server::FromC(server)->Start();
}

void grpc_server_shutdown_and_notify(grpc_server* server,
                                     grpc_completion_queue* cq, void* tag) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_TRACE_LOG(api, INFO)
      << "grpc_server_shutdown_and_notify(server=" << server << ", cq=" << cq
      << ", tag=" << tag << ")";
  grpc_core::Server::FromC(server)->ShutdownAndNotify(cq, tag);
}

void grpc_server_cancel_all_calls(grpc_server* server) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_TRACE_LOG(api, INFO)
      << "grpc_server_cancel_all_calls(server=" << server << ")";
  grpc_core::Server::FromC(server)->CancelAllCalls();
}

void grpc_server_destroy(grpc_server* server) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_TRACE_LOG(api, INFO) << "grpc_server_destroy(server=" << server << ")";
  grpc_core::Server::FromC(server)->Orphan();
}

grpc_call_error grpc_server_request_call(
    grpc_server* server, grpc_call** call, grpc_call_details* details,
    grpc_metadata_array* request_metadata,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_TRACE_LOG(api, INFO)
      << "grpc_server_request_call(" << "server=" << server << ", call=" << call
      << ", details=" << details << ", initial_metadata=" << request_metadata
      << ", cq_bound_to_call=" << cq_bound_to_call
      << ", cq_for_notification=" << cq_for_notification << ", tag=" << tag
      << ")";
  return grpc_core::Server::FromC(server)->RequestCall(
      call, details, request_metadata, cq_bound_to_call, cq_for_notification,
      tag);
}

grpc_call_error grpc_server_request_registered_call(
    grpc_server* server, void* registered_method, grpc_call** call,
    gpr_timespec* deadline, grpc_metadata_array* request_metadata,
    grpc_byte_buffer** optional_payload,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag_new) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  auto* rm =
      static_cast<grpc_core::Server::RegisteredMethod*>(registered_method);
  GRPC_TRACE_LOG(api, INFO)
      << "grpc_server_request_registered_call(" << "server=" << server
      << ", registered_method=" << registered_method << ", call=" << call
      << ", deadline=" << deadline << ", request_metadata=" << request_metadata
      << ", optional_payload=" << optional_payload
      << ", cq_bound_to_call=" << cq_bound_to_call
      << ", cq_for_notification=" << cq_for_notification << ", tag=" << tag_new
      << ")";
  return grpc_core::Server::FromC(server)->RequestRegisteredCall(
      rm, call, deadline, request_metadata, optional_payload, cq_bound_to_call,
      cq_for_notification, tag_new);
}

void grpc_server_set_config_fetcher(
    grpc_server* server, grpc_server_config_fetcher* server_config_fetcher) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_TRACE_LOG(api, INFO)
      << "grpc_server_set_config_fetcher(server=" << server
      << ", config_fetcher=" << server_config_fetcher << ")";
  grpc_core::Server::FromC(server)->set_config_fetcher(
      std::unique_ptr<grpc_core::ServerConfigFetcher>(
          grpc_core::ServerConfigFetcher::FromC(server_config_fetcher)));
}

void grpc_server_config_fetcher_destroy(
    grpc_server_config_fetcher* server_config_fetcher) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_TRACE_LOG(api, INFO)
      << "grpc_server_config_fetcher_destroy(config_fetcher="
      << server_config_fetcher << ")";
  delete grpc_core::ServerConfigFetcher::FromC(server_config_fetcher);
}
