/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef _MSC_VER
#include <sched.h>
#include <unistd.h>
#endif
#include "thread/threadpool.h"
#include "thread/core_affinity.h"

namespace mindspore {
Worker::~Worker() {
  // mark the worker dead, then wake it so Run() can exit before joining
  {
    std::lock_guard<std::mutex> _l(mutex_);
    alive_ = false;
  }
  cond_var_.notify_one();
  if (thread_.joinable()) {
    thread_.join();
  }
}

void Worker::CreateThread() { thread_ = std::thread(&Worker::Run, this); }

// bind the calling worker thread to the cores recorded in mask_
// (a no-op unless BIND_CORE is defined)
void Worker::SetAffinity() {
#ifdef BIND_CORE
#ifdef __ANDROID__
  int ret = sched_setaffinity(gettid(), sizeof(cpu_set_t), &mask_);
  if (ret != THREAD_OK) {
    THREAD_ERROR("bind thread %d to cpu failed. ERROR %d", gettid(), errno);
  }
  return;
#else
#if !defined(__APPLE__) && !defined(SUPPORT_MSVC)
  int ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask_);
  if (ret != THREAD_OK) {
    THREAD_ERROR("bind thread %lu to cpu failed. ERROR %d", pthread_self(), errno);
  }
  return;
#endif
#endif
#endif
}

void Worker::Run() {
  SetAffinity();
#if !defined(__APPLE__) && !defined(SUPPORT_MSVC)
  static std::atomic_int index = {0};
  (void)pthread_setname_np(pthread_self(), ("KernelThread_" + std::to_string(index++)).c_str());
#endif
  while (alive_) {
    // run the pending task if there is one, otherwise spin;
    // after max_spin_count_ fruitless spins, block until reactivated
    if (RunLocalKernelTask()) {
      spin_count_ = 0;
    } else {
      YieldAndDeactive();
    }
    if (spin_count_ > max_spin_count_) {
      WaitUntilActive();
      spin_count_ = 0;
    }
  }
}

bool Worker::RunLocalKernelTask() {
  Task *task = task_.load(std::memory_order_consume);
  if (task == nullptr) {
    return false;
  }
  int task_id = task_id_.load(std::memory_order_consume);
  task->status |= task->func(task->content, task_id, lhs_scale_, rhs_scale_);
  task_.store(nullptr, std::memory_order_relaxed);
  (void)++task->finished;
  return true;
}

void Worker::YieldAndDeactive() {
  // deactivate this worker only on the first entry
  if (spin_count_ == 0) {
    status_.store(kThreadIdle);
  }
  spin_count_++;
  std::this_thread::yield();
}

void Worker::WaitUntilActive() {
  std::unique_lock<std::mutex> _l(mutex_);
  cond_var_.wait(_l, [&] { return status_ == kThreadBusy || active_num_ > 0 || !alive_; });
  active_num_--;
}

void Worker::set_scale(float lhs_scale, float rhs_scale) {
  lhs_scale_ = lhs_scale;
  rhs_scale_ = rhs_scale;
}

void Worker::Active(Task *task, int task_id) {
  {
    std::lock_guard<std::mutex> _l(mutex_);
    task_id_.store(task_id, std::memory_order_relaxed);
    task_.store(task, std::memory_order_relaxed);
    status_ = kThreadBusy;
  }
  cond_var_.notify_one();
}

void Worker::Active() {
  {
    std::lock_guard<std::mutex> _l(mutex_);
    active_num_++;
    status_ = kThreadBusy;
  }
  cond_var_.notify_one();
}

bool Worker::available() {
  // try to reserve an idle worker: CAS its status from kThreadIdle to kThreadHeld
  int expected = kThreadIdle;
  return status_.compare_exchange_strong(expected, kThreadHeld);
}

ThreadPool::~ThreadPool() {
  for (auto &worker : workers_) {
    delete worker;
    worker = nullptr;
  }
  workers_.clear();
  delete affinity_;
  affinity_ = nullptr;
  THREAD_INFO("destruct success");
}

int ThreadPool::CreateThreads(size_t thread_num, const std::vector<int> &core_list) {
  size_t core_num = std::thread::hardware_concurrency();
  thread_num = thread_num < core_num ? thread_num : core_num;
  THREAD_INFO("ThreadInfo, Num: [%zu], CoreNum: [%zu]", thread_num, core_num);
  if (thread_num == 0) {
    THREAD_INFO("Current thread as working thread.");
    return THREAD_OK;
  }
  std::lock_guard<std::mutex> _l(pool_mutex_);
  for (size_t i = 0; i < thread_num; ++i) {
    auto worker = new (std::nothrow) Worker();
    THREAD_ERROR_IF_NULL(worker);
#ifdef BIND_CORE
    cpu_set_t mask;
    CPU_ZERO(&mask);
    if (!core_list.empty()) {
      CPU_SET(core_list[workers_.size() % core_list.size()], &mask);
    }
    worker->set_mask(mask);
#endif
    worker->CreateThread();
    workers_.push_back(worker);
    THREAD_INFO("create kernel thread[%zu]", i);
  }
  return THREAD_OK;
}

int ThreadPool::ParallelLaunch(const Func &func, Content content, int task_num) const {
  // if the pool is single-threaded or there is only one task,
  // run everything on the calling (master) thread
  if (thread_num() <= 1 || task_num <= 1) {
    for (int i = 0; i < task_num; ++i) {
      int ret = func(content, i, 0, 1);
      if (ret != 0) {
        return ret;
      }
    }
    return THREAD_OK;
  }

  // distribute tasks to the KernelThreads and, when the task num exceeds
  // the number of KernelThreads, to idle ActorThreads as well
  THREAD_DEBUG("launch: %d", task_num);
  Task task = {func, content};

  DistributeTask(&task, task_num);
  // synchronization: wait until `finished` equals task_num
  while (task.finished != task_num) {
    std::this_thread::yield();
  }
  // check the accumulated return status of the task
  if (task.status != THREAD_OK) {
    return THREAD_ERROR;
  }
  return THREAD_OK;
}

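// Usage sketch for ParallelLaunch (illustrative only, not part of this file:
// VecAddCtx and VecAdd are hypothetical caller-side names, and it assumes
// Content is a plain `void *` and Func is `int (*)(void *, int, float, float)`
// as declared in threadpool.h). Each task receives [lhs_scale, rhs_scale)
// as its share of the total work:
//
//   struct VecAddCtx {
//     const float *a;
//     const float *b;
//     float *out;
//     int len;
//   };
//
//   int VecAdd(void *content, int task_id, float lhs_scale, float rhs_scale) {
//     auto *ctx = static_cast<VecAddCtx *>(content);
//     // claim the slice of the vector matching this task's scale range
//     int begin = static_cast<int>(lhs_scale * ctx->len);
//     int end = static_cast<int>(rhs_scale * ctx->len);
//     for (int i = begin; i < end; ++i) {
//       ctx->out[i] = ctx->a[i] + ctx->b[i];
//     }
//     return THREAD_OK;
//   }
//
//   // VecAddCtx ctx = {a, b, out, len};
//   // int ret = pool->ParallelLaunch(VecAdd, &ctx, 4);
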
void ThreadPool::SyncRunTask(Task *task, int start_num, int task_num) const {
  // run the remaining tasks [start_num, task_num) sequentially
  // on the calling thread
  float per_scale = kMaxScale / (task_num - start_num);
  for (int i = start_num; i < task_num; ++i) {
    float lhs_scale = i * per_scale;
    float rhs_scale = (i + 1) * per_scale;
    rhs_scale = i == task_num - 1 ? kMaxScale : rhs_scale;
    task->status |= task->func(task->content, i, lhs_scale, rhs_scale);
    (void)++task->finished;
  }
}

void ThreadPool::DistributeTask(Task *task, int task_num) const {
  Worker *curr = CurrentWorker();
  // if curr is not nullptr, the calling thread is an ActorThread:
  // assign (task_num - 1) tasks to the workers and run the last one itself
  int count = 0;
  int num_assigned = curr != nullptr ? task_num - 1 : task_num;
  int sum_frequency = 0;
  std::vector<Worker *> assigned;
  int num = static_cast<int>(workers_.size()) - 1;
  int offset = 0;
  if (!occupied_actor_thread_) {
    offset = static_cast<int>(actor_thread_num_);
  }
  for (int i = num; i >= offset && count < num_assigned; --i) {
    if (workers_[i]->available()) {
      assigned.push_back(workers_[i]);
      sum_frequency += workers_[i]->frequency();
      count++;
    }
  }
  // when there are not enough free threads,
  // run the remaining tasks on the master thread
  if (curr != nullptr) {
    for (; count < task_num; ++count) {
      assigned.push_back(curr);
      sum_frequency += curr->frequency();
    }
  } else if (assigned.size() != static_cast<size_t>(task_num)) {
    CalculateScales(assigned, sum_frequency);
    ActiveWorkers(assigned, task, assigned.size(), curr);
    SyncRunTask(task, assigned.size(), task_num);
    return;
  }
  CalculateScales(assigned, sum_frequency);
  ActiveWorkers(assigned, task, task_num, curr);
}

void ThreadPool::CalculateScales(const std::vector<Worker *> &assigned, int sum_frequency) const {
  // divide the task according to computing power (core frequency)
  float lhs_scale = 0;
  float rhs_scale = 0;
  if (sum_frequency == 0) {
    return;
  }
  for (const auto &worker : assigned) {
    THREAD_RETURN_IF_NULL(worker);
    rhs_scale += worker->frequency() * 1.0 / sum_frequency;
    rhs_scale = rhs_scale < 1 ? rhs_scale : 1;
    worker->set_scale(lhs_scale, rhs_scale);
    lhs_scale = rhs_scale;
  }
}

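// Worked example with hypothetical frequencies: three assigned workers with
// frequencies {2, 1, 1} give sum_frequency = 4, so set_scale hands out the
// ranges [0, 0.5), [0.5, 0.75) and [0.75, 1.0]; the faster core receives
// half of the work, which a task func can honor when slicing its data.
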
void ThreadPool::ActiveWorkers(const std::vector<Worker *> &workers, Task *task, int task_num,
                               const Worker *curr) const {
  for (int i = 0; i < task_num; ++i) {
    Worker *worker = workers[i];
    THREAD_RETURN_IF_NULL(worker);
    worker->Active(task, i);
    if (worker == curr) {
      (void)worker->RunLocalKernelTask();
    }
  }
}

void ThreadPool::ActiveWorkers() const {
  for (auto &worker : workers_) {
    worker->Active();
  }
}

Worker *ThreadPool::CurrentWorker() const {
  for (const auto &worker : workers_) {
    if (worker->thread_id() == std::this_thread::get_id()) {
      return worker;
    }
  }
  return nullptr;
}

int ThreadPool::InitAffinityInfo() {
  affinity_ = new (std::nothrow) CoreAffinity();
  THREAD_ERROR_IF_NULL(affinity_);
  int ret = affinity_->InitHardwareCoreInfo();
  if (ret != THREAD_OK) {
    delete affinity_;
    affinity_ = nullptr;
    return THREAD_ERROR;
  }
  return THREAD_OK;
}

int ThreadPool::SetCpuAffinity(BindMode bind_mode) {
  if (workers_.empty()) {
    return THREAD_ERROR;
  }
#ifdef BIND_CORE
  THREAD_ERROR_IF_NULL(affinity_);
  return affinity_->BindThreads(workers_, bind_mode);
#else
  return THREAD_OK;
#endif  // BIND_CORE
}

int ThreadPool::SetCpuAffinity(const std::vector<int> &core_list) {
  if (workers_.empty()) {
    return THREAD_ERROR;
  }
#ifdef BIND_CORE
  THREAD_ERROR_IF_NULL(affinity_);
  return affinity_->BindThreads(workers_, core_list);
#else
  return THREAD_OK;
#endif  // BIND_CORE
}

int ThreadPool::SetProcessAffinity(BindMode bind_mode) const {
#ifdef BIND_CORE
  THREAD_ERROR_IF_NULL(affinity_);
  return affinity_->BindProcess(bind_mode);
#else
  return THREAD_OK;
#endif  // BIND_CORE
}

void ThreadPool::SetSpinCountMaxValue() {
  for (auto worker : workers_) {
    THREAD_RETURN_IF_NULL(worker);
    worker->SetMaxSpinCount(max_spin_count_);
  }
}

void ThreadPool::SetSpinCountMinValue() {
  for (auto worker : workers_) {
    THREAD_RETURN_IF_NULL(worker);
    worker->SetMaxSpinCount(min_spin_count_);
  }
}

void ThreadPool::SetMaxSpinCount(int spin_count) {
  if (spin_count <= 0) {
    return;
  }
  max_spin_count_ = spin_count;
}

void ThreadPool::SetMinSpinCount(int spin_count) {
  if (spin_count <= 0) {
    return;
  }
  min_spin_count_ = spin_count;
}

ThreadPool *ThreadPool::CreateThreadPool(size_t thread_num, const std::vector<int> &core_list) {
  ThreadPool *pool = new (std::nothrow) ThreadPool();
  if (pool == nullptr) {
    return nullptr;
  }
  int ret = pool->CreateThreads(thread_num, core_list);
  if (ret != THREAD_OK) {
    delete pool;
    return nullptr;
  }
#ifdef BIND_CORE
  ret = pool->InitAffinityInfo();
  if (ret != THREAD_OK) {
    delete pool;
    return nullptr;
  }
#endif  // BIND_CORE
  return pool;
}
}  // namespace mindspore
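// End-to-end usage sketch (an assumption-flagged example, not shipped API
// docs: it presumes an empty core_list is acceptable and that the caller
// owns and deletes the pool):
//
//   #include "thread/threadpool.h"
//
//   int main() {
//     auto *pool = mindspore::ThreadPool::CreateThreadPool(4, {});
//     if (pool == nullptr) {
//       return -1;
//     }
//     // ... define a Func and its Content, then:
//     // int ret = pool->ParallelLaunch(func, content, /*task_num=*/8);
//     delete pool;
//     return 0;
//   }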