1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/channel/channel_stack.h"
20
21 #include <grpc/support/port_platform.h>
22 #include <stdint.h>
23
24 #include <memory>
25 #include <utility>
26
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/channel/channel_fwd.h"
31 #include "src/core/lib/surface/channel_init.h"
32 #include "src/core/util/alloc.h"
33
34 using grpc_event_engine::experimental::EventEngine;
35
__anon817a871e0102() 36 static int register_get_name_fn = []() {
37 grpc_core::NameFromChannelFilter = [](const grpc_channel_filter* filter) {
38 return filter->name;
39 };
40 return 0;
41 }();
42
43 // Memory layouts.
44
45 // Channel stack is laid out as: {
46 // grpc_channel_stack stk;
47 // padding to GPR_MAX_ALIGNMENT
48 // grpc_channel_element[stk.count];
49 // per-filter memory, aligned to GPR_MAX_ALIGNMENT
50 // }
51
52 // Call stack is laid out as: {
53 // grpc_call_stack stk;
54 // padding to GPR_MAX_ALIGNMENT
55 // grpc_call_element[stk.count];
56 // per-filter memory, aligned to GPR_MAX_ALIGNMENT
57 // }
58
grpc_channel_stack_size(const grpc_channel_filter ** filters,size_t filter_count)59 size_t grpc_channel_stack_size(const grpc_channel_filter** filters,
60 size_t filter_count) {
61 // always need the header, and size for the channel elements
62 size_t size = GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)) +
63 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filter_count *
64 sizeof(grpc_channel_element));
65 size_t i;
66
67 CHECK((GPR_MAX_ALIGNMENT & (GPR_MAX_ALIGNMENT - 1)) == 0)
68 << "GPR_MAX_ALIGNMENT must be a power of two";
69
70 // add the size for each filter
71 for (i = 0; i < filter_count; i++) {
72 size += GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
73 }
74
75 return size;
76 }
77
78 #define CHANNEL_ELEMS_FROM_STACK(stk) \
79 ((grpc_channel_element*)((char*)(stk) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
80 sizeof(grpc_channel_stack))))
81
82 #define CALL_ELEMS_FROM_STACK(stk) \
83 ((grpc_call_element*)((char*)(stk) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE( \
84 sizeof(grpc_call_stack))))
85
grpc_channel_stack_element(grpc_channel_stack * channel_stack,size_t index)86 grpc_channel_element* grpc_channel_stack_element(
87 grpc_channel_stack* channel_stack, size_t index) {
88 return CHANNEL_ELEMS_FROM_STACK(channel_stack) + index;
89 }
90
grpc_channel_stack_last_element(grpc_channel_stack * channel_stack)91 grpc_channel_element* grpc_channel_stack_last_element(
92 grpc_channel_stack* channel_stack) {
93 return grpc_channel_stack_element(channel_stack, channel_stack->count - 1);
94 }
95
grpc_channel_stack_filter_instance_number(grpc_channel_stack * channel_stack,grpc_channel_element * elem)96 size_t grpc_channel_stack_filter_instance_number(
97 grpc_channel_stack* channel_stack, grpc_channel_element* elem) {
98 size_t num_found = 0;
99 for (size_t i = 0; i < channel_stack->count; ++i) {
100 grpc_channel_element* element =
101 grpc_channel_stack_element(channel_stack, i);
102 if (element == elem) break;
103 if (element->filter == elem->filter) ++num_found;
104 }
105 return num_found;
106 }
107
grpc_call_stack_element(grpc_call_stack * call_stack,size_t index)108 grpc_call_element* grpc_call_stack_element(grpc_call_stack* call_stack,
109 size_t index) {
110 return CALL_ELEMS_FROM_STACK(call_stack) + index;
111 }
112
grpc_channel_stack_init(int initial_refs,grpc_iomgr_cb_func destroy,void * destroy_arg,const grpc_channel_filter ** filters,size_t filter_count,const grpc_core::ChannelArgs & channel_args,const char * name,grpc_channel_stack * stack,const grpc_core::Blackboard * old_blackboard,grpc_core::Blackboard * new_blackboard)113 grpc_error_handle grpc_channel_stack_init(
114 int initial_refs, grpc_iomgr_cb_func destroy, void* destroy_arg,
115 const grpc_channel_filter** filters, size_t filter_count,
116 const grpc_core::ChannelArgs& channel_args, const char* name,
117 grpc_channel_stack* stack, const grpc_core::Blackboard* old_blackboard,
118 grpc_core::Blackboard* new_blackboard) {
119 if (GRPC_TRACE_FLAG_ENABLED(channel_stack)) {
120 LOG(INFO) << "CHANNEL_STACK: init " << name;
121 for (size_t i = 0; i < filter_count; i++) {
122 LOG(INFO) << "CHANNEL_STACK: filter " << filters[i]->name;
123 }
124 }
125
126 stack->on_destroy.Init([]() {});
127 stack->event_engine.Init(channel_args.GetObjectRef<EventEngine>());
128 stack->stats_plugin_group.Init();
129
130 size_t call_size =
131 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)) +
132 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filter_count * sizeof(grpc_call_element));
133 grpc_channel_element* elems;
134 grpc_channel_element_args args;
135 char* user_data;
136 size_t i;
137
138 stack->count = filter_count;
139 GRPC_STREAM_REF_INIT(&stack->refcount, initial_refs, destroy, destroy_arg,
140 name);
141 elems = CHANNEL_ELEMS_FROM_STACK(stack);
142 user_data = (reinterpret_cast<char*>(elems)) +
143 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filter_count *
144 sizeof(grpc_channel_element));
145
146 // init per-filter data
147 args.old_blackboard = old_blackboard;
148 args.new_blackboard = new_blackboard;
149 grpc_error_handle first_error;
150 for (i = 0; i < filter_count; i++) {
151 args.channel_stack = stack;
152 args.channel_args = channel_args;
153 args.is_first = i == 0;
154 args.is_last = i == (filter_count - 1);
155 elems[i].filter = filters[i];
156 elems[i].channel_data = user_data;
157 grpc_error_handle error =
158 elems[i].filter->init_channel_elem(&elems[i], &args);
159 if (!error.ok()) {
160 if (first_error.ok()) {
161 first_error = error;
162 }
163 }
164 user_data +=
165 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
166 call_size += GPR_ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
167 }
168
169 CHECK(user_data > (char*)stack);
170 CHECK((uintptr_t)(user_data - (char*)stack) ==
171 grpc_channel_stack_size(filters, filter_count));
172
173 stack->call_stack_size = call_size;
174 return first_error;
175 }
176
grpc_channel_stack_destroy(grpc_channel_stack * stack)177 void grpc_channel_stack_destroy(grpc_channel_stack* stack) {
178 grpc_channel_element* channel_elems = CHANNEL_ELEMS_FROM_STACK(stack);
179 size_t count = stack->count;
180 size_t i;
181
182 // destroy per-filter data
183 for (i = 0; i < count; i++) {
184 channel_elems[i].filter->destroy_channel_elem(&channel_elems[i]);
185 }
186
187 (*stack->on_destroy)();
188 stack->on_destroy.Destroy();
189 stack->event_engine.Destroy();
190 stack->stats_plugin_group.Destroy();
191 }
192
grpc_call_stack_init(grpc_channel_stack * channel_stack,int initial_refs,grpc_iomgr_cb_func destroy,void * destroy_arg,const grpc_call_element_args * elem_args)193 grpc_error_handle grpc_call_stack_init(
194 grpc_channel_stack* channel_stack, int initial_refs,
195 grpc_iomgr_cb_func destroy, void* destroy_arg,
196 const grpc_call_element_args* elem_args) {
197 grpc_channel_element* channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
198 size_t count = channel_stack->count;
199 grpc_call_element* call_elems;
200 char* user_data;
201
202 elem_args->call_stack->count = count;
203 GRPC_STREAM_REF_INIT(&elem_args->call_stack->refcount, initial_refs, destroy,
204 destroy_arg, "CALL_STACK");
205 call_elems = CALL_ELEMS_FROM_STACK(elem_args->call_stack);
206 user_data = (reinterpret_cast<char*>(call_elems)) +
207 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
208
209 // init per-filter data
210 grpc_error_handle first_error;
211 for (size_t i = 0; i < count; i++) {
212 call_elems[i].filter = channel_elems[i].filter;
213 call_elems[i].channel_data = channel_elems[i].channel_data;
214 call_elems[i].call_data = user_data;
215 user_data +=
216 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
217 }
218 for (size_t i = 0; i < count; i++) {
219 grpc_error_handle error =
220 call_elems[i].filter->init_call_elem(&call_elems[i], elem_args);
221 if (!error.ok()) {
222 if (first_error.ok()) {
223 first_error = error;
224 }
225 }
226 }
227 return first_error;
228 }
229
grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack * call_stack,grpc_polling_entity * pollent)230 void grpc_call_stack_set_pollset_or_pollset_set(grpc_call_stack* call_stack,
231 grpc_polling_entity* pollent) {
232 size_t count = call_stack->count;
233 grpc_call_element* call_elems;
234 size_t i;
235
236 call_elems = CALL_ELEMS_FROM_STACK(call_stack);
237
238 // init per-filter data
239 for (i = 0; i < count; i++) {
240 call_elems[i].filter->set_pollset_or_pollset_set(&call_elems[i], pollent);
241 }
242 }
243
grpc_call_stack_ignore_set_pollset_or_pollset_set(grpc_call_element *,grpc_polling_entity *)244 void grpc_call_stack_ignore_set_pollset_or_pollset_set(
245 grpc_call_element* /*elem*/, grpc_polling_entity* /*pollent*/) {}
246
grpc_call_stack_destroy(grpc_call_stack * stack,const grpc_call_final_info * final_info,grpc_closure * then_schedule_closure)247 void grpc_call_stack_destroy(grpc_call_stack* stack,
248 const grpc_call_final_info* final_info,
249 grpc_closure* then_schedule_closure) {
250 grpc_call_element* elems = CALL_ELEMS_FROM_STACK(stack);
251 size_t count = stack->count;
252 size_t i;
253
254 // destroy per-filter data
255 for (i = 0; i < count; i++) {
256 elems[i].filter->destroy_call_elem(
257 &elems[i], final_info,
258 i == count - 1 ? then_schedule_closure : nullptr);
259 }
260 }
261
grpc_call_next_op(grpc_call_element * elem,grpc_transport_stream_op_batch * op)262 void grpc_call_next_op(grpc_call_element* elem,
263 grpc_transport_stream_op_batch* op) {
264 grpc_call_element* next_elem = elem + 1;
265 GRPC_TRACE_LOG(channel, INFO)
266 << "OP[" << elem->filter->name << ":" << elem
267 << "]: " << grpc_transport_stream_op_batch_string(op, false);
268 next_elem->filter->start_transport_stream_op_batch(next_elem, op);
269 }
270
grpc_channel_next_get_info(grpc_channel_element * elem,const grpc_channel_info * channel_info)271 void grpc_channel_next_get_info(grpc_channel_element* elem,
272 const grpc_channel_info* channel_info) {
273 grpc_channel_element* next_elem = elem + 1;
274 next_elem->filter->get_channel_info(next_elem, channel_info);
275 }
276
grpc_channel_next_op(grpc_channel_element * elem,grpc_transport_op * op)277 void grpc_channel_next_op(grpc_channel_element* elem, grpc_transport_op* op) {
278 grpc_channel_element* next_elem = elem + 1;
279 next_elem->filter->start_transport_op(next_elem, op);
280 }
281
grpc_channel_stack_from_top_element(grpc_channel_element * elem)282 grpc_channel_stack* grpc_channel_stack_from_top_element(
283 grpc_channel_element* elem) {
284 return reinterpret_cast<grpc_channel_stack*>(
285 reinterpret_cast<char*>(elem) -
286 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_channel_stack)));
287 }
288
grpc_call_stack_from_top_element(grpc_call_element * elem)289 grpc_call_stack* grpc_call_stack_from_top_element(grpc_call_element* elem) {
290 return reinterpret_cast<grpc_call_stack*>(
291 reinterpret_cast<char*>(elem) -
292 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_call_stack)));
293 }
294
grpc_channel_stack_no_post_init(grpc_channel_stack *,grpc_channel_element *)295 void grpc_channel_stack_no_post_init(grpc_channel_stack*,
296 grpc_channel_element*) {}
297