• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #ifndef _MSC_VER
17 #include <sched.h>
18 #include <unistd.h>
19 #endif
20 #include "thread/actor_threadpool.h"
21 #include "thread/core_affinity.h"
22 
23 namespace mindspore {
// Capacity handed to actor_queue_.Init() when the pool is built with USE_HQUEUE.
constexpr size_t MAX_READY_ACTOR_NR{4096};
CreateThread(ActorThreadPool * pool)25 void ActorWorker::CreateThread(ActorThreadPool *pool) {
26   THREAD_RETURN_IF_NULL(pool);
27   pool_ = pool;
28   thread_ = std::thread(&ActorWorker::RunWithSpin, this);
29 }
30 
RunWithSpin()31 void ActorWorker::RunWithSpin() {
32   SetAffinity();
33 #if !defined(__APPLE__) && !defined(SUPPORT_MSVC)
34   static std::atomic_int index = {0};
35   (void)pthread_setname_np(pthread_self(), ("ActorThread_" + std::to_string(index++)).c_str());
36 #endif
37   while (alive_) {
38     // only run either local KernelTask or PoolQueue ActorTask
39     if (RunLocalKernelTask() || RunQueueActorTask()) {
40       spin_count_ = 0;
41     } else {
42       YieldAndDeactive();
43     }
44     if (spin_count_ > max_spin_count_) {
45       WaitUntilActive();
46       spin_count_ = 0;
47     }
48   }
49 }
50 
RunQueueActorTask()51 bool ActorWorker::RunQueueActorTask() {
52   THREAD_ERROR_IF_NULL(pool_);
53   auto actor = pool_->PopActorFromQueue();
54   if (actor == nullptr) {
55     return false;
56   }
57   actor->Run();
58   return true;
59 }
60 
ActorActive()61 bool ActorWorker::ActorActive() {
62   if (status_ != kThreadIdle) {
63     return false;
64   }
65   {
66     std::lock_guard<std::mutex> _l(mutex_);
67     active_num_++;
68     status_ = kThreadBusy;
69   }
70   cond_var_.notify_one();
71   return true;
72 }
73 
~ActorThreadPool()74 ActorThreadPool::~ActorThreadPool() {
75   // wait until actor queue is empty
76   bool terminate = false;
77   int count = 0;
78   do {
79     {
80 #ifdef USE_HQUEUE
81       terminate = actor_queue_.Empty();
82 #else
83       std::lock_guard<std::mutex> _l(actor_mutex_);
84       terminate = actor_queue_.empty();
85 #endif
86     }
87     if (!terminate) {
88       for (auto &worker : workers_) {
89         worker->Active();
90       }
91       std::this_thread::yield();
92     }
93   } while (!terminate && count++ < kMaxCount);
94   for (auto &worker : workers_) {
95     delete worker;
96     worker = nullptr;
97   }
98   workers_.clear();
99 #ifdef USE_HQUEUE
100   actor_queue_.Clean();
101 #endif
102 }
103 
PopActorFromQueue()104 ActorBase *ActorThreadPool::PopActorFromQueue() {
105 #ifdef USE_HQUEUE
106   return actor_queue_.Dequeue();
107 #else
108   std::lock_guard<std::mutex> _l(actor_mutex_);
109   if (actor_queue_.empty()) {
110     return nullptr;
111   }
112   auto actor = actor_queue_.front();
113   actor_queue_.pop();
114   return actor;
115 #endif
116 }
117 
PushActorToQueue(ActorBase * actor)118 void ActorThreadPool::PushActorToQueue(ActorBase *actor) {
119   if (!actor) {
120     return;
121   }
122   {
123 #ifdef USE_HQUEUE
124     while (!actor_queue_.Enqueue(actor)) {
125     }
126 #else
127     std::lock_guard<std::mutex> _l(actor_mutex_);
128     actor_queue_.push(actor);
129 #endif
130   }
131   THREAD_DEBUG("actor[%s] enqueue success", actor->GetAID().Name().c_str());
132   // active one idle actor thread if exist
133   for (size_t i = 0; i < actor_thread_num_; ++i) {
134     auto worker = reinterpret_cast<ActorWorker *>(workers_[i]);
135     if (worker->ActorActive()) {
136       break;
137     }
138   }
139 }
140 
CreateThreads(size_t actor_thread_num,size_t all_thread_num,const std::vector<int> & core_list)141 int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_num, const std::vector<int> &core_list) {
142 #ifdef USE_HQUEUE
143   if (actor_queue_.Init(MAX_READY_ACTOR_NR) != true) {
144     THREAD_ERROR("init actor queue failed.");
145     return THREAD_ERROR;
146   }
147 #endif
148 #ifdef BIND_CORE
149   affinity_->SetCoreId(core_list);
150 #endif
151   size_t core_num = std::thread::hardware_concurrency();
152   THREAD_INFO("ThreadInfo, Actor: [%zu], All: [%zu], CoreNum: [%zu]", actor_thread_num, all_thread_num, core_num);
153   actor_thread_num_ = actor_thread_num < core_num ? actor_thread_num : core_num;
154   if (actor_thread_num > all_thread_num) {
155     THREAD_ERROR("thread num is invalid");
156     return THREAD_ERROR;
157   }
158   for (size_t i = 0; i < actor_thread_num_; ++i) {
159     std::lock_guard<std::mutex> _l(pool_mutex_);
160     auto worker = new (std::nothrow) ActorWorker();
161     THREAD_ERROR_IF_NULL(worker);
162 #ifdef BIND_CORE
163     cpu_set_t mask;
164     CPU_ZERO(&mask);
165     if (core_list.size() > 0) {
166       CPU_SET(core_list[workers_.size() % core_list.size()], &mask);
167     }
168     worker->set_mask(mask);
169 #endif
170     worker->CreateThread(this);
171     workers_.push_back(worker);
172     THREAD_INFO("create actor thread[%zu]", i);
173   }
174   size_t kernel_thread_num = all_thread_num - actor_thread_num_;
175   if (kernel_thread_num > 0) {
176     return ThreadPool::CreateThreads(kernel_thread_num, core_list);
177   }
178   return THREAD_OK;
179 }
180 
CreateThreadPool(size_t actor_thread_num,size_t all_thread_num,BindMode bind_mode)181 ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num, BindMode bind_mode) {
182   ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
183   if (pool == nullptr) {
184     return nullptr;
185   }
186   int ret;
187   std::vector<int> core_list;
188 #ifdef BIND_CORE
189   ret = pool->InitAffinityInfo();
190   if (ret != THREAD_OK) {
191     delete pool;
192     return nullptr;
193   }
194   core_list = pool->affinity_->GetCoreId(all_thread_num, bind_mode);
195 #endif  // BIND_CORE
196   ret = pool->CreateThreads(actor_thread_num, all_thread_num, core_list);
197   if (ret != THREAD_OK) {
198     delete pool;
199     return nullptr;
200   }
201 
202   return pool;
203 }
204 
CreateThreadPool(size_t actor_thread_num,size_t all_thread_num,const std::vector<int> & core_list)205 ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num,
206                                                    const std::vector<int> &core_list) {
207   ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
208   if (pool == nullptr) {
209     return nullptr;
210   }
211   int ret;
212 #ifdef BIND_CORE
213   ret = pool->InitAffinityInfo();
214   if (ret != THREAD_OK) {
215     delete pool;
216     return nullptr;
217   }
218 #endif  // BIND_CORE
219   ret = pool->CreateThreads(actor_thread_num, all_thread_num, core_list);
220   if (ret != THREAD_OK) {
221     delete pool;
222     return nullptr;
223   }
224 
225   return pool;
226 }
227 
CreateThreadPool(size_t thread_num)228 ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t thread_num) {
229   ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
230   if (pool == nullptr) {
231     return nullptr;
232   }
233   int ret = pool->CreateThreads(thread_num, thread_num, {});
234   if (ret != THREAD_OK) {
235     delete pool;
236     return nullptr;
237   }
238   return pool;
239 }
240 }  // namespace mindspore
241