1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/surface/filter_stack_call.h"
16
17 #include <grpc/byte_buffer.h>
18 #include <grpc/compression.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/call.h>
22 #include <grpc/impl/propagation_bits.h>
23 #include <grpc/slice.h>
24 #include <grpc/slice_buffer.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/port_platform.h>
29 #include <grpc/support/string_util.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <stdlib.h>
33 #include <string.h>
34
35 #include <algorithm>
36 #include <cstdint>
37 #include <string>
38 #include <utility>
39
40 #include "absl/log/check.h"
41 #include "absl/log/log.h"
42 #include "absl/status/status.h"
43 #include "absl/strings/str_cat.h"
44 #include "absl/strings/string_view.h"
45 #include "src/core/channelz/channelz.h"
46 #include "src/core/lib/channel/channel_stack.h"
47 #include "src/core/lib/event_engine/event_engine_context.h"
48 #include "src/core/lib/experiments/experiments.h"
49 #include "src/core/lib/iomgr/call_combiner.h"
50 #include "src/core/lib/iomgr/exec_ctx.h"
51 #include "src/core/lib/iomgr/polling_entity.h"
52 #include "src/core/lib/resource_quota/arena.h"
53 #include "src/core/lib/slice/slice_buffer.h"
54 #include "src/core/lib/slice/slice_internal.h"
55 #include "src/core/lib/surface/call_utils.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/validate_metadata.h"
59 #include "src/core/lib/transport/error_utils.h"
60 #include "src/core/lib/transport/metadata_batch.h"
61 #include "src/core/lib/transport/transport.h"
62 #include "src/core/server/server_interface.h"
63 #include "src/core/telemetry/call_tracer.h"
64 #include "src/core/telemetry/stats.h"
65 #include "src/core/telemetry/stats_data.h"
66 #include "src/core/util/alloc.h"
67 #include "src/core/util/crash.h"
68 #include "src/core/util/debug_location.h"
69 #include "src/core/util/ref_counted.h"
70 #include "src/core/util/ref_counted_ptr.h"
71 #include "src/core/util/status_helper.h"
72 #include "src/core/util/time_precise.h"
73
namespace grpc_core {

// Alias so this type is usable inside the Call implementation without a
// grpc_core:: prefix (note the distinct C-level grpc_closure type also used
// throughout this file).
using GrpcClosure = Closure;
79
// Constructs a FilterStackCall on the given arena. The call is a client call
// iff args.server_transport_data is null; the completion queue may be null
// and bound later via SetCompletionQueue().
FilterStackCall::FilterStackCall(RefCountedPtr<Arena> arena,
                                 const grpc_call_create_args& args)
    : Call(args.server_transport_data == nullptr, args.send_deadline,
           std::move(arena)),
      channel_(args.channel->RefAsSubclass<Channel>()),
      cq_(args.cq),
      stream_op_payload_{} {}
87
// Builds a FilterStackCall and its filter call stack from |args|, storing the
// C handle in |*out_call|. Always publishes the call (even on failure); on
// error the call is cancelled with the composite error, which is also
// returned. The caller owns the initial ref (dropped by grpc_call_unref).
grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
                                          grpc_call** out_call) {
  Channel* channel = args->channel.get();

  // Folds |new_err| into |*composite|, lazily creating the composite error
  // the first time a non-OK child error is added.
  auto add_init_error = [](grpc_error_handle* composite,
                           grpc_error_handle new_err) {
    if (new_err.ok()) return;
    if (composite->ok()) {
      *composite = GRPC_ERROR_CREATE("Call creation failed");
    }
    *composite = grpc_error_add_child(*composite, new_err);
  };

  FilterStackCall* call;
  grpc_error_handle error;
  grpc_channel_stack* channel_stack = channel->channel_stack();
  // The FilterStackCall object and its per-call filter stack share a single
  // arena allocation; the stack lives immediately after the (aligned) call.
  size_t call_alloc_size =
      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)) +
      channel_stack->call_stack_size;

  RefCountedPtr<Arena> arena = channel->call_arena_allocator()->MakeArena();
  arena->SetContext<grpc_event_engine::experimental::EventEngine>(
      args->channel->event_engine());
  call = new (arena->Alloc(call_alloc_size)) FilterStackCall(arena, *args);
  // Sanity-check the C-handle and call-stack pointer round trips.
  DCHECK(FromC(call->c_ptr()) == call);
  DCHECK(FromCallStack(call->call_stack()) == call);
  *out_call = call->c_ptr();
  // |path| keeps a ref to the method path for call-stack init and client call
  // tracers; released at the end of this function.
  grpc_slice path = grpc_empty_slice();
  ScopedContext ctx(call);
  if (call->is_client()) {
    call->final_op_.client.status_details = nullptr;
    call->final_op_.client.status = nullptr;
    call->final_op_.client.error_string = nullptr;
    global_stats().IncrementClientCallsCreated();
    // Take a ref before args->path is moved into the metadata batch below.
    path = CSliceRef(args->path->c_slice());
    call->send_initial_metadata_.Set(HttpPathMetadata(),
                                     std::move(*args->path));
    if (args->authority.has_value()) {
      call->send_initial_metadata_.Set(HttpAuthorityMetadata(),
                                       std::move(*args->authority));
    }
    call->send_initial_metadata_.Set(
        GrpcRegisteredMethod(), reinterpret_cast<void*>(static_cast<uintptr_t>(
                                    args->registered_method)));
    channel_stack->stats_plugin_group->AddClientCallTracers(
        Slice(CSliceRef(path)), args->registered_method, arena.get());
  } else {
    global_stats().IncrementServerCallsCreated();
    call->final_op_.server.cancelled = nullptr;
    call->final_op_.server.core_server = args->server;
    // TODO(yashykt): In the future, we want to also enable stats and trace
    // collecting from when the call is created at the transport. The idea is
    // that the transport would create the call tracer and pass it in as part of
    // the metadata.
    // TODO(yijiem): OpenCensus and internal Census is still using this way to
    // set server call tracer. We need to refactor them to stats plugins
    // (including removing the client channel filters).
    if (args->server != nullptr &&
        args->server->server_call_tracer_factory() != nullptr) {
      auto* server_call_tracer =
          args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
              arena.get(), args->server->channel_args());
      if (server_call_tracer != nullptr) {
        // Note that we are setting both
        // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
        // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future
        // promise-based world, we would just a single tracer object for each
        // stack (call, subchannel_call, server_call.)
        arena->SetContext<CallTracerAnnotationInterface>(server_call_tracer);
        arena->SetContext<CallTracerInterface>(server_call_tracer);
      }
    }
    channel_stack->stats_plugin_group->AddServerCallTracers(arena.get());
  }

  Call* parent = Call::FromC(args->parent);
  if (parent != nullptr) {
    // Propagate deadline/census/cancellation from the parent per the mask.
    add_init_error(&error, absl_status_to_grpc_error(call->InitParent(
                               parent, args->propagation_mask)));
  }
  // initial refcount dropped by grpc_call_unref
  grpc_call_element_args call_args = {
      call->call_stack(), args->server_transport_data, path,
      call->start_time(), call->send_deadline(), call->arena(),
      &call->call_combiner_};
  add_init_error(&error, grpc_call_stack_init(channel_stack, 1, DestroyCall,
                                              call, &call_args));
  // Publish this call to parent only after the call stack has been initialized.
  if (parent != nullptr) {
    call->PublishToParent(parent);
  }

  if (!error.ok()) {
    call->CancelWithError(error);
  }
  // Bind polling: either the cq's pollset or an alternative pollset_set, but
  // never both.
  if (args->cq != nullptr) {
    CHECK(args->pollset_set_alternative == nullptr)
        << "Only one of 'cq' and 'pollset_set_alternative' should be "
           "non-nullptr.";
    GRPC_CQ_INTERNAL_REF(args->cq, "bind");
    call->pollent_ =
        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
  }
  if (args->pollset_set_alternative != nullptr) {
    call->pollent_ = grpc_polling_entity_create_from_pollset_set(
        args->pollset_set_alternative);
  }
  if (!grpc_polling_entity_is_empty(&call->pollent_)) {
    grpc_call_stack_set_pollset_or_pollset_set(call->call_stack(),
                                               &call->pollent_);
  }

  // channelz bookkeeping: count the call start on the channel (client) or
  // server node (server), when channelz is enabled.
  if (call->is_client()) {
    channelz::ChannelNode* channelz_channel = channel->channelz_node();
    if (channelz_channel != nullptr) {
      channelz_channel->RecordCallStarted();
    }
  } else if (call->final_op_.server.core_server != nullptr) {
    channelz::ServerNode* channelz_node =
        call->final_op_.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      channelz_node->RecordCallStarted();
    }
  }

  if (args->send_deadline != Timestamp::InfFuture()) {
    call->UpdateDeadline(args->send_deadline);
  }

  CSliceUnref(path);

  return error;
}
221
// Late-binds a completion queue to this call and registers its pollset with
// the call stack. Crashes if a pollset_set was already bound (the two polling
// modes are mutually exclusive). Takes an internal ref on |cq| (released in
// DestroyCall under the "bind" reason).
void FilterStackCall::SetCompletionQueue(grpc_completion_queue* cq) {
  CHECK(cq);

  if (grpc_polling_entity_pollset_set(&pollent_) != nullptr) {
    Crash("A pollset_set is already registered for this call.");
  }
  cq_ = cq;
  GRPC_CQ_INTERNAL_REF(cq, "bind");
  pollent_ = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
  grpc_call_stack_set_pollset_or_pollset_set(call_stack(), &pollent_);
}
233
ReleaseCall(void * call,grpc_error_handle)234 void FilterStackCall::ReleaseCall(void* call, grpc_error_handle /*error*/) {
235 static_cast<FilterStackCall*>(call)->DeleteThis();
236 }
237
// Final-destruction callback registered with grpc_call_stack_init. Tears down
// call-owned state, computes the final status/latency for filters, and then
// destroys the call stack; the stack destruction completes into ReleaseCall,
// which frees the call object itself.
void FilterStackCall::DestroyCall(void* call, grpc_error_handle /*error*/) {
  auto* c = static_cast<FilterStackCall*>(call);
  c->recv_initial_metadata_.Clear();
  c->recv_trailing_metadata_.Clear();
  c->receiving_slice_buffer_.reset();
  // ParentCall was placement-constructed; destroy it manually (arena-backed,
  // so no free).
  ParentCall* pc = c->parent_call();
  if (pc != nullptr) {
    pc->~ParentCall();
  }
  // Drop the "bind" ref taken when the cq was attached.
  if (c->cq_) {
    GRPC_CQ_INTERNAL_UNREF(c->cq_, "bind");
  }

  // Translate the stored status error into the final_info_ block consumed by
  // filters during stack destruction.
  grpc_error_handle status_error = c->status_error_.get();
  grpc_error_get_status(status_error, c->send_deadline(),
                        &c->final_info_.final_status, nullptr, nullptr,
                        &(c->final_info_.error_string));
  c->status_error_.set(absl::OkStatus());
  c->final_info_.stats.latency =
      gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time());
  grpc_call_stack_destroy(c->call_stack(), &c->final_info_,
                          GRPC_CLOSURE_INIT(&c->release_call_, ReleaseCall, c,
                                            grpc_schedule_on_exec_ctx));
}
262
// Drops one external (application) ref; when the last external ref is gone,
// begins call teardown. Implements grpc_call_unref semantics: if the final op
// has not yet completed, the call is cancelled so in-flight work terminates.
void FilterStackCall::ExternalUnref() {
  // Fast path: other external refs remain.
  if (GPR_LIKELY(!ext_ref_.Unref())) return;

  // Teardown may run filters/closures; ensure exec contexts exist.
  ApplicationCallbackExecCtx callback_exec_ctx;
  ExecCtx exec_ctx;

  GRPC_TRACE_LOG(api, INFO) << "grpc_call_unref(c=" << this << ")";

  MaybeUnpublishFromParent();

  CHECK(!destroy_called_);
  destroy_called_ = true;
  // If the final op never completed, cancel so the transport/filters release
  // their resources.
  bool cancel = gpr_atm_acq_load(&received_final_op_atm_) == 0;
  if (cancel) {
    CancelWithError(absl::CancelledError());
  } else {
    // Unset the call combiner cancellation closure. This has the
    // effect of scheduling the previously set cancellation closure, if
    // any, so that it can release any internal references it may be
    // holding to the call stack.
    call_combiner_.SetNotifyOnCancel(nullptr);
  }
  // Drop the initial internal ref taken at creation.
  InternalUnref("destroy");
}
287
// Sends |batch| down the filter stack via the call combiner.
// start_batch_closure points to a caller-allocated closure to be used
// for entering the call combiner; it must outlive combiner execution.
void FilterStackCall::ExecuteBatch(grpc_transport_stream_op_batch* batch,
                                   grpc_closure* start_batch_closure) {
  // This is called via the call combiner to start sending a batch down
  // the filter stack.
  auto execute_batch_in_call_combiner = [](void* arg, grpc_error_handle) {
    grpc_transport_stream_op_batch* batch =
        static_cast<grpc_transport_stream_op_batch*>(arg);
    auto* call =
        static_cast<FilterStackCall*>(batch->handler_private.extra_arg);
    // Hand the batch to the top (element 0) of the filter stack.
    grpc_call_element* elem = call->call_elem(0);
    GRPC_TRACE_LOG(channel, INFO)
        << "OP[" << elem->filter->name << ":" << elem
        << "]: " << grpc_transport_stream_op_batch_string(batch, false);
    elem->filter->start_transport_stream_op_batch(elem, batch);
  };
  // Smuggle the call pointer through the batch's private area.
  batch->handler_private.extra_arg = this;
  GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
                    grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(call_combiner(), start_batch_closure,
                           absl::OkStatus(), "executing batch");
}
311
namespace {
// Heap-allocated state for a cancel_stream batch: keeps the target call and
// the two closures (call-combiner entry and batch completion) alive until
// done_termination runs, which deletes it.
struct CancelState {
  FilterStackCall* call;
  grpc_closure start_batch;
  grpc_closure finish_batch;
};
}  // namespace
319
320 // The on_complete callback used when sending a cancel_stream batch down
321 // the filter stack. Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error_handle)322 static void done_termination(void* arg, grpc_error_handle /*error*/) {
323 CancelState* state = static_cast<CancelState*>(arg);
324 GRPC_CALL_COMBINER_STOP(state->call->call_combiner(),
325 "on_complete for cancel_stream op");
326 state->call->InternalUnref("termination");
327 delete state;
328 }
329
// Cancels the call with |error|. Idempotent: only the first caller (decided
// by the release-CAS on cancelled_with_error_) actually sends the
// cancel_stream batch; later callers return immediately. Holds an internal
// "termination" ref until done_termination runs.
void FilterStackCall::CancelWithError(grpc_error_handle error) {
  // Win the race to be the one-and-only canceller, or bail out.
  if (!gpr_atm_rel_cas(&cancelled_with_error_, 0, 1)) {
    return;
  }
  GRPC_TRACE_LOG(call_error, INFO)
      << "CancelWithError " << (is_client() ? "CLI" : "SVR") << " "
      << StatusToString(error);
  ClearPeerString();
  // Keep the call alive until the cancel batch completes (done_termination).
  InternalRef("termination");
  ResetDeadline();
  // Inform the call combiner of the cancellation, so that it can cancel
  // any in-flight asynchronous actions that may be holding the call
  // combiner. This ensures that the cancel_stream batch can be sent
  // down the filter stack in a timely manner.
  call_combiner_.Cancel(error);
  CancelState* state = new CancelState;
  state->call = this;
  GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
                    grpc_schedule_on_exec_ctx);
  grpc_transport_stream_op_batch* op =
      grpc_make_transport_stream_op(&state->finish_batch);
  op->cancel_stream = true;
  op->payload->cancel_stream.cancel_error = error;
  ExecuteBatch(op, &state->start_batch);
}
355
// Records the call's final status. On the client, translates |error| into the
// application-visible status/details/error-string out-params captured in
// final_op_.client and retains the error in status_error_. On the server,
// fills the application's |cancelled| flag. Both sides report call success or
// failure to channelz when enabled.
void FilterStackCall::SetFinalStatus(grpc_error_handle error) {
  GRPC_TRACE_LOG(call_error, INFO)
      << "set_final_status " << (is_client() ? "CLI" : "SVR") << " "
      << StatusToString(error);
  ResetDeadline();
  if (is_client()) {
    std::string status_details;
    grpc_error_get_status(error, send_deadline(), final_op_.client.status,
                          &status_details, nullptr,
                          final_op_.client.error_string);
    *final_op_.client.status_details =
        grpc_slice_from_cpp_string(std::move(status_details));
    status_error_.set(error);
    channelz::ChannelNode* channelz_channel = channel()->channelz_node();
    if (channelz_channel != nullptr) {
      if (*final_op_.client.status != GRPC_STATUS_OK) {
        channelz_channel->RecordCallFailed();
      } else {
        channelz_channel->RecordCallSucceeded();
      }
    }
  } else {
    // A server call counts as cancelled unless it finished cleanly after
    // sending trailing metadata.
    *final_op_.server.cancelled =
        !error.ok() || !sent_server_trailing_metadata_;
    channelz::ServerNode* channelz_node =
        final_op_.server.core_server->channelz_node();
    if (channelz_node != nullptr) {
      if (*final_op_.server.cancelled || !status_error_.ok()) {
        channelz_node->RecordCallFailed();
      } else {
        channelz_node->RecordCallSucceeded();
      }
    }
  }
}
391
PrepareApplicationMetadata(size_t count,grpc_metadata * metadata,bool is_trailing)392 bool FilterStackCall::PrepareApplicationMetadata(size_t count,
393 grpc_metadata* metadata,
394 bool is_trailing) {
395 grpc_metadata_batch* batch =
396 is_trailing ? &send_trailing_metadata_ : &send_initial_metadata_;
397 for (size_t i = 0; i < count; i++) {
398 grpc_metadata* md = &metadata[i];
399 if (!GRPC_LOG_IF_ERROR("validate_metadata",
400 grpc_validate_header_key_is_legal(md->key))) {
401 return false;
402 } else if (!grpc_is_binary_header_internal(md->key) &&
403 !GRPC_LOG_IF_ERROR(
404 "validate_metadata",
405 grpc_validate_header_nonbin_value_is_legal(md->value))) {
406 return false;
407 } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
408 // HTTP2 hpack encoding has a maximum limit.
409 return false;
410 } else if (grpc_slice_str_cmp(md->key, "content-length") == 0) {
411 // Filter "content-length metadata"
412 continue;
413 }
414 batch->Append(StringViewFromSlice(md->key), Slice(CSliceRef(md->value)),
415 [md](absl::string_view error, const Slice& value) {
416 VLOG(2)
417 << "Append error: key=" << StringViewFromSlice(md->key)
418 << " error=" << error
419 << " value=" << value.as_string_view();
420 });
421 }
422
423 return true;
424 }
425
PublishAppMetadata(grpc_metadata_batch * b,bool is_trailing)426 void FilterStackCall::PublishAppMetadata(grpc_metadata_batch* b,
427 bool is_trailing) {
428 if (b->count() == 0) return;
429 if (!is_client() && is_trailing) return;
430 if (is_trailing && buffered_metadata_[1] == nullptr) return;
431 grpc_metadata_array* dest;
432 dest = buffered_metadata_[is_trailing];
433 if (dest->count + b->count() > dest->capacity) {
434 dest->capacity =
435 std::max(dest->capacity + b->count(), dest->capacity * 3 / 2);
436 dest->metadata = static_cast<grpc_metadata*>(
437 gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
438 }
439 PublishToAppEncoder encoder(dest, b, is_client());
440 b->Encode(&encoder);
441 }
442
// Processes received initial metadata (compression negotiation etc. via
// ProcessIncomingInitialMetadata) and then copies it out to the application.
void FilterStackCall::RecvInitialFilter(grpc_metadata_batch* b) {
  ProcessIncomingInitialMetadata(*b);
  PublishAppMetadata(b, false);
}
447
// Processes received trailing metadata: derives the call's final status from
// |batch_error| if the batch itself failed, otherwise from the grpc-status /
// grpc-message entries in |b|; then publishes the remaining trailing metadata
// to the application.
void FilterStackCall::RecvTrailingFilter(grpc_metadata_batch* b,
                                         grpc_error_handle batch_error) {
  if (!batch_error.ok()) {
    SetFinalStatus(batch_error);
  } else {
    // grpc-status/grpc-message are consumed (Take) so they are not also
    // published as ordinary metadata below.
    absl::optional<grpc_status_code> grpc_status =
        b->Take(GrpcStatusMetadata());
    if (grpc_status.has_value()) {
      grpc_status_code status_code = *grpc_status;
      // Default-constructed error is OK; only non-OK statuses build an error.
      grpc_error_handle error;
      if (status_code != GRPC_STATUS_OK) {
        Slice peer = GetPeerString();
        error = grpc_error_set_int(
            GRPC_ERROR_CREATE(absl::StrCat("Error received from peer ",
                                           peer.as_string_view())),
            StatusIntProperty::kRpcStatus, static_cast<intptr_t>(status_code));
      }
      auto grpc_message = b->Take(GrpcMessageMetadata());
      if (grpc_message.has_value()) {
        error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage,
                                   grpc_message->as_string_view());
      } else if (!error.ok()) {
        error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage, "");
      }
      SetFinalStatus(error);
    } else if (!is_client()) {
      // Servers do not receive a status from the peer; clean end-of-stream
      // means OK.
      SetFinalStatus(absl::OkStatus());
    } else {
      // Client saw trailing metadata with no grpc-status: report UNKNOWN.
      VLOG(2) << "Received trailing metadata with no error and no status";
      SetFinalStatus(grpc_error_set_int(GRPC_ERROR_CREATE("No status received"),
                                        StatusIntProperty::kRpcStatus,
                                        GRPC_STATUS_UNKNOWN));
    }
  }
  PublishAppMetadata(b, true);
}
484
namespace {
// Maps an op type to one of the six BatchControl slots in active_batches_.
// Ops sharing a slot cannot legally appear in the same batch (e.g.
// SEND_CLOSE_FROM_CLIENT is client-only while SEND_STATUS_FROM_SERVER is
// server-only), so the sharing is safe.
size_t BatchSlotForOp(grpc_op_type type) {
  switch (type) {
    case GRPC_OP_SEND_INITIAL_METADATA:
      return 0;
    case GRPC_OP_SEND_MESSAGE:
      return 1;
    case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
    case GRPC_OP_SEND_STATUS_FROM_SERVER:
      return 2;
    case GRPC_OP_RECV_INITIAL_METADATA:
      return 3;
    case GRPC_OP_RECV_MESSAGE:
      return 4;
    case GRPC_OP_RECV_CLOSE_ON_SERVER:
    case GRPC_OP_RECV_STATUS_ON_CLIENT:
      return 5;
  }
  GPR_UNREACHABLE_CODE(return 123456789);
}
}  // namespace
506
// Returns the BatchControl for the slot implied by the batch's first op,
// reusing a finished one in place or arena-allocating a fresh one. Returns
// nullptr if the slot's previous batch is still in flight (too many
// operations of this kind outstanding).
FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl(
    const grpc_op* ops) {
  size_t slot_idx = BatchSlotForOp(ops[0].op);
  BatchControl** pslot = &active_batches_[slot_idx];
  BatchControl* bctl;
  if (*pslot != nullptr) {
    bctl = *pslot;
    // A non-null call_ means the previous batch in this slot has not
    // completed yet (PostCompletion clears it on completion).
    if (bctl->call_ != nullptr) {
      return nullptr;
    }
    // Reuse the storage: destroy the old object, then re-initialize the
    // pieces that carry state (op_ and the batch_error_ AtomicError).
    bctl->~BatchControl();
    bctl->op_ = {};
    new (&bctl->batch_error_) AtomicError();
  } else {
    bctl = arena()->New<BatchControl>();
    *pslot = bctl;
  }
  bctl->call_ = this;
  bctl->call_tracer_ = arena()->GetContext<CallTracerAnnotationInterface>();
  bctl->op_.payload = &stream_op_payload_;
  return bctl;
}
529
// Runs once all pending ops of the batch have finished: cleans up per-op
// send state, decides the error to report, and signals the application either
// by running its closure or by posting a completion on the call's cq. Clears
// call_ to mark this BatchControl reusable and drops the batch's internal ref.
void FilterStackCall::BatchControl::PostCompletion() {
  FilterStackCall* call = call_;
  grpc_error_handle error = batch_error_.get();

  // On the client side, if final call status is already known (i.e if this op
  // includes recv_trailing_metadata) and if the call status is known to be
  // OK, then disregard the batch error to ensure call->receiving_buffer_ is
  // not cleared.
  if (op_.recv_trailing_metadata && call->is_client() &&
      call->status_error_.ok()) {
    error = absl::OkStatus();
  }

  GRPC_TRACE_VLOG(call, 2) << "tag:" << completion_data_.notify_tag.tag
                           << " batch_error=" << error << " op:"
                           << grpc_transport_stream_op_batch_string(&op_,
                                                                    false);

  // Release send-side state now that the transport is done with it.
  if (op_.send_initial_metadata) {
    call->send_initial_metadata_.Clear();
  }
  if (op_.send_message) {
    if (op_.payload->send_message.stream_write_closed && error.ok()) {
      error = grpc_error_add_child(
          error, GRPC_ERROR_CREATE(
                     "Attempt to send message after stream was closed."));
    }
    call->sending_message_ = false;
    call->send_slice_buffer_.Clear();
  }
  if (op_.send_trailing_metadata) {
    call->send_trailing_metadata_.Clear();
  }

  // A failed recv_message must not leave a partial buffer with the app.
  if (!error.ok() && op_.recv_message && *call->receiving_buffer_ != nullptr) {
    grpc_byte_buffer_destroy(*call->receiving_buffer_);
    *call->receiving_buffer_ = nullptr;
  }
  if (op_.recv_trailing_metadata) {
    // propagate cancellation to any interested children
    gpr_atm_rel_store(&call->received_final_op_atm_, 1);
    call->PropagateCancellationToChildren();
    // Status was already delivered via recv_trailing_metadata; the batch
    // itself completes OK.
    error = absl::OkStatus();
  }
  batch_error_.set(absl::OkStatus());

  if (completion_data_.notify_tag.is_closure) {
    // Closure-notification mode: mark reusable before running the closure.
    call_ = nullptr;
    GrpcClosure::Run(
        DEBUG_LOCATION,
        static_cast<grpc_closure*>(completion_data_.notify_tag.tag), error);
    call->InternalUnref("completion");
  } else {
    // CQ mode: the "done" callback fires when the app picks up the event.
    grpc_cq_end_op(
        call->cq_, completion_data_.notify_tag.tag, error,
        [](void* user_data, grpc_cq_completion* /*storage*/) {
          BatchControl* bctl = static_cast<BatchControl*>(user_data);
          Call* call = bctl->call_;
          bctl->call_ = nullptr;
          call->InternalUnref("completion");
        },
        this, &completion_data_.cq_completion);
  }
}
594
FinishStep(PendingOp op)595 void FilterStackCall::BatchControl::FinishStep(PendingOp op) {
596 if (GPR_UNLIKELY(completed_batch_step(op))) {
597 PostCompletion();
598 }
599 }
600
ProcessDataAfterMetadata()601 void FilterStackCall::BatchControl::ProcessDataAfterMetadata() {
602 FilterStackCall* call = call_;
603 if (!call->receiving_slice_buffer_.has_value()) {
604 *call->receiving_buffer_ = nullptr;
605 call->receiving_message_ = false;
606 FinishStep(PendingOp::kRecvMessage);
607 } else {
608 call->test_only_last_message_flags_ = call->receiving_stream_flags_;
609 if ((call->receiving_stream_flags_ & GRPC_WRITE_INTERNAL_COMPRESS) &&
610 (call->incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
611 *call->receiving_buffer_ = grpc_raw_compressed_byte_buffer_create(
612 nullptr, 0, call->incoming_compression_algorithm());
613 } else {
614 *call->receiving_buffer_ = grpc_raw_byte_buffer_create(nullptr, 0);
615 }
616 grpc_slice_buffer_move_into(
617 call->receiving_slice_buffer_->c_slice_buffer(),
618 &(*call->receiving_buffer_)->data.raw.slice_buffer);
619 call->receiving_message_ = false;
620 call->receiving_slice_buffer_.reset();
621 FinishStep(PendingOp::kRecvMessage);
622 }
623 }
624
// Transport callback fired when a message (or stream end / error) is ready.
// If initial metadata has not yet been processed, parks this BatchControl in
// recv_state_ for ReceivingInitialMetadataReady to pick up; otherwise
// processes the data immediately.
void FilterStackCall::BatchControl::ReceivingStreamReady(
    grpc_error_handle error) {
  GRPC_TRACE_VLOG(call, 2) << "tag:" << completion_data_.notify_tag.tag
                           << " ReceivingStreamReady error=" << error
                           << " receiving_slice_buffer.has_value="
                           << call_->receiving_slice_buffer_.has_value()
                           << " recv_state="
                           << gpr_atm_no_barrier_load(&call_->recv_state_);
  FilterStackCall* call = call_;
  if (!error.ok()) {
    // Drop any partially received data and cancel, recording the first error
    // for this batch.
    call->receiving_slice_buffer_.reset();
    if (batch_error_.ok()) {
      batch_error_.set(error);
    }
    call->CancelWithError(error);
  }
  // If recv_state is kRecvNone, we will save the batch_control
  // object with rel_cas, and will not use it after the cas. Its corresponding
  // acq_load is in receiving_initial_metadata_ready()
  if (!error.ok() || !call->receiving_slice_buffer_.has_value() ||
      !gpr_atm_rel_cas(&call->recv_state_, kRecvNone,
                       reinterpret_cast<gpr_atm>(this))) {
    ProcessDataAfterMetadata();
  }
}
650
// Transport callback fired when initial metadata has been received. Filters
// the metadata, then resolves the initial-metadata/message ordering race with
// ReceivingStreamReady via the recv_state_ atomic: if a message already
// arrived, runs its deferred processing now.
void FilterStackCall::BatchControl::ReceivingInitialMetadataReady(
    grpc_error_handle error) {
  FilterStackCall* call = call_;

  GRPC_CALL_COMBINER_STOP(call->call_combiner(), "recv_initial_metadata_ready");

  if (error.ok()) {
    grpc_metadata_batch* md = &call->recv_initial_metadata_;
    call->RecvInitialFilter(md);

    // Servers adopt the client's advertised timeout as their send deadline.
    absl::optional<Timestamp> deadline = md->get(GrpcTimeoutMetadata());
    if (deadline.has_value() && !call->is_client()) {
      call_->set_send_deadline(*deadline);
    }
  } else {
    // Record the first error for this batch, then cancel the call.
    if (batch_error_.ok()) {
      batch_error_.set(error);
    }
    call->CancelWithError(error);
  }

  grpc_closure* saved_rsr_closure = nullptr;
  while (true) {
    gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state_);
    // Should only receive initial metadata once
    CHECK_NE(rsr_bctlp, 1);
    if (rsr_bctlp == 0) {
      // We haven't seen initial metadata and messages before, thus initial
      // metadata is received first.
      // no_barrier_cas is used, as this function won't access the batch_control
      // object saved by receiving_stream_ready() if the initial metadata is
      // received first.
      if (gpr_atm_no_barrier_cas(&call->recv_state_, kRecvNone,
                                 kRecvInitialMetadataFirst)) {
        break;
      }
    } else {
      // Already received messages
      saved_rsr_closure = GRPC_CLOSURE_CREATE(
          [](void* bctl, grpc_error_handle error) {
            static_cast<BatchControl*>(bctl)->ReceivingStreamReady(error);
          },
          reinterpret_cast<BatchControl*>(rsr_bctlp),
          grpc_schedule_on_exec_ctx);
      // No need to modify recv_state
      break;
    }
  }
  if (saved_rsr_closure != nullptr) {
    // Run the message-processing that was parked waiting on initial metadata.
    GrpcClosure::Run(DEBUG_LOCATION, saved_rsr_closure, error);
  }

  FinishStep(PendingOp::kRecvInitialMetadata);
}
705
ReceivingTrailingMetadataReady(grpc_error_handle error)706 void FilterStackCall::BatchControl::ReceivingTrailingMetadataReady(
707 grpc_error_handle error) {
708 GRPC_CALL_COMBINER_STOP(call_->call_combiner(),
709 "recv_trailing_metadata_ready");
710 grpc_metadata_batch* md = &call_->recv_trailing_metadata_;
711 call_->RecvTrailingFilter(md, error);
712 FinishStep(PendingOp::kRecvTrailingMetadata);
713 }
714
FinishBatch(grpc_error_handle error)715 void FilterStackCall::BatchControl::FinishBatch(grpc_error_handle error) {
716 GRPC_CALL_COMBINER_STOP(call_->call_combiner(), "on_complete");
717 if (batch_error_.ok()) {
718 batch_error_.set(error);
719 }
720 if (!error.ok()) {
721 call_->CancelWithError(error);
722 }
723 FinishStep(PendingOp::kSends);
724 }
725
StartBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)726 grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
727 void* notify_tag,
728 bool is_notify_tag_closure) {
729 GRPC_LATENT_SEE_INNER_SCOPE("FilterStackCall::StartBatch");
730
731 size_t i;
732 const grpc_op* op;
733 BatchControl* bctl;
734 grpc_call_error error = GRPC_CALL_OK;
735 grpc_transport_stream_op_batch* stream_op;
736 grpc_transport_stream_op_batch_payload* stream_op_payload;
737 uint32_t seen_ops = 0;
738 intptr_t pending_ops = 0;
739
740 for (i = 0; i < nops; i++) {
741 if (seen_ops & (1u << ops[i].op)) {
742 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
743 }
744 seen_ops |= (1u << ops[i].op);
745 }
746
747 if (!is_client() &&
748 (seen_ops & (1u << GRPC_OP_SEND_STATUS_FROM_SERVER)) != 0 &&
749 (seen_ops & (1u << GRPC_OP_RECV_MESSAGE)) != 0) {
750 LOG(ERROR) << "******************* SEND_STATUS WITH RECV_MESSAGE "
751 "*******************";
752 return GRPC_CALL_ERROR;
753 }
754
755 GRPC_CALL_LOG_BATCH(ops, nops);
756
757 if (nops == 0) {
758 EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
759 error = GRPC_CALL_OK;
760 goto done;
761 }
762
763 bctl = ReuseOrAllocateBatchControl(ops);
764 if (bctl == nullptr) {
765 return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
766 }
767 bctl->completion_data_.notify_tag.tag = notify_tag;
768 bctl->completion_data_.notify_tag.is_closure =
769 static_cast<uint8_t>(is_notify_tag_closure != 0);
770
771 stream_op = &bctl->op_;
772 stream_op_payload = &stream_op_payload_;
773
774 // rewrite batch ops into a transport op
775 for (i = 0; i < nops; i++) {
776 op = &ops[i];
777 if (op->reserved != nullptr) {
778 error = GRPC_CALL_ERROR;
779 goto done_with_error;
780 }
781 switch (op->op) {
782 case GRPC_OP_SEND_INITIAL_METADATA: {
783 // Flag validation: currently allow no flags
784 if (!AreInitialMetadataFlagsValid(op->flags)) {
785 error = GRPC_CALL_ERROR_INVALID_FLAGS;
786 goto done_with_error;
787 }
788 if (sent_initial_metadata_) {
789 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
790 goto done_with_error;
791 }
792 if (op->data.send_initial_metadata.count > INT_MAX) {
793 error = GRPC_CALL_ERROR_INVALID_METADATA;
794 goto done_with_error;
795 }
796 stream_op->send_initial_metadata = true;
797 sent_initial_metadata_ = true;
798 if (!PrepareApplicationMetadata(op->data.send_initial_metadata.count,
799 op->data.send_initial_metadata.metadata,
800 false)) {
801 error = GRPC_CALL_ERROR_INVALID_METADATA;
802 goto done_with_error;
803 }
804 PrepareOutgoingInitialMetadata(*op, send_initial_metadata_);
805 // TODO(ctiller): just make these the same variable?
806 if (is_client() && send_deadline() != Timestamp::InfFuture()) {
807 send_initial_metadata_.Set(GrpcTimeoutMetadata(), send_deadline());
808 }
809 if (is_client()) {
810 send_initial_metadata_.Set(
811 WaitForReady(),
812 WaitForReady::ValueType{
813 (op->flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) != 0,
814 (op->flags &
815 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) != 0});
816 }
817 stream_op_payload->send_initial_metadata.send_initial_metadata =
818 &send_initial_metadata_;
819 pending_ops |= PendingOpMask(PendingOp::kSends);
820 break;
821 }
822 case GRPC_OP_SEND_MESSAGE: {
823 if (!AreWriteFlagsValid(op->flags)) {
824 error = GRPC_CALL_ERROR_INVALID_FLAGS;
825 goto done_with_error;
826 }
827 if (op->data.send_message.send_message == nullptr) {
828 error = GRPC_CALL_ERROR_INVALID_MESSAGE;
829 goto done_with_error;
830 }
831 if (sending_message_) {
832 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
833 goto done_with_error;
834 }
835 uint32_t flags = op->flags;
836 // If the outgoing buffer is already compressed, mark it as so in the
837 // flags. These will be picked up by the compression filter and further
838 // (wasteful) attempts at compression skipped.
839 if (op->data.send_message.send_message->data.raw.compression >
840 GRPC_COMPRESS_NONE) {
841 flags |= GRPC_WRITE_INTERNAL_COMPRESS;
842 }
843 stream_op->send_message = true;
844 sending_message_ = true;
845 send_slice_buffer_.Clear();
846 grpc_slice_buffer_move_into(
847 &op->data.send_message.send_message->data.raw.slice_buffer,
848 send_slice_buffer_.c_slice_buffer());
849 stream_op_payload->send_message.flags = flags;
850 stream_op_payload->send_message.send_message = &send_slice_buffer_;
851 pending_ops |= PendingOpMask(PendingOp::kSends);
852 break;
853 }
854 case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
855 // Flag validation: currently allow no flags
856 if (op->flags != 0) {
857 error = GRPC_CALL_ERROR_INVALID_FLAGS;
858 goto done_with_error;
859 }
860 if (!is_client()) {
861 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
862 goto done_with_error;
863 }
864 if (sent_final_op_) {
865 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
866 goto done_with_error;
867 }
868 stream_op->send_trailing_metadata = true;
869 sent_final_op_ = true;
870 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
871 &send_trailing_metadata_;
872 pending_ops |= PendingOpMask(PendingOp::kSends);
873 break;
874 }
875 case GRPC_OP_SEND_STATUS_FROM_SERVER: {
876 // Flag validation: currently allow no flags
877 if (op->flags != 0) {
878 error = GRPC_CALL_ERROR_INVALID_FLAGS;
879 goto done_with_error;
880 }
881 if (is_client()) {
882 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
883 goto done_with_error;
884 }
885 if (sent_final_op_) {
886 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
887 goto done_with_error;
888 }
889 if (op->data.send_status_from_server.trailing_metadata_count >
890 INT_MAX) {
891 error = GRPC_CALL_ERROR_INVALID_METADATA;
892 goto done_with_error;
893 }
894 stream_op->send_trailing_metadata = true;
895 sent_final_op_ = true;
896
897 if (!PrepareApplicationMetadata(
898 op->data.send_status_from_server.trailing_metadata_count,
899 op->data.send_status_from_server.trailing_metadata, true)) {
900 error = GRPC_CALL_ERROR_INVALID_METADATA;
901 goto done_with_error;
902 }
903
904 grpc_error_handle status_error =
905 op->data.send_status_from_server.status == GRPC_STATUS_OK
906 ? absl::OkStatus()
907 : grpc_error_set_int(
908 GRPC_ERROR_CREATE("Server returned error"),
909 StatusIntProperty::kRpcStatus,
910 static_cast<intptr_t>(
911 op->data.send_status_from_server.status));
912 if (op->data.send_status_from_server.status_details != nullptr) {
913 send_trailing_metadata_.Set(
914 GrpcMessageMetadata(),
915 Slice(grpc_slice_copy(
916 *op->data.send_status_from_server.status_details)));
917 if (!status_error.ok()) {
918 status_error = grpc_error_set_str(
919 status_error, StatusStrProperty::kGrpcMessage,
920 StringViewFromSlice(
921 *op->data.send_status_from_server.status_details));
922 }
923 }
924
925 status_error_.set(status_error);
926
927 send_trailing_metadata_.Set(GrpcStatusMetadata(),
928 op->data.send_status_from_server.status);
929
930 // Ignore any te metadata key value pairs specified.
931 send_trailing_metadata_.Remove(TeMetadata());
932 stream_op_payload->send_trailing_metadata.send_trailing_metadata =
933 &send_trailing_metadata_;
934 stream_op_payload->send_trailing_metadata.sent =
935 &sent_server_trailing_metadata_;
936 pending_ops |= PendingOpMask(PendingOp::kSends);
937 break;
938 }
939 case GRPC_OP_RECV_INITIAL_METADATA: {
940 // Flag validation: currently allow no flags
941 if (op->flags != 0) {
942 error = GRPC_CALL_ERROR_INVALID_FLAGS;
943 goto done_with_error;
944 }
945 if (received_initial_metadata_) {
946 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
947 goto done_with_error;
948 }
949 received_initial_metadata_ = true;
950 buffered_metadata_[0] =
951 op->data.recv_initial_metadata.recv_initial_metadata;
952 GRPC_CLOSURE_INIT(
953 &receiving_initial_metadata_ready_,
954 [](void* bctl, grpc_error_handle error) {
955 static_cast<BatchControl*>(bctl)->ReceivingInitialMetadataReady(
956 error);
957 },
958 bctl, grpc_schedule_on_exec_ctx);
959 stream_op->recv_initial_metadata = true;
960 stream_op_payload->recv_initial_metadata.recv_initial_metadata =
961 &recv_initial_metadata_;
962 stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
963 &receiving_initial_metadata_ready_;
964 if (is_client()) {
965 stream_op_payload->recv_initial_metadata.trailing_metadata_available =
966 &is_trailers_only_;
967 }
968 pending_ops |= PendingOpMask(PendingOp::kRecvInitialMetadata);
969 break;
970 }
971 case GRPC_OP_RECV_MESSAGE: {
972 // Flag validation: currently allow no flags
973 if (op->flags != 0) {
974 error = GRPC_CALL_ERROR_INVALID_FLAGS;
975 goto done_with_error;
976 }
977 if (receiving_message_) {
978 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
979 goto done_with_error;
980 }
981 receiving_message_ = true;
982 stream_op->recv_message = true;
983 receiving_slice_buffer_.reset();
984 receiving_buffer_ = op->data.recv_message.recv_message;
985 stream_op_payload->recv_message.recv_message = &receiving_slice_buffer_;
986 receiving_stream_flags_ = 0;
987 stream_op_payload->recv_message.flags = &receiving_stream_flags_;
988 stream_op_payload->recv_message.call_failed_before_recv_message =
989 &call_failed_before_recv_message_;
990 GRPC_CLOSURE_INIT(
991 &receiving_stream_ready_,
992 [](void* bctlp, grpc_error_handle error) {
993 auto* bctl = static_cast<BatchControl*>(bctlp);
994 auto* call = bctl->call_;
995 // Yields the call combiner before processing the received
996 // message.
997 GRPC_CALL_COMBINER_STOP(call->call_combiner(),
998 "recv_message_ready");
999 bctl->ReceivingStreamReady(error);
1000 },
1001 bctl, grpc_schedule_on_exec_ctx);
1002 stream_op_payload->recv_message.recv_message_ready =
1003 &receiving_stream_ready_;
1004 pending_ops |= PendingOpMask(PendingOp::kRecvMessage);
1005 break;
1006 }
1007 case GRPC_OP_RECV_STATUS_ON_CLIENT: {
1008 // Flag validation: currently allow no flags
1009 if (op->flags != 0) {
1010 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1011 goto done_with_error;
1012 }
1013 if (!is_client()) {
1014 error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1015 goto done_with_error;
1016 }
1017 if (requested_final_op_) {
1018 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1019 goto done_with_error;
1020 }
1021 requested_final_op_ = true;
1022 buffered_metadata_[1] =
1023 op->data.recv_status_on_client.trailing_metadata;
1024 final_op_.client.status = op->data.recv_status_on_client.status;
1025 final_op_.client.status_details =
1026 op->data.recv_status_on_client.status_details;
1027 final_op_.client.error_string =
1028 op->data.recv_status_on_client.error_string;
1029 stream_op->recv_trailing_metadata = true;
1030 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1031 &recv_trailing_metadata_;
1032 stream_op_payload->recv_trailing_metadata.collect_stats =
1033 &final_info_.stats.transport_stream_stats;
1034 GRPC_CLOSURE_INIT(
1035 &receiving_trailing_metadata_ready_,
1036 [](void* bctl, grpc_error_handle error) {
1037 static_cast<BatchControl*>(bctl)->ReceivingTrailingMetadataReady(
1038 error);
1039 },
1040 bctl, grpc_schedule_on_exec_ctx);
1041 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1042 &receiving_trailing_metadata_ready_;
1043 pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata);
1044 break;
1045 }
1046 case GRPC_OP_RECV_CLOSE_ON_SERVER: {
1047 // Flag validation: currently allow no flags
1048 if (op->flags != 0) {
1049 error = GRPC_CALL_ERROR_INVALID_FLAGS;
1050 goto done_with_error;
1051 }
1052 if (is_client()) {
1053 error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1054 goto done_with_error;
1055 }
1056 if (requested_final_op_) {
1057 error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1058 goto done_with_error;
1059 }
1060 requested_final_op_ = true;
1061 final_op_.server.cancelled = op->data.recv_close_on_server.cancelled;
1062 stream_op->recv_trailing_metadata = true;
1063 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1064 &recv_trailing_metadata_;
1065 stream_op_payload->recv_trailing_metadata.collect_stats =
1066 &final_info_.stats.transport_stream_stats;
1067 GRPC_CLOSURE_INIT(
1068 &receiving_trailing_metadata_ready_,
1069 [](void* bctl, grpc_error_handle error) {
1070 static_cast<BatchControl*>(bctl)->ReceivingTrailingMetadataReady(
1071 error);
1072 },
1073 bctl, grpc_schedule_on_exec_ctx);
1074 stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1075 &receiving_trailing_metadata_ready_;
1076 pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata);
1077 break;
1078 }
1079 }
1080 }
1081
1082 InternalRef("completion");
1083 if (!is_notify_tag_closure) {
1084 CHECK(grpc_cq_begin_op(cq_, notify_tag));
1085 }
1086 bctl->set_pending_ops(pending_ops);
1087
1088 if (pending_ops & PendingOpMask(PendingOp::kSends)) {
1089 GRPC_CLOSURE_INIT(
1090 &bctl->finish_batch_,
1091 [](void* bctl, grpc_error_handle error) {
1092 static_cast<BatchControl*>(bctl)->FinishBatch(error);
1093 },
1094 bctl, grpc_schedule_on_exec_ctx);
1095 stream_op->on_complete = &bctl->finish_batch_;
1096 }
1097
1098 GRPC_TRACE_VLOG(call, 2)
1099 << "BATCH:" << bctl << " START:" << PendingOpString(pending_ops)
1100 << " BATCH:" << grpc_transport_stream_op_batch_string(stream_op, false)
1101 << " (tag:" << bctl->completion_data_.notify_tag.tag << ")";
1102 ExecuteBatch(stream_op, &bctl->start_batch_);
1103
1104 done:
1105 return error;
1106
1107 done_with_error:
1108 // reverse any mutations that occurred
1109 if (stream_op->send_initial_metadata) {
1110 sent_initial_metadata_ = false;
1111 send_initial_metadata_.Clear();
1112 }
1113 if (stream_op->send_message) {
1114 sending_message_ = false;
1115 }
1116 if (stream_op->send_trailing_metadata) {
1117 sent_final_op_ = false;
1118 send_trailing_metadata_.Clear();
1119 }
1120 if (stream_op->recv_initial_metadata) {
1121 received_initial_metadata_ = false;
1122 }
1123 if (stream_op->recv_message) {
1124 receiving_message_ = false;
1125 }
1126 if (stream_op->recv_trailing_metadata) {
1127 requested_final_op_ = false;
1128 }
1129 goto done;
1130 }
1131
GetPeer()1132 char* FilterStackCall::GetPeer() {
1133 Slice peer_slice = GetPeerString();
1134 if (!peer_slice.empty()) {
1135 absl::string_view peer_string_view = peer_slice.as_string_view();
1136 char* peer_string =
1137 static_cast<char*>(gpr_malloc(peer_string_view.size() + 1));
1138 memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
1139 peer_string[peer_string_view.size()] = '\0';
1140 return peer_string;
1141 }
1142 char* peer_string = grpc_channel_get_target(channel_->c_ptr());
1143 if (peer_string != nullptr) return peer_string;
1144 return gpr_strdup("unknown");
1145 }
1146
1147 } // namespace grpc_core
1148
grpc_call_create(grpc_call_create_args * args,grpc_call ** out_call)1149 grpc_error_handle grpc_call_create(grpc_call_create_args* args,
1150 grpc_call** out_call) {
1151 return grpc_core::FilterStackCall::Create(args, out_call);
1152 }
1153
grpc_call_from_top_element(grpc_call_element * surface_element)1154 grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {
1155 return grpc_core::FilterStackCall::FromTopElem(surface_element)->c_ptr();
1156 }
1157