1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_COMMON_RUNTIME_DEVICE_DEVICE_EVENT_MGR_H_ 17 #define TENSORFLOW_CORE_COMMON_RUNTIME_DEVICE_DEVICE_EVENT_MGR_H_ 18 19 #include <deque> 20 #include <vector> 21 22 #include "tensorflow/core/framework/log_memory.h" 23 #include "tensorflow/core/framework/tensor.h" 24 #include "tensorflow/core/lib/core/notification.h" 25 #include "tensorflow/core/lib/core/threadpool.h" 26 #include "tensorflow/core/lib/gtl/inlined_vector.h" 27 #include "tensorflow/core/platform/mutex.h" 28 #include "tensorflow/core/platform/stream_executor.h" 29 #include "tensorflow/core/platform/thread_annotations.h" 30 #include "tensorflow/core/platform/types.h" 31 32 namespace stream_executor { 33 class Event; 34 class Stream; 35 class StreamExecutor; 36 } // namespace stream_executor 37 38 namespace tensorflow { 39 40 // TODO(annarev): Check if we can use a more general option representation here 41 // that could work for other device types as well. 42 class GPUOptions; 43 44 // The callback provided to EventMgr::ThenExecute must not block or take a long 45 // time. If it does, performance may be impacted and device memory may be 46 // exhausted. This macro is for checking that an EventMgr thread is not 47 // accidentally entering blocking parts of the code, e.g. the RPC subsystem. 48 // 49 // Intended use is something like 50 // 51 // void RespondToAnRPC(Params* params) { 52 // WARN_IF_IN_EVENT_MGR_THREAD; 53 // if (params->status.ok()) { ... 54 // 55 namespace device_event_mgr { 56 // Logs a stack trace if current execution thread belongs to this EventMgr 57 // object. If f is not nullptr, executes instead of logging the stack trace. 58 // trace. 59 void WarnIfInCallback(std::function<void()> f); 60 } // namespace device_event_mgr 61 #define WARN_IF_IN_EVENT_MGR_THREAD device_event_mgr::WarnIfInCallback(nullptr) 62 63 // An object to keep track of pending Events in the StreamExecutor streams 64 // and associated Tensors that cannot safely be deleted until the associated 65 // Events are recorded. 66 class EventMgr { 67 public: 68 virtual ~EventMgr(); 69 70 // Execute func when all pending stream actions have completed. 71 // func must be brief and non-blocking since it executes in the one 72 // thread used for all such callbacks and also buffer deletions. ThenExecute(se::Stream * stream,std::function<void ()> func)73 inline void ThenExecute(se::Stream* stream, std::function<void()> func) { 74 ToFreeVector to_free; 75 { 76 mutex_lock l(mu_); 77 QueueFunc(stream, std::move(func)); 78 PollEvents(false, &to_free); 79 } 80 FreeMemory(to_free); 81 } 82 83 private: 84 friend class TEST_EventMgr; 85 friend class TEST_EventMgrHelper; 86 friend class EventMgrFactory; 87 se::StreamExecutor* const exec_; 88 const int32 polling_active_delay_usecs_; 89 mutex mu_; 90 condition_variable events_pending_ TF_GUARDED_BY(mu_); 91 92 struct InUse { 93 se::Event* event; 94 std::function<void()> func; 95 }; 96 97 typedef gtl::InlinedVector<InUse, 4> ToFreeVector; 98 99 EventMgr(se::StreamExecutor* se, const GPUOptions& gpu_options); 100 FreeMemory(const ToFreeVector & to_free)101 void FreeMemory(const ToFreeVector& to_free) { 102 for (const auto& iu : to_free) { 103 // The function must be called in another thread. 104 if (iu.func != nullptr) threadpool_.Schedule(iu.func); 105 } 106 } 107 108 // Stream-enqueue an unused Event and save with it a collection of 109 // Tensors and/or a BufRec to be deleted only after the Event 110 // records. 111 void QueueInUse(se::Stream* stream, InUse in_use) 112 TF_EXCLUSIVE_LOCKS_REQUIRED(mu_); 113 QueueFunc(se::Stream * stream,std::function<void ()> func)114 void QueueFunc(se::Stream* stream, std::function<void()> func) 115 TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 116 QueueInUse(stream, {nullptr, std::move(func)}); 117 } 118 119 // This function should be called at roughly the same tempo as 120 // QueueTensors() to check whether pending events have recorded, 121 // and then retire them. It appends InUse elements that need cleanup 122 // to "*to_free". The caller should call FreeMemory(to_free) 123 // when this returns. 124 void PollEvents(bool is_dedicated_poller, ToFreeVector* to_free) 125 TF_EXCLUSIVE_LOCKS_REQUIRED(mu_); 126 127 // An internal polling loop that runs at a low frequency to clear 128 // straggler Events. 129 void PollLoop(); 130 131 // Setup/Teardown functions for the polling loop. 132 void StartPollingLoop(); 133 void StopPollingLoop(); 134 135 // A stack of unused events 136 std::vector<se::Event*> free_events_ TF_GUARDED_BY(mu_); 137 138 // A FIFO queue of InUse events and associated tensors. 139 std::deque<InUse> used_events_ TF_GUARDED_BY(mu_); 140 141 bool stop_polling_ TF_GUARDED_BY(mu_); 142 std::unique_ptr<Notification> polling_stopped_; 143 144 // The main PollLoop for the event manager runs in this threadpool. 145 thread::ThreadPool threadpool_; 146 }; 147 148 // Manages all the EventMgr instances. 149 class EventMgrFactory { 150 public: 151 static EventMgrFactory* Singleton(); 152 153 EventMgr* GetEventMgr(se::StreamExecutor* se, const GPUOptions& gpu_options); 154 155 private: 156 mutex mu_; 157 158 // Maintain one EventMgr per physical device (StreamExecutor is 159 // per-physical-device). 160 std::map<se::StreamExecutor*, EventMgr*> event_mgr_map_ TF_GUARDED_BY(mu_); 161 }; 162 163 } // namespace tensorflow 164 #endif // TENSORFLOW_CORE_COMMON_RUNTIME_DEVICE_DEVICE_EVENT_MGR_H_ 165