1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 // TODO(opensource): Use a more generic sounding preprocessor name than
17 // GOOGLE_CUDA
18 #if (defined(GOOGLE_CUDA) && GOOGLE_CUDA) || \
19     (defined(TENSORFLOW_USE_ROCM) && TENSORFLOW_USE_ROCM)
20 
21 #if TENSORFLOW_USE_ROCM
22 #include "rocm/include/hip/hip_runtime.h"
23 #endif
24 
25 #define EIGEN_USE_GPU
26 
27 #include <stdlib.h>
28 #include <string.h>
29 
30 #include <algorithm>
31 #include <list>
32 #include <map>
33 #include <tuple>
34 #include <vector>
35 
36 #include "absl/container/flat_hash_set.h"
37 #include "absl/strings/str_split.h"
38 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
39 #include "tensorflow/core/common_runtime/device/device_event_mgr.h"
40 #include "tensorflow/core/common_runtime/device/device_id_utils.h"
41 #include "tensorflow/core/common_runtime/device_factory.h"
42 #include "tensorflow/core/common_runtime/gpu/gpu_device.h"
43 #include "tensorflow/core/common_runtime/gpu/gpu_id.h"
44 #include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
45 #include "tensorflow/core/common_runtime/gpu/gpu_init.h"
46 #include "tensorflow/core/common_runtime/gpu/gpu_process_state.h"
47 #include "tensorflow/core/common_runtime/gpu/gpu_util.h"
48 #include "tensorflow/core/common_runtime/gpu_device_context.h"
49 #include "tensorflow/core/common_runtime/local_device.h"
50 #include "tensorflow/core/framework/allocator.h"
51 #include "tensorflow/core/framework/device_base.h"
52 #include "tensorflow/core/framework/op_kernel.h"
53 #include "tensorflow/core/framework/tensor.h"
54 #include "tensorflow/core/framework/tensor.pb.h"
55 #include "tensorflow/core/framework/types.h"
56 #include "tensorflow/core/framework/variant_op_registry.h"
57 #include "tensorflow/core/graph/types.h"
58 #include "tensorflow/core/lib/core/errors.h"
59 #include "tensorflow/core/lib/core/status.h"
60 #include "tensorflow/core/lib/strings/numbers.h"
61 #include "tensorflow/core/lib/strings/str_util.h"
62 #include "tensorflow/core/lib/strings/strcat.h"
63 #if GOOGLE_CUDA
64 #include "third_party/gpus/cudnn/cudnn.h"
65 #include "tensorflow/stream_executor/cuda/cuda_activation.h"
66 #elif TENSORFLOW_USE_ROCM
67 #include "tensorflow/core/platform/rocm.h"
68 #endif
69 #include "tensorflow/core/platform/logging.h"
70 #include "tensorflow/core/platform/macros.h"
71 #include "tensorflow/core/platform/stream_executor.h"
72 #include "tensorflow/core/platform/types.h"
73 #include "tensorflow/core/profiler/lib/scoped_annotation.h"
74 #include "tensorflow/core/public/session_options.h"
75 #include "tensorflow/core/util/device_name_utils.h"
76 #include "tensorflow/core/util/env_var.h"
77 #include "tensorflow/core/util/stream_executor_util.h"
78 #include "tensorflow/stream_executor/gpu/gpu_stream.h"
79 #include "tensorflow/stream_executor/platform/dso_loader.h"
80 
81 #if !defined(PLATFORM_GOOGLE)
82 #if GOOGLE_CUDA
83 #include "third_party/gpus/cuda/cuda_config.h"
84 #endif
85 #endif
86 
87 namespace tensorflow {
88 
89 #if GOOGLE_CUDA
90 
91 typedef cudaStream_t gpuStream_t;
92 typedef cudaDeviceProp gpuDeviceProp_t;
93 #define EIGEN_GPU_SCRATCH_SIZE (Eigen::kGpuScratchSize)
94 using se::cuda::ScopedActivateExecutorContext;
95 
96 #elif TENSORFLOW_USE_ROCM
97 
98 typedef hipStream_t gpuStream_t;
99 typedef hipDeviceProp_t gpuDeviceProp_t;
100 #define EIGEN_GPU_SCRATCH_SIZE (Eigen::kGpuScratchSize)
101 using se::rocm::ScopedActivateExecutorContext;
102 
103 #endif
104 
105 // Eigen Ops directly allocate memory only for temporary buffers used
106 // during OpKernel::Compute().  The recommended way of allocating such
107 // memory is via OpKernelContext::allocate_temp().  However, Eigen Ops
108 // don't have access to OpKernelContext; instead, they access memory
109 // directly through the device allocator.  Being an external project,
110 // Eigen assumes allocator semantics similar to those of the CUDA or
111 // ROCm memory allocator, and may not work correctly due to race
112 // conditions if used with some other allocator.  For safety, we need
113 // to delay deallocation calls out of Eigen until all events on the
114 // corresponding stream have completed.  The following two classes
115 // serve this purpose in two different compilation environments.
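    // EigenGpuStreamDevice below implements this policy: deallocate() does not
    // free the buffer immediately, but enqueues the free as a stream callback
    // (see asyncFree()), so the memory is released only after all previously
    // enqueued work on the stream has completed.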
116 
117 class EigenGpuStreamDevice : public ::Eigen::StreamInterface {
118  public:
119   EigenGpuStreamDevice()
120       : scratch_(nullptr), semaphore_(nullptr), context_(nullptr) {}
121   ~EigenGpuStreamDevice() override {}
122   void Reinitialize(OpKernelContext* context, const gpuStream_t* gpu_stream,
123                     TfDeviceId tf_device_id, ::tensorflow::Allocator* alloc,
124                     char* scratch) {
125     if (LogMemory::IsEnabled()) {
126       operation_ = context->op_kernel().name() + "/EigenAllocator";
127       step_id_ = context->step_id();
128     }
129     context_ = context;
130     scratch_ = scratch;
131     semaphore_ =
132         reinterpret_cast<unsigned int*>(scratch + Eigen::kGpuScratchSize);
133     stream_ = gpu_stream;
134     allocator_ = alloc;
135     PlatformDeviceId platform_device_id;
136     TF_CHECK_OK(
137         GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
138     device_prop_ = &Eigen::GetGpuDeviceProperties(platform_device_id.value());
139   }
140 
141   const gpuStream_t& stream() const override { return *stream_; }
142   const gpuDeviceProp_t& deviceProperties() const override {
143     return *device_prop_;
144   }
145 
146   void* allocate(size_t num_bytes) const override {
147     void* ret = allocator_->AllocateRaw(32 /* alignment */, num_bytes);
148     if (ret == nullptr) {
149       if (context_) {
150         context_->SetStatus(errors::ResourceExhausted(
151             strings::StrCat("Ran out of GPU memory when allocating ", num_bytes,
152                             " bytes for ", operation_)));
153       } else {
154         LOG(FATAL)
155             << "EigenAllocator for GPU ran out of memory when allocating "
156             << num_bytes << " bytes. See error logs for more detailed info.";
157       }
158     }
159     if (LogMemory::IsEnabled() && ret != nullptr) {
160       LogMemory::RecordRawAllocation(operation_, step_id_, num_bytes, ret,
161                                      allocator_);
162     }
163     return ret;
164   }
165   void deallocate(void* buffer) const override {
166     if (LogMemory::IsEnabled() && buffer != nullptr) {
167       LogMemory::RecordRawDeallocation(operation_, step_id_, buffer, allocator_,
168                                        true);
169     }
170     AsyncFreeData* afData =
171         new AsyncFreeData(allocator_, buffer, operation_, step_id_);
172 #if GOOGLE_CUDA
173     cudaError_t err = cudaStreamAddCallback(*stream_, asyncFree, afData, 0);
174     CHECK_EQ(err, cudaSuccess);
175 #elif TENSORFLOW_USE_ROCM
176     hipError_t err = hipStreamAddCallback(*stream_, asyncFree, afData, 0);
177     CHECK_EQ(err, hipSuccess);
178 #endif
179   }
180 
181   // Return a pointer to a per-stream scratchpad of 1024 bytes residing
182   // in global memory.
183   void* scratchpad() const override { return scratch_; }
184 
185   // Return a semaphore. The semaphore is initialized to 0, and
186   // each kernel using it is responsible for resetting to 0 upon completion
187   // to maintain the invariant that the semaphore is always equal to 0 upon
188   // each kernel start.
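      // The semaphore word lives immediately after the 1024-byte scratch area;
      // see Reinitialize() above and BaseGPUDevice::InitScratchBuffers(), which
      // allocates Eigen::kGpuScratchSize + sizeof(unsigned int) bytes and
      // zeroes them.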
189   unsigned int* semaphore() const override { return semaphore_; }
190 
191  private:
192   struct AsyncFreeData {
193     AsyncFreeData(::tensorflow::Allocator* a, void* p, const string& o,
194                   const int64 s)
195         : allocator_(a), address_(p), operation_(o), step_id_(s) {}
196     ::tensorflow::Allocator* allocator_;
197     void* address_;
198     const string operation_;
199     const int64 step_id_;
200   };
201 
202 #if GOOGLE_CUDA
203   static void CUDART_CB asyncFree(gpuStream_t stream, cudaError_t status,
204                                   void* userData)
205 #elif TENSORFLOW_USE_ROCM
206   static void asyncFree(gpuStream_t stream, hipError_t status, void* userData)
207 #endif
208   {
209     AsyncFreeData* data = static_cast<AsyncFreeData*>(userData);
210     if (LogMemory::IsEnabled()) {
211       LogMemory::RecordRawDeallocation(data->operation_, data->step_id_,
212                                        data->address_, data->allocator_, false);
213     }
214     data->allocator_->DeallocateRaw(data->address_);
215     delete data;
216   }
217 
218   string operation_;
219   int64 step_id_;
220   const gpuStream_t* stream_;           // Not owned.
221   const gpuDeviceProp_t* device_prop_;  // Not owned.
222   ::tensorflow::Allocator* allocator_;  // Not owned.
223   mutable char* scratch_;
224   mutable unsigned int* semaphore_;
225   OpKernelContext* context_;
226 
227   TF_DISALLOW_COPY_AND_ASSIGN(EigenGpuStreamDevice);
228 };
229 
230 // This factory helps to ensure that different GPU device objects that refer to
231 // the same physical device and stream group id use the same stream group
232 // object (and therefore the same CUDA streams). This is necessary since there
233 // is a single memory allocator per device (see ProcessState::GetGPUAllocator)
234 // and allocators must not be shared across streams.
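    // Stream groups are keyed by (tf_device_id, stream_group_within_gpu), so
    // repeated GetOrCreate() calls with the same key return the same streams.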
235 class BaseGPUDevice::StreamGroupFactory {
236  public:
237   // Returns the unique stream group for use with the stream defined by
238   // {tf_device_id, stream_group_within_gpu}, creating it if it does not yet
239   // exist.
240   // This function is thread safe.
241   BaseGPUDevice::StreamGroup* GetOrCreate(TfDeviceId tf_device_id,
242                                           int stream_group_within_gpu,
243                                           se::StreamExecutor* executor,
244                                           const GPUOptions& options) {
245     mutex_lock guard(lock_);
246     StreamGroup* group =
247         &streams_[key_type(tf_device_id.value(), stream_group_within_gpu)];
248     if (!group->compute) {
249       int priority = GetPriority(tf_device_id.value(), options);
250       group->priority = priority;
251       group->compute = GetStream(executor, priority);
252       group->compute->Init();
253       VLOG(2) << "Created stream[" << stream_group_within_gpu
254               << "] = " << group->compute << " with priority: " << priority;
255 
256 #if TENSORFLOW_USE_ROCM
257       // ROCm streams are lightweight and will not necessarily trigger device
258       // queue init until they are first used. For optimal performance,
259       // compute and nccl streams must be immediate siblings.
260       group->nccl = GetStream(executor, priority);
261       group->nccl->Init();
262       VLOG(2) << "Created nccl_stream[" << stream_group_within_gpu
263               << "] = " << group->nccl;
264 
265       // Force underlying resource creation now.
266       group->compute->ThenWaitFor(group->nccl);
267       group->nccl->ThenWaitFor(group->compute);
268 #endif
269 
270       group->host_to_device = GetStream(executor, priority);
271       group->host_to_device->Init();
272       VLOG(2) << "Created host_to_device_stream[" << stream_group_within_gpu
273               << "] = " << group->host_to_device;
274 
275       group->device_to_host = GetStream(executor, priority);
276       group->device_to_host->Init();
277       VLOG(2) << "Created device_to_host_stream[" << stream_group_within_gpu
278               << "] = " << group->device_to_host;
279 
280       int num_d2d_streams =
281           options.experimental().num_dev_to_dev_copy_streams();
282       if (num_d2d_streams == 0) num_d2d_streams = 1;
283       if (num_d2d_streams < 1 || num_d2d_streams > 4) {
284         LOG(ERROR)
285             << "Illegal GPUOptions.experimental.num_dev_to_dev_copy_streams="
286             << num_d2d_streams << "; setting it to 1 instead.";
287         num_d2d_streams = 1;
288       }
289       for (int i = 0; i < num_d2d_streams; ++i) {
290         se::Stream* stream = GetStream(executor, priority);
291         stream->Init();
292         group->device_to_device.push_back(stream);
293         VLOG(2) << "Created device_to_device_stream[" << stream_group_within_gpu
294                 << "] = " << group->device_to_device.back();
295       }
296     }
297     return group;
298   }
299 
300   // Returns a reference to the StreamGroupFactory singleton. Note that this is
301   // never destroyed, so the objects it owns are never deleted.
302   static StreamGroupFactory& Global() {
303     static StreamGroupFactory* instance = new StreamGroupFactory();
304     return *instance;
305   }
306 
307   // Helper method for unit tests to reset the streams. Never use in production.
308   void TestOnlyReset() {
309     mutex_lock guard(lock_);
310     for (auto& item : streams_) {
311       auto& stream = item.second;
312       if (stream.compute) {
313         delete stream.compute;
314         stream.compute = nullptr;
315       }
316 #if TENSORFLOW_USE_ROCM
317       if (stream.nccl) {
318         delete stream.nccl;
319         stream.nccl = nullptr;
320       }
321 #endif
322       if (stream.host_to_device) {
323         delete stream.host_to_device;
324         stream.host_to_device = nullptr;
325       }
326       if (stream.device_to_host) {
327         delete stream.device_to_host;
328         stream.device_to_host = nullptr;
329       }
330       while (!stream.device_to_device.empty()) {
331         auto back = stream.device_to_device.back();
332         if (back) {
333           delete back;
334         }
335         stream.device_to_device.pop_back();
336       }
337     }
338     streams_.clear();
339   }
340 
341  private:
342   // Returns priority for the given virtual GPU id from the session options.
343   // Returns 0 if no virtual devices are specified.
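      // For example (illustrative values): with two virtual_devices entries
      // whose priority lists have sizes 2 and 3, tf_device_id 3 falls in the
      // second entry and gets that entry's priority at index 1.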
344   int GetPriority(int tf_device_id, const GPUOptions& options) {
345     int id = tf_device_id;
346     int i = 0;
347     int priority = 0;
348     while (i < options.experimental().virtual_devices_size()) {
349       const int size =
350           options.experimental().virtual_devices().Get(i).priority_size();
351       if (id >= size) {
352         id -= size;
353       } else {
354         priority =
355             options.experimental().virtual_devices().Get(i).priority().Get(id);
356         break;
357       }
358       i++;
359     }
360     return priority;
361   }
362 
363   // Returns a Stream with the underlying GPUStream with the given priority.
364   se::Stream* GetStream(se::StreamExecutor* executor, int priority) {
365     auto stream = new se::Stream(executor);
366     static_cast<stream_executor::gpu::GpuStream*>(stream->implementation())
367         ->SetPriority(priority);
368     return stream;
369   }
370 
371   mutex lock_;
372   using key_type = std::tuple<int, int>;
373   std::map<key_type, StreamGroup> streams_;
374 
375   // StreamGroupFactory cannot be created directly; Call
376   // StreamGroupFactory::Global() to get the global instance.
377   StreamGroupFactory() = default;
378   TF_DISALLOW_COPY_AND_ASSIGN(StreamGroupFactory);
379 };
380 
381 BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
382                              Bytes memory_limit, const DeviceLocality& locality,
383                              TfDeviceId tf_device_id,
384                              const string& physical_device_desc,
385                              Allocator* gpu_allocator, Allocator* cpu_allocator,
386                              bool sync_every_op)
387     : LocalDevice(options, Device::BuildDeviceAttributes(name, DEVICE_GPU,
388                                                          memory_limit, locality,
389                                                          physical_device_desc)),
390       gpu_allocator_(gpu_allocator),
391       cpu_allocator_(cpu_allocator),
392       scoped_allocator_mgr_(new ScopedAllocatorMgr(name)),
393       tf_device_id_(tf_device_id),
394       sync_every_op_(sync_every_op) {
395   GPUProcessState::singleton()->EnableGPUDevice();
396 }
397 
398 BaseGPUDevice::~BaseGPUDevice() {
399   delete gpu_device_info_;
400   gpu_allocator_->DeallocateRaw(scratch_);
401   device_context_->Unref();
402 }
403 
404 // This is a no-op if the scratch buffers are already initialized.
405 Status BaseGPUDevice::InitScratchBuffers() {
406   mutex_lock l(scratch_init_mutex_);
407   if (!scratch_) {
408     DCHECK(stream_);
409     size_t scratch_buffer_size = Eigen::kGpuScratchSize + sizeof(unsigned int);
410     ScopedMemoryDebugAnnotation op_annotation("ScratchBuffer");
411     void* scratch_buffer = gpu_allocator_->AllocateRaw(
412         Allocator::kAllocatorAlignment, scratch_buffer_size);
413     if (scratch_buffer == nullptr) {
414       return errors::FailedPrecondition(
415           "Failed to allocate scratch buffer for device ",
416           tf_device_id_.value());
417     }
418     se::DeviceMemory<char> mem(
419         se::DeviceMemoryBase(scratch_buffer, scratch_buffer_size));
420     TF_RETURN_IF_ERROR(executor_->SynchronousMemZero(
421         &mem, Eigen::kGpuScratchSize + sizeof(unsigned int)));
422     scratch_ = static_cast<char*>(scratch_buffer);
423   }
424   return Status::OK();
425 }
426 
427 Status BaseGPUDevice::Init(const SessionOptions& options) {
428   auto executor_status = DeviceIdUtil::ExecutorForTfDeviceId(
429       DEVICE_GPU, GPUMachineManager(), tf_device_id_);
430   if (!executor_status.status().ok()) {
431     return errors::Internal("Failed to get StreamExecutor for device ",
432                             tf_device_id_.value());
433   }
434 
435   executor_ = executor_status.ValueOrDie();
436 
437   stream_ = StreamGroupFactory::Global().GetOrCreate(
438       tf_device_id_, 0, executor_, options.config.gpu_options());
439   device_context_ =
440       new GPUDeviceContext(0, stream_->compute,
441 #if TENSORFLOW_USE_ROCM
442                            stream_->nccl,
443 #endif
444                            stream_->host_to_device, stream_->device_to_host,
445                            stream_->device_to_device);
446 
447   em_ = EventMgrFactory::Singleton()->GetEventMgr(executor_,
448                                                   options.config.gpu_options());
449 
450   GPUKernelTracker::Params tracker_params(
451       options.config.gpu_options().experimental().kernel_tracker_max_interval(),
452       options.config.gpu_options().experimental().kernel_tracker_max_bytes(),
453       options.config.gpu_options().experimental().kernel_tracker_max_pending());
454   timestamped_allocator_ =
455       options.config.gpu_options().experimental().timestamped_allocator();
456   pending_cap_ = tracker_params.max_pending;
457   if (timestamped_allocator_ ||
458       (tracker_params.max_interval > 0 || tracker_params.max_bytes > 0 ||
459        tracker_params.max_pending > 0)) {
460     SharedCounter* timing_counter = nullptr;
461     if (timestamped_allocator_) {
462       // In this case the SharedCounter was already created and set in the
463       // associated Allocator, with ownership by GPUProcessState.
464       // The GPUKernelTracker will use this SharedCounter, instead of
465       // owning its own.
466       timing_counter =
467           GPUProcessState::singleton()->GPUAllocatorCounter(tf_device_id_);
468       DCHECK(timing_counter);
469     }
470     kernel_tracker_.reset(new GPUKernelTracker(
471         tracker_params, Env::Default(), stream_->compute, timing_counter,
472         timestamped_allocator_ ? gpu_allocator_ : nullptr, em_));
473   }
474 
475   gpu_device_info_ = new GpuDeviceInfo;
476   gpu_device_info_->stream = stream_->compute;
477   gpu_device_info_->default_context = device_context_;
478   gpu_device_info_->event_mgr = em_;
479   PlatformDeviceId platform_device_id;
480   TF_RETURN_IF_ERROR(
481       GpuIdManager::TfToPlatformDeviceId(tf_device_id_, &platform_device_id));
482   gpu_device_info_->gpu_id = platform_device_id.value();
483   set_tensorflow_gpu_device_info(gpu_device_info_);
484 
485   // Whether and how the GPU device uses its own threadpool.
486   // This option is experimental. Once we confirm the best setting, we
487   // may change the default behavior and completely remove this flag.
488   // Default values might change in future releases.
489   // Possible values:
490   //   * global: GPU uses threads shared with CPU in the main compute
491   //          thread-pool. This is currently the default.
492   //   * gpu_private: GPU uses threads dedicated to this device.
493   //   * gpu_shared: All GPUs share a dedicated thread pool.
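      // Illustrative environment settings (variable names taken from the code
      // below):
      //   TF_GPU_THREAD_MODE=gpu_private TF_GPU_THREAD_COUNT=2
      // gives this device its own two-thread pool.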
494   string gpu_thread_mode;
495   TF_RETURN_IF_ERROR(
496       ReadStringFromEnvVar("TF_GPU_THREAD_MODE", "global", &gpu_thread_mode));
497   gpu_thread_mode = absl::AsciiStrToLower(gpu_thread_mode);
498   if (gpu_thread_mode != "global") {
499     int64_t gpu_thread_count = -1;
500     // Default to two threads: one for device compute and another for memory
501     // copies.
502     TF_RETURN_IF_ERROR(
503         ReadInt64FromEnvVar("TF_GPU_THREAD_COUNT", 2, &gpu_thread_count));
504     if (gpu_thread_mode == "gpu_private") {
505       // TODO(zhengxq): since these threads only serve a single GPU device,
506       //   we should set the device context once for each thread, and avoid
507       //   setting them for each kernel.
508       // TODO(zhengxq): pin the thread to the same socket of the target GPU.
509       thread_pool_.reset(new thread::ThreadPool(
510           options.env, ThreadOptions(),
511           strings::StrCat("gpu_private_", tf_device_id_.value()),
512           static_cast<int32>(gpu_thread_count),
513           !options.config.experimental().disable_thread_spinning(),
514           /*allocator=*/nullptr));
515       set_tensorflow_device_thread_pool(thread_pool_.get());
516     } else if (gpu_thread_mode == "gpu_shared") {
517       static thread::ThreadPool* thread_pool = new thread::ThreadPool(
518           options.env, ThreadOptions(), "gpu_shared",
519           static_cast<int32>(gpu_thread_count),
520           !options.config.experimental().disable_thread_spinning(),
521           /*allocator=*/nullptr);
522       set_tensorflow_device_thread_pool(thread_pool);
523     } else {
524       string error_message =
525           strings::StrCat("Invalid gpu_thread_mode: ", gpu_thread_mode);
526       LOG(WARNING) << error_message;
527       return errors::InvalidArgument(error_message);
528     }
529   }
530 
531   return Status::OK();
532 }
533 
534 string BaseGPUDevice::ComputeOpKernelDebugString(const OpKernel& op_kernel,
535                                                  const int& stream_id) {
536   return strings::StrCat(op_kernel.name(), " op ", op_kernel.type_string(),
537                          " on GPU ", tf_device_id_.value(), " stream[",
538                          stream_id, "]");
539 }
540 
541 namespace {
542 const absl::flat_hash_set<std::string>* GetOpsToLogFromEnv() {
543   auto* result = new absl::flat_hash_set<std::string>;
544   const char* env = getenv("TF_GPU_DEBUG_OPS_TO_LOG");
545   if (!env) {
546     return result;
547   }
548 
549   std::vector<absl::string_view> ops = absl::StrSplit(env, ',');
550   LOG(INFO) << "Will log inputs & outputs from the following ops: ";
551   for (absl::string_view op : ops) {
552     result->insert(std::string(op));
553     LOG(INFO) << "  |" << op << "|";
554   }
555 
556   return result;
557 }
558 
559 bool ShouldLogInputsAndOutputs(OpKernel* op_kernel) {
560   static const absl::flat_hash_set<std::string>& ops_to_log =
561       *GetOpsToLogFromEnv();
562   return ops_to_log.count(op_kernel->type_string());
563 }
564 }  // namespace
565 
566 Tensor BaseGPUDevice::CopyGpuTensorToHostDebugOnly(const Tensor& gpu_tensor) {
567   Tensor host_tensor(gpu_tensor.dtype(), gpu_tensor.shape());
568   CHECK(device_context_->stream()
569             ->ThenMemcpy(host_tensor.data(),
570                          se::DeviceMemoryBase(gpu_tensor.data(),
571                                               gpu_tensor.TotalBytes()),
572                          gpu_tensor.TotalBytes())
573             .BlockHostUntilDone()
574             .ok());
575   return host_tensor;
576 }
577 
578 void BaseGPUDevice::LogInputs(OpKernel* op_kernel, OpKernelContext* context) {
579   LOG(INFO) << "Inputs for " << op_kernel->name() << " (total "
580             << context->num_inputs() << "):";
581   for (int i = 0; i < context->num_inputs(); i++) {
582     if (!context->has_input(i)) {
583       LOG(INFO) << "input # " << i << " is absent";
584       continue;
585     }
586 
587     Tensor input = context->input_memory_type(i) == DEVICE_MEMORY
588                        ? CopyGpuTensorToHostDebugOnly(context->input(i))
589                        : context->input(i);
590 
591     LOG(INFO) << "input # " << i;
592     LOG(INFO) << input.DebugString(-1);
593   }
594   LOG(INFO) << "";
595 }
596 
597 void BaseGPUDevice::LogOutputs(OpKernel* op_kernel, OpKernelContext* context) {
598   if (!context->status().ok()) {
599     LOG(INFO) << op_kernel->name()
600               << " failed: " << context->status().error_message();
601     return;
602   }
603 
604   LOG(INFO) << "Outputs for " << op_kernel->name() << " (total "
605             << context->num_outputs() << "):";
606   for (int i = 0; i < context->num_outputs(); i++) {
607     Tensor* output_ptr = context->mutable_output(i);
608     if (output_ptr == nullptr) {
609       LOG(INFO) << "output # " << i << " is null";
610       continue;
611     }
612 
613     Tensor output = context->output_memory_type(i) == DEVICE_MEMORY
614                         ? CopyGpuTensorToHostDebugOnly(*output_ptr)
615                         : *output_ptr;
616 
617     LOG(INFO) << "output # " << i;
618     LOG(INFO) << output.DebugString(-1);
619   }
620   LOG(INFO) << "";
621 }
622 
623 void BaseGPUDevice::Compute(OpKernel* op_kernel, OpKernelContext* context) {
624   // NOTE(tucker): We need to discriminate between Eigen GPU
625   // operations and all others.  If an operation is Eigen
626   // implemented (or otherwise tries to launch a GPU kernel
627   // directly), we need to establish a stacked-scoped environment
628   // that directs it to execute on the proper device.  Otherwise we
629   // expect the Op to use StreamExecutor directly and correctly.
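      // The ScopedActivateExecutorContext constructed below makes this device's
      // GPU context current for the duration of OpKernel::Compute(), so direct
      // kernel launches land on the right device.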
630   GPUDeviceContext* gpu_device_context = device_context_;
631   if (context->op_device_context() != nullptr) {
632     gpu_device_context =
633         static_cast<GPUDeviceContext*>(context->op_device_context());
634   }
635   se::Stream* stream = gpu_device_context->stream();
636   const auto stream_id = gpu_device_context->stream_id();
637 
638   const bool vlog_1 = VLOG_IS_ON(1);
639 
640   if (vlog_1) {
641     VLOG(1) << "GpuDevice::ComputeHelper "
642             << ComputeOpKernelDebugString(*op_kernel, stream_id);
643   }
644 
645   if (kernel_tracker_.get()) {
646     context->set_record_memory_consumption(true);
647     if (pending_cap_ > 0) {
648       kernel_tracker_->PauseWhilePendingExceeds(pending_cap_);
649     }
650   }
651   ScopedActivateExecutorContext scoped_activation{stream->parent()};
652   ScopedMemoryDebugAnnotation op_annotation(op_kernel->name_view().data(),
653                                             context->step_id());
654   bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel);
655 
656   if (should_log_inputs_and_outputs) {
657     LogInputs(op_kernel, context);
658   }
659 
660   op_kernel->Compute(context);
661 
662   if (should_log_inputs_and_outputs) {
663     LogOutputs(op_kernel, context);
664   }
665 
666   if (context->status().ok()) {
667     if (sync_every_op_) {
668       // Note: GPUUtil::Sync() only syncs the default stream.
669       // We need to either sync the stream used by this op, or
670       // all streams.  Given that this flag is typically used for
671       // debugging it makes more sense to sync all GPU activity.
672       context->SetStatus(GPUUtil::SyncAll(this));
673       if (vlog_1) {
674         VLOG(1) << "GpuDevice::ComputeHelper finished "
675                 << ComputeOpKernelDebugString(*op_kernel, stream_id);
676       }
677     } else if (vlog_1) {
678       VLOG(1) << "GpuDevice::ComputeHelper scheduled "
679               << ComputeOpKernelDebugString(*op_kernel, stream_id);
680     }
681     if (kernel_tracker_) {
682       GPUKernelTracker* tracker = kernel_tracker_.get();
683       DCHECK(tracker);
684       uint64 queued_count = tracker->MaybeQueue(context);
685       if (queued_count > 0) {
686         em_->ThenExecute(stream, [tracker, queued_count]() {
687           tracker->RecordTerminated(queued_count);
688         });
689       }
690     }
691   } else {
692     if (vlog_1) {
693       VLOG(1) << "GpuDevice::ComputeHelper failed to schedule "
694               << ComputeOpKernelDebugString(*op_kernel, stream_id);
695     }
696   }
697 }
698 
699 Status BaseGPUDevice::Sync() {
700   DCHECK_NE(stream_, nullptr);
701 
702   // Device::Sync is supposed to block until all operations queued on the device
703   // at the time of the call have completed.  On GPUs, only operations enqueued
704   // on the compute stream can remain pending after the (Async)OpKernel that
705   // enqueued the operation has completed.  We do use other streams for copies
706   // and collectives, but in those cases the (Async)OpKernels themselves block
707   // until the queued operation has finished.
708   return stream_->compute->BlockHostUntilDone();
709 }
710 
711 void BaseGPUDevice::ComputeAsync(AsyncOpKernel* op_kernel,
712                                  OpKernelContext* context,
713                                  AsyncOpKernel::DoneCallback done) {
714   GPUDeviceContext* gpu_device_context = device_context_;
715   if (context->op_device_context() != nullptr) {
716     gpu_device_context =
717         static_cast<GPUDeviceContext*>(context->op_device_context());
718   }
719   se::Stream* stream = gpu_device_context->stream();
720   const auto stream_id = gpu_device_context->stream_id();
721 
722   VLOG(1) << "GpuDevice::ComputeAsync " << op_kernel->name() << " op "
723           << op_kernel->type_string() << " on GPU" << tf_device_id_
724           << " stream[" << stream_id << "]";
725 
726   bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel);
727 
728   if (should_log_inputs_and_outputs) {
729     LogInputs(op_kernel, context);
730     AsyncOpKernel::DoneCallback parent_done = done;
731     done = [this, parent_done, should_log_inputs_and_outputs, op_kernel,
732             context]() {
733       LogOutputs(op_kernel, context);
734       parent_done();
735     };
736   }
737 
738   ScopedActivateExecutorContext scoped_activation{stream->parent()};
739   op_kernel->ComputeAsync(context, std::move(done));
740 }
741 
742 Status BaseGPUDevice::MaybeCopyTensorToGPU(
743     const AllocatorAttributes& alloc_attrs, const Tensor& from, Tensor* to,
744     StatusCallback done) {
745   if (alloc_attrs.on_host()) {
746     *to = from;
747     done(Status::OK());
748     return Status::OK();
749   } else {
750     if (!DMAHelper::CanUseDMA(&from)) {
751       Status err = errors::Internal("GPU copy from non-DMA ",
752                                     DataTypeString(from.dtype()), " tensor");
753       done(err);
754       return err;
755     }
756     AllocationAttributes allocation_attr;
757     uint64 safe_alloc_frontier = 0;
758     std::function<uint64()> freed_by_func = [this, &safe_alloc_frontier]() {
759       safe_alloc_frontier = SafeAllocFrontier(safe_alloc_frontier);
760       return safe_alloc_frontier;
761     };
762     if (timestamped_allocator_) {
763       allocation_attr.freed_by_func = &freed_by_func;
764     }
765     auto* copy = new Tensor(GetAllocator(alloc_attrs), from.dtype(),
766                             from.shape(), allocation_attr);
767 
768     // If the tensor is not initialized, we likely ran out of memory.
769     if (!copy->IsInitialized()) {
770       delete copy;
771       Status err = errors::ResourceExhausted(
772           "OOM when allocating tensor of shape ", from.shape().DebugString(),
773           " and type ", DataTypeString(from.dtype()));
774       done(err);
775       return err;
776     }
777 
778     auto wrapped_done = [to, copy, done = std::move(done)](const Status& s) {
779       if (s.ok()) {
780         *to = std::move(*copy);
781       }
782       delete copy;
783       done(s);
784     };
785 
786     profiler::ScopedAnnotation annotation("MakeTensorFromProto");
787     device_context_->CopyCPUTensorToDevice(
788         &from, this, copy, std::move(wrapped_done),
789         !timestamped_allocator_ /*sync_dst_compute*/);
790     return Status::OK();
791   }
792 }
793 
794 Status BaseGPUDevice::MakeTensorFromProto(const TensorProto& tensor_proto,
795                                           const AllocatorAttributes alloc_attrs,
796                                           Tensor* tensor) {
797   AllocatorAttributes attr;
798   attr.set_on_host(true);
799   attr.set_gpu_compatible(true);
800   Allocator* host_alloc = GetAllocator(attr);
801   Tensor parsed(tensor_proto.dtype());
802   if (!parsed.FromProto(host_alloc, tensor_proto)) {
803     return errors::InvalidArgument("Cannot parse tensor from proto: ",
804                                    tensor_proto.DebugString());
805   }
806 
807   ScopedMemoryDebugAnnotation op_annotation("MakeTensorFromProto", "dynamic",
808                                             parsed.dtype(), &parsed.shape());
809   if (parsed.dtype() == DT_VARIANT) {
810     const Variant* from = parsed.flat<Variant>().data();
811     int numa_node = attributes().locality().numa_node();
812     Tensor copy(cpu_allocator(numa_node), DT_VARIANT, parsed.shape());
813     Variant* copy_variant = copy.flat<Variant>().data();
814 
815     std::list<Notification> notifications;
816     Status copy_status;
817     auto copier = [this, &alloc_attrs, ¬ifications, ©_status](
818                       const Tensor& from, Tensor* to) {
819       // Copier isn't run in a multithreaded environment, so we don't
820       // have to worry about the notifications list being modified in parallel.
821       notifications.emplace_back();
822       Notification& n = *notifications.rbegin();
823       return MaybeCopyTensorToGPU(alloc_attrs, from, to,
824                                   [&n, ©_status](const Status& s) {
825                                     if (copy_status.ok()) {
826                                       copy_status.Update(s);
827                                     }
828                                     n.Notify();
829                                   });
830     };
831     Status s;
832     for (int64_t ix = 0; ix < parsed.NumElements(); ++ix) {
833       s = VariantDeviceCopy(VariantDeviceCopyDirection::HOST_TO_DEVICE,
834                             from[ix], ©_variant[ix], copier);
835       if (!s.ok()) {
836         break;
837       }
838     }
839     for (auto& n : notifications) {
840       n.WaitForNotification();
841     }
842     if (!s.ok()) {
843       return s;
844     }
845     *tensor = std::move(copy);
846     return copy_status;
847   } else {
848     Notification n;
849     Status status;
850     TF_RETURN_IF_ERROR(MaybeCopyTensorToGPU(alloc_attrs, parsed, tensor,
851                                             [&n, &status](const Status& s) {
852                                               status = s;
853                                               n.Notify();
854                                             }));
855     n.WaitForNotification();
856     return status;
857   }
858 }
859 
860 void BaseGPUDevice::CopyTensorInSameDevice(const Tensor* input_tensor,
861                                            Tensor* output_tensor,
862                                            const DeviceContext* device_context,
863                                            StatusCallback done) {
864   GPUUtil::CopyGPUTensorToSameGPU(static_cast<Device*>(this), device_context,
865                                   input_tensor, output_tensor, std::move(done));
866 }
867 
868 namespace {
869 class ConcretePerOpGpuDevice : public PerOpGpuDevice {
870  public:
871   ConcretePerOpGpuDevice() : device_(&stream_device_) {}
872 
873   void Reinitialize(OpKernelContext* context, const gpuStream_t* gpu_stream,
874                     TfDeviceId tf_device_id, Allocator* base_allocator,
875                     char* scratch) {
876     stream_device_.Reinitialize(context, gpu_stream, tf_device_id,
877                                 base_allocator, scratch);
878   }
879 
880   const Eigen::GpuDevice& device() const override { return device_; }
881 
882  private:
883   EigenGpuStreamDevice stream_device_;
884   Eigen::GpuDevice device_;
885 };
886 
887 Status VerifyVirtualDeviceSettings(
888     const size_t num_gpus_to_use, const GPUOptions& gpu_options,
889     const std::vector<PlatformDeviceId>& visible_gpu_order,
890     const std::vector<PlatformDeviceId>& valid_platform_device_ids,
891     const std::map<int, std::pair<int, int>>& supported_priority_ranges) {
892   const auto& virtual_devices = gpu_options.experimental().virtual_devices();
893   CHECK(!virtual_devices.empty());
894   if (gpu_options.per_process_gpu_memory_fraction() > 0) {
895     return errors::InvalidArgument(
896         "It's invalid to set per_process_gpu_memory_fraction when "
897         "virtual_devices is set.");
898   }
899   if (num_gpus_to_use < virtual_devices.size()) {
900     return errors::Unknown(
901         "Not enough GPUs to create virtual devices."
902         " num_gpus_to_use: ",
903         num_gpus_to_use, " #virtual_devices: ", virtual_devices.size());
904   }
905   if (!gpu_options.visible_device_list().empty() &&
906       visible_gpu_order.size() != virtual_devices.size()) {
907     return errors::InvalidArgument(
908         "The number of GPUs in visible_device_list doesn't match the number "
909         "of elements in the virtual_devices list.",
910         " #GPUs in visible_device_list: ", visible_gpu_order.size(),
911         " virtual_devices.size(): ", virtual_devices.size());
912   }
913   if (valid_platform_device_ids.size() != virtual_devices.size()) {
914     return errors::Unknown(
915         "The number of valid GPUs doesn't match the number of elements in "
916         "the virtual_devices list.",
917         " #valid GPUs: ", valid_platform_device_ids.size(),
918         " virtual_devices.size(): ", virtual_devices.size());
919   }
920 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
921   // Check that memory_limit_mb and priority sizes match if priority is non-empty.
922   bool priority_exists = !virtual_devices.Get(0).priority().empty();
923   for (int i = 0; i < virtual_devices.size(); ++i) {
924     const auto& memory_limit_mb = virtual_devices.Get(i).memory_limit_mb();
925     const auto& priority = virtual_devices.Get(i).priority();
926     // If the priority list is empty for the first virtual device, treat this
927     // as having no priority set for any virtual device, for backward
928     // compatibility. Priorities must be set either for all devices or none.
929     if (!priority_exists) {
930       if (!priority.empty()) {
931         return errors::InvalidArgument(
932             "Priority must be set for all virtual devices or none. But the "
933             "priority is specified for ",
934             i,
935             " while previous devices didn't "
936             "have any set.");
937       }
938     }
939     if (priority_exists && memory_limit_mb.size() != priority.size()) {
940       return errors::InvalidArgument(
941           "Number of virtual device priorities specified doesn't "
942           "match with number of memory_limit_mb specified for GPU# ",
943           i, " memory_limit_mb size: ", memory_limit_mb.size(),
944           " and priority size: ", priority.size());
945     }
946     const int gpu_id = valid_platform_device_ids[i].value();
947     auto it = supported_priority_ranges.find(gpu_id);
948     if (it == supported_priority_ranges.end()) {
949       return errors::Internal(
950           "Failed to find supported priority range for GPU"
951           " device ",
952           gpu_id);
953     }
954     const std::pair<int, int>& priority_range = it->second;
955     for (int p : priority) {
956       if (p > priority_range.first || p < priority_range.second) {
957         return errors::InvalidArgument(
958             "Priority ", p,
959             " is outside the range of supported priorities "
960             "[",
961             priority_range.second, ",", priority_range.first,
962             "] for virtual device ", i, " on GPU# ", gpu_id);
963       }
964     }
965   }
966 #endif
967 
968   return Status::OK();
969 }
970 
971 int64 MinSystemMemory(int64_t available_memory, int cc_major) {
972   // We use the following heuristic for now:
973   //
974   // If the available_memory is < 2GiB, we allocate 225MiB to system memory.
975   // Otherwise, depending on the compute capability, reserve
976   //  500MiB (for cuda_compute_capability <= 6.x),
977   // 1050MiB (for cuda_compute_capability 7.x), or
978   // 1536MiB (for cuda_compute_capability >= 8.x).
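      // For example, with 16GiB of available memory on a compute capability 7.x
      // GPU, 1050MiB is reserved for the system.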
979   int64_t min_system_memory;
980   if (available_memory < (1LL << 31)) {
981     min_system_memory = 225 * 1024 * 1024;
982   } else {
983     if (cc_major <= 6) {
984       min_system_memory = 500 * 1024 * 1024;
985     } else if (cc_major <= 7) {
986       min_system_memory = 1050 * 1024 * 1024;
987     } else {
988       min_system_memory = 1536 * 1024 * 1024;
989     }
990   }
991 #if defined(__GNUC__) && defined(__OPTIMIZE__)
992 // Do nothing
993 #elif !defined(__GNUC__) && defined(NDEBUG)
994 // Do nothing
995 #else
996   // Double the minimum system memory in non-opt builds (debug builds on
997   // Windows), because non-opt builds need more system memory than opt
998   // builds do.
999   min_system_memory *= 2;
1000 #endif
1001 
1002 #if defined(ANDROID_TEGRA)
1003   // 1GB system mem for NVIDIA Tegra devices since they use the same mem for
1004   // RAM and Video RAM
1005   min_system_memory = 1 << 30;
1006 #endif
1007 
1008   VLOG(5) << "available_memory = " << available_memory;
1009   VLOG(5) << "min_system_memory = " << min_system_memory;
1010   return min_system_memory;
1011 }
1012 
1013 // Get the memory limit for the virtual device being created on GPU with
1014 // 'platform_device_id', when that virtual device is the only virtual device
1015 // being created on that GPU.
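     // The limit is roughly the currently available memory minus
     // MinSystemMemory(); if per_process_gpu_memory_fraction is set, it is that
     // fraction of total (not available) memory instead.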
1016 Status SingleVirtualDeviceMemoryLimit(const GPUOptions& gpu_options,
1017                                       PlatformDeviceId platform_device_id,
1018                                       int64* memory_limit) {
1019   int64_t total_memory = 0;
1020   int64_t available_memory = 0;
1021   se::StreamExecutor* se = DeviceIdUtil::ExecutorForPlatformDeviceId(
1022                                GPUMachineManager(), platform_device_id)
1023                                .ValueOrDie();
1024   if (!se->DeviceMemoryUsage(&available_memory, &total_memory)) {
1025     return errors::Unknown("Failed to query available memory for GPU ",
1026                            platform_device_id.value());
1027   }
1028 
1029   int64_t allocated_memory = 0;
1030   const double per_process_gpu_memory_fraction =
1031       gpu_options.per_process_gpu_memory_fraction();
1032   se::CudaComputeCapability cc =
1033       se->GetDeviceDescription().cuda_compute_capability();
1034   if ((per_process_gpu_memory_fraction > 1.0 ||
1035        gpu_options.experimental().use_unified_memory()) &&
1036       !cc.IsAtLeast(se::CudaComputeCapability::PASCAL_)) {
1037     return errors::Internal(
1038         "Unified memory on GPUs with compute capability lower than 6.0 "
1039         "(pre-Pascal class GPUs) does not support oversubscription.");
1040   }
1041 
1042   if (per_process_gpu_memory_fraction == 0) {
1043     allocated_memory = available_memory;
1044     const int64 min_system_memory = MinSystemMemory(available_memory, cc.major);
1045     if (min_system_memory < allocated_memory) {
1046       allocated_memory -= min_system_memory;
1047     }
1048   } else {
1049     allocated_memory = total_memory * per_process_gpu_memory_fraction;
1050   }
1051 
1052   // Override the excluded memory when TF_DEVICE_MIN_SYS_MEMORY_IN_MB is set.
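       // For example, TF_DEVICE_MIN_SYS_MEMORY_IN_MB=512 leaves 512MiB of the
       // currently available memory unallocated, overriding
       // per_process_gpu_memory_fraction.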
1053   const char* force_device_reserved_bytes =
1054       std::getenv("TF_DEVICE_MIN_SYS_MEMORY_IN_MB");
1055   if (force_device_reserved_bytes != nullptr &&
1056       strcmp(force_device_reserved_bytes, "") != 0) {
1057     int64 reserved_mb;
1058     if (!strings::safe_strto64(force_device_reserved_bytes, &reserved_mb) ||
1059         reserved_mb < 0) {
1060       LOG(WARNING) << "The requested reserved device memory "
1061                    << force_device_reserved_bytes
1062                    << " is invalid. The request will be ignored.";
1063     } else {
1064       // Convert MBytes to Bytes.
1065       int64 allowable_reserved_memory = reserved_mb * 1024 * 1024;
1066       // TF_DEVICE_MIN_SYS_MEMORY_IN_MB overrides
1067       // per_process_gpu_memory_fraction.
1068       if (allowable_reserved_memory <= available_memory) {
1069         allocated_memory = available_memory - allowable_reserved_memory;
1070         VLOG(1) << "Setting the GPU memory limit to "
1071                 << strings::HumanReadableNumBytes(allocated_memory)
1072                 << " after excluding the requested reserved memory.";
1073       } else {
1074         LOG(WARNING) << "The requested reserved device memory "
1075                      << strings::HumanReadableNumBytes(
1076                             allowable_reserved_memory)
1077                      << " is larger than the available memory of "
1078                      << strings::HumanReadableNumBytes(available_memory)
1079                      << ". The request is ignored.";
1080       }
1081     }
1082   }
1083   *memory_limit = allocated_memory;
1084   return Status::OK();
1085 }
1086 }  // namespace
1087 
1088 void BaseGPUDevice::ReinitializeDevice(OpKernelContext* context,
1089                                        PerOpGpuDevice* device, int stream_id,
1090                                        Allocator* allocator) {
1091   ConcretePerOpGpuDevice* concrete_device =
1092       static_cast<ConcretePerOpGpuDevice*>(device);
1093   DCHECK(concrete_device);
1094   DCHECK_EQ(stream_id, 0);
1095   const gpuStream_t* gpu_stream = reinterpret_cast<const gpuStream_t*>(
1096       stream_->compute->implementation()->GpuStreamMemberHack());
1097   concrete_device->Reinitialize(context, gpu_stream, tf_device_id_, allocator,
1098                                 scratch_);
1099 }
1100 
1101 PerOpGpuDevice* BaseGPUDevice::MakeGpuDevice() {
1102   return new ConcretePerOpGpuDevice();
1103 }
1104 
1105 Status BaseGPUDevice::ReinitializeGpuDevice(OpKernelContext* context,
1106                                             PerOpGpuDevice* device,
1107                                             DeviceContext* dc,
1108                                             Allocator* allocator) {
1109   TF_RETURN_IF_ERROR(InitScratchBuffers());
1110   if (dc) {
1111     const GPUDeviceContext* gpu_dc = static_cast<GPUDeviceContext*>(dc);
1112     const int stream_id = gpu_dc->stream_id();
1113     VLOG(1) << "  eigen_gpu_device(" << dc << ") => stream[" << stream_id
1114             << "]";
1115     CHECK_EQ(stream_id, 0);
1116     ReinitializeDevice(context, device, stream_id, allocator);
1117   } else {
1118     ReinitializeDevice(context, device, 0, allocator);
1119   }
1120   return Status::OK();
1121 }
1122 
1123 Allocator* BaseGPUDevice::GetScopedAllocator(AllocatorAttributes attr,
1124                                              int64_t step_id) {
1125   if (attr.scope_id > 0) {
1126     return scoped_allocator_mgr_->GetContainer(step_id)->GetInstance(
1127         attr.scope_id);
1128   }
1129   LOG(FATAL) << "Unexpected call to BaseGPUDevice::GetScopedAllocator "
1130              << "attr.scope_id = " << attr.scope_id;
1131   return gpu_allocator_;
1132 }
1133 
1134 const int BaseGPUDeviceFactory::InterconnectMap::kSameDeviceStrength = 1000;
1135 const int BaseGPUDeviceFactory::InterconnectMap::kStreamExecutorStrength = 1;
1136 
1137 Status BaseGPUDeviceFactory::CacheDeviceIds() {
1138   if (!cached_device_ids_.empty()) {
1139     return Status::OK();
1140   }
1141 
1142   TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1143   se::Platform* gpu_manager = GPUMachineManager();
1144   if (gpu_manager == nullptr) {
1145     return Status::OK();
1146   }
1147 
1148   int device_count = gpu_manager->VisibleDeviceCount();
1149   if (device_count <= 0) {
1150     return Status::OK();
1151   }
1152 
1153   std::vector<PlatformDeviceId> visible_gpu_order(device_count);
1154   std::iota(visible_gpu_order.begin(), visible_gpu_order.end(), 0);
1155   TF_RETURN_IF_ERROR(GetValidDeviceIds(visible_gpu_order, &cached_device_ids_));
1156   return Status::OK();
1157 }
1158 
1159 Status BaseGPUDeviceFactory::ListPhysicalDevices(std::vector<string>* devices) {
1160   TF_RETURN_IF_ERROR(CacheDeviceIds());
1161   for (PlatformDeviceId platform_device_id : cached_device_ids_) {
1162     const string device_name =
1163         strings::StrCat("/physical_device:GPU:", platform_device_id.value());
1164     devices->push_back(device_name);
1165   }
1166 
1167   return Status::OK();
1168 }
1169 
1170 Status BaseGPUDeviceFactory::GetDeviceDetails(
1171     int device_index, std::unordered_map<string, string>* details) {
1172   TF_RETURN_IF_ERROR(CacheDeviceIds());
1173 
1174   if (device_index < 0 || device_index >= cached_device_ids_.size()) {
1175     return errors::Internal("Invalid device index: ", device_index);
1176   }
1177   PlatformDeviceId platform_device_id = cached_device_ids_[device_index];
1178 
1179   TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1180   se::Platform* gpu_manager = GPUMachineManager();
1181   if (gpu_manager == nullptr) {
1182     return errors::Internal("Cannot get GPUMachineManager");
1183   }
1184   auto desc_status =
1185       gpu_manager->DescriptionForDevice(platform_device_id.value());
1186   if (!desc_status.ok()) {
1187     return desc_status.status();
1188   }
1189 
1190   auto desc = desc_status.ConsumeValueOrDie();
1191   (*details)["device_name"] = desc->name();
1192 #if GOOGLE_CUDA
1193   (*details)["compute_capability"] = desc->cuda_compute_capability().ToString();
1194 #endif  // GOOGLE_CUDA
1195   return Status::OK();
1196 }
1197 
1198 Status BaseGPUDeviceFactory::CreateDevices(
1199     const SessionOptions& options, const string& name_prefix,
1200     std::vector<std::unique_ptr<Device>>* devices) {
1201   TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1202   se::Platform* gpu_manager = GPUMachineManager();
1203   if (gpu_manager == nullptr) {
1204     return Status::OK();
1205   }
1206   // If there are no GPUs visible, do nothing.
1207   if (gpu_manager->VisibleDeviceCount() <= 0) {
1208     return Status::OK();
1209   }
1210 
1211   size_t num_gpus_to_use = INT_MAX;
1212   auto iter = options.config.device_count().find("GPU");
1213   if (iter != options.config.device_count().end()) {
1214     num_gpus_to_use = iter->second;
1215   }
1216   const auto& gpu_options = options.config.gpu_options();
1217   std::vector<PlatformDeviceId> visible_gpu_order;
1218   std::vector<PlatformDeviceId> valid_platform_device_ids;
1219   // If we aren't going to use any GPUs, don't initialize them.
1220   // We don't want to call ParseVisibleDeviceList if num_gpus_to_use is 0,
1221   // because it treats an empty gpu_options.visible_device_list as 'all GPUs
1222   // are visible'.
1223   if (num_gpus_to_use > 0) {
1224     TF_RETURN_IF_ERROR(DeviceIdUtil::ParseVisibleDeviceList(
1225         gpu_options.visible_device_list(), gpu_manager->VisibleDeviceCount(),
1226         &visible_gpu_order));
1227     bool new_gpu_found = false;
1228     for (int i = 0; i < visible_gpu_order.size(); ++i) {
1229       int visible_gpu_id = visible_gpu_order[i].value();
1230 
1231       // Only perform this once per visible gpu id.
1232       if (visible_gpu_initialized_[visible_gpu_id]) {
1233         continue;
1234       }
1235 
1236       visible_gpu_initialized_[visible_gpu_id] = true;
1237       new_gpu_found = true;
1238     }
1239 
1240     // Check and enable peer access if more than one GPU is found.
1241     if (new_gpu_found && visible_gpu_order.size() > 1) {
1242       // Enable peer access
1243       TF_RETURN_IF_ERROR(EnablePeerAccess(visible_gpu_order));
1244     }
1245 
1246     TF_RETURN_IF_ERROR(
1247         GetValidDeviceIds(visible_gpu_order, &valid_platform_device_ids));
1248   }
1249   if (num_gpus_to_use > valid_platform_device_ids.size()) {
1250     num_gpus_to_use = valid_platform_device_ids.size();
1251   }
1252   std::map<int, std::pair<int, int>> supported_priority_ranges;
1253   if (!valid_platform_device_ids.empty()) {
1254     // Save the original device.
1255     int original_device = 0;
1256 #if GOOGLE_CUDA
1257     cudaError_t err = cudaGetDevice(&original_device);
1258     if (err != cudaSuccess) {
1259       return errors::Internal("cudaGetDevice() failed. Status: ",
1260                               cudaGetErrorString(err));
1261     }
1262 #elif TENSORFLOW_USE_ROCM
1263     hipError_t err = hipGetDevice(&original_device);
1264     if (err != hipSuccess) {
1265       return errors::Internal("hipGetDevice() failed. Status: ",
1266                               hipGetErrorString(err));
1267     }
1268 #endif
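    // cudaSetDevice()/hipSetDevice() below change the calling thread's current
    // device, so the original device is remembered here and restored after the
    // per-device queries are done.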
1269 
    // Force the GPU runtime to initialize implicitly on each valid GPU before
    // CreateGPUDevice() is called.
1272     for (PlatformDeviceId platform_device_id : valid_platform_device_ids) {
1273 #if GOOGLE_CUDA
1274       err = cudaSetDevice(platform_device_id.value());
1275       if (err != cudaSuccess) {
1276         return errors::Internal(
1277             "cudaSetDevice() on GPU:", platform_device_id.value(),
1278             " failed. Status: ", cudaGetErrorString(err));
1279       }
      int priority_low, priority_high;
      err = cudaDeviceGetStreamPriorityRange(&priority_low, &priority_high);
      if (err != cudaSuccess) {
        return errors::Internal("cudaDeviceGetStreamPriorityRange() on GPU:",
                                platform_device_id.value(),
                                " failed. Status: ", cudaGetErrorString(err));
      }
      VLOG(1) << "Cuda stream priority range on GPU("
              << platform_device_id.value() << "): " << priority_high << ","
              << priority_low;
1289       supported_priority_ranges.insert(
1290           std::make_pair(platform_device_id.value(),
1291                          std::make_pair(priority_low, priority_high)));
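      // Note: CUDA reports the range as (least, greatest) priority, and lower
      // numeric values mean higher priority, so priority_high is typically a
      // small negative number (e.g. -1) while priority_low is 0.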
1292 #elif TENSORFLOW_USE_ROCM
1293       err = hipSetDevice(platform_device_id.value());
1294       if (err != hipSuccess) {
1295         return errors::Internal(
1296             "hipSetDevice() on GPU:", platform_device_id.value(),
1297             " failed. Status: ", hipGetErrorString(err));
1298       }
1299       err = hipFree(nullptr);
1300       if (err != hipSuccess) {
1301         return errors::Internal("ROCm runtime implicit initialization on GPU:",
1302                                 platform_device_id.value(),
1303                                 " failed. Status: ", hipGetErrorString(err));
1304       }
      int priority_low, priority_high;
      err = hipDeviceGetStreamPriorityRange(&priority_low, &priority_high);
      if (err != hipSuccess) {
        return errors::Internal("hipDeviceGetStreamPriorityRange() on GPU:",
                                platform_device_id.value(),
                                " failed. Status: ", hipGetErrorString(err));
      }
      VLOG(1) << "HIP stream priority range on GPU("
              << platform_device_id.value() << "): " << priority_high << ","
              << priority_low;
1314       supported_priority_ranges.insert(
1315           std::make_pair(platform_device_id.value(),
1316                          std::make_pair(priority_low, priority_high)));
1317 #endif
1318     }
1319     // Reset to the original device.
1320 #if GOOGLE_CUDA
1321     err = cudaSetDevice(original_device);
1322     if (err != cudaSuccess) {
1323       return errors::Internal("cudaSetDevice() on GPU:", original_device,
1324                               " failed. Status: ", cudaGetErrorString(err));
1325     }
1326 #elif TENSORFLOW_USE_ROCM
1327     err = hipSetDevice(original_device);
1328     if (err != hipSuccess) {
1329       return errors::Internal("hipSetDevice() on GPU:", original_device,
1330                               " failed. Status: ", hipGetErrorString(err));
1331     }
1332 #endif
1333 
1334 #if GOOGLE_CUDA
1335     // Log the version of CUDA and cuDNN
1336     int cuda_major_version = CUDART_VERSION / 1000;
1337     int cuda_minor_version = (CUDART_VERSION / 10) % 10;
1338     VLOG(1) << "TensorFlow compiled with CUDA " << cuda_major_version << "."
1339             << cuda_minor_version << " and cuDNN " << CUDNN_MAJOR << "."
1340             << CUDNN_MINOR << "." << CUDNN_PATCHLEVEL;
1341 #endif
1342   }
1343 
1344   std::vector<InterconnectMap> interconnect_maps;
1345   TF_RETURN_IF_ERROR(
1346       GetInterconnectMaps(visible_gpu_order, gpu_manager, &interconnect_maps));
1347 
1348   // Print each interconnect map to the log.
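  // The matrix is indexed by platform GPU id and marks each directed link
  // with "Y"; e.g. for two peered GPUs it looks like:
  //      0 1
  //   0: N Y
  //   1: Y N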
1349   for (const InterconnectMap& im : interconnect_maps) {
1350     VLOG(1) << "Device interconnect " << im.name << " with strength "
1351             << im.strength << " edge matrix:";
1352     string line_buf = "     ";
1353     for (int i = 0; i < visible_gpu_order.size(); ++i) {
1354       strings::StrAppend(&line_buf, visible_gpu_order[i].value(), " ");
1355     }
1356     VLOG(1) << line_buf;
1357     for (int i = 0; i < visible_gpu_order.size(); ++i) {
1358       line_buf = strings::StrCat(visible_gpu_order[i].value(), ":   ");
1359       PlatformDeviceId gpu_id_i = visible_gpu_order[i];
1360       for (int j = 0; j < visible_gpu_order.size(); ++j) {
1361         PlatformDeviceId gpu_id_j = visible_gpu_order[j];
1362         if (im.directed_links.find({gpu_id_i, gpu_id_j}) !=
1363             im.directed_links.end()) {
1364           line_buf.append("Y ");
1365         } else {
1366           line_buf.append("N ");
1367         }
1368       }
1369       VLOG(1) << line_buf;
1370     }
1371   }
1372 
1373   const auto& virtual_devices = gpu_options.experimental().virtual_devices();
1374   if (!virtual_devices.empty()) {
1375     TF_RETURN_IF_ERROR(VerifyVirtualDeviceSettings(
1376         num_gpus_to_use, gpu_options, visible_gpu_order,
1377         valid_platform_device_ids, supported_priority_ranges));
1378     // We've verified that num_gpus_to_use >= virtual_devices.size().
1379     num_gpus_to_use = virtual_devices.size();
1380     CHECK(gpu_options.visible_device_list().empty() ||
1381           valid_platform_device_ids == visible_gpu_order);
1382   }
1383   int next_tf_device_id = 0;
1384   std::vector<int64> memory_limit_bytes;
1385   for (int i = 0; i < num_gpus_to_use; ++i) {
1386     const PlatformDeviceId platform_device_id = valid_platform_device_ids[i];
1387     if (virtual_devices.empty() ||
1388         virtual_devices.Get(i).memory_limit_mb_size() == 0) {
1389       int64_t single_virtual_device_memory_limit = 0;
1390       TF_RETURN_IF_ERROR(
1391           SingleVirtualDeviceMemoryLimit(gpu_options, platform_device_id,
1392                                          &single_virtual_device_memory_limit));
1393       memory_limit_bytes.push_back(single_virtual_device_memory_limit);
1394     } else {
1395       const auto& memory_limit_mb = virtual_devices.Get(i).memory_limit_mb();
1396       std::transform(memory_limit_mb.begin(), memory_limit_mb.end(),
1397                      std::back_inserter(memory_limit_bytes), [](float mb) {
1398                        return static_cast<int64>(mb) * (1ll << 20);
1399                      });
1400     }
1401     while (next_tf_device_id < memory_limit_bytes.size()) {
1402       TfDeviceId tf_device_id(next_tf_device_id);
1403       ++next_tf_device_id;
1404       TF_RETURN_IF_ERROR(GpuIdManager::InsertTfPlatformDeviceIdPair(
1405           tf_device_id, platform_device_id));
1406     }
1407   }
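  // At this point every TF device id is mapped to the platform GPU that backs
  // it. For example, a single physical GPU configured with two virtual devices
  // (memory_limit_mb of 1024 and 2048) yields TF devices GPU:0 and GPU:1 that
  // both map to platform GPU 0.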
1408   const int num_tf_gpus = next_tf_device_id;
1409 
1410   LocalityMap device_localities;
1411   TF_RETURN_IF_ERROR(
1412       GetDeviceLocalities(num_tf_gpus, interconnect_maps, &device_localities));
1413 
1414   // Build the GPUDevices
1415   CHECK_EQ(next_tf_device_id, memory_limit_bytes.size());
1416   for (int di = 0; di < num_tf_gpus; ++di) {
1417     TfDeviceId tf_device_id(di);
1418     int64_t bytes = memory_limit_bytes[di];
1419     auto it = device_localities.find(tf_device_id);
1420     if (it == device_localities.end()) {
1421       return errors::Internal("Failed to find DeviceLocality for GPU device ",
1422                               tf_device_id.value());
1423     }
1424     TF_RETURN_IF_ERROR(CreateGPUDevice(options, name_prefix, tf_device_id,
1425                                        bytes, it->second, num_tf_gpus,
1426                                        devices));
1427   }
1428   return Status::OK();
1429 }
1430 
static string GetShortDeviceDescription(PlatformDeviceId platform_device_id,
1432                                         const se::DeviceDescription& desc) {
1433 #if GOOGLE_CUDA
1434   // LINT.IfChange
1435   return strings::StrCat(
1436       "device: ", platform_device_id.value(), ", name: ", desc.name(),
1437       ", pci bus id: ", desc.pci_bus_id(),
1438       ", compute capability: ", desc.cuda_compute_capability().ToString());
1439   // LINT.ThenChange(//tensorflow/python/framework/gpu_util.py)
1440 #elif TENSORFLOW_USE_ROCM
1441   return strings::StrCat("device: ", platform_device_id.value(),
1442                          ", name: ", desc.name(),
1443                          ", pci bus id: ", desc.pci_bus_id());
1444 #endif
1445 }
1446 
Status BaseGPUDeviceFactory::CreateGPUDevice(
1448     const SessionOptions& options, const string& name_prefix,
1449     TfDeviceId tf_device_id, int64_t memory_limit,
1450     const DeviceLocality& dev_locality, size_t num_tf_gpus,
1451     std::vector<std::unique_ptr<Device>>* devices) {
1452   CHECK_GE(tf_device_id.value(), 0);
1453   const string device_name =
1454       strings::StrCat(name_prefix, "/device:GPU:", tf_device_id.value());
1455   DeviceIdUtil::CheckValidTfDeviceId(DEVICE_GPU, GPUMachineManager(),
1456                                      tf_device_id);
1457   PlatformDeviceId platform_device_id;
1458   TF_RETURN_IF_ERROR(
1459       GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
1460   int numa_node = dev_locality.numa_node();
1461 
1462   se::Platform* gpu_manager = GPUMachineManager();
1463   auto desc_status =
1464       gpu_manager->DescriptionForDevice(platform_device_id.value());
1465   if (!desc_status.ok()) {
1466     return desc_status.status();
1467   }
1468   auto desc = desc_status.ConsumeValueOrDie();
1469 
1470   std::vector<TfDeviceId> peer_gpu_ids;
1471   peer_gpu_ids.reserve(num_tf_gpus);
1472   for (int id = 0; id < num_tf_gpus; ++id) {
1473     TfDeviceId peer_tf_device_id(id);
1474     if (peer_tf_device_id != tf_device_id) {
1475       peer_gpu_ids.push_back(peer_tf_device_id);
1476     }
1477   }
1478 
1479   GPUProcessState* process_state = GPUProcessState::singleton();
1480   Allocator* gpu_allocator = process_state->GetGPUAllocator(
1481       options.config.gpu_options(), tf_device_id, memory_limit, peer_gpu_ids);
1482   if (gpu_allocator == nullptr) {
1483     return errors::Internal("Failed to get memory allocator for TF GPU ",
1484                             tf_device_id.value(), " with ", memory_limit,
1485                             " bytes of memory.");
1486   }
1487   absl::optional<AllocatorStats> stats = gpu_allocator->GetStats();
1488   if (!stats) {
1489     return errors::Internal("No allocator statistics");
1490   }
  // 'memory_limit' is the requested memory size. However, if an allocator for
  // the given tf_device_id was created earlier, it is reused rather than
  // re-created (the TF GPU device is a shared resource), and the actual limit
  // reported by 'stats.bytes_limit' for that allocator may differ from the
  // request (which should be treated as an error).
1496   //
1497   // TODO(laigd): report error if memory_limit doesn't match
1498   // stats->bytes_limit.
1499   int64_t bytes_limit = stats->bytes_limit ? *stats->bytes_limit : 0;
1500   std::unique_ptr<BaseGPUDevice> gpu_device = CreateGPUDevice(
1501       options, device_name, static_cast<Bytes>(bytes_limit), dev_locality,
1502       tf_device_id, GetShortDeviceDescription(platform_device_id, *desc),
1503       gpu_allocator, ProcessState::singleton()->GetCPUAllocator(numa_node));
  LOG(INFO) << "Created device " << device_name << " with "
            << (bytes_limit >> 20) << " MB memory -> "
            << GetShortDeviceDescription(platform_device_id, *desc);
1507   TF_RETURN_IF_ERROR(gpu_device->Init(options));
1508   gpu_allocator->SetStream(gpu_device->GetStream());
1509   devices->push_back(std::move(gpu_device));
1510 
1511   return Status::OK();
1512 }
1513 
1514 namespace {
1515 std::unique_ptr<std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>>
GetPeerAccessMap(se::Platform* platform,
1517                  const std::vector<PlatformDeviceId>& visible_gpu_order) {
1518   std::unique_ptr<std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>>
1519       map(new std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>);
1520   for (PlatformDeviceId platform_gpu_i : visible_gpu_order) {
1521     for (PlatformDeviceId platform_gpu_j : visible_gpu_order) {
1522       se::StreamExecutor* from =
1523           DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_gpu_i)
1524               .ValueOrDie();
1525       se::StreamExecutor* to =
1526           DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_gpu_j)
1527               .ValueOrDie();
1528       (*map)[{platform_gpu_i, platform_gpu_j}] =
1529           from->CanEnablePeerAccessTo(to);
1530     }
1531   }
1532 
1533   return map;
1534 }
1535 
1536 }  // namespace
1537 
Status BaseGPUDeviceFactory::GetInterconnectMaps(
1539     const std::vector<PlatformDeviceId>& visible_gpu_order,
1540     se::Platform* gpu_manager, std::vector<InterconnectMap>* maps) {
1541   // The default interconnect map is obtained from the StreamExecutor.
1542   auto access_map = GetPeerAccessMap(gpu_manager, visible_gpu_order);
1543   maps->resize(1);
1544   InterconnectMap& imap = maps->at(0);
1545   imap.name = "StreamExecutor";
1546   imap.strength = InterconnectMap::kStreamExecutorStrength;
1547   for (PlatformDeviceId gpu_id_i : visible_gpu_order) {
1548     for (PlatformDeviceId gpu_id_j : visible_gpu_order) {
1549       if (gpu_id_i == gpu_id_j) continue;
1550       if ((*access_map)[{gpu_id_i, gpu_id_j}]) {
1551         imap.directed_links.insert({gpu_id_i, gpu_id_j});
1552       }
1553     }
1554   }
1555   return Status::OK();
1556 }
1557 
Status BaseGPUDeviceFactory::GetDeviceLocalities(
1559     int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
1560     LocalityMap* localities) {
1561   std::vector<TfDeviceId> all_tf_device_ids;
1562   all_tf_device_ids.reserve(num_tf_gpus);
1563   for (int i = 0; i < num_tf_gpus; ++i) {
1564     all_tf_device_ids.push_back(TfDeviceId(i));
1565   }
1566   for (TfDeviceId tf_device_id : all_tf_device_ids) {
1567     PlatformDeviceId platform_device_id;
1568     TF_RETURN_IF_ERROR(
1569         GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
1570     // Get GPU bus_id from its reported NUMA affinity.  Because GPUs are
1571     // virtualized in some environments, we can't just use the GPU id.
1572     // NUMA locales are indexed from 0, buses are indexed from 1.
1573     se::Platform* gpu_manager = GPUMachineManager();
1574     auto desc_status =
1575         gpu_manager->DescriptionForDevice(platform_device_id.value());
1576     if (!desc_status.ok()) {
1577       return desc_status.status();
1578     }
1579     auto desc = desc_status.ConsumeValueOrDie();
1580     int numa_node = desc->numa_node();
1581     if (numa_node < 0) {
1582       // For some reason the StreamExecutor couldn't get the NUMA
1583       // affinity of the GPU.  If this is not a multi-socket mobo with
1584       // GPUs local to different buses, it doesn't matter.  If it is, we
1585       // may run into trouble later with data transfer operations.  The
1586       // trouble may manifest as slower than expected performance, or
1587       // outright failures.
1588       LOG(INFO) << "Could not identify NUMA node of platform GPU id "
1589                 << platform_device_id
1590                 << ", defaulting to 0.  Your kernel may not have been built "
1591                 << "with NUMA support.";
1592       numa_node = 0;
1593     }
1594     DeviceLocality dev_locality;
1595     dev_locality.set_numa_node(numa_node);
1596     dev_locality.set_bus_id(numa_node + 1);
1597 
1598     // Set LocalLinks from InterconnectMaps.
1599     LocalLinks* links = dev_locality.mutable_links();
1600     for (const InterconnectMap& imap : interconnects) {
1601       for (TfDeviceId tf_gpu_dst : all_tf_device_ids) {
1602         PlatformDeviceId platform_gpu_dst;
1603         TF_RETURN_IF_ERROR(
1604             GpuIdManager::TfToPlatformDeviceId(tf_gpu_dst, &platform_gpu_dst));
1605         if (imap.directed_links.find({platform_device_id, platform_gpu_dst}) !=
1606             imap.directed_links.end()) {
1607           InterconnectLink* ilink = links->add_link();
1608           ilink->set_device_id(tf_gpu_dst.value());
1609           ilink->set_type(imap.name);
1610           ilink->set_strength(imap.strength);
1611         }
1612       }
1613     }
1614 
1615     // If this is one of multiple virtual GPUs on the same physical GPU
1616     // add high strength links to the others.
1617     for (TfDeviceId tf_gpu_dst : all_tf_device_ids) {
1618       if (tf_device_id == tf_gpu_dst) continue;
1619       PlatformDeviceId platform_gpu_dst;
1620       TF_RETURN_IF_ERROR(
1621           GpuIdManager::TfToPlatformDeviceId(tf_gpu_dst, &platform_gpu_dst));
1622       if (platform_device_id == platform_gpu_dst) {
1623         InterconnectLink* ilink = links->add_link();
1624         ilink->set_device_id(tf_gpu_dst.value());
1625         ilink->set_type("SAME_DEVICE");
1626         ilink->set_strength(InterconnectMap::kSameDeviceStrength);
1627       }
1628     }
1629 
1630     (*localities)[tf_device_id] = dev_locality;
1631     VLOG(1) << "GPUDevice PlatformDeviceId " << platform_device_id
1632             << " TfDeviceId " << tf_device_id << " on bus "
1633             << dev_locality.bus_id() << " numa: " << numa_node
1634             << " pci: " << desc->pci_bus_id()
1635             << " DeviceLocality: " << dev_locality.DebugString();
1636   }
1637   return Status::OK();
1638 }
1639 
static int GetDefaultMinGPUMultiprocessorCount(
1641     se::Platform* gpu_manager,
1642     const std::vector<PlatformDeviceId>& visible_gpu_order) {
1643   static const int kDefaultMinGPUMultiprocessorCount = 8;
1644 
1645   // Find the highest multi-processor count across all visible GPUs.
1646   int max_count = -1;
1647   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1648     int visible_gpu_id = visible_gpu_order[i].value();
1649     auto description_status = gpu_manager->DescriptionForDevice(visible_gpu_id);
1650     if (!description_status.ok()) {
1651       continue;
1652     }
1653 
1654     auto description = description_status.ConsumeValueOrDie();
1655     max_count = std::max(max_count, description->core_count());
1656   }
1657 
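  // The effective default is min(kDefaultMinGPUMultiprocessorCount, max_count)
  // when at least one description is available, so the threshold never exceeds
  // the largest visible GPU's multiprocessor count and that GPU always passes.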
1658   if (max_count < 0 || kDefaultMinGPUMultiprocessorCount < max_count) {
1659     return kDefaultMinGPUMultiprocessorCount;
1660   } else {
1661     return max_count;
1662   }
1663 }
1664 
static int GetMinGPUMultiprocessorCount(
1666     se::Platform* gpu_manager,
1667     const std::vector<PlatformDeviceId>& visible_gpu_order) {
1668   const char* tf_min_gpu_core_count = getenv("TF_MIN_GPU_MULTIPROCESSOR_COUNT");
1669 
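  // The override comes from the environment, e.g. running with
  //   TF_MIN_GPU_MULTIPROCESSOR_COUNT=4
  // admits GPUs that have as few as 4 multiprocessors.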
1670   if (tf_min_gpu_core_count == nullptr ||
1671       strcmp(tf_min_gpu_core_count, "") == 0) {
1672     return GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1673   }
1674 
1675   int min_gpu_core_count = -1;
1676   if (strings::safe_strto32(tf_min_gpu_core_count, &min_gpu_core_count)) {
1677     if (min_gpu_core_count >= 0) {
1678       return min_gpu_core_count;
1679     }
1680   }
1681 
1682   int count =
1683       GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1684   LOG(ERROR) << "Invalid minimum GPU multiprocessor count: ["
1685              << tf_min_gpu_core_count << "]. "
1686              << "Using the default value: " << count;
1687   return count;
1688 }
1689 
1690 namespace {
1691 
1692 #if GOOGLE_CUDA
1693 
se::CudaComputeCapability ComputeCapabilityFromString(
1695     const std::string& version_name) {
1696   int major_part, minor_part;
1697   size_t dot_pos = version_name.find('.');
1698   CHECK(dot_pos != string::npos)
1699       << "Illegal version name: [" << version_name << "]";
1700   string major_str = version_name.substr(0, dot_pos);
1701   CHECK(strings::safe_strto32(major_str, &major_part))
1702       << "Illegal version name: [" << version_name << "]";
1703   string minor_str = version_name.substr(dot_pos + 1);
1704   CHECK(strings::safe_strto32(minor_str, &minor_part))
1705       << "Illegal version name: [" << version_name << "]";
1706   return se::CudaComputeCapability{major_part, minor_part};
1707 }
1708 
std::vector<se::CudaComputeCapability> GetSupportedCudaComputeCapabilities() {
1710   std::vector<se::CudaComputeCapability> cuda_caps = {
1711       ComputeCapabilityFromString("3.5"), ComputeCapabilityFromString("5.2")};
1712 #ifdef TF_EXTRA_CUDA_CAPABILITIES
// TF_EXTRA_CUDA_CAPABILITIES should be defined as a comma-separated sequence,
1714 // for example:
1715 //   TF_EXTRA_CUDA_CAPABILITIES=3.0,4.0,5.0
1716 // Use two-level macro expansion for stringification.
1717 #define TF_XSTRING(...) #__VA_ARGS__
1718 #define TF_STRING(s) TF_XSTRING(s)
1719   string extra_cuda_caps = TF_STRING(TF_EXTRA_CUDA_CAPABILITIES);
1720 #undef TF_STRING
1721 #undef TF_XSTRING
1722   auto extra_capabilities = str_util::Split(extra_cuda_caps, ',');
1723   for (const auto& capability : extra_capabilities) {
1724     cuda_caps.push_back(ComputeCapabilityFromString(capability));
1725   }
1726 #endif
1727   return cuda_caps;
1728 }
1729 #endif  // GOOGLE_CUDA
1730 
1731 #if TENSORFLOW_USE_ROCM
1732 std::vector<int> supported_amdgpu_isa_versions = {803, 900, 906, 908};
1733 
std::vector<int> GetSupportedAMDGPUISAVersions() {
1735   return supported_amdgpu_isa_versions;
1736 }
1737 #endif  // TENSORFLOW_USE_ROCM
1738 
1739 }  // namespace
1740 
Status BaseGPUDeviceFactory::EnablePeerAccess(
1742     const std::vector<PlatformDeviceId>& visible_gpu_order) {
1743   se::Platform* gpu_manager = GPUMachineManager();
1744   int possible_peer_count = 0;
1745   int enabled_peer_count = 0;
1746   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1747     const PlatformDeviceId platform_gpu_i = visible_gpu_order[i];
1748     for (int j = 0; j < visible_gpu_order.size(); ++j) {
1749       const PlatformDeviceId platform_gpu_j = visible_gpu_order[j];
1750       // We have already validated that ExecutorForDevice() calls return OK.
1751       se::StreamExecutor* from =
1752           DeviceIdUtil::ExecutorForPlatformDeviceId(gpu_manager, platform_gpu_i)
1753               .ValueOrDie();
1754       se::StreamExecutor* to =
1755           DeviceIdUtil::ExecutorForPlatformDeviceId(gpu_manager, platform_gpu_j)
1756               .ValueOrDie();
1757 
1758       if (from->CanEnablePeerAccessTo(to)) {
1759         ++possible_peer_count;
1760         auto status = from->EnablePeerAccessTo(to);
1761         if (!status.ok()) {
1762           LOG(WARNING)
1763               << "Unable to enable peer access between device ordinals "
1764               << platform_gpu_i << " and " << platform_gpu_j
1765               << ", status: " << status;
1766         } else {
1767           ++enabled_peer_count;
1768         }
1769       }
1770     }
1771   }
1772 
1773   // Return an error in the extreme failure case where the driver
1774   // reported that peering was possible but not a single peering was
1775   // successful.  This is to catch possible system misconfigurations
1776   // or more fundamental issues.
1777   if (possible_peer_count > 0 && enabled_peer_count == 0) {
1778     return errors::Internal(possible_peer_count,
1779                             " potential peer access pairs were reported by the "
1780                             "driver, but no peering could be enabled.");
1781   }
1782   return Status::OK();
1783 }
1784 
Status BaseGPUDeviceFactory::GetValidDeviceIds(
1786     const std::vector<PlatformDeviceId>& visible_gpu_order,
1787     std::vector<PlatformDeviceId>* ids) {
1788   se::Platform* gpu_manager = GPUMachineManager();
1789   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1790     int visible_gpu_id = visible_gpu_order[i].value();
1791     auto description_status = gpu_manager->DescriptionForDevice(visible_gpu_id);
1792     if (!description_status.ok()) {
1793       return description_status.status();
1794     }
1795 
1796     auto description = description_status.ConsumeValueOrDie();
1797 #if GOOGLE_CUDA
1798     VLOG(1) << "Found device " << i << " with properties: "
1799             << "\npciBusID: " << description->pci_bus_id()
1800             << " name: " << description->name() << " computeCapability: "
1801             << description->cuda_compute_capability().ToString()
1802             << "\ncoreClock: " << description->clock_rate_ghz() << "GHz"
1803             << " coreCount: " << description->core_count()
1804             << " deviceMemorySize: "
1805             << strings::HumanReadableNumBytes(description->device_memory_size())
1806             << " deviceMemoryBandwidth: "
1807             << strings::HumanReadableNumBytes(description->memory_bandwidth())
1808             << "/s";
1809 #elif TENSORFLOW_USE_ROCM
1810     std::string gcn_arch_name = description->rocm_amdgpu_gcn_arch_name();
1811     VLOG(1) << "Found device " << i << " with properties: "
1812             << "\npciBusID: " << description->pci_bus_id()
1813             << " name: " << description->name()
1814             << "     ROCm AMDGPU Arch: " << gcn_arch_name
1815             << "\ncoreClock: " << description->clock_rate_ghz() << "GHz"
1816             << " coreCount: " << description->core_count()
1817             << " deviceMemorySize: "
1818             << strings::HumanReadableNumBytes(description->device_memory_size())
1819             << " deviceMemoryBandwidth: "
1820             << strings::HumanReadableNumBytes(description->memory_bandwidth())
1821             << "/s";
1822 #endif
1823   }
1824 
1825 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
1826   // Try to dlopen GPU libraries if they are supposed to be dynamically loaded.
1827   auto handle_or = se::internal::DsoLoader::MaybeTryDlopenGPULibraries();
1828   if (!handle_or.ok()) {
1829     LOG(WARNING) << "Cannot dlopen some GPU libraries. Please make sure the "
1830                     "missing libraries mentioned above are installed properly "
1831                     "if you would like to use GPU. Follow the guide at "
1832                     "https://www.tensorflow.org/install/gpu for how to "
1833                     "download and setup the required libraries for your "
1834                     "platform.\nSkipping registering "
1835                     "GPU devices...";
1836     return Status::OK();
1837   }
1838 #endif
1839 
1840 #if GOOGLE_CUDA
1841   auto cuda_supported_capabilities = GetSupportedCudaComputeCapabilities();
1842   if (cuda_supported_capabilities.empty()) {
1843     return errors::FailedPrecondition(
1844         "No supported cuda capabilities in binary.");
1845   }
1846   se::CudaComputeCapability min_supported_capability = *std::min_element(
1847       cuda_supported_capabilities.begin(), cuda_supported_capabilities.end());
1848 #elif TENSORFLOW_USE_ROCM
1849   auto rocm_supported_isas = GetSupportedAMDGPUISAVersions();
1850   if (rocm_supported_isas.empty()) {
1851     return errors::FailedPrecondition(
1852         "No supported rocm capabilities in binary.");
1853   }
1854   int min_supported_isa =
1855       *std::min_element(rocm_supported_isas.begin(), rocm_supported_isas.end());
1856 #endif
1857 
1858   int min_gpu_core_count =
1859       GetMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1860 
1861   // Filter out devices that don't have the right capability or power.
1862   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1863     const PlatformDeviceId visible_gpu_id = visible_gpu_order[i];
1864     auto description_status =
1865         gpu_manager->DescriptionForDevice(visible_gpu_id.value());
1866     if (!description_status.ok()) {
1867       LOG(INFO) << "Ignoring visible gpu device " << visible_gpu_id
1868                 << " whose executor is in invalid state: "
1869                 << description_status.status().ToString();
1870       continue;
1871     }
1872 
1873     auto desc = description_status.ConsumeValueOrDie();
1874 
1875 #if GOOGLE_CUDA
    // Only GPUs with at least the minimum supported compute capability are
    // accepted.
1878     if (desc->cuda_compute_capability() < min_supported_capability) {
1879       LOG(INFO) << "Ignoring visible gpu device "
1880                 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1881                 << ") "
1882                 << "with Cuda compute capability "
1883                 << desc->cuda_compute_capability().ToString()
1884                 << ". The minimum required Cuda capability is "
1885                 << min_supported_capability.ToString() << ".";
1886       continue;
1887     }
1888 #elif TENSORFLOW_USE_ROCM
1889     int device_isa;
1890     if (!desc->rocm_amdgpu_isa_version(&device_isa)) {
1891       continue;
1892     }
    // Only GPUs with at least the minimum supported AMDGPU ISA version are
    // accepted.
1895     if (device_isa < min_supported_isa) {
1896       LOG(INFO) << "Ignoring visible gpu device "
1897                 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1898                 << ") "
1899                 << "with AMDGPU ISA gfx" << device_isa
1900                 << ". The minimum required AMDGPU ISA is gfx"
1901                 << min_supported_isa << ".";
1902       continue;
1903     }
1904 #endif
1905 
1906     // Filter out slow GPUs. By default, GPUs with a lower multiprocessor
1907     // count than the fastest GPU are filtered out, unless they have 8 or more
1908     // multiprocessors. If the TF_MIN_GPU_MULTIPROCESSOR_COUNT environment
1909     // variable is set, its value will be used to filter out GPUs.
1910     if (desc->core_count() < min_gpu_core_count) {
1911       LOG(INFO) << "Ignoring visible gpu device "
1912                 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1913                 << ") "
1914                 << "with core count: " << desc->core_count()
1915                 << ". The minimum required count is " << min_gpu_core_count
1916                 << ". You can adjust this requirement with the env var "
1917                    "TF_MIN_GPU_MULTIPROCESSOR_COUNT.";
1918       continue;
1919     }
1920 
1921 #if defined(GOOGLE_CUDA) && !defined(PLATFORM_GOOGLE)
1922     // Compare compute capability of device with list in cuda_config.h and
1923     // warn about long loading time when we need to JIT kernels from PTX.
1924     auto compute_capabilities = {TF_CUDA_COMPUTE_CAPABILITIES};
1925     auto device_capability = desc->cuda_compute_capability();
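    // A CUBIN built for compute capability X.Y runs on a device of capability
    // X.Z only when Z >= Y and the major versions match; e.g. an sm_70 kernel
    // runs on a 7.5 device, but no 7.x CUBIN runs on an 8.x device.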
1926     if (std::count_if(std::cbegin(compute_capabilities),
1927                       std::cend(compute_capabilities), [&](int cc) {
1928                         // CUBINs are backwards compatible within major version.
1929                         return cc / 10 == device_capability.major &&
1930                                cc % 10 <= device_capability.minor;
1931                       }) == 0) {
1932       LOG(WARNING)
1933           << "TensorFlow was not built with CUDA kernel binaries "
1934              "compatible with compute capability "
1935           << device_capability.ToString() << ". CUDA kernels will be "
1936           << "jit-compiled from PTX, which could take 30 minutes or longer.";
1937     }
1938 #endif  // defined(GOOGLE_CUDA) && !defined(PLATFORM_GOOGLE)
1939 
1940     ids->push_back(visible_gpu_id);
1941   }
1942   if (!ids->empty()) {
1943     std::vector<int> raw_ids(ids->size());
1944     std::transform(ids->begin(), ids->end(), raw_ids.begin(),
1945                    [](PlatformDeviceId id) -> int { return id.value(); });
1946     VLOG(1) << "Adding visible gpu devices: " << absl::StrJoin(raw_ids, ", ");
1947   }
1948 
1949   return Status::OK();
1950 }
1951 
uint64 BaseGPUDevice::SafeAllocFrontier(uint64 old_value) {
1953   if (timestamped_allocator_) {
1954     return kernel_tracker_->LastTerminatedCount(old_value);
1955   } else {
1956     return 0;
1957   }
1958 }
1959 
int BaseGPUDevice::PendingKernels() {
1961   if (kernel_tracker_) {
1962     return kernel_tracker_->NumPending();
1963   }
1964   return 0;
1965 }
1966 
void BaseGPUDevice::TestOnlyReset() {
1968   StreamGroupFactory::Global().TestOnlyReset();
1969 }
1970 
uint64 GPUKernelTracker::MaybeQueue(OpKernelContext* ctx) {
1972   mutex_lock l(mu_);
1973   ++ops_since_last_;
1974   int64_t mem_used =
1975       ctx->persistent_memory_allocated() + ctx->temp_memory_allocated();
1976   VLOG(2) << "kernel: " << ctx->op_kernel().name() << " mem_used: " << mem_used;
1977   mem_since_last_ += mem_used;
1978   int weight = 1;
1979   // Note that if all {max_bytes, max_interval, max_pending} are zero then
1980   // we track every single kernel with no pending cap.  This can happen if
1981   // timestamped_allocator alone was specified.
1982   if ((mem_since_last_ < params_.max_bytes) &&
1983       (ops_since_last_ < params_.max_interval)) {
1984     return 0;
1985   } else {
1986     weight = std::min(
1987         params_.max_pending,
1988         std::max(1, mem_since_last_ / std::max(16386, params_.max_bytes)));
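    // The weight approximates how many "typical" kernels this launch stands
    // for: roughly one unit per max_bytes of memory used since the last queued
    // event, clamped to at least 1 and at most max_pending.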
1989     mem_since_last_ = 0;
1990     ops_since_last_ = 0;
1991   }
1992   uint64 queued_count = timing_counter_->next();
1993   RecordQueued(queued_count, weight);
1994   return queued_count;
1995 }
1996 
void GPUKernelTracker::RecordQueued(uint64 queued_count, int weight) {
1998   VLOG(2) << "RecordQueued queued_count=" << queued_count
1999           << " first_available_=" << first_available_
2000           << " last_completed_=" << last_completed_
2001           << " num_pending_=" << num_pending_;
2002   pending_kernels_[first_available_].queued_count = queued_count;
2003   pending_kernels_[first_available_].weight = weight;
2004   pending_kernels_[first_available_].terminated = false;
2005   ++first_available_;
2006   num_pending_ += weight;
2007   if (first_available_ >= pending_kernels_.size()) {
2008     if (last_completed_ >= 0) {
2009       // wrap
2010       first_available_ = 0;
2011     } else {
2012       // enlarge the ring buffer
2013       pending_kernels_.resize(2 * pending_kernels_.size());
2014     }
2015   }
2016   if (first_available_ == last_completed_) {
2017     // Ring buffer is full: double it.  All of the same valid PendingKernel
2018     // entries exist after the copy, they are just shifted to begin
2019     // at index 0 in the new array.
2020     std::vector<PendingKernel> new_buffer(pending_kernels_.size() * 2);
2021     for (int i = 0; i < pending_kernels_.size(); ++i) {
2022       int j = (i + last_completed_) % pending_kernels_.size();
2023       new_buffer[i] = pending_kernels_[j];
2024     }
2025     last_completed_ = 0;
2026     first_available_ = pending_kernels_.size();
2027     pending_kernels_.swap(new_buffer);
2028     VLOG(1) << "last_completed_=" << last_completed_
2029             << " first_available_=" << first_available_
2030             << " num_pending_=" << num_pending_;
2031   }
2032   DCHECK_NE(first_available_, last_completed_) << "exhausted pending_kernels";
2033 }
2034 
2035 // Called by LastTerminatedCount() when new_value is equal to old_value.  This
// case can occur when an allocation failed and waited for memory to be freed,
// and then, when it retried, the safe allocation frontier had not advanced
// because no tracking event had matured.  Maybe GPU progress stalled on an i/o
2039 // event, or maybe we're tracking at too infrequent an interval.  In any case if
2040 // the GPU compute queue is actually empty it's safe to advance the safe
2041 // frontier so that this request can allocate from unrestricted (and better
2042 // compacted) memory.  So queue an event on the compute stream to ensure the
2043 // frontier does advance.
void GPUKernelTracker::MaybeQueueProgressEvent() {
2045   mutex_lock l(mu_);
2046   if (num_pending_ == 0) {
2047     uint64 new_count = timing_counter_->next();
2048     RecordQueued(new_count, 1);
2049     em_->ThenExecute(stream_,
2050                      [this, new_count]() { RecordTerminated(new_count); });
2051   }
2052 }
2053 
void GPUKernelTracker::RecordTerminated(uint64 queued_count) {
2055   mutex_lock l(mu_);
2056   VLOG(2) << this << " RecordTerminated queued_count=" << queued_count
2057           << " first_available_=" << first_available_
2058           << " last_completed_=" << last_completed_
2059           << " num_pending_=" << num_pending_ << " LC="
2060           << ((last_completed_ >= 0)
2061                   ? pending_kernels_[last_completed_].queued_count
2062                   : -1);
2063   DCHECK_NE(first_available_, last_completed_);
2064   DCHECK_GT(num_pending_, 0);
2065   // Starting just past the last completed entry, find the entry with
2066   // this queued_count and mark it done.
2067   int index = (last_completed_ + 1) % pending_kernels_.size();
2068   int weight = 1;
2069   while (true) {
2070     if (index == first_available_) {
2071       // This should never happen.
2072       LOG(FATAL) << "Failed to find " << queued_count  // Crash OK
2073                  << " in queue, last_completed_=" << last_completed_
2074                  << " index=" << index
2075                  << " first_available_=" << first_available_
2076                  << " pending_kernels_.size()=" << pending_kernels_.size();
2077     }
2078     if (pending_kernels_[index].queued_count == queued_count) {
2079       pending_kernels_[index].terminated = true;
2080       weight = pending_kernels_[index].weight;
2081       break;
2082     }
2083     index = (index + 1) % pending_kernels_.size();
2084   }
2085   // Next move last_completed_ forward past all completed kernels.  In theory
2086   // kernels should always complete in queued order so we should be able to
2087   // advance the completed frontier to the just-completed PendingKernel.  In
2088   // practice we occasionally see the termination callbacks arrive out of
2089   // order probably because of thread scheduling.  Eventually we may support
// out-of-order completion involving multiple compute streams so here we
2091   // follow a conservative approach and wait for every single callback to
2092   // arrive before advancing the frontier.
2093   while (true) {
2094     int next_index = (last_completed_ + 1) % pending_kernels_.size();
2095     if (next_index == first_available_) break;
2096     if (pending_kernels_[next_index].terminated) {
2097       last_completed_ = next_index;
2098     } else {
2099       break;
2100     }
2101   }
2102   if (last_completed_ >= 0) {
2103     int64_t v = pending_kernels_[last_completed_].queued_count;
2104     last_terminated_count_ = v;
2105     if (allocator_) {
2106       allocator_->SetSafeFrontier(v);
2107     }
2108   }
2109   // Last decrease num_pending before maybe waking a waiter.
2110   num_pending_ -= weight;
2111   pending_decreased_.notify_all();
2112 }
2113 
2114 }  // namespace tensorflow
2115 
2116 #endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
2117