1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #include <string>
28
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31
32 #include <grpc/compression.h>
33 #include <grpc/grpc.h>
34 #include <grpc/slice.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38
39 #include "src/core/lib/channel/channel_stack.h"
40 #include "src/core/lib/compression/algorithm_metadata.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/gpr/alloc.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gpr/time_precise.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/arena.h"
47 #include "src/core/lib/gprpp/manual_constructor.h"
48 #include "src/core/lib/gprpp/ref_counted.h"
49 #include "src/core/lib/iomgr/timer.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/lib/slice/slice_utils.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/call_test_only.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/server.h"
59 #include "src/core/lib/surface/validate_metadata.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/static_metadata.h"
63 #include "src/core/lib/transport/status_metadata.h"
64 #include "src/core/lib/transport/transport.h"
65
/** The maximum number of concurrent batches possible.
    Based upon the maximum number of individually queueable ops in the batch
    API:
      - initial metadata send
      - message send
      - status/close send (depending on client/server)
      - initial metadata recv
      - message recv
      - status/close recv (depending on client/server) */
#define MAX_CONCURRENT_BATCHES 6

/** Capacity of the grpc_call::send_extra_metadata array. */
#define MAX_SEND_EXTRA_METADATA_COUNT 3

// Number of grpc_linked_mdelem entries accounted for by
// grpc_call_get_initial_size_estimate(). Used to create arena for the first
// call.
#define ESTIMATED_MDELEM_COUNT 16
81
82 struct batch_control {
83 batch_control() = default;
84
85 grpc_call* call = nullptr;
86 grpc_transport_stream_op_batch op;
87 /* Share memory for cq_completion and notify_tag as they are never needed
88 simultaneously. Each byte used in this data structure count as six bytes
89 per call, so any savings we can make are worthwhile,
90
91 We use notify_tag to determine whether or not to send notification to the
92 completion queue. Once we've made that determination, we can reuse the
93 memory for cq_completion. */
94 union {
95 grpc_cq_completion cq_completion;
96 struct {
97 /* Any given op indicates completion by either (a) calling a closure or
98 (b) sending a notification on the call's completion queue. If
99 \a is_closure is true, \a tag indicates a closure to be invoked;
100 otherwise, \a tag indicates the tag to be used in the notification to
101 be sent to the completion queue. */
102 void* tag;
103 bool is_closure;
104 } notify_tag;
105 } completion_data;
106 grpc_closure start_batch;
107 grpc_closure finish_batch;
108 grpc_core::Atomic<intptr_t> steps_to_complete;
109 gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
set_num_steps_to_completebatch_control110 void set_num_steps_to_complete(uintptr_t steps) {
111 steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
112 }
completed_batch_stepbatch_control113 bool completed_batch_step() {
114 return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
115 }
116 };
117
118 struct parent_call {
parent_callparent_call119 parent_call() { gpr_mu_init(&child_list_mu); }
~parent_callparent_call120 ~parent_call() { gpr_mu_destroy(&child_list_mu); }
121
122 gpr_mu child_list_mu;
123 grpc_call* first_child = nullptr;
124 };
125
126 struct child_call {
child_callchild_call127 child_call(grpc_call* parent) : parent(parent) {}
128 grpc_call* parent;
129 /** siblings: children of the same parent form a list, and this list is
130 protected under
131 parent->mu */
132 grpc_call* sibling_next = nullptr;
133 grpc_call* sibling_prev = nullptr;
134 };
135
// Sentinel values for grpc_call::recv_state. Per the comment on that field,
// any other value stored there is a batch_control*.
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
138
/** The core call object. grpc_call_create() placement-constructs it at the
    start of an arena allocation that also holds the call stack (see
    CALL_STACK_FROM_CALL) and, for child calls, a child_call. */
struct grpc_call {
  /** Copies the arena, completion queue and channel from the arguments. The
      call is a client call iff \a args.server_transport_data is null. All
      four metadata batches start with an infinite-future deadline. */
  grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
      : arena(arena),
        cq(args.cq),
        channel(args.channel),
        is_client(args.server_transport_data == nullptr),
        stream_op_payload(context) {
    for (int i = 0; i < 2; i++) {
      for (int j = 0; j < 2; j++) {
        metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
      }
    }
  }

  /** Frees final_info.error_string, which destroy_call() fills in via
      grpc_error_get_status(). */
  ~grpc_call() {
    gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
  }

  /* External refcount, manipulated by grpc_call_ref()/grpc_call_unref(). */
  grpc_core::RefCount ext_ref;
  grpc_core::Arena* arena;
  grpc_core::CallCombiner call_combiner;
  grpc_completion_queue* cq;
  grpc_polling_entity pollent;
  grpc_channel* channel;
  gpr_cycle_counter start_time = gpr_get_cycle_counter();
  /* parent_call* */ gpr_atm parent_call_atm = 0;
  /* Non-null only for calls created with a parent. */
  child_call* child = nullptr;

  /* client or server call */
  bool is_client;
  /** has grpc_call_unref been called */
  bool destroy_called = false;
  /** flag indicating that cancellation is inherited */
  bool cancellation_is_inherited = false;
  /** which ops are in-flight */
  bool sent_initial_metadata = false;
  bool sending_message = false;
  bool sent_final_op = false;
  bool received_initial_metadata = false;
  bool receiving_message = false;
  bool requested_final_op = false;
  gpr_atm any_ops_sent_atm = 0;
  gpr_atm received_final_op_atm = 0;

  batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
  grpc_transport_stream_op_batch_payload stream_op_payload;

  /* first idx: is_receiving, second idx: is_trailing */
  grpc_metadata_batch metadata_batch[2][2] = {};

  /* Buffered read metadata waiting to be returned to the application.
     Element 0 is initial metadata, element 1 is trailing metadata. */
  grpc_metadata_array* buffered_metadata[2] = {};

  grpc_metadata compression_md;

  // A char* indicating the peer name.
  gpr_atm peer_string = 0;

  /* Call data used for reporting. Only valid after the call has
   * completed */
  grpc_call_final_info final_info;

  /* Compression algorithm for *incoming* data */
  grpc_message_compression_algorithm incoming_message_compression_algorithm =
      GRPC_MESSAGE_COMPRESS_NONE;
  /* Stream compression algorithm for *incoming* data */
  grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
      GRPC_STREAM_COMPRESS_NONE;
  /* Supported encodings (compression algorithms), a bitset.
   * Always support no compression. */
  uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
  /* Supported stream encodings (stream compression algorithms), a bitset */
  uint32_t stream_encodings_accepted_by_peer = 0;

  /* Contexts for various subsystems (security, tracing, ...). */
  grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};

  /* for the client, extra metadata is initial metadata; for the
     server, it's trailing metadata */
  grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
  /* No default initializer: grpc_call_create() assigns both fields below. */
  int send_extra_metadata_count;
  grpc_millis send_deadline;

  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;

  grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
  grpc_byte_buffer** receiving_buffer = nullptr;
  grpc_slice receiving_slice = grpc_empty_slice();
  grpc_closure receiving_slice_ready;
  grpc_closure receiving_stream_ready;
  grpc_closure receiving_initial_metadata_ready;
  grpc_closure receiving_trailing_metadata_ready;
  uint32_t test_only_last_message_flags = 0;
  // Status about operation of call
  bool sent_server_trailing_metadata = false;
  /* Flipped from 0 to 1 by the first cancel_with_error() on this call. */
  gpr_atm cancelled_with_error = 0;

  /* Closure that runs release_call(); initialized in destroy_call(). */
  grpc_closure release_call;

  /* Which member is active depends on is_client. */
  union {
    struct {
      grpc_status_code* status;
      grpc_slice* status_details;
      const char** error_string;
    } client;
    struct {
      int* cancelled;
      // backpointer to owning server if this is a server side call.
      grpc_server* server;
    } server;
  } final_op;
  /* A grpc_error* stored as a gpr_atm. */
  gpr_atm status_error = 0;

  /* recv_state can contain one of the following values:
     RECV_NONE                   :  no initial metadata and messages received
     RECV_INITIAL_METADATA_FIRST :  received initial metadata first
     a batch_control*            :  received messages first

                 +------1------RECV_NONE------3-----+
                 |                                  |
                 |                                  |
                 v                                  v
     RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
           |           ^                      |           ^
           |           |                      |           |
           +-----2-----+                      +-----4-----+

    For 1, 4: See receiving_initial_metadata_ready() function
    For 2, 3: See receiving_stream_ready() function */
  gpr_atm recv_state = 0;
};
271
// Trace flags named "call_error" and "compression"; both are constructed with
// `false` as their first argument.
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");
274
// A grpc_call and its grpc_call_stack share one allocation: the stack begins
// at the alignment-rounded size of grpc_call past the call's own address.
// These two macros convert between the two pointers.
#define CALL_STACK_FROM_CALL(call)   \
  (grpc_call_stack*)((char*)(call) + \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
#define CALL_FROM_CALL_STACK(call_stack) \
  (grpc_call*)(((char*)(call_stack)) -   \
               GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))

// Element \a idx of the call stack belonging to \a call.
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
// The grpc_call that owns the call stack whose top element is \a top_elem.
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
286
// Forward declarations of file-local helpers that are used before their
// definitions.
static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
                          grpc_closure* start_batch_closure);

