• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "UdmfAsyncClient"
16 
17 #include "udmf_async_client.h"
18 
19 #include <sys/time.h>
20 #include "dataobs_mgr_client.h"
21 #include "logger.h"
22 #include "plain_text.h"
23 #include "progress_callback.h"
24 #include "udmf_client.h"
25 #include "udmf_copy_file.h"
26 #ifndef IOS_PLATFORM
27 #include "udmf_executor.h"
28 #endif
29 #include "udmf_notifier_stub.h"
30 #include "udmf_service_client.h"
31 
32 namespace OHOS::UDMF {
33 static constexpr size_t MAX_THREADS = 10;
34 static constexpr size_t MIN_THREADS = 0;
35 
36 static constexpr int32_t START_ABILITY_INTERVAL = 500;
37 static constexpr int32_t READ_PROGRESS_INTERVAL = 100;
38 static constexpr int32_t UPDATA_TIMESTAMP_INTERVAL = 1000;
39 static constexpr int32_t SYNC_INTERVAL = 200;
40 static constexpr int32_t MAX_SYNC_TIMES = 14;
41 static constexpr uint64_t SEC_TO_MILLISEC = 1000;
42 static constexpr uint64_t MICROSEC_TO_MILLISEC = 1000;
43 
44 static std::unordered_map<Status, int32_t> STATUS_MAP = {
45     { E_INVALID_PARAMETERS, ListenerStatus::INVALID_PARAMETERS },
46     { E_NOT_FOUND, ListenerStatus::DATA_NOT_FOUND },
47     { E_SYNC_FAILED, ListenerStatus::SYNC_FAILED },
48     { E_COPY_FILE_FAILED, ListenerStatus::COPY_FILE_FAILED },
49     { E_COPY_CANCELED, ListenerStatus::CANCEL }
50 };
51 
52 static constexpr int32_t PROGRESS_ERROR = -1;
53 static constexpr int32_t PROGRESS_INIT = 0;
54 static constexpr int32_t PROGRESS_SYNC_FINSHED = 10;
55 static constexpr int32_t PROGRESS_GET_DATA_FINISHED = 20;
56 static constexpr int32_t PROGRESS_ALL_FINISHED = 100;
57 
58 UdmfAsyncClient::UdmfAsyncClient() = default;
59 
60 #ifndef IOS_PLATFORM
61 static UdmfExecutor udmfExecutor(MAX_THREADS, MIN_THREADS);
62 #endif
63 
GetInstance()64 UdmfAsyncClient &UdmfAsyncClient::GetInstance()
65 {
66     static UdmfAsyncClient instance;
67     return instance;
68 }
69 
StartAsyncDataRetrieval(const GetDataParams & params)70 Status UdmfAsyncClient::StartAsyncDataRetrieval(const GetDataParams &params)
71 {
72 #ifndef IOS_PLATFORM
73     if (!IsParamValid(params)) {
74         return E_INVALID_PARAMETERS;
75     }
76     auto status = RegisterAsyncHelper(params);
77     if (status != E_OK) {
78         return status;
79     }
80     if (asyncHelperMap_.find(params.query.key) == asyncHelperMap_.end()) {
81         LOG_ERROR(UDMF_CLIENT, "RegisterAsyncHelper failed, key=%{public}s", params.query.key.c_str());
82         return E_ERROR;
83     }
84     auto &asyncHelper = asyncHelperMap_.at(params.query.key);
85     if (params.progressIndicator == ProgressIndicator::DEFAULT) {
86         asyncHelper->progressQueue.SetClearable(false);
87         asyncHelper->invokeHapTask = udmfExecutor.Schedule(std::chrono::milliseconds(START_ABILITY_INTERVAL),
88             (std::bind(&UdmfAsyncClient::InvokeHapTask, this, params.query.key)));
89     }
90     asyncHelper->getDataTask = udmfExecutor.Execute(std::bind(&UdmfAsyncClient::GetDataTask, this, params.query));
91     asyncHelper->progressTask = udmfExecutor.Execute(std::bind(&UdmfAsyncClient::ProgressTask, this, params.query.key));
92 #endif
93     return E_OK;
94 }
95 
Cancel(std::string businessUdKey)96 Status UdmfAsyncClient::Cancel(std::string businessUdKey)
97 {
98     std::lock_guard<std::mutex> lockMap(mutex_);
99     if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
100         LOG_ERROR(UDMF_CLIENT, "No task can Cancel, key=%{public}s", businessUdKey.c_str());
101         return E_ERROR;
102     }
103     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
104     asyncHelper->progressQueue.Cancel();
105     return E_OK;
106 }
107 
CancelOnSingleTask()108 Status UdmfAsyncClient::CancelOnSingleTask()
109 {
110     std::lock_guard<std::mutex> lockMap(mutex_);
111     if (asyncHelperMap_.empty()) {
112         LOG_ERROR(UDMF_CLIENT, "No task can cancel");
113         return E_ERROR;
114     }
115     if (asyncHelperMap_.size() > 1) {
116         LOG_ERROR(UDMF_CLIENT, "Multiple tasks exist");
117         return E_ERROR;
118     }
119     LOG_INFO(UDMF_CLIENT, "Cancel task key=%{public}s", asyncHelperMap_.begin()->first.c_str());
120     asyncHelperMap_.begin()->second->progressQueue.Cancel();
121     return E_OK;
122 }
123 
ProgressTask(const std::string & businessUdKey)124 Status UdmfAsyncClient::ProgressTask(const std::string &businessUdKey)
125 {
126     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
127     if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
128         auto status = SetProgressData(businessUdKey);
129         if (status != E_OK) {
130             Clear(businessUdKey);
131             return status;
132         }
133     }
134     auto lastUpdateTime = std::chrono::steady_clock::now();
135     while (asyncHelper->lastProgress >= PROGRESS_INIT && asyncHelper->lastProgress < PROGRESS_ALL_FINISHED) {
136         auto pair = asyncHelper->progressQueue.Poll();
137         if (!pair.first) {
138             std::this_thread::sleep_for(std::chrono::milliseconds(READ_PROGRESS_INTERVAL));
139             continue;
140         }
141         auto progressInfo = pair.second;
142         if (progressInfo->progress >= PROGRESS_INIT && progressInfo->progress < asyncHelper->lastProgress) {
143             continue;
144         }
145         if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
146             auto now = std::chrono::steady_clock::now();
147             auto elapsedMs = std::chrono::duration_cast<std::chrono::milliseconds>(now - lastUpdateTime).count();
148             if (progressInfo->progress != asyncHelper->lastProgress || elapsedMs > UPDATA_TIMESTAMP_INTERVAL) {
149                 UpdateProgressData(asyncHelper->processKey, *progressInfo);
150                 lastUpdateTime = now;
151             }
152         }
153         asyncHelper->lastProgress = progressInfo->progress;
154     }
155     Clear(businessUdKey);
156     return E_OK;
157 }
158 
GetDataTask(const QueryOption & query)159 Status UdmfAsyncClient::GetDataTask(const QueryOption &query)
160 {
161     auto &asyncHelper = asyncHelperMap_.at(query.key);
162     if (GetDataFromCache(asyncHelper, query) == E_OK) {
163         ProcessUnifiedData(asyncHelper);
164         return E_OK;
165     }
166     return CheckSync(asyncHelper, query);
167 }
168 
InvokeHapTask(const std::string & businessUdKey)169 Status UdmfAsyncClient::InvokeHapTask(const std::string &businessUdKey)
170 {
171     LOG_INFO(UDMF_CLIENT, "InvokeHap start!");
172     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
173     if (asyncHelper->progressQueue.IsCancel() || asyncHelper->progressQueue.IsClear()) {
174         LOG_INFO(UDMF_CLIENT, "Finished, not invoke hap.");
175         Clear(businessUdKey);
176         return E_OK;
177     }
178     if (asyncHelper->processKey.empty()) {
179         LOG_ERROR(UDMF_CLIENT, "Get key failed, not invoke hap.");
180         Clear(businessUdKey);
181         return E_ERROR;
182     }
183     sptr<IRemoteObject> callback = new (std::nothrow) ProgressSignalCallback();
184     if (callback == nullptr) {
185         LOG_ERROR(UDMF_CLIENT, "Create ProgressSignalCallback failed");
186         Clear(businessUdKey);
187         return E_ERROR;
188     }
189     auto obsMgrClient = AAFwk::DataObsMgrClient::GetInstance();
190     if (obsMgrClient == nullptr) {
191         LOG_ERROR(UDMF_CLIENT, "Get DataObsMgrClient failed");
192         Clear(businessUdKey);
193         return E_ERROR;
194     }
195 
196     auto status = obsMgrClient->NotifyProcessObserver(asyncHelper->processKey, callback);
197     if (status != AAFwk::SUCCESS) {
198         LOG_ERROR(UDMF_CLIENT, "Notify process dialog failed, status=%{public}d", status);
199         Clear(businessUdKey);
200         return E_ERROR;
201     }
202     Clear(businessUdKey);
203     return E_OK;
204 }
205 
RegisterAsyncHelper(const GetDataParams & params)206 Status UdmfAsyncClient::RegisterAsyncHelper(const GetDataParams &params)
207 {
208     std::lock_guard<std::mutex> lock(mutex_);
209     if (asyncHelperMap_.find(params.query.key) != asyncHelperMap_.end()) {
210         LOG_ERROR(UDMF_CLIENT, "The same task is running, key = %{public}s", params.query.key.c_str());
211         return E_IDEMPOTENT_ERROR;
212     }
213     auto asyncHelper = std::make_unique<AsyncHelper>();
214     asyncHelper->businessUdKey = params.query.key;
215     asyncHelper->progressListener = params.progressListener;
216     asyncHelper->progressIndicator = params.progressIndicator;
217     asyncHelper->fileConflictOptions = params.fileConflictOptions;
218     asyncHelper->destUri = params.destUri;
219     asyncHelper->acceptableInfo = params.acceptableInfo;
220     asyncHelperMap_.insert_or_assign(params.query.key, std::move(asyncHelper));
221     return E_OK;
222 }
223 
CheckSync(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)224 Status UdmfAsyncClient::CheckSync(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
225 {
226 #ifndef IOS_PLATFORM
227     AsyncProcessInfo processInfo = {
228         .businessUdKey = query.key
229     };
230     auto serviceClient = UdmfServiceClient::GetInstance();
231     if (serviceClient == nullptr) {
232         LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
233         return E_IPC;
234     }
235     auto status = serviceClient->ObtainAsynProcess(processInfo);
236     if (status != E_OK) {
237         LOG_ERROR(UDMF_CLIENT, "Sync error status = %{public}d!", status);
238         ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_ERROR };
239         CallProgress(asyncHelper, progressInfo, nullptr);
240         return static_cast<Status>(status);
241     }
242     if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_FAILURE) {
243         LOG_ERROR(UDMF_CLIENT, "Sync failed!");
244         ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
245         CallProgress(asyncHelper, progressInfo, nullptr);
246         return E_ERROR;
247     }
248     if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_SUCCESS) {
249         ProgressInfo progressInfo = { .progress = PROGRESS_SYNC_FINSHED, .errorCode = E_OK };
250         CallProgress(asyncHelper, progressInfo, nullptr);
251         return GetDataFromDB(asyncHelper, query);
252     }
253     if (asyncHelper->sycnRetryTime > MAX_SYNC_TIMES) {
254         LOG_ERROR(UDMF_CLIENT, "Sync retry timeout!");
255         ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
256         CallProgress(asyncHelper, progressInfo, nullptr);
257         return E_SYNC_FAILED;
258     }
259     (asyncHelper->sycnRetryTime)++;
260     asyncHelper->getDataTask = udmfExecutor.Schedule(std::chrono::milliseconds(SYNC_INTERVAL),
261         std::bind(&UdmfAsyncClient::GetDataTask, this, query));
262 #endif
263     return E_OK;
264 }
265 
GetDataFromDB(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)266 Status UdmfAsyncClient::GetDataFromDB(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
267 {
268     auto &data = *(asyncHelper->data);
269     auto delayDataCallback = [this](const std::string &key, const UnifiedData &unifiedData) {
270         if (asyncHelperMap_.find(key) == asyncHelperMap_.end()) {
271             LOG_ERROR(UDMF_CLIENT, "No task exist, key=%{public}s", key.c_str());
272             return;
273         }
274         auto &asyncHelper = asyncHelperMap_.at(key);
275         *(asyncHelper->data) = unifiedData;
276         UdmfAsyncClient::GetInstance().ProcessUnifiedData(asyncHelper);
277     };
278     sptr<IRemoteObject> iUdmfNotifier = new (std::nothrow) DelayDataCallbackClient(delayDataCallback);
279     if (iUdmfNotifier == nullptr) {
280         LOG_ERROR(UDMF_CLIENT, "IUdmfNotifier unavailable");
281         return E_ERROR;
282     }
283     std::shared_ptr<UnifiedData> dataPtr = std::make_shared<UnifiedData>();
284     auto status = static_cast<Status>(UdmfClient::GetInstance().GetDataIfAvailable(query.key,
285         asyncHelper->acceptableInfo, iUdmfNotifier, dataPtr));
286     if (status != E_OK) {
287         LOG_ERROR(UDMF_CLIENT, "GetData error, status = %{public}d", status);
288         ProgressInfo progressInfo = {
289             .progress = PROGRESS_ERROR,
290             .errorCode = status
291         };
292         CallProgress(asyncHelper, progressInfo, nullptr);
293         return status;
294     }
295     if (dataPtr->IsEmpty()) {
296         LOG_INFO(UDMF_CLIENT, "Wait delay data");
297         return E_OK;
298     }
299     data = *dataPtr;
300     return ProcessUnifiedData(asyncHelper);
301 }
302 
GetDataFromCache(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)303 Status UdmfAsyncClient::GetDataFromCache(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
304 {
305     return UdmfClient::GetInstance().GetDataFromCache(query, *(asyncHelper->data));
306 }
307 
SetProgressData(const std::string & businessUdKey)308 Status UdmfAsyncClient::SetProgressData(const std::string &businessUdKey)
309 {
310     auto serviceClient = UdmfServiceClient::GetInstance();
311     if (serviceClient == nullptr) {
312         LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
313         return E_IPC;
314     }
315     std::string progressKey;
316     CustomOption cusomOption = {
317         .intention = Intention::UD_INTENTION_DATA_HUB
318     };
319     auto obj = std::make_shared<Object>();
320     auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
321     progressRecord->SetContent(std::to_string(PROGRESS_INIT));
322     progressRecord->SetAbstract(std::to_string(GetCurrentTimeMillis()));
323     UnifiedData progressData;
324     progressData.AddRecord(progressRecord);
325     auto status = serviceClient->SetData(cusomOption, progressData, progressKey);
326     if (status != E_OK) {
327         LOG_ERROR(UDMF_CLIENT, "Set progress data error, status = %{public}d", status);
328         return static_cast<Status>(status);
329     }
330     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
331     asyncHelper->processKey = progressKey;
332     return E_OK;
333 }
334 
UpdateProgressData(const std::string & progressUdKey,const ProgressInfo & progressInfo)335 Status UdmfAsyncClient::UpdateProgressData(const std::string &progressUdKey, const ProgressInfo &progressInfo)
336 {
337     auto serviceClient = UdmfServiceClient::GetInstance();
338     if (serviceClient == nullptr) {
339         LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
340         return E_IPC;
341     }
342     QueryOption queryOption = {
343         .key = progressUdKey,
344         .intention = Intention::UD_INTENTION_DATA_HUB
345     };
346     auto obj = std::make_shared<Object>();
347     auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
348     if (progressInfo.progress < PROGRESS_INIT) {
349         progressRecord->SetContent(std::to_string(PROGRESS_ALL_FINISHED));
350     } else {
351         progressRecord->SetContent(std::to_string(progressInfo.progress));
352     }
353     progressRecord->SetAbstract(std::to_string(GetCurrentTimeMillis()));
354     UnifiedData progressData;
355     progressData.AddRecord(progressRecord);
356     auto status = serviceClient->UpdateData(queryOption, progressData);
357     if (status != E_OK) {
358         LOG_ERROR(UDMF_CLIENT, "Update progress data error, status = %{public}d", status);
359         return static_cast<Status>(status);
360     }
361     return E_OK;
362 }
363 
CopyFile(std::unique_ptr<AsyncHelper> & asyncHelper)364 Status UdmfAsyncClient::CopyFile(std::unique_ptr<AsyncHelper> &asyncHelper)
365 {
366     auto status = E_OK;
367     if (asyncHelper->destUri.empty()) {
368         LOG_INFO(UDMF_CLIENT, "No dest path, no copy.");
369     } else {
370         status = UdmfCopyFile::GetInstance().Copy(asyncHelper);
371     }
372     ProgressInfo progressInfo = {
373         .progress = PROGRESS_ALL_FINISHED,
374         .errorCode = status
375     };
376     CallProgress(asyncHelper, progressInfo, asyncHelper->data);
377     return E_OK;
378 }
379 
CallProgress(std::unique_ptr<AsyncHelper> & asyncHelper,ProgressInfo & progressInfo,std::shared_ptr<UnifiedData> data)380 void UdmfAsyncClient::CallProgress(std::unique_ptr<AsyncHelper> &asyncHelper, ProgressInfo &progressInfo,
381     std::shared_ptr<UnifiedData> data)
382 {
383     if (progressInfo.errorCode == E_OK) {
384         if (progressInfo.progress == PROGRESS_ALL_FINISHED) {
385             progressInfo.progressStatus = ListenerStatus::FINISHED;
386         } else {
387             progressInfo.progressStatus = ListenerStatus::PROCESSING;
388         }
389     } else {
390         auto it = STATUS_MAP.find(progressInfo.errorCode);
391         if (it == STATUS_MAP.end()) {
392             progressInfo.progressStatus = ListenerStatus::INNER_ERROR;
393         } else {
394             progressInfo.progressStatus = it->second;
395         }
396     }
397 
398     asyncHelper->progressListener(progressInfo, data);
399     asyncHelper->progressQueue.PushBack(progressInfo);
400 }
401 
Clear(const std::string & businessUdKey)402 Status UdmfAsyncClient::Clear(const std::string &businessUdKey)
403 {
404 #ifndef IOS_PLATFORM
405     std::lock_guard<std::mutex> lockMap(mutex_);
406     if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
407         LOG_ERROR(UDMF_CLIENT, "No task can Clear, key=%{public}s", businessUdKey.c_str());
408         return E_ERROR;
409     }
410     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
411     if (!asyncHelper->progressQueue.Clear()) {
412         return E_OK;
413     }
414     if (asyncHelper->invokeHapTask != ExecutorPool::INVALID_TASK_ID) {
415         udmfExecutor.Remove(asyncHelper->invokeHapTask);
416     }
417     udmfExecutor.Remove(asyncHelper->getDataTask);
418     udmfExecutor.Remove(asyncHelper->progressTask);
419     asyncHelperMap_.erase(businessUdKey);
420     UdmfServiceClient::GetInstance()->ClearAsynProcessByKey(businessUdKey);
421     LOG_INFO(UDMF_CLIENT, "Clear task success, key = %{public}s", businessUdKey.c_str());
422 #endif
423     return E_OK;
424 }
425 
ProcessUnifiedData(std::unique_ptr<AsyncHelper> & asyncHelper)426 Status UdmfAsyncClient::ProcessUnifiedData(std::unique_ptr<AsyncHelper> &asyncHelper)
427 {
428     if (!asyncHelper->data->HasFileType() && !asyncHelper->data->HasUriInfo()) {
429         ProgressInfo progressInfo = {
430             .progress = PROGRESS_ALL_FINISHED,
431             .errorCode = E_OK
432         };
433         CallProgress(asyncHelper, progressInfo, asyncHelper->data);
434         return E_OK;
435     }
436     ProgressInfo progressInfo = {
437         .progress = PROGRESS_GET_DATA_FINISHED,
438         .errorCode = E_OK
439     };
440     CallProgress(asyncHelper, progressInfo, nullptr);
441     return CopyFile(asyncHelper);
442 }
443 
IsParamValid(const GetDataParams & params)444 bool UdmfAsyncClient::IsParamValid(const GetDataParams &params)
445 {
446     auto query = params.query;
447     if (query.key.empty() || query.intention != Intention::UD_INTENTION_DRAG) {
448         LOG_ERROR(UDMF_CLIENT, "Params query invalid. query.key=%{public}s, intention=%{public}d", query.key.c_str(),
449             query.intention);
450         return false;
451     }
452     if (params.progressListener == nullptr) {
453         LOG_ERROR(UDMF_CLIENT, "Params progressListener not null.");
454         return false;
455     }
456     return true;
457 }
458 
PushTaskToExecutor(UdmfTask task)459 void UdmfAsyncClient::PushTaskToExecutor(UdmfTask task)
460 {
461     udmfExecutor.Execute(std::move(task));
462 }
463 
GetCurrentTimeMillis()464 uint64_t UdmfAsyncClient::GetCurrentTimeMillis()
465 {
466     struct timeval tv = { 0, 0 };
467     gettimeofday(&tv, nullptr);
468     return (static_cast<uint64_t>(tv.tv_sec) * SEC_TO_MILLISEC +
469         static_cast<uint64_t>(tv.tv_usec) / MICROSEC_TO_MILLISEC);
470 }
471 } // namespace OHOS::UDMF
472