1 /**
2 * Copyright 2021-2023 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #ifndef _MSC_VER
17 #include <sched.h>
18 #include <unistd.h>
19 #endif
20 #include <sstream>
21 #include "thread/threadpool.h"
22 #include "thread/core_affinity.h"
23 #if !defined(_WIN32) && !defined(BUILD_LITE)
24 #include "include/fork_utils.h"
25 #endif
26
27 namespace mindspore {
// Locked for the whole duration of ThreadPool::CreateThreadPool, so pools are built one at a time.
std::mutex ThreadPool::create_thread_pool_muntex_;
29
~Worker()30 Worker::~Worker() {
31 {
32 std::lock_guard<std::mutex> _l(mutex_);
33 alive_ = false;
34 }
35 cond_var_->notify_one();
36
37 bool terminate = false;
38 int count = 0;
39 while (local_task_queue_ && !terminate && count++ < kMaxCount) {
40 terminate = local_task_queue_->Empty();
41 if (!terminate) {
42 auto task_split = local_task_queue_->Dequeue();
43 (void)TryRunTask(task_split);
44 }
45 }
46
47 if (thread_->joinable()) {
48 thread_->join();
49 }
50 pool_ = nullptr;
51 local_task_queue_ = nullptr;
52 }
53
CreateThread()54 void Worker::CreateThread() { thread_ = std::make_unique<std::thread>(&Worker::Run, this); }
55
ChildAfterFork()56 void Worker::ChildAfterFork() {
57 THREAD_INFO("worker %zu recreate thread after fork in child process", worker_id_);
58 if (cond_var_ != nullptr) {
59 (void)cond_var_.release();
60 cond_var_ = std::make_unique<std::condition_variable>();
61 }
62 if (thread_ != nullptr) {
63 (void)thread_.release();
64 CreateThread();
65 }
66 }
67
68 #if defined(BIND_CORE) && !defined(__ANDROID__) && !defined(__APPLE__) && !defined(_MSC_VER) && !defined(_WIN32)
// Renders a CPU affinity mask as a string of '0'/'1' characters, one per online CPU, CPU 0 first.
// Returns an empty string when `mask` is null or the number of online CPUs cannot be queried.
std::string MaskToStr(cpu_set_t *mask) {
  std::string bits;
  if (mask == nullptr) {
    return bits;
  }
  // sysconf() returns -1 on failure; converting that to size_t would make the loop effectively unbounded.
  const auto online = sysconf(_SC_NPROCESSORS_ONLN);
  if (online <= 0) {
    return bits;
  }
  const size_t cpu_num = static_cast<size_t>(online);
  bits.reserve(cpu_num);
  for (size_t i = 0; i < cpu_num; ++i) {
    bits.push_back(CPU_ISSET(i, mask) ? '1' : '0');
  }
  return bits;
}
77 #endif
78
SetAffinity()79 void Worker::SetAffinity() {
80 #ifdef _WIN32
81 SetWindowsSelfAffinity(core_id_);
82 #elif defined(BIND_CORE)
83 #if defined(__ANDROID__) || defined(MS_COMPILE_OHOS)
84 THREAD_INFO("thread: %d, mask: %lu", gettid(), mask_.__bits[0]);
85 int ret = sched_setaffinity(gettid(), sizeof(cpu_set_t), &mask_);
86 if (ret != THREAD_OK) {
87 THREAD_ERROR("bind thread %d to cpu failed. ERROR %d", gettid(), errno);
88 }
89 return;
90 #else
91 #if !defined(__APPLE__) && !defined(_MSC_VER)
92
93 THREAD_INFO("Worker pthread_setaffinity_np, mask %s", MaskToStr(&mask_));
94 int ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask_);
95 if (ret != THREAD_OK) {
96 THREAD_ERROR("bind thread %lu to cpu failed. ERROR %d", pthread_self(), errno);
97 }
98 return;
99 #endif
100 #endif
101 #endif
102 }
103
InitWorkerMask(const std::vector<int> & core_list,const size_t workers_size)104 void Worker::InitWorkerMask(const std::vector<int> &core_list, const size_t workers_size) {
105 core_list_ = core_list;
106 #ifdef _WIN32
107 static uint32_t windows_core_index = 0;
108 core_id_ = windows_core_index++;
109 #elif defined(BIND_CORE)
110 if (core_list.empty()) {
111 return;
112 }
113 cpu_set_t mask;
114 CPU_ZERO(&mask);
115 if (core_list.size() > 0) {
116 CPU_SET(core_list[workers_size % core_list.size()], &mask);
117 }
118 this->set_mask(mask);
119 #endif
120 return;
121 }
122
Run()123 void Worker::Run() {
124 if (!core_list_.empty()) {
125 SetAffinity();
126 }
127 #if !defined(__APPLE__) && !defined(_MSC_VER)
128 static std::atomic_int index = {0};
129 (void)pthread_setname_np(pthread_self(), ("OS_Kernel_" + std::to_string(index++)).c_str());
130 #endif
131 #ifdef PLATFORM_86
132 // Some CPU kernels need set the flush zero mode to improve performance.
133 _MM_SET_FLUSH_ZERO_MODE(_MM_FLUSH_ZERO_ON);
134 _MM_SET_DENORMALS_ZERO_MODE(_MM_DENORMALS_ZERO_ON);
135 #endif
136 while (alive_) {
137 if (RunLocalKernelTask()) {
138 spin_count_ = 0;
139 } else {
140 RunOtherKernelTask();
141 YieldAndDeactive();
142 }
143 if (spin_count_ > max_spin_count_) {
144 WaitUntilActive();
145 spin_count_ = 1;
146 }
147 }
148 }
149
TryRunTask(TaskSplit * task_split)150 bool Worker::TryRunTask(TaskSplit *task_split) {
151 if (task_split == nullptr) {
152 return false;
153 }
154 auto task = task_split->task_;
155 auto task_id = task_split->task_id_;
156 task->status |= task->func(task->content, task_id, lhs_scale_, rhs_scale_);
157 (void)++task->finished;
158 return true;
159 }
160
RunLocalKernelTask()161 bool Worker::RunLocalKernelTask() {
162 bool res = false;
163 Task *task = task_.load(std::memory_order_consume);
164 if (task != nullptr) {
165 int task_id = task_id_.load(std::memory_order_consume);
166 task->status |= task->func(task->content, task_id, lhs_scale_, rhs_scale_);
167 task_.store(nullptr, std::memory_order_relaxed);
168 (void)++task->finished;
169 res |= true;
170 }
171
172 while (!local_task_queue_->Empty()) {
173 auto task_split = local_task_queue_->Dequeue();
174 res |= TryRunTask(task_split);
175 }
176 return res;
177 }
178
RunOtherKernelTask()179 void Worker::RunOtherKernelTask() {
180 if (pool_ == nullptr || pool_->actor_thread_num() <= kMinActorRunOther) {
181 return;
182 }
183 auto queues_length = pool_->task_queues().size();
184 for (size_t i = 0; i < queues_length; ++i) {
185 size_t index = (worker_id_ + i + 1) % queues_length;
186 while (!pool_->task_queues()[index]->Empty()) {
187 auto task_split = pool_->task_queues()[index]->Dequeue();
188 if (TryRunTask(task_split)) {
189 return;
190 }
191 }
192 }
193 }
194
YieldAndDeactive()195 void Worker::YieldAndDeactive() {
196 // deactivate this worker only on the first entry
197 if (spin_count_ == 0) {
198 std::lock_guard<std::mutex> _l(mutex_);
199 if (local_task_queue_->Empty()) {
200 status_.store(kThreadIdle);
201 } else {
202 return;
203 }
204 }
205 spin_count_++;
206 std::this_thread::yield();
207 }
208
WaitUntilActive()209 void Worker::WaitUntilActive() {
210 std::unique_lock<std::mutex> _l(mutex_);
211 cond_var_->wait(_l, [&] { return status_ == kThreadBusy || active_num_ > 0 || !alive_; });
212 if (active_num_ > 0) {
213 active_num_--;
214 }
215 // When active_num > 0, status = kThreadIdle, a task may enqueue,
216 // because of spint_count = 0, the status may switch to kThreadIdle without handle this task,
217 // then a new task may enqueue to override the old one, which cause task missed.
218 // So, after wait, the status_ should be kThreadBusy.
219 status_.store(kThreadBusy);
220 }
221
set_scale(float lhs_scale,float rhs_scale)222 void Worker::set_scale(float lhs_scale, float rhs_scale) {
223 lhs_scale_ = lhs_scale;
224 rhs_scale_ = rhs_scale;
225 }
226
Active(std::vector<TaskSplit> * task_list,int task_id_start,int task_id_end)227 void Worker::Active(std::vector<TaskSplit> *task_list, int task_id_start, int task_id_end) {
228 {
229 std::lock_guard<std::mutex> _l(mutex_);
230 // add the first to task_, and others to queue.
231 status_ = kThreadBusy;
232 Task *task = task_.load(std::memory_order_consume);
233 int to_atomic_task = 0;
234 if (task == nullptr) {
235 task_id_.store(task_id_start, std::memory_order_relaxed);
236 THREAD_TEST_TRUE(task_ == nullptr);
237 task_.store((*task_list)[0].task_, std::memory_order_release);
238 to_atomic_task = 1;
239 }
240 for (int i = task_id_start + to_atomic_task; i < task_id_end; ++i) {
241 while (!local_task_queue_->Enqueue(&(*task_list)[i])) {
242 }
243 }
244 status_ = kThreadBusy;
245 }
246 cond_var_->notify_one();
247 }
248
Active()249 void Worker::Active() {
250 if (active_num_ > 0) {
251 return;
252 }
253 {
254 std::lock_guard<std::mutex> _l(mutex_);
255 active_num_++;
256 status_ = kThreadBusy;
257 }
258 cond_var_->notify_one();
259 }
260
available()261 bool Worker::available() {
262 int expected = kThreadIdle;
263 return status_.compare_exchange_strong(expected, kThreadHeld);
264 }
265
~ThreadPool()266 ThreadPool::~ThreadPool() {
267 for (auto &worker : workers_) {
268 delete worker;
269 worker = nullptr;
270 }
271 workers_.clear();
272
273 if (affinity_ != nullptr) {
274 delete affinity_;
275 affinity_ = nullptr;
276 }
277
278 for (auto &task_queue : task_queues_) {
279 task_queue->Clean();
280 }
281 task_queues_.clear();
282 THREAD_INFO("destruct success");
283 }
284
TaskQueuesInit(size_t thread_num)285 int ThreadPool::TaskQueuesInit(size_t thread_num) {
286 for (size_t i = 0; i < thread_num; ++i) {
287 (void)task_queues_.emplace_back(std::make_unique<HQueue<TaskSplit>>());
288 }
289 for (size_t i = 0; i < thread_num; ++i) {
290 if (task_queues_[i]->Init(kMaxHqueueSize) != true) {
291 THREAD_ERROR("init task queue failed.");
292 return THREAD_ERROR;
293 }
294 }
295 THREAD_INFO("init task queues success.");
296 return THREAD_OK;
297 }
298
ParallelLaunch(const Func & func,Content content,int task_num)299 int ThreadPool::ParallelLaunch(const Func &func, Content content, int task_num) {
300 // if single thread, run master thread
301 if (task_num <= 1) {
302 return SyncRunFunc(func, content, 0, task_num);
303 }
304
305 // distribute task to the KernelThread and the idle ActorThread,
306 // if the task num is greater than the KernelThread num
307 THREAD_DEBUG("launch: %d", task_num);
308 Task task = {func, content};
309 std::vector<TaskSplit> task_list;
310 for (int i = 0; i < task_num; ++i) {
311 (void)task_list.emplace_back(TaskSplit{&task, i});
312 }
313 Worker *curr = CurrentWorker();
314 DistributeTask(&task_list, &task, task_num, curr);
315 // synchronization
316 // wait until the finished is equal to task_num
317 while (task.finished != task_num) {
318 if (curr != nullptr) {
319 (void)curr->RunLocalKernelTask();
320 }
321 std::this_thread::yield();
322 }
323 // check the return value of task
324 if (task.status != THREAD_OK) {
325 return THREAD_ERROR;
326 }
327 return THREAD_OK;
328 }
329
SyncRunTask(Task * task,int start_num,int task_num) const330 void ThreadPool::SyncRunTask(Task *task, int start_num, int task_num) const {
331 // run task sequentially
332 // if the current thread is not the actor thread
333 float per_scale = kMaxScale / (task_num - start_num);
334 for (int i = start_num; i < task_num; ++i) {
335 float lhs_scale = i * per_scale;
336 float rhs_scale = (i + 1) * per_scale;
337 rhs_scale = i == task_num - 1 ? kMaxScale : rhs_scale;
338 task->status |= task->func(task->content, i, lhs_scale, rhs_scale);
339 (void)++task->finished;
340 }
341 }
342
SyncRunFunc(const Func & func,Content content,int start,int end) const343 int ThreadPool::SyncRunFunc(const Func &func, Content content, int start, int end) const {
344 for (int i = start; i < end; ++i) {
345 int ret = func(content, i, 0, 1);
346 if (ret != 0) {
347 return ret;
348 }
349 }
350 return THREAD_OK;
351 }
352
DistributeTask(std::vector<TaskSplit> * task_list,Task * task,int task_num,Worker * curr) const353 void ThreadPool::DistributeTask(std::vector<TaskSplit> *task_list, Task *task, int task_num, Worker *curr) const {
354 int sum_frequency = 0;
355 std::vector<Worker *> assigned;
356 assigned.reserve(task_num);
357 int num = static_cast<int>(workers_.size()) - 1;
358 int offset = 0;
359 bool use_curr = (curr != nullptr);
360 // if the current thread isn't nullptr, that is the curr is a ActorThread,
361 // then assign (task_num - 1) tasks to workers, and run the last one by itself
362 int num_assigned = use_curr ? task_num - 1 : task_num;
363 int count = 0;
364
365 if (!occupied_actor_thread_) {
366 offset = static_cast<int>(actor_thread_num_);
367 }
368
369 for (int i = num; i >= offset && count < num_assigned; --i) {
370 if (workers_[i]->available()) {
371 assigned.push_back(workers_[i]);
372 sum_frequency += workers_[i]->frequency();
373 (void)++count;
374 }
375 }
376
377 if (use_curr) {
378 assigned.push_back(curr);
379 sum_frequency += curr->frequency();
380 } else if (assigned.size() != static_cast<size_t>(task_num)) {
381 CalculateScales(assigned, sum_frequency);
382 ActiveWorkers(assigned, task_list, assigned.size(), curr);
383 SyncRunTask(task, assigned.size(), task_num);
384 return;
385 }
386
387 CalculateScales(assigned, sum_frequency);
388 ActiveWorkers(assigned, task_list, task_num, curr);
389 }
390
CalculateScales(const std::vector<Worker * > & assigned,int sum_frequency) const391 void ThreadPool::CalculateScales(const std::vector<Worker *> &assigned, int sum_frequency) const {
392 // divide task according to computing power(core frequency)
393 float lhs_scale = 0;
394 float rhs_scale = 0;
395 if (sum_frequency == 0) {
396 return;
397 }
398 for (const auto &worker : assigned) {
399 THREAD_RETURN_IF_NULL(worker);
400 rhs_scale += worker->frequency() * 1.0 / sum_frequency;
401 rhs_scale = rhs_scale < 1 ? rhs_scale : 1;
402 worker->set_scale(lhs_scale, rhs_scale);
403 lhs_scale = rhs_scale;
404 }
405 }
406
ActiveWorkers(const std::vector<Worker * > & workers,std::vector<TaskSplit> * task_list,int task_num,const Worker * curr) const407 void ThreadPool::ActiveWorkers(const std::vector<Worker *> &workers, std::vector<TaskSplit> *task_list, int task_num,
408 const Worker *curr) const {
409 // recalculate task num for each worker.
410 int worker_num = static_cast<int>(workers.size());
411 if (worker_num > 0) {
412 int each_worker_task_num = task_num / worker_num;
413 int rest_task_num = task_num % worker_num;
414 int start = 0;
415 int end;
416 for (int i = 0; i < worker_num; ++i) {
417 Worker *worker = workers[i];
418 THREAD_RETURN_IF_NULL(worker);
419 if (i < rest_task_num) {
420 end = start + each_worker_task_num + 1;
421 } else {
422 end = start + each_worker_task_num;
423 }
424 worker->Active(task_list, start, end);
425 if (worker == curr) {
426 (void)worker->RunLocalKernelTask();
427 }
428 start = end;
429 }
430 }
431 }
432
ActiveWorkers()433 void ThreadPool::ActiveWorkers() {
434 for (auto &worker : workers_) {
435 worker->Active();
436 }
437 }
438
CurrentWorker(size_t * index) const439 Worker *ThreadPool::CurrentWorker(size_t *index) const {
440 for (*index = 0; *index < workers_.size(); (*index)++) {
441 if (workers_[*index]->thread_id() == std::this_thread::get_id()) {
442 return workers_[*index];
443 }
444 }
445 return nullptr;
446 }
447
CurrentWorker() const448 Worker *ThreadPool::CurrentWorker() const {
449 for (const auto &worker : workers_) {
450 if (worker->thread_id() == std::this_thread::get_id()) {
451 return worker;
452 }
453 }
454 return nullptr;
455 }
456
InitAffinityInfo()457 int ThreadPool::InitAffinityInfo() {
458 #ifdef BIND_CORE
459 affinity_ = new (std::nothrow) CoreAffinity();
460 THREAD_ERROR_IF_NULL(affinity_);
461 int ret = affinity_->InitHardwareCoreInfo();
462 if (ret != THREAD_OK) {
463 delete affinity_;
464 affinity_ = nullptr;
465 return THREAD_ERROR;
466 }
467 #endif
468
469 server_cpu_frequence = CoreAffinity::GetServerFrequency() / 1000.0f; // 1GHz = 1000MHz
470
471 return THREAD_OK;
472 }
473
SetCpuAffinity(BindMode bind_mode)474 int ThreadPool::SetCpuAffinity(BindMode bind_mode) {
475 if (workers_.empty()) {
476 return THREAD_ERROR;
477 }
478 if (affinity_ != nullptr) {
479 return affinity_->BindThreads(workers_, bind_mode);
480 }
481 return THREAD_OK;
482 }
483
SetCpuAffinity(const std::vector<int> & core_list)484 int ThreadPool::SetCpuAffinity(const std::vector<int> &core_list) {
485 if (workers_.empty()) {
486 return THREAD_ERROR;
487 }
488 if (affinity_ != nullptr) {
489 return affinity_->BindThreads(workers_, core_list);
490 }
491 return THREAD_OK;
492 }
493
SetProcessAffinity(BindMode bind_mode) const494 int ThreadPool::SetProcessAffinity(BindMode bind_mode) const {
495 if (affinity_ != nullptr) {
496 return affinity_->BindProcess(bind_mode);
497 }
498 return THREAD_OK;
499 }
500
SetKernelThreadMaxSpinCount(int spin_count)501 void ThreadPool::SetKernelThreadMaxSpinCount(int spin_count) {
502 size_t num = workers_.size() - 1;
503 for (size_t i = num; i >= actor_thread_num_; i--) {
504 THREAD_RETURN_IF_NULL(workers_[i]);
505 workers_[i]->SetMaxSpinCount(spin_count);
506 }
507 }
508
SetSpinCountMaxValue()509 void ThreadPool::SetSpinCountMaxValue() {
510 for (auto worker : workers_) {
511 THREAD_RETURN_IF_NULL(worker);
512 worker->SetMaxSpinCount(max_spin_count_);
513 }
514 return;
515 }
516
SetSpinCountMinValue()517 void ThreadPool::SetSpinCountMinValue() {
518 for (auto worker : workers_) {
519 THREAD_RETURN_IF_NULL(worker);
520 worker->SetMaxSpinCount(min_spin_count_);
521 }
522 return;
523 }
524
SetMaxSpinCount(int spin_count)525 void ThreadPool::SetMaxSpinCount(int spin_count) {
526 if (spin_count <= 0) {
527 return;
528 }
529 max_spin_count_ = spin_count;
530 }
531
SetMinSpinCount(int spin_count)532 void ThreadPool::SetMinSpinCount(int spin_count) {
533 if (spin_count <= 0) {
534 return;
535 }
536 min_spin_count_ = spin_count;
537 }
538
CreateThreadPool(size_t thread_num,const std::vector<int> & core_list)539 ThreadPool *ThreadPool::CreateThreadPool(size_t thread_num, const std::vector<int> &core_list) {
540 std::lock_guard<std::mutex> lock(create_thread_pool_muntex_);
541 ThreadPool *pool = new (std::nothrow) ThreadPool();
542 if (pool == nullptr) {
543 return nullptr;
544 }
545 if (pool->TaskQueuesInit(thread_num) != THREAD_OK) {
546 delete pool;
547 return nullptr;
548 }
549 int ret = pool->CreateThreads<Worker>(thread_num, core_list);
550 if (ret != THREAD_OK) {
551 delete pool;
552 return nullptr;
553 }
554 ret = pool->InitAffinityInfo();
555 if (ret != THREAD_OK) {
556 delete pool;
557 return nullptr;
558 }
559 return pool;
560 }
561
SetWorkerIdMap()562 void ThreadPool::SetWorkerIdMap() {
563 for (size_t i = 0; i < workers_.size(); ++i) {
564 auto thread_id = workers_[i]->thread_id();
565 worker_ids_[thread_id] = i;
566 }
567 return;
568 }
569
ChildAfterFork()570 void ThreadPool::ChildAfterFork() {
571 THREAD_INFO("ThreadPool reinitialize workers after fork");
572 for (auto &worker : workers_) {
573 worker->ChildAfterFork();
574 }
575 }
576 } // namespace mindspore
577