1 /* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/core/graph/collective_order.h"
16
17 #include "absl/container/flat_hash_map.h"
18 #include "absl/container/flat_hash_set.h"
19 #include "tensorflow/core/graph/algorithm.h"
20
21 namespace tensorflow {
22 namespace {
23
24 // Find all CollectiveReduce nodes and the existing data dependencies between
25 // them.
DiscoverDataDependencies(const Graph * graph,std::vector<Node * > * collective_nodes,std::vector<int32> * instance_keys,absl::flat_hash_map<Node *,absl::flat_hash_set<int32>> * data_dependencies)26 Status DiscoverDataDependencies(
27 const Graph* graph, std::vector<Node*>* collective_nodes,
28 std::vector<int32>* instance_keys,
29 absl::flat_hash_map<Node*, absl::flat_hash_set<int32>>* data_dependencies) {
30 Status s;
31 // Algorithm: do Reverse DFS starting at sink. `node_leave` is called when
32 // all parents of `node` have been visited. At that point,
33 // `data_dependencies[node]` is a list containing `instance_key` of every
34 // `CollectiveReduce` on which `node` has a data dependency.
35 // For this node's children, add all these instance keys. Also, if this node
36 // is collective, add as a dependency for the children.
37 auto node_leave = [collective_nodes, instance_keys, data_dependencies,
38 &s](Node* node) {
39 int32 instance_key;
40 bool enter_node =
41 node->IsCollective() && node->type_string() == "CollectiveReduce";
42 if (enter_node) {
43 Status get_attr_status =
44 GetNodeAttr(node->attrs(), "instance_key", &instance_key);
45 s.Update(get_attr_status);
46 collective_nodes->push_back(node);
47 instance_keys->push_back(instance_key);
48 VLOG(2) << "collective node " << node->DebugString();
49 }
50 // Avoid reference invalidation of `node_deps`.
51 data_dependencies->reserve(data_dependencies->size() + 1 +
52 node->out_edges().size());
53 const auto& node_deps = (*data_dependencies)[node];
54 for (const Edge* out_edge : node->out_edges()) {
55 auto& child_deps = (*data_dependencies)[out_edge->dst()];
56 child_deps.insert(node_deps.begin(), node_deps.end());
57 if (enter_node && s.ok()) {
58 child_deps.insert(instance_key);
59 }
60 }
61 };
62 ReverseDFS(*graph, nullptr, node_leave);
63 return s;
64 }
65
66 // Given a list of `collective_nodes` and `data_dependencies` between the
67 // collective nodes, create control dependencies between concurrent collectives
68 // and store in `dependency_edges`.
69 // If there exists an edge a -> b then `dependency_edges[a]` contains `b`
CreateControlDependencies(const std::vector<Node * > & collective_nodes,const std::vector<int32> & instance_keys,absl::flat_hash_map<Node *,absl::flat_hash_set<int32>> * data_dependencies,absl::flat_hash_map<Node *,absl::flat_hash_set<Node * >> * dependency_edges)70 Status CreateControlDependencies(
71 const std::vector<Node*>& collective_nodes,
72 const std::vector<int32>& instance_keys,
73 absl::flat_hash_map<Node*, absl::flat_hash_set<int32>>* data_dependencies,
74 absl::flat_hash_map<Node*, absl::flat_hash_set<Node*>>* dependency_edges) {
75 // If there exists some path a -> ... -> b then `all_paths[a]` contains `b`
76 absl::flat_hash_map<Node*, absl::flat_hash_set<Node*>> all_paths;
77 for (int i = 0; i < collective_nodes.size() - 1; i++) {
78 if (!collective_nodes[i]->IsCollective() ||
79 collective_nodes[i]->type_string() != "CollectiveReduce") {
80 return errors::Internal("Unexpected node ",
81 collective_nodes[i]->DebugString());
82 }
83 const auto& deps_i = (*data_dependencies)[collective_nodes[i]];
84 for (int j = i + 1; j < collective_nodes.size(); j++) {
85 if (collective_nodes[i]->requested_device() !=
86 collective_nodes[j]->requested_device()) {
87 continue;
88 }
89 if (instance_keys[i] == instance_keys[j]) {
90 return errors::Internal("Unexpected same instance_key ",
91 instance_keys[i],
92 " on 2 nodes with the same device ",
93 collective_nodes[i]->requested_device());
94 }
95 const auto& deps_j = (*data_dependencies)[collective_nodes[j]];
96 if (deps_i.find(instance_keys[j]) == deps_i.end() &&
97 deps_j.find(instance_keys[i]) == deps_j.end()) {
98 int src_idx = instance_keys[i] > instance_keys[j] ? i : j;
99 int dst_idx = instance_keys[i] > instance_keys[j] ? j : i;
100 Node* src_node = collective_nodes[src_idx];
101 Node* dst_node = collective_nodes[dst_idx];
102 VLOG(1) << "Adding control dependency from node " << src_node->name()
103 << " instance " << instance_keys[src_idx] << " to node "
104 << dst_node->name() << " instance " << instance_keys[dst_idx];
105 (*dependency_edges)[src_node].insert(dst_node);
106 auto& src_paths = all_paths[src_node];
107 src_paths.insert(dst_node);
108 for (Node* downstream_node : all_paths[dst_node]) {
109 src_paths.insert(downstream_node);
110 }
111 }
112 }
113 }
114
115 // Prune dependency edges so that if there are edges a -> b, b -> c, and a ->
116 // c, then remove a -> c. This dependency would be handled naturally during
117 // op scheduling.
118 for (int i = 0; i < collective_nodes.size(); ++i) {
119 Node* node = collective_nodes[i];
120 auto& neighbor_set = (*dependency_edges)[node];
121 std::vector<Node*> neighbor_list(neighbor_set.begin(), neighbor_set.end());
122 // For all n1, n2 in `neighbor_list` if there is a path from n1 -> n2 then
123 // eliminate n2 from `neighbor_set` and `neighbor_list`. We remove from
124 // `neighbor_list` by replacing with a `nullptr`, hence the `nullptr` checks
125 // below.
126 for (int j = 0; j < neighbor_list.size(); ++j) {
127 Node* n1 = neighbor_list[j];
128 if (n1 == nullptr) continue;
129 auto& n1_paths = all_paths[n1];
130 for (int k = 0; k < neighbor_list.size(); ++k) {
131 Node* n2 = neighbor_list[k];
132 if (j == k || n2 == nullptr) continue;
133 if (n1_paths.find(n2) != n1_paths.end()) {
134 neighbor_set.erase(n2);
135 neighbor_list[k] = nullptr;
136 }
137 }
138 }
139 }
140
141 return Status::OK();
142 }
143
144 // Insert control dependencies defined by `dependency_edges` in `graph`. If
145 // `order_type` is `kEdges`, insert explicit control edges, else if `order_type`
146 // is `kAttrs`, encode dependencies as an attribute on collective node.
InsertControlDependencies(Graph * graph,GraphCollectiveOrder order_type,const absl::flat_hash_map<Node *,absl::flat_hash_set<Node * >> & dependency_edges)147 Status InsertControlDependencies(
148 Graph* graph, GraphCollectiveOrder order_type,
149 const absl::flat_hash_map<Node*, absl::flat_hash_set<Node*>>&
150 dependency_edges) {
151 if (order_type == GraphCollectiveOrder::kEdges) {
152 for (const auto& pair : dependency_edges) {
153 Node* src_node = pair.first;
154 for (Node* dst_node : pair.second) {
155 graph->AddControlEdge(src_node, dst_node);
156 }
157 }
158 } else if (order_type == GraphCollectiveOrder::kAttrs) {
159 // `wait_for` is the inverse of `dependency_edges`, i.e. `wait_for[node]`
160 // contains the list of instance keys for which `node` must wait.
161 absl::flat_hash_map<Node*, absl::flat_hash_set<int32>> wait_for;
162 for (const auto& pair : dependency_edges) {
163 int32 src_instance;
164 TF_RETURN_IF_ERROR(
165 GetNodeAttr(pair.first->attrs(), "instance_key", &src_instance));
166 for (Node* dst_node : pair.second) {
167 wait_for[dst_node].insert(src_instance);
168 }
169 }
170 for (const auto& pair : wait_for) {
171 std::vector<int32> wait_for_list(pair.second.begin(), pair.second.end());
172 pair.first->ClearAttr("wait_for");
173 pair.first->AddAttr("wait_for", wait_for_list);
174 }
175 } else {
176 return errors::Internal("Unexpected GraphCollectiveOrder type ",
177 static_cast<int>(order_type));
178 }
179 return Status::OK();
180 }
181
182 } // namespace
183
OrderCollectives(Graph * graph,GraphCollectiveOrder order_type)184 Status OrderCollectives(Graph* graph, GraphCollectiveOrder order_type) {
185 // `instance_keys[i]` corresponds to `collective_nodes[i]`
186 std::vector<Node*> collective_nodes;
187 std::vector<int32> instance_keys;
188 // node -> set of collectives on which node depends.
189 absl::flat_hash_map<Node*, absl::flat_hash_set<int32>> data_dependencies;
190 TF_RETURN_IF_ERROR(DiscoverDataDependencies(
191 graph, &collective_nodes, &instance_keys, &data_dependencies));
192
193 if (collective_nodes.empty()) return Status::OK();
194
195 absl::flat_hash_map<Node*, absl::flat_hash_set<Node*>> dependency_edges;
196 // For all pairs of collective nodes n1 and n2 on the same device, if n1 does
197 // not depend on n2 and n2 does not depend on n1, then they are potentially
198 // concurrent. Create an arbitrary, deterministic ordering between them.
199 TF_RETURN_IF_ERROR(CreateControlDependencies(
200 collective_nodes, instance_keys, &data_dependencies, &dependency_edges));
201
202 return InsertControlDependencies(graph, order_type, dependency_edges);
203 }
204
205 } // namespace tensorflow
206