//
// Copyright 2015-2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/surface/server.h"

#include <inttypes.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <list>
#include <memory>
#include <new>
#include <queue>
#include <type_traits>
#include <utility>
#include <vector>

#include "absl/cleanup/cleanup.h"
#include "absl/container/flat_hash_map.h"
#include "absl/status/status.h"
#include "absl/types/optional.h"

#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/channel/channel_trace.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/mpscq.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/promise/try_join.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/surface/legacy_channel.h"
#include "src/core/lib/surface/wait_for_cq_end_op.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"

namespace grpc_core {

TraceFlag grpc_server_channel_trace(false, "server_channel");

//
// Server::RegisteredMethod
//

struct Server::RegisteredMethod {
  RegisteredMethod(
      const char* method_arg, const char* host_arg,
      grpc_server_register_method_payload_handling payload_handling_arg,
      uint32_t flags_arg)
      : method(method_arg == nullptr ? "" : method_arg),
        host(host_arg == nullptr ? "" : host_arg),
        payload_handling(payload_handling_arg),
        flags(flags_arg) {}

  ~RegisteredMethod() = default;

  const std::string method;
  const std::string host;
  const grpc_server_register_method_payload_handling payload_handling;
  const uint32_t flags;
  // One request matcher per method.
  std::unique_ptr<RequestMatcherInterface> matcher;
};

//
// Server::RequestMatcherInterface
//

// RPCs that come in from the transport must be matched against RPC requests
// from the application. An incoming request from the application can be
// matched to an RPC that has already arrived or can be queued up for later
// use. Likewise, an RPC coming in from the transport can either be matched to
// a request that already arrived from the application or can be queued up for
// later use (marked pending). If there is a match, the request's tag is posted
// on the request's notification CQ.
//
// RequestMatcherInterface is the base class that provides this functionality.
class Server::RequestMatcherInterface {
 public:
  virtual ~RequestMatcherInterface() {}

  // Unref the calls associated with any incoming RPCs in the pending queue
  // (not yet matched to an application-requested RPC).
  virtual void ZombifyPending() = 0;

  // Mark all application-requested RPCs failed if they have not been matched
  // to an incoming RPC. The error parameter indicates why the RPCs are being
  // failed (currently always server shutdown).
  virtual void KillRequests(grpc_error_handle error) = 0;

  // How many request queues are supported by this matcher. This is an
  // abstract concept that essentially maps to gRPC completion queues.
  virtual size_t request_queue_count() const = 0;

  // This function is invoked when the application requests a new RPC whose
  // information is in the call parameter. The request_queue_index marks the
  // queue onto which to place this RPC, and is typically associated with a
  // gRPC CQ. If there are pending RPCs waiting to be matched, publish one
  // (match it and notify the CQ).
  virtual void RequestCallWithPossiblePublish(size_t request_queue_index,
                                              RequestedCall* call) = 0;

  class MatchResult {
   public:
    MatchResult(Server* server, size_t cq_idx, RequestedCall* requested_call)
        : server_(server), cq_idx_(cq_idx), requested_call_(requested_call) {}
    ~MatchResult() {
      if (requested_call_ != nullptr) {
        server_->FailCall(cq_idx_, requested_call_, absl::CancelledError());
      }
    }

    MatchResult(const MatchResult&) = delete;
    MatchResult& operator=(const MatchResult&) = delete;

    MatchResult(MatchResult&& other) noexcept
        : server_(other.server_),
          cq_idx_(other.cq_idx_),
          requested_call_(std::exchange(other.requested_call_, nullptr)) {}

    RequestedCall* TakeCall() {
      return std::exchange(requested_call_, nullptr);
    }

    grpc_completion_queue* cq() const { return server_->cqs_[cq_idx_]; }
    size_t cq_idx() const { return cq_idx_; }

   private:
    Server* server_;
    size_t cq_idx_;
    RequestedCall* requested_call_;
  };

  // This function is invoked on an incoming promise-based RPC.
  // The RequestMatcher will try to match it against an application-requested
  // RPC if possible, or will place it in the pending queue otherwise. To
  // enable some measure of fairness between server CQs, the match is done
  // starting at the start_request_queue_index parameter in a cyclic order
  // rather than always starting at 0.
  virtual ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
      size_t start_request_queue_index) = 0;

  // This function is invoked on an incoming RPC, represented by the calld
  // object. The RequestMatcher will try to match it against an
  // application-requested RPC if possible, or will place it in the pending
  // queue otherwise. To enable some measure of fairness between server CQs,
  // the match is done starting at the start_request_queue_index parameter in
  // a cyclic order rather than always starting at 0.
  virtual void MatchOrQueue(size_t start_request_queue_index,
                            CallData* calld) = 0;

  // Returns the server associated with this request matcher.
  virtual Server* server() const = 0;
};

//
// Server::RequestedCall
//

struct Server::RequestedCall {
  enum class Type { BATCH_CALL, REGISTERED_CALL };

  RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
                grpc_call** call_arg, grpc_metadata_array* initial_md,
                grpc_call_details* details)
      : type(Type::BATCH_CALL),
        tag(tag_arg),
        cq_bound_to_call(call_cq),
        call(call_arg),
        initial_metadata(initial_md) {
    data.batch.details = details;
  }

  RequestedCall(void* tag_arg, grpc_completion_queue* call_cq,
                grpc_call** call_arg, grpc_metadata_array* initial_md,
                RegisteredMethod* rm, gpr_timespec* deadline,
                grpc_byte_buffer** optional_payload)
      : type(Type::REGISTERED_CALL),
        tag(tag_arg),
        cq_bound_to_call(call_cq),
        call(call_arg),
        initial_metadata(initial_md) {
    data.registered.method = rm;
    data.registered.deadline = deadline;
    data.registered.optional_payload = optional_payload;
  }

  void Complete(NextResult<MessageHandle> payload, ClientMetadata& md) {
    Timestamp deadline = GetContext<CallContext>()->deadline();
    switch (type) {
      case RequestedCall::Type::BATCH_CALL:
        GPR_ASSERT(!payload.has_value());
        data.batch.details->host =
            CSliceRef(md.get_pointer(HttpAuthorityMetadata())->c_slice());
        data.batch.details->method =
            CSliceRef(md.Take(HttpPathMetadata())->c_slice());
        data.batch.details->deadline =
            deadline.as_timespec(GPR_CLOCK_MONOTONIC);
        break;
      case RequestedCall::Type::REGISTERED_CALL:
        md.Remove(HttpPathMetadata());
        *data.registered.deadline = deadline.as_timespec(GPR_CLOCK_MONOTONIC);
        if (data.registered.optional_payload != nullptr) {
          if (payload.has_value()) {
            auto* sb = payload.value()->payload()->c_slice_buffer();
            *data.registered.optional_payload =
                grpc_raw_byte_buffer_create(sb->slices, sb->count);
          } else {
            *data.registered.optional_payload = nullptr;
          }
        }
        break;
      default:
        GPR_UNREACHABLE_CODE(abort());
    }
  }

  MultiProducerSingleConsumerQueue::Node mpscq_node;
  const Type type;
  void* const tag;
  grpc_completion_queue* const cq_bound_to_call;
  grpc_call** const call;
  grpc_cq_completion completion;
  grpc_metadata_array* const initial_metadata;
  union {
    struct {
      grpc_call_details* details;
    } batch;
    struct {
      RegisteredMethod* method;
      gpr_timespec* deadline;
      grpc_byte_buffer** optional_payload;
    } registered;
  } data;
};

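// Illustrative note (added commentary, not part of the original file): each
// RequestedCall records one application-side request made through the public
// C core surface. A minimal sketch of how the fields above map onto the
// <grpc/grpc.h> entry points, assuming an unregistered (batch) call:
//
//   grpc_call* call;                       // written through RequestedCall::call
//   grpc_call_details details;             // written through data.batch.details
//   grpc_metadata_array request_metadata;  // written through initial_metadata
//   grpc_server_request_call(server, &call, &details, &request_metadata,
//                            cq_bound_to_call, cq_for_notification, tag);
//
// The registered-call constructor plays the same role for
// grpc_server_request_registered_call(), adding the deadline and optional
// payload out-parameters.
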
// The RealRequestMatcher is an implementation of RequestMatcherInterface that
// actually uses all the features of RequestMatcherInterface: expecting the
// application to explicitly request RPCs and then matching those to incoming
// RPCs, along with a slow path by which incoming RPCs are put on a locked
// pending list if they aren't able to be matched to an application request.
class Server::RealRequestMatcherFilterStack : public RequestMatcherInterface {
 public:
  explicit RealRequestMatcherFilterStack(Server* server)
      : server_(server), requests_per_cq_(server->cqs_.size()) {}

  ~RealRequestMatcherFilterStack() override {
    for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
      GPR_ASSERT(queue.Pop() == nullptr);
    }
    GPR_ASSERT(pending_.empty());
  }

  void ZombifyPending() override {
    while (!pending_.empty()) {
      pending_.front().calld->SetState(CallData::CallState::ZOMBIED);
      pending_.front().calld->KillZombie();
      pending_.pop();
    }
  }

  void KillRequests(grpc_error_handle error) override {
    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
      RequestedCall* rc;
      while ((rc = reinterpret_cast<RequestedCall*>(
                  requests_per_cq_[i].Pop())) != nullptr) {
        server_->FailCall(i, rc, error);
      }
    }
  }

  size_t request_queue_count() const override {
    return requests_per_cq_.size();
  }

  void RequestCallWithPossiblePublish(size_t request_queue_index,
                                      RequestedCall* call) override {
    if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
      // This was the first queued request: we need to lock and start
      // matching calls.
      struct NextPendingCall {
        RequestedCall* rc = nullptr;
        CallData* pending;
      };
      while (true) {
        NextPendingCall pending_call;
        {
          MutexLock lock(&server_->mu_call_);
          while (!pending_.empty() &&
                 pending_.front().Age() > server_->max_time_in_pending_queue_) {
            pending_.front().calld->SetState(CallData::CallState::ZOMBIED);
            pending_.front().calld->KillZombie();
            pending_.pop();
          }
          if (!pending_.empty()) {
            pending_call.rc = reinterpret_cast<RequestedCall*>(
                requests_per_cq_[request_queue_index].Pop());
            if (pending_call.rc != nullptr) {
              pending_call.pending = pending_.front().calld;
              pending_.pop();
            }
          }
        }
        if (pending_call.rc == nullptr) break;
        if (!pending_call.pending->MaybeActivate()) {
          // Zombied call.
          pending_call.pending->KillZombie();
          requests_per_cq_[request_queue_index].Push(
              &pending_call.rc->mpscq_node);
        } else {
          pending_call.pending->Publish(request_queue_index, pending_call.rc);
        }
      }
    }
  }

  void MatchOrQueue(size_t start_request_queue_index,
                    CallData* calld) override {
    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
      size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
      RequestedCall* rc =
          reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].TryPop());
      if (rc != nullptr) {
        calld->SetState(CallData::CallState::ACTIVATED);
        calld->Publish(cq_idx, rc);
        return;
      }
    }
    // No CQ had an available request; queue this call on the slow list.
    // We need to ensure that all the queues are empty: this is done under
    // the server's mu_call_ lock so that anything added to an empty request
    // queue will block until this call has actually been added to the
    // pending list.
    RequestedCall* rc = nullptr;
    size_t cq_idx = 0;
    size_t loop_count;
    {
      MutexLock lock(&server_->mu_call_);
      for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
        cq_idx =
            (start_request_queue_index + loop_count) % requests_per_cq_.size();
        rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop());
        if (rc != nullptr) {
          break;
        }
      }
      if (rc == nullptr) {
        calld->SetState(CallData::CallState::PENDING);
        pending_.push(PendingCall{calld});
        return;
      }
    }
    calld->SetState(CallData::CallState::ACTIVATED);
    calld->Publish(cq_idx, rc);
  }

  ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(size_t) override {
    Crash("not implemented for filter stack request matcher");
  }

  Server* server() const final { return server_; }

 private:
  Server* const server_;
  struct PendingCall {
    CallData* calld;
    Timestamp created = Timestamp::Now();
    Duration Age() { return Timestamp::Now() - created; }
  };
  std::queue<PendingCall> pending_;
  std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
};

class Server::RealRequestMatcherPromises : public RequestMatcherInterface {
 public:
  explicit RealRequestMatcherPromises(Server* server)
      : server_(server), requests_per_cq_(server->cqs_.size()) {}

  ~RealRequestMatcherPromises() override {
    for (LockedMultiProducerSingleConsumerQueue& queue : requests_per_cq_) {
      GPR_ASSERT(queue.Pop() == nullptr);
    }
  }

  void ZombifyPending() override {
    while (!pending_.empty()) {
      pending_.front()->Finish(absl::InternalError("Server closed"));
      pending_.pop();
    }
  }

  void KillRequests(grpc_error_handle error) override {
    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
      RequestedCall* rc;
      while ((rc = reinterpret_cast<RequestedCall*>(
                  requests_per_cq_[i].Pop())) != nullptr) {
        server_->FailCall(i, rc, error);
      }
    }
  }

  size_t request_queue_count() const override {
    return requests_per_cq_.size();
  }

  void RequestCallWithPossiblePublish(size_t request_queue_index,
                                      RequestedCall* call) override {
    if (requests_per_cq_[request_queue_index].Push(&call->mpscq_node)) {
      // This was the first queued request: we need to lock and start
      // matching calls.
      struct NextPendingCall {
        RequestedCall* rc = nullptr;
        PendingCall pending;
      };
      while (true) {
        NextPendingCall pending_call;
        {
          MutexLock lock(&server_->mu_call_);
          if (!pending_.empty()) {
            pending_call.rc = reinterpret_cast<RequestedCall*>(
                requests_per_cq_[request_queue_index].Pop());
            if (pending_call.rc != nullptr) {
              pending_call.pending = std::move(pending_.front());
              pending_.pop();
            }
          }
        }
        if (pending_call.rc == nullptr) break;
        if (!pending_call.pending->Finish(server(), request_queue_index,
                                          pending_call.rc)) {
          requests_per_cq_[request_queue_index].Push(
              &pending_call.rc->mpscq_node);
        }
      }
    }
  }

  void MatchOrQueue(size_t, CallData*) override {
    Crash("not implemented for promises");
  }

  ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
      size_t start_request_queue_index) override {
    for (size_t i = 0; i < requests_per_cq_.size(); i++) {
      size_t cq_idx = (start_request_queue_index + i) % requests_per_cq_.size();
      RequestedCall* rc =
          reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].TryPop());
      if (rc != nullptr) {
        return Immediate(MatchResult(server(), cq_idx, rc));
      }
    }
    // No CQ had an available request; queue this call on the slow list.
    // We need to ensure that all the queues are empty: this is done under
    // the server's mu_call_ lock so that anything added to an empty request
    // queue will block until this call has actually been added to the
    // pending list.
    RequestedCall* rc = nullptr;
    size_t cq_idx = 0;
    size_t loop_count;
    {
      std::vector<std::shared_ptr<ActivityWaiter>> removed_pending;
      MutexLock lock(&server_->mu_call_);
      while (!pending_.empty() &&
             pending_.front()->Age() > server_->max_time_in_pending_queue_) {
        removed_pending.push_back(std::move(pending_.front()));
        pending_.pop();
      }
      for (loop_count = 0; loop_count < requests_per_cq_.size(); loop_count++) {
        cq_idx =
            (start_request_queue_index + loop_count) % requests_per_cq_.size();
        rc = reinterpret_cast<RequestedCall*>(requests_per_cq_[cq_idx].Pop());
        if (rc != nullptr) break;
      }
      if (rc == nullptr) {
        if (server_->pending_backlog_protector_.Reject(pending_.size(),
                                                       server_->bitgen_)) {
          return Immediate(absl::ResourceExhaustedError(
              "Too many pending requests for this server"));
        }
        auto w = std::make_shared<ActivityWaiter>(
            GetContext<Activity>()->MakeOwningWaker());
        pending_.push(w);
        return OnCancel(
            [w]() -> Poll<absl::StatusOr<MatchResult>> {
              std::unique_ptr<absl::StatusOr<MatchResult>> r(
                  w->result.exchange(nullptr, std::memory_order_acq_rel));
              if (r == nullptr) return Pending{};
              return std::move(*r);
            },
            [w]() { w->Expire(); });
      }
    }
    return Immediate(MatchResult(server(), cq_idx, rc));
  }

  Server* server() const final { return server_; }

 private:
  Server* const server_;
  struct ActivityWaiter {
    using ResultType = absl::StatusOr<MatchResult>;
    explicit ActivityWaiter(Waker waker) : waker(std::move(waker)) {}
    ~ActivityWaiter() { delete result.load(std::memory_order_acquire); }
    void Finish(absl::Status status) {
      delete result.exchange(new ResultType(std::move(status)),
                             std::memory_order_acq_rel);
      waker.WakeupAsync();
    }
    // Returns true if requested_call was consumed, false otherwise.
    GRPC_MUST_USE_RESULT bool Finish(Server* server, size_t cq_idx,
                                     RequestedCall* requested_call) {
      ResultType* expected = nullptr;
      ResultType* new_value =
          new ResultType(MatchResult(server, cq_idx, requested_call));
      if (!result.compare_exchange_strong(expected, new_value,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
        GPR_ASSERT(new_value->value().TakeCall() == requested_call);
        delete new_value;
        return false;
      }
      waker.WakeupAsync();
      return true;
    }
    void Expire() {
      delete result.exchange(new ResultType(absl::CancelledError()),
                             std::memory_order_acq_rel);
    }
    Duration Age() { return Timestamp::Now() - created; }
    Waker waker;
    std::atomic<ResultType*> result{nullptr};
    const Timestamp created = Timestamp::Now();
  };
  using PendingCall = std::shared_ptr<ActivityWaiter>;
  std::queue<PendingCall> pending_;
  std::vector<LockedMultiProducerSingleConsumerQueue> requests_per_cq_;
};

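// Note on the two matchers above (added commentary, not original code): both
// keep one MPSC queue of application requests per server CQ plus a single
// FIFO of unmatched incoming RPCs. The filter-stack variant zombifies pending
// calls that have waited longer than max_time_in_pending_queue_, while the
// promise variant additionally consults pending_backlog_protector_ and
// rejects new work with RESOURCE_EXHAUSTED once the pending backlog grows too
// large.
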
// AllocatingRequestMatchers don't allow the application to request an RPC in
// advance or queue up any incoming RPC for later match. Instead, MatchOrQueue
// will call out to an allocation function passed in at the construction of the
// object. These request matchers are designed for the C++ callback API, so
// they only support 1 completion queue (passed in at the constructor). They
// are also used for the sync API.
class Server::AllocatingRequestMatcherBase : public RequestMatcherInterface {
 public:
  AllocatingRequestMatcherBase(Server* server, grpc_completion_queue* cq)
      : server_(server), cq_(cq) {
    size_t idx;
    for (idx = 0; idx < server->cqs_.size(); idx++) {
      if (server->cqs_[idx] == cq) {
        break;
      }
    }
    GPR_ASSERT(idx < server->cqs_.size());
    cq_idx_ = idx;
  }

  void ZombifyPending() override {}

  void KillRequests(grpc_error_handle /*error*/) override {}

  size_t request_queue_count() const override { return 0; }

  void RequestCallWithPossiblePublish(size_t /*request_queue_index*/,
                                      RequestedCall* /*call*/) final {
    Crash("unreachable");
  }

  Server* server() const final { return server_; }

  // Supply the completion queue related to this request matcher.
  grpc_completion_queue* cq() const { return cq_; }

  // Supply the completion queue's index relative to the server.
  size_t cq_idx() const { return cq_idx_; }

 private:
  Server* const server_;
  grpc_completion_queue* const cq_;
  size_t cq_idx_;
};

// An allocating request matcher for non-registered methods (used for generic
// API and unimplemented RPCs).
class Server::AllocatingRequestMatcherBatch
    : public AllocatingRequestMatcherBase {
 public:
  AllocatingRequestMatcherBatch(Server* server, grpc_completion_queue* cq,
                                std::function<BatchCallAllocation()> allocator)
      : AllocatingRequestMatcherBase(server, cq),
        allocator_(std::move(allocator)) {}

  void MatchOrQueue(size_t /*start_request_queue_index*/,
                    CallData* calld) override {
    const bool still_running = server()->ShutdownRefOnRequest();
    auto cleanup_ref =
        absl::MakeCleanup([this] { server()->ShutdownUnrefOnRequest(); });
    if (still_running) {
      BatchCallAllocation call_info = allocator_();
      GPR_ASSERT(server()->ValidateServerRequest(
                     cq(), static_cast<void*>(call_info.tag), nullptr,
                     nullptr) == GRPC_CALL_OK);
      RequestedCall* rc = new RequestedCall(
          static_cast<void*>(call_info.tag), call_info.cq, call_info.call,
          call_info.initial_metadata, call_info.details);
      calld->SetState(CallData::CallState::ACTIVATED);
      calld->Publish(cq_idx(), rc);
    } else {
      calld->FailCallCreation();
    }
  }

  ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
      size_t /*start_request_queue_index*/) override {
    BatchCallAllocation call_info = allocator_();
    GPR_ASSERT(server()->ValidateServerRequest(
                   cq(), static_cast<void*>(call_info.tag), nullptr,
                   nullptr) == GRPC_CALL_OK);
    RequestedCall* rc = new RequestedCall(
        static_cast<void*>(call_info.tag), call_info.cq, call_info.call,
        call_info.initial_metadata, call_info.details);
    return Immediate(MatchResult(server(), cq_idx(), rc));
  }

 private:
  std::function<BatchCallAllocation()> allocator_;
};

// An allocating request matcher for registered methods.
class Server::AllocatingRequestMatcherRegistered
    : public AllocatingRequestMatcherBase {
 public:
  AllocatingRequestMatcherRegistered(
      Server* server, grpc_completion_queue* cq, RegisteredMethod* rm,
      std::function<RegisteredCallAllocation()> allocator)
      : AllocatingRequestMatcherBase(server, cq),
        registered_method_(rm),
        allocator_(std::move(allocator)) {}

  void MatchOrQueue(size_t /*start_request_queue_index*/,
                    CallData* calld) override {
    auto cleanup_ref =
        absl::MakeCleanup([this] { server()->ShutdownUnrefOnRequest(); });
    if (server()->ShutdownRefOnRequest()) {
      RegisteredCallAllocation call_info = allocator_();
      GPR_ASSERT(server()->ValidateServerRequest(
                     cq(), call_info.tag, call_info.optional_payload,
                     registered_method_) == GRPC_CALL_OK);
      RequestedCall* rc =
          new RequestedCall(call_info.tag, call_info.cq, call_info.call,
                            call_info.initial_metadata, registered_method_,
                            call_info.deadline, call_info.optional_payload);
      calld->SetState(CallData::CallState::ACTIVATED);
      calld->Publish(cq_idx(), rc);
    } else {
      calld->FailCallCreation();
    }
  }

  ArenaPromise<absl::StatusOr<MatchResult>> MatchRequest(
      size_t /*start_request_queue_index*/) override {
    RegisteredCallAllocation call_info = allocator_();
    GPR_ASSERT(server()->ValidateServerRequest(
                   cq(), call_info.tag, call_info.optional_payload,
                   registered_method_) == GRPC_CALL_OK);
    RequestedCall* rc = new RequestedCall(
        call_info.tag, call_info.cq, call_info.call, call_info.initial_metadata,
        registered_method_, call_info.deadline, call_info.optional_payload);
    return Immediate(MatchResult(server(), cq_idx(), rc));
  }

 private:
  RegisteredMethod* const registered_method_;
  std::function<RegisteredCallAllocation()> allocator_;
};

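// Note on wiring (added commentary, not original code): the allocating
// matchers above are installed per server rather than per request, through
// Server::SetBatchMethodAllocator() and Server::SetRegisteredMethodAllocator()
// further down in this file. The C++ callback and sync APIs supply the
// allocator callbacks; by default (see Server::Start()) the server otherwise
// falls back to the RealRequestMatcher implementations.
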
//
// ChannelBroadcaster
//

namespace {

class ChannelBroadcaster {
 public:
  // This can have an empty constructor and destructor since we want to control
  // when the actual setup and shutdown broadcast take place.

  // Copies over the channels from the locked server.
  void FillChannelsLocked(std::vector<RefCountedPtr<Channel>> channels) {
    GPR_DEBUG_ASSERT(channels_.empty());
    channels_ = std::move(channels);
  }

  // Broadcasts a shutdown on each channel.
  void BroadcastShutdown(bool send_goaway, grpc_error_handle force_disconnect) {
    for (const RefCountedPtr<Channel>& channel : channels_) {
      SendShutdown(channel.get(), send_goaway, force_disconnect);
    }
    channels_.clear();  // just for safety against double broadcast
  }

 private:
  struct ShutdownCleanupArgs {
    grpc_closure closure;
    grpc_slice slice;
  };

  static void ShutdownCleanup(void* arg, grpc_error_handle /*error*/) {
    ShutdownCleanupArgs* a = static_cast<ShutdownCleanupArgs*>(arg);
    CSliceUnref(a->slice);
    delete a;
  }

  static void SendShutdown(Channel* channel, bool send_goaway,
                           grpc_error_handle send_disconnect) {
    ShutdownCleanupArgs* sc = new ShutdownCleanupArgs;
    GRPC_CLOSURE_INIT(&sc->closure, ShutdownCleanup, sc,
                      grpc_schedule_on_exec_ctx);
    grpc_transport_op* op = grpc_make_transport_op(&sc->closure);
    grpc_channel_element* elem;
    op->goaway_error =
        send_goaway
            ? grpc_error_set_int(GRPC_ERROR_CREATE("Server shutdown"),
                                 StatusIntProperty::kRpcStatus, GRPC_STATUS_OK)
            : absl::OkStatus();
    sc->slice = grpc_slice_from_copied_string("Server shutdown");
    op->disconnect_with_error = send_disconnect;
    elem = grpc_channel_stack_element(channel->channel_stack(), 0);
    elem->filter->start_transport_op(elem, op);
  }

  std::vector<RefCountedPtr<Channel>> channels_;
};

}  // namespace

//
// Server
//

const grpc_channel_filter Server::kServerTopFilter = {
    Server::CallData::StartTransportStreamOpBatch,
    Server::ChannelData::MakeCallPromise,
    [](grpc_channel_element*, CallSpineInterface*) {
      // TODO(ctiller): remove the server filter when call-v3 is finalized
    },
    grpc_channel_next_op,
    sizeof(Server::CallData),
    Server::CallData::InitCallElement,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    Server::CallData::DestroyCallElement,
    sizeof(Server::ChannelData),
    Server::ChannelData::InitChannelElement,
    grpc_channel_stack_no_post_init,
    Server::ChannelData::DestroyChannelElement,
    grpc_channel_next_get_info,
    "server",
};

namespace {

RefCountedPtr<channelz::ServerNode> CreateChannelzNode(
    const ChannelArgs& args) {
  RefCountedPtr<channelz::ServerNode> channelz_node;
  if (args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
          .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
    size_t channel_tracer_max_memory = std::max(
        0, args.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
               .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT));
    channelz_node =
        MakeRefCounted<channelz::ServerNode>(channel_tracer_max_memory);
    channelz_node->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string("Server created"));
  }
  return channelz_node;
}

}  // namespace

Server::Server(const ChannelArgs& args)
    : channel_args_(args),
      channelz_node_(CreateChannelzNode(args)),
      server_call_tracer_factory_(ServerCallTracerFactory::Get(args)),
      max_time_in_pending_queue_(Duration::Seconds(
          channel_args_
              .GetInt(GRPC_ARG_SERVER_MAX_UNREQUESTED_TIME_IN_SERVER_SECONDS)
              .value_or(30))) {}

Server::~Server() {
  // Remove the cq pollsets from the config_fetcher.
  if (started_ && config_fetcher_ != nullptr &&
      config_fetcher_->interested_parties() != nullptr) {
    for (grpc_pollset* pollset : pollsets_) {
      grpc_pollset_set_del_pollset(config_fetcher_->interested_parties(),
                                   pollset);
    }
  }
  for (size_t i = 0; i < cqs_.size(); i++) {
    GRPC_CQ_INTERNAL_UNREF(cqs_[i], "server");
  }
}

void Server::AddListener(OrphanablePtr<ListenerInterface> listener) {
  channelz::ListenSocketNode* listen_socket_node =
      listener->channelz_listen_socket_node();
  if (listen_socket_node != nullptr && channelz_node_ != nullptr) {
    channelz_node_->AddChildListenSocket(
        listen_socket_node->RefAsSubclass<channelz::ListenSocketNode>());
  }
  listeners_.emplace_back(std::move(listener));
}

void Server::Start() {
  auto make_real_request_matcher =
      [this]() -> std::unique_ptr<RequestMatcherInterface> {
    if (IsPromiseBasedServerCallEnabled()) {
      return std::make_unique<RealRequestMatcherPromises>(this);
    } else {
      return std::make_unique<RealRequestMatcherFilterStack>(this);
    }
  };

  started_ = true;
  for (grpc_completion_queue* cq : cqs_) {
    if (grpc_cq_can_listen(cq)) {
      pollsets_.push_back(grpc_cq_pollset(cq));
    }
  }
  if (unregistered_request_matcher_ == nullptr) {
    unregistered_request_matcher_ = make_real_request_matcher();
  }
  for (auto& rm : registered_methods_) {
    if (rm.second->matcher == nullptr) {
      rm.second->matcher = make_real_request_matcher();
    }
  }
  {
    MutexLock lock(&mu_global_);
    starting_ = true;
  }
  // Register the interested parties from the config fetcher to the cq pollsets
  // before starting the listeners, so that the config fetcher is being polled
  // when the listeners start watching the fetcher.
  if (config_fetcher_ != nullptr &&
      config_fetcher_->interested_parties() != nullptr) {
    for (grpc_pollset* pollset : pollsets_) {
      grpc_pollset_set_add_pollset(config_fetcher_->interested_parties(),
                                   pollset);
    }
  }
  for (auto& listener : listeners_) {
    listener.listener->Start(this, &pollsets_);
  }
  MutexLock lock(&mu_global_);
  starting_ = false;
  starting_cv_.Signal();
}

grpc_error_handle Server::SetupTransport(
    Transport* transport, grpc_pollset* accepting_pollset,
    const ChannelArgs& args,
    const RefCountedPtr<channelz::SocketNode>& socket_node) {
  // Create channel.
  global_stats().IncrementServerChannelsCreated();
  absl::StatusOr<OrphanablePtr<Channel>> channel =
      LegacyChannel::Create("", args.SetObject(transport), GRPC_SERVER_CHANNEL);
  if (!channel.ok()) {
    return absl_status_to_grpc_error(channel.status());
  }
  ChannelData* chand = static_cast<ChannelData*>(
      grpc_channel_stack_element((*channel)->channel_stack(), 0)->channel_data);
  // Set up CQs.
  size_t cq_idx;
  for (cq_idx = 0; cq_idx < cqs_.size(); cq_idx++) {
    if (grpc_cq_pollset(cqs_[cq_idx]) == accepting_pollset) break;
  }
  if (cq_idx == cqs_.size()) {
    // Completion queue not found. Pick a random one to publish new calls to.
    cq_idx = static_cast<size_t>(rand()) % std::max<size_t>(1, cqs_.size());
  }
  // Set up channelz node.
  intptr_t channelz_socket_uuid = 0;
  if (socket_node != nullptr) {
    channelz_socket_uuid = socket_node->uuid();
    channelz_node_->AddChildSocket(socket_node);
  }
  // Initialize chand.
  chand->InitTransport(Ref(), std::move(*channel), cq_idx, transport,
                       channelz_socket_uuid);
  return absl::OkStatus();
}

bool Server::HasOpenConnections() {
  MutexLock lock(&mu_global_);
  return !channels_.empty();
}

void Server::SetRegisteredMethodAllocator(
    grpc_completion_queue* cq, void* method_tag,
    std::function<RegisteredCallAllocation()> allocator) {
  RegisteredMethod* rm = static_cast<RegisteredMethod*>(method_tag);
  rm->matcher = std::make_unique<AllocatingRequestMatcherRegistered>(
      this, cq, rm, std::move(allocator));
}

void Server::SetBatchMethodAllocator(
    grpc_completion_queue* cq, std::function<BatchCallAllocation()> allocator) {
  GPR_DEBUG_ASSERT(unregistered_request_matcher_ == nullptr);
  unregistered_request_matcher_ =
      std::make_unique<AllocatingRequestMatcherBatch>(this, cq,
                                                      std::move(allocator));
}

void Server::RegisterCompletionQueue(grpc_completion_queue* cq) {
  for (grpc_completion_queue* queue : cqs_) {
    if (queue == cq) return;
  }
  GRPC_CQ_INTERNAL_REF(cq, "server");
  cqs_.push_back(cq);
}

Server::RegisteredMethod* Server::RegisterMethod(
    const char* method, const char* host,
    grpc_server_register_method_payload_handling payload_handling,
    uint32_t flags) {
  if (started_) {
    Crash("Attempting to register method after server started");
  }

  if (!method) {
    gpr_log(GPR_ERROR,
            "grpc_server_register_method method string cannot be NULL");
    return nullptr;
  }
  auto key = std::make_pair(host ? host : "", method);
  if (registered_methods_.find(key) != registered_methods_.end()) {
    gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
            host ? host : "*");
    return nullptr;
  }
  if (flags != 0) {
    gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",
            flags);
    return nullptr;
  }
  auto it = registered_methods_.emplace(
      key, std::make_unique<RegisteredMethod>(method, host, payload_handling,
                                              flags));
  return it.first->second.get();
}

void Server::DoneRequestEvent(void* req, grpc_cq_completion* /*c*/) {
  delete static_cast<RequestedCall*>(req);
}

void Server::FailCall(size_t cq_idx, RequestedCall* rc,
                      grpc_error_handle error) {
  *rc->call = nullptr;
  rc->initial_metadata->count = 0;
  GPR_ASSERT(!error.ok());
  grpc_cq_end_op(cqs_[cq_idx], rc->tag, error, DoneRequestEvent, rc,
                 &rc->completion);
}

// Before calling MaybeFinishShutdown(), we must hold mu_global_ and not
// hold mu_call_.
void Server::MaybeFinishShutdown() {
  if (!ShutdownReady() || shutdown_published_) {
    return;
  }
  {
    MutexLock lock(&mu_call_);
    KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
  }
  if (!channels_.empty() || listeners_destroyed_ < listeners_.size()) {
    if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
                                  last_shutdown_message_time_),
                     gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
      last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
      gpr_log(GPR_DEBUG,
              "Waiting for %" PRIuPTR " channels and %" PRIuPTR "/%" PRIuPTR
              " listeners to be destroyed before shutting down server",
              channels_.size(), listeners_.size() - listeners_destroyed_,
              listeners_.size());
    }
    return;
  }
  shutdown_published_ = true;
  for (auto& shutdown_tag : shutdown_tags_) {
    Ref().release();
    grpc_cq_end_op(shutdown_tag.cq, shutdown_tag.tag, absl::OkStatus(),
                   DoneShutdownEvent, this, &shutdown_tag.completion);
  }
}

void Server::KillPendingWorkLocked(grpc_error_handle error) {
  if (started_) {
    unregistered_request_matcher_->KillRequests(error);
    unregistered_request_matcher_->ZombifyPending();
    for (auto& rm : registered_methods_) {
      rm.second->matcher->KillRequests(error);
      rm.second->matcher->ZombifyPending();
    }
  }
}

std::vector<RefCountedPtr<Channel>> Server::GetChannelsLocked() const {
  std::vector<RefCountedPtr<Channel>> channels;
  channels.reserve(channels_.size());
  for (const ChannelData* chand : channels_) {
    channels.push_back(chand->channel()->Ref());
  }
  return channels;
}

void Server::ListenerDestroyDone(void* arg, grpc_error_handle /*error*/) {
  Server* server = static_cast<Server*>(arg);
  MutexLock lock(&server->mu_global_);
  server->listeners_destroyed_++;
  server->MaybeFinishShutdown();
}

namespace {

void DonePublishedShutdown(void* /*done_arg*/, grpc_cq_completion* storage) {
  delete storage;
}

}  // namespace

// - Kills all pending requests-for-incoming-RPC-calls (i.e., the requests made
//   via grpc_server_request_call() and grpc_server_request_registered_call()
//   will now be cancelled). See KillPendingWorkLocked().
//
// - Shuts down the listeners (i.e., the server will no longer listen on the
//   port for new incoming channels).
//
// - Iterates through all channels on the server and sends a shutdown msg (see
//   ChannelBroadcaster::BroadcastShutdown() for details) to the clients via
//   the transport layer. The transport layer then guarantees the following:
//   -- Sends shutdown to the client (e.g., the HTTP2 transport sends GOAWAY).
//   -- If the server has outstanding calls that are in progress, the
//      connection is NOT closed until the server is done with all those calls.
//   -- Once there are no more calls in progress, the channel is closed.
void Server::ShutdownAndNotify(grpc_completion_queue* cq, void* tag) {
  ChannelBroadcaster broadcaster;
  {
    // Wait for startup to be finished. Locks mu_global.
    MutexLock lock(&mu_global_);
    while (starting_) {
      starting_cv_.Wait(&mu_global_);
    }
    // Stay locked, and gather up some stuff to do.
    GPR_ASSERT(grpc_cq_begin_op(cq, tag));
    if (shutdown_published_) {
      grpc_cq_end_op(cq, tag, absl::OkStatus(), DonePublishedShutdown, nullptr,
                     new grpc_cq_completion);
      return;
    }
    shutdown_tags_.emplace_back(tag, cq);
    if (ShutdownCalled()) {
      return;
    }
    last_shutdown_message_time_ = gpr_now(GPR_CLOCK_REALTIME);
    broadcaster.FillChannelsLocked(GetChannelsLocked());
    // Collect all unregistered then registered calls.
    {
      MutexLock lock(&mu_call_);
      KillPendingWorkLocked(GRPC_ERROR_CREATE("Server Shutdown"));
    }
    ShutdownUnrefOnShutdownCall();
  }
  StopListening();
  broadcaster.BroadcastShutdown(/*send_goaway=*/true, absl::OkStatus());
}

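// Illustrative usage (added commentary, not original code): a minimal sketch
// of how an application drives the shutdown path above through the public C
// core API, assuming `server` and a dedicated `shutdown_cq`:
//
//   void* tag = &server;  // any unique tag value
//   grpc_server_shutdown_and_notify(server, shutdown_cq, tag);
//   // The tag is delivered once all channels and listeners are destroyed
//   // (see MaybeFinishShutdown() above).
//   grpc_completion_queue_pluck(shutdown_cq, tag,
//                               gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
//   grpc_server_destroy(server);
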
void Server::StopListening() {
  for (auto& listener : listeners_) {
    if (listener.listener == nullptr) continue;
    channelz::ListenSocketNode* channelz_listen_socket_node =
        listener.listener->channelz_listen_socket_node();
    if (channelz_node_ != nullptr && channelz_listen_socket_node != nullptr) {
      channelz_node_->RemoveChildListenSocket(
          channelz_listen_socket_node->uuid());
    }
    GRPC_CLOSURE_INIT(&listener.destroy_done, ListenerDestroyDone, this,
                      grpc_schedule_on_exec_ctx);
    listener.listener->SetOnDestroyDone(&listener.destroy_done);
    listener.listener.reset();
  }
}

void Server::CancelAllCalls() {
  ChannelBroadcaster broadcaster;
  {
    MutexLock lock(&mu_global_);
    broadcaster.FillChannelsLocked(GetChannelsLocked());
  }
  broadcaster.BroadcastShutdown(
      /*send_goaway=*/false, GRPC_ERROR_CREATE("Cancelling all calls"));
}

void Server::SendGoaways() {
  ChannelBroadcaster broadcaster;
  {
    MutexLock lock(&mu_global_);
    broadcaster.FillChannelsLocked(GetChannelsLocked());
  }
  broadcaster.BroadcastShutdown(/*send_goaway=*/true, absl::OkStatus());
}

void Server::Orphan() {
  {
    MutexLock lock(&mu_global_);
    GPR_ASSERT(ShutdownCalled() || listeners_.empty());
    GPR_ASSERT(listeners_destroyed_ == listeners_.size());
  }
  Unref();
}

grpc_call_error Server::ValidateServerRequest(
    grpc_completion_queue* cq_for_notification, void* tag,
    grpc_byte_buffer** optional_payload, RegisteredMethod* rm) {
  if ((rm == nullptr && optional_payload != nullptr) ||
      ((rm != nullptr) && ((optional_payload == nullptr) !=
                           (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)))) {
    return GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
  }
  if (!grpc_cq_begin_op(cq_for_notification, tag)) {
    return GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
  }
  return GRPC_CALL_OK;
}

grpc_call_error Server::ValidateServerRequestAndCq(
    size_t* cq_idx, grpc_completion_queue* cq_for_notification, void* tag,
    grpc_byte_buffer** optional_payload, RegisteredMethod* rm) {
  size_t idx;
  for (idx = 0; idx < cqs_.size(); idx++) {
    if (cqs_[idx] == cq_for_notification) {
      break;
    }
  }
  if (idx == cqs_.size()) {
    return GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
  }
  grpc_call_error error =
      ValidateServerRequest(cq_for_notification, tag, optional_payload, rm);
  if (error != GRPC_CALL_OK) {
    return error;
  }
  *cq_idx = idx;
  return GRPC_CALL_OK;
}

grpc_call_error Server::QueueRequestedCall(size_t cq_idx, RequestedCall* rc) {
  if (ShutdownCalled()) {
    FailCall(cq_idx, rc, GRPC_ERROR_CREATE("Server Shutdown"));
    return GRPC_CALL_OK;
  }
  RequestMatcherInterface* rm;
  switch (rc->type) {
    case RequestedCall::Type::BATCH_CALL:
      rm = unregistered_request_matcher_.get();
      break;
    case RequestedCall::Type::REGISTERED_CALL:
      rm = rc->data.registered.method->matcher.get();
      break;
  }
  rm->RequestCallWithPossiblePublish(cq_idx, rc);
  return GRPC_CALL_OK;
}

grpc_call_error Server::RequestCall(grpc_call** call,
                                    grpc_call_details* details,
                                    grpc_metadata_array* request_metadata,
                                    grpc_completion_queue* cq_bound_to_call,
                                    grpc_completion_queue* cq_for_notification,
                                    void* tag) {
  size_t cq_idx;
  grpc_call_error error = ValidateServerRequestAndCq(
      &cq_idx, cq_for_notification, tag, nullptr, nullptr);
  if (error != GRPC_CALL_OK) {
    return error;
  }
  RequestedCall* rc =
      new RequestedCall(tag, cq_bound_to_call, call, request_metadata, details);
  return QueueRequestedCall(cq_idx, rc);
}

grpc_call_error Server::RequestRegisteredCall(
    RegisteredMethod* rm, grpc_call** call, gpr_timespec* deadline,
    grpc_metadata_array* request_metadata, grpc_byte_buffer** optional_payload,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag_new) {
  size_t cq_idx;
  grpc_call_error error = ValidateServerRequestAndCq(
      &cq_idx, cq_for_notification, tag_new, optional_payload, rm);
  if (error != GRPC_CALL_OK) {
    return error;
  }
  RequestedCall* rc =
      new RequestedCall(tag_new, cq_bound_to_call, call, request_metadata, rm,
                        deadline, optional_payload);
  return QueueRequestedCall(cq_idx, rc);
}

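// Illustrative usage (added commentary, not original code): RequestCall() and
// RequestRegisteredCall() back the public grpc_server_request_call() and
// grpc_server_request_registered_call() entry points. A minimal sketch of the
// application-side loop that keeps the request matchers fed, assuming a
// server bound to a single completion queue `cq`:
//
//   grpc_call* call = nullptr;
//   grpc_call_details details;
//   grpc_metadata_array metadata;
//   grpc_call_details_init(&details);
//   grpc_metadata_array_init(&metadata);
//   grpc_server_request_call(server, &call, &details, &metadata, cq, cq,
//                            /*tag=*/&details);
//   grpc_event ev = grpc_completion_queue_next(
//       cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
//   // When ev.tag == &details and ev.success != 0, `call` is a matched RPC.
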
//
// Server::ChannelData::ConnectivityWatcher
//

class Server::ChannelData::ConnectivityWatcher
    : public AsyncConnectivityStateWatcherInterface {
 public:
  explicit ConnectivityWatcher(ChannelData* chand)
      : chand_(chand), channel_(chand_->channel_->Ref()) {}

 private:
  void OnConnectivityStateChange(grpc_connectivity_state new_state,
                                 const absl::Status& /*status*/) override {
    // Don't do anything until we are being shut down.
    if (new_state != GRPC_CHANNEL_SHUTDOWN) return;
    // Shut down channel.
    MutexLock lock(&chand_->server_->mu_global_);
    chand_->Destroy();
  }

  ChannelData* const chand_;
  const RefCountedPtr<Channel> channel_;
};

//
// Server::ChannelData
//

Server::ChannelData::~ChannelData() {
  if (server_ != nullptr) {
    if (server_->channelz_node_ != nullptr && channelz_socket_uuid_ != 0) {
      server_->channelz_node_->RemoveChildSocket(channelz_socket_uuid_);
    }
    {
      MutexLock lock(&server_->mu_global_);
      if (list_position_.has_value()) {
        server_->channels_.erase(*list_position_);
        list_position_.reset();
      }
      server_->MaybeFinishShutdown();
    }
  }
}

Arena* Server::ChannelData::CreateArena() { return channel_->CreateArena(); }

absl::StatusOr<CallInitiator> Server::ChannelData::CreateCall(
    ClientMetadata& client_initial_metadata, Arena* arena) {
  SetRegisteredMethodOnMetadata(client_initial_metadata);
  auto call = MakeServerCall(server_.get(), channel_.get(), arena);
  InitCall(call);
  return CallInitiator(std::move(call));
}

void Server::ChannelData::InitTransport(RefCountedPtr<Server> server,
                                        OrphanablePtr<Channel> channel,
                                        size_t cq_idx, Transport* transport,
                                        intptr_t channelz_socket_uuid) {
  server_ = std::move(server);
  channel_ = std::move(channel);
  cq_idx_ = cq_idx;
  channelz_socket_uuid_ = channelz_socket_uuid;
  // Publish channel.
  {
    MutexLock lock(&server_->mu_global_);
    server_->channels_.push_front(this);
    list_position_ = server_->channels_.begin();
  }
  // Start accept_stream transport op.
  grpc_transport_op* op = grpc_make_transport_op(nullptr);
  int accept_stream_types = 0;
  if (transport->filter_stack_transport() != nullptr) {
    ++accept_stream_types;
    op->set_accept_stream = true;
    op->set_accept_stream_fn = AcceptStream;
    op->set_registered_method_matcher_fn = [](void* arg,
                                              ClientMetadata* metadata) {
      static_cast<ChannelData*>(arg)->SetRegisteredMethodOnMetadata(*metadata);
    };
    op->set_accept_stream_user_data = this;
  }
  if (transport->server_transport() != nullptr) {
    ++accept_stream_types;
    transport->server_transport()->SetAcceptor(this);
  }
  GPR_ASSERT(accept_stream_types == 1);
  op->start_connectivity_watch = MakeOrphanable<ConnectivityWatcher>(this);
  if (server_->ShutdownCalled()) {
    op->disconnect_with_error = GRPC_ERROR_CREATE("Server shutdown");
  }
  transport->PerformOp(op);
}

Server::RegisteredMethod* Server::ChannelData::GetRegisteredMethod(
    const absl::string_view& host, const absl::string_view& path) {
  if (server_->registered_methods_.empty()) return nullptr;
  // Check for an exact match with host.
  auto it = server_->registered_methods_.find(std::make_pair(host, path));
  if (it != server_->registered_methods_.end()) {
    return it->second.get();
  }
  // Check for a wildcard method definition (no host set).
  it = server_->registered_methods_.find(std::make_pair("", path));
  if (it != server_->registered_methods_.end()) {
    return it->second.get();
  }
  return nullptr;
}

void Server::ChannelData::SetRegisteredMethodOnMetadata(
    ClientMetadata& metadata) {
  auto* authority = metadata.get_pointer(HttpAuthorityMetadata());
  if (authority == nullptr) {
    authority = metadata.get_pointer(HostMetadata());
    if (authority == nullptr) {
      // Authority not being set is an RPC error.
      return;
    }
  }
  auto* path = metadata.get_pointer(HttpPathMetadata());
  if (path == nullptr) {
    // Path not being set would result in an RPC error.
    return;
  }
  RegisteredMethod* method =
      GetRegisteredMethod(authority->as_string_view(), path->as_string_view());
  // Insert in metadata.
  metadata.Set(GrpcRegisteredMethod(), method);
}

void Server::ChannelData::AcceptStream(void* arg, Transport* /*transport*/,
                                       const void* transport_server_data) {
  auto* chand = static_cast<Server::ChannelData*>(arg);
  // Create a call.
  grpc_call_create_args args;
  args.channel = chand->channel_->Ref();
  args.server = chand->server_.get();
  args.parent = nullptr;
  args.propagation_mask = 0;
  args.cq = nullptr;
  args.pollset_set_alternative = nullptr;
  args.server_transport_data = transport_server_data;
  args.send_deadline = Timestamp::InfFuture();
  grpc_call* call;
  grpc_error_handle error = grpc_call_create(&args, &call);
  grpc_call_stack* call_stack = grpc_call_get_call_stack(call);
  if (call_stack == nullptr) {  // Promise-based calls do not have a call stack.
    GPR_ASSERT(error.ok());
    GPR_ASSERT(IsPromiseBasedServerCallEnabled());
    return;
  } else {
    grpc_call_element* elem = grpc_call_stack_element(call_stack, 0);
    auto* calld = static_cast<Server::CallData*>(elem->call_data);
    if (!error.ok()) {
      calld->FailCallCreation();
      return;
    }
    calld->Start(elem);
  }
}

1419 namespace {
CancelledDueToServerShutdown()1420 auto CancelledDueToServerShutdown() {
1421 return [] {
1422 return ServerMetadataFromStatus(absl::CancelledError("Server shutdown"));
1423 };
1424 }
1425 } // namespace
1426
void Server::ChannelData::InitCall(RefCountedPtr<CallSpineInterface> call) {
  call->SpawnGuarded("request_matcher", [this, call]() {
    return TrySeq(
        // Wait for initial metadata to pass through all filters
        Map(call->client_initial_metadata().receiver.Next(),
            [](NextResult<ClientMetadataHandle> md)
                -> absl::StatusOr<ClientMetadataHandle> {
              if (!md.has_value()) {
                return absl::InternalError("Missing metadata");
              }
              if (!md.value()->get_pointer(HttpPathMetadata())) {
                return absl::InternalError("Missing :path header");
              }
              if (!md.value()->get_pointer(HttpAuthorityMetadata())) {
                return absl::InternalError("Missing :authority header");
              }
              return std::move(*md);
            }),
        // Match request with requested call
        [this, call](ClientMetadataHandle md) {
          auto* registered_method = static_cast<RegisteredMethod*>(
              md->get(GrpcRegisteredMethod()).value_or(nullptr));
          RequestMatcherInterface* rm;
          grpc_server_register_method_payload_handling payload_handling =
              GRPC_SRM_PAYLOAD_NONE;
          if (registered_method == nullptr) {
            rm = server_->unregistered_request_matcher_.get();
          } else {
            payload_handling = registered_method->payload_handling;
            rm = registered_method->matcher.get();
          }
          auto maybe_read_first_message = If(
              payload_handling == GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER,
              [call]() {
                return call->client_to_server_messages().receiver.Next();
              },
              []() -> NextResult<MessageHandle> {
                return NextResult<MessageHandle>();
              });
          return TryJoin<absl::StatusOr>(
              Map(std::move(maybe_read_first_message),
                  [](NextResult<MessageHandle> n) {
                    return ValueOrFailure<NextResult<MessageHandle>>{
                        std::move(n)};
                  }),
              rm->MatchRequest(cq_idx()), [md = std::move(md)]() mutable {
                return ValueOrFailure<ClientMetadataHandle>(std::move(md));
              });
        },
        // Publish call to cq
        [](std::tuple<NextResult<MessageHandle>,
                      RequestMatcherInterface::MatchResult,
                      ClientMetadataHandle>
               r) {
          RequestMatcherInterface::MatchResult& mr = std::get<1>(r);
          auto md = std::move(std::get<2>(r));
          auto* rc = mr.TakeCall();
          rc->Complete(std::move(std::get<0>(r)), *md);
          auto* call_context = GetContext<CallContext>();
          *rc->call = call_context->c_call();
          grpc_call_ref(*rc->call);
          grpc_call_set_completion_queue(call_context->c_call(),
                                         rc->cq_bound_to_call);
          call_context->server_call_context()->PublishInitialMetadata(
              std::move(md), rc->initial_metadata);
          // TODO(ctiller): publish metadata
          return Map(WaitForCqEndOp(false, rc->tag, absl::OkStatus(), mr.cq()),
                     [rc = std::unique_ptr<RequestedCall>(rc)](Empty) {
                       return absl::OkStatus();
                     });
        });
  });
}

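// Filter-stack promise path (top of the server channel stack): rejects calls
// once shutdown has begun, validates :path/:authority, optionally reads the
// first client message for registered methods that request it, matches the
// call against a pending request, and hands off to the server call context.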
ArenaPromise<ServerMetadataHandle> Server::ChannelData::MakeCallPromise(
    grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) {
  auto* chand = static_cast<Server::ChannelData*>(elem->channel_data);
  auto* server = chand->server_.get();
  if (server->ShutdownCalled()) return CancelledDueToServerShutdown();
  auto cleanup_ref =
      absl::MakeCleanup([server] { server->ShutdownUnrefOnRequest(); });
  if (!server->ShutdownRefOnRequest()) return CancelledDueToServerShutdown();
  auto path_ptr =
      call_args.client_initial_metadata->get_pointer(HttpPathMetadata());
  if (path_ptr == nullptr) {
    return [] {
      return ServerMetadataFromStatus(
          absl::InternalError("Missing :path header"));
    };
  }
  auto host_ptr =
      call_args.client_initial_metadata->get_pointer(HttpAuthorityMetadata());
  if (host_ptr == nullptr) {
    return [] {
      return ServerMetadataFromStatus(
          absl::InternalError("Missing :authority header"));
    };
  }
  // Find request matcher.
  RequestMatcherInterface* matcher;
  RegisteredMethod* rm = static_cast<RegisteredMethod*>(
      call_args.client_initial_metadata->get(GrpcRegisteredMethod())
          .value_or(nullptr));
  ArenaPromise<absl::StatusOr<NextResult<MessageHandle>>>
      maybe_read_first_message([] { return NextResult<MessageHandle>(); });
  if (rm != nullptr) {
    matcher = rm->matcher.get();
    switch (rm->payload_handling) {
      case GRPC_SRM_PAYLOAD_NONE:
        break;
      case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER:
        maybe_read_first_message =
            Map(call_args.client_to_server_messages->Next(),
                [](NextResult<MessageHandle> msg)
                    -> absl::StatusOr<NextResult<MessageHandle>> {
                  return std::move(msg);
                });
    }
  } else {
    matcher = server->unregistered_request_matcher_.get();
  }
  return TrySeq(
      std::move(maybe_read_first_message),
      [cleanup_ref = std::move(cleanup_ref), matcher,
       chand](NextResult<MessageHandle> payload) mutable {
        return Map(
            [cleanup_ref = std::move(cleanup_ref),
             mr = matcher->MatchRequest(chand->cq_idx())]() mutable {
              return mr();
            },
            [payload = std::move(payload)](
                absl::StatusOr<RequestMatcherInterface::MatchResult> mr) mutable
                -> absl::StatusOr<
                    std::pair<RequestMatcherInterface::MatchResult,
                              NextResult<MessageHandle>>> {
              if (!mr.ok()) return mr.status();
              return std::make_pair(std::move(*mr), std::move(payload));
            });
      },
      [call_args = std::move(call_args)](
          std::pair<RequestMatcherInterface::MatchResult,
                    NextResult<MessageHandle>>
              r) mutable {
        auto& mr = r.first;
        auto& payload = r.second;
        auto* rc = mr.TakeCall();
        auto* cq_for_new_request = mr.cq();
        auto* server_call_context =
            GetContext<CallContext>()->server_call_context();
        rc->Complete(std::move(payload), *call_args.client_initial_metadata);
        server_call_context->PublishInitialMetadata(
            std::move(call_args.client_initial_metadata), rc->initial_metadata);
        return server_call_context->MakeTopOfServerCallPromise(
            std::move(call_args), rc->cq_bound_to_call,
            [rc, cq_for_new_request](grpc_call* call) {
              *rc->call = call;
              grpc_cq_end_op(cq_for_new_request, rc->tag, absl::OkStatus(),
                             Server::DoneRequestEvent, rc, &rc->completion,
                             true);
            });
      });
}

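// Runs when the transport op scheduled by Destroy() completes: releases the
// channel, the server ref, and the channel-stack ref taken in Destroy().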
void Server::ChannelData::FinishDestroy(void* arg,
                                        grpc_error_handle /*error*/) {
  auto* chand = static_cast<Server::ChannelData*>(arg);
  Server* server = chand->server_.get();
  auto* channel_stack = chand->channel_->channel_stack();
  chand->channel_.reset();
  server->Unref();
  GRPC_CHANNEL_STACK_UNREF(channel_stack, "Server::ChannelData::Destroy");
}

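// Detaches this channel from the server: removes it from the server's channel
// list, possibly completes a pending shutdown, and asks the transport to stop
// accepting new streams; FinishDestroy() above releases the remaining
// references once that op completes.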
void Server::ChannelData::Destroy() {
  if (!list_position_.has_value()) return;
  GPR_ASSERT(server_ != nullptr);
  server_->channels_.erase(*list_position_);
  list_position_.reset();
  server_->Ref().release();
  server_->MaybeFinishShutdown();
  // Unreffed by FinishDestroy
  GRPC_CHANNEL_STACK_REF(channel_->channel_stack(),
                         "Server::ChannelData::Destroy");
  GRPC_CLOSURE_INIT(&finish_destroy_channel_closure_, FinishDestroy, this,
                    grpc_schedule_on_exec_ctx);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_server_channel_trace)) {
    gpr_log(GPR_INFO, "Disconnected client");
  }
  grpc_transport_op* op =
      grpc_make_transport_op(&finish_destroy_channel_closure_);
  op->set_accept_stream = true;
  grpc_channel_next_op(grpc_channel_stack_element(channel_->channel_stack(), 0),
                       op);
}

grpc_error_handle Server::ChannelData::InitChannelElement(
    grpc_channel_element* elem, grpc_channel_element_args* args) {
  GPR_ASSERT(args->is_first);
  GPR_ASSERT(!args->is_last);
  new (elem->channel_data) ChannelData();
  return absl::OkStatus();
}

void Server::ChannelData::DestroyChannelElement(grpc_channel_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  chand->~ChannelData();
}

//
// Server::CallData
//

Server::CallData::CallData(grpc_call_element* elem,
                           const grpc_call_element_args& args,
                           RefCountedPtr<Server> server)
    : server_(std::move(server)),
      call_(grpc_call_from_top_element(elem)),
      call_combiner_(args.call_combiner) {
  GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                    elem, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
                    elem, grpc_schedule_on_exec_ctx);
}

Server::CallData::~CallData() {
  GPR_ASSERT(state_.load(std::memory_order_relaxed) != CallState::PENDING);
  grpc_metadata_array_destroy(&initial_metadata_);
  grpc_byte_buffer_destroy(payload_);
}

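// The helpers below manage the CallState machine: MaybeActivate() claims a
// PENDING call for delivery (PENDING -> ACTIVATED), while FailCallCreation()
// zombies a call that was never started or is still pending so it is cleaned
// up instead of being delivered to the application.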
void Server::CallData::SetState(CallState state) {
  state_.store(state, std::memory_order_relaxed);
}

bool Server::CallData::MaybeActivate() {
  CallState expected = CallState::PENDING;
  return state_.compare_exchange_strong(expected, CallState::ACTIVATED,
                                        std::memory_order_acq_rel,
                                        std::memory_order_relaxed);
}

void Server::CallData::FailCallCreation() {
  CallState expected_not_started = CallState::NOT_STARTED;
  CallState expected_pending = CallState::PENDING;
  if (state_.compare_exchange_strong(expected_not_started, CallState::ZOMBIED,
                                     std::memory_order_acq_rel,
                                     std::memory_order_acquire)) {
    KillZombie();
  } else if (state_.compare_exchange_strong(
                 expected_pending, CallState::ZOMBIED,
                 std::memory_order_acq_rel, std::memory_order_relaxed)) {
    // Zombied call will be destroyed when it's removed from the pending
    // queue... later.
  }
}

void Server::CallData::Start(grpc_call_element* elem) {
  grpc_op op;
  op.op = GRPC_OP_RECV_INITIAL_METADATA;
  op.flags = 0;
  op.reserved = nullptr;
  op.data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_;
  GRPC_CLOSURE_INIT(&recv_initial_metadata_batch_complete_,
                    RecvInitialMetadataBatchComplete, elem,
                    grpc_schedule_on_exec_ctx);
  grpc_call_start_batch_and_execute(call_, &op, 1,
                                    &recv_initial_metadata_batch_complete_);
}

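// Hands a matched call to the application: binds the call to the requested
// completion queue, fills in the request's call details (or registered-method
// deadline/payload), and posts the completion for the original
// grpc_server_request_*call() tag.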
void Server::CallData::Publish(size_t cq_idx, RequestedCall* rc) {
  grpc_call_set_completion_queue(call_, rc->cq_bound_to_call);
  *rc->call = call_;
  cq_new_ = server_->cqs_[cq_idx];
  std::swap(*rc->initial_metadata, initial_metadata_);
  switch (rc->type) {
    case RequestedCall::Type::BATCH_CALL:
      GPR_ASSERT(host_.has_value());
      GPR_ASSERT(path_.has_value());
      rc->data.batch.details->host = CSliceRef(host_->c_slice());
      rc->data.batch.details->method = CSliceRef(path_->c_slice());
      rc->data.batch.details->deadline =
          deadline_.as_timespec(GPR_CLOCK_MONOTONIC);
      break;
    case RequestedCall::Type::REGISTERED_CALL:
      *rc->data.registered.deadline =
          deadline_.as_timespec(GPR_CLOCK_MONOTONIC);
      if (rc->data.registered.optional_payload != nullptr) {
        *rc->data.registered.optional_payload = payload_;
        payload_ = nullptr;
      }
      break;
    default:
      GPR_UNREACHABLE_CODE(return);
  }
  grpc_cq_end_op(cq_new_, rc->tag, absl::OkStatus(), Server::DoneRequestEvent,
                 rc, &rc->completion, true);
}

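// Run (directly, or as the recv_message closure) once any required initial
// payload is available: zombies the call on error or server shutdown,
// otherwise hands it to the request matcher to be paired with a pending
// request or queued.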
void Server::CallData::PublishNewRpc(void* arg, grpc_error_handle error) {
  grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);
  auto* calld = static_cast<Server::CallData*>(call_elem->call_data);
  auto* chand = static_cast<Server::ChannelData*>(call_elem->channel_data);
  RequestMatcherInterface* rm = calld->matcher_;
  Server* server = rm->server();
  if (!error.ok() || server->ShutdownCalled()) {
    calld->state_.store(CallState::ZOMBIED, std::memory_order_relaxed);
    calld->KillZombie();
    return;
  }
  rm->MatchOrQueue(chand->cq_idx(), calld);
}

namespace {

void KillZombieClosure(void* call, grpc_error_handle /*error*/) {
  grpc_call_unref(static_cast<grpc_call*>(call));
}

}  // namespace

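// Schedules the final unref of a call that will never be delivered to the
// application.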
void Server::CallData::KillZombie() {
  GRPC_CLOSURE_INIT(&kill_zombie_closure_, KillZombieClosure, call_,
                    grpc_schedule_on_exec_ctx);
  ExecCtx::Run(DEBUG_LOCATION, &kill_zombie_closure_, absl::OkStatus());
}

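// Legacy (filter-stack) analogue of MakeCallPromise: selects the request
// matcher for the resolved method and, if the method wants the first message
// up front, issues a RECV_MESSAGE op before publishing the RPC.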
// If this changes, change MakeCallPromise too.
void Server::CallData::StartNewRpc(grpc_call_element* elem) {
  if (server_->ShutdownCalled()) {
    state_.store(CallState::ZOMBIED, std::memory_order_relaxed);
    KillZombie();
    return;
  }
  // Find request matcher.
  matcher_ = server_->unregistered_request_matcher_.get();
  grpc_server_register_method_payload_handling payload_handling =
      GRPC_SRM_PAYLOAD_NONE;
  if (path_.has_value() && host_.has_value()) {
    RegisteredMethod* rm = static_cast<RegisteredMethod*>(
        recv_initial_metadata_->get(GrpcRegisteredMethod()).value_or(nullptr));
    if (rm != nullptr) {
      matcher_ = rm->matcher.get();
      payload_handling = rm->payload_handling;
    }
  }
  // Start recv_message op if needed.
  switch (payload_handling) {
    case GRPC_SRM_PAYLOAD_NONE:
      PublishNewRpc(elem, absl::OkStatus());
      break;
    case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {
      grpc_op op;
      op.op = GRPC_OP_RECV_MESSAGE;
      op.flags = 0;
      op.reserved = nullptr;
      op.data.recv_message.recv_message = &payload_;
      GRPC_CLOSURE_INIT(&publish_, PublishNewRpc, elem,
                        grpc_schedule_on_exec_ctx);
      grpc_call_start_batch_and_execute(call_, &op, 1, &publish_);
      break;
    }
  }
}

void Server::CallData::RecvInitialMetadataBatchComplete(
    void* arg, grpc_error_handle error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  auto* calld = static_cast<Server::CallData*>(elem->call_data);
  if (!error.ok()) {
    gpr_log(GPR_DEBUG, "Failed call creation: %s",
            StatusToString(error).c_str());
    calld->FailCallCreation();
    return;
  }
  calld->StartNewRpc(elem);
}

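// Hooks the recv_initial_metadata and recv_trailing_metadata completion
// callbacks so this filter sees the received metadata before the original
// callbacks run.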
void Server::CallData::StartTransportStreamOpBatchImpl(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  if (batch->recv_initial_metadata) {
    recv_initial_metadata_ =
        batch->payload->recv_initial_metadata.recv_initial_metadata;
    original_recv_initial_metadata_ready_ =
        batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
    batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
        &recv_initial_metadata_ready_;
  }
  if (batch->recv_trailing_metadata) {
    original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &recv_trailing_metadata_ready_;
  }
  grpc_call_next_op(elem, batch);
}

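// Records :path, :authority, and the deadline from the received initial
// metadata, converting their absence into an error, then resumes the original
// callback (and any trailing-metadata callback deferred behind it).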
void Server::CallData::RecvInitialMetadataReady(void* arg,
                                                grpc_error_handle error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (error.ok()) {
    calld->path_ = calld->recv_initial_metadata_->Take(HttpPathMetadata());
    auto* host =
        calld->recv_initial_metadata_->get_pointer(HttpAuthorityMetadata());
    if (host != nullptr) calld->host_.emplace(host->Ref());
  }
  auto op_deadline = calld->recv_initial_metadata_->get(GrpcTimeoutMetadata());
  if (op_deadline.has_value()) {
    calld->deadline_ = *op_deadline;
  }
  if (calld->host_.has_value() && calld->path_.has_value()) {
    // do nothing
  } else if (error.ok()) {
    // Pass the error reference to calld->recv_initial_metadata_error
    error = absl::UnknownError("Missing :authority or :path");
    calld->recv_initial_metadata_error_ = error;
  }
  grpc_closure* closure = calld->original_recv_initial_metadata_ready_;
  calld->original_recv_initial_metadata_ready_ = nullptr;
  if (calld->seen_recv_trailing_metadata_ready_) {
    GRPC_CALL_COMBINER_START(calld->call_combiner_,
                             &calld->recv_trailing_metadata_ready_,
                             calld->recv_trailing_metadata_error_,
                             "continue server recv_trailing_metadata_ready");
  }
  Closure::Run(DEBUG_LOCATION, closure, error);
}

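// Defers the trailing-metadata callback until after RecvInitialMetadataReady
// has run, then forwards it with any initial-metadata error attached.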
void Server::CallData::RecvTrailingMetadataReady(void* arg,
                                                 grpc_error_handle error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (calld->original_recv_initial_metadata_ready_ != nullptr) {
    calld->recv_trailing_metadata_error_ = error;
    calld->seen_recv_trailing_metadata_ready_ = true;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_,
                      RecvTrailingMetadataReady, elem,
                      grpc_schedule_on_exec_ctx);
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "deferring server recv_trailing_metadata_ready "
                            "until after recv_initial_metadata_ready");
    return;
  }
  error = grpc_error_add_child(error, calld->recv_initial_metadata_error_);
  Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
               error);
}

grpc_error_handle Server::CallData::InitCallElement(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  new (elem->call_data) Server::CallData(elem, *args, chand->server());
  return absl::OkStatus();
}

void Server::CallData::DestroyCallElement(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* /*ignored*/) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->~CallData();
}

void Server::CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->StartTransportStreamOpBatchImpl(elem, batch);
}

}  // namespace grpc_core

//
// C-core API
//

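// The functions below are thin C wrappers: each establishes whatever exec-ctx
// scopes it needs, emits an API trace line, and delegates to the corresponding
// grpc_core::Server method.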
grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));
  grpc_core::Server* server =
      new grpc_core::Server(grpc_core::CoreConfiguration::Get()
                                .channel_args_preconditioning()
                                .PreconditionChannelArgs(args));
  return server->c_ptr();
}

void grpc_server_register_completion_queue(grpc_server* server,
                                           grpc_completion_queue* cq,
                                           void* reserved) {
  GRPC_API_TRACE(
      "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
      (server, cq, reserved));
  GPR_ASSERT(!reserved);
  auto cq_type = grpc_get_cq_completion_type(cq);
  if (cq_type != GRPC_CQ_NEXT && cq_type != GRPC_CQ_CALLBACK) {
    gpr_log(GPR_INFO,
            "Completion queue of type %d is being registered as a "
            "server-completion-queue",
            static_cast<int>(cq_type));
    // Ideally we would log an error and abort, but the Ruby wrapped-language
    // API calls grpc_completion_queue_pluck() on server completion queues.
  }
  grpc_core::Server::FromC(server)->RegisterCompletionQueue(cq);
}

void* grpc_server_register_method(
    grpc_server* server, const char* method, const char* host,
    grpc_server_register_method_payload_handling payload_handling,
    uint32_t flags) {
  GRPC_API_TRACE(
      "grpc_server_register_method(server=%p, method=%s, host=%s, "
      "flags=0x%08x)",
      4, (server, method, host, flags));
  return grpc_core::Server::FromC(server)->RegisterMethod(
      method, host, payload_handling, flags);
}

void grpc_server_start(grpc_server* server) {
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
  grpc_core::Server::FromC(server)->Start();
}

void grpc_server_shutdown_and_notify(grpc_server* server,
                                     grpc_completion_queue* cq, void* tag) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
                 (server, cq, tag));
  grpc_core::Server::FromC(server)->ShutdownAndNotify(cq, tag);
}

void grpc_server_cancel_all_calls(grpc_server* server) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
  grpc_core::Server::FromC(server)->CancelAllCalls();
}

void grpc_server_destroy(grpc_server* server) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
  grpc_core::Server::FromC(server)->Orphan();
}

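// Requests notification of the next call to an unregistered method: the tag is
// posted on cq_for_notification once a call has been matched, with its details
// and initial metadata filled in and subsequent ops bound to cq_bound_to_call.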
grpc_call_error grpc_server_request_call(
    grpc_server* server, grpc_call** call, grpc_call_details* details,
    grpc_metadata_array* request_metadata,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE(
      "grpc_server_request_call("
      "server=%p, call=%p, details=%p, initial_metadata=%p, "
      "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
      7,
      (server, call, details, request_metadata, cq_bound_to_call,
       cq_for_notification, tag));
  return grpc_core::Server::FromC(server)->RequestCall(
      call, details, request_metadata, cq_bound_to_call, cq_for_notification,
      tag);
}

grpc_call_error grpc_server_request_registered_call(
    grpc_server* server, void* registered_method, grpc_call** call,
    gpr_timespec* deadline, grpc_metadata_array* request_metadata,
    grpc_byte_buffer** optional_payload,
    grpc_completion_queue* cq_bound_to_call,
    grpc_completion_queue* cq_for_notification, void* tag_new) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  auto* rm =
      static_cast<grpc_core::Server::RegisteredMethod*>(registered_method);
  GRPC_API_TRACE(
      "grpc_server_request_registered_call("
      "server=%p, registered_method=%p, call=%p, deadline=%p, "
      "request_metadata=%p, "
      "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
      "tag=%p)",
      9,
      (server, registered_method, call, deadline, request_metadata,
       optional_payload, cq_bound_to_call, cq_for_notification, tag_new));
  return grpc_core::Server::FromC(server)->RequestRegisteredCall(
      rm, call, deadline, request_metadata, optional_payload, cq_bound_to_call,
      cq_for_notification, tag_new);
}

void grpc_server_set_config_fetcher(
    grpc_server* server, grpc_server_config_fetcher* server_config_fetcher) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_server_set_config_fetcher(server=%p, config_fetcher=%p)",
                 2, (server, server_config_fetcher));
  grpc_core::Server::FromC(server)->set_config_fetcher(
      std::unique_ptr<grpc_server_config_fetcher>(server_config_fetcher));
}

void grpc_server_config_fetcher_destroy(
    grpc_server_config_fetcher* server_config_fetcher) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE("grpc_server_config_fetcher_destroy(config_fetcher=%p)", 1,
                 (server_config_fetcher));
  delete server_config_fetcher;
}