static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description);
static void cancel_with_error(grpc_call* c, grpc_error* error);
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
298
add_init_error(grpc_error ** composite,grpc_error * new_err)299 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
300 if (new_err == GRPC_ERROR_NONE) return;
301 if (*composite == GRPC_ERROR_NONE)
302 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
303 *composite = grpc_error_add_child(*composite, new_err);
304 }
305
grpc_call_arena_alloc(grpc_call * call,size_t size)306 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
307 return call->arena->Alloc(size);
308 }
309
get_or_create_parent_call(grpc_call * call)310 static parent_call* get_or_create_parent_call(grpc_call* call) {
311 parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
312 if (p == nullptr) {
313 p = call->arena->New<parent_call>();
314 if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
315 (gpr_atm)p)) {
316 p->~parent_call();
317 p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
318 }
319 }
320 return p;
321 }
322
get_parent_call(grpc_call * call)323 static parent_call* get_parent_call(grpc_call* call) {
324 return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
325 }
326
grpc_call_get_initial_size_estimate()327 size_t grpc_call_get_initial_size_estimate() {
328 return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
329 sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
330 }
331
/** Creates a call described by \a args and stores it in \a *out_call.

    The call object, its call stack and (when \a args->parent is set) a
    child_call are carved out of one arena allocation. \a *out_call is
    assigned before any fallible step, so it is set even when an error is
    returned. If initialization produced an error the call is also cancelled
    with a ref to that error.

    Returns GRPC_ERROR_NONE, or a "Call creation failed" error whose children
    are the individual failures. */
grpc_error* grpc_call_create(const grpc_call_create_args* args,
                             grpc_call** out_call) {
  GPR_TIMER_SCOPE("grpc_call_create", 0);

  // Matched by the "call" unref in release_call().
  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");

  grpc_core::Arena* arena;
  grpc_call* call;
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_channel_stack* channel_stack =
      grpc_channel_get_channel_stack(args->channel);
  size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
  GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
  // Layout of the allocation: [grpc_call (rounded up)][call stack][child_call?]
  size_t call_and_stack_size =
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
      channel_stack->call_stack_size;
  size_t call_alloc_size =
      call_and_stack_size + (args->parent ? sizeof(child_call) : 0);

  std::pair<grpc_core::Arena*, void*> arena_with_call =
      grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
  arena = arena_with_call.first;
  call = new (arena_with_call.second) grpc_call(arena, *args);
  *out_call = call;
  grpc_slice path = grpc_empty_slice();
  if (call->is_client) {
    call->final_op.client.status_details = nullptr;
    call->final_op.client.status = nullptr;
    call->final_op.client.error_string = nullptr;
    GRPC_STATS_INC_CLIENT_CALLS_CREATED();
    GPR_ASSERT(args->add_initial_metadata_count <
               MAX_SEND_EXTRA_METADATA_COUNT);
    // Copy the caller-supplied metadata into the call; if one of the elements
    // is the path, keep a ref to its value for the call-stack init args.
    for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
      if (grpc_slice_eq_static_interned(
              GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
        path = grpc_slice_ref_internal(
            GRPC_MDVALUE(args->add_initial_metadata[i]));
      }
    }
    call->send_extra_metadata_count =
        static_cast<int>(args->add_initial_metadata_count);
  } else {
    GRPC_STATS_INC_SERVER_CALLS_CREATED();
    call->final_op.server.cancelled = nullptr;
    call->final_op.server.server = args->server;
    GPR_ASSERT(args->add_initial_metadata_count == 0);
    call->send_extra_metadata_count = 0;
  }

  grpc_millis send_deadline = args->send_deadline;
  bool immediately_cancel = false;

  if (args->parent != nullptr) {
    // The child_call lives in the tail of the same allocation, right after
    // the call stack.
    call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
                       call_and_stack_size) child_call(args->parent);

    // Matched by the "child" unref in grpc_call_unref().
    GRPC_CALL_INTERNAL_REF(args->parent, "child");
    // Only a client call may be parented, and only by a server call.
    GPR_ASSERT(call->is_client);
    GPR_ASSERT(!args->parent->is_client);

    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
      send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
    }
    /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     * GRPC_PROPAGATE_STATS_CONTEXT */
    /* TODO(ctiller): This should change to use the appropriate census start_op
     * call. */
    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
      if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
        add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Census tracing propagation requested "
                                   "without Census context propagation"));
      }
      // The tracing context value is copied from the parent with a null
      // destroy function.
      grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
                            args->parent->context[GRPC_CONTEXT_TRACING].value,
                            nullptr);
    } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
      add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                 "Census context propagation requested "
                                 "without Census tracing propagation"));
    }
    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
      call->cancellation_is_inherited = 1;
      // The parent has already received its final op, so cancel this child
      // once setup below has finished.
      if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
        immediately_cancel = true;
      }
    }
  }
  call->send_deadline = send_deadline;
  /* initial refcount dropped by grpc_call_unref */
  grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
                                      args->server_transport_data,
                                      call->context,
                                      path,
                                      call->start_time,
                                      send_deadline,
                                      call->arena,
                                      &call->call_combiner};
  add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
                                              call, &call_args));
  // Publish this call to parent only after the call stack has been initialized.
  if (args->parent != nullptr) {
    child_call* cc = call->child;
    parent_call* pc = get_or_create_parent_call(args->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (pc->first_child == nullptr) {
      // First child: a one-element circular list that links to itself.
      pc->first_child = call;
      cc->sibling_next = cc->sibling_prev = call;
    } else {
      // Splice the new call in just before first_child, i.e. at the tail of
      // the circular list.
      cc->sibling_next = pc->first_child;
      cc->sibling_prev = pc->first_child->child->sibling_prev;
      cc->sibling_next->child->sibling_prev =
          cc->sibling_prev->child->sibling_next = call;
    }
    gpr_mu_unlock(&pc->child_list_mu);
  }

  if (error != GRPC_ERROR_NONE) {
    // Pass a separate ref to the cancellation; `error` itself is returned to
    // the caller below.
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  if (immediately_cancel) {
    cancel_with_error(call, GRPC_ERROR_CANCELLED);
  }
  if (args->cq != nullptr) {
    GPR_ASSERT(args->pollset_set_alternative == nullptr &&
               "Only one of 'cq' and 'pollset_set_alternative' should be "
               "non-nullptr.");
    // Matched by the "bind" unref in destroy_call().
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != nullptr) {
    call->pollent = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  if (!grpc_polling_entity_is_empty(&call->pollent)) {
    grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                               &call->pollent);
  }

  // Report the new call to channelz, if a node is available.
  if (call->is_client) {
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      channelz_channel->RecordCallStarted();
    }
  } else {
    grpc_core::channelz::ServerNode* channelz_server =
        grpc_server_get_channelz_node(call->final_op.server.server);
    if (channelz_server != nullptr) {
      channelz_server->RecordCallStarted();
    }
  }

  // Drop the local ref on the path slice (it is the empty slice when no path
  // metadata was supplied).
  grpc_slice_unref_internal(path);

  return error;
}
491
grpc_call_set_completion_queue(grpc_call * call,grpc_completion_queue * cq)492 void grpc_call_set_completion_queue(grpc_call* call,
493 grpc_completion_queue* cq) {
494 GPR_ASSERT(cq);
495
496 if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
497 gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
498 abort();
499 }
500 call->cq = cq;
501 GRPC_CQ_INTERNAL_REF(cq, "bind");
502 call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
503 grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
504 &call->pollent);
505 }
506
// When NDEBUG is not defined, the internal ref/unref functions take an extra
// `reason` string that is forwarded to the call-stack refcount macros. With
// NDEBUG the parameter is compiled out and an empty string is passed instead.
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
/** Takes an internal ref on \a c by ref'ing its call stack. */
void grpc_call_internal_ref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
/** Drops an internal ref on \a c by unref'ing its call stack. */
void grpc_call_internal_unref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
520
release_call(void * call,grpc_error *)521 static void release_call(void* call, grpc_error* /*error*/) {
522 grpc_call* c = static_cast<grpc_call*>(call);
523 grpc_channel* channel = c->channel;
524 grpc_core::Arena* arena = c->arena;
525 c->~grpc_call();
526 grpc_channel_update_call_size_estimate(channel, arena->Destroy());
527 GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
528 }
529
destroy_call(void * call,grpc_error *)530 static void destroy_call(void* call, grpc_error* /*error*/) {
531 GPR_TIMER_SCOPE("destroy_call", 0);
532 size_t i;
533 int ii;
534 grpc_call* c = static_cast<grpc_call*>(call);
535 for (i = 0; i < 2; i++) {
536 grpc_metadata_batch_destroy(
537 &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
538 }
539 c->receiving_stream.reset();
540 parent_call* pc = get_parent_call(c);
541 if (pc != nullptr) {
542 pc->~parent_call();
543 }
544 for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
545 GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
546 }
547 for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
548 if (c->context[i].destroy) {
549 c->context[i].destroy(c->context[i].value);
550 }
551 }
552 if (c->cq) {
553 GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
554 }
555
556 grpc_error* status_error =
557 reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
558 grpc_error_get_status(status_error, c->send_deadline,
559 &c->final_info.final_status, nullptr, nullptr,
560 &(c->final_info.error_string));
561 GRPC_ERROR_UNREF(status_error);
562 c->final_info.stats.latency =
563 gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
564 grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
565 GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
566 grpc_schedule_on_exec_ctx));
567 }
568
grpc_call_ref(grpc_call * c)569 void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
570
/** Drops an external ref on \a c. Nothing further happens unless
    ext_ref.Unref() returns true (presumably meaning the last external ref is
    gone -- confirm against grpc_core::RefCount). In that case the call is
    unlinked from its parent's child list, cancelled if ops were sent but no
    final op was received, and its "destroy" internal ref is released. */
