• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_LIB_SURFACE_CALL_H
20 #define GRPC_SRC_CORE_LIB_SURFACE_CALL_H
21 
22 #include <grpc/grpc.h>
23 #include <grpc/impl/compression_types.h>
24 #include <grpc/support/atm.h>
25 #include <grpc/support/port_platform.h>
26 #include <stddef.h>
27 #include <stdint.h>
28 
29 #include "absl/functional/any_invocable.h"
30 #include "absl/functional/function_ref.h"
31 #include "absl/log/check.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
34 #include "src/core/lib/channel/channel_fwd.h"
35 #include "src/core/lib/channel/channel_stack.h"
36 #include "src/core/lib/debug/trace.h"
37 #include "src/core/lib/iomgr/closure.h"
38 #include "src/core/lib/iomgr/error.h"
39 #include "src/core/lib/iomgr/iomgr_fwd.h"
40 #include "src/core/lib/promise/arena_promise.h"
41 #include "src/core/lib/promise/context.h"
42 #include "src/core/lib/resource_quota/arena.h"
43 #include "src/core/lib/slice/slice.h"
44 #include "src/core/lib/surface/channel.h"
45 #include "src/core/lib/transport/transport.h"
46 #include "src/core/server/server_interface.h"
47 #include "src/core/util/ref_counted_ptr.h"
48 #include "src/core/util/time.h"
49 #include "src/core/util/time_precise.h"
50 
51 typedef void (*grpc_ioreq_completion_func)(grpc_call* call, int success,
52                                            void* user_data);
53 
54 typedef struct grpc_call_create_args {
55   grpc_core::RefCountedPtr<grpc_core::Channel> channel;
56   grpc_core::ServerInterface* server;
57 
58   grpc_call* parent;
59   uint32_t propagation_mask;
60 
61   grpc_completion_queue* cq;
62   // if not NULL, it'll be used in lieu of cq
63   grpc_pollset_set* pollset_set_alternative;
64 
65   const void* server_transport_data;
66 
67   absl::optional<grpc_core::Slice> path;
68   absl::optional<grpc_core::Slice> authority;
69 
70   grpc_core::Timestamp send_deadline;
71   bool registered_method;  // client_only
72 } grpc_call_create_args;
73 
74 namespace grpc_core {
75 
76 template <>
77 struct ArenaContextType<census_context> {
78   static void Destroy(census_context*) {}
79 };
80 
81 class Call : public CppImplOf<Call, grpc_call>,
82              public grpc_event_engine::experimental::EventEngine::
83                  Closure /* for deadlines */ {
84  public:
85   Arena* arena() const { return arena_.get(); }
86   bool is_client() const { return is_client_; }
87 
88   virtual bool Completed() = 0;
89   void CancelWithStatus(grpc_status_code status, const char* description);
90   virtual void CancelWithError(grpc_error_handle error) = 0;
91   virtual void SetCompletionQueue(grpc_completion_queue* cq) = 0;
92   virtual char* GetPeer() = 0;
93   virtual grpc_call_error StartBatch(const grpc_op* ops, size_t nops,
94                                      void* notify_tag,
95                                      bool is_notify_tag_closure) = 0;
96   virtual bool failed_before_recv_message() const = 0;
97   virtual bool is_trailers_only() const = 0;
98   virtual absl::string_view GetServerAuthority() const = 0;
99   virtual void ExternalRef() = 0;
100   virtual void ExternalUnref() = 0;
101   virtual void InternalRef(const char* reason) = 0;
102   virtual void InternalUnref(const char* reason) = 0;
103 
104   void UpdateDeadline(Timestamp deadline) ABSL_LOCKS_EXCLUDED(deadline_mu_);
105   void ResetDeadline() ABSL_LOCKS_EXCLUDED(deadline_mu_);
106   Timestamp deadline() {
107     MutexLock lock(&deadline_mu_);
108     return deadline_;
109   }
110 
111   virtual uint32_t test_only_message_flags() = 0;
112   CompressionAlgorithmSet encodings_accepted_by_peer() {
113     return encodings_accepted_by_peer_;
114   }
115 
116   // This should return nullptr for the promise stack (and alternative means
117   // for that functionality be invented)
118   virtual grpc_call_stack* call_stack() = 0;
119 
120   // Implementation of EventEngine::Closure, called when deadline expires
121   void Run() final;
122 
123   gpr_cycle_counter start_time() const { return start_time_; }
124 
125   void set_traced(bool traced) { traced_ = traced; }
126   bool traced() const { return traced_; }
127 
128   virtual grpc_compression_algorithm incoming_compression_algorithm() = 0;
129 
130  protected:
131   // The maximum number of concurrent batches possible.
132   // Based upon the maximum number of individually queueable ops in the batch
133   // api:
134   //    - initial metadata send
135   //    - message send
136   //    - status/close send (depending on client/server)
137   //    - initial metadata recv
138   //    - message recv
139   //    - status/close recv (depending on client/server)
140   static constexpr size_t kMaxConcurrentBatches = 6;
141 
142   struct ParentCall {
143     Mutex child_list_mu;
144     Call* first_child ABSL_GUARDED_BY(child_list_mu) = nullptr;
145   };
146 
147   struct ChildCall {
148     explicit ChildCall(Call* parent) : parent(parent) {}
149     Call* parent;
150     /// siblings: children of the same parent form a list, and this list is
151     /// protected under
152     /// parent->mu
153     Call* sibling_next = nullptr;
154     Call* sibling_prev = nullptr;
155   };
156 
157   Call(bool is_client, Timestamp send_deadline, RefCountedPtr<Arena> arena);
158   ~Call() override = default;
159 
160   ParentCall* GetOrCreateParentCall();
161   ParentCall* parent_call();
162 
163   absl::Status InitParent(Call* parent, uint32_t propagation_mask);
164   void PublishToParent(Call* parent);
165   void MaybeUnpublishFromParent();
166   void PropagateCancellationToChildren();
167 
168   Timestamp send_deadline() const { return send_deadline_; }
169   void set_send_deadline(Timestamp send_deadline) {
170     send_deadline_ = send_deadline;
171   }
172 
173   Slice GetPeerString() const {
174     MutexLock lock(&peer_mu_);
175     return peer_string_.Ref();
176   }
177 
178   void SetPeerString(Slice peer_string) {
179     MutexLock lock(&peer_mu_);
180     peer_string_ = std::move(peer_string);
181   }
182 
183   void ClearPeerString() { SetPeerString(Slice(grpc_empty_slice())); }
184 
185   // TODO(ctiller): cancel_func is for cancellation of the call - filter stack
186   // holds no mutexes here, promise stack does, and so locking is different.
187   // Remove this and cancel directly once promise conversion is done.
188   void ProcessIncomingInitialMetadata(grpc_metadata_batch& md);
189   // Fixup outgoing metadata before sending - adds compression, protects
190   // internal headers against external modification.
191   void PrepareOutgoingInitialMetadata(const grpc_op& op,
192                                       grpc_metadata_batch& md);
193 
194   void HandleCompressionAlgorithmDisabled(
195       grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
196   void HandleCompressionAlgorithmNotAccepted(
197       grpc_compression_algorithm compression_algorithm) GPR_ATTRIBUTE_NOINLINE;
198 
199   virtual grpc_compression_options compression_options() = 0;
200 
201   virtual void SetIncomingCompressionAlgorithm(
202       grpc_compression_algorithm algorithm) = 0;
203 
204  private:
205   const RefCountedPtr<Arena> arena_;
206   std::atomic<ParentCall*> parent_call_{nullptr};
207   ChildCall* child_ = nullptr;
208   Timestamp send_deadline_;
209   const bool is_client_;
210   // flag indicating that cancellation is inherited
211   bool cancellation_is_inherited_ = false;
212   // Is this call traced?
213   bool traced_ = false;
214   // Supported encodings (compression algorithms), a bitset.
215   // Always support no compression.
216   CompressionAlgorithmSet encodings_accepted_by_peer_{GRPC_COMPRESS_NONE};
217   // Peer name is protected by a mutex because it can be accessed by the
218   // application at the same moment as it is being set by the completion
219   // of the recv_initial_metadata op.  The mutex should be mostly uncontended.
220   mutable Mutex peer_mu_;
221   Slice peer_string_;
222   // Current deadline.
223   Mutex deadline_mu_;
224   Timestamp deadline_ ABSL_GUARDED_BY(deadline_mu_) = Timestamp::InfFuture();
225   grpc_event_engine::experimental::EventEngine::TaskHandle ABSL_GUARDED_BY(
226       deadline_mu_) deadline_task_;
227   gpr_cycle_counter start_time_ = gpr_get_cycle_counter();
228 };
229 
230 template <>
231 struct ArenaContextType<Call> {
232   static void Destroy(Call*) {}
233 };
234 
235 }  // namespace grpc_core
236 
237 // Create a new call based on \a args.
238 // Regardless of success or failure, always returns a valid new call into *call
239 //
240 grpc_error_handle grpc_call_create(grpc_call_create_args* args,
241                                    grpc_call** call);
242 
243 void grpc_call_set_completion_queue(grpc_call* call, grpc_completion_queue* cq);
244 
245 grpc_core::Arena* grpc_call_get_arena(grpc_call* call);
246 
247 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call);
248 
249 grpc_call_error grpc_call_start_batch_and_execute(grpc_call* call,
250                                                   const grpc_op* ops,
251                                                   size_t nops,
252                                                   grpc_closure* closure);
253 
254 // gRPC core internal version of grpc_call_cancel that does not create
255 // exec_ctx.
256 void grpc_call_cancel_internal(grpc_call* call);
257 
258 // Given the top call_element, get the call object.
259 grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element);
260 
261 void grpc_call_log_batch(const char* file, int line, const grpc_op* ops,
262                          size_t nops);
263 
264 void grpc_call_tracer_set(grpc_call* call, grpc_core::ClientCallTracer* tracer);
265 
266 // Sets call tracer on the call and manages its life by using the call's arena.
267 // When using this API, the tracer will be destroyed by grpc_call arena when
268 // grpc_call is about to be destroyed. The caller of this API SHOULD NOT
269 // manually destroy the tracer. This API is used by Python as a way of using
270 // Arena to manage the lifetime of the call tracer. Python needs this API
271 // because the tracer was created within a separate shared object library which
272 // doesn't have access to core functions like arena->ManagedNew<>.
273 void grpc_call_tracer_set_and_manage(grpc_call* call,
274                                      grpc_core::ClientCallTracer* tracer);
275 
276 void* grpc_call_tracer_get(grpc_call* call);
277 
278 #define GRPC_CALL_LOG_BATCH(ops, nops)                    \
279   do {                                                    \
280     if (GRPC_TRACE_FLAG_ENABLED(api)) {                   \
281       grpc_call_log_batch(__FILE__, __LINE__, ops, nops); \
282     }                                                     \
283   } while (0)
284 
285 uint8_t grpc_call_is_client(grpc_call* call);
286 
287 class ClientCallTracerWrapper {
288  public:
289   explicit ClientCallTracerWrapper(grpc_core::ClientCallTracer* tracer)
290       : tracer_(tracer) {}
291 
292  private:
293   std::unique_ptr<grpc_core::ClientCallTracer> tracer_;
294 };
295 
296 // Return an appropriate compression algorithm for the requested compression \a
297 // level in the context of \a call.
298 grpc_compression_algorithm grpc_call_compression_for_level(
299     grpc_call* call, grpc_compression_level level);
300 
301 // Did this client call receive a trailers-only response
302 // TODO(markdroth): This is currently available only to the C++ API.
303 //                  Move to surface API if requested by other languages.
304 bool grpc_call_is_trailers_only(const grpc_call* call);
305 
306 // Returns the authority for the call, as seen on the server side.
307 absl::string_view grpc_call_server_authority(const grpc_call* call);
308 
309 #endif  // GRPC_SRC_CORE_LIB_SURFACE_CALL_H
310