1 // Copyright 2024 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_SURFACE_SERVER_CALL_H 16 #define GRPC_SRC_CORE_LIB_SURFACE_SERVER_CALL_H 17 18 #include <grpc/byte_buffer.h> 19 #include <grpc/compression.h> 20 #include <grpc/event_engine/event_engine.h> 21 #include <grpc/grpc.h> 22 #include <grpc/impl/call.h> 23 #include <grpc/impl/propagation_bits.h> 24 #include <grpc/slice.h> 25 #include <grpc/slice_buffer.h> 26 #include <grpc/status.h> 27 #include <grpc/support/alloc.h> 28 #include <grpc/support/atm.h> 29 #include <grpc/support/port_platform.h> 30 #include <grpc/support/string_util.h> 31 #include <inttypes.h> 32 #include <limits.h> 33 #include <stdlib.h> 34 #include <string.h> 35 36 #include <atomic> 37 #include <cstdint> 38 #include <memory> 39 #include <string> 40 #include <utility> 41 42 #include "absl/log/check.h" 43 #include "absl/status/status.h" 44 #include "absl/strings/str_format.h" 45 #include "absl/strings/string_view.h" 46 #include "src/core/lib/promise/poll.h" 47 #include "src/core/lib/resource_quota/arena.h" 48 #include "src/core/lib/surface/call.h" 49 #include "src/core/lib/surface/call_utils.h" 50 #include "src/core/lib/transport/metadata.h" 51 #include "src/core/lib/transport/metadata_batch.h" 52 #include "src/core/server/server_interface.h" 53 #include "src/core/telemetry/stats.h" 54 #include "src/core/telemetry/stats_data.h" 55 #include "src/core/util/crash.h" 56 #include "src/core/util/ref_counted.h" 57 #include "src/core/util/ref_counted_ptr.h" 58 59 namespace grpc_core { 60 61 class ServerCall final : public Call, public DualRefCounted<ServerCall> { 62 public: ServerCall(ClientMetadataHandle client_initial_metadata,CallHandler call_handler,ServerInterface * server,grpc_completion_queue * cq)63 ServerCall(ClientMetadataHandle client_initial_metadata, 64 CallHandler call_handler, ServerInterface* server, 65 grpc_completion_queue* cq) 66 : Call(false, 67 client_initial_metadata->get(GrpcTimeoutMetadata()) 68 .value_or(Timestamp::InfFuture()), 69 call_handler.arena()->Ref()), 70 call_handler_(std::move(call_handler)), 71 client_initial_metadata_stored_(std::move(client_initial_metadata)), 72 cq_(cq), 73 server_(server) { 74 global_stats().IncrementServerCallsCreated(); 75 } 76 CancelWithError(grpc_error_handle error)77 void CancelWithError(grpc_error_handle error) override { 78 call_handler_.SpawnInfallible( 79 "CancelWithError", 80 [self = WeakRefAsSubclass<ServerCall>(), error = std::move(error)] { 81 self->call_handler_.PushServerTrailingMetadata( 82 CancelledServerMetadataFromStatus(error)); 83 }); 84 } is_trailers_only()85 bool is_trailers_only() const override { 86 Crash("is_trailers_only not implemented for server calls"); 87 } GetServerAuthority()88 absl::string_view GetServerAuthority() const override { 89 Crash("unimplemented"); 90 } 91 grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag, 92 bool is_notify_tag_closure) override; 93 ExternalRef()94 void ExternalRef() override { Ref().release(); } ExternalUnref()95 void ExternalUnref() override { Unref(); } InternalRef(const char *)96 void InternalRef(const char*) override { WeakRef().release(); } InternalUnref(const char *)97 void InternalUnref(const char*) override { WeakUnref(); } 98 Orphaned()99 void Orphaned() override { 100 if (!saw_was_cancelled_.load(std::memory_order_relaxed)) { 101 CancelWithError(absl::CancelledError()); 102 } 103 } 104 SetCompletionQueue(grpc_completion_queue *)105 void SetCompletionQueue(grpc_completion_queue*) override { 106 Crash("unimplemented"); 107 } 108 compression_options()109 grpc_compression_options compression_options() override { 110 return server_->compression_options(); 111 } 112 call_stack()113 grpc_call_stack* call_stack() override { return nullptr; } 114 GetPeer()115 char* GetPeer() override { 116 Slice peer_slice = GetPeerString(); 117 if (!peer_slice.empty()) { 118 absl::string_view peer_string_view = peer_slice.as_string_view(); 119 char* peer_string = 120 static_cast<char*>(gpr_malloc(peer_string_view.size() + 1)); 121 memcpy(peer_string, peer_string_view.data(), peer_string_view.size()); 122 peer_string[peer_string_view.size()] = '\0'; 123 return peer_string; 124 } 125 return gpr_strdup("unknown"); 126 } 127 Completed()128 bool Completed() final { Crash("unimplemented"); } failed_before_recv_message()129 bool failed_before_recv_message() const final { 130 return call_handler_.WasCancelledPushed(); 131 } 132 test_only_message_flags()133 uint32_t test_only_message_flags() override { 134 return message_receiver_.last_message_flags(); 135 } 136 incoming_compression_algorithm()137 grpc_compression_algorithm incoming_compression_algorithm() override { 138 return message_receiver_.incoming_compression_algorithm(); 139 } 140 SetIncomingCompressionAlgorithm(grpc_compression_algorithm algorithm)141 void SetIncomingCompressionAlgorithm( 142 grpc_compression_algorithm algorithm) override { 143 message_receiver_.SetIncomingCompressionAlgorithm(algorithm); 144 } 145 146 private: 147 void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag, 148 bool is_notify_tag_closure); 149 DebugTag()150 std::string DebugTag() { return absl::StrFormat("SERVER_CALL[%p]: ", this); } 151 152 CallHandler call_handler_; 153 MessageReceiver message_receiver_; 154 ClientMetadataHandle client_initial_metadata_stored_; 155 grpc_completion_queue* const cq_; 156 ServerInterface* const server_; 157 std::atomic<bool> saw_was_cancelled_{false}; 158 }; 159 160 grpc_call* MakeServerCall(CallHandler call_handler, 161 ClientMetadataHandle client_initial_metadata, 162 ServerInterface* server, grpc_completion_queue* cq, 163 grpc_metadata_array* publish_initial_metadata); 164 165 } // namespace grpc_core 166 167 #endif // GRPC_SRC_CORE_LIB_SURFACE_SERVER_CALL_H 168