void grpc_call_unref(grpc_call* c) {
  if (GPR_LIKELY(!c->ext_ref.Unref())) return;

  GPR_TIMER_SCOPE("grpc_call_unref", 0);

  child_call* cc = c->child;
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));

  if (cc) {
    // Unlink this call from the parent's circular sibling list.
    parent_call* pc = get_parent_call(cc->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (c == pc->first_child) {
      pc->first_child = cc->sibling_next;
      // Still pointing at ourselves means this was the only child.
      if (c == pc->first_child) {
        pc->first_child = nullptr;
      }
    }
    cc->sibling_prev->child->sibling_next = cc->sibling_next;
    cc->sibling_next->child->sibling_prev = cc->sibling_prev;
    gpr_mu_unlock(&pc->child_list_mu);
    // Matches the "child" ref taken in grpc_call_create().
    GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
  }

  GPR_ASSERT(!c->destroy_called);
  c->destroy_called = 1;
  // Cancel only if at least one op was sent and no final op was received.
  bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
                gpr_atm_acq_load(&c->received_final_op_atm) == 0;
  if (cancel) {
    cancel_with_error(c, GRPC_ERROR_CANCELLED);
  } else {
    // Unset the call combiner cancellation closure.  This has the
    // effect of scheduling the previously set cancellation closure, if
    // any, so that it can release any internal references it may be
    // holding to the call stack. Also flush the closures on exec_ctx so that
    // filters that schedule cancel notification closures on exec_ctx do not
    // need to take a ref of the call stack to guarantee closure liveness.
    c->call_combiner.SetNotifyOnCancel(nullptr);
    grpc_core::ExecCtx::Get()->Flush();
  }
  GRPC_CALL_INTERNAL_UNREF(c, "destroy");
}
615
/** Cancels \a call with GRPC_ERROR_CANCELLED. \a reserved must be null.
    Always returns GRPC_CALL_OK. */
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
  GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
  GPR_ASSERT(!reserved);
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
  return GRPC_CALL_OK;
}
624
625 // This is called via the call combiner to start sending a batch down
626 // the filter stack.
execute_batch_in_call_combiner(void * arg,grpc_error *)627 static void execute_batch_in_call_combiner(void* arg, grpc_error* /*ignored*/) {
628 GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
629 grpc_transport_stream_op_batch* batch =
630 static_cast<grpc_transport_stream_op_batch*>(arg);
631 grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
632 grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
633 GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
634 elem->filter->start_transport_stream_op_batch(elem, batch);
635 }
636
// Starts \a batch on \a call's filter stack by entering the call combiner.
// The call is stashed in batch->handler_private.extra_arg so that
// execute_batch_in_call_combiner() can recover it.
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner.
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure) {
  batch->handler_private.extra_arg = call;
  GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
                           GRPC_ERROR_NONE, "executing batch");
}
648
grpc_call_get_peer(grpc_call * call)649 char* grpc_call_get_peer(grpc_call* call) {
650 char* peer_string = (char*)gpr_atm_acq_load(&call->peer_string);
651 if (peer_string != nullptr) return gpr_strdup(peer_string);
652 peer_string = grpc_channel_get_target(call->channel);
653 if (peer_string != nullptr) return peer_string;
654 return gpr_strdup("unknown");
655 }
656
grpc_call_from_top_element(grpc_call_element * elem)657 grpc_call* grpc_call_from_top_element(grpc_call_element* elem) {
658 return CALL_FROM_TOP_ELEM(elem);
659 }
660
661 /*******************************************************************************
662 * CANCELLATION
663 */
664
/** Cancels \a c with an error built from \a status and \a description.
    \a reserved must be null. Always returns GRPC_CALL_OK. */
grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
                                             grpc_status_code status,
                                             const char* description,
                                             void* reserved) {
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE(
      "grpc_call_cancel_with_status("
      "c=%p, status=%d, description=%s, reserved=%p)",
      4, (c, (int)status, description, reserved));
  GPR_ASSERT(reserved == nullptr);
  cancel_with_status(c, status, description);
  return GRPC_CALL_OK;
}
679
// Heap-allocated state for one cancel_stream batch: allocated in
// cancel_with_error() and freed in done_termination().
struct cancel_state {
  // The call being cancelled.
  grpc_call* call;
  // Closure used by execute_batch() to enter the call combiner.
  grpc_closure start_batch;
  // Completion closure of the cancel batch; runs done_termination().
  grpc_closure finish_batch;
};
685 // The on_complete callback used when sending a cancel_stream batch down
686 // the filter stack. Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error *)687 static void done_termination(void* arg, grpc_error* /*error*/) {
688 cancel_state* state = static_cast<cancel_state*>(arg);
689 GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
690 "on_complete for cancel_stream op");
691 GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
692 gpr_free(state);
693 }
694
// Cancels \a c with \a error by sending a cancel_stream batch down the filter
// stack. Only the first cancellation of a call takes effect: later calls just
// unref \a error and return. The passed-in \a error ref is consumed either
// way -- it is dropped on the early return, otherwise handed to the batch as
// cancel_error.
static void cancel_with_error(grpc_call* c, grpc_error* error) {
  // Flip cancelled_with_error from 0 to 1; failure means someone else has
  // already cancelled this call.
  if (!gpr_atm_rel_cas(&c->cancelled_with_error, 0, 1)) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Released in done_termination().
  GRPC_CALL_INTERNAL_REF(c, "termination");
  // Inform the call combiner of the cancellation, so that it can cancel
  // any in-flight asynchronous actions that may be holding the call
  // combiner.  This ensures that the cancel_stream batch can be sent
  // down the filter stack in a timely manner.
  c->call_combiner.Cancel(GRPC_ERROR_REF(error));
  // Freed in done_termination().
  cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
  state->call = c;
  GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_stream_op_batch* op =
      grpc_make_transport_stream_op(&state->finish_batch);
  op->cancel_stream = true;
  op->payload->cancel_stream.cancel_error = error;
  execute_batch(c, op, &state->start_batch);
}
716
/** Cancels \a call with GRPC_ERROR_CANCELLED. Unlike grpc_call_cancel(), this
    does not create an ExecCtx of its own. */
