1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 #include <string>
28 
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 
32 #include <grpc/compression.h>
33 #include <grpc/grpc.h>
34 #include <grpc/slice.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38 
39 #include "src/core/lib/channel/channel_stack.h"
40 #include "src/core/lib/compression/algorithm_metadata.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/gpr/alloc.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gpr/time_precise.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/arena.h"
47 #include "src/core/lib/gprpp/manual_constructor.h"
48 #include "src/core/lib/gprpp/ref_counted.h"
49 #include "src/core/lib/iomgr/timer.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/lib/slice/slice_utils.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/call_test_only.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/server.h"
59 #include "src/core/lib/surface/validate_metadata.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/static_metadata.h"
63 #include "src/core/lib/transport/status_metadata.h"
64 #include "src/core/lib/transport/transport.h"
65 
/** The maximum number of concurrent batches possible.
    Based upon the maximum number of individually queueable ops in the batch
    api:
      - initial metadata send
      - message send
      - status/close send (depending on client/server)
      - initial metadata recv
      - message recv
      - status/close recv (depending on client/server) */
#define MAX_CONCURRENT_BATCHES 6

/* Upper bound on the metadata elements the surface layer itself attaches to
   a call (see grpc_call::send_extra_metadata). */
#define MAX_SEND_EXTRA_METADATA_COUNT 3

// Used to create arena for the first call.
#define ESTIMATED_MDELEM_COUNT 16
81 
/* Bookkeeping for one in-flight batch of ops on a call.  Instances live in
   grpc_call::active_batches. */
struct batch_control {
  batch_control() = default;

  grpc_call* call = nullptr;
  grpc_transport_stream_op_batch op;
  /* Share memory for cq_completion and notify_tag as they are never needed
     simultaneously. Each byte used in this data structure count as six bytes
     per call, so any savings we can make are worthwhile,

     We use notify_tag to determine whether or not to send notification to the
     completion queue. Once we've made that determination, we can reuse the
     memory for cq_completion. */
  union {
    grpc_cq_completion cq_completion;
    struct {
      /* Any given op indicates completion by either (a) calling a closure or
         (b) sending a notification on the call's completion queue.  If
         \a is_closure is true, \a tag indicates a closure to be invoked;
         otherwise, \a tag indicates the tag to be used in the notification to
         be sent to the completion queue. */
      void* tag;
      bool is_closure;
    } notify_tag;
  } completion_data;
  grpc_closure start_batch;
  grpc_closure finish_batch;
  // Number of outstanding sub-operations; the batch is complete once this
  // has been decremented to zero.
  grpc_core::Atomic<intptr_t> steps_to_complete;
  // A grpc_error_handle stored as a gpr_atm so it can be updated from
  // multiple completion closures without a lock.
  gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
  // Arms the completion counter; call before starting the batch's ops.
  void set_num_steps_to_complete(uintptr_t steps) {
    steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
  }
  // Returns true iff the caller retired the last outstanding step and is
  // therefore responsible for posting the batch completion.
  bool completed_batch_step() {
    return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
  }
};
117 
/* Child-tracking state for a call that has (or may have) child calls.
   Created lazily by get_or_create_parent_call in the call's arena. */
struct parent_call {
  parent_call() { gpr_mu_init(&child_list_mu); }
  ~parent_call() { gpr_mu_destroy(&child_list_mu); }

  // Guards first_child and the sibling links of every child call.
  gpr_mu child_list_mu;
  grpc_call* first_child = nullptr;
};
125 
/* Link node embedded in a child call tying it into its parent's child list. */
struct child_call {
  explicit child_call(grpc_call* parent) : parent(parent) {}
  grpc_call* parent;
  /** siblings: children of the same parent form a list, and this list is
     protected under
      parent->mu */
  grpc_call* sibling_next = nullptr;
  grpc_call* sibling_prev = nullptr;
};
135 
/* Sentinel values for grpc_call::recv_state (documented on that field);
   any other value stored there is a batch_control*. */
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
138 
struct grpc_call {
  grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
      : arena(arena),
        cq(args.cq),
        channel(args.channel),
        is_client(args.server_transport_data == nullptr),
        stream_op_payload(context) {
    // No metadata batch carries a deadline until one is explicitly set.
    for (int i = 0; i < 2; i++) {
      for (int j = 0; j < 2; j++) {
        metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
      }
    }
  }

  ~grpc_call() {
    // error_string is heap-allocated by grpc_error_get_status (see
    // destroy_call), so it is freed here.
    gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
  }

  // External (application-visible) refcount: grpc_call_ref/grpc_call_unref.
  grpc_core::RefCount ext_ref;
  // Arena holding this call, its filter stack, and per-call allocations.
  grpc_core::Arena* arena;
  grpc_core::CallCombiner call_combiner;
  grpc_completion_queue* cq;
  grpc_polling_entity pollent;
  grpc_channel* channel;
  gpr_cycle_counter start_time = gpr_get_cycle_counter();
  /* parent_call* */ gpr_atm parent_call_atm = 0;
  child_call* child = nullptr;

  /* client or server call */
  bool is_client;
  /** has grpc_call_unref been called */
  bool destroy_called = false;
  /** flag indicating that cancellation is inherited */
  bool cancellation_is_inherited = false;
  /** which ops are in-flight */
  bool sent_initial_metadata = false;
  bool sending_message = false;
  bool sent_final_op = false;
  bool received_initial_metadata = false;
  bool receiving_message = false;
  bool requested_final_op = false;
  gpr_atm any_ops_sent_atm = 0;
  gpr_atm received_final_op_atm = 0;

  batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
  grpc_transport_stream_op_batch_payload stream_op_payload;

  /* first idx: is_receiving, second idx: is_trailing */
  grpc_metadata_batch metadata_batch[2][2] = {};

  /* Buffered read metadata waiting to be returned to the application.
     Element 0 is initial metadata, element 1 is trailing metadata. */
  grpc_metadata_array* buffered_metadata[2] = {};

  grpc_metadata compression_md;

  // A char* indicating the peer name.
  gpr_atm peer_string = 0;

  /* Call data useful used for reporting. Only valid after the call has
   * completed */
  grpc_call_final_info final_info;

  /* Compression algorithm for *incoming* data */
  grpc_message_compression_algorithm incoming_message_compression_algorithm =
      GRPC_MESSAGE_COMPRESS_NONE;
  /* Stream compression algorithm for *incoming* data */
  grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
      GRPC_STREAM_COMPRESS_NONE;
  /* Supported encodings (compression algorithms), a bitset.
   * Always support no compression. */
  uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
  /* Supported stream encodings (stream compression algorithms), a bitset */
  uint32_t stream_encodings_accepted_by_peer = 0;

  /* Contexts for various subsystems (security, tracing, ...). */
  grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};

  /* for the client, extra metadata is initial metadata; for the
     server, it's trailing metadata */
  grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
  int send_extra_metadata_count;
  grpc_millis send_deadline;

  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;

  grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
  grpc_byte_buffer** receiving_buffer = nullptr;
  grpc_slice receiving_slice = grpc_empty_slice();
  grpc_closure receiving_slice_ready;
  grpc_closure receiving_stream_ready;
  grpc_closure receiving_initial_metadata_ready;
  grpc_closure receiving_trailing_metadata_ready;
  uint32_t test_only_last_message_flags = 0;
  // Status about operation of call
  bool sent_server_trailing_metadata = false;
  gpr_atm cancelled_with_error = 0;

  grpc_closure release_call;

  /* Application-supplied output pointers for the final op; which arm is
     active depends on is_client. */
  union {
    struct {
      grpc_status_code* status;
      grpc_slice* status_details;
      const char** error_string;
    } client;
    struct {
      int* cancelled;
      // backpointer to owning server if this is a server side call.
      grpc_core::Server* core_server;
    } server;
  } final_op;
  // A grpc_error_handle stored atomically; the ref it holds is released in
  // destroy_call.
  gpr_atm status_error = 0;

  /* recv_state can contain one of the following values:
     RECV_NONE :                 :  no initial metadata and messages received
     RECV_INITIAL_METADATA_FIRST :  received initial metadata first
     a batch_control*            :  received messages first

                 +------1------RECV_NONE------3-----+
                 |                                  |
                 |                                  |
                 v                                  v
     RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
           |           ^                      |           ^
           |           |                      |           |
           +-----2-----+                      +-----4-----+

    For 1, 4: See receiving_initial_metadata_ready() function
    For 2, 3: See receiving_stream_ready() function */
  gpr_atm recv_state = 0;
};
271 
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");

/* A grpc_call is allocated with its call stack laid out immediately after it
   (rounded up to alignment); these macros convert between the two views. */
#define CALL_STACK_FROM_CALL(call)   \
  (grpc_call_stack*)((char*)(call) + \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
#define CALL_FROM_CALL_STACK(call_stack) \
  (grpc_call*)(((char*)(call_stack)) -   \
               GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))

