• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_SURFACE_CLIENT_CALL_H
16 #define GRPC_SRC_CORE_LIB_SURFACE_CLIENT_CALL_H
17 
18 #include <grpc/byte_buffer.h>
19 #include <grpc/compression.h>
20 #include <grpc/event_engine/event_engine.h>
21 #include <grpc/grpc.h>
22 #include <grpc/impl/call.h>
23 #include <grpc/impl/propagation_bits.h>
24 #include <grpc/slice.h>
25 #include <grpc/slice_buffer.h>
26 #include <grpc/status.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/atm.h>
29 #include <grpc/support/port_platform.h>
30 #include <grpc/support/string_util.h>
31 #include <inttypes.h>
32 #include <limits.h>
33 #include <stdlib.h>
34 #include <string.h>
35 
36 #include <atomic>
37 #include <cstdint>
38 #include <string>
39 
40 #include "absl/status/status.h"
41 #include "absl/strings/str_format.h"
42 #include "absl/strings/string_view.h"
43 #include "src/core/lib/promise/status_flag.h"
44 #include "src/core/lib/resource_quota/arena.h"
45 #include "src/core/lib/surface/call.h"
46 #include "src/core/lib/surface/call_utils.h"
47 #include "src/core/lib/transport/metadata.h"
48 #include "src/core/util/crash.h"
49 #include "src/core/util/ref_counted.h"
50 #include "src/core/util/ref_counted_ptr.h"
51 #include "src/core/util/single_set_ptr.h"
52 
53 namespace grpc_core {
54 
55 class ClientCall final
56     : public Call,
57       public DualRefCounted<ClientCall, NonPolymorphicRefCount,
58                             UnrefCallDestroy> {
59  public:
60   ClientCall(grpc_call* parent_call, uint32_t propagation_mask,
61              grpc_completion_queue* cq, Slice path,
62              absl::optional<Slice> authority, bool registered_method,
63              Timestamp deadline, grpc_compression_options compression_options,
64              RefCountedPtr<Arena> arena,
65              RefCountedPtr<UnstartedCallDestination> destination);
66 
67   void CancelWithError(grpc_error_handle error) override;
is_trailers_only()68   bool is_trailers_only() const override { return is_trailers_only_; }
GetServerAuthority()69   absl::string_view GetServerAuthority() const override {
70     Crash("unimplemented");
71   }
72   grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
73                              bool is_notify_tag_closure) override;
74 
ExternalRef()75   void ExternalRef() override { Ref().release(); }
ExternalUnref()76   void ExternalUnref() override { Unref(); }
InternalRef(const char *)77   void InternalRef(const char*) override { WeakRef().release(); }
InternalUnref(const char *)78   void InternalUnref(const char*) override { WeakUnref(); }
79 
Orphaned()80   void Orphaned() override {
81     if (!saw_trailing_metadata_.load(std::memory_order_relaxed)) {
82       CancelWithError(absl::CancelledError());
83     }
84   }
85 
SetCompletionQueue(grpc_completion_queue *)86   void SetCompletionQueue(grpc_completion_queue*) override {
87     Crash("unimplemented");
88   }
89 
compression_options()90   grpc_compression_options compression_options() override {
91     return compression_options_;
92   }
93 
call_stack()94   grpc_call_stack* call_stack() override { return nullptr; }
95 
96   char* GetPeer() override;
97 
Completed()98   bool Completed() final { Crash("unimplemented"); }
failed_before_recv_message()99   bool failed_before_recv_message() const final {
100     return started_call_initiator_.WasCancelledPushed();
101   }
102 
incoming_compression_algorithm()103   grpc_compression_algorithm incoming_compression_algorithm() override {
104     return message_receiver_.incoming_compression_algorithm();
105   }
106 
SetIncomingCompressionAlgorithm(grpc_compression_algorithm algorithm)107   void SetIncomingCompressionAlgorithm(
108       grpc_compression_algorithm algorithm) override {
109     message_receiver_.SetIncomingCompressionAlgorithm(algorithm);
110   }
111 
test_only_message_flags()112   uint32_t test_only_message_flags() override {
113     return message_receiver_.last_message_flags();
114   }
115 
Destroy()116   void Destroy() {
117     auto arena = this->arena()->Ref();
118     this->~ClientCall();
119   }
120 
121  private:
122   struct UnorderedStart {
123     absl::AnyInvocable<void()> start_pending_batch;
124     UnorderedStart* next;
125   };
126 
127   void CommitBatch(const grpc_op* ops, size_t nops, void* notify_tag,
128                    bool is_notify_tag_closure);
129   template <typename Batch>
130   void ScheduleCommittedBatch(Batch batch);
131   Party::WakeupHold StartCall(const grpc_op& send_initial_metadata_op);
132   // Attempt to start the call and send handler down the stack; returns true if
133   // state was updated, false otherwise (with cur_state updated to the new
134   // current state).
135   // If this function returns false, it's guaranteed that handler is not
136   // touched.
137   // Should be called repeatedly until it returns true.
138   bool StartCallMaybeUpdateState(uintptr_t& cur_state,
139                                  UnstartedCallHandler& handler);
140 
DebugTag()141   std::string DebugTag() { return absl::StrFormat("CLIENT_CALL[%p]: ", this); }
142   void OnReceivedStatus(ServerMetadataHandle server_trailing_metadata,
143                         grpc_status_code* out_status,
144                         grpc_slice* out_status_details,
145                         const char** out_error_string,
146                         grpc_metadata_array* out_trailing_metadata);
147 
148   // call_state_ is one of:
149   // 1. kUnstarted - call has not yet been started
150   // 2. pointer to an UnorderedStart - call has ops started, but no send initial
151   //    metadata yet
152   // 3. kStarted - call has been started and call_initiator_ is ready
153   // 4. kCancelled - call was cancelled before starting
154   // In cases (1) and (2) send_initial_metadata_ is used to store the initial
155   // but unsent metadata.
156   // In case (3) started_call_initiator_ is used to store the call initiator.
157   // In case (4) no other state is used.
158   enum CallState : uintptr_t {
159     kUnstarted = 0,
160     kStarted = 1,
161     kCancelled = 2,
162   };
163   std::atomic<uintptr_t> call_state_{kUnstarted};
164   ClientMetadataHandle send_initial_metadata_{
165       Arena::MakePooledForOverwrite<ClientMetadata>()};
166   CallInitiator started_call_initiator_;
167   // Status passed to CancelWithError;
168   // if call_state_ == kCancelled then this is the authoritative status,
169   // otherwise the server trailing metadata from started_call_initiator_ is
170   // authoritative.
171   SingleSetPtr<absl::Status> cancel_status_;
172   MessageReceiver message_receiver_;
173   grpc_completion_queue* const cq_;
174   const RefCountedPtr<UnstartedCallDestination> call_destination_;
175   const grpc_compression_options compression_options_;
176   ServerMetadataHandle received_initial_metadata_;
177   ServerMetadataHandle received_trailing_metadata_;
178   bool is_trailers_only_;
179   std::atomic<bool> saw_trailing_metadata_{false};
180 };
181 
182 grpc_call* MakeClientCall(grpc_call* parent_call, uint32_t propagation_mask,
183                           grpc_completion_queue* cq, Slice path,
184                           absl::optional<Slice> authority,
185                           bool registered_method, Timestamp deadline,
186                           grpc_compression_options compression_options,
187                           RefCountedPtr<Arena> arena,
188                           RefCountedPtr<UnstartedCallDestination> destination);
189 
190 }  // namespace grpc_core
191 
192 #endif  // GRPC_SRC_CORE_LIB_SURFACE_CLIENT_CALL_H
193