/**
 * Copyright 2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "plugin/device/ascend/hal/device/tensordump_utils.h"
#include <fstream>
#include <functional>
#include <mutex>
#include <optional>
#include <string>
#include <variant>
#include "debug/data_dump/npy_header.h"
#include "ir/tensor.h"
#include "utils/file_utils.h"
#include "utils/log_adapter.h"

namespace mindspore::device::ascend {
namespace {

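// Writes one tensor to file_name in NumPy .npy format: the generated header followed by
// the raw tensor bytes. Unsupported data types are reported and skipped.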
void SaveTensor2NPY(const std::string &file_name, const mindspore::tensor::TensorPtr &tensor_ptr) {
  std::string npy_header = GenerateNpyHeader(tensor_ptr->shape(), tensor_ptr->data_type());
  if (npy_header.empty()) {
    MS_LOG(ERROR) << "For 'TensorDump' ops, dumping data of type "
                  << TypeIdToType(tensor_ptr->data_type())->ToString() << " is not supported.";
    return;
  }
  // Make the file writable for the owner, write header and data, then mark it read-only.
  ChangeFileMode(file_name, S_IWUSR);
  std::fstream output{file_name, std::ios::out | std::ios::trunc | std::ios::binary};
  if (!output.is_open()) {
    MS_LOG(ERROR) << "For 'TensorDump' ops, failed to open file " << file_name
                  << ", the argument 'file' is invalid.";
    return;
  }
  output << npy_header;
  (void)output.write(reinterpret_cast<const char *>(tensor_ptr->data_c()), SizeToLong(tensor_ptr->Size()));
  if (output.bad()) {
    output.close();
    MS_LOG(ERROR) << "For 'TensorDump' ops, failed to write tensor data to " << file_name << ".";
    return;
  }
  output.close();
  ChangeFileMode(file_name, S_IRUSR);
}

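// Returns true if the string s ends with the suffix sub.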
bool EndsWith(const std::string &s, const std::string &sub) {
  if (s.length() < sub.length()) {
    return false;
  }
  return s.rfind(sub) == (s.length() - sub.length());
}

}  // namespace

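// Reserves space for the worker threads up front; the threads themselves are created
// lazily by the first Submit() call, so constructing the writer is cheap.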
AsyncFileWriter::AsyncFileWriter(size_t thread_nums) { threads.reserve(thread_nums); }

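// Signals all workers to stop, wakes them, and joins every started thread; workers drain
// the task queue before exiting, so no pending write task is lost at destruction.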
AsyncFileWriter::~AsyncFileWriter() {
  {
    // Set the stop flag while holding the queue mutex so a worker cannot miss the
    // notification between evaluating its wait predicate and blocking on the condition
    // variable. Note: a store with memory_order_acq_rel is undefined behavior, so
    // release ordering is used instead.
    std::lock_guard<std::mutex> lock(queue_mutex);
    stop.store(true, std::memory_order_release);
  }
  cv.notify_all();
  for (auto &thread : threads) {
    if (thread.joinable()) {
      MS_LOG(INFO) << "TensorDump join file writer threads";
      thread.join();
    }
  }
}

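// Enqueues a write task and wakes one worker. The first call starts the worker threads;
// the loop bound uses threads.capacity(), which the reserve() in the constructor
// guarantees to be at least thread_nums.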
void AsyncFileWriter::Submit(std::function<void()> func) {
  if (!threads_started.exchange(true)) {
    MS_LOG(INFO) << "Create AsyncFileWriter threads.";
    for (size_t i = 0; i < threads.capacity(); ++i) {
      threads.emplace_back(&AsyncFileWriter::WorkerThread, this);
    }
  }
  {
    std::lock_guard<std::mutex> lock(queue_mutex);
    tasks.push(func);
  }
  cv.notify_one();
}

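// Worker loop: blocks until a task is queued or stop is requested, then runs tasks
// outside the lock; exits only once stop is set and the queue has been drained.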
void AsyncFileWriter::WorkerThread() {
  while (true) {
    std::function<void()> task;
    {
      std::unique_lock<std::mutex> lock(queue_mutex);
      cv.wait(lock, [this] { return stop || !tasks.empty(); });
      if (stop && tasks.empty()) {
        return;
      }
      task = tasks.front();
      tasks.pop();
    }
    task();
  }
}

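// Maps the user-provided 'file' argument of TensorDump to a unique output path:
// creates missing parent directories, prefixes the file name with a monotonically
// increasing id, and appends the .npy suffix when it is absent.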
std::string TensorDumpUtils::TensorNameToArrayName(const std::string &tensor_path) {
  static size_t name_id = 0;
  std::string npy_suffix{".npy"};
  std::string separator{"_"};
  std::optional<std::string> parent_path;
  std::optional<std::string> file_name;
  FileUtils::SplitDirAndFileName(tensor_path, &parent_path, &file_name);
  if (!parent_path.has_value()) {
    parent_path = ".";
  }
  std::optional<std::string> realpath = FileUtils::CreateNotExistDirs(parent_path.value());
  if (!realpath.has_value()) {
    MS_LOG(EXCEPTION) << "For 'TensorDump' ops, failed to create directory " << parent_path.value() << ".";
  }
  std::optional<std::string> new_file_name = std::to_string(name_id++) + separator + file_name.value();
  if (!EndsWith(new_file_name.value(), npy_suffix)) {
    new_file_name.value() += npy_suffix;
  }
  std::optional<std::string> new_file_path;
  FileUtils::ConcatDirAndFileName(&realpath, &new_file_name, &new_file_path);
  MS_LOG(INFO) << "For 'TensorDump' ops, dump file path is " << new_file_path.value();
  return new_file_path.value();
}

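// Returns the process-wide singleton instance.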
TensorDumpUtils &TensorDumpUtils::GetInstance() {
  static TensorDumpUtils instance;
  return instance;
}

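// Consumes one acltdt dataset: derives the output file path from the dataset name and
// submits every tensor item to the asynchronous file writer; string items are skipped
// with a warning.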
void TensorDumpUtils::AsyncSaveDatasetToNpyFile(const ScopeAclTdtDataset &dataset) {
  std::string tensor_name = dataset.GetDatasetName();
  MS_LOG(INFO) << "For 'TensorDump' ops, acltdt received tensor name is " << tensor_name;
  if (tensor_name.empty()) {
    MS_LOG(ERROR) << "For 'TensorDump' ops, the argument 'file' is empty, skip this data.";
    return;
  }

  auto file_name = TensorNameToArrayName(tensor_name);
  for (const auto &data_elem : dataset.GetDataItems()) {
    if (std::holds_alternative<std::string>(data_elem)) {
      MS_LOG(WARNING) << "Ignore data of string type: " << std::get<std::string>(data_elem);
      // Skip string items; std::get<TensorPtr> below would otherwise throw std::bad_variant_access.
      continue;
    }
    auto tensor_ptr = std::get<mindspore::tensor::TensorPtr>(data_elem);
    file_writer.Submit(std::bind(SaveTensor2NPY, file_name, tensor_ptr));
  }
}

}  // namespace mindspore::device::ascend