/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/copy_tensor.h"

#include <atomic>
#include <utility>
#include <vector>
#include "tensorflow/core/common_runtime/dma_helper.h"
#include "tensorflow/core/framework/variant_op_registry.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/tracing.h"
#include "tensorflow/core/util/reffed_status_callback.h"

namespace tensorflow {
namespace {

struct RegistrationInfo {
  RegistrationInfo(DeviceType s, DeviceType r, CopyTensor::CopyFunction cf)
      : sender_device_type(std::move(s)),
        receiver_device_type(std::move(r)),
        copy_function(cf) {}
  DeviceType sender_device_type;
  DeviceType receiver_device_type;
  CopyTensor::CopyFunction copy_function;
};

// We use a vector instead of a map since we expect there to be very
// few registrations.
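// The registry is heap-allocated and never deleted, so it stays valid even
// during static destruction.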
std::vector<RegistrationInfo>* MutableRegistry() {
  static std::vector<RegistrationInfo>* registry =
      new std::vector<RegistrationInfo>;
  return registry;
}

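// Copies `input` (resident in host memory) to `output` on device `dst`.
// DT_VARIANT tensors cannot be DMA'd wholesale: each Variant element is copied
// through its registered device-copy function instead. A ref-counted status
// callback collects the per-element results, so `done` fires only after every
// (possibly asynchronous) element copy has completed.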
void CopyHostToDevice(const Tensor* input, Allocator* cpu_allocator,
                      Allocator* out_allocator, StringPiece edge_name,
                      Device* dst, Tensor* output,
                      DeviceContext* recv_dev_context, StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [dst, recv_dev_context, out_allocator, status_cb, cpu_allocator,
         edge_name](StatusCallback wrapped_done_,
                    // Begin unbound arguments
                    const Tensor& from, Tensor* to) {
          if (from.dtype() == DT_VARIANT) {
            status_cb->Ref();
            CopyHostToDevice(&from, cpu_allocator, out_allocator, edge_name,
                             dst, to, recv_dev_context, wrapped_done_);
            return Status::OK();
          } else {
            if (!DMAHelper::CanUseDMA(&from)) {
              Status err = errors::InvalidArgument(
                  "During Variant Host->Device Copy: "
                  "non-DMA-copy attempted of tensor type: ",
                  DataTypeString(from.dtype()));
              status_cb->UpdateStatus(err);
              return err;
            }
            if (status_cb->ok()) {
              status_cb->Ref();
              *to = Tensor(out_allocator, from.dtype(), from.shape());
              recv_dev_context->CopyCPUTensorToDevice(&from, dst, to,
                                                      wrapped_done_);
              return Status::OK();
            } else {
              return status_cb->status();
            }
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init = VariantDeviceCopy(
          VariantDeviceCopyDirection::HOST_TO_DEVICE, v[i], &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    recv_dev_context->CopyCPUTensorToDevice(input, dst, output,
                                            std::move(done));
  }
}

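// Mirror image of CopyHostToDevice: copies `input` from device `src` into host
// memory, handling DT_VARIANT tensors element-by-element with the same
// ref-counted completion scheme.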
void CopyDeviceToHost(const Tensor* input, Allocator* cpu_allocator,
                      Allocator* out_allocator, StringPiece edge_name,
                      Device* src, Tensor* output,
                      DeviceContext* send_dev_context, StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [edge_name, src, send_dev_context, out_allocator, status_cb,
         cpu_allocator](StatusCallback wrapped_done_,
                        // Begin unbound arguments
                        const Tensor& from, Tensor* to) {
          if (from.dtype() == DT_VARIANT) {
            status_cb->Ref();
            CopyDeviceToHost(&from, cpu_allocator, out_allocator, edge_name,
                             src, to, send_dev_context, wrapped_done_);
            return Status::OK();
          } else {
            if (!DMAHelper::CanUseDMA(&from)) {
              Status err = errors::InvalidArgument(
                  "During Variant Device->Host Copy: "
                  "non-DMA-copy attempted of tensor type: ",
                  DataTypeString(from.dtype()));
              status_cb->UpdateStatus(err);
              return err;
            }
            if (status_cb->ok()) {
              status_cb->Ref();
              *to = Tensor(out_allocator, from.dtype(), from.shape());
              send_dev_context->CopyDeviceTensorToCPU(&from, edge_name, src, to,
                                                      wrapped_done_);
              return Status::OK();
            } else {
              return status_cb->status();
            }
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init = VariantDeviceCopy(
          VariantDeviceCopyDirection::DEVICE_TO_HOST, v[i], &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    send_dev_context->CopyDeviceTensorToCPU(input, edge_name, src, output,
                                            std::move(done));
  }
}

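// Copies `input` between two devices using the caller-supplied `copy_function`
// for this device-type pair. DT_VARIANT tensors are copied element-by-element,
// with each contained tensor delegated back to `copy_function`.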
void CopyDeviceToDevice(CopyTensor::CopyFunction copy_function,
                        Allocator* cpu_allocator, Allocator* out_allocator,
                        DeviceContext* send_dev_context,
                        DeviceContext* recv_dev_context, Device* src,
                        Device* dst, const AllocatorAttributes src_alloc_attr,
                        const AllocatorAttributes dst_alloc_attr,
                        const Tensor* input, Tensor* output,
                        int dev_to_dev_stream_index, StatusCallback done) {
  if (input->dtype() == DT_VARIANT) {
    Tensor copy(cpu_allocator, DT_VARIANT, input->shape());
    auto* status_cb = new ReffedStatusCallback(std::move(done));
    core::ScopedUnref status_cb_unref(status_cb);

    auto wrapped_done = [status_cb](const Status& s) {
      status_cb->UpdateStatus(s);
      status_cb->Unref();
    };
    auto copier = std::bind(
        [copy_function, src, dst, src_alloc_attr, dst_alloc_attr,
         recv_dev_context, send_dev_context, out_allocator, status_cb,
         dev_to_dev_stream_index](StatusCallback wrapped_done_,
                                  // Begin unbound arguments
                                  const Tensor& from, Tensor* to) {
          if (!DMAHelper::CanUseDMA(&from)) {
            Status err = errors::InvalidArgument(
                "During Variant Device->Device Copy: "
                "non-DMA-copy attempted of tensor type: ",
                DataTypeString(from.dtype()));
            status_cb->UpdateStatus(err);
            return err;
          }
          if (status_cb->ok()) {
            status_cb->Ref();
            *to = Tensor(out_allocator, from.dtype(), from.shape());
            copy_function(send_dev_context, recv_dev_context, src, dst,
                          src_alloc_attr, dst_alloc_attr, &from, to,
                          dev_to_dev_stream_index, std::move(wrapped_done_));
            return Status::OK();
          } else {
            return status_cb->status();
          }
        },
        std::move(wrapped_done), std::placeholders::_1, std::placeholders::_2);

    const Variant* v = input->flat<Variant>().data();
    Variant* v_out = copy.flat<Variant>().data();
    Status s_copy_init;
    for (int64 i = 0; i < input->NumElements(); ++i) {
      s_copy_init =
          VariantDeviceCopy(VariantDeviceCopyDirection::DEVICE_TO_DEVICE, v[i],
                            &v_out[i], copier);
      if (!s_copy_init.ok()) {
        status_cb->UpdateStatus(s_copy_init);
        break;
      }
    }
    if (s_copy_init.ok()) {
      *output = std::move(copy);
    }
  } else {
    copy_function(send_dev_context, recv_dev_context, src, dst, src_alloc_attr,
                  dst_alloc_attr, input, output, dev_to_dev_stream_index,
                  std::move(done));
  }
}

}  // namespace

// static
void CopyTensor::ViaDMA(StringPiece edge_name, DeviceContext* send_dev_context,
                        DeviceContext* recv_dev_context, Device* src,
                        Device* dst, const AllocatorAttributes src_alloc_attr,
                        const AllocatorAttributes dst_alloc_attr,
                        const Tensor* input, Tensor* output,
                        int dev_to_dev_stream_index, StatusCallback done) {
  tracing::ScopedAnnotation annotation(edge_name);
  VLOG(1) << "Copy " << edge_name;

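  // A tensor whose allocator attributes request host memory is treated as
  // living on the CPU, regardless of the device it is nominally assigned to.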
  const DeviceType src_device_type(
      src_alloc_attr.on_host() ? DEVICE_CPU : src->attributes().device_type());
  const DeviceType dst_device_type(
      dst_alloc_attr.on_host() ? DEVICE_CPU : dst->attributes().device_type());
  const bool non_cpu_src = src_device_type != DeviceType(DEVICE_CPU);
  const bool non_cpu_dst = dst_device_type != DeviceType(DEVICE_CPU);

  // TODO(phawkins): choose an allocator optimal for both the src and dst
  // devices, not just the src device.
  AllocatorAttributes host_alloc_attrs;
  host_alloc_attrs.set_gpu_compatible(true);
  host_alloc_attrs.set_on_host(true);
  Allocator* cpu_allocator = src->GetAllocator(host_alloc_attrs);
  Allocator* out_allocator = dst->GetAllocator(dst_alloc_attr);

  // E.g., gpu -> gpu
  if (non_cpu_src && non_cpu_dst) {
    // Device to device copy.  Look through registry for an appropriate
    // CopyFunction.
    std::vector<RegistrationInfo>* registry = MutableRegistry();
    for (const RegistrationInfo& ri : *registry) {
      if (ri.sender_device_type == src_device_type &&
          ri.receiver_device_type == dst_device_type) {
        CopyDeviceToDevice(ri.copy_function, cpu_allocator, out_allocator,
                           send_dev_context, recv_dev_context, src, dst,
                           src_alloc_attr, dst_alloc_attr, input, output,
                           dev_to_dev_stream_index, std::move(done));
        return;
      }
    }

    // Fall back to copying via the host.
    VLOG(1) << "No function registered to copy from devices of type "
            << src_device_type.type() << " to devices of type "
            << dst_device_type.type()
            << ". Falling back to copying via the host.";

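    // Stage the copy through a temporary host tensor: first device -> host
    // into `cpu_tensor`, then host -> device into `output`. The chained
    // callbacks delete `cpu_tensor` before invoking the caller's `done`.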
    Tensor* cpu_tensor =
        new Tensor(cpu_allocator, input->dtype(), input->shape());
    std::function<void(const Status&)> delete_and_done = std::bind(
        [cpu_tensor](StatusCallback done_,
                     // Begin unbound arguments.
                     const Status& status) {
          delete cpu_tensor;
          done_(status);
        },
        std::move(done), std::placeholders::_1);
    std::function<void(const Status&)> then_copy_to_other_device = std::bind(
        [delete_and_done, recv_dev_context, cpu_tensor, cpu_allocator,
         out_allocator, edge_name, dst, output](StatusCallback delete_and_done_,
                                                // Begin unbound arguments.
                                                Status status) {
          if (!status.ok()) {
            delete_and_done_(status);
            return;
          }
          CopyHostToDevice(cpu_tensor, cpu_allocator, out_allocator, edge_name,
                           dst, output, recv_dev_context,
                           std::move(delete_and_done_));
        },
        std::move(delete_and_done), std::placeholders::_1);
    CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
                     cpu_tensor, send_dev_context,
                     std::move(then_copy_to_other_device));
    return;
  }

  // E.g., gpu -> cpu
  if (non_cpu_src && !non_cpu_dst) {
    // Device to host copy.
    CopyDeviceToHost(input, cpu_allocator, out_allocator, edge_name, src,
                     output, send_dev_context, std::move(done));
    return;
  }

  // E.g., cpu -> gpu
  if (!non_cpu_src && non_cpu_dst) {
    // Host to device copy.
    CopyHostToDevice(input, cpu_allocator, out_allocator, edge_name, dst,
                     output, recv_dev_context, std::move(done));
    return;
  }

  // cpu -> cpu
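  // Tensor assignment is shallow: both tensors share the same reference-counted
  // buffer, so a CPU -> CPU "copy" moves no data.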
  CHECK(!non_cpu_src && !non_cpu_dst);
  *output = *input;
  done(Status::OK());
}

// static
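// Appends an entry to the process-wide registry. No duplicate checking is
// performed; ViaDMA uses the first matching entry it finds.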
Status CopyTensor::Register(DeviceType sender_device_type,
                            DeviceType receiver_device_type,
                            CopyFunction copy_function) {
  std::vector<RegistrationInfo>* registry = MutableRegistry();
  registry->emplace_back(sender_device_type, receiver_device_type,
                         copy_function);
  return Status::OK();
}

namespace {

// The following registrations enable a DT_VARIANT tensor element that contains
// a wrapped `tensorflow::Tensor` to be copied between devices.
static Status WrappedTensorDeviceCopy(
    const Tensor& from, Tensor* to,
    const UnaryVariantOpRegistry::AsyncTensorDeviceCopyFn& copy) {
  if (from.dtype() == DT_VARIANT) {
    // TODO(b/116349787): Implement support for nested variants.
    return errors::Unimplemented(
        "Support for copying nested variants to device has not yet been "
        "implemented.");
  } else if (DMAHelper::CanUseDMA(&from)) {
    TF_RETURN_IF_ERROR(copy(from, to));
  } else {
    *to = from;
  }

  return Status::OK();
}

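// Registers WrappedTensorDeviceCopy as the device-copy function for a wrapped
// `Tensor` stored in a Variant, for the given copy direction.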
#define REGISTER_WRAPPED_TENSOR_COPY(DIRECTION)         \
  INTERNAL_REGISTER_UNARY_VARIANT_DEVICE_COPY_FUNCTION( \
      Tensor, DIRECTION, WrappedTensorDeviceCopy)

REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::HOST_TO_DEVICE);
REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::DEVICE_TO_HOST);
REGISTER_WRAPPED_TENSOR_COPY(VariantDeviceCopyDirection::DEVICE_TO_DEVICE);

}  // namespace

}  // namespace tensorflow