void grpc_call_cancel_internal(grpc_call* call) {
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
}
720
error_from_status(grpc_status_code status,const char * description)721 static grpc_error* error_from_status(grpc_status_code status,
722 const char* description) {
723 // copying 'description' is needed to ensure the grpc_call_cancel_with_status
724 // guarantee that can be short-lived.
725 return grpc_error_set_int(
726 grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
727 GRPC_ERROR_STR_GRPC_MESSAGE,
728 grpc_slice_from_copied_string(description)),
729 GRPC_ERROR_INT_GRPC_STATUS, status);
730 }
731
cancel_with_status(grpc_call * c,grpc_status_code status,const char * description)732 static void cancel_with_status(grpc_call* c, grpc_status_code status,
733 const char* description) {
734 cancel_with_error(c, error_from_status(status, description));
735 }
736
// Publishes the final outcome of \a call to the locations recorded in
// call->final_op and reports success or failure to channelz.
//
// Client: fills *status, *status_details and *error_string from \a error and
// stores \a error itself in call->status_error (it is not unref'd here).
// Server: sets *cancelled and unrefs \a error.
static void set_final_status(grpc_call* call, grpc_error* error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
    gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
    gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
  }
  if (call->is_client) {
    grpc_error_get_status(error, call->send_deadline,
                          call->final_op.client.status,
                          call->final_op.client.status_details, nullptr,
                          call->final_op.client.error_string);
    // explicitly take a ref
    grpc_slice_ref_internal(*call->final_op.client.status_details);
    // destroy_call() later loads this error and unrefs it.
    gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      if (*call->final_op.client.status != GRPC_STATUS_OK) {
        channelz_channel->RecordCallFailed();
      } else {
        channelz_channel->RecordCallSucceeded();
      }
    }
  } else {
    // A server call counts as cancelled if there was an error or the server
    // trailing metadata was never sent.
    *call->final_op.server.cancelled =
        error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
    grpc_core::channelz::ServerNode* channelz_server =
        grpc_server_get_channelz_node(call->final_op.server.server);
    if (channelz_server != nullptr) {
      if (*call->final_op.server.cancelled ||
          reinterpret_cast<grpc_error*>(
              gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
        channelz_server->RecordCallFailed();
      } else {
        channelz_server->RecordCallSucceeded();
      }
    }
    GRPC_ERROR_UNREF(error);
  }
}
776
777 /*******************************************************************************
778 * COMPRESSION
779 */
780
// Records \a algo as the message compression algorithm of incoming data on
// \a call. Asserts that \a algo is below
// GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT.
static void set_incoming_message_compression_algorithm(
    grpc_call* call, grpc_message_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_message_compression_algorithm = algo;
}
786
// Records \a algo as the stream compression algorithm of incoming data on
// \a call. Asserts that \a algo is below
// GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT.
static void set_incoming_stream_compression_algorithm(
    grpc_call* call, grpc_stream_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_stream_compression_algorithm = algo;
}
792
grpc_call_test_only_get_compression_algorithm(grpc_call * call)793 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
794 grpc_call* call) {
795 grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
796 grpc_compression_algorithm_from_message_stream_compression_algorithm(
797 &algorithm, call->incoming_message_compression_algorithm,
798 call->incoming_stream_compression_algorithm);
799 return algorithm;
800 }
801
compression_algorithm_for_level_locked(grpc_call * call,grpc_compression_level level)802 static grpc_compression_algorithm compression_algorithm_for_level_locked(
803 grpc_call* call, grpc_compression_level level) {
804 return grpc_compression_algorithm_for_level(level,
805 call->encodings_accepted_by_peer);
806 }
807
grpc_call_test_only_get_message_flags(grpc_call * call)808 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
809 uint32_t flags;
810 flags = call->test_only_last_message_flags;
811 return flags;
812 }
813
// Intentionally empty destroy callback. It is passed together with the
// accept-encoding user data in set_encodings_accepted_by_peer(), where the
// stored value is an integer packed into the pointer, so there is nothing to
// release.
static void destroy_encodings_accepted_by_peer(void* /*p*/) {}
815
set_encodings_accepted_by_peer(grpc_call *,grpc_mdelem mdel,uint32_t * encodings_accepted_by_peer,bool stream_encoding)816 static void set_encodings_accepted_by_peer(grpc_call* /*call*/,
817 grpc_mdelem mdel,
818 uint32_t* encodings_accepted_by_peer,
819 bool stream_encoding) {
820 size_t i;
821 uint32_t algorithm;
822 grpc_slice_buffer accept_encoding_parts;
823 grpc_slice accept_encoding_slice;
824 void* accepted_user_data;
825
826 accepted_user_data =
827 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
828 if (accepted_user_data != nullptr) {
829 *encodings_accepted_by_peer =
830 static_cast<uint32_t>(((uintptr_t)accepted_user_data) - 1);
831 return;
832 }
833
834 *encodings_accepted_by_peer = 0;
835
836 accept_encoding_slice = GRPC_MDVALUE(mdel);
837 grpc_slice_buffer_init(&accept_encoding_parts);
838 grpc_slice_split_without_space(accept_encoding_slice, ",",
839 &accept_encoding_parts);
840
841 GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
842 for (i = 0; i < accept_encoding_parts.count; i++) {
843 int r;
844 grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
845 if (!stream_encoding) {
846 r = grpc_message_compression_algorithm_parse(
847 accept_encoding_entry_slice,
848 reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
849 } else {
850 r = grpc_stream_compression_algorithm_parse(
851 accept_encoding_entry_slice,
852 reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
853 }
854 if (r) {
855 GPR_BITSET(encodings_accepted_by_peer, algorithm);
856 } else {
857 char* accept_encoding_entry_str =
858 grpc_slice_to_c_string(accept_encoding_entry_slice);
859 gpr_log(GPR_DEBUG,
860 "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
861 accept_encoding_entry_str);
862 gpr_free(accept_encoding_entry_str);
863 }
864 }
865
866 grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
867
868 grpc_mdelem_set_user_data(
869 mdel, destroy_encodings_accepted_by_peer,
870 (void*)((static_cast<uintptr_t>(*encodings_accepted_by_peer)) + 1));
871 }
872
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)873 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
874 uint32_t encodings_accepted_by_peer;
875 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
876 return encodings_accepted_by_peer;
877 }
878
879 grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call * call)880 grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
881 return call->incoming_stream_compression_algorithm;
882 }
883
linked_from_md(const grpc_metadata * md)884 static grpc_linked_mdelem* linked_from_md(const grpc_metadata* md) {
885 return (grpc_linked_mdelem*)&md->internal_data;
886 }
887
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)888 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
889 grpc_metadata* additional_metadata, int i,
890 int count) {
891 grpc_metadata* res =
892 i < count ? &metadata[i] : &additional_metadata[i - count];
893 GPR_ASSERT(res);
894 return res;
895 }
896
prepare_application_metadata(grpc_call * call,int count,grpc_metadata * metadata,int is_trailing,int prepend_extra_metadata,grpc_metadata * additional_metadata,int additional_metadata_count)897 static int prepare_application_metadata(grpc_call* call, int count,
898 grpc_metadata* metadata,
899 int is_trailing,
900 int prepend_extra_metadata,
901 grpc_metadata* additional_metadata,
902 int additional_metadata_count) {
903 int total_count = count + additional_metadata_count;
904 int i;
905 grpc_metadata_batch* batch =
906 &call->metadata_batch[0 /* is_receiving */][is_trailing];
907 for (i = 0; i < total_count; i++) {
908 const grpc_metadata* md =
909 get_md_elem(metadata, additional_metadata, i, count);
910 grpc_linked_mdelem* l = linked_from_md(md);
911 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
912 if (!GRPC_LOG_IF_ERROR("validate_metadata",
913 grpc_validate_header_key_is_legal(md->key))) {
914 break;
915 } else if (!grpc_is_binary_header_internal(md->key) &&
916 !GRPC_LOG_IF_ERROR(
917 "validate_metadata",
918 grpc_validate_header_nonbin_value_is_legal(md->value))) {
919 break;
920 } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
921 // HTTP2 hpack encoding has a maximum limit.
922 break;
923 }
924 l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
925 }
926 if (i != total_count) {
927 for (int j = 0; j < i; j++) {
928 const grpc_metadata* md =
929 get_md_elem(metadata, additional_metadata, j, count);
930 grpc_linked_mdelem* l = linked_from_md(md);
931 GRPC_MDELEM_UNREF(l->md);
932 }
933 return 0;
934 }
935 if (prepend_extra_metadata) {
936 if (call->send_extra_metadata_count == 0) {
937 prepend_extra_metadata = 0;
938 } else {
939 for (i = 0; i < call->send_extra_metadata_count; i++) {
940 GRPC_LOG_IF_ERROR("prepare_application_metadata",
941 grpc_metadata_batch_link_tail(
942 batch, &call->send_extra_metadata[i]));
943 }
944 }
945 }
946 for (i = 0; i < total_count; i++) {
947 grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
948 grpc_linked_mdelem* l = linked_from_md(md);
949 grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
950 if (error != GRPC_ERROR_NONE) {
951 GRPC_MDELEM_UNREF(l->md);
952 }
953 GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
954 }
955 call->send_extra_metadata_count = 0;
956
957 return 1;
958 }
959
decode_message_compression(grpc_mdelem md)960 static grpc_message_compression_algorithm decode_message_compression(
961 grpc_mdelem md) {
962 grpc_message_compression_algorithm algorithm =
963 grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
964 if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
965 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
966 gpr_log(GPR_ERROR,
967 "Invalid incoming message compression algorithm: '%s'. "
968 "Interpreting incoming data as uncompressed.",
969 md_c_str);
970 gpr_free(md_c_str);
971 return GRPC_MESSAGE_COMPRESS_NONE;
972 }
973 return algorithm;
974 }
975
decode_stream_compression(grpc_mdelem md)976 static grpc_stream_compression_algorithm decode_stream_compression(
977 grpc_mdelem md) {
978 grpc_stream_compression_algorithm algorithm =
979 grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
980 if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
981 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
982 gpr_log(GPR_ERROR,
983 "Invalid incoming stream compression algorithm: '%s'. Interpreting "
984 "incoming data as uncompressed.",
985 md_c_str);
986 gpr_free(md_c_str);
987 return GRPC_STREAM_COMPRESS_NONE;
988 }
989 return algorithm;
990 }
991
publish_app_metadata(grpc_call * call,grpc_metadata_batch * b,int is_trailing)992 static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
993 int is_trailing) {
994 if (b->list.count == 0) return;
995 if (!call->is_client && is_trailing) return;
996 if (is_trailing && call->buffered_metadata[1] == nullptr) return;
997 GPR_TIMER_SCOPE("publish_app_metadata", 0);
998 grpc_metadata_array* dest;
999 grpc_metadata* mdusr;
1000 dest = call->buffered_metadata[is_trailing];
1001 if (dest->count + b->list.count > dest->capacity) {
1002 dest->capacity =
1003 GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
1004 dest->metadata = static_cast<grpc_metadata*>(
1005 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
1006 }
1007 for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
1008 mdusr = &dest->metadata[dest->count++];
1009 /* we pass back borrowed slices that are valid whilst the call is valid */
1010 mdusr->key = GRPC_MDKEY(l->md);
1011 mdusr->value = GRPC_MDVALUE(l->md);
1012 }
1013 }
1014
recv_initial_filter(grpc_call * call,grpc_metadata_batch * b)1015 static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
1016 if (b->idx.named.content_encoding != nullptr) {
1017 GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
1018 set_incoming_stream_compression_algorithm(
1019 call, decode_stream_compression(b->idx.named.content_encoding->md));
1020 grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
1021 }
1022 if (b->idx.named.grpc_encoding != nullptr) {
1023 GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
1024 set_incoming_message_compression_algorithm(
1025 call, decode_message_compression(b->idx.named.grpc_encoding->md));
1026 grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
1027 }
1028 uint32_t message_encodings_accepted_by_peer = 1u;
1029 uint32_t stream_encodings_accepted_by_peer = 1u;
1030 if (b->idx.named.grpc_accept_encoding != nullptr) {
1031 GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
1032 set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
1033 &message_encodings_accepted_by_peer, false);
1034 grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
1035 }
1036 if (b->idx.named.accept_encoding != nullptr) {
1037 GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
1038 set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
1039 &stream_encodings_accepted_by_peer, true);
1040 grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
1041 }
1042 call->encodings_accepted_by_peer =
1043 grpc_compression_bitset_from_message_stream_compression_bitset(
1044 message_encodings_accepted_by_peer,
1045 stream_encodings_accepted_by_peer);
1046 publish_app_metadata(call, b, false);
1047 }
1048
recv_trailing_filter(void * args,grpc_metadata_batch * b,grpc_error * batch_error)1049 static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
1050 grpc_error* batch_error) {
1051 grpc_call* call = static_cast<grpc_call*>(args);
1052 if (batch_error != GRPC_ERROR_NONE) {
1053 set_final_status(call, batch_error);
1054 } else if (b->idx.named.grpc_status != nullptr) {
1055 grpc_status_code status_code =
1056 grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
1057 grpc_error* error = GRPC_ERROR_NONE;
1058 if (status_code != GRPC_STATUS_OK) {
1059 char* peer = grpc_call_get_peer(call);
1060 error = grpc_error_set_int(
1061 GRPC_ERROR_CREATE_FROM_COPIED_STRING(
1062 absl::StrCat("Error received from peer ", peer).c_str()),
1063 GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
1064 gpr_free(peer);
1065 }
1066 if (b->idx.named.grpc_message != nullptr) {
1067 error = grpc_error_set_str(
1068 error, GRPC_ERROR_STR_GRPC_MESSAGE,
1069 grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
1070 grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
1071 } else if (error != GRPC_ERROR_NONE) {
1072 error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
1073 grpc_empty_slice());
1074 }
1075 set_final_status(call, GRPC_ERROR_REF(error));
1076 grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
1077 GRPC_ERROR_UNREF(error);
1078 } else if (!call->is_client) {
1079 set_final_status(call, GRPC_ERROR_NONE);
1080 } else {
1081 gpr_log(GPR_DEBUG,
1082 "Received trailing metadata with no error and no status");
1083 set_final_status(
1084 call, grpc_error_set_int(
1085 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
1086 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
1087 }
1088 publish_app_metadata(call, b, true);
1089 }
1090
grpc_call_get_arena(grpc_call * call)1091 grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1092
grpc_call_get_call_stack(grpc_call * call)1093 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
1094 return CALL_STACK_FROM_CALL(call);
1095 }
1096
1097 /*******************************************************************************
1098 * BATCH API IMPLEMENTATION
1099 */
1100
are_write_flags_valid(uint32_t flags)1101 static bool are_write_flags_valid(uint32_t flags) {
1102 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1103 const uint32_t allowed_write_positions =
1104 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1105 const uint32_t invalid_positions = ~allowed_write_positions;
1106 return !(flags & invalid_positions);
1107 }
1108
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1109 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1110 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1111 uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1112 if (!is_client) {
1113 invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1114 }
1115 return !(flags & invalid_positions);
1116 }
1117
batch_slot_for_op(grpc_op_type type)1118 static size_t batch_slot_for_op(grpc_op_type type) {
1119 switch (type) {
1120 case GRPC_OP_SEND_INITIAL_METADATA:
1121 return 0;
1122 case GRPC_OP_SEND_MESSAGE:
1123 return 1;
1124 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1125 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1126 return 2;
1127 case GRPC_OP_RECV_INITIAL_METADATA:
1128 return 3;
1129 case GRPC_OP_RECV_MESSAGE:
1130 return 4;
1131 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1132 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1133 return 5;
1134 }
1135 GPR_UNREACHABLE_CODE(return 123456789);
1136 }
1137
reuse_or_allocate_batch_control(grpc_call * call,const grpc_op * ops)1138 static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
1139 const grpc_op* ops) {
1140 size_t slot_idx = batch_slot_for_op(ops[0].op);
1141 batch_control** pslot = &call->active_batches[slot_idx];
1142 batch_control* bctl;
1143 if (*pslot != nullptr) {
1144 bctl = *pslot;
1145 if (bctl->call != nullptr) {
1146 return nullptr;
1147 }
1148 bctl->~batch_control();
1149 bctl->op = {};
1150 } else {
1151 bctl = call->arena->New<batch_control>();
1152 *pslot = bctl;
1153 }
1154 bctl->call = call;
1155 bctl->op.payload = &call->stream_op_payload;
1156 return bctl;
1157 }
1158
finish_batch_completion(void * user_data,grpc_cq_completion *)1159 static void finish_batch_completion(void* user_data,
1160 grpc_cq_completion* /*storage*/) {
1161 batch_control* bctl = static_cast<batch_control*>(user_data);
1162 grpc_call* call = bctl->call;
1163 bctl->call = nullptr;
1164 GRPC_CALL_INTERNAL_UNREF(call, "completion");
1165 }
1166
reset_batch_errors(batch_control * bctl)1167 static void reset_batch_errors(batch_control* bctl) {
1168 GRPC_ERROR_UNREF(
1169 reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1170 gpr_atm_rel_store(&bctl->batch_error,
1171 reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
1172 }
1173
post_batch_completion(batch_control * bctl)1174 static void post_batch_completion(batch_control* bctl) {
1175 grpc_call* next_child_call;
1176 grpc_call* call = bctl->call;
1177 grpc_error* error = GRPC_ERROR_REF(
1178 reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1179
1180 if (bctl->op.send_initial_metadata) {
1181 grpc_metadata_batch_destroy(
1182 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
1183 }
1184 if (bctl->op.send_message) {
1185 if (bctl->op.payload->send_message.stream_write_closed &&
1186 error == GRPC_ERROR_NONE) {
1187 error = grpc_error_add_child(
1188 error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1189 "Attempt to send message after stream was closed."));
1190 }
1191 call->sending_message = false;
1192 }
1193 if (bctl->op.send_trailing_metadata) {
1194 grpc_metadata_batch_destroy(
1195 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1196 }
1197 if (bctl->op.recv_trailing_metadata) {
1198 /* propagate cancellation to any interested children */
1199 gpr_atm_rel_store(&call->received_final_op_atm, 1);
1200 parent_call* pc = get_parent_call(call);
1201 if (pc != nullptr) {
1202 grpc_call* child;
1203 gpr_mu_lock(&pc->child_list_mu);
1204 child = pc->first_child;
1205 if (child != nullptr) {
1206 do {
1207 next_child_call = child->child->sibling_next;
1208 if (child->cancellation_is_inherited) {
1209 GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
1210 cancel_with_error(child, GRPC_ERROR_CANCELLED);
1211 GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
1212 }
1213 child = next_child_call;
1214 } while (child != pc->first_child);
1215 }
1216 gpr_mu_unlock(&pc->child_list_mu);
1217 }
1218 GRPC_ERROR_UNREF(error);
1219 error = GRPC_ERROR_NONE;
1220 }
1221 if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
1222 *call->receiving_buffer != nullptr) {
1223 grpc_byte_buffer_destroy(*call->receiving_buffer);
1224 *call->receiving_buffer = nullptr;
1225 }
1226 reset_batch_errors(bctl);
1227
1228 if (bctl->completion_data.notify_tag.is_closure) {
1229 /* unrefs error */
1230 bctl->call = nullptr;
1231 grpc_core::Closure::Run(DEBUG_LOCATION,
1232 (grpc_closure*)bctl->completion_data.notify_tag.tag,
1233 error);
1234 GRPC_CALL_INTERNAL_UNREF(call, "completion");
1235 } else {
1236 /* unrefs error */
1237 grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
1238 finish_batch_completion, bctl,
1239 &bctl->completion_data.cq_completion);
1240 }
1241 }
1242
finish_batch_step(batch_control * bctl)1243 static void finish_batch_step(batch_control* bctl) {
1244 if (GPR_UNLIKELY(bctl->completed_batch_step())) {
1245 post_batch_completion(bctl);
1246 }
1247 }
1248
continue_receiving_slices(batch_control * bctl)1249 static void continue_receiving_slices(batch_control* bctl) {
1250 grpc_error* error;
1251 grpc_call* call = bctl->call;
1252 for (;;) {
1253 size_t remaining = call->receiving_stream->length() -
1254 (*call->receiving_buffer)->data.raw.slice_buffer.length;
1255 if (remaining == 0) {
1256 call->receiving_message = 0;
1257 call->receiving_stream.reset();
1258 finish_batch_step(bctl);
1259 return;
1260 }
1261 if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
1262 error = call->receiving_stream->Pull(&call->receiving_slice);
1263 if (error == GRPC_ERROR_NONE) {
1264 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1265 call->receiving_slice);
1266 } else {
1267 call->receiving_stream.reset();
1268 grpc_byte_buffer_destroy(*call->receiving_buffer);
1269 *call->receiving_buffer = nullptr;
1270 call->receiving_message = 0;
1271 finish_batch_step(bctl);
1272 GRPC_ERROR_UNREF(error);
1273 return;
1274 }
1275 } else {
1276 return;
1277 }
1278 }
1279 }
1280
receiving_slice_ready(void * bctlp,grpc_error * error)1281 static void receiving_slice_ready(void* bctlp, grpc_error* error) {
1282 batch_control* bctl = static_cast<batch_control*>(bctlp);
1283 grpc_call* call = bctl->call;
1284 bool release_error = false;
1285
1286 if (error == GRPC_ERROR_NONE) {
1287 grpc_slice slice;
1288 error = call->receiving_stream->Pull(&slice);
1289 if (error == GRPC_ERROR_NONE) {
1290 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1291 slice);
1292 continue_receiving_slices(bctl);
1293 } else {
1294 /* Error returned by ByteStream::Pull() needs to be released manually */
1295 release_error = true;
1296 }
1297 }
1298
1299 if (error != GRPC_ERROR_NONE) {
1300 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
1301 GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1302 }
1303 call->receiving_stream.reset();
1304 grpc_byte_buffer_destroy(*call->receiving_buffer);
1305 *call->receiving_buffer = nullptr;
1306 call->receiving_message = 0;
1307 finish_batch_step(bctl);
1308 if (release_error) {
1309 GRPC_ERROR_UNREF(error);
1310 }
1311 }
1312 }
1313
process_data_after_md(batch_control * bctl)1314 static void process_data_after_md(batch_control* bctl) {
1315 grpc_call* call = bctl->call;
1316 if (call->receiving_stream == nullptr) {
1317 *call->receiving_buffer = nullptr;
1318 call->receiving_message = 0;
1319 finish_batch_step(bctl);
1320 } else {
1321 call->test_only_last_message_flags = call->receiving_stream->flags();
1322 if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
1323 (call->incoming_message_compression_algorithm >
1324 GRPC_MESSAGE_COMPRESS_NONE)) {
1325 grpc_compression_algorithm algo;
1326 GPR_ASSERT(
1327 grpc_compression_algorithm_from_message_stream_compression_algorithm(
1328 &algo, call->incoming_message_compression_algorithm,
1329 (grpc_stream_compression_algorithm)0));
1330 *call->receiving_buffer =
1331 grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
1332 } else {
1333 *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
1334 }
1335 GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
1336 grpc_schedule_on_exec_ctx);
1337 continue_receiving_slices(bctl);
1338 }
1339 }
1340
receiving_stream_ready(void * bctlp,grpc_error * error)1341 static void receiving_stream_ready(void* bctlp, grpc_error* error) {
1342 batch_control* bctl = static_cast<batch_control*>(bctlp);
1343 grpc_call* call = bctl->call;
1344 if (error != GRPC_ERROR_NONE) {
1345 call->receiving_stream.reset();
1346 if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1347 GRPC_ERROR_NONE) {
1348 gpr_atm_rel_store(&bctl->batch_error,
1349 reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1350 }
1351 cancel_with_error(call, GRPC_ERROR_REF(error));
1352 }
1353 /* If recv_state is RECV_NONE, we will save the batch_control
1354 * object with rel_cas, and will not use it after the cas. Its corresponding
1355 * acq_load is in receiving_initial_metadata_ready() */
1356 if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
1357 !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
1358 process_data_after_md(bctl);
1359 }
1360 }
1361
1362 // The recv_message_ready callback used when sending a batch containing
1363 // a recv_message op down the filter stack. Yields the call combiner
1364 // before processing the received message.
receiving_stream_ready_in_call_combiner(void * bctlp,grpc_error * error)1365 static void receiving_stream_ready_in_call_combiner(void* bctlp,
1366 grpc_error* error) {
1367 batch_control* bctl = static_cast<batch_control*>(bctlp);
1368 grpc_call* call = bctl->call;
1369 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
1370 receiving_stream_ready(bctlp, error);
1371 }
1372
1373 static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call * call)1374 handle_both_stream_and_msg_compression_set(grpc_call* call) {
1375 std::string error_msg = absl::StrFormat(
1376 "Incoming stream has both stream compression (%d) and message "
1377 "compression (%d).",
1378 call->incoming_stream_compression_algorithm,
1379 call->incoming_message_compression_algorithm);
1380 gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1381 cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1382 }
1383
1384 static void GPR_ATTRIBUTE_NOINLINE
handle_error_parsing_compression_algorithm(grpc_call * call)1385 handle_error_parsing_compression_algorithm(grpc_call* call) {
1386 std::string error_msg = absl::StrFormat(
1387 "Error in incoming message compression (%d) or stream "
1388 "compression (%d).",
1389 call->incoming_stream_compression_algorithm,
1390 call->incoming_message_compression_algorithm);
1391 cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1392 }
1393
handle_invalid_compression(grpc_call * call,grpc_compression_algorithm compression_algorithm)1394 static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
1395 grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1396 std::string error_msg = absl::StrFormat(
1397 "Invalid compression algorithm value '%d'.", compression_algorithm);
1398 gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1399 cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1400 }
1401
handle_compression_algorithm_disabled(grpc_call * call,grpc_compression_algorithm compression_algorithm)1402 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
1403 grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1404 const char* algo_name = nullptr;
1405 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1406 std::string error_msg =
1407 absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
1408 gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1409 cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1410 }
1411
handle_compression_algorithm_not_accepted(grpc_call * call,grpc_compression_algorithm compression_algorithm)1412 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
1413 grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1414 const char* algo_name = nullptr;
1415 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1416 gpr_log(GPR_ERROR,
1417 "Compression algorithm ('%s') not present in the bitset of "
1418 "accepted encodings ('0x%x')",
1419 algo_name, call->encodings_accepted_by_peer);
1420 }
1421
validate_filtered_metadata(batch_control * bctl)1422 static void validate_filtered_metadata(batch_control* bctl) {
1423 grpc_compression_algorithm compression_algorithm;
1424 grpc_call* call = bctl->call;
1425 if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
1426 GRPC_STREAM_COMPRESS_NONE &&
1427 call->incoming_message_compression_algorithm !=
1428 GRPC_MESSAGE_COMPRESS_NONE)) {
1429 handle_both_stream_and_msg_compression_set(call);
1430 } else if (
1431 GPR_UNLIKELY(
1432 grpc_compression_algorithm_from_message_stream_compression_algorithm(
1433 &compression_algorithm,
1434 call->incoming_message_compression_algorithm,
1435 call->incoming_stream_compression_algorithm) == 0)) {
1436 handle_error_parsing_compression_algorithm(call);
1437 } else {
1438 const grpc_compression_options compression_options =
1439 grpc_channel_compression_options(call->channel);
1440 if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
1441 handle_invalid_compression(call, compression_algorithm);
1442 } else if (GPR_UNLIKELY(
1443 grpc_compression_options_is_algorithm_enabled_internal(
1444 &compression_options, compression_algorithm) == 0)) {
1445 /* check if algorithm is supported by current channel config */
1446 handle_compression_algorithm_disabled(call, compression_algorithm);
1447 }
1448 /* GRPC_COMPRESS_NONE is always set. */
1449 GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
1450 if (GPR_UNLIKELY(!GPR_BITGET(call->encodings_accepted_by_peer,
1451 compression_algorithm))) {
1452 if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
1453 handle_compression_algorithm_not_accepted(call, compression_algorithm);
1454 }
1455 }
1456 }
1457 }
1458
receiving_initial_metadata_ready(void * bctlp,grpc_error * error)1459 static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
1460 batch_control* bctl = static_cast<batch_control*>(bctlp);
1461 grpc_call* call = bctl->call;
1462
1463 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
1464
1465 if (error == GRPC_ERROR_NONE) {
1466 grpc_metadata_batch* md =
1467 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1468 recv_initial_filter(call, md);
1469
1470 /* TODO(ctiller): this could be moved into recv_initial_filter now */
1471 GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
1472 validate_filtered_metadata(bctl);
1473
1474 if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
1475 call->send_deadline = md->deadline;
1476 }
1477 } else {
1478 if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1479 GRPC_ERROR_NONE) {
1480 gpr_atm_rel_store(&bctl->batch_error,
1481 reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1482 }
1483 cancel_with_error(call, GRPC_ERROR_REF(error));
1484 }
1485
1486 grpc_closure* saved_rsr_closure = nullptr;
1487 while (true) {
1488 gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
1489 /* Should only receive initial metadata once */
1490 GPR_ASSERT(rsr_bctlp != 1);
1491 if (rsr_bctlp == 0) {
1492 /* We haven't seen initial metadata and messages before, thus initial
1493 * metadata is received first.
1494 * no_barrier_cas is used, as this function won't access the batch_control
1495 * object saved by receiving_stream_ready() if the initial metadata is
1496 * received first. */
1497 if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
1498 RECV_INITIAL_METADATA_FIRST)) {
1499 break;
1500 }
1501 } else {
1502 /* Already received messages */
1503 saved_rsr_closure =
1504 GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
1505 grpc_schedule_on_exec_ctx);
1506 /* No need to modify recv_state */
1507 break;
1508 }
1509 }
1510 if (saved_rsr_closure != nullptr) {
1511 grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
1512 GRPC_ERROR_REF(error));
1513 }
1514
1515 finish_batch_step(bctl);
1516 }
1517
receiving_trailing_metadata_ready(void * bctlp,grpc_error * error)1518 static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
1519 batch_control* bctl = static_cast<batch_control*>(bctlp);
1520 grpc_call* call = bctl->call;
1521 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
1522 grpc_metadata_batch* md =
1523 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1524 recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
1525 finish_batch_step(bctl);
1526 }
1527
finish_batch(void * bctlp,grpc_error * error)1528 static void finish_batch(void* bctlp, grpc_error* error) {
1529 batch_control* bctl = static_cast<batch_control*>(bctlp);
1530 grpc_call* call = bctl->call;
1531 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
1532 if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1533 GRPC_ERROR_NONE) {
1534 gpr_atm_rel_store(&bctl->batch_error,
1535 reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1536 }
1537 if (error != GRPC_ERROR_NONE) {
1538 cancel_with_error(call, GRPC_ERROR_REF(error));
1539 }
1540 finish_batch_step(bctl);
1541 }
1542
free_no_op_completion(void *,grpc_cq_completion * completion)1543 static void free_no_op_completion(void* /*p*/, grpc_cq_completion* completion) {
1544 gpr_free(completion);
1545 }
1546
call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * notify_tag,int is_notify_tag_closure)1547 static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
1548 size_t nops, void* notify_tag,
1549 int is_notify_tag_closure) {
1550 GPR_TIMER_SCOPE("call_start_batch", 0);
1551
1552 size_t i;
1553 const grpc_op* op;
1554 batch_control* bctl;
1555 bool has_send_ops = false;
1556 int num_recv_ops = 0;
1557 grpc_call_error error = GRPC_CALL_OK;
1558 grpc_transport_stream_op_batch* stream_op;
1559 grpc_transport_stream_op_batch_payload* stream_op_payload;
1560
1561 GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);
1562
1563 if (nops == 0) {
1564 if (!is_notify_tag_closure) {
1565 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1566 grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
1567 free_no_op_completion, nullptr,
1568 static_cast<grpc_cq_completion*>(
1569 gpr_malloc(sizeof(grpc_cq_completion))));
1570 } else {
1571 grpc_core::Closure::Run(DEBUG_LOCATION, (grpc_closure*)notify_tag,
1572 GRPC_ERROR_NONE);
1573 }
1574 error = GRPC_CALL_OK;
1575 goto done;
1576 }
1577
1578 bctl = reuse_or_allocate_batch_control(call, ops);
1579 if (bctl == nullptr) {
1580 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1581 }
1582 bctl->completion_data.notify_tag.tag = notify_tag;
1583 bctl->completion_data.notify_tag.is_closure =
1584 static_cast<uint8_t>(is_notify_tag_closure != 0);
1585
1586 stream_op = &bctl->op;
1587 stream_op_payload = &call->stream_op_payload;
1588
1589 /* rewrite batch ops into a transport op */
1590 for (i = 0; i < nops; i++) {
1591 op = &ops[i];
1592 if (op->reserved != nullptr) {
1593 error = GRPC_CALL_ERROR;
1594 goto done_with_error;
1595 }
1596 switch (op->op) {
1597 case GRPC_OP_SEND_INITIAL_METADATA: {
1598 /* Flag validation: currently allow no flags */
1599 if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
1600 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1601 goto done_with_error;
1602 }
1603 if (call->sent_initial_metadata) {
1604 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1605 goto done_with_error;
1606 }
1607 // TODO(juanlishen): If the user has already specified a compression
1608 // algorithm by setting the initial metadata with key of
1609 // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
1610 // with the compression algorithm mapped from compression level.
1611 /* process compression level */
1612 grpc_metadata& compression_md = call->compression_md;
1613 compression_md.key = grpc_empty_slice();
1614 compression_md.value = grpc_empty_slice();
1615 compression_md.flags = 0;
1616 size_t additional_metadata_count = 0;
1617 grpc_compression_level effective_compression_level =
1618 GRPC_COMPRESS_LEVEL_NONE;
1619 bool level_set = false;
1620 if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
1621 effective_compression_level =
1622 op->data.send_initial_metadata.maybe_compression_level.level;
1623 level_set = true;
1624 } else {
1625 const grpc_compression_options copts =
1626 grpc_channel_compression_options(call->channel);
1627 if (copts.default_level.is_set) {
1628 level_set = true;
1629 effective_compression_level = copts.default_level.level;
1630 }
1631 }
1632 // Currently, only server side supports compression level setting.
1633 if (level_set && !call->is_client) {
1634 const grpc_compression_algorithm calgo =
1635 compression_algorithm_for_level_locked(
1636 call, effective_compression_level);
1637 // The following metadata will be checked and removed by the message
1638 // compression filter. It will be used as the call's compression
1639 // algorithm.
1640 compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
1641 compression_md.value = grpc_compression_algorithm_slice(calgo);
1642 additional_metadata_count++;
1643 }
1644 if (op->data.send_initial_metadata.count + additional_metadata_count >
1645 INT_MAX) {
1646 error = GRPC_CALL_ERROR_INVALID_METADATA;
1647 goto done_with_error;
1648 }
1649 stream_op->send_initial_metadata = true;
1650 call->sent_initial_metadata = true;
1651 if (!prepare_application_metadata(
1652 call, static_cast<int>(op->data.send_initial_metadata.count),
1653 op->data.send_initial_metadata.metadata, 0, call->is_client,
1654 &compression_md, static_cast<int>(additional_metadata_count))) {
1655 error = GRPC_CALL_ERROR_INVALID_METADATA;
1656 goto done_with_error;
1657 }
1658 /* TODO(ctiller): just make these the same variable? */
1659 if (call->is_client) {
1660 call->metadata_batch[0][0].deadline = call->send_deadline;
1661 }
1662 stream_op_payload->send_initial_metadata.send_initial_metadata =
1663 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
1664 stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
1665 op->flags;
1666 if (call->is_client) {
1667 stream_op_payload->send_initial_metadata.peer_string =
1668 &call->peer_string;
1669 }
1670 has_send_ops = true;
1671 break;
1672 }
1673 case GRPC_OP_SEND_MESSAGE: {
1674 if (!are_write_flags_valid(op->flags)) {
1675 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1676 goto done_with_error;
1677 }
1678 if (op->data.send_message.send_message == nullptr) {
1679 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1680 goto done_with_error;
1681 }
1682 if (call->sending_message) {
1683 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1684 goto done_with_error;
1685 }
1686 uint32_t flags = op->flags;
1687 /* If the outgoing buffer is already compressed, mark it as so in the
1688 flags. These will be picked up by the compression filter and further
1689 (wasteful) attempts at compression skipped. */
1690 if (op->data.send_message.send_message->data.raw.compression >
1691 GRPC_COMPRESS_NONE) {
1692 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1693 }
1694 stream_op->send_message = true;
1695 call->sending_message = true;
1696 call->sending_stream.Init(
1697 &op->data.send_message.send_message->data.raw.slice_buffer, flags);
1698 stream_op_payload->send_message.send_message.reset(
1699 call->sending_stream.get());
1700 has_send_ops = true;
1701 break;
1702 }
1703 case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
1704 /* Flag validation: currently allow no flags */
1705 if (op->flags != 0) {
1706 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1707 goto done_with_error;
1708 }
1709 if (!call->is_client) {
1710 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1711 goto done_with_error;
1712 }
1713 if (call->sent_final_op) {
1714 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1715 goto done_with_error;
1716 }
1717 stream_op->send_trailing_metadata = true;
1718 call->sent_final_op = true;
1719 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1720 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1721 has_send_ops = true;
1722 break;
1723 }
1724 case GRPC_OP_SEND_STATUS_FROM_SERVER: {
1725 /* Flag validation: currently allow no flags */
1726 if (op->flags != 0) {
1727 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1728 goto done_with_error;
1729 }
1730 if (call->is_client) {
1731 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1732 goto done_with_error;
1733 }
1734 if (call->sent_final_op) {
1735 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1736 goto done_with_error;
1737 }
1738 if (op->data.send_status_from_server.trailing_metadata_count >
1739 INT_MAX) {
1740 error = GRPC_CALL_ERROR_INVALID_METADATA;
1741 goto done_with_error;
1742 }
1743 stream_op->send_trailing_metadata = true;
1744 call->sent_final_op = true;
1745 GPR_ASSERT(call->send_extra_metadata_count == 0);
1746 call->send_extra_metadata_count = 1;
1747 call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
1748 op->data.send_status_from_server.status);
1749 grpc_error* status_error =
1750 op->data.send_status_from_server.status == GRPC_STATUS_OK
1751 ? GRPC_ERROR_NONE
1752 : grpc_error_set_int(
1753 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1754 "Server returned error"),
1755 GRPC_ERROR_INT_GRPC_STATUS,
1756 static_cast<intptr_t>(
1757 op->data.send_status_from_server.status));
1758 if (op->data.send_status_from_server.status_details != nullptr) {
1759 call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
1760 GRPC_MDSTR_GRPC_MESSAGE,
1761 grpc_slice_ref_internal(
1762 *op->data.send_status_from_server.status_details));
1763 call->send_extra_metadata_count++;
1764 if (status_error != GRPC_ERROR_NONE) {
1765 char* msg = grpc_slice_to_c_string(
1766 GRPC_MDVALUE(call->send_extra_metadata[1].md));
1767 status_error =
1768 grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
1769 grpc_slice_from_copied_string(msg));
1770 gpr_free(msg);
1771 }
1772 }
1773
1774 gpr_atm_rel_store(&call->status_error,
1775 reinterpret_cast<gpr_atm>(status_error));
1776 if (!prepare_application_metadata(
1777 call,
1778 static_cast<int>(
1779 op->data.send_status_from_server.trailing_metadata_count),
1780 op->data.send_status_from_server.trailing_metadata, 1, 1,
1781 nullptr, 0)) {
1782 for (int n = 0; n < call->send_extra_metadata_count; n++) {
1783 GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
1784 }
1785 call->send_extra_metadata_count = 0;
1786 error = GRPC_CALL_ERROR_INVALID_METADATA;
1787 goto done_with_error;
1788 }
1789 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1790 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1791 stream_op_payload->send_trailing_metadata.sent =
1792 &call->sent_server_trailing_metadata;
1793 has_send_ops = true;
1794 break;
1795 }
1796 case GRPC_OP_RECV_INITIAL_METADATA: {
1797 /* Flag validation: currently allow no flags */
1798 if (op->flags != 0) {
1799 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1800 goto done_with_error;
1801 }
1802 if (call->received_initial_metadata) {
1803 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1804 goto done_with_error;
1805 }
1806 call->received_initial_metadata = true;
1807 call->buffered_metadata[0] =
1808 op->data.recv_initial_metadata.recv_initial_metadata;
1809 GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
1810 receiving_initial_metadata_ready, bctl,
1811 grpc_schedule_on_exec_ctx);
1812 stream_op->recv_initial_metadata = true;
1813 stream_op_payload->recv_initial_metadata.recv_initial_metadata =
1814 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1815 stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
1816 &call->receiving_initial_metadata_ready;
1817 if (!call->is_client) {
1818 stream_op_payload->recv_initial_metadata.peer_string =
1819 &call->peer_string;
1820 }
1821 ++num_recv_ops;
1822 break;
1823 }
1824 case GRPC_OP_RECV_MESSAGE: {
1825 /* Flag validation: currently allow no flags */
1826 if (op->flags != 0) {
1827 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1828 goto done_with_error;
1829 }
1830 if (call->receiving_message) {
1831 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1832 goto done_with_error;
1833 }
1834 call->receiving_message = true;
1835 stream_op->recv_message = true;
1836 call->receiving_buffer = op->data.recv_message.recv_message;
1837 stream_op_payload->recv_message.recv_message = &call->receiving_stream;
1838 GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
1839 receiving_stream_ready_in_call_combiner, bctl,
1840 grpc_schedule_on_exec_ctx);
1841 stream_op_payload->recv_message.recv_message_ready =
1842 &call->receiving_stream_ready;
1843 ++num_recv_ops;
1844 break;
1845 }
1846 case GRPC_OP_RECV_STATUS_ON_CLIENT: {
1847 /* Flag validation: currently allow no flags */
1848 if (op->flags != 0) {
1849 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1850 goto done_with_error;
1851 }
1852 if (!call->is_client) {
1853 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1854 goto done_with_error;
1855 }
1856 if (call->requested_final_op) {
1857 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1858 goto done_with_error;
1859 }
1860 call->requested_final_op = true;
1861 call->buffered_metadata[1] =
1862 op->data.recv_status_on_client.trailing_metadata;
1863 call->final_op.client.status = op->data.recv_status_on_client.status;
1864 call->final_op.client.status_details =
1865 op->data.recv_status_on_client.status_details;
1866 call->final_op.client.error_string =
1867 op->data.recv_status_on_client.error_string;
1868 stream_op->recv_trailing_metadata = true;
1869 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1870 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1871 stream_op_payload->recv_trailing_metadata.collect_stats =
1872 &call->final_info.stats.transport_stream_stats;
1873 GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1874 receiving_trailing_metadata_ready, bctl,
1875 grpc_schedule_on_exec_ctx);
1876 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1877 &call->receiving_trailing_metadata_ready;
1878 ++num_recv_ops;
1879 break;
1880 }
1881 case GRPC_OP_RECV_CLOSE_ON_SERVER: {
1882 /* Flag validation: currently allow no flags */
1883 if (op->flags != 0) {
1884 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1885 goto done_with_error;
1886 }
1887 if (call->is_client) {
1888 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1889 goto done_with_error;
1890 }
1891 if (call->requested_final_op) {
1892 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1893 goto done_with_error;
1894 }
1895 call->requested_final_op = true;
1896 call->final_op.server.cancelled =
1897 op->data.recv_close_on_server.cancelled;
1898 stream_op->recv_trailing_metadata = true;
1899 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1900 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1901 stream_op_payload->recv_trailing_metadata.collect_stats =
1902 &call->final_info.stats.transport_stream_stats;
1903 GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1904 receiving_trailing_metadata_ready, bctl,
1905 grpc_schedule_on_exec_ctx);
1906 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1907 &call->receiving_trailing_metadata_ready;
1908 ++num_recv_ops;
1909 break;
1910 }
1911 }
1912 }
1913
1914 GRPC_CALL_INTERNAL_REF(call, "completion");
1915 if (!is_notify_tag_closure) {
1916 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1917 }
1918 bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);
1919
1920 if (has_send_ops) {
1921 GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
1922 grpc_schedule_on_exec_ctx);
1923 stream_op->on_complete = &bctl->finish_batch;
1924 }
1925
1926 gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
1927 execute_batch(call, stream_op, &bctl->start_batch);
1928
1929 done:
1930 return error;
1931
1932 done_with_error:
1933 /* reverse any mutations that occurred */
1934 if (stream_op->send_initial_metadata) {
1935 call->sent_initial_metadata = false;
1936 grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
1937 }
1938 if (stream_op->send_message) {
1939 call->sending_message = false;
1940 call->sending_stream->Orphan();
1941 }
1942 if (stream_op->send_trailing_metadata) {
1943 call->sent_final_op = false;
1944 grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
1945 }
1946 if (stream_op->recv_initial_metadata) {
1947 call->received_initial_metadata = false;
1948 }
1949 if (stream_op->recv_message) {
1950 call->receiving_message = false;
1951 }
1952 if (stream_op->recv_trailing_metadata) {
1953 call->requested_final_op = false;
1954 }
1955 goto done;
1956 }
1957
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1958 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1959 size_t nops, void* tag, void* reserved) {
1960 grpc_call_error err;
1961
1962 GRPC_API_TRACE(
1963 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1964 "reserved=%p)",
1965 5, (call, ops, (unsigned long)nops, tag, reserved));
1966
1967 if (reserved != nullptr) {
1968 err = GRPC_CALL_ERROR;
1969 } else {
1970 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1971 grpc_core::ExecCtx exec_ctx;
1972 err = call_start_batch(call, ops, nops, tag, 0);
1973 }
1974
1975 return err;
1976 }
1977
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)1978 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1979 const grpc_op* ops,
1980 size_t nops,
1981 grpc_closure* closure) {
1982 return call_start_batch(call, ops, nops, closure, 1);
1983 }
1984
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1985 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1986 void* value, void (*destroy)(void* value)) {
1987 if (call->context[elem].destroy) {
1988 call->context[elem].destroy(call->context[elem].value);
1989 }
1990 call->context[elem].value = value;
1991 call->context[elem].destroy = destroy;
1992 }
1993
grpc_call_context_get(grpc_call * call,grpc_context_index elem)1994 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
1995 return call->context[elem].value;
1996 }
1997
grpc_call_is_client(grpc_call * call)1998 uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
1999
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)2000 grpc_compression_algorithm grpc_call_compression_for_level(
2001 grpc_call* call, grpc_compression_level level) {
2002 grpc_compression_algorithm algo =
2003 compression_algorithm_for_level_locked(call, level);
2004 return algo;
2005 }
2006
grpc_call_error_to_string(grpc_call_error error)2007 const char* grpc_call_error_to_string(grpc_call_error error) {
2008 switch (error) {
2009 case GRPC_CALL_ERROR:
2010 return "GRPC_CALL_ERROR";
2011 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2012 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2013 case GRPC_CALL_ERROR_ALREADY_FINISHED:
2014 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2015 case GRPC_CALL_ERROR_ALREADY_INVOKED:
2016 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2017 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2018 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2019 case GRPC_CALL_ERROR_INVALID_FLAGS:
2020 return "GRPC_CALL_ERROR_INVALID_FLAGS";
2021 case GRPC_CALL_ERROR_INVALID_MESSAGE:
2022 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2023 case GRPC_CALL_ERROR_INVALID_METADATA:
2024 return "GRPC_CALL_ERROR_INVALID_METADATA";
2025 case GRPC_CALL_ERROR_NOT_INVOKED:
2026 return "GRPC_CALL_ERROR_NOT_INVOKED";
2027 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2028 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2029 case GRPC_CALL_ERROR_NOT_ON_SERVER:
2030 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2031 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2032 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2033 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2034 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2035 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2036 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2037 case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2038 return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2039 case GRPC_CALL_OK:
2040 return "GRPC_CALL_OK";
2041 }
2042 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
2043 }
2044