1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #include <string>
28
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31
32 #include <grpc/compression.h>
33 #include <grpc/grpc.h>
34 #include <grpc/slice.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38
39 #include "src/core/lib/channel/channel_stack.h"
40 #include "src/core/lib/compression/algorithm_metadata.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/gpr/alloc.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gpr/time_precise.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/arena.h"
47 #include "src/core/lib/gprpp/manual_constructor.h"
48 #include "src/core/lib/gprpp/ref_counted.h"
49 #include "src/core/lib/iomgr/timer.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/lib/slice/slice_utils.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/call_test_only.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/server.h"
59 #include "src/core/lib/surface/validate_metadata.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/static_metadata.h"
63 #include "src/core/lib/transport/status_metadata.h"
64 #include "src/core/lib/transport/transport.h"
65
66 /** The maximum number of concurrent batches possible.
67 Based upon the maximum number of individually queueable ops in the batch
68 api:
69 - initial metadata send
70 - message send
71 - status/close send (depending on client/server)
72 - initial metadata recv
73 - message recv
74 - status/close recv (depending on client/server) */
75 #define MAX_CONCURRENT_BATCHES 6
76
77 #define MAX_SEND_EXTRA_METADATA_COUNT 3
78
79 // Used to create arena for the first call.
80 #define ESTIMATED_MDELEM_COUNT 16
81
/* Per-batch bookkeeping for one grpc_call_start_batch() invocation (or an
   internally generated batch). One slot per batch lives in
   grpc_call::active_batches. */
struct batch_control {
  batch_control() = default;

  grpc_call* call = nullptr;
  grpc_transport_stream_op_batch op;
  /* Share memory for cq_completion and notify_tag as they are never needed
     simultaneously. Each byte used in this data structure counts as six bytes
     per call, so any savings we can make are worthwhile.

     We use notify_tag to determine whether or not to send notification to the
     completion queue. Once we've made that determination, we can reuse the
     memory for cq_completion. */
  union {
    grpc_cq_completion cq_completion;
    struct {
      /* Any given op indicates completion by either (a) calling a closure or
         (b) sending a notification on the call's completion queue. If
         \a is_closure is true, \a tag indicates a closure to be invoked;
         otherwise, \a tag indicates the tag to be used in the notification to
         be sent to the completion queue. */
      void* tag;
      bool is_closure;
    } notify_tag;
  } completion_data;
  grpc_closure start_batch;
  grpc_closure finish_batch;
  // Count of not-yet-finished sub-operations; the batch posts its completion
  // when this reaches zero (see completed_batch_step()).
  grpc_core::Atomic<intptr_t> steps_to_complete;
  // Holds a grpc_error* (stored as gpr_atm) recording the batch's error, if
  // any.
  gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
  void set_num_steps_to_complete(uintptr_t steps) {
    // RELEASE pairs with the ACQ_REL decrement in completed_batch_step().
    steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
  }
  // Returns true iff the caller just completed the final outstanding step.
  bool completed_batch_step() {
    return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
  }
};
117
/* Parent-side bookkeeping for call parenting; allocated lazily in the call's
   arena the first time a child is attached (see get_or_create_parent_call). */
struct parent_call {
  parent_call() { gpr_mu_init(&child_list_mu); }
  ~parent_call() { gpr_mu_destroy(&child_list_mu); }

  // Protects the circular sibling list rooted at first_child.
  gpr_mu child_list_mu;
  grpc_call* first_child = nullptr;
};
125
/* Child-side linkage, allocated immediately after the call stack when a call
   is created with a parent (see grpc_call_create). */
struct child_call {
  explicit child_call(grpc_call* parent) : parent(parent) {}
  grpc_call* parent;
  /** siblings: children of the same parent form a list, and this list is
      protected under
      parent->mu */
  grpc_call* sibling_next = nullptr;
  grpc_call* sibling_prev = nullptr;
};
135
136 #define RECV_NONE ((gpr_atm)0)
137 #define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
138
/* Core call object. It is arena-allocated and immediately followed (after
   alignment padding) by its filter call stack — see CALL_STACK_FROM_CALL. */
struct grpc_call {
  grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
      : arena(arena),
        cq(args.cq),
        channel(args.channel),
        is_client(args.server_transport_data == nullptr),
        stream_op_payload(context) {
    // All four metadata batches start with an unset ("infinite") deadline.
    for (int i = 0; i < 2; i++) {
      for (int j = 0; j < 2; j++) {
        metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
      }
    }
  }

  ~grpc_call() {
    // error_string is heap-allocated by grpc_error_get_status() in
    // destroy_call(); everything else is arena-owned.
    gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
  }

  grpc_core::RefCount ext_ref;
  grpc_core::Arena* arena;
  grpc_core::CallCombiner call_combiner;
  grpc_completion_queue* cq;
  grpc_polling_entity pollent;
  grpc_channel* channel;
  gpr_cycle_counter start_time = gpr_get_cycle_counter();
  /* parent_call* */ gpr_atm parent_call_atm = 0;
  child_call* child = nullptr;

  /* client or server call */
  bool is_client;
  /** has grpc_call_unref been called */
  bool destroy_called = false;
  /** flag indicating that cancellation is inherited */
  bool cancellation_is_inherited = false;
  /** which ops are in-flight */
  bool sent_initial_metadata = false;
  bool sending_message = false;
  bool sent_final_op = false;
  bool received_initial_metadata = false;
  bool receiving_message = false;
  bool requested_final_op = false;
  gpr_atm any_ops_sent_atm = 0;
  gpr_atm received_final_op_atm = 0;

  batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
  grpc_transport_stream_op_batch_payload stream_op_payload;

  /* first idx: is_receiving, second idx: is_trailing */
  grpc_metadata_batch metadata_batch[2][2] = {};

  /* Buffered read metadata waiting to be returned to the application.
     Element 0 is initial metadata, element 1 is trailing metadata. */
  grpc_metadata_array* buffered_metadata[2] = {};

  grpc_metadata compression_md;

  // A char* indicating the peer name.
  gpr_atm peer_string = 0;

  /* Call data useful used for reporting. Only valid after the call has
   * completed */
  grpc_call_final_info final_info;

  /* Compression algorithm for *incoming* data */
  grpc_message_compression_algorithm incoming_message_compression_algorithm =
      GRPC_MESSAGE_COMPRESS_NONE;
  /* Stream compression algorithm for *incoming* data */
  grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
      GRPC_STREAM_COMPRESS_NONE;
  /* Supported encodings (compression algorithms), a bitset.
   * Always support no compression. */
  uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
  /* Supported stream encodings (stream compression algorithms), a bitset */
  uint32_t stream_encodings_accepted_by_peer = 0;

  /* Contexts for various subsystems (security, tracing, ...). */
  grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};

  /* for the client, extra metadata is initial metadata; for the
     server, it's trailing metadata */
  grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
  int send_extra_metadata_count;
  grpc_millis send_deadline;

  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;

  grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
  grpc_byte_buffer** receiving_buffer = nullptr;
  grpc_slice receiving_slice = grpc_empty_slice();
  grpc_closure receiving_slice_ready;
  grpc_closure receiving_stream_ready;
  grpc_closure receiving_initial_metadata_ready;
  grpc_closure receiving_trailing_metadata_ready;
  uint32_t test_only_last_message_flags = 0;
  // Status about operation of call
  bool sent_server_trailing_metadata = false;
  gpr_atm cancelled_with_error = 0;

  grpc_closure release_call;

  // Destination pointers for the final op; which member is active depends on
  // is_client.
  union {
    struct {
      grpc_status_code* status;
      grpc_slice* status_details;
      const char** error_string;
    } client;
    struct {
      int* cancelled;
      // backpointer to owning server if this is a server side call.
      grpc_core::Server* core_server;
    } server;
  } final_op;
  // Holds a grpc_error* (stored as gpr_atm) with the call's final status.
  gpr_atm status_error = 0;

  /* recv_state can contain one of the following values:
     RECV_NONE :                 : no initial metadata and messages received
     RECV_INITIAL_METADATA_FIRST : received initial metadata first
     a batch_control*            : received messages first

                 +------1------RECV_NONE------3-----+
                 |                                  |
                 |                                  |
                 v                                  v
     RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
           |           ^                      |           ^
           |           |                      |           |
           +-----2-----+                      +-----4-----+

    For 1, 4: See receiving_initial_metadata_ready() function
    For 2, 3: See receiving_stream_ready() function */
  gpr_atm recv_state = 0;
};
271
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");

/* A grpc_call is laid out in memory immediately before its call stack
   (separated only by alignment padding); these macros convert between the
   two pointers. */
#define CALL_STACK_FROM_CALL(call)                          \
  (grpc_call_stack*)((char*)(call) +                        \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
#define CALL_FROM_CALL_STACK(call_stack) \
  (grpc_call*)(((char*)(call_stack)) -   \
               GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))

#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))

