/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"

#include "tensorflow/core/platform/stacktrace.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/protobuf/config.pb.h"

namespace tensorflow {

namespace {
// The EventMgr has one thread for the polling loop and one to execute
// event callback functions. Issues for reconsideration:
//  - Is this the right number of threads?
//  - Should EventMgrs be shared between GPUDevices on a multi-GPU machine?
static const int kNumThreads = 2;
}  // namespace

namespace gpu_event_mgr {
class ThreadLabel {
 public:
  static const char* GetValue() { return value_; }

  // v must point to a string with static storage duration, because value_
  // stores and uses the pointer until it is reset or the thread terminates.
  static void SetValue(const char* v) { value_ = v; }

 private:
  static thread_local const char* value_;
};
thread_local const char* ThreadLabel::value_ = "";

void WarnIfInCallback(std::function<void()> f) {
  const char* label = ThreadLabel::GetValue();
  if (label && !strcmp(label, "gpu_event_mgr")) {
    if (f) {
      f();
    } else {
      LOG(WARNING) << "Executing inside EventMgr callback thread: "
                   << CurrentStackTrace();
    }
  }
}
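
// Hypothetical call-site sketch (illustrative only, not code from this file):
// a routine that may block on GPU work can guard itself with
//
//   gpu_event_mgr::WarnIfInCallback([] {
//     LOG(WARNING) << "Blocking call issued from an EventMgr callback thread.";
//   });
//
// Passing a null std::function instead logs the default warning with a stack
// trace, as implemented above.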
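// Labels every thread in *threadpool with the "gpu_event_mgr" tag used by
// WarnIfInCallback(). Each scheduled closure blocks until all num_threads
// closures are running, so no thread can execute more than one of them and
// every thread gets labeled; a second rendezvous keeps this function from
// returning before all of the closures have finished.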
void InitThreadpoolLabels(thread::ThreadPool* threadpool) {
  static const char* label = "gpu_event_mgr";
  mutex mu;
  int init_count = 0;
  condition_variable all_initialized;
  int exit_count = 0;
  condition_variable ready_to_exit;
  const int num_threads = threadpool->NumThreads();
  for (int i = 0; i < num_threads; ++i) {
    threadpool->Schedule([num_threads, &mu, &init_count, &all_initialized,
                          &exit_count, &ready_to_exit]() {
      gpu_event_mgr::ThreadLabel::SetValue(label);
      mutex_lock l(mu);
      ++init_count;
      if (init_count == num_threads) {
        all_initialized.notify_all();
      }
      while (init_count < num_threads) {
        all_initialized.wait(l);
      }
      if (++exit_count == num_threads) {
        ready_to_exit.notify_all();
      }
    });
  }
  {
    mutex_lock l(mu);
    while (exit_count < num_threads) {
      ready_to_exit.wait(l);
    }
  }
}
}  // namespace gpu_event_mgr

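// GPUOptions fields left at zero fall back to defaults here: an 8 MiB
// (8 * 1048576 bytes) deferred-deletion threshold and a 10 microsecond delay
// between polling passes while events are still pending.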
EventMgr::EventMgr(se::StreamExecutor* se, const GPUOptions& gpu_options)
    : exec_(se),
      deferred_bytes_threshold_(gpu_options.deferred_deletion_bytes()
                                    ? gpu_options.deferred_deletion_bytes()
                                    : 8 * 1048576),
      polling_active_delay_usecs_(gpu_options.polling_active_delay_usecs()
                                      ? gpu_options.polling_active_delay_usecs()
                                      : 10),
      accumulated_stream_(nullptr),
      accumulated_tensors_(new TensorReferenceVector),
      accumulated_tensor_bytes_(0),
      threadpool_(Env::Default(), "GPU_Event_Manager", kNumThreads) {
  gpu_event_mgr::InitThreadpoolLabels(&threadpool_);
  StartPollingLoop();
}

EventMgr::~EventMgr() {
  StopPollingLoop();

  // Events are owned by this object.
  for (auto& e : free_events_) {
    delete e;
  }
  for (auto& t : *(accumulated_tensors_)) {
    t.Unref();
  }
  delete accumulated_tensors_;
  while (!used_events_.empty()) {
    InUse* ue = &used_events_[0];
    delete ue->event;
    if (ue->mem != nullptr) {
      for (auto& t : *(ue->mem)) {
        t.Unref();
      }
      delete ue->mem;
    }
    if (ue->bufrec.buf) {
      if (LogMemory::IsEnabled()) {
        LogMemory::RecordRawDeallocation(ue->bufrec.operation,
                                         ue->bufrec.step_id, ue->bufrec.buf,
                                         ue->bufrec.alloc, false);
      }
      ue->bufrec.alloc->DeallocateRaw(ue->bufrec.buf);
    }
    if (ue->func != nullptr) threadpool_.Schedule(ue->func);
    used_events_.pop_front();
  }
}

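// Starts the dedicated polling thread: clears stop_polling_, creates the
// polling_stopped_ notification that PollLoop() signals on exit, and
// schedules PollLoop() on the event-manager threadpool.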
void EventMgr::StartPollingLoop() {
  CHECK(polling_stopped_ == nullptr);
  {
    mutex_lock l(mu_);
    stop_polling_ = false;
  }
  polling_stopped_.reset(new Notification);
  threadpool_.Schedule([this]() { PollLoop(); });
}

void EventMgr::StopPollingLoop() {
  if (polling_stopped_) {
    {
      mutex_lock l(mu_);
      stop_polling_ = true;
      events_pending_.notify_all();
    }
    polling_stopped_->WaitForNotification();
    polling_stopped_.reset(nullptr);
  }
}

void EventMgr::ThenDeleteTensors(se::Stream* stream,
                                 const TensorReferenceVector& tensors) {
  mutex_lock l(mu_);
  // TODO(jeff): We currently keep one accumulated_tensors_ object.
  // If we start to use multiple streams heavily, we might want to keep
  // separate vectors/byte counters per stream.
  if (!accumulated_tensors_->empty() && stream != accumulated_stream_) {
    FlushAccumulatedTensors();
  }
  accumulated_stream_ = stream;
  for (const auto& t : tensors) {
    // accumulated_tensors_ takes over ownership of the reference to "t".
    accumulated_tensors_->push_back(t);
    accumulated_tensor_bytes_ += t.TotalBytes();
  }
  if (accumulated_tensor_bytes_ >= deferred_bytes_threshold_) {
    FlushAccumulatedTensors();
  }
}
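
// Hypothetical usage sketch (`event_mgr`, `device_stream`, and `input` are
// illustrative names, not defined in this file):
//
//   TensorReferenceVector refs;
//   refs.push_back(TensorReference(input));
//   event_mgr->ThenDeleteTensors(device_stream, refs);
//
// The references accumulate until the deferred-bytes threshold is crossed or
// a different stream is passed in, at which point they are queued behind an
// event on the stream and unref'd only after that event completes.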

void EventMgr::FlushAccumulatedTensors() {
  DCHECK(!accumulated_tensors_->empty());
  DCHECK(accumulated_stream_ != nullptr);
  QueueTensors(accumulated_stream_, accumulated_tensors_);
  accumulated_tensors_ = new TensorReferenceVector;
  accumulated_tensor_bytes_ = 0;
  accumulated_stream_ = nullptr;
}

// A polling loop to detect completion of GPU events.
//
// While one or more events are outstanding, poll for completed events. When
// no events are outstanding, we sleep until one is enqueued.
void EventMgr::PollLoop() {
  ToFreeVector to_free;
  while (true) {
    bool events_still_pending;
    {
      mutex_lock l(mu_);
      if (stop_polling_) {
        break;
      }
      if (used_events_.empty()) {
        events_pending_.wait(l);
      }
      PollEvents(true, &to_free);
      events_still_pending = !used_events_.empty();
    }
    FreeMemory(to_free);
    to_free.clear();

    if (events_still_pending) {
      Env::Default()->SleepForMicroseconds(polling_active_delay_usecs_);
    }
  }
  polling_stopped_->Notify();
}

void EventMgr::QueueInUse(se::Stream* stream, InUse iu) {
  VLOG(2) << "QueueInUse free_events_ " << free_events_.size()
          << " used_events_ " << used_events_.size();
  // Events are created on demand, and repeatedly reused. There is no
  // limit placed here on the number of allocated Events.
  if (free_events_.empty()) {
    free_events_.push_back(new se::Event(exec_));
    free_events_.back()->Init();
  }
  se::Event* e = free_events_.back();
  free_events_.pop_back();
  stream->ThenRecordEvent(e);
  iu.event = e;
  bool was_empty = used_events_.empty();
  used_events_.push_back(iu);
  // Maybe wake up the polling thread.
  if (was_empty) events_pending_.notify_all();
}

// This function must be called periodically to check whether pending
// events have completed, and then retire them. Initial observations
// suggest that typical behavior in a TensorFlow program is to have
// 0-3 events pending most of the time, but there are occasionally
// spikes of up to several hundred outstanding. (If GPUKernelTracker
// is used to cap pending kernels there should never be more than
// that many.)
//
// NOTE: If all events are on the same stream, no later event will
// complete before an earlier event, except possibly if the earlier
// event transitions to an error state, so there's no advantage in
// looking past the first kPending event. However, if we're using
// multiple streams there may be some gain in looking deeper.
// As a compromise, PollEvents() calls that are triggered by the queueing
// of a single event never look past the first kPending event. Consequently
// those calls do an expected constant amount of work, unaffected by the
// length of the pending queue. Calls coming from the dedicated
// polling thread always sweep the full queue.
void EventMgr::PollEvents(bool is_dedicated_poller,
                          gtl::InlinedVector<InUse, 4>* to_free) {
  VLOG(2) << "PollEvents free_events_ " << free_events_.size()
          << " used_events_ " << used_events_.size();
  // Sweep the remaining events in order. If this is the dedicated
  // polling thread, check the entire set. Otherwise, just sweep up to
  // the first non-complete record that is still pending.
  for (auto& iu : used_events_) {
    if (iu.event == nullptr) continue;
    se::Event::Status s = iu.event->PollForStatus();
    switch (s) {
      case se::Event::Status::kUnknown:
      case se::Event::Status::kError:
        // We don't expect to see these. Someday maybe propagate
        // a Status error, but for now fail hard.
        LOG(FATAL) << "Unexpected Event status: " << static_cast<int>(s);
        break;
      case se::Event::Status::kPending:
        if (!is_dedicated_poller) return;  // quit processing queue
        break;
      case se::Event::Status::kComplete:
        // Make a copy of the InUse record so we can free it after releasing
        // the lock.
        to_free->push_back(iu);
        free_events_.push_back(iu.event);
        // Mark this InUse record as completed.
        iu.event = nullptr;
    }
  }
  // Then clear any completed InUse records from the front of the queue.
  while (!used_events_.empty()) {
    InUse& iu = used_events_.front();
    if (iu.event == nullptr) {
      used_events_.pop_front();
    } else {
      break;
    }
  }
}

}  // namespace tensorflow