1 /**
2 * Copyright 2019-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/datasetops/source/image_folder_op.h"
17 #include <fstream>
18 #include "utils/ms_utils.h"
19 #include "minddata/dataset/core/config_manager.h"
20 #include "minddata/dataset/core/tensor_shape.h"
21 #include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
22 #include "minddata/dataset/engine/db_connector.h"
23 #include "minddata/dataset/engine/execution_tree.h"
24
25 namespace mindspore {
26 namespace dataset {
ImageFolderOp(int32_t num_wkrs,std::string file_dir,int32_t queue_size,bool recursive,bool do_decode,const std::set<std::string> & exts,const std::map<std::string,int32_t> & map,std::unique_ptr<DataSchema> data_schema,std::shared_ptr<SamplerRT> sampler)27 ImageFolderOp::ImageFolderOp(int32_t num_wkrs, std::string file_dir, int32_t queue_size, bool recursive, bool do_decode,
28 const std::set<std::string> &exts, const std::map<std::string, int32_t> &map,
29 std::unique_ptr<DataSchema> data_schema, std::shared_ptr<SamplerRT> sampler)
30 : MappableLeafOp(num_wkrs, queue_size, std::move(sampler)),
31 folder_path_(std::move(file_dir)),
32 recursive_(recursive),
33 decode_(do_decode),
34 extensions_(exts),
35 class_index_(map),
36 data_schema_(std::move(data_schema)),
37 sampler_ind_(0),
38 dirname_offset_(0) {
39 folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size);
40 image_name_queue_ = std::make_unique<Queue<FolderImagesPair>>(num_wkrs * queue_size);
41 io_block_queues_.Init(num_workers_, queue_size);
42 }
43
44 // Master thread that pulls the prescan worker's results.
45 // Keep collecting results until all prescan workers quit
46 // Then consolidate 2 level shuffles together into 1 giant vector
47 // calculate numRows then return
PrescanMasterEntry(const std::string & filedir)48 Status ImageFolderOp::PrescanMasterEntry(const std::string &filedir) {
49 std::vector<FolderImagesPair> v;
50 int64_t cnt = 0;
51 while (cnt != num_workers_) { // count number of end signals
52 FolderImagesPair p;
53 RETURN_IF_NOT_OK(image_name_queue_->PopFront(&p));
54 if (p == nullptr) {
55 cnt++;
56 } else {
57 v.push_back(p);
58 }
59 }
60 std::sort(v.begin(), v.end(),
61 [](const FolderImagesPair &lhs, const FolderImagesPair &rhs) { return lhs->first < rhs->first; });
62 // following loop puts the 2 level of shuffles together into 1 vector
63 for (size_t ind = 0; ind < v.size(); ++ind) {
64 while (v[ind]->second.empty() == false) {
65 MS_ASSERT(!(v[ind]->first.empty())); // make sure that v[ind]->first.substr(1) is not out of bound
66 v[ind]->second.front()->second = class_index_.empty() ? ind : class_index_[v[ind]->first.substr(1)];
67 image_label_pairs_.push_back(v[ind]->second.front());
68 v[ind]->second.pop();
69 }
70 }
71 image_label_pairs_.shrink_to_fit();
72 num_rows_ = image_label_pairs_.size();
73 if (num_rows_ == 0) {
74 RETURN_STATUS_UNEXPECTED("Invalid data, " + DatasetName(true) +
75 "Dataset API can't read the data file (interface mismatch or no data found). Check " +
76 DatasetName() + " file path: " + folder_path_);
77 }
78 // free memory of two queues used for pre-scan
79 folder_name_queue_->Reset();
80 image_name_queue_->Reset();
81 return Status::OK();
82 }
83
84 // Load 1 TensorRow (image,label) using 1 ImageLabelPair. 1 function call produces 1 TensorTow
LoadTensorRow(row_id_type row_id,TensorRow * trow)85 Status ImageFolderOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) {
86 ImageLabelPair pair_ptr = image_label_pairs_[row_id];
87 std::shared_ptr<Tensor> image, label;
88 RETURN_IF_NOT_OK(Tensor::CreateScalar(pair_ptr->second, &label));
89 RETURN_IF_NOT_OK(Tensor::CreateFromFile(folder_path_ + (pair_ptr->first), &image));
90
91 if (decode_ == true) {
92 Status rc = Decode(image, &image);
93 if (rc.IsError()) {
94 std::string err = "Invalid data, failed to decode image: " + folder_path_ + (pair_ptr->first);
95 RETURN_STATUS_UNEXPECTED(err);
96 }
97 }
98 (*trow) = TensorRow(row_id, {std::move(image), std::move(label)});
99 trow->setPath({folder_path_ + (pair_ptr->first), std::string("")});
100 return Status::OK();
101 }
102
Print(std::ostream & out,bool show_all) const103 void ImageFolderOp::Print(std::ostream &out, bool show_all) const {
104 if (!show_all) {
105 // Call the super class for displaying any common 1-liner info
106 ParallelOp::Print(out, show_all);
107 // Then show any custom derived-internal 1-liner info for this op
108 out << "\n";
109 } else {
110 // Call the super class for displaying any common detailed info
111 ParallelOp::Print(out, show_all);
112 // Then show any custom derived-internal stuff
113 out << "\nNumber of rows:" << num_rows_ << "\n"
114 << DatasetName(true) << " directory: " << folder_path_ << "\nDecode: " << (decode_ ? "yes" : "no") << "\n\n";
115 }
116 }
117
118 // Derived from RandomAccessOp
GetClassIds(std::map<int32_t,std::vector<int64_t>> * cls_ids) const119 Status ImageFolderOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
120 if (cls_ids == nullptr || !cls_ids->empty() || image_label_pairs_.empty()) {
121 if (image_label_pairs_.empty()) {
122 RETURN_STATUS_UNEXPECTED("Invalid data, " + DatasetName(true) +
123 "Dataset API can't read the data file(interface mismatch or no data found). Check " +
124 DatasetName() + " file path: " + folder_path_);
125 } else {
126 RETURN_STATUS_UNEXPECTED(
127 "[Internal ERROR], Map containing image-index pair is nullptr or has been set in other place,"
128 "it must be empty before using GetClassIds.");
129 }
130 }
131 for (size_t i = 0; i < image_label_pairs_.size(); ++i) {
132 (*cls_ids)[image_label_pairs_[i]->second].push_back(i);
133 }
134 for (auto &pair : (*cls_ids)) {
135 pair.second.shrink_to_fit();
136 }
137 return Status::OK();
138 }
139
140 // Worker Entry for pre-scanning all the folders and do the 1st level shuffle
141 // Worker pull a file name from folder_name_queue_ (which is a Queue), walks all the images under that foldername
142 // After walking is complete, sort all the file names (relative path to all jpeg files under the same directory )
143 // (Sort is automatically conducted using a set which is implemented using a Red-Black Tree)
144 // Add the sorted filenames in to a queue. The make a pair (foldername, queue<filenames>*),
145 // foldername is used for 2nd level sorting.
146 // FYI: 1st level sorting: sort all images under the same directory.
147 // FYI: 2nd level sorting: sort all folder names
148 // push this pair to image_name_queue (which is again a Queue)
PrescanWorkerEntry(int32_t worker_id)149 Status ImageFolderOp::PrescanWorkerEntry(int32_t worker_id) {
150 TaskManager::FindMe()->Post();
151 std::string folder_name;
152 RETURN_IF_NOT_OK(folder_name_queue_->PopFront(&folder_name));
153 while (folder_name.empty() == false) {
154 Path folder(folder_path_ + folder_name);
155 std::shared_ptr<Path::DirIterator> dirItr = Path::DirIterator::OpenDirectory(&folder);
156 if (folder.Exists() == false || dirItr == nullptr) {
157 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open " + DatasetName() + ": " + folder_name);
158 }
159 std::set<std::string> imgs; // use this for ordering
160 while (dirItr->HasNext()) {
161 Path file = dirItr->Next();
162 if (extensions_.empty() || extensions_.find(file.Extension()) != extensions_.end()) {
163 (void)imgs.insert(file.ToString().substr(dirname_offset_));
164 } else {
165 MS_LOG(WARNING) << DatasetName(true) << " operator unsupported file found: " << file.ToString()
166 << ", extension: " << file.Extension() << ".";
167 }
168 }
169 FolderImagesPair p = std::make_shared<std::pair<std::string, std::queue<ImageLabelPair>>>();
170 p->first = folder_name;
171 for (const std::string &img : imgs) {
172 p->second.push(std::make_shared<std::pair<std::string, int32_t>>(img, 0));
173 }
174 RETURN_IF_NOT_OK(image_name_queue_->EmplaceBack(p));
175 RETURN_IF_NOT_OK(folder_name_queue_->PopFront(&folder_name));
176 }
177 RETURN_IF_NOT_OK(image_name_queue_->EmplaceBack(nullptr)); // end signal
178 return Status::OK();
179 }
180
181 // This helper function recursively walks all folder_paths, and send each foldername to folder_name_queue_
182 // if mRecursive == false, don't go into folder of folders
RecursiveWalkFolder(Path * dir)183 Status ImageFolderOp::RecursiveWalkFolder(Path *dir) {
184 std::shared_ptr<Path::DirIterator> dir_itr = Path::DirIterator::OpenDirectory(dir);
185 RETURN_UNEXPECTED_IF_NULL(dir_itr);
186 while (dir_itr->HasNext()) {
187 Path subdir = dir_itr->Next();
188 if (subdir.IsDirectory()) {
189 if (class_index_.empty() ||
190 class_index_.find(subdir.ToString().substr(dirname_offset_ + 1)) != class_index_.end()) {
191 RETURN_IF_NOT_OK(folder_name_queue_->EmplaceBack(subdir.ToString().substr(dirname_offset_)));
192 }
193 if (recursive_ == true) {
194 MS_LOG(ERROR) << "RecursiveWalkFolder(&subdir) functionality is disabled permanently. No recursive walk of "
195 << "directory will be performed.";
196 }
197 }
198 }
199 return Status::OK();
200 }
201
202 // A thread that calls RecursiveWalkFolder
StartAsyncWalk()203 Status ImageFolderOp::StartAsyncWalk() {
204 TaskManager::FindMe()->Post();
205 Path dir(folder_path_);
206 if (dir.Exists() == false || dir.IsDirectory() == false) {
207 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open " + DatasetName() + ": " + folder_path_);
208 }
209 dirname_offset_ = folder_path_.length();
210 RETURN_IF_NOT_OK(RecursiveWalkFolder(&dir));
211 // send out num_workers_ end signal to folder_name_queue_, 1 for each worker.
212 // Upon receiving end Signal, worker quits and set another end Signal to image_name_queue.
213 for (int32_t ind = 0; ind < num_workers_; ++ind) {
214 RETURN_IF_NOT_OK(folder_name_queue_->EmplaceBack("")); // end signal
215 }
216 return Status::OK();
217 }
218
LaunchThreadsAndInitOp()219 Status ImageFolderOp::LaunchThreadsAndInitOp() {
220 RETURN_UNEXPECTED_IF_NULL(tree_);
221 // Registers QueueList and individual Queues for interrupt services
222 RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
223 RETURN_IF_NOT_OK(folder_name_queue_->Register(tree_->AllTasks()));
224 RETURN_IF_NOT_OK(image_name_queue_->Register(tree_->AllTasks()));
225 RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
226 // The following code launch 3 threads group
227 // 1) A thread that walks all folders and push the folder names to a util:Queue folder_name_queue_.
228 // 2) Workers that pull foldername from folder_name_queue_, walk it and return the sorted images to image_name_queue
229 // 3) Launch main workers that load TensorRows by reading all images
230 RETURN_IF_NOT_OK(
231 tree_->AllTasks()->CreateAsyncTask("walk dir", std::bind(&ImageFolderOp::StartAsyncWalk, this), nullptr, id()));
232 RETURN_IF_NOT_OK(tree_->LaunchWorkers(num_workers_,
233 std::bind(&ImageFolderOp::PrescanWorkerEntry, this, std::placeholders::_1),
234 Name() + "::PrescanWorkerEntry", id()));
235 RETURN_IF_NOT_OK(tree_->LaunchWorkers(
236 num_workers_, std::bind(&ImageFolderOp::WorkerEntry, this, std::placeholders::_1), Name() + "::WorkerEntry", id()));
237 TaskManager::FindMe()->Post();
238 // The order of the following 2 functions must not be changed!
239 RETURN_IF_NOT_OK(this->PrescanMasterEntry(folder_path_)); // Master thread of pre-scan workers, blocking
240 RETURN_IF_NOT_OK(this->InitSampler()); // pass numRows to Sampler
241 return Status::OK();
242 }
243
CountRowsAndClasses(const std::string & path,const std::set<std::string> & exts,int64_t * num_rows,int64_t * num_classes,std::map<std::string,int32_t> class_index)244 Status ImageFolderOp::CountRowsAndClasses(const std::string &path, const std::set<std::string> &exts, int64_t *num_rows,
245 int64_t *num_classes, std::map<std::string, int32_t> class_index) {
246 Path dir(path);
247 std::string err_msg = "";
248 int64_t row_cnt = 0;
249 err_msg += (dir.Exists() == false || dir.IsDirectory() == false)
250 ? "Invalid parameter, input path is invalid or not set, path: " + path
251 : "";
252 err_msg +=
253 (num_classes == nullptr && num_rows == nullptr) ? "Invalid parameter, num_class and num_rows are null.\n" : "";
254 if (err_msg.empty() == false) {
255 RETURN_STATUS_UNEXPECTED(err_msg);
256 }
257 std::queue<std::string> folder_paths;
258 std::shared_ptr<Path::DirIterator> dir_itr = Path::DirIterator::OpenDirectory(&dir);
259 std::unordered_set<std::string> folder_names;
260 while (dir_itr->HasNext()) {
261 Path subdir = dir_itr->Next();
262 if (subdir.IsDirectory()) {
263 folder_paths.push(subdir.ToString());
264 if (!class_index.empty()) folder_names.insert(subdir.Basename());
265 }
266 }
267 if (num_classes != nullptr) {
268 // if class index is empty, get everything on disk
269 if (class_index.empty()) {
270 *num_classes = folder_paths.size();
271 } else {
272 for (const auto &p : class_index) {
273 CHECK_FAIL_RETURN_UNEXPECTED(folder_names.find(p.first) != folder_names.end(),
274 "Invalid parameter, folder: " + p.first + " doesn't exist in " + path + " .");
275 }
276 (*num_classes) = class_index.size();
277 }
278 }
279 // return here if only num_class is needed
280 RETURN_OK_IF_TRUE(num_rows == nullptr);
281 while (folder_paths.empty() == false) {
282 Path subdir(folder_paths.front());
283 dir_itr = Path::DirIterator::OpenDirectory(&subdir);
284 if (subdir.Exists() == false || dir_itr == nullptr) {
285 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open folder: " + subdir.ToString());
286 }
287 while (dir_itr->HasNext()) {
288 if (exts.empty() || exts.find(subdir.Extension()) != exts.end()) {
289 ++row_cnt;
290 }
291 }
292 folder_paths.pop();
293 }
294 (*num_rows) = row_cnt;
295 return Status::OK();
296 }
297
ComputeColMap()298 Status ImageFolderOp::ComputeColMap() {
299 // Set the column name map (base class field)
300 if (column_name_id_map_.empty()) {
301 for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
302 column_name_id_map_[data_schema_->Column(i).Name()] = i;
303 }
304 } else {
305 MS_LOG(WARNING) << "Column name map is already set!";
306 }
307 return Status::OK();
308 }
309
310 // Get number of classes
GetNumClasses(int64_t * num_classes)311 Status ImageFolderOp::GetNumClasses(int64_t *num_classes) {
312 if (num_classes_ > 0) {
313 *num_classes = num_classes_;
314 return Status::OK();
315 }
316 RETURN_IF_NOT_OK(CountRowsAndClasses(folder_path_, extensions_, nullptr, num_classes, class_index_));
317 num_classes_ = *num_classes;
318 return Status::OK();
319 }
320 } // namespace dataset
321 } // namespace mindspore
322