• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 
27 #include <string>
28 
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31 
32 #include <grpc/compression.h>
33 #include <grpc/grpc.h>
34 #include <grpc/slice.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38 
39 #include "src/core/lib/channel/channel_stack.h"
40 #include "src/core/lib/compression/algorithm_metadata.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/gpr/alloc.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gpr/time_precise.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/arena.h"
47 #include "src/core/lib/gprpp/manual_constructor.h"
48 #include "src/core/lib/gprpp/ref_counted.h"
49 #include "src/core/lib/iomgr/timer.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/lib/slice/slice_utils.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/call_test_only.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/server.h"
59 #include "src/core/lib/surface/validate_metadata.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/static_metadata.h"
63 #include "src/core/lib/transport/status_metadata.h"
64 #include "src/core/lib/transport/transport.h"
65 
/** The maximum number of concurrent batches possible.
    Based upon the maximum number of individually queueable ops in the batch
    api:
      - initial metadata send
      - message send
      - status/close send (depending on client/server)
      - initial metadata recv
      - message recv
      - status/close recv (depending on client/server) */
#define MAX_CONCURRENT_BATCHES 6

/* Upper bound on the surface-provided extra metadata elements stored in
   grpc_call::send_extra_metadata (asserted in grpc_call_create). */
#define MAX_SEND_EXTRA_METADATA_COUNT 3

// Used to create arena for the first call.
#define ESTIMATED_MDELEM_COUNT 16
81 
/* Bookkeeping for one in-flight batch of ops on a call. */
struct batch_control {
  batch_control() = default;

  grpc_call* call = nullptr;
  grpc_transport_stream_op_batch op;
  /* Share memory for cq_completion and notify_tag as they are never needed
     simultaneously. Each byte used in this data structure counts as six bytes
     per call, so any savings we can make are worthwhile.

     We use notify_tag to determine whether or not to send notification to the
     completion queue. Once we've made that determination, we can reuse the
     memory for cq_completion. */
  union {
    grpc_cq_completion cq_completion;
    struct {
      /* Any given op indicates completion by either (a) calling a closure or
         (b) sending a notification on the call's completion queue.  If
         \a is_closure is true, \a tag indicates a closure to be invoked;
         otherwise, \a tag indicates the tag to be used in the notification to
         be sent to the completion queue. */
      void* tag;
      bool is_closure;
    } notify_tag;
  } completion_data;
  grpc_closure start_batch;
  grpc_closure finish_batch;
  // Count of outstanding sub-steps of this batch.
  grpc_core::Atomic<intptr_t> steps_to_complete;
  // First error observed for the batch, stored as an atomic word.
  gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
  // Set the number of sub-steps that must finish before the batch completes.
  void set_num_steps_to_complete(uintptr_t steps) {
    steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
  }
  // Returns true iff the caller just completed the final outstanding step.
  bool completed_batch_step() {
    return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
  }
};
117 
/* Lazily-created record attached to a call that has children: a mutex and
   the head of the circular sibling list of its child calls (see
   get_or_create_parent_call and child_call::sibling_*). */
struct parent_call {
  parent_call() { gpr_mu_init(&child_list_mu); }
  ~parent_call() { gpr_mu_destroy(&child_list_mu); }

  gpr_mu child_list_mu;
  grpc_call* first_child = nullptr;
};
125 
126 struct child_call {
child_callchild_call127   child_call(grpc_call* parent) : parent(parent) {}
128   grpc_call* parent;
129   /** siblings: children of the same parent form a list, and this list is
130      protected under
131       parent->mu */
132   grpc_call* sibling_next = nullptr;
133   grpc_call* sibling_prev = nullptr;
134 };
135 
/* Sentinel values for grpc_call::recv_state; any other value stored there is
   a batch_control* (see the state diagram on the recv_state field). */
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)

/* The core call object.  Lives at the head of a single arena allocation that
   also contains the call's filter stack (see CALL_STACK_FROM_CALL) and, when
   the call has a parent, a trailing child_call record. */
struct grpc_call {
  grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
      : arena(arena),
        cq(args.cq),
        channel(args.channel),
        is_client(args.server_transport_data == nullptr),
        stream_op_payload(context) {
    // No metadata batch carries a real deadline until one is set.
    for (int i = 0; i < 2; i++) {
      for (int j = 0; j < 2; j++) {
        metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
      }
    }
  }

  ~grpc_call() {
    // error_string is heap-allocated (filled in by grpc_error_get_status
    // during destroy_call); release it here.
    gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
  }

  // External (application-facing) refcount: grpc_call_ref / grpc_call_unref.
  grpc_core::RefCount ext_ref;
  grpc_core::Arena* arena;
  grpc_core::CallCombiner call_combiner;
  grpc_completion_queue* cq;
  grpc_polling_entity pollent;
  grpc_channel* channel;
  gpr_cycle_counter start_time = gpr_get_cycle_counter();
  /* parent_call*, lazily created; see get_or_create_parent_call */
  gpr_atm parent_call_atm = 0;
  child_call* child = nullptr;

  /* client or server call */
  bool is_client;
  /** has grpc_call_unref been called */
  bool destroy_called = false;
  /** flag indicating that cancellation is inherited */
  bool cancellation_is_inherited = false;
  /** which ops are in-flight */
  bool sent_initial_metadata = false;
  bool sending_message = false;
  bool sent_final_op = false;
  bool received_initial_metadata = false;
  bool receiving_message = false;
  bool requested_final_op = false;
  gpr_atm any_ops_sent_atm = 0;
  gpr_atm received_final_op_atm = 0;

  batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
  grpc_transport_stream_op_batch_payload stream_op_payload;

  /* first idx: is_receiving, second idx: is_trailing */
  grpc_metadata_batch metadata_batch[2][2] = {};

  /* Buffered read metadata waiting to be returned to the application.
     Element 0 is initial metadata, element 1 is trailing metadata. */
  grpc_metadata_array* buffered_metadata[2] = {};

  grpc_metadata compression_md;

  // A char* indicating the peer name.
  gpr_atm peer_string = 0;

  /* Call data useful for reporting. Only valid after the call has
   * completed */
  grpc_call_final_info final_info;

  /* Compression algorithm for *incoming* data */
  grpc_message_compression_algorithm incoming_message_compression_algorithm =
      GRPC_MESSAGE_COMPRESS_NONE;
  /* Stream compression algorithm for *incoming* data */
  grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
      GRPC_STREAM_COMPRESS_NONE;
  /* Supported encodings (compression algorithms), a bitset.
   * Always support no compression. */
  uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
  /* Supported stream encodings (stream compression algorithms), a bitset */
  uint32_t stream_encodings_accepted_by_peer = 0;

  /* Contexts for various subsystems (security, tracing, ...). */
  grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};

  /* for the client, extra metadata is initial metadata; for the
     server, it's trailing metadata */
  grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
  int send_extra_metadata_count;
  grpc_millis send_deadline;

  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;

  grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
  grpc_byte_buffer** receiving_buffer = nullptr;
  grpc_slice receiving_slice = grpc_empty_slice();
  grpc_closure receiving_slice_ready;
  grpc_closure receiving_stream_ready;
  grpc_closure receiving_initial_metadata_ready;
  grpc_closure receiving_trailing_metadata_ready;
  uint32_t test_only_last_message_flags = 0;
  // Status about operation of call
  bool sent_server_trailing_metadata = false;
  gpr_atm cancelled_with_error = 0;

  grpc_closure release_call;

  /* Final-op out-params supplied by the application; only the arm matching
     is_client is meaningful. */
  union {
    struct {
      grpc_status_code* status;
      grpc_slice* status_details;
      const char** error_string;
    } client;
    struct {
      int* cancelled;
      // backpointer to owning server if this is a server side call.
      grpc_server* server;
    } server;
  } final_op;
  /* grpc_error* holding the call's final status (set in set_final_status,
     unreffed in destroy_call). */
  gpr_atm status_error = 0;

  /* recv_state can contain one of the following values:
     RECV_NONE :                 :  no initial metadata and messages received
     RECV_INITIAL_METADATA_FIRST :  received initial metadata first
     a batch_control*            :  received messages first

                 +------1------RECV_NONE------3-----+
                 |                                  |
                 |                                  |
                 v                                  v
     RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
           |           ^                      |           ^
           |           |                      |           |
           +-----2-----+                      +-----4-----+

    For 1, 4: See receiving_initial_metadata_ready() function
    For 2, 3: See receiving_stream_ready() function */
  gpr_atm recv_state = 0;
};
271 
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");

/* The grpc_call and its call stack share one contiguous allocation:
   [grpc_call][alignment padding][grpc_call_stack...].  These macros convert
   between the two views of that allocation. */