/* Forward declarations for helpers defined later in this file. */
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure);

static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description);
static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
299
add_init_error(grpc_error ** composite,grpc_error * new_err)300 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
301 if (new_err == GRPC_ERROR_NONE) return;
302 if (*composite == GRPC_ERROR_NONE) {
303 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
304 }
305 *composite = grpc_error_add_child(*composite, new_err);
306 }
307
grpc_call_arena_alloc(grpc_call * call,size_t size)308 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
309 return call->arena->Alloc(size);
310 }
311
/* Returns the call's parent_call, creating it (in the call's arena) on first
   use. Thread-safe: concurrent creators race via CAS; the loser destroys its
   copy (arena memory is simply abandoned) and adopts the winner's. */
static parent_call* get_or_create_parent_call(grpc_call* call) {
  parent_call* p =
      reinterpret_cast<parent_call*>(gpr_atm_acq_load(&call->parent_call_atm));
  if (p == nullptr) {
    p = call->arena->New<parent_call>();
    if (!gpr_atm_rel_cas(&call->parent_call_atm,
                         reinterpret_cast<gpr_atm>(nullptr),
                         reinterpret_cast<gpr_atm>(p))) {
      // Lost the race: run only the destructor (memory stays in the arena)
      // and reload the pointer published by the winner.
      p->~parent_call();
      p = reinterpret_cast<parent_call*>(
          gpr_atm_acq_load(&call->parent_call_atm));
    }
  }
  return p;
}
327
get_parent_call(grpc_call * call)328 static parent_call* get_parent_call(grpc_call* call) {
329 return reinterpret_cast<parent_call*>(
330 gpr_atm_acq_load(&call->parent_call_atm));
331 }
332
grpc_call_get_initial_size_estimate()333 size_t grpc_call_get_initial_size_estimate() {
334 return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
335 sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
336 }
337
/* Creates a call (client or server side, per args->server_transport_data) in
   a freshly created arena that also holds the call stack and optional
   child_call. Returns GRPC_ERROR_NONE on success; on failure the call is
   still published to *out_call but has been cancelled with the returned
   error. Caller owns a ref on the returned error. */
grpc_error* grpc_call_create(const grpc_call_create_args* args,
                             grpc_call** out_call) {
  GPR_TIMER_SCOPE("grpc_call_create", 0);

  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");

  grpc_core::Arena* arena;
  grpc_call* call;
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_channel_stack* channel_stack =
      grpc_channel_get_channel_stack(args->channel);
  size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
  GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
  // The call, its stack, and (if parented) the child_call linkage are one
  // contiguous arena allocation.
  size_t call_and_stack_size =
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
      channel_stack->call_stack_size;
  size_t call_alloc_size =
      call_and_stack_size + (args->parent ? sizeof(child_call) : 0);

  std::pair<grpc_core::Arena*, void*> arena_with_call =
      grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
  arena = arena_with_call.first;
  call = new (arena_with_call.second) grpc_call(arena, *args);
  *out_call = call;
  grpc_slice path = grpc_empty_slice();
  if (call->is_client) {
    call->final_op.client.status_details = nullptr;
    call->final_op.client.status = nullptr;
    call->final_op.client.error_string = nullptr;
    GRPC_STATS_INC_CLIENT_CALLS_CREATED();
    GPR_ASSERT(args->add_initial_metadata_count <
               MAX_SEND_EXTRA_METADATA_COUNT);
    // Copy the caller-provided initial metadata, capturing :path (used below
    // for call stack initialization) if present.
    for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
      if (grpc_slice_eq_static_interned(
              GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
        path = grpc_slice_ref_internal(
            GRPC_MDVALUE(args->add_initial_metadata[i]));
      }
    }
    call->send_extra_metadata_count =
        static_cast<int>(args->add_initial_metadata_count);
  } else {
    GRPC_STATS_INC_SERVER_CALLS_CREATED();
    call->final_op.server.cancelled = nullptr;
    call->final_op.server.core_server = args->server;
    GPR_ASSERT(args->add_initial_metadata_count == 0);
    call->send_extra_metadata_count = 0;
  }

  grpc_millis send_deadline = args->send_deadline;
  bool immediately_cancel = false;

  if (args->parent != nullptr) {
    // child_call was allocated just past the call stack (see
    // call_alloc_size above).
    call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
                       call_and_stack_size) child_call(args->parent);

    GRPC_CALL_INTERNAL_REF(args->parent, "child");
    GPR_ASSERT(call->is_client);
    GPR_ASSERT(!args->parent->is_client);

    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
      send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
    }
    /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     * GRPC_PROPAGATE_STATS_CONTEXT */
    /* TODO(ctiller): This should change to use the appropriate census start_op
     * call. */
    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
      if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
        add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Census tracing propagation requested "
                                   "without Census context propagation"));
      }
      grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
                            args->parent->context[GRPC_CONTEXT_TRACING].value,
                            nullptr);
    } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
      add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                 "Census context propagation requested "
                                 "without Census tracing propagation"));
    }
    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
      call->cancellation_is_inherited = true;
      // If the parent already finished, cancel this child right away (after
      // stack init below).
      if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
        immediately_cancel = true;
      }
    }
  }
  call->send_deadline = send_deadline;
  /* initial refcount dropped by grpc_call_unref */
  grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
                                      args->server_transport_data,
                                      call->context,
                                      path,
                                      call->start_time,
                                      send_deadline,
                                      call->arena,
                                      &call->call_combiner};
  add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
                                              call, &call_args));
  // Publish this call to parent only after the call stack has been initialized.
  if (args->parent != nullptr) {
    child_call* cc = call->child;
    parent_call* pc = get_or_create_parent_call(args->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (pc->first_child == nullptr) {
      // First child: the circular sibling list is just this call.
      pc->first_child = call;
      cc->sibling_next = cc->sibling_prev = call;
    } else {
      // Insert at the tail of the circular list (before first_child).
      cc->sibling_next = pc->first_child;
      cc->sibling_prev = pc->first_child->child->sibling_prev;
      cc->sibling_next->child->sibling_prev =
          cc->sibling_prev->child->sibling_next = call;
    }
    gpr_mu_unlock(&pc->child_list_mu);
  }

  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  if (immediately_cancel) {
    cancel_with_error(call, GRPC_ERROR_CANCELLED);
  }
  if (args->cq != nullptr) {
    GPR_ASSERT(args->pollset_set_alternative == nullptr &&
               "Only one of 'cq' and 'pollset_set_alternative' should be "
               "non-nullptr.");
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != nullptr) {
    call->pollent = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  if (!grpc_polling_entity_is_empty(&call->pollent)) {
    grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                               &call->pollent);
  }

  // channelz accounting: record the call start on the channel (client) or
  // server node (server side).
  if (call->is_client) {
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      channelz_channel->RecordCallStarted();
    }
  } else if (call->final_op.server.core_server != nullptr) {
    grpc_core::channelz::ServerNode* channelz_node =
        call->final_op.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      channelz_node->RecordCallStarted();
    }
  }

  grpc_slice_unref_internal(path);

  return error;
}
497
/* Late-binds a completion queue to a call that was created without one.
   Aborts if a pollset_set was already registered — the two polling entities
   are mutually exclusive. Takes a "bind" ref on the cq (dropped in
   destroy_call). */
void grpc_call_set_completion_queue(grpc_call* call,
                                    grpc_completion_queue* cq) {
  GPR_ASSERT(cq);

  if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
    gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
    abort();
  }
  call->cq = cq;
  GRPC_CQ_INTERNAL_REF(cq, "bind");
  call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
  grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                             &call->pollent);
}
512
/* Internal refs are delegated to the call stack's refcount; in debug builds a
   reason string is threaded through for ref-count tracing. */
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
void grpc_call_internal_ref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
void grpc_call_internal_unref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
526
/* Final teardown closure, scheduled by destroy_call() after the call stack is
   destroyed. Order matters: the grpc_call destructor must run before the
   arena (which owns the call's memory) is destroyed, and the channel ref is
   dropped last since the arena-size estimate is published to the channel. */
