1 /** 2 * Copyright 2021-2022 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_ 18 #define MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_ 19 20 #include <queue> 21 #include <string> 22 #include <new> 23 #include <vector> 24 #include <unordered_map> 25 #include <memory> 26 #include <thread> 27 #include <atomic> 28 #include <condition_variable> 29 #include <mutex> 30 #include <functional> 31 #include "thread/threadlog.h" 32 #include "thread/core_affinity.h" 33 #ifndef _WIN32 34 #if defined(__x86_64__) || defined(__amd64__) || defined(_M_IX86) || defined(_M_X64) 35 #define PLATFORM_86 36 #include <pmmintrin.h> 37 #endif 38 #endif 39 #include "mindapi/base/macros.h" 40 #include "thread/hqueue.h" 41 42 #define USE_HQUEUE 43 namespace mindspore { 44 constexpr int kDefaultSpinCount = 300000; 45 constexpr int kMaxCount = 30000; 46 constexpr int kDefaultKernelSpinCount = 3000; 47 constexpr int kMinSpinCount = 1; 48 constexpr int kDefaultFrequency = 1; 49 constexpr float kMaxScale = 1.; 50 constexpr size_t kMaxHqueueSize = 8192; 51 constexpr size_t kMinActorRunOther = 2; 52 /* Thread status */ 53 constexpr int kThreadBusy = 0; // busy, the thread is running task 54 constexpr int kThreadHeld = 1; // held, the thread has been marked as occupied 55 constexpr int kThreadIdle = 2; // idle, the thread is waiting 56 57 // used in scenarios with unequal division of task 58 // the parameters indicate the start and end coefficients 59 60 using Func = std::function<int(void *, int, float, float)>; 61 62 using Content = void *; 63 64 typedef struct Task { TaskTask65 Task(Func f, Content c) : func(f), content(c) {} 66 Func func; 67 Content content; 68 std::atomic_int finished{0}; 69 std::atomic_int status{THREAD_OK}; // return status, RET_OK 70 } Task; 71 72 typedef struct TaskSplit { TaskSplitTaskSplit73 TaskSplit(Task *task, int task_id) : task_(task), task_id_(task_id) {} 74 Task *task_; 75 int task_id_; 76 } TaskSplit; 77 78 class ThreadPool; 79 class Worker { 80 public: Worker(ThreadPool * pool,size_t index)81 explicit Worker(ThreadPool *pool, size_t index) : pool_(pool), worker_id_(index) { 82 cond_var_ = std::make_unique<std::condition_variable>(); 83 } 84 virtual ~Worker(); 85 // create thread and start running at the same time 86 virtual void CreateThread(); 87 // assign task and then activate thread 88 void Active(std::vector<TaskSplit> *task_list, int task_id_start, int task_id_end); 89 // activate thread 90 void Active(); 91 92 // whether or not it is idle and marked as held 93 bool available(); 94 // assigns task first before running 95 virtual bool RunLocalKernelTask(); 96 virtual void RunOtherKernelTask(); 97 // try to run a single task 98 bool TryRunTask(TaskSplit *task_split); 99 // set max spin count before running SetMaxSpinCount(int max_spin_count)100 void SetMaxSpinCount(int max_spin_count) { max_spin_count_ = max_spin_count; } 101 void InitWorkerMask(const std::vector<int> &core_list, const size_t workers_size); InitLocalTaskQueue(HQueue<TaskSplit> * task_queue)102 void InitLocalTaskQueue(HQueue<TaskSplit> *task_queue) { local_task_queue_ = task_queue; } 103 set_frequency(int frequency)104 void set_frequency(int frequency) { frequency_ = frequency; } frequency()105 int frequency() const { return frequency_; } 106 107 void set_scale(float lhs_scale, float rhs_scale); lhs_scale()108 float lhs_scale() const { return lhs_scale_; } rhs_scale()109 float rhs_scale() const { return rhs_scale_; } local_task_queue()110 HQueue<TaskSplit> *local_task_queue() { return local_task_queue_; } 111 thread_id()112 std::thread::id thread_id() const { 113 THREAD_TEST_TRUE(thread_ == nullptr); 114 return thread_->get_id(); 115 } 116 117 #ifdef _WIN32 core_id()118 uint64_t core_id() { return core_id_; } 119 #elif defined(BIND_CORE) set_mask(const cpu_set_t & mask)120 void set_mask(const cpu_set_t &mask) { mask_ = mask; } handle()121 pthread_t handle() { 122 THREAD_TEST_TRUE(thread_ == nullptr); 123 return thread_->native_handle(); 124 } 125 #endif set_alive(bool flag)126 inline void set_alive(bool flag) { alive_ = flag; } alive()127 inline bool alive() const { return alive_; } 128 void ChildAfterFork(); 129 130 protected: 131 void SetAffinity(); 132 void YieldAndDeactive(); 133 virtual void WaitUntilActive(); 134 135 bool alive_{true}; 136 std::unique_ptr<std::thread> thread_{nullptr}; 137 #ifdef _WIN32 138 uint64_t core_id_; 139 #elif defined(BIND_CORE) 140 cpu_set_t mask_; 141 #endif 142 std::atomic_int status_{kThreadBusy}; 143 std::atomic_int active_num_{0}; 144 145 std::mutex mutex_; 146 std::unique_ptr<std::condition_variable> cond_var_{nullptr}; 147 148 std::atomic<Task *> task_{nullptr}; 149 std::atomic_int task_id_{0}; 150 float lhs_scale_{0.}; 151 float rhs_scale_{kMaxScale}; 152 int frequency_{kDefaultFrequency}; 153 int spin_count_{0}; 154 std::atomic_int max_spin_count_{kMinSpinCount}; 155 ThreadPool *pool_{nullptr}; 156 HQueue<TaskSplit> *local_task_queue_{nullptr}; 157 size_t worker_id_{0}; 158 std::vector<int> core_list_; 159 160 private: 161 void Run(); 162 }; 163 164 class MS_CORE_API ThreadPool { 165 public: 166 static ThreadPool *CreateThreadPool(size_t thread_num, const std::vector<int> &core_list = {}); 167 virtual ~ThreadPool(); 168 thread_num()169 size_t thread_num() const { return workers_.size(); } task_queues()170 const std::vector<std::unique_ptr<HQueue<TaskSplit>>> &task_queues() { return task_queues_; } 171 172 int SetCpuAffinity(const std::vector<int> &core_list); 173 int SetCpuAffinity(BindMode bind_mode); 174 int SetProcessAffinity(BindMode bind_mode) const; 175 void SyncRunTask(Task *task, int start_num, int task_num) const; 176 int SyncRunFunc(const Func &func, Content content, int start, int end) const; 177 178 virtual int ParallelLaunch(const Func &func, Content content, int task_num); 179 DisableOccupiedActorThread()180 void DisableOccupiedActorThread() { occupied_actor_thread_ = false; } SetActorThreadNum(size_t actor_thread_num)181 void SetActorThreadNum(size_t actor_thread_num) { actor_thread_num_ = actor_thread_num; } SetKernelThreadNum(size_t kernel_thread_num)182 void SetKernelThreadNum(size_t kernel_thread_num) { kernel_thread_num_ = kernel_thread_num; } GetKernelThreadNum()183 size_t GetKernelThreadNum() const { return kernel_thread_num_ + actor_thread_num_; } GetActorThreadNum()184 size_t GetActorThreadNum() const { return actor_thread_num_; } 185 void SetKernelThreadMaxSpinCount(int spin_count); 186 void SetSpinCountMaxValue(); 187 void SetSpinCountMinValue(); 188 void SetMaxSpinCount(int spin_count); 189 void SetMinSpinCount(int spin_count); 190 void ActiveWorkers(); 191 void SetWorkerIdMap(); 192 // init task queues 193 int TaskQueuesInit(size_t thread_num); 194 void ChildAfterFork(); GetWorkerIdMap()195 const std::unordered_map<std::thread::id, size_t> &GetWorkerIdMap() const { return worker_ids_; } GetServerCpuFrequence()196 float GetServerCpuFrequence() const { return server_cpu_frequence; } actor_thread_num()197 inline size_t actor_thread_num() const { return actor_thread_num_; } SetRunnerID(const std::string & runner_id)198 virtual bool SetRunnerID(const std::string &runner_id) { return false; } 199 template <typename T = Worker> CreateThreads(size_t thread_num,const std::vector<int> & core_list)200 int CreateThreads(size_t thread_num, const std::vector<int> &core_list) { 201 size_t core_num = std::thread::hardware_concurrency(); 202 thread_num = thread_num < core_num ? thread_num : core_num; 203 THREAD_INFO("ThreadInfo, Num: [%zu], CoreNum: [%zu]", thread_num, core_num); 204 if (thread_num == 0) { 205 THREAD_INFO("Current thread as working thread."); 206 return THREAD_OK; 207 } 208 std::lock_guard<std::mutex> _l(pool_mutex_); 209 size_t start = workers_.size(); 210 for (size_t i = 0; i < thread_num; ++i) { 211 auto worker = new (std::nothrow) T(this, workers_.size()); 212 THREAD_ERROR_IF_NULL(worker); 213 worker->InitWorkerMask(core_list, workers_.size()); 214 size_t queues_idx = start + i; 215 if (queues_idx >= task_queues_.size()) { 216 THREAD_ERROR("task_queues out of range."); 217 return THREAD_ERROR; 218 } 219 worker->InitLocalTaskQueue(task_queues_[queues_idx].get()); 220 workers_.push_back(worker); 221 } 222 for (size_t i = 0; i < thread_num; ++i) { 223 workers_[start + i]->CreateThread(); 224 THREAD_INFO("create kernel thread[%zu]", i); 225 } 226 return THREAD_OK; 227 } 228 229 protected: 230 ThreadPool() = default; 231 232 int InitAffinityInfo(); 233 234 void DistributeTask(std::vector<TaskSplit> *task_list, Task *task, int task_num, Worker *curr) const; 235 void CalculateScales(const std::vector<Worker *> &workers, int sum_frequency) const; 236 void ActiveWorkers(const std::vector<Worker *> &workers, std::vector<TaskSplit> *task_list, int task_num, 237 const Worker *curr) const; 238 239 Worker *CurrentWorker(size_t *index) const; 240 Worker *CurrentWorker() const; 241 242 std::mutex pool_mutex_; 243 std::vector<Worker *> workers_; 244 std::vector<std::unique_ptr<HQueue<TaskSplit>>> task_queues_; 245 std::unordered_map<std::thread::id, size_t> worker_ids_; 246 CoreAffinity *affinity_{nullptr}; 247 std::atomic<size_t> actor_thread_num_{0}; 248 std::atomic<size_t> kernel_thread_num_{0}; 249 bool occupied_actor_thread_{true}; 250 std::atomic_int max_spin_count_{kDefaultSpinCount}; 251 std::atomic_int min_spin_count_{kMinSpinCount}; 252 float server_cpu_frequence = -1.0f; // Unit : GHz 253 static std::mutex create_thread_pool_muntex_; 254 }; 255 } // namespace mindspore 256 #endif // MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_ 257