1 /**
2 * Copyright 2019 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "runtime/device/ascend/tasksink/task_generator.h"
18
19 #include <runtime/rt.h>
20 #include "backend/kernel_compiler/task_stream.h"
21 #include "utils/ms_utils.h"
22 #ifndef ENABLE_SECURITY
23 #include "runtime/device/ascend/profiling/profiling_utils.h"
24 #include "runtime/device/ascend/profiling/profiling_manager.h"
25 #endif
26 #ifdef ENABLE_DUMP_IR
27 #include "debug/rdr/running_data_recorder.h"
28 #endif
29
30 namespace mindspore {
31 namespace device {
32 namespace ascend {
33 namespace tasksink {
GenTasks(const std::vector<CNodePtr> & anf_node_list,std::vector<TaskInfoPtr> * task_info_list,uint32_t graph_id)34 bool TaskGenerator::GenTasks(const std::vector<CNodePtr> &anf_node_list, std::vector<TaskInfoPtr> *task_info_list,
35 uint32_t graph_id) {
36 MS_LOG(INFO) << "GenTasks start...";
37 MS_EXCEPTION_IF_NULL(task_info_list);
38 // Traverse graph applykernel list and run
39 if (!LaunchAllKernel(anf_node_list, task_info_list, graph_id)) {
40 MS_LOG(ERROR) << "LaunchAllKernel failed";
41 return false;
42 }
43 MS_LOG(INFO) << "GenTasks end...";
44 #ifdef ENABLE_DUMP_IR
45 string task_info_name = "task_info_graph." + std::to_string(graph_id);
46 (void)mindspore::RDR::RecordTaskDebugInfo(SUBMODULE_ID, task_info_name, task_debug_info_list_);
47 auto context_ptr = MsContext::GetInstance();
48 MS_EXCEPTION_IF_NULL(context_ptr);
49 bool save_graphs = context_ptr->get_param<bool>(MS_CTX_SAVE_GRAPHS_FLAG);
50 if (save_graphs) {
51 #ifndef ENABLE_SECURITY
52 std::string file_path = GetSaveGraphsPathName("task_info_graph_" + std::to_string(graph_id) + ".ir");
53 DumpTaskInfo(file_path);
54 #endif
55 }
56 #endif
57 return true;
58 }
59
LaunchAddrCleanAkgKernel(const CNodePtr & anf_node_ptr,AddressPtrList * kernel_inputs)60 void TaskGenerator::LaunchAddrCleanAkgKernel(const CNodePtr &anf_node_ptr, AddressPtrList *kernel_inputs) {
61 MS_EXCEPTION_IF_NULL(anf_node_ptr);
62 MS_EXCEPTION_IF_NULL(kernel_inputs);
63 // akg process
64 // set atomic clean addr
65 if (AnfAlgo::HasNodeAttr(kAttrAtomicOutputIndexs, anf_node_ptr)) {
66 auto clean_output_indexs = AnfAlgo::GetNodeAttr<std::vector<size_t>>(anf_node_ptr, kAttrAtomicOutputIndexs);
67 auto graph = anf_node_ptr->func_graph();
68 MS_EXCEPTION_IF_NULL(graph);
69 auto manager = graph->manager();
70 MS_EXCEPTION_IF_NULL(manager);
71 auto &node_users = manager->node_users();
72 if (node_users[anf_node_ptr].empty()) {
73 MS_LOG(EXCEPTION) << "Node users of " << anf_node_ptr->ToString() << " is empty.";
74 }
75 auto depend_node = node_users[anf_node_ptr].pop().first;
76 if (!IsPrimitiveCNode(depend_node, prim::kPrimDepend)) {
77 MS_LOG(EXCEPTION) << "Checking Depend node failed";
78 }
79 if (node_users[depend_node].empty()) {
80 MS_LOG(EXCEPTION) << "Node users of " << depend_node->ToString() << " is empty.";
81 }
82 auto post_node = node_users[depend_node].pop().first;
83 for (auto index : clean_output_indexs) {
84 auto device_address = AnfAlgo::GetOutputAddr(post_node, index);
85 kernel::AddressPtr input = std::make_shared<kernel::Address>();
86 MS_EXCEPTION_IF_NULL(input);
87 input->addr = device_address->ptr_;
88 input->size = device_address->size_;
89 kernel_inputs->push_back(input);
90 }
91 MS_LOG(DEBUG) << "AtomicAddClean clean output size: " << clean_output_indexs.size();
92 }
93 }
94
LaunchAddrCleanKernel(const CNodePtr & anf_node_ptr,AddressPtrList * kernel_inputs)95 void TaskGenerator::LaunchAddrCleanKernel(const CNodePtr &anf_node_ptr, AddressPtrList *kernel_inputs) {
96 MS_EXCEPTION_IF_NULL(anf_node_ptr);
97 MS_EXCEPTION_IF_NULL(kernel_inputs);
98 // akg process
99 if (AnfAlgo::GetKernelType(anf_node_ptr) == KernelType::AKG_KERNEL) {
100 LaunchAddrCleanAkgKernel(anf_node_ptr, kernel_inputs);
101 return;
102 }
103 // tbe process
104 auto input_tensor_num = AnfAlgo::GetInputTensorNum(anf_node_ptr);
105 for (size_t i = 0; i < input_tensor_num; i++) {
106 // set clean output addr
107 MS_EXCEPTION_IF_NULL(anf_node_ptr->inputs()[i + 1]);
108 auto pre_node = anf_node_ptr->input(i + 1)->cast<CNodePtr>();
109 if (AnfAlgo::HasNodeAttr(kAttrAtomicOutputIndexs, pre_node)) {
110 auto clean_output_indexs = AnfAlgo::GetNodeAttr<std::vector<size_t>>(pre_node, kAttrAtomicOutputIndexs);
111 for (auto index : clean_output_indexs) {
112 auto device_address = AnfAlgo::GetOutputAddr(pre_node, index);
113 kernel::AddressPtr input = std::make_shared<kernel::Address>();
114 MS_EXCEPTION_IF_NULL(input);
115 input->addr = device_address->ptr_;
116 MS_EXCEPTION_IF_NULL(input->addr);
117 input->size = device_address->size_;
118 kernel_inputs->push_back(input);
119 }
120 MS_LOG(DEBUG) << "AtomicAddClean clean output size:" << clean_output_indexs.size();
121 }
122 // set clean workspace address
123 if (AnfAlgo::HasNodeAttr(kAttrAtomicWorkspaceIndexs, pre_node)) {
124 auto clean_workspace_indexs = AnfAlgo::GetNodeAttr<std::vector<size_t>>(pre_node, kAttrAtomicWorkspaceIndexs);
125 for (const auto &index : clean_workspace_indexs) {
126 auto device_address = AnfAlgo::GetWorkspaceAddr(pre_node, index);
127 kernel::AddressPtr workspace = std::make_shared<kernel::Address>();
128 MS_EXCEPTION_IF_NULL(workspace);
129 workspace->addr = device_address->ptr_;
130 MS_EXCEPTION_IF_NULL(workspace->addr);
131 workspace->size = device_address->size_;
132 kernel_inputs->push_back(workspace);
133 }
134 MS_LOG(DEBUG) << "AtomicAddClean clean workspace size:" << clean_workspace_indexs.size();
135 }
136 }
137 auto clear_mems = AnfAlgo::GetNodeAttr<std::vector<size_t>>(anf_node_ptr, kAttrAtomicAddMemSize);
138 if (kernel_inputs->size() != clear_mems.size()) {
139 MS_LOG(EXCEPTION) << "AtomicAddClean kernel inputs size not equal clear memory size, kernel inputs size:"
140 << kernel_inputs->size() << ",clean mem size" << clear_mems.size();
141 }
142 }
143
LaunchKernel(const CNodePtr & anf_node_ptr,uint32_t stream_id,std::vector<TaskInfoPtr> * task_info_list)144 bool TaskGenerator::LaunchKernel(const CNodePtr &anf_node_ptr, uint32_t stream_id,
145 std::vector<TaskInfoPtr> *task_info_list) {
146 MS_EXCEPTION_IF_NULL(task_info_list);
147 MS_EXCEPTION_IF_NULL(anf_node_ptr);
148 AddressPtrList kernel_inputs;
149 AddressPtrList kernel_workspaces;
150 AddressPtrList kernel_outputs;
151 auto kernel_mod = AnfAlgo::GetKernelMod(anf_node_ptr);
152 MS_EXCEPTION_IF_NULL(kernel_mod);
153 kernel_mod->set_unique_name(anf_node_ptr->UniqueName());
154 kernel_mod->set_fullname(anf_node_ptr->fullname_with_scope());
155 kernel_mod->set_is_monad(AnfAlgo::IsNodeInputContainMonad(anf_node_ptr) && HasAbstractMonad(anf_node_ptr));
156 auto op_name = AnfAlgo::GetCNodeName(anf_node_ptr);
157 constexpr size_t kNonePlaceholderIdx = 3;
158 if ((op_name == kSplitOpName || op_name == kSplitVOpName) && AnfAlgo::HasNodeAttr(kAttrNonTask, anf_node_ptr)) {
159 MS_LOG(INFO) << "Skip task generation for NonTask op " << anf_node_ptr->fullname_with_scope();
160 auto debug_info = std::make_shared<TaskDebugInfo>();
161 MS_EXCEPTION_IF_NULL(debug_info);
162 debug_info->op_name_ = anf_node_ptr->fullname_with_scope() + "-NonTask";
163 debug_info->task_num_ = 0;
164 task_debug_info_list_.push_back(debug_info);
165 return true;
166 }
167
168 if (op_name != kAtomicAddrCleanOpName) {
169 size_t input_num = AnfAlgo::GetInputTensorNum(anf_node_ptr);
170 for (size_t i = 0; i < input_num; ++i) {
171 if (op_name == kDynamicRNNOpName && i == kNonePlaceholderIdx) {
172 continue;
173 }
174 if (op_name == kDynamicGRUV2OpName) {
175 auto none_index = AnfAlgo::GetNodeAttr<std::vector<int64_t>>(anf_node_ptr, "placeholder_index");
176 auto item = find(none_index.begin(), none_index.end(), i);
177 if (item != none_index.end()) {
178 continue;
179 }
180 }
181 auto real_input_index = AnfAlgo::GetRealInputIndex(anf_node_ptr, i);
182 auto device_address = AnfAlgo::GetPrevNodeOutputAddr(anf_node_ptr, real_input_index);
183 AddressPtr input = std::make_shared<Address>();
184 MS_EXCEPTION_IF_NULL(input);
185 input->addr = device_address->ptr_;
186 input->size = device_address->size_;
187
188 auto prenode_with_index = AnfAlgo::GetPrevNodeOutput(anf_node_ptr, i);
189 MS_EXCEPTION_IF_NULL(prenode_with_index.first);
190 if (AnfAlgo::IsRealCNodeKernel(prenode_with_index.first)) {
191 if ((AnfAlgo::GetCNodeName(prenode_with_index.first) == kSplitOpName ||
192 AnfAlgo::GetCNodeName(prenode_with_index.first) == kSplitVOpName) &&
193 AnfAlgo::HasNodeAttr(kAttrNonTask, prenode_with_index.first->cast<CNodePtr>())) {
194 // use memory offset to implement NonTask Type Split op
195 // when op A -> split(NonTask) -> op B, op B's input addr is split's input0's addr + offset
196 // offset is split's output index * split's output size
197 auto split_input0_device_address = AnfAlgo::GetPrevNodeOutputAddr(prenode_with_index.first, 0);
198 MS_EXCEPTION_IF_NULL(split_input0_device_address);
199 input->addr =
200 static_cast<uint8_t *>(split_input0_device_address->ptr_) + (prenode_with_index.second * input->size);
201 MS_LOG(INFO) << "Change " << anf_node_ptr->fullname_with_scope() << "'s input " << i << " address to "
202 << split_input0_device_address->ptr_ << " + " << prenode_with_index.second * input->size;
203 }
204 }
205 kernel_inputs.push_back(input);
206 }
207
208 // No kernel output if output of the cnode is monad, such as LabelSwitch.
209 if (!HasAbstractMonad(anf_node_ptr)) {
210 size_t output_num = AnfAlgo::GetOutputTensorNum(anf_node_ptr);
211 for (size_t i = 0; i < output_num; ++i) {
212 auto it = AnfAlgo::GetOutputAddr(anf_node_ptr, i);
213 AddressPtr output = std::make_shared<Address>();
214 output->addr = it->ptr_;
215 output->size = it->size_;
216 kernel_outputs.push_back(output);
217 }
218 }
219
220 for (size_t i = 0; i < kernel_mod->GetWorkspaceSizeList().size(); ++i) {
221 auto device_address = AnfAlgo::GetWorkspaceAddr(anf_node_ptr, i);
222 kernel::AddressPtr workspace = std::make_shared<kernel::Address>();
223 MS_EXCEPTION_IF_NULL(workspace);
224 workspace->addr = device_address->ptr_;
225 workspace->size = device_address->size_;
226 kernel_workspaces.push_back(workspace);
227 }
228 } else {
229 LaunchAddrCleanKernel(anf_node_ptr, &kernel_inputs);
230 }
231
232 auto ascend_kernel_mod = dynamic_cast<kernel::AscendKernelMod *>(kernel_mod);
233 MS_EXCEPTION_IF_NULL(ascend_kernel_mod);
234 std::vector<TaskInfoPtr> task_info_ptrs =
235 ascend_kernel_mod->GenTask(kernel_inputs, kernel_workspaces, kernel_outputs, stream_id);
236 task_info_list->insert(task_info_list->end(), task_info_ptrs.begin(), task_info_ptrs.end());
237 auto debug_info = std::make_shared<TaskDebugInfo>();
238 MS_EXCEPTION_IF_NULL(debug_info);
239 if (task_info_ptrs.empty()) {
240 MS_LOG(ERROR) << "Empty task_info_ptrs.";
241 return false;
242 }
243 debug_info->op_name_ = anf_node_ptr->fullname_with_scope();
244 debug_info->task_num_ = task_info_ptrs.size();
245 debug_info->stream_id_ = task_info_ptrs[0]->stream_id();
246 debug_info->dump_flag_ = task_info_ptrs[0]->dump_flag();
247 debug_info->input_addrs_ = kernel_inputs;
248 debug_info->output_addrs_ = kernel_outputs;
249 debug_info->workspace_addrs_ = kernel_workspaces;
250 task_debug_info_list_.push_back(debug_info);
251 return true;
252 }
253
LaunchAllKernel(const std::vector<CNodePtr> & anf_node_list,std::vector<TaskInfoPtr> * task_info_list,uint32_t graph_id)254 bool TaskGenerator::LaunchAllKernel(const std::vector<CNodePtr> &anf_node_list,
255 std::vector<TaskInfoPtr> *task_info_list, uint32_t graph_id) {
256 uint32_t current_op_index = 0;
257 std::vector<CNodePtr> profiling_cnode_list;
258 std::vector<std::string> kernel_name_list;
259 for (const auto &anf_node_ptr : anf_node_list) {
260 size_t old_size = task_info_list->size();
261 uint32_t stream_id = AnfAlgo::GetStreamId(anf_node_ptr);
262 MS_EXCEPTION_IF_NULL(anf_node_ptr);
263 MS_LOG(INFO) << "Task gen launch begin, current_op_idx:" << current_op_index
264 << " name:" << anf_node_ptr->fullname_with_scope() << ", stream id:" << stream_id;
265 if (!LaunchKernel(anf_node_ptr, stream_id, task_info_list)) {
266 MS_LOG(ERROR) << "LaunchKernel failed.";
267 return false;
268 }
269 for (size_t i = old_size; i < task_info_list->size(); ++i) {
270 profiling_cnode_list.emplace_back(anf_node_ptr);
271 kernel_name_list.emplace_back(anf_node_ptr->fullname_with_scope());
272 }
273 current_op_index++;
274 }
275
276 #ifndef ENABLE_SECURITY
277 ProfilingUtils::SetGraphKernelName(graph_id, kernel_name_list);
278 if (ProfilingManager::GetInstance().IsProfiling()) {
279 ProfilingUtils::SetGraphProfilingCNode(graph_id, profiling_cnode_list);
280 }
281 #endif
282
283 return true;
284 }
285
286 #ifdef ENABLE_DUMP_IR
DumpTaskInfo(const string & real_filename,const std::vector<TaskDebugInfoPtr> & task_debug_info_list)287 void TaskGenerator::DumpTaskInfo(const string &real_filename,
288 const std::vector<TaskDebugInfoPtr> &task_debug_info_list) {
289 ChangeFileMode(real_filename, S_IRWXU);
290 SaveTaskDebugInfoToFile(real_filename, task_debug_info_list);
291 // set file mode to read only by user
292 ChangeFileMode(real_filename, S_IRUSR);
293 }
294
DumpTaskInfo(const std::string & real_filename)295 void TaskGenerator::DumpTaskInfo(const std::string &real_filename) {
296 if (real_filename.size() >= PATH_MAX) {
297 MS_LOG(ERROR) << "File path " << real_filename << " is too long.";
298 return;
299 }
300 char real_path[PATH_MAX] = {0};
301 #if defined(_WIN32) || defined(_WIN64)
302 if (_fullpath(real_path, filename.c_str(), PATH_MAX) == nullptr) {
303 MS_LOG(DEBUG) << "dir " << filename << " does not exit.";
304 }
305 #else
306 if (realpath(real_filename.c_str(), real_path) == nullptr) {
307 MS_LOG(DEBUG) << "Dir " << real_filename << " does not exit.";
308 }
309 #endif
310
311 std::string path_string = real_path;
312 ChangeFileMode(path_string, S_IRWXU);
313 SaveTaskDebugInfoToFile(path_string, task_debug_info_list_);
314 // set file mode to read only by user
315 ChangeFileMode(path_string, S_IRUSR);
316 }
317 #else
DumpTaskInfo(const std::string & real_filename)318 void TaskGenerator::DumpTaskInfo(const std::string &real_filename) {
319 static bool already_printed = false;
320 if (already_printed) {
321 return;
322 }
323 already_printed = true;
324 MS_LOG(WARNING) << "The functionality of dumping task debug info is disabled, "
325 << "please enable ENABLE_DUMP_IR with '-D on' and recomiple source.";
326 }
DumpTaskInfo(const string & real_filename,const std::vector<TaskDebugInfoPtr> & task_debug_info_list)327 void TaskGenerator::DumpTaskInfo(const string &real_filename,
328 const std::vector<TaskDebugInfoPtr> &task_debug_info_list) {
329 static bool already_printed = false;
330 if (already_printed) {
331 return;
332 }
333 already_printed = true;
334 MS_LOG(WARNING) << "The functionality of dumping task debug info is disabled, "
335 << "please enable ENABLE_DUMP_IR with '-D on' and recomiple source.";
336 }
337 #endif
338
SaveTaskDebugInfoToFile(const std::string & real_filename,const std::vector<TaskDebugInfoPtr> & task_debug_info_list)339 void TaskGenerator::SaveTaskDebugInfoToFile(const std::string &real_filename,
340 const std::vector<TaskDebugInfoPtr> &task_debug_info_list) {
341 std::ofstream fout(real_filename);
342
343 if (!fout.is_open()) {
344 MS_LOG(ERROR) << "Open dump file '" << real_filename << "' failed!";
345 return;
346 }
347
348 size_t index = 0;
349 for (auto &task_debug_info : task_debug_info_list) {
350 MS_EXCEPTION_IF_NULL(task_debug_info);
351 fout << "op_name:" << task_debug_info->op_name_ << "\n"
352 << "task_index:" << index << "\t"
353 << "task_num:" << task_debug_info->task_num_ << "\t"
354 << "task0_stream_id:" << task_debug_info->stream_id_ << "\t"
355 << "task0_type:" << task_debug_info->type_ << "\t"
356 << "task0_dump_flag:" << task_debug_info->dump_flag_ << "\n";
357 index++;
358 if (!task_debug_info->input_addrs_.empty()) {
359 fout << "input address:";
360 for (auto &input : task_debug_info->input_addrs_) {
361 MS_EXCEPTION_IF_NULL(input);
362 fout << input->addr << "(" << input->size << ")\t";
363 }
364 fout << "\n";
365 }
366
367 if (!task_debug_info->output_addrs_.empty()) {
368 fout << "output address:";
369 for (auto &output : task_debug_info->output_addrs_) {
370 MS_EXCEPTION_IF_NULL(output);
371 fout << output->addr << "(" << output->size << ")\t";
372 }
373 fout << "\n";
374 }
375
376 if (!task_debug_info->workspace_addrs_.empty()) {
377 fout << "workspace address:";
378 for (auto &workspace : task_debug_info->workspace_addrs_) {
379 MS_EXCEPTION_IF_NULL(workspace);
380 fout << workspace->addr << "(" << workspace->size << ")\t";
381 }
382 fout << "\n";
383 }
384 fout << "\n";
385 }
386
387 fout.close();
388 }
389 } // namespace tasksink
390 } // namespace ascend
391 } // namespace device
392 } // namespace mindspore
393