static void release_call(void* call, grpc_error* /*error*/) {
  grpc_call* c = static_cast<grpc_call*>(call);
  grpc_channel* channel = c->channel;
  grpc_core::Arena* arena = c->arena;
  c->~grpc_call();
  // Arena::Destroy() returns the bytes used; feed that back into the
  // channel's initial-size estimator for future calls.
  grpc_channel_update_call_size_estimate(channel, arena->Destroy());
  GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
535
/* Call-stack destruction callback (installed in grpc_call_create): releases
   call-owned resources, derives the final reported status, then destroys the
   stack with release_call() as the post-destroy closure. */
static void destroy_call(void* call, grpc_error* /*error*/) {
  GPR_TIMER_SCOPE("destroy_call", 0);
  size_t i;
  int ii;
  grpc_call* c = static_cast<grpc_call*>(call);
  // Destroy the two receiving-side metadata batches (initial and trailing).
  for (i = 0; i < 2; i++) {
    grpc_metadata_batch_destroy(
        &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
  }
  c->receiving_stream.reset();
  parent_call* pc = get_parent_call(c);
  if (pc != nullptr) {
    // Arena-allocated: run the destructor only; the memory is reclaimed with
    // the arena.
    pc->~parent_call();
  }
  for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
    GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
  }
  for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (c->context[i].destroy) {
      c->context[i].destroy(c->context[i].value);
    }
  }
  if (c->cq) {
    GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
  }

  // Convert the recorded status error into final_status/error_string (the
  // error_string allocation is freed by ~grpc_call).
  grpc_error* status_error =
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
  grpc_error_get_status(status_error, c->send_deadline,
                        &c->final_info.final_status, nullptr, nullptr,
                        &(c->final_info.error_string));
  GRPC_ERROR_UNREF(status_error);
  c->final_info.stats.latency =
      gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
  grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
                          GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
                                            grpc_schedule_on_exec_ctx));
}
574
/* Takes an external (application-visible) ref; paired with grpc_call_unref. */
void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
576
/* Drops an external ref; the last one unlinks the call from its parent,
   cancels it if still in flight, and releases the initial internal ref taken
   at creation. */
void grpc_call_unref(grpc_call* c) {
  // Fast path: not the last external ref.
  if (GPR_LIKELY(!c->ext_ref.Unref())) return;

  GPR_TIMER_SCOPE("grpc_call_unref", 0);

  child_call* cc = c->child;
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));

  if (cc) {
    // Unlink this call from the parent's circular sibling list.
    parent_call* pc = get_parent_call(cc->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (c == pc->first_child) {
      pc->first_child = cc->sibling_next;
      // If we were the only element, the list is now empty.
      if (c == pc->first_child) {
        pc->first_child = nullptr;
      }
    }
    cc->sibling_prev->child->sibling_next = cc->sibling_next;
    cc->sibling_next->child->sibling_prev = cc->sibling_prev;
    gpr_mu_unlock(&pc->child_list_mu);
    GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
  }

  GPR_ASSERT(!c->destroy_called);
  c->destroy_called = true;
  // Cancel only if ops were started but the final op never arrived.
  bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
                gpr_atm_acq_load(&c->received_final_op_atm) == 0;
  if (cancel) {
    cancel_with_error(c, GRPC_ERROR_CANCELLED);
  } else {
    // Unset the call combiner cancellation closure. This has the
    // effect of scheduling the previously set cancellation closure, if
    // any, so that it can release any internal references it may be
    // holding to the call stack. Also flush the closures on exec_ctx so that
    // filters that schedule cancel notification closures on exec_ctx do not
    // need to take a ref of the call stack to guarantee closure liveness.
    c->call_combiner.SetNotifyOnCancel(nullptr);
    grpc_core::ExecCtx::Get()->Flush();
  }
  GRPC_CALL_INTERNAL_UNREF(c, "destroy");
}
621
/* Public API: cancels the call with GRPC_ERROR_CANCELLED. `reserved` must be
   null. Always returns GRPC_CALL_OK. */
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
  GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
  GPR_ASSERT(!reserved);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
  return GRPC_CALL_OK;
}
630
631 // This is called via the call combiner to start sending a batch down
632 // the filter stack.
execute_batch_in_call_combiner(void * arg,grpc_error *)633 static void execute_batch_in_call_combiner(void* arg, grpc_error* /*ignored*/) {
634 GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
635 grpc_transport_stream_op_batch* batch =
636 static_cast<grpc_transport_stream_op_batch*>(arg);
637 grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
638 grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
639 GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
640 elem->filter->start_transport_stream_op_batch(elem, batch);
641 }
642
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner.
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure) {
  // Stash the call on the batch so execute_batch_in_call_combiner can
  // recover it from the single closure argument.
  batch->handler_private.extra_arg = call;
  GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
                           GRPC_ERROR_NONE, "executing batch");
}
654
grpc_call_get_peer(grpc_call * call)655 char* grpc_call_get_peer(grpc_call* call) {
656 char* peer_string =
657 reinterpret_cast<char*>(gpr_atm_acq_load(&call->peer_string));
658 if (peer_string != nullptr) return gpr_strdup(peer_string);
659 peer_string = grpc_channel_get_target(call->channel);
660 if (peer_string != nullptr) return peer_string;
661 return gpr_strdup("unknown");
662 }
663
/* Recovers the owning grpc_call from the top element of its call stack
   (pointer arithmetic — see CALL_FROM_TOP_ELEM). */
grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {
  return CALL_FROM_TOP_ELEM(surface_element);
}
667
/*******************************************************************************
 * CANCELLATION
 */

/* Public API: cancels the call with the given status code and description.
   `reserved` must be null. Always returns GRPC_CALL_OK. */
grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
                                             grpc_status_code status,
                                             const char* description,
                                             void* reserved) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE(
      "grpc_call_cancel_with_status("
      "c=%p, status=%d, description=%s, reserved=%p)",
      4, (c, (int)status, description, reserved));
  GPR_ASSERT(reserved == nullptr);
  cancel_with_status(c, status, description);
  return GRPC_CALL_OK;
}
686
/* Heap-allocated state for one cancel_stream batch; freed in
   done_termination(). */
struct cancel_state {
  grpc_call* call;
  grpc_closure start_batch;
  grpc_closure finish_batch;
};
692 // The on_complete callback used when sending a cancel_stream batch down
693 // the filter stack. Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error *)694 static void done_termination(void* arg, grpc_error* /*error*/) {
695 cancel_state* state = static_cast<cancel_state*>(arg);
696 GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
697 "on_complete for cancel_stream op");
698 GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
699 gpr_free(state);
700 }
701
/* Cancels the call with `error` (ownership transferred). Idempotent: only the
   first caller wins the CAS; later callers just unref the error. */
static void cancel_with_error(grpc_call* c, grpc_error* error) {
  if (!gpr_atm_rel_cas(&c->cancelled_with_error, 0, 1)) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Ref held until done_termination() runs.
  GRPC_CALL_INTERNAL_REF(c, "termination");
  // Inform the call combiner of the cancellation, so that it can cancel
  // any in-flight asynchronous actions that may be holding the call
  // combiner. This ensures that the cancel_stream batch can be sent
  // down the filter stack in a timely manner.
  c->call_combiner.Cancel(GRPC_ERROR_REF(error));
  cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
  state->call = c;
  GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_stream_op_batch* op =
      grpc_make_transport_stream_op(&state->finish_batch);
  op->cancel_stream = true;
  // `error` ownership moves into the batch payload here.
  op->payload->cancel_stream.cancel_error = error;
  execute_batch(c, op, &state->start_batch);
}
723
/* Internal-use cancellation: same as grpc_call_cancel but assumes the caller
   already holds an exec ctx. */
void grpc_call_cancel_internal(grpc_call* call) {
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
}
727
error_from_status(grpc_status_code status,const char * description)728 static grpc_error* error_from_status(grpc_status_code status,
729 const char* description) {
730 // copying 'description' is needed to ensure the grpc_call_cancel_with_status
731 // guarantee that can be short-lived.
732 return grpc_error_set_int(
733 grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
734 GRPC_ERROR_STR_GRPC_MESSAGE,
735 grpc_slice_from_copied_string(description)),
736 GRPC_ERROR_INT_GRPC_STATUS, status);
737 }
738
/* Convenience wrapper: cancels the call with a status/description pair
   converted into a grpc_error. */
static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description) {
  cancel_with_error(c, error_from_status(status, description));
}
743
/* Publishes the call's final status to the destinations recorded in final_op
   and updates channelz call counters. On the client side, ownership of
   `error` is stored into status_error (consumed later by destroy_call); on
   the server side it is unreffed here. */
static void set_final_status(grpc_call* call, grpc_error* error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
    gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
    gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
  }
  if (call->is_client) {
    grpc_error_get_status(error, call->send_deadline,
                          call->final_op.client.status,
                          call->final_op.client.status_details, nullptr,
                          call->final_op.client.error_string);
    // explicitly take a ref
    grpc_slice_ref_internal(*call->final_op.client.status_details);
    gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      if (*call->final_op.client.status != GRPC_STATUS_OK) {
        channelz_channel->RecordCallFailed();
      } else {
        channelz_channel->RecordCallSucceeded();
      }
    }
  } else {
    // Server side: the call counts as cancelled unless trailing metadata was
    // sent and there was no error.
    *call->final_op.server.cancelled =
        error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
    grpc_core::channelz::ServerNode* channelz_node =
        call->final_op.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      if (*call->final_op.server.cancelled ||
          reinterpret_cast<grpc_error*>(
              gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
        channelz_node->RecordCallFailed();
      } else {
        channelz_node->RecordCallSucceeded();
      }
    }
    GRPC_ERROR_UNREF(error);
  }
}
783
/*******************************************************************************
 * COMPRESSION
 */

