• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_syncer.h"
16 
17 #include <cstdint>
18 #include <unordered_map>
19 #include <utility>
20 
21 #include "cloud/asset_operation_utils.h"
22 #include "cloud/cloud_db_constant.h"
23 #include "cloud/cloud_storage_utils.h"
24 #include "cloud/icloud_db.h"
25 #include "cloud_sync_tag_assets.h"
26 #include "cloud_sync_utils.h"
27 #include "db_dfx_adapter.h"
28 #include "db_errno.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "storage_proxy.h"
32 #include "store_types.h"
33 #include "strategy_factory.h"
34 #include "version.h"
35 
36 namespace DistributedDB {
CloudSyncer(std::shared_ptr<StorageProxy> storageProxy,bool isKvScene,SingleVerConflictResolvePolicy policy)37 CloudSyncer::CloudSyncer(
38     std::shared_ptr<StorageProxy> storageProxy, bool isKvScene, SingleVerConflictResolvePolicy policy)
39     : lastTaskId_(INVALID_TASK_ID),
40       storageProxy_(std::move(storageProxy)),
41       queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
42       closed_(false),
43       hasKvRemoveTask(false),
44       timerId_(0u),
45       isKvScene_(isKvScene),
46       policy_(policy),
47       asyncTaskId_(INVALID_TASK_ID),
48       cancelAsyncTask_(false),
49       waitDownloadListener_(nullptr)
50 {
51     if (storageProxy_ != nullptr) {
52         id_ = storageProxy_->GetIdentify();
53     }
54     InitCloudSyncStateMachine();
55 }
56 
Sync(const std::vector<DeviceID> & devices,SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & callback,int64_t waitTime)57 int CloudSyncer::Sync(const std::vector<DeviceID> &devices, SyncMode mode,
58     const std::vector<std::string> &tables, const SyncProcessCallback &callback, int64_t waitTime)
59 {
60     CloudTaskInfo taskInfo;
61     taskInfo.mode = mode;
62     taskInfo.table = tables;
63     taskInfo.callback = callback;
64     taskInfo.timeout = waitTime;
65     taskInfo.devices = devices;
66     for (const auto &item: tables) {
67         QuerySyncObject syncObject;
68         syncObject.SetTableName(item);
69         taskInfo.queryList.push_back(syncObject);
70     }
71     return Sync(taskInfo);
72 }
73 
Sync(const CloudTaskInfo & taskInfo)74 int CloudSyncer::Sync(const CloudTaskInfo &taskInfo)
75 {
76     int errCode = CloudSyncUtils::CheckParamValid(taskInfo.devices, taskInfo.mode);
77     if (errCode != E_OK) {
78         return errCode;
79     }
80     if (cloudDB_.IsNotExistCloudDB()) {
81         LOGE("[CloudSyncer] Not set cloudDB!");
82         return -E_CLOUD_ERROR;
83     }
84     if (closed_) {
85         LOGE("[CloudSyncer] DB is closed!");
86         return -E_DB_CLOSED;
87     }
88     if (hasKvRemoveTask) {
89         LOGE("[CloudSyncer] has remove task, stop sync task!");
90         return -E_CLOUD_ERROR;
91     }
92     CloudTaskInfo info = taskInfo;
93     info.status = ProcessStatus::PREPARED;
94     errCode = TryToAddSyncTask(std::move(info));
95     if (errCode != E_OK) {
96         return errCode;
97     }
98     return TriggerSync();
99 }
100 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)101 void CloudSyncer::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
102 {
103     cloudDB_.SetCloudDB(cloudDB);
104     LOGI("[CloudSyncer] SetCloudDB finish");
105 }
106 
GetCloudDB() const107 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudSyncer::GetCloudDB() const
108 {
109     return cloudDB_.GetCloudDB();
110 }
111 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)112 void CloudSyncer::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
113 {
114     storageProxy_->SetIAssetLoader(loader);
115     cloudDB_.SetIAssetLoader(loader);
116     LOGI("[CloudSyncer] SetIAssetLoader finish");
117 }
118 
Close()119 void CloudSyncer::Close()
120 {
121     closed_ = true;
122     CloudSyncer::TaskId currentTask;
123     {
124         std::lock_guard<std::mutex> autoLock(dataLock_);
125         currentTask = currentContext_.currentTaskId;
126     }
127     // mark current task db_closed
128     SetTaskFailed(currentTask, -E_DB_CLOSED);
129     UnlockIfNeed();
130     cloudDB_.Close();
131     WaitCurTaskFinished();
132 
133     // copy all task from queue
134     std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
135     for (auto &info: infoList) {
136         LOGI("[CloudSyncer] finished taskId %" PRIu64 " with db closed.", info.taskId);
137     }
138     storageProxy_->Close();
139 }
140 
TriggerSync()141 int CloudSyncer::TriggerSync()
142 {
143     if (closed_) {
144         return -E_DB_CLOSED;
145     }
146     RefObject::IncObjRef(this);
147     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
148         DoSyncIfNeed();
149         RefObject::DecObjRef(this);
150     });
151     if (errCode != E_OK) {
152         LOGW("[CloudSyncer] schedule sync task failed %d", errCode);
153         RefObject::DecObjRef(this);
154     }
155     return errCode;
156 }
157 
SetProxyUser(const std::string & user)158 void CloudSyncer::SetProxyUser(const std::string &user)
159 {
160     std::lock_guard<std::mutex> autoLock(dataLock_);
161     if (storageProxy_ != nullptr) {
162         storageProxy_->SetUser(user);
163     }
164     if (currentContext_.notifier != nullptr) {
165         currentContext_.notifier->SetUser(user);
166     }
167     currentContext_.currentUserIndex = currentContext_.currentUserIndex + 1;
168     cloudDB_.SwitchCloudDB(user);
169 }
170 
DoSyncIfNeed()171 void CloudSyncer::DoSyncIfNeed()
172 {
173     if (closed_) {
174         return;
175     }
176     // do all sync task in this loop
177     do {
178         // get taskId from queue
179         TaskId triggerTaskId = GetNextTaskId();
180         if (triggerTaskId == INVALID_TASK_ID) {
181             LOGD("[CloudSyncer] task queue empty");
182             break;
183         }
184         // pop taskId in queue
185         if (PrepareSync(triggerTaskId) != E_OK) {
186             break;
187         }
188         //  if the task is compensated task, we should get the query data and user list.
189         if (IsCompensatedTask(triggerTaskId) && !TryToInitQueryAndUserListForCompensatedSync(triggerTaskId)) {
190             // try to init query fail, we finish the compensated task.
191             continue;
192         }
193         CancelBackgroundDownloadAssetsTaskIfNeed();
194         // do sync logic
195         std::vector<std::string> usersList;
196         {
197             std::lock_guard<std::mutex> autoLock(dataLock_);
198             usersList = cloudTaskInfos_[triggerTaskId].users;
199             currentContext_.currentUserIndex = 0;
200         }
201         int errCode = E_OK;
202         if (usersList.empty()) {
203             SetProxyUser("");
204             errCode = DoSync(triggerTaskId);
205         } else {
206             for (const auto &user : usersList) {
207                 SetProxyUser(user);
208                 errCode = DoSync(triggerTaskId);
209             }
210         }
211         LOGD("[CloudSyncer] DoSync finished, errCode %d", errCode);
212     } while (!closed_);
213     LOGD("[CloudSyncer] DoSyncIfNeed finished, closed status %d", static_cast<int>(closed_));
214 }
215 
DoSync(TaskId taskId)216 int CloudSyncer::DoSync(TaskId taskId)
217 {
218     std::lock_guard<std::mutex> lock(syncMutex_);
219     ResetCurrentTableUploadBatchIndex();
220     CloudTaskInfo taskInfo;
221     {
222         std::lock_guard<std::mutex> autoLock(dataLock_);
223         taskInfo = cloudTaskInfos_[taskId];
224         cloudDB_.SetPrepareTraceId(taskInfo.prepareTraceId); // SetPrepareTraceId before task started
225         LOGD("[CloudSyncer] DoSync get taskInfo, taskId is: %llu, prepareTraceId is:%s.",
226             static_cast<unsigned long long>(taskInfo.taskId), taskInfo.prepareTraceId.c_str());
227     }
228     bool needUpload = true;
229     bool isNeedFirstDownload = false;
230     {
231         std::lock_guard<std::mutex> autoLock(dataLock_);
232         if (currentContext_.strategy != nullptr) {
233             needUpload = currentContext_.strategy->JudgeUpload();
234         }
235         // 1. if the locker is already exist, directly reuse the lock, no need do the first download
236         // 2. if the task(resume task) is already be tagged need upload data, no need do the first download
237         isNeedFirstDownload = (currentContext_.locker == nullptr) && (!currentContext_.isNeedUpload);
238     }
239     int errCode = E_OK;
240     bool isFirstDownload = true;
241     if (isNeedFirstDownload) {
242         // do first download
243         errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
244         SetTaskFailed(taskId, errCode);
245         if (errCode != E_OK) {
246             SyncMachineDoFinished();
247             return errCode;
248         }
249         bool isActuallyNeedUpload = false;  // whether the task actually has data to upload
250         {
251             std::lock_guard<std::mutex> autoLock(dataLock_);
252             isActuallyNeedUpload = currentContext_.isNeedUpload;
253         }
254         if (!isActuallyNeedUpload) {
255             LOGI("[CloudSyncer] no table need upload!");
256             SyncMachineDoFinished();
257             return E_OK;
258         }
259         isFirstDownload = false;
260     }
261 
262     {
263         std::lock_guard<std::mutex> autoLock(dataLock_);
264         currentContext_.isFirstDownload = isFirstDownload;
265         currentContext_.isRealNeedUpload = needUpload;
266     }
267     errCode = DoSyncInner(taskInfo);
268     return errCode;
269 }
270 
PrepareAndUpload(const CloudTaskInfo & taskInfo,size_t index)271 int CloudSyncer::PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index)
272 {
273     {
274         std::lock_guard<std::mutex> autoLock(dataLock_);
275         currentContext_.tableName = taskInfo.table[index];
276     }
277     int errCode = CheckTaskIdValid(taskInfo.taskId);
278     if (errCode != E_OK) {
279         LOGE("[CloudSyncer] task is invalid, abort sync");
280         return errCode;
281     }
282     if (taskInfo.table.empty()) {
283         LOGE("[CloudSyncer] Invalid taskInfo table");
284         return -E_INVALID_ARGS;
285     }
286     errCode = DoUpload(taskInfo.taskId, index == (taskInfo.table.size() - 1u), taskInfo.lockAction);
287     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
288         {
289             std::lock_guard<std::mutex> autoLock(dataLock_);
290             currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex,
291                 taskInfo.table[index], false);
292             LOGI("[CloudSyncer] upload version conflict, index:%zu", index);
293         }
294         return errCode;
295     }
296     if (errCode != E_OK) {
297         LOGE("[CloudSyncer] upload failed %d", errCode);
298         return errCode;
299     }
300     return errCode;
301 }
302 
DoUploadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload)303 int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload)
304 {
305     if (!needUpload || taskInfo.isAssetsOnly) {
306         return E_OK;
307     }
308     storageProxy_->BeforeUploadTransaction();
309     int errCode = storageProxy_->StartTransaction();
310     if (errCode != E_OK) {
311         LOGE("[CloudSyncer] start transaction failed before doing upload.");
312         return errCode;
313     }
314     for (size_t i = 0u; i < taskInfo.table.size(); ++i) {
315         LOGD("[CloudSyncer] try upload table, index: %zu, table name: %s, length: %u",
316             i, DBCommon::StringMiddleMasking(taskInfo.table[i]).c_str(), taskInfo.table[i].length());
317         if (IsTableFinishInUpload(taskInfo.table[i])) {
318             continue;
319         }
320         errCode = PrepareAndUpload(taskInfo, i);
321         if (errCode == -E_TASK_PAUSED) { // should re download [paused table, last table]
322             for (size_t j = i; j < taskInfo.table.size(); ++j) {
323                 MarkDownloadFinishIfNeed(taskInfo.table[j], false);
324             }
325         }
326         if (errCode != E_OK) {
327             break;
328         }
329         MarkUploadFinishIfNeed(taskInfo.table[i]);
330     }
331     if (errCode == -E_TASK_PAUSED) {
332         std::lock_guard<std::mutex> autoLock(dataLock_);
333         resumeTaskInfos_[taskInfo.taskId].upload = true;
334     }
335     if (errCode == E_OK || errCode == -E_TASK_PAUSED) {
336         int commitErrorCode = storageProxy_->Commit();
337         if (commitErrorCode != E_OK) {
338             LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode);
339         }
340     } else {
341         int rollBackErrorCode = storageProxy_->Rollback();
342         if (rollBackErrorCode != E_OK) {
343             LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
344         }
345     }
346     return errCode;
347 }
348 
SyncMachineDoDownload()349 CloudSyncEvent CloudSyncer::SyncMachineDoDownload()
350 {
351     CloudTaskInfo taskInfo;
352     bool needUpload;
353     bool isFirstDownload;
354     {
355         std::lock_guard<std::mutex> autoLock(dataLock_);
356         taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
357         needUpload = currentContext_.isRealNeedUpload;
358         isFirstDownload = currentContext_.isFirstDownload;
359     }
360     int errCode = E_OK;
361     if (IsLockInDownload()) {
362         errCode = LockCloudIfNeed(taskInfo.taskId);
363     }
364     if (errCode != E_OK) {
365         return SetCurrentTaskFailedInMachine(errCode);
366     }
367     errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
368     if (errCode != E_OK) {
369         if (errCode == -E_TASK_PAUSED) {
370             DBDfxAdapter::ReportBehavior(
371                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::CANCLE, errCode});
372         } else {
373             DBDfxAdapter::ReportBehavior(
374                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::FAIL, errCode});
375         }
376         return SetCurrentTaskFailedInMachine(errCode);
377     }
378     DBDfxAdapter::ReportBehavior(
379         {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::SUCC, errCode});
380     return CloudSyncEvent::DOWNLOAD_FINISHED_EVENT;
381 }
382 
SyncMachineDoUpload()383 CloudSyncEvent CloudSyncer::SyncMachineDoUpload()
384 {
385     CloudTaskInfo taskInfo;
386     bool needUpload;
387     {
388         std::lock_guard<std::mutex> autoLock(dataLock_);
389         taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
390         needUpload = currentContext_.isRealNeedUpload;
391     }
392     DBDfxAdapter::ReportBehavior(
393         {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_UPLOAD, StageResult::SUCC, E_OK});
394     int errCode = DoUploadInNeed(taskInfo, needUpload);
395     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
396         DBDfxAdapter::ReportBehavior(
397             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
398         return CloudSyncEvent::REPEAT_CHECK_EVENT;
399     }
400     if (errCode != E_OK) {
401         {
402             std::lock_guard<std::mutex> autoLock(dataLock_);
403             cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
404         }
405         if (errCode == -E_TASK_PAUSED) {
406             DBDfxAdapter::ReportBehavior(
407                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::CANCLE, errCode});
408         } else {
409             DBDfxAdapter::ReportBehavior(
410                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
411         }
412         return CloudSyncEvent::ERROR_EVENT;
413     }
414     DBDfxAdapter::ReportBehavior(
415         {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::SUCC, errCode});
416     return CloudSyncEvent::UPLOAD_FINISHED_EVENT;
417 }
418 
SyncMachineDoFinished()419 CloudSyncEvent CloudSyncer::SyncMachineDoFinished()
420 {
421     UnlockIfNeed();
422     TaskId taskId;
423     int errCode;
424     int currentUserIndex;
425     int userListSize;
426     {
427         std::lock_guard<std::mutex> autoLock(dataLock_);
428         taskId = currentContext_.currentTaskId;
429         errCode = cloudTaskInfos_[currentContext_.currentTaskId].errCode;
430         currentUserIndex = currentContext_.currentUserIndex;
431         userListSize = static_cast<int>(cloudTaskInfos_[taskId].users.size());
432     }
433     if (currentUserIndex >= userListSize) {
434         {
435             std::lock_guard<std::mutex> autoLock(dataLock_);
436             cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
437         }
438         DoFinished(taskId, errCode);
439     } else {
440         CloudTaskInfo taskInfo;
441         {
442             std::lock_guard<std::mutex> autoLock(dataLock_);
443             taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
444         }
445         taskInfo.status = ProcessStatus::FINISHED;
446         currentContext_.notifier->NotifyProcess(taskInfo, {}, true);
447         {
448             std::lock_guard<std::mutex> autoLock(dataLock_);
449             cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
450         }
451     }
452     return CloudSyncEvent::ALL_TASK_FINISHED_EVENT;
453 }
454 
DoSyncInner(const CloudTaskInfo & taskInfo)455 int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo)
456 {
457     cloudSyncStateMachine_.SwitchStateAndStep(CloudSyncEvent::START_SYNC_EVENT);
458     DBDfxAdapter::ReportBehavior(
459         {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_SYNC, StageResult::SUCC, E_OK});
460     return E_OK;
461 }
462 
DoFinished(TaskId taskId,int errCode)463 void CloudSyncer::DoFinished(TaskId taskId, int errCode)
464 {
465     storageProxy_->OnSyncFinish();
466     if (errCode == -E_TASK_PAUSED) {
467         DBDfxAdapter::ReportBehavior(
468             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_SYNC, StageResult::CANCLE, errCode});
469         LOGD("[CloudSyncer] taskId %" PRIu64 " was paused, it won't be finished now", taskId);
470         {
471             std::lock_guard<std::mutex> autoLock(dataLock_);
472             resumeTaskInfos_[taskId].context = std::move(currentContext_);
473             currentContext_.locker = resumeTaskInfos_[taskId].context.locker;
474             resumeTaskInfos_[taskId].context.locker = nullptr;
475             ClearCurrentContextWithoutLock();
476         }
477         contextCv_.notify_all();
478         return;
479     }
480     {
481         std::lock_guard<std::mutex> autoLock(dataLock_);
482         RemoveTaskFromQueue(cloudTaskInfos_[taskId].priorityLevel, taskId);
483     }
484     ClearContextAndNotify(taskId, errCode);
485 }
486 
487 /**
488  * UpdateChangedData will be used for Insert case, which we can only get rowid after we saved data in db.
489 */
UpdateChangedData(SyncParam & param,DownloadList & assetsDownloadList)490 int CloudSyncer::UpdateChangedData(SyncParam &param, DownloadList &assetsDownloadList)
491 {
492     if (param.withoutRowIdData.insertData.empty() && param.withoutRowIdData.updateData.empty()) {
493         return E_OK;
494     }
495     int ret = E_OK;
496     for (size_t j : param.withoutRowIdData.insertData) {
497         VBucket &datum = param.downloadData.data[j];
498         std::vector<Type> primaryValues;
499         ret = CloudSyncUtils::GetCloudPkVals(datum, param.changedData.field,
500             std::get<int64_t>(datum[DBConstant::ROWID]), primaryValues);
501         if (ret != E_OK) {
502             LOGE("[CloudSyncer] updateChangedData cannot get primaryValues");
503             return ret;
504         }
505         param.changedData.primaryData[ChangeType::OP_INSERT].push_back(primaryValues);
506     }
507     for (const auto &tuple : param.withoutRowIdData.assetInsertData) {
508         size_t downloadIndex = std::get<0>(tuple);
509         VBucket &datum = param.downloadData.data[downloadIndex];
510         size_t insertIdx = std::get<1>(tuple);
511         std::vector<Type> &pkVal = std::get<5>(assetsDownloadList[insertIdx]); // 5 means primary key list
512         pkVal[0] = datum[DBConstant::ROWID];
513     }
514     for (const auto &tuple : param.withoutRowIdData.updateData) {
515         size_t downloadIndex = std::get<0>(tuple);
516         size_t updateIndex = std::get<1>(tuple);
517         VBucket &datum = param.downloadData.data[downloadIndex];
518         size_t size = param.changedData.primaryData[ChangeType::OP_UPDATE].size();
519         if (updateIndex >= size) {
520             LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size);
521             return -E_INTERNAL_ERROR;
522         }
523         if (param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) {
524             LOGE("[CloudSyncer] primary key value list should not be empty.");
525             return -E_INTERNAL_ERROR;
526         }
527         // no primary key or composite primary key, the first element is rowid
528         param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] =
529             datum[DBConstant::ROWID];
530     }
531     return ret;
532 }
533 
IsDataContainDuplicateAsset(const std::vector<Field> & assetFields,VBucket & data)534 bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data)
535 {
536     for (const auto &assetField : assetFields) {
537         if (data.find(assetField.colName) == data.end()) {
538             return false;
539         }
540         if (assetField.type == TYPE_INDEX<Assets> && data[assetField.colName].index() == TYPE_INDEX<Assets>) {
541             if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get<Assets>(data[assetField.colName]))) {
542                 return true;
543             }
544         }
545     }
546     return false;
547 }
548 
IsDataContainAssets()549 bool CloudSyncer::IsDataContainAssets()
550 {
551     std::lock_guard<std::mutex> autoLock(dataLock_);
552     bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end());
553     if (!hasTable) {
554         LOGW("[CloudSyncer] failed to get assetFields, because tableName doesn't exist in currentContext, %d.",
555             -E_INTERNAL_ERROR);
556             return false;
557     }
558     if (currentContext_.assetFields[currentContext_.tableName].empty()) {
559         return false;
560     }
561     return true;
562 }
563 
TagAssetsInSingleRecord(VBucket & coveredData,VBucket & beCoveredData,bool setNormalStatus,bool isForcePullAseets,int & errCode)564 std::map<std::string, Assets> CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
565     bool setNormalStatus, bool isForcePullAseets, int &errCode)
566 {
567     // Define a map to store the result
568     std::map<std::string, Assets> res = {};
569     std::vector<Field> assetFields;
570     {
571         std::lock_guard<std::mutex> autoLock(dataLock_);
572         assetFields = currentContext_.assetFields[currentContext_.tableName];
573     }
574     // For every column contain asset or assets, assetFields are in context
575     TagAssetsInfo tagAssetsInfo = {coveredData, beCoveredData, setNormalStatus, isForcePullAseets};
576     for (const Field &assetField : assetFields) {
577         Assets assets = TagAssetsInSingleCol(tagAssetsInfo, assetField, errCode);
578         if (!assets.empty()) {
579             res[assetField.colName] = assets;
580         }
581         if (errCode != E_OK) {
582             break;
583         }
584     }
585     return res;
586 }
587 
FillCloudAssets(InnerProcessInfo & info,VBucket & normalAssets,VBucket & failedAssets)588 int CloudSyncer::FillCloudAssets(InnerProcessInfo &info, VBucket &normalAssets, VBucket &failedAssets)
589 {
590     int ret = E_OK;
591     if (normalAssets.size() > 1) {
592         if (info.isAsyncDownload) {
593             ret = storageProxy_->FillCloudAssetForAsyncDownload(info.tableName, normalAssets, true);
594         } else {
595             ret = storageProxy_->FillCloudAssetForDownload(info.tableName, normalAssets, true);
596         }
597         if (ret != E_OK) {
598             LOGE("[CloudSyncer]%s download: Can not fill normal assets for download",
599                 info.isAsyncDownload ? "Async" : "Sync");
600             return ret;
601         }
602     }
603     if (failedAssets.size() > 1) {
604         if (info.isAsyncDownload) {
605             ret = storageProxy_->FillCloudAssetForAsyncDownload(info.tableName, failedAssets, false);
606         } else {
607             ret = storageProxy_->FillCloudAssetForDownload(info.tableName, failedAssets, false);
608         }
609         if (ret != E_OK) {
610             LOGE("[CloudSyncer]%s download: Can not fill abnormal assets for download",
611                 info.isAsyncDownload ? "Async" : "Sync");
612             return ret;
613         }
614     }
615     return E_OK;
616 }
617 
HandleDownloadResult(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)618 int CloudSyncer::HandleDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
619     DownloadCommitList &commitList, uint32_t &successCount)
620 {
621     successCount = 0;
622     int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
623     if (errCode != E_OK) {
624         LOGE("[CloudSyncer] start transaction Failed before handle download.");
625         return errCode;
626     }
627     errCode = CommitDownloadAssets(downloadItem, info, commitList, successCount);
628     if (errCode != E_OK) {
629         successCount = 0;
630         int ret = E_OK;
631         if (errCode == -E_REMOVE_ASSETS_FAILED) {
632             // remove assets failed no effect to asset status, just commit
633             ret = storageProxy_->Commit();
634         } else {
635             ret = storageProxy_->Rollback();
636         }
637         LOGE("[CloudSyncer] commit download assets failed %d commit/rollback ret %d", errCode, ret);
638         return errCode;
639     }
640     errCode = storageProxy_->Commit();
641     if (errCode != E_OK) {
642         successCount = 0;
643         LOGE("[CloudSyncer] commit failed %d", errCode);
644     }
645     return errCode;
646 }
647 
CloudDbDownloadAssets(TaskId taskId,InnerProcessInfo & info,const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)648 int CloudSyncer::CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
649     const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
650 {
651     int downloadStatus = E_OK;
652     {
653         std::lock_guard<std::mutex> autoLock(dataLock_);
654         downloadStatus = resumeTaskInfos_[taskId].downloadStatus;
655         resumeTaskInfos_[taskId].downloadStatus = E_OK;
656     }
657     int errorCode = E_OK;
658     DownloadCommitList commitList;
659     for (size_t i = GetDownloadAssetIndex(taskId); i < downloadList.size(); i++) {
660         errorCode = CheckTaskIdValid(taskId);
661         if (errorCode != E_OK) {
662             std::lock_guard<std::mutex> autoLock(dataLock_);
663             resumeTaskInfos_[taskId].lastDownloadIndex = i;
664             resumeTaskInfos_[taskId].downloadStatus = downloadStatus;
665             break;
666         }
667         DownloadItem downloadItem;
668         GetDownloadItem(downloadList, i, downloadItem);
669         errorCode = DownloadOneAssetRecord(dupHashKeySet, downloadList, downloadItem, info, changedAssets);
670         if (errorCode == -E_NOT_SET) {
671             info.downLoadInfo.failCount += (downloadList.size() - i);
672             info.downLoadInfo.successCount -= (downloadList.size() - i);
673             return errorCode;
674         }
675         if (downloadItem.strategy == OpType::DELETE) {
676             downloadItem.assets = {};
677             downloadItem.gid = "";
678         }
679         // Process result of each asset
680         commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK));
681         downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus;
682         int ret = CommitDownloadResult(downloadItem, info, commitList, errorCode);
683         if (ret != E_OK && ret != -E_REMOVE_ASSETS_FAILED) {
684             return ret;
685         }
686         downloadStatus = downloadStatus == E_OK ? ret : downloadStatus;
687     }
688     LOGD("Download status is %d", downloadStatus);
689     return errorCode == E_OK ? downloadStatus : errorCode;
690 }
691 
DownloadAssets(InnerProcessInfo & info,const std::vector<std::string> & pKColNames,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)692 int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
693     const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
694 {
695     if (!IsDataContainAssets()) {
696         return E_OK;
697     }
698     // update changed data info
699     if (!CloudSyncUtils::IsChangeDataEmpty(changedAssets)) {
700         // changedData.primaryData should have no member inside
701         return -E_INVALID_ARGS;
702     }
703     changedAssets.tableName = info.tableName;
704     changedAssets.type = ChangedDataType::ASSET;
705     changedAssets.field = pKColNames;
706 
707     // Get AssetDownloadList
708     DownloadList changeList;
709     TaskId taskId;
710     {
711         std::lock_guard<std::mutex> autoLock(dataLock_);
712         changeList = currentContext_.assetDownloadList;
713         taskId = currentContext_.currentTaskId;
714     }
715     // Download data (include deleting) will handle return Code in this situation
716     int ret = E_OK;
717     if (RuntimeContext::GetInstance()->IsBatchDownloadAssets()) {
718         ret = CloudDbBatchDownloadAssets(taskId, changeList, dupHashKeySet, info, changedAssets);
719     } else {
720         ret = CloudDbDownloadAssets(taskId, info, changeList, dupHashKeySet, changedAssets);
721     }
722     if (ret != E_OK) {
723         LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret);
724     }
725     return ret;
726 }
727 
TagDownloadAssets(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)728 int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo,
729     VBucket &localAssetInfo)
730 {
731     int ret = E_OK;
732     OpType strategy = param.downloadData.opType[idx];
733     switch (strategy) {
734         case OpType::INSERT:
735         case OpType::UPDATE:
736         case OpType::DELETE:
737             ret = HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
738             break;
739         case OpType::NOT_HANDLE:
740         case OpType::ONLY_UPDATE_GID:
741         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data
742             (void)TagAssetsInSingleRecord(
743                 localAssetInfo, param.downloadData.data[idx], true, param.isForcePullAseets, ret);
744             for (const auto &[col, value]: localAssetInfo) {
745                 param.downloadData.data[idx].insert_or_assign(col, value);
746             }
747             break;
748         }
749         default:
750             break;
751     }
752     return ret;
753 }
754 
HandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)755 int CloudSyncer::HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
756     VBucket &localAssetInfo)
757 {
758     Type prefix;
759     std::vector<Type> pkVals;
760     OpType strategy = param.downloadData.opType[idx];
761     bool isDelStrategy = (strategy == OpType::DELETE);
762     int ret = CloudSyncUtils::GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys :
763         param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
764     if (ret != E_OK) {
765         LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret);
766         return ret;
767     }
768     prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
769     if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
770         LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
771         return -E_INTERNAL_ERROR;
772     }
773     AssetOperationUtils::FilterDeleteAsset(param.downloadData.data[idx]);
774     std::map<std::string, Assets> assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo,
775         false, param.isForcePullAseets, ret);
776     if (ret != E_OK) {
777         LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in download data");
778         return ret;
779     }
780     strategy = CloudSyncUtils::CalOpType(param, idx);
781     if (!param.isSinglePrimaryKey && strategy == OpType::INSERT) {
782         param.withoutRowIdData.assetInsertData.push_back(std::make_tuple(idx, param.assetsDownloadList.size()));
783     }
784     param.assetsDownloadList.push_back(
785         std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey,
786         pkVals, dataInfo.cloudLogInfo.timestamp));
787     return ret;
788 }
789 
SaveDatum(SyncParam & param,size_t idx,std::vector<std::pair<Key,size_t>> & deletedList,std::map<std::string,LogInfo> & localLogInfoCache,std::vector<VBucket> & localInfo)790 int CloudSyncer::SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
791     std::map<std::string, LogInfo> &localLogInfoCache, std::vector<VBucket> &localInfo)
792 {
793     int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames);
794     if (ret != E_OK) {
795         LOGE("[CloudSyncer] Invalid download data:%d", ret);
796         return ret;
797     }
798     CloudSyncUtils::ModifyCloudDataTime(param.downloadData.data[idx]);
799     DataInfo dataInfo;
800     VBucket localAssetInfo;
801     bool isExist = true;
802     ret = GetLocalInfo(idx, param, dataInfo.localInfo, localLogInfoCache, localAssetInfo);
803     if (ret == -E_NOT_FOUND) {
804         isExist = false;
805     } else if (ret != E_OK) {
806         LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret);
807         return ret;
808     }
809     // Get cloudLogInfo from cloud data
810     dataInfo.cloudLogInfo = CloudSyncUtils::GetCloudLogInfo(param.downloadData.data[idx]);
811     // Tag datum to get opType
812     ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo);
813     if (ret != E_OK) {
814         LOGE("[CloudSyncer] Cannot tag status: %d.", ret);
815         return ret;
816     }
817     CloudSyncUtils::UpdateLocalCache(param.downloadData.opType[idx], dataInfo.cloudLogInfo, dataInfo.localInfo.logInfo,
818         localLogInfoCache);
819 
820     if (param.isAssetsOnly && param.downloadData.opType[idx] != OpType::LOCKED_NOT_HANDLE) {
821         // if data is lock, not need to save the local info.
822         auto findGid = param.downloadData.data[idx].find(CloudDbConstant::GID_FIELD);
823         if (findGid == param.downloadData.data[idx].end()) {
824             LOGE("[CloudSyncer] data doe's not have gid field when download only asset: %d.", -E_NOT_FOUND);
825             return -E_NOT_FOUND;
826         }
827         localAssetInfo[CloudDbConstant::GID_FIELD] = findGid->second;
828         localAssetInfo[CloudDbConstant::HASH_KEY_FIELD] = dataInfo.localInfo.logInfo.hashKey;
829         localInfo.push_back(localAssetInfo);
830         return ret; // assets only not need to save changed data without assets.
831     }
832 
833     ret = CloudSyncUtils::SaveChangedData(param, idx, dataInfo, deletedList);
834     if (ret != E_OK) {
835         LOGE("[CloudSyncer] Cannot save changed data: %d.", ret);
836     }
837     return ret;
838 }
839 
SaveData(CloudSyncer::TaskId taskId,SyncParam & param)840 int CloudSyncer::SaveData(CloudSyncer::TaskId taskId, SyncParam &param)
841 {
842     if (!CloudSyncUtils::IsChangeDataEmpty(param.changedData)) {
843         LOGE("[CloudSyncer] changedData.primaryData should have no member inside.");
844         return -E_INVALID_ARGS;
845     }
846     // Update download batch Info
847     param.info.downLoadInfo.batchIndex += 1;
848     param.info.downLoadInfo.total += param.downloadData.data.size();
849     param.info.retryInfo.downloadBatchOpCount = 0;
850     int ret = E_OK;
851     param.deletePrimaryKeySet.clear();
852     param.dupHashKeySet.clear();
853     CloudSyncUtils::ClearWithoutData(param);
854     std::vector<std::pair<Key, size_t>> deletedList;
855     // use for record local delete status
856     std::map<std::string, LogInfo> localLogInfoCache;
857     std::vector<VBucket> localInfo;
858     for (size_t i = 0; i < param.downloadData.data.size(); i++) {
859         ret = SaveDatum(param, i, deletedList, localLogInfoCache, localInfo);
860         if (ret != E_OK) {
861             param.info.downLoadInfo.failCount += param.downloadData.data.size();
862             LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret);
863             return ret;
864         }
865     }
866     // Save assetsMap into current context
867     {
868         std::lock_guard<std::mutex> autoLock(dataLock_);
869         currentContext_.assetDownloadList = param.assetsDownloadList;
870     }
871 
872     ret = PutCloudSyncDataOrUpdateStatusForAssetOnly(param, localInfo);
873     if (ret != E_OK) {
874         return ret;
875     }
876 
877     ret = UpdateChangedData(param, currentContext_.assetDownloadList);
878     if (ret != E_OK) {
879         param.info.downLoadInfo.failCount += param.downloadData.data.size();
880         LOGE("[CloudSyncer] Cannot update changed data: %d.", ret);
881         return ret;
882     }
883     // Update downloadInfo
884     param.info.downLoadInfo.successCount += param.downloadData.data.size();
885     // Get latest cloudWaterMark
886     VBucket &lastData = param.downloadData.data.back();
887     if (!IsQueryListEmpty(taskId) && param.isLastBatch) {
888         // the last batch of cursor in the conditional query is useless
889         param.cloudWaterMark = {};
890     } else {
891         param.cloudWaterMark = std::get<std::string>(lastData[CloudDbConstant::CURSOR_FIELD]);
892     }
893     return param.isAssetsOnly ? E_OK : UpdateFlagForSavedRecord(param);
894 }
895 
PreCheck(CloudSyncer::TaskId & taskId,const TableName & tableName)896 int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName)
897 {
898     // Check Input and Context Validity
899     int ret = CheckTaskIdValid(taskId);
900     if (ret != E_OK) {
901         return ret;
902     }
903     {
904         std::lock_guard<std::mutex> autoLock(dataLock_);
905         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
906             LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: , %" PRIu64 ".", taskId);
907             return -E_INVALID_ARGS;
908         }
909     }
910     if (currentContext_.strategy == nullptr) {
911         LOGE("[CloudSyncer] Strategy has not been initialized");
912         return -E_INVALID_ARGS;
913     }
914     ret = storageProxy_->CheckSchema(tableName);
915     if (ret != E_OK) {
916         LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret);
917         return ret;
918     }
919     return E_OK;
920 }
921 
NeedNotifyChangedData(const ChangedData & changedData)922 bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData)
923 {
924     TaskId taskId;
925     {
926         std::lock_guard<std::mutex> autoLock(dataLock_);
927         taskId = currentContext_.currentTaskId;
928     }
929     if (IsModeForcePush(taskId)) {
930         return false;
931     }
932     // when there have no data been changed, it don't need fill back
933     if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() &&
934         changedData.primaryData[OP_DELETE].empty()) {
935         return false;
936     }
937     return true;
938 }
939 
NotifyChangedDataInCurrentTask(ChangedData && changedData)940 int CloudSyncer::NotifyChangedDataInCurrentTask(ChangedData &&changedData)
941 {
942     if (!NeedNotifyChangedData(changedData)) {
943         return E_OK;
944     }
945     std::string deviceName;
946     {
947         std::lock_guard<std::mutex> autoLock(dataLock_);
948         std::vector<std::string> devices = currentContext_.notifier->GetDevices();
949         if (devices.empty()) {
950             DBDfxAdapter::ReportBehavior(
951                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, -E_CLOUD_ERROR});
952             LOGE("[CloudSyncer] CurrentContext do not contain device info");
953             return -E_CLOUD_ERROR;
954         }
955         // We use first device name as the target of NotifyChangedData
956         deviceName = devices[0];
957     }
958     return CloudSyncUtils::NotifyChangeData(deviceName, storageProxy_, std::move(changedData));
959 }
960 
NotifyInDownload(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)961 void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload)
962 {
963     if (!isFirstDownload && param.downloadData.data.empty()) {
964         // if the second download and there is no download data, do not notify
965         return;
966     }
967     std::lock_guard<std::mutex> autoLock(dataLock_);
968     if (currentContext_.strategy->JudgeUpload()) {
969         currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
970     } else {
971         if (param.isLastBatch) {
972             param.info.tableStatus = ProcessStatus::FINISHED;
973         }
974         if (cloudTaskInfos_[taskId].table.back() == param.tableName && param.isLastBatch) {
975             currentContext_.notifier->UpdateProcess(param.info);
976         } else {
977             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
978         }
979     }
980 }
981 
SaveDataInTransaction(CloudSyncer::TaskId taskId,SyncParam & param)982 int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam &param)
983 {
984     int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
985     if (ret != E_OK) {
986         LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret);
987         return ret;
988     }
989     uint64_t cursor = DBConstant::INVALID_CURSOR;
990     std::string maskStoreId = DBCommon::StringMiddleMasking(GetStoreIdByTask(taskId));
991     std::string maskTableName = DBCommon::StringMiddleMasking(param.info.tableName);
992     if (storageProxy_->GetCursor(param.info.tableName, cursor) == E_OK) {
993         LOGI("[CloudSyncer] cursor before save data is %llu, db: %s, table: %s, task id: %llu", cursor,
994             maskStoreId.c_str(), maskTableName.c_str(), taskId);
995     }
996     (void)storageProxy_->SetCursorIncFlag(true);
997     if (!IsModeForcePush(taskId)) {
998         param.changedData.tableName = param.info.tableName;
999         param.changedData.field = param.pkColNames;
1000         param.changedData.type = ChangedDataType::DATA;
1001     }
1002     ret = SaveData(taskId, param);
1003     (void)storageProxy_->SetCursorIncFlag(false);
1004     if (storageProxy_->GetCursor(param.info.tableName, cursor) == E_OK) {
1005         LOGI("[CloudSyncer] cursor after save data is %llu, db: %s, table: %s, task id: %llu", cursor,
1006              maskStoreId.c_str(), maskTableName.c_str(), taskId);
1007     }
1008     param.insertPk.clear();
1009     if (ret != E_OK) {
1010         LOGE("[CloudSyncer] cannot save data: %d.", ret);
1011         int rollBackErrorCode = storageProxy_->Rollback();
1012         if (rollBackErrorCode != E_OK) {
1013             LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1014         } else {
1015             LOGI("[CloudSyncer] Roll back transaction success: %d.", ret);
1016         }
1017         return ret;
1018     }
1019     ret = storageProxy_->Commit();
1020     if (ret != E_OK) {
1021         LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret);
1022     }
1023     return ret;
1024 }
1025 
DoDownloadAssets(SyncParam & param)1026 int CloudSyncer::DoDownloadAssets(SyncParam &param)
1027 {
1028     // Begin downloading assets
1029     ChangedData changedAssets;
1030     int ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets);
1031     bool isSharedTable = false;
1032     int errCode = storageProxy_->IsSharedTable(param.tableName, isSharedTable);
1033     if (errCode != E_OK) {
1034         LOGE("[CloudSyncer] HandleTagAssets cannot judge the table is a shared table. %d", errCode);
1035         return errCode;
1036     }
1037     if (!isSharedTable) {
1038         (void)NotifyChangedDataInCurrentTask(std::move(changedAssets));
1039     }
1040     if (ret != E_OK) {
1041         LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret);
1042     } else {
1043         param.assetsDownloadList.clear();
1044     }
1045     return ret;
1046 }
1047 
SaveDataNotifyProcess(CloudSyncer::TaskId taskId,SyncParam & param,bool downloadAssetOnly)1048 int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam &param, bool downloadAssetOnly)
1049 {
1050     InnerProcessInfo tmpInfo = param.info;
1051     int ret = E_OK;
1052     if (!downloadAssetOnly) {
1053         ChangedData changedData;
1054         param.changedData = changedData;
1055         param.downloadData.opType.resize(param.downloadData.data.size());
1056         param.downloadData.existDataKey.resize(param.downloadData.data.size());
1057         param.downloadData.existDataHashKey.resize(param.downloadData.data.size());
1058         ret = SaveDataInTransaction(taskId, param);
1059         if (ret != E_OK) {
1060             return ret;
1061         }
1062         // call OnChange to notify changedData object first time (without Assets)
1063         ret = NotifyChangedDataInCurrentTask(std::move(param.changedData));
1064         if (ret != E_OK) {
1065             LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret);
1066             return ret;
1067         }
1068     }
1069     ret = DoDownloadAssets(param);
1070     if (ret == -E_TASK_PAUSED) {
1071         param.info = tmpInfo;
1072         if (IsAsyncDownloadAssets(taskId)) {
1073             UpdateCloudWaterMark(taskId, param);
1074             (void)SaveCloudWaterMark(param.tableName, taskId);
1075         }
1076     }
1077     if (ret != E_OK) {
1078         return ret;
1079     }
1080     UpdateCloudWaterMark(taskId, param);
1081     return E_OK;
1082 }
1083 
NotifyInBatchUpload(const UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,bool lastBatch)1084 void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1085     bool lastBatch)
1086 {
1087     CloudTaskInfo taskInfo;
1088     {
1089         std::lock_guard<std::mutex> autoLock(dataLock_);
1090         taskInfo = cloudTaskInfos_[uploadParam.taskId];
1091     }
1092     std::lock_guard<std::mutex> autoLock(dataLock_);
1093     if (uploadParam.lastTable && lastBatch) {
1094         currentContext_.notifier->UpdateProcess(innerProcessInfo);
1095     } else {
1096         currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo);
1097     }
1098 }
1099 
DoDownload(CloudSyncer::TaskId taskId,bool isFirstDownload)1100 int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload)
1101 {
1102     SyncParam param;
1103     int errCode = GetSyncParamForDownload(taskId, param);
1104     if (errCode != E_OK) {
1105         LOGE("[CloudSyncer] get sync param for download failed %d", errCode);
1106         return errCode;
1107     }
1108     for (auto it = param.assetsDownloadList.begin(); it != param.assetsDownloadList.end();) {
1109         if (std::get<CloudSyncUtils::STRATEGY_INDEX>(*it) != OpType::DELETE) {
1110             it = param.assetsDownloadList.erase(it);
1111         } else {
1112             it++;
1113         }
1114     }
1115     (void)storageProxy_->CreateTempSyncTrigger(param.tableName);
1116     errCode = DoDownloadInner(taskId, param, isFirstDownload);
1117     (void)storageProxy_->ClearAllTempSyncTrigger();
1118     if (errCode == -E_TASK_PAUSED) {
1119         // No need to handle ret.
1120         int ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1121         if (ret != E_OK) {
1122             LOGE("[DoDownload] Cannot get cloud watermark : %d.", ret);
1123         }
1124         std::lock_guard<std::mutex> autoLock(dataLock_);
1125         resumeTaskInfos_[taskId].syncParam = std::move(param);
1126     }
1127     return errCode;
1128 }
1129 
DoDownloadInner(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)1130 int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload)
1131 {
1132     // Query data by batch until reaching end and not more data need to be download
1133     int ret = PreCheck(taskId, param.info.tableName);
1134     if (ret != E_OK) {
1135         return ret;
1136     }
1137     do {
1138         ret = DownloadOneBatch(taskId, param, isFirstDownload);
1139         if (ret != E_OK) {
1140             return ret;
1141         }
1142     } while (!param.isLastBatch);
1143     return E_OK;
1144 }
1145 
NotifyInEmptyDownload(CloudSyncer::TaskId taskId,InnerProcessInfo & info)1146 void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info)
1147 {
1148     std::lock_guard<std::mutex> autoLock(dataLock_);
1149     if (currentContext_.strategy->JudgeUpload()) {
1150         currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1151     } else {
1152         info.tableStatus = FINISHED;
1153         if (cloudTaskInfos_[taskId].table.back() == info.tableName) {
1154             currentContext_.notifier->UpdateProcess(info);
1155         } else {
1156             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1157         }
1158     }
1159 }
1160 
PreCheckUpload(CloudSyncer::TaskId & taskId,const TableName & tableName,Timestamp & localMark)1161 int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark)
1162 {
1163     int ret = PreCheck(taskId, tableName);
1164     if (ret != E_OK) {
1165         return ret;
1166     }
1167     {
1168         std::lock_guard<std::mutex> autoLock(dataLock_);
1169         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1170             LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
1171             return -E_INVALID_ARGS;
1172         }
1173         if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) ||
1174             (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) {
1175             LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.",
1176                 static_cast<int>(cloudTaskInfos_[taskId].mode));
1177             return -E_INVALID_ARGS;
1178         }
1179     }
1180 
1181     return ret;
1182 }
1183 
SaveUploadData(Info & insertInfo,Info & updateInfo,Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)1184 int CloudSyncer::SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
1185     InnerProcessInfo &innerProcessInfo)
1186 {
1187     int errCode = E_OK;
1188     if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) {
1189         errCode = BatchDelete(deleteInfo, uploadData, innerProcessInfo);
1190         if (errCode != E_OK) {
1191             return errCode;
1192         }
1193     }
1194 
1195     if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) {
1196         errCode = BatchUpdate(updateInfo, uploadData, innerProcessInfo);
1197         if (errCode != E_OK) {
1198             return errCode;
1199         }
1200     }
1201 
1202     if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) {
1203         errCode = BatchInsert(insertInfo, uploadData, innerProcessInfo);
1204         if (errCode != E_OK) {
1205             return errCode;
1206         }
1207     }
1208 
1209     if (!uploadData.lockData.rowid.empty()) {
1210         errCode = storageProxy_->FillCloudLogAndAsset(OpType::LOCKED_NOT_HANDLE, uploadData);
1211         if (errCode != E_OK) {
1212             LOGE("[CloudSyncer] Failed to fill cloud log and asset after save upload data: %d", errCode);
1213         }
1214     }
1215     return errCode;
1216 }
1217 
DoBatchUpload(CloudSyncData & uploadData,UploadParam & uploadParam,InnerProcessInfo & innerProcessInfo)1218 int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo)
1219 {
1220     int errCode = storageProxy_->FillCloudLogAndAsset(OpType::SET_UPLOADING, uploadData);
1221     if (errCode != E_OK) {
1222         LOGE("[CloudSyncer] Failed to fill cloud log and asset before save upload data: %d", errCode);
1223         return errCode;
1224     }
1225     Info insertInfo;
1226     Info updateInfo;
1227     Info deleteInfo;
1228     errCode = SaveUploadData(insertInfo, updateInfo, deleteInfo, uploadData, innerProcessInfo);
1229     if (errCode != E_OK) {
1230         return errCode;
1231     }
1232     bool lastBatch = (innerProcessInfo.upLoadInfo.successCount + innerProcessInfo.upLoadInfo.failCount) ==
1233                      innerProcessInfo.upLoadInfo.total;
1234     if (lastBatch) {
1235         innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1236     }
1237     // After each batch upload successed, call NotifyProcess
1238     NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch);
1239 
1240     // if batch upload successed, update local water mark
1241     // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here.
1242     errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam);
1243     if (errCode != E_OK) {
1244         LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode);
1245     }
1246     return errCode;
1247 }
1248 
PutWaterMarkAfterBatchUpload(const std::string & tableName,UploadParam & uploadParam)1249 int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam)
1250 {
1251     int errCode = E_OK;
1252     storageProxy_->ReleaseUploadRecord(tableName, uploadParam.mode, uploadParam.localMark);
1253     // if we use local cover cloud strategy, it won't update local water mark also.
1254     if (IsModeForcePush(uploadParam.taskId) || (IsPriorityTask(uploadParam.taskId) &&
1255         !IsQueryListEmpty(uploadParam.taskId))) {
1256         return E_OK;
1257     }
1258     errCode = storageProxy_->PutWaterMarkByMode(tableName, uploadParam.mode, uploadParam.localMark);
1259     if (errCode != E_OK) {
1260         LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode);
1261     }
1262     return errCode;
1263 }
1264 
DoUpload(CloudSyncer::TaskId taskId,bool lastTable,LockAction lockAction)1265 int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction)
1266 {
1267     std::string tableName;
1268     int ret = GetCurrentTableName(tableName);
1269     if (ret != E_OK) {
1270         LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1271         return ret;
1272     }
1273 
1274     Timestamp localMark = 0u;
1275     ret = PreCheckUpload(taskId, tableName, localMark);
1276     if (ret != E_OK) {
1277         LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret);
1278         return ret;
1279     }
1280     ReloadWaterMarkIfNeed(taskId, localMark);
1281     storageProxy_->OnUploadStart();
1282 
1283     int64_t count = 0;
1284     ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
1285         IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
1286     LOGI("get upload count:%zu", count);
1287     if (ret != E_OK) {
1288         // GetUploadCount will return E_OK when upload count is zero.
1289         LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
1290         return ret;
1291     }
1292     if (count == 0) {
1293         UpdateProcessInfoWithoutUpload(taskId, tableName, !lastTable);
1294         return E_OK;
1295     }
1296     UploadParam param;
1297     param.count = count;
1298     param.lastTable = lastTable;
1299     param.taskId = taskId;
1300     param.lockAction = lockAction;
1301     return DoUploadInner(tableName, param);
1302 }
1303 
PreProcessBatchUpload(UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,CloudSyncData & uploadData)1304 int CloudSyncer::PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1305     CloudSyncData &uploadData)
1306 {
1307     // Precheck and calculate local water mark which would be updated if batch upload successed.
1308     int ret = CheckTaskIdValid(uploadParam.taskId);
1309     if (ret != E_OK) {
1310         return ret;
1311     }
1312     ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName,
1313         innerProcessInfo.upLoadInfo.total);
1314     if (ret != E_OK) {
1315         LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret);
1316         return ret;
1317     }
1318     TagUploadAssets(uploadData);
1319     CloudSyncUtils::UpdateAssetsFlag(uploadData);
1320     // get local water mark to be updated in future.
1321     ret = CloudSyncUtils::UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total,
1322         uploadParam.taskId, uploadParam.localMark);
1323     if (ret != E_OK) {
1324         LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret);
1325     }
1326     return ret;
1327 }
1328 
SaveCloudWaterMark(const TableName & tableName,const TaskId taskId)1329 int CloudSyncer::SaveCloudWaterMark(const TableName &tableName, const TaskId taskId)
1330 {
1331     {
1332         std::lock_guard<std::mutex> autoLock(dataLock_);
1333         if (cloudTaskInfos_[taskId].isAssetsOnly) {
1334             // assset only task does not need to save water mark.
1335             return E_OK;
1336         }
1337     }
1338     std::string cloudWaterMark;
1339     bool isUpdateCloudCursor = true;
1340     {
1341         std::lock_guard<std::mutex> autoLock(dataLock_);
1342         if (currentContext_.cloudWaterMarks.find(currentContext_.currentUserIndex) ==
1343             currentContext_.cloudWaterMarks.end() ||
1344             currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].find(tableName) ==
1345             currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].end()) {
1346             LOGD("[CloudSyncer] Not found water mark just return");
1347             return E_OK;
1348         }
1349         cloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
1350         isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor();
1351     }
1352     isUpdateCloudCursor = isUpdateCloudCursor && !(IsPriorityTask(taskId) && !IsQueryListEmpty(taskId));
1353     if (isUpdateCloudCursor) {
1354         int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark);
1355         if (errCode != E_OK) {
1356             LOGE("[CloudSyncer] Cannot set cloud water mark, %d.", errCode);
1357         }
1358         return errCode;
1359     }
1360     return E_OK;
1361 }
1362 
SetUploadDataFlag(const TaskId taskId,CloudSyncData & uploadData)1363 void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData)
1364 {
1365     std::lock_guard<std::mutex> autoLock(dataLock_);
1366     uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH);
1367     uploadData.isCompensatedTask = cloudTaskInfos_[taskId].compensatedTask;
1368 }
1369 
IsModeForcePush(TaskId taskId)1370 bool CloudSyncer::IsModeForcePush(TaskId taskId)
1371 {
1372     std::lock_guard<std::mutex> autoLock(dataLock_);
1373     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
1374 }
1375 
IsModeForcePull(const TaskId taskId)1376 bool CloudSyncer::IsModeForcePull(const TaskId taskId)
1377 {
1378     std::lock_guard<std::mutex> autoLock(dataLock_);
1379     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL;
1380 }
1381 
IsPriorityTask(TaskId taskId)1382 bool CloudSyncer::IsPriorityTask(TaskId taskId)
1383 {
1384     std::lock_guard<std::mutex> autoLock(dataLock_);
1385     return cloudTaskInfos_[taskId].priorityTask;
1386 }
1387 
PreHandleData(VBucket & datum,const std::vector<std::string> & pkColNames)1388 int CloudSyncer::PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames)
1389 {
1390     // type index of field in fields, true means mandatory filed
1391     static std::vector<std::tuple<std::string, int32_t, bool>> fieldAndIndex = {
1392         std::make_tuple(CloudDbConstant::GID_FIELD, TYPE_INDEX<std::string>, true),
1393         std::make_tuple(CloudDbConstant::CREATE_FIELD, TYPE_INDEX<int64_t>, true),
1394         std::make_tuple(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX<int64_t>, true),
1395         std::make_tuple(CloudDbConstant::DELETE_FIELD, TYPE_INDEX<bool>, true),
1396         std::make_tuple(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX<std::string>, true),
1397         std::make_tuple(CloudDbConstant::SHARING_RESOURCE_FIELD, TYPE_INDEX<std::string>, false)
1398     };
1399 
1400     for (const auto &fieldIndex : fieldAndIndex) {
1401         if (datum.find(std::get<0>(fieldIndex)) == datum.end()) {
1402             if (!std::get<2>(fieldIndex)) { // 2 is index of mandatory flag
1403                 continue;
1404             }
1405             LOGE("[CloudSyncer] Cloud data do not contain expected field: %s.", std::get<0>(fieldIndex).c_str());
1406             return -E_CLOUD_ERROR;
1407         }
1408         if (datum[std::get<0>(fieldIndex)].index() != static_cast<size_t>(std::get<1>(fieldIndex))) {
1409             LOGE("[CloudSyncer] Cloud data's field: %s, doesn't has expected type.", std::get<0>(fieldIndex).c_str());
1410             return -E_CLOUD_ERROR;
1411         }
1412     }
1413 
1414     if (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) {
1415         CloudSyncUtils::RemoveDataExceptExtendInfo(datum, pkColNames);
1416     }
1417     std::lock_guard<std::mutex> autoLock(dataLock_);
1418     if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) {
1419         LOGE("[CloudSyncer] Cloud data contain duplicate asset");
1420         return -E_CLOUD_ERROR;
1421     }
1422     return E_OK;
1423 }
1424 
QueryCloudData(TaskId taskId,const std::string & tableName,std::string & cloudWaterMark,DownloadData & downloadData)1425 int CloudSyncer::QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
1426     DownloadData &downloadData)
1427 {
1428     VBucket extend;
1429     int ret = FillDownloadExtend(taskId, tableName, cloudWaterMark, extend);
1430     if (ret != E_OK) {
1431         return ret;
1432     }
1433     ret = cloudDB_.Query(tableName, extend, downloadData.data);
1434     if ((ret == E_OK || ret == -E_QUERY_END) && downloadData.data.empty()) {
1435         if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX<std::string>) {
1436             LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index());
1437             return -E_CLOUD_ERROR;
1438         }
1439         cloudWaterMark = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
1440         LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str());
1441         return ret;
1442     }
1443     if (ret == -E_QUERY_END) {
1444         LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded");
1445         return -E_QUERY_END;
1446     }
1447     if (ret != E_OK) {
1448         LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret);
1449     }
1450     return ret;
1451 }
1452 
CheckTaskIdValid(TaskId taskId)1453 int CloudSyncer::CheckTaskIdValid(TaskId taskId)
1454 {
1455     if (closed_) {
1456         LOGE("[CloudSyncer] DB is closed.");
1457         return -E_DB_CLOSED;
1458     }
1459     std::lock_guard<std::mutex> autoLock(dataLock_);
1460     if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1461         LOGE("[CloudSyncer] not found task.");
1462         return -E_INVALID_ARGS;
1463     }
1464     if (cloudTaskInfos_[taskId].pause) {
1465         LOGW("[CloudSyncer] check task %" PRIu64 " was paused!", taskId);
1466         return -E_TASK_PAUSED;
1467     }
1468     if (cloudTaskInfos_[taskId].errCode != E_OK) {
1469         LOGE("[CloudSyncer] task %" PRIu64 " has error: %d", taskId, cloudTaskInfos_[taskId].errCode);
1470         return cloudTaskInfos_[taskId].errCode;
1471     }
1472     return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS;
1473 }
1474 
GetCurrentTableName(std::string & tableName)1475 int CloudSyncer::GetCurrentTableName(std::string &tableName)
1476 {
1477     std::lock_guard<std::mutex> autoLock(dataLock_);
1478     if (currentContext_.tableName.empty()) {
1479         return -E_BUSY;
1480     }
1481     tableName = currentContext_.tableName;
1482     return E_OK;
1483 }
1484 
GetCurrentCommonTaskNum()1485 size_t CloudSyncer::GetCurrentCommonTaskNum()
1486 {
1487     size_t queuedTaskNum = taskQueue_.count(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
1488     auto rang = taskQueue_.equal_range(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
1489     size_t compensatedTaskNum = 0;
1490     for (auto it = rang.first; it != rang.second; ++it) {
1491         compensatedTaskNum += cloudTaskInfos_[it->second].compensatedTask ? 1 : 0;
1492     }
1493     return queuedTaskNum - compensatedTaskNum;
1494 }
1495 
CheckQueueSizeWithNoLock(bool priorityTask)1496 int CloudSyncer::CheckQueueSizeWithNoLock(bool priorityTask)
1497 {
1498     int32_t limit = queuedManualSyncLimit_;
1499     size_t totalTaskNum = taskQueue_.size();
1500     size_t commonTaskNum = GetCurrentCommonTaskNum();
1501     size_t queuedTaskNum = priorityTask ? totalTaskNum - commonTaskNum : commonTaskNum;
1502     if (queuedTaskNum >= static_cast<size_t>(limit)) {
1503         std::string priorityName = priorityTask ? CLOUD_PRIORITY_TASK_STRING : CLOUD_COMMON_TASK_STRING;
1504         LOGW("[CloudSyncer] too much %s sync task", priorityName.c_str());
1505         return -E_BUSY;
1506     }
1507     return E_OK;
1508 }
1509 
PrepareSync(TaskId taskId)1510 int CloudSyncer::PrepareSync(TaskId taskId)
1511 {
1512     std::lock_guard<std::mutex> autoLock(dataLock_);
1513     if (hasKvRemoveTask) {
1514         return -E_CLOUD_ERROR;
1515     }
1516     if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1517         LOGW("[CloudSyncer] Abort sync because syncer is closed");
1518         return -E_DB_CLOSED;
1519     }
1520     if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) {
1521         LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running");
1522         return -E_DB_CLOSED;
1523     }
1524     currentContext_.currentTaskId = taskId;
1525     cloudTaskInfos_[taskId].resume = cloudTaskInfos_[taskId].pause;
1526     cloudTaskInfos_[taskId].pause = false;
1527     cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING;
1528     if (cloudTaskInfos_[taskId].resume) {
1529         auto tempLocker = currentContext_.locker;
1530         currentContext_ = resumeTaskInfos_[taskId].context;
1531         currentContext_.locker = tempLocker;
1532     } else {
1533         currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
1534         currentContext_.strategy =
1535             StrategyFactory::BuildSyncStrategy(cloudTaskInfos_[taskId].mode, isKvScene_, policy_);
1536         currentContext_.notifier->Init(cloudTaskInfos_[taskId].table, cloudTaskInfos_[taskId].devices,
1537             cloudTaskInfos_[taskId].users);
1538         currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
1539     }
1540     LOGI("[CloudSyncer] exec storeId %.3s taskId %" PRIu64 " priority[%d] compensated[%d] logicDelete[%d]",
1541         cloudTaskInfos_[taskId].storeId.c_str(), taskId, static_cast<int>(cloudTaskInfos_[taskId].priorityTask),
1542         static_cast<int>(cloudTaskInfos_[taskId].compensatedTask),
1543         static_cast<int>(storageProxy_->IsCurrentLogicDelete()));
1544     return E_OK;
1545 }
1546 
LockCloud(TaskId taskId)1547 int CloudSyncer::LockCloud(TaskId taskId)
1548 {
1549     int period;
1550     {
1551         auto res = cloudDB_.Lock();
1552         if (res.first != E_OK) {
1553             return res.first;
1554         }
1555         period = static_cast<int>(res.second) / HEARTBEAT_PERIOD;
1556     }
1557     int errCode = StartHeartBeatTimer(period, taskId);
1558     if (errCode != E_OK) {
1559         UnlockCloud();
1560     }
1561     return errCode;
1562 }
1563 
UnlockCloud()1564 int CloudSyncer::UnlockCloud()
1565 {
1566     FinishHeartBeatTimer();
1567     return cloudDB_.UnLock();
1568 }
1569 
StartHeartBeatTimer(int period,TaskId taskId)1570 int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId)
1571 {
1572     if (timerId_ != 0u) {
1573         LOGW("[CloudSyncer] HeartBeat timer has been start!");
1574         return E_OK;
1575     }
1576     TimerId timerId = 0;
1577     int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) {
1578         HeartBeat(timerId, taskId);
1579         return E_OK;
1580     }, nullptr, timerId);
1581     if (errCode != E_OK) {
1582         LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode);
1583         return errCode;
1584     }
1585     timerId_ = timerId;
1586     return E_OK;
1587 }
1588 
FinishHeartBeatTimer()1589 void CloudSyncer::FinishHeartBeatTimer()
1590 {
1591     if (timerId_ == 0u) {
1592         return;
1593     }
1594     RuntimeContext::GetInstance()->RemoveTimer(timerId_, true);
1595     timerId_ = 0u;
1596     LOGD("[CloudSyncer] Finish heartbeat timer ok");
1597 }
1598 
// Timer callback of the heartbeat timer.
// Ignores callbacks from a timer other than the one recorded in timerId_. Otherwise it counts one pending
// heartbeat for taskId and schedules a task that either fails the task (too many pending heartbeats) or
// sends a heartbeat to the cloud database.
// A reference on this object is taken before scheduling and dropped on every exit path of the scheduled
// task, or immediately when scheduling fails.
// NOTE(review): heartbeatCount_ / failedHeartbeatCount_ are touched under heartbeatMutex_, under dataLock_
// and without any lock in this function, and currentContext_.currentTaskId is read under heartbeatMutex_
// only in the last section -- confirm the intended locking discipline.
void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId)
{
    if (timerId_ != timerId) {
        return;
    }
    IncObjRef(this);
    {
        // one more heartbeat is pending for this task
        std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
        heartbeatCount_[taskId]++;
    }
    int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() {
        {
            std::lock_guard<std::mutex> guard(dataLock_);
            if (currentContext_.currentTaskId != taskId) {
                // the task is no longer the current one, drop its heartbeat bookkeeping
                heartbeatCount_.erase(taskId);
                failedHeartbeatCount_.erase(taskId);
                DecObjRef(this);
                return;
            }
        }
        if (heartbeatCount_[taskId] >= HEARTBEAT_PERIOD) {
            // heartbeat block twice should finish task now
            SetTaskFailed(taskId, -E_CLOUD_ERROR);
        } else {
            int ret = cloudDB_.HeartBeat();
            if (ret != E_OK) {
                HeartBeatFailed(taskId, ret);
            } else {
                // a successful heartbeat resets the failure counter
                failedHeartbeatCount_[taskId] = 0;
            }
        }
        {
            // this heartbeat is done, it is no longer pending
            std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
            heartbeatCount_[taskId]--;
            if (currentContext_.currentTaskId != taskId) {
                heartbeatCount_.erase(taskId);
                failedHeartbeatCount_.erase(taskId);
            }
        }
        DecObjRef(this);
    });
    if (errCode != E_OK) {
        LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode);
        // the scheduled task will never run, release the reference taken above
        DecObjRef(this);
    }
}
1645 
HeartBeatFailed(TaskId taskId,int errCode)1646 void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode)
1647 {
1648     failedHeartbeatCount_[taskId]++;
1649     if (failedHeartbeatCount_[taskId] < MAX_HEARTBEAT_FAILED_LIMIT) {
1650         return;
1651     }
1652     LOGW("[CloudSyncer] heartbeat failed too much times!");
1653     FinishHeartBeatTimer();
1654     SetTaskFailed(taskId, errCode);
1655 }
1656 
SetTaskFailed(TaskId taskId,int errCode)1657 void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode)
1658 {
1659     std::lock_guard<std::mutex> autoLock(dataLock_);
1660     if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1661         return;
1662     }
1663     if (cloudTaskInfos_[taskId].errCode != E_OK) {
1664         return;
1665     }
1666     cloudTaskInfos_[taskId].errCode = errCode;
1667 }
1668 
GetCloudSyncTaskCount()1669 int32_t CloudSyncer::GetCloudSyncTaskCount()
1670 {
1671     std::lock_guard<std::mutex> autoLock(dataLock_);
1672     return static_cast<int32_t>(taskQueue_.size());
1673 }
1674 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)1675 int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1676     const RelationalSchemaObject &localSchema)
1677 {
1678     std::lock_guard<std::mutex> lock(syncMutex_);
1679     int index = 1;
1680     for (const auto &tableName: tableNameList) {
1681         LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index);
1682         int ret = storageProxy_->CleanWaterMark(tableName);
1683         if (ret != E_OK) {
1684         LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret);
1685             return ret;
1686         }
1687         index++;
1688     }
1689     int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1690     if (errCode != E_OK) {
1691         LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode);
1692         return errCode;
1693     }
1694 
1695     std::vector<Asset> assets;
1696     errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets);
1697     if (errCode != E_OK) {
1698         LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode);
1699         storageProxy_->Rollback();
1700         return errCode;
1701     }
1702     LOGI("[CloudSyncer] Clean cloud data success, mode=%d", mode);
1703     if (!assets.empty() && mode == FLAG_AND_DATA) {
1704         errCode = cloudDB_.RemoveLocalAssets(assets);
1705         if (errCode != E_OK) {
1706             LOGE("[Storage Executor] failed to remove local assets, %d.", errCode);
1707             storageProxy_->Rollback();
1708             return errCode;
1709         }
1710         LOGI("[CloudSyncer] Remove local asset success, size=%zu", assets.size());
1711     }
1712 
1713     storageProxy_->Commit();
1714     return errCode;
1715 }
1716 
CleanWaterMarkInMemory(const std::set<std::string> & tableNameList)1717 int CloudSyncer::CleanWaterMarkInMemory(const std::set<std::string> &tableNameList)
1718 {
1719     std::lock_guard<std::mutex> lock(syncMutex_);
1720     for (const auto &tableName: tableNameList) {
1721         int ret = storageProxy_->CleanWaterMarkInMemory(tableName);
1722         if (ret != E_OK) {
1723             LOGE("[CloudSyncer] failed to clean cloud water mark in memory, %d.", ret);
1724             return ret;
1725         }
1726     }
1727     return E_OK;
1728 }
1729 
UpdateCloudWaterMark(TaskId taskId,const SyncParam & param)1730 void CloudSyncer::UpdateCloudWaterMark(TaskId taskId, const SyncParam &param)
1731 {
1732     LOGI("[CloudSyncer] save cloud water [%s] of table [%s length[%u]]", param.cloudWaterMark.c_str(),
1733         DBCommon::StringMiddleMasking(param.tableName).c_str(), param.tableName.length());
1734     {
1735         std::lock_guard<std::mutex> autoLock(dataLock_);
1736         if (cloudTaskInfos_[taskId].isAssetsOnly) {
1737             LOGI("[CloudSyncer] assets only task not save water mark.");
1738             return;
1739         }
1740         currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][param.info.tableName] = param.cloudWaterMark;
1741     }
1742 }
1743 
CommitDownloadResult(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,int errCode)1744 int CloudSyncer::CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
1745     DownloadCommitList &commitList, int errCode)
1746 {
1747     if (commitList.empty()) {
1748         return E_OK;
1749     }
1750     uint32_t successCount = 0u;
1751     int ret = E_OK;
1752     if (info.isAsyncDownload) {
1753         ret = CommitDownloadAssetsForAsyncDownload(downloadItem, info, commitList, successCount);
1754     } else {
1755         ret = HandleDownloadResult(downloadItem, info, commitList, successCount);
1756     }
1757     if (errCode == E_OK) {
1758         info.downLoadInfo.failCount += (commitList.size() - successCount);
1759         info.downLoadInfo.successCount -= (commitList.size() - successCount);
1760     }
1761     if (ret != E_OK) {
1762         LOGE("Commit download result failed.%d", ret);
1763     }
1764     commitList.clear();
1765     return ret;
1766 }
1767 
GetIdentify() const1768 std::string CloudSyncer::GetIdentify() const
1769 {
1770     return id_;
1771 }
1772 
TagStatusByStrategy(bool isExist,SyncParam & param,DataInfo & dataInfo,OpType & strategyOpResult)1773 int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam &param, DataInfo &dataInfo, OpType &strategyOpResult)
1774 {
1775     strategyOpResult = OpType::NOT_HANDLE;
1776     // ignore same record with local generate data
1777     if (dataInfo.localInfo.logInfo.device.empty() &&
1778         !CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
1779         // not handle same data
1780         return E_OK;
1781     }
1782     {
1783         std::lock_guard<std::mutex> autoLock(dataLock_);
1784         if (!currentContext_.strategy) {
1785             LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR);
1786             return -E_INTERNAL_ERROR;
1787         }
1788         bool isCloudWin = storageProxy_->IsTagCloudUpdateLocal(dataInfo.localInfo.logInfo,
1789             dataInfo.cloudLogInfo, policy_);
1790         strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, isCloudWin,
1791             dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo);
1792     }
1793     if (strategyOpResult == OpType::DELETE) {
1794         param.deletePrimaryKeySet.insert(dataInfo.localInfo.logInfo.hashKey);
1795     }
1796     return E_OK;
1797 }
1798 
GetLocalInfo(size_t index,SyncParam & param,DataInfoWithLog & logInfo,std::map<std::string,LogInfo> & localLogInfoCache,VBucket & localAssetInfo)1799 int CloudSyncer::GetLocalInfo(size_t index, SyncParam &param, DataInfoWithLog &logInfo,
1800     std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo)
1801 {
1802     int errCode = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[index], true,
1803         logInfo, localAssetInfo);
1804     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1805         return errCode;
1806     }
1807     std::string hashKey(logInfo.logInfo.hashKey.begin(), logInfo.logInfo.hashKey.end());
1808     if (hashKey.empty()) {
1809         return errCode;
1810     }
1811 
1812     int ret = CheckLocalQueryAssetsOnlyIfNeed(localAssetInfo, param, logInfo);
1813     if (ret != E_OK) {
1814         LOGE("[CloudSyncer] check local assets failed, error code: %d", ret);
1815         return ret;
1816     }
1817 
1818     param.downloadData.existDataKey[index] = logInfo.logInfo.dataKey;
1819     param.downloadData.existDataHashKey[index] = logInfo.logInfo.hashKey;
1820     if (localLogInfoCache.find(hashKey) != localLogInfoCache.end()) {
1821         LOGD("[CloudSyncer] exist same record in one batch, override from cache record! hash=%.3s",
1822             DBCommon::TransferStringToHex(hashKey).c_str());
1823         logInfo.logInfo.flag = localLogInfoCache[hashKey].flag;
1824         logInfo.logInfo.wTimestamp = localLogInfoCache[hashKey].wTimestamp;
1825         logInfo.logInfo.timestamp = localLogInfoCache[hashKey].timestamp;
1826         logInfo.logInfo.cloudGid = localLogInfoCache[hashKey].cloudGid;
1827         logInfo.logInfo.device = localLogInfoCache[hashKey].device;
1828         logInfo.logInfo.sharingResource = localLogInfoCache[hashKey].sharingResource;
1829         logInfo.logInfo.status = localLogInfoCache[hashKey].status;
1830         // delete record should remove local asset info
1831         if ((localLogInfoCache[hashKey].flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1832             localAssetInfo.clear();
1833         }
1834         errCode = E_OK;
1835     }
1836     logInfo.logInfo.isNeedUpdateAsset = CloudSyncUtils::IsNeedUpdateAsset(localAssetInfo);
1837     return errCode;
1838 }
1839 
GetNextTaskId()1840 TaskId CloudSyncer::GetNextTaskId()
1841 {
1842     std::lock_guard<std::mutex> autoLock(dataLock_);
1843     if (!taskQueue_.empty()) {
1844         return taskQueue_.begin()->second;
1845     }
1846     return INVALID_TASK_ID;
1847 }
1848 
MarkCurrentTaskPausedIfNeed(const CloudTaskInfo & taskInfo)1849 void CloudSyncer::MarkCurrentTaskPausedIfNeed(const CloudTaskInfo &taskInfo)
1850 {
1851     // must sure have dataLock_ before call this function.
1852     if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1853         return;
1854     }
1855     if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1856         return;
1857     }
1858     if (cloudTaskInfos_[currentContext_.currentTaskId].priorityLevel < taskInfo.priorityLevel) {
1859         cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
1860         LOGD("[CloudSyncer] Mark taskId %" PRIu64 " paused success", currentContext_.currentTaskId);
1861     }
1862 }
1863 
SetCurrentTaskFailedWithoutLock(int errCode)1864 void CloudSyncer::SetCurrentTaskFailedWithoutLock(int errCode)
1865 {
1866     if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1867         return;
1868     }
1869     cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1870 }
1871 
LockCloudIfNeed(TaskId taskId)1872 int CloudSyncer::LockCloudIfNeed(TaskId taskId)
1873 {
1874     {
1875         std::lock_guard<std::mutex> autoLock(dataLock_);
1876         if (currentContext_.locker != nullptr) {
1877             LOGD("[CloudSyncer] lock exist");
1878             return E_OK;
1879         }
1880     }
1881     std::shared_ptr<CloudLocker> locker = nullptr;
1882     int errCode = CloudLocker::BuildCloudLock([taskId, this]() {
1883         return LockCloud(taskId);
1884     }, [this]() {
1885         int unlockCode = UnlockCloud();
1886         if (unlockCode != E_OK) {
1887             SetCurrentTaskFailedWithoutLock(unlockCode);
1888         }
1889     }, locker);
1890     {
1891         std::lock_guard<std::mutex> autoLock(dataLock_);
1892         currentContext_.locker = locker;
1893     }
1894     return errCode;
1895 }
1896 
UnlockIfNeed()1897 void CloudSyncer::UnlockIfNeed()
1898 {
1899     std::shared_ptr<CloudLocker> cacheLocker;
1900     {
1901         std::lock_guard<std::mutex> autoLock(dataLock_);
1902         cacheLocker = currentContext_.locker;
1903         currentContext_.locker = nullptr;
1904     }
1905     // unlock without mutex
1906     cacheLocker = nullptr;
1907 }
1908 
ClearCurrentContextWithoutLock()1909 void CloudSyncer::ClearCurrentContextWithoutLock()
1910 {
1911     heartbeatCount_.erase(currentContext_.currentTaskId);
1912     failedHeartbeatCount_.erase(currentContext_.currentTaskId);
1913     currentContext_.currentTaskId = INVALID_TASK_ID;
1914     currentContext_.notifier = nullptr;
1915     currentContext_.strategy = nullptr;
1916     currentContext_.processRecorder = nullptr;
1917     currentContext_.tableName.clear();
1918     currentContext_.assetDownloadList.clear();
1919     currentContext_.assetFields.clear();
1920     currentContext_.assetsInfo.clear();
1921     currentContext_.cloudWaterMarks.clear();
1922     currentContext_.isNeedUpload = false;
1923     currentContext_.currentUserIndex = 0;
1924     currentContext_.repeatCount = 0;
1925 }
1926 
IsAlreadyHaveCompensatedSyncTask()1927 bool CloudSyncer::IsAlreadyHaveCompensatedSyncTask()
1928 {
1929     std::lock_guard<std::mutex> autoLock(dataLock_);
1930     for (const auto &item : cloudTaskInfos_) {
1931         const auto &taskInfo = item.second;
1932         if (taskInfo.compensatedTask) {
1933             return true;
1934         }
1935     }
1936     return false;
1937 }
1938 
ClearContextAndNotify(TaskId taskId,int errCode)1939 void CloudSyncer::ClearContextAndNotify(TaskId taskId, int errCode)
1940 {
1941     std::shared_ptr<ProcessNotifier> notifier = nullptr;
1942     CloudTaskInfo info;
1943     {
1944         // clear current context
1945         std::lock_guard<std::mutex> autoLock(dataLock_);
1946         notifier = currentContext_.notifier;
1947         ClearCurrentContextWithoutLock();
1948         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen
1949             LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId);
1950             contextCv_.notify_all();
1951             return;
1952         }
1953         info = std::move(cloudTaskInfos_[taskId]);
1954         cloudTaskInfos_.erase(taskId);
1955         resumeTaskInfos_.erase(taskId);
1956     }
1957     int err = storageProxy_->ClearUnLockingNoNeedCompensated();
1958     if (err != E_OK) {
1959         // if clear unlocking failed, no return to avoid affecting the entire process
1960         LOGW("[CloudSyncer] clear unlocking status failed! errCode = %d", err);
1961     }
1962     contextCv_.notify_all();
1963     if (info.errCode == E_OK) {
1964         info.errCode = errCode;
1965     }
1966     LOGI("[CloudSyncer] finished storeId %.3s taskId %" PRIu64 " errCode %d", info.storeId.c_str(), taskId,
1967         info.errCode);
1968     info.status = ProcessStatus::FINISHED;
1969     if (notifier != nullptr) {
1970         notifier->UpdateAllTablesFinally();
1971         notifier->NotifyProcess(info, {}, true);
1972     }
1973     // generate compensated sync
1974     // if already have compensated sync task in queue, no need to generate new compensated sync task
1975     if (!info.compensatedTask && !IsAlreadyHaveCompensatedSyncTask()) {
1976         CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(info);
1977         GenerateCompensatedSync(taskInfo);
1978     }
1979 }
1980 
DownloadOneBatch(TaskId taskId,SyncParam & param,bool isFirstDownload)1981 int CloudSyncer::DownloadOneBatch(TaskId taskId, SyncParam &param, bool isFirstDownload)
1982 {
1983     int ret = CheckTaskIdValid(taskId);
1984     if (ret != E_OK) {
1985         return ret;
1986     }
1987     ret = DownloadDataFromCloud(taskId, param, isFirstDownload);
1988     if (ret != E_OK) {
1989         return ret;
1990     }
1991     bool downloadAssetOnly = false;
1992     if (param.downloadData.data.empty()) {
1993         if (param.assetsDownloadList.empty()) {
1994             return E_OK;
1995         }
1996         LOGI("[CloudSyncer] Resume task need download assets");
1997         downloadAssetOnly = true;
1998     }
1999     // Save data in transaction, update cloud water mark, notify process and changed data
2000     ret = SaveDataNotifyProcess(taskId, param, downloadAssetOnly);
2001     if (ret == -E_TASK_PAUSED) {
2002         return ret;
2003     }
2004     if (ret != E_OK) {
2005         std::lock_guard<std::mutex> autoLock(dataLock_);
2006         param.info.tableStatus = ProcessStatus::FINISHED;
2007         currentContext_.notifier->UpdateProcess(param.info);
2008         return ret;
2009     }
2010     (void)NotifyInDownload(taskId, param, isFirstDownload);
2011     return SaveCloudWaterMark(param.tableName, taskId);
2012 }
2013 
DownloadOneAssetRecord(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,DownloadItem & downloadItem,InnerProcessInfo & info,ChangedData & changedAssets)2014 int CloudSyncer::DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
2015     DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets)
2016 {
2017     CloudStorageUtils::EraseNoChangeAsset(downloadItem.assets);
2018     if (downloadItem.assets.empty()) { // Download data (include deleting)
2019         return E_OK;
2020     }
2021     bool isSharedTable = false;
2022     int errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
2023     if (errorCode != E_OK) {
2024         LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errorCode);
2025         return errorCode;
2026     }
2027     if (!isSharedTable) {
2028         errorCode = DownloadAssetsOneByOne(info, downloadItem, downloadItem.assets);
2029         if (errorCode == -E_NOT_SET) {
2030             return -E_NOT_SET;
2031         }
2032     } else {
2033         // share table will not download asset, need to reset the status
2034         for (auto &entry: downloadItem.assets) {
2035             for (auto &asset: entry.second) {
2036                 asset.status = AssetStatus::NORMAL;
2037             }
2038         }
2039     }
2040     ModifyDownLoadInfoCount(errorCode, info);
2041     if (!downloadItem.assets.empty()) {
2042         if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
2043             if (CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy) == OP_BUTT) {
2044                 LOGW("[CloudSyncer] [DownloadOneAssetRecord] strategy is invalid.");
2045             } else {
2046                 changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
2047                     downloadItem.primaryKeyValList);
2048             }
2049         } else if (downloadItem.strategy == OpType::INSERT) {
2050             changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
2051         }
2052     }
2053     return errorCode;
2054 }
2055 
GetSyncParamForDownload(TaskId taskId,SyncParam & param)2056 int CloudSyncer::GetSyncParamForDownload(TaskId taskId, SyncParam &param)
2057 {
2058     int ret = E_OK;
2059     if (IsCurrentTableResume(taskId, false)) {
2060         std::lock_guard<std::mutex> autoLock(dataLock_);
2061         if (resumeTaskInfos_[taskId].syncParam.tableName == currentContext_.tableName) {
2062             param = resumeTaskInfos_[taskId].syncParam;
2063             resumeTaskInfos_[taskId].syncParam = {};
2064             ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2065             if (ret != E_OK) {
2066                 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data when table is resume: %d.", ret);
2067             }
2068             LOGD("[CloudSyncer] Get sync param from cache");
2069             return E_OK;
2070         }
2071     }
2072     ret = GetCurrentTableName(param.tableName);
2073     if (ret != E_OK) {
2074         LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
2075         return ret;
2076     }
2077     param.info.tableName = param.tableName;
2078     std::vector<Field> assetFields;
2079     // only no primary key and composite primary key contains rowid.
2080     ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields);
2081     if (ret != E_OK) {
2082         LOGE("[CloudSyncer] Cannot get primary column names: %d", ret);
2083         return ret;
2084     }
2085     {
2086         std::lock_guard<std::mutex> autoLock(dataLock_);
2087         currentContext_.assetFields[currentContext_.tableName] = assetFields;
2088     }
2089     param.isSinglePrimaryKey = CloudSyncUtils::IsSinglePrimaryKey(param.pkColNames);
2090     if (!IsModeForcePull(taskId) && (!IsPriorityTask(taskId) || IsQueryListEmpty(taskId))) {
2091         ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2092         if (ret != E_OK) {
2093             LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret);
2094         }
2095         if (!IsCurrentTaskResume(taskId)) {
2096             ReloadCloudWaterMarkIfNeed(param.tableName, param.cloudWaterMark);
2097         }
2098     }
2099     currentContext_.notifier->GetDownloadInfoByTableName(param.info);
2100     auto queryObject = GetQuerySyncObject(param.tableName);
2101     param.isAssetsOnly = queryObject.IsAssetsOnly();
2102     param.groupNum = queryObject.GetGroupNum();
2103     param.assetsGroupMap = queryObject.GetAssetsOnlyGroupMap();
2104     param.isVaildForAssetsOnly = queryObject.AssetsOnlyErrFlag() == E_OK;
2105     param.isForcePullAseets = IsModeForcePull(taskId) && queryObject.IsContainQueryNodes();
2106     return ret;
2107 }
2108 
IsCurrentTaskResume(TaskId taskId)2109 bool CloudSyncer::IsCurrentTaskResume(TaskId taskId)
2110 {
2111     std::lock_guard<std::mutex> autoLock(dataLock_);
2112     return cloudTaskInfos_[taskId].resume;
2113 }
2114 
IsCurrentTableResume(TaskId taskId,bool upload)2115 bool CloudSyncer::IsCurrentTableResume(TaskId taskId, bool upload)
2116 {
2117     std::lock_guard<std::mutex> autoLock(dataLock_);
2118     if (!cloudTaskInfos_[taskId].resume) {
2119         return false;
2120     }
2121     if (currentContext_.tableName != resumeTaskInfos_[taskId].context.tableName) {
2122         return false;
2123     }
2124     return upload == resumeTaskInfos_[taskId].upload;
2125 }
2126 
DownloadDataFromCloud(TaskId taskId,SyncParam & param,bool isFirstDownload)2127 int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam &param, bool isFirstDownload)
2128 {
2129     // Get cloud data after cloud water mark
2130     param.info.tableStatus = ProcessStatus::PROCESSING;
2131     param.downloadData = {};
2132     if (param.isAssetsOnly) {
2133         param.cloudWaterMarkForAssetsOnly = param.cloudWaterMark;
2134     }
2135     int ret = QueryCloudData(taskId, param.info.tableName, param.cloudWaterMark, param.downloadData);
2136     CloudSyncUtils::CheckQueryCloudData(cloudTaskInfos_[taskId].prepareTraceId, param.downloadData, param.pkColNames);
2137     if (ret == -E_QUERY_END) {
2138         // Won't break here since downloadData may not be null
2139         param.isLastBatch = true;
2140     } else if (ret != E_OK) {
2141         std::lock_guard<std::mutex> autoLock(dataLock_);
2142         param.info.tableStatus = ProcessStatus::FINISHED;
2143         currentContext_.notifier->UpdateProcess(param.info);
2144         return ret;
2145     }
2146 
2147     ret = CheckCloudQueryAssetsOnlyIfNeed(taskId, param);
2148     if (ret != E_OK) {
2149         LOGE("[CloudSyncer] query assets failed, error code: %d", ret);
2150         std::lock_guard<std::mutex> autoLock(dataLock_);
2151         param.info.tableStatus = ProcessStatus::FINISHED;
2152         currentContext_.notifier->UpdateProcess(param.info);
2153         return ret;
2154     }
2155 
2156     if (param.downloadData.data.empty()) {
2157         if (ret == E_OK || isFirstDownload) {
2158             LOGD("[CloudSyncer] try to query cloud data use increment water mark");
2159             UpdateCloudWaterMark(taskId, param);
2160             // Cloud water may change on the cloud, it needs to be saved here
2161             SaveCloudWaterMark(param.tableName, taskId);
2162         }
2163         if (isFirstDownload) {
2164             NotifyInEmptyDownload(taskId, param.info);
2165         }
2166     }
2167     return E_OK;
2168 }
2169 
ModifyDownLoadInfoCount(const int errorCode,InnerProcessInfo & info)2170 void CloudSyncer::ModifyDownLoadInfoCount(const int errorCode, InnerProcessInfo &info)
2171 {
2172     if (errorCode == E_OK) {
2173         return;
2174     }
2175     info.downLoadInfo.failCount += 1;
2176     if (info.downLoadInfo.successCount == 0) {
2177         LOGW("[CloudSyncer] Invalid successCount");
2178     } else {
2179         info.downLoadInfo.successCount -= 1;
2180     }
2181 }
2182 
GetResumeWaterMark(TaskId taskId)2183 Timestamp CloudSyncer::GetResumeWaterMark(TaskId taskId)
2184 {
2185     std::lock_guard<std::mutex> autoLock(dataLock_);
2186     return resumeTaskInfos_[taskId].lastLocalWatermark;
2187 }
2188 } // namespace DistributedDB
2189