• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "UdmfAsyncClient"
16 
17 #include "udmf_async_client.h"
18 
19 #include "dataobs_mgr_client.h"
20 #include "logger.h"
21 #include "plain_text.h"
22 #include "progress_callback.h"
23 #include "udmf_client.h"
24 #include "udmf_copy_file.h"
25 #include "udmf_service_client.h"
26 
27 namespace OHOS::UDMF {
28 static constexpr size_t MAX_THREADS = 10;
29 static constexpr size_t MIN_THREADS = 0;
30 
31 static constexpr int32_t START_ABILITY_INTERVAL = 500;
32 static constexpr int32_t READ_PROGRESS_INTERVAL = 100;
33 static constexpr int32_t SYNC_INTERVAL = 200;
34 static constexpr int32_t MAX_SYNC_TIMES = 14;
35 
36 static std::unordered_map<Status, int32_t> STATUS_MAP = {
37     { E_INVALID_PARAMETERS, ListenerStatus::INVALID_PARAMETERS },
38     { E_NOT_FOUND, ListenerStatus::DATA_NOT_FOUND },
39     { E_SYNC_FAILED, ListenerStatus::SYNC_FAILED },
40     { E_COPY_FILE_FAILED, ListenerStatus::COPY_FILE_FAILED },
41     { E_COPY_CANCELED, ListenerStatus::CANCEL }
42 };
43 
44 static constexpr int32_t PROGRESS_ERROR = -1;
45 static constexpr int32_t PROGRESS_INIT = 0;
46 static constexpr int32_t PROGRESS_SYNC_FINSHED = 10;
47 static constexpr int32_t PROGRESS_GET_DATA_FINISHED = 20;
48 static constexpr int32_t PROGRESS_ALL_FINISHED = 100;
49 
50 #ifdef IOS_PLATFORM
UdmfAsyncClient()51 UdmfAsyncClient::UdmfAsyncClient() {}
52 #else
UdmfAsyncClient()53 UdmfAsyncClient::UdmfAsyncClient() : executor_(MAX_THREADS, MIN_THREADS) {}
54 #endif
55 
GetInstance()56 UdmfAsyncClient &UdmfAsyncClient::GetInstance()
57 {
58     static UdmfAsyncClient instance;
59     return instance;
60 }
61 
StartAsyncDataRetrieval(const GetDataParams & params)62 Status UdmfAsyncClient::StartAsyncDataRetrieval(const GetDataParams &params)
63 {
64 #ifndef IOS_PLATFORM
65     if (!IsParamValid(params)) {
66         return E_INVALID_PARAMETERS;
67     }
68     auto status = RegisterAsyncHelper(params);
69     if (status != E_OK) {
70         return status;
71     }
72     auto &asyncHelper = asyncHelperMap_.at(params.query.key);
73     if (params.progressIndicator == ProgressIndicator::DEFAULT) {
74         asyncHelper->progressQueue.SetClearable(false);
75         asyncHelper->invokeHapTask = executor_.Schedule(std::chrono::milliseconds(START_ABILITY_INTERVAL),
76             (std::bind(&UdmfAsyncClient::InvokeHapTask, this, params.query.key)));
77     }
78     asyncHelper->getDataTask = executor_.Execute(std::bind(&UdmfAsyncClient::GetDataTask, this, params.query));
79     asyncHelper->progressTask = executor_.Execute(std::bind(&UdmfAsyncClient::ProgressTask, this, params.query.key));
80 #endif
81     return E_OK;
82 }
83 
Cancel(std::string businessUdKey)84 Status UdmfAsyncClient::Cancel(std::string businessUdKey)
85 {
86     std::lock_guard<std::mutex> lockMap(mutex_);
87     if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
88         LOG_ERROR(UDMF_CLIENT, "No task can Cancel, key=%{public}s", businessUdKey.c_str());
89         return E_ERROR;
90     }
91     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
92     asyncHelper->progressQueue.Cancel();
93     return E_OK;
94 }
95 
CancelOnSingleTask()96 Status UdmfAsyncClient::CancelOnSingleTask()
97 {
98     std::lock_guard<std::mutex> lockMap(mutex_);
99     if (asyncHelperMap_.empty()) {
100         LOG_ERROR(UDMF_CLIENT, "No task can cancel");
101         return E_ERROR;
102     }
103     if (asyncHelperMap_.size() > 1) {
104         LOG_ERROR(UDMF_CLIENT, "Multiple tasks exist");
105         return E_ERROR;
106     }
107     LOG_INFO(UDMF_CLIENT, "Cancel task key=%{public}s", asyncHelperMap_.begin()->first.c_str());
108     asyncHelperMap_.begin()->second->progressQueue.Cancel();
109     return E_OK;
110 }
111 
ProgressTask(const std::string & businessUdKey)112 Status UdmfAsyncClient::ProgressTask(const std::string &businessUdKey)
113 {
114     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
115     if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
116         auto status = SetProgressData(businessUdKey);
117         if (status != E_OK) {
118             Clear(businessUdKey);
119             return status;
120         }
121     }
122     while (asyncHelper->lastProgress >= PROGRESS_INIT && asyncHelper->lastProgress < PROGRESS_ALL_FINISHED) {
123         auto pair = asyncHelper->progressQueue.Poll();
124         if (!pair.first) {
125             std::this_thread::sleep_for(std::chrono::milliseconds(READ_PROGRESS_INTERVAL));
126             continue;
127         }
128         auto progressInfo = pair.second;
129         if (progressInfo->progress >= PROGRESS_INIT && progressInfo->progress <= asyncHelper->lastProgress) {
130             continue;
131         }
132         asyncHelper->lastProgress = progressInfo->progress;
133         if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
134             UpdateProgressData(asyncHelper->processKey, *progressInfo);
135         }
136     }
137     Clear(businessUdKey);
138     return E_OK;
139 }
140 
GetDataTask(const QueryOption & query)141 Status UdmfAsyncClient::GetDataTask(const QueryOption &query)
142 {
143     auto &asyncHelper = asyncHelperMap_.at(query.key);
144     if (GetDataFromCache(asyncHelper, query) == E_OK) {
145         ProcessUnifiedData(asyncHelper);
146         return E_OK;
147     }
148     return CheckSync(asyncHelper, query);
149 }
150 
InvokeHapTask(const std::string & businessUdKey)151 Status UdmfAsyncClient::InvokeHapTask(const std::string &businessUdKey)
152 {
153     LOG_INFO(UDMF_CLIENT, "InvokeHap start!");
154     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
155     if (asyncHelper->progressQueue.IsCancel() || asyncHelper->progressQueue.IsClear()) {
156         LOG_INFO(UDMF_CLIENT, "Finished, not invoke hap.");
157         Clear(businessUdKey);
158         return E_OK;
159     }
160     if (asyncHelper->processKey.empty()) {
161         LOG_ERROR(UDMF_CLIENT, "Get key failed, not invoke hap.");
162         Clear(businessUdKey);
163         return E_ERROR;
164     }
165     sptr<IRemoteObject> callback = new ProgressSignalCallback();
166     auto obsMgrClient = AAFwk::DataObsMgrClient::GetInstance();
167     if (obsMgrClient == nullptr) {
168         LOG_ERROR(UDMF_CLIENT, "Get DataObsMgrClient failed");
169         Clear(businessUdKey);
170         return E_ERROR;
171     }
172 
173     auto status = obsMgrClient->NotifyProcessObserver(asyncHelper->processKey, callback);
174     if (status != AAFwk::SUCCESS) {
175         LOG_ERROR(UDMF_CLIENT, "Notify process dialog failed, status=%{public}d", status);
176         Clear(businessUdKey);
177         return E_ERROR;
178     }
179     Clear(businessUdKey);
180     return E_OK;
181 }
182 
RegisterAsyncHelper(const GetDataParams & params)183 Status UdmfAsyncClient::RegisterAsyncHelper(const GetDataParams &params)
184 {
185     std::lock_guard<std::mutex> lock(mutex_);
186     if (asyncHelperMap_.find(params.query.key) != asyncHelperMap_.end()) {
187         LOG_ERROR(UDMF_CLIENT, "The same task is running, key = %{public}s", params.query.key.c_str());
188         return E_IDEMPOTENT_ERROR;
189     }
190     auto asyncHelper = std::make_unique<AsyncHelper>();
191     asyncHelper->businessUdKey = params.query.key;
192     asyncHelper->progressListener = params.progressListener;
193     asyncHelper->progressIndicator = params.progressIndicator;
194     asyncHelper->fileConflictOptions = params.fileConflictOptions;
195     asyncHelper->destUri = params.destUri;
196     asyncHelperMap_.insert_or_assign(params.query.key, std::move(asyncHelper));
197     return E_OK;
198 }
199 
CheckSync(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)200 Status UdmfAsyncClient::CheckSync(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
201 {
202 #ifndef IOS_PLATFORM
203     AsyncProcessInfo processInfo = {
204         .businessUdKey = query.key
205     };
206     auto serviceClient = UdmfServiceClient::GetInstance();
207     if (serviceClient == nullptr) {
208         LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
209         return E_IPC;
210     }
211     auto status = serviceClient->ObtainAsynProcess(processInfo);
212     if (status != E_OK) {
213         LOG_ERROR(UDMF_CLIENT, "Sync error status = %{public}d!", status);
214         ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_ERROR };
215         CallProgress(asyncHelper, progressInfo, nullptr);
216         return static_cast<Status>(status);
217     }
218     if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_FAILURE) {
219         LOG_ERROR(UDMF_CLIENT, "Sync failed!");
220         ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
221         CallProgress(asyncHelper, progressInfo, nullptr);
222         return E_ERROR;
223     }
224     if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_SUCCESS) {
225         ProgressInfo progressInfo = { .progress = PROGRESS_SYNC_FINSHED, .errorCode = E_OK };
226         CallProgress(asyncHelper, progressInfo, nullptr);
227         return GetDataFromDB(asyncHelper, query);
228     }
229     if (asyncHelper->sycnRetryTime > MAX_SYNC_TIMES) {
230         LOG_ERROR(UDMF_CLIENT, "Sync retry timeout!");
231         ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
232         CallProgress(asyncHelper, progressInfo, nullptr);
233         return E_SYNC_FAILED;
234     }
235     (asyncHelper->sycnRetryTime)++;
236     asyncHelper->getDataTask = executor_.Schedule(std::chrono::milliseconds(SYNC_INTERVAL),
237         std::bind(&UdmfAsyncClient::GetDataTask, this, query));
238 #endif
239     return E_OK;
240 }
241 
GetDataFromDB(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)242 Status UdmfAsyncClient::GetDataFromDB(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
243 {
244     auto &data = *(asyncHelper->data);
245     auto status = static_cast<Status>(UdmfServiceClient::GetInstance()->GetData(query, data));
246     if (status != E_OK) {
247         LOG_ERROR(UDMF_CLIENT, "GetData error, status = %{public}d", status);
248         ProgressInfo progressInfo = {
249             .progress = PROGRESS_ERROR,
250             .errorCode = status
251         };
252         CallProgress(asyncHelper, progressInfo, nullptr);
253         return status;
254     }
255     return ProcessUnifiedData(asyncHelper);
256 }
257 
GetDataFromCache(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)258 Status UdmfAsyncClient::GetDataFromCache(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
259 {
260     return UdmfClient::GetInstance().GetDataFromCache(query, *(asyncHelper->data));
261 }
262 
SetProgressData(const std::string & businessUdKey)263 Status UdmfAsyncClient::SetProgressData(const std::string &businessUdKey)
264 {
265     auto serviceClient = UdmfServiceClient::GetInstance();
266     if (serviceClient == nullptr) {
267         LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
268         return E_IPC;
269     }
270     std::string progressKey;
271     CustomOption cusomOption = {
272         .intention = Intention::UD_INTENTION_DATA_HUB
273     };
274     auto obj = std::make_shared<Object>();
275     auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
276     progressRecord->SetContent(std::to_string(PROGRESS_INIT));
277     UnifiedData progressData;
278     progressData.AddRecord(progressRecord);
279     auto status = serviceClient->SetData(cusomOption, progressData, progressKey);
280     if (status != E_OK) {
281         LOG_ERROR(UDMF_CLIENT, "Set progress data error, status = %{public}d", status);
282         return static_cast<Status>(status);
283     }
284     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
285     asyncHelper->processKey = progressKey;
286     return E_OK;
287 }
288 
UpdateProgressData(const std::string & progressUdKey,const ProgressInfo & progressInfo)289 Status UdmfAsyncClient::UpdateProgressData(const std::string &progressUdKey, const ProgressInfo &progressInfo)
290 {
291     auto serviceClient = UdmfServiceClient::GetInstance();
292     if (serviceClient == nullptr) {
293         LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
294         return E_IPC;
295     }
296     QueryOption queryOption = {
297         .key = progressUdKey,
298         .intention = Intention::UD_INTENTION_DATA_HUB
299     };
300     auto obj = std::make_shared<Object>();
301     auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
302     if (progressInfo.progress < PROGRESS_INIT) {
303         progressRecord->SetContent(std::to_string(PROGRESS_ALL_FINISHED));
304     } else {
305         progressRecord->SetContent(std::to_string(progressInfo.progress));
306     }
307     UnifiedData progressData;
308     progressData.AddRecord(progressRecord);
309     auto status = serviceClient->UpdateData(queryOption, progressData);
310     if (status != E_OK) {
311         LOG_ERROR(UDMF_CLIENT, "Update progress data error, status = %{public}d", status);
312         return static_cast<Status>(status);
313     }
314     return E_OK;
315 }
316 
CopyFile(std::unique_ptr<AsyncHelper> & asyncHelper)317 Status UdmfAsyncClient::CopyFile(std::unique_ptr<AsyncHelper> &asyncHelper)
318 {
319     auto status = E_OK;
320     if (asyncHelper->destUri.empty()) {
321         LOG_INFO(UDMF_CLIENT, "No dest path, no copy.");
322     } else {
323         status = UdmfCopyFile::GetInstance().Copy(asyncHelper);
324     }
325     ProgressInfo progressInfo = {
326         .progress = PROGRESS_ALL_FINISHED,
327         .errorCode = status
328     };
329     CallProgress(asyncHelper, progressInfo, asyncHelper->data);
330     return E_OK;
331 }
332 
CallProgress(std::unique_ptr<AsyncHelper> & asyncHelper,ProgressInfo & progressInfo,std::shared_ptr<UnifiedData> data)333 void UdmfAsyncClient::CallProgress(std::unique_ptr<AsyncHelper> &asyncHelper, ProgressInfo &progressInfo,
334     std::shared_ptr<UnifiedData> data)
335 {
336     if (progressInfo.errorCode == E_OK) {
337         if (progressInfo.progress == PROGRESS_ALL_FINISHED) {
338             progressInfo.progressStatus = ListenerStatus::FINISHED;
339         } else {
340             progressInfo.progressStatus = ListenerStatus::PROCESSING;
341         }
342     } else {
343         auto it = STATUS_MAP.find(progressInfo.errorCode);
344         if (it == STATUS_MAP.end()) {
345             progressInfo.progressStatus = ListenerStatus::INNER_ERROR;
346         } else {
347             progressInfo.progressStatus = it->second;
348         }
349     }
350 
351     asyncHelper->progressListener(progressInfo, data);
352     asyncHelper->progressQueue.PushBack(progressInfo);
353 }
354 
Clear(const std::string & businessUdKey)355 Status UdmfAsyncClient::Clear(const std::string &businessUdKey)
356 {
357 #ifndef IOS_PLATFORM
358     std::lock_guard<std::mutex> lockMap(mutex_);
359     if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
360         LOG_ERROR(UDMF_CLIENT, "No task can Clear, key=%{public}s", businessUdKey.c_str());
361         return E_ERROR;
362     }
363     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
364     if (!asyncHelper->progressQueue.Clear()) {
365         return E_OK;
366     }
367     if (asyncHelper->invokeHapTask != ExecutorPool::INVALID_TASK_ID) {
368         executor_.Remove(asyncHelper->invokeHapTask);
369     }
370     executor_.Remove(asyncHelper->getDataTask);
371     executor_.Remove(asyncHelper->progressTask);
372     asyncHelperMap_.erase(businessUdKey);
373     UdmfServiceClient::GetInstance()->ClearAsynProcessByKey(businessUdKey);
374     LOG_INFO(UDMF_CLIENT, "Clear task success, key = %{public}s", businessUdKey.c_str());
375 #endif
376     return E_OK;
377 }
378 
ProcessUnifiedData(std::unique_ptr<AsyncHelper> & asyncHelper)379 Status UdmfAsyncClient::ProcessUnifiedData(std::unique_ptr<AsyncHelper> &asyncHelper)
380 {
381     if (!asyncHelper->data->HasFileType() && !asyncHelper->data->HasUriInfo()) {
382         ProgressInfo progressInfo = {
383             .progress = PROGRESS_ALL_FINISHED,
384             .errorCode = E_OK
385         };
386         CallProgress(asyncHelper, progressInfo, asyncHelper->data);
387         return E_OK;
388     }
389     ProgressInfo progressInfo = {
390         .progress = PROGRESS_GET_DATA_FINISHED,
391         .errorCode = E_OK
392     };
393     CallProgress(asyncHelper, progressInfo, nullptr);
394     return CopyFile(asyncHelper);
395 }
396 
IsParamValid(const GetDataParams & params)397 bool UdmfAsyncClient::IsParamValid(const GetDataParams &params)
398 {
399     auto query = params.query;
400     if (query.key.empty() || query.intention != Intention::UD_INTENTION_DRAG) {
401         LOG_ERROR(UDMF_CLIENT, "Params query invalid. query.key=%{public}s, intention=%{public}d", query.key.c_str(),
402             query.intention);
403         return false;
404     }
405     if (params.progressListener == nullptr) {
406         LOG_ERROR(UDMF_CLIENT, "Params progressListener not null.");
407         return false;
408     }
409     return true;
410 }
411 } // namespace OHOS::UDMF
412