• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CORE_MINDRT_RUNTIME_ACTOR_THREADPOOL_H_
18 #define MINDSPORE_CORE_MINDRT_RUNTIME_ACTOR_THREADPOOL_H_
19 
20 #include <queue>
21 #include <vector>
22 #include <mutex>
23 #include <atomic>
24 #include <condition_variable>
25 #include "thread/threadpool.h"
26 #include "thread/core_affinity.h"
27 #include "actor/actor.h"
28 #include "thread/hqueue.h"
29 #ifndef USE_HQUEUE
30 #define USE_HQUEUE
31 #endif
32 namespace mindspore {
33 class ActorThreadPool;
34 class ActorWorker : public Worker {
35  public:
ActorWorker(ThreadPool * pool,size_t index)36   explicit ActorWorker(ThreadPool *pool, size_t index) : Worker(pool, index) {}
37   void CreateThread() override;
38   bool ActorActive();
~ActorWorker()39   ~ActorWorker() override {
40     {
41       std::lock_guard<std::mutex> _l(mutex_);
42       alive_ = false;
43     }
44     cond_var_->notify_one();
45 
46     bool terminate = false;
47     int count = 0;
48     while (local_task_queue_ && !terminate && count++ < kMaxCount) {
49       terminate = local_task_queue_->Empty();
50       if (!terminate) {
51         auto task_split = local_task_queue_->Dequeue();
52         (void)TryRunTask(task_split);
53       }
54     }
55 
56     if (thread_->joinable()) {
57       thread_->join();
58     }
59     local_task_queue_ = nullptr;
60   };
61 
62  private:
63   void RunWithSpin();
64   bool RunQueueActorTask();
65 };
66 
67 class MS_CORE_API ActorThreadPool : public ThreadPool {
68  public:
69   // create ThreadPool that contains actor thread and kernel thread
CreateThreadPool(size_t actor_thread_num,size_t all_thread_num,BindMode bind_mode)70   static ActorThreadPool *CreateThreadPool(size_t actor_thread_num, size_t all_thread_num, BindMode bind_mode) {
71     std::vector<int> core_list;
72     return ActorThreadPool::CreateThreadPool(actor_thread_num, all_thread_num, core_list, bind_mode);
73   }
74 
75   static ActorThreadPool *CreateThreadPool(size_t actor_thread_num, size_t all_thread_num,
76                                            const std::vector<int> &core_list, BindMode bind_mode);
77   // create ThreadPool that contains only actor thread
78   static ActorThreadPool *CreateThreadPool(size_t thread_num);
79   ~ActorThreadPool() override;
80 
set_actor_queue_size(size_t actor_queue_size)81   static void set_actor_queue_size(size_t actor_queue_size) { actor_queue_size_ = actor_queue_size; }
82 
83   virtual int ActorQueueInit();
84   virtual void PushActorToQueue(ActorBase *actor);
85   virtual ActorBase *PopActorFromQueue();
86 
87  protected:
88   ActorThreadPool() = default;
89 
90   std::mutex actor_mutex_;
91   std::condition_variable actor_cond_;
92 #ifdef USE_HQUEUE
93   HQueue<ActorBase> actor_queue_;
94 #else
95   std::queue<ActorBase *> actor_queue_;
96 #endif
97 
98  private:
99   int CreateThreads(size_t actor_thread_num, size_t all_thread_num, const std::vector<int> &core_list);
100 
101   // Support to set the size of actor queue.
102   static size_t actor_queue_size_;
103 };
104 }  // namespace mindspore
105 #endif  // MINDSPORE_CORE_MINDRT_RUNTIME_ACTOR_THREADPOOL_H_
106