• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/engine/datasetops/source/cifar_op.h"
17 
18 #include <algorithm>
19 #include <fstream>
20 #include <utility>
21 
22 #include "minddata/dataset/core/config_manager.h"
23 #include "minddata/dataset/core/tensor_shape.h"
24 #include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
25 #include "minddata/dataset/engine/db_connector.h"
26 #include "minddata/dataset/engine/execution_tree.h"
27 #include "utils/ms_utils.h"
28 
29 namespace mindspore {
30 namespace dataset {
31 
// Geometry of a single CIFAR image as used by the parser below: 32 x 32 pixels, 3 channels.
constexpr uint32_t kCifarImageHeight{32};
constexpr uint32_t kCifarImageWidth{32};
constexpr uint32_t kCifarImageChannel{3};
// Number of records packed into one raw block passed from the file reader to ParseCifarData().
constexpr uint32_t kCifarBlockImageNum{5};
// Pixel bytes of one image; label bytes are not included.
constexpr uint32_t kCifarImageSize{kCifarImageHeight * kCifarImageWidth * kCifarImageChannel};
CifarOp(CifarType type,const std::string & usage,int32_t num_works,const std::string & file_dir,int32_t queue_size,std::unique_ptr<DataSchema> data_schema,std::shared_ptr<SamplerRT> sampler)37 CifarOp::CifarOp(CifarType type, const std::string &usage, int32_t num_works, const std::string &file_dir,
38                  int32_t queue_size, std::unique_ptr<DataSchema> data_schema, std::shared_ptr<SamplerRT> sampler)
39     : MappableLeafOp(num_works, queue_size, std::move(sampler)),
40       cifar_type_(type),
41       usage_(usage),
42       folder_path_(file_dir),
43       data_schema_(std::move(data_schema)) {
44   constexpr uint64_t kUtilQueueSize = 512;
45   cifar_raw_data_block_ = std::make_unique<Queue<std::vector<unsigned char>>>(kUtilQueueSize);
46   io_block_queues_.Init(num_workers_, queue_size);
47 }
48 
LaunchThreadsAndInitOp()49 Status CifarOp::LaunchThreadsAndInitOp() {
50   if (tree_ == nullptr) {
51     RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set.");
52   }
53   RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
54   RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
55   RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
56     "Get cifar data block", std::bind(&CifarOp::ReadCifarBlockDataAsync, this), nullptr, id()));
57   RETURN_IF_NOT_OK(
58     tree_->LaunchWorkers(num_workers_, std::bind(&CifarOp::WorkerEntry, this, std::placeholders::_1), "", id()));
59   TaskManager::FindMe()->Post();
60   // The order of the following 2 functions must not be changed!
61   RETURN_IF_NOT_OK(ParseCifarData());  // Parse cifar data and get num rows, blocking
62   RETURN_IF_NOT_OK(InitSampler());     // Pass numRows to Sampler
63   return Status::OK();
64 }
65 
66 // Load 1 TensorRow (image,label). 1 function call produces 1 TensorTow
LoadTensorRow(row_id_type index,TensorRow * trow)67 Status CifarOp::LoadTensorRow(row_id_type index, TensorRow *trow) {
68   std::shared_ptr<Tensor> label;
69   std::shared_ptr<Tensor> fine_label;
70   std::shared_ptr<Tensor> ori_image = cifar_image_label_pairs_[index].first;
71   std::shared_ptr<Tensor> copy_image;
72   uint64_t path_index = std::ceil(index / kCifarBlockImageNum);
73   RETURN_IF_NOT_OK(Tensor::CreateFromTensor(ori_image, &copy_image));
74   RETURN_IF_NOT_OK(Tensor::CreateScalar(cifar_image_label_pairs_[index].second[0], &label));
75 
76   if (cifar_image_label_pairs_[index].second.size() > 1) {
77     RETURN_IF_NOT_OK(Tensor::CreateScalar(cifar_image_label_pairs_[index].second[1], &fine_label));
78     (*trow) = TensorRow(index, {copy_image, std::move(label), std::move(fine_label)});
79     // Add file path info
80     trow->setPath({path_record_[path_index], path_record_[path_index], path_record_[path_index]});
81   } else {
82     (*trow) = TensorRow(index, {copy_image, std::move(label)});
83     // Add file path info
84     trow->setPath({path_record_[path_index], path_record_[path_index]});
85   }
86 
87   return Status::OK();
88 }
89 
Print(std::ostream & out,bool show_all) const90 void CifarOp::Print(std::ostream &out, bool show_all) const {
91   if (!show_all) {
92     // Call the super class for displaying any common 1-liner info
93     ParallelOp::Print(out, show_all);
94     // Then show any custom derived-internal 1-liner info for this op
95     out << "\n";
96   } else {
97     // Call the super class for displaying any common detailed info
98     ParallelOp::Print(out, show_all);
99     // Then show any custom derived-internal stuff
100     out << "\nNumber of rows:" << num_rows_ << "\nCifar directory: " << folder_path_ << "\n\n";
101   }
102 }
103 
ReadCifarBlockDataAsync()104 Status CifarOp::ReadCifarBlockDataAsync() {
105   TaskManager::FindMe()->Post();
106   RETURN_IF_NOT_OK(GetCifarFiles());
107   if (cifar_type_ == kCifar10) {
108     RETURN_IF_NOT_OK(ReadCifar10BlockData());
109   } else {
110     RETURN_IF_NOT_OK(ReadCifar100BlockData());
111   }
112 
113   return Status::OK();
114 }
115 
ReadCifar10BlockData()116 Status CifarOp::ReadCifar10BlockData() {
117   // CIFAR 10 has 6 bin files. data_batch_1.bin ... data_batch_5.bin and 1 test_batch.bin file
118   // each of the file has exactly 10K images and labels and size is 30,730 KB
119   // each image has the dimension of 32 x 32 x 3 = 3072 plus 1 label (label has 10 classes) so each row has 3073 bytes
120   constexpr uint32_t num_cifar10_records = 10000;
121   uint32_t block_size = (kCifarImageSize + 1) * kCifarBlockImageNum;  // about 2M
122   std::vector<unsigned char> image_data(block_size * sizeof(unsigned char), 0);
123   for (auto &file : cifar_files_) {
124     // check the validity of the file path
125     Path file_path(file);
126     CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(),
127                                  "Invalid file, failed to find cifar10 file: " + file);
128     std::string file_name = file_path.Basename();
129 
130     if (usage_ == "train") {
131       if (file_name.find("data_batch") == std::string::npos) continue;
132     } else if (usage_ == "test") {
133       if (file_name.find("test_batch") == std::string::npos) continue;
134     } else {  // get all the files that contain the word batch, aka any cifar 100 files
135       if (file_name.find("batch") == std::string::npos) continue;
136     }
137 
138     std::ifstream in(file, std::ios::binary);
139     CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open cifar10 file: " + file +
140                                                  ", make sure file not damaged or permission denied.");
141 
142     for (uint32_t index = 0; index < num_cifar10_records / kCifarBlockImageNum; ++index) {
143       (void)in.read(reinterpret_cast<char *>(&(image_data[0])), block_size * sizeof(unsigned char));
144       CHECK_FAIL_RETURN_UNEXPECTED(!in.fail(), "Invalid data, failed to read data from cifar10 file: " + file +
145                                                  ", re-download dataset(make sure it is CIFAR-10 binary version).");
146       (void)cifar_raw_data_block_->EmplaceBack(image_data);
147       // Add file path info
148       path_record_.push_back(file);
149     }
150     in.close();
151   }
152   (void)cifar_raw_data_block_->EmplaceBack(std::vector<unsigned char>());  // end block
153 
154   return Status::OK();
155 }
156 
ReadCifar100BlockData()157 Status CifarOp::ReadCifar100BlockData() {
158   // CIFAR 100 has 2 bin files. train.bin (60K imgs)  153,700KB and test.bin (30,740KB) (10K imgs)
159   // each img has two labels. Each row then is 32 * 32 *5 + 2 = 3,074 Bytes
160   uint32_t num_cifar100_records = 0;  // test:10000, train:50000
161   constexpr uint32_t num_cifar100_test_records = 10000;
162   constexpr uint32_t num_cifar100_train_records = 50000;
163   uint32_t block_size = (kCifarImageSize + 2) * kCifarBlockImageNum;  // about 2M
164   std::vector<unsigned char> image_data(block_size * sizeof(unsigned char), 0);
165   for (auto &file : cifar_files_) {
166     // check the validity of the file path
167     Path file_path(file);
168     CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(),
169                                  "Invalid file, failed to find cifar100 file: " + file);
170     std::string file_name = file_path.Basename();
171 
172     // if usage is train/test, get only these 2 files
173     if (usage_ == "train" && file_name.find("train") == std::string::npos) continue;
174     if (usage_ == "test" && file_name.find("test") == std::string::npos) continue;
175 
176     if (file_name.find("test") != std::string::npos) {
177       num_cifar100_records = num_cifar100_test_records;
178     } else if (file_name.find("train") != std::string::npos) {
179       num_cifar100_records = num_cifar100_train_records;
180     } else {
181       RETURN_STATUS_UNEXPECTED("Invalid file, Cifar100 train/test file not found in: " + file_name);
182     }
183 
184     std::ifstream in(file, std::ios::binary);
185     CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open cifar100 file: " + file +
186                                                  ", make sure file not damaged or permission denied.");
187 
188     for (uint32_t index = 0; index < num_cifar100_records / kCifarBlockImageNum; index++) {
189       (void)in.read(reinterpret_cast<char *>(&(image_data[0])), block_size * sizeof(unsigned char));
190       CHECK_FAIL_RETURN_UNEXPECTED(!in.fail(), "Invalid data, failed to read data from cifar100 file: " + file +
191                                                  ", re-download dataset(make sure it is CIFAR-100 binary version).");
192       (void)cifar_raw_data_block_->EmplaceBack(image_data);
193       // Add file path info
194       path_record_.push_back(file);
195     }
196     in.close();
197   }
198   (void)cifar_raw_data_block_->EmplaceBack(std::vector<unsigned char>());  // block end
199   return Status::OK();
200 }
201 
GetCifarFiles()202 Status CifarOp::GetCifarFiles() {
203   const std::string extension = ".bin";
204   Path dir_path(folder_path_);
205   auto dirIt = Path::DirIterator::OpenDirectory(&dir_path);
206   if (dirIt) {
207     while (dirIt->HasNext()) {
208       Path file = dirIt->Next();
209       if (file.Extension() == extension) {
210         cifar_files_.push_back(file.ToString());
211       }
212     }
213   } else {
214     RETURN_STATUS_UNEXPECTED("Invalid file, failed to open directory: " + dir_path.ToString() +
215                              ", make sure file not damaged or permission denied.");
216   }
217   CHECK_FAIL_RETURN_UNEXPECTED(!cifar_files_.empty(), "Invalid file, no .bin files found under " + folder_path_);
218   std::sort(cifar_files_.begin(), cifar_files_.end());
219   return Status::OK();
220 }
221 
ParseCifarData()222 Status CifarOp::ParseCifarData() {
223   std::vector<unsigned char> block;
224   RETURN_IF_NOT_OK(cifar_raw_data_block_->PopFront(&block));
225   uint32_t cur_block_index = 0;
226   while (!block.empty()) {
227     for (uint32_t index = 0; index < kCifarBlockImageNum; ++index) {
228       std::vector<uint32_t> labels;
229       uint32_t label = block[cur_block_index++];
230       labels.push_back(label);
231       if (cifar_type_ == kCifar100) {
232         uint32_t fine_label = block[cur_block_index++];
233         labels.push_back(fine_label);
234       }
235 
236       std::shared_ptr<Tensor> image_tensor;
237       RETURN_IF_NOT_OK(Tensor::CreateEmpty(TensorShape({kCifarImageHeight, kCifarImageWidth, kCifarImageChannel}),
238                                            data_schema_->Column(0).Type(), &image_tensor));
239       auto itr = image_tensor->begin<uint8_t>();
240       uint32_t total_pix = kCifarImageHeight * kCifarImageWidth;
241       for (uint32_t pix = 0; pix < total_pix; ++pix) {
242         for (uint32_t ch = 0; ch < kCifarImageChannel; ++ch) {
243           *itr = block[cur_block_index + ch * total_pix + pix];
244           ++itr;
245         }
246       }
247       cur_block_index += total_pix * kCifarImageChannel;
248       cifar_image_label_pairs_.emplace_back(std::make_pair(image_tensor, labels));
249     }
250     RETURN_IF_NOT_OK(cifar_raw_data_block_->PopFront(&block));
251     cur_block_index = 0;
252   }
253   cifar_image_label_pairs_.shrink_to_fit();
254   num_rows_ = cifar_image_label_pairs_.size();
255   if (num_rows_ == 0) {
256     std::string api = cifar_type_ == kCifar10 ? "Cifar10Dataset" : "Cifar100Dataset";
257     RETURN_STATUS_UNEXPECTED("Invalid data, " + api +
258                              " API can't read the data file (interface mismatch or no data found). "
259                              "Check file in directory:" +
260                              folder_path_);
261   }
262   cifar_raw_data_block_->Reset();
263   return Status::OK();
264 }
265 
266 // Derived from RandomAccessOp
GetClassIds(std::map<int32_t,std::vector<int64_t>> * cls_ids) const267 Status CifarOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
268   if (cls_ids == nullptr || !cls_ids->empty()) {
269     RETURN_STATUS_UNEXPECTED(
270       "[Internal ERROR] Map for containing image-index pair is nullptr or has been set in other place,"
271       "it must be empty before using GetClassIds.");
272   }
273 
274   for (uint64_t index = 0; index < cifar_image_label_pairs_.size(); ++index) {
275     uint32_t label = (cifar_image_label_pairs_[index].second)[0];
276     (*cls_ids)[label].push_back(index);
277   }
278 
279   for (auto &pair : (*cls_ids)) {
280     pair.second.shrink_to_fit();
281   }
282   return Status::OK();
283 }
284 
CountTotalRows(const std::string & dir,const std::string & usage,bool isCIFAR10,int64_t * count)285 Status CifarOp::CountTotalRows(const std::string &dir, const std::string &usage, bool isCIFAR10, int64_t *count) {
286   // the logic of counting the number of samples is copied from ReadCifar100Block() and ReadCifar10Block()
287   // Note that this count logic is flawed, should be able to copy the sampler of original CifarOp without state
288   *count = 0;
289   const int64_t num_samples = 0;
290   const int64_t start_index = 0;
291   auto new_sampler = std::make_shared<SequentialSamplerRT>(start_index, num_samples);
292 
293   CifarType type = isCIFAR10 ? kCifar10 : kCifar100;
294   // build a new unique schema object
295   auto new_schema = std::make_unique<DataSchema>();
296   TensorShape scalar = TensorShape::CreateScalar();
297   RETURN_IF_NOT_OK(
298     new_schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1)));
299   if (type == kCifar10) {
300     RETURN_IF_NOT_OK(
301       new_schema->AddColumn(ColDescriptor("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &scalar)));
302   } else {
303     RETURN_IF_NOT_OK(new_schema->AddColumn(
304       ColDescriptor("coarse_label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &scalar)));
305     TensorShape another_scalar = TensorShape::CreateScalar();
306     RETURN_IF_NOT_OK(new_schema->AddColumn(
307       ColDescriptor("fine_label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &another_scalar)));
308   }
309   std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
310   int32_t num_workers = cfg->num_parallel_workers();
311   int32_t op_connect_size = cfg->op_connector_size();
312   std::shared_ptr<CifarOp> op = std::make_shared<CifarOp>(type, usage, num_workers, dir, op_connect_size,
313                                                           std::move(new_schema), std::move(new_sampler));
314 
315   RETURN_IF_NOT_OK(op->GetCifarFiles());
316   if (op->cifar_type_ == kCifar10) {
317     constexpr int64_t num_cifar10_records = 10000;
318     for (auto &file : op->cifar_files_) {
319       Path file_path(file);
320       CHECK_FAIL_RETURN_UNEXPECTED(
321         file_path.Exists() && !file_path.IsDirectory(),
322         "Invalid file, failed to open cifar10 file: " + file + ", make sure file not damaged or permission denied.");
323       std::string file_name = file_path.Basename();
324 
325       if (op->usage_ == "train") {
326         if (file_name.find("data_batch") == std::string::npos) continue;
327       } else if (op->usage_ == "test") {
328         if (file_name.find("test_batch") == std::string::npos) continue;
329       } else {  // get all the files that contain the word batch, aka any cifar 100 files
330         if (file_name.find("batch") == std::string::npos) continue;
331       }
332 
333       std::ifstream in(file, std::ios::binary);
334 
335       CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open cifar10 file: " + file +
336                                                    ", make sure file not damaged or permission denied.");
337       *count = *count + num_cifar10_records;
338     }
339     return Status::OK();
340   } else {
341     const uint32_t kCifar100RecordsPerTestFile = 10000;
342     const uint32_t kCifar100RecordsPerTrainFile = 50000;
343     int64_t num_cifar100_records = 0;
344     for (auto &file : op->cifar_files_) {
345       Path file_path(file);
346       std::string file_name = file_path.Basename();
347 
348       CHECK_FAIL_RETURN_UNEXPECTED(
349         file_path.Exists() && !file_path.IsDirectory(),
350         "Invalid file, failed to find cifar100 file: " + file + ", make sure file not damaged or permission denied.");
351 
352       if (op->usage_ == "train" && file_path.Basename().find("train") == std::string::npos) continue;
353       if (op->usage_ == "test" && file_path.Basename().find("test") == std::string::npos) continue;
354 
355       if (file_name.find("test") != std::string::npos) {
356         num_cifar100_records += kCifar100RecordsPerTestFile;
357       } else if (file_name.find("train") != std::string::npos) {
358         num_cifar100_records += kCifar100RecordsPerTrainFile;
359       }
360       std::ifstream in(file, std::ios::binary);
361       CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open cifar100 file: " + file +
362                                                    ", make sure file not damaged or permission denied.");
363     }
364     *count = num_cifar100_records;
365     return Status::OK();
366   }
367 }
368 
ComputeColMap()369 Status CifarOp::ComputeColMap() {
370   // set the column name map (base class field)
371   if (column_name_id_map_.empty()) {
372     for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
373       column_name_id_map_[data_schema_->Column(i).Name()] = i;
374     }
375   } else {
376     MS_LOG(WARNING) << "Column name map is already set!";
377   }
378   return Status::OK();
379 }
380 
381 }  // namespace dataset
382 }  // namespace mindspore
383