• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "data_syncer.h"
17 
18 #include <functional>
19 
20 #include "data_sync_const.h"
21 #include "data_sync_notifier.h"
22 #include "dfs_error.h"
23 #include "ipc/cloud_sync_callback_manager.h"
24 #include "sdk_helper.h"
25 #include "sync_rule/battery_status.h"
26 #include "utils_log.h"
27 
28 namespace OHOS {
29 namespace FileManagement {
30 namespace CloudSync {
31 using namespace std;
32 using namespace placeholders;
33 using namespace DriveKit;
34 using ChangeType = AAFwk::ChangeInfo::ChangeType;
35 
DataSyncer(const std::string bundleName,const int32_t userId)36 DataSyncer::DataSyncer(const std::string bundleName, const int32_t userId)
37     : bundleName_(bundleName), userId_(userId)
38 {
39     /* alloc task runner */
40     taskRunner_ = DelayedSingleton<TaskManager>::GetInstance()->AllocRunner(userId,
41         bundleName, bind(&DataSyncer::Schedule, this));
42 }
43 
~DataSyncer()44 DataSyncer::~DataSyncer()
45 {
46     /* release task runner */
47     DelayedSingleton<TaskManager>::GetInstance()->ReleaseRunner(userId_, bundleName_);
48 }
49 
AsyncRun(std::shared_ptr<TaskContext> context,void (DataSyncer::* f)(std::shared_ptr<TaskContext>))50 int32_t DataSyncer::AsyncRun(std::shared_ptr<TaskContext> context,
51     void(DataSyncer::*f)(std::shared_ptr<TaskContext>))
52 {
53     return taskRunner_->AsyncRun<DataSyncer>(context, f, this);
54 }
55 
56 template<typename T, typename RET, typename... ARGS>
AsyncCallback(RET (T::* f)(ARGS...))57 function<RET(ARGS...)> DataSyncer::AsyncCallback(RET(T::*f)(ARGS...))
58 {
59     return taskRunner_->AsyncCallback<DataSyncer>(f, this);
60 }
61 
StartSync(bool forceFlag,SyncTriggerType triggerType)62 int32_t DataSyncer::StartSync(bool forceFlag, SyncTriggerType triggerType)
63 {
64     LOGI("%{private}d %{public}s starts sync, isforceSync %{public}d, triggerType %{public}d",
65         userId_, bundleName_.c_str(), forceFlag, triggerType);
66 
67     /* only one specific data sycner running at a time */
68     if (syncStateManager_.CheckAndSetPending(forceFlag)) {
69         LOGI("syncing, pending sync");
70         return E_PENDING;
71     }
72 
73     /* start data sync */
74     Schedule();
75 
76     return E_OK;
77 }
78 
StopSync(SyncTriggerType triggerType)79 int32_t DataSyncer::StopSync(SyncTriggerType triggerType)
80 {
81     LOGI("%{private}d %{public}s stops sync, trigger stop sync, type:%{public}d",
82         userId_, bundleName_.c_str(), triggerType);
83 
84     syncStateManager_.SetStopSyncFlag();
85     Abort();
86     return E_OK;
87 }
88 
Lock()89 int32_t DataSyncer::Lock()
90 {
91     lock_guard<mutex> lock(lock_.mtx);
92     if (lock_.count > 0) {
93         lock_.count++;
94         return E_OK;
95     }
96 
97     /* lock: device-reentrant */
98     int32_t ret = sdkHelper_->GetLock(lock_.lock);
99     if (ret != E_OK) {
100         LOGE("sdk helper get lock err %{public}d", ret);
101         lock_.lock = { 0 };
102         return ret;
103     }
104     lock_.count++;
105 
106     return ret;
107 }
108 
Unlock()109 void DataSyncer::Unlock()
110 {
111     lock_guard<mutex> lock(lock_.mtx);
112     lock_.count--;
113     if (lock_.count > 0) {
114         return;
115     }
116 
117     /* sdk unlock */
118     sdkHelper_->DeleteLock(lock_.lock);
119 
120     /* reset sdk lock */
121     lock_.lock = { 0 };
122 }
123 
ForceUnlock()124 void DataSyncer::ForceUnlock()
125 {
126     lock_guard<mutex> lock(lock_.mtx);
127     if (lock_.count == 0) {
128         return;
129     }
130     sdkHelper_->DeleteLock(lock_.lock);
131     lock_.lock = { 0 };
132     lock_.count = 0;
133 }
134 
StartDownloadFile(const std::string path,const int32_t userId)135 int32_t DataSyncer::StartDownloadFile(const std::string path, const int32_t userId)
136 {
137     return E_OK;
138 }
139 
StopDownloadFile(const std::string path,const int32_t userId)140 int32_t DataSyncer::StopDownloadFile(const std::string path, const int32_t userId)
141 {
142     downloadCallbackMgr_.StopDonwload(path, userId);
143     return E_OK;
144 }
145 
RegisterDownloadFileCallback(const int32_t userId,const sptr<ICloudDownloadCallback> downloadCallback)146 int32_t DataSyncer::RegisterDownloadFileCallback(const int32_t userId,
147                                                  const sptr<ICloudDownloadCallback> downloadCallback)
148 {
149     downloadCallbackMgr_.RegisterCallback(userId, downloadCallback);
150     return E_OK;
151 }
152 
UnregisterDownloadFileCallback(const int32_t userId)153 int32_t DataSyncer::UnregisterDownloadFileCallback(const int32_t userId)
154 {
155     downloadCallbackMgr_.UnregisterCallback(userId);
156     return E_OK;
157 }
158 
Abort()159 void DataSyncer::Abort()
160 {
161     LOGI("%{private}d %{private}s aborts", userId_, bundleName_.c_str());
162     thread ([this]() {
163         /* stop all the tasks and wait for tasks' termination */
164         if (!taskRunner_->StopAndWaitFor()) {
165             LOGE("wait for tasks stop fail");
166         }
167         /* call the syncer manager's callback for notification */
168         CompleteAll();
169     }).detach();
170 }
171 
SetSdkHelper(shared_ptr<SdkHelper> & sdkHelper)172 void DataSyncer::SetSdkHelper(shared_ptr<SdkHelper> &sdkHelper)
173 {
174     sdkHelper_ = sdkHelper;
175 }
176 
Pull(shared_ptr<DataHandler> handler)177 int32_t DataSyncer::Pull(shared_ptr<DataHandler> handler)
178 {
179     LOGI("%{private}d %{private}s pull", userId_, bundleName_.c_str());
180 
181     shared_ptr<TaskContext> context = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
182 
183     RetryDownloadRecords(context);
184     vector<DKRecordId> records;
185     int32_t ret = handler->GetRetryRecords(records);
186     if (ret == E_OK) {
187         DataSyncer::PullRecordsWithId(context, records, true);
188     }
189 
190     /* Full synchronization and incremental synchronization */
191     if (handler->IsPullRecords()) {
192         ret = AsyncRun(context, &DataSyncer::PullRecords);
193     } else {
194         ret = AsyncRun(context, &DataSyncer::PullDatabaseChanges);
195     }
196     if (ret != E_OK) {
197         LOGE("asyn run pull records err %{public}d", ret);
198         return ret;
199     }
200 
201     return E_OK;
202 }
203 
204 /*
205  * Although records from the cloud should be all pulled down before
206  * uploading the local change, conflicts might be rare in most cases,
207  * and the syncer would just move on.
208  */
PullRecords(shared_ptr<TaskContext> context)209 void DataSyncer::PullRecords(shared_ptr<TaskContext> context)
210 {
211     LOGI("%{private}d %{private}s pull records", userId_, bundleName_.c_str());
212 
213     /* get query condition here */
214     auto handler = context->GetHandler();
215     if (handler == nullptr) {
216         LOGE("context get handler err");
217         return;
218     }
219 
220     FetchCondition cond;
221     handler->GetFetchCondition(cond);
222 
223     DKQueryCursor tempStartCursor;
224     handler->GetTempStartCursor(tempStartCursor);
225     if (tempStartCursor.empty()) {
226         sdkHelper_->GetStartCursor(cond.recordType, tempStartCursor);
227         handler->SetTempStartCursor(tempStartCursor);
228     }
229 
230     DKQueryCursor nextCursor;
231     handler->GetNextCursor(nextCursor);
232 
233     SdkHelper::FetchRecordsCallback callback = nullptr;
234     if (handler->GetCheckFlag()) {
235         callback = AsyncCallback(&DataSyncer::OnFetchCheckRecords);
236     } else {
237         callback = AsyncCallback(&DataSyncer::OnFetchRecords);
238     }
239     if (callback == nullptr) {
240         LOGE("wrap on fetch records fail");
241         return;
242     }
243 
244     int32_t ret = sdkHelper_->FetchRecords(context, cond, nextCursor, callback);
245     if (ret != E_OK) {
246         LOGE("sdk fetch records err %{public}d", ret);
247     }
248 }
249 
PullDatabaseChanges(shared_ptr<TaskContext> context)250 void DataSyncer::PullDatabaseChanges(shared_ptr<TaskContext> context)
251 {
252     LOGI("%{private}d %{private}s pull database changes", userId_, bundleName_.c_str());
253 
254     auto callback = AsyncCallback(&DataSyncer::OnFetchDatabaseChanges);
255     if (callback == nullptr) {
256         LOGE("wrap on fetch records fail");
257         return;
258     }
259 
260     auto handler = context->GetHandler();
261     if (handler == nullptr) {
262         LOGE("context get handler err");
263         return;
264     }
265     DKQueryCursor nextCursor;
266     handler->GetNextCursor(nextCursor);
267 
268     FetchCondition cond;
269     handler->GetFetchCondition(cond);
270     int32_t ret = sdkHelper_->FetchDatabaseChanges(context, cond, nextCursor, callback);
271     if (ret != E_OK) {
272         LOGE("sdk fetch records err %{public}d", ret);
273     }
274 }
275 
276 struct DownloadContext {
277     std::shared_ptr<DKContext> context;
278     vector<DKDownloadAsset> assets;
279     DriveKit::DKDownloadId id;
280     std::function<void(std::shared_ptr<DriveKit::DKContext>,
281                        std::shared_ptr<const DriveKit::DKDatabase>,
282                        const std::map<DriveKit::DKDownloadAsset, DriveKit::DKDownloadResult> &,
283                        const DriveKit::DKError &)>
284         resultCallback;
285     std::function<
286         void(std::shared_ptr<DriveKit::DKContext>, DriveKit::DKDownloadAsset, TotalSize, DriveKit::DownloadSize)>
287         progressCallback;
288 };
289 
DownloadAssets(DownloadContext & ctx)290 void DataSyncer::DownloadAssets(DownloadContext &ctx)
291 {
292     if (ctx.resultCallback == nullptr) {
293         LOGE("resultCallback nullptr");
294         return;
295     }
296     if (ctx.progressCallback == nullptr) {
297         LOGE("progressCallback nullptr");
298         return;
299     }
300     sdkHelper_->DownloadAssets(ctx.context, ctx.assets, {}, ctx.id, ctx.resultCallback, ctx.progressCallback);
301 }
302 
ThumbDownloadCallback(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,const map<DKDownloadAsset,DKDownloadResult> & resultMap,const DKError & err)303 static void ThumbDownloadCallback(shared_ptr<DKContext> context,
304                                   shared_ptr<const DKDatabase> database,
305                                   const map<DKDownloadAsset, DKDownloadResult> &resultMap,
306                                   const DKError &err)
307 {
308     if (err.HasError()) {
309         LOGE("DKAssetsDownloader err, localErr: %{public}d, serverErr: %{public}d", static_cast<int>(err.dkErrorCode),
310              err.serverErrorCode);
311     } else {
312         LOGI("DKAssetsDownloader ok");
313     }
314 
315     auto ctx = static_pointer_cast<TaskContext>(context);
316     auto handler = ctx->GetHandler();
317     handler->OnDownloadThumb(resultMap);
318 }
319 
ThumbDownLoadProgress(shared_ptr<DKContext> context,DKDownloadAsset asset,TotalSize total,DownloadSize download)320 void ThumbDownLoadProgress(shared_ptr<DKContext> context, DKDownloadAsset asset, TotalSize total, DownloadSize download)
321 {
322     LOGI("record %s %{public}s download progress", asset.recordId.c_str(), asset.fieldKey.c_str());
323     if (total == download) {
324         auto ctx = static_pointer_cast<TaskContext>(context);
325         auto handler = ctx->GetHandler();
326         handler->OnDownloadThumb(asset);
327     }
328 }
329 
HandleOnFetchRecords(const std::shared_ptr<DownloadTaskContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DKRecord>> records,bool checkOrRetry)330 int DataSyncer::HandleOnFetchRecords(const std::shared_ptr<DownloadTaskContext> context,
331     std::shared_ptr<const DKDatabase> database, std::shared_ptr<std::vector<DKRecord>> records, bool checkOrRetry)
332 {
333     if (records->size() == 0) {
334         LOGI("no records to handle");
335         return E_OK;
336     }
337 
338     OnFetchParams onFetchParams;
339     auto ctx = static_pointer_cast<TaskContext>(context);
340     auto handler = ctx->GetHandler();
341     if (handler == nullptr) {
342         LOGE("context get handler err");
343         return E_CONTEXT;
344     }
345 
346     if (handler->IsPullRecords() && !checkOrRetry) {
347         onFetchParams.totalPullCount = context->GetBatchNo() * handler->GetRecordSize();
348     }
349 
350     int32_t ret = handler->OnFetchRecords(records, onFetchParams);
351     DownloadContext dctx = {.context = context,
352                             .assets = onFetchParams.assetsToDownload,
353                             .id = 0,
354                             .resultCallback = ThumbDownloadCallback,
355                             .progressCallback = ThumbDownLoadProgress};
356     DownloadAssets(dctx);
357 
358     if (ret != E_OK) {
359         LOGE("handler on fetch records err %{public}d", ret);
360     }
361     if (!checkOrRetry) {
362         handler->FinishPull(context->GetBatchNo());
363     }
364     return ret;
365 }
366 
OnFetchRecords(const std::shared_ptr<DKContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DKRecord>> records,DKQueryCursor nextCursor,const DKError & err)367 void DataSyncer::OnFetchRecords(const std::shared_ptr<DKContext> context, std::shared_ptr<const DKDatabase> database,
368     std::shared_ptr<std::vector<DKRecord>> records, DKQueryCursor nextCursor, const DKError &err)
369 {
370     if (err.HasError()) {
371         LOGE("OnFetchRecords server err %{public}d and dk errcor %{public}d", err.serverErrorCode, err.dkErrorCode);
372         if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
373             SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
374         }
375         return;
376     }
377 
378     LOGI("%{private}d %{private}s on fetch records", userId_, bundleName_.c_str());
379     auto ctx = static_pointer_cast<DownloadTaskContext>(context);
380     auto handler = ctx->GetHandler();
381     if (handler == nullptr) {
382         LOGE("context get handler err");
383         return;
384     }
385     if (ctx->GetBatchNo() == 0) {
386         handler->SetRecordSize(records->size());
387     }
388     /* pull more */
389     if (nextCursor.empty()) {
390         LOGI("no more records");
391         handler->GetTempStartCursor(nextCursor);
392         handler->SetTempNextCursor(nextCursor, true);
393         if (records->size() == 0) {
394             handler->FinishPull(ctx->GetBatchNo());
395         }
396     } else {
397         handler->SetTempNextCursor(nextCursor, false);
398         shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
399         int ret = AsyncRun(nexCtx, &DataSyncer::PullRecords);
400         if (ret != E_OK) {
401             LOGE("asyn run pull records err %{public}d", ret);
402         }
403     }
404 
405     if (HandleOnFetchRecords(ctx, database, records, false) != E_OK) {
406         LOGE("HandleOnFetchRecords failed");
407     }
408 }
409 
DownloadInner(std::shared_ptr<DataHandler> handler,const std::string path,const int32_t userId)410 int32_t DataSyncer::DownloadInner(std::shared_ptr<DataHandler> handler,
411                                   const std::string path,
412                                   const int32_t userId)
413 {
414     auto ctx = std::make_shared<TaskContext>(handler);
415     std::vector<DKDownloadAsset> assetsToDownload;
416     int32_t ret = handler->GetDownloadAsset(path, assetsToDownload);
417     if (ret != E_OK) {
418         LOGE("handler on fetch records err %{public}d", ret);
419         return ret;
420     }
421 
422     downloadCallbackMgr_.StartDonwload(path, userId);
423 
424     auto downloadResultCallback = [this, path, assetsToDownload, handler](
425                                       std::shared_ptr<DriveKit::DKContext> context,
426                                       std::shared_ptr<const DriveKit::DKDatabase> database,
427                                       const std::map<DriveKit::DKDownloadAsset, DriveKit::DKDownloadResult> &results,
428                                       const DriveKit::DKError &err) {
429         this->downloadCallbackMgr_.OnDownloadedResult(path, assetsToDownload, handler, context, database, results, err);
430     };
431     auto downloadProcessCallback = [this, path](std::shared_ptr<DKContext> context, DKDownloadAsset asset,
432                                                      TotalSize totalSize, DownloadSize downloadSize) {
433         this->downloadCallbackMgr_.OnDownloadProcess(path, context, asset, totalSize, downloadSize);
434     };
435     DownloadContext dctx = {.context = ctx,
436                             .assets = assetsToDownload,
437                             .id = 0,
438                             .resultCallback = downloadResultCallback,
439                             .progressCallback = downloadProcessCallback};
440     DownloadAssets(dctx);
441     return E_OK;
442 }
443 
OnFetchDatabaseChanges(const std::shared_ptr<DKContext> context,std::shared_ptr<const DKDatabase> database,std::shared_ptr<std::vector<DriveKit::DKRecord>> records,DKQueryCursor nextCursor,bool hasMore,const DKError & err)444 void DataSyncer::OnFetchDatabaseChanges(const std::shared_ptr<DKContext> context,
445     std::shared_ptr<const DKDatabase> database,
446     std::shared_ptr<std::vector<DriveKit::DKRecord>> records, DKQueryCursor nextCursor,
447     bool hasMore, const DKError &err)
448 {
449     LOGI("%{private}d %{private}s on fetch database changes", userId_, bundleName_.c_str());
450     auto ctx = static_pointer_cast<DownloadTaskContext>(context);
451 
452     auto handler = ctx->GetHandler();
453     if (handler == nullptr) {
454         LOGE("context get handler err");
455         return;
456     }
457 
458     if (err.HasError()) {
459         LOGE("OnFetchDatabaseChanges server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
460             err.dkErrorCode);
461         if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
462             SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
463         } else if (!err.errorDetails.empty()) {
464             DKDetailErrorCode detailCode = static_cast<DKDetailErrorCode>(err.errorDetails[0].detailCode);
465             if (detailCode == DKDetailErrorCode::PARAM_INVALID || detailCode == DKDetailErrorCode::CURSOR_EXPIRED) {
466                 handler->SetChecking();
467                 Pull(handler);
468             }
469         }
470         return;
471     }
472 
473     /* pull more */
474     if (!hasMore) {
475         LOGI("no more records");
476         handler->SetTempNextCursor(nextCursor, true);
477         if (records->size() == 0) {
478             handler->FinishPull(ctx->GetBatchNo());
479         }
480     } else {
481         handler->SetTempNextCursor(nextCursor, false);
482         shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
483         int ret = AsyncRun(nexCtx, &DataSyncer::PullDatabaseChanges);
484         if (ret != E_OK) {
485             LOGE("asyn run pull database changes err %{public}d", ret);
486         }
487     }
488 
489     if (HandleOnFetchRecords(ctx, database, records, false) != E_OK) {
490         LOGE("HandleOnFetchRecords failed");
491         return;
492     }
493 }
494 
OnFetchCheckRecords(const shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<std::vector<DKRecord>> records,DKQueryCursor nextCursor,const DKError & err)495 void DataSyncer::OnFetchCheckRecords(const shared_ptr<DKContext> context,
496     shared_ptr<const DKDatabase> database, shared_ptr<std::vector<DKRecord>> records,
497     DKQueryCursor nextCursor, const DKError &err)
498 {
499     if (err.HasError()) {
500         LOGE("OnFetchCheckRecords server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
501              err.dkErrorCode);
502         if (static_cast<DKServerErrorCode>(err.serverErrorCode) == DKServerErrorCode::NETWORK_ERROR) {
503             SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
504         }
505         return;
506     }
507     LOGI("%{private}d %{private}s on fetch records", userId_, bundleName_.c_str());
508     auto ctx = static_pointer_cast<DownloadTaskContext>(context);
509     auto handler = ctx->GetHandler();
510     if (handler == nullptr) {
511         LOGE("context get handler err");
512         return;
513     }
514 
515     int32_t ret = E_OK;
516     /* pull more */
517     if (nextCursor.empty()) {
518         LOGI("no more records");
519         handler->GetTempStartCursor(nextCursor);
520         handler->SetTempNextCursor(nextCursor, true);
521     } else {
522         handler->SetTempNextCursor(nextCursor, false);
523         shared_ptr<DownloadTaskContext> nexCtx = make_shared<DownloadTaskContext>(handler, handler->GetBatchNo());
524         ret = AsyncRun(nexCtx, &DataSyncer::PullRecords);
525         if (ret != E_OK) {
526             LOGE("asyn run pull records err %{public}d", ret);
527         }
528     }
529 
530     std::vector<DriveKit::DKRecordId> checkRecords;
531     ret = handler->GetCheckRecords(checkRecords, records);
532     if (ret != E_OK) {
533         LOGE("handler get check records err %{public}d", ret);
534         return;
535     }
536 
537     DataSyncer::PullRecordsWithId(ctx, checkRecords, false);
538 }
539 
PullRecordsWithId(shared_ptr<TaskContext> context,const std::vector<DriveKit::DKRecordId> & records,bool retry)540 void DataSyncer::PullRecordsWithId(shared_ptr<TaskContext> context, const std::vector<DriveKit::DKRecordId> &records,
541     bool retry)
542 {
543     auto ctx = static_pointer_cast<DownloadTaskContext>(context);
544     auto handler = ctx->GetHandler();
545     if (handler == nullptr) {
546         LOGE("context get handler err");
547         return;
548     }
549 
550     FetchCondition cond;
551     handler->GetFetchCondition(cond);
552     LOGI("retry records count: %{public}u", static_cast<uint32_t>(records.size()));
553     for (auto it : records) {
554         auto callback = AsyncCallback(&DataSyncer::OnFetchRecordWithId);
555         if (callback == nullptr) {
556             LOGE("wrap on fetch records fail");
557             continue;
558         }
559         int32_t ret = sdkHelper_->FetchRecordWithId(context, cond, it, callback);
560         if (ret != E_OK) {
561             LOGE("sdk fetch records err %{public}d", ret);
562         }
563     }
564     if (!retry) {
565         handler->FinishPull(ctx->GetBatchNo());
566     }
567 }
568 
OnFetchRecordWithId(shared_ptr<DKContext> context,shared_ptr<DKDatabase> database,DKRecordId recordId,const DKRecord & record,const DKError & error)569 void DataSyncer::OnFetchRecordWithId(shared_ptr<DKContext> context, shared_ptr<DKDatabase> database,
570     DKRecordId recordId, const DKRecord &record, const DKError &error)
571 {
572     auto records = make_shared<std::vector<DKRecord>>();
573     if (error.HasError()) {
574         LOGE("has error, recordId:%s, dkErrorCode :%{public}d, serverErrorCode:%{public}d", recordId.c_str(),
575              error.dkErrorCode, error.serverErrorCode);
576         LOGI("convert to delete record");
577         DKRecord deleteRecord;
578         deleteRecord.SetRecordId(recordId);
579         deleteRecord.SetDelete(true);
580         records->push_back(deleteRecord);
581     } else {
582         LOGI("handle retry record : %s", record.GetRecordId().c_str());
583         records->push_back(record);
584     }
585     auto ctx = static_pointer_cast<DownloadTaskContext>(context);
586     HandleOnFetchRecords(ctx, database, records, true);
587 }
588 
RetryDownloadRecords(shared_ptr<TaskContext> context)589 void DataSyncer::RetryDownloadRecords(shared_ptr<TaskContext> context)
590 {
591     auto ctx = static_pointer_cast<TaskContext>(context);
592     auto handler = ctx->GetHandler();
593     if (handler == nullptr) {
594         LOGE("context get handler err");
595         return;
596     }
597 
598     vector<DriveKit::DKDownloadAsset> assetsToDownload;
599     int32_t ret = handler->GetAssetsToDownload(assetsToDownload);
600     if (ret != E_OK) {
601         LOGE("get assetsToDownload err %{public}d", ret);
602         return;
603     }
604     if (assetsToDownload.empty()) {
605         return;
606     }
607 
608     LOGI("assetsToDownload count: %{public}zu", assetsToDownload.size());
609     DownloadContext dctx = {.context = ctx,
610                             .assets = assetsToDownload,
611                             .id = 0,
612                             .resultCallback = ThumbDownloadCallback,
613                             .progressCallback = ThumbDownLoadProgress};
614     DownloadAssets(dctx);
615 }
616 
Push(shared_ptr<DataHandler> handler)617 int32_t DataSyncer::Push(shared_ptr<DataHandler> handler)
618 {
619     /*
620      * Although unlikely, if the first callback finds no more records available
621      * and tries to schedule before following tasks commited, the data syncer
622      * will directly schedule to the next stage, while following tasks would be
623      * commited to the next stage mistakenly.
624      * One possible solution: commit dummy task in the beginning and complete
625      * dummy task in the end.
626      */
627     shared_ptr<TaskContext> context = make_shared<TaskContext>(handler);
628 
629     /* commit a dummy task */
630     BeginTransaction();
631 
632     int32_t ret = AsyncRun(context, &DataSyncer::CreateRecords);
633     if (ret != E_OK) {
634         LOGE("async run create records err %{public}d", ret);
635         return ret;
636     }
637 
638     ret = AsyncRun(context, &DataSyncer::DeleteRecords);
639     if (ret != E_OK) {
640         LOGE("async run delete records err %{public}d", ret);
641         return ret;
642     }
643 
644     ret = AsyncRun(context, &DataSyncer::ModifyMdirtyRecords);
645     if (ret != E_OK) {
646         LOGE("async run modify mdirty records err %{public}d", ret);
647         return ret;
648     }
649 
650     ret = AsyncRun(context, &DataSyncer::ModifyFdirtyRecords);
651     if (ret != E_OK) {
652         LOGE("async run modify fdirty records err %{public}d", ret);
653         return ret;
654     }
655 
656     /* complete the dummy task */
657     EndTransaction();
658 
659     return E_OK;
660 }
661 
Init(const std::string bundleName,const int32_t userId)662 int32_t DataSyncer::Init(const std::string bundleName, const int32_t userId)
663 {
664     return E_OK;
665 }
666 
Clean(const int action)667 int32_t DataSyncer::Clean(const int action)
668 {
669     return E_OK;
670 }
671 
CleanInner(std::shared_ptr<DataHandler> handler,const int action)672 int32_t DataSyncer::CleanInner(std::shared_ptr<DataHandler> handler, const int action)
673 {
674     LOGD("Enter function DataSyncer::CleanInner");
675     handler->ClearCursor();
676     int res = handler->Clean(action);
677     if (res != E_OK) {
678         LOGE("Clean file failed res:%{public}d", res);
679     }
680     return res;
681 }
682 
CreateRecords(shared_ptr<TaskContext> context)683 void DataSyncer::CreateRecords(shared_ptr<TaskContext> context)
684 {
685     LOGI("%{private}d %{private}s creates records", userId_, bundleName_.c_str());
686 
687     auto handler = context->GetHandler();
688     if (handler == nullptr) {
689         LOGE("context get handler err");
690         return;
691     }
692 
693     /* query local */
694     vector<DKRecord> records;
695     int32_t ret = handler->GetCreatedRecords(records);
696     if (ret != E_OK) {
697         LOGE("handler get created records err %{public}d", ret);
698         return;
699     }
700 
701     /* no need upload */
702     if (records.size() == 0) {
703         return;
704     }
705 
706     if (!BatteryStatus::IsAllowUpload(syncStateManager_.GetForceFlag())) {
707         LOGE("battery status abnormal, abort upload");
708         SetErrorCodeMask(errorCode_, ErrorType::BATTERY_LEVEL_LOW);
709         return;
710     }
711 
712     /* upload */
713     auto callback = AsyncCallback(&DataSyncer::OnCreateRecords);
714     if (callback == nullptr) {
715         LOGE("wrap on create records fail");
716         return;
717     }
718     ret = sdkHelper_->CreateRecords(context, records, callback);
719     if (ret != E_OK) {
720         LOGE("sdk create records err %{public}d", ret);
721     }
722 }
723 
DeleteRecords(shared_ptr<TaskContext> context)724 void DataSyncer::DeleteRecords(shared_ptr<TaskContext> context)
725 {
726     LOGI("%{private}d %{private}s deletes records", userId_, bundleName_.c_str());
727 
728     auto handler = context->GetHandler();
729     if (handler == nullptr) {
730         LOGE("context get handler err");
731         return;
732     }
733 
734     /* query local */
735     vector<DKRecord> records;
736     int32_t ret = handler->GetDeletedRecords(records);
737     if (ret != E_OK) {
738         LOGE("handler get deleted records err %{public}d", ret);
739         return;
740     }
741 
742     /* no need upload */
743     if (records.size() == 0) {
744         return;
745     }
746 
747     /* upload */
748     auto callback = AsyncCallback(&DataSyncer::OnDeleteRecords);
749     if (callback == nullptr) {
750         LOGE("wrap on delete records fail");
751         return;
752     }
753     ret = sdkHelper_->DeleteRecords(context, records, callback);
754     if (ret != E_OK) {
755         LOGE("sdk delete records err %{public}d", ret);
756     }
757 }
758 
ModifyMdirtyRecords(shared_ptr<TaskContext> context)759 void DataSyncer::ModifyMdirtyRecords(shared_ptr<TaskContext> context)
760 {
761     LOGI("%{private}d %{private}s modifies records", userId_, bundleName_.c_str());
762 
763     auto handler = context->GetHandler();
764     if (handler == nullptr) {
765         LOGE("context get handler err");
766         return;
767     }
768 
769     /* query local */
770     vector<DKRecord> records;
771     int32_t ret = handler->GetMetaModifiedRecords(records);
772     if (ret != E_OK) {
773         LOGE("handler get modified records err %{public}d", ret);
774         return;
775     }
776 
777     /* no need upload */
778     if (records.size() == 0) {
779         return;
780     }
781 
782     /* upload */
783     auto callback = AsyncCallback(&DataSyncer::OnModifyMdirtyRecords);
784     if (callback == nullptr) {
785         LOGE("wrap on modify records fail");
786         return;
787     }
788     ret = sdkHelper_->ModifyRecords(context, records, callback);
789     if (ret != E_OK) {
790         LOGE("sdk modify records err %{public}d", ret);
791     }
792 }
793 
ModifyFdirtyRecords(shared_ptr<TaskContext> context)794 void DataSyncer::ModifyFdirtyRecords(shared_ptr<TaskContext> context)
795 {
796     LOGI("%{private}d %{private}s modifies records", userId_, bundleName_.c_str());
797 
798     auto handler = context->GetHandler();
799     if (handler == nullptr) {
800         LOGE("context get handler err");
801         return;
802     }
803 
804     /* query local */
805     vector<DKRecord> records;
806     int32_t ret = handler->GetFileModifiedRecords(records);
807     if (ret != E_OK) {
808         LOGE("handler get modified records err %{public}d", ret);
809         return;
810     }
811 
812     /* no need upload */
813     if (records.size() == 0) {
814         return;
815     }
816 
817     /* upload */
818     auto callback = AsyncCallback(&DataSyncer::OnModifyFdirtyRecords);
819     if (callback == nullptr) {
820         LOGE("wrap on modify records fail");
821         return;
822     }
823     ret = sdkHelper_->ModifyRecords(context, records, callback);
824     if (ret != E_OK) {
825         LOGE("sdk modify records err %{public}d", ret);
826     }
827 }
828 
OnCreateRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> map,const DKError & err)829 void DataSyncer::OnCreateRecords(shared_ptr<DKContext> context,
830     shared_ptr<const DKDatabase> database,
831     shared_ptr<const map<DKRecordId, DKRecordOperResult>> map, const DKError &err)
832 {
833     LOGI("%{private}d %{private}s on create records %{public}zu", userId_,
834         bundleName_.c_str(), map->size());
835 
836     auto ctx = static_pointer_cast<TaskContext>(context);
837 
838     /* update local */
839     auto handler = ctx->GetHandler();
840     int32_t ret = handler->OnCreateRecords(*map);
841     if (ret != E_OK) {
842         LOGE("handler on create records err %{public}d", ret);
843         if (ret == E_SYNC_FAILED_NETWORK_NOT_AVAILABLE) {
844             SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
845         } else if (ret == E_CLOUD_STORAGE_FULL) {
846             SetErrorCodeMask(errorCode_, ErrorType::CLOUD_STORAGE_FULL);
847         }
848         return;
849     }
850 
851     /* push more */
852     ret = AsyncRun(ctx, &DataSyncer::CreateRecords);
853     if (ret != E_OK) {
854         LOGE("async run create records err %{public}d", ret);
855         return;
856     }
857 }
858 
OnDeleteRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> map,const DKError & err)859 void DataSyncer::OnDeleteRecords(shared_ptr<DKContext> context,
860     shared_ptr<const DKDatabase> database,
861     shared_ptr<const map<DKRecordId, DKRecordOperResult>> map, const DKError &err)
862 {
863     LOGI("%{private}d %{private}s on delete records %{public}zu", userId_,
864         bundleName_.c_str(), map->size());
865 
866     auto ctx = static_pointer_cast<TaskContext>(context);
867 
868     /* update local */
869     auto handler = ctx->GetHandler();
870     int32_t ret = handler->OnDeleteRecords(*map);
871     if (ret != E_OK) {
872         LOGE("handler on delete records err %{public}d", ret);
873         if (ret == E_SYNC_FAILED_NETWORK_NOT_AVAILABLE) {
874             SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
875         } else if (ret == E_CLOUD_STORAGE_FULL) {
876             SetErrorCodeMask(errorCode_, ErrorType::CLOUD_STORAGE_FULL);
877         }
878         return;
879     }
880 
881     /* push more */
882     ret = AsyncRun(ctx, &DataSyncer::DeleteRecords);
883     if (ret != E_OK) {
884         LOGE("async run delete records err %{public}d", ret);
885         return;
886     }
887 }
888 
OnModifyMdirtyRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> saveMap,shared_ptr<const map<DKRecordId,DKRecordOperResult>> deleteMap,const DKError & err)889 void DataSyncer::OnModifyMdirtyRecords(shared_ptr<DKContext> context,
890     shared_ptr<const DKDatabase> database,
891     shared_ptr<const map<DKRecordId, DKRecordOperResult>> saveMap,
892     shared_ptr<const map<DKRecordId, DKRecordOperResult>> deleteMap,
893     const DKError &err)
894 {
895     LOGI("%{private}d %{private}s on modify mdirty records %{public}zu", userId_,
896         bundleName_.c_str(), saveMap->size());
897 
898     auto ctx = static_pointer_cast<TaskContext>(context);
899 
900     /* update local */
901     auto handler = ctx->GetHandler();
902     int32_t ret = handler->OnModifyMdirtyRecords(*saveMap);
903     if (ret != E_OK) {
904         LOGE("handler on modify records err %{public}d", ret);
905         if (ret == E_SYNC_FAILED_NETWORK_NOT_AVAILABLE) {
906             SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
907         } else if (ret == E_CLOUD_STORAGE_FULL) {
908             SetErrorCodeMask(errorCode_, ErrorType::CLOUD_STORAGE_FULL);
909         }
910         return;
911     }
912 
913     /* push more */
914     ret = AsyncRun(ctx, &DataSyncer::ModifyMdirtyRecords);
915     if (ret != E_OK) {
916         LOGE("async run modify records err %{public}d", ret);
917         return;
918     }
919 }
920 
OnModifyFdirtyRecords(shared_ptr<DKContext> context,shared_ptr<const DKDatabase> database,shared_ptr<const map<DKRecordId,DKRecordOperResult>> saveMap,shared_ptr<const map<DKRecordId,DKRecordOperResult>> deleteMap,const DKError & err)921 void DataSyncer::OnModifyFdirtyRecords(shared_ptr<DKContext> context,
922     shared_ptr<const DKDatabase> database,
923     shared_ptr<const map<DKRecordId, DKRecordOperResult>> saveMap,
924     shared_ptr<const map<DKRecordId, DKRecordOperResult>> deleteMap,
925     const DKError &err)
926 {
927     LOGI("%{private}d %{private}s on modify fdirty records %{public}zu", userId_,
928         bundleName_.c_str(), saveMap->size());
929 
930     auto ctx = static_pointer_cast<TaskContext>(context);
931 
932     /* update local */
933     auto handler = ctx->GetHandler();
934     int32_t ret = handler->OnModifyFdirtyRecords(*saveMap);
935     if (ret != E_OK) {
936         LOGE("handler on modify records err %{public}d", ret);
937         if (ret == E_SYNC_FAILED_NETWORK_NOT_AVAILABLE) {
938             SetErrorCodeMask(errorCode_, ErrorType::NETWORK_UNAVAILABLE);
939         } else if (ret == E_CLOUD_STORAGE_FULL) {
940             SetErrorCodeMask(errorCode_, ErrorType::CLOUD_STORAGE_FULL);
941         }
942         return;
943     }
944 
945     /* push more */
946     ret = AsyncRun(ctx, &DataSyncer::ModifyFdirtyRecords);
947     if (ret != E_OK) {
948         LOGE("async run modify records err %{public}d", ret);
949         return;
950     }
951 }
952 
BeginTransaction()953 void DataSyncer::BeginTransaction()
954 {
955     taskRunner_->CommitDummyTask();
956 }
957 
EndTransaction()958 void DataSyncer::EndTransaction()
959 {
960     taskRunner_->CompleteDummyTask();
961 }
962 
GetBundleName() const963 std::string DataSyncer::GetBundleName() const
964 {
965     return bundleName_;
966 }
967 
GetUserId() const968 int32_t DataSyncer::GetUserId() const
969 {
970     return userId_;
971 }
972 
GetSyncState() const973 SyncState DataSyncer::GetSyncState() const
974 {
975     return syncStateManager_.GetSyncState();
976 }
977 
CompletePull()978 int32_t DataSyncer::CompletePull()
979 {
980     LOGI("%{private}d %{private}s completes pull", userId_, bundleName_.c_str());
981     /* call syncer manager callback */
982     auto error = GetErrorType(errorCode_);
983     if (error) {
984         LOGE("pull failed, errorType:%{public}d", error);
985         SyncStateChangedNotify(CloudSyncState::DOWNLOAD_FAILED, error);
986         CompleteAll(false);
987     } else {
988         /* schedule to next stage */
989         Schedule();
990     }
991     return E_OK;
992 }
993 
CompletePush()994 int32_t DataSyncer::CompletePush()
995 {
996     LOGI("%{private}d %{public}s completes push", userId_, bundleName_.c_str());
997     /* call syncer manager callback */
998     auto error = GetErrorType(errorCode_);
999     if (error) {
1000         LOGE("pull failed, errorType:%{public}d", error);
1001         SyncStateChangedNotify(CloudSyncState::UPLOAD_FAILED, error);
1002         CompleteAll(false);
1003     } else {
1004         /* schedule to next stage */
1005         Schedule();
1006     }
1007     return E_OK;
1008 }
1009 
CompleteAll(bool isNeedNotify)1010 void DataSyncer::CompleteAll(bool isNeedNotify)
1011 {
1012     LOGI("%{private}d %{private}s completes all", userId_, bundleName_.c_str());
1013 
1014     /* guarantee: unlock if locked */
1015     ForceUnlock();
1016 
1017     /* reset internal status */
1018     Reset();
1019 
1020     SyncState syncState = SyncState::SYNC_SUCCEED;
1021     if (errorCode_ != E_OK) {
1022         syncState = SyncState::SYNC_FAILED;
1023     }
1024 
1025     CloudSyncState notifyState = CloudSyncState::COMPLETED;
1026     if (syncStateManager_.GetStopSyncFlag()) {
1027         notifyState = CloudSyncState::STOPPED;
1028     } else {
1029         SaveSubscription();
1030     }
1031 
1032     auto nextAction = syncStateManager_.UpdateSyncState(syncState);
1033     if (nextAction == Action::START) {
1034         /* Retrigger sync, clear errorcode */
1035         errorCode_ = E_OK;
1036         StartSync(false, SyncTriggerType::PENDING_TRIGGER);
1037         return;
1038     }
1039     if (nextAction == Action::FORCE_START) {
1040         /* Retrigger sync, clear errorcode */
1041         errorCode_ = E_OK;
1042         StartSync(true, SyncTriggerType::PENDING_TRIGGER);
1043         return;
1044     }
1045 
1046     /* notify sync state */
1047     if (isNeedNotify) {
1048         SyncStateChangedNotify(notifyState, ErrorType::NO_ERROR);
1049     }
1050     /* clear errorcode */
1051     errorCode_ = E_OK;
1052 }
1053 
SyncStateChangedNotify(const CloudSyncState state,const ErrorType error)1054 void DataSyncer::SyncStateChangedNotify(const CloudSyncState state, const ErrorType error)
1055 {
1056     CurrentSyncState_ = state;
1057     CurrentErrorType_ = error;
1058     CloudSyncCallbackManager::GetInstance().NotifySyncStateChanged(state, error);
1059 }
1060 
NotifyCurrentSyncState()1061 void DataSyncer::NotifyCurrentSyncState()
1062 {
1063     CloudSyncCallbackManager::GetInstance().NotifySyncStateChanged(CurrentSyncState_, CurrentErrorType_);
1064 }
1065 
SetErrorCodeMask(int32_t & errorCode,ErrorType errorType)1066 void DataSyncer::SetErrorCodeMask(int32_t &errorCode, ErrorType errorType)
1067 {
1068     errorCode |= 1 << errorType;
1069 }
1070 
GetErrorType(const int32_t code)1071 ErrorType DataSyncer::GetErrorType(const int32_t code)
1072 {
1073     if (code == E_OK) {
1074         return ErrorType::NO_ERROR;
1075     }
1076 
1077     std::vector<ErrorType> errorTypes = {NETWORK_UNAVAILABLE, WIFI_UNAVAILABLE,   BATTERY_LEVEL_WARNING,
1078                                          BATTERY_LEVEL_LOW,   CLOUD_STORAGE_FULL, LOCAL_STORAGE_FULL};
1079     for (const auto &errorType : errorTypes) {
1080         if (code & (1 << errorType)) {
1081             return errorType;
1082         }
1083     }
1084     LOGE("errorcode unexpected, errcode: %{public}d", code);
1085     return ErrorType::NO_ERROR;
1086 }
1087 
SaveSubscription()1088 void DataSyncer::SaveSubscription()
1089 {
1090     sdkHelper_->SaveSubscription([] (auto, shared_ptr<DriveKit::DKContainer>,
1091         DriveKit::DKSubscriptionResult & res) {
1092         if (!res.IsSuccess()) {
1093             auto err = res.GetDKError();
1094             LOGE("drivekit save subscription server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
1095                 err.dkErrorCode);
1096         }
1097     });
1098 }
1099 
DeleteSubscription()1100 void DataSyncer::DeleteSubscription()
1101 {
1102     sdkHelper_->DeleteSubscription([] (auto, DriveKit::DKError err) {
1103         if (err.HasError()) {
1104             LOGE("drivekit delete subscription server err %{public}d and dk errcor %{public}d", err.serverErrorCode,
1105                 err.dkErrorCode);
1106         }
1107     });
1108 }
1109 } // namespace CloudSync
1110 } // namespace FileManagement
1111 } // namespace OHOS
1112