• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_
18 #define MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_
19 
20 #include <new>
21 #include <vector>
22 #include <memory>
23 #include <thread>
24 #include <atomic>
25 #include <condition_variable>
26 #include <mutex>
27 #include <functional>
28 #include "thread/threadlog.h"
29 #include "thread/core_affinity.h"
30 
31 namespace mindspore {
32 constexpr int kDefaultSpinCount = 300000;
33 constexpr int kMaxCount = 30000;
34 constexpr int kMinSpinCount = 1;
35 constexpr int kDefaultFrequency = 1;
36 constexpr float kMaxScale = 1.;
37 
38 enum ThreadStatus {
39   kThreadBusy = 0,  // busy, the thread is running task
40   kThreadHeld = 1,  // held, the thread has been marked as occupied
41   kThreadIdle = 2   // idle, the thread is waiting
42 };
43 
44 // used in scenarios with unequal division of task
45 // the parameters indicate the start and end coefficients
46 using Func = std::function<int(void *, int, float, float)>;
47 using Content = void *;
48 
49 typedef struct Task {
TaskTask50   Task(Func f, Content c) : func(f), content(c) {}
51   Func func;
52   Content content;
53   std::atomic_int finished{0};
54   std::atomic_int status{THREAD_OK};  // return status, RET_OK
55 } Task;
56 
57 class Worker {
58  public:
59   Worker() = default;
60   virtual ~Worker();
61   // create thread and start running at the same time
62   void CreateThread();
63   // assign task and then activate thread
64   void Active(Task *task, int task_id);
65   // activate thread
66   void Active();
67   // whether or not it is idle and marked as held
68   bool available();
69   // assigns task first before running
70   bool RunLocalKernelTask();
71   // set max spin count before running
SetMaxSpinCount(int max_spin_count)72   void SetMaxSpinCount(int max_spin_count) { max_spin_count_ = max_spin_count; }
73 
set_frequency(int frequency)74   void set_frequency(int frequency) { frequency_ = frequency; }
frequency()75   int frequency() const { return frequency_; }
76 
77   void set_scale(float lhs_scale, float rhs_scale);
lhs_scale()78   float lhs_scale() const { return lhs_scale_; }
rhs_scale()79   float rhs_scale() const { return rhs_scale_; }
80 
thread_id()81   std::thread::id thread_id() const { return thread_.get_id(); }
82 #ifdef BIND_CORE
set_mask(const cpu_set_t & mask)83   void set_mask(const cpu_set_t &mask) { mask_ = mask; }
handle()84   pthread_t handle() { return thread_.native_handle(); }
85 #endif
86 
87  protected:
88   void SetAffinity();
89   void Run();
90   void YieldAndDeactive();
91   void WaitUntilActive();
92 
93   bool alive_{true};
94   std::thread thread_;
95 #ifdef BIND_CORE
96   cpu_set_t mask_;
97 #endif
98   std::atomic_int status_{kThreadBusy};
99   std::atomic_int active_num_{0};
100 
101   std::mutex mutex_;
102   std::condition_variable cond_var_;
103 
104   std::atomic<Task *> task_{nullptr};
105   std::atomic_int task_id_{0};
106   float lhs_scale_{0.};
107   float rhs_scale_{kMaxScale};
108   int frequency_{kDefaultFrequency};
109   int spin_count_{0};
110   int max_spin_count_{kMinSpinCount};
111 };
112 
113 class ThreadPool {
114  public:
115   static ThreadPool *CreateThreadPool(size_t thread_num, const std::vector<int> &core_list = {});
116   virtual ~ThreadPool();
117 
thread_num()118   size_t thread_num() const { return workers_.size(); }
119 
120   int SetCpuAffinity(const std::vector<int> &core_list);
121   int SetCpuAffinity(BindMode bind_mode);
122   int SetProcessAffinity(BindMode bind_mode) const;
123 
124   int ParallelLaunch(const Func &func, Content content, int task_num) const;
DisableOccupiedActorThread()125   void DisableOccupiedActorThread() { occupied_actor_thread_ = false; }
SetActorThreadNum(size_t actor_thread_num)126   void SetActorThreadNum(size_t actor_thread_num) { actor_thread_num_ = actor_thread_num; }
SetKernelThreadNum(size_t kernel_thread_num)127   void SetKernelThreadNum(size_t kernel_thread_num) { kernel_thread_num_ = kernel_thread_num; }
GetKernelThreadNum()128   size_t GetKernelThreadNum() const { return kernel_thread_num_; }
129   void SetSpinCountMaxValue();
130   void SetSpinCountMinValue();
131   void SetMaxSpinCount(int spin_count);
132   void SetMinSpinCount(int spin_count);
133   void ActiveWorkers() const;
134 
135  protected:
136   ThreadPool() = default;
137 
138   int CreateThreads(size_t thread_num, const std::vector<int> &core_list);
139 
140   int InitAffinityInfo();
141 
142   void SyncRunTask(Task *task, int start_num, int task_num) const;
143 
144   void DistributeTask(Task *task, int task_num) const;
145   void CalculateScales(const std::vector<Worker *> &workers, int sum_frequency) const;
146   void ActiveWorkers(const std::vector<Worker *> &workers, Task *task, int task_num, const Worker *curr) const;
147 
148   Worker *CurrentWorker() const;
149 
150   std::mutex pool_mutex_;
151   std::vector<Worker *> workers_;
152   CoreAffinity *affinity_{nullptr};
153   size_t actor_thread_num_{0};
154   size_t kernel_thread_num_{0};
155   bool occupied_actor_thread_{true};
156   int max_spin_count_{kDefaultSpinCount};
157   int min_spin_count_{kMinSpinCount};
158 };
159 
160 }  // namespace mindspore
161 #endif  // MINDSPORE_CORE_MINDRT_RUNTIME_THREADPOOL_H_
162