1 /**
2 * Copyright 2019-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/core/client.h"
17 #include "common/common.h"
18 #include "utils/ms_utils.h"
19 #include "gtest/gtest.h"
20 #include "utils/log_adapter.h"
21 #include <memory>
22 #include <vector>
23 #include <iostream>
24 #include "minddata/dataset/util/random.h"
25 #include "minddata/dataset/engine/jagged_connector.h"
26
27 namespace common = mindspore::common;
28
29 using namespace mindspore::dataset;
30 using mindspore::LogStream;
31 using mindspore::ExceptionType::NoExceptionType;
32 using mindspore::MsLogLevel::INFO;
33
34 class MindDataTestShuffleOp : public UT::DatasetOpTesting {};
35
36 // Test info:
37 // - Dataset from testDataset1 has 10 rows, 2 columns.
38 // - RowsPerBuffer buffer setting of 2 divides evenly into total rows.
39 // - Shuffle size is multiple of rows per buffer.
40 //
41 // Tree: shuffle over TFReader
42 //
43 // ShuffleOp
44 // |
45 // TFReaderOp
46 //
TEST_F(MindDataTestShuffleOp, TestShuffleBasic1) {
  // Builds the tree ShuffleOp -> TFReaderOp over testDataset1, iterates it to the end and
  // checks that exactly 10 rows come out. Every setup step is asserted (fatal) so that the
  // test never drives a tree that failed to build, prepare or launch.
  Status rc;
  MS_LOG(INFO) << "UT test TestShuffleBasic1.";

  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();

  std::string dataset_path;
  dataset_path = datasets_root_path_ + "/testDataset1/testDataset1.data";
  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  auto op_connector_size = config_manager->op_connector_size();
  std::vector<std::string> columns_to_load = {};
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  std::vector<std::string> files = {dataset_path};
  std::shared_ptr<TFReaderOp> my_tfreader_op = std::make_shared<TFReaderOp>(
    1, 16, 0, files, std::move(schema), op_connector_size, columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());
  // The shuffle seed comes from GetSeed() rather than a fixed literal.
  uint32_t shuffle_seed = GetSeed();
  std::shared_ptr<ShuffleOp> my_shuffle_op = std::make_shared<ShuffleOp>(4, shuffle_seed, op_connector_size, true);
  rc = my_tree->AssociateNode(my_shuffle_op);
  ASSERT_TRUE(rc.IsOk());

  // Set children/root layout.
  rc = my_shuffle_op->AddChild(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssignRoot(my_shuffle_op);
  ASSERT_TRUE(rc.IsOk());
  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());

  int row_count = 0;
  // An empty row is treated as the end of the data.
  while (!tensor_list.empty()) {
    MS_LOG(INFO) << "Row display for row #: " << row_count << ".";

    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); i++) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }
    rc = di.FetchNextTensorRow(&tensor_list);
    // Fatal on failure: stop iterating as soon as a fetch reports an error.
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }
  ASSERT_EQ(row_count, 10);
}
105
106 // Test info:
107 // - Dataset from testDataset1 has 10 rows, 2 columns.
108 // - RowsPerBuffer buffer setting of 3 does not divide evenly into total rows, thereby causing
109 // partially filled buffers.
110 // - Shuffle size is not a multiple of rows per buffer.
111 // - User has provided a non-default seed value.
112 //
113 // Tree: shuffle over TFReader
114 //
115 // ShuffleOp
116 // |
117 // TFReaderOp
118 //
TEST_F(MindDataTestShuffleOp, TestShuffleBasic2) {
  // Builds the tree ShuffleOp -> TFReaderOp over testDataset1 with a fixed seed of 100,
  // iterates it to the end and checks that exactly 10 rows come out. Every setup step is
  // asserted (fatal) so that the test never drives a tree that failed to build or launch.
  Status rc;
  MS_LOG(INFO) << "UT test TestShuffleBasic2.";

  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();

  std::string dataset_path;
  dataset_path = datasets_root_path_ + "/testDataset1/testDataset1.data";
  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  std::vector<std::string> columns_to_load = {};
  std::vector<std::string> files = {dataset_path};
  std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
  std::shared_ptr<TFReaderOp> my_tfreader_op = std::make_shared<TFReaderOp>(
    1, 16, 0, files, std::move(schema), op_connector_size, columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());
  std::shared_ptr<ShuffleOp> my_shuffle_op = std::make_shared<ShuffleOp>(4, 100, op_connector_size, true);
  rc = my_tree->AssociateNode(my_shuffle_op);
  ASSERT_TRUE(rc.IsOk());

  // Set children/root layout.
  rc = my_shuffle_op->AddChild(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssignRoot(my_shuffle_op);
  ASSERT_TRUE(rc.IsOk());
  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());
  int row_count = 0;
  // An empty row is treated as the end of the data.
  while (!tensor_list.empty()) {
    MS_LOG(INFO) << "Row display for row #: " << row_count << ".";

    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); i++) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << ss.str() << ".";
    }
    rc = di.FetchNextTensorRow(&tensor_list);
    // Fatal on failure: stop iterating as soon as a fetch reports an error.
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }
  ASSERT_EQ(row_count, 10);
}
175
176 // Test info:
177 // - Dataset from testDataset1 has 10 rows, 2 columns.
178 // - RowsPerBuffer buffer setting of 3 does not divide evenly into total rows, thereby causing
179 // partially filled buffers
// - Shuffle size captures the entire dataset size (actually sets a value that is larger than the
// number of rows in the dataset).
182 //
183 // Tree: shuffle over TFReader
184 //
185 // ShuffleOp
186 // |
187 // TFReaderOp
188 //
TEST_F(MindDataTestShuffleOp, TestShuffleBasic3) {
  // Builds the tree ShuffleOp -> TFReaderOp over testDataset1 with 100 as the first ShuffleOp
  // argument, iterates it to the end and checks that exactly 10 rows come out. Every setup
  // step is asserted (fatal) so that the test never drives a tree that failed to build or
  // launch.
  Status rc;
  MS_LOG(INFO) << "UT test TestShuffleBasic3.";

  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();

  std::string dataset_path;
  dataset_path = datasets_root_path_ + "/testDataset1/testDataset1.data";
  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  auto op_connector_size = config_manager->op_connector_size();
  std::vector<std::string> columns_to_load = {};
  std::vector<std::string> files = {dataset_path};
  std::shared_ptr<TFReaderOp> my_tfreader_op = std::make_shared<TFReaderOp>(
    1, 16, 0, files, std::make_unique<DataSchema>(), op_connector_size, columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  // The status of this call used to be discarded; check it like every other setup step.
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());
  // The shuffle seed comes from GetSeed() rather than a fixed literal.
  uint32_t shuffle_seed = GetSeed();
  std::shared_ptr<ShuffleOp> my_shuffle_op = std::make_shared<ShuffleOp>(100, shuffle_seed, op_connector_size, true);
  rc = my_tree->AssociateNode(my_shuffle_op);
  ASSERT_TRUE(rc.IsOk());

  // Set children/root layout.
  rc = my_shuffle_op->AddChild(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssignRoot(my_shuffle_op);
  ASSERT_TRUE(rc.IsOk());
  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());
  int row_count = 0;
  // An empty row is treated as the end of the data.
  while (!tensor_list.empty()) {
    MS_LOG(INFO) << "Row display for row #: " << row_count << ".";

    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); i++) {
      std::ostringstream ss;
      ss << "(" << tensor_list[i] << "): " << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << common::SafeCStr(ss.str()) << ".";
    }
    rc = di.FetchNextTensorRow(&tensor_list);
    // Fatal on failure: stop iterating as soon as a fetch reports an error.
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }
  ASSERT_EQ(row_count, 10);
}
244
245 // Test info:
246 // - Dataset from testDataset1 has 10 rows, 2 columns.
247 // - RowsPerBuffer buffer setting of 3 does not divide evenly into total rows thereby causing
248 // partially filled buffers
249 // - Shuffle size is not a multiple of rows per buffer.
250 // - shuffle seed is given, and subsequent epochs will change the seed each time.
251 // - Repeat count of 2
252 //
253 // Tree: Repeat over shuffle over TFReader
254 //
255 // Repeat
256 // |
257 // shuffle
258 // |
259 // TFReaderOp
260 //
TEST_F(MindDataTestShuffleOp, TestRepeatShuffle) {
  // Builds the tree RepeatOp(2) -> ShuffleOp -> TFReaderOp over testDataset1, iterates it to
  // the end and checks that 20 rows come out (the 10-row dataset repeated twice). Every setup
  // step is asserted (fatal) so that the test never drives a tree that failed to build or
  // launch.
  Status rc;
  MS_LOG(INFO) << "UT test TestRepeatShuffle.";

  // Start with an empty execution tree
  auto my_tree = std::make_shared<ExecutionTree>();

  std::string dataset_path;
  dataset_path = datasets_root_path_ + "/testDataset1/testDataset1.data";
  std::shared_ptr<ConfigManager> config_manager = GlobalContext::config_manager();
  int32_t op_connector_size = config_manager->op_connector_size();
  std::vector<std::string> columns_to_load = {};
  std::vector<std::string> files = {dataset_path};
  std::shared_ptr<TFReaderOp> my_tfreader_op = std::make_shared<TFReaderOp>(
    2, 16, 0, files, std::make_unique<DataSchema>(), op_connector_size, columns_to_load, false, 1, 0, false);
  rc = my_tfreader_op->Init();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssociateNode(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());
  std::shared_ptr<ShuffleOp> my_shuffle_op = std::make_shared<ShuffleOp>(4, 100, op_connector_size, true);
  rc = my_tree->AssociateNode(my_shuffle_op);
  ASSERT_TRUE(rc.IsOk());
  uint32_t num_repeats = 2;
  std::shared_ptr<RepeatOp> my_repeat_op = std::make_shared<RepeatOp>(num_repeats);
  rc = my_tree->AssociateNode(my_repeat_op);
  ASSERT_TRUE(rc.IsOk());

  // Set children/root layout.
  // Both ops below the repeat are given the repeat count before being attached.
  my_shuffle_op->SetTotalRepeats(num_repeats);
  my_shuffle_op->SetNumRepeatsPerEpoch(num_repeats);
  rc = my_repeat_op->AddChild(my_shuffle_op);
  ASSERT_TRUE(rc.IsOk());
  my_tfreader_op->SetTotalRepeats(num_repeats);
  my_tfreader_op->SetNumRepeatsPerEpoch(num_repeats);
  rc = my_shuffle_op->AddChild(my_tfreader_op);
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->AssignRoot(my_repeat_op);
  ASSERT_TRUE(rc.IsOk());
  MS_LOG(INFO) << "Launching tree and begin iteration.";
  rc = my_tree->Prepare();
  ASSERT_TRUE(rc.IsOk());
  rc = my_tree->Launch();
  ASSERT_TRUE(rc.IsOk());

  // Start the loop of reading tensors from our pipeline
  DatasetIterator di(my_tree);
  TensorRow tensor_list;
  rc = di.FetchNextTensorRow(&tensor_list);
  ASSERT_TRUE(rc.IsOk());
  int row_count = 0;
  // An empty row is treated as the end of the data.
  while (!tensor_list.empty()) {
    MS_LOG(INFO) << "Row display for row #: " << row_count << ".";

    // Display the tensor by calling the printer on it
    for (size_t i = 0; i < tensor_list.size(); i++) {
      std::ostringstream ss;
      ss << *tensor_list[i] << '\n';
      MS_LOG(INFO) << "Tensor print: " << common::SafeCStr(ss.str()) << ".";
    }
    rc = di.FetchNextTensorRow(&tensor_list);
    // Fatal on failure: stop iterating as soon as a fetch reports an error.
    ASSERT_TRUE(rc.IsOk());
    row_count++;
  }
  ASSERT_EQ(row_count, 20);
}
326