/* Records the message-compression algorithm announced for incoming data.
   `algo` must be a valid enum value. */
static void set_incoming_message_compression_algorithm(
    grpc_call* call, grpc_message_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_message_compression_algorithm = algo;
}
793
/* Records the stream-compression algorithm announced for incoming data.
   `algo` must be a valid enum value. */
static void set_incoming_stream_compression_algorithm(
    grpc_call* call, grpc_stream_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_stream_compression_algorithm = algo;
}
799
grpc_call_test_only_get_compression_algorithm(grpc_call * call)800 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
801 grpc_call* call) {
802 grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
803 grpc_compression_algorithm_from_message_stream_compression_algorithm(
804 &algorithm, call->incoming_message_compression_algorithm,
805 call->incoming_stream_compression_algorithm);
806 return algorithm;
807 }
808
/* Picks the best compression algorithm for `level` among those the peer
   accepts (per the call's accepted-encodings bitset). */
static grpc_compression_algorithm compression_algorithm_for_level_locked(
    grpc_call* call, grpc_compression_level level) {
  return grpc_compression_algorithm_for_level(level,
                                              call->encodings_accepted_by_peer);
}
814
grpc_call_test_only_get_message_flags(grpc_call * call)815 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
816 uint32_t flags;
817 flags = call->test_only_last_message_flags;
818 return flags;
819 }
820
/* No-op destructor for the accept-encoding bitset cached as mdelem user data
   below: the "pointer" stored is an integer (bitset+1), so there is nothing
   to free.  Its address also serves as the user-data key. */
static void destroy_encodings_accepted_by_peer(void* /*p*/) {}
822
/* Parses the comma-separated accept-encoding metadata element \a mdel into a
   bitset of algorithms, stored in *encodings_accepted_by_peer.
   \a stream_encoding selects whether entries are parsed as stream- or
   message-compression algorithm names.  The result is cached on the mdelem as
   user data, stored as bitset+1 so a computed value is never nullptr. */
static void set_encodings_accepted_by_peer(grpc_call* /*call*/,
                                           grpc_mdelem mdel,
                                           uint32_t* encodings_accepted_by_peer,
                                           bool stream_encoding) {
  size_t i;
  uint32_t algorithm;
  grpc_slice_buffer accept_encoding_parts;
  grpc_slice accept_encoding_slice;
  void* accepted_user_data;

  /* Fast path: a previous call already parsed this mdelem; undo the +1
     offset applied when caching. */
  accepted_user_data =
      grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
  if (accepted_user_data != nullptr) {
    *encodings_accepted_by_peer = static_cast<uint32_t>(
        reinterpret_cast<uintptr_t>(accepted_user_data) - 1);
    return;
  }

  *encodings_accepted_by_peer = 0;

  /* Split the header value into individual algorithm names. */
  accept_encoding_slice = GRPC_MDVALUE(mdel);
  grpc_slice_buffer_init(&accept_encoding_parts);
  grpc_slice_split_without_space(accept_encoding_slice, ",",
                                 &accept_encoding_parts);

  /* No-compression is always accepted. */
  GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
  for (i = 0; i < accept_encoding_parts.count; i++) {
    int r;
    grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
    if (!stream_encoding) {
      r = grpc_message_compression_algorithm_parse(
          accept_encoding_entry_slice,
          reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
    } else {
      r = grpc_stream_compression_algorithm_parse(
          accept_encoding_entry_slice,
          reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
    }
    if (r) {
      GPR_BITSET(encodings_accepted_by_peer, algorithm);
    } else {
      /* Unknown entries are logged and skipped, not treated as errors. */
      char* accept_encoding_entry_str =
          grpc_slice_to_c_string(accept_encoding_entry_slice);
      gpr_log(GPR_DEBUG,
              "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
              accept_encoding_entry_str);
      gpr_free(accept_encoding_entry_str);
    }
  }

  grpc_slice_buffer_destroy_internal(&accept_encoding_parts);

  /* Cache the bitset on the mdelem; +1 keeps a valid entry from being
     indistinguishable from "not cached" (nullptr). */
  grpc_mdelem_set_user_data(
      mdel, destroy_encodings_accepted_by_peer,
      reinterpret_cast<void*>(
          static_cast<uintptr_t>(*encodings_accepted_by_peer) + 1));
}
880
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)881 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
882 uint32_t encodings_accepted_by_peer;
883 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
884 return encodings_accepted_by_peer;
885 }
886
/* Test-only accessor: returns the stream compression algorithm recorded for
   incoming data on \a call. */
grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
  return call->incoming_stream_compression_algorithm;
}
891
/* Reinterprets the internal_data storage of a grpc_metadata as the
   grpc_linked_mdelem used to link it into a metadata batch (size equality is
   asserted in prepare_application_metadata). */
static grpc_linked_mdelem* linked_from_md(grpc_metadata* md) {
  return reinterpret_cast<grpc_linked_mdelem*>(&md->internal_data);
}
895
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)896 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
897 grpc_metadata* additional_metadata, int i,
898 int count) {
899 grpc_metadata* res =
900 i < count ? &metadata[i] : &additional_metadata[i - count];
901 GPR_ASSERT(res);
902 return res;
903 }
904
/* Validates \a count elements of \a metadata plus
   \a additional_metadata_count elements of \a additional_metadata and links
   them into the call's outgoing metadata batch (initial or trailing per
   \a is_trailing).  With \a prepend_extra_metadata set, the call's
   send_extra_metadata elements are linked ahead of the application's.
   Returns 1 on success; on any validation failure returns 0 after unreffing
   every mdelem created so far. */
static int prepare_application_metadata(grpc_call* call, int count,
                                        grpc_metadata* metadata,
                                        int is_trailing,
                                        int prepend_extra_metadata,
                                        grpc_metadata* additional_metadata,
                                        int additional_metadata_count) {
  int total_count = count + additional_metadata_count;
  int i;
  grpc_metadata_batch* batch =
      &call->metadata_batch[0 /* is_receiving */][is_trailing];
  /* Pass 1: validate each element and create its mdelem.  On failure the
     loop breaks early, leaving i < total_count. */
  for (i = 0; i < total_count; i++) {
    grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
    if (!GRPC_LOG_IF_ERROR("validate_metadata",
                           grpc_validate_header_key_is_legal(md->key))) {
      break;
    } else if (!grpc_is_binary_header_internal(md->key) &&
               !GRPC_LOG_IF_ERROR(
                   "validate_metadata",
                   grpc_validate_header_nonbin_value_is_legal(md->value))) {
      break;
    } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
      // HTTP2 hpack encoding has a maximum limit.
      break;
    }
    l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
  }
  if (i != total_count) {
    /* Validation failed: release the mdelems created by the elements that
       did pass (indices [0, i)). */
    for (int j = 0; j < i; j++) {
      grpc_metadata* md = get_md_elem(metadata, additional_metadata, j, count);
      grpc_linked_mdelem* l = linked_from_md(md);
      GRPC_MDELEM_UNREF(l->md);
    }
    return 0;
  }
  if (prepend_extra_metadata) {
    if (call->send_extra_metadata_count == 0) {
      prepend_extra_metadata = 0;
    } else {
      /* Link the call's extra metadata ahead of the application's. */
      for (i = 0; i < call->send_extra_metadata_count; i++) {
        GRPC_LOG_IF_ERROR("prepare_application_metadata",
                          grpc_metadata_batch_link_tail(
                              batch, &call->send_extra_metadata[i]));
      }
    }
  }
  /* Pass 2: link the validated elements into the batch.  A failed link
     releases that element's mdelem but processing continues. */
  for (i = 0; i < total_count; i++) {
    grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
    grpc_linked_mdelem* l = linked_from_md(md);
    grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
    if (error != GRPC_ERROR_NONE) {
      GRPC_MDELEM_UNREF(l->md);
    }
    GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
  }
  call->send_extra_metadata_count = 0;

  return 1;
}
965
decode_message_compression(grpc_mdelem md)966 static grpc_message_compression_algorithm decode_message_compression(
967 grpc_mdelem md) {
968 grpc_message_compression_algorithm algorithm =
969 grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
970 if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
971 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
972 gpr_log(GPR_ERROR,
973 "Invalid incoming message compression algorithm: '%s'. "
974 "Interpreting incoming data as uncompressed.",
975 md_c_str);
976 gpr_free(md_c_str);
977 return GRPC_MESSAGE_COMPRESS_NONE;
978 }
979 return algorithm;
980 }
981
decode_stream_compression(grpc_mdelem md)982 static grpc_stream_compression_algorithm decode_stream_compression(
983 grpc_mdelem md) {
984 grpc_stream_compression_algorithm algorithm =
985 grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
986 if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
987 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
988 gpr_log(GPR_ERROR,
989 "Invalid incoming stream compression algorithm: '%s'. Interpreting "
990 "incoming data as uncompressed.",
991 md_c_str);
992 gpr_free(md_c_str);
993 return GRPC_STREAM_COMPRESS_NONE;
994 }
995 return algorithm;
996 }
997
/* Copies the elements of received metadata batch \a b into the
   application-registered metadata array for this call (initial or trailing
   per \a is_trailing).  Trailing metadata is not published on the server
   side, nor when no trailing-metadata array was registered. */
