/**
 * Copyright 2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "kernel/graph_kernel/graph_kernel_builder.h"

#include <dirent.h>
#include <fcntl.h>
#include <sys/shm.h>
#include <unistd.h>
#include <algorithm>
#include <chrono>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "backend/common/graph_kernel/graph_kernel_flags.h"
#include "include/backend/anf_runtime_algorithm.h"
#include "include/common/debug/common.h"
#include "include/common/utils/anfalgo.h"
#include "ir/func_graph.h"
#include "kernel/framework_utils.h"
#include "kernel/graph_kernel/graph_kernel_json_generator.h"
#include "mindspore/core/ops/framework_ops.h"
#include "utils/file_utils.h"

namespace mindspore {
namespace kernel {
constexpr int32_t MAX_ERROR_LEN = 1024;
constexpr int32_t PROCESS_NUM = 16;
constexpr int32_t TIME_OUT = 300;
constexpr auto kLogLevel = "log_level";

#define ACQUIRE_LOCK LockMng lock(fd_, __func__, __LINE__)

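// strerror_r has two incompatible variants: the XSI version returns an int and
// writes the message into the caller's buffer, while the GNU version returns a
// char * that may point to a static string instead of the buffer. The #if below
// mirrors that dichotomy so the correct return value is inspected in each case.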
inline std::string GetErrorInfo() {
  char buf[MAX_ERROR_LEN + 1] = {0};
  auto ret = strerror_r(errno, buf, MAX_ERROR_LEN);
#if (_POSIX_C_SOURCE >= 200112L) && !_GNU_SOURCE
  if (ret != 0 || strlen(buf) == 0) {
    return "Call strerror_r failed";
  }

  return std::string(buf);
#else
  return ret != nullptr ? std::string(ret) : "Failed to get error info";
#endif
}

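// LockMng guards the shared state with an advisory file lock (lockf) on fd_.
// F_TLOCK is non-blocking, so TryLock retries with a short sleep instead of
// blocking indefinitely; with 2000 trials of 5 ms each, it gives up after
// roughly 10 seconds of contention.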
bool KernelPool::LockMng::TryLock() const {
  // Try to acquire the lock up to `trial` times; return false if it still cannot be acquired.
  uint32_t trial = 2000;
  const uint32_t sleep_time_us = 5000;

  int32_t ret = -1;
  while (trial > 0) {
    ret = lockf(fd_, F_TLOCK, 0);
    if (ret == 0 || (errno != EACCES && errno != EAGAIN)) {
      break;
    }

    trial--;
    (void)usleep(sleep_time_us);
  }

  if (ret == -1) {
    MS_LOG(ERROR) << "Failed to acquire the lock, error msg:" << GetErrorInfo() << ", left trying times: " << trial;
    return false;
  }

  MS_LOG(INFO) << "KernelBuild successfully acquired lock, called at " << calling_position_;
  return true;
}

void KernelPool::LockMng::Unlock() const noexcept {
  auto ret = lockf(fd_, F_ULOCK, 0);
  if (ret == -1) {
    MS_LOG(ERROR) << "Failed to release the lock, error msg:" << GetErrorInfo();
  }
  MS_LOG(INFO) << "KernelBuild successfully released lock, called at " << calling_position_;
}

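// Resolve the directory that hosts the lock file and keys the shared memory.
// Priority: the compile-cache path from the global context, then the
// kCompilerCachePath environment variable, then the current working directory.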
std::string KernelPool::GetTmpKeyPath() const {
  std::string config_path = MsContext::GetInstance()->get_param<std::string>(MS_CTX_COMPILE_CACHE_PATH);
  if (config_path.empty()) {
    config_path = common::GetEnv(kCompilerCachePath);
  }
  if (config_path.empty()) {
    char cwd[PATH_MAX];
    char *ret = getcwd(cwd, sizeof(cwd));
    if (ret == nullptr) {
      MS_LOG(ERROR) << "Get current work directory failed, error msg:" << GetErrorInfo();
      return "";
    }
    config_path = std::string(cwd);
  }
  auto real_path = FileUtils::GetRealPath(common::SafeCStr(config_path));
  if (!real_path.has_value()) {
    MS_LOG(ERROR) << "Failed to resolve the real path, error msg:" << GetErrorInfo();
    return "";
  }
  return real_path.value();
}

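// Create (or attach to) the System V shared-memory segment that backs the
// kernel lists. The key is derived from a hash of the cache path, so every
// process using the same path agrees on the segment. A first locked pass
// removes a stale segment that has no attached process; a second pass creates
// the segment with IPC_CREAT | IPC_EXCL, falling back to attaching when
// another process won the creation race.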
void *KernelPool::CreateSharedMem(const std::string &path) {
  is_creator_ = false;

  auto hash_id = std::hash<std::string>()(path);
  auto key_id = static_cast<key_t>(hash_id);
  const size_t min_mem_size = 512;
  auto mem_size = sizeof(size_t) * kListNum_ * (kMaxKernelNum_ + 1) + min_mem_size;

  {
    ACQUIRE_LOCK;
    if (!lock.locked_) {
      MS_LOG(ERROR) << "Failed to acquire lock.";
      return nullptr;
    }

    // Check whether the shared memory already exists;
    // remove it if it exists but no process is attached to it (shm_nattch == 0).
    struct shmid_ds buf;
    auto id = shmget(key_id, mem_size, 0);
    if (id != -1) {
      auto ret = shmctl(id, IPC_STAT, &buf);
      if (ret == -1) {
        MS_LOG(ERROR) << "Failed to get the info of shared memory, error msg:" << GetErrorInfo();
        return nullptr;
      }

      if (buf.shm_nattch == 0) {
        ret = shmctl(id, IPC_RMID, nullptr);
        if (ret < 0) {
          MS_LOG(EXCEPTION) << "Release shared_mem failed, error msg:" << GetErrorInfo();
        }
      }
    }
  }

  ACQUIRE_LOCK;
  if (!lock.locked_) {
    MS_LOG(ERROR) << "Failed to acquire lock.";
    return nullptr;
  }
  const int kShmFlg = IPC_CREAT | IPC_EXCL | 0600;
  shm_id_ = shmget(key_id, mem_size, kShmFlg);
  if (shm_id_ == -1) {
    if (errno == EEXIST) {
      MS_LOG(INFO) << "akg_build_tmp.key already exists.";
      shm_id_ = shmget(key_id, mem_size, 0);
    }

    if (shm_id_ == -1) {
      MS_LOG(ERROR) << "Create shared_mem failed, error msg:" << GetErrorInfo();
      return nullptr;
    }
  } else {
    is_creator_ = true;
  }

  auto local_addr = shmat(shm_id_, nullptr, 0);
  if (local_addr == reinterpret_cast<void *>(-1)) {
    MS_LOG(ERROR) << "Attach to shared_mem failed, error msg:" << GetErrorInfo();
    return nullptr;
  }

  // The creator zero-initializes the whole segment so that all list sizes start at 0.
  if (is_creator_) {
    if (memset_s(local_addr, mem_size, 0, mem_size) != EOK) {
      MS_LOG(ERROR) << "Failed to call memset_s.";
      return nullptr;
    }
  }

  return local_addr;
}

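// Wire a process into the pool: open/create the lock file, map the shared
// lists, and publish this process's kernels into the todo list. A minimal
// sketch of the expected call sequence (hypothetical caller; only the
// KernelPool methods are taken from this file):
//
//   KernelPool pool;
//   if (pool.Init(build_args) == 0) {
//     std::set<size_t> fetched_ids;
//     (void)pool.FetchKernels(&fetched_ids);
//     // ... compile the kernels claimed in fetched_ids ...
//     (void)pool.UpdateAndWait(fetched_ids);
//     (void)pool.Release();
//   }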
int32_t KernelPool::Init(const std::vector<JsonNodePair> &build_args) {
  auto cp = GetTmpKeyPath();
  if (cp.empty()) {
    return -1;
  }

  auto file_name = cp + "/" + std::string(kKeyName_);
  fd_ = open(file_name.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
  if (fd_ == -1) {
    MS_LOG(ERROR) << "open file <" << file_name << "> failed, error msg:" << GetErrorInfo();
    return -1;
  }

  auto addr = CreateSharedMem(cp);
  if (addr == nullptr) {
    return -1;
  }

  InitKernelLists(addr);

  auto ret = AddKernels(build_args);
  if (ret != 0) {
    MS_LOG(ERROR) << "KernelPool AddKernels failed.";
    return -1;
  }

  return 0;
}

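// Detach from the shared memory and, when this process created the segment or
// is the last one attached to an orphaned segment, remove it so no stale
// state survives the build.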
int32_t KernelPool::Release() const {
  {
    ACQUIRE_LOCK;
    if (!lock.locked_) {
      MS_LOG(ERROR) << "Failed to acquire lock.";
      return -1;
    }

    struct shmid_ds buf;
    auto ret = shmctl(shm_id_, IPC_STAT, &buf);
    if (ret == -1) {
      MS_LOG(ERROR) << "Failed to get the info of shared memory, error msg:" << GetErrorInfo();
      return -1;
    }

    bool need_delete_by_last = false;

    // If the creator exits unexpectedly and fails to delete the shm, the last attached process deletes it instead.
    if (((buf.shm_perm.mode & SHM_DEST) == 0) && (buf.shm_nattch == 1)) {
      need_delete_by_last = true;
    }

    // Detach shared memory
    ret = shmdt(static_cast<void *>(kernel_lists_[0]));
    if (ret < 0) {
      MS_LOG(ERROR) << "Shared_mem detach failed, error msg:" << GetErrorInfo();
      return -1;
    }

    // Release shared memory
    if (is_creator_ || need_delete_by_last) {
      ret = shmctl(shm_id_, IPC_RMID, nullptr);
      if (ret < 0) {
        MS_LOG(ERROR) << "Release shared_mem failed, error msg:" << GetErrorInfo();
        return -1;
      }
    }
  }

  return 0;
}

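// Publish this process's kernels into the shared todo list, identified by the
// hash of their kernel names. The chained set_difference calls keep a kernel
// out of todo if any process already queued, claimed, or finished it. As an
// illustrative example: with self = {1, 2, 3}, todo = {2}, doing = {} and
// done = {3}, only {1} is appended to the todo list.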
int32_t KernelPool::AddKernels(const std::vector<JsonNodePair> &build_args) {
  ACQUIRE_LOCK;
  if (!lock.locked_) {
    MS_LOG(ERROR) << "Failed to acquire lock.";
    return -1;
  }

  std::set<size_t> todo_list(ListBegin(kToDoIdx_), ListEnd(kToDoIdx_));
  std::set<size_t> doing_list(ListBegin(kDoingIdx_), ListEnd(kDoingIdx_));
  std::set<size_t> done_list(ListBegin(kDoneIdx_), ListEnd(kDoneIdx_));

  for (const auto &[json_generator, anf_node] : build_args) {
    MS_EXCEPTION_IF_NULL(anf_node);
    auto kernel_name = json_generator.kernel_name();

    auto hash_id = std::hash<std::string>()(kernel_name);
    if (self_kernel_ids_.count(hash_id) != 0) {
      MS_LOG(ERROR) << "Duplicated kernel found in the kernel compiling list. kernel_name[" << kernel_name << "]";
      return -1;
    }

    (void)self_kernel_ids_.emplace(hash_id);
  }

  std::set<size_t> diff_from_todo;
  std::set<size_t> diff_from_doing;
  std::set<size_t> diff_from_done;

  // Each kernel should be added only once, so skip those already present in todo_list, doing_list, or done_list.
  (void)std::set_difference(self_kernel_ids_.begin(), self_kernel_ids_.end(), todo_list.begin(), todo_list.end(),
                            std::inserter(diff_from_todo, diff_from_todo.begin()));
  (void)std::set_difference(diff_from_todo.begin(), diff_from_todo.end(), doing_list.begin(), doing_list.end(),
                            std::inserter(diff_from_doing, diff_from_doing.begin()));
  (void)std::set_difference(diff_from_doing.begin(), diff_from_doing.end(), done_list.begin(), done_list.end(),
                            std::inserter(diff_from_done, diff_from_done.begin()));

  auto new_kernel_size = diff_from_done.size();
  if (new_kernel_size + todo_list.size() > static_cast<size_t>(kMaxKernelNum_)) {
    MS_LOG(ERROR) << "The number of new kernels is " << new_kernel_size << ", while the remaining space of the pool is "
                  << kMaxKernelNum_ - todo_list.size();
    return -1;
  }

  (void)std::copy(diff_from_done.begin(), diff_from_done.end(), ListEnd(kToDoIdx_));
  IncListSize(kToDoIdx_, new_kernel_size);

  return 0;
}

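// Claim work: move the ids of this process's kernels from the shared todo
// list into the doing list and report them through *out; kernels queued by
// other processes are left in the todo list untouched.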
int32_t KernelPool::FetchKernels(std::set<size_t> *out) {
  ACQUIRE_LOCK;
  if (!lock.locked_) {
    MS_LOG(ERROR) << "Failed to acquire lock.";
    return -1;
  }

  std::set<size_t> left_in_todo_list;

  // Filter out the kernels that do not belong to this process.
  auto FilterBySelfList = [&left_in_todo_list, &out, this](size_t id) {
    if (this->self_kernel_ids_.count(id) != 0) {
      (void)out->emplace(id);
    } else {
      (void)left_in_todo_list.emplace(id);
    }
  };

  (void)std::for_each(ListBegin(kToDoIdx_), ListEnd(kToDoIdx_), FilterBySelfList);

  (void)std::copy(out->begin(), out->end(), ListEnd(kDoingIdx_));
  IncListSize(kDoingIdx_, out->size());

  (void)std::copy(left_in_todo_list.begin(), left_in_todo_list.end(), ListBegin(kToDoIdx_));
  ResetListSize(kToDoIdx_, left_in_todo_list.size());

  return 0;
}

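// Report the kernels this process finished compiling (moving them from doing
// to done), then block until every kernel this process registered is done,
// regardless of which process compiled it.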
int32_t KernelPool::UpdateAndWait(const std::set<size_t> &ids) {
  if (!ids.empty()) {
    ACQUIRE_LOCK;
    if (!lock.locked_) {
      MS_LOG(ERROR) << "Failed to acquire lock.";
      return -1;
    }

    // Update the state of finished kernels to `done`.
    (void)std::copy(ids.begin(), ids.end(), ListEnd(kDoneIdx_));
    IncListSize(kDoneIdx_, ids.size());

    // Delete the finished kernels from the doing list.
    std::vector<size_t> left_in_doing_list;
    std::set<size_t> doing_list(ListBegin(kDoingIdx_), ListEnd(kDoingIdx_));
    (void)std::set_difference(doing_list.begin(), doing_list.end(), ids.begin(), ids.end(),
                              std::inserter(left_in_doing_list, left_in_doing_list.begin()));

    (void)std::copy(left_in_doing_list.begin(), left_in_doing_list.end(), ListBegin(kDoingIdx_));
    ResetListSize(kDoingIdx_, left_in_doing_list.size());
  }

  auto ret = Wait();
  if (ret != 0) {
    MS_LOG(ERROR) << "KernelPool Wait failed.";
    return -1;
  }

  return 0;
}

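// Poll the shared done list once per second, for at most 1000 trials, until
// it covers all of this process's kernels.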
int32_t KernelPool::Wait() const {
  // Wait until all kernels belonging to this process finish compiling.
  uint32_t trials = 1000;
  const uint32_t sleep_time_us = 1000000;

  while (trials > 0) {
    {
      ACQUIRE_LOCK;
      if (!lock.locked_) {
        MS_LOG(ERROR) << "Failed to acquire lock.";
        return -1;
      }

      std::set<size_t> done_list(ListBegin(kDoneIdx_), ListEnd(kDoneIdx_));

      if (std::all_of(self_kernel_ids_.begin(), self_kernel_ids_.end(),
                      [&done_list](size_t id) { return done_list.count(id) != 0; })) {
        return 0;
      }
    }

    (void)usleep(sleep_time_us);
    trials--;
  }

  MS_LOG(ERROR) << "Timed out while waiting for kernel compiling";
  return -1;
}

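// Thin wrappers around the kernel-meta cache: both look up or insert by
// kernel name, using the processor string derived from the current context.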
KernelPackPtr GraphKernelBuilder::SearchKernelCache(const std::string &kernel_name) {
  auto processor = GetStrProcessorFromContext();
  return SearchCache(kernel_name, processor);
}

KernelPackPtr GraphKernelBuilder::InsertKernelCache(const std::string &kernel_name) {
  auto processor = GetStrProcessorFromContext();
  return InsertCache(kernel_name, processor);
}

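// Partition build_args into three groups: kernels already in the cache (their
// kernel mod is set immediately), duplicates of a kernel queued earlier in
// this batch (deferred into repeat_nodes_ until the first copy is built), and
// genuinely new kernels, which are returned for compilation.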
std::vector<JsonNodePair> GraphKernelBuilder::GetNotCachedKernels(const std::vector<JsonNodePair> &build_args) {
  LoadCache();
  std::unordered_set<std::string> kernel_name_set;
  std::vector<JsonNodePair> new_build_args;
  for (const auto &[json_generator, anf_node] : build_args) {
    MS_EXCEPTION_IF_NULL(anf_node);
    auto kernel_name = json_generator.kernel_name();

    auto cached_kernel_pack = SearchKernelCache(kernel_name);
    if (cached_kernel_pack != nullptr) {
      MS_LOG(DEBUG) << "Use cached kernel, kernel_name[" << kernel_name << "], fullname_with_scope["
                    << anf_node->fullname_with_scope() << "].";
      SetKernelMod(cached_kernel_pack, json_generator, anf_node);
      continue;
    }

    if (kernel_name_set.count(kernel_name) != 0) {
      (void)repeat_nodes_.emplace_back(json_generator, anf_node);
      continue;
    }
    (void)kernel_name_set.insert(kernel_name);
    (void)new_build_args.emplace_back(json_generator, anf_node);
  }
  return new_build_args;
}

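// After compilation, move each freshly built kernel into the cache and bind
// its kernel mod; any failure here is treated as a build failure.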
bool GraphKernelBuilder::InsertToCache(const std::vector<JsonNodePair> &build_args) {
  for (const auto &[json_generator, anf_node] : build_args) {
    auto kernel_name = json_generator.kernel_name();
    auto new_kernel_pack = InsertKernelCache(kernel_name);
    if (new_kernel_pack == nullptr) {
      MS_LOG(ERROR) << "Insert to cache failed, kernel_name[" << kernel_name << "], fullname_with_scope["
                    << anf_node->fullname_with_scope() << "].";
      return false;
    }
    SetKernelMod(new_kernel_pack, json_generator, anf_node);
    MS_LOG(DEBUG) << "Compiled kernel " << kernel_name << " and inserted it into the cache successfully!";
  }
  return true;
}

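// Resolve the duplicates deferred by GetNotCachedKernels: by now their first
// copies should have been compiled and cached, so a cache miss is an error.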
bool GraphKernelBuilder::HandleRepeatNodes() {
  for (const auto &[json_generator, anf_node] : repeat_nodes_) {
    auto kernel_name = json_generator.kernel_name();
    auto cached_kernel_pack = SearchKernelCache(kernel_name);
    if (cached_kernel_pack == nullptr) {
      MS_LOG(ERROR) << "Kernel is not found in cache, kernel_name[" << kernel_name << "], fullname_with_scope["
                    << anf_node->fullname_with_scope() << "].";
      return false;
    }
    MS_LOG(DEBUG) << "Use the cached kernel, kernel_name[" << kernel_name << "], fullname_with_scope["
                  << anf_node->fullname_with_scope() << "].";
    SetKernelMod(cached_kernel_pack, json_generator, anf_node);
  }
  return true;
}

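// Collect the kernel json strings whose name hashes were claimed via
// KernelPool::FetchKernels, saving each json via SaveJsonInfo along the way.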
std::vector<std::string> GraphKernelBuilder::GetKernelJsonsByHashId(const std::vector<JsonNodePair> &build_args,
                                                                    const std::set<size_t> &fetched_ids) {
  std::vector<std::string> jsons;
  for (const auto &[json_generator, anf_node] : build_args) {
    MS_EXCEPTION_IF_NULL(anf_node);
    auto kernel_name = json_generator.kernel_name();
    auto hash_id = std::hash<std::string>()(kernel_name);
    if (fetched_ids.count(hash_id) == 0) {
      continue;
    }
    auto kernel_json = json_generator.kernel_json_str();
    SaveJsonInfo(kernel_name, kernel_json);
    jsons.push_back(kernel_json);
  }
  return jsons;
}

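// Scan the kernel_meta directory once per process and register every kernel
// json file with the KernelMeta singleton. SUFFIX_LENS is the length of the
// expected file suffix (presumably ".json", matching kJsonSuffix).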
void GraphKernelBuilder::LoadCache() {
  static bool has_load = false;
  if (has_load) {
    return;
  }
  auto bin_map = KernelMeta::GetInstance();
  auto kernel_dir = bin_map->kernel_meta_path();
  DIR *dir = opendir(kernel_dir.c_str());
  if (dir == nullptr) {
    MS_LOG(DEBUG) << "kernel dir [" << kernel_dir << "] does not exist";
    return;
  }
  struct dirent *entry;
  constexpr size_t SUFFIX_LENS = 5;
  while ((entry = readdir(dir)) != nullptr) {
    std::string kernel_json = entry->d_name;
    if (kernel_json.length() <= SUFFIX_LENS) {
      continue;
    }
    auto suffix = kernel_json.substr(kernel_json.length() - SUFFIX_LENS);
    if (suffix != kJsonSuffix) {
      continue;
    }
    auto sp = kernel_json.rfind('/');
    if (sp != std::string::npos) {
      continue;
    }
    auto kernel_name = kernel_json.substr(0, kernel_json.length() - SUFFIX_LENS);
    (void)bin_map->Insert(kernel_name, kernel_dir + kernel_json);
  }
  has_load = true;
  (void)closedir(dir);
}

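// Gather the non-default graph-kernel build flags into build_attrs_ (a json
// object, judging by the dump() call) and return them serialized, or an
// empty string when every flag has its default value.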
std::string GraphKernelBuilder::CollectBuildAttrs() {
  auto &flags = graphkernel::GraphKernelFlags::GetInstance();
  if (!flags.enable_vectorization) {
    build_attrs_["enable_vectorization"] = flags.enable_vectorization;
  }
  if (flags.online_tuning > 0) {
    build_attrs_["online_tuning"] = flags.online_tuning;
  }
  if (!flags.repository_path.empty()) {
    build_attrs_["repository_path"] = flags.repository_path;
  }
  auto compile_cache = GetCompilerCachePath();
  if (!compile_cache.empty()) {
    build_attrs_["compile_cache"] = compile_cache;
  }
  return build_attrs_.empty() ? "" : build_attrs_.dump();
}
}  // namespace kernel
}  // namespace mindspore