• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <iostream>
17 #include <memory>
18 #include <vector>
19 
20 #include "common/common.h"
21 #include "utils/ms_utils.h"
22 #include "minddata/dataset/core/client.h"
23 #include "minddata/dataset/engine/jagged_connector.h"
24 #include "gtest/gtest.h"
25 #include "utils/log_adapter.h"
26 
27 namespace common = mindspore::common;
28 
29 using namespace mindspore::dataset;
30 using mindspore::LogStream;
31 using mindspore::ExceptionType::NoExceptionType;
32 using mindspore::MsLogLevel::INFO;
33 
34 class MindDataTestConcatOp : public UT::DatasetOpTesting {};
35 
36 TEST_F(MindDataTestConcatOp, TestConcatProject) {
37   /* Tree:
38    *
39    *            OpId(2) ConcatOp
40    *            /               \
41    *     OpId(0) TFReaderOp    OpId(1) TFReaderOp
42    *
43    * Start with an empty execution tree
44    */
45   MS_LOG(INFO) << "UT test TestConcatProject.";
46   auto my_tree = std::make_shared<ExecutionTree>();
47 
48   std::string dataset_path;
49   dataset_path = datasets_root_path_ + "/testTFTestAllTypes/test.data";
50 
51   // TFReaderOp1
52   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
53   auto op_connector_size = config_manager->op_connector_size();
54   int32_t num_workers = 1;  // only one file -> one worker
55   int32_t worker_connector_size = 16;
56   std::vector<std::string> columns_to_load = {};
57   std::vector<std::string> files = {dataset_path};
58   std::unique_ptr<DataSchema> schema1 = std::make_unique<DataSchema>();
59   schema1->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema1Row.json", {});
60   // 16 is worker connector size
61   std::shared_ptr<TFReaderOp> my_tfreader_op1 =
62     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema1), op_connector_size,
63                                  columns_to_load, false, 1, 0, false);
64   Status rc = my_tfreader_op1->Init();
65   ASSERT_OK(rc);
66   rc = my_tree->AssociateNode(my_tfreader_op1);
67   ASSERT_OK(rc);
68 
69   // TFReaderOp2
70   std::unique_ptr<DataSchema> schema2 = std::make_unique<DataSchema>();
71   schema2->LoadSchemaFile(datasets_root_path_ + "/testTFTestAllTypes/datasetSchema1Row.json", {});
72   // 16 is worker connector size
73   std::shared_ptr<TFReaderOp> my_tfreader_op2 =
74     std::make_shared<TFReaderOp>(num_workers, worker_connector_size, 0, files, std::move(schema2), op_connector_size,
75                                  columns_to_load, false, 1, 0, false);
76   rc = my_tfreader_op2->Init();
77   ASSERT_OK(rc);
78   rc = my_tree->AssociateNode(my_tfreader_op2);
79   ASSERT_OK(rc);
80 
81   // Creating ConcatOp
82   std::shared_ptr<SamplerRT> concat_sampler = std::make_shared<DistributedSamplerRT>(1, 0, false, 0);
83   std::vector<std::pair<int, int>> flag_and_nums = {};
84   std::vector<std::pair<int, int>> children_start_end_index = {};
85   std::shared_ptr<ConcatOp> concat_op =
86     std::make_shared<ConcatOp>(std::move(concat_sampler), flag_and_nums, children_start_end_index);
87 
88   rc = my_tree->AssociateNode(concat_op);
89   EXPECT_TRUE(rc.IsOk());
90   rc = concat_op->AddChild(std::move(my_tfreader_op1));
91   EXPECT_TRUE(rc.IsOk());
92   rc = concat_op->AddChild(std::move(my_tfreader_op2));
93   EXPECT_TRUE(rc.IsOk());
94   rc = my_tree->AssignRoot(concat_op);
95   EXPECT_TRUE(rc.IsOk());
96   rc = my_tree->Prepare();
97   EXPECT_TRUE(rc.IsOk());
98 
99   // Launch the tree execution to kick off threads and start running the pipeline
100   MS_LOG(INFO) << "Launching my tree.";
101   rc = my_tree->Launch();
102   EXPECT_TRUE(rc.IsOk());
103 
104   // Simulate a parse of data from our pipeline.
105   std::shared_ptr<DatasetOp> rootNode = my_tree->root();
106 
107   DatasetIterator di(my_tree);
108   TensorRow tensor_list;
109   rc = di.FetchNextTensorRow(&tensor_list);
110   EXPECT_TRUE(rc.IsOk());
111 
112   int row_count = 0;
113   while (!tensor_list.empty()) {
114     MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
115 
116     // Display the tensor by calling the printer on it
117     for (int i = 0; i < tensor_list.size(); i++) {
118       std::ostringstream ss;
119       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
120       MS_LOG(INFO) << "Tensor print: " << common::SafeCStr(ss.str()) << ".";
121     }
122     rc = di.FetchNextTensorRow(&tensor_list);
123     EXPECT_TRUE(rc.IsOk());
124     row_count++;
125   }
126   ASSERT_EQ(row_count, 2);  // Should be 2 rows fetched
127 }
128