#define CALL_STACK_FROM_CALL(call)   \
  (grpc_call_stack*)((char*)(call) + \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
#define CALL_FROM_CALL_STACK(call_stack) \
  (grpc_call*)(((char*)(call_stack)) -   \
               GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))

/* Fetch the idx'th filter element of the call's stack. */
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
286 
/* Forward declarations for the static helpers defined in this file. */
static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
                          grpc_closure* start_batch_closure);

static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description);
static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
298 
add_init_error(grpc_error ** composite,grpc_error * new_err)299 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
300   if (new_err == GRPC_ERROR_NONE) return;
301   if (*composite == GRPC_ERROR_NONE)
302     *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
303   *composite = grpc_error_add_child(*composite, new_err);
304 }
305 
/* Allocate size bytes from the call's arena; the memory lives until the
   arena is destroyed with the call (see release_call). */
void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
  return call->arena->Alloc(size);
}
309 
/* Return the call's parent_call record, creating it on first use.  Creation
   races are resolved with a release CAS: the loser runs the destructor on
   its freshly-built instance (the arena memory itself is not reclaimed) and
   adopts the winner's pointer. */
static parent_call* get_or_create_parent_call(grpc_call* call) {
  parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
  if (p == nullptr) {
    p = call->arena->New<parent_call>();
    if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
                         (gpr_atm)p)) {
      // Lost the race: another thread published first.
      p->~parent_call();
      p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
    }
  }
  return p;
}
322 
get_parent_call(grpc_call * call)323 static parent_call* get_parent_call(grpc_call* call) {
324   return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
325 }
326 
grpc_call_get_initial_size_estimate()327 size_t grpc_call_get_initial_size_estimate() {
328   return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
329          sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
330 }
331 
/* Create a call and its filter stack inside a single arena allocation laid
   out as [grpc_call][call stack][child_call, only when args->parent != null].
   Returns any accumulated initialization error (which is also used to cancel
   the new call); *out_call is set regardless. */
grpc_error* grpc_call_create(const grpc_call_create_args* args,
                             grpc_call** out_call) {
  GPR_TIMER_SCOPE("grpc_call_create", 0);

  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");

  grpc_core::Arena* arena;
  grpc_call* call;
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_channel_stack* channel_stack =
      grpc_channel_get_channel_stack(args->channel);
  // The initial arena size adapts over time: release_call feeds each call's
  // final arena size back into the channel's estimate.
  size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
  GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
  size_t call_and_stack_size =
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
      channel_stack->call_stack_size;
  size_t call_alloc_size =
      call_and_stack_size + (args->parent ? sizeof(child_call) : 0);

  std::pair<grpc_core::Arena*, void*> arena_with_call =
      grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
  arena = arena_with_call.first;
  call = new (arena_with_call.second) grpc_call(arena, *args);
  *out_call = call;
  grpc_slice path = grpc_empty_slice();
  if (call->is_client) {
    call->final_op.client.status_details = nullptr;
    call->final_op.client.status = nullptr;
    call->final_op.client.error_string = nullptr;
    GRPC_STATS_INC_CLIENT_CALLS_CREATED();
    GPR_ASSERT(args->add_initial_metadata_count <
               MAX_SEND_EXTRA_METADATA_COUNT);
    for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
      // Capture the :path value to pass into grpc_call_element_args below.
      if (grpc_slice_eq_static_interned(
              GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
        path = grpc_slice_ref_internal(
            GRPC_MDVALUE(args->add_initial_metadata[i]));
      }
    }
    call->send_extra_metadata_count =
        static_cast<int>(args->add_initial_metadata_count);
  } else {
    GRPC_STATS_INC_SERVER_CALLS_CREATED();
    call->final_op.server.cancelled = nullptr;
    call->final_op.server.server = args->server;
    GPR_ASSERT(args->add_initial_metadata_count == 0);
    call->send_extra_metadata_count = 0;
  }

  grpc_millis send_deadline = args->send_deadline;
  bool immediately_cancel = false;

  if (args->parent != nullptr) {
    // The child_call record occupies the tail of the same allocation,
    // directly after the call + stack.
    call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
                       call_and_stack_size) child_call(args->parent);

    GRPC_CALL_INTERNAL_REF(args->parent, "child");
    GPR_ASSERT(call->is_client);
    GPR_ASSERT(!args->parent->is_client);

    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
      // A child's deadline may never exceed its parent's.
      send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
    }
    /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     * GRPC_PROPAGATE_STATS_CONTEXT */
    /* TODO(ctiller): This should change to use the appropriate census start_op
     * call. */
    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
      if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
        add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Census tracing propagation requested "
                                   "without Census context propagation"));
      }
      grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
                            args->parent->context[GRPC_CONTEXT_TRACING].value,
                            nullptr);
    } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
      add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                 "Census context propagation requested "
                                 "without Census tracing propagation"));
    }
    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
      call->cancellation_is_inherited = 1;
      // Parent already finished: cancel this child right after stack init.
      if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
        immediately_cancel = true;
      }
    }
  }
  call->send_deadline = send_deadline;
  /* initial refcount dropped by grpc_call_unref */
  grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
                                      args->server_transport_data,
                                      call->context,
                                      path,
                                      call->start_time,
                                      send_deadline,
                                      call->arena,
                                      &call->call_combiner};
  add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
                                              call, &call_args));
  // Publish this call to parent only after the call stack has been initialized.
  if (args->parent != nullptr) {
    child_call* cc = call->child;
    parent_call* pc = get_or_create_parent_call(args->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (pc->first_child == nullptr) {
      // First child: it forms a one-element circular list with itself.
      pc->first_child = call;
      cc->sibling_next = cc->sibling_prev = call;
    } else {
      // Splice in just before first_child (tail of the circular list).
      cc->sibling_next = pc->first_child;
      cc->sibling_prev = pc->first_child->child->sibling_prev;
      cc->sibling_next->child->sibling_prev =
          cc->sibling_prev->child->sibling_next = call;
    }
    gpr_mu_unlock(&pc->child_list_mu);
  }

  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  if (immediately_cancel) {
    cancel_with_error(call, GRPC_ERROR_CANCELLED);
  }
  // Wire up polling: either the cq's pollset or the alternative pollset_set,
  // never both.
  if (args->cq != nullptr) {
    GPR_ASSERT(args->pollset_set_alternative == nullptr &&
               "Only one of 'cq' and 'pollset_set_alternative' should be "
               "non-nullptr.");
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != nullptr) {
    call->pollent = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  if (!grpc_polling_entity_is_empty(&call->pollent)) {
    grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                               &call->pollent);
  }

  // channelz accounting: record the call start on the channel node (client)
  // or the server node (server).
  if (call->is_client) {
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      channelz_channel->RecordCallStarted();
    }
  } else {
    grpc_core::channelz::ServerNode* channelz_server =
        grpc_server_get_channelz_node(call->final_op.server.server);
    if (channelz_server != nullptr) {
      channelz_server->RecordCallStarted();
    }
  }

  grpc_slice_unref_internal(path);

  return error;
}
491 
/* Bind a completion queue to the call and propagate its pollset into the
   call stack.  Aborts if the call was already given a pollset_set
   (the two polling modes are mutually exclusive). */
void grpc_call_set_completion_queue(grpc_call* call,
                                    grpc_completion_queue* cq) {
  GPR_ASSERT(cq);

  if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
    gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
    abort();
  }
  call->cq = cq;
  // "bind" ref is dropped in destroy_call.
  GRPC_CQ_INTERNAL_REF(cq, "bind");
  call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
  grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                             &call->pollent);
}
506 
/* In debug builds ref/unref carry a reason string for refcount tracing; in
   release builds the extra parameter is compiled away. */
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
/* Internal ref/unref delegate to the call stack's refcount; destroy_call was
   registered as its destructor in grpc_call_stack_init. */
