• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "plugin/device/ascend/hal/device/tensordump_utils.h"
18 #include <string>
19 #include <fstream>
20 #include "debug/data_dump/npy_header.h"
21 #include "ir/tensor.h"
22 #include "utils/file_utils.h"
23 #include "utils/log_adapter.h"
24 
25 namespace mindspore::device::ascend {
26 namespace {
27 
SaveTensor2NPY(std::string file_name,mindspore::tensor::TensorPtr tensor_ptr)28 void SaveTensor2NPY(std::string file_name, mindspore::tensor::TensorPtr tensor_ptr) {
29   std::string npy_header = GenerateNpyHeader(tensor_ptr->shape(), tensor_ptr->data_type());
30   if (!npy_header.empty()) {
31     ChangeFileMode(file_name, S_IWUSR);
32     std::fstream output{file_name, std::ios::out | std::ios::trunc | std::ios::binary};
33     if (!output.is_open()) {
34       MS_LOG(ERROR) << "For 'TensorDump' ops, open " << file_name << " file failed, the args of 'file' is invalid.";
35       return;
36     }
37     output << npy_header;
38     (void)output.write(reinterpret_cast<const char *>(tensor_ptr->data_c()), SizeToLong(tensor_ptr->Size()));
39     if (output.bad()) {
40       output.close();
41       MS_LOG(ERROR) << "For 'TensorDump' ops, write mem to " << file_name << " failed.";
42       return;
43     }
44     output.close();
45     ChangeFileMode(file_name, S_IRUSR);
46   } else {
47     MS_LOG(ERROR) << "For 'TensorDump' ops, the type of " << TypeIdToType(tensor_ptr->data_type())->ToString()
48                   << " not support dump.";
49   }
50 }
51 
// Returns true iff `s` ends with `sub` (trivially true for an empty `sub`).
bool EndsWith(const std::string &s, const std::string &sub) {
  if (s.length() < sub.length()) {
    return false;
  }
  // compare(pos, len, sub) examines only the tail of `s`; the previous
  // rfind-based check scanned backward through the whole string and carried a
  // redundant `? true : false`.
  return s.compare(s.length() - sub.length(), sub.length(), sub) == 0;
}
58 
59 }  // namespace
60 
AsyncFileWriter(size_t thread_nums)61 AsyncFileWriter::AsyncFileWriter(size_t thread_nums) { threads.reserve(thread_nums); }
62 
~AsyncFileWriter()63 AsyncFileWriter::~AsyncFileWriter() {
64   stop.store(true, std::memory_order_acq_rel);
65   cv.notify_all();
66   for (auto &thread : threads) {
67     if (thread.joinable()) {
68       MS_LOG(INFO) << "TensorDump join file writer threads";
69       thread.join();
70     }
71   }
72 }
73 
Submit(std::function<void ()> func)74 void AsyncFileWriter::Submit(std::function<void()> func) {
75   if (!threads_started.exchange(true)) {
76     MS_LOG(INFO) << "Create AsyncFileWriter threads.";
77     for (size_t i = 0; i < threads.capacity(); ++i) {
78       threads.emplace_back(&AsyncFileWriter::WorkerThread, this);
79     }
80   }
81   {
82     std::lock_guard<std::mutex> lock(queue_mutex);
83     tasks.push(func);
84   }
85   cv.notify_one();
86 }
87 
WorkerThread()88 void AsyncFileWriter::WorkerThread() {
89   while (true) {
90     std::function<void()> task;
91     {
92       std::unique_lock<std::mutex> lock(queue_mutex);
93       cv.wait(lock, [this] { return stop || !tasks.empty(); });
94       if (stop && tasks.empty()) {
95         return;
96       }
97       task = tasks.front();
98       tasks.pop();
99     }
100     task();
101   }
102 }
103 
TensorNameToArrayName(const std::string & tensor_path)104 std::string TensorDumpUtils::TensorNameToArrayName(const std::string &tensor_path) {
105   static size_t name_id = 0;
106   std::string npy_suffix{".npy"};
107   std::string separator{"_"};
108   std::optional<std::string> parent_path;
109   std::optional<std::string> file_name;
110   FileUtils::SplitDirAndFileName(tensor_path, &parent_path, &file_name);
111   if (!parent_path.has_value()) {
112     parent_path = ".";
113   }
114   std::optional<std::string> realpath = FileUtils::CreateNotExistDirs(parent_path.value());
115   std::optional<std::string> new_file_name = std::to_string(name_id++) + separator + file_name.value();
116   if (!EndsWith(new_file_name.value(), npy_suffix)) {
117     new_file_name.value() += npy_suffix;
118   }
119   std::optional<std::string> new_file_path;
120   FileUtils::ConcatDirAndFileName(&realpath, &new_file_name, &new_file_path);
121   MS_LOG(INFO) << "For 'TensorDump' ops, dump file path is " << new_file_path.value();
122   return new_file_path.value();
123 }
124 
GetInstance()125 TensorDumpUtils &TensorDumpUtils::GetInstance() {
126   static TensorDumpUtils instance;
127   return instance;
128 }
129 
AsyncSaveDatasetToNpyFile(const ScopeAclTdtDataset & dataset)130 void TensorDumpUtils::AsyncSaveDatasetToNpyFile(const ScopeAclTdtDataset &dataset) {
131   std::string tensor_name = dataset.GetDatasetName();
132   MS_LOG(INFO) << "For 'TensorDump' ops, acltdt received Tensor name is " << tensor_name;
133   if (tensor_name.empty()) {
134     MS_LOG(ERROR) << "For 'TensorDump' ops, the args of 'file' is empty, skip this data.";
135     return;
136   }
137 
138   auto file_name = TensorNameToArrayName(tensor_name);
139   for (auto data_elem : dataset.GetDataItems()) {
140     if (std::holds_alternative<std::string>(data_elem)) {
141       MS_LOG(WARNING) << "Ignore data of string type: " << std::get<std::string>(data_elem);
142     }
143     auto tensor_ptr = std::get<mindspore::tensor::TensorPtr>(data_elem);
144     file_writer.Submit(std::bind(SaveTensor2NPY, file_name, tensor_ptr));
145   }
146 }
147 
148 }  // namespace mindspore::device::ascend
149