1 /**
2 * Copyright 2021-2023 Huawei Technologies Co., Ltd
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include <memory>
17 #ifndef _MSC_VER
18 #include <sched.h>
19 #include <unistd.h>
20 #endif
21 #include "thread/actor_threadpool.h"
22 #include "thread/core_affinity.h"
23
24 namespace mindspore {
// Definition of the static member holding the actor queue size. It starts at kMaxHqueueSize and is
// the value ActorQueueInit() passes to actor_queue_.Init() when USE_HQUEUE is defined.
size_t ActorThreadPool::actor_queue_size_ = kMaxHqueueSize;
26
CreateThread()27 void ActorWorker::CreateThread() { thread_ = std::make_unique<std::thread>(&ActorWorker::RunWithSpin, this); }
28
RunWithSpin()29 void ActorWorker::RunWithSpin() {
30 if (!core_list_.empty()) {
31 SetAffinity();
32 }
33 #if !defined(__APPLE__) && !defined(_MSC_VER)
34 static std::atomic_int index{0};
35 (void)pthread_setname_np(pthread_self(), ("OS_Actor_" + std::to_string(index++)).c_str());
36 #endif
37 #ifdef PLATFORM_86
38 // Some CPU kernels need set the flush zero mode to improve performance.
39 _MM_SET_FLUSH_ZERO_MODE(_MM_FLUSH_ZERO_ON);
40 _MM_SET_DENORMALS_ZERO_MODE(_MM_DENORMALS_ZERO_ON);
41 #endif
42 while (alive_) {
43 // only run either local KernelTask or PoolQueue ActorTask
44 if (RunLocalKernelTask() || RunQueueActorTask()) {
45 spin_count_ = 0;
46 } else {
47 YieldAndDeactive();
48 }
49 if (spin_count_ > max_spin_count_) {
50 WaitUntilActive();
51 spin_count_ = 0;
52 }
53 }
54 }
55
RunQueueActorTask()56 bool ActorWorker::RunQueueActorTask() {
57 if (pool_ == nullptr) {
58 return false;
59 }
60 auto actor = reinterpret_cast<ActorThreadPool *>(pool_)->PopActorFromQueue();
61 if (actor == nullptr) {
62 return false;
63 }
64
65 actor->Run();
66 return true;
67 }
68
ActorActive()69 bool ActorWorker::ActorActive() {
70 if (status_ != kThreadIdle) {
71 return false;
72 }
73 {
74 std::lock_guard<std::mutex> _l(mutex_);
75 active_num_++;
76 status_ = kThreadBusy;
77 }
78 cond_var_->notify_one();
79 return true;
80 }
81
~ActorThreadPool()82 ActorThreadPool::~ActorThreadPool() {
83 // wait until actor queue is empty
84 bool terminate = false;
85 int count = 0;
86 do {
87 {
88 #ifdef USE_HQUEUE
89 terminate = actor_queue_.Empty();
90 #else
91 std::lock_guard<std::mutex> _l(actor_mutex_);
92 terminate = actor_queue_.empty();
93 #endif
94 }
95 if (!terminate) {
96 for (auto &worker : workers_) {
97 worker->Active();
98 }
99 std::this_thread::yield();
100 }
101 } while (!terminate && count++ < kMaxCount);
102 for (auto &worker : workers_) {
103 delete worker;
104 worker = nullptr;
105 }
106 workers_.clear();
107 #ifdef USE_HQUEUE
108 actor_queue_.Clean();
109 #endif
110 }
111
PopActorFromQueue()112 ActorBase *ActorThreadPool::PopActorFromQueue() {
113 #ifdef USE_HQUEUE
114 return actor_queue_.Dequeue();
115 #else
116 std::lock_guard<std::mutex> _l(actor_mutex_);
117 if (actor_queue_.empty()) {
118 return nullptr;
119 }
120 auto actor = actor_queue_.front();
121 actor_queue_.pop();
122 return actor;
123 #endif
124 }
125
PushActorToQueue(ActorBase * actor)126 void ActorThreadPool::PushActorToQueue(ActorBase *actor) {
127 if (!actor) {
128 return;
129 }
130 {
131 #ifdef USE_HQUEUE
132 while (!actor_queue_.Enqueue(actor)) {
133 }
134 #else
135 std::lock_guard<std::mutex> _l(actor_mutex_);
136 actor_queue_.push(actor);
137 #endif
138 }
139 THREAD_DEBUG("actor[%s] enqueue success", actor->GetAID().Name().c_str());
140 // active one idle actor thread if exist
141 for (size_t i = 0; i < actor_thread_num_; ++i) {
142 auto worker = reinterpret_cast<ActorWorker *>(workers_[i]);
143 if (worker->ActorActive()) {
144 break;
145 }
146 }
147 }
148
ActorQueueInit()149 int ActorThreadPool::ActorQueueInit() {
150 #ifdef USE_HQUEUE
151 if (actor_queue_.Init(static_cast<int32_t>(actor_queue_size_)) != true) {
152 THREAD_ERROR("init actor queue failed.");
153 return THREAD_ERROR;
154 }
155 #endif
156 return THREAD_OK;
157 }
158
CreateThreads(size_t actor_thread_num,size_t all_thread_num,const std::vector<int> & core_list)159 int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_num, const std::vector<int> &core_list) {
160 if (actor_thread_num > all_thread_num) {
161 THREAD_ERROR("thread num is invalid");
162 return THREAD_ERROR;
163 }
164 if (ActorQueueInit() != THREAD_OK) {
165 return THREAD_ERROR;
166 }
167 if (affinity_ != nullptr) {
168 affinity_->SetCoreId(core_list);
169 }
170 size_t core_num = std::thread::hardware_concurrency();
171 THREAD_INFO("ThreadInfo, Actor: [%zu], All: [%zu], CoreNum: [%zu]", actor_thread_num, all_thread_num, core_num);
172 actor_thread_num_ = actor_thread_num < core_num ? actor_thread_num : core_num;
173 core_num -= actor_thread_num_;
174 size_t kernel_thread_num =
175 (all_thread_num - actor_thread_num_) < core_num ? (all_thread_num - actor_thread_num_) : core_num;
176 size_t total_thread_num = actor_thread_num_ + kernel_thread_num;
177 if (TaskQueuesInit(total_thread_num) != THREAD_OK) {
178 return THREAD_ERROR;
179 }
180
181 if (ThreadPool::CreateThreads<ActorWorker>(actor_thread_num_, core_list) != THREAD_OK) {
182 return THREAD_ERROR;
183 }
184
185 if (kernel_thread_num > 0) {
186 return ThreadPool::CreateThreads<Worker>(kernel_thread_num, core_list);
187 }
188 return THREAD_OK;
189 }
190
CreateThreadPool(size_t actor_thread_num,size_t all_thread_num,const std::vector<int> & core_list,BindMode bind_mode)191 ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t actor_thread_num, size_t all_thread_num,
192 const std::vector<int> &core_list, BindMode bind_mode) {
193 std::lock_guard<std::mutex> lock(create_thread_pool_muntex_);
194 ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
195 if (pool == nullptr) {
196 return nullptr;
197 }
198 int ret = pool->InitAffinityInfo();
199 if (ret != THREAD_OK) {
200 delete pool;
201 return nullptr;
202 }
203 if (core_list.empty()) {
204 ret = pool->CreateThreads(actor_thread_num, all_thread_num, pool->affinity_->GetCoreId(all_thread_num, bind_mode));
205 } else {
206 ret = pool->CreateThreads(actor_thread_num, all_thread_num, core_list);
207 }
208
209 if (ret != THREAD_OK) {
210 delete pool;
211 return nullptr;
212 }
213
214 return pool;
215 }
216
CreateThreadPool(size_t thread_num)217 ActorThreadPool *ActorThreadPool::CreateThreadPool(size_t thread_num) {
218 ActorThreadPool *pool = new (std::nothrow) ActorThreadPool();
219 if (pool == nullptr) {
220 return nullptr;
221 }
222 int ret = pool->CreateThreads(thread_num, thread_num, {});
223 if (ret != THREAD_OK) {
224 delete pool;
225 return nullptr;
226 }
227 return pool;
228 }
229 } // namespace mindspore
230