1 /** 2 * Copyright 2019-2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #include <iostream> 17 #include <memory> 18 #include <vector> 19 20 #include "minddata/dataset/core/client.h" 21 #include "minddata/dataset/engine/data_schema.h" 22 #include "minddata/dataset/engine/jagged_connector.h" 23 #include "common/common.h" 24 #include "gtest/gtest.h" 25 #include "utils/log_adapter.h" 26 27 namespace common = mindspore::common; 28 29 using namespace mindspore::dataset; 30 using mindspore::LogStream; 31 using mindspore::ExceptionType::NoExceptionType; 32 using mindspore::MsLogLevel::INFO; 33 34 class MindDataTestTFReaderOp : public UT::DatasetOpTesting {}; 35 36 TEST_F(MindDataTestTFReaderOp, TestTFReaderBasic1) { 37 // Start with an empty execution tree 38 auto my_tree = std::make_shared<ExecutionTree>(); 39 Status rc; 40 std::string dataset_path; 41 dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data"; 42 43 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 44 int32_t op_connector_size = config_manager->op_connector_size(); 45 int32_t num_workers = 1; 46 int32_t worker_connector_size = config_manager->worker_connector_size(); 47 std::vector<std::string> files = {dataset_path}; 48 std::vector<std::string> columns_to_load = {}; 49 50 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 51 schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {}); 52 std::shared_ptr<TFReaderOp> my_tfreader_op = 53 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 54 columns_to_load, false, 1, 0, false); 55 rc = my_tfreader_op->Init(); 56 ASSERT_TRUE(rc.IsOk()); 57 rc = my_tree->AssociateNode(my_tfreader_op); 58 ASSERT_TRUE(rc.IsOk()); 59 60 rc = my_tree->AssignRoot(my_tfreader_op); 61 ASSERT_TRUE(rc.IsOk()); 62 63 MS_LOG(INFO) << "Launching tree and begin iteration."; 64 rc = my_tree->Prepare(); 65 ASSERT_TRUE(rc.IsOk()); 66 67 rc = my_tree->Launch(); 68 ASSERT_TRUE(rc.IsOk()); 69 70 // Start the loop of reading tensors from our pipeline 71 DatasetIterator di(my_tree); 72 TensorRow tensor_list; 73 rc = di.FetchNextTensorRow(&tensor_list); 74 ASSERT_TRUE(rc.IsOk()); 75 76 int row_count = 0; 77 while (!tensor_list.empty()) { 78 // Display the tensor by calling the printer on it 79 for (int i = 0; i < tensor_list.size(); i++) { 80 std::ostringstream ss; 81 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 82 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 83 } 84 85 rc = di.FetchNextTensorRow(&tensor_list); 86 ASSERT_TRUE(rc.IsOk()); 87 row_count++; 88 } 89 90 ASSERT_EQ(row_count, 12); 91 } 92 93 TEST_F(MindDataTestTFReaderOp, TestTFReaderLargeRowsPerBuffer) { 94 // Start with an empty execution tree 95 auto my_tree = std::make_shared<ExecutionTree>(); 96 Status rc; 97 std::string dataset_path; 98 dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data"; 99 100 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 101 schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {}); 102 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 103 int32_t op_connector_size = config_manager->op_connector_size(); 104 int32_t num_workers = 1; 105 int32_t worker_connector_size = config_manager->worker_connector_size(); 106 std::vector<std::string> files = {dataset_path}; 107 std::vector<std::string> columns_to_load = {}; 108 109 std::shared_ptr<TFReaderOp> my_tfreader_op = 110 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 111 columns_to_load, false, 1, 0, false); 112 rc = my_tfreader_op->Init(); 113 ASSERT_TRUE(rc.IsOk()); 114 rc = my_tree->AssociateNode(my_tfreader_op); 115 ASSERT_TRUE(rc.IsOk()); 116 117 rc = my_tree->AssignRoot(my_tfreader_op); 118 ASSERT_TRUE(rc.IsOk()); 119 120 MS_LOG(INFO) << "Launching tree and begin iteration."; 121 rc = my_tree->Prepare(); 122 ASSERT_TRUE(rc.IsOk()); 123 124 rc = my_tree->Launch(); 125 ASSERT_TRUE(rc.IsOk()); 126 127 // Start the loop of reading tensors from our pipeline 128 DatasetIterator di(my_tree); 129 TensorRow tensor_list; 130 rc = di.FetchNextTensorRow(&tensor_list); 131 ASSERT_TRUE(rc.IsOk()); 132 133 int row_count = 0; 134 while (!tensor_list.empty()) { 135 // Display the tensor by calling the printer on it 136 for (int i = 0; i < tensor_list.size(); i++) { 137 std::ostringstream ss; 138 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 139 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 140 } 141 142 rc = di.FetchNextTensorRow(&tensor_list); 143 ASSERT_TRUE(rc.IsOk()); 144 row_count++; 145 } 146 147 ASSERT_EQ(row_count, 12); 148 } 149 150 TEST_F(MindDataTestTFReaderOp, TestTFReaderSmallRowsPerBuffer) { 151 // Start with an empty execution tree 152 auto my_tree = std::make_shared<ExecutionTree>(); 153 Status rc; 154 std::string dataset_path; 155 dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data"; 156 157 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 158 int32_t op_connector_size = config_manager->op_connector_size(); 159 int32_t num_workers = 1; 160 int32_t worker_connector_size = config_manager->worker_connector_size(); 161 std::vector<std::string> files = {dataset_path}; 162 std::vector<std::string> columns_to_load = {}; 163 164 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 165 schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {}); 166 std::shared_ptr<TFReaderOp> my_tfreader_op = 167 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 168 columns_to_load, false, 1, 0, false); 169 rc = my_tfreader_op->Init(); 170 ASSERT_TRUE(rc.IsOk()); 171 rc = my_tree->AssociateNode(my_tfreader_op); 172 ASSERT_TRUE(rc.IsOk()); 173 174 rc = my_tree->AssignRoot(my_tfreader_op); 175 ASSERT_TRUE(rc.IsOk()); 176 177 MS_LOG(INFO) << "Launching tree and begin iteration."; 178 rc = my_tree->Prepare(); 179 ASSERT_TRUE(rc.IsOk()); 180 181 rc = my_tree->Launch(); 182 ASSERT_TRUE(rc.IsOk()); 183 184 // Start the loop of reading tensors from our pipeline 185 DatasetIterator di(my_tree); 186 TensorRow tensor_list; 187 rc = di.FetchNextTensorRow(&tensor_list); 188 ASSERT_TRUE(rc.IsOk()); 189 190 int row_count = 0; 191 while (!tensor_list.empty()) { 192 // Display the tensor by calling the printer on it 193 for (int i = 0; i < tensor_list.size(); i++) { 194 std::ostringstream ss; 195 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 196 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 197 } 198 199 rc = di.FetchNextTensorRow(&tensor_list); 200 ASSERT_TRUE(rc.IsOk()); 201 row_count++; 202 } 203 204 ASSERT_EQ(row_count, 12); 205 } 206 207 TEST_F(MindDataTestTFReaderOp, TestTFReaderLargeQueueSize) { 208 // Start with an empty execution tree 209 auto my_tree = std::make_shared<ExecutionTree>(); 210 Status rc; 211 std::string dataset_path; 212 dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data"; 213 214 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 215 int32_t op_connector_size = config_manager->op_connector_size(); 216 int32_t num_workers = 1; 217 int32_t worker_connector_size = 1; 218 std::vector<std::string> files = {dataset_path}; 219 std::vector<std::string> columns_to_load = {}; 220 221 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 222 schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {}); 223 std::shared_ptr<TFReaderOp> my_tfreader_op = 224 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 225 columns_to_load, false, 1, 0, false); 226 rc = my_tfreader_op->Init(); 227 ASSERT_TRUE(rc.IsOk()); 228 rc = my_tree->AssociateNode(my_tfreader_op); 229 ASSERT_TRUE(rc.IsOk()); 230 231 rc = my_tree->AssignRoot(my_tfreader_op); 232 ASSERT_TRUE(rc.IsOk()); 233 234 MS_LOG(INFO) << "Launching tree and begin iteration."; 235 rc = my_tree->Prepare(); 236 ASSERT_TRUE(rc.IsOk()); 237 238 rc = my_tree->Launch(); 239 ASSERT_TRUE(rc.IsOk()); 240 241 // Start the loop of reading tensors from our pipeline 242 DatasetIterator di(my_tree); 243 TensorRow tensor_list; 244 rc = di.FetchNextTensorRow(&tensor_list); 245 ASSERT_TRUE(rc.IsOk()); 246 247 int row_count = 0; 248 while (!tensor_list.empty()) { 249 // Display the tensor by calling the printer on it 250 for (int i = 0; i < tensor_list.size(); i++) { 251 std::ostringstream ss; 252 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 253 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 254 } 255 256 rc = di.FetchNextTensorRow(&tensor_list); 257 ASSERT_TRUE(rc.IsOk()); 258 row_count++; 259 } 260 261 ASSERT_EQ(row_count, 12); 262 } 263 264 TEST_F(MindDataTestTFReaderOp, TestTFReaderOneThread) { 265 // Start with an empty execution tree 266 auto my_tree = std::make_shared<ExecutionTree>(); 267 Status rc; 268 std::string dataset_path; 269 dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data"; 270 271 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 272 int32_t op_connector_size = config_manager->op_connector_size(); 273 int32_t num_workers = 1; 274 int32_t worker_connector_size = config_manager->worker_connector_size(); 275 std::vector<std::string> files = {dataset_path}; 276 std::vector<std::string> columns_to_load = {}; 277 278 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 279 schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {}); 280 std::shared_ptr<TFReaderOp> my_tfreader_op = 281 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 282 columns_to_load, false, 1, 0, false); 283 rc = my_tfreader_op->Init(); 284 ASSERT_TRUE(rc.IsOk()); 285 rc = my_tree->AssociateNode(my_tfreader_op); 286 ASSERT_TRUE(rc.IsOk()); 287 288 rc = my_tree->AssignRoot(my_tfreader_op); 289 ASSERT_TRUE(rc.IsOk()); 290 291 MS_LOG(INFO) << "Launching tree and begin iteration."; 292 rc = my_tree->Prepare(); 293 ASSERT_TRUE(rc.IsOk()); 294 295 rc = my_tree->Launch(); 296 ASSERT_TRUE(rc.IsOk()); 297 298 // Start the loop of reading tensors from our pipeline 299 DatasetIterator di(my_tree); 300 TensorRow tensor_list; 301 rc = di.FetchNextTensorRow(&tensor_list); 302 ASSERT_TRUE(rc.IsOk()); 303 304 int row_count = 0; 305 while (!tensor_list.empty()) { 306 // Display the tensor by calling the printer on it 307 for (int i = 0; i < tensor_list.size(); i++) { 308 std::ostringstream ss; 309 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 310 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 311 } 312 313 rc = di.FetchNextTensorRow(&tensor_list); 314 ASSERT_TRUE(rc.IsOk()); 315 row_count++; 316 } 317 318 ASSERT_EQ(row_count, 12); 319 } 320 321 TEST_F(MindDataTestTFReaderOp, TestTFReaderRepeat) { 322 // Start with an empty execution tree 323 auto my_tree = std::make_shared<ExecutionTree>(); 324 Status rc; 325 std::string dataset_path; 326 dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data"; 327 328 // TFReaderOp 329 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 330 int32_t op_connector_size = config_manager->op_connector_size(); 331 int32_t num_workers = 1; 332 int32_t worker_connector_size = 16; 333 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 334 std::vector<std::string> files = {dataset_path}; 335 std::vector<std::string> columns_to_load = {}; 336 337 schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {}); 338 std::shared_ptr<TFReaderOp> my_tfreader_op = 339 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 340 columns_to_load, false, 1, 0, false); 341 rc = my_tfreader_op->Init(); 342 ASSERT_TRUE(rc.IsOk()); 343 rc = my_tree->AssociateNode(my_tfreader_op); 344 ASSERT_TRUE(rc.IsOk()); 345 346 // RepeatOp 347 uint32_t num_repeats = 3; 348 std::shared_ptr<RepeatOp> my_repeat_op = std::make_shared<RepeatOp>(num_repeats); 349 rc = my_tree->AssociateNode(my_repeat_op); 350 ASSERT_TRUE(rc.IsOk()); 351 352 // Set children/root layout. 353 my_tfreader_op->SetTotalRepeats(num_repeats); 354 my_tfreader_op->SetNumRepeatsPerEpoch(num_repeats); 355 rc = my_repeat_op->AddChild(my_tfreader_op); 356 ASSERT_TRUE(rc.IsOk()); 357 rc = my_tree->AssignRoot(my_repeat_op); 358 ASSERT_TRUE(rc.IsOk()); 359 360 MS_LOG(INFO) << "Launching tree and begin iteration."; 361 rc = my_tree->Prepare(); 362 ASSERT_TRUE(rc.IsOk()); 363 364 rc = my_tree->Launch(); 365 ASSERT_TRUE(rc.IsOk()); 366 367 // Start the loop of reading tensors from our pipeline 368 DatasetIterator di(my_tree); 369 TensorRow tensor_list; 370 rc = di.FetchNextTensorRow(&tensor_list); 371 ASSERT_TRUE(rc.IsOk()); 372 373 int row_count = 0; 374 while (!tensor_list.empty()) { 375 MS_LOG(INFO) << "Row display for row #: " << row_count << "."; 376 377 // Display the tensor by calling the printer on it 378 for (int i = 0; i < tensor_list.size(); i++) { 379 std::ostringstream ss; 380 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 381 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 382 } 383 384 rc = di.FetchNextTensorRow(&tensor_list); 385 ASSERT_TRUE(rc.IsOk()); 386 row_count++; 387 } 388 389 ASSERT_EQ(row_count, 12 * 3); 390 } 391 392 TEST_F(MindDataTestTFReaderOp, TestTFReaderSchemaConstructor) { 393 // Start with an empty execution tree 394 auto my_tree = std::make_shared<ExecutionTree>(); 395 Status rc; 396 std::string dataset_path; 397 dataset_path = datasets_root_path_ + "/testTFTestAllTypes"; 398 std::vector<std::string> files = {dataset_path + "/test.data"}; 399 400 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 401 std::vector<std::string> columns_to_load; 402 columns_to_load.push_back("col_sint32"); 403 columns_to_load.push_back("col_binary"); 404 schema->LoadSchemaFile(dataset_path + "/datasetSchema.json", columns_to_load); 405 406 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 407 int32_t op_connector_size = config_manager->op_connector_size(); 408 int32_t worker_connector_size = config_manager->worker_connector_size(); 409 int32_t num_workers = 1; 410 411 std::shared_ptr<TFReaderOp> my_tfreader_op = 412 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 413 columns_to_load, false, 1, 0, false); 414 rc = my_tfreader_op->Init(); 415 ASSERT_TRUE(rc.IsOk()); 416 rc = my_tree->AssociateNode(my_tfreader_op); 417 ASSERT_TRUE(rc.IsOk()); 418 419 rc = my_tree->AssignRoot(my_tfreader_op); 420 ASSERT_TRUE(rc.IsOk()); 421 422 MS_LOG(INFO) << "Launching tree and begin iteration."; 423 rc = my_tree->Prepare(); 424 ASSERT_TRUE(rc.IsOk()); 425 426 rc = my_tree->Launch(); 427 ASSERT_TRUE(rc.IsOk()); 428 429 // Start the loop of reading tensors from our pipeline 430 DatasetIterator di(my_tree); 431 TensorRow tensor_list; 432 rc = di.FetchNextTensorRow(&tensor_list); 433 ASSERT_TRUE(rc.IsOk()); 434 435 int row_count = 0; 436 while (!tensor_list.empty()) { 437 // Display the tensor by calling the printer on it 438 ASSERT_EQ(tensor_list.size(), columns_to_load.size()); 439 440 for (int i = 0; i < tensor_list.size(); i++) { 441 std::ostringstream ss; 442 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 443 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 444 } 445 446 rc = di.FetchNextTensorRow(&tensor_list); 447 ASSERT_TRUE(rc.IsOk()); 448 row_count++; 449 } 450 451 ASSERT_EQ(row_count, 12); 452 } 453 454 TEST_F(MindDataTestTFReaderOp, TestTFReaderTake1Row) { 455 // Start with an empty execution tree 456 auto my_tree = std::make_shared<ExecutionTree>(); 457 Status rc; 458 std::string dataset_path; 459 dataset_path = datasets_root_path_ + "/testTFTestAllTypes"; 460 461 std::string data_schema_filepath = dataset_path + "/datasetSchema1Row.json"; 462 463 // TFReaderOp 464 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 465 schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema1Row.json", {}); 466 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 467 int32_t op_connector_size = config_manager->op_connector_size(); 468 int32_t num_workers = 1; 469 int32_t worker_connector_size = config_manager->worker_connector_size(); 470 std::vector<std::string> files = {dataset_path + "/test.data"}; 471 std::vector<std::string> columns_to_load = {}; 472 473 std::shared_ptr<TFReaderOp> my_tfreader_op = 474 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 475 columns_to_load, false, 1, 0, false); 476 rc = my_tfreader_op->Init(); 477 ASSERT_TRUE(rc.IsOk()); 478 rc = my_tree->AssociateNode(my_tfreader_op); 479 ASSERT_TRUE(rc.IsOk()); 480 481 rc = my_tree->AssignRoot(my_tfreader_op); 482 ASSERT_TRUE(rc.IsOk()); 483 484 MS_LOG(INFO) << "Launching tree and begin iteration."; 485 rc = my_tree->Prepare(); 486 ASSERT_TRUE(rc.IsOk()); 487 488 rc = my_tree->Launch(); 489 ASSERT_TRUE(rc.IsOk()); 490 491 // Start the loop of reading tensors from our pipeline 492 DatasetIterator di(my_tree); 493 TensorRow tensor_list; 494 rc = di.FetchNextTensorRow(&tensor_list); 495 ASSERT_TRUE(rc.IsOk()); 496 497 int row_count = 0; 498 while (!tensor_list.empty()) { 499 MS_LOG(INFO) << "Row display for row #: " << row_count << "."; 500 501 // Display the tensor by calling the printer on it 502 for (int i = 0; i < tensor_list.size(); i++) { 503 std::ostringstream ss; 504 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 505 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 506 } 507 508 rc = di.FetchNextTensorRow(&tensor_list); 509 ASSERT_TRUE(rc.IsOk()); 510 row_count++; 511 } 512 513 ASSERT_EQ(row_count, 1); 514 } 515 516 TEST_F(MindDataTestTFReaderOp, TestTFReaderTake1Buffer) { 517 // Start with an empty execution tree 518 auto my_tree = std::make_shared<ExecutionTree>(); 519 Status rc; 520 std::string dataset_path; 521 dataset_path = datasets_root_path_ + "/testTFTestAllTypes"; 522 523 std::string data_schema_filepath = dataset_path + "/datasetSchema5Rows.json"; 524 525 // TFReaderOp 526 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 527 schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema5Rows.json", {}); 528 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 529 int32_t op_connector_size = config_manager->op_connector_size(); 530 int32_t num_workers = 1; 531 int32_t worker_connector_size = config_manager->worker_connector_size(); 532 std::vector<std::string> files = {dataset_path + "/test.data"}; 533 std::vector<std::string> columns_to_load = {}; 534 535 std::shared_ptr<TFReaderOp> my_tfreader_op = 536 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 537 columns_to_load, false, 1, 0, false); 538 rc = my_tfreader_op->Init(); 539 ASSERT_TRUE(rc.IsOk()); 540 rc = my_tree->AssociateNode(my_tfreader_op); 541 ASSERT_TRUE(rc.IsOk()); 542 543 rc = my_tree->AssignRoot(my_tfreader_op); 544 ASSERT_TRUE(rc.IsOk()); 545 546 MS_LOG(INFO) << "Launching tree and begin iteration."; 547 rc = my_tree->Prepare(); 548 ASSERT_TRUE(rc.IsOk()); 549 550 rc = my_tree->Launch(); 551 ASSERT_TRUE(rc.IsOk()); 552 553 // Start the loop of reading tensors from our pipeline 554 DatasetIterator di(my_tree); 555 TensorRow tensor_list; 556 rc = di.FetchNextTensorRow(&tensor_list); 557 ASSERT_TRUE(rc.IsOk()); 558 559 int row_count = 0; 560 while (!tensor_list.empty()) { 561 MS_LOG(INFO) << "Row display for row #: " << row_count << "."; 562 563 // Display the tensor by calling the printer on it 564 for (int i = 0; i < tensor_list.size(); i++) { 565 std::ostringstream ss; 566 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 567 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 568 } 569 570 rc = di.FetchNextTensorRow(&tensor_list); 571 ASSERT_TRUE(rc.IsOk()); 572 row_count++; 573 } 574 575 ASSERT_EQ(row_count, 5); 576 } 577 578 TEST_F(MindDataTestTFReaderOp, TestTFReaderTake7Rows) { 579 // Start with an empty execution tree 580 auto my_tree = std::make_shared<ExecutionTree>(); 581 Status rc; 582 std::string dataset_path; 583 dataset_path = datasets_root_path_ + "/testTFTestAllTypes"; 584 585 std::string data_schema_filepath = dataset_path + "/datasetSchema7Rows.json"; 586 587 // TFReaderOp 588 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 589 schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema7Rows.json", {}); 590 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 591 int32_t op_connector_size = config_manager->op_connector_size(); 592 int32_t num_workers = 1; 593 int32_t worker_connector_size = config_manager->worker_connector_size(); 594 std::vector<std::string> files = {dataset_path + "/test.data"}; 595 std::vector<std::string> columns_to_load = {}; 596 597 std::shared_ptr<TFReaderOp> my_tfreader_op = 598 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 599 columns_to_load, false, 1, 0, false); 600 rc = my_tfreader_op->Init(); 601 ASSERT_TRUE(rc.IsOk()); 602 rc = my_tree->AssociateNode(my_tfreader_op); 603 ASSERT_TRUE(rc.IsOk()); 604 605 rc = my_tree->AssignRoot(my_tfreader_op); 606 ASSERT_TRUE(rc.IsOk()); 607 608 MS_LOG(INFO) << "Launching tree and begin iteration."; 609 rc = my_tree->Prepare(); 610 ASSERT_TRUE(rc.IsOk()); 611 612 rc = my_tree->Launch(); 613 ASSERT_TRUE(rc.IsOk()); 614 615 // Start the loop of reading tensors from our pipeline 616 DatasetIterator di(my_tree); 617 TensorRow tensor_list; 618 rc = di.FetchNextTensorRow(&tensor_list); 619 ASSERT_TRUE(rc.IsOk()); 620 621 int row_count = 0; 622 while (!tensor_list.empty()) { 623 MS_LOG(INFO) << "Row display for row #: " << row_count << "."; 624 625 // Display the tensor by calling the printer on it 626 for (int i = 0; i < tensor_list.size(); i++) { 627 std::ostringstream ss; 628 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 629 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 630 } 631 632 rc = di.FetchNextTensorRow(&tensor_list); 633 ASSERT_TRUE(rc.IsOk()); 634 row_count++; 635 } 636 637 ASSERT_EQ(row_count, 7); 638 } 639 640 TEST_F(MindDataTestTFReaderOp, TestTFReaderBasicNoSchema) { 641 // Start with an empty execution tree 642 auto my_tree = std::make_shared<ExecutionTree>(); 643 Status rc; 644 std::string dataset_path; 645 dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data"; 646 std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager(); 647 int32_t op_connector_size = config_manager->op_connector_size(); 648 int32_t num_workers = 1; 649 std::vector<std::string> columns_to_load = {}; 650 std::vector<std::string> files = {dataset_path}; 651 int32_t worker_connector_size = config_manager->worker_connector_size(); 652 std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>(); 653 std::shared_ptr<TFReaderOp> my_tfreader_op = 654 std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size, 655 columns_to_load, false, 1, 0, false); 656 rc = my_tfreader_op->Init(); 657 ASSERT_TRUE(rc.IsOk()); 658 rc = my_tree->AssociateNode(my_tfreader_op); 659 ASSERT_TRUE(rc.IsOk()); 660 661 rc = my_tree->AssignRoot(my_tfreader_op); 662 ASSERT_TRUE(rc.IsOk()); 663 664 MS_LOG(INFO) << "Launching tree and begin iteration."; 665 rc = my_tree->Prepare(); 666 ASSERT_TRUE(rc.IsOk()); 667 668 rc = my_tree->Launch(); 669 ASSERT_TRUE(rc.IsOk()); 670 671 // Start the loop of reading tensors from our pipeline 672 DatasetIterator di(my_tree); 673 TensorRow tensor_list; 674 rc = di.FetchNextTensorRow(&tensor_list); 675 ASSERT_TRUE(rc.IsOk()); 676 677 int row_count = 0; 678 while (!tensor_list.empty()) { 679 // Display the tensor by calling the printer on it 680 ASSERT_EQ(tensor_list.size(), 9); 681 for (int i = 0; i < tensor_list.size(); i++) { 682 std::ostringstream ss; 683 ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl; 684 MS_LOG(INFO) << "Tensor print: " << ss.str() << "."; 685 } 686 687 rc = di.FetchNextTensorRow(&tensor_list); 688 ASSERT_TRUE(rc.IsOk()); 689 row_count++; 690 } 691 692 ASSERT_EQ(row_count, 12); 693 } 694 695 TEST_F(MindDataTestTFReaderOp, TestTotalRowsBasic) { 696 std::string tf_file = datasets_root_path_ + "/testTFTestAllTypes/test.data"; 697 698 std::vector<std::string> filenames; 699 700 for (int i = 0; i < 5; i++) { 701 filenames.push_back(tf_file); 702 } 703 704 int64_t total_rows = 0; 705 TFReaderOp::CountTotalRows(&total_rows, filenames, 1); 706 ASSERT_EQ(total_rows, 60); 707 TFReaderOp::CountTotalRows(&total_rows, filenames, 2); 708 ASSERT_EQ(total_rows, 60); 709 TFReaderOp::CountTotalRows(&total_rows, filenames, 3); 710 ASSERT_EQ(total_rows, 60); 711 TFReaderOp::CountTotalRows(&total_rows, filenames, 4); 712 ASSERT_EQ(total_rows, 60); 713 TFReaderOp::CountTotalRows(&total_rows, filenames, 5); 714 ASSERT_EQ(total_rows, 60); 715 TFReaderOp::CountTotalRows(&total_rows, filenames, 6); 716 ASSERT_EQ(total_rows, 60); 717 TFReaderOp::CountTotalRows(&total_rows, filenames, 729); 718 ASSERT_EQ(total_rows, 60); 719 TFReaderOp::CountTotalRows(&total_rows, filenames, 1, true); 720 ASSERT_EQ(total_rows, 60); 721 TFReaderOp::CountTotalRows(&total_rows, filenames, 2, true); 722 ASSERT_EQ(total_rows, 60); 723 TFReaderOp::CountTotalRows(&total_rows, filenames, 3, true); 724 ASSERT_EQ(total_rows, 60); 725 TFReaderOp::CountTotalRows(&total_rows, filenames, 4, true); 726 ASSERT_EQ(total_rows, 60); 727 TFReaderOp::CountTotalRows(&total_rows, filenames, 5, true); 728 ASSERT_EQ(total_rows, 60); 729 TFReaderOp::CountTotalRows(&total_rows, filenames, 6, true); 730 ASSERT_EQ(total_rows, 60); 731 TFReaderOp::CountTotalRows(&total_rows, filenames, 729, true); 732 ASSERT_EQ(total_rows, 60); 733 } 734