• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "backend/kernel_compiler/akg/akg_kernel_build.h"

#include <fcntl.h>
#include <unistd.h>
#include <cstdio>
#include <cerrno>

#include <algorithm>
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include <iostream>
#include "nlohmann/json.hpp"
#include "ir/dtype.h"
#include "ir/func_graph.h"
#include "utils/context/graph_kernel_flags.h"
#include "backend/kernel_compiler/common_utils.h"
#include "backend/kernel_compiler/akg/akg_kernel_json_generator.h"
#include "backend/kernel_compiler/akg/akg_kernel_attrs_process.h"
#include "backend/session/anf_runtime_algorithm.h"
40 
41 namespace mindspore {
42 namespace kernel {
// Size of the buffer used to render errno messages in GetErrorInfo.
constexpr int32_t MAX_ERROR_LEN = 1024;
// First argument passed to the AKG client's AkgStart (presumably the number of compile worker processes).
constexpr int32_t PROCESS_NUM = 16;
// Second argument passed to the AKG client's AkgStart; unit not visible here (presumably seconds) -- TODO confirm.
constexpr int32_t TIME_OUT = 300;

namespace {
// Result handler for the XSI-compliant strerror_r, which returns 0 on success and writes the message into `buf`.
std::string StrErrorResult(int ret, const char *buf) {
  if (ret != 0 || buf[0] == '\0') {
    return "Call strerror_r failed";
  }
  return std::string(buf);
}

// Result handler for the GNU strerror_r, which returns a pointer to the message. That pointer may refer to a
// static string rather than to the caller's buffer, so the buffer itself is ignored.
std::string StrErrorResult(const char *ret, const char * /* buf */) {
  return ret != nullptr ? std::string(ret) : "Failed to get error info";
}
}  // namespace

// Returns a human-readable description of the current errno value.
// The strerror_r variant (XSI vs GNU) is selected by overload resolution on its return type rather than by
// feature-test macros, which are unreliable (e.g. an empty `_GNU_SOURCE` breaks `#if !_GNU_SOURCE`).
inline std::string GetErrorInfo() {
  // Snapshot errno first so nothing below can change the value being described.
  const int err = errno;
  char buf[MAX_ERROR_LEN + 1] = {0};
  return StrErrorResult(strerror_r(err, buf, MAX_ERROR_LEN), buf);
}
60 
TryLock() const61 bool AkgKernelPool::LockMng::TryLock() const {
62   // Try to lock 100 times. Return errno if lock unsuccessfully
63   uint32_t trial = 100;
64 
65   int32_t ret = -1;
66   while (trial > 0) {
67     ret = lockf(fd_, F_TLOCK, 0);
68     if (ret == 0 || (errno != EACCES && errno != EAGAIN)) {
69       break;
70     }
71 
72     trial--;
73     (void)usleep(5000);
74   }
75 
76   if (ret == -1) {
77     MS_LOG(ERROR) << "Failed to acquire the lock, error msg:" << GetErrorInfo() << ".";
78     return false;
79   }
80 
81   return true;
82 }
83 
Unlock() const84 void AkgKernelPool::LockMng::Unlock() const {
85   auto ret = lockf(fd_, F_ULOCK, 0);
86   if (ret == -1) {
87     MS_LOG(ERROR) << "Failed to release the lock, error msg:" << GetErrorInfo();
88   }
89 }
90 
GetCurrentPath() const91 std::string AkgKernelPool::GetCurrentPath() const {
92   char cwd[PATH_MAX];
93   char *ret = getcwd(cwd, sizeof(cwd));
94   if (ret == nullptr) {
95     MS_LOG(ERROR) << "Get current work directory failed, error msg:" << GetErrorInfo();
96     return "";
97   }
98 
99   char abspath[PATH_MAX];
100   char *res = realpath(cwd, abspath);
101   if (res == nullptr) {
102     MS_LOG(ERROR) << "Change to realpath failed, error msg:" << GetErrorInfo();
103     return "";
104   }
105 
106   return std::string(abspath);
107 }
108 
CreateSharedMem(const std::string & path)109 void *AkgKernelPool::CreateSharedMem(const std::string &path) {
110   is_creator_ = false;
111 
112   auto hash_id = std::hash<std::string>()(path);
113   auto key_id = static_cast<key_t>(hash_id);
114   auto mem_size = sizeof(size_t) * kListNum_ * (kMaxKernelNum_ + 1) + 512;
115 
116   {
117     LockMng lock(fd_);
118     if (!lock.locked_) {
119       MS_LOG(ERROR) << "Failed to acquire lock.";
120       return nullptr;
121     }
122 
123     // check if the shared memory exists or not.
124     // remove shared memory if exists and the nattach is 0
125     struct shmid_ds buf;
126     auto id = shmget(key_id, mem_size, 0);
127     if (id != -1) {
128       auto ret = shmctl(id, IPC_STAT, &buf);
129       if (ret == -1) {
130         MS_LOG(ERROR) << "Failed to get the info of shared memory, error msg:" << GetErrorInfo();
131         return nullptr;
132       }
133 
134       if (buf.shm_nattch == 0) {
135         ret = shmctl(id, IPC_RMID, nullptr);
136         if (ret < 0) {
137           MS_LOG(EXCEPTION) << "Realse shared_mem failed, error msg:" << GetErrorInfo();
138         }
139       }
140     }
141   }
142 
143   LockMng lock(fd_);
144   if (!lock.locked_) {
145     MS_LOG(ERROR) << "Failed to acquire lock.";
146     return nullptr;
147   }
148 
149   shm_id_ = shmget(key_id, mem_size, IPC_CREAT | IPC_EXCL | 0600);
150   if (shm_id_ == -1) {
151     if (errno == EEXIST) {
152       shm_id_ = shmget(key_id, mem_size, 0);
153     }
154 
155     if (shm_id_ == -1) {
156       MS_LOG(ERROR) << "Create shared_mem failed, error msg:" << GetErrorInfo();
157       return nullptr;
158     }
159   } else {
160     is_creator_ = true;
161   }
162 
163   auto local_addr = shmat(shm_id_, nullptr, 0);
164   if (local_addr == reinterpret_cast<void *>(-1)) {
165     MS_LOG(ERROR) << "Attach to shared_mem failed, error msg:" << GetErrorInfo();
166     return nullptr;
167   }
168 
169   if (is_creator_) {
170     (void)memset_s(local_addr, mem_size, 0, mem_size);
171   }
172 
173   return local_addr;
174 }
175 
Init(const std::vector<JsonNodePair> & build_args)176 int32_t AkgKernelPool::Init(const std::vector<JsonNodePair> &build_args) {
177   auto cp = GetCurrentPath();
178   if (cp.empty()) {
179     return -1;
180   }
181 
182   fd_ = open(kKeyName_, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
183   if (fd_ == -1) {
184     MS_LOG(ERROR) << "open file <" << kKeyName_ << "> failed, error msg:" << GetErrorInfo();
185     return -1;
186   }
187 
188   auto addr = CreateSharedMem(cp);
189   if (addr == nullptr) {
190     return -1;
191   }
192 
193   InitKernelLists(addr);
194 
195   auto ret = AddKernels(build_args);
196   if (ret != 0) {
197     MS_LOG(ERROR) << "AkgKernelPool AddKernels failed.";
198     return -1;
199   }
200 
201   return 0;
202 }
203 
Release() const204 int32_t AkgKernelPool::Release() const {
205   {
206     LockMng lock(fd_);
207     if (!lock.locked_) {
208       MS_LOG(ERROR) << "Failed to acquire lock.";
209       return -1;
210     }
211 
212     struct shmid_ds buf;
213     auto ret = shmctl(shm_id_, IPC_STAT, &buf);
214     if (ret == -1) {
215       MS_LOG(ERROR) << "Failed to get the info of shared memory, error msg:" << GetErrorInfo();
216       return -1;
217     }
218 
219     bool need_delete_by_last = false;
220 
221     // if the creator exits unexpectedly and fails to delete the shm, the last process will try to delete the shm
222     if (((buf.shm_perm.mode & SHM_DEST) == 0) && (buf.shm_nattch == 1)) {
223       need_delete_by_last = true;
224     }
225 
226     // Detach shared memory
227     ret = shmdt(reinterpret_cast<void *>(kernel_lists_[0]));
228     if (ret < 0) {
229       MS_LOG(ERROR) << "Shared_mem detach failed, error msg:" << GetErrorInfo();
230       return -1;
231     }
232 
233     // Realse shared_memroy
234     if (is_creator_ || need_delete_by_last) {
235       ret = shmctl(shm_id_, IPC_RMID, nullptr);
236       if (ret < 0) {
237         MS_LOG(ERROR) << "Realse shared_mem failed, error msg:" << GetErrorInfo();
238         return -1;
239       }
240     }
241   }
242 
243   return 0;
244 }
245 
AddKernels(const std::vector<JsonNodePair> & build_args)246 int32_t AkgKernelPool::AddKernels(const std::vector<JsonNodePair> &build_args) {
247   LockMng lock(fd_);
248   if (!lock.locked_) {
249     MS_LOG(ERROR) << "Failed to acquire lock.";
250     return -1;
251   }
252 
253   std::set<size_t> todo_list(ListBegin(kToDoIdx_), ListEnd(kToDoIdx_));
254   std::set<size_t> doing_list(ListBegin(kDoingIdx_), ListEnd(kDoingIdx_));
255   std::set<size_t> done_list(ListBegin(kDoneIdx_), ListEnd(kDoneIdx_));
256 
257   for (const auto &[json_generator, anf_node] : build_args) {
258     MS_EXCEPTION_IF_NULL(anf_node);
259     auto kernel_name = json_generator.kernel_name();
260 
261     auto hash_id = std::hash<std::string>()(kernel_name);
262     if (self_kernel_ids_.count(hash_id) != 0) {
263       MS_LOG(ERROR) << "Duplicated hash_id in list.";
264       return -1;
265     }
266 
267     (void)self_kernel_ids_.emplace(hash_id);
268   }
269 
270   std::set<size_t> diff_from_todo;
271   std::set<size_t> diff_from_doing;
272   std::set<size_t> diff_from_done;
273 
274   // add the unique kernel only once, so need to check if it exists in todo_list, doing_list, or done_list
275   (void)std::set_difference(self_kernel_ids_.begin(), self_kernel_ids_.end(), todo_list.begin(), todo_list.end(),
276                             std::inserter(diff_from_todo, diff_from_todo.begin()));
277   (void)std::set_difference(diff_from_todo.begin(), diff_from_todo.end(), doing_list.begin(), doing_list.end(),
278                             std::inserter(diff_from_doing, diff_from_doing.begin()));
279   (void)std::set_difference(diff_from_doing.begin(), diff_from_doing.end(), done_list.begin(), done_list.end(),
280                             std::inserter(diff_from_done, diff_from_done.begin()));
281 
282   auto new_kernel_size = diff_from_done.size();
283   if (new_kernel_size + todo_list.size() > static_cast<size_t>(kMaxKernelNum_)) {
284     MS_LOG(ERROR) << "The size of kernels is " << new_kernel_size << ", while the left space of the pool is "
285                   << kMaxKernelNum_ - todo_list.size();
286     return -1;
287   }
288 
289   (void)std::copy(diff_from_done.begin(), diff_from_done.end(), ListEnd(kToDoIdx_));
290   IncListSize(kToDoIdx_, new_kernel_size);
291 
292   return 0;
293 }
294 
FetchKernels(std::set<size_t> * out)295 int32_t AkgKernelPool::FetchKernels(std::set<size_t> *out) {
296   LockMng lock(fd_);
297   if (!lock.locked_) {
298     MS_LOG(ERROR) << "Failed to acquire lock.";
299     return -1;
300   }
301 
302   std::set<size_t> left_in_todo_list;
303 
304   // filter out kernels which belongs to other processes
305   auto FilterBySelfList = [&left_in_todo_list, &out, this](size_t id) {
306     if (this->self_kernel_ids_.count(id) != 0) {
307       (void)out->emplace(id);
308     } else {
309       (void)left_in_todo_list.emplace(id);
310     }
311   };
312 
313   (void)std::for_each(ListBegin(kToDoIdx_), ListEnd(kToDoIdx_), FilterBySelfList);
314 
315   (void)std::copy(out->begin(), out->end(), ListEnd(kDoingIdx_));
316   IncListSize(kDoingIdx_, out->size());
317 
318   (void)std::copy(left_in_todo_list.begin(), left_in_todo_list.end(), ListBegin(kToDoIdx_));
319   ResetListSize(kToDoIdx_, left_in_todo_list.size());
320 
321   return 0;
322 }
323 
UpdateAndWait(const std::set<size_t> & ids)324 int32_t AkgKernelPool::UpdateAndWait(const std::set<size_t> &ids) {
325   if (!ids.empty()) {
326     LockMng lock(fd_);
327     if (!lock.locked_) {
328       MS_LOG(ERROR) << "Failed to acquire lock.";
329       return -1;
330     }
331 
332     // update the state of finished kernels to `done`
333     (void)std::copy(ids.begin(), ids.end(), ListEnd(kDoneIdx_));
334     IncListSize(kDoneIdx_, ids.size());
335 
336     // delete the finished kernels from doing_list
337     std::vector<size_t> left_in_doing_list;
338     std::set<size_t> doing_list(ListBegin(kDoingIdx_), ListEnd(kDoingIdx_));
339     (void)std::set_difference(doing_list.begin(), doing_list.end(), ids.begin(), ids.end(),
340                               std::inserter(left_in_doing_list, left_in_doing_list.begin()));
341 
342     (void)std::copy(left_in_doing_list.begin(), left_in_doing_list.end(), ListBegin(kDoingIdx_));
343     ResetListSize(kDoingIdx_, left_in_doing_list.size());
344   }
345 
346   auto ret = Wait();
347   if (ret != 0) {
348     MS_LOG(ERROR) << "AkgKernelPool Wait failed.";
349     return -1;
350   }
351 
352   return 0;
353 }
354 
Wait() const355 int32_t AkgKernelPool::Wait() const {
356   // wait until all the kernels which belong to this process finish compiling
357   uint32_t trials = 1000;
358 
359   while (trials > 0) {
360     {
361       LockMng lock(fd_);
362       if (!lock.locked_) {
363         MS_LOG(ERROR) << "Failed to acquire lock.";
364         return -1;
365       }
366 
367       std::set<size_t> done_list(ListBegin(kDoneIdx_), ListEnd(kDoneIdx_));
368 
369       if (std::all_of(self_kernel_ids_.begin(), self_kernel_ids_.end(),
370                       [&done_list](size_t id) { return done_list.count(id) != 0; })) {
371         return 0;
372       }
373     }
374 
375     (void)usleep(1000000);
376     trials--;
377   }
378 
379   MS_LOG(ERROR) << "Time out while wait kernel compiling";
380   return -1;
381 }
382 
GetNotCachedKernels(const std::vector<JsonNodePair> & build_args)383 std::vector<JsonNodePair> AkgKernelBuilder::GetNotCachedKernels(const std::vector<JsonNodePair> &build_args) {
384   std::unordered_set<std::string> kernel_name_set;
385   std::vector<JsonNodePair> new_build_args;
386   for (const auto &[json_generator, anf_node] : build_args) {
387     MS_EXCEPTION_IF_NULL(anf_node);
388     auto kernel_name = json_generator.kernel_name();
389 
390     auto cached_kernel_pack = AkgSearchCache(kernel_name);
391     if (cached_kernel_pack != nullptr) {
392       MS_LOG(DEBUG) << "Use cached kernel, kernel_name[" << kernel_name << "], fullname_with_scope["
393                     << anf_node->fullname_with_scope() << "].";
394       AkgSetKernelMod(cached_kernel_pack, json_generator, anf_node);
395       continue;
396     }
397 
398     if (kernel_name_set.count(kernel_name) != 0) {
399       (void)repeat_nodes_.emplace_back(json_generator, anf_node);
400       continue;
401     }
402     kernel_name_set.insert(kernel_name);
403     (void)new_build_args.emplace_back(json_generator, anf_node);
404   }
405   return new_build_args;
406 }
407 
InsertToCache(const std::vector<JsonNodePair> & build_args)408 bool AkgKernelBuilder::InsertToCache(const std::vector<JsonNodePair> &build_args) {
409   for (const auto &[json_generator, anf_node] : build_args) {
410     auto kernel_name = json_generator.kernel_name();
411     auto new_kernel_pack = AkgInsertCache(kernel_name);
412     if (new_kernel_pack == nullptr) {
413       MS_LOG(ERROR) << "Insert to cache failed, kernel_name[" << kernel_name << "], fullname_with_scope["
414                     << anf_node->fullname_with_scope() << "].";
415       return false;
416     }
417     AkgSetKernelMod(new_kernel_pack, json_generator, anf_node);
418     MS_LOG(DEBUG) << "Akg compile " << kernel_name << " kernel and insert cache successfully!";
419   }
420   return true;
421 }
422 
HandleRepeatNodes()423 bool AkgKernelBuilder::HandleRepeatNodes() {
424   for (const auto &[json_generator, anf_node] : repeat_nodes_) {
425     auto kernel_name = json_generator.kernel_name();
426     auto cached_kernel_pack = AkgSearchCache(kernel_name);
427     if (cached_kernel_pack == nullptr) {
428       MS_LOG(ERROR) << "Use cached kernel failed, kernel_name[" << kernel_name << "], fullname_with_scope["
429                     << anf_node->fullname_with_scope() << "].";
430       return false;
431     }
432     MS_LOG(DEBUG) << "Use just compiled kernel, kernel_name[" << kernel_name << "], fullname_with_scope["
433                   << anf_node->fullname_with_scope() << "].";
434     AkgSetKernelMod(cached_kernel_pack, json_generator, anf_node);
435   }
436   return true;
437 }
438 
GetKernelJsonsByHashId(const std::vector<JsonNodePair> & build_args,const std::set<size_t> fetched_ids)439 std::vector<std::string> AkgKernelBuilder::GetKernelJsonsByHashId(const std::vector<JsonNodePair> &build_args,
440                                                                   const std::set<size_t> fetched_ids) {
441   std::vector<std::string> jsons;
442   for (const auto &[json_generator, anf_node] : build_args) {
443     MS_EXCEPTION_IF_NULL(anf_node);
444     auto kernel_name = json_generator.kernel_name();
445     auto hash_id = std::hash<std::string>()(kernel_name);
446     if (fetched_ids.count(hash_id) == 0) {
447       continue;
448     }
449     auto kernel_json = json_generator.kernel_json_str();
450     AkgSaveJsonInfo(kernel_name, kernel_json);
451     jsons.push_back(kernel_json);
452   }
453   return jsons;
454 }
455 
AkgOpParallelBuild(const std::vector<JsonNodePair> & build_args)456 bool AkgKernelBuilder::AkgOpParallelBuild(const std::vector<JsonNodePair> &build_args) {
457   repeat_nodes_.clear();
458   auto new_build_args = GetNotCachedKernels(build_args);
459   if (new_build_args.empty()) {
460     return true;
461   }
462 
463   AkgKernelPool kp;
464   auto ret = kp.Init(new_build_args);
465   if (ret != 0) {
466     MS_LOG(ERROR) << "AkgKernelPool init failed.";
467     return false;
468   }
469 
470   std::set<size_t> fetched_ids;
471   ret = kp.FetchKernels(&fetched_ids);
472   if (ret != 0) {
473     MS_LOG(ERROR) << "AkgKernelPool FetchKernels failed.";
474     return false;
475   }
476 
477   if (!fetched_ids.empty()) {
478     auto jsons = GetKernelJsonsByHashId(new_build_args, fetched_ids);
479 
480     auto client = GetClient();
481     MS_EXCEPTION_IF_NULL(client);
482     if (!client->AkgStart(PROCESS_NUM, TIME_OUT)) {
483       MS_LOG(ERROR) << "Akg start failed.";
484       return false;
485     }
486     auto attrs = CollectBuildAttrs();
487     if (!attrs.empty() && !client->AkgSendAttr(attrs)) {
488       MS_LOG(ERROR) << "Akg send attr failed.";
489       return false;
490     }
491     if (!client->AkgSendData(jsons)) {
492       MS_LOG(ERROR) << "Akg send data failed.";
493       return false;
494     }
495     if (!client->AkgWait()) {
496       MS_LOG(ERROR) << "Akg compile failed.";
497       return false;
498     }
499   }
500 
501   ret = kp.UpdateAndWait(fetched_ids);
502   if (ret != 0) {
503     MS_LOG(ERROR) << "AkgKernelPool UpdateAndWait failed.";
504     return false;
505   }
506 
507   if (kp.Release() != 0) {
508     MS_LOG(ERROR) << "AkgKernelPool release failed.";
509     return false;
510   }
511 
512   // All unique done here, cache them and set kernel.
513   if (!InsertToCache(build_args)) {
514     MS_LOG(ERROR) << "Insert cache failed.";
515     return false;
516   }
517 
518   if (!HandleRepeatNodes()) {
519     MS_LOG(ERROR) << "Handle repeat nodes failed.";
520     return false;
521   }
522 
523   return true;
524 }
525 
AkgKernelParallelBuild(const std::vector<AnfNodePtr> & anf_nodes)526 bool AkgKernelBuilder::AkgKernelParallelBuild(const std::vector<AnfNodePtr> &anf_nodes) {
527   std::vector<JsonNodePair> json_and_node;
528   for (const auto &anf_node : anf_nodes) {
529     MS_EXCEPTION_IF_NULL(anf_node);
530     DumpOption option;
531     option.get_compute_capability = true;
532     AkgKernelJsonGenerator akg_kernel_json_generator(option);
533     auto cnode = anf_node->cast<CNodePtr>();
534     MS_EXCEPTION_IF_NULL(cnode);
535     if (AnfAlgo::IsGraphKernel(cnode)) {
536       auto func_graph = AnfAlgo::GetCNodeFuncGraphPtr(cnode);
537       MS_EXCEPTION_IF_NULL(func_graph);
538       auto mng = func_graph->manager();
539       if (mng == nullptr) {
540         mng = Manage(func_graph, true);
541         func_graph->set_manager(mng);
542       }
543       std::vector<AnfNodePtr> node_list, input_list, output_list;
544       GetValidKernelNodes(func_graph, &node_list, &input_list, &output_list);
545       if (!akg_kernel_json_generator.CollectFusedJson(node_list, input_list, output_list)) {
546         MS_EXCEPTION(UnknownError) << "Collect op info failed. op[" << anf_node->fullname_with_scope() << "].";
547       }
548     } else {
549       if (!akg_kernel_json_generator.CollectJson(anf_node)) {
550         MS_EXCEPTION(UnknownError) << "Collect op info failed. op[" << anf_node->fullname_with_scope() << "].";
551       }
552     }
553     json_and_node.push_back({akg_kernel_json_generator, anf_node});
554   }
555 
556   if (json_and_node.empty()) {
557     MS_LOG(INFO) << "There is no akg kernel to be compiled.";
558     return true;
559   }
560 
561   struct timeval start_time, end_time;
562   (void)gettimeofday(&start_time, nullptr);
563 
564   MS_LOG(INFO) << "Akg start parallel build. kernel count: " << json_and_node.size();
565   bool res = AkgOpParallelBuild(json_and_node);
566   if (!res) {
567     MS_LOG(ERROR) << "Akg build kernel failed.";
568   }
569 
570   (void)gettimeofday(&end_time, nullptr);
571   const uint64_t kUSecondInSecond = 1000000;
572   uint64_t cost = kUSecondInSecond * static_cast<uint64_t>(end_time.tv_sec - start_time.tv_sec);
573   cost += static_cast<uint64_t>(end_time.tv_usec - start_time.tv_usec);
574   MS_LOG(INFO) << "Akg kernel build time: " << cost << " us.";
575   return true;
576 }
577 
CollectBuildAttrs()578 std::string AkgKernelBuilder::CollectBuildAttrs() {
579   auto &flags = context::GraphKernelFlags::GetInstance();
580   nlohmann::json attrs;
581   if (flags.online_tuning > 0) {
582     attrs["online_tuning"] = flags.online_tuning;
583   }
584   if (!flags.repository_path.empty()) {
585     attrs["repository_path"] = flags.repository_path;
586   }
587   return attrs.empty() ? "" : attrs.dump();
588 }
589 }  // namespace kernel
590 }  // namespace mindspore
591