#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))

/* Forward declarations. */
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure);

static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description);
static void cancel_with_error(grpc_call* c, grpc_error_handle error);
static void destroy_call(void* call_stack, grpc_error_handle error);
static void receiving_slice_ready(void* bctlp, grpc_error_handle error);
static void set_final_status(grpc_call* call, grpc_error_handle error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
299 
add_init_error(grpc_error_handle * composite,grpc_error_handle new_err)300 static void add_init_error(grpc_error_handle* composite,
301                            grpc_error_handle new_err) {
302   if (new_err == GRPC_ERROR_NONE) return;
303   if (*composite == GRPC_ERROR_NONE) {
304     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
305   }
306   *composite = grpc_error_add_child(*composite, new_err);
307 }
308 
/* Allocates \a size bytes from the call's arena; the memory lives until the
   arena is destroyed (see release_call). */
void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
  return call->arena->Alloc(size);
}
312 
/* Lazily creates the parent_call structure for \a call.  Racing creators are
   resolved with a CAS on parent_call_atm: the loser runs the destructor on
   its own copy (the storage itself is arena-owned, so only the dtor runs)
   and adopts the winner's instance. */
static parent_call* get_or_create_parent_call(grpc_call* call) {
  parent_call* p =
      reinterpret_cast<parent_call*>(gpr_atm_acq_load(&call->parent_call_atm));
  if (p == nullptr) {
    p = call->arena->New<parent_call>();
    if (!gpr_atm_rel_cas(&call->parent_call_atm,
                         reinterpret_cast<gpr_atm>(nullptr),
                         reinterpret_cast<gpr_atm>(p))) {
      // Lost the race: discard our copy and reload the published one.
      p->~parent_call();
      p = reinterpret_cast<parent_call*>(
          gpr_atm_acq_load(&call->parent_call_atm));
    }
  }
  return p;
}
328 
/* Returns the parent_call for \a call, or nullptr if none has been created. */
static parent_call* get_parent_call(grpc_call* call) {
  return reinterpret_cast<parent_call*>(
      gpr_atm_acq_load(&call->parent_call_atm));
}
333 
grpc_call_get_initial_size_estimate()334 size_t grpc_call_get_initial_size_estimate() {
335   return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
336          sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
337 }
338 
/* Creates a call (client or server side) plus its filter stack.  *out_call is
   always set; on initialization failure the error is returned and the call is
   additionally cancelled with it. */
grpc_error_handle grpc_call_create(const grpc_call_create_args* args,
                                   grpc_call** out_call) {
  GPR_TIMER_SCOPE("grpc_call_create", 0);

  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");

  grpc_core::Arena* arena;
  grpc_call* call;
  grpc_error_handle error = GRPC_ERROR_NONE;
  grpc_channel_stack* channel_stack =
      grpc_channel_get_channel_stack(args->channel);
  size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
  GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
  // The call, its call stack, and (for child calls) the child_call link are
  // carved out of a single contiguous arena allocation.
  size_t call_and_stack_size =
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
      channel_stack->call_stack_size;
  size_t call_alloc_size =
      call_and_stack_size + (args->parent ? sizeof(child_call) : 0);

  std::pair<grpc_core::Arena*, void*> arena_with_call =
      grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
  arena = arena_with_call.first;
  call = new (arena_with_call.second) grpc_call(arena, *args);
  *out_call = call;
  grpc_slice path = grpc_empty_slice();
  if (call->is_client) {
    call->final_op.client.status_details = nullptr;
    call->final_op.client.status = nullptr;
    call->final_op.client.error_string = nullptr;
    GRPC_STATS_INC_CLIENT_CALLS_CREATED();
    GPR_ASSERT(args->add_initial_metadata_count <
               MAX_SEND_EXTRA_METADATA_COUNT);
    for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
      // Capture the :path value to hand to call stack initialization below.
      if (grpc_slice_eq_static_interned(
              GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
        path = grpc_slice_ref_internal(
            GRPC_MDVALUE(args->add_initial_metadata[i]));
      }
    }
    call->send_extra_metadata_count =
        static_cast<int>(args->add_initial_metadata_count);
  } else {
    GRPC_STATS_INC_SERVER_CALLS_CREATED();
    call->final_op.server.cancelled = nullptr;
    call->final_op.server.core_server = args->server;
    GPR_ASSERT(args->add_initial_metadata_count == 0);
    call->send_extra_metadata_count = 0;
  }

  grpc_millis send_deadline = args->send_deadline;
  bool immediately_cancel = false;

  if (args->parent != nullptr) {
    // The child_call link lives directly after the call + stack allocation.
    call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
                       call_and_stack_size) child_call(args->parent);

    GRPC_CALL_INTERNAL_REF(args->parent, "child");
    GPR_ASSERT(call->is_client);
    GPR_ASSERT(!args->parent->is_client);

    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
      send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
    }
    /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     * GRPC_PROPAGATE_STATS_CONTEXT */
    /* TODO(ctiller): This should change to use the appropriate census start_op
     * call. */
    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
      if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
        add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Census tracing propagation requested "
                                   "without Census context propagation"));
      }
      grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
                            args->parent->context[GRPC_CONTEXT_TRACING].value,
                            nullptr);
    } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
      add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                 "Census context propagation requested "
                                 "without Census tracing propagation"));
    }
    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
      call->cancellation_is_inherited = true;
      // Parent already finished: this child must be cancelled right away.
      if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
        immediately_cancel = true;
      }
    }
  }
  call->send_deadline = send_deadline;
  /* initial refcount dropped by grpc_call_unref */
  grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
                                      args->server_transport_data,
                                      call->context,
                                      path,
                                      call->start_time,
                                      send_deadline,
                                      call->arena,
                                      &call->call_combiner};
  add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
                                              call, &call_args));
  // Publish this call to parent only after the call stack has been initialized.
  if (args->parent != nullptr) {
    child_call* cc = call->child;
    parent_call* pc = get_or_create_parent_call(args->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (pc->first_child == nullptr) {
      pc->first_child = call;
      cc->sibling_next = cc->sibling_prev = call;
    } else {
      // Insert at the tail of the parent's circular doubly-linked child list.
      cc->sibling_next = pc->first_child;
      cc->sibling_prev = pc->first_child->child->sibling_prev;
      cc->sibling_next->child->sibling_prev =
          cc->sibling_prev->child->sibling_next = call;
    }
    gpr_mu_unlock(&pc->child_list_mu);
  }

  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  if (immediately_cancel) {
    cancel_with_error(call, GRPC_ERROR_CANCELLED);
  }
  if (args->cq != nullptr) {
    GPR_ASSERT(args->pollset_set_alternative == nullptr &&
               "Only one of 'cq' and 'pollset_set_alternative' should be "
               "non-nullptr.");
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != nullptr) {
    call->pollent = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  if (!grpc_polling_entity_is_empty(&call->pollent)) {
    grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                               &call->pollent);
  }

  // channelz call-started accounting (client channel or owning server).
  if (call->is_client) {
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      channelz_channel->RecordCallStarted();
    }
  } else if (call->final_op.server.core_server != nullptr) {
    grpc_core::channelz::ServerNode* channelz_node =
        call->final_op.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      channelz_node->RecordCallStarted();
    }
  }

  grpc_slice_unref_internal(path);

  return error;
}
498 
/* Binds \a cq to a call that was created without one.  Aborts if the call is
   already polling via a pollset_set alternative. */
void grpc_call_set_completion_queue(grpc_call* call,
                                    grpc_completion_queue* cq) {
  GPR_ASSERT(cq);

  if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
    gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
    abort();
  }
  call->cq = cq;
  // Ref dropped in destroy_call ("bind").
  GRPC_CQ_INTERNAL_REF(cq, "bind");
  call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
  grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                             &call->pollent);
}
513 
/* In debug builds, internal ref/unref carry a reason string for tracking. */
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
// Internal refs are held on the underlying call stack's refcount.
void grpc_call_internal_ref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
void grpc_call_internal_unref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
527 
/* Final teardown, scheduled by destroy_call after the call stack is torn
   down.  channel/arena are copied out *before* the destructor runs because
   ~grpc_call lives in the arena being destroyed. */