void grpc_call_internal_ref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
void grpc_call_internal_unref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
520 
release_call(void * call,grpc_error *)521 static void release_call(void* call, grpc_error* /*error*/) {
522   grpc_call* c = static_cast<grpc_call*>(call);
523   grpc_channel* channel = c->channel;
524   grpc_core::Arena* arena = c->arena;
525   c->~grpc_call();
526   grpc_channel_update_call_size_estimate(channel, arena->Destroy());
527   GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
528 }
529 
destroy_call(void * call,grpc_error *)530 static void destroy_call(void* call, grpc_error* /*error*/) {
531   GPR_TIMER_SCOPE("destroy_call", 0);
532   size_t i;
533   int ii;
534   grpc_call* c = static_cast<grpc_call*>(call);
535   for (i = 0; i < 2; i++) {
536     grpc_metadata_batch_destroy(
537         &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
538   }
539   c->receiving_stream.reset();
540   parent_call* pc = get_parent_call(c);
541   if (pc != nullptr) {
542     pc->~parent_call();
543   }
544   for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
545     GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
546   }
547   for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
548     if (c->context[i].destroy) {
549       c->context[i].destroy(c->context[i].value);
550     }
551   }
552   if (c->cq) {
553     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
554   }
555 
556   grpc_error* status_error =
557       reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
558   grpc_error_get_status(status_error, c->send_deadline,
559                         &c->final_info.final_status, nullptr, nullptr,
560                         &(c->final_info.error_string));
561   GRPC_ERROR_UNREF(status_error);
562   c->final_info.stats.latency =
563       gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
564   grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
565                           GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
566                                             grpc_schedule_on_exec_ctx));
567 }
568 
/* Public API: take an external ref on the call (paired with grpc_call_unref). */
void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
570 
/* Public API: drop an external ref.  On the final unref: unlink the call
   from its parent's circular child list, then either cancel the call (ops
   were sent but the final op never arrived) or release the call combiner's
   cancellation closure, and finally drop the internal "destroy" ref. */
void grpc_call_unref(grpc_call* c) {
  if (GPR_LIKELY(!c->ext_ref.Unref())) return;

  GPR_TIMER_SCOPE("grpc_call_unref", 0);

  child_call* cc = c->child;
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));

  if (cc) {
    // Unlink from the parent's circular sibling list under its mutex.
    parent_call* pc = get_parent_call(cc->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (c == pc->first_child) {
      pc->first_child = cc->sibling_next;
      if (c == pc->first_child) {
        // c was the only child in the list.
        pc->first_child = nullptr;
      }
    }
    cc->sibling_prev->child->sibling_next = cc->sibling_next;
    cc->sibling_next->child->sibling_prev = cc->sibling_prev;
    gpr_mu_unlock(&pc->child_list_mu);
    GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
  }

  GPR_ASSERT(!c->destroy_called);
  c->destroy_called = 1;
  // Cancel if the application abandoned the call mid-flight: something was
  // sent but the final op was never received.
  bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
                gpr_atm_acq_load(&c->received_final_op_atm) == 0;
  if (cancel) {
    cancel_with_error(c, GRPC_ERROR_CANCELLED);
  } else {
    // Unset the call combiner cancellation closure.  This has the
    // effect of scheduling the previously set cancellation closure, if
    // any, so that it can release any internal references it may be
    // holding to the call stack. Also flush the closures on exec_ctx so that
    // filters that schedule cancel notification closures on exec_ctx do not
    // need to take a ref of the call stack to guarantee closure liveness.
    c->call_combiner.SetNotifyOnCancel(nullptr);
    grpc_core::ExecCtx::Get()->Flush();
  }
  GRPC_CALL_INTERNAL_UNREF(c, "destroy");
}
615 
/* Public API: cancel the call.  reserved must be nullptr.  Always returns
   GRPC_CALL_OK; the cancellation itself proceeds asynchronously via a
   cancel_stream batch (see cancel_with_error). */
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
  GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
  GPR_ASSERT(!reserved);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
  return GRPC_CALL_OK;
}
624 
625 // This is called via the call combiner to start sending a batch down
626 // the filter stack.
execute_batch_in_call_combiner(void * arg,grpc_error *)627 static void execute_batch_in_call_combiner(void* arg, grpc_error* /*ignored*/) {
628   GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
629   grpc_transport_stream_op_batch* batch =
630       static_cast<grpc_transport_stream_op_batch*>(arg);
631   grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
632   grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
633   GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
634   elem->filter->start_transport_stream_op_batch(elem, batch);
635 }
636 
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner.
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure) {
  // Stash the call so execute_batch_in_call_combiner can recover it.
  batch->handler_private.extra_arg = call;
  GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
                           GRPC_ERROR_NONE, "executing batch");
}
648 
grpc_call_get_peer(grpc_call * call)649 char* grpc_call_get_peer(grpc_call* call) {
650   char* peer_string = (char*)gpr_atm_acq_load(&call->peer_string);
651   if (peer_string != nullptr) return gpr_strdup(peer_string);
652   peer_string = grpc_channel_get_target(call->channel);
653   if (peer_string != nullptr) return peer_string;
654   return gpr_strdup("unknown");
655 }
656 
/* Recover the owning grpc_call from the top element of its call stack. */
grpc_call* grpc_call_from_top_element(grpc_call_element* elem) {
  return CALL_FROM_TOP_ELEM(elem);
}
660 
661 /*******************************************************************************
662  * CANCELLATION
663  */
664 
/* Public API: cancel the call with an application-chosen status code and
   description.  reserved must be nullptr; always returns GRPC_CALL_OK.
   The description is copied (see error_from_status), so callers may pass
   short-lived strings. */
grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
                                             grpc_status_code status,
                                             const char* description,
                                             void* reserved) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE(
      "grpc_call_cancel_with_status("
      "c=%p, status=%d, description=%s, reserved=%p)",
      4, (c, (int)status, description, reserved));
  GPR_ASSERT(reserved == nullptr);
  cancel_with_status(c, status, description);
  return GRPC_CALL_OK;
}
679 
/* Heap-allocated state for an in-flight cancel_stream batch; freed in
   done_termination once the batch completes. */
struct cancel_state {
  grpc_call* call;
  grpc_closure start_batch;
  grpc_closure finish_batch;
};
685 // The on_complete callback used when sending a cancel_stream batch down
686 // the filter stack.  Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error *)687 static void done_termination(void* arg, grpc_error* /*error*/) {
688   cancel_state* state = static_cast<cancel_state*>(arg);
689   GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
690                           "on_complete for cancel_stream op");
691   GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
692   gpr_free(state);
693 }
694 
/* Cancel the call with the given error (takes ownership of error).  Only
   the first cancellation wins — the CAS on cancelled_with_error makes later
   calls just unref their error and return.  A "termination" ref is held
   until the cancel_stream batch's on_complete (done_termination) runs. */
static void cancel_with_error(grpc_call* c, grpc_error* error) {
  if (!gpr_atm_rel_cas(&c->cancelled_with_error, 0, 1)) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  GRPC_CALL_INTERNAL_REF(c, "termination");
  // Inform the call combiner of the cancellation, so that it can cancel
  // any in-flight asynchronous actions that may be holding the call
  // combiner.  This ensures that the cancel_stream batch can be sent
  // down the filter stack in a timely manner.
  c->call_combiner.Cancel(GRPC_ERROR_REF(error));
  cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
  state->call = c;
  GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
                    grpc_schedule_on_exec_ctx);
  // Build and launch the cancel_stream batch; error's ownership moves into
  // the batch payload here.
  grpc_transport_stream_op_batch* op =
      grpc_make_transport_stream_op(&state->finish_batch);
  op->cancel_stream = true;
  op->payload->cancel_stream.cancel_error = error;
  execute_batch(c, op, &state->start_batch);
}
716 
/* Internal helper: cancel with GRPC_ERROR_CANCELLED.  Unlike the public
   grpc_call_cancel, this does not establish its own ExecCtx. */