static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
                                 int is_trailing) {
  if (b->list.count == 0) return;
  if (!call->is_client && is_trailing) return;
  if (is_trailing && call->buffered_metadata[1] == nullptr) return;
  GPR_TIMER_SCOPE("publish_app_metadata", 0);
  grpc_metadata_array* dest;
  grpc_metadata* mdusr;
  dest = call->buffered_metadata[is_trailing];
  if (dest->count + b->list.count > dest->capacity) {
    /* Grow geometrically (at least 1.5x) so repeated publishes amortize. */
    dest->capacity =
        GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
    dest->metadata = static_cast<grpc_metadata*>(
        gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
  }
  for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
    mdusr = &dest->metadata[dest->count++];
    /* we pass back borrowed slices that are valid whilst the call is valid */
    mdusr->key = GRPC_MDKEY(l->md);
    mdusr->value = GRPC_MDVALUE(l->md);
  }
}
1020
/* Filters a received initial metadata batch: consumes the compression-related
   elements (removing them from \a b), records the announced incoming
   compression algorithms and the peer's accepted-encodings bitset on
   \a call, then publishes the remaining metadata to the application. */
static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
  if (b->idx.named.content_encoding != nullptr) {
    GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
    set_incoming_stream_compression_algorithm(
        call, decode_stream_compression(b->idx.named.content_encoding->md));
    grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
  }
  if (b->idx.named.grpc_encoding != nullptr) {
    GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
    set_incoming_message_compression_algorithm(
        call, decode_message_compression(b->idx.named.grpc_encoding->md));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
  }
  /* Default of 1u == only the "none" algorithm accepted when the peer sent
     no accept-encoding metadata. */
  uint32_t message_encodings_accepted_by_peer = 1u;
  uint32_t stream_encodings_accepted_by_peer = 1u;
  if (b->idx.named.grpc_accept_encoding != nullptr) {
    GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
    set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
                                   &message_encodings_accepted_by_peer, false);
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
  }
  if (b->idx.named.accept_encoding != nullptr) {
    GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
    set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
                                   &stream_encodings_accepted_by_peer, true);
    grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
  }
  /* Fold the two per-kind bitsets into one grpc_compression bitset. */
  call->encodings_accepted_by_peer =
      grpc_compression_bitset_from_message_stream_compression_bitset(
          message_encodings_accepted_by_peer,
          stream_encodings_accepted_by_peer);
  publish_app_metadata(call, b, false);
}
1054
/* Filters a received trailing metadata batch: derives the call's final
   status from either \a batch_error or the grpc-status/grpc-message metadata
   (which are removed from \a b), then publishes the remaining metadata to
   the application.  Takes ownership of \a batch_error. */
static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
                                 grpc_error* batch_error) {
  grpc_call* call = static_cast<grpc_call*>(args);
  if (batch_error != GRPC_ERROR_NONE) {
    /* A transport-level error takes precedence over metadata status. */
    set_final_status(call, batch_error);
  } else if (b->idx.named.grpc_status != nullptr) {
    grpc_status_code status_code =
        grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
    grpc_error* error = GRPC_ERROR_NONE;
    if (status_code != GRPC_STATUS_OK) {
      /* Build an error carrying the peer address and the status code. */
      char* peer = grpc_call_get_peer(call);
      error = grpc_error_set_int(
          GRPC_ERROR_CREATE_FROM_COPIED_STRING(
              absl::StrCat("Error received from peer ", peer).c_str()),
          GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
      gpr_free(peer);
    }
    if (b->idx.named.grpc_message != nullptr) {
      /* Attach the peer-provided message (slice ref is owned by error). */
      error = grpc_error_set_str(
          error, GRPC_ERROR_STR_GRPC_MESSAGE,
          grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
      grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
    } else if (error != GRPC_ERROR_NONE) {
      error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                 grpc_empty_slice());
    }
    /* set_final_status takes its own ref; release our local one after. */
    set_final_status(call, GRPC_ERROR_REF(error));
    grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
    GRPC_ERROR_UNREF(error);
  } else if (!call->is_client) {
    /* Server side needs no incoming status. */
    set_final_status(call, GRPC_ERROR_NONE);
  } else {
    /* Client received trailing metadata without a grpc-status: report
       UNKNOWN per gRPC semantics. */
    gpr_log(GPR_DEBUG,
            "Received trailing metadata with no error and no status");
    set_final_status(
        call, grpc_error_set_int(
                  GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
                  GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
  }
  publish_app_metadata(call, b, true);
}
1096
/* Returns the per-call memory arena. */
grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1098
/* Returns the channel-filter call stack co-allocated with \a call. */
grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
  return CALL_STACK_FROM_CALL(call);
}
1102
1103 /*******************************************************************************
1104 * BATCH API IMPLEMENTATION
1105 */
1106
are_write_flags_valid(uint32_t flags)1107 static bool are_write_flags_valid(uint32_t flags) {
1108 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1109 const uint32_t allowed_write_positions =
1110 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1111 const uint32_t invalid_positions = ~allowed_write_positions;
1112 return !(flags & invalid_positions);
1113 }
1114
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1115 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1116 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1117 uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1118 if (!is_client) {
1119 invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1120 }
1121 return !(flags & invalid_positions);
1122 }
1123
/* Maps an op type to its slot index in call->active_batches.  Ops that can
   never legally appear in the same batch (the client-only and server-only
   variants of close/status) share a slot. */
static size_t batch_slot_for_op(grpc_op_type type) {
  switch (type) {
    case GRPC_OP_SEND_INITIAL_METADATA:
      return 0;
    case GRPC_OP_SEND_MESSAGE:
      return 1;
    case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
    case GRPC_OP_SEND_STATUS_FROM_SERVER:
      return 2;
    case GRPC_OP_RECV_INITIAL_METADATA:
      return 3;
    case GRPC_OP_RECV_MESSAGE:
      return 4;
    case GRPC_OP_RECV_CLOSE_ON_SERVER:
    case GRPC_OP_RECV_STATUS_ON_CLIENT:
      return 5;
  }
  /* Sentinel for an invalid enum value; never reached for valid input. */
  GPR_UNREACHABLE_CODE(return 123456789);
}
1143
/* Returns the batch_control for the batch led by ops[0], reusing the
   slot's previously-allocated object when its prior batch has completed
   (bctl->call cleared).  Returns nullptr if the slot's batch is still in
   flight, which callers surface as GRPC_CALL_ERROR_TOO_MANY_OPERATIONS. */
static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
                                                      const grpc_op* ops) {
  size_t slot_idx = batch_slot_for_op(ops[0].op);
  batch_control** pslot = &call->active_batches[slot_idx];
  batch_control* bctl;
  if (*pslot != nullptr) {
    bctl = *pslot;
    if (bctl->call != nullptr) {
      /* Previous batch on this slot has not completed yet. */
      return nullptr;
    }
    /* Arena-allocated: destroy and reset in place rather than freeing. */
    bctl->~batch_control();
    bctl->op = {};
  } else {
    bctl = call->arena->New<batch_control>();
    *pslot = bctl;
  }
  bctl->call = call;
  bctl->op.payload = &call->stream_op_payload;
  return bctl;
}
1164
/* Completion-queue done callback for a batch: clears bctl->call (marking the
   batch_control slot reusable) and drops the "completion" call ref. */
static void finish_batch_completion(void* user_data,
                                    grpc_cq_completion* /*storage*/) {
  batch_control* bctl = static_cast<batch_control*>(user_data);
  grpc_call* call = bctl->call;
  bctl->call = nullptr;
  GRPC_CALL_INTERNAL_UNREF(call, "completion");
}
1172
/* Releases the error accumulated on \a bctl and resets the atomic slot to
   GRPC_ERROR_NONE so the batch_control can be reused. */