static void release_call(void* call, grpc_error_handle /*error*/) {
  grpc_call* c = static_cast<grpc_call*>(call);
  grpc_channel* channel = c->channel;
  grpc_core::Arena* arena = c->arena;
  c->~grpc_call();
  // Feed the bytes this call actually used back into the channel's size
  // estimate for future calls.
  grpc_channel_update_call_size_estimate(channel, arena->Destroy());
  GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
536 
/* Runs when the call stack's refcount hits zero: releases per-call resources,
   records final status/latency, then destroys the stack (which in turn
   schedules release_call for the final deallocation). */
static void destroy_call(void* call, grpc_error_handle /*error*/) {
  GPR_TIMER_SCOPE("destroy_call", 0);
  size_t i;
  int ii;
  grpc_call* c = static_cast<grpc_call*>(call);
  // Only the receiving-side metadata batches are destroyed here; the
  // sending-side batches are presumably retired when their ops complete —
  // NOTE(review): confirm against the batch completion path.
  for (i = 0; i < 2; i++) {
    grpc_metadata_batch_destroy(
        &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
  }
  c->receiving_stream.reset();
  parent_call* pc = get_parent_call(c);
  if (pc != nullptr) {
    // parent_call storage is arena-owned; only the destructor runs.
    pc->~parent_call();
  }
  for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
    GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
  }
  for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (c->context[i].destroy) {
      c->context[i].destroy(c->context[i].value);
    }
  }
  if (c->cq) {
    GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
  }

  // Resolve the final status (and its error string, freed in ~grpc_call)
  // before the stack is destroyed, since final_info is handed to it.
  grpc_error_handle status_error =
      reinterpret_cast<grpc_error_handle>(gpr_atm_acq_load(&c->status_error));
  grpc_error_get_status(status_error, c->send_deadline,
                        &c->final_info.final_status, nullptr, nullptr,
                        &(c->final_info.error_string));
  GRPC_ERROR_UNREF(status_error);
  c->final_info.stats.latency =
      gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
  grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
                          GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
                                            grpc_schedule_on_exec_ctx));
}
575 
grpc_call_ref(grpc_call * c)576 void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
577 
/* Public API: drops an external ref; on the last one, unlinks the call from
   its parent, cancels any unfinished work, and drops the initial internal
   ref taken at creation. */
void grpc_call_unref(grpc_call* c) {
  if (GPR_LIKELY(!c->ext_ref.Unref())) return;

  GPR_TIMER_SCOPE("grpc_call_unref", 0);

  child_call* cc = c->child;
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));

  if (cc) {
    // Unlink this call from its parent's circular sibling list.
    parent_call* pc = get_parent_call(cc->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (c == pc->first_child) {
      pc->first_child = cc->sibling_next;
      if (c == pc->first_child) {
        // This was the only child.
        pc->first_child = nullptr;
      }
    }
    cc->sibling_prev->child->sibling_next = cc->sibling_next;
    cc->sibling_next->child->sibling_prev = cc->sibling_prev;
    gpr_mu_unlock(&pc->child_list_mu);
    GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
  }

  GPR_ASSERT(!c->destroy_called);
  c->destroy_called = true;
  // If ops were started but the final op never arrived, the stack still has
  // work in flight: cancel so it can unwind.
  bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
                gpr_atm_acq_load(&c->received_final_op_atm) == 0;
  if (cancel) {
    cancel_with_error(c, GRPC_ERROR_CANCELLED);
  } else {
    // Unset the call combiner cancellation closure.  This has the
    // effect of scheduling the previously set cancellation closure, if
    // any, so that it can release any internal references it may be
    // holding to the call stack.
    c->call_combiner.SetNotifyOnCancel(nullptr);
  }
  GRPC_CALL_INTERNAL_UNREF(c, "destroy");
}
619 
/* Public API: cancels \a call with GRPC_ERROR_CANCELLED.  \a reserved must
   be null. */
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
  GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
  GPR_ASSERT(!reserved);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
  return GRPC_CALL_OK;
}
628 
// This is called via the call combiner to start sending a batch down
// the filter stack.
static void execute_batch_in_call_combiner(void* arg,
                                           grpc_error_handle /*ignored*/) {
  GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  // The owning call was stashed in the batch by execute_batch.
  grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
  // Hand the batch to the top element of the filter stack.
  grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
  GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
  elem->filter->start_transport_stream_op_batch(elem, batch);
}
641 
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner.
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure) {
  // Stash the call so execute_batch_in_call_combiner can recover it.
  batch->handler_private.extra_arg = call;
  GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
                           GRPC_ERROR_NONE, "executing batch");
}
653 
grpc_call_get_peer(grpc_call * call)654 char* grpc_call_get_peer(grpc_call* call) {
655   char* peer_string =
656       reinterpret_cast<char*>(gpr_atm_acq_load(&call->peer_string));
657   if (peer_string != nullptr) return gpr_strdup(peer_string);
658   peer_string = grpc_channel_get_target(call->channel);
659   if (peer_string != nullptr) return peer_string;
660   return gpr_strdup("unknown");
661 }
662 
/* Recovers the owning grpc_call from the top element of its filter stack. */
grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {
  return CALL_FROM_TOP_ELEM(surface_element);
}
666 
667 /*******************************************************************************
668  * CANCELLATION
669  */
670 
/* Public API: cancels \a c with an application-chosen status code and
   description.  \a description is copied, so it may be short-lived. */
grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
                                             grpc_status_code status,
                                             const char* description,
                                             void* reserved) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE(
      "grpc_call_cancel_with_status("
      "c=%p, status=%d, description=%s, reserved=%p)",
      4, (c, (int)status, description, reserved));
  GPR_ASSERT(reserved == nullptr);
  cancel_with_status(c, status, description);
  return GRPC_CALL_OK;
}
685 
/* Heap-allocated state for an internally generated cancel_stream batch
   (created in cancel_with_error, freed in done_termination). */
struct cancel_state {
  grpc_call* call;
  grpc_closure start_batch;
  grpc_closure finish_batch;
};
// The on_complete callback used when sending a cancel_stream batch down
// the filter stack.  Yields the call combiner when the batch is done.
static void done_termination(void* arg, grpc_error_handle /*error*/) {
  cancel_state* state = static_cast<cancel_state*>(arg);
  GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
                          "on_complete for cancel_stream op");
  // Drop the "termination" ref taken in cancel_with_error and free the state.
  GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
  gpr_free(state);
}
700 
/* Cancels \a c with \a error, taking ownership of one ref to \a error.  Only
   the first cancellation wins (guarded by a CAS on cancelled_with_error);
   later callers simply release their error ref. */
static void cancel_with_error(grpc_call* c, grpc_error_handle error) {
  if (!gpr_atm_rel_cas(&c->cancelled_with_error, 0, 1)) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Released in done_termination once the cancel_stream batch completes.
  GRPC_CALL_INTERNAL_REF(c, "termination");
  // Inform the call combiner of the cancellation, so that it can cancel
  // any in-flight asynchronous actions that may be holding the call
  // combiner.  This ensures that the cancel_stream batch can be sent
  // down the filter stack in a timely manner.
  c->call_combiner.Cancel(GRPC_ERROR_REF(error));
  cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
  state->call = c;
  GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_stream_op_batch* op =
      grpc_make_transport_stream_op(&state->finish_batch);
  op->cancel_stream = true;
  // The batch takes over the remaining ref to `error`.
  op->payload->cancel_stream.cancel_error = error;
  execute_batch(c, op, &state->start_batch);
}
722 
/* Like grpc_call_cancel, but for internal callers: does not construct
   exec contexts or emit API tracing. */
