1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "UdmfAsyncClient"
16
17 #include "udmf_async_client.h"
18
19 #include "logger.h"
20 #include "plain_text.h"
21 #include "progress_callback.h"
22 #include "udmf_client.h"
23 #include "udmf_copy_file.h"
24 #include "udmf_service_client.h"
25
26 namespace OHOS::UDMF {
27 static constexpr size_t MAX_THREADS = 10;
28 static constexpr size_t MIN_THREADS = 0;
29
30 static constexpr int32_t START_ABILITY_INTERVAL = 500;
31 static constexpr int32_t READ_PROGRESS_INTERVAL = 100;
32 static constexpr int32_t SYNC_INTERVAL = 200;
33 static constexpr int32_t MAX_SYNC_TIMES = 14;
34
35 static std::unordered_map<Status, int32_t> STATUS_MAP = {
36 { E_INVALID_PARAMETERS, ListenerStatus::INVALID_PARAMETERS },
37 { E_NOT_FOUND, ListenerStatus::DATA_NOT_FOUND },
38 { E_SYNC_FAILED, ListenerStatus::SYNC_FAILED },
39 { E_COPY_FILE_FAILED, ListenerStatus::COPY_FILE_FAILED },
40 { E_COPY_CANCELED, ListenerStatus::CANCEL }
41 };
42
43 static constexpr int32_t PROGRESS_ERROR = -1;
44 static constexpr int32_t PROGRESS_INIT = 0;
45 static constexpr int32_t PROGRESS_SYNC_FINSHED = 10;
46 static constexpr int32_t PROGRESS_GET_DATA_FINISHED = 20;
47 static constexpr int32_t PROGRESS_ALL_FINISHED = 100;
48
49 #ifdef IOS_PLATFORM
UdmfAsyncClient()50 UdmfAsyncClient::UdmfAsyncClient() {}
51 #else
UdmfAsyncClient()52 UdmfAsyncClient::UdmfAsyncClient() : executor_(MAX_THREADS, MIN_THREADS) {}
53 #endif
54
GetInstance()55 UdmfAsyncClient &UdmfAsyncClient::GetInstance()
56 {
57 static UdmfAsyncClient instance;
58 return instance;
59 }
60
StartAsyncDataRetrieval(const GetDataParams & params)61 Status UdmfAsyncClient::StartAsyncDataRetrieval(const GetDataParams ¶ms)
62 {
63 #ifndef IOS_PLATFORM
64 if (!IsParamValid(params)) {
65 return E_INVALID_PARAMETERS;
66 }
67 auto status = RegisterAsyncHelper(params);
68 if (status != E_OK) {
69 return status;
70 }
71 auto &asyncHelper = asyncHelperMap_.at(params.query.key);
72 if (params.progressIndicator == ProgressIndicator::DEFAULT) {
73 asyncHelper->progressQueue.SetClearable(false);
74 asyncHelper->invokeHapTask = executor_.Schedule(std::chrono::milliseconds(START_ABILITY_INTERVAL),
75 (std::bind(&UdmfAsyncClient::InvokeHapTask, this, params.query.key)));
76 }
77 asyncHelper->getDataTask = executor_.Execute(std::bind(&UdmfAsyncClient::GetDataTask, this, params.query));
78 asyncHelper->progressTask = executor_.Execute(std::bind(&UdmfAsyncClient::ProgressTask, this, params.query.key));
79 #endif
80 return E_OK;
81 }
82
Cancel(std::string businessUdKey)83 Status UdmfAsyncClient::Cancel(std::string businessUdKey)
84 {
85 std::lock_guard<std::mutex> lockMap(mutex_);
86 if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
87 LOG_ERROR(UDMF_CLIENT, "No task can Cancel, key=%{public}s", businessUdKey.c_str());
88 return E_ERROR;
89 }
90 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
91 asyncHelper->progressQueue.Cancel();
92 return E_OK;
93 }
94
CancelOnSingleTask()95 Status UdmfAsyncClient::CancelOnSingleTask()
96 {
97 std::lock_guard<std::mutex> lockMap(mutex_);
98 if (asyncHelperMap_.empty()) {
99 LOG_ERROR(UDMF_CLIENT, "No task can cancel");
100 return E_ERROR;
101 }
102 if (asyncHelperMap_.size() > 1) {
103 LOG_ERROR(UDMF_CLIENT, "Multiple tasks exist");
104 return E_ERROR;
105 }
106 LOG_INFO(UDMF_CLIENT, "Cancel task key=%{public}s", asyncHelperMap_.begin()->first.c_str());
107 asyncHelperMap_.begin()->second->progressQueue.Cancel();
108 return E_OK;
109 }
110
ProgressTask(const std::string & businessUdKey)111 Status UdmfAsyncClient::ProgressTask(const std::string &businessUdKey)
112 {
113 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
114 if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
115 auto status = SetProgressData(businessUdKey);
116 if (status != E_OK) {
117 Clear(businessUdKey);
118 return status;
119 }
120 }
121 while (asyncHelper->lastProgress >= PROGRESS_INIT && asyncHelper->lastProgress < PROGRESS_ALL_FINISHED) {
122 auto pair = asyncHelper->progressQueue.Poll();
123 if (!pair.first) {
124 std::this_thread::sleep_for(std::chrono::milliseconds(READ_PROGRESS_INTERVAL));
125 continue;
126 }
127 auto progressInfo = pair.second;
128 if (progressInfo->progress >= PROGRESS_INIT && progressInfo->progress <= asyncHelper->lastProgress) {
129 continue;
130 }
131 asyncHelper->lastProgress = progressInfo->progress;
132 if (asyncHelper->progressIndicator == ProgressIndicator::DEFAULT) {
133 UpdateProgressData(asyncHelper->processKey, *progressInfo);
134 }
135 }
136 Clear(businessUdKey);
137 return E_OK;
138 }
139
GetDataTask(const QueryOption & query)140 Status UdmfAsyncClient::GetDataTask(const QueryOption &query)
141 {
142 auto &asyncHelper = asyncHelperMap_.at(query.key);
143 if (GetDataFromCache(asyncHelper, query) == E_OK) {
144 ProcessUnifiedData(asyncHelper);
145 return E_OK;
146 }
147 return CheckSync(asyncHelper, query);
148 }
149
InvokeHapTask(const std::string & businessUdKey)150 Status UdmfAsyncClient::InvokeHapTask(const std::string &businessUdKey)
151 {
152 LOG_INFO(UDMF_CLIENT, "InvokeHap start!");
153 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
154 if (asyncHelper->progressQueue.IsCancel()) {
155 LOG_INFO(UDMF_CLIENT, "Finished, not invoke hap.");
156 Clear(businessUdKey);
157 return E_OK;
158 }
159 if (asyncHelper->processKey.empty()) {
160 LOG_ERROR(UDMF_CLIENT, "Get key failed, not invoke hap.");
161 Clear(businessUdKey);
162 return E_ERROR;
163 }
164 Clear(businessUdKey);
165 return E_OK;
166 }
167
RegisterAsyncHelper(const GetDataParams & params)168 Status UdmfAsyncClient::RegisterAsyncHelper(const GetDataParams ¶ms)
169 {
170 std::lock_guard<std::mutex> lock(mutex_);
171 if (asyncHelperMap_.find(params.query.key) != asyncHelperMap_.end()) {
172 LOG_ERROR(UDMF_CLIENT, "The same task is running, key = %{public}s", params.query.key.c_str());
173 return E_IDEMPOTENT_ERROR;
174 }
175 auto asyncHelper = std::make_unique<AsyncHelper>();
176 asyncHelper->businessUdKey = params.query.key;
177 asyncHelper->progressListener = params.progressListener;
178 asyncHelper->progressIndicator = params.progressIndicator;
179 asyncHelper->fileConflictOptions = params.fileConflictOptions;
180 asyncHelper->destUri = params.destUri;
181 asyncHelperMap_.insert_or_assign(params.query.key, std::move(asyncHelper));
182 return E_OK;
183 }
184
CheckSync(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)185 Status UdmfAsyncClient::CheckSync(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
186 {
187 #ifndef IOS_PLATFORM
188 AsyncProcessInfo processInfo = {
189 .businessUdKey = query.key
190 };
191 auto serviceClient = UdmfServiceClient::GetInstance();
192 if (serviceClient == nullptr) {
193 LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
194 return E_IPC;
195 }
196 auto status = serviceClient->ObtainAsynProcess(processInfo);
197 if (status != E_OK) {
198 LOG_ERROR(UDMF_CLIENT, "Sync error status = %{public}d!", status);
199 ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_ERROR };
200 CallProgress(asyncHelper, progressInfo, nullptr);
201 return static_cast<Status>(status);
202 }
203 if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_FAILURE) {
204 LOG_ERROR(UDMF_CLIENT, "Sync failed!");
205 ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
206 CallProgress(asyncHelper, progressInfo, nullptr);
207 return E_ERROR;
208 }
209 if (processInfo.syncStatus == AsyncTaskStatus::ASYNC_SUCCESS) {
210 ProgressInfo progressInfo = { .progress = PROGRESS_SYNC_FINSHED, .errorCode = E_OK };
211 CallProgress(asyncHelper, progressInfo, nullptr);
212 return GetDataFromDB(asyncHelper, query);
213 }
214 if (asyncHelper->sycnRetryTime > MAX_SYNC_TIMES) {
215 LOG_ERROR(UDMF_CLIENT, "Sync retry timeout!");
216 ProgressInfo progressInfo = { .progress = PROGRESS_ERROR, .errorCode = E_SYNC_FAILED };
217 CallProgress(asyncHelper, progressInfo, nullptr);
218 return E_SYNC_FAILED;
219 }
220 (asyncHelper->sycnRetryTime)++;
221 asyncHelper->getDataTask = executor_.Schedule(std::chrono::milliseconds(SYNC_INTERVAL),
222 std::bind(&UdmfAsyncClient::GetDataTask, this, query));
223 #endif
224 return E_OK;
225 }
226
GetDataFromDB(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)227 Status UdmfAsyncClient::GetDataFromDB(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
228 {
229 auto &data = *(asyncHelper->data);
230 auto status = static_cast<Status>(UdmfServiceClient::GetInstance()->GetData(query, data));
231 if (status != E_OK) {
232 LOG_ERROR(UDMF_CLIENT, "GetData error, status = %{public}d", status);
233 ProgressInfo progressInfo = {
234 .progress = PROGRESS_ERROR,
235 .errorCode = status
236 };
237 CallProgress(asyncHelper, progressInfo, nullptr);
238 return status;
239 }
240 return ProcessUnifiedData(asyncHelper);
241 }
242
GetDataFromCache(std::unique_ptr<AsyncHelper> & asyncHelper,const QueryOption & query)243 Status UdmfAsyncClient::GetDataFromCache(std::unique_ptr<AsyncHelper> &asyncHelper, const QueryOption &query)
244 {
245 return UdmfClient::GetInstance().GetDataFromCache(query, *(asyncHelper->data));
246 }
247
SetProgressData(const std::string & businessUdKey)248 Status UdmfAsyncClient::SetProgressData(const std::string &businessUdKey)
249 {
250 auto serviceClient = UdmfServiceClient::GetInstance();
251 if (serviceClient == nullptr) {
252 LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
253 return E_IPC;
254 }
255 std::string progressKey;
256 CustomOption cusomOption = {
257 .intention = Intention::UD_INTENTION_DATA_HUB
258 };
259 auto obj = std::make_shared<Object>();
260 auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
261 progressRecord->SetContent(std::to_string(PROGRESS_INIT));
262 UnifiedData progressData;
263 progressData.AddRecord(progressRecord);
264 auto status = serviceClient->SetData(cusomOption, progressData, progressKey);
265 if (status != E_OK) {
266 LOG_ERROR(UDMF_CLIENT, "Set progress data error, status = %{public}d", status);
267 return static_cast<Status>(status);
268 }
269 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
270 asyncHelper->processKey = progressKey;
271 return E_OK;
272 }
273
UpdateProgressData(const std::string & progressUdKey,const ProgressInfo & progressInfo)274 Status UdmfAsyncClient::UpdateProgressData(const std::string &progressUdKey, const ProgressInfo &progressInfo)
275 {
276 auto serviceClient = UdmfServiceClient::GetInstance();
277 if (serviceClient == nullptr) {
278 LOG_ERROR(UDMF_CLIENT, "UdmfServiceClient GetInstance failed");
279 return E_IPC;
280 }
281 QueryOption queryOption = {
282 .key = progressUdKey,
283 .intention = Intention::UD_INTENTION_DATA_HUB
284 };
285 auto obj = std::make_shared<Object>();
286 auto progressRecord = std::make_shared<PlainText>(UDType::PLAIN_TEXT, obj);
287 if (progressInfo.progress < PROGRESS_INIT) {
288 progressRecord->SetContent(std::to_string(PROGRESS_ALL_FINISHED));
289 } else {
290 progressRecord->SetContent(std::to_string(progressInfo.progress));
291 }
292 UnifiedData progressData;
293 progressData.AddRecord(progressRecord);
294 auto status = serviceClient->UpdateData(queryOption, progressData);
295 if (status != E_OK) {
296 LOG_ERROR(UDMF_CLIENT, "Update progress data error, status = %{public}d", status);
297 return static_cast<Status>(status);
298 }
299 return E_OK;
300 }
301
CopyFile(std::unique_ptr<AsyncHelper> & asyncHelper)302 Status UdmfAsyncClient::CopyFile(std::unique_ptr<AsyncHelper> &asyncHelper)
303 {
304 if (asyncHelper->destUri.empty()) {
305 LOG_INFO(UDMF_CLIENT, "No dest path, no copy.");
306 return E_OK;
307 }
308 auto status = UdmfCopyFile::GetInstance().Copy(asyncHelper);
309 ProgressInfo progressInfo = {
310 .progress = PROGRESS_ALL_FINISHED,
311 .errorCode = status
312 };
313 CallProgress(asyncHelper, progressInfo, asyncHelper->data);
314 return E_OK;
315 }
316
CallProgress(std::unique_ptr<AsyncHelper> & asyncHelper,ProgressInfo & progressInfo,std::shared_ptr<UnifiedData> data)317 void UdmfAsyncClient::CallProgress(std::unique_ptr<AsyncHelper> &asyncHelper, ProgressInfo &progressInfo,
318 std::shared_ptr<UnifiedData> data)
319 {
320 if (progressInfo.errorCode == E_OK) {
321 if (progressInfo.progress == PROGRESS_ALL_FINISHED) {
322 progressInfo.progressStatus = ListenerStatus::FINISHED;
323 } else {
324 progressInfo.progressStatus = ListenerStatus::PROCESSING;
325 }
326 } else {
327 auto it = STATUS_MAP.find(progressInfo.errorCode);
328 if (it == STATUS_MAP.end()) {
329 progressInfo.progressStatus = ListenerStatus::INNER_ERROR;
330 } else {
331 progressInfo.progressStatus = it->second;
332 }
333 }
334
335 asyncHelper->progressListener(progressInfo, data);
336 asyncHelper->progressQueue.PushBack(progressInfo);
337 }
338
Clear(const std::string & businessUdKey)339 Status UdmfAsyncClient::Clear(const std::string &businessUdKey)
340 {
341 #ifndef IOS_PLATFORM
342 std::lock_guard<std::mutex> lockMap(mutex_);
343 if (asyncHelperMap_.find(businessUdKey) == asyncHelperMap_.end()) {
344 LOG_ERROR(UDMF_CLIENT, "No task can Clear, key=%{public}s", businessUdKey.c_str());
345 return E_ERROR;
346 }
347 auto &asyncHelper = asyncHelperMap_.at(businessUdKey);
348 if (!asyncHelper->progressQueue.Clear()) {
349 return E_OK;
350 }
351 if (asyncHelper->invokeHapTask != ExecutorPool::INVALID_TASK_ID) {
352 executor_.Remove(asyncHelper->invokeHapTask);
353 }
354 executor_.Remove(asyncHelper->getDataTask);
355 executor_.Remove(asyncHelper->progressTask);
356 asyncHelperMap_.erase(businessUdKey);
357 UdmfServiceClient::GetInstance()->ClearAsynProcessByKey(businessUdKey);
358 LOG_INFO(UDMF_CLIENT, "Clear task success, key = %{public}s", businessUdKey.c_str());
359 #endif
360 return E_OK;
361 }
362
ProcessUnifiedData(std::unique_ptr<AsyncHelper> & asyncHelper)363 Status UdmfAsyncClient::ProcessUnifiedData(std::unique_ptr<AsyncHelper> &asyncHelper)
364 {
365 if (!asyncHelper->data->HasFileType()) {
366 ProgressInfo progressInfo = {
367 .progress = PROGRESS_ALL_FINISHED,
368 .errorCode = E_OK
369 };
370 CallProgress(asyncHelper, progressInfo, asyncHelper->data);
371 return E_OK;
372 }
373 ProgressInfo progressInfo = {
374 .progress = PROGRESS_GET_DATA_FINISHED,
375 .errorCode = E_OK
376 };
377 CallProgress(asyncHelper, progressInfo, nullptr);
378 return CopyFile(asyncHelper);
379 }
380
IsParamValid(const GetDataParams & params)381 bool UdmfAsyncClient::IsParamValid(const GetDataParams ¶ms)
382 {
383 auto query = params.query;
384 if (query.key.empty() || query.intention != Intention::UD_INTENTION_DRAG) {
385 LOG_ERROR(UDMF_CLIENT, "Params query invalid. query.key=%{public}s, intention=%{public}d", query.key.c_str(),
386 query.intention);
387 return false;
388 }
389 if (params.progressListener == nullptr) {
390 LOG_ERROR(UDMF_CLIENT, "Params progressListener not null.");
391 return false;
392 }
393 return true;
394 }
395 } // namespace OHOS::UDMF
396