/**
 * Copyright 2021-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef _MSC_VER
#include <sched.h>
#include <unistd.h>
#endif
#include <sstream>
#include "thread/threadpool.h"
#include "thread/core_affinity.h"
#if !defined(_WIN32) && !defined(BUILD_LITE)
#include "include/fork_utils.h"
#endif

namespace mindspore {
std::mutex ThreadPool::create_thread_pool_muntex_;

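// The destructor marks the worker dead under the lock and wakes it, then drains any
// task splits left in the local queue (bounded by kMaxCount so destruction cannot
// loop forever) before joining the underlying thread.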
Worker::~Worker() {
  {
    std::lock_guard<std::mutex> _l(mutex_);
    alive_ = false;
  }
  cond_var_->notify_one();

  bool terminate = false;
  int count = 0;
  while (local_task_queue_ && !terminate && count++ < kMaxCount) {
    terminate = local_task_queue_->Empty();
    if (!terminate) {
      auto task_split = local_task_queue_->Dequeue();
      (void)TryRunTask(task_split);
    }
  }

  if (thread_->joinable()) {
    thread_->join();
  }
  pool_ = nullptr;
  local_task_queue_ = nullptr;
}

void Worker::CreateThread() { thread_ = std::make_unique<std::thread>(&Worker::Run, this); }

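// After fork(2) only the calling thread exists in the child, so the worker thread and
// its condition variable must be rebuilt. The old objects are deliberately leaked via
// release(): destroying a condition variable the parent's threads may still hold is
// undefined behavior, and the leak occurs only once per fork.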
void Worker::ChildAfterFork() {
  THREAD_INFO("worker %zu recreate thread after fork in child process", worker_id_);
  if (cond_var_ != nullptr) {
    (void)cond_var_.release();
    cond_var_ = std::make_unique<std::condition_variable>();
  }
  if (thread_ != nullptr) {
    (void)thread_.release();
    CreateThread();
  }
}

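// Renders an affinity mask as one character per online CPU; for example, on an
// eight-core machine with only CPU 2 set the result is "00100000".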
#if defined(BIND_CORE) && !defined(__ANDROID__) && !defined(__APPLE__) && !defined(_MSC_VER) && !defined(_WIN32)
std::string MaskToStr(cpu_set_t *mask) {
  std::stringstream ss;
  size_t cpu_num = static_cast<size_t>(sysconf(_SC_NPROCESSORS_ONLN));
  for (size_t i = 0; i < cpu_num; i++) {
    ss << (CPU_ISSET(i, mask) ? "1" : "0");
  }
  return ss.str();
}
#endif

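// Pins the calling worker thread to its assigned core(s). Three platform paths:
// a Windows helper, sched_setaffinity() with the kernel tid on Android/OHOS, and
// pthread_setaffinity_np() on other POSIX systems.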
void Worker::SetAffinity() {
#ifdef _WIN32
  SetWindowsSelfAffinity(core_id_);
#elif defined(BIND_CORE)
#if defined(__ANDROID__) || defined(MS_COMPILE_OHOS)
  THREAD_INFO("thread: %d, mask: %lu", gettid(), mask_.__bits[0]);
  int ret = sched_setaffinity(gettid(), sizeof(cpu_set_t), &mask_);
  if (ret != THREAD_OK) {
    THREAD_ERROR("bind thread %d to cpu failed. ERROR %d", gettid(), errno);
  }
  return;
#else
#if !defined(__APPLE__) && !defined(_MSC_VER)
  // %s requires a C string; pass the std::string's c_str(), not the object itself.
  THREAD_INFO("Worker pthread_setaffinity_np, mask %s", MaskToStr(&mask_).c_str());
  int ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask_);
  if (ret != THREAD_OK) {
    THREAD_ERROR("bind thread %lu to cpu failed. ERROR %d", pthread_self(), errno);
  }
  return;
#endif
#endif
#endif
}

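// Cores are handed out round-robin: the worker being created gets
// core_list[workers_size % core_list.size()].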
void Worker::InitWorkerMask(const std::vector<int> &core_list, const size_t workers_size) {
  core_list_ = core_list;
#ifdef _WIN32
  static uint32_t windows_core_index = 0;
  core_id_ = windows_core_index++;
#elif defined(BIND_CORE)
  if (core_list.empty()) {
    return;
  }
  cpu_set_t mask;
  CPU_ZERO(&mask);
  CPU_SET(core_list[workers_size % core_list.size()], &mask);
  this->set_mask(mask);
#endif
  return;
}

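// Main worker loop: run the local task slot/queue first, otherwise try to steal work;
// after max_spin_count_ fruitless spins the worker blocks on its condition variable
// until reactivated.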
void Worker::Run() {
  if (!core_list_.empty()) {
    SetAffinity();
  }
#if !defined(__APPLE__) && !defined(_MSC_VER)
  static std::atomic_int index = {0};
  (void)pthread_setname_np(pthread_self(), ("OS_Kernel_" + std::to_string(index++)).c_str());
#endif
#ifdef PLATFORM_86
  // Some CPU kernels need the flush-to-zero mode set to improve performance.
  _MM_SET_FLUSH_ZERO_MODE(_MM_FLUSH_ZERO_ON);
  _MM_SET_DENORMALS_ZERO_MODE(_MM_DENORMALS_ZERO_ON);
#endif
  while (alive_) {
    if (RunLocalKernelTask()) {
      spin_count_ = 0;
    } else {
      RunOtherKernelTask();
      YieldAndDeactive();
    }
    if (spin_count_ > max_spin_count_) {
      WaitUntilActive();
      spin_count_ = 1;
    }
  }
}

bool Worker::TryRunTask(TaskSplit *task_split) {
  if (task_split == nullptr) {
    return false;
  }
  auto task = task_split->task_;
  auto task_id = task_split->task_id_;
  task->status |= task->func(task->content, task_id, lhs_scale_, rhs_scale_);
  (void)++task->finished;
  return true;
}

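// Runs the task published through the atomic task_ slot (if any), then drains this
// worker's own queue. The consume load pairs with the release store in Active(), so
// the task fields written by the producer are visible before the pointer is used.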
bool Worker::RunLocalKernelTask() {
  bool res = false;
  Task *task = task_.load(std::memory_order_consume);
  if (task != nullptr) {
    int task_id = task_id_.load(std::memory_order_consume);
    task->status |= task->func(task->content, task_id, lhs_scale_, rhs_scale_);
    task_.store(nullptr, std::memory_order_relaxed);
    (void)++task->finished;
    res |= true;
  }

  while (!local_task_queue_->Empty()) {
    auto task_split = local_task_queue_->Dequeue();
    res |= TryRunTask(task_split);
  }
  return res;
}

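// Work stealing: scan the other queues starting just past this worker's index and run
// at most one stolen split per call, so the local queue is rechecked promptly.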
void Worker::RunOtherKernelTask() {
  if (pool_ == nullptr || pool_->actor_thread_num() <= kMinActorRunOther) {
    return;
  }
  auto queues_length = pool_->task_queues().size();
  for (size_t i = 0; i < queues_length; ++i) {
    size_t index = (worker_id_ + i + 1) % queues_length;
    while (!pool_->task_queues()[index]->Empty()) {
      auto task_split = pool_->task_queues()[index]->Dequeue();
      if (TryRunTask(task_split)) {
        return;
      }
    }
  }
}

void Worker::YieldAndDeactive() {
  // deactivate this worker only on the first entry
  if (spin_count_ == 0) {
    std::lock_guard<std::mutex> _l(mutex_);
    if (local_task_queue_->Empty()) {
      status_.store(kThreadIdle);
    } else {
      return;
    }
  }
  spin_count_++;
  std::this_thread::yield();
}

void Worker::WaitUntilActive() {
  std::unique_lock<std::mutex> _l(mutex_);
  cond_var_->wait(_l, [&] { return status_ == kThreadBusy || active_num_ > 0 || !alive_; });
  if (active_num_ > 0) {
    active_num_--;
  }
  // If the wait ended because active_num_ > 0 while status_ is still kThreadIdle, a task
  // may already be enqueued; since spin_count_ is 0 the worker could drop back to
  // kThreadIdle without handling it, and a newly enqueued task could then overwrite the
  // pending one, losing a task. So status_ must be set to kThreadBusy after the wait.
  status_.store(kThreadBusy);
}

void Worker::set_scale(float lhs_scale, float rhs_scale) {
  lhs_scale_ = lhs_scale;
  rhs_scale_ = rhs_scale;
}

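// Publishes a batch of splits to this worker: the first split takes the atomic task_
// slot when it is free, the rest go to the local queue. Enqueue is retried in a busy
// loop because the lock-free queue can be transiently full.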
void Worker::Active(std::vector<TaskSplit> *task_list, int task_id_start, int task_id_end) {
  {
    std::lock_guard<std::mutex> _l(mutex_);
    // put the first split into task_, and the others into the queue.
    status_ = kThreadBusy;
    Task *task = task_.load(std::memory_order_consume);
    int to_atomic_task = 0;
    if (task == nullptr) {
      task_id_.store(task_id_start, std::memory_order_relaxed);
      THREAD_TEST_TRUE(task_ == nullptr);
      task_.store((*task_list)[0].task_, std::memory_order_release);
      to_atomic_task = 1;
    }
    for (int i = task_id_start + to_atomic_task; i < task_id_end; ++i) {
      while (!local_task_queue_->Enqueue(&(*task_list)[i])) {
      }
    }
    status_ = kThreadBusy;
  }
  cond_var_->notify_one();
}

void Worker::Active() {
  if (active_num_ > 0) {
    return;
  }
  {
    std::lock_guard<std::mutex> _l(mutex_);
    active_num_++;
    status_ = kThreadBusy;
  }
  cond_var_->notify_one();
}

bool Worker::available() {
  int expected = kThreadIdle;
  return status_.compare_exchange_strong(expected, kThreadHeld);
}

ThreadPool::~ThreadPool() {
  for (auto &worker : workers_) {
    delete worker;
    worker = nullptr;
  }
  workers_.clear();

  if (affinity_ != nullptr) {
    delete affinity_;
    affinity_ = nullptr;
  }

  for (auto &task_queue : task_queues_) {
    task_queue->Clean();
  }
  task_queues_.clear();
  THREAD_INFO("destruct success");
}

int ThreadPool::TaskQueuesInit(size_t thread_num) {
  for (size_t i = 0; i < thread_num; ++i) {
    (void)task_queues_.emplace_back(std::make_unique<HQueue<TaskSplit>>());
  }
  for (size_t i = 0; i < thread_num; ++i) {
    if (!task_queues_[i]->Init(kMaxHqueueSize)) {
      THREAD_ERROR("init task queue failed.");
      return THREAD_ERROR;
    }
  }
  THREAD_INFO("init task queues success.");
  return THREAD_OK;
}

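// Splits one kernel into task_num pieces, distributes them to idle workers, and waits
// until task.finished reaches task_num. A minimal usage sketch (the kernel name and
// content struct below are illustrative, not part of this API):
//
//   int SumKernel(void *content, int task_id, float lhs_scale, float rhs_scale) {
//     // process the task_id-th slice of *content here
//     return THREAD_OK;
//   }
//   ...
//   pool->ParallelLaunch(SumKernel, &ctx, 4);  // run 4 splits in parallel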
int ThreadPool::ParallelLaunch(const Func &func, Content content, int task_num) {
  // if single thread, run on the master thread
  if (task_num <= 1) {
    return SyncRunFunc(func, content, 0, task_num);
  }

  // distribute tasks to the KernelThreads and the idle ActorThreads,
  // if the task num is greater than the KernelThread num
  THREAD_DEBUG("launch: %d", task_num);
  Task task = {func, content};
  std::vector<TaskSplit> task_list;
  for (int i = 0; i < task_num; ++i) {
    (void)task_list.emplace_back(TaskSplit{&task, i});
  }
  Worker *curr = CurrentWorker();
  DistributeTask(&task_list, &task, task_num, curr);
  // synchronization: wait until `finished` equals task_num
  while (task.finished != task_num) {
    if (curr != nullptr) {
      (void)curr->RunLocalKernelTask();
    }
    std::this_thread::yield();
  }
  // check the return value of the task
  if (task.status != THREAD_OK) {
    return THREAD_ERROR;
  }
  return THREAD_OK;
}

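// Sequential fallback for splits that found no idle worker. Split i gets the scale
// window [i * per_scale, (i + 1) * per_scale) with per_scale = kMaxScale /
// (task_num - start_num); the last window is clamped to end exactly at kMaxScale.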
void ThreadPool::SyncRunTask(Task *task, int start_num, int task_num) const {
  // run the tasks sequentially
  // if the current thread is not an actor thread
  float per_scale = kMaxScale / (task_num - start_num);
  for (int i = start_num; i < task_num; ++i) {
    float lhs_scale = i * per_scale;
    float rhs_scale = (i + 1) * per_scale;
    rhs_scale = i == task_num - 1 ? kMaxScale : rhs_scale;
    task->status |= task->func(task->content, i, lhs_scale, rhs_scale);
    (void)++task->finished;
  }
}

int ThreadPool::SyncRunFunc(const Func &func, Content content, int start, int end) const {
  for (int i = start; i < end; ++i) {
    int ret = func(content, i, 0, 1);
    if (ret != 0) {
      return ret;
    }
  }
  return THREAD_OK;
}

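// Assignment policy: walk the worker array from the back (the kernel threads) down to
// `offset`, claiming idle workers via available(); actor threads are skipped unless
// occupied_actor_thread_ is set. A calling worker takes the final split itself, and
// any splits that found no worker are run synchronously by SyncRunTask.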
void ThreadPool::DistributeTask(std::vector<TaskSplit> *task_list, Task *task, int task_num, Worker *curr) const {
  int sum_frequency = 0;
  std::vector<Worker *> assigned;
  assigned.reserve(task_num);
  int num = static_cast<int>(workers_.size()) - 1;
  int offset = 0;
  bool use_curr = (curr != nullptr);
  // if curr isn't nullptr, the calling thread is an ActorThread:
  // assign (task_num - 1) tasks to the workers and run the last one on the caller
  int num_assigned = use_curr ? task_num - 1 : task_num;
  int count = 0;

  if (!occupied_actor_thread_) {
    offset = static_cast<int>(actor_thread_num_);
  }

  for (int i = num; i >= offset && count < num_assigned; --i) {
    if (workers_[i]->available()) {
      assigned.push_back(workers_[i]);
      sum_frequency += workers_[i]->frequency();
      (void)++count;
    }
  }

  if (use_curr) {
    assigned.push_back(curr);
    sum_frequency += curr->frequency();
  } else if (assigned.size() != static_cast<size_t>(task_num)) {
    CalculateScales(assigned, sum_frequency);
    ActiveWorkers(assigned, task_list, assigned.size(), curr);
    SyncRunTask(task, assigned.size(), task_num);
    return;
  }

  CalculateScales(assigned, sum_frequency);
  ActiveWorkers(assigned, task_list, task_num, curr);
}

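// Each assigned worker receives a [lhs_scale, rhs_scale) share of the workload in
// proportion to its core frequency; for example, two workers at 2 GHz and 1 GHz get
// the windows [0, 2/3) and [2/3, 1].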
void ThreadPool::CalculateScales(const std::vector<Worker *> &assigned, int sum_frequency) const {
  // divide tasks according to computing power (core frequency)
  float lhs_scale = 0;
  float rhs_scale = 0;
  if (sum_frequency == 0) {
    return;
  }
  for (const auto &worker : assigned) {
    THREAD_RETURN_IF_NULL(worker);
    rhs_scale += worker->frequency() * 1.0 / sum_frequency;
    rhs_scale = rhs_scale < 1 ? rhs_scale : 1;
    worker->set_scale(lhs_scale, rhs_scale);
    lhs_scale = rhs_scale;
  }
}

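// Splits are dealt out as evenly as possible: every worker gets task_num / worker_num
// of them and the first task_num % worker_num workers get one extra, e.g. 10 splits
// across 4 workers -> 3, 3, 2, 2.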
void ThreadPool::ActiveWorkers(const std::vector<Worker *> &workers, std::vector<TaskSplit> *task_list, int task_num,
                               const Worker *curr) const {
  // recalculate the task num for each worker.
  int worker_num = static_cast<int>(workers.size());
  if (worker_num > 0) {
    int each_worker_task_num = task_num / worker_num;
    int rest_task_num = task_num % worker_num;
    int start = 0;
    int end;
    for (int i = 0; i < worker_num; ++i) {
      Worker *worker = workers[i];
      THREAD_RETURN_IF_NULL(worker);
      if (i < rest_task_num) {
        end = start + each_worker_task_num + 1;
      } else {
        end = start + each_worker_task_num;
      }
      worker->Active(task_list, start, end);
      if (worker == curr) {
        (void)worker->RunLocalKernelTask();
      }
      start = end;
    }
  }
}

void ThreadPool::ActiveWorkers() {
  for (auto &worker : workers_) {
    worker->Active();
  }
}

Worker *ThreadPool::CurrentWorker(size_t *index) const {
  for (*index = 0; *index < workers_.size(); (*index)++) {
    if (workers_[*index]->thread_id() == std::this_thread::get_id()) {
      return workers_[*index];
    }
  }
  return nullptr;
}

Worker *ThreadPool::CurrentWorker() const {
  for (const auto &worker : workers_) {
    if (worker->thread_id() == std::this_thread::get_id()) {
      return worker;
    }
  }
  return nullptr;
}

int ThreadPool::InitAffinityInfo() {
#ifdef BIND_CORE
  affinity_ = new (std::nothrow) CoreAffinity();
  THREAD_ERROR_IF_NULL(affinity_);
  int ret = affinity_->InitHardwareCoreInfo();
  if (ret != THREAD_OK) {
    delete affinity_;
    affinity_ = nullptr;
    return THREAD_ERROR;
  }
#endif

  server_cpu_frequence = CoreAffinity::GetServerFrequency() / 1000.0f;  // 1 GHz = 1000 MHz

  return THREAD_OK;
}

int ThreadPool::SetCpuAffinity(BindMode bind_mode) {
  if (workers_.empty()) {
    return THREAD_ERROR;
  }
  if (affinity_ != nullptr) {
    return affinity_->BindThreads(workers_, bind_mode);
  }
  return THREAD_OK;
}

int ThreadPool::SetCpuAffinity(const std::vector<int> &core_list) {
  if (workers_.empty()) {
    return THREAD_ERROR;
  }
  if (affinity_ != nullptr) {
    return affinity_->BindThreads(workers_, core_list);
  }
  return THREAD_OK;
}

int ThreadPool::SetProcessAffinity(BindMode bind_mode) const {
  if (affinity_ != nullptr) {
    return affinity_->BindProcess(bind_mode);
  }
  return THREAD_OK;
}

void ThreadPool::SetKernelThreadMaxSpinCount(int spin_count) {
  // iterate ascending over the kernel threads; the original descending size_t loop
  // underflows when actor_thread_num_ is 0 or workers_ is empty
  for (size_t i = actor_thread_num_; i < workers_.size(); i++) {
    THREAD_RETURN_IF_NULL(workers_[i]);
    workers_[i]->SetMaxSpinCount(spin_count);
  }
}

void ThreadPool::SetSpinCountMaxValue() {
  for (auto worker : workers_) {
    THREAD_RETURN_IF_NULL(worker);
    worker->SetMaxSpinCount(max_spin_count_);
  }
  return;
}

void ThreadPool::SetSpinCountMinValue() {
  for (auto worker : workers_) {
    THREAD_RETURN_IF_NULL(worker);
    worker->SetMaxSpinCount(min_spin_count_);
  }
  return;
}

void ThreadPool::SetMaxSpinCount(int spin_count) {
  if (spin_count <= 0) {
    return;
  }
  max_spin_count_ = spin_count;
}

void ThreadPool::SetMinSpinCount(int spin_count) {
  if (spin_count <= 0) {
    return;
  }
  min_spin_count_ = spin_count;
}

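// Factory, serialized by a global mutex: initialize the task queues, spawn the worker
// threads, then collect affinity information; every failure path frees the
// half-constructed pool.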
ThreadPool *ThreadPool::CreateThreadPool(size_t thread_num, const std::vector<int> &core_list) {
  std::lock_guard<std::mutex> lock(create_thread_pool_muntex_);
  ThreadPool *pool = new (std::nothrow) ThreadPool();
  if (pool == nullptr) {
    return nullptr;
  }
  if (pool->TaskQueuesInit(thread_num) != THREAD_OK) {
    delete pool;
    return nullptr;
  }
  int ret = pool->CreateThreads<Worker>(thread_num, core_list);
  if (ret != THREAD_OK) {
    delete pool;
    return nullptr;
  }
  ret = pool->InitAffinityInfo();
  if (ret != THREAD_OK) {
    delete pool;
    return nullptr;
  }
  return pool;
}

void ThreadPool::SetWorkerIdMap() {
  for (size_t i = 0; i < workers_.size(); ++i) {
    auto thread_id = workers_[i]->thread_id();
    worker_ids_[thread_id] = i;
  }
  return;
}

void ThreadPool::ChildAfterFork() {
  THREAD_INFO("ThreadPool reinitialize workers after fork");
  for (auto &worker : workers_) {
    worker->ChildAfterFork();
  }
}
}  // namespace mindspore