1 /**
2 * Copyright 2020-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/datasetops/source/clue_op.h"
17
18 #include <string>
19 #include <utility>
20 #include <vector>
21 #include <fstream>
22 #include <iomanip>
23
24 #include "utils/file_utils.h"
25 #include "minddata/dataset/core/config_manager.h"
26 #include "minddata/dataset/engine/jagged_connector.h"
27 #include "minddata/dataset/engine/execution_tree.h"
28 #include "minddata/dataset/engine/datasetops/source/io_block.h"
29 #include "minddata/dataset/util/random.h"
30
31 namespace mindspore {
32 namespace dataset {
ClueOp(int32_t num_workers,int64_t num_samples,int32_t worker_connector_size,ColKeyMap cols_to_keyword,std::vector<std::string> clue_files_list,int32_t op_connector_size,bool shuffle_files,int32_t num_devices,int32_t device_id)33 ClueOp::ClueOp(int32_t num_workers, int64_t num_samples, int32_t worker_connector_size, ColKeyMap cols_to_keyword,
34 std::vector<std::string> clue_files_list, int32_t op_connector_size, bool shuffle_files,
35 int32_t num_devices, int32_t device_id)
36 : NonMappableLeafOp(num_workers, worker_connector_size, num_samples, op_connector_size, shuffle_files, num_devices,
37 device_id),
38 clue_files_list_(std::move(clue_files_list)),
39 cols_to_keyword_(std::move(cols_to_keyword)) {}
40
Init()41 Status ClueOp::Init() {
42 RETURN_IF_NOT_OK(filename_index_->insert(clue_files_list_));
43
44 int32_t safe_queue_size = static_cast<int32_t>(std::ceil(clue_files_list_.size() / num_workers_) + 1);
45 io_block_queues_.Init(num_workers_, safe_queue_size);
46
47 RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_));
48 jagged_rows_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);
49
50 return Status::OK();
51 }
52
GetValue(const nlohmann::json & js,std::vector<std::string> key_chain,std::shared_ptr<Tensor> * t)53 Status ClueOp::GetValue(const nlohmann::json &js, std::vector<std::string> key_chain, std::shared_ptr<Tensor> *t) {
54 nlohmann::json cursor = js;
55 for (int i = 0; i < key_chain.size(); i++) {
56 if (cursor.find(key_chain[i]) != cursor.end()) {
57 cursor = cursor[key_chain[i]];
58 } else {
59 RETURN_STATUS_UNEXPECTED("Invalid data, in given JSON file, failed to find key: " + key_chain[i]);
60 }
61 }
62 std::string final_str = key_chain.back();
63 switch (cursor.type()) {
64 case nlohmann::detail::value_t::string:
65 RETURN_IF_NOT_OK(Tensor::CreateScalar(cursor.get<std::string>(), t));
66 break;
67 case nlohmann::detail::value_t::number_integer:
68 RETURN_IF_NOT_OK(Tensor::CreateScalar(cursor.get<int32_t>(), t));
69 break;
70 case nlohmann::detail::value_t::number_unsigned:
71 RETURN_IF_NOT_OK(Tensor::CreateScalar(cursor.get<uint32_t>(), t));
72 break;
73 case nlohmann::detail::value_t::number_float:
74 RETURN_IF_NOT_OK(Tensor::CreateScalar(cursor.get<float>(), t));
75 break;
76 case nlohmann::detail::value_t::array:
77 RETURN_IF_NOT_OK(Tensor::CreateFromVector(cursor.get<std::vector<std::string>>(), t));
78 break;
79 default:
80 break;
81 }
82 return Status::OK();
83 }
84
LoadFile(const std::string & file,int64_t start_offset,int64_t end_offset,int32_t worker_id)85 Status ClueOp::LoadFile(const std::string &file, int64_t start_offset, int64_t end_offset, int32_t worker_id) {
86 auto realpath = FileUtils::GetRealPath(file.data());
87 if (!realpath.has_value()) {
88 MS_LOG(ERROR) << "Invalid file, get real path failed, path=" << file;
89 RETURN_STATUS_UNEXPECTED("Invalid file, get real path failed, path=" + file);
90 }
91
92 std::ifstream handle(realpath.value());
93 if (!handle.is_open()) {
94 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open file: " + file);
95 }
96
97 int64_t rows_total = 0;
98 std::string line;
99
100 while (getline(handle, line)) {
101 if (line.empty()) {
102 continue;
103 }
104 // If read to the end offset of this file, break.
105 if (rows_total >= end_offset) {
106 break;
107 }
108 // Skip line before start offset.
109 if (rows_total < start_offset) {
110 rows_total++;
111 continue;
112 }
113
114 nlohmann::json js;
115 try {
116 js = nlohmann::json::parse(line);
117 } catch (const std::exception &err) {
118 // Catch any exception and convert to Status return code
119 RETURN_STATUS_UNEXPECTED("Invalid file, failed to parse JSON file: " + file);
120 }
121 int cols_count = cols_to_keyword_.size();
122 TensorRow t_row(cols_count, nullptr);
123 // Add file path info
124 std::vector<std::string> file_path(cols_count, file);
125 t_row.setPath(file_path);
126 int cout = 0;
127 for (auto &p : cols_to_keyword_) {
128 std::shared_ptr<Tensor> tensor;
129 RETURN_IF_NOT_OK(GetValue(js, p.second, &tensor));
130 t_row[cout] = std::move(tensor);
131 cout++;
132 }
133
134 rows_total++;
135 RETURN_IF_NOT_OK(jagged_rows_connector_->Add(worker_id, std::move(t_row)));
136 }
137
138 return Status::OK();
139 }
140
141 // A print method typically used for debugging
Print(std::ostream & out,bool show_all) const142 void ClueOp::Print(std::ostream &out, bool show_all) const {
143 if (!show_all) {
144 // Call the super class for displaying any common 1-liner info
145 ParallelOp::Print(out, show_all);
146 // Then show any custom derived-internal 1-liner info for this op
147 out << "\n";
148 } else {
149 // Call the super class for displaying any common detailed info
150 ParallelOp::Print(out, show_all);
151 // Then show any custom derived-internal stuff
152 out << "\nSample count: " << total_rows_ << "\nDevice id: " << device_id_ << "\nNumber of devices: " << num_devices_
153 << "\nShuffle files: " << ((shuffle_files_) ? "yes" : "no") << "\nClue files list:\n";
154 for (int i = 0; i < clue_files_list_.size(); ++i) {
155 out << " " << clue_files_list_[i];
156 }
157 out << "\n\n";
158 }
159 }
160
FillIOBlockQueue(const std::vector<int64_t> & i_keys)161 Status ClueOp::FillIOBlockQueue(const std::vector<int64_t> &i_keys) {
162 int32_t queue_index = 0;
163 int64_t pre_count = 0;
164 int64_t start_offset = 0;
165 int64_t end_offset = 0;
166 bool finish = false;
167 while (!finish) {
168 std::vector<std::pair<std::string, int64_t>> file_index;
169 if (!i_keys.empty()) {
170 for (auto it = i_keys.begin(); it != i_keys.end(); ++it) {
171 {
172 if (!load_io_block_queue_) {
173 break;
174 }
175 }
176 file_index.emplace_back(std::pair<std::string, int64_t>((*filename_index_)[*it], *it));
177 }
178 } else {
179 for (auto it = filename_index_->begin(); it != filename_index_->end(); ++it) {
180 {
181 if (!load_io_block_queue_) {
182 break;
183 }
184 }
185 file_index.emplace_back(std::pair<std::string, int64_t>(it.value(), it.key()));
186 }
187 }
188 for (auto file_info : file_index) {
189 if (NeedPushFileToBlockQueue(file_info.first, &start_offset, &end_offset, pre_count)) {
190 auto ioBlock =
191 std::make_unique<FilenameBlock>(file_info.second, start_offset, end_offset, IOBlock::kDeIoBlockNone);
192 RETURN_IF_NOT_OK(PushIoBlockQueue(queue_index, std::move(ioBlock)));
193 queue_index = (queue_index + 1) % num_workers_;
194 }
195
196 pre_count += filename_numrows_[file_info.first];
197 }
198
199 if (pre_count < (static_cast<int64_t>(device_id_) + 1) * num_rows_per_shard_) {
200 finish = false;
201 } else {
202 finish = true;
203 }
204 }
205
206 RETURN_IF_NOT_OK(PostEndOfEpoch(queue_index));
207 return Status::OK();
208 }
209
CalculateNumRowsPerShard()210 Status ClueOp::CalculateNumRowsPerShard() {
211 for (auto it = filename_index_->begin(); it != filename_index_->end(); ++it) {
212 int64_t count = CountTotalRows(it.value());
213 filename_numrows_[it.value()] = count;
214 num_rows_ += count;
215 }
216 if (num_rows_ == 0) {
217 std::stringstream ss;
218 for (int i = 0; i < clue_files_list_.size(); ++i) {
219 ss << " " << clue_files_list_[i];
220 }
221 std::string file_list = ss.str();
222 RETURN_STATUS_UNEXPECTED(
223 "Invalid data, CLUEDataset API can't read the data file (interface mismatch or no data found). "
224 "Check file path:" +
225 file_list);
226 }
227
228 num_rows_per_shard_ = static_cast<int64_t>(std::ceil(num_rows_ * 1.0 / num_devices_));
229 MS_LOG(DEBUG) << "Number rows per shard is " << num_rows_per_shard_;
230 return Status::OK();
231 }
232
CountTotalRowsPerFile(const std::string & file)233 int64_t CountTotalRowsPerFile(const std::string &file) {
234 auto realpath = FileUtils::GetRealPath(file.data());
235 if (!realpath.has_value()) {
236 MS_LOG(ERROR) << "Get real path failed, path=" << file;
237 return 0;
238 }
239
240 std::ifstream handle(realpath.value());
241 if (!handle.is_open()) {
242 MS_LOG(ERROR) << "Invalid file, failed to open file: " << file;
243 return 0;
244 }
245
246 std::string line;
247 int64_t count = 0;
248 while (getline(handle, line)) {
249 if (!line.empty()) {
250 count++;
251 }
252 }
253
254 return count;
255 }
256
CountTotalRows(const std::string & file)257 int64_t ClueOp::CountTotalRows(const std::string &file) { return CountTotalRowsPerFile(file); }
258
CountAllFileRows(const std::vector<std::string> & files,int64_t * count)259 Status ClueOp::CountAllFileRows(const std::vector<std::string> &files, int64_t *count) {
260 std::shared_ptr<ClueOp> op;
261 *count = 0;
262 for (auto file : files) {
263 *count += CountTotalRowsPerFile(file);
264 }
265 return Status::OK();
266 }
267
ComputeColMap()268 Status ClueOp::ComputeColMap() {
269 // Set the column name mapping (base class field)
270 if (column_name_id_map_.empty()) {
271 int count = 0;
272 for (auto &p : cols_to_keyword_) {
273 column_name_id_map_[p.first] = count;
274 count++;
275 }
276 } else {
277 MS_LOG(WARNING) << "Column name map is already set!";
278 }
279 return Status::OK();
280 }
281
282 } // namespace dataset
283 } // namespace mindspore
284