• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_syncer.h"
16 
17 #include <cstdint>
18 #include <unordered_map>
19 #include <utility>
20 
21 #include "cloud/asset_operation_utils.h"
22 #include "cloud/cloud_db_constant.h"
23 #include "cloud/cloud_storage_utils.h"
24 #include "cloud/icloud_db.h"
25 #include "cloud_sync_tag_assets.h"
26 #include "cloud_sync_utils.h"
27 #include "db_dfx_adapter.h"
28 #include "db_errno.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "storage_proxy.h"
32 #include "store_types.h"
33 #include "strategy_factory.h"
34 #include "version.h"
35 
36 namespace DistributedDB {
CloudSyncer(std::shared_ptr<StorageProxy> storageProxy,SingleVerConflictResolvePolicy policy)37 CloudSyncer::CloudSyncer(std::shared_ptr<StorageProxy> storageProxy, SingleVerConflictResolvePolicy policy)
38     : lastTaskId_(INVALID_TASK_ID),
39       storageProxy_(std::move(storageProxy)),
40       queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
41       closed_(false),
42       timerId_(0u),
43       policy_(policy)
44 {
45     if (storageProxy_ != nullptr) {
46         id_ = storageProxy_->GetIdentify();
47     }
48     InitCloudSyncStateMachine();
49 }
50 
Sync(const std::vector<DeviceID> & devices,SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & callback,int64_t waitTime)51 int CloudSyncer::Sync(const std::vector<DeviceID> &devices, SyncMode mode,
52     const std::vector<std::string> &tables, const SyncProcessCallback &callback, int64_t waitTime)
53 {
54     CloudTaskInfo taskInfo;
55     taskInfo.mode = mode;
56     taskInfo.table = tables;
57     taskInfo.callback = callback;
58     taskInfo.timeout = waitTime;
59     taskInfo.devices = devices;
60     for (const auto &item: tables) {
61         QuerySyncObject syncObject;
62         syncObject.SetTableName(item);
63         taskInfo.queryList.push_back(syncObject);
64     }
65     return Sync(taskInfo);
66 }
67 
Sync(const CloudTaskInfo & taskInfo)68 int CloudSyncer::Sync(const CloudTaskInfo &taskInfo)
69 {
70     int errCode = CloudSyncUtils::CheckParamValid(taskInfo.devices, taskInfo.mode);
71     if (errCode != E_OK) {
72         return errCode;
73     }
74     if (cloudDB_.IsNotExistCloudDB()) {
75         LOGE("[CloudSyncer] Not set cloudDB!");
76         return -E_CLOUD_ERROR;
77     }
78     if (closed_) {
79         LOGE("[CloudSyncer] DB is closed!");
80         return -E_DB_CLOSED;
81     }
82     CloudTaskInfo info = taskInfo;
83     info.status = ProcessStatus::PREPARED;
84     errCode = TryToAddSyncTask(std::move(info));
85     if (errCode != E_OK) {
86         return errCode;
87     }
88     if (taskInfo.priorityTask) {
89         MarkCurrentTaskPausedIfNeed();
90     }
91     return TriggerSync();
92 }
93 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)94 void CloudSyncer::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
95 {
96     cloudDB_.SetCloudDB(cloudDB);
97     LOGI("[CloudSyncer] SetCloudDB finish");
98 }
99 
GetCloudDB() const100 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudSyncer::GetCloudDB() const
101 {
102     return cloudDB_.GetCloudDB();
103 }
104 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)105 void CloudSyncer::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
106 {
107     storageProxy_->SetIAssetLoader(loader);
108     cloudDB_.SetIAssetLoader(loader);
109     LOGI("[CloudSyncer] SetIAssetLoader finish");
110 }
111 
Close()112 void CloudSyncer::Close()
113 {
114     closed_ = true;
115     CloudSyncer::TaskId currentTask;
116     {
117         std::lock_guard<std::mutex> autoLock(dataLock_);
118         currentTask = currentContext_.currentTaskId;
119     }
120     // mark current task db_closed
121     SetTaskFailed(currentTask, -E_DB_CLOSED);
122     UnlockIfNeed();
123     cloudDB_.Close();
124     WaitCurTaskFinished();
125 
126     // copy all task from queue
127     std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
128     for (auto &info: infoList) {
129         LOGI("[CloudSyncer] finished taskId %" PRIu64 " with db closed.", info.taskId);
130     }
131     storageProxy_->Close();
132 }
133 
StopAllTasks()134 void CloudSyncer::StopAllTasks()
135 {
136     CloudSyncer::TaskId currentTask;
137     {
138         std::lock_guard<std::mutex> autoLock(dataLock_);
139         currentTask = currentContext_.currentTaskId;
140     }
141     // mark current task user_change
142     SetTaskFailed(currentTask, -E_USER_CHANGE);
143     UnlockIfNeed();
144     WaitCurTaskFinished();
145 
146     std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
147     for (auto &info: infoList) {
148         LOGI("[CloudSyncer] finished taskId %" PRIu64 " with user changed.", info.taskId);
149         auto processNotifier = std::make_shared<ProcessNotifier>(this);
150         processNotifier->Init(info.table, info.devices, info.users);
151         info.errCode = -E_USER_CHANGE;
152         info.status = ProcessStatus::FINISHED;
153         processNotifier->NotifyProcess(info, {}, true);
154     }
155 }
156 
TriggerSync()157 int CloudSyncer::TriggerSync()
158 {
159     if (closed_) {
160         return -E_DB_CLOSED;
161     }
162     RefObject::IncObjRef(this);
163     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
164         DoSyncIfNeed();
165         RefObject::DecObjRef(this);
166     });
167     if (errCode != E_OK) {
168         LOGW("[CloudSyncer] schedule sync task failed %d", errCode);
169         RefObject::DecObjRef(this);
170     }
171     return errCode;
172 }
173 
SetProxyUser(const std::string & user)174 void CloudSyncer::SetProxyUser(const std::string &user)
175 {
176     std::lock_guard<std::mutex> autoLock(dataLock_);
177     storageProxy_->SetUser(user);
178     currentContext_.notifier->SetUser(user);
179     currentContext_.currentUserIndex = currentContext_.currentUserIndex + 1;
180     cloudDB_.SwitchCloudDB(user);
181 }
182 
DoSyncIfNeed()183 void CloudSyncer::DoSyncIfNeed()
184 {
185     if (closed_) {
186         return;
187     }
188     // do all sync task in this loop
189     do {
190         // get taskId from queue
191         TaskId triggerTaskId = GetNextTaskId();
192         if (triggerTaskId == INVALID_TASK_ID) {
193             LOGD("[CloudSyncer] task queue empty");
194             break;
195         }
196         // pop taskId in queue
197         if (PrepareSync(triggerTaskId) != E_OK) {
198             break;
199         }
200         // do sync logic
201         std::vector<std::string> usersList;
202         {
203             std::lock_guard<std::mutex> autoLock(dataLock_);
204             usersList = cloudTaskInfos_[triggerTaskId].users;
205             currentContext_.currentUserIndex = 0;
206         }
207         int errCode = E_OK;
208         if (usersList.empty()) {
209             SetProxyUser("");
210             errCode = DoSync(triggerTaskId);
211         } else {
212             for (const auto &user : usersList) {
213                 SetProxyUser(user);
214                 errCode = DoSync(triggerTaskId);
215             }
216         }
217         LOGD("[CloudSyncer] DoSync finished, errCode %d", errCode);
218     } while (!closed_);
219     LOGD("[CloudSyncer] DoSyncIfNeed finished, closed status %d", static_cast<int>(closed_));
220 }
221 
DoSync(TaskId taskId)222 int CloudSyncer::DoSync(TaskId taskId)
223 {
224     std::lock_guard<std::mutex> lock(syncMutex_);
225     ResetCurrentTableUploadBatchIndex();
226     CloudTaskInfo taskInfo;
227     {
228         std::lock_guard<std::mutex> autoLock(dataLock_);
229         taskInfo = cloudTaskInfos_[taskId];
230         cloudDB_.SetPrepareTraceId(taskInfo.prepareTraceId); // SetPrepareTraceId before task started
231         LOGD("[CloudSyncer] DoSync get taskInfo, taskId is: %llu, prepareTraceId is:%s.",
232             static_cast<unsigned long long>(taskInfo.taskId), taskInfo.prepareTraceId.c_str());
233     }
234     bool needUpload = true;
235     bool isNeedFirstDownload = false;
236     {
237         std::lock_guard<std::mutex> autoLock(dataLock_);
238         needUpload = currentContext_.strategy->JudgeUpload();
239         // 1. if the locker is already exist, directly reuse the lock, no need do the first download
240         // 2. if the task(resume task) is already be tagged need upload data, no need do the first download
241         isNeedFirstDownload = (currentContext_.locker == nullptr) && (!currentContext_.isNeedUpload);
242     }
243     int errCode = E_OK;
244     bool isFirstDownload = true;
245     if (isNeedFirstDownload) {
246         // do first download
247         errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
248         SetTaskFailed(taskId, errCode);
249         if (errCode != E_OK) {
250             SyncMachineDoFinished();
251             return errCode;
252         }
253         bool isActuallyNeedUpload = false;  // whether the task actually has data to upload
254         {
255             std::lock_guard<std::mutex> autoLock(dataLock_);
256             isActuallyNeedUpload = currentContext_.isNeedUpload;
257         }
258         if (!isActuallyNeedUpload) {
259             LOGI("[CloudSyncer] no table need upload!");
260             SyncMachineDoFinished();
261             return E_OK;
262         }
263         isFirstDownload = false;
264     }
265 
266     {
267         std::lock_guard<std::mutex> autoLock(dataLock_);
268         currentContext_.isFirstDownload = isFirstDownload;
269         currentContext_.isRealNeedUpload = needUpload;
270     }
271     errCode = DoSyncInner(taskInfo);
272     return errCode;
273 }
274 
PrepareAndUpload(const CloudTaskInfo & taskInfo,size_t index)275 int CloudSyncer::PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index)
276 {
277     {
278         std::lock_guard<std::mutex> autoLock(dataLock_);
279         currentContext_.tableName = taskInfo.table[index];
280     }
281     int errCode = CheckTaskIdValid(taskInfo.taskId);
282     if (errCode != E_OK) {
283         LOGE("[CloudSyncer] task is invalid, abort sync");
284         return errCode;
285     }
286     if (taskInfo.table.empty()) {
287         LOGE("[CloudSyncer] Invalid taskInfo table");
288         return -E_INVALID_ARGS;
289     }
290     errCode = DoUpload(taskInfo.taskId, index == (taskInfo.table.size() - 1u), taskInfo.lockAction);
291     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
292         {
293             std::lock_guard<std::mutex> autoLock(dataLock_);
294             currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex,
295                 taskInfo.table[index], false);
296             LOGI("[CloudSyncer] upload version conflict, index:%zu", index);
297         }
298         return errCode;
299     }
300     if (errCode != E_OK) {
301         LOGE("[CloudSyncer] upload failed %d", errCode);
302         return errCode;
303     }
304     return errCode;
305 }
306 
DoUploadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload)307 int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload)
308 {
309     if (!needUpload) {
310         return E_OK;
311     }
312     int errCode = storageProxy_->StartTransaction();
313     if (errCode != E_OK) {
314         LOGE("[CloudSyncer] start transaction failed before doing upload.");
315         return errCode;
316     }
317     for (size_t i = 0u; i < taskInfo.table.size(); ++i) {
318         LOGD("[CloudSyncer] try upload table, index: %zu", i);
319         if (IsTableFinishInUpload(taskInfo.table[i])) {
320             continue;
321         }
322         errCode = PrepareAndUpload(taskInfo, i);
323         if (errCode == -E_TASK_PAUSED) { // should re download paused table
324             MarkDownloadFinishIfNeed(taskInfo.table[i], false);
325         }
326         if (errCode != E_OK) {
327             break;
328         }
329         MarkUploadFinishIfNeed(taskInfo.table[i]);
330     }
331     if (errCode == -E_TASK_PAUSED) {
332         std::lock_guard<std::mutex> autoLock(dataLock_);
333         resumeTaskInfos_[taskInfo.taskId].upload = true;
334     }
335     if (errCode == E_OK || errCode == -E_TASK_PAUSED) {
336         int commitErrorCode = storageProxy_->Commit();
337         if (commitErrorCode != E_OK) {
338             LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode);
339         }
340     } else {
341         int rollBackErrorCode = storageProxy_->Rollback();
342         if (rollBackErrorCode != E_OK) {
343             LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
344         }
345     }
346     return errCode;
347 }
348 
SyncMachineDoDownload()349 CloudSyncEvent CloudSyncer::SyncMachineDoDownload()
350 {
351     CloudTaskInfo taskInfo;
352     bool needUpload;
353     bool isFirstDownload;
354     {
355         std::lock_guard<std::mutex> autoLock(dataLock_);
356         taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
357         needUpload = currentContext_.isRealNeedUpload;
358         isFirstDownload = currentContext_.isFirstDownload;
359     }
360     int errCode = E_OK;
361     if (IsLockInDownload()) {
362         errCode = LockCloudIfNeed(taskInfo.taskId);
363     }
364     if (errCode != E_OK) {
365         return SetCurrentTaskFailedInMachine(errCode);
366     }
367     errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
368     if (errCode != E_OK) {
369         if (errCode == -E_TASK_PAUSED) {
370             DBDfxAdapter::ReportBehavior(
371                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::CANCLE, errCode});
372         } else {
373             DBDfxAdapter::ReportBehavior(
374                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::FAIL, errCode});
375         }
376         return SetCurrentTaskFailedInMachine(errCode);
377     }
378     DBDfxAdapter::ReportBehavior(
379         {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::SUCC, errCode});
380     return CloudSyncEvent::DOWNLOAD_FINISHED_EVENT;
381 }
382 
SyncMachineDoUpload()383 CloudSyncEvent CloudSyncer::SyncMachineDoUpload()
384 {
385     CloudTaskInfo taskInfo;
386     bool needUpload;
387     {
388         std::lock_guard<std::mutex> autoLock(dataLock_);
389         taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
390         needUpload = currentContext_.isRealNeedUpload;
391     }
392     DBDfxAdapter::ReportBehavior(
393         {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_UPLOAD, StageResult::SUCC, E_OK});
394     int errCode = DoUploadInNeed(taskInfo, needUpload);
395     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
396         DBDfxAdapter::ReportBehavior(
397             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
398         return CloudSyncEvent::REPEAT_CHECK_EVENT;
399     }
400     if (errCode != E_OK) {
401         {
402             std::lock_guard<std::mutex> autoLock(dataLock_);
403             cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
404         }
405         if (errCode == -E_TASK_PAUSED) {
406             DBDfxAdapter::ReportBehavior(
407                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::CANCLE, errCode});
408         } else {
409             DBDfxAdapter::ReportBehavior(
410                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
411         }
412         return CloudSyncEvent::ERROR_EVENT;
413     }
414     DBDfxAdapter::ReportBehavior(
415         {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::SUCC, errCode});
416     return CloudSyncEvent::UPLOAD_FINISHED_EVENT;
417 }
418 
SyncMachineDoFinished()419 CloudSyncEvent CloudSyncer::SyncMachineDoFinished()
420 {
421     UnlockIfNeed();
422     TaskId taskId;
423     int errCode;
424     int currentUserIndex;
425     int userListSize;
426     {
427         std::lock_guard<std::mutex> autoLock(dataLock_);
428         taskId = currentContext_.currentTaskId;
429         errCode = cloudTaskInfos_[currentContext_.currentTaskId].errCode;
430         cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
431         currentUserIndex = currentContext_.currentUserIndex;
432         userListSize = static_cast<int>(cloudTaskInfos_[taskId].users.size());
433     }
434     if (currentUserIndex >= userListSize) {
435         DoFinished(taskId, errCode);
436     } else {
437         CloudTaskInfo taskInfo;
438         {
439             std::lock_guard<std::mutex> autoLock(dataLock_);
440             taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
441         }
442         taskInfo.status = ProcessStatus::FINISHED;
443         currentContext_.notifier->NotifyProcess(taskInfo, {});
444     }
445     return CloudSyncEvent::ALL_TASK_FINISHED_EVENT;
446 }
447 
DoSyncInner(const CloudTaskInfo & taskInfo)448 int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo)
449 {
450     cloudSyncStateMachine_.SwitchStateAndStep(CloudSyncEvent::START_SYNC_EVENT);
451     DBDfxAdapter::ReportBehavior(
452         {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_SYNC, StageResult::SUCC, E_OK});
453     return E_OK;
454 }
455 
DoFinished(TaskId taskId,int errCode)456 void CloudSyncer::DoFinished(TaskId taskId, int errCode)
457 {
458     storageProxy_->OnSyncFinish();
459     if (errCode == -E_TASK_PAUSED) {
460         DBDfxAdapter::ReportBehavior(
461             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_SYNC, StageResult::CANCLE, errCode});
462         LOGD("[CloudSyncer] taskId %" PRIu64 " was paused, it won't be finished now", taskId);
463         {
464             std::lock_guard<std::mutex> autoLock(dataLock_);
465             resumeTaskInfos_[taskId].context = std::move(currentContext_);
466             currentContext_.locker = resumeTaskInfos_[taskId].context.locker;
467             resumeTaskInfos_[taskId].context.locker = nullptr;
468             ClearCurrentContextWithoutLock();
469         }
470         contextCv_.notify_one();
471         return;
472     }
473     {
474         std::lock_guard<std::mutex> autoLock(dataLock_);
475         taskQueue_.remove(taskId);
476         priorityTaskQueue_.remove(taskId);
477     }
478     ClearContextAndNotify(taskId, errCode);
479 }
480 
481 /**
482  * UpdateChangedData will be used for Insert case, which we can only get rowid after we saved data in db.
483 */
UpdateChangedData(SyncParam & param,DownloadList & assetsDownloadList)484 int CloudSyncer::UpdateChangedData(SyncParam &param, DownloadList &assetsDownloadList)
485 {
486     if (param.withoutRowIdData.insertData.empty() && param.withoutRowIdData.updateData.empty()) {
487         return E_OK;
488     }
489     int ret = E_OK;
490     for (size_t j : param.withoutRowIdData.insertData) {
491         VBucket &datum = param.downloadData.data[j];
492         std::vector<Type> primaryValues;
493         ret = CloudSyncUtils::GetCloudPkVals(datum, param.changedData.field,
494             std::get<int64_t>(datum[CloudDbConstant::ROW_ID_FIELD_NAME]), primaryValues);
495         if (ret != E_OK) {
496             LOGE("[CloudSyncer] updateChangedData cannot get primaryValues");
497             return ret;
498         }
499         param.changedData.primaryData[ChangeType::OP_INSERT].push_back(primaryValues);
500     }
501     for (const auto &tuple : param.withoutRowIdData.assetInsertData) {
502         size_t downloadIndex = std::get<0>(tuple);
503         VBucket &datum = param.downloadData.data[downloadIndex];
504         size_t insertIdx = std::get<1>(tuple);
505         std::vector<Type> &pkVal = std::get<5>(assetsDownloadList[insertIdx]); // 5 means primary key list
506         pkVal[0] = datum[CloudDbConstant::ROW_ID_FIELD_NAME];
507     }
508     for (const auto &tuple : param.withoutRowIdData.updateData) {
509         size_t downloadIndex = std::get<0>(tuple);
510         size_t updateIndex = std::get<1>(tuple);
511         VBucket &datum = param.downloadData.data[downloadIndex];
512         size_t size = param.changedData.primaryData[ChangeType::OP_UPDATE].size();
513         if (updateIndex >= size) {
514             LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size);
515             return -E_INTERNAL_ERROR;
516         }
517         if (param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) {
518             LOGE("[CloudSyncer] primary key value list should not be empty.");
519             return -E_INTERNAL_ERROR;
520         }
521         // no primary key or composite primary key, the first element is rowid
522         param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] =
523             datum[CloudDbConstant::ROW_ID_FIELD_NAME];
524     }
525     return ret;
526 }
527 
IsDataContainDuplicateAsset(const std::vector<Field> & assetFields,VBucket & data)528 bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data)
529 {
530     for (const auto &assetField : assetFields) {
531         if (assetField.type == TYPE_INDEX<Assets> && data[assetField.colName].index() == TYPE_INDEX<Assets>) {
532             if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get<Assets>(data[assetField.colName]))) {
533                 return true;
534             }
535         }
536     }
537     return false;
538 }
539 
IsDataContainAssets()540 bool CloudSyncer::IsDataContainAssets()
541 {
542     std::lock_guard<std::mutex> autoLock(dataLock_);
543     bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end());
544     if (!hasTable) {
545         LOGW("[CloudSyncer] failed to get assetFields, because tableName doesn't exist in currentContext, %d.",
546             -E_INTERNAL_ERROR);
547             return false;
548     }
549     if (currentContext_.assetFields[currentContext_.tableName].empty()) {
550         return false;
551     }
552     return true;
553 }
554 
TagAssetsInSingleRecord(VBucket & coveredData,VBucket & beCoveredData,bool setNormalStatus,int & errCode)555 std::map<std::string, Assets> CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
556     bool setNormalStatus, int &errCode)
557 {
558     // Define a map to store the result
559     std::map<std::string, Assets> res = {};
560     std::vector<Field> assetFields;
561     {
562         std::lock_guard<std::mutex> autoLock(dataLock_);
563         assetFields = currentContext_.assetFields[currentContext_.tableName];
564     }
565     // For every column contain asset or assets, assetFields are in context
566     for (const Field &assetField : assetFields) {
567         Assets assets = TagAssetsInSingleCol(coveredData, beCoveredData, assetField, setNormalStatus, errCode);
568         if (!assets.empty()) {
569             res[assetField.colName] = assets;
570         }
571         if (errCode != E_OK) {
572             break;
573         }
574     }
575     return res;
576 }
577 
FillCloudAssets(const std::string & tableName,VBucket & normalAssets,VBucket & failedAssets)578 int CloudSyncer::FillCloudAssets(const std::string &tableName, VBucket &normalAssets,
579     VBucket &failedAssets)
580 {
581     int ret = E_OK;
582     if (normalAssets.size() > 1) {
583         ret = storageProxy_->FillCloudAssetForDownload(tableName, normalAssets, true);
584         if (ret != E_OK) {
585             LOGE("[CloudSyncer] Can not fill normal cloud assets for download");
586             return ret;
587         }
588     }
589     if (failedAssets.size() > 1) {
590         ret = storageProxy_->FillCloudAssetForDownload(tableName, failedAssets, false);
591         if (ret != E_OK) {
592             LOGE("[CloudSyncer] Can not fill abnormal assets for download");
593             return ret;
594         }
595     }
596     return E_OK;
597 }
598 
HandleDownloadResult(const DownloadItem & downloadItem,const std::string & tableName,DownloadCommitList & commitList,uint32_t & successCount)599 int CloudSyncer::HandleDownloadResult(const DownloadItem &downloadItem, const std::string &tableName,
600     DownloadCommitList &commitList, uint32_t &successCount)
601 {
602     successCount = 0;
603     int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
604     if (errCode != E_OK) {
605         LOGE("[CloudSyncer] start transaction Failed before handle download.");
606         return errCode;
607     }
608     errCode = CommitDownloadAssets(downloadItem, tableName, commitList, successCount);
609     if (errCode != E_OK) {
610         successCount = 0;
611         int ret = E_OK;
612         if (errCode == -E_REMOVE_ASSETS_FAILED) {
613             // remove assets failed no effect to asset status, just commit
614             ret = storageProxy_->Commit();
615         } else {
616             ret = storageProxy_->Rollback();
617         }
618         LOGE("[CloudSyncer] commit download assets failed %d commit/rollback ret %d", errCode, ret);
619         return errCode;
620     }
621     errCode = storageProxy_->Commit();
622     if (errCode != E_OK) {
623         successCount = 0;
624         LOGE("[CloudSyncer] commit failed %d", errCode);
625     }
626     return errCode;
627 }
628 
CloudDbDownloadAssets(TaskId taskId,InnerProcessInfo & info,const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)629 int CloudSyncer::CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
630     const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
631 {
632     int downloadStatus = E_OK;
633     {
634         std::lock_guard<std::mutex> autoLock(dataLock_);
635         downloadStatus = resumeTaskInfos_[taskId].downloadStatus;
636         resumeTaskInfos_[taskId].downloadStatus = E_OK;
637     }
638     int errorCode = E_OK;
639     DownloadCommitList commitList;
640     for (size_t i = GetDownloadAssetIndex(taskId); i < downloadList.size(); i++) {
641         errorCode = CheckTaskIdValid(taskId);
642         if (errorCode != E_OK) {
643             std::lock_guard<std::mutex> autoLock(dataLock_);
644             resumeTaskInfos_[taskId].lastDownloadIndex = i;
645             resumeTaskInfos_[taskId].downloadStatus = downloadStatus;
646             break;
647         }
648         DownloadItem downloadItem;
649         GetDownloadItem(downloadList, i, downloadItem);
650         errorCode = DownloadOneAssetRecord(dupHashKeySet, downloadList, downloadItem, info, changedAssets);
651         if (errorCode == -E_NOT_SET) {
652             info.downLoadInfo.failCount += (downloadList.size() - i);
653             info.downLoadInfo.successCount -= (downloadList.size() - i);
654             return errorCode;
655         }
656         if (downloadItem.strategy == OpType::DELETE) {
657             downloadItem.assets = {};
658             downloadItem.gid = "";
659         }
660         // Process result of each asset
661         commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK));
662         downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus;
663         int ret = CommitDownloadResult(downloadItem, info, commitList, errorCode);
664         if (ret != E_OK && ret != -E_REMOVE_ASSETS_FAILED) {
665             return ret;
666         }
667         downloadStatus = downloadStatus == E_OK ? ret : downloadStatus;
668     }
669     LOGD("Download status is %d", downloadStatus);
670     return errorCode == E_OK ? downloadStatus : errorCode;
671 }
672 
DownloadAssets(InnerProcessInfo & info,const std::vector<std::string> & pKColNames,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)673 int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
674     const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
675 {
676     if (!IsDataContainAssets()) {
677         return E_OK;
678     }
679     // update changed data info
680     if (!CloudSyncUtils::IsChangeDataEmpty(changedAssets)) {
681         // changedData.primaryData should have no member inside
682         return -E_INVALID_ARGS;
683     }
684     changedAssets.tableName = info.tableName;
685     changedAssets.type = ChangedDataType::ASSET;
686     changedAssets.field = pKColNames;
687 
688     // Get AssetDownloadList
689     DownloadList changeList;
690     TaskId taskId;
691     {
692         std::lock_guard<std::mutex> autoLock(dataLock_);
693         changeList = currentContext_.assetDownloadList;
694         taskId = currentContext_.currentTaskId;
695     }
696     // Download data (include deleting) will handle return Code in this situation
697     int ret = CloudDbDownloadAssets(taskId, info, changeList, dupHashKeySet, changedAssets);
698     if (ret != E_OK) {
699         LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret);
700     }
701     return ret;
702 }
703 
GetAssetsFromVBucket(VBucket & data)704 std::map<std::string, Assets> CloudSyncer::GetAssetsFromVBucket(VBucket &data)
705 {
706     std::map<std::string, Assets> assets;
707     std::vector<Field> fields;
708     {
709         std::lock_guard<std::mutex> autoLock(dataLock_);
710         fields = currentContext_.assetFields[currentContext_.tableName];
711     }
712     for (const auto &field : fields) {
713         if (data.find(field.colName) != data.end()) {
714             if (field.type == TYPE_INDEX<Asset> && data[field.colName].index() == TYPE_INDEX<Asset>) {
715                 assets[field.colName] = { std::get<Asset>(data[field.colName]) };
716             } else if (field.type == TYPE_INDEX<Assets> && data[field.colName].index() == TYPE_INDEX<Assets>) {
717                 assets[field.colName] = std::get<Assets>(data[field.colName]);
718             } else {
719                 Assets emptyAssets;
720                 assets[field.colName] = emptyAssets;
721             }
722         }
723     }
724     return assets;
725 }
726 
TagStatus(bool isExist,SyncParam & param,size_t idx,DataInfo & dataInfo,VBucket & localAssetInfo)727 int CloudSyncer::TagStatus(bool isExist, SyncParam &param, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo)
728 {
729     OpType strategyOpResult = OpType::NOT_HANDLE;
730     int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
731     if (errCode != E_OK) {
732         return errCode;
733     }
734     param.downloadData.opType[idx] = strategyOpResult;
735     if (!IsDataContainAssets()) {
736         return E_OK;
737     }
738     Key hashKey;
739     if (isExist) {
740         hashKey = dataInfo.localInfo.logInfo.hashKey;
741     }
742     return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo);
743 }
744 
TagDownloadAssets(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)745 int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo,
746     VBucket &localAssetInfo)
747 {
748     int ret = E_OK;
749     OpType strategy = param.downloadData.opType[idx];
750     switch (strategy) {
751         case OpType::INSERT:
752         case OpType::UPDATE:
753         case OpType::DELETE:
754             ret = HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
755             break;
756         case OpType::NOT_HANDLE:
757         case OpType::ONLY_UPDATE_GID:
758         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data
759             (void)TagAssetsInSingleRecord(localAssetInfo, param.downloadData.data[idx], true, ret);
760             for (const auto &[col, value]: localAssetInfo) {
761                 param.downloadData.data[idx].insert_or_assign(col, value);
762             }
763             break;
764         }
765         default:
766             break;
767     }
768     return ret;
769 }
770 
HandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)771 int CloudSyncer::HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
772     VBucket &localAssetInfo)
773 {
774     Type prefix;
775     std::vector<Type> pkVals;
776     OpType strategy = param.downloadData.opType[idx];
777     bool isDelStrategy = (strategy == OpType::DELETE);
778     int ret = CloudSyncUtils::GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys :
779         param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
780     if (ret != E_OK) {
781         LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret);
782         return ret;
783     }
784     prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
785     if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
786         LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
787         return -E_INTERNAL_ERROR;
788     }
789     AssetOperationUtils::FilterDeleteAsset(param.downloadData.data[idx]);
790     std::map<std::string, Assets> assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo,
791         false, ret);
792     if (ret != E_OK) {
793         LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in download data");
794         return ret;
795     }
796     strategy = CloudSyncUtils::CalOpType(param, idx);
797     if (!param.isSinglePrimaryKey && strategy == OpType::INSERT) {
798         param.withoutRowIdData.assetInsertData.push_back(std::make_tuple(idx, param.assetsDownloadList.size()));
799     }
800     param.assetsDownloadList.push_back(
801         std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey,
802         pkVals, dataInfo.cloudLogInfo.timestamp));
803     return ret;
804 }
805 
SaveDatum(SyncParam & param,size_t idx,std::vector<std::pair<Key,size_t>> & deletedList,std::map<std::string,LogInfo> & localLogInfoCache)806 int CloudSyncer::SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
807     std::map<std::string, LogInfo> &localLogInfoCache)
808 {
809     int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames);
810     if (ret != E_OK) {
811         LOGE("[CloudSyncer] Invalid download data:%d", ret);
812         return ret;
813     }
814     CloudSyncUtils::ModifyCloudDataTime(param.downloadData.data[idx]);
815     DataInfo dataInfo;
816     VBucket localAssetInfo;
817     bool isExist = true;
818     ret = GetLocalInfo(idx, param, dataInfo.localInfo, localLogInfoCache, localAssetInfo);
819     if (ret == -E_NOT_FOUND) {
820         isExist = false;
821     } else if (ret != E_OK) {
822         LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret);
823         return ret;
824     }
825     // Get cloudLogInfo from cloud data
826     dataInfo.cloudLogInfo = CloudSyncUtils::GetCloudLogInfo(param.downloadData.data[idx]);
827     // Tag datum to get opType
828     ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo);
829     if (ret != E_OK) {
830         LOGE("[CloudSyncer] Cannot tag status: %d.", ret);
831         return ret;
832     }
833     CloudSyncUtils::UpdateLocalCache(param.downloadData.opType[idx], dataInfo.cloudLogInfo, dataInfo.localInfo.logInfo,
834         localLogInfoCache);
835     ret = CloudSyncUtils::SaveChangedData(param, idx, dataInfo, deletedList);
836     if (ret != E_OK) {
837         LOGE("[CloudSyncer] Cannot save changed data: %d.", ret);
838     }
839     return ret;
840 }
841 
SaveData(CloudSyncer::TaskId taskId,SyncParam & param)842 int CloudSyncer::SaveData(CloudSyncer::TaskId taskId, SyncParam &param)
843 {
844     if (!CloudSyncUtils::IsChangeDataEmpty(param.changedData)) {
845         LOGE("[CloudSyncer] changedData.primaryData should have no member inside.");
846         return -E_INVALID_ARGS;
847     }
848     // Update download batch Info
849     param.info.downLoadInfo.batchIndex += 1;
850     param.info.downLoadInfo.total += param.downloadData.data.size();
851     int ret = E_OK;
852     DownloadList assetsDownloadList;
853     param.assetsDownloadList = assetsDownloadList;
854     param.deletePrimaryKeySet.clear();
855     param.dupHashKeySet.clear();
856     CloudSyncUtils::ClearWithoutData(param);
857     std::vector<std::pair<Key, size_t>> deletedList;
858     // use for record local delete status
859     std::map<std::string, LogInfo> localLogInfoCache;
860     for (size_t i = 0; i < param.downloadData.data.size(); i++) {
861         ret = SaveDatum(param, i, deletedList, localLogInfoCache);
862         if (ret != E_OK) {
863             param.info.downLoadInfo.failCount += param.downloadData.data.size();
864             LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret);
865             return ret;
866         }
867     }
868     // Save assetsMap into current context
869     {
870         std::lock_guard<std::mutex> autoLock(dataLock_);
871         currentContext_.assetDownloadList = param.assetsDownloadList;
872     }
873     // save the data to the database by batch, downloadData will return rowid when insert data.
874     ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData);
875     if (ret != E_OK) {
876         param.info.downLoadInfo.failCount += param.downloadData.data.size();
877         LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret);
878         return ret;
879     }
880     ret = UpdateChangedData(param, currentContext_.assetDownloadList);
881     if (ret != E_OK) {
882         param.info.downLoadInfo.failCount += param.downloadData.data.size();
883         LOGE("[CloudSyncer] Cannot update changed data: %d.", ret);
884         return ret;
885     }
886     // Update downloadInfo
887     param.info.downLoadInfo.successCount += param.downloadData.data.size();
888     // Get latest cloudWaterMark
889     VBucket &lastData = param.downloadData.data.back();
890     if (!IsQueryListEmpty(taskId) && param.isLastBatch) {
891         // the last batch of cursor in the conditional query is useless
892         param.cloudWaterMark = {};
893     } else {
894         param.cloudWaterMark = std::get<std::string>(lastData[CloudDbConstant::CURSOR_FIELD]);
895     }
896     return UpdateFlagForSavedRecord(param);
897 }
898 
PreCheck(CloudSyncer::TaskId & taskId,const TableName & tableName)899 int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName)
900 {
901     // Check Input and Context Validity
902     int ret = CheckTaskIdValid(taskId);
903     if (ret != E_OK) {
904         return ret;
905     }
906     {
907         std::lock_guard<std::mutex> autoLock(dataLock_);
908         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
909             LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: , %" PRIu64 ".", taskId);
910             return -E_INVALID_ARGS;
911         }
912     }
913     if (currentContext_.strategy == nullptr) {
914         LOGE("[CloudSyncer] Strategy has not been initialized");
915         return -E_INVALID_ARGS;
916     }
917     ret = storageProxy_->CheckSchema(tableName);
918     if (ret != E_OK) {
919         LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret);
920         return ret;
921     }
922     return E_OK;
923 }
924 
NeedNotifyChangedData(const ChangedData & changedData)925 bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData)
926 {
927     TaskId taskId;
928     {
929         std::lock_guard<std::mutex> autoLock(dataLock_);
930         taskId = currentContext_.currentTaskId;
931     }
932     if (IsModeForcePush(taskId)) {
933         return false;
934     }
935     // when there have no data been changed, it don't need fill back
936     if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() &&
937         changedData.primaryData[OP_DELETE].empty()) {
938         return false;
939     }
940     return true;
941 }
942 
NotifyChangedData(ChangedData && changedData)943 int CloudSyncer::NotifyChangedData(ChangedData &&changedData)
944 {
945     if (!NeedNotifyChangedData(changedData)) {
946         return E_OK;
947     }
948     std::string deviceName;
949     {
950         std::lock_guard<std::mutex> autoLock(dataLock_);
951         std::vector<std::string> devices = currentContext_.notifier->GetDevices();
952         if (devices.empty()) {
953             DBDfxAdapter::ReportBehavior(
954                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, -E_CLOUD_ERROR});
955             LOGE("[CloudSyncer] CurrentContext do not contain device info");
956             return -E_CLOUD_ERROR;
957         }
958         // We use first device name as the target of NotifyChangedData
959         deviceName = devices[0];
960     }
961     int ret = storageProxy_->NotifyChangedData(deviceName, std::move(changedData));
962     if (ret != E_OK) {
963         DBDfxAdapter::ReportBehavior(
964             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
965         LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret);
966     }
967     DBDfxAdapter::ReportBehavior(
968         {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
969     return ret;
970 }
971 
NotifyInDownload(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)972 void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload)
973 {
974     if (!isFirstDownload && param.downloadData.data.empty()) {
975         // if the second download and there is no download data, do not notify
976         return;
977     }
978     std::lock_guard<std::mutex> autoLock(dataLock_);
979     if (currentContext_.strategy->JudgeUpload()) {
980         currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
981     } else {
982         if (param.isLastBatch) {
983             param.info.tableStatus = ProcessStatus::FINISHED;
984         }
985         if (cloudTaskInfos_[taskId].table.back() == param.tableName && param.isLastBatch) {
986             currentContext_.notifier->UpdateProcess(param.info);
987         } else {
988             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
989         }
990     }
991 }
992 
SaveDataInTransaction(CloudSyncer::TaskId taskId,SyncParam & param)993 int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam &param)
994 {
995     int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
996     if (ret != E_OK) {
997         LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret);
998         return ret;
999     }
1000     if (!IsModeForcePush(taskId)) {
1001         param.changedData.tableName = param.info.tableName;
1002         param.changedData.field = param.pkColNames;
1003         param.changedData.type = ChangedDataType::DATA;
1004     }
1005     ret = SaveData(taskId, param);
1006     param.insertPk.clear();
1007     if (ret != E_OK) {
1008         LOGE("[CloudSyncer] cannot save data: %d.", ret);
1009         int rollBackErrorCode = storageProxy_->Rollback();
1010         if (rollBackErrorCode != E_OK) {
1011             LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1012         } else {
1013             LOGI("[CloudSyncer] Roll back transaction success: %d.", ret);
1014         }
1015         return ret;
1016     }
1017     ret = storageProxy_->Commit();
1018     if (ret != E_OK) {
1019         LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret);
1020     }
1021     return ret;
1022 }
1023 
DoDownloadAssets(bool skipSave,SyncParam & param)1024 int CloudSyncer::DoDownloadAssets(bool skipSave, SyncParam &param)
1025 {
1026     // Begin downloading assets
1027     ChangedData changedAssets;
1028     int ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets);
1029     bool isSharedTable = false;
1030     int errCode = storageProxy_->IsSharedTable(param.tableName, isSharedTable);
1031     if (errCode != E_OK) {
1032         LOGE("[CloudSyncer] HandleTagAssets cannot judge the table is a shared table. %d", errCode);
1033         return errCode;
1034     }
1035     if (!isSharedTable) {
1036         (void)NotifyChangedData(std::move(changedAssets));
1037     }
1038     if (ret == -E_TASK_PAUSED) {
1039         LOGD("[CloudSyncer] current task was paused, abort download asset");
1040         std::lock_guard<std::mutex> autoLock(dataLock_);
1041         resumeTaskInfos_[currentContext_.currentTaskId].skipQuery = true;
1042         return ret;
1043     } else if (skipSave) {
1044         std::lock_guard<std::mutex> autoLock(dataLock_);
1045         resumeTaskInfos_[currentContext_.currentTaskId].skipQuery = false;
1046     }
1047     if (ret != E_OK) {
1048         LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret);
1049     }
1050     return ret;
1051 }
1052 
SaveDataNotifyProcess(CloudSyncer::TaskId taskId,SyncParam & param)1053 int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam &param)
1054 {
1055     ChangedData changedData;
1056     bool skipSave = false;
1057     {
1058         bool currentTableResume = IsCurrentTableResume(taskId, false);
1059         std::lock_guard<std::mutex> autoLock(dataLock_);
1060         if (currentTableResume && resumeTaskInfos_[currentContext_.currentTaskId].skipQuery) {
1061             skipSave = true;
1062         }
1063     }
1064     int ret;
1065     if (!skipSave) {
1066         param.changedData = changedData;
1067         param.downloadData.opType.resize(param.downloadData.data.size());
1068         param.downloadData.existDataKey.resize(param.downloadData.data.size());
1069         param.downloadData.existDataHashKey.resize(param.downloadData.data.size());
1070         (void)storageProxy_->CreateTempSyncTrigger(param.tableName);
1071         ret = SaveDataInTransaction(taskId, param);
1072         (void)storageProxy_->ClearAllTempSyncTrigger();
1073         if (ret != E_OK) {
1074             return ret;
1075         }
1076         // call OnChange to notify changedData object first time (without Assets)
1077         ret = NotifyChangedData(std::move(param.changedData));
1078         if (ret != E_OK) {
1079             LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret);
1080             return ret;
1081         }
1082     }
1083     ret = DoDownloadAssets(skipSave, param);
1084     if (ret != E_OK) {
1085         return ret;
1086     }
1087     UpdateCloudWaterMark(taskId, param);
1088     return E_OK;
1089 }
1090 
NotifyInBatchUpload(const UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,bool lastBatch)1091 void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1092     bool lastBatch)
1093 {
1094     CloudTaskInfo taskInfo;
1095     {
1096         std::lock_guard<std::mutex> autoLock(dataLock_);
1097         taskInfo = cloudTaskInfos_[uploadParam.taskId];
1098     }
1099     std::lock_guard<std::mutex> autoLock(dataLock_);
1100     if (uploadParam.lastTable && lastBatch) {
1101         currentContext_.notifier->UpdateProcess(innerProcessInfo);
1102     } else {
1103         currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo);
1104     }
1105 }
1106 
DoDownload(CloudSyncer::TaskId taskId,bool isFirstDownload)1107 int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload)
1108 {
1109     SyncParam param;
1110     int errCode = GetSyncParamForDownload(taskId, param);
1111     if (errCode != E_OK) {
1112         LOGE("[CloudSyncer] get sync param for download failed %d", errCode);
1113         return errCode;
1114     }
1115     errCode = DoDownloadInner(taskId, param, isFirstDownload);
1116     if (errCode == -E_TASK_PAUSED) {
1117         // No need to handle ret.
1118         int ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1119         if (ret != E_OK) {
1120             LOGE("[DoDownload] Cannot get cloud watermark : %d.", ret);
1121         }
1122         std::lock_guard<std::mutex> autoLock(dataLock_);
1123         resumeTaskInfos_[taskId].syncParam = std::move(param);
1124     }
1125     return errCode;
1126 }
1127 
DoDownloadInner(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)1128 int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload)
1129 {
1130     // Query data by batch until reaching end and not more data need to be download
1131     int ret = PreCheck(taskId, param.info.tableName);
1132     if (ret != E_OK) {
1133         return ret;
1134     }
1135     do {
1136         ret = DownloadOneBatch(taskId, param, isFirstDownload);
1137         if (ret != E_OK) {
1138             return ret;
1139         }
1140     } while (!param.isLastBatch);
1141     return E_OK;
1142 }
1143 
NotifyInEmptyDownload(CloudSyncer::TaskId taskId,InnerProcessInfo & info)1144 void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info)
1145 {
1146     std::lock_guard<std::mutex> autoLock(dataLock_);
1147     if (currentContext_.strategy->JudgeUpload()) {
1148         currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1149     } else {
1150         info.tableStatus = FINISHED;
1151         if (cloudTaskInfos_[taskId].table.back() == info.tableName) {
1152             currentContext_.notifier->UpdateProcess(info);
1153         } else {
1154             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1155         }
1156     }
1157 }
1158 
PreCheckUpload(CloudSyncer::TaskId & taskId,const TableName & tableName,Timestamp & localMark)1159 int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark)
1160 {
1161     int ret = PreCheck(taskId, tableName);
1162     if (ret != E_OK) {
1163         return ret;
1164     }
1165     {
1166         std::lock_guard<std::mutex> autoLock(dataLock_);
1167         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1168             LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
1169             return -E_INVALID_ARGS;
1170         }
1171         if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) ||
1172             (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) {
1173             LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.",
1174                 static_cast<int>(cloudTaskInfos_[taskId].mode));
1175             return -E_INVALID_ARGS;
1176         }
1177     }
1178 
1179     return ret;
1180 }
1181 
SaveUploadData(Info & insertInfo,Info & updateInfo,Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)1182 int CloudSyncer::SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
1183     InnerProcessInfo &innerProcessInfo)
1184 {
1185     int errCode = E_OK;
1186     if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) {
1187         errCode = BatchDelete(deleteInfo, uploadData, innerProcessInfo);
1188         if (errCode != E_OK) {
1189             return errCode;
1190         }
1191     }
1192 
1193     if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) {
1194         errCode = BatchUpdate(updateInfo, uploadData, innerProcessInfo);
1195         if (errCode != E_OK) {
1196             return errCode;
1197         }
1198     }
1199 
1200     if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) {
1201         errCode = BatchInsert(insertInfo, uploadData, innerProcessInfo);
1202         if (errCode != E_OK) {
1203             return errCode;
1204         }
1205     }
1206 
1207     if (!uploadData.lockData.rowid.empty()) {
1208         errCode = storageProxy_->FillCloudLogAndAsset(OpType::LOCKED_NOT_HANDLE, uploadData);
1209     }
1210     return errCode;
1211 }
1212 
DoBatchUpload(CloudSyncData & uploadData,UploadParam & uploadParam,InnerProcessInfo & innerProcessInfo)1213 int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo)
1214 {
1215     int errCode = storageProxy_->FillCloudLogAndAsset(OpType::SET_UPLOADING, uploadData);
1216     if (errCode != E_OK) {
1217         return errCode;
1218     }
1219     Info insertInfo;
1220     Info updateInfo;
1221     Info deleteInfo;
1222     errCode = SaveUploadData(insertInfo, updateInfo, deleteInfo, uploadData, innerProcessInfo);
1223     if (errCode != E_OK) {
1224         return errCode;
1225     }
1226     bool lastBatch = innerProcessInfo.upLoadInfo.successCount == innerProcessInfo.upLoadInfo.total;
1227     if (lastBatch) {
1228         innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1229     }
1230     // After each batch upload successed, call NotifyProcess
1231     NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch);
1232 
1233     // if batch upload successed, update local water mark
1234     // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here.
1235     errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam);
1236     if (errCode != E_OK) {
1237         LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode);
1238     }
1239     return errCode;
1240 }
1241 
PutWaterMarkAfterBatchUpload(const std::string & tableName,UploadParam & uploadParam)1242 int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam)
1243 {
1244     int errCode = E_OK;
1245     storageProxy_->ReleaseUploadRecord(tableName, uploadParam.mode, uploadParam.localMark);
1246     // if we use local cover cloud strategy, it won't update local water mark also.
1247     if (IsModeForcePush(uploadParam.taskId) || (IsPriorityTask(uploadParam.taskId) &&
1248         !IsQueryListEmpty(uploadParam.taskId))) {
1249         return E_OK;
1250     }
1251     errCode = storageProxy_->PutWaterMarkByMode(tableName, uploadParam.mode, uploadParam.localMark);
1252     if (errCode != E_OK) {
1253         LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode);
1254     }
1255     return errCode;
1256 }
1257 
DoUpload(CloudSyncer::TaskId taskId,bool lastTable,LockAction lockAction)1258 int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction)
1259 {
1260     std::string tableName;
1261     int ret = GetCurrentTableName(tableName);
1262     if (ret != E_OK) {
1263         LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1264         return ret;
1265     }
1266 
1267     Timestamp localMark = 0u;
1268     ret = PreCheckUpload(taskId, tableName, localMark);
1269     if (ret != E_OK) {
1270         LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret);
1271         return ret;
1272     }
1273     ReloadWaterMarkIfNeed(taskId, localMark);
1274     storageProxy_->OnUploadStart();
1275 
1276     int64_t count = 0;
1277     ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
1278         IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
1279     LOGI("get upload count:%zu", count);
1280     if (ret != E_OK) {
1281         // GetUploadCount will return E_OK when upload count is zero.
1282         LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
1283         return ret;
1284     }
1285     if (count == 0) {
1286         UpdateProcessInfoWithoutUpload(taskId, tableName, !lastTable);
1287         return E_OK;
1288     }
1289     UploadParam param;
1290     param.count = count;
1291     param.lastTable = lastTable;
1292     param.taskId = taskId;
1293     param.lockAction = lockAction;
1294     return DoUploadInner(tableName, param);
1295 }
1296 
PreProcessBatchUpload(UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,CloudSyncData & uploadData)1297 int CloudSyncer::PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1298     CloudSyncData &uploadData)
1299 {
1300     // Precheck and calculate local water mark which would be updated if batch upload successed.
1301     int ret = CheckTaskIdValid(uploadParam.taskId);
1302     if (ret != E_OK) {
1303         return ret;
1304     }
1305     ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName,
1306         innerProcessInfo.upLoadInfo.total);
1307     if (ret != E_OK) {
1308         LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret);
1309         return ret;
1310     }
1311     TagUploadAssets(uploadData);
1312     CloudSyncUtils::UpdateAssetsFlag(uploadData);
1313     // get local water mark to be updated in future.
1314     ret = CloudSyncUtils::UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total,
1315         uploadParam.taskId, uploadParam.localMark);
1316     if (ret != E_OK) {
1317         LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret);
1318     }
1319     return ret;
1320 }
1321 
SaveCloudWaterMark(const TableName & tableName,const TaskId taskId)1322 int CloudSyncer::SaveCloudWaterMark(const TableName &tableName, const TaskId taskId)
1323 {
1324     std::string cloudWaterMark;
1325     bool isUpdateCloudCursor = true;
1326     {
1327         std::lock_guard<std::mutex> autoLock(dataLock_);
1328         if (currentContext_.cloudWaterMarks.find(currentContext_.currentUserIndex) ==
1329             currentContext_.cloudWaterMarks.end() ||
1330             currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].find(tableName) ==
1331             currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].end()) {
1332             LOGD("[CloudSyncer] Not found water mark just return");
1333             return E_OK;
1334         }
1335         cloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
1336         isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor();
1337     }
1338     isUpdateCloudCursor = isUpdateCloudCursor && !(IsPriorityTask(taskId) && !IsQueryListEmpty(taskId));
1339     if (isUpdateCloudCursor) {
1340         int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark);
1341         if (errCode != E_OK) {
1342             LOGE("[CloudSyncer] Cannot set cloud water mark, %d.", errCode);
1343         }
1344         return errCode;
1345     }
1346     return E_OK;
1347 }
1348 
SetUploadDataFlag(const TaskId taskId,CloudSyncData & uploadData)1349 void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData)
1350 {
1351     std::lock_guard<std::mutex> autoLock(dataLock_);
1352     uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH);
1353     uploadData.isCompensatedTask = cloudTaskInfos_[taskId].compensatedTask;
1354 }
1355 
IsModeForcePush(TaskId taskId)1356 bool CloudSyncer::IsModeForcePush(TaskId taskId)
1357 {
1358     std::lock_guard<std::mutex> autoLock(dataLock_);
1359     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
1360 }
1361 
IsModeForcePull(const TaskId taskId)1362 bool CloudSyncer::IsModeForcePull(const TaskId taskId)
1363 {
1364     std::lock_guard<std::mutex> autoLock(dataLock_);
1365     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL;
1366 }
1367 
IsPriorityTask(TaskId taskId)1368 bool CloudSyncer::IsPriorityTask(TaskId taskId)
1369 {
1370     std::lock_guard<std::mutex> autoLock(dataLock_);
1371     return cloudTaskInfos_[taskId].priorityTask;
1372 }
1373 
PreHandleData(VBucket & datum,const std::vector<std::string> & pkColNames)1374 int CloudSyncer::PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames)
1375 {
1376     // type index of field in fields, true means mandatory filed
1377     static std::vector<std::tuple<std::string, int32_t, bool>> fieldAndIndex = {
1378         std::make_tuple(CloudDbConstant::GID_FIELD, TYPE_INDEX<std::string>, true),
1379         std::make_tuple(CloudDbConstant::CREATE_FIELD, TYPE_INDEX<int64_t>, true),
1380         std::make_tuple(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX<int64_t>, true),
1381         std::make_tuple(CloudDbConstant::DELETE_FIELD, TYPE_INDEX<bool>, true),
1382         std::make_tuple(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX<std::string>, true),
1383         std::make_tuple(CloudDbConstant::SHARING_RESOURCE_FIELD, TYPE_INDEX<std::string>, false)
1384     };
1385 
1386     for (const auto &fieldIndex : fieldAndIndex) {
1387         if (datum.find(std::get<0>(fieldIndex)) == datum.end()) {
1388             if (!std::get<2>(fieldIndex)) { // 2 is index of mandatory flag
1389                 continue;
1390             }
1391             LOGE("[CloudSyncer] Cloud data do not contain expected field: %s.", std::get<0>(fieldIndex).c_str());
1392             return -E_CLOUD_ERROR;
1393         }
1394         if (datum[std::get<0>(fieldIndex)].index() != static_cast<size_t>(std::get<1>(fieldIndex))) {
1395             LOGE("[CloudSyncer] Cloud data's field: %s, doesn't has expected type.", std::get<0>(fieldIndex).c_str());
1396             return -E_CLOUD_ERROR;
1397         }
1398     }
1399 
1400     if (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) {
1401         CloudSyncUtils::RemoveDataExceptExtendInfo(datum, pkColNames);
1402     }
1403     std::lock_guard<std::mutex> autoLock(dataLock_);
1404     if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) {
1405         LOGE("[CloudSyncer] Cloud data contain duplicate asset");
1406         return -E_CLOUD_ERROR;
1407     }
1408     return E_OK;
1409 }
1410 
QueryCloudData(TaskId taskId,const std::string & tableName,std::string & cloudWaterMark,DownloadData & downloadData)1411 int CloudSyncer::QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
1412     DownloadData &downloadData)
1413 {
1414     VBucket extend;
1415     int ret = FillDownloadExtend(taskId, tableName, cloudWaterMark, extend);
1416     if (ret != E_OK) {
1417         return ret;
1418     }
1419     ret = cloudDB_.Query(tableName, extend, downloadData.data);
1420     if ((ret == E_OK || ret == -E_QUERY_END) && downloadData.data.empty()) {
1421         if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX<std::string>) {
1422             LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index());
1423             return -E_CLOUD_ERROR;
1424         }
1425         cloudWaterMark = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
1426         LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str());
1427         return ret;
1428     }
1429     if (ret == -E_QUERY_END) {
1430         LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded");
1431         return -E_QUERY_END;
1432     }
1433     if (ret != E_OK) {
1434         LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret);
1435     }
1436     return ret;
1437 }
1438 
CheckTaskIdValid(TaskId taskId)1439 int CloudSyncer::CheckTaskIdValid(TaskId taskId)
1440 {
1441     if (closed_) {
1442         LOGE("[CloudSyncer] DB is closed.");
1443         return -E_DB_CLOSED;
1444     }
1445     std::lock_guard<std::mutex> autoLock(dataLock_);
1446     if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1447         LOGE("[CloudSyncer] not found task.");
1448         return -E_INVALID_ARGS;
1449     }
1450     if (cloudTaskInfos_[taskId].pause) {
1451         LOGW("[CloudSyncer] check task %" PRIu64 " was paused!", taskId);
1452         return -E_TASK_PAUSED;
1453     }
1454     if (cloudTaskInfos_[taskId].errCode != E_OK) {
1455         return cloudTaskInfos_[taskId].errCode;
1456     }
1457     return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS;
1458 }
1459 
GetCurrentTableName(std::string & tableName)1460 int CloudSyncer::GetCurrentTableName(std::string &tableName)
1461 {
1462     std::lock_guard<std::mutex> autoLock(dataLock_);
1463     if (currentContext_.tableName.empty()) {
1464         return -E_BUSY;
1465     }
1466     tableName = currentContext_.tableName;
1467     return E_OK;
1468 }
1469 
CheckQueueSizeWithNoLock(bool priorityTask)1470 int CloudSyncer::CheckQueueSizeWithNoLock(bool priorityTask)
1471 {
1472     int32_t limit = queuedManualSyncLimit_;
1473     if (!priorityTask && taskQueue_.size() >= static_cast<size_t>(limit)) {
1474         LOGW("[CloudSyncer] too much sync task");
1475         return -E_BUSY;
1476     } else if (priorityTask && priorityTaskQueue_.size() >= static_cast<size_t>(limit)) {
1477         LOGW("[CloudSyncer] too much priority sync task");
1478         return -E_BUSY;
1479     }
1480     return E_OK;
1481 }
1482 
PrepareSync(TaskId taskId)1483 int CloudSyncer::PrepareSync(TaskId taskId)
1484 {
1485     std::lock_guard<std::mutex> autoLock(dataLock_);
1486     if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1487         LOGW("[CloudSyncer] Abort sync because syncer is closed");
1488         return -E_DB_CLOSED;
1489     }
1490     if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) {
1491         LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running");
1492         return -E_DB_CLOSED;
1493     }
1494     currentContext_.currentTaskId = taskId;
1495     cloudTaskInfos_[taskId].resume = cloudTaskInfos_[taskId].pause;
1496     cloudTaskInfos_[taskId].pause = false;
1497     cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING;
1498     if (cloudTaskInfos_[taskId].resume) {
1499         auto tempLocker = currentContext_.locker;
1500         currentContext_ = resumeTaskInfos_[taskId].context;
1501         currentContext_.locker = tempLocker;
1502     } else {
1503         currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
1504         currentContext_.strategy = StrategyFactory::BuildSyncStrategy(cloudTaskInfos_[taskId].mode, policy_);
1505         currentContext_.notifier->Init(cloudTaskInfos_[taskId].table, cloudTaskInfos_[taskId].devices,
1506             cloudTaskInfos_[taskId].users);
1507         currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
1508     }
1509     LOGI("[CloudSyncer] exec storeId %.3s taskId %" PRIu64, cloudTaskInfos_[taskId].storeId.c_str(), taskId);
1510     return E_OK;
1511 }
1512 
LockCloud(TaskId taskId)1513 int CloudSyncer::LockCloud(TaskId taskId)
1514 {
1515     int period;
1516     {
1517         auto res = cloudDB_.Lock();
1518         if (res.first != E_OK) {
1519             return res.first;
1520         }
1521         period = static_cast<int>(res.second) / HEARTBEAT_PERIOD;
1522     }
1523     int errCode = StartHeartBeatTimer(period, taskId);
1524     if (errCode != E_OK) {
1525         UnlockCloud();
1526     }
1527     return errCode;
1528 }
1529 
UnlockCloud()1530 int CloudSyncer::UnlockCloud()
1531 {
1532     FinishHeartBeatTimer();
1533     return cloudDB_.UnLock();
1534 }
1535 
StartHeartBeatTimer(int period,TaskId taskId)1536 int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId)
1537 {
1538     if (timerId_ != 0u) {
1539         LOGW("[CloudSyncer] HeartBeat timer has been start!");
1540         return E_OK;
1541     }
1542     TimerId timerId = 0;
1543     int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) {
1544         HeartBeat(timerId, taskId);
1545         return E_OK;
1546     }, nullptr, timerId);
1547     if (errCode != E_OK) {
1548         LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode);
1549         return errCode;
1550     }
1551     timerId_ = timerId;
1552     return E_OK;
1553 }
1554 
FinishHeartBeatTimer()1555 void CloudSyncer::FinishHeartBeatTimer()
1556 {
1557     if (timerId_ == 0u) {
1558         return;
1559     }
1560     RuntimeContext::GetInstance()->RemoveTimer(timerId_, true);
1561     timerId_ = 0u;
1562     LOGD("[CloudSyncer] Finish heartbeat timer ok");
1563 }
1564 
HeartBeat(TimerId timerId,TaskId taskId)1565 void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId)
1566 {
1567     if (timerId_ != timerId) {
1568         return;
1569     }
1570     IncObjRef(this);
1571     {
1572         std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1573         heartbeatCount_[taskId]++;
1574     }
1575     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() {
1576         {
1577             std::lock_guard<std::mutex> guard(dataLock_);
1578             if (currentContext_.currentTaskId != taskId) {
1579                 heartbeatCount_.erase(taskId);
1580                 failedHeartbeatCount_.erase(taskId);
1581                 DecObjRef(this);
1582                 return;
1583             }
1584         }
1585         if (heartbeatCount_[taskId] >= HEARTBEAT_PERIOD) {
1586             // heartbeat block twice should finish task now
1587             SetTaskFailed(taskId, -E_CLOUD_ERROR);
1588         } else {
1589             int ret = cloudDB_.HeartBeat();
1590             if (ret != E_OK) {
1591                 HeartBeatFailed(taskId, ret);
1592             } else {
1593                 failedHeartbeatCount_[taskId] = 0;
1594             }
1595         }
1596         {
1597             std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1598             heartbeatCount_[taskId]--;
1599             if (currentContext_.currentTaskId != taskId) {
1600                 heartbeatCount_.erase(taskId);
1601                 failedHeartbeatCount_.erase(taskId);
1602             }
1603         }
1604         DecObjRef(this);
1605     });
1606     if (errCode != E_OK) {
1607         LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode);
1608         DecObjRef(this);
1609     }
1610 }
1611 
HeartBeatFailed(TaskId taskId,int errCode)1612 void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode)
1613 {
1614     failedHeartbeatCount_[taskId]++;
1615     if (failedHeartbeatCount_[taskId] < MAX_HEARTBEAT_FAILED_LIMIT) {
1616         return;
1617     }
1618     LOGW("[CloudSyncer] heartbeat failed too much times!");
1619     FinishHeartBeatTimer();
1620     SetTaskFailed(taskId, errCode);
1621 }
1622 
SetTaskFailed(TaskId taskId,int errCode)1623 void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode)
1624 {
1625     std::lock_guard<std::mutex> autoLock(dataLock_);
1626     if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1627         return;
1628     }
1629     if (cloudTaskInfos_[taskId].errCode != E_OK) {
1630         return;
1631     }
1632     cloudTaskInfos_[taskId].errCode = errCode;
1633 }
1634 
GetCloudSyncTaskCount()1635 int32_t CloudSyncer::GetCloudSyncTaskCount()
1636 {
1637     std::lock_guard<std::mutex> autoLock(dataLock_);
1638     return static_cast<int32_t>(taskQueue_.size() + priorityTaskQueue_.size());
1639 }
1640 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)1641 int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1642     const RelationalSchemaObject &localSchema)
1643 {
1644     std::lock_guard<std::mutex> lock(syncMutex_);
1645     int index = 1;
1646     for (const auto &tableName: tableNameList) {
1647         LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index);
1648         int ret = storageProxy_->CleanWaterMark(tableName);
1649         if (ret != E_OK) {
1650         LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret);
1651             return ret;
1652         }
1653         index++;
1654     }
1655     int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1656     if (errCode != E_OK) {
1657         LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode);
1658         return errCode;
1659     }
1660 
1661     std::vector<Asset> assets;
1662     errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets);
1663     if (errCode != E_OK) {
1664         LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode);
1665         storageProxy_->Rollback();
1666         return errCode;
1667     }
1668 
1669     if (!assets.empty() && mode == FLAG_AND_DATA) {
1670         errCode = cloudDB_.RemoveLocalAssets(assets);
1671         if (errCode != E_OK) {
1672             LOGE("[Storage Executor] failed to remove local assets, %d.", errCode);
1673             storageProxy_->Rollback();
1674             return errCode;
1675         }
1676     }
1677 
1678     storageProxy_->Commit();
1679     return errCode;
1680 }
1681 
CleanWaterMarkInMemory(const std::set<std::string> & tableNameList)1682 int CloudSyncer::CleanWaterMarkInMemory(const std::set<std::string> &tableNameList)
1683 {
1684     std::lock_guard<std::mutex> lock(syncMutex_);
1685     for (const auto &tableName: tableNameList) {
1686         int ret = storageProxy_->CleanWaterMarkInMemory(tableName);
1687         if (ret != E_OK) {
1688             LOGE("[CloudSyncer] failed to clean cloud water mark in memory, %d.", ret);
1689             return ret;
1690         }
1691     }
1692     return E_OK;
1693 }
1694 
UpdateCloudWaterMark(TaskId taskId,const SyncParam & param)1695 void CloudSyncer::UpdateCloudWaterMark(TaskId taskId, const SyncParam &param)
1696 {
1697     {
1698         std::lock_guard<std::mutex> autoLock(dataLock_);
1699         currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][param.info.tableName] = param.cloudWaterMark;
1700     }
1701 }
1702 
CommitDownloadResult(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,int errCode)1703 int CloudSyncer::CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
1704     DownloadCommitList &commitList, int errCode)
1705 {
1706     if (commitList.empty()) {
1707         return E_OK;
1708     }
1709     uint32_t successCount = 0u;
1710     int ret = HandleDownloadResult(downloadItem, info.tableName, commitList, successCount);
1711     if (errCode == E_OK) {
1712         info.downLoadInfo.failCount += (commitList.size() - successCount);
1713         info.downLoadInfo.successCount -= (commitList.size() - successCount);
1714     }
1715     if (ret != E_OK) {
1716         LOGE("Commit download result failed.%d", ret);
1717     }
1718     commitList.clear();
1719     return ret;
1720 }
1721 
GetIdentify() const1722 std::string CloudSyncer::GetIdentify() const
1723 {
1724     return id_;
1725 }
1726 
TagStatusByStrategy(bool isExist,SyncParam & param,DataInfo & dataInfo,OpType & strategyOpResult)1727 int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam &param, DataInfo &dataInfo, OpType &strategyOpResult)
1728 {
1729     strategyOpResult = OpType::NOT_HANDLE;
1730     // ignore same record with local generate data
1731     if (dataInfo.localInfo.logInfo.device.empty() &&
1732         !CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
1733         // not handle same data
1734         return E_OK;
1735     }
1736     {
1737         std::lock_guard<std::mutex> autoLock(dataLock_);
1738         if (!currentContext_.strategy) {
1739             LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR);
1740             return -E_INTERNAL_ERROR;
1741         }
1742         bool isCloudWin = storageProxy_->IsTagCloudUpdateLocal(dataInfo.localInfo.logInfo,
1743             dataInfo.cloudLogInfo, policy_);
1744         strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, isCloudWin,
1745             dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo);
1746     }
1747     if (strategyOpResult == OpType::DELETE) {
1748         param.deletePrimaryKeySet.insert(dataInfo.localInfo.logInfo.hashKey);
1749     }
1750     return E_OK;
1751 }
1752 
GetLocalInfo(size_t index,SyncParam & param,DataInfoWithLog & logInfo,std::map<std::string,LogInfo> & localLogInfoCache,VBucket & localAssetInfo)1753 int CloudSyncer::GetLocalInfo(size_t index, SyncParam &param, DataInfoWithLog &logInfo,
1754     std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo)
1755 {
1756     int errCode = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[index],
1757         logInfo, localAssetInfo);
1758     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1759         return errCode;
1760     }
1761     std::string hashKey(logInfo.logInfo.hashKey.begin(), logInfo.logInfo.hashKey.end());
1762     if (hashKey.empty()) {
1763         return errCode;
1764     }
1765     param.downloadData.existDataKey[index] = logInfo.logInfo.dataKey;
1766     param.downloadData.existDataHashKey[index] = logInfo.logInfo.hashKey;
1767     if (localLogInfoCache.find(hashKey) != localLogInfoCache.end()) {
1768         LOGD("[CloudSyncer] exist same record in one batch, override from cache record! hash=%.3s",
1769             DBCommon::TransferStringToHex(hashKey).c_str());
1770         logInfo.logInfo.flag = localLogInfoCache[hashKey].flag;
1771         logInfo.logInfo.wTimestamp = localLogInfoCache[hashKey].wTimestamp;
1772         logInfo.logInfo.timestamp = localLogInfoCache[hashKey].timestamp;
1773         logInfo.logInfo.cloudGid = localLogInfoCache[hashKey].cloudGid;
1774         logInfo.logInfo.device = localLogInfoCache[hashKey].device;
1775         logInfo.logInfo.sharingResource = localLogInfoCache[hashKey].sharingResource;
1776         logInfo.logInfo.status = localLogInfoCache[hashKey].status;
1777         // delete record should remove local asset info
1778         if ((localLogInfoCache[hashKey].flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1779             localAssetInfo.clear();
1780         }
1781         errCode = E_OK;
1782     }
1783     logInfo.logInfo.isNeedUpdateAsset = IsNeedUpdateAsset(localAssetInfo);
1784     return errCode;
1785 }
1786 
GetNextTaskId()1787 TaskId CloudSyncer::GetNextTaskId()
1788 {
1789     std::lock_guard<std::mutex> autoLock(dataLock_);
1790     if (!priorityTaskQueue_.empty()) {
1791         return priorityTaskQueue_.front();
1792     }
1793     if (!taskQueue_.empty()) {
1794         return taskQueue_.front();
1795     }
1796     return INVALID_TASK_ID;
1797 }
1798 
MarkCurrentTaskPausedIfNeed()1799 void CloudSyncer::MarkCurrentTaskPausedIfNeed()
1800 {
1801     std::lock_guard<std::mutex> autoLock(dataLock_);
1802     if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1803         return;
1804     }
1805     if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1806         return;
1807     }
1808     if (!cloudTaskInfos_[currentContext_.currentTaskId].priorityTask) {
1809         cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
1810         LOGD("[CloudSyncer] Mark taskId %" PRIu64 " paused success", currentContext_.currentTaskId);
1811     }
1812 }
1813 
SetCurrentTaskFailedWithoutLock(int errCode)1814 void CloudSyncer::SetCurrentTaskFailedWithoutLock(int errCode)
1815 {
1816     if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1817         return;
1818     }
1819     cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1820 }
1821 
LockCloudIfNeed(TaskId taskId)1822 int CloudSyncer::LockCloudIfNeed(TaskId taskId)
1823 {
1824     {
1825         std::lock_guard<std::mutex> autoLock(dataLock_);
1826         if (currentContext_.locker != nullptr) {
1827             LOGD("[CloudSyncer] lock exist");
1828             return E_OK;
1829         }
1830     }
1831     std::shared_ptr<CloudLocker> locker = nullptr;
1832     int errCode = CloudLocker::BuildCloudLock([taskId, this]() {
1833         return LockCloud(taskId);
1834     }, [this]() {
1835         int unlockCode = UnlockCloud();
1836         if (unlockCode != E_OK) {
1837             SetCurrentTaskFailedWithoutLock(unlockCode);
1838         }
1839     }, locker);
1840     {
1841         std::lock_guard<std::mutex> autoLock(dataLock_);
1842         currentContext_.locker = locker;
1843     }
1844     return errCode;
1845 }
1846 
UnlockIfNeed()1847 void CloudSyncer::UnlockIfNeed()
1848 {
1849     std::shared_ptr<CloudLocker> cacheLocker;
1850     {
1851         std::lock_guard<std::mutex> autoLock(dataLock_);
1852         if (currentContext_.locker == nullptr) {
1853             LOGW("[CloudSyncer] locker is nullptr when unlock it"); // should not happen
1854         }
1855         cacheLocker = currentContext_.locker;
1856         currentContext_.locker = nullptr;
1857     }
1858     // unlock without mutex
1859     cacheLocker = nullptr;
1860 }
1861 
ClearCurrentContextWithoutLock()1862 void CloudSyncer::ClearCurrentContextWithoutLock()
1863 {
1864     heartbeatCount_.erase(currentContext_.currentTaskId);
1865     failedHeartbeatCount_.erase(currentContext_.currentTaskId);
1866     currentContext_.currentTaskId = INVALID_TASK_ID;
1867     currentContext_.notifier = nullptr;
1868     currentContext_.strategy = nullptr;
1869     currentContext_.processRecorder = nullptr;
1870     currentContext_.tableName.clear();
1871     currentContext_.assetDownloadList.clear();
1872     currentContext_.assetFields.clear();
1873     currentContext_.assetsInfo.clear();
1874     currentContext_.cloudWaterMarks.clear();
1875     currentContext_.isNeedUpload = false;
1876     currentContext_.currentUserIndex = 0;
1877     currentContext_.repeatCount = 0;
1878 }
1879 
ClearContextAndNotify(TaskId taskId,int errCode)1880 void CloudSyncer::ClearContextAndNotify(TaskId taskId, int errCode)
1881 {
1882     std::shared_ptr<ProcessNotifier> notifier = nullptr;
1883     CloudTaskInfo info;
1884     {
1885         // clear current context
1886         std::lock_guard<std::mutex> autoLock(dataLock_);
1887         notifier = currentContext_.notifier;
1888         ClearCurrentContextWithoutLock();
1889         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen
1890             LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId);
1891             contextCv_.notify_one();
1892             return;
1893         }
1894         info = std::move(cloudTaskInfos_[taskId]);
1895         cloudTaskInfos_.erase(taskId);
1896         resumeTaskInfos_.erase(taskId);
1897     }
1898     contextCv_.notify_one();
1899     if (info.errCode == E_OK) {
1900         info.errCode = errCode;
1901     }
1902     LOGI("[CloudSyncer] finished storeId %.3s taskId %" PRIu64 " errCode %d", info.storeId.c_str(), taskId,
1903         info.errCode);
1904     info.status = ProcessStatus::FINISHED;
1905     if (notifier != nullptr) {
1906         notifier->NotifyProcess(info, {}, true);
1907     }
1908     // generate compensated sync
1909     if (!info.compensatedTask) {
1910         CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(info);
1911         GenerateCompensatedSync(taskInfo);
1912     }
1913 }
1914 
DownloadOneBatch(TaskId taskId,SyncParam & param,bool isFirstDownload)1915 int CloudSyncer::DownloadOneBatch(TaskId taskId, SyncParam &param, bool isFirstDownload)
1916 {
1917     int ret = CheckTaskIdValid(taskId);
1918     if (ret != E_OK) {
1919         return ret;
1920     }
1921     bool abort = false;
1922     ret = DownloadDataFromCloud(taskId, param, abort, isFirstDownload);
1923     if (abort) {
1924         return ret;
1925     }
1926     // Save data in transaction, update cloud water mark, notify process and changed data
1927     ret = SaveDataNotifyProcess(taskId, param);
1928     if (ret == -E_TASK_PAUSED) {
1929         return ret;
1930     }
1931     if (ret != E_OK) {
1932         std::lock_guard<std::mutex> autoLock(dataLock_);
1933         param.info.tableStatus = ProcessStatus::FINISHED;
1934         currentContext_.notifier->UpdateProcess(param.info);
1935         return ret;
1936     }
1937     (void)NotifyInDownload(taskId, param, isFirstDownload);
1938     return SaveCloudWaterMark(param.tableName, taskId);
1939 }
1940 
DownloadOneAssetRecord(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,DownloadItem & downloadItem,InnerProcessInfo & info,ChangedData & changedAssets)1941 int CloudSyncer::DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
1942     DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets)
1943 {
1944     CloudStorageUtils::EraseNoChangeAsset(downloadItem.assets);
1945     if (downloadItem.assets.empty()) { // Download data (include deleting)
1946         return E_OK;
1947     }
1948     bool isSharedTable = false;
1949     int errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
1950     if (errorCode != E_OK) {
1951         LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errorCode);
1952         return errorCode;
1953     }
1954     if (!isSharedTable) {
1955         errorCode = DownloadAssetsOneByOne(info, downloadItem, downloadItem.assets);
1956         if (errorCode == -E_NOT_SET) {
1957             return -E_NOT_SET;
1958         }
1959     } else {
1960         // share table will not download asset, need to reset the status
1961         for (auto &entry: downloadItem.assets) {
1962             for (auto &asset: entry.second) {
1963                 asset.status = AssetStatus::NORMAL;
1964             }
1965         }
1966     }
1967     if (errorCode != E_OK) {
1968         info.downLoadInfo.failCount += 1;
1969         if (info.downLoadInfo.successCount == 0) {
1970             LOGW("[CloudSyncer] Invalid successCount");
1971         } else {
1972             info.downLoadInfo.successCount -= 1;
1973         }
1974     }
1975     if (!downloadItem.assets.empty()) {
1976         if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
1977             changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
1978                 downloadItem.primaryKeyValList);
1979         } else if (downloadItem.strategy == OpType::INSERT) {
1980             changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
1981         }
1982     }
1983     // If the assets are DELETE, needn't fill back cloud assets.
1984     if (downloadItem.strategy == OpType::DELETE) {
1985         return E_OK;
1986     }
1987     return errorCode;
1988 }
1989 
GetSyncParamForDownload(TaskId taskId,SyncParam & param)1990 int CloudSyncer::GetSyncParamForDownload(TaskId taskId, SyncParam &param)
1991 {
1992     int ret = E_OK;
1993     if (IsCurrentTableResume(taskId, false)) {
1994         std::lock_guard<std::mutex> autoLock(dataLock_);
1995         if (resumeTaskInfos_[taskId].syncParam.tableName == currentContext_.tableName) {
1996             param = resumeTaskInfos_[taskId].syncParam;
1997             resumeTaskInfos_[taskId].syncParam = {};
1998             ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1999             if (ret != E_OK) {
2000                 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data when table is resume: %d.", ret);
2001             }
2002             LOGD("[CloudSyncer] Get sync param from cache");
2003             return E_OK;
2004         }
2005     }
2006     ret = GetCurrentTableName(param.tableName);
2007     if (ret != E_OK) {
2008         LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
2009         return ret;
2010     }
2011     param.info.tableName = param.tableName;
2012     std::vector<Field> assetFields;
2013     // only no primary key and composite primary key contains rowid.
2014     ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields);
2015     if (ret != E_OK) {
2016         LOGE("[CloudSyncer] Cannot get primary column names: %d", ret);
2017         return ret;
2018     }
2019     {
2020         std::lock_guard<std::mutex> autoLock(dataLock_);
2021         currentContext_.assetFields[currentContext_.tableName] = assetFields;
2022     }
2023     param.isSinglePrimaryKey = CloudSyncUtils::IsSinglePrimaryKey(param.pkColNames);
2024     if (!IsModeForcePull(taskId) && !(IsPriorityTask(taskId) && !IsQueryListEmpty(taskId))) {
2025         ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2026         if (ret != E_OK) {
2027             LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret);
2028         }
2029     }
2030     currentContext_.notifier->GetDownloadInfoByTableName(param.info);
2031     return ret;
2032 }
2033 
IsCurrentTableResume(TaskId taskId,bool upload)2034 bool CloudSyncer::IsCurrentTableResume(TaskId taskId, bool upload)
2035 {
2036     std::lock_guard<std::mutex> autoLock(dataLock_);
2037     if (!cloudTaskInfos_[taskId].resume) {
2038         return false;
2039     }
2040     if (currentContext_.tableName != resumeTaskInfos_[taskId].context.tableName) {
2041         return false;
2042     }
2043     return upload == resumeTaskInfos_[taskId].upload;
2044 }
2045 
DownloadDataFromCloud(TaskId taskId,SyncParam & param,bool & abort,bool isFirstDownload)2046 int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam &param, bool &abort,
2047     bool isFirstDownload)
2048 {
2049     // Get cloud data after cloud water mark
2050     param.info.tableStatus = ProcessStatus::PROCESSING;
2051     param.downloadData = {};
2052     int ret = QueryCloudData(taskId, param.info.tableName, param.cloudWaterMark, param.downloadData);
2053     if (ret == -E_QUERY_END) {
2054         // Won't break here since downloadData may not be null
2055         param.isLastBatch = true;
2056     } else if (ret != E_OK) {
2057         std::lock_guard<std::mutex> autoLock(dataLock_);
2058         param.info.tableStatus = ProcessStatus::FINISHED;
2059         currentContext_.notifier->UpdateProcess(param.info);
2060         abort = true;
2061         return ret;
2062     }
2063     if (param.downloadData.data.empty()) {
2064         if (ret == E_OK || isFirstDownload) {
2065             LOGD("[CloudSyncer] try to query cloud data use increment water mark");
2066             UpdateCloudWaterMark(taskId, param);
2067             // Cloud water may change on the cloud, it needs to be saved here
2068             SaveCloudWaterMark(param.tableName, taskId);
2069         }
2070         if (isFirstDownload) {
2071             NotifyInEmptyDownload(taskId, param.info);
2072         }
2073         abort = true;
2074     }
2075     return E_OK;
2076 }
2077 
GetDownloadAssetIndex(TaskId taskId)2078 size_t CloudSyncer::GetDownloadAssetIndex(TaskId taskId)
2079 {
2080     size_t index = 0u;
2081     std::lock_guard<std::mutex> autoLock(dataLock_);
2082     if (resumeTaskInfos_[taskId].lastDownloadIndex != 0u) {
2083         index = resumeTaskInfos_[taskId].lastDownloadIndex;
2084         resumeTaskInfos_[taskId].lastDownloadIndex = 0u;
2085     }
2086     return index;
2087 }
2088 
GetCurrentTableUploadBatchIndex()2089 uint32_t CloudSyncer::GetCurrentTableUploadBatchIndex()
2090 {
2091     std::lock_guard<std::mutex> autoLock(dataLock_);
2092     return currentContext_.notifier->GetUploadBatchIndex(currentContext_.tableName);
2093 }
2094 
ResetCurrentTableUploadBatchIndex()2095 void CloudSyncer::ResetCurrentTableUploadBatchIndex()
2096 {
2097     std::lock_guard<std::mutex> autoLock(dataLock_);
2098     currentContext_.notifier->ResetUploadBatchIndex(currentContext_.tableName);
2099 }
2100 
RecordWaterMark(TaskId taskId,Timestamp waterMark)2101 void CloudSyncer::RecordWaterMark(TaskId taskId, Timestamp waterMark)
2102 {
2103     std::lock_guard<std::mutex> autoLock(dataLock_);
2104     resumeTaskInfos_[taskId].lastLocalWatermark = waterMark;
2105 }
2106 
GetResumeWaterMark(TaskId taskId)2107 Timestamp CloudSyncer::GetResumeWaterMark(TaskId taskId)
2108 {
2109     std::lock_guard<std::mutex> autoLock(dataLock_);
2110     return resumeTaskInfos_[taskId].lastLocalWatermark;
2111 }
2112 
GetInnerProcessInfo(const std::string & tableName,UploadParam & uploadParam)2113 CloudSyncer::InnerProcessInfo CloudSyncer::GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam)
2114 {
2115     InnerProcessInfo info;
2116     info.tableName = tableName;
2117     info.tableStatus = ProcessStatus::PROCESSING;
2118     ReloadUploadInfoIfNeed(uploadParam.taskId, uploadParam, info);
2119     return info;
2120 }
2121 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)2122 void CloudSyncer::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
2123 {
2124     cloudDB_.SetGenCloudVersionCallback(callback);
2125 }
2126 
CopyAndClearTaskInfos()2127 std::vector<CloudSyncer::CloudTaskInfo> CloudSyncer::CopyAndClearTaskInfos()
2128 {
2129     std::vector<CloudTaskInfo> infoList;
2130     std::lock_guard<std::mutex> autoLock(dataLock_);
2131     for (const auto &item: cloudTaskInfos_) {
2132         infoList.push_back(item.second);
2133     }
2134     taskQueue_.clear();
2135     priorityTaskQueue_.clear();
2136     cloudTaskInfos_.clear();
2137     resumeTaskInfos_.clear();
2138     currentContext_.notifier = nullptr;
2139     return infoList;
2140 }
2141 
WaitCurTaskFinished()2142 void CloudSyncer::WaitCurTaskFinished()
2143 {
2144     LOGD("[CloudSyncer] begin wait current task finished");
2145     std::unique_lock<std::mutex> uniqueLock(dataLock_);
2146     contextCv_.wait(uniqueLock, [this]() {
2147         return currentContext_.currentTaskId == INVALID_TASK_ID;
2148     });
2149     LOGD("[CloudSyncer] current task has been finished");
2150 }
2151 } // namespace DistributedDB
2152