1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/common_runtime/copy_tensor.h"
17
18 #include <atomic>
19 #include <utility>
20 #include <vector>
21
22 #include "tensorflow/core/common_runtime/dma_helper.h"
23 #include "tensorflow/core/framework/variant_op_registry.h"
24 #include "tensorflow/core/lib/core/errors.h"
25 #include "tensorflow/core/lib/core/refcount.h"
26 #include "tensorflow/core/platform/logging.h"
27 #include "tensorflow/core/profiler/lib/scoped_annotation.h"
28 #include "tensorflow/core/util/reffed_status_callback.h"
29
30 namespace tensorflow {
31 namespace {
32
33 struct RegistrationInfo {
RegistrationInfotensorflow::__anon841e94980111::RegistrationInfo34 RegistrationInfo(DeviceType s, DeviceType r, CopyTensor::CopyFunction cf)
35 : sender_device_type(std::move(s)),
36 receiver_device_type(std::move(r)),
37 copy_function(cf) {}
38 DeviceType sender_device_type;
39 DeviceType receiver_device_type;
40 CopyTensor::CopyFunction copy_function;
41 };
42
43 // We use a vector instead of a map since we expect there to be very
44 // few registrations.
MutableRegistry()45 std::vector<RegistrationInfo>* MutableRegistry() {
46 static std::vector<RegistrationInfo>* registry =
47 new std::vector<RegistrationInfo>;
48 return registry;
49 }
50
CopyHostToDevice(const Tensor * input,Allocator * cpu_allocator,Allocator * out_allocator,StringPiece edge_name,Device * dst,Tensor * output,DeviceContext * recv_dev_context,StatusCallback done,bool sync_dst_compute)51 void CopyHostToDevice(const Tensor* input, Allocator* cpu_allocator,
52 Allocator* out_allocator, StringPiece edge_name,
53 Device* dst, Tensor* output,
54 DeviceContext* recv_dev_context, StatusCallback done,
55 bool sync_dst_compute) {
56 if (input->dtype() == DT_VARIANT) {
57 Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
58 auto* status_cb = new ReffedStatusCallback(std::move(done));
59 core::ScopedUnref status_cb_unref(status_cb);
60
61 auto wrapped_done = [status_cb](const Status& s) {
62 status_cb->UpdateStatus(s);
63 status_cb->Unref();
64 };
65 auto copier =
66 [dst, recv_dev_context, out_allocator, status_cb, cpu_allocator,
67 edge_name, sync_dst_compute, wrapped_done = std::move(wrapped_done)](
68 const Tensor& from, Tensor* to) {
69 if (from.dtype() == DT_VARIANT) {
70 status_cb->Ref();
71 CopyHostToDevice(&from, cpu_allocator, out_allocator, edge_name,
72 dst, to, recv_dev_context, wrapped_done,
73 sync_dst_compute);
74 return Status::OK();
75 } else {
76 if (!DMAHelper::CanUseDMA(&from)) {
77 Status err = errors::InvalidArgument(
78 "During Variant Host->Device Copy: "
79 "non-DMA-copy attempted of tensor type: ",
80 DataTypeString(from.dtype()));
81 status_cb->UpdateStatus(err);
82 return err;
83 }
84 if (status_cb->ok()) {
85 status_cb->Ref();
86 *to = Tensor(out_allocator, from.dtype(), from.shape());
87 recv_dev_context->CopyCPUTensorToDevice(
88 &from, dst, to, wrapped_done, sync_dst_compute);
89 return Status::OK();
90 } else {
91 return status_cb->status();
92 }
93 }
94 };
95
96 const Variant* v = input->flat<Variant>().data();
97 Variant* v_out = copy.flat<Variant>().data();
98 Status s_copy_init;
99 for (int64 i = 0; i < input->NumElements(); ++i) {
100 s_copy_init = VariantDeviceCopy(
101 VariantDeviceCopyDirection::HOST_TO_DEVICE, v[i], &v_out[i], copier);
102 if (!s_copy_init.ok()) {
103 status_cb->UpdateStatus(s_copy_init);
104 break;
105 }
106 }
107 if (s_copy_init.ok()) {
108 *output = std::move(copy);
109 }
110 } else if (input->dtype() == DT_RESOURCE) {
111 *output = *input;
112 done(Status::OK());
113 } else {
114 recv_dev_context->CopyCPUTensorToDevice(input, dst, output, std::move(done),
115 sync_dst_compute);
116 }
117 }
118
119
CopyDeviceToDevice(CopyTensor::CopyFunction copy_function,Allocator * cpu_allocator,Allocator * out_allocator,DeviceContext * send_dev_context,DeviceContext * recv_dev_context,Device * src,Device * dst,const AllocatorAttributes src_alloc_attr,const AllocatorAttributes dst_alloc_attr,const Tensor * input,Tensor * output,int dev_to_dev_stream_index,StatusCallback done)120 void CopyDeviceToDevice(CopyTensor::CopyFunction copy_function,
121 Allocator* cpu_allocator, Allocator* out_allocator,
122 DeviceContext* send_dev_context,
123 DeviceContext* recv_dev_context, Device* src,
124 Device* dst, const AllocatorAttributes src_alloc_attr,
125 const AllocatorAttributes dst_alloc_attr,
126 const Tensor* input, Tensor* output,
127 int dev_to_dev_stream_index, StatusCallback done) {
128 if (input->dtype() == DT_VARIANT) {
129 Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
130 auto* status_cb = new ReffedStatusCallback(std::move(done));
131 core::ScopedUnref status_cb_unref(status_cb);
132
133 auto wrapped_done = [status_cb](const Status& s) {
134 status_cb->UpdateStatus(s);
135 status_cb->Unref();
136 };
137 auto copier =
138 [copy_function, cpu_allocator, src, dst, src_alloc_attr, dst_alloc_attr,
139 recv_dev_context, send_dev_context, out_allocator, status_cb,
140 dev_to_dev_stream_index, wrapped_done = std::move(wrapped_done)](
141 // Begin unbound arguments
142 const Tensor& from, Tensor* to) {
143 if (from.dtype() == DT_VARIANT) {
144 status_cb->Ref();
145 CopyDeviceToDevice(copy_function, cpu_allocator, out_allocator,
146 send_dev_context, recv_dev_context, src, dst,
147 src_alloc_attr, dst_alloc_attr, &from, to,
148 dev_to_dev_stream_index, wrapped_done);
149 return Status::OK();
150 } else {
151 if (!DMAHelper::CanUseDMA(&from)) {
152 Status err = errors::InvalidArgument(
153 "During Variant Device->Device Copy: ", src->name(), " to ",
154 dst->name(), " non-DMA-copy attempted of tensor type: ",
155 DataTypeString(from.dtype()));
156 status_cb->UpdateStatus(err);
157 return err;
158 }
159 if (status_cb->ok()) {
160 status_cb->Ref();
161 *to = Tensor(out_allocator, from.dtype(), from.shape());
162 copy_function(send_dev_context, recv_dev_context, src, dst,
163 src_alloc_attr, dst_alloc_attr, &from, to,
164 dev_to_dev_stream_index, wrapped_done);
165 return Status::OK();
166 } else {
167 return status_cb->status();
168 }
169 }
170 };
171
172 const Variant* v = input->flat<Variant>().data();
173 Variant* v_out = copy.flat<Variant>().data();
174 Status s_copy_init;
175 for (int64 i = 0; i < input->NumElements(); ++i) {
176 s_copy_init =
177 VariantDeviceCopy(VariantDeviceCopyDirection::DEVICE_TO_DEVICE, v[i],
178 &v_out[i], copier);
179 if (!s_copy_init.ok()) {
180 status_cb->UpdateStatus(s_copy_init);
181 break;
182 }
183 }
184 if (s_copy_init.ok()) {
185 *output = std::move(copy);
186 }
187 } else if (input->dtype() == DT_RESOURCE) {
188 *output = *input;
189 done(Status::OK());
190 } else {
191 copy_function(send_dev_context, recv_dev_context, src, dst, src_alloc_attr,
192 dst_alloc_attr, input, output, dev_to_dev_stream_index,
193 std::move(done));
194 }
195 }
196
197 } // namespace
198
199 // static
ViaDMA(StringPiece edge_name,DeviceContext * send_dev_context,DeviceContext * recv_dev_context,Device * src,Device * dst,const AllocatorAttributes src_alloc_attr,const AllocatorAttributes dst_alloc_attr,const Tensor * input,Tensor * output,int dev_to_dev_stream_index,StatusCallback done,bool sync_dst_compute)200 void CopyTensor::ViaDMA(StringPiece edge_name, DeviceContext* send_dev_context,
201 DeviceContext* recv_dev_context, Device* src,
202 Device* dst, const AllocatorAttributes src_alloc_attr,
203 const AllocatorAttributes dst_alloc_attr,
204 const Tensor* input, Tensor* output,
205 int dev_to_dev_stream_index, StatusCallback done,
206 bool sync_dst_compute) {
207 profiler::ScopedAnnotation annotation(
208 [&] { return absl::StrCat("#edge_name=", edge_name, "#"); });
209 VLOG(1) << "Copy " << edge_name;
210
211 const DeviceType src_device_type(
212 src_alloc_attr.on_host() ? DEVICE_CPU : src->attributes().device_type());
213 const DeviceType dst_device_type(
214 dst_alloc_attr.on_host() ? DEVICE_CPU : dst->attributes().device_type());
215 const bool non_cpu_src = src_device_type != DeviceType(DEVICE_CPU);
216 const bool non_cpu_dst = dst_device_type != DeviceType(DEVICE_CPU);
217
218 // TODO(phawkins): choose an allocator optimal for both the src and dst
219 // devices, not just the src device.
220 AllocatorAttributes host_alloc_attrs;
221 host_alloc_attrs.set_gpu_compatible(true);
222 host_alloc_attrs.set_on_host(true);
223 Allocator* cpu_allocator = src->GetAllocator(host_alloc_attrs);
224 Allocator* out_allocator = dst->GetAllocator(dst_alloc_attr);
225
226 // E.g., gpu -> gpu
227 if (non_cpu_src && non_cpu_dst) {
228 // Device to device copy. Look through registry for an appropriate
229 // CopyFunction.
230 std::vector<RegistrationInfo>* registry = MutableRegistry();
231 for (const RegistrationInfo& ri : *registry) {
232 if (ri.sender_device_type == src_device_type &&
233 ri.receiver_device_type == dst_device_type) {
234 CopyDeviceToDevice(ri.copy_function, cpu_allocator, out_allocator,
235 send_dev_context, recv_dev_context, src, dst,
236 src_alloc_attr, dst_alloc_attr, input, output,
237 dev_to_dev_stream_index, std::move(done));
238 return;
239 }
240 }
241
242 // Fall back to copying via the host.
243 VLOG(1) << "No function registered to copy from devices of type "
244 << src_device_type.type() << " to devices of type "
245 << dst_device_type.type()
246 << ". Falling back to copying via the host.";
247
248 Tensor* cpu_tensor =
249 new Tensor(cpu_allocator, input->dtype(), input->shape());
250 auto delete_and_done = [cpu_tensor,
251 done = std::move(done)](const Status& status) {
252 delete cpu_tensor;
253 done(status);
254 };
255 auto then_copy_to_other_device =
256 [delete_and_done = std::move(delete_and_done), recv_dev_context,
257 cpu_tensor, cpu_allocator, out_allocator, edge_name, dst, output,
258 sync_dst_compute](Status status) {
259 if (!status.ok()) {
260 delete_and_done(status);
261 return;
262 }
263 CopyHostToDevice(cpu_tensor, cpu_allocator, out_allocator, edge_name,
264 dst, output, recv_dev_context,
265 std::move(delete_and_done), sync_dst_compute);
266 };
267 CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
268 cpu_tensor, send_dev_context,
269 std::move(then_copy_to_other_device));
270 return;
271 }
272
273 // E.g., gpu -> cpu
274 if (non_cpu_src && !non_cpu_dst) {
275 // Device to host copy.
276 CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
277 output, send_dev_context, std::move(done));
278 return;
279 }
280
281 // E.g., cpu -> gpu
282 if (!non_cpu_src && non_cpu_dst) {
283 // Host to Device copy.
284 CopyHostToDevice(input, cpu_allocator, out_allocator, edge_name, dst,
285 output, recv_dev_context, std::move(done),
286 sync_dst_compute);
287 return;
288 }
289
290 // cpu -> cpu
291 CHECK(!non_cpu_src && !non_cpu_dst);
292 *output = *input;
293 done(Status::OK());
294 }
295
296 // static
Register(DeviceType sender_device_type,DeviceType receiver_device_type,CopyFunction copy_function)297 Status CopyTensor::Register(DeviceType sender_device_type,
298 DeviceType receiver_device_type,
299 CopyFunction copy_function) {
300 std::vector<RegistrationInfo>* registry = MutableRegistry();
301 registry->emplace_back(sender_device_type, receiver_device_type,
302 copy_function);
303 return Status::OK();
304 }
305
306 namespace {
307
308 // The following registrations enable a DT_VARIANT tensor element that contains
309 // a wrapped `tensorflow::Tensor` to be copied between devices.
WrappedTensorDeviceCopy(const Tensor & from,Tensor * to,const UnaryVariantOpRegistry::AsyncTensorDeviceCopyFn & copy)310 static Status WrappedTensorDeviceCopy(
311 const Tensor& from, Tensor* to,
312 const UnaryVariantOpRegistry::AsyncTensorDeviceCopyFn& copy) {
313 if (DMAHelper::CanUseDMA(&from)) {
314 TF_RETURN_IF_ERROR(copy(from, to));
315 } else {
316 *to = from;
317 }
318
319 return Status::OK();
320 }
321
322 #define REGISTER_WRAPPED_TENSOR_COPY(DIRECTION) \
323 INTERNAL_REGISTER_UNARY_VARIANT_DEVICE_COPY_FUNCTION( \
324 Tensor, DIRECTION, WrappedTensorDeviceCopy)
325
326 REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::HOST_TO_DEVICE);
327 REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::DEVICE_TO_HOST);
328 REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::DEVICE_TO_DEVICE);
329
330 } // namespace
331
CopyDeviceToHost(const Tensor * input,Allocator * cpu_allocator,Allocator * out_allocator,StringPiece edge_name,Device * src,Tensor * output,DeviceContext * send_dev_context,StatusCallback done)332 void CopyDeviceToHost(const Tensor* input, Allocator* cpu_allocator,
333 Allocator* out_allocator, StringPiece edge_name,
334 Device* src, Tensor* output,
335 DeviceContext* send_dev_context, StatusCallback done) {
336 if (input->dtype() == DT_VARIANT) {
337 Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
338 auto* status_cb = new ReffedStatusCallback(std::move(done));
339 core::ScopedUnref status_cb_unref(status_cb);
340
341 auto wrapped_done = [status_cb](const Status& s) {
342 status_cb->UpdateStatus(s);
343 status_cb->Unref();
344 };
345 auto copier =
346 [edge_name, src, send_dev_context, out_allocator, status_cb,
347 cpu_allocator, wrapped_done = std::move(wrapped_done)](
348 const Tensor& from, Tensor* to) {
349 if (from.dtype() == DT_VARIANT) {
350 status_cb->Ref();
351 CopyDeviceToHost(&from, cpu_allocator, out_allocator, edge_name,
352 src, to, send_dev_context, wrapped_done);
353 return Status::OK();
354 } else {
355 if (!DMAHelper::CanUseDMA(&from)) {
356 Status err = errors::InvalidArgument(
357 "During Variant Device->Host Copy: "
358 "non-DMA-copy attempted of tensor type: ",
359 DataTypeString(from.dtype()));
360 status_cb->UpdateStatus(err);
361 return err;
362 }
363 if (status_cb->ok()) {
364 status_cb->Ref();
365 *to = Tensor(out_allocator, from.dtype(), from.shape());
366 send_dev_context->CopyDeviceTensorToCPU(&from, edge_name, src, to,
367 wrapped_done);
368 return Status::OK();
369 } else {
370 return status_cb->status();
371 }
372 }
373 };
374
375 const Variant* v = input->flat<Variant>().data();
376 Variant* v_out = copy.flat<Variant>().data();
377 Status s_copy_init;
378 for (int64 i = 0; i < input->NumElements(); ++i) {
379 s_copy_init = VariantDeviceCopy(
380 VariantDeviceCopyDirection::DEVICE_TO_HOST, v[i], &v_out[i], copier);
381 if (!s_copy_init.ok()) {
382 status_cb->UpdateStatus(s_copy_init);
383 break;
384 }
385 }
386 if (s_copy_init.ok()) {
387 *output = std::move(copy);
388 }
389 } else if (input->dtype() == DT_RESOURCE) {
390 *output = *input;
391 done(Status::OK());
392 } else {
393 send_dev_context->CopyDeviceTensorToCPU(input, edge_name, src, output,
394 std::move(done));
395 }
396 }
397
398 } // namespace tensorflow
399