/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if !GOOGLE_CUDA && !TENSORFLOW_USE_ROCM
#error This file must only be included when building with Cuda or ROCm support
#endif

#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/common_runtime/device/device_id_utils.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#include "tensorflow/core/common_runtime/local_device.h"
#include "tensorflow/core/common_runtime/scoped_allocator_mgr.h"
#include "tensorflow/core/common_runtime/shared_counter.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session_options.h"

namespace tensorflow {
class GPUKernelTracker;

class BaseGPUDevice : public LocalDevice {
 public:
  BaseGPUDevice(const SessionOptions& options, const std::string& name,
                Bytes memory_limit, const DeviceLocality& locality,
                TfDeviceId tf_device_id,
                const std::string& physical_device_desc,
                Allocator* gpu_allocator, Allocator* cpu_allocator,
                bool sync_every_op);

  ~BaseGPUDevice() override;

  // Initialize the device and return the status of initialization.
  Status Init(const SessionOptions& options);

  void Compute(OpKernel* op_kernel, OpKernelContext* context) override;

  Status Sync() override;

  void ComputeAsync(AsyncOpKernel* op_kernel, OpKernelContext* context,
                    AsyncOpKernel::DoneCallback done) override;

  Status MakeTensorFromProto(const TensorProto& tensor_proto,
                             const AllocatorAttributes alloc_attrs,
                             Tensor* tensor) override;

  void CopyTensorInSameDevice(const Tensor* input_tensor,
                              Tensor* output_tensor,
                              const DeviceContext* device_context,
                              StatusCallback done) override;

  // The caller owns the returned device.
  PerOpGpuDevice* MakeGpuDevice() override;

  Status ReinitializeGpuDevice(OpKernelContext* context,
                               PerOpGpuDevice* device, DeviceContext* dc,
                               Allocator* allocator) override;

  // Returns the platform GPU id of this device within the native driver
  // system; e.g., for CUDA and ROCm this is the ordinal of the GPU within
  // the system.
  int gpu_id() const {
    PlatformDeviceId platform_device_id;
    TF_CHECK_OK(
        GpuIdManager::TfToPlatformDeviceId(tf_device_id_, &platform_device_id));
    return platform_device_id.value();
  }

  // The executor that provides control for the device; e.g., for CUDA this
  // corresponds to the cuda context.
  se::StreamExecutor* executor() const { return executor_; }

  Allocator* GetScopedAllocator(AllocatorAttributes attr,
                                int64_t step_id) override;

  ScopedAllocatorMgr* GetScopedAllocatorMgr() const override {
    return scoped_allocator_mgr_.get();
  }

  // The following two functions always return 0 unless one of the
  // related experimental config options has been specified.

  // If returned value is > 0 then GPU Memory chunks freed before this count
  // are guaranteed not to be in use by any kernel pending on this device.
  uint64 SafeAllocFrontier(uint64 old_value) override;

  // Returns the number of kernels that have been queued for execution on
  // the compute stream and are not yet known to have completed.
  int PendingKernels();

  int priority() const { return stream_->priority; }

  // Helper method for unit tests to reset the streams. Never use in
  // production.
  static void TestOnlyReset();

  void* GetStream() {
    return stream_->compute->implementation()->GpuStreamMemberHack();
  }

 protected:
  Allocator* gpu_allocator_;  // not owned
  Allocator* cpu_allocator_;  // not owned

  se::StreamExecutor* executor_;  // not owned
  std::unique_ptr<ScopedAllocatorMgr> scoped_allocator_mgr_;

 private:
  friend class GPUDeviceTestHelper;
  struct StreamGroup {
    se::Stream* compute = nullptr;
#if TENSORFLOW_USE_ROCM
    se::Stream* nccl = nullptr;
#endif
    se::Stream* host_to_device = nullptr;
    se::Stream* device_to_host = nullptr;
    gtl::InlinedVector<se::Stream*, 4> device_to_device;
    int priority = 0;
  };
  class StreamGroupFactory;

  StreamGroup* stream_;
  mutex scratch_init_mutex_;
  char* scratch_ = nullptr;
  GPUDeviceContext* device_context_;
  GpuDeviceInfo* gpu_device_info_ = nullptr;
  mutex trace_mu_;
  TfDeviceId tf_device_id_;
  const bool sync_every_op_ = false;
  EventMgr* em_ = nullptr;
  std::unique_ptr<thread::ThreadPool> thread_pool_;
  std::unique_ptr<GPUKernelTracker> kernel_tracker_;
  int32 pending_cap_ = 0;
  bool timestamped_allocator_ = false;

  // Initialize scratch buffers used by Eigen.
  Status InitScratchBuffers();

  void ReinitializeDevice(OpKernelContext* context, PerOpGpuDevice* device,
                          int stream_id, Allocator* allocator);

  std::string ComputeOpKernelDebugString(const OpKernel& op_kernel,
                                         const int& stream_id);

  // This method returns an initialization status, in addition to
  // calling the "done" StatusCallback, if there is a failure to
  // allocate memory or if the tensor "from" is not DMA-copyable.
  // If there is no error prior to enqueueing the copy, an OK status
  // is returned.
  Status MaybeCopyTensorToGPU(const AllocatorAttributes& alloc_attrs,
                              const Tensor& from, Tensor* to,
                              StatusCallback done);

  Tensor CopyGpuTensorToHostDebugOnly(const Tensor& gpu_tensor);
  void LogInputs(OpKernel* op_kernel, OpKernelContext* context);
  void LogOutputs(OpKernel* op_kernel, OpKernelContext* context);
};
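
// Illustrative sketch (not part of this header's API contract): how the
// op-execution machinery is typically expected to use MakeGpuDevice() and
// ReinitializeGpuDevice() above to prepare a per-op Eigen GPU device. The
// local names `device`, `op_ctx` and `allocator` are assumptions for the
// example, not symbols defined in this file.
//
//   PerOpGpuDevice* per_op = device->MakeGpuDevice();  // caller owns this
//   TF_RETURN_IF_ERROR(device->ReinitializeGpuDevice(
//       op_ctx, per_op, op_ctx->op_device_context(), allocator));
//   // Op kernels then reach the device through op_ctx->eigen_gpu_device().
//   delete per_op;  // released by the caller when the op is done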

// A per-compute-stream utility that keeps track of kernels that have been
// queued for execution but may not yet have terminated and also the queued
// time of the most recently terminated kernel.
class GPUKernelTracker {
 public:
  // Controls the strategy for inserting tracking events after GPU kernels.
  // If max_interval >= 0, then insert an event after this many kernels
  // if an event has not been inserted for another reason.
  // If max_bytes > 0, then insert an event after kernels allocating this
  // many bytes have been queued since the last event.
  // If max_pending > 0, then track up to this many events at once. If
  // this limit is reached the GPU::Compute() method will delay starting
  // additional ops until some event completes. If 0 and one of the other
  // fields is non-zero, then a reasonable default will be selected.
  struct Params {
    int max_interval = 0;
    int max_bytes = 0;
    int max_pending = 0;
    Params(int mi, int mb, int mp)
        : max_interval(mi), max_bytes(mb), max_pending(mp) {}
  };

  // If we're going to share a SharedCounter with an allocator, it's owned
  // by the allocator because allocators are initialized once per process.
  // Devices are per-session.
  explicit GPUKernelTracker(const Params& params, Env* env,
                            se::Stream* compute_stream,
                            SharedCounter* timing_counter,
                            Allocator* allocator, EventMgr* event_manager)
      : params_(params),
        env_(env),
        stream_(compute_stream),
        timing_counter_(timing_counter),
        allocator_(allocator),
        em_(event_manager),
        pending_kernels_(
            params.max_pending > 0 ? std::max(8, 2 * params.max_pending) : 64) {
    mem_since_last_ = 0;
    if (!timing_counter_) {
      // There's no pre-existing counter owned by GPUProcessState, i.e.
      // pending_cap > 0 but timestamped_allocator == false.
      owned_counter_.reset(new SharedCounter);
      timing_counter_ = owned_counter_.get();
    }
  }

  // Determine whether a GPU kernel should have a recording event queued
  // immediately afterwards. If so, advance the counter and return the new
  // counter value after enqueuing.
  uint64 MaybeQueue(OpKernelContext* ctx);

  // Record that a GPU kernel has just been enqueued on the compute stream.
  // Inserts the supplied counter value in a new PendingKernel record appended
  // to the end of the ring buffer then returns that same count.
  // Caller is responsible for ensuring that RecordTerminated() is eventually
  // called with the same counter value.
  void RecordQueued(uint64 queued_count, int weight)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Takes a count value returned by RecordQueued and finds the corresponding
  // PendingKernel record in the ring buffer. Marks the kernel as completed
  // and advances the completion frontier accordingly.
  void RecordTerminated(uint64 queued_count);

  // Returns the largest timing count such that all kernels queued no
  // later than that count are known to have terminated.
  inline uint64 LastTerminatedCount(uint64 old_value) {
    uint64 new_value = last_terminated_count_.load(std::memory_order_relaxed);
    if (new_value == old_value) {
      MaybeQueueProgressEvent();
    }
    return new_value;
  }

  // Returns the number of kernels enqueued that are not yet known to
  // have terminated.
  int NumPending() {
    mutex_lock l(mu_);
    return num_pending_;
  }

  // Yield current thread until number of pending kernels no longer
  // exceeds the cap.
  void PauseWhilePendingExceeds(int cap) TF_LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    while (num_pending_ > cap) {
      VLOG(1) << "num_pending_=" << num_pending_ << " cap=" << cap;
      pending_decreased_.wait(l);
    }
  }

 private:
  friend class GPUKernelTrackerTest;
  Params params_;
  Env* env_;
  se::Stream* stream_;
  SharedCounter* timing_counter_;
  std::unique_ptr<SharedCounter> owned_counter_;
  Allocator* allocator_ = nullptr;
  EventMgr* em_ = nullptr;
  std::atomic<uint64> last_terminated_count_ = {1};

  void MaybeQueueProgressEvent();

  // Records when a kernel was queued for execution. Kernel launches are
  // identified by a unique count value from a per-GPU device timing counter.
  struct PendingKernel {
    uint64 queued_count;
    int weight;
    bool terminated;
    PendingKernel(const PendingKernel& pk)
        : queued_count(pk.queued_count),
          weight(pk.weight),
          terminated(pk.terminated) {}
    PendingKernel() : queued_count(0), weight(0), terminated(false) {}
  };
  mutex mu_;
  int32 mem_since_last_ TF_GUARDED_BY(mu_);
  int32 ops_since_last_ TF_GUARDED_BY(mu_);
  // Ring buffer of PendingKernel records.
  std::vector<PendingKernel> pending_kernels_ TF_GUARDED_BY(mu_);
  // Next unused slot in pending_kernels_.
  int first_available_ TF_GUARDED_BY(mu_) = 0;
  // Last completed PendingKernel such that all prior PendingKernels are
  // also completed. With out-of-order completion there may be a mixture
  // of completed and uncompleted entries between last_completed_ and
  // first_available_.
  int last_completed_ TF_GUARDED_BY(mu_) = -1;
  // Sum of weights of the outstanding events marking tracked kernels.
  int num_pending_ TF_GUARDED_BY(mu_) = 0;
  condition_variable pending_decreased_ TF_GUARDED_BY(mu_);
};
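
// Illustrative sketch (not part of this header): the rough lifecycle the
// owning device is expected to drive around each kernel launch, under the
// assumption that MaybeQueue() returns a nonzero count only when it queued a
// tracking event. The local names `tracker`, `ctx`, `em`, `allocator`,
// `compute_stream` and `pending_cap` are examples, not symbols defined here.
//
//   GPUKernelTracker::Params params(/*max_interval=*/1, /*max_bytes=*/0,
//                                   /*max_pending=*/64);  // example values
//   GPUKernelTracker tracker(params, env, compute_stream,
//                            /*timing_counter=*/nullptr, allocator, em);
//
//   // After enqueuing a kernel on the compute stream:
//   uint64 queued_count = tracker.MaybeQueue(ctx);
//   if (queued_count > 0) {
//     // When the tracking event completes (e.g. from an EventMgr callback),
//     // report termination with the same count:
//     tracker.RecordTerminated(queued_count);
//   }
//
//   // Optionally throttle if too many kernels are still outstanding:
//   tracker.PauseWhilePendingExceeds(pending_cap);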

class BaseGPUDeviceFactory : public DeviceFactory {
 public:
  Status ListPhysicalDevices(std::vector<string>* devices) override;
  Status CreateDevices(const SessionOptions& options,
                       const std::string& name_prefix,
                       std::vector<std::unique_ptr<Device>>* devices) override;
  Status GetDeviceDetails(int device_index,
                          std::unordered_map<string, string>* details) override;

  struct InterconnectMap {
    // Name of interconnect technology, if known.
    std::string name;
    // If possible, strength should approximate Gb/sec bandwidth rate.
    // Where architecture-specific subclassing is not done, that won't
    // always be possible. The minimum expectation is that
    // faster links should have a higher value than slower links.
    int32 strength;
    static const int kSameDeviceStrength;
    static const int kStreamExecutorStrength;
    std::set<std::pair<PlatformDeviceId, PlatformDeviceId>> directed_links;
  };

 protected:
  // Populates *maps with interconnect maps for all local direct access
  // pathways between GPUs.
  virtual Status GetInterconnectMaps(
      const std::vector<PlatformDeviceId>& visible_gpu_order,
      se::Platform* gpu_manager, std::vector<InterconnectMap>* maps);

  struct TfDeviceIdHash {
    std::size_t operator()(const TfDeviceId& id) const noexcept {
      return std::hash<int>{}(id.value());
    }
  };
  typedef std::unordered_map<TfDeviceId, DeviceLocality, TfDeviceIdHash>
      LocalityMap;
  // Populates *localities with the DeviceLocality descriptor for
  // every TfDeviceId.
  virtual Status GetDeviceLocalities(
      int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
      LocalityMap* localities);

 private:
  // Creates a BaseGPUDevice associated with 'tf_device_id', allocates
  // (strictly) 'memory_limit' bytes of GPU memory to it, and adds it to the
  // 'devices' vector.
  Status CreateGPUDevice(const SessionOptions& options,
                         const std::string& name_prefix,
                         TfDeviceId tf_device_id, int64_t memory_limit,
                         const DeviceLocality& dev_locality, size_t num_tf_gpus,
                         std::vector<std::unique_ptr<Device>>* devices);

  virtual std::unique_ptr<BaseGPUDevice> CreateGPUDevice(
      const SessionOptions& options, const string& name, Bytes memory_limit,
      const DeviceLocality& dev_locality, TfDeviceId tf_device_id,
      const string& physical_device_desc, Allocator* gpu_allocator,
      Allocator* cpu_allocator) = 0;

  Status EnablePeerAccess(
      const std::vector<PlatformDeviceId>& visible_gpu_order);

  // Returns into 'ids' the list of valid platform GPU ids, in the order that
  // they should map to TF GPU ids "/device:GPU:0", "/device:GPU:1", etc.,
  // based upon 'visible_gpu_order', which was generated by parsing
  // GPUOptions::visible_device_list, a comma-separated list of CUDA or
  // ROCm GPU ids.
  Status GetValidDeviceIds(
      const std::vector<PlatformDeviceId>& visible_gpu_order,
      std::vector<PlatformDeviceId>* ids);

  // Caches the valid device IDs if not already cached. Cached IDs are stored
  // in the field cached_device_ids_. Passes {0, 1, ..., num_devices-1} to
  // GetValidDeviceIds, so this should only be used in functions where all
  // devices should be treated as visible, like ListPhysicalDevices.
  Status CacheDeviceIds();

  // visible_gpu_initialized_[platform_device_id] is true if visible GPU
  // platform_device_id has been initialized by the process.
  std::unordered_map<int, bool> visible_gpu_initialized_;

  // Cached device IDs, as returned by GetValidDeviceIds when every physical
  // device is visible. The cache should not be used if some devices are not
  // visible.
  std::vector<PlatformDeviceId> cached_device_ids_;
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_