• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/surface/filter_stack_call.h"
16 
17 #include <grpc/byte_buffer.h>
18 #include <grpc/compression.h>
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/grpc.h>
21 #include <grpc/impl/call.h>
22 #include <grpc/impl/propagation_bits.h>
23 #include <grpc/slice.h>
24 #include <grpc/slice_buffer.h>
25 #include <grpc/status.h>
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/port_platform.h>
29 #include <grpc/support/string_util.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <stdlib.h>
33 #include <string.h>
34 
35 #include <algorithm>
36 #include <cstdint>
37 #include <string>
38 #include <utility>
39 
40 #include "absl/log/check.h"
41 #include "absl/log/log.h"
42 #include "absl/status/status.h"
43 #include "absl/strings/str_cat.h"
44 #include "absl/strings/string_view.h"
45 #include "src/core/channelz/channelz.h"
46 #include "src/core/lib/channel/channel_stack.h"
47 #include "src/core/lib/event_engine/event_engine_context.h"
48 #include "src/core/lib/experiments/experiments.h"
49 #include "src/core/lib/iomgr/call_combiner.h"
50 #include "src/core/lib/iomgr/exec_ctx.h"
51 #include "src/core/lib/iomgr/polling_entity.h"
52 #include "src/core/lib/resource_quota/arena.h"
53 #include "src/core/lib/slice/slice_buffer.h"
54 #include "src/core/lib/slice/slice_internal.h"
55 #include "src/core/lib/surface/call_utils.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/completion_queue.h"
58 #include "src/core/lib/surface/validate_metadata.h"
59 #include "src/core/lib/transport/error_utils.h"
60 #include "src/core/lib/transport/metadata_batch.h"
61 #include "src/core/lib/transport/transport.h"
62 #include "src/core/server/server_interface.h"
63 #include "src/core/telemetry/call_tracer.h"
64 #include "src/core/telemetry/stats.h"
65 #include "src/core/telemetry/stats_data.h"
66 #include "src/core/util/alloc.h"
67 #include "src/core/util/crash.h"
68 #include "src/core/util/debug_location.h"
69 #include "src/core/util/ref_counted.h"
70 #include "src/core/util/ref_counted_ptr.h"
71 #include "src/core/util/status_helper.h"
72 #include "src/core/util/time_precise.h"
73 
74 namespace grpc_core {
75 
76 // Alias to make this type available in Call implementation without a grpc_core
77 // prefix.
78 using GrpcClosure = Closure;
79 
FilterStackCall(RefCountedPtr<Arena> arena,const grpc_call_create_args & args)80 FilterStackCall::FilterStackCall(RefCountedPtr<Arena> arena,
81                                  const grpc_call_create_args& args)
82     : Call(args.server_transport_data == nullptr, args.send_deadline,
83            std::move(arena)),
84       channel_(args.channel->RefAsSubclass<Channel>()),
85       cq_(args.cq),
86       stream_op_payload_{} {}
87 
Create(grpc_call_create_args * args,grpc_call ** out_call)88 grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args,
89                                           grpc_call** out_call) {
90   Channel* channel = args->channel.get();
91 
92   auto add_init_error = [](grpc_error_handle* composite,
93                            grpc_error_handle new_err) {
94     if (new_err.ok()) return;
95     if (composite->ok()) {
96       *composite = GRPC_ERROR_CREATE("Call creation failed");
97     }
98     *composite = grpc_error_add_child(*composite, new_err);
99   };
100 
101   FilterStackCall* call;
102   grpc_error_handle error;
103   grpc_channel_stack* channel_stack = channel->channel_stack();
104   size_t call_alloc_size =
105       GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)) +
106       channel_stack->call_stack_size;
107 
108   RefCountedPtr<Arena> arena = channel->call_arena_allocator()->MakeArena();
109   arena->SetContext<grpc_event_engine::experimental::EventEngine>(
110       args->channel->event_engine());
111   call = new (arena->Alloc(call_alloc_size)) FilterStackCall(arena, *args);
112   DCHECK(FromC(call->c_ptr()) == call);
113   DCHECK(FromCallStack(call->call_stack()) == call);
114   *out_call = call->c_ptr();
115   grpc_slice path = grpc_empty_slice();
116   ScopedContext ctx(call);
117   if (call->is_client()) {
118     call->final_op_.client.status_details = nullptr;
119     call->final_op_.client.status = nullptr;
120     call->final_op_.client.error_string = nullptr;
121     global_stats().IncrementClientCallsCreated();
122     path = CSliceRef(args->path->c_slice());
123     call->send_initial_metadata_.Set(HttpPathMetadata(),
124                                      std::move(*args->path));
125     if (args->authority.has_value()) {
126       call->send_initial_metadata_.Set(HttpAuthorityMetadata(),
127                                        std::move(*args->authority));
128     }
129     call->send_initial_metadata_.Set(
130         GrpcRegisteredMethod(), reinterpret_cast<void*>(static_cast<uintptr_t>(
131                                     args->registered_method)));
132     channel_stack->stats_plugin_group->AddClientCallTracers(
133         Slice(CSliceRef(path)), args->registered_method, arena.get());
134   } else {
135     global_stats().IncrementServerCallsCreated();
136     call->final_op_.server.cancelled = nullptr;
137     call->final_op_.server.core_server = args->server;
138     // TODO(yashykt): In the future, we want to also enable stats and trace
139     // collecting from when the call is created at the transport. The idea is
140     // that the transport would create the call tracer and pass it in as part of
141     // the metadata.
142     // TODO(yijiem): OpenCensus and internal Census is still using this way to
143     // set server call tracer. We need to refactor them to stats plugins
144     // (including removing the client channel filters).
145     if (args->server != nullptr &&
146         args->server->server_call_tracer_factory() != nullptr) {
147       auto* server_call_tracer =
148           args->server->server_call_tracer_factory()->CreateNewServerCallTracer(
149               arena.get(), args->server->channel_args());
150       if (server_call_tracer != nullptr) {
151         // Note that we are setting both
152         // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and
153         // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future
154         // promise-based world, we would just a single tracer object for each
155         // stack (call, subchannel_call, server_call.)
156         arena->SetContext<CallTracerAnnotationInterface>(server_call_tracer);
157         arena->SetContext<CallTracerInterface>(server_call_tracer);
158       }
159     }
160     channel_stack->stats_plugin_group->AddServerCallTracers(arena.get());
161   }
162 
163   Call* parent = Call::FromC(args->parent);
164   if (parent != nullptr) {
165     add_init_error(&error, absl_status_to_grpc_error(call->InitParent(
166                                parent, args->propagation_mask)));
167   }
168   // initial refcount dropped by grpc_call_unref
169   grpc_call_element_args call_args = {
170       call->call_stack(),   args->server_transport_data, path,
171       call->start_time(),   call->send_deadline(),       call->arena(),
172       &call->call_combiner_};
173   add_init_error(&error, grpc_call_stack_init(channel_stack, 1, DestroyCall,
174                                               call, &call_args));
175   // Publish this call to parent only after the call stack has been initialized.
176   if (parent != nullptr) {
177     call->PublishToParent(parent);
178   }
179 
180   if (!error.ok()) {
181     call->CancelWithError(error);
182   }
183   if (args->cq != nullptr) {
184     CHECK(args->pollset_set_alternative == nullptr)
185         << "Only one of 'cq' and 'pollset_set_alternative' should be "
186            "non-nullptr.";
187     GRPC_CQ_INTERNAL_REF(args->cq, "bind");
188     call->pollent_ =
189         grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq));
190   }
191   if (args->pollset_set_alternative != nullptr) {
192     call->pollent_ = grpc_polling_entity_create_from_pollset_set(
193         args->pollset_set_alternative);
194   }
195   if (!grpc_polling_entity_is_empty(&call->pollent_)) {
196     grpc_call_stack_set_pollset_or_pollset_set(call->call_stack(),
197                                                &call->pollent_);
198   }
199 
200   if (call->is_client()) {
201     channelz::ChannelNode* channelz_channel = channel->channelz_node();
202     if (channelz_channel != nullptr) {
203       channelz_channel->RecordCallStarted();
204     }
205   } else if (call->final_op_.server.core_server != nullptr) {
206     channelz::ServerNode* channelz_node =
207         call->final_op_.server.core_server->channelz_node();
208     if (channelz_node != nullptr) {
209       channelz_node->RecordCallStarted();
210     }
211   }
212 
213   if (args->send_deadline != Timestamp::InfFuture()) {
214     call->UpdateDeadline(args->send_deadline);
215   }
216 
217   CSliceUnref(path);
218 
219   return error;
220 }
221 
SetCompletionQueue(grpc_completion_queue * cq)222 void FilterStackCall::SetCompletionQueue(grpc_completion_queue* cq) {
223   CHECK(cq);
224 
225   if (grpc_polling_entity_pollset_set(&pollent_) != nullptr) {
226     Crash("A pollset_set is already registered for this call.");
227   }
228   cq_ = cq;
229   GRPC_CQ_INTERNAL_REF(cq, "bind");
230   pollent_ = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
231   grpc_call_stack_set_pollset_or_pollset_set(call_stack(), &pollent_);
232 }
233 
ReleaseCall(void * call,grpc_error_handle)234 void FilterStackCall::ReleaseCall(void* call, grpc_error_handle /*error*/) {
235   static_cast<FilterStackCall*>(call)->DeleteThis();
236 }
237 
DestroyCall(void * call,grpc_error_handle)238 void FilterStackCall::DestroyCall(void* call, grpc_error_handle /*error*/) {
239   auto* c = static_cast<FilterStackCall*>(call);
240   c->recv_initial_metadata_.Clear();
241   c->recv_trailing_metadata_.Clear();
242   c->receiving_slice_buffer_.reset();
243   ParentCall* pc = c->parent_call();
244   if (pc != nullptr) {
245     pc->~ParentCall();
246   }
247   if (c->cq_) {
248     GRPC_CQ_INTERNAL_UNREF(c->cq_, "bind");
249   }
250 
251   grpc_error_handle status_error = c->status_error_.get();
252   grpc_error_get_status(status_error, c->send_deadline(),
253                         &c->final_info_.final_status, nullptr, nullptr,
254                         &(c->final_info_.error_string));
255   c->status_error_.set(absl::OkStatus());
256   c->final_info_.stats.latency =
257       gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time());
258   grpc_call_stack_destroy(c->call_stack(), &c->final_info_,
259                           GRPC_CLOSURE_INIT(&c->release_call_, ReleaseCall, c,
260                                             grpc_schedule_on_exec_ctx));
261 }
262 
ExternalUnref()263 void FilterStackCall::ExternalUnref() {
264   if (GPR_LIKELY(!ext_ref_.Unref())) return;
265 
266   ApplicationCallbackExecCtx callback_exec_ctx;
267   ExecCtx exec_ctx;
268 
269   GRPC_TRACE_LOG(api, INFO) << "grpc_call_unref(c=" << this << ")";
270 
271   MaybeUnpublishFromParent();
272 
273   CHECK(!destroy_called_);
274   destroy_called_ = true;
275   bool cancel = gpr_atm_acq_load(&received_final_op_atm_) == 0;
276   if (cancel) {
277     CancelWithError(absl::CancelledError());
278   } else {
279     // Unset the call combiner cancellation closure.  This has the
280     // effect of scheduling the previously set cancellation closure, if
281     // any, so that it can release any internal references it may be
282     // holding to the call stack.
283     call_combiner_.SetNotifyOnCancel(nullptr);
284   }
285   InternalUnref("destroy");
286 }
287 
288 // start_batch_closure points to a caller-allocated closure to be used
289 // for entering the call combiner.
ExecuteBatch(grpc_transport_stream_op_batch * batch,grpc_closure * start_batch_closure)290 void FilterStackCall::ExecuteBatch(grpc_transport_stream_op_batch* batch,
291                                    grpc_closure* start_batch_closure) {
292   // This is called via the call combiner to start sending a batch down
293   // the filter stack.
294   auto execute_batch_in_call_combiner = [](void* arg, grpc_error_handle) {
295     grpc_transport_stream_op_batch* batch =
296         static_cast<grpc_transport_stream_op_batch*>(arg);
297     auto* call =
298         static_cast<FilterStackCall*>(batch->handler_private.extra_arg);
299     grpc_call_element* elem = call->call_elem(0);
300     GRPC_TRACE_LOG(channel, INFO)
301         << "OP[" << elem->filter->name << ":" << elem
302         << "]: " << grpc_transport_stream_op_batch_string(batch, false);
303     elem->filter->start_transport_stream_op_batch(elem, batch);
304   };
305   batch->handler_private.extra_arg = this;
306   GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
307                     grpc_schedule_on_exec_ctx);
308   GRPC_CALL_COMBINER_START(call_combiner(), start_batch_closure,
309                            absl::OkStatus(), "executing batch");
310 }
311 
312 namespace {
313 struct CancelState {
314   FilterStackCall* call;
315   grpc_closure start_batch;
316   grpc_closure finish_batch;
317 };
318 }  // namespace
319 
320 // The on_complete callback used when sending a cancel_stream batch down
321 // the filter stack.  Yields the call combiner when the batch is done.
done_termination(void * arg,grpc_error_handle)322 static void done_termination(void* arg, grpc_error_handle /*error*/) {
323   CancelState* state = static_cast<CancelState*>(arg);
324   GRPC_CALL_COMBINER_STOP(state->call->call_combiner(),
325                           "on_complete for cancel_stream op");
326   state->call->InternalUnref("termination");
327   delete state;
328 }
329 
CancelWithError(grpc_error_handle error)330 void FilterStackCall::CancelWithError(grpc_error_handle error) {
331   if (!gpr_atm_rel_cas(&cancelled_with_error_, 0, 1)) {
332     return;
333   }
334   GRPC_TRACE_LOG(call_error, INFO)
335       << "CancelWithError " << (is_client() ? "CLI" : "SVR") << " "
336       << StatusToString(error);
337   ClearPeerString();
338   InternalRef("termination");
339   ResetDeadline();
340   // Inform the call combiner of the cancellation, so that it can cancel
341   // any in-flight asynchronous actions that may be holding the call
342   // combiner.  This ensures that the cancel_stream batch can be sent
343   // down the filter stack in a timely manner.
344   call_combiner_.Cancel(error);
345   CancelState* state = new CancelState;
346   state->call = this;
347   GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
348                     grpc_schedule_on_exec_ctx);
349   grpc_transport_stream_op_batch* op =
350       grpc_make_transport_stream_op(&state->finish_batch);
351   op->cancel_stream = true;
352   op->payload->cancel_stream.cancel_error = error;
353   ExecuteBatch(op, &state->start_batch);
354 }
355 
SetFinalStatus(grpc_error_handle error)356 void FilterStackCall::SetFinalStatus(grpc_error_handle error) {
357   GRPC_TRACE_LOG(call_error, INFO)
358       << "set_final_status " << (is_client() ? "CLI" : "SVR") << " "
359       << StatusToString(error);
360   ResetDeadline();
361   if (is_client()) {
362     std::string status_details;
363     grpc_error_get_status(error, send_deadline(), final_op_.client.status,
364                           &status_details, nullptr,
365                           final_op_.client.error_string);
366     *final_op_.client.status_details =
367         grpc_slice_from_cpp_string(std::move(status_details));
368     status_error_.set(error);
369     channelz::ChannelNode* channelz_channel = channel()->channelz_node();
370     if (channelz_channel != nullptr) {
371       if (*final_op_.client.status != GRPC_STATUS_OK) {
372         channelz_channel->RecordCallFailed();
373       } else {
374         channelz_channel->RecordCallSucceeded();
375       }
376     }
377   } else {
378     *final_op_.server.cancelled =
379         !error.ok() || !sent_server_trailing_metadata_;
380     channelz::ServerNode* channelz_node =
381         final_op_.server.core_server->channelz_node();
382     if (channelz_node != nullptr) {
383       if (*final_op_.server.cancelled || !status_error_.ok()) {
384         channelz_node->RecordCallFailed();
385       } else {
386         channelz_node->RecordCallSucceeded();
387       }
388     }
389   }
390 }
391 
PrepareApplicationMetadata(size_t count,grpc_metadata * metadata,bool is_trailing)392 bool FilterStackCall::PrepareApplicationMetadata(size_t count,
393                                                  grpc_metadata* metadata,
394                                                  bool is_trailing) {
395   grpc_metadata_batch* batch =
396       is_trailing ? &send_trailing_metadata_ : &send_initial_metadata_;
397   for (size_t i = 0; i < count; i++) {
398     grpc_metadata* md = &metadata[i];
399     if (!GRPC_LOG_IF_ERROR("validate_metadata",
400                            grpc_validate_header_key_is_legal(md->key))) {
401       return false;
402     } else if (!grpc_is_binary_header_internal(md->key) &&
403                !GRPC_LOG_IF_ERROR(
404                    "validate_metadata",
405                    grpc_validate_header_nonbin_value_is_legal(md->value))) {
406       return false;
407     } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) {
408       // HTTP2 hpack encoding has a maximum limit.
409       return false;
410     } else if (grpc_slice_str_cmp(md->key, "content-length") == 0) {
411       // Filter "content-length metadata"
412       continue;
413     }
414     batch->Append(StringViewFromSlice(md->key), Slice(CSliceRef(md->value)),
415                   [md](absl::string_view error, const Slice& value) {
416                     VLOG(2)
417                         << "Append error: key=" << StringViewFromSlice(md->key)
418                         << " error=" << error
419                         << " value=" << value.as_string_view();
420                   });
421   }
422 
423   return true;
424 }
425 
PublishAppMetadata(grpc_metadata_batch * b,bool is_trailing)426 void FilterStackCall::PublishAppMetadata(grpc_metadata_batch* b,
427                                          bool is_trailing) {
428   if (b->count() == 0) return;
429   if (!is_client() && is_trailing) return;
430   if (is_trailing && buffered_metadata_[1] == nullptr) return;
431   grpc_metadata_array* dest;
432   dest = buffered_metadata_[is_trailing];
433   if (dest->count + b->count() > dest->capacity) {
434     dest->capacity =
435         std::max(dest->capacity + b->count(), dest->capacity * 3 / 2);
436     dest->metadata = static_cast<grpc_metadata*>(
437         gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity));
438   }
439   PublishToAppEncoder encoder(dest, b, is_client());
440   b->Encode(&encoder);
441 }
442 
RecvInitialFilter(grpc_metadata_batch * b)443 void FilterStackCall::RecvInitialFilter(grpc_metadata_batch* b) {
444   ProcessIncomingInitialMetadata(*b);
445   PublishAppMetadata(b, false);
446 }
447 
RecvTrailingFilter(grpc_metadata_batch * b,grpc_error_handle batch_error)448 void FilterStackCall::RecvTrailingFilter(grpc_metadata_batch* b,
449                                          grpc_error_handle batch_error) {
450   if (!batch_error.ok()) {
451     SetFinalStatus(batch_error);
452   } else {
453     absl::optional<grpc_status_code> grpc_status =
454         b->Take(GrpcStatusMetadata());
455     if (grpc_status.has_value()) {
456       grpc_status_code status_code = *grpc_status;
457       grpc_error_handle error;
458       if (status_code != GRPC_STATUS_OK) {
459         Slice peer = GetPeerString();
460         error = grpc_error_set_int(
461             GRPC_ERROR_CREATE(absl::StrCat("Error received from peer ",
462                                            peer.as_string_view())),
463             StatusIntProperty::kRpcStatus, static_cast<intptr_t>(status_code));
464       }
465       auto grpc_message = b->Take(GrpcMessageMetadata());
466       if (grpc_message.has_value()) {
467         error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage,
468                                    grpc_message->as_string_view());
469       } else if (!error.ok()) {
470         error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage, "");
471       }
472       SetFinalStatus(error);
473     } else if (!is_client()) {
474       SetFinalStatus(absl::OkStatus());
475     } else {
476       VLOG(2) << "Received trailing metadata with no error and no status";
477       SetFinalStatus(grpc_error_set_int(GRPC_ERROR_CREATE("No status received"),
478                                         StatusIntProperty::kRpcStatus,
479                                         GRPC_STATUS_UNKNOWN));
480     }
481   }
482   PublishAppMetadata(b, true);
483 }
484 
485 namespace {
BatchSlotForOp(grpc_op_type type)486 size_t BatchSlotForOp(grpc_op_type type) {
487   switch (type) {
488     case GRPC_OP_SEND_INITIAL_METADATA:
489       return 0;
490     case GRPC_OP_SEND_MESSAGE:
491       return 1;
492     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
493     case GRPC_OP_SEND_STATUS_FROM_SERVER:
494       return 2;
495     case GRPC_OP_RECV_INITIAL_METADATA:
496       return 3;
497     case GRPC_OP_RECV_MESSAGE:
498       return 4;
499     case GRPC_OP_RECV_CLOSE_ON_SERVER:
500     case GRPC_OP_RECV_STATUS_ON_CLIENT:
501       return 5;
502   }
503   GPR_UNREACHABLE_CODE(return 123456789);
504 }
505 }  // namespace
506 
ReuseOrAllocateBatchControl(const grpc_op * ops)507 FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl(
508     const grpc_op* ops) {
509   size_t slot_idx = BatchSlotForOp(ops[0].op);
510   BatchControl** pslot = &active_batches_[slot_idx];
511   BatchControl* bctl;
512   if (*pslot != nullptr) {
513     bctl = *pslot;
514     if (bctl->call_ != nullptr) {
515       return nullptr;
516     }
517     bctl->~BatchControl();
518     bctl->op_ = {};
519     new (&bctl->batch_error_) AtomicError();
520   } else {
521     bctl = arena()->New<BatchControl>();
522     *pslot = bctl;
523   }
524   bctl->call_ = this;
525   bctl->call_tracer_ = arena()->GetContext<CallTracerAnnotationInterface>();
526   bctl->op_.payload = &stream_op_payload_;
527   return bctl;
528 }
529 
PostCompletion()530 void FilterStackCall::BatchControl::PostCompletion() {
531   FilterStackCall* call = call_;
532   grpc_error_handle error = batch_error_.get();
533 
534   // On the client side, if final call status is already known (i.e if this op
535   // includes recv_trailing_metadata) and if the call status is known to be
536   // OK, then disregard the batch error to ensure call->receiving_buffer_ is
537   // not cleared.
538   if (op_.recv_trailing_metadata && call->is_client() &&
539       call->status_error_.ok()) {
540     error = absl::OkStatus();
541   }
542 
543   GRPC_TRACE_VLOG(call, 2) << "tag:" << completion_data_.notify_tag.tag
544                            << " batch_error=" << error << " op:"
545                            << grpc_transport_stream_op_batch_string(&op_,
546                                                                     false);
547 
548   if (op_.send_initial_metadata) {
549     call->send_initial_metadata_.Clear();
550   }
551   if (op_.send_message) {
552     if (op_.payload->send_message.stream_write_closed && error.ok()) {
553       error = grpc_error_add_child(
554           error, GRPC_ERROR_CREATE(
555                      "Attempt to send message after stream was closed."));
556     }
557     call->sending_message_ = false;
558     call->send_slice_buffer_.Clear();
559   }
560   if (op_.send_trailing_metadata) {
561     call->send_trailing_metadata_.Clear();
562   }
563 
564   if (!error.ok() && op_.recv_message && *call->receiving_buffer_ != nullptr) {
565     grpc_byte_buffer_destroy(*call->receiving_buffer_);
566     *call->receiving_buffer_ = nullptr;
567   }
568   if (op_.recv_trailing_metadata) {
569     // propagate cancellation to any interested children
570     gpr_atm_rel_store(&call->received_final_op_atm_, 1);
571     call->PropagateCancellationToChildren();
572     error = absl::OkStatus();
573   }
574   batch_error_.set(absl::OkStatus());
575 
576   if (completion_data_.notify_tag.is_closure) {
577     call_ = nullptr;
578     GrpcClosure::Run(
579         DEBUG_LOCATION,
580         static_cast<grpc_closure*>(completion_data_.notify_tag.tag), error);
581     call->InternalUnref("completion");
582   } else {
583     grpc_cq_end_op(
584         call->cq_, completion_data_.notify_tag.tag, error,
585         [](void* user_data, grpc_cq_completion* /*storage*/) {
586           BatchControl* bctl = static_cast<BatchControl*>(user_data);
587           Call* call = bctl->call_;
588           bctl->call_ = nullptr;
589           call->InternalUnref("completion");
590         },
591         this, &completion_data_.cq_completion);
592   }
593 }
594 
FinishStep(PendingOp op)595 void FilterStackCall::BatchControl::FinishStep(PendingOp op) {
596   if (GPR_UNLIKELY(completed_batch_step(op))) {
597     PostCompletion();
598   }
599 }
600 
ProcessDataAfterMetadata()601 void FilterStackCall::BatchControl::ProcessDataAfterMetadata() {
602   FilterStackCall* call = call_;
603   if (!call->receiving_slice_buffer_.has_value()) {
604     *call->receiving_buffer_ = nullptr;
605     call->receiving_message_ = false;
606     FinishStep(PendingOp::kRecvMessage);
607   } else {
608     call->test_only_last_message_flags_ = call->receiving_stream_flags_;
609     if ((call->receiving_stream_flags_ & GRPC_WRITE_INTERNAL_COMPRESS) &&
610         (call->incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) {
611       *call->receiving_buffer_ = grpc_raw_compressed_byte_buffer_create(
612           nullptr, 0, call->incoming_compression_algorithm());
613     } else {
614       *call->receiving_buffer_ = grpc_raw_byte_buffer_create(nullptr, 0);
615     }
616     grpc_slice_buffer_move_into(
617         call->receiving_slice_buffer_->c_slice_buffer(),
618         &(*call->receiving_buffer_)->data.raw.slice_buffer);
619     call->receiving_message_ = false;
620     call->receiving_slice_buffer_.reset();
621     FinishStep(PendingOp::kRecvMessage);
622   }
623 }
624 
ReceivingStreamReady(grpc_error_handle error)625 void FilterStackCall::BatchControl::ReceivingStreamReady(
626     grpc_error_handle error) {
627   GRPC_TRACE_VLOG(call, 2) << "tag:" << completion_data_.notify_tag.tag
628                            << " ReceivingStreamReady error=" << error
629                            << " receiving_slice_buffer.has_value="
630                            << call_->receiving_slice_buffer_.has_value()
631                            << " recv_state="
632                            << gpr_atm_no_barrier_load(&call_->recv_state_);
633   FilterStackCall* call = call_;
634   if (!error.ok()) {
635     call->receiving_slice_buffer_.reset();
636     if (batch_error_.ok()) {
637       batch_error_.set(error);
638     }
639     call->CancelWithError(error);
640   }
641   // If recv_state is kRecvNone, we will save the batch_control
642   // object with rel_cas, and will not use it after the cas. Its corresponding
643   // acq_load is in receiving_initial_metadata_ready()
644   if (!error.ok() || !call->receiving_slice_buffer_.has_value() ||
645       !gpr_atm_rel_cas(&call->recv_state_, kRecvNone,
646                        reinterpret_cast<gpr_atm>(this))) {
647     ProcessDataAfterMetadata();
648   }
649 }
650 
ReceivingInitialMetadataReady(grpc_error_handle error)651 void FilterStackCall::BatchControl::ReceivingInitialMetadataReady(
652     grpc_error_handle error) {
653   FilterStackCall* call = call_;
654 
655   GRPC_CALL_COMBINER_STOP(call->call_combiner(), "recv_initial_metadata_ready");
656 
657   if (error.ok()) {
658     grpc_metadata_batch* md = &call->recv_initial_metadata_;
659     call->RecvInitialFilter(md);
660 
661     absl::optional<Timestamp> deadline = md->get(GrpcTimeoutMetadata());
662     if (deadline.has_value() && !call->is_client()) {
663       call_->set_send_deadline(*deadline);
664     }
665   } else {
666     if (batch_error_.ok()) {
667       batch_error_.set(error);
668     }
669     call->CancelWithError(error);
670   }
671 
672   grpc_closure* saved_rsr_closure = nullptr;
673   while (true) {
674     gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state_);
675     // Should only receive initial metadata once
676     CHECK_NE(rsr_bctlp, 1);
677     if (rsr_bctlp == 0) {
678       // We haven't seen initial metadata and messages before, thus initial
679       // metadata is received first.
680       // no_barrier_cas is used, as this function won't access the batch_control
681       // object saved by receiving_stream_ready() if the initial metadata is
682       // received first.
683       if (gpr_atm_no_barrier_cas(&call->recv_state_, kRecvNone,
684                                  kRecvInitialMetadataFirst)) {
685         break;
686       }
687     } else {
688       // Already received messages
689       saved_rsr_closure = GRPC_CLOSURE_CREATE(
690           [](void* bctl, grpc_error_handle error) {
691             static_cast<BatchControl*>(bctl)->ReceivingStreamReady(error);
692           },
693           reinterpret_cast<BatchControl*>(rsr_bctlp),
694           grpc_schedule_on_exec_ctx);
695       // No need to modify recv_state
696       break;
697     }
698   }
699   if (saved_rsr_closure != nullptr) {
700     GrpcClosure::Run(DEBUG_LOCATION, saved_rsr_closure, error);
701   }
702 
703   FinishStep(PendingOp::kRecvInitialMetadata);
704 }
705 
ReceivingTrailingMetadataReady(grpc_error_handle error)706 void FilterStackCall::BatchControl::ReceivingTrailingMetadataReady(
707     grpc_error_handle error) {
708   GRPC_CALL_COMBINER_STOP(call_->call_combiner(),
709                           "recv_trailing_metadata_ready");
710   grpc_metadata_batch* md = &call_->recv_trailing_metadata_;
711   call_->RecvTrailingFilter(md, error);
712   FinishStep(PendingOp::kRecvTrailingMetadata);
713 }
714 
FinishBatch(grpc_error_handle error)715 void FilterStackCall::BatchControl::FinishBatch(grpc_error_handle error) {
716   GRPC_CALL_COMBINER_STOP(call_->call_combiner(), "on_complete");
717   if (batch_error_.ok()) {
718     batch_error_.set(error);
719   }
720   if (!error.ok()) {
721     call_->CancelWithError(error);
722   }
723   FinishStep(PendingOp::kSends);
724 }
725 
StartBatch(const grpc_op * ops,size_t nops,void * notify_tag,bool is_notify_tag_closure)726 grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops,
727                                             void* notify_tag,
728                                             bool is_notify_tag_closure) {
729   GRPC_LATENT_SEE_INNER_SCOPE("FilterStackCall::StartBatch");
730 
731   size_t i;
732   const grpc_op* op;
733   BatchControl* bctl;
734   grpc_call_error error = GRPC_CALL_OK;
735   grpc_transport_stream_op_batch* stream_op;
736   grpc_transport_stream_op_batch_payload* stream_op_payload;
737   uint32_t seen_ops = 0;
738   intptr_t pending_ops = 0;
739 
740   for (i = 0; i < nops; i++) {
741     if (seen_ops & (1u << ops[i].op)) {
742       return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
743     }
744     seen_ops |= (1u << ops[i].op);
745   }
746 
747   if (!is_client() &&
748       (seen_ops & (1u << GRPC_OP_SEND_STATUS_FROM_SERVER)) != 0 &&
749       (seen_ops & (1u << GRPC_OP_RECV_MESSAGE)) != 0) {
750     LOG(ERROR) << "******************* SEND_STATUS WITH RECV_MESSAGE "
751                   "*******************";
752     return GRPC_CALL_ERROR;
753   }
754 
755   GRPC_CALL_LOG_BATCH(ops, nops);
756 
757   if (nops == 0) {
758     EndOpImmediately(cq_, notify_tag, is_notify_tag_closure);
759     error = GRPC_CALL_OK;
760     goto done;
761   }
762 
763   bctl = ReuseOrAllocateBatchControl(ops);
764   if (bctl == nullptr) {
765     return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
766   }
767   bctl->completion_data_.notify_tag.tag = notify_tag;
768   bctl->completion_data_.notify_tag.is_closure =
769       static_cast<uint8_t>(is_notify_tag_closure != 0);
770 
771   stream_op = &bctl->op_;
772   stream_op_payload = &stream_op_payload_;
773 
774   // rewrite batch ops into a transport op
775   for (i = 0; i < nops; i++) {
776     op = &ops[i];
777     if (op->reserved != nullptr) {
778       error = GRPC_CALL_ERROR;
779       goto done_with_error;
780     }
781     switch (op->op) {
782       case GRPC_OP_SEND_INITIAL_METADATA: {
783         // Flag validation: currently allow no flags
784         if (!AreInitialMetadataFlagsValid(op->flags)) {
785           error = GRPC_CALL_ERROR_INVALID_FLAGS;
786           goto done_with_error;
787         }
788         if (sent_initial_metadata_) {
789           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
790           goto done_with_error;
791         }
792         if (op->data.send_initial_metadata.count > INT_MAX) {
793           error = GRPC_CALL_ERROR_INVALID_METADATA;
794           goto done_with_error;
795         }
796         stream_op->send_initial_metadata = true;
797         sent_initial_metadata_ = true;
798         if (!PrepareApplicationMetadata(op->data.send_initial_metadata.count,
799                                         op->data.send_initial_metadata.metadata,
800                                         false)) {
801           error = GRPC_CALL_ERROR_INVALID_METADATA;
802           goto done_with_error;
803         }
804         PrepareOutgoingInitialMetadata(*op, send_initial_metadata_);
805         // TODO(ctiller): just make these the same variable?
806         if (is_client() && send_deadline() != Timestamp::InfFuture()) {
807           send_initial_metadata_.Set(GrpcTimeoutMetadata(), send_deadline());
808         }
809         if (is_client()) {
810           send_initial_metadata_.Set(
811               WaitForReady(),
812               WaitForReady::ValueType{
813                   (op->flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) != 0,
814                   (op->flags &
815                    GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) != 0});
816         }
817         stream_op_payload->send_initial_metadata.send_initial_metadata =
818             &send_initial_metadata_;
819         pending_ops |= PendingOpMask(PendingOp::kSends);
820         break;
821       }
822       case GRPC_OP_SEND_MESSAGE: {
823         if (!AreWriteFlagsValid(op->flags)) {
824           error = GRPC_CALL_ERROR_INVALID_FLAGS;
825           goto done_with_error;
826         }
827         if (op->data.send_message.send_message == nullptr) {
828           error = GRPC_CALL_ERROR_INVALID_MESSAGE;
829           goto done_with_error;
830         }
831         if (sending_message_) {
832           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
833           goto done_with_error;
834         }
835         uint32_t flags = op->flags;
836         // If the outgoing buffer is already compressed, mark it as so in the
837         // flags. These will be picked up by the compression filter and further
838         // (wasteful) attempts at compression skipped.
839         if (op->data.send_message.send_message->data.raw.compression >
840             GRPC_COMPRESS_NONE) {
841           flags |= GRPC_WRITE_INTERNAL_COMPRESS;
842         }
843         stream_op->send_message = true;
844         sending_message_ = true;
845         send_slice_buffer_.Clear();
846         grpc_slice_buffer_move_into(
847             &op->data.send_message.send_message->data.raw.slice_buffer,
848             send_slice_buffer_.c_slice_buffer());
849         stream_op_payload->send_message.flags = flags;
850         stream_op_payload->send_message.send_message = &send_slice_buffer_;
851         pending_ops |= PendingOpMask(PendingOp::kSends);
852         break;
853       }
854       case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
855         // Flag validation: currently allow no flags
856         if (op->flags != 0) {
857           error = GRPC_CALL_ERROR_INVALID_FLAGS;
858           goto done_with_error;
859         }
860         if (!is_client()) {
861           error = GRPC_CALL_ERROR_NOT_ON_SERVER;
862           goto done_with_error;
863         }
864         if (sent_final_op_) {
865           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
866           goto done_with_error;
867         }
868         stream_op->send_trailing_metadata = true;
869         sent_final_op_ = true;
870         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
871             &send_trailing_metadata_;
872         pending_ops |= PendingOpMask(PendingOp::kSends);
873         break;
874       }
875       case GRPC_OP_SEND_STATUS_FROM_SERVER: {
876         // Flag validation: currently allow no flags
877         if (op->flags != 0) {
878           error = GRPC_CALL_ERROR_INVALID_FLAGS;
879           goto done_with_error;
880         }
881         if (is_client()) {
882           error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
883           goto done_with_error;
884         }
885         if (sent_final_op_) {
886           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
887           goto done_with_error;
888         }
889         if (op->data.send_status_from_server.trailing_metadata_count >
890             INT_MAX) {
891           error = GRPC_CALL_ERROR_INVALID_METADATA;
892           goto done_with_error;
893         }
894         stream_op->send_trailing_metadata = true;
895         sent_final_op_ = true;
896 
897         if (!PrepareApplicationMetadata(
898                 op->data.send_status_from_server.trailing_metadata_count,
899                 op->data.send_status_from_server.trailing_metadata, true)) {
900           error = GRPC_CALL_ERROR_INVALID_METADATA;
901           goto done_with_error;
902         }
903 
904         grpc_error_handle status_error =
905             op->data.send_status_from_server.status == GRPC_STATUS_OK
906                 ? absl::OkStatus()
907                 : grpc_error_set_int(
908                       GRPC_ERROR_CREATE("Server returned error"),
909                       StatusIntProperty::kRpcStatus,
910                       static_cast<intptr_t>(
911                           op->data.send_status_from_server.status));
912         if (op->data.send_status_from_server.status_details != nullptr) {
913           send_trailing_metadata_.Set(
914               GrpcMessageMetadata(),
915               Slice(grpc_slice_copy(
916                   *op->data.send_status_from_server.status_details)));
917           if (!status_error.ok()) {
918             status_error = grpc_error_set_str(
919                 status_error, StatusStrProperty::kGrpcMessage,
920                 StringViewFromSlice(
921                     *op->data.send_status_from_server.status_details));
922           }
923         }
924 
925         status_error_.set(status_error);
926 
927         send_trailing_metadata_.Set(GrpcStatusMetadata(),
928                                     op->data.send_status_from_server.status);
929 
930         // Ignore any te metadata key value pairs specified.
931         send_trailing_metadata_.Remove(TeMetadata());
932         stream_op_payload->send_trailing_metadata.send_trailing_metadata =
933             &send_trailing_metadata_;
934         stream_op_payload->send_trailing_metadata.sent =
935             &sent_server_trailing_metadata_;
936         pending_ops |= PendingOpMask(PendingOp::kSends);
937         break;
938       }
939       case GRPC_OP_RECV_INITIAL_METADATA: {
940         // Flag validation: currently allow no flags
941         if (op->flags != 0) {
942           error = GRPC_CALL_ERROR_INVALID_FLAGS;
943           goto done_with_error;
944         }
945         if (received_initial_metadata_) {
946           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
947           goto done_with_error;
948         }
949         received_initial_metadata_ = true;
950         buffered_metadata_[0] =
951             op->data.recv_initial_metadata.recv_initial_metadata;
952         GRPC_CLOSURE_INIT(
953             &receiving_initial_metadata_ready_,
954             [](void* bctl, grpc_error_handle error) {
955               static_cast<BatchControl*>(bctl)->ReceivingInitialMetadataReady(
956                   error);
957             },
958             bctl, grpc_schedule_on_exec_ctx);
959         stream_op->recv_initial_metadata = true;
960         stream_op_payload->recv_initial_metadata.recv_initial_metadata =
961             &recv_initial_metadata_;
962         stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
963             &receiving_initial_metadata_ready_;
964         if (is_client()) {
965           stream_op_payload->recv_initial_metadata.trailing_metadata_available =
966               &is_trailers_only_;
967         }
968         pending_ops |= PendingOpMask(PendingOp::kRecvInitialMetadata);
969         break;
970       }
971       case GRPC_OP_RECV_MESSAGE: {
972         // Flag validation: currently allow no flags
973         if (op->flags != 0) {
974           error = GRPC_CALL_ERROR_INVALID_FLAGS;
975           goto done_with_error;
976         }
977         if (receiving_message_) {
978           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
979           goto done_with_error;
980         }
981         receiving_message_ = true;
982         stream_op->recv_message = true;
983         receiving_slice_buffer_.reset();
984         receiving_buffer_ = op->data.recv_message.recv_message;
985         stream_op_payload->recv_message.recv_message = &receiving_slice_buffer_;
986         receiving_stream_flags_ = 0;
987         stream_op_payload->recv_message.flags = &receiving_stream_flags_;
988         stream_op_payload->recv_message.call_failed_before_recv_message =
989             &call_failed_before_recv_message_;
990         GRPC_CLOSURE_INIT(
991             &receiving_stream_ready_,
992             [](void* bctlp, grpc_error_handle error) {
993               auto* bctl = static_cast<BatchControl*>(bctlp);
994               auto* call = bctl->call_;
995               //  Yields the call combiner before processing the received
996               //  message.
997               GRPC_CALL_COMBINER_STOP(call->call_combiner(),
998                                       "recv_message_ready");
999               bctl->ReceivingStreamReady(error);
1000             },
1001             bctl, grpc_schedule_on_exec_ctx);
1002         stream_op_payload->recv_message.recv_message_ready =
1003             &receiving_stream_ready_;
1004         pending_ops |= PendingOpMask(PendingOp::kRecvMessage);
1005         break;
1006       }
1007       case GRPC_OP_RECV_STATUS_ON_CLIENT: {
1008         // Flag validation: currently allow no flags
1009         if (op->flags != 0) {
1010           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1011           goto done_with_error;
1012         }
1013         if (!is_client()) {
1014           error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1015           goto done_with_error;
1016         }
1017         if (requested_final_op_) {
1018           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1019           goto done_with_error;
1020         }
1021         requested_final_op_ = true;
1022         buffered_metadata_[1] =
1023             op->data.recv_status_on_client.trailing_metadata;
1024         final_op_.client.status = op->data.recv_status_on_client.status;
1025         final_op_.client.status_details =
1026             op->data.recv_status_on_client.status_details;
1027         final_op_.client.error_string =
1028             op->data.recv_status_on_client.error_string;
1029         stream_op->recv_trailing_metadata = true;
1030         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1031             &recv_trailing_metadata_;
1032         stream_op_payload->recv_trailing_metadata.collect_stats =
1033             &final_info_.stats.transport_stream_stats;
1034         GRPC_CLOSURE_INIT(
1035             &receiving_trailing_metadata_ready_,
1036             [](void* bctl, grpc_error_handle error) {
1037               static_cast<BatchControl*>(bctl)->ReceivingTrailingMetadataReady(
1038                   error);
1039             },
1040             bctl, grpc_schedule_on_exec_ctx);
1041         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1042             &receiving_trailing_metadata_ready_;
1043         pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata);
1044         break;
1045       }
1046       case GRPC_OP_RECV_CLOSE_ON_SERVER: {
1047         // Flag validation: currently allow no flags
1048         if (op->flags != 0) {
1049           error = GRPC_CALL_ERROR_INVALID_FLAGS;
1050           goto done_with_error;
1051         }
1052         if (is_client()) {
1053           error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1054           goto done_with_error;
1055         }
1056         if (requested_final_op_) {
1057           error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1058           goto done_with_error;
1059         }
1060         requested_final_op_ = true;
1061         final_op_.server.cancelled = op->data.recv_close_on_server.cancelled;
1062         stream_op->recv_trailing_metadata = true;
1063         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
1064             &recv_trailing_metadata_;
1065         stream_op_payload->recv_trailing_metadata.collect_stats =
1066             &final_info_.stats.transport_stream_stats;
1067         GRPC_CLOSURE_INIT(
1068             &receiving_trailing_metadata_ready_,
1069             [](void* bctl, grpc_error_handle error) {
1070               static_cast<BatchControl*>(bctl)->ReceivingTrailingMetadataReady(
1071                   error);
1072             },
1073             bctl, grpc_schedule_on_exec_ctx);
1074         stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1075             &receiving_trailing_metadata_ready_;
1076         pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata);
1077         break;
1078       }
1079     }
1080   }
1081 
1082   InternalRef("completion");
1083   if (!is_notify_tag_closure) {
1084     CHECK(grpc_cq_begin_op(cq_, notify_tag));
1085   }
1086   bctl->set_pending_ops(pending_ops);
1087 
1088   if (pending_ops & PendingOpMask(PendingOp::kSends)) {
1089     GRPC_CLOSURE_INIT(
1090         &bctl->finish_batch_,
1091         [](void* bctl, grpc_error_handle error) {
1092           static_cast<BatchControl*>(bctl)->FinishBatch(error);
1093         },
1094         bctl, grpc_schedule_on_exec_ctx);
1095     stream_op->on_complete = &bctl->finish_batch_;
1096   }
1097 
1098   GRPC_TRACE_VLOG(call, 2)
1099       << "BATCH:" << bctl << " START:" << PendingOpString(pending_ops)
1100       << " BATCH:" << grpc_transport_stream_op_batch_string(stream_op, false)
1101       << " (tag:" << bctl->completion_data_.notify_tag.tag << ")";
1102   ExecuteBatch(stream_op, &bctl->start_batch_);
1103 
1104 done:
1105   return error;
1106 
1107 done_with_error:
1108   // reverse any mutations that occurred
1109   if (stream_op->send_initial_metadata) {
1110     sent_initial_metadata_ = false;
1111     send_initial_metadata_.Clear();
1112   }
1113   if (stream_op->send_message) {
1114     sending_message_ = false;
1115   }
1116   if (stream_op->send_trailing_metadata) {
1117     sent_final_op_ = false;
1118     send_trailing_metadata_.Clear();
1119   }
1120   if (stream_op->recv_initial_metadata) {
1121     received_initial_metadata_ = false;
1122   }
1123   if (stream_op->recv_message) {
1124     receiving_message_ = false;
1125   }
1126   if (stream_op->recv_trailing_metadata) {
1127     requested_final_op_ = false;
1128   }
1129   goto done;
1130 }
1131 
GetPeer()1132 char* FilterStackCall::GetPeer() {
1133   Slice peer_slice = GetPeerString();
1134   if (!peer_slice.empty()) {
1135     absl::string_view peer_string_view = peer_slice.as_string_view();
1136     char* peer_string =
1137         static_cast<char*>(gpr_malloc(peer_string_view.size() + 1));
1138     memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
1139     peer_string[peer_string_view.size()] = '\0';
1140     return peer_string;
1141   }
1142   char* peer_string = grpc_channel_get_target(channel_->c_ptr());
1143   if (peer_string != nullptr) return peer_string;
1144   return gpr_strdup("unknown");
1145 }
1146 
1147 }  // namespace grpc_core
1148 
grpc_call_create(grpc_call_create_args * args,grpc_call ** out_call)1149 grpc_error_handle grpc_call_create(grpc_call_create_args* args,
1150                                    grpc_call** out_call) {
1151   return grpc_core::FilterStackCall::Create(args, out_call);
1152 }
1153 
grpc_call_from_top_element(grpc_call_element * surface_element)1154 grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) {
1155   return grpc_core::FilterStackCall::FromTopElem(surface_element)->c_ptr();
1156 }
1157