/**
 * Copyright 2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef _MSC_VER
#include <sched.h>
#include <unistd.h>
#endif
#include "thread/threadpool.h"
#include "thread/core_affinity.h"

namespace mindspore {
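// Shut the worker down: clear alive_ under the lock so a sleeping thread
// cannot miss the notification, then join the underlying std::thread.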
Worker::~Worker() {
  {
    std::lock_guard<std::mutex> _l(mutex_);
    alive_ = false;
  }
  cond_var_.notify_one();
  if (thread_.joinable()) {
    thread_.join();
  }
}

void Worker::CreateThread() { thread_ = std::thread(&Worker::Run, this); }

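// Pin this worker to the cores in mask_. Android lacks pthread_setaffinity_np,
// so sched_setaffinity(gettid(), ...) is used there; other Linux targets use
// pthread_setaffinity_np. This is a no-op unless BIND_CORE is defined.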
void Worker::SetAffinity() {
#ifdef BIND_CORE
#ifdef __ANDROID__
  int ret = sched_setaffinity(gettid(), sizeof(cpu_set_t), &mask_);
  if (ret != THREAD_OK) {
    THREAD_ERROR("bind thread %d to cpu failed. ERROR %d", gettid(), errno);
  }
  return;
#else
#if !defined(__APPLE__) && !defined(SUPPORT_MSVC)
  int ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &mask_);
  if (ret != THREAD_OK) {
    THREAD_ERROR("bind thread %lu to cpu failed. ERROR %d", pthread_self(), errno);
  }
  return;
#endif
#endif
#endif
}

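// Worker main loop: execute a pending task if there is one, otherwise yield.
// Once spin_count_ exceeds max_spin_count_ the worker stops spinning and
// blocks on the condition variable so an idle thread does not burn CPU.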
void Worker::Run() {
  SetAffinity();
#if !defined(__APPLE__) && !defined(SUPPORT_MSVC)
  static std::atomic_int index = {0};
  (void)pthread_setname_np(pthread_self(), ("KernelThread_" + std::to_string(index++)).c_str());
#endif
  while (alive_) {
    if (RunLocalKernelTask()) {
      spin_count_ = 0;
    } else {
      YieldAndDeactive();
    }
    if (spin_count_ > max_spin_count_) {
      WaitUntilActive();
      spin_count_ = 0;
    }
  }
}

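// Execute the task currently assigned to this worker, if any. The pending-task
// slot is cleared before finished is incremented, so by the time the
// dispatcher observes finished == task_num the worker has released the Task.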
bool Worker::RunLocalKernelTask() {
  Task *task = task_.load(std::memory_order_consume);
  if (task == nullptr) {
    return false;
  }
  int task_id = task_id_.load(std::memory_order_consume);
  task->status |= task->func(task->content, task_id, lhs_scale_, rhs_scale_);
  task_.store(nullptr, std::memory_order_relaxed);
  (void)++task->finished;
  return true;
}

void Worker::YieldAndDeactive() {
  // deactivate this worker only on the first entry
  if (spin_count_ == 0) {
    status_.store(kThreadIdle);
  }
  spin_count_++;
  std::this_thread::yield();
}

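// Block until this worker is handed a task (kThreadBusy), granted an
// activation token (active_num_ > 0), or asked to exit (!alive_); one
// activation token is consumed on wake-up.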
void Worker::WaitUntilActive() {
  std::unique_lock<std::mutex> _l(mutex_);
  cond_var_.wait(_l, [&] { return status_ == kThreadBusy || active_num_ > 0 || !alive_; });
  active_num_--;
}

void Worker::set_scale(float lhs_scale, float rhs_scale) {
  lhs_scale_ = lhs_scale;
  rhs_scale_ = rhs_scale;
}

void Worker::Active(Task *task, int task_id) {
  {
    std::lock_guard<std::mutex> _l(mutex_);
    task_id_.store(task_id, std::memory_order_relaxed);
    task_.store(task, std::memory_order_relaxed);
    status_ = kThreadBusy;
  }
  cond_var_.notify_one();
}

void Worker::Active() {
  {
    std::lock_guard<std::mutex> _l(mutex_);
    active_num_++;
    status_ = kThreadBusy;
  }
  cond_var_.notify_one();
}

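// Atomically claim an idle worker: only one dispatcher can move the status
// from kThreadIdle to kThreadHeld, so a worker is never assigned twice.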
bool Worker::available() {
  int expected = kThreadIdle;
  return status_.compare_exchange_strong(expected, kThreadHeld);
}

ThreadPool::~ThreadPool() {
  for (auto &worker : workers_) {
    delete worker;
    worker = nullptr;
  }
  workers_.clear();
  delete affinity_;
  affinity_ = nullptr;
  THREAD_INFO("destruct success");
}

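// Spawn up to hardware_concurrency() workers. When core_list is non-empty,
// cores are assigned round-robin so the affinity masks cycle through the
// requested cores.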
int ThreadPool::CreateThreads(size_t thread_num, const std::vector<int> &core_list) {
  size_t core_num = std::thread::hardware_concurrency();
  thread_num = thread_num < core_num ? thread_num : core_num;
  THREAD_INFO("ThreadInfo, Num: [%zu], CoreNum: [%zu]", thread_num, core_num);
  if (thread_num == 0) {
    THREAD_INFO("Use the current thread as the working thread.");
    return THREAD_OK;
  }
  std::lock_guard<std::mutex> _l(pool_mutex_);
  for (size_t i = 0; i < thread_num; ++i) {
    auto worker = new (std::nothrow) Worker();
    THREAD_ERROR_IF_NULL(worker);
#ifdef BIND_CORE
    cpu_set_t mask;
    CPU_ZERO(&mask);
    if (!core_list.empty()) {
      CPU_SET(core_list[workers_.size() % core_list.size()], &mask);
    }
    worker->set_mask(mask);
#endif
    worker->CreateThread();
    workers_.push_back(worker);
    THREAD_INFO("create kernel thread[%zu]", i);
  }
  return THREAD_OK;
}

int ThreadPool::ParallelLaunch(const Func &func, Content content, int task_num) const {
  // with a single thread or a single task, run everything on the calling (master) thread
  if (thread_num() <= 1 || task_num <= 1) {
    for (int i = 0; i < task_num; ++i) {
      int ret = func(content, i, 0, 1);
      if (ret != 0) {
        return ret;
      }
    }
    return THREAD_OK;
  }

  // distribute tasks to the KernelThreads, and to idle ActorThreads
  // when task_num exceeds the number of KernelThreads
  THREAD_DEBUG("launch: %d", task_num);
  Task task = {func, content};

  DistributeTask(&task, task_num);
  // synchronization: wait until the number of finished tasks equals task_num
  while (task.finished != task_num) {
    std::this_thread::yield();
  }
  // check the aggregated return status of all task partitions
  if (task.status != THREAD_OK) {
    return THREAD_ERROR;
  }
  return THREAD_OK;
}
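
// Illustrative usage sketch (not part of this file's implementation): one way
// a caller could drive ParallelLaunch. It assumes Content is a void pointer,
// THREAD_OK == 0, and that [lhs_scale, rhs_scale) marks the fraction of the
// data a single invocation should process (with kMaxScale == 1.0f). The
// SumContext type and SumSlice function are hypothetical names.
//
//   struct SumContext {
//     const float *data;
//     int len;
//     float partial[4];  // one accumulator per task to avoid data races
//   };
//
//   int SumSlice(void *content, int task_id, float lhs_scale, float rhs_scale) {
//     auto *ctx = static_cast<SumContext *>(content);
//     int begin = static_cast<int>(lhs_scale * ctx->len);
//     int end = static_cast<int>(rhs_scale * ctx->len);
//     for (int i = begin; i < end; ++i) {
//       ctx->partial[task_id] += ctx->data[i];
//     }
//     return 0;  // THREAD_OK
//   }
//
//   // SumContext ctx = {buffer, buffer_len, {}};
//   // int ret = pool->ParallelLaunch(SumSlice, &ctx, 4);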

void ThreadPool::SyncRunTask(Task *task, int start_num, int task_num) const {
  // run the remaining partitions [start_num, task_num) sequentially;
  // used when the calling thread is not a pool thread
  float per_scale = kMaxScale / (task_num - start_num);
  for (int i = start_num; i < task_num; ++i) {
    float lhs_scale = i * per_scale;
    float rhs_scale = (i + 1) * per_scale;
    rhs_scale = i == task_num - 1 ? kMaxScale : rhs_scale;
    task->status |= task->func(task->content, i, lhs_scale, rhs_scale);
    (void)++task->finished;
  }
}

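// Hand out up to task_num partitions. Workers are scanned from the back of
// workers_ so dedicated kernel threads are preferred; the leading
// actor_thread_num_ actor threads are skipped unless occupied_actor_thread_
// is set.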
void ThreadPool::DistributeTask(Task *task, int task_num) const {
  Worker *curr = CurrentWorker();
  // if curr is not nullptr, the calling thread is one of the pool's ActorThreads:
  // assign (task_num - 1) partitions to workers and run the last one on the caller
  int count = 0;
  int num_assigned = curr != nullptr ? task_num - 1 : task_num;
  int sum_frequency = 0;
  std::vector<Worker *> assigned;
  int num = static_cast<int>(workers_.size()) - 1;
  int offset = 0;
  if (!occupied_actor_thread_) {
    offset = static_cast<int>(actor_thread_num_);
  }
  for (int i = num; i >= offset && count < num_assigned; --i) {
    if (workers_[i]->available()) {
      assigned.push_back(workers_[i]);
      sum_frequency += workers_[i]->frequency();
      count++;
    }
  }
  // when there are not enough free workers,
  // run the remaining partitions on the master thread
  if (curr != nullptr) {
    for (; count < task_num; ++count) {
      assigned.push_back(curr);
      sum_frequency += curr->frequency();
    }
  } else if (assigned.size() != static_cast<size_t>(task_num)) {
    CalculateScales(assigned, sum_frequency);
    ActiveWorkers(assigned, task, assigned.size(), curr);
    SyncRunTask(task, assigned.size(), task_num);
    return;
  }
  CalculateScales(assigned, sum_frequency);
  ActiveWorkers(assigned, task, task_num, curr);
}

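// Each assigned worker receives a contiguous [lhs_scale, rhs_scale) slice of
// the [0, 1] range, sized proportionally to its core frequency, so faster
// cores get proportionally more work.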
void ThreadPool::CalculateScales(const std::vector<Worker *> &assigned, int sum_frequency) const {
  // divide the task according to computing power (core frequency)
  float lhs_scale = 0;
  float rhs_scale = 0;
  if (sum_frequency == 0) {
    return;
  }
  for (const auto &worker : assigned) {
    THREAD_RETURN_IF_NULL(worker);
    rhs_scale += worker->frequency() * 1.0 / sum_frequency;
    rhs_scale = rhs_scale < 1 ? rhs_scale : 1;
    worker->set_scale(lhs_scale, rhs_scale);
    lhs_scale = rhs_scale;
  }
}

void ThreadPool::ActiveWorkers(const std::vector<Worker *> &workers, Task *task, int task_num,
                               const Worker *curr) const {
  for (int i = 0; i < task_num; ++i) {
    Worker *worker = workers[i];
    THREAD_RETURN_IF_NULL(worker);
    worker->Active(task, i);
    if (worker == curr) {
      (void)worker->RunLocalKernelTask();
    }
  }
}

void ThreadPool::ActiveWorkers() const {
  for (auto &worker : workers_) {
    worker->Active();
  }
}

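// Return the pool worker owning the calling thread, or nullptr when the
// caller is an external (non-pool) thread.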
Worker *ThreadPool::CurrentWorker() const {
  for (const auto &worker : workers_) {
    if (worker->thread_id() == std::this_thread::get_id()) {
      return worker;
    }
  }
  return nullptr;
}

int ThreadPool::InitAffinityInfo() {
  affinity_ = new (std::nothrow) CoreAffinity();
  THREAD_ERROR_IF_NULL(affinity_);
  int ret = affinity_->InitHardwareCoreInfo();
  if (ret != THREAD_OK) {
    delete affinity_;
    affinity_ = nullptr;
    return THREAD_ERROR;
  }
  return THREAD_OK;
}

int ThreadPool::SetCpuAffinity(BindMode bind_mode) {
  if (workers_.empty()) {
    return THREAD_ERROR;
  }
#ifdef BIND_CORE
  THREAD_ERROR_IF_NULL(affinity_);
  return affinity_->BindThreads(workers_, bind_mode);
#else
  return THREAD_OK;
#endif  // BIND_CORE
}

int ThreadPool::SetCpuAffinity(const std::vector<int> &core_list) {
  if (workers_.empty()) {
    return THREAD_ERROR;
  }
#ifdef BIND_CORE
  THREAD_ERROR_IF_NULL(affinity_);
  return affinity_->BindThreads(workers_, core_list);
#else
  return THREAD_OK;
#endif  // BIND_CORE
}

int ThreadPool::SetProcessAffinity(BindMode bind_mode) const {
#ifdef BIND_CORE
  THREAD_ERROR_IF_NULL(affinity_);
  return affinity_->BindProcess(bind_mode);
#else
  return THREAD_OK;
#endif  // BIND_CORE
}

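// Switch every worker's spin budget between the pool's max and min values;
// a larger budget lowers task wake-up latency at the cost of busy-waiting
// CPU time (see Worker::Run).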
void ThreadPool::SetSpinCountMaxValue() {
  for (auto worker : workers_) {
    THREAD_RETURN_IF_NULL(worker);
    worker->SetMaxSpinCount(max_spin_count_);
  }
}

void ThreadPool::SetSpinCountMinValue() {
  for (auto worker : workers_) {
    THREAD_RETURN_IF_NULL(worker);
    worker->SetMaxSpinCount(min_spin_count_);
  }
}

void ThreadPool::SetMaxSpinCount(int spin_count) {
  if (spin_count <= 0) {
    return;
  }
  max_spin_count_ = spin_count;
}

void ThreadPool::SetMinSpinCount(int spin_count) {
  if (spin_count <= 0) {
    return;
  }
  min_spin_count_ = spin_count;
}

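// Factory: returns a ready-to-use pool, or nullptr when allocation, thread
// creation, or (under BIND_CORE) affinity initialization fails.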
ThreadPool *ThreadPool::CreateThreadPool(size_t thread_num, const std::vector<int> &core_list) {
  ThreadPool *pool = new (std::nothrow) ThreadPool();
  if (pool == nullptr) {
    return nullptr;
  }
  int ret = pool->CreateThreads(thread_num, core_list);
  if (ret != THREAD_OK) {
    delete pool;
    return nullptr;
  }
#ifdef BIND_CORE
  ret = pool->InitAffinityInfo();
  if (ret != THREAD_OK) {
    delete pool;
    return nullptr;
  }
#endif  // BIND_CORE
  return pool;
}
}  // namespace mindspore