void grpc_call_cancel_internal(grpc_call* call) {
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
}
720 
error_from_status(grpc_status_code status,const char * description)721 static grpc_error* error_from_status(grpc_status_code status,
722                                      const char* description) {
723   // copying 'description' is needed to ensure the grpc_call_cancel_with_status
724   // guarantee that can be short-lived.
725   return grpc_error_set_int(
726       grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
727                          GRPC_ERROR_STR_GRPC_MESSAGE,
728                          grpc_slice_from_copied_string(description)),
729       GRPC_ERROR_INT_GRPC_STATUS, status);
730 }
731 
/* Convert (status, description) into a grpc_error and cancel the call. */
static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description) {
  cancel_with_error(c, error_from_status(status, description));
}
736 
// Records the call's final status from |error| and updates channelz call
// counters. On clients the status is decoded into the final_op out-params
// and the error ref is stashed in call->status_error; on servers only the
// cancelled flag is derived and the error ref is released here.
static void set_final_status(grpc_call* call, grpc_error* error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
    gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
    gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
  }
  if (call->is_client) {
    grpc_error_get_status(error, call->send_deadline,
                          call->final_op.client.status,
                          call->final_op.client.status_details, nullptr,
                          call->final_op.client.error_string);
    // explicitly take a ref
    grpc_slice_ref_internal(*call->final_op.client.status_details);
    // Transfer ownership of |error| into status_error for later readers.
    gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      if (*call->final_op.client.status != GRPC_STATUS_OK) {
        channelz_channel->RecordCallFailed();
      } else {
        channelz_channel->RecordCallSucceeded();
      }
    }
  } else {
    // A server call counts as cancelled if it errored out or never sent
    // its trailing metadata.
    *call->final_op.server.cancelled =
        error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
    grpc_core::channelz::ServerNode* channelz_server =
        grpc_server_get_channelz_node(call->final_op.server.server);
    if (channelz_server != nullptr) {
      if (*call->final_op.server.cancelled ||
          reinterpret_cast<grpc_error*>(
              gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
        channelz_server->RecordCallFailed();
      } else {
        channelz_server->RecordCallSucceeded();
      }
    }
    GRPC_ERROR_UNREF(error);
  }
}
776 
777 /*******************************************************************************
778  * COMPRESSION
779  */
780 
// Records the message-level compression algorithm announced by the peer
// (decoded from grpc-encoding in recv_initial_filter). |algo| must be a
// known enum value.
static void set_incoming_message_compression_algorithm(
    grpc_call* call, grpc_message_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_message_compression_algorithm = algo;
}
786 
// Records the stream-level compression algorithm announced by the peer
// (decoded from content-encoding in recv_initial_filter). |algo| must be
// a known enum value.
static void set_incoming_stream_compression_algorithm(
    grpc_call* call, grpc_stream_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_stream_compression_algorithm = algo;
}
792 
grpc_call_test_only_get_compression_algorithm(grpc_call * call)793 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
794     grpc_call* call) {
795   grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
796   grpc_compression_algorithm_from_message_stream_compression_algorithm(
797       &algorithm, call->incoming_message_compression_algorithm,
798       call->incoming_stream_compression_algorithm);
799   return algorithm;
800 }
801 
// Maps a compression level to a concrete algorithm, restricted to the
// encodings the peer has advertised as acceptable.
static grpc_compression_algorithm compression_algorithm_for_level_locked(
    grpc_call* call, grpc_compression_level level) {
  return grpc_compression_algorithm_for_level(level,
                                              call->encodings_accepted_by_peer);
}
807 
grpc_call_test_only_get_message_flags(grpc_call * call)808 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
809   uint32_t flags;
810   flags = call->test_only_last_message_flags;
811   return flags;
812 }
813 
// No-op destructor for the user-data attached by
// set_encodings_accepted_by_peer(): the "pointer" stored there is an
// encoded bitset, not an allocation, so there is nothing to free.
static void destroy_encodings_accepted_by_peer(void* /*p*/) {}
815 
// Parses a comma-separated accept-encoding metadata value into a bitset of
// accepted algorithms, written to |encodings_accepted_by_peer|.
// |stream_encoding| selects stream- vs message-level parsing. The parsed
// bitset is cached on |mdel| as user data, stored as (bitset + 1) so a
// legitimately-zero bitset is distinguishable from "no cached value".
static void set_encodings_accepted_by_peer(grpc_call* /*call*/,
                                           grpc_mdelem mdel,
                                           uint32_t* encodings_accepted_by_peer,
                                           bool stream_encoding) {
  size_t i;
  uint32_t algorithm;
  grpc_slice_buffer accept_encoding_parts;
  grpc_slice accept_encoding_slice;
  void* accepted_user_data;

  // Fast path: a previous parse of this mdelem cached the result.
  accepted_user_data =
      grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
  if (accepted_user_data != nullptr) {
    *encodings_accepted_by_peer =
        static_cast<uint32_t>(((uintptr_t)accepted_user_data) - 1);
    return;
  }

  *encodings_accepted_by_peer = 0;

  accept_encoding_slice = GRPC_MDVALUE(mdel);
  grpc_slice_buffer_init(&accept_encoding_parts);
  grpc_slice_split_without_space(accept_encoding_slice, ",",
                                 &accept_encoding_parts);

  // "none" (identity) is always accepted.
  GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
  for (i = 0; i < accept_encoding_parts.count; i++) {
    int r;
    grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
    if (!stream_encoding) {
      r = grpc_message_compression_algorithm_parse(
          accept_encoding_entry_slice,
          reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
    } else {
      r = grpc_stream_compression_algorithm_parse(
          accept_encoding_entry_slice,
          reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
    }
    if (r) {
      GPR_BITSET(encodings_accepted_by_peer, algorithm);
    } else {
      // Unknown entries are skipped, not treated as errors.
      char* accept_encoding_entry_str =
          grpc_slice_to_c_string(accept_encoding_entry_slice);
      gpr_log(GPR_DEBUG,
              "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
              accept_encoding_entry_str);
      gpr_free(accept_encoding_entry_str);
    }
  }

  grpc_slice_buffer_destroy_internal(&accept_encoding_parts);

  // Cache the result on the mdelem (offset by 1; see header comment).
  grpc_mdelem_set_user_data(
      mdel, destroy_encodings_accepted_by_peer,
      (void*)((static_cast<uintptr_t>(*encodings_accepted_by_peer)) + 1));
}
872 
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)873 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
874   uint32_t encodings_accepted_by_peer;
875   encodings_accepted_by_peer = call->encodings_accepted_by_peer;
876   return encodings_accepted_by_peer;
877 }
878 
// Test-only accessor for the stream-level compression algorithm announced
// by the peer.
grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
  return call->incoming_stream_compression_algorithm;
}
883 
linked_from_md(const grpc_metadata * md)884 static grpc_linked_mdelem* linked_from_md(const grpc_metadata* md) {
885   return (grpc_linked_mdelem*)&md->internal_data;
886 }
887 
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)888 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
889                                   grpc_metadata* additional_metadata, int i,
890                                   int count) {
891   grpc_metadata* res =
892       i < count ? &metadata[i] : &additional_metadata[i - count];
893   GPR_ASSERT(res);
894   return res;
895 }
896 
// Validates and links |count| application-provided metadata elements (plus
// |additional_metadata_count| extras) into the call's send metadata batch
// for the given |is_trailing| direction. Returns 1 on success; returns 0
// if any element fails validation, in which case every mdelem created so
// far is unreffed and nothing is linked. When |prepend_extra_metadata| is
// set, call->send_extra_metadata is linked ahead of the user metadata.
static int prepare_application_metadata(grpc_call* call, int count,
                                        grpc_metadata* metadata,
                                        int is_trailing,
                                        int prepend_extra_metadata,
                                        grpc_metadata* additional_metadata,
                                        int additional_metadata_count) {
  int total_count = count + additional_metadata_count;
  int i;
  grpc_metadata_batch* batch =
      &call->metadata_batch[0 /* is_receiving */][is_trailing];
  // Pass 1: validate each element and materialize its mdelem in the
  // element's own internal_data storage.
  for (i = 0; i < total_count; i++) {
    const grpc_metadata* md =
        get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
    if (!GRPC_LOG_IF_ERROR("validate_metadata",
                           grpc_validate_header_key_is_legal(md->key))) {
      break;
    } else if (!grpc_is_binary_header_internal(md->key) &&
               !GRPC_LOG_IF_ERROR(
                   "validate_metadata",
                   grpc_validate_header_nonbin_value_is_legal(md->value))) {
      break;
    } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
      // HTTP2 hpack encoding has a maximum limit.
      break;
    }
    l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
  }
  // Validation stopped early: roll back the mdelems created so far.
  if (i != total_count) {
    for (int j = 0; j < i; j++) {
      const grpc_metadata* md =
          get_md_elem(metadata, additional_metadata, j, count);
      grpc_linked_mdelem* l = linked_from_md(md);
      GRPC_MDELEM_UNREF(l->md);
    }
    return 0;
  }
  if (prepend_extra_metadata) {
    if (call->send_extra_metadata_count == 0) {
      prepend_extra_metadata = 0;
    } else {
      for (i = 0; i < call->send_extra_metadata_count; i++) {
        GRPC_LOG_IF_ERROR("prepare_application_metadata",
                          grpc_metadata_batch_link_tail(
                              batch, &call->send_extra_metadata[i]));
      }
    }
  }
  // Pass 2: link the validated elements into the batch. A link failure
  // drops that element's ref but does not abort the whole batch.
  for (i = 0; i < total_count; i++) {
    grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
    if (error != GRPC_ERROR_NONE) {
      GRPC_MDELEM_UNREF(l->md);
    }
    GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
  }
  call->send_extra_metadata_count = 0;

  return 1;
}
959 
decode_message_compression(grpc_mdelem md)960 static grpc_message_compression_algorithm decode_message_compression(
961     grpc_mdelem md) {
962   grpc_message_compression_algorithm algorithm =
963       grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
964   if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
965     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
966     gpr_log(GPR_ERROR,
967             "Invalid incoming message compression algorithm: '%s'. "
968             "Interpreting incoming data as uncompressed.",
969             md_c_str);
970     gpr_free(md_c_str);
971     return GRPC_MESSAGE_COMPRESS_NONE;
972   }
973   return algorithm;
974 }
975 
decode_stream_compression(grpc_mdelem md)976 static grpc_stream_compression_algorithm decode_stream_compression(
977     grpc_mdelem md) {
978   grpc_stream_compression_algorithm algorithm =
979       grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
980   if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
981     char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
982     gpr_log(GPR_ERROR,
983             "Invalid incoming stream compression algorithm: '%s'. Interpreting "
984             "incoming data as uncompressed.",
985             md_c_str);
986     gpr_free(md_c_str);
987     return GRPC_STREAM_COMPRESS_NONE;
988   }
989   return algorithm;
990 }
991 
// Copies metadata from batch |b| into the application-visible metadata
// array registered for this call (initial or trailing, per |is_trailing|).
// Key/value slices are borrowed: they remain valid only while the call is.
static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
                                 int is_trailing) {
  if (b->list.count == 0) return;
  // Servers never publish trailing metadata to the app.
  if (!call->is_client && is_trailing) return;
  if (is_trailing && call->buffered_metadata[1] == nullptr) return;
  GPR_TIMER_SCOPE("publish_app_metadata", 0);
  grpc_metadata_array* dest;
  grpc_metadata* mdusr;
  dest = call->buffered_metadata[is_trailing];
  // Grow geometrically (at least 1.5x) so repeated publishes amortize.
  if (dest->count + b->list.count > dest->capacity) {
    dest->capacity =
        GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
    dest->metadata = static_cast<grpc_metadata*>(
        gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
  }
  for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
    mdusr = &dest->metadata[dest->count++];
    /* we pass back borrowed slices that are valid whilst the call is valid */
    mdusr->key = GRPC_MDKEY(l->md);
    mdusr->value = GRPC_MDVALUE(l->md);
  }
}
1014 
// Filters received initial metadata: extracts (and strips from the batch)
// the compression-related headers, records them on the call, and then
// publishes the remaining metadata to the application.
static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
  if (b->idx.named.content_encoding != nullptr) {
    GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
    set_incoming_stream_compression_algorithm(
        call, decode_stream_compression(b->idx.named.content_encoding->md));
    grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
  }
  if (b->idx.named.grpc_encoding != nullptr) {
    GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
    set_incoming_message_compression_algorithm(
        call, decode_message_compression(b->idx.named.grpc_encoding->md));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
  }
  // Default accepted bitsets to 1 (bit 0 = identity) when the peer sent no
  // accept-encoding headers.
  uint32_t message_encodings_accepted_by_peer = 1u;
  uint32_t stream_encodings_accepted_by_peer = 1u;
  if (b->idx.named.grpc_accept_encoding != nullptr) {
    GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
    set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
                                   &message_encodings_accepted_by_peer, false);
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
  }
  if (b->idx.named.accept_encoding != nullptr) {
    GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
    set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
                                   &stream_encodings_accepted_by_peer, true);
    grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
  }
  // Fold the two per-level bitsets into one combined-algorithm bitset.
  call->encodings_accepted_by_peer =
      grpc_compression_bitset_from_message_stream_compression_bitset(
          message_encodings_accepted_by_peer,
          stream_encodings_accepted_by_peer);
  publish_app_metadata(call, b, false);
}
1048 
// Filters received trailing metadata: derives the call's final status from
// |batch_error| (transport failure), the grpc-status/grpc-message headers,
// or a fallback, then publishes the remaining trailing metadata.
// |args| is the grpc_call*.
static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
                                 grpc_error* batch_error) {
  grpc_call* call = static_cast<grpc_call*>(args);
  if (batch_error != GRPC_ERROR_NONE) {
    // Transport-level failure wins over any status metadata; the ref is
    // consumed by set_final_status.
    set_final_status(call, batch_error);
  } else if (b->idx.named.grpc_status != nullptr) {
    grpc_status_code status_code =
        grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
    grpc_error* error = GRPC_ERROR_NONE;
    if (status_code != GRPC_STATUS_OK) {
      char* peer = grpc_call_get_peer(call);
      error = grpc_error_set_int(
          GRPC_ERROR_CREATE_FROM_COPIED_STRING(
              absl::StrCat("Error received from peer ", peer).c_str()),
          GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
      gpr_free(peer);
    }
    if (b->idx.named.grpc_message != nullptr) {
      error = grpc_error_set_str(
          error, GRPC_ERROR_STR_GRPC_MESSAGE,
          grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
      grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
    } else if (error != GRPC_ERROR_NONE) {
      error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                 grpc_empty_slice());
    }
    set_final_status(call, GRPC_ERROR_REF(error));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
    GRPC_ERROR_UNREF(error);
  } else if (!call->is_client) {
    // Servers do not receive a grpc-status from the peer.
    set_final_status(call, GRPC_ERROR_NONE);
  } else {
    // Client received trailing metadata with no grpc-status: map to UNKNOWN.
    gpr_log(GPR_DEBUG,
            "Received trailing metadata with no error and no status");
    set_final_status(
        call, grpc_error_set_int(
                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
                  GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
  }
  publish_app_metadata(call, b, true);
}
1090 
// Returns the arena that owns this call's per-call allocations.
grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1092 
// Returns the filter stack co-allocated with this call.
grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
  return CALL_STACK_FROM_CALL(call);
}
1096 
1097 /*******************************************************************************
1098  * BATCH API IMPLEMENTATION
1099  */
1100 
are_write_flags_valid(uint32_t flags)1101 static bool are_write_flags_valid(uint32_t flags) {
1102   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1103   const uint32_t allowed_write_positions =
1104       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1105   const uint32_t invalid_positions = ~allowed_write_positions;
1106   return !(flags & invalid_positions);
1107 }
1108 
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1109 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1110   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1111   uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1112   if (!is_client) {
1113     invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1114   }
1115   return !(flags & invalid_positions);
1116 }
1117 
// Maps an op type to its slot index in call->active_batches. The op pairs
// sharing a slot (send close/status, recv close/status) are presumably
// mutually exclusive per call side (client vs server) — confirm against
// the batch-validation logic.
static size_t batch_slot_for_op(grpc_op_type type) {
  switch (type) {
    case GRPC_OP_SEND_INITIAL_METADATA:
      return 0;
    case GRPC_OP_SEND_MESSAGE:
      return 1;
    case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
    case GRPC_OP_SEND_STATUS_FROM_SERVER:
      return 2;
    case GRPC_OP_RECV_INITIAL_METADATA:
      return 3;
    case GRPC_OP_RECV_MESSAGE:
      return 4;
    case GRPC_OP_RECV_CLOSE_ON_SERVER:
    case GRPC_OP_RECV_STATUS_ON_CLIENT:
      return 5;
  }
  GPR_UNREACHABLE_CODE(return 123456789);
}
1137 
// Returns the batch_control for the slot selected by the batch's first op,
// recycling a previously-completed batch_control in place when available,
// or allocating a fresh one from the call arena. Returns nullptr when the
// slot is still in use (bctl->call != nullptr), i.e. a batch of this kind
// is already in flight.
static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
                                                      const grpc_op* ops) {
  size_t slot_idx = batch_slot_for_op(ops[0].op);
  batch_control** pslot = &call->active_batches[slot_idx];
  batch_control* bctl;
  if (*pslot != nullptr) {
    bctl = *pslot;
    if (bctl->call != nullptr) {
      return nullptr;
    }
    // Destroy and re-initialize the recycled object in its arena storage.
    bctl->~batch_control();
    bctl->op = {};
  } else {
    bctl = call->arena->New<batch_control>();
    *pslot = bctl;
  }
  bctl->call = call;
  bctl->op.payload = &call->stream_op_payload;
  return bctl;
}
1158 
// Completion-queue callback invoked after a batch's completion has been
// delivered to the application; drops the batch's ref on the call.
static void finish_batch_completion(void* user_data,
                                    grpc_cq_completion* /*storage*/) {
  batch_control* bctl = static_cast<batch_control*>(user_data);
  grpc_call* call = bctl->call;
  // Clear bctl->call before the unref (which may destroy the call) so the
  // batch slot is marked reusable first.
  bctl->call = nullptr;
  GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
1166 
// Releases any error stored in bctl->batch_error and resets the slot to
// GRPC_ERROR_NONE so the batch_control can be reused.
static void reset_batch_errors(batch_control* bctl) {
  GRPC_ERROR_UNREF(
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
  gpr_atm_rel_store(&bctl->batch_error,
                    reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
}
1173 
// Runs once every step of a batch has completed: tears down per-op send
// state, propagates cancellation to inherited child calls when the final
// op is received, and delivers the batch completion either as a closure
// or as a completion-queue event.
static void post_batch_completion(batch_control* bctl) {
  grpc_call* next_child_call;
  grpc_call* call = bctl->call;
  grpc_error* error = GRPC_ERROR_REF(
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));

  if (bctl->op.send_initial_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
  }
  if (bctl->op.send_message) {
    // Surface a write-after-close even when the batch itself succeeded.
    if (bctl->op.payload->send_message.stream_write_closed &&
        error == GRPC_ERROR_NONE) {
      error = grpc_error_add_child(
          error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                     "Attempt to send message after stream was closed."));
    }
    call->sending_message = false;
  }
  if (bctl->op.send_trailing_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
  }
  if (bctl->op.recv_trailing_metadata) {
    /* propagate cancellation to any interested children */
    gpr_atm_rel_store(&call->received_final_op_atm, 1);
    parent_call* pc = get_parent_call(call);
    if (pc != nullptr) {
      grpc_call* child;
      gpr_mu_lock(&pc->child_list_mu);
      child = pc->first_child;
      if (child != nullptr) {
        // The sibling list is circular: walk until back at the head.
        do {
          next_child_call = child->child->sibling_next;
          if (child->cancellation_is_inherited) {
            GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
            cancel_with_error(child, GRPC_ERROR_CANCELLED);
            GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
          }
          child = next_child_call;
        } while (child != pc->first_child);
      }
      gpr_mu_unlock(&pc->child_list_mu);
    }
    // Drop any error: the final-op batch is reported as success; the
    // status is conveyed separately (see recv_trailing_filter).
    GRPC_ERROR_UNREF(error);
    error = GRPC_ERROR_NONE;
  }
  // A failed recv_message must not leak a partially-filled buffer.
  if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
      *call->receiving_buffer != nullptr) {
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
  }
  reset_batch_errors(bctl);

  if (bctl->completion_data.notify_tag.is_closure) {
    /* unrefs error */
    bctl->call = nullptr;  // mark the batch slot reusable before running
    grpc_core::Closure::Run(DEBUG_LOCATION,
                            (grpc_closure*)bctl->completion_data.notify_tag.tag,
                            error);
    GRPC_CALL_INTERNAL_UNREF(call, "completion");
  } else {
    /* unrefs error */
    grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
                   finish_batch_completion, bctl,
                   &bctl->completion_data.cq_completion);
  }
}
1242 
// Marks one step of the batch complete; when this was the final
// outstanding step, posts the batch completion.
static void finish_batch_step(batch_control* bctl) {
  if (GPR_UNLIKELY(bctl->completed_batch_step())) {
    post_batch_completion(bctl);
  }
}
1248 
// Pulls slices from the receiving stream into the application's byte
// buffer until the message is complete or the stream must go async (in
// which case receiving_slice_ready fires later and resumes this loop).
static void continue_receiving_slices(batch_control* bctl) {
  grpc_error* error;
  grpc_call* call = bctl->call;
  for (;;) {
    size_t remaining = call->receiving_stream->length() -
                       (*call->receiving_buffer)->data.raw.slice_buffer.length;
    if (remaining == 0) {
      // Full message assembled: release the stream and complete the step.
      call->receiving_message = 0;
      call->receiving_stream.reset();
      finish_batch_step(bctl);
      return;
    }
    if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
      // Data available synchronously; pull it now.
      error = call->receiving_stream->Pull(&call->receiving_slice);
      if (error == GRPC_ERROR_NONE) {
        grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                              call->receiving_slice);
      } else {
        // Pull failure: tear down the receive and drop the Pull error.
        call->receiving_stream.reset();
        grpc_byte_buffer_destroy(*call->receiving_buffer);
        *call->receiving_buffer = nullptr;
        call->receiving_message = 0;
        finish_batch_step(bctl);
        GRPC_ERROR_UNREF(error);
        return;
      }
    } else {
      // Went async; receiving_slice_ready will continue.
      return;
    }
  }
}
1280 
// Closure run when an async Next() on the receiving stream completes.
// On success, appends the pulled slice and resumes the pull loop; on any
// error, tears down the receive and completes the batch step. |error| is
// only unreffed when it came from Pull() (release_error), matching that
// API's ownership rule.
static void receiving_slice_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  bool release_error = false;

  if (error == GRPC_ERROR_NONE) {
    grpc_slice slice;
    error = call->receiving_stream->Pull(&slice);
    if (error == GRPC_ERROR_NONE) {
      grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                            slice);
      continue_receiving_slices(bctl);
    } else {
      /* Error returned by ByteStream::Pull() needs to be released manually */
      release_error = true;
    }
  }

  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
      GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
    }
    call->receiving_stream.reset();
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
    call->receiving_message = 0;
    finish_batch_step(bctl);
    if (release_error) {
      GRPC_ERROR_UNREF(error);
    }
  }
}
1313 
// After incoming metadata has been processed for a recv_message op:
// allocates the destination byte buffer (compressed or raw, based on the
// message's wire flags) and starts pulling message data. A null receiving
// stream means no message arrived (e.g. half-close).
static void process_data_after_md(batch_control* bctl) {
  grpc_call* call = bctl->call;
  if (call->receiving_stream == nullptr) {
    *call->receiving_buffer = nullptr;
    call->receiving_message = 0;
    finish_batch_step(bctl);
  } else {
    call->test_only_last_message_flags = call->receiving_stream->flags();
    if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
        (call->incoming_message_compression_algorithm >
         GRPC_MESSAGE_COMPRESS_NONE)) {
      // Compressed payload: tag the buffer with the combined algorithm.
      grpc_compression_algorithm algo;
      GPR_ASSERT(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &algo, call->incoming_message_compression_algorithm,
              (grpc_stream_compression_algorithm)0));
      *call->receiving_buffer =
          grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
    } else {
      *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
    }
    GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
                      grpc_schedule_on_exec_ctx);
    continue_receiving_slices(bctl);
  }
}
1340 
// Closure run when the transport has produced (or failed to produce) the
// incoming message stream for a recv_message op. On error, records the
// first batch error and cancels the call.
static void receiving_stream_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  if (error != GRPC_ERROR_NONE) {
    call->receiving_stream.reset();
    // Only the first error is recorded on the batch.
    if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
        GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  /* If recv_state is RECV_NONE, we will save the batch_control
   * object with rel_cas, and will not use it after the cas. Its corresponding
   * acq_load is in receiving_initial_metadata_ready() */
  if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
      !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
    process_data_after_md(bctl);
  }
}
1361 
// The recv_message_ready callback used when sending a batch containing
// a recv_message op down the filter stack.  Yields the call combiner
// before processing the received message.
static void receiving_stream_ready_in_call_combiner(void* bctlp,
                                                    grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  // Release the call combiner first; the remaining processing does not
  // need to hold it.
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
  receiving_stream_ready(bctlp, error);
}
1372 
1373 static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call * call)1374 handle_both_stream_and_msg_compression_set(grpc_call* call) {
1375   std::string error_msg = absl::StrFormat(
1376       "Incoming stream has both stream compression (%d) and message "
1377       "compression (%d).",
1378       call->incoming_stream_compression_algorithm,
1379       call->incoming_message_compression_algorithm);
1380   gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1381   cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1382 }
1383 
1384 static void GPR_ATTRIBUTE_NOINLINE
handle_error_parsing_compression_algorithm(grpc_call * call)1385 handle_error_parsing_compression_algorithm(grpc_call* call) {
1386   std::string error_msg = absl::StrFormat(
1387       "Error in incoming message compression (%d) or stream "
1388       "compression (%d).",
1389       call->incoming_stream_compression_algorithm,
1390       call->incoming_message_compression_algorithm);
1391   cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1392 }
1393 
handle_invalid_compression(grpc_call * call,grpc_compression_algorithm compression_algorithm)1394 static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
1395     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1396   std::string error_msg = absl::StrFormat(
1397       "Invalid compression algorithm value '%d'.", compression_algorithm);
1398   gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1399   cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1400 }
1401 
handle_compression_algorithm_disabled(grpc_call * call,grpc_compression_algorithm compression_algorithm)1402 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
1403     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1404   const char* algo_name = nullptr;
1405   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1406   std::string error_msg =
1407       absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
1408   gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1409   cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1410 }
1411 
handle_compression_algorithm_not_accepted(grpc_call * call,grpc_compression_algorithm compression_algorithm)1412 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
1413     grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1414   const char* algo_name = nullptr;
1415   grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1416   gpr_log(GPR_ERROR,
1417           "Compression algorithm ('%s') not present in the bitset of "
1418           "accepted encodings ('0x%x')",
1419           algo_name, call->encodings_accepted_by_peer);
1420 }
1421 
/* Validates the compression-related state derived from just-received initial
   metadata, cancelling the call via the handle_* helpers when it is unusable.
   Checks, in order:
     1. stream-level and message-level compression must not both be set;
     2. the (message, stream) pair must map to one overall algorithm;
     3. the algorithm must be within the known enum range;
     4. the algorithm must be enabled by the channel's compression options;
     5. (trace-only, non-fatal) the algorithm should be in the peer's
        accepted-encodings bitset. */