void grpc_call_cancel_internal(grpc_call* call) {
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
}
726 
error_from_status(grpc_status_code status,const char * description)727 static grpc_error_handle error_from_status(grpc_status_code status,
728                                            const char* description) {
729   // copying 'description' is needed to ensure the grpc_call_cancel_with_status
730   // guarantee that can be short-lived.
731   return grpc_error_set_int(
732       grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
733                          GRPC_ERROR_STR_GRPC_MESSAGE,
734                          grpc_slice_from_copied_string(description)),
735       GRPC_ERROR_INT_GRPC_STATUS, status);
736 }
737 
// Cancels |c| with a status code and description; |description| may be
// short-lived (error_from_status copies it).
static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description) {
  cancel_with_error(c, error_from_status(status, description));
}
742 
// Records the call's final status from |error| and updates channelz call
// counters.  On the client path ownership of |error| moves into
// call->status_error; on the server path |error| is unrefed after deriving
// the 'cancelled' flag.
static void set_final_status(grpc_call* call, grpc_error_handle error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
    gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
    gpr_log(GPR_DEBUG, "%s", grpc_error_std_string(error).c_str());
  }
  if (call->is_client) {
    // Translate the error into the application's recv_status_on_client
    // output fields (code, details, error string).
    grpc_error_get_status(error, call->send_deadline,
                          call->final_op.client.status,
                          call->final_op.client.status_details, nullptr,
                          call->final_op.client.error_string);
    // explicitly take a ref
    grpc_slice_ref_internal(*call->final_op.client.status_details);
    // Publish the error; ownership moves into call->status_error.
    gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      if (*call->final_op.client.status != GRPC_STATUS_OK) {
        channelz_channel->RecordCallFailed();
      } else {
        channelz_channel->RecordCallSucceeded();
      }
    }
  } else {
    // Server side: the call counts as cancelled if it errored or never sent
    // its trailing metadata.
    *call->final_op.server.cancelled =
        error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
    grpc_core::channelz::ServerNode* channelz_node =
        call->final_op.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      if (*call->final_op.server.cancelled ||
          reinterpret_cast<grpc_error_handle>(
              gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
        channelz_node->RecordCallFailed();
      } else {
        channelz_node->RecordCallSucceeded();
      }
    }
    GRPC_ERROR_UNREF(error);
  }
}
782 
783 /*******************************************************************************
784  * COMPRESSION
785  */
786 
// Records the message compression algorithm announced by the peer
// (extracted from incoming metadata in recv_initial_filter).
static void set_incoming_message_compression_algorithm(
    grpc_call* call, grpc_message_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_message_compression_algorithm = algo;
}
792 
// Records the stream compression algorithm announced by the peer
// (extracted from incoming metadata in recv_initial_filter).
static void set_incoming_stream_compression_algorithm(
    grpc_call* call, grpc_stream_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_stream_compression_algorithm = algo;
}
798 
// Test-only: folds the call's incoming message- and stream-compression
// algorithms back into a single grpc_compression_algorithm value.
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
    grpc_call* call) {
  grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
  grpc_compression_algorithm_from_message_stream_compression_algorithm(
      &algorithm, call->incoming_message_compression_algorithm,
      call->incoming_stream_compression_algorithm);
  return algorithm;
}
807 
// Maps a compression level to a concrete algorithm, constrained to the set
// of encodings the peer advertised it accepts.
static grpc_compression_algorithm compression_algorithm_for_level_locked(
    grpc_call* call, grpc_compression_level level) {
  return grpc_compression_algorithm_for_level(level,
                                              call->encodings_accepted_by_peer);
}
813 
grpc_call_test_only_get_message_flags(grpc_call * call)814 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
815   uint32_t flags;
816   flags = call->test_only_last_message_flags;
817   return flags;
818 }
819 
// No-op destructor for the accepted-encodings mdelem user data: the value is
// a bitset encoded directly in the pointer, so there is nothing to free.
static void destroy_encodings_accepted_by_peer(void* /*p*/) {}
821 
// Parses a comma-separated accept-encoding metadata value into a bitset of
// accepted algorithms (message or stream compression depending on
// |stream_encoding|).  The parsed bitset is cached on the mdelem as user
// data, stored as value+1 so an empty bitset (0) is distinguishable from the
// nullptr "not yet computed" state.
static void set_encodings_accepted_by_peer(grpc_call* /*call*/,
                                           grpc_mdelem mdel,
                                           uint32_t* encodings_accepted_by_peer,
                                           bool stream_encoding) {
  size_t i;
  uint32_t algorithm;
  grpc_slice_buffer accept_encoding_parts;
  grpc_slice accept_encoding_slice;
  void* accepted_user_data;

  // Fast path: a previous call already parsed this mdelem's value.
  accepted_user_data =
      grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
  if (accepted_user_data != nullptr) {
    *encodings_accepted_by_peer = static_cast<uint32_t>(
        reinterpret_cast<uintptr_t>(accepted_user_data) - 1);
    return;
  }

  *encodings_accepted_by_peer = 0;

  accept_encoding_slice = GRPC_MDVALUE(mdel);
  grpc_slice_buffer_init(&accept_encoding_parts);
  grpc_slice_split_without_space(accept_encoding_slice, ",",
                                 &accept_encoding_parts);

  // The identity encoding is always accepted.
  GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
  for (i = 0; i < accept_encoding_parts.count; i++) {
    int r;
    grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
    if (!stream_encoding) {
      r = grpc_message_compression_algorithm_parse(
          accept_encoding_entry_slice,
          reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
    } else {
      r = grpc_stream_compression_algorithm_parse(
          accept_encoding_entry_slice,
          reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
    }
    if (r) {
      GPR_BITSET(encodings_accepted_by_peer, algorithm);
    } else {
      // Unknown entries are ignored (logged at debug level only).
      char* accept_encoding_entry_str =
          grpc_slice_to_c_string(accept_encoding_entry_slice);
      gpr_log(GPR_DEBUG,
              "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
              accept_encoding_entry_str);
      gpr_free(accept_encoding_entry_str);
    }
  }

  grpc_slice_buffer_destroy_internal(&accept_encoding_parts);

  // Cache the result on the mdelem (offset by 1; see header comment).
  grpc_mdelem_set_user_data(
      mdel, destroy_encodings_accepted_by_peer,
      reinterpret_cast<void*>(
          static_cast<uintptr_t>(*encodings_accepted_by_peer) + 1));
}
879 
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)880 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
881   uint32_t encodings_accepted_by_peer;
882   encodings_accepted_by_peer = call->encodings_accepted_by_peer;
883   return encodings_accepted_by_peer;
884 }
885 
// Test-only accessor for the peer's announced incoming stream compression
// algorithm.
grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
  return call->incoming_stream_compression_algorithm;
}
890 
// Reinterprets the internal_data storage inside a grpc_metadata as the
// grpc_linked_mdelem used to link it into a metadata batch.  The size match
// is asserted in prepare_application_metadata.
static grpc_linked_mdelem* linked_from_md(grpc_metadata* md) {
  return reinterpret_cast<grpc_linked_mdelem*>(&md->internal_data);
}
894 
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)895 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
896                                   grpc_metadata* additional_metadata, int i,
897                                   int count) {
898   grpc_metadata* res =
899       i < count ? &metadata[i] : &additional_metadata[i - count];
900   GPR_ASSERT(res);
901   return res;
902 }
903 
// Validates application-supplied |metadata| (plus |additional_metadata|)
// and links it into the call's outgoing metadata batch.  Returns 1 on
// success.  On validation failure, unrefs any mdelems already interned and
// returns 0 without modifying the batch.
static int prepare_application_metadata(grpc_call* call, int count,
                                        grpc_metadata* metadata,
                                        int is_trailing,
                                        int prepend_extra_metadata,
                                        grpc_metadata* additional_metadata,
                                        int additional_metadata_count) {
  int total_count = count + additional_metadata_count;
  int i;
  grpc_metadata_batch* batch =
      &call->metadata_batch[0 /* is_receiving */][is_trailing];
  // Pass 1: validate each element and intern it as an mdelem in l->md.
  for (i = 0; i < total_count; i++) {
    grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    // linked_from_md reinterprets internal_data as a grpc_linked_mdelem.
    GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
    if (!GRPC_LOG_IF_ERROR("validate_metadata",
                           grpc_validate_header_key_is_legal(md->key))) {
      break;
    } else if (!grpc_is_binary_header_internal(md->key) &&
               !GRPC_LOG_IF_ERROR(
                   "validate_metadata",
                   grpc_validate_header_nonbin_value_is_legal(md->value))) {
      break;
    } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
      // HTTP2 hpack encoding has a maximum limit.
      break;
    }
    l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
  }
  // A validation failure broke out early: roll back mdelems interned so far.
  if (i != total_count) {
    for (int j = 0; j < i; j++) {
      grpc_metadata* md = get_md_elem(metadata, additional_metadata, j, count);
      grpc_linked_mdelem* l = linked_from_md(md);
      GRPC_MDELEM_UNREF(l->md);
    }
    return 0;
  }
  // Optionally link the call's own extra metadata ahead of the user's.
  if (prepend_extra_metadata) {
    if (call->send_extra_metadata_count == 0) {
      prepend_extra_metadata = 0;
    } else {
      for (i = 0; i < call->send_extra_metadata_count; i++) {
        GRPC_LOG_IF_ERROR("prepare_application_metadata",
                          grpc_metadata_batch_link_tail(
                              batch, &call->send_extra_metadata[i]));
      }
    }
  }
  // Pass 2: link every validated element onto the batch tail.
  for (i = 0; i < total_count; i++) {
    grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    grpc_error_handle error = grpc_metadata_batch_link_tail(batch, l);
    if (error != GRPC_ERROR_NONE) {
      // Linking failed; drop our ref on the mdelem (the error is logged).
      GRPC_MDELEM_UNREF(l->md);
    }
    GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
  }
  call->send_extra_metadata_count = 0;

  return 1;
}
964 
decode_message_compression(grpc_mdelem md)965 static grpc_message_compression_algorithm decode_message_compression(
966     grpc_mdelem md) {
967   grpc_message_compression_algorithm algorithm =
968       grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
969   if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
970     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
971     gpr_log(GPR_ERROR,
972             "Invalid incoming message compression algorithm: '%s'. "
973             "Interpreting incoming data as uncompressed.",
974             md_c_str);
975     gpr_free(md_c_str);
976     return GRPC_MESSAGE_COMPRESS_NONE;
977   }
978   return algorithm;
979 }
980 
decode_stream_compression(grpc_mdelem md)981 static grpc_stream_compression_algorithm decode_stream_compression(
982     grpc_mdelem md) {
983   grpc_stream_compression_algorithm algorithm =
984       grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
985   if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
986     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
987     gpr_log(GPR_ERROR,
988             "Invalid incoming stream compression algorithm: '%s'. Interpreting "
989             "incoming data as uncompressed.",
990             md_c_str);
991     gpr_free(md_c_str);
992     return GRPC_STREAM_COMPRESS_NONE;
993   }
994   return algorithm;
995 }
996 
// Copies the elements of metadata batch |b| into the application-visible
// metadata array for initial (is_trailing == 0) or trailing metadata.  The
// key/value slices handed out are borrowed: valid only while the call is.
static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
                                 int is_trailing) {
  if (b->list.count == 0) return;
  // Trailing metadata is not published on the server side.
  if (!call->is_client && is_trailing) return;
  // Nothing to do if the application provided no trailing-metadata array.
  if (is_trailing && call->buffered_metadata[1] == nullptr) return;
  GPR_TIMER_SCOPE("publish_app_metadata", 0);
  grpc_metadata_array* dest;
  grpc_metadata* mdusr;
  dest = call->buffered_metadata[is_trailing];
  if (dest->count + b->list.count > dest->capacity) {
    // Grow geometrically (x1.5), but always by at least enough to fit this
    // batch.
    dest->capacity =
        GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
    dest->metadata = static_cast<grpc_metadata*>(
        gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
  }
  for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
    mdusr = &dest->metadata[dest->count++];
    /* we pass back borrowed slices that are valid whilst the call is valid */
    mdusr->key = GRPC_MDKEY(l->md);
    mdusr->value = GRPC_MDVALUE(l->md);
  }
}
1019 
// Processes incoming initial metadata: extracts (and strips from the batch)
// the compression-related headers, records the peer's accepted encodings,
// then publishes the remaining metadata to the application.
static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
  if (b->idx.named.content_encoding != nullptr) {
    GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
    set_incoming_stream_compression_algorithm(
        call, decode_stream_compression(b->idx.named.content_encoding->md));
    grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
  }
  if (b->idx.named.grpc_encoding != nullptr) {
    GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
    set_incoming_message_compression_algorithm(
        call, decode_message_compression(b->idx.named.grpc_encoding->md));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
  }
  // Default when no accept-encoding header is present: only bit 0
  // (the identity encoding) is accepted.
  uint32_t message_encodings_accepted_by_peer = 1u;
  uint32_t stream_encodings_accepted_by_peer = 1u;
  if (b->idx.named.grpc_accept_encoding != nullptr) {
    GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
    set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
                                   &message_encodings_accepted_by_peer, false);
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
  }
  if (b->idx.named.accept_encoding != nullptr) {
    GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
    set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
                                   &stream_encodings_accepted_by_peer, true);
    grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
  }
  // Fold the message and stream bitsets into one combined-algorithm bitset.
  call->encodings_accepted_by_peer =
      grpc_compression_bitset_from_message_stream_compression_bitset(
          message_encodings_accepted_by_peer,
          stream_encodings_accepted_by_peer);
  publish_app_metadata(call, b, false);
}
1053 
// Processes incoming trailing metadata and derives the call's final status:
// a non-OK |batch_error| wins; otherwise the grpc-status / grpc-message
// headers are used; otherwise servers finish with OK and clients report
// GRPC_STATUS_UNKNOWN.  Takes ownership of |batch_error|.  Remaining
// metadata is published to the application.
static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
                                 grpc_error_handle batch_error) {
  grpc_call* call = static_cast<grpc_call*>(args);
  if (batch_error != GRPC_ERROR_NONE) {
    set_final_status(call, batch_error);
  } else if (b->idx.named.grpc_status != nullptr) {
    grpc_status_code status_code =
        grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
    grpc_error_handle error = GRPC_ERROR_NONE;
    if (status_code != GRPC_STATUS_OK) {
      char* peer = grpc_call_get_peer(call);
      error = grpc_error_set_int(
          GRPC_ERROR_CREATE_FROM_COPIED_STRING(
              absl::StrCat("Error received from peer ", peer).c_str()),
          GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
      gpr_free(peer);
    }
    if (b->idx.named.grpc_message != nullptr) {
      // Attach the peer-provided message and strip it from the batch.
      error = grpc_error_set_str(
          error, GRPC_ERROR_STR_GRPC_MESSAGE,
          grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
      grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
    } else if (error != GRPC_ERROR_NONE) {
      error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                 grpc_empty_slice());
    }
    // Hand set_final_status its own ref; ours is released below.
    set_final_status(call, GRPC_ERROR_REF(error));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
    GRPC_ERROR_UNREF(error);
  } else if (!call->is_client) {
    set_final_status(call, GRPC_ERROR_NONE);
  } else {
    // Client received trailing metadata with no grpc-status: report UNKNOWN.
    gpr_log(GPR_DEBUG,
            "Received trailing metadata with no error and no status");
    set_final_status(
        call, grpc_error_set_int(
                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
                  GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
  }
  publish_app_metadata(call, b, true);
}
1095 
// Returns the arena from which this call's allocations are made.
grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1097 
// Returns the call's filter stack (see CALL_STACK_FROM_CALL).
grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
  return CALL_STACK_FROM_CALL(call);
}
1101 
1102 /*******************************************************************************
1103  * BATCH API IMPLEMENTATION
1104  */
1105 
are_write_flags_valid(uint32_t flags)1106 static bool are_write_flags_valid(uint32_t flags) {
1107   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1108   const uint32_t allowed_write_positions =
1109       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1110   const uint32_t invalid_positions = ~allowed_write_positions;
1111   return !(flags & invalid_positions);
1112 }
1113 
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1114 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1115   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1116   uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1117   if (!is_client) {
1118     invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1119   }
1120   return !(flags & invalid_positions);
1121 }
1122 
// Maps an op type to its slot index in call->active_batches.  Op types that
// share a slot (close/status, recv-close/recv-status) never appear in the
// same direction of the same call, so the sharing is safe.
static size_t batch_slot_for_op(grpc_op_type type) {
  switch (type) {
    case GRPC_OP_SEND_INITIAL_METADATA:
      return 0;
    case GRPC_OP_SEND_MESSAGE:
      return 1;
    case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
    case GRPC_OP_SEND_STATUS_FROM_SERVER:
      return 2;
    case GRPC_OP_RECV_INITIAL_METADATA:
      return 3;
    case GRPC_OP_RECV_MESSAGE:
      return 4;
    case GRPC_OP_RECV_CLOSE_ON_SERVER:
    case GRPC_OP_RECV_STATUS_ON_CLIENT:
      return 5;
  }
  GPR_UNREACHABLE_CODE(return 123456789);
}
1142 
// Obtains the batch_control for the slot determined by ops[0].  Returns
// nullptr if that slot's previous batch is still in flight (bctl->call !=
// nullptr).  A completed control is destroyed and re-initialized in place;
// otherwise a fresh one is allocated from the call's arena.
static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
                                                      const grpc_op* ops) {
  size_t slot_idx = batch_slot_for_op(ops[0].op);
  batch_control** pslot = &call->active_batches[slot_idx];
  batch_control* bctl;
  if (*pslot != nullptr) {
    bctl = *pslot;
    if (bctl->call != nullptr) {
      // The previous batch in this slot has not completed yet.
      return nullptr;
    }
    // Reuse the arena-allocated storage: destroy and re-zero in place.
    bctl->~batch_control();
    bctl->op = {};
  } else {
    bctl = call->arena->New<batch_control>();
    *pslot = bctl;
  }
  bctl->call = call;
  bctl->op.payload = &call->stream_op_payload;
  return bctl;
}
1163 
// Completion-queue done callback for a finished batch: clearing bctl->call
// marks the batch_control slot reusable, then the batch's "completion" ref
// on the call is dropped.
static void finish_batch_completion(void* user_data,
                                    grpc_cq_completion* /*storage*/) {
  batch_control* bctl = static_cast<batch_control*>(user_data);
  grpc_call* call = bctl->call;
  bctl->call = nullptr;
  GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
1171 
// Releases the error accumulated for this batch and resets the atomic slot
// to GRPC_ERROR_NONE.
static void reset_batch_errors(batch_control* bctl) {
  GRPC_ERROR_UNREF(reinterpret_cast<grpc_error_handle>(
      gpr_atm_acq_load(&bctl->batch_error)));
  gpr_atm_rel_store(&bctl->batch_error,
                    reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
}
1178 
// Finalizes a batch once all of its steps have completed: destroys sent
// metadata batches, augments send_message errors when the stream closed
// mid-send, propagates cancellation to inherited child calls on
// recv_trailing_metadata, discards any partial message on error, and finally
// notifies the application via closure or completion queue (which consumes
// |error|).
static void post_batch_completion(batch_control* bctl) {
  grpc_call* next_child_call;
  grpc_call* call = bctl->call;
  grpc_error_handle error = GRPC_ERROR_REF(reinterpret_cast<grpc_error_handle>(
      gpr_atm_acq_load(&bctl->batch_error)));

  if (bctl->op.send_initial_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
  }
  if (bctl->op.send_message) {
    if (bctl->op.payload->send_message.stream_write_closed &&
        error == GRPC_ERROR_NONE) {
      error = grpc_error_add_child(
          error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                     "Attempt to send message after stream was closed."));
    }
    call->sending_message = false;
  }
  if (bctl->op.send_trailing_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
  }
  if (bctl->op.recv_trailing_metadata) {
    /* propagate cancellation to any interested children */
    gpr_atm_rel_store(&call->received_final_op_atm, 1);
    parent_call* pc = get_parent_call(call);
    if (pc != nullptr) {
      grpc_call* child;
      gpr_mu_lock(&pc->child_list_mu);
      child = pc->first_child;
      if (child != nullptr) {
        // Walk the circular sibling list exactly once.
        do {
          next_child_call = child->child->sibling_next;
          if (child->cancellation_is_inherited) {
            GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
            cancel_with_error(child, GRPC_ERROR_CANCELLED);
            GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
          }
          child = next_child_call;
        } while (child != pc->first_child);
      }
      gpr_mu_unlock(&pc->child_list_mu);
    }
    // recv_trailing_metadata batches never report an error to the
    // application here; the accumulated error is dropped.
    GRPC_ERROR_UNREF(error);
    error = GRPC_ERROR_NONE;
  }
  if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
      *call->receiving_buffer != nullptr) {
    // Do not surface a partially-received message on a failed batch.
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
  }
  reset_batch_errors(bctl);

  if (bctl->completion_data.notify_tag.is_closure) {
    /* unrefs error */
    bctl->call = nullptr;
    grpc_core::Closure::Run(
        DEBUG_LOCATION,
        static_cast<grpc_closure*>(bctl->completion_data.notify_tag.tag),
        error);
    GRPC_CALL_INTERNAL_UNREF(call, "completion");
  } else {
    /* unrefs error */
    grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
                   finish_batch_completion, bctl,
                   &bctl->completion_data.cq_completion);
  }
}
1248 
// Marks one step of the batch as done; posts the batch completion when the
// last outstanding step finishes.
static void finish_batch_step(batch_control* bctl) {
  if (GPR_UNLIKELY(bctl->completed_batch_step())) {
    post_batch_completion(bctl);
  }
}
1254 
// Pulls slices from the receiving stream into the application's byte buffer
// until the full message has been assembled.  If data is not yet available,
// Next() retains receiving_slice_ready and this function returns; the loop
// resumes when that closure fires.
static void continue_receiving_slices(batch_control* bctl) {
  grpc_error_handle error;
  grpc_call* call = bctl->call;
  for (;;) {
    size_t remaining = call->receiving_stream->length() -
                       (*call->receiving_buffer)->data.raw.slice_buffer.length;
    if (remaining == 0) {
      // The whole message has been received.
      call->receiving_message = false;
      call->receiving_stream.reset();
      finish_batch_step(bctl);
      return;
    }
    if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
      error = call->receiving_stream->Pull(&call->receiving_slice);
      if (error == GRPC_ERROR_NONE) {
        grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                              call->receiving_slice);
      } else {
        // Pull failed: drop the partially-received buffer and finish the
        // step; Pull() errors are owned by us and must be unrefed.
        call->receiving_stream.reset();
        grpc_byte_buffer_destroy(*call->receiving_buffer);
        *call->receiving_buffer = nullptr;
        call->receiving_message = false;
        finish_batch_step(bctl);
        GRPC_ERROR_UNREF(error);
        return;
      }
    } else {
      // Data not yet available; receiving_slice_ready will be invoked later.
      return;
    }
  }
}
1286 
// Closure invoked when the transport has more message data (or has failed).
// On success, appends the pulled slice and continues the receive loop.  On
// any error, tears down reception and finishes the batch step.  Ownership
// note: errors returned by Pull() must be unrefed here, while the closure's
// own |error| argument is not owned by us.
static void receiving_slice_ready(void* bctlp, grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  bool release_error = false;

  if (error == GRPC_ERROR_NONE) {
    grpc_slice slice;
    error = call->receiving_stream->Pull(&slice);
    if (error == GRPC_ERROR_NONE) {
      grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                            slice);
      continue_receiving_slices(bctl);
    } else {
      /* Error returned by ByteStream::Pull() needs to be released manually */
      release_error = true;
    }
  }

  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
      GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
    }
    // Drop the partially-received buffer and finish the step.
    call->receiving_stream.reset();
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
    call->receiving_message = false;
    finish_batch_step(bctl);
    if (release_error) {
      GRPC_ERROR_UNREF(error);
    }
  }
}
1319 
// Starts consuming the received message once incoming metadata has been
// handled.  A null receiving_stream means there is no message to read;
// otherwise a raw or compressed byte buffer is allocated according to the
// stream's compression flags and the slice-pulling loop begins.
static void process_data_after_md(batch_control* bctl) {
  grpc_call* call = bctl->call;
  if (call->receiving_stream == nullptr) {
    *call->receiving_buffer = nullptr;
    call->receiving_message = false;
    finish_batch_step(bctl);
  } else {
    call->test_only_last_message_flags = call->receiving_stream->flags();
    if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
        (call->incoming_message_compression_algorithm >
         GRPC_MESSAGE_COMPRESS_NONE)) {
      // Message arrived compressed: surface it in a compressed byte buffer
      // tagged with the combined algorithm.
      grpc_compression_algorithm algo;
      GPR_ASSERT(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &algo, call->incoming_message_compression_algorithm,
              (grpc_stream_compression_algorithm)0));
      *call->receiving_buffer =
          grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
    } else {
      *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
    }
    GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
                      grpc_schedule_on_exec_ctx);
    continue_receiving_slices(bctl);
  }
}
1346 
// Invoked when the recv_message op has a result.  On error, records it as
// the batch error (first error wins) and cancels the call.  The message is
// processed immediately unless recv_state is still RECV_NONE, in which case
// the bctl is parked in recv_state for receiving_initial_metadata_ready to
// pick up later.
static void receiving_stream_ready(void* bctlp, grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  if (error != GRPC_ERROR_NONE) {
    call->receiving_stream.reset();
    // Record the first error observed for this batch.
    if (reinterpret_cast<grpc_error_handle>(
            gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  /* If recv_state is RECV_NONE, we will save the batch_control
   * object with rel_cas, and will not use it after the cas. Its corresponding
   * acq_load is in receiving_initial_metadata_ready() */
  if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
      !gpr_atm_rel_cas(&call->recv_state, RECV_NONE,
                       reinterpret_cast<gpr_atm>(bctlp))) {
    process_data_after_md(bctl);
  }
}
1368 
// The recv_message_ready callback used when sending a batch containing
// a recv_message op down the filter stack.  Yields the call combiner
// before processing the received message.
static void receiving_stream_ready_in_call_combiner(void* bctlp,
                                                    grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  // Release the call combiner before doing the actual processing.
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
  receiving_stream_ready(bctlp, error);
}
1379 
// Cancels the call with INTERNAL: an incoming stream must not use stream
// compression and message compression simultaneously.
static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call* call) {
  std::string error_msg = absl::StrFormat(
      "Incoming stream has both stream compression (%d) and message "
      "compression (%d).",
      call->incoming_stream_compression_algorithm,
      call->incoming_message_compression_algorithm);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
}
1390 
1391 static void GPR_ATTRIBUTE_NOINLINE
handle_error_parsing_compression_algorithm(grpc_call * call)1392 handle_error_parsing_compression_algorithm(grpc_call* call) {
1393   std::string error_msg = absl::StrFormat(
1394       "Error in incoming message compression (%d) or stream "
1395       "compression (%d).",
1396       call->incoming_stream_compression_algorithm,
1397       call->incoming_message_compression_algorithm);
1398   cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1399 }
1400 
// Cancels the call with UNIMPLEMENTED: the combined compression algorithm
// value is out of range.
static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  std::string error_msg = absl::StrFormat(
      "Invalid compression algorithm value '%d'.", compression_algorithm);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
}
1408 
// Cancels the call with UNIMPLEMENTED: the incoming data uses a compression
// algorithm that this channel's configuration has disabled.
static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  const char* algo_name = nullptr;
  grpc_compression_algorithm_name(compression_algorithm, &algo_name);
  std::string error_msg =
      absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
}
1418 
// Logs (but does not cancel) when the incoming data uses an algorithm that
// is absent from the accepted-encodings bitset.  Unlike the other
// compression-error handlers above, this one only logs.
static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  const char* algo_name = nullptr;
  grpc_compression_algorithm_name(compression_algorithm, &algo_name);
  gpr_log(GPR_ERROR,
          "Compression algorithm ('%s') not present in the bitset of "
          "accepted encodings ('0x%x')",
          algo_name, call->encodings_accepted_by_peer);
}
1428 
/* Validates the compression configuration carried by the just-filtered
 * incoming initial metadata against the channel's compression options.
 * Inconsistent configurations cancel the call via one of the
 * handle_* helpers above; the accepted-encodings mismatch is log-only. */
