/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if !GOOGLE_CUDA && !TENSORFLOW_USE_ROCM
#error This file must only be included when building with Cuda or ROCm support
#endif

#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/common_runtime/device/device_id_utils.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#include "tensorflow/core/common_runtime/local_device.h"
#include "tensorflow/core/common_runtime/node_file_writer.h"
#include "tensorflow/core/common_runtime/scoped_allocator_mgr.h"
#include "tensorflow/core/common_runtime/shared_counter.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session_options.h"

namespace Eigen {
class StreamInterface;
}

namespace tensorflow {
class GPUKernelTracker;

class ConcretePerOpGpuDevice : public PerOpGpuDevice {
 public:
  ConcretePerOpGpuDevice();

  void Reinitialize(OpKernelContext* context, const void* gpu_stream,
                    TfDeviceId tf_device_id, Allocator* base_allocator,
                    char* scratch);

  void Reinitialize(OpKernelContext* context, const void* gpu_stream,
                    PlatformDeviceId platform_device_id,
                    Allocator* base_allocator, char* scratch);

  const Eigen::GpuDevice& device() const override;

 private:
  std::unique_ptr<::Eigen::StreamInterface> stream_device_;
};

class BaseGPUDevice : public LocalDevice {
 public:
  BaseGPUDevice(const SessionOptions& options, const std::string& name,
                Bytes memory_limit, const DeviceLocality& locality,
                TfDeviceId tf_device_id,
                const std::string& physical_device_desc,
                Allocator* gpu_allocator, Allocator* cpu_allocator,
                bool sync_every_op);

  ~BaseGPUDevice() override;

  // Initialize the device and return the status of initialization.
  Status Init(const SessionOptions& options);

  void Compute(OpKernel* op_kernel, OpKernelContext* context) override;

  Status Sync() override;

  void ComputeAsync(AsyncOpKernel* op_kernel, OpKernelContext* context,
                    AsyncOpKernel::DoneCallback done) override;

  Status MakeTensorFromProto(const TensorProto& tensor_proto,
                             const AllocatorAttributes alloc_attrs,
                             Tensor* tensor) override;

  void CopyTensorInSameDevice(const Tensor* input_tensor, Tensor* output_tensor,
                              const DeviceContext* device_context,
                              StatusCallback done) override;

  // The caller owns the returned device.
  PerOpGpuDevice* MakeGpuDevice() override;

  Status ReinitializeGpuDevice(OpKernelContext* context, PerOpGpuDevice* device,
                               DeviceContext* dc,
                               Allocator* allocator) override;
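
  // Informal usage sketch (illustrative only; `gpu_device`, `ctx`, and `dc`
  // are assumed to exist in the caller): the per-op device is created once
  // and reinitialized before each kernel invocation.
  //
  //   std::unique_ptr<PerOpGpuDevice> per_op(gpu_device->MakeGpuDevice());
  //   TF_RETURN_IF_ERROR(gpu_device->ReinitializeGpuDevice(
  //       ctx, per_op.get(), dc,
  //       gpu_device->GetAllocator(AllocatorAttributes())));
  //   const Eigen::GpuDevice& eigen_gpu = per_op->device();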

  // Returns the platform GPU id of this device within the native driver
  // system; e.g., for CUDA and ROCm this is the ordinal of the GPU within the
  // system.
  int gpu_id() const {
    PlatformDeviceId platform_device_id;
    TF_CHECK_OK(
        GpuIdManager::TfToPlatformDeviceId(tf_device_id_, &platform_device_id));
    return platform_device_id.value();
  }
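  // Example (informal): with GPUOptions::visible_device_list = "1", the
  // single visible platform device becomes "/device:GPU:0"; gpu_id() then
  // returns 1 while the TF device id is 0.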

  // The executor that provides control for the device; e.g., for CUDA this
  // corresponds to the cuda context.
  se::StreamExecutor* executor() const { return executor_; }

  Allocator* GetScopedAllocator(AllocatorAttributes attr,
                                int64_t step_id) override;

  ScopedAllocatorMgr* GetScopedAllocatorMgr() const override {
    return scoped_allocator_mgr_.get();
  }

  // The following two functions always return 0 unless one of the
  // related experimental config options has been specified.

  // If returned value is > 0 then GPU Memory chunks freed before this count
  // are guaranteed not to be in use by any kernel pending on this device.
  uint64 SafeAllocFrontier(uint64 old_value) override;
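  //
  // Illustrative note, not part of this API: a timestamped GPU allocator can
  // poll the frontier to decide when a freed chunk is safe to reuse, e.g.
  //
  //   uint64 frontier = device->SafeAllocFrontier(prev_frontier);
  //   // Chunks freed before `frontier` are no longer in use by any kernel
  //   // still pending on this device, so they may be recycled.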

  // Returns the number of kernels that have been queued for execution on
  // the compute stream and are not yet known to have completed.
  int PendingKernels();

  int priority() const { return stream_->priority; }

  // Helper method for unit tests to reset the streams. Never use in production.
  static void TestOnlyReset();

  void* GetStream() {
    return stream_->compute->implementation()->GpuStreamMemberHack();
  }

 protected:
  Allocator* gpu_allocator_;  // not owned
  Allocator* cpu_allocator_;  // not owned

  se::StreamExecutor* executor_;  // not owned
  std::unique_ptr<ScopedAllocatorMgr> scoped_allocator_mgr_;

 private:
  friend class GPUDeviceTestHelper;
  struct StreamGroup {
    se::Stream* compute = nullptr;
#if TENSORFLOW_USE_ROCM
    se::Stream* nccl = nullptr;
#endif
    se::Stream* host_to_device = nullptr;
    se::Stream* device_to_host = nullptr;
    gtl::InlinedVector<se::Stream*, 4> device_to_device;
    int priority = 0;
  };
  class StreamGroupFactory;

  StreamGroup* stream_;
  mutex scratch_init_mutex_;
  char* scratch_ = nullptr;
  GPUDeviceContext* device_context_;
  DeviceBase::AcceleratorDeviceInfo* accelerator_device_info_ = nullptr;
  mutex trace_mu_;
  TfDeviceId tf_device_id_;
  const bool sync_every_op_ = false;
  EventMgr* em_ = nullptr;
  std::unique_ptr<thread::ThreadPool> thread_pool_;
  std::unique_ptr<GPUKernelTracker> kernel_tracker_;
  int32 pending_cap_ = 0;
  bool timestamped_allocator_ = false;
  NodeFileWriter* node_file_writer_ = nullptr;  // not owned

  // Initialize scratch buffers used by Eigen.
  Status InitScratchBuffers();

  void ReinitializeDevice(OpKernelContext* context, PerOpGpuDevice* device,
                          int stream_id, Allocator* allocator);

  std::string ComputeOpKernelDebugString(const OpKernel& op_kernel,
                                         const int& stream_id);

  // This method returns an initialization status, in addition to
  // calling the "done" StatusCallback, if there is a failure to
  // allocate memory or if the tensor "from" is not DMA-copyable.
  // If there is no error prior to enqueueing the copy, an OK status
  // is returned.
  Status MaybeCopyTensorToGPU(const AllocatorAttributes& alloc_attrs,
                              const Tensor& from, Tensor* to,
                              StatusCallback done);

  Tensor CopyGpuTensorToHostDebugOnly(const Tensor& gpu_tensor);
  void LogInputs(OpKernel* op_kernel, OpKernelContext* context);
  void LogOutputs(OpKernel* op_kernel, OpKernelContext* context);
};

// A per-compute-stream utility that keeps track of kernels that have been
// queued for execution but may not yet have terminated and also the queued
// time of the most recently terminated kernel.
class GPUKernelTracker {
 public:
  // Controls the strategy for inserting tracking events after GPU kernels.
  //   If max_interval >= 0, then insert an event after this many kernels
  //     if an event has not been inserted for another reason.
  //   If max_bytes > 0, then insert an event after kernels allocating this
  //     many bytes have been queued since the last event.
  //   If max_pending > 0, then track up to this many events at once.  If
  //     this limit is reached the GPU::Compute() method will delay starting
  //     additional ops until some event completes.  If 0 and one of the other
  //     fields is non-zero, then a reasonable default will be selected.
  struct Params {
    int max_interval = 0;
    int max_bytes = 0;
    int max_pending = 0;
    Params(int mi, int mb, int mp)
        : max_interval(mi), max_bytes(mb), max_pending(mp) {}
  };
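
  // For instance (illustrative values, not defaults): Params(8, 0, 32)
  // requests a tracking event after every 8 queued kernels, disables the
  // bytes-based trigger, and caps outstanding tracking events at 32, beyond
  // which Compute() will stall until some event completes.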

  // If we're going to share a SharedCounter with an allocator, it's owned
  // by the allocator because allocators are initialized once per process.
  // Devices are per-session.
  explicit GPUKernelTracker(const Params& params, Env* env,
                            se::Stream* compute_stream,
                            SharedCounter* timing_counter, Allocator* allocator,
                            EventMgr* event_manager)
      : params_(params),
        env_(env),
        stream_(compute_stream),
        timing_counter_(timing_counter),
        allocator_(allocator),
        em_(event_manager),
        pending_kernels_(
            params.max_pending > 0 ? std::max(8, 2 * params.max_pending) : 64) {
    mem_since_last_ = 0;
    if (!timing_counter_) {
      // There's no preexisting counter owned by GPUProcessState, i.e.
      // pending_cap > 0 but timestamped_allocator == false.
      owned_counter_.reset(new SharedCounter);
      timing_counter_ = owned_counter_.get();
    }
  }

  // Determine whether a GPU kernel should have a recording event queued
  // immediately afterwards.  If so, advance the counter and return the new
  // counter value after enqueuing.
  uint64 MaybeQueue(OpKernelContext* ctx);

  // Record that a GPU kernel has just been enqueued on the compute stream.
  // Inserts the supplied counter value in a new PendingKernel record appended
  // to the end of the ring buffer then returns that same count.
  // Caller is responsible for ensuring that RecordTerminated() is eventually
  // called with the same counter value.
  void RecordQueued(uint64 queued_count, int weight)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Takes a count value returned by RecordQueued and finds the corresponding
  // PendingKernel record in the ring buffer.  Marks the kernel as completed and
  // advances the completion frontier accordingly.
  void RecordTerminated(uint64 queued_count);
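
  // Rough usage sketch (illustrative; the real call site lives in
  // BaseGPUDevice::Compute, which is not part of this header):
  //
  //   uint64 queued_count = tracker->MaybeQueue(op_context);
  //   if (queued_count > 0) {
  //     // Arrange for RecordTerminated() to run once the tracking event
  //     // queued behind the kernel has completed.
  //     event_mgr->ThenExecute(compute_stream, [tracker, queued_count]() {
  //       tracker->RecordTerminated(queued_count);
  //     });
  //   }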

  // Returns the largest timing count such that all kernels queued no
  // later than that count are known to have terminated.
  inline uint64 LastTerminatedCount(uint64 old_value) {
    uint64 new_value = last_terminated_count_.load(std::memory_order_relaxed);
    if (new_value == old_value) {
      MaybeQueueProgressEvent();
    }
    return new_value;
  }

  // Returns the number of kernels enqueued that are not yet known to
  // have terminated.
  int NumPending() {
    mutex_lock l(mu_);
    return num_pending_;
  }

  // Yield current thread until number of pending kernels no longer
  // exceeds the cap.
  void PauseWhilePendingExceeds(int cap) TF_LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    while (num_pending_ > cap) {
      VLOG(1) << "num_pending_=" << num_pending_ << " cap=" << cap;
      pending_decreased_.wait(l);
    }
  }

 private:
  friend class GPUKernelTrackerTest;
  Params params_;
  Env* env_;
  se::Stream* stream_;
  SharedCounter* timing_counter_;
  std::unique_ptr<SharedCounter> owned_counter_;
  Allocator* allocator_ = nullptr;
  EventMgr* em_ = nullptr;
  std::atomic<uint64> last_terminated_count_ = {1};

  void MaybeQueueProgressEvent();

  // Records when a kernel was queued for execution.  Kernel launches are
  // identified by a unique count value from a per-GPU device timing counter.
  struct PendingKernel {
    uint64 queued_count;
    int weight;
    bool terminated;
    PendingKernel(const PendingKernel& pk)
        : queued_count(pk.queued_count),
          weight(pk.weight),
          terminated(pk.terminated) {}
    PendingKernel() : queued_count(0), weight(0), terminated(false) {}
  };
  mutex mu_;
  int32 mem_since_last_ TF_GUARDED_BY(mu_);
  int32 ops_since_last_ TF_GUARDED_BY(mu_);
  // Ring buffer of PendingKernel records.
  std::vector<PendingKernel> pending_kernels_ TF_GUARDED_BY(mu_);
  // Next unused slot in pending_kernels_.
  int first_available_ TF_GUARDED_BY(mu_) = 0;
  // Last completed PendingKernel such that all prior PendingKernels are
  // also completed.  With out-of-order completion there may be a mixture
  // of completed and uncompleted entries between last_completed_ and
  // first_available_.
  int last_completed_ TF_GUARDED_BY(mu_) = -1;
  // Sum of weights of the outstanding events marking tracked kernels.
  int num_pending_ TF_GUARDED_BY(mu_) = 0;
  condition_variable pending_decreased_ TF_GUARDED_BY(mu_);
};

class BaseGPUDeviceFactory : public DeviceFactory {
 public:
  Status ListPhysicalDevices(std::vector<string>* devices) override;
  Status CreateDevices(const SessionOptions& options,
                       const std::string& name_prefix,
                       std::vector<std::unique_ptr<Device>>* devices) override;
  Status GetDeviceDetails(int device_index,
                          std::unordered_map<string, string>* details) override;

  struct InterconnectMap {
    // Name of interconnect technology, if known.
    std::string name;
    // If possible, strength should approximate Gb/sec bandwidth rate.
    // Where architecture-specific subclassing is not done, that won't
    // always be possible.  The minimum expectation is that
    // faster links should have a higher value than slower links.
    int32 strength;
    static const int kSameDeviceStrength;
    static const int kStreamExecutorStrength;
    std::set<std::pair<PlatformDeviceId, PlatformDeviceId>> directed_links;
  };
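
  // For example (hypothetical values): a direct NVLink connection between
  // platform devices 0 and 1 might be recorded with name = "NVLink", a
  // strength higher than an ordinary PCIe path, and directed_links holding
  // both (0, 1) and (1, 0).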

 protected:
  // Populates *maps with interconnect maps for all local direct access
  // pathways between GPUs.
  virtual Status GetInterconnectMaps(
      const std::vector<PlatformDeviceId>& visible_gpu_order,
      se::Platform* gpu_manager, std::vector<InterconnectMap>* maps);

  struct TfDeviceIdHash {
    std::size_t operator()(const TfDeviceId& id) const noexcept {
      return std::hash<int>{}(id.value());
    }
  };
  typedef std::unordered_map<TfDeviceId, DeviceLocality, TfDeviceIdHash>
      LocalityMap;
  // Populates *localities with the DeviceLocality descriptor for
  // every TfDeviceId.
  virtual Status GetDeviceLocalities(
      int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
      LocalityMap* localities);

 private:
  // Creates a BaseGPUDevice associated with 'tf_device_id', allocates
  // (strictly) 'memory_limit' bytes of GPU memory to it, and adds it to the
  // 'devices' vector.
  Status CreateGPUDevice(const SessionOptions& options,
                         const std::string& name_prefix,
                         TfDeviceId tf_device_id, int64_t memory_limit,
                         const DeviceLocality& dev_locality, size_t num_tf_gpus,
                         std::vector<std::unique_ptr<Device>>* devices);

  virtual std::unique_ptr<BaseGPUDevice> CreateGPUDevice(
      const SessionOptions& options, const string& name, Bytes memory_limit,
      const DeviceLocality& dev_locality, TfDeviceId tf_device_id,
      const string& physical_device_desc, Allocator* gpu_allocator,
      Allocator* cpu_allocator) = 0;

  Status EnablePeerAccess(
      const std::vector<PlatformDeviceId>& visible_gpu_order);

  // Returns into 'ids' the list of valid platform GPU ids, in the order that
  // they should map to TF GPU ids "/device:GPU:0", "/device:GPU:1", etc,
  // based upon 'visible_gpu_order' which was generated by parsing
  // GPUOptions::visible_device_list which is a comma-separated list of CUDA or
  // ROCm GPU ids.
  Status GetValidDeviceIds(
      const std::vector<PlatformDeviceId>& visible_gpu_order,
      std::vector<PlatformDeviceId>* ids);
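  //
  // For example, with GPUOptions::visible_device_list = "2,0" the visible
  // order is {2, 0}, so "/device:GPU:0" maps to platform device 2 and
  // "/device:GPU:1" maps to platform device 0 (assuming both ids are valid).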

  // Cache the valid device IDs if not already cached. Cached IDs are stored in
  // field cached_device_ids_. Passes {0, 1, ..., num_devices-1} to
  // GetValidDeviceIds, so this should only be used in functions where all
  // devices should be treated as visible, like ListPhysicalDevices.
  Status CacheDeviceIds();

  // visible_gpu_initialized_[platform_device_id] is true if visible GPU
  // platform_device_id has been initialized by the process.
  std::unordered_map<int, bool> visible_gpu_initialized_;

  // Cached device IDs, as returned by GetValidDeviceIds when every physical
  // device is visible. Cache should not be used if some devices are not
  // visible.
  std::vector<PlatformDeviceId> cached_device_ids_;
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_