• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015-2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/server/server.h"
18 
19 #include <grpc/byte_buffer.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/channel_arg_names.h>
22 #include <grpc/impl/connectivity_state.h>
23 #include <grpc/slice.h>
24 #include <grpc/status.h>
25 #include <grpc/support/port_platform.h>
26 #include <grpc/support/time.h>
27 #include <inttypes.h>
28 #include <stdlib.h>
29 #include <string.h>
30 
31 #include <algorithm>
32 #include <atomic>
33 #include <list>
34 #include <memory>
35 #include <new>
36 #include <queue>
37 #include <type_traits>
38 #include <utility>
39 #include <vector>
40 
41 #include "absl/cleanup/cleanup.h"
42 #include "absl/container/flat_hash_map.h"
43 #include "absl/log/check.h"
44 #include "absl/log/log.h"
45 #include "absl/status/status.h"
46 #include "absl/types/optional.h"
47 #include "src/core/channelz/channel_trace.h"
48 #include "src/core/channelz/channelz.h"
49 #include "src/core/config/core_configuration.h"
50 #include "src/core/lib/address_utils/sockaddr_utils.h"
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/channel/channel_args_preconditioning.h"
53 #include "src/core/lib/experiments/experiments.h"
54 #include "src/core/lib/iomgr/exec_ctx.h"
55 #include "src/core/lib/iomgr/pollset_set.h"
56 #include "src/core/lib/promise/activity.h"
57 #include "src/core/lib/promise/cancel_callback.h"
58 #include "src/core/lib/promise/context.h"
59 #include "src/core/lib/promise/map.h"
60 #include "src/core/lib/promise/pipe.h"
61 #include "src/core/lib/promise/poll.h"
62 #include "src/core/lib/promise/promise.h"
63 #include "src/core/lib/promise/seq.h"
64 #include "src/core/lib/promise/try_join.h"
65 #include "src/core/lib/promise/try_seq.h"
66 #include "src/core/lib/security/credentials/credentials.h"
67 #include "src/core/lib/slice/slice_buffer.h"
68 #include "src/core/lib/slice/slice_internal.h"
69 #include "src/core/lib/surface/call.h"
70 #include "src/core/lib/surface/call_utils.h"
71 #include "src/core/lib/surface/channel.h"
72 #include "src/core/lib/surface/channel_stack_type.h"
73 #include "src/core/lib/surface/completion_queue.h"
74 #include "src/core/lib/surface/legacy_channel.h"
75 #include "src/core/lib/surface/server_call.h"
76 #include "src/core/lib/transport/connectivity_state.h"
77 #include "src/core/lib/transport/error_utils.h"
78 #include "src/core/lib/transport/interception_chain.h"
79 #include "src/core/telemetry/stats.h"
80 #include "src/core/util/crash.h"
81 #include "src/core/util/debug_location.h"
82 #include "src/core/util/mpscq.h"
83 #include "src/core/util/orphanable.h"
84 #include "src/core/util/status_helper.h"
85 #include "src/core/util/useful.h"
86 
87 namespace grpc_core {
88 
89 //
90 // Server::ListenerState::ConfigFetcherWatcher
91 //
92 
UpdateConnectionManager(RefCountedPtr<ServerConfigFetcher::ConnectionManager> connection_manager)93 void Server::ListenerState::ConfigFetcherWatcher::UpdateConnectionManager(
94     RefCountedPtr<ServerConfigFetcher::ConnectionManager> connection_manager) {
95   RefCountedPtr<ServerConfigFetcher::ConnectionManager>
96       connection_manager_to_destroy;
97   {
98     MutexLock lock(&listener_state_->mu_);
99     connection_manager_to_destroy = listener_state_->connection_manager_;
100     listener_state_->connection_manager_ = std::move(connection_manager);
101     listener_state_->DrainConnectionsLocked();
102     if (listener_state_->server_->ShutdownCalled()) {
103       return;
104     }
105     listener_state_->is_serving_ = true;
106     if (listener_state_->started_) return;
107     listener_state_->started_ = true;
108   }
109   listener_state_->listener_->Start();
110 }
111 
StopServing()112 void Server::ListenerState::ConfigFetcherWatcher::StopServing() {
113   MutexLock lock(&listener_state_->mu_);
114   listener_state_->is_serving_ = false;
115   listener_state_->DrainConnectionsLocked();
116 }
117 
118 //
119 // Server::ListenerState
120 //
121 
ListenerState(RefCountedPtr<Server> server,OrphanablePtr<ListenerInterface> l)122 Server::ListenerState::ListenerState(RefCountedPtr<Server> server,
123                                      OrphanablePtr<ListenerInterface> l)
124     : server_(std::move(server)),
125       memory_quota_(
126           server_->channel_args().GetObject<ResourceQuota>()->memory_quota()),
127       connection_quota_(MakeRefCounted<ConnectionQuota>()),
128       event_engine_(
129           server_->channel_args()
130               .GetObject<grpc_event_engine::experimental::EventEngine>()),
131       listener_(std::move(l)) {
132   auto max_allowed_incoming_connections =
133       server_->channel_args().GetInt(GRPC_ARG_MAX_ALLOWED_INCOMING_CONNECTIONS);
134   if (max_allowed_incoming_connections.has_value()) {
135     connection_quota_->SetMaxIncomingConnections(
136         max_allowed_incoming_connections.value());
137   }
138 }
139 
Start()140 void Server::ListenerState::Start() {
141   if (IsServerListenerEnabled()) {
142     if (server_->config_fetcher() != nullptr) {
143       auto watcher = std::make_unique<ConfigFetcherWatcher>(this);
144       config_fetcher_watcher_ = watcher.get();
145       server_->config_fetcher()->StartWatch(
146           grpc_sockaddr_to_string(listener_->resolved_address(), false).value(),
147           std::move(watcher));
148     } else {
149       {
150         MutexLock lock(&mu_);
151         started_ = true;
152         is_serving_ = true;
153       }
154       listener_->Start();
155     }
156   } else {
157     listener_->Start();
158   }
159 }
160 
Stop()161 void Server::ListenerState::Stop() {
162   if (IsServerListenerEnabled()) {
163     absl::flat_hash_set<OrphanablePtr<ListenerInterface::LogicalConnection>>
164         connections;
165     {
166       MutexLock lock(&mu_);
167       // Orphan the connections so that they can start cleaning up.
168       connections = std::move(connections_);
169       connections_.clear();
170       is_serving_ = false;
171     }
172     if (config_fetcher_watcher_ != nullptr) {
173       CHECK_NE(server_->config_fetcher(), nullptr);
174       server_->config_fetcher()->CancelWatch(config_fetcher_watcher_);
175     }
176   }
177   GRPC_CLOSURE_INIT(&destroy_done_, ListenerDestroyDone, server_.get(),
178                     grpc_schedule_on_exec_ctx);
179   listener_->SetOnDestroyDone(&destroy_done_);
180   listener_.reset();
181 }
182 
AddLogicalConnection(OrphanablePtr<ListenerInterface::LogicalConnection> connection,const ChannelArgs & args,grpc_endpoint * endpoint)183 absl::optional<ChannelArgs> Server::ListenerState::AddLogicalConnection(
184     OrphanablePtr<ListenerInterface::LogicalConnection> connection,
185     const ChannelArgs& args, grpc_endpoint* endpoint) {
186   RefCountedPtr<ServerConfigFetcher::ConnectionManager> connection_manager;
187   {
188     MutexLock lock(&mu_);
189     if (!is_serving_) {
190       // Not serving
191       return absl::nullopt;
192     }
193     connection_manager = connection_manager_;
194   }
195   // The following section is intentionally outside the critical section. The
196   // operation to update channel args for a connection is heavy and complicated.
197   // For example, if using the xDS config fetcher, an involved matching process
198   // is performed to determine the filter chain to apply for this connection,
199   // prepare the filters, config selector and credentials. Subsequently, the
200   // credentials are used to create a security connector as well. Doing this
201   // outside the critical region allows us to get a larger degree of parallelism
202   // for the handling of incoming connections.
203   ChannelArgs new_args = args;
204   if (server_->config_fetcher() != nullptr) {
205     if (connection_manager == nullptr) {
206       // Connection manager not available
207       return absl::nullopt;
208     }
209     absl::StatusOr<ChannelArgs> args_result =
210         connection_manager->UpdateChannelArgsForConnection(new_args, endpoint);
211     if (!args_result.ok()) {
212       return absl::nullopt;
213     }
214     auto* server_credentials =
215         (*args_result).GetObject<grpc_server_credentials>();
216     if (server_credentials == nullptr) {
217       // Could not find server credentials
218       return absl::nullopt;
219     }
220     auto security_connector =
221         server_credentials->create_security_connector(*args_result);
222     if (security_connector == nullptr) {
223       // Unable to create secure server with credentials
224       return absl::nullopt;
225     }
226     new_args = (*args_result).SetObject(security_connector);
227   }
228   MutexLock lock(&mu_);
229   // Since we let go of the lock earlier, we need to protect ourselves against
230   // time-of-check-to-time-of-use cases. The server may have stopped serving
231   // or the connection manager may have changed before we add the connection
232   // to the list of tracked connections.
233   if (!is_serving_ || connection_manager != connection_manager_) {
234     // Not serving
235     return absl::nullopt;
236   }
237   connections_.emplace(std::move(connection));
238   return new_args;
239 }
240 
OnHandshakeDone(ListenerInterface::LogicalConnection * connection)241 void Server::ListenerState::OnHandshakeDone(
242     ListenerInterface::LogicalConnection* connection) {
243   if (server_->config_fetcher() != nullptr) {
244     return;
245   }
246   // There is no config fetcher, so we can remove this connection from being
247   // tracked immediately.
248   OrphanablePtr<ListenerInterface::LogicalConnection> connection_to_remove;
249   {
250     // Remove the connection if it wasn't already removed.
251     MutexLock lock(&mu_);
252     auto connection_handle = connections_.extract(connection);
253     if (!connection_handle.empty()) {
254       connection_to_remove = std::move(connection_handle.value());
255     }
256     // We do not need to check connections_to_be_drained_list_ since that only
257     // gets set if there is a config fetcher.
258   }
259 }
260 
RemoveLogicalConnection(ListenerInterface::LogicalConnection * connection)261 void Server::ListenerState::RemoveLogicalConnection(
262     ListenerInterface::LogicalConnection* connection) {
263   OrphanablePtr<ListenerInterface::LogicalConnection> connection_to_remove;
264   // Remove the connection if it wasn't already removed.
265   MutexLock lock(&mu_);
266   auto connection_handle = connections_.extract(connection);
267   if (!connection_handle.empty()) {
268     connection_to_remove = std::move(connection_handle.value());
269     return;
270   }
271   // The connection might be getting drained.
272   for (auto it = connections_to_be_drained_list_.begin();
273        it != connections_to_be_drained_list_.end(); ++it) {
274     auto connection_handle = it->connections.extract(connection);
275     if (!connection_handle.empty()) {
276       connection_to_remove = std::move(connection_handle.value());
277       RemoveConnectionsToBeDrainedOnEmptyLocked(it);
278       return;
279     }
280   }
281 }
282 
DrainConnectionsLocked()283 void Server::ListenerState::DrainConnectionsLocked() {
284   if (connections_.empty()) {
285     return;
286   }
287   // Send GOAWAYs on the transports so that they disconnect when existing
288   // RPCs finish.
289   for (auto& connection : connections_) {
290     connection->SendGoAway();
291   }
292   connections_to_be_drained_list_.emplace_back();
293   auto& connections_to_be_drained = connections_to_be_drained_list_.back();
294   connections_to_be_drained.connections = std::move(connections_);
295   connections_.clear();
296   connections_to_be_drained.timestamp =
297       Timestamp::Now() +
298       std::max(Duration::Zero(),
299                server_->channel_args()
300                    .GetDurationFromIntMillis(
301                        GRPC_ARG_SERVER_CONFIG_CHANGE_DRAIN_GRACE_TIME_MS)
302                    .value_or(Duration::Minutes(10)));
303   MaybeStartNewGraceTimerLocked();
304 }
305 
OnDrainGraceTimer()306 void Server::ListenerState::OnDrainGraceTimer() {
307   absl::flat_hash_set<OrphanablePtr<ListenerInterface::LogicalConnection>>
308       connections_to_be_drained;
309   {
310     MutexLock lock(&mu_);
311     if (connections_to_be_drained_list_.empty()) {
312       return;
313     }
314     connections_to_be_drained =
315         std::move(connections_to_be_drained_list_.front().connections);
316     connections_to_be_drained_list_.pop_front();
317     MaybeStartNewGraceTimerLocked();
318   }
319   for (auto& connection : connections_to_be_drained) {
320     connection->DisconnectImmediately();
321   }
322 }
323 
MaybeStartNewGraceTimerLocked()324 void Server::ListenerState::MaybeStartNewGraceTimerLocked() {
325   if (connections_to_be_drained_list_.empty()) {
326     return;
327   }
328   drain_grace_timer_handle_ = event_engine()->RunAfter(
329       connections_to_be_drained_list_.front().timestamp - Timestamp::Now(),
330       [self = Ref()]() mutable {
331         ApplicationCallbackExecCtx callback_exec_ctx;
332         ExecCtx exec_ctx;
333         self->OnDrainGraceTimer();
334         // resetting within an active ExecCtx
335         self.reset();
336       });
337 }
338 
RemoveConnectionsToBeDrainedOnEmptyLocked(std::deque<ConnectionsToBeDrained>::iterator it)339 void Server::ListenerState::RemoveConnectionsToBeDrainedOnEmptyLocked(
340     std::deque<ConnectionsToBeDrained>::iterator it) {
341   if (it->connections.empty()) {
342     // Cancel the timer if the set of connections is now empty.
343     if (event_engine()->Cancel(drain_grace_timer_handle_)) {
344       // Only remove the entry from the list if the cancellation was
345       // actually successful. OnDrainGraceTimer() will remove if
346       // cancellation is not successful.
347       connections_to_be_drained_list_.erase(it);
348       MaybeStartNewGraceTimerLocked();
349     }
350   }
351 }
352 
353 //
354 // Server::RequestMatcherInterface
355 //
356 
357 // RPCs that come in from the transport must be matched against RPC requests
358 // from the application. An incoming request from the application can be matched
359 // to an RPC that has already arrived or can be queued up for later use.
360 // Likewise, an RPC coming in from the transport can either be matched to a
361 // request that already arrived from the application or can be queued up for
362 // later use (marked pending). If there is a match, the request's tag is posted
363 // on the request's notification CQ.
364 //
365 // RequestMatcherInterface is the base class to provide this functionality.
366 class Server::RequestMatcherInterface {
367  public:
~RequestMatcherInterface()368   virtual ~RequestMatcherInterface() {}
369 
370   // Unref the calls associated with any incoming RPCs in the pending queue (not
371   // yet matched to an application-requested RPC).
372   virtual void ZombifyPending() = 0;
373 
374   // Mark all application-requested RPCs failed if they have not been matched to
375   // an incoming RPC. The error parameter indicates why the RPCs are being
376   // failed (always server shutdown in all current implementations).
377   virtual void KillRequests(grpc_error_handle error) = 0;
378 
379   // How many request queues are supported by this matcher. This is an abstract
380   // concept that essentially maps to gRPC completion queues.
381   virtual size_t request_queue_count() const = 0;
382 
383   // This function is invoked when the application requests a new RPC whose
384   // information is in the call parameter. The request_queue_index marks the
385   // queue onto which to place this RPC, and is typically associated with a gRPC
386   // CQ. If there are pending RPCs waiting to be matched, publish one (match it
387   // and notify the CQ).
388   virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
389                                               RequestedCall* call) = 0;
390 
391   class MatchResult {
392    public:
MatchResult(Server * server,size_t cq_idx,RequestedCall * requested_call)393     MatchResult(Server* server, size_t cq_idx, RequestedCall* requested_call)
394         : server_(server), cq_idx_(cq_idx), requested_call_(requested_call) {}
~MatchResult()395     ~MatchResult() {
396       if (requested_call_ != nullptr) {
397         server_->FailCall(cq_idx_, requested_call_, absl::CancelledError());
398       }
399     }
400 
401     MatchResult(const MatchResult&) = delete;
402     MatchResult& operator=(const MatchResult&) = delete;
403 
MatchResult(MatchResult && other)404     MatchResult(MatchResult&& other) noexcept
405         : server_(other.server_),
406           cq_idx_(other.cq_idx_),
407           requested_call_(std::exchange(other.requested_call_, nullptr)) {}
408 
TakeCall()409     RequestedCall* TakeCall() {
410       return std::exchange(requested_call_, nullptr);
411     }
412 
cq() const413     grpc_completion_queue* cq() const { return server_->cqs_[cq_idx_]; }
cq_idx() const414     size_t cq_idx() const { return cq_idx_; }
415 
416    private:
417     Server* server_;
418     size_t cq_idx_;
419     RequestedCall* requested_call_;
420   };
421 
422   // This function is invoked on an incoming promise based RPC.
423   // The RequestMatcher will try to match it against an application-requested
424   // RPC if possible or will place it in the pending queue otherwise. To enable
425   // some measure of fairness between server CQs, the match is done starting at
426   // the start_request_queue_index parameter in a cyclic order rather than
427   // always starting at 0.
428   virtual ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
429       size_t start_request_queue_index) = 0;
430 
431   // This function is invoked on an incoming RPC, represented by the calld
432   // object. The RequestMatcher will try to match it against an
433   // application-requested RPC if possible or will place it in the pending queue
434   // otherwise. To enable some measure of fairness between server CQs, the match
435   // is done starting at the start_request_queue_index parameter in a cyclic
436   // order rather than always starting at 0.
437   virtual void MatchOrQueue(size_t start_request_queue_index,
438                             CallData* calld) = 0;
439 
440   // Returns the server associated with this request matcher
441   virtual Server* server() const = 0;
442 };
443 
444 //
445 // Server::RegisteredMethod
446 //
447 
448 struct Server::RegisteredMethod {
RegisteredMethodgrpc_core::Server::RegisteredMethod449   RegisteredMethod(
450       const char* method_arg, const char* host_arg,
451       grpc_server_register_method_payload_handling payload_handling_arg,
452       uint32_t flags_arg)
453       : method(method_arg == nullptr ? "" : method_arg),
454         host(host_arg == nullptr ? "" : host_arg),
455         payload_handling(payload_handling_arg),
456         flags(flags_arg) {}
457 
458   ~RegisteredMethod() = default;
459 
460   const std::string method;
461   const std::string host;
462   const grpc_server_register_method_payload_handling payload_handling;
463   const uint32_t flags;
464   // One request matcher per method.
465   std::unique_ptr<RequestMatcherInterface> matcher;
466 };
467 
468 //
469 // Server::RequestedCall
470 //
471 
472 struct Server::RequestedCall {
473   enum class Type { BATCH_CALL, REGISTERED_CALL };
474 
RequestedCallgrpc_core::Server::RequestedCall475   RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
476                 grpc_call** call_arg, grpc_metadata_array* initial_md,
477                 grpc_call_details* details)
478       : type(Type::BATCH_CALL),
479         tag(tag_arg),
480         cq_bound_to_call(call_cq),
481         call(call_arg),
482         initial_metadata(initial_md) {
483     data.batch.details = details;
484   }
485 
RequestedCallgrpc_core::Server::RequestedCall486   RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
487                 grpc_call** call_arg, grpc_metadata_array* initial_md,
488                 RegisteredMethod* rm, gpr_timespec* deadline,
489                 grpc_byte_buffer** optional_payload)
490       : type(Type::REGISTERED_CALL),
491         tag(tag_arg),
492         cq_bound_to_call(call_cq),
493         call(call_arg),
494         initial_metadata(initial_md) {
495     data.registered.method = rm;
496     data.registered.deadline = deadline;
497     data.registered.optional_payload = optional_payload;
498   }
499 
500   template <typename OptionalPayload>
Completegrpc_core::Server::RequestedCall501   void Complete(OptionalPayload payload, ClientMetadata& md) {
502     Timestamp deadline =
503         md.get(GrpcTimeoutMetadata()).value_or(Timestamp::InfFuture());
504     switch (type) {
505       case RequestedCall::Type::BATCH_CALL:
506         CHECK(!payload.has_value());
507         data.batch.details->host =
508             CSliceRef(md.get_pointer(HttpAuthorityMetadata())->c_slice());
509         data.batch.details->method =
510             CSliceRef(md.Take(HttpPathMetadata())->c_slice());
511         data.batch.details->deadline =
512             deadline.as_timespec(GPR_CLOCK_MONOTONIC);
513         break;
514       case RequestedCall::Type::REGISTERED_CALL:
515         md.Remove(HttpPathMetadata());
516         *data.registered.deadline = deadline.as_timespec(GPR_CLOCK_MONOTONIC);
517         if (data.registered.optional_payload != nullptr) {
518           if (payload.has_value()) {
519             auto* sb = payload.value()->payload()->c_slice_buffer();
520             *data.registered.optional_payload =
521                 grpc_raw_byte_buffer_create(sb->slices, sb->count);
522           } else {
523             *data.registered.optional_payload = nullptr;
524           }
525         }
526         break;
527       default:
528         GPR_UNREACHABLE_CODE(abort());
529     }
530   }
531 
532   MultiProducerSingleConsumerQueue::Node mpscq_node;
533   const Type type;
534   void* const tag;
535   grpc_completion_queue* const cq_bound_to_call;
536   grpc_call** const call;
537   grpc_cq_completion completion;
538   grpc_metadata_array* const initial_metadata;
539   union {
540     struct {
541       grpc_call_details* details;
542     } batch;
543     struct {
544       RegisteredMethod* method;
545       gpr_timespec* deadline;
546       grpc_byte_buffer** optional_payload;
547     } registered;
548   } data;
549 };
550 
551 // The RealRequestMatcher is an implementation of RequestMatcherInterface that
552 // actually uses all the features of RequestMatcherInterface: expecting the
553 // application to explicitly request RPCs and then matching those to incoming
554 // RPCs, along with a slow path by which incoming RPCs are put on a locked
555 // pending list if they aren't able to be matched to an application request.
556 class Server::RealRequestMatcher : public RequestMatcherInterface {
557  public:
RealRequestMatcher(Server * server)558   explicit RealRequestMatcher(Server* server)
559       : server_(server), requests_per_cq_(server->cqs_.size()) {}
560 
~RealRequestMatcher()561   ~RealRequestMatcher() override {
562     for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
563       CHECK_EQ(queue.Pop(), nullptr);
564     }
565     CHECK(pending_filter_stack_.empty());
566     CHECK(pending_promises_.empty());
567   }
568 
ZombifyPending()569   void ZombifyPending() override {
570     while (!pending_filter_stack_.empty()) {
571       pending_filter_stack_.front().calld->SetState(
572           CallData::CallState::ZOMBIED);
573       pending_filter_stack_.front().calld->KillZombie();
574       pending_filter_stack_.pop();
575     }
576     while (!pending_promises_.empty()) {
577       pending_promises_.front()->Finish(absl::InternalError("Server closed"));
578       pending_promises_.pop();
579     }
580     zombified_ = true;
581   }
582 
KillRequests(grpc_error_handle error)583   void KillRequests(grpc_error_handle error) override {
584     for (size_t i = 0; i < requests_per_cq_.size(); i++) {
585       RequestedCall* rc;
586       while ((rc = reinterpret_cast<RequestedCall*>(
587                   requests_per_cq_[i].Pop())) != nullptr) {
588         server_->FailCall(i, rc, error);
589       }
590     }
591   }
592 
request_queue_count() const593   size_t request_queue_count() const override {
594     return requests_per_cq_.size();
595   }
596 
RequestCallWithPossiblePublish(size_t request_queue_index,RequestedCall * call)597   void RequestCallWithPossiblePublish(size_t request_queue_index,
598                                       RequestedCall* call) override {
599     if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
600       // this was the first queued request: we need to lock and start
601       // matching calls
602       struct NextPendingCall {
603         RequestedCall* rc = nullptr;
604         CallData* pending_filter_stack = nullptr;
605         PendingCallPromises pending_promise;
606       };
607       while (true) {
608         NextPendingCall pending_call;
609         {
610           MutexLock lock(&server_->mu_call_);
611           while (!pending_filter_stack_.empty() &&
612                  pending_filter_stack_.front().Age() >
613                      server_->max_time_in_pending_queue_) {
614             pending_filter_stack_.front().calld->SetState(
615                 CallData::CallState::ZOMBIED);
616             pending_filter_stack_.front().calld->KillZombie();
617             pending_filter_stack_.pop();
618           }
619           if (!pending_promises_.empty()) {
620             pending_call.rc = reinterpret_cast<RequestedCall*>(
621                 requests_per_cq_[request_queue_index].Pop());
622             if (pending_call.rc != nullptr) {
623               pending_call.pending_promise =
624                   std::move(pending_promises_.front());
625               pending_promises_.pop();
626             }
627           } else if (!pending_filter_stack_.empty()) {
628             pending_call.rc = reinterpret_cast<RequestedCall*>(
629                 requests_per_cq_[request_queue_index].Pop());
630             if (pending_call.rc != nullptr) {
631               pending_call.pending_filter_stack =
632                   pending_filter_stack_.front().calld;
633               pending_filter_stack_.pop();
634             }
635           }
636         }
637         if (pending_call.rc == nullptr) break;
638         if (pending_call.pending_filter_stack != nullptr) {
639           if (!pending_call.pending_filter_stack->MaybeActivate()) {
640             // Zombied Call
641             pending_call.pending_filter_stack->KillZombie();
642             requests_per_cq_[request_queue_index].Push(
643                 &pending_call.rc->mpscq_node);
644           } else {
645             pending_call.pending_filter_stack->Publish(request_queue_index,
646                                                        pending_call.rc);
647           }
648         } else {
649           if (!pending_call.pending_promise->Finish(
650                   server(), request_queue_index, pending_call.rc)) {
651             requests_per_cq_[request_queue_index].Push(
652                 &pending_call.rc->mpscq_node);
653           }
654         }
655       }
656     }
657   }
658 
MatchOrQueue(size_t start_request_queue_index,CallData * calld)659   void MatchOrQueue(size_t start_request_queue_index,
660                     CallData* calld) override {
661     for (size_t i = 0; i < requests_per_cq_.size(); i++) {
662       size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
663       RequestedCall* rc =
664           reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].TryPop());
665       if (rc != nullptr) {
666         calld->SetState(CallData::CallState::ACTIVATED);
667         calld->Publish(cq_idx, rc);
668         return;
669       }
670     }
671     // No cq to take the request found; queue it on the slow list.
672     // We need to ensure that all the queues are empty.  We do this under
673     // the server mu_call_ lock to ensure that if something is added to
674     // an empty request queue, it will block until the call is actually
675     // added to the pending list.
676     RequestedCall* rc = nullptr;
677     size_t cq_idx = 0;
678     size_t loop_count;
679     {
680       MutexLock lock(&server_->mu_call_);
681       for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
682         cq_idx =
683             (start_request_queue_index + loop_count) % requests_per_cq_.size();
684         rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop());
685         if (rc != nullptr) {
686           break;
687         }
688       }
689       if (rc == nullptr) {
690         calld->SetState(CallData::CallState::PENDING);
691         pending_filter_stack_.push(PendingCallFilterStack{calld});
692         return;
693       }
694     }
695     calld->SetState(CallData::CallState::ACTIVATED);
696     calld->Publish(cq_idx, rc);
697   }
698 
MatchRequest(size_t start_request_queue_index)699   ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
700       size_t start_request_queue_index) override {
701     for (size_t i = 0; i < requests_per_cq_.size(); i++) {
702       size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
703       RequestedCall* rc =
704           reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].TryPop());
705       if (rc != nullptr) {
706         return Immediate(MatchResult(server(), cq_idx, rc));
707       }
708     }
709     // No cq to take the request found; queue it on the slow list.
710     // We need to ensure that all the queues are empty.  We do this under
711     // the server mu_call_ lock to ensure that if something is added to
712     // an empty request queue, it will block until the call is actually
713     // added to the pending list.
714     RequestedCall* rc = nullptr;
715     size_t cq_idx = 0;
716     size_t loop_count;
717     {
718       std::vector<std::shared_ptr<ActivityWaiter>> removed_pending;
719       MutexLock lock(&server_->mu_call_);
720       while (!pending_promises_.empty() &&
721              pending_promises_.front()->Age() >
722                  server_->max_time_in_pending_queue_) {
723         removed_pending.push_back(std::move(pending_promises_.front()));
724         pending_promises_.pop();
725       }
726       for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
727         cq_idx =
728             (start_request_queue_index + loop_count) % requests_per_cq_.size();
729         rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop());
730         if (rc != nullptr) break;
731       }
732       if (rc == nullptr) {
733         if (server_->pending_backlog_protector_.Reject(pending_promises_.size(),
734                                                        server_->bitgen_)) {
735           return Immediate(absl::ResourceExhaustedError(
736               "Too many pending requests for this server"));
737         }
738         if (zombified_) {
739           return Immediate(absl::InternalError("Server closed"));
740         }
741         auto w = std::make_shared<ActivityWaiter>(
742             GetContext<Activity>()->MakeOwningWaker());
743         pending_promises_.push(w);
744         return OnCancel(
745             [w]() -> Poll<absl::StatusOr<MatchResult>> {
746               std::unique_ptr<absl::StatusOr<MatchResult>> r(
747                   w->result.exchange(nullptr, std::memory_order_acq_rel));
748               if (r == nullptr) return Pending{};
749               return std::move(*r);
750             },
751             [w]() { w->Finish(absl::CancelledError()); });
752       }
753     }
754     return Immediate(MatchResult(server(), cq_idx, rc));
755   }
756 
server() const757   Server* server() const final { return server_; }
758 
759  private:
760   Server* const server_;
761   struct PendingCallFilterStack {
762     CallData* calld;
763     Timestamp created = Timestamp::Now();
Agegrpc_core::Server::RealRequestMatcher::PendingCallFilterStack764     Duration Age() { return Timestamp::Now() - created; }
765   };
766   struct ActivityWaiter {
767     using ResultType = absl::StatusOr<MatchResult>;
ActivityWaitergrpc_core::Server::RealRequestMatcher::ActivityWaiter768     explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {}
~ActivityWaitergrpc_core::Server::RealRequestMatcher::ActivityWaiter769     ~ActivityWaiter() { delete result.load(std::memory_order_acquire); }
Finishgrpc_core::Server::RealRequestMatcher::ActivityWaiter770     void Finish(absl::Status status) {
771       ResultType* expected = nullptr;
772       ResultType* new_value = new ResultType(std::move(status));
773       if (!result.compare_exchange_strong(expected, new_value,
774                                           std::memory_order_acq_rel,
775                                           std::memory_order_acquire)) {
776         delete new_value;
777         return;
778       }
779       waker.WakeupAsync();
780     }
781     // Returns true if requested_call consumed, false otherwise.
Finishgrpc_core::Server::RealRequestMatcher::ActivityWaiter782     GRPC_MUST_USE_RESULT bool Finish(Server* server, size_t cq_idx,
783                                      RequestedCall* requested_call) {
784       ResultType* expected = nullptr;
785       ResultType* new_value =
786           new ResultType(MatchResult(server, cq_idx, requested_call));
787       if (!result.compare_exchange_strong(expected, new_value,
788                                           std::memory_order_acq_rel,
789                                           std::memory_order_acquire)) {
790         CHECK(new_value->value().TakeCall() == requested_call);
791         delete new_value;
792         return false;
793       }
794       waker.WakeupAsync();
795       return true;
796     }
Agegrpc_core::Server::RealRequestMatcher::ActivityWaiter797     Duration Age() { return Timestamp::Now() - created; }
798     Waker waker;
799     std::atomic<ResultType*> result{nullptr};
800     const Timestamp created = Timestamp::Now();
801   };
802   using PendingCallPromises = std::shared_ptr<ActivityWaiter>;
803   std::queue<PendingCallFilterStack> pending_filter_stack_;
804   std::queue<PendingCallPromises> pending_promises_;
805   std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
806   bool zombified_ = false;
807 };
808 
809 // AllocatingRequestMatchers don't allow the application to request an RPC in
810 // advance or queue up any incoming RPC for later match. Instead, MatchOrQueue
811 // will call out to an allocation function passed in at the construction of the
812 // object. These request matchers are designed for the C++ callback API, so they
813 // only support 1 completion queue (passed in at the constructor). They are also
814 // used for the sync API.
815 class Server::AllocatingRequestMatcherBase : public RequestMatcherInterface {
816  public:
AllocatingRequestMatcherBase(Server * server,grpc_completion_queue * cq)817   AllocatingRequestMatcherBase(Server* server, grpc_completion_queue* cq)
818       : server_(server), cq_(cq) {
819     size_t idx;
820     for (idx = 0; idx < server->cqs_.size(); idx++) {
821       if (server->cqs_[idx] == cq) {
822         break;
823       }
824     }
825     CHECK(idx < server->cqs_.size());
826     cq_idx_ = idx;
827   }
828 
ZombifyPending()829   void ZombifyPending() override {}
830 
KillRequests(grpc_error_handle)831   void KillRequests(grpc_error_handle /*error*/) override {}
832 
request_queue_count() const833   size_t request_queue_count() const override { return 0; }
834 
RequestCallWithPossiblePublish(size_t,RequestedCall *)835   void RequestCallWithPossiblePublish(size_t /*request_queue_index*/,
836                                       RequestedCall* /*call*/) final {
837     Crash("unreachable");
838   }
839 
server() const840   Server* server() const final { return server_; }
841 
842   // Supply the completion queue related to this request matcher
cq() const843   grpc_completion_queue* cq() const { return cq_; }
844 
845   // Supply the completion queue's index relative to the server.
cq_idx() const846   size_t cq_idx() const { return cq_idx_; }
847 
848  private:
849   Server* const server_;
850   grpc_completion_queue* const cq_;
851   size_t cq_idx_;
852 };
853 
854 // An allocating request matcher for non-registered methods (used for generic
855 // API and unimplemented RPCs).
856 class Server::AllocatingRequestMatcherBatch
857     : public AllocatingRequestMatcherBase {
858  public:
AllocatingRequestMatcherBatch(Server * server,grpc_completion_queue * cq,std::function<BatchCallAllocation ()> allocator)859   AllocatingRequestMatcherBatch(Server* server, grpc_completion_queue* cq,
860                                 std::function<BatchCallAllocation()> allocator)
861       : AllocatingRequestMatcherBase(server, cq),
862         allocator_(std::move(allocator)) {}
863 
MatchOrQueue(size_t,CallData * calld)864   void MatchOrQueue(size_t /*start_request_queue_index*/,
865                     CallData* calld) override {
866     const bool still_running = server()->ShutdownRefOnRequest();
867     auto cleanup_ref =
868         absl::MakeCleanup([this] { server()->ShutdownUnrefOnRequest(); });
869     if (still_running) {
870       BatchCallAllocation call_info = allocator_();
871       CHECK(server()->ValidateServerRequest(cq(),
872                                             static_cast<void*>(call_info.tag),
873                                             nullptr, nullptr) == GRPC_CALL_OK);
874       RequestedCall* rc = new RequestedCall(
875           static_cast<void*>(call_info.tag), call_info.cq, call_info.call,
876           call_info.initial_metadata, call_info.details);
877       calld->SetState(CallData::CallState::ACTIVATED);
878       calld->Publish(cq_idx(), rc);
879     } else {
880       calld->FailCallCreation();
881     }
882   }
883 
MatchRequest(size_t)884   ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
885       size_t /*start_request_queue_index*/) override {
886     BatchCallAllocation call_info = allocator_();
887     CHECK(server()->ValidateServerRequest(cq(),
888                                           static_cast<void*>(call_info.tag),
889                                           nullptr, nullptr) == GRPC_CALL_OK);
890     RequestedCall* rc = new RequestedCall(
891         static_cast<void*>(call_info.tag), call_info.cq, call_info.call,
892         call_info.initial_metadata, call_info.details);
893     return Immediate(MatchResult(server(), cq_idx(), rc));
894   }
895 
896  private:
897   std::function<BatchCallAllocation()> allocator_;
898 };
899 
900 // An allocating request matcher for registered methods.
901 class Server::AllocatingRequestMatcherRegistered
902     : public AllocatingRequestMatcherBase {
903  public:
AllocatingRequestMatcherRegistered(Server * server,grpc_completion_queue * cq,RegisteredMethod * rm,std::function<RegisteredCallAllocation ()> allocator)904   AllocatingRequestMatcherRegistered(
905       Server* server, grpc_completion_queue* cq, RegisteredMethod* rm,
906       std::function<RegisteredCallAllocation()> allocator)
907       : AllocatingRequestMatcherBase(server, cq),
908         registered_method_(rm),
909         allocator_(std::move(allocator)) {}
910 
MatchOrQueue(size_t,CallData * calld)911   void MatchOrQueue(size_t /*start_request_queue_index*/,
912                     CallData* calld) override {
913     auto cleanup_ref =
914         absl::MakeCleanup([this] { server()->ShutdownUnrefOnRequest(); });
915     if (server()->ShutdownRefOnRequest()) {
916       RegisteredCallAllocation call_info = allocator_();
917       CHECK(server()->ValidateServerRequest(
918                 cq(), call_info.tag, call_info.optional_payload,
919                 registered_method_) == GRPC_CALL_OK);
920       RequestedCall* rc =
921           new RequestedCall(call_info.tag, call_info.cq, call_info.call,
922                             call_info.initial_metadata, registered_method_,
923                             call_info.deadline, call_info.optional_payload);
924       calld->SetState(CallData::CallState::ACTIVATED);
925       calld->Publish(cq_idx(), rc);
926     } else {
927       calld->FailCallCreation();
928     }
929   }
930 
MatchRequest(size_t)931   ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
932       size_t /*start_request_queue_index*/) override {
933     RegisteredCallAllocation call_info = allocator_();
934     CHECK(server()->ValidateServerRequest(cq(), call_info.tag,
935                                           call_info.optional_payload,
936                                           registered_method_) == GRPC_CALL_OK);
937     RequestedCall* rc = new RequestedCall(
938         call_info.tag, call_info.cq, call_info.call, call_info.initial_metadata,
939         registered_method_, call_info.deadline, call_info.optional_payload);
940     return Immediate(MatchResult(server(), cq_idx(), rc));
941   }
942 
943  private:
944   RegisteredMethod* const registered_method_;
945   std::function<RegisteredCallAllocation()> allocator_;
946 };
947 
948 //
949 // ChannelBroadcaster
950 //
951 
952 namespace {
953 
954 class ChannelBroadcaster {
955  public:
956   // This can have an empty constructor and destructor since we want to control
957   // when the actual setup and shutdown broadcast take place.
958 
959   // Copies over the channels from the locked server.
FillChannelsLocked(std::vector<RefCountedPtr<Channel>> channels)960   void FillChannelsLocked(std::vector<RefCountedPtr<Channel>> channels) {
961     DCHECK(channels_.empty());
962     channels_ = std::move(channels);
963   }
964 
965   // Broadcasts a shutdown on each channel.
BroadcastShutdown(bool send_goaway,grpc_error_handle force_disconnect)966   void BroadcastShutdown(bool send_goaway, grpc_error_handle force_disconnect) {
967     for (const RefCountedPtr<Channel>& channel : channels_) {
968       SendShutdown(channel.get(), send_goaway, force_disconnect);
969     }
970     channels_.clear();  // just for safety against double broadcast
971   }
972 
973  private:
974   struct ShutdownCleanupArgs {
975     grpc_closure closure;
976     grpc_slice slice;
977   };
978 
ShutdownCleanup(void * arg,grpc_error_handle)979   static void ShutdownCleanup(void* arg, grpc_error_handle /*error*/) {
980     ShutdownCleanupArgs* a = static_cast<ShutdownCleanupArgs*>(arg);
981     CSliceUnref(a->slice);
982     delete a;
983   }
984 
SendShutdown(Channel * channel,bool send_goaway,grpc_error_handle send_disconnect)985   static void SendShutdown(Channel* channel, bool send_goaway,
986                            grpc_error_handle send_disconnect) {
987     ShutdownCleanupArgs* sc = new ShutdownCleanupArgs;
988     GRPC_CLOSURE_INIT(&sc->closure, ShutdownCleanup, sc,
989                       grpc_schedule_on_exec_ctx);
990     grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
991     grpc_channel_element* elem;
992     op->goaway_error =
993         send_goaway
994             ? grpc_error_set_int(GRPC_ERROR_CREATE("Server shutdown"),
995                                  StatusIntProperty::kRpcStatus, GRPC_STATUS_OK)
996             : absl::OkStatus();
997     sc->slice = grpc_slice_from_copied_string("Server shutdown");
998     op->disconnect_with_error = send_disconnect;
999     elem = grpc_channel_stack_element(channel->channel_stack(), 0);
1000     elem->filter->start_transport_op(elem, op);
1001   }
1002 
1003   std::vector<RefCountedPtr<Channel>> channels_;
1004 };
1005 
1006 }  // namespace
1007 
1008 //
1009 // Server::TransportConnectivityWatcher
1010 //
1011 
1012 class Server::TransportConnectivityWatcher
1013     : public AsyncConnectivityStateWatcherInterface {
1014  public:
TransportConnectivityWatcher(RefCountedPtr<ServerTransport> transport,RefCountedPtr<Server> server)1015   TransportConnectivityWatcher(RefCountedPtr<ServerTransport> transport,
1016                                RefCountedPtr<Server> server)
1017       : transport_(std::move(transport)), server_(std::move(server)) {}
1018 
1019  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status &)1020   void OnConnectivityStateChange(grpc_connectivity_state new_state,
1021                                  const absl::Status& /*status*/) override {
1022     // Don't do anything until we are being shut down.
1023     if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
1024     // Shut down channel.
1025     MutexLock lock(&server_->mu_global_);
1026     server_->connections_.erase(transport_.get());
1027     --server_->connections_open_;
1028     server_->MaybeFinishShutdown();
1029   }
1030 
1031   RefCountedPtr<ServerTransport> transport_;
1032   RefCountedPtr<Server> server_;
1033 };
1034 
1035 //
1036 // Server
1037 //
1038 
1039 const grpc_channel_filter Server::kServerTopFilter = {
1040     Server::CallData::StartTransportStreamOpBatch,
1041     grpc_channel_next_op,
1042     sizeof(Server::CallData),
1043     Server::CallData::InitCallElement,
1044     grpc_call_stack_ignore_set_pollset_or_pollset_set,
1045     Server::CallData::DestroyCallElement,
1046     sizeof(Server::ChannelData),
1047     Server::ChannelData::InitChannelElement,
1048     grpc_channel_stack_no_post_init,
1049     Server::ChannelData::DestroyChannelElement,
1050     grpc_channel_next_get_info,
1051     GRPC_UNIQUE_TYPE_NAME_HERE("server"),
1052 };
1053 
1054 namespace {
1055 
CreateChannelzNode(const ChannelArgs & args)1056 RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
1057     const ChannelArgs& args) {
1058   RefCountedPtr<channelz::ServerNode> channelz_node;
1059   if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
1060           .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
1061     size_t channel_tracer_max_memory = std::max(
1062         0, args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
1063                .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT));
1064     channelz_node =
1065         MakeRefCounted<channelz::ServerNode>(channel_tracer_max_memory);
1066     channelz_node->AddTraceEvent(
1067         channelz::ChannelTrace::Severity::Info,
1068         grpc_slice_from_static_string("Server created"));
1069   }
1070   return channelz_node;
1071 }
1072 
CheckClientMetadata(ValueOrFailure<ClientMetadataHandle> md)1073 absl::StatusOr<ClientMetadataHandle> CheckClientMetadata(
1074     ValueOrFailure<ClientMetadataHandle> md) {
1075   if (!md.ok()) {
1076     return absl::InternalError("Error reading metadata");
1077   }
1078   if (!md.value()->get_pointer(HttpPathMetadata())) {
1079     return absl::InternalError("Missing :path header");
1080   }
1081   if (!md.value()->get_pointer(HttpAuthorityMetadata())) {
1082     return absl::InternalError("Missing :authority header");
1083   }
1084   return std::move(*md);
1085 }
1086 }  // namespace
1087 
MatchAndPublishCall(CallHandler call_handler)1088 auto Server::MatchAndPublishCall(CallHandler call_handler) {
1089   call_handler.SpawnGuardedUntilCallCompletes(
1090       "request_matcher", [this, call_handler]() mutable {
1091         return TrySeq(
1092             // Wait for initial metadata to pass through all filters
1093             Map(call_handler.PullClientInitialMetadata(), CheckClientMetadata),
1094             // Match request with requested call
1095             [this, call_handler](ClientMetadataHandle md) mutable {
1096               auto* registered_method = static_cast<RegisteredMethod*>(
1097                   md->get(GrpcRegisteredMethod()).value_or(nullptr));
1098               RequestMatcherInterface* rm;
1099               grpc_server_register_method_payload_handling payload_handling =
1100                   GRPC_SRM_PAYLOAD_NONE;
1101               if (registered_method == nullptr) {
1102                 rm = unregistered_request_matcher_.get();
1103               } else {
1104                 payload_handling = registered_method->payload_handling;
1105                 rm = registered_method->matcher.get();
1106               }
1107               using FirstMessageResult =
1108                   ValueOrFailure<absl::optional<MessageHandle>>;
1109               auto maybe_read_first_message = If(
1110                   payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
1111                   [call_handler]() mutable {
1112                     return Map(
1113                         call_handler.PullMessage(),
1114                         [](ClientToServerNextMessage next_msg)
1115                             -> FirstMessageResult {
1116                           if (!next_msg.ok()) return Failure{};
1117                           if (!next_msg.has_value()) {
1118                             return FirstMessageResult(absl::nullopt);
1119                           }
1120                           return FirstMessageResult(next_msg.TakeValue());
1121                         });
1122                   },
1123                   []() -> FirstMessageResult {
1124                     return FirstMessageResult(absl::nullopt);
1125                   });
1126               return TryJoin<absl::StatusOr>(
1127                   std::move(maybe_read_first_message), rm->MatchRequest(0),
1128                   [md = std::move(md)]() mutable {
1129                     return ValueOrFailure<ClientMetadataHandle>(std::move(md));
1130                   });
1131             },
1132             // Publish call to cq
1133             [call_handler,
1134              this](std::tuple<absl::optional<MessageHandle>,
1135                               RequestMatcherInterface::MatchResult,
1136                               ClientMetadataHandle>
1137                        r) {
1138               RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
1139               auto md = std::move(std::get<2>(r));
1140               auto* rc = mr.TakeCall();
1141               rc->Complete(std::move(std::get<0>(r)), *md);
1142               grpc_call* call =
1143                   MakeServerCall(call_handler, std::move(md), this,
1144                                  rc->cq_bound_to_call, rc->initial_metadata);
1145               *rc->call = call;
1146               return Map(
1147                   WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
1148                   [rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
1149                     return absl::OkStatus();
1150                   });
1151             });
1152       });
1153 }
1154 
1155 absl::StatusOr<RefCountedPtr<UnstartedCallDestination>>
MakeCallDestination(const ChannelArgs & args)1156 Server::MakeCallDestination(const ChannelArgs& args) {
1157   InterceptionChainBuilder builder(args);
1158   // TODO(ctiller): find a way to avoid adding a server ref per call
1159   builder.AddOnClientInitialMetadata([self = Ref()](ClientMetadata& md) {
1160     self->SetRegisteredMethodOnMetadata(md);
1161   });
1162   CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
1163       GRPC_SERVER_CHANNEL, builder);
1164   return builder.Build(
1165       MakeCallDestinationFromHandlerFunction([this](CallHandler handler) {
1166         return MatchAndPublishCall(std::move(handler));
1167       }));
1168 }
1169 
Server(const ChannelArgs & args)1170 Server::Server(const ChannelArgs& args)
1171     : channel_args_(args),
1172       channelz_node_(CreateChannelzNode(args)),
1173       server_call_tracer_factory_(ServerCallTracerFactory::Get(args)),
1174       compression_options_(CompressionOptionsFromChannelArgs(args)),
1175       max_time_in_pending_queue_(Duration::Seconds(
1176           channel_args_
1177               .GetInt(GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS)
1178               .value_or(30))) {}
1179 
~Server()1180 Server::~Server() {
1181   // Remove the cq pollsets from the config_fetcher.
1182   if (started_ && config_fetcher_ != nullptr &&
1183       config_fetcher_->interested_parties() != nullptr) {
1184     for (grpc_pollset* pollset : pollsets_) {
1185       grpc_pollset_set_del_pollset(config_fetcher_->interested_parties(),
1186                                    pollset);
1187     }
1188   }
1189   for (size_t i = 0; i < cqs_.size(); i++) {
1190     GRPC_CQ_INTERNAL_UNREF(cqs_[i], "server");
1191   }
1192 }
1193 
AddListener(OrphanablePtr<ListenerInterface> listener)1194 void Server::AddListener(OrphanablePtr<ListenerInterface> listener) {
1195   channelz::ListenSocketNode* listen_socket_node =
1196       listener->channelz_listen_socket_node();
1197   if (listen_socket_node != nullptr && channelz_node_ != nullptr) {
1198     channelz_node_->AddChildListenSocket(
1199         listen_socket_node->RefAsSubclass<channelz::ListenSocketNode>());
1200   }
1201   ListenerInterface* ptr = listener.get();
1202   listener_states_.emplace_back(
1203       MakeRefCounted<ListenerState>(Ref(), std::move(listener)));
1204   ptr->SetServerListenerState(listener_states_.back());
1205 }
1206 
Start()1207 void Server::Start() {
1208   started_ = true;
1209   for (grpc_completion_queue* cq : cqs_) {
1210     if (grpc_cq_can_listen(cq)) {
1211       pollsets_.push_back(grpc_cq_pollset(cq));
1212     }
1213   }
1214   if (unregistered_request_matcher_ == nullptr) {
1215     unregistered_request_matcher_ = std::make_unique<RealRequestMatcher>(this);
1216   }
1217   for (auto& rm : registered_methods_) {
1218     if (rm.second->matcher == nullptr) {
1219       rm.second->matcher = std::make_unique<RealRequestMatcher>(this);
1220     }
1221   }
1222   {
1223     MutexLock lock(&mu_global_);
1224     starting_ = true;
1225   }
1226   // Register the interested parties from the config fetcher to the cq pollsets
1227   // before starting listeners so that config fetcher is being polled when the
1228   // listeners start watch the fetcher.
1229   if (config_fetcher_ != nullptr &&
1230       config_fetcher_->interested_parties() != nullptr) {
1231     for (grpc_pollset* pollset : pollsets_) {
1232       grpc_pollset_set_add_pollset(config_fetcher_->interested_parties(),
1233                                    pollset);
1234     }
1235   }
1236   for (auto& listener_state : listener_states_) {
1237     listener_state->Start();
1238   }
1239   MutexLock lock(&mu_global_);
1240   starting_ = false;
1241   starting_cv_.Signal();
1242 }
1243 
SetupTransport(Transport * transport,grpc_pollset * accepting_pollset,const ChannelArgs & args,const RefCountedPtr<channelz::SocketNode> & socket_node)1244 grpc_error_handle Server::SetupTransport(
1245     Transport* transport, grpc_pollset* accepting_pollset,
1246     const ChannelArgs& args,
1247     const RefCountedPtr<channelz::SocketNode>& socket_node) {
1248   GRPC_LATENT_SEE_INNER_SCOPE("Server::SetupTransport");
1249   // Create channel.
1250   global_stats().IncrementServerChannelsCreated();
1251   // Set up channelz node.
1252   if (transport->server_transport() != nullptr) {
1253     // Take ownership
1254     // TODO(ctiller): post-v3-transition make this method take an
1255     // OrphanablePtr<ServerTransport> directly.
1256     OrphanablePtr<ServerTransport> t(transport->server_transport());
1257     auto destination = MakeCallDestination(args.SetObject(transport));
1258     if (!destination.ok()) {
1259       return absl_status_to_grpc_error(destination.status());
1260     }
1261     // TODO(ctiller): add channelz node
1262     t->SetCallDestination(std::move(*destination));
1263     MutexLock lock(&mu_global_);
1264     if (ShutdownCalled()) {
1265       t->DisconnectWithError(GRPC_ERROR_CREATE("Server shutdown"));
1266     }
1267     t->StartConnectivityWatch(MakeOrphanable<TransportConnectivityWatcher>(
1268         t->RefAsSubclass<ServerTransport>(), Ref()));
1269     GRPC_TRACE_LOG(server_channel, INFO) << "Adding connection";
1270     connections_.emplace(std::move(t));
1271     ++connections_open_;
1272   } else {
1273     CHECK(transport->filter_stack_transport() != nullptr);
1274     absl::StatusOr<RefCountedPtr<Channel>> channel = LegacyChannel::Create(
1275         "", args.SetObject(transport), GRPC_SERVER_CHANNEL);
1276     if (!channel.ok()) {
1277       return absl_status_to_grpc_error(channel.status());
1278     }
1279     CHECK(*channel != nullptr);
1280     auto* channel_stack = (*channel)->channel_stack();
1281     CHECK(channel_stack != nullptr);
1282     ChannelData* chand = static_cast<ChannelData*>(
1283         grpc_channel_stack_element(channel_stack, 0)->channel_data);
1284     // Set up CQs.
1285     size_t cq_idx;
1286     for (cq_idx = 0; cq_idx < cqs_.size(); cq_idx++) {
1287       if (grpc_cq_pollset(cqs_[cq_idx]) == accepting_pollset) break;
1288     }
1289     if (cq_idx == cqs_.size()) {
1290       // Completion queue not found.  Pick a random one to publish new calls to.
1291       cq_idx = static_cast<size_t>(rand()) % std::max<size_t>(1, cqs_.size());
1292     }
1293     intptr_t channelz_socket_uuid = 0;
1294     if (socket_node != nullptr) {
1295       channelz_socket_uuid = socket_node->uuid();
1296       channelz_node_->AddChildSocket(socket_node);
1297     }
1298     // Initialize chand.
1299     chand->InitTransport(Ref(), std::move(*channel), cq_idx, transport,
1300                          channelz_socket_uuid);
1301   }
1302   return absl::OkStatus();
1303 }
1304 
HasOpenConnections()1305 bool Server::HasOpenConnections() {
1306   MutexLock lock(&mu_global_);
1307   return !channels_.empty() || !connections_.empty();
1308 }
1309 
SetRegisteredMethodAllocator(grpc_completion_queue * cq,void * method_tag,std::function<RegisteredCallAllocation ()> allocator)1310 void Server::SetRegisteredMethodAllocator(
1311     grpc_completion_queue* cq, void* method_tag,
1312     std::function<RegisteredCallAllocation()> allocator) {
1313   RegisteredMethod* rm = static_cast<RegisteredMethod*>(method_tag);
1314   rm->matcher = std::make_unique<AllocatingRequestMatcherRegistered>(
1315       this, cq, rm, std::move(allocator));
1316 }
1317 
SetBatchMethodAllocator(grpc_completion_queue * cq,std::function<BatchCallAllocation ()> allocator)1318 void Server::SetBatchMethodAllocator(
1319     grpc_completion_queue* cq, std::function<BatchCallAllocation()> allocator) {
1320   DCHECK(unregistered_request_matcher_ == nullptr);
1321   unregistered_request_matcher_ =
1322       std::make_unique<AllocatingRequestMatcherBatch>(this, cq,
1323                                                       std::move(allocator));
1324 }
1325 
RegisterCompletionQueue(grpc_completion_queue * cq)1326 void Server::RegisterCompletionQueue(grpc_completion_queue* cq) {
1327   for (grpc_completion_queue* queue : cqs_) {
1328     if (queue == cq) return;
1329   }
1330   GRPC_CQ_INTERNAL_REF(cq, "server");
1331   cqs_.push_back(cq);
1332 }
1333 
RegisterMethod(const char * method,const char * host,grpc_server_register_method_payload_handling payload_handling,uint32_t flags)1334 Server::RegisteredMethod* Server::RegisterMethod(
1335     const char* method, const char* host,
1336     grpc_server_register_method_payload_handling payload_handling,
1337     uint32_t flags) {
1338   if (started_) {
1339     Crash("Attempting to register method after server started");
1340   }
1341 
1342   if (!method) {
1343     LOG(ERROR) << "grpc_server_register_method method string cannot be NULL";
1344     return nullptr;
1345   }
1346   auto key = std::make_pair(host ? host : "", method);
1347   if (registered_methods_.find(key) != registered_methods_.end()) {
1348     LOG(ERROR) << "duplicate registration for " << method << "@"
1349                << (host ? host : "*");
1350     return nullptr;
1351   }
1352   if (flags != 0) {
1353     LOG(ERROR) << "grpc_server_register_method invalid flags "
1354                << absl::StrFormat("0x%08x", flags);
1355     return nullptr;
1356   }
1357   auto it = registered_methods_.emplace(
1358       key, std::make_unique<RegisteredMethod>(method, host, payload_handling,
1359                                               flags));
1360   return it.first->second.get();
1361 }
1362 
DoneRequestEvent(void * req,grpc_cq_completion *)1363 void Server::DoneRequestEvent(void* req, grpc_cq_completion* /*c*/) {
1364   delete static_cast<RequestedCall*>(req);
1365 }
1366 
FailCall(size_t cq_idx,RequestedCall * rc,grpc_error_handle error)1367 void Server::FailCall(size_t cq_idx, RequestedCall* rc,
1368                       grpc_error_handle error) {
1369   *rc->call = nullptr;
1370   rc->initial_metadata->count = 0;
1371   CHECK(!error.ok());
1372   grpc_cq_end_op(cqs_[cq_idx], rc->tag, error, DoneRequestEvent, rc,
1373                  &rc->completion);
1374 }
1375 
1376 // Before calling MaybeFinishShutdown(), we must hold mu_global_ and not
1377 // hold mu_call_.
MaybeFinishShutdown()1378 void Server::MaybeFinishShutdown() {
1379   if (!ShutdownReady() || shutdown_published_) {
1380     return;
1381   }
1382   {
1383     MutexLock lock(&mu_call_);
1384     KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
1385   }
1386   if (!channels_.empty() || connections_open_ > 0 ||
1387       listeners_destroyed_ < listener_states_.size()) {
1388     if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
1389                                   last_shutdown_message_time_),
1390                      gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
1391       last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
1392       VLOG(2) << "Waiting for " << channels_.size() << " channels "
1393               << connections_open_ << " connections and "
1394               << listener_states_.size() - listeners_destroyed_ << "/"
1395               << listener_states_.size()
1396               << " listeners to be destroyed before shutting down server";
1397     }
1398     return;
1399   }
1400   shutdown_published_ = true;
1401   for (auto& shutdown_tag : shutdown_tags_) {
1402     Ref().release();
1403     grpc_cq_end_op(shutdown_tag.cq, shutdown_tag.tag, absl::OkStatus(),
1404                    DoneShutdownEvent, this, &shutdown_tag.completion);
1405   }
1406 }
1407 
KillPendingWorkLocked(grpc_error_handle error)1408 void Server::KillPendingWorkLocked(grpc_error_handle error) {
1409   if (started_) {
1410     unregistered_request_matcher_->KillRequests(error);
1411     unregistered_request_matcher_->ZombifyPending();
1412     for (auto& rm : registered_methods_) {
1413       rm.second->matcher->KillRequests(error);
1414       rm.second->matcher->ZombifyPending();
1415     }
1416   }
1417 }
1418 
GetChannelsLocked() const1419 std::vector<RefCountedPtr<Channel>> Server::GetChannelsLocked() const {
1420   std::vector<RefCountedPtr<Channel>> channels;
1421   channels.reserve(channels_.size());
1422   for (const ChannelData* chand : channels_) {
1423     channels.push_back(chand->channel()->RefAsSubclass<Channel>());
1424   }
1425   return channels;
1426 }
1427 
ListenerDestroyDone(void * arg,grpc_error_handle)1428 void Server::ListenerDestroyDone(void* arg, grpc_error_handle /*error*/) {
1429   Server* server = static_cast<Server*>(arg);
1430   MutexLock lock(&server->mu_global_);
1431   server->listeners_destroyed_++;
1432   server->MaybeFinishShutdown();
1433 }
1434 
1435 namespace {
1436 
DonePublishedShutdown(void *,grpc_cq_completion * storage)1437 void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) {
1438   delete storage;
1439 }
1440 
1441 }  // namespace
1442 
1443 // - Kills all pending requests-for-incoming-RPC-calls (i.e., the requests made
1444 //   via grpc_server_request_call() and grpc_server_request_registered_call()
1445 //   will now be cancelled). See KillPendingWorkLocked().
1446 //
1447 // - Shuts down the listeners (i.e., the server will no longer listen on the
1448 //   port for new incoming channels).
1449 //
1450 // - Iterates through all channels on the server and sends shutdown msg (see
1451 //   ChannelBroadcaster::BroadcastShutdown() for details) to the clients via
1452 //   the transport layer. The transport layer then guarantees the following:
1453 //    -- Sends shutdown to the client (e.g., HTTP2 transport sends GOAWAY).
1454 //    -- If the server has outstanding calls that are in the process, the
1455 //       connection is NOT closed until the server is done with all those calls.
1456 //    -- Once there are no more calls in progress, the channel is closed.
ShutdownAndNotify(grpc_completion_queue * cq,void * tag)1457 void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
1458   ChannelBroadcaster broadcaster;
1459   absl::flat_hash_set<OrphanablePtr<ServerTransport>> removing_connections;
1460   {
1461     // Wait for startup to be finished.  Locks mu_global.
1462     MutexLock lock(&mu_global_);
1463     while (starting_) {
1464       starting_cv_.Wait(&mu_global_);
1465     }
1466     // Stay locked, and gather up some stuff to do.
1467     CHECK(grpc_cq_begin_op(cq, tag));
1468     if (shutdown_published_) {
1469       grpc_cq_end_op(cq, tag, absl::OkStatus(), DonePublishedShutdown, nullptr,
1470                      new grpc_cq_completion);
1471       return;
1472     }
1473     shutdown_tags_.emplace_back(tag, cq);
1474     if (ShutdownCalled()) {
1475       return;
1476     }
1477     last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
1478     broadcaster.FillChannelsLocked(GetChannelsLocked());
1479     removing_connections.swap(connections_);
1480     // Collect all unregistered then registered calls.
1481     {
1482       MutexLock lock(&mu_call_);
1483       KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
1484     }
1485     ShutdownUnrefOnShutdownCall();
1486   }
1487   StopListening();
1488   broadcaster.BroadcastShutdown(/*send_goaway=*/true, absl::OkStatus());
1489 }
1490 
StopListening()1491 void Server::StopListening() {
1492   for (auto& listener_state : listener_states_) {
1493     if (listener_state->listener() == nullptr) continue;
1494     channelz::ListenSocketNode* channelz_listen_socket_node =
1495         listener_state->listener()->channelz_listen_socket_node();
1496     if (channelz_node_ != nullptr && channelz_listen_socket_node != nullptr) {
1497       channelz_node_->RemoveChildListenSocket(
1498           channelz_listen_socket_node->uuid());
1499     }
1500     listener_state->Stop();
1501   }
1502 }
1503 
CancelAllCalls()1504 void Server::CancelAllCalls() {
1505   ChannelBroadcaster broadcaster;
1506   {
1507     MutexLock lock(&mu_global_);
1508     broadcaster.FillChannelsLocked(GetChannelsLocked());
1509   }
1510   broadcaster.BroadcastShutdown(
1511       /*send_goaway=*/false, GRPC_ERROR_CREATE("Cancelling all calls"));
1512 }
1513 
SendGoaways()1514 void Server::SendGoaways() {
1515   ChannelBroadcaster broadcaster;
1516   {
1517     MutexLock lock(&mu_global_);
1518     broadcaster.FillChannelsLocked(GetChannelsLocked());
1519   }
1520   broadcaster.BroadcastShutdown(/*send_goaway=*/true, absl::OkStatus());
1521 }
1522 
Orphan()1523 void Server::Orphan() {
1524   {
1525     MutexLock lock(&mu_global_);
1526     CHECK(ShutdownCalled() || listener_states_.empty());
1527     CHECK(listeners_destroyed_ == listener_states_.size());
1528   }
1529   listener_states_.clear();
1530   Unref();
1531 }
1532 
ValidateServerRequest(grpc_completion_queue * cq_for_notification,void * tag,grpc_byte_buffer ** optional_payload,RegisteredMethod * rm)1533 grpc_call_error Server::ValidateServerRequest(
1534     grpc_completion_queue* cq_for_notification, void* tag,
1535     grpc_byte_buffer** optional_payload, RegisteredMethod* rm) {
1536   if ((rm == nullptr && optional_payload != nullptr) ||
1537       ((rm != nullptr) && ((optional_payload == nullptr) !=
1538                            (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) {
1539     return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
1540   }
1541   if (!grpc_cq_begin_op(cq_for_notification, tag)) {
1542     return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
1543   }
1544   return GRPC_CALL_OK;
1545 }
1546 
ValidateServerRequestAndCq(size_t * cq_idx,grpc_completion_queue * cq_for_notification,void * tag,grpc_byte_buffer ** optional_payload,RegisteredMethod * rm)1547 grpc_call_error Server::ValidateServerRequestAndCq(
1548     size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
1549     grpc_byte_buffer** optional_payload, RegisteredMethod* rm) {
1550   size_t idx;
1551   for (idx = 0; idx < cqs_.size(); idx++) {
1552     if (cqs_[idx] == cq_for_notification) {
1553       break;
1554     }
1555   }
1556   if (idx == cqs_.size()) {
1557     return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1558   }
1559   grpc_call_error error =
1560       ValidateServerRequest(cq_for_notification, tag, optional_payload, rm);
1561   if (error != GRPC_CALL_OK) {
1562     return error;
1563   }
1564   *cq_idx = idx;
1565   return GRPC_CALL_OK;
1566 }
1567 
QueueRequestedCall(size_t cq_idx,RequestedCall * rc)1568 grpc_call_error Server::QueueRequestedCall(size_t cq_idx, RequestedCall* rc) {
1569   if (ShutdownCalled()) {
1570     FailCall(cq_idx, rc, GRPC_ERROR_CREATE("Server Shutdown"));
1571     return GRPC_CALL_OK;
1572   }
1573   RequestMatcherInterface* rm;
1574   switch (rc->type) {
1575     case RequestedCall::Type::BATCH_CALL:
1576       rm = unregistered_request_matcher_.get();
1577       break;
1578     case RequestedCall::Type::REGISTERED_CALL:
1579       rm = rc->data.registered.method->matcher.get();
1580       break;
1581   }
1582   rm->RequestCallWithPossiblePublish(cq_idx, rc);
1583   return GRPC_CALL_OK;
1584 }
1585 
RequestCall(grpc_call ** call,grpc_call_details * details,grpc_metadata_array * request_metadata,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1586 grpc_call_error Server::RequestCall(grpc_call** call,
1587                                     grpc_call_details* details,
1588                                     grpc_metadata_array* request_metadata,
1589                                     grpc_completion_queue* cq_bound_to_call,
1590                                     grpc_completion_queue* cq_for_notification,
1591                                     void* tag) {
1592   size_t cq_idx;
1593   grpc_call_error error = ValidateServerRequestAndCq(
1594       &cq_idx, cq_for_notification, tag, nullptr, nullptr);
1595   if (error != GRPC_CALL_OK) {
1596     return error;
1597   }
1598   RequestedCall* rc =
1599       new RequestedCall(tag, cq_bound_to_call, call, request_metadata, details);
1600   return QueueRequestedCall(cq_idx, rc);
1601 }
1602 
RequestRegisteredCall(RegisteredMethod * rm,grpc_call ** call,gpr_timespec * deadline,grpc_metadata_array * request_metadata,grpc_byte_buffer ** optional_payload,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag_new)1603 grpc_call_error Server::RequestRegisteredCall(
1604     RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
1605     grpc_metadata_array* request_metadata, grpc_byte_buffer** optional_payload,
1606     grpc_completion_queue* cq_bound_to_call,
1607     grpc_completion_queue* cq_for_notification, void* tag_new) {
1608   size_t cq_idx;
1609   grpc_call_error error = ValidateServerRequestAndCq(
1610       &cq_idx, cq_for_notification, tag_new, optional_payload, rm);
1611   if (error != GRPC_CALL_OK) {
1612     return error;
1613   }
1614   RequestedCall* rc =
1615       new RequestedCall(tag_new, cq_bound_to_call, call, request_metadata, rm,
1616                         deadline, optional_payload);
1617   return QueueRequestedCall(cq_idx, rc);
1618 }
1619 
1620 //
1621 // Server::ChannelData::ConnectivityWatcher
1622 //
1623 
1624 class Server::ChannelData::ConnectivityWatcher
1625     : public AsyncConnectivityStateWatcherInterface {
1626  public:
ConnectivityWatcher(ChannelData * chand)1627   explicit ConnectivityWatcher(ChannelData* chand)
1628       : chand_(chand), channel_(chand_->channel_->RefAsSubclass<Channel>()) {}
1629 
1630  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status &)1631   void OnConnectivityStateChange(grpc_connectivity_state new_state,
1632                                  const absl::Status& /*status*/) override {
1633     // Don't do anything until we are being shut down.
1634     if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
1635     // Shut down channel.
1636     MutexLock lock(&chand_->server_->mu_global_);
1637     chand_->Destroy();
1638   }
1639 
1640   ChannelData* const chand_;
1641   const RefCountedPtr<Channel> channel_;
1642 };
1643 
1644 //
1645 // Server::ChannelData
1646 //
1647 
~ChannelData()1648 Server::ChannelData::~ChannelData() {
1649   if (server_ != nullptr) {
1650     if (server_->channelz_node_ != nullptr && channelz_socket_uuid_ != 0) {
1651       server_->channelz_node_->RemoveChildSocket(channelz_socket_uuid_);
1652     }
1653     {
1654       MutexLock lock(&server_->mu_global_);
1655       if (list_position_.has_value()) {
1656         server_->channels_.erase(*list_position_);
1657         list_position_.reset();
1658       }
1659       server_->MaybeFinishShutdown();
1660     }
1661   }
1662 }
1663 
InitTransport(RefCountedPtr<Server> server,RefCountedPtr<Channel> channel,size_t cq_idx,Transport * transport,intptr_t channelz_socket_uuid)1664 void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
1665                                         RefCountedPtr<Channel> channel,
1666                                         size_t cq_idx, Transport* transport,
1667                                         intptr_t channelz_socket_uuid) {
1668   server_ = std::move(server);
1669   channel_ = std::move(channel);
1670   cq_idx_ = cq_idx;
1671   channelz_socket_uuid_ = channelz_socket_uuid;
1672   // Publish channel.
1673   {
1674     MutexLock lock(&server_->mu_global_);
1675     server_->channels_.push_front(this);
1676     list_position_ = server_->channels_.begin();
1677   }
1678   // Start accept_stream transport op.
1679   grpc_transport_op* op = grpc_make_transport_op(nullptr);
1680   CHECK(transport->filter_stack_transport() != nullptr);
1681   op->set_accept_stream = true;
1682   op->set_accept_stream_fn = AcceptStream;
1683   op->set_registered_method_matcher_fn = [](void* arg,
1684                                             ClientMetadata* metadata) {
1685     static_cast<ChannelData*>(arg)->server_->SetRegisteredMethodOnMetadata(
1686         *metadata);
1687   };
1688   op->set_accept_stream_user_data = this;
1689   op->start_connectivity_watch = MakeOrphanable<ConnectivityWatcher>(this);
1690   if (server_->ShutdownCalled()) {
1691     op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
1692   }
1693   transport->PerformOp(op);
1694 }
1695 
GetRegisteredMethod(const absl::string_view & host,const absl::string_view & path)1696 Server::RegisteredMethod* Server::GetRegisteredMethod(
1697     const absl::string_view& host, const absl::string_view& path) {
1698   if (registered_methods_.empty()) return nullptr;
1699   // check for an exact match with host
1700   auto it = registered_methods_.find(std::make_pair(host, path));
1701   if (it != registered_methods_.end()) {
1702     return it->second.get();
1703   }
1704   // check for wildcard method definition (no host set)
1705   it = registered_methods_.find(std::make_pair("", path));
1706   if (it != registered_methods_.end()) {
1707     return it->second.get();
1708   }
1709   return nullptr;
1710 }
1711 
SetRegisteredMethodOnMetadata(ClientMetadata & metadata)1712 void Server::SetRegisteredMethodOnMetadata(ClientMetadata& metadata) {
1713   auto* authority = metadata.get_pointer(HttpAuthorityMetadata());
1714   if (authority == nullptr) {
1715     authority = metadata.get_pointer(HostMetadata());
1716     if (authority == nullptr) {
1717       // Authority not being set is an RPC error.
1718       return;
1719     }
1720   }
1721   auto* path = metadata.get_pointer(HttpPathMetadata());
1722   if (path == nullptr) {
1723     // Path not being set would result in an RPC error.
1724     return;
1725   }
1726   RegisteredMethod* method =
1727       GetRegisteredMethod(authority->as_string_view(), path->as_string_view());
1728   // insert in metadata
1729   metadata.Set(GrpcRegisteredMethod(), method);
1730 }
1731 
AcceptStream(void * arg,Transport *,const void * transport_server_data)1732 void Server::ChannelData::AcceptStream(void* arg, Transport* /*transport*/,
1733                                        const void* transport_server_data) {
1734   auto* chand = static_cast<Server::ChannelData*>(arg);
1735   // create a call
1736   grpc_call_create_args args;
1737   args.channel = chand->channel_->RefAsSubclass<Channel>();
1738   args.server = chand->server_.get();
1739   args.parent = nullptr;
1740   args.propagation_mask = 0;
1741   args.cq = nullptr;
1742   args.pollset_set_alternative = nullptr;
1743   args.server_transport_data = transport_server_data;
1744   args.send_deadline = Timestamp::InfFuture();
1745   grpc_call* call;
1746   grpc_error_handle error = grpc_call_create(&args, &call);
1747   grpc_call_stack* call_stack = grpc_call_get_call_stack(call);
1748   CHECK_NE(call_stack, nullptr);
1749   grpc_call_element* elem = grpc_call_stack_element(call_stack, 0);
1750   auto* calld = static_cast<Server::CallData*>(elem->call_data);
1751   if (!error.ok()) {
1752     calld->FailCallCreation();
1753     return;
1754   }
1755   calld->Start(elem);
1756 }
1757 
FinishDestroy(void * arg,grpc_error_handle)1758 void Server::ChannelData::FinishDestroy(void* arg,
1759                                         grpc_error_handle /*error*/) {
1760   auto* chand = static_cast<Server::ChannelData*>(arg);
1761   Server* server = chand->server_.get();
1762   auto* channel_stack = chand->channel_->channel_stack();
1763   chand->channel_.reset();
1764   server->Unref();
1765   GRPC_CHANNEL_STACK_UNREF(channel_stack, "Server::ChannelData::Destroy");
1766 }
1767 
Destroy()1768 void Server::ChannelData::Destroy() {
1769   if (!list_position_.has_value()) return;
1770   CHECK(server_ != nullptr);
1771   server_->channels_.erase(*list_position_);
1772   list_position_.reset();
1773   server_->Ref().release();
1774   server_->MaybeFinishShutdown();
1775   // Unreffed by FinishDestroy
1776   GRPC_CHANNEL_STACK_REF(channel_->channel_stack(),
1777                          "Server::ChannelData::Destroy");
1778   GRPC_CLOSURE_INIT(&finish_destroy_channel_closure_, FinishDestroy, this,
1779                     grpc_schedule_on_exec_ctx);
1780   GRPC_TRACE_LOG(server_channel, INFO) << "Disconnected client";
1781   grpc_transport_op* op =
1782       grpc_make_transport_op(&finish_destroy_channel_closure_);
1783   op->set_accept_stream = true;
1784   grpc_channel_next_op(grpc_channel_stack_element(channel_->channel_stack(), 0),
1785                        op);
1786 }
1787 
InitChannelElement(grpc_channel_element * elem,grpc_channel_element_args * args)1788 grpc_error_handle Server::ChannelData::InitChannelElement(
1789     grpc_channel_element* elem, grpc_channel_element_args* args) {
1790   CHECK(args->is_first);
1791   CHECK(!args->is_last);
1792   new (elem->channel_data) ChannelData();
1793   return absl::OkStatus();
1794 }
1795 
DestroyChannelElement(grpc_channel_element * elem)1796 void Server::ChannelData::DestroyChannelElement(grpc_channel_element* elem) {
1797   auto* chand = static_cast<ChannelData*>(elem->channel_data);
1798   chand->~ChannelData();
1799 }
1800 
1801 //
1802 // Server::CallData
1803 //
1804 
CallData(grpc_call_element * elem,const grpc_call_element_args & args,RefCountedPtr<Server> server)1805 Server::CallData::CallData(grpc_call_element* elem,
1806                            const grpc_call_element_args& args,
1807                            RefCountedPtr<Server> server)
1808     : server_(std::move(server)),
1809       call_(grpc_call_from_top_element(elem)),
1810       call_combiner_(args.call_combiner) {
1811   GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
1812                     elem, grpc_schedule_on_exec_ctx);
1813   GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
1814                     elem, grpc_schedule_on_exec_ctx);
1815 }
1816 
~CallData()1817 Server::CallData::~CallData() {
1818   CHECK(state_.load(std::memory_order_relaxed) != CallState::PENDING);
1819   grpc_metadata_array_destroy(&initial_metadata_);
1820   grpc_byte_buffer_destroy(payload_);
1821 }
1822 
SetState(CallState state)1823 void Server::CallData::SetState(CallState state) {
1824   state_.store(state, std::memory_order_relaxed);
1825 }
1826 
MaybeActivate()1827 bool Server::CallData::MaybeActivate() {
1828   CallState expected = CallState::PENDING;
1829   return state_.compare_exchange_strong(expected, CallState::ACTIVATED,
1830                                         std::memory_order_acq_rel,
1831                                         std::memory_order_relaxed);
1832 }
1833 
FailCallCreation()1834 void Server::CallData::FailCallCreation() {
1835   CallState expected_not_started = CallState::NOT_STARTED;
1836   CallState expected_pending = CallState::PENDING;
1837   if (state_.compare_exchange_strong(expected_not_started, CallState::ZOMBIED,
1838                                      std::memory_order_acq_rel,
1839                                      std::memory_order_acquire)) {
1840     KillZombie();
1841   } else if (state_.compare_exchange_strong(
1842                  expected_pending, CallState::ZOMBIED,
1843                  std::memory_order_acq_rel, std::memory_order_relaxed)) {
1844     // Zombied call will be destroyed when it's removed from the pending
1845     // queue... later.
1846   }
1847 }
1848 
Start(grpc_call_element * elem)1849 void Server::CallData::Start(grpc_call_element* elem) {
1850   grpc_op op;
1851   op.op = GRPC_OP_RECV_INITIAL_METADATA;
1852   op.flags = 0;
1853   op.reserved = nullptr;
1854   op.data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_;
1855   GRPC_CLOSURE_INIT(&recv_initial_metadata_batch_complete_,
1856                     RecvInitialMetadataBatchComplete, elem,
1857                     grpc_schedule_on_exec_ctx);
1858   grpc_call_start_batch_and_execute(call_, &op, 1,
1859                                     &recv_initial_metadata_batch_complete_);
1860 }
1861 
Publish(size_t cq_idx,RequestedCall * rc)1862 void Server::CallData::Publish(size_t cq_idx, RequestedCall* rc) {
1863   grpc_call_set_completion_queue(call_, rc->cq_bound_to_call);
1864   *rc->call = call_;
1865   cq_new_ = server_->cqs_[cq_idx];
1866   std::swap(*rc->initial_metadata, initial_metadata_);
1867   switch (rc->type) {
1868     case RequestedCall::Type::BATCH_CALL:
1869       CHECK(host_.has_value());
1870       CHECK(path_.has_value());
1871       rc->data.batch.details->host = CSliceRef(host_->c_slice());
1872       rc->data.batch.details->method = CSliceRef(path_->c_slice());
1873       rc->data.batch.details->deadline =
1874           deadline_.as_timespec(GPR_CLOCK_MONOTONIC);
1875       break;
1876     case RequestedCall::Type::REGISTERED_CALL:
1877       *rc->data.registered.deadline =
1878           deadline_.as_timespec(GPR_CLOCK_MONOTONIC);
1879       if (rc->data.registered.optional_payload != nullptr) {
1880         *rc->data.registered.optional_payload = payload_;
1881         payload_ = nullptr;
1882       }
1883       break;
1884     default:
1885       GPR_UNREACHABLE_CODE(return);
1886   }
1887   grpc_cq_end_op(cq_new_, rc->tag, absl::OkStatus(), Server::DoneRequestEvent,
1888                  rc, &rc->completion, true);
1889 }
1890 
PublishNewRpc(void * arg,grpc_error_handle error)1891 void Server::CallData::PublishNewRpc(void* arg, grpc_error_handle error) {
1892   grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
1893   auto* calld = static_cast<Server::CallData*>(call_elem->call_data);
1894   auto* chand = static_cast<Server::ChannelData*>(call_elem->channel_data);
1895   RequestMatcherInterface* rm = calld->matcher_;
1896   Server* server = rm->server();
1897   if (!error.ok() || server->ShutdownCalled()) {
1898     calld->state_.store(CallState::ZOMBIED, std::memory_order_relaxed);
1899     calld->KillZombie();
1900     return;
1901   }
1902   rm->MatchOrQueue(chand->cq_idx(), calld);
1903 }
1904 
1905 namespace {
1906 
KillZombieClosure(void * call,grpc_error_handle)1907 void KillZombieClosure(void* call, grpc_error_handle /*error*/) {
1908   grpc_call_unref(static_cast<grpc_call*>(call));
1909 }
1910 
1911 }  // namespace
1912 
KillZombie()1913 void Server::CallData::KillZombie() {
1914   GRPC_CLOSURE_INIT(&kill_zombie_closure_, KillZombieClosure, call_,
1915                     grpc_schedule_on_exec_ctx);
1916   ExecCtx::Run(DEBUG_LOCATION, &kill_zombie_closure_, absl::OkStatus());
1917 }
1918 
1919 // If this changes, change MakeCallPromise too.
StartNewRpc(grpc_call_element * elem)1920 void Server::CallData::StartNewRpc(grpc_call_element* elem) {
1921   if (server_->ShutdownCalled()) {
1922     state_.store(CallState::ZOMBIED, std::memory_order_relaxed);
1923     KillZombie();
1924     return;
1925   }
1926   // Find request matcher.
1927   matcher_ = server_->unregistered_request_matcher_.get();
1928   grpc_server_register_method_payload_handling payload_handling =
1929       GRPC_SRM_PAYLOAD_NONE;
1930   if (path_.has_value() && host_.has_value()) {
1931     RegisteredMethod* rm = static_cast<RegisteredMethod*>(
1932         recv_initial_metadata_->get(GrpcRegisteredMethod()).value_or(nullptr));
1933     if (rm != nullptr) {
1934       matcher_ = rm->matcher.get();
1935       payload_handling = rm->payload_handling;
1936     }
1937   }
1938   // Start recv_message op if needed.
1939   switch (payload_handling) {
1940     case GRPC_SRM_PAYLOAD_NONE:
1941       PublishNewRpc(elem, absl::OkStatus());
1942       break;
1943     case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
1944       grpc_op op;
1945       op.op = GRPC_OP_RECV_MESSAGE;
1946       op.flags = 0;
1947       op.reserved = nullptr;
1948       op.data.recv_message.recv_message = &payload_;
1949       GRPC_CLOSURE_INIT(&publish_, PublishNewRpc, elem,
1950                         grpc_schedule_on_exec_ctx);
1951       grpc_call_start_batch_and_execute(call_, &op, 1, &publish_);
1952       break;
1953     }
1954   }
1955 }
1956 
RecvInitialMetadataBatchComplete(void * arg,grpc_error_handle error)1957 void Server::CallData::RecvInitialMetadataBatchComplete(
1958     void* arg, grpc_error_handle error) {
1959   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
1960   auto* calld = static_cast<Server::CallData*>(elem->call_data);
1961   if (!error.ok()) {
1962     VLOG(2) << "Failed call creation: " << StatusToString(error);
1963     calld->FailCallCreation();
1964     return;
1965   }
1966   calld->StartNewRpc(elem);
1967 }
1968 
StartTransportStreamOpBatchImpl(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)1969 void Server::CallData::StartTransportStreamOpBatchImpl(
1970     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
1971   if (batch->recv_initial_metadata) {
1972     recv_initial_metadata_ =
1973         batch->payload->recv_initial_metadata.recv_initial_metadata;
1974     original_recv_initial_metadata_ready_ =
1975         batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
1976     batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
1977         &recv_initial_metadata_ready_;
1978   }
1979   if (batch->recv_trailing_metadata) {
1980     original_recv_trailing_metadata_ready_ =
1981         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1982     batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1983         &recv_trailing_metadata_ready_;
1984   }
1985   grpc_call_next_op(elem, batch);
1986 }
1987 
RecvInitialMetadataReady(void * arg,grpc_error_handle error)1988 void Server::CallData::RecvInitialMetadataReady(void* arg,
1989                                                 grpc_error_handle error) {
1990   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
1991   CallData* calld = static_cast<CallData*>(elem->call_data);
1992   if (error.ok()) {
1993     calld->path_ = calld->recv_initial_metadata_->Take(HttpPathMetadata());
1994     auto* host =
1995         calld->recv_initial_metadata_->get_pointer(HttpAuthorityMetadata());
1996     if (host != nullptr) calld->host_.emplace(host->Ref());
1997   }
1998   auto op_deadline = calld->recv_initial_metadata_->get(GrpcTimeoutMetadata());
1999   if (op_deadline.has_value()) {
2000     calld->deadline_ = *op_deadline;
2001     Call::FromC(calld->call_)->UpdateDeadline(*op_deadline);
2002   }
2003   if (calld->host_.has_value() && calld->path_.has_value()) {
2004     // do nothing
2005   } else if (error.ok()) {
2006     // Pass the error reference to calld->recv_initial_metadata_error
2007     error = absl::UnknownError("Missing :authority or :path");
2008     calld->recv_initial_metadata_error_ = error;
2009   }
2010   grpc_closure* closure = calld->original_recv_initial_metadata_ready_;
2011   calld->original_recv_initial_metadata_ready_ = nullptr;
2012   if (calld->seen_recv_trailing_metadata_ready_) {
2013     GRPC_CALL_COMBINER_START(calld->call_combiner_,
2014                              &calld->recv_trailing_metadata_ready_,
2015                              calld->recv_trailing_metadata_error_,
2016                              "continue server recv_trailing_metadata_ready");
2017   }
2018   Closure::Run(DEBUG_LOCATION, closure, error);
2019 }
2020 
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)2021 void Server::CallData::RecvTrailingMetadataReady(void* arg,
2022                                                  grpc_error_handle error) {
2023   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2024   CallData* calld = static_cast<CallData*>(elem->call_data);
2025   if (calld->original_recv_initial_metadata_ready_ != nullptr) {
2026     calld->recv_trailing_metadata_error_ = error;
2027     calld->seen_recv_trailing_metadata_ready_ = true;
2028     GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_,
2029                       RecvTrailingMetadataReady, elem,
2030                       grpc_schedule_on_exec_ctx);
2031     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2032                             "deferring server recv_trailing_metadata_ready "
2033                             "until after recv_initial_metadata_ready");
2034     return;
2035   }
2036   error = grpc_error_add_child(error, calld->recv_initial_metadata_error_);
2037   Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
2038                error);
2039 }
2040 
InitCallElement(grpc_call_element * elem,const grpc_call_element_args * args)2041 grpc_error_handle Server::CallData::InitCallElement(
2042     grpc_call_element* elem, const grpc_call_element_args* args) {
2043   auto* chand = static_cast<ChannelData*>(elem->channel_data);
2044   new (elem->call_data) Server::CallData(elem, *args, chand->server());
2045   return absl::OkStatus();
2046 }
2047 
DestroyCallElement(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)2048 void Server::CallData::DestroyCallElement(
2049     grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
2050     grpc_closure* /*ignored*/) {
2051   auto* calld = static_cast<CallData*>(elem->call_data);
2052   calld->~CallData();
2053 }
2054 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)2055 void Server::CallData::StartTransportStreamOpBatch(
2056     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
2057   auto* calld = static_cast<CallData*>(elem->call_data);
2058   calld->StartTransportStreamOpBatchImpl(elem, batch);
2059 }
2060 
2061 }  // namespace grpc_core
2062 
2063 //
2064 // C-core API
2065 //
2066 
grpc_server_create(const grpc_channel_args * args,void * reserved)2067 grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
2068   grpc_core::ExecCtx exec_ctx;
2069   GRPC_TRACE_LOG(api, INFO)
2070       << "grpc_server_create(" << args << ", " << reserved << ")";
2071   grpc_core::Server* server =
2072       new grpc_core::Server(grpc_core::CoreConfiguration::Get()
2073                                 .channel_args_preconditioning()
2074                                 .PreconditionChannelArgs(args));
2075   return server->c_ptr();
2076 }
2077 
grpc_server_register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)2078 void grpc_server_register_completion_queue(grpc_server* server,
2079                                            grpc_completion_queue* cq,
2080                                            void* reserved) {
2081   GRPC_TRACE_LOG(api, INFO)
2082       << "grpc_server_register_completion_queue(server=" << server
2083       << ", cq=" << cq << ", reserved=" << reserved << ")";
2084   CHECK(!reserved);
2085   auto cq_type = grpc_get_cq_completion_type(cq);
2086   if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) {
2087     VLOG(2) << "Completion queue of type " << static_cast<int>(cq_type)
2088             << " is being registered as a server-completion-queue";
2089     // Ideally we should log an error and abort but ruby-wrapped-language API
2090     // calls grpc_completion_queue_pluck() on server completion queues
2091   }
2092   grpc_core::Server::FromC(server)->RegisterCompletionQueue(cq);
2093 }
2094 
grpc_server_register_method(grpc_server * server,const char * method,const char * host,grpc_server_register_method_payload_handling payload_handling,uint32_t flags)2095 void* grpc_server_register_method(
2096     grpc_server* server, const char* method, const char* host,
2097     grpc_server_register_method_payload_handling payload_handling,
2098     uint32_t flags) {
2099   GRPC_TRACE_LOG(api, INFO) << "grpc_server_register_method(server=" << server
2100                             << ", method=" << method << ", host=" << host
2101                             << ", flags=" << absl::StrFormat("0x%08x", flags);
2102   return grpc_core::Server::FromC(server)->RegisterMethod(
2103       method, host, payload_handling, flags);
2104 }
2105 
grpc_server_start(grpc_server * server)2106 void grpc_server_start(grpc_server* server) {
2107   grpc_core::ExecCtx exec_ctx;
2108   GRPC_TRACE_LOG(api, INFO) << "grpc_server_start(server=" << server << ")";
2109   grpc_core::Server::FromC(server)->Start();
2110 }
2111 
grpc_server_shutdown_and_notify(grpc_server * server,grpc_completion_queue * cq,void * tag)2112 void grpc_server_shutdown_and_notify(grpc_server* server,
2113                                      grpc_completion_queue* cq, void* tag) {
2114   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2115   grpc_core::ExecCtx exec_ctx;
2116   GRPC_TRACE_LOG(api, INFO)
2117       << "grpc_server_shutdown_and_notify(server=" << server << ", cq=" << cq
2118       << ", tag=" << tag << ")";
2119   grpc_core::Server::FromC(server)->ShutdownAndNotify(cq, tag);
2120 }
2121 
grpc_server_cancel_all_calls(grpc_server * server)2122 void grpc_server_cancel_all_calls(grpc_server* server) {
2123   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2124   grpc_core::ExecCtx exec_ctx;
2125   GRPC_TRACE_LOG(api, INFO)
2126       << "grpc_server_cancel_all_calls(server=" << server << ")";
2127   grpc_core::Server::FromC(server)->CancelAllCalls();
2128 }
2129 
grpc_server_destroy(grpc_server * server)2130 void grpc_server_destroy(grpc_server* server) {
2131   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2132   grpc_core::ExecCtx exec_ctx;
2133   GRPC_TRACE_LOG(api, INFO) << "grpc_server_destroy(server=" << server << ")";
2134   grpc_core::Server::FromC(server)->Orphan();
2135 }
2136 
grpc_server_request_call(grpc_server * server,grpc_call ** call,grpc_call_details * details,grpc_metadata_array * request_metadata,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)2137 grpc_call_error grpc_server_request_call(
2138     grpc_server* server, grpc_call** call, grpc_call_details* details,
2139     grpc_metadata_array* request_metadata,
2140     grpc_completion_queue* cq_bound_to_call,
2141     grpc_completion_queue* cq_for_notification, void* tag) {
2142   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2143   grpc_core::ExecCtx exec_ctx;
2144   GRPC_TRACE_LOG(api, INFO)
2145       << "grpc_server_request_call(" << "server=" << server << ", call=" << call
2146       << ", details=" << details << ", initial_metadata=" << request_metadata
2147       << ", cq_bound_to_call=" << cq_bound_to_call
2148       << ", cq_for_notification=" << cq_for_notification << ", tag=" << tag;
2149   return grpc_core::Server::FromC(server)->RequestCall(
2150       call, details, request_metadata, cq_bound_to_call, cq_for_notification,
2151       tag);
2152 }
2153 
grpc_server_request_registered_call(grpc_server * server,void * registered_method,grpc_call ** call,gpr_timespec * deadline,grpc_metadata_array * request_metadata,grpc_byte_buffer ** optional_payload,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag_new)2154 grpc_call_error grpc_server_request_registered_call(
2155     grpc_server* server, void* registered_method, grpc_call** call,
2156     gpr_timespec* deadline, grpc_metadata_array* request_metadata,
2157     grpc_byte_buffer** optional_payload,
2158     grpc_completion_queue* cq_bound_to_call,
2159     grpc_completion_queue* cq_for_notification, void* tag_new) {
2160   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2161   grpc_core::ExecCtx exec_ctx;
2162   auto* rm =
2163       static_cast<grpc_core::Server::RegisteredMethod*>(registered_method);
2164   GRPC_TRACE_LOG(api, INFO)
2165       << "grpc_server_request_registered_call(" << "server=" << server
2166       << ", registered_method=" << registered_method << ", call=" << call
2167       << ", deadline=" << deadline << ", request_metadata=" << request_metadata
2168       << ", optional_payload=" << optional_payload
2169       << ", cq_bound_to_call=" << cq_bound_to_call
2170       << ", cq_for_notification=" << cq_for_notification << ", tag=" << tag_new
2171       << ")";
2172   return grpc_core::Server::FromC(server)->RequestRegisteredCall(
2173       rm, call, deadline, request_metadata, optional_payload, cq_bound_to_call,
2174       cq_for_notification, tag_new);
2175 }
2176 
grpc_server_set_config_fetcher(grpc_server * server,grpc_server_config_fetcher * server_config_fetcher)2177 void grpc_server_set_config_fetcher(
2178     grpc_server* server, grpc_server_config_fetcher* server_config_fetcher) {
2179   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2180   grpc_core::ExecCtx exec_ctx;
2181   GRPC_TRACE_LOG(api, INFO)
2182       << "grpc_server_set_config_fetcher(server=" << server
2183       << ", config_fetcher=" << server_config_fetcher << ")";
2184   grpc_core::Server::FromC(server)->set_config_fetcher(
2185       std::unique_ptr<grpc_core::ServerConfigFetcher>(
2186           grpc_core::ServerConfigFetcher::FromC(server_config_fetcher)));
2187 }
2188 
grpc_server_config_fetcher_destroy(grpc_server_config_fetcher * server_config_fetcher)2189 void grpc_server_config_fetcher_destroy(
2190     grpc_server_config_fetcher* server_config_fetcher) {
2191   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2192   grpc_core::ExecCtx exec_ctx;
2193   GRPC_TRACE_LOG(api, INFO)
2194       << "grpc_server_config_fetcher_destroy(config_fetcher="
2195       << server_config_fetcher << ")";
2196   delete grpc_core::ServerConfigFetcher::FromC(server_config_fetcher);
2197 }
2198