• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2019-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "minddata/dataset/core/client.h"
17 #include "common/common.h"
18 #include "utils/ms_utils.h"
19 #include "gtest/gtest.h"
20 #include "utils/log_adapter.h"
21 #include <memory>
22 #include <vector>
23 #include <iostream>
24 #include "minddata/dataset/util/random.h"
25 #include "minddata/dataset/engine/jagged_connector.h"
26 
27 namespace common = mindspore::common;
28 
29 using namespace mindspore::dataset;
30 using mindspore::LogStream;
31 using mindspore::ExceptionType::NoExceptionType;
32 using mindspore::MsLogLevel::INFO;
33 
34 class MindDataTestShuffleOp : public UT::DatasetOpTesting {};
35 
36 // Test info:
37 // - Dataset from testDataset1 has 10 rows, 2 columns.
38 // - RowsPerBuffer buffer setting of 2 divides evenly into total rows.
39 // - Shuffle size is multiple of rows per buffer.
40 //
41 // Tree:  shuffle over TFReader
42 //
43 //    ShuffleOp
44 //        |
45 //    TFReaderOp
46 //
TEST_F(MindDataTestShuffleOp,TestShuffleBasic1)47 TEST_F(MindDataTestShuffleOp, TestShuffleBasic1) {
48   Status rc;
49   MS_LOG(INFO) << "UT test TestShuffleBasic1.";
50 
51   // Start with an empty execution tree
52   auto my_tree = std::make_shared<ExecutionTree>();
53 
54   std::string dataset_path;
55   dataset_path = datasets_root_path_ + "/testDataset1/testDataset1.data";
56   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
57   auto op_connector_size = config_manager->op_connector_size();
58   std::vector<std::string> columns_to_load = {};
59   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
60   std::vector<std::string> files = {dataset_path};
61   std::shared_ptr<TFReaderOp> my_tfreader_op = std::make_shared<TFReaderOp>(
62     1, 16, 0, files, std::move(schema), op_connector_size, columns_to_load, false, 1, 0, false);
63   rc = my_tfreader_op->Init();
64   EXPECT_TRUE(rc.IsOk());
65   rc = my_tree->AssociateNode(my_tfreader_op);
66   EXPECT_TRUE(rc.IsOk());
67   uint32_t shuffle_seed = GetSeed();
68   std::shared_ptr<ShuffleOp> my_shuffle_op = std::make_shared<ShuffleOp>(4, shuffle_seed, op_connector_size, true);
69   rc = my_tree->AssociateNode(my_shuffle_op);
70   EXPECT_TRUE(rc.IsOk());
71 
72   // Set children/root layout.
73   rc = my_shuffle_op->AddChild(my_tfreader_op);
74   EXPECT_TRUE(rc.IsOk());
75   rc = my_tree->AssignRoot(my_shuffle_op);
76   EXPECT_TRUE(rc.IsOk());
77   MS_LOG(INFO) << "Launching tree and begin iteration.";
78   rc = my_tree->Prepare();
79   EXPECT_TRUE(rc.IsOk());
80   rc = my_tree->Launch();
81   EXPECT_TRUE(rc.IsOk());
82 
83   // Start the loop of reading tensors from our pipeline
84   DatasetIterator di(my_tree);
85   TensorRow tensor_list;
86   rc = di.FetchNextTensorRow(&tensor_list);
87   EXPECT_TRUE(rc.IsOk());
88 
89   int row_count = 0;
90   while (!tensor_list.empty()) {
91     MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
92 
93     // Display the tensor by calling the printer on it
94     for (int i = 0; i < tensor_list.size(); i++) {
95       std::ostringstream ss;
96       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
97       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
98     }
99     rc = di.FetchNextTensorRow(&tensor_list);
100     EXPECT_TRUE(rc.IsOk());
101     row_count++;
102   }
103   ASSERT_EQ(row_count, 10);
104 }
105 
106 // Test info:
107 // - Dataset from testDataset1 has 10 rows, 2 columns.
108 // - RowsPerBuffer buffer setting of 3 does not divide evenly into total rows, thereby causing
109 //   partially filled buffers.
110 // - Shuffle size is not a multiple of rows per buffer.
111 // - User has provided a non-default seed value.
112 //
113 // Tree: shuffle over TFReader
114 //
115 //    ShuffleOp
116 //       |
117 //    TFReaderOp
118 //
TEST_F(MindDataTestShuffleOp,TestShuffleBasic2)119 TEST_F(MindDataTestShuffleOp, TestShuffleBasic2) {
120   Status rc;
121   MS_LOG(INFO) << "UT test TestShuffleBasic2.";
122 
123   // Start with an empty execution tree
124   auto my_tree = std::make_shared<ExecutionTree>();
125 
126   std::string dataset_path;
127   dataset_path = datasets_root_path_ + "/testDataset1/testDataset1.data";
128   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
129   int32_t op_connector_size = config_manager->op_connector_size();
130   std::vector<std::string> columns_to_load = {};
131   std::vector<std::string> files = {dataset_path};
132   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
133   std::shared_ptr<TFReaderOp> my_tfreader_op = std::make_shared<TFReaderOp>(
134     1, 16, 0, files, std::move(schema), op_connector_size, columns_to_load, false, 1, 0, false);
135   rc = my_tfreader_op->Init();
136   EXPECT_TRUE(rc.IsOk());
137   rc = my_tree->AssociateNode(my_tfreader_op);
138   EXPECT_TRUE(rc.IsOk());
139   std::shared_ptr<ShuffleOp> my_shuffle_op = std::make_shared<ShuffleOp>(4, 100, op_connector_size, true);
140   rc = my_tree->AssociateNode(my_shuffle_op);
141   EXPECT_TRUE(rc.IsOk());
142 
143   // Set children/root layout.
144   rc = my_shuffle_op->AddChild(my_tfreader_op);
145   EXPECT_TRUE(rc.IsOk());
146   rc = my_tree->AssignRoot(my_shuffle_op);
147   EXPECT_TRUE(rc.IsOk());
148   MS_LOG(INFO) << "Launching tree and begin iteration.";
149   rc = my_tree->Prepare();
150   EXPECT_TRUE(rc.IsOk());
151   rc = my_tree->Launch();
152   EXPECT_TRUE(rc.IsOk());
153 
154   // Start the loop of reading tensors from our pipeline
155   DatasetIterator di(my_tree);
156   TensorRow tensor_list;
157   rc = di.FetchNextTensorRow(&tensor_list);
158   EXPECT_TRUE(rc.IsOk());
159   int row_count = 0;
160   while (!tensor_list.empty()) {
161     MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
162 
163     // Display the tensor by calling the printer on it
164     for (int i = 0; i < tensor_list.size(); i++) {
165       std::ostringstream ss;
166       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
167       MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
168     }
169     rc = di.FetchNextTensorRow(&tensor_list);
170     EXPECT_TRUE(rc.IsOk());
171     row_count++;
172   }
173   ASSERT_EQ(row_count, 10);
174 }
175 
176 // Test info:
177 // - Dataset from testDataset1 has 10 rows, 2 columns.
178 // - RowsPerBuffer buffer setting of 3 does not divide evenly into total rows, thereby causing
179 //   partially filled buffers
180 // - Shuffle size captures the entire dataset size (actually sets a value that is larger than the
181 //   amount of rows in the dataset.
182 //
183 // Tree: shuffle over TFReader
184 //
185 //    ShuffleOp
186 //        |
187 //    TFReaderOp
188 //
TEST_F(MindDataTestShuffleOp,TestShuffleBasic3)189 TEST_F(MindDataTestShuffleOp, TestShuffleBasic3) {
190   Status rc;
191   MS_LOG(INFO) << "UT test TestShuffleBasic3.";
192 
193   // Start with an empty execution tree
194   auto my_tree = std::make_shared<ExecutionTree>();
195 
196   std::string dataset_path;
197   dataset_path = datasets_root_path_ + "/testDataset1/testDataset1.data";
198   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
199   auto op_connector_size = config_manager->op_connector_size();
200   std::vector<std::string> columns_to_load = {};
201   std::vector<std::string> files = {dataset_path};
202   std::shared_ptr<TFReaderOp> my_tfreader_op = std::make_shared<TFReaderOp>(
203     1, 16, 0, files, std::make_unique<DataSchema>(), op_connector_size, columns_to_load, false, 1, 0, false);
204   rc = my_tfreader_op->Init();
205   EXPECT_TRUE(rc.IsOk());
206   my_tree->AssociateNode(my_tfreader_op);
207   uint32_t shuffle_seed = GetSeed();
208   std::shared_ptr<ShuffleOp> my_shuffle_op = std::make_shared<ShuffleOp>(100, shuffle_seed, op_connector_size, true);
209   rc = my_tree->AssociateNode(my_shuffle_op);
210   EXPECT_TRUE(rc.IsOk());
211 
212   // Set children/root layout.
213   rc = my_shuffle_op->AddChild(my_tfreader_op);
214   EXPECT_TRUE(rc.IsOk());
215   rc = my_tree->AssignRoot(my_shuffle_op);
216   EXPECT_TRUE(rc.IsOk());
217   MS_LOG(INFO) << "Launching tree and begin iteration.";
218   rc = my_tree->Prepare();
219   EXPECT_TRUE(rc.IsOk());
220   rc = my_tree->Launch();
221   EXPECT_TRUE(rc.IsOk());
222 
223   // Start the loop of reading tensors from our pipeline
224   DatasetIterator di(my_tree);
225   TensorRow tensor_list;
226   rc = di.FetchNextTensorRow(&tensor_list);
227   EXPECT_TRUE(rc.IsOk());
228   int row_count = 0;
229   while (!tensor_list.empty()) {
230     MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
231 
232     // Display the tensor by calling the printer on it
233     for (int i = 0; i < tensor_list.size(); i++) {
234       std::ostringstream ss;
235       ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << std::endl;
236       MS_LOG(INFO) << "Tensor print: " << common::SafeCStr(ss.str()) << ".";
237     }
238     rc = di.FetchNextTensorRow(&tensor_list);
239     EXPECT_TRUE(rc.IsOk());
240     row_count++;
241   }
242   ASSERT_EQ(row_count, 10);
243 }
244 
245 // Test info:
246 // - Dataset from testDataset1 has 10 rows, 2 columns.
247 // - RowsPerBuffer buffer setting of 3 does not divide evenly into total rows thereby causing
248 //   partially filled buffers
249 // - Shuffle size is not a multiple of rows per buffer.
250 // - shuffle seed is given, and subsequent epochs will change the seed each time.
251 // - Repeat count of 2
252 //
253 // Tree: Repeat over shuffle over TFReader
254 //
255 //    Repeat
256 //       |
257 //    shuffle
258 //       |
259 //    TFReaderOp
260 //
TEST_F(MindDataTestShuffleOp,TestRepeatShuffle)261 TEST_F(MindDataTestShuffleOp, TestRepeatShuffle) {
262   Status rc;
263   MS_LOG(INFO) << "UT test TestRepeatShuffle.";
264 
265   // Start with an empty execution tree
266   auto my_tree = std::make_shared<ExecutionTree>();
267 
268   std::string dataset_path;
269   dataset_path = datasets_root_path_ + "/testDataset1/testDataset1.data";
270   std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
271   int32_t op_connector_size = config_manager->op_connector_size();
272   std::vector<std::string> columns_to_load = {};
273   std::vector<std::string> files = {dataset_path};
274   std::shared_ptr<TFReaderOp> my_tfreader_op = std::make_shared<TFReaderOp>(
275     2, 16, 0, files, std::make_unique<DataSchema>(), op_connector_size, columns_to_load, false, 1, 0, false);
276   rc = my_tfreader_op->Init();
277   EXPECT_TRUE(rc.IsOk());
278   rc = my_tree->AssociateNode(my_tfreader_op);
279   EXPECT_TRUE(rc.IsOk());
280   std::shared_ptr<ShuffleOp> my_shuffle_op = std::make_shared<ShuffleOp>(4, 100, op_connector_size, true);
281   rc = my_tree->AssociateNode(my_shuffle_op);
282   EXPECT_TRUE(rc.IsOk());
283   uint32_t num_repeats = 2;
284   std::shared_ptr<RepeatOp> my_repeat_op = std::make_shared<RepeatOp>(num_repeats);
285   rc = my_tree->AssociateNode(my_repeat_op);
286   EXPECT_TRUE(rc.IsOk());
287 
288   // Set children/root layout.
289   my_shuffle_op->SetTotalRepeats(num_repeats);
290   my_shuffle_op->SetNumRepeatsPerEpoch(num_repeats);
291   rc = my_repeat_op->AddChild(my_shuffle_op);
292   EXPECT_TRUE(rc.IsOk());
293   my_tfreader_op->SetTotalRepeats(num_repeats);
294   my_tfreader_op->SetNumRepeatsPerEpoch(num_repeats);
295   rc = my_shuffle_op->AddChild(my_tfreader_op);
296   EXPECT_TRUE(rc.IsOk());
297   rc = my_tree->AssignRoot(my_repeat_op);
298   EXPECT_TRUE(rc.IsOk());
299   MS_LOG(INFO) << "Launching tree and begin iteration.";
300   rc = my_tree->Prepare();
301   EXPECT_TRUE(rc.IsOk());
302   rc = my_tree->Launch();
303   EXPECT_TRUE(rc.IsOk());
304 
305   // Start the loop of reading tensors from our pipeline
306   DatasetIterator di(my_tree);
307   TensorRow tensor_list;
308   rc = di.FetchNextTensorRow(&tensor_list);
309   EXPECT_TRUE(rc.IsOk());
310   int row_count = 0;
311   while (!tensor_list.empty()) {
312     MS_LOG(INFO) << "Row display for row #: " << row_count << ".";
313 
314     // Display the tensor by calling the printer on it
315     for (int i = 0; i < tensor_list.size(); i++) {
316       std::ostringstream ss;
317       ss << *tensor_list[i] << std::endl;
318       MS_LOG(INFO) << "Tensor print: " << common::SafeCStr(ss.str()) << ".";
319     }
320     rc = di.FetchNextTensorRow(&tensor_list);
321     EXPECT_TRUE(rc.IsOk());
322     row_count++;
323   }
324   ASSERT_EQ(row_count, 20);
325 }
326