• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "UdmfAsyncClient"
16 
17 #include "udmf_async_client.h"
18 
19 #include "logger.h"
20 #include "plain_text.h"
21 #include "progress_callback.h"
22 #include "udmf_client.h"
23 #include "udmf_copy_file.h"
24 #include "udmf_service_client.h"
25 
26 namespace OHOS::UDMF {
27 static constexpr size_t MAX_THREADS = 10;
28 static constexpr size_t MIN_THREADS = 0;
29 
30 static constexpr int32_t START_ABILITY_INTERVAL = 500;
31 static constexpr int32_t READ_PROGRESS_INTERVAL = 100;
32 static constexpr int32_t SYNC_INTERVAL = 200;
33 static constexpr int32_t MAX_SYNC_TIMES = 14;
34 
35 static std::unordered_map<Status, int32_t> STATUS_MAP = {
36     { E_INVALID_PARAMETERS, ListenerStatus::INVALID_PARAMETERS },
37     { E_NOT_FOUND, ListenerStatus::DATA_NOT_FOUND },
38     { E_SYNC_FAILED, ListenerStatus::SYNC_FAILED },
39     { E_COPY_FILE_FAILED, ListenerStatus::COPY_FILE_FAILED },
40     { E_COPY_CANCELED, ListenerStatus::CANCEL }
41 };
42 
43 static constexpr int32_t PROGRESS_ERROR = -1;
44 static constexpr int32_t PROGRESS_INIT = 0;
45 static constexpr int32_t PROGRESS_SYNC_FINSHED = 10;
46 static constexpr int32_t PROGRESS_GET_DATA_FINISHED = 20;
47 static constexpr int32_t PROGRESS_ALL_FINISHED = 100;
48 
49 #ifdef IOS_PLATFORM
UdmfAsyncClient()50 UdmfAsyncClient::UdmfAsyncClient() {}
51 #else
UdmfAsyncClient()52 UdmfAsyncClient::UdmfAsyncClient() : executor_(MAX_THREADS, MIN_THREADS) {}
53 #endif
54 
GetInstance()55 UdmfAsyncClient &UdmfAsyncClient::GetInstance()
56 {
57     static UdmfAsyncClient instance;
58     return instance;
59 }
60 
StartAsyncDataRetrieval(const GetDataParams & params)61 Status UdmfAsyncClient::StartAsyncDataRetrieval(const GetDataParams &params)
62 {
63 #ifndef IOS_PLATFORM
64     if (!IsParamValid(params)) {
65         return E_INVALID_PARAMETERS;
66     }
67     auto status = RegisterAsyncHelper(params);
68     if (status != E_OK) {
69         return status;
70     }
71     auto &asyncHelper = asyncHelperMap_.at(params.query.key);
72     if (params.progressIndicator == ProgressIndicator::DEFAULT) {
73         asyncHelper->progressQueue.SetClearable(false);
74         asyncHelper->invokeHapTask = executor_.Schedule(std::chrono::milliseconds(START_ABILITY_INTERVAL),
75             (std::bind(&UdmfAsyncClient::InvokeHapTask, this, params.query.key)));
76     }
77     asyncHelper->getDataTask = executor_.Execute(std::bind(&UdmfAsyncClient::GetDataTask, this, params.query));
78     asyncHelper->progressTask = executor_.Execute(std::bind(&UdmfAsyncClient::ProgressTask, this, params.query.key));
79 #endif
80     return E_OK;
81 }
82 
Cancel(std::string businessUdKey)83 Status UdmfAsyncClient::Cancel(std::string businessUdKey)
84 {
85     std::lock_guard<std::mutex> lockMap(mutex_);
86     if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
87         LOG_ERROR(UDMF_CLIENT, "No task can Cancel, key=%{public}s", businessUdKey.c_str());
88         return E_ERROR;
89     }
90     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
91     asyncHelper->progressQueue.Cancel();
92     return E_OK;
93 }
94 
CancelOnSingleTask()95 Status UdmfAsyncClient::CancelOnSingleTask()
96 {
97     std::lock_guard<std::mutex> lockMap(mutex_);
98     if (asyncHelperMap_.empty()) {
99         LOG_ERROR(UDMF_CLIENT, "No task can cancel");
100         return E_ERROR;
101     }
102     if (asyncHelperMap_.size() > 1) {
103         LOG_ERROR(UDMF_CLIENT, "Multiple tasks exist");
104         return E_ERROR;
105     }
106     LOG_INFO(UDMF_CLIENT, "Cancel task key=%{public}s", asyncHelperMap_.begin()->first.c_str());
107     asyncHelperMap_.begin()->second->progressQueue.Cancel();
108     return E_OK;
109 }
110 
ProgressTask(const std::string & businessUdKey)111 Status UdmfAsyncClient::ProgressTask(const std::string &businessUdKey)
112 {
113     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
114     if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
115         auto status = SetProgressData(businessUdKey);
116         if (status != E_OK) {
117             Clear(businessUdKey);
118             return status;
119         }
120     }
121     while (asyncHelper->lastProgress >= PROGRESS_INIT && asyncHelper->lastProgress < PROGRESS_ALL_FINISHED) {
122         auto pair = asyncHelper->progressQueue.Poll();
123         if (!pair.first) {
124             std::this_thread::sleep_for(std::chrono::milliseconds(READ_PROGRESS_INTERVAL));
125             continue;
126         }
127         auto progressInfo = pair.second;
128         if (progressInfo->progress >= PROGRESS_INIT && progressInfo->progress <= asyncHelper->lastProgress) {
129             continue;
130         }
131         asyncHelper->lastProgress = progressInfo->progress;
132         if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
133             UpdateProgressData(asyncHelper->processKey, *progressInfo);
134         }
135     }
136     Clear(businessUdKey);
137     return E_OK;
138 }
139 
GetDataTask(const QueryOption & query)140 Status UdmfAsyncClient::GetDataTask(const QueryOption &query)
141 {
142     auto &asyncHelper = asyncHelperMap_.at(query.key);
143     if (GetDataFromCache(asyncHelper, query) == E_OK) {
144         ProcessUnifiedData(asyncHelper);
145         return E_OK;
146     }
147     return CheckSync(asyncHelper, query);
148 }
149 
InvokeHapTask(const std::string & businessUdKey)150 Status UdmfAsyncClient::InvokeHapTask(const std::string &businessUdKey)
151 {
152     LOG_INFO(UDMF_CLIENT, "InvokeHap start!");
153     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
154     if (asyncHelper->progressQueue.IsCancel()) {
155         LOG_INFO(UDMF_CLIENT, "Finished, not invoke hap.");
156         Clear(businessUdKey);
157         return E_OK;
158     }
159     if (asyncHelper->processKey.empty()) {
160         LOG_ERROR(UDMF_CLIENT, "Get key failed, not invoke hap.");
161         Clear(businessUdKey);
162         return E_ERROR;
163     }
164     Clear(businessUdKey);
165     return E_OK;
166 }
167 
RegisterAsyncHelper(const GetDataParams & params)168 Status UdmfAsyncClient::RegisterAsyncHelper(const GetDataParams &params)
169 {
170     std::lock_guard<std::mutex> lock(mutex_);
171     if (asyncHelperMap_.find(params.query.key) != asyncHelperMap_.end()) {
172         LOG_ERROR(UDMF_CLIENT, "The same task is running, key = %{public}s", params.query.key.c_str());
173         return E_IDEMPOTENT_ERROR;
174     }
175     auto asyncHelper = std::make_unique<AsyncHelper>();
176     asyncHelper->businessUdKey = params.query.key;
177     asyncHelper->progressListener = params.progressListener;
178     asyncHelper->progressIndicator = params.progressIndicator;
179     asyncHelper->fileConflictOptions = params.fileConflictOptions;
180     asyncHelper->destUri = params.destUri;
181     asyncHelperMap_.insert_or_assign(params.query.key, std::move(asyncHelper));
182     return E_OK;
183 }
184 
CheckSync(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)185 Status UdmfAsyncClient::CheckSync(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
186 {
187 #ifndef IOS_PLATFORM
188     AsyncProcessInfo processInfo = {
189         .businessUdKey = query.key
190     };
191     auto serviceClient = UdmfServiceClient::GetInstance();
192     if (serviceClient == nullptr) {
193         LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
194         return E_IPC;
195     }
196     auto status = serviceClient->ObtainAsynProcess(processInfo);
197     if (status != E_OK) {
198         LOG_ERROR(UDMF_CLIENT, "Sync error status = %{public}d!", status);
199         ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_ERROR };
200         CallProgress(asyncHelper, progressInfo, nullptr);
201         return static_cast<Status>(status);
202     }
203     if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_FAILURE) {
204         LOG_ERROR(UDMF_CLIENT, "Sync failed!");
205         ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
206         CallProgress(asyncHelper, progressInfo, nullptr);
207         return E_ERROR;
208     }
209     if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_SUCCESS) {
210         ProgressInfo progressInfo = { .progress = PROGRESS_SYNC_FINSHED, .errorCode = E_OK };
211         CallProgress(asyncHelper, progressInfo, nullptr);
212         return GetDataFromDB(asyncHelper, query);
213     }
214     if (asyncHelper->sycnRetryTime > MAX_SYNC_TIMES) {
215         LOG_ERROR(UDMF_CLIENT, "Sync retry timeout!");
216         ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
217         CallProgress(asyncHelper, progressInfo, nullptr);
218         return E_SYNC_FAILED;
219     }
220     (asyncHelper->sycnRetryTime)++;
221     asyncHelper->getDataTask = executor_.Schedule(std::chrono::milliseconds(SYNC_INTERVAL),
222         std::bind(&UdmfAsyncClient::GetDataTask, this, query));
223 #endif
224     return E_OK;
225 }
226 
GetDataFromDB(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)227 Status UdmfAsyncClient::GetDataFromDB(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
228 {
229     auto &data = *(asyncHelper->data);
230     auto status = static_cast<Status>(UdmfServiceClient::GetInstance()->GetData(query, data));
231     if (status != E_OK) {
232         LOG_ERROR(UDMF_CLIENT, "GetData error, status = %{public}d", status);
233         ProgressInfo progressInfo = {
234             .progress = PROGRESS_ERROR,
235             .errorCode = status
236         };
237         CallProgress(asyncHelper, progressInfo, nullptr);
238         return status;
239     }
240     return ProcessUnifiedData(asyncHelper);
241 }
242 
GetDataFromCache(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)243 Status UdmfAsyncClient::GetDataFromCache(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
244 {
245     return UdmfClient::GetInstance().GetDataFromCache(query, *(asyncHelper->data));
246 }
247 
SetProgressData(const std::string & businessUdKey)248 Status UdmfAsyncClient::SetProgressData(const std::string &businessUdKey)
249 {
250     auto serviceClient = UdmfServiceClient::GetInstance();
251     if (serviceClient == nullptr) {
252         LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
253         return E_IPC;
254     }
255     std::string progressKey;
256     CustomOption cusomOption = {
257         .intention = Intention::UD_INTENTION_DATA_HUB
258     };
259     auto obj = std::make_shared<Object>();
260     auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
261     progressRecord->SetContent(std::to_string(PROGRESS_INIT));
262     UnifiedData progressData;
263     progressData.AddRecord(progressRecord);
264     auto status = serviceClient->SetData(cusomOption, progressData, progressKey);
265     if (status != E_OK) {
266         LOG_ERROR(UDMF_CLIENT, "Set progress data error, status = %{public}d", status);
267         return static_cast<Status>(status);
268     }
269     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
270     asyncHelper->processKey = progressKey;
271     return E_OK;
272 }
273 
UpdateProgressData(const std::string & progressUdKey,const ProgressInfo & progressInfo)274 Status UdmfAsyncClient::UpdateProgressData(const std::string &progressUdKey, const ProgressInfo &progressInfo)
275 {
276     auto serviceClient = UdmfServiceClient::GetInstance();
277     if (serviceClient == nullptr) {
278         LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
279         return E_IPC;
280     }
281     QueryOption queryOption = {
282         .key = progressUdKey,
283         .intention = Intention::UD_INTENTION_DATA_HUB
284     };
285     auto obj = std::make_shared<Object>();
286     auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
287     if (progressInfo.progress < PROGRESS_INIT) {
288         progressRecord->SetContent(std::to_string(PROGRESS_ALL_FINISHED));
289     } else {
290         progressRecord->SetContent(std::to_string(progressInfo.progress));
291     }
292     UnifiedData progressData;
293     progressData.AddRecord(progressRecord);
294     auto status = serviceClient->UpdateData(queryOption, progressData);
295     if (status != E_OK) {
296         LOG_ERROR(UDMF_CLIENT, "Update progress data error, status = %{public}d", status);
297         return static_cast<Status>(status);
298     }
299     return E_OK;
300 }
301 
CopyFile(std::unique_ptr<AsyncHelper> & asyncHelper)302 Status UdmfAsyncClient::CopyFile(std::unique_ptr<AsyncHelper> &asyncHelper)
303 {
304     if (asyncHelper->destUri.empty()) {
305         LOG_INFO(UDMF_CLIENT, "No dest path, no copy.");
306         return E_OK;
307     }
308     auto status = UdmfCopyFile::GetInstance().Copy(asyncHelper);
309     ProgressInfo progressInfo = {
310         .progress = PROGRESS_ALL_FINISHED,
311         .errorCode = status
312     };
313     CallProgress(asyncHelper, progressInfo, asyncHelper->data);
314     return E_OK;
315 }
316 
CallProgress(std::unique_ptr<AsyncHelper> & asyncHelper,ProgressInfo & progressInfo,std::shared_ptr<UnifiedData> data)317 void UdmfAsyncClient::CallProgress(std::unique_ptr<AsyncHelper> &asyncHelper, ProgressInfo &progressInfo,
318     std::shared_ptr<UnifiedData> data)
319 {
320     if (progressInfo.errorCode == E_OK) {
321         if (progressInfo.progress == PROGRESS_ALL_FINISHED) {
322             progressInfo.progressStatus = ListenerStatus::FINISHED;
323         } else {
324             progressInfo.progressStatus = ListenerStatus::PROCESSING;
325         }
326     } else {
327         auto it = STATUS_MAP.find(progressInfo.errorCode);
328         if (it == STATUS_MAP.end()) {
329             progressInfo.progressStatus = ListenerStatus::INNER_ERROR;
330         } else {
331             progressInfo.progressStatus = it->second;
332         }
333     }
334 
335     asyncHelper->progressListener(progressInfo, data);
336     asyncHelper->progressQueue.PushBack(progressInfo);
337 }
338 
Clear(const std::string & businessUdKey)339 Status UdmfAsyncClient::Clear(const std::string &businessUdKey)
340 {
341 #ifndef IOS_PLATFORM
342     std::lock_guard<std::mutex> lockMap(mutex_);
343     if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
344         LOG_ERROR(UDMF_CLIENT, "No task can Clear, key=%{public}s", businessUdKey.c_str());
345         return E_ERROR;
346     }
347     auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
348     if (!asyncHelper->progressQueue.Clear()) {
349         return E_OK;
350     }
351     if (asyncHelper->invokeHapTask != ExecutorPool::INVALID_TASK_ID) {
352         executor_.Remove(asyncHelper->invokeHapTask);
353     }
354     executor_.Remove(asyncHelper->getDataTask);
355     executor_.Remove(asyncHelper->progressTask);
356     asyncHelperMap_.erase(businessUdKey);
357     UdmfServiceClient::GetInstance()->ClearAsynProcessByKey(businessUdKey);
358     LOG_INFO(UDMF_CLIENT, "Clear task success, key = %{public}s", businessUdKey.c_str());
359 #endif
360     return E_OK;
361 }
362 
ProcessUnifiedData(std::unique_ptr<AsyncHelper> & asyncHelper)363 Status UdmfAsyncClient::ProcessUnifiedData(std::unique_ptr<AsyncHelper> &asyncHelper)
364 {
365     if (!asyncHelper->data->HasFileType()) {
366         ProgressInfo progressInfo = {
367             .progress = PROGRESS_ALL_FINISHED,
368             .errorCode = E_OK
369         };
370         CallProgress(asyncHelper, progressInfo, asyncHelper->data);
371         return E_OK;
372     }
373     ProgressInfo progressInfo = {
374         .progress = PROGRESS_GET_DATA_FINISHED,
375         .errorCode = E_OK
376     };
377     CallProgress(asyncHelper, progressInfo, nullptr);
378     return CopyFile(asyncHelper);
379 }
380 
IsParamValid(const GetDataParams & params)381 bool UdmfAsyncClient::IsParamValid(const GetDataParams &params)
382 {
383     auto query = params.query;
384     if (query.key.empty() || query.intention != Intention::UD_INTENTION_DRAG) {
385         LOG_ERROR(UDMF_CLIENT, "Params query invalid. query.key=%{public}s, intention=%{public}d", query.key.c_str(),
386             query.intention);
387         return false;
388     }
389     if (params.progressListener == nullptr) {
390         LOG_ERROR(UDMF_CLIENT, "Params progressListener not null.");
391         return false;
392     }
393     return true;
394 }
395 } // namespace OHOS::UDMF
396