/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/gpu/gpu_util.h"

#include "tensorflow/core/common_runtime/copy_tensor.h"
#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/device/device_event_mgr.h"
#include "tensorflow/core/common_runtime/dma_helper.h"
#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h"
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor.pb.h"
#include "tensorflow/core/framework/tensor_reference.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/hash/hash.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/tensor_coding.h"
#include "tensorflow/core/profiler/lib/scoped_annotation.h"
#include "tensorflow/core/util/util.h"

// IMPLEMENTATION NOTE:
//
// 1. Within this module, we intentionally LOG(FATAL) if any stream
//    involved in a memcpy becomes !stream->ok(), because the TF process
//    today (1/2016) cannot properly recover from such an error.
//
// 2. When a 0-size tensor is being copied, we should not schedule a
//    ThenMemcpy, since there are no bytes to move. However, we must
//    ensure causal ordering by arranging for the copy-done callback to
//    happen after all activities previously scheduled on the given
//    stream have finished.

// If this needs to be runtime configurable, consider adding options to
// ConfigProto.
const int64_t FLAGS_brain_gpu_util_debug_string_maxlen = 128;
extern bool FLAGS_brain_gpu_record_mem_types;
namespace tensorflow {

using se::DeviceMemoryBase;
using se::Stream;

Status PrepareCopy(Device* device, const DeviceContext* ctx, const Tensor& src,
                   const Tensor* dst,
                   const DeviceBase::AcceleratorDeviceInfo** dev_info,
                   se::Stream** stream) {
  if (device == nullptr) {
    return errors::Internal("Unexpected null device.");
  }
  auto di = device->tensorflow_accelerator_device_info();
  if (di == nullptr) {
    return errors::Internal("Unexpected null device info.");
  }
  *dev_info = di;
  if (ctx == nullptr) {
    return errors::Internal("Unexpected null device context.");
  }
  auto gs = static_cast<const GPUDeviceContext*>(ctx)->stream();
  if (gs == nullptr) {
    return errors::Internal("No gpu stream is available.");
  }
  *stream = gs;
  if (dst != nullptr) {
    if (src.dtype() != dst->dtype()) {
      return errors::Internal("Can't copy a tensor of ",
                              DataTypeString(src.dtype()), " into a tensor of ",
                              DataTypeString(dst->dtype()));
    }
    if (src.TotalBytes() != dst->TotalBytes()) {
      return errors::Internal("Can't copy ", src.TotalBytes(),
                              " bytes of a tensor into another with ",
                              dst->TotalBytes(), " bytes buffer.");
    }
    if ((src.TotalBytes() > 0) && !src.IsInitialized()) {
      return errors::Internal("Src tensor is not initialized.");
    }
    if ((dst->TotalBytes() > 0) && !dst->IsInitialized()) {
      return errors::Internal("Dst tensor is not initialized.");
    }
  }
  if (!DMAHelper::CanUseDMA(&src)) {
    return errors::Internal("GPU copy from non-DMA ",
                            DataTypeString(src.dtype()), " tensor");
  }
  return OkStatus();
}

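// Returns the raw backing buffer of a tensor via DMAHelper, usable as the
// source or destination of a device memcpy.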
void* GetBase(const Tensor* src) {
  return const_cast<void*>(DMAHelper::base(src));
}

void* GetBase(Tensor* dst) { return DMAHelper::base(dst); }

/*static*/
void GPUUtil::SetProtoFromGPU(const Tensor& tensor, Device* dev,
                              const DeviceContext* device_context,
                              TensorProto* proto, bool is_dead,
                              StatusCallback done) {
  VLOG(1) << "SetProtoFromGPU device_context " << device_context;
  const DeviceBase::AcceleratorDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(dev, device_context, tensor, nullptr, &dev_info,
                         &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  auto send_device_to_host_stream =
      static_cast<const GPUDeviceContext*>(device_context)
          ->device_to_host_stream();
  if (send_device_to_host_stream == nullptr) {
    done(errors::Internal("No send gpu copy-out-stream is available."));
    return;
  }
  // Wait for the sender's main stream to make sure the data are available.
  send_device_to_host_stream->ThenWaitFor(send_stream);

  // Tensor values need to be copied from GPU to CPU ram so that
  // we can build the protobuf response for a RecvTensor RPC.
  // "device context" identifies the stream where the _Send op executed.
  proto->set_dtype(tensor.dtype());
  tensor.shape().AsProto(proto->mutable_tensor_shape());

  // Prepare a proto with the right data buf size, and DMA the data
  // over from the GPU buffer.  Note that 0-size tensors do not have a
  // backing buffer.
  Allocator* alloc = nullptr;
  char* buf = nullptr;
  const int64_t total_bytes = is_dead ? 0 : tensor.TotalBytes();
  if (total_bytes > 0) {
    profiler::ScopedAnnotation annotation("SetProtoFromGPU");
    alloc = GPUProcessState::singleton()->GetGpuHostAllocator(0);
    buf = static_cast<char*>(
        alloc->AllocateRaw(Allocator::kAllocatorAlignment, total_bytes));
    if (LogMemory::IsEnabled()) {
      LogMemory::RecordRawAllocation("SetProtoFromGPU",
                                     LogMemory::PROTO_BUFFER_STEP_ID,
                                     total_bytes, buf, alloc);
    }
    void* src_ptr = GetBase(&tensor);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    send_device_to_host_stream->ThenMemcpy(buf, gpu_src_ptr, total_bytes);
  }
  // Use of tensor may outlive stack scope, so keep a ref.
  TensorReference tensor_ref(tensor);
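  // The EventMgr runs this callback only after all work enqueued on the
  // stream so far, including the memcpy above, has completed.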
  dev_info->event_mgr->ThenExecute(
      send_device_to_host_stream, [send_device_to_host_stream, done, proto, buf,
                                   total_bytes, alloc, tensor_ref]() {
        if (!send_device_to_host_stream->ok()) {
          LOG(FATAL) << "SetProtoFromGPU: GPU Memcpy failed";
        }
        tensor_ref.Unref();
        if (total_bytes > 0) {
          port::CopyFromArray(proto->mutable_tensor_content(), buf,
                              total_bytes);
          if (LogMemory::IsEnabled()) {
            LogMemory::RecordRawDeallocation("SetProtoFromGPU",
                                             LogMemory::PROTO_BUFFER_STEP_ID,
                                             buf, alloc, false);
          }
          alloc->DeallocateRaw(buf);
        }
        done(OkStatus());
      });
}

// static
void GPUUtil::DeviceToDeviceCopy(
    DeviceContext* send_dev_context, DeviceContext* recv_dev_context,
    Device* src, Device* dst, AllocatorAttributes src_alloc_attr,
    AllocatorAttributes dst_alloc_attr, const Tensor* input, Tensor* output,
    int dev_to_dev_stream_index, StatusCallback done) {
  const DeviceBase::AcceleratorDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(src, send_dev_context, *input, output, &dev_info,
                         &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }
  auto send_device_to_device_stream =
      static_cast<const GPUDeviceContext*>(send_dev_context)
          ->device_to_device_stream(dev_to_dev_stream_index);
  if (send_device_to_device_stream == nullptr) {
    done(errors::Internal("No send gpu copy-out-stream is available."));
    return;
  }
  // Wait for the main stream on the sender to make sure the result is
  // available.
  send_device_to_device_stream->ThenWaitFor(send_stream);

  const int64_t total_bytes = input->TotalBytes();
  if (total_bytes > 0) {
    void* src_ptr = GetBase(input);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    void* dst_ptr = GetBase(output);
    DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes);
    auto recv_stream =
        static_cast<const GPUDeviceContext*>(recv_dev_context)->stream();
    if (recv_stream == nullptr) {
      done(errors::Internal("No recv gpu stream is available."));
      return;
    }
    // Since we want to use the memory from recv_stream in the
    // send_device_to_device_stream, add a dependency to make sure the memory
    // is truly free.
    // TODO(zhengxq): remove this dependency when we switch to a better way
    // to make sure the memory is free.
    send_device_to_device_stream->ThenWaitFor(recv_stream);

    VLOG(2) << "src_ptr " << src_ptr << " dst_ptr " << dst_ptr;
    send_device_to_device_stream->ThenMemcpy(&gpu_dst_ptr, gpu_src_ptr,
                                             total_bytes);
  }

  // Use of input may outlive stack scope, so keep a ref.
  TensorReference input_ref(*input);
  dev_info->event_mgr->ThenExecute(
      send_device_to_device_stream,
      [done, send_device_to_device_stream, input_ref]() {
        input_ref.Unref();
        if (!send_device_to_device_stream->ok()) {
          LOG(FATAL) << "GPU->GPU Memcpy failed";
        }
        done(OkStatus());
      });
  send_dev_context->MaintainLifetimeOnStream(input,
                                             send_device_to_device_stream);
}

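// Register DeviceToDeviceCopy as the function CopyTensor uses for
// GPU->GPU transfers.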
static CopyTensor::Registration register_gpu_gpu_copy(
    DEVICE_GPU, DEVICE_GPU, GPUUtil::DeviceToDeviceCopy);

namespace {

// Returns whether staging is needed based on tensor buffer's memory type.
bool NeedStaging(const Tensor* tensor) {
  // Only stage data if the host tensor is on pageable memory.
  // If the memory type is unknown, we fall back to the GPU driver to handle
  // staging if needed.
  return tensor->GetMemoryType() == AllocatorMemoryType::kHostPageable;
}

}  // namespace

// static
void GPUUtil::CopyGPUTensorToCPU(Device* gpu_device,
                                 const DeviceContext* device_context,
                                 const Tensor* gpu_tensor, Tensor* cpu_tensor,
                                 StatusCallback done) {
  VLOG(1) << "CopyGPUTensorToCPU";
  const DeviceBase::AcceleratorDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(gpu_device, device_context, *gpu_tensor, cpu_tensor,
                         &dev_info, &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  auto send_device_to_host_stream =
      static_cast<const GPUDeviceContext*>(device_context)
          ->device_to_host_stream();
  if (send_device_to_host_stream == nullptr) {
    done(errors::Internal("No send gpu copy-out-stream is available."));
    return;
  }
  // Wait for the sender's main stream to make sure the data are available.
  send_device_to_host_stream->ThenWaitFor(send_stream);

  const int64_t total_bytes = gpu_tensor->TotalBytes();
  if (total_bytes > 0) {
    void* src_ptr = GetBase(gpu_tensor);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    void* dst_ptr = GetBase(cpu_tensor);
    send_device_to_host_stream->ThenMemcpy(dst_ptr, gpu_src_ptr, total_bytes);
  }
  // Use of the input may outlive stack scope, so keep a ref.
  TensorReference input_ref(*gpu_tensor);
  dev_info->event_mgr->ThenExecute(
      send_device_to_host_stream,
      [send_device_to_host_stream, done, input_ref]() {
        if (!send_device_to_host_stream->ok()) {
          LOG(FATAL) << "GPU->CPU Memcpy failed";
        }
        input_ref.Unref();
        done(OkStatus());
      });
}

/*  static */
void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
                                 const DeviceContext* device_context,
                                 Device* gpu_device, Tensor* gpu_tensor,
                                 StatusCallback done, bool sync_dst_compute) {
  VLOG(1) << "CopyCPUTensorToGPU";
  const DeviceBase::AcceleratorDeviceInfo* dev_info = nullptr;
  se::Stream* recv_stream = nullptr;
  Status s = PrepareCopy(gpu_device, device_context, *cpu_tensor, gpu_tensor,
                         &dev_info, &recv_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  auto recv_host_to_device_stream =
      static_cast<const GPUDeviceContext*>(device_context)
          ->host_to_device_stream();
  if (recv_host_to_device_stream == nullptr) {
    done(errors::Internal("No recv gpu host-to-device stream is available."));
    return;
  }
  // Wait for the recv-stream to make sure the buffer is truly available.
  if (sync_dst_compute) {
    recv_host_to_device_stream->ThenWaitFor(recv_stream);
  }

  const int64_t total_bytes = cpu_tensor->TotalBytes();

  bool do_staging = false;
  void* staging_buffer = nullptr;
  Allocator* host_memory_allocator = device_context->host_memory_allocator();

  // Use of cpu_tensor may outlive stack scope, so keep a ref.
  TensorReference input_ref(*cpu_tensor);

  // Note that 0-size tensors have no backing buffer.
  if (total_bytes > 0) {
    void* src_ptr = GetBase(cpu_tensor);
    void* dst_ptr = GetBase(gpu_tensor);
    DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes);

    if (NeedStaging(cpu_tensor)) {
      if (host_memory_allocator == nullptr) {
        LOG_FIRST_N(WARNING, 1)
            << "No host memory allocator is available to "
               "stage data for CPU->GPU transfer. Staging will be skipped.";
      } else {
        do_staging = true;
      }
    }

    if (do_staging) {
      staging_buffer = host_memory_allocator->AllocateRaw(
          tensorflow::Allocator::kAllocatorAlignment, total_bytes);
      std::memcpy(staging_buffer, src_ptr, total_bytes);
      input_ref.Unref();

      recv_host_to_device_stream->ThenMemcpy(&gpu_dst_ptr, staging_buffer,
                                             total_bytes);
    } else {
      recv_host_to_device_stream->ThenMemcpy(&gpu_dst_ptr, src_ptr,
                                             total_bytes);
    }
  }

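  // In the staging path the CPU tensor was already released right after its
  // contents were copied into the staging buffer, so the callback only frees
  // that buffer; otherwise it releases the tensor reference here.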
  dev_info->event_mgr->ThenExecute(
      recv_host_to_device_stream,
      [recv_host_to_device_stream, done, input_ref, do_staging, staging_buffer,
       host_memory_allocator]() {
        if (do_staging) {
          host_memory_allocator->DeallocateRaw(staging_buffer);
        } else {
          input_ref.Unref();
        }
        if (!recv_host_to_device_stream->ok()) {
          LOG(FATAL) << "CPU->GPU Memcpy failed";
        }
        done(OkStatus());
      });
}

Status GPUUtil::Sync(Device* gpu_device) {
  VLOG(1) << "GPUUtil::Sync";
  auto* dev_info = gpu_device->tensorflow_accelerator_device_info();
  if (!dev_info) {
    return errors::Internal("Failed to find dest device GPUDeviceInfo");
  }
  return dev_info->stream->BlockHostUntilDone();
}

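// Unlike Sync(), which blocks only until the device's main stream has
// drained, SyncAll() waits for all activity on the underlying StreamExecutor.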
Status GPUUtil::SyncAll(Device* gpu_device) {
  VLOG(1) << "GPUUtil::SyncAll";
  auto* dev_info = gpu_device->tensorflow_accelerator_device_info();
  if (!dev_info) {
    return errors::Internal("Failed to find dest device GPUDeviceInfo");
  }
  if (!dev_info->stream->parent()->SynchronizeAllActivity() ||
      !dev_info->stream->ok()) {
    return errors::Internal("GPU sync failed");
  }
  return OkStatus();
}

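// Produces a short "<ptr>:<bytes>" debug dump of at most
// FLAGS_brain_gpu_util_debug_string_maxlen bytes of the tensor, reading the
// data back from device memory when the device has accelerator info.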
string GPUUtil::MemoryDebugString(const Device* device, Tensor* tensor) {
  string ret;
  CHECK(tensor);
  const int64_t num_bytes = std::min<int64_t>(
      FLAGS_brain_gpu_util_debug_string_maxlen, tensor->TotalBytes());
  void* ptr = (num_bytes > 0) ? GetBase(tensor) : nullptr;
  strings::Appendf(&ret, "%p:", ptr);
  if (num_bytes > 0) {
    auto* dev_info = device->tensorflow_accelerator_device_info();
    if (!dev_info) {
      strings::StrAppend(
          &ret, PrintMemory(reinterpret_cast<const char*>(ptr), num_bytes));
    } else {
      string buf;
      buf.resize(num_bytes);
      DeviceMemoryBase gpu_ptr(ptr, num_bytes);
      auto s = dev_info->stream->parent()->SynchronousMemcpyD2H(
          gpu_ptr, num_bytes, &*buf.begin());
      strings::StrAppend(&ret, PrintMemory(&*buf.begin(), num_bytes));
    }
  }
  return ret;
}

// TODO(pbar) Checksum is called from places without a valid device context.
uint64 GPUUtil::Checksum(Device* gpu_device,
                         const DeviceContext* device_context,
                         const Tensor& tensor) {
  Tensor copy(tensor.dtype(), tensor.shape());
  Status s;
  Notification n;
  CopyGPUTensorToCPU(gpu_device, device_context, &tensor, &copy,
                     [&s, &n](Status status) {
                       s.Update(status);
                       n.Notify();
                     });
  n.WaitForNotification();
  CHECK(s.ok()) << s;
  return Checksum(copy);
}

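// Host-side checksum: CHECK-fails if any element of the buffer, interpreted
// as floats, is NaN, then returns a Hash64 of the raw tensor bytes.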
uint64 GPUUtil::Checksum(const Tensor& tensor) {
  const float* fptr = reinterpret_cast<const float*>(GetBase(&tensor));
  size_t num_bytes = tensor.TotalBytes();
  size_t num_floats = num_bytes / sizeof(float);
  for (size_t i = 0; i < num_floats; ++i) {
    CHECK(!std::isnan(fptr[i])) << " i " << i;
  }
  // TODO(tucker): consider using crc32c instead.
  return Hash64(reinterpret_cast<const char*>(GetBase(&tensor)),
                tensor.TotalBytes(), 0);
}

// static
void GPUUtil::CopyGPUTensorToSameGPU(Device* gpu_device,
                                     const DeviceContext* device_context,
                                     const Tensor* src_gpu_tensor,
                                     Tensor* dst_gpu_tensor,
                                     StatusCallback done) {
  VLOG(1) << "CopyGPUTensorToSameGPU";
  const DeviceBase::AcceleratorDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(gpu_device, device_context, *src_gpu_tensor,
                         dst_gpu_tensor, &dev_info, &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  const int64_t total_bytes = src_gpu_tensor->TotalBytes();
  if (total_bytes > 0) {
    void* src_ptr = GetBase(src_gpu_tensor);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    void* dst_ptr = GetBase(dst_gpu_tensor);
    DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes);
    send_stream->ThenMemcpy(&gpu_dst_ptr, gpu_src_ptr, total_bytes);
  }

  done(OkStatus());
}

}  // namespace tensorflow