• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/datasetops/source/image_folder_op.h"
17 #include <fstream>
18 #include "utils/ms_utils.h"
19 #include "minddata/dataset/core/config_manager.h"
20 #include "minddata/dataset/core/tensor_shape.h"
21 #include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
22 #include "minddata/dataset/engine/db_connector.h"
23 #include "minddata/dataset/engine/execution_tree.h"
24 
25 namespace mindspore {
26 namespace dataset {
ImageFolderOp(int32_t num_wkrs,std::string file_dir,int32_t queue_size,bool recursive,bool do_decode,const std::set<std::string> & exts,const std::map<std::string,int32_t> & map,std::unique_ptr<DataSchema> data_schema,std::shared_ptr<SamplerRT> sampler)27 ImageFolderOp::ImageFolderOp(int32_t num_wkrs, std::string file_dir, int32_t queue_size, bool recursive, bool do_decode,
28                              const std::set<std::string> &exts, const std::map<std::string, int32_t> &map,
29                              std::unique_ptr<DataSchema> data_schema, std::shared_ptr<SamplerRT> sampler)
30     : MappableLeafOp(num_wkrs, queue_size, std::move(sampler)),
31       folder_path_(std::move(file_dir)),
32       recursive_(recursive),
33       decode_(do_decode),
34       extensions_(exts),
35       class_index_(map),
36       data_schema_(std::move(data_schema)),
37       sampler_ind_(0),
38       dirname_offset_(0) {
39   folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size);
40   image_name_queue_ = std::make_unique<Queue<FolderImagesPair>>(num_wkrs * queue_size);
41   io_block_queues_.Init(num_workers_, queue_size);
42 }
43 
44 // Master thread that pulls the prescan worker's results.
45 // Keep collecting results until all prescan workers quit
46 // Then consolidate 2 level shuffles together into 1 giant vector
47 // calculate numRows then return
PrescanMasterEntry(const std::string & filedir)48 Status ImageFolderOp::PrescanMasterEntry(const std::string &filedir) {
49   std::vector<FolderImagesPair> v;
50   int64_t cnt = 0;
51   while (cnt != num_workers_) {  // count number of end signals
52     FolderImagesPair p;
53     RETURN_IF_NOT_OK(image_name_queue_->PopFront(&p));
54     if (p == nullptr) {
55       cnt++;
56     } else {
57       v.push_back(p);
58     }
59   }
60   std::sort(v.begin(), v.end(),
61             [](const FolderImagesPair &lhs, const FolderImagesPair &rhs) { return lhs->first < rhs->first; });
62   // following loop puts the 2 level of shuffles together into 1 vector
63   for (size_t ind = 0; ind < v.size(); ++ind) {
64     while (v[ind]->second.empty() == false) {
65       MS_ASSERT(!(v[ind]->first.empty()));  // make sure that v[ind]->first.substr(1) is not out of bound
66       v[ind]->second.front()->second = class_index_.empty() ? ind : class_index_[v[ind]->first.substr(1)];
67       image_label_pairs_.push_back(v[ind]->second.front());
68       v[ind]->second.pop();
69     }
70   }
71   image_label_pairs_.shrink_to_fit();
72   num_rows_ = image_label_pairs_.size();
73   if (num_rows_ == 0) {
74     RETURN_STATUS_UNEXPECTED("Invalid data, " + DatasetName(true) +
75                              "Dataset API can't read the data file (interface mismatch or no data found). Check " +
76                              DatasetName() + " file path: " + folder_path_);
77   }
78   // free memory of two queues used for pre-scan
79   folder_name_queue_->Reset();
80   image_name_queue_->Reset();
81   return Status::OK();
82 }
83 
84 // Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorTow
LoadTensorRow(row_id_type row_id,TensorRow * trow)85 Status ImageFolderOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
86   ImageLabelPair pair_ptr = image_label_pairs_[row_id];
87   std::shared_ptr<Tensor> image, label;
88   RETURN_IF_NOT_OK(Tensor::CreateScalar(pair_ptr->second, &label));
89   RETURN_IF_NOT_OK(Tensor::CreateFromFile(folder_path_ + (pair_ptr->first), &image));
90 
91   if (decode_ == true) {
92     Status rc = Decode(image, &image);
93     if (rc.IsError()) {
94       std::string err = "Invalid data, failed to decode image: " + folder_path_ + (pair_ptr->first);
95       RETURN_STATUS_UNEXPECTED(err);
96     }
97   }
98   (*trow) = TensorRow(row_id, {std::move(image), std::move(label)});
99   trow->setPath({folder_path_ + (pair_ptr->first), std::string("")});
100   return Status::OK();
101 }
102 
Print(std::ostream & out,bool show_all) const103 void ImageFolderOp::Print(std::ostream &out, bool show_all) const {
104   if (!show_all) {
105     // Call the super class for displaying any common 1-liner info
106     ParallelOp::Print(out, show_all);
107     // Then show any custom derived-internal 1-liner info for this op
108     out << "\n";
109   } else {
110     // Call the super class for displaying any common detailed info
111     ParallelOp::Print(out, show_all);
112     // Then show any custom derived-internal stuff
113     out << "\nNumber of rows:" << num_rows_ << "\n"
114         << DatasetName(true) << " directory: " << folder_path_ << "\nDecode: " << (decode_ ? "yes" : "no") << "\n\n";
115   }
116 }
117 
118 // Derived from RandomAccessOp
GetClassIds(std::map<int32_t,std::vector<int64_t>> * cls_ids) const119 Status ImageFolderOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
120   if (cls_ids == nullptr || !cls_ids->empty() || image_label_pairs_.empty()) {
121     if (image_label_pairs_.empty()) {
122       RETURN_STATUS_UNEXPECTED("Invalid data, " + DatasetName(true) +
123                                "Dataset API can't read the data file(interface mismatch or no data found). Check " +
124                                DatasetName() + " file path: " + folder_path_);
125     } else {
126       RETURN_STATUS_UNEXPECTED(
127         "[Internal ERROR], Map containing image-index pair is nullptr or has been set in other place,"
128         "it must be empty before using GetClassIds.");
129     }
130   }
131   for (size_t i = 0; i < image_label_pairs_.size(); ++i) {
132     (*cls_ids)[image_label_pairs_[i]->second].push_back(i);
133   }
134   for (auto &pair : (*cls_ids)) {
135     pair.second.shrink_to_fit();
136   }
137   return Status::OK();
138 }
139 
140 // Worker Entry for pre-scanning all the folders and do the 1st level shuffle
141 // Worker pull a file name from folder_name_queue_ (which is a Queue), walks all the images under that foldername
142 // After walking is complete, sort all the file names (relative path to all jpeg files under the same directory )
143 // (Sort is automatically conducted using a set which is implemented using a Red-Black Tree)
144 // Add the sorted filenames in to a queue. The make a pair (foldername, queue<filenames>*),
145 // foldername is used for 2nd level sorting.
146 // FYI: 1st level sorting: sort all images under the same directory.
147 // FYI: 2nd level sorting: sort all folder names
148 // push this pair to image_name_queue (which is again a Queue)
PrescanWorkerEntry(int32_t worker_id)149 Status ImageFolderOp::PrescanWorkerEntry(int32_t worker_id) {
150   TaskManager::FindMe()->Post();
151   std::string folder_name;
152   RETURN_IF_NOT_OK(folder_name_queue_->PopFront(&folder_name));
153   while (folder_name.empty() == false) {
154     Path folder(folder_path_ + folder_name);
155     std::shared_ptr<Path::DirIterator> dirItr = Path::DirIterator::OpenDirectory(&folder);
156     if (folder.Exists() == false || dirItr == nullptr) {
157       RETURN_STATUS_UNEXPECTED("Invalid file, failed to open " + DatasetName() + ": " + folder_name);
158     }
159     std::set<std::string> imgs;  // use this for ordering
160     while (dirItr->HasNext()) {
161       Path file = dirItr->Next();
162       if (extensions_.empty() || extensions_.find(file.Extension()) != extensions_.end()) {
163         (void)imgs.insert(file.ToString().substr(dirname_offset_));
164       } else {
165         MS_LOG(WARNING) << DatasetName(true) << " operator unsupported file found: " << file.ToString()
166                         << ", extension: " << file.Extension() << ".";
167       }
168     }
169     FolderImagesPair p = std::make_shared<std::pair<std::string, std::queue<ImageLabelPair>>>();
170     p->first = folder_name;
171     for (const std::string &img : imgs) {
172       p->second.push(std::make_shared<std::pair<std::string, int32_t>>(img, 0));
173     }
174     RETURN_IF_NOT_OK(image_name_queue_->EmplaceBack(p));
175     RETURN_IF_NOT_OK(folder_name_queue_->PopFront(&folder_name));
176   }
177   RETURN_IF_NOT_OK(image_name_queue_->EmplaceBack(nullptr));  // end signal
178   return Status::OK();
179 }
180 
181 // This helper function recursively walks all folder_paths, and send each foldername to folder_name_queue_
182 // if mRecursive == false, don't go into folder of folders
RecursiveWalkFolder(Path * dir)183 Status ImageFolderOp::RecursiveWalkFolder(Path *dir) {
184   std::shared_ptr<Path::DirIterator> dir_itr = Path::DirIterator::OpenDirectory(dir);
185   RETURN_UNEXPECTED_IF_NULL(dir_itr);
186   while (dir_itr->HasNext()) {
187     Path subdir = dir_itr->Next();
188     if (subdir.IsDirectory()) {
189       if (class_index_.empty() ||
190           class_index_.find(subdir.ToString().substr(dirname_offset_ + 1)) != class_index_.end()) {
191         RETURN_IF_NOT_OK(folder_name_queue_->EmplaceBack(subdir.ToString().substr(dirname_offset_)));
192       }
193       if (recursive_ == true) {
194         MS_LOG(ERROR) << "RecursiveWalkFolder(&subdir) functionality is disabled permanently. No recursive walk of "
195                       << "directory will be performed.";
196       }
197     }
198   }
199   return Status::OK();
200 }
201 
202 // A thread that calls RecursiveWalkFolder
StartAsyncWalk()203 Status ImageFolderOp::StartAsyncWalk() {
204   TaskManager::FindMe()->Post();
205   Path dir(folder_path_);
206   if (dir.Exists() == false || dir.IsDirectory() == false) {
207     RETURN_STATUS_UNEXPECTED("Invalid file, failed to open " + DatasetName() + ": " + folder_path_);
208   }
209   dirname_offset_ = folder_path_.length();
210   RETURN_IF_NOT_OK(RecursiveWalkFolder(&dir));
211   // send out num_workers_ end signal to folder_name_queue_, 1 for each worker.
212   // Upon receiving end Signal, worker quits and set another end Signal to image_name_queue.
213   for (int32_t ind = 0; ind < num_workers_; ++ind) {
214     RETURN_IF_NOT_OK(folder_name_queue_->EmplaceBack(""));  // end signal
215   }
216   return Status::OK();
217 }
218 
LaunchThreadsAndInitOp()219 Status ImageFolderOp::LaunchThreadsAndInitOp() {
220   RETURN_UNEXPECTED_IF_NULL(tree_);
221   // Registers QueueList and individual Queues for interrupt services
222   RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
223   RETURN_IF_NOT_OK(folder_name_queue_->Register(tree_->AllTasks()));
224   RETURN_IF_NOT_OK(image_name_queue_->Register(tree_->AllTasks()));
225   RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
226   // The following code launch 3 threads group
227   // 1) A thread that walks all folders and push the folder names to a util:Queue folder_name_queue_.
228   // 2) Workers that pull foldername from folder_name_queue_, walk it and return the sorted images to image_name_queue
229   // 3) Launch main workers that load TensorRows by reading all images
230   RETURN_IF_NOT_OK(
231     tree_->AllTasks()->CreateAsyncTask("walk dir", std::bind(&ImageFolderOp::StartAsyncWalk, this), nullptr, id()));
232   RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
233                                         std::bind(&ImageFolderOp::PrescanWorkerEntry, this, std::placeholders::_1),
234                                         Name() + "::PrescanWorkerEntry", id()));
235   RETURN_IF_NOT_OK(tree_->LaunchWorkers(
236     num_workers_, std::bind(&ImageFolderOp::WorkerEntry, this, std::placeholders::_1), Name() + "::WorkerEntry", id()));
237   TaskManager::FindMe()->Post();
238   // The order of the following 2 functions must not be changed!
239   RETURN_IF_NOT_OK(this->PrescanMasterEntry(folder_path_));  // Master thread of pre-scan workers, blocking
240   RETURN_IF_NOT_OK(this->InitSampler());                     // pass numRows to Sampler
241   return Status::OK();
242 }
243 
CountRowsAndClasses(const std::string & path,const std::set<std::string> & exts,int64_t * num_rows,int64_t * num_classes,std::map<std::string,int32_t> class_index)244 Status ImageFolderOp::CountRowsAndClasses(const std::string &path, const std::set<std::string> &exts, int64_t *num_rows,
245                                           int64_t *num_classes, std::map<std::string, int32_t> class_index) {
246   Path dir(path);
247   std::string err_msg = "";
248   int64_t row_cnt = 0;
249   err_msg += (dir.Exists() == false || dir.IsDirectory() == false)
250                ? "Invalid parameter, input path is invalid or not set, path: " + path
251                : "";
252   err_msg +=
253     (num_classes == nullptr && num_rows == nullptr) ? "Invalid parameter, num_class and num_rows are null.\n" : "";
254   if (err_msg.empty() == false) {
255     RETURN_STATUS_UNEXPECTED(err_msg);
256   }
257   std::queue<std::string> folder_paths;
258   std::shared_ptr<Path::DirIterator> dir_itr = Path::DirIterator::OpenDirectory(&dir);
259   std::unordered_set<std::string> folder_names;
260   while (dir_itr->HasNext()) {
261     Path subdir = dir_itr->Next();
262     if (subdir.IsDirectory()) {
263       folder_paths.push(subdir.ToString());
264       if (!class_index.empty()) folder_names.insert(subdir.Basename());
265     }
266   }
267   if (num_classes != nullptr) {
268     // if class index is empty, get everything on disk
269     if (class_index.empty()) {
270       *num_classes = folder_paths.size();
271     } else {
272       for (const auto &p : class_index) {
273         CHECK_FAIL_RETURN_UNEXPECTED(folder_names.find(p.first) != folder_names.end(),
274                                      "Invalid parameter, folder: " + p.first + " doesn't exist in " + path + " .");
275       }
276       (*num_classes) = class_index.size();
277     }
278   }
279   // return here if only num_class is needed
280   RETURN_OK_IF_TRUE(num_rows == nullptr);
281   while (folder_paths.empty() == false) {
282     Path subdir(folder_paths.front());
283     dir_itr = Path::DirIterator::OpenDirectory(&subdir);
284     if (subdir.Exists() == false || dir_itr == nullptr) {
285       RETURN_STATUS_UNEXPECTED("Invalid file, failed to open folder: " + subdir.ToString());
286     }
287     while (dir_itr->HasNext()) {
288       if (exts.empty() || exts.find(subdir.Extension()) != exts.end()) {
289         ++row_cnt;
290       }
291     }
292     folder_paths.pop();
293   }
294   (*num_rows) = row_cnt;
295   return Status::OK();
296 }
297 
ComputeColMap()298 Status ImageFolderOp::ComputeColMap() {
299   // Set the column name map (base class field)
300   if (column_name_id_map_.empty()) {
301     for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
302       column_name_id_map_[data_schema_->Column(i).Name()] = i;
303     }
304   } else {
305     MS_LOG(WARNING) << "Column name map is already set!";
306   }
307   return Status::OK();
308 }
309 
310 // Get number of classes
GetNumClasses(int64_t * num_classes)311 Status ImageFolderOp::GetNumClasses(int64_t *num_classes) {
312   if (num_classes_ > 0) {
313     *num_classes = num_classes_;
314     return Status::OK();
315   }
316   RETURN_IF_NOT_OK(CountRowsAndClasses(folder_path_, extensions_, nullptr, num_classes, class_index_));
317   num_classes_ = *num_classes;
318   return Status::OK();
319 }
320 }  // namespace dataset
321 }  // namespace mindspore
322