1 /**
2  * Copyright 2020-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "backend/common/graph_kernel/add_atomic_clean.h"

#include <algorithm>
#include <functional>
#include <map>
#include <memory>
#include <numeric>
#include <set>
#include <sstream>
#include <stack>
#include <string>
#include <utility>
#include <vector>

#include "backend/common/graph_kernel/core/graph_kernel_utils.h"
#include "backend/common/graph_kernel/graph_kernel_helper.h"
#include "include/backend/kernel_graph.h"
#include "include/common/debug/anf_ir_dump.h"
#include "include/common/utils/utils.h"
#include "kernel/framework_utils.h"
#include "kernel/kernel.h"
#include "mindspore/core/ops/math_ops.h"
#include "mindspore/core/ops/sequence_ops.h"
#include "utils/log_adapter.h"
37 
38 namespace mindspore::graphkernel {
39 namespace {
GetUniqReduceAxes(const AnfNodePtr & node,bool is_ascend=false)40 std::set<int64_t> GetUniqReduceAxes(const AnfNodePtr &node, bool is_ascend = false) {
41   if (!IsPrimitiveCNode(node, prim::kPrimReduceSum)) {
42     MS_LOG(EXCEPTION) << "Expect ReduceSum node, but got " << common::AnfAlgo::GetCNodeName(node);
43   }
44 
45   auto input = node->cast<CNodePtr>()->input(kFirstDataInputIndex);
46   ShapeVector src_shape_vec;
47   if (is_ascend) {
48     src_shape_vec = GetDeviceShape(input);
49   } else {
50     src_shape_vec = GetShape(input);
51   }
52   auto axis_vec = GetReduceAxis(node);
53   if (axis_vec.empty()) {
54     axis_vec.resize(src_shape_vec.size());
55     for (size_t i = 0; i < src_shape_vec.size(); ++i) {
56       axis_vec[i] = SizeToLong(i);
57     }
58   } else {
59     (void)std::transform(axis_vec.begin(), axis_vec.end(), axis_vec.begin(), [&src_shape_vec](int64_t axis) -> int64_t {
60       return axis < 0 ? axis + SizeToLong(src_shape_vec.size()) : axis;
61     });
62   }
63 
64   std::set<int64_t> axis_set(axis_vec.begin(), axis_vec.end());
65   return axis_set;
66 }
67 
HaveReduceInPredecessors(const AnfNodePtr & node)68 bool HaveReduceInPredecessors(const AnfNodePtr &node) {
69   std::stack<AnfNodePtr> st;
70   st.push(node);
71   while (!st.empty()) {
72     auto n = st.top();
73     st.pop();
74 
75     if (n != node) {
76       if (!n->isa<CNode>()) {
77         continue;
78       }
79       if (IsPrimitiveCNode(n, prim::kPrimReduceSum)) {
80         return true;
81       }
82     }
83 
84     auto n_inputs = n->cast<CNodePtr>()->inputs();
85     (void)std::for_each(n_inputs.cbegin() + 1, n_inputs.cend(), [&st](const AnfNodePtr &n) -> void { st.push(n); });
86   }
87 
88   return false;
89 }
90 }  // namespace
91 
Init()92 std::shared_ptr<AtomicAddChecker> AtomicAddChecker::Init() {
93   auto processor = kernel::GetProcessorFromContext();
94   if (processor == kernel::Processor::AICORE) {
95     return std::make_shared<AtomicAddCheckerAscend>();
96   } else if (processor == kernel::Processor::CUDA) {
97     return std::make_shared<AtomicAddCheckerGPU>();
98   }
99   return nullptr;
100 }
101 
FindCandidate(const AnfNodePtr & anf_node)102 bool AtomicAddChecker::FindCandidate(const AnfNodePtr &anf_node) {
103   atomic_add_infos_.clear();
104   auto node = anf_node->cast<CNodePtr>();
105   MS_EXCEPTION_IF_NULL(node);
106   auto sub_graph = common::AnfAlgo::GetCNodeFuncGraphPtr(node);
107   auto mng_sub = sub_graph->manager();
108   if (mng_sub == nullptr) {
109     mng_sub = Manage(sub_graph, false);
110     sub_graph->set_manager(mng_sub);
111   }
112 
113   auto CheckSuitableTarget = [&mng_sub](const InplaceAssignerInfo &atomic_add_info) {
114     // Target type should not fuse any other ops in out direction, which means it should be in output list.
115     return mng_sub->node_users()[atomic_add_info.op_node].size() <= 1;
116   };
117 
118   auto real_return_node = sub_graph->get_return()->input(kFirstDataInputIndex);
119   InplaceAssignerInfo atomic_add_info;
120   if (IsPrimitiveCNode(real_return_node, prim::kPrimMakeTuple)) {
121     const auto &inputs = real_return_node->cast<CNodePtr>()->inputs();
122     for (size_t i = 1; i < inputs.size(); ++i) {
123       if (IsPrimitiveCNode(inputs[i], target_type_)) {
124         atomic_add_info.op_node = inputs[i]->cast<CNodePtr>();
125         atomic_add_info.real_output_index = i - 1;
126         atomic_add_info.real_output_num = inputs.size() - 1;
127         // Target type should not fuse any other ops in out direction, which means it should be in output list.
128         if (!CheckSuitableTarget(atomic_add_info)) {
129           continue;
130         }
131         atomic_add_infos_.push_back(atomic_add_info);
132       }
133     }
134   } else if (IsPrimitiveCNode(real_return_node, target_type_)) {
135     atomic_add_info.op_node = real_return_node->cast<CNodePtr>();
136     atomic_add_info.real_output_num = 1;
137     if (CheckSuitableTarget(atomic_add_info)) {
138       atomic_add_infos_.push_back(atomic_add_info);
139     }
140   } else {
141     return false;
142   }
143 
144   return !atomic_add_infos_.empty();
145 }
146 
CanActivateAtomicAdd(const AnfNodePtr & anf_node)147 bool AtomicAddChecker::CanActivateAtomicAdd(const AnfNodePtr &anf_node) {
148   // Rules to activate atomic add:
149   // 1. Find only one ReduceSum inside sub-graph, and it should not fuse any other ops in out direction,
150   //    which mean it should be in output list.
151   // 2. The reduce axis and reduce number should meet condition:
152   //    (GPU) all-reduce or reduce-x when fuse number is greater than or equal to 1024, or reduce-y.
153   //    (Ascend) The first valid axis of the input data is the reduce axis or the non-reduce axis
154   //    cannot make full use of multi-core.
155   // 3. No other ReduceSum as output ReduceSum's predecessors (reduce compile limitation).
156 
157   // Rule 1.
158   if (!FindCandidate(anf_node) || atomic_add_infos_.size() > 1) {
159     return false;
160   }
161 
162   // Rule 2.
163   if (!SuitableForAtomicAdd(atomic_add_infos_[0].op_node)) {
164     return false;
165   }
166 
167   // Rule 3.
168   return !HaveReduceInPredecessors(atomic_add_infos_[0].op_node);
169 }
170 
Check(const AnfNodePtr & node)171 bool AtomicAddChecker::Check(const AnfNodePtr &node) {
172   return (common::AnfAlgo::IsGraphKernel(node) && CanActivateAtomicAdd(node));
173 }
174 
SuitableForAtomicAdd(const AnfNodePtr & node)175 bool AtomicAddCheckerGPU::SuitableForAtomicAdd(const AnfNodePtr &node) {
176   auto input = node->cast<CNodePtr>()->input(kFirstDataInputIndex);
177   auto src_shape_vec = GetShape(input);
178   std::set<int64_t> axis_set = GetUniqReduceAxes(node);
179 
180   // For reduce whose last dim is reduced (including all-reduce),
181   // it is suitable for atomic add only the reduce num is greater than or equal to 1024.
182   if (axis_set.count(src_shape_vec.size() - 1) != 0) {
183     size_t reduce_size = std::accumulate(
184       axis_set.begin(), axis_set.end(), LongToSize(1),
185       [&src_shape_vec](size_t size, int64_t axis) { return size * LongToSize(src_shape_vec[LongToSize(axis)]); });
186     return reduce_size >= 1024;
187   }
188 
189   // For reduce whose last dim is not reduced, always true.
190   return true;
191 }
192 
SuitableForAtomicAdd(const AnfNodePtr & node)193 bool AtomicAddCheckerAscend::SuitableForAtomicAdd(const AnfNodePtr &node) {
194   auto input = node->cast<CNodePtr>()->input(kFirstDataInputIndex);
195 
196   // Atomic addition is enabled only when the data type is fp32
197   auto type = AnfAlgo::GetOutputDeviceDataType(input, 0);
198   if (type != kNumberTypeFloat32) {
199     return false;
200   }
201 
202   // If the first valid axis of the input data is the reduce axis, enable atomic addition
203   auto src_shape_vec = GetDeviceShape(input);
204   std::set<int64_t> reduce_axis_set = GetUniqReduceAxes(node, true);
205   auto start_with_reduce = false;
206   for (size_t i = 0; i < src_shape_vec.size(); ++i) {
207     auto dim = src_shape_vec[i];
208     if (dim != 1) {
209       if (reduce_axis_set.count(i) != 0) {
210         start_with_reduce = true;
211       }
212       break;
213     }
214   }
215   if (start_with_reduce) {
216     return true;
217   }
218 
219   // If the non-reduce axis cannot make full use of multi-core, enable atomic addition
220   constexpr auto processor_core_num = 32LL;
221   auto start_non_reduce_dim = 1LL;
222   for (size_t i = 0; i < src_shape_vec.size(); ++i) {
223     auto dim = src_shape_vec[i];
224     if (reduce_axis_set.count(i) != 0) {
225       break;
226     }
227     start_non_reduce_dim = start_non_reduce_dim * dim;
228   }
229   if (start_non_reduce_dim < processor_core_num) {
230     return true;
231   }
232 
233   return false;
234 }
235 
InsertAtomicClean(const FuncGraphPtr & main_graph,const AnfNodePtr & anf_node,const std::vector<InplaceAssignerInfo> & atomic_add_infos,const FuncGraphManagerPtr & mng)236 void AtomicCleanInserter::InsertAtomicClean(const FuncGraphPtr &main_graph, const AnfNodePtr &anf_node,
237                                             const std::vector<InplaceAssignerInfo> &atomic_add_infos,
238                                             const FuncGraphManagerPtr &mng) {
239   auto origin_composite_node = anf_node->cast<CNodePtr>();
240   MS_EXCEPTION_IF_NULL(origin_composite_node);
241 
242   // Create broadcast node.
243   std::vector<std::pair<InplaceAssignerInfo, AnfNodePtr>> info_and_inplace_assignee_addr;
244   for (auto atomic_add_info : atomic_add_infos) {
245     auto out_type = GetType(atomic_add_info.op_node)->cast<TensorTypePtr>();
246     MS_EXCEPTION_IF_NULL(out_type);
247     auto broadcast_to_node = CreateCleanCompositeNode(atomic_add_info, main_graph, out_type->element()->type_id());
248     (void)info_and_inplace_assignee_addr.emplace_back(atomic_add_info, broadcast_to_node);
249   }
250 
251   // Insert extra input(broadcast node output) to composite node, and make ReduceSum inplace-assign to it.
252   ProcessOriginCNode(origin_composite_node, info_and_inplace_assignee_addr);
253 
254   // Insert Depend before origin ReduceSum's user to keep execution order.
255   ProcessOriginCNodeUser(main_graph, origin_composite_node, info_and_inplace_assignee_addr, mng);
256   std::stringstream ss;
257   ss << "Target node: " << origin_composite_node->fullname_with_scope() << ", clean nodes: ";
258   for (auto iter : info_and_inplace_assignee_addr) {
259     ss << iter.second->fullname_with_scope() << ", ";
260   }
261 
262   MS_LOG(INFO) << ss.str();
263 }
264 
Run(const FuncGraphPtr & func_graph)265 bool AtomicCleanInserter::Run(const FuncGraphPtr &func_graph) {
266   auto kernel_graph = std::dynamic_pointer_cast<session::KernelGraph>(func_graph);
267   MS_EXCEPTION_IF_NULL(kernel_graph);
268   auto mng = kernel_graph->manager();
269   if (mng == nullptr) {
270     mng = Manage(kernel_graph, true);
271     kernel_graph->set_manager(mng);
272   }
273 
274   bool changed = false;
275   std::shared_ptr<AtomicAddChecker> atomic_add_checker = AtomicAddChecker::Init();
276   if (atomic_add_checker == nullptr) {
277     return changed;
278   }
279 
280   auto topo_nodes = TopoSort(kernel_graph->get_return());
281   for (const auto &node : topo_nodes) {
282     if (!atomic_add_checker->Check(node)) {
283       continue;
284     }
285     auto atomic_add_infos = atomic_add_checker->GetAtomicAddInfo();
286     InsertAtomicClean(kernel_graph, node, atomic_add_infos, mng);
287     changed = true;
288   }
289 
290   if (changed) {
291     mng->RemoveRoots();
292     mng->KeepRoots({func_graph});
293   }
294 
295   return changed;
296 }
297 }  // namespace mindspore::graphkernel
298