• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "minddata/dataset/engine/datasetops/source/image_folder_op.h"

#include <algorithm>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "utils/ms_utils.h"
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/core/tensor_shape.h"
#include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
#include "minddata/dataset/engine/execution_tree.h"
26 
27 namespace mindspore {
28 namespace dataset {
29 #ifdef ENABLE_PYTHON
ImageFolderOp(int32_t num_wkrs,std::string file_dir,int32_t queue_size,bool recursive,bool do_decode,const std::set<std::string> & exts,const std::map<std::string,int32_t> & map,std::unique_ptr<DataSchema> data_schema,std::shared_ptr<SamplerRT> sampler,py::function decrypt)30 ImageFolderOp::ImageFolderOp(int32_t num_wkrs, std::string file_dir, int32_t queue_size, bool recursive, bool do_decode,
31                              const std::set<std::string> &exts, const std::map<std::string, int32_t> &map,
32                              std::unique_ptr<DataSchema> data_schema, std::shared_ptr<SamplerRT> sampler,
33                              py::function decrypt)
34     : MappableLeafOp(num_wkrs, queue_size, std::move(sampler)),
35       folder_path_(std::move(file_dir)),
36       recursive_(recursive),
37       decode_(do_decode),
38       extensions_(exts),
39       class_index_(map),
40       data_schema_(std::move(data_schema)),
41       sampler_ind_(0),
42       dirname_offset_(0),
43       decrypt_(std::move(decrypt)) {
44   folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size);
45   image_name_queue_ = std::make_unique<Queue<FolderImagesPair>>(num_wkrs * queue_size);
46 }
47 #else
48 ImageFolderOp::ImageFolderOp(int32_t num_wkrs, std::string file_dir, int32_t queue_size, bool recursive, bool do_decode,
49                              const std::set<std::string> &exts, const std::map<std::string, int32_t> &map,
50                              std::unique_ptr<DataSchema> data_schema, std::shared_ptr<SamplerRT> sampler)
51     : MappableLeafOp(num_wkrs, queue_size, std::move(sampler)),
52       folder_path_(std::move(file_dir)),
53       recursive_(recursive),
54       decode_(do_decode),
55       extensions_(exts),
56       class_index_(map),
57       data_schema_(std::move(data_schema)),
58       sampler_ind_(0),
59       dirname_offset_(0) {
60   folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size);
61   image_name_queue_ = std::make_unique<Queue<FolderImagesPair>>(num_wkrs * queue_size);
62 }
63 #endif
64 
65 // Master thread that pulls the prescan worker's results.
66 // Keep collecting results until all prescan workers quit
67 // Then consolidate 2 level shuffles together into 1 giant vector
68 // calculate numRows then return
PrepareData()69 Status ImageFolderOp::PrepareData() {
70   std::vector<FolderImagesPair> v;
71   int64_t cnt = 0;
72   while (cnt != num_workers_) {  // count number of end signals
73     FolderImagesPair p;
74     RETURN_IF_NOT_OK(image_name_queue_->PopFront(&p));
75     if (p == nullptr) {
76       cnt++;
77     } else {
78       v.push_back(p);
79     }
80   }
81   std::sort(v.begin(), v.end(),
82             [](const FolderImagesPair &lhs, const FolderImagesPair &rhs) { return lhs->first < rhs->first; });
83   // following loop puts the 2 level of shuffles together into 1 vector
84   for (size_t ind = 0; ind < v.size(); ++ind) {
85     while (v[ind]->second.empty() == false) {
86       MS_ASSERT(!(v[ind]->first.empty()));  // make sure that v[ind]->first.substr(1) is not out of bound
87       v[ind]->second.front()->second = class_index_.empty() ? ind : class_index_[v[ind]->first.substr(1)];
88       image_label_pairs_.push_back(v[ind]->second.front());
89       v[ind]->second.pop();
90     }
91   }
92   image_label_pairs_.shrink_to_fit();
93   num_rows_ = image_label_pairs_.size();
94   if (num_rows_ == 0) {
95     RETURN_STATUS_UNEXPECTED("Invalid data, " + DatasetName(true) +
96                              "Dataset API can't read the data file (interface mismatch or no data found). Check " +
97                              DatasetName() + " file path: " + folder_path_);
98   }
99   // free memory of two queues used for pre-scan
100   folder_name_queue_->Reset();
101   image_name_queue_->Reset();
102   return Status::OK();
103 }
104 
105 // Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorTow
LoadTensorRow(row_id_type row_id,TensorRow * trow)106 Status ImageFolderOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
107   RETURN_UNEXPECTED_IF_NULL(trow);
108   ImageLabelPair pair_ptr = image_label_pairs_[row_id];
109   std::shared_ptr<Tensor> image, label;
110   RETURN_IF_NOT_OK(Tensor::CreateScalar(pair_ptr->second, &label));
111 #ifdef ENABLE_PYTHON
112   RETURN_IF_NOT_OK(MappableLeafOp::ImageDecrypt(folder_path_ + (pair_ptr->first), &image, decrypt_));
113 #else
114   RETURN_IF_NOT_OK(Tensor::CreateFromFile(folder_path_ + (pair_ptr->first), &image));
115 #endif
116 
117   if (decode_ == true) {
118     Status rc = Decode(image, &image);
119     if (rc.IsError()) {
120       std::string err = "Invalid image, " + folder_path_ + (pair_ptr->first) +
121                         " decode failed, the image is broken or permission denied.";
122       RETURN_STATUS_UNEXPECTED(err);
123     }
124   }
125   (*trow) = TensorRow(row_id, {std::move(image), std::move(label)});
126   trow->setPath({folder_path_ + (pair_ptr->first), std::string("")});
127   return Status::OK();
128 }
129 
Print(std::ostream & out,bool show_all) const130 void ImageFolderOp::Print(std::ostream &out, bool show_all) const {
131   if (!show_all) {
132     // Call the super class for displaying any common 1-liner info
133     ParallelOp::Print(out, show_all);
134     // Then show any custom derived-internal 1-liner info for this op
135     out << "\n";
136   } else {
137     // Call the super class for displaying any common detailed info
138     ParallelOp::Print(out, show_all);
139     // Then show any custom derived-internal stuff
140     out << "\nNumber of rows:" << num_rows_ << "\n"
141         << DatasetName(true) << " directory: " << folder_path_ << "\nDecode: " << (decode_ ? "yes" : "no") << "\n\n";
142   }
143 }
144 
145 // Derived from RandomAccessOp
GetClassIds(std::map<int32_t,std::vector<int64_t>> * cls_ids) const146 Status ImageFolderOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
147   if (cls_ids == nullptr || !cls_ids->empty() || image_label_pairs_.empty()) {
148     if (image_label_pairs_.empty()) {
149       RETURN_STATUS_UNEXPECTED("Invalid dataset_dir, " + DatasetName(true) +
150                                "Dataset API can't read the data file(interface mismatch or no data found). Check " +
151                                DatasetName() + " file path: " + folder_path_);
152     } else {
153       RETURN_STATUS_UNEXPECTED(
154         "[Internal ERROR], Map containing image-index pair is nullptr or has been set in other place,"
155         "it must be empty before using GetClassIds.");
156     }
157   }
158   for (size_t i = 0; i < image_label_pairs_.size(); ++i) {
159     (*cls_ids)[image_label_pairs_[i]->second].push_back(i);
160   }
161   for (auto &pair : (*cls_ids)) {
162     pair.second.shrink_to_fit();
163   }
164   return Status::OK();
165 }
166 
167 // Worker Entry for pre-scanning all the folders and do the 1st level shuffle
168 // Worker pull a file name from folder_name_queue_ (which is a Queue), walks all the images under that foldername
169 // After walking is complete, sort all the file names (relative path to all jpeg files under the same directory )
170 // (Sort is automatically conducted using a set which is implemented using a Red-Black Tree)
171 // Add the sorted filenames in to a queue. The make a pair (foldername, queue<filenames>*),
172 // foldername is used for 2nd level sorting.
173 // FYI: 1st level sorting: sort all images under the same directory.
174 // FYI: 2nd level sorting: sort all folder names
175 // push this pair to image_name_queue (which is again a Queue)
PrescanWorkerEntry(int32_t worker_id)176 Status ImageFolderOp::PrescanWorkerEntry(int32_t worker_id) {
177   TaskManager::FindMe()->Post();
178   std::string folder_name;
179   RETURN_IF_NOT_OK(folder_name_queue_->PopFront(&folder_name));
180   while (folder_name.empty() == false) {
181     Path folder(folder_path_ + folder_name);
182     std::shared_ptr<Path::DirIterator> dirItr = Path::DirIterator::OpenDirectory(&folder);
183     if (folder.Exists() == false || dirItr == nullptr) {
184       RETURN_STATUS_UNEXPECTED("Invalid dataset_dir, " + folder_name + " does not exist or permission denied.");
185     }
186     std::set<std::string> imgs;  // use this for ordering
187     while (dirItr->HasNext()) {
188       Path file = dirItr->Next();
189       if (extensions_.empty() || extensions_.find(file.Extension()) != extensions_.end()) {
190         (void)imgs.insert(file.ToString().substr(dirname_offset_));
191       } else {
192         MS_LOG(WARNING) << DatasetName(true) << " operator unsupported file found: " << file.ToString()
193                         << ", extension: " << file.Extension() << ".";
194       }
195     }
196     FolderImagesPair p = std::make_shared<std::pair<std::string, std::queue<ImageLabelPair>>>();
197     p->first = folder_name;
198     for (const std::string &img : imgs) {
199       p->second.push(std::make_shared<std::pair<std::string, int32_t>>(img, 0));
200     }
201     RETURN_IF_NOT_OK(image_name_queue_->EmplaceBack(p));
202     RETURN_IF_NOT_OK(folder_name_queue_->PopFront(&folder_name));
203   }
204   RETURN_IF_NOT_OK(image_name_queue_->EmplaceBack(nullptr));  // end signal
205   return Status::OK();
206 }
207 
208 // This helper function recursively walks all folder_paths, and send each foldername to folder_name_queue_
209 // if mRecursive == false, don't go into folder of folders
RecursiveWalkFolder(Path * dir)210 Status ImageFolderOp::RecursiveWalkFolder(Path *dir) {
211   RETURN_UNEXPECTED_IF_NULL(dir);
212   std::shared_ptr<Path::DirIterator> dir_itr = Path::DirIterator::OpenDirectory(dir);
213   RETURN_UNEXPECTED_IF_NULL(dir_itr);
214   while (dir_itr->HasNext()) {
215     Path subdir = dir_itr->Next();
216     if (subdir.IsDirectory()) {
217       if (class_index_.empty() ||
218           class_index_.find(subdir.ToString().substr(dirname_offset_ + 1)) != class_index_.end()) {
219         RETURN_IF_NOT_OK(folder_name_queue_->EmplaceBack(subdir.ToString().substr(dirname_offset_)));
220       }
221       if (recursive_ == true) {
222         MS_LOG(ERROR) << "[Internal ERROR] RecursiveWalkFolder(&subdir) functionality is disabled permanently. "
223                       << "No recursive walk of directory will be performed.";
224       }
225     }
226   }
227   return Status::OK();
228 }
229 
230 // A thread that calls RecursiveWalkFolder
StartAsyncWalk()231 Status ImageFolderOp::StartAsyncWalk() {
232   TaskManager::FindMe()->Post();
233   Path dir(folder_path_);
234   if (dir.Exists() == false || dir.IsDirectory() == false) {
235     RETURN_STATUS_UNEXPECTED("Invalid dataset_dir, " + folder_path_ + " may not exist or the path is not a directory.");
236   }
237   dirname_offset_ = folder_path_.length();
238   RETURN_IF_NOT_OK(RecursiveWalkFolder(&dir));
239   // send out num_workers_ end signal to folder_name_queue_, 1 for each worker.
240   // Upon receiving end Signal, worker quits and set another end Signal to image_name_queue.
241   for (int32_t ind = 0; ind < num_workers_; ++ind) {
242     RETURN_IF_NOT_OK(folder_name_queue_->EmplaceBack(""));  // end signal
243   }
244   return Status::OK();
245 }
246 
RegisterAndLaunchThreads()247 Status ImageFolderOp::RegisterAndLaunchThreads() {
248   RETURN_IF_NOT_OK(ParallelOp::RegisterAndLaunchThreads());
249   RETURN_IF_NOT_OK(folder_name_queue_->Register(tree_->AllTasks()));
250   RETURN_IF_NOT_OK(image_name_queue_->Register(tree_->AllTasks()));
251 
252   // The following code launch 3 threads group
253   // 1) A thread that walks all folders and push the folder names to a util:Queue folder_name_queue_.
254   // 2) Workers that pull foldername from folder_name_queue_, walk it and return the sorted images to image_name_queue
255   // 3) Launch main workers that load TensorRows by reading all images
256   RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(Name() + "::WalkDir",
257                                                       std::bind(&ImageFolderOp::StartAsyncWalk, this), nullptr, id()));
258   RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
259                                         std::bind(&ImageFolderOp::PrescanWorkerEntry, this, std::placeholders::_1),
260                                         Name() + "::PrescanWorkerEntry", id()));
261 
262   return Status::OK();
263 }
264 
CountRowsAndClasses(const std::string & path,const std::set<std::string> & exts,int64_t * num_rows,int64_t * num_classes,std::map<std::string,int32_t> class_index)265 Status ImageFolderOp::CountRowsAndClasses(const std::string &path, const std::set<std::string> &exts, int64_t *num_rows,
266                                           int64_t *num_classes, std::map<std::string, int32_t> class_index) {
267   Path dir(path);
268   std::string err_msg = "";
269   int64_t row_cnt = 0;
270   err_msg += (dir.Exists() == false || dir.IsDirectory() == false)
271                ? "Invalid dataset_dir, " + path + " does not exist or the path is not a directory. "
272                : "";
273   err_msg += (num_classes == nullptr && num_rows == nullptr) ? "[Internal ERROR] num_class and num_rows are null." : "";
274   if (err_msg.empty() == false) {
275     RETURN_STATUS_UNEXPECTED(err_msg);
276   }
277   std::queue<std::string> folder_paths;
278   std::shared_ptr<Path::DirIterator> dir_itr = Path::DirIterator::OpenDirectory(&dir);
279   RETURN_UNEXPECTED_IF_NULL(dir_itr);
280   std::unordered_set<std::string> folder_names;
281   while (dir_itr->HasNext()) {
282     Path subdir = dir_itr->Next();
283     if (subdir.IsDirectory()) {
284       folder_paths.push(subdir.ToString());
285       if (!class_index.empty()) {
286         folder_names.insert(subdir.Basename());
287       }
288     }
289   }
290   if (num_classes != nullptr) {
291     // if class index is empty, get everything on disk
292     if (class_index.empty()) {
293       *num_classes = folder_paths.size();
294     } else {
295       for (const auto &p : class_index) {
296         CHECK_FAIL_RETURN_UNEXPECTED(folder_names.find(p.first) != folder_names.end(),
297                                      "Invalid subdirectory, class: " + p.first + " doesn't exist in " + path + " .");
298       }
299       (*num_classes) = class_index.size();
300     }
301   }
302   // return here if only num_class is needed
303   RETURN_OK_IF_TRUE(num_rows == nullptr);
304   while (folder_paths.empty() == false) {
305     Path subdir(folder_paths.front());
306     dir_itr = Path::DirIterator::OpenDirectory(&subdir);
307     if (subdir.Exists() == false || dir_itr == nullptr) {
308       RETURN_STATUS_UNEXPECTED("Invalid subdirectory, ImageFolder Dataset subdirectory: " + subdir.ToString() +
309                                " does not exist or permission denied");
310     }
311     while (dir_itr->HasNext()) {
312       if (exts.empty() || exts.find(dir_itr->Next().Extension()) != exts.end()) {
313         ++row_cnt;
314       }
315     }
316     folder_paths.pop();
317   }
318   (*num_rows) = row_cnt;
319   return Status::OK();
320 }
321 
ComputeColMap()322 Status ImageFolderOp::ComputeColMap() {
323   // Set the column name map (base class field)
324   if (column_name_id_map_.empty()) {
325     for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
326       column_name_id_map_[data_schema_->Column(i).Name()] = i;
327     }
328   } else {
329     MS_LOG(WARNING) << "Column name map is already set!";
330   }
331   return Status::OK();
332 }
333 
334 // Get number of classes
GetNumClasses(int64_t * num_classes)335 Status ImageFolderOp::GetNumClasses(int64_t *num_classes) {
336   RETURN_UNEXPECTED_IF_NULL(num_classes);
337   if (num_classes_ > 0) {
338     *num_classes = num_classes_;
339     return Status::OK();
340   }
341   RETURN_IF_NOT_OK(CountRowsAndClasses(folder_path_, extensions_, nullptr, num_classes, class_index_));
342   num_classes_ = *num_classes;
343   return Status::OK();
344 }
345 
InitPullMode()346 Status ImageFolderOp::InitPullMode() {
347   // to avoid the concurrent and multi end signal in StartAsyncWalk, explicitly set num_workers_ to 1
348   num_workers_ = 1;
349   RETURN_IF_NOT_OK(folder_name_queue_->Register(tree_->AllTasks()));
350   RETURN_IF_NOT_OK(image_name_queue_->Register(tree_->AllTasks()));
351   RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(Name() + "::WalkDir",
352                                                       std::bind(&ImageFolderOp::StartAsyncWalk, this), nullptr, id()));
353   RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
354                                         std::bind(&ImageFolderOp::PrescanWorkerEntry, this, std::placeholders::_1),
355                                         Name() + "::PrescanWorkerEntry", id()));
356   return PrepareData();
357 }
358 
GetClassIndexing(std::vector<std::pair<std::string,std::vector<int32_t>>> * output_class_indexing)359 Status ImageFolderOp::GetClassIndexing(
360   std::vector<std::pair<std::string, std::vector<int32_t>>> *output_class_indexing) {
361   RETURN_UNEXPECTED_IF_NULL(output_class_indexing);
362   output_class_indexing->clear();
363 
364   // if class_index_ exist, return directly
365   if (!class_index_.empty()) {
366     (void)std::transform(
367       class_index_.begin(), class_index_.end(), std::back_inserter(*output_class_indexing),
368       [](const auto &elem) { return std::pair<std::string, std::vector<int32_t>>(elem.first, {elem.second}); });
369     return Status::OK();
370   }
371 
372   // Iter folder path
373   Path dir(folder_path_);
374   if (!dir.Exists() || !dir.IsDirectory()) {
375     RETURN_STATUS_UNEXPECTED("Invalid dataset_dir, " + folder_path_ + " does not exist or not a directory.");
376   }
377 
378   std::shared_ptr<Path::DirIterator> dir_itr = Path::DirIterator::OpenDirectory(&dir);
379   RETURN_UNEXPECTED_IF_NULL(dir_itr);
380   std::vector<std::string> folder_names;
381   while (dir_itr->HasNext()) {
382     Path subdir = dir_itr->Next();
383     if (subdir.IsDirectory()) {
384       folder_names.push_back(subdir.Basename());
385     }
386   }
387   if (folder_names.empty()) {
388     RETURN_STATUS_UNEXPECTED("Invalid data, " + DatasetName(true) +
389                              "Dataset API can't read the data file (interface mismatch or no data found). Check " +
390                              DatasetName() + " file path: " + folder_path_);
391   }
392 
393   std::sort(folder_names.begin(), folder_names.end());
394   int32_t label_count = 0;
395   (void)std::transform(folder_names.begin(), folder_names.end(), std::back_inserter(*output_class_indexing),
396                        [&label_count](const auto &elem) {
397                          auto p = std::pair<std::string, std::vector<int32_t>>(elem, {label_count});
398                          label_count++;
399                          return p;
400                        });
401   return Status::OK();
402 }
403 }  // namespace dataset
404 }  // namespace mindspore
405