1 /**
2 * Copyright 2021 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #ifndef _MSC_VER
17 #include <sched.h>
18 #include <unistd.h>
19 #endif
20 #include "thread/actor_threadpool.h"
21 #include "thread/core_affinity.h"
22
23 namespace mindspore {
// Value handed to actor_queue_.Init() in ActorThreadPool::CreateThreads when
// USE_HQUEUE is defined (presumably the maximum number of ready actors the
// queue can hold -- confirm against the queue implementation).
constexpr size_t MAX_READY_ACTOR_NR = 4096;
CreateThread(ActorThreadPool * pool)25 void ActorWorker::CreateThread(ActorThreadPool *pool) {
26 THREAD_RETURN_IF_NULL(pool);
27 pool_ = pool;
28 thread_ = std::thread(&ActorWorker::RunWithSpin, this);
29 }
30
RunWithSpin()31 void ActorWorker::RunWithSpin() {
32 SetAffinity();
33 #if !defined(__APPLE__) && !defined(SUPPORT_MSVC)
34 static std::atomic_int index = {0};
35 (void)pthread_setname_np(pthread_self(), ("ActorThread_" + std::to_string(index++)).c_str());
36 #endif
37 while (alive_) {
38 // only run either local KernelTask or PoolQueue ActorTask
39 if (RunLocalKernelTask() || RunQueueActorTask()) {
40 spin_count_ = 0;
41 } else {
42 YieldAndDeactive();
43 }
44 if (spin_count_ > max_spin_count_) {
45 WaitUntilActive();
46 spin_count_ = 0;
47 }
48 }
49 }
50
RunQueueActorTask()51 bool ActorWorker::RunQueueActorTask() {
52 THREAD_ERROR_IF_NULL(pool_);
53 auto actor = pool_->PopActorFromQueue();
54 if (actor == nullptr) {
55 return false;
56 }
57 actor->Run();
58 return true;
59 }
60
ActorActive()61 bool ActorWorker::ActorActive() {
62 if (status_ != kThreadIdle) {
63 return false;
64 }
65 {
66 std::lock_guard<std::mutex> _l(mutex_);
67 active_num_++;
68 status_ = kThreadBusy;
69 }
70 cond_var_.notify_one();
71 return true;
72 }
73
~ActorThreadPool()74 ActorThreadPool::~ActorThreadPool() {
75 // wait until actor queue is empty
76 bool terminate = false;
77 int count = 0;
78 do {
79 {
80 #ifdef USE_HQUEUE
81 terminate = actor_queue_.Empty();
82 #else
83 std::lock_guard<std::mutex> _l(actor_mutex_);
84 terminate = actor_queue_.empty();
85 #endif
86 }
87 if (!terminate) {
88 for (auto &worker : workers_) {
89 worker->Active();
90 }
91 std::this_thread::yield();
92 }
93 } while (!terminate && count++ < kMaxCount);
94 for (auto &worker : workers_) {
95 delete worker;
96 worker = nullptr;
97 }
98 workers_.clear();
99 #ifdef USE_HQUEUE
100 actor_queue_.Clean();
101 #endif
102 }
103
PopActorFromQueue()104 ActorBase *ActorThreadPool::PopActorFromQueue() {
105 #ifdef USE_HQUEUE
106 return actor_queue_.Dequeue();
107 #else
108 std::lock_guard<std::mutex> _l(actor_mutex_);
109 if (actor_queue_.empty()) {
110 return nullptr;
111 }
112 auto actor = actor_queue_.front();
113 actor_queue_.pop();
114 return actor;
115 #endif
116 }
117
PushActorToQueue(ActorBase * actor)118 void ActorThreadPool::PushActorToQueue(ActorBase *actor) {
119 if (!actor) {
120 return;
121 }
122 {
123 #ifdef USE_HQUEUE
124 while (!actor_queue_.Enqueue(actor)) {
125 }
126 #else
127 std::lock_guard<std::mutex> _l(actor_mutex_);
128 actor_queue_.push(actor);
129 #endif
130 }
131 THREAD_DEBUG("actor[%s] enqueue success", actor->GetAID().Name().c_str());
132 // active one idle actor thread if exist
133 for (size_t i = 0; i < actor_thread_num_; ++i) {
134 auto worker = reinterpret_cast<ActorWorker *>(workers_[i]);
135 if (worker->ActorActive()) {
136 break;
137 }
138 }
139 }
140
CreateThreads(size_t actor_thread_num,size_t all_thread_num,const std::vector<int> & core_list)141 int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_num, const std::vector<int> &core_list) {
142 #ifdef USE_HQUEUE
143 if (actor_queue_.Init(MAX_READY_ACTOR_NR) != true) {
144 THREAD_ERROR("init actor queue failed.");
145 return THREAD_ERROR;
146 }
147 #endif
148 #ifdef BIND_CORE
149 affinity_->SetCoreId(core_list);
150 #endif
151 size_t core_num = std::thread::hardware_concurrency();
152 THREAD_INFO("ThreadInfo, Actor: [%zu], All: [%zu], CoreNum: [%zu]", actor_thread_num, all_thread_num, core_num);
153 actor_thread_num_ = actor_thread_num < core_num ? actor_thread_num : core_num;
154 if (actor_thread_num > all_thread_num) {
155 THREAD_ERROR("thread num is invalid");
156 return THREAD_ERROR;
157 }
158 for (size_t i = 0; i < actor_thread_num_; ++i) {
159 std::lock_guard<std::mutex> _l(pool_mutex_);
160 auto worker = new (std::nothrow) ActorWorker();
161 THREAD_ERROR_IF_NULL(worker);
162 #ifdef BIND_CORE
163 cpu_set_t mask;
164 CPU_ZERO(&mask);
165 if (core_list.size() > 0) {
166 CPU_SET(core_list[workers_.size() % core_list.size()], &mask);
167 }
168 worker->set_mask(mask);
169 #endif
170 worker->CreateThread(this);
171 workers_.push_back(worker);
172 THREAD_INFO("create actor thread[%zu]", i);
173 }
174 size_t kernel_thread_num = all_thread_num - actor_thread_num_;
175 if (kernel_thread_num > 0) {
176 return ThreadPool::CreateThreads(kernel_thread_num, core_list);
177 }
178 return THREAD_OK;
179 }
180
CreateThreadPool(size_t actor_thread_num,size_t all_thread_num,BindMode bind_mode)181 ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num, BindMode bind_mode) {
182 ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
183 if (pool == nullptr) {
184 return nullptr;
185 }
186 int ret;
187 std::vector<int> core_list;
188 #ifdef BIND_CORE
189 ret = pool->InitAffinityInfo();
190 if (ret != THREAD_OK) {
191 delete pool;
192 return nullptr;
193 }
194 core_list = pool->affinity_->GetCoreId(all_thread_num, bind_mode);
195 #endif // BIND_CORE
196 ret = pool->CreateThreads(actor_thread_num, all_thread_num, core_list);
197 if (ret != THREAD_OK) {
198 delete pool;
199 return nullptr;
200 }
201
202 return pool;
203 }
204
CreateThreadPool(size_t actor_thread_num,size_t all_thread_num,const std::vector<int> & core_list)205 ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num,
206 const std::vector<int> &core_list) {
207 ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
208 if (pool == nullptr) {
209 return nullptr;
210 }
211 int ret;
212 #ifdef BIND_CORE
213 ret = pool->InitAffinityInfo();
214 if (ret != THREAD_OK) {
215 delete pool;
216 return nullptr;
217 }
218 #endif // BIND_CORE
219 ret = pool->CreateThreads(actor_thread_num, all_thread_num, core_list);
220 if (ret != THREAD_OK) {
221 delete pool;
222 return nullptr;
223 }
224
225 return pool;
226 }
227
CreateThreadPool(size_t thread_num)228 ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t thread_num) {
229 ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
230 if (pool == nullptr) {
231 return nullptr;
232 }
233 int ret = pool->CreateThreads(thread_num, thread_num, {});
234 if (ret != THREAD_OK) {
235 delete pool;
236 return nullptr;
237 }
238 return pool;
239 }
240 } // namespace mindspore
241