• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2020-2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "common/common.h"
17 #include "minddata/dataset/core/global_context.h"
18 #include "minddata/dataset/include/dataset/datasets.h"
19 
// Needed for CsvRecord
21 #include "minddata/dataset/engine/ir/datasetops/source/csv_node.h"
22 
23 using namespace mindspore::dataset;
24 
25 class MindDataTestPipeline : public UT::DatasetOpTesting {
26  protected:
27 };
28 
// Reads testCSV/1.csv with four named columns and ShuffleMode::kFalse, then checks every cell of every
// row against expected_result (compared as strings) and that exactly 3 rows are produced.
TEST_F(MindDataTestPipeline, TestCSVDatasetBasic) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetBasic.";

  // Create a CSVDataset, with single CSV file
  std::string train_file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, column_names, 0, ShuffleMode::kFalse);
  // Fatal assertion: ds is dereferenced on the next statement.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal assertion: iter is dereferenced on the next statement.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"1", "2", "3", "4"},
    {"5", "6", "7", "8"},
    {"9", "10", "11", "12"},
  };

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past expected_result if the pipeline yields more rows than expected.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); ++j) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 3 samples
  EXPECT_EQ(i, 3);

  // Manually terminate the pipeline
  iter->Stop();
}
74 
// Checks the dataset getters on testCSV/1.csv: GetDatasetSize() must be 3 and GetColumnNames() must
// equal the column names passed to CSV().
TEST_F(MindDataTestPipeline, TestCSVGetters) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVGetters.";

  // Create a CSVDataset, with single CSV file
  std::string train_file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, column_names, 0, ShuffleMode::kFalse);
  // Fatal assertion: ds is dereferenced by both getter checks below.
  ASSERT_NE(ds, nullptr);

  EXPECT_EQ(ds->GetDatasetSize(), 3);
  EXPECT_EQ(ds->GetColumnNames(), column_names);
}
87 
// Reads testCSV/1.csv and testCSV/append.csv together with ShuffleMode::kGlobal under a fixed seed (111)
// and 4 parallel workers, then checks the 6 rows against the order recorded in expected_result.
TEST_F(MindDataTestPipeline, TestCSVDatasetMultiFiles) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetMultiFiles.";

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;

  // Restore the global configuration on every exit path, including the early return taken by a failed
  // ASSERT_*, so a failure here cannot leak the seed / worker count into later tests.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};

  GlobalContext::config_manager()->set_seed(111);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a CSVDataset, with two CSV files
  std::string file1 = datasets_root_path_ + "/testCSV/1.csv";
  std::string file2 = datasets_root_path_ + "/testCSV/append.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({file1, file2}, ',', {}, column_names, 0, ShuffleMode::kGlobal);
  // Fatal assertion: ds is dereferenced on the next statement.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal assertion: iter is dereferenced on the next statement.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"17", "18", "19", "20"}, {"1", "2", "3", "4"},     {"5", "6", "7", "8"},
    {"13", "14", "15", "16"}, {"21", "22", "23", "24"}, {"9", "10", "11", "12"},
  };

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past expected_result if the pipeline yields more rows than expected.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); ++j) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 6 samples
  EXPECT_EQ(i, 6);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when it goes out of scope.
}
144 
// Reads testCSV/1.csv with num_samples set to 2 and ShuffleMode::kFalse, then checks that exactly the
// 2 rows listed in expected_result are produced.
TEST_F(MindDataTestPipeline, TestCSVDatasetNumSamples) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetNumSamples.";

  // Create a CSVDataset, with single CSV file
  std::string file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({file}, ',', {}, column_names, 2, ShuffleMode::kFalse);
  // Fatal assertion: ds is dereferenced on the next statement.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal assertion: iter is dereferenced on the next statement.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {{"1", "2", "3", "4"}, {"5", "6", "7", "8"}};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past expected_result if the pipeline yields more rows than expected.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); ++j) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 2 samples
  EXPECT_EQ(i, 2);

  // Manually terminate the pipeline
  iter->Stop();
}
186 
// Reads testCSV/1.csv as shard 0 of 2 (the last two CSV() arguments) with ShuffleMode::kFalse, then
// checks that exactly the 2 rows listed in expected_result are produced.
TEST_F(MindDataTestPipeline, TestCSVDatasetDistribution) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetDistribution.";

  // Create a CSVDataset, with single CSV file
  std::string file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({file}, ',', {}, column_names, 0, ShuffleMode::kFalse, 2, 0);
  // Fatal assertion: ds is dereferenced on the next statement.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal assertion: iter is dereferenced on the next statement.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {{"1", "2", "3", "4"}, {"5", "6", "7", "8"}};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past expected_result if the pipeline yields more rows than expected.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); ++j) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 2 samples
  EXPECT_EQ(i, 2);

  // Manually terminate the pipeline
  iter->Stop();
}
228 
TEST_F(MindDataTestPipeline,TestCSVDatasetType)229 TEST_F(MindDataTestPipeline, TestCSVDatasetType) {
230   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetType.";
231 
232   // Create a CSVDataset, with single CSV file
233   std::string file = datasets_root_path_ + "/testCSV/default.csv";
234   std::vector<std::shared_ptr<CsvBase>> colum_type = {
235     std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
236     std::make_shared<CsvRecord<int>>(CsvType::INT, 0),
237     std::make_shared<CsvRecord<float>>(CsvType::FLOAT, 0.0),
238     std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
239   };
240   std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
241   std::shared_ptr<Dataset> ds = CSV({file}, ',', colum_type, column_names, 0, ShuffleMode::kFalse);
242   EXPECT_NE(ds, nullptr);
243 
244   // Create an iterator over the result of the above dataset
245   // This will trigger the creation of the Execution Tree and launch it.
246   std::shared_ptr<Iterator> iter = ds->CreateIterator();
247   EXPECT_NE(iter, nullptr);
248 
249   // Iterate the dataset and get each row
250   std::unordered_map<std::string, mindspore::MSTensor> row;
251   ASSERT_OK(iter->GetNextRow(&row));
252   std::vector<std::vector<std::shared_ptr<CsvBase>>> expected = {
253     {
254       std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
255       std::make_shared<CsvRecord<int>>(CsvType::INT, 2),
256       std::make_shared<CsvRecord<float>>(CsvType::FLOAT, 3.0),
257       std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
258     },
259     {
260       std::make_shared<CsvRecord<std::string>>(CsvType::STRING, "a"),
261       std::make_shared<CsvRecord<int>>(CsvType::INT, 4),
262       std::make_shared<CsvRecord<float>>(CsvType::FLOAT, 5.0),
263       std::make_shared<CsvRecord<std::string>>(CsvType::STRING, "b"),
264     },
265   };
266   EXPECT_NE(row.find("col1"), row.end());
267 
268   uint64_t i = 0;
269   while (row.size() != 0) {
270     for (int j = 0; j < column_names.size(); j++) {
271       auto text = row[column_names[j]];
272       std::shared_ptr<Tensor> de_text;
273       ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
274       if (colum_type[j]->type == CsvType::INT) {
275         int val;
276         ASSERT_OK(de_text->GetItemAt(&val, {}));
277         EXPECT_EQ(val, std::dynamic_pointer_cast<CsvRecord<int>>(expected[i][j])->value);
278       } else if (colum_type[j]->type == CsvType::FLOAT) {
279         float val;
280         ASSERT_OK(de_text->GetItemAt(&val, {}));
281         EXPECT_EQ(val, std::dynamic_pointer_cast<CsvRecord<float>>(expected[i][j])->value);
282       } else if (colum_type[j]->type == CsvType::STRING) {
283         std::string_view sv;
284         ASSERT_OK(de_text->GetItemAt(&sv, {}));
285         std::string ss(sv);
286         EXPECT_STREQ(ss.c_str(), std::dynamic_pointer_cast<CsvRecord<std::string>>(expected[i][j])->value.c_str());
287       }
288     }
289     ASSERT_OK(iter->GetNextRow(&row));
290     i++;
291   }
292 
293   // Expect 2 samples
294   EXPECT_EQ(i, 2);
295 
296   // Manually terminate the pipeline
297   iter->Stop();
298 }
299 
// Reads testCSV/header.csv without passing column names to CSV(), then looks the row up by the names
// col1..col4 and checks that exactly the 1 row listed in expected_result is produced.
TEST_F(MindDataTestPipeline, TestCSVDatasetHeader) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetHeader.";

  // Create a CSVDataset, with single CSV file
  std::string train_file = datasets_root_path_ + "/testCSV/header.csv";
  std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, {});
  // Fatal assertion: ds is dereferenced on the next statement.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal assertion: iter is dereferenced on the next statement.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"a", "b", "c", "d"},
  };

  uint64_t i = 0;
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  while (row.size() != 0) {
    // Stop before indexing past expected_result if the pipeline yields more rows than expected.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); ++j) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 1 sample
  EXPECT_EQ(i, 1);

  // Manually terminate the pipeline
  iter->Stop();
}
343 
// Negative tests: for each invalid CSV() argument set below, the dataset object is still created
// (non-null) but CreateIterator() is expected to return nullptr.
TEST_F(MindDataTestPipeline, TestCSVDatasetFail) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetFail.";
  // Create a CSV Dataset
  std::string file = datasets_root_path_ + "/testCSV/1.csv";
  std::string invalid_csv_file = "./NotExistFile";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};

  // Each dataset pointer is checked with a fatal assertion because it is dereferenced right after.

  // Test empty file list
  std::shared_ptr<Dataset> ds0 = CSV({});
  ASSERT_NE(ds0, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter0 = ds0->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter0, nullptr);

  // Test invalid file
  std::shared_ptr<Dataset> ds1 = CSV({invalid_csv_file});
  ASSERT_NE(ds1, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter1 = ds1->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter1, nullptr);

  // Test invalid negative num_samples (-1)
  std::shared_ptr<Dataset> ds2 = CSV({file}, ',', {}, column_names, -1);
  ASSERT_NE(ds2, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter2 = ds2->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter2, nullptr);

  // Test invalid num_shards < 1
  std::shared_ptr<Dataset> ds3 = CSV({file}, ',', {}, column_names, 0, ShuffleMode::kFalse, 0);
  ASSERT_NE(ds3, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter3 = ds3->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter3, nullptr);

  // Test invalid shard_id >= num_shards
  std::shared_ptr<Dataset> ds4 = CSV({file}, ',', {}, column_names, 0, ShuffleMode::kFalse, 2, 2);
  ASSERT_NE(ds4, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter4 = ds4->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter4, nullptr);

  // Test invalid field_delim
  std::shared_ptr<Dataset> ds5 = CSV({file}, '"', {}, column_names);
  ASSERT_NE(ds5, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter5 = ds5->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter5, nullptr);
}
399 
// Reads 1.csv and append.csv (passed in that order) with ShuffleMode::kFiles under a fixed seed (130)
// and 4 parallel workers, then checks the 6 rows against the order recorded in expected_result.
TEST_F(MindDataTestPipeline, TestCSVDatasetShuffleFilesA) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetShuffleFilesA.";

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;

  // Restore the global configuration on every exit path, including the early return taken by a failed
  // ASSERT_*, so a failure here cannot leak the seed / worker count into later tests.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};

  GlobalContext::config_manager()->set_seed(130);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a CSVDataset, with 2 CSV files, 1.csv and append.csv in lexicographical order
  std::string file1 = datasets_root_path_ + "/testCSV/1.csv";
  std::string file2 = datasets_root_path_ + "/testCSV/append.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({file1, file2}, ',', {}, column_names, 0, ShuffleMode::kFiles);
  // Fatal assertion: ds is dereferenced on the next statement.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal assertion: iter is dereferenced on the next statement.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"13", "14", "15", "16"}, {"1", "2", "3", "4"},     {"17", "18", "19", "20"},
    {"5", "6", "7", "8"},     {"21", "22", "23", "24"}, {"9", "10", "11", "12"},
  };

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past expected_result if the pipeline yields more rows than expected.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); ++j) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 6 samples
  EXPECT_EQ(i, 6);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when it goes out of scope.
}
456 
TEST_F(MindDataTestPipeline,TestCSVDatasetShuffleFilesB)457 TEST_F(MindDataTestPipeline, TestCSVDatasetShuffleFilesB) {
458   MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetShuffleFilesB.";
459 
460   // Set configuration
461   uint32_t original_seed = GlobalContext::config_manager()->seed();
462   uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
463   MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;
464   GlobalContext::config_manager()->set_seed(130);
465   GlobalContext::config_manager()->set_num_parallel_workers(4);
466 
467   // Create a CSVDataset, with 2 CSV files, append.csv and 1.csv in non-lexicographical order
468   std::string file1 = datasets_root_path_ + "/testCSV/1.csv";
469   std::string file2 = datasets_root_path_ + "/testCSV/append.csv";
470   std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
471   std::shared_ptr<Dataset> ds = CSV({file2, file1}, ',', {}, column_names, 0, ShuffleMode::kFiles);
472   EXPECT_NE(ds, nullptr);
473 
474   // Create an iterator over the result of the above dataset
475   // This will trigger the creation of the Execution Tree and launch it.
476   std::shared_ptr<Iterator> iter = ds->CreateIterator();
477   EXPECT_NE(iter, nullptr);
478 
479   // Iterate the dataset and get each row
480   std::unordered_map<std::string, mindspore::MSTensor> row;
481   ASSERT_OK(iter->GetNextRow(&row));
482   EXPECT_NE(row.find("col1"), row.end());
483   std::vector<std::vector<std::string>> expected_result = {
484     {"13", "14", "15", "16"}, {"1", "2", "3", "4"},     {"17", "18", "19", "20"},
485     {"5", "6", "7", "8"},     {"21", "22", "23", "24"}, {"9", "10", "11", "12"},
486   };
487 
488   uint64_t i = 0;
489   while (row.size() != 0) {
490     for (int j = 0; j < column_names.size(); j++) {
491       auto text = row[column_names[j]];
492       std::shared_ptr<Tensor> de_text;
493       ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
494       std::string_view sv;
495       ASSERT_OK(de_text->GetItemAt(&sv, {}));
496       std::string ss(sv);
497       MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
498       EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
499     }
500     ASSERT_OK(iter->GetNextRow(&row));
501     i++;
502   }
503 
504   // Expect 6 samples
505   EXPECT_EQ(i, 6);
506 
507   // Manually terminate the pipeline
508   iter->Stop();
509 
510   // Restore configuration
511   GlobalContext::config_manager()->set_seed(original_seed);
512   GlobalContext::config_manager()->set_num_parallel_workers(original_num_parallel_workers);
513 }
514 
// Reads testCSV/1.csv with ShuffleMode::kGlobal under a fixed seed (135) and 4 parallel workers, then
// checks the 3 rows against the order recorded in expected_result.
TEST_F(MindDataTestPipeline, TestCSVDatasetShuffleGlobal) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetShuffleGlobal.";
  // Test CSV Dataset with GLOBAL shuffle

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;

  // Restore the global configuration on every exit path, including the early return taken by a failed
  // ASSERT_*, so a failure here cannot leak the seed / worker count into later tests.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } config_restorer{original_seed, original_num_parallel_workers};

  GlobalContext::config_manager()->set_seed(135);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a CSVFile Dataset, with single CSV file
  std::string train_file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, column_names, 0, ShuffleMode::kGlobal);
  // Fatal assertion: ds is dereferenced on the next statement.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal assertion: iter is dereferenced on the next statement.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"5", "6", "7", "8"}, {"9", "10", "11", "12"}, {"1", "2", "3", "4"}};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past expected_result if the pipeline yields more rows than expected.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); ++j) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 3 samples
  EXPECT_EQ(i, 3);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by config_restorer when it goes out of scope.
}
569 
// Negative test: passing column names that contain a duplicate ("col1" twice) still yields a non-null
// dataset object, but CreateIterator() is expected to return nullptr.
TEST_F(MindDataTestPipeline, TestCSVDatasetDuplicateColumnNameFail) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetDuplicateColumnNameFail.";

  // Create a CSVDataset, with single CSV file
  std::string train_file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col1", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, column_names, 0, ShuffleMode::kFalse);
  // Fatal assertion: ds is dereferenced on the next statement.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: invalid CSV input, duplicate column names
  EXPECT_EQ(iter, nullptr);
}
584