• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef RECORDER_TASK_QUEUE_H
17 #define RECORDER_TASK_QUEUE_H
18 
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include "media_errors.h"
#include "nocopyable.h"
#include "qos.h"
30 
31 namespace OHOS {
32 namespace Media {
/**
 * A simple, general-purpose task queue that makes asynchronous programming models easier to implement.
 *
 * The following examples show how to use this utility.
 *
 * Example 1: a task that returns a value.
 * TaskQueue taskQ("your_task_queue_name");
 * taskQ.Start();
 * auto handler1 = std::make_shared<TaskHandler<int32_t>>([]() {
 *     // your job's detail code;
 *     return 0; // the value later reported by result.Value()
 * });
 * taskQ.EnqueueTask(handler1);
 * auto result = handler1->GetResult();
 * if (result.HasResult()) {
 *     MEDIA_LOGI("handler1 executed, result: %{public}d", result.Value());
 * } else {
 *     MEDIA_LOGI("handler1 not executed");
 * }
 *
 * Example 2: a task that returns nothing.
 * TaskQueue taskQ("your_task_queue_name");
 * taskQ.Start();
 * auto handler2 = std::make_shared<TaskHandler<void>>([]() {
 *     // your job's detail code;
 * });
 * taskQ.EnqueueTask(handler2);
 * auto result = handler2->GetResult();
 * if (result.HasResult()) {
 *     MEDIA_LOGI("handler2 executed");
 * } else {
 *     MEDIA_LOGI("handler2 not executed");
 * }
 */
66 
class TaskQueue;
template <typename T>
class TaskHandler;

/**
 * Result of a task that returns a value of type T.
 *
 * A default-constructed TaskResult holds no value. Only TaskHandler<T>, which is a
 * friend, can store one, so user code treats this type as read-only.
 */
template <typename T>
struct TaskResult {
    // Returns true when a value is stored.
    bool HasResult() const
    {
        return val.has_value();
    }

    // Returns a copy of the stored value. Check HasResult() first: calling this on an
    // empty result goes through std::optional::value(), which reports the error as
    // std::bad_optional_access.
    T Value() const
    {
        return val.value();
    }

private:
    friend class TaskHandler<T>;
    std::optional<T> val;
};

/**
 * Result of a task that returns nothing: only records whether the task was executed.
 *
 * The flag starts out false and can only be set by the friend TaskHandler<void>.
 */
template <>
struct TaskResult<void> {
    // Returns true when the executed flag has been set.
    bool HasResult() const
    {
        return executed;
    }

private:
    friend class TaskHandler<void>;
    bool executed = false;
};
96 
/**
 * Interface through which a task is executed, canceled and queried without knowing
 * the type of value it produces.
 */
class ITaskHandler {
public:
    // Execution settings attached to a task.
    struct Attribute {
        // Periodic execution time (the "Us" suffix suggests microseconds).
        // UINT64_MAX means the task does not need to be executed periodically.
        uint64_t periodicTimeUs_ = UINT64_MAX;
    };

    virtual ~ITaskHandler() = default;

    virtual void Execute() = 0;
    virtual void Cancel() = 0;
    virtual bool IsCanceled() = 0;
    virtual Attribute GetAttribute() const = 0;

private:
    // TaskQueue is the only outside class allowed to call Clear().
    friend class TaskQueue;

    // Clears the internal executed or canceled state.
    virtual void Clear() = 0;
};
114 
115 template <typename T>
116 class TaskHandler : public ITaskHandler, public NoCopyable {
117 public:
118     TaskHandler(std::function<T(void)> task, ITaskHandler::Attribute attr = {}) : task_(task), attribute_(attr) {}
119     ~TaskHandler() = default;
120 
121     void Execute() override
122     {
123         {
124             std::unique_lock<std::mutex> lock(mutex_);
125             if (state_ != TaskState::IDLE) {
126                 return;
127             }
128             state_ = TaskState::RUNNING;
129         }
130 
131         if constexpr (std::is_void_v<T>) {
132             task_();
133             std::unique_lock<std::mutex> lock(mutex_);
134             state_ = TaskState::FINISHED;
135             result_.executed = true;
136         } else {
137             T result = task_();
138             std::unique_lock<std::mutex> lock(mutex_);
139             state_ = TaskState::FINISHED;
140             result_.val = result;
141         }
142         cond_.notify_all();
143     }
144 
145     /*
146      * After the GetResult called, the last execute result will be clear
147      */
148     TaskResult<T> GetResult()
149     {
150         std::unique_lock<std::mutex> lock(mutex_);
151         while ((state_ != TaskState::FINISHED) && (state_ != TaskState::CANCELED)) {
152             cond_.wait(lock);
153         }
154 
155         return ClearResult();
156     }
157 
158     void Cancel() override
159     {
160         std::unique_lock<std::mutex> lock(mutex_);
161         if (state_ != RUNNING) {
162             state_ = TaskState::CANCELED;
163             cond_.notify_all();
164         }
165     }
166 
167     bool IsCanceled() override
168     {
169         std::unique_lock<std::mutex> lock(mutex_);
170         return state_ == TaskState::CANCELED;
171     }
172 
173     ITaskHandler::Attribute GetAttribute() const override
174     {
175         return attribute_;
176     }
177 
178 private:
179     TaskResult<T> ClearResult()
180     {
181         if (state_ == TaskState::FINISHED) {
182             state_ = TaskState::IDLE;
183             TaskResult<T> tmp;
184             if constexpr (std::is_void_v<T>) {
185                 std::swap(tmp.executed, result_.executed);
186             } else {
187                 result_.val.swap(tmp.val);
188             }
189             return tmp;
190         }
191         return result_;
192     }
193 
194     void Clear() override
195     {
196         std::unique_lock<std::mutex> lock(mutex_);
197         (void)ClearResult();
198     }
199 
200     enum TaskState {
201         IDLE = 0,
202         RUNNING = 1,
203         CANCELED = 2,
204         FINISHED = 3,
205     };
206 
207     TaskState state_ = TaskState::IDLE;
208     std::mutex mutex_;
209     std::condition_variable cond_;
210     std::function<T(void)> task_;
211     TaskResult<T> result_;
212     ITaskHandler::Attribute attribute_; // task execute attribute.
213 };
214 
215 class __attribute__((visibility("default"))) TaskQueue : public NoCopyable {
216 public:
217     explicit TaskQueue(const std::string &name) : name_(name) {}
218     ~TaskQueue();
219 
220     int32_t Start();
221     int32_t Stop() noexcept;
222     void SetQos(const OHOS::QOS::QosLevel level);
223     void ResetQos();
224     bool IsTaskExecuting();
225 
226     // delayUs cannot be gt 10000000ULL.
227     __attribute__((no_sanitize("cfi"))) int32_t EnqueueTask(const std::shared_ptr<ITaskHandler> &task,
228         bool cancelNotExecuted = false, uint64_t delayUs = 0ULL);
229 
230 private:
231     struct TaskHandlerItem {
232         std::shared_ptr<ITaskHandler> task_ { nullptr };
233         uint64_t executeTimeNs_ { 0ULL };
234     };
235     __attribute__((no_sanitize("cfi"))) void TaskProcessor();
236     __attribute__((no_sanitize("cfi"))) void CancelNotExecutedTaskLocked();
237 
238     bool isExit_ = true;
239     std::unique_ptr<std::thread> thread_;
240     std::list<TaskHandlerItem> taskList_;
241     std::mutex mutex_;
242     std::condition_variable cond_;
243     std::string name_;
244     pid_t tid_ = -1;
245     bool isTaskExecuting_ = false;
246 };
247 } // namespace Media
248 } // namespace OHOS
249 #endif
250