/*
 * Copyright (C) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15
16 #include "thumbnail_generate_worker.h"
17
18 #include <pthread.h>
19
20 #include "medialibrary_errno.h"
21 #include "medialibrary_notify.h"
22 #include "media_log.h"
23
24 namespace OHOS {
25 namespace Media {
// Number of worker threads Init() starts for a FOREGROUND worker.
static constexpr int32_t THREAD_NUM_FOREGROUND{4};
// Number of worker threads Init() starts for a BACKGROUND worker.
static constexpr int32_t THREAD_NUM_BACKGROUND{2};
// Not referenced by the code visible in this file.
constexpr size_t TASK_INSERT_COUNT{15};
// Idle period, used as milliseconds by WaitForTask() and passed as the interval of the
// worker-cleanup timer in RegisterWorkerTimer().
constexpr size_t CLOSE_THUMBNAIL_WORKER_TIME_INTERVAL{270000};
30
~ThumbnailGenerateWorker()31 ThumbnailGenerateWorker::~ThumbnailGenerateWorker()
32 {
33 ClearWorkerThreads();
34
35 if (timerId_ != 0) {
36 timer_.Unregister(timerId_);
37 timer_.Shutdown();
38 timerId_ = 0;
39 }
40 }
41
Init(const ThumbnailTaskType & taskType)42 int32_t ThumbnailGenerateWorker::Init(const ThumbnailTaskType &taskType)
43 {
44 std::unique_lock<std::mutex> lock(taskMutex_);
45 if (!threads_.empty()) {
46 return E_OK;
47 }
48
49 MEDIA_INFO_LOG("threads empty, need to init, taskType:%{public}d", taskType_);
50 int32_t threadNum;
51 std::string threadName;
52 taskType_ = taskType;
53 if (taskType == ThumbnailTaskType::FOREGROUND) {
54 threadNum = THREAD_NUM_FOREGROUND;
55 threadName = THREAD_NAME_FOREGROUND;
56 } else if (taskType == ThumbnailTaskType::BACKGROUND) {
57 threadNum = THREAD_NUM_BACKGROUND;
58 threadName = THREAD_NAME_BACKGROUND;
59 } else {
60 MEDIA_ERR_LOG("invalid task type");
61 return E_ERR;
62 }
63
64 isThreadRunning_ = true;
65 for (auto i = 0; i < threadNum; i++) {
66 std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus =
67 std::make_shared<ThumbnailGenerateThreadStatus>(i);
68 std::thread thread([this, threadStatus] { this->StartWorker(threadStatus); });
69 pthread_setname_np(thread.native_handle(), threadName.c_str());
70 threads_.emplace_back(std::move(thread));
71 threadsStatus_.emplace_back(threadStatus);
72 }
73 lock.unlock();
74 RegisterWorkerTimer();
75 return E_OK;
76 }
77
ReleaseTaskQueue(const ThumbnailTaskPriority & taskPriority)78 int32_t ThumbnailGenerateWorker::ReleaseTaskQueue(const ThumbnailTaskPriority &taskPriority)
79 {
80 if (taskPriority == ThumbnailTaskPriority::HIGH) {
81 highPriorityTaskQueue_.Clear();
82 } else if (taskPriority == ThumbnailTaskPriority::MID) {
83 midPriorityTaskQueue_.Clear();
84 } else if (taskPriority == ThumbnailTaskPriority::LOW) {
85 lowPriorityTaskQueue_.Clear();
86 } else {
87 MEDIA_ERR_LOG("invalid task priority");
88 return E_ERR;
89 }
90 return E_OK;
91 }
92
AddTask(const std::shared_ptr<ThumbnailGenerateTask> & task,const ThumbnailTaskPriority & taskPriority)93 int32_t ThumbnailGenerateWorker::AddTask(
94 const std::shared_ptr<ThumbnailGenerateTask> &task, const ThumbnailTaskPriority &taskPriority)
95 {
96 IncreaseRequestIdTaskNum(task);
97 if (taskPriority == ThumbnailTaskPriority::HIGH) {
98 highPriorityTaskQueue_.Push(task);
99 } else if (taskPriority == ThumbnailTaskPriority::MID) {
100 midPriorityTaskQueue_.Push(task);
101 } else if (taskPriority == ThumbnailTaskPriority::LOW) {
102 lowPriorityTaskQueue_.Push(task);
103 } else {
104 MEDIA_ERR_LOG("invalid task priority");
105 return E_ERR;
106 }
107
108 Init(taskType_);
109 workerCv_.notify_one();
110 return E_OK;
111 }
112
IgnoreTaskByRequestId(int32_t requestId)113 void ThumbnailGenerateWorker::IgnoreTaskByRequestId(int32_t requestId)
114 {
115 if (highPriorityTaskQueue_.Empty() && midPriorityTaskQueue_.Empty() && lowPriorityTaskQueue_.Empty()) {
116 MEDIA_INFO_LOG("task queue empty, no need to ignore task requestId: %{public}d", requestId);
117 return;
118 }
119 ignoreRequestId_.store(requestId);
120
121 std::unique_lock<std::mutex> lock(requestIdMapLock_);
122 requestIdTaskMap_.erase(requestId);
123 MEDIA_INFO_LOG("IgnoreTaskByRequestId, requestId: %{public}d", requestId);
124 }
125
WaitForTask(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)126 bool ThumbnailGenerateWorker::WaitForTask(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)
127 {
128 std::unique_lock<std::mutex> lock(workerLock_);
129 if (highPriorityTaskQueue_.Empty() && midPriorityTaskQueue_.Empty() && lowPriorityTaskQueue_.Empty() &&
130 isThreadRunning_) {
131 ignoreRequestId_ = 0;
132 threadStatus->isThreadWaiting_ = true;
133 bool ret = workerCv_.wait_for(lock, std::chrono::milliseconds(CLOSE_THUMBNAIL_WORKER_TIME_INTERVAL), [this]() {
134 return !isThreadRunning_ || !highPriorityTaskQueue_.Empty() || !midPriorityTaskQueue_.Empty() ||
135 !lowPriorityTaskQueue_.Empty();
136 });
137 if (!ret) {
138 MEDIA_INFO_LOG("Wait for task timeout");
139 return false;
140 }
141 }
142 threadStatus->isThreadWaiting_ = false;
143 return isThreadRunning_;
144 }
145
StartWorker(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)146 void ThumbnailGenerateWorker::StartWorker(std::shared_ptr<ThumbnailGenerateThreadStatus> threadStatus)
147 {
148 std::string name("ThumbnailGenerateWorker");
149 pthread_setname_np(pthread_self(), name.c_str());
150 MEDIA_INFO_LOG("ThumbnailGenerateWorker thread start, taskType:%{public}d, id:%{public}d",
151 taskType_, threadStatus->threadId_);
152 while (isThreadRunning_) {
153 if (!WaitForTask(threadStatus)) {
154 continue;
155 }
156 std::shared_ptr<ThumbnailGenerateTask> task;
157 if (!highPriorityTaskQueue_.Empty() && highPriorityTaskQueue_.Pop(task) && task != nullptr) {
158 if (NeedIgnoreTask(task->data_->requestId_)) {
159 continue;
160 }
161 task->executor_(task->data_);
162 ++(threadStatus->taskNum_);
163 DecreaseRequestIdTaskNum(task);
164 continue;
165 }
166
167 if (!midPriorityTaskQueue_.Empty() && midPriorityTaskQueue_.Pop(task) && task != nullptr) {
168 if (NeedIgnoreTask(task->data_->requestId_)) {
169 continue;
170 }
171 task->executor_(task->data_);
172 ++(threadStatus->taskNum_);
173 DecreaseRequestIdTaskNum(task);
174 continue;
175 }
176
177 if (!lowPriorityTaskQueue_.Empty() && lowPriorityTaskQueue_.Pop(task) && task != nullptr) {
178 if (NeedIgnoreTask(task->data_->requestId_)) {
179 continue;
180 }
181 task->executor_(task->data_);
182 ++(threadStatus->taskNum_);
183 DecreaseRequestIdTaskNum(task);
184 }
185 }
186 MEDIA_INFO_LOG("ThumbnailGenerateWorker thread finish, taskType:%{public}d, id:%{public}d",
187 taskType_, threadStatus->threadId_);
188 }
189
NeedIgnoreTask(int32_t requestId)190 bool ThumbnailGenerateWorker::NeedIgnoreTask(int32_t requestId)
191 {
192 return ignoreRequestId_ != 0 && ignoreRequestId_ == requestId;
193 }
194
IncreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> & task)195 void ThumbnailGenerateWorker::IncreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> &task)
196 {
197 if (task == nullptr || task->data_->requestId_ == 0) {
198 // If requestId is 0, no need to manager this task.
199 return;
200 }
201
202 std::unique_lock<std::mutex> lock(requestIdMapLock_);
203 int32_t requestId = task->data_->requestId_;
204 auto pos = requestIdTaskMap_.find(requestId);
205 if (pos == requestIdTaskMap_.end()) {
206 requestIdTaskMap_.insert(std::make_pair(requestId, 1));
207 return;
208 }
209 ++(pos->second);
210 }
211
DecreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> & task)212 void ThumbnailGenerateWorker::DecreaseRequestIdTaskNum(const std::shared_ptr<ThumbnailGenerateTask> &task)
213 {
214 if (task == nullptr || task->data_->requestId_ == 0) {
215 // If requestId is 0, no need to manager this task.
216 return;
217 }
218
219 std::unique_lock<std::mutex> lock(requestIdMapLock_);
220 int32_t requestId = task->data_->requestId_;
221 auto pos = requestIdTaskMap_.find(requestId);
222 if (pos == requestIdTaskMap_.end()) {
223 return;
224 }
225
226 if (--(pos->second) == 0) {
227 requestIdTaskMap_.erase(requestId);
228 NotifyTaskFinished(requestId);
229 }
230 }
231
NotifyTaskFinished(int32_t requestId)232 void ThumbnailGenerateWorker::NotifyTaskFinished(int32_t requestId)
233 {
234 auto watch = MediaLibraryNotify::GetInstance();
235 if (watch == nullptr) {
236 MEDIA_ERR_LOG("watch is nullptr");
237 return;
238 }
239 std::string notifyUri = PhotoColumn::PHOTO_URI_PREFIX + std::to_string(requestId);
240 watch->Notify(notifyUri, NotifyType::NOTIFY_THUMB_ADD);
241 }
242
ClearWorkerThreads()243 void ThumbnailGenerateWorker::ClearWorkerThreads()
244 {
245 {
246 std::unique_lock<std::mutex> lock(workerLock_);
247 isThreadRunning_ = false;
248 }
249 ignoreRequestId_ = 0;
250 workerCv_.notify_all();
251 for (auto &thread : threads_) {
252 if (!thread.joinable()) {
253 continue;
254 }
255 thread.join();
256 }
257 threadsStatus_.clear();
258 threads_.clear();
259 MEDIA_INFO_LOG("Clear ThumbnailGenerateWorker threads successfully, taskType:%{public}d", taskType_);
260 }
261
IsAllThreadWaiting()262 bool ThumbnailGenerateWorker::IsAllThreadWaiting()
263 {
264 std::unique_lock<std::mutex> lock(workerLock_);
265 if (!highPriorityTaskQueue_.Empty() || !midPriorityTaskQueue_.Empty() || !lowPriorityTaskQueue_.Empty()) {
266 MEDIA_INFO_LOG("task queue is not empty, no need to clear worker threads, taskType:%{public}d", taskType_);
267 return false;
268 }
269
270 for (auto &threadStatus : threadsStatus_) {
271 if (!threadStatus->isThreadWaiting_) {
272 MEDIA_INFO_LOG("thread is running, taskType:%{public}d, id:%{public}d, taskNum:%{public}d",
273 taskType_, threadStatus->threadId_, threadStatus->taskNum_);
274 return false;
275 }
276 }
277 isThreadRunning_ = false;
278 return true;
279 }
280
TryClearWorkerThreads()281 void ThumbnailGenerateWorker::TryClearWorkerThreads()
282 {
283 std::unique_lock<std::mutex> lock(taskMutex_);
284 if (!IsAllThreadWaiting()) {
285 return;
286 }
287
288 ClearWorkerThreads();
289 }
290
RegisterWorkerTimer()291 void ThumbnailGenerateWorker::RegisterWorkerTimer()
292 {
293 Utils::Timer::TimerCallback timerCallback = [this]() {
294 MEDIA_INFO_LOG("ThumbnailGenerateWorker timerCallback, ClearWorkerThreads, taskType:%{public}d", taskType_);
295 TryClearWorkerThreads();
296 };
297
298 std::lock_guard<std::mutex> lock(timerMutex_);
299 if (timerId_ == 0) {
300 MEDIA_INFO_LOG("ThumbnailGenerateWorker timer Setup, taskType:%{public}d", taskType_);
301 timer_.Setup();
302 } else {
303 timer_.Unregister(timerId_);
304 }
305
306 timerId_ = timer_.Register(timerCallback, CLOSE_THUMBNAIL_WORKER_TIME_INTERVAL, false);
307 MEDIA_INFO_LOG("ThumbnailGenerateWorker timer Restart, taskType:%{public}d, timeId:%{public}u",
308 taskType_, timerId_);
309 }
310
TryCloseTimer()311 void ThumbnailGenerateWorker::TryCloseTimer()
312 {
313 std::lock_guard<std::mutex> lock(timerMutex_);
314 if (threads_.empty() && timerId_ != 0) {
315 timer_.Unregister(timerId_);
316 timer_.Shutdown();
317 timerId_ = 0;
318 MEDIA_INFO_LOG("ThumbnailGenerateWorker timer Shutdown, taskType:%{public}d", taskType_);
319 }
320 }
321 } // namespace Media
322 } // namespace OHOS