1 /**
2 * Copyright 2020-2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "common/common.h"
17 #include "minddata/dataset/core/global_context.h"
18 #include "minddata/dataset/include/dataset/datasets.h"
19
// Needed for CsvRecord
21 #include "minddata/dataset/engine/ir/datasetops/source/csv_node.h"
22
23 using namespace mindspore::dataset;
24
25 class MindDataTestPipeline : public UT::DatasetOpTesting {
26 protected:
27 };
28
/// Reads a single CSV file with explicit column names and shuffling disabled.
/// Every row is fetched as string tensors and compared, column by column, against the expected table.
/// Exactly 3 rows are expected.
TEST_F(MindDataTestPipeline, TestCSVDatasetBasic) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetBasic.";

  // Create a CSVDataset, with single CSV file
  std::string train_file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, column_names, 0, ShuffleMode::kFalse);
  // Fatal check: ds is dereferenced immediately below.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal check: iter is dereferenced immediately below.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"1", "2", "3", "4"},
    {"5", "6", "7", "8"},
    {"9", "10", "11", "12"},
  };

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past the expected table if the pipeline yields extra rows.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); j++) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 3 samples
  EXPECT_EQ(i, 3);

  // Manually terminate the pipeline
  iter->Stop();
}
74
/// Checks the dataset getters on a single-file CSV dataset:
/// GetDatasetSize() is expected to be 3 and GetColumnNames() to equal the names passed in.
TEST_F(MindDataTestPipeline, TestCSVGetters) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVGetters.";

  // Create a CSVDataset, with single CSV file
  std::string train_file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, column_names, 0, ShuffleMode::kFalse);
  // Fatal check: ds is dereferenced by the getter calls below.
  ASSERT_NE(ds, nullptr);

  EXPECT_EQ(ds->GetDatasetSize(), 3);
  EXPECT_EQ(ds->GetColumnNames(), column_names);
}
87
/// Reads two CSV files with ShuffleMode::kGlobal under a fixed seed (111) and 4 parallel workers.
/// Rows are compared against a fixed expected order; 6 rows are expected in total.
/// NOTE(review): the expected order presumably depends on the seed/worker settings above - confirm before changing them.
TEST_F(MindDataTestPipeline, TestCSVDatasetMultiFiles) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetMultiFiles.";

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;

  // Restore the global configuration on every exit path, including early returns from ASSERT_* failures,
  // so that a failure here does not leak the modified seed/worker count into later tests.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } restore_config{original_seed, original_num_parallel_workers};

  GlobalContext::config_manager()->set_seed(111);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a CSVDataset, with two CSV files
  std::string file1 = datasets_root_path_ + "/testCSV/1.csv";
  std::string file2 = datasets_root_path_ + "/testCSV/append.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({file1, file2}, ',', {}, column_names, 0, ShuffleMode::kGlobal);
  // Fatal check: ds is dereferenced immediately below.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal check: iter is dereferenced immediately below.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"17", "18", "19", "20"}, {"1", "2", "3", "4"},     {"5", "6", "7", "8"},
    {"13", "14", "15", "16"}, {"21", "22", "23", "24"}, {"9", "10", "11", "12"},
  };

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past the expected table if the pipeline yields extra rows.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); j++) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 6 samples
  EXPECT_EQ(i, 6);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by restore_config when it goes out of scope.
}
144
/// Reads a single CSV file with num_samples set to 2 and shuffling disabled.
/// Only the first two rows of the expected table should be produced.
TEST_F(MindDataTestPipeline, TestCSVDatasetNumSamples) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetNumSamples.";

  // Create a CSVDataset, with single CSV file
  std::string file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({file}, ',', {}, column_names, 2, ShuffleMode::kFalse);
  // Fatal check: ds is dereferenced immediately below.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal check: iter is dereferenced immediately below.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {{"1", "2", "3", "4"}, {"5", "6", "7", "8"}};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past the expected table if num_samples is not honoured.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); j++) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 2 samples
  EXPECT_EQ(i, 2);

  // Manually terminate the pipeline
  iter->Stop();
}
186
/// Reads a single CSV file as shard 0 of 2 (num_shards = 2, shard_id = 0) with shuffling disabled.
/// The shard is expected to yield the two rows listed in expected_result.
TEST_F(MindDataTestPipeline, TestCSVDatasetDistribution) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetDistribution.";

  // Create a CSVDataset, with single CSV file
  std::string file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({file}, ',', {}, column_names, 0, ShuffleMode::kFalse, 2, 0);
  // Fatal check: ds is dereferenced immediately below.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal check: iter is dereferenced immediately below.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {{"1", "2", "3", "4"}, {"5", "6", "7", "8"}};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past the expected table if the shard yields extra rows.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); j++) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 2 samples
  EXPECT_EQ(i, 2);

  // Manually terminate the pipeline
  iter->Stop();
}
228
/// Reads a CSV file with per-column defaults of type STRING, INT, FLOAT, STRING.
/// Each cell is read back using the type declared for its column and compared with the expected record.
/// Exactly 2 rows are expected.
TEST_F(MindDataTestPipeline, TestCSVDatasetType) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetType.";

  // Create a CSVDataset, with single CSV file
  std::string file = datasets_root_path_ + "/testCSV/default.csv";
  std::vector<std::shared_ptr<CsvBase>> column_types = {
    std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
    std::make_shared<CsvRecord<int>>(CsvType::INT, 0),
    std::make_shared<CsvRecord<float>>(CsvType::FLOAT, 0.0),
    std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
  };
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({file}, ',', column_types, column_names, 0, ShuffleMode::kFalse);
  // Fatal check: ds is dereferenced immediately below.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal check: iter is dereferenced immediately below.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  std::vector<std::vector<std::shared_ptr<CsvBase>>> expected = {
    {
      std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
      std::make_shared<CsvRecord<int>>(CsvType::INT, 2),
      std::make_shared<CsvRecord<float>>(CsvType::FLOAT, 3.0),
      std::make_shared<CsvRecord<std::string>>(CsvType::STRING, ""),
    },
    {
      std::make_shared<CsvRecord<std::string>>(CsvType::STRING, "a"),
      std::make_shared<CsvRecord<int>>(CsvType::INT, 4),
      std::make_shared<CsvRecord<float>>(CsvType::FLOAT, 5.0),
      std::make_shared<CsvRecord<std::string>>(CsvType::STRING, "b"),
    },
  };
  EXPECT_NE(row.find("col1"), row.end());

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past the expected table if the pipeline yields extra rows.
    ASSERT_LT(i, expected.size());
    for (size_t j = 0; j < column_names.size(); j++) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      // The declared column type selects both how the tensor is read and how the expected record is downcast.
      if (column_types[j]->type == CsvType::INT) {
        int val;
        ASSERT_OK(de_text->GetItemAt(&val, {}));
        auto expected_int = std::dynamic_pointer_cast<CsvRecord<int>>(expected[i][j]);
        ASSERT_NE(expected_int, nullptr);
        EXPECT_EQ(val, expected_int->value);
      } else if (column_types[j]->type == CsvType::FLOAT) {
        float val;
        ASSERT_OK(de_text->GetItemAt(&val, {}));
        auto expected_float = std::dynamic_pointer_cast<CsvRecord<float>>(expected[i][j]);
        ASSERT_NE(expected_float, nullptr);
        // Floating-point comparison with ULP tolerance rather than exact equality.
        EXPECT_FLOAT_EQ(val, expected_float->value);
      } else if (column_types[j]->type == CsvType::STRING) {
        std::string_view sv;
        ASSERT_OK(de_text->GetItemAt(&sv, {}));
        std::string ss(sv);
        auto expected_str = std::dynamic_pointer_cast<CsvRecord<std::string>>(expected[i][j]);
        ASSERT_NE(expected_str, nullptr);
        EXPECT_STREQ(ss.c_str(), expected_str->value.c_str());
      }
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 2 samples
  EXPECT_EQ(i, 2);

  // Manually terminate the pipeline
  iter->Stop();
}
299
/// Reads a CSV file without passing column names or column defaults.
/// The produced row is then looked up by "col1".."col4" and compared against the single expected row.
/// NOTE(review): the column names presumably come from the header line of header.csv - confirm against the data file.
TEST_F(MindDataTestPipeline, TestCSVDatasetHeader) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetHeader.";

  // Create a CSVDataset, with single CSV file
  std::string train_file = datasets_root_path_ + "/testCSV/header.csv";
  std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, {});
  // Fatal check: ds is dereferenced immediately below.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal check: iter is dereferenced immediately below.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"a", "b", "c", "d"},
  };

  uint64_t i = 0;
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  while (row.size() != 0) {
    // Stop before indexing past the expected table if the pipeline yields extra rows.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); j++) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 1 sample
  EXPECT_EQ(i, 1);

  // Manually terminate the pipeline
  iter->Stop();
}
343
/// Negative tests for CSV dataset creation.
/// In every case the dataset object itself is expected to be created (non-null),
/// while CreateIterator() is expected to return nullptr because of the invalid argument.
TEST_F(MindDataTestPipeline, TestCSVDatasetFail) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetFail.";
  // Create a CSV Dataset
  std::string file = datasets_root_path_ + "/testCSV/1.csv";
  std::string invalid_csv_file = "./NotExistFile";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};

  // Each dataset pointer is checked with a fatal assertion because it is dereferenced on the following line.

  // Test empty file list
  std::shared_ptr<Dataset> ds0 = CSV({});
  ASSERT_NE(ds0, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter0 = ds0->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter0, nullptr);

  // Test invalid file
  std::shared_ptr<Dataset> ds1 = CSV({invalid_csv_file});
  ASSERT_NE(ds1, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter1 = ds1->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter1, nullptr);

  // Test invalid negative num_samples (-1)
  std::shared_ptr<Dataset> ds2 = CSV({file}, ',', {}, column_names, -1);
  ASSERT_NE(ds2, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter2 = ds2->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter2, nullptr);

  // Test invalid num_shards < 1
  std::shared_ptr<Dataset> ds3 = CSV({file}, ',', {}, column_names, 0, ShuffleMode::kFalse, 0);
  ASSERT_NE(ds3, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter3 = ds3->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter3, nullptr);

  // Test invalid shard_id >= num_shards
  std::shared_ptr<Dataset> ds4 = CSV({file}, ',', {}, column_names, 0, ShuffleMode::kFalse, 2, 2);
  ASSERT_NE(ds4, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter4 = ds4->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter4, nullptr);

  // Test invalid field_delim (double quote character)
  std::shared_ptr<Dataset> ds5 = CSV({file}, '"', {}, column_names);
  ASSERT_NE(ds5, nullptr);
  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter5 = ds5->CreateIterator();
  // Expect failure: invalid CSV input
  EXPECT_EQ(iter5, nullptr);
}
399
/// Reads two CSV files passed as {1.csv, append.csv} with ShuffleMode::kFiles,
/// under a fixed seed (130) and 4 parallel workers, and compares the rows with a fixed expected order.
/// NOTE(review): the expected order presumably depends on the seed/worker settings above - confirm before changing them.
TEST_F(MindDataTestPipeline, TestCSVDatasetShuffleFilesA) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetShuffleFilesA.";

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;

  // Restore the global configuration on every exit path, including early returns from ASSERT_* failures,
  // so that a failure here does not leak the modified seed/worker count into later tests.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } restore_config{original_seed, original_num_parallel_workers};

  GlobalContext::config_manager()->set_seed(130);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a CSVDataset, with 2 CSV files, 1.csv and append.csv in lexicographical order
  std::string file1 = datasets_root_path_ + "/testCSV/1.csv";
  std::string file2 = datasets_root_path_ + "/testCSV/append.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({file1, file2}, ',', {}, column_names, 0, ShuffleMode::kFiles);
  // Fatal check: ds is dereferenced immediately below.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal check: iter is dereferenced immediately below.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"13", "14", "15", "16"}, {"1", "2", "3", "4"},     {"17", "18", "19", "20"},
    {"5", "6", "7", "8"},     {"21", "22", "23", "24"}, {"9", "10", "11", "12"},
  };

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past the expected table if the pipeline yields extra rows.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); j++) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 6 samples
  EXPECT_EQ(i, 6);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by restore_config when it goes out of scope.
}
456
/// Same scenario as the {1.csv, append.csv} file-shuffle test, but the files are passed as {append.csv, 1.csv}.
/// The expected row order listed below is identical to the one used for the other input order.
/// NOTE(review): the expected order presumably depends on the seed/worker settings above - confirm before changing them.
TEST_F(MindDataTestPipeline, TestCSVDatasetShuffleFilesB) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetShuffleFilesB.";

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;

  // Restore the global configuration on every exit path, including early returns from ASSERT_* failures,
  // so that a failure here does not leak the modified seed/worker count into later tests.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } restore_config{original_seed, original_num_parallel_workers};

  GlobalContext::config_manager()->set_seed(130);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a CSVDataset, with 2 CSV files, append.csv and 1.csv in non-lexicographical order
  std::string file1 = datasets_root_path_ + "/testCSV/1.csv";
  std::string file2 = datasets_root_path_ + "/testCSV/append.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({file2, file1}, ',', {}, column_names, 0, ShuffleMode::kFiles);
  // Fatal check: ds is dereferenced immediately below.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal check: iter is dereferenced immediately below.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"13", "14", "15", "16"}, {"1", "2", "3", "4"},     {"17", "18", "19", "20"},
    {"5", "6", "7", "8"},     {"21", "22", "23", "24"}, {"9", "10", "11", "12"},
  };

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past the expected table if the pipeline yields extra rows.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); j++) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      MS_LOG(INFO) << "Text length: " << ss.length() << ", Text: " << ss.substr(0, 50);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 6 samples
  EXPECT_EQ(i, 6);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by restore_config when it goes out of scope.
}
514
/// Reads a single CSV file with ShuffleMode::kGlobal under a fixed seed (135) and 4 parallel workers.
/// The 3 rows are compared against a fixed, shuffled expected order.
/// NOTE(review): the expected order presumably depends on the seed/worker settings above - confirm before changing them.
TEST_F(MindDataTestPipeline, TestCSVDatasetShuffleGlobal) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetShuffleGlobal.";
  // Test CSV Dataset with GLOBAL shuffle

  // Set configuration
  uint32_t original_seed = GlobalContext::config_manager()->seed();
  uint32_t original_num_parallel_workers = GlobalContext::config_manager()->num_parallel_workers();
  MS_LOG(DEBUG) << "ORIGINAL seed: " << original_seed << ", num_parallel_workers: " << original_num_parallel_workers;

  // Restore the global configuration on every exit path, including early returns from ASSERT_* failures,
  // so that a failure here does not leak the modified seed/worker count into later tests.
  struct ConfigRestorer {
    uint32_t seed;
    uint32_t num_parallel_workers;
    ~ConfigRestorer() {
      GlobalContext::config_manager()->set_seed(seed);
      GlobalContext::config_manager()->set_num_parallel_workers(num_parallel_workers);
    }
  } restore_config{original_seed, original_num_parallel_workers};

  GlobalContext::config_manager()->set_seed(135);
  GlobalContext::config_manager()->set_num_parallel_workers(4);

  // Create a CSVFile Dataset, with single CSV file
  std::string train_file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col2", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, column_names, 0, ShuffleMode::kGlobal);
  // Fatal check: ds is dereferenced immediately below.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  // This will trigger the creation of the Execution Tree and launch it.
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Fatal check: iter is dereferenced immediately below.
  ASSERT_NE(iter, nullptr);

  // Iterate the dataset and get each row
  std::unordered_map<std::string, mindspore::MSTensor> row;
  ASSERT_OK(iter->GetNextRow(&row));
  EXPECT_NE(row.find("col1"), row.end());
  std::vector<std::vector<std::string>> expected_result = {
    {"5", "6", "7", "8"}, {"9", "10", "11", "12"}, {"1", "2", "3", "4"}};

  uint64_t i = 0;
  while (row.size() != 0) {
    // Stop before indexing past the expected table if the pipeline yields extra rows.
    ASSERT_LT(i, expected_result.size());
    for (size_t j = 0; j < column_names.size(); j++) {
      auto text = row[column_names[j]];
      std::shared_ptr<Tensor> de_text;
      ASSERT_OK(Tensor::CreateFromMSTensor(text, &de_text));
      std::string_view sv;
      ASSERT_OK(de_text->GetItemAt(&sv, {}));
      std::string ss(sv);
      EXPECT_STREQ(ss.c_str(), expected_result[i][j].c_str());
    }
    ASSERT_OK(iter->GetNextRow(&row));
    i++;
  }

  // Expect 3 samples
  EXPECT_EQ(i, 3);

  // Manually terminate the pipeline
  iter->Stop();

  // Configuration is restored by restore_config when it goes out of scope.
}
569
/// Negative test: the column name list contains "col1" twice.
/// The dataset object is expected to be created, but CreateIterator() is expected to return nullptr.
TEST_F(MindDataTestPipeline, TestCSVDatasetDuplicateColumnNameFail) {
  MS_LOG(INFO) << "Doing MindDataTestPipeline-TestCSVDatasetDuplicateColumnNameFail.";

  // Create a CSVDataset, with single CSV file
  std::string train_file = datasets_root_path_ + "/testCSV/1.csv";
  std::vector<std::string> column_names = {"col1", "col1", "col3", "col4"};
  std::shared_ptr<Dataset> ds = CSV({train_file}, ',', {}, column_names, 0, ShuffleMode::kFalse);
  // Fatal check: ds is dereferenced immediately below.
  ASSERT_NE(ds, nullptr);

  // Create an iterator over the result of the above dataset
  std::shared_ptr<Iterator> iter = ds->CreateIterator();
  // Expect failure: invalid CSV input, duplicate column names
  EXPECT_EQ(iter, nullptr);
}
584