• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2022 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_
18 #define MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_
19 
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <new>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
31 #include "thread/threadlog.h"
32 #include "thread/core_affinity.h"
33 #ifndef _WIN32
34 #if defined(__x86_64__) || defined(__amd64__) || defined(_M_IX86) || defined(_M_X64)
35 #define PLATFORM_86
36 #include <pmmintrin.h>
37 #endif
38 #endif
39 #include "mindapi/base/macros.h"
40 #include "thread/hqueue.h"
41 
42 #define USE_HQUEUE
43 namespace mindspore {
// Spin-count defaults: kDefaultSpinCount seeds ThreadPool::max_spin_count_,
// kMinSpinCount seeds ThreadPool::min_spin_count_ and Worker::max_spin_count_.
constexpr int kDefaultSpinCount = 300'000;
constexpr int kMaxCount = 30'000;
constexpr int kDefaultKernelSpinCount = 3'000;
constexpr int kMinSpinCount = 1;

// Initial value of Worker::frequency_.
constexpr int kDefaultFrequency = 1;
// Initial value of Worker::rhs_scale_ (Worker::lhs_scale_ starts at 0).
constexpr float kMaxScale = 1.0f;

constexpr size_t kMaxHqueueSize = 8192;
constexpr size_t kMinActorRunOther = 2;

/* Thread status values, as stored in Worker::status_ */
constexpr int kThreadBusy = 0;  // busy: the thread is running a task
constexpr int kThreadHeld = 1;  // held: the thread has been marked as occupied
constexpr int kThreadIdle = 2;  // idle: the thread is waiting

// Kernel callback signature. The two float parameters carry the start and end
// coefficients used when a task is divided unequally among workers. The void *
// and int are presumably the task Content and task id — confirm with callers of
// ThreadPool::ParallelLaunch.
using Func = std::function<int(void *, int, float, float)>;

// Opaque user payload forwarded to a Func.
using Content = void *;
63 
64 typedef struct Task {
TaskTask65   Task(Func f, Content c) : func(f), content(c) {}
66   Func func;
67   Content content;
68   std::atomic_int finished{0};
69   std::atomic_int status{THREAD_OK};  // return status, RET_OK
70 } Task;
71 
72 typedef struct TaskSplit {
TaskSplitTaskSplit73   TaskSplit(Task *task, int task_id) : task_(task), task_id_(task_id) {}
74   Task *task_;
75   int task_id_;
76 } TaskSplit;
77 
78 class ThreadPool;
79 class Worker {
80  public:
Worker(ThreadPool * pool,size_t index)81   explicit Worker(ThreadPool *pool, size_t index) : pool_(pool), worker_id_(index) {
82     cond_var_ = std::make_unique<std::condition_variable>();
83   }
84   virtual ~Worker();
85   // create thread and start running at the same time
86   virtual void CreateThread();
87   // assign task and then activate thread
88   void Active(std::vector<TaskSplit> *task_list, int task_id_start, int task_id_end);
89   // activate thread
90   void Active();
91 
92   // whether or not it is idle and marked as held
93   bool available();
94   // assigns task first before running
95   virtual bool RunLocalKernelTask();
96   virtual void RunOtherKernelTask();
97   // try to run a single task
98   bool TryRunTask(TaskSplit *task_split);
99   // set max spin count before running
SetMaxSpinCount(int max_spin_count)100   void SetMaxSpinCount(int max_spin_count) { max_spin_count_ = max_spin_count; }
101   void InitWorkerMask(const std::vector<int> &core_list, const size_t workers_size);
InitLocalTaskQueue(HQueue<TaskSplit> * task_queue)102   void InitLocalTaskQueue(HQueue<TaskSplit> *task_queue) { local_task_queue_ = task_queue; }
103 
set_frequency(int frequency)104   void set_frequency(int frequency) { frequency_ = frequency; }
frequency()105   int frequency() const { return frequency_; }
106 
107   void set_scale(float lhs_scale, float rhs_scale);
lhs_scale()108   float lhs_scale() const { return lhs_scale_; }
rhs_scale()109   float rhs_scale() const { return rhs_scale_; }
local_task_queue()110   HQueue<TaskSplit> *local_task_queue() { return local_task_queue_; }
111 
thread_id()112   std::thread::id thread_id() const {
113     THREAD_TEST_TRUE(thread_ == nullptr);
114     return thread_->get_id();
115   }
116 
117 #ifdef _WIN32
core_id()118   uint64_t core_id() { return core_id_; }
119 #elif defined(BIND_CORE)
set_mask(const cpu_set_t & mask)120   void set_mask(const cpu_set_t &mask) { mask_ = mask; }
handle()121   pthread_t handle() {
122     THREAD_TEST_TRUE(thread_ == nullptr);
123     return thread_->native_handle();
124   }
125 #endif
set_alive(bool flag)126   inline void set_alive(bool flag) { alive_ = flag; }
alive()127   inline bool alive() const { return alive_; }
128   void ChildAfterFork();
129 
130  protected:
131   void SetAffinity();
132   void YieldAndDeactive();
133   virtual void WaitUntilActive();
134 
135   bool alive_{true};
136   std::unique_ptr<std::thread> thread_{nullptr};
137 #ifdef _WIN32
138   uint64_t core_id_;
139 #elif defined(BIND_CORE)
140   cpu_set_t mask_;
141 #endif
142   std::atomic_int status_{kThreadBusy};
143   std::atomic_int active_num_{0};
144 
145   std::mutex mutex_;
146   std::unique_ptr<std::condition_variable> cond_var_{nullptr};
147 
148   std::atomic<Task *> task_{nullptr};
149   std::atomic_int task_id_{0};
150   float lhs_scale_{0.};
151   float rhs_scale_{kMaxScale};
152   int frequency_{kDefaultFrequency};
153   int spin_count_{0};
154   std::atomic_int max_spin_count_{kMinSpinCount};
155   ThreadPool *pool_{nullptr};
156   HQueue<TaskSplit> *local_task_queue_{nullptr};
157   size_t worker_id_{0};
158   std::vector<int> core_list_;
159 
160  private:
161   void Run();
162 };
163 
164 class MS_CORE_API ThreadPool {
165  public:
166   static ThreadPool *CreateThreadPool(size_t thread_num, const std::vector<int> &core_list = {});
167   virtual ~ThreadPool();
168 
thread_num()169   size_t thread_num() const { return workers_.size(); }
task_queues()170   const std::vector<std::unique_ptr<HQueue<TaskSplit>>> &task_queues() { return task_queues_; }
171 
172   int SetCpuAffinity(const std::vector<int> &core_list);
173   int SetCpuAffinity(BindMode bind_mode);
174   int SetProcessAffinity(BindMode bind_mode) const;
175   void SyncRunTask(Task *task, int start_num, int task_num) const;
176   int SyncRunFunc(const Func &func, Content content, int start, int end) const;
177 
178   virtual int ParallelLaunch(const Func &func, Content content, int task_num);
179 
DisableOccupiedActorThread()180   void DisableOccupiedActorThread() { occupied_actor_thread_ = false; }
SetActorThreadNum(size_t actor_thread_num)181   void SetActorThreadNum(size_t actor_thread_num) { actor_thread_num_ = actor_thread_num; }
SetKernelThreadNum(size_t kernel_thread_num)182   void SetKernelThreadNum(size_t kernel_thread_num) { kernel_thread_num_ = kernel_thread_num; }
GetKernelThreadNum()183   size_t GetKernelThreadNum() const { return kernel_thread_num_ + actor_thread_num_; }
GetActorThreadNum()184   size_t GetActorThreadNum() const { return actor_thread_num_; }
185   void SetKernelThreadMaxSpinCount(int spin_count);
186   void SetSpinCountMaxValue();
187   void SetSpinCountMinValue();
188   void SetMaxSpinCount(int spin_count);
189   void SetMinSpinCount(int spin_count);
190   void ActiveWorkers();
191   void SetWorkerIdMap();
192   // init task queues
193   int TaskQueuesInit(size_t thread_num);
194   void ChildAfterFork();
GetWorkerIdMap()195   const std::unordered_map<std::thread::id, size_t> &GetWorkerIdMap() const { return worker_ids_; }
GetServerCpuFrequence()196   float GetServerCpuFrequence() const { return server_cpu_frequence; }
actor_thread_num()197   inline size_t actor_thread_num() const { return actor_thread_num_; }
SetRunnerID(const std::string & runner_id)198   virtual bool SetRunnerID(const std::string &runner_id) { return false; }
199   template <typename T = Worker>
CreateThreads(size_t thread_num,const std::vector<int> & core_list)200   int CreateThreads(size_t thread_num, const std::vector<int> &core_list) {
201     size_t core_num = std::thread::hardware_concurrency();
202     thread_num = thread_num < core_num ? thread_num : core_num;
203     THREAD_INFO("ThreadInfo, Num: [%zu], CoreNum: [%zu]", thread_num, core_num);
204     if (thread_num == 0) {
205       THREAD_INFO("Current thread as working thread.");
206       return THREAD_OK;
207     }
208     std::lock_guard<std::mutex> _l(pool_mutex_);
209     size_t start = workers_.size();
210     for (size_t i = 0; i < thread_num; ++i) {
211       auto worker = new (std::nothrow) T(this, workers_.size());
212       THREAD_ERROR_IF_NULL(worker);
213       worker->InitWorkerMask(core_list, workers_.size());
214       size_t queues_idx = start + i;
215       if (queues_idx >= task_queues_.size()) {
216         THREAD_ERROR("task_queues out of range.");
217         return THREAD_ERROR;
218       }
219       worker->InitLocalTaskQueue(task_queues_[queues_idx].get());
220       workers_.push_back(worker);
221     }
222     for (size_t i = 0; i < thread_num; ++i) {
223       workers_[start + i]->CreateThread();
224       THREAD_INFO("create kernel thread[%zu]", i);
225     }
226     return THREAD_OK;
227   }
228 
229  protected:
230   ThreadPool() = default;
231 
232   int InitAffinityInfo();
233 
234   void DistributeTask(std::vector<TaskSplit> *task_list, Task *task, int task_num, Worker *curr) const;
235   void CalculateScales(const std::vector<Worker *> &workers, int sum_frequency) const;
236   void ActiveWorkers(const std::vector<Worker *> &workers, std::vector<TaskSplit> *task_list, int task_num,
237                      const Worker *curr) const;
238 
239   Worker *CurrentWorker(size_t *index) const;
240   Worker *CurrentWorker() const;
241 
242   std::mutex pool_mutex_;
243   std::vector<Worker *> workers_;
244   std::vector<std::unique_ptr<HQueue<TaskSplit>>> task_queues_;
245   std::unordered_map<std::thread::id, size_t> worker_ids_;
246   CoreAffinity *affinity_{nullptr};
247   std::atomic<size_t> actor_thread_num_{0};
248   std::atomic<size_t> kernel_thread_num_{0};
249   bool occupied_actor_thread_{true};
250   std::atomic_int max_spin_count_{kDefaultSpinCount};
251   std::atomic_int min_spin_count_{kMinSpinCount};
252   float server_cpu_frequence = -1.0f;  // Unit : GHz
253   static std::mutex create_thread_pool_muntex_;
254 };
255 }  // namespace mindspore
256 #endif  // MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_
257