/**
 * Copyright 2019 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "backend/kernel_compiler/tbe/tbe_kernel_parallel_build.h"
#include <memory>
#include <set>
#include <algorithm>
#include <vector>
#include <string>
#include "utils/ms_context.h"
#include "backend/kernel_compiler/common_utils.h"
#include "backend/kernel_compiler/tbe/tbe_adapter.h"
#include "backend/kernel_compiler/tbe/tbe_kernel_build.h"
#include "backend/kernel_compiler/tbe/tbe_kernel_mod.h"
#include "backend/session/anf_runtime_algorithm.h"
#include "backend/kernel_compiler/tbe/tbe_convert_utils.h"
#include "backend/kernel_compiler/tbe/tbe_utils.h"
#include "backend/kernel_compiler/tbe/tbe_dynaminc_shape_util.h"
#include "utils/trace_base.h"
#include "utils/json_operation_utils.h"

namespace mindspore {
namespace kernel {
using mindspore::kernel::tbe::TbeUtils;
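// Compiles TBE kernels for the given nodes in parallel: nodes that already have a kernel mod or hit the compile
// cache are skipped, identical kernel jsons are compiled only once, and the function then waits for every
// dispatched compile task to finish before generating kernel mods for the deduplicated ops.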
bool TbeOpParallelBuild(const std::vector<AnfNodePtr> &anf_nodes) {
  auto build_manger = std::make_shared<ParallelBuildManager>();
  MS_EXCEPTION_IF_NULL(build_manger);
  static std::set<std::string> processed_kernel = {};
  auto context_ptr = MsContext::GetInstance();
  MS_EXCEPTION_IF_NULL(context_ptr);
  auto tune_mode = context_ptr->get_param<std::string>(MS_CTX_TUNE_MODE);
  std::string offline_tune = common::GetEnv("ENABLE_TUNE_DUMP");
  if (!offline_tune.empty()) {
    for (size_t j = 0; j < offline_tune.length(); j++) {
      offline_tune[j] = tolower(offline_tune[j]);
    }
    if (!(offline_tune == "true" || offline_tune == "false")) {
      MS_LOG(ERROR) << "Invalid environment variable 'ENABLE_TUNE_DUMP', it should be 'true' or 'false', but got "
                    << offline_tune;
      return false;
    }
  }

  for (const auto &anf_node : anf_nodes) {
    // gen kernel json
    if (AnfAlgo::GetKernelMod(anf_node) != nullptr) {
      continue;
    }
    nlohmann::json kernel_json;
    TbeKernelJsonCreator creator(SINGLE_BUILD);
    if (!creator.GenTbeSingleKernelJson(anf_node, &kernel_json)) {
      MS_LOG(ERROR) << "GenTbeSingleKernelJson failed";
      TbeUtils::SaveJsonInfo(kernel_json["op_info"]["kernel_name"], kernel_json.dump());
      return false;
    }
    // get size
    std::vector<size_t> input_size_list;
    std::vector<size_t> output_size_list;
    (void)TbeKernelBuild::GetIOSize(kernel_json, &input_size_list, &output_size_list);
    // search cache
    const std::string &json_name = creator.json_name();
    if (build_manger->SearchInCache(json_name, input_size_list, output_size_list, anf_node.get()) &&
        ((!offline_tune.empty() && offline_tune != "true") || tune_mode == "NO_TUNE")) {
      continue;
    }
    // the same op does not need to be built again, but must wait for the build to finish to set its kernel mod
    if (processed_kernel.find(json_name) != processed_kernel.end()) {
      build_manger->SaveSameOpInfo(anf_node, json_name, input_size_list, output_size_list);
      continue;
    }
    (void)processed_kernel.insert(json_name);
    // op build
    TbeUtils::SaveJsonInfo(kernel_json["op_info"]["kernel_name"], kernel_json.dump());
    auto task_id = ParallelBuildManager::StartCompileOp(kernel_json);
    build_manger->SaveTaskInfo(task_id, anf_node, json_name, input_size_list, output_size_list);
  }
  while (!build_manger->IsAllTaskFinish()) {
    int task_id = -1;
    std::string task_result;
    std::string build_result;
    auto ret = ParallelBuildManager::WaitOne(&task_id, &task_result, &build_result);
    if (!ret) {
      MS_EXCEPTION(ArgumentError) << "Build failed, WaitOne ret: " << ret << ", task id: " << task_id
                                  << " trace: " << trace::DumpSourceLines(build_manger->GetAnfNodeByTaskID(task_id));
    }

    if (task_result != "Success") {
      MS_EXCEPTION(ArgumentError) << "Task compile failed, task id: " << task_id << ", cause: " << task_result
                                  << " trace: " << trace::DumpSourceLines(build_manger->GetAnfNodeByTaskID(task_id));
    }
    (void)build_manger->TaskFinishProcess(task_id, build_result);
  }
  return build_manger->GenSameOpKernelMod();
}

ParallelBuildManager::~ParallelBuildManager() { ResetTaskInfo(); }

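// Records a dispatched pre-build task so its result can be matched back to the node in PreTaskFinishProcess.
// A null node defaults the processor to AiCore.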
void ParallelBuildManager::SavePreBuildTaskInfo(int32_t task_id, const AnfNodePtr &anf_node,
                                                const std::string &json_name) {
  MS_LOG(DEBUG) << "SavePreBuildTaskInfo, task id: " << task_id;
  struct KernelBuildTaskInfo task_info;
  task_info.node = anf_node;
  task_info.json_name = json_name;
  if (anf_node == nullptr) {
    task_info.processor = tbe::kProcessorAiCore;
  } else {
    task_info.processor = tbe::GetProcessor(anf_node);
  }
  task_info.scope_id = 0;
  pre_build_task_map_[task_id] = task_info;
}

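// Records a dispatched compile task, including the I/O size lists needed to build the kernel mod once the task
// finishes; the scope_id is carried through to the pair returned by TaskFinishProcess.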
void ParallelBuildManager::SaveTaskInfo(int32_t task_id, const mindspore::AnfNodePtr &anf_node,
                                        const std::string &json_name, const std::vector<size_t> &input_size_list,
                                        const std::vector<size_t> &output_size_list, int64_t scope_id) {
  MS_LOG(DEBUG) << "SaveTaskInfo, task id: " << task_id;
  struct KernelBuildTaskInfo task_info;
  task_info.node = anf_node;
  task_info.json_name = json_name;
  if (anf_node == nullptr) {
    task_info.processor = tbe::kProcessorAiCore;
  } else {
    task_info.processor = tbe::GetProcessor(anf_node);
  }
  task_info.input_size_list.assign(input_size_list.begin(), input_size_list.end());
  task_info.output_size_list.assign(output_size_list.begin(), output_size_list.end());
  task_info.scope_id = scope_id;
  task_map_[task_id] = task_info;
}

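// All tasks are considered finished once every entry has been removed from task_map_ by TaskFinishProcess.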
bool ParallelBuildManager::IsAllTaskFinish() const {
  MS_LOG(INFO) << "Number of build tasks still waiting to be processed: " << task_map_.size();
  return task_map_.empty();
}

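// Consumes the result of a finished pre-build task: parses the returned json, then stores the fusion type and
// output data description on the corresponding node before dropping the task entry.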
void ParallelBuildManager::PreTaskFinishProcess(int32_t task_id, const std::string &pre_build_result) {
  MS_LOG(DEBUG) << "Pre-build task finished, task id: " << task_id << ", result: " << pre_build_result;
  auto task_iter = pre_build_task_map_.find(task_id);
  if (task_iter == pre_build_task_map_.end()) {
    MS_EXCEPTION(ArgumentError) << "Can not find pre-build task, task id: " << task_id;
  }
  nlohmann::json result;
  if (!ParseJson(pre_build_result, &result)) {
    MS_LOG(EXCEPTION) << "Parse prebuild result error.";
  }
  auto fusion_name = GetJsonValue<std::string>(result, "op_pattern");
  auto fusion_type = kernel::GetFusionTypeByName(fusion_name);
  auto output_data_desc = GetJsonValue<nlohmann::json>(result, "op_params");

  auto node = task_iter->second.node;
  AnfAlgo::SetFusionType(node, fusion_type);
  AnfAlgo::SetOutputDataDesc(node, {output_data_desc});
  (void)pre_build_task_map_.erase(task_iter);
}

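// Consumes the result of a finished compile task: looks the compiled kernel up in the kernel_meta cache,
// generates a kernel mod from it and, when set_kernel_mod is true, attaches the kernel mod and compile info to
// the node. Returns the {scope_id, kernel_mod} pair saved with the task and removes the task from task_map_.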
std::pair<int32_t, KernelModPtr> ParallelBuildManager::TaskFinishProcess(int32_t task_id, const std::string &build_ret,
                                                                         bool set_kernel_mod) {
  auto task_iter = task_map_.find(task_id);
  if (task_iter == task_map_.end()) {
    MS_EXCEPTION(ArgumentError) << "Can not find task, task id: " << task_id;
  }
  auto json_name = task_iter->second.json_name;
  auto processor = task_iter->second.processor;
  auto kernel_pack = TbeUtils::InsertCache(json_name, processor);
  if (kernel_pack == nullptr) {
    if (set_kernel_mod) {
      MS_EXCEPTION(ArgumentError) << "Can not find .json file or the binary .o file for op "
                                  << task_iter->second.json_name << ", go check the cache files in kernel_meta/";
    } else {
      MS_LOG(INFO) << "Fusion build of kernel " << task_iter->second.json_name << " failed.";
      auto fusion_kernel_mod = std::make_pair(task_iter->second.scope_id, nullptr);
      (void)task_map_.erase(task_iter);
      return fusion_kernel_mod;
    }
  }
  auto kernel_mod = GenKernelMod(task_iter->second.input_size_list, task_iter->second.output_size_list, kernel_pack);
  MS_EXCEPTION_IF_NULL(kernel_mod);
  if (set_kernel_mod) {
    auto cur_node = task_iter->second.node;
    MS_EXCEPTION_IF_NULL(cur_node);
    if (AnfAlgo::IsDynamicShape(cur_node) && (build_ret.empty() || build_ret.find("vars") == std::string::npos)) {
      MS_LOG(EXCEPTION) << "Build failed. The build result of dynamic shape op [" << AnfAlgo::GetCNodeName(cur_node)
                        << "] should not be empty, or can not find key ['vars'] in the result. build_res:[" << build_ret
                        << "].";
    }
    AnfAlgo::SetKernelMod(kernel_mod, cur_node.get());
    MS_LOG(INFO) << json_name << ": save compile info to json file, compile_info:" << build_ret;
    std::string old_build = common::GetEnv("MS_OLD_BUILD_PROCESS");
    if (!old_build.empty()) {
      AnfAlgo::SetNodeAttr(kAttrCompileInfo, MakeValue(build_ret), cur_node);
    } else {
      bool save_flag = true;
      TbeUtils::SaveCompileInfo(json_name, build_ret, &save_flag);
      if (!save_flag) {
        MS_LOG(EXCEPTION) << "Save json file failed, compile_info:" << build_ret;
      }
    }
  }
  auto ret = std::make_pair(task_iter->second.scope_id, kernel_mod);
  (void)task_map_.erase(task_iter);
  MS_LOG(INFO) << "Number of build tasks still waiting to be processed: " << task_map_.size();
  return ret;
}

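// Records a node whose kernel json is identical to one already being compiled, so its kernel mod can later be
// taken from the compile cache in GenSameOpKernelMod once the shared build has finished.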
void ParallelBuildManager::SaveSameOpInfo(const mindspore::AnfNodePtr &anf_node, const std::string &json_name,
                                          const std::vector<size_t> &input_size_list,
                                          const std::vector<size_t> &output_size_list) {
  struct KernelBuildTaskInfo task_info;
  task_info.node = anf_node;
  task_info.json_name = json_name;
  task_info.processor = tbe::GetProcessor(anf_node);
  task_info.input_size_list.assign(input_size_list.begin(), input_size_list.end());
  task_info.output_size_list.assign(output_size_list.begin(), output_size_list.end());
  same_op_list_.push_back(task_info);
}

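// Fusion-scope counterpart of SaveSameOpInfo: records a duplicate fusion op by scope id so its kernel mod can be
// taken from the compile cache in GenSameFusionOpKernelMod.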
void ParallelBuildManager::SaveSameFusionOpInfo(const int64_t scope_id, const std::string &json_name,
                                                const std::string &processor,
                                                const std::vector<size_t> &input_size_list,
                                                const std::vector<size_t> &output_size_list) {
  struct KernelBuildTaskInfo task_info;
  task_info.scope_id = scope_id;
  task_info.json_name = json_name;
  task_info.processor = processor;
  task_info.input_size_list.assign(input_size_list.begin(), input_size_list.end());
  task_info.output_size_list.assign(output_size_list.begin(), output_size_list.end());
  same_op_list_.push_back(task_info);
}

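// Generates kernel mods for the nodes recorded by SaveSameOpInfo by hitting the compile cache; returns false if
// any of them is still missing from the cache.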
bool ParallelBuildManager::GenSameOpKernelMod() const {
  for (const auto &task_info : same_op_list_) {
    bool ret =
      SearchInCache(task_info.json_name, task_info.input_size_list, task_info.output_size_list, task_info.node.get());
    if (!ret) {
      MS_LOG(INFO) << "can't find " << task_info.json_name << " in cache.";
      return false;
    }
  }
  return true;
}

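// Generates kernel mods for the duplicate fusion ops recorded by SaveSameFusionOpInfo and stores them in
// kernel_mode_ret keyed by scope id; returns false if any kernel is missing from the cache.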
bool ParallelBuildManager::GenSameFusionOpKernelMod(std::map<int64_t, KernelModPtr> *kernel_mode_ret) const {
  MS_EXCEPTION_IF_NULL(kernel_mode_ret);
  bool ret = true;
  for (const auto &task_info : same_op_list_) {
    auto kernel_pack = TbeUtils::SearchCache(task_info.json_name);
    if (kernel_pack != nullptr) {
      auto kernel_mode = GenKernelMod(task_info.input_size_list, task_info.output_size_list, kernel_pack);
      if (kernel_mode != nullptr) {
        (*kernel_mode_ret)[task_info.scope_id] = kernel_mode;
        continue;
      }
    }
    MS_LOG(INFO) << "can't find " << task_info.json_name << " in cache.";
    ret = false;
  }
  return ret;
}

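// Looks the kernel json up in the compile cache; on a hit, builds a kernel mod from the cached kernel pack,
// attaches it to the node and returns true. Returns false on a cache miss.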
bool ParallelBuildManager::SearchInCache(const std::string &json_name, const std::vector<size_t> &input_size_list,
                                         const std::vector<size_t> &output_size_list, mindspore::AnfNode *node) const {
  auto cached_kernel_pack = TbeUtils::SearchCache(json_name);
  if (cached_kernel_pack != nullptr) {
    auto kernel_mod_ptr = GenKernelMod(input_size_list, output_size_list, cached_kernel_pack);
    MS_EXCEPTION_IF_NULL(kernel_mod_ptr);
    AnfAlgo::SetKernelMod(kernel_mod_ptr, node);
    return true;
  } else {
    return false;
  }
}

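// Wraps a compiled kernel pack in a TbeKernelMod and fills in its input, output and workspace size lists.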
KernelModPtr ParallelBuildManager::GenKernelMod(const std::vector<size_t> &input_size_list,
                                                const std::vector<size_t> &output_size_list,
                                                const mindspore::kernel::KernelPackPtr &kernel_pack) const {
  MS_EXCEPTION_IF_NULL(kernel_pack);
  auto kernel_json_info = kernel_pack->kernel_json_info();
  auto kernel_mod_ptr = std::make_shared<TbeKernelMod>(kernel_pack);
  MS_EXCEPTION_IF_NULL(kernel_mod_ptr);
  kernel_mod_ptr->SetInputSizeList(input_size_list);
  kernel_mod_ptr->SetOutputSizeList(output_size_list);
  kernel_mod_ptr->SetWorkspaceSizeList(kernel_json_info.workspaces);
  return kernel_mod_ptr;
}

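// Dispatches an asynchronous compile job for the given kernel json to the Ascend kernel build client and returns
// the task id used to collect the result via WaitOne.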
int ParallelBuildManager::StartCompileOp(const nlohmann::json &kernel_json) {
  auto tune_mode = kernel_json["SocInfo"]["autoTilingMode"];
  return AscendKernelBuildClient::Instance().TbeStart(kernel_json.dump(), tune_mode);
}

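// Sends a TBE job described by kernel_json to the Ascend kernel build client and returns the raw response string.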
std::string ParallelBuildManager::ProcessTbeJob(const nlohmann::json &kernel_json) {
  return AscendKernelBuildClient::Instance().TbeSendJob(kernel_json.dump());
}

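// Blocks until one of the dispatched compile tasks finishes, returning its task id, task result and build result
// through the output parameters; returns false if waiting fails.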
bool ParallelBuildManager::WaitOne(int *task_id, std::string *task_result, std::string *pre_build_result) {
  MS_EXCEPTION_IF_NULL(task_id);
  return AscendKernelBuildClient::Instance().TbeWait(task_id, task_result, pre_build_result);
}

void ParallelBuildManager::ResetTaskInfo() noexcept {
  task_map_.clear();
  same_op_list_.clear();
  pre_build_task_map_.clear();
}

AnfNodePtr ParallelBuildManager::GetAnfNodeByTaskID(int32_t task_id) {
  auto find_iter = task_map_.find(task_id);
  if (find_iter != task_map_.end()) {
    return find_iter->second.node;
  }
  return nullptr;
}
}  // namespace kernel
}  // namespace mindspore