1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/surface/call.h"
20
21 #include <grpc/byte_buffer.h>
22 #include <grpc/compression.h>
23 #include <grpc/event_engine/event_engine.h>
24 #include <grpc/grpc.h>
25 #include <grpc/impl/call.h>
26 #include <grpc/impl/propagation_bits.h>
27 #include <grpc/slice.h>
28 #include <grpc/slice_buffer.h>
29 #include <grpc/status.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/atm.h>
32 #include <grpc/support/port_platform.h>
33 #include <grpc/support/string_util.h>
34 #include <inttypes.h>
35 #include <limits.h>
36 #include <stdlib.h>
37 #include <string.h>
38
39 #include <algorithm>
40 #include <atomic>
41 #include <cstdint>
42 #include <memory>
43 #include <new>
44 #include <queue>
45 #include <string>
46 #include <type_traits>
47 #include <utility>
48 #include <vector>
49
50 #include "absl/base/thread_annotations.h"
51 #include "absl/log/check.h"
52 #include "absl/log/log.h"
53 #include "absl/status/status.h"
54 #include "absl/strings/str_cat.h"
55 #include "absl/strings/str_format.h"
56 #include "absl/strings/str_join.h"
57 #include "absl/strings/string_view.h"
58 #include "src/core/channelz/channelz.h"
59 #include "src/core/lib/channel/call_finalization.h"
60 #include "src/core/lib/channel/channel_stack.h"
61 #include "src/core/lib/channel/status_util.h"
62 #include "src/core/lib/compression/compression_internal.h"
63 #include "src/core/lib/event_engine/event_engine_context.h"
64 #include "src/core/lib/experiments/experiments.h"
65 #include "src/core/lib/iomgr/call_combiner.h"
66 #include "src/core/lib/iomgr/exec_ctx.h"
67 #include "src/core/lib/iomgr/polling_entity.h"
68 #include "src/core/lib/promise/activity.h"
69 #include "src/core/lib/promise/all_ok.h"
70 #include "src/core/lib/promise/arena_promise.h"
71 #include "src/core/lib/promise/cancel_callback.h"
72 #include "src/core/lib/promise/context.h"
73 #include "src/core/lib/promise/latch.h"
74 #include "src/core/lib/promise/map.h"
75 #include "src/core/lib/promise/pipe.h"
76 #include "src/core/lib/promise/poll.h"
77 #include "src/core/lib/promise/race.h"
78 #include "src/core/lib/promise/seq.h"
79 #include "src/core/lib/promise/status_flag.h"
80 #include "src/core/lib/promise/try_seq.h"
81 #include "src/core/lib/resource_quota/arena.h"
82 #include "src/core/lib/slice/slice_buffer.h"
83 #include "src/core/lib/slice/slice_internal.h"
84 #include "src/core/lib/surface/call_test_only.h"
85 #include "src/core/lib/surface/channel.h"
86 #include "src/core/lib/surface/completion_queue.h"
87 #include "src/core/lib/surface/validate_metadata.h"
88 #include "src/core/lib/transport/error_utils.h"
89 #include "src/core/lib/transport/metadata.h"
90 #include "src/core/lib/transport/metadata_batch.h"
91 #include "src/core/lib/transport/transport.h"
92 #include "src/core/server/server_interface.h"
93 #include "src/core/telemetry/call_tracer.h"
94 #include "src/core/telemetry/stats.h"
95 #include "src/core/telemetry/stats_data.h"
96 #include "src/core/util/alloc.h"
97 #include "src/core/util/bitset.h"
98 #include "src/core/util/cpp_impl_of.h"
99 #include "src/core/util/crash.h"
100 #include "src/core/util/debug_location.h"
101 #include "src/core/util/match.h"
102 #include "src/core/util/ref_counted.h"
103 #include "src/core/util/ref_counted_ptr.h"
104 #include "src/core/util/status_helper.h"
105 #include "src/core/util/sync.h"
106 #include "src/core/util/time_precise.h"
107 #include "src/core/util/useful.h"
108
109 namespace grpc_core {
110
111 // Alias to make this type available in Call implementation without a grpc_core
112 // prefix.
113 using GrpcClosure = Closure;
114
115 ///////////////////////////////////////////////////////////////////////////////
116 // Call
117
Call(bool is_client,Timestamp send_deadline,RefCountedPtr<Arena> arena)118 Call::Call(bool is_client, Timestamp send_deadline, RefCountedPtr<Arena> arena)
119 : arena_(std::move(arena)),
120 send_deadline_(send_deadline),
121 is_client_(is_client) {
122 DCHECK_NE(arena_.get(), nullptr);
123 DCHECK_NE(arena_->GetContext<grpc_event_engine::experimental::EventEngine>(),
124 nullptr);
125 arena_->SetContext<Call>(this);
126 }
127
GetOrCreateParentCall()128 Call::ParentCall* Call::GetOrCreateParentCall() {
129 ParentCall* p = parent_call_.load(std::memory_order_acquire);
130 if (p == nullptr) {
131 p = arena()->New<ParentCall>();
132 ParentCall* expected = nullptr;
133 if (!parent_call_.compare_exchange_strong(expected, p,
134 std::memory_order_release,
135 std::memory_order_relaxed)) {
136 p->~ParentCall();
137 p = expected;
138 }
139 }
140 return p;
141 }
142
parent_call()143 Call::ParentCall* Call::parent_call() {
144 return parent_call_.load(std::memory_order_acquire);
145 }
146
InitParent(Call * parent,uint32_t propagation_mask)147 absl::Status Call::InitParent(Call* parent, uint32_t propagation_mask) {
148 child_ = arena()->New<ChildCall>(parent);
149
150 parent->InternalRef("child");
151 CHECK(is_client_);
152 CHECK(!parent->is_client_);
153
154 if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
155 send_deadline_ = std::min(send_deadline_, parent->send_deadline_);
156 }
157 // for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
158 // GRPC_PROPAGATE_STATS_CONTEXT
159 // TODO(ctiller): This should change to use the appropriate census start_op
160 // call.
161 if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
162 if (0 == (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
163 return absl::UnknownError(
164 "Census tracing propagation requested without Census context "
165 "propagation");
166 }
167 arena()->SetContext<census_context>(
168 parent->arena()->GetContext<census_context>());
169 } else if (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
170 return absl::UnknownError(
171 "Census context propagation requested without Census tracing "
172 "propagation");
173 }
174 if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
175 cancellation_is_inherited_ = true;
176 }
177 return absl::OkStatus();
178 }
179
PublishToParent(Call * parent)180 void Call::PublishToParent(Call* parent) {
181 ChildCall* cc = child_;
182 ParentCall* pc = parent->GetOrCreateParentCall();
183 MutexLock lock(&pc->child_list_mu);
184 if (pc->first_child == nullptr) {
185 pc->first_child = this;
186 cc->sibling_next = cc->sibling_prev = this;
187 } else {
188 cc->sibling_next = pc->first_child;
189 cc->sibling_prev = pc->first_child->child_->sibling_prev;
190 cc->sibling_next->child_->sibling_prev =
191 cc->sibling_prev->child_->sibling_next = this;
192 }
193 if (parent->Completed()) {
194 CancelWithError(absl::CancelledError());
195 }
196 }
197
MaybeUnpublishFromParent()198 void Call::MaybeUnpublishFromParent() {
199 ChildCall* cc = child_;
200 if (cc == nullptr) return;
201
202 ParentCall* pc = cc->parent->parent_call();
203 {
204 MutexLock lock(&pc->child_list_mu);
205 if (this == pc->first_child) {
206 pc->first_child = cc->sibling_next;
207 if (this == pc->first_child) {
208 pc->first_child = nullptr;
209 }
210 }
211 cc->sibling_prev->child_->sibling_next = cc->sibling_next;
212 cc->sibling_next->child_->sibling_prev = cc->sibling_prev;
213 }
214 cc->parent->InternalUnref("child");
215 }
216
CancelWithStatus(grpc_status_code status,const char * description)217 void Call::CancelWithStatus(grpc_status_code status, const char* description) {
218 // copying 'description' is needed to ensure the grpc_call_cancel_with_status
219 // guarantee that can be short-lived.
220 // TODO(ctiller): change to
221 // absl::Status(static_cast<absl::StatusCode>(status), description)
222 // (ie remove the set_int, set_str).
223 CancelWithError(grpc_error_set_int(
224 grpc_error_set_str(
225 absl::Status(static_cast<absl::StatusCode>(status), description),
226 StatusStrProperty::kGrpcMessage, description),
227 StatusIntProperty::kRpcStatus, status));
228 }
229
PropagateCancellationToChildren()230 void Call::PropagateCancellationToChildren() {
231 ParentCall* pc = parent_call();
232 if (pc != nullptr) {
233 Call* child;
234 MutexLock lock(&pc->child_list_mu);
235 child = pc->first_child;
236 if (child != nullptr) {
237 do {
238 Call* next_child_call = child->child_->sibling_next;
239 if (child->cancellation_is_inherited_) {
240 child->InternalRef("propagate_cancel");
241 child->CancelWithError(absl::CancelledError());
242 child->InternalUnref("propagate_cancel");
243 }
244 child = next_child_call;
245 } while (child != pc->first_child);
246 }
247 }
248 }
249
PrepareOutgoingInitialMetadata(const grpc_op & op,grpc_metadata_batch & md)250 void Call::PrepareOutgoingInitialMetadata(const grpc_op& op,
251 grpc_metadata_batch& md) {
252 // TODO(juanlishen): If the user has already specified a compression
253 // algorithm by setting the initial metadata with key of
254 // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
255 // with the compression algorithm mapped from compression level.
256 // process compression level
257 grpc_compression_level effective_compression_level = GRPC_COMPRESS_LEVEL_NONE;
258 bool level_set = false;
259 if (op.data.send_initial_metadata.maybe_compression_level.is_set) {
260 effective_compression_level =
261 op.data.send_initial_metadata.maybe_compression_level.level;
262 level_set = true;
263 } else {
264 const grpc_compression_options copts = compression_options();
265 if (copts.default_level.is_set) {
266 level_set = true;
267 effective_compression_level = copts.default_level.level;
268 }
269 }
270 // Currently, only server side supports compression level setting.
271 if (level_set && !is_client()) {
272 const grpc_compression_algorithm calgo =
273 encodings_accepted_by_peer().CompressionAlgorithmForLevel(
274 effective_compression_level);
275 // The following metadata will be checked and removed by the message
276 // compression filter. It will be used as the call's compression
277 // algorithm.
278 md.Set(GrpcInternalEncodingRequest(), calgo);
279 }
280 // Ignore any te metadata key value pairs specified.
281 md.Remove(TeMetadata());
282 // Should never come from applications
283 md.Remove(GrpcLbClientStatsMetadata());
284 }
285
ProcessIncomingInitialMetadata(grpc_metadata_batch & md)286 void Call::ProcessIncomingInitialMetadata(grpc_metadata_batch& md) {
287 Slice* peer_string = md.get_pointer(PeerString());
288 if (peer_string != nullptr) SetPeerString(peer_string->Ref());
289
290 SetIncomingCompressionAlgorithm(
291 md.Take(GrpcEncodingMetadata()).value_or(GRPC_COMPRESS_NONE));
292 encodings_accepted_by_peer_ =
293 md.Take(GrpcAcceptEncodingMetadata())
294 .value_or(CompressionAlgorithmSet{GRPC_COMPRESS_NONE});
295
296 const grpc_compression_options copts = compression_options();
297 const grpc_compression_algorithm compression_algorithm =
298 incoming_compression_algorithm();
299 if (GPR_UNLIKELY(
300 !CompressionAlgorithmSet::FromUint32(copts.enabled_algorithms_bitset)
301 .IsSet(compression_algorithm))) {
302 // check if algorithm is supported by current channel config
303 HandleCompressionAlgorithmDisabled(compression_algorithm);
304 }
305 // GRPC_COMPRESS_NONE is always set.
306 DCHECK(encodings_accepted_by_peer_.IsSet(GRPC_COMPRESS_NONE));
307 if (GPR_UNLIKELY(!encodings_accepted_by_peer_.IsSet(compression_algorithm))) {
308 if (GRPC_TRACE_FLAG_ENABLED(compression)) {
309 HandleCompressionAlgorithmNotAccepted(compression_algorithm);
310 }
311 }
312 }
313
HandleCompressionAlgorithmNotAccepted(grpc_compression_algorithm compression_algorithm)314 void Call::HandleCompressionAlgorithmNotAccepted(
315 grpc_compression_algorithm compression_algorithm) {
316 const char* algo_name = nullptr;
317 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
318 LOG(ERROR) << "Compression algorithm ('" << algo_name
319 << "') not present in the accepted encodings ("
320 << encodings_accepted_by_peer_.ToString() << ")";
321 }
322
HandleCompressionAlgorithmDisabled(grpc_compression_algorithm compression_algorithm)323 void Call::HandleCompressionAlgorithmDisabled(
324 grpc_compression_algorithm compression_algorithm) {
325 const char* algo_name = nullptr;
326 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
327 std::string error_msg =
328 absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
329 LOG(ERROR) << error_msg;
330 CancelWithError(grpc_error_set_int(absl::UnimplementedError(error_msg),
331 StatusIntProperty::kRpcStatus,
332 GRPC_STATUS_UNIMPLEMENTED));
333 }
334
UpdateDeadline(Timestamp deadline)335 void Call::UpdateDeadline(Timestamp deadline) {
336 ReleasableMutexLock lock(&deadline_mu_);
337 GRPC_TRACE_LOG(call, INFO)
338 << "[call " << this << "] UpdateDeadline from=" << deadline_.ToString()
339 << " to=" << deadline.ToString();
340 if (deadline >= deadline_) return;
341 if (deadline < Timestamp::Now()) {
342 lock.Release();
343 CancelWithError(grpc_error_set_int(
344 absl::DeadlineExceededError("Deadline Exceeded"),
345 StatusIntProperty::kRpcStatus, GRPC_STATUS_DEADLINE_EXCEEDED));
346 return;
347 }
348 auto* event_engine =
349 arena_->GetContext<grpc_event_engine::experimental::EventEngine>();
350 if (deadline_ != Timestamp::InfFuture()) {
351 if (!event_engine->Cancel(deadline_task_)) return;
352 } else {
353 InternalRef("deadline");
354 }
355 deadline_ = deadline;
356 deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this);
357 }
358
ResetDeadline()359 void Call::ResetDeadline() {
360 {
361 MutexLock lock(&deadline_mu_);
362 if (deadline_ == Timestamp::InfFuture()) return;
363 if (!arena_->GetContext<grpc_event_engine::experimental::EventEngine>()
364 ->Cancel(deadline_task_)) {
365 return;
366 }
367 deadline_ = Timestamp::InfFuture();
368 }
369 InternalUnref("deadline[reset]");
370 }
371
Run()372 void Call::Run() {
373 ApplicationCallbackExecCtx callback_exec_ctx;
374 ExecCtx exec_ctx;
375 GRPC_TRACE_LOG(call, INFO)
376 << "call deadline expired "
377 << GRPC_DUMP_ARGS(Timestamp::Now(), send_deadline_);
378 CancelWithError(grpc_error_set_int(
379 absl::DeadlineExceededError("Deadline Exceeded"),
380 StatusIntProperty::kRpcStatus, GRPC_STATUS_DEADLINE_EXCEEDED));
381 InternalUnref("deadline[run]");
382 }
383
384 } // namespace grpc_core
385
386 ///////////////////////////////////////////////////////////////////////////////
387 // C-based API
388
grpc_call_arena_alloc(grpc_call * call,size_t size)389 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
390 grpc_core::ExecCtx exec_ctx;
391 return grpc_core::Call::FromC(call)->arena()->Alloc(size);
392 }
393
grpc_call_set_completion_queue(grpc_call * call,grpc_completion_queue * cq)394 void grpc_call_set_completion_queue(grpc_call* call,
395 grpc_completion_queue* cq) {
396 grpc_core::Call::FromC(call)->SetCompletionQueue(cq);
397 }
398
grpc_call_ref(grpc_call * c)399 void grpc_call_ref(grpc_call* c) { grpc_core::Call::FromC(c)->ExternalRef(); }
400
grpc_call_unref(grpc_call * c)401 void grpc_call_unref(grpc_call* c) {
402 grpc_core::ExecCtx exec_ctx;
403 grpc_core::Call::FromC(c)->ExternalUnref();
404 }
405
grpc_call_get_peer(grpc_call * call)406 char* grpc_call_get_peer(grpc_call* call) {
407 return grpc_core::Call::FromC(call)->GetPeer();
408 }
409
grpc_call_cancel(grpc_call * call,void * reserved)410 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
411 GRPC_TRACE_LOG(api, INFO)
412 << "grpc_call_cancel(call=" << call << ", reserved=" << reserved << ")";
413 CHECK_EQ(reserved, nullptr);
414 if (call == nullptr) {
415 return GRPC_CALL_ERROR;
416 }
417 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
418 grpc_core::ExecCtx exec_ctx;
419 grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError());
420 return GRPC_CALL_OK;
421 }
422
grpc_call_cancel_with_status(grpc_call * c,grpc_status_code status,const char * description,void * reserved)423 grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
424 grpc_status_code status,
425 const char* description,
426 void* reserved) {
427 GRPC_TRACE_LOG(api, INFO)
428 << "grpc_call_cancel_with_status(c=" << c << ", status=" << (int)status
429 << ", description=" << description << ", reserved=" << reserved << ")";
430 CHECK_EQ(reserved, nullptr);
431 if (c == nullptr) {
432 return GRPC_CALL_ERROR;
433 }
434 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
435 grpc_core::ExecCtx exec_ctx;
436 grpc_core::Call::FromC(c)->CancelWithStatus(status, description);
437 return GRPC_CALL_OK;
438 }
439
grpc_call_cancel_internal(grpc_call * call)440 void grpc_call_cancel_internal(grpc_call* call) {
441 grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError());
442 }
443
grpc_call_test_only_get_compression_algorithm(grpc_call * call)444 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
445 grpc_call* call) {
446 return grpc_core::Call::FromC(call)->incoming_compression_algorithm();
447 }
448
grpc_call_test_only_get_message_flags(grpc_call * call)449 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
450 return grpc_core::Call::FromC(call)->test_only_message_flags();
451 }
452
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)453 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
454 return grpc_core::Call::FromC(call)
455 ->encodings_accepted_by_peer()
456 .ToLegacyBitmask();
457 }
458
grpc_call_get_arena(grpc_call * call)459 grpc_core::Arena* grpc_call_get_arena(grpc_call* call) {
460 return grpc_core::Call::FromC(call)->arena();
461 }
462
grpc_call_get_call_stack(grpc_call * call)463 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
464 return grpc_core::Call::FromC(call)->call_stack();
465 }
466
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)467 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
468 size_t nops, void* tag, void* reserved) {
469 GRPC_TRACE_LOG(api, INFO)
470 << "grpc_call_start_batch(call=" << call << ", ops=" << ops
471 << ", nops=" << (unsigned long)nops << ", tag=" << tag
472 << ", reserved=" << reserved << ")";
473
474 if (reserved != nullptr || call == nullptr) {
475 return GRPC_CALL_ERROR;
476 } else {
477 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
478 grpc_core::ExecCtx exec_ctx;
479 return grpc_core::Call::FromC(call)->StartBatch(ops, nops, tag, false);
480 }
481 }
482
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)483 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
484 const grpc_op* ops,
485 size_t nops,
486 grpc_closure* closure) {
487 return grpc_core::Call::FromC(call)->StartBatch(ops, nops, closure, true);
488 }
489
grpc_call_tracer_set(grpc_call * call,grpc_core::ClientCallTracer * tracer)490 void grpc_call_tracer_set(grpc_call* call,
491 grpc_core::ClientCallTracer* tracer) {
492 grpc_core::Arena* arena = grpc_call_get_arena(call);
493 return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
494 }
495
grpc_call_tracer_set_and_manage(grpc_call * call,grpc_core::ClientCallTracer * tracer)496 void grpc_call_tracer_set_and_manage(grpc_call* call,
497 grpc_core::ClientCallTracer* tracer) {
498 grpc_core::Arena* arena = grpc_call_get_arena(call);
499 arena->ManagedNew<ClientCallTracerWrapper>(tracer);
500 return arena->SetContext<grpc_core::CallTracerAnnotationInterface>(tracer);
501 }
502
grpc_call_tracer_get(grpc_call * call)503 void* grpc_call_tracer_get(grpc_call* call) {
504 grpc_core::Arena* arena = grpc_call_get_arena(call);
505 auto* call_tracer =
506 arena->GetContext<grpc_core::CallTracerAnnotationInterface>();
507 return call_tracer;
508 }
509
grpc_call_is_client(grpc_call * call)510 uint8_t grpc_call_is_client(grpc_call* call) {
511 return grpc_core::Call::FromC(call)->is_client();
512 }
513
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)514 grpc_compression_algorithm grpc_call_compression_for_level(
515 grpc_call* call, grpc_compression_level level) {
516 return grpc_core::Call::FromC(call)
517 ->encodings_accepted_by_peer()
518 .CompressionAlgorithmForLevel(level);
519 }
520
grpc_call_is_trailers_only(const grpc_call * call)521 bool grpc_call_is_trailers_only(const grpc_call* call) {
522 return grpc_core::Call::FromC(call)->is_trailers_only();
523 }
524
grpc_call_failed_before_recv_message(const grpc_call * c)525 int grpc_call_failed_before_recv_message(const grpc_call* c) {
526 return grpc_core::Call::FromC(c)->failed_before_recv_message();
527 }
528
grpc_call_server_authority(const grpc_call * call)529 absl::string_view grpc_call_server_authority(const grpc_call* call) {
530 return grpc_core::Call::FromC(call)->GetServerAuthority();
531 }
532
grpc_call_error_to_string(grpc_call_error error)533 const char* grpc_call_error_to_string(grpc_call_error error) {
534 switch (error) {
535 case GRPC_CALL_ERROR:
536 return "GRPC_CALL_ERROR";
537 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
538 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
539 case GRPC_CALL_ERROR_ALREADY_FINISHED:
540 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
541 case GRPC_CALL_ERROR_ALREADY_INVOKED:
542 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
543 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
544 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
545 case GRPC_CALL_ERROR_INVALID_FLAGS:
546 return "GRPC_CALL_ERROR_INVALID_FLAGS";
547 case GRPC_CALL_ERROR_INVALID_MESSAGE:
548 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
549 case GRPC_CALL_ERROR_INVALID_METADATA:
550 return "GRPC_CALL_ERROR_INVALID_METADATA";
551 case GRPC_CALL_ERROR_NOT_INVOKED:
552 return "GRPC_CALL_ERROR_NOT_INVOKED";
553 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
554 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
555 case GRPC_CALL_ERROR_NOT_ON_SERVER:
556 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
557 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
558 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
559 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
560 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
561 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
562 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
563 case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
564 return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
565 case GRPC_CALL_OK:
566 return "GRPC_CALL_OK";
567 }
568 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
569 }
570
grpc_call_run_in_event_engine(const grpc_call * call,absl::AnyInvocable<void ()> cb)571 void grpc_call_run_in_event_engine(const grpc_call* call,
572 absl::AnyInvocable<void()> cb) {
573 grpc_core::Call::FromC(call)
574 ->arena()
575 ->GetContext<grpc_event_engine::experimental::EventEngine>()
576 ->Run(std::move(cb));
577 }
578