• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef RECORDER_TASK_QUEUE_H
17 #define RECORDER_TASK_QUEUE_H
18 
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include "media_errors.h"
#include "nocopyable.h"
29 
30 namespace OHOS {
31 namespace Media {
32 /**
33  * Simple Generalized Task Queues for Easier Implementation of Asynchronous Programming Models
34  *
35  * You can refer to following examples to use this utility.
36  *
 * Example 1:
 * TaskQueue taskQ("your_task_queue_name");
 * taskQ.Start();
 * auto handler1 = std::make_shared<TaskHandler<int32_t>>([]() -> int32_t {
 *     // your job's detail code;
 *     return 0; // the returned value is what result.Value() yields below
 * });
 * taskQ.EnqueueTask(handler1);
 * auto result = handler1->GetResult();
 * if (result.HasResult()) {
 *     MEDIA_LOGI("handler1 executed, result: %{public}d", result.Value());
 * } else {
 *     MEDIA_LOGI("handler1 not executed");
 * }
50  *
51  * Example 2:
52  * TaskQueue taskQ("your_task_queue_name");
53  * taskQ.Start();
54  * auto handler2 = std::make_shared<TaskHandler<void>>([]() {
55  *     // your job's detail code;
56  * });
57  * taskQ.EnqueueTask(handler2);
58  * auto result = handler2->GetResult();
59  * if (result.HasResult()) {
60  *     MEDIA_LOGI("handler2 executed");
61  * } else {
62  *     MEDIA_LOGI("handler2 not executed");
63  * }
64  */
65 
class TaskQueue;
template <typename T>
class TaskHandler;

/**
 * Outcome of a task, as returned by TaskHandler<T>::GetResult().
 *
 * The stored value is private; only the befriended TaskHandler<T> can fill it in.
 * A default-constructed TaskResult holds no value.
 */
template <typename T>
struct TaskResult {
    /**
     * @return true when a value is stored, false otherwise.
     */
    bool HasResult() const
    {
        return val.has_value();
    }

    /**
     * @return a copy of the stored value.
     *
     * Forwards to std::optional::value(), so check HasResult() first: on an empty
     * result std::optional::value() reports std::bad_optional_access.
     */
    T Value() const
    {
        return val.value();
    }

private:
    friend class TaskHandler<T>;
    std::optional<T> val;
};

/**
 * Specialization for tasks that return nothing: there is no value to carry,
 * only a flag that the befriended TaskHandler<void> sets once the task has run.
 */
template <>
struct TaskResult<void> {
    /**
     * @return true when the executed flag has been set, false otherwise.
     */
    bool HasResult() const
    {
        return executed;
    }

private:
    friend class TaskHandler<void>;
    bool executed = false;
};
95 
/**
 * Type-independent interface of a task, so that tasks with different result
 * types can be handled through one pointer type (TaskQueue is a friend and is
 * the only outside class allowed to call the private Clear()).
 */
class ITaskHandler {
public:
    /**
     * Execution attributes attached to a task.
     */
    struct Attribute {
        // Periodic execution time (the "Us" suffix suggests microseconds);
        // UINT64_MAX means the task does not need to be executed periodically.
        uint64_t periodicTimeUs_ = UINT64_MAX;
    };

    virtual ~ITaskHandler() = default;

    // Run the task.
    virtual void Execute() = 0;

    // Request that the task is not run.
    virtual void Cancel() = 0;

    // Tell whether the task is in the canceled state.
    virtual bool IsCanceled() = 0;

    // Return the attributes the task was created with.
    virtual Attribute GetAttribute() const = 0;

private:
    friend class TaskQueue;

    // Clear the internal executed or canceled state.
    virtual void Clear() = 0;
};
113 
114 template <typename T>
115 class TaskHandler : public ITaskHandler, public NoCopyable {
116 public:
117     TaskHandler(std::function<T(void)> task, ITaskHandler::Attribute attr = {}) : task_(task), attribute_(attr) {}
118     ~TaskHandler() = default;
119 
120     void Execute() override
121     {
122         {
123             std::unique_lock<std::mutex> lock(mutex_);
124             if (state_ != TaskState::IDLE) {
125                 return;
126             }
127             state_ = TaskState::RUNNING;
128         }
129 
130         if constexpr (std::is_void_v<T>) {
131             task_();
132             std::unique_lock<std::mutex> lock(mutex_);
133             state_ = TaskState::FINISHED;
134             result_.executed = true;
135         } else {
136             T result = task_();
137             std::unique_lock<std::mutex> lock(mutex_);
138             state_ = TaskState::FINISHED;
139             result_.val = result;
140         }
141         cond_.notify_all();
142     }
143 
144     /*
145      * After the GetResult called, the last execute result will be clear
146      */
147     TaskResult<T> GetResult()
148     {
149         std::unique_lock<std::mutex> lock(mutex_);
150         while ((state_ != TaskState::FINISHED) && (state_ != TaskState::CANCELED)) {
151             cond_.wait(lock);
152         }
153 
154         return ClearResult();
155     }
156 
157     void Cancel() override
158     {
159         std::unique_lock<std::mutex> lock(mutex_);
160         if (state_ != RUNNING) {
161             state_ = TaskState::CANCELED;
162             cond_.notify_all();
163         }
164     }
165 
166     bool IsCanceled() override
167     {
168         std::unique_lock<std::mutex> lock(mutex_);
169         return state_ == TaskState::CANCELED;
170     }
171 
172     ITaskHandler::Attribute GetAttribute() const override
173     {
174         return attribute_;
175     }
176 
177 private:
178     TaskResult<T> ClearResult()
179     {
180         if (state_ == TaskState::FINISHED) {
181             state_ = TaskState::IDLE;
182             TaskResult<T> tmp;
183             if constexpr (std::is_void_v<T>) {
184                 std::swap(tmp.executed, result_.executed);
185             } else {
186                 result_.val.swap(tmp.val);
187             }
188             return tmp;
189         }
190         return result_;
191     }
192 
193     void Clear() override
194     {
195         std::unique_lock<std::mutex> lock(mutex_);
196         (void)ClearResult();
197     }
198 
199     enum TaskState {
200         IDLE = 0,
201         RUNNING = 1,
202         CANCELED = 2,
203         FINISHED = 3,
204     };
205 
206     TaskState state_ = TaskState::IDLE;
207     std::mutex mutex_;
208     std::condition_variable cond_;
209     std::function<T(void)> task_;
210     TaskResult<T> result_;
211     ITaskHandler::Attribute attribute_; // task execute attribute.
212 };
213 
214 class __attribute__((visibility("default"))) TaskQueue : public NoCopyable {
215 public:
216     explicit TaskQueue(const std::string &name) : name_(name) {}
217     ~TaskQueue();
218 
219     int32_t Start();
220     int32_t Stop() noexcept;
221 
222     // delayUs cannot be gt 10000000ULL.
223     __attribute__((no_sanitize("cfi"))) int32_t EnqueueTask(const std::shared_ptr<ITaskHandler> &task,
224         bool cancelNotExecuted = false, uint64_t delayUs = 0ULL);
225 
226 private:
227     struct TaskHandlerItem {
228         std::shared_ptr<ITaskHandler> task_ { nullptr };
229         uint64_t executeTimeNs_ { 0ULL };
230     };
231     __attribute__((no_sanitize("cfi"))) void TaskProcessor();
232     __attribute__((no_sanitize("cfi"))) void CancelNotExecutedTaskLocked();
233 
234     bool isExit_ = true;
235     std::unique_ptr<std::thread> thread_;
236     std::list<TaskHandlerItem> taskList_;
237     std::mutex mutex_;
238     std::condition_variable cond_;
239     std::string name_;
240 };
241 } // namespace Media
242 } // namespace OHOS
243 #endif
244