• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/channel/channel_stack.h"
20 
21 #include <grpc/support/port_platform.h>
22 #include <stdint.h>
23 
24 #include <memory>
25 #include <utility>
26 
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/channel/channel_fwd.h"
31 #include "src/core/lib/surface/channel_init.h"
32 #include "src/core/util/alloc.h"
33 
34 using grpc_event_engine::experimental::EventEngine;
35 
__anon817a871e0102() 36 static int register_get_name_fn = []() {
37   grpc_core::NameFromChannelFilter = [](const grpc_channel_filter* filter) {
38     return filter->name;
39   };
40   return 0;
41 }();
42 
43 // Memory layouts.
44 
45 // Channel stack is laid out as: {
46 //   grpc_channel_stack stk;
47 //   padding to GPR_MAX_ALIGNMENT
48 //   grpc_channel_element[stk.count];
49 //   per-filter memory, aligned to GPR_MAX_ALIGNMENT
50 // }
51 
52 // Call stack is laid out as: {
53 //   grpc_call_stack stk;
54 //   padding to GPR_MAX_ALIGNMENT
55 //   grpc_call_element[stk.count];
56 //   per-filter memory, aligned to GPR_MAX_ALIGNMENT
57 // }
58 
grpc_channel_stack_size(const grpc_channel_filter ** filters,size_t filter_count)59 size_t grpc_channel_stack_size(const grpc_channel_filter** filters,
60                                size_t filter_count) {
61   // always need the header, and size for the channel elements
62   size_t size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)) +
63                 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filter_count *
64                                                sizeof(grpc_channel_element));
65   size_t i;
66 
67   CHECK((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0)
68       << "GPR_MAX_ALIGNMENT must be a power of two";
69 
70   // add the size for each filter
71   for (i = 0; i < filter_count; i++) {
72     size += GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
73   }
74 
75   return size;
76 }
77 
78 #define CHANNEL_ELEMS_FROM_STACK(stk)                                     \
79   ((grpc_channel_element*)((char*)(stk) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
80                                               sizeof(grpc_channel_stack))))
81 
82 #define CALL_ELEMS_FROM_STACK(stk)                                     \
83   ((grpc_call_element*)((char*)(stk) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
84                                            sizeof(grpc_call_stack))))
85 
grpc_channel_stack_element(grpc_channel_stack * channel_stack,size_t index)86 grpc_channel_element* grpc_channel_stack_element(
87     grpc_channel_stack* channel_stack, size_t index) {
88   return CHANNEL_ELEMS_FROM_STACK(channel_stack) + index;
89 }
90 
grpc_channel_stack_last_element(grpc_channel_stack * channel_stack)91 grpc_channel_element* grpc_channel_stack_last_element(
92     grpc_channel_stack* channel_stack) {
93   return grpc_channel_stack_element(channel_stack, channel_stack->count - 1);
94 }
95 
grpc_channel_stack_filter_instance_number(grpc_channel_stack * channel_stack,grpc_channel_element * elem)96 size_t grpc_channel_stack_filter_instance_number(
97     grpc_channel_stack* channel_stack, grpc_channel_element* elem) {
98   size_t num_found = 0;
99   for (size_t i = 0; i < channel_stack->count; ++i) {
100     grpc_channel_element* element =
101         grpc_channel_stack_element(channel_stack, i);
102     if (element == elem) break;
103     if (element->filter == elem->filter) ++num_found;
104   }
105   return num_found;
106 }
107 
grpc_call_stack_element(grpc_call_stack * call_stack,size_t index)108 grpc_call_element* grpc_call_stack_element(grpc_call_stack* call_stack,
109                                            size_t index) {
110   return CALL_ELEMS_FROM_STACK(call_stack) + index;
111 }
112 
grpc_channel_stack_init(int initial_refs,grpc_iomgr_cb_func destroy,void * destroy_arg,const grpc_channel_filter ** filters,size_t filter_count,const grpc_core::ChannelArgs & channel_args,const char * name,grpc_channel_stack * stack,const grpc_core::Blackboard * old_blackboard,grpc_core::Blackboard * new_blackboard)113 grpc_error_handle grpc_channel_stack_init(
114     int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
115     const grpc_channel_filter** filters, size_t filter_count,
116     const grpc_core::ChannelArgs& channel_args, const char* name,
117     grpc_channel_stack* stack, const grpc_core::Blackboard* old_blackboard,
118     grpc_core::Blackboard* new_blackboard) {
119   if (GRPC_TRACE_FLAG_ENABLED(channel_stack)) {
120     LOG(INFO) << "CHANNEL_STACK: init " << name;
121     for (size_t i = 0; i < filter_count; i++) {
122       LOG(INFO) << "CHANNEL_STACK:   filter " << filters[i]->name;
123     }
124   }
125 
126   stack->on_destroy.Init([]() {});
127   stack->event_engine.Init(channel_args.GetObjectRef<EventEngine>());
128   stack->stats_plugin_group.Init();
129 
130   size_t call_size =
131       GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
132       GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
133   grpc_channel_element* elems;
134   grpc_channel_element_args args;
135   char* user_data;
136   size_t i;
137 
138   stack->count = filter_count;
139   GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg,
140                        name);
141   elems = CHANNEL_ELEMS_FROM_STACK(stack);
142   user_data = (reinterpret_cast<char*>(elems)) +
143               GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filter_count *
144                                              sizeof(grpc_channel_element));
145 
146   // init per-filter data
147   args.old_blackboard = old_blackboard;
148   args.new_blackboard = new_blackboard;
149   grpc_error_handle first_error;
150   for (i = 0; i < filter_count; i++) {
151     args.channel_stack = stack;
152     args.channel_args = channel_args;
153     args.is_first = i == 0;
154     args.is_last = i == (filter_count - 1);
155     elems[i].filter = filters[i];
156     elems[i].channel_data = user_data;
157     grpc_error_handle error =
158         elems[i].filter->init_channel_elem(&elems[i], &args);
159     if (!error.ok()) {
160       if (first_error.ok()) {
161         first_error = error;
162       }
163     }
164     user_data +=
165         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
166     call_size += GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
167   }
168 
169   CHECK(user_data > (char*)stack);
170   CHECK((uintptr_t)(user_data - (char*)stack) ==
171         grpc_channel_stack_size(filters, filter_count));
172 
173   stack->call_stack_size = call_size;
174   return first_error;
175 }
176 
grpc_channel_stack_destroy(grpc_channel_stack * stack)177 void grpc_channel_stack_destroy(grpc_channel_stack* stack) {
178   grpc_channel_element* channel_elems = CHANNEL_ELEMS_FROM_STACK(stack);
179   size_t count = stack->count;
180   size_t i;
181 
182   // destroy per-filter data
183   for (i = 0; i < count; i++) {
184     channel_elems[i].filter->destroy_channel_elem(&channel_elems[i]);
185   }
186 
187   (*stack->on_destroy)();
188   stack->on_destroy.Destroy();
189   stack->event_engine.Destroy();
190   stack->stats_plugin_group.Destroy();
191 }
192 
grpc_call_stack_init(grpc_channel_stack * channel_stack,int initial_refs,grpc_iomgr_cb_func destroy,void * destroy_arg,const grpc_call_element_args * elem_args)193 grpc_error_handle grpc_call_stack_init(
194     grpc_channel_stack* channel_stack, int initial_refs,
195     grpc_iomgr_cb_func destroy, void* destroy_arg,
196     const grpc_call_element_args* elem_args) {
197   grpc_channel_element* channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
198   size_t count = channel_stack->count;
199   grpc_call_element* call_elems;
200   char* user_data;
201 
202   elem_args->call_stack->count = count;
203   GRPC_STREAM_REF_INIT(&elem_args->call_stack->refcount, initial_refs, destroy,
204                        destroy_arg, "CALL_STACK");
205   call_elems = CALL_ELEMS_FROM_STACK(elem_args->call_stack);
206   user_data = (reinterpret_cast<char*>(call_elems)) +
207               GPR_ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
208 
209   // init per-filter data
210   grpc_error_handle first_error;
211   for (size_t i = 0; i < count; i++) {
212     call_elems[i].filter = channel_elems[i].filter;
213     call_elems[i].channel_data = channel_elems[i].channel_data;
214     call_elems[i].call_data = user_data;
215     user_data +=
216         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
217   }
218   for (size_t i = 0; i < count; i++) {
219     grpc_error_handle error =
220         call_elems[i].filter->init_call_elem(&call_elems[i], elem_args);
221     if (!error.ok()) {
222       if (first_error.ok()) {
223         first_error = error;
224       }
225     }
226   }
227   return first_error;
228 }
229 
grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack * call_stack,grpc_polling_entity * pollent)230 void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack,
231                                                 grpc_polling_entity* pollent) {
232   size_t count = call_stack->count;
233   grpc_call_element* call_elems;
234   size_t i;
235 
236   call_elems = CALL_ELEMS_FROM_STACK(call_stack);
237 
238   // init per-filter data
239   for (i = 0; i < count; i++) {
240     call_elems[i].filter->set_pollset_or_pollset_set(&call_elems[i], pollent);
241   }
242 }
243 
grpc_call_stack_ignore_set_pollset_or_pollset_set(grpc_call_element *,grpc_polling_entity *)244 void grpc_call_stack_ignore_set_pollset_or_pollset_set(
245     grpc_call_element* /*elem*/, grpc_polling_entity* /*pollent*/) {}
246 
grpc_call_stack_destroy(grpc_call_stack * stack,const grpc_call_final_info * final_info,grpc_closure * then_schedule_closure)247 void grpc_call_stack_destroy(grpc_call_stack* stack,
248                              const grpc_call_final_info* final_info,
249                              grpc_closure* then_schedule_closure) {
250   grpc_call_element* elems = CALL_ELEMS_FROM_STACK(stack);
251   size_t count = stack->count;
252   size_t i;
253 
254   // destroy per-filter data
255   for (i = 0; i < count; i++) {
256     elems[i].filter->destroy_call_elem(
257         &elems[i], final_info,
258         i == count - 1 ? then_schedule_closure : nullptr);
259   }
260 }
261 
grpc_call_next_op(grpc_call_element * elem,grpc_transport_stream_op_batch * op)262 void grpc_call_next_op(grpc_call_element* elem,
263                        grpc_transport_stream_op_batch* op) {
264   grpc_call_element* next_elem = elem + 1;
265   GRPC_TRACE_LOG(channel, INFO)
266       << "OP[" << elem->filter->name << ":" << elem
267       << "]: " << grpc_transport_stream_op_batch_string(op, false);
268   next_elem->filter->start_transport_stream_op_batch(next_elem, op);
269 }
270 
grpc_channel_next_get_info(grpc_channel_element * elem,const grpc_channel_info * channel_info)271 void grpc_channel_next_get_info(grpc_channel_element* elem,
272                                 const grpc_channel_info* channel_info) {
273   grpc_channel_element* next_elem = elem + 1;
274   next_elem->filter->get_channel_info(next_elem, channel_info);
275 }
276 
grpc_channel_next_op(grpc_channel_element * elem,grpc_transport_op * op)277 void grpc_channel_next_op(grpc_channel_element* elem, grpc_transport_op* op) {
278   grpc_channel_element* next_elem = elem + 1;
279   next_elem->filter->start_transport_op(next_elem, op);
280 }
281 
grpc_channel_stack_from_top_element(grpc_channel_element * elem)282 grpc_channel_stack* grpc_channel_stack_from_top_element(
283     grpc_channel_element* elem) {
284   return reinterpret_cast<grpc_channel_stack*>(
285       reinterpret_cast<char*>(elem) -
286       GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)));
287 }
288 
grpc_call_stack_from_top_element(grpc_call_element * elem)289 grpc_call_stack* grpc_call_stack_from_top_element(grpc_call_element* elem) {
290   return reinterpret_cast<grpc_call_stack*>(
291       reinterpret_cast<char*>(elem) -
292       GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)));
293 }
294 
grpc_channel_stack_no_post_init(grpc_channel_stack *,grpc_channel_element *)295 void grpc_channel_stack_no_post_init(grpc_channel_stack*,
296                                      grpc_channel_element*) {}
297