1 /**
2 * Copyright 2023 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "kernel/graph_kernel/graph_kernel_builder.h"
17
18 #include <fcntl.h>
19 #include <sys/shm.h>
20 #include <unistd.h>
21 #include <algorithm>
22 #include <chrono>
23 #include <iostream>
24 #include <map>
25 #include <memory>
26 #include <string>
27 #include <unordered_set>
28 #include <utility>
29 #include <vector>
30 #include "backend/common/graph_kernel/graph_kernel_flags.h"
31 #include "include/backend/anf_runtime_algorithm.h"
32 #include "include/common/debug/common.h"
33 #include "include/common/utils/anfalgo.h"
34 #include "ir/func_graph.h"
35 #include "kernel/framework_utils.h"
36 #include "kernel/graph_kernel/graph_kernel_json_generator.h"
37 #include "mindspore/core/ops/framework_ops.h"
38 #include "utils/file_utils.h"
39
40 namespace mindspore {
41 namespace kernel {
// Maximum number of bytes kept from a strerror_r message (see GetErrorInfo).
constexpr int32_t MAX_ERROR_LEN = 1024;
// NOTE(review): not used in this chunk — presumably the parallel-compile
// process count consumed elsewhere; confirm against the rest of the file.
constexpr int32_t PROCESS_NUM = 16;
// NOTE(review): not used in this chunk — presumably a build timeout in
// seconds consumed elsewhere; confirm against the rest of the file.
constexpr int32_t TIME_OUT = 300;
constexpr auto kLogLevel = "log_level";

// Acquires the inter-process file lock guarding the shared kernel lists.
// Declares a local RAII guard named `lock`; callers must check lock.locked_.
// __func__/__LINE__ are recorded for the acquire/release log messages.
#define ACQUIRE_LOCK LockMng lock(fd_, __func__, __LINE__)
48
// Returns a human-readable message for the current `errno`.
// NOTE: strerror_r has two incompatible variants. `auto ret` deliberately
// deduces `int` (XSI variant) or `char *` (GNU variant), and the #if below
// selects the matching success check — do not replace `auto` with a
// concrete type.
inline std::string GetErrorInfo() {
  char buf[MAX_ERROR_LEN + 1] = {0};
  auto ret = strerror_r(errno, buf, MAX_ERROR_LEN);
#if (_POSIX_C_SOURCE >= 200112L) && !_GNU_SOURCE
  // XSI variant: returns int (0 on success) and writes the message into buf.
  if (ret != 0 || strlen(buf) == 0) {
    return "Call strerror_r failed";
  }

  return std::string(buf);
#else
  // GNU variant: returns a pointer to the message (which may or may not
  // be buf); nullptr indicates failure.
  return ret != nullptr ? std::string(ret) : "Failed to get error info";
#endif
}
62
TryLock() const63 bool KernelPool::LockMng::TryLock() const {
64 // Try to lock trial times. Return errno if lock unsuccessfully
65 uint32_t trial = 2000;
66 const uint32_t sleep_time_us = 5000;
67
68 int32_t ret;
69 while (trial > 0) {
70 ret = lockf(fd_, F_TLOCK, 0);
71 if (ret == 0 || (errno != EACCES && errno != EAGAIN)) {
72 break;
73 }
74
75 trial--;
76 (void)usleep(sleep_time_us);
77 }
78
79 if (ret == -1) {
80 MS_LOG(ERROR) << "Failed to acquire the lock, error msg:" << GetErrorInfo() << ", left trying times: " << trial;
81 return false;
82 }
83
84 MS_LOG(INFO) << "KernelBuild successfully acquire lock called at " << calling_position_;
85 return true;
86 }
87
Unlock() const88 void KernelPool::LockMng::Unlock() const noexcept {
89 auto ret = lockf(fd_, F_ULOCK, 0);
90 if (ret == -1) {
91 MS_LOG(ERROR) << "Failed to release the lock, error msg:" << GetErrorInfo();
92 }
93 MS_LOG(INFO) << "KernelBuild successfully release lock called at " << calling_position_;
94 }
95
GetTmpKeyPath() const96 std::string KernelPool::GetTmpKeyPath() const {
97 std::string config_path = MsContext::GetInstance()->get_param<std::string>(MS_CTX_COMPILE_CACHE_PATH);
98 if (config_path.empty()) {
99 config_path = common::GetEnv(kCompilerCachePath);
100 }
101 if (config_path.empty()) {
102 char cwd[PATH_MAX];
103 char *ret = getcwd(cwd, sizeof(cwd));
104 if (ret == nullptr) {
105 MS_LOG(ERROR) << "Get current work directory failed, error msg:" << GetErrorInfo();
106 return "";
107 }
108 config_path = std::string(cwd);
109 }
110 auto real_path = FileUtils::GetRealPath(common::SafeCStr(config_path));
111 if (!real_path.has_value()) {
112 MS_LOG(ERROR) << "Change to realpath failed, error msg:" << GetErrorInfo();
113 return "";
114 }
115 return real_path.value();
116 }
117
CreateSharedMem(const std::string & path)118 void *KernelPool::CreateSharedMem(const std::string &path) {
119 is_creator_ = false;
120
121 auto hash_id = std::hash<std::string>()(path);
122 auto key_id = static_cast<key_t>(hash_id);
123 const size_t min_mem_size = 512;
124 auto mem_size = sizeof(size_t) * kListNum_ * (kMaxKernelNum_ + 1) + min_mem_size;
125
126 {
127 ACQUIRE_LOCK;
128 if (!lock.locked_) {
129 MS_LOG(ERROR) << "Failed to acquire lock.";
130 return nullptr;
131 }
132
133 // check if the shared memory exists or not.
134 // remove shared memory if exists and the nattach is 0
135 struct shmid_ds buf;
136 auto id = shmget(key_id, mem_size, 0);
137 if (id != -1) {
138 auto ret = shmctl(id, IPC_STAT, &buf);
139 if (ret == -1) {
140 MS_LOG(ERROR) << "Failed to get the info of shared memory, error msg:" << GetErrorInfo();
141 return nullptr;
142 }
143
144 if (buf.shm_nattch == 0) {
145 ret = shmctl(id, IPC_RMID, nullptr);
146 if (ret < 0) {
147 MS_LOG(EXCEPTION) << "Release shared_mem failed, error msg:" << GetErrorInfo();
148 }
149 }
150 }
151 }
152
153 ACQUIRE_LOCK;
154 if (!lock.locked_) {
155 MS_LOG(ERROR) << "Failed to acquire lock.";
156 return nullptr;
157 }
158 const int kShmFlg = IPC_CREAT | IPC_EXCL | 0600;
159 shm_id_ = shmget(key_id, mem_size, kShmFlg);
160 if (shm_id_ == -1) {
161 if (errno == EEXIST) {
162 MS_LOG(INFO) << "akg_build_tmp.key already exist.";
163 shm_id_ = shmget(key_id, mem_size, 0);
164 }
165
166 if (shm_id_ == -1) {
167 MS_LOG(ERROR) << "Create shared_mem failed, error msg:" << GetErrorInfo();
168 return nullptr;
169 }
170 } else {
171 is_creator_ = true;
172 }
173
174 auto local_addr = shmat(shm_id_, nullptr, 0);
175 if (local_addr == reinterpret_cast<void *>(-1)) {
176 MS_LOG(ERROR) << "Attach to shared_mem failed, error msg:" << GetErrorInfo();
177 return nullptr;
178 }
179
180 if (is_creator_) {
181 if (memset_s(local_addr, mem_size, 0, mem_size) != EOK) {
182 MS_LOG(ERROR) << "Failed to call memset_s.";
183 return nullptr;
184 }
185 }
186
187 return local_addr;
188 }
189
Init(const std::vector<JsonNodePair> & build_args)190 int32_t KernelPool::Init(const std::vector<JsonNodePair> &build_args) {
191 auto cp = GetTmpKeyPath();
192 if (cp.empty()) {
193 return -1;
194 }
195
196 auto file_name = cp + "/" + std::string(kKeyName_);
197 fd_ = open(file_name.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
198 if (fd_ == -1) {
199 MS_LOG(ERROR) << "open file <" << file_name << "> failed, error msg:" << GetErrorInfo();
200 return -1;
201 }
202
203 auto addr = CreateSharedMem(cp);
204 if (addr == nullptr) {
205 return -1;
206 }
207
208 InitKernelLists(addr);
209
210 auto ret = AddKernels(build_args);
211 if (ret != 0) {
212 MS_LOG(ERROR) << "KernelPool AddKernels failed.";
213 return -1;
214 }
215
216 return 0;
217 }
218
Release() const219 int32_t KernelPool::Release() const {
220 {
221 ACQUIRE_LOCK;
222 if (!lock.locked_) {
223 MS_LOG(ERROR) << "Failed to acquire lock.";
224 return -1;
225 }
226
227 struct shmid_ds buf;
228 auto ret = shmctl(shm_id_, IPC_STAT, &buf);
229 if (ret == -1) {
230 MS_LOG(ERROR) << "Failed to get the info of shared memory, error msg:" << GetErrorInfo();
231 return -1;
232 }
233
234 bool need_delete_by_last = false;
235
236 // if the creator exits unexpectedly and fails to delete the shm, the last process will try to delete the shm
237 if (((buf.shm_perm.mode & SHM_DEST) == 0) && (buf.shm_nattch == 1)) {
238 need_delete_by_last = true;
239 }
240
241 // Detach shared memory
242 ret = shmdt(static_cast<void *>(kernel_lists_[0]));
243 if (ret < 0) {
244 MS_LOG(ERROR) << "Shared_mem detach failed, error msg:" << GetErrorInfo();
245 return -1;
246 }
247
248 // Release shared memory
249 if (is_creator_ || need_delete_by_last) {
250 ret = shmctl(shm_id_, IPC_RMID, nullptr);
251 if (ret < 0) {
252 MS_LOG(ERROR) << "Release shared_mem failed, error msg:" << GetErrorInfo();
253 return -1;
254 }
255 }
256 }
257
258 return 0;
259 }
260
AddKernels(const std::vector<JsonNodePair> & build_args)261 int32_t KernelPool::AddKernels(const std::vector<JsonNodePair> &build_args) {
262 ACQUIRE_LOCK;
263 if (!lock.locked_) {
264 MS_LOG(ERROR) << "Failed to acquire lock.";
265 return -1;
266 }
267
268 std::set<size_t> todo_list(ListBegin(kToDoIdx_), ListEnd(kToDoIdx_));
269 std::set<size_t> doing_list(ListBegin(kDoingIdx_), ListEnd(kDoingIdx_));
270 std::set<size_t> done_list(ListBegin(kDoneIdx_), ListEnd(kDoneIdx_));
271
272 for (const auto &[json_generator, anf_node] : build_args) {
273 MS_EXCEPTION_IF_NULL(anf_node);
274 auto kernel_name = json_generator.kernel_name();
275
276 auto hash_id = std::hash<std::string>()(kernel_name);
277 if (self_kernel_ids_.count(hash_id) != 0) {
278 MS_LOG(ERROR) << "Duplicated kernel found in the kernel compiling list. kernel_name[" << kernel_name << "]";
279 return -1;
280 }
281
282 (void)self_kernel_ids_.emplace(hash_id);
283 }
284
285 std::set<size_t> diff_from_todo;
286 std::set<size_t> diff_from_doing;
287 std::set<size_t> diff_from_done;
288
289 // add the unique kernel only once, so need to check if it exists in todo_list, doing_list, or done_list
290 (void)std::set_difference(self_kernel_ids_.begin(), self_kernel_ids_.end(), todo_list.begin(), todo_list.end(),
291 std::inserter(diff_from_todo, diff_from_todo.begin()));
292 (void)std::set_difference(diff_from_todo.begin(), diff_from_todo.end(), doing_list.begin(), doing_list.end(),
293 std::inserter(diff_from_doing, diff_from_doing.begin()));
294 (void)std::set_difference(diff_from_doing.begin(), diff_from_doing.end(), done_list.begin(), done_list.end(),
295 std::inserter(diff_from_done, diff_from_done.begin()));
296
297 auto new_kernel_size = diff_from_done.size();
298 if (new_kernel_size + todo_list.size() > static_cast<size_t>(kMaxKernelNum_)) {
299 MS_LOG(ERROR) << "The size of kernels is " << new_kernel_size << ", while the left space of the pool is "
300 << kMaxKernelNum_ - todo_list.size();
301 return -1;
302 }
303
304 (void)std::copy(diff_from_done.begin(), diff_from_done.end(), ListEnd(kToDoIdx_));
305 IncListSize(kToDoIdx_, new_kernel_size);
306
307 return 0;
308 }
309
FetchKernels(std::set<size_t> * out)310 int32_t KernelPool::FetchKernels(std::set<size_t> *out) {
311 ACQUIRE_LOCK;
312 if (!lock.locked_) {
313 MS_LOG(ERROR) << "Failed to acquire lock.";
314 return -1;
315 }
316
317 std::set<size_t> left_in_todo_list;
318
319 // filter out kernels which does not belongs to this process
320 auto FilterBySelfList = [&left_in_todo_list, &out, this](size_t id) {
321 if (this->self_kernel_ids_.count(id) != 0) {
322 (void)out->emplace(id);
323 } else {
324 (void)left_in_todo_list.emplace(id);
325 }
326 };
327
328 (void)std::for_each(ListBegin(kToDoIdx_), ListEnd(kToDoIdx_), FilterBySelfList);
329
330 (void)std::copy(out->begin(), out->end(), ListEnd(kDoingIdx_));
331 IncListSize(kDoingIdx_, out->size());
332
333 (void)std::copy(left_in_todo_list.begin(), left_in_todo_list.end(), ListBegin(kToDoIdx_));
334 ResetListSize(kToDoIdx_, left_in_todo_list.size());
335
336 return 0;
337 }
338
UpdateAndWait(const std::set<size_t> & ids)339 int32_t KernelPool::UpdateAndWait(const std::set<size_t> &ids) {
340 if (!ids.empty()) {
341 ACQUIRE_LOCK;
342 if (!lock.locked_) {
343 MS_LOG(ERROR) << "Failed to acquire lock.";
344 return -1;
345 }
346
347 // update the state of finished kernels to `done`
348 (void)std::copy(ids.begin(), ids.end(), ListEnd(kDoneIdx_));
349 IncListSize(kDoneIdx_, ids.size());
350
351 // delete the finished kernels from doing_list
352 std::vector<size_t> left_in_doing_list;
353 std::set<size_t> doing_list(ListBegin(kDoingIdx_), ListEnd(kDoingIdx_));
354 (void)std::set_difference(doing_list.begin(), doing_list.end(), ids.begin(), ids.end(),
355 std::inserter(left_in_doing_list, left_in_doing_list.begin()));
356
357 (void)std::copy(left_in_doing_list.begin(), left_in_doing_list.end(), ListBegin(kDoingIdx_));
358 ResetListSize(kDoingIdx_, left_in_doing_list.size());
359 }
360
361 auto ret = Wait();
362 if (ret != 0) {
363 MS_LOG(ERROR) << "KernelPool Wait failed.";
364 return -1;
365 }
366
367 return 0;
368 }
369
Wait() const370 int32_t KernelPool::Wait() const {
371 // wait until all the kernels which belong to this process finish compiling
372 uint32_t trials = 1000;
373 const uint32_t sleep_time_us = 1000000;
374
375 while (trials > 0) {
376 {
377 ACQUIRE_LOCK;
378 if (!lock.locked_) {
379 MS_LOG(ERROR) << "Failed to acquire lock.";
380 return -1;
381 }
382
383 std::set<size_t> done_list(ListBegin(kDoneIdx_), ListEnd(kDoneIdx_));
384
385 if (std::all_of(self_kernel_ids_.begin(), self_kernel_ids_.end(),
386 [&done_list](size_t id) { return done_list.count(id) != 0; })) {
387 return 0;
388 }
389 }
390
391 (void)usleep(sleep_time_us);
392 trials--;
393 }
394
395 MS_LOG(ERROR) << "Time out while wait kernel compiling";
396 return -1;
397 }
398
SearchKernelCache(const std::string & kernel_name)399 KernelPackPtr GraphKernelBuilder::SearchKernelCache(const std::string &kernel_name) {
400 auto processor = GetStrProcessorFromContext();
401 return SearchCache(kernel_name, processor);
402 }
403
InsertKernelCache(const std::string & kernel_name)404 KernelPackPtr GraphKernelBuilder::InsertKernelCache(const std::string &kernel_name) {
405 auto processor = GetStrProcessorFromContext();
406 return InsertCache(kernel_name, processor);
407 }
408
GetNotCachedKernels(const std::vector<JsonNodePair> & build_args)409 std::vector<JsonNodePair> GraphKernelBuilder::GetNotCachedKernels(const std::vector<JsonNodePair> &build_args) {
410 LoadCache();
411 std::unordered_set<std::string> kernel_name_set;
412 std::vector<JsonNodePair> new_build_args;
413 for (const auto &[json_generator, anf_node] : build_args) {
414 MS_EXCEPTION_IF_NULL(anf_node);
415 auto kernel_name = json_generator.kernel_name();
416
417 auto cached_kernel_pack = SearchKernelCache(kernel_name);
418 if (cached_kernel_pack != nullptr) {
419 MS_LOG(DEBUG) << "Use cached kernel, kernel_name[" << kernel_name << "], fullname_with_scope["
420 << anf_node->fullname_with_scope() << "].";
421 SetKernelMod(cached_kernel_pack, json_generator, anf_node);
422 continue;
423 }
424
425 if (kernel_name_set.count(kernel_name) != 0) {
426 (void)repeat_nodes_.emplace_back(json_generator, anf_node);
427 continue;
428 }
429 (void)kernel_name_set.insert(kernel_name);
430 (void)new_build_args.emplace_back(json_generator, anf_node);
431 }
432 return new_build_args;
433 }
434
InsertToCache(const std::vector<JsonNodePair> & build_args)435 bool GraphKernelBuilder::InsertToCache(const std::vector<JsonNodePair> &build_args) {
436 for (const auto &[json_generator, anf_node] : build_args) {
437 auto kernel_name = json_generator.kernel_name();
438 auto new_kernel_pack = InsertKernelCache(kernel_name);
439 if (new_kernel_pack == nullptr) {
440 MS_LOG(ERROR) << "Insert to cache failed, kernel_name[" << kernel_name << "], fullname_with_scope["
441 << anf_node->fullname_with_scope() << "].";
442 return false;
443 }
444 SetKernelMod(new_kernel_pack, json_generator, anf_node);
445 MS_LOG(DEBUG) << "Kernel compiler compile " << kernel_name << " kernel and insert cache successfully!";
446 }
447 return true;
448 }
449
HandleRepeatNodes()450 bool GraphKernelBuilder::HandleRepeatNodes() {
451 for (const auto &[json_generator, anf_node] : repeat_nodes_) {
452 auto kernel_name = json_generator.kernel_name();
453 auto cached_kernel_pack = SearchKernelCache(kernel_name);
454 if (cached_kernel_pack == nullptr) {
455 MS_LOG(ERROR) << "Kernel is not found in cache, kernel_name[" << kernel_name << "], fullname_with_scope["
456 << anf_node->fullname_with_scope() << "].";
457 return false;
458 }
459 MS_LOG(DEBUG) << "Use the cached kernel found in cache, kernel_name[" << kernel_name << "], fullname_with_scope["
460 << anf_node->fullname_with_scope() << "].";
461 SetKernelMod(cached_kernel_pack, json_generator, anf_node);
462 }
463 return true;
464 }
465
GetKernelJsonsByHashId(const std::vector<JsonNodePair> & build_args,const std::set<size_t> & fetched_ids)466 std::vector<std::string> GraphKernelBuilder::GetKernelJsonsByHashId(const std::vector<JsonNodePair> &build_args,
467 const std::set<size_t> &fetched_ids) {
468 std::vector<std::string> jsons;
469 for (const auto &[json_generator, anf_node] : build_args) {
470 MS_EXCEPTION_IF_NULL(anf_node);
471 auto kernel_name = json_generator.kernel_name();
472 auto hash_id = std::hash<std::string>()(kernel_name);
473 if (fetched_ids.count(hash_id) == 0) {
474 continue;
475 }
476 auto kernel_json = json_generator.kernel_json_str();
477 SaveJsonInfo(kernel_name, kernel_json);
478 jsons.push_back(kernel_json);
479 }
480 return jsons;
481 }
482
LoadCache()483 void GraphKernelBuilder::LoadCache() {
484 static bool has_load = false;
485 if (has_load) {
486 return;
487 }
488 auto bin_map = KernelMeta::GetInstance();
489 auto kernel_dir = bin_map->kernel_meta_path();
490 DIR *dir = opendir(kernel_dir.c_str());
491 if (dir == nullptr) {
492 MS_LOG(DEBUG) << "kernel dir [" << kernel_dir << "] not exist";
493 return;
494 }
495 struct dirent *entry;
496 constexpr size_t SUFFIX_LENS = 5;
497 while ((entry = readdir(dir)) != nullptr) {
498 std::string kernel_json = entry->d_name;
499 if (kernel_json.length() <= SUFFIX_LENS) {
500 continue;
501 }
502 auto suffix = kernel_json.substr(kernel_json.length() - SUFFIX_LENS);
503 if (suffix != kJsonSuffix) {
504 continue;
505 }
506 auto sp = kernel_json.rfind('/');
507 if (sp != std::string::npos) {
508 continue;
509 }
510 auto kernel_name = kernel_json.substr(0, kernel_json.length() - SUFFIX_LENS);
511 (void)bin_map->Insert(kernel_name, kernel_dir + kernel_json);
512 }
513 has_load = true;
514 (void)closedir(dir);
515 return;
516 }
517
CollectBuildAttrs()518 std::string GraphKernelBuilder::CollectBuildAttrs() {
519 auto &flags = graphkernel::GraphKernelFlags::GetInstance();
520 if (!flags.enable_vectorization) {
521 build_attrs_["enable_vectorization"] = flags.enable_vectorization;
522 }
523 if (flags.online_tuning > 0) {
524 build_attrs_["online_tuning"] = flags.online_tuning;
525 }
526 if (!flags.repository_path.empty()) {
527 build_attrs_["repository_path"] = flags.repository_path;
528 }
529 auto compile_cache = GetCompilerCachePath();
530 if (!compile_cache.empty()) {
531 build_attrs_["compile_cache"] = compile_cache;
532 }
533 return build_attrs_.empty() ? "" : build_attrs_.dump();
534 }
535 } // namespace kernel
536 } // namespace mindspore
537