/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/jit/cluster_scoping_pass.h"

#include <optional>
#include <string>
#include <vector>

#include "absl/algorithm/container.h"
#include "absl/container/flat_hash_set.h"
#include "absl/strings/str_cat.h"
#include "tensorflow/compiler/jit/defs.h"
#include "tensorflow/compiler/jit/xla_cluster_util.h"
#include "tensorflow/core/framework/node_def_util.h"
#include "tensorflow/core/graph/algorithm.h"

namespace tensorflow {
namespace {

class ClusterScopingPassImpl {
 public:
  ClusterScopingPassImpl(Graph* graph,
                         OptimizerOptions::GlobalJitLevel global_jit_level)
      : graph_(graph),
        global_jit_level_(global_jit_level),
        unique_scope_id_(0) {}

  Status Run();

 private:
  Status ScopingForPipelineStages();

  size_t GetUniqueScopeId() { return unique_scope_id_++; }

  void AddScopeToAllTransitivePredecessors(Node* start);

  void AddScopeToAllTransitiveSuccessors(Node* start);

 private:
  Graph* graph_;
  OptimizerOptions::GlobalJitLevel global_jit_level_;
  size_t unique_scope_id_;
};

std::optional<string> GetXlaInternalScope(Node* node) {
  string scope;
  if (GetNodeAttr(node->attrs(), kXlaInternalScopeAttr, &scope).ok()) {
    return scope;
  }

  return std::nullopt;
}

void SetXlaInternalScope(Node* node, StringPiece scope) {
  node->AddAttr(kXlaInternalScopeAttr, scope);
}

// NB! We append the new scope as a suffix to the _XlaInternalScope attribute
// instead of overriding the old value.  In other words, appending scope B to
// scope A creates the conjunction of the scopes A and B (i.e., A & B) and,
// in effect, the node gets both the old and new scopes.  Because a unique
// scope disallows a node from being merged with nodes in other scopes, the
// scope conjunction preserves the semantics of the old scope (i.e., the node
// still cannot be merged with the previously incompatible nodes).
//
// For example, the case below should be rare in practice but can serve the
// purpose of discussion.  After adding scopes for both Stage and Unstage,
// Node_Y will receive both scopes "unstage" and "stage", while Node_X receives
// only scope "stage".  The semantics of scope "unstage" are preserved
// although scope "stage" is later appended.  As a result, Node_X and Node_Y
// will be put into different clusters.
//
//                Unstage -> Node_Y (scope "unstage & stage")
//                              |
//                              V
//  Node_X (scope "stage") -> Stage
//
void AddOrAppendXlaInternalScope(Node* node, absl::string_view suffix) {
  string updated_scope;
  std::optional<string> cur_scope = GetXlaInternalScope(node);
  if (cur_scope == std::nullopt) {
    updated_scope = std::string(suffix);
  } else {
    updated_scope = absl::StrCat(cur_scope.value(), "&", suffix);
  }
  SetXlaInternalScope(node, updated_scope);
}
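
// A minimal illustration of the append semantics above (node_x and node_y are
// hypothetical Node* values, and the scope names are illustrative, not the
// ones this pass generates):
//
//   AddOrAppendXlaInternalScope(node_y, "unstage");  // scope: "unstage"
//   AddOrAppendXlaInternalScope(node_y, "stage");    // scope: "unstage&stage"
//   AddOrAppendXlaInternalScope(node_x, "stage");    // scope: "stage"
//
// node_y ends up with the conjunction "unstage&stage", which differs from
// node_x's "stage", so the two nodes cannot be placed in the same cluster.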

void ClusterScopingPassImpl::AddScopeToAllTransitivePredecessors(Node* start) {
  const string unique_suffix = absl::StrCat("_", GetUniqueScopeId());

  std::vector<Node*> starts;
  starts.push_back(start);
  auto enter = [&](Node* n) { AddOrAppendXlaInternalScope(n, unique_suffix); };
  ReverseDFSFrom(*graph_, starts, enter, /*leave=*/nullptr,
                 /*stable_comparator=*/NodeComparatorName());
}

void ClusterScopingPassImpl::AddScopeToAllTransitiveSuccessors(Node* start) {
  const string unique_suffix = absl::StrCat("_", GetUniqueScopeId());

  std::vector<Node*> starts;
  starts.push_back(start);
  auto enter = [&](Node* n) { AddOrAppendXlaInternalScope(n, unique_suffix); };
  DFSFrom(*graph_, starts, enter, /*leave=*/nullptr,
          /*stable_comparator=*/NodeComparatorName(),
          // Do not filter any edges to better capture the semantics of the
          // transitive closure of successors.  We may revisit this when we
          // see more cases needing cluster scoping in the future.
          /*edge_filter=*/nullptr);
}

// This heuristic preserves the parallelism between pipeline stages.  For
// example, below is a typical pattern of input pipelining in TensorFlow, and
// the heuristic ensures that Node_X and Node_Y are put into different
// clusters.  Without it, they may be put into the same cluster, which can
// introduce artificial dependencies and incur a large performance loss.  In
// this example, Node_Y would become dependent on IteratorGetNext and the
// latencies would add up if Node_X and Node_Y were in the same cluster.
//
// IteratorGetNext -> Node_X -> Stage
//
// Unstage -> Node_Y
//
Status ClusterScopingPassImpl::ScopingForPipelineStages() {
  for (Node* n : graph_->nodes()) {
    DCHECK(n);
    if (n->type_string() == "Unstage") {
      AddScopeToAllTransitiveSuccessors(n);
    }
    if (n->type_string() == "Stage") {
      AddScopeToAllTransitivePredecessors(n);
    }
  }

  return OkStatus();
}
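
// Sketch of the effect on the pipelining pattern above (the scope suffixes
// "_0" and "_1" are illustrative; the actual ids depend on the order in which
// the Stage and Unstage nodes are visited):
//
//   Unstage -> Node_Y                   // both tagged with scope suffix "_0"
//   IteratorGetNext -> Node_X -> Stage  // all tagged with scope suffix "_1"
//
// Since "_0" and "_1" differ, Node_X and Node_Y carry incompatible scopes and
// are therefore placed in different clusters, preserving the parallelism
// between the two pipeline stages.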

Status ClusterScopingPassImpl::Run() {
  if (global_jit_level_ == OptimizerOptions::OFF) {
    return OkStatus();
  }

  return ScopingForPipelineStages();
}
}  // namespace

Status ClusterScopingPass::Run(const GraphOptimizationPassOptions& options) {
  Graph* graph = options.graph->get();

  return ClusterScopingPassImpl{graph, GetGlobalJitLevelForGraph(options)}
      .Run();
}
}  // namespace tensorflow