• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_syncer.h"
16 
17 #include <cstdint>
18 #include <utility>
19 #include <unordered_map>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "cloud_sync_tag_assets.h"
24 #include "cloud_sync_utils.h"
25 #include "db_errno.h"
26 #include "cloud/icloud_db.h"
27 #include "kv_store_errno.h"
28 #include "log_print.h"
29 #include "platform_specific.h"
30 #include "runtime_context.h"
31 #include "strategy_factory.h"
32 #include "storage_proxy.h"
33 #include "store_types.h"
34 
35 namespace DistributedDB {
36 namespace {
37     const TaskId INVALID_TASK_ID = 0u;
38     const int MAX_HEARTBEAT_FAILED_LIMIT = 2;
39     const int HEARTBEAT_PERIOD = 3;
40     const int MAX_DOWNLOAD_COMMIT_LIMIT = 5;
41 }
42 
CloudSyncer(std::shared_ptr<StorageProxy> storageProxy)43 CloudSyncer::CloudSyncer(std::shared_ptr<StorageProxy> storageProxy)
44     : currentTaskId_(INVALID_TASK_ID),
45       storageProxy_(std::move(storageProxy)),
46       queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
47       closed_(false),
48       timerId_(0u),
49       heartBeatCount_(0),
50       failedHeartBeatCount_(0),
51       syncCallbackCount_(0)
52 {
53     if (storageProxy_ != nullptr) {
54         id_ = storageProxy_->GetIdentify();
55     }
56 }
57 
Sync(const std::vector<DeviceID> & devices,SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & callback,int64_t waitTime)58 int CloudSyncer::Sync(const std::vector<DeviceID> &devices, SyncMode mode,
59     const std::vector<std::string> &tables, const SyncProcessCallback &callback, int64_t waitTime)
60 {
61     int errCode = CheckParamValid(devices, mode);
62     if (errCode != E_OK) {
63         return errCode;
64     }
65     if (cloudDB_.IsNotExistCloudDB()) {
66         return -E_CLOUD_ERROR;
67     }
68     if (closed_) {
69         return -E_DB_CLOSED;
70     }
71     CloudTaskInfo taskInfo;
72     taskInfo.mode = mode;
73     taskInfo.table = tables;
74     taskInfo.callback = callback;
75     taskInfo.timeout = waitTime;
76     taskInfo.devices = devices;
77     errCode = TryToAddSyncTask(std::move(taskInfo));
78     if (errCode != E_OK) {
79         return errCode;
80     }
81     return TriggerSync();
82 }
83 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)84 void CloudSyncer::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
85 {
86     cloudDB_.SetCloudDB(cloudDB);
87     LOGI("[CloudSyncer] SetCloudDB finish");
88 }
89 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)90 void CloudSyncer::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
91 {
92     cloudDB_.SetIAssetLoader(loader);
93     LOGI("[CloudSyncer] SetIAssetLoader finish");
94 }
95 
Close()96 void CloudSyncer::Close()
97 {
98     closed_ = true;
99     CloudSyncer::TaskId currentTask;
100     {
101         std::lock_guard<std::mutex> autoLock(contextLock_);
102         currentTask = currentContext_.currentTaskId;
103     }
104     // mark current task db_closed
105     SetTaskFailed(currentTask, -E_DB_CLOSED);
106     cloudDB_.Close();
107     {
108         LOGD("[CloudSyncer] begin wait current task finished");
109         std::unique_lock<std::mutex> uniqueLock(contextLock_);
110         contextCv_.wait(uniqueLock, [this]() {
111             return currentContext_.currentTaskId == INVALID_TASK_ID;
112         });
113         LOGD("[CloudSyncer] current task has been finished");
114     }
115 
116     // copy all task from queue
117     std::vector<CloudTaskInfo> infoList;
118     {
119         std::lock_guard<std::mutex> autoLock(queueLock_);
120         for (const auto &item: cloudTaskInfos_) {
121             infoList.push_back(item.second);
122         }
123         taskQueue_.clear();
124         cloudTaskInfos_.clear();
125     }
126     // notify all DB_CLOSED
127     for (auto &info: infoList) {
128         info.status = ProcessStatus::FINISHED;
129         info.errCode = -E_DB_CLOSED;
130         ProcessNotifier notifier(this);
131         notifier.Init(info.table, info.devices);
132         notifier.NotifyProcess(info, {}, true);
133         LOGI("[CloudSyncer] finished taskId %" PRIu64 " errCode %d", info.taskId, info.errCode);
134     }
135     storageProxy_->Close();
136     WaitAllSyncCallbackTaskFinish();
137 }
138 
TriggerSync()139 int CloudSyncer::TriggerSync()
140 {
141     if (closed_) {
142         return -E_DB_CLOSED;
143     }
144     RefObject::IncObjRef(this);
145     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
146         DoSyncIfNeed();
147         RefObject::DecObjRef(this);
148     });
149     if (errCode != E_OK) {
150         LOGW("[CloudSyncer] schedule sync task failed %d", errCode);
151         RefObject::DecObjRef(this);
152     }
153     return errCode;
154 }
155 
DoSyncIfNeed()156 void CloudSyncer::DoSyncIfNeed()
157 {
158     if (closed_) {
159         return;
160     }
161     // get taskId from queue
162     TaskId triggerTaskId;
163     {
164         std::lock_guard<std::mutex> autoLock(queueLock_);
165         if (taskQueue_.empty()) {
166             return;
167         }
168         triggerTaskId = taskQueue_.front();
169     }
170     // pop taskId in queue
171     if (PrepareSync(triggerTaskId) != E_OK) {
172         return;
173     }
174     // do sync logic
175     int errCode = DoSync(triggerTaskId);
176     // finished after sync
177     DoFinished(triggerTaskId, errCode, {});
178     // do next task async
179     (void)TriggerSync();
180 }
181 
DoSync(TaskId taskId)182 int CloudSyncer::DoSync(TaskId taskId)
183 {
184     std::lock_guard<std::mutex> lock(syncMutex_);
185     CloudTaskInfo taskInfo;
186     {
187         std::lock_guard<std::mutex> autoLock(queueLock_);
188         taskInfo = cloudTaskInfos_[taskId];
189     }
190     int errCode = LockCloud(taskId);
191     if (errCode != E_OK) {
192         return errCode;
193     }
194     bool needUpload = true;
195     {
196         std::lock_guard<std::mutex> autoLock(contextLock_);
197         needUpload = currentContext_.strategy->JudgeUpload();
198     }
199     errCode = DoSyncInner(taskInfo, needUpload);
200     int unlockCode = UnlockCloud();
201     if (errCode == E_OK) {
202         errCode = unlockCode;
203     }
204     return errCode;
205 }
206 
DoUploadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload)207 int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload)
208 {
209     if (!needUpload) {
210         return E_OK;
211     }
212     int errCode = storageProxy_->StartTransaction();
213     if (errCode != E_OK) {
214         LOGE("[CloudSyncer] start transaction Failed before doing upload.");
215         return errCode;
216     }
217     for (size_t i = 0; i < taskInfo.table.size(); ++i) {
218         LOGD("[CloudSyncer] try upload table, index: %zu", i);
219         errCode = CheckTaskIdValid(taskInfo.taskId);
220         if (errCode != E_OK) {
221             LOGE("[CloudSyncer] task is invalid, abort sync");
222             break;
223         }
224         {
225             std::lock_guard<std::mutex> autoLock(contextLock_);
226             currentContext_.tableName = taskInfo.table[i];
227         }
228         errCode = DoUpload(taskInfo.taskId, i == (taskInfo.table.size() - 1u));
229         if (errCode != E_OK) {
230             LOGE("[CloudSyncer] upload failed %d", errCode);
231             break;
232         }
233         errCode = SaveCloudWaterMark(taskInfo.table[i]);
234         if (errCode != E_OK) {
235             LOGE("[CloudSyncer] Can not save cloud water mark after uploading %d", errCode);
236             break;
237         }
238     }
239     if (errCode == E_OK) {
240         int commitErrorCode = storageProxy_->Commit();
241         if (commitErrorCode != E_OK) {
242             LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode);
243         }
244     } else {
245         int rollBackErrorCode = storageProxy_->Rollback();
246         if (rollBackErrorCode != E_OK) {
247             LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
248         }
249     }
250     return errCode;
251 }
252 
DoSyncInner(const CloudTaskInfo & taskInfo,const bool needUpload)253 int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo, const bool needUpload)
254 {
255     int errCode = E_OK;
256     for (size_t i = 0; i < taskInfo.table.size(); ++i) {
257         LOGD("[CloudSyncer] try download table, index: %zu", i);
258         errCode = CheckTaskIdValid(taskInfo.taskId);
259         if (errCode != E_OK) {
260             LOGE("[CloudSyncer] task is invalid, abort sync");
261             return errCode;
262         }
263         {
264             std::lock_guard<std::mutex> autoLock(contextLock_);
265             currentContext_.tableName = taskInfo.table[i];
266         }
267         errCode = DoDownload(taskInfo.taskId);
268         if (errCode != E_OK) {
269             LOGE("[CloudSyncer] download failed %d", errCode);
270             return errCode;
271         }
272         if (needUpload) {
273             continue;
274         }
275         errCode = SaveCloudWaterMark(taskInfo.table[i]);
276         if (errCode != E_OK) {
277             LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode);
278             return errCode;
279         }
280     }
281 
282     return DoUploadInNeed(taskInfo, needUpload);
283 }
284 
DoFinished(TaskId taskId,int errCode,const InnerProcessInfo & processInfo)285 void CloudSyncer::DoFinished(TaskId taskId, int errCode, const InnerProcessInfo &processInfo)
286 {
287     {
288         std::lock_guard<std::mutex> autoLock(queueLock_);
289         taskQueue_.remove(taskId);
290     }
291     std::shared_ptr<ProcessNotifier> notifier = nullptr;
292     {
293         // check current task is running or not
294         std::lock_guard<std::mutex> autoLock(contextLock_);
295         if (currentContext_.currentTaskId != taskId) { // should not happen
296             LOGW("[CloudSyncer] taskId %" PRIu64 " not exist in context!", taskId);
297             return;
298         }
299         currentContext_.currentTaskId = INVALID_TASK_ID;
300         notifier = currentContext_.notifier;
301         currentContext_.notifier = nullptr;
302         currentContext_.strategy = nullptr;
303         currentContext_.tableName.clear();
304         currentContext_.assetDownloadList.completeDownloadList.clear();
305         currentContext_.assetDownloadList.downloadList.clear();
306         currentContext_.assetFields.clear();
307         currentContext_.assetsInfo.clear();
308         currentContext_.cloudWaterMarks.clear();
309     }
310     CloudTaskInfo info;
311     {
312         std::lock_guard<std::mutex> autoLock(queueLock_);
313         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen
314             LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId);
315             contextCv_.notify_one();
316             return;
317         }
318         info = std::move(cloudTaskInfos_[taskId]);
319         cloudTaskInfos_.erase(taskId);
320     }
321     contextCv_.notify_one();
322     if (info.errCode == E_OK) {
323         info.errCode = errCode;
324     }
325     LOGI("[CloudSyncer] finished taskId %" PRIu64 " errCode %d", taskId, info.errCode);
326     info.status = ProcessStatus::FINISHED;
327     if (notifier != nullptr) {
328         notifier->NotifyProcess(info, processInfo, true);
329     }
330 }
331 
SaveChangedDataByType(const VBucket & datum,ChangedData & changedData,const DataInfoWithLog & localInfo,ChangeType type)332 static int SaveChangedDataByType(const VBucket &datum, ChangedData &changedData, const DataInfoWithLog &localInfo,
333     ChangeType type)
334 {
335     int ret = E_OK;
336     std::vector<Type> cloudPkVals;
337     if (type == ChangeType::OP_DELETE) {
338         ret = GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
339     } else {
340         ret = GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
341     }
342     if (ret != E_OK) {
343         return ret;
344     }
345     changedData.primaryData[type].emplace_back(std::move(cloudPkVals));
346     return E_OK;
347 }
348 
FindDeletedListIndex(const std::vector<std::pair<Key,size_t>> & deletedList,const Key & hashKey,size_t & delIdx)349 int CloudSyncer::FindDeletedListIndex(const std::vector<std::pair<Key, size_t>> &deletedList, const Key &hashKey,
350     size_t &delIdx)
351 {
352     for (std::pair<Key, size_t> pair : deletedList) {
353         if (pair.first == hashKey) {
354             delIdx = pair.second;
355             return E_OK;
356         }
357     }
358     return -E_INTERNAL_ERROR;
359 }
360 
SaveChangedData(SyncParam & param,int downIndex,const DataInfo & dataInfo,std::vector<std::pair<Key,size_t>> & deletedList)361 int CloudSyncer::SaveChangedData(SyncParam &param, int downIndex, const DataInfo &dataInfo,
362     std::vector<std::pair<Key, size_t>> &deletedList)
363 {
364     OpType opType = param.downloadData.opType[downIndex];
365     Key hashKey = dataInfo.localInfo.logInfo.hashKey;
366     if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) {
367         if (opType == OpType::INSERT) {
368             size_t delIdx;
369             int errCode = FindDeletedListIndex(deletedList, hashKey, delIdx);
370             if (errCode != E_OK) {
371                 LOGE("[CloudSyncer] FindDeletedListIndex could not find delete item.");
372                 return errCode;
373             }
374             param.changedData.primaryData[ChangeType::OP_DELETE].erase(
375                 param.changedData.primaryData[ChangeType::OP_DELETE].begin() + delIdx);
376             (void)param.dupHashKeySet.insert(hashKey);
377             opType = OpType::UPDATE;
378             // only composite primary key needs to be processed.
379             if (!param.isSinglePrimaryKey) {
380                 param.withoutRowIdData.updateData.push_back(std::make_tuple(downIndex,
381                     param.changedData.primaryData[ChangeType::OP_UPDATE].size()));
382             }
383         } else if (opType == OpType::DELETE) {
384             std::pair<Key, size_t> pair{hashKey, static_cast<size_t>(
385                 param.changedData.primaryData[ChangeType::OP_DELETE].size())};
386             deletedList.emplace_back(pair);
387         } else {
388             LOGW("[CloudSyncer] deletePrimaryKeySet ignore opType %d.", opType);
389         }
390     }
391     // INSERT: for no primary key or composite primary key situation
392     if (!param.isSinglePrimaryKey && opType == OpType::INSERT) {
393         param.withoutRowIdData.insertData.push_back(downIndex);
394         return E_OK;
395     }
396     switch (opType) {
397         // INSERT: only for single primary key situation
398         case OpType::INSERT:
399             return SaveChangedDataByType(
400                 param.downloadData.data[downIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT);
401         case OpType::UPDATE:
402             if (NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
403                 return SaveChangedDataByType(param.downloadData.data[downIndex], param.changedData,
404                     dataInfo.localInfo, ChangeType::OP_UPDATE);
405             }
406             break;
407         case OpType::DELETE:
408             return SaveChangedDataByType(param.downloadData.data[downIndex], param.changedData,
409                 dataInfo.localInfo, ChangeType::OP_DELETE);
410         default:
411             break;
412     }
413     return E_OK;
414 }
415 
GetCloudLogInfo(VBucket & datum)416 static LogInfo GetCloudLogInfo(VBucket &datum)
417 {
418     LogInfo cloudLogInfo = { 0 };
419     cloudLogInfo.timestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::MODIFY_FIELD]);
420     cloudLogInfo.wTimestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::CREATE_FIELD]);
421     cloudLogInfo.flag = (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u;
422     cloudLogInfo.cloudGid = std::get<std::string>(datum[CloudDbConstant::GID_FIELD]);
423     return cloudLogInfo;
424 }
425 
426 /**
427  * UpdateChangedData will be used for Insert case, which we can only get rowid after we saved data in db.
428 */
UpdateChangedData(SyncParam & param,AssetDownloadList & assetsDownloadList)429 int CloudSyncer::UpdateChangedData(SyncParam &param, AssetDownloadList &assetsDownloadList)
430 {
431     if (param.withoutRowIdData.insertData.empty() && param.withoutRowIdData.updateData.empty()) {
432         return E_OK;
433     }
434     int ret = E_OK;
435     for (size_t j : param.withoutRowIdData.insertData) {
436         VBucket &datum = param.downloadData.data[j];
437         std::vector<Type> primaryValues;
438         ret = GetCloudPkVals(datum, param.changedData.field,
439             std::get<int64_t>(datum[CloudDbConstant::ROW_ID_FIELD_NAME]), primaryValues);
440         if (ret != E_OK) {
441             LOGE("[CloudSyncer] updateChangedData cannot get primaryValues");
442             return ret;
443         }
444         param.changedData.primaryData[ChangeType::OP_INSERT].push_back(primaryValues);
445     }
446     for (const auto &tuple : param.withoutRowIdData.assetInsertData) {
447         size_t downloadIndex = std::get<0>(tuple);
448         VBucket &datum = param.downloadData.data[downloadIndex];
449         size_t insertIdx = std::get<1>(tuple);
450         std::vector<Type> &pkVal = std::get<5>(assetsDownloadList.downloadList[insertIdx]); // 5 means primary key list
451         pkVal[0] = datum[CloudDbConstant::ROW_ID_FIELD_NAME];
452     }
453     for (const auto &tuple : param.withoutRowIdData.updateData) {
454         size_t downloadIndex = std::get<0>(tuple);
455         size_t updateIndex = std::get<1>(tuple);
456         VBucket &datum = param.downloadData.data[downloadIndex];
457         size_t size = param.changedData.primaryData[ChangeType::OP_UPDATE].size();
458         if (updateIndex >= size) {
459             LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size);
460             return -E_INTERNAL_ERROR;
461         }
462         if (param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) {
463             LOGE("[CloudSyncer] primary key value list should not be empty.");
464             return -E_INTERNAL_ERROR;
465         }
466         // no primary key or composite primary key, the first element is rowid
467         param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] =
468             datum[CloudDbConstant::ROW_ID_FIELD_NAME];
469     }
470     return ret;
471 }
472 
IsDataContainDuplicateAsset(const std::vector<Field> & assetFields,VBucket & data)473 bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data)
474 {
475     for (const auto &assetField : assetFields) {
476         if (assetField.type == TYPE_INDEX<Assets> && data[assetField.colName].index() == TYPE_INDEX<Assets>) {
477             if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get<Assets>(data[assetField.colName]))) {
478                 return true;
479             }
480         }
481     }
482     return false;
483 }
484 
IsDataContainAssets()485 bool CloudSyncer::IsDataContainAssets()
486 {
487     std::lock_guard<std::mutex> autoLock(contextLock_);
488     bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end());
489     if (!hasTable) {
490         LOGW("[CloudSyncer] failed to get assetFields, because tableName doesn't exit in currentContext, %d.",
491             -E_INTERNAL_ERROR);
492             return false;
493     }
494     if (hasTable && currentContext_.assetFields[currentContext_.tableName].empty()) {
495         LOGI("[CloudSyncer] Current table do not contain assets, thereby we needn't download assets");
496         return false;
497     }
498     return true;
499 }
500 
IncSyncCallbackTaskCount()501 void CloudSyncer::IncSyncCallbackTaskCount()
502 {
503     std::lock_guard<std::mutex> autoLock(syncCallbackMutex_);
504     syncCallbackCount_++;
505 }
506 
DecSyncCallbackTaskCount()507 void CloudSyncer::DecSyncCallbackTaskCount()
508 {
509     {
510         std::lock_guard<std::mutex> autoLock(syncCallbackMutex_);
511         syncCallbackCount_--;
512     }
513     syncCallbackCv_.notify_all();
514 }
515 
WaitAllSyncCallbackTaskFinish()516 void CloudSyncer::WaitAllSyncCallbackTaskFinish()
517 {
518     std::unique_lock<std::mutex> uniqueLock(syncCallbackMutex_);
519     LOGD("[CloudSyncer] Begin wait all callback task finish");
520     syncCallbackCv_.wait(uniqueLock, [this]() {
521         return syncCallbackCount_ <= 0;
522     });
523     LOGD("[CloudSyncer] End wait all callback task finish");
524 }
525 
TagAssetsInSingleRecord(VBucket & coveredData,VBucket & beCoveredData,bool setNormalStatus,int & errCode)526 std::map<std::string, Assets> CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
527     bool setNormalStatus, int &errCode)
528 {
529     // Define a map to store the result
530     std::map<std::string, Assets> res = {};
531     std::vector<Field> assetFields;
532     {
533         std::lock_guard<std::mutex> autoLock(contextLock_);
534         assetFields = currentContext_.assetFields[currentContext_.tableName];
535     }
536     // For every column contain asset or assets, assetFields are in context
537     for (const Field &assetField : assetFields) {
538         Assets assets = TagAssetsInSingleCol(coveredData, beCoveredData, assetField, setNormalStatus, errCode);
539         if (!assets.empty()) {
540             res[assetField.colName] = assets;
541         }
542         if (errCode != E_OK) {
543             break;
544         }
545     }
546     return res;
547 }
548 
FillCloudAssets(const std::string & tableName,VBucket & normalAssets,VBucket & failedAssets)549 int CloudSyncer::FillCloudAssets(
550     const std::string &tableName, VBucket &normalAssets, VBucket &failedAssets)
551 {
552     int ret = E_OK;
553     if (normalAssets.size() > 1) {
554         ret = storageProxy_->FillCloudAssetForDownload(tableName, normalAssets, true);
555         if (ret != E_OK) {
556             LOGE("[CloudSyncer] Can not fill normal cloud assets for download");
557             return ret;
558         }
559     }
560     if (failedAssets.size() > 1) {
561         ret = storageProxy_->FillCloudAssetForDownload(tableName, failedAssets, false);
562         if (ret != E_OK) {
563             LOGE("[CloudSyncer] Can not fill abnormal assets for download");
564             return ret;
565         }
566     }
567     return E_OK;
568 }
569 
HandleDownloadResult(const std::string & tableName,DownloadCommitList & commitList,uint32_t & successCount)570 int CloudSyncer::HandleDownloadResult(const std::string &tableName, DownloadCommitList &commitList,
571     uint32_t &successCount)
572 {
573     successCount = 0;
574     int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
575     if (errCode != E_OK) {
576         LOGE("[CloudSyncer] start transaction Failed before handle download.");
577         return errCode;
578     }
579     errCode = storageProxy_->SetLogTriggerStatus(false);
580     if (errCode != E_OK) {
581         return errCode;
582     }
583     for (size_t i = 0; i < commitList.size(); i++) {
584         std::string gid = std::get<0>(commitList[i]); // 0 means gid is the first element in assetsInfo
585         // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
586         std::map<std::string, Assets> assetsMap = std::get<1>(commitList[i]);
587         bool setAllNormal = std::get<2>(commitList[i]); // 2 means whether the download return is E_OK
588         VBucket normalAssets;
589         VBucket failedAssets;
590         normalAssets[CloudDbConstant::GID_FIELD] = gid;
591         failedAssets[CloudDbConstant::GID_FIELD] = gid;
592         for (auto &assetKvPair : assetsMap) {
593             Assets &assets = assetKvPair.second;
594             if (setAllNormal) {
595                 normalAssets[assetKvPair.first] = std::move(assets);
596             } else {
597                 failedAssets[assetKvPair.first] = std::move(assets);
598             }
599         }
600         errCode = FillCloudAssets(tableName, normalAssets, failedAssets);
601         if (errCode != E_OK) {
602             break;
603         }
604         successCount++;
605     }
606     errCode = storageProxy_->SetLogTriggerStatus(true);
607     if (errCode != E_OK) {
608         LOGE("Set log trigger true failed when handle download.%d", errCode);
609         successCount = 0;
610         storageProxy_->Rollback();
611         return errCode;
612     }
613     errCode = storageProxy_->Commit();
614     if (errCode != E_OK) {
615         successCount = 0;
616     }
617     return errCode;
618 }
619 
CloudDbDownloadAssets(InnerProcessInfo & info,DownloadList & downloadList,bool willHandleResult,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)620 int CloudSyncer::CloudDbDownloadAssets(InnerProcessInfo &info, DownloadList &downloadList, bool willHandleResult,
621     const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
622 {
623     int downloadStatus = E_OK;
624     DownloadCommitList commitList;
625     for (size_t i = 0; i < downloadList.size(); i++) {
626         DownloadItem downloadItem;
627         GetDownloadItem(downloadList, i, downloadItem);
628         std::map<std::string, Assets> downloadAssets(downloadItem.assets);
629         CloudStorageUtils::EraseNoChangeAsset(downloadAssets);
630         if (downloadAssets.empty()) { // Download data (include deleting)
631             continue;
632         }
633         int errorCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, downloadAssets);
634         if (errorCode == -E_NOT_SET) {
635             info.downLoadInfo.failCount += (downloadList.size() - i);
636             info.downLoadInfo.successCount -= (downloadList.size() - i);
637             return -E_NOT_SET;
638         }
639         if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
640             changedAssets.primaryData[OpTypeToChangeType(downloadItem.strategy)].push_back(
641                 downloadItem.primaryKeyValList);
642         } else if (downloadItem.strategy == OpType::INSERT) {
643             changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
644         }
645         if (!willHandleResult) {
646             continue;
647         }
648         CloudStorageUtils::MergeDownloadAsset(downloadAssets, downloadItem.assets);
649         // Process result of each asset
650         commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK));
651         downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus;
652         if ((i + 1) % MAX_DOWNLOAD_COMMIT_LIMIT == 0 || i == (commitList.size() - 1)) {
653             int ret = CommitDownloadResult(info, commitList);
654             if (ret != E_OK) {
655                 return ret;
656             }
657         }
658     }
659     if (!commitList.empty()) {
660         int ret = CommitDownloadResult(info, commitList);
661         if (ret != E_OK) {
662             return ret;
663         }
664     }
665     LOGD("Download status is %d", downloadStatus);
666     return downloadStatus;
667 }
668 
GetDownloadItem(const DownloadList & downloadList,size_t i,DownloadItem & downloadItem)669 void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem)
670 {
671     downloadItem.gid = std::get<0>(downloadList[i]); // 0 means gid is the first element in assetsInfo
672     downloadItem.prefix = std::get<1>(downloadList[i]); // 1 means primaryKey is the second element in assetsInfo
673     downloadItem.strategy = std::get<2>(downloadList[i]); // 2 means strategy is the third element in assetsInfo
674     // 3 means assets info [colName, assets] is the forth element in downloadList[i]
675     downloadItem.assets = std::get<3>(downloadList[i]);
676     downloadItem.hashKey = std::get<4>(downloadList[i]); // 4 means hash key
677     downloadItem.primaryKeyValList = std::get<5>(downloadList[i]); // 5 means primary key value list
678 }
679 
DownloadAssets(InnerProcessInfo & info,const std::vector<std::string> & pKColNames,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)680 int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
681     const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
682 {
683     if (!IsDataContainAssets()) {
684         return E_OK;
685     }
686     // update changed data info
687     if (!IsChngDataEmpty(changedAssets)) {
688         // changedData.primaryData should have no member inside
689         return -E_INVALID_ARGS;
690     }
691     changedAssets.tableName = info.tableName;
692     changedAssets.type = ChangedDataType::ASSET;
693     changedAssets.field = pKColNames;
694     // Get AssetDownloadList
695     DownloadList downloadList;
696     DownloadList completeDeletedList;
697     {
698         std::lock_guard<std::mutex> autoLock(contextLock_);
699         downloadList = currentContext_.assetDownloadList.downloadList;
700         completeDeletedList = currentContext_.assetDownloadList.completeDownloadList;
701     }
702     // Download data (include deleting) will handle return Code in this situation
703     int ret = CloudDbDownloadAssets(info, downloadList, true, dupHashKeySet, changedAssets);
704     if (ret != E_OK) {
705         LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret);
706         return ret;
707     }
708     // Download data (include deleting), won't handle return Code in this situation
709     ret = CloudDbDownloadAssets(info, completeDeletedList, false, dupHashKeySet, changedAssets);
710     if (ret != E_OK) {
711         LOGE("[CloudSyncer] Can not download assets or can not handle download result for deleted record %d", ret);
712     }
713     return ret;
714 }
715 
GetAssetsFromVBucket(VBucket & data)716 std::map<std::string, Assets> CloudSyncer::GetAssetsFromVBucket(VBucket &data)
717 {
718     std::map<std::string, Assets> assets;
719     std::vector<Field> fields;
720     {
721         std::lock_guard<std::mutex> autoLock(contextLock_);
722         fields = currentContext_.assetFields[currentContext_.tableName];
723     }
724     for (const auto &field : fields) {
725         if (data.find(field.colName) != data.end()) {
726             if (field.type == TYPE_INDEX<Asset> && data[field.colName].index() == TYPE_INDEX<Asset>) {
727                 assets[field.colName] = { std::get<Asset>(data[field.colName]) };
728             } else if (field.type == TYPE_INDEX<Assets> && data[field.colName].index() == TYPE_INDEX<Assets>) {
729                 assets[field.colName] = std::get<Assets>(data[field.colName]);
730             } else {
731                 Assets emptyAssets;
732                 assets[field.colName] = emptyAssets;
733             }
734         }
735     }
736     return assets;
737 }
738 
TagStatus(bool isExist,SyncParam & param,size_t idx,DataInfo & dataInfo,VBucket & localAssetInfo)739 int CloudSyncer::TagStatus(bool isExist, SyncParam &param, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo)
740 {
741     OpType strategyOpResult = OpType::NOT_HANDLE;
742     int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
743     if (errCode != E_OK) {
744         return errCode;
745     }
746     param.downloadData.opType[idx] = strategyOpResult;
747     if (!IsDataContainAssets()) {
748         return E_OK;
749     }
750     Key hashKey;
751     if (isExist) {
752         hashKey = dataInfo.localInfo.logInfo.hashKey;
753     }
754     return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo);
755 }
756 
TagDownloadAssets(const Key & hashKey,size_t idx,SyncParam & param,DataInfo & dataInfo,VBucket & localAssetInfo)757 int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam &param, DataInfo &dataInfo,
758     VBucket &localAssetInfo)
759 {
760     int ret = E_OK;
761     OpType strategy = param.downloadData.opType[idx];
762     switch (strategy) {
763         case OpType::INSERT:
764         case OpType::UPDATE:
765         case OpType::DELETE:
766             ret = HandleTagAssets(hashKey, idx, param, dataInfo, localAssetInfo);
767             break;
768         case OpType::NOT_HANDLE:
769         case OpType::ONLY_UPDATE_GID:
770         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data
771             // Save the asset info into context
772             std::map<std::string, Assets> assetsMap = GetAssetsFromVBucket(param.downloadData.data[idx]);
773             {
774                 std::lock_guard<std::mutex> autoLock(contextLock_);
775                 if (currentContext_.assetsInfo.find(param.tableName) == currentContext_.assetsInfo.end()) {
776                     currentContext_.assetsInfo[param.tableName] = {};
777                 }
778                 currentContext_.assetsInfo[param.tableName][dataInfo.cloudLogInfo.cloudGid] = assetsMap;
779             }
780             break;
781         }
782         default:
783             break;
784     }
785     return ret;
786 }
787 
HandleTagAssets(const Key & hashKey,size_t idx,SyncParam & param,DataInfo & dataInfo,VBucket & localAssetInfo)788 int CloudSyncer::HandleTagAssets(const Key &hashKey, size_t idx, SyncParam &param, DataInfo &dataInfo,
789     VBucket &localAssetInfo)
790 {
791     Type prefix;
792     std::vector<Type> pkVals;
793     OpType strategy = param.downloadData.opType[idx];
794     bool isDelStrategy = (strategy == OpType::DELETE);
795     int ret = GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys : param.downloadData.data[idx],
796         param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
797     if (ret != E_OK) {
798         LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret);
799         return ret;
800     }
801     prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
802     if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
803         LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
804         return -E_INTERNAL_ERROR;
805     }
806     std::map<std::string, Assets> assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo,
807         false, ret);
808     if (ret != E_OK) {
809         LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR");
810         return ret;
811     }
812     if (isDelStrategy) {
813         param.assetsDownloadList.completeDownloadList.push_back(
814             std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey, pkVals));
815     } else {
816         if (!param.isSinglePrimaryKey && strategy == OpType::INSERT) {
817             param.withoutRowIdData.assetInsertData.push_back(std::make_tuple(idx,
818                 param.assetsDownloadList.downloadList.size()));
819         }
820         param.assetsDownloadList.downloadList.push_back(
821             std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey, pkVals));
822     }
823     return ret;
824 }
825 
SaveDatum(SyncParam & param,size_t idx,std::vector<std::pair<Key,size_t>> & deletedList)826 int CloudSyncer::SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList)
827 {
828     int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames);
829     if (ret != E_OK) {
830         LOGE("[CloudSyncer] Invalid download data:%d", ret);
831         return ret;
832     }
833     ModifyCloudDataTime(param.downloadData.data[idx]);
834     DataInfo dataInfo;
835     VBucket localAssetInfo;
836     bool isExist = true;
837     ret = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[idx], dataInfo.localInfo,
838         localAssetInfo);
839     if (ret == -E_NOT_FOUND) {
840         isExist = false;
841     } else if (ret != E_OK) {
842         LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret);
843         return ret;
844     }
845     // Get cloudLogInfo from cloud data
846     dataInfo.cloudLogInfo = GetCloudLogInfo(param.downloadData.data[idx]);
847     // Tag datum to get opType
848     ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo);
849     if (ret != E_OK) {
850         LOGE("[CloudSyncer] Cannot tag status: %d.", ret);
851         return ret;
852     }
853     ret = SaveChangedData(param, idx, dataInfo, deletedList);
854     if (ret != E_OK) {
855         LOGE("[CloudSyncer] Cannot save changed data: %d.", ret);
856     }
857     return ret;
858 }
859 
ClearWithoutData(SyncParam & param)860 void CloudSyncer::ClearWithoutData(SyncParam &param)
861 {
862     param.withoutRowIdData.insertData.clear();
863     param.withoutRowIdData.updateData.clear();
864     param.withoutRowIdData.assetInsertData.clear();
865 }
866 
SaveData(SyncParam & param)867 int CloudSyncer::SaveData(SyncParam &param)
868 {
869     if (!IsChngDataEmpty(param.changedData)) {
870         LOGE("[CloudSyncer] changedData.primaryData should have no member inside.");
871         return -E_INVALID_ARGS;
872     }
873     // Update download batch Info
874     param.info.downLoadInfo.batchIndex += 1;
875     param.info.downLoadInfo.total += param.downloadData.data.size();
876     int ret = E_OK;
877     AssetDownloadList assetsDownloadList;
878     param.assetsDownloadList = assetsDownloadList;
879     param.deletePrimaryKeySet.clear();
880     param.dupHashKeySet.clear();
881     ClearWithoutData(param);
882     std::vector<std::pair<Key, size_t>> deletedList;
883     for (size_t i = 0; i < param.downloadData.data.size(); i++) {
884         ret = SaveDatum(param, i, deletedList);
885         if (ret != E_OK) {
886             param.info.downLoadInfo.failCount += param.downloadData.data.size();
887             LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret);
888             return ret;
889         }
890     }
891     // Save assetsMap into current context
892     {
893         std::lock_guard<std::mutex> autoLock(contextLock_);
894         currentContext_.assetDownloadList = param.assetsDownloadList;
895     }
896     // save the data to the database by batch, downloadData will return rowid when insert data.
897     ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData);
898     if (ret != E_OK) {
899         param.info.downLoadInfo.failCount += param.downloadData.data.size();
900         LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret);
901         return ret;
902     }
903     ret = UpdateChangedData(param, currentContext_.assetDownloadList);
904     if (ret != E_OK) {
905         param.info.downLoadInfo.failCount += param.downloadData.data.size();
906         LOGE("[CloudSyncer] Cannot update changed data: %d.", ret);
907         return ret;
908     }
909     // Update downloadInfo
910     param.info.downLoadInfo.successCount += param.downloadData.data.size();
911     // Get latest cloudWaterMark
912     VBucket &lastData = param.downloadData.data.back();
913     param.cloudWaterMark = std::get<std::string>(lastData[CloudDbConstant::CURSOR_FIELD]);
914     return E_OK;
915 }
916 
PreCheck(CloudSyncer::TaskId & taskId,const TableName & tableName)917 int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName)
918 {
919     // Check Input and Context Validity
920     int ret = CheckTaskIdValid(taskId);
921     if (ret != E_OK) {
922         return ret;
923     }
924     {
925         std::lock_guard<std::mutex> autoLock(queueLock_);
926         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
927             LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: , %" PRIu64 ".", taskId);
928             return -E_INVALID_ARGS;
929         }
930     }
931     if (currentContext_.strategy == nullptr) {
932         LOGE("[CloudSyncer] Strategy has not been initialized");
933         return -E_INVALID_ARGS;
934     }
935     ret = storageProxy_->CheckSchema(tableName);
936     if (ret != E_OK) {
937         LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret);
938         return ret;
939     }
940     return E_OK;
941 }
942 
NeedNotifyChangedData(const ChangedData & changedData)943 bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData)
944 {
945     {
946         std::lock_guard<std::mutex> autoLock(contextLock_);
947         if (IsModeForcePush(currentContext_.currentTaskId)) {
948             return false;
949         }
950     }
951     // when there have no data been changed, it needn't notified
952     if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() &&
953         changedData.primaryData[OP_DELETE].empty()) {
954             return false;
955         }
956     return true;
957 }
958 
NotifyChangedData(ChangedData && changedData)959 int CloudSyncer::NotifyChangedData(ChangedData &&changedData)
960 {
961     if (!NeedNotifyChangedData(changedData)) {
962         return E_OK;
963     }
964     std::string deviceName;
965     {
966         std::lock_guard<std::mutex> autoLock(contextLock_);
967         std::vector<std::string> devices = currentContext_.notifier->GetDevices();
968         if (devices.empty()) {
969             LOGE("[CloudSyncer] CurrentContext do not contain device info");
970             return -E_CLOUD_ERROR;
971         }
972         // We use first device name as the target of NotifyChangedData
973         deviceName = devices[0];
974     }
975     int ret = storageProxy_->NotifyChangedData(deviceName, std::move(changedData));
976     if (ret != E_OK) {
977         LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret);
978     }
979     return ret;
980 }
981 
NotifyInDownload(CloudSyncer::TaskId taskId,SyncParam & param)982 void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam &param)
983 {
984     CloudTaskInfo taskInfo;
985     {
986         std::lock_guard<std::mutex> autoLock(queueLock_);
987         taskInfo = cloudTaskInfos_[taskId];
988     }
989     std::lock_guard<std::mutex> autoLock(contextLock_);
990 
991     if (currentContext_.strategy->JudgeUpload()) {
992         currentContext_.notifier->NotifyProcess(taskInfo, param.info);
993     } else {
994         if (param.isLastBatch) {
995             param.info.tableStatus = ProcessStatus::FINISHED;
996         }
997         if (taskInfo.table.back() == param.tableName && param.isLastBatch) {
998             currentContext_.notifier->UpdateProcess(param.info);
999         } else {
1000             currentContext_.notifier->NotifyProcess(taskInfo, param.info);
1001         }
1002     }
1003 }
1004 
SaveDataInTransaction(CloudSyncer::TaskId taskId,SyncParam & param)1005 int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam &param)
1006 {
1007     int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1008     if (ret != E_OK) {
1009         LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret);
1010         return ret;
1011     }
1012     if (!IsModeForcePush(taskId)) {
1013         param.changedData.tableName = param.info.tableName;
1014         param.changedData.field = param.pkColNames;
1015         param.changedData.type = ChangedDataType::DATA;
1016     }
1017     ret = SaveData(param);
1018     if (ret != E_OK) {
1019         LOGE("[CloudSyncer] cannot save data: %d.", ret);
1020         int rollBackErrorCode = storageProxy_->Rollback();
1021         if (rollBackErrorCode != E_OK) {
1022             LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1023         } else {
1024             LOGI("[CloudSyncer] Roll back transaction success: %d.", ret);
1025         }
1026         return ret;
1027     }
1028     ret = storageProxy_->Commit();
1029     if (ret != E_OK) {
1030         LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret);
1031     }
1032     return ret;
1033 }
1034 
SaveDataNotifyProcess(CloudSyncer::TaskId taskId,SyncParam & param)1035 int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam &param)
1036 {
1037     ChangedData changedData;
1038     param.changedData = changedData;
1039     param.downloadData.opType.resize(param.downloadData.data.size());
1040     int ret = SaveDataInTransaction(taskId, param);
1041     if (ret != E_OK) {
1042         return ret;
1043     }
1044     // call OnChange to notify changedData object first time (without Assets)
1045     ret = NotifyChangedData(std::move(param.changedData));
1046     if (ret != E_OK) {
1047         LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret);
1048         return ret;
1049     }
1050     // Begin downloading assets
1051     ChangedData changedAssets;
1052     ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets);
1053     (void)NotifyChangedData(std::move(changedAssets));
1054     if (ret != E_OK) {
1055         LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret);
1056         return ret;
1057     }
1058     UpdateCloudWaterMark(param);
1059     return E_OK;
1060 }
1061 
NotifyInBatchUpload(const UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,bool lastBatch)1062 void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1063     bool lastBatch)
1064 {
1065     CloudTaskInfo taskInfo;
1066     {
1067         std::lock_guard<std::mutex> autoLock(queueLock_);
1068         taskInfo = cloudTaskInfos_[uploadParam.taskId];
1069     }
1070     std::lock_guard<std::mutex> autoLock(contextLock_);
1071     if (uploadParam.lastTable && lastBatch) {
1072         currentContext_.notifier->UpdateProcess(innerProcessInfo);
1073     } else {
1074         currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo);
1075     }
1076 }
1077 
DoDownload(CloudSyncer::TaskId taskId)1078 int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId)
1079 {
1080     SyncParam param;
1081     int ret = GetCurrentTableName(param.tableName);
1082     if (ret != E_OK) {
1083         LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1084         return ret;
1085     }
1086     param.info.tableName = param.tableName;
1087     std::vector<Field> assetFields;
1088     // only no primary key and composite primary key contains rowid.
1089     ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields);
1090     if (ret != E_OK) {
1091         LOGE("[CloudSyncer] Cannot get primary column names: %d", ret);
1092         return ret;
1093     }
1094     {
1095         std::lock_guard<std::mutex> autoLock(contextLock_);
1096         currentContext_.assetFields[currentContext_.tableName] = assetFields;
1097     }
1098     param.isSinglePrimaryKey = IsSinglePrimaryKey(param.pkColNames);
1099     param.cloudWaterMark = "";
1100     if (!IsModeForcePull(taskId)) {
1101         ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1102         if (ret != E_OK) {
1103             LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret);
1104             return ret;
1105         }
1106     }
1107     return DoDownloadInner(taskId, param);
1108 }
1109 
DoDownloadInner(CloudSyncer::TaskId taskId,SyncParam & param)1110 int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam &param)
1111 {
1112     // Query data by batch until reaching end and not more data need to be download
1113     int ret = PreCheck(taskId, param.info.tableName);
1114     if (ret != E_OK) {
1115         return ret;
1116     }
1117     bool queryEnd = false;
1118     while (!queryEnd) {
1119         // Get cloud data after cloud water mark
1120         param.info.tableStatus = ProcessStatus::PROCESSING;
1121         DownloadData downloadData;
1122         param.downloadData = downloadData;
1123         param.isLastBatch = false;
1124         ret = QueryCloudData(param.info.tableName, param.cloudWaterMark, param.downloadData);
1125         if (ret == -E_QUERY_END) {
1126             // Won't break here since downloadData may not be null
1127             queryEnd = true;
1128             param.isLastBatch = true;
1129         } else if (ret != E_OK) {
1130             std::lock_guard<std::mutex> autoLock(contextLock_);
1131             param.info.tableStatus = ProcessStatus::FINISHED;
1132             currentContext_.notifier->UpdateProcess(param.info);
1133             return ret;
1134         }
1135         if (param.downloadData.data.empty()) {
1136             if (ret == E_OK) {
1137                 LOGD("[CloudSyncer] try to query cloud data use increment water mark");
1138                 UpdateCloudWaterMark(param);
1139                 continue;
1140             }
1141             NotifyInEmptyDownload(taskId, param.info);
1142             break;
1143         }
1144         // Save data in transaction, update cloud water mark, notify process and changed data
1145         ret = SaveDataNotifyProcess(taskId, param);
1146         if (ret != E_OK) {
1147             std::lock_guard<std::mutex> autoLock(contextLock_);
1148             param.info.tableStatus = ProcessStatus::FINISHED;
1149             currentContext_.notifier->UpdateProcess(param.info);
1150             return ret;
1151         }
1152         (void)NotifyInDownload(taskId, param);
1153     }
1154     return E_OK;
1155 }
1156 
NotifyInEmptyDownload(CloudSyncer::TaskId taskId,InnerProcessInfo & info)1157 void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info)
1158 {
1159     std::lock_guard<std::mutex> autoLock(contextLock_);
1160     if (currentContext_.strategy->JudgeUpload()) {
1161         currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1162     } else {
1163         info.tableStatus = FINISHED;
1164         if (cloudTaskInfos_[taskId].table.back() == info.tableName) {
1165             currentContext_.notifier->UpdateProcess(info);
1166         } else {
1167             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1168         }
1169     }
1170 }
1171 
PreCheckUpload(CloudSyncer::TaskId & taskId,const TableName & tableName,Timestamp & localMark)1172 int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark)
1173 {
1174     int ret = PreCheck(taskId, tableName);
1175     if (ret != E_OK) {
1176         return ret;
1177     }
1178     {
1179         std::lock_guard<std::mutex> autoLock(queueLock_);
1180         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1181             LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
1182             return -E_INVALID_ARGS;
1183         }
1184         if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) ||
1185             (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) {
1186             LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.",
1187                 static_cast<int>(cloudTaskInfos_[taskId].mode));
1188             return -E_INVALID_ARGS;
1189         }
1190     }
1191 
1192     if (!IsModeForcePush(taskId)) {
1193         ret = storageProxy_->GetLocalWaterMark(tableName, localMark);
1194         if (ret != E_OK) {
1195             LOGE("[CloudSyncer] Failed to get local water mark when upload, %d.", ret);
1196         }
1197     }
1198     return ret;
1199 }
1200 
CheckCloudSyncDataEmpty(const CloudSyncData & uploadData)1201 bool CloudSyncer::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData)
1202 {
1203     return uploadData.insData.extend.empty() && uploadData.insData.record.empty() &&
1204            uploadData.updData.extend.empty() && uploadData.updData.record.empty() &&
1205            uploadData.delData.extend.empty() && uploadData.delData.record.empty();
1206 }
1207 
DoBatchUpload(CloudSyncData & uploadData,UploadParam & uploadParam,InnerProcessInfo & innerProcessInfo)1208 int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo)
1209 {
1210     int errCode = E_OK;
1211     Info insertInfo;
1212     Info updateInfo;
1213     Info deleteInfo;
1214 
1215     if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) {
1216         errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record,
1217             uploadData.delData.extend, deleteInfo);
1218         if (errCode != E_OK) {
1219             return errCode;
1220         }
1221         innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount;
1222     }
1223 
1224     if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) {
1225         errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record,
1226             uploadData.insData.extend, insertInfo);
1227         if (errCode != E_OK) {
1228             return errCode;
1229         }
1230         // we need to fill back gid after insert data to cloud.
1231         int ret = storageProxy_->FillCloudGidAndAsset(OpType::INSERT, uploadData);
1232         if (ret != E_OK) {
1233             LOGE("[CloudSyncer] Failed to fill back when doing upload insData, %d.", ret);
1234             return ret;
1235         }
1236         innerProcessInfo.upLoadInfo.successCount += insertInfo.successCount;
1237     }
1238 
1239     if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) {
1240         errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record,
1241             uploadData.updData.extend, updateInfo);
1242         if (errCode != E_OK) {
1243             return errCode;
1244         }
1245         errCode = storageProxy_->FillCloudGidAndAsset(OpType::UPDATE, uploadData);
1246         if (errCode != E_OK) {
1247             LOGE("[CloudSyncer] Failed to fill back when doing upload updData, %d.", errCode);
1248             return errCode;
1249         }
1250         innerProcessInfo.upLoadInfo.successCount += updateInfo.successCount;
1251     }
1252     bool lastBatch = innerProcessInfo.upLoadInfo.successCount == innerProcessInfo.upLoadInfo.total;
1253     if (lastBatch) {
1254         innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1255     }
1256     // After each batch upload successed, call NotifyProcess
1257     NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch);
1258 
1259     // if batch upload successed, update local water mark
1260     // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here.
1261     errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam);
1262     if (errCode != E_OK) {
1263         LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode);
1264     }
1265     return errCode;
1266 }
1267 
PutWaterMarkAfterBatchUpload(const std::string & tableName,UploadParam & uploadParam)1268 int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam)
1269 {
1270     int errCode = E_OK;
1271     // if we use local cover cloud strategy, it won't update local water mark also.
1272     if (IsModeForcePush(uploadParam.taskId)) {
1273         return E_OK;
1274     }
1275     errCode = storageProxy_->PutLocalWaterMark(tableName, uploadParam.localMark);
1276     if (errCode != E_OK) {
1277         LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode);
1278     }
1279     return errCode;
1280 }
1281 
DoUpload(CloudSyncer::TaskId taskId,bool lastTable)1282 int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable)
1283 {
1284     std::string tableName;
1285     int ret = GetCurrentTableName(tableName);
1286     if (ret != E_OK) {
1287         LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1288         return ret;
1289     }
1290 
1291     Timestamp localMark = 0u;
1292     ret = PreCheckUpload(taskId, tableName, localMark);
1293     if (ret != E_OK) {
1294         LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret);
1295         return ret;
1296     }
1297 
1298     int64_t count = 0;
1299     ret = storageProxy_->GetUploadCount(tableName, localMark, IsModeForcePush(taskId), count);
1300     if (ret != E_OK) {
1301         // GetUploadCount will return E_OK when upload count is zero.
1302         LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
1303         return ret;
1304     }
1305     if (count == 0) {
1306         LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero.");
1307         InnerProcessInfo innerProcessInfo;
1308         innerProcessInfo.tableName = tableName;
1309         innerProcessInfo.upLoadInfo.total = 0;  // count is zero
1310         innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1311         {
1312             std::lock_guard<std::mutex> autoLock(contextLock_);
1313             if (lastTable) {
1314                 currentContext_.notifier->UpdateProcess(innerProcessInfo);
1315             } else {
1316                 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo);
1317             }
1318         }
1319         return E_OK;
1320     }
1321     UploadParam param;
1322     param.count = count;
1323     param.localMark = localMark;
1324     param.lastTable = lastTable;
1325     param.taskId = taskId;
1326     return DoUploadInner(tableName, param);
1327 }
1328 
TagUploadAssets(CloudSyncData & uploadData)1329 int CloudSyncer::TagUploadAssets(CloudSyncData &uploadData)
1330 {
1331     int errCode = E_OK;
1332     if (!IsDataContainAssets()) {
1333         return E_OK;
1334     }
1335     std::map<std::string, std::map<std::string, Assets>> cloudAssets;
1336     {
1337         std::lock_guard<std::mutex> autoLock(contextLock_);
1338         cloudAssets = currentContext_.assetsInfo[currentContext_.tableName];
1339     }
1340     // for delete scenario, assets should not appear in the records. Thereby we needn't tag the assests.
1341     // for insert scenario, gid does not exist. Thereby, we needn't compare with cloud asset get in download procedure
1342     for (size_t i = 0; i < uploadData.insData.extend.size(); i++) {
1343         VBucket cloudAsset; // cloudAsset must be empty
1344         (void)TagAssetsInSingleRecord(uploadData.insData.record[i], cloudAsset, true, errCode);
1345         if (errCode != E_OK) {
1346             LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in DELETE/INSERT option");
1347             return errCode;
1348         }
1349     }
1350     // for update scenario, assets shoulb be compared with asset get in download procedure.
1351     for (size_t i = 0; i < uploadData.updData.extend.size(); i++) {
1352         VBucket cloudAsset;
1353         // gid must exist in UPDATE scenario, cause we have re-fill gid during download procedure
1354         // But we need to check for safety
1355         auto gidIter = uploadData.updData.extend[i].find(CloudDbConstant::GID_FIELD);
1356         if (gidIter == uploadData.updData.extend[i].end()) {
1357             LOGE("[CloudSyncer] Datum to be upload must contain gid");
1358             return -E_INVALID_DATA;
1359         }
1360         // update data must contain gid, however, we could only pull data after water mark
1361         // Therefore, we need to check whether we contain the data
1362         std::string &gid = std::get<std::string>(gidIter->second);
1363         if (cloudAssets.find(gid) == cloudAssets.end()) {
1364             // In this case, we directly upload data without compartion and tagging
1365             std::vector<Field> assetFields;
1366             {
1367                 std::lock_guard<std::mutex> autoLock(contextLock_);
1368                 assetFields = currentContext_.assetFields[currentContext_.tableName];
1369             }
1370             StatusToFlagForAssetsInRecord(assetFields, uploadData.updData.record[i]);
1371             continue;
1372         }
1373         for (const auto &it : cloudAssets[gid]) {
1374             cloudAsset[it.first] = it.second;
1375         }
1376         (void)TagAssetsInSingleRecord(uploadData.updData.record[i], cloudAsset, true, errCode);
1377         if (errCode != E_OK) {
1378             LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in UPDATE option");
1379             return errCode;
1380         }
1381     }
1382     return E_OK;
1383 }
1384 
PreProcessBatchUpload(TaskId taskId,const InnerProcessInfo & innerProcessInfo,CloudSyncData & uploadData,Timestamp & localMark)1385 int CloudSyncer::PreProcessBatchUpload(TaskId taskId, const InnerProcessInfo &innerProcessInfo,
1386     CloudSyncData &uploadData, Timestamp &localMark)
1387 {
1388     // Precheck and calculate local water mark which would be updated if batch upload successed.
1389     int ret = CheckTaskIdValid(taskId);
1390     if (ret != E_OK) {
1391         return ret;
1392     }
1393     ret = CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName, innerProcessInfo.upLoadInfo.total, taskId);
1394     if (ret != E_OK) {
1395         LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret);
1396         return ret;
1397     }
1398     ret = TagUploadAssets(uploadData);
1399     if (ret != E_OK) {
1400         LOGE("TagUploadAssets report ERROR, cannot tag uploadAssets");
1401         return ret;
1402     }
1403     // get local water mark to be updated in future.
1404     ret = UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total, taskId, localMark);
1405     if (ret != E_OK) {
1406         LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret);
1407     }
1408     return ret;
1409 }
1410 
SaveCloudWaterMark(const TableName & tableName)1411 int CloudSyncer::SaveCloudWaterMark(const TableName &tableName)
1412 {
1413     std::string cloudWaterMark;
1414     {
1415         std::lock_guard<std::mutex> autoLock(contextLock_);
1416         if (currentContext_.cloudWaterMarks.find(tableName) == currentContext_.cloudWaterMarks.end()) {
1417             LOGD("[CloudSyncer] Not found water mark just return");
1418             return E_OK;
1419         }
1420         cloudWaterMark = currentContext_.cloudWaterMarks[tableName];
1421     }
1422     int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark);
1423     if (errCode != E_OK) {
1424         LOGE("[CloudSyncer] Cannot set cloud water mark while Uploading, %d.", errCode);
1425     }
1426     return errCode;
1427 }
1428 
SetUploadDataFlag(const TaskId taskId,CloudSyncData & uploadData)1429 void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData)
1430 {
1431     std::lock_guard<std::mutex> autoLock(queueLock_);
1432     uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH);
1433 }
1434 
IsModeForcePush(const TaskId taskId)1435 bool CloudSyncer::IsModeForcePush(const TaskId taskId)
1436 {
1437     std::lock_guard<std::mutex> autoLock(queueLock_);
1438     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
1439 }
1440 
IsModeForcePull(const TaskId taskId)1441 bool CloudSyncer::IsModeForcePull(const TaskId taskId)
1442 {
1443     std::lock_guard<std::mutex> autoLock(queueLock_);
1444     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL;
1445 }
1446 
DoUploadInner(const std::string & tableName,UploadParam & uploadParam)1447 int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam)
1448 {
1449     ContinueToken continueStmtToken = nullptr;
1450     CloudSyncData uploadData(tableName);
1451     SetUploadDataFlag(uploadParam.taskId, uploadData);
1452 
1453     int ret = storageProxy_->GetCloudData(tableName, uploadParam.localMark, continueStmtToken, uploadData);
1454     if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1455         LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret);
1456         return ret;
1457     }
1458 
1459     InnerProcessInfo info;
1460     info.tableName = tableName;
1461     info.tableStatus = ProcessStatus::PROCESSING;
1462     info.upLoadInfo.total = static_cast<uint32_t>(uploadParam.count);
1463     uint32_t batchIndex = 0;
1464     bool getDataUnfinished = false;
1465 
1466     while (!CheckCloudSyncDataEmpty(uploadData)) {
1467         getDataUnfinished = (ret == -E_UNFINISHED);
1468         ret = PreProcessBatchUpload(uploadParam.taskId, info, uploadData, uploadParam.localMark);
1469         if (ret != E_OK) {
1470             break;
1471         }
1472         info.upLoadInfo.batchIndex = ++batchIndex;
1473 
1474         ret = DoBatchUpload(uploadData, uploadParam, info);
1475         if (ret != E_OK) {
1476             LOGE("[CloudSyncer] Failed to do upload, %d", ret);
1477             info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount;
1478             info.tableStatus = ProcessStatus::FINISHED;
1479             {
1480                 std::lock_guard<std::mutex> autoLock(contextLock_);
1481                 currentContext_.notifier->UpdateProcess(info);
1482             }
1483             break;
1484         }
1485 
1486         uploadData = CloudSyncData(tableName);
1487 
1488         if (continueStmtToken == nullptr) {
1489             break;
1490         }
1491         SetUploadDataFlag(uploadParam.taskId, uploadData);
1492 
1493         ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData);
1494         if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1495             LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret);
1496             break;
1497         }
1498     }
1499 
1500     if (getDataUnfinished) {
1501         storageProxy_->ReleaseContinueToken(continueStmtToken);
1502     }
1503     return ret;
1504 }
1505 
PreHandleData(VBucket & datum,const std::vector<std::string> & pkColNames)1506 int CloudSyncer::PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames)
1507 {
1508     // type index of field in fields
1509     std::vector<std::pair<std::string, int32_t>> fieldAndIndex = {
1510         std::pair<std::string, int32_t>(CloudDbConstant::GID_FIELD, TYPE_INDEX<std::string>),
1511         std::pair<std::string, int32_t>(CloudDbConstant::CREATE_FIELD, TYPE_INDEX<int64_t>),
1512         std::pair<std::string, int32_t>(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX<int64_t>),
1513         std::pair<std::string, int32_t>(CloudDbConstant::DELETE_FIELD, TYPE_INDEX<bool>),
1514         std::pair<std::string, int32_t>(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX<std::string>)
1515     };
1516 
1517     for (const auto &fieldIndex : fieldAndIndex) {
1518         if (datum.find(fieldIndex.first) == datum.end()) {
1519             LOGE("[CloudSyncer] Cloud data do not contain expected field: %s.", fieldIndex.first.c_str());
1520             return -E_CLOUD_ERROR;
1521         }
1522         if (datum[fieldIndex.first].index() != static_cast<size_t>(fieldIndex.second)) {
1523             LOGE("[CloudSyncer] Cloud data's field: %s, doesn't has expected type.", fieldIndex.first.c_str());
1524             return -E_CLOUD_ERROR;
1525         }
1526     }
1527 
1528     if (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) {
1529         RemoveDataExceptExtendInfo(datum, pkColNames);
1530     }
1531     std::lock_guard<std::mutex> autoLock(contextLock_);
1532     if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) {
1533         LOGE("[CloudSyncer] Cloud data contain duplicate asset");
1534         return -E_CLOUD_ERROR;
1535     }
1536     return E_OK;
1537 }
1538 
QueryCloudData(const std::string & tableName,std::string & cloudWaterMark,DownloadData & downloadData)1539 int CloudSyncer::QueryCloudData(const std::string &tableName, std::string &cloudWaterMark,
1540     DownloadData &downloadData)
1541 {
1542     VBucket extend = {
1543         {CloudDbConstant::CURSOR_FIELD, cloudWaterMark}
1544     };
1545     int ret = cloudDB_.Query(tableName, extend, downloadData.data);
1546     if (ret == -E_QUERY_END) {
1547         LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded");
1548         return -E_QUERY_END;
1549     }
1550     if (ret == E_OK && downloadData.data.empty()) {
1551         if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX<std::string>) {
1552             LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index());
1553             return -E_CLOUD_ERROR;
1554         }
1555         cloudWaterMark = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
1556         LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str());
1557         return ret;
1558     }
1559     if (ret != E_OK) {
1560         LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret);
1561     }
1562     return ret;
1563 }
1564 
CheckParamValid(const std::vector<DeviceID> & devices,SyncMode mode)1565 int CloudSyncer::CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode)
1566 {
1567     if (devices.size() != 1) {
1568         LOGE("[CloudSyncer] invalid devices size %zu", devices.size());
1569         return -E_INVALID_ARGS;
1570     }
1571     for (const auto &dev: devices) {
1572         if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) {
1573             LOGE("[CloudSyncer] invalid device, size %zu", dev.size());
1574             return -E_INVALID_ARGS;
1575         }
1576     }
1577     if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) {
1578         LOGE("[CloudSyncer] not support mode %d", static_cast<int>(mode));
1579         return -E_NOT_SUPPORT;
1580     }
1581     if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
1582         LOGE("[CloudSyncer] invalid mode %d", static_cast<int>(mode));
1583         return -E_INVALID_ARGS;
1584     }
1585     return E_OK;
1586 }
1587 
CheckTaskIdValid(TaskId taskId)1588 int CloudSyncer::CheckTaskIdValid(TaskId taskId)
1589 {
1590     if (closed_) {
1591         LOGE("[CloudSyncer] DB is closed.");
1592         return -E_DB_CLOSED;
1593     }
1594     {
1595         std::lock_guard<std::mutex> autoLock(queueLock_);
1596         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1597             LOGE("[CloudSyncer] not found task.");
1598             return -E_INVALID_ARGS;
1599         }
1600         if (cloudTaskInfos_[taskId].errCode != E_OK) {
1601             return cloudTaskInfos_[taskId].errCode;
1602         }
1603     }
1604     std::lock_guard<std::mutex> autoLock(contextLock_);
1605     return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS;
1606 }
1607 
GetCurrentTableName(std::string & tableName)1608 int CloudSyncer::GetCurrentTableName(std::string &tableName)
1609 {
1610     std::lock_guard<std::mutex> autoLock(contextLock_);
1611     if (currentContext_.tableName.empty()) {
1612         return -E_BUSY;
1613     }
1614     tableName = currentContext_.tableName;
1615     return E_OK;
1616 }
1617 
TryToAddSyncTask(CloudTaskInfo && taskInfo)1618 int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo)
1619 {
1620     if (closed_) {
1621         LOGW("[CloudSyncer] syncer is closed, should not sync now");
1622         return -E_DB_CLOSED;
1623     }
1624     std::lock_guard<std::mutex> autoLock(queueLock_);
1625     int errCode = CheckQueueSizeWithNoLock();
1626     if (errCode != E_OK) {
1627         return errCode;
1628     }
1629     do {
1630         currentTaskId_++;
1631     } while (currentTaskId_ == 0);
1632     taskInfo.taskId = currentTaskId_;
1633     taskQueue_.push_back(currentTaskId_);
1634     cloudTaskInfos_[currentTaskId_] = taskInfo;
1635     LOGI("[CloudSyncer] Add task ok, taskId %" PRIu64, cloudTaskInfos_[currentTaskId_].taskId);
1636     return E_OK;
1637 }
1638 
CheckQueueSizeWithNoLock()1639 int CloudSyncer::CheckQueueSizeWithNoLock()
1640 {
1641     int32_t limit = queuedManualSyncLimit_;
1642     if (taskQueue_.size() >= static_cast<size_t>(limit)) {
1643         LOGW("[CloudSyncer] too much sync task");
1644         return -E_BUSY;
1645     }
1646     return E_OK;
1647 }
1648 
PrepareSync(TaskId taskId)1649 int CloudSyncer::PrepareSync(TaskId taskId)
1650 {
1651     std::vector<std::string> tableNames;
1652     std::vector<std::string> devices;
1653     SyncMode mode;
1654     {
1655         std::lock_guard<std::mutex> autoLock(queueLock_);
1656         if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1657             LOGW("[CloudSyncer] Abort sync because syncer is closed");
1658             return -E_DB_CLOSED;
1659         }
1660         tableNames = cloudTaskInfos_[taskId].table;
1661         mode = cloudTaskInfos_[taskId].mode;
1662         devices = cloudTaskInfos_[taskId].devices;
1663     }
1664     {
1665         // check current task is running or not
1666         std::lock_guard<std::mutex> autoLock(contextLock_);
1667         if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) {
1668             LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running");
1669             return -E_DB_CLOSED;
1670         }
1671         currentContext_.currentTaskId = taskId;
1672         currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
1673         currentContext_.strategy = StrategyFactory::BuildSyncStrategy(mode);
1674         currentContext_.notifier->Init(tableNames, devices);
1675         LOGI("[CloudSyncer] exec taskId %" PRIu64, taskId);
1676     }
1677     std::lock_guard<std::mutex> autoLock(queueLock_);
1678     cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING;
1679     return E_OK;
1680 }
1681 
LockCloud(TaskId taskId)1682 int CloudSyncer::LockCloud(TaskId taskId)
1683 {
1684     int period;
1685     {
1686         auto res = cloudDB_.Lock();
1687         if (res.first != E_OK) {
1688             return res.first;
1689         }
1690         period = static_cast<int>(res.second) / HEARTBEAT_PERIOD;
1691     }
1692     int errCode = StartHeartBeatTimer(period, taskId);
1693     if (errCode != E_OK) {
1694         UnlockCloud();
1695     }
1696     return errCode;
1697 }
1698 
UnlockCloud()1699 int CloudSyncer::UnlockCloud()
1700 {
1701     FinishHeartBeatTimer();
1702     int errCode = cloudDB_.UnLock();
1703     WaitAllHeartBeatTaskExit();
1704     return errCode;
1705 }
1706 
StartHeartBeatTimer(int period,TaskId taskId)1707 int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId)
1708 {
1709     if (timerId_ != 0u) {
1710         LOGW("[CloudSyncer] HeartBeat timer has been start!");
1711         return E_OK;
1712     }
1713     TimerId timerId = 0;
1714     int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) {
1715         HeartBeat(timerId, taskId);
1716         return E_OK;
1717     }, nullptr, timerId);
1718     if (errCode != E_OK) {
1719         LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode);
1720         return errCode;
1721     }
1722     timerId_ = timerId;
1723     return E_OK;
1724 }
1725 
FinishHeartBeatTimer()1726 void CloudSyncer::FinishHeartBeatTimer()
1727 {
1728     if (timerId_ == 0u) {
1729         return;
1730     }
1731     RuntimeContext::GetInstance()->RemoveTimer(timerId_, true);
1732     timerId_ = 0u;
1733     LOGD("[CloudSyncer] Finish heartbeat timer ok");
1734 }
1735 
WaitAllHeartBeatTaskExit()1736 void CloudSyncer::WaitAllHeartBeatTaskExit()
1737 {
1738     std::unique_lock<std::mutex> uniqueLock(heartbeatMutex_);
1739     if (heartBeatCount_ <= 0) {
1740         return;
1741     }
1742     LOGD("[CloudSyncer] Begin wait all heartbeat task exit");
1743     heartbeatCv_.wait(uniqueLock, [this]() {
1744         return heartBeatCount_ <= 0;
1745     });
1746     LOGD("[CloudSyncer] End wait all heartbeat task exit");
1747 }
1748 
HeartBeat(TimerId timerId,TaskId taskId)1749 void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId)
1750 {
1751     if (timerId_ != timerId) {
1752         return;
1753     }
1754     {
1755         std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1756         heartBeatCount_++;
1757     }
1758     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() {
1759         if (heartBeatCount_ >= HEARTBEAT_PERIOD) {
1760             // heartbeat block twice should finish task now
1761             SetTaskFailed(taskId, -E_CLOUD_ERROR);
1762         } else {
1763             int ret = cloudDB_.HeartBeat();
1764             if (ret != E_OK) {
1765                 HeartBeatFailed(taskId, ret);
1766             } else {
1767                 failedHeartBeatCount_ = 0;
1768             }
1769         }
1770         {
1771             std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1772             heartBeatCount_--;
1773         }
1774         heartbeatCv_.notify_all();
1775     });
1776     if (errCode != E_OK) {
1777         LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode);
1778         {
1779             std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1780             heartBeatCount_--;
1781         }
1782         heartbeatCv_.notify_all();
1783     }
1784 }
1785 
HeartBeatFailed(TaskId taskId,int errCode)1786 void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode)
1787 {
1788     failedHeartBeatCount_++;
1789     if (failedHeartBeatCount_ < MAX_HEARTBEAT_FAILED_LIMIT) {
1790         return;
1791     }
1792     LOGW("[CloudSyncer] heartbeat failed too much times!");
1793     FinishHeartBeatTimer();
1794     SetTaskFailed(taskId, errCode);
1795 }
1796 
SetTaskFailed(TaskId taskId,int errCode)1797 void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode)
1798 {
1799     std::lock_guard<std::mutex> autoLock(queueLock_);
1800     if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1801         return;
1802     }
1803     if (cloudTaskInfos_[taskId].errCode != E_OK) {
1804         return;
1805     }
1806     cloudTaskInfos_[taskId].errCode = errCode;
1807 }
1808 
CheckCloudSyncDataValid(CloudSyncData uploadData,const std::string & tableName,const int64_t & count,TaskId & taskId)1809 int CloudSyncer::CheckCloudSyncDataValid(CloudSyncData uploadData, const std::string &tableName,
1810     const int64_t &count, TaskId &taskId)
1811 {
1812     size_t insRecordLen = uploadData.insData.record.size();
1813     size_t insExtendLen = uploadData.insData.extend.size();
1814     size_t updRecordLen = uploadData.updData.record.size();
1815     size_t updExtendLen = uploadData.updData.extend.size();
1816     size_t delRecordLen = uploadData.delData.record.size();
1817     size_t delExtendLen = uploadData.delData.extend.size();
1818 
1819     bool syncDataValid = (uploadData.tableName == tableName) &&
1820         ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) ||
1821         (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) ||
1822         (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen));
1823     if (!syncDataValid) {
1824         LOGE("[CloudSyncer] upload data is empty but upload count is not zero or upload table name"
1825             " is not the same as table name of sync data.");
1826         return -E_INTERNAL_ERROR;
1827     }
1828     int64_t syncDataCount = static_cast<int64_t>(insRecordLen) + static_cast<int64_t>(updRecordLen) +
1829         static_cast<int64_t>(delRecordLen);
1830     if (syncDataCount > count) {
1831         LOGE("[CloudSyncer] Size of a batch of sync data is greater than upload data size.");
1832         return -E_INTERNAL_ERROR;
1833     }
1834 
1835     return E_OK;
1836 }
1837 
GetWaterMarkAndUpdateTime(std::vector<VBucket> & extend,Timestamp & waterMark)1838 int CloudSyncer::GetWaterMarkAndUpdateTime(std::vector<VBucket>& extend, Timestamp &waterMark)
1839 {
1840     for (auto &extendData: extend) {
1841         if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) {
1842             LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
1843             return -E_INTERNAL_ERROR;
1844         }
1845         if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) {
1846             LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doestn't fit int64_t.");
1847             return -E_INTERNAL_ERROR;
1848         }
1849         if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) {
1850             LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
1851             return -E_INTERNAL_ERROR;
1852         }
1853         if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::CREATE_FIELD).index()) {
1854             LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doestn't fit int64_t.");
1855             return -E_INTERNAL_ERROR;
1856         }
1857         waterMark = std::max(int64_t(waterMark), std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)));
1858         int64_t modifyTime =
1859             std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND;
1860         int64_t createTime =
1861             std::get<int64_t>(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND;
1862         extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
1863         extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime);
1864     }
1865     return E_OK;
1866 }
1867 
1868 // After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark;
UpdateExtendTime(CloudSyncData & uploadData,const int64_t & count,TaskId taskId,Timestamp & waterMark)1869 int CloudSyncer::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count,
1870     TaskId taskId, Timestamp &waterMark)
1871 {
1872     int ret = CheckCloudSyncDataValid(uploadData, uploadData.tableName, count, taskId);
1873     if (ret != E_OK) {
1874         LOGE("[CloudSyncer] Invalid Sync Data when get local water mark.");
1875         return ret;
1876     }
1877     if (!uploadData.insData.extend.empty()) {
1878         if (uploadData.insData.record.size() != uploadData.insData.extend.size()) {
1879             LOGE("[CloudSyncer] Inconsistent size of inserted data.");
1880             return -E_INTERNAL_ERROR;
1881         }
1882         ret = GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark);
1883         if (ret != E_OK) {
1884             return ret;
1885         }
1886     }
1887 
1888     if (!uploadData.updData.extend.empty()) {
1889         if (uploadData.updData.record.size() != uploadData.updData.extend.size()) {
1890             LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR);
1891             return -E_INTERNAL_ERROR;
1892         }
1893         ret = GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark);
1894         if (ret != E_OK) {
1895             return ret;
1896         }
1897     }
1898 
1899     if (!uploadData.delData.extend.empty()) {
1900         if (uploadData.delData.record.size() != uploadData.delData.extend.size()) {
1901             LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR);
1902             return -E_INTERNAL_ERROR;
1903         }
1904         ret = GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark);
1905         if (ret != E_OK) {
1906             return ret;
1907         }
1908     }
1909     return E_OK;
1910 }
1911 
ClearCloudSyncData(CloudSyncData & uploadData)1912 void CloudSyncer::ClearCloudSyncData(CloudSyncData &uploadData)
1913 {
1914     std::vector<VBucket>().swap(uploadData.insData.record);
1915     std::vector<VBucket>().swap(uploadData.insData.extend);
1916     std::vector<int64_t>().swap(uploadData.insData.rowid);
1917     std::vector<VBucket>().swap(uploadData.updData.record);
1918     std::vector<VBucket>().swap(uploadData.updData.extend);
1919     std::vector<VBucket>().swap(uploadData.delData.record);
1920     std::vector<VBucket>().swap(uploadData.delData.extend);
1921 }
1922 
GetCloudSyncTaskCount()1923 int32_t CloudSyncer::GetCloudSyncTaskCount()
1924 {
1925     std::lock_guard<std::mutex> autoLock(queueLock_);
1926     return taskQueue_.size();
1927 }
1928 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)1929 int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1930     const RelationalSchemaObject &localSchema)
1931 {
1932     std::lock_guard<std::mutex> lock(syncMutex_);
1933     int index = 1;
1934     for (const auto &tableName: tableNameList) {
1935         LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index);
1936         int ret = storageProxy_->CleanWaterMark(tableName);
1937         if (ret != E_OK) {
1938         LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret);
1939             return ret;
1940         }
1941         index++;
1942     }
1943     int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1944     if (errCode != E_OK) {
1945         LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode);
1946         return errCode;
1947     }
1948 
1949     std::vector<Asset> assets;
1950     errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets);
1951     if (errCode != E_OK) {
1952         LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode);
1953         storageProxy_->Rollback();
1954         return errCode;
1955     }
1956 
1957     if (!assets.empty() && mode == FLAG_AND_DATA) {
1958         errCode = cloudDB_.RemoveLocalAssets(assets);
1959         if (errCode != E_OK) {
1960             LOGE("[Storage Executor] failed to remove local assets, %d.", errCode);
1961             storageProxy_->Rollback();
1962             return errCode;
1963         }
1964     }
1965 
1966     storageProxy_->Commit();
1967     return errCode;
1968 }
1969 
ModifyCloudDataTime(VBucket & data)1970 void CloudSyncer::ModifyCloudDataTime(VBucket &data)
1971 {
1972     // data already check field modify_field and create_field
1973     int64_t modifyTime = std::get<int64_t>(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND;
1974     int64_t createTime = std::get<int64_t>(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND;
1975     data[CloudDbConstant::MODIFY_FIELD] = modifyTime;
1976     data[CloudDbConstant::CREATE_FIELD] = createTime;
1977 }
1978 
UpdateCloudWaterMark(const SyncParam & param)1979 void CloudSyncer::UpdateCloudWaterMark(const SyncParam &param)
1980 {
1981     bool isUpdateCloudCursor = true;
1982     {
1983         std::lock_guard<std::mutex> autoLock(queueLock_);
1984         isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor();
1985     }
1986     // use the cursor of the last datum in data set to update cloud water mark
1987     if (isUpdateCloudCursor) {
1988         std::lock_guard<std::mutex> autoLock(contextLock_);
1989         currentContext_.cloudWaterMarks[param.info.tableName] = param.cloudWaterMark;
1990     }
1991 }
1992 
CommitDownloadResult(InnerProcessInfo & info,DownloadCommitList & commitList)1993 int CloudSyncer::CommitDownloadResult(InnerProcessInfo &info, DownloadCommitList &commitList)
1994 {
1995     uint32_t successCount = 0;
1996     int ret = HandleDownloadResult(info.tableName, commitList, successCount);
1997     info.downLoadInfo.failCount += (commitList.size() - successCount);
1998     info.downLoadInfo.successCount -= (commitList.size() - successCount);
1999     if (ret != E_OK) {
2000         LOGE("Commit download result failed.%d", ret);
2001     }
2002     commitList.clear();
2003     return ret;
2004 }
2005 
GetIdentify() const2006 std::string CloudSyncer::GetIdentify() const
2007 {
2008     return id_;
2009 }
2010 
TagStatusByStrategy(bool isExist,SyncParam & param,DataInfo & dataInfo,OpType & strategyOpResult)2011 int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam &param, DataInfo &dataInfo, OpType &strategyOpResult)
2012 {
2013     strategyOpResult = OpType::NOT_HANDLE;
2014     // ignore same record with local generate data
2015     if (dataInfo.localInfo.logInfo.device.empty() &&
2016         !NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
2017         // not handle same data
2018         return E_OK;
2019     }
2020     {
2021         std::lock_guard<std::mutex> autoLock(contextLock_);
2022         if (!currentContext_.strategy) {
2023             LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR);
2024             return -E_INTERNAL_ERROR;
2025         }
2026         strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, dataInfo.localInfo.logInfo,
2027             dataInfo.cloudLogInfo, param.deletePrimaryKeySet);
2028     }
2029     return E_OK;
2030 }
2031 } // namespace DistributedDB
2032