static void validate_filtered_metadata(batch_control* bctl) {
  grpc_compression_algorithm compression_algorithm;
  grpc_call* call = bctl->call;
  /* Stream-level and message-level compression must not both be set. */
  if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
                       GRPC_STREAM_COMPRESS_NONE &&
                   call->incoming_message_compression_algorithm !=
                       GRPC_MESSAGE_COMPRESS_NONE)) {
    handle_both_stream_and_msg_compression_set(call);
  } else if (
      /* Fold the (message, stream) pair into one overall algorithm;
       * a zero return indicates the pair could not be parsed. */
      GPR_UNLIKELY(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &compression_algorithm,
              call->incoming_message_compression_algorithm,
              call->incoming_stream_compression_algorithm) == 0)) {
    handle_error_parsing_compression_algorithm(call);
  } else {
    const grpc_compression_options compression_options =
        grpc_channel_compression_options(call->channel);
    /* Reject values outside the known algorithm range. */
    if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
      handle_invalid_compression(call, compression_algorithm);
    } else if (GPR_UNLIKELY(
                   grpc_compression_options_is_algorithm_enabled_internal(
                       &compression_options, compression_algorithm) == 0)) {
      /* check if algorithm is supported by current channel config */
      handle_compression_algorithm_disabled(call, compression_algorithm);
    }
    /* GRPC_COMPRESS_NONE is always set. */
    GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
    /* Algorithm not advertised by the peer: only log, and only when the
     * compression trace flag is enabled. */
    if (GPR_UNLIKELY(!GPR_BITGET(call->encodings_accepted_by_peer,
                                 compression_algorithm))) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
        handle_compression_algorithm_not_accepted(call, compression_algorithm);
      }
    }
  }
}
1465 
/* Closure run (under the call combiner) when initial metadata arrives from
 * the transport.  On success it filters and validates the metadata and, on
 * the server side, adopts the peer's deadline; on failure it records the
 * first batch error and cancels the call.  It then resolves the ordering
 * race with receiving_stream_ready() via the call->recv_state atomic, and
 * finally accounts for one completion step of the batch. */
