1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <assert.h>
22 #include <limits.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26
27 #include <string>
28
29 #include "absl/strings/str_cat.h"
30 #include "absl/strings/str_format.h"
31
32 #include <grpc/compression.h>
33 #include <grpc/grpc.h>
34 #include <grpc/slice.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/string_util.h>
38
39 #include "src/core/lib/channel/channel_stack.h"
40 #include "src/core/lib/compression/algorithm_metadata.h"
41 #include "src/core/lib/debug/stats.h"
42 #include "src/core/lib/gpr/alloc.h"
43 #include "src/core/lib/gpr/string.h"
44 #include "src/core/lib/gpr/time_precise.h"
45 #include "src/core/lib/gpr/useful.h"
46 #include "src/core/lib/gprpp/arena.h"
47 #include "src/core/lib/gprpp/manual_constructor.h"
48 #include "src/core/lib/gprpp/ref_counted.h"
49 #include "src/core/lib/iomgr/timer.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_string_helpers.h"
52 #include "src/core/lib/slice/slice_utils.h"
53 #include "src/core/lib/surface/api_trace.h"
54 #include "src/core/lib/surface/call.h"
55 #include "src/core/lib/surface/call_test_only.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/server.h"
59 #include "src/core/lib/surface/validate_metadata.h"
60 #include "src/core/lib/transport/error_utils.h"
61 #include "src/core/lib/transport/metadata.h"
62 #include "src/core/lib/transport/static_metadata.h"
63 #include "src/core/lib/transport/status_metadata.h"
64 #include "src/core/lib/transport/transport.h"
65
66 /** The maximum number of concurrent batches possible.
67 Based upon the maximum number of individually queueable ops in the batch
68 api:
69 - initial metadata send
70 - message send
71 - status/close send (depending on client/server)
72 - initial metadata recv
73 - message recv
74 - status/close recv (depending on client/server) */
75 #define MAX_CONCURRENT_BATCHES 6
76
77 #define MAX_SEND_EXTRA_METADATA_COUNT 3
78
79 // Used to create arena for the first call.
80 #define ESTIMATED_MDELEM_COUNT 16
81
82 struct batch_control {
83 batch_control() = default;
84
85 grpc_call* call = nullptr;
86 grpc_transport_stream_op_batch op;
87 /* Share memory for cq_completion and notify_tag as they are never needed
88 simultaneously. Each byte used in this data structure count as six bytes
89 per call, so any savings we can make are worthwhile,
90
91 We use notify_tag to determine whether or not to send notification to the
92 completion queue. Once we've made that determination, we can reuse the
93 memory for cq_completion. */
94 union {
95 grpc_cq_completion cq_completion;
96 struct {
97 /* Any given op indicates completion by either (a) calling a closure or
98 (b) sending a notification on the call's completion queue. If
99 \a is_closure is true, \a tag indicates a closure to be invoked;
100 otherwise, \a tag indicates the tag to be used in the notification to
101 be sent to the completion queue. */
102 void* tag;
103 bool is_closure;
104 } notify_tag;
105 } completion_data;
106 grpc_closure start_batch;
107 grpc_closure finish_batch;
108 grpc_core::Atomic<intptr_t> steps_to_complete;
109 gpr_atm batch_error = reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE);
set_num_steps_to_completebatch_control110 void set_num_steps_to_complete(uintptr_t steps) {
111 steps_to_complete.Store(steps, grpc_core::MemoryOrder::RELEASE);
112 }
completed_batch_stepbatch_control113 bool completed_batch_step() {
114 return steps_to_complete.FetchSub(1, grpc_core::MemoryOrder::ACQ_REL) == 1;
115 }
116 };
117
118 struct parent_call {
parent_callparent_call119 parent_call() { gpr_mu_init(&child_list_mu); }
~parent_callparent_call120 ~parent_call() { gpr_mu_destroy(&child_list_mu); }
121
122 gpr_mu child_list_mu;
123 grpc_call* first_child = nullptr;
124 };
125
126 struct child_call {
child_callchild_call127 explicit child_call(grpc_call* parent) : parent(parent) {}
128 grpc_call* parent;
129 /** siblings: children of the same parent form a list, and this list is
130 protected under
131 parent->mu */
132 grpc_call* sibling_next = nullptr;
133 grpc_call* sibling_prev = nullptr;
134 };
135
136 #define RECV_NONE ((gpr_atm)0)
137 #define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
138
139 struct grpc_call {
grpc_callgrpc_call140 grpc_call(grpc_core::Arena* arena, const grpc_call_create_args& args)
141 : arena(arena),
142 cq(args.cq),
143 channel(args.channel),
144 is_client(args.server_transport_data == nullptr),
145 stream_op_payload(context) {
146 for (int i = 0; i < 2; i++) {
147 for (int j = 0; j < 2; j++) {
148 metadata_batch[i][j].deadline = GRPC_MILLIS_INF_FUTURE;
149 }
150 }
151 }
152
~grpc_callgrpc_call153 ~grpc_call() {
154 gpr_free(static_cast<void*>(const_cast<char*>(final_info.error_string)));
155 }
156
157 grpc_core::RefCount ext_ref;
158 grpc_core::Arena* arena;
159 grpc_core::CallCombiner call_combiner;
160 grpc_completion_queue* cq;
161 grpc_polling_entity pollent;
162 grpc_channel* channel;
163 gpr_cycle_counter start_time = gpr_get_cycle_counter();
164 /* parent_call* */ gpr_atm parent_call_atm = 0;
165 child_call* child = nullptr;
166
167 /* client or server call */
168 bool is_client;
169 /** has grpc_call_unref been called */
170 bool destroy_called = false;
171 /** flag indicating that cancellation is inherited */
172 bool cancellation_is_inherited = false;
173 /** which ops are in-flight */
174 bool sent_initial_metadata = false;
175 bool sending_message = false;
176 bool sent_final_op = false;
177 bool received_initial_metadata = false;
178 bool receiving_message = false;
179 bool requested_final_op = false;
180 gpr_atm any_ops_sent_atm = 0;
181 gpr_atm received_final_op_atm = 0;
182
183 batch_control* active_batches[MAX_CONCURRENT_BATCHES] = {};
184 grpc_transport_stream_op_batch_payload stream_op_payload;
185
186 /* first idx: is_receiving, second idx: is_trailing */
187 grpc_metadata_batch metadata_batch[2][2] = {};
188
189 /* Buffered read metadata waiting to be returned to the application.
190 Element 0 is initial metadata, element 1 is trailing metadata. */
191 grpc_metadata_array* buffered_metadata[2] = {};
192
193 grpc_metadata compression_md;
194
195 // A char* indicating the peer name.
196 gpr_atm peer_string = 0;
197
198 /* Call data useful used for reporting. Only valid after the call has
199 * completed */
200 grpc_call_final_info final_info;
201
202 /* Compression algorithm for *incoming* data */
203 grpc_message_compression_algorithm incoming_message_compression_algorithm =
204 GRPC_MESSAGE_COMPRESS_NONE;
205 /* Stream compression algorithm for *incoming* data */
206 grpc_stream_compression_algorithm incoming_stream_compression_algorithm =
207 GRPC_STREAM_COMPRESS_NONE;
208 /* Supported encodings (compression algorithms), a bitset.
209 * Always support no compression. */
210 uint32_t encodings_accepted_by_peer = 1 << GRPC_MESSAGE_COMPRESS_NONE;
211 /* Supported stream encodings (stream compression algorithms), a bitset */
212 uint32_t stream_encodings_accepted_by_peer = 0;
213
214 /* Contexts for various subsystems (security, tracing, ...). */
215 grpc_call_context_element context[GRPC_CONTEXT_COUNT] = {};
216
217 /* for the client, extra metadata is initial metadata; for the
218 server, it's trailing metadata */
219 grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
220 int send_extra_metadata_count;
221 grpc_millis send_deadline;
222
223 grpc_core::ManualConstructor<grpc_core::SliceBufferByteStream> sending_stream;
224
225 grpc_core::OrphanablePtr<grpc_core::ByteStream> receiving_stream;
226 grpc_byte_buffer** receiving_buffer = nullptr;
227 grpc_slice receiving_slice = grpc_empty_slice();
228 grpc_closure receiving_slice_ready;
229 grpc_closure receiving_stream_ready;
230 grpc_closure receiving_initial_metadata_ready;
231 grpc_closure receiving_trailing_metadata_ready;
232 uint32_t test_only_last_message_flags = 0;
233 // Status about operation of call
234 bool sent_server_trailing_metadata = false;
235 gpr_atm cancelled_with_error = 0;
236
237 grpc_closure release_call;
238
239 union {
240 struct {
241 grpc_status_code* status;
242 grpc_slice* status_details;
243 const char** error_string;
244 } client;
245 struct {
246 int* cancelled;
247 // backpointer to owning server if this is a server side call.
248 grpc_core::Server* core_server;
249 } server;
250 } final_op;
251 gpr_atm status_error = 0;
252
253 /* recv_state can contain one of the following values:
254 RECV_NONE : : no initial metadata and messages received
255 RECV_INITIAL_METADATA_FIRST : received initial metadata first
256 a batch_control* : received messages first
257
258 +------1------RECV_NONE------3-----+
259 | |
260 | |
261 v v
262 RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp
263 | ^ | ^
264 | | | |
265 +-----2-----+ +-----4-----+
266
267 For 1, 4: See receiving_initial_metadata_ready() function
268 For 2, 3: See receiving_stream_ready() function */
269 gpr_atm recv_state = 0;
270 };
271
272 grpc_core::TraceFlag grpc_call_error_trace(false, "call_error");
273 grpc_core::TraceFlag grpc_compression_trace(false, "compression");
274
275 #define CALL_STACK_FROM_CALL(call) \
276 (grpc_call_stack*)((char*)(call) + \
277 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
278 #define CALL_FROM_CALL_STACK(call_stack) \
279 (grpc_call*)(((char*)(call_stack)) - \
280 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)))
281
282 #define CALL_ELEM_FROM_CALL(call, idx) \
283 grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
284 #define CALL_FROM_TOP_ELEM(top_elem) \
285 CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
286
287 static void execute_batch(grpc_call* call,
288 grpc_transport_stream_op_batch* batch,
289 grpc_closure* start_batch_closure);
290
291 static void cancel_with_status(grpc_call* c, grpc_status_code status,
292 const char* description);
293 static void cancel_with_error(grpc_call* c, grpc_error_handle error);
294 static void destroy_call(void* call_stack, grpc_error_handle error);
295 static void receiving_slice_ready(void* bctlp, grpc_error_handle error);
296 static void set_final_status(grpc_call* call, grpc_error_handle error);
297 static void process_data_after_md(batch_control* bctl);
298 static void post_batch_completion(batch_control* bctl);
299
add_init_error(grpc_error_handle * composite,grpc_error_handle new_err)300 static void add_init_error(grpc_error_handle* composite,
301 grpc_error_handle new_err) {
302 if (new_err == GRPC_ERROR_NONE) return;
303 if (*composite == GRPC_ERROR_NONE) {
304 *composite = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Call creation failed");
305 }
306 *composite = grpc_error_add_child(*composite, new_err);
307 }
308
grpc_call_arena_alloc(grpc_call * call,size_t size)309 void* grpc_call_arena_alloc(grpc_call* call, size_t size) {
310 return call->arena->Alloc(size);
311 }
312
get_or_create_parent_call(grpc_call * call)313 static parent_call* get_or_create_parent_call(grpc_call* call) {
314 parent_call* p =
315 reinterpret_cast<parent_call*>(gpr_atm_acq_load(&call->parent_call_atm));
316 if (p == nullptr) {
317 p = call->arena->New<parent_call>();
318 if (!gpr_atm_rel_cas(&call->parent_call_atm,
319 reinterpret_cast<gpr_atm>(nullptr),
320 reinterpret_cast<gpr_atm>(p))) {
321 p->~parent_call();
322 p = reinterpret_cast<parent_call*>(
323 gpr_atm_acq_load(&call->parent_call_atm));
324 }
325 }
326 return p;
327 }
328
get_parent_call(grpc_call * call)329 static parent_call* get_parent_call(grpc_call* call) {
330 return reinterpret_cast<parent_call*>(
331 gpr_atm_acq_load(&call->parent_call_atm));
332 }
333
grpc_call_get_initial_size_estimate()334 size_t grpc_call_get_initial_size_estimate() {
335 return sizeof(grpc_call) + sizeof(batch_control) * MAX_CONCURRENT_BATCHES +
336 sizeof(grpc_linked_mdelem) * ESTIMATED_MDELEM_COUNT;
337 }
338
grpc_call_create(const grpc_call_create_args * args,grpc_call ** out_call)339 grpc_error_handle grpc_call_create(const grpc_call_create_args* args,
340 grpc_call** out_call) {
341 GPR_TIMER_SCOPE("grpc_call_create", 0);
342
343 GRPC_CHANNEL_INTERNAL_REF(args->channel, "call");
344
345 grpc_core::Arena* arena;
346 grpc_call* call;
347 grpc_error_handle error = GRPC_ERROR_NONE;
348 grpc_channel_stack* channel_stack =
349 grpc_channel_get_channel_stack(args->channel);
350 size_t initial_size = grpc_channel_get_call_size_estimate(args->channel);
351 GRPC_STATS_INC_CALL_INITIAL_SIZE(initial_size);
352 size_t call_and_stack_size =
353 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call)) +
354 channel_stack->call_stack_size;
355 size_t call_alloc_size =
356 call_and_stack_size + (args->parent ? sizeof(child_call) : 0);
357
358 std::pair<grpc_core::Arena*, void*> arena_with_call =
359 grpc_core::Arena::CreateWithAlloc(initial_size, call_alloc_size);
360 arena = arena_with_call.first;
361 call = new (arena_with_call.second) grpc_call(arena, *args);
362 *out_call = call;
363 grpc_slice path = grpc_empty_slice();
364 if (call->is_client) {
365 call->final_op.client.status_details = nullptr;
366 call->final_op.client.status = nullptr;
367 call->final_op.client.error_string = nullptr;
368 GRPC_STATS_INC_CLIENT_CALLS_CREATED();
369 GPR_ASSERT(args->add_initial_metadata_count <
370 MAX_SEND_EXTRA_METADATA_COUNT);
371 for (size_t i = 0; i < args->add_initial_metadata_count; i++) {
372 call->send_extra_metadata[i].md = args->add_initial_metadata[i];
373 if (grpc_slice_eq_static_interned(
374 GRPC_MDKEY(args->add_initial_metadata[i]), GRPC_MDSTR_PATH)) {
375 path = grpc_slice_ref_internal(
376 GRPC_MDVALUE(args->add_initial_metadata[i]));
377 }
378 }
379 call->send_extra_metadata_count =
380 static_cast<int>(args->add_initial_metadata_count);
381 } else {
382 GRPC_STATS_INC_SERVER_CALLS_CREATED();
383 call->final_op.server.cancelled = nullptr;
384 call->final_op.server.core_server = args->server;
385 GPR_ASSERT(args->add_initial_metadata_count == 0);
386 call->send_extra_metadata_count = 0;
387 }
388
389 grpc_millis send_deadline = args->send_deadline;
390 bool immediately_cancel = false;
391
392 if (args->parent != nullptr) {
393 call->child = new (reinterpret_cast<char*>(arena_with_call.second) +
394 call_and_stack_size) child_call(args->parent);
395
396 GRPC_CALL_INTERNAL_REF(args->parent, "child");
397 GPR_ASSERT(call->is_client);
398 GPR_ASSERT(!args->parent->is_client);
399
400 if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) {
401 send_deadline = GPR_MIN(send_deadline, args->parent->send_deadline);
402 }
403 /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
404 * GRPC_PROPAGATE_STATS_CONTEXT */
405 /* TODO(ctiller): This should change to use the appropriate census start_op
406 * call. */
407 if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
408 if (0 == (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) {
409 add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
410 "Census tracing propagation requested "
411 "without Census context propagation"));
412 }
413 grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
414 args->parent->context[GRPC_CONTEXT_TRACING].value,
415 nullptr);
416 } else if (args->propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) {
417 add_init_error(&error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
418 "Census context propagation requested "
419 "without Census tracing propagation"));
420 }
421 if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
422 call->cancellation_is_inherited = true;
423 if (gpr_atm_acq_load(&args->parent->received_final_op_atm)) {
424 immediately_cancel = true;
425 }
426 }
427 }
428 call->send_deadline = send_deadline;
429 /* initial refcount dropped by grpc_call_unref */
430 grpc_call_element_args call_args = {CALL_STACK_FROM_CALL(call),
431 args->server_transport_data,
432 call->context,
433 path,
434 call->start_time,
435 send_deadline,
436 call->arena,
437 &call->call_combiner};
438 add_init_error(&error, grpc_call_stack_init(channel_stack, 1, destroy_call,
439 call, &call_args));
440 // Publish this call to parent only after the call stack has been initialized.
441 if (args->parent != nullptr) {
442 child_call* cc = call->child;
443 parent_call* pc = get_or_create_parent_call(args->parent);
444 gpr_mu_lock(&pc->child_list_mu);
445 if (pc->first_child == nullptr) {
446 pc->first_child = call;
447 cc->sibling_next = cc->sibling_prev = call;
448 } else {
449 cc->sibling_next = pc->first_child;
450 cc->sibling_prev = pc->first_child->child->sibling_prev;
451 cc->sibling_next->child->sibling_prev =
452 cc->sibling_prev->child->sibling_next = call;
453 }
454 gpr_mu_unlock(&pc->child_list_mu);
455 }
456
457 if (error != GRPC_ERROR_NONE) {
458 cancel_with_error(call, GRPC_ERROR_REF(error));
459 }
460 if (immediately_cancel) {
461 cancel_with_error(call, GRPC_ERROR_CANCELLED);
462 }
463 if (args->cq != nullptr) {
464 GPR_ASSERT(args->pollset_set_alternative == nullptr &&
465 "Only one of 'cq' and 'pollset_set_alternative' should be "
466 "non-nullptr.");
467 GRPC_CQ_INTERNAL_REF(args->cq, "bind");
468 call->pollent =
469 grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
470 }
471 if (args->pollset_set_alternative != nullptr) {
472 call->pollent = grpc_polling_entity_create_from_pollset_set(
473 args->pollset_set_alternative);
474 }
475 if (!grpc_polling_entity_is_empty(&call->pollent)) {
476 grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
477 &call->pollent);
478 }
479
480 if (call->is_client) {
481 grpc_core::channelz::ChannelNode* channelz_channel =
482 grpc_channel_get_channelz_node(call->channel);
483 if (channelz_channel != nullptr) {
484 channelz_channel->RecordCallStarted();
485 }
486 } else if (call->final_op.server.core_server != nullptr) {
487 grpc_core::channelz::ServerNode* channelz_node =
488 call->final_op.server.core_server->channelz_node();
489 if (channelz_node != nullptr) {
490 channelz_node->RecordCallStarted();
491 }
492 }
493
494 grpc_slice_unref_internal(path);
495
496 return error;
497 }
498
grpc_call_set_completion_queue(grpc_call * call,grpc_completion_queue * cq)499 void grpc_call_set_completion_queue(grpc_call* call,
500 grpc_completion_queue* cq) {
501 GPR_ASSERT(cq);
502
503 if (grpc_polling_entity_pollset_set(&call->pollent) != nullptr) {
504 gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
505 abort();
506 }
507 call->cq = cq;
508 GRPC_CQ_INTERNAL_REF(cq, "bind");
509 call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
510 grpc_call_stack_set_pollset_or_pollset_set(CALL_STACK_FROM_CALL(call),
511 &call->pollent);
512 }
513
514 #ifndef NDEBUG
515 #define REF_REASON reason
516 #define REF_ARG , const char* reason
517 #else
518 #define REF_REASON ""
519 #define REF_ARG
520 #endif
grpc_call_internal_ref(grpc_call * c REF_ARG)521 void grpc_call_internal_ref(grpc_call* c REF_ARG) {
522 GRPC_CALL_STACK_REF(CALL_STACK_FROM_CALL(c), REF_REASON);
523 }
grpc_call_internal_unref(grpc_call * c REF_ARG)524 void grpc_call_internal_unref(grpc_call* c REF_ARG) {
525 GRPC_CALL_STACK_UNREF(CALL_STACK_FROM_CALL(c), REF_REASON);
526 }
527
release_call(void * call,grpc_error_handle)528 static void release_call(void* call, grpc_error_handle /*error*/) {
529 grpc_call* c = static_cast<grpc_call*>(call);
530 grpc_channel* channel = c->channel;
531 grpc_core::Arena* arena = c->arena;
532 c->~grpc_call();
533 grpc_channel_update_call_size_estimate(channel, arena->Destroy());
534 GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
535 }
536
destroy_call(void * call,grpc_error_handle)537 static void destroy_call(void* call, grpc_error_handle /*error*/) {
538 GPR_TIMER_SCOPE("destroy_call", 0);
539 size_t i;
540 int ii;
541 grpc_call* c = static_cast<grpc_call*>(call);
542 for (i = 0; i < 2; i++) {
543 grpc_metadata_batch_destroy(
544 &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
545 }
546 c->receiving_stream.reset();
547 parent_call* pc = get_parent_call(c);
548 if (pc != nullptr) {
549 pc->~parent_call();
550 }
551 for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
552 GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
553 }
554 for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
555 if (c->context[i].destroy) {
556 c->context[i].destroy(c->context[i].value);
557 }
558 }
559 if (c->cq) {
560 GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
561 }
562
563 grpc_error_handle status_error =
564 reinterpret_cast<grpc_error_handle>(gpr_atm_acq_load(&c->status_error));
565 grpc_error_get_status(status_error, c->send_deadline,
566 &c->final_info.final_status, nullptr, nullptr,
567 &(c->final_info.error_string));
568 GRPC_ERROR_UNREF(status_error);
569 c->final_info.stats.latency =
570 gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time);
571 grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
572 GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
573 grpc_schedule_on_exec_ctx));
574 }
575
grpc_call_ref(grpc_call * c)576 void grpc_call_ref(grpc_call* c) { c->ext_ref.Ref(); }
577
grpc_call_unref(grpc_call * c)578 void grpc_call_unref(grpc_call* c) {
579 if (GPR_LIKELY(!c->ext_ref.Unref())) return;
580
581 GPR_TIMER_SCOPE("grpc_call_unref", 0);
582
583 child_call* cc = c->child;
584 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
585 grpc_core::ExecCtx exec_ctx;
586
587 GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (c));
588
589 if (cc) {
590 parent_call* pc = get_parent_call(cc->parent);
591 gpr_mu_lock(&pc->child_list_mu);
592 if (c == pc->first_child) {
593 pc->first_child = cc->sibling_next;
594 if (c == pc->first_child) {
595 pc->first_child = nullptr;
596 }
597 }
598 cc->sibling_prev->child->sibling_next = cc->sibling_next;
599 cc->sibling_next->child->sibling_prev = cc->sibling_prev;
600 gpr_mu_unlock(&pc->child_list_mu);
601 GRPC_CALL_INTERNAL_UNREF(cc->parent, "child");
602 }
603
604 GPR_ASSERT(!c->destroy_called);
605 c->destroy_called = true;
606 bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
607 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
608 if (cancel) {
609 cancel_with_error(c, GRPC_ERROR_CANCELLED);
610 } else {
611 // Unset the call combiner cancellation closure. This has the
612 // effect of scheduling the previously set cancellation closure, if
613 // any, so that it can release any internal references it may be
614 // holding to the call stack.
615 c->call_combiner.SetNotifyOnCancel(nullptr);
616 }
617 GRPC_CALL_INTERNAL_UNREF(c, "destroy");
618 }
619
grpc_call_cancel(grpc_call * call,void * reserved)620 grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
621 GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
622 GPR_ASSERT(!reserved);
623 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
624 grpc_core::ExecCtx exec_ctx;
625 cancel_with_error(call, GRPC_ERROR_CANCELLED);
626 return GRPC_CALL_OK;
627 }
628
629 // This is called via the call combiner to start sending a batch down
630 // the filter stack.
execute_batch_in_call_combiner(void * arg,grpc_error_handle)631 static void execute_batch_in_call_combiner(void* arg,
632 grpc_error_handle /*ignored*/) {
633 GPR_TIMER_SCOPE("execute_batch_in_call_combiner", 0);
634 grpc_transport_stream_op_batch* batch =
635 static_cast<grpc_transport_stream_op_batch*>(arg);
636 grpc_call* call = static_cast<grpc_call*>(batch->handler_private.extra_arg);
637 grpc_call_element* elem = CALL_ELEM_FROM_CALL(call, 0);
638 GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
639 elem->filter->start_transport_stream_op_batch(elem, batch);
640 }
641
642 // start_batch_closure points to a caller-allocated closure to be used
643 // for entering the call combiner.
execute_batch(grpc_call * call,grpc_transport_stream_op_batch * batch,grpc_closure * start_batch_closure)644 static void execute_batch(grpc_call* call,
645 grpc_transport_stream_op_batch* batch,
646 grpc_closure* start_batch_closure) {
647 batch->handler_private.extra_arg = call;
648 GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
649 grpc_schedule_on_exec_ctx);
650 GRPC_CALL_COMBINER_START(&call->call_combiner, start_batch_closure,
651 GRPC_ERROR_NONE, "executing batch");
652 }
653
grpc_call_get_peer(grpc_call * call)654 char* grpc_call_get_peer(grpc_call* call) {
655 char* peer_string =
656 reinterpret_cast<char*>(gpr_atm_acq_load(&call->peer_string));
657 if (peer_string != nullptr) return gpr_strdup(peer_string);
658 peer_string = grpc_channel_get_target(call->channel);
659 if (peer_string != nullptr) return peer_string;
660 return gpr_strdup("unknown");
661 }
662
grpc_call_from_top_element(grpc_call_element * surface_element)663 grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {
664 return CALL_FROM_TOP_ELEM(surface_element);
665 }
666
667 /*******************************************************************************
668 * CANCELLATION
669 */
670
grpc_call_cancel_with_status(grpc_call * c,grpc_status_code status,const char * description,void * reserved)671 grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
672 grpc_status_code status,
673 const char* description,
674 void* reserved) {
675 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
676 grpc_core::ExecCtx exec_ctx;
677 GRPC_API_TRACE(
678 "grpc_call_cancel_with_status("
679 "c=%p, status=%d, description=%s, reserved=%p)",
680 4, (c, (int)status, description, reserved));
681 GPR_ASSERT(reserved == nullptr);
682 cancel_with_status(c, status, description);
683 return GRPC_CALL_OK;
684 }
685
686 struct cancel_state {
687 grpc_call* call;
688 grpc_closure start_batch;
689 grpc_closure finish_batch;
690 };
691 // The on_complete callback used when sending a cancel_stream batch down
692 // the filter stack. Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error_handle)693 static void done_termination(void* arg, grpc_error_handle /*error*/) {
694 cancel_state* state = static_cast<cancel_state*>(arg);
695 GRPC_CALL_COMBINER_STOP(&state->call->call_combiner,
696 "on_complete for cancel_stream op");
697 GRPC_CALL_INTERNAL_UNREF(state->call, "termination");
698 gpr_free(state);
699 }
700
cancel_with_error(grpc_call * c,grpc_error_handle error)701 static void cancel_with_error(grpc_call* c, grpc_error_handle error) {
702 if (!gpr_atm_rel_cas(&c->cancelled_with_error, 0, 1)) {
703 GRPC_ERROR_UNREF(error);
704 return;
705 }
706 GRPC_CALL_INTERNAL_REF(c, "termination");
707 // Inform the call combiner of the cancellation, so that it can cancel
708 // any in-flight asynchronous actions that may be holding the call
709 // combiner. This ensures that the cancel_stream batch can be sent
710 // down the filter stack in a timely manner.
711 c->call_combiner.Cancel(GRPC_ERROR_REF(error));
712 cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
713 state->call = c;
714 GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
715 grpc_schedule_on_exec_ctx);
716 grpc_transport_stream_op_batch* op =
717 grpc_make_transport_stream_op(&state->finish_batch);
718 op->cancel_stream = true;
719 op->payload->cancel_stream.cancel_error = error;
720 execute_batch(c, op, &state->start_batch);
721 }
722
grpc_call_cancel_internal(grpc_call * call)723 void grpc_call_cancel_internal(grpc_call* call) {
724 cancel_with_error(call, GRPC_ERROR_CANCELLED);
725 }
726
error_from_status(grpc_status_code status,const char * description)727 static grpc_error_handle error_from_status(grpc_status_code status,
728 const char* description) {
729 // copying 'description' is needed to ensure the grpc_call_cancel_with_status
730 // guarantee that can be short-lived.
731 return grpc_error_set_int(
732 grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
733 GRPC_ERROR_STR_GRPC_MESSAGE,
734 grpc_slice_from_copied_string(description)),
735 GRPC_ERROR_INT_GRPC_STATUS, status);
736 }
737
cancel_with_status(grpc_call * c,grpc_status_code status,const char * description)738 static void cancel_with_status(grpc_call* c, grpc_status_code status,
739 const char* description) {
740 cancel_with_error(c, error_from_status(status, description));
741 }
742
set_final_status(grpc_call * call,grpc_error_handle error)743 static void set_final_status(grpc_call* call, grpc_error_handle error) {
744 if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) {
745 gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
746 gpr_log(GPR_DEBUG, "%s", grpc_error_std_string(error).c_str());
747 }
748 if (call->is_client) {
749 grpc_error_get_status(error, call->send_deadline,
750 call->final_op.client.status,
751 call->final_op.client.status_details, nullptr,
752 call->final_op.client.error_string);
753 // explicitly take a ref
754 grpc_slice_ref_internal(*call->final_op.client.status_details);
755 gpr_atm_rel_store(&call->status_error, reinterpret_cast<gpr_atm>(error));
756 grpc_core::channelz::ChannelNode* channelz_channel =
757 grpc_channel_get_channelz_node(call->channel);
758 if (channelz_channel != nullptr) {
759 if (*call->final_op.client.status != GRPC_STATUS_OK) {
760 channelz_channel->RecordCallFailed();
761 } else {
762 channelz_channel->RecordCallSucceeded();
763 }
764 }
765 } else {
766 *call->final_op.server.cancelled =
767 error != GRPC_ERROR_NONE || !call->sent_server_trailing_metadata;
768 grpc_core::channelz::ServerNode* channelz_node =
769 call->final_op.server.core_server->channelz_node();
770 if (channelz_node != nullptr) {
771 if (*call->final_op.server.cancelled ||
772 reinterpret_cast<grpc_error_handle>(
773 gpr_atm_acq_load(&call->status_error)) != GRPC_ERROR_NONE) {
774 channelz_node->RecordCallFailed();
775 } else {
776 channelz_node->RecordCallSucceeded();
777 }
778 }
779 GRPC_ERROR_UNREF(error);
780 }
781 }
782
783 /*******************************************************************************
784 * COMPRESSION
785 */
786
set_incoming_message_compression_algorithm(grpc_call * call,grpc_message_compression_algorithm algo)787 static void set_incoming_message_compression_algorithm(
788 grpc_call* call, grpc_message_compression_algorithm algo) {
789 GPR_ASSERT(algo < GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT);
790 call->incoming_message_compression_algorithm = algo;
791 }
792
set_incoming_stream_compression_algorithm(grpc_call * call,grpc_stream_compression_algorithm algo)793 static void set_incoming_stream_compression_algorithm(
794 grpc_call* call, grpc_stream_compression_algorithm algo) {
795 GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
796 call->incoming_stream_compression_algorithm = algo;
797 }
798
grpc_call_test_only_get_compression_algorithm(grpc_call * call)799 grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
800 grpc_call* call) {
801 grpc_compression_algorithm algorithm = GRPC_COMPRESS_NONE;
802 grpc_compression_algorithm_from_message_stream_compression_algorithm(
803 &algorithm, call->incoming_message_compression_algorithm,
804 call->incoming_stream_compression_algorithm);
805 return algorithm;
806 }
807
compression_algorithm_for_level_locked(grpc_call * call,grpc_compression_level level)808 static grpc_compression_algorithm compression_algorithm_for_level_locked(
809 grpc_call* call, grpc_compression_level level) {
810 return grpc_compression_algorithm_for_level(level,
811 call->encodings_accepted_by_peer);
812 }
813
grpc_call_test_only_get_message_flags(grpc_call * call)814 uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) {
815 uint32_t flags;
816 flags = call->test_only_last_message_flags;
817 return flags;
818 }
819
destroy_encodings_accepted_by_peer(void *)820 static void destroy_encodings_accepted_by_peer(void* /*p*/) {}
821
set_encodings_accepted_by_peer(grpc_call *,grpc_mdelem mdel,uint32_t * encodings_accepted_by_peer,bool stream_encoding)822 static void set_encodings_accepted_by_peer(grpc_call* /*call*/,
823 grpc_mdelem mdel,
824 uint32_t* encodings_accepted_by_peer,
825 bool stream_encoding) {
826 size_t i;
827 uint32_t algorithm;
828 grpc_slice_buffer accept_encoding_parts;
829 grpc_slice accept_encoding_slice;
830 void* accepted_user_data;
831
832 accepted_user_data =
833 grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
834 if (accepted_user_data != nullptr) {
835 *encodings_accepted_by_peer = static_cast<uint32_t>(
836 reinterpret_cast<uintptr_t>(accepted_user_data) - 1);
837 return;
838 }
839
840 *encodings_accepted_by_peer = 0;
841
842 accept_encoding_slice = GRPC_MDVALUE(mdel);
843 grpc_slice_buffer_init(&accept_encoding_parts);
844 grpc_slice_split_without_space(accept_encoding_slice, ",",
845 &accept_encoding_parts);
846
847 GPR_BITSET(encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
848 for (i = 0; i < accept_encoding_parts.count; i++) {
849 int r;
850 grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
851 if (!stream_encoding) {
852 r = grpc_message_compression_algorithm_parse(
853 accept_encoding_entry_slice,
854 reinterpret_cast<grpc_message_compression_algorithm*>(&algorithm));
855 } else {
856 r = grpc_stream_compression_algorithm_parse(
857 accept_encoding_entry_slice,
858 reinterpret_cast<grpc_stream_compression_algorithm*>(&algorithm));
859 }
860 if (r) {
861 GPR_BITSET(encodings_accepted_by_peer, algorithm);
862 } else {
863 char* accept_encoding_entry_str =
864 grpc_slice_to_c_string(accept_encoding_entry_slice);
865 gpr_log(GPR_DEBUG,
866 "Unknown entry in accept encoding metadata: '%s'. Ignoring.",
867 accept_encoding_entry_str);
868 gpr_free(accept_encoding_entry_str);
869 }
870 }
871
872 grpc_slice_buffer_destroy_internal(&accept_encoding_parts);
873
874 grpc_mdelem_set_user_data(
875 mdel, destroy_encodings_accepted_by_peer,
876 reinterpret_cast<void*>(
877 static_cast<uintptr_t>(*encodings_accepted_by_peer) + 1));
878 }
879
grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call * call)880 uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) {
881 uint32_t encodings_accepted_by_peer;
882 encodings_accepted_by_peer = call->encodings_accepted_by_peer;
883 return encodings_accepted_by_peer;
884 }
885
886 grpc_stream_compression_algorithm
grpc_call_test_only_get_incoming_stream_encodings(grpc_call * call)887 grpc_call_test_only_get_incoming_stream_encodings(grpc_call* call) {
888 return call->incoming_stream_compression_algorithm;
889 }
890
linked_from_md(grpc_metadata * md)891 static grpc_linked_mdelem* linked_from_md(grpc_metadata* md) {
892 return reinterpret_cast<grpc_linked_mdelem*>(&md->internal_data);
893 }
894
get_md_elem(grpc_metadata * metadata,grpc_metadata * additional_metadata,int i,int count)895 static grpc_metadata* get_md_elem(grpc_metadata* metadata,
896 grpc_metadata* additional_metadata, int i,
897 int count) {
898 grpc_metadata* res =
899 i < count ? &metadata[i] : &additional_metadata[i - count];
900 GPR_ASSERT(res);
901 return res;
902 }
903
prepare_application_metadata(grpc_call * call,int count,grpc_metadata * metadata,int is_trailing,int prepend_extra_metadata,grpc_metadata * additional_metadata,int additional_metadata_count)904 static int prepare_application_metadata(grpc_call* call, int count,
905 grpc_metadata* metadata,
906 int is_trailing,
907 int prepend_extra_metadata,
908 grpc_metadata* additional_metadata,
909 int additional_metadata_count) {
910 int total_count = count + additional_metadata_count;
911 int i;
912 grpc_metadata_batch* batch =
913 &call->metadata_batch[0 /* is_receiving */][is_trailing];
914 for (i = 0; i < total_count; i++) {
915 grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
916 grpc_linked_mdelem* l = linked_from_md(md);
917 GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
918 if (!GRPC_LOG_IF_ERROR("validate_metadata",
919 grpc_validate_header_key_is_legal(md->key))) {
920 break;
921 } else if (!grpc_is_binary_header_internal(md->key) &&
922 !GRPC_LOG_IF_ERROR(
923 "validate_metadata",
924 grpc_validate_header_nonbin_value_is_legal(md->value))) {
925 break;
926 } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
927 // HTTP2 hpack encoding has a maximum limit.
928 break;
929 }
930 l->md = grpc_mdelem_from_grpc_metadata(const_cast<grpc_metadata*>(md));
931 }
932 if (i != total_count) {
933 for (int j = 0; j < i; j++) {
934 grpc_metadata* md = get_md_elem(metadata, additional_metadata, j, count);
935 grpc_linked_mdelem* l = linked_from_md(md);
936 GRPC_MDELEM_UNREF(l->md);
937 }
938 return 0;
939 }
940 if (prepend_extra_metadata) {
941 if (call->send_extra_metadata_count == 0) {
942 prepend_extra_metadata = 0;
943 } else {
944 for (i = 0; i < call->send_extra_metadata_count; i++) {
945 GRPC_LOG_IF_ERROR("prepare_application_metadata",
946 grpc_metadata_batch_link_tail(
947 batch, &call->send_extra_metadata[i]));
948 }
949 }
950 }
951 for (i = 0; i < total_count; i++) {
952 grpc_metadata* md = get_md_elem(metadata, additional_metadata, i, count);
953 grpc_linked_mdelem* l = linked_from_md(md);
954 grpc_error_handle error = grpc_metadata_batch_link_tail(batch, l);
955 if (error != GRPC_ERROR_NONE) {
956 GRPC_MDELEM_UNREF(l->md);
957 }
958 GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
959 }
960 call->send_extra_metadata_count = 0;
961
962 return 1;
963 }
964
decode_message_compression(grpc_mdelem md)965 static grpc_message_compression_algorithm decode_message_compression(
966 grpc_mdelem md) {
967 grpc_message_compression_algorithm algorithm =
968 grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
969 if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
970 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
971 gpr_log(GPR_ERROR,
972 "Invalid incoming message compression algorithm: '%s'. "
973 "Interpreting incoming data as uncompressed.",
974 md_c_str);
975 gpr_free(md_c_str);
976 return GRPC_MESSAGE_COMPRESS_NONE;
977 }
978 return algorithm;
979 }
980
decode_stream_compression(grpc_mdelem md)981 static grpc_stream_compression_algorithm decode_stream_compression(
982 grpc_mdelem md) {
983 grpc_stream_compression_algorithm algorithm =
984 grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
985 if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
986 char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
987 gpr_log(GPR_ERROR,
988 "Invalid incoming stream compression algorithm: '%s'. Interpreting "
989 "incoming data as uncompressed.",
990 md_c_str);
991 gpr_free(md_c_str);
992 return GRPC_STREAM_COMPRESS_NONE;
993 }
994 return algorithm;
995 }
996
publish_app_metadata(grpc_call * call,grpc_metadata_batch * b,int is_trailing)997 static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
998 int is_trailing) {
999 if (b->list.count == 0) return;
1000 if (!call->is_client && is_trailing) return;
1001 if (is_trailing && call->buffered_metadata[1] == nullptr) return;
1002 GPR_TIMER_SCOPE("publish_app_metadata", 0);
1003 grpc_metadata_array* dest;
1004 grpc_metadata* mdusr;
1005 dest = call->buffered_metadata[is_trailing];
1006 if (dest->count + b->list.count > dest->capacity) {
1007 dest->capacity =
1008 GPR_MAX(dest->capacity + b->list.count, dest->capacity * 3 / 2);
1009 dest->metadata = static_cast<grpc_metadata*>(
1010 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
1011 }
1012 for (grpc_linked_mdelem* l = b->list.head; l != nullptr; l = l->next) {
1013 mdusr = &dest->metadata[dest->count++];
1014 /* we pass back borrowed slices that are valid whilst the call is valid */
1015 mdusr->key = GRPC_MDKEY(l->md);
1016 mdusr->value = GRPC_MDVALUE(l->md);
1017 }
1018 }
1019
recv_initial_filter(grpc_call * call,grpc_metadata_batch * b)1020 static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
1021 if (b->idx.named.content_encoding != nullptr) {
1022 GPR_TIMER_SCOPE("incoming_stream_compression_algorithm", 0);
1023 set_incoming_stream_compression_algorithm(
1024 call, decode_stream_compression(b->idx.named.content_encoding->md));
1025 grpc_metadata_batch_remove(b, GRPC_BATCH_CONTENT_ENCODING);
1026 }
1027 if (b->idx.named.grpc_encoding != nullptr) {
1028 GPR_TIMER_SCOPE("incoming_message_compression_algorithm", 0);
1029 set_incoming_message_compression_algorithm(
1030 call, decode_message_compression(b->idx.named.grpc_encoding->md));
1031 grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ENCODING);
1032 }
1033 uint32_t message_encodings_accepted_by_peer = 1u;
1034 uint32_t stream_encodings_accepted_by_peer = 1u;
1035 if (b->idx.named.grpc_accept_encoding != nullptr) {
1036 GPR_TIMER_SCOPE("encodings_accepted_by_peer", 0);
1037 set_encodings_accepted_by_peer(call, b->idx.named.grpc_accept_encoding->md,
1038 &message_encodings_accepted_by_peer, false);
1039 grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_ACCEPT_ENCODING);
1040 }
1041 if (b->idx.named.accept_encoding != nullptr) {
1042 GPR_TIMER_SCOPE("stream_encodings_accepted_by_peer", 0);
1043 set_encodings_accepted_by_peer(call, b->idx.named.accept_encoding->md,
1044 &stream_encodings_accepted_by_peer, true);
1045 grpc_metadata_batch_remove(b, GRPC_BATCH_ACCEPT_ENCODING);
1046 }
1047 call->encodings_accepted_by_peer =
1048 grpc_compression_bitset_from_message_stream_compression_bitset(
1049 message_encodings_accepted_by_peer,
1050 stream_encodings_accepted_by_peer);
1051 publish_app_metadata(call, b, false);
1052 }
1053
recv_trailing_filter(void * args,grpc_metadata_batch * b,grpc_error_handle batch_error)1054 static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
1055 grpc_error_handle batch_error) {
1056 grpc_call* call = static_cast<grpc_call*>(args);
1057 if (batch_error != GRPC_ERROR_NONE) {
1058 set_final_status(call, batch_error);
1059 } else if (b->idx.named.grpc_status != nullptr) {
1060 grpc_status_code status_code =
1061 grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
1062 grpc_error_handle error = GRPC_ERROR_NONE;
1063 if (status_code != GRPC_STATUS_OK) {
1064 char* peer = grpc_call_get_peer(call);
1065 error = grpc_error_set_int(
1066 GRPC_ERROR_CREATE_FROM_COPIED_STRING(
1067 absl::StrCat("Error received from peer ", peer).c_str()),
1068 GRPC_ERROR_INT_GRPC_STATUS, static_cast<intptr_t>(status_code));
1069 gpr_free(peer);
1070 }
1071 if (b->idx.named.grpc_message != nullptr) {
1072 error = grpc_error_set_str(
1073 error, GRPC_ERROR_STR_GRPC_MESSAGE,
1074 grpc_slice_ref_internal(GRPC_MDVALUE(b->idx.named.grpc_message->md)));
1075 grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_MESSAGE);
1076 } else if (error != GRPC_ERROR_NONE) {
1077 error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
1078 grpc_empty_slice());
1079 }
1080 set_final_status(call, GRPC_ERROR_REF(error));
1081 grpc_metadata_batch_remove(b, GRPC_BATCH_GRPC_STATUS);
1082 GRPC_ERROR_UNREF(error);
1083 } else if (!call->is_client) {
1084 set_final_status(call, GRPC_ERROR_NONE);
1085 } else {
1086 gpr_log(GPR_DEBUG,
1087 "Received trailing metadata with no error and no status");
1088 set_final_status(
1089 call, grpc_error_set_int(
1090 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No status received"),
1091 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNKNOWN));
1092 }
1093 publish_app_metadata(call, b, true);
1094 }
1095
grpc_call_get_arena(grpc_call * call)1096 grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return call->arena; }
1097
grpc_call_get_call_stack(grpc_call * call)1098 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
1099 return CALL_STACK_FROM_CALL(call);
1100 }
1101
1102 /*******************************************************************************
1103 * BATCH API IMPLEMENTATION
1104 */
1105
are_write_flags_valid(uint32_t flags)1106 static bool are_write_flags_valid(uint32_t flags) {
1107 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1108 const uint32_t allowed_write_positions =
1109 (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1110 const uint32_t invalid_positions = ~allowed_write_positions;
1111 return !(flags & invalid_positions);
1112 }
1113
are_initial_metadata_flags_valid(uint32_t flags,bool is_client)1114 static bool are_initial_metadata_flags_valid(uint32_t flags, bool is_client) {
1115 /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1116 uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK;
1117 if (!is_client) {
1118 invalid_positions |= GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
1119 }
1120 return !(flags & invalid_positions);
1121 }
1122
batch_slot_for_op(grpc_op_type type)1123 static size_t batch_slot_for_op(grpc_op_type type) {
1124 switch (type) {
1125 case GRPC_OP_SEND_INITIAL_METADATA:
1126 return 0;
1127 case GRPC_OP_SEND_MESSAGE:
1128 return 1;
1129 case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1130 case GRPC_OP_SEND_STATUS_FROM_SERVER:
1131 return 2;
1132 case GRPC_OP_RECV_INITIAL_METADATA:
1133 return 3;
1134 case GRPC_OP_RECV_MESSAGE:
1135 return 4;
1136 case GRPC_OP_RECV_CLOSE_ON_SERVER:
1137 case GRPC_OP_RECV_STATUS_ON_CLIENT:
1138 return 5;
1139 }
1140 GPR_UNREACHABLE_CODE(return 123456789);
1141 }
1142
reuse_or_allocate_batch_control(grpc_call * call,const grpc_op * ops)1143 static batch_control* reuse_or_allocate_batch_control(grpc_call* call,
1144 const grpc_op* ops) {
1145 size_t slot_idx = batch_slot_for_op(ops[0].op);
1146 batch_control** pslot = &call->active_batches[slot_idx];
1147 batch_control* bctl;
1148 if (*pslot != nullptr) {
1149 bctl = *pslot;
1150 if (bctl->call != nullptr) {
1151 return nullptr;
1152 }
1153 bctl->~batch_control();
1154 bctl->op = {};
1155 } else {
1156 bctl = call->arena->New<batch_control>();
1157 *pslot = bctl;
1158 }
1159 bctl->call = call;
1160 bctl->op.payload = &call->stream_op_payload;
1161 return bctl;
1162 }
1163
finish_batch_completion(void * user_data,grpc_cq_completion *)1164 static void finish_batch_completion(void* user_data,
1165 grpc_cq_completion* /*storage*/) {
1166 batch_control* bctl = static_cast<batch_control*>(user_data);
1167 grpc_call* call = bctl->call;
1168 bctl->call = nullptr;
1169 GRPC_CALL_INTERNAL_UNREF(call, "completion");
1170 }
1171
reset_batch_errors(batch_control * bctl)1172 static void reset_batch_errors(batch_control* bctl) {
1173 GRPC_ERROR_UNREF(reinterpret_cast<grpc_error_handle>(
1174 gpr_atm_acq_load(&bctl->batch_error)));
1175 gpr_atm_rel_store(&bctl->batch_error,
1176 reinterpret_cast<gpr_atm>(GRPC_ERROR_NONE));
1177 }
1178
post_batch_completion(batch_control * bctl)1179 static void post_batch_completion(batch_control* bctl) {
1180 grpc_call* next_child_call;
1181 grpc_call* call = bctl->call;
1182 grpc_error_handle error = GRPC_ERROR_REF(reinterpret_cast<grpc_error_handle>(
1183 gpr_atm_acq_load(&bctl->batch_error)));
1184
1185 if (bctl->op.send_initial_metadata) {
1186 grpc_metadata_batch_destroy(
1187 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
1188 }
1189 if (bctl->op.send_message) {
1190 if (bctl->op.payload->send_message.stream_write_closed &&
1191 error == GRPC_ERROR_NONE) {
1192 error = grpc_error_add_child(
1193 error, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1194 "Attempt to send message after stream was closed."));
1195 }
1196 call->sending_message = false;
1197 }
1198 if (bctl->op.send_trailing_metadata) {
1199 grpc_metadata_batch_destroy(
1200 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1201 }
1202 if (bctl->op.recv_trailing_metadata) {
1203 /* propagate cancellation to any interested children */
1204 gpr_atm_rel_store(&call->received_final_op_atm, 1);
1205 parent_call* pc = get_parent_call(call);
1206 if (pc != nullptr) {
1207 grpc_call* child;
1208 gpr_mu_lock(&pc->child_list_mu);
1209 child = pc->first_child;
1210 if (child != nullptr) {
1211 do {
1212 next_child_call = child->child->sibling_next;
1213 if (child->cancellation_is_inherited) {
1214 GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
1215 cancel_with_error(child, GRPC_ERROR_CANCELLED);
1216 GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
1217 }
1218 child = next_child_call;
1219 } while (child != pc->first_child);
1220 }
1221 gpr_mu_unlock(&pc->child_list_mu);
1222 }
1223 GRPC_ERROR_UNREF(error);
1224 error = GRPC_ERROR_NONE;
1225 }
1226 if (error != GRPC_ERROR_NONE && bctl->op.recv_message &&
1227 *call->receiving_buffer != nullptr) {
1228 grpc_byte_buffer_destroy(*call->receiving_buffer);
1229 *call->receiving_buffer = nullptr;
1230 }
1231 reset_batch_errors(bctl);
1232
1233 if (bctl->completion_data.notify_tag.is_closure) {
1234 /* unrefs error */
1235 bctl->call = nullptr;
1236 grpc_core::Closure::Run(
1237 DEBUG_LOCATION,
1238 static_cast<grpc_closure*>(bctl->completion_data.notify_tag.tag),
1239 error);
1240 GRPC_CALL_INTERNAL_UNREF(call, "completion");
1241 } else {
1242 /* unrefs error */
1243 grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
1244 finish_batch_completion, bctl,
1245 &bctl->completion_data.cq_completion);
1246 }
1247 }
1248
finish_batch_step(batch_control * bctl)1249 static void finish_batch_step(batch_control* bctl) {
1250 if (GPR_UNLIKELY(bctl->completed_batch_step())) {
1251 post_batch_completion(bctl);
1252 }
1253 }
1254
continue_receiving_slices(batch_control * bctl)1255 static void continue_receiving_slices(batch_control* bctl) {
1256 grpc_error_handle error;
1257 grpc_call* call = bctl->call;
1258 for (;;) {
1259 size_t remaining = call->receiving_stream->length() -
1260 (*call->receiving_buffer)->data.raw.slice_buffer.length;
1261 if (remaining == 0) {
1262 call->receiving_message = false;
1263 call->receiving_stream.reset();
1264 finish_batch_step(bctl);
1265 return;
1266 }
1267 if (call->receiving_stream->Next(remaining, &call->receiving_slice_ready)) {
1268 error = call->receiving_stream->Pull(&call->receiving_slice);
1269 if (error == GRPC_ERROR_NONE) {
1270 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1271 call->receiving_slice);
1272 } else {
1273 call->receiving_stream.reset();
1274 grpc_byte_buffer_destroy(*call->receiving_buffer);
1275 *call->receiving_buffer = nullptr;
1276 call->receiving_message = false;
1277 finish_batch_step(bctl);
1278 GRPC_ERROR_UNREF(error);
1279 return;
1280 }
1281 } else {
1282 return;
1283 }
1284 }
1285 }
1286
receiving_slice_ready(void * bctlp,grpc_error_handle error)1287 static void receiving_slice_ready(void* bctlp, grpc_error_handle error) {
1288 batch_control* bctl = static_cast<batch_control*>(bctlp);
1289 grpc_call* call = bctl->call;
1290 bool release_error = false;
1291
1292 if (error == GRPC_ERROR_NONE) {
1293 grpc_slice slice;
1294 error = call->receiving_stream->Pull(&slice);
1295 if (error == GRPC_ERROR_NONE) {
1296 grpc_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
1297 slice);
1298 continue_receiving_slices(bctl);
1299 } else {
1300 /* Error returned by ByteStream::Pull() needs to be released manually */
1301 release_error = true;
1302 }
1303 }
1304
1305 if (error != GRPC_ERROR_NONE) {
1306 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_operation_failures)) {
1307 GRPC_LOG_IF_ERROR("receiving_slice_ready", GRPC_ERROR_REF(error));
1308 }
1309 call->receiving_stream.reset();
1310 grpc_byte_buffer_destroy(*call->receiving_buffer);
1311 *call->receiving_buffer = nullptr;
1312 call->receiving_message = false;
1313 finish_batch_step(bctl);
1314 if (release_error) {
1315 GRPC_ERROR_UNREF(error);
1316 }
1317 }
1318 }
1319
process_data_after_md(batch_control * bctl)1320 static void process_data_after_md(batch_control* bctl) {
1321 grpc_call* call = bctl->call;
1322 if (call->receiving_stream == nullptr) {
1323 *call->receiving_buffer = nullptr;
1324 call->receiving_message = false;
1325 finish_batch_step(bctl);
1326 } else {
1327 call->test_only_last_message_flags = call->receiving_stream->flags();
1328 if ((call->receiving_stream->flags() & GRPC_WRITE_INTERNAL_COMPRESS) &&
1329 (call->incoming_message_compression_algorithm >
1330 GRPC_MESSAGE_COMPRESS_NONE)) {
1331 grpc_compression_algorithm algo;
1332 GPR_ASSERT(
1333 grpc_compression_algorithm_from_message_stream_compression_algorithm(
1334 &algo, call->incoming_message_compression_algorithm,
1335 (grpc_stream_compression_algorithm)0));
1336 *call->receiving_buffer =
1337 grpc_raw_compressed_byte_buffer_create(nullptr, 0, algo);
1338 } else {
1339 *call->receiving_buffer = grpc_raw_byte_buffer_create(nullptr, 0);
1340 }
1341 GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl,
1342 grpc_schedule_on_exec_ctx);
1343 continue_receiving_slices(bctl);
1344 }
1345 }
1346
receiving_stream_ready(void * bctlp,grpc_error_handle error)1347 static void receiving_stream_ready(void* bctlp, grpc_error_handle error) {
1348 batch_control* bctl = static_cast<batch_control*>(bctlp);
1349 grpc_call* call = bctl->call;
1350 if (error != GRPC_ERROR_NONE) {
1351 call->receiving_stream.reset();
1352 if (reinterpret_cast<grpc_error_handle>(
1353 gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
1354 gpr_atm_rel_store(&bctl->batch_error,
1355 reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1356 }
1357 cancel_with_error(call, GRPC_ERROR_REF(error));
1358 }
1359 /* If recv_state is RECV_NONE, we will save the batch_control
1360 * object with rel_cas, and will not use it after the cas. Its corresponding
1361 * acq_load is in receiving_initial_metadata_ready() */
1362 if (error != GRPC_ERROR_NONE || call->receiving_stream == nullptr ||
1363 !gpr_atm_rel_cas(&call->recv_state, RECV_NONE,
1364 reinterpret_cast<gpr_atm>(bctlp))) {
1365 process_data_after_md(bctl);
1366 }
1367 }
1368
1369 // The recv_message_ready callback used when sending a batch containing
1370 // a recv_message op down the filter stack. Yields the call combiner
1371 // before processing the received message.
receiving_stream_ready_in_call_combiner(void * bctlp,grpc_error_handle error)1372 static void receiving_stream_ready_in_call_combiner(void* bctlp,
1373 grpc_error_handle error) {
1374 batch_control* bctl = static_cast<batch_control*>(bctlp);
1375 grpc_call* call = bctl->call;
1376 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_message_ready");
1377 receiving_stream_ready(bctlp, error);
1378 }
1379
1380 static void GPR_ATTRIBUTE_NOINLINE
handle_both_stream_and_msg_compression_set(grpc_call * call)1381 handle_both_stream_and_msg_compression_set(grpc_call* call) {
1382 std::string error_msg = absl::StrFormat(
1383 "Incoming stream has both stream compression (%d) and message "
1384 "compression (%d).",
1385 call->incoming_stream_compression_algorithm,
1386 call->incoming_message_compression_algorithm);
1387 gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1388 cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1389 }
1390
1391 static void GPR_ATTRIBUTE_NOINLINE
handle_error_parsing_compression_algorithm(grpc_call * call)1392 handle_error_parsing_compression_algorithm(grpc_call* call) {
1393 std::string error_msg = absl::StrFormat(
1394 "Error in incoming message compression (%d) or stream "
1395 "compression (%d).",
1396 call->incoming_stream_compression_algorithm,
1397 call->incoming_message_compression_algorithm);
1398 cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg.c_str());
1399 }
1400
handle_invalid_compression(grpc_call * call,grpc_compression_algorithm compression_algorithm)1401 static void GPR_ATTRIBUTE_NOINLINE handle_invalid_compression(
1402 grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1403 std::string error_msg = absl::StrFormat(
1404 "Invalid compression algorithm value '%d'.", compression_algorithm);
1405 gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1406 cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1407 }
1408
handle_compression_algorithm_disabled(grpc_call * call,grpc_compression_algorithm compression_algorithm)1409 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_disabled(
1410 grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1411 const char* algo_name = nullptr;
1412 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1413 std::string error_msg =
1414 absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name);
1415 gpr_log(GPR_ERROR, "%s", error_msg.c_str());
1416 cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg.c_str());
1417 }
1418
handle_compression_algorithm_not_accepted(grpc_call * call,grpc_compression_algorithm compression_algorithm)1419 static void GPR_ATTRIBUTE_NOINLINE handle_compression_algorithm_not_accepted(
1420 grpc_call* call, grpc_compression_algorithm compression_algorithm) {
1421 const char* algo_name = nullptr;
1422 grpc_compression_algorithm_name(compression_algorithm, &algo_name);
1423 gpr_log(GPR_ERROR,
1424 "Compression algorithm ('%s') not present in the bitset of "
1425 "accepted encodings ('0x%x')",
1426 algo_name, call->encodings_accepted_by_peer);
1427 }
1428
validate_filtered_metadata(batch_control * bctl)1429 static void validate_filtered_metadata(batch_control* bctl) {
1430 grpc_compression_algorithm compression_algorithm;
1431 grpc_call* call = bctl->call;
1432 if (GPR_UNLIKELY(call->incoming_stream_compression_algorithm !=
1433 GRPC_STREAM_COMPRESS_NONE &&
1434 call->incoming_message_compression_algorithm !=
1435 GRPC_MESSAGE_COMPRESS_NONE)) {
1436 handle_both_stream_and_msg_compression_set(call);
1437 } else if (
1438 GPR_UNLIKELY(
1439 grpc_compression_algorithm_from_message_stream_compression_algorithm(
1440 &compression_algorithm,
1441 call->incoming_message_compression_algorithm,
1442 call->incoming_stream_compression_algorithm) == 0)) {
1443 handle_error_parsing_compression_algorithm(call);
1444 } else {
1445 const grpc_compression_options compression_options =
1446 grpc_channel_compression_options(call->channel);
1447 if (GPR_UNLIKELY(compression_algorithm >= GRPC_COMPRESS_ALGORITHMS_COUNT)) {
1448 handle_invalid_compression(call, compression_algorithm);
1449 } else if (GPR_UNLIKELY(
1450 grpc_compression_options_is_algorithm_enabled_internal(
1451 &compression_options, compression_algorithm) == 0)) {
1452 /* check if algorithm is supported by current channel config */
1453 handle_compression_algorithm_disabled(call, compression_algorithm);
1454 }
1455 /* GRPC_COMPRESS_NONE is always set. */
1456 GPR_DEBUG_ASSERT(call->encodings_accepted_by_peer != 0);
1457 if (GPR_UNLIKELY(!GPR_BITGET(call->encodings_accepted_by_peer,
1458 compression_algorithm))) {
1459 if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) {
1460 handle_compression_algorithm_not_accepted(call, compression_algorithm);
1461 }
1462 }
1463 }
1464 }
1465
receiving_initial_metadata_ready(void * bctlp,grpc_error_handle error)1466 static void receiving_initial_metadata_ready(void* bctlp,
1467 grpc_error_handle error) {
1468 batch_control* bctl = static_cast<batch_control*>(bctlp);
1469 grpc_call* call = bctl->call;
1470
1471 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
1472
1473 if (error == GRPC_ERROR_NONE) {
1474 grpc_metadata_batch* md =
1475 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1476 recv_initial_filter(call, md);
1477
1478 /* TODO(ctiller): this could be moved into recv_initial_filter now */
1479 GPR_TIMER_SCOPE("validate_filtered_metadata", 0);
1480 validate_filtered_metadata(bctl);
1481
1482 if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
1483 call->send_deadline = md->deadline;
1484 }
1485 } else {
1486 if (reinterpret_cast<grpc_error_handle>(
1487 gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
1488 gpr_atm_rel_store(&bctl->batch_error,
1489 reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1490 }
1491 cancel_with_error(call, GRPC_ERROR_REF(error));
1492 }
1493
1494 grpc_closure* saved_rsr_closure = nullptr;
1495 while (true) {
1496 gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
1497 /* Should only receive initial metadata once */
1498 GPR_ASSERT(rsr_bctlp != 1);
1499 if (rsr_bctlp == 0) {
1500 /* We haven't seen initial metadata and messages before, thus initial
1501 * metadata is received first.
1502 * no_barrier_cas is used, as this function won't access the batch_control
1503 * object saved by receiving_stream_ready() if the initial metadata is
1504 * received first. */
1505 if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
1506 RECV_INITIAL_METADATA_FIRST)) {
1507 break;
1508 }
1509 } else {
1510 /* Already received messages */
1511 saved_rsr_closure =
1512 GRPC_CLOSURE_CREATE(receiving_stream_ready, (batch_control*)rsr_bctlp,
1513 grpc_schedule_on_exec_ctx);
1514 /* No need to modify recv_state */
1515 break;
1516 }
1517 }
1518 if (saved_rsr_closure != nullptr) {
1519 grpc_core::Closure::Run(DEBUG_LOCATION, saved_rsr_closure,
1520 GRPC_ERROR_REF(error));
1521 }
1522
1523 finish_batch_step(bctl);
1524 }
1525
receiving_trailing_metadata_ready(void * bctlp,grpc_error_handle error)1526 static void receiving_trailing_metadata_ready(void* bctlp,
1527 grpc_error_handle error) {
1528 batch_control* bctl = static_cast<batch_control*>(bctlp);
1529 grpc_call* call = bctl->call;
1530 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_trailing_metadata_ready");
1531 grpc_metadata_batch* md =
1532 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1533 recv_trailing_filter(call, md, GRPC_ERROR_REF(error));
1534 finish_batch_step(bctl);
1535 }
1536
finish_batch(void * bctlp,grpc_error_handle error)1537 static void finish_batch(void* bctlp, grpc_error_handle error) {
1538 batch_control* bctl = static_cast<batch_control*>(bctlp);
1539 grpc_call* call = bctl->call;
1540 GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
1541 if (reinterpret_cast<grpc_error_handle>(
1542 gpr_atm_acq_load(&bctl->batch_error)) == GRPC_ERROR_NONE) {
1543 gpr_atm_rel_store(&bctl->batch_error,
1544 reinterpret_cast<gpr_atm>(GRPC_ERROR_REF(error)));
1545 }
1546 if (error != GRPC_ERROR_NONE) {
1547 cancel_with_error(call, GRPC_ERROR_REF(error));
1548 }
1549 finish_batch_step(bctl);
1550 }
1551
free_no_op_completion(void *,grpc_cq_completion * completion)1552 static void free_no_op_completion(void* /*p*/, grpc_cq_completion* completion) {
1553 gpr_free(completion);
1554 }
1555
call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * notify_tag,int is_notify_tag_closure)1556 static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
1557 size_t nops, void* notify_tag,
1558 int is_notify_tag_closure) {
1559 GPR_TIMER_SCOPE("call_start_batch", 0);
1560
1561 size_t i;
1562 const grpc_op* op;
1563 batch_control* bctl;
1564 bool has_send_ops = false;
1565 int num_recv_ops = 0;
1566 grpc_call_error error = GRPC_CALL_OK;
1567 grpc_transport_stream_op_batch* stream_op;
1568 grpc_transport_stream_op_batch_payload* stream_op_payload;
1569
1570 GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);
1571
1572 if (nops == 0) {
1573 if (!is_notify_tag_closure) {
1574 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1575 grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
1576 free_no_op_completion, nullptr,
1577 static_cast<grpc_cq_completion*>(
1578 gpr_malloc(sizeof(grpc_cq_completion))));
1579 } else {
1580 grpc_core::Closure::Run(DEBUG_LOCATION,
1581 static_cast<grpc_closure*>(notify_tag),
1582 GRPC_ERROR_NONE);
1583 }
1584 error = GRPC_CALL_OK;
1585 goto done;
1586 }
1587
1588 bctl = reuse_or_allocate_batch_control(call, ops);
1589 if (bctl == nullptr) {
1590 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1591 }
1592 bctl->completion_data.notify_tag.tag = notify_tag;
1593 bctl->completion_data.notify_tag.is_closure =
1594 static_cast<uint8_t>(is_notify_tag_closure != 0);
1595
1596 stream_op = &bctl->op;
1597 stream_op_payload = &call->stream_op_payload;
1598
1599 /* rewrite batch ops into a transport op */
1600 for (i = 0; i < nops; i++) {
1601 op = &ops[i];
1602 if (op->reserved != nullptr) {
1603 error = GRPC_CALL_ERROR;
1604 goto done_with_error;
1605 }
1606 switch (op->op) {
1607 case GRPC_OP_SEND_INITIAL_METADATA: {
1608 /* Flag validation: currently allow no flags */
1609 if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
1610 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1611 goto done_with_error;
1612 }
1613 if (call->sent_initial_metadata) {
1614 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1615 goto done_with_error;
1616 }
1617 // TODO(juanlishen): If the user has already specified a compression
1618 // algorithm by setting the initial metadata with key of
1619 // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
1620 // with the compression algorithm mapped from compression level.
1621 /* process compression level */
1622 grpc_metadata& compression_md = call->compression_md;
1623 compression_md.key = grpc_empty_slice();
1624 compression_md.value = grpc_empty_slice();
1625 compression_md.flags = 0;
1626 size_t additional_metadata_count = 0;
1627 grpc_compression_level effective_compression_level =
1628 GRPC_COMPRESS_LEVEL_NONE;
1629 bool level_set = false;
1630 if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
1631 effective_compression_level =
1632 op->data.send_initial_metadata.maybe_compression_level.level;
1633 level_set = true;
1634 } else {
1635 const grpc_compression_options copts =
1636 grpc_channel_compression_options(call->channel);
1637 if (copts.default_level.is_set) {
1638 level_set = true;
1639 effective_compression_level = copts.default_level.level;
1640 }
1641 }
1642 // Currently, only server side supports compression level setting.
1643 if (level_set && !call->is_client) {
1644 const grpc_compression_algorithm calgo =
1645 compression_algorithm_for_level_locked(
1646 call, effective_compression_level);
1647 // The following metadata will be checked and removed by the message
1648 // compression filter. It will be used as the call's compression
1649 // algorithm.
1650 compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
1651 compression_md.value = grpc_compression_algorithm_slice(calgo);
1652 additional_metadata_count++;
1653 }
1654 if (op->data.send_initial_metadata.count + additional_metadata_count >
1655 INT_MAX) {
1656 error = GRPC_CALL_ERROR_INVALID_METADATA;
1657 goto done_with_error;
1658 }
1659 stream_op->send_initial_metadata = true;
1660 call->sent_initial_metadata = true;
1661 if (!prepare_application_metadata(
1662 call, static_cast<int>(op->data.send_initial_metadata.count),
1663 op->data.send_initial_metadata.metadata, 0, call->is_client,
1664 &compression_md, static_cast<int>(additional_metadata_count))) {
1665 error = GRPC_CALL_ERROR_INVALID_METADATA;
1666 goto done_with_error;
1667 }
1668 /* TODO(ctiller): just make these the same variable? */
1669 if (call->is_client) {
1670 call->metadata_batch[0][0].deadline = call->send_deadline;
1671 }
1672 stream_op_payload->send_initial_metadata.send_initial_metadata =
1673 &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
1674 stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
1675 op->flags;
1676 if (call->is_client) {
1677 stream_op_payload->send_initial_metadata.peer_string =
1678 &call->peer_string;
1679 }
1680 has_send_ops = true;
1681 break;
1682 }
1683 case GRPC_OP_SEND_MESSAGE: {
1684 if (!are_write_flags_valid(op->flags)) {
1685 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1686 goto done_with_error;
1687 }
1688 if (op->data.send_message.send_message == nullptr) {
1689 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1690 goto done_with_error;
1691 }
1692 if (call->sending_message) {
1693 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1694 goto done_with_error;
1695 }
1696 uint32_t flags = op->flags;
1697 /* If the outgoing buffer is already compressed, mark it as so in the
1698 flags. These will be picked up by the compression filter and further
1699 (wasteful) attempts at compression skipped. */
1700 if (op->data.send_message.send_message->data.raw.compression >
1701 GRPC_COMPRESS_NONE) {
1702 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
1703 }
1704 stream_op->send_message = true;
1705 call->sending_message = true;
1706 call->sending_stream.Init(
1707 &op->data.send_message.send_message->data.raw.slice_buffer, flags);
1708 stream_op_payload->send_message.send_message.reset(
1709 call->sending_stream.get());
1710 has_send_ops = true;
1711 break;
1712 }
1713 case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
1714 /* Flag validation: currently allow no flags */
1715 if (op->flags != 0) {
1716 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1717 goto done_with_error;
1718 }
1719 if (!call->is_client) {
1720 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1721 goto done_with_error;
1722 }
1723 if (call->sent_final_op) {
1724 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1725 goto done_with_error;
1726 }
1727 stream_op->send_trailing_metadata = true;
1728 call->sent_final_op = true;
1729 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1730 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1731 has_send_ops = true;
1732 break;
1733 }
1734 case GRPC_OP_SEND_STATUS_FROM_SERVER: {
1735 /* Flag validation: currently allow no flags */
1736 if (op->flags != 0) {
1737 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1738 goto done_with_error;
1739 }
1740 if (call->is_client) {
1741 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1742 goto done_with_error;
1743 }
1744 if (call->sent_final_op) {
1745 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1746 goto done_with_error;
1747 }
1748 if (op->data.send_status_from_server.trailing_metadata_count >
1749 INT_MAX) {
1750 error = GRPC_CALL_ERROR_INVALID_METADATA;
1751 goto done_with_error;
1752 }
1753 stream_op->send_trailing_metadata = true;
1754 call->sent_final_op = true;
1755 GPR_ASSERT(call->send_extra_metadata_count == 0);
1756 call->send_extra_metadata_count = 1;
1757 call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
1758 op->data.send_status_from_server.status);
1759 grpc_error_handle status_error =
1760 op->data.send_status_from_server.status == GRPC_STATUS_OK
1761 ? GRPC_ERROR_NONE
1762 : grpc_error_set_int(
1763 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1764 "Server returned error"),
1765 GRPC_ERROR_INT_GRPC_STATUS,
1766 static_cast<intptr_t>(
1767 op->data.send_status_from_server.status));
1768 if (op->data.send_status_from_server.status_details != nullptr) {
1769 call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
1770 GRPC_MDSTR_GRPC_MESSAGE,
1771 grpc_slice_ref_internal(
1772 *op->data.send_status_from_server.status_details));
1773 call->send_extra_metadata_count++;
1774 if (status_error != GRPC_ERROR_NONE) {
1775 char* msg = grpc_slice_to_c_string(
1776 GRPC_MDVALUE(call->send_extra_metadata[1].md));
1777 status_error =
1778 grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
1779 grpc_slice_from_copied_string(msg));
1780 gpr_free(msg);
1781 }
1782 }
1783
1784 gpr_atm_rel_store(&call->status_error,
1785 reinterpret_cast<gpr_atm>(status_error));
1786 if (!prepare_application_metadata(
1787 call,
1788 static_cast<int>(
1789 op->data.send_status_from_server.trailing_metadata_count),
1790 op->data.send_status_from_server.trailing_metadata, 1, 1,
1791 nullptr, 0)) {
1792 for (int n = 0; n < call->send_extra_metadata_count; n++) {
1793 GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
1794 }
1795 call->send_extra_metadata_count = 0;
1796 error = GRPC_CALL_ERROR_INVALID_METADATA;
1797 goto done_with_error;
1798 }
1799 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
1800 &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1801 stream_op_payload->send_trailing_metadata.sent =
1802 &call->sent_server_trailing_metadata;
1803 has_send_ops = true;
1804 break;
1805 }
1806 case GRPC_OP_RECV_INITIAL_METADATA: {
1807 /* Flag validation: currently allow no flags */
1808 if (op->flags != 0) {
1809 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1810 goto done_with_error;
1811 }
1812 if (call->received_initial_metadata) {
1813 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1814 goto done_with_error;
1815 }
1816 call->received_initial_metadata = true;
1817 call->buffered_metadata[0] =
1818 op->data.recv_initial_metadata.recv_initial_metadata;
1819 GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
1820 receiving_initial_metadata_ready, bctl,
1821 grpc_schedule_on_exec_ctx);
1822 stream_op->recv_initial_metadata = true;
1823 stream_op_payload->recv_initial_metadata.recv_initial_metadata =
1824 &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1825 stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
1826 &call->receiving_initial_metadata_ready;
1827 if (!call->is_client) {
1828 stream_op_payload->recv_initial_metadata.peer_string =
1829 &call->peer_string;
1830 }
1831 ++num_recv_ops;
1832 break;
1833 }
1834 case GRPC_OP_RECV_MESSAGE: {
1835 /* Flag validation: currently allow no flags */
1836 if (op->flags != 0) {
1837 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1838 goto done_with_error;
1839 }
1840 if (call->receiving_message) {
1841 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1842 goto done_with_error;
1843 }
1844 call->receiving_message = true;
1845 stream_op->recv_message = true;
1846 call->receiving_buffer = op->data.recv_message.recv_message;
1847 stream_op_payload->recv_message.recv_message = &call->receiving_stream;
1848 GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
1849 receiving_stream_ready_in_call_combiner, bctl,
1850 grpc_schedule_on_exec_ctx);
1851 stream_op_payload->recv_message.recv_message_ready =
1852 &call->receiving_stream_ready;
1853 ++num_recv_ops;
1854 break;
1855 }
1856 case GRPC_OP_RECV_STATUS_ON_CLIENT: {
1857 /* Flag validation: currently allow no flags */
1858 if (op->flags != 0) {
1859 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1860 goto done_with_error;
1861 }
1862 if (!call->is_client) {
1863 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1864 goto done_with_error;
1865 }
1866 if (call->requested_final_op) {
1867 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1868 goto done_with_error;
1869 }
1870 call->requested_final_op = true;
1871 call->buffered_metadata[1] =
1872 op->data.recv_status_on_client.trailing_metadata;
1873 call->final_op.client.status = op->data.recv_status_on_client.status;
1874 call->final_op.client.status_details =
1875 op->data.recv_status_on_client.status_details;
1876 call->final_op.client.error_string =
1877 op->data.recv_status_on_client.error_string;
1878 stream_op->recv_trailing_metadata = true;
1879 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1880 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1881 stream_op_payload->recv_trailing_metadata.collect_stats =
1882 &call->final_info.stats.transport_stream_stats;
1883 GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1884 receiving_trailing_metadata_ready, bctl,
1885 grpc_schedule_on_exec_ctx);
1886 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1887 &call->receiving_trailing_metadata_ready;
1888 ++num_recv_ops;
1889 break;
1890 }
1891 case GRPC_OP_RECV_CLOSE_ON_SERVER: {
1892 /* Flag validation: currently allow no flags */
1893 if (op->flags != 0) {
1894 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1895 goto done_with_error;
1896 }
1897 if (call->is_client) {
1898 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1899 goto done_with_error;
1900 }
1901 if (call->requested_final_op) {
1902 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1903 goto done_with_error;
1904 }
1905 call->requested_final_op = true;
1906 call->final_op.server.cancelled =
1907 op->data.recv_close_on_server.cancelled;
1908 stream_op->recv_trailing_metadata = true;
1909 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1910 &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1911 stream_op_payload->recv_trailing_metadata.collect_stats =
1912 &call->final_info.stats.transport_stream_stats;
1913 GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
1914 receiving_trailing_metadata_ready, bctl,
1915 grpc_schedule_on_exec_ctx);
1916 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1917 &call->receiving_trailing_metadata_ready;
1918 ++num_recv_ops;
1919 break;
1920 }
1921 }
1922 }
1923
1924 GRPC_CALL_INTERNAL_REF(call, "completion");
1925 if (!is_notify_tag_closure) {
1926 GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
1927 }
1928 bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);
1929
1930 if (has_send_ops) {
1931 GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
1932 grpc_schedule_on_exec_ctx);
1933 stream_op->on_complete = &bctl->finish_batch;
1934 }
1935
1936 gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
1937 execute_batch(call, stream_op, &bctl->start_batch);
1938
1939 done:
1940 return error;
1941
1942 done_with_error:
1943 /* reverse any mutations that occurred */
1944 if (stream_op->send_initial_metadata) {
1945 call->sent_initial_metadata = false;
1946 grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
1947 }
1948 if (stream_op->send_message) {
1949 call->sending_message = false;
1950 call->sending_stream->Orphan();
1951 }
1952 if (stream_op->send_trailing_metadata) {
1953 call->sent_final_op = false;
1954 grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
1955 }
1956 if (stream_op->recv_initial_metadata) {
1957 call->received_initial_metadata = false;
1958 }
1959 if (stream_op->recv_message) {
1960 call->receiving_message = false;
1961 }
1962 if (stream_op->recv_trailing_metadata) {
1963 call->requested_final_op = false;
1964 }
1965 goto done;
1966 }
1967
grpc_call_start_batch(grpc_call * call,const grpc_op * ops,size_t nops,void * tag,void * reserved)1968 grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops,
1969 size_t nops, void* tag, void* reserved) {
1970 grpc_call_error err;
1971
1972 GRPC_API_TRACE(
1973 "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
1974 "reserved=%p)",
1975 5, (call, ops, (unsigned long)nops, tag, reserved));
1976
1977 if (reserved != nullptr) {
1978 err = GRPC_CALL_ERROR;
1979 } else {
1980 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
1981 grpc_core::ExecCtx exec_ctx;
1982 err = call_start_batch(call, ops, nops, tag, 0);
1983 }
1984
1985 return err;
1986 }
1987
grpc_call_start_batch_and_execute(grpc_call * call,const grpc_op * ops,size_t nops,grpc_closure * closure)1988 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
1989 const grpc_op* ops,
1990 size_t nops,
1991 grpc_closure* closure) {
1992 return call_start_batch(call, ops, nops, closure, 1);
1993 }
1994
grpc_call_context_set(grpc_call * call,grpc_context_index elem,void * value,void (* destroy)(void * value))1995 void grpc_call_context_set(grpc_call* call, grpc_context_index elem,
1996 void* value, void (*destroy)(void* value)) {
1997 if (call->context[elem].destroy) {
1998 call->context[elem].destroy(call->context[elem].value);
1999 }
2000 call->context[elem].value = value;
2001 call->context[elem].destroy = destroy;
2002 }
2003
grpc_call_context_get(grpc_call * call,grpc_context_index elem)2004 void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) {
2005 return call->context[elem].value;
2006 }
2007
grpc_call_is_client(grpc_call * call)2008 uint8_t grpc_call_is_client(grpc_call* call) { return call->is_client; }
2009
grpc_call_compression_for_level(grpc_call * call,grpc_compression_level level)2010 grpc_compression_algorithm grpc_call_compression_for_level(
2011 grpc_call* call, grpc_compression_level level) {
2012 grpc_compression_algorithm algo =
2013 compression_algorithm_for_level_locked(call, level);
2014 return algo;
2015 }
2016
grpc_call_error_to_string(grpc_call_error error)2017 const char* grpc_call_error_to_string(grpc_call_error error) {
2018 switch (error) {
2019 case GRPC_CALL_ERROR:
2020 return "GRPC_CALL_ERROR";
2021 case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
2022 return "GRPC_CALL_ERROR_ALREADY_ACCEPTED";
2023 case GRPC_CALL_ERROR_ALREADY_FINISHED:
2024 return "GRPC_CALL_ERROR_ALREADY_FINISHED";
2025 case GRPC_CALL_ERROR_ALREADY_INVOKED:
2026 return "GRPC_CALL_ERROR_ALREADY_INVOKED";
2027 case GRPC_CALL_ERROR_BATCH_TOO_BIG:
2028 return "GRPC_CALL_ERROR_BATCH_TOO_BIG";
2029 case GRPC_CALL_ERROR_INVALID_FLAGS:
2030 return "GRPC_CALL_ERROR_INVALID_FLAGS";
2031 case GRPC_CALL_ERROR_INVALID_MESSAGE:
2032 return "GRPC_CALL_ERROR_INVALID_MESSAGE";
2033 case GRPC_CALL_ERROR_INVALID_METADATA:
2034 return "GRPC_CALL_ERROR_INVALID_METADATA";
2035 case GRPC_CALL_ERROR_NOT_INVOKED:
2036 return "GRPC_CALL_ERROR_NOT_INVOKED";
2037 case GRPC_CALL_ERROR_NOT_ON_CLIENT:
2038 return "GRPC_CALL_ERROR_NOT_ON_CLIENT";
2039 case GRPC_CALL_ERROR_NOT_ON_SERVER:
2040 return "GRPC_CALL_ERROR_NOT_ON_SERVER";
2041 case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE:
2042 return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE";
2043 case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH:
2044 return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
2045 case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
2046 return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
2047 case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
2048 return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
2049 case GRPC_CALL_OK:
2050 return "GRPC_CALL_OK";
2051 }
2052 GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW");
2053 }
2054