• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/surface/call.h"
20 
21 #include <grpc/byte_buffer.h>
22 #include <grpc/compression.h>
23 #include <grpc/event_engine/event_engine.h>
24 #include <grpc/grpc.h>
25 #include <grpc/impl/call.h>
26 #include <grpc/impl/propagation_bits.h>
27 #include <grpc/slice.h>
28 #include <grpc/slice_buffer.h>
29 #include <grpc/status.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/atm.h>
32 #include <grpc/support/port_platform.h>
33 #include <grpc/support/string_util.h>
34 #include <inttypes.h>
35 #include <limits.h>
36 #include <stdlib.h>
37 #include <string.h>
38 
39 #include <algorithm>
40 #include <atomic>
41 #include <cstdint>
42 #include <memory>
43 #include <new>
44 #include <queue>
45 #include <string>
46 #include <type_traits>
47 #include <utility>
48 #include <vector>
49 
50 #include "absl/base/thread_annotations.h"
51 #include "absl/log/check.h"
52 #include "absl/log/log.h"
53 #include "absl/status/status.h"
54 #include "absl/strings/str_cat.h"
55 #include "absl/strings/str_format.h"
56 #include "absl/strings/str_join.h"
57 #include "absl/strings/string_view.h"
58 #include "src/core/channelz/channelz.h"
59 #include "src/core/lib/channel/call_finalization.h"
60 #include "src/core/lib/channel/channel_stack.h"
61 #include "src/core/lib/channel/status_util.h"
62 #include "src/core/lib/compression/compression_internal.h"
63 #include "src/core/lib/event_engine/event_engine_context.h"
64 #include "src/core/lib/experiments/experiments.h"
65 #include "src/core/lib/iomgr/call_combiner.h"
66 #include "src/core/lib/iomgr/exec_ctx.h"
67 #include "src/core/lib/iomgr/polling_entity.h"
68 #include "src/core/lib/promise/activity.h"
69 #include "src/core/lib/promise/all_ok.h"
70 #include "src/core/lib/promise/arena_promise.h"
71 #include "src/core/lib/promise/cancel_callback.h"
72 #include "src/core/lib/promise/context.h"
73 #include "src/core/lib/promise/latch.h"
74 #include "src/core/lib/promise/map.h"
75 #include "src/core/lib/promise/pipe.h"
76 #include "src/core/lib/promise/poll.h"
77 #include "src/core/lib/promise/race.h"
78 #include "src/core/lib/promise/seq.h"
79 #include "src/core/lib/promise/status_flag.h"
80 #include "src/core/lib/promise/try_seq.h"
81 #include "src/core/lib/resource_quota/arena.h"
82 #include "src/core/lib/slice/slice_buffer.h"
83 #include "src/core/lib/slice/slice_internal.h"
84 #include "src/core/lib/surface/call_test_only.h"
85 #include "src/core/lib/surface/channel.h"
86 #include "src/core/lib/surface/completion_queue.h"
87 #include "src/core/lib/surface/validate_metadata.h"
88 #include "src/core/lib/transport/error_utils.h"
89 #include "src/core/lib/transport/metadata.h"
90 #include "src/core/lib/transport/metadata_batch.h"
91 #include "src/core/lib/transport/transport.h"
92 #include "src/core/server/server_interface.h"
93 #include "src/core/telemetry/call_tracer.h"
94 #include "src/core/telemetry/stats.h"
95 #include "src/core/telemetry/stats_data.h"
96 #include "src/core/util/alloc.h"
97 #include "src/core/util/bitset.h"
98 #include "src/core/util/cpp_impl_of.h"
99 #include "src/core/util/crash.h"
100 #include "src/core/util/debug_location.h"
101 #include "src/core/util/match.h"
102 #include "src/core/util/ref_counted.h"
103 #include "src/core/util/ref_counted_ptr.h"
104 #include "src/core/util/status_helper.h"
105 #include "src/core/util/sync.h"
106 #include "src/core/util/time_precise.h"
107 #include "src/core/util/useful.h"
108 
109 namespace grpc_core {
110 
111 // Alias to make this type available in Call implementation without a grpc_core
112 // prefix.
113 using GrpcClosure = Closure;
114 
115 ///////////////////////////////////////////////////////////////////////////////
116 // Call
117 
Call(bool is_client,Timestamp send_deadline,RefCountedPtr<Arena> arena)118 Call::Call(bool is_client, Timestamp send_deadline, RefCountedPtr<Arena> arena)
119     : arena_(std::move(arena)),
120       send_deadline_(send_deadline),
121       is_client_(is_client) {
122   DCHECK_NE(arena_.get(), nullptr);
123   DCHECK_NE(arena_->GetContext<grpc_event_engine::experimental::EventEngine>(),
124             nullptr);
125   arena_->SetContext<Call>(this);
126 }
127 
GetOrCreateParentCall()128 Call::ParentCall* Call::GetOrCreateParentCall() {
129   ParentCall* p = parent_call_.load(std::memory_order_acquire);
130   if (p == nullptr) {
131     p = arena()->New<ParentCall>();
132     ParentCall* expected = nullptr;
133     if (!parent_call_.compare_exchange_strong(expected, p,
134                                               std::memory_order_release,
135                                               std::memory_order_relaxed)) {
136       p->~ParentCall();
137       p = expected;
138     }
139   }
140   return p;
141 }
142 
parent_call()143 Call::ParentCall* Call::parent_call() {
144   return parent_call_.load(std::memory_order_acquire);
145 }
146 
InitParent(Call * parent,uint32_t propagation_mask)147 absl::Status Call::InitParent(Call* parent, uint32_t propagation_mask) {
148   child_ = arena()->New<ChildCall>(parent);
149 
150   parent->InternalRef("child");
151   CHECK(is_client_);
152   CHECK(!parent->is_client_);
153 
154   if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
155     send_deadline_ = std::min(send_deadline_, parent->send_deadline_);
156   }
157   // for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
158   // GRPC_PROPAGATE_STATS_CONTEXT
159   // TODO(ctiller): This should change to use the appropriate census start_op
160   // call.
161   if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
162     if (0 == (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
163       return absl::UnknownError(
164           "Census tracing propagation requested without Census context "
165           "propagation");
166     }
167     arena()->SetContext<census_context>(
168         parent->arena()->GetContext<census_context>());
169   } else if (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
170     return absl::UnknownError(
171         "Census context propagation requested without Census tracing "
172         "propagation");
173   }
174   if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
175     cancellation_is_inherited_ = true;
176   }
177   return absl::OkStatus();
178 }
179 
PublishToParent(Call * parent)180 void Call::PublishToParent(Call* parent) {
181   ChildCall* cc = child_;
182   ParentCall* pc = parent->GetOrCreateParentCall();
183   MutexLock lock(&pc->child_list_mu);
184   if (pc->first_child == nullptr) {
185     pc->first_child = this;
186     cc->sibling_next = cc->sibling_prev = this;
187   } else {
188     cc->sibling_next = pc->first_child;
189     cc->sibling_prev = pc->first_child->child_->sibling_prev;
190     cc->sibling_next->child_->sibling_prev =
191         cc->sibling_prev->child_->sibling_next = this;
192   }
193   if (parent->Completed()) {
194     CancelWithError(absl::CancelledError());
195   }
196 }
197 
MaybeUnpublishFromParent()198 void Call::MaybeUnpublishFromParent() {
199   ChildCall* cc = child_;
200   if (cc == nullptr) return;
201 
202   ParentCall* pc = cc->parent->parent_call();
203   {
204     MutexLock lock(&pc->child_list_mu);
205     if (this == pc->first_child) {
206       pc->first_child = cc->sibling_next;
207       if (this == pc->first_child) {
208         pc->first_child = nullptr;
209       }
210     }
211     cc->sibling_prev->child_->sibling_next = cc->sibling_next;
212     cc->sibling_next->child_->sibling_prev = cc->sibling_prev;
213   }
214   cc->parent->InternalUnref("child");
215 }
216 
CancelWithStatus(grpc_status_code status,const char * description)217 void Call::CancelWithStatus(grpc_status_code status, const char* description) {
218   // copying 'description' is needed to ensure the grpc_call_cancel_with_status
219   // guarantee that can be short-lived.
220   // TODO(ctiller): change to
221   // absl::Status(static_cast<absl::StatusCode>(status), description)
222   // (ie remove the set_int, set_str).
223   CancelWithError(grpc_error_set_int(
224       grpc_error_set_str(
225           absl::Status(static_cast<absl::StatusCode>(status), description),
226           StatusStrProperty::kGrpcMessage, description),
227       StatusIntProperty::kRpcStatus, status));
228 }
229 
PropagateCancellationToChildren()230 void Call::PropagateCancellationToChildren() {
231   ParentCall* pc = parent_call();
232   if (pc != nullptr) {
233     Call* child;
234     MutexLock lock(&pc->child_list_mu);
235     child = pc->first_child;
236     if (child != nullptr) {
237       do {
238         Call* next_child_call = child->child_->sibling_next;
239         if (child->cancellation_is_inherited_) {
240           child->InternalRef("propagate_cancel");
241           child->CancelWithError(absl::CancelledError());
242           child->InternalUnref("propagate_cancel");
243         }
244         child = next_child_call;
245       } while (child != pc->first_child);
246     }
247   }
248 }
249 
PrepareOutgoingInitialMetadata(const grpc_op & op,grpc_metadata_batch & md)250 void Call::PrepareOutgoingInitialMetadata(const grpc_op& op,
251                                           grpc_metadata_batch& md) {
252   // TODO(juanlishen): If the user has already specified a compression
253   // algorithm by setting the initial metadata with key of
254   // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
255   // with the compression algorithm mapped from compression level.
256   // process compression level
257   grpc_compression_level effective_compression_level = GRPC_COMPRESS_LEVEL_NONE;
258   bool level_set = false;
259   if (op.data.send_initial_metadata.maybe_compression_level.is_set) {
260     effective_compression_level =
261         op.data.send_initial_metadata.maybe_compression_level.level;
262     level_set = true;
263   } else {
264     const grpc_compression_options copts = compression_options();
265     if (copts.default_level.is_set) {
266       level_set = true;
267       effective_compression_level = copts.default_level.level;
268     }
269   }
270   // Currently, only server side supports compression level setting.
271   if (level_set && !is_client()) {
272     const grpc_compression_algorithm calgo =
273         encodings_accepted_by_peer().CompressionAlgorithmForLevel(
274             effective_compression_level);
275     // The following metadata will be checked and removed by the message
276     // compression filter. It will be used as the call's compression
277     // algorithm.
278     md.Set(GrpcInternalEncodingRequest(), calgo);
279   }
280   // Ignore any te metadata key value pairs specified.
281   md.Remove(TeMetadata());
282   // Should never come from applications
283   md.Remove(GrpcLbClientStatsMetadata());
284 }
285 
ProcessIncomingInitialMetadata(grpc_metadata_batch & md)286 void Call::ProcessIncomingInitialMetadata(grpc_metadata_batch& md) {
287   Slice* peer_string = md.get_pointer(PeerString());
288   if (peer_string != nullptr) SetPeerString(peer_string->Ref());
289 
290   SetIncomingCompressionAlgorithm(
291       md.Take(GrpcEncodingMetadata()).value_or(GRPC_COMPRESS_NONE));
292   encodings_accepted_by_peer_ =
293       md.Take(GrpcAcceptEncodingMetadata())
294           .value_or(CompressionAlgorithmSet{GRPC_COMPRESS_NONE});
295 
296   const grpc_compression_options copts = compression_options();
297   const grpc_compression_algorithm compression_algorithm =
298       incoming_compression_algorithm();
299   if (GPR_UNLIKELY(
300           !CompressionAlgorithmSet::FromUint32(copts.enabled_algorithms_bitset)
301                .IsSet(compression_algorithm))) {
302     // check if algorithm is supported by current channel config
303     HandleCompressionAlgorithmDisabled(compression_algorithm);
304   }
305   // GRPC_COMPRESS_NONE is always set.
306   DCHECK(encodings_accepted_by_peer_.IsSet(GRPC_COMPRESS_NONE));
307   if (GPR_UNLIKELY(!encodings_accepted_by_peer_.IsSet(compression_algorithm))) {
308     if (GRPC_TRACE_FLAG_ENABLED(compression)) {
309       HandleCompressionAlgorithmNotAccepted(compression_algorithm);
310     }
311   }
312 }
313 
HandleCompressionAlgorithmNotAccepted(grpc_compression_algorithm compression_algorithm)314 void Call::HandleCompressionAlgorithmNotAccepted(
315     grpc_compression_algorithm compression_algorithm) {
316   const char* algo_name = nullptr;
317   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
318   LOG(ERROR) << "Compression algorithm ('" << algo_name
319              << "') not present in the accepted encodings ("
320              << encodings_accepted_by_peer_.ToString() << ")";
321 }
322 
HandleCompressionAlgorithmDisabled(grpc_compression_algorithm compression_algorithm)323 void Call::HandleCompressionAlgorithmDisabled(
324     grpc_compression_algorithm compression_algorithm) {
325   const char* algo_name = nullptr;
326   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
327   std::string error_msg =
328       absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
329   LOG(ERROR) << error_msg;
330   CancelWithError(grpc_error_set_int(absl::UnimplementedError(error_msg),
331                                      StatusIntProperty::kRpcStatus,
332                                      GRPC_STATUS_UNIMPLEMENTED));
333 }
334 
UpdateDeadline(Timestamp deadline)335 void Call::UpdateDeadline(Timestamp deadline) {
336   ReleasableMutexLock lock(&deadline_mu_);
337   GRPC_TRACE_LOG(call, INFO)
338       << "[call " << this << "] UpdateDeadline from=" << deadline_.ToString()
339       << " to=" << deadline.ToString();
340   if (deadline >= deadline_) return;
341   if (deadline < Timestamp::Now()) {
342     lock.Release();
343     CancelWithError(grpc_error_set_int(
344         absl::DeadlineExceededError("Deadline Exceeded"),
345         StatusIntProperty::kRpcStatus, GRPC_STATUS_DEADLINE_EXCEEDED));
346     return;
347   }
348   auto* event_engine =
349       arena_->GetContext<grpc_event_engine::experimental::EventEngine>();
350   if (deadline_ != Timestamp::InfFuture()) {
351     if (!event_engine->Cancel(deadline_task_)) return;
352   } else {
353     InternalRef("deadline");
354   }
355   deadline_ = deadline;
356   deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this);
357 }
358 
ResetDeadline()359 void Call::ResetDeadline() {
360   {
361     MutexLock lock(&deadline_mu_);
362     if (deadline_ == Timestamp::InfFuture()) return;
363     if (!arena_->GetContext<grpc_event_engine::experimental::EventEngine>()
364              ->Cancel(deadline_task_)) {
365       return;
366     }
367     deadline_ = Timestamp::InfFuture();
368   }
369   InternalUnref("deadline[reset]");
370 }
371 
Run()372 void Call::Run() {
373   ApplicationCallbackExecCtx callback_exec_ctx;
374   ExecCtx exec_ctx;
375   GRPC_TRACE_LOG(call, INFO)
376       << "call deadline expired "
377       << GRPC_DUMP_ARGS(Timestamp::Now(), send_deadline_);
378   CancelWithError(grpc_error_set_int(
379       absl::DeadlineExceededError("Deadline Exceeded"),
380       StatusIntProperty::kRpcStatus, GRPC_STATUS_DEADLINE_EXCEEDED));
381   InternalUnref("deadline[run]");
382 }
383 
384 }  // namespace grpc_core
385 
386 ///////////////////////////////////////////////////////////////////////////////
387 // C-based API
388 
grpc_call_arena_alloc(grpc_call * call,size_t size)389 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
390   grpc_core::ExecCtx exec_ctx;
391   return grpc_core::Call::FromC(call)->arena()->Alloc(size);
392 }
393 
grpc_call_set_completion_queue(grpc_call * call,grpc_completion_queue * cq)394 void grpc_call_set_completion_queue(grpc_call* call,
395                                     grpc_completion_queue* cq) {
396   grpc_core::Call::FromC(call)->SetCompletionQueue(cq);
397 }
398 
grpc_call_ref(grpc_call * c)399 void grpc_call_ref(grpc_call* c) { grpc_core::Call::FromC(c)->ExternalRef(); }
400 
grpc_call_unref(grpc_call * c)401 void grpc_call_unref(grpc_call* c) {
402   grpc_core::ExecCtx exec_ctx;
403   grpc_core::Call::FromC(c)->ExternalUnref();
404 }
405 
grpc_call_get_peer(grpc_call * call)406 char* grpc_call_get_peer(grpc_call* call) {
407   return grpc_core::Call::FromC(call)->GetPeer();
408 }
409 
grpc_call_cancel(grpc_call * call,void * reserved)410 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
411   GRPC_TRACE_LOG(api, INFO)
412       << "grpc_call_cancel(call=" << call << ", reserved=" << reserved << ")";
413   CHECK_EQ(reserved, nullptr);
414   if (call == nullptr) {
415     return GRPC_CALL_ERROR;
416   }
417   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
418   grpc_core::ExecCtx exec_ctx;
419   grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError());
420   return GRPC_CALL_OK;
421 }
422 
grpc_call_cancel_with_status(grpc_call * c,grpc_status_code status,const char * description,void * reserved)423 grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
424                                              grpc_status_code status,
425                                              const char* description,
426                                              void* reserved) {
427   GRPC_TRACE_LOG(api, INFO)
428       << "grpc_call_cancel_with_status(c=" << c << ", status=" << (int)status
429       << ", description=" << description << ", reserved=" << reserved << ")";
430   CHECK_EQ(reserved, nullptr);
431   if (c == nullptr) {
432     return GRPC_CALL_ERROR;
433   }
434   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
435   grpc_core::ExecCtx exec_ctx;
436   grpc_core::Call::FromC(c)->CancelWithStatus(status, description);
437   return GRPC_CALL_OK;
438 }
439 
grpc_call_cancel_internal(grpc_call * call)440 void grpc_call_cancel_internal(grpc_call* call) {
441   grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError());
442 }
443 
grpc_call_test_only_get_compression_algorithm(grpc_call * call)444 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
445     grpc_call* call) {
446   return grpc_core::Call::FromC(call)->incoming_compression_algorithm();
447 }
448 
grpc_call_test_only_get_message_flags(grpc_call * call)449 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
450   return grpc_core::Call::FromC(call)->test_only_message_flags();
451 }
452 
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)453 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
454   return grpc_core::Call::FromC(call)
455       ->encodings_accepted_by_peer()
456       .ToLegacyBitmask();
457 }
458 
grpc_call_get_arena(grpc_call * call)459 grpc_core::Arena* grpc_call_get_arena(grpc_call* call) {
460   return grpc_core::Call::FromC(call)->arena();
461 }
462 
grpc_call_get_call_stack(grpc_call * call)463 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
464   return grpc_core::Call::FromC(call)->call_stack();
465 }
466 
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)467 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
468                                       size_t nops, void* tag, void* reserved) {
469   GRPC_TRACE_LOG(api, INFO)
470       << "grpc_call_start_batch(call=" << call << ", ops=" << ops
471       << ", nops=" << (unsigned long)nops << ", tag=" << tag
472       << ", reserved=" << reserved << ")";
473 
474   if (reserved != nullptr || call == nullptr) {
475     return GRPC_CALL_ERROR;
476   } else {
477     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
478     grpc_core::ExecCtx exec_ctx;
479     return grpc_core::Call::FromC(call)->StartBatch(ops, nops, tag, false);
480   }
481 }
482 
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)483 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
484                                                   const grpc_op* ops,
485                                                   size_t nops,
486                                                   grpc_closure* closure) {
487   return grpc_core::Call::FromC(call)->StartBatch(ops, nops, closure, true);
488 }
489 
grpc_call_tracer_set(grpc_call * call,grpc_core::ClientCallTracer * tracer)490 void grpc_call_tracer_set(grpc_call* call,
491                           grpc_core::ClientCallTracer* tracer) {
492   grpc_core::Arena* arena = grpc_call_get_arena(call);
493   return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
494 }
495 
grpc_call_tracer_set_and_manage(grpc_call * call,grpc_core::ClientCallTracer * tracer)496 void grpc_call_tracer_set_and_manage(grpc_call* call,
497                                      grpc_core::ClientCallTracer* tracer) {
498   grpc_core::Arena* arena = grpc_call_get_arena(call);
499   arena->ManagedNew<ClientCallTracerWrapper>(tracer);
500   return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
501 }
502 
grpc_call_tracer_get(grpc_call * call)503 void* grpc_call_tracer_get(grpc_call* call) {
504   grpc_core::Arena* arena = grpc_call_get_arena(call);
505   auto* call_tracer =
506       arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
507   return call_tracer;
508 }
509 
grpc_call_is_client(grpc_call * call)510 uint8_t grpc_call_is_client(grpc_call* call) {
511   return grpc_core::Call::FromC(call)->is_client();
512 }
513 
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)514 grpc_compression_algorithm grpc_call_compression_for_level(
515     grpc_call* call, grpc_compression_level level) {
516   return grpc_core::Call::FromC(call)
517       ->encodings_accepted_by_peer()
518       .CompressionAlgorithmForLevel(level);
519 }
520 
grpc_call_is_trailers_only(const grpc_call * call)521 bool grpc_call_is_trailers_only(const grpc_call* call) {
522   return grpc_core::Call::FromC(call)->is_trailers_only();
523 }
524 
grpc_call_failed_before_recv_message(const grpc_call * c)525 int grpc_call_failed_before_recv_message(const grpc_call* c) {
526   return grpc_core::Call::FromC(c)->failed_before_recv_message();
527 }
528 
grpc_call_server_authority(const grpc_call * call)529 absl::string_view grpc_call_server_authority(const grpc_call* call) {
530   return grpc_core::Call::FromC(call)->GetServerAuthority();
531 }
532 
grpc_call_error_to_string(grpc_call_error error)533 const char* grpc_call_error_to_string(grpc_call_error error) {
534   switch (error) {
535     case GRPC_CALL_ERROR:
536       return "GRPC_CALL_ERROR";
537     case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
538       return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
539     case GRPC_CALL_ERROR_ALREADY_FINISHED:
540       return "GRPC_CALL_ERROR_ALREADY_FINISHED";
541     case GRPC_CALL_ERROR_ALREADY_INVOKED:
542       return "GRPC_CALL_ERROR_ALREADY_INVOKED";
543     case GRPC_CALL_ERROR_BATCH_TOO_BIG:
544       return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
545     case GRPC_CALL_ERROR_INVALID_FLAGS:
546       return "GRPC_CALL_ERROR_INVALID_FLAGS";
547     case GRPC_CALL_ERROR_INVALID_MESSAGE:
548       return "GRPC_CALL_ERROR_INVALID_MESSAGE";
549     case GRPC_CALL_ERROR_INVALID_METADATA:
550       return "GRPC_CALL_ERROR_INVALID_METADATA";
551     case GRPC_CALL_ERROR_NOT_INVOKED:
552       return "GRPC_CALL_ERROR_NOT_INVOKED";
553     case GRPC_CALL_ERROR_NOT_ON_CLIENT:
554       return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
555     case GRPC_CALL_ERROR_NOT_ON_SERVER:
556       return "GRPC_CALL_ERROR_NOT_ON_SERVER";
557     case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
558       return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
559     case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
560       return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
561     case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
562       return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
563     case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
564       return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
565     case GRPC_CALL_OK:
566       return "GRPC_CALL_OK";
567   }
568   GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
569 }
570 
grpc_call_run_in_event_engine(const grpc_call * call,absl::AnyInvocable<void ()> cb)571 void grpc_call_run_in_event_engine(const grpc_call* call,
572                                    absl::AnyInvocable<void()> cb) {
573   grpc_core::Call::FromC(call)
574       ->arena()
575       ->GetContext<grpc_event_engine::experimental::EventEngine>()
576       ->Run(std::move(cb));
577 }
578