/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/python/lib/core/py_func.h"

#include <array>

#include <Python.h>

#include "numpy/arrayobject.h"
#include "tensorflow/c/eager/c_api.h"
#include "tensorflow/c/eager/c_api_internal.h"
#include "tensorflow/c/tf_status_helper.h"
#include "tensorflow/core/framework/allocation_description.pb.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/python/eager/pywrap_tfe.h"
#include "tensorflow/python/lib/core/ndarray_tensor_bridge.h"
#include "tensorflow/python/lib/core/py_util.h"
#include "tensorflow/python/lib/core/safe_ptr.h"

namespace tensorflow {
namespace {

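// Module-global state: `py_trampoline` holds the Python callable used to
// dispatch py_func calls. It is installed once via InitializePyTrampoline()
// and is guarded by `mu`.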
static mutex mu(LINKER_INITIALIZED);
static PyObject* py_trampoline GUARDED_BY(mu) = nullptr;

// Returns the py_trampoline that is used to pass control to the
// Python runtime.
PyObject* GetPyTrampoline() {
  mutex_lock l(mu);
  return py_trampoline;
}

// A call to the registered python function.
struct PyCall {
  // Passed to python runtime to call the python function registered
  // with this "token".
  string token;

  // The device on which Tensors are stored; only used for EagerPyFunc.
  Device* device = nullptr;

  // True if the call is associated with an EagerPyFunc.
  bool eager = false;

  // Inputs and outputs of this function invocation.
  std::vector<Tensor> ins;
  std::vector<Tensor> out;
};

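// Returns true if `d` refers to a CPU device: either the nullptr used by the
// eager runtime to denote the local CPU, or a device without GPU device info.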
bool IsCPUDevice(const Device* d) {
  return d == nullptr || d->tensorflow_gpu_device_info() == nullptr;
}

// Given the 'call', prepares the token and inputs as a Python tuple
// that is appropriate for calling the trampoline.
Status MakeArgTuple(const PyCall* call, PyObject** tuple) {
  int64 n = call->ins.size();
  PyObject* lst = PyList_New(n);
  CHECK(lst);
  // TFE_TensorHandle assumes that CPU is identified by nullptr.
  Device* device = IsCPUDevice(call->device) ? nullptr : call->device;
  for (int64 i = 0; i < n; ++i) {
    PyObject* arg = nullptr;
    const Tensor& t = call->ins[i];
    if (call->eager) {
      arg = EagerTensorFromHandle(new TFE_TensorHandle(t, device, device));
      if (arg == nullptr) {
        Py_DECREF(lst);
        return errors::Internal("Unable to procure EagerTensor from Tensor.");
      }
    } else {
      Status s = ConvertTensorToNdarray(t, &arg);
      if (!s.ok()) {
        Py_DECREF(lst);
        return s;
      }
    }
    PyList_SetItem(lst, i, arg);
  }
  const char* device_name =
      device == nullptr ? nullptr : device->attributes().name().c_str();
  *tuple = Py_BuildValue("(ssN)", call->token.c_str(), device_name, lst);
  CHECK(*tuple);
  return Status::OK();
}

// Returns the corresponding tf dtype in 'tf' for numpy data type
// 'np'. Returns an error if the type is not supported by this
// module.
Status NumericNpDTypeToTfDType(const int np, DataType* tf) {
  switch (np) {
    case NPY_FLOAT16:
      *tf = DT_HALF;
      break;
    case NPY_FLOAT32:
      *tf = DT_FLOAT;
      break;
    case NPY_FLOAT64:
      *tf = DT_DOUBLE;
      break;
    case NPY_INT32:
      *tf = DT_INT32;
      break;
    case NPY_UINT8:
      *tf = DT_UINT8;
      break;
    case NPY_INT8:
      *tf = DT_INT8;
      break;
    case NPY_UINT16:
      *tf = DT_UINT16;
      break;
    case NPY_INT16:
      *tf = DT_INT16;
      break;
    case NPY_INT64:
      *tf = DT_INT64;
      break;
    case NPY_BOOL:
      *tf = DT_BOOL;
      break;
    case NPY_COMPLEX64:
      *tf = DT_COMPLEX64;
      break;
    case NPY_COMPLEX128:
      *tf = DT_COMPLEX128;
      break;
    default:
      return errors::Unimplemented("Unsupported numpy type ", np);
  }
  return Status::OK();
}

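// Returns true if `obj` is a zero-dimensional numpy array whose single
// element is Python's `None`.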
bool IsSingleNone(PyObject* obj) {
  if (!PyArray_Check(obj)) {
    return false;
  }
  PyArrayObject* array_obj = reinterpret_cast<PyArrayObject*>(obj);
  if (PyArray_NDIM(array_obj) != 0 || PyArray_SIZE(array_obj) != 1) {
    return false;
  }
  std::array<npy_intp, 0> indices;
  char* item_ptr =
      static_cast<char*>(PyArray_GetPtr(array_obj, indices.data()));
  PyObject* item = PyArray_GETITEM(array_obj, item_ptr);
  CHECK(item);
  return item == Py_None;
}

// Retrieves a Tensor from `eager_tensor` and stores it in `output_tensor`.
// Validates that `output_tensor` is backed by memory in `expected_device`
// (which is assumed to be a local device, one on which the kernel was
// executed.)
//
// It may be nice to copy the tensor to the right device instead of failing if
// it isn't already there. This is left as a future exercise. The required
// device-copying logic is implemented in Python at the moment.
tensorflow::Status ExtractTensorFromEagerTensor(const PyObject* eager_tensor,
                                                const Device* expected_device,
                                                const Tensor** output_tensor) {
  auto handle = EagerTensor_Handle(eager_tensor)->handle;
  Device* actual_device = handle->device();
  TF_RETURN_IF_ERROR(handle->Tensor(output_tensor));
  // actual_device may be nullptr, which implies local CPU.
  if (expected_device == actual_device) return Status::OK();
  const string& expected_device_name = expected_device->attributes().name();
  if (actual_device == nullptr) {
    if (!IsCPUDevice(expected_device)) {
      return errors::Internal(
          "Expected the py_func to return a Tensor backed by memory in ",
          expected_device_name,
          ", but is actually backed by local host memory. This is a bug.");
    }
    return Status::OK();
  }
  // NOTE(ebrevdo): Here we could try comparing "actual_device_name"
  // (actual_device->attributes()->name()) to expected_device_name and ensure
  // they're the same. However, this comparison fails if we create a ClusterDef
  // on localhost, mainly because the Device created by Eager code doesn't
  // match the device created by a session. In this case, expected_device_name
  // may contain "worker" but the Eager device name contains "localhost". Since
  // we can't easily access the true underlying device of "worker" here, we are
  // not able to perform a proper comparison. Furthermore, we can't check
  // IsCPUDevice(actual_device) because the kernel's device may indeed be a
  // GPU device (the python interpreter doesn't use it, however).
  return Status::OK();
}

// Calls the registered py function through the trampoline.
Status DoCallPyFunc(PyCall* call, bool* out_log_on_error) {
  *out_log_on_error = true;
  PyObject* trampoline = GetPyTrampoline();
  if (trampoline == nullptr) {
    return errors::InvalidArgument(
        "Missing py trampoline. Most likely, it is a link error.");
  }
  // Prepare the argument.
  PyObject* args = nullptr;
  TF_RETURN_IF_ERROR(MakeArgTuple(call, &args));
  CHECK(args);

  // Invokes the trampoline.
  PyObject* result = PyEval_CallObject(trampoline, args);
  Py_DECREF(args);
  if (result == nullptr) {
    if (PyErr_Occurred()) {
      if (PyErr_ExceptionMatches(PyExc_ValueError) ||
          PyErr_ExceptionMatches(PyExc_TypeError)) {
        return errors::InvalidArgument(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
        *out_log_on_error = false;
        return errors::OutOfRange(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
        return errors::ResourceExhausted(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_NotImplementedError)) {
        return errors::Unimplemented(PyExceptionFetch());
      } else {
        // TODO(ebrevdo): Check if exception is an OpError and use the
        // OpError.error_code property to map it back in the Status.
        return errors::Unknown(PyExceptionFetch());
      }
    } else {
      return errors::Internal("Failed to run py callback ", call->token,
                              ": see error log.");
    }
  }

  // Process the return values and convert them to TF Tensors.
  Status s = Status::OK();
  if (PyList_Check(result)) {
    // `result` is a Python list; if this operation is an `EagerPyFunc`, then
    // every item in the list must be an `EagerTensor`; otherwise, every
    // element must be a NumPy array.
    call->out.clear();
    for (int i = 0; i < PyList_Size(result); ++i) {
      Tensor t;
      if (call->eager) {
        const PyObject* item = PyList_GetItem(result, i);
        if (EagerTensor_CheckExact(item)) {
          const Tensor* tensor = nullptr;
          s = ExtractTensorFromEagerTensor(item, call->device, &tensor);
          if (s.ok()) t = *tensor;
        } else {
          s = errors::FailedPrecondition(
              "Expected EagerTensor, found PyObject of type: ",
              Py_TYPE(item)->tp_name);
        }
      } else {
        s = ConvertNdarrayToTensor(PyList_GetItem(result, i), &t);
      }

      if (!s.ok()) {
        break;
      }
      call->out.push_back(t);
    }
  } else if (EagerTensor_CheckExact(result) || result == Py_None) {
    // result is an `EagerTensor` or `None`.
    DCHECK(call->eager);
    if (result != Py_None) {
      const Tensor* t = nullptr;
      s = ExtractTensorFromEagerTensor(result, call->device, &t);
      if (s.ok()) call->out.push_back(*t);
    }
  } else if (PyArray_Check(result)) {
    // `result` is a NumPy array.
    DCHECK(!call->eager);
    if (!IsSingleNone(result)) {
      Tensor t;
      s = ConvertNdarrayToTensor(result, &t);
      if (s.ok()) {
        call->out.push_back(t);
      }
    }
  } else {
    s = errors::Internal("Unexpected PyObject was returned: ",
                         Py_TYPE(result)->tp_name);
  }
  Py_DECREF(result);
  return s;
}

}  // end namespace

// Outside anonymous namespace just to make the friend declaration in
// tensorflow::Tensor apply.
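//
// A TensorBuffer backed by the storage of a numpy array, so that a Tensor can
// alias the array's memory instead of copying it; the array reference is
// released via DelayedNumpyDecref when the buffer is destroyed.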
class NumpyTensorBuffer : public TensorBuffer {
 public:
  NumpyTensorBuffer(PyArrayObject* array, size_t len, void* data)
      : TensorBuffer(data), array_(array), len_(len) {}

  ~NumpyTensorBuffer() override {
    // Note: The session::run wrapper is responsible for freeing this while
    // holding the GIL.
    DelayedNumpyDecref(data(), len_, array_);
  }

  size_t size() const override { return len_; }
  TensorBuffer* root_buffer() override { return this; }
  void FillAllocationDescription(AllocationDescription* proto) const override {
    tensorflow::int64 rb = size();
    proto->set_requested_bytes(rb);
    proto->set_allocator_name(tensorflow::cpu_allocator()->Name());
  }
  Tensor MakeTensor(DataType dtype, const TensorShape& shape) {
    CHECK_EQ(len_, shape.num_elements() * DataTypeSize(dtype));
    return Tensor(dtype, shape, this);
  }

  // Prevents input forwarding from overwriting this buffer.
  bool OwnsMemory() const override { return false; }

 private:
  PyArrayObject* array_;
  size_t len_;
};

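// Copies the contents of the Python bytes or unicode object `obj` into
// `*str`; returns an error for any other object type.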
Status PyObjectToString(PyObject* obj, string* str) {
  char* py_bytes;
  Py_ssize_t size;
  if (PyBytes_AsStringAndSize(obj, &py_bytes, &size) != -1) {
    str->assign(py_bytes, size);
    return Status::OK();
  }
#if PY_MAJOR_VERSION >= 3
  const char* ptr = PyUnicode_AsUTF8AndSize(obj, &size);
  if (ptr != nullptr) {
    str->assign(ptr, size);
    return Status::OK();
  }
#else
  if (PyUnicode_Check(obj)) {
    PyObject* unicode = PyUnicode_AsUTF8String(obj);
    char* ptr;
    if (unicode && PyString_AsStringAndSize(unicode, &ptr, &size) != -1) {
      str->assign(ptr, size);
      Py_DECREF(unicode);
      return Status::OK();
    }
    Py_XDECREF(unicode);
  }
#endif
  return errors::Unimplemented("Unsupported object type ",
                               obj->ob_type->tp_name);
}

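// Converts the numpy array `obj` into a Tensor stored in `*ret`. String and
// object arrays are copied element by element; numeric arrays either alias
// the numpy buffer (when it is suitably aligned) or have their contents
// copied.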
Status ConvertNdarrayToTensor(PyObject* obj, Tensor* ret) {
  PyArrayObject* input = reinterpret_cast<PyArrayObject*>(obj);
  DataType dtype = DT_INVALID;
  TensorShape shape;
  for (int i = 0; i < PyArray_NDIM(input); ++i) {
    shape.AddDim(PyArray_SHAPE(input)[i]);
  }
  const int np_type = PyArray_TYPE(input);
  switch (np_type) {
    case NPY_OBJECT: {
      dtype = DT_STRING;
      Tensor t(dtype, shape);
      auto tflat = t.flat<string>();
      PyObject** input_data =
          reinterpret_cast<PyObject**>(PyArray_DATA(input));
      for (int i = 0; i < tflat.dimension(0); ++i) {
        TF_RETURN_IF_ERROR(PyObjectToString(input_data[i], &tflat(i)));
      }
      *ret = t;
      break;
    }
    case NPY_STRING: {
      dtype = DT_STRING;
      Tensor t(dtype, shape);
      auto tflat = t.flat<string>();
      char* input_data = PyArray_BYTES(input);
      Py_ssize_t el_size = PyArray_ITEMSIZE(input);
      for (int i = 0; i < tflat.dimension(0); ++i) {
        tflat(i) = string(input_data + i * el_size, el_size);
      }
      *ret = t;
      break;
    }
    default: {
      TF_RETURN_IF_ERROR(NumericNpDTypeToTfDType(PyArray_TYPE(input), &dtype));
      CHECK(DataTypeCanUseMemcpy(dtype));
      if (reinterpret_cast<intptr_t>(PyArray_DATA(input)) %
              std::max(1, EIGEN_MAX_ALIGN_BYTES) !=
          0) {
        Tensor t(dtype, shape);
        StringPiece p = t.tensor_data();
        memcpy(const_cast<char*>(p.data()), PyArray_DATA(input), p.size());
        *ret = t;
      } else {
        // Incref the array as the calling context will decref it when we
        // return and we want to keep a handle to this memory.
        Py_INCREF(input);
        NumpyTensorBuffer* buf = new NumpyTensorBuffer(
            input, shape.num_elements() * DataTypeSize(dtype),
            PyArray_DATA(input));
        *ret = buf->MakeTensor(dtype, shape);
        buf->Unref();
      }
    }
  }
  return Status::OK();
}

// Creates a numpy array in 'ret' which either aliases the content of 't' or
// holds a copy of it.
Status ConvertTensorToNdarray(const Tensor& t, PyObject** ret) {
  int typenum = -1;
  TF_RETURN_IF_ERROR(TF_DataType_to_PyArray_TYPE(
      static_cast<TF_DataType>(t.dtype()), &typenum));
  PyArray_Descr* descr = PyArray_DescrFromType(typenum);
  CHECK(descr);
  std::vector<npy_intp> dims;
  dims.reserve(t.dims());
  for (int i = 0; i < t.dims(); ++i) {
    dims.push_back(t.dim_size(i));
  }
  Tensor* copy = new Tensor(t);
  if (ArrayFromMemory(dims.size(), dims.data(),
                      const_cast<char*>(copy->tensor_data().data()), t.dtype(),
                      [copy]() { delete copy; }, ret)
          .ok()) {
    return Status::OK();
  }
  delete copy;

  PyObject* obj = PyArray_Empty(dims.size(), dims.data(), descr, 0);
  if (obj == nullptr) {
    return errors::Internal("Failed to allocate np array: ",
                            t.shape().DebugString());
  }
  PyArrayObject* np_array = reinterpret_cast<PyArrayObject*>(obj);
  if (typenum == NPY_OBJECT) {
    CHECK_EQ(DT_STRING, t.dtype());
    auto tflat = t.flat<string>();
    PyObject** out = reinterpret_cast<PyObject**>(PyArray_DATA(np_array));
    for (int i = 0; i < tflat.dimension(0); ++i) {
      const string& el = tflat(i);
      out[i] = PyBytes_FromStringAndSize(el.data(), el.size());
      if (out[i] == nullptr) {
        for (int j = 0; j < i; ++j) {
          Py_DECREF(out[j]);
        }
        Py_DECREF(obj);
        return errors::Internal("Failed to allocate a copy of string ", i);
      }
    }
  } else {
    CHECK(DataTypeCanUseMemcpy(t.dtype()));
    StringPiece p = t.tensor_data();
    memcpy(PyArray_DATA(np_array), p.data(), p.size());
  }
  *ret = PyArray_Return(np_array);
  return Status::OK();
}

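// Registers `trampoline` as the Python callable that dispatches py_func
// calls. Only the first call takes effect; subsequent calls log a warning.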
void InitializePyTrampoline(PyObject* trampoline) {
  mutex_lock l(mu);
  if (py_trampoline == nullptr) {
    py_trampoline = trampoline;
    Py_INCREF(py_trampoline);
  } else {
    LOG(WARNING) << "InitializeCallback should only be called once";
  }
}

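// OpKernel for the PyFunc, PyFuncStateless, and EagerPyFunc ops: it gathers
// the op's inputs, acquires the Python GIL, and invokes the registered Python
// function through the trampoline.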
class PyFuncOp : public OpKernel {
 public:
  explicit PyFuncOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("token", &token_));
    eager_ = type_string() == "EagerPyFunc";
  }

  bool IsExpensive() override { return true; }

  void Compute(OpKernelContext* ctx) override {
    PyCall call;
    call.token = token_;
    call.eager = eager_;
    if (call.eager) {
      // Eager's C API uses `Device`, whereas `OpKernelContext` stores a
      // `DeviceBase`; attempt to downcast.
      call.device = dynamic_cast<Device*>(ctx->device());
      if (call.device == nullptr) {
        ctx->CtxFailureWithWarning(errors::Internal(
            "Unrecognized device class: ", ctx->device()->name()));
        return;
      }
    }

    for (int i = 0; i < ctx->num_inputs(); ++i) {
      call.ins.push_back(ctx->input(i));
    }

    // NOTE(mrry): There is a potential time-of-check-to-time-of-use race
    // here, because it is possible that `Py_Finalize()` could be called in
    // another thread between this check and the call to `PyGILState_Ensure()`,
    // which will abort the process if `Py_Finalize()` has been called. A more
    // robust solution would be welcome, but it is not obvious how to make this
    // work using the current Python C API.
    OP_REQUIRES(ctx, Py_IsInitialized(),
                errors::FailedPrecondition(
                    "Python interpreter state is not initialized. "
                    "The process may be terminated."));

    PyGILState_STATE py_threadstate;
    py_threadstate = PyGILState_Ensure();
    bool log_on_error;
    Status s = DoCallPyFunc(&call, &log_on_error);
    // Sometimes py_funcs can be called without a session and leak memory. This
    // ensures we clear the decref cache so this doesn't happen.
    ClearDecrefCache();
    PyGILState_Release(py_threadstate);

    // Ensures that GIL is released even when !s.ok().
    if (!s.ok()) {
      if (log_on_error) {
        ctx->CtxFailureWithWarning(s);
      } else {
        ctx->CtxFailure(s);
      }
      return;
    }

    OP_REQUIRES(ctx, static_cast<int32>(call.out.size()) == ctx->num_outputs(),
                errors::InvalidArgument(token_, " returns ", call.out.size(),
                                        " values, but expects to see ",
                                        ctx->num_outputs(), " values."));
    for (size_t i = 0; i < call.out.size(); ++i) {
      const auto& t = call.out[i];
      OP_REQUIRES(
          ctx, t.dtype() == output_type(i),
          errors::InvalidArgument(i, "-th value returned by ", token_, " is ",
                                  DataTypeString(t.dtype()), ", but expects ",
                                  DataTypeString(output_type(i))));
      ctx->set_output(i, t);
    }
  }

 private:
  string token_;

  // True if and only if this op should execute the python function eagerly,
  // i.e., if and only if the eager attribute is set.
  bool eager_;

  TF_DISALLOW_COPY_AND_ASSIGN(PyFuncOp);
};

REGISTER_KERNEL_BUILDER(Name("PyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("PyFuncStateless").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_GPU), PyFuncOp);

}  // end namespace tensorflow
573