• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "common/common.h"
17 #include "minddata/dataset/core/global_context.h"
18 #include "minddata/dataset/include/dataset/datasets.h"
19 
20 using namespace mindspore::dataset;
21 
22 using mindspore::dataset::ShuffleMode;
23 
24 class MindDataTestPipeline : public UT::DatasetOpTesting {
25  protected:
26 };
27 
TEST_F(MindDataTestPipeline,TestTextFileDatasetBasic)28 TEST_F(MindDataTestPipeline, TestTextFileDatasetBasic) {
29   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetBasic.";
30   // Test TextFile Dataset with single text file and many default inputs
31 
32   // Set configuration
33   uint32_t original_seed = GlobalContext::config_manager()->seed();
34   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
35   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
36   GlobalContext::config_manager()->set_seed(987);
37   GlobalContext::config_manager()->set_num_parallel_workers(4);
38 
39   // Create a TextFile Dataset, with single text file
40   // Note: 1.txt has 3 rows
41   // Use 2 samples
42   // Use defaults for other input parameters
43   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
44   std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 2);
45   EXPECT_NE(ds, nullptr);
46 
47   // Create an iterator over the result of the above dataset.
48   // This will trigger the creation of the Execution Tree and launch it.
49   std::shared_ptr<Iterator> iter = ds->CreateIterator();
50   EXPECT_NE(iter, nullptr);
51 
52   // Iterate the dataset and get each row
53   std::unordered_map<std::string, mindspore::MSTensor> row;
54   ASSERT_OK(iter->GetNextRow(&row));
55 
56   EXPECT_NE(row.find("text"), row.end());
57   std::vector<std::string> expected_result = {"Be happy every day.", "This is a text file."};
58 
59   uint64_t i = 0;
60   while (row.size() != 0) {
61     auto text = row["text"];
62     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
63     std::shared_ptr<Tensor> de_text;
64     ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
65     std::string_view sv;
66     ASSERT_OK(de_text->GetItemAt(&sv, {}));
67     std::string ss(sv);
68     MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
69     // Compare against expected result
70     EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
71 
72     i++;
73     ASSERT_OK(iter->GetNextRow(&row));
74   }
75 
76   // Expect 2 samples
77   EXPECT_EQ(i, 2);
78 
79   // Manually terminate the pipeline
80   iter->Stop();
81 
82   // Restore configuration
83   GlobalContext::config_manager()->set_seed(original_seed);
84   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
85 }
86 
TEST_F(MindDataTestPipeline,TestTextFileDatasetBasicWithPipeline)87 TEST_F(MindDataTestPipeline, TestTextFileDatasetBasicWithPipeline) {
88   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetBasicWithPipeline.";
89   // Test TextFile Dataset with single text file and many default inputs
90 
91   // Set configuration
92   uint32_t original_seed = GlobalContext::config_manager()->seed();
93   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
94   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
95   GlobalContext::config_manager()->set_seed(987);
96   GlobalContext::config_manager()->set_num_parallel_workers(4);
97 
98   // Create two TextFile Dataset, with single text file
99   // Note: 1.txt has 3 rows
100   // Use 2 samples
101   // Use defaults for other input parameters
102   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
103   std::shared_ptr<Dataset> ds1 = TextFile({tf_file1}, 2);
104   std::shared_ptr<Dataset> ds2 = TextFile({tf_file1}, 2);
105   EXPECT_NE(ds1, nullptr);
106   EXPECT_NE(ds2, nullptr);
107 
108   // Create two Repeat operation on ds
109   int32_t repeat_num = 2;
110   ds1 = ds1->Repeat(repeat_num);
111   EXPECT_NE(ds1, nullptr);
112   repeat_num = 3;
113   ds2 = ds2->Repeat(repeat_num);
114   EXPECT_NE(ds2, nullptr);
115 
116   // Create a Concat operation on the ds
117   ds1 = ds1->Concat({ds2});
118   EXPECT_NE(ds1, nullptr);
119 
120   // Create an iterator over the result of the above dataset.
121   // This will trigger the creation of the Execution Tree and launch it.
122   std::shared_ptr<Iterator> iter = ds1->CreateIterator();
123   EXPECT_NE(iter, nullptr);
124 
125   // Iterate the dataset and get each row
126   std::unordered_map<std::string, mindspore::MSTensor> row;
127   ASSERT_OK(iter->GetNextRow(&row));
128 
129   EXPECT_NE(row.find("text"), row.end());
130   std::vector<std::string> expected_result = {"Be happy every day.", "This is a text file."};
131 
132   uint64_t i = 0;
133   while (row.size() != 0) {
134     auto text = row["text"];
135     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
136     i++;
137     ASSERT_OK(iter->GetNextRow(&row));
138   }
139 
140   // Expect 10 samples
141   EXPECT_EQ(i, 10);
142 
143   // Manually terminate the pipeline
144   iter->Stop();
145 
146   // Restore configuration
147   GlobalContext::config_manager()->set_seed(original_seed);
148   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
149 }
150 
// Checks GetDatasetSize() and GetColumnNames() on a TextFile dataset, once with
// num_samples=2 and once with num_samples=0.
TEST_F(MindDataTestPipeline, TestTextFileGetters) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileGetters.";

  // Save the global configuration so it can be restored at the end of the test.
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
  GlobalContext::config_manager()->set_seed(987);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a TextFile Dataset with a single text file.
  // Note: 1.txt has 3 rows; only 2 samples are requested.
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 2);
  // ds is dereferenced by the getters below, so null must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  std::vector<std::string> column_names = {"text"};
  EXPECT_EQ(ds->GetDatasetSize(), 2);
  EXPECT_EQ(ds->GetColumnNames(), column_names);

  // With num_samples=0 the size is expected to be all 3 rows of 1.txt.
  ds = TextFile({tf_file1}, 0);
  ASSERT_NE(ds, nullptr);

  EXPECT_EQ(ds->GetDatasetSize(), 3);

  // Restore configuration
  GlobalContext::config_manager()->set_seed(original_seed);
  GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
}
182 
// Negative test: a TextFile dataset built with num_samples=-1 must not yield an iterator.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail1) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail1.";

  // Create a TextFile Dataset
  // with invalid num_samples=-1
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, -1);
  // ds is dereferenced right below, so null must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: TextFile number of samples cannot be negative
  EXPECT_EQ(iter, nullptr);
}
197 
// Negative test: a TextFile dataset built with an empty dataset_files list must not yield an iterator.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail2) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail2.";

  // Attempt to create a TextFile Dataset
  // with wrongful empty dataset_files input
  std::shared_ptr<Dataset> ds = TextFile({});
  // ds is dereferenced right below, so null must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: dataset_files is not specified
  EXPECT_EQ(iter, nullptr);
}
211 
// Negative test: listing a file that does not exist ("notexist.txt") next to a valid one
// must not yield an iterator.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail3) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail3.";

  // Create a TextFile Dataset
  // with non-existent dataset_files input
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1, "notexist.txt"}, 0, ShuffleMode::kFalse);
  // ds is dereferenced right below, so null must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: specified dataset_files does not exist
  EXPECT_EQ(iter, nullptr);
}
226 
// Negative test: an empty-string path in dataset_files must not yield an iterator.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail4) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail4.";

  // Create a TextFile Dataset
  // with empty string dataset_files input
  std::shared_ptr<Dataset> ds = TextFile({""}, 0, ShuffleMode::kFiles);
  // ds is dereferenced right below, so null must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: specified dataset_files does not exist
  EXPECT_EQ(iter, nullptr);
}
240 
// Negative test: a TextFile dataset built with num_shards=0 must not yield an iterator.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail5) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail5.";

  // Create a TextFile Dataset
  // with invalid num_shards=0 value
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 1, ShuffleMode::kFalse, 0);
  // ds is dereferenced right below, so null must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: Number of shards cannot be <=0
  EXPECT_EQ(iter, nullptr);
}
255 
// Negative test: a TextFile dataset built with shard_id=-1 must not yield an iterator.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail6) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail6.";

  // Create a TextFile Dataset
  // with invalid shard_id=-1 value
  // The 4th positional argument is num_shards and the 5th is shard_id (see the other tests in
  // this file), so num_shards=1 is spelled out here to actually reach shard_id.
  // NOTE(review): previously -1 landed in the num_shards slot - confirm against the TextFile declaration.
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 0, ShuffleMode::kFiles, 1, -1);
  // ds is dereferenced right below, so null must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: shard_id cannot be negative
  EXPECT_EQ(iter, nullptr);
}
270 
// Negative test: shard_id=2 together with num_shards=2 must not yield an iterator.
TEST_F(MindDataTestPipeline, TestTextFileDatasetFail7) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetFail7.";

  // Create a TextFile Dataset
  // with invalid shard_id=2 and num_shards=2 combination
  std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
  std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 0, ShuffleMode::kGlobal, 2, 2);
  // ds is dereferenced right below, so null must abort the test rather than crash it.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: Cannot have shard_id >= num_shards
  EXPECT_EQ(iter, nullptr);
}
285 
TEST_F(MindDataTestPipeline,TestTextFileDatasetShuffleFalse1A)286 TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFalse1A) {
287   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFalse1A.";
288   // Test TextFile Dataset with two text files and no shuffle, num_parallel_workers=1
289 
290   // Set configuration
291   uint32_t original_seed = GlobalContext::config_manager()->seed();
292   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
293   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
294   GlobalContext::config_manager()->set_seed(654);
295   GlobalContext::config_manager()->set_num_parallel_workers(1);
296 
297   // Create a TextFile Dataset, with two text files, 1.txt then 2.txt, in lexicographical order.
298   // Note: 1.txt has 3 rows
299   // Note: 2.txt has 2 rows
300   // Use default of all samples
301   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
302   std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
303   std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kFalse);
304   EXPECT_NE(ds, nullptr);
305 
306   // Create an iterator over the result of the above dataset.
307   // This will trigger the creation of the Execution Tree and launch it.
308   std::shared_ptr<Iterator> iter = ds->CreateIterator();
309   EXPECT_NE(iter, nullptr);
310 
311   // Iterate the dataset and get each row
312   std::unordered_map<std::string, mindspore::MSTensor> row;
313   ASSERT_OK(iter->GetNextRow(&row));
314 
315   EXPECT_NE(row.find("text"), row.end());
316   std::vector<std::string> expected_result = {"This is a text file.", "Be happy every day.", "Good luck to everyone.",
317                                               "Another file.", "End of file."};
318 
319   uint64_t i = 0;
320   while (row.size() != 0) {
321     auto text = row["text"];
322     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
323     std::shared_ptr<Tensor> de_text;
324     ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
325     std::string_view sv;
326     ASSERT_OK(de_text->GetItemAt(&sv, {}));
327     std::string ss(sv);
328     MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
329     // Compare against expected result
330     EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
331 
332     i++;
333     ASSERT_OK(iter->GetNextRow(&row));
334   }
335 
336   // Expect 2 + 3 = 5 samples
337   EXPECT_EQ(i, 5);
338 
339   // Manually terminate the pipeline
340   iter->Stop();
341 
342   // Restore configuration
343   GlobalContext::config_manager()->set_seed(original_seed);
344   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
345 }
346 
TEST_F(MindDataTestPipeline,TestTextFileDatasetShuffleFalse1B)347 TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFalse1B) {
348   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFalse1B.";
349   // Test TextFile Dataset with two text files and no shuffle, num_parallel_workers=1
350 
351   // Set configuration
352   uint32_t original_seed = GlobalContext::config_manager()->seed();
353   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
354   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
355   GlobalContext::config_manager()->set_seed(654);
356   GlobalContext::config_manager()->set_num_parallel_workers(1);
357 
358   // Create a TextFile Dataset, with two text files, 2.txt then 1.txt, in non-lexicographical order
359   // Note: 1.txt has 3 rows
360   // Note: 2.txt has 2 rows
361   // Use default of all samples
362   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
363   std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
364   std::shared_ptr<Dataset> ds = TextFile({tf_file2, tf_file1}, 0, ShuffleMode::kFalse);
365   EXPECT_NE(ds, nullptr);
366 
367   // Create an iterator over the result of the above dataset.
368   // This will trigger the creation of the Execution Tree and launch it.
369   std::shared_ptr<Iterator> iter = ds->CreateIterator();
370   EXPECT_NE(iter, nullptr);
371 
372   // Iterate the dataset and get each row
373   std::unordered_map<std::string, mindspore::MSTensor> row;
374   ASSERT_OK(iter->GetNextRow(&row));
375 
376   EXPECT_NE(row.find("text"), row.end());
377   std::vector<std::string> expected_result = {"This is a text file.", "Be happy every day.", "Good luck to everyone.",
378                                               "Another file.", "End of file."};
379 
380   uint64_t i = 0;
381   while (row.size() != 0) {
382     auto text = row["text"];
383     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
384     std::shared_ptr<Tensor> de_text;
385     ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
386     std::string_view sv;
387     ASSERT_OK(de_text->GetItemAt(&sv, {}));
388     std::string ss(sv);
389     MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
390     // Compare against expected result
391     EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
392 
393     i++;
394     ASSERT_OK(iter->GetNextRow(&row));
395   }
396 
397   // Expect 2 + 3 = 5 samples
398   EXPECT_EQ(i, 5);
399 
400   // Manually terminate the pipeline
401   iter->Stop();
402 
403   // Restore configuration
404   GlobalContext::config_manager()->set_seed(original_seed);
405   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
406 }
407 
TEST_F(MindDataTestPipeline,TestTextFileDatasetShuffleFalse4Shard)408 TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFalse4Shard) {
409   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFalse4Shard.";
410   // Test TextFile Dataset with two text files and no shuffle, num_parallel_workers=4, shard coverage
411 
412   // Set configuration
413   uint32_t original_seed = GlobalContext::config_manager()->seed();
414   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
415   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
416   GlobalContext::config_manager()->set_seed(654);
417   GlobalContext::config_manager()->set_num_parallel_workers(4);
418 
419   // Create a TextFile Dataset, with two text files
420   // Note: 1.txt has 3 rows
421   // Note: 2.txt has 2 rows
422   // Set shuffle to file shuffle, num_shards=2, shard_id=0
423   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
424   std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
425   std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kFalse, 2, 0);
426   EXPECT_NE(ds, nullptr);
427 
428   // Create an iterator over the result of the above dataset.
429   // This will trigger the creation of the Execution Tree and launch it.
430   std::shared_ptr<Iterator> iter = ds->CreateIterator();
431   EXPECT_NE(iter, nullptr);
432 
433   // Iterate the dataset and get each row
434   std::unordered_map<std::string, mindspore::MSTensor> row;
435   ASSERT_OK(iter->GetNextRow(&row));
436 
437   EXPECT_NE(row.find("text"), row.end());
438   std::vector<std::string> expected_result = {"This is a text file.", "Be happy every day.", "Good luck to everyone."};
439 
440   uint64_t i = 0;
441   while (row.size() != 0) {
442     auto text = row["text"];
443     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
444     std::shared_ptr<Tensor> de_text;
445     ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
446     std::string_view sv;
447     ASSERT_OK(de_text->GetItemAt(&sv, {}));
448     std::string ss(sv);
449     MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
450     // Compare against expected result
451     EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
452 
453     i++;
454     ASSERT_OK(iter->GetNextRow(&row));
455   }
456 
457   // Expect 3 samples for this shard
458   EXPECT_EQ(i, 3);
459 
460   // Manually terminate the pipeline
461   iter->Stop();
462 
463   // Restore configuration
464   GlobalContext::config_manager()->set_seed(original_seed);
465   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
466 }
467 
TEST_F(MindDataTestPipeline,TestTextFileDatasetShuffleFiles1A)468 TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFiles1A) {
469   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFiles1A.";
470   // Test TextFile Dataset with files shuffle, num_parallel_workers=1
471 
472   // Set configuration
473   uint32_t original_seed = GlobalContext::config_manager()->seed();
474   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
475   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
476   GlobalContext::config_manager()->set_seed(135);
477   GlobalContext::config_manager()->set_num_parallel_workers(1);
478 
479   // Create a TextFile Dataset, with two text files, 1.txt then 2.txt, in lexicographical order.
480   // Note: 1.txt has 3 rows
481   // Note: 2.txt has 2 rows
482   // Use default of all samples
483   // Set shuffle to files shuffle
484   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
485   std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
486   std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kFiles);
487   EXPECT_NE(ds, nullptr);
488 
489   // Create an iterator over the result of the above dataset.
490   // This will trigger the creation of the Execution Tree and launch it.
491   std::shared_ptr<Iterator> iter = ds->CreateIterator();
492   EXPECT_NE(iter, nullptr);
493 
494   // Iterate the dataset and get each row
495   std::unordered_map<std::string, mindspore::MSTensor> row;
496   ASSERT_OK(iter->GetNextRow(&row));
497 
498   EXPECT_NE(row.find("text"), row.end());
499   std::vector<std::string> expected_result = {
500     "This is a text file.", "Be happy every day.", "Good luck to everyone.", "Another file.", "End of file.",
501   };
502 
503   uint64_t i = 0;
504   while (row.size() != 0) {
505     auto text = row["text"];
506     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
507     std::shared_ptr<Tensor> de_text;
508     ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
509     std::string_view sv;
510     ASSERT_OK(de_text->GetItemAt(&sv, {}));
511     std::string ss(sv);
512     MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
513     // Compare against expected result
514     EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
515 
516     i++;
517     ASSERT_OK(iter->GetNextRow(&row));
518   }
519 
520   // Expect 2 + 3 = 5 samples
521   EXPECT_EQ(i, 5);
522 
523   // Manually terminate the pipeline
524   iter->Stop();
525 
526   // Restore configuration
527   GlobalContext::config_manager()->set_seed(original_seed);
528   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
529 }
530 
TEST_F(MindDataTestPipeline,TestTextFileDatasetShuffleFiles1B)531 TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFiles1B) {
532   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFiles1B.";
533   // Test TextFile Dataset with files shuffle, num_parallel_workers=1
534 
535   // Set configuration
536   uint32_t original_seed = GlobalContext::config_manager()->seed();
537   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
538   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
539   GlobalContext::config_manager()->set_seed(135);
540   GlobalContext::config_manager()->set_num_parallel_workers(1);
541 
542   // Create a TextFile Dataset, with two text files, 2.txt then 1.txt, in non-lexicographical order.
543   // Note: 1.txt has 3 rows
544   // Note: 2.txt has 2 rows
545   // Use default of all samples
546   // Set shuffle to files shuffle
547   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
548   std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
549   std::shared_ptr<Dataset> ds = TextFile({tf_file2, tf_file1}, 0, ShuffleMode::kFiles);
550   EXPECT_NE(ds, nullptr);
551 
552   // Create an iterator over the result of the above dataset.
553   // This will trigger the creation of the Execution Tree and launch it.
554   std::shared_ptr<Iterator> iter = ds->CreateIterator();
555   EXPECT_NE(iter, nullptr);
556 
557   // Iterate the dataset and get each row
558   std::unordered_map<std::string, mindspore::MSTensor> row;
559   ASSERT_OK(iter->GetNextRow(&row));
560 
561   EXPECT_NE(row.find("text"), row.end());
562   std::vector<std::string> expected_result = {
563     "This is a text file.", "Be happy every day.", "Good luck to everyone.", "Another file.", "End of file.",
564   };
565 
566   uint64_t i = 0;
567   while (row.size() != 0) {
568     auto text = row["text"];
569     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
570     std::shared_ptr<Tensor> de_text;
571     ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
572     std::string_view sv;
573     ASSERT_OK(de_text->GetItemAt(&sv, {}));
574     std::string ss(sv);
575     MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
576     // Compare against expected result
577     EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
578 
579     i++;
580     ASSERT_OK(iter->GetNextRow(&row));
581   }
582 
583   // Expect 2 + 3 = 5 samples
584   EXPECT_EQ(i, 5);
585 
586   // Manually terminate the pipeline
587   iter->Stop();
588 
589   // Restore configuration
590   GlobalContext::config_manager()->set_seed(original_seed);
591   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
592 }
593 
TEST_F(MindDataTestPipeline,TestTextFileDatasetShuffleFiles4)594 TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleFiles4) {
595   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleFiles4.";
596   // Test TextFile Dataset with files shuffle, num_parallel_workers=4
597 
598   // Set configuration
599   uint32_t original_seed = GlobalContext::config_manager()->seed();
600   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
601   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
602   GlobalContext::config_manager()->set_seed(135);
603   GlobalContext::config_manager()->set_num_parallel_workers(4);
604 
605   // Create a TextFile Dataset, with two text files
606   // Note: 1.txt has 3 rows
607   // Note: 2.txt has 2 rows
608   // Use default of all samples
609   // Set shuffle to files shuffle
610   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
611   std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
612   std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kFiles);
613   EXPECT_NE(ds, nullptr);
614 
615   // Create an iterator over the result of the above dataset.
616   // This will trigger the creation of the Execution Tree and launch it.
617   std::shared_ptr<Iterator> iter = ds->CreateIterator();
618   EXPECT_NE(iter, nullptr);
619 
620   // Iterate the dataset and get each row
621   std::unordered_map<std::string, mindspore::MSTensor> row;
622   ASSERT_OK(iter->GetNextRow(&row));
623 
624   EXPECT_NE(row.find("text"), row.end());
625   std::vector<std::string> expected_result = {"This is a text file.", "Another file.", "Be happy every day.",
626                                               "End of file.", "Good luck to everyone."};
627 
628   uint64_t i = 0;
629   while (row.size() != 0) {
630     auto text = row["text"];
631     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
632     std::shared_ptr<Tensor> de_text;
633     ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
634     std::string_view sv;
635     ASSERT_OK(de_text->GetItemAt(&sv, {}));
636     std::string ss(sv);
637     MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
638     // Compare against expected result
639     EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
640 
641     i++;
642     ASSERT_OK(iter->GetNextRow(&row));
643   }
644 
645   // Expect 2 + 3 = 5 samples
646   EXPECT_EQ(i, 5);
647 
648   // Manually terminate the pipeline
649   iter->Stop();
650 
651   // Restore configuration
652   GlobalContext::config_manager()->set_seed(original_seed);
653   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
654 }
655 
TEST_F(MindDataTestPipeline,TestTextFileDatasetShuffleGlobal1A)656 TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleGlobal1A) {
657   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleGlobal1A.";
658   // Test TextFile Dataset with 1 text file, global shuffle, num_parallel_workers=1
659 
660   // Set configuration
661   uint32_t original_seed = GlobalContext::config_manager()->seed();
662   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
663   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
664   GlobalContext::config_manager()->set_seed(246);
665   GlobalContext::config_manager()->set_num_parallel_workers(1);
666 
667   // Create a TextFile Dataset, with two text files
668   // Note: 1.txt has 3 rows
669   // Set shuffle to global shuffle
670   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
671   std::shared_ptr<Dataset> ds = TextFile({tf_file1}, 0, ShuffleMode::kGlobal);
672   EXPECT_NE(ds, nullptr);
673 
674   // Create an iterator over the result of the above dataset.
675   // This will trigger the creation of the Execution Tree and launch it.
676   std::shared_ptr<Iterator> iter = ds->CreateIterator();
677   EXPECT_NE(iter, nullptr);
678 
679   // Iterate the dataset and get each row
680   std::unordered_map<std::string, mindspore::MSTensor> row;
681   ASSERT_OK(iter->GetNextRow(&row));
682 
683   EXPECT_NE(row.find("text"), row.end());
684   std::vector<std::string> expected_result = {"Good luck to everyone.", "This is a text file.", "Be happy every day."};
685 
686   uint64_t i = 0;
687   while (row.size() != 0) {
688     auto text = row["text"];
689     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
690     std::shared_ptr<Tensor> de_text;
691     ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
692     std::string_view sv;
693     ASSERT_OK(de_text->GetItemAt(&sv, {}));
694     std::string ss(sv);
695     MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
696     // Compare against expected result
697     EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
698 
699     i++;
700     ASSERT_OK(iter->GetNextRow(&row));
701   }
702 
703   // Expect 3 samples
704   EXPECT_EQ(i, 3);
705 
706   // Manually terminate the pipeline
707   iter->Stop();
708 
709   // Restore configuration
710   GlobalContext::config_manager()->set_seed(original_seed);
711   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
712 }
713 
TEST_F(MindDataTestPipeline,TestTextFileDatasetShuffleGlobal1B)714 TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleGlobal1B) {
715   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleGlobal1B.";
716   // Test TextFile Dataset with 2 text files, global shuffle, num_parallel_workers=1
717 
718   // Set configuration
719   uint32_t original_seed = GlobalContext::config_manager()->seed();
720   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
721   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
722   GlobalContext::config_manager()->set_seed(246);
723   GlobalContext::config_manager()->set_num_parallel_workers(1);
724 
725   // Create a TextFile Dataset, with two text files
726   // Note: 1.txt has 3 rows
727   // Note: 2.txt has 2 rows
728   // Set shuffle to global shuffle
729   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
730   std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
731   std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kGlobal);
732   EXPECT_NE(ds, nullptr);
733 
734   // Create an iterator over the result of the above dataset.
735   // This will trigger the creation of the Execution Tree and launch it.
736   std::shared_ptr<Iterator> iter = ds->CreateIterator();
737   EXPECT_NE(iter, nullptr);
738 
739   // Iterate the dataset and get each row
740   std::unordered_map<std::string, mindspore::MSTensor> row;
741   ASSERT_OK(iter->GetNextRow(&row));
742 
743   EXPECT_NE(row.find("text"), row.end());
744   std::vector<std::string> expected_result = {"Another file.", "Good luck to everyone.", "This is a text file.",
745                                               "End of file.", "Be happy every day."};
746 
747   uint64_t i = 0;
748   while (row.size() != 0) {
749     auto text = row["text"];
750     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
751     std::shared_ptr<Tensor> de_text;
752     ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
753     std::string_view sv;
754     ASSERT_OK(de_text->GetItemAt(&sv, {}));
755     std::string ss(sv);
756     MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
757     // Compare against expected result
758     EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
759 
760     i++;
761     ASSERT_OK(iter->GetNextRow(&row));
762   }
763 
764   // Expect 2 + 3 = 5 samples
765   EXPECT_EQ(i, 5);
766 
767   // Manually terminate the pipeline
768   iter->Stop();
769 
770   // Restore configuration
771   GlobalContext::config_manager()->set_seed(original_seed);
772   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
773 }
774 
TEST_F(MindDataTestPipeline,TestTextFileDatasetShuffleGlobal4)775 TEST_F(MindDataTestPipeline, TestTextFileDatasetShuffleGlobal4) {
776   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestTextFileDatasetShuffleGlobal4.";
777   // Test TextFile Dataset with 2 text files, global shuffle, num_parallel_workers=4
778 
779   // Set configuration
780   uint32_t original_seed = GlobalContext::config_manager()->seed();
781   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
782   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
783   GlobalContext::config_manager()->set_seed(246);
784   GlobalContext::config_manager()->set_num_parallel_workers(4);
785 
786   // Create a TextFile Dataset, with two text files
787   // Note: 1.txt has 3 rows
788   // Note: 2.txt has 2 rows
789   // Set shuffle to global shuffle
790   std::string tf_file1 = datasets_root_path_ + "/testTextFileDataset/1.txt";
791   std::string tf_file2 = datasets_root_path_ + "/testTextFileDataset/2.txt";
792   std::shared_ptr<Dataset> ds = TextFile({tf_file1, tf_file2}, 0, ShuffleMode::kGlobal);
793   EXPECT_NE(ds, nullptr);
794 
795   // Create an iterator over the result of the above dataset.
796   // This will trigger the creation of the Execution Tree and launch it.
797   std::shared_ptr<Iterator> iter = ds->CreateIterator();
798   EXPECT_NE(iter, nullptr);
799 
800   // Iterate the dataset and get each row
801   std::unordered_map<std::string, mindspore::MSTensor> row;
802   ASSERT_OK(iter->GetNextRow(&row));
803 
804   EXPECT_NE(row.find("text"), row.end());
805   std::vector<std::string> expected_result = {"Another file.", "Good luck to everyone.", "End of file.",
806                                               "This is a text file.", "Be happy every day."};
807 
808   uint64_t i = 0;
809   while (row.size() != 0) {
810     auto text = row["text"];
811     MS_LOG(INFO) << "Tensor text shape: " << text.Shape();
812     std::shared_ptr<Tensor> de_text;
813     ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
814     std::string_view sv;
815     ASSERT_OK(de_text->GetItemAt(&sv, {}));
816     std::string ss(sv);
817     MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
818     // Compare against expected result
819     EXPECT_STREQ(ss.c_str(), expected_result[i].c_str());
820 
821     i++;
822     ASSERT_OK(iter->GetNextRow(&row));
823   }
824 
825   // Expect 2 + 3 = 5 samples
826   EXPECT_EQ(i, 5);
827 
828   // Manually terminate the pipeline
829   iter->Stop();
830 
831   // Restore configuration
832   GlobalContext::config_manager()->set_seed(original_seed);
833   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
834 }
835