1 /**
2 * Copyright 2019-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/engine/datasetops/source/cifar_op.h"
17
18 #include <algorithm>
19 #include <fstream>
20 #include <utility>
21
22 #include "minddata/dataset/core/config_manager.h"
23 #include "minddata/dataset/core/tensor_shape.h"
24 #include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h"
25 #include "minddata/dataset/engine/db_connector.h"
26 #include "minddata/dataset/engine/execution_tree.h"
27 #include "utils/ms_utils.h"
28
29 namespace mindspore {
30 namespace dataset {
31
// Geometry of one CIFAR image.
constexpr uint32_t kCifarImageHeight{32};
constexpr uint32_t kCifarImageWidth{32};
constexpr uint32_t kCifarImageChannel{3};
// Number of records (label byte(s) + image) grouped into one raw data block.
constexpr uint32_t kCifarBlockImageNum{5};
// Bytes of pixel data in one image: height * width * channels.
constexpr uint32_t kCifarImageSize{kCifarImageHeight * kCifarImageWidth * kCifarImageChannel};
CifarOp(CifarType type,const std::string & usage,int32_t num_works,const std::string & file_dir,int32_t queue_size,std::unique_ptr<DataSchema> data_schema,std::shared_ptr<SamplerRT> sampler)37 CifarOp::CifarOp(CifarType type, const std::string &usage, int32_t num_works, const std::string &file_dir,
38 int32_t queue_size, std::unique_ptr<DataSchema> data_schema, std::shared_ptr<SamplerRT> sampler)
39 : MappableLeafOp(num_works, queue_size, std::move(sampler)),
40 cifar_type_(type),
41 usage_(usage),
42 folder_path_(file_dir),
43 data_schema_(std::move(data_schema)) {
44 constexpr uint64_t kUtilQueueSize = 512;
45 cifar_raw_data_block_ = std::make_unique<Queue<std::vector<unsigned char>>>(kUtilQueueSize);
46 io_block_queues_.Init(num_workers_, queue_size);
47 }
48
LaunchThreadsAndInitOp()49 Status CifarOp::LaunchThreadsAndInitOp() {
50 if (tree_ == nullptr) {
51 RETURN_STATUS_UNEXPECTED("Pipeline init failed, Execution tree not set.");
52 }
53 RETURN_IF_NOT_OK(io_block_queues_.Register(tree_->AllTasks()));
54 RETURN_IF_NOT_OK(wait_for_workers_post_.Register(tree_->AllTasks()));
55 RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
56 "Get cifar data block", std::bind(&CifarOp::ReadCifarBlockDataAsync, this), nullptr, id()));
57 RETURN_IF_NOT_OK(
58 tree_->LaunchWorkers(num_workers_, std::bind(&CifarOp::WorkerEntry, this, std::placeholders::_1), "", id()));
59 TaskManager::FindMe()->Post();
60 // The order of the following 2 functions must not be changed!
61 RETURN_IF_NOT_OK(ParseCifarData()); // Parse cifar data and get num rows, blocking
62 RETURN_IF_NOT_OK(InitSampler()); // Pass numRows to Sampler
63 return Status::OK();
64 }
65
66 // Load 1 TensorRow (image,label). 1 function call produces 1 TensorTow
LoadTensorRow(row_id_type index,TensorRow * trow)67 Status CifarOp::LoadTensorRow(row_id_type index, TensorRow *trow) {
68 std::shared_ptr<Tensor> label;
69 std::shared_ptr<Tensor> fine_label;
70 std::shared_ptr<Tensor> ori_image = cifar_image_label_pairs_[index].first;
71 std::shared_ptr<Tensor> copy_image;
72 uint64_t path_index = std::ceil(index / kCifarBlockImageNum);
73 RETURN_IF_NOT_OK(Tensor::CreateFromTensor(ori_image, ©_image));
74 RETURN_IF_NOT_OK(Tensor::CreateScalar(cifar_image_label_pairs_[index].second[0], &label));
75
76 if (cifar_image_label_pairs_[index].second.size() > 1) {
77 RETURN_IF_NOT_OK(Tensor::CreateScalar(cifar_image_label_pairs_[index].second[1], &fine_label));
78 (*trow) = TensorRow(index, {copy_image, std::move(label), std::move(fine_label)});
79 // Add file path info
80 trow->setPath({path_record_[path_index], path_record_[path_index], path_record_[path_index]});
81 } else {
82 (*trow) = TensorRow(index, {copy_image, std::move(label)});
83 // Add file path info
84 trow->setPath({path_record_[path_index], path_record_[path_index]});
85 }
86
87 return Status::OK();
88 }
89
Print(std::ostream & out,bool show_all) const90 void CifarOp::Print(std::ostream &out, bool show_all) const {
91 if (!show_all) {
92 // Call the super class for displaying any common 1-liner info
93 ParallelOp::Print(out, show_all);
94 // Then show any custom derived-internal 1-liner info for this op
95 out << "\n";
96 } else {
97 // Call the super class for displaying any common detailed info
98 ParallelOp::Print(out, show_all);
99 // Then show any custom derived-internal stuff
100 out << "\nNumber of rows:" << num_rows_ << "\nCifar directory: " << folder_path_ << "\n\n";
101 }
102 }
103
ReadCifarBlockDataAsync()104 Status CifarOp::ReadCifarBlockDataAsync() {
105 TaskManager::FindMe()->Post();
106 RETURN_IF_NOT_OK(GetCifarFiles());
107 if (cifar_type_ == kCifar10) {
108 RETURN_IF_NOT_OK(ReadCifar10BlockData());
109 } else {
110 RETURN_IF_NOT_OK(ReadCifar100BlockData());
111 }
112
113 return Status::OK();
114 }
115
ReadCifar10BlockData()116 Status CifarOp::ReadCifar10BlockData() {
117 // CIFAR 10 has 6 bin files. data_batch_1.bin ... data_batch_5.bin and 1 test_batch.bin file
118 // each of the file has exactly 10K images and labels and size is 30,730 KB
119 // each image has the dimension of 32 x 32 x 3 = 3072 plus 1 label (label has 10 classes) so each row has 3073 bytes
120 constexpr uint32_t num_cifar10_records = 10000;
121 uint32_t block_size = (kCifarImageSize + 1) * kCifarBlockImageNum; // about 2M
122 std::vector<unsigned char> image_data(block_size * sizeof(unsigned char), 0);
123 for (auto &file : cifar_files_) {
124 // check the validity of the file path
125 Path file_path(file);
126 CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(),
127 "Invalid file, failed to find cifar10 file: " + file);
128 std::string file_name = file_path.Basename();
129
130 if (usage_ == "train") {
131 if (file_name.find("data_batch") == std::string::npos) continue;
132 } else if (usage_ == "test") {
133 if (file_name.find("test_batch") == std::string::npos) continue;
134 } else { // get all the files that contain the word batch, aka any cifar 100 files
135 if (file_name.find("batch") == std::string::npos) continue;
136 }
137
138 std::ifstream in(file, std::ios::binary);
139 CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open cifar10 file: " + file +
140 ", make sure file not damaged or permission denied.");
141
142 for (uint32_t index = 0; index < num_cifar10_records / kCifarBlockImageNum; ++index) {
143 (void)in.read(reinterpret_cast<char *>(&(image_data[0])), block_size * sizeof(unsigned char));
144 CHECK_FAIL_RETURN_UNEXPECTED(!in.fail(), "Invalid data, failed to read data from cifar10 file: " + file +
145 ", re-download dataset(make sure it is CIFAR-10 binary version).");
146 (void)cifar_raw_data_block_->EmplaceBack(image_data);
147 // Add file path info
148 path_record_.push_back(file);
149 }
150 in.close();
151 }
152 (void)cifar_raw_data_block_->EmplaceBack(std::vector<unsigned char>()); // end block
153
154 return Status::OK();
155 }
156
ReadCifar100BlockData()157 Status CifarOp::ReadCifar100BlockData() {
158 // CIFAR 100 has 2 bin files. train.bin (60K imgs) 153,700KB and test.bin (30,740KB) (10K imgs)
159 // each img has two labels. Each row then is 32 * 32 *5 + 2 = 3,074 Bytes
160 uint32_t num_cifar100_records = 0; // test:10000, train:50000
161 constexpr uint32_t num_cifar100_test_records = 10000;
162 constexpr uint32_t num_cifar100_train_records = 50000;
163 uint32_t block_size = (kCifarImageSize + 2) * kCifarBlockImageNum; // about 2M
164 std::vector<unsigned char> image_data(block_size * sizeof(unsigned char), 0);
165 for (auto &file : cifar_files_) {
166 // check the validity of the file path
167 Path file_path(file);
168 CHECK_FAIL_RETURN_UNEXPECTED(file_path.Exists() && !file_path.IsDirectory(),
169 "Invalid file, failed to find cifar100 file: " + file);
170 std::string file_name = file_path.Basename();
171
172 // if usage is train/test, get only these 2 files
173 if (usage_ == "train" && file_name.find("train") == std::string::npos) continue;
174 if (usage_ == "test" && file_name.find("test") == std::string::npos) continue;
175
176 if (file_name.find("test") != std::string::npos) {
177 num_cifar100_records = num_cifar100_test_records;
178 } else if (file_name.find("train") != std::string::npos) {
179 num_cifar100_records = num_cifar100_train_records;
180 } else {
181 RETURN_STATUS_UNEXPECTED("Invalid file, Cifar100 train/test file not found in: " + file_name);
182 }
183
184 std::ifstream in(file, std::ios::binary);
185 CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open cifar100 file: " + file +
186 ", make sure file not damaged or permission denied.");
187
188 for (uint32_t index = 0; index < num_cifar100_records / kCifarBlockImageNum; index++) {
189 (void)in.read(reinterpret_cast<char *>(&(image_data[0])), block_size * sizeof(unsigned char));
190 CHECK_FAIL_RETURN_UNEXPECTED(!in.fail(), "Invalid data, failed to read data from cifar100 file: " + file +
191 ", re-download dataset(make sure it is CIFAR-100 binary version).");
192 (void)cifar_raw_data_block_->EmplaceBack(image_data);
193 // Add file path info
194 path_record_.push_back(file);
195 }
196 in.close();
197 }
198 (void)cifar_raw_data_block_->EmplaceBack(std::vector<unsigned char>()); // block end
199 return Status::OK();
200 }
201
GetCifarFiles()202 Status CifarOp::GetCifarFiles() {
203 const std::string extension = ".bin";
204 Path dir_path(folder_path_);
205 auto dirIt = Path::DirIterator::OpenDirectory(&dir_path);
206 if (dirIt) {
207 while (dirIt->HasNext()) {
208 Path file = dirIt->Next();
209 if (file.Extension() == extension) {
210 cifar_files_.push_back(file.ToString());
211 }
212 }
213 } else {
214 RETURN_STATUS_UNEXPECTED("Invalid file, failed to open directory: " + dir_path.ToString() +
215 ", make sure file not damaged or permission denied.");
216 }
217 CHECK_FAIL_RETURN_UNEXPECTED(!cifar_files_.empty(), "Invalid file, no .bin files found under " + folder_path_);
218 std::sort(cifar_files_.begin(), cifar_files_.end());
219 return Status::OK();
220 }
221
ParseCifarData()222 Status CifarOp::ParseCifarData() {
223 std::vector<unsigned char> block;
224 RETURN_IF_NOT_OK(cifar_raw_data_block_->PopFront(&block));
225 uint32_t cur_block_index = 0;
226 while (!block.empty()) {
227 for (uint32_t index = 0; index < kCifarBlockImageNum; ++index) {
228 std::vector<uint32_t> labels;
229 uint32_t label = block[cur_block_index++];
230 labels.push_back(label);
231 if (cifar_type_ == kCifar100) {
232 uint32_t fine_label = block[cur_block_index++];
233 labels.push_back(fine_label);
234 }
235
236 std::shared_ptr<Tensor> image_tensor;
237 RETURN_IF_NOT_OK(Tensor::CreateEmpty(TensorShape({kCifarImageHeight, kCifarImageWidth, kCifarImageChannel}),
238 data_schema_->Column(0).Type(), &image_tensor));
239 auto itr = image_tensor->begin<uint8_t>();
240 uint32_t total_pix = kCifarImageHeight * kCifarImageWidth;
241 for (uint32_t pix = 0; pix < total_pix; ++pix) {
242 for (uint32_t ch = 0; ch < kCifarImageChannel; ++ch) {
243 *itr = block[cur_block_index + ch * total_pix + pix];
244 ++itr;
245 }
246 }
247 cur_block_index += total_pix * kCifarImageChannel;
248 cifar_image_label_pairs_.emplace_back(std::make_pair(image_tensor, labels));
249 }
250 RETURN_IF_NOT_OK(cifar_raw_data_block_->PopFront(&block));
251 cur_block_index = 0;
252 }
253 cifar_image_label_pairs_.shrink_to_fit();
254 num_rows_ = cifar_image_label_pairs_.size();
255 if (num_rows_ == 0) {
256 std::string api = cifar_type_ == kCifar10 ? "Cifar10Dataset" : "Cifar100Dataset";
257 RETURN_STATUS_UNEXPECTED("Invalid data, " + api +
258 " API can't read the data file (interface mismatch or no data found). "
259 "Check file in directory:" +
260 folder_path_);
261 }
262 cifar_raw_data_block_->Reset();
263 return Status::OK();
264 }
265
266 // Derived from RandomAccessOp
GetClassIds(std::map<int32_t,std::vector<int64_t>> * cls_ids) const267 Status CifarOp::GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const {
268 if (cls_ids == nullptr || !cls_ids->empty()) {
269 RETURN_STATUS_UNEXPECTED(
270 "[Internal ERROR] Map for containing image-index pair is nullptr or has been set in other place,"
271 "it must be empty before using GetClassIds.");
272 }
273
274 for (uint64_t index = 0; index < cifar_image_label_pairs_.size(); ++index) {
275 uint32_t label = (cifar_image_label_pairs_[index].second)[0];
276 (*cls_ids)[label].push_back(index);
277 }
278
279 for (auto &pair : (*cls_ids)) {
280 pair.second.shrink_to_fit();
281 }
282 return Status::OK();
283 }
284
CountTotalRows(const std::string & dir,const std::string & usage,bool isCIFAR10,int64_t * count)285 Status CifarOp::CountTotalRows(const std::string &dir, const std::string &usage, bool isCIFAR10, int64_t *count) {
286 // the logic of counting the number of samples is copied from ReadCifar100Block() and ReadCifar10Block()
287 // Note that this count logic is flawed, should be able to copy the sampler of original CifarOp without state
288 *count = 0;
289 const int64_t num_samples = 0;
290 const int64_t start_index = 0;
291 auto new_sampler = std::make_shared<SequentialSamplerRT>(start_index, num_samples);
292
293 CifarType type = isCIFAR10 ? kCifar10 : kCifar100;
294 // build a new unique schema object
295 auto new_schema = std::make_unique<DataSchema>();
296 TensorShape scalar = TensorShape::CreateScalar();
297 RETURN_IF_NOT_OK(
298 new_schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1)));
299 if (type == kCifar10) {
300 RETURN_IF_NOT_OK(
301 new_schema->AddColumn(ColDescriptor("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &scalar)));
302 } else {
303 RETURN_IF_NOT_OK(new_schema->AddColumn(
304 ColDescriptor("coarse_label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &scalar)));
305 TensorShape another_scalar = TensorShape::CreateScalar();
306 RETURN_IF_NOT_OK(new_schema->AddColumn(
307 ColDescriptor("fine_label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &another_scalar)));
308 }
309 std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
310 int32_t num_workers = cfg->num_parallel_workers();
311 int32_t op_connect_size = cfg->op_connector_size();
312 std::shared_ptr<CifarOp> op = std::make_shared<CifarOp>(type, usage, num_workers, dir, op_connect_size,
313 std::move(new_schema), std::move(new_sampler));
314
315 RETURN_IF_NOT_OK(op->GetCifarFiles());
316 if (op->cifar_type_ == kCifar10) {
317 constexpr int64_t num_cifar10_records = 10000;
318 for (auto &file : op->cifar_files_) {
319 Path file_path(file);
320 CHECK_FAIL_RETURN_UNEXPECTED(
321 file_path.Exists() && !file_path.IsDirectory(),
322 "Invalid file, failed to open cifar10 file: " + file + ", make sure file not damaged or permission denied.");
323 std::string file_name = file_path.Basename();
324
325 if (op->usage_ == "train") {
326 if (file_name.find("data_batch") == std::string::npos) continue;
327 } else if (op->usage_ == "test") {
328 if (file_name.find("test_batch") == std::string::npos) continue;
329 } else { // get all the files that contain the word batch, aka any cifar 100 files
330 if (file_name.find("batch") == std::string::npos) continue;
331 }
332
333 std::ifstream in(file, std::ios::binary);
334
335 CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open cifar10 file: " + file +
336 ", make sure file not damaged or permission denied.");
337 *count = *count + num_cifar10_records;
338 }
339 return Status::OK();
340 } else {
341 const uint32_t kCifar100RecordsPerTestFile = 10000;
342 const uint32_t kCifar100RecordsPerTrainFile = 50000;
343 int64_t num_cifar100_records = 0;
344 for (auto &file : op->cifar_files_) {
345 Path file_path(file);
346 std::string file_name = file_path.Basename();
347
348 CHECK_FAIL_RETURN_UNEXPECTED(
349 file_path.Exists() && !file_path.IsDirectory(),
350 "Invalid file, failed to find cifar100 file: " + file + ", make sure file not damaged or permission denied.");
351
352 if (op->usage_ == "train" && file_path.Basename().find("train") == std::string::npos) continue;
353 if (op->usage_ == "test" && file_path.Basename().find("test") == std::string::npos) continue;
354
355 if (file_name.find("test") != std::string::npos) {
356 num_cifar100_records += kCifar100RecordsPerTestFile;
357 } else if (file_name.find("train") != std::string::npos) {
358 num_cifar100_records += kCifar100RecordsPerTrainFile;
359 }
360 std::ifstream in(file, std::ios::binary);
361 CHECK_FAIL_RETURN_UNEXPECTED(in.is_open(), "Invalid file, failed to open cifar100 file: " + file +
362 ", make sure file not damaged or permission denied.");
363 }
364 *count = num_cifar100_records;
365 return Status::OK();
366 }
367 }
368
ComputeColMap()369 Status CifarOp::ComputeColMap() {
370 // set the column name map (base class field)
371 if (column_name_id_map_.empty()) {
372 for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
373 column_name_id_map_[data_schema_->Column(i).Name()] = i;
374 }
375 } else {
376 MS_LOG(WARNING) << "Column name map is already set!";
377 }
378 return Status::OK();
379 }
380
381 } // namespace dataset
382 } // namespace mindspore
383