1 /** 2 * Copyright 2021-2023 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CORE_MINDRT_RUNTIME_ACTOR_THREADPOOL_H_ 18 #define MINDSPORE_CORE_MINDRT_RUNTIME_ACTOR_THREADPOOL_H_ 19 20 #include <queue> 21 #include <vector> 22 #include <mutex> 23 #include <atomic> 24 #include <condition_variable> 25 #include "thread/threadpool.h" 26 #include "thread/core_affinity.h" 27 #include "actor/actor.h" 28 #include "thread/hqueue.h" 29 #ifndef USE_HQUEUE 30 #define USE_HQUEUE 31 #endif 32 namespace mindspore { 33 class ActorThreadPool; 34 class ActorWorker : public Worker { 35 public: ActorWorker(ThreadPool * pool,size_t index)36 explicit ActorWorker(ThreadPool *pool, size_t index) : Worker(pool, index) {} 37 void CreateThread() override; 38 bool ActorActive(); ~ActorWorker()39 ~ActorWorker() override { 40 { 41 std::lock_guard<std::mutex> _l(mutex_); 42 alive_ = false; 43 } 44 cond_var_->notify_one(); 45 46 bool terminate = false; 47 int count = 0; 48 while (local_task_queue_ && !terminate && count++ < kMaxCount) { 49 terminate = local_task_queue_->Empty(); 50 if (!terminate) { 51 auto task_split = local_task_queue_->Dequeue(); 52 (void)TryRunTask(task_split); 53 } 54 } 55 56 if (thread_->joinable()) { 57 thread_->join(); 58 } 59 local_task_queue_ = nullptr; 60 }; 61 62 private: 63 void RunWithSpin(); 64 bool RunQueueActorTask(); 65 }; 66 67 class MS_CORE_API ActorThreadPool : public ThreadPool { 68 public: 69 // create ThreadPool that contains actor thread and kernel thread CreateThreadPool(size_t actor_thread_num,size_t all_thread_num,BindMode bind_mode)70 static ActorThreadPool *CreateThreadPool(size_t actor_thread_num, size_t all_thread_num, BindMode bind_mode) { 71 std::vector<int> core_list; 72 return ActorThreadPool::CreateThreadPool(actor_thread_num, all_thread_num, core_list, bind_mode); 73 } 74 75 static ActorThreadPool *CreateThreadPool(size_t actor_thread_num, size_t all_thread_num, 76 const std::vector<int> &core_list, BindMode bind_mode); 77 // create ThreadPool that contains only actor thread 78 static ActorThreadPool *CreateThreadPool(size_t thread_num); 79 ~ActorThreadPool() override; 80 set_actor_queue_size(size_t actor_queue_size)81 static void set_actor_queue_size(size_t actor_queue_size) { actor_queue_size_ = actor_queue_size; } 82 83 virtual int ActorQueueInit(); 84 virtual void PushActorToQueue(ActorBase *actor); 85 virtual ActorBase *PopActorFromQueue(); 86 87 protected: 88 ActorThreadPool() = default; 89 90 std::mutex actor_mutex_; 91 std::condition_variable actor_cond_; 92 #ifdef USE_HQUEUE 93 HQueue<ActorBase> actor_queue_; 94 #else 95 std::queue<ActorBase *> actor_queue_; 96 #endif 97 98 private: 99 int CreateThreads(size_t actor_thread_num, size_t all_thread_num, const std::vector<int> &core_list); 100 101 // Support to set the size of actor queue. 102 static size_t actor_queue_size_; 103 }; 104 } // namespace mindspore 105 #endif // MINDSPORE_CORE_MINDRT_RUNTIME_ACTOR_THREADPOOL_H_ 106