1 /**
2 * Copyright 2020-2022 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "backend/common/graph_kernel/add_atomic_clean.h"
#include <algorithm>
#include <functional>
#include <map>
#include <memory>
#include <numeric>
#include <set>
#include <sstream>
#include <stack>
#include <string>
#include <utility>
#include <vector>
27 #include "backend/common/graph_kernel/core/graph_kernel_utils.h"
28 #include "backend/common/graph_kernel/graph_kernel_helper.h"
29 #include "include/backend/kernel_graph.h"
30 #include "include/common/debug/anf_ir_dump.h"
31 #include "include/common/utils/utils.h"
32 #include "kernel/framework_utils.h"
33 #include "kernel/kernel.h"
34 #include "mindspore/core/ops/math_ops.h"
35 #include "mindspore/core/ops/sequence_ops.h"
36 #include "utils/log_adapter.h"
37
38 namespace mindspore::graphkernel {
39 namespace {
GetUniqReduceAxes(const AnfNodePtr & node,bool is_ascend=false)40 std::set<int64_t> GetUniqReduceAxes(const AnfNodePtr &node, bool is_ascend = false) {
41 if (!IsPrimitiveCNode(node, prim::kPrimReduceSum)) {
42 MS_LOG(EXCEPTION) << "Expect ReduceSum node, but got " << common::AnfAlgo::GetCNodeName(node);
43 }
44
45 auto input = node->cast<CNodePtr>()->input(kFirstDataInputIndex);
46 ShapeVector src_shape_vec;
47 if (is_ascend) {
48 src_shape_vec = GetDeviceShape(input);
49 } else {
50 src_shape_vec = GetShape(input);
51 }
52 auto axis_vec = GetReduceAxis(node);
53 if (axis_vec.empty()) {
54 axis_vec.resize(src_shape_vec.size());
55 for (size_t i = 0; i < src_shape_vec.size(); ++i) {
56 axis_vec[i] = SizeToLong(i);
57 }
58 } else {
59 (void)std::transform(axis_vec.begin(), axis_vec.end(), axis_vec.begin(), [&src_shape_vec](int64_t axis) -> int64_t {
60 return axis < 0 ? axis + SizeToLong(src_shape_vec.size()) : axis;
61 });
62 }
63
64 std::set<int64_t> axis_set(axis_vec.begin(), axis_vec.end());
65 return axis_set;
66 }
67
HaveReduceInPredecessors(const AnfNodePtr & node)68 bool HaveReduceInPredecessors(const AnfNodePtr &node) {
69 std::stack<AnfNodePtr> st;
70 st.push(node);
71 while (!st.empty()) {
72 auto n = st.top();
73 st.pop();
74
75 if (n != node) {
76 if (!n->isa<CNode>()) {
77 continue;
78 }
79 if (IsPrimitiveCNode(n, prim::kPrimReduceSum)) {
80 return true;
81 }
82 }
83
84 auto n_inputs = n->cast<CNodePtr>()->inputs();
85 (void)std::for_each(n_inputs.cbegin() + 1, n_inputs.cend(), [&st](const AnfNodePtr &n) -> void { st.push(n); });
86 }
87
88 return false;
89 }
90 } // namespace
91
Init()92 std::shared_ptr<AtomicAddChecker> AtomicAddChecker::Init() {
93 auto processor = kernel::GetProcessorFromContext();
94 if (processor == kernel::Processor::AICORE) {
95 return std::make_shared<AtomicAddCheckerAscend>();
96 } else if (processor == kernel::Processor::CUDA) {
97 return std::make_shared<AtomicAddCheckerGPU>();
98 }
99 return nullptr;
100 }
101
// Scans the sub-graph of the given composite (graph-kernel) node and collects
// every output op of target_type_ that is a candidate for an atomic-add
// rewrite. Candidates are appended to atomic_add_infos_; returns true when at
// least one candidate was found.
bool AtomicAddChecker::FindCandidate(const AnfNodePtr &anf_node) {
  atomic_add_infos_.clear();
  auto node = anf_node->cast<CNodePtr>();
  MS_EXCEPTION_IF_NULL(node);
  auto sub_graph = common::AnfAlgo::GetCNodeFuncGraphPtr(node);
  // Ensure the sub-graph has a manager so node_users() below is populated.
  auto mng_sub = sub_graph->manager();
  if (mng_sub == nullptr) {
    mng_sub = Manage(sub_graph, false);
    sub_graph->set_manager(mng_sub);
  }

  auto CheckSuitableTarget = [&mng_sub](const InplaceAssignerInfo &atomic_add_info) {
    // Target type should not fuse any other ops in out direction, which means it should be in output list.
    return mng_sub->node_users()[atomic_add_info.op_node].size() <= 1;
  };

  auto real_return_node = sub_graph->get_return()->input(kFirstDataInputIndex);
  InplaceAssignerInfo atomic_add_info;
  if (IsPrimitiveCNode(real_return_node, prim::kPrimMakeTuple)) {
    // Multi-output sub-graph: inspect every element of the output tuple.
    const auto &inputs = real_return_node->cast<CNodePtr>()->inputs();
    for (size_t i = 1; i < inputs.size(); ++i) {
      if (IsPrimitiveCNode(inputs[i], target_type_)) {
        atomic_add_info.op_node = inputs[i]->cast<CNodePtr>();
        // i counts from 1 because input 0 of MakeTuple is the primitive.
        atomic_add_info.real_output_index = i - 1;
        atomic_add_info.real_output_num = inputs.size() - 1;
        // Target type should not fuse any other ops in out direction, which means it should be in output list.
        if (!CheckSuitableTarget(atomic_add_info)) {
          continue;
        }
        atomic_add_infos_.push_back(atomic_add_info);
      }
    }
  } else if (IsPrimitiveCNode(real_return_node, target_type_)) {
    // Single-output sub-graph whose sole output is the target op itself.
    atomic_add_info.op_node = real_return_node->cast<CNodePtr>();
    atomic_add_info.real_output_num = 1;
    if (CheckSuitableTarget(atomic_add_info)) {
      atomic_add_infos_.push_back(atomic_add_info);
    }
  } else {
    // The graph output is neither a tuple nor a target op: no candidate.
    return false;
  }

  return !atomic_add_infos_.empty();
}
146
CanActivateAtomicAdd(const AnfNodePtr & anf_node)147 bool AtomicAddChecker::CanActivateAtomicAdd(const AnfNodePtr &anf_node) {
148 // Rules to activate atomic add:
149 // 1. Find only one ReduceSum inside sub-graph, and it should not fuse any other ops in out direction,
150 // which mean it should be in output list.
151 // 2. The reduce axis and reduce number should meet condition:
152 // (GPU) all-reduce or reduce-x when fuse number is greater than or equal to 1024, or reduce-y.
153 // (Ascend) The first valid axis of the input data is the reduce axis or the non-reduce axis
154 // cannot make full use of multi-core.
155 // 3. No other ReduceSum as output ReduceSum's predecessors (reduce compile limitation).
156
157 // Rule 1.
158 if (!FindCandidate(anf_node) || atomic_add_infos_.size() > 1) {
159 return false;
160 }
161
162 // Rule 2.
163 if (!SuitableForAtomicAdd(atomic_add_infos_[0].op_node)) {
164 return false;
165 }
166
167 // Rule 3.
168 return !HaveReduceInPredecessors(atomic_add_infos_[0].op_node);
169 }
170
Check(const AnfNodePtr & node)171 bool AtomicAddChecker::Check(const AnfNodePtr &node) {
172 return (common::AnfAlgo::IsGraphKernel(node) && CanActivateAtomicAdd(node));
173 }
174
SuitableForAtomicAdd(const AnfNodePtr & node)175 bool AtomicAddCheckerGPU::SuitableForAtomicAdd(const AnfNodePtr &node) {
176 auto input = node->cast<CNodePtr>()->input(kFirstDataInputIndex);
177 auto src_shape_vec = GetShape(input);
178 std::set<int64_t> axis_set = GetUniqReduceAxes(node);
179
180 // For reduce whose last dim is reduced (including all-reduce),
181 // it is suitable for atomic add only the reduce num is greater than or equal to 1024.
182 if (axis_set.count(src_shape_vec.size() - 1) != 0) {
183 size_t reduce_size = std::accumulate(
184 axis_set.begin(), axis_set.end(), LongToSize(1),
185 [&src_shape_vec](size_t size, int64_t axis) { return size * LongToSize(src_shape_vec[LongToSize(axis)]); });
186 return reduce_size >= 1024;
187 }
188
189 // For reduce whose last dim is not reduced, always true.
190 return true;
191 }
192
SuitableForAtomicAdd(const AnfNodePtr & node)193 bool AtomicAddCheckerAscend::SuitableForAtomicAdd(const AnfNodePtr &node) {
194 auto input = node->cast<CNodePtr>()->input(kFirstDataInputIndex);
195
196 // Atomic addition is enabled only when the data type is fp32
197 auto type = AnfAlgo::GetOutputDeviceDataType(input, 0);
198 if (type != kNumberTypeFloat32) {
199 return false;
200 }
201
202 // If the first valid axis of the input data is the reduce axis, enable atomic addition
203 auto src_shape_vec = GetDeviceShape(input);
204 std::set<int64_t> reduce_axis_set = GetUniqReduceAxes(node, true);
205 auto start_with_reduce = false;
206 for (size_t i = 0; i < src_shape_vec.size(); ++i) {
207 auto dim = src_shape_vec[i];
208 if (dim != 1) {
209 if (reduce_axis_set.count(i) != 0) {
210 start_with_reduce = true;
211 }
212 break;
213 }
214 }
215 if (start_with_reduce) {
216 return true;
217 }
218
219 // If the non-reduce axis cannot make full use of multi-core, enable atomic addition
220 constexpr auto processor_core_num = 32LL;
221 auto start_non_reduce_dim = 1LL;
222 for (size_t i = 0; i < src_shape_vec.size(); ++i) {
223 auto dim = src_shape_vec[i];
224 if (reduce_axis_set.count(i) != 0) {
225 break;
226 }
227 start_non_reduce_dim = start_non_reduce_dim * dim;
228 }
229 if (start_non_reduce_dim < processor_core_num) {
230 return true;
231 }
232
233 return false;
234 }
235
InsertAtomicClean(const FuncGraphPtr & main_graph,const AnfNodePtr & anf_node,const std::vector<InplaceAssignerInfo> & atomic_add_infos,const FuncGraphManagerPtr & mng)236 void AtomicCleanInserter::InsertAtomicClean(const FuncGraphPtr &main_graph, const AnfNodePtr &anf_node,
237 const std::vector<InplaceAssignerInfo> &atomic_add_infos,
238 const FuncGraphManagerPtr &mng) {
239 auto origin_composite_node = anf_node->cast<CNodePtr>();
240 MS_EXCEPTION_IF_NULL(origin_composite_node);
241
242 // Create broadcast node.
243 std::vector<std::pair<InplaceAssignerInfo, AnfNodePtr>> info_and_inplace_assignee_addr;
244 for (auto atomic_add_info : atomic_add_infos) {
245 auto out_type = GetType(atomic_add_info.op_node)->cast<TensorTypePtr>();
246 MS_EXCEPTION_IF_NULL(out_type);
247 auto broadcast_to_node = CreateCleanCompositeNode(atomic_add_info, main_graph, out_type->element()->type_id());
248 (void)info_and_inplace_assignee_addr.emplace_back(atomic_add_info, broadcast_to_node);
249 }
250
251 // Insert extra input(broadcast node output) to composite node, and make ReduceSum inplace-assign to it.
252 ProcessOriginCNode(origin_composite_node, info_and_inplace_assignee_addr);
253
254 // Insert Depend before origin ReduceSum's user to keep execution order.
255 ProcessOriginCNodeUser(main_graph, origin_composite_node, info_and_inplace_assignee_addr, mng);
256 std::stringstream ss;
257 ss << "Target node: " << origin_composite_node->fullname_with_scope() << ", clean nodes: ";
258 for (auto iter : info_and_inplace_assignee_addr) {
259 ss << iter.second->fullname_with_scope() << ", ";
260 }
261
262 MS_LOG(INFO) << ss.str();
263 }
264
Run(const FuncGraphPtr & func_graph)265 bool AtomicCleanInserter::Run(const FuncGraphPtr &func_graph) {
266 auto kernel_graph = std::dynamic_pointer_cast<session::KernelGraph>(func_graph);
267 MS_EXCEPTION_IF_NULL(kernel_graph);
268 auto mng = kernel_graph->manager();
269 if (mng == nullptr) {
270 mng = Manage(kernel_graph, true);
271 kernel_graph->set_manager(mng);
272 }
273
274 bool changed = false;
275 std::shared_ptr<AtomicAddChecker> atomic_add_checker = AtomicAddChecker::Init();
276 if (atomic_add_checker == nullptr) {
277 return changed;
278 }
279
280 auto topo_nodes = TopoSort(kernel_graph->get_return());
281 for (const auto &node : topo_nodes) {
282 if (!atomic_add_checker->Check(node)) {
283 continue;
284 }
285 auto atomic_add_infos = atomic_add_checker->GetAtomicAddInfo();
286 InsertAtomicClean(kernel_graph, node, atomic_add_infos, mng);
287 changed = true;
288 }
289
290 if (changed) {
291 mng->RemoveRoots();
292 mng->KeepRoots({func_graph});
293 }
294
295 return changed;
296 }
297 } // namespace mindspore::graphkernel
298