static void validate_filtered_metadata(batch_control* bctl) {
  grpc_compression_algorithm compression_algorithm;
  grpc_call* call = bctl->call;
  if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
                       GRPC_STREAM_COMPRESS_NONE &&
                   call->incoming_message_compression_algorithm !=
                       GRPC_MESSAGE_COMPRESS_NONE)) {
    /* Both stream and message compression were set: not allowed. */
    handle_both_stream_and_msg_compression_set(call);
  } else if (
      GPR_UNLIKELY(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &compression_algorithm,
              call->incoming_message_compression_algorithm,
              call->incoming_stream_compression_algorithm) == 0)) {
    /* The (message, stream) pair did not map to a valid overall algorithm. */
    handle_error_parsing_compression_algorithm(call);
  } else {
    const grpc_compression_options compression_options =
        grpc_channel_compression_options(call->channel);
    if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
      handle_invalid_compression(call, compression_algorithm);
    } else if (GPR_UNLIKELY(
                   grpc_compression_options_is_algorithm_enabled_internal(
                       &compression_options, compression_algorithm) == 0)) {
      /* check if algorithm is supported by current channel config */
      handle_compression_algorithm_disabled(call, compression_algorithm);
    }
    /* GRPC_COMPRESS_NONE is always set. */
    GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
    if (GPR_UNLIKELY(!GPR_BITGET(call->encodings_accepted_by_peer,
                                 compression_algorithm))) {
      /* Peer did not advertise this encoding: log only (when compression
         tracing is enabled); the call is not cancelled for this. */
      if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
        handle_compression_algorithm_not_accepted(call, compression_algorithm);
      }
    }
  }
}
1458 
/* Closure invoked when initial metadata has been received from the transport.
   On success: filters and validates the metadata, and (server-side) adopts
   the deadline the peer sent. On error: records the first error seen for the
   batch and cancels the call. It then advances the call's recv_state machine:
   if receiving_stream_ready() already ran (a message arrived before initial
   metadata), its saved batch_control is turned back into a closure and run
   now, so message processing is ordered after initial-metadata processing. */
