/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/gpu/gpu_util.h"

#include "tensorflow/core/common_runtime/copy_tensor.h"
#include "tensorflow/core/common_runtime/device.h"
#include "tensorflow/core/common_runtime/device/device_event_mgr.h"
#include "tensorflow/core/common_runtime/dma_helper.h"
#include "tensorflow/core/common_runtime/gpu/gpu_process_state.h"
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor.pb.h"
#include "tensorflow/core/framework/tensor_reference.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/refcount.h"
#include "tensorflow/core/lib/hash/hash.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/tensor_coding.h"
#include "tensorflow/core/profiler/lib/scoped_annotation.h"
#include "tensorflow/core/util/util.h"

// IMPLEMENTATION NOTE:
//
// 1. Within this module, we intentionally LOG(FATAL) if any stream
//    involved in a memcpy becomes !stream->ok(), because the TF process
//    today (1/2016) cannot properly recover from such an error.
//
// 2. When a 0-size tensor is being copied, we should not schedule a
//    ThenMemcpy, since there are no bytes to move. However, we must
//    still ensure causal ordering by arranging for the copy-done
//    callback to happen after all activities already scheduled on the
//    given stream have finished.
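//
//    A minimal sketch of the ordering pattern in point 2, using the same
//    EventMgr hook the copy routines below rely on (the 0-byte case simply
//    skips ThenMemcpy but still routes the callback through the event
//    manager):
//
//      if (total_bytes > 0) {
//        stream->ThenMemcpy(&gpu_dst_ptr, gpu_src_ptr, total_bytes);
//      }
//      dev_info->event_mgr->ThenExecute(
//          stream, [done]() { done(Status::OK()); });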

// If this needs to be runtime configurable, consider adding options to
// ConfigProto.
const tensorflow::int64 FLAGS_brain_gpu_util_debug_string_maxlen = 128;
extern bool FLAGS_brain_gpu_record_mem_types;

namespace tensorflow {

using se::DeviceMemoryBase;
using se::Stream;

Status PrepareCopy(Device* device, const DeviceContext* ctx, const Tensor& src,
                   const Tensor* dst,
                   const DeviceBase::GpuDeviceInfo** dev_info,
                   se::Stream** stream) {
  if (device == nullptr) {
    return errors::Internal("Unexpected null device.");
  }
  auto di = device->tensorflow_gpu_device_info();
  if (di == nullptr) {
    return errors::Internal("Unexpected null device info.");
  }
  *dev_info = di;
  if (ctx == nullptr) {
    return errors::Internal("Unexpected null device context.");
  }
  auto gs = static_cast<const GPUDeviceContext*>(ctx)->stream();
  if (gs == nullptr) {
    return errors::Internal("No gpu stream is available.");
  }
  *stream = gs;
  if (dst != nullptr) {
    if (src.dtype() != dst->dtype()) {
      return errors::Internal("Can't copy a tensor of ",
                              DataTypeString(src.dtype()),
                              " into a tensor of ",
                              DataTypeString(dst->dtype()));
    }
    if (src.TotalBytes() != dst->TotalBytes()) {
      return errors::Internal("Can't copy ", src.TotalBytes(),
                              " bytes of a tensor into another with ",
                              dst->TotalBytes(), " bytes buffer.");
    }
    if ((src.TotalBytes() > 0) && !src.IsInitialized()) {
      return errors::Internal("Src tensor is not initialized.");
    }
    if ((dst->TotalBytes() > 0) && !dst->IsInitialized()) {
      return errors::Internal("Dst tensor is not initialized.");
    }
  }
  if (!DMAHelper::CanUseDMA(&src)) {
    return errors::Internal("GPU copy from non-DMA ",
                            DataTypeString(src.dtype()), " tensor");
  }
  return Status::OK();
}
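
// A typical call into PrepareCopy, mirroring the copy routines below (the
// variable names here are illustrative only):
//
//   const DeviceBase::GpuDeviceInfo* dev_info = nullptr;
//   se::Stream* stream = nullptr;
//   Status s = PrepareCopy(device, device_context, src, &dst, &dev_info,
//                          &stream);
//   if (!s.ok()) {
//     done(s);
//     return;
//   }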

void* GetBase(const Tensor* src) {
  return const_cast<void*>(DMAHelper::base(src));
}

void* GetBase(Tensor* dst) { return DMAHelper::base(dst); }

/*static*/
void GPUUtil::SetProtoFromGPU(const Tensor& tensor, Device* dev,
                              const DeviceContext* device_context,
                              TensorProto* proto, bool is_dead,
                              StatusCallback done) {
  VLOG(1) << "SetProtoFromGPU device_context " << device_context;
  const DeviceBase::GpuDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(dev, device_context, tensor, nullptr, &dev_info,
                         &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  auto send_device_to_host_stream =
      static_cast<const GPUDeviceContext*>(device_context)
          ->device_to_host_stream();
  if (send_device_to_host_stream == nullptr) {
    done(errors::Internal("No send gpu copy-out-stream is available."));
    return;
  }
  // Wait for the sender's main stream to make sure the data are available.
  send_device_to_host_stream->ThenWaitFor(send_stream);

  // Tensor values need to be copied from GPU to CPU RAM so that
  // we can build the protobuf response for a RecvTensor RPC.
  // "device context" identifies the stream where the _Send op executed.
  proto->set_dtype(tensor.dtype());
  tensor.shape().AsProto(proto->mutable_tensor_shape());

  // Prepare a proto with the right data buf size, and DMA the data
  // over from the GPU buffer. Note that 0-size tensors do not have a
  // backing buffer.
  Allocator* alloc = nullptr;
  char* buf = nullptr;
  const int64 total_bytes = is_dead ? 0 : tensor.TotalBytes();
  if (total_bytes > 0) {
    profiler::ScopedAnnotation annotation("SetProtoFromGPU");
    alloc = GPUProcessState::singleton()->GetGpuHostAllocator(0);
    buf = static_cast<char*>(
        alloc->AllocateRaw(Allocator::kAllocatorAlignment, total_bytes));
    if (LogMemory::IsEnabled()) {
      LogMemory::RecordRawAllocation("SetProtoFromGPU",
                                     LogMemory::PROTO_BUFFER_STEP_ID,
                                     total_bytes, buf, alloc);
    }
    void* src_ptr = GetBase(&tensor);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    send_device_to_host_stream->ThenMemcpy(buf, gpu_src_ptr, total_bytes);
  }
  // Use of tensor may outlive stack scope, so keep a ref.
  TensorReference tensor_ref(tensor);
  dev_info->event_mgr->ThenExecute(
      send_device_to_host_stream,
      [send_device_to_host_stream, done, proto, buf, total_bytes, alloc,
       tensor_ref]() {
        if (!send_device_to_host_stream->ok()) {
          LOG(FATAL) << "SetProtoFromGPU: GPU Memcpy failed";
        }
        tensor_ref.Unref();
        if (total_bytes > 0) {
          port::CopyFromArray(proto->mutable_tensor_content(), buf,
                              total_bytes);
          if (LogMemory::IsEnabled()) {
            LogMemory::RecordRawDeallocation("SetProtoFromGPU",
                                             LogMemory::PROTO_BUFFER_STEP_ID,
                                             buf, alloc, false);
          }
          alloc->DeallocateRaw(buf);
        }
        done(Status::OK());
      });
}

// static
void GPUUtil::DeviceToDeviceCopy(
    DeviceContext* send_dev_context, DeviceContext* recv_dev_context,
    Device* src, Device* dst, AllocatorAttributes src_alloc_attr,
    AllocatorAttributes dst_alloc_attr, const Tensor* input, Tensor* output,
    int dev_to_dev_stream_index, StatusCallback done) {
  const DeviceBase::GpuDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(src, send_dev_context, *input, output, &dev_info,
                         &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }
  auto send_device_to_device_stream =
      static_cast<const GPUDeviceContext*>(send_dev_context)
          ->device_to_device_stream(dev_to_dev_stream_index);
  if (send_device_to_device_stream == nullptr) {
    done(errors::Internal(
        "No send gpu device-to-device stream is available."));
    return;
  }
  // Wait for the main stream on the sender to make sure the result is
  // available.
  send_device_to_device_stream->ThenWaitFor(send_stream);

  const int64 total_bytes = input->TotalBytes();
  if (total_bytes > 0) {
    void* src_ptr = GetBase(input);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    void* dst_ptr = GetBase(output);
    DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes);
    auto recv_stream =
        static_cast<const GPUDeviceContext*>(recv_dev_context)->stream();
    if (recv_stream == nullptr) {
      done(errors::Internal("No recv gpu stream is available."));
      return;
    }
    // Since we want to use the memory from recv_stream in the
    // send_device_to_device_stream, add a dependency to make sure the
    // memory is truly free.
    // TODO(zhengxq): remove this dependency when we switch to a better way
    // to make sure the memory is free.
    send_device_to_device_stream->ThenWaitFor(recv_stream);

    VLOG(2) << "src_ptr " << src_ptr << " dst_ptr " << dst_ptr;
    send_device_to_device_stream->ThenMemcpy(&gpu_dst_ptr, gpu_src_ptr,
                                             total_bytes);
  }

  // Use of input may outlive stack scope, so keep a ref.
  TensorReference input_ref(*input);
  dev_info->event_mgr->ThenExecute(
      send_device_to_device_stream,
      [done, send_device_to_device_stream, input_ref]() {
        input_ref.Unref();
        if (!send_device_to_device_stream->ok()) {
          LOG(FATAL) << "GPU->GPU Memcpy failed";
        }
        done(Status::OK());
      });
  send_dev_context->MaintainLifetimeOnStream(input,
                                             send_device_to_device_stream);
}

static CopyTensor::Registration register_gpu_gpu_copy(
    DEVICE_GPU, DEVICE_GPU, GPUUtil::DeviceToDeviceCopy);

// static
void GPUUtil::CopyGPUTensorToCPU(Device* gpu_device,
                                 const DeviceContext* device_context,
                                 const Tensor* gpu_tensor, Tensor* cpu_tensor,
                                 StatusCallback done) {
  VLOG(1) << "CopyGPUTensorToCPU";
  const DeviceBase::GpuDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(gpu_device, device_context, *gpu_tensor, cpu_tensor,
                         &dev_info, &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  auto send_device_to_host_stream =
      static_cast<const GPUDeviceContext*>(device_context)
          ->device_to_host_stream();
  if (send_device_to_host_stream == nullptr) {
    done(errors::Internal("No send gpu copy-out-stream is available."));
    return;
  }
  // Wait for the sender's main stream to make sure the data are available.
  send_device_to_host_stream->ThenWaitFor(send_stream);

  const int64 total_bytes = gpu_tensor->TotalBytes();
  if (total_bytes > 0) {
    void* src_ptr = GetBase(gpu_tensor);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    void* dst_ptr = GetBase(cpu_tensor);
    send_device_to_host_stream->ThenMemcpy(dst_ptr, gpu_src_ptr, total_bytes);
  }
  // Use of the input may outlive stack scope, so keep a ref.
  TensorReference input_ref(*gpu_tensor);
  dev_info->event_mgr->ThenExecute(
      send_device_to_host_stream,
      [send_device_to_host_stream, done, input_ref]() {
        if (!send_device_to_host_stream->ok()) {
          LOG(FATAL) << "GPU->CPU Memcpy failed";
        }
        input_ref.Unref();
        done(Status::OK());
      });
}

/* static */
void GPUUtil::CopyCPUTensorToGPU(const Tensor* cpu_tensor,
                                 const DeviceContext* device_context,
                                 Device* gpu_device, Tensor* gpu_tensor,
                                 StatusCallback done, bool sync_dst_compute) {
  VLOG(1) << "CopyCPUTensorToGPU";
  const DeviceBase::GpuDeviceInfo* dev_info = nullptr;
  se::Stream* recv_stream = nullptr;
  Status s = PrepareCopy(gpu_device, device_context, *cpu_tensor, gpu_tensor,
                         &dev_info, &recv_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  auto recv_host_to_device_stream =
      static_cast<const GPUDeviceContext*>(device_context)
          ->host_to_device_stream();
  if (recv_host_to_device_stream == nullptr) {
    done(errors::Internal(
        "No recv gpu host-to-device stream is available."));
    return;
  }
  // Wait for the recv-stream to make sure the buffer is truly available.
  if (sync_dst_compute) {
    recv_host_to_device_stream->ThenWaitFor(recv_stream);
  }

  const int64 total_bytes = cpu_tensor->TotalBytes();
  // Note that 0-size tensors have no backing buffer.
  if (total_bytes > 0) {
    void* src_ptr = GetBase(cpu_tensor);
    void* dst_ptr = GetBase(gpu_tensor);
    DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes);
    recv_host_to_device_stream->ThenMemcpy(&gpu_dst_ptr, src_ptr, total_bytes);
  }
  // Use of cpu_tensor may outlive stack scope, so keep a ref.
  TensorReference input_ref(*cpu_tensor);
  dev_info->event_mgr->ThenExecute(
      recv_host_to_device_stream,
      [recv_host_to_device_stream, done, input_ref]() {
        input_ref.Unref();
        if (!recv_host_to_device_stream->ok()) {
          LOG(FATAL) << "CPU->GPU Memcpy failed";
        }
        done(Status::OK());
      });
}
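
// A minimal sketch of driving this asynchronous API synchronously, using
// the same Notification pattern as GPUUtil::Checksum below (the variable
// names here are illustrative only):
//
//   Status st;
//   Notification n;
//   GPUUtil::CopyCPUTensorToGPU(&cpu_tensor, device_context, gpu_device,
//                               &gpu_tensor,
//                               [&st, &n](Status s) {
//                                 st.Update(s);
//                                 n.Notify();
//                               },
//                               true /* sync_dst_compute */);
//   n.WaitForNotification();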

Status GPUUtil::Sync(Device* gpu_device) {
  VLOG(1) << "GPUUtil::Sync";
  auto* dev_info = gpu_device->tensorflow_gpu_device_info();
  if (!dev_info) {
    return errors::Internal("Failed to find dest device GPUDeviceInfo");
  }
  return dev_info->stream->BlockHostUntilDone();
}

Status GPUUtil::SyncAll(Device* gpu_device) {
  VLOG(1) << "GPUUtil::SyncAll";
  auto* dev_info = gpu_device->tensorflow_gpu_device_info();
  if (!dev_info) {
    return errors::Internal("Failed to find dest device GPUDeviceInfo");
  }
  if (!dev_info->stream->parent()->SynchronizeAllActivity() ||
      !dev_info->stream->ok()) {
    return errors::Internal("GPU sync failed");
  }
  return Status::OK();
}

string GPUUtil::MemoryDebugString(const Device* device, Tensor* tensor) {
  string ret;
  CHECK(tensor);
  const int64 num_bytes = std::min<int64>(
      FLAGS_brain_gpu_util_debug_string_maxlen, tensor->TotalBytes());
  void* ptr = (num_bytes > 0) ? GetBase(tensor) : nullptr;
  strings::Appendf(&ret, "%p:", ptr);
  if (num_bytes > 0) {
    auto* dev_info = device->tensorflow_gpu_device_info();
    if (!dev_info) {
      strings::StrAppend(
          &ret, PrintMemory(reinterpret_cast<const char*>(ptr), num_bytes));
    } else {
      string buf;
      buf.resize(num_bytes);
      DeviceMemoryBase gpu_ptr(ptr, num_bytes);
      auto s = dev_info->stream->parent()->SynchronousMemcpyD2H(
          gpu_ptr, num_bytes, &*buf.begin());
      strings::StrAppend(&ret, PrintMemory(&*buf.begin(), num_bytes));
    }
  }
  return ret;
}

// TODO(pbar) Checksum is called from places without a valid device context.
uint64 GPUUtil::Checksum(Device* gpu_device,
                         const DeviceContext* device_context,
                         const Tensor& tensor) {
  Tensor copy(tensor.dtype(), tensor.shape());
  Status s;
  Notification n;
  CopyGPUTensorToCPU(gpu_device, device_context, &tensor, &copy,
                     [&s, &n](Status status) {
                       s.Update(status);
                       n.Notify();
                     });
  n.WaitForNotification();
  CHECK(s.ok()) << s;
  return Checksum(copy);
}

uint64 GPUUtil::Checksum(const Tensor& tensor) {
  const float* fptr = reinterpret_cast<const float*>(GetBase(&tensor));
  size_t num_bytes = tensor.TotalBytes();
  size_t num_floats = num_bytes / sizeof(float);
  for (size_t i = 0; i < num_floats; ++i) {
    CHECK(!std::isnan(fptr[i])) << " i " << i;
  }
  // TODO(tucker): consider using crc32c instead.
  return Hash64(reinterpret_cast<const char*>(GetBase(&tensor)),
                tensor.TotalBytes(), 0);
}

// static
void GPUUtil::CopyGPUTensorToSameGPU(Device* gpu_device,
                                     const DeviceContext* device_context,
                                     const Tensor* src_gpu_tensor,
                                     Tensor* dst_gpu_tensor,
                                     StatusCallback done) {
  VLOG(1) << "CopyGPUTensorToSameGPU";
  const DeviceBase::GpuDeviceInfo* dev_info = nullptr;
  se::Stream* send_stream = nullptr;
  Status s = PrepareCopy(gpu_device, device_context, *src_gpu_tensor,
                         dst_gpu_tensor, &dev_info, &send_stream);
  if (!s.ok()) {
    done(s);
    return;
  }

  const int64 total_bytes = src_gpu_tensor->TotalBytes();
  if (total_bytes > 0) {
    void* src_ptr = GetBase(src_gpu_tensor);
    DeviceMemoryBase gpu_src_ptr(src_ptr, total_bytes);
    void* dst_ptr = GetBase(dst_gpu_tensor);
    DeviceMemoryBase gpu_dst_ptr(dst_ptr, total_bytes);
    send_stream->ThenMemcpy(&gpu_dst_ptr, gpu_src_ptr, total_bytes);
  }

  done(Status::OK());
}

}  // namespace tensorflow