static void reset_batch_errors(batch_control* bctl) {
  GRPC_ERROR_UNREF(
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
  gpr_atm_rel_store(&bctl->batch_error,
                    reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
}
1179
/* Finalizes a batch once all its steps are done: tears down per-op state,
   converts the accumulated batch error into the completion outcome, and
   notifies the application via either the notify closure or the call's
   completion queue. */
static void post_batch_completion(batch_control* bctl) {
  grpc_call* next_child_call;
  grpc_call* call = bctl->call;
  /* Take our own ref on the accumulated error; the stored ref is released
     by reset_batch_errors() below. */
  grpc_error* error = GRPC_ERROR_REF(
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));

  if (bctl->op.send_initial_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
  }
  if (bctl->op.send_message) {
    /* Surfacing a write after the transport closed the stream as an error,
       even if the batch otherwise succeeded. */
    if (bctl->op.payload->send_message.stream_write_closed &&
        error == GRPC_ERROR_NONE) {
      error = grpc_error_add_child(
          error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                     "Attempt to send message after stream was closed."));
    }
    call->sending_message = false;
  }
  if (bctl->op.send_trailing_metadata) {
    grpc_metadata_batch_destroy(
        &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
  }
  if (bctl->op.recv_trailing_metadata) {
    /* propagate cancellation to any interested children */
    gpr_atm_rel_store(&call->received_final_op_atm, 1);
    parent_call* pc = get_parent_call(call);
    if (pc != nullptr) {
      grpc_call* child;
      gpr_mu_lock(&pc->child_list_mu);
      child = pc->first_child;
      if (child != nullptr) {
        /* Walk the circular sibling list exactly once. */
        do {
          next_child_call = child->child->sibling_next;
          if (child->cancellation_is_inherited) {
            GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
            cancel_with_error(child, GRPC_ERROR_CANCELLED);
            GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
          }
          child = next_child_call;
        } while (child != pc->first_child);
      }
      gpr_mu_unlock(&pc->child_list_mu);
    }
    /* A recv_trailing_metadata batch always completes "successfully"; the
       actual status is delivered through the trailing metadata. */
    GRPC_ERROR_UNREF(error);
    error = GRPC_ERROR_NONE;
  }
  if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
      *call->receiving_buffer != nullptr) {
    /* Don't hand a partially-received buffer to the application. */
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
  }
  reset_batch_errors(bctl);

  if (bctl->completion_data.notify_tag.is_closure) {
    /* unrefs error */
    bctl->call = nullptr;
    grpc_core::Closure::Run(
        DEBUG_LOCATION,
        static_cast<grpc_closure*>(bctl->completion_data.notify_tag.tag),
        error);
    GRPC_CALL_INTERNAL_UNREF(call, "completion");
  } else {
    /* unrefs error */
    grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
                   finish_batch_completion, bctl,
                   &bctl->completion_data.cq_completion);
  }
}
1249
/* Marks one pending step of the batch as finished; once the last outstanding
   step completes, posts the batch completion. */
static void finish_batch_step(batch_control* bctl) {
  if (GPR_UNLIKELY(bctl->completed_batch_step())) {
    post_batch_completion(bctl);
  }
}
1255
/* Pulls slices from the call's receiving stream into the application's
   byte buffer until the full message length is buffered (finishing the
   batch step), the stream reports data is not yet ready (Next() returning
   false re-arms receiving_slice_ready and we return), or Pull() fails (the
   partial buffer is discarded and the step finished). */
static void continue_receiving_slices(batch_control* bctl) {
  grpc_error* error;
  grpc_call* call = bctl->call;
  for (;;) {
    size_t remaining = call->receiving_stream->length() -
                       (*call->receiving_buffer)->data.raw.slice_buffer.length;
    if (remaining == 0) {
      /* Whole message buffered: this receive is done. */
      call->receiving_message = false;
      call->receiving_stream.reset();
      finish_batch_step(bctl);
      return;
    }
    if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
      error = call->receiving_stream->Pull(&call->receiving_slice);
      if (error == GRPC_ERROR_NONE) {
        grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                              call->receiving_slice);
      } else {
        /* Pull failed: drop the partial message and finish the step. */
        call->receiving_stream.reset();
        grpc_byte_buffer_destroy(*call->receiving_buffer);
        *call->receiving_buffer = nullptr;
        call->receiving_message = false;
        finish_batch_step(bctl);
        GRPC_ERROR_UNREF(error);
        return;
      }
    } else {
      /* Data not ready yet; receiving_slice_ready will be invoked later. */
      return;
    }
  }
}
1287
/* Callback invoked when the receiving stream has more data available after
   a Next() returned false.  On success, appends the pulled slice and resumes
   continue_receiving_slices(); on failure, discards the partial message and
   finishes the batch step.  Errors returned by Pull() (as opposed to the
   incoming \a error) are owned here and must be unreffed manually. */
static void receiving_slice_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  bool release_error = false;

  if (error == GRPC_ERROR_NONE) {
    grpc_slice slice;
    error = call->receiving_stream->Pull(&slice);
    if (error == GRPC_ERROR_NONE) {
      grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
                            slice);
      continue_receiving_slices(bctl);
    } else {
      /* Error returned by ByteStream::Pull() needs to be released manually */
      release_error = true;
    }
  }

  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
      GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
    }
    /* Drop the partial message and complete the step. */
    call->receiving_stream.reset();
    grpc_byte_buffer_destroy(*call->receiving_buffer);
    *call->receiving_buffer = nullptr;
    call->receiving_message = false;
    finish_batch_step(bctl);
    if (release_error) {
      GRPC_ERROR_UNREF(error);
    }
  }
}
1320
/* Begins delivering a received message to the application once initial
   metadata has been processed.  A null receiving_stream means end-of-stream:
   the application's buffer pointer is set to nullptr and the step finishes.
   Otherwise allocates a (possibly compressed) byte buffer and starts pulling
   slices into it. */
static void process_data_after_md(batch_control* bctl) {
  grpc_call* call = bctl->call;
  if (call->receiving_stream == nullptr) {
    *call->receiving_buffer = nullptr;
    call->receiving_message = false;
    finish_batch_step(bctl);
  } else {
    call->test_only_last_message_flags = call->receiving_stream->flags();
    if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
        (call->incoming_message_compression_algorithm >
         GRPC_MESSAGE_COMPRESS_NONE)) {
      /* Message arrived compressed: tag the buffer with the algorithm so the
         application (or a later filter) can decompress. */
      grpc_compression_algorithm algo;
      GPR_ASSERT(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &algo, call->incoming_message_compression_algorithm,
              (grpc_stream_compression_algorithm)0));
      *call->receiving_buffer =
          grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
    } else {
      *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
    }
    GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
                      grpc_schedule_on_exec_ctx);
    continue_receiving_slices(bctl);
  }
}
1347
/* Transport callback for a recv_message op.  Records the first error on the
   batch and cancels the call on failure.  On the success path, attempts to
   defer message processing until initial metadata has been received by
   parking this bctl in call->recv_state; if that hand-off fails (or there is
   an error / end-of-stream), processes the message immediately. */
static void receiving_stream_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  if (error != GRPC_ERROR_NONE) {
    call->receiving_stream.reset();
    /* Only the first error is kept on the batch. */
    if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
        GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  /* If recv_state is RECV_NONE, we will save the batch_control
   * object with rel_cas, and will not use it after the cas. Its corresponding
   * acq_load is in receiving_initial_metadata_ready() */
  if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
      !gpr_atm_rel_cas(&call->recv_state, RECV_NONE,
                       reinterpret_cast<gpr_atm>(bctlp))) {
    process_data_after_md(bctl);
  }
}
1369
// The recv_message_ready callback used when sending a batch containing
// a recv_message op down the filter stack. Yields the call combiner
// before processing the received message.
static void receiving_stream_ready_in_call_combiner(void* bctlp,
                                                    grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  /* Release the call combiner first; receiving_stream_ready() may run
     outside it (e.g. via the deferred recv_state hand-off). */
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
  receiving_stream_ready(bctlp, error);
}
1380
/* Cold path: the peer set both stream- and message-compression on the same
   incoming stream, which is not allowed; logs and cancels with INTERNAL. */
static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call* call) {
  std::string error_msg = absl::StrFormat(
      "Incoming stream has both stream compression (%d) and message "
      "compression (%d).",
      call->incoming_stream_compression_algorithm,
      call->incoming_message_compression_algorithm);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
}
1391
1392 static void GPR_ATTRIBUTE_NOINLINE
handle_error_parsing_compression_algorithm(grpc_call * call)1393 handle_error_parsing_compression_algorithm(grpc_call* call) {
1394 std::string error_msg = absl::StrFormat(
1395 "Error in incoming message compression (%d) or stream "
1396 "compression (%d).",
1397 call->incoming_stream_compression_algorithm,
1398 call->incoming_message_compression_algorithm);
1399 cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1400 }
1401
/* Cold path: the decoded compression algorithm value is out of range;
   logs and cancels with UNIMPLEMENTED. */
static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  std::string error_msg = absl::StrFormat(
      "Invalid compression algorithm value '%d'.", compression_algorithm);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
}
1409
/* Cold path: the peer used an algorithm that is disabled by this channel's
   compression options; logs and cancels with UNIMPLEMENTED. */
static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  const char* algo_name = nullptr;
  grpc_compression_algorithm_name(compression_algorithm, &algo_name);
  std::string error_msg =
      absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
  gpr_log(GPR_ERROR, "%s", error_msg.c_str());
  cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
}
1419
/* Cold path: the algorithm used is not in the accepted-encodings bitset.
   Note this only logs (under the compression trace flag); unlike the other
   handlers it does NOT cancel the call. */