static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;

  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");

  if (error == GRPC_ERROR_NONE) {
    grpc_metadata_batch* md =
        &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
    recv_initial_filter(call, md);

    /* TODO(ctiller): this could be moved into recv_initial_filter now */
    GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
    validate_filtered_metadata(bctl);

    /* Server side: adopt the deadline carried in the client's metadata. */
    if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
      call->send_deadline = md->deadline;
    }
  } else {
    /* Only the first error for the batch is recorded in batch_error. */
    if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
        GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }

  /* Advance the recv_state machine (values: RECV_NONE == 0,
     RECV_INITIAL_METADATA_FIRST == 1, otherwise a batch_control* saved by
     receiving_stream_ready()). */
  grpc_closure* saved_rsr_closure = nullptr;
  while (true) {
    gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
    /* Should only receive initial metadata once */
    GPR_ASSERT(rsr_bctlp != 1);
    if (rsr_bctlp == 0) {
      /* We haven't seen initial metadata and messages before, thus initial
       * metadata is received first.
       * no_barrier_cas is used, as this function won't access the batch_control
       * object saved by receiving_stream_ready() if the initial metadata is
       * received first. */
      if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
                                 RECV_INITIAL_METADATA_FIRST)) {
        break;
      }
    } else {
      /* Already received messages */
      saved_rsr_closure =
          GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
                              grpc_schedule_on_exec_ctx);
      /* No need to modify recv_state */
      break;
    }
  }
  if (saved_rsr_closure != nullptr) {
    grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
                            GRPC_ERROR_REF(error));
  }

  finish_batch_step(bctl);
}
1517 
receiving_trailing_metadata_ready(void * bctlp,grpc_error * error)1518 static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
1519   batch_control* bctl = static_cast<batch_control*>(bctlp);
1520   grpc_call* call = bctl->call;
1521   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
1522   grpc_metadata_batch* md =
1523       &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1524   recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
1525   finish_batch_step(bctl);
1526 }
1527 
/* Closure run when the transport completes the send side of a batch.
   Records the first error seen for the batch (batch_error is written only
   while still GRPC_ERROR_NONE), cancels the call if this completion carried
   an error, and finishes one step of the batch. */
