1 /**
2 * Copyright 2020-2023 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "minddata/dataset/kernels/py_func_op.h"
17
18 #include <memory>
19 #include <vector>
20
21 #include "minddata/dataset/core/tensor.h"
22 #include "minddata/dataset/kernels/ir/data/transforms_ir.h"
23 #include "minddata/dataset/kernels/tensor_op.h"
24 #include "minddata/dataset/util/status.h"
25 #include "minddata/dataset/util/validators.h"
26
27 namespace mindspore {
28 namespace dataset {
ConvertNumpyToTensor(const py::object & py_obj,TensorRow * output)29 Status ConvertNumpyToTensor(const py::object &py_obj, TensorRow *output) {
30 RETURN_UNEXPECTED_IF_NULL(output);
31 std::shared_ptr<Tensor> out;
32 // Python object like bool, int, float, list or tuple can also be converted
33 // to a NumPy array by the following cast, but the data type will be unknown
34 // if it is not a valid NumPy object
35 RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(py_obj.cast<py::array>(), &out));
36 output->push_back(out);
37 return Status::OK();
38 }
39
ConvertPythonToTensor(const py::object & py_obj,TensorRow * output)40 Status ConvertPythonToTensor(const py::object &py_obj, TensorRow *output) {
41 RETURN_UNEXPECTED_IF_NULL(output);
42 // Python objects such as dictionary are converted to a tensor
43 // Note that the tensor will hold a reference to the python object while
44 // the python object will be kept alive in Python layer.
45 std::shared_ptr<Tensor> out;
46 RETURN_IF_NOT_OK(Tensor::CreateFromPythonObject(py_obj, &out));
47 output->push_back(out);
48 return Status::OK();
49 }
50
Compute(const TensorRow & input,TensorRow * output)51 Status PyFuncOp::Compute(const TensorRow &input, TensorRow *output) {
52 IO_CHECK_VECTOR(input, output);
53 {
54 RETURN_IF_NOT_OK(CollectOpInfoStart(this->Name(), "AcquireGIL"));
55 // Acquire Python GIL
56 py::gil_scoped_acquire gil_acquire;
57 RETURN_IF_NOT_OK(CollectOpInfoEnd(this->Name(), "AcquireGIL"));
58 if (Py_IsInitialized() == 0) {
59 return Status(StatusCode::kMDPythonInterpreterFailure, "Python Interpreter is finalized");
60 }
61 try {
62 // Transform input tensor vector into numpy array vector
63 py::object ret_py_obj;
64 if (input.size() > 0) {
65 py::tuple input_args(input.size());
66 for (size_t i = 0; i < input.size(); i++) {
67 if (input.at(i)->type().IsPython()) {
68 py::dict new_data;
69 RETURN_IF_NOT_OK(input.at(i)->GetDataAsPythonObject(&new_data));
70 input_args[i] = new_data;
71 } else {
72 py::array new_data;
73 RETURN_IF_NOT_OK(input.at(i)->GetDataAsNumpy(&new_data));
74 // possible memcpy here
75 input_args[i] = new_data;
76 }
77 }
78 // Invoke python function
79 ret_py_obj = this->py_func_ptr_(*input_args);
80 } else {
81 ret_py_obj = this->py_func_ptr_();
82 }
83 if (output_type_ != DataType::DE_UNKNOWN) {
84 RETURN_IF_NOT_OK(CastOutput(ret_py_obj, output));
85 } else {
86 // scenario 1: map multi-processing, subprocess stop first and will get none
87 // scenario 2: thread mode, user pyfunc return none
88 if (ret_py_obj.is_none()) {
89 std::string error_msg =
90 "The subprocess of dataset may exit unexpected or be killed, "
91 "main process will exit. If this is not an artificial operation, you can use "
92 "mindspore.dataset.config.set_enable_watchdog(False) to block this error.";
93 RETURN_STATUS_UNEXPECTED("Got None from Python object. " + error_msg);
94 } else if (py::isinstance<py::tuple>(ret_py_obj)) {
95 // In case of a n-m mapping, the return value will be a tuple of numpy arrays
96 auto ret_py_tuple = ret_py_obj.cast<py::tuple>();
97 // Iterate over two containers simultaneously for memory copy
98 for (size_t i = 0; i < ret_py_tuple.size(); i++) {
99 py::object ret_py_ele = ret_py_tuple[i];
100 // Object is none if pyfunc timeout
101 if (ret_py_ele.is_none()) {
102 MS_LOG(INFO) << "Expected pyfunc to return NumPy array(s) or Python dict(s), but got None. "
103 "If python_multiprocessing is True, it may be due to pyfunc execution timeout.";
104 return STATUS_ERROR(StatusCode::kMDTimeOut,
105 "Expect pyfunc to return numpy array(s), but got None. If python_multiprocessing is "
106 "True, it maybe due to pyfunc execution timeout.");
107 } else if (py::isinstance<py::dict>(ret_py_ele)) {
108 RETURN_IF_NOT_OK(ConvertPythonToTensor(ret_py_ele, output));
109 } else {
110 RETURN_IF_NOT_OK(ConvertNumpyToTensor(ret_py_ele, output));
111 }
112 }
113 } else {
114 // In case of a n-1 mapping, the return value will be a numpy array or a python object
115 // Note that for Python dictionaries, only a reference will be stored in tensor.
116 if (py::isinstance<py::dict>(ret_py_obj)) {
117 RETURN_IF_NOT_OK(ConvertPythonToTensor(ret_py_obj, output));
118 } else {
119 RETURN_IF_NOT_OK(ConvertNumpyToTensor(ret_py_obj, output));
120 }
121 }
122 }
123 } catch (const py::error_already_set &e) {
124 return Status(StatusCode::kMDPyFuncException, e.what());
125 }
126 }
127 return Status::OK();
128 }
129
CastOutput(const py::object & ret_py_obj,TensorRow * output)130 Status PyFuncOp::CastOutput(const py::object &ret_py_obj, TensorRow *output) {
131 RETURN_UNEXPECTED_IF_NULL(output);
132 try {
133 std::shared_ptr<Tensor> out;
134 switch (output_type_) {
135 case DataType::DE_INT32:
136 RETURN_IF_NOT_OK(Tensor::CreateEmpty(TensorShape({1}), DataType(DataType::DE_INT32), &out));
137 RETURN_IF_NOT_OK(out->SetItemAt({0}, ret_py_obj.cast<int32_t>()));
138 break;
139 case DataType::DE_BOOL:
140 RETURN_IF_NOT_OK(Tensor::CreateScalar(ret_py_obj.cast<bool>(), &out));
141 break;
142 default:
143 RETURN_STATUS_UNEXPECTED("No cast for the specified DataType was found.");
144 }
145 output->push_back(out);
146 } catch (const std::exception &e) {
147 RETURN_STATUS_UNEXPECTED(e.what());
148 }
149 return Status::OK();
150 }
151
to_json(nlohmann::json * out_json)152 Status PyFuncOp::to_json(nlohmann::json *out_json) {
153 RETURN_UNEXPECTED_IF_NULL(out_json);
154 nlohmann::json args;
155 {
156 py::gil_scoped_acquire gil_acquire;
157 if (py_func_ptr_.attr("to_json")) {
158 args = nlohmann::json::parse(py_func_ptr_.attr("to_json")().cast<std::string>());
159 }
160 }
161 *out_json = args;
162 return Status::OK();
163 }
164
from_json(nlohmann::json json_obj,std::vector<std::shared_ptr<TensorOperation>> * result)165 Status PyFuncOp::from_json(nlohmann::json json_obj, std::vector<std::shared_ptr<TensorOperation>> *result) {
166 RETURN_UNEXPECTED_IF_NULL(result);
167 std::vector<std::shared_ptr<TensorOperation>> output;
168 RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "tensor_op_name", kPyFuncOp));
169 RETURN_IF_NOT_OK(ValidateParamInJson(json_obj, "tensor_op_params", kPyFuncOp));
170 std::string op_name = json_obj["tensor_op_name"];
171 nlohmann::json op_params = json_obj["tensor_op_params"];
172 std::string python_module = json_obj["python_module"];
173 std::shared_ptr<TensorOperation> operation = nullptr;
174 py::function py_func =
175 py::module::import(python_module.c_str()).attr(op_name.c_str()).attr("from_json")(op_params.dump());
176 operation = std::make_shared<transforms::PreBuiltOperation>(std::make_shared<PyFuncOp>(py_func));
177 output.push_back(operation);
178 *result = output;
179 return Status::OK();
180 }
181
IsRandom()182 bool PyFuncOp::IsRandom() {
183 bool random = true;
184 if (py::hasattr(py_func_ptr_, "random") &&
185 !static_cast<bool>(py::reinterpret_borrow<py::bool_>(py_func_ptr_.attr("random")))) {
186 random = false;
187 }
188 return random;
189 }
190 } // namespace dataset
191 } // namespace mindspore
192