/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/copy_tensor.h"

#include <atomic>
#include <utility>
#include <vector>
#include "tensorflow/core/common_runtime/dma_helper.h"
#include "tensorflow/core/framework/variant_op_registry.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/util/reffed_status_callback.h"

namespace tensorflow {
namespace {

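// A device-to-device copy registration: the copy function to invoke when
// sending from `sender_device_type` to `receiver_device_type`.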
struct RegistrationInfo {
  RegistrationInfo(DeviceType s, DeviceType r, CopyTensor::CopyFunction cf)
      : sender_device_type(std::move(s)),
        receiver_device_type(std::move(r)),
        copy_function(cf) {}
  DeviceType sender_device_type;
  DeviceType receiver_device_type;
  CopyTensor::CopyFunction copy_function;
};

// We use a vector instead of a map since we expect there to be very
// few registrations.
std::vector<RegistrationInfo>* MutableRegistry() {
  static std::vector<RegistrationInfo>* registry =
      new std::vector<RegistrationInfo>;
  return registry;
}

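// Copies `input` (in host memory) to `output` on device `dst` using
// `recv_dev_context`, invoking `done` when the copy completes.  DT_VARIANT
// tensors are copied element-by-element via VariantDeviceCopy(), with a
// ReffedStatusCallback aggregating the per-element completion statuses;
// all other DMA-able types are handed directly to CopyCPUTensorToDevice().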
void CopyHostToDevice(const Tensor* input, Allocator* cpu_allocator,
                      Allocator* out_allocator, StringPiece edge_name,
                      Device* dst, Tensor* output,
                      DeviceContext* recv_dev_context, StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [dst, recv_dev_context, out_allocator, status_cb, cpu_allocator,
         edge_name](StatusCallback wrapped_done_,
                    // Begin unbound arguments
                    const Tensor& from, Tensor* to) {
          if (from.dtype() == DT_VARIANT) {
            status_cb->Ref();
            CopyHostToDevice(&from, cpu_allocator, out_allocator, edge_name,
                             dst, to, recv_dev_context, wrapped_done_);
            return Status::OK();
          } else {
            if (!DMAHelper::CanUseDMA(&from)) {
              Status err = errors::InvalidArgument(
                  "During Variant Host->Device Copy: "
                  "non-DMA-copy attempted of tensor type: ",
                  DataTypeString(from.dtype()));
              status_cb->UpdateStatus(err);
              return err;
            }
            if (status_cb->ok()) {
              status_cb->Ref();
              *to = Tensor(out_allocator, from.dtype(), from.shape());
              recv_dev_context->CopyCPUTensorToDevice(&from, dst, to,
                                                      wrapped_done_);
              return Status::OK();
            } else {
              return status_cb->status();
            }
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init = VariantDeviceCopy(
          VariantDeviceCopyDirection::HOST_TO_DEVICE, v[i], &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    recv_dev_context->CopyCPUTensorToDevice(input, dst, output,
                                            std::move(done));
  }
}

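// Mirror image of CopyHostToDevice: copies `input` from device `src` to
// `output` in host memory via `send_dev_context`, handling DT_VARIANT
// elements recursively through VariantDeviceCopy() and calling `done` once
// all outstanding element copies have reported their status.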
void CopyDeviceToHost(const Tensor* input, Allocator* cpu_allocator,
                      Allocator* out_allocator, StringPiece edge_name,
                      Device* src, Tensor* output,
                      DeviceContext* send_dev_context, StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [edge_name, src, send_dev_context, out_allocator, status_cb,
         cpu_allocator](StatusCallback wrapped_done_,
                        // Begin unbound arguments
                        const Tensor& from, Tensor* to) {
          if (from.dtype() == DT_VARIANT) {
            status_cb->Ref();
            CopyDeviceToHost(&from, cpu_allocator, out_allocator, edge_name,
                             src, to, send_dev_context, wrapped_done_);
            return Status::OK();
          } else {
            if (!DMAHelper::CanUseDMA(&from)) {
              Status err = errors::InvalidArgument(
                  "During Variant Device->Host Copy: "
                  "non-DMA-copy attempted of tensor type: ",
                  DataTypeString(from.dtype()));
              status_cb->UpdateStatus(err);
              return err;
            }
            if (status_cb->ok()) {
              status_cb->Ref();
              *to = Tensor(out_allocator, from.dtype(), from.shape());
              send_dev_context->CopyDeviceTensorToCPU(&from, edge_name, src, to,
                                                      wrapped_done_);
              return Status::OK();
            } else {
              return status_cb->status();
            }
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init = VariantDeviceCopy(
          VariantDeviceCopyDirection::DEVICE_TO_HOST, v[i], &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    send_dev_context->CopyDeviceTensorToCPU(input, edge_name, src, output,
                                            std::move(done));
  }
}

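// Copies `input` from `src` to `dst` using the registered `copy_function`.
// DT_VARIANT elements must be DMA-able (nested variants are not handled
// here); each element copy is issued through `copy_function` and tracked by
// the shared ReffedStatusCallback so that `done` fires exactly once.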
void CopyDeviceToDevice(CopyTensor::CopyFunction copy_function,
                        Allocator* cpu_allocator, Allocator* out_allocator,
                        DeviceContext* send_dev_context,
                        DeviceContext* recv_dev_context, Device* src,
                        Device* dst, const AllocatorAttributes src_alloc_attr,
                        const AllocatorAttributes dst_alloc_attr,
                        const Tensor* input, Tensor* output,
                        int dev_to_dev_stream_index, StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [copy_function, src, dst, src_alloc_attr, dst_alloc_attr,
         recv_dev_context, send_dev_context, out_allocator, status_cb,
         dev_to_dev_stream_index](StatusCallback wrapped_done_,
                                  // Begin unbound arguments
                                  const Tensor& from, Tensor* to) {
          if (!DMAHelper::CanUseDMA(&from)) {
            Status err = errors::InvalidArgument(
                "During Variant Device->Device Copy: "
                "non-DMA-copy attempted of tensor type: ",
                DataTypeString(from.dtype()));
            status_cb->UpdateStatus(err);
            return err;
          }
          if (status_cb->ok()) {
            status_cb->Ref();
            *to = Tensor(out_allocator, from.dtype(), from.shape());
            copy_function(send_dev_context, recv_dev_context, src, dst,
                          src_alloc_attr, dst_alloc_attr, &from, to,
                          dev_to_dev_stream_index, std::move(wrapped_done_));
            return Status::OK();
          } else {
            return status_cb->status();
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init =
          VariantDeviceCopy(VariantDeviceCopyDirection::DEVICE_TO_DEVICE, v[i],
                            &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    copy_function(send_dev_context, recv_dev_context, src, dst, src_alloc_attr,
                  dst_alloc_attr, input, output, dev_to_dev_stream_index,
                  std::move(done));
  }
}

}  // namespace

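// Copies `input` on `src` to `output` on `dst`.  Dispatches on the effective
// source and destination device types (treating on-host allocations as CPU):
//   device -> device: use a registered CopyFunction, or fall back to a
//                     two-step copy through a temporary host tensor;
//   device -> host:   CopyDeviceToHost;
//   host -> device:   CopyHostToDevice;
//   host -> host:     a cheap buffer-sharing assignment.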
// static
void CopyTensor::ViaDMA(StringPiece edge_name, DeviceContext* send_dev_context,
                        DeviceContext* recv_dev_context, Device* src,
                        Device* dst, const AllocatorAttributes src_alloc_attr,
                        const AllocatorAttributes dst_alloc_attr,
                        const Tensor* input, Tensor* output,
                        int dev_to_dev_stream_index, StatusCallback done) {
  tracing::ScopedAnnotation annotation(edge_name);
  VLOG(1) << "Copy " << edge_name;

  const DeviceType src_device_type(
      src_alloc_attr.on_host() ? DEVICE_CPU : src->attributes().device_type());
  const DeviceType dst_device_type(
      dst_alloc_attr.on_host() ? DEVICE_CPU : dst->attributes().device_type());
  const bool non_cpu_src = src_device_type != DeviceType(DEVICE_CPU);
  const bool non_cpu_dst = dst_device_type != DeviceType(DEVICE_CPU);

  // TODO(phawkins): choose an allocator optimal for both the src and dst
  // devices, not just the src device.
  AllocatorAttributes host_alloc_attrs;
  host_alloc_attrs.set_gpu_compatible(true);
  host_alloc_attrs.set_on_host(true);
  Allocator* cpu_allocator = src->GetAllocator(host_alloc_attrs);
  Allocator* out_allocator = dst->GetAllocator(dst_alloc_attr);

  // E.g., gpu -> gpu
  if (non_cpu_src && non_cpu_dst) {
    // Device to device copy. Look through registry for an appropriate
    // CopyFunction.
    std::vector<RegistrationInfo>* registry = MutableRegistry();
    for (const RegistrationInfo& ri : *registry) {
      if (ri.sender_device_type == src_device_type &&
          ri.receiver_device_type == dst_device_type) {
        CopyDeviceToDevice(ri.copy_function, cpu_allocator, out_allocator,
                           send_dev_context, recv_dev_context, src, dst,
                           src_alloc_attr, dst_alloc_attr, input, output,
                           dev_to_dev_stream_index, std::move(done));
        return;
      }
    }

    // Fall back to copying via the host.
    VLOG(1) << "No function registered to copy from devices of type "
            << src_device_type.type() << " to devices of type "
            << dst_device_type.type()
            << ". Falling back to copying via the host.";

    Tensor* cpu_tensor =
        new Tensor(cpu_allocator, input->dtype(), input->shape());
    std::function<void(const Status&)> delete_and_done = std::bind(
        [cpu_tensor](StatusCallback done_,
                     // Begin unbound arguments.
                     const Status& status) {
          delete cpu_tensor;
          done_(status);
        },
        std::move(done), std::placeholders::_1);
    std::function<void(const Status&)> then_copy_to_other_device = std::bind(
        [delete_and_done, recv_dev_context, cpu_tensor, cpu_allocator,
         out_allocator, edge_name, dst, output](StatusCallback delete_and_done_,
                                                // Begin unbound arguments.
                                                Status status) {
          if (!status.ok()) {
            delete_and_done_(status);
            return;
          }
          CopyHostToDevice(cpu_tensor, cpu_allocator, out_allocator, edge_name,
                           dst, output, recv_dev_context,
                           std::move(delete_and_done_));
        },
        std::move(delete_and_done), std::placeholders::_1);
    CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
                     cpu_tensor, send_dev_context,
                     std::move(then_copy_to_other_device));
    return;
  }

  // E.g., gpu -> cpu
  if (non_cpu_src && !non_cpu_dst) {
    // Device to host copy.
    CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
                     output, send_dev_context, std::move(done));
    return;
  }

  // E.g., cpu -> gpu
  if (!non_cpu_src && non_cpu_dst) {
    // Host to Device copy.
    CopyHostToDevice(input, cpu_allocator, out_allocator, edge_name, dst,
                     output, recv_dev_context, std::move(done));
    return;
  }

  // cpu -> cpu
  CHECK(!non_cpu_src && !non_cpu_dst);
  *output = *input;
  done(Status::OK());
}

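// Appends a (sender, receiver) -> copy_function entry to the global registry
// consulted by ViaDMA() for device-to-device copies.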
// static
Status CopyTensor::Register(DeviceType sender_device_type,
                            DeviceType receiver_device_type,
                            CopyFunction copy_function) {
  std::vector<RegistrationInfo>* registry = MutableRegistry();
  registry->emplace_back(sender_device_type, receiver_device_type,
                         copy_function);
  return Status::OK();
}

namespace {

// The following registrations enable a DT_VARIANT tensor element that contains
// a wrapped `tensorflow::Tensor` to be copied between devices.
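// Tensors whose buffers can be DMA-copied are forwarded to the provided copy
// function; non-DMA types fall back to a host-side assignment, and nested
// variants are currently rejected as unimplemented.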
static Status WrappedTensorDeviceCopy(
    const Tensor& from, Tensor* to,
    const UnaryVariantOpRegistry::AsyncTensorDeviceCopyFn& copy) {
  if (from.dtype() == DT_VARIANT) {
    // TODO(b/116349787): Implement support for nested variants.
    return errors::Unimplemented(
        "Support for copying nested variants to device has not yet been "
        "implemented.");
  } else if (DMAHelper::CanUseDMA(&from)) {
    TF_RETURN_IF_ERROR(copy(from, to));
  } else {
    *to = from;
  }

  return Status::OK();
}

#define REGISTER_WRAPPED_TENSOR_COPY(DIRECTION)         \
  INTERNAL_REGISTER_UNARY_VARIANT_DEVICE_COPY_FUNCTION( \
      Tensor, DIRECTION, WrappedTensorDeviceCopy)

REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::HOST_TO_DEVICE);
REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::DEVICE_TO_HOST);
REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::DEVICE_TO_DEVICE);

}  // namespace

}  // namespace tensorflow