1 /**
2 * Copyright 2019-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include <iostream>
17 #include <memory>
18 #include <vector>
19
20 #include "minddata/dataset/core/client.h"
21 #include "minddata/dataset/engine/data_schema.h"
22 #include "minddata/dataset/engine/jagged_connector.h"
23 #include "common/common.h"
24 #include "gtest/gtest.h"
25 #include "utils/log_adapter.h"
26
27 namespace common = mindspore::common;
28
29 using namespace mindspore::dataset;
30 using mindspore::LogStream;
31 using mindspore::ExceptionType::NoExceptionType;
32 using mindspore::MsLogLevel::INFO;
33
// Fixture shared by every TFReaderOp test in this file. It adds no members of its own; the tests
// use datasets_root_path_, which is not declared here and so is expected to come from UT::DatasetOpTesting.
class MindDataTestTFReaderOp : public UT::DatasetOpTesting {};
35
/// Feature: TFReaderOp
/// Description: Read testTFTestAllTypes/test.data through a single-node execution tree, using the
///     datasetSchema.json schema, one worker and an empty columns_to_load list
/// Expectation: Every schema/tree/iterator call returns an OK status and 12 rows are fetched
TEST_F(MindDataTestTFReaderOp, TestTFReaderBasic1) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t num_workers = 1;
  int32_t worker_connector_size = config_manager->worker_connector_size();
  std::vector<std::string> files = {dataset_path};
  std::vector<std::string> columns_to_load = {};

  // Check the load status: a schema that failed to load must stop the test here instead of
  // letting the pipeline run with whatever state the schema object was left in.
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  Status rc = schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->AssignRoot(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 12);
}
92
/// Feature: TFReaderOp
/// Description: Read testTFTestAllTypes/test.data through a single-node execution tree, using the
///     datasetSchema.json schema, one worker and an empty columns_to_load list
/// Expectation: Every schema/tree/iterator call returns an OK status and 12 rows are fetched
/// NOTE(review): nothing in this test sets a rows-per-buffer value; the operator is built with the same
///     arguments as TestTFReaderBasic1. Confirm whether the name is historical.
TEST_F(MindDataTestTFReaderOp, TestTFReaderLargeRowsPerBuffer) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";

  // Check the load status: a schema that failed to load must stop the test here instead of
  // letting the pipeline run with whatever state the schema object was left in.
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  Status rc = schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t num_workers = 1;
  int32_t worker_connector_size = config_manager->worker_connector_size();
  std::vector<std::string> files = {dataset_path};
  std::vector<std::string> columns_to_load = {};

  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->AssignRoot(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 12);
}
149
/// Feature: TFReaderOp
/// Description: Read testTFTestAllTypes/test.data through a single-node execution tree, using the
///     datasetSchema.json schema, one worker and an empty columns_to_load list
/// Expectation: Every schema/tree/iterator call returns an OK status and 12 rows are fetched
/// NOTE(review): nothing in this test sets a rows-per-buffer value; the operator is built with the same
///     arguments as TestTFReaderBasic1. Confirm whether the name is historical.
TEST_F(MindDataTestTFReaderOp, TestTFReaderSmallRowsPerBuffer) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t num_workers = 1;
  int32_t worker_connector_size = config_manager->worker_connector_size();
  std::vector<std::string> files = {dataset_path};
  std::vector<std::string> columns_to_load = {};

  // Check the load status: a schema that failed to load must stop the test here instead of
  // letting the pipeline run with whatever state the schema object was left in.
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  Status rc = schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->AssignRoot(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 12);
}
206
/// Feature: TFReaderOp
/// Description: Read testTFTestAllTypes/test.data through a single-node execution tree with the worker
///     connector size hard-coded to 1 instead of being taken from the config manager
/// Expectation: Every schema/tree/iterator call returns an OK status and 12 rows are fetched
/// NOTE(review): the test name says "large queue size" but the only queue-related value set here is
///     worker_connector_size = 1. Confirm the intended value.
TEST_F(MindDataTestTFReaderOp, TestTFReaderLargeQueueSize) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t num_workers = 1;
  int32_t worker_connector_size = 1;
  std::vector<std::string> files = {dataset_path};
  std::vector<std::string> columns_to_load = {};

  // Check the load status: a schema that failed to load must stop the test here instead of
  // letting the pipeline run with whatever state the schema object was left in.
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  Status rc = schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->AssignRoot(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 12);
}
263
/// Feature: TFReaderOp
/// Description: Read testTFTestAllTypes/test.data through a single-node execution tree with
///     num_workers set to 1, using the datasetSchema.json schema and an empty columns_to_load list
/// Expectation: Every schema/tree/iterator call returns an OK status and 12 rows are fetched
TEST_F(MindDataTestTFReaderOp, TestTFReaderOneThread) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t num_workers = 1;
  int32_t worker_connector_size = config_manager->worker_connector_size();
  std::vector<std::string> files = {dataset_path};
  std::vector<std::string> columns_to_load = {};

  // Check the load status: a schema that failed to load must stop the test here instead of
  // letting the pipeline run with whatever state the schema object was left in.
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  Status rc = schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->AssignRoot(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 12);
}
320
/// Feature: TFReaderOp under RepeatOp
/// Description: Build a two-node tree (RepeatOp as root with a TFReaderOp child) that repeats
///     testTFTestAllTypes/test.data three times, with the worker connector size hard-coded to 16
/// Expectation: Every schema/tree/iterator call returns an OK status and 12 * 3 rows are fetched
TEST_F(MindDataTestTFReaderOp, TestTFReaderRepeat) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";

  // TFReaderOp
  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t num_workers = 1;
  int32_t worker_connector_size = 16;
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  std::vector<std::string> files = {dataset_path};
  std::vector<std::string> columns_to_load = {};

  // Check the load status: a schema that failed to load must stop the test here instead of
  // letting the pipeline run with whatever state the schema object was left in.
  Status rc = schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  // RepeatOp
  uint32_t num_repeats = 3;
  std::shared_ptr<RepeatOp> my_repeat_op = std::make_shared<RepeatOp>(num_repeats);
  rc = my_tree->AssociateNode(my_repeat_op);
  ASSERT_TRUE(rc.IsOk());

  // Set children/root layout.
  my_tfreader_op->SetTotalRepeats(num_repeats);
  my_tfreader_op->SetNumRepeatsPerEpoch(num_repeats);
  rc = my_repeat_op->AddChild(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssignRoot(my_repeat_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    MS_LOG(INFO) << "Row display for row #: " << row_count << ".";

    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 12 * 3);
}
391
/// Feature: TFReaderOp
/// Description: Load datasetSchema.json restricted to the columns "col_sint32" and "col_binary", pass the
///     same column list to TFReaderOp and read testTFTestAllTypes/test.data
/// Expectation: Every schema/tree/iterator call returns an OK status, each fetched row has as many
///     tensors as columns_to_load has entries, and 12 rows are fetched
TEST_F(MindDataTestTFReaderOp, TestTFReaderSchemaConstructor) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
  std::vector<std::string> files = {dataset_path + "/test.data"};

  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  std::vector<std::string> columns_to_load;
  columns_to_load.push_back("col_sint32");
  columns_to_load.push_back("col_binary");

  // Check the load status: a schema that failed to load must stop the test here instead of
  // letting the pipeline run with whatever state the schema object was left in.
  Status rc = schema->LoadSchemaFile(dataset_path + "/datasetSchema.json", columns_to_load);
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t worker_connector_size = config_manager->worker_connector_size();
  int32_t num_workers = 1;

  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->AssignRoot(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    // Only the requested columns may appear in a row
    ASSERT_EQ(tensor_list.size(), columns_to_load.size());

    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 12);
}
453
/// Feature: TFReaderOp
/// Description: Read testTFTestAllTypes/test.data through a single-node execution tree using the
///     datasetSchema1Row.json schema (presumably a schema that limits the row count; confirm against the file)
/// Expectation: Every schema/tree/iterator call returns an OK status and exactly 1 row is fetched
TEST_F(MindDataTestTFReaderOp, TestTFReaderTake1Row) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
  const std::string data_schema_filepath = dataset_path + "/datasetSchema1Row.json";

  // TFReaderOp
  // Check the load status: a schema that failed to load must stop the test here instead of
  // letting the pipeline run with whatever state the schema object was left in.
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  Status rc = schema->LoadSchemaFile(data_schema_filepath, {});
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t num_workers = 1;
  int32_t worker_connector_size = config_manager->worker_connector_size();
  std::vector<std::string> files = {dataset_path + "/test.data"};
  std::vector<std::string> columns_to_load = {};

  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->AssignRoot(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    MS_LOG(INFO) << "Row display for row #: " << row_count << ".";

    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 1);
}
515
/// Feature: TFReaderOp
/// Description: Read testTFTestAllTypes/test.data through a single-node execution tree using the
///     datasetSchema5Rows.json schema (presumably a schema that limits the row count; confirm against the file)
/// Expectation: Every schema/tree/iterator call returns an OK status and exactly 5 rows are fetched
TEST_F(MindDataTestTFReaderOp, TestTFReaderTake1Buffer) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
  const std::string data_schema_filepath = dataset_path + "/datasetSchema5Rows.json";

  // TFReaderOp
  // Check the load status: a schema that failed to load must stop the test here instead of
  // letting the pipeline run with whatever state the schema object was left in.
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  Status rc = schema->LoadSchemaFile(data_schema_filepath, {});
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t num_workers = 1;
  int32_t worker_connector_size = config_manager->worker_connector_size();
  std::vector<std::string> files = {dataset_path + "/test.data"};
  std::vector<std::string> columns_to_load = {};

  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->AssignRoot(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    MS_LOG(INFO) << "Row display for row #: " << row_count << ".";

    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 5);
}
577
/// Feature: TFReaderOp
/// Description: Read testTFTestAllTypes/test.data through a single-node execution tree using the
///     datasetSchema7Rows.json schema (presumably a schema that limits the row count; confirm against the file)
/// Expectation: Every schema/tree/iterator call returns an OK status and exactly 7 rows are fetched
TEST_F(MindDataTestTFReaderOp, TestTFReaderTake7Rows) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
  const std::string data_schema_filepath = dataset_path + "/datasetSchema7Rows.json";

  // TFReaderOp
  // Check the load status: a schema that failed to load must stop the test here instead of
  // letting the pipeline run with whatever state the schema object was left in.
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  Status rc = schema->LoadSchemaFile(data_schema_filepath, {});
  ASSERT_TRUE(rc.IsOk());

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t num_workers = 1;
  int32_t worker_connector_size = config_manager->worker_connector_size();
  std::vector<std::string> files = {dataset_path + "/test.data"};
  std::vector<std::string> columns_to_load = {};

  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->AssignRoot(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    MS_LOG(INFO) << "Row display for row #: " << row_count << ".";

    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 7);
}
639
/// Feature: TFReaderOp
/// Description: Read testTFTestAllTypes/test.data through a single-node execution tree while passing a
///     default-constructed DataSchema (no schema file is loaded) and an empty columns_to_load list
/// Expectation: Every tree/iterator call returns an OK status, each fetched row holds 9 tensors,
///     and 12 rows are fetched
TEST_F(MindDataTestTFReaderOp, TestTFReaderBasicNoSchema) {
  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();
  const std::string dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";

  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  int32_t num_workers = 1;
  std::vector<std::string> columns_to_load = {};
  std::vector<std::string> files = {dataset_path};
  int32_t worker_connector_size = config_manager->worker_connector_size();

  // The schema is intentionally left empty: no LoadSchemaFile call in this test.
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  std::shared_ptr<TFReaderOp> my_tfreader_op =
    std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
                                 columns_to_load, false, 1, 0, false);
  Status rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->AssignRoot(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());

  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());

  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  // Kept unsigned so the comparison with size() does not mix signed and unsigned operands.
  const size_t expected_num_columns = 9;

  // An empty row ends the loop
  int row_count = 0;
  while (!tensor_list.empty()) {
    ASSERT_EQ(tensor_list.size(), expected_num_columns);

    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); ++i) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }

    rc = di.FetchNextTensorRow(&tensor_list);
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }

  ASSERT_EQ(row_count, 12);
}
694
/// Feature: TFReaderOp::CountTotalRows
/// Description: Count the rows of a file list holding five copies of testTFTestAllTypes/test.data, using
///     thread counts 1, 2, 3, 4, 5, 6 and 729, first without the fourth argument and then with it set to true
/// Expectation: Every call returns an OK status and reports 60 rows
TEST_F(MindDataTestTFReaderOp, TestTotalRowsBasic) {
  const std::string tf_file = datasets_root_path_ + "/testTFTestAllTypes/test.data";

  // The same file listed five times
  const std::vector<std::string> filenames(5, tf_file);

  // 729 is far larger than the number of files in the list
  const std::vector<int64_t> thread_counts = {1, 2, 3, 4, 5, 6, 729};

  // Pass 1: rely on the default value of the fourth argument.
  for (const int64_t num_threads : thread_counts) {
    // Reset the output for every call so a call that fails to write it cannot pass on a stale 60.
    int64_t total_rows = 0;
    Status rc = TFReaderOp::CountTotalRows(&total_rows, filenames, num_threads);
    ASSERT_TRUE(rc.IsOk());
    ASSERT_EQ(total_rows, 60);
  }

  // Pass 2: same thread counts with the fourth argument explicitly set to true.
  for (const int64_t num_threads : thread_counts) {
    int64_t total_rows = 0;
    Status rc = TFReaderOp::CountTotalRows(&total_rows, filenames, num_threads, true);
    ASSERT_TRUE(rc.IsOk());
    ASSERT_EQ(total_rows, 60);
  }
}
734