static void receiving_initial_metadata_ready(void* bctlp,
                                             grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;

  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");

  if (error == GRPC_ERROR_NONE) {
    grpc_metadata_batch* md =
        &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
    recv_initial_filter(call, md);

    /* TODO(ctiller): this could be moved into recv_initial_filter now */
    GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
    validate_filtered_metadata(bctl);

    /* Servers pick up the client-provided deadline from the metadata. */
    if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
      call->send_deadline = md->deadline;
    }
  } else {
    /* Keep only the first error observed for this batch. */
    if (reinterpret_cast<grpc_error_handle>(
            gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }

  grpc_closure* saved_rsr_closure = nullptr;
  while (true) {
    gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
    /* Should only receive initial metadata once */
    GPR_ASSERT(rsr_bctlp != 1);
    if (rsr_bctlp == 0) {
      /* We haven't seen initial metadata and messages before, thus initial
       * metadata is received first.
       * no_barrier_cas is used, as this function won't access the batch_control
       * object saved by receiving_stream_ready() if the initial metadata is
       * received first. */
      if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
                                 RECV_INITIAL_METADATA_FIRST)) {
        break;
      }
    } else {
      /* Already received messages */
      saved_rsr_closure =
          GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
                              grpc_schedule_on_exec_ctx);
      /* No need to modify recv_state */
      break;
    }
  }
  /* If a message arrived before this metadata, run its deferred
   * receiving_stream_ready closure now. */
  if (saved_rsr_closure != nullptr) {
    grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
                            GRPC_ERROR_REF(error));
  }

  finish_batch_step(bctl);
}
1525 
/* Closure run (under the call combiner) when trailing metadata arrives.
 * Passes the trailing metadata batch, together with a fresh reference to
 * \a error, through recv_trailing_filter, then accounts for one completion
 * step of the batch. */
