1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 // TODO(opensource): Use a more generic sounding preprocessor name than
17 // GOOGLE_CUDA
18 #if (defined(GOOGLE_CUDA) && GOOGLE_CUDA) || \
19 (defined(TENSORFLOW_USE_ROCM) && TENSORFLOW_USE_ROCM)
20
21 #if TENSORFLOW_USE_ROCM
22 #include "rocm/include/hip/hip_runtime.h"
23 #endif
24
25 #define EIGEN_USE_GPU
26
27 #include <stdlib.h>
28 #include <string.h>
29
30 #include <algorithm>
31 #include <list>
32 #include <map>
33 #include <tuple>
34 #include <vector>
35
36 #include "absl/container/flat_hash_set.h"
37 #include "absl/strings/str_split.h"
38 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
39 #include "tensorflow/core/common_runtime/device/device_event_mgr.h"
40 #include "tensorflow/core/common_runtime/device/device_id_utils.h"
41 #include "tensorflow/core/common_runtime/device_factory.h"
42 #include "tensorflow/core/common_runtime/gpu/gpu_device.h"
43 #include "tensorflow/core/common_runtime/gpu/gpu_id.h"
44 #include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
45 #include "tensorflow/core/common_runtime/gpu/gpu_init.h"
46 #include "tensorflow/core/common_runtime/gpu/gpu_process_state.h"
47 #include "tensorflow/core/common_runtime/gpu/gpu_util.h"
48 #include "tensorflow/core/common_runtime/gpu_device_context.h"
49 #include "tensorflow/core/common_runtime/local_device.h"
50 #include "tensorflow/core/framework/allocator.h"
51 #include "tensorflow/core/framework/device_base.h"
52 #include "tensorflow/core/framework/op_kernel.h"
53 #include "tensorflow/core/framework/tensor.h"
54 #include "tensorflow/core/framework/tensor.pb.h"
55 #include "tensorflow/core/framework/types.h"
56 #include "tensorflow/core/framework/variant_op_registry.h"
57 #include "tensorflow/core/graph/types.h"
58 #include "tensorflow/core/lib/core/errors.h"
59 #include "tensorflow/core/lib/core/status.h"
60 #include "tensorflow/core/lib/strings/numbers.h"
61 #include "tensorflow/core/lib/strings/str_util.h"
62 #include "tensorflow/core/lib/strings/strcat.h"
63 #if GOOGLE_CUDA
64 #include "third_party/gpus/cudnn/cudnn.h"
65 #include "tensorflow/stream_executor/cuda/cuda_activation.h"
66 #elif TENSORFLOW_USE_ROCM
67 #include "tensorflow/core/platform/rocm.h"
68 #endif
69 #include "tensorflow/core/platform/logging.h"
70 #include "tensorflow/core/platform/macros.h"
71 #include "tensorflow/core/platform/stream_executor.h"
72 #include "tensorflow/core/platform/types.h"
73 #include "tensorflow/core/profiler/lib/scoped_annotation.h"
74 #include "tensorflow/core/public/session_options.h"
75 #include "tensorflow/core/util/device_name_utils.h"
76 #include "tensorflow/core/util/env_var.h"
77 #include "tensorflow/core/util/stream_executor_util.h"
78 #include "tensorflow/stream_executor/gpu/gpu_stream.h"
79 #include "tensorflow/stream_executor/platform/dso_loader.h"
80
81 #if !defined(PLATFORM_GOOGLE)
82 #if GOOGLE_CUDA
83 #include "third_party/gpus/cuda/cuda_config.h"
84 #endif
85 #endif
86
87 namespace tensorflow {
88
89 #if GOOGLE_CUDA
90
91 typedef cudaStream_t gpuStream_t;
92 typedef cudaDeviceProp gpuDeviceProp_t;
93 #define EIGEN_GPU_SCRATCH_SIZE (Eigen::kGpuScratchSize)
94 using se::cuda::ScopedActivateExecutorContext;
95
96 #elif TENSORFLOW_USE_ROCM
97
98 typedef hipStream_t gpuStream_t;
99 typedef hipDeviceProp_t gpuDeviceProp_t;
100 #define EIGEN_GPU_SCRATCH_SIZE (Eigen::kGpuScratchSize)
101 using se::rocm::ScopedActivateExecutorContext;
102
103 #endif
104
105 // Eigen Ops directly allocate memory only for temporary buffers used
106 // during OpKernel::Compute(). The recommended way of allocating such
107 // memory is via OpKernelContext::allocate_temp(). However, Eigen Ops
108 // don't have access to OpKernelContext, instead they get access to
109 // memory directly through the device allocator. As an Open Source
110 // project, Eigen assumes allocator semantics similar to those of the
111 // CUDA or ROCm memory allocator, and may not work correctly due to race
112 // conditions if used with some other allocator. For safety, we need
113 // to delay deallocation calls out of Eigen until all events on the
114 // corresponding stream have completed. The following two classes
115 // serve this purpose in two different compilation environments.
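// Illustrative sketch (not part of the implementation) of the deferred-free
// behavior described above, using a hypothetical Eigen kernel launch:
//
//   void* buf = eigen_device.allocate(num_bytes);  // backed by the TF GPU allocator
//   LaunchSomeEigenKernel(eigen_device, buf);      // work enqueued on eigen_device.stream()
//   eigen_device.deallocate(buf);  // returns immediately; the underlying
//                                  // DeallocateRaw() runs in a stream callback
//                                  // only after the enqueued work completes.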
116
117 class EigenGpuStreamDevice : public ::Eigen::StreamInterface {
118 public:
119   EigenGpuStreamDevice()
120 : scratch_(nullptr), semaphore_(nullptr), context_(nullptr) {}
121   ~EigenGpuStreamDevice() override {}
122   void Reinitialize(OpKernelContext* context, const gpuStream_t* gpu_stream,
123 TfDeviceId tf_device_id, ::tensorflow::Allocator* alloc,
124 char* scratch) {
125 if (LogMemory::IsEnabled()) {
126 operation_ = context->op_kernel().name() + "/EigenAllocator";
127 step_id_ = context->step_id();
128 }
129 context_ = context;
130 scratch_ = scratch;
131 semaphore_ =
132 reinterpret_cast<unsigned int*>(scratch + Eigen::kGpuScratchSize);
133 stream_ = gpu_stream;
134 allocator_ = alloc;
135 PlatformDeviceId platform_device_id;
136 TF_CHECK_OK(
137 GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
138 device_prop_ = &Eigen::GetGpuDeviceProperties(platform_device_id.value());
139 }
140
141   const gpuStream_t& stream() const override { return *stream_; }
142   const gpuDeviceProp_t& deviceProperties() const override {
143 return *device_prop_;
144 }
145
146   void* allocate(size_t num_bytes) const override {
147 void* ret = allocator_->AllocateRaw(32 /* alignment */, num_bytes);
148 if (ret == nullptr) {
149 if (context_) {
150 context_->SetStatus(errors::ResourceExhausted(
151 strings::StrCat("Ran out of GPU memory when allocating ", num_bytes,
152 " bytes for ", operation_)));
153 } else {
154 LOG(FATAL)
155 << "EigenAllocator for GPU ran out of memory when allocating "
156 << num_bytes << ". See error logs for more detailed info.";
157 }
158 }
159 if (LogMemory::IsEnabled() && ret != nullptr) {
160 LogMemory::RecordRawAllocation(operation_, step_id_, num_bytes, ret,
161 allocator_);
162 }
163 return ret;
164 }
165   void deallocate(void* buffer) const override {
166 if (LogMemory::IsEnabled() && buffer != nullptr) {
167 LogMemory::RecordRawDeallocation(operation_, step_id_, buffer, allocator_,
168 true);
169 }
170 AsyncFreeData* afData =
171 new AsyncFreeData(allocator_, buffer, operation_, step_id_);
172 #if GOOGLE_CUDA
173 cudaError_t err = cudaStreamAddCallback(*stream_, asyncFree, afData, 0);
174 CHECK_EQ(err, cudaSuccess);
175 #elif TENSORFLOW_USE_ROCM
176 hipError_t err = hipStreamAddCallback(*stream_, asyncFree, afData, 0);
177 CHECK_EQ(err, hipSuccess);
178 #endif
179 }
180
181   // Returns a pointer to a per-stream scratchpad of 1024 bytes residing
182   // in global memory.
183   void* scratchpad() const override { return scratch_; }
184
185   // Returns a semaphore. The semaphore is initialized to 0, and each
186   // kernel using it is responsible for resetting it to 0 upon completion,
187   // maintaining the invariant that the semaphore equals 0 at the start of
188   // each kernel.
189   unsigned int* semaphore() const override { return semaphore_; }
190
191 private:
192 struct AsyncFreeData {
193     AsyncFreeData(::tensorflow::Allocator* a, void* p, const string& o,
194 const int64 s)
195 : allocator_(a), address_(p), operation_(o), step_id_(s) {}
196 ::tensorflow::Allocator* allocator_;
197 void* address_;
198 const string operation_;
199 const int64 step_id_;
200 };
201
202 #if GOOGLE_CUDA
203   static void CUDART_CB asyncFree(gpuStream_t stream, cudaError_t status,
204 void* userData)
205 #elif TENSORFLOW_USE_ROCM
206 static void asyncFree(gpuStream_t stream, hipError_t status, void* userData)
207 #endif
208 {
209 AsyncFreeData* data = static_cast<AsyncFreeData*>(userData);
210 if (LogMemory::IsEnabled()) {
211 LogMemory::RecordRawDeallocation(data->operation_, data->step_id_,
212 data->address_, data->allocator_, false);
213 }
214 data->allocator_->DeallocateRaw(data->address_);
215 delete data;
216 }
217
218 string operation_;
219 int64 step_id_;
220 const gpuStream_t* stream_; // Not owned.
221 const gpuDeviceProp_t* device_prop_; // Not owned.
222 ::tensorflow::Allocator* allocator_; // Not owned.
223 mutable char* scratch_;
224 mutable unsigned int* semaphore_;
225 OpKernelContext* context_;
226
227 TF_DISALLOW_COPY_AND_ASSIGN(EigenGpuStreamDevice);
228 };
229
230 // This factory helps to ensure that different GPU device objects that refer to
231 // the same physical device and stream group id use the same stream group
232 // object (and therefore the same CUDA streams). This is necessary since there
233 // is a single memory allocator per device (see ProcessState::GetGPUAllocator)
234 // and allocators must not be shared across streams.
235 class BaseGPUDevice::StreamGroupFactory {
236 public:
237 // Returns the unique stream group for use with the stream defined by
238 // {tf_device_id, stream_group_within_gpu}, creating it if it does not yet
239 // exist.
240 // This function is thread safe.
241   BaseGPUDevice::StreamGroup* GetOrCreate(TfDeviceId tf_device_id,
242 int stream_group_within_gpu,
243 se::StreamExecutor* executor,
244 const GPUOptions& options) {
245 mutex_lock guard(lock_);
246 StreamGroup* group =
247 &streams_[key_type(tf_device_id.value(), stream_group_within_gpu)];
248 if (!group->compute) {
249 int priority = GetPriority(tf_device_id.value(), options);
250 group->priority = priority;
251 group->compute = GetStream(executor, priority);
252 group->compute->Init();
253 VLOG(2) << "Created stream[" << stream_group_within_gpu
254 << "] = " << group->compute << " with priority: " << priority;
255
256 #if TENSORFLOW_USE_ROCM
257 // ROCm streams are lightweight and will not necessarily trigger device
258 // queue init until they are first used. For optimal performance,
259 // compute and nccl streams must be immediate siblings.
260 group->nccl = GetStream(executor, priority);
261 group->nccl->Init();
262 VLOG(2) << "Created nccl_stream[" << stream_group_within_gpu
263 << "] = " << group->nccl;
264
265 // Force underlying resource creation now.
266 group->compute->ThenWaitFor(group->nccl);
267 group->nccl->ThenWaitFor(group->compute);
268 #endif
269
270 group->host_to_device = GetStream(executor, priority);
271 group->host_to_device->Init();
272 VLOG(2) << "Created host_to_device_stream[" << stream_group_within_gpu
273 << "] = " << group->host_to_device;
274
275 group->device_to_host = GetStream(executor, priority);
276 group->device_to_host->Init();
277 VLOG(2) << "Created device_to_host_stream[" << stream_group_within_gpu
278 << "] = " << group->device_to_host;
279
280 int num_d2d_streams =
281 options.experimental().num_dev_to_dev_copy_streams();
282 if (num_d2d_streams == 0) num_d2d_streams = 1;
283 if (num_d2d_streams < 1 || num_d2d_streams > 4) {
284 LOG(ERROR)
285 << "Illegal GPUOptions.experimental.num_dev_to_dev_copy_streams="
286             << num_d2d_streams << "; using 1 instead.";
287 num_d2d_streams = 1;
288 }
289 for (int i = 0; i < num_d2d_streams; ++i) {
290 se::Stream* stream = GetStream(executor, priority);
291 stream->Init();
292 group->device_to_device.push_back(stream);
293 VLOG(2) << "Created device_to_device_stream[" << stream_group_within_gpu
294 << "] = " << group->device_to_device.back();
295 }
296 }
297 return group;
298 }
299
300 // Returns a reference to the StreamGroupFactory singleton. Note that this is
301 // never destroyed, so the objects it owns are never deleted.
302   static StreamGroupFactory& Global() {
303 static StreamGroupFactory* instance = new StreamGroupFactory();
304 return *instance;
305 }
306
307 // Helper method for unit tests to reset the streams. Never use in production.
308   void TestOnlyReset() {
309 mutex_lock guard(lock_);
310 for (auto& item : streams_) {
311 auto& stream = item.second;
312 if (stream.compute) {
313 delete stream.compute;
314 stream.compute = nullptr;
315 }
316 #if TENSORFLOW_USE_ROCM
317 if (stream.nccl) {
318 delete stream.nccl;
319 stream.nccl = nullptr;
320 }
321 #endif
322 if (stream.host_to_device) {
323 delete stream.host_to_device;
324 stream.host_to_device = nullptr;
325 }
326 if (stream.device_to_host) {
327 delete stream.device_to_host;
328 stream.device_to_host = nullptr;
329 }
330 while (!stream.device_to_device.empty()) {
331 auto back = stream.device_to_device.back();
332 if (back) {
333 delete back;
334 }
335 stream.device_to_device.pop_back();
336 }
337 }
338 streams_.clear();
339 }
340
341 private:
342 // Returns priority for the given virtual GPU id from the session options.
343 // Returns 0 if no virtual devices are specified.
344   int GetPriority(int tf_device_id, const GPUOptions& options) {
345 int id = tf_device_id;
346 int i = 0;
347 int priority = 0;
348 while (i < options.experimental().virtual_devices_size()) {
349 const int size =
350 options.experimental().virtual_devices().Get(i).priority_size();
351 if (id >= size) {
352 id -= size;
353 } else {
354 priority =
355 options.experimental().virtual_devices().Get(i).priority().Get(id);
356 break;
357 }
358 i++;
359 }
360 return priority;
361 }
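  // Worked example (illustrative): with virtual_devices = [{priority: [0, -1]},
  // {priority: [0]}], tf_device_id 0 maps to priority 0 and tf_device_id 1 to
  // priority -1 (both on the first physical GPU), while tf_device_id 2 maps to
  // priority 0 on the second GPU. With no virtual devices configured, the loop
  // body never runs and the default priority 0 is returned.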
362
363   // Returns a Stream whose underlying GpuStream has the given priority.
364   se::Stream* GetStream(se::StreamExecutor* executor, int priority) {
365 auto stream = new se::Stream(executor);
366 static_cast<stream_executor::gpu::GpuStream*>(stream->implementation())
367 ->SetPriority(priority);
368 return stream;
369 }
370
371 mutex lock_;
372 using key_type = std::tuple<int, int>;
373 std::map<key_type, StreamGroup> streams_;
374
375 // StreamGroupFactory cannot be created directly; Call
376 // StreamGroupFactory::Global() to get the global instance.
377 StreamGroupFactory() = default;
378 TF_DISALLOW_COPY_AND_ASSIGN(StreamGroupFactory);
379 };
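// Usage sketch: BaseGPUDevice::Init() below obtains its stream group via
//   stream_ = StreamGroupFactory::Global().GetOrCreate(
//       tf_device_id_, /*stream_group_within_gpu=*/0, executor_,
//       options.config.gpu_options());
// so any two BaseGPUDevice instances that share a {device, stream group} key
// also share the same underlying CUDA/ROCm streams.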
380
381 BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
382 Bytes memory_limit, const DeviceLocality& locality,
383 TfDeviceId tf_device_id,
384 const string& physical_device_desc,
385 Allocator* gpu_allocator, Allocator* cpu_allocator,
386 bool sync_every_op)
387 : LocalDevice(options, Device::BuildDeviceAttributes(name, DEVICE_GPU,
388 memory_limit, locality,
389 physical_device_desc)),
390 gpu_allocator_(gpu_allocator),
391 cpu_allocator_(cpu_allocator),
392 scoped_allocator_mgr_(new ScopedAllocatorMgr(name)),
393 tf_device_id_(tf_device_id),
394 sync_every_op_(sync_every_op) {
395 GPUProcessState::singleton()->EnableGPUDevice();
396 }
397
398 BaseGPUDevice::~BaseGPUDevice() {
399 delete gpu_device_info_;
400 gpu_allocator_->DeallocateRaw(scratch_);
401 device_context_->Unref();
402 }
403
404 // This is idempotent: calling it again after initialization is a no-op.
405 Status BaseGPUDevice::InitScratchBuffers() {
406 mutex_lock l(scratch_init_mutex_);
407 if (!scratch_) {
408 DCHECK(stream_);
409 size_t scratch_buffer_size = Eigen::kGpuScratchSize + sizeof(unsigned int);
410 ScopedMemoryDebugAnnotation op_annotation("ScratchBuffer");
411 void* scratch_buffer = gpu_allocator_->AllocateRaw(
412 Allocator::kAllocatorAlignment, scratch_buffer_size);
413 if (scratch_buffer == nullptr) {
414 return errors::FailedPrecondition(
415 "Failed to allocate scratch buffer for device ",
416 tf_device_id_.value());
417 }
418 se::DeviceMemory<char> mem(
419 se::DeviceMemoryBase(scratch_buffer, scratch_buffer_size));
420 TF_RETURN_IF_ERROR(executor_->SynchronousMemZero(
421 &mem, Eigen::kGpuScratchSize + sizeof(unsigned int)));
422 scratch_ = static_cast<char*>(scratch_buffer);
423 }
424 return Status::OK();
425 }
426
427 Status BaseGPUDevice::Init(const SessionOptions& options) {
428 auto executor_status = DeviceIdUtil::ExecutorForTfDeviceId(
429 DEVICE_GPU, GPUMachineManager(), tf_device_id_);
430 if (!executor_status.status().ok()) {
431 return errors::Internal("Failed to get StreamExecutor for device ",
432 tf_device_id_.value());
433 }
434
435 executor_ = executor_status.ValueOrDie();
436
437 stream_ = StreamGroupFactory::Global().GetOrCreate(
438 tf_device_id_, 0, executor_, options.config.gpu_options());
439 device_context_ =
440 new GPUDeviceContext(0, stream_->compute,
441 #if TENSORFLOW_USE_ROCM
442 stream_->nccl,
443 #endif
444 stream_->host_to_device, stream_->device_to_host,
445 stream_->device_to_device);
446
447 em_ = EventMgrFactory::Singleton()->GetEventMgr(executor_,
448 options.config.gpu_options());
449
450 GPUKernelTracker::Params tracker_params(
451 options.config.gpu_options().experimental().kernel_tracker_max_interval(),
452 options.config.gpu_options().experimental().kernel_tracker_max_bytes(),
453 options.config.gpu_options().experimental().kernel_tracker_max_pending());
454 timestamped_allocator_ =
455 options.config.gpu_options().experimental().timestamped_allocator();
456 pending_cap_ = tracker_params.max_pending;
457 if (timestamped_allocator_ ||
458 (tracker_params.max_interval > 0 || tracker_params.max_bytes > 0 ||
459 tracker_params.max_pending > 0)) {
460 SharedCounter* timing_counter = nullptr;
461 if (timestamped_allocator_) {
462 // In this case the SharedCounter was already created and set in the
463 // associated Allocator, with ownership by GPUProcessState.
464 // The GPUKernelTracker will use this SharedCounter, instead of
465 // owning its own.
466 timing_counter =
467 GPUProcessState::singleton()->GPUAllocatorCounter(tf_device_id_);
468 DCHECK(timing_counter);
469 }
470 kernel_tracker_.reset(new GPUKernelTracker(
471 tracker_params, Env::Default(), stream_->compute, timing_counter,
472 timestamped_allocator_ ? gpu_allocator_ : nullptr, em_));
473 }
474
475 gpu_device_info_ = new GpuDeviceInfo;
476 gpu_device_info_->stream = stream_->compute;
477 gpu_device_info_->default_context = device_context_;
478 gpu_device_info_->event_mgr = em_;
479 PlatformDeviceId platform_device_id;
480 TF_RETURN_IF_ERROR(
481 GpuIdManager::TfToPlatformDeviceId(tf_device_id_, &platform_device_id));
482 gpu_device_info_->gpu_id = platform_device_id.value();
483 set_tensorflow_gpu_device_info(gpu_device_info_);
484
485 // Whether and how the GPU device uses its own threadpool.
486 // This option is experimental. Once we confirm the best setting, we
487 // may change the default behavior and completely remove this flag.
488 // Default values might change in future releases.
489 // Possible values:
490 // * global: GPU uses threads shared with CPU in the main compute
491 // thread-pool. This is currently the default.
492 // * gpu_private: GPU uses threads dedicated to this device.
493 // * gpu_shared: All GPUs share a dedicated thread pool.
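  // Example (illustrative): launching a job with
  //   TF_GPU_THREAD_MODE=gpu_private TF_GPU_THREAD_COUNT=4
  // gives each GPU device its own four-thread pool instead of the shared
  // compute pool; the thread count of 4 here is only an example value.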
494 string gpu_thread_mode;
495 TF_RETURN_IF_ERROR(
496 ReadStringFromEnvVar("TF_GPU_THREAD_MODE", "global", &gpu_thread_mode));
497 gpu_thread_mode = absl::AsciiStrToLower(gpu_thread_mode);
498 if (gpu_thread_mode != "global") {
499 int64_t gpu_thread_count = -1;
500 // Default to two threads. One for device compute and another for memory
501 // copies.
502 TF_RETURN_IF_ERROR(
503 ReadInt64FromEnvVar("TF_GPU_THREAD_COUNT", 2, &gpu_thread_count));
504 if (gpu_thread_mode == "gpu_private") {
505 // TODO(zhengxq): since these threads only serve a single GPU device,
506 // we should set the device context once for each thread, and avoid
507 // setting them for each kernel.
508 // TODO(zhengxq): pin the thread to the same socket of the target GPU.
509 thread_pool_.reset(new thread::ThreadPool(
510 options.env, ThreadOptions(),
511 strings::StrCat("gpu_private_", tf_device_id_.value()),
512 static_cast<int32>(gpu_thread_count),
513 !options.config.experimental().disable_thread_spinning(),
514 /*allocator=*/nullptr));
515 set_tensorflow_device_thread_pool(thread_pool_.get());
516 } else if (gpu_thread_mode == "gpu_shared") {
517 static thread::ThreadPool* thread_pool = new thread::ThreadPool(
518 options.env, ThreadOptions(), "gpu_shared",
519 static_cast<int32>(gpu_thread_count),
520 !options.config.experimental().disable_thread_spinning(),
521 /*allocator=*/nullptr);
522 set_tensorflow_device_thread_pool(thread_pool);
523 } else {
524 string error_message =
525 strings::StrCat("Invalid gpu_thread_mode: ", gpu_thread_mode);
526 LOG(WARNING) << error_message;
527 return errors::InvalidArgument(error_message);
528 }
529 }
530
531 return Status::OK();
532 }
533
534 string BaseGPUDevice::ComputeOpKernelDebugString(const OpKernel& op_kernel,
535 const int& stream_id) {
536 return strings::StrCat(op_kernel.name(), " op ", op_kernel.type_string(),
537 " on GPU ", tf_device_id_.value(), " stream[",
538 stream_id, "]");
539 }
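// Example output (illustrative): "MatMul_1 op MatMul on GPU 0 stream[0]".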
540
541 namespace {
542 const absl::flat_hash_set<std::string>* GetOpsToLogFromEnv() {
543 auto* result = new absl::flat_hash_set<std::string>;
544 const char* env = getenv("TF_GPU_DEBUG_OPS_TO_LOG");
545 if (!env) {
546 return result;
547 }
548
549 std::vector<absl::string_view> ops = absl::StrSplit(env, ',');
550 LOG(INFO) << "Will log inputs & outputs from the following ops: ";
551 for (absl::string_view op : ops) {
552 result->insert(std::string(op));
553 LOG(INFO) << " |" << op << "|";
554 }
555
556 return result;
557 }
558
559 bool ShouldLogInputsAndOutputs(OpKernel* op_kernel) {
560 static const absl::flat_hash_set<std::string>& ops_to_log =
561 *GetOpsToLogFromEnv();
562 return ops_to_log.count(op_kernel->type_string());
563 }
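
// Example (illustrative): setting TF_GPU_DEBUG_OPS_TO_LOG="MatMul,Conv2D"
// makes Compute()/ComputeAsync() below log the inputs and outputs of every
// MatMul and Conv2D kernel executed on this device.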
564 } // namespace
565
566 Tensor BaseGPUDevice::CopyGpuTensorToHostDebugOnly(const Tensor& gpu_tensor) {
567 Tensor host_tensor(gpu_tensor.dtype(), gpu_tensor.shape());
568 CHECK(device_context_->stream()
569 ->ThenMemcpy(host_tensor.data(),
570 se::DeviceMemoryBase(gpu_tensor.data(),
571 gpu_tensor.TotalBytes()),
572 gpu_tensor.TotalBytes())
573 .BlockHostUntilDone()
574 .ok());
575 return host_tensor;
576 }
577
578 void BaseGPUDevice::LogInputs(OpKernel* op_kernel, OpKernelContext* context) {
579 LOG(INFO) << "Inputs for " << op_kernel->name() << " (total "
580 << context->num_inputs() << "):";
581 for (int i = 0; i < context->num_inputs(); i++) {
582 if (!context->has_input(i)) {
583 LOG(INFO) << "input # " << i << " is absent";
584 continue;
585 }
586
587 Tensor input = context->input_memory_type(i) == DEVICE_MEMORY
588 ? CopyGpuTensorToHostDebugOnly(context->input(i))
589 : context->input(i);
590
591 LOG(INFO) << "input # " << i;
592 LOG(INFO) << input.DebugString(-1);
593 }
594 LOG(INFO) << "";
595 }
596
597 void BaseGPUDevice::LogOutputs(OpKernel* op_kernel, OpKernelContext* context) {
598 if (!context->status().ok()) {
599 LOG(INFO) << op_kernel->name()
600 << " failed: " << context->status().error_message();
601 return;
602 }
603
604   LOG(INFO) << "Outputs for " << op_kernel->name() << " (total "
605             << context->num_outputs() << "):";
606 for (int i = 0; i < context->num_outputs(); i++) {
607 Tensor* output_ptr = context->mutable_output(i);
608 if (output_ptr == nullptr) {
609 LOG(INFO) << "output # " << i << " is null";
610 continue;
611 }
612
613 Tensor output = context->output_memory_type(i) == DEVICE_MEMORY
614 ? CopyGpuTensorToHostDebugOnly(*output_ptr)
615 : *output_ptr;
616
617 LOG(INFO) << "output # " << i;
618 LOG(INFO) << output.DebugString(-1);
619 }
620 LOG(INFO) << "";
621 }
622
623 void BaseGPUDevice::Compute(OpKernel* op_kernel, OpKernelContext* context) {
624 // NOTE(tucker): We need to discriminate between Eigen GPU
625 // operations and all others. If an operation is Eigen
626 // implemented (or otherwise tries to launch a GPU kernel
627 // directly), we need to establish a stacked-scoped environment
628 // that directs it to execute on the proper device. Otherwise we
629 // expect the Op to use StreamExecutor directly and correctly.
630 GPUDeviceContext* gpu_device_context = device_context_;
631 if (context->op_device_context() != nullptr) {
632 gpu_device_context =
633 static_cast<GPUDeviceContext*>(context->op_device_context());
634 }
635 se::Stream* stream = gpu_device_context->stream();
636 const auto stream_id = gpu_device_context->stream_id();
637
638 const bool vlog_1 = VLOG_IS_ON(1);
639
640 if (vlog_1) {
641 VLOG(1) << "GpuDevice::ComputeHelper "
642 << ComputeOpKernelDebugString(*op_kernel, stream_id);
643 }
644
645 if (kernel_tracker_.get()) {
646 context->set_record_memory_consumption(true);
647 if (pending_cap_ > 0) {
648 kernel_tracker_->PauseWhilePendingExceeds(pending_cap_);
649 }
650 }
651 ScopedActivateExecutorContext scoped_activation{stream->parent()};
652 ScopedMemoryDebugAnnotation op_annotation(op_kernel->name_view().data(),
653 context->step_id());
654 bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel);
655
656 if (should_log_inputs_and_outputs) {
657 LogInputs(op_kernel, context);
658 }
659
660 op_kernel->Compute(context);
661
662 if (should_log_inputs_and_outputs) {
663 LogOutputs(op_kernel, context);
664 }
665
666 if (context->status().ok()) {
667 if (sync_every_op_) {
668 // Note: GPUUtil::Sync() only syncs the default stream.
669 // We need to either sync the stream used by this op, or
670 // all streams. Given that this flag is typically used for
671 // debugging it makes more sense to sync all GPU activity.
672 context->SetStatus(GPUUtil::SyncAll(this));
673 if (vlog_1) {
674 VLOG(1) << "GpuDevice::ComputeHelper finished "
675 << ComputeOpKernelDebugString(*op_kernel, stream_id);
676 }
677 } else if (vlog_1) {
678 VLOG(1) << "GpuDevice::ComputeHelper scheduled "
679 << ComputeOpKernelDebugString(*op_kernel, stream_id);
680 }
681 if (kernel_tracker_) {
682 GPUKernelTracker* tracker = kernel_tracker_.get();
683 DCHECK(tracker);
684 uint64 queued_count = tracker->MaybeQueue(context);
685 if (queued_count > 0) {
686 em_->ThenExecute(stream, [tracker, queued_count]() {
687 tracker->RecordTerminated(queued_count);
688 });
689 }
690 }
691 } else {
692 if (vlog_1) {
693 VLOG(1) << "GpuDevice::ComputeHelper failed to schedule "
694 << ComputeOpKernelDebugString(*op_kernel, stream_id);
695 }
696 }
697 }
698
699 Status BaseGPUDevice::Sync() {
700 DCHECK_NE(stream_, nullptr);
701
702 // Device::Sync is supposed to block until all operations queued on the device
703 // at the time of the call have completed. On GPUs, only operations enqueued
704 // on the compute stream can remain pending after the (Async)OpKernel that
705 // enqueued the operation has completed. We do use other streams for copies
706 // and collectives, but in those cases the (Async)OpKernels themselves block
707 // until the queued operation has finished.
708 return stream_->compute->BlockHostUntilDone();
709 }
710
711 void BaseGPUDevice::ComputeAsync(AsyncOpKernel* op_kernel,
712 OpKernelContext* context,
713 AsyncOpKernel::DoneCallback done) {
714 GPUDeviceContext* gpu_device_context = device_context_;
715 if (context->op_device_context() != nullptr) {
716 gpu_device_context =
717 static_cast<GPUDeviceContext*>(context->op_device_context());
718 }
719 se::Stream* stream = gpu_device_context->stream();
720 const auto stream_id = gpu_device_context->stream_id();
721
722 VLOG(1) << "GpuDevice::ComputeAsync " << op_kernel->name() << " op "
723 << op_kernel->type_string() << " on GPU" << tf_device_id_
724 << " stream[" << stream_id << "]";
725
726 bool should_log_inputs_and_outputs = ShouldLogInputsAndOutputs(op_kernel);
727
728 if (should_log_inputs_and_outputs) {
729 LogInputs(op_kernel, context);
730 AsyncOpKernel::DoneCallback parent_done = done;
731 done = [this, parent_done, should_log_inputs_and_outputs, op_kernel,
732 context]() {
733 LogOutputs(op_kernel, context);
734 parent_done();
735 };
736 }
737
738 ScopedActivateExecutorContext scoped_activation{stream->parent()};
739 op_kernel->ComputeAsync(context, std::move(done));
740 }
741
742 Status BaseGPUDevice::MaybeCopyTensorToGPU(
743 const AllocatorAttributes& alloc_attrs, const Tensor& from, Tensor* to,
744 StatusCallback done) {
745 if (alloc_attrs.on_host()) {
746 *to = from;
747 done(Status::OK());
748 return Status::OK();
749 } else {
750 if (!DMAHelper::CanUseDMA(&from)) {
751 Status err = errors::Internal("GPU copy from non-DMA ",
752 DataTypeString(from.dtype()), " tensor");
753 done(err);
754 return err;
755 }
756 AllocationAttributes allocation_attr;
757 uint64 safe_alloc_frontier = 0;
758 std::function<uint64()> freed_by_func = [this, &safe_alloc_frontier]() {
759 safe_alloc_frontier = SafeAllocFrontier(safe_alloc_frontier);
760 return safe_alloc_frontier;
761 };
762 if (timestamped_allocator_) {
763 allocation_attr.freed_by_func = &freed_by_func;
764 }
765 auto* copy = new Tensor(GetAllocator(alloc_attrs), from.dtype(),
766 from.shape(), allocation_attr);
767
768 // If the tensor is not initialized, we likely ran out of memory.
769 if (!copy->IsInitialized()) {
770 delete copy;
771 Status err = errors::ResourceExhausted(
772 "OOM when allocating tensor of shape ", from.shape().DebugString(),
773 " and type ", DataTypeString(from.dtype()));
774 done(err);
775 return err;
776 }
777
778 auto wrapped_done = [to, copy, done = std::move(done)](const Status& s) {
779 if (s.ok()) {
780 *to = std::move(*copy);
781 }
782 delete copy;
783 done(s);
784 };
785
786 profiler::ScopedAnnotation annotation("MakeTensorFromProto");
787 device_context_->CopyCPUTensorToDevice(
788 &from, this, copy, std::move(wrapped_done),
789 !timestamped_allocator_ /*sync_dst_compute*/);
790 return Status::OK();
791 }
792 }
793
794 Status BaseGPUDevice::MakeTensorFromProto(const TensorProto& tensor_proto,
795 const AllocatorAttributes alloc_attrs,
796 Tensor* tensor) {
797 AllocatorAttributes attr;
798 attr.set_on_host(true);
799 attr.set_gpu_compatible(true);
800 Allocator* host_alloc = GetAllocator(attr);
801 Tensor parsed(tensor_proto.dtype());
802 if (!parsed.FromProto(host_alloc, tensor_proto)) {
803 return errors::InvalidArgument("Cannot parse tensor from proto: ",
804 tensor_proto.DebugString());
805 }
806
807 ScopedMemoryDebugAnnotation op_annotation("MakeTensorFromProto", "dynamic",
808 parsed.dtype(), &parsed.shape());
809 if (parsed.dtype() == DT_VARIANT) {
810 const Variant* from = parsed.flat<Variant>().data();
811 int numa_node = attributes().locality().numa_node();
812 Tensor copy(cpu_allocator(numa_node), DT_VARIANT, parsed.shape());
813 Variant* copy_variant = copy.flat<Variant>().data();
814
815 std::list<Notification> notifications;
816 Status copy_status;
817 auto copier = [this, &alloc_attrs, ¬ifications, ©_status](
818 const Tensor& from, Tensor* to) {
819 // Copier isn't run in a multithreaded environment, so we don't
820 // have to worry about the notifications list being modified in parallel.
821 notifications.emplace_back();
822 Notification& n = *notifications.rbegin();
823 return MaybeCopyTensorToGPU(alloc_attrs, from, to,
824 [&n, ©_status](const Status& s) {
825 if (copy_status.ok()) {
826 copy_status.Update(s);
827 }
828 n.Notify();
829 });
830 };
831 Status s;
832 for (int64_t ix = 0; ix < parsed.NumElements(); ++ix) {
833 s = VariantDeviceCopy(VariantDeviceCopyDirection::HOST_TO_DEVICE,
834 from[ix], ©_variant[ix], copier);
835 if (!s.ok()) {
836 break;
837 }
838 }
839 for (auto& n : notifications) {
840 n.WaitForNotification();
841 }
842 if (!s.ok()) {
843 return s;
844 }
845 *tensor = std::move(copy);
846 return copy_status;
847 } else {
848 Notification n;
849 Status status;
850 TF_RETURN_IF_ERROR(MaybeCopyTensorToGPU(alloc_attrs, parsed, tensor,
851 [&n, &status](const Status& s) {
852 status = s;
853 n.Notify();
854 }));
855 n.WaitForNotification();
856 return status;
857 }
858 }
859
860 void BaseGPUDevice::CopyTensorInSameDevice(const Tensor* input_tensor,
861 Tensor* output_tensor,
862 const DeviceContext* device_context,
863 StatusCallback done) {
864 GPUUtil::CopyGPUTensorToSameGPU(static_cast<Device*>(this), device_context,
865 input_tensor, output_tensor, std::move(done));
866 }
867
868 namespace {
869 class ConcretePerOpGpuDevice : public PerOpGpuDevice {
870 public:
871   ConcretePerOpGpuDevice() : device_(&stream_device_) {}
872
873   void Reinitialize(OpKernelContext* context, const gpuStream_t* gpu_stream,
874 TfDeviceId tf_device_id, Allocator* base_allocator,
875 char* scratch) {
876 stream_device_.Reinitialize(context, gpu_stream, tf_device_id,
877 base_allocator, scratch);
878 }
879
880   const Eigen::GpuDevice& device() const override { return device_; }
881
882 private:
883 EigenGpuStreamDevice stream_device_;
884 Eigen::GpuDevice device_;
885 };
886
887 Status VerifyVirtualDeviceSettings(
888 const size_t num_gpus_to_use, const GPUOptions& gpu_options,
889 const std::vector<PlatformDeviceId>& visible_gpu_order,
890 const std::vector<PlatformDeviceId>& valid_platform_device_ids,
891 const std::map<int, std::pair<int, int>>& supported_priority_ranges) {
892 const auto& virtual_devices = gpu_options.experimental().virtual_devices();
893 CHECK(!virtual_devices.empty());
894 if (gpu_options.per_process_gpu_memory_fraction() > 0) {
895 return errors::InvalidArgument(
896 "It's invalid to set per_process_gpu_memory_fraction when "
897 "virtual_devices is set.");
898 }
899 if (num_gpus_to_use < virtual_devices.size()) {
900 return errors::Unknown(
901 "Not enough GPUs to create virtual devices."
902 " num_gpus_to_use: ",
903 num_gpus_to_use, " #virtual_devices: ", virtual_devices.size());
904 }
905 if (!gpu_options.visible_device_list().empty() &&
906 visible_gpu_order.size() != virtual_devices.size()) {
907 return errors::InvalidArgument(
908 "The number of GPUs in visible_device_list doesn't match the number "
909 "of elements in the virtual_devices list.",
910 " #GPUs in visible_device_list: ", visible_gpu_order.size(),
911 " virtual_devices.size(): ", virtual_devices.size());
912 }
913 if (valid_platform_device_ids.size() != virtual_devices.size()) {
914 return errors::Unknown(
915 "The number of valid GPUs doesn't match the number of elements in "
916 "the virtual_devices list.",
917 " #valid GPUs: ", valid_platform_device_ids.size(),
918 " virtual_devices.size(): ", virtual_devices.size());
919 }
920 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
921   // Check that memory_limit_mb and priority sizes match if priority is non-empty.
922 bool priority_exists = !virtual_devices.Get(0).priority().empty();
923 for (int i = 0; i < virtual_devices.size(); ++i) {
924 const auto& memory_limit_mb = virtual_devices.Get(i).memory_limit_mb();
925 const auto& priority = virtual_devices.Get(i).priority();
926 // If the priority is empty in the first one then treat this as having no
927 // priority set in any of the virtual devices for backward compatibility.
928 // Either it's set for all or none.
929 if (!priority_exists) {
930 if (!priority.empty()) {
931 return errors::InvalidArgument(
932 "Priority must be set for all virtual devices or none. But the "
933 "priority is specified for ",
934 i,
935 " while previous devices didn't "
936 "have any set.");
937 }
938 }
939 if (priority_exists && memory_limit_mb.size() != priority.size()) {
940 return errors::InvalidArgument(
941 "Number of virtual device priorities specified doesn't "
942 "match with number of memory_limit_mb specified for GPU# ",
943 i, " memory_limit_mb size: ", memory_limit_mb.size(),
944 " and priority size: ", priority.size());
945 }
946 const int gpu_id = valid_platform_device_ids[i].value();
947 auto it = supported_priority_ranges.find(gpu_id);
948 if (it == supported_priority_ranges.end()) {
949 return errors::Internal(
950 "Failed to find supported priority range for GPU"
951 " device ",
952 gpu_id);
953 }
954 const std::pair<int, int>& priority_range = it->second;
955 for (int p : priority) {
956 if (p > priority_range.first || p < priority_range.second) {
957 return errors::InvalidArgument(
958 "Priority ", p,
959 " is outside the range of supported priorities "
960 "[",
961 priority_range.second, ",", priority_range.first,
962 "] for virtual device ", i, " on GPU# ", gpu_id);
963 }
964 }
965 }
966 #endif
967
968 return Status::OK();
969 }
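
// Illustrative GPUOptions snippet (text proto) that passes the checks above,
// assuming both priorities are inside the device's supported range:
//   experimental {
//     virtual_devices { memory_limit_mb: 1024 memory_limit_mb: 2048
//                       priority: 0 priority: -1 }
//     virtual_devices { memory_limit_mb: 4096 priority: 0 }
//   }
// Each virtual_devices entry corresponds to one visible physical GPU, and when
// priority is set anywhere it must be set for every entry, with one value per
// memory_limit_mb.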
970
971 int64 MinSystemMemory(int64_t available_memory, int cc_major) {
972 // We use the following heuristic for now:
973 //
974   // If available_memory is < 2GiB, reserve 225MiB for system memory.
975   // Otherwise, reserve an amount that depends on the compute capability:
976   //  500MiB  for cuda_compute_capability major version <= 6,
977   //  1050MiB for major version 7,
978   //  1536MiB for major version >= 8.
979 int64_t min_system_memory;
980 if (available_memory < (1LL << 31)) {
981 min_system_memory = 225 * 1024 * 1024;
982 } else {
983 if (cc_major <= 6) {
984 min_system_memory = 500 * 1024 * 1024;
985 } else if (cc_major <= 7) {
986 min_system_memory = 1050 * 1024 * 1024;
987 } else {
988 min_system_memory = 1536 * 1024 * 1024;
989 }
990 }
991 #if defined(__GNUC__) && defined(__OPTIMIZE__)
992 // Do nothing
993 #elif !defined(__GNUC__) && defined(NDEBUG)
994 // Do nothing
995 #else
996   // Double the reserved system memory in non-opt builds (debug builds on
997   // Windows), because non-opt builds need more system memory than
998   // optimized builds do.
999 min_system_memory *= 2;
1000 #endif
1001
1002 #if defined(ANDROID_TEGRA)
1003 // 1GB system mem for NVIDIA Tegra devices since they use the same mem for
1004 // RAM and Video RAM
1005 min_system_memory = 1 << 30;
1006 #endif
1007
1008 VLOG(5) << "available_memory = " << available_memory;
1009 VLOG(5) << "min_system_memory = " << min_system_memory;
1010 return min_system_memory;
1011 }
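
// Worked example (illustrative): a GPU reporting 16 GiB of available memory
// and compute capability 7.5 has cc_major == 7, so MinSystemMemory() returns
// 1050 MiB in optimized builds (doubled to 2100 MiB in non-opt builds).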
1012
1013 // Get the memory limit for the virtual device being created on GPU with
1014 // 'platform_device_id', when that virtual device is the only virtual device
1015 // being created on that GPU.
1016 Status SingleVirtualDeviceMemoryLimit(const GPUOptions& gpu_options,
1017 PlatformDeviceId platform_device_id,
1018 int64* memory_limit) {
1019 int64_t total_memory = 0;
1020 int64_t available_memory = 0;
1021 se::StreamExecutor* se = DeviceIdUtil::ExecutorForPlatformDeviceId(
1022 GPUMachineManager(), platform_device_id)
1023 .ValueOrDie();
1024 if (!se->DeviceMemoryUsage(&available_memory, &total_memory)) {
1025 return errors::Unknown("Failed to query available memory for GPU ",
1026 platform_device_id.value());
1027 }
1028
1029 int64_t allocated_memory = 0;
1030 const double per_process_gpu_memory_fraction =
1031 gpu_options.per_process_gpu_memory_fraction();
1032 se::CudaComputeCapability cc =
1033 se->GetDeviceDescription().cuda_compute_capability();
1034 if ((per_process_gpu_memory_fraction > 1.0 ||
1035 gpu_options.experimental().use_unified_memory()) &&
1036 !cc.IsAtLeast(se::CudaComputeCapability::PASCAL_)) {
1037 return errors::Internal(
1038 "Unified memory on GPUs with compute capability lower than 6.0 "
1039 "(pre-Pascal class GPUs) does not support oversubscription.");
1040 }
1041
1042 if (per_process_gpu_memory_fraction == 0) {
1043 allocated_memory = available_memory;
1044 const int64 min_system_memory = MinSystemMemory(available_memory, cc.major);
1045 if (min_system_memory < allocated_memory) {
1046 allocated_memory -= min_system_memory;
1047 }
1048 } else {
1049 allocated_memory = total_memory * per_process_gpu_memory_fraction;
1050 }
1051
1052 // Override the excluded memory when TF_DEVICE_MIN_SYS_MEMORY_IN_MB is set.
1053 const char* force_device_reserved_bytes =
1054 std::getenv("TF_DEVICE_MIN_SYS_MEMORY_IN_MB");
1055 if (force_device_reserved_bytes != nullptr &&
1056 strcmp(force_device_reserved_bytes, "") != 0) {
1057 int64 reserved_mb;
1058 if (!strings::safe_strto64(force_device_reserved_bytes, &reserved_mb) ||
1059 reserved_mb < 0) {
1060 LOG(WARNING) << "The requested reserved device memory "
1061 << force_device_reserved_bytes
1062 << " is invalid. The request will be ignored.";
1063 } else {
1064 // Convert MBytes to Bytes.
1065 int64 allowable_reserved_memory = reserved_mb * 1024 * 1024;
1066 // TF_DEVICE_MIN_SYS_MEMORY_IN_MB overrides
1067 // per_process_gpu_memory_fraction.
1068 if (allowable_reserved_memory <= available_memory) {
1069 allocated_memory = available_memory - allowable_reserved_memory;
1070         VLOG(1) << "Setting the GPU usable memory to "
1071                 << strings::HumanReadableNumBytes(allocated_memory)
1072                 << " after reserving the requested system memory.";
1073 } else {
1074 LOG(WARNING) << "The requested reserved device memory "
1075 << strings::HumanReadableNumBytes(
1076 allowable_reserved_memory)
1077 << " is larger than the available memory of "
1078 << strings::HumanReadableNumBytes(available_memory)
1079 << ". The request is ignored.";
1080 }
1081 }
1082 }
1083 *memory_limit = allocated_memory;
1084 return Status::OK();
1085 }
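
// Example (illustrative): TF_DEVICE_MIN_SYS_MEMORY_IN_MB=2048 reserves 2 GiB
// for the system, so the resulting memory limit becomes available_memory
// minus 2 GiB, overriding per_process_gpu_memory_fraction.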
1086 } // namespace
1087
1088 void BaseGPUDevice::ReinitializeDevice(OpKernelContext* context,
1089 PerOpGpuDevice* device, int stream_id,
1090 Allocator* allocator) {
1091 ConcretePerOpGpuDevice* concrete_device =
1092 static_cast<ConcretePerOpGpuDevice*>(device);
1093 DCHECK(concrete_device);
1094 DCHECK_EQ(stream_id, 0);
1095 const gpuStream_t* gpu_stream = reinterpret_cast<const gpuStream_t*>(
1096 stream_->compute->implementation()->GpuStreamMemberHack());
1097 concrete_device->Reinitialize(context, gpu_stream, tf_device_id_, allocator,
1098 scratch_);
1099 }
1100
1101 PerOpGpuDevice* BaseGPUDevice::MakeGpuDevice() {
1102 return new ConcretePerOpGpuDevice();
1103 }
1104
1105 Status BaseGPUDevice::ReinitializeGpuDevice(OpKernelContext* context,
1106 PerOpGpuDevice* device,
1107 DeviceContext* dc,
1108 Allocator* allocator) {
1109 TF_RETURN_IF_ERROR(InitScratchBuffers());
1110 if (dc) {
1111 const GPUDeviceContext* gpu_dc = static_cast<GPUDeviceContext*>(dc);
1112 const int stream_id = gpu_dc->stream_id();
1113 VLOG(1) << " eigen_gpu_device(" << dc << ") => stream[" << stream_id
1114 << "]";
1115 CHECK_EQ(stream_id, 0);
1116 ReinitializeDevice(context, device, stream_id, allocator);
1117 } else {
1118 ReinitializeDevice(context, device, 0, allocator);
1119 }
1120 return Status::OK();
1121 }
1122
1123 Allocator* BaseGPUDevice::GetScopedAllocator(AllocatorAttributes attr,
1124 int64_t step_id) {
1125 if (attr.scope_id > 0) {
1126 return scoped_allocator_mgr_->GetContainer(step_id)->GetInstance(
1127 attr.scope_id);
1128 }
1129 LOG(FATAL) << "Unexpected call to BaseGPUDevice::GetScopedAllocator "
1130 << "attr.scope_id = " << attr.scope_id;
1131 return gpu_allocator_;
1132 }
1133
1134 const int BaseGPUDeviceFactory::InterconnectMap::kSameDeviceStrength = 1000;
1135 const int BaseGPUDeviceFactory::InterconnectMap::kStreamExecutorStrength = 1;
1136
1137 Status BaseGPUDeviceFactory::CacheDeviceIds() {
1138 if (!cached_device_ids_.empty()) {
1139 return Status::OK();
1140 }
1141
1142 TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1143 se::Platform* gpu_manager = GPUMachineManager();
1144 if (gpu_manager == nullptr) {
1145 return Status::OK();
1146 }
1147
1148 int device_count = gpu_manager->VisibleDeviceCount();
1149 if (device_count <= 0) {
1150 return Status::OK();
1151 }
1152
1153 std::vector<PlatformDeviceId> visible_gpu_order(device_count);
1154 std::iota(visible_gpu_order.begin(), visible_gpu_order.end(), 0);
1155 TF_RETURN_IF_ERROR(GetValidDeviceIds(visible_gpu_order, &cached_device_ids_));
1156 return Status::OK();
1157 }
1158
1159 Status BaseGPUDeviceFactory::ListPhysicalDevices(std::vector<string>* devices) {
1160 TF_RETURN_IF_ERROR(CacheDeviceIds());
1161 for (PlatformDeviceId platform_device_id : cached_device_ids_) {
1162 const string device_name =
1163 strings::StrCat("/physical_device:GPU:", platform_device_id.value());
1164 devices->push_back(device_name);
1165 }
1166
1167 return Status::OK();
1168 }
1169
1170 Status BaseGPUDeviceFactory::GetDeviceDetails(
1171 int device_index, std::unordered_map<string, string>* details) {
1172 TF_RETURN_IF_ERROR(CacheDeviceIds());
1173
1174   if (device_index < 0 || device_index >= cached_device_ids_.size()) {
1175 return errors::Internal("Invalid device index: ", device_index);
1176 }
1177 PlatformDeviceId platform_device_id = cached_device_ids_[device_index];
1178
1179 TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1180 se::Platform* gpu_manager = GPUMachineManager();
1181 if (gpu_manager == nullptr) {
1182 return errors::Internal("Cannot get GPUMachineManager");
1183 }
1184 auto desc_status =
1185 gpu_manager->DescriptionForDevice(platform_device_id.value());
1186 if (!desc_status.ok()) {
1187 return desc_status.status();
1188 }
1189
1190 auto desc = desc_status.ConsumeValueOrDie();
1191 (*details)["device_name"] = desc->name();
1192 #if GOOGLE_CUDA
1193 (*details)["compute_capability"] = desc->cuda_compute_capability().ToString();
1194 #endif // GOOGLE_CUDA
1195 return Status::OK();
1196 }
1197
1198 Status BaseGPUDeviceFactory::CreateDevices(
1199 const SessionOptions& options, const string& name_prefix,
1200 std::vector<std::unique_ptr<Device>>* devices) {
1201 TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1202 se::Platform* gpu_manager = GPUMachineManager();
1203 if (gpu_manager == nullptr) {
1204 return Status::OK();
1205 }
1206 // If there are no GPUs visible, do nothing.
1207 if (gpu_manager->VisibleDeviceCount() <= 0) {
1208 return Status::OK();
1209 }
1210
1211 size_t num_gpus_to_use = INT_MAX;
1212 auto iter = options.config.device_count().find("GPU");
1213 if (iter != options.config.device_count().end()) {
1214 num_gpus_to_use = iter->second;
1215 }
1216 const auto& gpu_options = options.config.gpu_options();
1217 std::vector<PlatformDeviceId> visible_gpu_order;
1218 std::vector<PlatformDeviceId> valid_platform_device_ids;
1219 // If we aren't going to use any GPUs, don't initialize them.
1220 // We don't want to call ParseVisibleDeviceList if num_gpus_to_use is 0,
1221 // because it treats an empty gpu_options.visible_device_list as 'all GPUs
1222 // are visible'.
1223 if (num_gpus_to_use > 0) {
1224 TF_RETURN_IF_ERROR(DeviceIdUtil::ParseVisibleDeviceList(
1225 gpu_options.visible_device_list(), gpu_manager->VisibleDeviceCount(),
1226 &visible_gpu_order));
1227 bool new_gpu_found = false;
1228 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1229 int visible_gpu_id = visible_gpu_order[i].value();
1230
1231 // Only perform this once per visible gpu id.
1232 if (visible_gpu_initialized_[visible_gpu_id]) {
1233 continue;
1234 }
1235
1236 visible_gpu_initialized_[visible_gpu_id] = true;
1237 new_gpu_found = true;
1238 }
1239
1240     // Check peer access and log the matrix if more than one GPU was found.
1241 if (new_gpu_found && visible_gpu_order.size() > 1) {
1242 // Enable peer access
1243 TF_RETURN_IF_ERROR(EnablePeerAccess(visible_gpu_order));
1244 }
1245
1246 TF_RETURN_IF_ERROR(
1247 GetValidDeviceIds(visible_gpu_order, &valid_platform_device_ids));
1248 }
1249 if (num_gpus_to_use > valid_platform_device_ids.size()) {
1250 num_gpus_to_use = valid_platform_device_ids.size();
1251 }
1252 std::map<int, std::pair<int, int>> supported_priority_ranges;
1253 if (!valid_platform_device_ids.empty()) {
1254 // Save the original device.
1255 int original_device = 0;
1256 #if GOOGLE_CUDA
1257 cudaError_t err = cudaGetDevice(&original_device);
1258 if (err != cudaSuccess) {
1259 return errors::Internal("cudaGetDevice() failed. Status: ",
1260 cudaGetErrorString(err));
1261 }
1262 #elif TENSORFLOW_USE_ROCM
1263 hipError_t err = hipGetDevice(&original_device);
1264 if (err != hipSuccess) {
1265 return errors::Internal("hipGetDevice() failed. Status: ",
1266 hipGetErrorString(err));
1267 }
1268 #endif
1269
1270 // Force to implicitly initialize CUDA runtime on each valid GPU before
1271 // CreateGPUDevice().
1272 for (PlatformDeviceId platform_device_id : valid_platform_device_ids) {
1273 #if GOOGLE_CUDA
1274 err = cudaSetDevice(platform_device_id.value());
1275 if (err != cudaSuccess) {
1276 return errors::Internal(
1277 "cudaSetDevice() on GPU:", platform_device_id.value(),
1278 " failed. Status: ", cudaGetErrorString(err));
1279 }
1280 int priority_low, priority_high;
1281       err = cudaDeviceGetStreamPriorityRange(&priority_low, &priority_high);
1282       if (err != cudaSuccess) {
1283         return errors::Internal("cudaDeviceGetStreamPriorityRange() on GPU:",
1284                                 platform_device_id.value(),
1285                                 " failed. Status: ", cudaGetErrorString(err));
1286       }
1287       VLOG(1) << "Cuda stream priority range on GPU(" << platform_device_id.value()
1288 << "): " << priority_high << "," << priority_low;
1289 supported_priority_ranges.insert(
1290 std::make_pair(platform_device_id.value(),
1291 std::make_pair(priority_low, priority_high)));
1292 #elif TENSORFLOW_USE_ROCM
1293 err = hipSetDevice(platform_device_id.value());
1294 if (err != hipSuccess) {
1295 return errors::Internal(
1296 "hipSetDevice() on GPU:", platform_device_id.value(),
1297 " failed. Status: ", hipGetErrorString(err));
1298 }
1299 err = hipFree(nullptr);
1300 if (err != hipSuccess) {
1301 return errors::Internal("ROCm runtime implicit initialization on GPU:",
1302 platform_device_id.value(),
1303 " failed. Status: ", hipGetErrorString(err));
1304 }
1305 int priority_low, priority_high;
1306       err = hipDeviceGetStreamPriorityRange(&priority_low, &priority_high);
1307       if (err != hipSuccess) {
1308         return errors::Internal("hipDeviceGetStreamPriorityRange() on GPU:",
1309                                 platform_device_id.value(),
1310                                 " failed. Status: ", hipGetErrorString(err));
1311       }
1312       VLOG(1) << "HIP stream priority range on GPU(" << platform_device_id.value()
1313 << "): " << priority_high << "," << priority_low;
1314 supported_priority_ranges.insert(
1315 std::make_pair(platform_device_id.value(),
1316 std::make_pair(priority_low, priority_high)));
1317 #endif
1318 }
1319 // Reset to the original device.
1320 #if GOOGLE_CUDA
1321 err = cudaSetDevice(original_device);
1322 if (err != cudaSuccess) {
1323 return errors::Internal("cudaSetDevice() on GPU:", original_device,
1324 " failed. Status: ", cudaGetErrorString(err));
1325 }
1326 #elif TENSORFLOW_USE_ROCM
1327 err = hipSetDevice(original_device);
1328 if (err != hipSuccess) {
1329 return errors::Internal("hipSetDevice() on GPU:", original_device,
1330 " failed. Status: ", hipGetErrorString(err));
1331 }
1332 #endif
1333
1334 #if GOOGLE_CUDA
1335 // Log the version of CUDA and cuDNN
1336 int cuda_major_version = CUDART_VERSION / 1000;
1337 int cuda_minor_version = (CUDART_VERSION / 10) % 10;
1338 VLOG(1) << "TensorFlow compiled with CUDA " << cuda_major_version << "."
1339 << cuda_minor_version << " and cuDNN " << CUDNN_MAJOR << "."
1340 << CUDNN_MINOR << "." << CUDNN_PATCHLEVEL;
1341 #endif
1342 }
1343
1344 std::vector<InterconnectMap> interconnect_maps;
1345 TF_RETURN_IF_ERROR(
1346 GetInterconnectMaps(visible_gpu_order, gpu_manager, &interconnect_maps));
1347
1348 // Print each interconnect map to the log.
1349 for (const InterconnectMap& im : interconnect_maps) {
1350 VLOG(1) << "Device interconnect " << im.name << " with strength "
1351 << im.strength << " edge matrix:";
1352 string line_buf = " ";
1353 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1354 strings::StrAppend(&line_buf, visible_gpu_order[i].value(), " ");
1355 }
1356 VLOG(1) << line_buf;
1357 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1358 line_buf = strings::StrCat(visible_gpu_order[i].value(), ": ");
1359 PlatformDeviceId gpu_id_i = visible_gpu_order[i];
1360 for (int j = 0; j < visible_gpu_order.size(); ++j) {
1361 PlatformDeviceId gpu_id_j = visible_gpu_order[j];
1362 if (im.directed_links.find({gpu_id_i, gpu_id_j}) !=
1363 im.directed_links.end()) {
1364 line_buf.append("Y ");
1365 } else {
1366 line_buf.append("N ");
1367 }
1368 }
1369 VLOG(1) << line_buf;
1370 }
1371 }
1372
1373 const auto& virtual_devices = gpu_options.experimental().virtual_devices();
1374 if (!virtual_devices.empty()) {
1375 TF_RETURN_IF_ERROR(VerifyVirtualDeviceSettings(
1376 num_gpus_to_use, gpu_options, visible_gpu_order,
1377 valid_platform_device_ids, supported_priority_ranges));
1378 // We've verified that num_gpus_to_use >= virtual_devices.size().
1379 num_gpus_to_use = virtual_devices.size();
1380 CHECK(gpu_options.visible_device_list().empty() ||
1381 valid_platform_device_ids == visible_gpu_order);
1382 }
1383 int next_tf_device_id = 0;
1384 std::vector<int64> memory_limit_bytes;
1385 for (int i = 0; i < num_gpus_to_use; ++i) {
1386 const PlatformDeviceId platform_device_id = valid_platform_device_ids[i];
1387 if (virtual_devices.empty() ||
1388 virtual_devices.Get(i).memory_limit_mb_size() == 0) {
1389 int64_t single_virtual_device_memory_limit = 0;
1390 TF_RETURN_IF_ERROR(
1391 SingleVirtualDeviceMemoryLimit(gpu_options, platform_device_id,
1392 &single_virtual_device_memory_limit));
1393 memory_limit_bytes.push_back(single_virtual_device_memory_limit);
1394 } else {
1395 const auto& memory_limit_mb = virtual_devices.Get(i).memory_limit_mb();
1396 std::transform(memory_limit_mb.begin(), memory_limit_mb.end(),
1397 std::back_inserter(memory_limit_bytes), [](float mb) {
1398 return static_cast<int64>(mb) * (1ll << 20);
1399 });
1400 }
1401 while (next_tf_device_id < memory_limit_bytes.size()) {
1402 TfDeviceId tf_device_id(next_tf_device_id);
1403 ++next_tf_device_id;
1404 TF_RETURN_IF_ERROR(GpuIdManager::InsertTfPlatformDeviceIdPair(
1405 tf_device_id, platform_device_id));
1406 }
1407 }
1408 const int num_tf_gpus = next_tf_device_id;
1409
1410 LocalityMap device_localities;
1411 TF_RETURN_IF_ERROR(
1412 GetDeviceLocalities(num_tf_gpus, interconnect_maps, &device_localities));
1413
1414 // Build the GPUDevices
1415 CHECK_EQ(next_tf_device_id, memory_limit_bytes.size());
1416 for (int di = 0; di < num_tf_gpus; ++di) {
1417 TfDeviceId tf_device_id(di);
1418 int64_t bytes = memory_limit_bytes[di];
1419 auto it = device_localities.find(tf_device_id);
1420 if (it == device_localities.end()) {
1421 return errors::Internal("Failed to find DeviceLocality for GPU device ",
1422 tf_device_id.value());
1423 }
1424 TF_RETURN_IF_ERROR(CreateGPUDevice(options, name_prefix, tf_device_id,
1425 bytes, it->second, num_tf_gpus,
1426 devices));
1427 }
1428 return Status::OK();
1429 }
1430
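// Returns a short, human-readable description of the physical device, used in
// log messages and passed to CreateGPUDevice as the device description, e.g.
// (illustrative values):
//   "device: 0, name: Tesla V100, pci bus id: 0000:00:04.0, compute capability: 7.0"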
1431 static string GetShortDeviceDescription(PlatformDeviceId platform_device_id,
1432 const se::DeviceDescription& desc) {
1433 #if GOOGLE_CUDA
1434 // LINT.IfChange
1435 return strings::StrCat(
1436 "device: ", platform_device_id.value(), ", name: ", desc.name(),
1437 ", pci bus id: ", desc.pci_bus_id(),
1438 ", compute capability: ", desc.cuda_compute_capability().ToString());
1439 // LINT.ThenChange(//tensorflow/python/framework/gpu_util.py)
1440 #elif TENSORFLOW_USE_ROCM
1441 return strings::StrCat("device: ", platform_device_id.value(),
1442 ", name: ", desc.name(),
1443 ", pci bus id: ", desc.pci_bus_id());
1444 #endif
1445 }
1446
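// Creates the BaseGPUDevice for a single TfDeviceId: resolves the platform
// device id, obtains (or reuses) the per-device GPU allocator with the
// requested memory limit, constructs and initializes the device, and appends
// it to 'devices'.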
1447 Status BaseGPUDeviceFactory::CreateGPUDevice(
1448 const SessionOptions& options, const string& name_prefix,
1449 TfDeviceId tf_device_id, int64_t memory_limit,
1450 const DeviceLocality& dev_locality, size_t num_tf_gpus,
1451 std::vector<std::unique_ptr<Device>>* devices) {
1452 CHECK_GE(tf_device_id.value(), 0);
1453 const string device_name =
1454 strings::StrCat(name_prefix, "/device:GPU:", tf_device_id.value());
1455 DeviceIdUtil::CheckValidTfDeviceId(DEVICE_GPU, GPUMachineManager(),
1456 tf_device_id);
1457 PlatformDeviceId platform_device_id;
1458 TF_RETURN_IF_ERROR(
1459 GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
1460 int numa_node = dev_locality.numa_node();
1461
1462 se::Platform* gpu_manager = GPUMachineManager();
1463 auto desc_status =
1464 gpu_manager->DescriptionForDevice(platform_device_id.value());
1465 if (!desc_status.ok()) {
1466 return desc_status.status();
1467 }
1468 auto desc = desc_status.ConsumeValueOrDie();
1469
1470 std::vector<TfDeviceId> peer_gpu_ids;
1471 peer_gpu_ids.reserve(num_tf_gpus);
1472 for (int id = 0; id < num_tf_gpus; ++id) {
1473 TfDeviceId peer_tf_device_id(id);
1474 if (peer_tf_device_id != tf_device_id) {
1475 peer_gpu_ids.push_back(peer_tf_device_id);
1476 }
1477 }
1478
1479 GPUProcessState* process_state = GPUProcessState::singleton();
1480 Allocator* gpu_allocator = process_state->GetGPUAllocator(
1481 options.config.gpu_options(), tf_device_id, memory_limit, peer_gpu_ids);
1482 if (gpu_allocator == nullptr) {
1483 return errors::Internal("Failed to get memory allocator for TF GPU ",
1484 tf_device_id.value(), " with ", memory_limit,
1485 " bytes of memory.");
1486 }
1487 absl::optional<AllocatorStats> stats = gpu_allocator->GetStats();
1488 if (!stats) {
1489 return errors::Internal("No allocator statistics");
1490 }
1491 // 'memory_limit' is the requested memory size, but if an allocator for the
1492 // given tf_device_id was created earlier, we reuse it instead of creating a
1493 // new one (the TF GPU device is a shared resource), in which case the actual
1494 // memory limit reported by 'stats->bytes_limit' for that allocator may
1495 // differ (a mismatch that should arguably be reported as an error).
1496 //
1497 // TODO(laigd): report error if memory_limit doesn't match
1498 // stats->bytes_limit.
1499 int64_t bytes_limit = stats->bytes_limit ? *stats->bytes_limit : 0;
1500 std::unique_ptr<BaseGPUDevice> gpu_device = CreateGPUDevice(
1501 options, device_name, static_cast<Bytes>(bytes_limit), dev_locality,
1502 tf_device_id, GetShortDeviceDescription(platform_device_id, *desc),
1503 gpu_allocator, ProcessState::singleton()->GetCPUAllocator(numa_node));
1504 LOG(INFO) << "Created device " << device_name << " with "
1505 << (bytes_limit >> 20) << " MB memory: "
1506 << " -> " << GetShortDeviceDescription(platform_device_id, *desc);
1507 TF_RETURN_IF_ERROR(gpu_device->Init(options));
1508 gpu_allocator->SetStream(gpu_device->GetStream());
1509 devices->push_back(std::move(gpu_device));
1510
1511 return Status::OK();
1512 }
1513
1514 namespace {
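// Queries the StreamExecutor for every ordered pair of visible GPUs and
// records whether peer access from the first to the second can be enabled.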
1515 std::unique_ptr<std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>>
1516 GetPeerAccessMap(se::Platform* platform,
1517 const std::vector<PlatformDeviceId>& visible_gpu_order) {
1518 std::unique_ptr<std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>>
1519 map(new std::map<std::pair<PlatformDeviceId, PlatformDeviceId>, bool>);
1520 for (PlatformDeviceId platform_gpu_i : visible_gpu_order) {
1521 for (PlatformDeviceId platform_gpu_j : visible_gpu_order) {
1522 se::StreamExecutor* from =
1523 DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_gpu_i)
1524 .ValueOrDie();
1525 se::StreamExecutor* to =
1526 DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_gpu_j)
1527 .ValueOrDie();
1528 (*map)[{platform_gpu_i, platform_gpu_j}] =
1529 from->CanEnablePeerAccessTo(to);
1530 }
1531 }
1532
1533 return map;
1534 }
1535
1536 } // namespace
1537
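// Produces the interconnect maps used to derive device localities. Currently
// a single map is built, from StreamExecutor peer-access queries.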
1538 Status BaseGPUDeviceFactory::GetInterconnectMaps(
1539 const std::vector<PlatformDeviceId>& visible_gpu_order,
1540 se::Platform* gpu_manager, std::vector<InterconnectMap>* maps) {
1541 // The default interconnect map is obtained from the StreamExecutor.
1542 auto access_map = GetPeerAccessMap(gpu_manager, visible_gpu_order);
1543 maps->resize(1);
1544 InterconnectMap& imap = maps->at(0);
1545 imap.name = "StreamExecutor";
1546 imap.strength = InterconnectMap::kStreamExecutorStrength;
1547 for (PlatformDeviceId gpu_id_i : visible_gpu_order) {
1548 for (PlatformDeviceId gpu_id_j : visible_gpu_order) {
1549 if (gpu_id_i == gpu_id_j) continue;
1550 if ((*access_map)[{gpu_id_i, gpu_id_j}]) {
1551 imap.directed_links.insert({gpu_id_i, gpu_id_j});
1552 }
1553 }
1554 }
1555 return Status::OK();
1556 }
1557
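// Builds a DeviceLocality for each TF GPU device: its NUMA node, a bus id
// derived from that NUMA node, links to peers taken from the interconnect
// maps, and SAME_DEVICE links between virtual devices that share a physical
// GPU.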
1558 Status BaseGPUDeviceFactory::GetDeviceLocalities(
1559 int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
1560 LocalityMap* localities) {
1561 std::vector<TfDeviceId> all_tf_device_ids;
1562 all_tf_device_ids.reserve(num_tf_gpus);
1563 for (int i = 0; i < num_tf_gpus; ++i) {
1564 all_tf_device_ids.push_back(TfDeviceId(i));
1565 }
1566 for (TfDeviceId tf_device_id : all_tf_device_ids) {
1567 PlatformDeviceId platform_device_id;
1568 TF_RETURN_IF_ERROR(
1569 GpuIdManager::TfToPlatformDeviceId(tf_device_id, &platform_device_id));
1570 // Get GPU bus_id from its reported NUMA affinity. Because GPUs are
1571 // virtualized in some environments, we can't just use the GPU id.
1572 // NUMA locales are indexed from 0, buses are indexed from 1.
1573 se::Platform* gpu_manager = GPUMachineManager();
1574 auto desc_status =
1575 gpu_manager->DescriptionForDevice(platform_device_id.value());
1576 if (!desc_status.ok()) {
1577 return desc_status.status();
1578 }
1579 auto desc = desc_status.ConsumeValueOrDie();
1580 int numa_node = desc->numa_node();
1581 if (numa_node < 0) {
1582 // For some reason the StreamExecutor couldn't get the NUMA
1583 // affinity of the GPU. If this is not a multi-socket motherboard with
1584 // GPUs local to different buses, it doesn't matter. If it is, we
1585 // may run into trouble later with data transfer operations. The
1586 // trouble may manifest as slower than expected performance, or
1587 // outright failures.
1588 LOG(INFO) << "Could not identify NUMA node of platform GPU id "
1589 << platform_device_id
1590 << ", defaulting to 0. Your kernel may not have been built "
1591 << "with NUMA support.";
1592 numa_node = 0;
1593 }
1594 DeviceLocality dev_locality;
1595 dev_locality.set_numa_node(numa_node);
1596 dev_locality.set_bus_id(numa_node + 1);
1597
1598 // Set LocalLinks from InterconnectMaps.
1599 LocalLinks* links = dev_locality.mutable_links();
1600 for (const InterconnectMap& imap : interconnects) {
1601 for (TfDeviceId tf_gpu_dst : all_tf_device_ids) {
1602 PlatformDeviceId platform_gpu_dst;
1603 TF_RETURN_IF_ERROR(
1604 GpuIdManager::TfToPlatformDeviceId(tf_gpu_dst, &platform_gpu_dst));
1605 if (imap.directed_links.find({platform_device_id, platform_gpu_dst}) !=
1606 imap.directed_links.end()) {
1607 InterconnectLink* ilink = links->add_link();
1608 ilink->set_device_id(tf_gpu_dst.value());
1609 ilink->set_type(imap.name);
1610 ilink->set_strength(imap.strength);
1611 }
1612 }
1613 }
1614
1615 // If this is one of multiple virtual GPUs on the same physical GPU,
1616 // add high-strength links to the others.
1617 for (TfDeviceId tf_gpu_dst : all_tf_device_ids) {
1618 if (tf_device_id == tf_gpu_dst) continue;
1619 PlatformDeviceId platform_gpu_dst;
1620 TF_RETURN_IF_ERROR(
1621 GpuIdManager::TfToPlatformDeviceId(tf_gpu_dst, &platform_gpu_dst));
1622 if (platform_device_id == platform_gpu_dst) {
1623 InterconnectLink* ilink = links->add_link();
1624 ilink->set_device_id(tf_gpu_dst.value());
1625 ilink->set_type("SAME_DEVICE");
1626 ilink->set_strength(InterconnectMap::kSameDeviceStrength);
1627 }
1628 }
1629
1630 (*localities)[tf_device_id] = dev_locality;
1631 VLOG(1) << "GPUDevice PlatformDeviceId " << platform_device_id
1632 << " TfDeviceId " << tf_device_id << " on bus "
1633 << dev_locality.bus_id() << " numa: " << numa_node
1634 << " pci: " << desc->pci_bus_id()
1635 << " DeviceLocality: " << dev_locality.DebugString();
1636 }
1637 return Status::OK();
1638 }
1639
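// Default minimum multiprocessor count a GPU must have to be used: the
// largest core count among the visible GPUs, capped at
// kDefaultMinGPUMultiprocessorCount.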
1640 static int GetDefaultMinGPUMultiprocessorCount(
1641 se::Platform* gpu_manager,
1642 const std::vector<PlatformDeviceId>& visible_gpu_order) {
1643 static const int kDefaultMinGPUMultiprocessorCount = 8;
1644
1645 // Find the highest multi-processor count across all visible GPUs.
1646 int max_count = -1;
1647 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1648 int visible_gpu_id = visible_gpu_order[i].value();
1649 auto description_status = gpu_manager->DescriptionForDevice(visible_gpu_id);
1650 if (!description_status.ok()) {
1651 continue;
1652 }
1653
1654 auto description = description_status.ConsumeValueOrDie();
1655 max_count = std::max(max_count, description->core_count());
1656 }
1657
1658 if (max_count < 0 || kDefaultMinGPUMultiprocessorCount < max_count) {
1659 return kDefaultMinGPUMultiprocessorCount;
1660 } else {
1661 return max_count;
1662 }
1663 }
1664
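// Minimum multiprocessor count required for a GPU to be used. Honors the
// TF_MIN_GPU_MULTIPROCESSOR_COUNT environment variable when it parses to a
// non-negative integer (e.g. TF_MIN_GPU_MULTIPROCESSOR_COUNT=4, illustrative);
// otherwise falls back to the default above.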
1665 static int GetMinGPUMultiprocessorCount(
1666 se::Platform* gpu_manager,
1667 const std::vector<PlatformDeviceId>& visible_gpu_order) {
1668 const char* tf_min_gpu_core_count = getenv("TF_MIN_GPU_MULTIPROCESSOR_COUNT");
1669
1670 if (tf_min_gpu_core_count == nullptr ||
1671 strcmp(tf_min_gpu_core_count, "") == 0) {
1672 return GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1673 }
1674
1675 int min_gpu_core_count = -1;
1676 if (strings::safe_strto32(tf_min_gpu_core_count, &min_gpu_core_count)) {
1677 if (min_gpu_core_count >= 0) {
1678 return min_gpu_core_count;
1679 }
1680 }
1681
1682 int count =
1683 GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1684 LOG(ERROR) << "Invalid minimum GPU multiprocessor count: ["
1685 << tf_min_gpu_core_count << "]. "
1686 << "Using the default value: " << count;
1687 return count;
1688 }
1689
1690 namespace {
1691
1692 #if GOOGLE_CUDA
1693
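// Parses a "<major>.<minor>" version string (e.g. "7.5") into a
// CudaComputeCapability. CHECK-fails on malformed input.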
1694 se::CudaComputeCapability ComputeCapabilityFromString(
1695 const std::string& version_name) {
1696 int major_part, minor_part;
1697 size_t dot_pos = version_name.find('.');
1698 CHECK(dot_pos != string::npos)
1699 << "Illegal version name: [" << version_name << "]";
1700 string major_str = version_name.substr(0, dot_pos);
1701 CHECK(strings::safe_strto32(major_str, &major_part))
1702 << "Illegal version name: [" << version_name << "]";
1703 string minor_str = version_name.substr(dot_pos + 1);
1704 CHECK(strings::safe_strto32(minor_str, &minor_part))
1705 << "Illegal version name: [" << version_name << "]";
1706 return se::CudaComputeCapability{major_part, minor_part};
1707 }
1708
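// Returns the CUDA compute capabilities this binary supports, including any
// extras supplied via TF_EXTRA_CUDA_CAPABILITIES at build time.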
1709 std::vector<se::CudaComputeCapability> GetSupportedCudaComputeCapabilities() {
1710 std::vector<se::CudaComputeCapability> cuda_caps = {
1711 ComputeCapabilityFromString("3.5"), ComputeCapabilityFromString("5.2")};
1712 #ifdef TF_EXTRA_CUDA_CAPABILITIES
1713 // TF_EXTRA_CUDA_CAPABILITIES should be defined as a sequence separated by commas,
1714 // for example:
1715 // TF_EXTRA_CUDA_CAPABILITIES=3.0,4.0,5.0
1716 // Use two-level macro expansion for stringification.
1717 #define TF_XSTRING(...) #__VA_ARGS__
1718 #define TF_STRING(s) TF_XSTRING(s)
1719 string extra_cuda_caps = TF_STRING(TF_EXTRA_CUDA_CAPABILITIES);
1720 #undef TF_STRING
1721 #undef TF_XSTRING
1722 auto extra_capabilities = str_util::Split(extra_cuda_caps, ',');
1723 for (const auto& capability : extra_capabilities) {
1724 cuda_caps.push_back(ComputeCapabilityFromString(capability));
1725 }
1726 #endif
1727 return cuda_caps;
1728 }
1729 #endif // GOOGLE_CUDA
1730
1731 #if TENSORFLOW_USE_ROCM
1732 std::vector<int> supported_amdgpu_isa_versions = {803, 900, 906, 908};
1733
1734 std::vector<int> GetSupportedAMDGPUISAVersions() {
1735 return supported_amdgpu_isa_versions;
1736 }
1737 #endif // TENSORFLOW_USE_ROCM
1738
1739 } // namespace
1740
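// Attempts to enable peer (P2P) access between every ordered pair of visible
// GPUs that the driver reports as capable. Individual failures are only
// logged; an error is returned only when peering was reported possible yet
// not a single pair could be enabled.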
1741 Status BaseGPUDeviceFactory::EnablePeerAccess(
1742 const std::vector<PlatformDeviceId>& visible_gpu_order) {
1743 se::Platform* gpu_manager = GPUMachineManager();
1744 int possible_peer_count = 0;
1745 int enabled_peer_count = 0;
1746 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1747 const PlatformDeviceId platform_gpu_i = visible_gpu_order[i];
1748 for (int j = 0; j < visible_gpu_order.size(); ++j) {
1749 const PlatformDeviceId platform_gpu_j = visible_gpu_order[j];
1750 // We have already validated that ExecutorForDevice() calls return OK.
1751 se::StreamExecutor* from =
1752 DeviceIdUtil::ExecutorForPlatformDeviceId(gpu_manager, platform_gpu_i)
1753 .ValueOrDie();
1754 se::StreamExecutor* to =
1755 DeviceIdUtil::ExecutorForPlatformDeviceId(gpu_manager, platform_gpu_j)
1756 .ValueOrDie();
1757
1758 if (from->CanEnablePeerAccessTo(to)) {
1759 ++possible_peer_count;
1760 auto status = from->EnablePeerAccessTo(to);
1761 if (!status.ok()) {
1762 LOG(WARNING)
1763 << "Unable to enable peer access between device ordinals "
1764 << platform_gpu_i << " and " << platform_gpu_j
1765 << ", status: " << status;
1766 } else {
1767 ++enabled_peer_count;
1768 }
1769 }
1770 }
1771 }
1772
1773 // Return an error in the extreme failure case where the driver
1774 // reported that peering was possible but not a single peering was
1775 // successful. This is to catch possible system misconfigurations
1776 // or more fundamental issues.
1777 if (possible_peer_count > 0 && enabled_peer_count == 0) {
1778 return errors::Internal(possible_peer_count,
1779 " potential peer access pairs were reported by the "
1780 "driver, but no peering could be enabled.");
1781 }
1782 return Status::OK();
1783 }
1784
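// Filters the visible GPUs down to the ids TensorFlow will actually use:
// logs each device's properties, verifies that the required GPU libraries can
// be loaded, and drops devices whose compute capability (CUDA), ISA version
// (ROCm), or multiprocessor count falls below the supported minimum.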
1785 Status BaseGPUDeviceFactory::GetValidDeviceIds(
1786 const std::vector<PlatformDeviceId>& visible_gpu_order,
1787 std::vector<PlatformDeviceId>* ids) {
1788 se::Platform* gpu_manager = GPUMachineManager();
1789 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1790 int visible_gpu_id = visible_gpu_order[i].value();
1791 auto description_status = gpu_manager->DescriptionForDevice(visible_gpu_id);
1792 if (!description_status.ok()) {
1793 return description_status.status();
1794 }
1795
1796 auto description = description_status.ConsumeValueOrDie();
1797 #if GOOGLE_CUDA
1798 VLOG(1) << "Found device " << i << " with properties: "
1799 << "\npciBusID: " << description->pci_bus_id()
1800 << " name: " << description->name() << " computeCapability: "
1801 << description->cuda_compute_capability().ToString()
1802 << "\ncoreClock: " << description->clock_rate_ghz() << "GHz"
1803 << " coreCount: " << description->core_count()
1804 << " deviceMemorySize: "
1805 << strings::HumanReadableNumBytes(description->device_memory_size())
1806 << " deviceMemoryBandwidth: "
1807 << strings::HumanReadableNumBytes(description->memory_bandwidth())
1808 << "/s";
1809 #elif TENSORFLOW_USE_ROCM
1810 std::string gcn_arch_name = description->rocm_amdgpu_gcn_arch_name();
1811 VLOG(1) << "Found device " << i << " with properties: "
1812 << "\npciBusID: " << description->pci_bus_id()
1813 << " name: " << description->name()
1814 << " ROCm AMDGPU Arch: " << gcn_arch_name
1815 << "\ncoreClock: " << description->clock_rate_ghz() << "GHz"
1816 << " coreCount: " << description->core_count()
1817 << " deviceMemorySize: "
1818 << strings::HumanReadableNumBytes(description->device_memory_size())
1819 << " deviceMemoryBandwidth: "
1820 << strings::HumanReadableNumBytes(description->memory_bandwidth())
1821 << "/s";
1822 #endif
1823 }
1824
1825 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
1826 // Try to dlopen GPU libraries if they are supposed to be dynamically loaded.
1827 auto handle_or = se::internal::DsoLoader::MaybeTryDlopenGPULibraries();
1828 if (!handle_or.ok()) {
1829 LOG(WARNING) << "Cannot dlopen some GPU libraries. Please make sure the "
1830 "missing libraries mentioned above are installed properly "
1831 "if you would like to use GPU. Follow the guide at "
1832 "https://www.tensorflow.org/install/gpu for how to "
1833 "download and setup the required libraries for your "
1834 "platform.\nSkipping registering "
1835 "GPU devices...";
1836 return Status::OK();
1837 }
1838 #endif
1839
1840 #if GOOGLE_CUDA
1841 auto cuda_supported_capabilities = GetSupportedCudaComputeCapabilities();
1842 if (cuda_supported_capabilities.empty()) {
1843 return errors::FailedPrecondition(
1844 "No supported cuda capabilities in binary.");
1845 }
1846 se::CudaComputeCapability min_supported_capability = *std::min_element(
1847 cuda_supported_capabilities.begin(), cuda_supported_capabilities.end());
1848 #elif TENSORFLOW_USE_ROCM
1849 auto rocm_supported_isas = GetSupportedAMDGPUISAVersions();
1850 if (rocm_supported_isas.empty()) {
1851 return errors::FailedPrecondition(
1852 "No supported rocm capabilities in binary.");
1853 }
1854 int min_supported_isa =
1855 *std::min_element(rocm_supported_isas.begin(), rocm_supported_isas.end());
1856 #endif
1857
1858 int min_gpu_core_count =
1859 GetMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1860
1861 // Filter out devices that don't have the right capability or power.
1862 for (int i = 0; i < visible_gpu_order.size(); ++i) {
1863 const PlatformDeviceId visible_gpu_id = visible_gpu_order[i];
1864 auto description_status =
1865 gpu_manager->DescriptionForDevice(visible_gpu_id.value());
1866 if (!description_status.ok()) {
1867 LOG(INFO) << "Ignoring visible gpu device " << visible_gpu_id
1868 << " whose executor is in invalid state: "
1869 << description_status.status().ToString();
1870 continue;
1871 }
1872
1873 auto desc = description_status.ConsumeValueOrDie();
1874
1875 #if GOOGLE_CUDA
1876 // Only GPUs with at least the minimum supported compute capability are
1877 // accepted.
1878 if (desc->cuda_compute_capability() < min_supported_capability) {
1879 LOG(INFO) << "Ignoring visible gpu device "
1880 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1881 << ") "
1882 << "with Cuda compute capability "
1883 << desc->cuda_compute_capability().ToString()
1884 << ". The minimum required Cuda capability is "
1885 << min_supported_capability.ToString() << ".";
1886 continue;
1887 }
1888 #elif TENSORFLOW_USE_ROCM
1889 int device_isa;
1890 if (!desc->rocm_amdgpu_isa_version(&device_isa)) {
1891 continue;
1892 }
1893 // Only GPUs with at least the minimum supported AMDGPU ISA version are
1894 // accepted.
1895 if (device_isa < min_supported_isa) {
1896 LOG(INFO) << "Ignoring visible gpu device "
1897 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1898 << ") "
1899 << "with AMDGPU ISA gfx" << device_isa
1900 << ". The minimum required AMDGPU ISA is gfx"
1901 << min_supported_isa << ".";
1902 continue;
1903 }
1904 #endif
1905
1906 // Filter out slow GPUs. By default, GPUs with a lower multiprocessor
1907 // count than the fastest GPU are filtered out, unless they have 8 or more
1908 // multiprocessors. If the TF_MIN_GPU_MULTIPROCESSOR_COUNT environment
1909 // variable is set, its value will be used to filter out GPUs.
1910 if (desc->core_count() < min_gpu_core_count) {
1911 LOG(INFO) << "Ignoring visible gpu device "
1912 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1913 << ") "
1914 << "with core count: " << desc->core_count()
1915 << ". The minimum required count is " << min_gpu_core_count
1916 << ". You can adjust this requirement with the env var "
1917 "TF_MIN_GPU_MULTIPROCESSOR_COUNT.";
1918 continue;
1919 }
1920
1921 #if defined(GOOGLE_CUDA) && !defined(PLATFORM_GOOGLE)
1922 // Compare compute capability of device with list in cuda_config.h and
1923 // warn about long loading time when we need to JIT kernels from PTX.
1924 auto compute_capabilities = {TF_CUDA_COMPUTE_CAPABILITIES};
1925 auto device_capability = desc->cuda_compute_capability();
1926 if (std::count_if(std::cbegin(compute_capabilities),
1927 std::cend(compute_capabilities), [&](int cc) {
1928 // CUBINs are backwards compatible within major version.
1929 return cc / 10 == device_capability.major &&
1930 cc % 10 <= device_capability.minor;
1931 }) == 0) {
1932 LOG(WARNING)
1933 << "TensorFlow was not built with CUDA kernel binaries "
1934 "compatible with compute capability "
1935 << device_capability.ToString() << ". CUDA kernels will be "
1936 << "jit-compiled from PTX, which could take 30 minutes or longer.";
1937 }
1938 #endif // defined(GOOGLE_CUDA) && !defined(PLATFORM_GOOGLE)
1939
1940 ids->push_back(visible_gpu_id);
1941 }
1942 if (!ids->empty()) {
1943 std::vector<int> raw_ids(ids->size());
1944 std::transform(ids->begin(), ids->end(), raw_ids.begin(),
1945 [](PlatformDeviceId id) -> int { return id.value(); });
1946 VLOG(1) << "Adding visible gpu devices: " << absl::StrJoin(raw_ids, ", ");
1947 }
1948
1949 return Status::OK();
1950 }
1951
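// When the timestamped allocator is in use, returns the latest kernel-tracker
// count at which all tracked kernels are known to have terminated; returns 0
// otherwise.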
1952 uint64 BaseGPUDevice::SafeAllocFrontier(uint64 old_value) {
1953 if (timestamped_allocator_) {
1954 return kernel_tracker_->LastTerminatedCount(old_value);
1955 } else {
1956 return 0;
1957 }
1958 }
1959
1960 int BaseGPUDevice::PendingKernels() {
1961 if (kernel_tracker_) {
1962 return kernel_tracker_->NumPending();
1963 }
1964 return 0;
1965 }
1966
1967 void BaseGPUDevice::TestOnlyReset() {
1968 StreamGroupFactory::Global().TestOnlyReset();
1969 }
1970
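// Called as a kernel is enqueued on the compute stream. Accumulates memory
// and op counts since the last tracked kernel; once either the byte or the
// interval threshold is reached, assigns the kernel a fresh timing count,
// records it as pending with a weight proportional to the memory used, and
// returns that count. A return value of 0 means the kernel is not tracked.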
1971 uint64 GPUKernelTracker::MaybeQueue(OpKernelContext* ctx) {
1972 mutex_lock l(mu_);
1973 ++ops_since_last_;
1974 int64_t mem_used =
1975 ctx->persistent_memory_allocated() + ctx->temp_memory_allocated();
1976 VLOG(2) << "kernel: " << ctx->op_kernel().name() << " mem_used: " << mem_used;
1977 mem_since_last_ += mem_used;
1978 int weight = 1;
1979 // Note that if all {max_bytes, max_interval, max_pending} are zero then
1980 // we track every single kernel with no pending cap. This can happen if
1981 // timestamped_allocator alone was specified.
1982 if ((mem_since_last_ < params_.max_bytes) &&
1983 (ops_since_last_ < params_.max_interval)) {
1984 return 0;
1985 } else {
1986 weight = std::min(
1987 params_.max_pending,
1988 std::max(1, mem_since_last_ / std::max(16386, params_.max_bytes)));
1989 mem_since_last_ = 0;
1990 ops_since_last_ = 0;
1991 }
1992 uint64 queued_count = timing_counter_->next();
1993 RecordQueued(queued_count, weight);
1994 return queued_count;
1995 }
1996
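// Inserts a pending entry for 'queued_count' into the ring buffer, wrapping
// or enlarging the buffer when it reaches the end and doubling it when it is
// full. Both call sites in this file hold mu_ when calling this.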
1997 void GPUKernelTracker::RecordQueued(uint64 queued_count, int weight) {
1998 VLOG(2) << "RecordQueued queued_count=" << queued_count
1999 << " first_available_=" << first_available_
2000 << " last_completed_=" << last_completed_
2001 << " num_pending_=" << num_pending_;
2002 pending_kernels_[first_available_].queued_count = queued_count;
2003 pending_kernels_[first_available_].weight = weight;
2004 pending_kernels_[first_available_].terminated = false;
2005 ++first_available_;
2006 num_pending_ += weight;
2007 if (first_available_ >= pending_kernels_.size()) {
2008 if (last_completed_ >= 0) {
2009 // wrap
2010 first_available_ = 0;
2011 } else {
2012 // enlarge the ring buffer
2013 pending_kernels_.resize(2 * pending_kernels_.size());
2014 }
2015 }
2016 if (first_available_ == last_completed_) {
2017 // Ring buffer is full: double it. All of the valid PendingKernel
2018 // entries still exist after the copy; they are just shifted to begin
2019 // at index 0 in the new array.
2020 std::vector<PendingKernel> new_buffer(pending_kernels_.size() * 2);
2021 for (int i = 0; i < pending_kernels_.size(); ++i) {
2022 int j = (i + last_completed_) % pending_kernels_.size();
2023 new_buffer[i] = pending_kernels_[j];
2024 }
2025 last_completed_ = 0;
2026 first_available_ = pending_kernels_.size();
2027 pending_kernels_.swap(new_buffer);
2028 VLOG(1) << "last_completed_=" << last_completed_
2029 << " first_available_=" << first_available_
2030 << " num_pending_=" << num_pending_;
2031 }
2032 DCHECK_NE(first_available_, last_completed_) << "exhausted pending_kernels";
2033 }
2034
2035 // Called by LastTerminatedCount() when new_value is equal to old_value. This
2036 // case can occur when an allocation failed and waited for memory to be freed,
2037 // but when it retried, the safe allocation frontier had not advanced because no
2038 // tracking event had matured. Maybe GPU progress has stalled waiting on an i/o
2039 // event, or maybe we're tracking at too infrequent an interval. In any case if
2040 // the GPU compute queue is actually empty it's safe to advance the safe
2041 // frontier so that this request can allocate from unrestricted (and better
2042 // compacted) memory. So queue an event on the compute stream to ensure the
2043 // frontier does advance.
2044 void GPUKernelTracker::MaybeQueueProgressEvent() {
2045 mutex_lock l(mu_);
2046 if (num_pending_ == 0) {
2047 uint64 new_count = timing_counter_->next();
2048 RecordQueued(new_count, 1);
2049 em_->ThenExecute(stream_,
2050 [this, new_count]() { RecordTerminated(new_count); });
2051 }
2052 }
2053
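// Completion callback for a tracked kernel: marks the matching pending entry
// terminated, advances last_completed_ past every contiguous terminated
// entry, publishes the new safe frontier to the allocator, and decrements the
// pending weight before notifying any waiters.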
2054 void GPUKernelTracker::RecordTerminated(uint64 queued_count) {
2055 mutex_lock l(mu_);
2056 VLOG(2) << this << " RecordTerminated queued_count=" << queued_count
2057 << " first_available_=" << first_available_
2058 << " last_completed_=" << last_completed_
2059 << " num_pending_=" << num_pending_ << " LC="
2060 << ((last_completed_ >= 0)
2061 ? pending_kernels_[last_completed_].queued_count
2062 : -1);
2063 DCHECK_NE(first_available_, last_completed_);
2064 DCHECK_GT(num_pending_, 0);
2065 // Starting just past the last completed entry, find the entry with
2066 // this queued_count and mark it done.
2067 int index = (last_completed_ + 1) % pending_kernels_.size();
2068 int weight = 1;
2069 while (true) {
2070 if (index == first_available_) {
2071 // This should never happen.
2072 LOG(FATAL) << "Failed to find " << queued_count // Crash OK
2073 << " in queue, last_completed_=" << last_completed_
2074 << " index=" << index
2075 << " first_available_=" << first_available_
2076 << " pending_kernels_.size()=" << pending_kernels_.size();
2077 }
2078 if (pending_kernels_[index].queued_count == queued_count) {
2079 pending_kernels_[index].terminated = true;
2080 weight = pending_kernels_[index].weight;
2081 break;
2082 }
2083 index = (index + 1) % pending_kernels_.size();
2084 }
2085 // Next move last_completed_ forward past all completed kernels. In theory
2086 // kernels should always complete in queued order so we should be able to
2087 // advance the completed frontier to the just-completed PendingKernel. In
2088 // practice we occasionally see the termination callbacks arrive out of
2089 // order, probably because of thread scheduling. Eventually we may support
2090 // out-of-order completion involving multiple compute streams, so here we
2091 // follow a conservative approach and wait for every single callback to
2092 // arrive before advancing the frontier.
2093 while (true) {
2094 int next_index = (last_completed_ + 1) % pending_kernels_.size();
2095 if (next_index == first_available_) break;
2096 if (pending_kernels_[next_index].terminated) {
2097 last_completed_ = next_index;
2098 } else {
2099 break;
2100 }
2101 }
2102 if (last_completed_ >= 0) {
2103 int64_t v = pending_kernels_[last_completed_].queued_count;
2104 last_terminated_count_ = v;
2105 if (allocator_) {
2106 allocator_->SetSafeFrontier(v);
2107 }
2108 }
2109 // Finally, decrease num_pending_ before possibly waking a waiter.
2110 num_pending_ -= weight;
2111 pending_decreased_.notify_all();
2112 }
2113
2114 } // namespace tensorflow
2115
2116 #endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
2117