• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015-2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/lib/surface/server.h"
20 
21 #include <inttypes.h>
22 #include <stdlib.h>
23 #include <string.h>
24 
25 #include <algorithm>
26 #include <atomic>
27 #include <list>
28 #include <memory>
29 #include <new>
30 #include <queue>
31 #include <type_traits>
32 #include <utility>
33 #include <vector>
34 
35 #include "absl/cleanup/cleanup.h"
36 #include "absl/container/flat_hash_map.h"
37 #include "absl/status/status.h"
38 #include "absl/types/optional.h"
39 
40 #include <grpc/byte_buffer.h>
41 #include <grpc/grpc.h>
42 #include <grpc/impl/channel_arg_names.h>
43 #include <grpc/impl/connectivity_state.h>
44 #include <grpc/slice.h>
45 #include <grpc/status.h>
46 #include <grpc/support/log.h>
47 #include <grpc/support/time.h>
48 
49 #include "src/core/lib/channel/channel_args.h"
50 #include "src/core/lib/channel/channel_args_preconditioning.h"
51 #include "src/core/lib/channel/channel_trace.h"
52 #include "src/core/lib/channel/channelz.h"
53 #include "src/core/lib/config/core_configuration.h"
54 #include "src/core/lib/debug/stats.h"
55 #include "src/core/lib/experiments/experiments.h"
56 #include "src/core/lib/gpr/useful.h"
57 #include "src/core/lib/gprpp/crash.h"
58 #include "src/core/lib/gprpp/debug_location.h"
59 #include "src/core/lib/gprpp/mpscq.h"
60 #include "src/core/lib/gprpp/status_helper.h"
61 #include "src/core/lib/iomgr/exec_ctx.h"
62 #include "src/core/lib/iomgr/pollset_set.h"
63 #include "src/core/lib/promise/activity.h"
64 #include "src/core/lib/promise/cancel_callback.h"
65 #include "src/core/lib/promise/context.h"
66 #include "src/core/lib/promise/map.h"
67 #include "src/core/lib/promise/pipe.h"
68 #include "src/core/lib/promise/poll.h"
69 #include "src/core/lib/promise/promise.h"
70 #include "src/core/lib/promise/seq.h"
71 #include "src/core/lib/promise/try_join.h"
72 #include "src/core/lib/promise/try_seq.h"
73 #include "src/core/lib/slice/slice_buffer.h"
74 #include "src/core/lib/slice/slice_internal.h"
75 #include "src/core/lib/surface/api_trace.h"
76 #include "src/core/lib/surface/call.h"
77 #include "src/core/lib/surface/channel.h"
78 #include "src/core/lib/surface/channel_stack_type.h"
79 #include "src/core/lib/surface/completion_queue.h"
80 #include "src/core/lib/surface/legacy_channel.h"
81 #include "src/core/lib/surface/wait_for_cq_end_op.h"
82 #include "src/core/lib/transport/connectivity_state.h"
83 #include "src/core/lib/transport/error_utils.h"
84 
85 namespace grpc_core {
86 
87 TraceFlag grpc_server_channel_trace(false, "server_channel");
88 
89 //
90 // Server::RegisteredMethod
91 //
92 
93 struct Server::RegisteredMethod {
RegisteredMethodgrpc_core::Server::RegisteredMethod94   RegisteredMethod(
95       const char* method_arg, const char* host_arg,
96       grpc_server_register_method_payload_handling payload_handling_arg,
97       uint32_t flags_arg)
98       : method(method_arg == nullptr ? "" : method_arg),
99         host(host_arg == nullptr ? "" : host_arg),
100         payload_handling(payload_handling_arg),
101         flags(flags_arg) {}
102 
103   ~RegisteredMethod() = default;
104 
105   const std::string method;
106   const std::string host;
107   const grpc_server_register_method_payload_handling payload_handling;
108   const uint32_t flags;
109   // One request matcher per method.
110   std::unique_ptr<RequestMatcherInterface> matcher;
111 };
112 
113 //
114 // Server::RequestMatcherInterface
115 //
116 
117 // RPCs that come in from the transport must be matched against RPC requests
118 // from the application. An incoming request from the application can be matched
119 // to an RPC that has already arrived or can be queued up for later use.
120 // Likewise, an RPC coming in from the transport can either be matched to a
121 // request that already arrived from the application or can be queued up for
122 // later use (marked pending). If there is a match, the request's tag is posted
123 // on the request's notification CQ.
124 //
125 // RequestMatcherInterface is the base class to provide this functionality.
126 class Server::RequestMatcherInterface {
127  public:
~RequestMatcherInterface()128   virtual ~RequestMatcherInterface() {}
129 
130   // Unref the calls associated with any incoming RPCs in the pending queue (not
131   // yet matched to an application-requested RPC).
132   virtual void ZombifyPending() = 0;
133 
134   // Mark all application-requested RPCs failed if they have not been matched to
135   // an incoming RPC. The error parameter indicates why the RPCs are being
136   // failed (always server shutdown in all current implementations).
137   virtual void KillRequests(grpc_error_handle error) = 0;
138 
139   // How many request queues are supported by this matcher. This is an abstract
140   // concept that essentially maps to gRPC completion queues.
141   virtual size_t request_queue_count() const = 0;
142 
143   // This function is invoked when the application requests a new RPC whose
144   // information is in the call parameter. The request_queue_index marks the
145   // queue onto which to place this RPC, and is typically associated with a gRPC
146   // CQ. If there are pending RPCs waiting to be matched, publish one (match it
147   // and notify the CQ).
148   virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
149                                               RequestedCall* call) = 0;
150 
151   class MatchResult {
152    public:
MatchResult(Server * server,size_t cq_idx,RequestedCall * requested_call)153     MatchResult(Server* server, size_t cq_idx, RequestedCall* requested_call)
154         : server_(server), cq_idx_(cq_idx), requested_call_(requested_call) {}
~MatchResult()155     ~MatchResult() {
156       if (requested_call_ != nullptr) {
157         server_->FailCall(cq_idx_, requested_call_, absl::CancelledError());
158       }
159     }
160 
161     MatchResult(const MatchResult&) = delete;
162     MatchResult& operator=(const MatchResult&) = delete;
163 
MatchResult(MatchResult && other)164     MatchResult(MatchResult&& other) noexcept
165         : server_(other.server_),
166           cq_idx_(other.cq_idx_),
167           requested_call_(std::exchange(other.requested_call_, nullptr)) {}
168 
TakeCall()169     RequestedCall* TakeCall() {
170       return std::exchange(requested_call_, nullptr);
171     }
172 
cq() const173     grpc_completion_queue* cq() const { return server_->cqs_[cq_idx_]; }
cq_idx() const174     size_t cq_idx() const { return cq_idx_; }
175 
176    private:
177     Server* server_;
178     size_t cq_idx_;
179     RequestedCall* requested_call_;
180   };
181 
182   // This function is invoked on an incoming promise based RPC.
183   // The RequestMatcher will try to match it against an application-requested
184   // RPC if possible or will place it in the pending queue otherwise. To enable
185   // some measure of fairness between server CQs, the match is done starting at
186   // the start_request_queue_index parameter in a cyclic order rather than
187   // always starting at 0.
188   virtual ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
189       size_t start_request_queue_index) = 0;
190 
191   // This function is invoked on an incoming RPC, represented by the calld
192   // object. The RequestMatcher will try to match it against an
193   // application-requested RPC if possible or will place it in the pending queue
194   // otherwise. To enable some measure of fairness between server CQs, the match
195   // is done starting at the start_request_queue_index parameter in a cyclic
196   // order rather than always starting at 0.
197   virtual void MatchOrQueue(size_t start_request_queue_index,
198                             CallData* calld) = 0;
199 
200   // Returns the server associated with this request matcher
201   virtual Server* server() const = 0;
202 };
203 
204 //
205 // Server::RequestedCall
206 //
207 
208 struct Server::RequestedCall {
209   enum class Type { BATCH_CALL, REGISTERED_CALL };
210 
RequestedCallgrpc_core::Server::RequestedCall211   RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
212                 grpc_call** call_arg, grpc_metadata_array* initial_md,
213                 grpc_call_details* details)
214       : type(Type::BATCH_CALL),
215         tag(tag_arg),
216         cq_bound_to_call(call_cq),
217         call(call_arg),
218         initial_metadata(initial_md) {
219     data.batch.details = details;
220   }
221 
RequestedCallgrpc_core::Server::RequestedCall222   RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
223                 grpc_call** call_arg, grpc_metadata_array* initial_md,
224                 RegisteredMethod* rm, gpr_timespec* deadline,
225                 grpc_byte_buffer** optional_payload)
226       : type(Type::REGISTERED_CALL),
227         tag(tag_arg),
228         cq_bound_to_call(call_cq),
229         call(call_arg),
230         initial_metadata(initial_md) {
231     data.registered.method = rm;
232     data.registered.deadline = deadline;
233     data.registered.optional_payload = optional_payload;
234   }
235 
Completegrpc_core::Server::RequestedCall236   void Complete(NextResult<MessageHandle> payload, ClientMetadata& md) {
237     Timestamp deadline = GetContext<CallContext>()->deadline();
238     switch (type) {
239       case RequestedCall::Type::BATCH_CALL:
240         GPR_ASSERT(!payload.has_value());
241         data.batch.details->host =
242             CSliceRef(md.get_pointer(HttpAuthorityMetadata())->c_slice());
243         data.batch.details->method =
244             CSliceRef(md.Take(HttpPathMetadata())->c_slice());
245         data.batch.details->deadline =
246             deadline.as_timespec(GPR_CLOCK_MONOTONIC);
247         break;
248       case RequestedCall::Type::REGISTERED_CALL:
249         md.Remove(HttpPathMetadata());
250         *data.registered.deadline = deadline.as_timespec(GPR_CLOCK_MONOTONIC);
251         if (data.registered.optional_payload != nullptr) {
252           if (payload.has_value()) {
253             auto* sb = payload.value()->payload()->c_slice_buffer();
254             *data.registered.optional_payload =
255                 grpc_raw_byte_buffer_create(sb->slices, sb->count);
256           } else {
257             *data.registered.optional_payload = nullptr;
258           }
259         }
260         break;
261       default:
262         GPR_UNREACHABLE_CODE(abort());
263     }
264   }
265 
266   MultiProducerSingleConsumerQueue::Node mpscq_node;
267   const Type type;
268   void* const tag;
269   grpc_completion_queue* const cq_bound_to_call;
270   grpc_call** const call;
271   grpc_cq_completion completion;
272   grpc_metadata_array* const initial_metadata;
273   union {
274     struct {
275       grpc_call_details* details;
276     } batch;
277     struct {
278       RegisteredMethod* method;
279       gpr_timespec* deadline;
280       grpc_byte_buffer** optional_payload;
281     } registered;
282   } data;
283 };
284 
285 // The RealRequestMatcher is an implementation of RequestMatcherInterface that
286 // actually uses all the features of RequestMatcherInterface: expecting the
287 // application to explicitly request RPCs and then matching those to incoming
288 // RPCs, along with a slow path by which incoming RPCs are put on a locked
289 // pending list if they aren't able to be matched to an application request.
290 class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface {
291  public:
RealRequestMatcherFilterStack(Server * server)292   explicit RealRequestMatcherFilterStack(Server* server)
293       : server_(server), requests_per_cq_(server->cqs_.size()) {}
294 
~RealRequestMatcherFilterStack()295   ~RealRequestMatcherFilterStack() override {
296     for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
297       GPR_ASSERT(queue.Pop() == nullptr);
298     }
299     GPR_ASSERT(pending_.empty());
300   }
301 
ZombifyPending()302   void ZombifyPending() override {
303     while (!pending_.empty()) {
304       pending_.front().calld->SetState(CallData::CallState::ZOMBIED);
305       pending_.front().calld->KillZombie();
306       pending_.pop();
307     }
308   }
309 
KillRequests(grpc_error_handle error)310   void KillRequests(grpc_error_handle error) override {
311     for (size_t i = 0; i < requests_per_cq_.size(); i++) {
312       RequestedCall* rc;
313       while ((rc = reinterpret_cast<RequestedCall*>(
314                   requests_per_cq_[i].Pop())) != nullptr) {
315         server_->FailCall(i, rc, error);
316       }
317     }
318   }
319 
request_queue_count() const320   size_t request_queue_count() const override {
321     return requests_per_cq_.size();
322   }
323 
RequestCallWithPossiblePublish(size_t request_queue_index,RequestedCall * call)324   void RequestCallWithPossiblePublish(size_t request_queue_index,
325                                       RequestedCall* call) override {
326     if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
327       // this was the first queued request: we need to lock and start
328       // matching calls
329       struct NextPendingCall {
330         RequestedCall* rc = nullptr;
331         CallData* pending;
332       };
333       while (true) {
334         NextPendingCall pending_call;
335         {
336           MutexLock lock(&server_->mu_call_);
337           while (!pending_.empty() &&
338                  pending_.front().Age() > server_->max_time_in_pending_queue_) {
339             pending_.front().calld->SetState(CallData::CallState::ZOMBIED);
340             pending_.front().calld->KillZombie();
341             pending_.pop();
342           }
343           if (!pending_.empty()) {
344             pending_call.rc = reinterpret_cast<RequestedCall*>(
345                 requests_per_cq_[request_queue_index].Pop());
346             if (pending_call.rc != nullptr) {
347               pending_call.pending = pending_.front().calld;
348               pending_.pop();
349             }
350           }
351         }
352         if (pending_call.rc == nullptr) break;
353         if (!pending_call.pending->MaybeActivate()) {
354           // Zombied Call
355           pending_call.pending->KillZombie();
356           requests_per_cq_[request_queue_index].Push(
357               &pending_call.rc->mpscq_node);
358         } else {
359           pending_call.pending->Publish(request_queue_index, pending_call.rc);
360         }
361       }
362     }
363   }
364 
MatchOrQueue(size_t start_request_queue_index,CallData * calld)365   void MatchOrQueue(size_t start_request_queue_index,
366                     CallData* calld) override {
367     for (size_t i = 0; i < requests_per_cq_.size(); i++) {
368       size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
369       RequestedCall* rc =
370           reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].TryPop());
371       if (rc != nullptr) {
372         calld->SetState(CallData::CallState::ACTIVATED);
373         calld->Publish(cq_idx, rc);
374         return;
375       }
376     }
377     // No cq to take the request found; queue it on the slow list.
378     // We need to ensure that all the queues are empty.  We do this under
379     // the server mu_call_ lock to ensure that if something is added to
380     // an empty request queue, it will block until the call is actually
381     // added to the pending list.
382     RequestedCall* rc = nullptr;
383     size_t cq_idx = 0;
384     size_t loop_count;
385     {
386       MutexLock lock(&server_->mu_call_);
387       for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
388         cq_idx =
389             (start_request_queue_index + loop_count) % requests_per_cq_.size();
390         rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop());
391         if (rc != nullptr) {
392           break;
393         }
394       }
395       if (rc == nullptr) {
396         calld->SetState(CallData::CallState::PENDING);
397         pending_.push(PendingCall{calld});
398         return;
399       }
400     }
401     calld->SetState(CallData::CallState::ACTIVATED);
402     calld->Publish(cq_idx, rc);
403   }
404 
MatchRequest(size_t)405   ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(size_t) override {
406     Crash("not implemented for filter stack request matcher");
407   }
408 
server() const409   Server* server() const final { return server_; }
410 
411  private:
412   Server* const server_;
413   struct PendingCall {
414     CallData* calld;
415     Timestamp created = Timestamp::Now();
Agegrpc_core::Server::RealRequestMatcherFilterStack::PendingCall416     Duration Age() { return Timestamp::Now() - created; }
417   };
418   std::queue<PendingCall> pending_;
419   std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
420 };
421 
422 class Server::RealRequestMatcherPromises : public RequestMatcherInterface {
423  public:
RealRequestMatcherPromises(Server * server)424   explicit RealRequestMatcherPromises(Server* server)
425       : server_(server), requests_per_cq_(server->cqs_.size()) {}
426 
~RealRequestMatcherPromises()427   ~RealRequestMatcherPromises() override {
428     for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
429       GPR_ASSERT(queue.Pop() == nullptr);
430     }
431   }
432 
ZombifyPending()433   void ZombifyPending() override {
434     while (!pending_.empty()) {
435       pending_.front()->Finish(absl::InternalError("Server closed"));
436       pending_.pop();
437     }
438   }
439 
KillRequests(grpc_error_handle error)440   void KillRequests(grpc_error_handle error) override {
441     for (size_t i = 0; i < requests_per_cq_.size(); i++) {
442       RequestedCall* rc;
443       while ((rc = reinterpret_cast<RequestedCall*>(
444                   requests_per_cq_[i].Pop())) != nullptr) {
445         server_->FailCall(i, rc, error);
446       }
447     }
448   }
449 
request_queue_count() const450   size_t request_queue_count() const override {
451     return requests_per_cq_.size();
452   }
453 
RequestCallWithPossiblePublish(size_t request_queue_index,RequestedCall * call)454   void RequestCallWithPossiblePublish(size_t request_queue_index,
455                                       RequestedCall* call) override {
456     if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
457       // this was the first queued request: we need to lock and start
458       // matching calls
459       struct NextPendingCall {
460         RequestedCall* rc = nullptr;
461         PendingCall pending;
462       };
463       while (true) {
464         NextPendingCall pending_call;
465         {
466           MutexLock lock(&server_->mu_call_);
467           if (!pending_.empty()) {
468             pending_call.rc = reinterpret_cast<RequestedCall*>(
469                 requests_per_cq_[request_queue_index].Pop());
470             if (pending_call.rc != nullptr) {
471               pending_call.pending = std::move(pending_.front());
472               pending_.pop();
473             }
474           }
475         }
476         if (pending_call.rc == nullptr) break;
477         if (!pending_call.pending->Finish(server(), request_queue_index,
478                                           pending_call.rc)) {
479           requests_per_cq_[request_queue_index].Push(
480               &pending_call.rc->mpscq_node);
481         }
482       }
483     }
484   }
485 
MatchOrQueue(size_t,CallData *)486   void MatchOrQueue(size_t, CallData*) override {
487     Crash("not implemented for promises");
488   }
489 
MatchRequest(size_t start_request_queue_index)490   ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
491       size_t start_request_queue_index) override {
492     for (size_t i = 0; i < requests_per_cq_.size(); i++) {
493       size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
494       RequestedCall* rc =
495           reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].TryPop());
496       if (rc != nullptr) {
497         return Immediate(MatchResult(server(), cq_idx, rc));
498       }
499     }
500     // No cq to take the request found; queue it on the slow list.
501     // We need to ensure that all the queues are empty.  We do this under
502     // the server mu_call_ lock to ensure that if something is added to
503     // an empty request queue, it will block until the call is actually
504     // added to the pending list.
505     RequestedCall* rc = nullptr;
506     size_t cq_idx = 0;
507     size_t loop_count;
508     {
509       std::vector<std::shared_ptr<ActivityWaiter>> removed_pending;
510       MutexLock lock(&server_->mu_call_);
511       while (!pending_.empty() &&
512              pending_.front()->Age() > server_->max_time_in_pending_queue_) {
513         removed_pending.push_back(std::move(pending_.front()));
514         pending_.pop();
515       }
516       for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
517         cq_idx =
518             (start_request_queue_index + loop_count) % requests_per_cq_.size();
519         rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop());
520         if (rc != nullptr) break;
521       }
522       if (rc == nullptr) {
523         if (server_->pending_backlog_protector_.Reject(pending_.size(),
524                                                        server_->bitgen_)) {
525           return Immediate(absl::ResourceExhaustedError(
526               "Too many pending requests for this server"));
527         }
528         auto w = std::make_shared<ActivityWaiter>(
529             GetContext<Activity>()->MakeOwningWaker());
530         pending_.push(w);
531         return OnCancel(
532             [w]() -> Poll<absl::StatusOr<MatchResult>> {
533               std::unique_ptr<absl::StatusOr<MatchResult>> r(
534                   w->result.exchange(nullptr, std::memory_order_acq_rel));
535               if (r == nullptr) return Pending{};
536               return std::move(*r);
537             },
538             [w]() { w->Expire(); });
539       }
540     }
541     return Immediate(MatchResult(server(), cq_idx, rc));
542   }
543 
server() const544   Server* server() const final { return server_; }
545 
546  private:
547   Server* const server_;
548   struct ActivityWaiter {
549     using ResultType = absl::StatusOr<MatchResult>;
ActivityWaitergrpc_core::Server::RealRequestMatcherPromises::ActivityWaiter550     explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {}
~ActivityWaitergrpc_core::Server::RealRequestMatcherPromises::ActivityWaiter551     ~ActivityWaiter() { delete result.load(std::memory_order_acquire); }
Finishgrpc_core::Server::RealRequestMatcherPromises::ActivityWaiter552     void Finish(absl::Status status) {
553       delete result.exchange(new ResultType(std::move(status)),
554                              std::memory_order_acq_rel);
555       waker.WakeupAsync();
556     }
557     // Returns true if requested_call consumed, false otherwise.
Finishgrpc_core::Server::RealRequestMatcherPromises::ActivityWaiter558     GRPC_MUST_USE_RESULT bool Finish(Server* server, size_t cq_idx,
559                                      RequestedCall* requested_call) {
560       ResultType* expected = nullptr;
561       ResultType* new_value =
562           new ResultType(MatchResult(server, cq_idx, requested_call));
563       if (!result.compare_exchange_strong(expected, new_value,
564                                           std::memory_order_acq_rel,
565                                           std::memory_order_acquire)) {
566         GPR_ASSERT(new_value->value().TakeCall() == requested_call);
567         delete new_value;
568         return false;
569       }
570       waker.WakeupAsync();
571       return true;
572     }
Expiregrpc_core::Server::RealRequestMatcherPromises::ActivityWaiter573     void Expire() {
574       delete result.exchange(new ResultType(absl::CancelledError()),
575                              std::memory_order_acq_rel);
576     }
Agegrpc_core::Server::RealRequestMatcherPromises::ActivityWaiter577     Duration Age() { return Timestamp::Now() - created; }
578     Waker waker;
579     std::atomic<ResultType*> result{nullptr};
580     const Timestamp created = Timestamp::Now();
581   };
582   using PendingCall = std::shared_ptr<ActivityWaiter>;
583   std::queue<PendingCall> pending_;
584   std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
585 };
586 
587 // AllocatingRequestMatchers don't allow the application to request an RPC in
588 // advance or queue up any incoming RPC for later match. Instead, MatchOrQueue
589 // will call out to an allocation function passed in at the construction of the
590 // object. These request matchers are designed for the C++ callback API, so they
591 // only support 1 completion queue (passed in at the constructor). They are also
592 // used for the sync API.
593 class Server::AllocatingRequestMatcherBase : public RequestMatcherInterface {
594  public:
AllocatingRequestMatcherBase(Server * server,grpc_completion_queue * cq)595   AllocatingRequestMatcherBase(Server* server, grpc_completion_queue* cq)
596       : server_(server), cq_(cq) {
597     size_t idx;
598     for (idx = 0; idx < server->cqs_.size(); idx++) {
599       if (server->cqs_[idx] == cq) {
600         break;
601       }
602     }
603     GPR_ASSERT(idx < server->cqs_.size());
604     cq_idx_ = idx;
605   }
606 
ZombifyPending()607   void ZombifyPending() override {}
608 
KillRequests(grpc_error_handle)609   void KillRequests(grpc_error_handle /*error*/) override {}
610 
request_queue_count() const611   size_t request_queue_count() const override { return 0; }
612 
RequestCallWithPossiblePublish(size_t,RequestedCall *)613   void RequestCallWithPossiblePublish(size_t /*request_queue_index*/,
614                                       RequestedCall* /*call*/) final {
615     Crash("unreachable");
616   }
617 
server() const618   Server* server() const final { return server_; }
619 
620   // Supply the completion queue related to this request matcher
cq() const621   grpc_completion_queue* cq() const { return cq_; }
622 
623   // Supply the completion queue's index relative to the server.
cq_idx() const624   size_t cq_idx() const { return cq_idx_; }
625 
626  private:
627   Server* const server_;
628   grpc_completion_queue* const cq_;
629   size_t cq_idx_;
630 };
631 
632 // An allocating request matcher for non-registered methods (used for generic
633 // API and unimplemented RPCs).
634 class Server::AllocatingRequestMatcherBatch
635     : public AllocatingRequestMatcherBase {
636  public:
AllocatingRequestMatcherBatch(Server * server,grpc_completion_queue * cq,std::function<BatchCallAllocation ()> allocator)637   AllocatingRequestMatcherBatch(Server* server, grpc_completion_queue* cq,
638                                 std::function<BatchCallAllocation()> allocator)
639       : AllocatingRequestMatcherBase(server, cq),
640         allocator_(std::move(allocator)) {}
641 
MatchOrQueue(size_t,CallData * calld)642   void MatchOrQueue(size_t /*start_request_queue_index*/,
643                     CallData* calld) override {
644     const bool still_running = server()->ShutdownRefOnRequest();
645     auto cleanup_ref =
646         absl::MakeCleanup([this] { server()->ShutdownUnrefOnRequest(); });
647     if (still_running) {
648       BatchCallAllocation call_info = allocator_();
649       GPR_ASSERT(server()->ValidateServerRequest(
650                      cq(), static_cast<void*>(call_info.tag), nullptr,
651                      nullptr) == GRPC_CALL_OK);
652       RequestedCall* rc = new RequestedCall(
653           static_cast<void*>(call_info.tag), call_info.cq, call_info.call,
654           call_info.initial_metadata, call_info.details);
655       calld->SetState(CallData::CallState::ACTIVATED);
656       calld->Publish(cq_idx(), rc);
657     } else {
658       calld->FailCallCreation();
659     }
660   }
661 
MatchRequest(size_t)662   ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
663       size_t /*start_request_queue_index*/) override {
664     BatchCallAllocation call_info = allocator_();
665     GPR_ASSERT(server()->ValidateServerRequest(
666                    cq(), static_cast<void*>(call_info.tag), nullptr, nullptr) ==
667                GRPC_CALL_OK);
668     RequestedCall* rc = new RequestedCall(
669         static_cast<void*>(call_info.tag), call_info.cq, call_info.call,
670         call_info.initial_metadata, call_info.details);
671     return Immediate(MatchResult(server(), cq_idx(), rc));
672   }
673 
674  private:
675   std::function<BatchCallAllocation()> allocator_;
676 };
677 
678 // An allocating request matcher for registered methods.
679 class Server::AllocatingRequestMatcherRegistered
680     : public AllocatingRequestMatcherBase {
681  public:
AllocatingRequestMatcherRegistered(Server * server,grpc_completion_queue * cq,RegisteredMethod * rm,std::function<RegisteredCallAllocation ()> allocator)682   AllocatingRequestMatcherRegistered(
683       Server* server, grpc_completion_queue* cq, RegisteredMethod* rm,
684       std::function<RegisteredCallAllocation()> allocator)
685       : AllocatingRequestMatcherBase(server, cq),
686         registered_method_(rm),
687         allocator_(std::move(allocator)) {}
688 
MatchOrQueue(size_t,CallData * calld)689   void MatchOrQueue(size_t /*start_request_queue_index*/,
690                     CallData* calld) override {
691     auto cleanup_ref =
692         absl::MakeCleanup([this] { server()->ShutdownUnrefOnRequest(); });
693     if (server()->ShutdownRefOnRequest()) {
694       RegisteredCallAllocation call_info = allocator_();
695       GPR_ASSERT(server()->ValidateServerRequest(
696                      cq(), call_info.tag, call_info.optional_payload,
697                      registered_method_) == GRPC_CALL_OK);
698       RequestedCall* rc =
699           new RequestedCall(call_info.tag, call_info.cq, call_info.call,
700                             call_info.initial_metadata, registered_method_,
701                             call_info.deadline, call_info.optional_payload);
702       calld->SetState(CallData::CallState::ACTIVATED);
703       calld->Publish(cq_idx(), rc);
704     } else {
705       calld->FailCallCreation();
706     }
707   }
708 
MatchRequest(size_t)709   ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
710       size_t /*start_request_queue_index*/) override {
711     RegisteredCallAllocation call_info = allocator_();
712     GPR_ASSERT(server()->ValidateServerRequest(
713                    cq(), call_info.tag, call_info.optional_payload,
714                    registered_method_) == GRPC_CALL_OK);
715     RequestedCall* rc = new RequestedCall(
716         call_info.tag, call_info.cq, call_info.call, call_info.initial_metadata,
717         registered_method_, call_info.deadline, call_info.optional_payload);
718     return Immediate(MatchResult(server(), cq_idx(), rc));
719   }
720 
721  private:
722   RegisteredMethod* const registered_method_;
723   std::function<RegisteredCallAllocation()> allocator_;
724 };
725 
726 //
727 // ChannelBroadcaster
728 //
729 
730 namespace {
731 
732 class ChannelBroadcaster {
733  public:
734   // This can have an empty constructor and destructor since we want to control
735   // when the actual setup and shutdown broadcast take place.
736 
737   // Copies over the channels from the locked server.
FillChannelsLocked(std::vector<RefCountedPtr<Channel>> channels)738   void FillChannelsLocked(std::vector<RefCountedPtr<Channel>> channels) {
739     GPR_DEBUG_ASSERT(channels_.empty());
740     channels_ = std::move(channels);
741   }
742 
743   // Broadcasts a shutdown on each channel.
BroadcastShutdown(bool send_goaway,grpc_error_handle force_disconnect)744   void BroadcastShutdown(bool send_goaway, grpc_error_handle force_disconnect) {
745     for (const RefCountedPtr<Channel>& channel : channels_) {
746       SendShutdown(channel.get(), send_goaway, force_disconnect);
747     }
748     channels_.clear();  // just for safety against double broadcast
749   }
750 
751  private:
752   struct ShutdownCleanupArgs {
753     grpc_closure closure;
754     grpc_slice slice;
755   };
756 
ShutdownCleanup(void * arg,grpc_error_handle)757   static void ShutdownCleanup(void* arg, grpc_error_handle /*error*/) {
758     ShutdownCleanupArgs* a = static_cast<ShutdownCleanupArgs*>(arg);
759     CSliceUnref(a->slice);
760     delete a;
761   }
762 
SendShutdown(Channel * channel,bool send_goaway,grpc_error_handle send_disconnect)763   static void SendShutdown(Channel* channel, bool send_goaway,
764                            grpc_error_handle send_disconnect) {
765     ShutdownCleanupArgs* sc = new ShutdownCleanupArgs;
766     GRPC_CLOSURE_INIT(&sc->closure, ShutdownCleanup, sc,
767                       grpc_schedule_on_exec_ctx);
768     grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
769     grpc_channel_element* elem;
770     op->goaway_error =
771         send_goaway
772             ? grpc_error_set_int(GRPC_ERROR_CREATE("Server shutdown"),
773                                  StatusIntProperty::kRpcStatus, GRPC_STATUS_OK)
774             : absl::OkStatus();
775     sc->slice = grpc_slice_from_copied_string("Server shutdown");
776     op->disconnect_with_error = send_disconnect;
777     elem = grpc_channel_stack_element(channel->channel_stack(), 0);
778     elem->filter->start_transport_op(elem, op);
779   }
780 
781   std::vector<RefCountedPtr<Channel>> channels_;
782 };
783 
784 }  // namespace
785 
786 //
787 // Server
788 //
789 
790 const grpc_channel_filter Server::kServerTopFilter = {
791     Server::CallData::StartTransportStreamOpBatch,
792     Server::ChannelData::MakeCallPromise,
__anone6a6c4b10902() 793     [](grpc_channel_element*, CallSpineInterface*) {
794       // TODO(ctiller): remove the server filter when call-v3 is finalized
795     },
796     grpc_channel_next_op,
797     sizeof(Server::CallData),
798     Server::CallData::InitCallElement,
799     grpc_call_stack_ignore_set_pollset_or_pollset_set,
800     Server::CallData::DestroyCallElement,
801     sizeof(Server::ChannelData),
802     Server::ChannelData::InitChannelElement,
803     grpc_channel_stack_no_post_init,
804     Server::ChannelData::DestroyChannelElement,
805     grpc_channel_next_get_info,
806     "server",
807 };
808 
809 namespace {
810 
CreateChannelzNode(const ChannelArgs & args)811 RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
812     const ChannelArgs& args) {
813   RefCountedPtr<channelz::ServerNode> channelz_node;
814   if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
815           .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
816     size_t channel_tracer_max_memory = std::max(
817         0, args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
818                .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT));
819     channelz_node =
820         MakeRefCounted<channelz::ServerNode>(channel_tracer_max_memory);
821     channelz_node->AddTraceEvent(
822         channelz::ChannelTrace::Severity::Info,
823         grpc_slice_from_static_string("Server created"));
824   }
825   return channelz_node;
826 }
827 
828 }  // namespace
829 
Server(const ChannelArgs & args)830 Server::Server(const ChannelArgs& args)
831     : channel_args_(args),
832       channelz_node_(CreateChannelzNode(args)),
833       server_call_tracer_factory_(ServerCallTracerFactory::Get(args)),
834       max_time_in_pending_queue_(Duration::Seconds(
835           channel_args_
836               .GetInt(GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS)
837               .value_or(30))) {}
838 
~Server()839 Server::~Server() {
840   // Remove the cq pollsets from the config_fetcher.
841   if (started_ && config_fetcher_ != nullptr &&
842       config_fetcher_->interested_parties() != nullptr) {
843     for (grpc_pollset* pollset : pollsets_) {
844       grpc_pollset_set_del_pollset(config_fetcher_->interested_parties(),
845                                    pollset);
846     }
847   }
848   for (size_t i = 0; i < cqs_.size(); i++) {
849     GRPC_CQ_INTERNAL_UNREF(cqs_[i], "server");
850   }
851 }
852 
AddListener(OrphanablePtr<ListenerInterface> listener)853 void Server::AddListener(OrphanablePtr<ListenerInterface> listener) {
854   channelz::ListenSocketNode* listen_socket_node =
855       listener->channelz_listen_socket_node();
856   if (listen_socket_node != nullptr && channelz_node_ != nullptr) {
857     channelz_node_->AddChildListenSocket(
858         listen_socket_node->RefAsSubclass<channelz::ListenSocketNode>());
859   }
860   listeners_.emplace_back(std::move(listener));
861 }
862 
Start()863 void Server::Start() {
864   auto make_real_request_matcher =
865       [this]() -> std::unique_ptr<RequestMatcherInterface> {
866     if (IsPromiseBasedServerCallEnabled()) {
867       return std::make_unique<RealRequestMatcherPromises>(this);
868     } else {
869       return std::make_unique<RealRequestMatcherFilterStack>(this);
870     }
871   };
872 
873   started_ = true;
874   for (grpc_completion_queue* cq : cqs_) {
875     if (grpc_cq_can_listen(cq)) {
876       pollsets_.push_back(grpc_cq_pollset(cq));
877     }
878   }
879   if (unregistered_request_matcher_ == nullptr) {
880     unregistered_request_matcher_ = make_real_request_matcher();
881   }
882   for (auto& rm : registered_methods_) {
883     if (rm.second->matcher == nullptr) {
884       rm.second->matcher = make_real_request_matcher();
885     }
886   }
887   {
888     MutexLock lock(&mu_global_);
889     starting_ = true;
890   }
891   // Register the interested parties from the config fetcher to the cq pollsets
892   // before starting listeners so that config fetcher is being polled when the
893   // listeners start watch the fetcher.
894   if (config_fetcher_ != nullptr &&
895       config_fetcher_->interested_parties() != nullptr) {
896     for (grpc_pollset* pollset : pollsets_) {
897       grpc_pollset_set_add_pollset(config_fetcher_->interested_parties(),
898                                    pollset);
899     }
900   }
901   for (auto& listener : listeners_) {
902     listener.listener->Start(this, &pollsets_);
903   }
904   MutexLock lock(&mu_global_);
905   starting_ = false;
906   starting_cv_.Signal();
907 }
908 
SetupTransport(Transport * transport,grpc_pollset * accepting_pollset,const ChannelArgs & args,const RefCountedPtr<channelz::SocketNode> & socket_node)909 grpc_error_handle Server::SetupTransport(
910     Transport* transport, grpc_pollset* accepting_pollset,
911     const ChannelArgs& args,
912     const RefCountedPtr<channelz::SocketNode>& socket_node) {
913   // Create channel.
914   global_stats().IncrementServerChannelsCreated();
915   absl::StatusOr<OrphanablePtr<Channel>> channel =
916       LegacyChannel::Create("", args.SetObject(transport), GRPC_SERVER_CHANNEL);
917   if (!channel.ok()) {
918     return absl_status_to_grpc_error(channel.status());
919   }
920   ChannelData* chand = static_cast<ChannelData*>(
921       grpc_channel_stack_element((*channel)->channel_stack(), 0)->channel_data);
922   // Set up CQs.
923   size_t cq_idx;
924   for (cq_idx = 0; cq_idx < cqs_.size(); cq_idx++) {
925     if (grpc_cq_pollset(cqs_[cq_idx]) == accepting_pollset) break;
926   }
927   if (cq_idx == cqs_.size()) {
928     // Completion queue not found.  Pick a random one to publish new calls to.
929     cq_idx = static_cast<size_t>(rand()) % std::max<size_t>(1, cqs_.size());
930   }
931   // Set up channelz node.
932   intptr_t channelz_socket_uuid = 0;
933   if (socket_node != nullptr) {
934     channelz_socket_uuid = socket_node->uuid();
935     channelz_node_->AddChildSocket(socket_node);
936   }
937   // Initialize chand.
938   chand->InitTransport(Ref(), std::move(*channel), cq_idx, transport,
939                        channelz_socket_uuid);
940   return absl::OkStatus();
941 }
942 
HasOpenConnections()943 bool Server::HasOpenConnections() {
944   MutexLock lock(&mu_global_);
945   return !channels_.empty();
946 }
947 
SetRegisteredMethodAllocator(grpc_completion_queue * cq,void * method_tag,std::function<RegisteredCallAllocation ()> allocator)948 void Server::SetRegisteredMethodAllocator(
949     grpc_completion_queue* cq, void* method_tag,
950     std::function<RegisteredCallAllocation()> allocator) {
951   RegisteredMethod* rm = static_cast<RegisteredMethod*>(method_tag);
952   rm->matcher = std::make_unique<AllocatingRequestMatcherRegistered>(
953       this, cq, rm, std::move(allocator));
954 }
955 
SetBatchMethodAllocator(grpc_completion_queue * cq,std::function<BatchCallAllocation ()> allocator)956 void Server::SetBatchMethodAllocator(
957     grpc_completion_queue* cq, std::function<BatchCallAllocation()> allocator) {
958   GPR_DEBUG_ASSERT(unregistered_request_matcher_ == nullptr);
959   unregistered_request_matcher_ =
960       std::make_unique<AllocatingRequestMatcherBatch>(this, cq,
961                                                       std::move(allocator));
962 }
963 
RegisterCompletionQueue(grpc_completion_queue * cq)964 void Server::RegisterCompletionQueue(grpc_completion_queue* cq) {
965   for (grpc_completion_queue* queue : cqs_) {
966     if (queue == cq) return;
967   }
968   GRPC_CQ_INTERNAL_REF(cq, "server");
969   cqs_.push_back(cq);
970 }
971 
RegisterMethod(const char * method,const char * host,grpc_server_register_method_payload_handling payload_handling,uint32_t flags)972 Server::RegisteredMethod* Server::RegisterMethod(
973     const char* method, const char* host,
974     grpc_server_register_method_payload_handling payload_handling,
975     uint32_t flags) {
976   if (started_) {
977     Crash("Attempting to register method after server started");
978   }
979 
980   if (!method) {
981     gpr_log(GPR_ERROR,
982             "grpc_server_register_method method string cannot be NULL");
983     return nullptr;
984   }
985   auto key = std::make_pair(host ? host : "", method);
986   if (registered_methods_.find(key) != registered_methods_.end()) {
987     gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
988             host ? host : "*");
989     return nullptr;
990   }
991   if (flags != 0) {
992     gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
993             flags);
994     return nullptr;
995   }
996   auto it = registered_methods_.emplace(
997       key, std::make_unique<RegisteredMethod>(method, host, payload_handling,
998                                               flags));
999   return it.first->second.get();
1000 }
1001 
DoneRequestEvent(void * req,grpc_cq_completion *)1002 void Server::DoneRequestEvent(void* req, grpc_cq_completion* /*c*/) {
1003   delete static_cast<RequestedCall*>(req);
1004 }
1005 
FailCall(size_t cq_idx,RequestedCall * rc,grpc_error_handle error)1006 void Server::FailCall(size_t cq_idx, RequestedCall* rc,
1007                       grpc_error_handle error) {
1008   *rc->call = nullptr;
1009   rc->initial_metadata->count = 0;
1010   GPR_ASSERT(!error.ok());
1011   grpc_cq_end_op(cqs_[cq_idx], rc->tag, error, DoneRequestEvent, rc,
1012                  &rc->completion);
1013 }
1014 
1015 // Before calling MaybeFinishShutdown(), we must hold mu_global_ and not
1016 // hold mu_call_.
MaybeFinishShutdown()1017 void Server::MaybeFinishShutdown() {
1018   if (!ShutdownReady() || shutdown_published_) {
1019     return;
1020   }
1021   {
1022     MutexLock lock(&mu_call_);
1023     KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
1024   }
1025   if (!channels_.empty() || listeners_destroyed_ < listeners_.size()) {
1026     if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
1027                                   last_shutdown_message_time_),
1028                      gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
1029       last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
1030       gpr_log(GPR_DEBUG,
1031               "Waiting for %" PRIuPTR " channels and %" PRIuPTR "/%" PRIuPTR
1032               " listeners to be destroyed before shutting down server",
1033               channels_.size(), listeners_.size() - listeners_destroyed_,
1034               listeners_.size());
1035     }
1036     return;
1037   }
1038   shutdown_published_ = true;
1039   for (auto& shutdown_tag : shutdown_tags_) {
1040     Ref().release();
1041     grpc_cq_end_op(shutdown_tag.cq, shutdown_tag.tag, absl::OkStatus(),
1042                    DoneShutdownEvent, this, &shutdown_tag.completion);
1043   }
1044 }
1045 
KillPendingWorkLocked(grpc_error_handle error)1046 void Server::KillPendingWorkLocked(grpc_error_handle error) {
1047   if (started_) {
1048     unregistered_request_matcher_->KillRequests(error);
1049     unregistered_request_matcher_->ZombifyPending();
1050     for (auto& rm : registered_methods_) {
1051       rm.second->matcher->KillRequests(error);
1052       rm.second->matcher->ZombifyPending();
1053     }
1054   }
1055 }
1056 
GetChannelsLocked() const1057 std::vector<RefCountedPtr<Channel>> Server::GetChannelsLocked() const {
1058   std::vector<RefCountedPtr<Channel>> channels;
1059   channels.reserve(channels_.size());
1060   for (const ChannelData* chand : channels_) {
1061     channels.push_back(chand->channel()->Ref());
1062   }
1063   return channels;
1064 }
1065 
ListenerDestroyDone(void * arg,grpc_error_handle)1066 void Server::ListenerDestroyDone(void* arg, grpc_error_handle /*error*/) {
1067   Server* server = static_cast<Server*>(arg);
1068   MutexLock lock(&server->mu_global_);
1069   server->listeners_destroyed_++;
1070   server->MaybeFinishShutdown();
1071 }
1072 
1073 namespace {
1074 
DonePublishedShutdown(void *,grpc_cq_completion * storage)1075 void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) {
1076   delete storage;
1077 }
1078 
1079 }  // namespace
1080 
1081 // - Kills all pending requests-for-incoming-RPC-calls (i.e., the requests made
1082 //   via grpc_server_request_call() and grpc_server_request_registered_call()
1083 //   will now be cancelled). See KillPendingWorkLocked().
1084 //
1085 // - Shuts down the listeners (i.e., the server will no longer listen on the
1086 //   port for new incoming channels).
1087 //
1088 // - Iterates through all channels on the server and sends shutdown msg (see
1089 //   ChannelBroadcaster::BroadcastShutdown() for details) to the clients via
1090 //   the transport layer. The transport layer then guarantees the following:
1091 //    -- Sends shutdown to the client (e.g., HTTP2 transport sends GOAWAY).
1092 //    -- If the server has outstanding calls that are in the process, the
1093 //       connection is NOT closed until the server is done with all those calls.
1094 //    -- Once there are no more calls in progress, the channel is closed.
ShutdownAndNotify(grpc_completion_queue * cq,void * tag)1095 void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
1096   ChannelBroadcaster broadcaster;
1097   {
1098     // Wait for startup to be finished.  Locks mu_global.
1099     MutexLock lock(&mu_global_);
1100     while (starting_) {
1101       starting_cv_.Wait(&mu_global_);
1102     }
1103     // Stay locked, and gather up some stuff to do.
1104     GPR_ASSERT(grpc_cq_begin_op(cq, tag));
1105     if (shutdown_published_) {
1106       grpc_cq_end_op(cq, tag, absl::OkStatus(), DonePublishedShutdown, nullptr,
1107                      new grpc_cq_completion);
1108       return;
1109     }
1110     shutdown_tags_.emplace_back(tag, cq);
1111     if (ShutdownCalled()) {
1112       return;
1113     }
1114     last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
1115     broadcaster.FillChannelsLocked(GetChannelsLocked());
1116     // Collect all unregistered then registered calls.
1117     {
1118       MutexLock lock(&mu_call_);
1119       KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
1120     }
1121     ShutdownUnrefOnShutdownCall();
1122   }
1123   StopListening();
1124   broadcaster.BroadcastShutdown(/*send_goaway=*/true, absl::OkStatus());
1125 }
1126 
StopListening()1127 void Server::StopListening() {
1128   for (auto& listener : listeners_) {
1129     if (listener.listener == nullptr) continue;
1130     channelz::ListenSocketNode* channelz_listen_socket_node =
1131         listener.listener->channelz_listen_socket_node();
1132     if (channelz_node_ != nullptr && channelz_listen_socket_node != nullptr) {
1133       channelz_node_->RemoveChildListenSocket(
1134           channelz_listen_socket_node->uuid());
1135     }
1136     GRPC_CLOSURE_INIT(&listener.destroy_done, ListenerDestroyDone, this,
1137                       grpc_schedule_on_exec_ctx);
1138     listener.listener->SetOnDestroyDone(&listener.destroy_done);
1139     listener.listener.reset();
1140   }
1141 }
1142 
CancelAllCalls()1143 void Server::CancelAllCalls() {
1144   ChannelBroadcaster broadcaster;
1145   {
1146     MutexLock lock(&mu_global_);
1147     broadcaster.FillChannelsLocked(GetChannelsLocked());
1148   }
1149   broadcaster.BroadcastShutdown(
1150       /*send_goaway=*/false, GRPC_ERROR_CREATE("Cancelling all calls"));
1151 }
1152 
SendGoaways()1153 void Server::SendGoaways() {
1154   ChannelBroadcaster broadcaster;
1155   {
1156     MutexLock lock(&mu_global_);
1157     broadcaster.FillChannelsLocked(GetChannelsLocked());
1158   }
1159   broadcaster.BroadcastShutdown(/*send_goaway=*/true, absl::OkStatus());
1160 }
1161 
Orphan()1162 void Server::Orphan() {
1163   {
1164     MutexLock lock(&mu_global_);
1165     GPR_ASSERT(ShutdownCalled() || listeners_.empty());
1166     GPR_ASSERT(listeners_destroyed_ == listeners_.size());
1167   }
1168   Unref();
1169 }
1170 
ValidateServerRequest(grpc_completion_queue * cq_for_notification,void * tag,grpc_byte_buffer ** optional_payload,RegisteredMethod * rm)1171 grpc_call_error Server::ValidateServerRequest(
1172     grpc_completion_queue* cq_for_notification, void* tag,
1173     grpc_byte_buffer** optional_payload, RegisteredMethod* rm) {
1174   if ((rm == nullptr && optional_payload != nullptr) ||
1175       ((rm != nullptr) && ((optional_payload == nullptr) !=
1176                            (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) {
1177     return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
1178   }
1179   if (!grpc_cq_begin_op(cq_for_notification, tag)) {
1180     return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
1181   }
1182   return GRPC_CALL_OK;
1183 }
1184 
ValidateServerRequestAndCq(size_t * cq_idx,grpc_completion_queue * cq_for_notification,void * tag,grpc_byte_buffer ** optional_payload,RegisteredMethod * rm)1185 grpc_call_error Server::ValidateServerRequestAndCq(
1186     size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
1187     grpc_byte_buffer** optional_payload, RegisteredMethod* rm) {
1188   size_t idx;
1189   for (idx = 0; idx < cqs_.size(); idx++) {
1190     if (cqs_[idx] == cq_for_notification) {
1191       break;
1192     }
1193   }
1194   if (idx == cqs_.size()) {
1195     return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1196   }
1197   grpc_call_error error =
1198       ValidateServerRequest(cq_for_notification, tag, optional_payload, rm);
1199   if (error != GRPC_CALL_OK) {
1200     return error;
1201   }
1202   *cq_idx = idx;
1203   return GRPC_CALL_OK;
1204 }
1205 
QueueRequestedCall(size_t cq_idx,RequestedCall * rc)1206 grpc_call_error Server::QueueRequestedCall(size_t cq_idx, RequestedCall* rc) {
1207   if (ShutdownCalled()) {
1208     FailCall(cq_idx, rc, GRPC_ERROR_CREATE("Server Shutdown"));
1209     return GRPC_CALL_OK;
1210   }
1211   RequestMatcherInterface* rm;
1212   switch (rc->type) {
1213     case RequestedCall::Type::BATCH_CALL:
1214       rm = unregistered_request_matcher_.get();
1215       break;
1216     case RequestedCall::Type::REGISTERED_CALL:
1217       rm = rc->data.registered.method->matcher.get();
1218       break;
1219   }
1220   rm->RequestCallWithPossiblePublish(cq_idx, rc);
1221   return GRPC_CALL_OK;
1222 }
1223 
RequestCall(grpc_call ** call,grpc_call_details * details,grpc_metadata_array * request_metadata,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1224 grpc_call_error Server::RequestCall(grpc_call** call,
1225                                     grpc_call_details* details,
1226                                     grpc_metadata_array* request_metadata,
1227                                     grpc_completion_queue* cq_bound_to_call,
1228                                     grpc_completion_queue* cq_for_notification,
1229                                     void* tag) {
1230   size_t cq_idx;
1231   grpc_call_error error = ValidateServerRequestAndCq(
1232       &cq_idx, cq_for_notification, tag, nullptr, nullptr);
1233   if (error != GRPC_CALL_OK) {
1234     return error;
1235   }
1236   RequestedCall* rc =
1237       new RequestedCall(tag, cq_bound_to_call, call, request_metadata, details);
1238   return QueueRequestedCall(cq_idx, rc);
1239 }
1240 
RequestRegisteredCall(RegisteredMethod * rm,grpc_call ** call,gpr_timespec * deadline,grpc_metadata_array * request_metadata,grpc_byte_buffer ** optional_payload,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag_new)1241 grpc_call_error Server::RequestRegisteredCall(
1242     RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
1243     grpc_metadata_array* request_metadata, grpc_byte_buffer** optional_payload,
1244     grpc_completion_queue* cq_bound_to_call,
1245     grpc_completion_queue* cq_for_notification, void* tag_new) {
1246   size_t cq_idx;
1247   grpc_call_error error = ValidateServerRequestAndCq(
1248       &cq_idx, cq_for_notification, tag_new, optional_payload, rm);
1249   if (error != GRPC_CALL_OK) {
1250     return error;
1251   }
1252   RequestedCall* rc =
1253       new RequestedCall(tag_new, cq_bound_to_call, call, request_metadata, rm,
1254                         deadline, optional_payload);
1255   return QueueRequestedCall(cq_idx, rc);
1256 }
1257 
1258 //
1259 // Server::ChannelData::ConnectivityWatcher
1260 //
1261 
1262 class Server::ChannelData::ConnectivityWatcher
1263     : public AsyncConnectivityStateWatcherInterface {
1264  public:
ConnectivityWatcher(ChannelData * chand)1265   explicit ConnectivityWatcher(ChannelData* chand)
1266       : chand_(chand), channel_(chand_->channel_->Ref()) {}
1267 
1268  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status &)1269   void OnConnectivityStateChange(grpc_connectivity_state new_state,
1270                                  const absl::Status& /*status*/) override {
1271     // Don't do anything until we are being shut down.
1272     if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
1273     // Shut down channel.
1274     MutexLock lock(&chand_->server_->mu_global_);
1275     chand_->Destroy();
1276   }
1277 
1278   ChannelData* const chand_;
1279   const RefCountedPtr<Channel> channel_;
1280 };
1281 
1282 //
1283 // Server::ChannelData
1284 //
1285 
~ChannelData()1286 Server::ChannelData::~ChannelData() {
1287   if (server_ != nullptr) {
1288     if (server_->channelz_node_ != nullptr && channelz_socket_uuid_ != 0) {
1289       server_->channelz_node_->RemoveChildSocket(channelz_socket_uuid_);
1290     }
1291     {
1292       MutexLock lock(&server_->mu_global_);
1293       if (list_position_.has_value()) {
1294         server_->channels_.erase(*list_position_);
1295         list_position_.reset();
1296       }
1297       server_->MaybeFinishShutdown();
1298     }
1299   }
1300 }
1301 
CreateArena()1302 Arena* Server::ChannelData::CreateArena() { return channel_->CreateArena(); }
1303 
CreateCall(ClientMetadata & client_initial_metadata,Arena * arena)1304 absl::StatusOr<CallInitiator> Server::ChannelData::CreateCall(
1305     ClientMetadata& client_initial_metadata, Arena* arena) {
1306   SetRegisteredMethodOnMetadata(client_initial_metadata);
1307   auto call = MakeServerCall(server_.get(), channel_.get(), arena);
1308   InitCall(call);
1309   return CallInitiator(std::move(call));
1310 }
1311 
InitTransport(RefCountedPtr<Server> server,OrphanablePtr<Channel> channel,size_t cq_idx,Transport * transport,intptr_t channelz_socket_uuid)1312 void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
1313                                         OrphanablePtr<Channel> channel,
1314                                         size_t cq_idx, Transport* transport,
1315                                         intptr_t channelz_socket_uuid) {
1316   server_ = std::move(server);
1317   channel_ = std::move(channel);
1318   cq_idx_ = cq_idx;
1319   channelz_socket_uuid_ = channelz_socket_uuid;
1320   // Publish channel.
1321   {
1322     MutexLock lock(&server_->mu_global_);
1323     server_->channels_.push_front(this);
1324     list_position_ = server_->channels_.begin();
1325   }
1326   // Start accept_stream transport op.
1327   grpc_transport_op* op = grpc_make_transport_op(nullptr);
1328   int accept_stream_types = 0;
1329   if (transport->filter_stack_transport() != nullptr) {
1330     ++accept_stream_types;
1331     op->set_accept_stream = true;
1332     op->set_accept_stream_fn = AcceptStream;
1333     op->set_registered_method_matcher_fn = [](void* arg,
1334                                               ClientMetadata* metadata) {
1335       static_cast<ChannelData*>(arg)->SetRegisteredMethodOnMetadata(*metadata);
1336     };
1337     op->set_accept_stream_user_data = this;
1338   }
1339   if (transport->server_transport() != nullptr) {
1340     ++accept_stream_types;
1341     transport->server_transport()->SetAcceptor(this);
1342   }
1343   GPR_ASSERT(accept_stream_types == 1);
1344   op->start_connectivity_watch = MakeOrphanable<ConnectivityWatcher>(this);
1345   if (server_->ShutdownCalled()) {
1346     op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
1347   }
1348   transport->PerformOp(op);
1349 }
1350 
GetRegisteredMethod(const absl::string_view & host,const absl::string_view & path)1351 Server::RegisteredMethod* Server::ChannelData::GetRegisteredMethod(
1352     const absl::string_view& host, const absl::string_view& path) {
1353   if (server_->registered_methods_.empty()) return nullptr;
1354   // check for an exact match with host
1355   auto it = server_->registered_methods_.find(std::make_pair(host, path));
1356   if (it != server_->registered_methods_.end()) {
1357     return it->second.get();
1358   }
1359   // check for wildcard method definition (no host set)
1360   it = server_->registered_methods_.find(std::make_pair("", path));
1361   if (it != server_->registered_methods_.end()) {
1362     return it->second.get();
1363   }
1364   return nullptr;
1365 }
1366 
SetRegisteredMethodOnMetadata(ClientMetadata & metadata)1367 void Server::ChannelData::SetRegisteredMethodOnMetadata(
1368     ClientMetadata& metadata) {
1369   auto* authority = metadata.get_pointer(HttpAuthorityMetadata());
1370   if (authority == nullptr) {
1371     authority = metadata.get_pointer(HostMetadata());
1372     if (authority == nullptr) {
1373       // Authority not being set is an RPC error.
1374       return;
1375     }
1376   }
1377   auto* path = metadata.get_pointer(HttpPathMetadata());
1378   if (path == nullptr) {
1379     // Path not being set would result in an RPC error.
1380     return;
1381   }
1382   RegisteredMethod* method =
1383       GetRegisteredMethod(authority->as_string_view(), path->as_string_view());
1384   // insert in metadata
1385   metadata.Set(GrpcRegisteredMethod(), method);
1386 }
1387 
AcceptStream(void * arg,Transport *,const void * transport_server_data)1388 void Server::ChannelData::AcceptStream(void* arg, Transport* /*transport*/,
1389                                        const void* transport_server_data) {
1390   auto* chand = static_cast<Server::ChannelData*>(arg);
1391   // create a call
1392   grpc_call_create_args args;
1393   args.channel = chand->channel_->Ref();
1394   args.server = chand->server_.get();
1395   args.parent = nullptr;
1396   args.propagation_mask = 0;
1397   args.cq = nullptr;
1398   args.pollset_set_alternative = nullptr;
1399   args.server_transport_data = transport_server_data;
1400   args.send_deadline = Timestamp::InfFuture();
1401   grpc_call* call;
1402   grpc_error_handle error = grpc_call_create(&args, &call);
1403   grpc_call_stack* call_stack = grpc_call_get_call_stack(call);
1404   if (call_stack == nullptr) {  // Promise based calls do not have a call stack
1405     GPR_ASSERT(error.ok());
1406     GPR_ASSERT(IsPromiseBasedServerCallEnabled());
1407     return;
1408   } else {
1409     grpc_call_element* elem = grpc_call_stack_element(call_stack, 0);
1410     auto* calld = static_cast<Server::CallData*>(elem->call_data);
1411     if (!error.ok()) {
1412       calld->FailCallCreation();
1413       return;
1414     }
1415     calld->Start(elem);
1416   }
1417 }
1418 
1419 namespace {
CancelledDueToServerShutdown()1420 auto CancelledDueToServerShutdown() {
1421   return [] {
1422     return ServerMetadataFromStatus(absl::CancelledError("Server shutdown"));
1423   };
1424 }
1425 }  // namespace
1426 
InitCall(RefCountedPtr<CallSpineInterface> call)1427 void Server::ChannelData::InitCall(RefCountedPtr<CallSpineInterface> call) {
1428   call->SpawnGuarded("request_matcher", [this, call]() {
1429     return TrySeq(
1430         // Wait for initial metadata to pass through all filters
1431         Map(call->client_initial_metadata().receiver.Next(),
1432             [](NextResult<ClientMetadataHandle> md)
1433                 -> absl::StatusOr<ClientMetadataHandle> {
1434               if (!md.has_value()) {
1435                 return absl::InternalError("Missing metadata");
1436               }
1437               if (!md.value()->get_pointer(HttpPathMetadata())) {
1438                 return absl::InternalError("Missing :path header");
1439               }
1440               if (!md.value()->get_pointer(HttpAuthorityMetadata())) {
1441                 return absl::InternalError("Missing :authority header");
1442               }
1443               return std::move(*md);
1444             }),
1445         // Match request with requested call
1446         [this, call](ClientMetadataHandle md) {
1447           auto* registered_method = static_cast<RegisteredMethod*>(
1448               md->get(GrpcRegisteredMethod()).value_or(nullptr));
1449           RequestMatcherInterface* rm;
1450           grpc_server_register_method_payload_handling payload_handling =
1451               GRPC_SRM_PAYLOAD_NONE;
1452           if (registered_method == nullptr) {
1453             rm = server_->unregistered_request_matcher_.get();
1454           } else {
1455             payload_handling = registered_method->payload_handling;
1456             rm = registered_method->matcher.get();
1457           }
1458           auto maybe_read_first_message = If(
1459               payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
1460               [call]() {
1461                 return call->client_to_server_messages().receiver.Next();
1462               },
1463               []() -> NextResult<MessageHandle> {
1464                 return NextResult<MessageHandle>();
1465               });
1466           return TryJoin<absl::StatusOr>(
1467               Map(std::move(maybe_read_first_message),
1468                   [](NextResult<MessageHandle> n) {
1469                     return ValueOrFailure<NextResult<MessageHandle>>{
1470                         std::move(n)};
1471                   }),
1472               rm->MatchRequest(cq_idx()), [md = std::move(md)]() mutable {
1473                 return ValueOrFailure<ClientMetadataHandle>(std::move(md));
1474               });
1475         },
1476         // Publish call to cq
1477         [](std::tuple<NextResult<MessageHandle>,
1478                       RequestMatcherInterface::MatchResult,
1479                       ClientMetadataHandle>
1480                r) {
1481           RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
1482           auto md = std::move(std::get<2>(r));
1483           auto* rc = mr.TakeCall();
1484           rc->Complete(std::move(std::get<0>(r)), *md);
1485           auto* call_context = GetContext<CallContext>();
1486           *rc->call = call_context->c_call();
1487           grpc_call_ref(*rc->call);
1488           grpc_call_set_completion_queue(call_context->c_call(),
1489                                          rc->cq_bound_to_call);
1490           call_context->server_call_context()->PublishInitialMetadata(
1491               std::move(md), rc->initial_metadata);
1492           // TODO(ctiller): publish metadata
1493           return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
1494                      [rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
1495                        return absl::OkStatus();
1496                      });
1497         });
1498   });
1499 }
1500 
MakeCallPromise(grpc_channel_element * elem,CallArgs call_args,NextPromiseFactory)1501 ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
1502     grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) {
1503   auto* chand = static_cast<Server::ChannelData*>(elem->channel_data);
1504   auto* server = chand->server_.get();
1505   if (server->ShutdownCalled()) return CancelledDueToServerShutdown();
1506   auto cleanup_ref =
1507       absl::MakeCleanup([server] { server->ShutdownUnrefOnRequest(); });
1508   if (!server->ShutdownRefOnRequest()) return CancelledDueToServerShutdown();
1509   auto path_ptr =
1510       call_args.client_initial_metadata->get_pointer(HttpPathMetadata());
1511   if (path_ptr == nullptr) {
1512     return [] {
1513       return ServerMetadataFromStatus(
1514           absl::InternalError("Missing :path header"));
1515     };
1516   }
1517   auto host_ptr =
1518       call_args.client_initial_metadata->get_pointer(HttpAuthorityMetadata());
1519   if (host_ptr == nullptr) {
1520     return [] {
1521       return ServerMetadataFromStatus(
1522           absl::InternalError("Missing :authority header"));
1523     };
1524   }
1525   // Find request matcher.
1526   RequestMatcherInterface* matcher;
1527   RegisteredMethod* rm = static_cast<RegisteredMethod*>(
1528       call_args.client_initial_metadata->get(GrpcRegisteredMethod())
1529           .value_or(nullptr));
1530   ArenaPromise<absl::StatusOr<NextResult<MessageHandle>>>
1531       maybe_read_first_message([] { return NextResult<MessageHandle>(); });
1532   if (rm != nullptr) {
1533     matcher = rm->matcher.get();
1534     switch (rm->payload_handling) {
1535       case GRPC_SRM_PAYLOAD_NONE:
1536         break;
1537       case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER:
1538         maybe_read_first_message =
1539             Map(call_args.client_to_server_messages->Next(),
1540                 [](NextResult<MessageHandle> msg)
1541                     -> absl::StatusOr<NextResult<MessageHandle>> {
1542                   return std::move(msg);
1543                 });
1544     }
1545   } else {
1546     matcher = server->unregistered_request_matcher_.get();
1547   }
1548   return TrySeq(
1549       std::move(maybe_read_first_message),
1550       [cleanup_ref = std::move(cleanup_ref), matcher,
1551        chand](NextResult<MessageHandle> payload) mutable {
1552         return Map(
1553             [cleanup_ref = std::move(cleanup_ref),
1554              mr = matcher->MatchRequest(chand->cq_idx())]() mutable {
1555               return mr();
1556             },
1557             [payload = std::move(payload)](
1558                 absl::StatusOr<RequestMatcherInterface::MatchResult> mr) mutable
1559             -> absl::StatusOr<std::pair<RequestMatcherInterface::MatchResult,
1560                                         NextResult<MessageHandle>>> {
1561               if (!mr.ok()) return mr.status();
1562               return std::make_pair(std::move(*mr), std::move(payload));
1563             });
1564       },
1565       [call_args =
1566            std::move(call_args)](std::pair<RequestMatcherInterface::MatchResult,
1567                                            NextResult<MessageHandle>>
1568                                      r) mutable {
1569         auto& mr = r.first;
1570         auto& payload = r.second;
1571         auto* rc = mr.TakeCall();
1572         auto* cq_for_new_request = mr.cq();
1573         auto* server_call_context =
1574             GetContext<CallContext>()->server_call_context();
1575         rc->Complete(std::move(payload), *call_args.client_initial_metadata);
1576         server_call_context->PublishInitialMetadata(
1577             std::move(call_args.client_initial_metadata), rc->initial_metadata);
1578         return server_call_context->MakeTopOfServerCallPromise(
1579             std::move(call_args), rc->cq_bound_to_call,
1580             [rc, cq_for_new_request](grpc_call* call) {
1581               *rc->call = call;
1582               grpc_cq_end_op(cq_for_new_request, rc->tag, absl::OkStatus(),
1583                              Server::DoneRequestEvent, rc, &rc->completion,
1584                              true);
1585             });
1586       });
1587 }
1588 
FinishDestroy(void * arg,grpc_error_handle)1589 void Server::ChannelData::FinishDestroy(void* arg,
1590                                         grpc_error_handle /*error*/) {
1591   auto* chand = static_cast<Server::ChannelData*>(arg);
1592   Server* server = chand->server_.get();
1593   auto* channel_stack = chand->channel_->channel_stack();
1594   chand->channel_.reset();
1595   server->Unref();
1596   GRPC_CHANNEL_STACK_UNREF(channel_stack, "Server::ChannelData::Destroy");
1597 }
1598 
Destroy()1599 void Server::ChannelData::Destroy() {
1600   if (!list_position_.has_value()) return;
1601   GPR_ASSERT(server_ != nullptr);
1602   server_->channels_.erase(*list_position_);
1603   list_position_.reset();
1604   server_->Ref().release();
1605   server_->MaybeFinishShutdown();
1606   // Unreffed by FinishDestroy
1607   GRPC_CHANNEL_STACK_REF(channel_->channel_stack(),
1608                          "Server::ChannelData::Destroy");
1609   GRPC_CLOSURE_INIT(&finish_destroy_channel_closure_, FinishDestroy, this,
1610                     grpc_schedule_on_exec_ctx);
1611   if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) {
1612     gpr_log(GPR_INFO, "Disconnected client");
1613   }
1614   grpc_transport_op* op =
1615       grpc_make_transport_op(&finish_destroy_channel_closure_);
1616   op->set_accept_stream = true;
1617   grpc_channel_next_op(grpc_channel_stack_element(channel_->channel_stack(), 0),
1618                        op);
1619 }
1620 
InitChannelElement(grpc_channel_element * elem,grpc_channel_element_args * args)1621 grpc_error_handle Server::ChannelData::InitChannelElement(
1622     grpc_channel_element* elem, grpc_channel_element_args* args) {
1623   GPR_ASSERT(args->is_first);
1624   GPR_ASSERT(!args->is_last);
1625   new (elem->channel_data) ChannelData();
1626   return absl::OkStatus();
1627 }
1628 
DestroyChannelElement(grpc_channel_element * elem)1629 void Server::ChannelData::DestroyChannelElement(grpc_channel_element* elem) {
1630   auto* chand = static_cast<ChannelData*>(elem->channel_data);
1631   chand->~ChannelData();
1632 }
1633 
1634 //
1635 // Server::CallData
1636 //
1637 
CallData(grpc_call_element * elem,const grpc_call_element_args & args,RefCountedPtr<Server> server)1638 Server::CallData::CallData(grpc_call_element* elem,
1639                            const grpc_call_element_args& args,
1640                            RefCountedPtr<Server> server)
1641     : server_(std::move(server)),
1642       call_(grpc_call_from_top_element(elem)),
1643       call_combiner_(args.call_combiner) {
1644   GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
1645                     elem, grpc_schedule_on_exec_ctx);
1646   GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
1647                     elem, grpc_schedule_on_exec_ctx);
1648 }
1649 
~CallData()1650 Server::CallData::~CallData() {
1651   GPR_ASSERT(state_.load(std::memory_order_relaxed) != CallState::PENDING);
1652   grpc_metadata_array_destroy(&initial_metadata_);
1653   grpc_byte_buffer_destroy(payload_);
1654 }
1655 
SetState(CallState state)1656 void Server::CallData::SetState(CallState state) {
1657   state_.store(state, std::memory_order_relaxed);
1658 }
1659 
MaybeActivate()1660 bool Server::CallData::MaybeActivate() {
1661   CallState expected = CallState::PENDING;
1662   return state_.compare_exchange_strong(expected, CallState::ACTIVATED,
1663                                         std::memory_order_acq_rel,
1664                                         std::memory_order_relaxed);
1665 }
1666 
FailCallCreation()1667 void Server::CallData::FailCallCreation() {
1668   CallState expected_not_started = CallState::NOT_STARTED;
1669   CallState expected_pending = CallState::PENDING;
1670   if (state_.compare_exchange_strong(expected_not_started, CallState::ZOMBIED,
1671                                      std::memory_order_acq_rel,
1672                                      std::memory_order_acquire)) {
1673     KillZombie();
1674   } else if (state_.compare_exchange_strong(
1675                  expected_pending, CallState::ZOMBIED,
1676                  std::memory_order_acq_rel, std::memory_order_relaxed)) {
1677     // Zombied call will be destroyed when it's removed from the pending
1678     // queue... later.
1679   }
1680 }
1681 
Start(grpc_call_element * elem)1682 void Server::CallData::Start(grpc_call_element* elem) {
1683   grpc_op op;
1684   op.op = GRPC_OP_RECV_INITIAL_METADATA;
1685   op.flags = 0;
1686   op.reserved = nullptr;
1687   op.data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_;
1688   GRPC_CLOSURE_INIT(&recv_initial_metadata_batch_complete_,
1689                     RecvInitialMetadataBatchComplete, elem,
1690                     grpc_schedule_on_exec_ctx);
1691   grpc_call_start_batch_and_execute(call_, &op, 1,
1692                                     &recv_initial_metadata_batch_complete_);
1693 }
1694 
Publish(size_t cq_idx,RequestedCall * rc)1695 void Server::CallData::Publish(size_t cq_idx, RequestedCall* rc) {
1696   grpc_call_set_completion_queue(call_, rc->cq_bound_to_call);
1697   *rc->call = call_;
1698   cq_new_ = server_->cqs_[cq_idx];
1699   std::swap(*rc->initial_metadata, initial_metadata_);
1700   switch (rc->type) {
1701     case RequestedCall::Type::BATCH_CALL:
1702       GPR_ASSERT(host_.has_value());
1703       GPR_ASSERT(path_.has_value());
1704       rc->data.batch.details->host = CSliceRef(host_->c_slice());
1705       rc->data.batch.details->method = CSliceRef(path_->c_slice());
1706       rc->data.batch.details->deadline =
1707           deadline_.as_timespec(GPR_CLOCK_MONOTONIC);
1708       break;
1709     case RequestedCall::Type::REGISTERED_CALL:
1710       *rc->data.registered.deadline =
1711           deadline_.as_timespec(GPR_CLOCK_MONOTONIC);
1712       if (rc->data.registered.optional_payload != nullptr) {
1713         *rc->data.registered.optional_payload = payload_;
1714         payload_ = nullptr;
1715       }
1716       break;
1717     default:
1718       GPR_UNREACHABLE_CODE(return);
1719   }
1720   grpc_cq_end_op(cq_new_, rc->tag, absl::OkStatus(), Server::DoneRequestEvent,
1721                  rc, &rc->completion, true);
1722 }
1723 
PublishNewRpc(void * arg,grpc_error_handle error)1724 void Server::CallData::PublishNewRpc(void* arg, grpc_error_handle error) {
1725   grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
1726   auto* calld = static_cast<Server::CallData*>(call_elem->call_data);
1727   auto* chand = static_cast<Server::ChannelData*>(call_elem->channel_data);
1728   RequestMatcherInterface* rm = calld->matcher_;
1729   Server* server = rm->server();
1730   if (!error.ok() || server->ShutdownCalled()) {
1731     calld->state_.store(CallState::ZOMBIED, std::memory_order_relaxed);
1732     calld->KillZombie();
1733     return;
1734   }
1735   rm->MatchOrQueue(chand->cq_idx(), calld);
1736 }
1737 
1738 namespace {
1739 
KillZombieClosure(void * call,grpc_error_handle)1740 void KillZombieClosure(void* call, grpc_error_handle /*error*/) {
1741   grpc_call_unref(static_cast<grpc_call*>(call));
1742 }
1743 
1744 }  // namespace
1745 
KillZombie()1746 void Server::CallData::KillZombie() {
1747   GRPC_CLOSURE_INIT(&kill_zombie_closure_, KillZombieClosure, call_,
1748                     grpc_schedule_on_exec_ctx);
1749   ExecCtx::Run(DEBUG_LOCATION, &kill_zombie_closure_, absl::OkStatus());
1750 }
1751 
1752 // If this changes, change MakeCallPromise too.
StartNewRpc(grpc_call_element * elem)1753 void Server::CallData::StartNewRpc(grpc_call_element* elem) {
1754   if (server_->ShutdownCalled()) {
1755     state_.store(CallState::ZOMBIED, std::memory_order_relaxed);
1756     KillZombie();
1757     return;
1758   }
1759   // Find request matcher.
1760   matcher_ = server_->unregistered_request_matcher_.get();
1761   grpc_server_register_method_payload_handling payload_handling =
1762       GRPC_SRM_PAYLOAD_NONE;
1763   if (path_.has_value() && host_.has_value()) {
1764     RegisteredMethod* rm = static_cast<RegisteredMethod*>(
1765         recv_initial_metadata_->get(GrpcRegisteredMethod()).value_or(nullptr));
1766     if (rm != nullptr) {
1767       matcher_ = rm->matcher.get();
1768       payload_handling = rm->payload_handling;
1769     }
1770   }
1771   // Start recv_message op if needed.
1772   switch (payload_handling) {
1773     case GRPC_SRM_PAYLOAD_NONE:
1774       PublishNewRpc(elem, absl::OkStatus());
1775       break;
1776     case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
1777       grpc_op op;
1778       op.op = GRPC_OP_RECV_MESSAGE;
1779       op.flags = 0;
1780       op.reserved = nullptr;
1781       op.data.recv_message.recv_message = &payload_;
1782       GRPC_CLOSURE_INIT(&publish_, PublishNewRpc, elem,
1783                         grpc_schedule_on_exec_ctx);
1784       grpc_call_start_batch_and_execute(call_, &op, 1, &publish_);
1785       break;
1786     }
1787   }
1788 }
1789 
RecvInitialMetadataBatchComplete(void * arg,grpc_error_handle error)1790 void Server::CallData::RecvInitialMetadataBatchComplete(
1791     void* arg, grpc_error_handle error) {
1792   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
1793   auto* calld = static_cast<Server::CallData*>(elem->call_data);
1794   if (!error.ok()) {
1795     gpr_log(GPR_DEBUG, "Failed call creation: %s",
1796             StatusToString(error).c_str());
1797     calld->FailCallCreation();
1798     return;
1799   }
1800   calld->StartNewRpc(elem);
1801 }
1802 
StartTransportStreamOpBatchImpl(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)1803 void Server::CallData::StartTransportStreamOpBatchImpl(
1804     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
1805   if (batch->recv_initial_metadata) {
1806     recv_initial_metadata_ =
1807         batch->payload->recv_initial_metadata.recv_initial_metadata;
1808     original_recv_initial_metadata_ready_ =
1809         batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
1810     batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
1811         &recv_initial_metadata_ready_;
1812   }
1813   if (batch->recv_trailing_metadata) {
1814     original_recv_trailing_metadata_ready_ =
1815         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1816     batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1817         &recv_trailing_metadata_ready_;
1818   }
1819   grpc_call_next_op(elem, batch);
1820 }
1821 
RecvInitialMetadataReady(void * arg,grpc_error_handle error)1822 void Server::CallData::RecvInitialMetadataReady(void* arg,
1823                                                 grpc_error_handle error) {
1824   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
1825   CallData* calld = static_cast<CallData*>(elem->call_data);
1826   if (error.ok()) {
1827     calld->path_ = calld->recv_initial_metadata_->Take(HttpPathMetadata());
1828     auto* host =
1829         calld->recv_initial_metadata_->get_pointer(HttpAuthorityMetadata());
1830     if (host != nullptr) calld->host_.emplace(host->Ref());
1831   }
1832   auto op_deadline = calld->recv_initial_metadata_->get(GrpcTimeoutMetadata());
1833   if (op_deadline.has_value()) {
1834     calld->deadline_ = *op_deadline;
1835   }
1836   if (calld->host_.has_value() && calld->path_.has_value()) {
1837     // do nothing
1838   } else if (error.ok()) {
1839     // Pass the error reference to calld->recv_initial_metadata_error
1840     error = absl::UnknownError("Missing :authority or :path");
1841     calld->recv_initial_metadata_error_ = error;
1842   }
1843   grpc_closure* closure = calld->original_recv_initial_metadata_ready_;
1844   calld->original_recv_initial_metadata_ready_ = nullptr;
1845   if (calld->seen_recv_trailing_metadata_ready_) {
1846     GRPC_CALL_COMBINER_START(calld->call_combiner_,
1847                              &calld->recv_trailing_metadata_ready_,
1848                              calld->recv_trailing_metadata_error_,
1849                              "continue server recv_trailing_metadata_ready");
1850   }
1851   Closure::Run(DEBUG_LOCATION, closure, error);
1852 }
1853 
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)1854 void Server::CallData::RecvTrailingMetadataReady(void* arg,
1855                                                  grpc_error_handle error) {
1856   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
1857   CallData* calld = static_cast<CallData*>(elem->call_data);
1858   if (calld->original_recv_initial_metadata_ready_ != nullptr) {
1859     calld->recv_trailing_metadata_error_ = error;
1860     calld->seen_recv_trailing_metadata_ready_ = true;
1861     GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_,
1862                       RecvTrailingMetadataReady, elem,
1863                       grpc_schedule_on_exec_ctx);
1864     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
1865                             "deferring server recv_trailing_metadata_ready "
1866                             "until after recv_initial_metadata_ready");
1867     return;
1868   }
1869   error = grpc_error_add_child(error, calld->recv_initial_metadata_error_);
1870   Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
1871                error);
1872 }
1873 
InitCallElement(grpc_call_element * elem,const grpc_call_element_args * args)1874 grpc_error_handle Server::CallData::InitCallElement(
1875     grpc_call_element* elem, const grpc_call_element_args* args) {
1876   auto* chand = static_cast<ChannelData*>(elem->channel_data);
1877   new (elem->call_data) Server::CallData(elem, *args, chand->server());
1878   return absl::OkStatus();
1879 }
1880 
DestroyCallElement(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)1881 void Server::CallData::DestroyCallElement(
1882     grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
1883     grpc_closure* /*ignored*/) {
1884   auto* calld = static_cast<CallData*>(elem->call_data);
1885   calld->~CallData();
1886 }
1887 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)1888 void Server::CallData::StartTransportStreamOpBatch(
1889     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
1890   auto* calld = static_cast<CallData*>(elem->call_data);
1891   calld->StartTransportStreamOpBatchImpl(elem, batch);
1892 }
1893 
1894 }  // namespace grpc_core
1895 
1896 //
1897 // C-core API
1898 //
1899 
grpc_server_create(const grpc_channel_args * args,void * reserved)1900 grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
1901   grpc_core::ExecCtx exec_ctx;
1902   GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
1903   grpc_core::Server* server =
1904       new grpc_core::Server(grpc_core::CoreConfiguration::Get()
1905                                 .channel_args_preconditioning()
1906                                 .PreconditionChannelArgs(args));
1907   return server->c_ptr();
1908 }
1909 
grpc_server_register_completion_queue(grpc_server * server,grpc_completion_queue * cq,void * reserved)1910 void grpc_server_register_completion_queue(grpc_server* server,
1911                                            grpc_completion_queue* cq,
1912                                            void* reserved) {
1913   GRPC_API_TRACE(
1914       "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
1915       (server, cq, reserved));
1916   GPR_ASSERT(!reserved);
1917   auto cq_type = grpc_get_cq_completion_type(cq);
1918   if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) {
1919     gpr_log(GPR_INFO,
1920             "Completion queue of type %d is being registered as a "
1921             "server-completion-queue",
1922             static_cast<int>(cq_type));
1923     // Ideally we should log an error and abort but ruby-wrapped-language API
1924     // calls grpc_completion_queue_pluck() on server completion queues
1925   }
1926   grpc_core::Server::FromC(server)->RegisterCompletionQueue(cq);
1927 }
1928 
grpc_server_register_method(grpc_server * server,const char * method,const char * host,grpc_server_register_method_payload_handling payload_handling,uint32_t flags)1929 void* grpc_server_register_method(
1930     grpc_server* server, const char* method, const char* host,
1931     grpc_server_register_method_payload_handling payload_handling,
1932     uint32_t flags) {
1933   GRPC_API_TRACE(
1934       "grpc_server_register_method(server=%p, method=%s, host=%s, "
1935       "flags=0x%08x)",
1936       4, (server, method, host, flags));
1937   return grpc_core::Server::FromC(server)->RegisterMethod(
1938       method, host, payload_handling, flags);
1939 }
1940 
grpc_server_start(grpc_server * server)1941 void grpc_server_start(grpc_server* server) {
1942   grpc_core::ExecCtx exec_ctx;
1943   GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
1944   grpc_core::Server::FromC(server)->Start();
1945 }
1946 
grpc_server_shutdown_and_notify(grpc_server * server,grpc_completion_queue * cq,void * tag)1947 void grpc_server_shutdown_and_notify(grpc_server* server,
1948                                      grpc_completion_queue* cq, void* tag) {
1949   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1950   grpc_core::ExecCtx exec_ctx;
1951   GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
1952                  (server, cq, tag));
1953   grpc_core::Server::FromC(server)->ShutdownAndNotify(cq, tag);
1954 }
1955 
grpc_server_cancel_all_calls(grpc_server * server)1956 void grpc_server_cancel_all_calls(grpc_server* server) {
1957   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1958   grpc_core::ExecCtx exec_ctx;
1959   GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
1960   grpc_core::Server::FromC(server)->CancelAllCalls();
1961 }
1962 
grpc_server_destroy(grpc_server * server)1963 void grpc_server_destroy(grpc_server* server) {
1964   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1965   grpc_core::ExecCtx exec_ctx;
1966   GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
1967   grpc_core::Server::FromC(server)->Orphan();
1968 }
1969 
grpc_server_request_call(grpc_server * server,grpc_call ** call,grpc_call_details * details,grpc_metadata_array * request_metadata,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag)1970 grpc_call_error grpc_server_request_call(
1971     grpc_server* server, grpc_call** call, grpc_call_details* details,
1972     grpc_metadata_array* request_metadata,
1973     grpc_completion_queue* cq_bound_to_call,
1974     grpc_completion_queue* cq_for_notification, void* tag) {
1975   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1976   grpc_core::ExecCtx exec_ctx;
1977   GRPC_API_TRACE(
1978       "grpc_server_request_call("
1979       "server=%p, call=%p, details=%p, initial_metadata=%p, "
1980       "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
1981       7,
1982       (server, call, details, request_metadata, cq_bound_to_call,
1983        cq_for_notification, tag));
1984   return grpc_core::Server::FromC(server)->RequestCall(
1985       call, details, request_metadata, cq_bound_to_call, cq_for_notification,
1986       tag);
1987 }
1988 
grpc_server_request_registered_call(grpc_server * server,void * registered_method,grpc_call ** call,gpr_timespec * deadline,grpc_metadata_array * request_metadata,grpc_byte_buffer ** optional_payload,grpc_completion_queue * cq_bound_to_call,grpc_completion_queue * cq_for_notification,void * tag_new)1989 grpc_call_error grpc_server_request_registered_call(
1990     grpc_server* server, void* registered_method, grpc_call** call,
1991     gpr_timespec* deadline, grpc_metadata_array* request_metadata,
1992     grpc_byte_buffer** optional_payload,
1993     grpc_completion_queue* cq_bound_to_call,
1994     grpc_completion_queue* cq_for_notification, void* tag_new) {
1995   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1996   grpc_core::ExecCtx exec_ctx;
1997   auto* rm =
1998       static_cast<grpc_core::Server::RegisteredMethod*>(registered_method);
1999   GRPC_API_TRACE(
2000       "grpc_server_request_registered_call("
2001       "server=%p, registered_method=%p, call=%p, deadline=%p, "
2002       "request_metadata=%p, "
2003       "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
2004       "tag=%p)",
2005       9,
2006       (server, registered_method, call, deadline, request_metadata,
2007        optional_payload, cq_bound_to_call, cq_for_notification, tag_new));
2008   return grpc_core::Server::FromC(server)->RequestRegisteredCall(
2009       rm, call, deadline, request_metadata, optional_payload, cq_bound_to_call,
2010       cq_for_notification, tag_new);
2011 }
2012 
grpc_server_set_config_fetcher(grpc_server * server,grpc_server_config_fetcher * server_config_fetcher)2013 void grpc_server_set_config_fetcher(
2014     grpc_server* server, grpc_server_config_fetcher* server_config_fetcher) {
2015   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2016   grpc_core::ExecCtx exec_ctx;
2017   GRPC_API_TRACE("grpc_server_set_config_fetcher(server=%p, config_fetcher=%p)",
2018                  2, (server, server_config_fetcher));
2019   grpc_core::Server::FromC(server)->set_config_fetcher(
2020       std::unique_ptr<grpc_server_config_fetcher>(server_config_fetcher));
2021 }
2022 
grpc_server_config_fetcher_destroy(grpc_server_config_fetcher * server_config_fetcher)2023 void grpc_server_config_fetcher_destroy(
2024     grpc_server_config_fetcher* server_config_fetcher) {
2025   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2026   grpc_core::ExecCtx exec_ctx;
2027   GRPC_API_TRACE("grpc_server_config_fetcher_destroy(config_fetcher=%p)", 1,
2028                  (server_config_fetcher));
2029   delete server_config_fetcher;
2030 }
2031