• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_
17 #define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_
18 
19 #include <deque>
20 #include <vector>
21 #include "tensorflow/core/framework/log_memory.h"
22 #include "tensorflow/core/framework/tensor.h"
23 #include "tensorflow/core/framework/tensor_reference.h"
24 #include "tensorflow/core/lib/core/notification.h"
25 #include "tensorflow/core/lib/core/threadpool.h"
26 #include "tensorflow/core/lib/gtl/inlined_vector.h"
27 #include "tensorflow/core/platform/mutex.h"
28 #include "tensorflow/core/platform/stream_executor.h"
29 #include "tensorflow/core/platform/thread_annotations.h"
30 #include "tensorflow/core/platform/types.h"
31 
32 namespace stream_executor {
33 class Event;
34 class Stream;
35 class StreamExecutor;
36 }  // namespace stream_executor
37 
38 namespace tensorflow {
39 
40 class GPUOptions;
41 
42 // The callback provided to EventMgr::ThenExecute must not block or take a long
43 // time.  If it does, performance may be impacted and GPU memory may be
44 // exhausted.  This macro is for checking that an EventMgr thread is not
45 // accidentally entering blocking parts of the code, e.g. the RPC subsystem.
46 //
47 // Intended use is something like
48 //
49 //   void RespondToAnRPC(Params* params) {
50 //      WARN_IF_IN_EVENT_MGR_THREAD;
51 //      if (params->status.ok()) { ...
52 //
namespace gpu_event_mgr {
// Logs a stack trace if the current execution thread belongs to an EventMgr
// object.  If f is not nullptr, f is executed instead of logging the stack
// trace.
void WarnIfInCallback(std::function<void()> f);
}  // namespace gpu_event_mgr
// Fully qualified so that the macro resolves regardless of the namespace it
// is expanded in.
#define WARN_IF_IN_EVENT_MGR_THREAD \
  ::tensorflow::gpu_event_mgr::WarnIfInCallback(nullptr)
60 
61 // An object to keep track of pending Events in the StreamExecutor streams
62 // and associated Tensors that cannot safely be deleted until the associated
63 // Events are recorded.
64 class EventMgr {
65  public:
66   EventMgr(se::StreamExecutor* se, const GPUOptions& gpu_options);
67 
68   ~EventMgr();
69 
70   // Releases the references on the elements of "tensors" as soon as
71   // all events currently enqueued on "stream" have completed.
72   void ThenDeleteTensors(se::Stream* stream,
73                          const TensorReferenceVector& tensors);
74 
75   struct BufRec {
76     Allocator* alloc;
77     void* buf;
78     // operation and step_id are only populated when
79     // LogMemory::IsEnabled() is true.
80     string operation;
81     int64 step_id;
82   };
83 
84   // Takes ownership of *bufrec.buf and calls bufrec.alloc->DeallocateRaw()
85   // on it as soon as all events currently enqueued on *stream have completed.
ThenDeleteBuffer(se::Stream * stream,BufRec bufrec)86   inline void ThenDeleteBuffer(se::Stream* stream, BufRec bufrec) {
87     ToFreeVector to_free;
88     {
89       mutex_lock l(mu_);
90       QueueBuffer(stream, bufrec);
91       PollEvents(false, &to_free);
92     }
93     FreeMemory(to_free);
94   }
95 
96   // Execute func when all pending stream actions have completed.
97   // func must be brief and non-blocking since it executes in the one
98   // thread used for all such callbacks and also buffer deletions.
ThenExecute(se::Stream * stream,std::function<void ()> func)99   inline void ThenExecute(se::Stream* stream, std::function<void()> func) {
100     ToFreeVector to_free;
101     {
102       mutex_lock l(mu_);
103       QueueFunc(stream, std::move(func));
104       PollEvents(false, &to_free);
105     }
106     FreeMemory(to_free);
107   }
108 
109  private:
110   friend class TEST_EventMgrHelper;
111   se::StreamExecutor* const exec_;
112   const int64 deferred_bytes_threshold_;
113   const int32 polling_active_delay_usecs_;
114   mutex mu_;
115   condition_variable events_pending_ GUARDED_BY(mu_);
116 
117   void FlushAccumulatedTensors() EXCLUSIVE_LOCKS_REQUIRED(mu_);
118 
119   struct InUse {
120     se::Event* event;
121     TensorReferenceVector* mem;
122     BufRec bufrec;
123     std::function<void()> func;
124   };
125 
126   typedef gtl::InlinedVector<InUse, 4> ToFreeVector;
127 
FreeMemory(const ToFreeVector & to_free)128   void FreeMemory(const ToFreeVector& to_free) {
129     for (const auto& iu : to_free) {
130       if (iu.mem != nullptr) {
131         for (auto& t : *(iu.mem)) {
132           t.Unref();
133         }
134         delete iu.mem;
135       }
136       if (iu.bufrec.buf) {
137         if (LogMemory::IsEnabled()) {
138           LogMemory::RecordRawDeallocation(iu.bufrec.operation,
139                                            iu.bufrec.step_id, iu.bufrec.buf,
140                                            iu.bufrec.alloc, false);
141         }
142         iu.bufrec.alloc->DeallocateRaw(iu.bufrec.buf);
143       }
144       // The function must be called in another thread.
145       if (iu.func != nullptr) threadpool_.Schedule(iu.func);
146     }
147   }
148 
149   // Stream-enqueue an unused Event and save with it a collection of
150   // Tensors and/or a BufRec to be deleted only after the Event
151   // records.
152   void QueueInUse(se::Stream* stream, InUse in_use)
153       EXCLUSIVE_LOCKS_REQUIRED(mu_);
154 
QueueTensors(se::Stream * stream,TensorReferenceVector * tensors)155   void QueueTensors(se::Stream* stream, TensorReferenceVector* tensors)
156       EXCLUSIVE_LOCKS_REQUIRED(mu_) {
157     QueueInUse(stream, {nullptr, tensors, BufRec(), nullptr});
158   }
159 
QueueBuffer(se::Stream * stream,BufRec bufrec)160   void QueueBuffer(se::Stream* stream, BufRec bufrec)
161       EXCLUSIVE_LOCKS_REQUIRED(mu_) {
162     QueueInUse(stream, {nullptr, nullptr, bufrec, nullptr});
163   }
164 
QueueFunc(se::Stream * stream,std::function<void ()> func)165   void QueueFunc(se::Stream* stream, std::function<void()> func)
166       EXCLUSIVE_LOCKS_REQUIRED(mu_) {
167     QueueInUse(stream, {nullptr, nullptr, BufRec(), std::move(func)});
168   }
169 
170   // This function should be called at roughly the same tempo as
171   // QueueTensors() to check whether pending events have recorded,
172   // and then retire them.  It appends InUse elements that need cleanup
173   // to "*to_free".  The caller should call FreeMemory(to_free)
174   // when this returns.
175   void PollEvents(bool is_dedicated_poller, ToFreeVector* to_free)
176       EXCLUSIVE_LOCKS_REQUIRED(mu_);
177 
178   // An internal polling loop that runs at a low frequency to clear
179   // straggler Events.
180   void PollLoop();
181 
182   // Setup/Teardown functions for the polling loop.
183   void StartPollingLoop();
184   void StopPollingLoop();
185 
186   // A stack of unused events
187   std::vector<se::Event*> free_events_ GUARDED_BY(mu_);
188 
189   // Buffered list of tensors waiting to have an event queued for deletion
190   se::Stream* accumulated_stream_ GUARDED_BY(mu_);
191   TensorReferenceVector* accumulated_tensors_ GUARDED_BY(mu_);
192   // Sum of the TotalBytes() of the tensors in "accumulated_tensors_"
193   int64 accumulated_tensor_bytes_ GUARDED_BY(mu_);
194 
195   // A FIFO queue of InUse events and associated tensors.
196   std::deque<InUse> used_events_ GUARDED_BY(mu_);
197 
198   bool stop_polling_ GUARDED_BY(mu_);
199   std::unique_ptr<Notification> polling_stopped_;
200 
201   // The main PollLoop for the event manager runs in this threadpool.
202   thread::ThreadPool threadpool_;
203 };
204 
205 }  // namespace tensorflow
206 #endif  // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_
207