1 /*
2 * Copyright (C) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifndef OH_VEF_RENDER_THREAD_H
16 #define OH_VEF_RENDER_THREAD_H
17
18 #include "render/graphics/base/queue/RenderFifoQueue.h"
19 #include "render/graphics/base/task/render_task_interface.h"
20 #include "render/graphics/base/worker/render_work_interface.h"
21 #include <thread>
22
23 #define RENDER_QUEUE_SIZE 4
24 #define COMMON_TASK_TAG 0
25 #define PREVIEW_TASK_TAG 1
26 #define EXPORT_TASK_TAG 2
27 #define SLEEP_TIMEOUT 1000
28
29 namespace OHOS {
30 namespace Media {
31 template <typename QUEUE = RenderFifoQueue<RenderTaskPtr<void>>>
32 class RenderThread : public RenderWorkerItf<typename QUEUE::DataType> {
33 public:
34 typedef typename QUEUE::DataType LocalTaskType;
35 static_assert(std::is_base_of<RenderQueueItf<LocalTaskType>, QUEUE>::value,
36 "QUEUE should be derived from RenderQueueItf");
37
38 explicit RenderThread(
39 size_t, std::function<void()> idleTsk = []() {});
40 ~RenderThread();
41 void AddTask(
42 const LocalTaskType&, bool overwrite = false, std::function<void()> callback = []() {}) override;
43 void Start() override;
44 void Stop() override;
45 void ClearTaskQueue();
46
47 protected:
48 void Run() override;
49
50 QUEUE* localMsgQueue_ = nullptr;
51 volatile bool isWorking_ = false;
52 volatile bool isStopped_ = true;
53
54 std::mutex cvMutex_;
55 std::condition_variable cvFull_;
56 std::condition_variable cvEmpty_;
57 std::function<void()> idleTask_;
58
59 std::thread* thread_{ nullptr };
60 size_t qSize_;
61 };
62
63 template <typename QUEUE>
RenderThread(size_t queueSize,std::function<void ()> idleTsk)64 RenderThread<QUEUE>::RenderThread(size_t queueSize, std::function<void()> idleTsk)
65 : idleTask_(idleTsk), qSize_(queueSize)
66 {
67 localMsgQueue_ = new (std::nothrow) QUEUE();
68 }
69
~RenderThread()70 template <typename QUEUE> RenderThread<QUEUE>::~RenderThread()
71 {
72 Stop();
73 if (thread_ != nullptr) {
74 thread_->join();
75 delete thread_;
76 thread_ = nullptr;
77 }
78 if (localMsgQueue_ != nullptr) {
79 delete localMsgQueue_;
80 localMsgQueue_ = nullptr;
81 }
82 }
83
84 template <typename QUEUE>
AddTask(const LocalTaskType & task,bool overwrite,std::function<void ()> callback)85 void RenderThread<QUEUE>::AddTask(const LocalTaskType& task, bool overwrite, std::function<void()> callback)
86 {
87 std::unique_lock<std::mutex> lk(cvMutex_);
88 if (localMsgQueue_ == nullptr) {
89 return;
90 }
91 cvFull_.wait(lk, [this, &task]() {
92 return (GetTag(task) == PREVIEW_TASK_TAG) || (localMsgQueue_->GetSize() < this->qSize_) || (!isWorking_);
93 });
94 if (isWorking_) {
95 if (overwrite) {
96 LocalTaskType thread_ = localMsgQueue_->Find(task);
97 if (thread_ != nullptr) {
98 SetFunc(thread_, callback);
99 SetTag(thread_, COMMON_TASK_TAG);
100 }
101 }
102 localMsgQueue_->Push(task);
103 lk.unlock();
104 cvEmpty_.notify_one();
105 }
106 }
107
Start()108 template <typename QUEUE> void RenderThread<QUEUE>::Start()
109 {
110 if (isStopped_) {
111 {
112 std::unique_lock<std::mutex> lk(cvMutex_);
113 isWorking_ = true;
114 }
115 thread_ = new (std::nothrow) std::thread([this]() {
116 this->isStopped_ = false;
117 this->Run();
118 this->isStopped_ = true;
119 });
120 while (isStopped_) {
121 std::this_thread::sleep_for(std::chrono::microseconds(SLEEP_TIMEOUT));
122 }
123 }
124 }
125
Stop()126 template <typename QUEUE> void RenderThread<QUEUE>::Stop()
127 {
128 {
129 std::unique_lock<std::mutex> lk(cvMutex_);
130 isWorking_ = false;
131 }
132 cvEmpty_.notify_all();
133 while (!isStopped_) {
134 std::this_thread::sleep_for(std::chrono::microseconds(SLEEP_TIMEOUT));
135 }
136 }
137
ClearTaskQueue()138 template <typename QUEUE> void RenderThread<QUEUE>::ClearTaskQueue()
139 {
140 std::unique_lock<std::mutex> lk(cvMutex_);
141 if (localMsgQueue_ == nullptr) {
142 return;
143 }
144 localMsgQueue_->RemoveAll();
145 }
146
Run()147 template <typename QUEUE> void RenderThread<QUEUE>::Run()
148 {
149 while (isWorking_) {
150 std::unique_lock<std::mutex> lk(cvMutex_);
151 if (localMsgQueue_ == nullptr) {
152 return;
153 }
154 bool cvRet = cvEmpty_.wait_for(lk, std::chrono::milliseconds(2500),
155 [this]() { return (localMsgQueue_->GetSize() > 0) || (!isWorking_); });
156 if (cvRet) {
157 LocalTaskType task;
158 bool ret = localMsgQueue_->Pop(task);
159 lk.unlock();
160 cvFull_.notify_one();
161 if (ret) {
162 task->Run();
163 }
164 } else {
165 lk.unlock();
166 idleTask_();
167 }
168 }
169 };
170 }
171 }
172
173 #endif