/**
 * Copyright 2020-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "minddata/dataset/kernels/py_func_op.h"

#include <memory>
#include <vector>

#include "minddata/dataset/core/tensor.h"
#include "minddata/dataset/kernels/ir/data/transforms_ir.h"
#include "minddata/dataset/kernels/tensor_op.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/util/validators.h"

namespace mindspore {
namespace dataset {
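// Convert a NumPy-compatible Python object into a Tensor and append it to the output row.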
Status ConvertNumpyToTensor(const py::object &py_obj, TensorRow *output) {
  RETURN_UNEXPECTED_IF_NULL(output);
  std::shared_ptr<Tensor> out;
  // Python objects such as bool, int, float, list or tuple can also be converted
  // to a NumPy array by the following cast, but the data type will be unknown
  // if the object is not a valid NumPy object.
  RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(py_obj.cast<py::array>(), &out));
  output->push_back(out);
  return Status::OK();
}

Status ConvertPythonToTensor(const py::object &py_obj, TensorRow *output) {
  RETURN_UNEXPECTED_IF_NULL(output);
  // Python objects such as dictionaries are converted to a tensor.
  // Note that the tensor will hold a reference to the Python object while
  // the Python object is kept alive in the Python layer.
  std::shared_ptr<Tensor> out;
  RETURN_IF_NOT_OK(Tensor::CreateFromPythonObject(py_obj, &out));
  output->push_back(out);
  return Status::OK();
}

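// Run the user-defined Python function (pyfunc) on the input row: acquire the GIL, convert each
// input tensor to a NumPy array or Python object, invoke the pyfunc, and convert its return value
// back into the output tensor row.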
Status PyFuncOp::Compute(const TensorRow &input, TensorRow *output) {
  IO_CHECK_VECTOR(input, output);
  {
    RETURN_IF_NOT_OK(CollectOpInfoStart(this->Name(), "AcquireGIL"));
    // Acquire the Python GIL
    py::gil_scoped_acquire gil_acquire;
    RETURN_IF_NOT_OK(CollectOpInfoEnd(this->Name(), "AcquireGIL"));
    if (Py_IsInitialized() == 0) {
      return Status(StatusCode::kMDPythonInterpreterFailure, "Python Interpreter is finalized");
    }
    try {
      // Transform the input tensor vector into a vector of NumPy arrays
      py::object ret_py_obj;
      if (input.size() > 0) {
        py::tuple input_args(input.size());
        for (size_t i = 0; i < input.size(); i++) {
          if (input.at(i)->type().IsPython()) {
            py::dict new_data;
            RETURN_IF_NOT_OK(input.at(i)->GetDataAsPythonObject(&new_data));
            input_args[i] = new_data;
          } else {
            py::array new_data;
            RETURN_IF_NOT_OK(input.at(i)->GetDataAsNumpy(&new_data));
            // possible memcpy here
            input_args[i] = new_data;
          }
        }
        // Invoke the Python function
        ret_py_obj = this->py_func_ptr_(*input_args);
      } else {
        ret_py_obj = this->py_func_ptr_();
      }
      if (output_type_ != DataType::DE_UNKNOWN) {
        RETURN_IF_NOT_OK(CastOutput(ret_py_obj, output));
      } else {
        // Scenario 1: map multiprocessing, where a subprocess stopped first and None is received
        // Scenario 2: thread mode, where the user pyfunc returned None
        if (ret_py_obj.is_none()) {
          std::string error_msg =
            "The dataset subprocess may have exited unexpectedly or been killed, and the main process "
            "will exit. If this was not intentional, you can use "
            "mindspore.dataset.config.set_enable_watchdog(False) to suppress this error.";
          RETURN_STATUS_UNEXPECTED("Got None from Python object. " + error_msg);
        } else if (py::isinstance<py::tuple>(ret_py_obj)) {
          // In case of an n-m mapping, the return value will be a tuple of NumPy arrays
          auto ret_py_tuple = ret_py_obj.cast<py::tuple>();
          // Iterate over two containers simultaneously for memory copy
          for (size_t i = 0; i < ret_py_tuple.size(); i++) {
            py::object ret_py_ele = ret_py_tuple[i];
            // The object is None if the pyfunc timed out
            if (ret_py_ele.is_none()) {
              MS_LOG(INFO) << "Expected pyfunc to return NumPy array(s) or Python dict(s), but got None. "
                              "If python_multiprocessing is True, it may be due to pyfunc execution timeout.";
              return STATUS_ERROR(StatusCode::kMDTimeOut,
                                  "Expected pyfunc to return NumPy array(s), but got None. If python_multiprocessing "
                                  "is True, it may be due to pyfunc execution timeout.");
            } else if (py::isinstance<py::dict>(ret_py_ele)) {
              RETURN_IF_NOT_OK(ConvertPythonToTensor(ret_py_ele, output));
            } else {
              RETURN_IF_NOT_OK(ConvertNumpyToTensor(ret_py_ele, output));
            }
          }
        } else {
          // In case of an n-1 mapping, the return value will be a NumPy array or a Python object.
          // Note that for Python dictionaries, only a reference will be stored in the tensor.
          if (py::isinstance<py::dict>(ret_py_obj)) {
            RETURN_IF_NOT_OK(ConvertPythonToTensor(ret_py_obj, output));
          } else {
            RETURN_IF_NOT_OK(ConvertNumpyToTensor(ret_py_obj, output));
          }
        }
      }
    } catch (const py::error_already_set &e) {
      return Status(StatusCode::kMDPyFuncException, e.what());
    }
  }
  return Status::OK();
}

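// Cast the Python return value into a tensor of the configured output type and append it to the
// output row. Only DE_INT32 and DE_BOOL outputs are handled here.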
Status PyFuncOp::CastOutput(const py::object &ret_py_obj, TensorRow *output) {
  RETURN_UNEXPECTED_IF_NULL(output);
  try {
    std::shared_ptr<Tensor> out;
    switch (output_type_) {
      case DataType::DE_INT32:
        RETURN_IF_NOT_OK(Tensor::CreateEmpty(TensorShape({1}), DataType(DataType::DE_INT32), &out));
        RETURN_IF_NOT_OK(out->SetItemAt({0}, ret_py_obj.cast<int32_t>()));
        break;
      case DataType::DE_BOOL:
        RETURN_IF_NOT_OK(Tensor::CreateScalar(ret_py_obj.cast<bool>(), &out));
        break;
      default:
        RETURN_STATUS_UNEXPECTED("No cast for the specified DataType was found.");
    }
    output->push_back(out);
  } catch (const std::exception &e) {
    RETURN_STATUS_UNEXPECTED(e.what());
  }
  return Status::OK();
}

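// Serialize this op by calling the wrapped Python callable's to_json method (under the GIL)
// and parsing the returned JSON string.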
Status PyFuncOp::to_json(nlohmann::json *out_json) {
  RETURN_UNEXPECTED_IF_NULL(out_json);
  nlohmann::json args;
  {
    py::gil_scoped_acquire gil_acquire;
    if (py_func_ptr_.attr("to_json")) {
      args = nlohmann::json::parse(py_func_ptr_.attr("to_json")().cast<std::string>());
    }
  }
  *out_json = args;
  return Status::OK();
}

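// Deserialize a PyFuncOp: import the recorded Python module, look up the op by name, rebuild the
// Python callable via its from_json method, and wrap the result in a PreBuiltOperation.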
Status PyFuncOp::from_json(nlohmann::json json_obj, std::vector<std::shared_ptr<TensorOperation>> *result) {
  RETURN_UNEXPECTED_IF_NULL(result);
  std::vector<std::shared_ptr<TensorOperation>> output;
  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "tensor_op_name", kPyFuncOp));
  RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "tensor_op_params", kPyFuncOp));
  std::string op_name = json_obj["tensor_op_name"];
  nlohmann::json op_params = json_obj["tensor_op_params"];
  std::string python_module = json_obj["python_module"];
  std::shared_ptr<TensorOperation> operation = nullptr;
  py::function py_func =
    py::module::import(python_module.c_str()).attr(op_name.c_str()).attr("from_json")(op_params.dump());
  operation = std::make_shared<transforms::PreBuiltOperation>(std::make_shared<PyFuncOp>(py_func));
  output.push_back(operation);
  *result = output;
  return Status::OK();
}

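// A pyfunc is treated as random unless it exposes a "random" attribute that evaluates to False.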
bool PyFuncOp::IsRandom() {
  bool random = true;
  if (py::hasattr(py_func_ptr_, "random") &&
      !static_cast<bool>(py::reinterpret_borrow<py::bool_>(py_func_ptr_.attr("random")))) {
    random = false;
  }
  return random;
}
}  // namespace dataset
}  // namespace mindspore