static void receiving_trailing_metadata_ready(void* bctlp,
                                              grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
  grpc_metadata_batch* md =
      &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
  recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
  finish_batch_step(bctl);
}
1536 
/* Closure run (under the call combiner) when the transport completes the
 * send side of a batch.  Records the first error observed for the batch,
 * cancels the call if this completion failed, and accounts for one
 * completion step. */
static void finish_batch(void* bctlp, grpc_error_handle error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
  /* Only the first error is retained in bctl->batch_error. */
  if (reinterpret_cast<grpc_error_handle>(
          gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
    gpr_atm_rel_store(&bctl->batch_error,
                      reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
  }
  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  finish_batch_step(bctl);
}
1551 
/* Completion-queue done callback for a zero-op batch: the grpc_cq_completion
 * was heap-allocated solely to signal the tag, so just free it. */
static void free_no_op_completion(void* /*p*/, grpc_cq_completion* completion) {
  gpr_free(completion);
}
1555 
/* Translates an application-supplied array of \a nops grpc_ops into a single
 * transport stream op batch and starts it on \a call.
 *
 * \a notify_tag is either a completion-queue tag (is_notify_tag_closure == 0)
 * or a grpc_closure* (is_notify_tag_closure != 0) signalled when the batch
 * completes.  Returns GRPC_CALL_OK on success or a specific grpc_call_error;
 * on error, per-call state mutated while validating earlier ops is rolled
 * back under done_with_error. */
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
                                        size_t nops, void* notify_tag,
                                        int is_notify_tag_closure) {
  GPR_TIMER_SCOPE("call_start_batch", 0);

  size_t i;
  const grpc_op* op;
  batch_control* bctl;
  bool has_send_ops = false;
  int num_recv_ops = 0;
  grpc_call_error error = GRPC_CALL_OK;
  grpc_transport_stream_op_batch* stream_op;
  grpc_transport_stream_op_batch_payload* stream_op_payload;

  GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);

  /* An empty batch succeeds immediately, but must still signal its
   * tag/closure so the caller observes completion. */
  if (nops == 0) {
    if (!is_notify_tag_closure) {
      GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
      grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
                     free_no_op_completion, nullptr,
                     static_cast<grpc_cq_completion*>(
                         gpr_malloc(sizeof(grpc_cq_completion))));
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION,
                              static_cast<grpc_closure*>(notify_tag),
                              GRPC_ERROR_NONE);
    }
    error = GRPC_CALL_OK;
    goto done;
  }

  bctl = reuse_or_allocate_batch_control(call, ops);
  if (bctl == nullptr) {
    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
  }
  bctl->completion_data.notify_tag.tag = notify_tag;
  bctl->completion_data.notify_tag.is_closure =
      static_cast<uint8_t>(is_notify_tag_closure != 0);

  stream_op = &bctl->op;
  stream_op_payload = &call->stream_op_payload;

  /* rewrite batch ops into a transport op */
  for (i = 0; i < nops; i++) {
    op = &ops[i];
    /* The reserved pointer must be unset on every op. */
    if (op->reserved != nullptr) {
      error = GRPC_CALL_ERROR;
      goto done_with_error;
    }
    switch (op->op) {
      case GRPC_OP_SEND_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->sent_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        // TODO(juanlishen): If the user has already specified a compression
        // algorithm by setting the initial metadata with key of
        // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
        // with the compression algorithm mapped from compression level.
        /* process compression level */
        grpc_metadata& compression_md = call->compression_md;
        compression_md.key = grpc_empty_slice();
        compression_md.value = grpc_empty_slice();
        compression_md.flags = 0;
        size_t additional_metadata_count = 0;
        grpc_compression_level effective_compression_level =
            GRPC_COMPRESS_LEVEL_NONE;
        bool level_set = false;
        /* Prefer the per-op compression level; otherwise fall back to the
         * channel's configured default, if any. */
        if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
          effective_compression_level =
              op->data.send_initial_metadata.maybe_compression_level.level;
          level_set = true;
        } else {
          const grpc_compression_options copts =
              grpc_channel_compression_options(call->channel);
          if (copts.default_level.is_set) {
            level_set = true;
            effective_compression_level = copts.default_level.level;
          }
        }
        // Currently, only server side supports compression level setting.
        if (level_set && !call->is_client) {
          const grpc_compression_algorithm calgo =
              compression_algorithm_for_level_locked(
                  call, effective_compression_level);
          // The following metadata will be checked and removed by the message
          // compression filter. It will be used as the call's compression
          // algorithm.
          compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
          compression_md.value = grpc_compression_algorithm_slice(calgo);
          additional_metadata_count++;
        }
        /* Total metadata count must fit in an int (see the static_casts
         * below). */
        if (op->data.send_initial_metadata.count + additional_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_initial_metadata = true;
        call->sent_initial_metadata = true;
        if (!prepare_application_metadata(
                call, static_cast<int>(op->data.send_initial_metadata.count),
                op->data.send_initial_metadata.metadata, 0, call->is_client,
                &compression_md, static_cast<int>(additional_metadata_count))) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        /* TODO(ctiller): just make these the same variable? */
        if (call->is_client) {
          call->metadata_batch[0][0].deadline = call->send_deadline;
        }
        stream_op_payload->send_initial_metadata.send_initial_metadata =
            &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
            op->flags;
        if (call->is_client) {
          stream_op_payload->send_initial_metadata.peer_string =
              &call->peer_string;
        }
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_MESSAGE: {
        if (!are_write_flags_valid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (op->data.send_message.send_message == nullptr) {
          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
          goto done_with_error;
        }
        /* Only one message may be in flight at a time. */
        if (call->sending_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        uint32_t flags = op->flags;
        /* If the outgoing buffer is already compressed, mark it as so in the
           flags. These will be picked up by the compression filter and further
           (wasteful) attempts at compression skipped. */
        if (op->data.send_message.send_message->data.raw.compression >
            GRPC_COMPRESS_NONE) {
          flags |= GRPC_WRITE_INTERNAL_COMPRESS;
        }
        stream_op->send_message = true;
        call->sending_message = true;
        call->sending_stream.Init(
            &op->data.send_message.send_message->data.raw.slice_buffer, flags);
        stream_op_payload->send_message.send_message.reset(
            call->sending_stream.get());
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        /* Client half-close is expressed as empty trailing metadata. */
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_STATUS_FROM_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        if (op->data.send_status_from_server.trailing_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        /* Slot 0 carries the status code; slot 1 (optional) the details. */
        GPR_ASSERT(call->send_extra_metadata_count == 0);
        call->send_extra_metadata_count = 1;
        call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
            op->data.send_status_from_server.status);
        grpc_error_handle status_error =
            op->data.send_status_from_server.status == GRPC_STATUS_OK
                ? GRPC_ERROR_NONE
                : grpc_error_set_int(
                      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                          "Server returned error"),
                      GRPC_ERROR_INT_GRPC_STATUS,
                      static_cast<intptr_t>(
                          op->data.send_status_from_server.status));
        if (op->data.send_status_from_server.status_details != nullptr) {
          call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
              GRPC_MDSTR_GRPC_MESSAGE,
              grpc_slice_ref_internal(
                  *op->data.send_status_from_server.status_details));
          call->send_extra_metadata_count++;
          /* Attach the detail message to the error object as well. */
          if (status_error != GRPC_ERROR_NONE) {
            char* msg = grpc_slice_to_c_string(
                GRPC_MDVALUE(call->send_extra_metadata[1].md));
            status_error =
                grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                   grpc_slice_from_copied_string(msg));
            gpr_free(msg);
          }
        }

        gpr_atm_rel_store(&call->status_error,
                          reinterpret_cast<gpr_atm>(status_error));
        if (!prepare_application_metadata(
                call,
                static_cast<int>(
                    op->data.send_status_from_server.trailing_metadata_count),
                op->data.send_status_from_server.trailing_metadata, 1, 1,
                nullptr, 0)) {
          /* Metadata rejected: drop the status elems created above before
           * bailing out. */
          for (int n = 0; n < call->send_extra_metadata_count; n++) {
            GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
          }
          call->send_extra_metadata_count = 0;
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->send_trailing_metadata.sent =
            &call->sent_server_trailing_metadata;
        has_send_ops = true;
        break;
      }
      case GRPC_OP_RECV_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->received_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->received_initial_metadata = true;
        call->buffered_metadata[0] =
            op->data.recv_initial_metadata.recv_initial_metadata;
        GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
                          receiving_initial_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op->recv_initial_metadata = true;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata =
            &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
            &call->receiving_initial_metadata_ready;
        if (!call->is_client) {
          stream_op_payload->recv_initial_metadata.peer_string =
              &call->peer_string;
        }
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_MESSAGE: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->receiving_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->receiving_message = true;
        stream_op->recv_message = true;
        call->receiving_buffer = op->data.recv_message.recv_message;
        stream_op_payload->recv_message.recv_message = &call->receiving_stream;
        GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
                          receiving_stream_ready_in_call_combiner, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_message.recv_message_ready =
            &call->receiving_stream_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_STATUS_ON_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->buffered_metadata[1] =
            op->data.recv_status_on_client.trailing_metadata;
        /* Stash the caller's out-pointers for delivery at call end. */
        call->final_op.client.status = op->data.recv_status_on_client.status;
        call->final_op.client.status_details =
            op->data.recv_status_on_client.status_details;
        call->final_op.client.error_string =
            op->data.recv_status_on_client.error_string;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_CLOSE_ON_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->final_op.server.cancelled =
            op->data.recv_close_on_server.cancelled;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
    }
  }

  GRPC_CALL_INTERNAL_REF(call, "completion");
  if (!is_notify_tag_closure) {
    GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
  }
  /* One completion step covers all send ops together; each recv op adds
   * its own step. */
  bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);

  if (has_send_ops) {
    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
                      grpc_schedule_on_exec_ctx);
    stream_op->on_complete = &bctl->finish_batch;
  }

  gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
  execute_batch(call, stream_op, &bctl->start_batch);

