/**
 * Copyright 2020-2022 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "minddata/dataset/include/dataset/execute.h"

#include <algorithm>
#include <fstream>
#include <locale>
#include <string>

#include "minddata/dataset/core/de_tensor.h"
#include "minddata/dataset/core/tensor_row.h"
#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/core/type_id.h"
#include "minddata/dataset/kernels/data/compose_op.h"
#include "minddata/dataset/kernels/ir/tensor_operation.h"
#include "minddata/dataset/kernels/tensor_op.h"
#include "minddata/dataset/util/log_adapter.h"
#include "minddata/dataset/core/ascend_resource.h"
#include "minddata/dataset/kernels/image/dvpp/utils/CommonDataType.h"
#include "minddata/dataset/kernels/ir/vision/ascend_vision_ir.h"
#if !defined(BUILD_LITE) && defined(ENABLE_D)
#include "minddata/dataset/kernels/image/image_utils.h"
#endif
#ifndef BUILD_LITE
#include "mindspore/core/utils/file_utils.h"
namespace platform = mindspore;
#else
#include "mindspore/lite/src/common/file_utils.h"
namespace platform = mindspore::lite;
#endif
#if !defined(BUILD_LITE) && defined(ENABLE_D)
#include "minddata/dataset/kernels/image/dvpp/acl_adapter.h"
#include "minddata/dataset/kernels/image/dvpp/utils/ErrorCode.h"
#endif

namespace mindspore {
namespace dataset {
using json = nlohmann::json;

struct Execute::ExtraInfo {
  std::multimap<std::string, std::vector<uint32_t>> aipp_cfg_;
  bool init_with_shared_ptr_ = true;  // The Execute object is initialized with shared_ptr by default
#if defined(WITH_BACKEND) || defined(ENABLE_ACL)
  std::multimap<std::string, std::string> op2para_map_ = {{vision::kDvppCropJpegOperation, "size"},
                                                          {vision::kDvppDecodeResizeOperation, "size"},
                                                          {vision::kDvppDecodeResizeCropOperation, "crop_size"},
                                                          {vision::kDvppDecodeResizeCropOperation, "resize_size"},
                                                          {vision::kDvppNormalizeOperation, "mean"},
                                                          {vision::kDvppNormalizeOperation, "std"},
                                                          {vision::kDvppResizeJpegOperation, "size"}};
#endif
};

Status Execute::InitResource(MapTargetDevice device_type, uint32_t device_id) {
  if (device_type == MapTargetDevice::kAscend310) {
    MS_LOG(INFO) << "InitResource for Ascend310";
#if defined(WITH_BACKEND) || defined(ENABLE_ACL)
    if (device_resource_ == nullptr) {
      device_resource_ = std::make_shared<AscendResource>();
      Status rc = device_resource_->InitResource(device_id);
      if (!rc.IsOk()) {
        device_resource_ = nullptr;
        std::string err_msg = "Initialize Ascend310 resource failed.";
        MS_LOG(ERROR) << err_msg;
        RETURN_STATUS_UNEXPECTED(err_msg);
      }
    } else {
      MS_LOG(INFO) << "Ascend310 context resource has already been initialized.";
    }
#endif
    device_type_ = device_type;
#if !defined(BUILD_LITE) && defined(ENABLE_D)
  } else if (device_type == MapTargetDevice::kAscend910B) {
    MS_LOG(INFO) << "InitResource for Ascend910B";
    if (device_context_ == nullptr) {
      auto ms_context = MsContext::GetInstance();
      RETURN_UNEXPECTED_IF_NULL(ms_context);
      device_context_ = device::DeviceContextManager::GetInstance().GetOrCreateDeviceContext(
        {ms_context->get_param<std::string>(MS_CTX_DEVICE_TARGET), ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID)});
      RETURN_UNEXPECTED_IF_NULL(device_context_);
      device_context_->Initialize();
      RETURN_UNEXPECTED_IF_NULL(device_context_->device_res_manager_);

      std::string soc_version;
      auto ret = AclAdapter::GetInstance().GetSocName(&soc_version);
      if (ret != APP_ERR_OK) {
        std::string error = "Get SoC version failed.";
        RETURN_STATUS_UNEXPECTED(error);
      }
      if (soc_version.find("Ascend910B") == std::string::npos && soc_version.find("Ascend910C") == std::string::npos) {
        std::string err_msg = "The SoC: " + soc_version + " is not Ascend910B / Ascend910C";
        MS_LOG(ERROR) << err_msg;
        // Reset the device_context_, because the SoC is not supported yet
        device_context_ = nullptr;
        RETURN_STATUS_UNEXPECTED(err_msg);
      }

      if (!device_context_->device_res_manager_->CreateStream(&stream_id_)) {
        std::string err_msg = "Create new stream failed on Ascend910B platform.";
        MS_LOG(ERROR) << err_msg;
        RETURN_STATUS_UNEXPECTED(err_msg);
      }
      MS_LOG(INFO) << "Create new stream id: " << std::to_string(stream_id_);
      device_type_ = device_type;
    } else {
      MS_LOG(INFO) << "Ascend910B context resource has already been initialized.";
    }
#endif
  }
  return Status::OK();
}

Execute::Execute(const std::shared_ptr<TensorOperation> &op, MapTargetDevice device_type, uint32_t device_id) {
  (void)ops_.emplace_back(op);
  device_type_ = device_type;
  info_ = std::make_shared<ExtraInfo>();
#if !defined(BUILD_LITE) && defined(ENABLE_D)
  device_context_ = nullptr;
#endif

  // Ascend910B
  if (op->Type() == MapTargetDevice::kAscend910B) {
    // Update the device_type
    device_type = MapTargetDevice::kAscend910B;
    MS_LOG(INFO) << "Update device type: " << std::to_string(static_cast<int>(device_type));
  }

  (void)InitResource(device_type, device_id);
}

Execute::Execute(const std::shared_ptr<TensorTransform> &op, MapTargetDevice device_type, uint32_t device_id) {
  // Initialize the op and other context
  (void)transforms_.emplace_back(op);

  info_ = std::make_shared<ExtraInfo>();
  device_type_ = device_type;
  (void)InitResource(device_type, device_id);
}

Execute::Execute(const std::reference_wrapper<TensorTransform> &op, MapTargetDevice device_type, uint32_t device_id) {
  // Initialize the transforms_ and other context
  std::shared_ptr<TensorOperation> operation = op.get().Parse();
  (void)ops_.emplace_back(std::move(operation));

  info_ = std::make_shared<ExtraInfo>();
  info_->init_with_shared_ptr_ = false;
  device_type_ = device_type;
  (void)InitResource(device_type, device_id);
}

// Execute function for the example case: auto decode(new vision::Decode());
Execute::Execute(TensorTransform *op, MapTargetDevice device_type, uint32_t device_id) {
  // Initialize the transforms_ and other context
  (void)ops_.emplace_back(op->Parse());

  info_ = std::make_shared<ExtraInfo>();
  info_->init_with_shared_ptr_ = false;
  device_type_ = device_type;
  (void)InitResource(device_type, device_id);
}
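
// A minimal usage sketch for the raw-pointer constructor above (illustrative
// only, not part of this file; ownership of `decode` stays with the caller):
//
//   auto decode(new vision::Decode());
//   Execute transform(decode, MapTargetDevice::kCpu, 0);
//   mindspore::MSTensor input, output;
//   Status rc = transform(input, &output);  // runs Decode on the host tensor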

Execute::Execute(const std::vector<std::shared_ptr<TensorOperation>> &ops, MapTargetDevice device_type,
                 uint32_t device_id)
    : ops_(ops), device_type_(device_type) {
  info_ = std::make_shared<ExtraInfo>();
  (void)InitResource(device_type, device_id);
}

Execute::Execute(const std::vector<std::shared_ptr<TensorTransform>> &ops, MapTargetDevice device_type,
                 uint32_t device_id) {
  // Initialize the transforms_ and other context
  transforms_ = ops;

  info_ = std::make_shared<ExtraInfo>();
  device_type_ = device_type;
  (void)InitResource(device_type, device_id);
}

Execute::Execute(const std::vector<std::reference_wrapper<TensorTransform>> &ops, MapTargetDevice device_type,
                 uint32_t device_id) {
  // Initialize the transforms_ and other context
  if (device_type == MapTargetDevice::kCpu) {
    (void)std::transform(
      ops.begin(), ops.end(), std::back_inserter(ops_),
      [](TensorTransform &operation) -> std::shared_ptr<TensorOperation> { return operation.Parse(); });
  } else {
    for (auto &op : ops) {
      (void)ops_.emplace_back(op.get().Parse(device_type));
    }
  }

  info_ = std::make_shared<ExtraInfo>();
  info_->init_with_shared_ptr_ = false;
  device_type_ = device_type;
  (void)InitResource(device_type, device_id);
}

// Execute function for the example vector case: auto decode(new vision::Decode());
Execute::Execute(const std::vector<TensorTransform *> &ops, MapTargetDevice device_type, uint32_t device_id) {
  // Initialize the transforms_ and other context
  (void)std::transform(
    ops.begin(), ops.end(), std::back_inserter(ops_),
    [](TensorTransform *operation) -> std::shared_ptr<TensorOperation> { return operation->Parse(); });

  info_ = std::make_shared<ExtraInfo>();
  info_->init_with_shared_ptr_ = false;
  device_type_ = device_type;
  (void)InitResource(device_type, device_id);
}

Execute::~Execute() {
  if (device_type_ == MapTargetDevice::kAscend310) {
    if (device_resource_) {
      auto rc = device_resource_->FinalizeResource();
      if (rc.IsError()) {
        MS_LOG(ERROR) << "Device resource release failed, error msg is " << rc;
      }
    } else {
      MS_LOG(ERROR) << "Device resource is nullptr, which is illegal for the Ascend310 case.";
    }
  }
}

Status Execute::UpdateOperation(const std::shared_ptr<TensorOperation> &op) {
  // Clear the ops_ first
  ops_.clear();
  (void)ops_.emplace_back(op);
  return Status::OK();
}

Status Execute::BuildTransforms(std::vector<std::shared_ptr<TensorOp>> *transforms_rt) {
  // Parse TensorTransform transforms_ into TensorOperation ops_
  if (info_->init_with_shared_ptr_) {
    RETURN_IF_NOT_OK(ParseTransforms());
    info_->init_with_shared_ptr_ = false;
  }
  CHECK_FAIL_RETURN_UNEXPECTED(!ops_.empty(), "Input TensorOperation should be provided.");

  std::map<MapTargetDevice, std::string> env_list = {{MapTargetDevice::kCpu, "kCpu"},
                                                     {MapTargetDevice::kGpu, "kGpu"},
                                                     {MapTargetDevice::kAscend310, "kAscend310"},
                                                     {MapTargetDevice::kAscend910B, "kAscend910B"}};

  // Validate and build runtime ops
  for (size_t i = 0; i < ops_.size(); i++) {
    if (ops_[i] == nullptr) {
      std::string err_msg = "Input TensorOperation[" + std::to_string(i) +
                            "] is unsupported on your input device: " + env_list.at(device_type_);
      MS_LOG(ERROR) << err_msg;
      RETURN_STATUS_UNEXPECTED(err_msg);
    }
    RETURN_IF_NOT_OK(ops_[i]->ValidateParams());
    (void)transforms_rt->emplace_back(ops_[i]->Build());
  }

  // If transforms_rt[0] is a ComposeOp made up entirely of DVPP ops, extract all the ops from the ComposeOp
  if ((*transforms_rt)[0]->IsDvppOp() && (*transforms_rt)[0]->Name() == kComposeOp) {
    if (dynamic_cast<ComposeOp *>((*transforms_rt)[0].get())->IsMixedOps()) {
      std::string err_msg = "Currently, it is not supported to mix DVPP transforms with CPU transforms in Compose.";
      MS_LOG(ERROR) << err_msg;
      RETURN_STATUS_UNEXPECTED(err_msg);
    }
    *transforms_rt = std::move(dynamic_cast<ComposeOp *>((*transforms_rt)[0].get())->GetOps());
    MS_LOG(INFO) << "Extract the ComposeOp to " << std::to_string((*transforms_rt).size()) << " ops.";
  }

  return Status::OK();
}

Status Execute::operator()(const mindspore::MSTensor &input, mindspore::MSTensor *output) {
  // Validate input tensor
  RETURN_UNEXPECTED_IF_NULL(output);
  CHECK_FAIL_RETURN_UNEXPECTED(input.DataSize() > 0, "Input Tensor has no data.");
  CHECK_FAIL_RETURN_UNEXPECTED(ValidateDevice(), "Device Type should be 'Ascend310' or 'CPU'.");
  std::vector<std::shared_ptr<TensorOp>> transforms_rt;
  CHECK_FAIL_RETURN_UNEXPECTED(BuildTransforms(&transforms_rt), "Building Transform ops failed!");

  if (device_type_ == MapTargetDevice::kCpu) {
    // Convert mindspore::Tensor to dataset::Tensor
    std::shared_ptr<dataset::Tensor> de_tensor;
    Status rc = dataset::Tensor::CreateFromMemory(dataset::TensorShape(input.Shape()),
                                                  MSTypeToDEType(static_cast<TypeId>(input.DataType())),
                                                  (const uchar *)(input.Data().get()), input.DataSize(), &de_tensor);
    if (rc.IsError()) {
      MS_LOG(ERROR) << rc;
      return rc;
    }

    // Apply transforms on tensor
    for (auto &t : transforms_rt) {
      TensorRow de_tensor_row;
      TensorRow de_output_row;
      de_tensor_row.push_back(de_tensor);
      de_output_row.resize(1);
      Status rc_ = t->Compute(de_tensor_row, &de_output_row);
      if (rc_.IsError()) {
        MS_LOG(ERROR) << rc_;
        return rc_;
      }

      // For next transform
      de_tensor = std::move(de_output_row[0]);
    }

    // Convert dataset::Tensor to mindspore::Tensor
    if (!de_tensor->HasData()) {
      std::stringstream ss;
      ss << "Transformation returned an empty tensor with shape " << de_tensor->shape();
      RETURN_STATUS_UNEXPECTED(ss.str());
    }
    *output = mindspore::MSTensor(std::make_shared<DETensor>(de_tensor));
  } else if (device_type_ ==
             MapTargetDevice::kAscend310) {  // Ascend310 case, where we must set the Ascend resource on each operation
#if defined(WITH_BACKEND) || defined(ENABLE_ACL)
    CHECK_FAIL_RETURN_UNEXPECTED(device_resource_, "Device resource is nullptr, which is illegal for the Ascend310 case.");
    // Sink data from host into device
    std::shared_ptr<mindspore::dataset::DeviceTensor> device_input;
    RETURN_IF_NOT_OK(device_resource_->Sink(input, &device_input));

    for (auto &t : transforms_rt) {
      // Initialize AscendResource for each operation
      std::shared_ptr<DeviceTensor> device_output;
      RETURN_IF_NOT_OK(t->SetAscendResource(device_resource_));

      RETURN_IF_NOT_OK(t->Compute(device_input, &device_output));

      // For next transform
      device_input = std::move(device_output);
    }
    CHECK_FAIL_RETURN_UNEXPECTED(device_input->HasDeviceData(), "Apply transform failed, output tensor has no data.");

    *output = mindspore::MSTensor(std::make_shared<DETensor>(device_input, true));
#endif
  } else {
    std::string err_msg = "Your input device is not supported. (Option: CPU or Ascend310)";
    MS_LOG(ERROR) << err_msg;
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  return Status::OK();
}
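
// Illustrative batch usage of the vector overload below (a sketch only; the
// vision ops and the image-loading helper are assumptions, not part of this file):
//
//   Execute transform({std::make_shared<vision::Decode>(),
//                      std::make_shared<vision::Resize>(std::vector<int32_t>{224, 224})});
//   std::vector<mindspore::MSTensor> inputs = LoadImagesAsTensors();  // hypothetical helper
//   std::vector<mindspore::MSTensor> outputs;
//   Status rc = transform(inputs, &outputs);  // each input runs through Decode then Resize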

Status Execute::operator()(const std::vector<MSTensor> &input_tensor_list, std::vector<MSTensor> *output_tensor_list) {
  // Validate input tensor
  RETURN_UNEXPECTED_IF_NULL(output_tensor_list);
  CHECK_FAIL_RETURN_UNEXPECTED(!input_tensor_list.empty(), "Input Tensor is not valid.");
  output_tensor_list->clear();
  for (auto &tensor : input_tensor_list) {
    CHECK_FAIL_RETURN_UNEXPECTED(tensor.DataSize() > 0, "Input Tensor has no data.");
  }
  CHECK_FAIL_RETURN_UNEXPECTED(ValidateDevice(), "Device Type should be 'Ascend310' or 'CPU'.");
  std::vector<std::shared_ptr<TensorOp>> transforms_rt;
  CHECK_FAIL_RETURN_UNEXPECTED(BuildTransforms(&transforms_rt), "Building Transform ops failed!");

  if (device_type_ == MapTargetDevice::kCpu) {  // Case CPU
    TensorRow de_tensor_list;
    for (auto &tensor : input_tensor_list) {
      std::shared_ptr<dataset::Tensor> de_tensor;
      Status rc = dataset::Tensor::CreateFromMemory(
        dataset::TensorShape(tensor.Shape()), MSTypeToDEType(static_cast<TypeId>(tensor.DataType())),
        (const uchar *)(tensor.Data().get()), tensor.DataSize(), &de_tensor);
      if (rc.IsError()) {
        MS_LOG(ERROR) << rc;
        RETURN_IF_NOT_OK(rc);
      }
      (void)de_tensor_list.emplace_back(std::move(de_tensor));
    }
    // Apply transforms on tensor
    for (auto &t : transforms_rt) {
      TensorRow de_output_list;
      RETURN_IF_NOT_OK(t->Compute(de_tensor_list, &de_output_list));
      // For next transform
      de_tensor_list = std::move(de_output_list);
    }
    int32_t idx = 0;
    for (auto &tensor : de_tensor_list) {
      if (!tensor->HasData()) {
        std::stringstream ss;
        ss << "Transformation returned an empty tensor at location " << idx << ". ";
        ss << "The shape of the tensor is " << tensor->shape();
        MS_LOG(WARNING) << ss.str();
      }
      auto ms_tensor = mindspore::MSTensor(std::make_shared<DETensor>(tensor));
      (void)output_tensor_list->emplace_back(ms_tensor);
      ++idx;
    }
    CHECK_FAIL_RETURN_UNEXPECTED(!output_tensor_list->empty(), "Output Tensor is not valid.");
  } else if (device_type_ ==
             MapTargetDevice::kAscend310) {  // Ascend310 case, where we must set the Ascend resource on each operation
    CHECK_FAIL_RETURN_UNEXPECTED(device_resource_, "Device resource is nullptr, which is illegal for the Ascend310 case.");
    for (auto &input_tensor : input_tensor_list) {
      // Sink each data from host into device
      std::shared_ptr<dataset::DeviceTensor> device_input;
      RETURN_IF_NOT_OK(device_resource_->Sink(input_tensor, &device_input));

      for (auto &t : transforms_rt) {
        std::shared_ptr<DeviceTensor> device_output;
        RETURN_IF_NOT_OK(t->SetAscendResource(device_resource_));

        RETURN_IF_NOT_OK(t->Compute(device_input, &device_output));

        // For next transform
        device_input = std::move(device_output);
      }
      CHECK_FAIL_RETURN_UNEXPECTED(device_input->HasDeviceData(), "Apply transform failed, output tensor has no data");
      // Due to the Ascend310 memory limitation, we have to pop every result back onto host memory,
      // so this batch method is slower than the single-tensor mode
      std::shared_ptr<mindspore::dataset::Tensor> host_output;
      RETURN_IF_NOT_OK(device_resource_->Pop(device_input, &host_output));

      auto ms_tensor = mindspore::MSTensor(std::make_shared<DETensor>(host_output));
      (void)output_tensor_list->emplace_back(ms_tensor);
      // Release the data on the device because we have copied one piece onto host
      RETURN_IF_NOT_OK(device_resource_->DeviceDataRelease());
    }
    CHECK_FAIL_RETURN_UNEXPECTED(!output_tensor_list->empty(), "Output Tensor vector is empty.");
  } else {
    std::string err_msg = "Your input device is not supported. (Option: CPU or Ascend310)";
    MS_LOG(ERROR) << err_msg;
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  return Status::OK();
}

Status PyExecute::operator()(const std::vector<std::shared_ptr<Tensor>> &input_tensor_list,
                             std::vector<std::shared_ptr<Tensor>> *out) {
  // Validate input tensor
  CHECK_FAIL_RETURN_UNEXPECTED(!input_tensor_list.empty(), "Input Tensor is not valid.");
  RETURN_UNEXPECTED_IF_NULL(out);
  out->clear();
  for (auto &tensor : input_tensor_list) {
    CHECK_FAIL_RETURN_UNEXPECTED(tensor->Size() > 0, "Input Tensor has no data.");
  }
  CHECK_FAIL_RETURN_UNEXPECTED(ValidateDevice(), "Device Type should be 'CPU'.");

  std::vector<std::shared_ptr<TensorOp>> transforms_rt;
  CHECK_FAIL_RETURN_UNEXPECTED(BuildTransforms(&transforms_rt), "Building Transform ops failed!");

  if (!transforms_rt[0]->IsDvppOp()) {
    TensorRow de_tensor_list(input_tensor_list);

    // Apply transforms on tensor
    for (auto &t : transforms_rt) {
      TensorRow de_output_list;
      RETURN_IF_NOT_OK(t->Compute(de_tensor_list, &de_output_list));
      // For next transform
      de_tensor_list = std::move(de_output_list);
    }
    *out = std::move(de_tensor_list.getRow());
    CHECK_FAIL_RETURN_UNEXPECTED(!out->empty(), "Output Tensor is not valid.");
#if !defined(BUILD_LITE) && defined(ENABLE_D)
  } else if (transforms_rt[0]->IsDvppOp()) {
    (void)InitResource(MapTargetDevice::kAscend910B);

    // Construct the device tensor list from the host tensors
    std::vector<std::shared_ptr<DeviceTensorAscend910B>> device_tensor_list;
    for (auto &item : input_tensor_list) {
      std::shared_ptr<DeviceTensorAscend910B> device_tensor = nullptr;
      // Here we use the first op's IsHWC() to create the device tensor
      if (transforms_rt[0]->Name() == kDvppConvertColorOp) {
        std::vector<int> channels = {1, 3, 4};
        RETURN_IF_NOT_OK(DeviceTensorAscend910B::CreateDeviceTensor(item, device_context_, stream_id_, &device_tensor,
                                                                    transforms_rt[0]->IsHWC(), channels));
      } else {
        RETURN_IF_NOT_OK(DeviceTensorAscend910B::CreateDeviceTensor(item, device_context_, stream_id_, &device_tensor,
                                                                    transforms_rt[0]->IsHWC()));
      }
      device_tensor_list.push_back(std::move(device_tensor));
    }

    // Hold all the inputs and release them when the executor finishes
    std::vector<std::vector<std::shared_ptr<DeviceTensorAscend910B>>> hold_input_list;

    for (auto &t : transforms_rt) {
      MS_LOG(INFO) << "Execute " << t->Name() << " transform.";
      std::vector<std::shared_ptr<DeviceTensorAscend910B>> device_output_list;

      // If the op is Decode, we should get the height and width from the JPEG header and create the output tensor first
      int img_width = 0;
      int img_height = 0;
      if (t->Name() == kDvppDecodeOp) {
        for (size_t k = 0; k < input_tensor_list.size(); k++) {
          CHECK_FAIL_RETURN_UNEXPECTED(input_tensor_list[k]->shape().Rank() == 1,
                                       "Invalid data shape. Currently only support 1D. Its rank is: " +
                                         std::to_string(input_tensor_list[k]->shape().Rank()));
          CHECK_FAIL_RETURN_UNEXPECTED(IsNonEmptyJPEG(input_tensor_list[k]),
                                       "Invalid image type. Currently only support JPG. Its shape is: " +
                                         input_tensor_list[k]->shape().ToString());
          CHECK_FAIL_RETURN_UNEXPECTED(
            input_tensor_list[k]->type() == DataType::DE_UINT8,
            "Invalid data type. Currently only support uint8. Its type is: " + input_tensor_list[k]->type().ToString());
          RETURN_IF_NOT_OK(GetJpegImageInfo(input_tensor_list[k], &img_width, &img_height));
          TensorShape shape{1, img_height, img_width, 3};
          DataType type(DataType::DE_UINT8);
          std::shared_ptr<DeviceTensorAscend910B> device_tensor = nullptr;
          RETURN_IF_NOT_OK(
            DeviceTensorAscend910B::CreateDeviceTensor(shape, type, device_tensor_list[k]->GetDeviceContext(),
                                                       device_tensor_list[k]->GetStreamID(), &device_tensor));
          device_output_list.push_back(std::move(device_tensor));
        }
      }

      RETURN_IF_NOT_OK(t->Compute(device_tensor_list, &device_output_list));

      // Move the input to the hold_input_list first
      hold_input_list.push_back(std::move(device_tensor_list));

      // For next transform
      device_tensor_list = std::move(device_output_list);
    }

    // All the transforms are done, so sync the stream
    if (!device_context_->device_res_manager_->SyncStream(stream_id_)) {
      std::string err_msg = "SyncStream stream id: " + std::to_string(stream_id_) + " failed.";
      RETURN_STATUS_UNEXPECTED(err_msg);
    }

    // Copy the data from device tensor to host tensor
    std::vector<std::shared_ptr<Tensor>> host_out(device_tensor_list.size());
    int32_t i = 0;
    for (auto &item : device_tensor_list) {
      CHECK_FAIL_RETURN_UNEXPECTED(item->ToHostTensor(&host_out[i]), "Copy tensor from device to host failed.");
      i += 1;
    }
    *out = std::move(host_out);
    CHECK_FAIL_RETURN_UNEXPECTED(!out->empty(), "Output Tensor is not valid.");

    // Release all the inputs' device memory and workspace memory
    for (auto &tensor_list : hold_input_list) {
      for (auto &item : tensor_list) {
        if (!item->ReleaseDeviceMemory()) {
          std::string err_msg = "Release the device memory failed after the dvpp ops executed.";
          MS_LOG(ERROR) << err_msg;
          RETURN_STATUS_UNEXPECTED(err_msg);
        }
      }
    }
#endif
  } else {
    std::string err_msg = "Your input device is not supported. (Option: CPU or Ascend910B)";
    MS_LOG(ERROR) << err_msg;
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  return Status::OK();
}

std::vector<uint32_t> AippSizeFilter(const std::vector<uint32_t> &resize_para, const std::vector<uint32_t> &crop_para) {
  std::vector<uint32_t> aipp_size;

  // Special condition where (no Crop and no Resize) or (no Crop and Resize with fixed ratio) will lead to dynamic input
  if ((resize_para.empty() || resize_para.size() == 1) && crop_para.empty()) {
    aipp_size = {0, 0};
    MS_LOG(WARNING) << "Dynamic input shape is not supported, an incomplete aipp config file will be generated. Please "
                       "check your TensorTransform input; both src_image_size_h and src_image_size_w will be 0.";
    return aipp_size;
  }

  if (resize_para.empty()) {  // If only the Crop operation exists
    aipp_size = crop_para;
  } else if (crop_para.empty()) {  // If only the Resize operation with 2 parameters exists
    aipp_size = resize_para;
  } else {  // If both of them exist
    if (resize_para.size() == 1) {
      aipp_size = crop_para;
    } else {
      aipp_size =
        *min_element(resize_para.begin(), resize_para.end()) < *min_element(crop_para.begin(), crop_para.end())
          ? resize_para
          : crop_para;
    }
  }

  aipp_size[0] = DVPP_ALIGN_UP(aipp_size[0], VPC_HEIGHT_ALIGN);  // H
  aipp_size[1] = DVPP_ALIGN_UP(aipp_size[1], VPC_WIDTH_ALIGN);   // W
  return aipp_size;
}
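
// Worked example (illustrative): with a fixed-ratio resize and a square crop,
//   AippSizeFilter({256}, {224, 224})
// picks the crop size {224, 224}, then aligns each dimension up to the VPC
// height/width multiples from CommonDataType.h via DVPP_ALIGN_UP.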

std::vector<uint32_t> AippMeanFilter(const std::vector<uint32_t> &normalize_para) {
  std::vector<uint32_t> aipp_mean;
  if (normalize_para.size() == 6) {  // If the Normalize operation exists
    std::transform(normalize_para.begin(), normalize_para.begin() + 3, std::back_inserter(aipp_mean),
                   [](uint32_t i) { return static_cast<uint32_t>(i / 10000); });
  } else {
    aipp_mean = {0, 0, 0};
  }
  return aipp_mean;
}

std::vector<float> AippStdFilter(const std::vector<uint32_t> &normalize_para) {
  std::vector<float> aipp_std;
  if (normalize_para.size() == 6) {  // If the Normalize operation exists
    auto zeros = std::find(std::begin(normalize_para), std::end(normalize_para), 0);
    if (zeros == std::end(normalize_para)) {
      if (std::any_of(normalize_para.begin() + 3, normalize_para.end(), [](uint32_t i) { return i == 0; })) {
        MS_LOG(ERROR) << "Value in normalize para got 0.";
        return {};
      }
      std::transform(normalize_para.begin() + 3, normalize_para.end(), std::back_inserter(aipp_std),
                     [](uint32_t i) { return 10000 / static_cast<float>(i); });
    } else {  // If 0 occurs in the std vector
      MS_LOG(WARNING) << "Detected 0 in the std vector, please verify your input.";
      aipp_std = {1.0, 1.0, 1.0};
    }
  } else {
    aipp_std = {1.0, 1.0, 1.0};
  }
  return aipp_std;
}
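
// Worked example (illustrative): Normalize parameters arrive scaled by 10000,
// so normalize_para = {1210000, 1140000, 1040000, 5000, 5000, 5000} yields
//   AippMeanFilter(...) -> {121, 114, 104}   // i / 10000
//   AippStdFilter(...)  -> {2.0, 2.0, 2.0}   // 10000 / i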

Status AippInfoCollection(std::map<std::string, std::string> *aipp_options, const std::vector<uint32_t> &aipp_size,
                          const std::vector<uint32_t> &aipp_mean, const std::vector<float> &aipp_std) {
  RETURN_UNEXPECTED_IF_NULL(aipp_options);
  // Several aipp config parameters
  aipp_options->insert(std::make_pair("related_input_rank", "0"));
  aipp_options->insert(std::make_pair("src_image_size_w", std::to_string(aipp_size[1])));
  aipp_options->insert(std::make_pair("src_image_size_h", std::to_string(aipp_size[0])));
  aipp_options->insert(std::make_pair("crop", "false"));
  aipp_options->insert(std::make_pair("input_format", "YUV420SP_U8"));
  aipp_options->insert(std::make_pair("aipp_mode", "static"));
  aipp_options->insert(std::make_pair("csc_switch", "true"));
  aipp_options->insert(std::make_pair("rbuv_swap_switch", "false"));
  // Y = AX + b, this part is A
  std::vector<int32_t> color_space_matrix = {256, 0, 359, 256, -88, -183, 256, 454, 0};
  int count = 0;
  for (int i = 0; i < 3; i++) {
    for (int j = 0; j < 3; j++) {
      std::string key_word = "matrix_r" + std::to_string(i) + "c" + std::to_string(j);
      aipp_options->insert(std::make_pair(key_word, std::to_string(color_space_matrix[count])));
      ++count;
    }
  }
  // This part is b
  std::vector<uint32_t> color_space_bias = {0, 128, 128};
  for (int i = 0; i < 3; i++) {
    std::string key_word = "input_bias_" + std::to_string(i);
    aipp_options->insert(std::make_pair(key_word, std::to_string(color_space_bias[i])));
  }
  // Y = (X - mean - min) * [std^(-1)], this part is mean
  for (size_t i = 0; i < aipp_mean.size(); i++) {
    std::string key_word = "mean_chn_" + std::to_string(i);
    aipp_options->insert(std::make_pair(key_word, std::to_string(aipp_mean[i])));
  }
  // This part is min
  for (size_t i = 0; i < aipp_mean.size(); i++) {
    std::string key_word = "min_chn_" + std::to_string(i);
    aipp_options->insert(std::make_pair(key_word, "0.0"));
  }
  // This part is std^(-1)
  for (size_t i = 0; i < aipp_std.size(); i++) {
    std::string key_word = "var_reci_chn_" + std::to_string(i);
    aipp_options->insert(std::make_pair(key_word, std::to_string(aipp_std[i])));
  }
  return Status::OK();
}

std::string Execute::AippCfgGenerator() {
  std::string config_location = "./aipp.cfg";
  if (info_ == nullptr) {
    MS_LOG(ERROR) << "info_ is null";
    return "";
  }
#if defined(WITH_BACKEND) || defined(ENABLE_ACL)
  if (info_->init_with_shared_ptr_) {
    auto rc = ParseTransforms();
    RETURN_SECOND_IF_ERROR(rc, "");
    info_->init_with_shared_ptr_ = false;
  }
  std::vector<uint32_t> paras;  // Record the parameter values of each Ascend operation
  for (size_t i = 0; i < ops_.size(); i++) {
    // Validate operation ir
    json ir_info;
    if (ops_[i] == nullptr) {
      MS_LOG(ERROR) << "Input TensorOperation[" + std::to_string(i) + "] is null.";
      return "";
    }

    // Define map between operation name and parameter name
    auto rc = ops_[i]->to_json(&ir_info);
    if (rc.IsError()) {
      MS_LOG(ERROR) << "IR information serialize to json failed, error msg is " << rc;
      return "";
    }

    // Collect the information of operations
    for (auto pos = info_->op2para_map_.equal_range(ops_[i]->Name()); pos.first != pos.second; ++pos.first) {
      auto paras_key_word = pos.first->second;
      paras = ir_info[paras_key_word].get<std::vector<uint32_t>>();
      info_->aipp_cfg_.insert(std::make_pair(ops_[i]->Name(), paras));
    }
  }

  std::ofstream outfile;
  outfile.open(config_location, std::ofstream::out);

  if (!outfile.is_open()) {
    MS_LOG(ERROR) << "Failed to open the AIPP config file, please verify your system config (including permissions). "
                  << "An empty string representing the location of the AIPP config file will be returned in this case.";
    return "";
  }

  if (device_type_ == MapTargetDevice::kAscend310) {
    // Process resize parameters and crop parameters to find out the final size of input data
    std::vector<uint32_t> resize_paras;
    std::vector<uint32_t> crop_paras;

    // Find resize parameters
    std::map<std::string, std::vector<uint32_t>>::iterator iter;
    if (info_->aipp_cfg_.find(vision::kDvppResizeJpegOperation) != info_->aipp_cfg_.end()) {
      iter = info_->aipp_cfg_.find(vision::kDvppResizeJpegOperation);
      resize_paras = iter->second;
    } else if (info_->aipp_cfg_.find(vision::kDvppDecodeResizeOperation) != info_->aipp_cfg_.end()) {
      iter = info_->aipp_cfg_.find(vision::kDvppDecodeResizeOperation);
      resize_paras = iter->second;
    }

    // Find crop parameters
    if (info_->aipp_cfg_.find(vision::kDvppCropJpegOperation) != info_->aipp_cfg_.end()) {
      iter = info_->aipp_cfg_.find(vision::kDvppCropJpegOperation);
      crop_paras = iter->second;
    } else if (info_->aipp_cfg_.find(vision::kDvppDecodeResizeCropOperation) != info_->aipp_cfg_.end()) {
      iter = info_->aipp_cfg_.find(vision::kDvppDecodeResizeCropOperation);
      crop_paras = iter->second;
    }
    if (crop_paras.size() == 1) {
      (void)crop_paras.emplace_back(crop_paras[0]);
    }

    std::vector<uint32_t> aipp_size = AippSizeFilter(resize_paras, crop_paras);

    // Process Normalization parameters to find out the final Normalization parameters for the Aipp module
    std::vector<uint32_t> normalize_paras;
    if (info_->aipp_cfg_.find(vision::kDvppNormalizeOperation) != info_->aipp_cfg_.end()) {
      for (auto pos = info_->aipp_cfg_.equal_range(vision::kDvppNormalizeOperation); pos.first != pos.second;
           ++pos.first) {
        auto mean_or_std = pos.first->second;
        normalize_paras.insert(normalize_paras.end(), mean_or_std.begin(), mean_or_std.end());
      }
    }

    std::vector<uint32_t> aipp_mean = AippMeanFilter(normalize_paras);
    std::vector<float> aipp_std = AippStdFilter(normalize_paras);

    std::map<std::string, std::string> aipp_options;
    auto rc = AippInfoCollection(&aipp_options, aipp_size, aipp_mean, aipp_std);
    if (rc.IsError()) {
      MS_LOG(ERROR) << "Aipp information initialization failed, error msg is " << rc;
      outfile.close();
      return "";
    }

    std::string tab_char(4, ' ');
    outfile << "aipp_op {" << std::endl;
    for (auto &option : aipp_options) {
      outfile << tab_char << option.first << " : " << option.second << std::endl;
    }
    outfile << "}";
    outfile.close();
  } else {  // For the GPU or CPU case
    outfile << "aipp_op {" << std::endl << "}";
    outfile.close();
    MS_LOG(WARNING) << "Your runtime environment is not Ascend310; this config file will lead to undefined behavior of "
                       "the computing result. Please check that.";
  }

  platform::ChangeFileMode(config_location, S_IRUSR | S_IWUSR);
#endif
  return config_location;
}
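
// Sketch of the file this generates for an Ascend310 pipeline (illustrative
// values only): every collected option is emitted as "key : value" inside one
// aipp_op block, in the std::map's alphabetical key order, e.g.
//
//   aipp_op {
//       aipp_mode : static
//       crop : false
//       ...
//       src_image_size_h : 224
//       src_image_size_w : 224
//   }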

bool IsEmptyPtr(const std::shared_ptr<TensorTransform> &api_ptr) { return api_ptr == nullptr; }

Status Execute::ParseTransforms() {
  auto iter = std::find_if(transforms_.begin(), transforms_.end(), IsEmptyPtr);
  if (iter != transforms_.end()) {
    std::string err_msg = "Your input TensorTransforms contain at least one nullptr, please check your input.";
    MS_LOG(ERROR) << err_msg;
    RETURN_STATUS_UNEXPECTED(err_msg);
  }

  if (device_type_ == MapTargetDevice::kCpu) {
    (void)std::transform(transforms_.begin(), transforms_.end(), std::back_inserter(ops_),
                         [](const std::shared_ptr<TensorTransform> &operation) -> std::shared_ptr<TensorOperation> {
                           return operation->Parse();
                         });
  } else if (device_type_ == MapTargetDevice::kAscend310 || device_type_ == MapTargetDevice::kAscend910B) {
    for (auto &transform_ : transforms_) {
      (void)ops_.emplace_back(transform_->Parse(device_type_));
    }
  } else {
    std::string err_msg = "Your input device is not supported. (Option: CPU, Ascend310 or Ascend910B)";
    MS_LOG(ERROR) << err_msg;
    RETURN_STATUS_UNEXPECTED(err_msg);
  }

  return Status::OK();
}

Status Execute::ValidateDevice() {
  if (device_type_ != MapTargetDevice::kCpu && device_type_ != MapTargetDevice::kAscend310 &&
      device_type_ != MapTargetDevice::kGpu && device_type_ != MapTargetDevice::kAscend910B) {
    std::string err_msg = "Your input device is not supported. (Option: CPU, GPU, Ascend310 or Ascend910B).";
    MS_LOG(ERROR) << err_msg;
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  return Status::OK();
}

Status Execute::DeviceMemoryRelease() {
  CHECK_FAIL_RETURN_UNEXPECTED(device_resource_, "Device resource is nullptr, which is illegal for the Ascend310 case.");
  Status rc = device_resource_->DeviceDataRelease();
  if (rc.IsError()) {
    std::string err_msg = "Error in device data release";
    MS_LOG(ERROR) << err_msg;
    RETURN_STATUS_UNEXPECTED(err_msg);
  }
  return Status::OK();
}

Status Execute::Run(const std::vector<std::shared_ptr<dataset::Execute>> &data_graph,
                    const std::vector<mindspore::MSTensor> &inputs, std::vector<mindspore::MSTensor> *outputs) {
  RETURN_UNEXPECTED_IF_NULL(outputs);
  std::vector<MSTensor> transform_inputs = inputs;
  std::vector<MSTensor> transform_outputs;
  if (!data_graph.empty()) {
    for (const auto &exes : data_graph) {
      CHECK_FAIL_RETURN_UNEXPECTED(exes != nullptr, "Given execute object is null.");
      Status ret = exes->operator()(transform_inputs, &transform_outputs);
      if (ret != kSuccess) {
        MS_LOG(ERROR) << "Run preprocess failed: " << ret.GetErrDescription();
        return ret;
      }
      MS_LOG(DEBUG) << "transform_outputs[0].Shape: " << transform_outputs[0].Shape();
      transform_inputs = transform_outputs;
    }
    *outputs = std::move(transform_outputs);
  } else {
    std::string msg = "The set of Executors can not be empty.";
    MS_LOG(ERROR) << msg;
    RETURN_STATUS_UNEXPECTED(msg);
  }
  return Status::OK();
}
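
// Illustrative sketch (the vision ops and the prepared `inputs` vector are
// assumptions, not part of this file): chaining two Execute stages through
// the static Run entry point above.
//
//   std::vector<std::shared_ptr<dataset::Execute>> graph = {
//     std::make_shared<dataset::Execute>(std::make_shared<vision::Decode>()),
//     std::make_shared<dataset::Execute>(std::make_shared<vision::Resize>(std::vector<int32_t>{224, 224}))};
//   std::vector<mindspore::MSTensor> outputs;
//   Status rc = dataset::Execute::Run(graph, inputs, &outputs);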

// In the current stage, there is a cyclic dependency between libmindspore.so and c_dataengine.so,
// so we expose a C function here and dlopen it from libmindspore.so to avoid linking explicitly;
// this will be fixed after decoupling libmindspore.so into multiple submodules
extern "C" {
// ExecuteRun_C has C-linkage specified, but the user-defined type 'mindspore::Status' is incompatible with C,
// so the status is passed back through the out-parameter 's' instead of being returned
void ExecuteRun_C(const std::vector<std::shared_ptr<dataset::Execute>> &data_graph,
                  const std::vector<mindspore::MSTensor> &inputs, std::vector<mindspore::MSTensor> *outputs,
                  Status *s) {
  Status ret = Execute::Run(data_graph, inputs, outputs);
  if (s == nullptr) {
    return;
  }
  *s = Status(ret);
}
}
}  // namespace dataset
}  // namespace mindspore