// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_SRC_CORE_LIB_SURFACE_FILTER_STACK_CALL_H
#define GRPC_SRC_CORE_LIB_SURFACE_FILTER_STACK_CALL_H

#include <grpc/byte_buffer.h>
#include <grpc/compression.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/impl/call.h>
#include <grpc/impl/propagation_bits.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
#include <inttypes.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>

#include <atomic>
#include <cstdint>
#include <string>
#include <vector>

#include "absl/log/check.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/server/server_interface.h"
#include "src/core/telemetry/call_tracer.h"
#include "src/core/util/alloc.h"
#include "src/core/util/ref_counted.h"
#include "src/core/util/ref_counted_ptr.h"

namespace grpc_core {

///////////////////////////////////////////////////////////////////////////////
// FilterStackCall
// To be removed once promise conversion is complete

class FilterStackCall final : public Call {
 public:
  ~FilterStackCall() override {
    gpr_free(static_cast<void*>(const_cast<char*>(final_info_.error_string)));
  }

  bool Completed() override {
    return gpr_atm_acq_load(&received_final_op_atm_) != 0;
  }

  // TODO(ctiller): return absl::StatusOr<SomeSmartPointer<Call>>?
  static grpc_error_handle Create(grpc_call_create_args* args,
                                  grpc_call** out_call);

  static Call* FromTopElem(grpc_call_element* elem) {
    return FromCallStack(grpc_call_stack_from_top_element(elem));
  }

  grpc_call_stack* call_stack() override {
    return reinterpret_cast<grpc_call_stack*>(
        reinterpret_cast<char*>(this) +
        GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this)));
  }

  grpc_call_element* call_elem(size_t idx) {
    return grpc_call_stack_element(call_stack(), idx);
  }

  CallCombiner* call_combiner() { return &call_combiner_; }

  void CancelWithError(grpc_error_handle error) override;
  void SetCompletionQueue(grpc_completion_queue* cq) override;
  grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
                             bool is_notify_tag_closure) override;
  void ExternalRef() override { ext_ref_.Ref(); }
  void ExternalUnref() override;
  void InternalRef(const char* reason) override {
    GRPC_CALL_STACK_REF(call_stack(), reason);
  }
  void InternalUnref(const char* reason) override {
    GRPC_CALL_STACK_UNREF(call_stack(), reason);
  }

  bool is_trailers_only() const override {
    bool result = is_trailers_only_;
    DCHECK(!result || recv_initial_metadata_.TransportSize() == 0);
    return result;
  }

  bool failed_before_recv_message() const override {
    return call_failed_before_recv_message_;
  }

  uint32_t test_only_message_flags() override {
    return test_only_last_message_flags_;
  }

  absl::string_view GetServerAuthority() const override {
    const Slice* authority_metadata =
        recv_initial_metadata_.get_pointer(HttpAuthorityMetadata());
    if (authority_metadata == nullptr) return "";
    return authority_metadata->as_string_view();
  }

  static size_t InitialSizeEstimate() {
    return sizeof(FilterStackCall) +
           (sizeof(BatchControl) * kMaxConcurrentBatches);
  }

  char* GetPeer() final;

  grpc_compression_options compression_options() override {
    return channel_->compression_options();
  }

  void DeleteThis() {
    auto arena = this->arena()->Ref();
    this->~FilterStackCall();
  }

  Channel* channel() const { return channel_.get(); }

 private:
  class ScopedContext : public promise_detail::Context<Arena> {
   public:
    explicit ScopedContext(FilterStackCall* call)
        : promise_detail::Context<Arena>(call->arena()) {}
  };

  static constexpr gpr_atm kRecvNone = 0;
  static constexpr gpr_atm kRecvInitialMetadataFirst = 1;

  enum class PendingOp {
    kRecvMessage,
    kRecvInitialMetadata,
    kRecvTrailingMetadata,
    kSends
  };
  static intptr_t PendingOpMask(PendingOp op) {
    return static_cast<intptr_t>(1) << static_cast<intptr_t>(op);
  }
  static std::string PendingOpString(intptr_t pending_ops) {
    std::vector<absl::string_view> pending_op_strings;
    if (pending_ops & PendingOpMask(PendingOp::kRecvMessage)) {
      pending_op_strings.push_back("kRecvMessage");
    }
    if (pending_ops & PendingOpMask(PendingOp::kRecvInitialMetadata)) {
      pending_op_strings.push_back("kRecvInitialMetadata");
    }
    if (pending_ops & PendingOpMask(PendingOp::kRecvTrailingMetadata)) {
      pending_op_strings.push_back("kRecvTrailingMetadata");
    }
    if (pending_ops & PendingOpMask(PendingOp::kSends)) {
      pending_op_strings.push_back("kSends");
    }
    return absl::StrCat("{", absl::StrJoin(pending_op_strings, ","), "}");
  }
  struct BatchControl {
    FilterStackCall* call_ = nullptr;
    CallTracerAnnotationInterface* call_tracer_ = nullptr;
    grpc_transport_stream_op_batch op_;
    // Share memory for cq_completion and notify_tag as they are never needed
    // simultaneously. Each byte used in this data structure counts as six
    // bytes per call, so any savings we can make are worthwhile.

    // We use notify_tag to determine whether or not to send notification to
    // the completion queue. Once we've made that determination, we can reuse
    // the memory for cq_completion.
    union {
      grpc_cq_completion cq_completion;
      struct {
        // Any given op indicates completion by either (a) calling a closure or
        // (b) sending a notification on the call's completion queue. If
        // \a is_closure is true, \a tag indicates a closure to be invoked;
        // otherwise, \a tag indicates the tag to be used in the notification
        // to be sent to the completion queue.
        void* tag;
        bool is_closure;
      } notify_tag;
    } completion_data_;
    grpc_closure start_batch_;
    grpc_closure finish_batch_;
    std::atomic<intptr_t> ops_pending_{0};
    AtomicError batch_error_;
    void set_pending_ops(uintptr_t ops) {
      ops_pending_.store(ops, std::memory_order_release);
    }
    bool completed_batch_step(PendingOp op) {
      auto mask = PendingOpMask(op);
      auto r = ops_pending_.fetch_sub(mask, std::memory_order_acq_rel);
      GRPC_TRACE_VLOG(call, 2)
          << "BATCH:" << this << " COMPLETE:" << PendingOpString(mask)
          << " REMAINING:" << PendingOpString(r & ~mask)
          << " (tag:" << completion_data_.notify_tag.tag << ")";
      CHECK_NE((r & mask), 0);
      return r == mask;
    }

    void PostCompletion();
    void FinishStep(PendingOp op);
    void ProcessDataAfterMetadata();
    void ReceivingStreamReady(grpc_error_handle error);
    void ReceivingInitialMetadataReady(grpc_error_handle error);
    void ReceivingTrailingMetadataReady(grpc_error_handle error);
    void FinishBatch(grpc_error_handle error);
  };
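
  // Informal sketch of the completion protocol implied by the members above
  // (an illustration, not an additional API): the code that starts a batch
  // can record the sub-operations it issued as a bitmask, e.g.
  //
  //   bctl->set_pending_ops(PendingOpMask(PendingOp::kSends) |
  //                         PendingOpMask(PendingOp::kRecvMessage));
  //
  // and each sub-operation's callback then calls
  // completed_batch_step(<its op>). Only the caller that clears the last
  // remaining bit sees completed_batch_step() return true, so the batch's
  // completion is posted exactly once.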

  FilterStackCall(RefCountedPtr<Arena> arena,
                  const grpc_call_create_args& args);

  static void ReleaseCall(void* call, grpc_error_handle);
  static void DestroyCall(void* call, grpc_error_handle);

  static FilterStackCall* FromCallStack(grpc_call_stack* call_stack) {
    return reinterpret_cast<FilterStackCall*>(
        reinterpret_cast<char*>(call_stack) -
        GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)));
  }

  void ExecuteBatch(grpc_transport_stream_op_batch* batch,
                    grpc_closure* start_batch_closure);
  void SetFinalStatus(grpc_error_handle error);
  BatchControl* ReuseOrAllocateBatchControl(const grpc_op* ops);
  bool PrepareApplicationMetadata(size_t count, grpc_metadata* metadata,
                                  bool is_trailing);
  void PublishAppMetadata(grpc_metadata_batch* b, bool is_trailing);
  void RecvInitialFilter(grpc_metadata_batch* b);
  void RecvTrailingFilter(grpc_metadata_batch* b,
                          grpc_error_handle batch_error);

  grpc_compression_algorithm incoming_compression_algorithm() override {
    return incoming_compression_algorithm_;
  }
  void SetIncomingCompressionAlgorithm(
      grpc_compression_algorithm algorithm) override {
    incoming_compression_algorithm_ = algorithm;
  }

  RefCountedPtr<Channel> channel_;
  RefCount ext_ref_;
  CallCombiner call_combiner_;
  grpc_completion_queue* cq_;
  grpc_polling_entity pollent_;

  /// has grpc_call_unref been called
  bool destroy_called_ = false;
  // Trailers-only response status
  bool is_trailers_only_ = false;
  /// which ops are in-flight
  bool sent_initial_metadata_ = false;
  bool sending_message_ = false;
  bool sent_final_op_ = false;
  bool received_initial_metadata_ = false;
  bool receiving_message_ = false;
  bool requested_final_op_ = false;
  gpr_atm received_final_op_atm_ = 0;

  BatchControl* active_batches_[kMaxConcurrentBatches] = {};
  grpc_transport_stream_op_batch_payload stream_op_payload_;

  // first idx: is_receiving, second idx: is_trailing
  grpc_metadata_batch send_initial_metadata_;
  grpc_metadata_batch send_trailing_metadata_;
  grpc_metadata_batch recv_initial_metadata_;
  grpc_metadata_batch recv_trailing_metadata_;

  // Buffered read metadata waiting to be returned to the application.
  // Element 0 is initial metadata, element 1 is trailing metadata.
  grpc_metadata_array* buffered_metadata_[2] = {};

  // Call data useful for reporting. Only valid after the call has completed.
  grpc_call_final_info final_info_;

  SliceBuffer send_slice_buffer_;
  absl::optional<SliceBuffer> receiving_slice_buffer_;
  uint32_t receiving_stream_flags_;
  uint32_t test_only_last_message_flags_ = 0;
  // Compression algorithm for *incoming* data
  grpc_compression_algorithm incoming_compression_algorithm_ =
      GRPC_COMPRESS_NONE;

  bool call_failed_before_recv_message_ = false;
  grpc_byte_buffer** receiving_buffer_ = nullptr;
  grpc_slice receiving_slice_ = grpc_empty_slice();
  grpc_closure receiving_stream_ready_;
  grpc_closure receiving_initial_metadata_ready_;
  grpc_closure receiving_trailing_metadata_ready_;
  // Status about operation of call
  bool sent_server_trailing_metadata_ = false;
  gpr_atm cancelled_with_error_ = 0;

  grpc_closure release_call_;

  union {
    struct {
      grpc_status_code* status;
      grpc_slice* status_details;
      const char** error_string;
    } client;
    struct {
      int* cancelled;
      // backpointer to owning server if this is a server side call.
      ServerInterface* core_server;
    } server;
  } final_op_;
  AtomicError status_error_;

  // recv_state can contain one of the following values:
  //   RECV_NONE                   : no initial metadata and messages received
  //   RECV_INITIAL_METADATA_FIRST : received initial metadata first
  //   a batch_control*            : received messages first
  //
  //   +------1------RECV_NONE------3-----+
  //   |                                  |
  //   |                                  |
  //   v                                  v
  //   RECV_INITIAL_METADATA_FIRST   receiving_stream_ready_bctlp
  //     |           ^                 |           ^
  //     |           |                 |           |
  //     +-----2-----+                 +-----4-----+
  //
  // For 1, 4: See receiving_initial_metadata_ready() function
  // For 2, 3: See receiving_stream_ready() function
  gpr_atm recv_state_ = 0;
};
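
// Memory-layout note (an informal sketch derived from the methods above, not
// part of the public API): a FilterStackCall and its grpc_call_stack are laid
// out back to back, so call_stack() and FromCallStack() are inverses:
//
//   [ FilterStackCall ][ padding to alignment ][ grpc_call_stack ... ]
//   ^this                                       ^this->call_stack()
//
// i.e. FromCallStack(call->call_stack()) == call, provided both directions
// apply the same GPR_ROUND_UP_TO_ALIGNMENT_SIZE rounding to the object size.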

// Create a new call based on \a args.
// Regardless of success or failure, always returns a valid new call into *call
//
grpc_error_handle grpc_call_create(grpc_call_create_args* args,
                                   grpc_call** call);

// Given the top call_element, get the call object.
grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element);

}  // namespace grpc_core

#endif  // GRPC_SRC_CORE_LIB_SURFACE_FILTER_STACK_CALL_H