1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/surface/channel_init.h"
20
21 #include <grpc/support/port_platform.h>
22 #include <string.h>
23
24 #include <algorithm>
25 #include <map>
26 #include <queue>
27 #include <set>
28 #include <string>
29 #include <type_traits>
30
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_join.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/types/optional.h"
37 #include "src/core/lib/debug/trace.h"
38 #include "src/core/lib/surface/channel_stack_type.h"
39 #include "src/core/util/crash.h"
40 #include "src/core/util/sync.h"
41 #include "src/core/util/unique_type_name.h"
42
43 namespace grpc_core {
44
45 UniqueTypeName (*NameFromChannelFilter)(const grpc_channel_filter*);
46
47 namespace {
48 struct CompareChannelFiltersByName {
operator ()grpc_core::__anon1aef240c0111::CompareChannelFiltersByName49 bool operator()(UniqueTypeName a, UniqueTypeName b) const {
50 // Compare lexicographically instead of by pointer value so that different
51 // builds make the same choices.
52 return a.name() < b.name();
53 }
54 };
55 } // namespace
56
After(std::initializer_list<UniqueTypeName> filters)57 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::After(
58 std::initializer_list<UniqueTypeName> filters) {
59 for (auto filter : filters) {
60 after_.push_back(filter);
61 }
62 return *this;
63 }
64
Before(std::initializer_list<UniqueTypeName> filters)65 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::Before(
66 std::initializer_list<UniqueTypeName> filters) {
67 for (auto filter : filters) {
68 before_.push_back(filter);
69 }
70 return *this;
71 }
72
If(InclusionPredicate predicate)73 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::If(
74 InclusionPredicate predicate) {
75 predicates_.emplace_back(std::move(predicate));
76 return *this;
77 }
78
IfNot(InclusionPredicate predicate)79 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::IfNot(
80 InclusionPredicate predicate) {
81 predicates_.emplace_back(
82 [predicate = std::move(predicate)](const ChannelArgs& args) {
83 return !predicate(args);
84 });
85 return *this;
86 }
87
88 ChannelInit::FilterRegistration&
IfHasChannelArg(const char * arg)89 ChannelInit::FilterRegistration::IfHasChannelArg(const char* arg) {
90 return If([arg](const ChannelArgs& args) { return args.Contains(arg); });
91 }
92
IfChannelArg(const char * arg,bool default_value)93 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::IfChannelArg(
94 const char* arg, bool default_value) {
95 return If([arg, default_value](const ChannelArgs& args) {
96 return args.GetBool(arg).value_or(default_value);
97 });
98 }
99
100 ChannelInit::FilterRegistration&
ExcludeFromMinimalStack()101 ChannelInit::FilterRegistration::ExcludeFromMinimalStack() {
102 return If([](const ChannelArgs& args) { return !args.WantMinimalStack(); });
103 }
104
RegisterFilter(grpc_channel_stack_type type,UniqueTypeName name,const grpc_channel_filter * filter,FilterAdder filter_adder,SourceLocation registration_source)105 ChannelInit::FilterRegistration& ChannelInit::Builder::RegisterFilter(
106 grpc_channel_stack_type type, UniqueTypeName name,
107 const grpc_channel_filter* filter, FilterAdder filter_adder,
108 SourceLocation registration_source) {
109 filters_[type].emplace_back(std::make_unique<FilterRegistration>(
110 name, filter, filter_adder, registration_source));
111 return *filters_[type].back();
112 }
113
114 class ChannelInit::DependencyTracker {
115 public:
116 // Declare that a filter exists.
Declare(FilterRegistration * registration)117 void Declare(FilterRegistration* registration) {
118 nodes_.emplace(registration->name_, registration);
119 }
120 // Insert an edge from a to b
121 // Both nodes must be declared.
InsertEdge(UniqueTypeName a,UniqueTypeName b)122 void InsertEdge(UniqueTypeName a, UniqueTypeName b) {
123 auto it_a = nodes_.find(a);
124 auto it_b = nodes_.find(b);
125 if (it_a == nodes_.end()) {
126 GRPC_TRACE_LOG(channel_stack, INFO)
127 << "gRPC Filter " << a.name()
128 << " was not declared before adding an edge to " << b.name();
129 return;
130 }
131 if (it_b == nodes_.end()) {
132 GRPC_TRACE_LOG(channel_stack, INFO)
133 << "gRPC Filter " << b.name()
134 << " was not declared before adding an edge from " << a.name();
135 return;
136 }
137 auto& node_a = it_a->second;
138 auto& node_b = it_b->second;
139 node_a.dependents.push_back(&node_b);
140 node_b.all_dependencies.push_back(a);
141 ++node_b.waiting_dependencies;
142 }
143
144 // Finish the dependency graph and begin iteration.
FinishDependencyMap()145 void FinishDependencyMap() {
146 for (auto& p : nodes_) {
147 if (p.second.waiting_dependencies == 0) {
148 ready_dependencies_.emplace(&p.second);
149 }
150 }
151 }
152
Next()153 FilterRegistration* Next() {
154 if (ready_dependencies_.empty()) {
155 CHECK_EQ(nodes_taken_, nodes_.size()) << "Unresolvable graph of channel "
156 "filters:\n"
157 << GraphString();
158 return nullptr;
159 }
160 auto next = ready_dependencies_.top();
161 ready_dependencies_.pop();
162 if (!ready_dependencies_.empty() &&
163 next.node->ordering() != Ordering::kDefault) {
164 // Constraint: if we use ordering other than default, then we must have an
165 // unambiguous pick. If there is ambiguity, we must fix it by adding
166 // explicit ordering constraints.
167 CHECK_NE(next.node->ordering(),
168 ready_dependencies_.top().node->ordering())
169 << "Ambiguous ordering between " << next.node->name() << " and "
170 << ready_dependencies_.top().node->name();
171 }
172 for (Node* dependent : next.node->dependents) {
173 CHECK_GT(dependent->waiting_dependencies, 0u);
174 --dependent->waiting_dependencies;
175 if (dependent->waiting_dependencies == 0) {
176 ready_dependencies_.emplace(dependent);
177 }
178 }
179 ++nodes_taken_;
180 return next.node->registration;
181 }
182
183 // Debug helper to dump the graph
GraphString() const184 std::string GraphString() const {
185 std::string result;
186 for (const auto& p : nodes_) {
187 absl::StrAppend(&result, p.first, " ->");
188 for (const auto& d : p.second.all_dependencies) {
189 absl::StrAppend(&result, " ", d);
190 }
191 absl::StrAppend(&result, "\n");
192 }
193 return result;
194 }
195
DependenciesFor(UniqueTypeName name) const196 absl::Span<const UniqueTypeName> DependenciesFor(UniqueTypeName name) const {
197 auto it = nodes_.find(name);
198 CHECK(it != nodes_.end()) << "Filter " << name.name() << " not found";
199 return it->second.all_dependencies;
200 }
201
202 private:
203 struct Node {
Nodegrpc_core::ChannelInit::DependencyTracker::Node204 explicit Node(FilterRegistration* registration)
205 : registration(registration) {}
206 // Nodes that depend on this node
207 std::vector<Node*> dependents;
208 // Nodes that this node depends on - for debugging purposes only
209 std::vector<UniqueTypeName> all_dependencies;
210 // The registration for this node
211 FilterRegistration* registration;
212 // Number of nodes this node is waiting on
213 size_t waiting_dependencies = 0;
214
orderinggrpc_core::ChannelInit::DependencyTracker::Node215 Ordering ordering() const { return registration->ordering_; }
namegrpc_core::ChannelInit::DependencyTracker::Node216 absl::string_view name() const { return registration->name_.name(); }
217 };
218 struct ReadyDependency {
ReadyDependencygrpc_core::ChannelInit::DependencyTracker::ReadyDependency219 explicit ReadyDependency(Node* node) : node(node) {}
220 Node* node;
operator <grpc_core::ChannelInit::DependencyTracker::ReadyDependency221 bool operator<(const ReadyDependency& other) const {
222 // Sort first on ordering, and then lexically on name.
223 // The lexical sort means that the ordering is stable between builds
224 // (UniqueTypeName ordering is not stable between builds).
225 return node->ordering() > other.node->ordering() ||
226 (node->ordering() == other.node->ordering() &&
227 node->name() > other.node->name());
228 }
229 };
230 absl::flat_hash_map<UniqueTypeName, Node> nodes_;
231 std::priority_queue<ReadyDependency> ready_dependencies_;
232 size_t nodes_taken_ = 0;
233 };
234
BuildStackConfig(const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>> & registrations,PostProcessor * post_processors,grpc_channel_stack_type type)235 ChannelInit::StackConfig ChannelInit::BuildStackConfig(
236 const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
237 registrations,
238 PostProcessor* post_processors, grpc_channel_stack_type type) {
239 // Phase 1: Build a map from filter to the set of filters that must be
240 // initialized before it.
241 // We order this map (and the set of dependent filters) by filter name to
242 // ensure algorithm ordering stability is deterministic for a given build.
243 // We should not require this, but at the time of writing it's expected that
244 // this will help overall stability.
245 DependencyTracker dependencies;
246 std::vector<Filter> terminal_filters;
247 for (const auto& registration : registrations) {
248 if (registration->terminal_) {
249 CHECK(registration->after_.empty());
250 CHECK(registration->before_.empty());
251 CHECK(!registration->before_all_);
252 CHECK_EQ(registration->ordering_, Ordering::kDefault);
253 terminal_filters.emplace_back(
254 registration->name_, registration->filter_, nullptr,
255 std::move(registration->predicates_), registration->version_,
256 registration->ordering_, registration->registration_source_);
257 } else {
258 dependencies.Declare(registration.get());
259 }
260 }
261 for (const auto& registration : registrations) {
262 if (registration->terminal_) continue;
263 for (UniqueTypeName after : registration->after_) {
264 dependencies.InsertEdge(after, registration->name_);
265 }
266 for (UniqueTypeName before : registration->before_) {
267 dependencies.InsertEdge(registration->name_, before);
268 }
269 if (registration->before_all_) {
270 for (const auto& other : registrations) {
271 if (other.get() == registration.get()) continue;
272 if (other->terminal_) continue;
273 dependencies.InsertEdge(registration->name_, other->name_);
274 }
275 }
276 }
277 // Phase 2: Build a list of filters in dependency order.
278 // We can simply iterate through and add anything with no dependency.
279 // We then remove that filter from the dependency list of all other filters.
280 // We repeat until we have no more filters to add.
281 dependencies.FinishDependencyMap();
282 std::vector<Filter> filters;
283 while (auto registration = dependencies.Next()) {
284 filters.emplace_back(
285 registration->name_, registration->filter_, registration->filter_adder_,
286 std::move(registration->predicates_), registration->version_,
287 registration->ordering_, registration->registration_source_);
288 }
289 // Collect post processors that need to be applied.
290 // We've already ensured the one-per-slot constraint, so now we can just
291 // collect everything up into a vector and run it in order.
292 std::vector<PostProcessor> post_processor_functions;
293 for (int i = 0; i < static_cast<int>(PostProcessorSlot::kCount); i++) {
294 if (post_processors[i] == nullptr) continue;
295 post_processor_functions.emplace_back(std::move(post_processors[i]));
296 }
297 // Log out the graph we built if that's been requested.
298 if (GRPC_TRACE_FLAG_ENABLED(channel_stack)) {
299 PrintChannelStackTrace(type, registrations, dependencies, filters,
300 terminal_filters);
301 }
302 // Check if there are no terminal filters: this would be an error.
303 // GRPC_CLIENT_DYNAMIC stacks don't use this mechanism, so we don't check that
304 // condition here.
305 // Right now we only log: many tests end up with a core configuration that
306 // is invalid.
307 // TODO(ctiller): evaluate if we can turn this into a crash one day.
308 // Right now it forces too many tests to know about channel initialization,
309 // either by supplying a valid configuration or by including an opt-out flag.
310 if (terminal_filters.empty() && type != GRPC_CLIENT_DYNAMIC) {
311 LOG(ERROR) << "No terminal filters registered for channel stack type "
312 << grpc_channel_stack_type_string(type)
313 << "; this is common for unit tests messing with "
314 "CoreConfiguration, but will result in a "
315 "ChannelInit::CreateStack that never completes successfully.";
316 }
317 return StackConfig{std::move(filters), std::move(terminal_filters),
318 std::move(post_processor_functions)};
319 };
320
PrintChannelStackTrace(grpc_channel_stack_type type,const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>> & registrations,const DependencyTracker & dependencies,const std::vector<Filter> & filters,const std::vector<Filter> & terminal_filters)321 void ChannelInit::PrintChannelStackTrace(
322 grpc_channel_stack_type type,
323 const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
324 registrations,
325 const DependencyTracker& dependencies, const std::vector<Filter>& filters,
326 const std::vector<Filter>& terminal_filters) {
327 // It can happen that multiple threads attempt to construct a core config at
328 // once.
329 // This is benign - the first one wins and others are discarded.
330 // However, it messes up our logging and makes it harder to reason about the
331 // graph, so we add some protection here.
332 static Mutex* const m = new Mutex();
333 MutexLock lock(m);
334 // List the channel stack type (since we'll be repeatedly printing graphs in
335 // this loop).
336 LOG(INFO) << "ORDERED CHANNEL STACK " << grpc_channel_stack_type_string(type)
337 << ":";
338 // First build up a map of filter -> file:line: strings, because it helps
339 // the readability of this log to get later fields aligned vertically.
340 absl::flat_hash_map<UniqueTypeName, std::string> loc_strs;
341 size_t max_loc_str_len = 0;
342 size_t max_filter_name_len = 0;
343 auto add_loc_str = [&max_loc_str_len, &loc_strs, ®istrations,
344 &max_filter_name_len](UniqueTypeName name) {
345 max_filter_name_len = std::max(name.name().length(), max_filter_name_len);
346 for (const auto& registration : registrations) {
347 if (registration->name_ == name) {
348 auto source = registration->registration_source_;
349 absl::string_view file = source.file();
350 auto slash_pos = file.rfind('/');
351 if (slash_pos != file.npos) {
352 file = file.substr(slash_pos + 1);
353 }
354 auto loc_str = absl::StrCat(file, ":", source.line(), ":");
355 max_loc_str_len = std::max(max_loc_str_len, loc_str.length());
356 loc_strs.emplace(name, std::move(loc_str));
357 break;
358 }
359 }
360 };
361 for (const auto& filter : filters) {
362 add_loc_str(filter.name);
363 }
364 for (const auto& terminal : terminal_filters) {
365 add_loc_str(terminal.name);
366 }
367 for (auto& loc_str : loc_strs) {
368 loc_str.second = absl::StrCat(
369 loc_str.second,
370 std::string(max_loc_str_len + 2 - loc_str.second.length(), ' '));
371 }
372 // For each regular filter, print the location registered, the name of the
373 // filter, and if it needed to occur after some other filters list those
374 // filters too.
375 // Note that we use the processed after list here - earlier we turned Before
376 // registrations into After registrations and we used those converted
377 // registrations to build the final ordering.
378 // If you're trying to track down why 'A' is listed as after 'B', look at
379 // the following:
380 // - If A is registered with .After({B}), then A will be 'after' B here.
381 // - If B is registered with .Before({A}), then A will be 'after' B here.
382 // - If B is registered as BeforeAll, then A will be 'after' B here.
383 for (const auto& filter : filters) {
384 auto after = dependencies.DependenciesFor(filter.name);
385 std::string after_str;
386 if (!after.empty()) {
387 after_str = absl::StrCat(
388 std::string(max_filter_name_len + 1 - filter.name.name().length(),
389 ' '),
390 "after ", absl::StrJoin(after, ", "));
391 } else {
392 after_str =
393 std::string(max_filter_name_len - filter.name.name().length(), ' ');
394 }
395 LOG(INFO) << " " << loc_strs[filter.name] << filter.name << after_str
396 << " [" << filter.ordering << "/" << filter.version << "]";
397 }
398 // Finally list out the terminal filters and where they were registered
399 // from.
400 for (const auto& terminal : terminal_filters) {
401 const auto filter_str = absl::StrCat(
402 " ", loc_strs[terminal.name], terminal.name,
403 std::string(max_filter_name_len + 1 - terminal.name.name().length(),
404 ' '),
405 "[terminal]");
406 LOG(INFO) << filter_str;
407 }
408 }
409
Build()410 ChannelInit ChannelInit::Builder::Build() {
411 ChannelInit result;
412 for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
413 result.stack_configs_[i] =
414 BuildStackConfig(filters_[i], post_processors_[i],
415 static_cast<grpc_channel_stack_type>(i));
416 }
417 return result;
418 }
419
CheckPredicates(const ChannelArgs & args) const420 bool ChannelInit::Filter::CheckPredicates(const ChannelArgs& args) const {
421 for (const auto& predicate : predicates) {
422 if (!predicate(args)) return false;
423 }
424 return true;
425 }
426
CreateStack(ChannelStackBuilder * builder) const427 bool ChannelInit::CreateStack(ChannelStackBuilder* builder) const {
428 const auto& stack_config = stack_configs_[builder->channel_stack_type()];
429 for (const auto& filter : stack_config.filters) {
430 if (SkipV2(filter.version)) continue;
431 if (!filter.CheckPredicates(builder->channel_args())) continue;
432 builder->AppendFilter(filter.filter);
433 }
434 int found_terminators = 0;
435 for (const auto& terminator : stack_config.terminators) {
436 if (!terminator.CheckPredicates(builder->channel_args())) continue;
437 builder->AppendFilter(terminator.filter);
438 ++found_terminators;
439 }
440 if (found_terminators != 1) {
441 std::string error = absl::StrCat(
442 found_terminators,
443 " terminating filters found creating a channel of type ",
444 grpc_channel_stack_type_string(builder->channel_stack_type()),
445 " with arguments ", builder->channel_args().ToString(),
446 " (we insist upon one and only one terminating "
447 "filter)\n");
448 if (stack_config.terminators.empty()) {
449 absl::StrAppend(&error, " No terminal filters were registered");
450 } else {
451 for (const auto& terminator : stack_config.terminators) {
452 absl::StrAppend(&error, " ", terminator.name, " registered @ ",
453 terminator.registration_source.file(), ":",
454 terminator.registration_source.line(), ": enabled = ",
455 terminator.CheckPredicates(builder->channel_args())
456 ? "true"
457 : "false",
458 "\n");
459 }
460 }
461 LOG(ERROR) << error;
462 return false;
463 }
464 for (const auto& post_processor : stack_config.post_processors) {
465 post_processor(*builder);
466 }
467 return true;
468 }
469
AddToInterceptionChainBuilder(grpc_channel_stack_type type,InterceptionChainBuilder & builder) const470 void ChannelInit::AddToInterceptionChainBuilder(
471 grpc_channel_stack_type type, InterceptionChainBuilder& builder) const {
472 const auto& stack_config = stack_configs_[type];
473 // Based on predicates build a list of filters to include in this segment.
474 for (const auto& filter : stack_config.filters) {
475 if (SkipV3(filter.version)) continue;
476 if (!filter.CheckPredicates(builder.channel_args())) continue;
477 if (filter.filter_adder == nullptr) {
478 builder.Fail(absl::InvalidArgumentError(
479 absl::StrCat("Filter ", filter.name, " has no v3-callstack vtable")));
480 return;
481 }
482 filter.filter_adder(builder);
483 }
484 }
485
486 } // namespace grpc_core
487