/**
 * Copyright 2019 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "backend/kernel_compiler/tbe/tbe_kernel_parallel_build.h"
#include <memory>
#include <set>
#include <algorithm>
#include <vector>
#include <string>
#include "utils/ms_context.h"
#include "backend/kernel_compiler/common_utils.h"
#include "backend/kernel_compiler/tbe/tbe_adapter.h"
#include "backend/kernel_compiler/tbe/tbe_kernel_build.h"
#include "backend/kernel_compiler/tbe/tbe_kernel_mod.h"
#include "backend/session/anf_runtime_algorithm.h"
#include "backend/kernel_compiler/tbe/tbe_convert_utils.h"
#include "backend/kernel_compiler/tbe/tbe_utils.h"
#include "backend/kernel_compiler/tbe/tbe_dynaminc_shape_util.h"
#include "utils/trace_base.h"
#include "utils/json_operation_utils.h"

namespace mindspore {
namespace kernel {
using mindspore::kernel::tbe::TbeUtils;
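// Builds TBE kernels for the given nodes in parallel. Nodes that already have a kernel mod or hit the disk cache are
// skipped, nodes that share an identical kernel json are built only once, and the remaining nodes are submitted as
// asynchronous compile tasks that are waited on below before their kernel mods are attached.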
bool TbeOpParallelBuild(const std::vector<AnfNodePtr> &anf_nodes) {
  auto build_manger = std::make_shared<ParallelBuildManager>();
  MS_EXCEPTION_IF_NULL(build_manger);
  static std::set<std::string> processed_kernel = {};
  auto context_ptr = MsContext::GetInstance();
  MS_EXCEPTION_IF_NULL(context_ptr);
  auto tune_mode = context_ptr->get_param<std::string>(MS_CTX_TUNE_MODE);
  std::string offline_tune = common::GetEnv("ENABLE_TUNE_DUMP");
  if (!offline_tune.empty()) {
    for (size_t j = 0; j < offline_tune.length(); j++) {
      offline_tune[j] = tolower(offline_tune[j]);
    }
    if (!(offline_tune == "true" || offline_tune == "false")) {
      MS_LOG(ERROR) << "Invalid environment variable 'ENABLE_TUNE_DUMP', it should be 'true' or 'false', but got "
                    << offline_tune;
      return false;
    }
  }

  for (const auto &anf_node : anf_nodes) {
    // gen kernel json
    if (AnfAlgo::GetKernelMod(anf_node) != nullptr) {
      continue;
    }
    nlohmann::json kernel_json;
    TbeKernelJsonCreator creator(SINGLE_BUILD);
    if (!creator.GenTbeSingleKernelJson(anf_node, &kernel_json)) {
      MS_LOG(ERROR) << "GenTbeSingleKernelJson failed";
      TbeUtils::SaveJsonInfo(kernel_json["op_info"]["kernel_name"], kernel_json.dump());
      return false;
    }
    // get size
    std::vector<size_t> input_size_list;
    std::vector<size_t> output_size_list;
    (void)TbeKernelBuild::GetIOSize(kernel_json, &input_size_list, &output_size_list);
    // search cache
    const std::string &json_name = creator.json_name();
    if (build_manger->SearchInCache(json_name, input_size_list, output_size_list, anf_node.get()) &&
        ((!offline_tune.empty() && offline_tune != "true") || tune_mode == "NO_TUNE")) {
      continue;
    }
    // An op with the same kernel json does not need to be built again, but it must wait for the in-flight build to
    // finish so that its kernel mod can be set.
    if (processed_kernel.find(json_name) != processed_kernel.end()) {
      build_manger->SaveSameOpInfo(anf_node, json_name, input_size_list, output_size_list);
      continue;
    }
    (void)processed_kernel.insert(json_name);
    // op build
    TbeUtils::SaveJsonInfo(kernel_json["op_info"]["kernel_name"], kernel_json.dump());
    auto task_id = ParallelBuildManager::StartCompileOp(kernel_json);
    build_manger->SaveTaskInfo(task_id, anf_node, json_name, input_size_list, output_size_list);
  }
  while (!build_manger->IsAllTaskFinish()) {
    int task_id = -1;
    std::string task_result;
    std::string build_result;
    auto ret = ParallelBuildManager::WaitOne(&task_id, &task_result, &build_result);
    if (!ret) {
      MS_EXCEPTION(ArgumentError) << "Build failed, wait one ret: " << ret << ", task id: " << task_id
                                  << " trace: " << trace::DumpSourceLines(build_manger->GetAnfNodeByTaskID(task_id));
    }

    if (task_result != "Success") {
      MS_EXCEPTION(ArgumentError) << "Task compile failed, task id: " << task_id << ", cause: " << task_result
                                  << " trace: " << trace::DumpSourceLines(build_manger->GetAnfNodeByTaskID(task_id));
    }
    (void)build_manger->TaskFinishProcess(task_id, build_result);
  }
  return build_manger->GenSameOpKernelMod();
}

ParallelBuildManager::~ParallelBuildManager() { ResetTaskInfo(); }

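// Records a pre-build task so that PreTaskFinishProcess() can later attach the pre-build result (fusion type and
// output data description) to the corresponding node. A null node defaults the processor to AiCore.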
void ParallelBuildManager::SavePreBuildTaskInfo(int32_t task_id, const AnfNodePtr &anf_node,
                                                const std::string &json_name) {
  MS_LOG(DEBUG) << "SavePreBuildTaskInfo, task id: " << task_id;
  struct KernelBuildTaskInfo task_info;
  task_info.node = anf_node;
  task_info.json_name = json_name;
  if (anf_node == nullptr) {
    task_info.processor = tbe::kProcessorAiCore;
  } else {
    task_info.processor = tbe::GetProcessor(anf_node);
  }
  task_info.scope_id = 0;
  pre_build_task_map_[task_id] = task_info;
}

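// Records a pending compile task, keyed by task id, together with the node, kernel json name, processor and I/O
// sizes needed to construct the kernel mod once the asynchronous build finishes.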
void ParallelBuildManager::SaveTaskInfo(int32_t task_id, const mindspore::AnfNodePtr &anf_node,
                                        const std::string &json_name, const std::vector<size_t> &input_size_list,
                                        const std::vector<size_t> &output_size_list, int64_t scope_id) {
  MS_LOG(DEBUG) << "SaveTaskInfo, task id: " << task_id;
  struct KernelBuildTaskInfo task_info;
  task_info.node = anf_node;
  task_info.json_name = json_name;
  if (anf_node == nullptr) {
    task_info.processor = tbe::kProcessorAiCore;
  } else {
    task_info.processor = tbe::GetProcessor(anf_node);
  }
  task_info.input_size_list.assign(input_size_list.begin(), input_size_list.end());
  task_info.output_size_list.assign(output_size_list.begin(), output_size_list.end());
  task_info.scope_id = scope_id;
  task_map_[task_id] = task_info;
}

bool ParallelBuildManager::IsAllTaskFinish() const {
  MS_LOG(INFO) << "Wait for unfinished build tasks, remaining task num: " << task_map_.size();
  return task_map_.empty();
}

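// Consumes the result of a finished pre-build task: parses the returned json, then stores the fusion type
// ("op_pattern") and the output data description ("op_params") on the node recorded for this task id.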
void ParallelBuildManager::PreTaskFinishProcess(int32_t task_id, const std::string &pre_build_result) {
  MS_LOG(DEBUG) << "Pre build task finished, task id: " << task_id << ", result: " << pre_build_result;
  auto task_iter = pre_build_task_map_.find(task_id);
  if (task_iter == pre_build_task_map_.end()) {
    MS_EXCEPTION(ArgumentError) << "Can not find pre build task, task id: " << task_id;
  }
  nlohmann::json result;
  if (!ParseJson(pre_build_result, &result)) {
    MS_LOG(EXCEPTION) << "Parse prebuild result error.";
  }
  auto fusion_name = GetJsonValue<std::string>(result, "op_pattern");
  auto fusion_type = kernel::GetFusionTypeByName(fusion_name);
  auto output_data_desc = GetJsonValue<nlohmann::json>(result, "op_params");

  auto node = task_iter->second.node;
  AnfAlgo::SetFusionType(node, fusion_type);
  AnfAlgo::SetOutputDataDesc(node, {output_data_desc});
  (void)pre_build_task_map_.erase(task_iter);
}

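// Consumes the result of a finished compile task: the compiled kernel is fetched through TbeUtils::InsertCache and
// wrapped into a kernel mod; when set_kernel_mod is true, the kernel mod and compile info are attached to the node.
// Returns the task's scope id together with the kernel mod (nullptr when a fusion kernel pack cannot be found).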
std::pair<int32_t, KernelModPtr> ParallelBuildManager::TaskFinishProcess(int32_t task_id, const std::string &build_ret,
                                                                         bool set_kernel_mod) {
  auto task_iter = task_map_.find(task_id);
  if (task_iter == task_map_.end()) {
    MS_EXCEPTION(ArgumentError) << "Can not find task, task id: " << task_id;
  }
  auto json_name = task_iter->second.json_name;
  auto processor = task_iter->second.processor;
  auto kernel_pack = TbeUtils::InsertCache(json_name, processor);
  if (kernel_pack == nullptr) {
    if (set_kernel_mod) {
      MS_EXCEPTION(ArgumentError) << "Can not find .json file or the binary .o file for op "
                                  << task_iter->second.json_name << ", please check the cache files in kernel_meta/";
    } else {
      MS_LOG(INFO) << "Fusion build kernel name: " << task_iter->second.json_name << " failed.";
      auto fusion_kernel_mod = std::make_pair(task_iter->second.scope_id, nullptr);
      (void)task_map_.erase(task_iter);
      return fusion_kernel_mod;
    }
  }
  auto kernel_mod = GenKernelMod(task_iter->second.input_size_list, task_iter->second.output_size_list, kernel_pack);
  MS_EXCEPTION_IF_NULL(kernel_mod);
  if (set_kernel_mod) {
    auto cur_node = task_iter->second.node;
    MS_EXCEPTION_IF_NULL(cur_node);
    if (AnfAlgo::IsDynamicShape(cur_node) && (build_ret.empty() || build_ret.find("vars") == std::string::npos)) {
      MS_LOG(EXCEPTION) << "Build failed. The build result of dynamic shape op [" << AnfAlgo::GetCNodeName(cur_node)
                        << "] should not be empty, or can not find key ['vars'] in the result. build_res:[" << build_ret
                        << "].";
    }
    AnfAlgo::SetKernelMod(kernel_mod, cur_node.get());
    MS_LOG(INFO) << json_name << ": save compile info to json file, compile_info:" << build_ret;
    std::string old_build = common::GetEnv("MS_OLD_BUILD_PROCESS");
    if (!old_build.empty()) {
      AnfAlgo::SetNodeAttr(kAttrCompileInfo, MakeValue(build_ret), cur_node);
    } else {
      bool save_flag = true;
      TbeUtils::SaveCompileInfo(json_name, build_ret, &save_flag);
      if (!save_flag) {
        MS_LOG(EXCEPTION) << "Save json file failed, compile_info:" << build_ret;
      }
    }
  }
  auto ret = std::make_pair(task_iter->second.scope_id, kernel_mod);
  (void)task_map_.erase(task_iter);
  MS_LOG(INFO) << "Wait for unfinished build tasks, remaining task num: " << task_map_.size();
  return ret;
}

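// Records a node whose kernel json is identical to one that is already being built, so that GenSameOpKernelMod()
// can attach the cached kernel mod once the shared build has finished.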
void ParallelBuildManager::SaveSameOpInfo(const mindspore::AnfNodePtr &anf_node, const std::string &json_name,
                                          const std::vector<size_t> &input_size_list,
                                          const std::vector<size_t> &output_size_list) {
  struct KernelBuildTaskInfo task_info;
  task_info.node = anf_node;
  task_info.json_name = json_name;
  task_info.processor = tbe::GetProcessor(anf_node);
  task_info.input_size_list.assign(input_size_list.begin(), input_size_list.end());
  task_info.output_size_list.assign(output_size_list.begin(), output_size_list.end());
  same_op_list_.push_back(task_info);
}

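// Records a fusion scope whose kernel json is identical to one that is already being built, keyed by scope id
// instead of a node, so that GenSameFusionOpKernelMod() can later fetch the shared kernel from the cache.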
void ParallelBuildManager::SaveSameFusionOpInfo(const int64_t scope_id, const std::string &json_name,
                                                const std::string &processor,
                                                const std::vector<size_t> &input_size_list,
                                                const std::vector<size_t> &output_size_list) {
  struct KernelBuildTaskInfo task_info;
  task_info.scope_id = scope_id;
  task_info.json_name = json_name;
  task_info.processor = processor;
  task_info.input_size_list.assign(input_size_list.begin(), input_size_list.end());
  task_info.output_size_list.assign(output_size_list.begin(), output_size_list.end());
  same_op_list_.push_back(task_info);
}

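// For every node recorded by SaveSameOpInfo(), looks up the shared kernel in the cache and sets its kernel mod.
// Returns false as soon as one of the kernels is missing from the cache.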
bool ParallelBuildManager::GenSameOpKernelMod() const {
  for (const auto &task_info : same_op_list_) {
    bool ret =
      SearchInCache(task_info.json_name, task_info.input_size_list, task_info.output_size_list, task_info.node.get());
    if (!ret) {
      MS_LOG(INFO) << "can't find " << task_info.json_name << " in cache.";
      return false;
    }
  }
  return true;
}

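// For every fusion scope recorded by SaveSameFusionOpInfo(), looks up the shared kernel in the cache and fills the
// result map with one kernel mod per scope id. Returns false if any kernel is missing, but keeps processing the rest.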
bool ParallelBuildManager::GenSameFusionOpKernelMod(std::map<int64_t, KernelModPtr> *kernel_mode_ret) const {
  MS_EXCEPTION_IF_NULL(kernel_mode_ret);
  bool ret = true;
  for (const auto &task_info : same_op_list_) {
    auto kernel_pack = TbeUtils::SearchCache(task_info.json_name);
    if (kernel_pack != nullptr) {
      auto kernel_mode = GenKernelMod(task_info.input_size_list, task_info.output_size_list, kernel_pack);
      if (kernel_mode != nullptr) {
        (*kernel_mode_ret)[task_info.scope_id] = kernel_mode;
        continue;
      }
    }
    MS_LOG(INFO) << "can't find " << task_info.json_name << " in cache.";
    ret = false;
  }
  return ret;
}

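// Looks up a previously compiled kernel by its json name; on a cache hit, builds a kernel mod from the cached
// kernel pack and attaches it to the node. Returns true only when the cache lookup succeeds.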
bool ParallelBuildManager::SearchInCache(const std::string &json_name, const std::vector<size_t> &input_size_list,
                                         const std::vector<size_t> &output_size_list, mindspore::AnfNode *node) const {
  auto cached_kernel_pack = TbeUtils::SearchCache(json_name);
  if (cached_kernel_pack != nullptr) {
    auto kernel_mod_ptr = GenKernelMod(input_size_list, output_size_list, cached_kernel_pack);
    MS_EXCEPTION_IF_NULL(kernel_mod_ptr);
    AnfAlgo::SetKernelMod(kernel_mod_ptr, node);
    return true;
  } else {
    return false;
  }
}

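// Wraps a compiled kernel pack into a TbeKernelMod and configures it with the given input/output sizes and the
// workspace sizes read from the kernel's json info.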
KernelModPtr ParallelBuildManager::GenKernelMod(const std::vector<size_t> &input_size_list,
                                                const std::vector<size_t> &output_size_list,
                                                const mindspore::kernel::KernelPackPtr &kernel_pack) const {
  MS_EXCEPTION_IF_NULL(kernel_pack);
  auto kernel_json_info = kernel_pack->kernel_json_info();
  auto kernel_mod_ptr = std::make_shared<TbeKernelMod>(kernel_pack);
  MS_EXCEPTION_IF_NULL(kernel_mod_ptr);
  kernel_mod_ptr->SetInputSizeList(input_size_list);
  kernel_mod_ptr->SetOutputSizeList(output_size_list);
  kernel_mod_ptr->SetWorkspaceSizeList(kernel_json_info.workspaces);
  return kernel_mod_ptr;
}

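// Thin wrappers around AscendKernelBuildClient: StartCompileOp submits a kernel json for compilation and returns a
// task id, ProcessTbeJob forwards a raw TBE job, and WaitOne blocks until one previously submitted task finishes.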
int ParallelBuildManager::StartCompileOp(const nlohmann::json &kernel_json) {
  auto tune_mode = kernel_json["SocInfo"]["autoTilingMode"];
  return AscendKernelBuildClient::Instance().TbeStart(kernel_json.dump(), tune_mode);
}

std::string ParallelBuildManager::ProcessTbeJob(const nlohmann::json &kernel_json) {
  return AscendKernelBuildClient::Instance().TbeSendJob(kernel_json.dump());
}

bool ParallelBuildManager::WaitOne(int *task_id, std::string *task_result, std::string *pre_build_result) {
  MS_EXCEPTION_IF_NULL(task_id);
  return AscendKernelBuildClient::Instance().TbeWait(task_id, task_result, pre_build_result);
}

void ParallelBuildManager::ResetTaskInfo() noexcept {
  task_map_.clear();
  same_op_list_.clear();
  pre_build_task_map_.clear();
}

AnfNodePtr ParallelBuildManager::GetAnfNodeByTaskID(int32_t task_id) {
  auto find_iter = task_map_.find(task_id);
  if (find_iter != task_map_.end()) {
    return find_iter->second.node;
  }
  return nullptr;
}
}  // namespace kernel
}  // namespace mindspore