1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 // TODO(opensource): Use a more generic sounding preprocessor name than
17 // GOOGLE_CUDA
18 #if (defined(GOOGLE_CUDA) && GOOGLE_CUDA) || \
19 (defined(TENSORFLOW_USE_ROCM) && TENSORFLOW_USE_ROCM)
20
21 #if TENSORFLOW_USE_ROCM
22 #include "rocm/include/hip/hip_runtime.h"
23 #endif
24
25 #define EIGEN_USE_GPU
26
27 #include <stdlib.h>
28 #include <string.h>
29
30 #include <algorithm>
31 #include <list>
32 #include <map>
33 #include <tuple>
34 #include <vector>
35
36 #include "absl/container/flat_hash_set.h"
37 #include "absl/strings/str_split.h"
38 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
39 #include "tensorflow/core/common_runtime/device/device_event_mgr.h"
40 #include "tensorflow/core/common_runtime/device/device_id_utils.h"
41 #include "tensorflow/core/common_runtime/device_factory.h"
42 #include "tensorflow/core/common_runtime/gpu/gpu_device.h"
43 #include "tensorflow/core/common_runtime/gpu/gpu_id.h"
44 #include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
45 #include "tensorflow/core/common_runtime/gpu/gpu_init.h"
46 #include "tensorflow/core/common_runtime/gpu/gpu_process_state.h"
47 #include "tensorflow/core/common_runtime/gpu/gpu_util.h"
48 #include "tensorflow/core/common_runtime/gpu_device_context.h"
49 #include "tensorflow/core/common_runtime/local_device.h"
50 #include "tensorflow/core/framework/allocator.h"
51 #include "tensorflow/core/framework/device_base.h"
52 #include "tensorflow/core/framework/op_kernel.h"
53 #include "tensorflow/core/framework/tensor.h"
54 #include "tensorflow/core/framework/tensor.pb.h"
55 #include "tensorflow/core/framework/types.h"
56 #include "tensorflow/core/framework/variant_op_registry.h"
57 #include "tensorflow/core/graph/types.h"
58 #include "tensorflow/core/lib/core/errors.h"
59 #include "tensorflow/core/lib/core/status.h"
60 #include "tensorflow/core/lib/strings/numbers.h"
61 #include "tensorflow/core/lib/strings/str_util.h"
62 #include "tensorflow/core/lib/strings/strcat.h"
63 #if GOOGLE_CUDA
64 #include "third_party/gpus/cudnn/cudnn.h"
65 #include "tensorflow/stream_executor/cuda/cuda_activation.h"
66 #elif TENSORFLOW_USE_ROCM
67 #include "tensorflow/core/platform/rocm.h"
68 #endif
69 #include "tensorflow/compiler/xla/stream_executor/gpu/gpu_stream.h"
70 #include "tensorflow/core/platform/fingerprint.h"
71 #include "tensorflow/core/platform/logging.h"
72 #include "tensorflow/core/platform/macros.h"
73 #include "tensorflow/core/platform/stream_executor.h"
74 #include "tensorflow/core/platform/types.h"
75 #include "tensorflow/core/profiler/lib/scoped_annotation.h"
76 #include "tensorflow/core/profiler/lib/scoped_memory_debug_annotation.h"
77 #include "tensorflow/core/public/session_options.h"
78 #include "tensorflow/core/util/device_name_utils.h"
79 #include "tensorflow/core/util/env_var.h"
80 #include "tensorflow/core/util/stream_executor_util.h"
81 #include "tensorflow/stream_executor/platform/dso_loader.h"
82
83 #if !defined(PLATFORM_GOOGLE)
84 #if GOOGLE_CUDA
85 #include "third_party/gpus/cuda/cuda_config.h"
86 #endif
87 #endif
88
89 namespace tensorflow {
90
91 #if GOOGLE_CUDA
92
93 typedef cudaStream_t gpuStream_t;
94 typedef cudaDeviceProp gpuDeviceProp_t;
95 #define EIGEN_GPU_SCRATCH_SIZE (Eigen::kGpuScratchSize)
96 using se::cuda::ScopedActivateExecutorContext;
97
98 #elif TENSORFLOW_USE_ROCM
99
100 typedef hipStream_t gpuStream_t;
101 typedef hipDeviceProp_t gpuDeviceProp_t;
102 #define EIGEN_GPU_SCRATCH_SIZE (Eigen::kGpuScratchSize)
103 using se::rocm::ScopedActivateExecutorContext;
104
105 #endif
106
107 // Eigen Ops directly allocate memory only for temporary buffers used
108 // during OpKernel::Compute(). The recommended way of allocating such
109 // memory is via OpKernelContext::allocate_temp(). However, Eigen Ops
110 // don't have access to OpKernelContext, instead they get access to
111 // memory directly through the device allocator. As an Open Source
112 // project, Eigen assumes allocator semantics similar to those of the
113 // CUDA or ROCm memory allocator, and may not work correctly due to race
114 // conditions if used with some other allocator. For safety, we need
115 // to delay deallocation calls out of Eigen until all events on the
116 // corresponding stream have completed. The following two classes
117 // serve this purpose in two different compilation environments.
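//
// Illustrative flow (not part of this file) of how the class below defers
// frees until the stream has drained:
//
//   EigenGpuStreamDevice dev;
//   dev.Reinitialize(ctx, &gpu_stream, platform_id, tf_allocator, scratch);
//   void* buf = dev.allocate(n);   // forwarded to the TF allocator
//   ...                            // Eigen kernel enqueued on gpu_stream
//   dev.deallocate(buf);           // does not free immediately; a stream
//                                  // callback (asyncFree) releases buf once
//                                  // all prior work on the stream completes.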
118
119 class EigenGpuStreamDevice : public ::Eigen::StreamInterface {
120 public:
121 EigenGpuStreamDevice()
122 : scratch_(nullptr),
123 semaphore_(nullptr),
124 context_(nullptr),
125 device_(this) {}
126 ~EigenGpuStreamDevice() override {}
127
128 void Reinitialize(OpKernelContext* context, const gpuStream_t* gpu_stream,
129 PlatformDeviceId platform_device_id,
130 ::tensorflow::Allocator* alloc, char* scratch) {
131 if (LogMemory::IsEnabled()) {
132 operation_ = context->op_kernel().name() + "/EigenAllocator";
133 step_id_ = context->step_id();
134 }
135 context_ = context;
136 scratch_ = scratch;
137 semaphore_ =
138 reinterpret_cast<unsigned int*>(scratch + Eigen::kGpuScratchSize);
139 stream_ = gpu_stream;
140 allocator_ = alloc;
141 device_prop_ = &Eigen::GetGpuDeviceProperties(platform_device_id.value());
142 }
143
144 const gpuStream_t& stream() const override { return *stream_; }
145 const gpuDeviceProp_t& deviceProperties() const override {
146 return *device_prop_;
147 }
148
149 void* allocate(size_t num_bytes) const override {
150 void* ret = allocator_->AllocateRaw(32 /* alignment */, num_bytes);
151 if (ret == nullptr) {
152 if (context_) {
153 context_->SetStatus(errors::ResourceExhausted(
154 strings::StrCat("Ran out of GPU memory when allocating ", num_bytes,
155 " bytes for ", operation_)));
156 } else {
157 LOG(FATAL)
158 << "EigenAllocator for GPU ran out of memory when allocating "
159 << num_bytes << ". See error logs for more detailed info.";
160 }
161 }
162 if (LogMemory::IsEnabled() && ret != nullptr) {
163 LogMemory::RecordRawAllocation(operation_, step_id_, num_bytes, ret,
164 allocator_);
165 }
166 return ret;
167 }
168 void deallocate(void* buffer) const override {
169 if (LogMemory::IsEnabled() && buffer != nullptr) {
170 LogMemory::RecordRawDeallocation(operation_, step_id_, buffer, allocator_,
171 true);
172 }
173 AsyncFreeData* afData =
174 new AsyncFreeData(allocator_, buffer, operation_, step_id_);
175 #if GOOGLE_CUDA
176 cudaError_t err = cudaStreamAddCallback(*stream_, asyncFree, afData, 0);
177 CHECK_EQ(err, cudaSuccess);
178 #elif TENSORFLOW_USE_ROCM
179 hipError_t err = hipStreamAddCallback(*stream_, asyncFree, afData, 0);
180 CHECK_EQ(err, hipSuccess);
181 #endif
182 }
183
184 // Return a pointer to a per stream scratchpad of 1024 bytes residing
185 // in global memory.
186 void* scratchpad() const override { return scratch_; }
187
188 // Return a semaphore. The semaphore is initially initialized to 0, and
189 // each kernel using it is responsible for resetting to 0 upon completion
190 // to maintain the invariant that the semaphore is always equal to 0 upon
191 // each kernel start.
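// (In practice Eigen's GPU kernels, e.g. its full reductions, use this word
// for inter-block synchronization; it lives at the tail of the scratch
// allocation made in BaseGPUDevice::InitScratchBuffers() below.)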
192 unsigned int* semaphore() const override { return semaphore_; }
193
194 const Eigen::GpuDevice& device() const { return device_; }
195
196 private:
197 struct AsyncFreeData {
198 AsyncFreeData(::tensorflow::Allocator* a, void* p, const string& o,
199 const int64_t s)
200 : allocator_(a), address_(p), operation_(o), step_id_(s) {}
201 ::tensorflow::Allocator* allocator_;
202 void* address_;
203 const string operation_;
204 const int64_t step_id_;
205 };
206
207 #if GOOGLE_CUDA
208 static void CUDART_CB asyncFree(gpuStream_t stream, cudaError_t status,
209 void* userData)
210 #elif TENSORFLOW_USE_ROCM
211 static void asyncFree(gpuStream_t stream, hipError_t status, void* userData)
212 #endif
213 {
214 AsyncFreeData* data = static_cast<AsyncFreeData*>(userData);
215 if (LogMemory::IsEnabled()) {
216 LogMemory::RecordRawDeallocation(data->operation_, data->step_id_,
217 data->address_, data->allocator_, false);
218 }
219 data->allocator_->DeallocateRaw(data->address_);
220 delete data;
221 }
222
223 string operation_;
224 int64_t step_id_;
225 const gpuStream_t* stream_; // Not owned.
226 const gpuDeviceProp_t* device_prop_; // Not owned.
227 ::tensorflow::Allocator* allocator_; // Not owned.
228 mutable char* scratch_;
229 mutable unsigned int* semaphore_;
230 OpKernelContext* context_;
231 Eigen::GpuDevice device_;
232
233 TF_DISALLOW_COPY_AND_ASSIGN(EigenGpuStreamDevice);
234 };
235
236 // This factory helps to ensure that different GPU device objects that refer to
237 // the same physical device and stream group id use the same stream group
238 // object (and therefore the same CUDA streams). This is necessary since there
239 // is a single memory allocator per device (see ProcessState::GetGPUAllocator)
240 // and allocators must not be shared across streams.
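// Illustrative use (mirroring BaseGPUDevice::Init() below): because the
// factory keys its map on (tf_device_id, stream_group_within_gpu), two
// device objects for the same TF GPU receive the same StreamGroup:
//
//   auto* group = StreamGroupFactory::Global().GetOrCreate(
//       tf_device_id, /*stream_group_within_gpu=*/0, executor, gpu_options);
//   // group->compute, group->host_to_device, ... are the shared streams.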
241 class BaseGPUDevice::StreamGroupFactory {
242 public:
243 // Returns the unique stream group for use with the stream defined by
244 // {tf_device_id, stream_group_within_gpu}, creating it if it does not yet
245 // exist.
246 // This function is thread safe.
247 BaseGPUDevice::StreamGroup* GetOrCreate(TfDeviceId tf_device_id,
248 int stream_group_within_gpu,
249 se::StreamExecutor* executor,
250 const GPUOptions& options) {
251 mutex_lock guard(lock_);
252 StreamGroup* group =
253 &streams_[key_type(tf_device_id.value(), stream_group_within_gpu)];
254 if (!group->compute) {
255 int priority = GetPriority(tf_device_id.value(), options);
256 group->priority = priority;
257 group->compute = GetStream(executor, priority);
258 group->compute->Init();
259 VLOG(2) << "Created stream[" << stream_group_within_gpu
260 << "] = " << group->compute << " with priority: " << priority;
261
262 #if TENSORFLOW_USE_ROCM
263 // ROCm streams are lightweight and will not necessarily trigger device
264 // queue init until they are first used. For optimal performance,
265 // compute and nccl streams must be immediate siblings.
266 group->nccl = GetStream(executor, priority);
267 group->nccl->Init();
268 VLOG(2) << "Created nccl_stream[" << stream_group_within_gpu
269 << "] = " << group->nccl;
270
271 // Force underlying resource creation now.
272 group->compute->ThenWaitFor(group->nccl);
273 group->nccl->ThenWaitFor(group->compute);
274 #endif
275
276 group->host_to_device = GetStream(executor, priority);
277 group->host_to_device->Init();
278 VLOG(2) << "Created host_to_device_stream[" << stream_group_within_gpu
279 << "] = " << group->host_to_device;
280
281 group->device_to_host = GetStream(executor, priority);
282 group->device_to_host->Init();
283 VLOG(2) << "Created device_to_host_stream[" << stream_group_within_gpu
284 << "] = " << group->device_to_host;
285
286 int num_d2d_streams =
287 options.experimental().num_dev_to_dev_copy_streams();
288 if (num_d2d_streams == 0) num_d2d_streams = 1;
289 if (num_d2d_streams < 1 || num_d2d_streams > 4) {
290 LOG(ERROR)
291 << "Illegal GPUOptions.experimental.num_dev_to_dev_copy_streams="
292 << num_d2d_streams << "; setting it to 1 instead.";
293 num_d2d_streams = 1;
294 }
295 for (int i = 0; i < num_d2d_streams; ++i) {
296 se::Stream* stream = GetStream(executor, priority);
297 stream->Init();
298 group->device_to_device.push_back(stream);
299 VLOG(2) << "Created device_to_device_stream[" << stream_group_within_gpu
300 << "] = " << group->device_to_device.back();
301 }
302 }
303 return group;
304 }
305
306 // Returns a reference to the StreamGroupFactory singleton. Note that this is
307 // never destroyed, so the objects it owns are never deleted.
308 static StreamGroupFactory& Global() {
309 static StreamGroupFactory* instance = new StreamGroupFactory();
310 return *instance;
311 }
312
313 // Helper method for unit tests to reset the streams. Never use in production.
314 void TestOnlyReset() {
315 mutex_lock guard(lock_);
316 for (auto& item : streams_) {
317 auto& stream = item.second;
318 if (stream.compute) {
319 delete stream.compute;
320 stream.compute = nullptr;
321 }
322 #if TENSORFLOW_USE_ROCM
323 if (stream.nccl) {
324 delete stream.nccl;
325 stream.nccl = nullptr;
326 }
327 #endif
328 if (stream.host_to_device) {
329 delete stream.host_to_device;
330 stream.host_to_device = nullptr;
331 }
332 if (stream.device_to_host) {
333 delete stream.device_to_host;
334 stream.device_to_host = nullptr;
335 }
336 while (!stream.device_to_device.empty()) {
337 auto back = stream.device_to_device.back();
338 if (back) {
339 delete back;
340 }
341 stream.device_to_device.pop_back();
342 }
343 }
344 streams_.clear();
345 }
346
347 private:
348 // Returns priority for the given virtual GPU id from the session options.
349 // Returns 0 if no virtual devices are specified.
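// For example (illustrative config, not taken from this file), with
//   virtual_devices { priority: 0 priority: -1 }   // for physical GPU 0
//   virtual_devices { priority: 0 }                // for physical GPU 1
// the flattened TF device ids 0, 1, 2 resolve to priorities 0, -1 and 0,
// which is exactly the walk performed below.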
350 int GetPriority(int tf_device_id, const GPUOptions& options) {
351 int id = tf_device_id;
352 int i = 0;
353 int priority = 0;
354 while (i < options.experimental().virtual_devices_size()) {
355 const int size =
356 options.experimental().virtual_devices().Get(i).priority_size();
357 if (id >= size) {
358 id -= size;
359 } else {
360 priority =
361 options.experimental().virtual_devices().Get(i).priority().Get(id);
362 break;
363 }
364 i++;
365 }
366 return priority;
367 }
368
369 // Returns a Stream with the underlying GPUStream with the given priority.
370 se::Stream* GetStream(se::StreamExecutor* executor, int priority) {
371 auto stream = new se::Stream(executor);
372 static_cast<stream_executor::gpu::GpuStream*>(stream->implementation())
373 ->SetPriority(priority);
374 return stream;
375 }
376
377 mutex lock_;
378 using key_type = std::tuple<int, int>;
379 std::map<key_type, StreamGroup> streams_;
380
381 // StreamGroupFactory cannot be created directly; Call
382 // StreamGroupFactory::Global() to get the global instance.
383 StreamGroupFactory() = default;
384 TF_DISALLOW_COPY_AND_ASSIGN(StreamGroupFactory);
385 };
386
387 BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
388 Bytes memory_limit, const DeviceLocality& locality,
389 TfDeviceId tf_device_id,
390 const string& physical_device_desc,
391 Allocator* gpu_allocator, Allocator* cpu_allocator,
392 bool sync_every_op)
393 : LocalDevice(options, Device::BuildDeviceAttributes(name, DEVICE_GPU,
394 memory_limit, locality,
395 physical_device_desc)),
396 gpu_allocator_(gpu_allocator),
397 cpu_allocator_(cpu_allocator),
398 scoped_allocator_mgr_(new ScopedAllocatorMgr(name)),
399 tf_device_id_(tf_device_id),
400 sync_every_op_(sync_every_op) {
401 // XLA device IDs for GPUs are arbitrary but must be unique, so we hash device
402 // names (which include a replica index even for multi-client).
403 set_xla_global_id(Fingerprint32(name) % std::numeric_limits<int32_t>::max());
404 GPUProcessState::singleton()->EnableGPUDevice();
405 }
406
407 BaseGPUDevice::~BaseGPUDevice() {
408 delete accelerator_device_info_;
409 if (scratch_) gpu_allocator_->DeallocateRaw(scratch_);
410 device_context_->Unref();
411 }
412
413 // This should be idempotent if already initialized.
414 Status BaseGPUDevice::InitScratchBuffers() {
415 mutex_lock l(scratch_init_mutex_);
416 if (!scratch_) {
417 DCHECK(stream_);
418 size_t scratch_buffer_size = Eigen::kGpuScratchSize + sizeof(unsigned int);
419 profiler::ScopedMemoryDebugAnnotation op_annotation("ScratchBuffer");
420 void* scratch_buffer = gpu_allocator_->AllocateRaw(
421 Allocator::kAllocatorAlignment, scratch_buffer_size);
422 if (scratch_buffer == nullptr) {
423 return errors::FailedPrecondition(
424 "Failed to allocate scratch buffer for device ",
425 tf_device_id_.value());
426 }
427 se::DeviceMemory<char> mem(
428 se::DeviceMemoryBase(scratch_buffer, scratch_buffer_size));
429 TF_RETURN_IF_ERROR(executor_->SynchronousMemZero(
430 &mem, Eigen::kGpuScratchSize + sizeof(unsigned int)));
431 scratch_ = static_cast<char*>(scratch_buffer);
432 }
433 return OkStatus();
434 }
435
436 Status BaseGPUDevice::Init(const SessionOptions& options) {
437 auto executor_status = DeviceIdUtil::ExecutorForTfDeviceId(
438 DEVICE_GPU, GPUMachineManager(), tf_device_id_);
439 if (!executor_status.status().ok()) {
440 return errors::Internal("Failed to get StreamExecutor for device ",
441 tf_device_id_.value());
442 }
443
444 executor_ = executor_status.ValueOrDie();
445
446 stream_ = StreamGroupFactory::Global().GetOrCreate(
447 tf_device_id_, 0, executor_, options.config.gpu_options());
448
449 // Get an allocator that allocates pinned memory on host.
450 AllocatorAttributes attr;
451 attr.set_on_host(true);
452 attr.set_gpu_compatible(true);
453 Allocator* host_memory_allocator = GetAllocator(attr);
454
455 device_context_ =
456 new GPUDeviceContext(0, stream_->compute,
457 #if TENSORFLOW_USE_ROCM
458 stream_->nccl,
459 #endif
460 stream_->host_to_device, stream_->device_to_host,
461 stream_->device_to_device, host_memory_allocator);
462
463 em_ = EventMgrFactory::Singleton()->GetEventMgr(executor_,
464 options.config.gpu_options());
465
466 GPUKernelTracker::Params tracker_params(
467 options.config.gpu_options().experimental().kernel_tracker_max_interval(),
468 options.config.gpu_options().experimental().kernel_tracker_max_bytes(),
469 options.config.gpu_options().experimental().kernel_tracker_max_pending());
470 timestamped_allocator_ =
471 options.config.gpu_options().experimental().timestamped_allocator();
472 pending_cap_ = tracker_params.max_pending;
473 if (timestamped_allocator_ ||
474 (tracker_params.max_interval > 0 || tracker_params.max_bytes > 0 ||
475 tracker_params.max_pending > 0)) {
476 SharedCounter* timing_counter = nullptr;
477 if (timestamped_allocator_) {
478 // In this case the SharedCounter was already created and set in the
479 // associated Allocator, with ownership by GPUProcessState.
480 // The GPUKernelTracker will use this SharedCounter, instead of
481 // owning its own.
482 timing_counter =
483 GPUProcessState::singleton()->GPUAllocatorCounter(tf_device_id_);
484 DCHECK(timing_counter);
485 }
486 kernel_tracker_.reset(new GPUKernelTracker(
487 tracker_params, Env::Default(), stream_->compute, timing_counter,
488 timestamped_allocator_ ? gpu_allocator_ : nullptr, em_));
489 }
490
491 accelerator_device_info_ = new DeviceBase::AcceleratorDeviceInfo;
492 accelerator_device_info_->stream = stream_->compute;
493 accelerator_device_info_->default_context = device_context_;
494 accelerator_device_info_->event_mgr = em_;
495 PlatformDeviceId platform_device_id;
496 TF_RETURN_IF_ERROR(
497 GpuIdManager::TfToPlatformDeviceId(tf_device_id_, &platform_device_id));
498 accelerator_device_info_->gpu_id = platform_device_id.value();
499 set_tensorflow_accelerator_device_info(accelerator_device_info_);
500
501 // Whether and how the GPU device uses its own threadpool.
502 // This option is experimental. Once we confirm the best setting, we
503 // may change the default behavior and completely remove this flag.
504 // Default values might change in future releases.
505 // Possible values:
506 // * global: GPU uses threads shared with CPU in the main compute
507 // thread-pool. This is currently the default.
508 // * gpu_private: GPU uses threads dedicated to this device.
509 // * gpu_shared: All GPUs share a dedicated thread pool.
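// Example (environment settings, hypothetical, not set by this file):
//   TF_GPU_THREAD_MODE=gpu_private TF_GPU_THREAD_COUNT=4 python train.py
// gives each GPU device its own four-thread pool, created below.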
510 string gpu_thread_mode;
511 TF_RETURN_IF_ERROR(
512 ReadStringFromEnvVar("TF_GPU_THREAD_MODE", "global", &gpu_thread_mode));
513 gpu_thread_mode = absl::AsciiStrToLower(gpu_thread_mode);
514 if (gpu_thread_mode != "global") {
515 int64_t gpu_thread_count = -1;
516 // Default to two threads. One for device compute and another for memory
517 // copies.
518 TF_RETURN_IF_ERROR(
519 ReadInt64FromEnvVar("TF_GPU_THREAD_COUNT", 2, &gpu_thread_count));
520 if (gpu_thread_mode == "gpu_private") {
521 // TODO(zhengxq): since these threads only serve a single GPU device,
522 // we should set the device context once for each thread, and avoid
523 // setting them for each kernel.
524 // TODO(zhengxq): pin the thread to the same socket of the target GPU.
525 thread_pool_.reset(new thread::ThreadPool(
526 options.env, ThreadOptions(),
527 strings::StrCat("gpu_private_", tf_device_id_.value()),
528 static_cast<int32>(gpu_thread_count),
529 !options.config.experimental().disable_thread_spinning(),
530 /*allocator=*/nullptr));
531 set_tensorflow_device_thread_pool(thread_pool_.get());
532 } else if (gpu_thread_mode == "gpu_shared") {
533 static thread::ThreadPool* thread_pool = new thread::ThreadPool(
534 options.env, ThreadOptions(), "gpu_shared",
535 static_cast<int32>(gpu_thread_count),
536 !options.config.experimental().disable_thread_spinning(),
537 /*allocator=*/nullptr);
538 set_tensorflow_device_thread_pool(thread_pool);
539 } else {
540 string error_message =
541 strings::StrCat("Invalid gpu_thread_mode: ", gpu_thread_mode);
542 LOG(WARNING) << error_message;
543 return errors::InvalidArgument(error_message);
544 }
545 }
546
547 TF_ASSIGN_OR_RETURN(
548 node_file_writer_,
549 NodeFileWriter::GetNodeFileWriterIfEnabled(name(), env()));
550 if (node_file_writer_) {
551 LOG(INFO) << "Writing NodeDefs to file: " << node_file_writer_->filename();
552 }
553
554 return OkStatus();
555 }
556
557 string BaseGPUDevice::ComputeOpKernelDebugString(const OpKernel& op_kernel,
558 const int& stream_id) {
559 return strings::StrCat(op_kernel.name(), " op ", op_kernel.type_string(),
560 " on GPU ", tf_device_id_.value(), " stream[",
561 stream_id, "]");
562 }
563
564 namespace {
565 const absl::flat_hash_set<std::string>* GetOpsToLogFromEnv() {
566 auto* result = new absl::flat_hash_set<std::string>;
567 const char* env = getenv("TF_GPU_DEBUG_OPS_TO_LOG");
568 if (!env) {
569 return result;
570 }
571
572 std::vector<absl::string_view> ops = absl::StrSplit(env, ',');
573 LOG(INFO) << "Will log inputs & outputs from the following ops: ";
574 for (absl::string_view op : ops) {
575 result->insert(std::string(op));
576 LOG(INFO) << " |" << op << "|";
577 }
578
579 return result;
580 }
581
582 bool ShouldLogInputsAndOutputs(OpKernel* op_kernel) {
583 static const absl::flat_hash_set<std::string>& ops_to_log =
584 *GetOpsToLogFromEnv();
585 return ops_to_log.count(op_kernel->type_string());
586 }
587 } // namespace
588
589 Tensor BaseGPUDevice::CopyGpuTensorToHostDebugOnly(const Tensor& gpu_tensor) {
590 Tensor host_tensor(gpu_tensor.dtype(), gpu_tensor.shape());
591 CHECK(device_context_->stream()
592 ->ThenMemcpy(host_tensor.data(),
593 se::DeviceMemoryBase(gpu_tensor.data(),
594 gpu_tensor.TotalBytes()),
595 gpu_tensor.TotalBytes())
596 .BlockHostUntilDone()
597 .ok());
598 return host_tensor;
599 }
600
601 void BaseGPUDevice::LogInputs(OpKernel* op_kernel, OpKernelContext* context) {
602 LOG(INFO) << "Inputs for " << op_kernel->name() << " (total "
603 << context->num_inputs() << "):";
604 for (int i = 0; i < context->num_inputs(); i++) {
605 if (!context->has_input(i)) {
606 LOG(INFO) << "input # " << i << " is absent";
607 continue;
608 }
609
610 Tensor input = context->input_memory_type(i) == DEVICE_MEMORY
611 ? CopyGpuTensorToHostDebugOnly(context->input(i))
612 : context->input(i);
613
614 LOG(INFO) << "input # " << i;
615 LOG(INFO) << input.DebugString(-1);
616 }
617 LOG(INFO) << "";
618 }
619
620 void BaseGPUDevice::LogOutputs(OpKernel* op_kernel, OpKernelContext* context) {
621 if (!context->status().ok()) {
622 LOG(INFO) << op_kernel->name()
623 << " failed: " << context->status().error_message();
624 return;
625 }
626
627 LOG(INFO) << "Outputs for " << op_kernel->name() << " (total "
628 << context->num_outputs() << "):";
629 for (int i = 0; i < context->num_outputs(); i++) {
630 Tensor* output_ptr = context->mutable_output(i);
631 if (output_ptr == nullptr) {
632 LOG(INFO) << "output # " << i << " is null";
633 continue;
634 }
635
636 Tensor output = context->output_memory_type(i) == DEVICE_MEMORY
637 ? CopyGpuTensorToHostDebugOnly(*output_ptr)
638 : *output_ptr;
639
640 LOG(INFO) << "output # " << i;
641 LOG(INFO) << output.DebugString(-1);
642 }
643 LOG(INFO) << "";
644 }
645
646 void BaseGPUDevice::Compute(OpKernel* op_kernel, OpKernelContext* context) {
647 // NOTE(tucker): We need to discriminate between Eigen GPU
648 // operations and all others. If an operation is Eigen
649 // implemented (or otherwise tries to launch a GPU kernel
650 // directly), we need to establish a stacked-scoped environment
651 // that directs it to execute on the proper device. Otherwise we
652 // expect the Op to use StreamExecutor directly and correctly.
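// The ScopedActivateExecutorContext below makes this device's CUDA/ROCm
// context current on the calling thread for the duration of Compute(), so
// kernels launched directly (e.g. by Eigen) land on the right device.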
653 GPUDeviceContext* gpu_device_context = device_context_;
654 if (context->op_device_context() != nullptr) {
655 gpu_device_context =
656 static_cast<GPUDeviceContext*>(context->op_device_context());
657 }
658 se::Stream* stream = gpu_device_context->stream();
659 const auto stream_id = gpu_device_context->stream_id();
660
661 const bool vlog_1 = VLOG_IS_ON(1);
662
663 if (vlog_1) {
664 VLOG(1) << "GpuDevice::ComputeHelper "
665 << ComputeOpKernelDebugString(*op_kernel, stream_id);
666 }
667
668 if (kernel_tracker_.get()) {
669 context->set_record_memory_consumption(true);
670 if (pending_cap_ > 0) {
671 kernel_tracker_->PauseWhilePendingExceeds(pending_cap_);
672 }
673 }
674 ScopedActivateExecutorContext scoped_activation{stream->parent()};
675 profiler::ScopedMemoryDebugAnnotation op_annotation(
676 op_kernel->name_view().data(), context->step_id());
677 bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel);
678
679 if (should_log_inputs_and_outputs) {
680 LogInputs(op_kernel, context);
681 }
682
683 op_kernel->Compute(context);
684
685 if (should_log_inputs_and_outputs) {
686 LogOutputs(op_kernel, context);
687 }
688
689 if (context->status().ok()) {
690 if (sync_every_op_) {
691 // Note: GPUUtil::Sync() only syncs the default stream.
692 // We need to either sync the stream used by this op, or
693 // all streams. Given that this flag is typically used for
694 // debugging it makes more sense to sync all GPU activity.
695 context->SetStatus(GPUUtil::SyncAll(this));
696 if (vlog_1) {
697 VLOG(1) << "GpuDevice::ComputeHelper finished "
698 << ComputeOpKernelDebugString(*op_kernel, stream_id);
699 }
700 } else if (vlog_1) {
701 VLOG(1) << "GpuDevice::ComputeHelper scheduled "
702 << ComputeOpKernelDebugString(*op_kernel, stream_id);
703 }
704 if (kernel_tracker_) {
705 GPUKernelTracker* tracker = kernel_tracker_.get();
706 DCHECK(tracker);
707 uint64 queued_count = tracker->MaybeQueue(context);
708 if (queued_count > 0) {
709 em_->ThenExecute(stream, [tracker, queued_count]() {
710 tracker->RecordTerminated(queued_count);
711 });
712 }
713 }
714 if (node_file_writer_) {
715 Status s = node_file_writer_->RecordNodeExecution(op_kernel, context);
716 if (!s.ok()) {
717 LOG(ERROR) << s;
718 context->SetStatus(s);
719 }
720 }
721 } else {
722 if (vlog_1) {
723 VLOG(1) << "GpuDevice::ComputeHelper failed to schedule "
724 << ComputeOpKernelDebugString(*op_kernel, stream_id);
725 }
726 }
727 }
728
729 Status BaseGPUDevice::Sync() {
730 DCHECK_NE(stream_, nullptr);
731
732 // Device::Sync is supposed to block until all operations queued on the device
733 // at the time of the call have completed. On GPUs, only operations enqueued
734 // on the compute stream can remain pending after the (Async)OpKernel that
735 // enqueued the operation has completed. We do use other streams for copies
736 // and collectives, but in those cases the (Async)OpKernels themselves block
737 // until the queued operation has finished.
738 return stream_->compute->BlockHostUntilDone();
739 }
740
741 void BaseGPUDevice::ComputeAsync(AsyncOpKernel* op_kernel,
742 OpKernelContext* context,
743 AsyncOpKernel::DoneCallback done) {
744 GPUDeviceContext* gpu_device_context = device_context_;
745 if (context->op_device_context() != nullptr) {
746 gpu_device_context =
747 static_cast<GPUDeviceContext*>(context->op_device_context());
748 }
749 se::Stream* stream = gpu_device_context->stream();
750 const auto stream_id = gpu_device_context->stream_id();
751
752 VLOG(1) << "GpuDevice::ComputeAsync " << op_kernel->name() << " op "
753 << op_kernel->type_string() << " on GPU " << tf_device_id_
754 << " stream[" << stream_id << "]";
755
756 bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel);
757
758 if (should_log_inputs_and_outputs) {
759 LogInputs(op_kernel, context);
760 AsyncOpKernel::DoneCallback parent_done = done;
761 done = [this, parent_done, should_log_inputs_and_outputs, op_kernel,
762 context]() {
763 LogOutputs(op_kernel, context);
764 parent_done();
765 };
766 }
767
768 ScopedActivateExecutorContext scoped_activation{stream->parent()};
769 op_kernel->ComputeAsync(context, std::move(done));
770 }
771
772 Status BaseGPUDevice::MaybeCopyTensorToGPU(
773 const AllocatorAttributes& alloc_attrs, const Tensor& from, Tensor* to,
774 StatusCallback done) {
775 if (alloc_attrs.on_host()) {
776 *to = from;
777 done(OkStatus());
778 return OkStatus();
779 } else {
780 if (!DMAHelper::CanUseDMA(&from)) {
781 Status err = errors::Internal("GPU copy from non-DMA ",
782 DataTypeString(from.dtype()), " tensor");
783 done(err);
784 return err;
785 }
786 AllocationAttributes allocation_attr;
787 uint64 safe_alloc_frontier = 0;
788 std::function<uint64()> freed_by_func = [this, &safe_alloc_frontier]() {
789 safe_alloc_frontier = SafeAllocFrontier(safe_alloc_frontier);
790 return safe_alloc_frontier;
791 };
792 if (timestamped_allocator_) {
793 allocation_attr.freed_by_func = &freed_by_func;
794 }
795 auto* copy = new Tensor(GetAllocator(alloc_attrs), from.dtype(),
796 from.shape(), allocation_attr);
797
798 // If the tensor is not initialized, we likely ran out of memory.
799 if (!copy->IsInitialized()) {
800 delete copy;
801 Status err = errors::ResourceExhausted(
802 "OOM when allocating tensor of shape ", from.shape().DebugString(),
803 " and type ", DataTypeString(from.dtype()));
804 done(err);
805 return err;
806 }
807
808 auto wrapped_done = [to, copy, done = std::move(done)](const Status& s) {
809 if (s.ok()) {
810 *to = std::move(*copy);
811 }
812 delete copy;
813 done(s);
814 };
815
816 profiler::ScopedAnnotation annotation("MakeTensorFromProto");
817 device_context_->CopyCPUTensorToDevice(
818 &from, this, copy, std::move(wrapped_done),
819 !timestamped_allocator_ /*sync_dst_compute*/);
820 return OkStatus();
821 }
822 }
823
824 Status BaseGPUDevice::MakeTensorFromProto(const TensorProto& tensor_proto,
825 const AllocatorAttributes alloc_attrs,
826 Tensor* tensor) {
827 AllocatorAttributes attr;
828 attr.set_on_host(true);
829 attr.set_gpu_compatible(true);
830 Allocator* host_alloc = GetAllocator(attr);
831 Tensor parsed(tensor_proto.dtype());
832 if (!parsed.FromProto(host_alloc, tensor_proto)) {
833 return errors::InvalidArgument("Cannot parse tensor from proto: ",
834 tensor_proto.DebugString());
835 }
836
837 profiler::ScopedMemoryDebugAnnotation op_annotation(
838 "MakeTensorFromProto", "dynamic", parsed.dtype(),
839 [&parsed]() { return parsed.shape().DebugString(); });
840 if (parsed.dtype() == DT_VARIANT) {
841 const Variant* from = parsed.flat<Variant>().data();
842 int numa_node = attributes().locality().numa_node();
843 Tensor copy(cpu_allocator(numa_node), DT_VARIANT, parsed.shape());
844 Variant* copy_variant = copy.flat<Variant>().data();
845
846 std::list<Notification> notifications;
847 Status copy_status;
848 auto copier = [this, &alloc_attrs, &notifications, &copy_status](
849 const Tensor& from, Tensor* to) {
850 // Copier isn't run in a multithreaded environment, so we don't
851 // have to worry about the notifications list being modified in parallel.
852 notifications.emplace_back();
853 Notification& n = *notifications.rbegin();
854 return MaybeCopyTensorToGPU(alloc_attrs, from, to,
855 [&n, &copy_status](const Status& s) {
856 if (copy_status.ok()) {
857 copy_status.Update(s);
858 }
859 n.Notify();
860 });
861 };
862 Status s;
863 for (int64_t ix = 0; ix < parsed.NumElements(); ++ix) {
864 s = VariantDeviceCopy(VariantDeviceCopyDirection::HOST_TO_DEVICE,
865 from[ix], &copy_variant[ix], copier);
866 if (!s.ok()) {
867 break;
868 }
869 }
870 for (auto& n : notifications) {
871 n.WaitForNotification();
872 }
873 if (!s.ok()) {
874 return s;
875 }
876 *tensor = std::move(copy);
877 return copy_status;
878 } else {
879 Notification n;
880 Status status;
881 TF_RETURN_IF_ERROR(MaybeCopyTensorToGPU(alloc_attrs, parsed, tensor,
882 [&n, &status](const Status& s) {
883 status = s;
884 n.Notify();
885 }));
886 n.WaitForNotification();
887 return status;
888 }
889 }
890
891 void BaseGPUDevice::CopyTensorInSameDevice(const Tensor* input_tensor,
892 Tensor* output_tensor,
893 const DeviceContext* device_context,
894 StatusCallback done) {
895 GPUUtil::CopyGPUTensorToSameGPU(static_cast<Device*>(this), device_context,
896 input_tensor, output_tensor, std::move(done));
897 }
898
899 ConcretePerOpGpuDevice::ConcretePerOpGpuDevice()
900 : stream_device_(std::make_unique<EigenGpuStreamDevice>()) {}
901
902 void ConcretePerOpGpuDevice::Reinitialize(OpKernelContext* context,
903 const void* gpu_stream,
904 TfDeviceId tf_device_id,
905 Allocator* base_allocator,
906 char* scratch) {
907 PlatformDeviceId platform_device_id;
908 TF_CHECK_OK(
909 GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
910 static_cast<EigenGpuStreamDevice*>(stream_device_.get())
911 ->Reinitialize(context, static_cast<const gpuStream_t*>(gpu_stream),
912 platform_device_id, base_allocator, scratch);
913 }
914
915 void ConcretePerOpGpuDevice::Reinitialize(OpKernelContext* context,
916 const void* gpu_stream,
917 PlatformDeviceId platform_device_id,
918 Allocator* base_allocator,
919 char* scratch) {
920 static_cast<EigenGpuStreamDevice*>(stream_device_.get())
921 ->Reinitialize(context, static_cast<const gpuStream_t*>(gpu_stream),
922 platform_device_id, base_allocator, scratch);
923 }
924
925 const Eigen::GpuDevice& ConcretePerOpGpuDevice::device() const {
926 return static_cast<EigenGpuStreamDevice*>(stream_device_.get())->device();
927 }
928
929 namespace {
930
931 Status VerifyVirtualDeviceSettings(
932 const size_t num_gpus_to_use, const GPUOptions& gpu_options,
933 const std::vector<PlatformDeviceId>& visible_gpu_order,
934 const std::vector<PlatformDeviceId>& valid_platform_device_ids,
935 const std::map<int, std::pair<int, int>>& supported_priority_ranges) {
936 const auto& virtual_devices = gpu_options.experimental().virtual_devices();
937 CHECK(!virtual_devices.empty());
938 if (gpu_options.per_process_gpu_memory_fraction() > 0) {
939 return errors::InvalidArgument(
940 "It's invalid to set per_process_gpu_memory_fraction when "
941 "virtual_devices is set.");
942 }
943 if (num_gpus_to_use < virtual_devices.size()) {
944 return errors::Unknown(
945 "Not enough GPUs to create virtual devices."
946 " num_gpus_to_use: ",
947 num_gpus_to_use, " #virtual_devices: ", virtual_devices.size());
948 }
949 if (!gpu_options.visible_device_list().empty() &&
950 visible_gpu_order.size() != virtual_devices.size()) {
951 return errors::InvalidArgument(
952 "The number of GPUs in visible_device_list doesn't match the number "
953 "of elements in the virtual_devices list.",
954 " #GPUs in visible_device_list: ", visible_gpu_order.size(),
955 " virtual_devices.size(): ", virtual_devices.size());
956 }
957 if (valid_platform_device_ids.size() != virtual_devices.size()) {
958 return errors::Unknown(
959 "The number of valid GPUs doesn't match the number of elements in "
960 "the virtual_devices list.",
961 " #valid GPUs: ", valid_platform_device_ids.size(),
962 " virtual_devices.size(): ", virtual_devices.size());
963 }
964 for (int i = 0; i < virtual_devices.size(); ++i) {
965 // Compares against the first virtual_device list.
966 if (virtual_devices.Get(0).device_ordinal().empty() !=
967 virtual_devices.Get(i).device_ordinal().empty()) {
968 return errors::InvalidArgument(
969 "Device ordinals must be set for all virtual devices or none. But "
970 "the device_ordinal is specified for ",
971 i, " while previous devices didn't have any set.");
972 }
973 }
974 if (!virtual_devices.Get(0).device_ordinal().empty()) {
975 for (int i = 0; i < virtual_devices.size(); ++i) {
976 const size_t memory_limit_mb_size =
977 virtual_devices.Get(i).memory_limit_mb().size();
978 const size_t device_ordinal_size =
979 virtual_devices.Get(i).device_ordinal().size();
980 if (memory_limit_mb_size != device_ordinal_size) {
981 return errors::InvalidArgument(
982 "Number of virtual device ordinals specified doesn't "
983 "match with number of memory_limit_mb specified for GPU# ",
984 i, " memory_limit_mb size: ", memory_limit_mb_size,
985 " and device_ordinal size: ", device_ordinal_size);
986 }
987 }
988 }
989 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
990 // Check that memory_limit_mb and priority sizes match if priority is non-empty.
991 bool priority_exists = !virtual_devices.Get(0).priority().empty();
992 for (int i = 0; i < virtual_devices.size(); ++i) {
993 const auto& memory_limit_mb = virtual_devices.Get(i).memory_limit_mb();
994 const auto& priority = virtual_devices.Get(i).priority();
995 // If the priority is empty in the first one then treat this as having no
996 // priority set in any of the virtual devices for backward compatibility.
997 // Either it's set for all or none.
998 if (!priority_exists) {
999 if (!priority.empty()) {
1000 return errors::InvalidArgument(
1001 "Priority must be set for all virtual devices or none. But the "
1002 "priority is specified for ",
1003 i,
1004 " while previous devices didn't "
1005 "have any set.");
1006 }
1007 }
1008 if (priority_exists && memory_limit_mb.size() != priority.size()) {
1009 return errors::InvalidArgument(
1010 "Number of virtual device priorities specified doesn't "
1011 "match with number of memory_limit_mb specified for GPU# ",
1012 i, " memory_limit_mb size: ", memory_limit_mb.size(),
1013 " and priority size: ", priority.size());
1014 }
1015 const int gpu_id = valid_platform_device_ids[i].value();
1016 auto it = supported_priority_ranges.find(gpu_id);
1017 if (it == supported_priority_ranges.end()) {
1018 return errors::Internal(
1019 "Failed to find supported priority range for GPU"
1020 " device ",
1021 gpu_id);
1022 }
1023 const std::pair<int, int>& priority_range = it->second;
1024 for (int p : priority) {
1025 if (p > priority_range.first || p < priority_range.second) {
1026 return errors::InvalidArgument(
1027 "Priority ", p,
1028 " is outside the range of supported priorities "
1029 "[",
1030 priority_range.second, ",", priority_range.first,
1031 "] for virtual device ", i, " on GPU# ", gpu_id);
1032 }
1033 }
1034 }
1035 #endif
1036
1037 return OkStatus();
1038 }
1039
1040 int64_t MinSystemMemory(int64_t available_memory, int cc_major) {
1041 // We use the following heuristic for now:
1042 //
1043 // If the available_memory is < 2GiB, we allocate 225MiB to system memory.
1044 // Otherwise, depending on the capability version assign
1045 // 500MiB (for cuda_compute_capability <= 6.x) or
1046 // 1050MiB (for cuda_compute_capability <= 7.x) or
1047 // 1536MiB (for cuda_compute_capability >= 8.x)
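//
// Worked example: a 16 GiB GPU with cc_major == 8 reserves 1536 MiB for the
// system, while a 1.5 GiB GPU reserves only 225 MiB (both doubled in
// non-opt builds, see below).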
1048 int64_t min_system_memory;
1049 if (available_memory < (1LL << 31)) {
1050 min_system_memory = 225 * 1024 * 1024;
1051 } else {
1052 if (cc_major <= 6) {
1053 min_system_memory = 500 * 1024 * 1024;
1054 } else if (cc_major <= 7) {
1055 min_system_memory = 1050 * 1024 * 1024;
1056 } else {
1057 min_system_memory = 1536 * 1024 * 1024;
1058 }
1059 }
1060 #if defined(__GNUC__) && defined(__OPTIMIZE__)
1061 // Do nothing
1062 #elif !defined(__GNUC__) && defined(NDEBUG)
1063 // Do nothing
1064 #else
1065 // Double the amount of memory reserved for the system in non-opt builds
1066 // (debug builds on Windows), because non-opt builds need more system
1067 // memory.
1068 min_system_memory *= 2;
1069 #endif
1070
1071 #if defined(ANDROID_TEGRA)
1072 // 1GB system mem for NVIDIA Tegra devices since they use the same mem for
1073 // RAM and Video RAM
1074 min_system_memory = 1 << 30;
1075 #endif
1076
1077 VLOG(5) << "available_memory = " << available_memory;
1078 VLOG(5) << "min_system_memory = " << min_system_memory;
1079 return min_system_memory;
1080 }
1081
1082 // Get the memory limit for the virtual device being created on GPU with
1083 // 'platform_device_id', when that virtual device is the only virtual device
1084 // being created on that GPU.
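// For example, with per_process_gpu_memory_fraction == 0 (the default) the
// limit is the currently free memory minus MinSystemMemory(); with a
// fraction of 0.5 it is half of the GPU's total memory. Either result may
// then be overridden by TF_DEVICE_MIN_SYS_MEMORY_IN_MB, handled below.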
1085 Status SingleVirtualDeviceMemoryLimit(const GPUOptions& gpu_options,
1086 PlatformDeviceId platform_device_id,
1087 int64_t* memory_limit) {
1088 int64_t total_memory = 0;
1089 int64_t available_memory = 0;
1090 se::StreamExecutor* se = DeviceIdUtil::ExecutorForPlatformDeviceId(
1091 GPUMachineManager(), platform_device_id)
1092 .ValueOrDie();
1093 if (!se->DeviceMemoryUsage(&available_memory, &total_memory)) {
1094 return errors::Unknown("Failed to query available memory for GPU ",
1095 platform_device_id.value());
1096 }
1097
1098 int64_t allocated_memory = 0;
1099 const double per_process_gpu_memory_fraction =
1100 gpu_options.per_process_gpu_memory_fraction();
1101 se::CudaComputeCapability cc =
1102 se->GetDeviceDescription().cuda_compute_capability();
1103 if ((per_process_gpu_memory_fraction > 1.0 ||
1104 gpu_options.experimental().use_unified_memory()) &&
1105 !cc.IsAtLeast(se::CudaComputeCapability::PASCAL_)) {
1106 return errors::Internal(
1107 "Unified memory on GPUs with compute capability lower than 6.0 "
1108 "(pre-Pascal class GPUs) does not support oversubscription.");
1109 }
1110
1111 if (per_process_gpu_memory_fraction == 0) {
1112 allocated_memory = available_memory;
1113 const int64_t min_system_memory =
1114 MinSystemMemory(available_memory, cc.major);
1115 if (min_system_memory < allocated_memory) {
1116 allocated_memory -= min_system_memory;
1117 }
1118 } else {
1119 allocated_memory = total_memory * per_process_gpu_memory_fraction;
1120 }
1121
1122 // Override the excluded memory when TF_DEVICE_MIN_SYS_MEMORY_IN_MB is set.
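// e.g. (hypothetical): TF_DEVICE_MIN_SYS_MEMORY_IN_MB=2048 keeps 2 GiB of
// device memory out of TensorFlow's pool regardless of
// per_process_gpu_memory_fraction.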
1123 const char* force_device_reserved_bytes =
1124 std::getenv("TF_DEVICE_MIN_SYS_MEMORY_IN_MB");
1125 if (force_device_reserved_bytes != nullptr &&
1126 strcmp(force_device_reserved_bytes, "") != 0) {
1127 int64_t reserved_mb;
1128 if (!strings::safe_strto64(force_device_reserved_bytes, &reserved_mb) ||
1129 reserved_mb < 0) {
1130 LOG(WARNING) << "The requested reserved device memory "
1131 << force_device_reserved_bytes
1132 << " is invalid. The request will be ignored.";
1133 } else {
1134 // Convert MBytes to Bytes.
1135 int64_t allowable_reserved_memory = reserved_mb * 1024 * 1024;
1136 // TF_DEVICE_MIN_SYS_MEMORY_IN_MB overrides
1137 // per_process_gpu_memory_fraction.
1138 if (allowable_reserved_memory <= available_memory) {
1139 allocated_memory = available_memory - allowable_reserved_memory;
1140 VLOG(1) << "Setting the GPU memory limit to "
1141 << strings::HumanReadableNumBytes(allocated_memory);
1143 } else {
1144 LOG(WARNING) << "The requested reserved device memory "
1145 << strings::HumanReadableNumBytes(
1146 allowable_reserved_memory)
1147 << " is larger than the available memory of "
1148 << strings::HumanReadableNumBytes(available_memory)
1149 << ". The request is ignored.";
1150 }
1151 }
1152 }
1153 *memory_limit = allocated_memory;
1154 return OkStatus();
1155 }
1156 } // namespace
1157
1158 void BaseGPUDevice::ReinitializeDevice(OpKernelContext* context,
1159 PerOpGpuDevice* device, int stream_id,
1160 Allocator* allocator) {
1161 ConcretePerOpGpuDevice* concrete_device =
1162 static_cast<ConcretePerOpGpuDevice*>(device);
1163 DCHECK(concrete_device);
1164 DCHECK_EQ(stream_id, 0);
1165 const gpuStream_t* gpu_stream = reinterpret_cast<const gpuStream_t*>(
1166 stream_->compute->implementation()->GpuStreamMemberHack());
1167 concrete_device->Reinitialize(context, gpu_stream, tf_device_id_, allocator,
1168 scratch_);
1169 }
1170
1171 PerOpGpuDevice* BaseGPUDevice::MakeGpuDevice() {
1172 return new ConcretePerOpGpuDevice();
1173 }
1174
1175 Status BaseGPUDevice::ReinitializeGpuDevice(OpKernelContext* context,
1176 PerOpGpuDevice* device,
1177 DeviceContext* dc,
1178 Allocator* allocator) {
1179 TF_RETURN_IF_ERROR(InitScratchBuffers());
1180 if (dc) {
1181 const GPUDeviceContext* gpu_dc = static_cast<GPUDeviceContext*>(dc);
1182 const int stream_id = gpu_dc->stream_id();
1183 VLOG(1) << " eigen_gpu_device(" << dc << ") => stream[" << stream_id
1184 << "]";
1185 CHECK_EQ(stream_id, 0);
1186 ReinitializeDevice(context, device, stream_id, allocator);
1187 } else {
1188 ReinitializeDevice(context, device, 0, allocator);
1189 }
1190 return OkStatus();
1191 }
1192
1193 Allocator* BaseGPUDevice::GetScopedAllocator(AllocatorAttributes attr,
1194 int64_t step_id) {
1195 if (attr.scope_id > 0) {
1196 return scoped_allocator_mgr_->GetContainer(step_id)->GetInstance(
1197 attr.scope_id);
1198 }
1199 LOG(FATAL) << "Unexpected call to BaseGPUDevice::GetScopedAllocator "
1200 << "attr.scope_id = " << attr.scope_id;
1201 return gpu_allocator_;
1202 }
1203
1204 const int BaseGPUDeviceFactory::InterconnectMap::kSameDeviceStrength = 1000;
1205 const int BaseGPUDeviceFactory::InterconnectMap::kStreamExecutorStrength = 1;
1206
1207 Status BaseGPUDeviceFactory::CacheDeviceIds() {
1208 if (!cached_device_ids_.empty()) {
1209 return OkStatus();
1210 }
1211
1212 TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1213 se::Platform* gpu_manager = GPUMachineManager();
1214 if (gpu_manager == nullptr) {
1215 return OkStatus();
1216 }
1217
1218 int device_count = gpu_manager->VisibleDeviceCount();
1219 if (device_count <= 0) {
1220 return OkStatus();
1221 }
1222
1223 std::vector<PlatformDeviceId> visible_gpu_order(device_count);
1224 std::iota(visible_gpu_order.begin(), visible_gpu_order.end(), 0);
1225 TF_RETURN_IF_ERROR(GetValidDeviceIds(visible_gpu_order, &cached_device_ids_));
1226 return OkStatus();
1227 }
1228
1229 Status BaseGPUDeviceFactory::ListPhysicalDevices(std::vector<string>* devices) {
1230 TF_RETURN_IF_ERROR(CacheDeviceIds());
1231 for (PlatformDeviceId platform_device_id : cached_device_ids_) {
1232 const string device_name =
1233 strings::StrCat("/physical_device:GPU:", platform_device_id.value());
1234 devices->push_back(device_name);
1235 }
1236
1237 return OkStatus();
1238 }
1239
1240 Status BaseGPUDeviceFactory::GetDeviceDetails(
1241 int device_index, std::unordered_map<string, string>* details) {
1242 TF_RETURN_IF_ERROR(CacheDeviceIds());
1243
1244 if (device_index < 0 || device_index >= cached_device_ids_.size()) {
1245 return errors::Internal("Invalid device index: ", device_index);
1246 }
1247 PlatformDeviceId platform_device_id = cached_device_ids_[device_index];
1248
1249 TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1250 se::Platform* gpu_manager = GPUMachineManager();
1251 if (gpu_manager == nullptr) {
1252 return errors::Internal("Cannot get GPUMachineManager");
1253 }
1254 auto desc_status =
1255 gpu_manager->DescriptionForDevice(platform_device_id.value());
1256 if (!desc_status.ok()) {
1257 return desc_status.status();
1258 }
1259
1260 auto desc = std::move(desc_status).value();
1261 (*details)["device_name"] = desc->name();
1262 #if GOOGLE_CUDA
1263 (*details)["compute_capability"] = desc->cuda_compute_capability().ToString();
1264 #endif // GOOGLE_CUDA
1265 return OkStatus();
1266 }
1267
1268 Status BaseGPUDeviceFactory::CreateDevices(
1269 const SessionOptions& options, const string& name_prefix,
1270 std::vector<std::unique_ptr<Device>>* devices) {
1271 TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1272 se::Platform* gpu_manager = GPUMachineManager();
1273 if (gpu_manager == nullptr) {
1274 return OkStatus();
1275 }
1276 // If there are no GPUs visible, do nothing.
1277 if (gpu_manager->VisibleDeviceCount() <= 0) {
1278 return OkStatus();
1279 }
1280
1281 size_t num_gpus_to_use = INT_MAX;
1282 auto iter = options.config.device_count().find("GPU");
1283 if (iter != options.config.device_count().end()) {
1284 num_gpus_to_use = iter->second;
1285 }
1286 const auto& gpu_options = options.config.gpu_options();
1287 std::vector<PlatformDeviceId> visible_gpu_order;
1288 std::vector<PlatformDeviceId> valid_platform_device_ids;
1289 // If we aren't going to use any GPUs, don't initialize them.
1290 // We don't want to call ParseVisibleDeviceList if num_gpus_to_use is 0,
1291 // because it treats an empty gpu_options.visible_device_list as 'all GPUs
1292 // are visible'.
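// e.g. (illustrative): visible_device_list = "2,0" exposes platform GPUs 2
// and 0 as TF GPUs 0 and 1 respectively, and ParseVisibleDeviceList below
// returns them in that order.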
1293 if (num_gpus_to_use > 0) {
1294 TF_RETURN_IF_ERROR(DeviceIdUtil::ParseVisibleDeviceList(
1295 gpu_options.visible_device_list(), gpu_manager->VisibleDeviceCount(),
1296 &visible_gpu_order));
1297 bool new_gpu_found = false;
1298 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1299 int visible_gpu_id = visible_gpu_order[i].value();
1300
1301 // Only perform this once per visible gpu id.
1302 if (visible_gpu_initialized_[visible_gpu_id]) {
1303 continue;
1304 }
1305
1306 visible_gpu_initialized_[visible_gpu_id] = true;
1307 new_gpu_found = true;
1308 }
1309
1310 // Check peer access and show the interconnect matrix if more than one GPU was found.
1311 if (new_gpu_found && visible_gpu_order.size() > 1) {
1312 // Enable peer access
1313 TF_RETURN_IF_ERROR(EnablePeerAccess(visible_gpu_order));
1314 }
1315
1316 TF_RETURN_IF_ERROR(
1317 GetValidDeviceIds(visible_gpu_order, &valid_platform_device_ids));
1318 }
1319 if (num_gpus_to_use > valid_platform_device_ids.size()) {
1320 num_gpus_to_use = valid_platform_device_ids.size();
1321 }
1322 std::map<int, std::pair<int, int>> supported_priority_ranges;
1323 if (!valid_platform_device_ids.empty()) {
1324 // Save the original device.
1325 int original_device = 0;
1326 #if GOOGLE_CUDA
1327 cudaError_t err = cudaGetDevice(&original_device);
1328 if (err != cudaSuccess) {
1329 return errors::Internal("cudaGetDevice() failed. Status: ",
1330 cudaGetErrorString(err));
1331 }
1332 #elif TENSORFLOW_USE_ROCM
1333 hipError_t err = hipGetDevice(&original_device);
1334 if (err != hipSuccess) {
1335 return errors::Internal("hipGetDevice() failed. Status: ",
1336 hipGetErrorString(err));
1337 }
1338 #endif
1339
1340 // Force to implicitly initialize CUDA runtime on each valid GPU before
1341 // CreateGPUDevice().
1342 for (PlatformDeviceId platform_device_id : valid_platform_device_ids) {
1343 #if GOOGLE_CUDA
1344 err = cudaSetDevice(platform_device_id.value());
1345 if (err != cudaSuccess) {
1346 return errors::Internal(
1347 "cudaSetDevice() on GPU:", platform_device_id.value(),
1348 " failed. Status: ", cudaGetErrorString(err));
1349 }
1350 int priority_low, priority_high;
1351 err = cudaDeviceGetStreamPriorityRange(&priority_low, &priority_high);
1352 if (err != cudaSuccess) {
1353 return errors::Internal(
1354 "cudaDeviceGetStreamPriorityRange() on GPU:", original_device,
1355 " failed. Status: ", cudaGetErrorString(err));
1356 }
1357 VLOG(1) << "Cuda stream priority range on GPU(" << original_device
1358 << "): " << priority_high << "," << priority_low;
1359 supported_priority_ranges.insert(
1360 std::make_pair(platform_device_id.value(),
1361 std::make_pair(priority_low, priority_high)));
1362 #elif TENSORFLOW_USE_ROCM
1363 err = hipSetDevice(platform_device_id.value());
1364 if (err != hipSuccess) {
1365 return errors::Internal(
1366 "hipSetDevice() on GPU:", platform_device_id.value(),
1367 " failed. Status: ", hipGetErrorString(err));
1368 }
1369 err = hipFree(nullptr);
1370 if (err != hipSuccess) {
1371 return errors::Internal("ROCm runtime implicit initialization on GPU:",
1372 platform_device_id.value(),
1373 " failed. Status: ", hipGetErrorString(err));
1374 }
1375 int priority_low, priority_high;
1376 err = hipDeviceGetStreamPriorityRange(&priority_low, &priority_high);
1377 if (err != hipSuccess) {
1378 return errors::Internal(
1379 "hipDeviceGetStreamPriorityRange() on GPU:", original_device,
1380 " failed. Status: ", hipGetErrorString(err));
1381 }
1382 VLOG(1) << "HIP stream priority range on GPU(" << original_device
1383 << "): " << priority_high << "," << priority_low;
1384 supported_priority_ranges.insert(
1385 std::make_pair(platform_device_id.value(),
1386 std::make_pair(priority_low, priority_high)));
1387 #endif
1388 }
1389 // Reset to the original device.
1390 #if GOOGLE_CUDA
1391 err = cudaSetDevice(original_device);
1392 if (err != cudaSuccess) {
1393 return errors::Internal("cudaSetDevice() on GPU:", original_device,
1394 " failed. Status: ", cudaGetErrorString(err));
1395 }
1396 #elif TENSORFLOW_USE_ROCM
1397 err = hipSetDevice(original_device);
1398 if (err != hipSuccess) {
1399 return errors::Internal("hipSetDevice() on GPU:", original_device,
1400 " failed. Status: ", hipGetErrorString(err));
1401 }
1402 #endif
1403
1404 #if GOOGLE_CUDA
1405 // Log the version of CUDA and cuDNN
1406 int cuda_major_version = CUDART_VERSION / 1000;
1407 int cuda_minor_version = (CUDART_VERSION / 10) % 10;
1408 VLOG(1) << "TensorFlow compiled with CUDA " << cuda_major_version << "."
1409 << cuda_minor_version << " and cuDNN " << CUDNN_MAJOR << "."
1410 << CUDNN_MINOR << "." << CUDNN_PATCHLEVEL;
1411 #endif
1412 }
1413
1414 std::vector<InterconnectMap> interconnect_maps;
1415 TF_RETURN_IF_ERROR(
1416 GetInterconnectMaps(visible_gpu_order, gpu_manager, &interconnect_maps));
1417
1418 // Print each interconnect map to the log.
1419 for (const InterconnectMap& im : interconnect_maps) {
1420 VLOG(1) << "Device interconnect " << im.name << " with strength "
1421 << im.strength << " edge matrix:";
1422 string line_buf = " ";
1423 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1424 strings::StrAppend(&line_buf, visible_gpu_order[i].value(), " ");
1425 }
1426 VLOG(1) << line_buf;
1427 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1428 line_buf = strings::StrCat(visible_gpu_order[i].value(), ": ");
1429 PlatformDeviceId gpu_id_i = visible_gpu_order[i];
1430 for (int j = 0; j < visible_gpu_order.size(); ++j) {
1431 PlatformDeviceId gpu_id_j = visible_gpu_order[j];
1432 if (im.directed_links.find({gpu_id_i, gpu_id_j}) !=
1433 im.directed_links.end()) {
1434 line_buf.append("Y ");
1435 } else {
1436 line_buf.append("N ");
1437 }
1438 }
1439 VLOG(1) << line_buf;
1440 }
1441 }
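
// Illustrative example (not taken from a real run): with two visible GPUs
// that can reach each other via StreamExecutor peer access, the matrix
// logged above looks like:
//
//        0 1
//   0:   N Y
//   1:   Y N
//
// The diagonal is always "N" because GetInterconnectMaps() never records
// self-links.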
1442
1443 const auto& virtual_devices = gpu_options.experimental().virtual_devices();
1444 if (!virtual_devices.empty()) {
1445 TF_RETURN_IF_ERROR(VerifyVirtualDeviceSettings(
1446 num_gpus_to_use, gpu_options, visible_gpu_order,
1447 valid_platform_device_ids, supported_priority_ranges));
1448 // We've verified that num_gpus_to_use >= virtual_devices.size().
1449 num_gpus_to_use = virtual_devices.size();
1450 CHECK(gpu_options.visible_device_list().empty() ||
1451 valid_platform_device_ids == visible_gpu_order);
1452 }
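
// A minimal sketch (illustrative, not compiled here) of the configuration
// consumed above, assuming the standard proto-generated accessors for
// GPUOptions::Experimental::VirtualDevices; it splits the first visible GPU
// into two 2 GiB virtual devices:
//
//   GPUOptions gpu_options;
//   auto* vdevs = gpu_options.mutable_experimental()->add_virtual_devices();
//   vdevs->add_memory_limit_mb(2048);
//   vdevs->add_memory_limit_mb(2048);
//
// Each memory_limit_mb entry then becomes one TfDeviceSpec in the loop below.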
1453
1454 struct TfDeviceSpec {
1455 PlatformDeviceId platform_device_id;
1456 int64_t memory_limit_bytes;
1457 int index; // Index in the concatenated virtual device configuration list.
1458 int device_ordinal; // Ordinal for determining the tf device id.
1459 TfDeviceSpec(PlatformDeviceId platform_device_id,
1460 int64_t memory_limit_bytes, int index, int device_ordinal)
1461 : platform_device_id(platform_device_id),
1462 memory_limit_bytes(memory_limit_bytes),
1463 index(index),
1464 device_ordinal(device_ordinal) {}
1465 };
1466
1467 std::vector<TfDeviceSpec> tf_device_specs;
1468
1469 constexpr int64_t kMegaByte = 1ll << 20;
1470
1471 for (int i = 0; i < num_gpus_to_use; ++i) {
1472 const PlatformDeviceId platform_device_id = valid_platform_device_ids[i];
1473 if (virtual_devices.empty() ||
1474 virtual_devices.Get(i).memory_limit_mb_size() == 0) {
1475 int64_t single_virtual_device_memory_limit = 0;
1476 TF_RETURN_IF_ERROR(
1477 SingleVirtualDeviceMemoryLimit(gpu_options, platform_device_id,
1478 &single_virtual_device_memory_limit));
1479 tf_device_specs.emplace_back(
1480 platform_device_id, single_virtual_device_memory_limit,
1481 /*index=*/tf_device_specs.size(), /*device_ordinal=*/0);
1482 } else {
1483 const GPUOptions::Experimental::VirtualDevices& virtual_devices_for_gpu =
1484 virtual_devices.Get(i);
1485 for (int j = 0; j < virtual_devices_for_gpu.memory_limit_mb().size();
1486 j++) {
1487 tf_device_specs.emplace_back(
1488 platform_device_id,
1489 static_cast<int64_t>(virtual_devices_for_gpu.memory_limit_mb(j)) *
1490 kMegaByte,
1491 /*index=*/tf_device_specs.size(),
1492 /*device_ordinal=*/j <
1493 virtual_devices_for_gpu.device_ordinal().size()
1494 ? virtual_devices_for_gpu.device_ordinal(j)
1495 : 0);
1496 }
1497 }
1498 }
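
// Worked example: given two VirtualDevices entries where the first specifies
// memory_limit_mb = {1024, 2048} for GPU 0 and the second is left empty for
// GPU 1, the loop above produces three specs: (GPU 0, 1 GiB, index 0),
// (GPU 0, 2 GiB, index 1), and (GPU 1, per-process default limit, index 2),
// with device_ordinal taken from the config where present and 0 otherwise.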
1499
1500 // Reorder virtual devices by their device_ordinal, breaking ties by the index
1501 // in the concatenated virtual device list.
1502 std::sort(tf_device_specs.begin(), tf_device_specs.end(),
1503 [](const TfDeviceSpec& a, const TfDeviceSpec& b) {
1504 if (a.device_ordinal < b.device_ordinal) {
1505 return true;
1506 } else if (a.device_ordinal > b.device_ordinal) {
1507 return false;
1508 }
1509 DCHECK_EQ(a.device_ordinal, b.device_ordinal);
1510 DCHECK_NE(a.index, b.index); // index is unique.
1511 if (a.index < b.index) {
1512 return true;
1513 }
1514 return false;
1515 });
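
// For example, specs with (device_ordinal, index) pairs (1, 0), (0, 1) and
// (0, 2) sort to (0, 1), (0, 2), (1, 0): ordinal-0 virtual devices receive
// the smallest TfDeviceIds, and ties keep their configuration order.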
1516
1517 for (int di = 0; di < tf_device_specs.size(); ++di) {
1518 TfDeviceId tf_device_id(di);
1519 TF_RETURN_IF_ERROR(GpuIdManager::InsertTfPlatformDeviceIdPair(
1520 tf_device_id, tf_device_specs[di].platform_device_id));
1521 }
1522
1523 LocalityMap device_localities;
1524 TF_RETURN_IF_ERROR(GetDeviceLocalities(
1525 tf_device_specs.size(), interconnect_maps, &device_localities));
1526
1527 // Build the GPUDevices
1528 for (int di = 0; di < tf_device_specs.size(); ++di) {
1529 TfDeviceId tf_device_id(di);
1530 auto it = device_localities.find(tf_device_id);
1531 if (it == device_localities.end()) {
1532 return errors::Internal("Failed to find DeviceLocality for GPU device ",
1533 tf_device_id.value());
1534 }
1535 TF_RETURN_IF_ERROR(CreateGPUDevice(options, name_prefix, tf_device_id,
1536 tf_device_specs[di].memory_limit_bytes,
1537 it->second, tf_device_specs.size(),
1538 devices));
1539 }
1540 return OkStatus();
1541 }
1542
1543 static string GetShortDeviceDescription(PlatformDeviceId platform_device_id,
1544 const se::DeviceDescription& desc) {
1545 #if GOOGLE_CUDA
1546 // LINT.IfChange
1547 return strings::StrCat(
1548 "device: ", platform_device_id.value(), ", name: ", desc.name(),
1549 ", pci bus id: ", desc.pci_bus_id(),
1550 ", compute capability: ", desc.cuda_compute_capability().ToString());
1551 // LINT.ThenChange(//tensorflow/python/framework/gpu_util.py)
1552 #elif TENSORFLOW_USE_ROCM
1553 return strings::StrCat("device: ", platform_device_id.value(),
1554 ", name: ", desc.name(),
1555 ", pci bus id: ", desc.pci_bus_id());
1556 #endif
1557 }
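
// The string built above looks like the following for a CUDA build (all
// values are illustrative):
//   device: 0, name: NVIDIA A100-SXM4-40GB, pci bus id: 0000:00:04.0,
//   compute capability: 8.0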
1558
1559 Status BaseGPUDeviceFactory::CreateGPUDevice(
1560 const SessionOptions& options, const string& name_prefix,
1561 TfDeviceId tf_device_id, int64_t memory_limit,
1562 const DeviceLocality& dev_locality, size_t num_tf_gpus,
1563 std::vector<std::unique_ptr<Device>>* devices) {
1564 CHECK_GE(tf_device_id.value(), 0);
1565 const string device_name =
1566 strings::StrCat(name_prefix, "/device:GPU:", tf_device_id.value());
1567 DeviceIdUtil::CheckValidTfDeviceId(DEVICE_GPU, GPUMachineManager(),
1568 tf_device_id);
1569 PlatformDeviceId platform_device_id;
1570 TF_RETURN_IF_ERROR(
1571 GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
1572 int numa_node = dev_locality.numa_node();
1573
1574 se::Platform* gpu_manager = GPUMachineManager();
1575 auto desc_status =
1576 gpu_manager->DescriptionForDevice(platform_device_id.value());
1577 if (!desc_status.ok()) {
1578 return desc_status.status();
1579 }
1580 auto desc = std::move(desc_status).value();
1581
1582 std::vector<TfDeviceId> peer_gpu_ids;
1583 peer_gpu_ids.reserve(num_tf_gpus);
1584 for (int id = 0; id < num_tf_gpus; ++id) {
1585 TfDeviceId peer_tf_device_id(id);
1586 if (peer_tf_device_id != tf_device_id) {
1587 peer_gpu_ids.push_back(peer_tf_device_id);
1588 }
1589 }
1590
1591 GPUProcessState* process_state = GPUProcessState::singleton();
1592 Allocator* gpu_allocator = process_state->GetGPUAllocator(
1593 options.config.gpu_options(), tf_device_id, memory_limit, peer_gpu_ids);
1594 if (gpu_allocator == nullptr) {
1595 return errors::Internal("Failed to get memory allocator for TF GPU ",
1596 tf_device_id.value(), " with ", memory_limit,
1597 " bytes of memory.");
1598 }
1599 absl::optional<AllocatorStats> stats = gpu_allocator->GetStats();
1600 if (!stats) {
1601 return errors::Internal("No allocator statistics");
1602 }
1603 // 'memory_limit' is the requested memory size, but if an allocator with the
1604 // given tf_device_id was created before, we'll use it instead of creating a
1605 // new one (as a TF GPU device is a shared resource), in which case the actual
1606 // memory limit represented by 'stats.bytes_limit' used by that allocator
1607 // may be different (a mismatch that should be reported as an error).
1608 //
1609 // TODO(laigd): report error if memory_limit doesn't match
1610 // stats->bytes_limit.
1611 int64_t bytes_limit = stats->bytes_limit ? *stats->bytes_limit : 0;
1612 std::unique_ptr<BaseGPUDevice> gpu_device = CreateGPUDevice(
1613 options, device_name, static_cast<Bytes>(bytes_limit), dev_locality,
1614 tf_device_id, GetShortDeviceDescription(platform_device_id, *desc),
1615 gpu_allocator, ProcessState::singleton()->GetCPUAllocator(numa_node));
1616 LOG(INFO) << "Created device " << device_name << " with "
1617 << (bytes_limit >> 20) << " MB memory -> "
1618 << GetShortDeviceDescription(platform_device_id, *desc);
1619 TF_RETURN_IF_ERROR(gpu_device->Init(options));
1620 gpu_allocator->SetStreamAndPreallocateMemory(gpu_device->GetStream());
1621 devices->push_back(std::move(gpu_device));
1622
1623 return OkStatus();
1624 }
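
// An illustrative example of the INFO log emitted above (the device name,
// memory size and description depend on the host and on name_prefix):
//   Created device /job:localhost/replica:0/task:0/device:GPU:0 with
//   15000 MB memory -> device: 0, name: ..., pci bus id: ...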
1625
1626 namespace {
1627 std::unique_ptr<std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>>
1628 GetPeerAccessMap(se::Platform* platform,
1629 const std::vector<PlatformDeviceId>& visible_gpu_order) {
1630 std::unique_ptr<std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>>
1631 map(new std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>);
1632 for (PlatformDeviceId platform_gpu_i : visible_gpu_order) {
1633 for (PlatformDeviceId platform_gpu_j : visible_gpu_order) {
1634 se::StreamExecutor* from =
1635 DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_gpu_i)
1636 .ValueOrDie();
1637 se::StreamExecutor* to =
1638 DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_gpu_j)
1639 .ValueOrDie();
1640 (*map)[{platform_gpu_i, platform_gpu_j}] =
1641 from->CanEnablePeerAccessTo(to);
1642 }
1643 }
1644
1645 return map;
1646 }
1647
1648 } // namespace
1649
1650 Status BaseGPUDeviceFactory::GetInterconnectMaps(
1651 const std::vector<PlatformDeviceId>& visible_gpu_order,
1652 se::Platform* gpu_manager, std::vector<InterconnectMap>* maps) {
1653 // The default interconnect map is obtained from the StreamExecutor.
1654 auto access_map = GetPeerAccessMap(gpu_manager, visible_gpu_order);
1655 maps->resize(1);
1656 InterconnectMap& imap = maps->at(0);
1657 imap.name = "StreamExecutor";
1658 imap.strength = InterconnectMap::kStreamExecutorStrength;
1659 for (PlatformDeviceId gpu_id_i : visible_gpu_order) {
1660 for (PlatformDeviceId gpu_id_j : visible_gpu_order) {
1661 if (gpu_id_i == gpu_id_j) continue;
1662 if ((*access_map)[{gpu_id_i, gpu_id_j}]) {
1663 imap.directed_links.insert({gpu_id_i, gpu_id_j});
1664 }
1665 }
1666 }
1667 return OkStatus();
1668 }
1669
1670 Status BaseGPUDeviceFactory::GetDeviceLocalities(
1671 int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
1672 LocalityMap* localities) {
1673 std::vector<TfDeviceId> all_tf_device_ids;
1674 all_tf_device_ids.reserve(num_tf_gpus);
1675 for (int i = 0; i < num_tf_gpus; ++i) {
1676 all_tf_device_ids.push_back(TfDeviceId(i));
1677 }
1678 for (TfDeviceId tf_device_id : all_tf_device_ids) {
1679 PlatformDeviceId platform_device_id;
1680 TF_RETURN_IF_ERROR(
1681 GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
1682 // Get GPU bus_id from its reported NUMA affinity. Because GPUs are
1683 // virtualized in some environments, we can't just use the GPU id.
1684 // NUMA locales are indexed from 0, buses are indexed from 1.
1685 se::Platform* gpu_manager = GPUMachineManager();
1686 auto desc_status =
1687 gpu_manager->DescriptionForDevice(platform_device_id.value());
1688 if (!desc_status.ok()) {
1689 return desc_status.status();
1690 }
1691 auto desc = std::move(desc_status).value();
1692 int numa_node = desc->numa_node();
1693 if (numa_node < 0) {
1694 // For some reason the StreamExecutor couldn't get the NUMA
1695 // affinity of the GPU. If this is not a multi-socket motherboard with
1696 // GPUs local to different buses, it doesn't matter. If it is, we
1697 // may run into trouble later with data transfer operations. The
1698 // trouble may manifest as slower-than-expected performance, or
1699 // outright failures.
1700 LOG(INFO) << "Could not identify NUMA node of platform GPU id "
1701 << platform_device_id
1702 << ", defaulting to 0. Your kernel may not have been built "
1703 << "with NUMA support.";
1704 numa_node = 0;
1705 }
1706 DeviceLocality dev_locality;
1707 dev_locality.set_numa_node(numa_node);
1708 dev_locality.set_bus_id(numa_node + 1);
1709
1710 // Set LocalLinks from InterconnectMaps.
1711 LocalLinks* links = dev_locality.mutable_links();
1712 for (const InterconnectMap& imap : interconnects) {
1713 for (TfDeviceId tf_gpu_dst : all_tf_device_ids) {
1714 PlatformDeviceId platform_gpu_dst;
1715 TF_RETURN_IF_ERROR(
1716 GpuIdManager::TfToPlatformDeviceId(tf_gpu_dst, &platform_gpu_dst));
1717 if (imap.directed_links.find({platform_device_id, platform_gpu_dst}) !=
1718 imap.directed_links.end()) {
1719 InterconnectLink* ilink = links->add_link();
1720 ilink->set_device_id(tf_gpu_dst.value());
1721 ilink->set_type(imap.name);
1722 ilink->set_strength(imap.strength);
1723 }
1724 }
1725 }
1726
1727 // If this is one of multiple virtual GPUs on the same physical GPU,
1728 // add high-strength links to the others.
1729 for (TfDeviceId tf_gpu_dst : all_tf_device_ids) {
1730 if (tf_device_id == tf_gpu_dst) continue;
1731 PlatformDeviceId platform_gpu_dst;
1732 TF_RETURN_IF_ERROR(
1733 GpuIdManager::TfToPlatformDeviceId(tf_gpu_dst, &platform_gpu_dst));
1734 if (platform_device_id == platform_gpu_dst) {
1735 InterconnectLink* ilink = links->add_link();
1736 ilink->set_device_id(tf_gpu_dst.value());
1737 ilink->set_type("SAME_DEVICE");
1738 ilink->set_strength(InterconnectMap::kSameDeviceStrength);
1739 }
1740 }
1741
1742 (*localities)[tf_device_id] = dev_locality;
1743 VLOG(1) << "GPUDevice PlatformDeviceId " << platform_device_id
1744 << " TfDeviceId " << tf_device_id << " on bus "
1745 << dev_locality.bus_id() << " numa: " << numa_node
1746 << " pci: " << desc->pci_bus_id()
1747 << " DeviceLocality: " << dev_locality.DebugString();
1748 }
1749 return OkStatus();
1750 }
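
// A sketch of the resulting DeviceLocality for a GPU on NUMA node 0 with a
// StreamExecutor link to TfDeviceId 1 (field values are illustrative):
//
//   numa_node: 0
//   bus_id: 1
//   links { link { device_id: 1 type: "StreamExecutor"
//                  strength: <kStreamExecutorStrength> } }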
1751
1752 static int GetDefaultMinGPUMultiprocessorCount(
1753 se::Platform* gpu_manager,
1754 const std::vector<PlatformDeviceId>& visible_gpu_order) {
1755 static const int kDefaultMinGPUMultiprocessorCount = 8;
1756
1757 // Find the highest multi-processor count across all visible GPUs.
1758 int max_count = -1;
1759 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1760 int visible_gpu_id = visible_gpu_order[i].value();
1761 auto description_status = gpu_manager->DescriptionForDevice(visible_gpu_id);
1762 if (!description_status.ok()) {
1763 continue;
1764 }
1765
1766 auto description = std::move(description_status).value();
1767 max_count = std::max(max_count, description->core_count());
1768 }
1769
1770 if (max_count < 0 || kDefaultMinGPUMultiprocessorCount < max_count) {
1771 return kDefaultMinGPUMultiprocessorCount;
1772 } else {
1773 return max_count;
1774 }
1775 }
1776
1777 static int GetMinGPUMultiprocessorCount(
1778 se::Platform* gpu_manager,
1779 const std::vector<PlatformDeviceId>& visible_gpu_order) {
1780 const char* tf_min_gpu_core_count = getenv("TF_MIN_GPU_MULTIPROCESSOR_COUNT");
1781
1782 if (tf_min_gpu_core_count == nullptr ||
1783 strcmp(tf_min_gpu_core_count, "") == 0) {
1784 return GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1785 }
1786
1787 int min_gpu_core_count = -1;
1788 if (strings::safe_strto32(tf_min_gpu_core_count, &min_gpu_core_count)) {
1789 if (min_gpu_core_count >= 0) {
1790 return min_gpu_core_count;
1791 }
1792 }
1793
1794 int count =
1795 GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1796 LOG(ERROR) << "Invalid minimum GPU multiprocessor count: ["
1797 << tf_min_gpu_core_count << "]. "
1798 << "Using the default value: " << count;
1799 return count;
1800 }
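
// For example, starting a (hypothetical) training script with
//
//   TF_MIN_GPU_MULTIPROCESSOR_COUNT=4 python train.py
//
// admits visible GPUs with as few as 4 multiprocessors, while an unset or
// unparsable value falls back to GetDefaultMinGPUMultiprocessorCount().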
1801
1802 namespace {
1803
1804 #if GOOGLE_CUDA
1805
1806 se::CudaComputeCapability ComputeCapabilityFromString(
1807 const std::string& version_name) {
1808 int major_part, minor_part;
1809 size_t dot_pos = version_name.find('.');
1810 CHECK(dot_pos != string::npos)
1811 << "Illegal version name: [" << version_name << "]";
1812 string major_str = version_name.substr(0, dot_pos);
1813 CHECK(strings::safe_strto32(major_str, &major_part))
1814 << "Illegal version name: [" << version_name << "]";
1815 string minor_str = version_name.substr(dot_pos + 1);
1816 CHECK(strings::safe_strto32(minor_str, &minor_part))
1817 << "Illegal version name: [" << version_name << "]";
1818 return se::CudaComputeCapability{major_part, minor_part};
1819 }
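
// For example, ComputeCapabilityFromString("7.5") returns
// se::CudaComputeCapability{7, 5}; a string without a '.' (e.g. "75")
// CHECK-fails.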
1820
1821 std::vector<se::CudaComputeCapability> GetSupportedCudaComputeCapabilities() {
1822 std::vector<se::CudaComputeCapability> cuda_caps = {
1823 ComputeCapabilityFromString("3.5"), ComputeCapabilityFromString("5.2")};
1824 #ifdef TF_EXTRA_CUDA_CAPABILITIES
1825 // TF_EXTRA_CUDA_CAPABILITIES should be defined as a comma-separated sequence
1826 // of compute capabilities, for example:
1827 // TF_EXTRA_CUDA_CAPABILITIES=3.0,4.0,5.0
1828 // Use two-level macro expansion for stringification.
1829 #define TF_XSTRING(...) #__VA_ARGS__
1830 #define TF_STRING(s) TF_XSTRING(s)
1831 string extra_cuda_caps = TF_STRING(TF_EXTRA_CUDA_CAPABILITIES);
1832 #undef TF_STRING
1833 #undef TF_XSTRING
1834 auto extra_capabilities = str_util::Split(extra_cuda_caps, ',');
1835 for (const auto& capability : extra_capabilities) {
1836 cuda_caps.push_back(ComputeCapabilityFromString(capability));
1837 }
1838 #endif
1839 return cuda_caps;
1840 }
1841 #endif // GOOGLE_CUDA
1842
1843 } // namespace
1844
1845 Status BaseGPUDeviceFactory::EnablePeerAccess(
1846 const std::vector<PlatformDeviceId>& visible_gpu_order) {
1847 se::Platform* gpu_manager = GPUMachineManager();
1848 int possible_peer_count = 0;
1849 int enabled_peer_count = 0;
1850 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1851 const PlatformDeviceId platform_gpu_i = visible_gpu_order[i];
1852 for (int j = 0; j < visible_gpu_order.size(); ++j) {
1853 const PlatformDeviceId platform_gpu_j = visible_gpu_order[j];
1854 // We have already validated that ExecutorForDevice() calls return OK.
1855 se::StreamExecutor* from =
1856 DeviceIdUtil::ExecutorForPlatformDeviceId(gpu_manager, platform_gpu_i)
1857 .ValueOrDie();
1858 se::StreamExecutor* to =
1859 DeviceIdUtil::ExecutorForPlatformDeviceId(gpu_manager, platform_gpu_j)
1860 .ValueOrDie();
1861
1862 if (from->CanEnablePeerAccessTo(to)) {
1863 ++possible_peer_count;
1864 auto status = from->EnablePeerAccessTo(to);
1865 if (!status.ok()) {
1866 LOG(WARNING)
1867 << "Unable to enable peer access between device ordinals "
1868 << platform_gpu_i << " and " << platform_gpu_j
1869 << ", status: " << status;
1870 } else {
1871 ++enabled_peer_count;
1872 }
1873 }
1874 }
1875 }
1876
1877 // Return an error in the extreme failure case where the driver
1878 // reported that peering was possible but not a single peering was
1879 // successful. This is to catch possible system misconfigurations
1880 // or more fundamental issues.
1881 if (possible_peer_count > 0 && enabled_peer_count == 0) {
1882 return errors::Internal(possible_peer_count,
1883 " potential peer access pairs were reported by the "
1884 "driver, but no peering could be enabled.");
1885 }
1886 return OkStatus();
1887 }
1888
1889 Status BaseGPUDeviceFactory::GetValidDeviceIds(
1890 const std::vector<PlatformDeviceId>& visible_gpu_order,
1891 std::vector<PlatformDeviceId>* ids) {
1892 se::Platform* gpu_manager = GPUMachineManager();
1893 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1894 int visible_gpu_id = visible_gpu_order[i].value();
1895 auto description_status = gpu_manager->DescriptionForDevice(visible_gpu_id);
1896 if (!description_status.ok()) {
1897 return description_status.status();
1898 }
1899
1900 auto description = std::move(description_status).value();
1901 #if GOOGLE_CUDA
1902 VLOG(1) << "Found device " << i << " with properties: "
1903 << "\npciBusID: " << description->pci_bus_id()
1904 << " name: " << description->name() << " computeCapability: "
1905 << description->cuda_compute_capability().ToString()
1906 << "\ncoreClock: " << description->clock_rate_ghz() << "GHz"
1907 << " coreCount: " << description->core_count()
1908 << " deviceMemorySize: "
1909 << strings::HumanReadableNumBytes(description->device_memory_size())
1910 << " deviceMemoryBandwidth: "
1911 << strings::HumanReadableNumBytes(description->memory_bandwidth())
1912 << "/s";
1913 #elif TENSORFLOW_USE_ROCM
1914 std::string gcn_arch_name =
1915 description->rocm_compute_capability().gcn_arch_name();
1916 VLOG(1) << "Found device " << i << " with properties: "
1917 << "\npciBusID: " << description->pci_bus_id()
1918 << " name: " << description->name()
1919 << " ROCm AMDGPU Arch: " << gcn_arch_name
1920 << "\ncoreClock: " << description->clock_rate_ghz() << "GHz"
1921 << " coreCount: " << description->core_count()
1922 << " deviceMemorySize: "
1923 << strings::HumanReadableNumBytes(description->device_memory_size())
1924 << " deviceMemoryBandwidth: "
1925 << strings::HumanReadableNumBytes(description->memory_bandwidth())
1926 << "/s";
1927 #endif
1928 }
1929
1930 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
1931 // Try to dlopen GPU libraries if they are supposed to be dynamically loaded.
1932 auto handle_or = se::internal::DsoLoader::MaybeTryDlopenGPULibraries();
1933 if (!handle_or.ok()) {
1934 LOG(WARNING) << "Cannot dlopen some GPU libraries. Please make sure the "
1935 "missing libraries mentioned above are installed properly "
1936 "if you would like to use GPU. Follow the guide at "
1937 "https://www.tensorflow.org/install/gpu for how to "
1938 "download and setup the required libraries for your "
1939 "platform.\nSkipping registering "
1940 "GPU devices...";
1941 return OkStatus();
1942 }
1943 #endif
1944
1945 #if GOOGLE_CUDA
1946 auto cuda_supported_capabilities = GetSupportedCudaComputeCapabilities();
1947 if (cuda_supported_capabilities.empty()) {
1948 return errors::FailedPrecondition(
1949 "No supported cuda capabilities in binary.");
1950 }
1951 se::CudaComputeCapability min_supported_capability = *std::min_element(
1952 cuda_supported_capabilities.begin(), cuda_supported_capabilities.end());
1953 #endif
1954
1955 int min_gpu_core_count =
1956 GetMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1957
1958 // Filter out devices that don't have the right capability or power.
1959 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1960 const PlatformDeviceId visible_gpu_id = visible_gpu_order[i];
1961 auto description_status =
1962 gpu_manager->DescriptionForDevice(visible_gpu_id.value());
1963 if (!description_status.ok()) {
1964 LOG(INFO) << "Ignoring visible gpu device " << visible_gpu_id
1965 << " whose executor is in invalid state: "
1966 << description_status.status().ToString();
1967 continue;
1968 }
1969
1970 auto desc = std::move(description_status).value();
1971
1972 #if GOOGLE_CUDA
1973 // Only GPUs with at least the minimum supported compute capability are
1974 // accepted.
1975 if (desc->cuda_compute_capability() < min_supported_capability) {
1976 LOG(INFO) << "Ignoring visible gpu device "
1977 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1978 << ") "
1979 << "with Cuda compute capability "
1980 << desc->cuda_compute_capability().ToString()
1981 << ". The minimum required Cuda capability is "
1982 << min_supported_capability.ToString() << ".";
1983 continue;
1984 }
1985 #elif TENSORFLOW_USE_ROCM
1987 // Only GPUs with supported gfx versions are accepted.
1988 auto rocm_compute_capability = desc->rocm_compute_capability();
1989 if (!rocm_compute_capability.is_supported_gfx_version()) {
1990 LOG(INFO) << "Ignoring visible gpu device "
1991 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1992 << ") "
1993 << "with AMDGPU version : "
1994 << rocm_compute_capability.gfx_version()
1995 << ". The supported AMDGPU versions are "
1996 << rocm_compute_capability.supported_gfx_versions_str() << ".";
1997 continue;
1998 }
1999 #endif
2000
2001 // Filter out slow GPUs. By default, GPUs with a lower multiprocessor
2002 // count than the fastest GPU are filtered out, unless they have 8 or more
2003 // multiprocessors. If the TF_MIN_GPU_MULTIPROCESSOR_COUNT environment
2004 // variable is set, its value will be used to filter out GPUs.
2005 if (desc->core_count() < min_gpu_core_count) {
2006 LOG(INFO) << "Ignoring visible gpu device "
2007 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
2008 << ") "
2009 << "with core count: " << desc->core_count()
2010 << ". The minimum required count is " << min_gpu_core_count
2011 << ". You can adjust this requirement with the env var "
2012 "TF_MIN_GPU_MULTIPROCESSOR_COUNT.";
2013 continue;
2014 }
2015
2016 #if defined(GOOGLE_CUDA) && !defined(PLATFORM_GOOGLE)
2017 // Compare compute capability of device with list in cuda_config.h and
2018 // warn about long loading time when we need to JIT kernels from PTX.
2019 auto compute_capabilities = {TF_CUDA_COMPUTE_CAPABILITIES};
2020 auto device_capability = desc->cuda_compute_capability();
2021 if (std::count_if(std::cbegin(compute_capabilities),
2022 std::cend(compute_capabilities), [&](int cc) {
2023 // CUBINs are backwards compatible within major version.
2024 return cc / 10 == device_capability.major &&
2025 cc % 10 <= device_capability.minor;
2026 }) == 0) {
2027 LOG(WARNING)
2028 << "TensorFlow was not built with CUDA kernel binaries "
2029 "compatible with compute capability "
2030 << device_capability.ToString() << ". CUDA kernels will be "
2031 << "jit-compiled from PTX, which could take 30 minutes or longer.";
2032 }
2033 #endif // defined(GOOGLE_CUDA) && !defined(PLATFORM_GOOGLE)
2034
2035 ids->push_back(visible_gpu_id);
2036 }
2037 if (!ids->empty()) {
2038 std::vector<int> raw_ids(ids->size());
2039 std::transform(ids->begin(), ids->end(), raw_ids.begin(),
2040 [](PlatformDeviceId id) -> int { return id.value(); });
2041 VLOG(1) << "Adding visible gpu devices: " << absl::StrJoin(raw_ids, ", ");
2042 }
2043
2044 return OkStatus();
2045 }
2046
2047 uint64 BaseGPUDevice::SafeAllocFrontier(uint64 old_value) {
2048 if (timestamped_allocator_) {
2049 return kernel_tracker_->LastTerminatedCount(old_value);
2050 } else {
2051 return 0;
2052 }
2053 }
2054
2055 int BaseGPUDevice::PendingKernels() {
2056 if (kernel_tracker_) {
2057 return kernel_tracker_->NumPending();
2058 }
2059 return 0;
2060 }
2061
2062 void BaseGPUDevice::TestOnlyReset() {
2063 StreamGroupFactory::Global().TestOnlyReset();
2064 }
2065
2066 uint64 GPUKernelTracker::MaybeQueue(OpKernelContext* ctx) {
2067 mutex_lock l(mu_);
2068 ++ops_since_last_;
2069 int64_t mem_used =
2070 ctx->persistent_memory_allocated() + ctx->temp_memory_allocated();
2071 VLOG(2) << "kernel: " << ctx->op_kernel().name() << " mem_used: " << mem_used;
2072 mem_since_last_ += mem_used;
2073 int weight = 1;
2074 // Note that if all {max_bytes, max_interval, max_pending} are zero then
2075 // we track every single kernel with no pending cap. This can happen if
2076 // timestamped_allocator alone was specified.
2077 if ((mem_since_last_ < params_.max_bytes) &&
2078 (ops_since_last_ < params_.max_interval)) {
2079 return 0;
2080 } else {
2081 weight = std::min(
2082 params_.max_pending,
2083 std::max(1, mem_since_last_ / std::max(16386, params_.max_bytes)));
2084 mem_since_last_ = 0;
2085 ops_since_last_ = 0;
2086 }
2087 uint64 queued_count = timing_counter_->next();
2088 RecordQueued(queued_count, weight);
2089 return queued_count;
2090 }
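
// Worked example of the weight computation above: with params_.max_bytes =
// 1 MiB, params_.max_pending = 4, and 3 MiB allocated since the last tracked
// kernel, weight = min(4, max(1, 3 MiB / 1 MiB)) = 3, so the queued kernel
// counts as three pending units in num_pending_.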
2091
2092 void GPUKernelTracker::RecordQueued(uint64 queued_count, int weight) {
2093 VLOG(2) << "RecordQueued queued_count=" << queued_count
2094 << " first_available_=" << first_available_
2095 << " last_completed_=" << last_completed_
2096 << " num_pending_=" << num_pending_;
2097 pending_kernels_[first_available_].queued_count = queued_count;
2098 pending_kernels_[first_available_].weight = weight;
2099 pending_kernels_[first_available_].terminated = false;
2100 ++first_available_;
2101 num_pending_ += weight;
2102 if (first_available_ >= pending_kernels_.size()) {
2103 if (last_completed_ >= 0) {
2104 // wrap
2105 first_available_ = 0;
2106 } else {
2107 // enlarge the ring buffer
2108 pending_kernels_.resize(2 * pending_kernels_.size());
2109 }
2110 }
2111 if (first_available_ == last_completed_) {
2112 // Ring buffer is full: double it. All of the valid PendingKernel
2113 // entries still exist after the copy; they are just shifted to begin
2114 // at index 0 in the new array.
2115 std::vector<PendingKernel> new_buffer(pending_kernels_.size() * 2);
2116 for (int i = 0; i < pending_kernels_.size(); ++i) {
2117 int j = (i + last_completed_) % pending_kernels_.size();
2118 new_buffer[i] = pending_kernels_[j];
2119 }
2120 last_completed_ = 0;
2121 first_available_ = pending_kernels_.size();
2122 pending_kernels_.swap(new_buffer);
2123 VLOG(1) << "last_completed_=" << last_completed_
2124 << " first_available_=" << first_available_
2125 << " num_pending_=" << num_pending_;
2126 }
2127 DCHECK_NE(first_available_, last_completed_) << "exhausted pending_kernels";
2128 }
2129
2130 // Called by LastTerminatedCount() when new_value is equal to old_value. This
2131 // case can occur when an allocation failed and waited for memory to be freed,
2132 // and then, when it retried, the safe allocation frontier had not advanced
2133 // because no tracking event had matured. Maybe GPU progress has stalled
2134 // waiting on an I/O event, or maybe we're tracking at too infrequent an
2135 // interval. In any case, if the GPU compute queue is actually empty, it's safe
2136 // to advance the safe frontier so that this request can allocate from
2137 // unrestricted (and better compacted) memory. So queue an event on the
2138 // compute stream to ensure the frontier does advance.
2139 void GPUKernelTracker::MaybeQueueProgressEvent() {
2140 mutex_lock l(mu_);
2141 if (num_pending_ == 0) {
2142 uint64 new_count = timing_counter_->next();
2143 RecordQueued(new_count, 1);
2144 em_->ThenExecute(stream_,
2145 [this, new_count]() { RecordTerminated(new_count); });
2146 }
2147 }
2148
2149 void GPUKernelTracker::RecordTerminated(uint64 queued_count) {
2150 mutex_lock l(mu_);
2151 VLOG(2) << this << " RecordTerminated queued_count=" << queued_count
2152 << " first_available_=" << first_available_
2153 << " last_completed_=" << last_completed_
2154 << " num_pending_=" << num_pending_ << " LC="
2155 << ((last_completed_ >= 0)
2156 ? pending_kernels_[last_completed_].queued_count
2157 : -1);
2158 DCHECK_NE(first_available_, last_completed_);
2159 DCHECK_GT(num_pending_, 0);
2160 // Starting just past the last completed entry, find the entry with
2161 // this queued_count and mark it done.
2162 int index = (last_completed_ + 1) % pending_kernels_.size();
2163 int weight = 1;
2164 while (true) {
2165 if (index == first_available_) {
2166 // This should never happen.
2167 LOG(FATAL) << "Failed to find " << queued_count // Crash OK
2168 << " in queue, last_completed_=" << last_completed_
2169 << " index=" << index
2170 << " first_available_=" << first_available_
2171 << " pending_kernels_.size()=" << pending_kernels_.size();
2172 }
2173 if (pending_kernels_[index].queued_count == queued_count) {
2174 pending_kernels_[index].terminated = true;
2175 weight = pending_kernels_[index].weight;
2176 break;
2177 }
2178 index = (index + 1) % pending_kernels_.size();
2179 }
2180 // Next move last_completed_ forward past all completed kernels. In theory
2181 // kernels should always complete in queued order so we should be able to
2182 // advance the completed frontier to the just-completed PendingKernel. In
2183 // practice we occasionally see the termination callbacks arrive out of
2184 // order, probably because of thread scheduling. Eventually we may support
2185 // out-of-order completion involving multiple compute streams, so here we
2186 // follow a conservative approach and wait for every single callback to
2187 // arrive before advancing the frontier.
2188 while (true) {
2189 int next_index = (last_completed_ + 1) % pending_kernels_.size();
2190 if (next_index == first_available_) break;
2191 if (pending_kernels_[next_index].terminated) {
2192 last_completed_ = next_index;
2193 } else {
2194 break;
2195 }
2196 }
2197 if (last_completed_ >= 0) {
2198 int64_t v = pending_kernels_[last_completed_].queued_count;
2199 last_terminated_count_ = v;
2200 if (allocator_) {
2201 allocator_->SetSafeFrontier(v);
2202 }
2203 }
2204 // Finally, decrease num_pending_ before possibly waking a waiter.
2205 num_pending_ -= weight;
2206 pending_decreased_.notify_all();
2207 }
2208
2209 } // namespace tensorflow
2210
2211 #endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
2212