/**
 * Copyright 2021-2023 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <memory>
#ifndef _MSC_VER
#include <sched.h>
#include <unistd.h>
#endif
#include "thread/actor_threadpool.h"
#include "thread/core_affinity.h"

namespace mindspore {
size_t ActorThreadPool::actor_queue_size_ = kMaxHqueueSize;

void ActorWorker::CreateThread() { thread_ = std::make_unique<std::thread>(&ActorWorker::RunWithSpin, this); }

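// Worker loop for an actor thread: each pass first tries a local kernel task (RunLocalKernelTask),
// then an actor popped from the pool queue. A successful run resets the spin counter; a miss calls
// YieldAndDeactive(), and once spin_count_ exceeds max_spin_count_ the worker blocks in
// WaitUntilActive() until it is reactivated (for example via ActorActive()).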
void ActorWorker::RunWithSpin() {
  if (!core_list_.empty()) {
    SetAffinity();
  }
#if !defined(__APPLE__) && !defined(_MSC_VER)
  static std::atomic_int index{0};
  (void)pthread_setname_np(pthread_self(), ("OS_Actor_" + std::to_string(index++)).c_str());
#endif
#ifdef PLATFORM_86
  // Some CPU kernels need flush-to-zero mode enabled to improve performance.
  _MM_SET_FLUSH_ZERO_MODE(_MM_FLUSH_ZERO_ON);
  _MM_SET_DENORMALS_ZERO_MODE(_MM_DENORMALS_ZERO_ON);
#endif
  while (alive_) {
    // Run only one task per pass: a local kernel task if available, otherwise a queued actor task.
    if (RunLocalKernelTask() || RunQueueActorTask()) {
      spin_count_ = 0;
    } else {
      YieldAndDeactive();
    }
    if (spin_count_ > max_spin_count_) {
      WaitUntilActive();
      spin_count_ = 0;
    }
  }
}

bool ActorWorker::RunQueueActorTask() {
  if (pool_ == nullptr) {
    return false;
  }
  auto actor = reinterpret_cast<ActorThreadPool *>(pool_)->PopActorFromQueue();
  if (actor == nullptr) {
    return false;
  }

  actor->Run();
  return true;
}

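// Wakes one spinning-but-idle actor worker (called from PushActorToQueue): flips the worker from
// kThreadIdle to kThreadBusy under its mutex and notifies the condition variable. Returns false if
// this worker was not idle, so the caller can try the next one.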
bool ActorWorker::ActorActive() {
  if (status_ != kThreadIdle) {
    return false;
  }
  {
    std::lock_guard<std::mutex> _l(mutex_);
    active_num_++;
    status_ = kThreadBusy;
  }
  cond_var_->notify_one();
  return true;
}

ActorThreadPool::~ActorThreadPool() {
  // Wait (at most kMaxCount iterations) for the actor queue to drain before destroying the workers.
  bool terminate = false;
  int count = 0;
  do {
    {
#ifdef USE_HQUEUE
      terminate = actor_queue_.Empty();
#else
      std::lock_guard<std::mutex> _l(actor_mutex_);
      terminate = actor_queue_.empty();
#endif
    }
    if (!terminate) {
      for (auto &worker : workers_) {
        worker->Active();
      }
      std::this_thread::yield();
    }
  } while (!terminate && count++ < kMaxCount);
  for (auto &worker : workers_) {
    delete worker;
    worker = nullptr;
  }
  workers_.clear();
#ifdef USE_HQUEUE
  actor_queue_.Clean();
#endif
}

ActorBase *ActorThreadPool::PopActorFromQueue() {
#ifdef USE_HQUEUE
  return actor_queue_.Dequeue();
#else
  std::lock_guard<std::mutex> _l(actor_mutex_);
  if (actor_queue_.empty()) {
    return nullptr;
  }
  auto actor = actor_queue_.front();
  actor_queue_.pop();
  return actor;
#endif
}

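// Producer side of the pool: enqueue the actor (the HQueue when USE_HQUEUE is defined, which needs
// no mutex here, otherwise a mutex-guarded std::queue), then wake at most one idle actor worker to
// consume it.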
void ActorThreadPool::PushActorToQueue(ActorBase *actor) {
  if (!actor) {
    return;
  }
  {
#ifdef USE_HQUEUE
    while (!actor_queue_.Enqueue(actor)) {
    }
#else
    std::lock_guard<std::mutex> _l(actor_mutex_);
    actor_queue_.push(actor);
#endif
  }
  THREAD_DEBUG("actor[%s] enqueue success", actor->GetAID().Name().c_str());
  // Activate one idle actor worker, if any exists.
  for (size_t i = 0; i < actor_thread_num_; ++i) {
    auto worker = reinterpret_cast<ActorWorker *>(workers_[i]);
    if (worker->ActorActive()) {
      break;
    }
  }
}

int ActorThreadPool::ActorQueueInit() {
#ifdef USE_HQUEUE
  if (!actor_queue_.Init(static_cast<int32_t>(actor_queue_size_))) {
    THREAD_ERROR("init actor queue failed.");
    return THREAD_ERROR;
  }
#endif
  return THREAD_OK;
}

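// Splits the requested threads between actor workers and kernel workers: actor threads are capped
// at the hardware core count, and the remaining requested threads become kernel workers, capped by
// the cores left over after the actor threads are assigned.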
int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_num, const std::vector<int> &core_list) {
  if (actor_thread_num > all_thread_num) {
    THREAD_ERROR("thread num is invalid");
    return THREAD_ERROR;
  }
  if (ActorQueueInit() != THREAD_OK) {
    return THREAD_ERROR;
  }
  if (affinity_ != nullptr) {
    affinity_->SetCoreId(core_list);
  }
  size_t core_num = std::thread::hardware_concurrency();
  THREAD_INFO("ThreadInfo, Actor: [%zu], All: [%zu], CoreNum: [%zu]", actor_thread_num, all_thread_num, core_num);
  actor_thread_num_ = actor_thread_num < core_num ? actor_thread_num : core_num;
  core_num -= actor_thread_num_;
  size_t kernel_thread_num =
    (all_thread_num - actor_thread_num_) < core_num ? (all_thread_num - actor_thread_num_) : core_num;
  size_t total_thread_num = actor_thread_num_ + kernel_thread_num;
  if (TaskQueuesInit(total_thread_num) != THREAD_OK) {
    return THREAD_ERROR;
  }

  if (ThreadPool::CreateThreads<ActorWorker>(actor_thread_num_, core_list) != THREAD_OK) {
    return THREAD_ERROR;
  }

  if (kernel_thread_num > 0) {
    return ThreadPool::CreateThreads<Worker>(kernel_thread_num, core_list);
  }
  return THREAD_OK;
}

ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num,
                                                   const std::vector<int> &core_list, BindMode bind_mode) {
  std::lock_guard<std::mutex> lock(create_thread_pool_muntex_);
  ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
  if (pool == nullptr) {
    return nullptr;
  }
  int ret = pool->InitAffinityInfo();
  if (ret != THREAD_OK) {
    delete pool;
    return nullptr;
  }
  if (core_list.empty()) {
    ret = pool->CreateThreads(actor_thread_num, all_thread_num, pool->affinity_->GetCoreId(all_thread_num, bind_mode));
  } else {
    ret = pool->CreateThreads(actor_thread_num, all_thread_num, core_list);
  }

  if (ret != THREAD_OK) {
    delete pool;
    return nullptr;
  }

  return pool;
}

ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t thread_num) {
  ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
  if (pool == nullptr) {
    return nullptr;
  }
  int ret = pool->CreateThreads(thread_num, thread_num, {});
  if (ret != THREAD_OK) {
    delete pool;
    return nullptr;
  }
  return pool;
}
}  // namespace mindspore
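// Usage sketch (illustrative only, not part of this file): how a caller might drive the pool through
// the single-argument factory above. `MyActor` is a hypothetical ActorBase subclass; ownership of the
// pushed actor stays with the caller, since the pool only pops the pointer and calls Run() on it.
//
//   ActorThreadPool *pool = ActorThreadPool::CreateThreadPool(/*thread_num=*/4);
//   if (pool != nullptr) {
//     MyActor actor;                    // assumed ActorBase-derived; its work happens in Run()
//     pool->PushActorToQueue(&actor);   // wakes one idle actor worker, which calls actor.Run()
//     // ... wait for the actor's work to finish before it goes out of scope ...
//     delete pool;                      // destructor drains the queue (bounded retries) and frees the workers
//   }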