1 /*
2 * Copyright (C) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define MLOG_TAG "AccurateRefresh::MediaOnNotifyNewObserver"
17 #include "medialibrary_notify_new_observer.h"
18
19 #include "media_file_utils.h"
20 #include "media_notification_utils.h"
21 #include "medialibrary_errno.h"
22 #include "medialibrary_napi_log.h"
23 #include "medialibrary_tracer.h"
24
25 using namespace std;
26
27 namespace OHOS {
28 namespace Media {
29 shared_ptr<ChangeInfoTaskWorker> ChangeInfoTaskWorker::changeInfoTaskWorker_{nullptr};
30 mutex ChangeInfoTaskWorker::instanceMtx_;
31 mutex ChangeInfoTaskWorker::vectorMutex_;
32
// Two tasks that arrive less than this many milliseconds apart are treated as part of the same burst.
static constexpr int64_t MAX_NOTIFY_MILLISECONDS = 10;
// Once a burst holds more tasks than this, dispatching is paused and change-info sizes start being accumulated.
static constexpr int32_t START_NOTIFY_TASK_COUNT = 3;
// A burst with more tasks than this is collapsed into one task per uri type when the burst ends.
static constexpr int32_t MAX_NOTIFY_TASK_COUNT = 23;
// A burst whose accumulated change-info count exceeds this is collapsed as well.
static constexpr size_t MAX_NOTIFY_TASK_INFO_SIZE = 5000;
// Largest accepted notification payload, in bytes (200 KiB).
static constexpr uint32_t MAX_PARCEL_SIZE = 200 * 1024;
38
OnChange(const ChangeInfo & changeInfo)39 void MediaOnNotifyNewObserver::OnChange(const ChangeInfo &changeInfo)
40 {
41 MediaLibraryTracer tracer;
42 tracer.Start("MediaOnNotifyNewObserver::OnChange");
43 NAPI_DEBUG_LOG("begin OnChange");
44 if (changeInfo.data_ == nullptr || changeInfo.size_ <= 0) {
45 NAPI_ERR_LOG("changeInfo.data_ is null or changeInfo.size_ is invalid");
46 return;
47 }
48 if (changeInfo.size_ > MAX_PARCEL_SIZE) {
49 NAPI_ERR_LOG("The size of the parcel exceeds the limit.");
50 return;
51 }
52 uint8_t *parcelData = static_cast<uint8_t *>(malloc(changeInfo.size_));
53 CHECK_AND_RETURN_LOG(parcelData != nullptr, "parcelData malloc failed");
54 if (memcpy_s(parcelData, changeInfo.size_, changeInfo.data_, changeInfo.size_) != 0) {
55 NAPI_ERR_LOG("parcelData copy parcel data failed");
56 free(parcelData);
57 return;
58 }
59 shared_ptr<MessageParcel> parcel = make_shared<MessageParcel>();
60 // parcel析构函数中会free掉parcelData,成功调用ParseFrom后不可进行free(parcelData)
61 if (!parcel->ParseFrom(reinterpret_cast<uintptr_t>(parcelData), changeInfo.size_)) {
62 NAPI_ERR_LOG("Parse parcelData failed");
63 free(parcelData);
64 return;
65 }
66 NewJsOnChangeCallbackWrapper callbackWrapper;
67 callbackWrapper.mediaChangeInfo_ = NotificationUtils::UnmarshalInMultiMode(*parcel);
68 CHECK_AND_RETURN_LOG(callbackWrapper.mediaChangeInfo_ != nullptr, "invalid mediaChangeInfo");
69 NAPI_INFO_LOG("mediaChangeInfo_ is: %{public}s", callbackWrapper.mediaChangeInfo_->ToString(true).c_str());
70 Notification::NotifyUriType infoUriType = callbackWrapper.mediaChangeInfo_->notifyUri;
71 if (clientObservers_.find(infoUriType) == clientObservers_.end()) {
72 NAPI_ERR_LOG("invalid mediaChangeInfo_->notifyUri: %{public}d", static_cast<int32_t>(infoUriType));
73 for (const auto& pair : clientObservers_) {
74 NAPI_ERR_LOG("invalid clientObservers_ infoUriType: %{public}d", static_cast<int32_t>(pair.first));
75 }
76 return;
77 }
78 callbackWrapper.env_ = env_;
79 callbackWrapper.observerUriType_ = infoUriType;
80 callbackWrapper.clientObservers_ = clientObservers_[infoUriType];
81
82 auto worker = ChangeInfoTaskWorker::GetInstance();
83 if (worker == nullptr) {
84 NAPI_ERR_LOG("Get ChangeInfoTaskWorker instance failed");
85 return;
86 }
87 worker->AddTaskInfo(callbackWrapper);
88 if (!worker->IsRunning()) {
89 worker->StartWorker();
90 }
91 }
92
ReadyForCallbackEvent(const NewJsOnChangeCallbackWrapper & callbackWrapper)93 void MediaOnNotifyNewObserver::ReadyForCallbackEvent(const NewJsOnChangeCallbackWrapper &callbackWrapper)
94 {
95 MediaLibraryTracer tracer;
96 tracer.Start("MediaOnNotifyNewObserver::ReadyForCallbackEvent");
97 NAPI_DEBUG_LOG("start ReadyForCallbackEvent");
98
99 std::unique_ptr<NewJsOnChangeCallbackWrapper> jsCallback = std::make_unique<NewJsOnChangeCallbackWrapper>();
100 if (jsCallback == nullptr) {
101 NAPI_ERR_LOG("NewJsOnChangeCallbackWrapper make_unique failed");
102 return;
103 }
104 jsCallback->env_ = callbackWrapper.env_;
105 jsCallback->clientObservers_ = callbackWrapper.clientObservers_;
106 jsCallback->observerUriType_ = callbackWrapper.observerUriType_;
107 jsCallback->mediaChangeInfo_ = callbackWrapper.mediaChangeInfo_;
108
109 OnJsCallbackEvent(jsCallback);
110 }
111
OnChangeNotifyDetail(NewJsOnChangeCallbackWrapper * wrapper)112 static void OnChangeNotifyDetail(NewJsOnChangeCallbackWrapper* wrapper)
113 {
114 MediaLibraryTracer tracer;
115 tracer.Start("OnChangeNotifyDetail");
116 std::shared_ptr<Notification::MediaChangeInfo> mediaChangeInfo = wrapper->mediaChangeInfo_;
117
118 napi_env env = wrapper->env_;
119 napi_handle_scope scope = nullptr;
120 napi_open_handle_scope(env, &scope);
121 napi_value buildResult = nullptr;
122 switch (wrapper->observerUriType_) {
123 case Notification::PHOTO_URI:
124 case Notification::HIDDEN_PHOTO_URI:
125 case Notification::TRASH_PHOTO_URI:
126 buildResult = mediaChangeInfo == nullptr ? MediaLibraryNotifyUtils::BuildPhotoAssetRecheckChangeInfos(env) :
127 MediaLibraryNotifyUtils::BuildPhotoAssetChangeInfos(env, mediaChangeInfo);
128 break;
129 case Notification::PHOTO_ALBUM_URI:
130 case Notification::HIDDEN_ALBUM_URI:
131 case Notification::TRASH_ALBUM_URI:
132 buildResult = mediaChangeInfo == nullptr ? MediaLibraryNotifyUtils::BuildAlbumRecheckChangeInfos(env) :
133 MediaLibraryNotifyUtils::BuildAlbumChangeInfos(env, mediaChangeInfo);
134 break;
135 default:
136 NAPI_ERR_LOG("Invalid registerUriType");
137 }
138 if (buildResult == nullptr) {
139 NAPI_ERR_LOG("Failed to build result");
140 napi_close_handle_scope(env, scope);
141 return;
142 }
143 napi_value result[ARGS_ONE];
144 result[PARAM0] = buildResult;
145
146 for (auto &observer : wrapper->clientObservers_) {
147 napi_value jsCallback = nullptr;
148 napi_status status = napi_get_reference_value(env, observer->ref_, &jsCallback);
149 if (status != napi_ok) {
150 NAPI_ERR_LOG("Create reference fail, status: %{public}d", status);
151 continue;
152 }
153 napi_value retVal = nullptr;
154 status = napi_call_function(env, nullptr, jsCallback, ARGS_ONE, result, &retVal);
155 if (status != napi_ok) {
156 NAPI_ERR_LOG("CallJs napi_call_function fail, status: %{public}d", status);
157 continue;
158 }
159 }
160 napi_close_handle_scope(env, scope);
161 }
162
OnJsCallbackEvent(std::unique_ptr<NewJsOnChangeCallbackWrapper> & jsCallback)163 void MediaOnNotifyNewObserver::OnJsCallbackEvent(std::unique_ptr<NewJsOnChangeCallbackWrapper> &jsCallback)
164 {
165 if (jsCallback.get() == nullptr) {
166 NAPI_ERR_LOG("jsCallback.get() is nullptr");
167 return;
168 }
169
170 napi_env env = jsCallback->env_;
171 NewJsOnChangeCallbackWrapper *event = jsCallback.release();
172 auto task = [event] () {
173 std::shared_ptr<NewJsOnChangeCallbackWrapper> context(
174 static_cast<NewJsOnChangeCallbackWrapper*>(event),
175 [](NewJsOnChangeCallbackWrapper* ptr) {
176 delete ptr;
177 });
178 CHECK_AND_RETURN_LOG(event != nullptr, "event is nullptr");
179 OnChangeNotifyDetail(event);
180 };
181 if (napi_send_event(env, task, napi_eprio_immediate) != napi_ok) {
182 NAPI_ERR_LOG("failed to execute task");
183 delete event;
184 }
185 }
186
GetInstance()187 shared_ptr<ChangeInfoTaskWorker> ChangeInfoTaskWorker::GetInstance()
188 {
189 if (changeInfoTaskWorker_ == nullptr) {
190 lock_guard<mutex> lockGuard(instanceMtx_);
191 if (changeInfoTaskWorker_ == nullptr) {
192 changeInfoTaskWorker_ = make_shared<ChangeInfoTaskWorker>();
193 }
194 }
195 return changeInfoTaskWorker_;
196 }
197
ChangeInfoTaskWorker()198 ChangeInfoTaskWorker::ChangeInfoTaskWorker() {}
199
~ChangeInfoTaskWorker()200 ChangeInfoTaskWorker::~ChangeInfoTaskWorker() {}
201
StartWorker()202 void ChangeInfoTaskWorker::StartWorker()
203 {
204 if (!isThreadRunning_.load()) {
205 isThreadRunning_.store(true);
206 std::thread([this]() { this->HandleNotifyTaskPeriod(); }).detach();
207 }
208 }
209
GetTaskInfos()210 void ChangeInfoTaskWorker::GetTaskInfos()
211 {
212 // taskMap key: 注册给服务端的uriType value: uriType对应的clientObservers_
213 map<Notification::NotifyUriType, NewJsOnChangeCallbackWrapper> taskMap;
214 for (const auto& taskInfo : taskInfos_) {
215 const auto& clientObservers = taskInfo.clientObservers_;
216 if (clientObservers.empty()) {
217 continue;
218 }
219 Notification::NotifyUriType observerUriType = taskInfo.observerUriType_;
220 napi_env env = taskInfo.env_;
221
222 if (taskMap.find(observerUriType) == taskMap.end()) {
223 NewJsOnChangeCallbackWrapper newCallbackWrapper;
224 newCallbackWrapper.env_ = env;
225 newCallbackWrapper.mediaChangeInfo_ = nullptr;
226 newCallbackWrapper.observerUriType_ = observerUriType;
227 newCallbackWrapper.clientObservers_ = clientObservers;
228 taskMap[observerUriType] = newCallbackWrapper;
229 }
230 }
231
232 taskInfos_.clear();
233 for (const auto& task : taskMap) {
234 const NewJsOnChangeCallbackWrapper& callbackWrapper = task.second;
235 taskInfos_.push_back(callbackWrapper);
236 }
237 NAPI_INFO_LOG("taskInfos_ size: %{public}zu, notifyTaskCount_: %{public}d, notifyTaskInfoSize_: %{public}zu",
238 taskInfos_.size(), notifyTaskCount_, notifyTaskInfoSize_);
239 return;
240 }
AddTaskInfo(NewJsOnChangeCallbackWrapper callbackWrapper)241 void ChangeInfoTaskWorker::AddTaskInfo(NewJsOnChangeCallbackWrapper callbackWrapper)
242 {
243 NAPI_DEBUG_LOG("enter AddTaskInfo");
244 lock_guard<mutex> lock(vectorMutex_);
245 int64_t currentTime = MediaFileUtils::UTCTimeMilliSeconds();
246 if (currentTime - lastTaskTime_ < MAX_NOTIFY_MILLISECONDS) {
247 notifyTaskCount_++;
248 if (notifyTaskCount_ > START_NOTIFY_TASK_COUNT && callbackWrapper.mediaChangeInfo_ != nullptr) {
249 notifyTaskInfoSize_ += callbackWrapper.mediaChangeInfo_->changeInfos.size();
250 }
251 lastTaskTime_ = currentTime;
252 taskInfos_.push_back(callbackWrapper);
253 NAPI_DEBUG_LOG("taskInfos_ size: %{public}zu, notifyTaskCount_: %{public}d, notifyTaskInfoSize_: %{public}zu",
254 taskInfos_.size(), notifyTaskCount_, notifyTaskInfoSize_);
255 return;
256 }
257 taskInfos_.push_back(callbackWrapper);
258 if ((notifyTaskCount_ > MAX_NOTIFY_TASK_COUNT || notifyTaskInfoSize_ > MAX_NOTIFY_TASK_INFO_SIZE) &&
259 !taskInfos_.empty()) {
260 GetTaskInfos();
261 }
262 notifyTaskCount_ = 0;
263 notifyTaskInfoSize_ = 0;
264 lastTaskTime_ = currentTime;
265 NAPI_DEBUG_LOG("taskInfos_ size: %{public}zu, notifyTaskCount_: %{public}d, notifyTaskInfoSize_: %{public}zu",
266 taskInfos_.size(), notifyTaskCount_, notifyTaskInfoSize_);
267 }
268
IsTaskInfosEmpty()269 bool ChangeInfoTaskWorker::IsTaskInfosEmpty()
270 {
271 lock_guard<mutex> lock(vectorMutex_);
272 return taskInfos_.empty();
273 }
274
IsRunning()275 bool ChangeInfoTaskWorker::IsRunning()
276 {
277 return isThreadRunning_.load();
278 }
279
WaitForTask()280 void ChangeInfoTaskWorker::WaitForTask()
281 {
282 if (IsTaskInfosEmpty()) {
283 isThreadRunning_.store(false);
284 }
285 }
286
HandleTimeoutNotifyTask()287 void ChangeInfoTaskWorker::HandleTimeoutNotifyTask()
288 {
289 lock_guard<mutex> lock(vectorMutex_);
290 int64_t currentTime = MediaFileUtils::UTCTimeMilliSeconds();
291 if (taskInfos_.empty() || currentTime - lastTaskTime_ < MAX_NOTIFY_MILLISECONDS) {
292 return;
293 }
294 // taskInfos_非空,并且距离上一个加入队列的任务超过10ms
295 if ((notifyTaskCount_ > MAX_NOTIFY_TASK_COUNT || notifyTaskInfoSize_ > MAX_NOTIFY_TASK_INFO_SIZE)) {
296 GetTaskInfos();
297 }
298 notifyTaskCount_ = 0;
299 notifyTaskInfoSize_ = 0;
300 lastTaskTime_ = currentTime;
301 NAPI_DEBUG_LOG("taskInfos_ size: %{public}zu, notifyTaskCount_: %{public}d, notifyTaskInfoSize_: %{public}zu",
302 taskInfos_.size(), notifyTaskCount_, notifyTaskInfoSize_);
303 }
304
HandleNotifyTask()305 void ChangeInfoTaskWorker::HandleNotifyTask()
306 {
307 lock_guard<mutex> lock(vectorMutex_);
308 if (notifyTaskCount_ > START_NOTIFY_TASK_COUNT) {
309 NAPI_DEBUG_LOG("taskInfos_ size: %{public}zu, notifyTaskCount_: %{public}d, notifyTaskInfoSize_: %{public}zu",
310 taskInfos_.size(), notifyTaskCount_, notifyTaskInfoSize_);
311 return;
312 }
313 if (taskInfos_.empty()) {
314 NAPI_INFO_LOG("taskInfos_ is empty");
315 return;
316 }
317 NewJsOnChangeCallbackWrapper callbackWrapper = taskInfos_.front();
318 taskInfos_.erase(taskInfos_.begin());
319 MediaOnNotifyNewObserver::ReadyForCallbackEvent(callbackWrapper);
320 }
321
HandleNotifyTaskPeriod()322 void ChangeInfoTaskWorker::HandleNotifyTaskPeriod()
323 {
324 MediaLibraryTracer tracer;
325 tracer.Start("ChangeInfoTaskWorker::HandleNotifyTaskPeriod");
326 NAPI_INFO_LOG("start changeInfo notify worker");
327 string name("NewNotifyThread");
328 pthread_setname_np(pthread_self(), name.c_str());
329 while (isThreadRunning_.load()) {
330 WaitForTask();
331 if (!isThreadRunning_.load()) {
332 break;
333 }
334 HandleNotifyTask();
335 HandleTimeoutNotifyTask();
336 }
337 NAPI_INFO_LOG("end changeInfo notify worker");
338 }
339 } // namespace Media
340 } // namespace OHOS
341