static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
    grpc_call* call, grpc_compression_algorithm compression_algorithm) {
  const char* algo_name = nullptr;
  grpc_compression_algorithm_name(compression_algorithm, &algo_name);
  gpr_log(GPR_ERROR,
          "Compression algorithm ('%s') not present in the bitset of "
          "accepted encodings ('0x%x')",
          algo_name, call->encodings_accepted_by_peer);
}
1429
/* Validates the compression configuration extracted from received initial
   metadata: rejects simultaneous stream+message compression, unmappable or
   out-of-range algorithms, and algorithms disabled on this channel (all via
   the cold handle_* helpers, which cancel the call); warns, under the
   compression trace flag, if the algorithm was never advertised as accepted. */
static void validate_filtered_metadata(batch_control* bctl) {
  grpc_compression_algorithm compression_algorithm;
  grpc_call* call = bctl->call;
  if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
                       GRPC_STREAM_COMPRESS_NONE &&
                   call->incoming_message_compression_algorithm !=
                       GRPC_MESSAGE_COMPRESS_NONE)) {
    handle_both_stream_and_msg_compression_set(call);
  } else if (
      GPR_UNLIKELY(
          grpc_compression_algorithm_from_message_stream_compression_algorithm(
              &compression_algorithm,
              call->incoming_message_compression_algorithm,
              call->incoming_stream_compression_algorithm) == 0)) {
    handle_error_parsing_compression_algorithm(call);
  } else {
    const grpc_compression_options compression_options =
        grpc_channel_compression_options(call->channel);
    if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
      handle_invalid_compression(call, compression_algorithm);
    } else if (GPR_UNLIKELY(
                   grpc_compression_options_is_algorithm_enabled_internal(
                       &compression_options, compression_algorithm) == 0)) {
      /* check if algorithm is supported by current channel config */
      handle_compression_algorithm_disabled(call, compression_algorithm);
    }
    /* GRPC_COMPRESS_NONE is always set. */
    GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
    if (GPR_UNLIKELY(!GPR_BITGET(call->encodings_accepted_by_peer,
                                 compression_algorithm))) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
        handle_compression_algorithm_not_accepted(call, compression_algorithm);
      }
    }
  }
}
1466
/* Transport callback for a recv_initial_metadata op.  On success, filters
   and validates the received metadata (and, server-side, adopts the peer's
   deadline); on failure, records the first error on the batch and cancels.
   Then resolves the race with receiving_stream_ready(): if a message arrived
   first, its parked batch_control is retrieved from call->recv_state and its
   processing is resumed via a closure; otherwise recv_state is advanced to
   RECV_INITIAL_METADATA_FIRST.  See receiving_stream_ready() for the
   matching rel_cas. */
static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;

  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");

  if (error == GRPC_ERROR_NONE) {
    grpc_metadata_batch* md =
        &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
    recv_initial_filter(call, md);

    /* TODO(ctiller): this could be moved into recv_initial_filter now */
    GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
    validate_filtered_metadata(bctl);

    if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
      call->send_deadline = md->deadline;
    }
  } else {
    /* Only the first error is kept on the batch. */
    if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
        GRPC_ERROR_NONE) {
      gpr_atm_rel_store(&bctl->batch_error,
                        reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
    }
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }

  grpc_closure* saved_rsr_closure = nullptr;
  while (true) {
    gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
    /* Should only receive initial metadata once */
    GPR_ASSERT(rsr_bctlp != 1);
    if (rsr_bctlp == 0) {
      /* We haven't seen initial metadata and messages before, thus initial
       * metadata is received first.
       * no_barrier_cas is used, as this function won't access the batch_control
       * object saved by receiving_stream_ready() if the initial metadata is
       * received first. */
      if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
                                 RECV_INITIAL_METADATA_FIRST)) {
        break;
      }
    } else {
      /* Already received messages */
      saved_rsr_closure =
          GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
                              grpc_schedule_on_exec_ctx);
      /* No need to modify recv_state */
      break;
    }
  }
  if (saved_rsr_closure != nullptr) {
    grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
                            GRPC_ERROR_REF(error));
  }

  finish_batch_step(bctl);
}
1525
/* Transport callback for a recv_trailing_metadata op: yields the call
   combiner, derives final status from the received batch (passing ownership
   of a ref on \a error to recv_trailing_filter), and finishes the step. */
static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
  grpc_metadata_batch* md =
      &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
  recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
  finish_batch_step(bctl);
}
1535
/* on_complete callback for the send-side of a batch: yields the call
   combiner, records the first error on the batch, cancels the call on
   failure, and finishes the step. */
static void finish_batch(void* bctlp, grpc_error* error) {
  batch_control* bctl = static_cast<batch_control*>(bctlp);
  grpc_call* call = bctl->call;
  GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
  if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
      GRPC_ERROR_NONE) {
    gpr_atm_rel_store(&bctl->batch_error,
                      reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
  }
  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  finish_batch_step(bctl);
}
1550
/* CQ done callback for the zero-op batch path: frees the heap-allocated
   grpc_cq_completion created in call_start_batch(). */
