• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/surface/channel_init.h"
20 
21 #include <grpc/support/port_platform.h>
22 #include <string.h>
23 
24 #include <algorithm>
25 #include <map>
26 #include <queue>
27 #include <set>
28 #include <string>
29 #include <type_traits>
30 
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_join.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/types/optional.h"
37 #include "src/core/lib/debug/trace.h"
38 #include "src/core/lib/surface/channel_stack_type.h"
39 #include "src/core/util/crash.h"
40 #include "src/core/util/sync.h"
41 #include "src/core/util/unique_type_name.h"
42 
43 namespace grpc_core {
44 
// Function-pointer hook that yields the UniqueTypeName for a channel filter.
// It is defined here without an initializer, so it is null until some other
// code assigns it.
// NOTE(review): the assignment site is not in this file - confirm it is set
// before anything calls through this pointer.
45 UniqueTypeName (*NameFromChannelFilter)(const grpc_channel_filter*);
46 
47 namespace {
48 struct CompareChannelFiltersByName {
operator ()grpc_core::__anon1aef240c0111::CompareChannelFiltersByName49   bool operator()(UniqueTypeName a, UniqueTypeName b) const {
50     // Compare lexicographically instead of by pointer value so that different
51     // builds make the same choices.
52     return a.name() < b.name();
53   }
54 };
55 }  // namespace
56 
After(std::initializer_list<UniqueTypeName> filters)57 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::After(
58     std::initializer_list<UniqueTypeName> filters) {
59   for (auto filter : filters) {
60     after_.push_back(filter);
61   }
62   return *this;
63 }
64 
Before(std::initializer_list<UniqueTypeName> filters)65 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::Before(
66     std::initializer_list<UniqueTypeName> filters) {
67   for (auto filter : filters) {
68     before_.push_back(filter);
69   }
70   return *this;
71 }
72 
If(InclusionPredicate predicate)73 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::If(
74     InclusionPredicate predicate) {
75   predicates_.emplace_back(std::move(predicate));
76   return *this;
77 }
78 
IfNot(InclusionPredicate predicate)79 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::IfNot(
80     InclusionPredicate predicate) {
81   predicates_.emplace_back(
82       [predicate = std::move(predicate)](const ChannelArgs& args) {
83         return !predicate(args);
84       });
85   return *this;
86 }
87 
88 ChannelInit::FilterRegistration&
IfHasChannelArg(const char * arg)89 ChannelInit::FilterRegistration::IfHasChannelArg(const char* arg) {
90   return If([arg](const ChannelArgs& args) { return args.Contains(arg); });
91 }
92 
IfChannelArg(const char * arg,bool default_value)93 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::IfChannelArg(
94     const char* arg, bool default_value) {
95   return If([arg, default_value](const ChannelArgs& args) {
96     return args.GetBool(arg).value_or(default_value);
97   });
98 }
99 
100 ChannelInit::FilterRegistration&
ExcludeFromMinimalStack()101 ChannelInit::FilterRegistration::ExcludeFromMinimalStack() {
102   return If([](const ChannelArgs& args) { return !args.WantMinimalStack(); });
103 }
104 
RegisterFilter(grpc_channel_stack_type type,UniqueTypeName name,const grpc_channel_filter * filter,FilterAdder filter_adder,SourceLocation registration_source)105 ChannelInit::FilterRegistration& ChannelInit::Builder::RegisterFilter(
106     grpc_channel_stack_type type, UniqueTypeName name,
107     const grpc_channel_filter* filter, FilterAdder filter_adder,
108     SourceLocation registration_source) {
109   filters_[type].emplace_back(std::make_unique<FilterRegistration>(
110       name, filter, filter_adder, registration_source));
111   return *filters_[type].back();
112 }
113 
114 class ChannelInit::DependencyTracker {
115  public:
116   // Declare that a filter exists.
Declare(FilterRegistration * registration)117   void Declare(FilterRegistration* registration) {
118     nodes_.emplace(registration->name_, registration);
119   }
120   // Insert an edge from a to b
121   // Both nodes must be declared.
InsertEdge(UniqueTypeName a,UniqueTypeName b)122   void InsertEdge(UniqueTypeName a, UniqueTypeName b) {
123     auto it_a = nodes_.find(a);
124     auto it_b = nodes_.find(b);
125     if (it_a == nodes_.end()) {
126       GRPC_TRACE_LOG(channel_stack, INFO)
127           << "gRPC Filter " << a.name()
128           << " was not declared before adding an edge to " << b.name();
129       return;
130     }
131     if (it_b == nodes_.end()) {
132       GRPC_TRACE_LOG(channel_stack, INFO)
133           << "gRPC Filter " << b.name()
134           << " was not declared before adding an edge from " << a.name();
135       return;
136     }
137     auto& node_a = it_a->second;
138     auto& node_b = it_b->second;
139     node_a.dependents.push_back(&node_b);
140     node_b.all_dependencies.push_back(a);
141     ++node_b.waiting_dependencies;
142   }
143 
144   // Finish the dependency graph and begin iteration.
FinishDependencyMap()145   void FinishDependencyMap() {
146     for (auto& p : nodes_) {
147       if (p.second.waiting_dependencies == 0) {
148         ready_dependencies_.emplace(&p.second);
149       }
150     }
151   }
152 
Next()153   FilterRegistration* Next() {
154     if (ready_dependencies_.empty()) {
155       CHECK_EQ(nodes_taken_, nodes_.size()) << "Unresolvable graph of channel "
156                                                "filters:\n"
157                                             << GraphString();
158       return nullptr;
159     }
160     auto next = ready_dependencies_.top();
161     ready_dependencies_.pop();
162     if (!ready_dependencies_.empty() &&
163         next.node->ordering() != Ordering::kDefault) {
164       // Constraint: if we use ordering other than default, then we must have an
165       // unambiguous pick. If there is ambiguity, we must fix it by adding
166       // explicit ordering constraints.
167       CHECK_NE(next.node->ordering(),
168                ready_dependencies_.top().node->ordering())
169           << "Ambiguous ordering between " << next.node->name() << " and "
170           << ready_dependencies_.top().node->name();
171     }
172     for (Node* dependent : next.node->dependents) {
173       CHECK_GT(dependent->waiting_dependencies, 0u);
174       --dependent->waiting_dependencies;
175       if (dependent->waiting_dependencies == 0) {
176         ready_dependencies_.emplace(dependent);
177       }
178     }
179     ++nodes_taken_;
180     return next.node->registration;
181   }
182 
183   // Debug helper to dump the graph
GraphString() const184   std::string GraphString() const {
185     std::string result;
186     for (const auto& p : nodes_) {
187       absl::StrAppend(&result, p.first, " ->");
188       for (const auto& d : p.second.all_dependencies) {
189         absl::StrAppend(&result, " ", d);
190       }
191       absl::StrAppend(&result, "\n");
192     }
193     return result;
194   }
195 
DependenciesFor(UniqueTypeName name) const196   absl::Span<const UniqueTypeName> DependenciesFor(UniqueTypeName name) const {
197     auto it = nodes_.find(name);
198     CHECK(it != nodes_.end()) << "Filter " << name.name() << " not found";
199     return it->second.all_dependencies;
200   }
201 
202  private:
203   struct Node {
Nodegrpc_core::ChannelInit::DependencyTracker::Node204     explicit Node(FilterRegistration* registration)
205         : registration(registration) {}
206     // Nodes that depend on this node
207     std::vector<Node*> dependents;
208     // Nodes that this node depends on - for debugging purposes only
209     std::vector<UniqueTypeName> all_dependencies;
210     // The registration for this node
211     FilterRegistration* registration;
212     // Number of nodes this node is waiting on
213     size_t waiting_dependencies = 0;
214 
orderinggrpc_core::ChannelInit::DependencyTracker::Node215     Ordering ordering() const { return registration->ordering_; }
namegrpc_core::ChannelInit::DependencyTracker::Node216     absl::string_view name() const { return registration->name_.name(); }
217   };
218   struct ReadyDependency {
ReadyDependencygrpc_core::ChannelInit::DependencyTracker::ReadyDependency219     explicit ReadyDependency(Node* node) : node(node) {}
220     Node* node;
operator <grpc_core::ChannelInit::DependencyTracker::ReadyDependency221     bool operator<(const ReadyDependency& other) const {
222       // Sort first on ordering, and then lexically on name.
223       // The lexical sort means that the ordering is stable between builds
224       // (UniqueTypeName ordering is not stable between builds).
225       return node->ordering() > other.node->ordering() ||
226              (node->ordering() == other.node->ordering() &&
227               node->name() > other.node->name());
228     }
229   };
230   absl::flat_hash_map<UniqueTypeName, Node> nodes_;
231   std::priority_queue<ReadyDependency> ready_dependencies_;
232   size_t nodes_taken_ = 0;
233 };
234 
BuildStackConfig(const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>> & registrations,PostProcessor * post_processors,grpc_channel_stack_type type)235 ChannelInit::StackConfig ChannelInit::BuildStackConfig(
236     const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
237         registrations,
238     PostProcessor* post_processors, grpc_channel_stack_type type) {
239   // Phase 1: Build a map from filter to the set of filters that must be
240   // initialized before it.
241   // We order this map (and the set of dependent filters) by filter name to
242   // ensure algorithm ordering stability is deterministic for a given build.
243   // We should not require this, but at the time of writing it's expected that
244   // this will help overall stability.
245   DependencyTracker dependencies;
246   std::vector<Filter> terminal_filters;
247   for (const auto& registration : registrations) {
248     if (registration->terminal_) {
249       CHECK(registration->after_.empty());
250       CHECK(registration->before_.empty());
251       CHECK(!registration->before_all_);
252       CHECK_EQ(registration->ordering_, Ordering::kDefault);
253       terminal_filters.emplace_back(
254           registration->name_, registration->filter_, nullptr,
255           std::move(registration->predicates_), registration->version_,
256           registration->ordering_, registration->registration_source_);
257     } else {
258       dependencies.Declare(registration.get());
259     }
260   }
261   for (const auto& registration : registrations) {
262     if (registration->terminal_) continue;
263     for (UniqueTypeName after : registration->after_) {
264       dependencies.InsertEdge(after, registration->name_);
265     }
266     for (UniqueTypeName before : registration->before_) {
267       dependencies.InsertEdge(registration->name_, before);
268     }
269     if (registration->before_all_) {
270       for (const auto& other : registrations) {
271         if (other.get() == registration.get()) continue;
272         if (other->terminal_) continue;
273         dependencies.InsertEdge(registration->name_, other->name_);
274       }
275     }
276   }
277   // Phase 2: Build a list of filters in dependency order.
278   // We can simply iterate through and add anything with no dependency.
279   // We then remove that filter from the dependency list of all other filters.
280   // We repeat until we have no more filters to add.
281   dependencies.FinishDependencyMap();
282   std::vector<Filter> filters;
283   while (auto registration = dependencies.Next()) {
284     filters.emplace_back(
285         registration->name_, registration->filter_, registration->filter_adder_,
286         std::move(registration->predicates_), registration->version_,
287         registration->ordering_, registration->registration_source_);
288   }
289   // Collect post processors that need to be applied.
290   // We've already ensured the one-per-slot constraint, so now we can just
291   // collect everything up into a vector and run it in order.
292   std::vector<PostProcessor> post_processor_functions;
293   for (int i = 0; i < static_cast<int>(PostProcessorSlot::kCount); i++) {
294     if (post_processors[i] == nullptr) continue;
295     post_processor_functions.emplace_back(std::move(post_processors[i]));
296   }
297   // Log out the graph we built if that's been requested.
298   if (GRPC_TRACE_FLAG_ENABLED(channel_stack)) {
299     PrintChannelStackTrace(type, registrations, dependencies, filters,
300                            terminal_filters);
301   }
302   // Check if there are no terminal filters: this would be an error.
303   // GRPC_CLIENT_DYNAMIC stacks don't use this mechanism, so we don't check that
304   // condition here.
305   // Right now we only log: many tests end up with a core configuration that
306   // is invalid.
307   // TODO(ctiller): evaluate if we can turn this into a crash one day.
308   // Right now it forces too many tests to know about channel initialization,
309   // either by supplying a valid configuration or by including an opt-out flag.
310   if (terminal_filters.empty() && type != GRPC_CLIENT_DYNAMIC) {
311     LOG(ERROR) << "No terminal filters registered for channel stack type "
312                << grpc_channel_stack_type_string(type)
313                << "; this is common for unit tests messing with "
314                   "CoreConfiguration, but will result in a "
315                   "ChannelInit::CreateStack that never completes successfully.";
316   }
317   return StackConfig{std::move(filters), std::move(terminal_filters),
318                      std::move(post_processor_functions)};
319 };
320 
PrintChannelStackTrace(grpc_channel_stack_type type,const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>> & registrations,const DependencyTracker & dependencies,const std::vector<Filter> & filters,const std::vector<Filter> & terminal_filters)321 void ChannelInit::PrintChannelStackTrace(
322     grpc_channel_stack_type type,
323     const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
324         registrations,
325     const DependencyTracker& dependencies, const std::vector<Filter>& filters,
326     const std::vector<Filter>& terminal_filters) {
327   // It can happen that multiple threads attempt to construct a core config at
328   // once.
329   // This is benign - the first one wins and others are discarded.
330   // However, it messes up our logging and makes it harder to reason about the
331   // graph, so we add some protection here.
332   static Mutex* const m = new Mutex();
333   MutexLock lock(m);
334   // List the channel stack type (since we'll be repeatedly printing graphs in
335   // this loop).
336   LOG(INFO) << "ORDERED CHANNEL STACK " << grpc_channel_stack_type_string(type)
337             << ":";
338   // First build up a map of filter -> file:line: strings, because it helps
339   // the readability of this log to get later fields aligned vertically.
340   absl::flat_hash_map<UniqueTypeName, std::string> loc_strs;
341   size_t max_loc_str_len = 0;
342   size_t max_filter_name_len = 0;
343   auto add_loc_str = [&max_loc_str_len, &loc_strs, &registrations,
344                       &max_filter_name_len](UniqueTypeName name) {
345     max_filter_name_len = std::max(name.name().length(), max_filter_name_len);
346     for (const auto& registration : registrations) {
347       if (registration->name_ == name) {
348         auto source = registration->registration_source_;
349         absl::string_view file = source.file();
350         auto slash_pos = file.rfind('/');
351         if (slash_pos != file.npos) {
352           file = file.substr(slash_pos + 1);
353         }
354         auto loc_str = absl::StrCat(file, ":", source.line(), ":");
355         max_loc_str_len = std::max(max_loc_str_len, loc_str.length());
356         loc_strs.emplace(name, std::move(loc_str));
357         break;
358       }
359     }
360   };
361   for (const auto& filter : filters) {
362     add_loc_str(filter.name);
363   }
364   for (const auto& terminal : terminal_filters) {
365     add_loc_str(terminal.name);
366   }
367   for (auto& loc_str : loc_strs) {
368     loc_str.second = absl::StrCat(
369         loc_str.second,
370         std::string(max_loc_str_len + 2 - loc_str.second.length(), ' '));
371   }
372   // For each regular filter, print the location registered, the name of the
373   // filter, and if it needed to occur after some other filters list those
374   // filters too.
375   // Note that we use the processed after list here - earlier we turned Before
376   // registrations into After registrations and we used those converted
377   // registrations to build the final ordering.
378   // If you're trying to track down why 'A' is listed as after 'B', look at
379   // the following:
380   //  - If A is registered with .After({B}), then A will be 'after' B here.
381   //  - If B is registered with .Before({A}), then A will be 'after' B here.
382   //  - If B is registered as BeforeAll, then A will be 'after' B here.
383   for (const auto& filter : filters) {
384     auto after = dependencies.DependenciesFor(filter.name);
385     std::string after_str;
386     if (!after.empty()) {
387       after_str = absl::StrCat(
388           std::string(max_filter_name_len + 1 - filter.name.name().length(),
389                       ' '),
390           "after ", absl::StrJoin(after, ", "));
391     } else {
392       after_str =
393           std::string(max_filter_name_len - filter.name.name().length(), ' ');
394     }
395     LOG(INFO) << "  " << loc_strs[filter.name] << filter.name << after_str
396               << " [" << filter.ordering << "/" << filter.version << "]";
397   }
398   // Finally list out the terminal filters and where they were registered
399   // from.
400   for (const auto& terminal : terminal_filters) {
401     const auto filter_str = absl::StrCat(
402         "  ", loc_strs[terminal.name], terminal.name,
403         std::string(max_filter_name_len + 1 - terminal.name.name().length(),
404                     ' '),
405         "[terminal]");
406     LOG(INFO) << filter_str;
407   }
408 }
409 
Build()410 ChannelInit ChannelInit::Builder::Build() {
411   ChannelInit result;
412   for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
413     result.stack_configs_[i] =
414         BuildStackConfig(filters_[i], post_processors_[i],
415                          static_cast<grpc_channel_stack_type>(i));
416   }
417   return result;
418 }
419 
CheckPredicates(const ChannelArgs & args) const420 bool ChannelInit::Filter::CheckPredicates(const ChannelArgs& args) const {
421   for (const auto& predicate : predicates) {
422     if (!predicate(args)) return false;
423   }
424   return true;
425 }
426 
CreateStack(ChannelStackBuilder * builder) const427 bool ChannelInit::CreateStack(ChannelStackBuilder* builder) const {
428   const auto& stack_config = stack_configs_[builder->channel_stack_type()];
429   for (const auto& filter : stack_config.filters) {
430     if (SkipV2(filter.version)) continue;
431     if (!filter.CheckPredicates(builder->channel_args())) continue;
432     builder->AppendFilter(filter.filter);
433   }
434   int found_terminators = 0;
435   for (const auto& terminator : stack_config.terminators) {
436     if (!terminator.CheckPredicates(builder->channel_args())) continue;
437     builder->AppendFilter(terminator.filter);
438     ++found_terminators;
439   }
440   if (found_terminators != 1) {
441     std::string error = absl::StrCat(
442         found_terminators,
443         " terminating filters found creating a channel of type ",
444         grpc_channel_stack_type_string(builder->channel_stack_type()),
445         " with arguments ", builder->channel_args().ToString(),
446         " (we insist upon one and only one terminating "
447         "filter)\n");
448     if (stack_config.terminators.empty()) {
449       absl::StrAppend(&error, "  No terminal filters were registered");
450     } else {
451       for (const auto& terminator : stack_config.terminators) {
452         absl::StrAppend(&error, "  ", terminator.name, " registered @ ",
453                         terminator.registration_source.file(), ":",
454                         terminator.registration_source.line(), ": enabled = ",
455                         terminator.CheckPredicates(builder->channel_args())
456                             ? "true"
457                             : "false",
458                         "\n");
459       }
460     }
461     LOG(ERROR) << error;
462     return false;
463   }
464   for (const auto& post_processor : stack_config.post_processors) {
465     post_processor(*builder);
466   }
467   return true;
468 }
469 
AddToInterceptionChainBuilder(grpc_channel_stack_type type,InterceptionChainBuilder & builder) const470 void ChannelInit::AddToInterceptionChainBuilder(
471     grpc_channel_stack_type type, InterceptionChainBuilder& builder) const {
472   const auto& stack_config = stack_configs_[type];
473   // Based on predicates build a list of filters to include in this segment.
474   for (const auto& filter : stack_config.filters) {
475     if (SkipV3(filter.version)) continue;
476     if (!filter.CheckPredicates(builder.channel_args())) continue;
477     if (filter.filter_adder == nullptr) {
478       builder.Fail(absl::InvalidArgumentError(
479           absl::StrCat("Filter ", filter.name, " has no v3-callstack vtable")));
480       return;
481     }
482     filter.filter_adder(builder);
483   }
484 }
485 
486 }  // namespace grpc_core
487