/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if !GOOGLE_CUDA && !TENSORFLOW_USE_ROCM
#error This file must only be included when building with Cuda or ROCm support
#endif

#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/common_runtime/device/device_id_utils.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#include "tensorflow/core/common_runtime/local_device.h"
#include "tensorflow/core/common_runtime/scoped_allocator_mgr.h"
#include "tensorflow/core/common_runtime/shared_counter.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session_options.h"

namespace tensorflow {
class GPUKernelTracker;

class BaseGPUDevice : public LocalDevice {
 public:
  BaseGPUDevice(const SessionOptions& options, const std::string& name,
                Bytes memory_limit, const DeviceLocality& locality,
                TfDeviceId tf_device_id,
                const std::string& physical_device_desc,
                Allocator* gpu_allocator, Allocator* cpu_allocator,
                bool sync_every_op);

  ~BaseGPUDevice() override;

  // Initialize the device and return the status of initialization.
  Status Init(const SessionOptions& options);

  void Compute(OpKernel* op_kernel, OpKernelContext* context) override;

  Status Sync() override;

  void ComputeAsync(AsyncOpKernel* op_kernel, OpKernelContext* context,
                    AsyncOpKernel::DoneCallback done) override;

  Status MakeTensorFromProto(const TensorProto& tensor_proto,
                             const AllocatorAttributes alloc_attrs,
                             Tensor* tensor) override;

  void CopyTensorInSameDevice(const Tensor* input_tensor, Tensor* output_tensor,
                              const DeviceContext* device_context,
                              StatusCallback done) override;

  // The caller owns the returned device.
  PerOpGpuDevice* MakeGpuDevice() override;
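
  // Illustrative sketch (assumed caller-side code, not from this header):
  // because the caller owns the returned device, it is typically wrapped in a
  // smart pointer right away, e.g.
  //
  //   std::unique_ptr<PerOpGpuDevice> per_op(device->MakeGpuDevice());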

  Status ReinitializeGpuDevice(OpKernelContext* context, PerOpGpuDevice* device,
                               DeviceContext* dc,
                               Allocator* allocator) override;

  // Returns the platform GPU id of this device within the native driver system;
  // e.g., for CUDA and ROCm this is the ordinal of the GPU within the system.
  int gpu_id() const {
    PlatformDeviceId platform_device_id;
    TF_CHECK_OK(
        GpuIdManager::TfToPlatformDeviceId(tf_device_id_, &platform_device_id));
    return platform_device_id.value();
  }

  // The executor that provides control for the device; e.g., for CUDA this
  // corresponds to the cuda context.
  se::StreamExecutor* executor() const { return executor_; }

  Allocator* GetScopedAllocator(AllocatorAttributes attr,
                                int64_t step_id) override;

  ScopedAllocatorMgr* GetScopedAllocatorMgr() const override {
    return scoped_allocator_mgr_.get();
  }

  // The following two functions always return 0 unless one of the
  // related experimental config options has been specified.

  // If the returned value is > 0, then GPU memory chunks freed before this
  // count are guaranteed not to be in use by any kernel pending on this device.
  uint64 SafeAllocFrontier(uint64 old_value) override;

  // Returns the number of kernels that have been queued for execution on
  // the compute stream and are not yet known to have completed.
  int PendingKernels();
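
  // Illustrative sketch (assumptions, not code from this header): an allocator
  // with a timestamping policy might consult the frontier before reusing a
  // freed chunk, e.g.
  //
  //   // `chunk_freed_at_count` is a hypothetical annotation recorded when the
  //   // chunk was freed; `last_frontier` is the previously observed value.
  //   uint64 frontier = device->SafeAllocFrontier(last_frontier);
  //   bool safe_to_reuse = chunk_freed_at_count < frontier;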

  int priority() const { return stream_->priority; }

  // Helper method for unit tests to reset the streams. Never use in production.
  static void TestOnlyReset();

  void* GetStream() {
    return stream_->compute->implementation()->GpuStreamMemberHack();
  }

 protected:
  Allocator* gpu_allocator_;  // not owned
  Allocator* cpu_allocator_;  // not owned

  se::StreamExecutor* executor_;  // not owned
  std::unique_ptr<ScopedAllocatorMgr> scoped_allocator_mgr_;

 private:
  friend class GPUDeviceTestHelper;
  struct StreamGroup {
    se::Stream* compute = nullptr;
#if TENSORFLOW_USE_ROCM
    se::Stream* nccl = nullptr;
#endif
    se::Stream* host_to_device = nullptr;
    se::Stream* device_to_host = nullptr;
    gtl::InlinedVector<se::Stream*, 4> device_to_device;
    int priority = 0;
  };
  class StreamGroupFactory;

  StreamGroup* stream_;
  mutex scratch_init_mutex_;
  char* scratch_ = nullptr;
  GPUDeviceContext* device_context_;
  GpuDeviceInfo* gpu_device_info_ = nullptr;
  mutex trace_mu_;
  TfDeviceId tf_device_id_;
  const bool sync_every_op_ = false;
  EventMgr* em_ = nullptr;
  std::unique_ptr<thread::ThreadPool> thread_pool_;
  std::unique_ptr<GPUKernelTracker> kernel_tracker_;
  int32 pending_cap_ = 0;
  bool timestamped_allocator_ = false;

  // Initialize scratch buffers used by Eigen.
  Status InitScratchBuffers();

  void ReinitializeDevice(OpKernelContext* context, PerOpGpuDevice* device,
                          int stream_id, Allocator* allocator);

  std::string ComputeOpKernelDebugString(const OpKernel& op_kernel,
                                         const int& stream_id);

  // If there is a failure to allocate memory, or if the tensor "from" is not
  // DMA-copyable, this method returns an error status in addition to calling
  // the "done" StatusCallback.  If there is no error prior to enqueueing the
  // copy, an OK status is returned.
  Status MaybeCopyTensorToGPU(const AllocatorAttributes& alloc_attrs,
                              const Tensor& from, Tensor* to,
                              StatusCallback done);

  Tensor CopyGpuTensorToHostDebugOnly(const Tensor& gpu_tensor);
  void LogInputs(OpKernel* op_kernel, OpKernelContext* context);
  void LogOutputs(OpKernel* op_kernel, OpKernelContext* context);
};

// A per-compute-stream utility that keeps track of kernels that have been
// queued for execution but may not yet have terminated, and also the queued
// time of the most recently terminated kernel.
class GPUKernelTracker {
 public:
  // Controls the strategy for inserting tracking events after GPU kernels.
  //   If max_interval >= 0, then insert an event after this many kernels
  //     if an event has not been inserted for another reason.
  //   If max_bytes > 0, then insert an event after kernels allocating this
  //     many bytes have been queued since the last event.
  //   If max_pending > 0, then track up to this many events at once.  If
  //     this limit is reached the GPU::Compute() method will delay starting
  //     additional ops until some event completes.  If 0 and one of the other
  //     fields is non-zero, then a reasonable default will be selected.
  struct Params {
    int max_interval = 0;
    int max_bytes = 0;
    int max_pending = 0;
    Params(int mi, int mb, int mp)
        : max_interval(mi), max_bytes(mb), max_pending(mp) {}
  };
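
  // Illustrative sketch (assumed values, not code from this header): the
  // device typically derives these limits from experimental GPUOptions
  // settings, e.g.
  //
  //   // Track an event at least every 8 kernels or every 16 MiB of queued
  //   // allocations, and stall Compute() once 64 tracking events are pending.
  //   GPUKernelTracker::Params params(/*mi=*/8, /*mb=*/16 << 20, /*mp=*/64);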

  // If we're going to share a SharedCounter with an allocator, it's owned
  // by the allocator because allocators are initialized once per process.
  // Devices are per-session.
  explicit GPUKernelTracker(const Params& params, Env* env,
                            se::Stream* compute_stream,
                            SharedCounter* timing_counter, Allocator* allocator,
                            EventMgr* event_manager)
      : params_(params),
        env_(env),
        stream_(compute_stream),
        timing_counter_(timing_counter),
        allocator_(allocator),
        em_(event_manager),
        pending_kernels_(
            params.max_pending > 0 ? std::max(8, 2 * params.max_pending) : 64) {
    mem_since_last_ = 0;
    if (!timing_counter_) {
      // There's no preexisting counter owned by GPUProcessState, i.e.
      // pending_cap > 0 but timestamped_allocator == false.
      owned_counter_.reset(new SharedCounter);
      timing_counter_ = owned_counter_.get();
    }
  }

  // Determine whether a GPU kernel should have a recording event queued
  // immediately afterwards.  If so, advance the counter and return the new
  // counter value after enqueuing.
  uint64 MaybeQueue(OpKernelContext* ctx);

  // Record that a GPU kernel has just been enqueued on the compute stream.
  // Inserts the supplied counter value in a new PendingKernel record appended
  // to the end of the ring buffer.  The caller is responsible for ensuring
  // that RecordTerminated() is eventually called with the same counter value.
  void RecordQueued(uint64 queued_count, int weight)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Takes a count value previously passed to RecordQueued and finds the
  // corresponding PendingKernel record in the ring buffer.  Marks the kernel
  // as completed and advances the completion frontier accordingly.
  void RecordTerminated(uint64 queued_count);
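
  // Illustrative sketch of how a device might drive the tracker around a
  // kernel launch (an assumed caller, not code from this header):
  //
  //   uint64 queued_count = tracker->MaybeQueue(ctx);
  //   // ... enqueue the kernel on the compute stream ...
  //   if (queued_count > 0) {
  //     // Arrange for RecordTerminated to run once the stream has passed the
  //     // tracking event, e.g. via an EventMgr callback.
  //     event_mgr->ThenExecute(compute_stream, [tracker, queued_count]() {
  //       tracker->RecordTerminated(queued_count);
  //     });
  //   }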

  // Returns the largest timing count such that all kernels queued no
  // later than that count are known to have terminated.
  inline uint64 LastTerminatedCount(uint64 old_value) {
    uint64 new_value = last_terminated_count_.load(std::memory_order_relaxed);
    if (new_value == old_value) {
      MaybeQueueProgressEvent();
    }
    return new_value;
  }

  // Returns the number of kernels enqueued that are not yet known to
  // have terminated.
  int NumPending() {
    mutex_lock l(mu_);
    return num_pending_;
  }

  // Yield current thread until number of pending kernels no longer
  // exceeds the cap.
  void PauseWhilePendingExceeds(int cap) TF_LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    while (num_pending_ > cap) {
      VLOG(1) << "num_pending_=" << num_pending_ << " cap=" << cap;
      pending_decreased_.wait(l);
    }
  }

 private:
  friend class GPUKernelTrackerTest;
  Params params_;
  Env* env_;
  se::Stream* stream_;
  SharedCounter* timing_counter_;
  std::unique_ptr<SharedCounter> owned_counter_;
  Allocator* allocator_ = nullptr;
  EventMgr* em_ = nullptr;
  std::atomic<uint64> last_terminated_count_ = {1};

  void MaybeQueueProgressEvent();

  // Records when a kernel was queued for execution.  Kernel launches are
  // identified by a unique count value from a per-GPU device timing counter.
  struct PendingKernel {
    uint64 queued_count;
    int weight;
    bool terminated;
    PendingKernel(const PendingKernel& pk)
        : queued_count(pk.queued_count),
          weight(pk.weight),
          terminated(pk.terminated) {}
    PendingKernel() : queued_count(0), weight(0), terminated(false) {}
  };
  mutex mu_;
  int32 mem_since_last_ TF_GUARDED_BY(mu_);
  int32 ops_since_last_ TF_GUARDED_BY(mu_);
  // Ring buffer of PendingKernel records.
  std::vector<PendingKernel> pending_kernels_ TF_GUARDED_BY(mu_);
  // Next unused slot in pending_kernels_.
  int first_available_ TF_GUARDED_BY(mu_) = 0;
  // Last completed PendingKernel such that all prior PendingKernels are
  // also completed.  With out-of-order completion there may be a mixture
  // of completed and uncompleted entries between last_completed_ and
  // first_available_.
  int last_completed_ TF_GUARDED_BY(mu_) = -1;
  // Sum of weights of the outstanding events marking tracked kernels.
  int num_pending_ TF_GUARDED_BY(mu_) = 0;
  condition_variable pending_decreased_ TF_GUARDED_BY(mu_);
};

class BaseGPUDeviceFactory : public DeviceFactory {
 public:
  Status ListPhysicalDevices(std::vector<string>* devices) override;
  Status CreateDevices(const SessionOptions& options,
                       const std::string& name_prefix,
                       std::vector<std::unique_ptr<Device>>* devices) override;
  Status GetDeviceDetails(int device_index,
                          std::unordered_map<string, string>* details) override;

  struct InterconnectMap {
    // Name of interconnect technology, if known.
    std::string name;
    // If possible, strength should approximate Gb/sec bandwidth rate.
    // Where architecture-specific subclassing is not done, that won't
    // always be possible.  The minimum expectation is that faster links
    // have a higher value than slower links.
    int32 strength;
    static const int kSameDeviceStrength;
    static const int kStreamExecutorStrength;
    std::set<std::pair<PlatformDeviceId, PlatformDeviceId>> directed_links;
  };

 protected:
  // Populates *maps with interconnect maps for all local direct access
  // pathways between GPUs.
  virtual Status GetInterconnectMaps(
      const std::vector<PlatformDeviceId>& visible_gpu_order,
      se::Platform* gpu_manager, std::vector<InterconnectMap>* maps);
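
  // Illustrative sketch of what an architecture-specific override might add
  // to *maps (assumed names and values, not code from this header):
  //
  //   InterconnectMap nvlink_map;
  //   nvlink_map.name = "NVLink";  // hypothetical interconnect name
  //   nvlink_map.strength = 40;    // rough Gb/sec estimate
  //   nvlink_map.directed_links.insert(
  //       {PlatformDeviceId(0), PlatformDeviceId(1)});
  //   nvlink_map.directed_links.insert(
  //       {PlatformDeviceId(1), PlatformDeviceId(0)});
  //   maps->push_back(nvlink_map);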

  struct TfDeviceIdHash {
    std::size_t operator()(const TfDeviceId& id) const noexcept {
      return std::hash<int>{}(id.value());
    }
  };
  typedef std::unordered_map<TfDeviceId, DeviceLocality, TfDeviceIdHash>
      LocalityMap;
  // Populates *localities with the DeviceLocality descriptor for
  // every TfDeviceId.
  virtual Status GetDeviceLocalities(
      int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
      LocalityMap* localities);
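
  // Illustrative sketch of filling one LocalityMap entry (assumed values, not
  // code from this header):
  //
  //   DeviceLocality locality;
  //   locality.set_bus_id(1);  // hypothetical bus id for this GPU
  //   (*localities)[TfDeviceId(0)] = locality;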

 private:
  // Creates a BaseGPUDevice associated with 'tf_device_id', allocates
  // (strictly) 'memory_limit' bytes of GPU memory to it, and adds it to the
  // 'devices' vector.
  Status CreateGPUDevice(const SessionOptions& options,
                         const std::string& name_prefix,
                         TfDeviceId tf_device_id, int64_t memory_limit,
                         const DeviceLocality& dev_locality, size_t num_tf_gpus,
                         std::vector<std::unique_ptr<Device>>* devices);

  virtual std::unique_ptr<BaseGPUDevice> CreateGPUDevice(
      const SessionOptions& options, const string& name, Bytes memory_limit,
      const DeviceLocality& dev_locality, TfDeviceId tf_device_id,
      const string& physical_device_desc, Allocator* gpu_allocator,
      Allocator* cpu_allocator) = 0;

  Status EnablePeerAccess(
      const std::vector<PlatformDeviceId>& visible_gpu_order);

  // Returns into 'ids' the list of valid platform GPU ids, in the order that
  // they should map to TF GPU ids "/device:GPU:0", "/device:GPU:1", etc.,
  // based upon 'visible_gpu_order', which is generated by parsing
  // GPUOptions::visible_device_list, a comma-separated list of CUDA or ROCm
  // GPU ids.
  Status GetValidDeviceIds(
      const std::vector<PlatformDeviceId>& visible_gpu_order,
      std::vector<PlatformDeviceId>* ids);
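
  // For example, if GPUOptions::visible_device_list is "2,0" and both devices
  // pass the validity checks, GetValidDeviceIds returns {2, 0}: platform GPU 2
  // becomes "/device:GPU:0" and platform GPU 0 becomes "/device:GPU:1".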

  // Cache the valid device IDs if not already cached. Cached IDs are stored in
  // field cached_device_ids_. Passes {0, 1, ..., num_devices-1} to
  // GetValidDeviceIds, so this should only be used in functions where all
  // devices should be treated as visible, like ListPhysicalDevices.
  Status CacheDeviceIds();

  // visible_gpu_initialized_[platform_device_id] is true if visible GPU
  // platform_device_id has been initialized by the process.
  std::unordered_map<int, bool> visible_gpu_initialized_;

  // Cached device IDs, as returned by GetValidDeviceIds when every physical
  // device is visible. Cache should not be used if some devices are not
  // visible.
  std::vector<PlatformDeviceId> cached_device_ids_;
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_