done:
  return error;

done_with_error:
  /* reverse any mutations that occurred */
  if (stream_op->send_initial_metadata) {
    call->sent_initial_metadata = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
  }
  if (stream_op->send_message) {
    call->sending_message = false;
    call->sending_stream->Orphan();
  }
  if (stream_op->send_trailing_metadata) {
    call->sent_final_op = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
  }
  if (stream_op->recv_initial_metadata) {
    call->received_initial_metadata = false;
  }
  if (stream_op->recv_message) {
    call->receiving_message = false;
  }
  if (stream_op->recv_trailing_metadata) {
    call->requested_final_op = false;
  }
  goto done;
}
1967 
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1968 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1969                                       size_t nops, void* tag, void* reserved) {
1970   grpc_call_error err;
1971 
1972   GRPC_API_TRACE(
1973       "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1974       "reserved=%p)",
1975       5, (call, ops, (unsigned long)nops, tag, reserved));
1976 
1977   if (reserved != nullptr) {
1978     err = GRPC_CALL_ERROR;
1979   } else {
1980     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1981     grpc_core::ExecCtx exec_ctx;
1982     err = call_start_batch(call, ops, nops, tag, 0);
1983   }
1984 
1985   return err;
1986 }
1987 
/* Internal variant of grpc_call_start_batch: completion is signalled by
 * running \a closure rather than by posting a completion-queue tag. */
grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
                                                  const grpc_op* ops,
                                                  size_t nops,
                                                  grpc_closure* closure) {
  return call_start_batch(call, ops, nops, closure, 1);
}
1994 
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1995 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1996                            void* value, void (*destroy)(void* value)) {
1997   if (call->context[elem].destroy) {
1998     call->context[elem].destroy(call->context[elem].value);
1999   }
2000   call->context[elem].value = value;
2001   call->context[elem].destroy = destroy;
2002 }
2003 
/* Returns the value stored in the call's context slot \a elem (nullptr-able;
 * whatever was last set via grpc_call_context_set). */
void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
  return call->context[elem].value;
}
2007 
/* Returns nonzero iff \a call is a client-side call. */
uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
2009 
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)2010 grpc_compression_algorithm grpc_call_compression_for_level(
2011     grpc_call* call, grpc_compression_level level) {
2012   grpc_compression_algorithm algo =
2013       compression_algorithm_for_level_locked(call, level);
2014   return algo;
2015 }
2016 
grpc_call_error_to_string(grpc_call_error error)2017 const char* grpc_call_error_to_string(grpc_call_error error) {
2018   switch (error) {
2019     case GRPC_CALL_ERROR:
2020       return "GRPC_CALL_ERROR";
2021     case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2022       return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2023     case GRPC_CALL_ERROR_ALREADY_FINISHED:
2024       return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2025     case GRPC_CALL_ERROR_ALREADY_INVOKED:
2026       return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2027     case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2028       return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2029     case GRPC_CALL_ERROR_INVALID_FLAGS:
2030       return "GRPC_CALL_ERROR_INVALID_FLAGS";
2031     case GRPC_CALL_ERROR_INVALID_MESSAGE:
2032       return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2033     case GRPC_CALL_ERROR_INVALID_METADATA:
2034       return "GRPC_CALL_ERROR_INVALID_METADATA";
2035     case GRPC_CALL_ERROR_NOT_INVOKED:
2036       return "GRPC_CALL_ERROR_NOT_INVOKED";
2037     case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2038       return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2039     case GRPC_CALL_ERROR_NOT_ON_SERVER:
2040       return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2041     case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2042       return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2043     case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2044       return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2045     case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2046       return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2047     case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2048       return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2049     case GRPC_CALL_OK:
2050       return "GRPC_CALL_OK";
2051   }
2052   GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
2053 }
2054