1 /* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef RECORDER_TASK_QUEUE_H 17 #define RECORDER_TASK_QUEUE_H 18 19 #include <thread> 20 #include <condition_variable> 21 #include <mutex> 22 #include <functional> 23 #include <list> 24 #include <string> 25 #include <optional> 26 #include <type_traits> 27 #include "media_errors.h" 28 #include "nocopyable.h" 29 30 namespace OHOS { 31 namespace Media { 32 /** 33 * Simple Generalized Task Queues for Easier Implementation of Asynchronous Programming Models 34 * 35 * You can refer to following examples to use this utility. 36 * 37 * Example 1: 38 * TaskQueue taskQ("your_task_queue_name"); 39 * taskQ.Start(); 40 * auto handler1 = std::make_shared<TaskHandler<int32_t>>([]() { 41 * // your job's detail code; 42 * }); 43 * taskQ.EnqueueTask(handler1); 44 * auto result = handler1->GetResult(); 45 * if (result.HasResult()) { 46 * MEDIA_LOGI("handler1 executed, result: %{public}d", result.Value()); 47 * } else { 48 * MEDIA_LOGI("handler1 not executed"); 49 * } 50 * 51 * Example 2: 52 * TaskQueue taskQ("your_task_queue_name"); 53 * taskQ.Start(); 54 * auto handler2 = std::make_shared<TaskHandler<void>>([]() { 55 * // your job's detail code; 56 * }); 57 * taskQ.EnqueueTask(handler2); 58 * auto result = handler2->GetResult(); 59 * if (result.HasResult()) { 60 * MEDIA_LOGI("handler2 executed"); 61 * } else { 62 * MEDIA_LOGI("handler2 not executed"); 63 * } 64 */ 65 66 class TaskQueue; 67 template <typename T> 68 class TaskHandler; 69 70 template <typename T> 71 struct TaskResult { HasResultTaskResult72 bool HasResult() 73 { 74 return val.has_value(); 75 } ValueTaskResult76 T Value() 77 { 78 return val.value(); 79 } 80 private: 81 friend class TaskHandler<T>; 82 std::optional<T> val; 83 }; 84 85 template <> 86 struct TaskResult<void> { 87 bool HasResult() 88 { 89 return executed; 90 } 91 private: 92 friend class TaskHandler<void>; 93 bool executed = false; 94 }; 95 96 class ITaskHandler { 97 public: 98 struct Attribute { 99 // periodic execute time, UINT64_MAX is not need to execute periodic. 100 uint64_t periodicTimeUs_ { UINT64_MAX }; 101 }; 102 virtual ~ITaskHandler() = default; 103 virtual void Execute() = 0; 104 virtual void Cancel() = 0; 105 virtual bool IsCanceled() = 0; 106 virtual Attribute GetAttribute() const = 0; 107 108 private: 109 // clear the internel executed or canceled state. 110 virtual void Clear() = 0; 111 friend class TaskQueue; 112 }; 113 114 template <typename T> 115 class TaskHandler : public ITaskHandler, public NoCopyable { 116 public: 117 TaskHandler(std::function<T(void)> task, ITaskHandler::Attribute attr = {}) : task_(task), attribute_(attr) {} 118 ~TaskHandler() = default; 119 120 void Execute() override 121 { 122 { 123 std::unique_lock<std::mutex> lock(mutex_); 124 if (state_ != TaskState::IDLE) { 125 return; 126 } 127 state_ = TaskState::RUNNING; 128 } 129 130 if constexpr (std::is_void_v<T>) { 131 task_(); 132 std::unique_lock<std::mutex> lock(mutex_); 133 state_ = TaskState::FINISHED; 134 result_.executed = true; 135 } else { 136 T result = task_(); 137 std::unique_lock<std::mutex> lock(mutex_); 138 state_ = TaskState::FINISHED; 139 result_.val = result; 140 } 141 cond_.notify_all(); 142 } 143 144 /* 145 * After the GetResult called, the last execute result will be clear 146 */ 147 TaskResult<T> GetResult() 148 { 149 std::unique_lock<std::mutex> lock(mutex_); 150 while ((state_ != TaskState::FINISHED) && (state_ != TaskState::CANCELED)) { 151 cond_.wait(lock); 152 } 153 154 return ClearResult(); 155 } 156 157 void Cancel() override 158 { 159 std::unique_lock<std::mutex> lock(mutex_); 160 if (state_ != RUNNING) { 161 state_ = TaskState::CANCELED; 162 cond_.notify_all(); 163 } 164 } 165 166 bool IsCanceled() override 167 { 168 std::unique_lock<std::mutex> lock(mutex_); 169 return state_ == TaskState::CANCELED; 170 } 171 172 ITaskHandler::Attribute GetAttribute() const override 173 { 174 return attribute_; 175 } 176 177 private: 178 TaskResult<T> ClearResult() 179 { 180 if (state_ == TaskState::FINISHED) { 181 state_ = TaskState::IDLE; 182 TaskResult<T> tmp; 183 if constexpr (std::is_void_v<T>) { 184 std::swap(tmp.executed, result_.executed); 185 } else { 186 result_.val.swap(tmp.val); 187 } 188 return tmp; 189 } 190 return result_; 191 } 192 193 void Clear() override 194 { 195 std::unique_lock<std::mutex> lock(mutex_); 196 (void)ClearResult(); 197 } 198 199 enum TaskState { 200 IDLE = 0, 201 RUNNING = 1, 202 CANCELED = 2, 203 FINISHED = 3, 204 }; 205 206 TaskState state_ = TaskState::IDLE; 207 std::mutex mutex_; 208 std::condition_variable cond_; 209 std::function<T(void)> task_; 210 TaskResult<T> result_; 211 ITaskHandler::Attribute attribute_; // task execute attribute. 212 }; 213 214 class __attribute__((visibility("default"))) TaskQueue : public NoCopyable { 215 public: 216 explicit TaskQueue(const std::string &name) : name_(name) {} 217 ~TaskQueue(); 218 219 int32_t Start(); 220 int32_t Stop() noexcept; 221 222 // delayUs cannot be gt 10000000ULL. 223 __attribute__((no_sanitize("cfi"))) int32_t EnqueueTask(const std::shared_ptr<ITaskHandler> &task, 224 bool cancelNotExecuted = false, uint64_t delayUs = 0ULL); 225 226 private: 227 struct TaskHandlerItem { 228 std::shared_ptr<ITaskHandler> task_ { nullptr }; 229 uint64_t executeTimeNs_ { 0ULL }; 230 }; 231 __attribute__((no_sanitize("cfi"))) void TaskProcessor(); 232 __attribute__((no_sanitize("cfi"))) void CancelNotExecutedTaskLocked(); 233 234 bool isExit_ = true; 235 std::unique_ptr<std::thread> thread_; 236 std::list<TaskHandlerItem> taskList_; 237 std::mutex mutex_; 238 std::condition_variable cond_; 239 std::string name_; 240 }; 241 } // namespace Media 242 } // namespace OHOS 243 #endif 244