1 /**
2 * Copyright 2020-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "common/common.h"
17 #include "minddata/dataset/core/global_context.h"
18 #include "minddata/dataset/include/dataset/datasets.h"
19
20 using namespace mindspore::dataset;
21
22 using mindspore::dataset::ShuffleMode;
23
24 class MindDataTestPipeline : public UT::DatasetOpTesting {
25 protected:
26 };
27
// Reads 2 samples from a single text file, leaving the remaining TextFile parameters at their
// defaults, and checks the content of every returned row.
TEST_F(MindDataTestPipeline, TestTextFileDatasetBasic) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetBasic.";
  // Test TextFile Dataset with single text file and many default inputs

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(987);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a TextFile Dataset, with single text file
  // Note: 1.txt has 3 rows
  // Use 2 samples
  // Use defaults for other input parameters
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 2);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());
  std::vector<std::string> expected_result = {"Be happy every day.", "This is a text file."};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop with a failure rather than read past the expected rows if extra rows are produced.
    ASSERT_LT(i, expected_result.size());
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    std::shared_ptr<Tensor> de_text;
    ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
    std::string_view sv;
    ASSERT_OK(de_text->GetItemAt(&sv, {}));
    std::string ss(sv);
    MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
    // Compare against expected result
    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());

    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 2 samples
  EXPECT_EQ(i, 2);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
86
// Builds two 2-sample TextFile datasets from the same file, repeats them 2 and 3 times,
// concatenates them, and checks only the total number of rows produced.
TEST_F(MindDataTestPipeline, TestTextFileDatasetBasicWithPipeline) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetBasicWithPipeline.";
  // Test TextFile Dataset with single text file and many default inputs

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(987);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create two TextFile Dataset, with single text file
  // Note: 1.txt has 3 rows
  // Use 2 samples
  // Use defaults for other input parameters
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds1 = TextFile({tf_file1}, 2);
  std::shared_ptr<Dataset> ds2 = TextFile({tf_file1}, 2);
  // Both datasets are dereferenced below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds1, nullptr);
  ASSERT_NE(ds2, nullptr);

  // Create two Repeat operation on ds
  int32_t repeat_num = 2;
  ds1 = ds1->Repeat(repeat_num);
  ASSERT_NE(ds1, nullptr);
  repeat_num = 3;
  ds2 = ds2->Repeat(repeat_num);
  ASSERT_NE(ds2, nullptr);

  // Create a Concat operation on the ds
  ds1 = ds1->Concat({ds2});
  ASSERT_NE(ds1, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds1->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());

  // Row contents are not compared in this test; only the rows are counted.
  uint64_t i = 0;
  while (row.size() != 0) {
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 10 samples: 2 samples x 2 repeats + 2 samples x 3 repeats
  EXPECT_EQ(i, 10);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
150
// Checks GetDatasetSize() and GetColumnNames() on a TextFile dataset, once with num_samples=2
// and once with num_samples=0 on the 3-row file.
TEST_F(MindDataTestPipeline, TestTextFileGetters) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileGetters.";
  // Test the getters of a TextFile Dataset built from a single text file

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(987);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a TextFile Dataset, with single text file
  // Note: 1.txt has 3 rows
  // Use 2 samples
  // Use defaults for other input parameters
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 2);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  std::vector<std::string> column_names = {"text"};
  EXPECT_EQ(ds->GetDatasetSize(), 2);
  EXPECT_EQ(ds->GetColumnNames(), column_names);

  // With num_samples=0 the reported size is expected to be the full 3 rows of 1.txt
  ds = TextFile({tf_file1}, 0);
  ASSERT_NE(ds, nullptr);

  EXPECT_EQ(ds->GetDatasetSize(), 3);

  // Configuration is restored by config_restorer when the test body exits.
}
182
// Negative test: a negative num_samples still yields a dataset object, but creating an
// iterator over it is expected to fail.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail1) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail1.";

  // Create a TextFile Dataset
  // with invalid num_samples=-1
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, -1);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: TextFile number of samples cannot be negative
  EXPECT_EQ(iter, nullptr);
}
197
// Negative test: an empty dataset_files list still yields a dataset object, but creating an
// iterator over it is expected to fail.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail2) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail2.";

  // Attempt to create a TextFile Dataset
  // with wrongful empty dataset_files input
  std::shared_ptr<Dataset> ds = TextFile({});
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: dataset_files is not specified
  EXPECT_EQ(iter, nullptr);
}
211
// Negative test: one valid file plus one non-existent file still yields a dataset object,
// but creating an iterator over it is expected to fail.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail3) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail3.";

  // Create a TextFile Dataset
  // with non-existent dataset_files input
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1, "notexist.txt"}, 0, ShuffleMode::kFalse);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: specified dataset_files does not exist
  EXPECT_EQ(iter, nullptr);
}
226
// Negative test: an empty-string file name still yields a dataset object, but creating an
// iterator over it is expected to fail.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail4) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail4.";

  // Create a TextFile Dataset
  // with empty string dataset_files input
  std::shared_ptr<Dataset> ds = TextFile({""}, 0, ShuffleMode::kFiles);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: specified dataset_files does not exist
  EXPECT_EQ(iter, nullptr);
}
240
// Negative test: num_shards=0 still yields a dataset object, but creating an iterator over it
// is expected to fail.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail5) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail5.";

  // Create a TextFile Dataset
  // with invalid num_shards=0 value
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 1, ShuffleMode::kFalse, 0);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: Number of shards cannot be <=0
  EXPECT_EQ(iter, nullptr);
}
255
// Negative test: a negative shard_id still yields a dataset object, but creating an iterator
// over it is expected to fail.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail6) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail6.";

  // Create a TextFile Dataset
  // with invalid shard_id=-1 value
  // The 4th argument is num_shards and the 5th is shard_id, so a valid num_shards=1 is passed
  // explicitly; passing -1 as the 4th argument would only repeat the num_shards check of Fail5.
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 0, ShuffleMode::kFiles, 1, -1);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: shard_id cannot be negative
  EXPECT_EQ(iter, nullptr);
}
270
// Negative test: shard_id equal to num_shards still yields a dataset object, but creating an
// iterator over it is expected to fail.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail7) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail7.";

  // Create a TextFile Dataset
  // with invalid shard_id=2 and num_shards=2 combination
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 0, ShuffleMode::kGlobal, 2, 2);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: Cannot have shard_id >= num_shards
  EXPECT_EQ(iter, nullptr);
}
285
// Reads all rows of 1.txt and 2.txt (passed in that order) with shuffling disabled and a
// single worker, and checks the exact row order.
TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFalse1A) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFalse1A.";
  // Test TextFile Dataset with two text files and no shuffle, num_parallel_workers=1

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(654);
  GlobalContext::config_manager()->set_num_parallel_workers(1);

  // Create a TextFile Dataset, with two text files, 1.txt then 2.txt, in lexicographical order.
  // Note: 1.txt has 3 rows
  // Note: 2.txt has 2 rows
  // Use default of all samples
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kFalse);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());
  std::vector<std::string> expected_result = {"This is a text file.", "Be happy every day.", "Good luck to everyone.",
                                              "Another file.", "End of file."};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop with a failure rather than read past the expected rows if extra rows are produced.
    ASSERT_LT(i, expected_result.size());
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    std::shared_ptr<Tensor> de_text;
    ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
    std::string_view sv;
    ASSERT_OK(de_text->GetItemAt(&sv, {}));
    std::string ss(sv);
    MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
    // Compare against expected result
    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());

    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 2 + 3 = 5 samples
  EXPECT_EQ(i, 5);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
346
// Same as ShuffleFalse1A but with the files passed as 2.txt then 1.txt; the expected rows are
// identical to 1A, i.e. the rows of 1.txt are still expected first.
TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFalse1B) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFalse1B.";
  // Test TextFile Dataset with two text files and no shuffle, num_parallel_workers=1

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(654);
  GlobalContext::config_manager()->set_num_parallel_workers(1);

  // Create a TextFile Dataset, with two text files, 2.txt then 1.txt, in non-lexicographical order
  // Note: 1.txt has 3 rows
  // Note: 2.txt has 2 rows
  // Use default of all samples
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file2, tf_file1}, 0, ShuffleMode::kFalse);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());
  std::vector<std::string> expected_result = {"This is a text file.", "Be happy every day.", "Good luck to everyone.",
                                              "Another file.", "End of file."};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop with a failure rather than read past the expected rows if extra rows are produced.
    ASSERT_LT(i, expected_result.size());
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    std::shared_ptr<Tensor> de_text;
    ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
    std::string_view sv;
    ASSERT_OK(de_text->GetItemAt(&sv, {}));
    std::string ss(sv);
    MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
    // Compare against expected result
    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());

    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 2 + 3 = 5 samples
  EXPECT_EQ(i, 5);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
407
// Reads shard 0 of 2 from two text files with shuffling disabled and 4 workers, and checks
// that exactly the three expected rows come back in order.
TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFalse4Shard) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFalse4Shard.";
  // Test TextFile Dataset with two text files and no shuffle, num_parallel_workers=4, shard coverage

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(654);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a TextFile Dataset, with two text files
  // Note: 1.txt has 3 rows
  // Note: 2.txt has 2 rows
  // Set shuffle to no shuffle, num_shards=2, shard_id=0
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kFalse, 2, 0);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());
  std::vector<std::string> expected_result = {"This is a text file.", "Be happy every day.", "Good luck to everyone."};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop with a failure rather than read past the expected rows if extra rows are produced.
    ASSERT_LT(i, expected_result.size());
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    std::shared_ptr<Tensor> de_text;
    ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
    std::string_view sv;
    ASSERT_OK(de_text->GetItemAt(&sv, {}));
    std::string ss(sv);
    MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
    // Compare against expected result
    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());

    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 3 samples for this shard
  EXPECT_EQ(i, 3);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
467
// Reads all rows of 1.txt and 2.txt (passed in that order) with file-level shuffling, seed 135
// and a single worker, and checks the exact row order.
TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFiles1A) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFiles1A.";
  // Test TextFile Dataset with files shuffle, num_parallel_workers=1

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(135);
  GlobalContext::config_manager()->set_num_parallel_workers(1);

  // Create a TextFile Dataset, with two text files, 1.txt then 2.txt, in lexicographical order.
  // Note: 1.txt has 3 rows
  // Note: 2.txt has 2 rows
  // Use default of all samples
  // Set shuffle to files shuffle
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kFiles);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());
  std::vector<std::string> expected_result = {
    "This is a text file.", "Be happy every day.", "Good luck to everyone.", "Another file.", "End of file.",
  };

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop with a failure rather than read past the expected rows if extra rows are produced.
    ASSERT_LT(i, expected_result.size());
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    std::shared_ptr<Tensor> de_text;
    ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
    std::string_view sv;
    ASSERT_OK(de_text->GetItemAt(&sv, {}));
    std::string ss(sv);
    MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
    // Compare against expected result
    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());

    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 2 + 3 = 5 samples
  EXPECT_EQ(i, 5);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
530
// Same as ShuffleFiles1A but with the files passed as 2.txt then 1.txt; the expected rows are
// identical to 1A.
TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFiles1B) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFiles1B.";
  // Test TextFile Dataset with files shuffle, num_parallel_workers=1

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(135);
  GlobalContext::config_manager()->set_num_parallel_workers(1);

  // Create a TextFile Dataset, with two text files, 2.txt then 1.txt, in non-lexicographical order.
  // Note: 1.txt has 3 rows
  // Note: 2.txt has 2 rows
  // Use default of all samples
  // Set shuffle to files shuffle
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file2, tf_file1}, 0, ShuffleMode::kFiles);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());
  std::vector<std::string> expected_result = {
    "This is a text file.", "Be happy every day.", "Good luck to everyone.", "Another file.", "End of file.",
  };

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop with a failure rather than read past the expected rows if extra rows are produced.
    ASSERT_LT(i, expected_result.size());
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    std::shared_ptr<Tensor> de_text;
    ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
    std::string_view sv;
    ASSERT_OK(de_text->GetItemAt(&sv, {}));
    std::string ss(sv);
    MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
    // Compare against expected result
    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());

    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 2 + 3 = 5 samples
  EXPECT_EQ(i, 5);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
593
// Reads all rows of two text files with file-level shuffling, seed 135 and 4 workers, and
// checks the exact row order (rows of the two files are expected interleaved).
TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFiles4) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFiles4.";
  // Test TextFile Dataset with files shuffle, num_parallel_workers=4

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(135);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a TextFile Dataset, with two text files
  // Note: 1.txt has 3 rows
  // Note: 2.txt has 2 rows
  // Use default of all samples
  // Set shuffle to files shuffle
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kFiles);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());
  std::vector<std::string> expected_result = {"This is a text file.", "Another file.", "Be happy every day.",
                                              "End of file.", "Good luck to everyone."};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop with a failure rather than read past the expected rows if extra rows are produced.
    ASSERT_LT(i, expected_result.size());
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    std::shared_ptr<Tensor> de_text;
    ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
    std::string_view sv;
    ASSERT_OK(de_text->GetItemAt(&sv, {}));
    std::string ss(sv);
    MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
    // Compare against expected result
    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());

    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 2 + 3 = 5 samples
  EXPECT_EQ(i, 5);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
655
// Reads all rows of a single text file with global shuffling, seed 246 and a single worker,
// and checks the exact shuffled row order.
TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleGlobal1A) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleGlobal1A.";
  // Test TextFile Dataset with 1 text file, global shuffle, num_parallel_workers=1

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(246);
  GlobalContext::config_manager()->set_num_parallel_workers(1);

  // Create a TextFile Dataset, with a single text file
  // Note: 1.txt has 3 rows
  // Set shuffle to global shuffle
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 0, ShuffleMode::kGlobal);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());
  std::vector<std::string> expected_result = {"Good luck to everyone.", "This is a text file.", "Be happy every day."};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop with a failure rather than read past the expected rows if extra rows are produced.
    ASSERT_LT(i, expected_result.size());
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    std::shared_ptr<Tensor> de_text;
    ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
    std::string_view sv;
    ASSERT_OK(de_text->GetItemAt(&sv, {}));
    std::string ss(sv);
    MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
    // Compare against expected result
    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());

    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 3 samples
  EXPECT_EQ(i, 3);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
713
// Reads all rows of two text files with global shuffling, seed 246 and a single worker, and
// checks the exact shuffled row order.
TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleGlobal1B) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleGlobal1B.";
  // Test TextFile Dataset with 2 text files, global shuffle, num_parallel_workers=1

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(246);
  GlobalContext::config_manager()->set_num_parallel_workers(1);

  // Create a TextFile Dataset, with two text files
  // Note: 1.txt has 3 rows
  // Note: 2.txt has 2 rows
  // Set shuffle to global shuffle
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kGlobal);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());
  std::vector<std::string> expected_result = {"Another file.", "Good luck to everyone.", "This is a text file.",
                                              "End of file.", "Be happy every day."};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop with a failure rather than read past the expected rows if extra rows are produced.
    ASSERT_LT(i, expected_result.size());
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    std::shared_ptr<Tensor> de_text;
    ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
    std::string_view sv;
    ASSERT_OK(de_text->GetItemAt(&sv, {}));
    std::string ss(sv);
    MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
    // Compare against expected result
    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());

    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 2 + 3 = 5 samples
  EXPECT_EQ(i, 5);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
774
// Reads all rows of two text files with global shuffling, seed 246 and 4 workers, and checks
// the exact shuffled row order.
TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleGlobal4) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleGlobal4.";
  // Test TextFile Dataset with 2 text files, global shuffle, num_parallel_workers=4

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  // Put the global configuration back on every exit path: a failing ASSERT_* returns from the
  // test body immediately, which would otherwise leak this test's seed/worker settings.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};
  GlobalContext::config_manager()->set_seed(246);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a TextFile Dataset, with two text files
  // Note: 1.txt has 3 rows
  // Note: 2.txt has 2 rows
  // Set shuffle to global shuffle
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kGlobal);
  // ds is dereferenced right below, so a null result must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));

  EXPECT_NE(row.find("text"), row.end());
  std::vector<std::string> expected_result = {"Another file.", "Good luck to everyone.", "End of file.",
                                              "This is a text file.", "Be happy every day."};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop with a failure rather than read past the expected rows if extra rows are produced.
    ASSERT_LT(i, expected_result.size());
    auto text = row["text"];
    MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
    std::shared_ptr<Tensor> de_text;
    ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
    std::string_view sv;
    ASSERT_OK(de_text->GetItemAt(&sv, {}));
    std::string ss(sv);
    MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
    // Compare against expected result
    EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());

    i++;
    ASSERT_OK(iter->GetNextRow(&row));
  }

  // Expect 2 + 3 = 5 samples
  EXPECT_EQ(i, 5);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when the test body exits.
}
835