/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if !GOOGLE_CUDA && !TENSORFLOW_USE_ROCM
#error This file must only be included when building with Cuda or ROCm support
#endif

#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/common_runtime/device/device_id_utils.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#include "tensorflow/core/common_runtime/local_device.h"
#include "tensorflow/core/common_runtime/node_file_writer.h"
#include "tensorflow/core/common_runtime/scoped_allocator_mgr.h"
#include "tensorflow/core/common_runtime/shared_counter.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session_options.h"

namespace Eigen {
class StreamInterface;
}  // namespace Eigen

namespace tensorflow {
class GPUKernelTracker;

class ConcretePerOpGpuDevice : public PerOpGpuDevice {
 public:
  ConcretePerOpGpuDevice();

  void Reinitialize(OpKernelContext* context, const void* gpu_stream,
                    TfDeviceId tf_device_id, Allocator* base_allocator,
                    char* scratch);

  void Reinitialize(OpKernelContext* context, const void* gpu_stream,
                    PlatformDeviceId platform_device_id,
                    Allocator* base_allocator, char* scratch);

  const Eigen::GpuDevice& device() const override;

 private:
  std::unique_ptr<::Eigen::StreamInterface> stream_device_;
};

class BaseGPUDevice : public LocalDevice {
 public:
  BaseGPUDevice(const SessionOptions& options, const std::string& name,
                Bytes memory_limit, const DeviceLocality& locality,
                TfDeviceId tf_device_id,
                const std::string& physical_device_desc,
                Allocator* gpu_allocator, Allocator* cpu_allocator,
                bool sync_every_op);

  ~BaseGPUDevice() override;

  // Initialize the device and return the status of initialization.
  Status Init(const SessionOptions& options);

  void Compute(OpKernel* op_kernel, OpKernelContext* context) override;

  Status Sync() override;

  void ComputeAsync(AsyncOpKernel* op_kernel, OpKernelContext* context,
                    AsyncOpKernel::DoneCallback done) override;

  Status MakeTensorFromProto(const TensorProto& tensor_proto,
                             const AllocatorAttributes alloc_attrs,
                             Tensor* tensor) override;

  void CopyTensorInSameDevice(const Tensor* input_tensor, Tensor* output_tensor,
                              const DeviceContext* device_context,
                              StatusCallback done) override;

  // The caller owns the returned device.
  PerOpGpuDevice* MakeGpuDevice() override;

  Status ReinitializeGpuDevice(OpKernelContext* context, PerOpGpuDevice* device,
                               DeviceContext* dc,
                               Allocator* allocator) override;

  // Returns the platform GPU id of this device within the native driver
  // system; e.g., for CUDA and ROCm this is the ordinal of the GPU within the
  // system.
  int gpu_id() const {
    PlatformDeviceId platform_device_id;
    TF_CHECK_OK(GpuIdManager::TfToPlatformDeviceId(tf_device_id_,
                                                   &platform_device_id));
    return platform_device_id.value();
  }

  // The executor that provides control for the device; e.g., for CUDA this
  // corresponds to the CUDA context.
  se::StreamExecutor* executor() const { return executor_; }

  Allocator* GetScopedAllocator(AllocatorAttributes attr,
                                int64_t step_id) override;

  ScopedAllocatorMgr* GetScopedAllocatorMgr() const override {
    return scoped_allocator_mgr_.get();
  }

  // The following two functions always return 0 unless one of the
  // related experimental config options has been specified.

  // If the returned value is > 0, then GPU memory chunks freed before this
  // count are guaranteed not to be in use by any kernel pending on this
  // device.
  uint64 SafeAllocFrontier(uint64 old_value) override;

  // Returns the number of kernels that have been queued for execution on
  // the compute stream and are not yet known to have completed.
  int PendingKernels();

  int priority() const { return stream_->priority; }
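  // Example (illustrative sketch only, not part of the API): how a caller
  // might obtain the per-op Eigen device backed by this GPU via
  // MakeGpuDevice() and ReinitializeGpuDevice() above. The `gpu_device`,
  // `ctx`, `device_context`, and `allocator` variables are assumed to come
  // from the surrounding op setup.
  //
  //   PerOpGpuDevice* per_op = gpu_device->MakeGpuDevice();  // caller owns
  //   TF_CHECK_OK(gpu_device->ReinitializeGpuDevice(ctx, per_op,
  //                                                 device_context,
  //                                                 allocator));
  //   const Eigen::GpuDevice& eigen_device = per_op->device();
  //   // ... launch Eigen expressions on eigen_device ...
  //   delete per_op;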
  // Helper method for unit tests to reset the streams. Never use in
  // production.
  static void TestOnlyReset();

  void* GetStream() {
    return stream_->compute->implementation()->GpuStreamMemberHack();
  }

 protected:
  Allocator* gpu_allocator_;  // not owned
  Allocator* cpu_allocator_;  // not owned

  se::StreamExecutor* executor_;  // not owned
  std::unique_ptr<ScopedAllocatorMgr> scoped_allocator_mgr_;

 private:
  friend class GPUDeviceTestHelper;
  struct StreamGroup {
    se::Stream* compute = nullptr;
#if TENSORFLOW_USE_ROCM
    se::Stream* nccl = nullptr;
#endif
    se::Stream* host_to_device = nullptr;
    se::Stream* device_to_host = nullptr;
    gtl::InlinedVector<se::Stream*, 4> device_to_device;
    int priority = 0;
  };
  class StreamGroupFactory;

  StreamGroup* stream_;
  mutex scratch_init_mutex_;
  char* scratch_ = nullptr;
  GPUDeviceContext* device_context_;
  DeviceBase::AcceleratorDeviceInfo* accelerator_device_info_ = nullptr;
  mutex trace_mu_;
  TfDeviceId tf_device_id_;
  const bool sync_every_op_ = false;
  EventMgr* em_ = nullptr;
  std::unique_ptr<thread::ThreadPool> thread_pool_;
  std::unique_ptr<GPUKernelTracker> kernel_tracker_;
  int32 pending_cap_ = 0;
  bool timestamped_allocator_ = false;
  NodeFileWriter* node_file_writer_ = nullptr;  // not owned

  // Initialize scratch buffers used by Eigen.
  Status InitScratchBuffers();

  void ReinitializeDevice(OpKernelContext* context, PerOpGpuDevice* device,
                          int stream_id, Allocator* allocator);

  std::string ComputeOpKernelDebugString(const OpKernel& op_kernel,
                                         const int& stream_id);

  // This method returns an initialization status, in addition to
  // calling the "done" StatusCallback, if there is a failure to
  // allocate memory or if the tensor "from" is not DMA-copyable.
  // If there is no error prior to enqueueing the copy, an OK status
  // is returned.
  Status MaybeCopyTensorToGPU(const AllocatorAttributes& alloc_attrs,
                              const Tensor& from, Tensor* to,
                              StatusCallback done);

  Tensor CopyGpuTensorToHostDebugOnly(const Tensor& gpu_tensor);
  void LogInputs(OpKernel* op_kernel, OpKernelContext* context);
  void LogOutputs(OpKernel* op_kernel, OpKernelContext* context);
};

// A per-compute-stream utility that keeps track of kernels that have been
// queued for execution but may not yet have terminated and also the queued
// time of the most recently terminated kernel.
class GPUKernelTracker {
 public:
  // Controls the strategy for inserting tracking events after GPU kernels.
  // If max_interval >= 0, then insert an event after this many kernels
  // if an event has not been inserted for another reason.
  // If max_bytes > 0, then insert an event after kernels allocating this
  // many bytes have been queued since the last event.
  // If max_pending > 0, then track up to this many events at once. If
  // this limit is reached the GPU::Compute() method will delay starting
  // additional ops until some event completes. If 0 and one of the other
  // fields is non-zero, then a reasonable default will be selected.
  struct Params {
    int max_interval = 0;
    int max_bytes = 0;
    int max_pending = 0;
    Params(int mi, int mb, int mp)
        : max_interval(mi), max_bytes(mb), max_pending(mp) {}
  };
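  // Example (illustrative sketch only): how the tracker's public interface
  // below fits together. The `env`, `compute_stream`, `allocator`,
  // `event_mgr`, and `ctx` variables are assumed to come from the surrounding
  // device setup; passing nullptr for the timing counter makes the tracker
  // allocate its own SharedCounter.
  //
  //   GPUKernelTracker::Params params(/*mi=*/2, /*mb=*/0, /*mp=*/64);
  //   GPUKernelTracker tracker(params, env, compute_stream,
  //                            /*timing_counter=*/nullptr, allocator,
  //                            event_mgr);
  //   tracker.PauseWhilePendingExceeds(params.max_pending);
  //   uint64 queued = tracker.MaybeQueue(ctx);  // before launching a kernel
  //   if (queued != 0) {
  //     // ... once that kernel's tracking event has completed ...
  //     tracker.RecordTerminated(queued);
  //   }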
  // If we're going to share a SharedCounter with an allocator, it's owned
  // by the allocator because allocators are initialized once per process.
  // Devices are per-session.
  explicit GPUKernelTracker(const Params& params, Env* env,
                            se::Stream* compute_stream,
                            SharedCounter* timing_counter, Allocator* allocator,
                            EventMgr* event_manager)
      : params_(params),
        env_(env),
        stream_(compute_stream),
        timing_counter_(timing_counter),
        allocator_(allocator),
        em_(event_manager),
        pending_kernels_(
            params.max_pending > 0 ? std::max(8, 2 * params.max_pending) : 64) {
    mem_since_last_ = 0;
    if (!timing_counter_) {
      // There's no preexisting counter owned by GPUProcessState, i.e.
      // pending_cap > 0 but timestamped_allocator == false.
      owned_counter_.reset(new SharedCounter);
      timing_counter_ = owned_counter_.get();
    }
  }

  // Determine whether a GPU kernel should have a recording event queued
  // immediately afterwards. If so, advance the counter and return the new
  // counter value after enqueuing.
  uint64 MaybeQueue(OpKernelContext* ctx);

  // Record that a GPU kernel has just been enqueued on the compute stream.
  // Inserts the supplied counter value in a new PendingKernel record appended
  // to the end of the ring buffer. Caller is responsible for ensuring that
  // RecordTerminated() is eventually called with the same counter value.
  void RecordQueued(uint64 queued_count, int weight)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Takes a count value previously passed to RecordQueued and finds the
  // corresponding PendingKernel record in the ring buffer. Marks the kernel
  // as completed and advances the completion frontier accordingly.
  void RecordTerminated(uint64 queued_count);

  // Returns the largest timing count such that all kernels queued no
  // later than that count are known to have terminated.
  inline uint64 LastTerminatedCount(uint64 old_value) {
    uint64 new_value = last_terminated_count_.load(std::memory_order_relaxed);
    if (new_value == old_value) {
      MaybeQueueProgressEvent();
    }
    return new_value;
  }

  // Returns the number of kernels enqueued that are not yet known to
  // have terminated.
  int NumPending() {
    mutex_lock l(mu_);
    return num_pending_;
  }

  // Yield the current thread until the number of pending kernels no longer
  // exceeds the cap.
  void PauseWhilePendingExceeds(int cap) TF_LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    while (num_pending_ > cap) {
      VLOG(1) << "num_pending_=" << num_pending_ << " cap=" << cap;
      pending_decreased_.wait(l);
    }
  }

 private:
  friend class GPUKernelTrackerTest;
  Params params_;
  Env* env_;
  se::Stream* stream_;
  SharedCounter* timing_counter_;
  std::unique_ptr<SharedCounter> owned_counter_;
  Allocator* allocator_ = nullptr;
  EventMgr* em_ = nullptr;
  std::atomic<uint64> last_terminated_count_ = {1};

  void MaybeQueueProgressEvent();

  // Records when a kernel was queued for execution. Kernel launches are
  // identified by a unique count value from a per-GPU device timing counter.
  struct PendingKernel {
    uint64 queued_count;
    int weight;
    bool terminated;
    PendingKernel(const PendingKernel& pk)
        : queued_count(pk.queued_count),
          weight(pk.weight),
          terminated(pk.terminated) {}
    PendingKernel() : queued_count(0), weight(0), terminated(false) {}
  };
  mutex mu_;
  int32 mem_since_last_ TF_GUARDED_BY(mu_);
  int32 ops_since_last_ TF_GUARDED_BY(mu_);
  // Ring buffer of PendingKernel records.
  std::vector<PendingKernel> pending_kernels_ TF_GUARDED_BY(mu_);
  // Next unused slot in pending_kernels_.
  int first_available_ TF_GUARDED_BY(mu_) = 0;
  // Last completed PendingKernel such that all prior PendingKernels are
  // also completed. With out-of-order completion there may be a mixture
  // of completed and uncompleted entries between last_completed_ and
  // first_available_.
  int last_completed_ TF_GUARDED_BY(mu_) = -1;
  // Sum of weights of the outstanding events marking tracked kernels.
  int num_pending_ TF_GUARDED_BY(mu_) = 0;
  condition_variable pending_decreased_ TF_GUARDED_BY(mu_);
};

class BaseGPUDeviceFactory : public DeviceFactory {
 public:
  Status ListPhysicalDevices(std::vector<string>* devices) override;
  Status CreateDevices(const SessionOptions& options,
                       const std::string& name_prefix,
                       std::vector<std::unique_ptr<Device>>* devices) override;
  Status GetDeviceDetails(int device_index,
                          std::unordered_map<string, string>* details) override;

  struct InterconnectMap {
    // Name of interconnect technology, if known.
    std::string name;
    // If possible, strength should approximate Gb/sec bandwidth rate.
    // Where architecture-specific subclassing is not done, that won't
    // always be possible. The minimum expectation is that faster links
    // should have a higher value than slower links.
    int32 strength;
    static const int kSameDeviceStrength;
    static const int kStreamExecutorStrength;
    std::set<std::pair<PlatformDeviceId, PlatformDeviceId>> directed_links;
  };

 protected:
  // Populates *maps with interconnect maps for all local direct access
  // pathways between GPUs.
  virtual Status GetInterconnectMaps(
      const std::vector<PlatformDeviceId>& visible_gpu_order,
      se::Platform* gpu_manager, std::vector<InterconnectMap>* maps);

  struct TfDeviceIdHash {
    std::size_t operator()(const TfDeviceId& id) const noexcept {
      return std::hash<int>{}(id.value());
    }
  };
  typedef std::unordered_map<TfDeviceId, DeviceLocality, TfDeviceIdHash>
      LocalityMap;
  // Populates *localities with the DeviceLocality descriptor for
  // every TfDeviceId.
  virtual Status GetDeviceLocalities(
      int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
      LocalityMap* localities);
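  // Example (illustrative sketch only): an architecture-specific subclass
  // overriding GetInterconnectMaps() above might report a bidirectional
  // direct link between two visible GPUs like this. The "NVLink" name and
  // the chosen strength are placeholders, not values defined by this class.
  //
  //   InterconnectMap map;
  //   map.name = "NVLink";
  //   map.strength = InterconnectMap::kStreamExecutorStrength;
  //   map.directed_links.insert({PlatformDeviceId(0), PlatformDeviceId(1)});
  //   map.directed_links.insert({PlatformDeviceId(1), PlatformDeviceId(0)});
  //   maps->push_back(map);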
 private:
  // Creates a BaseGPUDevice associated with 'tf_device_id', allocates
  // (strictly) 'memory_limit' bytes of GPU memory to it, and adds it to the
  // 'devices' vector.
  Status CreateGPUDevice(const SessionOptions& options,
                         const std::string& name_prefix,
                         TfDeviceId tf_device_id, int64_t memory_limit,
                         const DeviceLocality& dev_locality, size_t num_tf_gpus,
                         std::vector<std::unique_ptr<Device>>* devices);

  virtual std::unique_ptr<BaseGPUDevice> CreateGPUDevice(
      const SessionOptions& options, const string& name, Bytes memory_limit,
      const DeviceLocality& dev_locality, TfDeviceId tf_device_id,
      const string& physical_device_desc, Allocator* gpu_allocator,
      Allocator* cpu_allocator) = 0;

  Status EnablePeerAccess(
      const std::vector<PlatformDeviceId>& visible_gpu_order);

  // Returns into 'ids' the list of valid platform GPU ids, in the order that
  // they should map to TF GPU ids "/device:GPU:0", "/device:GPU:1", etc.,
  // based upon 'visible_gpu_order', which was generated by parsing
  // GPUOptions::visible_device_list, a comma-separated list of CUDA or ROCm
  // GPU ids.
  Status GetValidDeviceIds(
      const std::vector<PlatformDeviceId>& visible_gpu_order,
      std::vector<PlatformDeviceId>* ids);

  // Cache the valid device IDs if not already cached. Cached IDs are stored
  // in the field cached_device_ids_. Passes {0, 1, ..., num_devices-1} to
  // GetValidDeviceIds, so this should only be used in functions where all
  // devices should be treated as visible, like ListPhysicalDevices.
  Status CacheDeviceIds();

  // visible_gpu_initialized_[platform_device_id] is true if visible GPU
  // platform_device_id has been initialized by the process.
  std::unordered_map<int, bool> visible_gpu_initialized_;

  // Cached device IDs, as returned by GetValidDeviceIds when every physical
  // device is visible. The cache should not be used if some devices are not
  // visible.
  std::vector<PlatformDeviceId> cached_device_ids_;
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_