• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <iostream>
17 #include <memory>
18 #include <vector>
19 
20 #include "minddata/dataset/core/client.h"
21 #include "minddata/dataset/engine/data_schema.h"
22 #include "minddata/dataset/engine/jagged_connector.h"
23 #include "common/common.h"
24 #include "gtest/gtest.h"
25 #include "utils/log_adapter.h"
26 
27 namespace common = mindspore::common;
28 
29 using namespace mindspore::dataset;
30 using mindspore::LogStream;
31 using mindspore::ExceptionType::NoExceptionType;
32 using mindspore::MsLogLevel::INFO;
33 
34 class MindDataTestTFReaderOp : public UT::DatasetOpTesting {};
35 
36 TEST_F(MindDataTestTFReaderOp, TestTFReaderBasic1) {
37   // Start with an empty execution tree
38   auto my_tree = std::make_shared<ExecutionTree>();
39   Status rc;
40   std::string dataset_path;
41   dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
42 
43   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
44   int32_t op_connector_size = config_manager->op_connector_size();
45   int32_t num_workers = 1;
46   int32_t worker_connector_size = config_manager->worker_connector_size();
47   std::vector<std::string> files = {dataset_path};
48   std::vector<std::string> columns_to_load = {};
49 
50   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
51   schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
52   std::shared_ptr<TFReaderOp> my_tfreader_op =
53     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
54                                  columns_to_load, false, 1, 0, false);
55   rc = my_tfreader_op->Init();
56   ASSERT_TRUE(rc.IsOk());
57   rc = my_tree->AssociateNode(my_tfreader_op);
58   ASSERT_TRUE(rc.IsOk());
59 
60   rc = my_tree->AssignRoot(my_tfreader_op);
61   ASSERT_TRUE(rc.IsOk());
62 
63   MS_LOG(INFO) << "Launching tree and begin iteration.";
64   rc = my_tree->Prepare();
65   ASSERT_TRUE(rc.IsOk());
66 
67   rc = my_tree->Launch();
68   ASSERT_TRUE(rc.IsOk());
69 
70   // Start the loop of reading tensors from our pipeline
71   DatasetIterator di(my_tree);
72   TensorRow tensor_list;
73   rc = di.FetchNextTensorRow(&tensor_list);
74   ASSERT_TRUE(rc.IsOk());
75 
76   int row_count = 0;
77   while (!tensor_list.empty()) {
78     // Display the tensor by calling the printer on it
79     for (int i = 0; i < tensor_list.size(); i++) {
80       std::ostringstream ss;
81       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
82       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
83     }
84 
85     rc = di.FetchNextTensorRow(&tensor_list);
86     ASSERT_TRUE(rc.IsOk());
87     row_count++;
88   }
89 
90   ASSERT_EQ(row_count, 12);
91 }
92 
93 TEST_F(MindDataTestTFReaderOp, TestTFReaderLargeRowsPerBuffer) {
94   // Start with an empty execution tree
95   auto my_tree = std::make_shared<ExecutionTree>();
96   Status rc;
97   std::string dataset_path;
98   dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
99 
100   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
101   schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
102   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
103   int32_t op_connector_size = config_manager->op_connector_size();
104   int32_t num_workers = 1;
105   int32_t worker_connector_size = config_manager->worker_connector_size();
106   std::vector<std::string> files = {dataset_path};
107   std::vector<std::string> columns_to_load = {};
108 
109   std::shared_ptr<TFReaderOp> my_tfreader_op =
110     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
111                                  columns_to_load, false, 1, 0, false);
112   rc = my_tfreader_op->Init();
113   ASSERT_TRUE(rc.IsOk());
114   rc = my_tree->AssociateNode(my_tfreader_op);
115   ASSERT_TRUE(rc.IsOk());
116 
117   rc = my_tree->AssignRoot(my_tfreader_op);
118   ASSERT_TRUE(rc.IsOk());
119 
120   MS_LOG(INFO) << "Launching tree and begin iteration.";
121   rc = my_tree->Prepare();
122   ASSERT_TRUE(rc.IsOk());
123 
124   rc = my_tree->Launch();
125   ASSERT_TRUE(rc.IsOk());
126 
127   // Start the loop of reading tensors from our pipeline
128   DatasetIterator di(my_tree);
129   TensorRow tensor_list;
130   rc = di.FetchNextTensorRow(&tensor_list);
131   ASSERT_TRUE(rc.IsOk());
132 
133   int row_count = 0;
134   while (!tensor_list.empty()) {
135     // Display the tensor by calling the printer on it
136     for (int i = 0; i < tensor_list.size(); i++) {
137       std::ostringstream ss;
138       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
139       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
140     }
141 
142     rc = di.FetchNextTensorRow(&tensor_list);
143     ASSERT_TRUE(rc.IsOk());
144     row_count++;
145   }
146 
147   ASSERT_EQ(row_count, 12);
148 }
149 
150 TEST_F(MindDataTestTFReaderOp, TestTFReaderSmallRowsPerBuffer) {
151   // Start with an empty execution tree
152   auto my_tree = std::make_shared<ExecutionTree>();
153   Status rc;
154   std::string dataset_path;
155   dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
156 
157   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
158   int32_t op_connector_size = config_manager->op_connector_size();
159   int32_t num_workers = 1;
160   int32_t worker_connector_size = config_manager->worker_connector_size();
161   std::vector<std::string> files = {dataset_path};
162   std::vector<std::string> columns_to_load = {};
163 
164   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
165   schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
166   std::shared_ptr<TFReaderOp> my_tfreader_op =
167     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
168                                  columns_to_load, false, 1, 0, false);
169   rc = my_tfreader_op->Init();
170   ASSERT_TRUE(rc.IsOk());
171   rc = my_tree->AssociateNode(my_tfreader_op);
172   ASSERT_TRUE(rc.IsOk());
173 
174   rc = my_tree->AssignRoot(my_tfreader_op);
175   ASSERT_TRUE(rc.IsOk());
176 
177   MS_LOG(INFO) << "Launching tree and begin iteration.";
178   rc = my_tree->Prepare();
179   ASSERT_TRUE(rc.IsOk());
180 
181   rc = my_tree->Launch();
182   ASSERT_TRUE(rc.IsOk());
183 
184   // Start the loop of reading tensors from our pipeline
185   DatasetIterator di(my_tree);
186   TensorRow tensor_list;
187   rc = di.FetchNextTensorRow(&tensor_list);
188   ASSERT_TRUE(rc.IsOk());
189 
190   int row_count = 0;
191   while (!tensor_list.empty()) {
192     // Display the tensor by calling the printer on it
193     for (int i = 0; i < tensor_list.size(); i++) {
194       std::ostringstream ss;
195       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
196       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
197     }
198 
199     rc = di.FetchNextTensorRow(&tensor_list);
200     ASSERT_TRUE(rc.IsOk());
201     row_count++;
202   }
203 
204   ASSERT_EQ(row_count, 12);
205 }
206 
207 TEST_F(MindDataTestTFReaderOp, TestTFReaderLargeQueueSize) {
208   // Start with an empty execution tree
209   auto my_tree = std::make_shared<ExecutionTree>();
210   Status rc;
211   std::string dataset_path;
212   dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
213 
214   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
215   int32_t op_connector_size = config_manager->op_connector_size();
216   int32_t num_workers = 1;
217   int32_t worker_connector_size = 1;
218   std::vector<std::string> files = {dataset_path};
219   std::vector<std::string> columns_to_load = {};
220 
221   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
222   schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
223   std::shared_ptr<TFReaderOp> my_tfreader_op =
224     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
225                                  columns_to_load, false, 1, 0, false);
226   rc = my_tfreader_op->Init();
227   ASSERT_TRUE(rc.IsOk());
228   rc = my_tree->AssociateNode(my_tfreader_op);
229   ASSERT_TRUE(rc.IsOk());
230 
231   rc = my_tree->AssignRoot(my_tfreader_op);
232   ASSERT_TRUE(rc.IsOk());
233 
234   MS_LOG(INFO) << "Launching tree and begin iteration.";
235   rc = my_tree->Prepare();
236   ASSERT_TRUE(rc.IsOk());
237 
238   rc = my_tree->Launch();
239   ASSERT_TRUE(rc.IsOk());
240 
241   // Start the loop of reading tensors from our pipeline
242   DatasetIterator di(my_tree);
243   TensorRow tensor_list;
244   rc = di.FetchNextTensorRow(&tensor_list);
245   ASSERT_TRUE(rc.IsOk());
246 
247   int row_count = 0;
248   while (!tensor_list.empty()) {
249     // Display the tensor by calling the printer on it
250     for (int i = 0; i < tensor_list.size(); i++) {
251       std::ostringstream ss;
252       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
253       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
254     }
255 
256     rc = di.FetchNextTensorRow(&tensor_list);
257     ASSERT_TRUE(rc.IsOk());
258     row_count++;
259   }
260 
261   ASSERT_EQ(row_count, 12);
262 }
263 
264 TEST_F(MindDataTestTFReaderOp, TestTFReaderOneThread) {
265   // Start with an empty execution tree
266   auto my_tree = std::make_shared<ExecutionTree>();
267   Status rc;
268   std::string dataset_path;
269   dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
270 
271   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
272   int32_t op_connector_size = config_manager->op_connector_size();
273   int32_t num_workers = 1;
274   int32_t worker_connector_size = config_manager->worker_connector_size();
275   std::vector<std::string> files = {dataset_path};
276   std::vector<std::string> columns_to_load = {};
277 
278   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
279   schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
280   std::shared_ptr<TFReaderOp> my_tfreader_op =
281     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
282                                  columns_to_load, false, 1, 0, false);
283   rc = my_tfreader_op->Init();
284   ASSERT_TRUE(rc.IsOk());
285   rc = my_tree->AssociateNode(my_tfreader_op);
286   ASSERT_TRUE(rc.IsOk());
287 
288   rc = my_tree->AssignRoot(my_tfreader_op);
289   ASSERT_TRUE(rc.IsOk());
290 
291   MS_LOG(INFO) << "Launching tree and begin iteration.";
292   rc = my_tree->Prepare();
293   ASSERT_TRUE(rc.IsOk());
294 
295   rc = my_tree->Launch();
296   ASSERT_TRUE(rc.IsOk());
297 
298   // Start the loop of reading tensors from our pipeline
299   DatasetIterator di(my_tree);
300   TensorRow tensor_list;
301   rc = di.FetchNextTensorRow(&tensor_list);
302   ASSERT_TRUE(rc.IsOk());
303 
304   int row_count = 0;
305   while (!tensor_list.empty()) {
306     // Display the tensor by calling the printer on it
307     for (int i = 0; i < tensor_list.size(); i++) {
308       std::ostringstream ss;
309       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
310       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
311     }
312 
313     rc = di.FetchNextTensorRow(&tensor_list);
314     ASSERT_TRUE(rc.IsOk());
315     row_count++;
316   }
317 
318   ASSERT_EQ(row_count, 12);
319 }
320 
321 TEST_F(MindDataTestTFReaderOp, TestTFReaderRepeat) {
322   // Start with an empty execution tree
323   auto my_tree = std::make_shared<ExecutionTree>();
324   Status rc;
325   std::string dataset_path;
326   dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
327 
328   // TFReaderOp
329   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
330   int32_t op_connector_size = config_manager->op_connector_size();
331   int32_t num_workers = 1;
332   int32_t worker_connector_size = 16;
333   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
334   std::vector<std::string> files = {dataset_path};
335   std::vector<std::string> columns_to_load = {};
336 
337   schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema.json", {});
338   std::shared_ptr<TFReaderOp> my_tfreader_op =
339     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
340                                  columns_to_load, false, 1, 0, false);
341   rc = my_tfreader_op->Init();
342   ASSERT_TRUE(rc.IsOk());
343   rc = my_tree->AssociateNode(my_tfreader_op);
344   ASSERT_TRUE(rc.IsOk());
345 
346   // RepeatOp
347   uint32_t num_repeats = 3;
348   std::shared_ptr<RepeatOp> my_repeat_op = std::make_shared<RepeatOp>(num_repeats);
349   rc = my_tree->AssociateNode(my_repeat_op);
350   ASSERT_TRUE(rc.IsOk());
351 
352   // Set children/root layout.
353   my_tfreader_op->SetTotalRepeats(num_repeats);
354   my_tfreader_op->SetNumRepeatsPerEpoch(num_repeats);
355   rc = my_repeat_op->AddChild(my_tfreader_op);
356   ASSERT_TRUE(rc.IsOk());
357   rc = my_tree->AssignRoot(my_repeat_op);
358   ASSERT_TRUE(rc.IsOk());
359 
360   MS_LOG(INFO) << "Launching tree and begin iteration.";
361   rc = my_tree->Prepare();
362   ASSERT_TRUE(rc.IsOk());
363 
364   rc = my_tree->Launch();
365   ASSERT_TRUE(rc.IsOk());
366 
367   // Start the loop of reading tensors from our pipeline
368   DatasetIterator di(my_tree);
369   TensorRow tensor_list;
370   rc = di.FetchNextTensorRow(&tensor_list);
371   ASSERT_TRUE(rc.IsOk());
372 
373   int row_count = 0;
374   while (!tensor_list.empty()) {
375     MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
376 
377     // Display the tensor by calling the printer on it
378     for (int i = 0; i < tensor_list.size(); i++) {
379       std::ostringstream ss;
380       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
381       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
382     }
383 
384     rc = di.FetchNextTensorRow(&tensor_list);
385     ASSERT_TRUE(rc.IsOk());
386     row_count++;
387   }
388 
389   ASSERT_EQ(row_count, 12 * 3);
390 }
391 
392 TEST_F(MindDataTestTFReaderOp, TestTFReaderSchemaConstructor) {
393   // Start with an empty execution tree
394   auto my_tree = std::make_shared<ExecutionTree>();
395   Status rc;
396   std::string dataset_path;
397   dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
398   std::vector<std::string> files = {dataset_path + "/test.data"};
399 
400   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
401   std::vector<std::string> columns_to_load;
402   columns_to_load.push_back("col_sint32");
403   columns_to_load.push_back("col_binary");
404   schema->LoadSchemaFile(dataset_path + "/datasetSchema.json", columns_to_load);
405 
406   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
407   int32_t op_connector_size = config_manager->op_connector_size();
408   int32_t worker_connector_size = config_manager->worker_connector_size();
409   int32_t num_workers = 1;
410 
411   std::shared_ptr<TFReaderOp> my_tfreader_op =
412     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
413                                  columns_to_load, false, 1, 0, false);
414   rc = my_tfreader_op->Init();
415   ASSERT_TRUE(rc.IsOk());
416   rc = my_tree->AssociateNode(my_tfreader_op);
417   ASSERT_TRUE(rc.IsOk());
418 
419   rc = my_tree->AssignRoot(my_tfreader_op);
420   ASSERT_TRUE(rc.IsOk());
421 
422   MS_LOG(INFO) << "Launching tree and begin iteration.";
423   rc = my_tree->Prepare();
424   ASSERT_TRUE(rc.IsOk());
425 
426   rc = my_tree->Launch();
427   ASSERT_TRUE(rc.IsOk());
428 
429   // Start the loop of reading tensors from our pipeline
430   DatasetIterator di(my_tree);
431   TensorRow tensor_list;
432   rc = di.FetchNextTensorRow(&tensor_list);
433   ASSERT_TRUE(rc.IsOk());
434 
435   int row_count = 0;
436   while (!tensor_list.empty()) {
437     // Display the tensor by calling the printer on it
438     ASSERT_EQ(tensor_list.size(), columns_to_load.size());
439 
440     for (int i = 0; i < tensor_list.size(); i++) {
441       std::ostringstream ss;
442       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
443       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
444     }
445 
446     rc = di.FetchNextTensorRow(&tensor_list);
447     ASSERT_TRUE(rc.IsOk());
448     row_count++;
449   }
450 
451   ASSERT_EQ(row_count, 12);
452 }
453 
454 TEST_F(MindDataTestTFReaderOp, TestTFReaderTake1Row) {
455   // Start with an empty execution tree
456   auto my_tree = std::make_shared<ExecutionTree>();
457   Status rc;
458   std::string dataset_path;
459   dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
460 
461   std::string data_schema_filepath = dataset_path + "/datasetSchema1Row.json";
462 
463   // TFReaderOp
464   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
465   schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema1Row.json", {});
466   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
467   int32_t op_connector_size = config_manager->op_connector_size();
468   int32_t num_workers = 1;
469   int32_t worker_connector_size = config_manager->worker_connector_size();
470   std::vector<std::string> files = {dataset_path + "/test.data"};
471   std::vector<std::string> columns_to_load = {};
472 
473   std::shared_ptr<TFReaderOp> my_tfreader_op =
474     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
475                                  columns_to_load, false, 1, 0, false);
476   rc = my_tfreader_op->Init();
477   ASSERT_TRUE(rc.IsOk());
478   rc = my_tree->AssociateNode(my_tfreader_op);
479   ASSERT_TRUE(rc.IsOk());
480 
481   rc = my_tree->AssignRoot(my_tfreader_op);
482   ASSERT_TRUE(rc.IsOk());
483 
484   MS_LOG(INFO) << "Launching tree and begin iteration.";
485   rc = my_tree->Prepare();
486   ASSERT_TRUE(rc.IsOk());
487 
488   rc = my_tree->Launch();
489   ASSERT_TRUE(rc.IsOk());
490 
491   // Start the loop of reading tensors from our pipeline
492   DatasetIterator di(my_tree);
493   TensorRow tensor_list;
494   rc = di.FetchNextTensorRow(&tensor_list);
495   ASSERT_TRUE(rc.IsOk());
496 
497   int row_count = 0;
498   while (!tensor_list.empty()) {
499     MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
500 
501     // Display the tensor by calling the printer on it
502     for (int i = 0; i < tensor_list.size(); i++) {
503       std::ostringstream ss;
504       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
505       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
506     }
507 
508     rc = di.FetchNextTensorRow(&tensor_list);
509     ASSERT_TRUE(rc.IsOk());
510     row_count++;
511   }
512 
513   ASSERT_EQ(row_count, 1);
514 }
515 
516 TEST_F(MindDataTestTFReaderOp, TestTFReaderTake1Buffer) {
517   // Start with an empty execution tree
518   auto my_tree = std::make_shared<ExecutionTree>();
519   Status rc;
520   std::string dataset_path;
521   dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
522 
523   std::string data_schema_filepath = dataset_path + "/datasetSchema5Rows.json";
524 
525   // TFReaderOp
526   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
527   schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema5Rows.json", {});
528   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
529   int32_t op_connector_size = config_manager->op_connector_size();
530   int32_t num_workers = 1;
531   int32_t worker_connector_size = config_manager->worker_connector_size();
532   std::vector<std::string> files = {dataset_path + "/test.data"};
533   std::vector<std::string> columns_to_load = {};
534 
535   std::shared_ptr<TFReaderOp> my_tfreader_op =
536     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
537                                  columns_to_load, false, 1, 0, false);
538   rc = my_tfreader_op->Init();
539   ASSERT_TRUE(rc.IsOk());
540   rc = my_tree->AssociateNode(my_tfreader_op);
541   ASSERT_TRUE(rc.IsOk());
542 
543   rc = my_tree->AssignRoot(my_tfreader_op);
544   ASSERT_TRUE(rc.IsOk());
545 
546   MS_LOG(INFO) << "Launching tree and begin iteration.";
547   rc = my_tree->Prepare();
548   ASSERT_TRUE(rc.IsOk());
549 
550   rc = my_tree->Launch();
551   ASSERT_TRUE(rc.IsOk());
552 
553   // Start the loop of reading tensors from our pipeline
554   DatasetIterator di(my_tree);
555   TensorRow tensor_list;
556   rc = di.FetchNextTensorRow(&tensor_list);
557   ASSERT_TRUE(rc.IsOk());
558 
559   int row_count = 0;
560   while (!tensor_list.empty()) {
561     MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
562 
563     // Display the tensor by calling the printer on it
564     for (int i = 0; i < tensor_list.size(); i++) {
565       std::ostringstream ss;
566       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
567       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
568     }
569 
570     rc = di.FetchNextTensorRow(&tensor_list);
571     ASSERT_TRUE(rc.IsOk());
572     row_count++;
573   }
574 
575   ASSERT_EQ(row_count, 5);
576 }
577 
578 TEST_F(MindDataTestTFReaderOp, TestTFReaderTake7Rows) {
579   // Start with an empty execution tree
580   auto my_tree = std::make_shared<ExecutionTree>();
581   Status rc;
582   std::string dataset_path;
583   dataset_path = datasets_root_path_ + "/testTFTestAllTypes";
584 
585   std::string data_schema_filepath = dataset_path + "/datasetSchema7Rows.json";
586 
587   // TFReaderOp
588   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
589   schema->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema7Rows.json", {});
590   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
591   int32_t op_connector_size = config_manager->op_connector_size();
592   int32_t num_workers = 1;
593   int32_t worker_connector_size = config_manager->worker_connector_size();
594   std::vector<std::string> files = {dataset_path + "/test.data"};
595   std::vector<std::string> columns_to_load = {};
596 
597   std::shared_ptr<TFReaderOp> my_tfreader_op =
598     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
599                                  columns_to_load, false, 1, 0, false);
600   rc = my_tfreader_op->Init();
601   ASSERT_TRUE(rc.IsOk());
602   rc = my_tree->AssociateNode(my_tfreader_op);
603   ASSERT_TRUE(rc.IsOk());
604 
605   rc = my_tree->AssignRoot(my_tfreader_op);
606   ASSERT_TRUE(rc.IsOk());
607 
608   MS_LOG(INFO) << "Launching tree and begin iteration.";
609   rc = my_tree->Prepare();
610   ASSERT_TRUE(rc.IsOk());
611 
612   rc = my_tree->Launch();
613   ASSERT_TRUE(rc.IsOk());
614 
615   // Start the loop of reading tensors from our pipeline
616   DatasetIterator di(my_tree);
617   TensorRow tensor_list;
618   rc = di.FetchNextTensorRow(&tensor_list);
619   ASSERT_TRUE(rc.IsOk());
620 
621   int row_count = 0;
622   while (!tensor_list.empty()) {
623     MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
624 
625     // Display the tensor by calling the printer on it
626     for (int i = 0; i < tensor_list.size(); i++) {
627       std::ostringstream ss;
628       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
629       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
630     }
631 
632     rc = di.FetchNextTensorRow(&tensor_list);
633     ASSERT_TRUE(rc.IsOk());
634     row_count++;
635   }
636 
637   ASSERT_EQ(row_count, 7);
638 }
639 
640 TEST_F(MindDataTestTFReaderOp, TestTFReaderBasicNoSchema) {
641   // Start with an empty execution tree
642   auto my_tree = std::make_shared<ExecutionTree>();
643   Status rc;
644   std::string dataset_path;
645   dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
646   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
647   int32_t op_connector_size = config_manager->op_connector_size();
648   int32_t num_workers = 1;
649   std::vector<std::string> columns_to_load = {};
650   std::vector<std::string> files = {dataset_path};
651   int32_t worker_connector_size = config_manager->worker_connector_size();
652   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
653   std::shared_ptr<TFReaderOp> my_tfreader_op =
654     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema), op_connector_size,
655                                  columns_to_load, false, 1, 0, false);
656   rc = my_tfreader_op->Init();
657   ASSERT_TRUE(rc.IsOk());
658   rc = my_tree->AssociateNode(my_tfreader_op);
659   ASSERT_TRUE(rc.IsOk());
660 
661   rc = my_tree->AssignRoot(my_tfreader_op);
662   ASSERT_TRUE(rc.IsOk());
663 
664   MS_LOG(INFO) << "Launching tree and begin iteration.";
665   rc = my_tree->Prepare();
666   ASSERT_TRUE(rc.IsOk());
667 
668   rc = my_tree->Launch();
669   ASSERT_TRUE(rc.IsOk());
670 
671   // Start the loop of reading tensors from our pipeline
672   DatasetIterator di(my_tree);
673   TensorRow tensor_list;
674   rc = di.FetchNextTensorRow(&tensor_list);
675   ASSERT_TRUE(rc.IsOk());
676 
677   int row_count = 0;
678   while (!tensor_list.empty()) {
679     // Display the tensor by calling the printer on it
680     ASSERT_EQ(tensor_list.size(), 9);
681     for (int i = 0; i < tensor_list.size(); i++) {
682       std::ostringstream ss;
683       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
684       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
685     }
686 
687     rc = di.FetchNextTensorRow(&tensor_list);
688     ASSERT_TRUE(rc.IsOk());
689     row_count++;
690   }
691 
692   ASSERT_EQ(row_count, 12);
693 }
694 
695 TEST_F(MindDataTestTFReaderOp, TestTotalRowsBasic) {
696   std::string tf_file = datasets_root_path_ + "/testTFTestAllTypes/test.data";
697 
698   std::vector<std::string> filenames;
699 
700   for (int i = 0; i < 5; i++) {
701     filenames.push_back(tf_file);
702   }
703 
704   int64_t total_rows = 0;
705   TFReaderOp::CountTotalRows(&total_rows, filenames, 1);
706   ASSERT_EQ(total_rows, 60);
707   TFReaderOp::CountTotalRows(&total_rows, filenames, 2);
708   ASSERT_EQ(total_rows, 60);
709   TFReaderOp::CountTotalRows(&total_rows, filenames, 3);
710   ASSERT_EQ(total_rows, 60);
711   TFReaderOp::CountTotalRows(&total_rows, filenames, 4);
712   ASSERT_EQ(total_rows, 60);
713   TFReaderOp::CountTotalRows(&total_rows, filenames, 5);
714   ASSERT_EQ(total_rows, 60);
715   TFReaderOp::CountTotalRows(&total_rows, filenames, 6);
716   ASSERT_EQ(total_rows, 60);
717   TFReaderOp::CountTotalRows(&total_rows, filenames, 729);
718   ASSERT_EQ(total_rows, 60);
719   TFReaderOp::CountTotalRows(&total_rows, filenames, 1, true);
720   ASSERT_EQ(total_rows, 60);
721   TFReaderOp::CountTotalRows(&total_rows, filenames, 2, true);
722   ASSERT_EQ(total_rows, 60);
723   TFReaderOp::CountTotalRows(&total_rows, filenames, 3, true);
724   ASSERT_EQ(total_rows, 60);
725   TFReaderOp::CountTotalRows(&total_rows, filenames, 4, true);
726   ASSERT_EQ(total_rows, 60);
727   TFReaderOp::CountTotalRows(&total_rows, filenames, 5, true);
728   ASSERT_EQ(total_rows, 60);
729   TFReaderOp::CountTotalRows(&total_rows, filenames, 6, true);
730   ASSERT_EQ(total_rows, 60);
731   TFReaderOp::CountTotalRows(&total_rows, filenames, 729, true);
732   ASSERT_EQ(total_rows, 60);
733 }
734