1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "UdmfAsyncClient"
16
17 #include "udmf_async_client.h"
18
19 #include <sys/time.h>
20 #include "dataobs_mgr_client.h"
21 #include "logger.h"
22 #include "plain_text.h"
23 #include "progress_callback.h"
24 #include "udmf_client.h"
25 #include "udmf_copy_file.h"
26 #ifndef IOS_PLATFORM
27 #include "udmf_executor.h"
28 #endif
29 #include "udmf_notifier_stub.h"
30 #include "udmf_service_client.h"
31
32 namespace OHOS::UDMF {
// Thread bounds handed to the file-scope UdmfExecutor.
static constexpr size_t MAX_THREADS { 10 };
static constexpr size_t MIN_THREADS { 0 };

// Delays and polling periods of the asynchronous tasks. The code below either wraps these in
// std::chrono::milliseconds or compares them against a millisecond count.
static constexpr int32_t START_ABILITY_INTERVAL { 500 };
static constexpr int32_t READ_PROGRESS_INTERVAL { 100 };
static constexpr int32_t UPDATA_TIMESTAMP_INTERVAL { 1000 };
static constexpr int32_t SYNC_INTERVAL { 200 };
// CheckSync stops rescheduling once the helper's retry counter exceeds this value.
static constexpr int32_t MAX_SYNC_TIMES { 14 };
// Conversion factors used by GetCurrentTimeMillis().
static constexpr uint64_t SEC_TO_MILLISEC { 1000 };
static constexpr uint64_t MICROSEC_TO_MILLISEC { 1000 };
43
44 static std::unordered_map<Status, int32_t> STATUS_MAP = {
45 { E_INVALID_PARAMETERS, ListenerStatus::INVALID_PARAMETERS },
46 { E_NOT_FOUND, ListenerStatus::DATA_NOT_FOUND },
47 { E_SYNC_FAILED, ListenerStatus::SYNC_FAILED },
48 { E_COPY_FILE_FAILED, ListenerStatus::COPY_FILE_FAILED },
49 { E_COPY_CANCELED, ListenerStatus::CANCEL }
50 };
51
// Progress values exchanged between the data tasks and the progress task.
// A negative value marks a failure; PROGRESS_ALL_FINISHED ends the progress loop.
static constexpr int32_t PROGRESS_ERROR { -1 };
static constexpr int32_t PROGRESS_INIT { 0 };
static constexpr int32_t PROGRESS_SYNC_FINSHED { 10 };
static constexpr int32_t PROGRESS_GET_DATA_FINISHED { 20 };
static constexpr int32_t PROGRESS_ALL_FINISHED { 100 };
57
// Compiler-generated default constructor; it has no hand-written body.
UdmfAsyncClient::UdmfAsyncClient() = default;

#ifndef IOS_PLATFORM
// File-scope executor on which this file runs (Execute) and delays (Schedule) its background tasks.
// It is not compiled when IOS_PLATFORM is defined.
// NOTE(review): the two arguments are presumably the upper and lower thread counts — confirm
// against the UdmfExecutor declaration.
static UdmfExecutor udmfExecutor(MAX_THREADS, MIN_THREADS);
#endif
63
GetInstance()64 UdmfAsyncClient &UdmfAsyncClient::GetInstance()
65 {
66 static UdmfAsyncClient instance;
67 return instance;
68 }
69
StartAsyncDataRetrieval(const GetDataParams & params)70 Status UdmfAsyncClient::StartAsyncDataRetrieval(const GetDataParams ¶ms)
71 {
72 #ifndef IOS_PLATFORM
73 if (!IsParamValid(params)) {
74 return E_INVALID_PARAMETERS;
75 }
76 auto status = RegisterAsyncHelper(params);
77 if (status != E_OK) {
78 return status;
79 }
80 if (asyncHelperMap_.find(params.query.key) == asyncHelperMap_.end()) {
81 LOG_ERROR(UDMF_CLIENT, "RegisterAsyncHelper failed, key=%{public}s", params.query.key.c_str());
82 return E_ERROR;
83 }
84 auto &asyncHelper = asyncHelperMap_.at(params.query.key);
85 if (params.progressIndicator == ProgressIndicator::DEFAULT) {
86 asyncHelper->progressQueue.SetClearable(false);
87 asyncHelper->invokeHapTask = udmfExecutor.Schedule(std::chrono::milliseconds(START_ABILITY_INTERVAL),
88 (std::bind(&UdmfAsyncClient::InvokeHapTask, this, params.query.key)));
89 }
90 asyncHelper->getDataTask = udmfExecutor.Execute(std::bind(&UdmfAsyncClient::GetDataTask, this, params.query));
91 asyncHelper->progressTask = udmfExecutor.Execute(std::bind(&UdmfAsyncClient::ProgressTask, this, params.query.key));
92 #endif
93 return E_OK;
94 }
95
Cancel(std::string businessUdKey)96 Status UdmfAsyncClient::Cancel(std::string businessUdKey)
97 {
98 std::lock_guard<std::mutex> lockMap(mutex_);
99 if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
100 LOG_ERROR(UDMF_CLIENT, "No task can Cancel, key=%{public}s", businessUdKey.c_str());
101 return E_ERROR;
102 }
103 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
104 asyncHelper->progressQueue.Cancel();
105 return E_OK;
106 }
107
CancelOnSingleTask()108 Status UdmfAsyncClient::CancelOnSingleTask()
109 {
110 std::lock_guard<std::mutex> lockMap(mutex_);
111 if (asyncHelperMap_.empty()) {
112 LOG_ERROR(UDMF_CLIENT, "No task can cancel");
113 return E_ERROR;
114 }
115 if (asyncHelperMap_.size() > 1) {
116 LOG_ERROR(UDMF_CLIENT, "Multiple tasks exist");
117 return E_ERROR;
118 }
119 LOG_INFO(UDMF_CLIENT, "Cancel task key=%{public}s", asyncHelperMap_.begin()->first.c_str());
120 asyncHelperMap_.begin()->second->progressQueue.Cancel();
121 return E_OK;
122 }
123
ProgressTask(const std::string & businessUdKey)124 Status UdmfAsyncClient::ProgressTask(const std::string &businessUdKey)
125 {
126 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
127 if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
128 auto status = SetProgressData(businessUdKey);
129 if (status != E_OK) {
130 Clear(businessUdKey);
131 return status;
132 }
133 }
134 auto lastUpdateTime = std::chrono::steady_clock::now();
135 while (asyncHelper->lastProgress >= PROGRESS_INIT && asyncHelper->lastProgress < PROGRESS_ALL_FINISHED) {
136 auto pair = asyncHelper->progressQueue.Poll();
137 if (!pair.first) {
138 std::this_thread::sleep_for(std::chrono::milliseconds(READ_PROGRESS_INTERVAL));
139 continue;
140 }
141 auto progressInfo = pair.second;
142 if (progressInfo->progress >= PROGRESS_INIT && progressInfo->progress < asyncHelper->lastProgress) {
143 continue;
144 }
145 if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
146 auto now = std::chrono::steady_clock::now();
147 auto elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(now - lastUpdateTime).count();
148 if (progressInfo->progress != asyncHelper->lastProgress || elapsedMs > UPDATA_TIMESTAMP_INTERVAL) {
149 UpdateProgressData(asyncHelper->processKey, *progressInfo);
150 lastUpdateTime = now;
151 }
152 }
153 asyncHelper->lastProgress = progressInfo->progress;
154 }
155 Clear(businessUdKey);
156 return E_OK;
157 }
158
GetDataTask(const QueryOption & query)159 Status UdmfAsyncClient::GetDataTask(const QueryOption &query)
160 {
161 auto &asyncHelper = asyncHelperMap_.at(query.key);
162 if (GetDataFromCache(asyncHelper, query) == E_OK) {
163 ProcessUnifiedData(asyncHelper);
164 return E_OK;
165 }
166 return CheckSync(asyncHelper, query);
167 }
168
InvokeHapTask(const std::string & businessUdKey)169 Status UdmfAsyncClient::InvokeHapTask(const std::string &businessUdKey)
170 {
171 LOG_INFO(UDMF_CLIENT, "InvokeHap start!");
172 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
173 if (asyncHelper->progressQueue.IsCancel() || asyncHelper->progressQueue.IsClear()) {
174 LOG_INFO(UDMF_CLIENT, "Finished, not invoke hap.");
175 Clear(businessUdKey);
176 return E_OK;
177 }
178 if (asyncHelper->processKey.empty()) {
179 LOG_ERROR(UDMF_CLIENT, "Get key failed, not invoke hap.");
180 Clear(businessUdKey);
181 return E_ERROR;
182 }
183 sptr<IRemoteObject> callback = new (std::nothrow) ProgressSignalCallback();
184 if (callback == nullptr) {
185 LOG_ERROR(UDMF_CLIENT, "Create ProgressSignalCallback failed");
186 Clear(businessUdKey);
187 return E_ERROR;
188 }
189 auto obsMgrClient = AAFwk::DataObsMgrClient::GetInstance();
190 if (obsMgrClient == nullptr) {
191 LOG_ERROR(UDMF_CLIENT, "Get DataObsMgrClient failed");
192 Clear(businessUdKey);
193 return E_ERROR;
194 }
195
196 auto status = obsMgrClient->NotifyProcessObserver(asyncHelper->processKey, callback);
197 if (status != AAFwk::SUCCESS) {
198 LOG_ERROR(UDMF_CLIENT, "Notify process dialog failed, status=%{public}d", status);
199 Clear(businessUdKey);
200 return E_ERROR;
201 }
202 Clear(businessUdKey);
203 return E_OK;
204 }
205
RegisterAsyncHelper(const GetDataParams & params)206 Status UdmfAsyncClient::RegisterAsyncHelper(const GetDataParams ¶ms)
207 {
208 std::lock_guard<std::mutex> lock(mutex_);
209 if (asyncHelperMap_.find(params.query.key) != asyncHelperMap_.end()) {
210 LOG_ERROR(UDMF_CLIENT, "The same task is running, key = %{public}s", params.query.key.c_str());
211 return E_IDEMPOTENT_ERROR;
212 }
213 auto asyncHelper = std::make_unique<AsyncHelper>();
214 asyncHelper->businessUdKey = params.query.key;
215 asyncHelper->progressListener = params.progressListener;
216 asyncHelper->progressIndicator = params.progressIndicator;
217 asyncHelper->fileConflictOptions = params.fileConflictOptions;
218 asyncHelper->destUri = params.destUri;
219 asyncHelper->acceptableInfo = params.acceptableInfo;
220 asyncHelperMap_.insert_or_assign(params.query.key, std::move(asyncHelper));
221 return E_OK;
222 }
223
CheckSync(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)224 Status UdmfAsyncClient::CheckSync(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
225 {
226 #ifndef IOS_PLATFORM
227 AsyncProcessInfo processInfo = {
228 .businessUdKey = query.key
229 };
230 auto serviceClient = UdmfServiceClient::GetInstance();
231 if (serviceClient == nullptr) {
232 LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
233 return E_IPC;
234 }
235 auto status = serviceClient->ObtainAsynProcess(processInfo);
236 if (status != E_OK) {
237 LOG_ERROR(UDMF_CLIENT, "Sync error status = %{public}d!", status);
238 ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_ERROR };
239 CallProgress(asyncHelper, progressInfo, nullptr);
240 return static_cast<Status>(status);
241 }
242 if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_FAILURE) {
243 LOG_ERROR(UDMF_CLIENT, "Sync failed!");
244 ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
245 CallProgress(asyncHelper, progressInfo, nullptr);
246 return E_ERROR;
247 }
248 if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_SUCCESS) {
249 ProgressInfo progressInfo = { .progress = PROGRESS_SYNC_FINSHED, .errorCode = E_OK };
250 CallProgress(asyncHelper, progressInfo, nullptr);
251 return GetDataFromDB(asyncHelper, query);
252 }
253 if (asyncHelper->sycnRetryTime > MAX_SYNC_TIMES) {
254 LOG_ERROR(UDMF_CLIENT, "Sync retry timeout!");
255 ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
256 CallProgress(asyncHelper, progressInfo, nullptr);
257 return E_SYNC_FAILED;
258 }
259 (asyncHelper->sycnRetryTime)++;
260 asyncHelper->getDataTask = udmfExecutor.Schedule(std::chrono::milliseconds(SYNC_INTERVAL),
261 std::bind(&UdmfAsyncClient::GetDataTask, this, query));
262 #endif
263 return E_OK;
264 }
265
GetDataFromDB(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)266 Status UdmfAsyncClient::GetDataFromDB(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
267 {
268 auto &data = *(asyncHelper->data);
269 auto delayDataCallback = [this](const std::string &key, const UnifiedData &unifiedData) {
270 if (asyncHelperMap_.find(key) == asyncHelperMap_.end()) {
271 LOG_ERROR(UDMF_CLIENT, "No task exist, key=%{public}s", key.c_str());
272 return;
273 }
274 auto &asyncHelper = asyncHelperMap_.at(key);
275 *(asyncHelper->data) = unifiedData;
276 UdmfAsyncClient::GetInstance().ProcessUnifiedData(asyncHelper);
277 };
278 sptr<IRemoteObject> iUdmfNotifier = new (std::nothrow) DelayDataCallbackClient(delayDataCallback);
279 if (iUdmfNotifier == nullptr) {
280 LOG_ERROR(UDMF_CLIENT, "IUdmfNotifier unavailable");
281 return E_ERROR;
282 }
283 std::shared_ptr<UnifiedData> dataPtr = std::make_shared<UnifiedData>();
284 auto status = static_cast<Status>(UdmfClient::GetInstance().GetDataIfAvailable(query.key,
285 asyncHelper->acceptableInfo, iUdmfNotifier, dataPtr));
286 if (status != E_OK) {
287 LOG_ERROR(UDMF_CLIENT, "GetData error, status = %{public}d", status);
288 ProgressInfo progressInfo = {
289 .progress = PROGRESS_ERROR,
290 .errorCode = status
291 };
292 CallProgress(asyncHelper, progressInfo, nullptr);
293 return status;
294 }
295 if (dataPtr->IsEmpty()) {
296 LOG_INFO(UDMF_CLIENT, "Wait delay data");
297 return E_OK;
298 }
299 data = *dataPtr;
300 return ProcessUnifiedData(asyncHelper);
301 }
302
GetDataFromCache(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)303 Status UdmfAsyncClient::GetDataFromCache(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
304 {
305 return UdmfClient::GetInstance().GetDataFromCache(query, *(asyncHelper->data));
306 }
307
SetProgressData(const std::string & businessUdKey)308 Status UdmfAsyncClient::SetProgressData(const std::string &businessUdKey)
309 {
310 auto serviceClient = UdmfServiceClient::GetInstance();
311 if (serviceClient == nullptr) {
312 LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
313 return E_IPC;
314 }
315 std::string progressKey;
316 CustomOption cusomOption = {
317 .intention = Intention::UD_INTENTION_DATA_HUB
318 };
319 auto obj = std::make_shared<Object>();
320 auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
321 progressRecord->SetContent(std::to_string(PROGRESS_INIT));
322 progressRecord->SetAbstract(std::to_string(GetCurrentTimeMillis()));
323 UnifiedData progressData;
324 progressData.AddRecord(progressRecord);
325 auto status = serviceClient->SetData(cusomOption, progressData, progressKey);
326 if (status != E_OK) {
327 LOG_ERROR(UDMF_CLIENT, "Set progress data error, status = %{public}d", status);
328 return static_cast<Status>(status);
329 }
330 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
331 asyncHelper->processKey = progressKey;
332 return E_OK;
333 }
334
UpdateProgressData(const std::string & progressUdKey,const ProgressInfo & progressInfo)335 Status UdmfAsyncClient::UpdateProgressData(const std::string &progressUdKey, const ProgressInfo &progressInfo)
336 {
337 auto serviceClient = UdmfServiceClient::GetInstance();
338 if (serviceClient == nullptr) {
339 LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
340 return E_IPC;
341 }
342 QueryOption queryOption = {
343 .key = progressUdKey,
344 .intention = Intention::UD_INTENTION_DATA_HUB
345 };
346 auto obj = std::make_shared<Object>();
347 auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
348 if (progressInfo.progress < PROGRESS_INIT) {
349 progressRecord->SetContent(std::to_string(PROGRESS_ALL_FINISHED));
350 } else {
351 progressRecord->SetContent(std::to_string(progressInfo.progress));
352 }
353 progressRecord->SetAbstract(std::to_string(GetCurrentTimeMillis()));
354 UnifiedData progressData;
355 progressData.AddRecord(progressRecord);
356 auto status = serviceClient->UpdateData(queryOption, progressData);
357 if (status != E_OK) {
358 LOG_ERROR(UDMF_CLIENT, "Update progress data error, status = %{public}d", status);
359 return static_cast<Status>(status);
360 }
361 return E_OK;
362 }
363
CopyFile(std::unique_ptr<AsyncHelper> & asyncHelper)364 Status UdmfAsyncClient::CopyFile(std::unique_ptr<AsyncHelper> &asyncHelper)
365 {
366 auto status = E_OK;
367 if (asyncHelper->destUri.empty()) {
368 LOG_INFO(UDMF_CLIENT, "No dest path, no copy.");
369 } else {
370 status = UdmfCopyFile::GetInstance().Copy(asyncHelper);
371 }
372 ProgressInfo progressInfo = {
373 .progress = PROGRESS_ALL_FINISHED,
374 .errorCode = status
375 };
376 CallProgress(asyncHelper, progressInfo, asyncHelper->data);
377 return E_OK;
378 }
379
CallProgress(std::unique_ptr<AsyncHelper> & asyncHelper,ProgressInfo & progressInfo,std::shared_ptr<UnifiedData> data)380 void UdmfAsyncClient::CallProgress(std::unique_ptr<AsyncHelper> &asyncHelper, ProgressInfo &progressInfo,
381 std::shared_ptr<UnifiedData> data)
382 {
383 if (progressInfo.errorCode == E_OK) {
384 if (progressInfo.progress == PROGRESS_ALL_FINISHED) {
385 progressInfo.progressStatus = ListenerStatus::FINISHED;
386 } else {
387 progressInfo.progressStatus = ListenerStatus::PROCESSING;
388 }
389 } else {
390 auto it = STATUS_MAP.find(progressInfo.errorCode);
391 if (it == STATUS_MAP.end()) {
392 progressInfo.progressStatus = ListenerStatus::INNER_ERROR;
393 } else {
394 progressInfo.progressStatus = it->second;
395 }
396 }
397
398 asyncHelper->progressListener(progressInfo, data);
399 asyncHelper->progressQueue.PushBack(progressInfo);
400 }
401
Clear(const std::string & businessUdKey)402 Status UdmfAsyncClient::Clear(const std::string &businessUdKey)
403 {
404 #ifndef IOS_PLATFORM
405 std::lock_guard<std::mutex> lockMap(mutex_);
406 if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
407 LOG_ERROR(UDMF_CLIENT, "No task can Clear, key=%{public}s", businessUdKey.c_str());
408 return E_ERROR;
409 }
410 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
411 if (!asyncHelper->progressQueue.Clear()) {
412 return E_OK;
413 }
414 if (asyncHelper->invokeHapTask != ExecutorPool::INVALID_TASK_ID) {
415 udmfExecutor.Remove(asyncHelper->invokeHapTask);
416 }
417 udmfExecutor.Remove(asyncHelper->getDataTask);
418 udmfExecutor.Remove(asyncHelper->progressTask);
419 asyncHelperMap_.erase(businessUdKey);
420 UdmfServiceClient::GetInstance()->ClearAsynProcessByKey(businessUdKey);
421 LOG_INFO(UDMF_CLIENT, "Clear task success, key = %{public}s", businessUdKey.c_str());
422 #endif
423 return E_OK;
424 }
425
ProcessUnifiedData(std::unique_ptr<AsyncHelper> & asyncHelper)426 Status UdmfAsyncClient::ProcessUnifiedData(std::unique_ptr<AsyncHelper> &asyncHelper)
427 {
428 if (!asyncHelper->data->HasFileType() && !asyncHelper->data->HasUriInfo()) {
429 ProgressInfo progressInfo = {
430 .progress = PROGRESS_ALL_FINISHED,
431 .errorCode = E_OK
432 };
433 CallProgress(asyncHelper, progressInfo, asyncHelper->data);
434 return E_OK;
435 }
436 ProgressInfo progressInfo = {
437 .progress = PROGRESS_GET_DATA_FINISHED,
438 .errorCode = E_OK
439 };
440 CallProgress(asyncHelper, progressInfo, nullptr);
441 return CopyFile(asyncHelper);
442 }
443
IsParamValid(const GetDataParams & params)444 bool UdmfAsyncClient::IsParamValid(const GetDataParams ¶ms)
445 {
446 auto query = params.query;
447 if (query.key.empty() || query.intention != Intention::UD_INTENTION_DRAG) {
448 LOG_ERROR(UDMF_CLIENT, "Params query invalid. query.key=%{public}s, intention=%{public}d", query.key.c_str(),
449 query.intention);
450 return false;
451 }
452 if (params.progressListener == nullptr) {
453 LOG_ERROR(UDMF_CLIENT, "Params progressListener not null.");
454 return false;
455 }
456 return true;
457 }
458
PushTaskToExecutor(UdmfTask task)459 void UdmfAsyncClient::PushTaskToExecutor(UdmfTask task)
460 {
461 udmfExecutor.Execute(std::move(task));
462 }
463
GetCurrentTimeMillis()464 uint64_t UdmfAsyncClient::GetCurrentTimeMillis()
465 {
466 struct timeval tv = { 0, 0 };
467 gettimeofday(&tv, nullptr);
468 return (static_cast<uint64_t>(tv.tv_sec) * SEC_TO_MILLISEC +
469 static_cast<uint64_t>(tv.tv_usec) / MICROSEC_TO_MILLISEC);
470 }
471 } // namespace OHOS::UDMF
472