• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef OH_VEF_RENDER_THREAD_H
16 #define OH_VEF_RENDER_THREAD_H
17 
#include "render/graphics/base/queue/RenderFifoQueue.h"
#include "render/graphics/base/task/render_task_interface.h"
#include "render/graphics/base/worker/render_work_interface.h"
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <new>
#include <thread>
#include <type_traits>
#include <utility>
22 
// Task tags. AddTask() lets PREVIEW_TASK_TAG tasks skip the queue-capacity wait
// and re-tags an overwritten queued task as COMMON_TASK_TAG.
// EXPORT_TASK_TAG is not referenced in this header (presumably used by callers).
#define COMMON_TASK_TAG 0
#define PREVIEW_TASK_TAG 1
#define EXPORT_TASK_TAG 2

// Not referenced in this header; presumably the default capacity callers pass
// to the RenderThread constructor.
#define RENDER_QUEUE_SIZE 4

// Polling interval, in microseconds, used while waiting for the worker thread
// to change state (see Stop()).
#define SLEEP_TIMEOUT 1000
28 
29 namespace OHOS {
30 namespace Media {
31 template <typename QUEUE = RenderFifoQueue<RenderTaskPtr<void>>>
32 class RenderThread : public RenderWorkerItf<typename QUEUE::DataType> {
33 public:
34     typedef typename QUEUE::DataType LocalTaskType;
35     static_assert(std::is_base_of<RenderQueueItf<LocalTaskType>, QUEUE>::value,
36         "QUEUE should be derived from RenderQueueItf");
37 
38     explicit RenderThread(
39         size_t, std::function<void()> idleTsk = []() {});
40     ~RenderThread();
41     void AddTask(
42         const LocalTaskType&, bool overwrite = false, std::function<void()> callback = []() {}) override;
43     void Start() override;
44     void Stop() override;
45     void ClearTaskQueue();
46 
47 protected:
48     void Run() override;
49 
50     QUEUE* localMsgQueue_ = nullptr;
51     volatile bool isWorking_ = false;
52     volatile bool isStopped_ = true;
53 
54     std::mutex cvMutex_;
55     std::condition_variable cvFull_;
56     std::condition_variable cvEmpty_;
57     std::function<void()> idleTask_;
58 
59     std::thread* thread_{ nullptr };
60     size_t qSize_;
61 };
62 
63 template <typename QUEUE>
RenderThread(size_t queueSize,std::function<void ()> idleTsk)64 RenderThread<QUEUE>::RenderThread(size_t queueSize, std::function<void()> idleTsk)
65     : idleTask_(idleTsk), qSize_(queueSize)
66 {
67     localMsgQueue_ = new (std::nothrow) QUEUE();
68 }
69 
~RenderThread()70 template <typename QUEUE> RenderThread<QUEUE>::~RenderThread()
71 {
72     Stop();
73     if (thread_ != nullptr) {
74         thread_->join();
75         delete thread_;
76         thread_ = nullptr;
77     }
78     if (localMsgQueue_ != nullptr) {
79         delete localMsgQueue_;
80         localMsgQueue_ = nullptr;
81     }
82 }
83 
84 template <typename QUEUE>
AddTask(const LocalTaskType & task,bool overwrite,std::function<void ()> callback)85 void RenderThread<QUEUE>::AddTask(const LocalTaskType& task, bool overwrite, std::function<void()> callback)
86 {
87     std::unique_lock<std::mutex> lk(cvMutex_);
88     if (localMsgQueue_ == nullptr) {
89         return;
90     }
91     cvFull_.wait(lk, [this, &task]() {
92         return (GetTag(task) == PREVIEW_TASK_TAG) || (localMsgQueue_->GetSize() < this->qSize_) || (!isWorking_);
93     });
94     if (isWorking_) {
95         if (overwrite) {
96             LocalTaskType thread_ = localMsgQueue_->Find(task);
97             if (thread_ != nullptr) {
98                 SetFunc(thread_, callback);
99                 SetTag(thread_, COMMON_TASK_TAG);
100             }
101         }
102         localMsgQueue_->Push(task);
103         lk.unlock();
104         cvEmpty_.notify_one();
105     }
106 }
107 
Start()108 template <typename QUEUE> void RenderThread<QUEUE>::Start()
109 {
110     if (isStopped_) {
111         {
112             std::unique_lock<std::mutex> lk(cvMutex_);
113             isWorking_ = true;
114         }
115         thread_ = new (std::nothrow) std::thread([this]() {
116             this->isStopped_ = false;
117             this->Run();
118             this->isStopped_ = true;
119         });
120         while (isStopped_) {
121             std::this_thread::sleep_for(std::chrono::microseconds(SLEEP_TIMEOUT));
122         }
123     }
124 }
125 
Stop()126 template <typename QUEUE> void RenderThread<QUEUE>::Stop()
127 {
128     {
129         std::unique_lock<std::mutex> lk(cvMutex_);
130         isWorking_ = false;
131     }
132     cvEmpty_.notify_all();
133     while (!isStopped_) {
134         std::this_thread::sleep_for(std::chrono::microseconds(SLEEP_TIMEOUT));
135     }
136 }
137 
ClearTaskQueue()138 template <typename QUEUE> void RenderThread<QUEUE>::ClearTaskQueue()
139 {
140     std::unique_lock<std::mutex> lk(cvMutex_);
141     if (localMsgQueue_ == nullptr) {
142         return;
143     }
144     localMsgQueue_->RemoveAll();
145 }
146 
Run()147 template <typename QUEUE> void RenderThread<QUEUE>::Run()
148 {
149     while (isWorking_) {
150         std::unique_lock<std::mutex> lk(cvMutex_);
151         if (localMsgQueue_ == nullptr) {
152             return;
153         }
154         bool cvRet = cvEmpty_.wait_for(lk, std::chrono::milliseconds(2500),
155             [this]() { return (localMsgQueue_->GetSize() > 0) || (!isWorking_); });
156         if (cvRet) {
157             LocalTaskType task;
158             bool ret = localMsgQueue_->Pop(task);
159             lk.unlock();
160             cvFull_.notify_one();
161             if (ret) {
162                 task->Run();
163             }
164         } else {
165             lk.unlock();
166             idleTask_();
167         }
168     }
169 };
170 }
171 }
172 
173 #endif