/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/gpu/gpu_util.h"

#include "tensorflow/core/common_runtime/copy_tensor.h"
#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/device/device_event_mgr.h"
#include "tensorflow/core/common_runtime/dma_helper.h"
#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h"
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor.pb.h"
#include "tensorflow/core/framework/tensor_reference.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/hash/hash.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/tensor_coding.h"
#include "tensorflow/core/profiler/lib/scoped_annotation.h"
#include "tensorflow/core/util/util.h"
// IMPLEMENTATION NOTE:
//
// 1. Within this module, we intentionally LOG(FATAL) if any stream
//    involved in memcpy becomes !stream->ok(), because the TF process
//    today (1/2016) cannot properly recover from such an error.
//
// 2. When a 0-size tensor is being copied, we should not schedule a
//    ThenMemcpy since there are no bytes to move. However, we must
//    ensure causal ordering by arranging for the copy-done callback to
//    happen after all activities scheduled on the given stream have
//    finished.

// If this needs to be runtime configurable, consider adding options to
// ConfigProto.
const int64_t FLAGS_brain_gpu_util_debug_string_maxlen = 128;
extern bool FLAGS_brain_gpu_record_mem_types;

namespace tensorflow {

using se::DeviceMemoryBase;
using se::Stream;

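// Validates the preconditions for a GPU copy involving `src` (and, when
// non-null, `dst`): a non-null device with accelerator device info, a
// non-null device context with a GPU stream, matching dtypes and byte sizes,
// initialized buffers, and a DMA-capable source. On success the device info
// and the device's main compute stream are returned through `dev_info` and
// `stream`.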
Status PrepareCopy(Device* device, const DeviceContext* ctx, const Tensor& src,
                   const Tensor* dst,
                   const DeviceBase::AcceleratorDeviceInfo** dev_info,
                   se::Stream** stream) {
  if (device == nullptr) {
    return errors::Internal("Unexpected null device.");
  }
  auto di = device->tensorflow_accelerator_device_info();
  if (di == nullptr) {
    return errors::Internal("Unexpected null device info.");
  }
  *dev_info = di;
  if (ctx == nullptr) {
    return errors::Internal("Unexpected null device context.");
  }
  auto gs = static_cast<const GPUDeviceContext*>(ctx)->stream();
  if (gs == nullptr) {
    return errors::Internal("No gpu stream is available.");
  }
  *stream = gs;
  if (dst != nullptr) {
    if (src.dtype() != dst->dtype()) {
      return errors::Internal("Can't copy a tensor of ",
                              DataTypeString(src.dtype()), " into a tensor of ",
                              DataTypeString(dst->dtype()));
    }
    if (src.TotalBytes() != dst->TotalBytes()) {
      return errors::Internal("Can't copy ", src.TotalBytes(),
                              " bytes of a tensor into another with ",
                              dst->TotalBytes(), " bytes buffer.");
    }
    if ((src.TotalBytes() > 0) && !src.IsInitialized()) {
      return errors::Internal("Src tensor is not initialized.");
    }
    if ((dst->TotalBytes() > 0) && !dst->IsInitialized()) {
      return errors::Internal("Dst tensor is not initialized.");
    }
  }
  if (!DMAHelper::CanUseDMA(&src)) {
    return errors::Internal("GPU copy from non-DMA ",
                            DataTypeString(src.dtype()), " tensor");
  }
  return OkStatus();
}

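// Returns the base address of the tensor's backing buffer, as used for DMA.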
void* GetBase(const Tensor* src) {
  return const_cast<void*>(DMAHelper::base(src));
}

void* GetBase(Tensor* dst) { return DMAHelper::base(dst); }

/*static*/
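// Serializes a GPU-resident tensor into `proto`: the dtype and shape are
// filled in immediately, while the tensor contents (if any) are copied
// asynchronously into a pinned host buffer on the device-to-host stream and
// appended to the proto from the EventMgr callback before `done` is invoked.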
void GPUUtil::SetProtoFromGPU(const Tensor& tensor, Device* dev,
                              const DeviceContext* device_context,
                              TensorProto* proto, bool is_dead,
                              StatusCallback done) {
  VLOG(1) << "SetProtoFromGPU device_context " << device_context;
  const DeviceBase::AcceleratorDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(dev, device_context, tensor, nullptr, &dev_info,
                         &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  auto send_device_to_host_stream =
      static_cast<const GPUDeviceContext*>(device_context)
          ->device_to_host_stream();
  if (send_device_to_host_stream == nullptr) {
    done(errors::Internal("No send gpu copy-out-stream is available."));
    return;
  }
  // Wait for the sender's main stream to make sure the data are available.
  send_device_to_host_stream->ThenWaitFor(send_stream);

  // Tensor values need to be copied from GPU to CPU RAM so that
  // we can build the protobuf response for a RecvTensor RPC.
  // "device context" identifies the stream where the _Send op executed.
  proto->set_dtype(tensor.dtype());
  tensor.shape().AsProto(proto->mutable_tensor_shape());

  // Prepare a proto with the right data buf size, and DMA the data
  // over from the GPU buffer. Note that 0-size tensors do not have a
  // backing buffer.
  Allocator* alloc = nullptr;
  char* buf = nullptr;
  const int64_t total_bytes = is_dead ? 0 : tensor.TotalBytes();
  if (total_bytes > 0) {
    profiler::ScopedAnnotation annotation("SetProtoFromGPU");
    alloc = GPUProcessState::singleton()->GetGpuHostAllocator(0);
    buf = static_cast<char*>(
        alloc->AllocateRaw(Allocator::kAllocatorAlignment, total_bytes));
    if (LogMemory::IsEnabled()) {
      LogMemory::RecordRawAllocation("SetProtoFromGPU",
                                     LogMemory::PROTO_BUFFER_STEP_ID,
                                     total_bytes, buf, alloc);
    }
    void* src_ptr = GetBase(&tensor);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    send_device_to_host_stream->ThenMemcpy(buf, gpu_src_ptr, total_bytes);
  }
  // Use of tensor may outlive stack scope, so keep a ref.
  TensorReference tensor_ref(tensor);
  dev_info->event_mgr->ThenExecute(
      send_device_to_host_stream, [send_device_to_host_stream, done, proto, buf,
                                   total_bytes, alloc, tensor_ref]() {
        if (!send_device_to_host_stream->ok()) {
          LOG(FATAL) << "SetProtoFromGPU: GPU Memcpy failed";
        }
        tensor_ref.Unref();
        if (total_bytes > 0) {
          port::CopyFromArray(proto->mutable_tensor_content(), buf,
                              total_bytes);
          if (LogMemory::IsEnabled()) {
            LogMemory::RecordRawDeallocation("SetProtoFromGPU",
                                             LogMemory::PROTO_BUFFER_STEP_ID,
                                             buf, alloc, false);
          }
          alloc->DeallocateRaw(buf);
        }
        done(OkStatus());
      });
}

// static
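// Copies `input` on one GPU to `output` on another (or the same) GPU. The
// memcpy is enqueued on the sender's device-to-device stream after that
// stream has waited on both the sender's main stream (so the data has been
// produced) and the receiver's stream (so the destination buffer is free);
// `done` is invoked from an EventMgr callback once the copy has completed.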
void GPUUtil::DeviceToDeviceCopy(
    DeviceContext* send_dev_context, DeviceContext* recv_dev_context,
    Device* src, Device* dst, AllocatorAttributes src_alloc_attr,
    AllocatorAttributes dst_alloc_attr, const Tensor* input, Tensor* output,
    int dev_to_dev_stream_index, StatusCallback done) {
  const DeviceBase::AcceleratorDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(src, send_dev_context, *input, output, &dev_info,
                         &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }
  auto send_device_to_device_stream =
      static_cast<const GPUDeviceContext*>(send_dev_context)
          ->device_to_device_stream(dev_to_dev_stream_index);
  if (send_device_to_device_stream == nullptr) {
    done(errors::Internal("No send gpu copy-out-stream is available."));
    return;
  }
  // Wait for the main stream on the sender to make sure the result is
  // available.
  send_device_to_device_stream->ThenWaitFor(send_stream);

  const int64_t total_bytes = input->TotalBytes();
  if (total_bytes > 0) {
    void* src_ptr = GetBase(input);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    void* dst_ptr = GetBase(output);
    DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes);
    auto recv_stream =
        static_cast<const GPUDeviceContext*>(recv_dev_context)->stream();
    if (recv_stream == nullptr) {
      done(errors::Internal("No recv gpu stream is available."));
      return;
    }
    // Since we want to use the memory from recv_stream in the
    // send_device_to_device_stream, add a dependency to make sure the memory
    // is truly free.
    // TODO(zhengxq): remove this dependency when we switch to a better way
    // to make sure the memory is free.
    send_device_to_device_stream->ThenWaitFor(recv_stream);

    VLOG(2) << "src_ptr " << src_ptr << " dst_ptr " << dst_ptr;
    send_device_to_device_stream->ThenMemcpy(&gpu_dst_ptr, gpu_src_ptr,
                                             total_bytes);
  }

  // Use of input may outlive stack scope, so keep a ref.
  TensorReference input_ref(*input);
  dev_info->event_mgr->ThenExecute(
      send_device_to_device_stream,
      [done, send_device_to_device_stream, input_ref]() {
        input_ref.Unref();
        if (!send_device_to_device_stream->ok()) {
          LOG(FATAL) << "GPU->GPU Memcpy failed";
        }
        done(OkStatus());
      });
  send_dev_context->MaintainLifetimeOnStream(input,
                                             send_device_to_device_stream);
}

static CopyTensor::Registration register_gpu_gpu_copy(
    DEVICE_GPU, DEVICE_GPU, GPUUtil::DeviceToDeviceCopy);

namespace {

// Returns whether staging is needed based on the tensor buffer's memory type.
bool NeedStaging(const Tensor* tensor) {
  // Only stage data if the host tensor is in pageable memory. If the memory
  // type is unknown, we fall back to letting the GPU driver handle any
  // staging that may be needed.
  return tensor->GetMemoryType() == AllocatorMemoryType::kHostPageable;
}

}  // namespace

// static
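// Copies a GPU-resident tensor into a CPU tensor. After validation, the
// device-to-host stream waits on the sender's main stream, the memcpy is
// enqueued asynchronously, and `done` is invoked from an EventMgr callback
// once the copy has finished. A typical blocking caller (see
// GPUUtil::Checksum below) pairs it with a Notification, e.g.:
//
//   Notification n;
//   Status status;
//   GPUUtil::CopyGPUTensorToCPU(gpu_device, device_context, &gpu_tensor,
//                               &cpu_tensor, [&status, &n](Status s) {
//                                 status.Update(s);
//                                 n.Notify();
//                               });
//   n.WaitForNotification();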
void GPUUtil::CopyGPUTensorToCPU(Device* gpu_device,
                                 const DeviceContext* device_context,
                                 const Tensor* gpu_tensor, Tensor* cpu_tensor,
                                 StatusCallback done) {
  VLOG(1) << "CopyGPUTensorToCPU";
  const DeviceBase::AcceleratorDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(gpu_device, device_context, *gpu_tensor, cpu_tensor,
                         &dev_info, &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  auto send_device_to_host_stream =
      static_cast<const GPUDeviceContext*>(device_context)
          ->device_to_host_stream();
  if (send_device_to_host_stream == nullptr) {
    done(errors::Internal("No send gpu copy-out-stream is available."));
    return;
  }
  // Wait for the sender's main stream to make sure the data are available.
  send_device_to_host_stream->ThenWaitFor(send_stream);

  const int64_t total_bytes = gpu_tensor->TotalBytes();
  if (total_bytes > 0) {
    void* src_ptr = GetBase(gpu_tensor);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    void* dst_ptr = GetBase(cpu_tensor);
    send_device_to_host_stream->ThenMemcpy(dst_ptr, gpu_src_ptr, total_bytes);
  }
  // Use of the input may outlive stack scope, so keep a ref.
  TensorReference input_ref(*gpu_tensor);
  dev_info->event_mgr->ThenExecute(
      send_device_to_host_stream,
      [send_device_to_host_stream, done, input_ref]() {
        if (!send_device_to_host_stream->ok()) {
          LOG(FATAL) << "GPU->CPU Memcpy failed";
        }
        input_ref.Unref();
        done(OkStatus());
      });
}

/* static */
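// Copies a CPU tensor into a GPU-resident tensor on the host-to-device
// stream. If the source buffer lives in pageable host memory and a host
// memory allocator is available, the data is first staged into a pinned
// buffer so the DMA reads from page-locked memory; otherwise the copy is
// issued directly from the source buffer. When `sync_dst_compute` is true,
// the host-to-device stream first waits on the destination GPU's main
// stream so the destination buffer is safe to overwrite.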
void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
                                 const DeviceContext* device_context,
                                 Device* gpu_device, Tensor* gpu_tensor,
                                 StatusCallback done, bool sync_dst_compute) {
  VLOG(1) << "CopyCPUTensorToGPU";
  const DeviceBase::AcceleratorDeviceInfo* dev_info = nullptr;
  se::Stream* recv_stream = nullptr;
  Status s = PrepareCopy(gpu_device, device_context, *cpu_tensor, gpu_tensor,
                         &dev_info, &recv_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  auto recv_host_to_device_stream =
      static_cast<const GPUDeviceContext*>(device_context)
          ->host_to_device_stream();
  if (recv_host_to_device_stream == nullptr) {
    done(errors::Internal("No recv gpu copy-in-stream is available."));
    return;
  }
  // Wait for the recv-stream to make sure the buffer is truly available.
  if (sync_dst_compute) {
    recv_host_to_device_stream->ThenWaitFor(recv_stream);
  }

  const int64_t total_bytes = cpu_tensor->TotalBytes();

  bool do_staging = false;
  void* staging_buffer = nullptr;
  Allocator* host_memory_allocator = device_context->host_memory_allocator();

  // Use of cpu_tensor may outlive stack scope, so keep a ref.
  TensorReference input_ref(*cpu_tensor);

  // Note that 0-size tensors have no backing buffer.
  if (total_bytes > 0) {
    void* src_ptr = GetBase(cpu_tensor);
    void* dst_ptr = GetBase(gpu_tensor);
    DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes);

    if (NeedStaging(cpu_tensor)) {
      if (host_memory_allocator == nullptr) {
        LOG_FIRST_N(WARNING, 1)
            << "No host memory allocator is available to "
               "stage data for CPU->GPU transfer. Staging will be skipped.";
      } else {
        do_staging = true;
      }
    }

    if (do_staging) {
      staging_buffer = host_memory_allocator->AllocateRaw(
          tensorflow::Allocator::kAllocatorAlignment, total_bytes);
      std::memcpy(staging_buffer, src_ptr, total_bytes);
      input_ref.Unref();

      recv_host_to_device_stream->ThenMemcpy(&gpu_dst_ptr, staging_buffer,
                                             total_bytes);
    } else {
      recv_host_to_device_stream->ThenMemcpy(&gpu_dst_ptr, src_ptr,
                                             total_bytes);
    }
  }

  dev_info->event_mgr->ThenExecute(
      recv_host_to_device_stream,
      [recv_host_to_device_stream, done, input_ref, do_staging, staging_buffer,
       host_memory_allocator]() {
        if (do_staging) {
          host_memory_allocator->DeallocateRaw(staging_buffer);
        } else {
          input_ref.Unref();
        }
        if (!recv_host_to_device_stream->ok()) {
          LOG(FATAL) << "CPU->GPU Memcpy failed";
        }
        done(OkStatus());
      });
}

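// Blocks the caller until all work enqueued on the device's main compute
// stream has completed.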
Status GPUUtil::Sync(Device* gpu_device) {
  VLOG(1) << "GPUUtil::Sync";
  auto* dev_info = gpu_device->tensorflow_accelerator_device_info();
  if (!dev_info) {
    return errors::Internal("Failed to find dest device GPUDeviceInfo");
  }
  return dev_info->stream->BlockHostUntilDone();
}

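// Blocks the caller until all activity on the device has completed,
// returning an error if synchronization fails or the main stream is no
// longer ok.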
Status GPUUtil::SyncAll(Device* gpu_device) {
  VLOG(1) << "GPUUtil::SyncAll";
  auto* dev_info = gpu_device->tensorflow_accelerator_device_info();
  if (!dev_info) {
    return errors::Internal("Failed to find dest device GPUDeviceInfo");
  }
  if (!dev_info->stream->parent()->SynchronizeAllActivity() ||
      !dev_info->stream->ok()) {
    return errors::Internal("GPU sync failed");
  }
  return OkStatus();
}

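// Returns a debug string containing the tensor's buffer address followed by
// a printable dump of up to FLAGS_brain_gpu_util_debug_string_maxlen bytes.
// For tensors on a device with accelerator device info, the bytes are first
// copied to the host with a synchronous D2H memcpy.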
string GPUUtil::MemoryDebugString(const Device* device, Tensor* tensor) {
  string ret;
  CHECK(tensor);
  const int64_t num_bytes = std::min<int64_t>(
      FLAGS_brain_gpu_util_debug_string_maxlen, tensor->TotalBytes());
  void* ptr = (num_bytes > 0) ? GetBase(tensor) : nullptr;
  strings::Appendf(&ret, "%p:", ptr);
  if (num_bytes > 0) {
    auto* dev_info = device->tensorflow_accelerator_device_info();
    if (!dev_info) {
      strings::StrAppend(
          &ret, PrintMemory(reinterpret_cast<const char*>(ptr), num_bytes));
    } else {
      string buf;
      buf.resize(num_bytes);
      DeviceMemoryBase gpu_ptr(ptr, num_bytes);
      auto s = dev_info->stream->parent()->SynchronousMemcpyD2H(
          gpu_ptr, num_bytes, &*buf.begin());
      strings::StrAppend(&ret, PrintMemory(&*buf.begin(), num_bytes));
    }
  }
  return ret;
}

// TODO(pbar) Checksum is called from places without a valid device context.
uint64 GPUUtil::Checksum(Device* gpu_device,
                         const DeviceContext* device_context,
                         const Tensor& tensor) {
  Tensor copy(tensor.dtype(), tensor.shape());
  Status s;
  Notification n;
  CopyGPUTensorToCPU(gpu_device, device_context, &tensor, &copy,
                     [&s, &n](Status status) {
                       s.Update(status);
                       n.Notify();
                     });
  n.WaitForNotification();
  CHECK(s.ok()) << s;
  return Checksum(copy);
}

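// Computes a checksum over a host-resident tensor by hashing its raw bytes.
// Note that the buffer is also reinterpreted as floats and CHECKed for NaNs.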
uint64 GPUUtil::Checksum(const Tensor& tensor) {
  const float* fptr = reinterpret_cast<const float*>(GetBase(&tensor));
  size_t num_bytes = tensor.TotalBytes();
  size_t num_floats = num_bytes / sizeof(float);
  for (size_t i = 0; i < num_floats; ++i) {
    CHECK(!std::isnan(fptr[i])) << " i " << i;
  }
  // TODO(tucker): consider using crc32c instead.
  return Hash64(reinterpret_cast<const char*>(GetBase(&tensor)),
                tensor.TotalBytes(), 0);
}

// static
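// Copies one GPU tensor to another tensor on the same GPU by enqueueing a
// device-local memcpy on the device's main compute stream. Note that `done`
// is invoked as soon as the copy has been scheduled, not when it has
// completed; later work on the same stream is ordered after the memcpy.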
void GPUUtil::CopyGPUTensorToSameGPU(Device* gpu_device,
                                     const DeviceContext* device_context,
                                     const Tensor* src_gpu_tensor,
                                     Tensor* dst_gpu_tensor,
                                     StatusCallback done) {
  VLOG(1) << "CopyGPUTensorToSameGPU";
  const DeviceBase::AcceleratorDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(gpu_device, device_context, *src_gpu_tensor,
                         dst_gpu_tensor, &dev_info, &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  const int64_t total_bytes = src_gpu_tensor->TotalBytes();
  if (total_bytes > 0) {
    void* src_ptr = GetBase(src_gpu_tensor);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    void* dst_ptr = GetBase(dst_gpu_tensor);
    DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes);
    send_stream->ThenMemcpy(&gpu_dst_ptr, gpu_src_ptr, total_bytes);
  }

  done(OkStatus());
}

}  // namespace tensorflow