1 // Copyright 2024 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_SURFACE_CLIENT_CALL_H 16 #define GRPC_SRC_CORE_LIB_SURFACE_CLIENT_CALL_H 17 18 #include <grpc/byte_buffer.h> 19 #include <grpc/compression.h> 20 #include <grpc/event_engine/event_engine.h> 21 #include <grpc/grpc.h> 22 #include <grpc/impl/call.h> 23 #include <grpc/impl/propagation_bits.h> 24 #include <grpc/slice.h> 25 #include <grpc/slice_buffer.h> 26 #include <grpc/status.h> 27 #include <grpc/support/alloc.h> 28 #include <grpc/support/atm.h> 29 #include <grpc/support/port_platform.h> 30 #include <grpc/support/string_util.h> 31 #include <inttypes.h> 32 #include <limits.h> 33 #include <stdlib.h> 34 #include <string.h> 35 36 #include <atomic> 37 #include <cstdint> 38 #include <string> 39 40 #include "absl/status/status.h" 41 #include "absl/strings/str_format.h" 42 #include "absl/strings/string_view.h" 43 #include "src/core/lib/promise/status_flag.h" 44 #include "src/core/lib/resource_quota/arena.h" 45 #include "src/core/lib/surface/call.h" 46 #include "src/core/lib/surface/call_utils.h" 47 #include "src/core/lib/transport/metadata.h" 48 #include "src/core/util/crash.h" 49 #include "src/core/util/ref_counted.h" 50 #include "src/core/util/ref_counted_ptr.h" 51 #include "src/core/util/single_set_ptr.h" 52 53 namespace grpc_core { 54 55 class ClientCall final 56 : public Call, 57 public DualRefCounted<ClientCall, NonPolymorphicRefCount, 58 UnrefCallDestroy> { 59 public: 60 ClientCall(grpc_call* parent_call, uint32_t propagation_mask, 61 grpc_completion_queue* cq, Slice path, 62 absl::optional<Slice> authority, bool registered_method, 63 Timestamp deadline, grpc_compression_options compression_options, 64 RefCountedPtr<Arena> arena, 65 RefCountedPtr<UnstartedCallDestination> destination); 66 67 void CancelWithError(grpc_error_handle error) override; is_trailers_only()68 bool is_trailers_only() const override { return is_trailers_only_; } GetServerAuthority()69 absl::string_view GetServerAuthority() const override { 70 Crash("unimplemented"); 71 } 72 grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, 73 bool is_notify_tag_closure) override; 74 ExternalRef()75 void ExternalRef() override { Ref().release(); } ExternalUnref()76 void ExternalUnref() override { Unref(); } InternalRef(const char *)77 void InternalRef(const char*) override { WeakRef().release(); } InternalUnref(const char *)78 void InternalUnref(const char*) override { WeakUnref(); } 79 Orphaned()80 void Orphaned() override { 81 if (!saw_trailing_metadata_.load(std::memory_order_relaxed)) { 82 CancelWithError(absl::CancelledError()); 83 } 84 } 85 SetCompletionQueue(grpc_completion_queue *)86 void SetCompletionQueue(grpc_completion_queue*) override { 87 Crash("unimplemented"); 88 } 89 compression_options()90 grpc_compression_options compression_options() override { 91 return compression_options_; 92 } 93 call_stack()94 grpc_call_stack* call_stack() override { return nullptr; } 95 96 char* GetPeer() override; 97 Completed()98 bool Completed() final { Crash("unimplemented"); } failed_before_recv_message()99 bool failed_before_recv_message() const final { 100 return started_call_initiator_.WasCancelledPushed(); 101 } 102 incoming_compression_algorithm()103 grpc_compression_algorithm incoming_compression_algorithm() override { 104 return message_receiver_.incoming_compression_algorithm(); 105 } 106 SetIncomingCompressionAlgorithm(grpc_compression_algorithm algorithm)107 void SetIncomingCompressionAlgorithm( 108 grpc_compression_algorithm algorithm) override { 109 message_receiver_.SetIncomingCompressionAlgorithm(algorithm); 110 } 111 test_only_message_flags()112 uint32_t test_only_message_flags() override { 113 return message_receiver_.last_message_flags(); 114 } 115 Destroy()116 void Destroy() { 117 auto arena = this->arena()->Ref(); 118 this->~ClientCall(); 119 } 120 121 private: 122 struct UnorderedStart { 123 absl::AnyInvocable<void()> start_pending_batch; 124 UnorderedStart* next; 125 }; 126 127 void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag, 128 bool is_notify_tag_closure); 129 template <typename Batch> 130 void ScheduleCommittedBatch(Batch batch); 131 Party::WakeupHold StartCall(const grpc_op& send_initial_metadata_op); 132 // Attempt to start the call and send handler down the stack; returns true if 133 // state was updated, false otherwise (with cur_state updated to the new 134 // current state). 135 // If this function returns false, it's guaranteed that handler is not 136 // touched. 137 // Should be called repeatedly until it returns true. 138 bool StartCallMaybeUpdateState(uintptr_t& cur_state, 139 UnstartedCallHandler& handler); 140 DebugTag()141 std::string DebugTag() { return absl::StrFormat("CLIENT_CALL[%p]: ", this); } 142 void OnReceivedStatus(ServerMetadataHandle server_trailing_metadata, 143 grpc_status_code* out_status, 144 grpc_slice* out_status_details, 145 const char** out_error_string, 146 grpc_metadata_array* out_trailing_metadata); 147 148 // call_state_ is one of: 149 // 1. kUnstarted - call has not yet been started 150 // 2. pointer to an UnorderedStart - call has ops started, but no send initial 151 // metadata yet 152 // 3. kStarted - call has been started and call_initiator_ is ready 153 // 4. kCancelled - call was cancelled before starting 154 // In cases (1) and (2) send_initial_metadata_ is used to store the initial 155 // but unsent metadata. 156 // In case (3) started_call_initiator_ is used to store the call initiator. 157 // In case (4) no other state is used. 158 enum CallState : uintptr_t { 159 kUnstarted = 0, 160 kStarted = 1, 161 kCancelled = 2, 162 }; 163 std::atomic<uintptr_t> call_state_{kUnstarted}; 164 ClientMetadataHandle send_initial_metadata_{ 165 Arena::MakePooledForOverwrite<ClientMetadata>()}; 166 CallInitiator started_call_initiator_; 167 // Status passed to CancelWithError; 168 // if call_state_ == kCancelled then this is the authoritative status, 169 // otherwise the server trailing metadata from started_call_initiator_ is 170 // authoritative. 171 SingleSetPtr<absl::Status> cancel_status_; 172 MessageReceiver message_receiver_; 173 grpc_completion_queue* const cq_; 174 const RefCountedPtr<UnstartedCallDestination> call_destination_; 175 const grpc_compression_options compression_options_; 176 ServerMetadataHandle received_initial_metadata_; 177 ServerMetadataHandle received_trailing_metadata_; 178 bool is_trailers_only_; 179 std::atomic<bool> saw_trailing_metadata_{false}; 180 }; 181 182 grpc_call* MakeClientCall(grpc_call* parent_call, uint32_t propagation_mask, 183 grpc_completion_queue* cq, Slice path, 184 absl::optional<Slice> authority, 185 bool registered_method, Timestamp deadline, 186 grpc_compression_options compression_options, 187 RefCountedPtr<Arena> arena, 188 RefCountedPtr<UnstartedCallDestination> destination); 189 190 } // namespace grpc_core 191 192 #endif // GRPC_SRC_CORE_LIB_SURFACE_CLIENT_CALL_H 193