• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
20 #define GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
21 
22 // A channel filter defines how operations on a channel are implemented.
23 // Channel filters are chained together to create full channels, and if those
24 // chains are linear, then channel stacks provide a mechanism to minimize
25 // allocations for that chain.
26 // Call stacks are created by channel stacks and represent the per-call data
27 // for that stack.
28 
29 // Implementations should take care of the following details for a batch -
30 // 1. Synchronization is achieved with a CallCombiner. View
31 // src/core/lib/iomgr/call_combiner.h for more details.
32 // 2. If the filter wants to inject an error on the way down, it needs to call
33 // grpc_transport_stream_op_batch_finish_with_failure from within the call
34 // combiner. This will cause any batch callbacks to be called with that error.
35 // 3. If the filter wants to inject an error on the way up (from a callback), it
36 // should also inject that error in the recv_trailing_metadata callback so that
37 // it can have an effect on the call status.
38 //
39 
40 #include <grpc/event_engine/event_engine.h>
41 #include <grpc/grpc.h>
42 #include <grpc/slice.h>
43 #include <grpc/status.h>
44 #include <grpc/support/port_platform.h>
45 #include <grpc/support/time.h>
46 #include <stddef.h>
47 
48 #include <functional>
49 #include <memory>
50 
51 #include "src/core/lib/channel/channel_args.h"
52 #include "src/core/lib/channel/channel_fwd.h"
53 #include "src/core/lib/debug/trace.h"
54 #include "src/core/lib/iomgr/call_combiner.h"
55 #include "src/core/lib/iomgr/closure.h"
56 #include "src/core/lib/iomgr/error.h"
57 #include "src/core/lib/iomgr/polling_entity.h"
58 #include "src/core/lib/promise/arena_promise.h"
59 #include "src/core/lib/resource_quota/arena.h"
60 #include "src/core/lib/transport/call_final_info.h"
61 #include "src/core/lib/transport/transport.h"
62 #include "src/core/telemetry/metrics.h"
63 #include "src/core/util/manual_constructor.h"
64 #include "src/core/util/ref_counted_ptr.h"
65 #include "src/core/util/time.h"
66 #include "src/core/util/time_precise.h"
67 #include "src/core/util/unique_type_name.h"
68 
69 namespace grpc_core {
70 class Blackboard;
71 }  // namespace grpc_core
72 
73 struct grpc_channel_element_args {
74   grpc_channel_stack* channel_stack;
75   grpc_core::ChannelArgs channel_args;
76   int is_first;
77   int is_last;
78   const grpc_core::Blackboard* old_blackboard;
79   grpc_core::Blackboard* new_blackboard;
80 };
81 struct grpc_call_element_args {
82   grpc_call_stack* call_stack;
83   const void* server_transport_data;
84   const grpc_slice& path;
85   gpr_cycle_counter start_time;  // Note: not populated in subchannel stack.
86   grpc_core::Timestamp deadline;
87   grpc_core::Arena* arena;
88   grpc_core::CallCombiner* call_combiner;
89 };
90 
91 // Channel filters specify:
92 // 1. the amount of memory needed in the channel & call (via the sizeof_XXX
93 //    members)
94 // 2. functions to initialize and destroy channel & call data
95 //    (init_XXX, destroy_XXX)
96 // 3. functions to implement call operations and channel operations (call_op,
97 //    channel_op)
98 // 4. a name, which is useful when debugging
99 
100 // Members are laid out in approximate frequency of use order.
101 struct grpc_channel_filter {
102   // Called to eg. send/receive data on a call.
103   // See grpc_call_next_op on how to call the next element in the stack
104   void (*start_transport_stream_op_batch)(grpc_call_element* elem,
105                                           grpc_transport_stream_op_batch* op);
106   // Called to handle channel level operations - e.g. new calls, or transport
107   // closure.
108   // See grpc_channel_next_op on how to call the next element in the stack
109   void (*start_transport_op)(grpc_channel_element* elem, grpc_transport_op* op);
110 
111   // sizeof(per call data)
112   size_t sizeof_call_data;
113   // Initialize per call data.
114   // elem is initialized at the start of the call, and elem->call_data is what
115   // needs initializing.
116   // The filter does not need to do any chaining.
117   // server_transport_data is an opaque pointer. If it is NULL, this call is
118   // on a client; if it is non-NULL, then it points to memory owned by the
119   // transport and is on the server. Most filters want to ignore this
120   // argument.
121   // Implementations may assume that elem->call_data is all zeros.
122   grpc_error_handle (*init_call_elem)(grpc_call_element* elem,
123                                       const grpc_call_element_args* args);
124   void (*set_pollset_or_pollset_set)(grpc_call_element* elem,
125                                      grpc_polling_entity* pollent);
126   // Destroy per call data.
127   // The filter does not need to do any chaining.
128   // The bottom filter of a stack will be passed a non-NULL pointer to
129   // \a then_schedule_closure that should be passed to GRPC_CLOSURE_SCHED when
130   // destruction is complete. \a final_info contains data about the completed
131   // call, mainly for reporting purposes.
132   void (*destroy_call_elem)(grpc_call_element* elem,
133                             const grpc_call_final_info* final_info,
134                             grpc_closure* then_schedule_closure);
135 
136   // sizeof(per channel data)
137   size_t sizeof_channel_data;
138   // Initialize per-channel data.
139   // elem is initialized at the creating of the channel, and elem->channel_data
140   // is what needs initializing.
141   // is_first, is_last designate this elements position in the stack, and are
142   // useful for asserting correct configuration by upper layer code.
143   // The filter does not need to do any chaining.
144   // Implementations may assume that elem->channel_data is all zeros.
145   grpc_error_handle (*init_channel_elem)(grpc_channel_element* elem,
146                                          grpc_channel_element_args* args);
147   // Post init per-channel data.
148   // Called after all channel elements have been successfully created.
149   void (*post_init_channel_elem)(grpc_channel_stack* stk,
150                                  grpc_channel_element* elem);
151   // Destroy per channel data.
152   // The filter does not need to do any chaining
153   void (*destroy_channel_elem)(grpc_channel_element* elem);
154 
155   // Implement grpc_channel_get_info()
156   void (*get_channel_info)(grpc_channel_element* elem,
157                            const grpc_channel_info* channel_info);
158 
159   // The name of this filter
160   grpc_core::UniqueTypeName name;
161 };
162 // A channel_element tracks its filter and the filter requested memory within
163 // a channel allocation
164 struct grpc_channel_element {
165   const grpc_channel_filter* filter;
166   void* channel_data;
167 };
168 
169 // A call_element tracks its filter, the filter requested memory within
170 // a channel allocation, and the filter requested memory within a call
171 // allocation
172 struct grpc_call_element {
173   const grpc_channel_filter* filter;
174   void* channel_data;
175   void* call_data;
176 };
177 
178 // A channel stack tracks a set of related filters for one channel, and
179 // guarantees they live within a single malloc() allocation
180 struct grpc_channel_stack {
181   grpc_stream_refcount refcount;
182   size_t count;
183   // Memory required for a call stack (computed at channel stack
184   // initialization)
185   size_t call_stack_size;
186   // TODO(ctiller): remove this mechanism... it's a hack to allow
187   // Channel to be separated from grpc_channel_stack's allocation. As the
188   // promise conversion continues, we'll reconsider what grpc_channel_stack
189   // should look like and this can go.
190   grpc_core::ManualConstructor<std::function<void()>> on_destroy;
191 
192   grpc_core::ManualConstructor<
193       std::shared_ptr<grpc_event_engine::experimental::EventEngine>>
194       event_engine;
195 
EventEnginegrpc_channel_stack196   grpc_event_engine::experimental::EventEngine* EventEngine() const {
197     return event_engine->get();
198   }
199 
200   grpc_core::ManualConstructor<
201       grpc_core::GlobalStatsPluginRegistry::StatsPluginGroup>
202       stats_plugin_group;
203 
204   // Minimal infrastructure to act like a RefCounted thing without converting
205   // everything.
206   // It's likely that we'll want to replace grpc_channel_stack with something
207   // less regimented once the promise conversion completes, so avoiding doing a
208   // full C++-ification for now.
209   void IncrementRefCount();
210   void Unref();
211   void Unref(const grpc_core::DebugLocation& location, const char* reason);
Refgrpc_channel_stack212   grpc_core::RefCountedPtr<grpc_channel_stack> Ref() {
213     IncrementRefCount();
214     return grpc_core::RefCountedPtr<grpc_channel_stack>(this);
215   }
216 };
217 
218 // A call stack tracks a set of related filters for one call, and guarantees
219 // they live within a single malloc() allocation
220 struct grpc_call_stack {
221   // shared refcount for this channel stack.
222   // MUST be the first element: the underlying code calls destroy
223   // with the address of the refcount, but higher layers prefer to think
224   // about the address of the call stack itself.
225   grpc_stream_refcount refcount;
226   size_t count;
227 
228   // Minimal infrastructure to act like a RefCounted thing without converting
229   // everything.
230   // grpc_call_stack will be eliminated once the promise conversion completes.
231   void IncrementRefCount();
232   void Unref();
Refgrpc_call_stack233   grpc_core::RefCountedPtr<grpc_call_stack> Ref() {
234     IncrementRefCount();
235     return grpc_core::RefCountedPtr<grpc_call_stack>(this);
236   }
237 };
238 
239 // Get a channel element given a channel stack and its index
240 grpc_channel_element* grpc_channel_stack_element(grpc_channel_stack* stack,
241                                                  size_t i);
242 // Get the last channel element in a channel stack
243 grpc_channel_element* grpc_channel_stack_last_element(
244     grpc_channel_stack* stack);
245 
246 // A utility function for a filter to determine how many other instances
247 // of the same filter exist above it in the same stack.  Intended to be
248 // used in the filter's init_channel_elem() method.
249 size_t grpc_channel_stack_filter_instance_number(
250     grpc_channel_stack* channel_stack, grpc_channel_element* elem);
251 
252 // Get a call stack element given a call stack and an index
253 grpc_call_element* grpc_call_stack_element(grpc_call_stack* stack, size_t i);
254 
255 // Determine memory required for a channel stack containing a set of filters
256 size_t grpc_channel_stack_size(const grpc_channel_filter** filters,
257                                size_t filter_count);
258 // Initialize a channel stack given some filters
259 grpc_error_handle grpc_channel_stack_init(
260     int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
261     const grpc_channel_filter** filters, size_t filter_count,
262     const grpc_core::ChannelArgs& args, const char* name,
263     grpc_channel_stack* stack,
264     const grpc_core::Blackboard* old_blackboard = nullptr,
265     grpc_core::Blackboard* new_blackboard = nullptr);
266 // Destroy a channel stack
267 void grpc_channel_stack_destroy(grpc_channel_stack* stack);
268 
269 // Initialize a call stack given a channel stack. transport_server_data is
270 // expected to be NULL on a client, or an opaque transport owned pointer on the
271 // server.
272 grpc_error_handle grpc_call_stack_init(grpc_channel_stack* channel_stack,
273                                        int initial_refs,
274                                        grpc_iomgr_cb_func destroy,
275                                        void* destroy_arg,
276                                        const grpc_call_element_args* elem_args);
277 // Set a pollset or a pollset_set for a call stack: must occur before the first
278 // op is started
279 void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack,
280                                                 grpc_polling_entity* pollent);
281 
282 #ifndef NDEBUG
283 #define GRPC_CALL_STACK_REF(call_stack, reason) \
284   grpc_stream_ref(&(call_stack)->refcount, reason)
285 #define GRPC_CALL_STACK_UNREF(call_stack, reason) \
286   grpc_stream_unref(&(call_stack)->refcount, reason)
287 #define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
288   grpc_stream_ref(&(channel_stack)->refcount, reason)
289 #define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \
290   grpc_stream_unref(&(channel_stack)->refcount, reason)
291 #else
292 #define GRPC_CALL_STACK_REF(call_stack, reason) \
293   do {                                          \
294     grpc_stream_ref(&(call_stack)->refcount);   \
295     (void)(reason);                             \
296   } while (0);
297 #define GRPC_CALL_STACK_UNREF(call_stack, reason) \
298   do {                                            \
299     grpc_stream_unref(&(call_stack)->refcount);   \
300     (void)(reason);                               \
301   } while (0);
302 #define GRPC_CHANNEL_STACK_REF(channel_stack, reason) \
303   do {                                                \
304     grpc_stream_ref(&(channel_stack)->refcount);      \
305     (void)(reason);                                   \
306   } while (0);
307 #define GRPC_CHANNEL_STACK_UNREF(channel_stack, reason) \
308   do {                                                  \
309     grpc_stream_unref(&(channel_stack)->refcount);      \
310     (void)(reason);                                     \
311   } while (0);
312 #endif
313 
IncrementRefCount()314 inline void grpc_channel_stack::IncrementRefCount() {
315   GRPC_CHANNEL_STACK_REF(this, "smart_pointer");
316 }
317 
Unref()318 inline void grpc_channel_stack::Unref() {
319   GRPC_CHANNEL_STACK_UNREF(this, "smart_pointer");
320 }
321 
Unref(const grpc_core::DebugLocation &,const char * reason)322 inline void grpc_channel_stack::Unref(const grpc_core::DebugLocation&,
323                                       const char* reason) {
324   GRPC_CHANNEL_STACK_UNREF(this, reason);
325 }
326 
IncrementRefCount()327 inline void grpc_call_stack::IncrementRefCount() {
328   GRPC_CALL_STACK_REF(this, "smart_pointer");
329 }
330 
Unref()331 inline void grpc_call_stack::Unref() {
332   GRPC_CALL_STACK_UNREF(this, "smart_pointer");
333 }
334 
335 // Destroy a call stack
336 void grpc_call_stack_destroy(grpc_call_stack* stack,
337                              const grpc_call_final_info* final_info,
338                              grpc_closure* then_schedule_closure);
339 
340 // Ignore set pollset{_set} - used by filters if they don't care about pollsets
341 // at all. Does nothing.
342 void grpc_call_stack_ignore_set_pollset_or_pollset_set(
343     grpc_call_element* elem, grpc_polling_entity* pollent);
344 // Call the next operation in a call stack
345 void grpc_call_next_op(grpc_call_element* elem,
346                        grpc_transport_stream_op_batch* op);
347 // Call the next operation (depending on call directionality) in a channel
348 // stack
349 void grpc_channel_next_op(grpc_channel_element* elem, grpc_transport_op* op);
350 // Pass through a request to get_channel_info() to the next child element
351 void grpc_channel_next_get_info(grpc_channel_element* elem,
352                                 const grpc_channel_info* channel_info);
353 
354 // Given the top element of a channel stack, get the channel stack itself
355 grpc_channel_stack* grpc_channel_stack_from_top_element(
356     grpc_channel_element* elem);
357 // Given the top element of a call stack, get the call stack itself
358 grpc_call_stack* grpc_call_stack_from_top_element(grpc_call_element* elem);
359 
360 void grpc_channel_stack_no_post_init(grpc_channel_stack* stk,
361                                      grpc_channel_element* elem);
362 
363 #endif  // GRPC_SRC_CORE_LIB_CHANNEL_CHANNEL_STACK_H
364