1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_ 18 #define MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_ 19 20 #include <new> 21 #include <vector> 22 #include <memory> 23 #include <thread> 24 #include <atomic> 25 #include <condition_variable> 26 #include <mutex> 27 #include <functional> 28 #include "thread/threadlog.h" 29 #include "thread/core_affinity.h" 30 31 namespace mindspore { 32 constexpr int kDefaultSpinCount = 300000; 33 constexpr int kMaxCount = 30000; 34 constexpr int kMinSpinCount = 1; 35 constexpr int kDefaultFrequency = 1; 36 constexpr float kMaxScale = 1.; 37 38 enum ThreadStatus { 39 kThreadBusy = 0, // busy, the thread is running task 40 kThreadHeld = 1, // held, the thread has been marked as occupied 41 kThreadIdle = 2 // idle, the thread is waiting 42 }; 43 44 // used in scenarios with unequal division of task 45 // the parameters indicate the start and end coefficients 46 using Func = std::function<int(void *, int, float, float)>; 47 using Content = void *; 48 49 typedef struct Task { TaskTask50 Task(Func f, Content c) : func(f), content(c) {} 51 Func func; 52 Content content; 53 std::atomic_int finished{0}; 54 std::atomic_int status{THREAD_OK}; // return status, RET_OK 55 } Task; 56 57 class Worker { 58 public: 59 Worker() = default; 60 virtual ~Worker(); 61 // create thread and start running at the same time 62 void CreateThread(); 63 // assign task and then activate thread 64 void Active(Task *task, int task_id); 65 // activate thread 66 void Active(); 67 // whether or not it is idle and marked as held 68 bool available(); 69 // assigns task first before running 70 bool RunLocalKernelTask(); 71 // set max spin count before running SetMaxSpinCount(int max_spin_count)72 void SetMaxSpinCount(int max_spin_count) { max_spin_count_ = max_spin_count; } 73 set_frequency(int frequency)74 void set_frequency(int frequency) { frequency_ = frequency; } frequency()75 int frequency() const { return frequency_; } 76 77 void set_scale(float lhs_scale, float rhs_scale); lhs_scale()78 float lhs_scale() const { return lhs_scale_; } rhs_scale()79 float rhs_scale() const { return rhs_scale_; } 80 thread_id()81 std::thread::id thread_id() const { return thread_.get_id(); } 82 #ifdef BIND_CORE set_mask(const cpu_set_t & mask)83 void set_mask(const cpu_set_t &mask) { mask_ = mask; } handle()84 pthread_t handle() { return thread_.native_handle(); } 85 #endif 86 87 protected: 88 void SetAffinity(); 89 void Run(); 90 void YieldAndDeactive(); 91 void WaitUntilActive(); 92 93 bool alive_{true}; 94 std::thread thread_; 95 #ifdef BIND_CORE 96 cpu_set_t mask_; 97 #endif 98 std::atomic_int status_{kThreadBusy}; 99 std::atomic_int active_num_{0}; 100 101 std::mutex mutex_; 102 std::condition_variable cond_var_; 103 104 std::atomic<Task *> task_{nullptr}; 105 std::atomic_int task_id_{0}; 106 float lhs_scale_{0.}; 107 float rhs_scale_{kMaxScale}; 108 int frequency_{kDefaultFrequency}; 109 int spin_count_{0}; 110 int max_spin_count_{kMinSpinCount}; 111 }; 112 113 class ThreadPool { 114 public: 115 static ThreadPool *CreateThreadPool(size_t thread_num, const std::vector<int> &core_list = {}); 116 virtual ~ThreadPool(); 117 thread_num()118 size_t thread_num() const { return workers_.size(); } 119 120 int SetCpuAffinity(const std::vector<int> &core_list); 121 int SetCpuAffinity(BindMode bind_mode); 122 int SetProcessAffinity(BindMode bind_mode) const; 123 124 int ParallelLaunch(const Func &func, Content content, int task_num) const; DisableOccupiedActorThread()125 void DisableOccupiedActorThread() { occupied_actor_thread_ = false; } SetActorThreadNum(size_t actor_thread_num)126 void SetActorThreadNum(size_t actor_thread_num) { actor_thread_num_ = actor_thread_num; } SetKernelThreadNum(size_t kernel_thread_num)127 void SetKernelThreadNum(size_t kernel_thread_num) { kernel_thread_num_ = kernel_thread_num; } GetKernelThreadNum()128 size_t GetKernelThreadNum() const { return kernel_thread_num_; } 129 void SetSpinCountMaxValue(); 130 void SetSpinCountMinValue(); 131 void SetMaxSpinCount(int spin_count); 132 void SetMinSpinCount(int spin_count); 133 void ActiveWorkers() const; 134 135 protected: 136 ThreadPool() = default; 137 138 int CreateThreads(size_t thread_num, const std::vector<int> &core_list); 139 140 int InitAffinityInfo(); 141 142 void SyncRunTask(Task *task, int start_num, int task_num) const; 143 144 void DistributeTask(Task *task, int task_num) const; 145 void CalculateScales(const std::vector<Worker *> &workers, int sum_frequency) const; 146 void ActiveWorkers(const std::vector<Worker *> &workers, Task *task, int task_num, const Worker *curr) const; 147 148 Worker *CurrentWorker() const; 149 150 std::mutex pool_mutex_; 151 std::vector<Worker *> workers_; 152 CoreAffinity *affinity_{nullptr}; 153 size_t actor_thread_num_{0}; 154 size_t kernel_thread_num_{0}; 155 bool occupied_actor_thread_{true}; 156 int max_spin_count_{kDefaultSpinCount}; 157 int min_spin_count_{kMinSpinCount}; 158 }; 159 160 } // namespace mindspore 161 #endif // MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_ 162