• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "data_syncer.h"
17 
18 #include <functional>
19 
20 #include "data_sync_const.h"
21 #include "data_sync_notifier.h"
22 #include "data_syncer_rdb_store.h"
23 #include "dfs_error.h"
24 #include "ipc/cloud_sync_callback_manager.h"
25 #include "sdk_helper.h"
26 #include "sync_rule/battery_status.h"
27 #include "sync_rule/network_status.h"
28 #include "sync_rule/screen_status.h"
29 #include "sync_rule/cloud_status.h"
30 #include "task_state_manager.h"
31 #include "utils_log.h"
32 
33 namespace OHOS {
34 namespace FileManagement {
35 namespace CloudSync {
36 using namespace std;
37 using namespace placeholders;
38 using namespace DriveKit;
39 using ChangeType = AAFwk::ChangeInfo::ChangeType;
40 
DataSyncer(const std::string bundleName,const int32_t userId)41 DataSyncer::DataSyncer(const std::string bundleName, const int32_t userId)
42     : bundleName_(bundleName), userId_(userId)
43 {
44     /* alloc task runner */
45     taskRunner_ = DelayedSingleton<TaskManager>::GetInstance()->AllocRunner(userId,
46         bundleName, bind(&DataSyncer::Schedule, this));
47     taskRunner_->SetStopFlag(stopFlag_);
48     downloadCallbackMgr_.SetBundleName(bundleName);
49 }
50 
~DataSyncer()51 DataSyncer::~DataSyncer()
52 {
53     /* release task runner */
54     DelayedSingleton<TaskManager>::GetInstance()->ReleaseRunner(userId_, bundleName_);
55 }
56 
AsyncRun(std::shared_ptr<TaskContext> context,void (DataSyncer::* f)(std::shared_ptr<TaskContext>))57 int32_t DataSyncer::AsyncRun(std::shared_ptr<TaskContext> context,
58     void(DataSyncer::*f)(std::shared_ptr<TaskContext>))
59 {
60     return taskRunner_->AsyncRun<DataSyncer>(context, f, this);
61 }
62 
63 template<typename T, typename RET, typename... ARGS>
AsyncCallback(RET (T::* f)(ARGS...))64 function<RET(ARGS...)> DataSyncer::AsyncCallback(RET(T::*f)(ARGS...))
65 {
66     return taskRunner_->AsyncCallback<DataSyncer>(f, this);
67 }
68 
StartSync(bool forceFlag,SyncTriggerType triggerType)69 int32_t DataSyncer::StartSync(bool forceFlag, SyncTriggerType triggerType)
70 {
71     LOGI("%{private}d %{public}s starts sync, isforceSync %{public}d, triggerType %{public}d",
72         userId_, bundleName_.c_str(), forceFlag, triggerType);
73 
74     triggerType_ = triggerType;
75     startTime_ = GetCurrentTimeStamp();
76 
77     /* only one specific data sycner running at a time */
78     if (syncStateManager_.CheckAndSetPending(forceFlag, triggerType)) {
79         LOGI("syncing, pending sync");
80         return E_PENDING;
81     }
82 
83     int32_t ret = InitSysEventData();
84     if (ret != E_OK) {
85         return E_DATA;
86     }
87 
88     TaskStateManager::GetInstance().StartTask(bundleName_, TaskType::SYNC_TASK);
89     /* start data sync */
90     ScheduleByType(triggerType);
91 
92     return E_OK;
93 }
94 
StopSync(SyncTriggerType triggerType)95 int32_t DataSyncer::StopSync(SyncTriggerType triggerType)
96 {
97     LOGI("%{private}d %{public}s stops sync, trigger stop sync, type:%{public}d",
98         userId_, bundleName_.c_str(), triggerType);
99     syncStateManager_.SetStopSyncFlag();
100     StopUploadAssets();
101     Abort();
102     return E_OK;
103 }
104 
Lock()105 int32_t DataSyncer::Lock()
106 {
107     return E_OK;
108 }
109 
Unlock()110 void DataSyncer::Unlock() {}
111 
ForceUnlock()112 void DataSyncer::ForceUnlock() {}
113 
StartDownloadFile(const std::string path,const int32_t userId)114 int32_t DataSyncer::StartDownloadFile(const std::string path, const int32_t userId)
115 {
116     return E_OK;
117 }
118 
StopDownloadFile(const std::string path,const int32_t userId)119 int32_t DataSyncer::StopDownloadFile(const std::string path, const int32_t userId)
120 {
121     DownloadProgressObj download;
122     if (downloadCallbackMgr_.StopDonwload(path, userId, download)) {
123         int64_t downloadId = download.downloadId;
124         int32_t res = sdkHelper_->CancelDownloadAssets(downloadId);
125         LOGD("CancelDownloadAssets result: %d", res);
126         if (res != NO_ERROR) {
127             LOGE("CancelDownloadAssets fail %{public}d", res);
128             return res;
129         }
130         downloadCallbackMgr_.NotifyProcessStop(download);
131     }
132     return E_OK;
133 }
134 
135 
RegisterDownloadFileCallback(const int32_t userId,const sptr<ICloudDownloadCallback> downloadCallback)136 int32_t DataSyncer::RegisterDownloadFileCallback(const int32_t userId,
137                                                  const sptr<ICloudDownloadCallback> downloadCallback)
138 {
139     downloadCallbackMgr_.RegisterCallback(userId, downloadCallback);
140     return E_OK;
141 }
142 
UnregisterDownloadFileCallback(const int32_t userId)143 int32_t DataSyncer::UnregisterDownloadFileCallback(const int32_t userId)
144 {
145     downloadCallbackMgr_.UnregisterCallback(userId);
146     return E_OK;
147 }
148 
Abort()149 void DataSyncer::Abort()
150 {
151     LOGI("%{private}d %{private}s aborts", userId_, bundleName_.c_str());
152     thread ([this]() {
153         taskRunner_->ReleaseTask();
154         /* call the syncer manager's callback for notification */
155         Complete();
156     }).detach();
157 }
158 
SetSdkHelper(shared_ptr<SdkHelper> & sdkHelper)159 void DataSyncer::SetSdkHelper(shared_ptr<SdkHelper> &sdkHelper)
160 {
161     sdkHelper_ = sdkHelper;
162 }
163 
Pull(shared_ptr<DataHandler> handler)164 int32_t DataSyncer::Pull(shared_ptr<DataHandler> handler)
165 {
166     LOGI("%{private}d %{private}s pull", userId_, bundleName_.c_str());
167 
168     shared_ptr<TaskContext> context = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
169 
170     RetryDownloadRecords(context);
171     vector<DKRecordId> records;
172     int32_t ret = handler->GetRetryRecords(records);
173     if (ret == E_OK) {
174         DataSyncer::PullRecordsWithId(context, records, true);
175     }
176 
177     /* Full synchronization and incremental synchronization */
178     if (handler->IsPullRecords()) {
179         SetFullSyncSysEvent();
180         if (handler->GetCheckFlag()) {
181             SetCheckSysEvent();
182         }
183         ret = AsyncRun(context, &DataSyncer::PullRecords);
184     } else {
185         ret = AsyncRun(context, &DataSyncer::PullDatabaseChanges);
186     }
187     if (ret != E_OK) {
188         LOGE("asyn run pull records err %{public}d", ret);
189         return ret;
190     }
191     return E_OK;
192 }
193 
194 /*
195  * Although records from the cloud should be all pulled down before
196  * uploading the local change, conflicts might be rare in most cases,
197  * and the syncer would just move on.
198  */
PullRecords(shared_ptr<TaskContext> context)199 void DataSyncer::PullRecords(shared_ptr<TaskContext> context)
200 {
201     LOGI("%{private}d %{private}s pull records", userId_, bundleName_.c_str());
202 
203     /* get query condition here */
204     auto handler = context->GetHandler();
205     if (handler == nullptr) {
206         LOGE("context get handler err");
207         return;
208     }
209 
210     FetchCondition cond;
211     handler->GetFetchCondition(cond);
212 
213     DKQueryCursor tempStartCursor;
214     handler->GetTempStartCursor(tempStartCursor);
215     if (tempStartCursor.empty()) {
216         sdkHelper_->GetStartCursor(cond.recordType, tempStartCursor);
217         handler->SetTempStartCursor(tempStartCursor);
218     }
219 
220     DKQueryCursor nextCursor;
221     handler->GetNextCursor(nextCursor);
222 
223     SdkHelper::FetchRecordsCallback callback = nullptr;
224     if (handler->GetCheckFlag()) {
225         callback = AsyncCallback(&DataSyncer::OnFetchCheckRecords);
226     } else {
227         callback = AsyncCallback(&DataSyncer::OnFetchRecords);
228     }
229     if (callback == nullptr) {
230         LOGE("wrap on fetch records fail");
231         return;
232     }
233 
234     int32_t ret = sdkHelper_->FetchRecords(context, cond, nextCursor, callback);
235     if (ret != E_OK) {
236         LOGE("sdk fetch records err %{public}d", ret);
237     }
238 }
239 
PullDatabaseChanges(shared_ptr<TaskContext> context)240 void DataSyncer::PullDatabaseChanges(shared_ptr<TaskContext> context)
241 {
242     LOGI("%{private}d %{private}s pull database changes", userId_, bundleName_.c_str());
243 
244     auto callback = AsyncCallback(&DataSyncer::OnFetchDatabaseChanges);
245     if (callback == nullptr) {
246         LOGE("wrap on fetch records fail");
247         return;
248     }
249 
250     auto handler = context->GetHandler();
251     if (handler == nullptr) {
252         LOGE("context get handler err");
253         return;
254     }
255     DKQueryCursor nextCursor;
256     handler->GetNextCursor(nextCursor);
257 
258     FetchCondition cond;
259     handler->GetFetchCondition(cond);
260     int32_t ret = sdkHelper_->FetchDatabaseChanges(context, cond, nextCursor, callback);
261     if (ret != E_OK) {
262         LOGE("sdk fetch records err %{public}d", ret);
263     }
264 }
265 
266 struct DownloadContext {
267     std::shared_ptr<DKContext> context;
268     vector<DKDownloadAsset> assets;
269     DriveKit::DKDownloadId id;
270     std::function<void(std::shared_ptr<DriveKit::DKContext>,
271                        std::shared_ptr<const DriveKit::DKDatabase>,
272                        const std::map<DriveKit::DKDownloadAsset, DriveKit::DKDownloadResult> &,
273                        const DriveKit::DKError &)>
274         resultCallback;
275     std::function<
276         void(std::shared_ptr<DriveKit::DKContext>, DriveKit::DKDownloadAsset, TotalSize, DriveKit::DownloadSize)>
277         progressCallback;
278 };
279 
DownloadAssets(DownloadContext & ctx)280 void DataSyncer::DownloadAssets(DownloadContext &ctx)
281 {
282     if (ctx.resultCallback == nullptr) {
283         LOGE("resultCallback nullptr");
284         return;
285     }
286 
287     if (ctx.progressCallback == nullptr) {
288         LOGE("progressCallback nullptr");
289         return;
290     }
291 
292     if (ctx.assets.size() == 0) {
293         LOGE("no assets to download");
294         return;
295     }
296 
297     int32_t ret = sdkHelper_->DownloadAssets(ctx.context, ctx.assets, {}, ctx.id,
298         ctx.resultCallback, ctx.progressCallback);
299     if (ret != E_OK) {
300         LOGE("sdk download assets error %{public}d", ret);
301         /*
302          * 1. remove previous added tasks
303          * 2. invoke callback here to do some statistics in handler
304          * 3. set assets back to handler for its info
305          */
306         shared_ptr<TaskContext> tctx = static_pointer_cast<TaskContext>(ctx.context);
307         tctx->SetAssets(ctx.assets);
308         ctx.resultCallback(ctx.context, nullptr, {}, {});
309     }
310 }
311 
DownloadThumbAssets(DownloadContext ctx)312 void DataSyncer::DownloadThumbAssets(DownloadContext ctx)
313 {
314     if (ctx.resultCallback == nullptr) {
315         LOGE("resultCallback nullptr");
316         return;
317     }
318 
319     if (ctx.progressCallback == nullptr) {
320         LOGE("progressCallback nullptr");
321         return;
322     }
323 
324     if (ctx.assets.size() == 0) {
325         LOGE("no assets to download");
326         return;
327     }
328 
329     int32_t ret = sdkHelper_->DownloadAssets(ctx.context, ctx.assets, {}, ctx.id,
330         ctx.resultCallback, ctx.progressCallback);
331     if (ret != E_OK) {
332         LOGE("sdk download assets error %{public}d", ret);
333         shared_ptr<TaskContext> tctx = static_pointer_cast<TaskContext>(ctx.context);
334         tctx->SetAssets(ctx.assets);
335         ctx.resultCallback(ctx.context, nullptr, {}, {});
336     }
337 }
338 
FetchRecordsDownloadCallback(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,const map<DKDownloadAsset,DKDownloadResult> & resultMap,const DKError & err)339 void DataSyncer::FetchRecordsDownloadCallback(shared_ptr<DKContext> context,
340                                               shared_ptr<const DKDatabase> database,
341                                               const map<DKDownloadAsset, DKDownloadResult> &resultMap,
342                                               const DKError &err)
343 {
344     if (err.HasError()) {
345         LOGE("DKAssetsDownloader err, localErr: %{public}d, serverErr: %{public}d", static_cast<int>(err.dkErrorCode),
346              err.serverErrorCode);
347     } else {
348         LOGI("DKAssetsDownloader ok");
349     }
350 
351     auto ctx = static_pointer_cast<TaskContext>(context);
352     auto handler = ctx->GetHandler();
353     if (resultMap.size() != 0) {
354         handler->OnDownloadAssets(resultMap);
355     } else {
356         /* error account */
357         (void)handler->OnDownloadAssetsFailure(ctx->GetAssets());
358     }
359 }
360 
FetchRecordsDownloadProgress(shared_ptr<DKContext> context,DKDownloadAsset asset,TotalSize total,DownloadSize download)361 static void FetchRecordsDownloadProgress(shared_ptr<DKContext> context,
362                                          DKDownloadAsset asset,
363                                          TotalSize total,
364                                          DownloadSize download)
365 {
366     LOGD("record %s %{public}s download progress", asset.recordId.c_str(), asset.fieldKey.c_str());
367     if (total == download) {
368         auto ctx = static_pointer_cast<TaskContext>(context);
369         auto handler = ctx->GetHandler();
370         handler->OnDownloadAssets(asset);
371     }
372     /* account error in result callback but not here */
373 }
374 
HandleOnFetchRecords(const std::shared_ptr<DownloadTaskContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DKRecord>> records,bool checkOrRetry)375 int DataSyncer::HandleOnFetchRecords(const std::shared_ptr<DownloadTaskContext> context,
376     std::shared_ptr<const DKDatabase> database, std::shared_ptr<std::vector<DKRecord>> records, bool checkOrRetry)
377 {
378     if (records->size() == 0) {
379         LOGI("no records to handle");
380         return E_OK;
381     }
382 
383     OnFetchParams onFetchParams;
384     auto ctx = static_pointer_cast<TaskContext>(context);
385     auto handler = ctx->GetHandler();
386     if (handler == nullptr) {
387         LOGE("context get handler err");
388         return E_CONTEXT;
389     }
390 
391     if (handler->IsPullRecords() && !checkOrRetry) {
392         onFetchParams.totalPullCount = context->GetBatchNo() * handler->GetRecordSize();
393     }
394 
395     int32_t ret = handler->OnFetchRecords(records, onFetchParams);
396     if (!onFetchParams.assetsToDownload.empty()) {
397         DownloadContext dctx = {.context = context,
398                                 .assets = onFetchParams.assetsToDownload,
399                                 .id = 0,
400                                 .resultCallback = AsyncCallback(&DataSyncer::FetchRecordsDownloadCallback),
401                                 .progressCallback = FetchRecordsDownloadProgress};
402         DownloadAssets(dctx);
403     }
404 
405     if (ret != E_OK) {
406         LOGE("handler on fetch records err %{public}d", ret);
407     } else {
408         this_thread::sleep_for(chrono::seconds(1));
409         SyncStateChangedNotify(CloudSyncState::DOWNLOADING, ErrorType::NO_ERROR);
410     }
411     if (!checkOrRetry) {
412         handler->FinishPull(context->GetBatchNo());
413     }
414     return ret;
415 }
416 
OnFetchRecords(const std::shared_ptr<DKContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DKRecord>> records,DKQueryCursor nextCursor,const DKError & err)417 void DataSyncer::OnFetchRecords(const std::shared_ptr<DKContext> context, std::shared_ptr<const DKDatabase> database,
418     std::shared_ptr<std::vector<DKRecord>> records, DKQueryCursor nextCursor, const DKError &err)
419 {
420     if (err.HasError()) {
421         LOGE("OnFetchRecords server err %{public}d and dk errcor %{public}d", err.serverErrorCode, err.dkErrorCode);
422         if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
423             SetErrorCodeMask(ErrorType::NETWORK_UNAVAILABLE);
424         }
425         return;
426     }
427 
428     LOGI("%{private}d %{private}s on fetch records", userId_, bundleName_.c_str());
429     auto ctx = static_pointer_cast<DownloadTaskContext>(context);
430     auto handler = ctx->GetHandler();
431     if (handler == nullptr) {
432         LOGE("context get handler err");
433         return;
434     }
435     if (ctx->GetBatchNo() == 0) {
436         handler->SetRecordSize(records->size());
437     }
438     /* pull more */
439     if (nextCursor.empty()) {
440         LOGI("no more records");
441         handler->GetTempStartCursor(nextCursor);
442         handler->SetTempNextCursor(nextCursor, true);
443         if (records->size() == 0) {
444             handler->FinishPull(ctx->GetBatchNo());
445         }
446     } else {
447         handler->SetTempNextCursor(nextCursor, false);
448         shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
449         int ret = AsyncRun(nexCtx, &DataSyncer::PullRecords);
450         if (ret != E_OK) {
451             LOGE("asyn run pull records err %{public}d", ret);
452         }
453     }
454 
455     if (HandleOnFetchRecords(ctx, database, records, false) != E_OK) {
456         LOGE("HandleOnFetchRecords failed");
457     }
458 }
459 
DownloadInner(std::shared_ptr<DataHandler> handler,const std::string path,const int32_t userId)460 int32_t DataSyncer::DownloadInner(std::shared_ptr<DataHandler> handler,
461                                   const std::string path,
462                                   const int32_t userId)
463 {
464     auto ctx = std::make_shared<TaskContext>(handler);
465     std::vector<DKDownloadAsset> assetsToDownload;
466     int32_t ret = handler->GetDownloadAsset(path, assetsToDownload);
467     if (ret != E_OK) {
468         LOGE("handler on fetch records err %{public}d", ret);
469         return ret;
470     }
471 
472     if (downloadCallbackMgr_.FindDownload(path)) {
473         LOGI("download exist %{public}s", path.c_str());
474         return E_OK;
475     }
476 
477     auto downloadResultCallback = [this, path, assetsToDownload, handler](
478                                       std::shared_ptr<DriveKit::DKContext> context,
479                                       std::shared_ptr<const DriveKit::DKDatabase> database,
480                                       const std::map<DriveKit::DKDownloadAsset, DriveKit::DKDownloadResult> &results,
481                                       const DriveKit::DKError &err) {
482         this->downloadCallbackMgr_.OnDownloadedResult(path, assetsToDownload, handler, context, database, results, err);
483     };
484     auto downloadProcessCallback = [this, path](std::shared_ptr<DKContext> context, DKDownloadAsset asset,
485                                                      TotalSize totalSize, DownloadSize downloadSize) {
486         this->downloadCallbackMgr_.OnDownloadProcess(path, context, asset, totalSize, downloadSize);
487     };
488     DownloadContext dctx = {.context = ctx,
489                             .assets = assetsToDownload,
490                             .id = 0,
491                             .resultCallback = downloadResultCallback,
492                             .progressCallback = downloadProcessCallback};
493     DownloadAssets(dctx);
494     downloadCallbackMgr_.StartDonwload(path, userId, dctx.id);
495     return E_OK;
496 }
497 
OnFetchDatabaseChanges(const std::shared_ptr<DKContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DriveKit::DKRecord>> records,DKQueryCursor nextCursor,bool hasMore,const DKError & err)498 void DataSyncer::OnFetchDatabaseChanges(const std::shared_ptr<DKContext> context,
499     std::shared_ptr<const DKDatabase> database,
500     std::shared_ptr<std::vector<DriveKit::DKRecord>> records, DKQueryCursor nextCursor,
501     bool hasMore, const DKError &err)
502 {
503     LOGI("%{private}d %{private}s on fetch database changes", userId_, bundleName_.c_str());
504     auto ctx = static_pointer_cast<DownloadTaskContext>(context);
505 
506     auto handler = ctx->GetHandler();
507     if (handler == nullptr) {
508         LOGE("context get handler err");
509         return;
510     }
511 
512     if (err.HasError()) {
513         LOGE("OnFetchDatabaseChanges server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
514             err.dkErrorCode);
515         if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
516             SetErrorCodeMask(ErrorType::NETWORK_UNAVAILABLE);
517         } else if (!err.errorDetails.empty()) {
518             DKDetailErrorCode detailCode = static_cast<DKDetailErrorCode>(err.errorDetails[0].detailCode);
519             if (detailCode == DKDetailErrorCode::PARAM_INVALID || detailCode == DKDetailErrorCode::CURSOR_EXPIRED) {
520                 handler->SetChecking();
521                 Pull(handler);
522             }
523         }
524         return;
525     }
526 
527     /* pull more */
528     if (!hasMore) {
529         LOGI("no more records");
530         handler->SetTempNextCursor(nextCursor, true);
531         if (records->size() == 0) {
532             handler->FinishPull(ctx->GetBatchNo());
533         }
534     } else {
535         handler->SetTempNextCursor(nextCursor, false);
536         shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
537         int ret = AsyncRun(nexCtx, &DataSyncer::PullDatabaseChanges);
538         if (ret != E_OK) {
539             LOGE("asyn run pull database changes err %{public}d", ret);
540         }
541     }
542 
543     if (HandleOnFetchRecords(ctx, database, records, false) != E_OK) {
544         LOGE("HandleOnFetchRecords failed");
545         return;
546     }
547 }
548 
OnFetchCheckRecords(const shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<std::vector<DKRecord>> records,DKQueryCursor nextCursor,const DKError & err)549 void DataSyncer::OnFetchCheckRecords(const shared_ptr<DKContext> context,
550     shared_ptr<const DKDatabase> database, shared_ptr<std::vector<DKRecord>> records,
551     DKQueryCursor nextCursor, const DKError &err)
552 {
553     if (err.HasError()) {
554         LOGE("OnFetchCheckRecords server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
555              err.dkErrorCode);
556         if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
557             SetErrorCodeMask(ErrorType::NETWORK_UNAVAILABLE);
558         }
559         return;
560     }
561     LOGI("%{private}d %{private}s on fetch records", userId_, bundleName_.c_str());
562     auto ctx = static_pointer_cast<DownloadTaskContext>(context);
563     auto handler = ctx->GetHandler();
564     if (handler == nullptr) {
565         LOGE("context get handler err");
566         return;
567     }
568 
569     int32_t ret = E_OK;
570     /* pull more */
571     if (nextCursor.empty()) {
572         LOGI("no more records");
573         handler->GetTempStartCursor(nextCursor);
574         handler->SetTempNextCursor(nextCursor, true);
575     } else {
576         handler->SetTempNextCursor(nextCursor, false);
577         shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
578         ret = AsyncRun(nexCtx, &DataSyncer::PullRecords);
579         if (ret != E_OK) {
580             LOGE("asyn run pull records err %{public}d", ret);
581         }
582     }
583 
584     std::vector<DriveKit::DKRecordId> checkRecords;
585     ret = handler->GetCheckRecords(checkRecords, records);
586     if (ret != E_OK) {
587         LOGE("handler get check records err %{public}d", ret);
588         return;
589     }
590 
591     DataSyncer::PullRecordsWithId(ctx, checkRecords, false);
592 }
593 
PullRecordsWithId(shared_ptr<TaskContext> context,const std::vector<DriveKit::DKRecordId> & records,bool retry)594 void DataSyncer::PullRecordsWithId(shared_ptr<TaskContext> context, const std::vector<DriveKit::DKRecordId> &records,
595     bool retry)
596 {
597     auto ctx = static_pointer_cast<DownloadTaskContext>(context);
598     auto handler = ctx->GetHandler();
599     if (handler == nullptr) {
600         LOGE("context get handler err");
601         return;
602     }
603 
604     FetchCondition cond;
605     handler->GetFetchCondition(cond);
606     LOGI("retry records count: %{public}u", static_cast<uint32_t>(records.size()));
607     for (auto it : records) {
608         auto callback = AsyncCallback(&DataSyncer::OnFetchRecordWithId);
609         if (callback == nullptr) {
610             LOGE("wrap on fetch records fail");
611             continue;
612         }
613         int32_t ret = sdkHelper_->FetchRecordWithId(context, cond, it, callback);
614         if (ret != E_OK) {
615             LOGE("sdk fetch records err %{public}d", ret);
616         }
617     }
618     if (!retry) {
619         handler->FinishPull(ctx->GetBatchNo());
620     }
621 }
622 
OnFetchRecordWithId(shared_ptr<DKContext> context,shared_ptr<DKDatabase> database,DKRecordId recordId,const DKRecord & record,const DKError & error)623 void DataSyncer::OnFetchRecordWithId(shared_ptr<DKContext> context, shared_ptr<DKDatabase> database,
624     DKRecordId recordId, const DKRecord &record, const DKError &error)
625 {
626     auto records = make_shared<std::vector<DKRecord>>();
627     if (error.HasError()) {
628         LOGE("has error, recordId:%s, dkErrorCode :%{public}d, serverErrorCode:%{public}d", recordId.c_str(),
629              error.dkErrorCode, error.serverErrorCode);
630         LOGI("convert to delete record");
631         DKRecord deleteRecord;
632         deleteRecord.SetRecordId(recordId);
633         deleteRecord.SetDelete(true);
634         records->push_back(deleteRecord);
635     } else {
636         LOGI("handle retry record : %s", record.GetRecordId().c_str());
637         records->push_back(record);
638     }
639     auto ctx = static_pointer_cast<DownloadTaskContext>(context);
640     HandleOnFetchRecords(ctx, database, records, true);
641 }
642 
RetryDownloadRecords(shared_ptr<TaskContext> context)643 void DataSyncer::RetryDownloadRecords(shared_ptr<TaskContext> context)
644 {
645     auto ctx = static_pointer_cast<TaskContext>(context);
646     auto handler = ctx->GetHandler();
647     if (handler == nullptr) {
648         LOGE("context get handler err");
649         return;
650     }
651 
652     vector<DriveKit::DKDownloadAsset> assetsToDownload;
653     int32_t ret = handler->GetAssetsToDownload(assetsToDownload);
654     if (ret != E_OK) {
655         LOGE("get assetsToDownload err %{public}d", ret);
656         return;
657     }
658     if (assetsToDownload.empty()) {
659         return;
660     }
661 
662     LOGI("assetsToDownload count: %{public}zu", assetsToDownload.size());
663     DownloadContext dctx = {.context = ctx,
664                             .assets = assetsToDownload,
665                             .id = 0,
666                             .resultCallback = AsyncCallback(&DataSyncer::FetchRecordsDownloadCallback),
667                             .progressCallback = FetchRecordsDownloadProgress};
668     DownloadAssets(dctx);
669 }
670 
Push(shared_ptr<DataHandler> handler)671 int32_t DataSyncer::Push(shared_ptr<DataHandler> handler)
672 {
673     /*
674      * Although unlikely, if the first callback finds no more records available
675      * and tries to schedule before following tasks commited, the data syncer
676      * will directly schedule to the next stage, while following tasks would be
677      * commited to the next stage mistakenly.
678      * One possible solution: commit dummy task in the beginning and complete
679      * dummy task in the end.
680      */
681     shared_ptr<TaskContext> context = make_shared<TaskContext>(handler);
682 
683     /* commit a dummy task */
684     BeginTransaction();
685 
686     int32_t ret = AsyncRun(context, &DataSyncer::CreateRecords);
687     if (ret != E_OK) {
688         LOGE("async run create records err %{public}d", ret);
689         return ret;
690     }
691 
692     ret = AsyncRun(context, &DataSyncer::DeleteRecords);
693     if (ret != E_OK) {
694         LOGE("async run delete records err %{public}d", ret);
695         return ret;
696     }
697 
698     ret = AsyncRun(context, &DataSyncer::ModifyMdirtyRecords);
699     if (ret != E_OK) {
700         LOGE("async run modify mdirty records err %{public}d", ret);
701         return ret;
702     }
703 
704     ret = AsyncRun(context, &DataSyncer::ModifyFdirtyRecords);
705     if (ret != E_OK) {
706         LOGE("async run modify fdirty records err %{public}d", ret);
707         return ret;
708     }
709 
710     /* complete the dummy task */
711     EndTransaction();
712 
713     return E_OK;
714 }
715 
Init(const std::string bundleName,const int32_t userId)716 int32_t DataSyncer::Init(const std::string bundleName, const int32_t userId)
717 {
718     return E_OK;
719 }
720 
Clean(const int action)721 int32_t DataSyncer::Clean(const int action)
722 {
723     return E_OK;
724 }
725 
DisableCloud()726 int32_t DataSyncer::DisableCloud()
727 {
728     return E_OK;
729 }
730 
CleanInner(std::shared_ptr<DataHandler> handler,const int action)731 int32_t DataSyncer::CleanInner(std::shared_ptr<DataHandler> handler, const int action)
732 {
733     int ret = 0;
734     LOGD("Enter function DataSyncer::CleanInner");
735     handler->ClearCursor();
736     ret = handler->Clean(action);
737     if (ret != E_OK) {
738         LOGE("Clean file failed res:%{public}d", ret);
739         return ret;
740     }
741     return ret;
742 }
743 
CancelDownload(std::shared_ptr<DataHandler> handler)744 int32_t DataSyncer::CancelDownload(std::shared_ptr<DataHandler> handler)
745 {
746     int32_t ret = 0;
747     std::vector<int64_t> downloadIds = downloadCallbackMgr_.StopAllDownloads(userId_);
748     if (!HasSdkHelper()) {
749         LOGW(" sdk helper is null skip cancel download");
750         return ret;
751     }
752     for (const auto &id : downloadIds) {
753         ret = sdkHelper_->CancelDownloadAssets(id);
754         if (ret != NO_ERROR) {
755             LOGE("CancelDownloadAssets fail %{public}d", ret);
756             return ret;
757         }
758     }
759     return ret;
760 }
761 
CreateRecords(shared_ptr<TaskContext> context)762 void DataSyncer::CreateRecords(shared_ptr<TaskContext> context)
763 {
764     LOGI("%{private}d %{private}s creates records", userId_, bundleName_.c_str());
765 
766     auto handler = context->GetHandler();
767     if (handler == nullptr) {
768         LOGE("context get handler err");
769         return;
770     }
771 
772     /* query local */
773     vector<DKRecord> records;
774     int32_t ret = handler->GetCreatedRecords(records);
775     if (ret != E_OK) {
776         LOGE("handler get created records err %{public}d", ret);
777         return;
778     }
779 
780     /* no need upload */
781     if (records.size() == 0) {
782         return;
783     }
784 
785     if (!BatteryStatus::IsAllowUpload(syncStateManager_.GetForceFlag())) {
786         LOGE("battery status abnormal, abort upload");
787         SetErrorCodeMask(ErrorType::BATTERY_LEVEL_LOW);
788         return;
789     }
790 
791     if ((NetworkStatus::GetNetConnStatus() != NetworkStatus::WIFI_CONNECT)) {
792         LOGE("network status abnormal, abort upload");
793         SetErrorCodeMask(ErrorType::WIFI_UNAVAILABLE);
794         return;
795     }
796 
797     /* upload */
798     auto callback = AsyncCallback(&DataSyncer::OnCreateRecords);
799     if (callback == nullptr) {
800         LOGE("wrap on create records fail");
801         return;
802     }
803     ret = sdkHelper_->CreateRecords(context, records, callback);
804     if (ret != E_OK) {
805         LOGE("sdk create records err %{public}d", ret);
806     }
807 }
808 
DeleteRecords(shared_ptr<TaskContext> context)809 void DataSyncer::DeleteRecords(shared_ptr<TaskContext> context)
810 {
811     LOGI("%{private}d %{private}s deletes records", userId_, bundleName_.c_str());
812 
813     auto handler = context->GetHandler();
814     if (handler == nullptr) {
815         LOGE("context get handler err");
816         return;
817     }
818 
819     /* query local */
820     vector<DKRecord> records;
821     int32_t ret = handler->GetDeletedRecords(records);
822     if (ret != E_OK) {
823         LOGE("handler get deleted records err %{public}d", ret);
824         return;
825     }
826 
827     /* no need upload */
828     if (records.size() == 0) {
829         return;
830     }
831 
832     if ((NetworkStatus::GetNetConnStatus() != NetworkStatus::WIFI_CONNECT)) {
833         LOGE("network status abnormal, abort upload");
834         SetErrorCodeMask(ErrorType::WIFI_UNAVAILABLE);
835         return;
836     }
837 
838     /* upload */
839     auto callback = AsyncCallback(&DataSyncer::OnDeleteRecords);
840     if (callback == nullptr) {
841         LOGE("wrap on delete records fail");
842         return;
843     }
844     ret = sdkHelper_->DeleteRecords(context, records, callback);
845     if (ret != E_OK) {
846         LOGE("sdk delete records err %{public}d", ret);
847     }
848 }
849 
ModifyMdirtyRecords(shared_ptr<TaskContext> context)850 void DataSyncer::ModifyMdirtyRecords(shared_ptr<TaskContext> context)
851 {
852     LOGI("%{private}d %{private}s modifies records", userId_, bundleName_.c_str());
853 
854     auto handler = context->GetHandler();
855     if (handler == nullptr) {
856         LOGE("context get handler err");
857         return;
858     }
859 
860     /* query local */
861     vector<DKRecord> records;
862     int32_t ret = handler->GetMetaModifiedRecords(records);
863     if (ret != E_OK) {
864         LOGE("handler get modified records err %{public}d", ret);
865         return;
866     }
867 
868     /* no need upload */
869     if (records.size() == 0) {
870         return;
871     }
872 
873     if ((NetworkStatus::GetNetConnStatus() != NetworkStatus::WIFI_CONNECT)) {
874         LOGE("network status abnormal, abort upload");
875         SetErrorCodeMask(ErrorType::WIFI_UNAVAILABLE);
876         return;
877     }
878 
879     /* upload */
880     auto callback = AsyncCallback(&DataSyncer::OnModifyMdirtyRecords);
881     if (callback == nullptr) {
882         LOGE("wrap on modify records fail");
883         return;
884     }
885     ret = sdkHelper_->ModifyRecords(context, records, callback);
886     if (ret != E_OK) {
887         LOGE("sdk modify records err %{public}d", ret);
888     }
889 }
890 
ModifyFdirtyRecords(shared_ptr<TaskContext> context)891 void DataSyncer::ModifyFdirtyRecords(shared_ptr<TaskContext> context)
892 {
893     LOGI("%{private}d %{private}s modifies records", userId_, bundleName_.c_str());
894 
895     auto handler = context->GetHandler();
896     if (handler == nullptr) {
897         LOGE("context get handler err");
898         return;
899     }
900 
901     /* query local */
902     vector<DKRecord> records;
903     int32_t ret = handler->GetFileModifiedRecords(records);
904     if (ret != E_OK) {
905         LOGE("handler get modified records err %{public}d", ret);
906         return;
907     }
908 
909     /* no need upload */
910     if (records.size() == 0) {
911         return;
912     }
913 
914     if ((NetworkStatus::GetNetConnStatus() != NetworkStatus::WIFI_CONNECT)) {
915         LOGE("network status abnormal, abort upload");
916         SetErrorCodeMask(ErrorType::WIFI_UNAVAILABLE);
917         return;
918     }
919 
920     /* upload */
921     auto callback = AsyncCallback(&DataSyncer::OnModifyFdirtyRecords);
922     if (callback == nullptr) {
923         LOGE("wrap on modify records fail");
924         return;
925     }
926     ret = sdkHelper_->ModifyRecords(context, records, callback);
927     if (ret != E_OK) {
928         LOGE("sdk modify records err %{public}d", ret);
929     }
930 }
931 
OnCreateRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> map,const DKError & err)932 void DataSyncer::OnCreateRecords(shared_ptr<DKContext> context,
933     shared_ptr<const DKDatabase> database,
934     shared_ptr<const map<DKRecordId, DKRecordOperResult>> map, const DKError &err)
935 {
936     LOGI("%{private}d %{private}s on create records %{public}zu", userId_,
937         bundleName_.c_str(), map->size());
938 
939     auto ctx = static_pointer_cast<TaskContext>(context);
940 
941     /* update local */
942     auto handler = ctx->GetHandler();
943     int32_t ret = handler->OnCreateRecords(*map);
944     if (ret != E_OK) {
945         LOGE("handler on create records err %{public}d", ret);
946         UpdateErrorCode(ret);
947         return;
948     } else {
949         this_thread::sleep_for(chrono::seconds(1));
950         SyncStateChangedNotify(CloudSyncState::UPLOADING, ErrorType::NO_ERROR);
951         isDataChanged_ = true;
952     }
953 
954     /* push more */
955     ret = AsyncRun(ctx, &DataSyncer::CreateRecords);
956     if (ret != E_OK) {
957         LOGE("async run create records err %{public}d", ret);
958         return;
959     }
960 }
961 
OnDeleteRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> map,const DKError & err)962 void DataSyncer::OnDeleteRecords(shared_ptr<DKContext> context,
963     shared_ptr<const DKDatabase> database,
964     shared_ptr<const map<DKRecordId, DKRecordOperResult>> map, const DKError &err)
965 {
966     LOGI("%{private}d %{private}s on delete records %{public}zu", userId_,
967         bundleName_.c_str(), map->size());
968 
969     auto ctx = static_pointer_cast<TaskContext>(context);
970 
971     /* update local */
972     auto handler = ctx->GetHandler();
973     int32_t ret = handler->OnDeleteRecords(*map);
974     if (ret != E_OK) {
975         LOGE("handler on delete records err %{public}d", ret);
976         UpdateErrorCode(ret);
977         return;
978     } else {
979         isDataChanged_ = true;
980     }
981 
982     /* push more */
983     ret = AsyncRun(ctx, &DataSyncer::DeleteRecords);
984     if (ret != E_OK) {
985         LOGE("async run delete records err %{public}d", ret);
986         return;
987     }
988 }
989 
OnModifyMdirtyRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> saveMap,shared_ptr<const map<DKRecordId,DKRecordOperResult>> deleteMap,const DKError & err)990 void DataSyncer::OnModifyMdirtyRecords(shared_ptr<DKContext> context,
991     shared_ptr<const DKDatabase> database,
992     shared_ptr<const map<DKRecordId, DKRecordOperResult>> saveMap,
993     shared_ptr<const map<DKRecordId, DKRecordOperResult>> deleteMap,
994     const DKError &err)
995 {
996     LOGI("%{private}d %{private}s on modify mdirty records %{public}zu", userId_,
997         bundleName_.c_str(), saveMap->size());
998 
999     auto ctx = static_pointer_cast<TaskContext>(context);
1000 
1001     /* update local */
1002     auto handler = ctx->GetHandler();
1003     int32_t ret = handler->OnModifyMdirtyRecords(*saveMap);
1004     if (ret != E_OK) {
1005         LOGE("handler on modify records err %{public}d", ret);
1006         UpdateErrorCode(ret);
1007         return;
1008     } else {
1009         isDataChanged_ = true;
1010     }
1011 
1012     /* push more */
1013     ret = AsyncRun(ctx, &DataSyncer::ModifyMdirtyRecords);
1014     if (ret != E_OK) {
1015         LOGE("async run modify records err %{public}d", ret);
1016         return;
1017     }
1018 }
1019 
OnModifyFdirtyRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> saveMap,shared_ptr<const map<DKRecordId,DKRecordOperResult>> deleteMap,const DKError & err)1020 void DataSyncer::OnModifyFdirtyRecords(shared_ptr<DKContext> context,
1021     shared_ptr<const DKDatabase> database,
1022     shared_ptr<const map<DKRecordId, DKRecordOperResult>> saveMap,
1023     shared_ptr<const map<DKRecordId, DKRecordOperResult>> deleteMap,
1024     const DKError &err)
1025 {
1026     LOGI("%{private}d %{private}s on modify fdirty records %{public}zu", userId_,
1027         bundleName_.c_str(), saveMap->size());
1028 
1029     auto ctx = static_pointer_cast<TaskContext>(context);
1030 
1031     /* update local */
1032     auto handler = ctx->GetHandler();
1033     int32_t ret = handler->OnModifyFdirtyRecords(*saveMap);
1034     if (ret != E_OK) {
1035         LOGE("handler on modify records err %{public}d", ret);
1036         UpdateErrorCode(ret);
1037         return;
1038     } else {
1039         isDataChanged_ = true;
1040     }
1041 
1042     /* push more */
1043     ret = AsyncRun(ctx, &DataSyncer::ModifyFdirtyRecords);
1044     if (ret != E_OK) {
1045         LOGE("async run modify records err %{public}d", ret);
1046         return;
1047     }
1048 }
1049 
BeginTransaction()1050 void DataSyncer::BeginTransaction()
1051 {
1052     taskRunner_->CommitDummyTask();
1053 }
1054 
EndTransaction()1055 void DataSyncer::EndTransaction()
1056 {
1057     taskRunner_->CompleteDummyTask();
1058 }
1059 
GetBundleName() const1060 std::string DataSyncer::GetBundleName() const
1061 {
1062     return bundleName_;
1063 }
1064 
GetUserId() const1065 int32_t DataSyncer::GetUserId() const
1066 {
1067     return userId_;
1068 }
1069 
GetSyncState() const1070 SyncState DataSyncer::GetSyncState() const
1071 {
1072     return syncStateManager_.GetSyncState();
1073 }
1074 
CompletePull()1075 int32_t DataSyncer::CompletePull()
1076 {
1077     LOGI("%{private}d %{private}s completes pull", userId_, bundleName_.c_str());
1078     /* call syncer manager callback */
1079     auto error = GetErrorType();
1080     if (error) {
1081         LOGE("pull failed, errorType:%{public}d", error);
1082         SyncStateChangedNotify(CloudSyncState::DOWNLOAD_FAILED, error);
1083         Complete(false);
1084     } else {
1085         /* schedule to next stage */
1086         Schedule();
1087     }
1088     return E_OK;
1089 }
1090 
CompletePush()1091 int32_t DataSyncer::CompletePush()
1092 {
1093     LOGI("%{private}d %{public}s completes push", userId_, bundleName_.c_str());
1094     /* call syncer manager callback */
1095     auto error = GetErrorType();
1096     if (error) {
1097         LOGE("pull failed, errorType:%{public}d", error);
1098         SyncStateChangedNotify(CloudSyncState::UPLOAD_FAILED, error);
1099         Complete(false);
1100     } else {
1101         /* schedule to next stage */
1102         Schedule();
1103     }
1104     return E_OK;
1105 }
1106 
CompleteAll(bool isNeedNotify)1107 void DataSyncer::CompleteAll(bool isNeedNotify)
1108 {
1109     LOGI("%{private}d %{private}s completes all", userId_, bundleName_.c_str());
1110 
1111     /* guarantee: unlock if locked */
1112     ForceUnlock();
1113 
1114     /* reset internal status */
1115     Reset();
1116 
1117     SyncState syncState = SyncState::SYNC_SUCCEED;
1118     if (errorCode_ != E_OK) {
1119         syncState = SyncState::SYNC_FAILED;
1120     }
1121 
1122     /* sys event report and free */
1123     ReportSysEvent(errorCode_);
1124     FreeSysEventData();
1125 
1126     CloudSyncState notifyState = CloudSyncState::COMPLETED;
1127     if (syncStateManager_.GetStopSyncFlag()) {
1128         notifyState = CloudSyncState::STOPPED;
1129     } else {
1130         SaveSubscription();
1131     }
1132 
1133     auto nextAction = syncStateManager_.UpdateSyncState(syncState);
1134     DataSyncerRdbStore::GetInstance().UpdateSyncState(userId_, bundleName_, syncState);
1135 
1136     if (nextAction == Action::START) {
1137         /* Retrigger sync, clear errorcode */
1138         errorCode_ = E_OK;
1139         StartSync(false, SyncTriggerType::PENDING_TRIGGER);
1140         return;
1141     } else if (nextAction == Action::FORCE_START) {
1142         /* Retrigger sync, clear errorcode */
1143         errorCode_ = E_OK;
1144         StartSync(true, SyncTriggerType::PENDING_TRIGGER);
1145         return;
1146     } else if (nextAction == Action::CHECK) {
1147         errorCode_ = E_OK;
1148         StartSync(false, SyncTriggerType::TASK_TRIGGER);
1149         return;
1150     } else {
1151         TaskStateManager::GetInstance().CompleteTask(bundleName_, TaskType::SYNC_TASK);
1152     }
1153 
1154     /* notify sync state */
1155     if (isNeedNotify) {
1156         SyncStateChangedNotify(notifyState, ErrorType::NO_ERROR);
1157     }
1158     /* clear errorcode */
1159     errorCode_ = E_OK;
1160 }
1161 
BeginClean()1162 void DataSyncer::BeginClean()
1163 {
1164     *stopFlag_ = true;
1165     /* stop all the tasks and wait for tasks' termination */
1166     taskRunner_->ReleaseTask();
1167     TaskStateManager::GetInstance().StartTask(bundleName_, TaskType::CLEAN_TASK);
1168     /* set cleaning  state */
1169     (void)syncStateManager_.SetCleaningFlag();
1170 }
1171 
CompleteClean()1172 void DataSyncer::CompleteClean()
1173 {
1174     DeleteSubscription();
1175     DataSyncerRdbStore::GetInstance().UpdateSyncState(userId_, bundleName_, SyncState::CLEAN_SUCCEED);
1176     TaskStateManager::GetInstance().CompleteTask(bundleName_, TaskType::CLEAN_TASK);
1177     auto nextAction = syncStateManager_.UpdateSyncState(SyncState::CLEAN_SUCCEED);
1178     if (nextAction != Action::STOP) {
1179          /* Retrigger sync, clear errorcode */
1180         if (!CloudStatus::IsCloudStatusOkay(bundleName_, userId_)) {
1181             LOGE("cloud status is not OK");
1182             return;
1183         }
1184         StartSync(false, SyncTriggerType::PENDING_TRIGGER);
1185     }
1186     *stopFlag_ = false;
1187 }
1188 
BeginDisableCloud()1189 void DataSyncer::BeginDisableCloud()
1190 {
1191     /* stop all the tasks and wait for tasks' termination */
1192     TaskStateManager::GetInstance().StartTask(bundleName_, TaskType::DISABLE_CLOUD_TASK);
1193     /* set disable cloud  state */
1194     (void)syncStateManager_.SetDisableCloudFlag();
1195 }
1196 
CompleteDisableCloud()1197 void DataSyncer::CompleteDisableCloud()
1198 {
1199     DataSyncerRdbStore::GetInstance().UpdateSyncState(userId_, bundleName_, SyncState::DISABLE_CLOUD_SUCCEED);
1200     TaskStateManager::GetInstance().CompleteTask(bundleName_, TaskType::DISABLE_CLOUD_TASK);
1201     auto nextAction = syncStateManager_.UpdateSyncState(SyncState::DISABLE_CLOUD_SUCCEED);
1202     if (nextAction != Action::STOP) {
1203          /* Retrigger sync, clear errorcode */
1204         if (!CloudStatus::IsCloudStatusOkay(bundleName_, userId_)) {
1205             LOGE("cloud status is not OK");
1206             return;
1207         }
1208         StartSync(false, SyncTriggerType::PENDING_TRIGGER);
1209     }
1210 }
1211 
SyncStateChangedNotify(const CloudSyncState state,const ErrorType error)1212 void DataSyncer::SyncStateChangedNotify(const CloudSyncState state, const ErrorType error)
1213 {
1214     CurrentSyncState_ = state;
1215     CurrentErrorType_ = error;
1216     CloudSyncCallbackManager::GetInstance().NotifySyncStateChanged(bundleName_, state, error);
1217 }
1218 
NotifyCurrentSyncState()1219 void DataSyncer::NotifyCurrentSyncState()
1220 {
1221     CloudSyncCallbackManager::GetInstance().NotifySyncStateChanged(bundleName_, CurrentSyncState_, CurrentErrorType_);
1222 }
1223 
UpdateErrorCode(int32_t code)1224 void DataSyncer::UpdateErrorCode(int32_t code)
1225 {
1226     switch (code) {
1227         case E_SYNC_FAILED_NETWORK_NOT_AVAILABLE:
1228             SetErrorCodeMask(ErrorType::NETWORK_UNAVAILABLE);
1229             break;
1230         case E_LOCAL_STORAGE_FULL:
1231             SetErrorCodeMask(ErrorType::LOCAL_STORAGE_FULL);
1232             break;
1233         case E_CLOUD_STORAGE_FULL:
1234             SetErrorCodeMask(ErrorType::CLOUD_STORAGE_FULL);
1235             break;
1236         case E_SYNC_FAILED_BATTERY_LOW:
1237             SetErrorCodeMask(ErrorType::BATTERY_LEVEL_LOW);
1238             break;
1239         default:
1240             LOGI("ignore errcode, errcode: %{public}d", code);
1241             break;
1242     }
1243 }
1244 
SetErrorCodeMask(ErrorType errorType)1245 void DataSyncer::SetErrorCodeMask(ErrorType errorType)
1246 {
1247     errorCode_ |= 1 << static_cast<uint32_t>(errorType);
1248 }
1249 
GetErrorType()1250 ErrorType DataSyncer::GetErrorType()
1251 {
1252     if (errorCode_ == E_OK) {
1253         return ErrorType::NO_ERROR;
1254     }
1255 
1256     std::vector<ErrorType> errorTypes = {NETWORK_UNAVAILABLE, WIFI_UNAVAILABLE,   BATTERY_LEVEL_WARNING,
1257                                          BATTERY_LEVEL_LOW,   CLOUD_STORAGE_FULL, LOCAL_STORAGE_FULL};
1258     for (const auto &errorType : errorTypes) {
1259         if (errorCode_ & (1 << static_cast<uint32_t>(errorType))) {
1260             return errorType;
1261         }
1262     }
1263     LOGE("errorcode unexpected, errcode: %{public}u", errorCode_);
1264     return ErrorType::NO_ERROR;
1265 }
1266 
SaveSubscription()1267 void DataSyncer::SaveSubscription()
1268 {
1269     sdkHelper_->SaveSubscription([] (auto, shared_ptr<DriveKit::DKContainer>,
1270         DriveKit::DKSubscriptionResult & res) {
1271         if (!res.IsSuccess()) {
1272             auto err = res.GetDKError();
1273             LOGE("drivekit save subscription server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
1274                 err.dkErrorCode);
1275         }
1276     });
1277 }
1278 
DeleteSubscription()1279 void DataSyncer::DeleteSubscription()
1280 {
1281     if (!HasSdkHelper()) {
1282         LOGW(" sdk helper is null skip delete subscription");
1283         return;
1284     }
1285 
1286     sdkHelper_->DeleteSubscription([] (auto, DriveKit::DKError err) {
1287         if (err.HasError()) {
1288             LOGE("drivekit delete subscription server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
1289                 err.dkErrorCode);
1290         }
1291     });
1292 }
1293 
OptimizeStorage(const int32_t agingDays)1294 int32_t DataSyncer::OptimizeStorage(const int32_t agingDays)
1295 {
1296     return E_OK;
1297 }
1298 
HasSdkHelper()1299 bool DataSyncer::HasSdkHelper()
1300 {
1301     if (sdkHelper_ == nullptr) {
1302         return false;
1303     }
1304     return true;
1305 }
1306 
1307 // download the thumb and lcd of file when screenoff and charging
DownloadThumb(int32_t type)1308 int32_t DataSyncer::DownloadThumb(int32_t type)
1309 {
1310     return E_OK;
1311 }
1312 
DownloadThumbInner(std::shared_ptr<DataHandler> handler)1313 int32_t DataSyncer::DownloadThumbInner(std::shared_ptr<DataHandler> handler)
1314 {
1315     if (syncStateManager_.GetSyncState() == SyncState::CLEANING ||
1316         syncStateManager_.GetSyncState() == SyncState::DISABLE_CLOUD) {
1317         LOGI("downloading or cleaning, not to trigger thumb downloading");
1318         return E_STOP;
1319     }
1320 
1321     if (!TaskStateManager::GetInstance().HasTask(bundleName_, TaskType::DOWNLOAD_THUMB_TASK)) {
1322         LOGI("stop download thumb");
1323         return E_STOP;
1324     }
1325     vector<DriveKit::DKDownloadAsset> assetsToDownload;
1326     int32_t ret = handler->GetThumbToDownload(assetsToDownload);
1327     if (ret != E_OK) {
1328         LOGE("get assetsToDownload err %{public}d", ret);
1329         return E_STOP;
1330     }
1331     if (assetsToDownload.empty()) {
1332         return E_STOP;
1333     }
1334 
1335     LOGI("assetsToDownload count: %{public}zu", assetsToDownload.size());
1336     auto ctx = std::make_shared<TaskContext>(handler);
1337     auto callback = [this] (shared_ptr<DKContext> context,
1338                             shared_ptr<const DKDatabase> database,
1339                             const map<DKDownloadAsset, DKDownloadResult> &resultMap,
1340                             const DKError &err) {
1341         DataSyncer::FetchThumbDownloadCallback(context, database, resultMap, err);
1342     };
1343     DownloadContext dctx = {.context = ctx,
1344                             .assets = assetsToDownload,
1345                             .id = 0,
1346                             .resultCallback = callback,
1347                             .progressCallback = FetchRecordsDownloadProgress};
1348     std::thread([this, dctx]() { this->DownloadThumbAssets(dctx); }).detach();
1349     return E_OK;
1350 }
1351 
FetchThumbDownloadCallback(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,const map<DKDownloadAsset,DKDownloadResult> & resultMap,const DKError & err)1352 void DataSyncer::FetchThumbDownloadCallback(shared_ptr<DKContext> context,
1353                                             shared_ptr<const DKDatabase> database,
1354                                             const map<DKDownloadAsset, DKDownloadResult> &resultMap,
1355                                             const DKError &err)
1356 {
1357     auto ctx = static_pointer_cast<TaskContext>(context);
1358     auto handler = ctx->GetHandler();
1359     handler->OnDownloadAssets(resultMap);
1360     if (err.HasError()) {
1361         LOGE("DKAssetsDownloader err, localErr: %{public}d, serverErr: %{public}d", static_cast<int>(err.dkErrorCode),
1362              err.serverErrorCode);
1363         StopDownloadThumb();
1364         return;
1365     }
1366     LOGI("DKAssetsDownloader ok");
1367     if (handler->GetDownloadType() == DataHandler::DownloadThmType::SCREENOFF_TRIGGER) {
1368         if (!CheckScreenAndWifi()) {
1369             LOGI("download thumb condition is not met");
1370             StopDownloadThumb();
1371             return;
1372         }
1373     }
1374     if (DownloadThumbInner(handler) == E_STOP) {
1375         StopDownloadThumb();
1376     }
1377 }
1378 
CheckScreenAndWifi()1379 bool DataSyncer::CheckScreenAndWifi()
1380 {
1381     if (NetworkStatus::GetNetConnStatus() == NetworkStatus::WIFI_CONNECT) {
1382         return true;
1383     }
1384     return false;
1385 }
1386 
CleanCache(const string & uri)1387 int32_t DataSyncer::CleanCache(const string &uri)
1388 {
1389     return E_OK;
1390 }
1391 
StopDownloadThumb()1392 void DataSyncer::StopDownloadThumb()
1393 {
1394     return;
1395 }
1396 
StopUploadAssets()1397 void DataSyncer::StopUploadAssets()
1398 {
1399     sdkHelper_->Release();
1400 }
1401 
InitSysEventData()1402 int32_t DataSyncer::InitSysEventData()
1403 {
1404     return E_OK;
1405 }
1406 
FreeSysEventData()1407 void DataSyncer::FreeSysEventData()
1408 {
1409 }
1410 
ReportSysEvent(uint32_t code)1411 void DataSyncer::ReportSysEvent(uint32_t code)
1412 {
1413 }
1414 
SetFullSyncSysEvent()1415 void DataSyncer::SetFullSyncSysEvent()
1416 {
1417 }
1418 
SetCheckSysEvent()1419 void DataSyncer::SetCheckSysEvent()
1420 {
1421 }
1422 } // namespace CloudSync
1423 } // namespace FileManagement
1424 } // namespace OHOS
1425