1 /**
2 * Copyright 2019-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "minddata/dataset/engine/datasetops/source/image_folder_op.h"
18
19 #include <unordered_set>
20
21 #include "utils/ms_utils.h"
22 #include "minddata/dataset/core/config_manager.h"
23 #include "minddata/dataset/core/tensor_shape.h"
24 #include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
25 #include "minddata/dataset/engine/execution_tree.h"
26
27 namespace mindspore {
28 namespace dataset {
29 #ifdef ENABLE_PYTHON
ImageFolderOp(int32_t num_wkrs,std::string file_dir,int32_t queue_size,bool recursive,bool do_decode,const std::set<std::string> & exts,const std::map<std::string,int32_t> & map,std::unique_ptr<DataSchema> data_schema,std::shared_ptr<SamplerRT> sampler,py::function decrypt)30 ImageFolderOp::ImageFolderOp(int32_t num_wkrs, std::string file_dir, int32_t queue_size, bool recursive, bool do_decode,
31 const std::set<std::string> &exts, const std::map<std::string, int32_t> &map,
32 std::unique_ptr<DataSchema> data_schema, std::shared_ptr<SamplerRT> sampler,
33 py::function decrypt)
34 : MappableLeafOp(num_wkrs, queue_size, std::move(sampler)),
35 folder_path_(std::move(file_dir)),
36 recursive_(recursive),
37 decode_(do_decode),
38 extensions_(exts),
39 class_index_(map),
40 data_schema_(std::move(data_schema)),
41 sampler_ind_(0),
42 dirname_offset_(0),
43 decrypt_(std::move(decrypt)) {
44 folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size);
45 image_name_queue_ = std::make_unique<Queue<FolderImagesPair>>(num_wkrs * queue_size);
46 }
47 #else
48 ImageFolderOp::ImageFolderOp(int32_t num_wkrs, std::string file_dir, int32_t queue_size, bool recursive, bool do_decode,
49 const std::set<std::string> &exts, const std::map<std::string, int32_t> &map,
50 std::unique_ptr<DataSchema> data_schema, std::shared_ptr<SamplerRT> sampler)
51 : MappableLeafOp(num_wkrs, queue_size, std::move(sampler)),
52 folder_path_(std::move(file_dir)),
53 recursive_(recursive),
54 decode_(do_decode),
55 extensions_(exts),
56 class_index_(map),
57 data_schema_(std::move(data_schema)),
58 sampler_ind_(0),
59 dirname_offset_(0) {
60 folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size);
61 image_name_queue_ = std::make_unique<Queue<FolderImagesPair>>(num_wkrs * queue_size);
62 }
63 #endif
64
65 // Master thread that pulls the prescan worker's results.
66 // Keep collecting results until all prescan workers quit
67 // Then consolidate 2 level shuffles together into 1 giant vector
68 // calculate numRows then return
PrepareData()69 Status ImageFolderOp::PrepareData() {
70 std::vector<FolderImagesPair> v;
71 int64_t cnt = 0;
72 while (cnt != num_workers_) { // count number of end signals
73 FolderImagesPair p;
74 RETURN_IF_NOT_OK(image_name_queue_->PopFront(&p));
75 if (p == nullptr) {
76 cnt++;
77 } else {
78 v.push_back(p);
79 }
80 }
81 std::sort(v.begin(), v.end(),
82 [](const FolderImagesPair &lhs, const FolderImagesPair &rhs) { return lhs->first < rhs->first; });
83 // following loop puts the 2 level of shuffles together into 1 vector
84 for (size_t ind = 0; ind < v.size(); ++ind) {
85 while (v[ind]->second.empty() == false) {
86 MS_ASSERT(!(v[ind]->first.empty())); // make sure that v[ind]->first.substr(1) is not out of bound
87 v[ind]->second.front()->second = class_index_.empty() ? ind : class_index_[v[ind]->first.substr(1)];
88 image_label_pairs_.push_back(v[ind]->second.front());
89 v[ind]->second.pop();
90 }
91 }
92 image_label_pairs_.shrink_to_fit();
93 num_rows_ = image_label_pairs_.size();
94 if (num_rows_ == 0) {
95 RETURN_STATUS_UNEXPECTED("Invalid data, " + DatasetName(true) +
96 "Dataset API can't read the data file (interface mismatch or no data found). Check " +
97 DatasetName() + " file path: " + folder_path_);
98 }
99 // free memory of two queues used for pre-scan
100 folder_name_queue_->Reset();
101 image_name_queue_->Reset();
102 return Status::OK();
103 }
104
105 // Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorTow
LoadTensorRow(row_id_type row_id,TensorRow * trow)106 Status ImageFolderOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
107 RETURN_UNEXPECTED_IF_NULL(trow);
108 ImageLabelPair pair_ptr = image_label_pairs_[row_id];
109 std::shared_ptr<Tensor> image, label;
110 RETURN_IF_NOT_OK(Tensor::CreateScalar(pair_ptr->second, &label));
111 #ifdef ENABLE_PYTHON
112 RETURN_IF_NOT_OK(MappableLeafOp::ImageDecrypt(folder_path_ + (pair_ptr->first), &image, decrypt_));
113 #else
114 RETURN_IF_NOT_OK(Tensor::CreateFromFile(folder_path_ + (pair_ptr->first), &image));
115 #endif
116
117 if (decode_ == true) {
118 Status rc = Decode(image, &image);
119 if (rc.IsError()) {
120 std::string err = "Invalid image, " + folder_path_ + (pair_ptr->first) +
121 " decode failed, the image is broken or permission denied.";
122 RETURN_STATUS_UNEXPECTED(err);
123 }
124 }
125 (*trow) = TensorRow(row_id, {std::move(image), std::move(label)});
126 trow->setPath({folder_path_ + (pair_ptr->first), std::string("")});
127 return Status::OK();
128 }
129
Print(std::ostream & out,bool show_all) const130 void ImageFolderOp::Print(std::ostream &out, bool show_all) const {
131 if (!show_all) {
132 // Call the super class for displaying any common 1-liner info
133 ParallelOp::Print(out, show_all);
134 // Then show any custom derived-internal 1-liner info for this op
135 out << "\n";
136 } else {
137 // Call the super class for displaying any common detailed info
138 ParallelOp::Print(out, show_all);
139 // Then show any custom derived-internal stuff
140 out << "\nNumber of rows:" << num_rows_ << "\n"
141 << DatasetName(true) << " directory: " << folder_path_ << "\nDecode: " << (decode_ ? "yes" : "no") << "\n\n";
142 }
143 }
144
145 // Derived from RandomAccessOp
GetClassIds(std::map<int32_t,std::vector<int64_t>> * cls_ids) const146 Status ImageFolderOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
147 if (cls_ids == nullptr || !cls_ids->empty() || image_label_pairs_.empty()) {
148 if (image_label_pairs_.empty()) {
149 RETURN_STATUS_UNEXPECTED("Invalid dataset_dir, " + DatasetName(true) +
150 "Dataset API can't read the data file(interface mismatch or no data found). Check " +
151 DatasetName() + " file path: " + folder_path_);
152 } else {
153 RETURN_STATUS_UNEXPECTED(
154 "[Internal ERROR], Map containing image-index pair is nullptr or has been set in other place,"
155 "it must be empty before using GetClassIds.");
156 }
157 }
158 for (size_t i = 0; i < image_label_pairs_.size(); ++i) {
159 (*cls_ids)[image_label_pairs_[i]->second].push_back(i);
160 }
161 for (auto &pair : (*cls_ids)) {
162 pair.second.shrink_to_fit();
163 }
164 return Status::OK();
165 }
166
167 // Worker Entry for pre-scanning all the folders and do the 1st level shuffle
168 // Worker pull a file name from folder_name_queue_ (which is a Queue), walks all the images under that foldername
169 // After walking is complete, sort all the file names (relative path to all jpeg files under the same directory )
170 // (Sort is automatically conducted using a set which is implemented using a Red-Black Tree)
171 // Add the sorted filenames in to a queue. The make a pair (foldername, queue<filenames>*),
172 // foldername is used for 2nd level sorting.
173 // FYI: 1st level sorting: sort all images under the same directory.
174 // FYI: 2nd level sorting: sort all folder names
175 // push this pair to image_name_queue (which is again a Queue)
PrescanWorkerEntry(int32_t worker_id)176 Status ImageFolderOp::PrescanWorkerEntry(int32_t worker_id) {
177 TaskManager::FindMe()->Post();
178 std::string folder_name;
179 RETURN_IF_NOT_OK(folder_name_queue_->PopFront(&folder_name));
180 while (folder_name.empty() == false) {
181 Path folder(folder_path_ + folder_name);
182 std::shared_ptr<Path::DirIterator> dirItr = Path::DirIterator::OpenDirectory(&folder);
183 if (folder.Exists() == false || dirItr == nullptr) {
184 RETURN_STATUS_UNEXPECTED("Invalid dataset_dir, " + folder_name + " does not exist or permission denied.");
185 }
186 std::set<std::string> imgs; // use this for ordering
187 while (dirItr->HasNext()) {
188 Path file = dirItr->Next();
189 if (extensions_.empty() || extensions_.find(file.Extension()) != extensions_.end()) {
190 (void)imgs.insert(file.ToString().substr(dirname_offset_));
191 } else {
192 MS_LOG(WARNING) << DatasetName(true) << " operator unsupported file found: " << file.ToString()
193 << ", extension: " << file.Extension() << ".";
194 }
195 }
196 FolderImagesPair p = std::make_shared<std::pair<std::string, std::queue<ImageLabelPair>>>();
197 p->first = folder_name;
198 for (const std::string &img : imgs) {
199 p->second.push(std::make_shared<std::pair<std::string, int32_t>>(img, 0));
200 }
201 RETURN_IF_NOT_OK(image_name_queue_->EmplaceBack(p));
202 RETURN_IF_NOT_OK(folder_name_queue_->PopFront(&folder_name));
203 }
204 RETURN_IF_NOT_OK(image_name_queue_->EmplaceBack(nullptr)); // end signal
205 return Status::OK();
206 }
207
208 // This helper function recursively walks all folder_paths, and send each foldername to folder_name_queue_
209 // if mRecursive == false, don't go into folder of folders
RecursiveWalkFolder(Path * dir)210 Status ImageFolderOp::RecursiveWalkFolder(Path *dir) {
211 RETURN_UNEXPECTED_IF_NULL(dir);
212 std::shared_ptr<Path::DirIterator> dir_itr = Path::DirIterator::OpenDirectory(dir);
213 RETURN_UNEXPECTED_IF_NULL(dir_itr);
214 while (dir_itr->HasNext()) {
215 Path subdir = dir_itr->Next();
216 if (subdir.IsDirectory()) {
217 if (class_index_.empty() ||
218 class_index_.find(subdir.ToString().substr(dirname_offset_ + 1)) != class_index_.end()) {
219 RETURN_IF_NOT_OK(folder_name_queue_->EmplaceBack(subdir.ToString().substr(dirname_offset_)));
220 }
221 if (recursive_ == true) {
222 MS_LOG(ERROR) << "[Internal ERROR] RecursiveWalkFolder(&subdir) functionality is disabled permanently. "
223 << "No recursive walk of directory will be performed.";
224 }
225 }
226 }
227 return Status::OK();
228 }
229
230 // A thread that calls RecursiveWalkFolder
StartAsyncWalk()231 Status ImageFolderOp::StartAsyncWalk() {
232 TaskManager::FindMe()->Post();
233 Path dir(folder_path_);
234 if (dir.Exists() == false || dir.IsDirectory() == false) {
235 RETURN_STATUS_UNEXPECTED("Invalid dataset_dir, " + folder_path_ + " may not exist or the path is not a directory.");
236 }
237 dirname_offset_ = folder_path_.length();
238 RETURN_IF_NOT_OK(RecursiveWalkFolder(&dir));
239 // send out num_workers_ end signal to folder_name_queue_, 1 for each worker.
240 // Upon receiving end Signal, worker quits and set another end Signal to image_name_queue.
241 for (int32_t ind = 0; ind < num_workers_; ++ind) {
242 RETURN_IF_NOT_OK(folder_name_queue_->EmplaceBack("")); // end signal
243 }
244 return Status::OK();
245 }
246
RegisterAndLaunchThreads()247 Status ImageFolderOp::RegisterAndLaunchThreads() {
248 RETURN_IF_NOT_OK(ParallelOp::RegisterAndLaunchThreads());
249 RETURN_IF_NOT_OK(folder_name_queue_->Register(tree_->AllTasks()));
250 RETURN_IF_NOT_OK(image_name_queue_->Register(tree_->AllTasks()));
251
252 // The following code launch 3 threads group
253 // 1) A thread that walks all folders and push the folder names to a util:Queue folder_name_queue_.
254 // 2) Workers that pull foldername from folder_name_queue_, walk it and return the sorted images to image_name_queue
255 // 3) Launch main workers that load TensorRows by reading all images
256 RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(Name() + "::WalkDir",
257 std::bind(&ImageFolderOp::StartAsyncWalk, this), nullptr, id()));
258 RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
259 std::bind(&ImageFolderOp::PrescanWorkerEntry, this, std::placeholders::_1),
260 Name() + "::PrescanWorkerEntry", id()));
261
262 return Status::OK();
263 }
264
CountRowsAndClasses(const std::string & path,const std::set<std::string> & exts,int64_t * num_rows,int64_t * num_classes,std::map<std::string,int32_t> class_index)265 Status ImageFolderOp::CountRowsAndClasses(const std::string &path, const std::set<std::string> &exts, int64_t *num_rows,
266 int64_t *num_classes, std::map<std::string, int32_t> class_index) {
267 Path dir(path);
268 std::string err_msg = "";
269 int64_t row_cnt = 0;
270 err_msg += (dir.Exists() == false || dir.IsDirectory() == false)
271 ? "Invalid dataset_dir, " + path + " does not exist or the path is not a directory. "
272 : "";
273 err_msg += (num_classes == nullptr && num_rows == nullptr) ? "[Internal ERROR] num_class and num_rows are null." : "";
274 if (err_msg.empty() == false) {
275 RETURN_STATUS_UNEXPECTED(err_msg);
276 }
277 std::queue<std::string> folder_paths;
278 std::shared_ptr<Path::DirIterator> dir_itr = Path::DirIterator::OpenDirectory(&dir);
279 RETURN_UNEXPECTED_IF_NULL(dir_itr);
280 std::unordered_set<std::string> folder_names;
281 while (dir_itr->HasNext()) {
282 Path subdir = dir_itr->Next();
283 if (subdir.IsDirectory()) {
284 folder_paths.push(subdir.ToString());
285 if (!class_index.empty()) {
286 folder_names.insert(subdir.Basename());
287 }
288 }
289 }
290 if (num_classes != nullptr) {
291 // if class index is empty, get everything on disk
292 if (class_index.empty()) {
293 *num_classes = folder_paths.size();
294 } else {
295 for (const auto &p : class_index) {
296 CHECK_FAIL_RETURN_UNEXPECTED(folder_names.find(p.first) != folder_names.end(),
297 "Invalid subdirectory, class: " + p.first + " doesn't exist in " + path + " .");
298 }
299 (*num_classes) = class_index.size();
300 }
301 }
302 // return here if only num_class is needed
303 RETURN_OK_IF_TRUE(num_rows == nullptr);
304 while (folder_paths.empty() == false) {
305 Path subdir(folder_paths.front());
306 dir_itr = Path::DirIterator::OpenDirectory(&subdir);
307 if (subdir.Exists() == false || dir_itr == nullptr) {
308 RETURN_STATUS_UNEXPECTED("Invalid subdirectory, ImageFolder Dataset subdirectory: " + subdir.ToString() +
309 " does not exist or permission denied");
310 }
311 while (dir_itr->HasNext()) {
312 if (exts.empty() || exts.find(dir_itr->Next().Extension()) != exts.end()) {
313 ++row_cnt;
314 }
315 }
316 folder_paths.pop();
317 }
318 (*num_rows) = row_cnt;
319 return Status::OK();
320 }
321
ComputeColMap()322 Status ImageFolderOp::ComputeColMap() {
323 // Set the column name map (base class field)
324 if (column_name_id_map_.empty()) {
325 for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
326 column_name_id_map_[data_schema_->Column(i).Name()] = i;
327 }
328 } else {
329 MS_LOG(WARNING) << "Column name map is already set!";
330 }
331 return Status::OK();
332 }
333
334 // Get number of classes
GetNumClasses(int64_t * num_classes)335 Status ImageFolderOp::GetNumClasses(int64_t *num_classes) {
336 RETURN_UNEXPECTED_IF_NULL(num_classes);
337 if (num_classes_ > 0) {
338 *num_classes = num_classes_;
339 return Status::OK();
340 }
341 RETURN_IF_NOT_OK(CountRowsAndClasses(folder_path_, extensions_, nullptr, num_classes, class_index_));
342 num_classes_ = *num_classes;
343 return Status::OK();
344 }
345
InitPullMode()346 Status ImageFolderOp::InitPullMode() {
347 // to avoid the concurrent and multi end signal in StartAsyncWalk, explicitly set num_workers_ to 1
348 num_workers_ = 1;
349 RETURN_IF_NOT_OK(folder_name_queue_->Register(tree_->AllTasks()));
350 RETURN_IF_NOT_OK(image_name_queue_->Register(tree_->AllTasks()));
351 RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(Name() + "::WalkDir",
352 std::bind(&ImageFolderOp::StartAsyncWalk, this), nullptr, id()));
353 RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
354 std::bind(&ImageFolderOp::PrescanWorkerEntry, this, std::placeholders::_1),
355 Name() + "::PrescanWorkerEntry", id()));
356 return PrepareData();
357 }
358
GetClassIndexing(std::vector<std::pair<std::string,std::vector<int32_t>>> * output_class_indexing)359 Status ImageFolderOp::GetClassIndexing(
360 std::vector<std::pair<std::string, std::vector<int32_t>>> *output_class_indexing) {
361 RETURN_UNEXPECTED_IF_NULL(output_class_indexing);
362 output_class_indexing->clear();
363
364 // if class_index_ exist, return directly
365 if (!class_index_.empty()) {
366 (void)std::transform(
367 class_index_.begin(), class_index_.end(), std::back_inserter(*output_class_indexing),
368 [](const auto &elem) { return std::pair<std::string, std::vector<int32_t>>(elem.first, {elem.second}); });
369 return Status::OK();
370 }
371
372 // Iter folder path
373 Path dir(folder_path_);
374 if (!dir.Exists() || !dir.IsDirectory()) {
375 RETURN_STATUS_UNEXPECTED("Invalid dataset_dir, " + folder_path_ + " does not exist or not a directory.");
376 }
377
378 std::shared_ptr<Path::DirIterator> dir_itr = Path::DirIterator::OpenDirectory(&dir);
379 RETURN_UNEXPECTED_IF_NULL(dir_itr);
380 std::vector<std::string> folder_names;
381 while (dir_itr->HasNext()) {
382 Path subdir = dir_itr->Next();
383 if (subdir.IsDirectory()) {
384 folder_names.push_back(subdir.Basename());
385 }
386 }
387 if (folder_names.empty()) {
388 RETURN_STATUS_UNEXPECTED("Invalid data, " + DatasetName(true) +
389 "Dataset API can't read the data file (interface mismatch or no data found). Check " +
390 DatasetName() + " file path: " + folder_path_);
391 }
392
393 std::sort(folder_names.begin(), folder_names.end());
394 int32_t label_count = 0;
395 (void)std::transform(folder_names.begin(), folder_names.end(), std::back_inserter(*output_class_indexing),
396 [&label_count](const auto &elem) {
397 auto p = std::pair<std::string, std::vector<int32_t>>(elem, {label_count});
398 label_count++;
399 return p;
400 });
401 return Status::OK();
402 }
403 } // namespace dataset
404 } // namespace mindspore
405