/**
 * Copyright 2021-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/engine/serdes.h"

#include <fstream>
#include <iomanip>
#include <stack>

#include "include/common/utils/utils.h"
#include "minddata/dataset/core/pybind_support.h"
#ifndef BUILD_LITE
#include "mindspore/core/utils/file_utils.h"
#else
#include "mindspore/lite/src/common/file_utils.h"
#endif
#include "minddata/dataset/kernels/image/dvpp/acl_adapter.h"

namespace mindspore {
namespace dataset {
std::map<std::string, Status (*)(nlohmann::json json_obj, std::shared_ptr<TensorOperation> *operation)>
  Serdes::func_ptr_ = Serdes::InitializeFuncPtr();

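// Serialize an IR tree rooted at `node` into *out_json by recursively dumping each node's
// attributes and placing its serialized children under the "children" key; when `filename` is
// non-empty, the resulting JSON is also written to that file via SaveJSONToFile().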
Status Serdes::SaveToJSON(std::shared_ptr<DatasetNode> node, const std::string &filename, nlohmann::json *out_json) {
  RETURN_UNEXPECTED_IF_NULL(node);
  RETURN_UNEXPECTED_IF_NULL(out_json);
  // If an optimized IR Tree is sent (use-case for MD AutoTune), ignore Top and EpochCtrl nodes
  if (node->Name() == "Top" || node->Name() == "EpochCtrl") {
    CHECK_FAIL_RETURN_UNEXPECTED(
      node->Children().size() == 1,
      "Expected " + node->Name() + " to have exactly 1 child but it has " + std::to_string(node->Children().size()));
    return SaveToJSON(node->Children()[0], filename, out_json);
  }
  // Dump attributes of current node to json string
  nlohmann::json args;
  RETURN_IF_NOT_OK(node->to_json(&args));
  args["op_type"] = node->Name();

  // If the current node isn't a leaf node, visit all its children and get all attributes
  std::vector<nlohmann::json> children_pipeline;
  if (!node->IsLeaf()) {
    for (auto child : node->Children()) {
      nlohmann::json child_args;
      RETURN_IF_NOT_OK(SaveToJSON(child, "", &child_args));
      children_pipeline.push_back(child_args);
    }
  }
  args["children"] = children_pipeline;

  // Save json string into file if filename is given.
  if (!filename.empty()) {
    RETURN_IF_NOT_OK(SaveJSONToFile(args, filename));
  }

  *out_json = args;
  return Status::OK();
}

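// Write a JSON object to `file_name`, pretty-printed with a 4-space indent when `pretty` is true.
// The target directory is resolved to its real path first, and the saved file is restricted to
// owner read/write permissions afterwards.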
Status Serdes::SaveJSONToFile(const nlohmann::json &json_string, const std::string &file_name, bool pretty) {
  constexpr int field_width = 4;
  try {
    std::optional<std::string> dir = "";
    std::optional<std::string> local_file_name = "";
    FileUtils::SplitDirAndFileName(file_name, &dir, &local_file_name);
    if (!dir.has_value()) {
      dir = ".";
    }
    auto realpath = FileUtils::GetRealPath(dir.value().c_str());
    if (!realpath.has_value()) {
      MS_LOG(ERROR) << "Invalid file, get real path failed, path=" << file_name;
      RETURN_STATUS_UNEXPECTED("Invalid file, get real path failed, path=" + file_name);
    }

    std::optional<std::string> whole_path = "";
    FileUtils::ConcatDirAndFileName(&realpath, &local_file_name, &whole_path);

    std::ofstream file(whole_path.value(), std::ios::out);
    if (pretty) {
      file << std::setw(field_width);
    }
    file << json_string << std::endl;
    file.close();

    ChangeFileMode(whole_path.value(), S_IRUSR | S_IWUSR);
  } catch (const std::exception &err) {
    RETURN_STATUS_UNEXPECTED("Invalid data, failed to save json string into file: " + file_name +
                             ", error message: " + err.what());
  }
  return Status::OK();
}

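// Load a serialized pipeline from a JSON file and rebuild the IR tree, e.g.
//   std::shared_ptr<DatasetNode> root;
//   RETURN_IF_NOT_OK(Serdes::Deserialize("/path/to/pipeline.json", &root));  // illustrative path
// Files produced by dataset AutoTune wrap the pipeline under a top-level "tree" key, which is
// unwrapped here before construction.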
Status Serdes::Deserialize(const std::string &json_filepath, std::shared_ptr<DatasetNode> *ds) {
  nlohmann::json json_obj;
  CHECK_FAIL_RETURN_UNEXPECTED(json_filepath.size() != 0, "JSON file path is empty");
  std::ifstream json_in(json_filepath, std::ios::in);
  CHECK_FAIL_RETURN_UNEXPECTED(json_in, "Invalid file, failed to open json file: " + json_filepath);
  try {
    json_in >> json_obj;
  } catch (const std::exception &e) {
    json_in.close();
    return Status(StatusCode::kMDSyntaxError,
                  "Invalid file, failed to parse json file: " + json_filepath + ", error message: " + e.what());
  }
  json_in.close();
  // Handle config generated by dataset autotune
  if (json_obj.find("tree") != json_obj.end()) {
    json_obj = json_obj["tree"];
  }
  RETURN_IF_NOT_OK(ConstructPipeline(json_obj, ds));
  return Status::OK();
}

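// Rebuild an IR sub-tree from its JSON description: a node without children becomes a leaf
// (source) node, a node with one child is built bottom-up, and multi-child nodes are currently
// limited to Zip and Concat.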
Status Serdes::ConstructPipeline(nlohmann::json json_obj, std::shared_ptr<DatasetNode> *ds) {
  CHECK_FAIL_RETURN_UNEXPECTED(json_obj.find("children") != json_obj.end(), "Failed to find children");
  std::shared_ptr<DatasetNode> child_ds;

  if (json_obj["children"].size() == 0) {
    // If the JSON object has no child, then this node is a leaf node. Call create node to construct the corresponding
    // leaf node
    RETURN_IF_NOT_OK(CreateNode(nullptr, json_obj, ds));
  } else if (json_obj["children"].size() == 1) {
    // This node only has one child, construct the sub-tree under it first, and then call create node to construct the
    // corresponding node
    RETURN_IF_NOT_OK(ConstructPipeline(json_obj["children"][0], &child_ds));
    RETURN_IF_NOT_OK(CreateNode(child_ds, json_obj, ds));
  } else {
    std::vector<std::shared_ptr<DatasetNode>> datasets;
    for (const auto &child_json_obj : json_obj["children"]) {
      RETURN_IF_NOT_OK(ConstructPipeline(child_json_obj, &child_ds));
      datasets.push_back(child_ds);
    }
    if (json_obj["op_type"] == "Zip") {
      CHECK_FAIL_RETURN_UNEXPECTED(datasets.size() > 1, "Should zip more than 1 dataset");
      RETURN_IF_NOT_OK(ZipNode::from_json(datasets, ds));
    } else if (json_obj["op_type"] == "Concat") {
      CHECK_FAIL_RETURN_UNEXPECTED(datasets.size() > 1, "Should concat more than 1 dataset");
      RETURN_IF_NOT_OK(ConcatNode::from_json(json_obj, datasets, ds));
    } else {
      return Status(StatusCode::kMDUnexpectedError,
                    "Invalid data, unsupported operation type: " + std::string(json_obj["op_type"]));
    }
  }
  return Status::OK();
}

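// Create a single IR node from JSON: source (leaf) datasets are dispatched to CreateDatasetNode(),
// nodes with a child to CreateDatasetOperationNode().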
Status Serdes::CreateNode(const std::shared_ptr<DatasetNode> &child_ds, nlohmann::json json_obj,
                          std::shared_ptr<DatasetNode> *ds) {
  CHECK_FAIL_RETURN_UNEXPECTED(json_obj.find("op_type") != json_obj.end(), "Failed to find op_type in json.");
  std::string op_type = json_obj["op_type"];
  if (child_ds == nullptr) {
    // if dataset doesn't have any child, then create a source dataset IR. e.g., ImageFolderNode, CocoNode
    RETURN_IF_NOT_OK(CreateDatasetNode(json_obj, op_type, ds));
  } else {
    // if the dataset has at least one child, then create an operation dataset IR, e.g., BatchNode, MapNode
    RETURN_IF_NOT_OK(CreateDatasetOperationNode(child_ds, json_obj, op_type, ds));
  }
  return Status::OK();
}

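// Create a source dataset IR node (e.g. ImageFolderNode, CocoNode) from its JSON description.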
Status Serdes::CreateDatasetNode(const nlohmann::json &json_obj, const std::string &op_type,
                                 std::shared_ptr<DatasetNode> *ds) {
  if (op_type == kAlbumNode) {
    RETURN_IF_NOT_OK(AlbumNode::from_json(json_obj, ds));
  } else if (op_type == kCelebANode) {
    RETURN_IF_NOT_OK(CelebANode::from_json(json_obj, ds));
  } else if (op_type == kCifar10Node) {
    RETURN_IF_NOT_OK(Cifar10Node::from_json(json_obj, ds));
  } else if (op_type == kCifar100Node) {
    RETURN_IF_NOT_OK(Cifar100Node::from_json(json_obj, ds));
  } else if (op_type == kCLUENode) {
    RETURN_IF_NOT_OK(CLUENode::from_json(json_obj, ds));
  } else if (op_type == kCocoNode) {
    RETURN_IF_NOT_OK(CocoNode::from_json(json_obj, ds));
  } else if (op_type == kCSVNode) {
    RETURN_IF_NOT_OK(CSVNode::from_json(json_obj, ds));
  } else if (op_type == kFlickrNode) {
    RETURN_IF_NOT_OK(FlickrNode::from_json(json_obj, ds));
  } else if (op_type == kImageFolderNode) {
    RETURN_IF_NOT_OK(ImageFolderNode::from_json(json_obj, ds));
  } else if (op_type == kManifestNode) {
    RETURN_IF_NOT_OK(ManifestNode::from_json(json_obj, ds));
  } else if (op_type == kMnistNode) {
    RETURN_IF_NOT_OK(MnistNode::from_json(json_obj, ds));
  } else if (op_type == kTextFileNode) {
    RETURN_IF_NOT_OK(TextFileNode::from_json(json_obj, ds));
  } else if (op_type == kTFRecordNode) {
    RETURN_IF_NOT_OK(TFRecordNode::from_json(json_obj, ds));
  } else if (op_type == kVOCNode) {
    RETURN_IF_NOT_OK(VOCNode::from_json(json_obj, ds));
  } else {
    return Status(StatusCode::kMDUnexpectedError, "Invalid data, unsupported operation type: " + op_type);
  }
  return Status::OK();
}

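// Create an operation IR node (e.g. BatchNode, MapNode) on top of the given child node from its
// JSON description.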
Status Serdes::CreateDatasetOperationNode(const std::shared_ptr<DatasetNode> &ds, const nlohmann::json &json_obj,
                                          const std::string &op_type, std::shared_ptr<DatasetNode> *result) {
  if (op_type == kBatchNode) {
    RETURN_IF_NOT_OK(BatchNode::from_json(json_obj, ds, result));
  } else if (op_type == kMapNode) {
    RETURN_IF_NOT_OK(MapNode::from_json(json_obj, ds, result));
  } else if (op_type == kProjectNode) {
    RETURN_IF_NOT_OK(ProjectNode::from_json(json_obj, ds, result));
  } else if (op_type == kRenameNode) {
    RETURN_IF_NOT_OK(RenameNode::from_json(json_obj, ds, result));
  } else if (op_type == kRepeatNode) {
    RETURN_IF_NOT_OK(RepeatNode::from_json(json_obj, ds, result));
  } else if (op_type == kShuffleNode) {
    RETURN_IF_NOT_OK(ShuffleNode::from_json(json_obj, ds, result));
  } else if (op_type == kSkipNode) {
    RETURN_IF_NOT_OK(SkipNode::from_json(json_obj, ds, result));
  } else if (op_type == kTransferNode) {
    RETURN_IF_NOT_OK(DataQueueNode::from_json(json_obj, ds, result));
  } else if (op_type == kTakeNode) {
    RETURN_IF_NOT_OK(TakeNode::from_json(json_obj, ds, result));
  } else {
    return Status(StatusCode::kMDUnexpectedError, "Invalid data, unsupported operation type: " + op_type);
  }
  return Status::OK();
}

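// Rebuild a sampler object from JSON. SkipFirstEpochSampler is dispatched before the generic
// path, which requires both sampler_name and num_samples to be present.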
Status Serdes::ConstructSampler(nlohmann::json json_obj, std::shared_ptr<SamplerObj> *sampler) {
  if (json_obj["sampler_name"] == "SkipFirstEpochSampler") {
    RETURN_IF_NOT_OK(SkipFirstEpochSamplerObj::from_json(json_obj, sampler));
    return Status::OK();
  }
  CHECK_FAIL_RETURN_UNEXPECTED(json_obj.find("num_samples") != json_obj.end(), "Failed to find num_samples");
  CHECK_FAIL_RETURN_UNEXPECTED(json_obj.find("sampler_name") != json_obj.end(), "Failed to find sampler_name");
  int64_t num_samples = json_obj["num_samples"];
  std::string sampler_name = json_obj["sampler_name"];
  if (sampler_name == "DistributedSampler") {
    RETURN_IF_NOT_OK(DistributedSamplerObj::from_json(json_obj, num_samples, sampler));
  } else if (sampler_name == "PKSampler") {
    RETURN_IF_NOT_OK(PKSamplerObj::from_json(json_obj, num_samples, sampler));
  } else if (sampler_name == "RandomSampler") {
    RETURN_IF_NOT_OK(RandomSamplerObj::from_json(json_obj, num_samples, sampler));
  } else if (sampler_name == "SequentialSampler") {
    RETURN_IF_NOT_OK(SequentialSamplerObj::from_json(json_obj, num_samples, sampler));
  } else if (sampler_name == "SubsetSampler") {
    RETURN_IF_NOT_OK(SubsetSamplerObj::from_json(json_obj, num_samples, sampler));
  } else if (sampler_name == "SubsetRandomSampler") {
    RETURN_IF_NOT_OK(SubsetRandomSamplerObj::from_json(json_obj, num_samples, sampler));
  } else if (sampler_name == "WeightedRandomSampler") {
    RETURN_IF_NOT_OK(WeightedRandomSamplerObj::from_json(json_obj, num_samples, sampler));
  } else {
    return Status(StatusCode::kMDUnexpectedError, "Invalid data, unsupported sampler type: " + sampler_name);
  }
  return Status::OK();
}

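// Rebuild a list of tensor operations from a JSON array. Entries tagged with "python_module" are
// restored through PyFuncOp (only possible when the Python interpreter is initialized); C++ ops
// are created via the from_json factory registered in func_ptr_ under their "tensor_op_name".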
Status Serdes::ConstructTensorOps(nlohmann::json json_obj, std::vector<std::shared_ptr<TensorOperation>> *result) {
  std::vector<std::shared_ptr<TensorOperation>> output;
  for (nlohmann::json item : json_obj) {
    if (item.find("python_module") != item.end()) {
      if (Py_IsInitialized() != 0) {
        std::vector<std::shared_ptr<TensorOperation>> tmp_res;
        RETURN_IF_NOT_OK(PyFuncOp::from_json(item, &tmp_res));
        output.insert(output.end(), tmp_res.begin(), tmp_res.end());
      } else {
        LOG_AND_RETURN_STATUS_SYNTAX_ERROR(
          "Python module is not initialized or Pyfunction is not supported on this platform.");
      }
    } else {
      CHECK_FAIL_RETURN_UNEXPECTED(item.find("tensor_op_name") != item.end(), "Failed to find tensor_op_name");
      CHECK_FAIL_RETURN_UNEXPECTED(item.find("tensor_op_params") != item.end(), "Failed to find tensor_op_params");
      std::string op_name = item["tensor_op_name"];
      nlohmann::json op_params = item["tensor_op_params"];
      std::shared_ptr<TensorOperation> operation = nullptr;
      CHECK_FAIL_RETURN_UNEXPECTED(func_ptr_.find(op_name) != func_ptr_.end(),
                                   "Invalid data, unsupported operation: " + op_name);
      RETURN_IF_NOT_OK(func_ptr_[op_name](op_params, &operation));
      output.push_back(operation);
    }
  }
  *result = output;
  return Status::OK();
}

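// Build the lookup table that maps tensor operation names to their from_json factory functions.
// DVPP operations are registered only when the ACL plugin is available.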
std::map<std::string, Status (*)(nlohmann::json json_obj, std::shared_ptr<TensorOperation> *operation)>
Serdes::InitializeFuncPtr() {
  std::map<std::string, Status (*)(nlohmann::json json_obj, std::shared_ptr<TensorOperation> * operation)> ops_ptr;
  ops_ptr[vision::kAdjustGammaOperation] = &(vision::AdjustGammaOperation::from_json);
  ops_ptr[vision::kAffineOperation] = &(vision::AffineOperation::from_json);
  ops_ptr[vision::kAutoContrastOperation] = &(vision::AutoContrastOperation::from_json);
  ops_ptr[vision::kBoundingBoxAugmentOperation] = &(vision::BoundingBoxAugmentOperation::from_json);
  ops_ptr[vision::kCenterCropOperation] = &(vision::CenterCropOperation::from_json);
  ops_ptr[vision::kCropOperation] = &(vision::CropOperation::from_json);
  ops_ptr[vision::kCutMixBatchOperation] = &(vision::CutMixBatchOperation::from_json);
  ops_ptr[vision::kCutOutOperation] = &(vision::CutOutOperation::from_json);
  ops_ptr[vision::kDecodeOperation] = &(vision::DecodeOperation::from_json);
#if defined(WITH_BACKEND) || defined(ENABLE_ACL)
  if (AclAdapter::GetInstance().HasAclPlugin()) {
    ops_ptr[vision::kDvppCropJpegOperation] = &(vision::DvppCropJpegOperation::from_json);
    ops_ptr[vision::kDvppDecodeResizeOperation] = &(vision::DvppDecodeResizeOperation::from_json);
    ops_ptr[vision::kDvppDecodeResizeCropOperation] = &(vision::DvppDecodeResizeCropOperation::from_json);
    ops_ptr[vision::kDvppNormalizeOperation] = &(vision::DvppNormalizeOperation::from_json);
    ops_ptr[vision::kDvppResizeJpegOperation] = &(vision::DvppResizeJpegOperation::from_json);
  }
#endif
  ops_ptr[vision::kEqualizeOperation] = &(vision::EqualizeOperation::from_json);
  ops_ptr[vision::kGaussianBlurOperation] = &(vision::GaussianBlurOperation::from_json);
  ops_ptr[vision::kHorizontalFlipOperation] = &(vision::HorizontalFlipOperation::from_json);
  ops_ptr[vision::kHwcToChwOperation] = &(vision::HwcToChwOperation::from_json);
  ops_ptr[vision::kInvertOperation] = &(vision::InvertOperation::from_json);
  ops_ptr[vision::kMixUpBatchOperation] = &(vision::MixUpBatchOperation::from_json);
  ops_ptr[vision::kNormalizeOperation] = &(vision::NormalizeOperation::from_json);
  ops_ptr[vision::kNormalizePadOperation] = &(vision::NormalizePadOperation::from_json);
  ops_ptr[vision::kPadOperation] = &(vision::PadOperation::from_json);
  ops_ptr[vision::kRandomAffineOperation] = &(vision::RandomAffineOperation::from_json);
  ops_ptr[vision::kRandomColorOperation] = &(vision::RandomColorOperation::from_json);
  ops_ptr[vision::kRandomColorAdjustOperation] = &(vision::RandomColorAdjustOperation::from_json);
  ops_ptr[vision::kRandomCropDecodeResizeOperation] = &(vision::RandomCropDecodeResizeOperation::from_json);
  ops_ptr[vision::kRandomCropOperation] = &(vision::RandomCropOperation::from_json);
  ops_ptr[vision::kRandomCropWithBBoxOperation] = &(vision::RandomCropWithBBoxOperation::from_json);
  ops_ptr[vision::kRandomHorizontalFlipOperation] = &(vision::RandomHorizontalFlipOperation::from_json);
  ops_ptr[vision::kRandomHorizontalFlipWithBBoxOperation] = &(vision::RandomHorizontalFlipWithBBoxOperation::from_json);
  ops_ptr[vision::kRandomPosterizeOperation] = &(vision::RandomPosterizeOperation::from_json);
  ops_ptr[vision::kRandomResizeOperation] = &(vision::RandomResizeOperation::from_json);
  ops_ptr[vision::kRandomResizeWithBBoxOperation] = &(vision::RandomResizeWithBBoxOperation::from_json);
  ops_ptr[vision::kRandomResizedCropOperation] = &(vision::RandomResizedCropOperation::from_json);
  ops_ptr[vision::kRandomResizedCropWithBBoxOperation] = &(vision::RandomResizedCropWithBBoxOperation::from_json);
  ops_ptr[vision::kRandomRotationOperation] = &(vision::RandomRotationOperation::from_json);
  ops_ptr[vision::kRandomSelectSubpolicyOperation] = &(vision::RandomSelectSubpolicyOperation::from_json);
  ops_ptr[vision::kRandomSharpnessOperation] = &(vision::RandomSharpnessOperation::from_json);
  ops_ptr[vision::kRandomSolarizeOperation] = &(vision::RandomSolarizeOperation::from_json);
  ops_ptr[vision::kRandomVerticalFlipOperation] = &(vision::RandomVerticalFlipOperation::from_json);
  ops_ptr[vision::kRandomVerticalFlipWithBBoxOperation] = &(vision::RandomVerticalFlipWithBBoxOperation::from_json);
  ops_ptr[vision::kRescaleOperation] = &(vision::RescaleOperation::from_json);
  ops_ptr[vision::kResizeOperation] = &(vision::ResizeOperation::from_json);
  ops_ptr[vision::kResizePreserveAROperation] = &(vision::ResizePreserveAROperation::from_json);
  ops_ptr[vision::kResizeWithBBoxOperation] = &(vision::ResizeWithBBoxOperation::from_json);
  ops_ptr[vision::kRgbaToBgrOperation] = &(vision::RgbaToBgrOperation::from_json);
  ops_ptr[vision::kRgbaToRgbOperation] = &(vision::RgbaToRgbOperation::from_json);
  ops_ptr[vision::kRgbToBgrOperation] = &(vision::RgbToBgrOperation::from_json);
  ops_ptr[vision::kRgbToGrayOperation] = &(vision::RgbToGrayOperation::from_json);
  ops_ptr[vision::kRotateOperation] = &(vision::RotateOperation::from_json);
  ops_ptr[vision::kSlicePatchesOperation] = &(vision::SlicePatchesOperation::from_json);
  ops_ptr[vision::kSwapRedBlueOperation] = &(vision::SwapRedBlueOperation::from_json);
  ops_ptr[vision::kToTensorOperation] = &(vision::ToTensorOperation::from_json);
  ops_ptr[vision::kUniformAugOperation] = &(vision::UniformAugOperation::from_json);
  ops_ptr[vision::kVerticalFlipOperation] = &(vision::VerticalFlipOperation::from_json);
  ops_ptr[transforms::kFillOperation] = &(transforms::FillOperation::from_json);
  ops_ptr[transforms::kOneHotOperation] = &(transforms::OneHotOperation::from_json);
  ops_ptr[transforms::kTypeCastOperation] = &(transforms::TypeCastOperation::from_json);
  ops_ptr[text::kToNumberOperation] = &(text::ToNumberOperation::from_json);
  return ops_ptr;
}

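// Parse the preprocessing pipeline serialized alongside a MindIR model: walk the chain of Map op
// JSON objects and, for each one whose input column is "image", rebuild its tensor operations
// into an Execute object appended to data_graph.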
Status Serdes::ParseMindIRPreprocess(const std::vector<std::string> &map_json_string,
                                     std::vector<std::shared_ptr<mindspore::dataset::Execute>> *data_graph) {
  CHECK_FAIL_RETURN_UNEXPECTED(!map_json_string.empty(), "Invalid data, no json data in map_json_string.");

  const std::string process_column = "[\"image\"]";
  MS_LOG(WARNING) << "Only parsing the \"image\" column from the dataset object is supported.";

  nlohmann::json map_json;
  try {
    for (auto &json : map_json_string) {
      map_json = nlohmann::json::parse(json);
      if (map_json["input_columns"].dump() == process_column) {
        break;
      }
    }
  } catch (const std::exception &err) {
    MS_LOG(ERROR) << "Invalid json content, failed to parse JSON data, error message: " << err.what();
    RETURN_STATUS_UNEXPECTED("Invalid json content, failed to parse JSON data.");
  }

  if (map_json.empty()) {
    MS_LOG(ERROR) << "Invalid json content, no JSON data found for given input column: " + process_column;
    RETURN_STATUS_UNEXPECTED("Invalid json content, no JSON data found for given input column: " + process_column);
  }

  while (map_json != nullptr) {
    CHECK_FAIL_RETURN_UNEXPECTED(map_json["op_type"] == "Map", "Invalid json content, this is not a MapOp.");

    std::vector<std::shared_ptr<TensorOperation>> tensor_ops;
    RETURN_IF_NOT_OK(ConstructTensorOps(map_json["operations"], &tensor_ops));
    if (map_json["input_columns"].dump() == process_column) {
      std::vector<std::string> op_names;
      std::transform(tensor_ops.begin(), tensor_ops.end(), std::back_inserter(op_names),
                     [](const auto &op) { return op->Name(); });
      MS_LOG(INFO) << "Found valid preprocess operations: " << op_names;
      data_graph->push_back(std::make_shared<Execute>(tensor_ops));
    }
    map_json = map_json["children"];
  }

  if (data_graph->empty()) {
    MS_LOG(WARNING) << "Cannot find any valid preprocess operation.";
  }

  return Status::OK();
}

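// Update a serialized (optimized) IR tree with values taken from the execution tree ops, such as
// the number of parallel workers and connector queue size of each non-inlined op; presumably used
// when exporting an AutoTune-optimized pipeline configuration.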
Status Serdes::UpdateOptimizedIRTreeJSON(nlohmann::json *serialized_json,
                                         const std::map<int32_t, std::shared_ptr<DatasetOp>> &op_map) {
  RETURN_UNEXPECTED_IF_NULL(serialized_json);
  int32_t op_id = 0;
  return RecurseUpdateOptimizedIRTreeJSON(serialized_json, &op_id, op_map);
}

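// Decide whether an execution-tree dataset op corresponds to an IR node name: irregular pairs are
// special-cased, the rest are matched on the common prefix before "Dataset"/"Op".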
bool IsDatasetOpMatchIRNode(std::string_view ir_node_name, std::string_view dataset_op_name) {
  // Helper function to match IR Node name to its dataset op name
  if (ir_node_name == kSyncWaitNode) {
    return dataset_op_name == kBarrierOp;
  } else if (ir_node_name == kCifar10Node || ir_node_name == kCifar100Node) {
    return dataset_op_name == "CifarOp";
  } else if (ir_node_name == kMindDataNode) {
    return dataset_op_name == "MindRecordOp";
  } else if (ir_node_name == kRandomNode) {
    return dataset_op_name == "RandomDataOp";
  } else if (ir_node_name == kTFRecordNode) {
    return dataset_op_name == "TFReaderOp";
  } else if (ir_node_name == kIWSLT2016Node || ir_node_name == kIWSLT2017Node) {
    return dataset_op_name == "IWSLTOp";
  } else {
    // Generic way of matching, special cases handled above. Special cases will evolve over time.
    return ir_node_name.substr(0, ir_node_name.find("Dataset")) ==
           dataset_op_name.substr(0, dataset_op_name.find("Op"));
  }
}

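// Walk the serialized JSON tree and the flattened op_map (keyed by op id) in lockstep, skipping
// execution-tree ops without an IR counterpart, copying num_parallel_workers and
// connector_queue_size from each matched op into the JSON node, then recursing into the children.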
Status Serdes::RecurseUpdateOptimizedIRTreeJSON(nlohmann::json *serialized_json, int32_t *op_id,
                                                const std::map<int32_t, std::shared_ptr<DatasetOp>> &op_map) {
  RETURN_UNEXPECTED_IF_NULL(serialized_json);
  RETURN_UNEXPECTED_IF_NULL(op_id);

  std::string ir_node_name = (*serialized_json)["op_type"];
  MS_LOG(INFO) << "Visiting IR Node: " << ir_node_name;
  // Each IR Node should have a corresponding dataset node in the execution tree but the reverse is not necessarily true
  while (!IsDatasetOpMatchIRNode(ir_node_name, op_map.find(*op_id)->second->Name())) {
    // During the construction of execution tree, extra dataset nodes may have been inserted
    // Skip dataset ops until we get to the expected node
    MS_LOG(INFO) << "\tSkipping dataset op: " << op_map.find(*op_id)->second->NameWithID();
    ++(*op_id);
    CHECK_FAIL_RETURN_UNEXPECTED(*op_id < op_map.size(), "op_id is out of bounds");
  }
  MS_LOG(INFO) << "\tMatch found for IR Node: " << ir_node_name
               << " with dataset op: " << op_map.find(*op_id)->second->NameWithID();
  if (!op_map.find(*op_id)->second->inlined() && serialized_json->contains("num_parallel_workers") &&
      serialized_json->contains("connector_queue_size")) {
    (*serialized_json)["num_parallel_workers"] = op_map.find(*op_id)->second->NumWorkers();
    (*serialized_json)["connector_queue_size"] = op_map.find(*op_id)->second->ConnectorCapacity();
  }
  ++(*op_id);
  auto num_children = (*serialized_json)["children"].size();
  for (int i = 0; i < static_cast<int>(num_children); ++i) {
    RETURN_IF_NOT_OK(RecurseUpdateOptimizedIRTreeJSON(&(*serialized_json)["children"][i], op_id, op_map));
  }
  return Status::OK();
}

// In the current stage, there is a cyclic dependency between libmindspore.so and c_dataengine.so,
// so we expose a C function here that libmindspore.so can dlopen instead of linking explicitly.
// This will be fixed after decoupling libmindspore.so into multiple submodules.
extern "C" {
// The exported function returns its Status through the out parameter `s` rather than by value,
// because a function with C linkage returning the user-defined type 'mindspore::Status' would be
// incompatible with C.
void ParseMindIRPreprocess_C(const std::vector<std::string> &dataset_json,
                             std::vector<std::shared_ptr<mindspore::dataset::Execute>> *data_graph, Status *s) {
  if (s != nullptr) {
    Status ret = Serdes::ParseMindIRPreprocess(dataset_json, data_graph);
    *s = Status(ret);
  }
}
}

}  // namespace dataset
}  // namespace mindspore