// // // Copyright 2015 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // #include #include "src/core/lib/surface/call.h" #include #include #include #include #include #include #include #include #include #include #include #include #include "absl/base/thread_annotations.h" #include "absl/status/status.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" #include "absl/strings/str_join.h" #include "absl/strings/string_view.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include "src/core/lib/channel/call_finalization.h" #include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/channel/context.h" #include "src/core/lib/channel/status_util.h" #include "src/core/lib/compression/compression_internal.h" #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" #include "src/core/lib/experiments/experiments.h" #include "src/core/lib/gpr/alloc.h" #include "src/core/lib/gpr/time_precise.h" #include "src/core/lib/gpr/useful.h" #include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/gprpp/cpp_impl_of.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/all_ok.h" #include "src/core/lib/promise/arena_promise.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/latch.h" #include "src/core/lib/promise/map.h" #include "src/core/lib/promise/party.h" #include "src/core/lib/promise/pipe.h" #include "src/core/lib/promise/poll.h" #include "src/core/lib/promise/race.h" #include "src/core/lib/promise/seq.h" #include "src/core/lib/promise/status_flag.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call_test_only.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/completion_queue.h" #include "src/core/lib/surface/server_interface.h" #include "src/core/lib/surface/validate_metadata.h" #include "src/core/lib/surface/wait_for_cq_end_op.h" #include "src/core/lib/transport/batch_builder.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/transport.h" grpc_core::TraceFlag grpc_call_error_trace(false, "call_error"); grpc_core::TraceFlag grpc_compression_trace(false, "compression"); grpc_core::TraceFlag grpc_call_trace(false, "call"); grpc_core::DebugOnlyTraceFlag grpc_call_refcount_trace(false, "call_refcount"); namespace grpc_core { /////////////////////////////////////////////////////////////////////////////// // Call class Call : public CppImplOf { public: Arena* arena() { return arena_; } bool is_client() const { return is_client_; } virtual void ContextSet(grpc_context_index elem, void* value, void (*destroy)(void* value)) = 0; virtual void* ContextGet(grpc_context_index elem) const = 0; virtual bool Completed() = 0; void CancelWithStatus(grpc_status_code status, const char* description); virtual void CancelWithError(grpc_error_handle error) = 0; virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0; char* GetPeer(); virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure) = 0; virtual bool failed_before_recv_message() const = 0; virtual bool is_trailers_only() const = 0; virtual absl::string_view GetServerAuthority() const = 0; virtual void ExternalRef() = 0; virtual void ExternalUnref() = 0; virtual void InternalRef(const char* reason) = 0; virtual void InternalUnref(const char* reason) = 0; grpc_compression_algorithm test_only_compression_algorithm() { return incoming_compression_algorithm_; } uint32_t test_only_message_flags() { return test_only_last_message_flags_; } CompressionAlgorithmSet encodings_accepted_by_peer() { return encodings_accepted_by_peer_; } // This should return nullptr for the promise stack (and alternative means // for that functionality be invented) virtual grpc_call_stack* call_stack() = 0; // Return the EventEngine used for this call's async execution. virtual grpc_event_engine::experimental::EventEngine* event_engine() const = 0; protected: // The maximum number of concurrent batches possible. // Based upon the maximum number of individually queueable ops in the batch // api: // - initial metadata send // - message send // - status/close send (depending on client/server) // - initial metadata recv // - message recv // - status/close recv (depending on client/server) static constexpr size_t kMaxConcurrentBatches = 6; struct ParentCall { Mutex child_list_mu; Call* first_child ABSL_GUARDED_BY(child_list_mu) = nullptr; }; struct ChildCall { explicit ChildCall(Call* parent) : parent(parent) {} Call* parent; /// siblings: children of the same parent form a list, and this list is /// protected under /// parent->mu Call* sibling_next = nullptr; Call* sibling_prev = nullptr; }; Call(Arena* arena, bool is_client, Timestamp send_deadline, RefCountedPtr channel) : channel_(std::move(channel)), arena_(arena), send_deadline_(send_deadline), is_client_(is_client) { GPR_DEBUG_ASSERT(arena_ != nullptr); GPR_DEBUG_ASSERT(channel_ != nullptr); } virtual ~Call() = default; void DeleteThis(); ParentCall* GetOrCreateParentCall(); ParentCall* parent_call(); Channel* channel() const { GPR_DEBUG_ASSERT(channel_ != nullptr); return channel_.get(); } absl::Status InitParent(Call* parent, uint32_t propagation_mask); void PublishToParent(Call* parent); void MaybeUnpublishFromParent(); void PropagateCancellationToChildren(); Timestamp send_deadline() const { return send_deadline_; } void set_send_deadline(Timestamp send_deadline) { send_deadline_ = send_deadline; } Slice GetPeerString() const { MutexLock lock(&peer_mu_); return peer_string_.Ref(); } void SetPeerString(Slice peer_string) { MutexLock lock(&peer_mu_); peer_string_ = std::move(peer_string); } void ClearPeerString() { SetPeerString(Slice(grpc_empty_slice())); } // TODO(ctiller): cancel_func is for cancellation of the call - filter stack // holds no mutexes here, promise stack does, and so locking is different. // Remove this and cancel directly once promise conversion is done. void ProcessIncomingInitialMetadata(grpc_metadata_batch& md); // Fixup outgoing metadata before sending - adds compression, protects // internal headers against external modification. void PrepareOutgoingInitialMetadata(const grpc_op& op, grpc_metadata_batch& md); void NoteLastMessageFlags(uint32_t flags) { test_only_last_message_flags_ = flags; } grpc_compression_algorithm incoming_compression_algorithm() const { return incoming_compression_algorithm_; } void HandleCompressionAlgorithmDisabled( grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; void HandleCompressionAlgorithmNotAccepted( grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; gpr_cycle_counter start_time() const { return start_time_; } private: RefCountedPtr channel_; Arena* const arena_; std::atomic parent_call_{nullptr}; ChildCall* child_ = nullptr; Timestamp send_deadline_; const bool is_client_; // flag indicating that cancellation is inherited bool cancellation_is_inherited_ = false; // Compression algorithm for *incoming* data grpc_compression_algorithm incoming_compression_algorithm_ = GRPC_COMPRESS_NONE; // Supported encodings (compression algorithms), a bitset. // Always support no compression. CompressionAlgorithmSet encodings_accepted_by_peer_{GRPC_COMPRESS_NONE}; uint32_t test_only_last_message_flags_ = 0; // Peer name is protected by a mutex because it can be accessed by the // application at the same moment as it is being set by the completion // of the recv_initial_metadata op. The mutex should be mostly uncontended. mutable Mutex peer_mu_; Slice peer_string_; gpr_cycle_counter start_time_ = gpr_get_cycle_counter(); }; Call::ParentCall* Call::GetOrCreateParentCall() { ParentCall* p = parent_call_.load(std::memory_order_acquire); if (p == nullptr) { p = arena_->New(); ParentCall* expected = nullptr; if (!parent_call_.compare_exchange_strong(expected, p, std::memory_order_release, std::memory_order_relaxed)) { p->~ParentCall(); p = expected; } } return p; } Call::ParentCall* Call::parent_call() { return parent_call_.load(std::memory_order_acquire); } absl::Status Call::InitParent(Call* parent, uint32_t propagation_mask) { child_ = arena()->New(parent); parent->InternalRef("child"); GPR_ASSERT(is_client_); GPR_ASSERT(!parent->is_client_); if (propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline_ = std::min(send_deadline_, parent->send_deadline_); } // for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with // GRPC_PROPAGATE_STATS_CONTEXT // TODO(ctiller): This should change to use the appropriate census start_op // call. if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) { if (0 == (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT)) { return absl::UnknownError( "Census tracing propagation requested without Census context " "propagation"); } ContextSet(GRPC_CONTEXT_TRACING, parent->ContextGet(GRPC_CONTEXT_TRACING), nullptr); } else if (propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT) { return absl::UnknownError( "Census context propagation requested without Census tracing " "propagation"); } if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) { cancellation_is_inherited_ = true; } return absl::OkStatus(); } void Call::PublishToParent(Call* parent) { ChildCall* cc = child_; ParentCall* pc = parent->GetOrCreateParentCall(); MutexLock lock(&pc->child_list_mu); if (pc->first_child == nullptr) { pc->first_child = this; cc->sibling_next = cc->sibling_prev = this; } else { cc->sibling_next = pc->first_child; cc->sibling_prev = pc->first_child->child_->sibling_prev; cc->sibling_next->child_->sibling_prev = cc->sibling_prev->child_->sibling_next = this; } if (parent->Completed()) { CancelWithError(absl::CancelledError()); } } void Call::MaybeUnpublishFromParent() { ChildCall* cc = child_; if (cc == nullptr) return; ParentCall* pc = cc->parent->parent_call(); { MutexLock lock(&pc->child_list_mu); if (this == pc->first_child) { pc->first_child = cc->sibling_next; if (this == pc->first_child) { pc->first_child = nullptr; } } cc->sibling_prev->child_->sibling_next = cc->sibling_next; cc->sibling_next->child_->sibling_prev = cc->sibling_prev; } cc->parent->InternalUnref("child"); } void Call::CancelWithStatus(grpc_status_code status, const char* description) { // copying 'description' is needed to ensure the grpc_call_cancel_with_status // guarantee that can be short-lived. // TODO(ctiller): change to // absl::Status(static_cast(status), description) // (ie remove the set_int, set_str). CancelWithError(grpc_error_set_int( grpc_error_set_str( absl::Status(static_cast(status), description), StatusStrProperty::kGrpcMessage, description), StatusIntProperty::kRpcStatus, status)); } void Call::PropagateCancellationToChildren() { ParentCall* pc = parent_call(); if (pc != nullptr) { Call* child; MutexLock lock(&pc->child_list_mu); child = pc->first_child; if (child != nullptr) { do { Call* next_child_call = child->child_->sibling_next; if (child->cancellation_is_inherited_) { child->InternalRef("propagate_cancel"); child->CancelWithError(absl::CancelledError()); child->InternalUnref("propagate_cancel"); } child = next_child_call; } while (child != pc->first_child); } } } char* Call::GetPeer() { Slice peer_slice = GetPeerString(); if (!peer_slice.empty()) { absl::string_view peer_string_view = peer_slice.as_string_view(); char* peer_string = static_cast(gpr_malloc(peer_string_view.size() + 1)); memcpy(peer_string, peer_string_view.data(), peer_string_view.size()); peer_string[peer_string_view.size()] = '\0'; return peer_string; } char* peer_string = grpc_channel_get_target(channel_->c_ptr()); if (peer_string != nullptr) return peer_string; return gpr_strdup("unknown"); } void Call::DeleteThis() { RefCountedPtr channel = std::move(channel_); Arena* arena = arena_; this->~Call(); channel->DestroyArena(arena); } void Call::PrepareOutgoingInitialMetadata(const grpc_op& op, grpc_metadata_batch& md) { // TODO(juanlishen): If the user has already specified a compression // algorithm by setting the initial metadata with key of // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that // with the compression algorithm mapped from compression level. // process compression level grpc_compression_level effective_compression_level = GRPC_COMPRESS_LEVEL_NONE; bool level_set = false; if (op.data.send_initial_metadata.maybe_compression_level.is_set) { effective_compression_level = op.data.send_initial_metadata.maybe_compression_level.level; level_set = true; } else { const grpc_compression_options copts = channel()->compression_options(); if (copts.default_level.is_set) { level_set = true; effective_compression_level = copts.default_level.level; } } // Currently, only server side supports compression level setting. if (level_set && !is_client()) { const grpc_compression_algorithm calgo = encodings_accepted_by_peer().CompressionAlgorithmForLevel( effective_compression_level); // The following metadata will be checked and removed by the message // compression filter. It will be used as the call's compression // algorithm. md.Set(GrpcInternalEncodingRequest(), calgo); } // Ignore any te metadata key value pairs specified. md.Remove(TeMetadata()); // Should never come from applications md.Remove(GrpcLbClientStatsMetadata()); } void Call::ProcessIncomingInitialMetadata(grpc_metadata_batch& md) { Slice* peer_string = md.get_pointer(PeerString()); if (peer_string != nullptr) SetPeerString(peer_string->Ref()); incoming_compression_algorithm_ = md.Take(GrpcEncodingMetadata()).value_or(GRPC_COMPRESS_NONE); encodings_accepted_by_peer_ = md.Take(GrpcAcceptEncodingMetadata()) .value_or(CompressionAlgorithmSet{GRPC_COMPRESS_NONE}); const grpc_compression_options compression_options = channel_->compression_options(); const grpc_compression_algorithm compression_algorithm = incoming_compression_algorithm_; if (GPR_UNLIKELY(!CompressionAlgorithmSet::FromUint32( compression_options.enabled_algorithms_bitset) .IsSet(compression_algorithm))) { // check if algorithm is supported by current channel config HandleCompressionAlgorithmDisabled(compression_algorithm); } // GRPC_COMPRESS_NONE is always set. GPR_DEBUG_ASSERT(encodings_accepted_by_peer_.IsSet(GRPC_COMPRESS_NONE)); if (GPR_UNLIKELY(!encodings_accepted_by_peer_.IsSet(compression_algorithm))) { if (GRPC_TRACE_FLAG_ENABLED(grpc_compression_trace)) { HandleCompressionAlgorithmNotAccepted(compression_algorithm); } } } void Call::HandleCompressionAlgorithmNotAccepted( grpc_compression_algorithm compression_algorithm) { const char* algo_name = nullptr; grpc_compression_algorithm_name(compression_algorithm, &algo_name); gpr_log(GPR_ERROR, "Compression algorithm ('%s') not present in the " "accepted encodings (%s)", algo_name, std::string(encodings_accepted_by_peer_.ToString()).c_str()); } void Call::HandleCompressionAlgorithmDisabled( grpc_compression_algorithm compression_algorithm) { const char* algo_name = nullptr; grpc_compression_algorithm_name(compression_algorithm, &algo_name); std::string error_msg = absl::StrFormat("Compression algorithm '%s' is disabled.", algo_name); gpr_log(GPR_ERROR, "%s", error_msg.c_str()); CancelWithError(grpc_error_set_int(absl::UnimplementedError(error_msg), StatusIntProperty::kRpcStatus, GRPC_STATUS_UNIMPLEMENTED)); } /////////////////////////////////////////////////////////////////////////////// // FilterStackCall // To be removed once promise conversion is complete class FilterStackCall final : public Call { public: ~FilterStackCall() override { for (int i = 0; i < GRPC_CONTEXT_COUNT; ++i) { if (context_[i].destroy) { context_[i].destroy(context_[i].value); } } gpr_free(static_cast(const_cast(final_info_.error_string))); } bool Completed() override { return gpr_atm_acq_load(&received_final_op_atm_) != 0; } // TODO(ctiller): return absl::StatusOr>? static grpc_error_handle Create(grpc_call_create_args* args, grpc_call** out_call); static Call* FromTopElem(grpc_call_element* elem) { return FromCallStack(grpc_call_stack_from_top_element(elem)); } grpc_call_stack* call_stack() override { return reinterpret_cast( reinterpret_cast(this) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this))); } grpc_event_engine::experimental::EventEngine* event_engine() const override { return channel()->event_engine(); } grpc_call_element* call_elem(size_t idx) { return grpc_call_stack_element(call_stack(), idx); } CallCombiner* call_combiner() { return &call_combiner_; } void CancelWithError(grpc_error_handle error) override; void SetCompletionQueue(grpc_completion_queue* cq) override; grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure) override; void ExternalRef() override { ext_ref_.Ref(); } void ExternalUnref() override; void InternalRef(const char* reason) override { GRPC_CALL_STACK_REF(call_stack(), reason); } void InternalUnref(const char* reason) override { GRPC_CALL_STACK_UNREF(call_stack(), reason); } void ContextSet(grpc_context_index elem, void* value, void (*destroy)(void* value)) override; void* ContextGet(grpc_context_index elem) const override { return context_[elem].value; } bool is_trailers_only() const override { bool result = is_trailers_only_; GPR_DEBUG_ASSERT(!result || recv_initial_metadata_.TransportSize() == 0); return result; } bool failed_before_recv_message() const override { return call_failed_before_recv_message_; } absl::string_view GetServerAuthority() const override { const Slice* authority_metadata = recv_initial_metadata_.get_pointer(HttpAuthorityMetadata()); if (authority_metadata == nullptr) return ""; return authority_metadata->as_string_view(); } static size_t InitialSizeEstimate() { return sizeof(FilterStackCall) + sizeof(BatchControl) * kMaxConcurrentBatches; } private: class ScopedContext : public promise_detail::Context { public: explicit ScopedContext(FilterStackCall* call) : promise_detail::Context(call->arena()) {} }; static constexpr gpr_atm kRecvNone = 0; static constexpr gpr_atm kRecvInitialMetadataFirst = 1; enum class PendingOp { kRecvMessage, kRecvInitialMetadata, kRecvTrailingMetadata, kSends }; static intptr_t PendingOpMask(PendingOp op) { return static_cast(1) << static_cast(op); } static std::string PendingOpString(intptr_t pending_ops) { std::vector pending_op_strings; if (pending_ops & PendingOpMask(PendingOp::kRecvMessage)) { pending_op_strings.push_back("kRecvMessage"); } if (pending_ops & PendingOpMask(PendingOp::kRecvInitialMetadata)) { pending_op_strings.push_back("kRecvInitialMetadata"); } if (pending_ops & PendingOpMask(PendingOp::kRecvTrailingMetadata)) { pending_op_strings.push_back("kRecvTrailingMetadata"); } if (pending_ops & PendingOpMask(PendingOp::kSends)) { pending_op_strings.push_back("kSends"); } return absl::StrCat("{", absl::StrJoin(pending_op_strings, ","), "}"); } struct BatchControl { FilterStackCall* call_ = nullptr; CallTracerAnnotationInterface* call_tracer_ = nullptr; grpc_transport_stream_op_batch op_; // Share memory for cq_completion and notify_tag as they are never needed // simultaneously. Each byte used in this data structure count as six bytes // per call, so any savings we can make are worthwhile, // We use notify_tag to determine whether or not to send notification to the // completion queue. Once we've made that determination, we can reuse the // memory for cq_completion. union { grpc_cq_completion cq_completion; struct { // Any given op indicates completion by either (a) calling a closure or // (b) sending a notification on the call's completion queue. If // \a is_closure is true, \a tag indicates a closure to be invoked; // otherwise, \a tag indicates the tag to be used in the notification to // be sent to the completion queue. void* tag; bool is_closure; } notify_tag; } completion_data_; grpc_closure start_batch_; grpc_closure finish_batch_; std::atomic ops_pending_{0}; AtomicError batch_error_; void set_pending_ops(uintptr_t ops) { ops_pending_.store(ops, std::memory_order_release); } bool completed_batch_step(PendingOp op) { auto mask = PendingOpMask(op); auto r = ops_pending_.fetch_sub(mask, std::memory_order_acq_rel); if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "BATCH:%p COMPLETE:%s REMAINING:%s (tag:%p)", this, PendingOpString(mask).c_str(), PendingOpString(r & ~mask).c_str(), completion_data_.notify_tag.tag); } GPR_ASSERT((r & mask) != 0); return r == mask; } void PostCompletion(); void FinishStep(PendingOp op); void ProcessDataAfterMetadata(); void ReceivingStreamReady(grpc_error_handle error); void ReceivingInitialMetadataReady(grpc_error_handle error); void ReceivingTrailingMetadataReady(grpc_error_handle error); void FinishBatch(grpc_error_handle error); }; FilterStackCall(Arena* arena, const grpc_call_create_args& args) : Call(arena, args.server_transport_data == nullptr, args.send_deadline, args.channel->Ref()), cq_(args.cq), stream_op_payload_(context_) {} static void ReleaseCall(void* call, grpc_error_handle); static void DestroyCall(void* call, grpc_error_handle); static FilterStackCall* FromCallStack(grpc_call_stack* call_stack) { return reinterpret_cast( reinterpret_cast(call_stack) - GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall))); } void ExecuteBatch(grpc_transport_stream_op_batch* batch, grpc_closure* start_batch_closure); void SetFinalStatus(grpc_error_handle error); BatchControl* ReuseOrAllocateBatchControl(const grpc_op* ops); bool PrepareApplicationMetadata(size_t count, grpc_metadata* metadata, bool is_trailing); void PublishAppMetadata(grpc_metadata_batch* b, bool is_trailing); void RecvInitialFilter(grpc_metadata_batch* b); void RecvTrailingFilter(grpc_metadata_batch* b, grpc_error_handle batch_error); RefCount ext_ref_; CallCombiner call_combiner_; grpc_completion_queue* cq_; grpc_polling_entity pollent_; /// has grpc_call_unref been called bool destroy_called_ = false; // Trailers-only response status bool is_trailers_only_ = false; /// which ops are in-flight bool sent_initial_metadata_ = false; bool sending_message_ = false; bool sent_final_op_ = false; bool received_initial_metadata_ = false; bool receiving_message_ = false; bool requested_final_op_ = false; gpr_atm received_final_op_atm_ = 0; BatchControl* active_batches_[kMaxConcurrentBatches] = {}; grpc_transport_stream_op_batch_payload stream_op_payload_; // first idx: is_receiving, second idx: is_trailing grpc_metadata_batch send_initial_metadata_; grpc_metadata_batch send_trailing_metadata_; grpc_metadata_batch recv_initial_metadata_; grpc_metadata_batch recv_trailing_metadata_; // Buffered read metadata waiting to be returned to the application. // Element 0 is initial metadata, element 1 is trailing metadata. grpc_metadata_array* buffered_metadata_[2] = {}; // Call data useful used for reporting. Only valid after the call has // completed grpc_call_final_info final_info_; // Contexts for various subsystems (security, tracing, ...). grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; SliceBuffer send_slice_buffer_; absl::optional receiving_slice_buffer_; uint32_t receiving_stream_flags_; bool call_failed_before_recv_message_ = false; grpc_byte_buffer** receiving_buffer_ = nullptr; grpc_slice receiving_slice_ = grpc_empty_slice(); grpc_closure receiving_stream_ready_; grpc_closure receiving_initial_metadata_ready_; grpc_closure receiving_trailing_metadata_ready_; // Status about operation of call bool sent_server_trailing_metadata_ = false; gpr_atm cancelled_with_error_ = 0; grpc_closure release_call_; union { struct { grpc_status_code* status; grpc_slice* status_details; const char** error_string; } client; struct { int* cancelled; // backpointer to owning server if this is a server side call. ServerInterface* core_server; } server; } final_op_; AtomicError status_error_; // recv_state can contain one of the following values: // RECV_NONE : : no initial metadata and messages received // RECV_INITIAL_METADATA_FIRST : received initial metadata first // a batch_control* : received messages first // +------1------RECV_NONE------3-----+ // | | // | | // v v // RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp // | ^ | ^ // | | | | // +-----2-----+ +-----4-----+ // For 1, 4: See receiving_initial_metadata_ready() function // For 2, 3: See receiving_stream_ready() function gpr_atm recv_state_ = 0; }; grpc_error_handle FilterStackCall::Create(grpc_call_create_args* args, grpc_call** out_call) { Channel* channel = args->channel.get(); auto add_init_error = [](grpc_error_handle* composite, grpc_error_handle new_err) { if (new_err.ok()) return; if (composite->ok()) { *composite = GRPC_ERROR_CREATE("Call creation failed"); } *composite = grpc_error_add_child(*composite, new_err); }; FilterStackCall* call; grpc_error_handle error; grpc_channel_stack* channel_stack = channel->channel_stack(); size_t call_alloc_size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)) + channel_stack->call_stack_size; Arena* arena = channel->CreateArena(); call = new (arena->Alloc(call_alloc_size)) FilterStackCall(arena, *args); GPR_DEBUG_ASSERT(FromC(call->c_ptr()) == call); GPR_DEBUG_ASSERT(FromCallStack(call->call_stack()) == call); *out_call = call->c_ptr(); grpc_slice path = grpc_empty_slice(); ScopedContext ctx(call); if (call->is_client()) { call->final_op_.client.status_details = nullptr; call->final_op_.client.status = nullptr; call->final_op_.client.error_string = nullptr; global_stats().IncrementClientCallsCreated(); path = CSliceRef(args->path->c_slice()); call->send_initial_metadata_.Set(HttpPathMetadata(), std::move(*args->path)); if (args->authority.has_value()) { call->send_initial_metadata_.Set(HttpAuthorityMetadata(), std::move(*args->authority)); } call->send_initial_metadata_.Set( GrpcRegisteredMethod(), reinterpret_cast(static_cast( args->registered_method))); channel_stack->stats_plugin_group->AddClientCallTracers( Slice(CSliceRef(path)), args->registered_method, call->context_); } else { global_stats().IncrementServerCallsCreated(); call->final_op_.server.cancelled = nullptr; call->final_op_.server.core_server = args->server; // TODO(yashykt): In the future, we want to also enable stats and trace // collecting from when the call is created at the transport. The idea is // that the transport would create the call tracer and pass it in as part of // the metadata. // TODO(yijiem): OpenCensus and internal Census is still using this way to // set server call tracer. We need to refactor them to stats plugins // (including removing the client channel filters). if (args->server != nullptr && args->server->server_call_tracer_factory() != nullptr) { auto* server_call_tracer = args->server->server_call_tracer_factory()->CreateNewServerCallTracer( arena, args->server->channel_args()); if (server_call_tracer != nullptr) { // Note that we are setting both // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future // promise-based world, we would just a single tracer object for each // stack (call, subchannel_call, server_call.) call->ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, server_call_tracer, nullptr); call->ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr); } } channel_stack->stats_plugin_group->AddServerCallTracers(call->context_); } Call* parent = Call::FromC(args->parent); if (parent != nullptr) { add_init_error(&error, absl_status_to_grpc_error(call->InitParent( parent, args->propagation_mask))); } // initial refcount dropped by grpc_call_unref grpc_call_element_args call_args = { call->call_stack(), args->server_transport_data, call->context_, path, call->start_time(), call->send_deadline(), call->arena(), &call->call_combiner_}; add_init_error(&error, grpc_call_stack_init(channel_stack, 1, DestroyCall, call, &call_args)); // Publish this call to parent only after the call stack has been initialized. if (parent != nullptr) { call->PublishToParent(parent); } if (!error.ok()) { call->CancelWithError(error); } if (args->cq != nullptr) { GPR_ASSERT(args->pollset_set_alternative == nullptr && "Only one of 'cq' and 'pollset_set_alternative' should be " "non-nullptr."); GRPC_CQ_INTERNAL_REF(args->cq, "bind"); call->pollent_ = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(args->cq)); } if (args->pollset_set_alternative != nullptr) { call->pollent_ = grpc_polling_entity_create_from_pollset_set( args->pollset_set_alternative); } if (!grpc_polling_entity_is_empty(&call->pollent_)) { grpc_call_stack_set_pollset_or_pollset_set(call->call_stack(), &call->pollent_); } if (call->is_client()) { channelz::ChannelNode* channelz_channel = channel->channelz_node(); if (channelz_channel != nullptr) { channelz_channel->RecordCallStarted(); } } else if (call->final_op_.server.core_server != nullptr) { channelz::ServerNode* channelz_node = call->final_op_.server.core_server->channelz_node(); if (channelz_node != nullptr) { channelz_node->RecordCallStarted(); } } CSliceUnref(path); return error; } void FilterStackCall::SetCompletionQueue(grpc_completion_queue* cq) { GPR_ASSERT(cq); if (grpc_polling_entity_pollset_set(&pollent_) != nullptr) { Crash("A pollset_set is already registered for this call."); } cq_ = cq; GRPC_CQ_INTERNAL_REF(cq, "bind"); pollent_ = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)); grpc_call_stack_set_pollset_or_pollset_set(call_stack(), &pollent_); } void FilterStackCall::ReleaseCall(void* call, grpc_error_handle /*error*/) { static_cast(call)->DeleteThis(); } void FilterStackCall::DestroyCall(void* call, grpc_error_handle /*error*/) { auto* c = static_cast(call); c->recv_initial_metadata_.Clear(); c->recv_trailing_metadata_.Clear(); c->receiving_slice_buffer_.reset(); ParentCall* pc = c->parent_call(); if (pc != nullptr) { pc->~ParentCall(); } if (c->cq_) { GRPC_CQ_INTERNAL_UNREF(c->cq_, "bind"); } grpc_error_handle status_error = c->status_error_.get(); grpc_error_get_status(status_error, c->send_deadline(), &c->final_info_.final_status, nullptr, nullptr, &(c->final_info_.error_string)); c->status_error_.set(absl::OkStatus()); c->final_info_.stats.latency = gpr_cycle_counter_sub(gpr_get_cycle_counter(), c->start_time()); grpc_call_stack_destroy(c->call_stack(), &c->final_info_, GRPC_CLOSURE_INIT(&c->release_call_, ReleaseCall, c, grpc_schedule_on_exec_ctx)); } void FilterStackCall::ExternalUnref() { if (GPR_LIKELY(!ext_ref_.Unref())) return; ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; GRPC_API_TRACE("grpc_call_unref(c=%p)", 1, (this)); MaybeUnpublishFromParent(); GPR_ASSERT(!destroy_called_); destroy_called_ = true; bool cancel = gpr_atm_acq_load(&received_final_op_atm_) == 0; if (cancel) { CancelWithError(absl::CancelledError()); } else { // Unset the call combiner cancellation closure. This has the // effect of scheduling the previously set cancellation closure, if // any, so that it can release any internal references it may be // holding to the call stack. call_combiner_.SetNotifyOnCancel(nullptr); } InternalUnref("destroy"); } // start_batch_closure points to a caller-allocated closure to be used // for entering the call combiner. void FilterStackCall::ExecuteBatch(grpc_transport_stream_op_batch* batch, grpc_closure* start_batch_closure) { // This is called via the call combiner to start sending a batch down // the filter stack. auto execute_batch_in_call_combiner = [](void* arg, grpc_error_handle) { grpc_transport_stream_op_batch* batch = static_cast(arg); auto* call = static_cast(batch->handler_private.extra_arg); grpc_call_element* elem = call->call_elem(0); GRPC_CALL_LOG_OP(GPR_INFO, elem, batch); elem->filter->start_transport_stream_op_batch(elem, batch); }; batch->handler_private.extra_arg = this; GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch, grpc_schedule_on_exec_ctx); GRPC_CALL_COMBINER_START(call_combiner(), start_batch_closure, absl::OkStatus(), "executing batch"); } namespace { struct CancelState { FilterStackCall* call; grpc_closure start_batch; grpc_closure finish_batch; }; } // namespace // The on_complete callback used when sending a cancel_stream batch down // the filter stack. Yields the call combiner when the batch is done. static void done_termination(void* arg, grpc_error_handle /*error*/) { CancelState* state = static_cast(arg); GRPC_CALL_COMBINER_STOP(state->call->call_combiner(), "on_complete for cancel_stream op"); state->call->InternalUnref("termination"); delete state; } void FilterStackCall::CancelWithError(grpc_error_handle error) { if (!gpr_atm_rel_cas(&cancelled_with_error_, 0, 1)) { return; } ClearPeerString(); InternalRef("termination"); // Inform the call combiner of the cancellation, so that it can cancel // any in-flight asynchronous actions that may be holding the call // combiner. This ensures that the cancel_stream batch can be sent // down the filter stack in a timely manner. call_combiner_.Cancel(error); CancelState* state = new CancelState; state->call = this; GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, grpc_schedule_on_exec_ctx); grpc_transport_stream_op_batch* op = grpc_make_transport_stream_op(&state->finish_batch); op->cancel_stream = true; op->payload->cancel_stream.cancel_error = error; ExecuteBatch(op, &state->start_batch); } void FilterStackCall::SetFinalStatus(grpc_error_handle error) { if (GRPC_TRACE_FLAG_ENABLED(grpc_call_error_trace)) { gpr_log(GPR_DEBUG, "set_final_status %s %s", is_client() ? "CLI" : "SVR", StatusToString(error).c_str()); } if (is_client()) { std::string status_details; grpc_error_get_status(error, send_deadline(), final_op_.client.status, &status_details, nullptr, final_op_.client.error_string); *final_op_.client.status_details = grpc_slice_from_cpp_string(std::move(status_details)); status_error_.set(error); channelz::ChannelNode* channelz_channel = channel()->channelz_node(); if (channelz_channel != nullptr) { if (*final_op_.client.status != GRPC_STATUS_OK) { channelz_channel->RecordCallFailed(); } else { channelz_channel->RecordCallSucceeded(); } } } else { *final_op_.server.cancelled = !error.ok() || !sent_server_trailing_metadata_; channelz::ServerNode* channelz_node = final_op_.server.core_server->channelz_node(); if (channelz_node != nullptr) { if (*final_op_.server.cancelled || !status_error_.ok()) { channelz_node->RecordCallFailed(); } else { channelz_node->RecordCallSucceeded(); } } } } bool FilterStackCall::PrepareApplicationMetadata(size_t count, grpc_metadata* metadata, bool is_trailing) { grpc_metadata_batch* batch = is_trailing ? &send_trailing_metadata_ : &send_initial_metadata_; for (size_t i = 0; i < count; i++) { grpc_metadata* md = &metadata[i]; if (!GRPC_LOG_IF_ERROR("validate_metadata", grpc_validate_header_key_is_legal(md->key))) { return false; } else if (!grpc_is_binary_header_internal(md->key) && !GRPC_LOG_IF_ERROR( "validate_metadata", grpc_validate_header_nonbin_value_is_legal(md->value))) { return false; } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) { // HTTP2 hpack encoding has a maximum limit. return false; } else if (grpc_slice_str_cmp(md->key, "content-length") == 0) { // Filter "content-length metadata" continue; } batch->Append(StringViewFromSlice(md->key), Slice(CSliceRef(md->value)), [md](absl::string_view error, const Slice& value) { gpr_log(GPR_DEBUG, "Append error: %s", absl::StrCat("key=", StringViewFromSlice(md->key), " error=", error, " value=", value.as_string_view()) .c_str()); }); } return true; } namespace { class PublishToAppEncoder { public: explicit PublishToAppEncoder(grpc_metadata_array* dest, const grpc_metadata_batch* encoding, bool is_client) : dest_(dest), encoding_(encoding), is_client_(is_client) {} void Encode(const Slice& key, const Slice& value) { Append(key.c_slice(), value.c_slice()); } // Catch anything that is not explicitly handled, and do not publish it to the // application. If new metadata is added to a batch that needs to be // published, it should be called out here. template void Encode(Which, const typename Which::ValueType&) {} void Encode(UserAgentMetadata, const Slice& slice) { Append(UserAgentMetadata::key(), slice); } void Encode(HostMetadata, const Slice& slice) { Append(HostMetadata::key(), slice); } void Encode(GrpcPreviousRpcAttemptsMetadata, uint32_t count) { Append(GrpcPreviousRpcAttemptsMetadata::key(), count); } void Encode(GrpcRetryPushbackMsMetadata, Duration count) { Append(GrpcRetryPushbackMsMetadata::key(), count.millis()); } void Encode(LbTokenMetadata, const Slice& slice) { Append(LbTokenMetadata::key(), slice); } private: void Append(absl::string_view key, int64_t value) { Append(StaticSlice::FromStaticString(key).c_slice(), Slice::FromInt64(value).c_slice()); } void Append(absl::string_view key, const Slice& value) { Append(StaticSlice::FromStaticString(key).c_slice(), value.c_slice()); } void Append(grpc_slice key, grpc_slice value) { if (dest_->count == dest_->capacity) { Crash(absl::StrCat( "Too many metadata entries: capacity=", dest_->capacity, " on ", is_client_ ? "client" : "server", " encoding ", encoding_->count(), " elements: ", encoding_->DebugString().c_str())); } auto* mdusr = &dest_->metadata[dest_->count++]; mdusr->key = key; mdusr->value = value; } grpc_metadata_array* const dest_; const grpc_metadata_batch* const encoding_; const bool is_client_; }; } // namespace void FilterStackCall::PublishAppMetadata(grpc_metadata_batch* b, bool is_trailing) { if (b->count() == 0) return; if (!is_client() && is_trailing) return; if (is_trailing && buffered_metadata_[1] == nullptr) return; grpc_metadata_array* dest; dest = buffered_metadata_[is_trailing]; if (dest->count + b->count() > dest->capacity) { dest->capacity = std::max(dest->capacity + b->count(), dest->capacity * 3 / 2); dest->metadata = static_cast( gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity)); } PublishToAppEncoder encoder(dest, b, is_client()); b->Encode(&encoder); } void FilterStackCall::RecvInitialFilter(grpc_metadata_batch* b) { ProcessIncomingInitialMetadata(*b); PublishAppMetadata(b, false); } void FilterStackCall::RecvTrailingFilter(grpc_metadata_batch* b, grpc_error_handle batch_error) { if (!batch_error.ok()) { SetFinalStatus(batch_error); } else { absl::optional grpc_status = b->Take(GrpcStatusMetadata()); if (grpc_status.has_value()) { grpc_status_code status_code = *grpc_status; grpc_error_handle error; if (status_code != GRPC_STATUS_OK) { Slice peer = GetPeerString(); error = grpc_error_set_int( GRPC_ERROR_CREATE(absl::StrCat("Error received from peer ", peer.as_string_view())), StatusIntProperty::kRpcStatus, static_cast(status_code)); } auto grpc_message = b->Take(GrpcMessageMetadata()); if (grpc_message.has_value()) { error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage, grpc_message->as_string_view()); } else if (!error.ok()) { error = grpc_error_set_str(error, StatusStrProperty::kGrpcMessage, ""); } SetFinalStatus(error); } else if (!is_client()) { SetFinalStatus(absl::OkStatus()); } else { gpr_log(GPR_DEBUG, "Received trailing metadata with no error and no status"); SetFinalStatus(grpc_error_set_int(GRPC_ERROR_CREATE("No status received"), StatusIntProperty::kRpcStatus, GRPC_STATUS_UNKNOWN)); } } PublishAppMetadata(b, true); } namespace { bool AreWriteFlagsValid(uint32_t flags) { // check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set const uint32_t allowed_write_positions = (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK); const uint32_t invalid_positions = ~allowed_write_positions; return !(flags & invalid_positions); } bool AreInitialMetadataFlagsValid(uint32_t flags) { // check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set uint32_t invalid_positions = ~GRPC_INITIAL_METADATA_USED_MASK; return !(flags & invalid_positions); } size_t BatchSlotForOp(grpc_op_type type) { switch (type) { case GRPC_OP_SEND_INITIAL_METADATA: return 0; case GRPC_OP_SEND_MESSAGE: return 1; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: case GRPC_OP_SEND_STATUS_FROM_SERVER: return 2; case GRPC_OP_RECV_INITIAL_METADATA: return 3; case GRPC_OP_RECV_MESSAGE: return 4; case GRPC_OP_RECV_CLOSE_ON_SERVER: case GRPC_OP_RECV_STATUS_ON_CLIENT: return 5; } GPR_UNREACHABLE_CODE(return 123456789); } } // namespace FilterStackCall::BatchControl* FilterStackCall::ReuseOrAllocateBatchControl( const grpc_op* ops) { size_t slot_idx = BatchSlotForOp(ops[0].op); BatchControl** pslot = &active_batches_[slot_idx]; BatchControl* bctl; if (*pslot != nullptr) { bctl = *pslot; if (bctl->call_ != nullptr) { return nullptr; } bctl->~BatchControl(); bctl->op_ = {}; new (&bctl->batch_error_) AtomicError(); } else { bctl = arena()->New(); *pslot = bctl; } bctl->call_ = this; bctl->call_tracer_ = static_cast( ContextGet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE)); bctl->op_.payload = &stream_op_payload_; return bctl; } void FilterStackCall::BatchControl::PostCompletion() { FilterStackCall* call = call_; grpc_error_handle error = batch_error_.get(); if (IsCallStatusOverrideOnCancellationEnabled()) { // On the client side, if final call status is already known (i.e if this op // includes recv_trailing_metadata) and if the call status is known to be // OK, then disregard the batch error to ensure call->receiving_buffer_ is // not cleared. if (op_.recv_trailing_metadata && call->is_client() && call->status_error_.ok()) { error = absl::OkStatus(); } } if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "tag:%p batch_error=%s op:%s", completion_data_.notify_tag.tag, error.ToString().c_str(), grpc_transport_stream_op_batch_string(&op_, false).c_str()); } if (op_.send_initial_metadata) { call->send_initial_metadata_.Clear(); } if (op_.send_message) { if (op_.payload->send_message.stream_write_closed && error.ok()) { error = grpc_error_add_child( error, GRPC_ERROR_CREATE( "Attempt to send message after stream was closed.")); } call->sending_message_ = false; call->send_slice_buffer_.Clear(); } if (op_.send_trailing_metadata) { call->send_trailing_metadata_.Clear(); } if (!error.ok() && op_.recv_message && *call->receiving_buffer_ != nullptr) { grpc_byte_buffer_destroy(*call->receiving_buffer_); *call->receiving_buffer_ = nullptr; } if (op_.recv_trailing_metadata) { // propagate cancellation to any interested children gpr_atm_rel_store(&call->received_final_op_atm_, 1); call->PropagateCancellationToChildren(); error = absl::OkStatus(); } batch_error_.set(absl::OkStatus()); if (completion_data_.notify_tag.is_closure) { call_ = nullptr; Closure::Run(DEBUG_LOCATION, static_cast(completion_data_.notify_tag.tag), error); call->InternalUnref("completion"); } else { grpc_cq_end_op( call->cq_, completion_data_.notify_tag.tag, error, [](void* user_data, grpc_cq_completion* /*storage*/) { BatchControl* bctl = static_cast(user_data); Call* call = bctl->call_; bctl->call_ = nullptr; call->InternalUnref("completion"); }, this, &completion_data_.cq_completion); } } void FilterStackCall::BatchControl::FinishStep(PendingOp op) { if (GPR_UNLIKELY(completed_batch_step(op))) { PostCompletion(); } } void FilterStackCall::BatchControl::ProcessDataAfterMetadata() { FilterStackCall* call = call_; if (!call->receiving_slice_buffer_.has_value()) { *call->receiving_buffer_ = nullptr; call->receiving_message_ = false; FinishStep(PendingOp::kRecvMessage); } else { call->NoteLastMessageFlags(call->receiving_stream_flags_); if ((call->receiving_stream_flags_ & GRPC_WRITE_INTERNAL_COMPRESS) && (call->incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) { *call->receiving_buffer_ = grpc_raw_compressed_byte_buffer_create( nullptr, 0, call->incoming_compression_algorithm()); } else { *call->receiving_buffer_ = grpc_raw_byte_buffer_create(nullptr, 0); } grpc_slice_buffer_move_into( call->receiving_slice_buffer_->c_slice_buffer(), &(*call->receiving_buffer_)->data.raw.slice_buffer); call->receiving_message_ = false; call->receiving_slice_buffer_.reset(); FinishStep(PendingOp::kRecvMessage); } } void FilterStackCall::BatchControl::ReceivingStreamReady( grpc_error_handle error) { if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "tag:%p ReceivingStreamReady error=%s " "receiving_slice_buffer.has_value=%d recv_state=%" PRIdPTR, completion_data_.notify_tag.tag, error.ToString().c_str(), call_->receiving_slice_buffer_.has_value(), gpr_atm_no_barrier_load(&call_->recv_state_)); } FilterStackCall* call = call_; if (!error.ok()) { call->receiving_slice_buffer_.reset(); if (batch_error_.ok()) { batch_error_.set(error); } call->CancelWithError(error); } // If recv_state is kRecvNone, we will save the batch_control // object with rel_cas, and will not use it after the cas. Its corresponding // acq_load is in receiving_initial_metadata_ready() if (!error.ok() || !call->receiving_slice_buffer_.has_value() || !gpr_atm_rel_cas(&call->recv_state_, kRecvNone, reinterpret_cast(this))) { ProcessDataAfterMetadata(); } } void FilterStackCall::BatchControl::ReceivingInitialMetadataReady( grpc_error_handle error) { FilterStackCall* call = call_; GRPC_CALL_COMBINER_STOP(call->call_combiner(), "recv_initial_metadata_ready"); if (error.ok()) { grpc_metadata_batch* md = &call->recv_initial_metadata_; call->RecvInitialFilter(md); absl::optional deadline = md->get(GrpcTimeoutMetadata()); if (deadline.has_value() && !call->is_client()) { call_->set_send_deadline(*deadline); } } else { if (batch_error_.ok()) { batch_error_.set(error); } call->CancelWithError(error); } grpc_closure* saved_rsr_closure = nullptr; while (true) { gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state_); // Should only receive initial metadata once GPR_ASSERT(rsr_bctlp != 1); if (rsr_bctlp == 0) { // We haven't seen initial metadata and messages before, thus initial // metadata is received first. // no_barrier_cas is used, as this function won't access the batch_control // object saved by receiving_stream_ready() if the initial metadata is // received first. if (gpr_atm_no_barrier_cas(&call->recv_state_, kRecvNone, kRecvInitialMetadataFirst)) { break; } } else { // Already received messages saved_rsr_closure = GRPC_CLOSURE_CREATE( [](void* bctl, grpc_error_handle error) { static_cast(bctl)->ReceivingStreamReady(error); }, reinterpret_cast(rsr_bctlp), grpc_schedule_on_exec_ctx); // No need to modify recv_state break; } } if (saved_rsr_closure != nullptr) { Closure::Run(DEBUG_LOCATION, saved_rsr_closure, error); } FinishStep(PendingOp::kRecvInitialMetadata); } void FilterStackCall::BatchControl::ReceivingTrailingMetadataReady( grpc_error_handle error) { GRPC_CALL_COMBINER_STOP(call_->call_combiner(), "recv_trailing_metadata_ready"); grpc_metadata_batch* md = &call_->recv_trailing_metadata_; call_->RecvTrailingFilter(md, error); FinishStep(PendingOp::kRecvTrailingMetadata); } void FilterStackCall::BatchControl::FinishBatch(grpc_error_handle error) { GRPC_CALL_COMBINER_STOP(call_->call_combiner(), "on_complete"); if (batch_error_.ok()) { batch_error_.set(error); } if (!error.ok()) { call_->CancelWithError(error); } FinishStep(PendingOp::kSends); } namespace { void EndOpImmediately(grpc_completion_queue* cq, void* notify_tag, bool is_notify_tag_closure) { if (!is_notify_tag_closure) { GPR_ASSERT(grpc_cq_begin_op(cq, notify_tag)); grpc_cq_end_op( cq, notify_tag, absl::OkStatus(), [](void*, grpc_cq_completion* completion) { gpr_free(completion); }, nullptr, static_cast( gpr_malloc(sizeof(grpc_cq_completion)))); } else { Closure::Run(DEBUG_LOCATION, static_cast(notify_tag), absl::OkStatus()); } } } // namespace grpc_call_error FilterStackCall::StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure) { size_t i; const grpc_op* op; BatchControl* bctl; grpc_call_error error = GRPC_CALL_OK; grpc_transport_stream_op_batch* stream_op; grpc_transport_stream_op_batch_payload* stream_op_payload; uint32_t seen_ops = 0; intptr_t pending_ops = 0; for (i = 0; i < nops; i++) { if (seen_ops & (1u << ops[i].op)) { return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } seen_ops |= (1u << ops[i].op); } if (!is_client() && (seen_ops & (1u << GRPC_OP_SEND_STATUS_FROM_SERVER)) != 0 && (seen_ops & (1u << GRPC_OP_RECV_MESSAGE)) != 0) { gpr_log(GPR_ERROR, "******************* SEND_STATUS WITH RECV_MESSAGE " "*******************"); return GRPC_CALL_ERROR; } GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops); if (nops == 0) { EndOpImmediately(cq_, notify_tag, is_notify_tag_closure); error = GRPC_CALL_OK; goto done; } bctl = ReuseOrAllocateBatchControl(ops); if (bctl == nullptr) { return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } bctl->completion_data_.notify_tag.tag = notify_tag; bctl->completion_data_.notify_tag.is_closure = static_cast(is_notify_tag_closure != 0); stream_op = &bctl->op_; stream_op_payload = &stream_op_payload_; // rewrite batch ops into a transport op for (i = 0; i < nops; i++) { op = &ops[i]; if (op->reserved != nullptr) { error = GRPC_CALL_ERROR; goto done_with_error; } switch (op->op) { case GRPC_OP_SEND_INITIAL_METADATA: { // Flag validation: currently allow no flags if (!AreInitialMetadataFlagsValid(op->flags)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (sent_initial_metadata_) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } if (op->data.send_initial_metadata.count > INT_MAX) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } stream_op->send_initial_metadata = true; sent_initial_metadata_ = true; if (!PrepareApplicationMetadata(op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, false)) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } PrepareOutgoingInitialMetadata(*op, send_initial_metadata_); // TODO(ctiller): just make these the same variable? if (is_client() && send_deadline() != Timestamp::InfFuture()) { send_initial_metadata_.Set(GrpcTimeoutMetadata(), send_deadline()); } if (is_client()) { send_initial_metadata_.Set( WaitForReady(), WaitForReady::ValueType{ (op->flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) != 0, (op->flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) != 0}); } stream_op_payload->send_initial_metadata.send_initial_metadata = &send_initial_metadata_; pending_ops |= PendingOpMask(PendingOp::kSends); break; } case GRPC_OP_SEND_MESSAGE: { if (!AreWriteFlagsValid(op->flags)) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (op->data.send_message.send_message == nullptr) { error = GRPC_CALL_ERROR_INVALID_MESSAGE; goto done_with_error; } if (sending_message_) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } uint32_t flags = op->flags; // If the outgoing buffer is already compressed, mark it as so in the // flags. These will be picked up by the compression filter and further // (wasteful) attempts at compression skipped. if (op->data.send_message.send_message->data.raw.compression > GRPC_COMPRESS_NONE) { flags |= GRPC_WRITE_INTERNAL_COMPRESS; } stream_op->send_message = true; sending_message_ = true; send_slice_buffer_.Clear(); grpc_slice_buffer_move_into( &op->data.send_message.send_message->data.raw.slice_buffer, send_slice_buffer_.c_slice_buffer()); stream_op_payload->send_message.flags = flags; stream_op_payload->send_message.send_message = &send_slice_buffer_; pending_ops |= PendingOpMask(PendingOp::kSends); break; } case GRPC_OP_SEND_CLOSE_FROM_CLIENT: { // Flag validation: currently allow no flags if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (!is_client()) { error = GRPC_CALL_ERROR_NOT_ON_SERVER; goto done_with_error; } if (sent_final_op_) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } stream_op->send_trailing_metadata = true; sent_final_op_ = true; stream_op_payload->send_trailing_metadata.send_trailing_metadata = &send_trailing_metadata_; pending_ops |= PendingOpMask(PendingOp::kSends); break; } case GRPC_OP_SEND_STATUS_FROM_SERVER: { // Flag validation: currently allow no flags if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (is_client()) { error = GRPC_CALL_ERROR_NOT_ON_CLIENT; goto done_with_error; } if (sent_final_op_) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } if (op->data.send_status_from_server.trailing_metadata_count > INT_MAX) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } stream_op->send_trailing_metadata = true; sent_final_op_ = true; if (!PrepareApplicationMetadata( op->data.send_status_from_server.trailing_metadata_count, op->data.send_status_from_server.trailing_metadata, true)) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } grpc_error_handle status_error = op->data.send_status_from_server.status == GRPC_STATUS_OK ? absl::OkStatus() : grpc_error_set_int( GRPC_ERROR_CREATE("Server returned error"), StatusIntProperty::kRpcStatus, static_cast( op->data.send_status_from_server.status)); if (op->data.send_status_from_server.status_details != nullptr) { send_trailing_metadata_.Set( GrpcMessageMetadata(), Slice(grpc_slice_copy( *op->data.send_status_from_server.status_details))); if (!status_error.ok()) { status_error = grpc_error_set_str( status_error, StatusStrProperty::kGrpcMessage, StringViewFromSlice( *op->data.send_status_from_server.status_details)); } } status_error_.set(status_error); send_trailing_metadata_.Set(GrpcStatusMetadata(), op->data.send_status_from_server.status); // Ignore any te metadata key value pairs specified. send_trailing_metadata_.Remove(TeMetadata()); stream_op_payload->send_trailing_metadata.send_trailing_metadata = &send_trailing_metadata_; stream_op_payload->send_trailing_metadata.sent = &sent_server_trailing_metadata_; pending_ops |= PendingOpMask(PendingOp::kSends); break; } case GRPC_OP_RECV_INITIAL_METADATA: { // Flag validation: currently allow no flags if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (received_initial_metadata_) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } received_initial_metadata_ = true; buffered_metadata_[0] = op->data.recv_initial_metadata.recv_initial_metadata; GRPC_CLOSURE_INIT( &receiving_initial_metadata_ready_, [](void* bctl, grpc_error_handle error) { static_cast(bctl)->ReceivingInitialMetadataReady( error); }, bctl, grpc_schedule_on_exec_ctx); stream_op->recv_initial_metadata = true; stream_op_payload->recv_initial_metadata.recv_initial_metadata = &recv_initial_metadata_; stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready = &receiving_initial_metadata_ready_; if (is_client()) { stream_op_payload->recv_initial_metadata.trailing_metadata_available = &is_trailers_only_; } pending_ops |= PendingOpMask(PendingOp::kRecvInitialMetadata); break; } case GRPC_OP_RECV_MESSAGE: { // Flag validation: currently allow no flags if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (receiving_message_) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } receiving_message_ = true; stream_op->recv_message = true; receiving_slice_buffer_.reset(); receiving_buffer_ = op->data.recv_message.recv_message; stream_op_payload->recv_message.recv_message = &receiving_slice_buffer_; receiving_stream_flags_ = 0; stream_op_payload->recv_message.flags = &receiving_stream_flags_; stream_op_payload->recv_message.call_failed_before_recv_message = &call_failed_before_recv_message_; GRPC_CLOSURE_INIT( &receiving_stream_ready_, [](void* bctlp, grpc_error_handle error) { auto* bctl = static_cast(bctlp); auto* call = bctl->call_; // Yields the call combiner before processing the received // message. GRPC_CALL_COMBINER_STOP(call->call_combiner(), "recv_message_ready"); bctl->ReceivingStreamReady(error); }, bctl, grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &receiving_stream_ready_; pending_ops |= PendingOpMask(PendingOp::kRecvMessage); break; } case GRPC_OP_RECV_STATUS_ON_CLIENT: { // Flag validation: currently allow no flags if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (!is_client()) { error = GRPC_CALL_ERROR_NOT_ON_SERVER; goto done_with_error; } if (requested_final_op_) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } requested_final_op_ = true; buffered_metadata_[1] = op->data.recv_status_on_client.trailing_metadata; final_op_.client.status = op->data.recv_status_on_client.status; final_op_.client.status_details = op->data.recv_status_on_client.status_details; final_op_.client.error_string = op->data.recv_status_on_client.error_string; stream_op->recv_trailing_metadata = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &recv_trailing_metadata_; stream_op_payload->recv_trailing_metadata.collect_stats = &final_info_.stats.transport_stream_stats; GRPC_CLOSURE_INIT( &receiving_trailing_metadata_ready_, [](void* bctl, grpc_error_handle error) { static_cast(bctl)->ReceivingTrailingMetadataReady( error); }, bctl, grpc_schedule_on_exec_ctx); stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = &receiving_trailing_metadata_ready_; pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata); break; } case GRPC_OP_RECV_CLOSE_ON_SERVER: { // Flag validation: currently allow no flags if (op->flags != 0) { error = GRPC_CALL_ERROR_INVALID_FLAGS; goto done_with_error; } if (is_client()) { error = GRPC_CALL_ERROR_NOT_ON_CLIENT; goto done_with_error; } if (requested_final_op_) { error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; goto done_with_error; } requested_final_op_ = true; final_op_.server.cancelled = op->data.recv_close_on_server.cancelled; stream_op->recv_trailing_metadata = true; stream_op_payload->recv_trailing_metadata.recv_trailing_metadata = &recv_trailing_metadata_; stream_op_payload->recv_trailing_metadata.collect_stats = &final_info_.stats.transport_stream_stats; GRPC_CLOSURE_INIT( &receiving_trailing_metadata_ready_, [](void* bctl, grpc_error_handle error) { static_cast(bctl)->ReceivingTrailingMetadataReady( error); }, bctl, grpc_schedule_on_exec_ctx); stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready = &receiving_trailing_metadata_ready_; pending_ops |= PendingOpMask(PendingOp::kRecvTrailingMetadata); break; } } } InternalRef("completion"); if (!is_notify_tag_closure) { GPR_ASSERT(grpc_cq_begin_op(cq_, notify_tag)); } bctl->set_pending_ops(pending_ops); if (pending_ops & PendingOpMask(PendingOp::kSends)) { GRPC_CLOSURE_INIT( &bctl->finish_batch_, [](void* bctl, grpc_error_handle error) { static_cast(bctl)->FinishBatch(error); }, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch_; } if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "BATCH:%p START:%s BATCH:%s (tag:%p)", bctl, PendingOpString(pending_ops).c_str(), grpc_transport_stream_op_batch_string(stream_op, false).c_str(), bctl->completion_data_.notify_tag.tag); } ExecuteBatch(stream_op, &bctl->start_batch_); done: return error; done_with_error: // reverse any mutations that occurred if (stream_op->send_initial_metadata) { sent_initial_metadata_ = false; send_initial_metadata_.Clear(); } if (stream_op->send_message) { sending_message_ = false; } if (stream_op->send_trailing_metadata) { sent_final_op_ = false; send_trailing_metadata_.Clear(); } if (stream_op->recv_initial_metadata) { received_initial_metadata_ = false; } if (stream_op->recv_message) { receiving_message_ = false; } if (stream_op->recv_trailing_metadata) { requested_final_op_ = false; } goto done; } void FilterStackCall::ContextSet(grpc_context_index elem, void* value, void (*destroy)(void*)) { if (context_[elem].destroy) { context_[elem].destroy(context_[elem].value); } context_[elem].value = value; context_[elem].destroy = destroy; } /////////////////////////////////////////////////////////////////////////////// // Metadata validation helpers namespace { bool ValidateMetadata(size_t count, grpc_metadata* metadata) { if (count > INT_MAX) { return false; } for (size_t i = 0; i < count; i++) { grpc_metadata* md = &metadata[i]; if (!GRPC_LOG_IF_ERROR("validate_metadata", grpc_validate_header_key_is_legal(md->key))) { return false; } else if (!grpc_is_binary_header_internal(md->key) && !GRPC_LOG_IF_ERROR( "validate_metadata", grpc_validate_header_nonbin_value_is_legal(md->value))) { return false; } else if (GRPC_SLICE_LENGTH(md->value) >= UINT32_MAX) { // HTTP2 hpack encoding has a maximum limit. return false; } } return true; } } // namespace /////////////////////////////////////////////////////////////////////////////// // PromiseBasedCall // Will be folded into Call once the promise conversion is done class BasicPromiseBasedCall : public Call, public Party, public grpc_event_engine::experimental:: EventEngine::Closure /* for deadlines */ { public: using Call::arena; BasicPromiseBasedCall(Arena* arena, uint32_t initial_external_refs, uint32_t initial_internal_refs, const grpc_call_create_args& args) : Call(arena, args.server_transport_data == nullptr, args.send_deadline, args.channel->Ref()), Party(initial_internal_refs), external_refs_(initial_external_refs), cq_(args.cq) { if (args.cq != nullptr) { GRPC_CQ_INTERNAL_REF(args.cq, "bind"); } } ~BasicPromiseBasedCall() override { if (cq_) GRPC_CQ_INTERNAL_UNREF(cq_, "bind"); for (int i = 0; i < GRPC_CONTEXT_COUNT; i++) { if (context_[i].destroy) { context_[i].destroy(context_[i].value); } } } // Implementation of EventEngine::Closure, called when deadline expires void Run() final; virtual void OrphanCall() = 0; virtual ServerCallContext* server_call_context() { return nullptr; } void SetCompletionQueue(grpc_completion_queue* cq) final { cq_ = cq; GRPC_CQ_INTERNAL_REF(cq, "bind"); } // Implementation of call refcounting: move this to DualRefCounted once we // don't need to maintain FilterStackCall compatibility void ExternalRef() final { if (external_refs_.fetch_add(1, std::memory_order_relaxed) == 0) { InternalRef("external"); } } void ExternalUnref() final { if (external_refs_.fetch_sub(1, std::memory_order_acq_rel) == 1) { OrphanCall(); InternalUnref("external"); } } void InternalRef(const char* reason) final { if (grpc_call_refcount_trace.enabled()) { gpr_log(GPR_DEBUG, "INTERNAL_REF:%p:%s", this, reason); } Party::IncrementRefCount(); } void InternalUnref(const char* reason) final { if (grpc_call_refcount_trace.enabled()) { gpr_log(GPR_DEBUG, "INTERNAL_UNREF:%p:%s", this, reason); } Party::Unref(); } void RunInContext(absl::AnyInvocable fn) { Spawn( "run_in_context", [fn = std::move(fn)]() mutable { fn(); return Empty{}; }, [](Empty) {}); } void ContextSet(grpc_context_index elem, void* value, void (*destroy)(void*)) final { if (context_[elem].destroy != nullptr) { context_[elem].destroy(context_[elem].value); } context_[elem].value = value; context_[elem].destroy = destroy; } void* ContextGet(grpc_context_index elem) const final { return context_[elem].value; } void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_); void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_); Timestamp deadline() { MutexLock lock(&deadline_mu_); return deadline_; } // Accept the stats from the context (call once we have proof the transport is // done with them). void AcceptTransportStatsFromContext() { final_stats_ = *call_context_.call_stats(); } // This should return nullptr for the promise stack (and alternative means // for that functionality be invented) grpc_call_stack* call_stack() final { return nullptr; } virtual RefCountedPtr MakeCallSpine(CallArgs) { Crash("Not implemented"); } protected: class ScopedContext : public ScopedActivity, public promise_detail::Context, public promise_detail::Context, public promise_detail::Context, public promise_detail::Context { public: explicit ScopedContext(BasicPromiseBasedCall* call) : ScopedActivity(call), promise_detail::Context(call->arena()), promise_detail::Context(call->context_), promise_detail::Context(&call->call_context_), promise_detail::Context(&call->finalization_) {} }; grpc_call_context_element* context() { return context_; } grpc_completion_queue* cq() { return cq_; } // At the end of the call run any finalization actions. void SetFinalizationStatus(grpc_status_code status, Slice status_details) { final_message_ = std::move(status_details); final_status_ = status; } grpc_event_engine::experimental::EventEngine* event_engine() const override { return channel()->event_engine(); } private: void PartyOver() final { { ScopedContext ctx(this); std::string message; grpc_call_final_info final_info; final_info.stats = final_stats_; final_info.final_status = final_status_; // TODO(ctiller): change type here so we don't need to copy this string. final_info.error_string = nullptr; if (!final_message_.empty()) { message = std::string(final_message_.begin(), final_message_.end()); final_info.error_string = message.c_str(); } final_info.stats.latency = gpr_cycle_counter_sub(gpr_get_cycle_counter(), start_time()); finalization_.Run(&final_info); CancelRemainingParticipants(); arena()->DestroyManagedNewObjects(); } DeleteThis(); } // Double refcounted for now: party owns the internal refcount, we track the // external refcount. Figure out a better scheme post-promise conversion. std::atomic external_refs_; CallFinalization finalization_; CallContext call_context_{this}; // Contexts for various subsystems (security, tracing, ...). grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; grpc_call_stats final_stats_{}; // Current deadline. Mutex deadline_mu_; Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture(); grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY( deadline_mu_) deadline_task_; Slice final_message_; grpc_status_code final_status_ = GRPC_STATUS_UNKNOWN; grpc_completion_queue* cq_; }; void BasicPromiseBasedCall::UpdateDeadline(Timestamp deadline) { MutexLock lock(&deadline_mu_); if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "%s[call] UpdateDeadline from=%s to=%s", DebugTag().c_str(), deadline_.ToString().c_str(), deadline.ToString().c_str()); } if (deadline >= deadline_) return; auto* const event_engine = channel()->event_engine(); if (deadline_ != Timestamp::InfFuture()) { if (!event_engine->Cancel(deadline_task_)) return; } else { InternalRef("deadline"); } deadline_ = deadline; deadline_task_ = event_engine->RunAfter(deadline - Timestamp::Now(), this); } void BasicPromiseBasedCall::ResetDeadline() { { MutexLock lock(&deadline_mu_); if (deadline_ == Timestamp::InfFuture()) return; auto* const event_engine = channel()->event_engine(); if (!event_engine->Cancel(deadline_task_)) return; deadline_ = Timestamp::InfFuture(); } InternalUnref("deadline[reset]"); } void BasicPromiseBasedCall::Run() { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; CancelWithError(absl::DeadlineExceededError("Deadline exceeded")); InternalUnref("deadline[run]"); } class PromiseBasedCall : public BasicPromiseBasedCall { public: PromiseBasedCall(Arena* arena, uint32_t initial_external_refs, const grpc_call_create_args& args); bool Completed() final { return finished_.IsSet(); } bool failed_before_recv_message() const final { return failed_before_recv_message_.load(std::memory_order_relaxed); } using Call::arena; protected: class ScopedContext : public BasicPromiseBasedCall::ScopedContext, public BatchBuilder, public promise_detail::Context { public: explicit ScopedContext(PromiseBasedCall* call) : BasicPromiseBasedCall::ScopedContext(call), BatchBuilder(&call->batch_payload_), promise_detail::Context(this) {} }; class Completion { public: Completion() : index_(kNullIndex) {} ~Completion() { GPR_ASSERT(index_ == kNullIndex); } explicit Completion(uint8_t index) : index_(index) {} Completion(const Completion& other) = delete; Completion& operator=(const Completion& other) = delete; Completion(Completion&& other) noexcept : index_(other.index_) { other.index_ = kNullIndex; } Completion& operator=(Completion&& other) noexcept { GPR_ASSERT(index_ == kNullIndex); index_ = other.index_; other.index_ = kNullIndex; return *this; } uint8_t index() const { return index_; } uint8_t TakeIndex() { return std::exchange(index_, kNullIndex); } bool has_value() const { return index_ != kNullIndex; } private: enum : uint8_t { kNullIndex = 0xff }; uint8_t index_; }; // Enumerates why a Completion is still pending enum class PendingOp { // We're in the midst of starting a batch of operations kStartingBatch = 0, // The following correspond with the batch operations from above kSendInitialMetadata, kReceiveInitialMetadata, kReceiveStatusOnClient, kReceiveCloseOnServer = kReceiveStatusOnClient, kSendMessage, kReceiveMessage, kSendStatusFromServer, kSendCloseFromClient = kSendStatusFromServer, }; bool RunParty() override { ScopedContext ctx(this); return Party::RunParty(); } const char* PendingOpString(PendingOp reason) const { switch (reason) { case PendingOp::kStartingBatch: return "StartingBatch"; case PendingOp::kSendInitialMetadata: return "SendInitialMetadata"; case PendingOp::kReceiveInitialMetadata: return "ReceiveInitialMetadata"; case PendingOp::kReceiveStatusOnClient: return is_client() ? "ReceiveStatusOnClient" : "ReceiveCloseOnServer"; case PendingOp::kSendMessage: return "SendMessage"; case PendingOp::kReceiveMessage: return "ReceiveMessage"; case PendingOp::kSendStatusFromServer: return is_client() ? "SendCloseFromClient" : "SendStatusFromServer"; } return "Unknown"; } static constexpr uint32_t PendingOpBit(PendingOp reason) { return 1 << static_cast(reason); } // Begin work on a completion, recording the tag/closure to notify. // Use the op selected in \a ops to determine the index to allocate into. // Starts the "StartingBatch" PendingOp immediately. // Assumes at least one operation in \a ops. Completion StartCompletion(void* tag, bool is_closure, const grpc_op* ops); // Add one pending op to the completion, and return it. Completion AddOpToCompletion(const Completion& completion, PendingOp reason); // Stringify a completion std::string CompletionString(const Completion& completion) const { return completion.has_value() ? completion_info_[completion.index()].pending.ToString(this) : "no-completion"; } // Finish one op on the completion. Must have been previously been added. // The completion as a whole finishes when all pending ops finish. void FinishOpOnCompletion(Completion* completion, PendingOp reason); // Mark the completion as failed. Does not finish it. void FailCompletion(const Completion& completion, SourceLocation source_location = {}); // Mark the completion as infallible. Overrides FailCompletion to report // success always. void ForceCompletionSuccess(const Completion& completion); std::string PresentAndCompletionText(const char* caption, bool has, const Completion& completion) const { if (has) { if (completion.has_value()) { return absl::StrCat(caption, ":", CompletionString(completion), " "); } else { return absl::StrCat(caption, ":!!BUG:operation is present, no completion!! "); } } else { if (!completion.has_value()) { return ""; } else { return absl::StrCat(caption, ":no-op:", CompletionString(completion), " "); } } } // Spawn a job that will first do FirstPromise then receive a message template void StartRecvMessage(const grpc_op& op, const Completion& completion, FirstPromise first, PipeReceiver* receiver, bool cancel_on_error, Party::BulkSpawner& spawner); void StartSendMessage(const grpc_op& op, const Completion& completion, PipeSender* sender, Party::BulkSpawner& spawner); void set_completed() { finished_.Set(); } // Returns a promise that resolves to Empty whenever the call is completed. auto finished() { return finished_.Wait(); } // Returns a promise that resolves to Empty whenever there is no outstanding // send operation auto WaitForSendingStarted() { return [this]() -> Poll { int n = sends_queued_.load(std::memory_order_relaxed); if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "%s[call] WaitForSendingStarted n=%d", DebugTag().c_str(), n); } if (n != 0) return waiting_for_queued_sends_.pending(); return Empty{}; }; } // Mark that a send has been queued - blocks sending trailing metadata. void QueueSend() { if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "%s[call] QueueSend", DebugTag().c_str()); } sends_queued_.fetch_add(1, std::memory_order_relaxed); } // Mark that a send has been dequeued - allows sending trailing metadata once // zero sends are queued. void EnactSend() { if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "%s[call] EnactSend", DebugTag().c_str()); } if (1 == sends_queued_.fetch_sub(1, std::memory_order_relaxed)) { waiting_for_queued_sends_.Wake(); } } void set_failed_before_recv_message() { failed_before_recv_message_.store(true, std::memory_order_relaxed); } private: union CompletionInfo { static constexpr uint32_t kOpFailed = 0x8000'0000u; static constexpr uint32_t kOpForceSuccess = 0x4000'0000u; CompletionInfo() {} enum CompletionState { kPending, kSuccess, kFailure, }; struct Pending { // Bitmask of PendingOps at the bottom, and kOpFailed, kOpForceSuccess at // the top. std::atomic state; bool is_closure; // True if this completion was for a recv_message op. // In that case if the completion as a whole fails we need to cleanup the // returned message. bool is_recv_message; void* tag; void Start(bool is_closure, void* tag) { this->is_closure = is_closure; this->is_recv_message = false; this->tag = tag; state.store(PendingOpBit(PendingOp::kStartingBatch), std::memory_order_release); } void AddPendingBit(PendingOp reason) { if (reason == PendingOp::kReceiveMessage) is_recv_message = true; auto prev = state.fetch_or(PendingOpBit(reason), std::memory_order_relaxed); GPR_ASSERT((prev & PendingOpBit(reason)) == 0); } CompletionState RemovePendingBit(PendingOp reason) { const uint32_t mask = ~PendingOpBit(reason); auto prev = state.fetch_and(mask, std::memory_order_acq_rel); GPR_ASSERT((prev & PendingOpBit(reason)) != 0); switch (prev & mask) { case kOpFailed: return kFailure; case kOpFailed | kOpForceSuccess: case kOpForceSuccess: case 0: return kSuccess; default: return kPending; } } void MarkFailed() { state.fetch_or(kOpFailed, std::memory_order_relaxed); } void MarkForceSuccess() { state.fetch_or(kOpForceSuccess, std::memory_order_relaxed); } std::string ToString(const PromiseBasedCall* call) const { auto state = this->state.load(std::memory_order_relaxed); std::vector pending_ops; for (size_t i = 0; i < 24; i++) { if (state & (1u << i)) { pending_ops.push_back( call->PendingOpString(static_cast(i))); } } return absl::StrFormat("{%s}%s:tag=%p", absl::StrJoin(pending_ops, ","), (state & kOpForceSuccess) ? ":force-success" : (state & kOpFailed) ? ":failed" : ":success", tag); } } pending; grpc_cq_completion completion; }; CompletionInfo completion_info_[6]; ExternallyObservableLatch finished_; // Non-zero with an outstanding GRPC_OP_SEND_INITIAL_METADATA or // GRPC_OP_SEND_MESSAGE (one count each), and 0 once those payloads have been // pushed onto the outgoing pipe. std::atomic sends_queued_{0}; std::atomic failed_before_recv_message_{false}; // Waiter for when sends_queued_ becomes 0. IntraActivityWaiter waiting_for_queued_sends_; grpc_byte_buffer** recv_message_ = nullptr; grpc_transport_stream_op_batch_payload batch_payload_{context()}; }; template grpc_error_handle MakePromiseBasedCall(grpc_call_create_args* args, grpc_call** out_call) { Channel* channel = args->channel.get(); auto* arena = channel->CreateArena(); PromiseBasedCall* call = arena->New(arena, args); *out_call = call->c_ptr(); GPR_DEBUG_ASSERT(Call::FromC(*out_call) == call); return absl::OkStatus(); } PromiseBasedCall::PromiseBasedCall(Arena* arena, uint32_t initial_external_refs, const grpc_call_create_args& args) : BasicPromiseBasedCall(arena, initial_external_refs, initial_external_refs != 0 ? 1 : 0, args) {} static void CToMetadata(grpc_metadata* metadata, size_t count, grpc_metadata_batch* b) { for (size_t i = 0; i < count; i++) { grpc_metadata* md = &metadata[i]; auto key = StringViewFromSlice(md->key); // Filter "content-length metadata" if (key == "content-length") continue; b->Append(key, Slice(CSliceRef(md->value)), [md](absl::string_view error, const Slice& value) { gpr_log(GPR_DEBUG, "Append error: %s", absl::StrCat("key=", StringViewFromSlice(md->key), " error=", error, " value=", value.as_string_view()) .c_str()); }); } } PromiseBasedCall::Completion PromiseBasedCall::StartCompletion( void* tag, bool is_closure, const grpc_op* ops) { Completion c(BatchSlotForOp(ops[0].op)); if (!is_closure) { grpc_cq_begin_op(cq(), tag); } completion_info_[c.index()].pending.Start(is_closure, tag); if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] StartCompletion %s", DebugTag().c_str(), CompletionString(c).c_str()); } return c; } PromiseBasedCall::Completion PromiseBasedCall::AddOpToCompletion( const Completion& completion, PendingOp reason) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] AddOpToCompletion %s %s", DebugTag().c_str(), CompletionString(completion).c_str(), PendingOpString(reason)); } GPR_ASSERT(completion.has_value()); completion_info_[completion.index()].pending.AddPendingBit(reason); return Completion(completion.index()); } void PromiseBasedCall::FailCompletion(const Completion& completion, SourceLocation location) { if (grpc_call_trace.enabled()) { gpr_log(location.file(), location.line(), GPR_LOG_SEVERITY_ERROR, "%s[call] FailCompletion %s", DebugTag().c_str(), CompletionString(completion).c_str()); } completion_info_[completion.index()].pending.MarkFailed(); } void PromiseBasedCall::ForceCompletionSuccess(const Completion& completion) { completion_info_[completion.index()].pending.MarkForceSuccess(); } void PromiseBasedCall::FinishOpOnCompletion(Completion* completion, PendingOp reason) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] FinishOpOnCompletion completion:%s finish:%s", DebugTag().c_str(), CompletionString(*completion).c_str(), PendingOpString(reason)); } const uint8_t i = completion->TakeIndex(); GPR_ASSERT(i < GPR_ARRAY_SIZE(completion_info_)); CompletionInfo::Pending& pending = completion_info_[i].pending; bool success; switch (pending.RemovePendingBit(reason)) { case CompletionInfo::kPending: return; // Early out case CompletionInfo::kSuccess: success = true; break; case CompletionInfo::kFailure: success = false; break; } if (pending.is_recv_message && !success && *recv_message_ != nullptr) { grpc_byte_buffer_destroy(*recv_message_); *recv_message_ = nullptr; } auto error = success ? absl::OkStatus() : absl::CancelledError(); if (pending.is_closure) { ExecCtx::Run(DEBUG_LOCATION, static_cast(pending.tag), error); } else { InternalRef("cq_end_op"); grpc_cq_end_op( cq(), pending.tag, error, [](void* p, grpc_cq_completion*) { static_cast(p)->InternalUnref("cq_end_op"); }, this, &completion_info_[i].completion); } } void PromiseBasedCall::StartSendMessage(const grpc_op& op, const Completion& completion, PipeSender* sender, Party::BulkSpawner& spawner) { QueueSend(); SliceBuffer send; grpc_slice_buffer_swap( &op.data.send_message.send_message->data.raw.slice_buffer, send.c_slice_buffer()); auto msg = arena()->MakePooled(std::move(send), op.flags); spawner.Spawn( "call_send_message", [this, sender, msg = std::move(msg)]() mutable { EnactSend(); return sender->Push(std::move(msg)); }, [this, completion = AddOpToCompletion( completion, PendingOp::kSendMessage)](bool result) mutable { if (grpc_call_trace.enabled()) { gpr_log(GPR_DEBUG, "%sSendMessage completes %s", DebugTag().c_str(), result ? "successfully" : "with failure"); } if (!result) FailCompletion(completion); FinishOpOnCompletion(&completion, PendingOp::kSendMessage); }); } template void PromiseBasedCall::StartRecvMessage( const grpc_op& op, const Completion& completion, FirstPromiseFactory first_promise_factory, PipeReceiver* receiver, bool cancel_on_error, Party::BulkSpawner& spawner) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] Start RecvMessage: %s", DebugTag().c_str(), CompletionString(completion).c_str()); } recv_message_ = op.data.recv_message.recv_message; spawner.Spawn( "call_recv_message", [first_promise_factory = std::move(first_promise_factory), receiver]() { return Seq(first_promise_factory(), receiver->Next()); }, [this, cancel_on_error, completion = AddOpToCompletion(completion, PendingOp::kReceiveMessage)]( NextResult result) mutable { if (result.has_value()) { MessageHandle& message = *result; NoteLastMessageFlags(message->flags()); if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) && (incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) { *recv_message_ = grpc_raw_compressed_byte_buffer_create( nullptr, 0, incoming_compression_algorithm()); } else { *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0); } grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(), &(*recv_message_)->data.raw.slice_buffer); if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] RecvMessage: outstanding_recv " "finishes: received %" PRIdPTR " byte message", DebugTag().c_str(), (*recv_message_)->data.raw.slice_buffer.length); } } else if (result.cancelled()) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] RecvMessage: outstanding_recv " "finishes: received end-of-stream with error", DebugTag().c_str()); } set_failed_before_recv_message(); FailCompletion(completion); if (cancel_on_error) CancelWithError(absl::CancelledError()); *recv_message_ = nullptr; } else { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] RecvMessage: outstanding_recv " "finishes: received end-of-stream", DebugTag().c_str()); } *recv_message_ = nullptr; } FinishOpOnCompletion(&completion, PendingOp::kReceiveMessage); }); } /////////////////////////////////////////////////////////////////////////////// // CallContext void CallContext::RunInContext(absl::AnyInvocable fn) { call_->RunInContext(std::move(fn)); } void CallContext::IncrementRefCount(const char* reason) { call_->InternalRef(reason); } void CallContext::Unref(const char* reason) { call_->InternalUnref(reason); } void CallContext::UpdateDeadline(Timestamp deadline) { call_->UpdateDeadline(deadline); } Timestamp CallContext::deadline() const { return call_->deadline(); } ServerCallContext* CallContext::server_call_context() { return call_->server_call_context(); } RefCountedPtr CallContext::MakeCallSpine( CallArgs call_args) { return call_->MakeCallSpine(std::move(call_args)); } grpc_call* CallContext::c_call() { return call_->c_ptr(); } /////////////////////////////////////////////////////////////////////////////// // PublishMetadataArray namespace { void PublishMetadataArray(grpc_metadata_batch* md, grpc_metadata_array* array, bool is_client) { const auto md_count = md->count(); if (md_count > array->capacity) { array->capacity = std::max(array->capacity + md->count(), array->capacity * 3 / 2); array->metadata = static_cast( gpr_realloc(array->metadata, sizeof(grpc_metadata) * array->capacity)); } PublishToAppEncoder encoder(array, md, is_client); md->Encode(&encoder); } } // namespace /////////////////////////////////////////////////////////////////////////////// // ClientPromiseBasedCall #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL class ClientPromiseBasedCall final : public PromiseBasedCall { public: ClientPromiseBasedCall(Arena* arena, grpc_call_create_args* args) : PromiseBasedCall(arena, 1, *args), polling_entity_( args->cq != nullptr ? grpc_polling_entity_create_from_pollset( grpc_cq_pollset(args->cq)) : (args->pollset_set_alternative != nullptr ? grpc_polling_entity_create_from_pollset_set( args->pollset_set_alternative) : grpc_polling_entity{})) { global_stats().IncrementClientCallsCreated(); if (args->cq != nullptr) { GPR_ASSERT(args->pollset_set_alternative == nullptr && "Only one of 'cq' and 'pollset_set_alternative' should be " "non-nullptr."); } ScopedContext context(this); args->channel->channel_stack()->stats_plugin_group->AddClientCallTracers( *args->path, args->registered_method, this->context()); send_initial_metadata_ = GetContext()->MakePooled(); send_initial_metadata_->Set(HttpPathMetadata(), std::move(*args->path)); if (args->authority.has_value()) { send_initial_metadata_->Set(HttpAuthorityMetadata(), std::move(*args->authority)); } send_initial_metadata_->Set(GrpcRegisteredMethod(), reinterpret_cast(static_cast( args->registered_method))); if (auto* channelz_channel = channel()->channelz_node()) { channelz_channel->RecordCallStarted(); } if (args->send_deadline != Timestamp::InfFuture()) { UpdateDeadline(args->send_deadline); } Call* parent = Call::FromC(args->parent); if (parent != nullptr) { auto parent_status = InitParent(parent, args->propagation_mask); if (!parent_status.ok()) { CancelWithError(std::move(parent_status)); } PublishToParent(parent); } } void OrphanCall() override { MaybeUnpublishFromParent(); } ~ClientPromiseBasedCall() override { ScopedContext context(this); send_initial_metadata_.reset(); // Need to destroy the pipes under the ScopedContext above, so we // move them out here and then allow the destructors to run at // end of scope, but before context. auto c2s = std::move(client_to_server_messages_); auto s2c = std::move(server_to_client_messages_); auto sim = std::move(server_initial_metadata_); } void CancelWithError(absl::Status error) override { if (cancel_with_error_called_.exchange(true, std::memory_order_relaxed)) { return; } if (!started_.exchange(true, std::memory_order_relaxed)) { // Initial metadata not sent yet, so we can just fail the call. Spawn( "cancel_before_initial_metadata", [error = std::move(error), this]() { server_to_client_messages_.sender.Close(); auto md = ServerMetadataFromStatus(error); md->Set(GrpcCallWasCancelled(), true); Finish(std::move(md)); return Empty{}; }, [](Empty) {}); } else { Spawn( "cancel_with_error", [error = std::move(error), this]() { if (!cancel_error_.is_set()) { auto md = ServerMetadataFromStatus(error); md->Set(GrpcCallWasCancelled(), true); cancel_error_.Set(std::move(md)); } return Empty{}; }, [](Empty) {}); } } absl::string_view GetServerAuthority() const override { abort(); } bool is_trailers_only() const override { return is_trailers_only_; } grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure) override; std::string DebugTag() const override { return absl::StrFormat("CLIENT_CALL[%p]: ", this); } RefCountedPtr MakeCallSpine(CallArgs call_args) final { class WrappingCallSpine final : public CallSpineInterface { public: WrappingCallSpine(ClientPromiseBasedCall* call, ClientMetadataHandle metadata) : call_(call) { call_->InternalRef("call-spine"); SpawnInfallible( "send_client_initial_metadata", [self = Ref(), metadata = std::move(metadata)]() mutable { return Map(self->client_initial_metadata_.sender.Push( std::move(metadata)), [self](bool) { return Empty{}; }); }); } ~WrappingCallSpine() override { { ScopedContext context(call_); // Move these out and destroy before the internal unref. auto client_initial_metadata = std::move(client_initial_metadata_); auto server_trailing_metadata = std::move(server_trailing_metadata_); } call_->InternalUnref("call-spine"); } Pipe& client_initial_metadata() override { return client_initial_metadata_; } Pipe& client_to_server_messages() override { return call_->client_to_server_messages_; } Pipe& server_initial_metadata() override { return call_->server_initial_metadata_; } Pipe& server_to_client_messages() override { return call_->server_to_client_messages_; } Pipe& server_trailing_metadata() override { return server_trailing_metadata_; } Latch& cancel_latch() override { return cancel_error_; } Party& party() override { return *call_; } Arena* arena() override { return call_->arena(); } void IncrementRefCount() override { refs_.Ref(); } void Unref() override { if (refs_.Unref()) delete this; } RefCountedPtr Ref() { IncrementRefCount(); return RefCountedPtr(this); } private: RefCount refs_; ClientPromiseBasedCall* const call_; std::atomic sent_trailing_metadata_{false}; Pipe client_initial_metadata_{call_->arena()}; Pipe server_trailing_metadata_{call_->arena()}; Latch cancel_error_; }; GPR_ASSERT(call_args.server_initial_metadata == &server_initial_metadata_.sender); GPR_ASSERT(call_args.client_to_server_messages == &client_to_server_messages_.receiver); GPR_ASSERT(call_args.server_to_client_messages == &server_to_client_messages_.sender); call_args.client_initial_metadata_outstanding.Complete(true); return MakeRefCounted( this, std::move(call_args.client_initial_metadata)); } private: // Finish the call with the given status/trailing metadata. void Finish(ServerMetadataHandle trailing_metadata); // Validate that a set of ops is valid for a client call. grpc_call_error ValidateBatch(const grpc_op* ops, size_t nops) const; // Commit a valid batch of operations to be executed. void CommitBatch(const grpc_op* ops, size_t nops, const Completion& completion); // Start the underlying promise. void StartPromise(ClientMetadataHandle client_initial_metadata, const Completion& completion, Party::BulkSpawner& spawner); // Start receiving initial metadata void StartRecvInitialMetadata(grpc_metadata_array* array, const Completion& completion, Party::BulkSpawner& spawner); void StartRecvStatusOnClient( const Completion& completion, grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args, Party::BulkSpawner& spawner); // Publish status out to the application. void PublishStatus( grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args, ServerMetadataHandle trailing_metadata); // Publish server initial metadata out to the application. void PublishInitialMetadata(ServerMetadata* metadata); ClientMetadataHandle send_initial_metadata_; Pipe server_initial_metadata_{arena()}; Latch server_trailing_metadata_; Latch cancel_error_; Latch polling_entity_; Pipe client_to_server_messages_{arena()}; Pipe server_to_client_messages_{arena()}; bool is_trailers_only_ = false; bool scheduled_receive_status_ = false; bool scheduled_send_close_ = false; // True once the promise for the call is started. // This corresponds to sending initial metadata, or cancelling before doing // so. // In the latter case real world code sometimes does not sent the initial // metadata, and so gating based upon that does not work out. std::atomic started_{false}; // True after the first CancelWithError call - prevents spamming cancels from // overflowing the party. std::atomic cancel_with_error_called_{false}; // TODO(ctiller): delete when we remove the filter based API (may require some // cleanup in wrapped languages: they depend on this to hold slice refs) ServerMetadataHandle recv_initial_metadata_; ServerMetadataHandle recv_trailing_metadata_; }; void ClientPromiseBasedCall::StartPromise( ClientMetadataHandle client_initial_metadata, const Completion& completion, Party::BulkSpawner& spawner) { auto token = ClientInitialMetadataOutstandingToken::New(arena()); spawner.Spawn( "call_send_initial_metadata", token.Wait(), [this, completion = AddOpToCompletion( completion, PendingOp::kSendInitialMetadata)](bool result) mutable { if (!result) FailCompletion(completion); FinishOpOnCompletion(&completion, PendingOp::kSendInitialMetadata); }); spawner.Spawn( "client_promise", [this, client_initial_metadata = std::move(client_initial_metadata), token = std::move(token)]() mutable { return Race( cancel_error_.Wait(), Map(channel()->channel_stack()->MakeClientCallPromise(CallArgs{ std::move(client_initial_metadata), std::move(token), &polling_entity_, &server_initial_metadata_.sender, &client_to_server_messages_.receiver, &server_to_client_messages_.sender}), [this](ServerMetadataHandle trailing_metadata) { // If we're cancelled the transport doesn't get to return // stats. AcceptTransportStatsFromContext(); return trailing_metadata; })); }, [this](ServerMetadataHandle trailing_metadata) { Finish(std::move(trailing_metadata)); }); } grpc_call_error ClientPromiseBasedCall::ValidateBatch(const grpc_op* ops, size_t nops) const { BitSet<8> got_ops; for (size_t op_idx = 0; op_idx < nops; op_idx++) { const grpc_op& op = ops[op_idx]; switch (op.op) { case GRPC_OP_SEND_INITIAL_METADATA: if (!AreInitialMetadataFlagsValid(op.flags)) { return GRPC_CALL_ERROR_INVALID_FLAGS; } if (!ValidateMetadata(op.data.send_initial_metadata.count, op.data.send_initial_metadata.metadata)) { return GRPC_CALL_ERROR_INVALID_METADATA; } break; case GRPC_OP_SEND_MESSAGE: if (!AreWriteFlagsValid(op.flags)) { return GRPC_CALL_ERROR_INVALID_FLAGS; } break; case GRPC_OP_RECV_INITIAL_METADATA: case GRPC_OP_RECV_MESSAGE: if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: if (scheduled_send_close_) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; break; case GRPC_OP_RECV_STATUS_ON_CLIENT: if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; if (scheduled_receive_status_) { return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } break; case GRPC_OP_RECV_CLOSE_ON_SERVER: case GRPC_OP_SEND_STATUS_FROM_SERVER: return GRPC_CALL_ERROR_NOT_ON_CLIENT; } if (got_ops.is_set(op.op)) { return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; } got_ops.set(op.op); } return GRPC_CALL_OK; } void ClientPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops, const Completion& completion) { Party::BulkSpawner spawner(this); for (size_t op_idx = 0; op_idx < nops; op_idx++) { const grpc_op& op = ops[op_idx]; switch (op.op) { case GRPC_OP_SEND_INITIAL_METADATA: { if (started_.exchange(true, std::memory_order_relaxed)) break; CToMetadata(op.data.send_initial_metadata.metadata, op.data.send_initial_metadata.count, send_initial_metadata_.get()); PrepareOutgoingInitialMetadata(op, *send_initial_metadata_); if (send_deadline() != Timestamp::InfFuture()) { send_initial_metadata_->Set(GrpcTimeoutMetadata(), send_deadline()); } send_initial_metadata_->Set( WaitForReady(), WaitForReady::ValueType{ (op.flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) != 0, (op.flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET) != 0}); StartPromise(std::move(send_initial_metadata_), completion, spawner); } break; case GRPC_OP_RECV_INITIAL_METADATA: { StartRecvInitialMetadata( op.data.recv_initial_metadata.recv_initial_metadata, completion, spawner); } break; case GRPC_OP_RECV_STATUS_ON_CLIENT: { scheduled_receive_status_ = true; StartRecvStatusOnClient(completion, op.data.recv_status_on_client, spawner); } break; case GRPC_OP_SEND_MESSAGE: StartSendMessage(op, completion, &client_to_server_messages_.sender, spawner); break; case GRPC_OP_RECV_MESSAGE: StartRecvMessage( op, completion, [this]() { return Race(server_initial_metadata_.receiver.AwaitClosed(), server_to_client_messages_.receiver.AwaitClosed()); }, &server_to_client_messages_.receiver, false, spawner); break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: scheduled_send_close_ = true; spawner.Spawn( "send_close_from_client", [this]() { client_to_server_messages_.sender.Close(); return Empty{}; }, [this, completion = AddOpToCompletion( completion, PendingOp::kSendCloseFromClient)](Empty) mutable { FinishOpOnCompletion(&completion, PendingOp::kSendCloseFromClient); }); break; case GRPC_OP_SEND_STATUS_FROM_SERVER: case GRPC_OP_RECV_CLOSE_ON_SERVER: abort(); // unreachable } } } grpc_call_error ClientPromiseBasedCall::StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure) { if (nops == 0) { EndOpImmediately(cq(), notify_tag, is_notify_tag_closure); return GRPC_CALL_OK; } const grpc_call_error validation_result = ValidateBatch(ops, nops); if (validation_result != GRPC_CALL_OK) { return validation_result; } Completion completion = StartCompletion(notify_tag, is_notify_tag_closure, ops); CommitBatch(ops, nops, completion); FinishOpOnCompletion(&completion, PendingOp::kStartingBatch); return GRPC_CALL_OK; } void ClientPromiseBasedCall::StartRecvInitialMetadata( grpc_metadata_array* array, const Completion& completion, Party::BulkSpawner& spawner) { spawner.Spawn( "recv_initial_metadata", [this]() { return Race(server_initial_metadata_.receiver.Next(), Map(finished(), [](Empty) { return NextResult(true); })); }, [this, array, completion = AddOpToCompletion(completion, PendingOp::kReceiveInitialMetadata)]( NextResult next_metadata) mutable { server_initial_metadata_.sender.Close(); ServerMetadataHandle metadata; if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] RecvTrailingMetadata: %s", DebugTag().c_str(), next_metadata.has_value() ? next_metadata.value()->DebugString().c_str() : "null"); } if (next_metadata.has_value()) { metadata = std::move(next_metadata.value()); is_trailers_only_ = metadata->get(GrpcTrailersOnly()).value_or(false); } else { is_trailers_only_ = true; metadata = arena()->MakePooled(); } ProcessIncomingInitialMetadata(*metadata); PublishMetadataArray(metadata.get(), array, true); recv_initial_metadata_ = std::move(metadata); FinishOpOnCompletion(&completion, PendingOp::kReceiveInitialMetadata); }); } void ClientPromiseBasedCall::Finish(ServerMetadataHandle trailing_metadata) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] Finish: %s", DebugTag().c_str(), trailing_metadata->DebugString().c_str()); } ResetDeadline(); set_completed(); client_to_server_messages_.sender.CloseWithError(); client_to_server_messages_.receiver.CloseWithError(); if (trailing_metadata->get(GrpcCallWasCancelled()).value_or(false)) { server_to_client_messages_.receiver.CloseWithError(); server_initial_metadata_.receiver.CloseWithError(); } if (auto* channelz_channel = channel()->channelz_node()) { if (trailing_metadata->get(GrpcStatusMetadata()) .value_or(GRPC_STATUS_UNKNOWN) == GRPC_STATUS_OK) { channelz_channel->RecordCallSucceeded(); } else { channelz_channel->RecordCallFailed(); } } server_trailing_metadata_.Set(std::move(trailing_metadata)); } namespace { std::string MakeErrorString(const ServerMetadata* trailing_metadata) { std::string out = absl::StrCat( trailing_metadata->get(GrpcStatusFromWire()).value_or(false) ? "Error received from peer" : "Error generated by client", "grpc_status: ", grpc_status_code_to_string(trailing_metadata->get(GrpcStatusMetadata()) .value_or(GRPC_STATUS_UNKNOWN))); if (const Slice* message = trailing_metadata->get_pointer(GrpcMessageMetadata())) { absl::StrAppend(&out, "\ngrpc_message: ", message->as_string_view()); } if (auto annotations = trailing_metadata->get_pointer(GrpcStatusContext())) { absl::StrAppend(&out, "\nStatus Context:"); for (const std::string& annotation : *annotations) { absl::StrAppend(&out, "\n ", annotation); } } return out; } } // namespace void ClientPromiseBasedCall::StartRecvStatusOnClient( const Completion& completion, grpc_op::grpc_op_data::grpc_op_recv_status_on_client op_args, Party::BulkSpawner& spawner) { ForceCompletionSuccess(completion); spawner.Spawn( "recv_status_on_client", server_trailing_metadata_.Wait(), [this, op_args, completion = AddOpToCompletion(completion, PendingOp::kReceiveStatusOnClient)]( ServerMetadataHandle trailing_metadata) mutable { const grpc_status_code status = trailing_metadata->get(GrpcStatusMetadata()) .value_or(GRPC_STATUS_UNKNOWN); *op_args.status = status; Slice message_slice; if (Slice* message = trailing_metadata->get_pointer(GrpcMessageMetadata())) { message_slice = message->Ref(); } SetFinalizationStatus(status, message_slice.Ref()); *op_args.status_details = message_slice.TakeCSlice(); if (op_args.error_string != nullptr && status != GRPC_STATUS_OK) { *op_args.error_string = gpr_strdup(MakeErrorString(trailing_metadata.get()).c_str()); } PublishMetadataArray(trailing_metadata.get(), op_args.trailing_metadata, true); recv_trailing_metadata_ = std::move(trailing_metadata); FinishOpOnCompletion(&completion, PendingOp::kReceiveStatusOnClient); }); } #endif /////////////////////////////////////////////////////////////////////////////// // ServerPromiseBasedCall #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL class ServerPromiseBasedCall final : public PromiseBasedCall, public ServerCallContext { public: ServerPromiseBasedCall(Arena* arena, grpc_call_create_args* args); void OrphanCall() override {} void CancelWithError(grpc_error_handle) override; grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure) override; bool is_trailers_only() const override { Crash("is_trailers_only not implemented for server calls"); } absl::string_view GetServerAuthority() const override { const Slice* authority_metadata = client_initial_metadata_->get_pointer(HttpAuthorityMetadata()); if (authority_metadata == nullptr) return ""; return authority_metadata->as_string_view(); } // Polling order for the server promise stack: // // │ ┌───────────────────────────────────────┐ // │ │ ServerPromiseBasedCall ├──► Lifetime management // │ ├───────────────────────────────────────┤ // │ │ ConnectedChannel ├─┐ // │ ├───────────────────────────────────────┤ └► Interactions with the // │ │ ... closest to transport filter │ transport - send/recv msgs // │ ├───────────────────────────────────────┤ and metadata, call phase // │ │ ... │ ordering // │ ├───────────────────────────────────────┤ // │ │ ... closest to app filter │ ┌► Request matching, initial // │ ├───────────────────────────────────────┤ │ setup, publishing call to // │ │ Server::ChannelData::MakeCallPromise ├─┘ application // │ ├───────────────────────────────────────┤ // │ │ MakeTopOfServerCallPromise ├──► Send trailing metadata // ▼ └───────────────────────────────────────┘ // Polling & // instantiation // order std::string DebugTag() const override { return absl::StrFormat("SERVER_CALL[%p]: ", this); } ServerCallContext* server_call_context() override { return this; } const void* server_stream_data() override { return server_transport_data_; } void PublishInitialMetadata( ClientMetadataHandle metadata, grpc_metadata_array* publish_initial_metadata) override; ArenaPromise MakeTopOfServerCallPromise( CallArgs call_args, grpc_completion_queue* cq, absl::FunctionRef publish) override; private: class RecvCloseOpCancelState { public: // Request that receiver be filled in per // grpc_op_recv_close_on_server. Returns true if the request can // be fulfilled immediately. Returns false if the request will be // fulfilled later. bool ReceiveCloseOnServerOpStarted(int* receiver) { uintptr_t state = state_.load(std::memory_order_acquire); uintptr_t new_state; do { switch (state) { case kUnset: new_state = reinterpret_cast(receiver); break; case kFinishedWithFailure: *receiver = 1; return true; case kFinishedWithSuccess: *receiver = 0; return true; default: Crash("Two threads offered ReceiveCloseOnServerOpStarted"); } } while (!state_.compare_exchange_weak(state, new_state, std::memory_order_acq_rel, std::memory_order_acquire)); return false; } // Mark the call as having completed. // Returns true if this finishes a previous // RequestReceiveCloseOnServer. bool CompleteCallWithCancelledSetTo(bool cancelled) { uintptr_t state = state_.load(std::memory_order_acquire); uintptr_t new_state; bool r; do { switch (state) { case kUnset: new_state = cancelled ? kFinishedWithFailure : kFinishedWithSuccess; r = false; break; case kFinishedWithFailure: return false; case kFinishedWithSuccess: Crash("unreachable"); default: new_state = cancelled ? kFinishedWithFailure : kFinishedWithSuccess; r = true; } } while (!state_.compare_exchange_weak(state, new_state, std::memory_order_acq_rel, std::memory_order_acquire)); if (r) *reinterpret_cast(state) = cancelled ? 1 : 0; return r; } std::string ToString() const { auto state = state_.load(std::memory_order_relaxed); switch (state) { case kUnset: return "Unset"; case kFinishedWithFailure: return "FinishedWithFailure"; case kFinishedWithSuccess: return "FinishedWithSuccess"; default: return absl::StrFormat("WaitingForReceiver(%p)", reinterpret_cast(state)); } } private: static constexpr uintptr_t kUnset = 0; static constexpr uintptr_t kFinishedWithFailure = 1; static constexpr uintptr_t kFinishedWithSuccess = 2; // Holds one of kUnset, kFinishedWithFailure, or // kFinishedWithSuccess OR an int* that wants to receive the // final status. std::atomic state_{kUnset}; }; void CommitBatch(const grpc_op* ops, size_t nops, const Completion& completion); void Finish(ServerMetadataHandle result); ServerInterface* const server_; const void* const server_transport_data_; PipeSender* server_initial_metadata_ = nullptr; PipeSender* server_to_client_messages_ = nullptr; PipeReceiver* client_to_server_messages_ = nullptr; Latch send_trailing_metadata_; RecvCloseOpCancelState recv_close_op_cancel_state_; ClientMetadataHandle client_initial_metadata_; Completion recv_close_completion_; std::atomic cancelled_{false}; }; ServerPromiseBasedCall::ServerPromiseBasedCall(Arena* arena, grpc_call_create_args* args) : PromiseBasedCall(arena, 0, *args), server_(args->server), server_transport_data_(args->server_transport_data) { global_stats().IncrementServerCallsCreated(); channelz::ServerNode* channelz_node = server_->channelz_node(); if (channelz_node != nullptr) { channelz_node->RecordCallStarted(); } ScopedContext activity_context(this); // TODO(yashykt): In the future, we want to also enable stats and trace // collecting from when the call is created at the transport. The idea is that // the transport would create the call tracer and pass it in as part of the // metadata. // TODO(yijiem): OpenCensus and internal Census is still using this way to // set server call tracer. We need to refactor them to stats plugins // (including removing the client channel filters). if (args->server != nullptr && args->server->server_call_tracer_factory() != nullptr) { auto* server_call_tracer = args->server->server_call_tracer_factory()->CreateNewServerCallTracer( arena, args->server->channel_args()); if (server_call_tracer != nullptr) { // Note that we are setting both // GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE and // GRPC_CONTEXT_CALL_TRACER as a matter of convenience. In the future // promise-based world, we would just a single tracer object for each // stack (call, subchannel_call, server_call.) ContextSet(GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE, server_call_tracer, nullptr); ContextSet(GRPC_CONTEXT_CALL_TRACER, server_call_tracer, nullptr); } } args->channel->channel_stack()->stats_plugin_group->AddServerCallTracers( context()); Spawn("server_promise", channel()->channel_stack()->MakeServerCallPromise( CallArgs{nullptr, ClientInitialMetadataOutstandingToken::Empty(), nullptr, nullptr, nullptr, nullptr}), [this](ServerMetadataHandle result) { Finish(std::move(result)); }); } void ServerPromiseBasedCall::Finish(ServerMetadataHandle result) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] Finish: recv_close_state:%s result:%s", DebugTag().c_str(), recv_close_op_cancel_state_.ToString().c_str(), result->DebugString().c_str()); } const auto status = result->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN); channelz::ServerNode* channelz_node = server_->channelz_node(); if (channelz_node != nullptr) { if (status == GRPC_STATUS_OK) { channelz_node->RecordCallSucceeded(); } else { channelz_node->RecordCallFailed(); } } bool was_cancelled = result->get(GrpcCallWasCancelled()).value_or(true); if (recv_close_op_cancel_state_.CompleteCallWithCancelledSetTo( was_cancelled)) { FinishOpOnCompletion(&recv_close_completion_, PendingOp::kReceiveCloseOnServer); } if (was_cancelled) set_failed_before_recv_message(); if (server_initial_metadata_ != nullptr) { server_initial_metadata_->Close(); } Slice message_slice; if (Slice* message = result->get_pointer(GrpcMessageMetadata())) { message_slice = message->Ref(); } AcceptTransportStatsFromContext(); SetFinalizationStatus(status, std::move(message_slice)); set_completed(); ResetDeadline(); PropagateCancellationToChildren(); } grpc_call_error ValidateServerBatch(const grpc_op* ops, size_t nops) { BitSet<8> got_ops; for (size_t op_idx = 0; op_idx < nops; op_idx++) { const grpc_op& op = ops[op_idx]; switch (op.op) { case GRPC_OP_SEND_INITIAL_METADATA: if (!AreInitialMetadataFlagsValid(op.flags)) { return GRPC_CALL_ERROR_INVALID_FLAGS; } if (!ValidateMetadata(op.data.send_initial_metadata.count, op.data.send_initial_metadata.metadata)) { return GRPC_CALL_ERROR_INVALID_METADATA; } break; case GRPC_OP_SEND_MESSAGE: if (!AreWriteFlagsValid(op.flags)) { return GRPC_CALL_ERROR_INVALID_FLAGS; } break; case GRPC_OP_SEND_STATUS_FROM_SERVER: if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; if (!ValidateMetadata( op.data.send_status_from_server.trailing_metadata_count, op.data.send_status_from_server.trailing_metadata)) { return GRPC_CALL_ERROR_INVALID_METADATA; } break; case GRPC_OP_RECV_MESSAGE: case GRPC_OP_RECV_CLOSE_ON_SERVER: if (op.flags != 0) return GRPC_CALL_ERROR_INVALID_FLAGS; break; case GRPC_OP_RECV_INITIAL_METADATA: case GRPC_OP_SEND_CLOSE_FROM_CLIENT: case GRPC_OP_RECV_STATUS_ON_CLIENT: return GRPC_CALL_ERROR_NOT_ON_SERVER; } if (got_ops.is_set(op.op)) return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; got_ops.set(op.op); } return GRPC_CALL_OK; } void ServerPromiseBasedCall::CommitBatch(const grpc_op* ops, size_t nops, const Completion& completion) { Party::BulkSpawner spawner(this); for (size_t op_idx = 0; op_idx < nops; op_idx++) { const grpc_op& op = ops[op_idx]; switch (op.op) { case GRPC_OP_SEND_INITIAL_METADATA: { auto metadata = arena()->MakePooled(); PrepareOutgoingInitialMetadata(op, *metadata); CToMetadata(op.data.send_initial_metadata.metadata, op.data.send_initial_metadata.count, metadata.get()); if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] Send initial metadata", DebugTag().c_str()); } QueueSend(); spawner.Spawn( "call_send_initial_metadata", [this, metadata = std::move(metadata)]() mutable { EnactSend(); return server_initial_metadata_->Push(std::move(metadata)); }, [this, completion = AddOpToCompletion( completion, PendingOp::kSendInitialMetadata)](bool r) mutable { if (!r) { set_failed_before_recv_message(); FailCompletion(completion); } FinishOpOnCompletion(&completion, PendingOp::kSendInitialMetadata); }); } break; case GRPC_OP_SEND_MESSAGE: StartSendMessage(op, completion, server_to_client_messages_, spawner); break; case GRPC_OP_RECV_MESSAGE: if (cancelled_.load(std::memory_order_relaxed)) { set_failed_before_recv_message(); FailCompletion(completion); break; } StartRecvMessage( op, completion, []() { return []() { return Empty{}; }; }, client_to_server_messages_, true, spawner); break; case GRPC_OP_SEND_STATUS_FROM_SERVER: { auto metadata = arena()->MakePooled(); CToMetadata(op.data.send_status_from_server.trailing_metadata, op.data.send_status_from_server.trailing_metadata_count, metadata.get()); metadata->Set(GrpcStatusMetadata(), op.data.send_status_from_server.status); if (auto* details = op.data.send_status_from_server.status_details) { // TODO(ctiller): this should not be a copy, but we have callers that // allocate and pass in a slice created with // grpc_slice_from_static_string and then delete the string after // passing it in, which shouldn't be a supported API. metadata->Set(GrpcMessageMetadata(), Slice(grpc_slice_copy(*details))); } spawner.Spawn( "call_send_status_from_server", [this, metadata = std::move(metadata)]() mutable { bool r = true; if (send_trailing_metadata_.is_set()) { r = false; } else { send_trailing_metadata_.Set(std::move(metadata)); } return Map(WaitForSendingStarted(), [this, r](Empty) { server_initial_metadata_->Close(); server_to_client_messages_->Close(); return r; }); }, [this, completion = AddOpToCompletion( completion, PendingOp::kSendStatusFromServer)]( bool ok) mutable { if (!ok) { set_failed_before_recv_message(); FailCompletion(completion); } FinishOpOnCompletion(&completion, PendingOp::kSendStatusFromServer); }); } break; case GRPC_OP_RECV_CLOSE_ON_SERVER: if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] StartBatch: RecvClose %s", DebugTag().c_str(), recv_close_op_cancel_state_.ToString().c_str()); } ForceCompletionSuccess(completion); recv_close_completion_ = AddOpToCompletion(completion, PendingOp::kReceiveCloseOnServer); if (recv_close_op_cancel_state_.ReceiveCloseOnServerOpStarted( op.data.recv_close_on_server.cancelled)) { FinishOpOnCompletion(&recv_close_completion_, PendingOp::kReceiveCloseOnServer); } break; case GRPC_OP_RECV_STATUS_ON_CLIENT: case GRPC_OP_SEND_CLOSE_FROM_CLIENT: case GRPC_OP_RECV_INITIAL_METADATA: abort(); // unreachable } } } grpc_call_error ServerPromiseBasedCall::StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure) { if (nops == 0) { EndOpImmediately(cq(), notify_tag, is_notify_tag_closure); return GRPC_CALL_OK; } const grpc_call_error validation_result = ValidateServerBatch(ops, nops); if (validation_result != GRPC_CALL_OK) { return validation_result; } Completion completion = StartCompletion(notify_tag, is_notify_tag_closure, ops); CommitBatch(ops, nops, completion); FinishOpOnCompletion(&completion, PendingOp::kStartingBatch); return GRPC_CALL_OK; } void ServerPromiseBasedCall::CancelWithError(absl::Status error) { cancelled_.store(true, std::memory_order_relaxed); Spawn( "cancel_with_error", [this, error = std::move(error)]() { if (!send_trailing_metadata_.is_set()) { auto md = ServerMetadataFromStatus(error); md->Set(GrpcCallWasCancelled(), true); send_trailing_metadata_.Set(std::move(md)); } if (server_to_client_messages_ != nullptr) { server_to_client_messages_->Close(); } if (server_initial_metadata_ != nullptr) { server_initial_metadata_->Close(); } return Empty{}; }, [](Empty) {}); } #endif #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL void ServerPromiseBasedCall::PublishInitialMetadata( ClientMetadataHandle metadata, grpc_metadata_array* publish_initial_metadata) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] PublishInitialMetadata: %s", DebugTag().c_str(), metadata->DebugString().c_str()); } PublishMetadataArray(metadata.get(), publish_initial_metadata, false); client_initial_metadata_ = std::move(metadata); } ArenaPromise ServerPromiseBasedCall::MakeTopOfServerCallPromise( CallArgs call_args, grpc_completion_queue* cq, absl::FunctionRef publish) { SetCompletionQueue(cq); call_args.polling_entity->Set( grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq))); server_to_client_messages_ = call_args.server_to_client_messages; client_to_server_messages_ = call_args.client_to_server_messages; server_initial_metadata_ = call_args.server_initial_metadata; set_send_deadline(deadline()); ProcessIncomingInitialMetadata(*client_initial_metadata_); ExternalRef(); publish(c_ptr()); return Seq(server_to_client_messages_->AwaitClosed(), send_trailing_metadata_.Wait()); } /////////////////////////////////////////////////////////////////////////////// // CallSpine based Server Call class ServerCallSpine final : public CallSpineInterface, public ServerCallContext, public BasicPromiseBasedCall { public: ServerCallSpine(ServerInterface* server, Channel* channel, Arena* arena); // CallSpineInterface Pipe& client_initial_metadata() override { return client_initial_metadata_; } Pipe& server_initial_metadata() override { return server_initial_metadata_; } Pipe& client_to_server_messages() override { return client_to_server_messages_; } Pipe& server_to_client_messages() override { return server_to_client_messages_; } Pipe& server_trailing_metadata() override { return server_trailing_metadata_; } Latch& cancel_latch() override { return cancel_latch_; } Party& party() override { return *this; } Arena* arena() override { return BasicPromiseBasedCall::arena(); } void IncrementRefCount() override { InternalRef("CallSpine"); } void Unref() override { InternalUnref("CallSpine"); } // PromiseBasedCall void OrphanCall() override { ResetDeadline(); CancelWithError(absl::CancelledError()); } void CancelWithError(grpc_error_handle error) override { SpawnInfallible("CancelWithError", [this, error = std::move(error)] { std::ignore = Cancel(ServerMetadataFromStatus(error)); return Empty{}; }); } bool is_trailers_only() const override { Crash("is_trailers_only not implemented for server calls"); } absl::string_view GetServerAuthority() const override { Crash("unimplemented"); } grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure) override; bool Completed() final { Crash("unimplemented"); } bool failed_before_recv_message() const final { Crash("unimplemented"); } ServerCallContext* server_call_context() override { return this; } const void* server_stream_data() override { Crash("unimplemented"); } void PublishInitialMetadata( ClientMetadataHandle metadata, grpc_metadata_array* publish_initial_metadata) override; ArenaPromise MakeTopOfServerCallPromise( CallArgs, grpc_completion_queue*, absl::FunctionRef) override { Crash("unimplemented"); } bool RunParty() override { ScopedContext ctx(this); return Party::RunParty(); } private: void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure); StatusFlag FinishRecvMessage(NextResult result); std::string DebugTag() const override { return absl::StrFormat("SERVER_CALL_SPINE[%p]: ", this); } // Initial metadata from client to server Pipe client_initial_metadata_; // Initial metadata from server to client Pipe server_initial_metadata_; // Messages travelling from the application to the transport. Pipe client_to_server_messages_; // Messages travelling from the transport to the application. Pipe server_to_client_messages_; // Trailing metadata from server to client Pipe server_trailing_metadata_; // Latch that can be set to terminate the call Latch cancel_latch_; grpc_byte_buffer** recv_message_ = nullptr; ClientMetadataHandle client_initial_metadata_stored_; }; ServerCallSpine::ServerCallSpine(ServerInterface* server, Channel* channel, Arena* arena) : BasicPromiseBasedCall(arena, 0, 1, [channel, server]() -> grpc_call_create_args { grpc_call_create_args args; args.channel = channel->Ref(); args.server = server; args.parent = nullptr; args.propagation_mask = 0; args.cq = nullptr; args.pollset_set_alternative = nullptr; args.server_transport_data = &args; // Arbitrary non-null pointer args.send_deadline = Timestamp::InfFuture(); return args; }()), client_initial_metadata_(arena), server_initial_metadata_(arena), client_to_server_messages_(arena), server_to_client_messages_(arena), server_trailing_metadata_(arena) { global_stats().IncrementServerCallsCreated(); ScopedContext ctx(this); channel->channel_stack()->InitServerCallSpine(this); } void ServerCallSpine::PublishInitialMetadata( ClientMetadataHandle metadata, grpc_metadata_array* publish_initial_metadata) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] PublishInitialMetadata: %s", DebugTag().c_str(), metadata->DebugString().c_str()); } PublishMetadataArray(metadata.get(), publish_initial_metadata, false); client_initial_metadata_stored_ = std::move(metadata); } grpc_call_error ServerCallSpine::StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure) { if (nops == 0) { EndOpImmediately(cq(), notify_tag, is_notify_tag_closure); return GRPC_CALL_OK; } const grpc_call_error validation_result = ValidateServerBatch(ops, nops); if (validation_result != GRPC_CALL_OK) { return validation_result; } CommitBatch(ops, nops, notify_tag, is_notify_tag_closure); return GRPC_CALL_OK; } namespace { template class MaybeOpImpl { public: using SetupResult = decltype(std::declval()(grpc_op())); using PromiseFactory = promise_detail::OncePromiseFactory; using Promise = typename PromiseFactory::Promise; struct Dismissed {}; using State = absl::variant; // op_ is garbage but shouldn't be uninitialized MaybeOpImpl() : state_(Dismissed{}), op_(GRPC_OP_RECV_STATUS_ON_CLIENT) {} MaybeOpImpl(SetupResult result, grpc_op_type op) : state_(PromiseFactory(std::move(result))), op_(op) {} MaybeOpImpl(const MaybeOpImpl&) = delete; MaybeOpImpl& operator=(const MaybeOpImpl&) = delete; MaybeOpImpl(MaybeOpImpl&& other) noexcept : state_(MoveState(other.state_)), op_(other.op_) {} MaybeOpImpl& operator=(MaybeOpImpl&& other) noexcept { op_ = other.op_; if (absl::holds_alternative(state_)) { state_.template emplace(); return *this; } // Can't move after first poll => Promise is not an option state_.template emplace( std::move(absl::get(other.state_))); return *this; } Poll operator()() { if (absl::holds_alternative(state_)) return Success{}; if (absl::holds_alternative(state_)) { auto& factory = absl::get(state_); auto promise = factory.Make(); state_.template emplace(std::move(promise)); } if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%sBeginPoll %s", Activity::current()->DebugTag().c_str(), OpName(op_).c_str()); } auto& promise = absl::get(state_); auto r = poll_cast(promise()); if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%sEndPoll %s --> %s", Activity::current()->DebugTag().c_str(), OpName(op_).c_str(), r.pending() ? "PENDING" : (r.value().ok() ? "OK" : "FAILURE")); } return r; } private: GPR_NO_UNIQUE_ADDRESS State state_; GPR_NO_UNIQUE_ADDRESS grpc_op_type op_; static std::string OpName(grpc_op_type op) { switch (op) { case GRPC_OP_SEND_INITIAL_METADATA: return "SendInitialMetadata"; case GRPC_OP_SEND_MESSAGE: return "SendMessage"; case GRPC_OP_SEND_STATUS_FROM_SERVER: return "SendStatusFromServer"; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: return "SendCloseFromClient"; case GRPC_OP_RECV_MESSAGE: return "RecvMessage"; case GRPC_OP_RECV_CLOSE_ON_SERVER: return "RecvCloseOnServer"; case GRPC_OP_RECV_INITIAL_METADATA: return "RecvInitialMetadata"; case GRPC_OP_RECV_STATUS_ON_CLIENT: return "RecvStatusOnClient"; } return absl::StrCat("UnknownOp(", op, ")"); } static State MoveState(State& state) { if (absl::holds_alternative(state)) return Dismissed{}; // Can't move after first poll => Promise is not an option return std::move(absl::get(state)); } }; // MaybeOp captures a fairly complicated dance we need to do for the batch API. // We first check if an op is included or not, and if it is, we run the setup // function in the context of the API call (NOT in the call party). // This setup function returns a promise factory which we'll then run *in* the // party to do initial setup, and have it return the promise that we'll // ultimately poll on til completion. // Once we express our surface API in terms of core internal types this whole // dance will go away. template auto MaybeOp(const grpc_op* ops, uint8_t idx, SetupFn setup) { if (idx == 255) { return MaybeOpImpl(); } else { return MaybeOpImpl(setup(ops[idx]), ops[idx].op); } } template class PollBatchLogger { public: PollBatchLogger(void* tag, F f) : tag_(tag), f_(std::move(f)) {} auto operator()() { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "Poll batch %p", tag_); } auto r = f_(); if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "Poll batch %p --> %s", tag_, ResultString(r).c_str()); } return r; } private: template static std::string ResultString(Poll r) { if (r.pending()) return "PENDING"; return ResultString(r.value()); } static std::string ResultString(Empty) { return "DONE"; } void* tag_; F f_; }; template PollBatchLogger LogPollBatch(void* tag, F f) { return PollBatchLogger(tag, std::move(f)); } } // namespace StatusFlag ServerCallSpine::FinishRecvMessage( NextResult result) { if (result.has_value()) { MessageHandle& message = *result; NoteLastMessageFlags(message->flags()); if ((message->flags() & GRPC_WRITE_INTERNAL_COMPRESS) && (incoming_compression_algorithm() != GRPC_COMPRESS_NONE)) { *recv_message_ = grpc_raw_compressed_byte_buffer_create( nullptr, 0, incoming_compression_algorithm()); } else { *recv_message_ = grpc_raw_byte_buffer_create(nullptr, 0); } grpc_slice_buffer_move_into(message->payload()->c_slice_buffer(), &(*recv_message_)->data.raw.slice_buffer); if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] RecvMessage: outstanding_recv " "finishes: received %" PRIdPTR " byte message", DebugTag().c_str(), (*recv_message_)->data.raw.slice_buffer.length); } recv_message_ = nullptr; return Success{}; } if (result.cancelled()) { if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] RecvMessage: outstanding_recv " "finishes: received end-of-stream with error", DebugTag().c_str()); } *recv_message_ = nullptr; recv_message_ = nullptr; return Failure{}; } if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] RecvMessage: outstanding_recv " "finishes: received end-of-stream", DebugTag().c_str()); } *recv_message_ = nullptr; recv_message_ = nullptr; return Success{}; } void ServerCallSpine::CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag, bool is_notify_tag_closure) { std::array got_ops{255, 255, 255, 255, 255, 255, 255, 255}; for (size_t op_idx = 0; op_idx < nops; op_idx++) { const grpc_op& op = ops[op_idx]; got_ops[op.op] = op_idx; } if (!is_notify_tag_closure) grpc_cq_begin_op(cq(), notify_tag); auto send_initial_metadata = MaybeOp( ops, got_ops[GRPC_OP_SEND_INITIAL_METADATA], [this](const grpc_op& op) { auto metadata = arena()->MakePooled(); PrepareOutgoingInitialMetadata(op, *metadata); CToMetadata(op.data.send_initial_metadata.metadata, op.data.send_initial_metadata.count, metadata.get()); if (grpc_call_trace.enabled()) { gpr_log(GPR_INFO, "%s[call] Send initial metadata", DebugTag().c_str()); } return [this, metadata = std::move(metadata)]() mutable { return Map(server_initial_metadata_.sender.Push(std::move(metadata)), [this](bool r) { server_initial_metadata_.sender.Close(); return StatusFlag(r); }); }; }); auto send_message = MaybeOp(ops, got_ops[GRPC_OP_SEND_MESSAGE], [this](const grpc_op& op) { SliceBuffer send; grpc_slice_buffer_swap( &op.data.send_message.send_message->data.raw.slice_buffer, send.c_slice_buffer()); auto msg = arena()->MakePooled(std::move(send), op.flags); return [this, msg = std::move(msg)]() mutable { return Map(server_to_client_messages_.sender.Push(std::move(msg)), [](bool r) { return StatusFlag(r); }); }; }); auto send_trailing_metadata = MaybeOp( ops, got_ops[GRPC_OP_SEND_STATUS_FROM_SERVER], [this](const grpc_op& op) { auto metadata = arena()->MakePooled(); CToMetadata(op.data.send_status_from_server.trailing_metadata, op.data.send_status_from_server.trailing_metadata_count, metadata.get()); metadata->Set(GrpcStatusMetadata(), op.data.send_status_from_server.status); if (auto* details = op.data.send_status_from_server.status_details) { // TODO(ctiller): this should not be a copy, but we have // callers that allocate and pass in a slice created with // grpc_slice_from_static_string and then delete the string // after passing it in, which shouldn't be a supported API. metadata->Set(GrpcMessageMetadata(), Slice(grpc_slice_copy(*details))); } return [this, metadata = std::move(metadata)]() mutable { server_to_client_messages_.sender.Close(); return Map(server_trailing_metadata_.sender.Push(std::move(metadata)), [](bool r) { return StatusFlag(r); }); }; }); auto recv_message = MaybeOp(ops, got_ops[GRPC_OP_RECV_MESSAGE], [this](const grpc_op& op) { GPR_ASSERT(recv_message_ == nullptr); recv_message_ = op.data.recv_message.recv_message; return [this]() mutable { return Map(client_to_server_messages_.receiver.Next(), [this](NextResult msg) { return FinishRecvMessage(std::move(msg)); }); }; }); auto primary_ops = AllOk( std::move(send_initial_metadata), std::move(send_message), std::move(send_trailing_metadata), std::move(recv_message)); if (got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER] != 255) { auto recv_trailing_metadata = MaybeOp( ops, got_ops[GRPC_OP_RECV_CLOSE_ON_SERVER], [this](const grpc_op& op) { return [this, cancelled = op.data.recv_close_on_server.cancelled]() { return Map(server_trailing_metadata_.receiver.AwaitClosed(), [cancelled, this](bool result) -> Success { ResetDeadline(); *cancelled = result ? 1 : 0; return Success{}; }); }; }); SpawnInfallible( "final-batch", [primary_ops = std::move(primary_ops), recv_trailing_metadata = std::move(recv_trailing_metadata), is_notify_tag_closure, notify_tag, this]() mutable { return LogPollBatch( notify_tag, Seq(std::move(primary_ops), std::move(recv_trailing_metadata), [is_notify_tag_closure, notify_tag, this](StatusFlag) { return WaitForCqEndOp(is_notify_tag_closure, notify_tag, absl::OkStatus(), cq()); })); }); } else { SpawnInfallible("batch", [primary_ops = std::move(primary_ops), is_notify_tag_closure, notify_tag, this]() mutable { return LogPollBatch( notify_tag, Seq(std::move(primary_ops), [is_notify_tag_closure, notify_tag, this](StatusFlag r) { return WaitForCqEndOp(is_notify_tag_closure, notify_tag, StatusCast(r), cq()); })); }); } } RefCountedPtr MakeServerCall(ServerInterface* server, Channel* channel, Arena* arena) { return RefCountedPtr( arena->New(server, channel, arena)); } #else RefCountedPtr MakeServerCall(ServerInterface*, Channel*, Arena*) { Crash("not implemented"); } #endif } // namespace grpc_core /////////////////////////////////////////////////////////////////////////////// // C-based API void* grpc_call_arena_alloc(grpc_call* call, size_t size) { grpc_core::ExecCtx exec_ctx; return grpc_core::Call::FromC(call)->arena()->Alloc(size); } size_t grpc_call_get_initial_size_estimate() { return grpc_core::FilterStackCall::InitialSizeEstimate(); } grpc_error_handle grpc_call_create(grpc_call_create_args* args, grpc_call** out_call) { #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_CLIENT_CALL if (grpc_core::IsPromiseBasedClientCallEnabled() && args->server_transport_data == nullptr && args->channel->is_promising()) { return grpc_core::MakePromiseBasedCall( args, out_call); } #endif #ifdef GRPC_EXPERIMENT_IS_INCLUDED_PROMISE_BASED_SERVER_CALL if (grpc_core::IsPromiseBasedServerCallEnabled() && args->server_transport_data != nullptr && args->channel->is_promising()) { return grpc_core::MakePromiseBasedCall( args, out_call); } #endif return grpc_core::FilterStackCall::Create(args, out_call); } void grpc_call_set_completion_queue(grpc_call* call, grpc_completion_queue* cq) { grpc_core::Call::FromC(call)->SetCompletionQueue(cq); } void grpc_call_ref(grpc_call* c) { grpc_core::Call::FromC(c)->ExternalRef(); } void grpc_call_unref(grpc_call* c) { grpc_core::ExecCtx exec_ctx; grpc_core::Call::FromC(c)->ExternalUnref(); } char* grpc_call_get_peer(grpc_call* call) { return grpc_core::Call::FromC(call)->GetPeer(); } grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element) { return grpc_core::FilterStackCall::FromTopElem(surface_element)->c_ptr(); } grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) { GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved)); GPR_ASSERT(reserved == nullptr); if (call == nullptr) { return GRPC_CALL_ERROR; } grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError()); return GRPC_CALL_OK; } grpc_call_error grpc_call_cancel_with_status(grpc_call* c, grpc_status_code status, const char* description, void* reserved) { GRPC_API_TRACE( "grpc_call_cancel_with_status(" "c=%p, status=%d, description=%s, reserved=%p)", 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == nullptr); if (c == nullptr) { return GRPC_CALL_ERROR; } grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; grpc_core::Call::FromC(c)->CancelWithStatus(status, description); return GRPC_CALL_OK; } void grpc_call_cancel_internal(grpc_call* call) { grpc_core::Call::FromC(call)->CancelWithError(absl::CancelledError()); } grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( grpc_call* call) { return grpc_core::Call::FromC(call)->test_only_compression_algorithm(); } uint32_t grpc_call_test_only_get_message_flags(grpc_call* call) { return grpc_core::Call::FromC(call)->test_only_message_flags(); } uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call* call) { return grpc_core::Call::FromC(call) ->encodings_accepted_by_peer() .ToLegacyBitmask(); } grpc_core::Arena* grpc_call_get_arena(grpc_call* call) { return grpc_core::Call::FromC(call)->arena(); } grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) { return grpc_core::Call::FromC(call)->call_stack(); } grpc_call_error grpc_call_start_batch(grpc_call* call, const grpc_op* ops, size_t nops, void* tag, void* reserved) { GRPC_API_TRACE( "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, " "reserved=%p)", 5, (call, ops, (unsigned long)nops, tag, reserved)); if (reserved != nullptr || call == nullptr) { return GRPC_CALL_ERROR; } else { grpc_core::ApplicationCallbackExecCtx callback_exec_ctx; grpc_core::ExecCtx exec_ctx; return grpc_core::Call::FromC(call)->StartBatch(ops, nops, tag, false); } } grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call, const grpc_op* ops, size_t nops, grpc_closure* closure) { return grpc_core::Call::FromC(call)->StartBatch(ops, nops, closure, true); } void grpc_call_context_set(grpc_call* call, grpc_context_index elem, void* value, void (*destroy)(void* value)) { return grpc_core::Call::FromC(call)->ContextSet(elem, value, destroy); } void* grpc_call_context_get(grpc_call* call, grpc_context_index elem) { return grpc_core::Call::FromC(call)->ContextGet(elem); } uint8_t grpc_call_is_client(grpc_call* call) { return grpc_core::Call::FromC(call)->is_client(); } grpc_compression_algorithm grpc_call_compression_for_level( grpc_call* call, grpc_compression_level level) { return grpc_core::Call::FromC(call) ->encodings_accepted_by_peer() .CompressionAlgorithmForLevel(level); } bool grpc_call_is_trailers_only(const grpc_call* call) { return grpc_core::Call::FromC(call)->is_trailers_only(); } int grpc_call_failed_before_recv_message(const grpc_call* c) { return grpc_core::Call::FromC(c)->failed_before_recv_message(); } absl::string_view grpc_call_server_authority(const grpc_call* call) { return grpc_core::Call::FromC(call)->GetServerAuthority(); } const char* grpc_call_error_to_string(grpc_call_error error) { switch (error) { case GRPC_CALL_ERROR: return "GRPC_CALL_ERROR"; case GRPC_CALL_ERROR_ALREADY_ACCEPTED: return "GRPC_CALL_ERROR_ALREADY_ACCEPTED"; case GRPC_CALL_ERROR_ALREADY_FINISHED: return "GRPC_CALL_ERROR_ALREADY_FINISHED"; case GRPC_CALL_ERROR_ALREADY_INVOKED: return "GRPC_CALL_ERROR_ALREADY_INVOKED"; case GRPC_CALL_ERROR_BATCH_TOO_BIG: return "GRPC_CALL_ERROR_BATCH_TOO_BIG"; case GRPC_CALL_ERROR_INVALID_FLAGS: return "GRPC_CALL_ERROR_INVALID_FLAGS"; case GRPC_CALL_ERROR_INVALID_MESSAGE: return "GRPC_CALL_ERROR_INVALID_MESSAGE"; case GRPC_CALL_ERROR_INVALID_METADATA: return "GRPC_CALL_ERROR_INVALID_METADATA"; case GRPC_CALL_ERROR_NOT_INVOKED: return "GRPC_CALL_ERROR_NOT_INVOKED"; case GRPC_CALL_ERROR_NOT_ON_CLIENT: return "GRPC_CALL_ERROR_NOT_ON_CLIENT"; case GRPC_CALL_ERROR_NOT_ON_SERVER: return "GRPC_CALL_ERROR_NOT_ON_SERVER"; case GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE: return "GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE"; case GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH: return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH"; case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS: return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS"; case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN: return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN"; case GRPC_CALL_OK: return "GRPC_CALL_OK"; } GPR_UNREACHABLE_CODE(return "GRPC_CALL_ERROR_UNKNOW"); } void grpc_call_run_in_event_engine(const grpc_call* call, absl::AnyInvocable cb) { grpc_core::Call::FromC(call)->event_engine()->Run(std::move(cb)); }