/* Copyright 2019 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/jit/cluster_scoping_pass.h"

#include "absl/algorithm/container.h"
#include "absl/container/flat_hash_set.h"
#include "absl/strings/str_cat.h"
#include "tensorflow/compiler/jit/defs.h"
#include "tensorflow/compiler/jit/xla_cluster_util.h"
#include "tensorflow/core/framework/node_def_util.h"
#include "tensorflow/core/graph/algorithm.h"

namespace tensorflow {
namespace {

class ClusterScopingPassImpl {
 public:
  ClusterScopingPassImpl(Graph* graph,
                         OptimizerOptions::GlobalJitLevel global_jit_level)
      : graph_(graph),
        global_jit_level_(global_jit_level),
        unique_scope_id_(0) {}

  Status Run();

 private:
  Status ScopingForPipelineStages();

  size_t GetUniqueScopeId() { return unique_scope_id_++; }

  void AddScopeToAllTransitivePredecessors(Node* start);

  void AddScopeToAllTransitiveSuccessors(Node* start);

 private:
  Graph* graph_;
  OptimizerOptions::GlobalJitLevel global_jit_level_;
  size_t unique_scope_id_;
};

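// Returns the _XlaInternalScope attribute of `node` if it is set, and
// std::nullopt otherwise.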
std::optional<string> GetXlaInternalScope(Node* node) {
  string scope;
  if (GetNodeAttr(node->attrs(), kXlaInternalScopeAttr, &scope).ok()) {
    return scope;
  }

  return std::nullopt;
}

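// Sets the _XlaInternalScope attribute of `node` to `scope`.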
void SetXlaInternalScope(Node* node, StringPiece scope) {
  node->AddAttr(kXlaInternalScopeAttr, scope);
}

// NB! We append the new scope as a suffix to the _XlaInternalScope attribute
// instead of overriding the old value. In other words, appending scope B to
// scope A creates the conjunction of the scopes A and B (i.e., A & B) and,
// in effect, the node gets both the old and the new scopes. Because a unique
// scope disallows a node from being merged with nodes in other scopes, the
// scope conjunction preserves the semantics of the old scope (i.e., the node
// still cannot be merged with the previously incompatible nodes).
//
// For example, the case below should be rare in practice but serves to
// illustrate the point. After adding scopes for both Stage and Unstage,
// Node_Y receives both scopes "unstage" and "stage", while Node_X receives
// only scope "stage". The semantics of scope "unstage" are preserved even
// though scope "stage" is appended later. As a result, Node_X and Node_Y are
// put into different clusters.
//
// Unstage -> Node_Y (scope "unstage & stage")
//            |
//            V
// Node_X (scope "stage") -> Stage
//
void AddOrAppendXlaInternalScope(Node* node, absl::string_view suffix) {
  string updated_scope;
  std::optional<string> cur_scope = GetXlaInternalScope(node);
  if (cur_scope == std::nullopt) {
    updated_scope = std::string(suffix);
  } else {
    updated_scope = absl::StrCat(cur_scope.value(), "&", suffix);
  }
  SetXlaInternalScope(node, updated_scope);
}

void ClusterScopingPassImpl::AddScopeToAllTransitivePredecessors(Node* start) {
  const string unique_suffix = absl::StrCat("_", GetUniqueScopeId());

  std::vector<Node*> starts;
  starts.push_back(start);
  auto enter = [&](Node* n) { AddOrAppendXlaInternalScope(n, unique_suffix); };
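  // A reverse DFS from `start` visits exactly the transitive predecessors of
  // `start`, i.e., every node on some directed path into it.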
  ReverseDFSFrom(*graph_, starts, enter, /*leave=*/nullptr,
                 /*stable_comparator=*/NodeComparatorName());
}

void ClusterScopingPassImpl::AddScopeToAllTransitiveSuccessors(Node* start) {
  const string unique_suffix = absl::StrCat("_", GetUniqueScopeId());

  std::vector<Node*> starts;
  starts.push_back(start);
  auto enter = [&](Node* n) { AddOrAppendXlaInternalScope(n, unique_suffix); };
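  // A forward DFS from `start` visits exactly the transitive successors of
  // `start`, i.e., every node reachable from it along directed edges.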
  DFSFrom(*graph_, starts, enter, /*leave=*/nullptr,
          /*stable_comparator=*/NodeComparatorName(),
          // Do not filter any edges to better capture the semantics of the
          // transitive closure of successors. We may revisit this when we
          // see more cases that need cluster scoping in the future.
          /*edge_filter=*/nullptr);
}

// This preserves the parallelism between pipeline stages. For example, below
// is a typical pattern of input pipelining in TensorFlow, and this heuristic
// ensures that Node_X and Node_Y are put into different clusters. Without the
// heuristic, they may be put into the same cluster, which can introduce
// artificial dependencies and cause a significant performance loss. In this
// example, Node_Y would become dependent on IteratorGetNext, and the latencies
// would add up if Node_X and Node_Y were in the same cluster.
//
// IteratorGetNext -> Node_X -> Stage
//
// Unstage -> Node_Y
//
Status ClusterScopingPassImpl::ScopingForPipelineStages() {
  for (Node* n : graph_->nodes()) {
    DCHECK(n);
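    // The transitive consumers of an Unstage op form one pipeline stage; give
    // them a dedicated scope so they are not clustered across the staging
    // boundary.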
    if (n->type_string() == "Unstage") {
      AddScopeToAllTransitiveSuccessors(n);
    }
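    // Likewise, the transitive producers feeding a Stage op form another
    // pipeline stage; give them a dedicated scope as well.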
    if (n->type_string() == "Stage") {
      AddScopeToAllTransitivePredecessors(n);
    }
  }

  return OkStatus();
}

Status ClusterScopingPassImpl::Run() {
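  // The scopes only influence XLA auto-clustering, so there is nothing to do
  // when auto-clustering is disabled.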
  if (global_jit_level_ == OptimizerOptions::OFF) {
    return OkStatus();
  }

  return ScopingForPipelineStages();
}
}  // namespace

Status ClusterScopingPass::Run(const GraphOptimizationPassOptions& options) {
  Graph* graph = options.graph->get();

  return ClusterScopingPassImpl{graph, GetGlobalJitLevelForGraph(options)}
      .Run();
}
}  // namespace tensorflow