• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef FFRT_CPU_WORKER_HPP
17 #define FFRT_CPU_WORKER_HPP
18 
#include <atomic>
#include <cstdint>
#include <functional>
#include <string>
#include <unistd.h>
#ifdef FFRT_PTHREAD_ENABLE
#include <pthread.h>
#endif
#include <thread>
#include <utility>
#include <vector>
#ifdef OHOS_THREAD_STACK_DUMP
#include <sstream>
#endif
#ifdef USE_OHOS_QOS
#include "qos.h"
#else
#include "staging_qos/sched/qos.h"
#endif
#include "tm/task_base.h"
#include "dfx/log/ffrt_log_api.h"
#include "c/executor_task.h"
#include "util/spmc_queue.h"
37 
38 namespace ffrt {
// pthread_create() return code that makes CPUWorker::Start() enter its retry loop.
// NOTE(review): 11 matches EAGAIN on Linux; the numeric value is platform specific — confirm
// before building for other targets.
constexpr int PTHREAD_CREATE_NO_MEM_CODE = 11;

// Upper bound on the number of pthread_create() retries. The retry counter is also used as the
// index into FFRT_RETRY_CYCLE_LIST, so this must never exceed the number of entries below.
constexpr int FFRT_RETRY_MAX_COUNT = 12;

// Delay applied before each retry. The values are handed to usleep(), i.e. they are microseconds
// (first entry 10 ms, last entry 500 s).
// Declared `inline` so that all translation units including this header share one object instead
// of each constructing (and heap-allocating) a private copy during static initialization.
inline const std::vector<uint64_t> FFRT_RETRY_CYCLE_LIST = {
    10 * 1000, 50 * 1000, 100 * 1000, 200 * 1000, 500 * 1000, 1000 * 1000, 2 * 1000 * 1000,
    5 * 1000 * 1000, 10 * 1000 * 1000, 50 * 1000 * 1000, 100 * 1000 * 1000, 500 * 1000 * 1000
};
45 
// Result type of the CpuWorkerOps::WorkerIdleAction callback.
// NOTE(review): presumably RETRY keeps the worker looking for work and RETIRE lets it exit, with
// MAX acting as a sentinel — the consuming code is not in this header, confirm there.
enum class WorkerAction {
    RETRY = 0,
    RETIRE = 1,
    MAX = 2,
};
51 
// State tag stored in CPUWorker::state and exposed through GetWorkerState()/SetWorkerState().
// A freshly constructed CPUWorker starts out as EXECUTING.
enum class WorkerStatus {
    EXECUTING = 0,
    SLEEPING = 1,
    DESTROYED = 2,
};
57 
58 class CPUWorker;
59 struct CpuWorkerOps {
60     std::function<WorkerAction (CPUWorker*)> WorkerIdleAction;
61     std::function<void (CPUWorker*)> WorkerRetired;
62     std::function<void (CPUWorker*)> WorkerPrepare;
63 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
64     std::function<bool (void)> IsBlockAwareInit;
65 #endif
66 };
67 
68 class CPUWorker {
69 public:
70     explicit CPUWorker(const QoS& qos, CpuWorkerOps&& ops, size_t stackSize);
71     ~CPUWorker();
72 
Exited()73     bool Exited() const
74     {
75         return exited.load(std::memory_order_relaxed);
76     }
77 
SetExited()78     void SetExited()
79     {
80         exited.store(true, std::memory_order_relaxed);
81     }
82 
Id()83     pid_t Id() const
84     {
85         while (!exited && tid < 0) {
86         }
87         return tid;
88     }
89 
GetQos()90     const QoS& GetQos() const
91     {
92         return qos;
93     }
94 
GetWorkerState()95     const WorkerStatus& GetWorkerState() const
96     {
97         return state;
98     }
99 
SetWorkerState(const WorkerStatus & newState)100     void SetWorkerState(const WorkerStatus& newState)
101     {
102         this->state = newState;
103     }
104 
SetWorkerMonitorStatus(bool monitor)105     void SetWorkerMonitorStatus(bool monitor)
106     {
107         monitor_ = monitor;
108     }
109 
Monitor()110     bool Monitor() const
111     {
112         return monitor_;
113     }
114 
115 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
GetDomainId()116     unsigned int GetDomainId() const
117     {
118         return domain_id;
119     }
120 #endif
121 #ifdef FFRT_PTHREAD_ENABLE
Start(void * (* ThreadFunc)(void *),void * args)122     void Start(void*(*ThreadFunc)(void*), void* args)
123     {
124         int ret = pthread_create(&thread_, &attr_, ThreadFunc, args);
125         if (ret == PTHREAD_CREATE_NO_MEM_CODE) {
126             int count = 0;
127             while (ret == PTHREAD_CREATE_NO_MEM_CODE && count < FFRT_RETRY_MAX_COUNT) {
128                 usleep(FFRT_RETRY_CYCLE_LIST[count]);
129                 count++;
130                 FFRT_LOGW("pthread_create failed due to shortage of system memory, FFRT retry %d times...", count);
131                 ret = pthread_create(&thread_, &attr_, ThreadFunc, args);
132             }
133         }
134         if (ret != 0) {
135             FFRT_LOGE("pthread_create failed, ret = %d", ret);
136             exited = true;
137         }
138         pthread_attr_destroy(&attr_);
139     }
140 
Join()141     void Join()
142     {
143         if (tid > 0 && thread_ != 0) {
144             pthread_join(thread_, nullptr);
145         }
146         tid = -1;
147     }
148 
Detach()149     void Detach()
150     {
151         if (tid > 0 && thread_ != 0) {
152             pthread_detach(thread_);
153         } else {
154             FFRT_LOGD("qos %d thread not joinable.", qos());
155         }
156         tid = -1;
157     }
158 
GetThread()159     pthread_t& GetThread()
160     {
161         return this->thread_;
162     }
163 #else
164     template <typename F, typename... Args>
Start(F && f,Args &&...args)165     void Start(F&& f, Args&&... args)
166     {
167         auto wrap = [&](Args&&... args) {
168             NativeConfig();
169             return f(args...);
170         };
171         thread = std::thread(wrap, args...);
172     }
173 
Join()174     void Join()
175     {
176         if (thread.joinable()) {
177             thread.join();
178         }
179         tid = -1;
180     }
181 
Detach()182     void Detach()
183     {
184         if (thread.joinable()) {
185             thread.detach();
186         } else {
187             FFRT_LOGD("qos %d thread not joinable\n", qos());
188         }
189         tid = -1;
190     }
191 
GetThread()192     pthread_t GetThread()
193     {
194         return this->thread.native_handle();
195     }
196 #endif
197 
198     void SetThreadAttr(const QoS& newQos);
199     std::atomic<TaskBase*> curTask = nullptr;
200     std::atomic<uintptr_t> curTaskType_ {ffrt_invalid_task};
201     std::string curTaskLabel_ = ""; // 需要打开宏WORKER_CAHCE_NAMEID才会赋值
202     uint64_t curTaskGid_ = UINT64_MAX;
203     unsigned int tick = 0;
204 
205 private:
206     void NativeConfig();
207     static void WorkerLooper(CPUWorker* worker);
208     static void* WrapDispatch(void* worker);
209     void WorkerSetup();
210     static void Dispatch(CPUWorker* worker);
211     static void RunTask(TaskBase* task, CPUWorker* worker);
212     static bool RunSingleTask(int qos, CPUWorker *worker);
213 #ifdef FFRT_SEND_EVENT
214     int cacheQos; // cache int qos
215     std::string cacheLabel; // cache string label
216     uint64_t cacheFreq = 1000000; // cache cpu freq
217 #endif
218     std::atomic_bool exited {false};
219     std::atomic<pid_t> tid {-1};
220     QoS qos;
221     CpuWorkerOps ops;
222     WorkerStatus state {WorkerStatus::EXECUTING};
223     bool monitor_ = true;
224 #ifdef FFRT_PTHREAD_ENABLE
225     pthread_t thread_{0};
226     pthread_attr_t attr_;
227 #else
228     std::thread thread;
229 #endif
230 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
231     unsigned int domain_id;
232 #endif
233 };
234 } // namespace ffrt
235 #endif
236