• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_SURFACE_SERVER_CALL_H
16 #define GRPC_SRC_CORE_LIB_SURFACE_SERVER_CALL_H
17 
18 #include <grpc/byte_buffer.h>
19 #include <grpc/compression.h>
20 #include <grpc/event_engine/event_engine.h>
21 #include <grpc/grpc.h>
22 #include <grpc/impl/call.h>
23 #include <grpc/impl/propagation_bits.h>
24 #include <grpc/slice.h>
25 #include <grpc/slice_buffer.h>
26 #include <grpc/status.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/atm.h>
29 #include <grpc/support/port_platform.h>
30 #include <grpc/support/string_util.h>
31 #include <inttypes.h>
32 #include <limits.h>
33 #include <stdlib.h>
34 #include <string.h>
35 
36 #include <atomic>
37 #include <cstdint>
38 #include <memory>
39 #include <string>
40 #include <utility>
41 
42 #include "absl/log/check.h"
43 #include "absl/status/status.h"
44 #include "absl/strings/str_format.h"
45 #include "absl/strings/string_view.h"
46 #include "src/core/lib/promise/poll.h"
47 #include "src/core/lib/resource_quota/arena.h"
48 #include "src/core/lib/surface/call.h"
49 #include "src/core/lib/surface/call_utils.h"
50 #include "src/core/lib/transport/metadata.h"
51 #include "src/core/lib/transport/metadata_batch.h"
52 #include "src/core/server/server_interface.h"
53 #include "src/core/telemetry/stats.h"
54 #include "src/core/telemetry/stats_data.h"
55 #include "src/core/util/crash.h"
56 #include "src/core/util/ref_counted.h"
57 #include "src/core/util/ref_counted_ptr.h"
58 
59 namespace grpc_core {
60 
61 class ServerCall final : public Call, public DualRefCounted<ServerCall> {
62  public:
ServerCall(ClientMetadataHandle client_initial_metadata,CallHandler call_handler,ServerInterface * server,grpc_completion_queue * cq)63   ServerCall(ClientMetadataHandle client_initial_metadata,
64              CallHandler call_handler, ServerInterface* server,
65              grpc_completion_queue* cq)
66       : Call(false,
67              client_initial_metadata->get(GrpcTimeoutMetadata())
68                  .value_or(Timestamp::InfFuture()),
69              call_handler.arena()->Ref()),
70         call_handler_(std::move(call_handler)),
71         client_initial_metadata_stored_(std::move(client_initial_metadata)),
72         cq_(cq),
73         server_(server) {
74     global_stats().IncrementServerCallsCreated();
75   }
76 
CancelWithError(grpc_error_handle error)77   void CancelWithError(grpc_error_handle error) override {
78     call_handler_.SpawnInfallible(
79         "CancelWithError",
80         [self = WeakRefAsSubclass<ServerCall>(), error = std::move(error)] {
81           self->call_handler_.PushServerTrailingMetadata(
82               CancelledServerMetadataFromStatus(error));
83         });
84   }
is_trailers_only()85   bool is_trailers_only() const override {
86     Crash("is_trailers_only not implemented for server calls");
87   }
GetServerAuthority()88   absl::string_view GetServerAuthority() const override {
89     Crash("unimplemented");
90   }
91   grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
92                              bool is_notify_tag_closure) override;
93 
ExternalRef()94   void ExternalRef() override { Ref().release(); }
ExternalUnref()95   void ExternalUnref() override { Unref(); }
InternalRef(const char *)96   void InternalRef(const char*) override { WeakRef().release(); }
InternalUnref(const char *)97   void InternalUnref(const char*) override { WeakUnref(); }
98 
Orphaned()99   void Orphaned() override {
100     if (!saw_was_cancelled_.load(std::memory_order_relaxed)) {
101       CancelWithError(absl::CancelledError());
102     }
103   }
104 
SetCompletionQueue(grpc_completion_queue *)105   void SetCompletionQueue(grpc_completion_queue*) override {
106     Crash("unimplemented");
107   }
108 
compression_options()109   grpc_compression_options compression_options() override {
110     return server_->compression_options();
111   }
112 
call_stack()113   grpc_call_stack* call_stack() override { return nullptr; }
114 
GetPeer()115   char* GetPeer() override {
116     Slice peer_slice = GetPeerString();
117     if (!peer_slice.empty()) {
118       absl::string_view peer_string_view = peer_slice.as_string_view();
119       char* peer_string =
120           static_cast<char*>(gpr_malloc(peer_string_view.size() + 1));
121       memcpy(peer_string, peer_string_view.data(), peer_string_view.size());
122       peer_string[peer_string_view.size()] = '\0';
123       return peer_string;
124     }
125     return gpr_strdup("unknown");
126   }
127 
Completed()128   bool Completed() final { Crash("unimplemented"); }
failed_before_recv_message()129   bool failed_before_recv_message() const final {
130     return call_handler_.WasCancelledPushed();
131   }
132 
test_only_message_flags()133   uint32_t test_only_message_flags() override {
134     return message_receiver_.last_message_flags();
135   }
136 
incoming_compression_algorithm()137   grpc_compression_algorithm incoming_compression_algorithm() override {
138     return message_receiver_.incoming_compression_algorithm();
139   }
140 
SetIncomingCompressionAlgorithm(grpc_compression_algorithm algorithm)141   void SetIncomingCompressionAlgorithm(
142       grpc_compression_algorithm algorithm) override {
143     message_receiver_.SetIncomingCompressionAlgorithm(algorithm);
144   }
145 
146  private:
147   void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
148                    bool is_notify_tag_closure);
149 
DebugTag()150   std::string DebugTag() { return absl::StrFormat("SERVER_CALL[%p]: ", this); }
151 
152   CallHandler call_handler_;
153   MessageReceiver message_receiver_;
154   ClientMetadataHandle client_initial_metadata_stored_;
155   grpc_completion_queue* const cq_;
156   ServerInterface* const server_;
157   std::atomic<bool> saw_was_cancelled_{false};
158 };
159 
160 grpc_call* MakeServerCall(CallHandler call_handler,
161                           ClientMetadataHandle client_initial_metadata,
162                           ServerInterface* server, grpc_completion_queue* cq,
163                           grpc_metadata_array* publish_initial_metadata);
164 
165 }  // namespace grpc_core
166 
167 #endif  // GRPC_SRC_CORE_LIB_SURFACE_SERVER_CALL_H
168