1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #include <grpc/compression.h>
28 #include <grpc/grpc.h>
29 #include <grpc/slice.h>
30 #include <grpc/support/alloc.h>
31 #include <grpc/support/log.h>
32 #include <grpc/support/string_util.h>
33
34 #include "src/core/lib/channel/channel_stack.h"
35 #include "src/core/lib/compression/algorithm_metadata.h"
36 #include "src/core/lib/debug/stats.h"
37 #include "src/core/lib/gpr/alloc.h"
38 #include "src/core/lib/gpr/arena.h"
39 #include "src/core/lib/gpr/string.h"
40 #include "src/core/lib/gpr/useful.h"
41 #include "src/core/lib/gprpp/manual_constructor.h"
42 #include "src/core/lib/iomgr/timer.h"
43 #include "src/core/lib/profiling/timers.h"
44 #include "src/core/lib/slice/slice_internal.h"
45 #include "src/core/lib/slice/slice_string_helpers.h"
46 #include "src/core/lib/surface/api_trace.h"
47 #include "src/core/lib/surface/call.h"
48 #include "src/core/lib/surface/call_test_only.h"
49 #include "src/core/lib/surface/channel.h"
50 #include "src/core/lib/surface/completion_queue.h"
51 #include "src/core/lib/surface/server.h"
52 #include "src/core/lib/surface/validate_metadata.h"
53 #include "src/core/lib/transport/error_utils.h"
54 #include "src/core/lib/transport/metadata.h"
55 #include "src/core/lib/transport/static_metadata.h"
56 #include "src/core/lib/transport/status_metadata.h"
57 #include "src/core/lib/transport/transport.h"
58
/** The maximum number of concurrent batches possible.
    Based upon the maximum number of individually queueable ops in the batch
    api:
      - initial metadata send
      - message send
      - status/close send (depending on client/server)
      - initial metadata recv
      - message recv
      - status/close recv (depending on client/server) */
#define MAX_CONCURRENT_BATCHES 6

/** Capacity of the send_extra_metadata array in struct grpc_call.
    grpc_call_create() asserts that the number of extra initial metadata
    elements passed by the caller is strictly below this value. */
#define MAX_SEND_EXTRA_METADATA_COUNT 3

// Used to create arena for the first call: the number of linked metadata
// elements budgeted for by grpc_call_get_initial_size_estimate().
#define ESTIMATED_MDELEM_COUNT 16
74
/* Bookkeeping for one batch of ops on a call. A call keeps up to
   MAX_CONCURRENT_BATCHES of these (see grpc_call::active_batches). */
typedef struct batch_control {
  grpc_call* call;
  /* Share memory for cq_completion and notify_tag as they are never needed
     simultaneously. Each byte used in this data structure counts as six bytes
     per call, so any savings we can make are worthwhile.

     We use notify_tag to determine whether or not to send notification to the
     completion queue. Once we've made that determination, we can reuse the
     memory for cq_completion. */
  union {
    grpc_cq_completion cq_completion;
    struct {
      /* Any given op indicates completion by either (a) calling a closure or
         (b) sending a notification on the call's completion queue.  If
         \a is_closure is true, \a tag indicates a closure to be invoked;
         otherwise, \a tag indicates the tag to be used in the notification to
         be sent to the completion queue. */
      void* tag;
      bool is_closure;
    } notify_tag;
  } completion_data;
  /* NOTE(review): the fields below are used by code outside this section of
     the file; their names suggest closures for entering the call combiner and
     finishing the batch, a count of outstanding steps, the first error seen,
     and the transport-level batch itself -- confirm against the users. */
  grpc_closure start_batch;
  grpc_closure finish_batch;
  gpr_refcount steps_to_complete;
  gpr_atm batch_error;
  grpc_transport_stream_op_batch op;
} batch_control;
102
/* State attached to a call that acts as a parent of other calls. It is
   created lazily by get_or_create_parent_call() and published through
   grpc_call::parent_call_atm. */
typedef struct {
  /* Guards first_child and the sibling links of every child in the list. */
  gpr_mu child_list_mu;
  /* Head of the circular, doubly linked list of children; nullptr when the
     list is empty. */
  grpc_call* first_child;
} parent_call;
107
/* State attached to a call that was created with a parent
   (args->parent != nullptr in grpc_call_create()). */
typedef struct {
  /* The parent call; an internal ref ("child") is held on it for as long as
     this child is linked (taken in grpc_call_create(), dropped in
     grpc_call_unref()). */
  grpc_call* parent;
  /** siblings: children of the same parent form a circular doubly linked
      list, and this list is protected by the parent's
      parent_call::child_list_mu */
  grpc_call* sibling_next;
  grpc_call* sibling_prev;
} child_call;
116
/* Sentinel values for grpc_call::recv_state; any other value stored there is
   a batch_control pointer (see the comment on that field). */
#define RECV_NONE ((gpr_atm)0)
#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
119
/* Per-call state. A grpc_call is allocated from its own arena and is
   immediately followed in memory by its call stack (see CALL_STACK_FROM_CALL
   and grpc_call_create()). */
struct grpc_call {
  /* External (application-visible) refcount; see grpc_call_ref() and
     grpc_call_unref(). */
  gpr_refcount ext_ref;
  /* Arena the call itself and its per-call allocations come from. */
  gpr_arena* arena;
  grpc_call_combiner call_combiner;
  grpc_completion_queue* cq;
  grpc_polling_entity pollent;
  grpc_channel* channel;
  gpr_timespec start_time;
  /* parent_call* */ gpr_atm parent_call_atm;
  /* Non-null only for calls created with a parent. */
  child_call* child;

  /* client or server call */
  bool is_client;
  /** has grpc_call_unref been called */
  bool destroy_called;
  /** flag indicating that cancellation is inherited */
  bool cancellation_is_inherited;
  /** which ops are in-flight */
  bool sent_initial_metadata;
  bool sending_message;
  bool sent_final_op;
  bool received_initial_metadata;
  bool receiving_message;
  bool requested_final_op;
  gpr_atm any_ops_sent_atm;
  gpr_atm received_final_op_atm;

  batch_control* active_batches[MAX_CONCURRENT_BATCHES];
  grpc_transport_stream_op_batch_payload stream_op_payload;

  /* first idx: is_receiving, second idx: is_trailing */
  grpc_metadata_batch metadata_batch[2][2];

  /* Buffered read metadata waiting to be returned to the application.
     Element 0 is initial metadata, element 1 is trailing metadata. */
  grpc_metadata_array* buffered_metadata[2];

  grpc_metadata compression_md;

  // A char* indicating the peer name.
  gpr_atm peer_string;

  /* Call data used for reporting. Only valid after the call has
   * completed */
  grpc_call_final_info final_info;

  /* Compression algorithm for *incoming* data */
  grpc_message_compression_algorithm incoming_message_compression_algorithm;
  /* Stream compression algorithm for *incoming* data */
  grpc_stream_compression_algorithm incoming_stream_compression_algorithm;
  /* Supported encodings (compression algorithms), a bitset */
  uint32_t encodings_accepted_by_peer;
  /* Supported stream encodings (stream compression algorithms), a bitset */
  uint32_t stream_encodings_accepted_by_peer;

  /* Contexts for various subsystems (security, tracing, ...). */
  grpc_call_context_element context[GRPC_CONTEXT_COUNT];

  /* for the client, extra metadata is initial metadata; for the
     server, it's trailing metadata */
  grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
  int send_extra_metadata_count;
  grpc_millis send_deadline;

  grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;

  grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
  grpc_byte_buffer** receiving_buffer;
  grpc_slice receiving_slice;
  grpc_closure receiving_slice_ready;
  grpc_closure receiving_stream_ready;
  grpc_closure receiving_initial_metadata_ready;
  grpc_closure receiving_trailing_metadata_ready;
  uint32_t test_only_last_message_flags;
  /* Set to 1 by the first cancel_with_error() on this call; later
     cancellations are ignored. */
  gpr_atm cancelled;

  /* Closure that runs release_call() once the call stack is destroyed. */
  grpc_closure release_call;

  /* Where to report the final status; which member is active depends on
     is_client. */
  union {
    struct {
      grpc_status_code* status;
      grpc_slice* status_details;
      const char** error_string;
    } client;
    struct {
      int* cancelled;
      // backpointer to owning server if this is a server side call.
      grpc_server* server;
    } server;
  } final_op;
  /* A grpc_error* stored as an atomic; written by set_final_status() on the
     client side and consumed (unref'd) by destroy_call(). */
  gpr_atm status_error;

  /* recv_state can contain one of the following values:
     RECV_NONE                   :  no initial metadata and messages received
     RECV_INITIAL_METADATA_FIRST :  received initial metadata first
     a batch_control*            :  received messages first

                 +------1------RECV_NONE------3-----+
                 |                                  |
                 |                                  |
                 v                                  v
     RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
           |           ^                      |           ^
           |           |                      |           |
           +-----2-----+                      +-----4-----+

    For 1, 4: See receiving_initial_metadata_ready() function
    For 2, 3: See receiving_stream_ready() function */
  gpr_atm recv_state;
};
230
// Trace flags, both disabled by default. "call_error" gates the logging in
// set_final_status().
grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
grpc_core::TraceFlag grpc_compression_trace(false, "compression");

// The call stack is laid out directly after the (alignment-rounded) grpc_call
// object in the same allocation; these two macros convert between the two
// addresses.
#define CALL_STACK_FROM_CALL(call)   \
  (grpc_call_stack*)((char*)(call) + \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
#define CALL_FROM_CALL_STACK(call_stack) \
  (grpc_call*)(((char*)(call_stack)) -   \
               GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))

