1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_SRC_CORE_LIB_SURFACE_CALL_H 20 #define GRPC_SRC_CORE_LIB_SURFACE_CALL_H 21 22 #include <grpc/grpc.h> 23 #include <grpc/impl/compression_types.h> 24 #include <grpc/support/atm.h> 25 #include <grpc/support/port_platform.h> 26 #include <stddef.h> 27 #include <stdint.h> 28 29 #include "absl/functional/any_invocable.h" 30 #include "absl/functional/function_ref.h" 31 #include "absl/log/check.h" 32 #include "absl/strings/string_view.h" 33 #include "absl/types/optional.h" 34 #include "src/core/lib/channel/channel_fwd.h" 35 #include "src/core/lib/channel/channel_stack.h" 36 #include "src/core/lib/debug/trace.h" 37 #include "src/core/lib/iomgr/closure.h" 38 #include "src/core/lib/iomgr/error.h" 39 #include "src/core/lib/iomgr/iomgr_fwd.h" 40 #include "src/core/lib/promise/arena_promise.h" 41 #include "src/core/lib/promise/context.h" 42 #include "src/core/lib/resource_quota/arena.h" 43 #include "src/core/lib/slice/slice.h" 44 #include "src/core/lib/surface/channel.h" 45 #include "src/core/lib/transport/transport.h" 46 #include "src/core/server/server_interface.h" 47 #include "src/core/util/ref_counted_ptr.h" 48 #include "src/core/util/time.h" 49 #include "src/core/util/time_precise.h" 50 51 typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success, 52 void* user_data); 53 54 typedef struct grpc_call_create_args { 55 grpc_core::RefCountedPtr<grpc_core::Channel> channel; 56 grpc_core::ServerInterface* server; 57 58 grpc_call* parent; 59 uint32_t propagation_mask; 60 61 grpc_completion_queue* cq; 62 // if not NULL, it'll be used in lieu of cq 63 grpc_pollset_set* pollset_set_alternative; 64 65 const void* server_transport_data; 66 67 absl::optional<grpc_core::Slice> path; 68 absl::optional<grpc_core::Slice> authority; 69 70 grpc_core::Timestamp send_deadline; 71 bool registered_method; // client_only 72 } grpc_call_create_args; 73 74 namespace grpc_core { 75 76 template <> 77 struct ArenaContextType<census_context> { 78 static void Destroy(census_context*) {} 79 }; 80 81 class Call : public CppImplOf<Call, grpc_call>, 82 public grpc_event_engine::experimental::EventEngine:: 83 Closure /* for deadlines */ { 84 public: 85 Arena* arena() const { return arena_.get(); } 86 bool is_client() const { return is_client_; } 87 88 virtual bool Completed() = 0; 89 void CancelWithStatus(grpc_status_code status, const char* description); 90 virtual void CancelWithError(grpc_error_handle error) = 0; 91 virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0; 92 virtual char* GetPeer() = 0; 93 virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops, 94 void* notify_tag, 95 bool is_notify_tag_closure) = 0; 96 virtual bool failed_before_recv_message() const = 0; 97 virtual bool is_trailers_only() const = 0; 98 virtual absl::string_view GetServerAuthority() const = 0; 99 virtual void ExternalRef() = 0; 100 virtual void ExternalUnref() = 0; 101 virtual void InternalRef(const char* reason) = 0; 102 virtual void InternalUnref(const char* reason) = 0; 103 104 void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_); 105 void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_); 106 Timestamp deadline() { 107 MutexLock lock(&deadline_mu_); 108 return deadline_; 109 } 110 111 virtual uint32_t test_only_message_flags() = 0; 112 CompressionAlgorithmSet encodings_accepted_by_peer() { 113 return encodings_accepted_by_peer_; 114 } 115 116 // This should return nullptr for the promise stack (and alternative means 117 // for that functionality be invented) 118 virtual grpc_call_stack* call_stack() = 0; 119 120 // Implementation of EventEngine::Closure, called when deadline expires 121 void Run() final; 122 123 gpr_cycle_counter start_time() const { return start_time_; } 124 125 void set_traced(bool traced) { traced_ = traced; } 126 bool traced() const { return traced_; } 127 128 virtual grpc_compression_algorithm incoming_compression_algorithm() = 0; 129 130 protected: 131 // The maximum number of concurrent batches possible. 132 // Based upon the maximum number of individually queueable ops in the batch 133 // api: 134 // - initial metadata send 135 // - message send 136 // - status/close send (depending on client/server) 137 // - initial metadata recv 138 // - message recv 139 // - status/close recv (depending on client/server) 140 static constexpr size_t kMaxConcurrentBatches = 6; 141 142 struct ParentCall { 143 Mutex child_list_mu; 144 Call* first_child ABSL_GUARDED_BY(child_list_mu) = nullptr; 145 }; 146 147 struct ChildCall { 148 explicit ChildCall(Call* parent) : parent(parent) {} 149 Call* parent; 150 /// siblings: children of the same parent form a list, and this list is 151 /// protected under 152 /// parent->mu 153 Call* sibling_next = nullptr; 154 Call* sibling_prev = nullptr; 155 }; 156 157 Call(bool is_client, Timestamp send_deadline, RefCountedPtr<Arena> arena); 158 ~Call() override = default; 159 160 ParentCall* GetOrCreateParentCall(); 161 ParentCall* parent_call(); 162 163 absl::Status InitParent(Call* parent, uint32_t propagation_mask); 164 void PublishToParent(Call* parent); 165 void MaybeUnpublishFromParent(); 166 void PropagateCancellationToChildren(); 167 168 Timestamp send_deadline() const { return send_deadline_; } 169 void set_send_deadline(Timestamp send_deadline) { 170 send_deadline_ = send_deadline; 171 } 172 173 Slice GetPeerString() const { 174 MutexLock lock(&peer_mu_); 175 return peer_string_.Ref(); 176 } 177 178 void SetPeerString(Slice peer_string) { 179 MutexLock lock(&peer_mu_); 180 peer_string_ = std::move(peer_string); 181 } 182 183 void ClearPeerString() { SetPeerString(Slice(grpc_empty_slice())); } 184 185 // TODO(ctiller): cancel_func is for cancellation of the call - filter stack 186 // holds no mutexes here, promise stack does, and so locking is different. 187 // Remove this and cancel directly once promise conversion is done. 188 void ProcessIncomingInitialMetadata(grpc_metadata_batch& md); 189 // Fixup outgoing metadata before sending - adds compression, protects 190 // internal headers against external modification. 191 void PrepareOutgoingInitialMetadata(const grpc_op& op, 192 grpc_metadata_batch& md); 193 194 void HandleCompressionAlgorithmDisabled( 195 grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; 196 void HandleCompressionAlgorithmNotAccepted( 197 grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE; 198 199 virtual grpc_compression_options compression_options() = 0; 200 201 virtual void SetIncomingCompressionAlgorithm( 202 grpc_compression_algorithm algorithm) = 0; 203 204 private: 205 const RefCountedPtr<Arena> arena_; 206 std::atomic<ParentCall*> parent_call_{nullptr}; 207 ChildCall* child_ = nullptr; 208 Timestamp send_deadline_; 209 const bool is_client_; 210 // flag indicating that cancellation is inherited 211 bool cancellation_is_inherited_ = false; 212 // Is this call traced? 213 bool traced_ = false; 214 // Supported encodings (compression algorithms), a bitset. 215 // Always support no compression. 216 CompressionAlgorithmSet encodings_accepted_by_peer_{GRPC_COMPRESS_NONE}; 217 // Peer name is protected by a mutex because it can be accessed by the 218 // application at the same moment as it is being set by the completion 219 // of the recv_initial_metadata op. The mutex should be mostly uncontended. 220 mutable Mutex peer_mu_; 221 Slice peer_string_; 222 // Current deadline. 223 Mutex deadline_mu_; 224 Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture(); 225 grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY( 226 deadline_mu_) deadline_task_; 227 gpr_cycle_counter start_time_ = gpr_get_cycle_counter(); 228 }; 229 230 template <> 231 struct ArenaContextType<Call> { 232 static void Destroy(Call*) {} 233 }; 234 235 } // namespace grpc_core 236 237 // Create a new call based on \a args. 238 // Regardless of success or failure, always returns a valid new call into *call 239 // 240 grpc_error_handle grpc_call_create(grpc_call_create_args* args, 241 grpc_call** call); 242 243 void grpc_call_set_completion_queue(grpc_call* call, grpc_completion_queue* cq); 244 245 grpc_core::Arena* grpc_call_get_arena(grpc_call* call); 246 247 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call); 248 249 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call, 250 const grpc_op* ops, 251 size_t nops, 252 grpc_closure* closure); 253 254 // gRPC core internal version of grpc_call_cancel that does not create 255 // exec_ctx. 256 void grpc_call_cancel_internal(grpc_call* call); 257 258 // Given the top call_element, get the call object. 259 grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element); 260 261 void grpc_call_log_batch(const char* file, int line, const grpc_op* ops, 262 size_t nops); 263 264 void grpc_call_tracer_set(grpc_call* call, grpc_core::ClientCallTracer* tracer); 265 266 // Sets call tracer on the call and manages its life by using the call's arena. 267 // When using this API, the tracer will be destroyed by grpc_call arena when 268 // grpc_call is about to be destroyed. The caller of this API SHOULD NOT 269 // manually destroy the tracer. This API is used by Python as a way of using 270 // Arena to manage the lifetime of the call tracer. Python needs this API 271 // because the tracer was created within a separate shared object library which 272 // doesn't have access to core functions like arena->ManagedNew<>. 273 void grpc_call_tracer_set_and_manage(grpc_call* call, 274 grpc_core::ClientCallTracer* tracer); 275 276 void* grpc_call_tracer_get(grpc_call* call); 277 278 #define GRPC_CALL_LOG_BATCH(ops, nops) \ 279 do { \ 280 if (GRPC_TRACE_FLAG_ENABLED(api)) { \ 281 grpc_call_log_batch(__FILE__, __LINE__, ops, nops); \ 282 } \ 283 } while (0) 284 285 uint8_t grpc_call_is_client(grpc_call* call); 286 287 class ClientCallTracerWrapper { 288 public: 289 explicit ClientCallTracerWrapper(grpc_core::ClientCallTracer* tracer) 290 : tracer_(tracer) {} 291 292 private: 293 std::unique_ptr<grpc_core::ClientCallTracer> tracer_; 294 }; 295 296 // Return an appropriate compression algorithm for the requested compression \a 297 // level in the context of \a call. 298 grpc_compression_algorithm grpc_call_compression_for_level( 299 grpc_call* call, grpc_compression_level level); 300 301 // Did this client call receive a trailers-only response 302 // TODO(markdroth): This is currently available only to the C++ API. 303 // Move to surface API if requested by other languages. 304 bool grpc_call_is_trailers_only(const grpc_call* call); 305 306 // Returns the authority for the call, as seen on the server side. 307 absl::string_view grpc_call_server_authority(const grpc_call* call); 308 309 #endif // GRPC_SRC_CORE_LIB_SURFACE_CALL_H 310