/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "backend/kernel_compiler/akg/akg_kernel_build.h"

#include <fcntl.h>
#include <unistd.h>
#include <sys/shm.h>
#include <sys/time.h>
#include <cstdio>
#include <cerrno>
#include <cstring>
#include <climits>
#include <cstdlib>

#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include <iostream>
#include "nlohmann/json.hpp"
#include "ir/dtype.h"
#include "ir/func_graph.h"
#include "utils/context/graph_kernel_flags.h"
#include "backend/kernel_compiler/common_utils.h"
#include "backend/kernel_compiler/akg/akg_kernel_json_generator.h"
#include "backend/kernel_compiler/akg/akg_kernel_attrs_process.h"
#include "backend/session/anf_runtime_algorithm.h"

namespace mindspore {
namespace kernel {
constexpr int32_t MAX_ERROR_LEN = 1024;
constexpr int32_t PROCESS_NUM = 16;
constexpr int32_t TIME_OUT = 300;

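// Returns a readable description of the current errno, covering both the XSI and the GNU
// variants of strerror_r.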
inline std::string GetErrorInfo() {
  char buf[MAX_ERROR_LEN + 1] = {0};
  auto ret = strerror_r(errno, buf, MAX_ERROR_LEN);
#if (_POSIX_C_SOURCE >= 200112L) && !_GNU_SOURCE
  if (ret != 0 || strlen(buf) == 0) {
    return "Call strerror_r failed";
  }

  return std::string(buf);
#else
  return ret != nullptr ? std::string(ret) : "Failed to get error info";
#endif
}

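// Tries to acquire the advisory file lock on fd_ that serializes access to the shared kernel
// lists across build processes.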
bool AkgKernelPool::LockMng::TryLock() const {
  // Try to acquire the lock up to 100 times; return false if it still cannot be acquired.
  uint32_t trial = 100;

  int32_t ret = -1;
  while (trial > 0) {
    ret = lockf(fd_, F_TLOCK, 0);
    if (ret == 0 || (errno != EACCES && errno != EAGAIN)) {
      break;
    }

    trial--;
    (void)usleep(5000);
  }

  if (ret == -1) {
    MS_LOG(ERROR) << "Failed to acquire the lock, error msg:" << GetErrorInfo() << ".";
    return false;
  }

  return true;
}

void AkgKernelPool::LockMng::Unlock() const {
  auto ret = lockf(fd_, F_ULOCK, 0);
  if (ret == -1) {
    MS_LOG(ERROR) << "Failed to release the lock, error msg:" << GetErrorInfo();
  }
}

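// Returns the canonical absolute path of the current working directory, or an empty string on
// failure; the path is used to derive the key of the shared memory segment.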
std::string AkgKernelPool::GetCurrentPath() const {
  char cwd[PATH_MAX];
  char *ret = getcwd(cwd, sizeof(cwd));
  if (ret == nullptr) {
    MS_LOG(ERROR) << "Get current work directory failed, error msg:" << GetErrorInfo();
    return "";
  }

  char abspath[PATH_MAX];
  char *res = realpath(cwd, abspath);
  if (res == nullptr) {
    MS_LOG(ERROR) << "Change to realpath failed, error msg:" << GetErrorInfo();
    return "";
  }

  return std::string(abspath);
}

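// Creates or attaches to the System V shared memory segment keyed by the hash of `path`. The
// segment holds the todo/doing/done kernel lists shared by all build processes. A stale segment
// with no attached process is removed first; the process that actually creates the segment
// zero-initializes it.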
void *AkgKernelPool::CreateSharedMem(const std::string &path) {
  is_creator_ = false;

  auto hash_id = std::hash<std::string>()(path);
  auto key_id = static_cast<key_t>(hash_id);
  auto mem_size = sizeof(size_t) * kListNum_ * (kMaxKernelNum_ + 1) + 512;

  {
    LockMng lock(fd_);
    if (!lock.locked_) {
      MS_LOG(ERROR) << "Failed to acquire lock.";
      return nullptr;
    }

    // Check whether the shared memory already exists.
    // Remove it if it exists and its attach count (shm_nattch) is 0.
    struct shmid_ds buf;
    auto id = shmget(key_id, mem_size, 0);
    if (id != -1) {
      auto ret = shmctl(id, IPC_STAT, &buf);
      if (ret == -1) {
        MS_LOG(ERROR) << "Failed to get the info of shared memory, error msg:" << GetErrorInfo();
        return nullptr;
      }

      if (buf.shm_nattch == 0) {
        ret = shmctl(id, IPC_RMID, nullptr);
        if (ret < 0) {
          MS_LOG(EXCEPTION) << "Release shared_mem failed, error msg:" << GetErrorInfo();
        }
      }
    }
  }

  LockMng lock(fd_);
  if (!lock.locked_) {
    MS_LOG(ERROR) << "Failed to acquire lock.";
    return nullptr;
  }

  shm_id_ = shmget(key_id, mem_size, IPC_CREAT | IPC_EXCL | 0600);
  if (shm_id_ == -1) {
    if (errno == EEXIST) {
      shm_id_ = shmget(key_id, mem_size, 0);
    }

    if (shm_id_ == -1) {
      MS_LOG(ERROR) << "Create shared_mem failed, error msg:" << GetErrorInfo();
      return nullptr;
    }
  } else {
    is_creator_ = true;
  }

  auto local_addr = shmat(shm_id_, nullptr, 0);
  if (local_addr == reinterpret_cast<void *>(-1)) {
    MS_LOG(ERROR) << "Attach to shared_mem failed, error msg:" << GetErrorInfo();
    return nullptr;
  }

  if (is_creator_) {
    (void)memset_s(local_addr, mem_size, 0, mem_size);
  }

  return local_addr;
}

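// Initializes the kernel pool: opens the lock file, creates or attaches to the shared kernel
// lists, and registers the kernels of this process into the todo list.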
int32_t AkgKernelPool::Init(const std::vector<JsonNodePair> &build_args) {
  auto cp = GetCurrentPath();
  if (cp.empty()) {
    return -1;
  }

  fd_ = open(kKeyName_, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
  if (fd_ == -1) {
    MS_LOG(ERROR) << "open file <" << kKeyName_ << "> failed, error msg:" << GetErrorInfo();
    return -1;
  }

  auto addr = CreateSharedMem(cp);
  if (addr == nullptr) {
    return -1;
  }

  InitKernelLists(addr);

  auto ret = AddKernels(build_args);
  if (ret != 0) {
    MS_LOG(ERROR) << "AkgKernelPool AddKernels failed.";
    return -1;
  }

  return 0;
}

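// Detaches the shared memory and removes the segment when this process created it, or when it is
// the last attached process and the creator exited without removing it.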
int32_t AkgKernelPool::Release() const {
  {
    LockMng lock(fd_);
    if (!lock.locked_) {
      MS_LOG(ERROR) << "Failed to acquire lock.";
      return -1;
    }

    struct shmid_ds buf;
    auto ret = shmctl(shm_id_, IPC_STAT, &buf);
    if (ret == -1) {
      MS_LOG(ERROR) << "Failed to get the info of shared memory, error msg:" << GetErrorInfo();
      return -1;
    }

    bool need_delete_by_last = false;

    // If the creator exits unexpectedly and fails to delete the shm, the last process deletes it.
    if (((buf.shm_perm.mode & SHM_DEST) == 0) && (buf.shm_nattch == 1)) {
      need_delete_by_last = true;
    }

    // Detach shared memory
    ret = shmdt(reinterpret_cast<void *>(kernel_lists_[0]));
    if (ret < 0) {
      MS_LOG(ERROR) << "Shared_mem detach failed, error msg:" << GetErrorInfo();
      return -1;
    }

    // Release shared memory
    if (is_creator_ || need_delete_by_last) {
      ret = shmctl(shm_id_, IPC_RMID, nullptr);
      if (ret < 0) {
        MS_LOG(ERROR) << "Release shared_mem failed, error msg:" << GetErrorInfo();
        return -1;
      }
    }
  }

  return 0;
}

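// Records the hash ids of this process's kernels and appends the ones that are not yet present in
// any of the shared lists to the todo list.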
int32_t AkgKernelPool::AddKernels(const std::vector<JsonNodePair> &build_args) {
  LockMng lock(fd_);
  if (!lock.locked_) {
    MS_LOG(ERROR) << "Failed to acquire lock.";
    return -1;
  }

  std::set<size_t> todo_list(ListBegin(kToDoIdx_), ListEnd(kToDoIdx_));
  std::set<size_t> doing_list(ListBegin(kDoingIdx_), ListEnd(kDoingIdx_));
  std::set<size_t> done_list(ListBegin(kDoneIdx_), ListEnd(kDoneIdx_));

  for (const auto &[json_generator, anf_node] : build_args) {
    MS_EXCEPTION_IF_NULL(anf_node);
    auto kernel_name = json_generator.kernel_name();

    auto hash_id = std::hash<std::string>()(kernel_name);
    if (self_kernel_ids_.count(hash_id) != 0) {
      MS_LOG(ERROR) << "Duplicated hash_id in list.";
      return -1;
    }

    (void)self_kernel_ids_.emplace(hash_id);
  }

  std::set<size_t> diff_from_todo;
  std::set<size_t> diff_from_doing;
  std::set<size_t> diff_from_done;

  // Each unique kernel is added only once, so skip the ones already in the todo, doing, or done list.
  (void)std::set_difference(self_kernel_ids_.begin(), self_kernel_ids_.end(), todo_list.begin(), todo_list.end(),
                            std::inserter(diff_from_todo, diff_from_todo.begin()));
  (void)std::set_difference(diff_from_todo.begin(), diff_from_todo.end(), doing_list.begin(), doing_list.end(),
                            std::inserter(diff_from_doing, diff_from_doing.begin()));
  (void)std::set_difference(diff_from_doing.begin(), diff_from_doing.end(), done_list.begin(), done_list.end(),
                            std::inserter(diff_from_done, diff_from_done.begin()));

  auto new_kernel_size = diff_from_done.size();
  if (new_kernel_size + todo_list.size() > static_cast<size_t>(kMaxKernelNum_)) {
    MS_LOG(ERROR) << "The number of new kernels is " << new_kernel_size << ", while the remaining space of the pool is "
                  << kMaxKernelNum_ - todo_list.size();
    return -1;
  }

  (void)std::copy(diff_from_done.begin(), diff_from_done.end(), ListEnd(kToDoIdx_));
  IncListSize(kToDoIdx_, new_kernel_size);

  return 0;
}

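// Moves the kernels that belong to this process from the todo list to the doing list and returns
// their hash ids in `out`; kernels added by other processes remain in the todo list.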
int32_t AkgKernelPool::FetchKernels(std::set<size_t> *out) {
  LockMng lock(fd_);
  if (!lock.locked_) {
    MS_LOG(ERROR) << "Failed to acquire lock.";
    return -1;
  }

  std::set<size_t> left_in_todo_list;

  // Filter out the kernels which belong to other processes.
  auto FilterBySelfList = [&left_in_todo_list, &out, this](size_t id) {
    if (this->self_kernel_ids_.count(id) != 0) {
      (void)out->emplace(id);
    } else {
      (void)left_in_todo_list.emplace(id);
    }
  };

  (void)std::for_each(ListBegin(kToDoIdx_), ListEnd(kToDoIdx_), FilterBySelfList);

  (void)std::copy(out->begin(), out->end(), ListEnd(kDoingIdx_));
  IncListSize(kDoingIdx_, out->size());

  (void)std::copy(left_in_todo_list.begin(), left_in_todo_list.end(), ListBegin(kToDoIdx_));
  ResetListSize(kToDoIdx_, left_in_todo_list.size());

  return 0;
}

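// Marks the given kernels as done, removes them from the doing list, and then waits until all
// kernels of this process appear in the done list.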
int32_t AkgKernelPool::UpdateAndWait(const std::set<size_t> &ids) {
  if (!ids.empty()) {
    LockMng lock(fd_);
    if (!lock.locked_) {
      MS_LOG(ERROR) << "Failed to acquire lock.";
      return -1;
    }

    // update the state of the finished kernels to `done`
    (void)std::copy(ids.begin(), ids.end(), ListEnd(kDoneIdx_));
    IncListSize(kDoneIdx_, ids.size());

    // delete the finished kernels from the doing list
    std::vector<size_t> left_in_doing_list;
    std::set<size_t> doing_list(ListBegin(kDoingIdx_), ListEnd(kDoingIdx_));
    (void)std::set_difference(doing_list.begin(), doing_list.end(), ids.begin(), ids.end(),
                              std::inserter(left_in_doing_list, left_in_doing_list.begin()));

    (void)std::copy(left_in_doing_list.begin(), left_in_doing_list.end(), ListBegin(kDoingIdx_));
    ResetListSize(kDoingIdx_, left_in_doing_list.size());
  }

  auto ret = Wait();
  if (ret != 0) {
    MS_LOG(ERROR) << "AkgKernelPool Wait failed.";
    return -1;
  }

  return 0;
}

int32_t AkgKernelPool::Wait() const {
  // Wait until all the kernels which belong to this process finish compiling.
  uint32_t trials = 1000;

  while (trials > 0) {
    {
      LockMng lock(fd_);
      if (!lock.locked_) {
        MS_LOG(ERROR) << "Failed to acquire lock.";
        return -1;
      }

      std::set<size_t> done_list(ListBegin(kDoneIdx_), ListEnd(kDoneIdx_));

      if (std::all_of(self_kernel_ids_.begin(), self_kernel_ids_.end(),
                      [&done_list](size_t id) { return done_list.count(id) != 0; })) {
        return 0;
      }
    }

    (void)usleep(1000000);
    trials--;
  }

  MS_LOG(ERROR) << "Timed out while waiting for kernel compiling.";
  return -1;
}

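// Filters out kernels that already have a cached kernel pack and deduplicates kernels with the
// same name; duplicates are remembered in repeat_nodes_ and bound after compilation.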
std::vector<JsonNodePair> AkgKernelBuilder::GetNotCachedKernels(const std::vector<JsonNodePair> &build_args) {
  std::unordered_set<std::string> kernel_name_set;
  std::vector<JsonNodePair> new_build_args;
  for (const auto &[json_generator, anf_node] : build_args) {
    MS_EXCEPTION_IF_NULL(anf_node);
    auto kernel_name = json_generator.kernel_name();

    auto cached_kernel_pack = AkgSearchCache(kernel_name);
    if (cached_kernel_pack != nullptr) {
      MS_LOG(DEBUG) << "Use cached kernel, kernel_name[" << kernel_name << "], fullname_with_scope["
                    << anf_node->fullname_with_scope() << "].";
      AkgSetKernelMod(cached_kernel_pack, json_generator, anf_node);
      continue;
    }

    if (kernel_name_set.count(kernel_name) != 0) {
      (void)repeat_nodes_.emplace_back(json_generator, anf_node);
      continue;
    }
    kernel_name_set.insert(kernel_name);
    (void)new_build_args.emplace_back(json_generator, anf_node);
  }
  return new_build_args;
}

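// Inserts the freshly compiled kernels into the kernel cache and binds the resulting kernel packs
// to their nodes.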
bool AkgKernelBuilder::InsertToCache(const std::vector<JsonNodePair> &build_args) {
  for (const auto &[json_generator, anf_node] : build_args) {
    auto kernel_name = json_generator.kernel_name();
    auto new_kernel_pack = AkgInsertCache(kernel_name);
    if (new_kernel_pack == nullptr) {
      MS_LOG(ERROR) << "Insert to cache failed, kernel_name[" << kernel_name << "], fullname_with_scope["
                    << anf_node->fullname_with_scope() << "].";
      return false;
    }
    AkgSetKernelMod(new_kernel_pack, json_generator, anf_node);
    MS_LOG(DEBUG) << "Akg compiled kernel [" << kernel_name << "] and inserted it into the cache successfully.";
  }
  return true;
}

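// Binds kernel packs to the nodes that were skipped as duplicates; their kernels are expected to
// be in the cache by now.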
bool AkgKernelBuilder::HandleRepeatNodes() {
  for (const auto &[json_generator, anf_node] : repeat_nodes_) {
    auto kernel_name = json_generator.kernel_name();
    auto cached_kernel_pack = AkgSearchCache(kernel_name);
    if (cached_kernel_pack == nullptr) {
      MS_LOG(ERROR) << "Use cached kernel failed, kernel_name[" << kernel_name << "], fullname_with_scope["
                    << anf_node->fullname_with_scope() << "].";
      return false;
    }
    MS_LOG(DEBUG) << "Use just compiled kernel, kernel_name[" << kernel_name << "], fullname_with_scope["
                  << anf_node->fullname_with_scope() << "].";
    AkgSetKernelMod(cached_kernel_pack, json_generator, anf_node);
  }
  return true;
}

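// Collects the kernel json strings of the kernels whose hash ids were fetched by this process,
// saving the json info of each selected kernel along the way.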
std::vector<std::string> AkgKernelBuilder::GetKernelJsonsByHashId(const std::vector<JsonNodePair> &build_args,
                                                                  const std::set<size_t> fetched_ids) {
  std::vector<std::string> jsons;
  for (const auto &[json_generator, anf_node] : build_args) {
    MS_EXCEPTION_IF_NULL(anf_node);
    auto kernel_name = json_generator.kernel_name();
    auto hash_id = std::hash<std::string>()(kernel_name);
    if (fetched_ids.count(hash_id) == 0) {
      continue;
    }
    auto kernel_json = json_generator.kernel_json_str();
    AkgSaveJsonInfo(kernel_name, kernel_json);
    jsons.push_back(kernel_json);
  }
  return jsons;
}

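// Compiles the given kernels in parallel: kernels missing from the cache are registered in the
// cross-process kernel pool, the ones fetched by this process are sent to the akg compile client,
// and after all compilations finish every kernel (including duplicates) is bound from the cache.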
bool AkgKernelBuilder::AkgOpParallelBuild(const std::vector<JsonNodePair> &build_args) {
  repeat_nodes_.clear();
  auto new_build_args = GetNotCachedKernels(build_args);
  if (new_build_args.empty()) {
    return true;
  }

  AkgKernelPool kp;
  auto ret = kp.Init(new_build_args);
  if (ret != 0) {
    MS_LOG(ERROR) << "AkgKernelPool init failed.";
    return false;
  }

  std::set<size_t> fetched_ids;
  ret = kp.FetchKernels(&fetched_ids);
  if (ret != 0) {
    MS_LOG(ERROR) << "AkgKernelPool FetchKernels failed.";
    return false;
  }

  if (!fetched_ids.empty()) {
    auto jsons = GetKernelJsonsByHashId(new_build_args, fetched_ids);

    auto client = GetClient();
    MS_EXCEPTION_IF_NULL(client);
    if (!client->AkgStart(PROCESS_NUM, TIME_OUT)) {
      MS_LOG(ERROR) << "Akg start failed.";
      return false;
    }
    auto attrs = CollectBuildAttrs();
    if (!attrs.empty() && !client->AkgSendAttr(attrs)) {
      MS_LOG(ERROR) << "Akg send attr failed.";
      return false;
    }
    if (!client->AkgSendData(jsons)) {
      MS_LOG(ERROR) << "Akg send data failed.";
      return false;
    }
    if (!client->AkgWait()) {
      MS_LOG(ERROR) << "Akg compile failed.";
      return false;
    }
  }

  ret = kp.UpdateAndWait(fetched_ids);
  if (ret != 0) {
    MS_LOG(ERROR) << "AkgKernelPool UpdateAndWait failed.";
    return false;
  }

  if (kp.Release() != 0) {
    MS_LOG(ERROR) << "AkgKernelPool release failed.";
    return false;
  }

  // All unique kernels have been compiled at this point; insert them into the cache and set their kernel mods.
  if (!InsertToCache(build_args)) {
    MS_LOG(ERROR) << "Insert cache failed.";
    return false;
  }

  if (!HandleRepeatNodes()) {
    MS_LOG(ERROR) << "Handle repeat nodes failed.";
    return false;
  }

  return true;
}

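// Generates the kernel json for every node (fused json for graph kernels), launches the parallel
// build, and reports the time cost.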
bool AkgKernelBuilder::AkgKernelParallelBuild(const std::vector<AnfNodePtr> &anf_nodes) {
  std::vector<JsonNodePair> json_and_node;
  for (const auto &anf_node : anf_nodes) {
    MS_EXCEPTION_IF_NULL(anf_node);
    DumpOption option;
    option.get_compute_capability = true;
    AkgKernelJsonGenerator akg_kernel_json_generator(option);
    auto cnode = anf_node->cast<CNodePtr>();
    MS_EXCEPTION_IF_NULL(cnode);
    if (AnfAlgo::IsGraphKernel(cnode)) {
      auto func_graph = AnfAlgo::GetCNodeFuncGraphPtr(cnode);
      MS_EXCEPTION_IF_NULL(func_graph);
      auto mng = func_graph->manager();
      if (mng == nullptr) {
        mng = Manage(func_graph, true);
        func_graph->set_manager(mng);
      }
      std::vector<AnfNodePtr> node_list, input_list, output_list;
      GetValidKernelNodes(func_graph, &node_list, &input_list, &output_list);
      if (!akg_kernel_json_generator.CollectFusedJson(node_list, input_list, output_list)) {
        MS_EXCEPTION(UnknownError) << "Collect op info failed. op[" << anf_node->fullname_with_scope() << "].";
      }
    } else {
      if (!akg_kernel_json_generator.CollectJson(anf_node)) {
        MS_EXCEPTION(UnknownError) << "Collect op info failed. op[" << anf_node->fullname_with_scope() << "].";
      }
    }
    json_and_node.push_back({akg_kernel_json_generator, anf_node});
  }

  if (json_and_node.empty()) {
    MS_LOG(INFO) << "There is no akg kernel to be compiled.";
    return true;
  }

  struct timeval start_time, end_time;
  (void)gettimeofday(&start_time, nullptr);

  MS_LOG(INFO) << "Akg start parallel build. kernel count: " << json_and_node.size();
  bool res = AkgOpParallelBuild(json_and_node);
  if (!res) {
    MS_LOG(ERROR) << "Akg build kernel failed.";
  }

  (void)gettimeofday(&end_time, nullptr);
  const uint64_t kUSecondInSecond = 1000000;
  uint64_t cost = kUSecondInSecond * static_cast<uint64_t>(end_time.tv_sec - start_time.tv_sec);
  cost += static_cast<uint64_t>(end_time.tv_usec - start_time.tv_usec);
  MS_LOG(INFO) << "Akg kernel build time: " << cost << " us.";
  return true;
}

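// Collects the graph-kernel build attributes (online tuning level and repository path) as a json
// string; returns an empty string when no attribute is set.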
std::string AkgKernelBuilder::CollectBuildAttrs() {
  auto &flags = context::GraphKernelFlags::GetInstance();
  nlohmann::json attrs;
  if (flags.online_tuning > 0) {
    attrs["online_tuning"] = flags.online_tuning;
  }
  if (!flags.repository_path.empty()) {
    attrs["repository_path"] = flags.repository_path;
  }
  return attrs.empty() ? "" : attrs.dump();
}
}  // namespace kernel
}  // namespace mindspore