static void finish_batch(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
  /* Keep only the first error reported for this batch. */
  if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
      GRPC_ERROR_NONE) {
    gpr_atm_rel_store(&bctl->batch_error,
                      reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
  }
  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  finish_batch_step(bctl);
}
1542 
free_no_op_completion(void *,grpc_cq_completion * completion)1543 static void free_no_op_completion(void* /*p*/, grpc_cq_completion* completion) {
1544   gpr_free(completion);
1545 }
1546 
/* Core batch implementation shared by grpc_call_start_batch (completion-queue
   tag notification) and grpc_call_start_batch_and_execute (closure
   notification, selected by is_notify_tag_closure). Translates the
   application-level |ops| array into a single transport stream op batch,
   validating flags and per-call state for each op. On validation failure,
   all per-call mutations already applied are rolled back under
   done_with_error before returning the error code. */
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
                                        size_t nops, void* notify_tag,
                                        int is_notify_tag_closure) {
  GPR_TIMER_SCOPE("call_start_batch", 0);

  size_t i;
  const grpc_op* op;
  batch_control* bctl;
  bool has_send_ops = false;
  int num_recv_ops = 0;
  grpc_call_error error = GRPC_CALL_OK;
  grpc_transport_stream_op_batch* stream_op;
  grpc_transport_stream_op_batch_payload* stream_op_payload;

  GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);

  /* An empty batch has nothing to do except signal its completion. */
  if (nops == 0) {
    if (!is_notify_tag_closure) {
      GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
      grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
                     free_no_op_completion, nullptr,
                     static_cast<grpc_cq_completion*>(
                         gpr_malloc(sizeof(grpc_cq_completion))));
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION, (grpc_closure*)notify_tag,
                              GRPC_ERROR_NONE);
    }
    error = GRPC_CALL_OK;
    goto done;
  }

  bctl = reuse_or_allocate_batch_control(call, ops);
  if (bctl == nullptr) {
    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
  }
  bctl->completion_data.notify_tag.tag = notify_tag;
  bctl->completion_data.notify_tag.is_closure =
      static_cast<uint8_t>(is_notify_tag_closure != 0);

  stream_op = &bctl->op;
  stream_op_payload = &call->stream_op_payload;

  /* rewrite batch ops into a transport op */
  for (i = 0; i < nops; i++) {
    op = &ops[i];
    if (op->reserved != nullptr) {
      error = GRPC_CALL_ERROR;
      goto done_with_error;
    }
    switch (op->op) {
      case GRPC_OP_SEND_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->sent_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        // TODO(juanlishen): If the user has already specified a compression
        // algorithm by setting the initial metadata with key of
        // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
        // with the compression algorithm mapped from compression level.
        /* process compression level */
        grpc_metadata& compression_md = call->compression_md;
        compression_md.key = grpc_empty_slice();
        compression_md.value = grpc_empty_slice();
        compression_md.flags = 0;
        size_t additional_metadata_count = 0;
        grpc_compression_level effective_compression_level =
            GRPC_COMPRESS_LEVEL_NONE;
        bool level_set = false;
        /* The per-op level wins; otherwise fall back to the channel's
           default compression level, if one is configured. */
        if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
          effective_compression_level =
              op->data.send_initial_metadata.maybe_compression_level.level;
          level_set = true;
        } else {
          const grpc_compression_options copts =
              grpc_channel_compression_options(call->channel);
          if (copts.default_level.is_set) {
            level_set = true;
            effective_compression_level = copts.default_level.level;
          }
        }
        // Currently, only server side supports compression level setting.
        if (level_set && !call->is_client) {
          const grpc_compression_algorithm calgo =
              compression_algorithm_for_level_locked(
                  call, effective_compression_level);
          // The following metadata will be checked and removed by the message
          // compression filter. It will be used as the call's compression
          // algorithm.
          compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
          compression_md.value = grpc_compression_algorithm_slice(calgo);
          additional_metadata_count++;
        }
        if (op->data.send_initial_metadata.count + additional_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_initial_metadata = true;
        call->sent_initial_metadata = true;
        if (!prepare_application_metadata(
                call, static_cast<int>(op->data.send_initial_metadata.count),
                op->data.send_initial_metadata.metadata, 0, call->is_client,
                &compression_md, static_cast<int>(additional_metadata_count))) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        /* TODO(ctiller): just make these the same variable? */
        if (call->is_client) {
          call->metadata_batch[0][0].deadline = call->send_deadline;
        }
        stream_op_payload->send_initial_metadata.send_initial_metadata =
            &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
            op->flags;
        if (call->is_client) {
          stream_op_payload->send_initial_metadata.peer_string =
              &call->peer_string;
        }
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_MESSAGE: {
        if (!are_write_flags_valid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (op->data.send_message.send_message == nullptr) {
          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
          goto done_with_error;
        }
        if (call->sending_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        uint32_t flags = op->flags;
        /* If the outgoing buffer is already compressed, mark it as so in the
           flags. These will be picked up by the compression filter and further
           (wasteful) attempts at compression skipped. */
        if (op->data.send_message.send_message->data.raw.compression >
            GRPC_COMPRESS_NONE) {
          flags |= GRPC_WRITE_INTERNAL_COMPRESS;
        }
        stream_op->send_message = true;
        call->sending_message = true;
        call->sending_stream.Init(
            &op->data.send_message.send_message->data.raw.slice_buffer, flags);
        stream_op_payload->send_message.send_message.reset(
            call->sending_stream.get());
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_STATUS_FROM_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        if (op->data.send_status_from_server.trailing_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        GPR_ASSERT(call->send_extra_metadata_count == 0);
        call->send_extra_metadata_count = 1;
        call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
            op->data.send_status_from_server.status);
        /* Build the status error: GRPC_ERROR_NONE for OK, otherwise a
           server-error with the numeric status (and, below, the details
           string) attached. */
        grpc_error* status_error =
            op->data.send_status_from_server.status == GRPC_STATUS_OK
                ? GRPC_ERROR_NONE
                : grpc_error_set_int(
                      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                          "Server returned error"),
                      GRPC_ERROR_INT_GRPC_STATUS,
                      static_cast<intptr_t>(
                          op->data.send_status_from_server.status));
        if (op->data.send_status_from_server.status_details != nullptr) {
          call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
              GRPC_MDSTR_GRPC_MESSAGE,
              grpc_slice_ref_internal(
                  *op->data.send_status_from_server.status_details));
          call->send_extra_metadata_count++;
          if (status_error != GRPC_ERROR_NONE) {
            char* msg = grpc_slice_to_c_string(
                GRPC_MDVALUE(call->send_extra_metadata[1].md));
            status_error =
                grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                   grpc_slice_from_copied_string(msg));
            gpr_free(msg);
          }
        }

        gpr_atm_rel_store(&call->status_error,
                          reinterpret_cast<gpr_atm>(status_error));
        if (!prepare_application_metadata(
                call,
                static_cast<int>(
                    op->data.send_status_from_server.trailing_metadata_count),
                op->data.send_status_from_server.trailing_metadata, 1, 1,
                nullptr, 0)) {
          /* Roll back the extra-metadata refs taken above before bailing. */
          for (int n = 0; n < call->send_extra_metadata_count; n++) {
            GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
          }
          call->send_extra_metadata_count = 0;
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->send_trailing_metadata.sent =
            &call->sent_server_trailing_metadata;
        has_send_ops = true;
        break;
      }
      case GRPC_OP_RECV_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->received_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->received_initial_metadata = true;
        call->buffered_metadata[0] =
            op->data.recv_initial_metadata.recv_initial_metadata;
        GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
                          receiving_initial_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op->recv_initial_metadata = true;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata =
            &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
            &call->receiving_initial_metadata_ready;
        if (!call->is_client) {
          stream_op_payload->recv_initial_metadata.peer_string =
              &call->peer_string;
        }
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_MESSAGE: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->receiving_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->receiving_message = true;
        stream_op->recv_message = true;
        call->receiving_buffer = op->data.recv_message.recv_message;
        stream_op_payload->recv_message.recv_message = &call->receiving_stream;
        GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
                          receiving_stream_ready_in_call_combiner, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_message.recv_message_ready =
            &call->receiving_stream_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_STATUS_ON_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->buffered_metadata[1] =
            op->data.recv_status_on_client.trailing_metadata;
        call->final_op.client.status = op->data.recv_status_on_client.status;
        call->final_op.client.status_details =
            op->data.recv_status_on_client.status_details;
        call->final_op.client.error_string =
            op->data.recv_status_on_client.error_string;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_CLOSE_ON_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->final_op.server.cancelled =
            op->data.recv_close_on_server.cancelled;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
    }
  }

  /* All ops validated: account for the completion and hand the batch to the
     transport. All send ops complete together via finish_batch; each recv op
     completes via its own closure. */
  GRPC_CALL_INTERNAL_REF(call, "completion");
  if (!is_notify_tag_closure) {
    GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
  }
  bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);

  if (has_send_ops) {
    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
                      grpc_schedule_on_exec_ctx);
    stream_op->on_complete = &bctl->finish_batch;
  }

  gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
  execute_batch(call, stream_op, &bctl->start_batch);

