1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 // TODO(opensource): Use a more generic sounding preprocessor name than
17 // GOOGLE_CUDA
18 #if (defined(GOOGLE_CUDA) && GOOGLE_CUDA) || \
19     (defined(TENSORFLOW_USE_ROCM) && TENSORFLOW_USE_ROCM)
20 
21 #if TENSORFLOW_USE_ROCM
22 #include "rocm/include/hip/hip_runtime.h"
23 #endif
24 
25 #define EIGEN_USE_GPU
26 
27 #include <stdlib.h>
28 #include <string.h>
29 
30 #include <algorithm>
31 #include <list>
32 #include <map>
33 #include <tuple>
34 #include <vector>
35 
36 #include "absl/container/flat_hash_set.h"
37 #include "absl/strings/str_split.h"
38 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
39 #include "tensorflow/core/common_runtime/device/device_event_mgr.h"
40 #include "tensorflow/core/common_runtime/device/device_id_utils.h"
41 #include "tensorflow/core/common_runtime/device_factory.h"
42 #include "tensorflow/core/common_runtime/gpu/gpu_device.h"
43 #include "tensorflow/core/common_runtime/gpu/gpu_id.h"
44 #include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
45 #include "tensorflow/core/common_runtime/gpu/gpu_init.h"
46 #include "tensorflow/core/common_runtime/gpu/gpu_process_state.h"
47 #include "tensorflow/core/common_runtime/gpu/gpu_util.h"
48 #include "tensorflow/core/common_runtime/gpu_device_context.h"
49 #include "tensorflow/core/common_runtime/local_device.h"
50 #include "tensorflow/core/framework/allocator.h"
51 #include "tensorflow/core/framework/device_base.h"
52 #include "tensorflow/core/framework/op_kernel.h"
53 #include "tensorflow/core/framework/tensor.h"
54 #include "tensorflow/core/framework/tensor.pb.h"
55 #include "tensorflow/core/framework/types.h"
56 #include "tensorflow/core/framework/variant_op_registry.h"
57 #include "tensorflow/core/graph/types.h"
58 #include "tensorflow/core/lib/core/errors.h"
59 #include "tensorflow/core/lib/core/status.h"
60 #include "tensorflow/core/lib/strings/numbers.h"
61 #include "tensorflow/core/lib/strings/str_util.h"
62 #include "tensorflow/core/lib/strings/strcat.h"
63 #if GOOGLE_CUDA
64 #include "third_party/gpus/cudnn/cudnn.h"
65 #include "tensorflow/stream_executor/cuda/cuda_activation.h"
66 #elif TENSORFLOW_USE_ROCM
67 #include "tensorflow/core/platform/rocm.h"
68 #endif
69 #include "tensorflow/compiler/xla/stream_executor/gpu/gpu_stream.h"
70 #include "tensorflow/core/platform/fingerprint.h"
71 #include "tensorflow/core/platform/logging.h"
72 #include "tensorflow/core/platform/macros.h"
73 #include "tensorflow/core/platform/stream_executor.h"
74 #include "tensorflow/core/platform/types.h"
75 #include "tensorflow/core/profiler/lib/scoped_annotation.h"
76 #include "tensorflow/core/profiler/lib/scoped_memory_debug_annotation.h"
77 #include "tensorflow/core/public/session_options.h"
78 #include "tensorflow/core/util/device_name_utils.h"
79 #include "tensorflow/core/util/env_var.h"
80 #include "tensorflow/core/util/stream_executor_util.h"
81 #include "tensorflow/stream_executor/platform/dso_loader.h"
82 
83 #if !defined(PLATFORM_GOOGLE)
84 #if GOOGLE_CUDA
85 #include "third_party/gpus/cuda/cuda_config.h"
86 #endif
87 #endif
88 
89 namespace tensorflow {
90 
91 #if GOOGLE_CUDA
92 
93 typedef cudaStream_t gpuStream_t;
94 typedef cudaDeviceProp gpuDeviceProp_t;
95 #define EIGEN_GPU_SCRATCH_SIZE (Eigen::kGpuScratchSize)
96 using se::cuda::ScopedActivateExecutorContext;
97 
98 #elif TENSORFLOW_USE_ROCM
99 
100 typedef hipStream_t gpuStream_t;
101 typedef hipDeviceProp_t gpuDeviceProp_t;
102 #define EIGEN_GPU_SCRATCH_SIZE (Eigen::kGpuScratchSize)
103 using se::rocm::ScopedActivateExecutorContext;
104 
105 #endif
106 
107 // Eigen Ops directly allocate memory only for temporary buffers used
108 // during OpKernel::Compute().  The recommended way of allocating such
109 // memory is via OpKernelContext::allocate_temp().  However, Eigen Ops
110 // don't have access to OpKernelContext, instead they get access to
111 // memory directly through the device allocator.  As an Open Source
112 // project, Eigen assumes allocator semantics similar to those of the
113 // CUDA or ROCm memory allocator, and may not work correctly due to race
114 // conditions if used with some other allocator.  For safety, we need
115 // to delay deallocation calls out of Eigen until all events on the
116 // corresponding stream have completed.  The following two classes
117 // serve this purpose in two different compilation environments.
118 
119 class EigenGpuStreamDevice : public ::Eigen::StreamInterface {
120  public:
121   EigenGpuStreamDevice()
122       : scratch_(nullptr),
123         semaphore_(nullptr),
124         context_(nullptr),
125         device_(this) {}
126   ~EigenGpuStreamDevice() override {}
127 
128   void Reinitialize(OpKernelContext* context, const gpuStream_t* gpu_stream,
129                     PlatformDeviceId platform_device_id,
130                     ::tensorflow::Allocator* alloc, char* scratch) {
131     if (LogMemory::IsEnabled()) {
132       operation_ = context->op_kernel().name() + "/EigenAllocator";
133       step_id_ = context->step_id();
134     }
135     context_ = context;
136     scratch_ = scratch;
137     semaphore_ =
138         reinterpret_cast<unsigned int*>(scratch + Eigen::kGpuScratchSize);
139     stream_ = gpu_stream;
140     allocator_ = alloc;
141     device_prop_ = &Eigen::GetGpuDeviceProperties(platform_device_id.value());
142   }
143 
144   const gpuStream_t& stream() const override { return *stream_; }
145   const gpuDeviceProp_t& deviceProperties() const override {
146     return *device_prop_;
147   }
148 
149   void* allocate(size_t num_bytes) const override {
150     void* ret = allocator_->AllocateRaw(32 /* alignment */, num_bytes);
151     if (ret == nullptr) {
152       if (context_) {
153         context_->SetStatus(errors::ResourceExhausted(
154             strings::StrCat("Ran out of GPU memory when allocating ", num_bytes,
155                             " bytes for ", operation_)));
156       } else {
157         LOG(FATAL)
158             << "EigenAllocator for GPU ran out of memory when allocating "
159             << num_bytes << ". See error logs for more detailed info.";
160       }
161     }
162     if (LogMemory::IsEnabled() && ret != nullptr) {
163       LogMemory::RecordRawAllocation(operation_, step_id_, num_bytes, ret,
164                                      allocator_);
165     }
166     return ret;
167   }
168   void deallocate(void* buffer) const override {
169     if (LogMemory::IsEnabled() && buffer != nullptr) {
170       LogMemory::RecordRawDeallocation(operation_, step_id_, buffer, allocator_,
171                                        true);
172     }
173     AsyncFreeData* afData =
174         new AsyncFreeData(allocator_, buffer, operation_, step_id_);
175 #if GOOGLE_CUDA
176     cudaError_t err = cudaStreamAddCallback(*stream_, asyncFree, afData, 0);
177     CHECK_EQ(err, cudaSuccess);
178 #elif TENSORFLOW_USE_ROCM
179     hipError_t err = hipStreamAddCallback(*stream_, asyncFree, afData, 0);
180     CHECK_EQ(err, hipSuccess);
181 #endif
182   }
183 
184   // Return a pointer to a per stream scratchpad of 1024 bytes residing
185   // in global memory.
186   void* scratchpad() const override { return scratch_; }
187 
188   // Return a semaphore. The semaphore is initialized to 0, and each kernel
189   // using it is responsible for resetting it to 0 upon completion
190   // to maintain the invariant that the semaphore is always equal to 0 upon
191   // each kernel start.
192   unsigned int* semaphore() const override { return semaphore_; }
193 
194   const Eigen::GpuDevice& device() const { return device_; }
195 
196  private:
197   struct AsyncFreeData {
198     AsyncFreeData(::tensorflow::Allocator* a, void* p, const string& o,
199                   const int64_t s)
200         : allocator_(a), address_(p), operation_(o), step_id_(s) {}
201     ::tensorflow::Allocator* allocator_;
202     void* address_;
203     const string operation_;
204     const int64_t step_id_;
205   };
206 
207 #if GOOGLE_CUDA
208   static void CUDART_CB asyncFree(gpuStream_t stream, cudaError_t status,
209                                   void* userData)
210 #elif TENSORFLOW_USE_ROCM
211   static void asyncFree(gpuStream_t stream, hipError_t status, void* userData)
212 #endif
213   {
214     AsyncFreeData* data = static_cast<AsyncFreeData*>(userData);
215     if (LogMemory::IsEnabled()) {
216       LogMemory::RecordRawDeallocation(data->operation_, data->step_id_,
217                                        data->address_, data->allocator_, false);
218     }
219     data->allocator_->DeallocateRaw(data->address_);
220     delete data;
221   }
222 
223   string operation_;
224   int64_t step_id_;
225   const gpuStream_t* stream_;           // Not owned.
226   const gpuDeviceProp_t* device_prop_;  // Not owned.
227   ::tensorflow::Allocator* allocator_;  // Not owned.
228   mutable char* scratch_;
229   mutable unsigned int* semaphore_;
230   OpKernelContext* context_;
231   Eigen::GpuDevice device_;
232 
233   TF_DISALLOW_COPY_AND_ASSIGN(EigenGpuStreamDevice);
234 };
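// The following sketch (not part of the original file) reduces the deferred
// deallocation performed by EigenGpuStreamDevice::deallocate() above to its
// essentials for a CUDA build. The struct and function names are hypothetical,
// and the #if 0 guard keeps the sketch out of the build.
#if 0
namespace {

struct DeferredFree {
  ::tensorflow::Allocator* allocator;  // Not owned.
  void* address;
};

// Invoked by the CUDA runtime only after all work previously enqueued on the
// stream has completed, so no pending kernel can still touch the buffer.
void CUDART_CB DeferredFreeCallback(cudaStream_t /*stream*/,
                                    cudaError_t /*status*/, void* user_data) {
  auto* data = static_cast<DeferredFree*>(user_data);
  data->allocator->DeallocateRaw(data->address);
  delete data;
}

// Instead of freeing `buffer` immediately, queue the free behind everything
// currently enqueued on `stream`.
void FreeWhenStreamDone(::tensorflow::Allocator* allocator, void* buffer,
                        cudaStream_t stream) {
  auto* data = new DeferredFree{allocator, buffer};
  CHECK_EQ(cudaStreamAddCallback(stream, DeferredFreeCallback, data, 0),
           cudaSuccess);
}

}  // namespace
#endif  // illustrative sketch only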
235 
236 // This factory helps to ensure that different GPU device objects that refer to
237 // the same physical device and stream group id use the same stream group
238 // object (and therefore the same CUDA streams). This is necessary since there
239 // is a single memory allocator per device (see ProcessState::GetGPUAllocator)
240 // and allocators must not be shared across streams.
241 class BaseGPUDevice::StreamGroupFactory {
242  public:
243   // Returns the unique stream group for use with the stream defined by
244   // {tf_device_id, stream_group_within_gpu}, creating it if it does not yet
245   // exist.
246   // This function is thread safe.
247   BaseGPUDevice::StreamGroup* GetOrCreate(TfDeviceId tf_device_id,
248                                           int stream_group_within_gpu,
249                                           se::StreamExecutor* executor,
250                                           const GPUOptions& options) {
251     mutex_lock guard(lock_);
252     StreamGroup* group =
253         &streams_[key_type(tf_device_id.value(), stream_group_within_gpu)];
254     if (!group->compute) {
255       int priority = GetPriority(tf_device_id.value(), options);
256       group->priority = priority;
257       group->compute = GetStream(executor, priority);
258       group->compute->Init();
259       VLOG(2) << "Created stream[" << stream_group_within_gpu
260               << "] = " << group->compute << " with priority: " << priority;
261 
262 #if TENSORFLOW_USE_ROCM
263       // ROCm streams are lightweight and will not necessarily trigger device
264       // queue init until they are first used. For optimal performance,
265       // compute and nccl streams must be immediate siblings.
266       group->nccl = GetStream(executor, priority);
267       group->nccl->Init();
268       VLOG(2) << "Created nccl_stream[" << stream_group_within_gpu
269               << "] = " << group->nccl;
270 
271       // Force underlying resource creation now.
272       group->compute->ThenWaitFor(group->nccl);
273       group->nccl->ThenWaitFor(group->compute);
274 #endif
275 
276       group->host_to_device = GetStream(executor, priority);
277       group->host_to_device->Init();
278       VLOG(2) << "Created host_to_device_stream[" << stream_group_within_gpu
279               << "] = " << group->host_to_device;
280 
281       group->device_to_host = GetStream(executor, priority);
282       group->device_to_host->Init();
283       VLOG(2) << "Created device_to_host_stream[" << stream_group_within_gpu
284               << "] = " << group->device_to_host;
285 
286       int num_d2d_streams =
287           options.experimental().num_dev_to_dev_copy_streams();
288       if (num_d2d_streams == 0) num_d2d_streams = 1;
289       if (num_d2d_streams < 1 || num_d2d_streams > 4) {
290         LOG(ERROR)
291             << "Illegal GPUOptions.experimental.num_dev_to_dev_copy_streams="
292             << num_d2d_streams << " set to 1 instead.";
293         num_d2d_streams = 1;
294       }
295       for (int i = 0; i < num_d2d_streams; ++i) {
296         se::Stream* stream = GetStream(executor, priority);
297         stream->Init();
298         group->device_to_device.push_back(stream);
299         VLOG(2) << "Created device_to_device_stream[" << stream_group_within_gpu
300                 << "] = " << group->device_to_device.back();
301       }
302     }
303     return group;
304   }
305 
306   // Returns a reference to the StreamGroupFactory singleton. Note that this is
307   // never destroyed, so the objects it owns are never deleted.
308   static StreamGroupFactory& Global() {
309     static StreamGroupFactory* instance = new StreamGroupFactory();
310     return *instance;
311   }
312 
313   // Helper method for unit tests to reset the streams. Never use in production.
314   void TestOnlyReset() {
315     mutex_lock guard(lock_);
316     for (auto& item : streams_) {
317       auto& stream = item.second;
318       if (stream.compute) {
319         delete stream.compute;
320         stream.compute = nullptr;
321       }
322 #if TENSORFLOW_USE_ROCM
323       if (stream.nccl) {
324         delete stream.nccl;
325         stream.nccl = nullptr;
326       }
327 #endif
328       if (stream.host_to_device) {
329         delete stream.host_to_device;
330         stream.host_to_device = nullptr;
331       }
332       if (stream.device_to_host) {
333         delete stream.device_to_host;
334         stream.device_to_host = nullptr;
335       }
336       while (!stream.device_to_device.empty()) {
337         auto back = stream.device_to_device.back();
338         if (back) {
339           delete back;
340         }
341         stream.device_to_device.pop_back();
342       }
343     }
344     streams_.clear();
345   }
346 
347  private:
348   // Returns priority for the given virtual GPU id from the session options.
349   // Returns 0 if no virtual devices are specified.
350   int GetPriority(int tf_device_id, const GPUOptions& options) {
351     int id = tf_device_id;
352     int i = 0;
353     int priority = 0;
354     while (i < options.experimental().virtual_devices_size()) {
355       const int size =
356           options.experimental().virtual_devices().Get(i).priority_size();
357       if (id >= size) {
358         id -= size;
359       } else {
360         priority =
361             options.experimental().virtual_devices().Get(i).priority().Get(id);
362         break;
363       }
364       i++;
365     }
366     return priority;
367   }
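  // Worked example (illustrative values): with two virtual_devices entries
  // whose priority lists have sizes 2 and 3, tf_device_id 3 skips the first
  // entry (3 - 2 = 1) and returns the priority stored at index 1 of the
  // second entry. If no virtual devices are configured, the loop body never
  // matches and the default priority 0 is returned.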
368 
369   // Returns a Stream with the underlying GPUStream with the given priority.
370   se::Stream* GetStream(se::StreamExecutor* executor, int priority) {
371     auto stream = new se::Stream(executor);
372     static_cast<stream_executor::gpu::GpuStream*>(stream->implementation())
373         ->SetPriority(priority);
374     return stream;
375   }
376 
377   mutex lock_;
378   using key_type = std::tuple<int, int>;
379   std::map<key_type, StreamGroup> streams_;
380 
381   // StreamGroupFactory cannot be created directly; Call
382   // StreamGroupFactory::Global() to get the global instance.
383   StreamGroupFactory() = default;
384   TF_DISALLOW_COPY_AND_ASSIGN(StreamGroupFactory);
385 };
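// Minimal usage sketch (illustrative only, kept out of the build by #if 0):
// two lookups with the same {tf_device_id, stream_group_within_gpu} key hand
// back the same StreamGroup, which is how device objects that share a
// physical GPU also end up sharing its streams. The function name here is
// hypothetical, and the sketch assumes the nested factory is accessible.
#if 0
void ExampleSharedStreamGroup(se::StreamExecutor* executor,
                              const GPUOptions& options) {
  TfDeviceId tf_device_id(0);
  auto& factory = BaseGPUDevice::StreamGroupFactory::Global();
  BaseGPUDevice::StreamGroup* a = factory.GetOrCreate(
      tf_device_id, /*stream_group_within_gpu=*/0, executor, options);
  BaseGPUDevice::StreamGroup* b = factory.GetOrCreate(
      tf_device_id, /*stream_group_within_gpu=*/0, executor, options);
  CHECK_EQ(a, b);  // Same group object, hence the same compute/copy streams.
}
#endif  // illustrative sketch only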
386 
387 BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
388                              Bytes memory_limit, const DeviceLocality& locality,
389                              TfDeviceId tf_device_id,
390                              const string& physical_device_desc,
391                              Allocator* gpu_allocator, Allocator* cpu_allocator,
392                              bool sync_every_op)
393     : LocalDevice(options, Device::BuildDeviceAttributes(name, DEVICE_GPU,
394                                                          memory_limit, locality,
395                                                          physical_device_desc)),
396       gpu_allocator_(gpu_allocator),
397       cpu_allocator_(cpu_allocator),
398       scoped_allocator_mgr_(new ScopedAllocatorMgr(name)),
399       tf_device_id_(tf_device_id),
400       sync_every_op_(sync_every_op) {
401   // XLA device IDs for GPUs are arbitrary but must be unique, so we hash device
402   // names (which include a replica index even for multi-client).
403   set_xla_global_id(Fingerprint32(name) % std::numeric_limits<int32_t>::max());
404   GPUProcessState::singleton()->EnableGPUDevice();
405 }
406 
407 BaseGPUDevice::~BaseGPUDevice() {
408   delete accelerator_device_info_;
409   if (scratch_) gpu_allocator_->DeallocateRaw(scratch_);
410   device_context_->Unref();
411 }
412 
413 // This should be idempotent if already initialized.
414 Status BaseGPUDevice::InitScratchBuffers() {
415   mutex_lock l(scratch_init_mutex_);
416   if (!scratch_) {
417     DCHECK(stream_);
418     size_t scratch_buffer_size = Eigen::kGpuScratchSize + sizeof(unsigned int);
419     profiler::ScopedMemoryDebugAnnotation op_annotation("ScratchBuffer");
420     void* scratch_buffer = gpu_allocator_->AllocateRaw(
421         Allocator::kAllocatorAlignment, scratch_buffer_size);
422     if (scratch_buffer == nullptr) {
423       return errors::FailedPrecondition(
424           "Failed to allocate scratch buffer for device ",
425           tf_device_id_.value());
426     }
427     se::DeviceMemory<char> mem(
428         se::DeviceMemoryBase(scratch_buffer, scratch_buffer_size));
429     TF_RETURN_IF_ERROR(executor_->SynchronousMemZero(
430         &mem, Eigen::kGpuScratchSize + sizeof(unsigned int)));
431     scratch_ = static_cast<char*>(scratch_buffer);
432   }
433   return OkStatus();
434 }
435 
436 Status BaseGPUDevice::Init(const SessionOptions& options) {
437   auto executor_status = DeviceIdUtil::ExecutorForTfDeviceId(
438       DEVICE_GPU, GPUMachineManager(), tf_device_id_);
439   if (!executor_status.status().ok()) {
440     return errors::Internal("Failed to get StreamExecutor for device ",
441                             tf_device_id_.value());
442   }
443 
444   executor_ = executor_status.ValueOrDie();
445 
446   stream_ = StreamGroupFactory::Global().GetOrCreate(
447       tf_device_id_, 0, executor_, options.config.gpu_options());
448 
449   // Get an allocator that allocates pinned memory on host.
450   AllocatorAttributes attr;
451   attr.set_on_host(true);
452   attr.set_gpu_compatible(true);
453   Allocator* host_memory_allocator = GetAllocator(attr);
454 
455   device_context_ =
456       new GPUDeviceContext(0, stream_->compute,
457 #if TENSORFLOW_USE_ROCM
458                            stream_->nccl,
459 #endif
460                            stream_->host_to_device, stream_->device_to_host,
461                            stream_->device_to_device, host_memory_allocator);
462 
463   em_ = EventMgrFactory::Singleton()->GetEventMgr(executor_,
464                                                   options.config.gpu_options());
465 
466   GPUKernelTracker::Params tracker_params(
467       options.config.gpu_options().experimental().kernel_tracker_max_interval(),
468       options.config.gpu_options().experimental().kernel_tracker_max_bytes(),
469       options.config.gpu_options().experimental().kernel_tracker_max_pending());
470   timestamped_allocator_ =
471       options.config.gpu_options().experimental().timestamped_allocator();
472   pending_cap_ = tracker_params.max_pending;
473   if (timestamped_allocator_ ||
474       (tracker_params.max_interval > 0 || tracker_params.max_bytes > 0 ||
475        tracker_params.max_pending > 0)) {
476     SharedCounter* timing_counter = nullptr;
477     if (timestamped_allocator_) {
478       // In this case the SharedCounter was already created and set in the
479       // associated Allocator, with ownership by GPUProcessState.
480       // The GPUKernelTracker will use this SharedCounter, instead of
481       // owning its own.
482       timing_counter =
483           GPUProcessState::singleton()->GPUAllocatorCounter(tf_device_id_);
484       DCHECK(timing_counter);
485     }
486     kernel_tracker_.reset(new GPUKernelTracker(
487         tracker_params, Env::Default(), stream_->compute, timing_counter,
488         timestamped_allocator_ ? gpu_allocator_ : nullptr, em_));
489   }
490 
491   accelerator_device_info_ = new DeviceBase::AcceleratorDeviceInfo;
492   accelerator_device_info_->stream = stream_->compute;
493   accelerator_device_info_->default_context = device_context_;
494   accelerator_device_info_->event_mgr = em_;
495   PlatformDeviceId platform_device_id;
496   TF_RETURN_IF_ERROR(
497       GpuIdManager::TfToPlatformDeviceId(tf_device_id_, &platform_device_id));
498   accelerator_device_info_->gpu_id = platform_device_id.value();
499   set_tensorflow_accelerator_device_info(accelerator_device_info_);
500 
501   // Whether and how the GPU device uses its own threadpool.
502   // This option is experimental. Once we confirm the best setting, we
503   // may change the default behavior and completely remove this flag.
504   // Default values might change in future releases.
505   // Possible values:
506   //   * global: GPU uses threads shared with CPU in the main compute
507   //          thread-pool. This is currently the default.
508   //   * gpu_private: GPU uses threads dedicated to this device.
509   //   * gpu_shared: All GPUs share a dedicated thread pool.
510   string gpu_thread_mode;
511   TF_RETURN_IF_ERROR(
512       ReadStringFromEnvVar("TF_GPU_THREAD_MODE", "global", &gpu_thread_mode));
513   gpu_thread_mode = absl::AsciiStrToLower(gpu_thread_mode);
514   if (gpu_thread_mode != "global") {
515     int64_t gpu_thread_count = -1;
516     // Default to two threads. One for device compute and another for memory
517     // copies.
518     TF_RETURN_IF_ERROR(
519         ReadInt64FromEnvVar("TF_GPU_THREAD_COUNT", 2, &gpu_thread_count));
520     if (gpu_thread_mode == "gpu_private") {
521       // TODO(zhengxq): since these threads only serve a single GPU device,
522       //   we should set the device context once for each thread, and avoid
523       //   setting them for each kernel.
524       // TODO(zhengxq): pin the thread to the same socket of the target GPU.
525       thread_pool_.reset(new thread::ThreadPool(
526           options.env, ThreadOptions(),
527           strings::StrCat("gpu_private_", tf_device_id_.value()),
528           static_cast<int32>(gpu_thread_count),
529           !options.config.experimental().disable_thread_spinning(),
530           /*allocator=*/nullptr));
531       set_tensorflow_device_thread_pool(thread_pool_.get());
532     } else if (gpu_thread_mode == "gpu_shared") {
533       static thread::ThreadPool* thread_pool = new thread::ThreadPool(
534           options.env, ThreadOptions(), "gpu_shared",
535           static_cast<int32>(gpu_thread_count),
536           !options.config.experimental().disable_thread_spinning(),
537           /*allocator=*/nullptr);
538       set_tensorflow_device_thread_pool(thread_pool);
539     } else {
540       string error_message =
541           strings::StrCat("Invalid gpu_thread_mode: ", gpu_thread_mode);
542       LOG(WARNING) << error_message;
543       return errors::InvalidArgument(error_message);
544     }
545   }
546 
547   TF_ASSIGN_OR_RETURN(
548       node_file_writer_,
549       NodeFileWriter::GetNodeFileWriterIfEnabled(name(), env()));
550   if (node_file_writer_) {
551     LOG(INFO) << "Writing NodeDefs to file: " << node_file_writer_->filename();
552   }
553 
554   return OkStatus();
555 }
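// Example (illustrative) of the TF_GPU_THREAD_MODE handling in Init() above:
// running with TF_GPU_THREAD_MODE=gpu_private and TF_GPU_THREAD_COUNT=4 gives
// each GPU device its own four-thread pool named "gpu_private_<tf_device_id>",
// whereas TF_GPU_THREAD_MODE=gpu_shared makes all GPU devices share a single
// pool named "gpu_shared". With the default "global" mode no dedicated pool is
// created at all.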
556 
557 string BaseGPUDevice::ComputeOpKernelDebugString(const OpKernel& op_kernel,
558                                                  const int& stream_id) {
559   return strings::StrCat(op_kernel.name(), " op ", op_kernel.type_string(),
560                          " on GPU ", tf_device_id_.value(), " stream[",
561                          stream_id, "]");
562 }
563 
564 namespace {
565 const absl::flat_hash_set<std::string>* GetOpsToLogFromEnv() {
566   auto* result = new absl::flat_hash_set<std::string>;
567   const char* env = getenv("TF_GPU_DEBUG_OPS_TO_LOG");
568   if (!env) {
569     return result;
570   }
571 
572   std::vector<absl::string_view> ops = absl::StrSplit(env, ',');
573   LOG(INFO) << "Will log inputs & outputs from the following ops: ";
574   for (absl::string_view op : ops) {
575     result->insert(std::string(op));
576     LOG(INFO) << "  |" << op << "|";
577   }
578 
579   return result;
580 }
581 
582 bool ShouldLogInputsAndOutputs(OpKernel* op_kernel) {
583   static const absl::flat_hash_set<std::string>& ops_to_log =
584       *GetOpsToLogFromEnv();
585   return ops_to_log.count(op_kernel->type_string());
586 }
587 }  // namespace
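// Minimal sketch (illustrative, hypothetical function and op names) of how the
// TF_GPU_DEBUG_OPS_TO_LOG environment variable drives the helpers above. The
// #if 0 guard keeps it out of the build.
#if 0
void ExampleEnableDebugLogging(OpKernel* matmul_kernel) {
  // Must happen before the first call to ShouldLogInputsAndOutputs(), since
  // the parsed op set is cached in a function-local static.
  setenv("TF_GPU_DEBUG_OPS_TO_LOG", "MatMul,Conv2D", /*overwrite=*/1);
  // Matching is done on OpKernel::type_string(), here assumed to be "MatMul".
  CHECK(ShouldLogInputsAndOutputs(matmul_kernel));
}
#endif  // illustrative sketch only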
588 
589 Tensor BaseGPUDevice::CopyGpuTensorToHostDebugOnly(const Tensor& gpu_tensor) {
590   Tensor host_tensor(gpu_tensor.dtype(), gpu_tensor.shape());
591   CHECK(device_context_->stream()
592             ->ThenMemcpy(host_tensor.data(),
593                          se::DeviceMemoryBase(gpu_tensor.data(),
594                                               gpu_tensor.TotalBytes()),
595                          gpu_tensor.TotalBytes())
596             .BlockHostUntilDone()
597             .ok());
598   return host_tensor;
599 }
600 
601 void BaseGPUDevice::LogInputs(OpKernel* op_kernel, OpKernelContext* context) {
602   LOG(INFO) << "Inputs for " << op_kernel->name() << " (total "
603             << context->num_inputs() << "):";
604   for (int i = 0; i < context->num_inputs(); i++) {
605     if (!context->has_input(i)) {
606       LOG(INFO) << "input # " << i << " is absent";
607       continue;
608     }
609 
610     Tensor input = context->input_memory_type(i) == DEVICE_MEMORY
611                        ? CopyGpuTensorToHostDebugOnly(context->input(i))
612                        : context->input(i);
613 
614     LOG(INFO) << "input # " << i;
615     LOG(INFO) << input.DebugString(-1);
616   }
617   LOG(INFO) << "";
618 }
619 
620 void BaseGPUDevice::LogOutputs(OpKernel* op_kernel, OpKernelContext* context) {
621   if (!context->status().ok()) {
622     LOG(INFO) << op_kernel->name()
623               << " failed: " << context->status().error_message();
624     return;
625   }
626 
627   LOG(INFO) << "Outputs for " << op_kernel->name() << " (total "
628             << context->num_outputs() << "):";
629   for (int i = 0; i < context->num_outputs(); i++) {
630     Tensor* output_ptr = context->mutable_output(i);
631     if (output_ptr == nullptr) {
632       LOG(INFO) << "output # " << i << " is null";
633       continue;
634     }
635 
636     Tensor output = context->output_memory_type(i) == DEVICE_MEMORY
637                         ? CopyGpuTensorToHostDebugOnly(*output_ptr)
638                         : *output_ptr;
639 
640     LOG(INFO) << "output # " << i;
641     LOG(INFO) << output.DebugString(-1);
642   }
643   LOG(INFO) << "";
644 }
645 
646 void BaseGPUDevice::Compute(OpKernel* op_kernel, OpKernelContext* context) {
647   // NOTE(tucker): We need to discriminate between Eigen GPU
648   // operations and all others.  If an operation is Eigen
649   // implemented (or otherwise tries to launch a GPU kernel
650   // directly), we need to establish a stacked-scoped environment
651   // that directs it to execute on the proper device.  Otherwise we
652   // expect the Op to use StreamExecutor directly and correctly.
653   GPUDeviceContext* gpu_device_context = device_context_;
654   if (context->op_device_context() != nullptr) {
655     gpu_device_context =
656         static_cast<GPUDeviceContext*>(context->op_device_context());
657   }
658   se::Stream* stream = gpu_device_context->stream();
659   const auto stream_id = gpu_device_context->stream_id();
660 
661   const bool vlog_1 = VLOG_IS_ON(1);
662 
663   if (vlog_1) {
664     VLOG(1) << "GpuDevice::ComputeHelper "
665             << ComputeOpKernelDebugString(*op_kernel, stream_id);
666   }
667 
668   if (kernel_tracker_.get()) {
669     context->set_record_memory_consumption(true);
670     if (pending_cap_ > 0) {
671       kernel_tracker_->PauseWhilePendingExceeds(pending_cap_);
672     }
673   }
674   ScopedActivateExecutorContext scoped_activation{stream->parent()};
675   profiler::ScopedMemoryDebugAnnotation op_annotation(
676       op_kernel->name_view().data(), context->step_id());
677   bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel);
678 
679   if (should_log_inputs_and_outputs) {
680     LogInputs(op_kernel, context);
681   }
682 
683   op_kernel->Compute(context);
684 
685   if (should_log_inputs_and_outputs) {
686     LogOutputs(op_kernel, context);
687   }
688 
689   if (context->status().ok()) {
690     if (sync_every_op_) {
691       // Note: GPUUtil::Sync() only syncs the default stream.
692       // We need to either sync the stream used by this op, or
693       // all streams.  Given that this flag is typically used for
694       // debugging it makes more sense to sync all GPU activity.
695       context->SetStatus(GPUUtil::SyncAll(this));
696       if (vlog_1) {
697         VLOG(1) << "GpuDevice::ComputeHelper finished "
698                 << ComputeOpKernelDebugString(*op_kernel, stream_id);
699       }
700     } else if (vlog_1) {
701       VLOG(1) << "GpuDevice::ComputeHelper scheduled "
702               << ComputeOpKernelDebugString(*op_kernel, stream_id);
703     }
704     if (kernel_tracker_) {
705       GPUKernelTracker* tracker = kernel_tracker_.get();
706       DCHECK(tracker);
707       uint64 queued_count = tracker->MaybeQueue(context);
708       if (queued_count > 0) {
709         em_->ThenExecute(stream, [tracker, queued_count]() {
710           tracker->RecordTerminated(queued_count);
711         });
712       }
713     }
714     if (node_file_writer_) {
715       Status s = node_file_writer_->RecordNodeExecution(op_kernel, context);
716       if (!s.ok()) {
717         LOG(ERROR) << s;
718         context->SetStatus(s);
719       }
720     }
721   } else {
722     if (vlog_1) {
723       VLOG(1) << "GpuDevice::ComputeHelper failed to schedule "
724               << ComputeOpKernelDebugString(*op_kernel, stream_id);
725     }
726   }
727 }
728 
729 Status BaseGPUDevice::Sync() {
730   DCHECK_NE(stream_, nullptr);
731 
732   // Device::Sync is supposed to block until all operations queued on the device
733   // at the time of the call have completed.  On GPUs, only operations enqueued
734   // on the compute stream can remain pending after the (Async)OpKernel that
735   // enqueued the operation has completed.  We do use other streams for copies
736   // and collectives, but in those cases the (Async)OpKernels themselves block
737   // until the queued operation has finished.
738   return stream_->compute->BlockHostUntilDone();
739 }
740 
741 void BaseGPUDevice::ComputeAsync(AsyncOpKernel* op_kernel,
742                                  OpKernelContext* context,
743                                  AsyncOpKernel::DoneCallback done) {
744   GPUDeviceContext* gpu_device_context = device_context_;
745   if (context->op_device_context() != nullptr) {
746     gpu_device_context =
747         static_cast<GPUDeviceContext*>(context->op_device_context());
748   }
749   se::Stream* stream = gpu_device_context->stream();
750   const auto stream_id = gpu_device_context->stream_id();
751 
752   VLOG(1) << "GpuDevice::ComputeAsync " << op_kernel->name() << " op "
753           << op_kernel->type_string() << " on GPU" << tf_device_id_
754           << " stream[" << stream_id << "]";
755 
756   bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel);
757 
758   if (should_log_inputs_and_outputs) {
759     LogInputs(op_kernel, context);
760     AsyncOpKernel::DoneCallback parent_done = done;
761     done = [this, parent_done, should_log_inputs_and_outputs, op_kernel,
762             context]() {
763       LogOutputs(op_kernel, context);
764       parent_done();
765     };
766   }
767 
768   ScopedActivateExecutorContext scoped_activation{stream->parent()};
769   op_kernel->ComputeAsync(context, std::move(done));
770 }
771 
772 Status BaseGPUDevice::MaybeCopyTensorToGPU(
773     const AllocatorAttributes& alloc_attrs, const Tensor& from, Tensor* to,
774     StatusCallback done) {
775   if (alloc_attrs.on_host()) {
776     *to = from;
777     done(OkStatus());
778     return OkStatus();
779   } else {
780     if (!DMAHelper::CanUseDMA(&from)) {
781       Status err = errors::Internal("GPU copy from non-DMA ",
782                                     DataTypeString(from.dtype()), " tensor");
783       done(err);
784       return err;
785     }
786     AllocationAttributes allocation_attr;
787     uint64 safe_alloc_frontier = 0;
788     std::function<uint64()> freed_by_func = [this, &safe_alloc_frontier]() {
789       safe_alloc_frontier = SafeAllocFrontier(safe_alloc_frontier);
790       return safe_alloc_frontier;
791     };
792     if (timestamped_allocator_) {
793       allocation_attr.freed_by_func = &freed_by_func;
794     }
795     auto* copy = new Tensor(GetAllocator(alloc_attrs), from.dtype(),
796                             from.shape(), allocation_attr);
797 
798     // If the tensor is not initialized, we likely ran out of memory.
799     if (!copy->IsInitialized()) {
800       delete copy;
801       Status err = errors::ResourceExhausted(
802           "OOM when allocating tensor of shape ", from.shape().DebugString(),
803           " and type ", DataTypeString(from.dtype()));
804       done(err);
805       return err;
806     }
807 
808     auto wrapped_done = [to, copy, done = std::move(done)](const Status& s) {
809       if (s.ok()) {
810         *to = std::move(*copy);
811       }
812       delete copy;
813       done(s);
814     };
815 
816     profiler::ScopedAnnotation annotation("MakeTensorFromProto");
817     device_context_->CopyCPUTensorToDevice(
818         &from, this, copy, std::move(wrapped_done),
819         !timestamped_allocator_ /*sync_dst_compute*/);
820     return OkStatus();
821   }
822 }
823 
824 Status BaseGPUDevice::MakeTensorFromProto(const TensorProto& tensor_proto,
825                                           const AllocatorAttributes alloc_attrs,
826                                           Tensor* tensor) {
827   AllocatorAttributes attr;
828   attr.set_on_host(true);
829   attr.set_gpu_compatible(true);
830   Allocator* host_alloc = GetAllocator(attr);
831   Tensor parsed(tensor_proto.dtype());
832   if (!parsed.FromProto(host_alloc, tensor_proto)) {
833     return errors::InvalidArgument("Cannot parse tensor from proto: ",
834                                    tensor_proto.DebugString());
835   }
836 
837   profiler::ScopedMemoryDebugAnnotation op_annotation(
838       "MakeTensorFromProto", "dynamic", parsed.dtype(),
839       [&parsed]() { return parsed.shape().DebugString(); });
840   if (parsed.dtype() == DT_VARIANT) {
841     const Variant* from = parsed.flat<Variant>().data();
842     int numa_node = attributes().locality().numa_node();
843     Tensor copy(cpu_allocator(numa_node), DT_VARIANT, parsed.shape());
844     Variant* copy_variant = copy.flat<Variant>().data();
845 
846     std::list<Notification> notifications;
847     Status copy_status;
848     auto copier = [this, &alloc_attrs, &notifications, &copy_status](
849                       const Tensor& from, Tensor* to) {
850       // Copier isn't run in a multithreaded environment, so we don't
851       // have to worry about the notifications list being modified in parallel.
852       notifications.emplace_back();
853       Notification& n = *notifications.rbegin();
854       return MaybeCopyTensorToGPU(alloc_attrs, from, to,
855                                   [&n, &copy_status](const Status& s) {
856                                     if (copy_status.ok()) {
857                                       copy_status.Update(s);
858                                     }
859                                     n.Notify();
860                                   });
861     };
862     Status s;
863     for (int64_t ix = 0; ix < parsed.NumElements(); ++ix) {
864       s = VariantDeviceCopy(VariantDeviceCopyDirection::HOST_TO_DEVICE,
865                             from[ix], &copy_variant[ix], copier);
866       if (!s.ok()) {
867         break;
868       }
869     }
870     for (auto& n : notifications) {
871       n.WaitForNotification();
872     }
873     if (!s.ok()) {
874       return s;
875     }
876     *tensor = std::move(copy);
877     return copy_status;
878   } else {
879     Notification n;
880     Status status;
881     TF_RETURN_IF_ERROR(MaybeCopyTensorToGPU(alloc_attrs, parsed, tensor,
882                                             [&n, &status](const Status& s) {
883                                               status = s;
884                                               n.Notify();
885                                             }));
886     n.WaitForNotification();
887     return status;
888   }
889 }
890 
891 void BaseGPUDevice::CopyTensorInSameDevice(const Tensor* input_tensor,
892                                            Tensor* output_tensor,
893                                            const DeviceContext* device_context,
894                                            StatusCallback done) {
895   GPUUtil::CopyGPUTensorToSameGPU(static_cast<Device*>(this), device_context,
896                                   input_tensor, output_tensor, std::move(done));
897 }
898 
899 ConcretePerOpGpuDevice::ConcretePerOpGpuDevice()
900     : stream_device_(std::make_unique<EigenGpuStreamDevice>()) {}
901 
902 void ConcretePerOpGpuDevice::Reinitialize(OpKernelContext* context,
903                                           const void* gpu_stream,
904                                           TfDeviceId tf_device_id,
905                                           Allocator* base_allocator,
906                                           char* scratch) {
907   PlatformDeviceId platform_device_id;
908   TF_CHECK_OK(
909       GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
910   static_cast<EigenGpuStreamDevice*>(stream_device_.get())
911       ->Reinitialize(context, static_cast<const gpuStream_t*>(gpu_stream),
912                      platform_device_id, base_allocator, scratch);
913 }
914 
915 void ConcretePerOpGpuDevice::Reinitialize(OpKernelContext* context,
916                                           const void* gpu_stream,
917                                           PlatformDeviceId platform_device_id,
918                                           Allocator* base_allocator,
919                                           char* scratch) {
920   static_cast<EigenGpuStreamDevice*>(stream_device_.get())
921       ->Reinitialize(context, static_cast<const gpuStream_t*>(gpu_stream),
922                      platform_device_id, base_allocator, scratch);
923 }
924 
925 const Eigen::GpuDevice& ConcretePerOpGpuDevice::device() const {
926   return static_cast<EigenGpuStreamDevice*>(stream_device_.get())->device();
927 }
928 
929 namespace {
930 
931 Status VerifyVirtualDeviceSettings(
932     const size_t num_gpus_to_use, const GPUOptions& gpu_options,
933     const std::vector<PlatformDeviceId>& visible_gpu_order,
934     const std::vector<PlatformDeviceId>& valid_platform_device_ids,
935     const std::map<int, std::pair<int, int>>& supported_priority_ranges) {
936   const auto& virtual_devices = gpu_options.experimental().virtual_devices();
937   CHECK(!virtual_devices.empty());
938   if (gpu_options.per_process_gpu_memory_fraction() > 0) {
939     return errors::InvalidArgument(
940         "It's invalid to set per_process_gpu_memory_fraction when "
941         "virtual_devices is set.");
942   }
943   if (num_gpus_to_use < virtual_devices.size()) {
944     return errors::Unknown(
945         "Not enough GPUs to create virtual devices."
946         " num_gpus_to_use: ",
947         num_gpus_to_use, " #virtual_devices: ", virtual_devices.size());
948   }
949   if (!gpu_options.visible_device_list().empty() &&
950       visible_gpu_order.size() != virtual_devices.size()) {
951     return errors::InvalidArgument(
952         "The number of GPUs in visible_device_list doesn't match the number "
953         "of elements in the virtual_devices list.",
954         " #GPUs in visible_device_list: ", visible_gpu_order.size(),
955         " virtual_devices.size(): ", virtual_devices.size());
956   }
957   if (valid_platform_device_ids.size() != virtual_devices.size()) {
958     return errors::Unknown(
959         "The number of valid GPUs doesn't match the number of elements in "
960         "the virtual_devices list.",
961         " #valid GPUs: ", valid_platform_device_ids.size(),
962         " virtual_devices.size(): ", virtual_devices.size());
963   }
964   for (int i = 0; i < virtual_devices.size(); ++i) {
965     // Compares against the first virtual_device list.
966     if (virtual_devices.Get(0).device_ordinal().empty() !=
967         virtual_devices.Get(i).device_ordinal().empty()) {
968       return errors::InvalidArgument(
969           "Device ordinals must be set for all virtual devices or none. But "
970           "the device_ordinal is specified for ",
971           i, " while previous devices didn't have any set.");
972     }
973   }
974   if (!virtual_devices.Get(0).device_ordinal().empty()) {
975     for (int i = 0; i < virtual_devices.size(); ++i) {
976       const size_t memory_limit_mb_size =
977           virtual_devices.Get(i).memory_limit_mb().size();
978       const size_t device_ordinal_size =
979           virtual_devices.Get(i).device_ordinal().size();
980       if (memory_limit_mb_size != device_ordinal_size) {
981         return errors::InvalidArgument(
982             "Number of virtual device ordinals specified doesn't "
983             "match with number of memory_limit_mb specified for GPU# ",
984             i, " memory_limit_mb size: ", memory_limit_mb_size,
985             " and device_ordinal size: ", device_ordinal_size);
986       }
987     }
988   }
989 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
990   // Check that memory_limit_mb and priority sizes match if priority is non-empty.
991   bool priority_exists = !virtual_devices.Get(0).priority().empty();
992   for (int i = 0; i < virtual_devices.size(); ++i) {
993     const auto& memory_limit_mb = virtual_devices.Get(i).memory_limit_mb();
994     const auto& priority = virtual_devices.Get(i).priority();
995     // If the priority is empty in the first one then treat this as having no
996     // priority set in any of the virtual devices for backward compatibility.
997     // Either it's set for all or none.
998     if (!priority_exists) {
999       if (!priority.empty()) {
1000         return errors::InvalidArgument(
1001             "Priority must be set for all virtual devices or none. But the "
1002             "priority is specified for ",
1003             i,
1004             " while previous devices didn't "
1005             "have any set.");
1006       }
1007     }
1008     if (priority_exists && memory_limit_mb.size() != priority.size()) {
1009       return errors::InvalidArgument(
1010           "Number of virtual device priorities specified doesn't "
1011           "match with number of memory_limit_mb specified for GPU# ",
1012           i, " memory_limit_mb size: ", memory_limit_mb.size(),
1013           " and priority size: ", priority.size());
1014     }
1015     const int gpu_id = valid_platform_device_ids[i].value();
1016     auto it = supported_priority_ranges.find(gpu_id);
1017     if (it == supported_priority_ranges.end()) {
1018       return errors::Internal(
1019           "Failed to find supported priority range for GPU"
1020           " device ",
1021           gpu_id);
1022     }
1023     const std::pair<int, int>& priority_range = it->second;
1024     for (int p : priority) {
1025       if (p > priority_range.first || p < priority_range.second) {
1026         return errors::InvalidArgument(
1027             "Priority ", p,
1028             " is outside the range of supported priorities "
1029             "[",
1030             priority_range.second, ",", priority_range.first,
1031             "] for virtual device ", i, " on GPU# ", gpu_id);
1032       }
1033     }
1034   }
1035 #endif
1036 
1037   return OkStatus();
1038 }
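// Example (illustrative) of a configuration that passes the checks above,
// splitting one physical GPU into two virtual devices; the priority values
// assume the GPU's supported stream priority range includes [-1, 0], and the
// textproto sketch is approximate:
//   gpu_options.experimental.virtual_devices {
//     memory_limit_mb: 2048
//     memory_limit_mb: 4096
//     priority: 0
//     priority: -1
//   }
// priority (and device_ordinal) must either be omitted everywhere or be given
// with one entry per memory_limit_mb entry in every virtual device.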
1039 
1040 int64_t MinSystemMemory(int64_t available_memory, int cc_major) {
1041   // We use the following heuristic for now:
1042   //
1043   // If the available_memory is < 2GiB, we allocate 225MiB to system memory.
1044   // Otherwise, depending on the compute capability, reserve
1045   //  500MiB (for cuda_compute_capability <= 6.x),
1046   // 1050MiB (for cuda_compute_capability == 7.x), or
1047   // 1536MiB (for cuda_compute_capability >= 8.x).
1048   int64_t min_system_memory;
1049   if (available_memory < (1LL << 31)) {
1050     min_system_memory = 225 * 1024 * 1024;
1051   } else {
1052     if (cc_major <= 6) {
1053       min_system_memory = 500 * 1024 * 1024;
1054     } else if (cc_major <= 7) {
1055       min_system_memory = 1050 * 1024 * 1024;
1056     } else {
1057       min_system_memory = 1536 * 1024 * 1024;
1058     }
1059   }
1060 #if defined(__GNUC__) && defined(__OPTIMIZE__)
1061 // Do nothing
1062 #elif !defined(__GNUC__) && defined(NDEBUG)
1063 // Do nothing
1064 #else
1065   // Double the reserved system memory in non-opt builds (debug
1066   // builds on Windows), because non-opt builds need more system
1067   // memory.
1068   min_system_memory *= 2;
1069 #endif
1070 
1071 #if defined(ANDROID_TEGRA)
1072   // 1GB system mem for NVIDIA Tegra devices since they use the same mem for
1073   // RAM and Video RAM
1074   min_system_memory = 1 << 30;
1075 #endif
1076 
1077   VLOG(5) << "available_memory = " << available_memory;
1078   VLOG(5) << "min_system_memory = " << min_system_memory;
1079   return min_system_memory;
1080 }
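// Worked example of the heuristic above: on a GPU with 16GiB available and
// compute capability 8.x, available_memory >= 2GiB, so min_system_memory is
// 1536MiB (doubled again in non-optimized builds, or fixed at 1GiB on Tegra
// builds).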
1081 
1082 // Get the memory limit for the virtual device being created on GPU with
1083 // 'platform_device_id', when that virtual device is the only virtual device
1084 // being created on that GPU.
1085 Status SingleVirtualDeviceMemoryLimit(const GPUOptions& gpu_options,
1086                                       PlatformDeviceId platform_device_id,
1087                                       int64_t* memory_limit) {
1088   int64_t total_memory = 0;
1089   int64_t available_memory = 0;
1090   se::StreamExecutor* se = DeviceIdUtil::ExecutorForPlatformDeviceId(
1091                                GPUMachineManager(), platform_device_id)
1092                                .ValueOrDie();
1093   if (!se->DeviceMemoryUsage(&available_memory, &total_memory)) {
1094     return errors::Unknown("Failed to query available memory for GPU ",
1095                            platform_device_id.value());
1096   }
1097 
1098   int64_t allocated_memory = 0;
1099   const double per_process_gpu_memory_fraction =
1100       gpu_options.per_process_gpu_memory_fraction();
1101   se::CudaComputeCapability cc =
1102       se->GetDeviceDescription().cuda_compute_capability();
1103   if ((per_process_gpu_memory_fraction > 1.0 ||
1104        gpu_options.experimental().use_unified_memory()) &&
1105       !cc.IsAtLeast(se::CudaComputeCapability::PASCAL_)) {
1106     return errors::Internal(
1107         "Unified memory on GPUs with compute capability lower than 6.0 "
1108         "(pre-Pascal class GPUs) does not support oversubscription.");
1109   }
1110 
1111   if (per_process_gpu_memory_fraction == 0) {
1112     allocated_memory = available_memory;
1113     const int64_t min_system_memory =
1114         MinSystemMemory(available_memory, cc.major);
1115     if (min_system_memory < allocated_memory) {
1116       allocated_memory -= min_system_memory;
1117     }
1118   } else {
1119     allocated_memory = total_memory * per_process_gpu_memory_fraction;
1120   }
1121 
1122   // Override the excluded memory when TF_DEVICE_MIN_SYS_MEMORY_IN_MB is set.
1123   const char* force_device_reserved_bytes =
1124       std::getenv("TF_DEVICE_MIN_SYS_MEMORY_IN_MB");
1125   if (force_device_reserved_bytes != nullptr &&
1126       strcmp(force_device_reserved_bytes, "") != 0) {
1127     int64_t reserved_mb;
1128     if (!strings::safe_strto64(force_device_reserved_bytes, &reserved_mb) ||
1129         reserved_mb < 0) {
1130       LOG(WARNING) << "The requested reserved device memory "
1131                    << force_device_reserved_bytes
1132                    << " is invalid. The request will be ignored.";
1133     } else {
1134       // Convert MBytes to Bytes.
1135       int64_t allowable_reserved_memory = reserved_mb * 1024 * 1024;
1136       // TF_DEVICE_MIN_SYS_MEMORY_IN_MB overrides
1137       // per_process_gpu_memory_fraction.
1138       if (allowable_reserved_memory <= available_memory) {
1139         allocated_memory = available_memory - allowable_reserved_memory;
1140         VLOG(1) << "Setting the GPU memory limit to "
1141                 << strings::HumanReadableNumBytes(allocated_memory)
1142                 << " after reserving the requested system memory.";
1143       } else {
1144         LOG(WARNING) << "The requested reserved device memory "
1145                      << strings::HumanReadableNumBytes(
1146                             allowable_reserved_memory)
1147                      << " is larger than the available memory of "
1148                      << strings::HumanReadableNumBytes(available_memory)
1149                      << ". The request is ignored.";
1150       }
1151     }
1152   }
1153   *memory_limit = allocated_memory;
1154   return OkStatus();
1155 }
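// Example (illustrative numbers): with 16GiB of free GPU memory on a compute
// capability 8.x device and per_process_gpu_memory_fraction left at 0, the
// limit becomes 16GiB - 1536MiB. Setting TF_DEVICE_MIN_SYS_MEMORY_IN_MB=2048
// instead reserves exactly 2GiB, i.e. the limit becomes
// available_memory - 2048MiB, overriding both the heuristic and
// per_process_gpu_memory_fraction.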
1156 }  // namespace
1157 
1158 void BaseGPUDevice::ReinitializeDevice(OpKernelContext* context,
1159                                        PerOpGpuDevice* device, int stream_id,
1160                                        Allocator* allocator) {
1161   ConcretePerOpGpuDevice* concrete_device =
1162       static_cast<ConcretePerOpGpuDevice*>(device);
1163   DCHECK(concrete_device);
1164   DCHECK_EQ(stream_id, 0);
1165   const gpuStream_t* gpu_stream = reinterpret_cast<const gpuStream_t*>(
1166       stream_->compute->implementation()->GpuStreamMemberHack());
1167   concrete_device->Reinitialize(context, gpu_stream, tf_device_id_, allocator,
1168                                 scratch_);
1169 }
1170 
1171 PerOpGpuDevice* BaseGPUDevice::MakeGpuDevice() {
1172   return new ConcretePerOpGpuDevice();
1173 }
1174 
1175 Status BaseGPUDevice::ReinitializeGpuDevice(OpKernelContext* context,
1176                                             PerOpGpuDevice* device,
1177                                             DeviceContext* dc,
1178                                             Allocator* allocator) {
1179   TF_RETURN_IF_ERROR(InitScratchBuffers());
1180   if (dc) {
1181     const GPUDeviceContext* gpu_dc = static_cast<GPUDeviceContext*>(dc);
1182     const int stream_id = gpu_dc->stream_id();
1183     VLOG(1) << "  eigen_gpu_device(" << dc << ") => stream[" << stream_id
1184             << "]";
1185     CHECK_EQ(stream_id, 0);
1186     ReinitializeDevice(context, device, stream_id, allocator);
1187   } else {
1188     ReinitializeDevice(context, device, 0, allocator);
1189   }
1190   return OkStatus();
1191 }
1192 
1193 Allocator* BaseGPUDevice::GetScopedAllocator(AllocatorAttributes attr,
1194                                              int64_t step_id) {
1195   if (attr.scope_id > 0) {
1196     return scoped_allocator_mgr_->GetContainer(step_id)->GetInstance(
1197         attr.scope_id);
1198   }
1199   LOG(FATAL) << "Unexpected call to BaseGPUDevice::GetScopedAllocator "
1200              << "attr.scope_id = " << attr.scope_id;
1201   return gpu_allocator_;
1202 }
1203 
1204 const int BaseGPUDeviceFactory::InterconnectMap::kSameDeviceStrength = 1000;
1205 const int BaseGPUDeviceFactory::InterconnectMap::kStreamExecutorStrength = 1;
1206 
1207 Status BaseGPUDeviceFactory::CacheDeviceIds() {
1208   if (!cached_device_ids_.empty()) {
1209     return OkStatus();
1210   }
1211 
1212   TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1213   se::Platform* gpu_manager = GPUMachineManager();
1214   if (gpu_manager == nullptr) {
1215     return OkStatus();
1216   }
1217 
1218   int device_count = gpu_manager->VisibleDeviceCount();
1219   if (device_count <= 0) {
1220     return OkStatus();
1221   }
1222 
1223   std::vector<PlatformDeviceId> visible_gpu_order(device_count);
1224   std::iota(visible_gpu_order.begin(), visible_gpu_order.end(), 0);
1225   TF_RETURN_IF_ERROR(GetValidDeviceIds(visible_gpu_order, &cached_device_ids_));
1226   return OkStatus();
1227 }
1228 
1229 Status BaseGPUDeviceFactory::ListPhysicalDevices(std::vector<string>* devices) {
1230   TF_RETURN_IF_ERROR(CacheDeviceIds());
1231   for (PlatformDeviceId platform_device_id : cached_device_ids_) {
1232     const string device_name =
1233         strings::StrCat("/physical_device:GPU:", platform_device_id.value());
1234     devices->push_back(device_name);
1235   }
1236 
1237   return OkStatus();
1238 }
1239 
1240 Status BaseGPUDeviceFactory::GetDeviceDetails(
1241     int device_index, std::unordered_map<string, string>* details) {
1242   TF_RETURN_IF_ERROR(CacheDeviceIds());
1243 
1244   if (device_index < 0 || device_index >= cached_device_ids_.size()) {
1245     return errors::Internal("Invalid device index: ", device_index);
1246   }
1247   PlatformDeviceId platform_device_id = cached_device_ids_[device_index];
1248 
1249   TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1250   se::Platform* gpu_manager = GPUMachineManager();
1251   if (gpu_manager == nullptr) {
1252     return errors::Internal("Cannot get GPUMachineManager");
1253   }
1254   auto desc_status =
1255       gpu_manager->DescriptionForDevice(platform_device_id.value());
1256   if (!desc_status.ok()) {
1257     return desc_status.status();
1258   }
1259 
1260   auto desc = std::move(desc_status).value();
1261   (*details)["device_name"] = desc->name();
1262 #if GOOGLE_CUDA
1263   (*details)["compute_capability"] = desc->cuda_compute_capability().ToString();
1264 #endif  // GOOGLE_CUDA
1265   return OkStatus();
1266 }
1267 
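// Creates the TF GPU devices requested by 'options': parses
// gpu_options.visible_device_list, enables peer access when several GPUs are
// visible, queries per-device stream priority ranges, expands any virtual
// device configuration into TfDeviceSpecs, and instantiates one BaseGPUDevice
// per spec via CreateGPUDevice().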
1268 Status BaseGPUDeviceFactory::CreateDevices(
1269     const SessionOptions& options, const string& name_prefix,
1270     std::vector<std::unique_ptr<Device>>* devices) {
1271   TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1272   se::Platform* gpu_manager = GPUMachineManager();
1273   if (gpu_manager == nullptr) {
1274     return OkStatus();
1275   }
1276   // If there are no GPUs visible, do nothing.
1277   if (gpu_manager->VisibleDeviceCount() <= 0) {
1278     return OkStatus();
1279   }
1280 
1281   size_t num_gpus_to_use = INT_MAX;
1282   auto iter = options.config.device_count().find("GPU");
1283   if (iter != options.config.device_count().end()) {
1284     num_gpus_to_use = iter->second;
1285   }
1286   const auto& gpu_options = options.config.gpu_options();
1287   std::vector<PlatformDeviceId> visible_gpu_order;
1288   std::vector<PlatformDeviceId> valid_platform_device_ids;
1289   // If we aren't going to use any GPUs, don't initialize them.
1290   // We don't want to call ParseVisibleDeviceList if num_gpus_to_use is 0,
1291   // because it treats an empty gpu_options.visible_device_list as 'all GPUs
1292   // are visible'.
1293   if (num_gpus_to_use > 0) {
1294     TF_RETURN_IF_ERROR(DeviceIdUtil::ParseVisibleDeviceList(
1295         gpu_options.visible_device_list(), gpu_manager->VisibleDeviceCount(),
1296         &visible_gpu_order));
1297     bool new_gpu_found = false;
1298     for (int i = 0; i < visible_gpu_order.size(); ++i) {
1299       int visible_gpu_id = visible_gpu_order[i].value();
1300 
1301       // Only perform this once per visible gpu id.
1302       if (visible_gpu_initialized_[visible_gpu_id]) {
1303         continue;
1304       }
1305 
1306       visible_gpu_initialized_[visible_gpu_id] = true;
1307       new_gpu_found = true;
1308     }
1309 
1310     // Check peering and show the matrix if more than one GPU is found.
1311     if (new_gpu_found && visible_gpu_order.size() > 1) {
1312       // Enable peer access
1313       TF_RETURN_IF_ERROR(EnablePeerAccess(visible_gpu_order));
1314     }
1315 
1316     TF_RETURN_IF_ERROR(
1317         GetValidDeviceIds(visible_gpu_order, &valid_platform_device_ids));
1318   }
1319   if (num_gpus_to_use > valid_platform_device_ids.size()) {
1320     num_gpus_to_use = valid_platform_device_ids.size();
1321   }
1322   std::map<int, std::pair<int, int>> supported_priority_ranges;
1323   if (!valid_platform_device_ids.empty()) {
1324     // Save the original device.
1325     int original_device = 0;
1326 #if GOOGLE_CUDA
1327     cudaError_t err = cudaGetDevice(&original_device);
1328     if (err != cudaSuccess) {
1329       return errors::Internal("cudaGetDevice() failed. Status: ",
1330                               cudaGetErrorString(err));
1331     }
1332 #elif TENSORFLOW_USE_ROCM
1333     hipError_t err = hipGetDevice(&original_device);
1334     if (err != hipSuccess) {
1335       return errors::Internal("hipGetDevice() failed. Status: ",
1336                               hipGetErrorString(err));
1337     }
1338 #endif
1339 
1340     // Force the CUDA runtime to initialize implicitly on each valid GPU before
1341     // CreateGPUDevice().
1342     for (PlatformDeviceId platform_device_id : valid_platform_device_ids) {
1343 #if GOOGLE_CUDA
1344       err = cudaSetDevice(platform_device_id.value());
1345       if (err != cudaSuccess) {
1346         return errors::Internal(
1347             "cudaSetDevice() on GPU:", platform_device_id.value(),
1348             " failed. Status: ", cudaGetErrorString(err));
1349       }
1350       int priority_low, priority_high;
1351       err = cudaDeviceGetStreamPriorityRange(&priority_low, &priority_high);
1352       if (err != cudaSuccess) {
1353         return errors::Internal(
1354             "cudaDeviceGetStreamPriorityRange() on GPU:", original_device,
1355             " failed. Status: ", cudaGetErrorString(err));
1356       }
1357       VLOG(1) << "Cuda stream priority range on GPU(" << original_device
1358               << "): " << priority_high << "," << priority_low;
1359       supported_priority_ranges.insert(
1360           std::make_pair(platform_device_id.value(),
1361                          std::make_pair(priority_low, priority_high)));
1362 #elif TENSORFLOW_USE_ROCM
1363       err = hipSetDevice(platform_device_id.value());
1364       if (err != hipSuccess) {
1365         return errors::Internal(
1366             "hipSetDevice() on GPU:", platform_device_id.value(),
1367             " failed. Status: ", hipGetErrorString(err));
1368       }
1369       err = hipFree(nullptr);
1370       if (err != hipSuccess) {
1371         return errors::Internal("ROCm runtime implicit initialization on GPU:",
1372                                 platform_device_id.value(),
1373                                 " failed. Status: ", hipGetErrorString(err));
1374       }
1375       int priority_low, priority_high;
1376       err = hipDeviceGetStreamPriorityRange(&priority_low, &priority_high);
1377       if (err != hipSuccess) {
1378         return errors::Internal(
1379             "hipDeviceGetStreamPriorityRange() on GPU:", original_device,
1380             " failed. Status: ", hipGetErrorString(err));
1381       }
1382       VLOG(1) << "HIP stream priority range on GPU(" << original_device
1383               << "): " << priority_high << "," << priority_low;
1384       supported_priority_ranges.insert(
1385           std::make_pair(platform_device_id.value(),
1386                          std::make_pair(priority_low, priority_high)));
1387 #endif
1388     }
1389     // Reset to the original device.
1390 #if GOOGLE_CUDA
1391     err = cudaSetDevice(original_device);
1392     if (err != cudaSuccess) {
1393       return errors::Internal("cudaSetDevice() on GPU:", original_device,
1394                               " failed. Status: ", cudaGetErrorString(err));
1395     }
1396 #elif TENSORFLOW_USE_ROCM
1397     err = hipSetDevice(original_device);
1398     if (err != hipSuccess) {
1399       return errors::Internal("hipSetDevice() on GPU:", original_device,
1400                               " failed. Status: ", hipGetErrorString(err));
1401     }
1402 #endif
1403 
1404 #if GOOGLE_CUDA
1405     // Log the version of CUDA and cuDNN
1406     int cuda_major_version = CUDART_VERSION / 1000;
1407     int cuda_minor_version = (CUDART_VERSION / 10) % 10;
1408     VLOG(1) << "TensorFlow compiled with CUDA " << cuda_major_version << "."
1409             << cuda_minor_version << " and cuDNN " << CUDNN_MAJOR << "."
1410             << CUDNN_MINOR << "." << CUDNN_PATCHLEVEL;
1411 #endif
1412   }
1413 
1414   std::vector<InterconnectMap> interconnect_maps;
1415   TF_RETURN_IF_ERROR(
1416       GetInterconnectMaps(visible_gpu_order, gpu_manager, &interconnect_maps));
1417 
1418   // Print each interconnect map to the log.
1419   for (const InterconnectMap& im : interconnect_maps) {
1420     VLOG(1) << "Device interconnect " << im.name << " with strength "
1421             << im.strength << " edge matrix:";
1422     string line_buf = "     ";
1423     for (int i = 0; i < visible_gpu_order.size(); ++i) {
1424       strings::StrAppend(&line_buf, visible_gpu_order[i].value(), " ");
1425     }
1426     VLOG(1) << line_buf;
1427     for (int i = 0; i < visible_gpu_order.size(); ++i) {
1428       line_buf = strings::StrCat(visible_gpu_order[i].value(), ":   ");
1429       PlatformDeviceId gpu_id_i = visible_gpu_order[i];
1430       for (int j = 0; j < visible_gpu_order.size(); ++j) {
1431         PlatformDeviceId gpu_id_j = visible_gpu_order[j];
1432         if (im.directed_links.find({gpu_id_i, gpu_id_j}) !=
1433             im.directed_links.end()) {
1434           line_buf.append("Y ");
1435         } else {
1436           line_buf.append("N ");
1437         }
1438       }
1439       VLOG(1) << line_buf;
1440     }
1441   }
1442 
1443   const auto& virtual_devices = gpu_options.experimental().virtual_devices();
1444   if (!virtual_devices.empty()) {
1445     TF_RETURN_IF_ERROR(VerifyVirtualDeviceSettings(
1446         num_gpus_to_use, gpu_options, visible_gpu_order,
1447         valid_platform_device_ids, supported_priority_ranges));
1448     // We've verified that num_gpus_to_use >= virtual_devices.size().
1449     num_gpus_to_use = virtual_devices.size();
1450     CHECK(gpu_options.visible_device_list().empty() ||
1451           valid_platform_device_ids == visible_gpu_order);
1452   }
1453 
1454   struct TfDeviceSpec {
1455     PlatformDeviceId platform_device_id;
1456     int64_t memory_limit_bytes;
1457     int index;  // Index in the concatenated virtual device configuration list.
1458     int device_ordinal;  // Ordinal for determining the tf device id.
1459     TfDeviceSpec(PlatformDeviceId platform_device_id,
1460                  int64_t memory_limit_bytes, int index, int device_ordinal)
1461         : platform_device_id(platform_device_id),
1462           memory_limit_bytes(memory_limit_bytes),
1463           index(index),
1464           device_ordinal(device_ordinal) {}
1465   };
1466 
1467   std::vector<TfDeviceSpec> tf_device_specs;
1468 
1469   constexpr int64_t kMegaByte = 1ll << 20;
1470 
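  // Build one TfDeviceSpec per TF device: a single spec per physical GPU when
  // no per-GPU memory limits are configured, otherwise one spec per
  // memory_limit_mb entry of that GPU's virtual device config.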
1471   for (int i = 0; i < num_gpus_to_use; ++i) {
1472     const PlatformDeviceId platform_device_id = valid_platform_device_ids[i];
1473     if (virtual_devices.empty() ||
1474         virtual_devices.Get(i).memory_limit_mb_size() == 0) {
1475       int64_t single_virtual_device_memory_limit = 0;
1476       TF_RETURN_IF_ERROR(
1477           SingleVirtualDeviceMemoryLimit(gpu_options, platform_device_id,
1478                                          &single_virtual_device_memory_limit));
1479       tf_device_specs.emplace_back(
1480           platform_device_id, single_virtual_device_memory_limit,
1481           /*index=*/tf_device_specs.size(), /*device_ordinal=*/0);
1482     } else {
1483       const GPUOptions::Experimental::VirtualDevices& virtual_devices_for_gpu =
1484           virtual_devices.Get(i);
1485       for (int j = 0; j < virtual_devices_for_gpu.memory_limit_mb().size();
1486            j++) {
1487         tf_device_specs.emplace_back(
1488             platform_device_id,
1489             static_cast<int64_t>(virtual_devices_for_gpu.memory_limit_mb(j)) *
1490                 kMegaByte,
1491             /*index=*/tf_device_specs.size(),
1492             /*device_ordinal=*/j <
1493                     virtual_devices_for_gpu.device_ordinal().size()
1494                 ? virtual_devices_for_gpu.device_ordinal(j)
1495                 : 0);
1496       }
1497     }
1498   }
1499 
1500   // Reorder virtual devices by their device_ordinal, breaking ties by the index
1501   // in the concatenated virtual device list.
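  // For example, if each of two physical GPUs is split into two virtual
  // devices with device_ordinal values {1, 2}, the sorted order interleaves
  // the physical GPUs: both ordinal-1 devices receive the lowest TF ids,
  // followed by both ordinal-2 devices.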
1502   std::sort(tf_device_specs.begin(), tf_device_specs.end(),
1503             [](const TfDeviceSpec& a, const TfDeviceSpec& b) {
1504               if (a.device_ordinal < b.device_ordinal) {
1505                 return true;
1506               } else if (a.device_ordinal > b.device_ordinal) {
1507                 return false;
1508               }
1509               DCHECK_EQ(a.device_ordinal, b.device_ordinal);
1510               DCHECK_NE(a.index, b.index);  // index is unique.
1511               if (a.index < b.index) {
1512                 return true;
1513               }
1514               return false;
1515             });
1516 
1517   for (int di = 0; di < tf_device_specs.size(); ++di) {
1518     TfDeviceId tf_device_id(di);
1519     TF_RETURN_IF_ERROR(GpuIdManager::InsertTfPlatformDeviceIdPair(
1520         tf_device_id, tf_device_specs[di].platform_device_id));
1521   }
1522 
1523   LocalityMap device_localities;
1524   TF_RETURN_IF_ERROR(GetDeviceLocalities(
1525       tf_device_specs.size(), interconnect_maps, &device_localities));
1526 
1527   // Build the GPUDevices
1528   for (int di = 0; di < tf_device_specs.size(); ++di) {
1529     TfDeviceId tf_device_id(di);
1530     auto it = device_localities.find(tf_device_id);
1531     if (it == device_localities.end()) {
1532       return errors::Internal("Failed to find DeviceLocality for GPU device ",
1533                               tf_device_id.value());
1534     }
1535     TF_RETURN_IF_ERROR(CreateGPUDevice(options, name_prefix, tf_device_id,
1536                                        tf_device_specs[di].memory_limit_bytes,
1537                                        it->second, tf_device_specs.size(),
1538                                        devices));
1539   }
1540   return OkStatus();
1541 }
1542 
1543 static string GetShortDeviceDescription(PlatformDeviceId platform_device_id,
1544                                         const se::DeviceDescription& desc) {
1545 #if GOOGLE_CUDA
1546   // LINT.IfChange
1547   return strings::StrCat(
1548       "device: ", platform_device_id.value(), ", name: ", desc.name(),
1549       ", pci bus id: ", desc.pci_bus_id(),
1550       ", compute capability: ", desc.cuda_compute_capability().ToString());
1551   // LINT.ThenChange(//tensorflow/python/framework/gpu_util.py)
1552 #elif TENSORFLOW_USE_ROCM
1553   return strings::StrCat("device: ", platform_device_id.value(),
1554                          ", name: ", desc.name(),
1555                          ", pci bus id: ", desc.pci_bus_id());
1556 #endif
1557 }
1558 
1559 Status BaseGPUDeviceFactory::CreateGPUDevice(
1560     const SessionOptions& options, const string& name_prefix,
1561     TfDeviceId tf_device_id, int64_t memory_limit,
1562     const DeviceLocality& dev_locality, size_t num_tf_gpus,
1563     std::vector<std::unique_ptr<Device>>* devices) {
1564   CHECK_GE(tf_device_id.value(), 0);
1565   const string device_name =
1566       strings::StrCat(name_prefix, "/device:GPU:", tf_device_id.value());
1567   DeviceIdUtil::CheckValidTfDeviceId(DEVICE_GPU, GPUMachineManager(),
1568                                      tf_device_id);
1569   PlatformDeviceId platform_device_id;
1570   TF_RETURN_IF_ERROR(
1571       GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
1572   int numa_node = dev_locality.numa_node();
1573 
1574   se::Platform* gpu_manager = GPUMachineManager();
1575   auto desc_status =
1576       gpu_manager->DescriptionForDevice(platform_device_id.value());
1577   if (!desc_status.ok()) {
1578     return desc_status.status();
1579   }
1580   auto desc = std::move(desc_status).value();
1581 
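  // Collect the TF ids of every other GPU; the list is passed to the GPU
  // allocator below so it can account for potential peer devices.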
1582   std::vector<TfDeviceId> peer_gpu_ids;
1583   peer_gpu_ids.reserve(num_tf_gpus);
1584   for (int id = 0; id < num_tf_gpus; ++id) {
1585     TfDeviceId peer_tf_device_id(id);
1586     if (peer_tf_device_id != tf_device_id) {
1587       peer_gpu_ids.push_back(peer_tf_device_id);
1588     }
1589   }
1590 
1591   GPUProcessState* process_state = GPUProcessState::singleton();
1592   Allocator* gpu_allocator = process_state->GetGPUAllocator(
1593       options.config.gpu_options(), tf_device_id, memory_limit, peer_gpu_ids);
1594   if (gpu_allocator == nullptr) {
1595     return errors::Internal("Failed to get memory allocator for TF GPU ",
1596                             tf_device_id.value(), " with ", memory_limit,
1597                             " bytes of memory.");
1598   }
1599   absl::optional<AllocatorStats> stats = gpu_allocator->GetStats();
1600   if (!stats) {
1601     return errors::Internal("No allocator statistics");
1602   }
1603   // 'memory_limit' is the required memory size, but if the allocator with
1604   // given tf_device_id was created before, we'll use it instead of creating a
1605   // new one (as TF gpu device is a shared resource), in which case the actual
1606   // memory limit represented by 'stats.bytes_limit' used by that allocator
1607   // may be different (which should be an error).
1608   //
1609   // TODO(laigd): report error if memory_limit doesn't match
1610   // stats->bytes_limit.
1611   int64_t bytes_limit = stats->bytes_limit ? *stats->bytes_limit : 0;
1612   std::unique_ptr<BaseGPUDevice> gpu_device = CreateGPUDevice(
1613       options, device_name, static_cast<Bytes>(bytes_limit), dev_locality,
1614       tf_device_id, GetShortDeviceDescription(platform_device_id, *desc),
1615       gpu_allocator, ProcessState::singleton()->GetCPUAllocator(numa_node));
1616   LOG(INFO) << "Created device " << device_name << " with "
1617             << (bytes_limit >> 20) << " MB memory"
1618             << " -> " << GetShortDeviceDescription(platform_device_id, *desc);
1619   TF_RETURN_IF_ERROR(gpu_device->Init(options));
1620   gpu_allocator->SetStreamAndPreallocateMemory(gpu_device->GetStream());
1621   devices->push_back(std::move(gpu_device));
1622 
1623   return OkStatus();
1624 }
1625 
1626 namespace {
1627 std::unique_ptr<std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>>
1628 GetPeerAccessMap(se::Platform* platform,
1629                  const std::vector<PlatformDeviceId>& visible_gpu_order) {
1630   std::unique_ptr<std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>>
1631       map(new std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>);
1632   for (PlatformDeviceId platform_gpu_i : visible_gpu_order) {
1633     for (PlatformDeviceId platform_gpu_j : visible_gpu_order) {
1634       se::StreamExecutor* from =
1635           DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_gpu_i)
1636               .ValueOrDie();
1637       se::StreamExecutor* to =
1638           DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_gpu_j)
1639               .ValueOrDie();
1640       (*map)[{platform_gpu_i, platform_gpu_j}] =
1641           from->CanEnablePeerAccessTo(to);
1642     }
1643   }
1644 
1645   return map;
1646 }
1647 
1648 }  // namespace
1649 
1650 Status BaseGPUDeviceFactory::GetInterconnectMaps(
1651     const std::vector<PlatformDeviceId>& visible_gpu_order,
1652     se::Platform* gpu_manager, std::vector<InterconnectMap>* maps) {
1653   // The default interconnect map is obtained from the StreamExecutor.
1654   auto access_map = GetPeerAccessMap(gpu_manager, visible_gpu_order);
1655   maps->resize(1);
1656   InterconnectMap& imap = maps->at(0);
1657   imap.name = "StreamExecutor";
1658   imap.strength = InterconnectMap::kStreamExecutorStrength;
1659   for (PlatformDeviceId gpu_id_i : visible_gpu_order) {
1660     for (PlatformDeviceId gpu_id_j : visible_gpu_order) {
1661       if (gpu_id_i == gpu_id_j) continue;
1662       if ((*access_map)[{gpu_id_i, gpu_id_j}]) {
1663         imap.directed_links.insert({gpu_id_i, gpu_id_j});
1664       }
1665     }
1666   }
1667   return OkStatus();
1668 }
1669 
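// Fills one DeviceLocality per TF GPU: NUMA node (defaulting to 0 when it
// cannot be determined), bus id, interconnect links taken from 'interconnects',
// and extra high-strength links between virtual devices that share a physical
// GPU.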
1670 Status BaseGPUDeviceFactory::GetDeviceLocalities(
1671     int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
1672     LocalityMap* localities) {
1673   std::vector<TfDeviceId> all_tf_device_ids;
1674   all_tf_device_ids.reserve(num_tf_gpus);
1675   for (int i = 0; i < num_tf_gpus; ++i) {
1676     all_tf_device_ids.push_back(TfDeviceId(i));
1677   }
1678   for (TfDeviceId tf_device_id : all_tf_device_ids) {
1679     PlatformDeviceId platform_device_id;
1680     TF_RETURN_IF_ERROR(
1681         GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
1682     // Get GPU bus_id from its reported NUMA affinity.  Because GPUs are
1683     // virtualized in some environments, we can't just use the GPU id.
1684     // NUMA locales are indexed from 0, buses are indexed from 1.
1685     se::Platform* gpu_manager = GPUMachineManager();
1686     auto desc_status =
1687         gpu_manager->DescriptionForDevice(platform_device_id.value());
1688     if (!desc_status.ok()) {
1689       return desc_status.status();
1690     }
1691     auto desc = std::move(desc_status).value();
1692     int numa_node = desc->numa_node();
1693     if (numa_node < 0) {
1694       // For some reason the StreamExecutor couldn't get the NUMA
1695       // affinity of the GPU.  If this is not a multi-socket mobo with
1696       // GPUs local to different buses, it doesn't matter.  If it is, we
1697       // may run into trouble later with data transfer operations.  The
1698       // trouble may manifest as slower than expected performance, or
1699       // outright failures.
1700       LOG(INFO) << "Could not identify NUMA node of platform GPU id "
1701                 << platform_device_id
1702                 << ", defaulting to 0.  Your kernel may not have been built "
1703                 << "with NUMA support.";
1704       numa_node = 0;
1705     }
1706     DeviceLocality dev_locality;
1707     dev_locality.set_numa_node(numa_node);
1708     dev_locality.set_bus_id(numa_node + 1);
1709 
1710     // Set LocalLinks from InterconnectMaps.
1711     LocalLinks* links = dev_locality.mutable_links();
1712     for (const InterconnectMap& imap : interconnects) {
1713       for (TfDeviceId tf_gpu_dst : all_tf_device_ids) {
1714         PlatformDeviceId platform_gpu_dst;
1715         TF_RETURN_IF_ERROR(
1716             GpuIdManager::TfToPlatformDeviceId(tf_gpu_dst, &platform_gpu_dst));
1717         if (imap.directed_links.find({platform_device_id, platform_gpu_dst}) !=
1718             imap.directed_links.end()) {
1719           InterconnectLink* ilink = links->add_link();
1720           ilink->set_device_id(tf_gpu_dst.value());
1721           ilink->set_type(imap.name);
1722           ilink->set_strength(imap.strength);
1723         }
1724       }
1725     }
1726 
1727     // If this is one of multiple virtual GPUs on the same physical GPU,
1728     // add high-strength links to the others.
1729     for (TfDeviceId tf_gpu_dst : all_tf_device_ids) {
1730       if (tf_device_id == tf_gpu_dst) continue;
1731       PlatformDeviceId platform_gpu_dst;
1732       TF_RETURN_IF_ERROR(
1733           GpuIdManager::TfToPlatformDeviceId(tf_gpu_dst, &platform_gpu_dst));
1734       if (platform_device_id == platform_gpu_dst) {
1735         InterconnectLink* ilink = links->add_link();
1736         ilink->set_device_id(tf_gpu_dst.value());
1737         ilink->set_type("SAME_DEVICE");
1738         ilink->set_strength(InterconnectMap::kSameDeviceStrength);
1739       }
1740     }
1741 
1742     (*localities)[tf_device_id] = dev_locality;
1743     VLOG(1) << "GPUDevice PlatformDeviceId " << platform_device_id
1744             << " TfDeviceId " << tf_device_id << " on bus "
1745             << dev_locality.bus_id() << " numa: " << numa_node
1746             << " pci: " << desc->pci_bus_id()
1747             << " DeviceLocality: " << dev_locality.DebugString();
1748   }
1749   return OkStatus();
1750 }
1751 
1752 static int GetDefaultMinGPUMultiprocessorCount(
1753     se::Platform* gpu_manager,
1754     const std::vector<PlatformDeviceId>& visible_gpu_order) {
1755   static const int kDefaultMinGPUMultiprocessorCount = 8;
1756 
1757   // Find the highest multi-processor count across all visible GPUs.
1758   int max_count = -1;
1759   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1760     int visible_gpu_id = visible_gpu_order[i].value();
1761     auto description_status = gpu_manager->DescriptionForDevice(visible_gpu_id);
1762     if (!description_status.ok()) {
1763       continue;
1764     }
1765 
1766     auto description = std::move(description_status).value();
1767     max_count = std::max(max_count, description->core_count());
1768   }
1769 
1770   if (max_count < 0 || kDefaultMinGPUMultiprocessorCount < max_count) {
1771     return kDefaultMinGPUMultiprocessorCount;
1772   } else {
1773     return max_count;
1774   }
1775 }
1776 
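// Returns the minimum multiprocessor count a GPU must have to be used. The
// default can be overridden via the environment, e.g.
// TF_MIN_GPU_MULTIPROCESSOR_COUNT=4; invalid values fall back to the default.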
1777 static int GetMinGPUMultiprocessorCount(
1778     se::Platform* gpu_manager,
1779     const std::vector<PlatformDeviceId>& visible_gpu_order) {
1780   const char* tf_min_gpu_core_count = getenv("TF_MIN_GPU_MULTIPROCESSOR_COUNT");
1781 
1782   if (tf_min_gpu_core_count == nullptr ||
1783       strcmp(tf_min_gpu_core_count, "") == 0) {
1784     return GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1785   }
1786 
1787   int min_gpu_core_count = -1;
1788   if (strings::safe_strto32(tf_min_gpu_core_count, &min_gpu_core_count)) {
1789     if (min_gpu_core_count >= 0) {
1790       return min_gpu_core_count;
1791     }
1792   }
1793 
1794   int count =
1795       GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1796   LOG(ERROR) << "Invalid minimum GPU multiprocessor count: ["
1797              << tf_min_gpu_core_count << "]. "
1798              << "Using the default value: " << count;
1799   return count;
1800 }
1801 
1802 namespace {
1803 
1804 #if GOOGLE_CUDA
1805 
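// Parses a "major.minor" compute capability string, e.g. "5.2" -> {5, 2}.
// CHECK-fails on malformed input.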
1806 se::CudaComputeCapability ComputeCapabilityFromString(
1807     const std::string& version_name) {
1808   int major_part, minor_part;
1809   size_t dot_pos = version_name.find('.');
1810   CHECK(dot_pos != string::npos)
1811       << "Illegal version name: [" << version_name << "]";
1812   string major_str = version_name.substr(0, dot_pos);
1813   CHECK(strings::safe_strto32(major_str, &major_part))
1814       << "Illegal version name: [" << version_name << "]";
1815   string minor_str = version_name.substr(dot_pos + 1);
1816   CHECK(strings::safe_strto32(minor_str, &minor_part))
1817       << "Illegal version name: [" << version_name << "]";
1818   return se::CudaComputeCapability{major_part, minor_part};
1819 }
1820 
1821 std::vector<se::CudaComputeCapability> GetSupportedCudaComputeCapabilities() {
1822   std::vector<se::CudaComputeCapability> cuda_caps = {
1823       ComputeCapabilityFromString("3.5"), ComputeCapabilityFromString("5.2")};
1824 #ifdef TF_EXTRA_CUDA_CAPABILITIES
1825 // TF_EXTRA_CUDA_CAPABILITIES should be defined as a sequence separated by commas,
1826 // for example:
1827 //   TF_EXTRA_CUDA_CAPABILITIES=3.0,4.0,5.0
1828 // Use two-level macro expansion for stringification.
1829 #define TF_XSTRING(...) #__VA_ARGS__
1830 #define TF_STRING(s) TF_XSTRING(s)
1831   string extra_cuda_caps = TF_STRING(TF_EXTRA_CUDA_CAPABILITIES);
1832 #undef TF_STRING
1833 #undef TF_XSTRING
1834   auto extra_capabilities = str_util::Split(extra_cuda_caps, ',');
1835   for (const auto& capability : extra_capabilities) {
1836     cuda_caps.push_back(ComputeCapabilityFromString(capability));
1837   }
1838 #endif
1839   return cuda_caps;
1840 }
1841 #endif  // GOOGLE_CUDA
1842 
1843 }  // namespace
1844 
1845 Status BaseGPUDeviceFactory::EnablePeerAccess(
1846     const std::vector<PlatformDeviceId>& visible_gpu_order) {
1847   se::Platform* gpu_manager = GPUMachineManager();
1848   int possible_peer_count = 0;
1849   int enabled_peer_count = 0;
1850   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1851     const PlatformDeviceId platform_gpu_i = visible_gpu_order[i];
1852     for (int j = 0; j < visible_gpu_order.size(); ++j) {
1853       const PlatformDeviceId platform_gpu_j = visible_gpu_order[j];
1854       // We have already validated that ExecutorForDevice() calls return OK.
1855       se::StreamExecutor* from =
1856           DeviceIdUtil::ExecutorForPlatformDeviceId(gpu_manager, platform_gpu_i)
1857               .ValueOrDie();
1858       se::StreamExecutor* to =
1859           DeviceIdUtil::ExecutorForPlatformDeviceId(gpu_manager, platform_gpu_j)
1860               .ValueOrDie();
1861 
1862       if (from->CanEnablePeerAccessTo(to)) {
1863         ++possible_peer_count;
1864         auto status = from->EnablePeerAccessTo(to);
1865         if (!status.ok()) {
1866           LOG(WARNING)
1867               << "Unable to enable peer access between device ordinals "
1868               << platform_gpu_i << " and " << platform_gpu_j
1869               << ", status: " << status;
1870         } else {
1871           ++enabled_peer_count;
1872         }
1873       }
1874     }
1875   }
1876 
1877   // Return an error in the extreme failure case where the driver
1878   // reported that peering was possible but not a single peering was
1879   // successful.  This is to catch possible system misconfigurations
1880   // or more fundamental issues.
1881   if (possible_peer_count > 0 && enabled_peer_count == 0) {
1882     return errors::Internal(possible_peer_count,
1883                             " potential peer access pairs were reported by the "
1884                             "driver, but no peering could be enabled.");
1885   }
1886   return OkStatus();
1887 }
1888 
1889 Status BaseGPUDeviceFactory::GetValidDeviceIds(
1890     const std::vector<PlatformDeviceId>& visible_gpu_order,
1891     std::vector<PlatformDeviceId>* ids) {
1892   se::Platform* gpu_manager = GPUMachineManager();
1893   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1894     int visible_gpu_id = visible_gpu_order[i].value();
1895     auto description_status = gpu_manager->DescriptionForDevice(visible_gpu_id);
1896     if (!description_status.ok()) {
1897       return description_status.status();
1898     }
1899 
1900     auto description = std::move(description_status).value();
1901 #if GOOGLE_CUDA
1902     VLOG(1) << "Found device " << i << " with properties: "
1903             << "\npciBusID: " << description->pci_bus_id()
1904             << " name: " << description->name() << " computeCapability: "
1905             << description->cuda_compute_capability().ToString()
1906             << "\ncoreClock: " << description->clock_rate_ghz() << "GHz"
1907             << " coreCount: " << description->core_count()
1908             << " deviceMemorySize: "
1909             << strings::HumanReadableNumBytes(description->device_memory_size())
1910             << " deviceMemoryBandwidth: "
1911             << strings::HumanReadableNumBytes(description->memory_bandwidth())
1912             << "/s";
1913 #elif TENSORFLOW_USE_ROCM
1914     std::string gcn_arch_name =
1915         description->rocm_compute_capability().gcn_arch_name();
1916     VLOG(1) << "Found device " << i << " with properties: "
1917             << "\npciBusID: " << description->pci_bus_id()
1918             << " name: " << description->name()
1919             << "     ROCm AMDGPU Arch: " << gcn_arch_name
1920             << "\ncoreClock: " << description->clock_rate_ghz() << "GHz"
1921             << " coreCount: " << description->core_count()
1922             << " deviceMemorySize: "
1923             << strings::HumanReadableNumBytes(description->device_memory_size())
1924             << " deviceMemoryBandwidth: "
1925             << strings::HumanReadableNumBytes(description->memory_bandwidth())
1926             << "/s";
1927 #endif
1928   }
1929 
1930 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
1931   // Try to dlopen GPU libraries if they are supposed to be dynamically loaded.
1932   auto handle_or = se::internal::DsoLoader::MaybeTryDlopenGPULibraries();
1933   if (!handle_or.ok()) {
1934     LOG(WARNING) << "Cannot dlopen some GPU libraries. Please make sure the "
1935                     "missing libraries mentioned above are installed properly "
1936                     "if you would like to use GPU. Follow the guide at "
1937                     "https://www.tensorflow.org/install/gpu for how to "
1938                     "download and setup the required libraries for your "
1939                     "platform.\nSkipping registering "
1940                     "GPU devices...";
1941     return OkStatus();
1942   }
1943 #endif
1944 
1945 #if GOOGLE_CUDA
1946   auto cuda_supported_capabilities = GetSupportedCudaComputeCapabilities();
1947   if (cuda_supported_capabilities.empty()) {
1948     return errors::FailedPrecondition(
1949         "No supported cuda capabilities in binary.");
1950   }
1951   se::CudaComputeCapability min_supported_capability = *std::min_element(
1952       cuda_supported_capabilities.begin(), cuda_supported_capabilities.end());
1953 #endif
1954 
1955   int min_gpu_core_count =
1956       GetMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1957 
1958   // Filter out devices that don't have the right capability or power.
1959   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1960     const PlatformDeviceId visible_gpu_id = visible_gpu_order[i];
1961     auto description_status =
1962         gpu_manager->DescriptionForDevice(visible_gpu_id.value());
1963     if (!description_status.ok()) {
1964       LOG(INFO) << "Ignoring visible gpu device " << visible_gpu_id
1965                 << " whose executor is in invalid state: "
1966                 << description_status.status().ToString();
1967       continue;
1968     }
1969 
1970     auto desc = std::move(description_status).value();
1971 
1972 #if GOOGLE_CUDA
1973     // Only GPUs with at least the minimum supported compute capability are
1974     // accepted.
1975     if (desc->cuda_compute_capability() < min_supported_capability) {
1976       LOG(INFO) << "Ignoring visible gpu device "
1977                 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1978                 << ") "
1979                 << "with Cuda compute capability "
1980                 << desc->cuda_compute_capability().ToString()
1981                 << ". The minimum required Cuda capability is "
1982                 << min_supported_capability.ToString() << ".";
1983       continue;
1984     }
1985 #elif TENSORFLOW_USE_ROCM
1987     // Only GPUs with supported gfx versions are accepted.
1988     auto rocm_compute_capability = desc->rocm_compute_capability();
1989     if (!rocm_compute_capability.is_supported_gfx_version()) {
1990       LOG(INFO) << "Ignoring visible gpu device "
1991                 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1992                 << ") "
1993                 << "with AMDGPU version : "
1994                 << rocm_compute_capability.gfx_version()
1995                 << ". The supported AMDGPU versions are "
1996                 << rocm_compute_capability.supported_gfx_versions_str() << ".";
1997       continue;
1998     }
1999 #endif
2000 
2001     // Filter out slow GPUs. By default, GPUs with a lower multiprocessor
2002     // count than the fastest GPU are filtered out, unless they have 8 or more
2003     // multiprocessors. If the TF_MIN_GPU_MULTIPROCESSOR_COUNT environment
2004     // variable is set, its value will be used to filter out GPUs.
2005     if (desc->core_count() < min_gpu_core_count) {
2006       LOG(INFO) << "Ignoring visible gpu device "
2007                 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
2008                 << ") "
2009                 << "with core count: " << desc->core_count()
2010                 << ". The minimum required count is " << min_gpu_core_count
2011                 << ". You can adjust this requirement with the env var "
2012                    "TF_MIN_GPU_MULTIPROCESSOR_COUNT.";
2013       continue;
2014     }
2015 
2016 #if defined(GOOGLE_CUDA) && !defined(PLATFORM_GOOGLE)
2017     // Compare compute capability of device with list in cuda_config.h and
2018     // warn about long loading time when we need to JIT kernels from PTX.
2019     auto compute_capabilities = {TF_CUDA_COMPUTE_CAPABILITIES};
2020     auto device_capability = desc->cuda_compute_capability();
2021     if (std::count_if(std::cbegin(compute_capabilities),
2022                       std::cend(compute_capabilities), [&](int cc) {
2023                         // CUBINs are backwards compatible within major version.
2024                         return cc / 10 == device_capability.major &&
2025                                cc % 10 <= device_capability.minor;
2026                       }) == 0) {
2027       LOG(WARNING)
2028           << "TensorFlow was not built with CUDA kernel binaries "
2029              "compatible with compute capability "
2030           << device_capability.ToString() << ". CUDA kernels will be "
2031           << "jit-compiled from PTX, which could take 30 minutes or longer.";
2032     }
2033 #endif  // defined(GOOGLE_CUDA) && !defined(PLATFORM_GOOGLE)
2034 
2035     ids->push_back(visible_gpu_id);
2036   }
2037   if (!ids->empty()) {
2038     std::vector<int> raw_ids(ids->size());
2039     std::transform(ids->begin(), ids->end(), raw_ids.begin(),
2040                    [](PlatformDeviceId id) -> int { return id.value(); });
2041     VLOG(1) << "Adding visible gpu devices: " << absl::StrJoin(raw_ids, ", ");
2042   }
2043 
2044   return OkStatus();
2045 }
2046 
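// When the timestamped allocator is in use, report the kernel tracker's last
// terminated count as the safe allocation frontier; otherwise return 0.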
2047 uint64 BaseGPUDevice::SafeAllocFrontier(uint64 old_value) {
2048   if (timestamped_allocator_) {
2049     return kernel_tracker_->LastTerminatedCount(old_value);
2050   } else {
2051     return 0;
2052   }
2053 }
2054 
2055 int BaseGPUDevice::PendingKernels() {
2056   if (kernel_tracker_) {
2057     return kernel_tracker_->NumPending();
2058   }
2059   return 0;
2060 }
2061 
2062 void BaseGPUDevice::TestOnlyReset() {
2063   StreamGroupFactory::Global().TestOnlyReset();
2064 }
2065 
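// Called for each kernel launched from 'ctx'. Accumulates op and memory counts
// and, once either threshold in params_ is crossed (always, when both
// thresholds are zero), records a tracking event whose weight covers the
// kernels seen since the previous event. Returns the queued count of the
// recorded event, or 0 when nothing was recorded.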
2066 uint64 GPUKernelTracker::MaybeQueue(OpKernelContext* ctx) {
2067   mutex_lock l(mu_);
2068   ++ops_since_last_;
2069   int64_t mem_used =
2070       ctx->persistent_memory_allocated() + ctx->temp_memory_allocated();
2071   VLOG(2) << "kernel: " << ctx->op_kernel().name() << " mem_used: " << mem_used;
2072   mem_since_last_ += mem_used;
2073   int weight = 1;
2074   // Note that if all {max_bytes, max_interval, max_pending} are zero then
2075   // we track every single kernel with no pending cap.  This can happen if
2076   // timestamped_allocator alone was specified.
2077   if ((mem_since_last_ < params_.max_bytes) &&
2078       (ops_since_last_ < params_.max_interval)) {
2079     return 0;
2080   } else {
2081     weight = std::min(
2082         params_.max_pending,
2083         std::max(1, mem_since_last_ / std::max(16386, params_.max_bytes)));
2084     mem_since_last_ = 0;
2085     ops_since_last_ = 0;
2086   }
2087   uint64 queued_count = timing_counter_->next();
2088   RecordQueued(queued_count, weight);
2089   return queued_count;
2090 }
2091 
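// Appends an entry to the pending-kernel ring buffer, wrapping around or
// doubling the buffer when it would otherwise overflow. Both callers in this
// file hold mu_.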
2092 void GPUKernelTracker::RecordQueued(uint64 queued_count, int weight) {
2093   VLOG(2) << "RecordQueued queued_count=" << queued_count
2094           << " first_available_=" << first_available_
2095           << " last_completed_=" << last_completed_
2096           << " num_pending_=" << num_pending_;
2097   pending_kernels_[first_available_].queued_count = queued_count;
2098   pending_kernels_[first_available_].weight = weight;
2099   pending_kernels_[first_available_].terminated = false;
2100   ++first_available_;
2101   num_pending_ += weight;
2102   if (first_available_ >= pending_kernels_.size()) {
2103     if (last_completed_ >= 0) {
2104       // wrap
2105       first_available_ = 0;
2106     } else {
2107       // enlarge the ring buffer
2108       pending_kernels_.resize(2 * pending_kernels_.size());
2109     }
2110   }
2111   if (first_available_ == last_completed_) {
2112     // Ring buffer is full: double it.  All of the valid PendingKernel
2113     // entries still exist after the copy; they are just shifted to begin
2114     // at index 0 in the new array.
2115     std::vector<PendingKernel> new_buffer(pending_kernels_.size() * 2);
2116     for (int i = 0; i < pending_kernels_.size(); ++i) {
2117       int j = (i + last_completed_) % pending_kernels_.size();
2118       new_buffer[i] = pending_kernels_[j];
2119     }
2120     last_completed_ = 0;
2121     first_available_ = pending_kernels_.size();
2122     pending_kernels_.swap(new_buffer);
2123     VLOG(1) << "last_completed_=" << last_completed_
2124             << " first_available_=" << first_available_
2125             << " num_pending_=" << num_pending_;
2126   }
2127   DCHECK_NE(first_available_, last_completed_) << "exhausted pending_kernels";
2128 }
2129 
2130 // Called by LastTerminatedCount() when new_value is equal to old_value.  This
2131 // case can occur when an allocation failed and waited for memory to be freed,
2132 // then when it retried the safe allocation frontier had not advanced because no
2133 // tracking event had matured.  Maybe GPU progress has stalled waiting on an i/o
2134 // event, or maybe we're tracking at too infrequent an interval.  In any case if
2135 // the GPU compute queue is actually empty it's safe to advance the safe
2136 // frontier so that this request can allocate from unrestricted (and better
2137 // compacted) memory.  So queue an event on the compute stream to ensure the
2138 // frontier does advance.
2139 void GPUKernelTracker::MaybeQueueProgressEvent() {
2140   mutex_lock l(mu_);
2141   if (num_pending_ == 0) {
2142     uint64 new_count = timing_counter_->next();
2143     RecordQueued(new_count, 1);
2144     em_->ThenExecute(stream_,
2145                      [this, new_count]() { RecordTerminated(new_count); });
2146   }
2147 }
2148 
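// Marks the pending entry recorded with 'queued_count' as terminated, advances
// last_completed_ over every contiguously terminated entry, publishes the new
// safe frontier to the allocator, and notifies waiters that num_pending_
// dropped.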
2149 void GPUKernelTracker::RecordTerminated(uint64 queued_count) {
2150   mutex_lock l(mu_);
2151   VLOG(2) << this << " RecordTerminated queued_count=" << queued_count
2152           << " first_available_=" << first_available_
2153           << " last_completed_=" << last_completed_
2154           << " num_pending_=" << num_pending_ << " LC="
2155           << ((last_completed_ >= 0)
2156                   ? pending_kernels_[last_completed_].queued_count
2157                   : -1);
2158   DCHECK_NE(first_available_, last_completed_);
2159   DCHECK_GT(num_pending_, 0);
2160   // Starting just past the last completed entry, find the entry with
2161   // this queued_count and mark it done.
2162   int index = (last_completed_ + 1) % pending_kernels_.size();
2163   int weight = 1;
2164   while (true) {
2165     if (index == first_available_) {
2166       // This should never happen.
2167       LOG(FATAL) << "Failed to find " << queued_count  // Crash OK
2168                  << " in queue, last_completed_=" << last_completed_
2169                  << " index=" << index
2170                  << " first_available_=" << first_available_
2171                  << " pending_kernels_.size()=" << pending_kernels_.size();
2172     }
2173     if (pending_kernels_[index].queued_count == queued_count) {
2174       pending_kernels_[index].terminated = true;
2175       weight = pending_kernels_[index].weight;
2176       break;
2177     }
2178     index = (index + 1) % pending_kernels_.size();
2179   }
2180   // Next move last_completed_ forward past all completed kernels.  In theory
2181   // kernels should always complete in queued order so we should be able to
2182   // advance the completed frontier to the just-completed PendingKernel.  In
2183   // practice we occasionally see the termination callbacks arrive out of
2184   // order probably because of thread scheduling.  Eventually we may support
2185   // out-of-order completion involving multiple compute streams, so here we
2186   // follow a conservative approach and wait for every single callback to
2187   // arrive before advancing the frontier.
2188   while (true) {
2189     int next_index = (last_completed_ + 1) % pending_kernels_.size();
2190     if (next_index == first_available_) break;
2191     if (pending_kernels_[next_index].terminated) {
2192       last_completed_ = next_index;
2193     } else {
2194       break;
2195     }
2196   }
2197   if (last_completed_ >= 0) {
2198     int64_t v = pending_kernels_[last_completed_].queued_count;
2199     last_terminated_count_ = v;
2200     if (allocator_) {
2201       allocator_->SetSafeFrontier(v);
2202     }
2203   }
2204   // Finally, decrease num_pending_ before possibly waking a waiter.
2205   num_pending_ -= weight;
2206   pending_decreased_.notify_all();
2207 }
2208 
2209 }  // namespace tensorflow
2210 
2211 #endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
2212