• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/datasetops/source/clue_op.h"
17 
18 #include <string>
19 #include <utility>
20 #include <vector>
21 #include <fstream>
22 #include <iomanip>
23 
24 #include "utils/file_utils.h"
25 #include "minddata/dataset/core/config_manager.h"
26 #include "minddata/dataset/engine/jagged_connector.h"
27 #include "minddata/dataset/engine/execution_tree.h"
28 #include "minddata/dataset/engine/datasetops/source/io_block.h"
29 #include "minddata/dataset/util/random.h"
30 
31 namespace mindspore {
32 namespace dataset {
ClueOp(int32_t num_workers,int64_t num_samples,int32_t worker_connector_size,ColKeyMap cols_to_keyword,std::vector<std::string> clue_files_list,int32_t op_connector_size,bool shuffle_files,int32_t num_devices,int32_t device_id)33 ClueOp::ClueOp(int32_t num_workers, int64_t num_samples, int32_t worker_connector_size, ColKeyMap cols_to_keyword,
34                std::vector<std::string> clue_files_list, int32_t op_connector_size, bool shuffle_files,
35                int32_t num_devices, int32_t device_id)
36     : NonMappableLeafOp(num_workers, worker_connector_size, num_samples, op_connector_size, shuffle_files, num_devices,
37                         device_id),
38       clue_files_list_(std::move(clue_files_list)),
39       cols_to_keyword_(std::move(cols_to_keyword)) {}
40 
Init()41 Status ClueOp::Init() {
42   RETURN_IF_NOT_OK(filename_index_->insert(clue_files_list_));
43 
44   int32_t safe_queue_size = static_cast<int32_t>(std::ceil(clue_files_list_.size() / num_workers_) + 1);
45   io_block_queues_.Init(num_workers_, safe_queue_size);
46 
47   RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_));
48   jagged_rows_connector_ = std::make_unique<JaggedConnector>(num_workers_, 1, worker_connector_size_);
49 
50   return Status::OK();
51 }
52 
GetValue(const nlohmann::json & js,std::vector<std::string> key_chain,std::shared_ptr<Tensor> * t)53 Status ClueOp::GetValue(const nlohmann::json &js, std::vector<std::string> key_chain, std::shared_ptr<Tensor> *t) {
54   nlohmann::json cursor = js;
55   for (int i = 0; i < key_chain.size(); i++) {
56     if (cursor.find(key_chain[i]) != cursor.end()) {
57       cursor = cursor[key_chain[i]];
58     } else {
59       RETURN_STATUS_UNEXPECTED("Invalid data, in given JSON file, failed to find key: " + key_chain[i]);
60     }
61   }
62   std::string final_str = key_chain.back();
63   switch (cursor.type()) {
64     case nlohmann::detail::value_t::string:
65       RETURN_IF_NOT_OK(Tensor::CreateScalar(cursor.get<std::string>(), t));
66       break;
67     case nlohmann::detail::value_t::number_integer:
68       RETURN_IF_NOT_OK(Tensor::CreateScalar(cursor.get<int32_t>(), t));
69       break;
70     case nlohmann::detail::value_t::number_unsigned:
71       RETURN_IF_NOT_OK(Tensor::CreateScalar(cursor.get<uint32_t>(), t));
72       break;
73     case nlohmann::detail::value_t::number_float:
74       RETURN_IF_NOT_OK(Tensor::CreateScalar(cursor.get<float>(), t));
75       break;
76     case nlohmann::detail::value_t::array:
77       RETURN_IF_NOT_OK(Tensor::CreateFromVector(cursor.get<std::vector<std::string>>(), t));
78       break;
79     default:
80       break;
81   }
82   return Status::OK();
83 }
84 
LoadFile(const std::string & file,int64_t start_offset,int64_t end_offset,int32_t worker_id)85 Status ClueOp::LoadFile(const std::string &file, int64_t start_offset, int64_t end_offset, int32_t worker_id) {
86   auto realpath = FileUtils::GetRealPath(file.data());
87   if (!realpath.has_value()) {
88     MS_LOG(ERROR) << "Invalid file, get real path failed, path=" << file;
89     RETURN_STATUS_UNEXPECTED("Invalid file, get real path failed, path=" + file);
90   }
91 
92   std::ifstream handle(realpath.value());
93   if (!handle.is_open()) {
94     RETURN_STATUS_UNEXPECTED("Invalid file, failed to open file: " + file);
95   }
96 
97   int64_t rows_total = 0;
98   std::string line;
99 
100   while (getline(handle, line)) {
101     if (line.empty()) {
102       continue;
103     }
104     // If read to the end offset of this file, break.
105     if (rows_total >= end_offset) {
106       break;
107     }
108     // Skip line before start offset.
109     if (rows_total < start_offset) {
110       rows_total++;
111       continue;
112     }
113 
114     nlohmann::json js;
115     try {
116       js = nlohmann::json::parse(line);
117     } catch (const std::exception &err) {
118       // Catch any exception and convert to Status return code
119       RETURN_STATUS_UNEXPECTED("Invalid file, failed to parse JSON file: " + file);
120     }
121     int cols_count = cols_to_keyword_.size();
122     TensorRow t_row(cols_count, nullptr);
123     // Add file path info
124     std::vector<std::string> file_path(cols_count, file);
125     t_row.setPath(file_path);
126     int cout = 0;
127     for (auto &p : cols_to_keyword_) {
128       std::shared_ptr<Tensor> tensor;
129       RETURN_IF_NOT_OK(GetValue(js, p.second, &tensor));
130       t_row[cout] = std::move(tensor);
131       cout++;
132     }
133 
134     rows_total++;
135     RETURN_IF_NOT_OK(jagged_rows_connector_->Add(worker_id, std::move(t_row)));
136   }
137 
138   return Status::OK();
139 }
140 
141 // A print method typically used for debugging
Print(std::ostream & out,bool show_all) const142 void ClueOp::Print(std::ostream &out, bool show_all) const {
143   if (!show_all) {
144     // Call the super class for displaying any common 1-liner info
145     ParallelOp::Print(out, show_all);
146     // Then show any custom derived-internal 1-liner info for this op
147     out << "\n";
148   } else {
149     // Call the super class for displaying any common detailed info
150     ParallelOp::Print(out, show_all);
151     // Then show any custom derived-internal stuff
152     out << "\nSample count: " << total_rows_ << "\nDevice id: " << device_id_ << "\nNumber of devices: " << num_devices_
153         << "\nShuffle files: " << ((shuffle_files_) ? "yes" : "no") << "\nClue files list:\n";
154     for (int i = 0; i < clue_files_list_.size(); ++i) {
155       out << " " << clue_files_list_[i];
156     }
157     out << "\n\n";
158   }
159 }
160 
FillIOBlockQueue(const std::vector<int64_t> & i_keys)161 Status ClueOp::FillIOBlockQueue(const std::vector<int64_t> &i_keys) {
162   int32_t queue_index = 0;
163   int64_t pre_count = 0;
164   int64_t start_offset = 0;
165   int64_t end_offset = 0;
166   bool finish = false;
167   while (!finish) {
168     std::vector<std::pair<std::string, int64_t>> file_index;
169     if (!i_keys.empty()) {
170       for (auto it = i_keys.begin(); it != i_keys.end(); ++it) {
171         {
172           if (!load_io_block_queue_) {
173             break;
174           }
175         }
176         file_index.emplace_back(std::pair<std::string, int64_t>((*filename_index_)[*it], *it));
177       }
178     } else {
179       for (auto it = filename_index_->begin(); it != filename_index_->end(); ++it) {
180         {
181           if (!load_io_block_queue_) {
182             break;
183           }
184         }
185         file_index.emplace_back(std::pair<std::string, int64_t>(it.value(), it.key()));
186       }
187     }
188     for (auto file_info : file_index) {
189       if (NeedPushFileToBlockQueue(file_info.first, &start_offset, &end_offset, pre_count)) {
190         auto ioBlock =
191           std::make_unique<FilenameBlock>(file_info.second, start_offset, end_offset, IOBlock::kDeIoBlockNone);
192         RETURN_IF_NOT_OK(PushIoBlockQueue(queue_index, std::move(ioBlock)));
193         queue_index = (queue_index + 1) % num_workers_;
194       }
195 
196       pre_count += filename_numrows_[file_info.first];
197     }
198 
199     if (pre_count < (static_cast<int64_t>(device_id_) + 1) * num_rows_per_shard_) {
200       finish = false;
201     } else {
202       finish = true;
203     }
204   }
205 
206   RETURN_IF_NOT_OK(PostEndOfEpoch(queue_index));
207   return Status::OK();
208 }
209 
CalculateNumRowsPerShard()210 Status ClueOp::CalculateNumRowsPerShard() {
211   for (auto it = filename_index_->begin(); it != filename_index_->end(); ++it) {
212     int64_t count = CountTotalRows(it.value());
213     filename_numrows_[it.value()] = count;
214     num_rows_ += count;
215   }
216   if (num_rows_ == 0) {
217     std::stringstream ss;
218     for (int i = 0; i < clue_files_list_.size(); ++i) {
219       ss << " " << clue_files_list_[i];
220     }
221     std::string file_list = ss.str();
222     RETURN_STATUS_UNEXPECTED(
223       "Invalid data, CLUEDataset API can't read the data file (interface mismatch or no data found). "
224       "Check file path:" +
225       file_list);
226   }
227 
228   num_rows_per_shard_ = static_cast<int64_t>(std::ceil(num_rows_ * 1.0 / num_devices_));
229   MS_LOG(DEBUG) << "Number rows per shard is " << num_rows_per_shard_;
230   return Status::OK();
231 }
232 
CountTotalRowsPerFile(const std::string & file)233 int64_t CountTotalRowsPerFile(const std::string &file) {
234   auto realpath = FileUtils::GetRealPath(file.data());
235   if (!realpath.has_value()) {
236     MS_LOG(ERROR) << "Get real path failed, path=" << file;
237     return 0;
238   }
239 
240   std::ifstream handle(realpath.value());
241   if (!handle.is_open()) {
242     MS_LOG(ERROR) << "Invalid file, failed to open file: " << file;
243     return 0;
244   }
245 
246   std::string line;
247   int64_t count = 0;
248   while (getline(handle, line)) {
249     if (!line.empty()) {
250       count++;
251     }
252   }
253 
254   return count;
255 }
256 
CountTotalRows(const std::string & file)257 int64_t ClueOp::CountTotalRows(const std::string &file) { return CountTotalRowsPerFile(file); }
258 
CountAllFileRows(const std::vector<std::string> & files,int64_t * count)259 Status ClueOp::CountAllFileRows(const std::vector<std::string> &files, int64_t *count) {
260   std::shared_ptr<ClueOp> op;
261   *count = 0;
262   for (auto file : files) {
263     *count += CountTotalRowsPerFile(file);
264   }
265   return Status::OK();
266 }
267 
ComputeColMap()268 Status ClueOp::ComputeColMap() {
269   // Set the column name mapping (base class field)
270   if (column_name_id_map_.empty()) {
271     int count = 0;
272     for (auto &p : cols_to_keyword_) {
273       column_name_id_map_[p.first] = count;
274       count++;
275     }
276   } else {
277     MS_LOG(WARNING) << "Column name map is already set!";
278   }
279   return Status::OK();
280 }
281 
282 }  // namespace dataset
283 }  // namespace mindspore
284