1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "data_syncer.h"
17
18 #include <functional>
19
20 #include "data_sync_const.h"
21 #include "data_sync_notifier.h"
22 #include "data_syncer_rdb_store.h"
23 #include "dfs_error.h"
24 #include "ipc/cloud_sync_callback_manager.h"
25 #include "sdk_helper.h"
26 #include "sync_rule/battery_status.h"
27 #include "sync_rule/network_status.h"
28 #include "sync_rule/screen_status.h"
29 #include "sync_rule/cloud_status.h"
30 #include "task_state_manager.h"
31 #include "utils_log.h"
32
33 namespace OHOS {
34 namespace FileManagement {
35 namespace CloudSync {
36 using namespace std;
37 using namespace placeholders;
38 using namespace DriveKit;
39 using ChangeType = AAFwk::ChangeInfo::ChangeType;
40
DataSyncer(const std::string bundleName,const int32_t userId)41 DataSyncer::DataSyncer(const std::string bundleName, const int32_t userId)
42 : bundleName_(bundleName), userId_(userId)
43 {
44 /* alloc task runner */
45 taskRunner_ = DelayedSingleton<TaskManager>::GetInstance()->AllocRunner(userId,
46 bundleName, bind(&DataSyncer::Schedule, this));
47 taskRunner_->SetStopFlag(stopFlag_);
48 downloadCallbackMgr_.SetBundleName(bundleName);
49 }
50
~DataSyncer()51 DataSyncer::~DataSyncer()
52 {
53 /* release task runner */
54 DelayedSingleton<TaskManager>::GetInstance()->ReleaseRunner(userId_, bundleName_);
55 }
56
AsyncRun(std::shared_ptr<TaskContext> context,void (DataSyncer::* f)(std::shared_ptr<TaskContext>))57 int32_t DataSyncer::AsyncRun(std::shared_ptr<TaskContext> context,
58 void(DataSyncer::*f)(std::shared_ptr<TaskContext>))
59 {
60 return taskRunner_->AsyncRun<DataSyncer>(context, f, this);
61 }
62
63 template<typename T, typename RET, typename... ARGS>
AsyncCallback(RET (T::* f)(ARGS...))64 function<RET(ARGS...)> DataSyncer::AsyncCallback(RET(T::*f)(ARGS...))
65 {
66 return taskRunner_->AsyncCallback<DataSyncer>(f, this);
67 }
68
StartSync(bool forceFlag,SyncTriggerType triggerType)69 int32_t DataSyncer::StartSync(bool forceFlag, SyncTriggerType triggerType)
70 {
71 LOGI("%{private}d %{public}s starts sync, isforceSync %{public}d, triggerType %{public}d",
72 userId_, bundleName_.c_str(), forceFlag, triggerType);
73
74 triggerType_ = triggerType;
75 startTime_ = GetCurrentTimeStamp();
76
77 /* only one specific data sycner running at a time */
78 if (syncStateManager_.CheckAndSetPending(forceFlag, triggerType)) {
79 LOGI("syncing, pending sync");
80 return E_PENDING;
81 }
82
83 int32_t ret = InitSysEventData();
84 if (ret != E_OK) {
85 return E_DATA;
86 }
87
88 TaskStateManager::GetInstance().StartTask(bundleName_, TaskType::SYNC_TASK);
89 /* start data sync */
90 ScheduleByType(triggerType);
91
92 return E_OK;
93 }
94
StopSync(SyncTriggerType triggerType)95 int32_t DataSyncer::StopSync(SyncTriggerType triggerType)
96 {
97 LOGI("%{private}d %{public}s stops sync, trigger stop sync, type:%{public}d",
98 userId_, bundleName_.c_str(), triggerType);
99 syncStateManager_.SetStopSyncFlag();
100 StopUploadAssets();
101 Abort();
102 return E_OK;
103 }
104
Lock()105 int32_t DataSyncer::Lock()
106 {
107 return E_OK;
108 }
109
Unlock()110 void DataSyncer::Unlock() {}
111
ForceUnlock()112 void DataSyncer::ForceUnlock() {}
113
StartDownloadFile(const std::string path,const int32_t userId)114 int32_t DataSyncer::StartDownloadFile(const std::string path, const int32_t userId)
115 {
116 return E_OK;
117 }
118
StopDownloadFile(const std::string path,const int32_t userId)119 int32_t DataSyncer::StopDownloadFile(const std::string path, const int32_t userId)
120 {
121 DownloadProgressObj download;
122 if (downloadCallbackMgr_.StopDonwload(path, userId, download)) {
123 int64_t downloadId = download.downloadId;
124 int32_t res = sdkHelper_->CancelDownloadAssets(downloadId);
125 LOGD("CancelDownloadAssets result: %d", res);
126 if (res != NO_ERROR) {
127 LOGE("CancelDownloadAssets fail %{public}d", res);
128 return res;
129 }
130 downloadCallbackMgr_.NotifyProcessStop(download);
131 }
132 return E_OK;
133 }
134
135
RegisterDownloadFileCallback(const int32_t userId,const sptr<ICloudDownloadCallback> downloadCallback)136 int32_t DataSyncer::RegisterDownloadFileCallback(const int32_t userId,
137 const sptr<ICloudDownloadCallback> downloadCallback)
138 {
139 downloadCallbackMgr_.RegisterCallback(userId, downloadCallback);
140 return E_OK;
141 }
142
UnregisterDownloadFileCallback(const int32_t userId)143 int32_t DataSyncer::UnregisterDownloadFileCallback(const int32_t userId)
144 {
145 downloadCallbackMgr_.UnregisterCallback(userId);
146 return E_OK;
147 }
148
Abort()149 void DataSyncer::Abort()
150 {
151 LOGI("%{private}d %{private}s aborts", userId_, bundleName_.c_str());
152 thread ([this]() {
153 taskRunner_->ReleaseTask();
154 /* call the syncer manager's callback for notification */
155 Complete();
156 }).detach();
157 }
158
SetSdkHelper(shared_ptr<SdkHelper> & sdkHelper)159 void DataSyncer::SetSdkHelper(shared_ptr<SdkHelper> &sdkHelper)
160 {
161 sdkHelper_ = sdkHelper;
162 }
163
Pull(shared_ptr<DataHandler> handler)164 int32_t DataSyncer::Pull(shared_ptr<DataHandler> handler)
165 {
166 LOGI("%{private}d %{private}s pull", userId_, bundleName_.c_str());
167
168 shared_ptr<TaskContext> context = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
169
170 RetryDownloadRecords(context);
171 vector<DKRecordId> records;
172 int32_t ret = handler->GetRetryRecords(records);
173 if (ret == E_OK) {
174 DataSyncer::PullRecordsWithId(context, records, true);
175 }
176
177 /* Full synchronization and incremental synchronization */
178 if (handler->IsPullRecords()) {
179 SetFullSyncSysEvent();
180 if (handler->GetCheckFlag()) {
181 SetCheckSysEvent();
182 }
183 ret = AsyncRun(context, &DataSyncer::PullRecords);
184 } else {
185 ret = AsyncRun(context, &DataSyncer::PullDatabaseChanges);
186 }
187 if (ret != E_OK) {
188 LOGE("asyn run pull records err %{public}d", ret);
189 return ret;
190 }
191 return E_OK;
192 }
193
194 /*
195 * Although records from the cloud should be all pulled down before
196 * uploading the local change, conflicts might be rare in most cases,
197 * and the syncer would just move on.
198 */
PullRecords(shared_ptr<TaskContext> context)199 void DataSyncer::PullRecords(shared_ptr<TaskContext> context)
200 {
201 LOGI("%{private}d %{private}s pull records", userId_, bundleName_.c_str());
202
203 /* get query condition here */
204 auto handler = context->GetHandler();
205 if (handler == nullptr) {
206 LOGE("context get handler err");
207 return;
208 }
209
210 FetchCondition cond;
211 handler->GetFetchCondition(cond);
212
213 DKQueryCursor tempStartCursor;
214 handler->GetTempStartCursor(tempStartCursor);
215 if (tempStartCursor.empty()) {
216 sdkHelper_->GetStartCursor(cond.recordType, tempStartCursor);
217 handler->SetTempStartCursor(tempStartCursor);
218 }
219
220 DKQueryCursor nextCursor;
221 handler->GetNextCursor(nextCursor);
222
223 SdkHelper::FetchRecordsCallback callback = nullptr;
224 if (handler->GetCheckFlag()) {
225 callback = AsyncCallback(&DataSyncer::OnFetchCheckRecords);
226 } else {
227 callback = AsyncCallback(&DataSyncer::OnFetchRecords);
228 }
229 if (callback == nullptr) {
230 LOGE("wrap on fetch records fail");
231 return;
232 }
233
234 int32_t ret = sdkHelper_->FetchRecords(context, cond, nextCursor, callback);
235 if (ret != E_OK) {
236 LOGE("sdk fetch records err %{public}d", ret);
237 }
238 }
239
PullDatabaseChanges(shared_ptr<TaskContext> context)240 void DataSyncer::PullDatabaseChanges(shared_ptr<TaskContext> context)
241 {
242 LOGI("%{private}d %{private}s pull database changes", userId_, bundleName_.c_str());
243
244 auto callback = AsyncCallback(&DataSyncer::OnFetchDatabaseChanges);
245 if (callback == nullptr) {
246 LOGE("wrap on fetch records fail");
247 return;
248 }
249
250 auto handler = context->GetHandler();
251 if (handler == nullptr) {
252 LOGE("context get handler err");
253 return;
254 }
255 DKQueryCursor nextCursor;
256 handler->GetNextCursor(nextCursor);
257
258 FetchCondition cond;
259 handler->GetFetchCondition(cond);
260 int32_t ret = sdkHelper_->FetchDatabaseChanges(context, cond, nextCursor, callback);
261 if (ret != E_OK) {
262 LOGE("sdk fetch records err %{public}d", ret);
263 }
264 }
265
266 struct DownloadContext {
267 std::shared_ptr<DKContext> context;
268 vector<DKDownloadAsset> assets;
269 DriveKit::DKDownloadId id;
270 std::function<void(std::shared_ptr<DriveKit::DKContext>,
271 std::shared_ptr<const DriveKit::DKDatabase>,
272 const std::map<DriveKit::DKDownloadAsset, DriveKit::DKDownloadResult> &,
273 const DriveKit::DKError &)>
274 resultCallback;
275 std::function<
276 void(std::shared_ptr<DriveKit::DKContext>, DriveKit::DKDownloadAsset, TotalSize, DriveKit::DownloadSize)>
277 progressCallback;
278 };
279
DownloadAssets(DownloadContext & ctx)280 void DataSyncer::DownloadAssets(DownloadContext &ctx)
281 {
282 if (ctx.resultCallback == nullptr) {
283 LOGE("resultCallback nullptr");
284 return;
285 }
286
287 if (ctx.progressCallback == nullptr) {
288 LOGE("progressCallback nullptr");
289 return;
290 }
291
292 if (ctx.assets.size() == 0) {
293 LOGE("no assets to download");
294 return;
295 }
296
297 int32_t ret = sdkHelper_->DownloadAssets(ctx.context, ctx.assets, {}, ctx.id,
298 ctx.resultCallback, ctx.progressCallback);
299 if (ret != E_OK) {
300 LOGE("sdk download assets error %{public}d", ret);
301 /*
302 * 1. remove previous added tasks
303 * 2. invoke callback here to do some statistics in handler
304 * 3. set assets back to handler for its info
305 */
306 shared_ptr<TaskContext> tctx = static_pointer_cast<TaskContext>(ctx.context);
307 tctx->SetAssets(ctx.assets);
308 ctx.resultCallback(ctx.context, nullptr, {}, {});
309 }
310 }
311
DownloadThumbAssets(DownloadContext ctx)312 void DataSyncer::DownloadThumbAssets(DownloadContext ctx)
313 {
314 if (ctx.resultCallback == nullptr) {
315 LOGE("resultCallback nullptr");
316 return;
317 }
318
319 if (ctx.progressCallback == nullptr) {
320 LOGE("progressCallback nullptr");
321 return;
322 }
323
324 if (ctx.assets.size() == 0) {
325 LOGE("no assets to download");
326 return;
327 }
328
329 int32_t ret = sdkHelper_->DownloadAssets(ctx.context, ctx.assets, {}, ctx.id,
330 ctx.resultCallback, ctx.progressCallback);
331 if (ret != E_OK) {
332 LOGE("sdk download assets error %{public}d", ret);
333 shared_ptr<TaskContext> tctx = static_pointer_cast<TaskContext>(ctx.context);
334 tctx->SetAssets(ctx.assets);
335 ctx.resultCallback(ctx.context, nullptr, {}, {});
336 }
337 }
338
FetchRecordsDownloadCallback(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,const map<DKDownloadAsset,DKDownloadResult> & resultMap,const DKError & err)339 void DataSyncer::FetchRecordsDownloadCallback(shared_ptr<DKContext> context,
340 shared_ptr<const DKDatabase> database,
341 const map<DKDownloadAsset, DKDownloadResult> &resultMap,
342 const DKError &err)
343 {
344 if (err.HasError()) {
345 LOGE("DKAssetsDownloader err, localErr: %{public}d, serverErr: %{public}d", static_cast<int>(err.dkErrorCode),
346 err.serverErrorCode);
347 } else {
348 LOGI("DKAssetsDownloader ok");
349 }
350
351 auto ctx = static_pointer_cast<TaskContext>(context);
352 auto handler = ctx->GetHandler();
353 if (resultMap.size() != 0) {
354 handler->OnDownloadAssets(resultMap);
355 } else {
356 /* error account */
357 (void)handler->OnDownloadAssetsFailure(ctx->GetAssets());
358 }
359 }
360
FetchRecordsDownloadProgress(shared_ptr<DKContext> context,DKDownloadAsset asset,TotalSize total,DownloadSize download)361 static void FetchRecordsDownloadProgress(shared_ptr<DKContext> context,
362 DKDownloadAsset asset,
363 TotalSize total,
364 DownloadSize download)
365 {
366 LOGD("record %s %{public}s download progress", asset.recordId.c_str(), asset.fieldKey.c_str());
367 if (total == download) {
368 auto ctx = static_pointer_cast<TaskContext>(context);
369 auto handler = ctx->GetHandler();
370 handler->OnDownloadAssets(asset);
371 }
372 /* account error in result callback but not here */
373 }
374
HandleOnFetchRecords(const std::shared_ptr<DownloadTaskContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DKRecord>> records,bool checkOrRetry)375 int DataSyncer::HandleOnFetchRecords(const std::shared_ptr<DownloadTaskContext> context,
376 std::shared_ptr<const DKDatabase> database, std::shared_ptr<std::vector<DKRecord>> records, bool checkOrRetry)
377 {
378 if (records->size() == 0) {
379 LOGI("no records to handle");
380 return E_OK;
381 }
382
383 OnFetchParams onFetchParams;
384 auto ctx = static_pointer_cast<TaskContext>(context);
385 auto handler = ctx->GetHandler();
386 if (handler == nullptr) {
387 LOGE("context get handler err");
388 return E_CONTEXT;
389 }
390
391 if (handler->IsPullRecords() && !checkOrRetry) {
392 onFetchParams.totalPullCount = context->GetBatchNo() * handler->GetRecordSize();
393 }
394
395 int32_t ret = handler->OnFetchRecords(records, onFetchParams);
396 if (!onFetchParams.assetsToDownload.empty()) {
397 DownloadContext dctx = {.context = context,
398 .assets = onFetchParams.assetsToDownload,
399 .id = 0,
400 .resultCallback = AsyncCallback(&DataSyncer::FetchRecordsDownloadCallback),
401 .progressCallback = FetchRecordsDownloadProgress};
402 DownloadAssets(dctx);
403 }
404
405 if (ret != E_OK) {
406 LOGE("handler on fetch records err %{public}d", ret);
407 } else {
408 this_thread::sleep_for(chrono::seconds(1));
409 SyncStateChangedNotify(CloudSyncState::DOWNLOADING, ErrorType::NO_ERROR);
410 }
411 if (!checkOrRetry) {
412 handler->FinishPull(context->GetBatchNo());
413 }
414 return ret;
415 }
416
OnFetchRecords(const std::shared_ptr<DKContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DKRecord>> records,DKQueryCursor nextCursor,const DKError & err)417 void DataSyncer::OnFetchRecords(const std::shared_ptr<DKContext> context, std::shared_ptr<const DKDatabase> database,
418 std::shared_ptr<std::vector<DKRecord>> records, DKQueryCursor nextCursor, const DKError &err)
419 {
420 if (err.HasError()) {
421 LOGE("OnFetchRecords server err %{public}d and dk errcor %{public}d", err.serverErrorCode, err.dkErrorCode);
422 if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
423 SetErrorCodeMask(ErrorType::NETWORK_UNAVAILABLE);
424 }
425 return;
426 }
427
428 LOGI("%{private}d %{private}s on fetch records", userId_, bundleName_.c_str());
429 auto ctx = static_pointer_cast<DownloadTaskContext>(context);
430 auto handler = ctx->GetHandler();
431 if (handler == nullptr) {
432 LOGE("context get handler err");
433 return;
434 }
435 if (ctx->GetBatchNo() == 0) {
436 handler->SetRecordSize(records->size());
437 }
438 /* pull more */
439 if (nextCursor.empty()) {
440 LOGI("no more records");
441 handler->GetTempStartCursor(nextCursor);
442 handler->SetTempNextCursor(nextCursor, true);
443 if (records->size() == 0) {
444 handler->FinishPull(ctx->GetBatchNo());
445 }
446 } else {
447 handler->SetTempNextCursor(nextCursor, false);
448 shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
449 int ret = AsyncRun(nexCtx, &DataSyncer::PullRecords);
450 if (ret != E_OK) {
451 LOGE("asyn run pull records err %{public}d", ret);
452 }
453 }
454
455 if (HandleOnFetchRecords(ctx, database, records, false) != E_OK) {
456 LOGE("HandleOnFetchRecords failed");
457 }
458 }
459
DownloadInner(std::shared_ptr<DataHandler> handler,const std::string path,const int32_t userId)460 int32_t DataSyncer::DownloadInner(std::shared_ptr<DataHandler> handler,
461 const std::string path,
462 const int32_t userId)
463 {
464 auto ctx = std::make_shared<TaskContext>(handler);
465 std::vector<DKDownloadAsset> assetsToDownload;
466 int32_t ret = handler->GetDownloadAsset(path, assetsToDownload);
467 if (ret != E_OK) {
468 LOGE("handler on fetch records err %{public}d", ret);
469 return ret;
470 }
471
472 if (downloadCallbackMgr_.FindDownload(path)) {
473 LOGI("download exist %{public}s", path.c_str());
474 return E_OK;
475 }
476
477 auto downloadResultCallback = [this, path, assetsToDownload, handler](
478 std::shared_ptr<DriveKit::DKContext> context,
479 std::shared_ptr<const DriveKit::DKDatabase> database,
480 const std::map<DriveKit::DKDownloadAsset, DriveKit::DKDownloadResult> &results,
481 const DriveKit::DKError &err) {
482 this->downloadCallbackMgr_.OnDownloadedResult(path, assetsToDownload, handler, context, database, results, err);
483 };
484 auto downloadProcessCallback = [this, path](std::shared_ptr<DKContext> context, DKDownloadAsset asset,
485 TotalSize totalSize, DownloadSize downloadSize) {
486 this->downloadCallbackMgr_.OnDownloadProcess(path, context, asset, totalSize, downloadSize);
487 };
488 DownloadContext dctx = {.context = ctx,
489 .assets = assetsToDownload,
490 .id = 0,
491 .resultCallback = downloadResultCallback,
492 .progressCallback = downloadProcessCallback};
493 DownloadAssets(dctx);
494 downloadCallbackMgr_.StartDonwload(path, userId, dctx.id);
495 return E_OK;
496 }
497
OnFetchDatabaseChanges(const std::shared_ptr<DKContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DriveKit::DKRecord>> records,DKQueryCursor nextCursor,bool hasMore,const DKError & err)498 void DataSyncer::OnFetchDatabaseChanges(const std::shared_ptr<DKContext> context,
499 std::shared_ptr<const DKDatabase> database,
500 std::shared_ptr<std::vector<DriveKit::DKRecord>> records, DKQueryCursor nextCursor,
501 bool hasMore, const DKError &err)
502 {
503 LOGI("%{private}d %{private}s on fetch database changes", userId_, bundleName_.c_str());
504 auto ctx = static_pointer_cast<DownloadTaskContext>(context);
505
506 auto handler = ctx->GetHandler();
507 if (handler == nullptr) {
508 LOGE("context get handler err");
509 return;
510 }
511
512 if (err.HasError()) {
513 LOGE("OnFetchDatabaseChanges server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
514 err.dkErrorCode);
515 if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
516 SetErrorCodeMask(ErrorType::NETWORK_UNAVAILABLE);
517 } else if (!err.errorDetails.empty()) {
518 DKDetailErrorCode detailCode = static_cast<DKDetailErrorCode>(err.errorDetails[0].detailCode);
519 if (detailCode == DKDetailErrorCode::PARAM_INVALID || detailCode == DKDetailErrorCode::CURSOR_EXPIRED) {
520 handler->SetChecking();
521 Pull(handler);
522 }
523 }
524 return;
525 }
526
527 /* pull more */
528 if (!hasMore) {
529 LOGI("no more records");
530 handler->SetTempNextCursor(nextCursor, true);
531 if (records->size() == 0) {
532 handler->FinishPull(ctx->GetBatchNo());
533 }
534 } else {
535 handler->SetTempNextCursor(nextCursor, false);
536 shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
537 int ret = AsyncRun(nexCtx, &DataSyncer::PullDatabaseChanges);
538 if (ret != E_OK) {
539 LOGE("asyn run pull database changes err %{public}d", ret);
540 }
541 }
542
543 if (HandleOnFetchRecords(ctx, database, records, false) != E_OK) {
544 LOGE("HandleOnFetchRecords failed");
545 return;
546 }
547 }
548
OnFetchCheckRecords(const shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<std::vector<DKRecord>> records,DKQueryCursor nextCursor,const DKError & err)549 void DataSyncer::OnFetchCheckRecords(const shared_ptr<DKContext> context,
550 shared_ptr<const DKDatabase> database, shared_ptr<std::vector<DKRecord>> records,
551 DKQueryCursor nextCursor, const DKError &err)
552 {
553 if (err.HasError()) {
554 LOGE("OnFetchCheckRecords server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
555 err.dkErrorCode);
556 if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
557 SetErrorCodeMask(ErrorType::NETWORK_UNAVAILABLE);
558 }
559 return;
560 }
561 LOGI("%{private}d %{private}s on fetch records", userId_, bundleName_.c_str());
562 auto ctx = static_pointer_cast<DownloadTaskContext>(context);
563 auto handler = ctx->GetHandler();
564 if (handler == nullptr) {
565 LOGE("context get handler err");
566 return;
567 }
568
569 int32_t ret = E_OK;
570 /* pull more */
571 if (nextCursor.empty()) {
572 LOGI("no more records");
573 handler->GetTempStartCursor(nextCursor);
574 handler->SetTempNextCursor(nextCursor, true);
575 } else {
576 handler->SetTempNextCursor(nextCursor, false);
577 shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
578 ret = AsyncRun(nexCtx, &DataSyncer::PullRecords);
579 if (ret != E_OK) {
580 LOGE("asyn run pull records err %{public}d", ret);
581 }
582 }
583
584 std::vector<DriveKit::DKRecordId> checkRecords;
585 ret = handler->GetCheckRecords(checkRecords, records);
586 if (ret != E_OK) {
587 LOGE("handler get check records err %{public}d", ret);
588 return;
589 }
590
591 DataSyncer::PullRecordsWithId(ctx, checkRecords, false);
592 }
593
PullRecordsWithId(shared_ptr<TaskContext> context,const std::vector<DriveKit::DKRecordId> & records,bool retry)594 void DataSyncer::PullRecordsWithId(shared_ptr<TaskContext> context, const std::vector<DriveKit::DKRecordId> &records,
595 bool retry)
596 {
597 auto ctx = static_pointer_cast<DownloadTaskContext>(context);
598 auto handler = ctx->GetHandler();
599 if (handler == nullptr) {
600 LOGE("context get handler err");
601 return;
602 }
603
604 FetchCondition cond;
605 handler->GetFetchCondition(cond);
606 LOGI("retry records count: %{public}u", static_cast<uint32_t>(records.size()));
607 for (auto it : records) {
608 auto callback = AsyncCallback(&DataSyncer::OnFetchRecordWithId);
609 if (callback == nullptr) {
610 LOGE("wrap on fetch records fail");
611 continue;
612 }
613 int32_t ret = sdkHelper_->FetchRecordWithId(context, cond, it, callback);
614 if (ret != E_OK) {
615 LOGE("sdk fetch records err %{public}d", ret);
616 }
617 }
618 if (!retry) {
619 handler->FinishPull(ctx->GetBatchNo());
620 }
621 }
622
OnFetchRecordWithId(shared_ptr<DKContext> context,shared_ptr<DKDatabase> database,DKRecordId recordId,const DKRecord & record,const DKError & error)623 void DataSyncer::OnFetchRecordWithId(shared_ptr<DKContext> context, shared_ptr<DKDatabase> database,
624 DKRecordId recordId, const DKRecord &record, const DKError &error)
625 {
626 auto records = make_shared<std::vector<DKRecord>>();
627 if (error.HasError()) {
628 LOGE("has error, recordId:%s, dkErrorCode :%{public}d, serverErrorCode:%{public}d", recordId.c_str(),
629 error.dkErrorCode, error.serverErrorCode);
630 LOGI("convert to delete record");
631 DKRecord deleteRecord;
632 deleteRecord.SetRecordId(recordId);
633 deleteRecord.SetDelete(true);
634 records->push_back(deleteRecord);
635 } else {
636 LOGI("handle retry record : %s", record.GetRecordId().c_str());
637 records->push_back(record);
638 }
639 auto ctx = static_pointer_cast<DownloadTaskContext>(context);
640 HandleOnFetchRecords(ctx, database, records, true);
641 }
642
RetryDownloadRecords(shared_ptr<TaskContext> context)643 void DataSyncer::RetryDownloadRecords(shared_ptr<TaskContext> context)
644 {
645 auto ctx = static_pointer_cast<TaskContext>(context);
646 auto handler = ctx->GetHandler();
647 if (handler == nullptr) {
648 LOGE("context get handler err");
649 return;
650 }
651
652 vector<DriveKit::DKDownloadAsset> assetsToDownload;
653 int32_t ret = handler->GetAssetsToDownload(assetsToDownload);
654 if (ret != E_OK) {
655 LOGE("get assetsToDownload err %{public}d", ret);
656 return;
657 }
658 if (assetsToDownload.empty()) {
659 return;
660 }
661
662 LOGI("assetsToDownload count: %{public}zu", assetsToDownload.size());
663 DownloadContext dctx = {.context = ctx,
664 .assets = assetsToDownload,
665 .id = 0,
666 .resultCallback = AsyncCallback(&DataSyncer::FetchRecordsDownloadCallback),
667 .progressCallback = FetchRecordsDownloadProgress};
668 DownloadAssets(dctx);
669 }
670
Push(shared_ptr<DataHandler> handler)671 int32_t DataSyncer::Push(shared_ptr<DataHandler> handler)
672 {
673 /*
674 * Although unlikely, if the first callback finds no more records available
675 * and tries to schedule before following tasks commited, the data syncer
676 * will directly schedule to the next stage, while following tasks would be
677 * commited to the next stage mistakenly.
678 * One possible solution: commit dummy task in the beginning and complete
679 * dummy task in the end.
680 */
681 shared_ptr<TaskContext> context = make_shared<TaskContext>(handler);
682
683 /* commit a dummy task */
684 BeginTransaction();
685
686 int32_t ret = AsyncRun(context, &DataSyncer::CreateRecords);
687 if (ret != E_OK) {
688 LOGE("async run create records err %{public}d", ret);
689 return ret;
690 }
691
692 ret = AsyncRun(context, &DataSyncer::DeleteRecords);
693 if (ret != E_OK) {
694 LOGE("async run delete records err %{public}d", ret);
695 return ret;
696 }
697
698 ret = AsyncRun(context, &DataSyncer::ModifyMdirtyRecords);
699 if (ret != E_OK) {
700 LOGE("async run modify mdirty records err %{public}d", ret);
701 return ret;
702 }
703
704 ret = AsyncRun(context, &DataSyncer::ModifyFdirtyRecords);
705 if (ret != E_OK) {
706 LOGE("async run modify fdirty records err %{public}d", ret);
707 return ret;
708 }
709
710 /* complete the dummy task */
711 EndTransaction();
712
713 return E_OK;
714 }
715
Init(const std::string bundleName,const int32_t userId)716 int32_t DataSyncer::Init(const std::string bundleName, const int32_t userId)
717 {
718 return E_OK;
719 }
720
Clean(const int action)721 int32_t DataSyncer::Clean(const int action)
722 {
723 return E_OK;
724 }
725
DisableCloud()726 int32_t DataSyncer::DisableCloud()
727 {
728 return E_OK;
729 }
730
CleanInner(std::shared_ptr<DataHandler> handler,const int action)731 int32_t DataSyncer::CleanInner(std::shared_ptr<DataHandler> handler, const int action)
732 {
733 int ret = 0;
734 LOGD("Enter function DataSyncer::CleanInner");
735 handler->ClearCursor();
736 ret = handler->Clean(action);
737 if (ret != E_OK) {
738 LOGE("Clean file failed res:%{public}d", ret);
739 return ret;
740 }
741 return ret;
742 }
743
CancelDownload(std::shared_ptr<DataHandler> handler)744 int32_t DataSyncer::CancelDownload(std::shared_ptr<DataHandler> handler)
745 {
746 int32_t ret = 0;
747 std::vector<int64_t> downloadIds = downloadCallbackMgr_.StopAllDownloads(userId_);
748 if (!HasSdkHelper()) {
749 LOGW(" sdk helper is null skip cancel download");
750 return ret;
751 }
752 for (const auto &id : downloadIds) {
753 ret = sdkHelper_->CancelDownloadAssets(id);
754 if (ret != NO_ERROR) {
755 LOGE("CancelDownloadAssets fail %{public}d", ret);
756 return ret;
757 }
758 }
759 return ret;
760 }
761
CreateRecords(shared_ptr<TaskContext> context)762 void DataSyncer::CreateRecords(shared_ptr<TaskContext> context)
763 {
764 LOGI("%{private}d %{private}s creates records", userId_, bundleName_.c_str());
765
766 auto handler = context->GetHandler();
767 if (handler == nullptr) {
768 LOGE("context get handler err");
769 return;
770 }
771
772 /* query local */
773 vector<DKRecord> records;
774 int32_t ret = handler->GetCreatedRecords(records);
775 if (ret != E_OK) {
776 LOGE("handler get created records err %{public}d", ret);
777 return;
778 }
779
780 /* no need upload */
781 if (records.size() == 0) {
782 return;
783 }
784
785 if (!BatteryStatus::IsAllowUpload(syncStateManager_.GetForceFlag())) {
786 LOGE("battery status abnormal, abort upload");
787 SetErrorCodeMask(ErrorType::BATTERY_LEVEL_LOW);
788 return;
789 }
790
791 if ((NetworkStatus::GetNetConnStatus() != NetworkStatus::WIFI_CONNECT)) {
792 LOGE("network status abnormal, abort upload");
793 SetErrorCodeMask(ErrorType::WIFI_UNAVAILABLE);
794 return;
795 }
796
797 /* upload */
798 auto callback = AsyncCallback(&DataSyncer::OnCreateRecords);
799 if (callback == nullptr) {
800 LOGE("wrap on create records fail");
801 return;
802 }
803 ret = sdkHelper_->CreateRecords(context, records, callback);
804 if (ret != E_OK) {
805 LOGE("sdk create records err %{public}d", ret);
806 }
807 }
808
DeleteRecords(shared_ptr<TaskContext> context)809 void DataSyncer::DeleteRecords(shared_ptr<TaskContext> context)
810 {
811 LOGI("%{private}d %{private}s deletes records", userId_, bundleName_.c_str());
812
813 auto handler = context->GetHandler();
814 if (handler == nullptr) {
815 LOGE("context get handler err");
816 return;
817 }
818
819 /* query local */
820 vector<DKRecord> records;
821 int32_t ret = handler->GetDeletedRecords(records);
822 if (ret != E_OK) {
823 LOGE("handler get deleted records err %{public}d", ret);
824 return;
825 }
826
827 /* no need upload */
828 if (records.size() == 0) {
829 return;
830 }
831
832 if ((NetworkStatus::GetNetConnStatus() != NetworkStatus::WIFI_CONNECT)) {
833 LOGE("network status abnormal, abort upload");
834 SetErrorCodeMask(ErrorType::WIFI_UNAVAILABLE);
835 return;
836 }
837
838 /* upload */
839 auto callback = AsyncCallback(&DataSyncer::OnDeleteRecords);
840 if (callback == nullptr) {
841 LOGE("wrap on delete records fail");
842 return;
843 }
844 ret = sdkHelper_->DeleteRecords(context, records, callback);
845 if (ret != E_OK) {
846 LOGE("sdk delete records err %{public}d", ret);
847 }
848 }
849
ModifyMdirtyRecords(shared_ptr<TaskContext> context)850 void DataSyncer::ModifyMdirtyRecords(shared_ptr<TaskContext> context)
851 {
852 LOGI("%{private}d %{private}s modifies records", userId_, bundleName_.c_str());
853
854 auto handler = context->GetHandler();
855 if (handler == nullptr) {
856 LOGE("context get handler err");
857 return;
858 }
859
860 /* query local */
861 vector<DKRecord> records;
862 int32_t ret = handler->GetMetaModifiedRecords(records);
863 if (ret != E_OK) {
864 LOGE("handler get modified records err %{public}d", ret);
865 return;
866 }
867
868 /* no need upload */
869 if (records.size() == 0) {
870 return;
871 }
872
873 if ((NetworkStatus::GetNetConnStatus() != NetworkStatus::WIFI_CONNECT)) {
874 LOGE("network status abnormal, abort upload");
875 SetErrorCodeMask(ErrorType::WIFI_UNAVAILABLE);
876 return;
877 }
878
879 /* upload */
880 auto callback = AsyncCallback(&DataSyncer::OnModifyMdirtyRecords);
881 if (callback == nullptr) {
882 LOGE("wrap on modify records fail");
883 return;
884 }
885 ret = sdkHelper_->ModifyRecords(context, records, callback);
886 if (ret != E_OK) {
887 LOGE("sdk modify records err %{public}d", ret);
888 }
889 }
890
ModifyFdirtyRecords(shared_ptr<TaskContext> context)891 void DataSyncer::ModifyFdirtyRecords(shared_ptr<TaskContext> context)
892 {
893 LOGI("%{private}d %{private}s modifies records", userId_, bundleName_.c_str());
894
895 auto handler = context->GetHandler();
896 if (handler == nullptr) {
897 LOGE("context get handler err");
898 return;
899 }
900
901 /* query local */
902 vector<DKRecord> records;
903 int32_t ret = handler->GetFileModifiedRecords(records);
904 if (ret != E_OK) {
905 LOGE("handler get modified records err %{public}d", ret);
906 return;
907 }
908
909 /* no need upload */
910 if (records.size() == 0) {
911 return;
912 }
913
914 if ((NetworkStatus::GetNetConnStatus() != NetworkStatus::WIFI_CONNECT)) {
915 LOGE("network status abnormal, abort upload");
916 SetErrorCodeMask(ErrorType::WIFI_UNAVAILABLE);
917 return;
918 }
919
920 /* upload */
921 auto callback = AsyncCallback(&DataSyncer::OnModifyFdirtyRecords);
922 if (callback == nullptr) {
923 LOGE("wrap on modify records fail");
924 return;
925 }
926 ret = sdkHelper_->ModifyRecords(context, records, callback);
927 if (ret != E_OK) {
928 LOGE("sdk modify records err %{public}d", ret);
929 }
930 }
931
OnCreateRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> map,const DKError & err)932 void DataSyncer::OnCreateRecords(shared_ptr<DKContext> context,
933 shared_ptr<const DKDatabase> database,
934 shared_ptr<const map<DKRecordId, DKRecordOperResult>> map, const DKError &err)
935 {
936 LOGI("%{private}d %{private}s on create records %{public}zu", userId_,
937 bundleName_.c_str(), map->size());
938
939 auto ctx = static_pointer_cast<TaskContext>(context);
940
941 /* update local */
942 auto handler = ctx->GetHandler();
943 int32_t ret = handler->OnCreateRecords(*map);
944 if (ret != E_OK) {
945 LOGE("handler on create records err %{public}d", ret);
946 UpdateErrorCode(ret);
947 return;
948 } else {
949 this_thread::sleep_for(chrono::seconds(1));
950 SyncStateChangedNotify(CloudSyncState::UPLOADING, ErrorType::NO_ERROR);
951 isDataChanged_ = true;
952 }
953
954 /* push more */
955 ret = AsyncRun(ctx, &DataSyncer::CreateRecords);
956 if (ret != E_OK) {
957 LOGE("async run create records err %{public}d", ret);
958 return;
959 }
960 }
961
OnDeleteRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> map,const DKError & err)962 void DataSyncer::OnDeleteRecords(shared_ptr<DKContext> context,
963 shared_ptr<const DKDatabase> database,
964 shared_ptr<const map<DKRecordId, DKRecordOperResult>> map, const DKError &err)
965 {
966 LOGI("%{private}d %{private}s on delete records %{public}zu", userId_,
967 bundleName_.c_str(), map->size());
968
969 auto ctx = static_pointer_cast<TaskContext>(context);
970
971 /* update local */
972 auto handler = ctx->GetHandler();
973 int32_t ret = handler->OnDeleteRecords(*map);
974 if (ret != E_OK) {
975 LOGE("handler on delete records err %{public}d", ret);
976 UpdateErrorCode(ret);
977 return;
978 } else {
979 isDataChanged_ = true;
980 }
981
982 /* push more */
983 ret = AsyncRun(ctx, &DataSyncer::DeleteRecords);
984 if (ret != E_OK) {
985 LOGE("async run delete records err %{public}d", ret);
986 return;
987 }
988 }
989
OnModifyMdirtyRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> saveMap,shared_ptr<const map<DKRecordId,DKRecordOperResult>> deleteMap,const DKError & err)990 void DataSyncer::OnModifyMdirtyRecords(shared_ptr<DKContext> context,
991 shared_ptr<const DKDatabase> database,
992 shared_ptr<const map<DKRecordId, DKRecordOperResult>> saveMap,
993 shared_ptr<const map<DKRecordId, DKRecordOperResult>> deleteMap,
994 const DKError &err)
995 {
996 LOGI("%{private}d %{private}s on modify mdirty records %{public}zu", userId_,
997 bundleName_.c_str(), saveMap->size());
998
999 auto ctx = static_pointer_cast<TaskContext>(context);
1000
1001 /* update local */
1002 auto handler = ctx->GetHandler();
1003 int32_t ret = handler->OnModifyMdirtyRecords(*saveMap);
1004 if (ret != E_OK) {
1005 LOGE("handler on modify records err %{public}d", ret);
1006 UpdateErrorCode(ret);
1007 return;
1008 } else {
1009 isDataChanged_ = true;
1010 }
1011
1012 /* push more */
1013 ret = AsyncRun(ctx, &DataSyncer::ModifyMdirtyRecords);
1014 if (ret != E_OK) {
1015 LOGE("async run modify records err %{public}d", ret);
1016 return;
1017 }
1018 }
1019
OnModifyFdirtyRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> saveMap,shared_ptr<const map<DKRecordId,DKRecordOperResult>> deleteMap,const DKError & err)1020 void DataSyncer::OnModifyFdirtyRecords(shared_ptr<DKContext> context,
1021 shared_ptr<const DKDatabase> database,
1022 shared_ptr<const map<DKRecordId, DKRecordOperResult>> saveMap,
1023 shared_ptr<const map<DKRecordId, DKRecordOperResult>> deleteMap,
1024 const DKError &err)
1025 {
1026 LOGI("%{private}d %{private}s on modify fdirty records %{public}zu", userId_,
1027 bundleName_.c_str(), saveMap->size());
1028
1029 auto ctx = static_pointer_cast<TaskContext>(context);
1030
1031 /* update local */
1032 auto handler = ctx->GetHandler();
1033 int32_t ret = handler->OnModifyFdirtyRecords(*saveMap);
1034 if (ret != E_OK) {
1035 LOGE("handler on modify records err %{public}d", ret);
1036 UpdateErrorCode(ret);
1037 return;
1038 } else {
1039 isDataChanged_ = true;
1040 }
1041
1042 /* push more */
1043 ret = AsyncRun(ctx, &DataSyncer::ModifyFdirtyRecords);
1044 if (ret != E_OK) {
1045 LOGE("async run modify records err %{public}d", ret);
1046 return;
1047 }
1048 }
1049
BeginTransaction()1050 void DataSyncer::BeginTransaction()
1051 {
1052 taskRunner_->CommitDummyTask();
1053 }
1054
EndTransaction()1055 void DataSyncer::EndTransaction()
1056 {
1057 taskRunner_->CompleteDummyTask();
1058 }
1059
GetBundleName() const1060 std::string DataSyncer::GetBundleName() const
1061 {
1062 return bundleName_;
1063 }
1064
GetUserId() const1065 int32_t DataSyncer::GetUserId() const
1066 {
1067 return userId_;
1068 }
1069
GetSyncState() const1070 SyncState DataSyncer::GetSyncState() const
1071 {
1072 return syncStateManager_.GetSyncState();
1073 }
1074
CompletePull()1075 int32_t DataSyncer::CompletePull()
1076 {
1077 LOGI("%{private}d %{private}s completes pull", userId_, bundleName_.c_str());
1078 /* call syncer manager callback */
1079 auto error = GetErrorType();
1080 if (error) {
1081 LOGE("pull failed, errorType:%{public}d", error);
1082 SyncStateChangedNotify(CloudSyncState::DOWNLOAD_FAILED, error);
1083 Complete(false);
1084 } else {
1085 /* schedule to next stage */
1086 Schedule();
1087 }
1088 return E_OK;
1089 }
1090
CompletePush()1091 int32_t DataSyncer::CompletePush()
1092 {
1093 LOGI("%{private}d %{public}s completes push", userId_, bundleName_.c_str());
1094 /* call syncer manager callback */
1095 auto error = GetErrorType();
1096 if (error) {
1097 LOGE("pull failed, errorType:%{public}d", error);
1098 SyncStateChangedNotify(CloudSyncState::UPLOAD_FAILED, error);
1099 Complete(false);
1100 } else {
1101 /* schedule to next stage */
1102 Schedule();
1103 }
1104 return E_OK;
1105 }
1106
CompleteAll(bool isNeedNotify)1107 void DataSyncer::CompleteAll(bool isNeedNotify)
1108 {
1109 LOGI("%{private}d %{private}s completes all", userId_, bundleName_.c_str());
1110
1111 /* guarantee: unlock if locked */
1112 ForceUnlock();
1113
1114 /* reset internal status */
1115 Reset();
1116
1117 SyncState syncState = SyncState::SYNC_SUCCEED;
1118 if (errorCode_ != E_OK) {
1119 syncState = SyncState::SYNC_FAILED;
1120 }
1121
1122 /* sys event report and free */
1123 ReportSysEvent(errorCode_);
1124 FreeSysEventData();
1125
1126 CloudSyncState notifyState = CloudSyncState::COMPLETED;
1127 if (syncStateManager_.GetStopSyncFlag()) {
1128 notifyState = CloudSyncState::STOPPED;
1129 } else {
1130 SaveSubscription();
1131 }
1132
1133 auto nextAction = syncStateManager_.UpdateSyncState(syncState);
1134 DataSyncerRdbStore::GetInstance().UpdateSyncState(userId_, bundleName_, syncState);
1135
1136 if (nextAction == Action::START) {
1137 /* Retrigger sync, clear errorcode */
1138 errorCode_ = E_OK;
1139 StartSync(false, SyncTriggerType::PENDING_TRIGGER);
1140 return;
1141 } else if (nextAction == Action::FORCE_START) {
1142 /* Retrigger sync, clear errorcode */
1143 errorCode_ = E_OK;
1144 StartSync(true, SyncTriggerType::PENDING_TRIGGER);
1145 return;
1146 } else if (nextAction == Action::CHECK) {
1147 errorCode_ = E_OK;
1148 StartSync(false, SyncTriggerType::TASK_TRIGGER);
1149 return;
1150 } else {
1151 TaskStateManager::GetInstance().CompleteTask(bundleName_, TaskType::SYNC_TASK);
1152 }
1153
1154 /* notify sync state */
1155 if (isNeedNotify) {
1156 SyncStateChangedNotify(notifyState, ErrorType::NO_ERROR);
1157 }
1158 /* clear errorcode */
1159 errorCode_ = E_OK;
1160 }
1161
BeginClean()1162 void DataSyncer::BeginClean()
1163 {
1164 *stopFlag_ = true;
1165 /* stop all the tasks and wait for tasks' termination */
1166 taskRunner_->ReleaseTask();
1167 TaskStateManager::GetInstance().StartTask(bundleName_, TaskType::CLEAN_TASK);
1168 /* set cleaning state */
1169 (void)syncStateManager_.SetCleaningFlag();
1170 }
1171
CompleteClean()1172 void DataSyncer::CompleteClean()
1173 {
1174 DeleteSubscription();
1175 DataSyncerRdbStore::GetInstance().UpdateSyncState(userId_, bundleName_, SyncState::CLEAN_SUCCEED);
1176 TaskStateManager::GetInstance().CompleteTask(bundleName_, TaskType::CLEAN_TASK);
1177 auto nextAction = syncStateManager_.UpdateSyncState(SyncState::CLEAN_SUCCEED);
1178 if (nextAction != Action::STOP) {
1179 /* Retrigger sync, clear errorcode */
1180 if (!CloudStatus::IsCloudStatusOkay(bundleName_, userId_)) {
1181 LOGE("cloud status is not OK");
1182 return;
1183 }
1184 StartSync(false, SyncTriggerType::PENDING_TRIGGER);
1185 }
1186 *stopFlag_ = false;
1187 }
1188
BeginDisableCloud()1189 void DataSyncer::BeginDisableCloud()
1190 {
1191 /* stop all the tasks and wait for tasks' termination */
1192 TaskStateManager::GetInstance().StartTask(bundleName_, TaskType::DISABLE_CLOUD_TASK);
1193 /* set disable cloud state */
1194 (void)syncStateManager_.SetDisableCloudFlag();
1195 }
1196
CompleteDisableCloud()1197 void DataSyncer::CompleteDisableCloud()
1198 {
1199 DataSyncerRdbStore::GetInstance().UpdateSyncState(userId_, bundleName_, SyncState::DISABLE_CLOUD_SUCCEED);
1200 TaskStateManager::GetInstance().CompleteTask(bundleName_, TaskType::DISABLE_CLOUD_TASK);
1201 auto nextAction = syncStateManager_.UpdateSyncState(SyncState::DISABLE_CLOUD_SUCCEED);
1202 if (nextAction != Action::STOP) {
1203 /* Retrigger sync, clear errorcode */
1204 if (!CloudStatus::IsCloudStatusOkay(bundleName_, userId_)) {
1205 LOGE("cloud status is not OK");
1206 return;
1207 }
1208 StartSync(false, SyncTriggerType::PENDING_TRIGGER);
1209 }
1210 }
1211
SyncStateChangedNotify(const CloudSyncState state,const ErrorType error)1212 void DataSyncer::SyncStateChangedNotify(const CloudSyncState state, const ErrorType error)
1213 {
1214 CurrentSyncState_ = state;
1215 CurrentErrorType_ = error;
1216 CloudSyncCallbackManager::GetInstance().NotifySyncStateChanged(bundleName_, state, error);
1217 }
1218
NotifyCurrentSyncState()1219 void DataSyncer::NotifyCurrentSyncState()
1220 {
1221 CloudSyncCallbackManager::GetInstance().NotifySyncStateChanged(bundleName_, CurrentSyncState_, CurrentErrorType_);
1222 }
1223
UpdateErrorCode(int32_t code)1224 void DataSyncer::UpdateErrorCode(int32_t code)
1225 {
1226 switch (code) {
1227 case E_SYNC_FAILED_NETWORK_NOT_AVAILABLE:
1228 SetErrorCodeMask(ErrorType::NETWORK_UNAVAILABLE);
1229 break;
1230 case E_LOCAL_STORAGE_FULL:
1231 SetErrorCodeMask(ErrorType::LOCAL_STORAGE_FULL);
1232 break;
1233 case E_CLOUD_STORAGE_FULL:
1234 SetErrorCodeMask(ErrorType::CLOUD_STORAGE_FULL);
1235 break;
1236 case E_SYNC_FAILED_BATTERY_LOW:
1237 SetErrorCodeMask(ErrorType::BATTERY_LEVEL_LOW);
1238 break;
1239 default:
1240 LOGI("ignore errcode, errcode: %{public}d", code);
1241 break;
1242 }
1243 }
1244
SetErrorCodeMask(ErrorType errorType)1245 void DataSyncer::SetErrorCodeMask(ErrorType errorType)
1246 {
1247 errorCode_ |= 1 << static_cast<uint32_t>(errorType);
1248 }
1249
GetErrorType()1250 ErrorType DataSyncer::GetErrorType()
1251 {
1252 if (errorCode_ == E_OK) {
1253 return ErrorType::NO_ERROR;
1254 }
1255
1256 std::vector<ErrorType> errorTypes = {NETWORK_UNAVAILABLE, WIFI_UNAVAILABLE, BATTERY_LEVEL_WARNING,
1257 BATTERY_LEVEL_LOW, CLOUD_STORAGE_FULL, LOCAL_STORAGE_FULL};
1258 for (const auto &errorType : errorTypes) {
1259 if (errorCode_ & (1 << static_cast<uint32_t>(errorType))) {
1260 return errorType;
1261 }
1262 }
1263 LOGE("errorcode unexpected, errcode: %{public}u", errorCode_);
1264 return ErrorType::NO_ERROR;
1265 }
1266
SaveSubscription()1267 void DataSyncer::SaveSubscription()
1268 {
1269 sdkHelper_->SaveSubscription([] (auto, shared_ptr<DriveKit::DKContainer>,
1270 DriveKit::DKSubscriptionResult & res) {
1271 if (!res.IsSuccess()) {
1272 auto err = res.GetDKError();
1273 LOGE("drivekit save subscription server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
1274 err.dkErrorCode);
1275 }
1276 });
1277 }
1278
DeleteSubscription()1279 void DataSyncer::DeleteSubscription()
1280 {
1281 if (!HasSdkHelper()) {
1282 LOGW(" sdk helper is null skip delete subscription");
1283 return;
1284 }
1285
1286 sdkHelper_->DeleteSubscription([] (auto, DriveKit::DKError err) {
1287 if (err.HasError()) {
1288 LOGE("drivekit delete subscription server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
1289 err.dkErrorCode);
1290 }
1291 });
1292 }
1293
OptimizeStorage(const int32_t agingDays)1294 int32_t DataSyncer::OptimizeStorage(const int32_t agingDays)
1295 {
1296 return E_OK;
1297 }
1298
HasSdkHelper()1299 bool DataSyncer::HasSdkHelper()
1300 {
1301 if (sdkHelper_ == nullptr) {
1302 return false;
1303 }
1304 return true;
1305 }
1306
1307 // download the thumb and lcd of file when screenoff and charging
DownloadThumb(int32_t type)1308 int32_t DataSyncer::DownloadThumb(int32_t type)
1309 {
1310 return E_OK;
1311 }
1312
DownloadThumbInner(std::shared_ptr<DataHandler> handler)1313 int32_t DataSyncer::DownloadThumbInner(std::shared_ptr<DataHandler> handler)
1314 {
1315 if (syncStateManager_.GetSyncState() == SyncState::CLEANING ||
1316 syncStateManager_.GetSyncState() == SyncState::DISABLE_CLOUD) {
1317 LOGI("downloading or cleaning, not to trigger thumb downloading");
1318 return E_STOP;
1319 }
1320
1321 if (!TaskStateManager::GetInstance().HasTask(bundleName_, TaskType::DOWNLOAD_THUMB_TASK)) {
1322 LOGI("stop download thumb");
1323 return E_STOP;
1324 }
1325 vector<DriveKit::DKDownloadAsset> assetsToDownload;
1326 int32_t ret = handler->GetThumbToDownload(assetsToDownload);
1327 if (ret != E_OK) {
1328 LOGE("get assetsToDownload err %{public}d", ret);
1329 return E_STOP;
1330 }
1331 if (assetsToDownload.empty()) {
1332 return E_STOP;
1333 }
1334
1335 LOGI("assetsToDownload count: %{public}zu", assetsToDownload.size());
1336 auto ctx = std::make_shared<TaskContext>(handler);
1337 auto callback = [this] (shared_ptr<DKContext> context,
1338 shared_ptr<const DKDatabase> database,
1339 const map<DKDownloadAsset, DKDownloadResult> &resultMap,
1340 const DKError &err) {
1341 DataSyncer::FetchThumbDownloadCallback(context, database, resultMap, err);
1342 };
1343 DownloadContext dctx = {.context = ctx,
1344 .assets = assetsToDownload,
1345 .id = 0,
1346 .resultCallback = callback,
1347 .progressCallback = FetchRecordsDownloadProgress};
1348 std::thread([this, dctx]() { this->DownloadThumbAssets(dctx); }).detach();
1349 return E_OK;
1350 }
1351
FetchThumbDownloadCallback(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,const map<DKDownloadAsset,DKDownloadResult> & resultMap,const DKError & err)1352 void DataSyncer::FetchThumbDownloadCallback(shared_ptr<DKContext> context,
1353 shared_ptr<const DKDatabase> database,
1354 const map<DKDownloadAsset, DKDownloadResult> &resultMap,
1355 const DKError &err)
1356 {
1357 auto ctx = static_pointer_cast<TaskContext>(context);
1358 auto handler = ctx->GetHandler();
1359 handler->OnDownloadAssets(resultMap);
1360 if (err.HasError()) {
1361 LOGE("DKAssetsDownloader err, localErr: %{public}d, serverErr: %{public}d", static_cast<int>(err.dkErrorCode),
1362 err.serverErrorCode);
1363 StopDownloadThumb();
1364 return;
1365 }
1366 LOGI("DKAssetsDownloader ok");
1367 if (handler->GetDownloadType() == DataHandler::DownloadThmType::SCREENOFF_TRIGGER) {
1368 if (!CheckScreenAndWifi()) {
1369 LOGI("download thumb condition is not met");
1370 StopDownloadThumb();
1371 return;
1372 }
1373 }
1374 if (DownloadThumbInner(handler) == E_STOP) {
1375 StopDownloadThumb();
1376 }
1377 }
1378
CheckScreenAndWifi()1379 bool DataSyncer::CheckScreenAndWifi()
1380 {
1381 if (NetworkStatus::GetNetConnStatus() == NetworkStatus::WIFI_CONNECT) {
1382 return true;
1383 }
1384 return false;
1385 }
1386
CleanCache(const string & uri)1387 int32_t DataSyncer::CleanCache(const string &uri)
1388 {
1389 return E_OK;
1390 }
1391
StopDownloadThumb()1392 void DataSyncer::StopDownloadThumb()
1393 {
1394 return;
1395 }
1396
StopUploadAssets()1397 void DataSyncer::StopUploadAssets()
1398 {
1399 sdkHelper_->Release();
1400 }
1401
InitSysEventData()1402 int32_t DataSyncer::InitSysEventData()
1403 {
1404 return E_OK;
1405 }
1406
FreeSysEventData()1407 void DataSyncer::FreeSysEventData()
1408 {
1409 }
1410
ReportSysEvent(uint32_t code)1411 void DataSyncer::ReportSysEvent(uint32_t code)
1412 {
1413 }
1414
SetFullSyncSysEvent()1415 void DataSyncer::SetFullSyncSysEvent()
1416 {
1417 }
1418
SetCheckSysEvent()1419 void DataSyncer::SetCheckSysEvent()
1420 {
1421 }
1422 } // namespace CloudSync
1423 } // namespace FileManagement
1424 } // namespace OHOS
1425