• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_SURFACE_FILTER_STACK_CALL_H
16 #define GRPC_SRC_CORE_LIB_SURFACE_FILTER_STACK_CALL_H
17 
18 #include <grpc/byte_buffer.h>
19 #include <grpc/compression.h>
20 #include <grpc/event_engine/event_engine.h>
21 #include <grpc/grpc.h>
22 #include <grpc/impl/call.h>
23 #include <grpc/impl/propagation_bits.h>
24 #include <grpc/slice.h>
25 #include <grpc/slice_buffer.h>
26 #include <grpc/status.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/atm.h>
29 #include <grpc/support/port_platform.h>
30 #include <grpc/support/string_util.h>
31 #include <inttypes.h>
32 #include <limits.h>
33 #include <stdlib.h>
34 #include <string.h>
35 
36 #include <atomic>
37 #include <cstdint>
38 #include <string>
39 #include <vector>
40 
41 #include "absl/log/check.h"
42 #include "absl/strings/str_cat.h"
43 #include "absl/strings/str_join.h"
44 #include "absl/strings/string_view.h"
45 #include "src/core/lib/channel/channel_stack.h"
46 #include "src/core/lib/iomgr/call_combiner.h"
47 #include "src/core/lib/iomgr/polling_entity.h"
48 #include "src/core/lib/promise/context.h"
49 #include "src/core/lib/resource_quota/arena.h"
50 #include "src/core/lib/slice/slice_buffer.h"
51 #include "src/core/lib/surface/call.h"
52 #include "src/core/lib/surface/channel.h"
53 #include "src/core/lib/surface/completion_queue.h"
54 #include "src/core/lib/transport/metadata_batch.h"
55 #include "src/core/lib/transport/transport.h"
56 #include "src/core/server/server_interface.h"
57 #include "src/core/telemetry/call_tracer.h"
58 #include "src/core/util/alloc.h"
59 #include "src/core/util/ref_counted.h"
60 #include "src/core/util/ref_counted_ptr.h"
61 
62 namespace grpc_core {
63 
64 ///////////////////////////////////////////////////////////////////////////////
65 // FilterStackCall
66 // To be removed once promise conversion is complete
67 
68 class FilterStackCall final : public Call {
69  public:
~FilterStackCall()70   ~FilterStackCall() override {
71     gpr_free(static_cast<void*>(const_cast<char*>(final_info_.error_string)));
72   }
73 
Completed()74   bool Completed() override {
75     return gpr_atm_acq_load(&received_final_op_atm_) != 0;
76   }
77 
78   // TODO(ctiller): return absl::StatusOr<SomeSmartPointer<Call>>?
79   static grpc_error_handle Create(grpc_call_create_args* args,
80                                   grpc_call** out_call);
81 
FromTopElem(grpc_call_element * elem)82   static Call* FromTopElem(grpc_call_element* elem) {
83     return FromCallStack(grpc_call_stack_from_top_element(elem));
84   }
85 
call_stack()86   grpc_call_stack* call_stack() override {
87     return reinterpret_cast<grpc_call_stack*>(
88         reinterpret_cast<char*>(this) +
89         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(*this)));
90   }
91 
call_elem(size_t idx)92   grpc_call_element* call_elem(size_t idx) {
93     return grpc_call_stack_element(call_stack(), idx);
94   }
95 
call_combiner()96   CallCombiner* call_combiner() { return &call_combiner_; }
97 
98   void CancelWithError(grpc_error_handle error) override;
99   void SetCompletionQueue(grpc_completion_queue* cq) override;
100   grpc_call_error StartBatch(const grpc_op* ops, size_t nops, void* notify_tag,
101                              bool is_notify_tag_closure) override;
ExternalRef()102   void ExternalRef() override { ext_ref_.Ref(); }
103   void ExternalUnref() override;
InternalRef(const char * reason)104   void InternalRef(const char* reason) override {
105     GRPC_CALL_STACK_REF(call_stack(), reason);
106   }
InternalUnref(const char * reason)107   void InternalUnref(const char* reason) override {
108     GRPC_CALL_STACK_UNREF(call_stack(), reason);
109   }
110 
is_trailers_only()111   bool is_trailers_only() const override {
112     bool result = is_trailers_only_;
113     DCHECK(!result || recv_initial_metadata_.TransportSize() == 0);
114     return result;
115   }
116 
failed_before_recv_message()117   bool failed_before_recv_message() const override {
118     return call_failed_before_recv_message_;
119   }
120 
test_only_message_flags()121   uint32_t test_only_message_flags() override {
122     return test_only_last_message_flags_;
123   }
124 
GetServerAuthority()125   absl::string_view GetServerAuthority() const override {
126     const Slice* authority_metadata =
127         recv_initial_metadata_.get_pointer(HttpAuthorityMetadata());
128     if (authority_metadata == nullptr) return "";
129     return authority_metadata->as_string_view();
130   }
131 
InitialSizeEstimate()132   static size_t InitialSizeEstimate() {
133     return sizeof(FilterStackCall) +
134            (sizeof(BatchControl) * kMaxConcurrentBatches);
135   }
136 
137   char* GetPeer() final;
138 
compression_options()139   grpc_compression_options compression_options() override {
140     return channel_->compression_options();
141   }
142 
DeleteThis()143   void DeleteThis() {
144     auto arena = this->arena()->Ref();
145     this->~FilterStackCall();
146   }
147 
channel()148   Channel* channel() const { return channel_.get(); }
149 
150  private:
151   class ScopedContext : public promise_detail::Context<Arena> {
152    public:
ScopedContext(FilterStackCall * call)153     explicit ScopedContext(FilterStackCall* call)
154         : promise_detail::Context<Arena>(call->arena()) {}
155   };
156 
157   static constexpr gpr_atm kRecvNone = 0;
158   static constexpr gpr_atm kRecvInitialMetadataFirst = 1;
159 
160   enum class PendingOp {
161     kRecvMessage,
162     kRecvInitialMetadata,
163     kRecvTrailingMetadata,
164     kSends
165   };
PendingOpMask(PendingOp op)166   static intptr_t PendingOpMask(PendingOp op) {
167     return static_cast<intptr_t>(1) << static_cast<intptr_t>(op);
168   }
PendingOpString(intptr_t pending_ops)169   static std::string PendingOpString(intptr_t pending_ops) {
170     std::vector<absl::string_view> pending_op_strings;
171     if (pending_ops & PendingOpMask(PendingOp::kRecvMessage)) {
172       pending_op_strings.push_back("kRecvMessage");
173     }
174     if (pending_ops & PendingOpMask(PendingOp::kRecvInitialMetadata)) {
175       pending_op_strings.push_back("kRecvInitialMetadata");
176     }
177     if (pending_ops & PendingOpMask(PendingOp::kRecvTrailingMetadata)) {
178       pending_op_strings.push_back("kRecvTrailingMetadata");
179     }
180     if (pending_ops & PendingOpMask(PendingOp::kSends)) {
181       pending_op_strings.push_back("kSends");
182     }
183     return absl::StrCat("{", absl::StrJoin(pending_op_strings, ","), "}");
184   }
185   struct BatchControl {
186     FilterStackCall* call_ = nullptr;
187     CallTracerAnnotationInterface* call_tracer_ = nullptr;
188     grpc_transport_stream_op_batch op_;
189     // Share memory for cq_completion and notify_tag as they are never needed
190     // simultaneously. Each byte used in this data structure count as six bytes
191     // per call, so any savings we can make are worthwhile,
192 
193     // We use notify_tag to determine whether or not to send notification to the
194     // completion queue. Once we've made that determination, we can reuse the
195     // memory for cq_completion.
196     union {
197       grpc_cq_completion cq_completion;
198       struct {
199         // Any given op indicates completion by either (a) calling a closure or
200         // (b) sending a notification on the call's completion queue.  If
201         // \a is_closure is true, \a tag indicates a closure to be invoked;
202         // otherwise, \a tag indicates the tag to be used in the notification to
203         // be sent to the completion queue.
204         void* tag;
205         bool is_closure;
206       } notify_tag;
207     } completion_data_;
208     grpc_closure start_batch_;
209     grpc_closure finish_batch_;
210     std::atomic<intptr_t> ops_pending_{0};
211     AtomicError batch_error_;
set_pending_opsBatchControl212     void set_pending_ops(uintptr_t ops) {
213       ops_pending_.store(ops, std::memory_order_release);
214     }
completed_batch_stepBatchControl215     bool completed_batch_step(PendingOp op) {
216       auto mask = PendingOpMask(op);
217       auto r = ops_pending_.fetch_sub(mask, std::memory_order_acq_rel);
218       GRPC_TRACE_VLOG(call, 2)
219           << "BATCH:" << this << " COMPLETE:" << PendingOpString(mask)
220           << " REMAINING:" << PendingOpString(r & ~mask)
221           << " (tag:" << completion_data_.notify_tag.tag << ")";
222       CHECK_NE((r & mask), 0);
223       return r == mask;
224     }
225 
226     void PostCompletion();
227     void FinishStep(PendingOp op);
228     void ProcessDataAfterMetadata();
229     void ReceivingStreamReady(grpc_error_handle error);
230     void ReceivingInitialMetadataReady(grpc_error_handle error);
231     void ReceivingTrailingMetadataReady(grpc_error_handle error);
232     void FinishBatch(grpc_error_handle error);
233   };
234 
235   FilterStackCall(RefCountedPtr<Arena> arena,
236                   const grpc_call_create_args& args);
237 
238   static void ReleaseCall(void* call, grpc_error_handle);
239   static void DestroyCall(void* call, grpc_error_handle);
240 
FromCallStack(grpc_call_stack * call_stack)241   static FilterStackCall* FromCallStack(grpc_call_stack* call_stack) {
242     return reinterpret_cast<FilterStackCall*>(
243         reinterpret_cast<char*>(call_stack) -
244         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(FilterStackCall)));
245   }
246 
247   void ExecuteBatch(grpc_transport_stream_op_batch* batch,
248                     grpc_closure* start_batch_closure);
249   void SetFinalStatus(grpc_error_handle error);
250   BatchControl* ReuseOrAllocateBatchControl(const grpc_op* ops);
251   bool PrepareApplicationMetadata(size_t count, grpc_metadata* metadata,
252                                   bool is_trailing);
253   void PublishAppMetadata(grpc_metadata_batch* b, bool is_trailing);
254   void RecvInitialFilter(grpc_metadata_batch* b);
255   void RecvTrailingFilter(grpc_metadata_batch* b,
256                           grpc_error_handle batch_error);
257 
incoming_compression_algorithm()258   grpc_compression_algorithm incoming_compression_algorithm() override {
259     return incoming_compression_algorithm_;
260   }
SetIncomingCompressionAlgorithm(grpc_compression_algorithm algorithm)261   void SetIncomingCompressionAlgorithm(
262       grpc_compression_algorithm algorithm) override {
263     incoming_compression_algorithm_ = algorithm;
264   }
265 
266   RefCountedPtr<Channel> channel_;
267   RefCount ext_ref_;
268   CallCombiner call_combiner_;
269   grpc_completion_queue* cq_;
270   grpc_polling_entity pollent_;
271 
272   /// has grpc_call_unref been called
273   bool destroy_called_ = false;
274   // Trailers-only response status
275   bool is_trailers_only_ = false;
276   /// which ops are in-flight
277   bool sent_initial_metadata_ = false;
278   bool sending_message_ = false;
279   bool sent_final_op_ = false;
280   bool received_initial_metadata_ = false;
281   bool receiving_message_ = false;
282   bool requested_final_op_ = false;
283   gpr_atm received_final_op_atm_ = 0;
284 
285   BatchControl* active_batches_[kMaxConcurrentBatches] = {};
286   grpc_transport_stream_op_batch_payload stream_op_payload_;
287 
288   // first idx: is_receiving, second idx: is_trailing
289   grpc_metadata_batch send_initial_metadata_;
290   grpc_metadata_batch send_trailing_metadata_;
291   grpc_metadata_batch recv_initial_metadata_;
292   grpc_metadata_batch recv_trailing_metadata_;
293 
294   // Buffered read metadata waiting to be returned to the application.
295   // Element 0 is initial metadata, element 1 is trailing metadata.
296   grpc_metadata_array* buffered_metadata_[2] = {};
297 
298   // Call data useful used for reporting. Only valid after the call has
299   // completed
300   grpc_call_final_info final_info_;
301 
302   SliceBuffer send_slice_buffer_;
303   absl::optional<SliceBuffer> receiving_slice_buffer_;
304   uint32_t receiving_stream_flags_;
305   uint32_t test_only_last_message_flags_ = 0;
306   // Compression algorithm for *incoming* data
307   grpc_compression_algorithm incoming_compression_algorithm_ =
308       GRPC_COMPRESS_NONE;
309 
310   bool call_failed_before_recv_message_ = false;
311   grpc_byte_buffer** receiving_buffer_ = nullptr;
312   grpc_slice receiving_slice_ = grpc_empty_slice();
313   grpc_closure receiving_stream_ready_;
314   grpc_closure receiving_initial_metadata_ready_;
315   grpc_closure receiving_trailing_metadata_ready_;
316   // Status about operation of call
317   bool sent_server_trailing_metadata_ = false;
318   gpr_atm cancelled_with_error_ = 0;
319 
320   grpc_closure release_call_;
321 
322   union {
323     struct {
324       grpc_status_code* status;
325       grpc_slice* status_details;
326       const char** error_string;
327     } client;
328     struct {
329       int* cancelled;
330       // backpointer to owning server if this is a server side call.
331       ServerInterface* core_server;
332     } server;
333   } final_op_;
334   AtomicError status_error_;
335 
336   // recv_state can contain one of the following values:
337   // RECV_NONE :                 :  no initial metadata and messages received
338   // RECV_INITIAL_METADATA_FIRST :  received initial metadata first
339   // a batch_control*            :  received messages first
340 
341   //             +------1------RECV_NONE------3-----+
342   //             |                                  |
343   //             |                                  |
344   //             v                                  v
345   // RECV_INITIAL_METADATA_FIRST        receiving_stream_ready_bctlp
346   //       |           ^                      |           ^
347   //       |           |                      |           |
348   //       +-----2-----+                      +-----4-----+
349 
350   // For 1, 4: See receiving_initial_metadata_ready() function
351   // For 2, 3: See receiving_stream_ready() function
352   gpr_atm recv_state_ = 0;
353 };
354 
355 // Create a new call based on \a args.
356 // Regardless of success or failure, always returns a valid new call into *call
357 //
358 grpc_error_handle grpc_call_create(grpc_call_create_args* args,
359                                    grpc_call** call);
360 
361 // Given the top call_element, get the call object.
362 grpc_call* grpc_call_from_top_element(grpc_call_element* surface_element);
363 
364 }  // namespace grpc_core
365 
366 #endif  // GRPC_SRC_CORE_LIB_SURFACE_FILTER_STACK_CALL_H
367