done:
  return error;

done_with_error:
  /* reverse any mutations that occurred */
  if (stream_op->send_initial_metadata) {
    call->sent_initial_metadata = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
  }
  if (stream_op->send_message) {
    call->sending_message = false;
    call->sending_stream->Orphan();
  }
  if (stream_op->send_trailing_metadata) {
    call->sent_final_op = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
  }
  if (stream_op->recv_initial_metadata) {
    call->received_initial_metadata = false;
  }
  if (stream_op->recv_message) {
    call->receiving_message = false;
  }
  if (stream_op->recv_trailing_metadata) {
    call->requested_final_op = false;
  }
  goto done;
}
1957 
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1958 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1959                                       size_t nops, void* tag, void* reserved) {
1960   grpc_call_error err;
1961 
1962   GRPC_API_TRACE(
1963       "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1964       "reserved=%p)",
1965       5, (call, ops, (unsigned long)nops, tag, reserved));
1966 
1967   if (reserved != nullptr) {
1968     err = GRPC_CALL_ERROR;
1969   } else {
1970     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1971     grpc_core::ExecCtx exec_ctx;
1972     err = call_start_batch(call, ops, nops, tag, 0);
1973   }
1974 
1975   return err;
1976 }
1977 
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)1978 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1979                                                   const grpc_op* ops,
1980                                                   size_t nops,
1981                                                   grpc_closure* closure) {
1982   return call_start_batch(call, ops, nops, closure, 1);
1983 }
1984 
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1985 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1986                            void* value, void (*destroy)(void* value)) {
1987   if (call->context[elem].destroy) {
1988     call->context[elem].destroy(call->context[elem].value);
1989   }
1990   call->context[elem].value = value;
1991   call->context[elem].destroy = destroy;
1992 }
1993 
grpc_call_context_get(grpc_call * call,grpc_context_index elem)1994 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
1995   return call->context[elem].value;
1996 }
1997 
grpc_call_is_client(grpc_call * call)1998 uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
1999 
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)2000 grpc_compression_algorithm grpc_call_compression_for_level(
2001     grpc_call* call, grpc_compression_level level) {
2002   grpc_compression_algorithm algo =
2003       compression_algorithm_for_level_locked(call, level);
2004   return algo;
2005 }
2006 
grpc_call_error_to_string(grpc_call_error error)2007 const char* grpc_call_error_to_string(grpc_call_error error) {
2008   switch (error) {
2009     case GRPC_CALL_ERROR:
2010       return "GRPC_CALL_ERROR";
2011     case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2012       return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2013     case GRPC_CALL_ERROR_ALREADY_FINISHED:
2014       return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2015     case GRPC_CALL_ERROR_ALREADY_INVOKED:
2016       return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2017     case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2018       return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2019     case GRPC_CALL_ERROR_INVALID_FLAGS:
2020       return "GRPC_CALL_ERROR_INVALID_FLAGS";
2021     case GRPC_CALL_ERROR_INVALID_MESSAGE:
2022       return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2023     case GRPC_CALL_ERROR_INVALID_METADATA:
2024       return "GRPC_CALL_ERROR_INVALID_METADATA";
2025     case GRPC_CALL_ERROR_NOT_INVOKED:
2026       return "GRPC_CALL_ERROR_NOT_INVOKED";
2027     case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2028       return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2029     case GRPC_CALL_ERROR_NOT_ON_SERVER:
2030       return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2031     case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2032       return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2033     case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2034       return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2035     case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2036       return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2037     case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2038       return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2039     case GRPC_CALL_OK:
2040       return "GRPC_CALL_OK";
2041   }
2042   GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
2043 }
2044