// Element `idx` of the call's stack, and the reverse mapping from the top
// element of a stack back to its call.
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
245
// Forward declarations of file-local helpers that are used before they are
// defined.

// Sends `op` down the filter stack via the call combiner.
static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
                          grpc_closure* start_batch_closure);

// Cancellation helpers; both end up sending a cancel_stream batch.
static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description);
static void cancel_with_error(grpc_call* c, grpc_error* error);
// Passed to grpc_call_stack_init() as the call stack's destroy callback.
static void destroy_call(void* call_stack, grpc_error* error);
static void receiving_slice_ready(void* bctlp, grpc_error* error);
static void set_final_status(grpc_call* call, grpc_error* error);
static void process_data_after_md(batch_control* bctl);
static void post_batch_completion(batch_control* bctl);
257
add_init_error(grpc_error ** composite,grpc_error * new_err)258 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
259 if (new_err == GRPC_ERROR_NONE) return;
260 if (*composite == GRPC_ERROR_NONE)
261 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
262 *composite = grpc_error_add_child(*composite, new_err);
263 }
264
// Allocates `size` bytes from the call's arena and returns the result of
// gpr_arena_alloc(). The memory is released together with the arena in
// release_call(); there is no per-allocation free.
void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
  return gpr_arena_alloc(call->arena, size);
}
268
get_or_create_parent_call(grpc_call * call)269 static parent_call* get_or_create_parent_call(grpc_call* call) {
270 parent_call* p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
271 if (p == nullptr) {
272 p = static_cast<parent_call*>(gpr_arena_alloc(call->arena, sizeof(*p)));
273 gpr_mu_init(&p->child_list_mu);
274 if (!gpr_atm_rel_cas(&call->parent_call_atm, (gpr_atm) nullptr,
275 (gpr_atm)p)) {
276 gpr_mu_destroy(&p->child_list_mu);
277 p = (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
278 }
279 }
280 return p;
281 }
282
// Returns the parent_call state of `call`, or nullptr if none has been
// published yet (i.e. get_or_create_parent_call() has never succeeded for
// this call). Never allocates.
static parent_call* get_parent_call(grpc_call* call) {
  return (parent_call*)gpr_atm_acq_load(&call->parent_call_atm);
}
286
grpc_call_get_initial_size_estimate()287 size_t grpc_call_get_initial_size_estimate() {
288 return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
289 sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
290 }
291
// Creates a call on args->channel and stores it in *out_call.
//
// The call and its call stack are carved out of a freshly created arena whose
// initial size comes from the channel's call size estimate. A call is a
// client call when args->server_transport_data is nullptr, a server call
// otherwise. When args->parent is set, the new call is linked into the
// parent's child list and may inherit deadline, census context and
// cancellation according to args->propagation_mask.
//
// Returns GRPC_ERROR_NONE on success. On failure the accumulated error is
// returned, but *out_call is still set: the call exists and has already been
// cancelled with (a ref to) that error.
grpc_error* grpc_call_create(const grpc_call_create_args* args,
                             grpc_call** out_call) {
  GPR_TIMER_SCOPE("grpc_call_create", 0);
  size_t i, j;
  grpc_error* error = GRPC_ERROR_NONE;
  grpc_channel_stack* channel_stack =
      grpc_channel_get_channel_stack(args->channel);
  grpc_call* call;
  size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
  GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
  gpr_arena* arena = gpr_arena_create(initial_size);
  // One allocation holds the grpc_call followed by its call stack; see
  // CALL_STACK_FROM_CALL.
  call = static_cast<grpc_call*>(
      gpr_arena_alloc(arena, GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
                                 channel_stack->call_stack_size));
  gpr_ref_init(&call->ext_ref, 1);
  gpr_atm_no_barrier_store(&call->cancelled, 0);
  call->arena = arena;
  grpc_call_combiner_init(&call->call_combiner);
  *out_call = call;
  call->channel = args->channel;
  call->cq = args->cq;
  call->start_time = gpr_now(GPR_CLOCK_MONOTONIC);
  /* Always support no compression */
  GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_MESSAGE_COMPRESS_NONE);
  call->is_client = args->server_transport_data == nullptr;
  call->stream_op_payload.context = call->context;
  grpc_slice path = grpc_empty_slice();
  if (call->is_client) {
    GRPC_STATS_INC_CLIENT_CALLS_CREATED();
    GPR_ASSERT(args->add_initial_metadata_count <
               MAX_SEND_EXTRA_METADATA_COUNT);
    // Stash the caller-provided metadata as "extra" metadata and remember the
    // :path value (with its own ref) for the call element args below.
    for (i = 0; i < args->add_initial_metadata_count; i++) {
      call->send_extra_metadata[i].md = args->add_initial_metadata[i];
      if (grpc_slice_eq(GRPC_MDKEY(args->add_initial_metadata[i]),
                        GRPC_MDSTR_PATH)) {
        path = grpc_slice_ref_internal(
            GRPC_MDVALUE(args->add_initial_metadata[i]));
      }
    }
    call->send_extra_metadata_count =
        static_cast<int>(args->add_initial_metadata_count);
  } else {
    GRPC_STATS_INC_SERVER_CALLS_CREATED();
    call->final_op.server.server = args->server;
    GPR_ASSERT(args->add_initial_metadata_count == 0);
    call->send_extra_metadata_count = 0;
  }
  for (i = 0; i < 2; i++) {
    for (j = 0; j < 2; j++) {
      call->metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
    }
  }
  grpc_millis send_deadline = args->send_deadline;

  bool immediately_cancel = false;

  if (args->parent != nullptr) {
    call->child =
        static_cast<child_call*>(gpr_arena_alloc(arena, sizeof(child_call)));
    call->child->parent = args->parent;

    // This ref is released in grpc_call_unref() of the child.
    GRPC_CALL_INTERNAL_REF(args->parent, "child");
    // Only a client call may be the child, and only of a server call.
    GPR_ASSERT(call->is_client);
    GPR_ASSERT(!args->parent->is_client);

    if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
      send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
    }
    /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     * GRPC_PROPAGATE_STATS_CONTEXT */
    /* TODO(ctiller): This should change to use the appropriate census start_op
     * call. */
    if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
      if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
        add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Census tracing propagation requested "
                                   "without Census context propagation"));
      }
      grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
                            args->parent->context[GRPC_CONTEXT_TRACING].value,
                            nullptr);
    } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
      add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                 "Census context propagation requested "
                                 "without Census tracing propagation"));
    }
    if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
      call->cancellation_is_inherited = 1;
      // The parent has already finished: cancel the child as soon as it is
      // fully set up (see below).
      if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
        immediately_cancel = true;
      }
    }
  }

  call->send_deadline = send_deadline;

  // Released in release_call().
  GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
  /* initial refcount dropped by grpc_call_unref */
  grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
                                      args->server_transport_data,
                                      call->context,
                                      path,
                                      call->start_time,
                                      send_deadline,
                                      call->arena,
                                      &call->call_combiner};
  add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
                                              call, &call_args));
  // Publish this call to parent only after the call stack has been initialized.
  if (args->parent != nullptr) {
    child_call* cc = call->child;
    parent_call* pc = get_or_create_parent_call(args->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (pc->first_child == nullptr) {
      // First child: a one-element circular list pointing at itself.
      pc->first_child = call;
      cc->sibling_next = cc->sibling_prev = call;
    } else {
      // Insert just before the head, i.e. at the tail of the circular list.
      cc->sibling_next = pc->first_child;
      cc->sibling_prev = pc->first_child->child->sibling_prev;
      cc->sibling_next->child->sibling_prev =
          cc->sibling_prev->child->sibling_next = call;
    }
    gpr_mu_unlock(&pc->child_list_mu);
  }
  // cancel_with_error() takes ownership of the error it is given, so pass a
  // new ref and keep `error` for the return value.
  if (error != GRPC_ERROR_NONE) {
    cancel_with_error(call, GRPC_ERROR_REF(error));
  }
  if (immediately_cancel) {
    cancel_with_error(call, GRPC_ERROR_CANCELLED);
  }
  if (args->cq != nullptr) {
    GPR_ASSERT(args->pollset_set_alternative == nullptr &&
               "Only one of 'cq' and 'pollset_set_alternative' should be "
               "non-nullptr.");
    // Released in destroy_call().
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != nullptr) {
    call->pollent = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  if (!grpc_polling_entity_is_empty(&call->pollent)) {
    grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
                                               &call->pollent);
  }

  // Record the call start with channelz, if a node is available.
  if (call->is_client) {
    grpc_core::channelz::ChannelNode* channelz_channel =
        grpc_channel_get_channelz_node(call->channel);
    if (channelz_channel != nullptr) {
      channelz_channel->RecordCallStarted();
    }
  } else {
    grpc_core::channelz::ServerNode* channelz_server =
        grpc_server_get_channelz_node(call->final_op.server.server);
    if (channelz_server != nullptr) {
      channelz_server->RecordCallStarted();
    }
  }

  // Drop the local ref on the path slice taken above.
  grpc_slice_unref_internal(path);

  return error;
}
457
grpc_call_set_completion_queue(grpc_call * call,grpc_completion_queue * cq)458 void grpc_call_set_completion_queue(grpc_call* call,
459 grpc_completion_queue* cq) {
460 GPR_ASSERT(cq);
461
462 if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
463 gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
464 abort();
465 }
466 call->cq = cq;
467 GRPC_CQ_INTERNAL_REF(cq, "bind");
468 call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
469 grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
470 &call->pollent);
471 }
472
// In debug builds (NDEBUG not defined) the internal ref/unref functions take
// an extra `reason` string that is forwarded to the call stack refcount
// macros. In release builds the parameter does not exist and an empty string
// is passed instead.
#ifndef NDEBUG
#define REF_REASON reason
#define REF_ARG , const char* reason
#else
#define REF_REASON ""
#define REF_ARG
#endif
// Takes an internal ref on the call by ref'ing its call stack.
void grpc_call_internal_ref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
// Drops an internal ref on the call by unref'ing its call stack. The stack
// was initialized with destroy_call as its destroy callback (see
// grpc_call_create()).
void grpc_call_internal_unref(grpc_call* c REF_ARG) {
  GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
}
486
// Last step of call teardown: the closure destroy_call() hands to
// grpc_call_stack_destroy(). Frees what is left of the call, destroys its
// arena and drops the channel ref taken in grpc_call_create(). `error` is
// unused.
static void release_call(void* call, grpc_error* error) {
  grpc_call* c = static_cast<grpc_call*>(call);
  // Cache the channel now: `c` itself lives in the arena, so it must not be
  // dereferenced once the arena has been destroyed below.
  grpc_channel* channel = c->channel;
  gpr_free(static_cast<void*>(const_cast<char*>(c->final_info.error_string)));
  grpc_call_combiner_destroy(&c->call_combiner);
  // The value returned by gpr_arena_destroy() is fed back into the channel's
  // call size estimate.
  grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
  GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
}
495
// Destroy callback of the call stack (registered in grpc_call_create()).
// Releases the resources owned directly by the call, computes the final call
// info (status, error string, latency) and then destroys the call stack;
// release_call() runs afterwards to free the remaining memory. `error` is
// unused.
static void destroy_call(void* call, grpc_error* error) {
  GPR_TIMER_SCOPE("destroy_call", 0);
  size_t i;
  int ii;
  grpc_call* c = static_cast<grpc_call*>(call);
  // Only the two receiving batches (initial and trailing) are destroyed here.
  for (i = 0; i < 2; i++) {
    grpc_metadata_batch_destroy(
        &c->metadata_batch[1 /* is_receiving */][i /* is_trailing */]);
  }
  c->receiving_stream.reset();
  // The parent_call object itself is arena memory; only its mutex needs
  // explicit destruction.
  parent_call* pc = get_parent_call(c);
  if (pc != nullptr) {
    gpr_mu_destroy(&pc->child_list_mu);
  }
  for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
    GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
  }
  for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (c->context[i].destroy) {
      c->context[i].destroy(c->context[i].value);
    }
  }
  // Drops the "bind" ref taken when the completion queue was attached.
  if (c->cq) {
    GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
  }

  // Translate the stored status error into the final status and error string
  // reported to the filters, then release it.
  grpc_error* status_error =
      reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&c->status_error));
  grpc_error_get_status(status_error, c->send_deadline,
                        &c->final_info.final_status, nullptr, nullptr,
                        &(c->final_info.error_string));
  GRPC_ERROR_UNREF(status_error);
  c->final_info.stats.latency =
      gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);

  grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
                          GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
                                            grpc_schedule_on_exec_ctx));
}
535
// Takes an additional external ref on the call; balanced by grpc_call_unref().
void grpc_call_ref(grpc_call* c) { gpr_ref(&c->ext_ref); }
537
// Drops one external ref. When the last external ref goes away the call is
// unlinked from its parent (if any), cancelled if it is still in progress,
// and the internal "destroy" ref is released.
void grpc_call_unref(grpc_call* c) {
  // Nothing more to do unless this was the last external ref.
  if (!gpr_unref(&c->ext_ref)) return;

  GPR_TIMER_SCOPE("grpc_call_unref", 0);

  child_call* cc = c->child;
  grpc_core::ExecCtx exec_ctx;

  GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));

  if (cc) {
    // Remove this call from the parent's circular child list.
    parent_call* pc = get_parent_call(cc->parent);
    gpr_mu_lock(&pc->child_list_mu);
    if (c == pc->first_child) {
      pc->first_child = cc->sibling_next;
      // Still pointing at ourselves: we were the only child.
      if (c == pc->first_child) {
        pc->first_child = nullptr;
      }
    }
    cc->sibling_prev->child->sibling_next = cc->sibling_next;
    cc->sibling_next->child->sibling_prev = cc->sibling_prev;
    gpr_mu_unlock(&pc->child_list_mu);
    // Balances the "child" ref taken in grpc_call_create().
    GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
  }

  GPR_ASSERT(!c->destroy_called);
  c->destroy_called = 1;
  // Cancel only if something was sent and the final op has not been received
  // yet.
  bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
                gpr_atm_acq_load(&c->received_final_op_atm) == 0;
  if (cancel) {
    cancel_with_error(c, GRPC_ERROR_CANCELLED);
  } else {
    // Unset the call combiner cancellation closure.  This has the
    // effect of scheduling the previously set cancellation closure, if
    // any, so that it can release any internal references it may be
    // holding to the call stack. Also flush the closures on exec_ctx so that
    // filters that schedule cancel notification closures on exec_ctx do not
    // need to take a ref of the call stack to guarantee closure liveness.
    grpc_call_combiner_set_notify_on_cancel(&c->call_combiner, nullptr);
    grpc_core::ExecCtx::Get()->Flush();
  }
  GRPC_CALL_INTERNAL_UNREF(c, "destroy");
}
581
// Cancels `call` with GRPC_ERROR_CANCELLED. `reserved` must be null.
// Always returns GRPC_CALL_OK.
grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
  GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
  GPR_ASSERT(!reserved);
  grpc_core::ExecCtx exec_ctx;
  cancel_with_error(call, GRPC_ERROR_CANCELLED);
  return GRPC_CALL_OK;
}
589
// This is called via the call combiner to start sending a batch down
// the filter stack.
// `arg` is the batch; the call is recovered from the batch's
// handler_private.extra_arg, which execute_batch() sets. The batch is handed
// to the first (index 0) element of the call stack. `ignored` is unused.
static void execute_batch_in_call_combiner(void* arg, grpc_error* ignored) {
  GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
  grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
  GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
  elem->filter->start_transport_stream_op_batch(elem, batch);
}
601
// Starts `batch` on `call` by entering the call combiner.
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner. The call pointer is stashed in the batch's
// handler_private.extra_arg so that execute_batch_in_call_combiner() can find
// it again.
static void execute_batch(grpc_call* call,
                          grpc_transport_stream_op_batch* batch,
                          grpc_closure* start_batch_closure) {
  batch->handler_private.extra_arg = call;
  GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
                           GRPC_ERROR_NONE, "executing batch");
}
613
grpc_call_get_peer(grpc_call * call)614 char* grpc_call_get_peer(grpc_call* call) {
615 char* peer_string = (char*)gpr_atm_acq_load(&call->peer_string);
616 if (peer_string != nullptr) return gpr_strdup(peer_string);
617 peer_string = grpc_channel_get_target(call->channel);
618 if (peer_string != nullptr) return peer_string;
619 return gpr_strdup("unknown");
620 }
621
// Maps the top element of a call stack back to the grpc_call that owns the
// stack (the call object sits directly in front of its stack in memory).
grpc_call* grpc_call_from_top_element(grpc_call_element* elem) {
  return CALL_FROM_TOP_ELEM(elem);
}
625
626 /*******************************************************************************
627 * CANCELLATION
628 */
629
// Cancels call `c` with the given status code and description. The
// description is copied (see error_from_status()), so it does not have to
// outlive this function. `reserved` must be null. Always returns
// GRPC_CALL_OK.
grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
                                             grpc_status_code status,
                                             const char* description,
                                             void* reserved) {
  grpc_core::ExecCtx exec_ctx;
  GRPC_API_TRACE(
      "grpc_call_cancel_with_status("
      "c=%p, status=%d, description=%s, reserved=%p)",
      4, (c, (int)status, description, reserved));
  GPR_ASSERT(reserved == nullptr);
  cancel_with_status(c, status, description);
  return GRPC_CALL_OK;
}
643
// Heap-allocated state for one cancel_stream batch: created in
// cancel_with_error() and freed in done_termination().
typedef struct {
  grpc_call* call;
  // Closure used by execute_batch() to enter the call combiner.
  grpc_closure start_batch;
  // on_complete closure of the cancel batch; runs done_termination().
  grpc_closure finish_batch;
} cancel_state;
649
// The on_complete callback used when sending a cancel_stream batch down
// the filter stack.  Yields the call combiner when the batch is done.
// Also drops the "termination" ref taken in cancel_with_error() and frees the
// cancel_state passed as `arg`. `error` is unused.
static void done_termination(void* arg, grpc_error* error) {
  cancel_state* state = static_cast<cancel_state*>(arg);
  GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
                          "on_complete for cancel_stream op");
  GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
  gpr_free(state);
}
659
// Cancels call `c` with `error`, taking ownership of the caller's ref on
// `error`. Only the first cancellation of a call has any effect: the
// `cancelled` flag is flipped with a compare-and-swap, and later callers just
// drop their error and return.
static void cancel_with_error(grpc_call* c, grpc_error* error) {
  if (!gpr_atm_rel_cas(&c->cancelled, 0, 1)) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Keeps the call alive until done_termination() runs.
  GRPC_CALL_INTERNAL_REF(c, "termination");
  // Inform the call combiner of the cancellation, so that it can cancel
  // any in-flight asynchronous actions that may be holding the call
  // combiner.  This ensures that the cancel_stream batch can be sent
  // down the filter stack in a timely manner.
  grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
  // Freed in done_termination().
  cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
  state->call = c;
  GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_stream_op_batch* op =
      grpc_make_transport_stream_op(&state->finish_batch);
  op->cancel_stream = true;
  // The caller's ref on `error` travels with the batch.
  op->payload->cancel_stream.cancel_error = error;
  execute_batch(c, op, &state->start_batch);
}
681
error_from_status(grpc_status_code status,const char * description)682 static grpc_error* error_from_status(grpc_status_code status,
683 const char* description) {
684 // copying 'description' is needed to ensure the grpc_call_cancel_with_status
685 // guarantee that can be short-lived.
686 return grpc_error_set_int(
687 grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
688 GRPC_ERROR_STR_GRPC_MESSAGE,
689 grpc_slice_from_copied_string(description)),
690 GRPC_ERROR_INT_GRPC_STATUS, status);
691 }
692
// Cancels call `c` with an error built from `status` and `description` by
// error_from_status(); see cancel_with_error() for the cancellation itself.
static void cancel_with_status(grpc_call* c, grpc_status_code status,
                               const char* description) {
  cancel_with_error(c, error_from_status(status, description));
}
697
set_final_status(grpc_call * call,grpc_error * error)698 static void set_final_status(grpc_call* call, grpc_error* error) {
699 if (grpc_call_error_trace.enabled()) {
700 gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
701 gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
702 }
703 if (call->is_client) {
704 grpc_error_get_status(error, call->send_deadline,
705 call->final_op.client.status,
706 call->final_op.client.status_details, nullptr,
707 call->final_op.client.error_string);
708 // explicitly take a ref
709 grpc_slice_ref_internal(*call->final_op.client.status_details);
710 gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
711 grpc_core::channelz::ChannelNode* channelz_channel =
712 grpc_channel_get_channelz_node(call->channel);
713 if (channelz_channel != nullptr) {
714 if (*call->final_op.client.status != GRPC_STATUS_OK) {
715 channelz_channel->RecordCallFailed();
716 } else {
717 channelz_channel->RecordCallSucceeded();
718 }
719 }
720 } else {
721 *call->final_op.server.cancelled =
722 error != GRPC_ERROR_NONE ||
723 reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&call->status_error)) !=
724 GRPC_ERROR_NONE;
725 grpc_core::channelz::ServerNode* channelz_server =
726 grpc_server_get_channelz_node(call->final_op.server.server);
727 if (channelz_server != nullptr) {
728 if (*call->final_op.server.cancelled) {
729 channelz_server->RecordCallFailed();
730 } else {
731 channelz_server->RecordCallSucceeded();
732 }
733 }
734 GRPC_ERROR_UNREF(error);
735 }
736 }
737
738 /*******************************************************************************
739 * COMPRESSION
740 */
741
// Records the message compression algorithm used for incoming data on
// `call`. `algo` must be a valid algorithm, i.e. below
// GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT.
static void set_incoming_message_compression_algorithm(
    grpc_call* call, grpc_message_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_message_compression_algorithm = algo;
}
747
// Records the stream compression algorithm used for incoming data on `call`.
// `algo` must be a valid algorithm, i.e. below
// GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT.
static void set_incoming_stream_compression_algorithm(
    grpc_call* call, grpc_stream_compression_algorithm algo) {
  GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
  call->incoming_stream_compression_algorithm = algo;
}
753
grpc_call_test_only_get_compression_algorithm(grpc_call * call)754 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
755 grpc_call* call) {
756 grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
757 grpc_compression_algorithm_from_message_stream_compression_algorithm(
758 &algorithm, call->incoming_message_compression_algorithm,
759 call->incoming_stream_compression_algorithm);
760 return algorithm;
761 }
762
compression_algorithm_for_level_locked(grpc_call * call,grpc_compression_level level)763 static grpc_compression_algorithm compression_algorithm_for_level_locked(
764 grpc_call* call, grpc_compression_level level) {
765 return grpc_compression_algorithm_for_level(level,
766 call->encodings_accepted_by_peer);
767 }
768
grpc_call_test_only_get_message_flags(grpc_call * call)769 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
770 uint32_t flags;
771 flags = call->test_only_last_message_flags;
772 return flags;
773 }
774
// Intentionally a no-op. set_encodings_accepted_by_peer() stores a plain
// integer (not a heap pointer) as metadata user data, so there is nothing to
// free; this function is passed as the destroy callback when the user data is
// set and again when it is looked up.
static void destroy_encodings_accepted_by_peer(void* p) { return; }
776
set_encodings_accepted_by_peer(grpc_call * call,grpc_mdelem mdel,uint32_t * encodings_accepted_by_peer,bool stream_encoding)777 static void set_encodings_accepted_by_peer(grpc_call* call, grpc_mdelem mdel,
778 uint32_t* encodings_accepted_by_peer,
779 bool stream_encoding) {
780 size_t i;
781 uint32_t algorithm;
782 grpc_slice_buffer accept_encoding_parts;
783 grpc_slice accept_encoding_slice;
784 void* accepted_user_data;
785
786 accepted_user_data =
787 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
788 if (accepted_user_data != nullptr) {
789 *encodings_accepted_by_peer =
790 static_cast<uint32_t>(((uintptr_t)accepted_user_data) - 1);
791 return;
792 }
793
794 *encodings_accepted_by_peer = 0;
795
796 accept_encoding_slice = GRPC_MDVALUE(mdel);
797 grpc_slice_buffer_init(&accept_encoding_parts);
798 grpc_slice_split_without_space(accept_encoding_slice, ",",
799 &accept_encoding_parts);
800
801 GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
802 for (i = 0; i < accept_encoding_parts.count; i++) {
803 int r;
804 grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
805 if (!stream_encoding) {
806 r = grpc_message_compression_algorithm_parse(
807 accept_encoding_entry_slice,
808 reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
809 } else {
810 r = grpc_stream_compression_algorithm_parse(
811 accept_encoding_entry_slice,
812 reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
813 }
814 if (r) {
815 GPR_BITSET(encodings_accepted_by_peer, algorithm);
816 } else {
817 char* accept_encoding_entry_str =
818 grpc_slice_to_c_string(accept_encoding_entry_slice);
819 gpr_log(GPR_DEBUG,
820 "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
821 accept_encoding_entry_str);
822 gpr_free(accept_encoding_entry_str);
823 }
824 }
825
826 grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
827
828 grpc_mdelem_set_user_data(
829 mdel, destroy_encodings_accepted_by_peer,
830 (void*)((static_cast<uintptr_t>(*encodings_accepted_by_peer)) + 1));
831 }
832
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)833 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
834 uint32_t encodings_accepted_by_peer;
835 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
836 return encodings_accepted_by_peer;
837 }
838
839 grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call * call)840 grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
841 return call->incoming_stream_compression_algorithm;
842 }
843
linked_from_md(const grpc_metadata * md)844 static grpc_linked_mdelem* linked_from_md(const grpc_metadata* md) {
845 return (grpc_linked_mdelem*)&md->internal_data;
846 }
847
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)848 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
849 grpc_metadata* additional_metadata, int i,
850 int count) {
851 grpc_metadata* res =
852 i < count ? &metadata[i] : &additional_metadata[i - count];
853 GPR_ASSERT(res);
854 return res;
855 }
856
prepare_application_metadata(grpc_call * call,int count,grpc_metadata * metadata,int is_trailing,int prepend_extra_metadata,grpc_metadata * additional_metadata,int additional_metadata_count)857 static int prepare_application_metadata(grpc_call* call, int count,
858 grpc_metadata* metadata,
859 int is_trailing,
860 int prepend_extra_metadata,
861 grpc_metadata* additional_metadata,
862 int additional_metadata_count) {
863 int total_count = count + additional_metadata_count;
864 int i;
865 grpc_metadata_batch* batch =
866 &call->metadata_batch[0 /* is_receiving */][is_trailing];
867 for (i = 0; i < total_count; i++) {
868 const grpc_metadata* md =
869 get_md_elem(metadata, additional_metadata, i, count);
870 grpc_linked_mdelem* l = linked_from_md(md);
871 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
872 if (!GRPC_LOG_IF_ERROR("validate_metadata",
873 grpc_validate_header_key_is_legal(md->key))) {
874 break;
875 } else if (!grpc_is_binary_header(md->key) &&
876 !GRPC_LOG_IF_ERROR(
877 "validate_metadata",
878 grpc_validate_header_nonbin_value_is_legal(md->value))) {
879 break;
880 }
881 l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
882 }
883 if (i != total_count) {
884 for (int j = 0; j < i; j++) {
885 const grpc_metadata* md =
886 get_md_elem(metadata, additional_metadata, j, count);
887 grpc_linked_mdelem* l = linked_from_md(md);
888 GRPC_MDELEM_UNREF(l->md);
889 }
890 return 0;
891 }
892 if (prepend_extra_metadata) {
893 if (call->send_extra_metadata_count == 0) {
894 prepend_extra_metadata = 0;
895 } else {
896 for (i = 0; i < call->send_extra_metadata_count; i++) {
897 GRPC_LOG_IF_ERROR("prepare_application_metadata",
898 grpc_metadata_batch_link_tail(
899 batch, &call->send_extra_metadata[i]));
900 }
901 }
902 }
903 for (i = 0; i < total_count; i++) {
904 grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
905 grpc_linked_mdelem* l = linked_from_md(md);
906 grpc_error* error = grpc_metadata_batch_link_tail(batch, l);
907 if (error != GRPC_ERROR_NONE) {
908 GRPC_MDELEM_UNREF(l->md);
909 }
910 GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
911 }
912 call->send_extra_metadata_count = 0;
913
914 return 1;
915 }
916
decode_message_compression(grpc_mdelem md)917 static grpc_message_compression_algorithm decode_message_compression(
918 grpc_mdelem md) {
919 grpc_message_compression_algorithm algorithm =
920 grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
921 if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
922 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
923 gpr_log(GPR_ERROR,
924 "Invalid incoming message compression algorithm: '%s'. "
925 "Interpreting incoming data as uncompressed.",
926 md_c_str);
927 gpr_free(md_c_str);
928 return GRPC_MESSAGE_COMPRESS_NONE;
929 }
930 return algorithm;
931 }
932
decode_stream_compression(grpc_mdelem md)933 static grpc_stream_compression_algorithm decode_stream_compression(
934 grpc_mdelem md) {
935 grpc_stream_compression_algorithm algorithm =
936 grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
937 if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
938 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
939 gpr_log(GPR_ERROR,
940 "Invalid incoming stream compression algorithm: '%s'. Interpreting "
941 "incoming data as uncompressed.",
942 md_c_str);
943 gpr_free(md_c_str);
944 return GRPC_STREAM_COMPRESS_NONE;
945 }
946 return algorithm;
947 }
948
publish_app_metadata(grpc_call * call,grpc_metadata_batch * b,int is_trailing)949 static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
950 int is_trailing) {
951 if (b->list.count == 0) return;
952 if (!call->is_client && is_trailing) return;
953 if (is_trailing && call->buffered_metadata[1] == nullptr) return;
954 GPR_TIMER_SCOPE("publish_app_metadata", 0);
955 grpc_metadata_array* dest;
956 grpc_metadata* mdusr;
957 dest = call->buffered_metadata[is_trailing];
958 if (dest->count + b->list.count > dest->capacity) {
959 dest->capacity =
960 GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
961 dest->metadata = static_cast<grpc_metadata*>(
962 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
963 }
964 for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
965 mdusr = &dest->metadata[dest->count++];
966 /* we pass back borrowed slices that are valid whilst the call is valid */
967 mdusr->key = GRPC_MDKEY(l->md);
968 mdusr->value = GRPC_MDVALUE(l->md);
969 }
970 }
971
recv_initial_filter(grpc_call * call,grpc_metadata_batch * b)972 static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
973 if (b->idx.named.content_encoding != nullptr) {
974 GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
975 set_incoming_stream_compression_algorithm(
976 call, decode_stream_compression(b->idx.named.content_encoding->md));
977 grpc_metadata_batch_remove(b, b->idx.named.content_encoding);
978 }
979 if (b->idx.named.grpc_encoding != nullptr) {
980 GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
981 set_incoming_message_compression_algorithm(
982 call, decode_message_compression(b->idx.named.grpc_encoding->md));
983 grpc_metadata_batch_remove(b, b->idx.named.grpc_encoding);
984 }
985 uint32_t message_encodings_accepted_by_peer = 1u;
986 uint32_t stream_encodings_accepted_by_peer = 1u;
987 if (b->idx.named.grpc_accept_encoding != nullptr) {
988 GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
989 set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
990 &message_encodings_accepted_by_peer, false);
991 grpc_metadata_batch_remove(b, b->idx.named.grpc_accept_encoding);
992 }
993 if (b->idx.named.accept_encoding != nullptr) {
994 GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
995 set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
996 &stream_encodings_accepted_by_peer, true);
997 grpc_metadata_batch_remove(b, b->idx.named.accept_encoding);
998 }
999 call->encodings_accepted_by_peer =
1000 grpc_compression_bitset_from_message_stream_compression_bitset(
1001 message_encodings_accepted_by_peer,
1002 stream_encodings_accepted_by_peer);
1003 publish_app_metadata(call, b, false);
1004 }
1005
recv_trailing_filter(void * args,grpc_metadata_batch * b,grpc_error * batch_error)1006 static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
1007 grpc_error* batch_error) {
1008 grpc_call* call = static_cast<grpc_call*>(args);
1009 if (batch_error != GRPC_ERROR_NONE) {
1010 set_final_status(call, batch_error);
1011 } else if (b->idx.named.grpc_status != nullptr) {
1012 grpc_status_code status_code =
1013 grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
1014 grpc_error* error = GRPC_ERROR_NONE;
1015 if (status_code != GRPC_STATUS_OK) {
1016 error = grpc_error_set_int(
1017 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Error received from peer"),
1018 GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
1019 }
1020 if (b->idx.named.grpc_message != nullptr) {
1021 error = grpc_error_set_str(
1022 error, GRPC_ERROR_STR_GRPC_MESSAGE,
1023 grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
1024 grpc_metadata_batch_remove(b, b->idx.named.grpc_message);
1025 } else if (error != GRPC_ERROR_NONE) {
1026 error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
1027 grpc_empty_slice());
1028 }
1029 set_final_status(call, GRPC_ERROR_REF(error));
1030 grpc_metadata_batch_remove(b, b->idx.named.grpc_status);
1031 GRPC_ERROR_UNREF(error);
1032 } else if (!call->is_client) {
1033 set_final_status(call, GRPC_ERROR_NONE);
1034 } else {
1035 gpr_log(GPR_DEBUG,
1036 "Received trailing metadata with no error and no status");
1037 set_final_status(
1038 call, grpc_error_set_int(
1039 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
1040 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
1041 }
1042 publish_app_metadata(call, b, true);
1043 }
1044
grpc_call_get_arena(grpc_call * call)1045 gpr_arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1046
grpc_call_get_call_stack(grpc_call * call)1047 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
1048 return CALL_STACK_FROM_CALL(call);
1049 }
1050
1051 /*******************************************************************************
1052 * BATCH API IMPLEMENTATION
1053 */
1054
are_write_flags_valid(uint32_t flags)1055 static bool are_write_flags_valid(uint32_t flags) {
1056 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1057 const uint32_t allowed_write_positions =
1058 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1059 const uint32_t invalid_positions = ~allowed_write_positions;
1060 return !(flags & invalid_positions);
1061 }
1062
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1063 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1064 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1065 uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1066 if (!is_client) {
1067 invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1068 }
1069 return !(flags & invalid_positions);
1070 }
1071
batch_slot_for_op(grpc_op_type type)1072 static size_t batch_slot_for_op(grpc_op_type type) {
1073 switch (type) {
1074 case GRPC_OP_SEND_INITIAL_METADATA:
1075 return 0;
1076 case GRPC_OP_SEND_MESSAGE:
1077 return 1;
1078 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1079 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1080 return 2;
1081 case GRPC_OP_RECV_INITIAL_METADATA:
1082 return 3;
1083 case GRPC_OP_RECV_MESSAGE:
1084 return 4;
1085 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1086 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1087 return 5;
1088 }
1089 GPR_UNREACHABLE_CODE(return 123456789);
1090 }
1091
reuse_or_allocate_batch_control(grpc_call * call,const grpc_op * ops,size_t num_ops)1092 static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
1093 const grpc_op* ops,
1094 size_t num_ops) {
1095 size_t slot_idx = batch_slot_for_op(ops[0].op);
1096 batch_control** pslot = &call->active_batches[slot_idx];
1097 batch_control* bctl;
1098 if (*pslot != nullptr) {
1099 bctl = *pslot;
1100 if (bctl->call != nullptr) {
1101 return nullptr;
1102 }
1103 memset(bctl, 0, sizeof(*bctl));
1104 } else {
1105 bctl = static_cast<batch_control*>(
1106 gpr_arena_alloc(call->arena, sizeof(batch_control)));
1107 *pslot = bctl;
1108 }
1109 bctl->call = call;
1110 bctl->op.payload = &call->stream_op_payload;
1111 return bctl;
1112 }
1113
finish_batch_completion(void * user_data,grpc_cq_completion * storage)1114 static void finish_batch_completion(void* user_data,
1115 grpc_cq_completion* storage) {
1116 batch_control* bctl = static_cast<batch_control*>(user_data);
1117 grpc_call* call = bctl->call;
1118 bctl->call = nullptr;
1119 GRPC_CALL_INTERNAL_UNREF(call, "completion");
1120 }
1121
reset_batch_errors(batch_control * bctl)1122 static void reset_batch_errors(batch_control* bctl) {
1123 GRPC_ERROR_UNREF(
1124 reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1125 gpr_atm_rel_store(&bctl->batch_error,
1126 reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
1127 }
1128
post_batch_completion(batch_control * bctl)1129 static void post_batch_completion(batch_control* bctl) {
1130 grpc_call* next_child_call;
1131 grpc_call* call = bctl->call;
1132 grpc_error* error = GRPC_ERROR_REF(
1133 reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)));
1134
1135 if (bctl->op.send_initial_metadata) {
1136 grpc_metadata_batch_destroy(
1137 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
1138 }
1139 if (bctl->op.send_message) {
1140 call->sending_message = false;
1141 }
1142 if (bctl->op.send_trailing_metadata) {
1143 grpc_metadata_batch_destroy(
1144 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1145 }
1146 if (bctl->op.recv_trailing_metadata) {
1147 /* propagate cancellation to any interested children */
1148 gpr_atm_rel_store(&call->received_final_op_atm, 1);
1149 parent_call* pc = get_parent_call(call);
1150 if (pc != nullptr) {
1151 grpc_call* child;
1152 gpr_mu_lock(&pc->child_list_mu);
1153 child = pc->first_child;
1154 if (child != nullptr) {
1155 do {
1156 next_child_call = child->child->sibling_next;
1157 if (child->cancellation_is_inherited) {
1158 GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
1159 cancel_with_error(child, GRPC_ERROR_CANCELLED);
1160 GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
1161 }
1162 child = next_child_call;
1163 } while (child != pc->first_child);
1164 }
1165 gpr_mu_unlock(&pc->child_list_mu);
1166 }
1167 GRPC_ERROR_UNREF(error);
1168 error = GRPC_ERROR_NONE;
1169 }
1170 if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
1171 *call->receiving_buffer != nullptr) {
1172 grpc_byte_buffer_destroy(*call->receiving_buffer);
1173 *call->receiving_buffer = nullptr;
1174 }
1175 reset_batch_errors(bctl);
1176
1177 if (bctl->completion_data.notify_tag.is_closure) {
1178 /* unrefs error */
1179 bctl->call = nullptr;
1180 /* This closure may be meant to be run within some combiner. Since we aren't
1181 * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
1182 * of GRPC_CLOSURE_RUN.
1183 */
1184 GRPC_CLOSURE_SCHED((grpc_closure*)bctl->completion_data.notify_tag.tag,
1185 error);
1186 GRPC_CALL_INTERNAL_UNREF(call, "completion");
1187 } else {
1188 /* unrefs error */
1189 grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
1190 finish_batch_completion, bctl,
1191 &bctl->completion_data.cq_completion);
1192 }
1193 }
1194
finish_batch_step(batch_control * bctl)1195 static void finish_batch_step(batch_control* bctl) {
1196 if (gpr_unref(&bctl->steps_to_complete)) {
1197 post_batch_completion(bctl);
1198 }
1199 }
1200
continue_receiving_slices(batch_control * bctl)1201 static void continue_receiving_slices(batch_control* bctl) {
1202 grpc_error* error;
1203 grpc_call* call = bctl->call;
1204 for (;;) {
1205 size_t remaining = call->receiving_stream->length() -
1206 (*call->receiving_buffer)->data.raw.slice_buffer.length;
1207 if (remaining == 0) {
1208 call->receiving_message = 0;
1209 call->receiving_stream.reset();
1210 finish_batch_step(bctl);
1211 return;
1212 }
1213 if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
1214 error = call->receiving_stream->Pull(&call->receiving_slice);
1215 if (error == GRPC_ERROR_NONE) {
1216 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1217 call->receiving_slice);
1218 } else {
1219 call->receiving_stream.reset();
1220 grpc_byte_buffer_destroy(*call->receiving_buffer);
1221 *call->receiving_buffer = nullptr;
1222 call->receiving_message = 0;
1223 finish_batch_step(bctl);
1224 return;
1225 }
1226 } else {
1227 return;
1228 }
1229 }
1230 }
1231
receiving_slice_ready(void * bctlp,grpc_error * error)1232 static void receiving_slice_ready(void* bctlp, grpc_error* error) {
1233 batch_control* bctl = static_cast<batch_control*>(bctlp);
1234 grpc_call* call = bctl->call;
1235 bool release_error = false;
1236
1237 if (error == GRPC_ERROR_NONE) {
1238 grpc_slice slice;
1239 error = call->receiving_stream->Pull(&slice);
1240 if (error == GRPC_ERROR_NONE) {
1241 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1242 slice);
1243 continue_receiving_slices(bctl);
1244 } else {
1245 /* Error returned by ByteStream::Pull() needs to be released manually */
1246 release_error = true;
1247 }
1248 }
1249
1250 if (error != GRPC_ERROR_NONE) {
1251 if (grpc_trace_operation_failures.enabled()) {
1252 GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1253 }
1254 call->receiving_stream.reset();
1255 grpc_byte_buffer_destroy(*call->receiving_buffer);
1256 *call->receiving_buffer = nullptr;
1257 call->receiving_message = 0;
1258 finish_batch_step(bctl);
1259 if (release_error) {
1260 GRPC_ERROR_UNREF(error);
1261 }
1262 }
1263 }
1264
process_data_after_md(batch_control * bctl)1265 static void process_data_after_md(batch_control* bctl) {
1266 grpc_call* call = bctl->call;
1267 if (call->receiving_stream == nullptr) {
1268 *call->receiving_buffer = nullptr;
1269 call->receiving_message = 0;
1270 finish_batch_step(bctl);
1271 } else {
1272 call->test_only_last_message_flags = call->receiving_stream->flags();
1273 if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
1274 (call->incoming_message_compression_algorithm >
1275 GRPC_MESSAGE_COMPRESS_NONE)) {
1276 grpc_compression_algorithm algo;
1277 GPR_ASSERT(
1278 grpc_compression_algorithm_from_message_stream_compression_algorithm(
1279 &algo, call->incoming_message_compression_algorithm,
1280 (grpc_stream_compression_algorithm)0));
1281 *call->receiving_buffer =
1282 grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
1283 } else {
1284 *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
1285 }
1286 GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
1287 grpc_schedule_on_exec_ctx);
1288 continue_receiving_slices(bctl);
1289 }
1290 }
1291
receiving_stream_ready(void * bctlp,grpc_error * error)1292 static void receiving_stream_ready(void* bctlp, grpc_error* error) {
1293 batch_control* bctl = static_cast<batch_control*>(bctlp);
1294 grpc_call* call = bctl->call;
1295 if (error != GRPC_ERROR_NONE) {
1296 call->receiving_stream.reset();
1297 if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1298 GRPC_ERROR_NONE) {
1299 gpr_atm_rel_store(&bctl->batch_error,
1300 reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1301 }
1302 cancel_with_error(call, GRPC_ERROR_REF(error));
1303 }
1304 /* If recv_state is RECV_NONE, we will save the batch_control
1305 * object with rel_cas, and will not use it after the cas. Its corresponding
1306 * acq_load is in receiving_initial_metadata_ready() */
1307 if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
1308 !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
1309 process_data_after_md(bctl);
1310 }
1311 }
1312
1313 // The recv_message_ready callback used when sending a batch containing
1314 // a recv_message op down the filter stack. Yields the call combiner
1315 // before processing the received message.
receiving_stream_ready_in_call_combiner(void * bctlp,grpc_error * error)1316 static void receiving_stream_ready_in_call_combiner(void* bctlp,
1317 grpc_error* error) {
1318 batch_control* bctl = static_cast<batch_control*>(bctlp);
1319 grpc_call* call = bctl->call;
1320 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
1321 receiving_stream_ready(bctlp, error);
1322 }
1323
validate_filtered_metadata(batch_control * bctl)1324 static void validate_filtered_metadata(batch_control* bctl) {
1325 grpc_compression_algorithm compression_algorithm;
1326 grpc_call* call = bctl->call;
1327 if (call->incoming_stream_compression_algorithm !=
1328 GRPC_STREAM_COMPRESS_NONE &&
1329 call->incoming_message_compression_algorithm !=
1330 GRPC_MESSAGE_COMPRESS_NONE) {
1331 char* error_msg = nullptr;
1332 gpr_asprintf(&error_msg,
1333 "Incoming stream has both stream compression (%d) and message "
1334 "compression (%d).",
1335 call->incoming_stream_compression_algorithm,
1336 call->incoming_message_compression_algorithm);
1337 gpr_log(GPR_ERROR, "%s", error_msg);
1338 cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
1339 gpr_free(error_msg);
1340 } else if (
1341 grpc_compression_algorithm_from_message_stream_compression_algorithm(
1342 &compression_algorithm, call->incoming_message_compression_algorithm,
1343 call->incoming_stream_compression_algorithm) == 0) {
1344 char* error_msg = nullptr;
1345 gpr_asprintf(&error_msg,
1346 "Error in incoming message compression (%d) or stream "
1347 "compression (%d).",
1348 call->incoming_stream_compression_algorithm,
1349 call->incoming_message_compression_algorithm);
1350 cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
1351 gpr_free(error_msg);
1352 } else {
1353 char* error_msg = nullptr;
1354 const grpc_compression_options compression_options =
1355 grpc_channel_compression_options(call->channel);
1356 if (compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT) {
1357 gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
1358 compression_algorithm);
1359 gpr_log(GPR_ERROR, "%s", error_msg);
1360 cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
1361 } else if (grpc_compression_options_is_algorithm_enabled(
1362 &compression_options, compression_algorithm) == 0) {
1363 /* check if algorithm is supported by current channel config */
1364 const char* algo_name = nullptr;
1365 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1366 gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
1367 algo_name);
1368 gpr_log(GPR_ERROR, "%s", error_msg);
1369 cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
1370 }
1371 gpr_free(error_msg);
1372
1373 GPR_ASSERT(call->encodings_accepted_by_peer != 0);
1374 if (!GPR_BITGET(call->encodings_accepted_by_peer, compression_algorithm)) {
1375 if (grpc_compression_trace.enabled()) {
1376 const char* algo_name = nullptr;
1377 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1378 gpr_log(GPR_ERROR,
1379 "Compression algorithm ('%s') not present in the bitset of "
1380 "accepted encodings ('0x%x')",
1381 algo_name, call->encodings_accepted_by_peer);
1382 }
1383 }
1384 }
1385 }
1386
receiving_initial_metadata_ready(void * bctlp,grpc_error * error)1387 static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
1388 batch_control* bctl = static_cast<batch_control*>(bctlp);
1389 grpc_call* call = bctl->call;
1390
1391 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
1392
1393 if (error == GRPC_ERROR_NONE) {
1394 grpc_metadata_batch* md =
1395 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1396 recv_initial_filter(call, md);
1397
1398 /* TODO(ctiller): this could be moved into recv_initial_filter now */
1399 GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
1400 validate_filtered_metadata(bctl);
1401
1402 if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
1403 call->send_deadline = md->deadline;
1404 }
1405 } else {
1406 if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1407 GRPC_ERROR_NONE) {
1408 gpr_atm_rel_store(&bctl->batch_error,
1409 reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1410 }
1411 cancel_with_error(call, GRPC_ERROR_REF(error));
1412 }
1413
1414 grpc_closure* saved_rsr_closure = nullptr;
1415 while (true) {
1416 gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
1417 /* Should only receive initial metadata once */
1418 GPR_ASSERT(rsr_bctlp != 1);
1419 if (rsr_bctlp == 0) {
1420 /* We haven't seen initial metadata and messages before, thus initial
1421 * metadata is received first.
1422 * no_barrier_cas is used, as this function won't access the batch_control
1423 * object saved by receiving_stream_ready() if the initial metadata is
1424 * received first. */
1425 if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
1426 RECV_INITIAL_METADATA_FIRST)) {
1427 break;
1428 }
1429 } else {
1430 /* Already received messages */
1431 saved_rsr_closure =
1432 GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
1433 grpc_schedule_on_exec_ctx);
1434 /* No need to modify recv_state */
1435 break;
1436 }
1437 }
1438 if (saved_rsr_closure != nullptr) {
1439 GRPC_CLOSURE_RUN(saved_rsr_closure, GRPC_ERROR_REF(error));
1440 }
1441
1442 finish_batch_step(bctl);
1443 }
1444
receiving_trailing_metadata_ready(void * bctlp,grpc_error * error)1445 static void receiving_trailing_metadata_ready(void* bctlp, grpc_error* error) {
1446 batch_control* bctl = static_cast<batch_control*>(bctlp);
1447 grpc_call* call = bctl->call;
1448 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
1449 grpc_metadata_batch* md =
1450 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1451 recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
1452 finish_batch_step(bctl);
1453 }
1454
finish_batch(void * bctlp,grpc_error * error)1455 static void finish_batch(void* bctlp, grpc_error* error) {
1456 batch_control* bctl = static_cast<batch_control*>(bctlp);
1457 grpc_call* call = bctl->call;
1458 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
1459 if (reinterpret_cast<grpc_error*>(gpr_atm_acq_load(&bctl->batch_error)) ==
1460 GRPC_ERROR_NONE) {
1461 gpr_atm_rel_store(&bctl->batch_error,
1462 reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1463 }
1464 if (error != GRPC_ERROR_NONE) {
1465 cancel_with_error(call, GRPC_ERROR_REF(error));
1466 }
1467 finish_batch_step(bctl);
1468 }
1469
free_no_op_completion(void * p,grpc_cq_completion * completion)1470 static void free_no_op_completion(void* p, grpc_cq_completion* completion) {
1471 gpr_free(completion);
1472 }
1473
call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * notify_tag,int is_notify_tag_closure)1474 static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
1475 size_t nops, void* notify_tag,
1476 int is_notify_tag_closure) {
1477 GPR_TIMER_SCOPE("call_start_batch", 0);
1478
1479 size_t i;
1480 const grpc_op* op;
1481 batch_control* bctl;
1482 bool has_send_ops = false;
1483 int num_recv_ops = 0;
1484 grpc_call_error error = GRPC_CALL_OK;
1485 grpc_transport_stream_op_batch* stream_op;
1486 grpc_transport_stream_op_batch_payload* stream_op_payload;
1487
1488 GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
1489
1490 if (nops == 0) {
1491 if (!is_notify_tag_closure) {
1492 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1493 grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
1494 free_no_op_completion, nullptr,
1495 static_cast<grpc_cq_completion*>(
1496 gpr_malloc(sizeof(grpc_cq_completion))));
1497 } else {
1498 GRPC_CLOSURE_SCHED((grpc_closure*)notify_tag, GRPC_ERROR_NONE);
1499 }
1500 error = GRPC_CALL_OK;
1501 goto done;
1502 }
1503
1504 bctl = reuse_or_allocate_batch_control(call, ops, nops);
1505 if (bctl == nullptr) {
1506 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1507 }
1508 bctl->completion_data.notify_tag.tag = notify_tag;
1509 bctl->completion_data.notify_tag.is_closure =
1510 static_cast<uint8_t>(is_notify_tag_closure != 0);
1511
1512 stream_op = &bctl->op;
1513 stream_op_payload = &call->stream_op_payload;
1514
1515 /* rewrite batch ops into a transport op */
1516 for (i = 0; i < nops; i++) {
1517 op = &ops[i];
1518 if (op->reserved != nullptr) {
1519 error = GRPC_CALL_ERROR;
1520 goto done_with_error;
1521 }
1522 switch (op->op) {
1523 case GRPC_OP_SEND_INITIAL_METADATA: {
1524 /* Flag validation: currently allow no flags */
1525 if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
1526 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1527 goto done_with_error;
1528 }
1529 if (call->sent_initial_metadata) {
1530 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1531 goto done_with_error;
1532 }
1533 /* process compression level */
1534 memset(&call->compression_md, 0, sizeof(call->compression_md));
1535 size_t additional_metadata_count = 0;
1536 grpc_compression_level effective_compression_level =
1537 GRPC_COMPRESS_LEVEL_NONE;
1538 bool level_set = false;
1539 if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
1540 effective_compression_level =
1541 op->data.send_initial_metadata.maybe_compression_level.level;
1542 level_set = true;
1543 } else {
1544 const grpc_compression_options copts =
1545 grpc_channel_compression_options(call->channel);
1546 if (copts.default_level.is_set) {
1547 level_set = true;
1548 effective_compression_level = copts.default_level.level;
1549 }
1550 }
1551 if (level_set && !call->is_client) {
1552 const grpc_compression_algorithm calgo =
1553 compression_algorithm_for_level_locked(
1554 call, effective_compression_level);
1555 /* the following will be picked up by the compress filter and used
1556 * as the call's compression algorithm. */
1557 call->compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
1558 call->compression_md.value = grpc_compression_algorithm_slice(calgo);
1559 additional_metadata_count++;
1560 }
1561
1562 if (op->data.send_initial_metadata.count + additional_metadata_count >
1563 INT_MAX) {
1564 error = GRPC_CALL_ERROR_INVALID_METADATA;
1565 goto done_with_error;
1566 }
1567 stream_op->send_initial_metadata = true;
1568 call->sent_initial_metadata = true;
1569 if (!prepare_application_metadata(
1570 call, static_cast<int>(op->data.send_initial_metadata.count),
1571 op->data.send_initial_metadata.metadata, 0, call->is_client,
1572 &call->compression_md,
1573 static_cast<int>(additional_metadata_count))) {
1574 error = GRPC_CALL_ERROR_INVALID_METADATA;
1575 goto done_with_error;
1576 }
1577 /* TODO(ctiller): just make these the same variable? */
1578 if (call->is_client) {
1579 call->metadata_batch[0][0].deadline = call->send_deadline;
1580 }
1581 stream_op_payload->send_initial_metadata.send_initial_metadata =
1582 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
1583 stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
1584 op->flags;
1585 if (call->is_client) {
1586 stream_op_payload->send_initial_metadata.peer_string =
1587 &call->peer_string;
1588 }
1589 has_send_ops = true;
1590 break;
1591 }
1592 case GRPC_OP_SEND_MESSAGE: {
1593 if (!are_write_flags_valid(op->flags)) {
1594 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1595 goto done_with_error;
1596 }
1597 if (op->data.send_message.send_message == nullptr) {
1598 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1599 goto done_with_error;
1600 }
1601 if (call->sending_message) {
1602 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1603 goto done_with_error;
1604 }
1605 uint32_t flags = op->flags;
1606 /* If the outgoing buffer is already compressed, mark it as so in the
1607 flags. These will be picked up by the compression filter and further
1608 (wasteful) attempts at compression skipped. */
1609 if (op->data.send_message.send_message->data.raw.compression >
1610 GRPC_COMPRESS_NONE) {
1611 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1612 }
1613 stream_op->send_message = true;
1614 call->sending_message = true;
1615 call->sending_stream.Init(
1616 &op->data.send_message.send_message->data.raw.slice_buffer, flags);
1617 stream_op_payload->send_message.send_message.reset(
1618 call->sending_stream.get());
1619 has_send_ops = true;
1620 break;
1621 }
1622 case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
1623 /* Flag validation: currently allow no flags */
1624 if (op->flags != 0) {
1625 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1626 goto done_with_error;
1627 }
1628 if (!call->is_client) {
1629 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1630 goto done_with_error;
1631 }
1632 if (call->sent_final_op) {
1633 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1634 goto done_with_error;
1635 }
1636 stream_op->send_trailing_metadata = true;
1637 call->sent_final_op = true;
1638 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1639 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1640 has_send_ops = true;
1641 break;
1642 }
1643 case GRPC_OP_SEND_STATUS_FROM_SERVER: {
1644 /* Flag validation: currently allow no flags */
1645 if (op->flags != 0) {
1646 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1647 goto done_with_error;
1648 }
1649 if (call->is_client) {
1650 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1651 goto done_with_error;
1652 }
1653 if (call->sent_final_op) {
1654 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1655 goto done_with_error;
1656 }
1657 if (op->data.send_status_from_server.trailing_metadata_count >
1658 INT_MAX) {
1659 error = GRPC_CALL_ERROR_INVALID_METADATA;
1660 goto done_with_error;
1661 }
1662 stream_op->send_trailing_metadata = true;
1663 call->sent_final_op = true;
1664 GPR_ASSERT(call->send_extra_metadata_count == 0);
1665 call->send_extra_metadata_count = 1;
1666 call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
1667 call->channel, op->data.send_status_from_server.status);
1668 grpc_error* status_error =
1669 op->data.send_status_from_server.status == GRPC_STATUS_OK
1670 ? GRPC_ERROR_NONE
1671 : grpc_error_set_int(
1672 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1673 "Server returned error"),
1674 GRPC_ERROR_INT_GRPC_STATUS,
1675 static_cast<intptr_t>(
1676 op->data.send_status_from_server.status));
1677 if (op->data.send_status_from_server.status_details != nullptr) {
1678 call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
1679 GRPC_MDSTR_GRPC_MESSAGE,
1680 grpc_slice_ref_internal(
1681 *op->data.send_status_from_server.status_details));
1682 call->send_extra_metadata_count++;
1683 if (status_error != GRPC_ERROR_NONE) {
1684 char* msg = grpc_slice_to_c_string(
1685 GRPC_MDVALUE(call->send_extra_metadata[1].md));
1686 status_error =
1687 grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
1688 grpc_slice_from_copied_string(msg));
1689 gpr_free(msg);
1690 }
1691 }
1692
1693 gpr_atm_rel_store(&call->status_error,
1694 reinterpret_cast<gpr_atm>(status_error));
1695 if (!prepare_application_metadata(
1696 call,
1697 static_cast<int>(
1698 op->data.send_status_from_server.trailing_metadata_count),
1699 op->data.send_status_from_server.trailing_metadata, 1, 1,
1700 nullptr, 0)) {
1701 for (int n = 0; n < call->send_extra_metadata_count; n++) {
1702 GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
1703 }
1704 call->send_extra_metadata_count = 0;
1705 error = GRPC_CALL_ERROR_INVALID_METADATA;
1706 goto done_with_error;
1707 }
1708 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1709 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1710 has_send_ops = true;
1711 break;
1712 }
1713 case GRPC_OP_RECV_INITIAL_METADATA: {
1714 /* Flag validation: currently allow no flags */
1715 if (op->flags != 0) {
1716 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1717 goto done_with_error;
1718 }
1719 if (call->received_initial_metadata) {
1720 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1721 goto done_with_error;
1722 }
1723 call->received_initial_metadata = true;
1724 call->buffered_metadata[0] =
1725 op->data.recv_initial_metadata.recv_initial_metadata;
1726 GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
1727 receiving_initial_metadata_ready, bctl,
1728 grpc_schedule_on_exec_ctx);
1729 stream_op->recv_initial_metadata = true;
1730 stream_op_payload->recv_initial_metadata.recv_initial_metadata =
1731 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1732 stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
1733 &call->receiving_initial_metadata_ready;
1734 if (!call->is_client) {
1735 stream_op_payload->recv_initial_metadata.peer_string =
1736 &call->peer_string;
1737 }
1738 ++num_recv_ops;
1739 break;
1740 }
1741 case GRPC_OP_RECV_MESSAGE: {
1742 /* Flag validation: currently allow no flags */
1743 if (op->flags != 0) {
1744 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1745 goto done_with_error;
1746 }
1747 if (call->receiving_message) {
1748 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1749 goto done_with_error;
1750 }
1751 call->receiving_message = true;
1752 stream_op->recv_message = true;
1753 call->receiving_buffer = op->data.recv_message.recv_message;
1754 stream_op_payload->recv_message.recv_message = &call->receiving_stream;
1755 GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
1756 receiving_stream_ready_in_call_combiner, bctl,
1757 grpc_schedule_on_exec_ctx);
1758 stream_op_payload->recv_message.recv_message_ready =
1759 &call->receiving_stream_ready;
1760 ++num_recv_ops;
1761 break;
1762 }
1763 case GRPC_OP_RECV_STATUS_ON_CLIENT: {
1764 /* Flag validation: currently allow no flags */
1765 if (op->flags != 0) {
1766 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1767 goto done_with_error;
1768 }
1769 if (!call->is_client) {
1770 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1771 goto done_with_error;
1772 }
1773 if (call->requested_final_op) {
1774 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1775 goto done_with_error;
1776 }
1777 call->requested_final_op = true;
1778 call->buffered_metadata[1] =
1779 op->data.recv_status_on_client.trailing_metadata;
1780 call->final_op.client.status = op->data.recv_status_on_client.status;
1781 call->final_op.client.status_details =
1782 op->data.recv_status_on_client.status_details;
1783 call->final_op.client.error_string =
1784 op->data.recv_status_on_client.error_string;
1785 stream_op->recv_trailing_metadata = true;
1786 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1787 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1788 stream_op_payload->recv_trailing_metadata.collect_stats =
1789 &call->final_info.stats.transport_stream_stats;
1790 GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1791 receiving_trailing_metadata_ready, bctl,
1792 grpc_schedule_on_exec_ctx);
1793 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1794 &call->receiving_trailing_metadata_ready;
1795 ++num_recv_ops;
1796 break;
1797 }
1798 case GRPC_OP_RECV_CLOSE_ON_SERVER: {
1799 /* Flag validation: currently allow no flags */
1800 if (op->flags != 0) {
1801 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1802 goto done_with_error;
1803 }
1804 if (call->is_client) {
1805 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1806 goto done_with_error;
1807 }
1808 if (call->requested_final_op) {
1809 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1810 goto done_with_error;
1811 }
1812 call->requested_final_op = true;
1813 call->final_op.server.cancelled =
1814 op->data.recv_close_on_server.cancelled;
1815 stream_op->recv_trailing_metadata = true;
1816 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1817 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1818 stream_op_payload->recv_trailing_metadata.collect_stats =
1819 &call->final_info.stats.transport_stream_stats;
1820 GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1821 receiving_trailing_metadata_ready, bctl,
1822 grpc_schedule_on_exec_ctx);
1823 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1824 &call->receiving_trailing_metadata_ready;
1825 ++num_recv_ops;
1826 break;
1827 }
1828 }
1829 }
1830
1831 GRPC_CALL_INTERNAL_REF(call, "completion");
1832 if (!is_notify_tag_closure) {
1833 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1834 }
1835 gpr_ref_init(&bctl->steps_to_complete, (has_send_ops ? 1 : 0) + num_recv_ops);
1836
1837 if (has_send_ops) {
1838 GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
1839 grpc_schedule_on_exec_ctx);
1840 stream_op->on_complete = &bctl->finish_batch;
1841 }
1842
1843 gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
1844 execute_batch(call, stream_op, &bctl->start_batch);
1845
1846 done:
1847 return error;
1848
1849 done_with_error:
1850 /* reverse any mutations that occurred */
1851 if (stream_op->send_initial_metadata) {
1852 call->sent_initial_metadata = false;
1853 grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
1854 }
1855 if (stream_op->send_message) {
1856 call->sending_message = false;
1857 call->sending_stream->Orphan();
1858 }
1859 if (stream_op->send_trailing_metadata) {
1860 call->sent_final_op = false;
1861 grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
1862 }
1863 if (stream_op->recv_initial_metadata) {
1864 call->received_initial_metadata = false;
1865 }
1866 if (stream_op->recv_message) {
1867 call->receiving_message = false;
1868 }
1869 if (stream_op->recv_trailing_metadata) {
1870 call->requested_final_op = false;
1871 }
1872 goto done;
1873 }
1874
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1875 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1876 size_t nops, void* tag, void* reserved) {
1877 grpc_core::ExecCtx exec_ctx;
1878 grpc_call_error err;
1879
1880 GRPC_API_TRACE(
1881 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1882 "reserved=%p)",
1883 5, (call, ops, (unsigned long)nops, tag, reserved));
1884
1885 if (reserved != nullptr) {
1886 err = GRPC_CALL_ERROR;
1887 } else {
1888 err = call_start_batch(call, ops, nops, tag, 0);
1889 }
1890
1891 return err;
1892 }
1893
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)1894 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1895 const grpc_op* ops,
1896 size_t nops,
1897 grpc_closure* closure) {
1898 return call_start_batch(call, ops, nops, closure, 1);
1899 }
1900
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1901 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1902 void* value, void (*destroy)(void* value)) {
1903 if (call->context[elem].destroy) {
1904 call->context[elem].destroy(call->context[elem].value);
1905 }
1906 call->context[elem].value = value;
1907 call->context[elem].destroy = destroy;
1908 }
1909
grpc_call_context_get(grpc_call * call,grpc_context_index elem)1910 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
1911 return call->context[elem].value;
1912 }
1913
grpc_call_is_client(grpc_call * call)1914 uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
1915
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)1916 grpc_compression_algorithm grpc_call_compression_for_level(
1917 grpc_call* call, grpc_compression_level level) {
1918 grpc_compression_algorithm algo =
1919 compression_algorithm_for_level_locked(call, level);
1920 return algo;
1921 }
1922
grpc_call_error_to_string(grpc_call_error error)1923 const char* grpc_call_error_to_string(grpc_call_error error) {
1924 switch (error) {
1925 case GRPC_CALL_ERROR:
1926 return "GRPC_CALL_ERROR";
1927 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
1928 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
1929 case GRPC_CALL_ERROR_ALREADY_FINISHED:
1930 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
1931 case GRPC_CALL_ERROR_ALREADY_INVOKED:
1932 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
1933 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
1934 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
1935 case GRPC_CALL_ERROR_INVALID_FLAGS:
1936 return "GRPC_CALL_ERROR_INVALID_FLAGS";
1937 case GRPC_CALL_ERROR_INVALID_MESSAGE:
1938 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
1939 case GRPC_CALL_ERROR_INVALID_METADATA:
1940 return "GRPC_CALL_ERROR_INVALID_METADATA";
1941 case GRPC_CALL_ERROR_NOT_INVOKED:
1942 return "GRPC_CALL_ERROR_NOT_INVOKED";
1943 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
1944 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
1945 case GRPC_CALL_ERROR_NOT_ON_SERVER:
1946 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
1947 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
1948 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
1949 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
1950 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
1951 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
1952 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
1953 case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
1954 return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
1955 case GRPC_CALL_OK:
1956 return "GRPC_CALL_OK";
1957 }
1958 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
1959 }
1960