1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "UdmfAsyncClient"
16
17 #include "udmf_async_client.h"
18
19 #include "dataobs_mgr_client.h"
20 #include "logger.h"
21 #include "plain_text.h"
22 #include "progress_callback.h"
23 #include "udmf_client.h"
24 #include "udmf_copy_file.h"
25 #include "udmf_service_client.h"
26
27 namespace OHOS::UDMF {
28 static constexpr size_t MAX_THREADS = 10;
29 static constexpr size_t MIN_THREADS = 0;
30
31 static constexpr int32_t START_ABILITY_INTERVAL = 500;
32 static constexpr int32_t READ_PROGRESS_INTERVAL = 100;
33 static constexpr int32_t SYNC_INTERVAL = 200;
34 static constexpr int32_t MAX_SYNC_TIMES = 14;
35
36 static std::unordered_map<Status, int32_t> STATUS_MAP = {
37 { E_INVALID_PARAMETERS, ListenerStatus::INVALID_PARAMETERS },
38 { E_NOT_FOUND, ListenerStatus::DATA_NOT_FOUND },
39 { E_SYNC_FAILED, ListenerStatus::SYNC_FAILED },
40 { E_COPY_FILE_FAILED, ListenerStatus::COPY_FILE_FAILED },
41 { E_COPY_CANCELED, ListenerStatus::CANCEL }
42 };
43
44 static constexpr int32_t PROGRESS_ERROR = -1;
45 static constexpr int32_t PROGRESS_INIT = 0;
46 static constexpr int32_t PROGRESS_SYNC_FINSHED = 10;
47 static constexpr int32_t PROGRESS_GET_DATA_FINISHED = 20;
48 static constexpr int32_t PROGRESS_ALL_FINISHED = 100;
49
50 #ifdef IOS_PLATFORM
UdmfAsyncClient()51 UdmfAsyncClient::UdmfAsyncClient() {}
52 #else
UdmfAsyncClient()53 UdmfAsyncClient::UdmfAsyncClient() : executor_(MAX_THREADS, MIN_THREADS) {}
54 #endif
55
GetInstance()56 UdmfAsyncClient &UdmfAsyncClient::GetInstance()
57 {
58 static UdmfAsyncClient instance;
59 return instance;
60 }
61
StartAsyncDataRetrieval(const GetDataParams & params)62 Status UdmfAsyncClient::StartAsyncDataRetrieval(const GetDataParams ¶ms)
63 {
64 #ifndef IOS_PLATFORM
65 if (!IsParamValid(params)) {
66 return E_INVALID_PARAMETERS;
67 }
68 auto status = RegisterAsyncHelper(params);
69 if (status != E_OK) {
70 return status;
71 }
72 auto &asyncHelper = asyncHelperMap_.at(params.query.key);
73 if (params.progressIndicator == ProgressIndicator::DEFAULT) {
74 asyncHelper->progressQueue.SetClearable(false);
75 asyncHelper->invokeHapTask = executor_.Schedule(std::chrono::milliseconds(START_ABILITY_INTERVAL),
76 (std::bind(&UdmfAsyncClient::InvokeHapTask, this, params.query.key)));
77 }
78 asyncHelper->getDataTask = executor_.Execute(std::bind(&UdmfAsyncClient::GetDataTask, this, params.query));
79 asyncHelper->progressTask = executor_.Execute(std::bind(&UdmfAsyncClient::ProgressTask, this, params.query.key));
80 #endif
81 return E_OK;
82 }
83
Cancel(std::string businessUdKey)84 Status UdmfAsyncClient::Cancel(std::string businessUdKey)
85 {
86 std::lock_guard<std::mutex> lockMap(mutex_);
87 if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
88 LOG_ERROR(UDMF_CLIENT, "No task can Cancel, key=%{public}s", businessUdKey.c_str());
89 return E_ERROR;
90 }
91 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
92 asyncHelper->progressQueue.Cancel();
93 return E_OK;
94 }
95
CancelOnSingleTask()96 Status UdmfAsyncClient::CancelOnSingleTask()
97 {
98 std::lock_guard<std::mutex> lockMap(mutex_);
99 if (asyncHelperMap_.empty()) {
100 LOG_ERROR(UDMF_CLIENT, "No task can cancel");
101 return E_ERROR;
102 }
103 if (asyncHelperMap_.size() > 1) {
104 LOG_ERROR(UDMF_CLIENT, "Multiple tasks exist");
105 return E_ERROR;
106 }
107 LOG_INFO(UDMF_CLIENT, "Cancel task key=%{public}s", asyncHelperMap_.begin()->first.c_str());
108 asyncHelperMap_.begin()->second->progressQueue.Cancel();
109 return E_OK;
110 }
111
ProgressTask(const std::string & businessUdKey)112 Status UdmfAsyncClient::ProgressTask(const std::string &businessUdKey)
113 {
114 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
115 if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
116 auto status = SetProgressData(businessUdKey);
117 if (status != E_OK) {
118 Clear(businessUdKey);
119 return status;
120 }
121 }
122 while (asyncHelper->lastProgress >= PROGRESS_INIT && asyncHelper->lastProgress < PROGRESS_ALL_FINISHED) {
123 auto pair = asyncHelper->progressQueue.Poll();
124 if (!pair.first) {
125 std::this_thread::sleep_for(std::chrono::milliseconds(READ_PROGRESS_INTERVAL));
126 continue;
127 }
128 auto progressInfo = pair.second;
129 if (progressInfo->progress >= PROGRESS_INIT && progressInfo->progress <= asyncHelper->lastProgress) {
130 continue;
131 }
132 asyncHelper->lastProgress = progressInfo->progress;
133 if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
134 UpdateProgressData(asyncHelper->processKey, *progressInfo);
135 }
136 }
137 Clear(businessUdKey);
138 return E_OK;
139 }
140
GetDataTask(const QueryOption & query)141 Status UdmfAsyncClient::GetDataTask(const QueryOption &query)
142 {
143 auto &asyncHelper = asyncHelperMap_.at(query.key);
144 if (GetDataFromCache(asyncHelper, query) == E_OK) {
145 ProcessUnifiedData(asyncHelper);
146 return E_OK;
147 }
148 return CheckSync(asyncHelper, query);
149 }
150
InvokeHapTask(const std::string & businessUdKey)151 Status UdmfAsyncClient::InvokeHapTask(const std::string &businessUdKey)
152 {
153 LOG_INFO(UDMF_CLIENT, "InvokeHap start!");
154 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
155 if (asyncHelper->progressQueue.IsCancel() || asyncHelper->progressQueue.IsClear()) {
156 LOG_INFO(UDMF_CLIENT, "Finished, not invoke hap.");
157 Clear(businessUdKey);
158 return E_OK;
159 }
160 if (asyncHelper->processKey.empty()) {
161 LOG_ERROR(UDMF_CLIENT, "Get key failed, not invoke hap.");
162 Clear(businessUdKey);
163 return E_ERROR;
164 }
165 sptr<IRemoteObject> callback = new ProgressSignalCallback();
166 auto obsMgrClient = AAFwk::DataObsMgrClient::GetInstance();
167 if (obsMgrClient == nullptr) {
168 LOG_ERROR(UDMF_CLIENT, "Get DataObsMgrClient failed");
169 Clear(businessUdKey);
170 return E_ERROR;
171 }
172
173 auto status = obsMgrClient->NotifyProcessObserver(asyncHelper->processKey, callback);
174 if (status != AAFwk::SUCCESS) {
175 LOG_ERROR(UDMF_CLIENT, "Notify process dialog failed, status=%{public}d", status);
176 Clear(businessUdKey);
177 return E_ERROR;
178 }
179 Clear(businessUdKey);
180 return E_OK;
181 }
182
RegisterAsyncHelper(const GetDataParams & params)183 Status UdmfAsyncClient::RegisterAsyncHelper(const GetDataParams ¶ms)
184 {
185 std::lock_guard<std::mutex> lock(mutex_);
186 if (asyncHelperMap_.find(params.query.key) != asyncHelperMap_.end()) {
187 LOG_ERROR(UDMF_CLIENT, "The same task is running, key = %{public}s", params.query.key.c_str());
188 return E_IDEMPOTENT_ERROR;
189 }
190 auto asyncHelper = std::make_unique<AsyncHelper>();
191 asyncHelper->businessUdKey = params.query.key;
192 asyncHelper->progressListener = params.progressListener;
193 asyncHelper->progressIndicator = params.progressIndicator;
194 asyncHelper->fileConflictOptions = params.fileConflictOptions;
195 asyncHelper->destUri = params.destUri;
196 asyncHelperMap_.insert_or_assign(params.query.key, std::move(asyncHelper));
197 return E_OK;
198 }
199
CheckSync(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)200 Status UdmfAsyncClient::CheckSync(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
201 {
202 #ifndef IOS_PLATFORM
203 AsyncProcessInfo processInfo = {
204 .businessUdKey = query.key
205 };
206 auto serviceClient = UdmfServiceClient::GetInstance();
207 if (serviceClient == nullptr) {
208 LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
209 return E_IPC;
210 }
211 auto status = serviceClient->ObtainAsynProcess(processInfo);
212 if (status != E_OK) {
213 LOG_ERROR(UDMF_CLIENT, "Sync error status = %{public}d!", status);
214 ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_ERROR };
215 CallProgress(asyncHelper, progressInfo, nullptr);
216 return static_cast<Status>(status);
217 }
218 if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_FAILURE) {
219 LOG_ERROR(UDMF_CLIENT, "Sync failed!");
220 ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
221 CallProgress(asyncHelper, progressInfo, nullptr);
222 return E_ERROR;
223 }
224 if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_SUCCESS) {
225 ProgressInfo progressInfo = { .progress = PROGRESS_SYNC_FINSHED, .errorCode = E_OK };
226 CallProgress(asyncHelper, progressInfo, nullptr);
227 return GetDataFromDB(asyncHelper, query);
228 }
229 if (asyncHelper->sycnRetryTime > MAX_SYNC_TIMES) {
230 LOG_ERROR(UDMF_CLIENT, "Sync retry timeout!");
231 ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
232 CallProgress(asyncHelper, progressInfo, nullptr);
233 return E_SYNC_FAILED;
234 }
235 (asyncHelper->sycnRetryTime)++;
236 asyncHelper->getDataTask = executor_.Schedule(std::chrono::milliseconds(SYNC_INTERVAL),
237 std::bind(&UdmfAsyncClient::GetDataTask, this, query));
238 #endif
239 return E_OK;
240 }
241
GetDataFromDB(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)242 Status UdmfAsyncClient::GetDataFromDB(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
243 {
244 auto &data = *(asyncHelper->data);
245 auto status = static_cast<Status>(UdmfServiceClient::GetInstance()->GetData(query, data));
246 if (status != E_OK) {
247 LOG_ERROR(UDMF_CLIENT, "GetData error, status = %{public}d", status);
248 ProgressInfo progressInfo = {
249 .progress = PROGRESS_ERROR,
250 .errorCode = status
251 };
252 CallProgress(asyncHelper, progressInfo, nullptr);
253 return status;
254 }
255 return ProcessUnifiedData(asyncHelper);
256 }
257
GetDataFromCache(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)258 Status UdmfAsyncClient::GetDataFromCache(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
259 {
260 return UdmfClient::GetInstance().GetDataFromCache(query, *(asyncHelper->data));
261 }
262
SetProgressData(const std::string & businessUdKey)263 Status UdmfAsyncClient::SetProgressData(const std::string &businessUdKey)
264 {
265 auto serviceClient = UdmfServiceClient::GetInstance();
266 if (serviceClient == nullptr) {
267 LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
268 return E_IPC;
269 }
270 std::string progressKey;
271 CustomOption cusomOption = {
272 .intention = Intention::UD_INTENTION_DATA_HUB
273 };
274 auto obj = std::make_shared<Object>();
275 auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
276 progressRecord->SetContent(std::to_string(PROGRESS_INIT));
277 UnifiedData progressData;
278 progressData.AddRecord(progressRecord);
279 auto status = serviceClient->SetData(cusomOption, progressData, progressKey);
280 if (status != E_OK) {
281 LOG_ERROR(UDMF_CLIENT, "Set progress data error, status = %{public}d", status);
282 return static_cast<Status>(status);
283 }
284 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
285 asyncHelper->processKey = progressKey;
286 return E_OK;
287 }
288
UpdateProgressData(const std::string & progressUdKey,const ProgressInfo & progressInfo)289 Status UdmfAsyncClient::UpdateProgressData(const std::string &progressUdKey, const ProgressInfo &progressInfo)
290 {
291 auto serviceClient = UdmfServiceClient::GetInstance();
292 if (serviceClient == nullptr) {
293 LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
294 return E_IPC;
295 }
296 QueryOption queryOption = {
297 .key = progressUdKey,
298 .intention = Intention::UD_INTENTION_DATA_HUB
299 };
300 auto obj = std::make_shared<Object>();
301 auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
302 if (progressInfo.progress < PROGRESS_INIT) {
303 progressRecord->SetContent(std::to_string(PROGRESS_ALL_FINISHED));
304 } else {
305 progressRecord->SetContent(std::to_string(progressInfo.progress));
306 }
307 UnifiedData progressData;
308 progressData.AddRecord(progressRecord);
309 auto status = serviceClient->UpdateData(queryOption, progressData);
310 if (status != E_OK) {
311 LOG_ERROR(UDMF_CLIENT, "Update progress data error, status = %{public}d", status);
312 return static_cast<Status>(status);
313 }
314 return E_OK;
315 }
316
CopyFile(std::unique_ptr<AsyncHelper> & asyncHelper)317 Status UdmfAsyncClient::CopyFile(std::unique_ptr<AsyncHelper> &asyncHelper)
318 {
319 auto status = E_OK;
320 if (asyncHelper->destUri.empty()) {
321 LOG_INFO(UDMF_CLIENT, "No dest path, no copy.");
322 } else {
323 status = UdmfCopyFile::GetInstance().Copy(asyncHelper);
324 }
325 ProgressInfo progressInfo = {
326 .progress = PROGRESS_ALL_FINISHED,
327 .errorCode = status
328 };
329 CallProgress(asyncHelper, progressInfo, asyncHelper->data);
330 return E_OK;
331 }
332
CallProgress(std::unique_ptr<AsyncHelper> & asyncHelper,ProgressInfo & progressInfo,std::shared_ptr<UnifiedData> data)333 void UdmfAsyncClient::CallProgress(std::unique_ptr<AsyncHelper> &asyncHelper, ProgressInfo &progressInfo,
334 std::shared_ptr<UnifiedData> data)
335 {
336 if (progressInfo.errorCode == E_OK) {
337 if (progressInfo.progress == PROGRESS_ALL_FINISHED) {
338 progressInfo.progressStatus = ListenerStatus::FINISHED;
339 } else {
340 progressInfo.progressStatus = ListenerStatus::PROCESSING;
341 }
342 } else {
343 auto it = STATUS_MAP.find(progressInfo.errorCode);
344 if (it == STATUS_MAP.end()) {
345 progressInfo.progressStatus = ListenerStatus::INNER_ERROR;
346 } else {
347 progressInfo.progressStatus = it->second;
348 }
349 }
350
351 asyncHelper->progressListener(progressInfo, data);
352 asyncHelper->progressQueue.PushBack(progressInfo);
353 }
354
Clear(const std::string & businessUdKey)355 Status UdmfAsyncClient::Clear(const std::string &businessUdKey)
356 {
357 #ifndef IOS_PLATFORM
358 std::lock_guard<std::mutex> lockMap(mutex_);
359 if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
360 LOG_ERROR(UDMF_CLIENT, "No task can Clear, key=%{public}s", businessUdKey.c_str());
361 return E_ERROR;
362 }
363 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
364 if (!asyncHelper->progressQueue.Clear()) {
365 return E_OK;
366 }
367 if (asyncHelper->invokeHapTask != ExecutorPool::INVALID_TASK_ID) {
368 executor_.Remove(asyncHelper->invokeHapTask);
369 }
370 executor_.Remove(asyncHelper->getDataTask);
371 executor_.Remove(asyncHelper->progressTask);
372 asyncHelperMap_.erase(businessUdKey);
373 UdmfServiceClient::GetInstance()->ClearAsynProcessByKey(businessUdKey);
374 LOG_INFO(UDMF_CLIENT, "Clear task success, key = %{public}s", businessUdKey.c_str());
375 #endif
376 return E_OK;
377 }
378
ProcessUnifiedData(std::unique_ptr<AsyncHelper> & asyncHelper)379 Status UdmfAsyncClient::ProcessUnifiedData(std::unique_ptr<AsyncHelper> &asyncHelper)
380 {
381 if (!asyncHelper->data->HasFileType() && !asyncHelper->data->HasUriInfo()) {
382 ProgressInfo progressInfo = {
383 .progress = PROGRESS_ALL_FINISHED,
384 .errorCode = E_OK
385 };
386 CallProgress(asyncHelper, progressInfo, asyncHelper->data);
387 return E_OK;
388 }
389 ProgressInfo progressInfo = {
390 .progress = PROGRESS_GET_DATA_FINISHED,
391 .errorCode = E_OK
392 };
393 CallProgress(asyncHelper, progressInfo, nullptr);
394 return CopyFile(asyncHelper);
395 }
396
IsParamValid(const GetDataParams & params)397 bool UdmfAsyncClient::IsParamValid(const GetDataParams ¶ms)
398 {
399 auto query = params.query;
400 if (query.key.empty() || query.intention != Intention::UD_INTENTION_DRAG) {
401 LOG_ERROR(UDMF_CLIENT, "Params query invalid. query.key=%{public}s, intention=%{public}d", query.key.c_str(),
402 query.intention);
403 return false;
404 }
405 if (params.progressListener == nullptr) {
406 LOG_ERROR(UDMF_CLIENT, "Params progressListener not null.");
407 return false;
408 }
409 return true;
410 }
411 } // namespace OHOS::UDMF
412