1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef FFRT_CPU_WORKER_HPP 17 #define FFRT_CPU_WORKER_HPP 18 19 #include <atomic> 20 #include <unistd.h> 21 #ifdef FFRT_PTHREAD_ENABLE 22 #include <pthread.h> 23 #endif 24 #include <thread> 25 #ifdef OHOS_THREAD_STACK_DUMP 26 #include <sstream> 27 #endif 28 #ifdef USE_OHOS_QOS 29 #include "qos.h" 30 #else 31 #include "staging_qos/sched/qos.h" 32 #endif 33 #include "tm/task_base.h" 34 #include "dfx/log/ffrt_log_api.h" 35 #include "c/executor_task.h" 36 #include "util/spmc_queue.h" 37 38 namespace ffrt { 39 constexpr int PTHREAD_CREATE_NO_MEM_CODE = 11; 40 constexpr int FFRT_RETRY_MAX_COUNT = 12; 41 const std::vector<uint64_t> FFRT_RETRY_CYCLE_LIST = { 42 10 * 1000, 50 * 1000, 100 * 1000, 200 * 1000, 500 * 1000, 1000 * 1000, 2 * 1000 * 1000, 43 5 * 1000 * 1000, 10 * 1000 * 1000, 50 * 1000 * 1000, 100 * 1000 * 1000, 500 * 1000 * 1000 44 }; 45 46 enum class WorkerAction { 47 RETRY = 0, 48 RETIRE, 49 MAX, 50 }; 51 52 enum class WorkerStatus { 53 EXECUTING = 0, 54 SLEEPING, 55 DESTROYED, 56 }; 57 58 class CPUWorker; 59 struct CpuWorkerOps { 60 std::function<WorkerAction (CPUWorker*)> WorkerIdleAction; 61 std::function<void (CPUWorker*)> WorkerRetired; 62 std::function<void (CPUWorker*)> WorkerPrepare; 63 #ifdef FFRT_WORKERS_DYNAMIC_SCALING 64 std::function<bool (void)> IsBlockAwareInit; 65 #endif 66 }; 67 68 class CPUWorker { 69 public: 70 explicit CPUWorker(const QoS& qos, CpuWorkerOps&& ops, size_t stackSize); 71 ~CPUWorker(); 72 Exited()73 bool Exited() const 74 { 75 return exited.load(std::memory_order_relaxed); 76 } 77 SetExited()78 void SetExited() 79 { 80 exited.store(true, std::memory_order_relaxed); 81 } 82 Id()83 pid_t Id() const 84 { 85 while (!exited && tid < 0) { 86 } 87 return tid; 88 } 89 GetQos()90 const QoS& GetQos() const 91 { 92 return qos; 93 } 94 GetWorkerState()95 const WorkerStatus& GetWorkerState() const 96 { 97 return state; 98 } 99 SetWorkerState(const WorkerStatus & newState)100 void SetWorkerState(const WorkerStatus& newState) 101 { 102 this->state = newState; 103 } 104 SetWorkerMonitorStatus(bool monitor)105 void SetWorkerMonitorStatus(bool monitor) 106 { 107 monitor_ = monitor; 108 } 109 Monitor()110 bool Monitor() const 111 { 112 return monitor_; 113 } 114 115 #ifdef FFRT_WORKERS_DYNAMIC_SCALING GetDomainId()116 unsigned int GetDomainId() const 117 { 118 return domain_id; 119 } 120 #endif 121 #ifdef FFRT_PTHREAD_ENABLE Start(void * (* ThreadFunc)(void *),void * args)122 void Start(void*(*ThreadFunc)(void*), void* args) 123 { 124 int ret = pthread_create(&thread_, &attr_, ThreadFunc, args); 125 if (ret == PTHREAD_CREATE_NO_MEM_CODE) { 126 int count = 0; 127 while (ret == PTHREAD_CREATE_NO_MEM_CODE && count < FFRT_RETRY_MAX_COUNT) { 128 usleep(FFRT_RETRY_CYCLE_LIST[count]); 129 count++; 130 FFRT_LOGW("pthread_create failed due to shortage of system memory, FFRT retry %d times...", count); 131 ret = pthread_create(&thread_, &attr_, ThreadFunc, args); 132 } 133 } 134 if (ret != 0) { 135 FFRT_LOGE("pthread_create failed, ret = %d", ret); 136 exited = true; 137 } 138 pthread_attr_destroy(&attr_); 139 } 140 Join()141 void Join() 142 { 143 if (tid > 0 && thread_ != 0) { 144 pthread_join(thread_, nullptr); 145 } 146 tid = -1; 147 } 148 Detach()149 void Detach() 150 { 151 if (tid > 0 && thread_ != 0) { 152 pthread_detach(thread_); 153 } else { 154 FFRT_LOGD("qos %d thread not joinable.", qos()); 155 } 156 tid = -1; 157 } 158 GetThread()159 pthread_t& GetThread() 160 { 161 return this->thread_; 162 } 163 #else 164 template <typename F, typename... Args> Start(F && f,Args &&...args)165 void Start(F&& f, Args&&... args) 166 { 167 auto wrap = [&](Args&&... args) { 168 NativeConfig(); 169 return f(args...); 170 }; 171 thread = std::thread(wrap, args...); 172 } 173 Join()174 void Join() 175 { 176 if (thread.joinable()) { 177 thread.join(); 178 } 179 tid = -1; 180 } 181 Detach()182 void Detach() 183 { 184 if (thread.joinable()) { 185 thread.detach(); 186 } else { 187 FFRT_LOGD("qos %d thread not joinable\n", qos()); 188 } 189 tid = -1; 190 } 191 GetThread()192 pthread_t GetThread() 193 { 194 return this->thread.native_handle(); 195 } 196 #endif 197 198 void SetThreadAttr(const QoS& newQos); 199 std::atomic<TaskBase*> curTask = nullptr; 200 std::atomic<uintptr_t> curTaskType_ {ffrt_invalid_task}; 201 std::string curTaskLabel_ = ""; // 需要打开宏WORKER_CAHCE_NAMEID才会赋值 202 uint64_t curTaskGid_ = UINT64_MAX; 203 unsigned int tick = 0; 204 205 private: 206 void NativeConfig(); 207 static void WorkerLooper(CPUWorker* worker); 208 static void* WrapDispatch(void* worker); 209 void WorkerSetup(); 210 static void Dispatch(CPUWorker* worker); 211 static void RunTask(TaskBase* task, CPUWorker* worker); 212 static bool RunSingleTask(int qos, CPUWorker *worker); 213 #ifdef FFRT_SEND_EVENT 214 int cacheQos; // cache int qos 215 std::string cacheLabel; // cache string label 216 uint64_t cacheFreq = 1000000; // cache cpu freq 217 #endif 218 std::atomic_bool exited {false}; 219 std::atomic<pid_t> tid {-1}; 220 QoS qos; 221 CpuWorkerOps ops; 222 WorkerStatus state {WorkerStatus::EXECUTING}; 223 bool monitor_ = true; 224 #ifdef FFRT_PTHREAD_ENABLE 225 pthread_t thread_{0}; 226 pthread_attr_t attr_; 227 #else 228 std::thread thread; 229 #endif 230 #ifdef FFRT_WORKERS_DYNAMIC_SCALING 231 unsigned int domain_id; 232 #endif 233 }; 234 } // namespace ffrt 235 #endif 236