1 /* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef RECORDER_TASK_QUEUE_H 17 #define RECORDER_TASK_QUEUE_H 18 19 #include <thread> 20 #include <condition_variable> 21 #include <mutex> 22 #include <functional> 23 #include <list> 24 #include <string> 25 #include <optional> 26 #include <type_traits> 27 #include "media_errors.h" 28 #include "nocopyable.h" 29 #include "qos.h" 30 31 namespace OHOS { 32 namespace Media { 33 /** 34 * Simple Generalized Task Queues for Easier Implementation of Asynchronous Programming Models 35 * 36 * You can refer to following examples to use this utility. 37 * 38 * Example 1: 39 * TaskQueue taskQ("your_task_queue_name"); 40 * taskQ.Start(); 41 * auto handler1 = std::make_shared<TaskHandler<int32_t>>([]() { 42 * // your job's detail code; 43 * }); 44 * taskQ.EnqueueTask(handler1); 45 * auto result = handler1->GetResult(); 46 * if (result.HasResult()) { 47 * MEDIA_LOGI("handler1 executed, result: %{public}d", result.Value()); 48 * } else { 49 * MEDIA_LOGI("handler1 not executed"); 50 * } 51 * 52 * Example 2: 53 * TaskQueue taskQ("your_task_queue_name"); 54 * taskQ.Start(); 55 * auto handler2 = std::make_shared<TaskHandler<void>>([]() { 56 * // your job's detail code; 57 * }); 58 * taskQ.EnqueueTask(handler2); 59 * auto result = handler2->GetResult(); 60 * if (result.HasResult()) { 61 * MEDIA_LOGI("handler2 executed"); 62 * } else { 63 * MEDIA_LOGI("handler2 not executed"); 64 * } 65 */ 66 67 class TaskQueue; 68 template <typename T> 69 class TaskHandler; 70 71 template <typename T> 72 struct TaskResult { HasResultTaskResult73 bool HasResult() 74 { 75 return val.has_value(); 76 } ValueTaskResult77 T Value() 78 { 79 return val.value(); 80 } 81 private: 82 friend class TaskHandler<T>; 83 std::optional<T> val; 84 }; 85 86 template <> 87 struct TaskResult<void> { 88 bool HasResult() 89 { 90 return executed; 91 } 92 private: 93 friend class TaskHandler<void>; 94 bool executed = false; 95 }; 96 97 class ITaskHandler { 98 public: 99 struct Attribute { 100 // periodic execute time, UINT64_MAX is not need to execute periodic. 101 uint64_t periodicTimeUs_ { UINT64_MAX }; 102 }; 103 virtual ~ITaskHandler() = default; 104 virtual void Execute() = 0; 105 virtual void Cancel() = 0; 106 virtual bool IsCanceled() = 0; 107 virtual Attribute GetAttribute() const = 0; 108 109 private: 110 // clear the internel executed or canceled state. 111 virtual void Clear() = 0; 112 friend class TaskQueue; 113 }; 114 115 template <typename T> 116 class TaskHandler : public ITaskHandler, public NoCopyable { 117 public: 118 TaskHandler(std::function<T(void)> task, ITaskHandler::Attribute attr = {}) : task_(task), attribute_(attr) {} 119 ~TaskHandler() = default; 120 121 void Execute() override 122 { 123 { 124 std::unique_lock<std::mutex> lock(mutex_); 125 if (state_ != TaskState::IDLE) { 126 return; 127 } 128 state_ = TaskState::RUNNING; 129 } 130 131 if constexpr (std::is_void_v<T>) { 132 task_(); 133 std::unique_lock<std::mutex> lock(mutex_); 134 state_ = TaskState::FINISHED; 135 result_.executed = true; 136 } else { 137 T result = task_(); 138 std::unique_lock<std::mutex> lock(mutex_); 139 state_ = TaskState::FINISHED; 140 result_.val = result; 141 } 142 cond_.notify_all(); 143 } 144 145 /* 146 * After the GetResult called, the last execute result will be clear 147 */ 148 TaskResult<T> GetResult() 149 { 150 std::unique_lock<std::mutex> lock(mutex_); 151 while ((state_ != TaskState::FINISHED) && (state_ != TaskState::CANCELED)) { 152 cond_.wait(lock); 153 } 154 155 return ClearResult(); 156 } 157 158 void Cancel() override 159 { 160 std::unique_lock<std::mutex> lock(mutex_); 161 if (state_ != RUNNING) { 162 state_ = TaskState::CANCELED; 163 cond_.notify_all(); 164 } 165 } 166 167 bool IsCanceled() override 168 { 169 std::unique_lock<std::mutex> lock(mutex_); 170 return state_ == TaskState::CANCELED; 171 } 172 173 ITaskHandler::Attribute GetAttribute() const override 174 { 175 return attribute_; 176 } 177 178 private: 179 TaskResult<T> ClearResult() 180 { 181 if (state_ == TaskState::FINISHED) { 182 state_ = TaskState::IDLE; 183 TaskResult<T> tmp; 184 if constexpr (std::is_void_v<T>) { 185 std::swap(tmp.executed, result_.executed); 186 } else { 187 result_.val.swap(tmp.val); 188 } 189 return tmp; 190 } 191 return result_; 192 } 193 194 void Clear() override 195 { 196 std::unique_lock<std::mutex> lock(mutex_); 197 (void)ClearResult(); 198 } 199 200 enum TaskState { 201 IDLE = 0, 202 RUNNING = 1, 203 CANCELED = 2, 204 FINISHED = 3, 205 }; 206 207 TaskState state_ = TaskState::IDLE; 208 std::mutex mutex_; 209 std::condition_variable cond_; 210 std::function<T(void)> task_; 211 TaskResult<T> result_; 212 ITaskHandler::Attribute attribute_; // task execute attribute. 213 }; 214 215 class __attribute__((visibility("default"))) TaskQueue : public NoCopyable { 216 public: 217 explicit TaskQueue(const std::string &name) : name_(name) {} 218 ~TaskQueue(); 219 220 int32_t Start(); 221 int32_t Stop() noexcept; 222 void SetQos(const OHOS::QOS::QosLevel level); 223 void ResetQos(); 224 bool IsTaskExecuting(); 225 226 // delayUs cannot be gt 10000000ULL. 227 __attribute__((no_sanitize("cfi"))) int32_t EnqueueTask(const std::shared_ptr<ITaskHandler> &task, 228 bool cancelNotExecuted = false, uint64_t delayUs = 0ULL); 229 230 private: 231 struct TaskHandlerItem { 232 std::shared_ptr<ITaskHandler> task_ { nullptr }; 233 uint64_t executeTimeNs_ { 0ULL }; 234 }; 235 __attribute__((no_sanitize("cfi"))) void TaskProcessor(); 236 __attribute__((no_sanitize("cfi"))) void CancelNotExecutedTaskLocked(); 237 238 bool isExit_ = true; 239 std::unique_ptr<std::thread> thread_; 240 std::list<TaskHandlerItem> taskList_; 241 std::mutex mutex_; 242 std::condition_variable cond_; 243 std::string name_; 244 pid_t tid_ = -1; 245 bool isTaskExecuting_ = false; 246 }; 247 } // namespace Media 248 } // namespace OHOS 249 #endif 250