static void free_no_op_completion(void* /*p*/, grpc_cq_completion* completion) {
  gpr_free(completion);
}
1554
/* Validate a batch of application-level ops and rewrite it into a single
   grpc_transport_stream_op_batch which is then started on the call.
   Completion of the batch is signalled through notify_tag: if
   is_notify_tag_closure is nonzero it is run as a grpc_closure, otherwise it
   is posted to the call's completion queue.  Any validation failure is
   reported synchronously via the returned grpc_call_error, and every
   call-state mutation made while translating earlier ops in the batch is
   rolled back (see done_with_error below). */
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
                                        size_t nops, void* notify_tag,
                                        int is_notify_tag_closure) {
  GPR_TIMER_SCOPE("call_start_batch", 0);

  size_t i;
  const grpc_op* op;
  batch_control* bctl;
  bool has_send_ops = false;
  int num_recv_ops = 0;
  grpc_call_error error = GRPC_CALL_OK;
  grpc_transport_stream_op_batch* stream_op;
  grpc_transport_stream_op_batch_payload* stream_op_payload;

  GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);

  /* An empty batch performs no transport work, but the application still
     expects notify_tag to complete, so signal success immediately. */
  if (nops == 0) {
    if (!is_notify_tag_closure) {
      GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
      grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
                     free_no_op_completion, nullptr,
                     static_cast<grpc_cq_completion*>(
                         gpr_malloc(sizeof(grpc_cq_completion))));
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION,
                              static_cast<grpc_closure*>(notify_tag),
                              GRPC_ERROR_NONE);
    }
    error = GRPC_CALL_OK;
    goto done;
  }

  /* Acquire (or reuse) the batch_control that tracks this batch's pending
     completion steps; nullptr means a conflicting batch is already in
     flight. */
  bctl = reuse_or_allocate_batch_control(call, ops);
  if (bctl == nullptr) {
    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
  }
  bctl->completion_data.notify_tag.tag = notify_tag;
  bctl->completion_data.notify_tag.is_closure =
      static_cast<uint8_t>(is_notify_tag_closure != 0);

  stream_op = &bctl->op;
  stream_op_payload = &call->stream_op_payload;

  /* rewrite batch ops into a transport op */
  for (i = 0; i < nops; i++) {
    op = &ops[i];
    /* op->reserved must be cleared by the caller per the public API. */
    if (op->reserved != nullptr) {
      error = GRPC_CALL_ERROR;
      goto done_with_error;
    }
    switch (op->op) {
      case GRPC_OP_SEND_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->sent_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        // TODO(juanlishen): If the user has already specified a compression
        // algorithm by setting the initial metadata with key of
        // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
        // with the compression algorithm mapped from compression level.
        /* process compression level */
        grpc_metadata& compression_md = call->compression_md;
        compression_md.key = grpc_empty_slice();
        compression_md.value = grpc_empty_slice();
        compression_md.flags = 0;
        size_t additional_metadata_count = 0;
        grpc_compression_level effective_compression_level =
            GRPC_COMPRESS_LEVEL_NONE;
        bool level_set = false;
        /* A level set on the op itself wins over the channel's default. */
        if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
          effective_compression_level =
              op->data.send_initial_metadata.maybe_compression_level.level;
          level_set = true;
        } else {
          const grpc_compression_options copts =
              grpc_channel_compression_options(call->channel);
          if (copts.default_level.is_set) {
            level_set = true;
            effective_compression_level = copts.default_level.level;
          }
        }
        // Currently, only server side supports compression level setting.
        if (level_set && !call->is_client) {
          const grpc_compression_algorithm calgo =
              compression_algorithm_for_level_locked(
                  call, effective_compression_level);
          // The following metadata will be checked and removed by the message
          // compression filter. It will be used as the call's compression
          // algorithm.
          compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
          compression_md.value = grpc_compression_algorithm_slice(calgo);
          additional_metadata_count++;
        }
        /* Guard the static_cast<int> conversions below. */
        if (op->data.send_initial_metadata.count + additional_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_initial_metadata = true;
        call->sent_initial_metadata = true;
        if (!prepare_application_metadata(
                call, static_cast<int>(op->data.send_initial_metadata.count),
                op->data.send_initial_metadata.metadata, 0, call->is_client,
                &compression_md, static_cast<int>(additional_metadata_count))) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        /* TODO(ctiller): just make these the same variable? */
        if (call->is_client) {
          call->metadata_batch[0][0].deadline = call->send_deadline;
        }
        stream_op_payload->send_initial_metadata.send_initial_metadata =
            &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
            op->flags;
        if (call->is_client) {
          stream_op_payload->send_initial_metadata.peer_string =
              &call->peer_string;
        }
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_MESSAGE: {
        if (!are_write_flags_valid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (op->data.send_message.send_message == nullptr) {
          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
          goto done_with_error;
        }
        if (call->sending_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        uint32_t flags = op->flags;
        /* If the outgoing buffer is already compressed, mark it as so in the
           flags. These will be picked up by the compression filter and further
           (wasteful) attempts at compression skipped. */
        if (op->data.send_message.send_message->data.raw.compression >
            GRPC_COMPRESS_NONE) {
          flags |= GRPC_WRITE_INTERNAL_COMPRESS;
        }
        stream_op->send_message = true;
        call->sending_message = true;
        call->sending_stream.Init(
            &op->data.send_message.send_message->data.raw.slice_buffer, flags);
        stream_op_payload->send_message.send_message.reset(
            call->sending_stream.get());
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        /* A client half-close is expressed as empty trailing metadata. */
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_STATUS_FROM_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        if (op->data.send_status_from_server.trailing_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        /* Status (and optionally its details) ride in send_extra_metadata
           slots 0 and 1 ahead of the user-supplied trailing metadata. */
        GPR_ASSERT(call->send_extra_metadata_count == 0);
        call->send_extra_metadata_count = 1;
        call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
            op->data.send_status_from_server.status);
        grpc_error* status_error =
            op->data.send_status_from_server.status == GRPC_STATUS_OK
                ? GRPC_ERROR_NONE
                : grpc_error_set_int(
                      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                          "Server returned error"),
                      GRPC_ERROR_INT_GRPC_STATUS,
                      static_cast<intptr_t>(
                          op->data.send_status_from_server.status));
        if (op->data.send_status_from_server.status_details != nullptr) {
          call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
              GRPC_MDSTR_GRPC_MESSAGE,
              grpc_slice_ref_internal(
                  *op->data.send_status_from_server.status_details));
          call->send_extra_metadata_count++;
          /* Attach the human-readable detail string to the error object so
             it surfaces in error reporting as well as on the wire. */
          if (status_error != GRPC_ERROR_NONE) {
            char* msg = grpc_slice_to_c_string(
                GRPC_MDVALUE(call->send_extra_metadata[1].md));
            status_error =
                grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                   grpc_slice_from_copied_string(msg));
            gpr_free(msg);
          }
        }

        gpr_atm_rel_store(&call->status_error,
                          reinterpret_cast<gpr_atm>(status_error));
        if (!prepare_application_metadata(
                call,
                static_cast<int>(
                    op->data.send_status_from_server.trailing_metadata_count),
                op->data.send_status_from_server.trailing_metadata, 1, 1,
                nullptr, 0)) {
          /* Metadata was rejected: release the status elements taken above
             before rolling back. */
          for (int n = 0; n < call->send_extra_metadata_count; n++) {
            GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
          }
          call->send_extra_metadata_count = 0;
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->send_trailing_metadata.sent =
            &call->sent_server_trailing_metadata;
        has_send_ops = true;
        break;
      }
      case GRPC_OP_RECV_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->received_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->received_initial_metadata = true;
        call->buffered_metadata[0] =
            op->data.recv_initial_metadata.recv_initial_metadata;
        GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
                          receiving_initial_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op->recv_initial_metadata = true;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata =
            &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
            &call->receiving_initial_metadata_ready;
        if (!call->is_client) {
          stream_op_payload->recv_initial_metadata.peer_string =
              &call->peer_string;
        }
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_MESSAGE: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->receiving_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->receiving_message = true;
        stream_op->recv_message = true;
        call->receiving_buffer = op->data.recv_message.recv_message;
        stream_op_payload->recv_message.recv_message = &call->receiving_stream;
        GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
                          receiving_stream_ready_in_call_combiner, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_message.recv_message_ready =
            &call->receiving_stream_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_STATUS_ON_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->buffered_metadata[1] =
            op->data.recv_status_on_client.trailing_metadata;
        call->final_op.client.status = op->data.recv_status_on_client.status;
        call->final_op.client.status_details =
            op->data.recv_status_on_client.status_details;
        call->final_op.client.error_string =
            op->data.recv_status_on_client.error_string;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_CLOSE_ON_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->final_op.server.cancelled =
            op->data.recv_close_on_server.cancelled;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
    }
  }

  /* Hold the call alive until the batch completes. */
  GRPC_CALL_INTERNAL_REF(call, "completion");
  if (!is_notify_tag_closure) {
    GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
  }
  /* All send ops complete together as one step; each recv op is its own
     step.  The batch finishes when every step has completed. */
  bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);

  if (has_send_ops) {
    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
                      grpc_schedule_on_exec_ctx);
    stream_op->on_complete = &bctl->finish_batch;
  }

  gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
  execute_batch(call, stream_op, &bctl->start_batch);

done:
  return error;

done_with_error:
  /* reverse any mutations that occurred */
  if (stream_op->send_initial_metadata) {
    call->sent_initial_metadata = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
  }
  if (stream_op->send_message) {
    call->sending_message = false;
    call->sending_stream->Orphan();
  }
  if (stream_op->send_trailing_metadata) {
    call->sent_final_op = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
  }
  if (stream_op->recv_initial_metadata) {
    call->received_initial_metadata = false;
  }
  if (stream_op->recv_message) {
    call->receiving_message = false;
  }
  if (stream_op->recv_trailing_metadata) {
    call->requested_final_op = false;
  }
  goto done;
}
1966
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1967 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1968 size_t nops, void* tag, void* reserved) {
1969 grpc_call_error err;
1970
1971 GRPC_API_TRACE(
1972 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1973 "reserved=%p)",
1974 5, (call, ops, (unsigned long)nops, tag, reserved));
1975
1976 if (reserved != nullptr) {
1977 err = GRPC_CALL_ERROR;
1978 } else {
1979 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1980 grpc_core::ExecCtx exec_ctx;
1981 err = call_start_batch(call, ops, nops, tag, 0);
1982 }
1983
1984 return err;
1985 }
1986
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)1987 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1988 const grpc_op* ops,
1989 size_t nops,
1990 grpc_closure* closure) {
1991 return call_start_batch(call, ops, nops, closure, 1);
1992 }
1993
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1994 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1995 void* value, void (*destroy)(void* value)) {
1996 if (call->context[elem].destroy) {
1997 call->context[elem].destroy(call->context[elem].value);
1998 }
1999 call->context[elem].value = value;
2000 call->context[elem].destroy = destroy;
2001 }
2002
grpc_call_context_get(grpc_call * call,grpc_context_index elem)2003 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
2004 return call->context[elem].value;
2005 }
2006
grpc_call_is_client(grpc_call * call)2007 uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
2008
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)2009 grpc_compression_algorithm grpc_call_compression_for_level(
2010 grpc_call* call, grpc_compression_level level) {
2011 grpc_compression_algorithm algo =
2012 compression_algorithm_for_level_locked(call, level);
2013 return algo;
2014 }
2015
grpc_call_error_to_string(grpc_call_error error)2016 const char* grpc_call_error_to_string(grpc_call_error error) {
2017 switch (error) {
2018 case GRPC_CALL_ERROR:
2019 return "GRPC_CALL_ERROR";
2020 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2021 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2022 case GRPC_CALL_ERROR_ALREADY_FINISHED:
2023 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2024 case GRPC_CALL_ERROR_ALREADY_INVOKED:
2025 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2026 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2027 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2028 case GRPC_CALL_ERROR_INVALID_FLAGS:
2029 return "GRPC_CALL_ERROR_INVALID_FLAGS";
2030 case GRPC_CALL_ERROR_INVALID_MESSAGE:
2031 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2032 case GRPC_CALL_ERROR_INVALID_METADATA:
2033 return "GRPC_CALL_ERROR_INVALID_METADATA";
2034 case GRPC_CALL_ERROR_NOT_INVOKED:
2035 return "GRPC_CALL_ERROR_NOT_INVOKED";
2036 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2037 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2038 case GRPC_CALL_ERROR_NOT_ON_SERVER:
2039 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2040 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2041 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2042 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2043 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2044 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2045 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2046 case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2047 return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2048 case GRPC_CALL_OK:
2049 return "GRPC_CALL_OK";
2050 }
2051 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
2052 }
2053