/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_

#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "tensorflow/core/framework/log_memory.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_reference.h"
#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

namespace stream_executor {
class Event;
class Stream;
class StreamExecutor;
}  // namespace stream_executor

namespace tensorflow {

class GPUOptions;

// The callback provided to EventMgr::ThenExecute must not block or take a long
// time.  If it does, performance may be impacted and GPU memory may be
// exhausted.  This macro is for checking that an EventMgr thread is not
// accidentally entering blocking parts of the code, e.g. the RPC subsystem.
46 // 47 // Intended use is something like 48 // 49 // void RespondToAnRPC(Params* params) { 50 // WARN_IF_IN_EVENT_MGR_THREAD; 51 // if (params->status.ok()) { ... 52 // 53 namespace gpu_event_mgr { 54 // Logs a stack trace if current execution thread belongs to this EventMgr 55 // object. If f is not nullptr, executes instead of logging the stack trace. 56 // trace. 57 void WarnIfInCallback(std::function<void()> f); 58 } // namespace gpu_event_mgr 59 #define WARN_IF_IN_EVENT_MGR_THREAD gpu_event_mgr::WarnIfInCallback(nullptr) 60 61 // An object to keep track of pending Events in the StreamExecutor streams 62 // and associated Tensors that cannot safely be deleted until the associated 63 // Events are recorded. 64 class EventMgr { 65 public: 66 EventMgr(se::StreamExecutor* se, const GPUOptions& gpu_options); 67 68 ~EventMgr(); 69 70 // Releases the references on the elements of "tensors" as soon as 71 // all events currently enqueued on "stream" have completed. 72 void ThenDeleteTensors(se::Stream* stream, 73 const TensorReferenceVector& tensors); 74 75 struct BufRec { 76 Allocator* alloc; 77 void* buf; 78 // operation and step_id are only populated when 79 // LogMemory::IsEnabled() is true. 80 string operation; 81 int64 step_id; 82 }; 83 84 // Takes ownership of *bufrec.buf and calls bufrec.alloc->DeallocateRaw() 85 // on it as soon as all events currently enqueued on *stream have completed. ThenDeleteBuffer(se::Stream * stream,BufRec bufrec)86 inline void ThenDeleteBuffer(se::Stream* stream, BufRec bufrec) { 87 ToFreeVector to_free; 88 { 89 mutex_lock l(mu_); 90 QueueBuffer(stream, bufrec); 91 PollEvents(false, &to_free); 92 } 93 FreeMemory(to_free); 94 } 95 96 // Execute func when all pending stream actions have completed. 97 // func must be brief and non-blocking since it executes in the one 98 // thread used for all such callbacks and also buffer deletions. 
ThenExecute(se::Stream * stream,std::function<void ()> func)99 inline void ThenExecute(se::Stream* stream, std::function<void()> func) { 100 ToFreeVector to_free; 101 { 102 mutex_lock l(mu_); 103 QueueFunc(stream, std::move(func)); 104 PollEvents(false, &to_free); 105 } 106 FreeMemory(to_free); 107 } 108 109 private: 110 friend class TEST_EventMgrHelper; 111 se::StreamExecutor* const exec_; 112 const int64 deferred_bytes_threshold_; 113 const int32 polling_active_delay_usecs_; 114 mutex mu_; 115 condition_variable events_pending_ GUARDED_BY(mu_); 116 117 void FlushAccumulatedTensors() EXCLUSIVE_LOCKS_REQUIRED(mu_); 118 119 struct InUse { 120 se::Event* event; 121 TensorReferenceVector* mem; 122 BufRec bufrec; 123 std::function<void()> func; 124 }; 125 126 typedef gtl::InlinedVector<InUse, 4> ToFreeVector; 127 FreeMemory(const ToFreeVector & to_free)128 void FreeMemory(const ToFreeVector& to_free) { 129 for (const auto& iu : to_free) { 130 if (iu.mem != nullptr) { 131 for (auto& t : *(iu.mem)) { 132 t.Unref(); 133 } 134 delete iu.mem; 135 } 136 if (iu.bufrec.buf) { 137 if (LogMemory::IsEnabled()) { 138 LogMemory::RecordRawDeallocation(iu.bufrec.operation, 139 iu.bufrec.step_id, iu.bufrec.buf, 140 iu.bufrec.alloc, false); 141 } 142 iu.bufrec.alloc->DeallocateRaw(iu.bufrec.buf); 143 } 144 // The function must be called in another thread. 145 if (iu.func != nullptr) threadpool_.Schedule(iu.func); 146 } 147 } 148 149 // Stream-enqueue an unused Event and save with it a collection of 150 // Tensors and/or a BufRec to be deleted only after the Event 151 // records. 
152 void QueueInUse(se::Stream* stream, InUse in_use) 153 EXCLUSIVE_LOCKS_REQUIRED(mu_); 154 QueueTensors(se::Stream * stream,TensorReferenceVector * tensors)155 void QueueTensors(se::Stream* stream, TensorReferenceVector* tensors) 156 EXCLUSIVE_LOCKS_REQUIRED(mu_) { 157 QueueInUse(stream, {nullptr, tensors, BufRec(), nullptr}); 158 } 159 QueueBuffer(se::Stream * stream,BufRec bufrec)160 void QueueBuffer(se::Stream* stream, BufRec bufrec) 161 EXCLUSIVE_LOCKS_REQUIRED(mu_) { 162 QueueInUse(stream, {nullptr, nullptr, bufrec, nullptr}); 163 } 164 QueueFunc(se::Stream * stream,std::function<void ()> func)165 void QueueFunc(se::Stream* stream, std::function<void()> func) 166 EXCLUSIVE_LOCKS_REQUIRED(mu_) { 167 QueueInUse(stream, {nullptr, nullptr, BufRec(), std::move(func)}); 168 } 169 170 // This function should be called at roughly the same tempo as 171 // QueueTensors() to check whether pending events have recorded, 172 // and then retire them. It appends InUse elements that need cleanup 173 // to "*to_free". The caller should call FreeMemory(to_free) 174 // when this returns. 175 void PollEvents(bool is_dedicated_poller, ToFreeVector* to_free) 176 EXCLUSIVE_LOCKS_REQUIRED(mu_); 177 178 // An internal polling loop that runs at a low frequency to clear 179 // straggler Events. 180 void PollLoop(); 181 182 // Setup/Teardown functions for the polling loop. 183 void StartPollingLoop(); 184 void StopPollingLoop(); 185 186 // A stack of unused events 187 std::vector<se::Event*> free_events_ GUARDED_BY(mu_); 188 189 // Buffered list of tensors waiting to have an event queued for deletion 190 se::Stream* accumulated_stream_ GUARDED_BY(mu_); 191 TensorReferenceVector* accumulated_tensors_ GUARDED_BY(mu_); 192 // Sum of the TotalBytes() of the tensors in "accumulated_tensors_" 193 int64 accumulated_tensor_bytes_ GUARDED_BY(mu_); 194 195 // A FIFO queue of InUse events and associated tensors. 
196 std::deque<InUse> used_events_ GUARDED_BY(mu_); 197 198 bool stop_polling_ GUARDED_BY(mu_); 199 std::unique_ptr<Notification> polling_stopped_; 200 201 // The main PollLoop for the event manager runs in this threadpool. 202 thread::ThreadPool threadpool_; 203 }; 204 205 } // namespace tensorflow 206 #endif // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_ 207